Scheduling & stats #12

Merged
root merged 4 commits from 10-scheduling into master 1 year ago
  1. 760
      Cargo.lock
  2. 183
      bin/u_agent/src/lib.rs
  3. 2
      bin/u_panel/src/gui/fe/src/app/app.module.ts
  4. 4
      bin/u_panel/src/gui/fe/src/app/components/dialogs/base-info-dialog.component.less
  5. 5
      bin/u_panel/src/gui/fe/src/app/components/payload-overview/payload-overview.component.html
  6. 2
      bin/u_panel/src/gui/fe/src/index.html
  7. 45
      bin/u_server/src/handlers.rs
  8. 2
      integration-tests/docker-compose.yml
  9. 2
      integration-tests/tests/fixtures/agent.rs
  10. 2
      integration-tests/tests/helpers/panel.rs
  11. 51
      integration-tests/tests/integration_tests/behaviour.rs
  12. 4
      lib/u_lib/Cargo.toml
  13. 7
      lib/u_lib/src/api.rs
  14. 10
      lib/u_lib/src/cache.rs
  15. 167
      lib/u_lib/src/executor.rs
  16. 92
      lib/u_lib/src/jobs.rs
  17. 3
      lib/u_lib/src/lib.rs
  18. 4
      lib/u_lib/src/models/agent.rs
  19. 33
      lib/u_lib/src/models/jobs/assigned.rs
  20. 11
      lib/u_lib/src/models/jobs/meta.rs
  21. 2
      lib/u_lib/src/models/jobs/misc.rs
  22. 3
      lib/u_lib/src/models/mod.rs
  23. 2
      lib/u_lib/src/models/payload.rs
  24. 7
      lib/u_lib/src/models/schema.rs
  25. 14
      lib/u_lib/src/models/stats.rs
  26. 2
      lib/u_lib/src/proc_output.rs
  27. 91
      lib/u_lib/src/scheduler/entry.rs
  28. 178
      lib/u_lib/src/scheduler/mod.rs
  29. 200
      lib/u_lib/src/u_runner.rs
  30. 49
      lib/u_lib/src/ufs/index.rs
  31. 59
      lib/u_lib/src/ufs/mod.rs
  32. 2
      migrations/2020-10-24-111622_create_all/up.sql

760
Cargo.lock generated

File diff suppressed because it is too large Load Diff

@ -1,110 +1,102 @@
#[macro_use]
extern crate log;
use std::process::exit;
use tokio::runtime::Builder;
use tokio::time::{sleep, Duration};
use u_lib::models::PreparedJob;
use u_lib::scheduler::SCHEDULER;
use u_lib::u_runner::{IdentifiableFuture, URunner};
use u_lib::{
api::HttpClient,
cache::JobCache,
config::{get_self_id, EndpointsEnv, AGENT_ITERATION_INTERVAL},
error::ErrChan,
executor,
jobs::AnonymousJobBatch,
logging::init_logger,
messaging::Reportable,
models::AssignedJobById,
};
async fn process_request(jobs: Vec<AssignedJobById>, client: &HttpClient) {
if !jobs.is_empty() {
for jr in &jobs {
if !JobCache::contains(jr.job_id) {
info!("Fetching job: {}", &jr.job_id);
let mut fetched_job = loop {
//todo: use payload cache
match client.get_full_job(jr.job_id).await {
Ok(result) => break result,
Err(err) => {
debug!("{:?} \nretrying...", err);
sleep(AGENT_ITERATION_INTERVAL).await;
}
}
};
if let Some(payload) = &mut fetched_job.payload {
if let Err(e) = payload.maybe_split_payload() {
ErrChan::send(e, "pld").await;
async fn process_request(assigned_jobs: Vec<AssignedJobById>, client: &HttpClient) {
for asgn_job in assigned_jobs {
if !JobCache::contains(asgn_job.job_id) {
info!("Fetching job: {}", &asgn_job.job_id);
let mut fetched_job = loop {
//todo: use payload cache
match client.get_full_job(asgn_job.job_id).await {
Ok(result) => break result,
Err(err) => {
debug!("{:?} \nretrying...", err);
sleep(AGENT_ITERATION_INTERVAL).await;
}
}
JobCache::insert(fetched_job);
};
if let Some(payload) = &mut fetched_job.payload {
if let Err(e) = payload.maybe_split_payload() {
ErrChan::send(e, "pay").await;
continue;
}
}
JobCache::insert(fetched_job);
}
info!(
"Scheduling jobs: {}",
jobs.iter()
.map(|j| j.job_id.to_string())
.collect::<Vec<String>>()
.join(", ")
);
let meta_with_ids = jobs
.into_iter()
.map(|job| {
let meta = JobCache::get(job.job_id).unwrap().clone();
(meta, job)
})
.collect::<Vec<_>>();
let job = match JobCache::get(asgn_job.job_id).as_deref() {
Some(job) => job.clone(),
None => continue,
};
AnonymousJobBatch::from_meta_with_id(meta_with_ids)
.spawn()
.await;
}
}
info!("Scheduling job {}", job.meta.id.to_string());
async fn error_reporting(client: HttpClient) -> ! {
loop {
match ErrChan::recv().await {
Some(err) => {
'retry: for _ in 0..3 {
match client.report([Reportable::Error(err.clone())]).await {
Ok(_) => break 'retry,
Err(e) => {
debug!("Reporting error: {:?}", e);
sleep(Duration::from_secs(10)).await;
let schedule = match job.meta.schedule.clone() {
Some(sched) => {
if sched.is_empty() {
None
} else {
match sched.as_str().try_into() {
Ok(s) => Some(s),
Err(err) => {
ErrChan::send(err, "sch").await;
continue;
}
}
}
}
None => sleep(Duration::from_secs(3)).await,
}
None => None,
};
SCHEDULER
.add_job(schedule, PreparedJob { job, ids: asgn_job })
.await;
}
}
async fn error_reporting(client: HttpClient) {
while let Some(err) = ErrChan::recv().await {
let _ = client.report([Reportable::Error(err.clone())]).await;
}
}
async fn agent_loop(client: HttpClient) -> ! {
async fn agent_loop(client: HttpClient) {
let self_id = get_self_id();
loop {
match client.get_personal_jobs(self_id).await {
Ok(jobs) => {
process_request(jobs, &client).await;
}
Err(err) => ErrChan::send(err, "processing").await,
match client.get_personal_jobs(self_id).await {
Ok(jobs) => {
process_request(jobs, &client).await;
}
Err(err) => ErrChan::send(err, "pro").await,
}
let result: Vec<Reportable> = executor::pop_completed()
.await
.into_iter()
.map(|result| match result {
Ok(r) => Reportable::Assigned(r),
Err(e) => Reportable::Error(e),
})
.collect();
if !result.is_empty() {
if let Err(err) = client.report(result).await {
ErrChan::send(err, "report").await;
}
let result: Vec<Reportable> = URunner::pop_completed()
.await
.into_iter()
.map(|result| match result {
Ok(r) => Reportable::Assigned(r),
Err(e) => Reportable::Error(e),
})
.collect();
if !result.is_empty() {
if let Err(err) = client.report(result).await {
ErrChan::send(err, "rep").await;
}
sleep(AGENT_ITERATION_INTERVAL).await;
}
}
@ -134,15 +126,42 @@ pub fn run_forever() -> ! {
.build()
.unwrap()
.block_on(async {
match HttpClient::new(&env.u_server, None).await {
Ok(client) => {
tokio::spawn(error_reporting(client.clone()));
agent_loop(client).await
}
Err(e) => {
error!("client init failed: {}", e);
exit(7) // todo: wtf?
let client = loop {
match HttpClient::new(&env.u_server, None).await {
Ok(client) => break client,
Err(e) => {
error!("client init failed: {}", e);
sleep(Duration::from_secs(5)).await;
continue;
}
}
};
{
let client = client.clone();
SCHEDULER
.add_job(Some("*/3 * * * * * *".try_into().unwrap()), move || {
let client = client.clone();
IdentifiableFuture::from_fut_with_ident("error_reporting", async move {
error_reporting(client.clone()).await
})
})
.await;
}
{
let client = client.clone();
SCHEDULER
.add_job(Some("*/3 * * * * * *".try_into().unwrap()), move || {
let client = client.clone();
IdentifiableFuture::from_fut_with_ident("agent_loop", async move {
agent_loop(client).await
})
})
.await;
}
SCHEDULER.start_blocking().await
})
}

@ -12,6 +12,7 @@ import { MatSelectModule } from '@angular/material/select';
import { MatProgressSpinnerModule } from '@angular/material/progress-spinner';
import { HttpClientModule } from '@angular/common/http';
import { MatDialogModule } from '@angular/material/dialog';
import { MatGridListModule } from '@angular/material/grid-list';
import { MatIconModule } from '@angular/material/icon';
import { FormsModule } from '@angular/forms';
import { AgentComponent, JobComponent, ResultComponent, PayloadComponent } from './components/tables';
@ -62,6 +63,7 @@ import { NewPayloadDialogComponent } from './components/dialogs/new-payload-dial
MatSnackBarModule,
MatSelectModule,
MatListModule,
MatGridListModule,
FormsModule,
BrowserAnimationsModule
],

@ -12,3 +12,7 @@ div.info-dialog-forms-box-smol {
float: left;
margin-right: 10px;
}
.code {
font-family: "Roboto Mono", monospace;
}

@ -1,6 +1,7 @@
<mat-form-field class="info-dlg-field" floatLabel="always">
<mat-form-field style="box-sizing:border-box; width:100%" class="info-dlg-field" floatLabel="always">
<mat-label>Payload data</mat-label>
<textarea matInput cdkTextareaAutosize *ngIf="!isTooBigPayload" [readonly]="isPreview" [(ngModel)]="decodedPayload">
<textarea class="code" matInput cdkTextareaAutosize="true" *ngIf="!isTooBigPayload" [readonly]="isPreview"
[(ngModel)]="decodedPayload">
</textarea>
<input matInput *ngIf="isTooBigPayload" disabled placeholder="Payload is too big to display">
</mat-form-field>

@ -7,7 +7,7 @@
<meta name="viewport" content="width=device-width, initial-scale=1">
<link rel="icon" type="image/x-icon" href="favicon.ico">
<link rel="preconnect" href="https://fonts.gstatic.com">
<link href="https://fonts.googleapis.com/css2?family=Roboto:wght@300;400;500&display=swap" rel="stylesheet">
<link href="https://fonts.googleapis.com/css2?family=Roboto+Mono:wght@300;400;500&display=swap" rel="stylesheet">
<link href="https://fonts.googleapis.com/icon?family=Material+Icons" rel="stylesheet">
</head>

@ -249,7 +249,10 @@ impl Endpoints {
Reportable::Assigned(mut result) => {
let result_agent_id = &result.agent_id;
if agent_id != *result_agent_id {
warn!("Agent ids are not equal! actual id: {agent_id}, id from job: {result_agent_id}");
warn!(
"Agent ids are not equal! actual id: {agent_id}, \
id from job: {result_agent_id}"
);
continue;
}
result.touch();
@ -260,26 +263,23 @@ impl Endpoints {
JobType::Init => {
result.state = JobState::Finished;
match &result.result {
Some(rbytes) => {
let mut agent: Agent = match serde_json::from_slice(&rbytes) {
Ok(a) => a,
Err(e) => {
error!("Error deserializing agent data from {agent_id}: {e}");
continue;
}
};
agent.state = AgentState::Active;
db.upsert_agent(&agent)?;
}
None => error!("Empty agent data"),
}},
JobType::Shell => {
result.state = JobState::Finished
},
JobType::Service => (),
JobType::Terminate => todo!(),
JobType::Update => todo!(),
let mut agent: Agent = match result.deserialize() {
Ok(a) => a,
Err(e) => {
error!(
"Error deserializing agent \
data from {agent_id}: {e}"
);
continue;
}
};
agent.state = AgentState::Active;
db.upsert_agent(&agent)?;
}
JobType::Shell => result.state = JobState::Finished,
JobType::Stats => result.state = JobState::Finished,
JobType::Terminate => (),
JobType::Update => (),
}
db.update_result(&result)?;
}
@ -287,7 +287,8 @@ impl Endpoints {
error!("agent {agent_id} reported: {e}");
}
Reportable::Dummy => (),
}}
}
}
Ok(())
})
.await

@ -40,7 +40,7 @@ services:
networks:
- u_net
ports:
- 54321:5432
- 5432:5432
env_file:
- ../.env
- ../.env.private

@ -25,7 +25,7 @@ pub fn registered_agent(client: &HttpClient) -> RegisteredAgent {
assert_eq!(job.meta.alias, Some("agent_hello".to_string()));
let mut agent_data = AssignedJob::from((&job.meta, resp));
let mut agent_data = AssignedJob::from(&PreparedJob { job, ids: resp });
agent_data.set_result(&agent);
client

@ -80,7 +80,7 @@ pub trait IntoArgs {
impl IntoArgs for String {
fn into_args(self) -> Vec<String> {
<Self as AsRef<str>>::as_ref(&self).into_args()
self.as_str().into_args()
}
fn display(&self) -> String {

@ -16,7 +16,7 @@ async fn registration(registered_agent: &RegisteredAgent) {
}
#[tokio::test]
async fn setup_tasks() {
async fn setup_jobs() {
let agents: Vec<Agent> = Panel::check_output("agents read");
let agent_id = agents[0].id;
let job_alias = "passwd_contents";
@ -88,3 +88,52 @@ async fn large_payload() {
})
.await;
}
#[tokio::test]
async fn gather_stats() {
let agent = &Panel::check_output::<Vec<Agent>>("agents read")[0];
let job_alias = "stats";
let job = RawJob::default()
.with_type(JobType::Stats)
.with_alias(job_alias)
.with_target_platforms(&agent.platform)
.try_into_job()
.unwrap();
let job_id = job.meta.id;
let agent_id = agent.id;
Panel::check_status(["jobs", "create", &to_string(&RawJob::from(job)).unwrap()]);
let assigned = AssignedJobById {
agent_id,
job_id,
..Default::default()
};
Panel::check_status(["map", "create", &to_string(&[assigned]).unwrap()]);
let result = retry_with_interval(5, AGENT_ITERATION_INTERVAL, || {
let result =
Panel::check_output::<Vec<AssignedJob>>(["map", "read", &job_id.to_string()]).remove(0);
if result.state == JobState::Finished {
Ok(result)
} else {
Err("job didn't finish")
}
})
.await;
let stats = result.deserialize::<Stats>().unwrap();
assert_eq!(stats.agent_id, agent_id);
assert!(stats.cached_jobs.contains(&job_id));
assert!(stats
.scheduled_jobs
.iter()
.find(|j| j.job_ident == "error_reporting")
.is_some());
assert!(stats
.scheduled_jobs
.iter()
.find(|j| j.job_ident == "agent_loop")
.is_some())
}

@ -8,7 +8,7 @@ edition = "2021"
[dependencies]
anyhow = { workspace = true }
chrono = "0.4.19"
chrono = { version = "0.4.19", features = ["serde"] }
diesel = { workspace = true, optional = true }
diesel-derive-enum = { version = "2.0.0", features = ["postgres"], optional = true }
deadpool-diesel = { workspace = true, optional = true }
@ -33,6 +33,8 @@ uuid = { workspace = true, features = ["serde", "v4"] }
parking_lot = "0.12.1"
bincode = "1.3.3"
sha3 = "0.10.7"
cron = "0.12.0"
hex = "0.4.3"
[target.'cfg(unix)'.dependencies]
daemonize = "0.5"

@ -233,11 +233,8 @@ impl HttpClient {
&self,
assigned: impl IntoIterator<Item = &AssignedJobById>,
) -> Result<api_types::SetJobs> {
self.req_with_payload(
format!("assign_jobs"),
&assigned.into_iter().collect::<Vec<_>>(),
)
.await
self.req_with_payload("assign_jobs", &assigned.into_iter().collect::<Vec<_>>())
.await
}
/// get jobs for any agent by job_id, agent_id or result_id

@ -22,7 +22,7 @@ impl JobCache {
JOB_CACHE.read().contains_key(&id)
}
pub fn get<'jh>(id: Id) -> Option<JobCacheHolder<'jh>> {
pub fn get(id: Id) -> Option<JobCacheHolder> {
if !Self::contains(id) {
return None;
}
@ -33,11 +33,15 @@ impl JobCache {
pub fn remove(id: Id) {
JOB_CACHE.write().remove(&id);
}
pub fn stats() -> Vec<Id> {
JOB_CACHE.read().values().map(|job| job.meta.id).collect()
}
}
pub struct JobCacheHolder<'jh>(pub RwLockReadGuard<'jh, Cache>, pub Id);
pub struct JobCacheHolder(pub RwLockReadGuard<'static, Cache>, pub Id);
impl<'jh> Deref for JobCacheHolder<'jh> {
impl Deref for JobCacheHolder {
type Target = Val;
fn deref(&self) -> &Self::Target {

@ -1,167 +0,0 @@
use crate::{models::AssignedJob, UResult};
use lazy_static::lazy_static;
use std::collections::HashMap;
use std::future::Future;
use std::pin::Pin;
use tokio::{
runtime::Handle,
sync::mpsc::{channel, Receiver, Sender},
sync::Mutex,
task::{spawn, spawn_blocking, JoinHandle},
};
use uuid::Uuid;
pub type ExecResult = UResult<AssignedJob>;
type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;
lazy_static! {
static ref FUT_RESULTS: Mutex<HashMap<Uuid, JoinInfo>> = Mutex::new(HashMap::new());
static ref FUT_CHANNEL: (Sender<Uuid>, Mutex<Receiver<Uuid>>) = {
spawn(init_receiver());
let (tx, rx) = channel(100);
(tx, Mutex::new(rx))
};
}
struct JoinInfo {
handle: JoinHandle<JoinHandle<ExecResult>>,
completed: bool,
collectable: bool, // indicates if future can be popped from pool via pop_task_if_completed
}
impl JoinInfo {
async fn wait_result(self) -> ExecResult {
self.handle.await.unwrap().await.unwrap()
}
}
fn get_sender() -> Sender<Uuid> {
FUT_CHANNEL.0.clone()
}
pub struct Waiter {
tasks: Vec<BoxFuture<'static, ExecResult>>,
fids: Vec<Uuid>,
}
impl Waiter {
pub fn new() -> Self {
Self {
tasks: vec![],
fids: vec![],
}
}
pub fn push(&mut self, task: impl Future<Output = ExecResult> + Send + 'static) {
self.tasks.push(Box::pin(task));
}
/// Spawn prepared tasks
pub async fn spawn(mut self) -> Self {
let collectable = true; //TODO: self.tasks.len() != 1;
for f in self.tasks.drain(..) {
let handle = Handle::current();
let fid = Uuid::new_v4();
let tx = get_sender();
self.fids.push(fid);
let task_wrapper = async move {
debug!("inside wrapper (started): {}", fid);
let result = f.await;
tx.send(fid).await.unwrap();
result
};
let handler = JoinInfo {
handle: spawn_blocking(move || handle.spawn(task_wrapper)),
completed: false,
collectable,
};
FUT_RESULTS.lock().await.insert(fid, handler);
}
self
}
/// Wait until a bunch of tasks is finished.
/// NOT GUARANTEED that all tasks will be returned due to
/// possibility to pop them in other places
pub async fn wait(self) -> Vec<ExecResult> {
let mut result = vec![];
for fid in self.fids {
if let Some(task) = pop_task(fid).await {
result.push(task.wait_result().await);
}
}
result
}
}
async fn init_receiver() {
while let Some(fid) = FUT_CHANNEL.1.lock().await.recv().await {
let mut lock = FUT_RESULTS.lock().await;
if let Some(j) = lock.get_mut(&fid) {
j.completed = true;
}
}
}
async fn pop_task(fid: Uuid) -> Option<JoinInfo> {
FUT_RESULTS.lock().await.remove(&fid)
}
pub async fn pop_task_if_completed(fid: Uuid) -> Option<ExecResult> {
let &JoinInfo {
handle: _,
collectable,
completed,
} = match FUT_RESULTS.lock().await.get(&fid) {
Some(t) => t,
None => return None,
};
if collectable && completed {
let task = pop_task(fid).await.unwrap();
Some(task.wait_result().await)
} else {
None
}
}
pub async fn pop_completed() -> Vec<ExecResult> {
let mut completed: Vec<ExecResult> = vec![];
let fids = FUT_RESULTS
.lock()
.await
.keys()
.copied()
.collect::<Vec<Uuid>>();
for fid in fids {
if let Some(r) = pop_task_if_completed(fid).await {
completed.push(r)
}
}
completed
}
#[cfg(test)]
mod tests {
use super::*;
// WTF
// WTF
// WTF
#[tokio::test]
async fn test_spawn() {
use std::sync::Arc;
let val = Arc::new(Mutex::new(0));
let t = {
let v = val.clone();
spawn(async move {
*v.lock().await = 5;
})
};
assert_eq!(0, *val.lock().await);
spawn(async {}).await.unwrap();
assert_eq!(5, *val.lock().await);
t.await.unwrap();
assert_eq!(5, *val.lock().await);
}
}

@ -1,63 +1,67 @@
use crate::{
cache::JobCache,
combined_result::CombinedResult,
executor::{ExecResult, Waiter},
models::{Agent, AssignedJob, AssignedJobById, Job, JobType, RawJob},
config::get_self_id,
models::{Agent, AssignedJob, AssignedJobById, Job, JobType, PreparedJob, RawJob, Stats},
proc_output::ProcOutput,
scheduler::SCHEDULER,
u_runner::{ExecResult, IdentifiableFuture, URunner},
ufs,
};
use std::collections::HashMap;
use std::process::exit;
use tokio::process::Command;
pub struct AnonymousJobBatch {
waiter: Waiter,
runner: URunner,
is_running: bool,
}
impl AnonymousJobBatch {
pub fn from_meta_with_id(jobs: impl IntoIterator<Item = (Job, AssignedJobById)>) -> Self {
let mut waiter = Waiter::new();
for (job, ids) in jobs {
waiter.push(run_assigned_job(job, ids));
pub fn from_prepared_jobs(jobs: impl IntoIterator<Item = PreparedJob>) -> Self {
let mut runner = URunner::new();
for job in jobs {
runner.push(IdentifiableFuture::from_fut_with_ident(
job.ids.id,
run_assigned_job(job),
));
}
Self {
waiter,
runner,
is_running: false,
}
}
pub fn from_meta(jobs: impl IntoIterator<Item = Job>) -> Self {
pub fn from_jobs(jobs: impl IntoIterator<Item = Job>) -> Self {
let jobs_ids: Vec<_> = jobs
.into_iter()
.map(|job| {
let job_id = job.meta.id;
(
job,
AssignedJobById {
job_id,
..Default::default()
},
)
.map(|job| PreparedJob {
ids: AssignedJobById {
job_id: job.meta.id,
..Default::default()
},
job,
})
.collect();
AnonymousJobBatch::from_meta_with_id(jobs_ids)
AnonymousJobBatch::from_prepared_jobs(jobs_ids)
}
/// Spawn jobs
pub async fn spawn(mut self) -> Self {
debug!("spawning jobs");
self.waiter = self.waiter.spawn().await;
self.runner = self.runner.spawn().await;
self.is_running = true;
self
}
/// Spawn jobs and wait for result
pub async fn wait(self) -> Vec<ExecResult> {
let waiter = if !self.is_running {
self.spawn().await.waiter
let runner = if !self.is_running {
self.spawn().await.runner
} else {
self.waiter
self.runner
};
waiter.wait().await
runner.wait().await
}
/// Spawn one job and wait for result
@ -102,7 +106,7 @@ impl NamedJobBatch {
.map(|job| (job.meta.alias.clone().unwrap(), job))
.unzip();
Self {
runner: Some(AnonymousJobBatch::from_meta(jobs)),
runner: Some(AnonymousJobBatch::from_jobs(jobs)),
job_names,
results: HashMap::new(),
}
@ -132,9 +136,9 @@ impl NamedJobBatch<true> {
}
}
pub async fn run_assigned_job(job: Job, ids: AssignedJobById) -> ExecResult {
let Job { meta, payload } = job;
let mut result = AssignedJob::from((&meta, ids));
pub async fn run_assigned_job(prepared_job: PreparedJob) -> ExecResult {
let mut result = AssignedJob::from(&prepared_job);
let Job { meta, payload } = prepared_job.job;
match meta.exec_type {
JobType::Shell => {
let (argv, _prepared_payload) = {
@ -150,7 +154,11 @@ pub async fn run_assigned_job(job: Job, ids: AssignedJobById) -> ExecResult {
let mut split_cmd = shlex::split(&argv).unwrap().into_iter();
let cmd = split_cmd.nth(0).unwrap();
let args = split_cmd.collect::<Vec<String>>();
let cmd_result = Command::new(cmd).args(args).output().await;
let cmd_result = Command::new(cmd)
.kill_on_drop(true)
.args(args)
.output()
.await;
let (data, retcode) = match cmd_result {
Ok(output) => (
ProcOutput::from_output(&output).into_vec(),
@ -172,8 +180,18 @@ pub async fn run_assigned_job(job: Job, ids: AssignedJobById) -> ExecResult {
result.set_result(&agent);
result.retcode = Some(0);
}
JobType::Service => todo!(),
JobType::Update => todo!(),
JobType::Stats => {
let stats = Stats {
agent_id: get_self_id(),
running_jobs: URunner::stats().await,
scheduled_jobs: SCHEDULER.stats().await,
cached_jobs: JobCache::stats(),
cached_payloads: ufs::stats(),
};
result.set_result(&stats);
result.retcode = Some(0)
}
JobType::Update => (),
JobType::Terminate => exit(0),
};
Ok(result)
@ -197,7 +215,7 @@ mod tests {
let sleep_jobs = (0..50).map(|_| job.clone()).collect::<Vec<_>>();
let now = SystemTime::now();
AnonymousJobBatch::from_meta(sleep_jobs).wait().await;
AnonymousJobBatch::from_jobs(sleep_jobs).wait().await;
assert!(now.elapsed().unwrap().as_secs() < SLEEP_SECS + 2)
}
@ -236,7 +254,7 @@ mod tests {
raw_job = raw_job.with_raw_payload(p);
}
let job = raw_job.try_into_job().unwrap();
let result = AnonymousJobBatch::from_meta([job])
let result = AnonymousJobBatch::from_jobs([job])
.wait_one()
.await
.unwrap();
@ -250,8 +268,8 @@ mod tests {
const SLEEP_SECS: u64 = 1;
let now = SystemTime::now();
let longest_job = RawJob::from_shell(format!("sleep {}", SLEEP_SECS)).unwrap();
let longest_job = AnonymousJobBatch::from_meta([longest_job]).spawn().await;
let ls = AnonymousJobBatch::from_meta([RawJob::from_shell("ls").unwrap()])
let longest_job = AnonymousJobBatch::from_jobs([longest_job]).spawn().await;
let ls = AnonymousJobBatch::from_jobs([RawJob::from_shell("ls").unwrap()])
.wait_one()
.await
.unwrap();
@ -264,7 +282,7 @@ mod tests {
.map(|f| RawJob::from_shell(format!("ls {f}")).unwrap())
.collect::<Vec<_>>();
let ls_subfolders = AnonymousJobBatch::from_meta(subfolders_jobs).wait().await;
let ls_subfolders = AnonymousJobBatch::from_jobs(subfolders_jobs).wait().await;
for result in ls_subfolders {
assert_eq!(result.unwrap().retcode.unwrap(), 0);
@ -295,7 +313,7 @@ mod tests {
#[tokio::test]
async fn test_failing_shell_job() -> TestResult {
let job = RawJob::from_shell("lol_kek_puk").unwrap();
let job_result = AnonymousJobBatch::from_meta([job])
let job_result = AnonymousJobBatch::from_jobs([job])
.wait_one()
.await
.unwrap();

@ -7,7 +7,6 @@ pub mod conv;
#[cfg(feature = "server")]
pub mod db;
pub mod error;
pub mod executor;
pub mod jobs;
pub mod logging;
pub mod messaging;
@ -15,7 +14,9 @@ pub mod misc;
pub mod models;
pub mod platform;
pub mod proc_output;
pub mod scheduler;
pub mod types;
pub mod u_runner;
pub mod ufs;
#[cfg(unix)]
pub mod unix;

@ -13,8 +13,8 @@ mod server {
use self::server::*;
use crate::{
config::get_self_id, conv::systime_to_string, executor::ExecResult, jobs::NamedJobBatch,
platform, types::Id,
config::get_self_id, conv::systime_to_string, jobs::NamedJobBatch, platform, types::Id,
u_runner::ExecResult,
};
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Display)]

@ -1,16 +1,19 @@
use super::{JobMeta, JobState, JobType};
use super::{JobState, JobType};
#[cfg(feature = "server")]
use crate::models::schema::*;
use crate::{
config::get_self_id,
conv::{bytes_to_string_truncated, systime_to_string},
models::Job,
types::Id,
UError, UResult,
};
#[cfg(feature = "server")]
use diesel::{Identifiable, Insertable, Queryable};
use serde::{Deserialize, Serialize};
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use std::{borrow::Cow, fmt::Debug, time::SystemTime};
/// A job assigned to some agents, stores execution state and result
#[derive(Serialize, Deserialize, Clone, PartialEq)]
#[cfg_attr(
feature = "server",
@ -53,6 +56,13 @@ impl Debug for AssignedJob {
}
}
#[derive(Debug, Clone)]
pub struct PreparedJob {
pub job: Job,
pub ids: AssignedJobById,
}
/// Lightweight linking struct between agent job and assigned job
#[derive(Serialize, Deserialize, Clone, Copy, Debug)]
pub struct AssignedJobById {
pub agent_id: Id,
@ -61,20 +71,20 @@ pub struct AssignedJobById {
pub job_id: Id,
}
impl From<(&JobMeta, AssignedJobById)> for AssignedJob {
fn from((job, ids): (&JobMeta, AssignedJobById)) -> Self {
impl From<&PreparedJob> for AssignedJob {
fn from(prep_job: &PreparedJob) -> Self {
let AssignedJobById {
agent_id,
id,
job_id,
} = ids;
} = prep_job.ids;
AssignedJob {
id,
agent_id,
job_id,
alias: job.alias.clone(),
exec_type: job.exec_type,
alias: prep_job.job.meta.alias.clone(),
exec_type: prep_job.job.meta.exec_type,
..Default::default()
}
}
@ -136,7 +146,14 @@ impl AssignedJob {
String::from_utf8_lossy(self.to_raw_result())
}
pub fn set_result<S: Serialize>(&mut self, result: &S) {
pub fn deserialize<T: DeserializeOwned>(&self) -> UResult<T> {
let body = self.result.as_deref().unwrap_or_default();
serde_json::from_slice(body).map_err(|e| {
UError::DeserializeError(e.to_string(), bytes_to_string_truncated(body, 1024))
})
}
pub fn set_result(&mut self, result: &impl Serialize) {
self.result = Some(serde_json::to_vec(result).unwrap());
}

@ -7,6 +7,7 @@ use crate::models::Payload;
use crate::platform;
use crate::types::Id;
use crate::{UError, UResult};
use cron::Schedule;
#[cfg(feature = "server")]
use diesel::{Identifiable, Insertable, Queryable};
use serde::{Deserialize, Serialize};
@ -102,7 +103,7 @@ impl JobMeta {
return Err(mk_err("Argv contains no executable placeholder"));
}
if self.argv.contains("{}") && self.payload_id.is_none() {
if self.payload_id.is_none() && self.argv.contains("{}") {
return Err(mk_err(
"No payload provided, but argv contains executable placeholder",
));
@ -119,16 +120,24 @@ impl JobMeta {
)));
}
if let Some(schedule) = &self.schedule {
Schedule::try_from(schedule.as_str())
.map_err(|e| mk_err(format!("bad schedule: {e}")))?;
}
Ok(self)
}
}
/// An abstract valid-constructed job that can be assigned to any agent.
/// Contains job meta and payload meta
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct Job {
pub meta: JobMeta,
pub payload: Option<Payload>,
}
/// Job that has only been deserialized without checks
#[derive(Serialize, Deserialize, Clone, Default)]
pub struct RawJob<'p> {
#[serde(default)]

@ -35,7 +35,7 @@ pub enum JobState {
)]
pub enum JobType {
Init,
Service,
Stats,
#[default]
Shell,
Terminate,

@ -3,8 +3,9 @@ mod jobs;
mod payload;
#[cfg(feature = "server")]
pub mod schema;
mod stats;
pub use crate::models::{agent::*, jobs::*, payload::*};
pub use crate::models::{agent::*, jobs::*, payload::*, stats::*};
use serde::Deserialize;
use strum::{Display as StrumDisplay, EnumString};

@ -185,7 +185,7 @@ impl Payload {
#[cfg(windows)]
pub fn prepare_executable(&self) -> Result<(File, String)> {
todo!()
compile_error!("допилить")
}
}

@ -86,4 +86,9 @@ diesel::joinable!(jobs -> payloads (payload_id));
diesel::joinable!(results -> agents (agent_id));
diesel::joinable!(results -> jobs (job_id));
diesel::allow_tables_to_appear_in_same_query!(agents, jobs, payloads, results,);
diesel::allow_tables_to_appear_in_same_query!(
agents,
jobs,
payloads,
results,
);

@ -0,0 +1,14 @@
use serde::{Deserialize, Serialize};
use crate::{messaging::AsMsg, scheduler::EntryStat, types::Id, ufs::IndexFileMeta};
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Stats {
pub agent_id: Id,
pub running_jobs: Vec<String>,
pub scheduled_jobs: Vec<EntryStat>,
pub cached_jobs: Vec<Id>,
pub cached_payloads: Vec<IndexFileMeta>,
}
impl AsMsg for Stats {}

@ -7,7 +7,7 @@ pub struct ProcOutput {
}
impl ProcOutput {
const STDERR_DELIMETER: &[u8] = b"[STDERR]\n";
const STDERR_DELIMETER: &'static [u8] = b"[STDERR]\n";
pub fn from_output(output: &Output) -> Self {
Self::new()

@ -0,0 +1,91 @@
use crate::{models::PreparedJob, types::Id, u_runner::IdentifiableFuture};
use chrono::{DateTime, Utc};
use core::fmt;
use serde::{Deserialize, Serialize};
use std::sync::Arc;
pub trait SchedulerJob {
fn call(&self) -> IdentifiableFuture<()>;
}
// scheduler job need to be called repeatedly,
// the only way to achieve this is to use Fn() -> Future
impl<F> SchedulerJob for F
where
F: Fn() -> IdentifiableFuture<()>,
{
fn call(&self) -> IdentifiableFuture<()> {
self()
}
}
#[derive(Clone)]
pub struct Entry {
pub entry_id: Id,
pub schedule: Option<cron::Schedule>,
pub next: Option<DateTime<Utc>>,
pub runnable: EntryType,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct EntryStat {
pub entry_id: Id,
pub job_ident: String,
pub schedule: String,
pub next: Option<DateTime<Utc>>,
}
#[derive(Clone)]
pub enum EntryType {
Common(Arc<dyn SchedulerJob + Send + Sync>),
URunner(PreparedJob),
}
impl<J: SchedulerJob + Send + Sync + 'static> From<J> for EntryType {
fn from(value: J) -> Self {
Self::Common(Arc::new(value))
}
}
impl From<PreparedJob> for EntryType {
fn from(value: PreparedJob) -> Self {
Self::URunner(value)
}
}
impl Entry {
pub fn set_next_run_time(&mut self) {
self.next = self.get_next_run_time();
}
pub fn get_next_run_time(&self) -> Option<DateTime<Utc>> {
match &self.schedule {
Some(schedule) => schedule.upcoming(Utc).next(),
None => Some(Utc::now()),
}
}
pub fn as_stat(&self) -> EntryStat {
let job_ident = match &self.runnable {
EntryType::URunner(entry) => entry.job.meta.id.to_string(),
EntryType::Common(entry) => entry.call().job_ident.to_string(),
};
EntryStat {
entry_id: self.entry_id,
job_ident,
schedule: self
.schedule
.as_ref()
.map(|s| s.to_string())
.unwrap_or_default(),
next: self.next.clone(),
}
}
}
impl fmt::Debug for Entry {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{} {:?} {:?}", self.entry_id, self.schedule, self.next)
}
}

@ -0,0 +1,178 @@
mod entry;
use std::sync::Arc;
use std::time::Duration;
use chrono::Utc;
use cron::Schedule;
use entry::Entry;
use once_cell::sync::Lazy;
use tokio::sync::Mutex;
use uuid::Uuid;
use crate::jobs::AnonymousJobBatch;
use crate::types::Id;
use self::entry::EntryType;
pub use entry::EntryStat;
pub static SCHEDULER: Lazy<AsyncScheduler> = Lazy::new(|| AsyncScheduler::new());
#[derive(Clone)]
pub struct AsyncScheduler {
entries: Arc<Mutex<Vec<Entry>>>,
}
impl AsyncScheduler {
pub fn new() -> AsyncScheduler {
AsyncScheduler {
entries: Arc::new(Mutex::new(Vec::new())),
}
}
pub async fn start_blocking(&self) -> ! {
for entry in self.entries.lock().await.iter_mut() {
entry.set_next_run_time();
}
loop {
let mut entries = self.entries.lock().await;
entries.sort_by(|b, a| b.next.cmp(&a.next));
let wait_duration = if let Some(entry) = entries.first() {
let wait_millis = (entry.next.as_ref().unwrap().timestamp_millis() as u64)
.saturating_sub(Utc::now().timestamp_millis() as u64);
Duration::from_millis(wait_millis)
} else {
Duration::from_secs(1)
};
drop(entries);
tokio::time::sleep(wait_duration).await;
let mut entries = self.entries.lock().await;
let mut job_batch = vec![];
for entry in &mut *entries {
match entry.next.as_ref() {
Some(next) => {
if next.gt(&Utc::now()) {
break;
}
match &entry.runnable {
EntryType::Common(runnable) => {
let fut = runnable.call();
tokio::spawn(fut);
}
EntryType::URunner(runnable) => {
debug!("starting assigned job {:?}", runnable.ids);
job_batch.push(runnable.clone())
}
}
entry.set_next_run_time();
}
None => {}
}
}
AnonymousJobBatch::from_prepared_jobs(job_batch)
.spawn()
.await;
entries.retain(|e| e.schedule.is_some());
}
}
pub async fn add_job(&self, schedule: Option<Schedule>, runnable: impl Into<EntryType>) -> Id {
let entry_id = Uuid::new_v4();
let runnable = runnable.into();
let mut entry = Entry {
entry_id,
schedule,
next: None,
runnable,
};
entry.set_next_run_time();
self.entries.lock().await.push(entry);
entry_id
}
pub async fn del_job(&self, entry_id: Id) {
self.entries.lock().await.retain(|e| e.entry_id != entry_id);
}
pub async fn stats(&self) -> Vec<EntryStat> {
self.entries
.lock()
.await
.iter()
.map(|entry| entry.as_stat())
.collect()
}
pub async fn start(&self) {
let cloned = self.clone();
tokio::spawn(async move {
cloned.start_blocking().await;
});
}
}
#[cfg(test)]
mod tests {
use std::time::Duration;
use tokio::time::sleep;
use crate::u_runner::IdentifiableFuture;
use super::*;
#[tokio::test(flavor = "multi_thread")]
async fn scheduling() {
let v = Arc::new(Mutex::new(0));
let scheduler = AsyncScheduler::new();
{
let v = v.clone();
scheduler
.add_job(Some("*/1 * * * * * *".try_into().unwrap()), move || {
let v = v.clone();
IdentifiableFuture::from_fut_with_ident("testf", async move {
*v.lock().await += 1;
})
})
.await;
}
scheduler
.add_job(Some("*/1 * * * * * *".try_into().unwrap()), move || {
IdentifiableFuture::from_fut_with_ident("testf", async move {
println!("hello1");
})
})
.await;
scheduler
.add_job(Some("*/1 * * * * * *".try_into().unwrap()), move || {
IdentifiableFuture::from_fut_with_ident("testf", async move {
println!("hello2");
})
})
.await;
scheduler.start().await;
sleep(Duration::from_secs_f32(0.1)).await; // wait for scheduler
sleep(Duration::from_secs(1)).await;
assert_eq!(*v.lock().await, 1);
sleep(Duration::from_secs(1)).await;
assert_eq!(*v.lock().await, 2);
}
}

@ -0,0 +1,200 @@
use crate::{models::AssignedJob, UResult};
use lazy_static::lazy_static;
use std::future::Future;
use std::pin::Pin;
use std::task::Poll;
use std::{collections::HashMap, task::Context};
use tokio::{
runtime::Handle,
sync::mpsc::{channel, Receiver, Sender},
sync::Mutex,
task::{spawn, spawn_blocking, JoinHandle},
};
use uuid::Uuid;
pub type ExecResult = UResult<AssignedJob>;
lazy_static! {
static ref FUT_RESULTS: Mutex<HashMap<Uuid, JoinInfo>> = Mutex::new(HashMap::new());
static ref FUT_CHANNEL: (Sender<Uuid>, Mutex<Receiver<Uuid>>) = {
spawn(init_receiver());
let (tx, rx) = channel(100);
(tx, Mutex::new(rx))
};
}
pub struct IdentifiableFuture<R> {
pub job_ident: String,
fut: Pin<Box<dyn Future<Output = R> + Send + Sync + 'static>>,
}
impl<R> IdentifiableFuture<R> {
pub fn from_fut_with_ident(
job_ident: impl Into<String>,
job: impl Future<Output = R> + Send + Sync + 'static,
) -> Self {
Self {
fut: Box::pin(job),
job_ident: job_ident.into(),
}
}
}
impl<R> Future for IdentifiableFuture<R> {
type Output = R;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.fut.as_mut().poll(cx)
}
}
struct JoinInfo {
job_ident: String,
handle: JoinHandle<JoinHandle<ExecResult>>,
completed: bool,
// collectable: bool, // indicates if future can be popped from pool via pop_task_if_completed
}
impl JoinInfo {
async fn wait_result(self) -> ExecResult {
self.handle.await.unwrap().await.unwrap()
}
}
fn get_sender() -> Sender<Uuid> {
FUT_CHANNEL.0.clone()
}
/// Job runner. Has multiple ways of work.
/// - run 1 or more jobs and wait until they're all done
/// - run 1 or more jobs in background and collect results of completed jobs later
pub struct URunner {
executables: Vec<IdentifiableFuture<ExecResult>>,
fids: Vec<Uuid>,
}
impl URunner {
pub fn new() -> Self {
Self {
executables: vec![],
fids: vec![],
}
}
pub fn push(&mut self, job: IdentifiableFuture<ExecResult>) {
self.executables.push(job);
}
/// Spawn prepared tasks
pub async fn spawn(mut self) -> Self {
for executable in self.executables.drain(..) {
let handle = Handle::current();
let fid = Uuid::new_v4();
let tx = get_sender();
let job_id = executable.job_ident.clone();
self.fids.push(fid);
let job_wrapper = async move {
debug!("inside wrapper (started): {}", fid);
let result = executable.await;
tx.send(fid).await.ok();
result
};
let handler = JoinInfo {
job_ident: job_id,
handle: spawn_blocking(move || handle.spawn(job_wrapper)),
completed: false,
};
FUT_RESULTS.lock().await.insert(fid, handler);
}
self
}
/// Wait until a bunch of tasks is finished.
/// NOT GUARANTEED that all tasks will be returned due to
/// possibility to pop them in other places
pub async fn wait(self) -> Vec<ExecResult> {
let mut result = vec![];
for fid in self.fids {
if let Some(job) = Self::pop_job(fid).await {
result.push(job.wait_result().await);
}
}
result
}
pub async fn pop_job_if_completed(fid: Uuid) -> Option<ExecResult> {
let Some(&JoinInfo { completed, .. }) = FUT_RESULTS.lock().await.get(&fid) else {
return None;
};
if completed {
let job = Self::pop_job(fid).await.unwrap();
Some(job.wait_result().await)
} else {
None
}
}
pub async fn pop_completed() -> Vec<ExecResult> {
let mut completed: Vec<ExecResult> = vec![];
let fids = FUT_RESULTS
.lock()
.await
.keys()
.copied()
.collect::<Vec<Uuid>>();
for fid in fids {
if let Some(r) = Self::pop_job_if_completed(fid).await {
completed.push(r)
}
}
completed
}
pub async fn stats() -> Vec<String> {
FUT_RESULTS
.lock()
.await
.values()
.map(|v| v.job_ident.clone())
.collect()
}
async fn pop_job(fid: Uuid) -> Option<JoinInfo> {
FUT_RESULTS.lock().await.remove(&fid)
}
}
async fn init_receiver() {
while let Some(fid) = FUT_CHANNEL.1.lock().await.recv().await {
let mut lock = FUT_RESULTS.lock().await;
if let Some(j) = lock.get_mut(&fid) {
j.completed = true;
}
}
}
#[cfg(test)]
mod tests {
use super::*;
// WTF
// WTF
// WTF
#[tokio::test]
async fn test_spawn() {
use std::sync::Arc;
let val = Arc::new(Mutex::new(0));
let t = {
let v = val.clone();
spawn(async move {
*v.lock().await = 5;
})
};
assert_eq!(0, *val.lock().await);
spawn(async {}).await.unwrap();
assert_eq!(5, *val.lock().await);
t.await.unwrap();
assert_eq!(5, *val.lock().await);
}
}

@ -1,13 +1,15 @@
use super::{Error, FileMeta};
use super::Error;
use once_cell::sync::Lazy;
use parking_lot::Mutex;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::env::temp_dir;
use std::ffi::OsString;
use std::fs;
use std::path::PathBuf;
// index format: given_name -> payload_meta
type Index = HashMap<String, FileMeta>;
type Index = HashMap<String, IndexFileMeta>;
static IDX_FILE_NAME: Lazy<PathBuf> = Lazy::new(|| temp_dir().join(".i"));
static INDEX: Lazy<Mutex<Index>> = Lazy::new(|| {
@ -28,6 +30,35 @@ static INDEX: Lazy<Mutex<Index>> = Lazy::new(|| {
Mutex::new(idx)
});
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct IndexFileMeta {
extension: Option<OsString>,
pub external: bool, // if file is present before adding to index
pub hashsum: String,
pub path: PathBuf, // actual file path
pub size: u64,
}
impl IndexFileMeta {
pub fn new(
full_path: impl Into<PathBuf>,
hashsum: String,
external: bool,
) -> Result<Self, Error> {
let full_path: PathBuf = full_path.into();
let extension = full_path.extension().map(ToOwned::to_owned);
let size = fs::metadata(&full_path)?.len();
Ok(IndexFileMeta {
path: full_path,
extension,
external,
hashsum,
size,
})
}
}
mod sync {
use super::{Index, IDX_FILE_NAME};
use std::fs;
@ -57,7 +88,7 @@ mod sync {
}
}
pub fn get(name: impl AsRef<str>) -> Option<FileMeta> {
pub fn get(name: impl AsRef<str>) -> Option<IndexFileMeta> {
let mut index = INDEX.lock();
sync::deleted(&mut index);
@ -65,18 +96,18 @@ pub fn get(name: impl AsRef<str>) -> Option<FileMeta> {
index.get(name.as_ref()).cloned()
}
pub fn get_by_hash(hash: impl AsRef<[u8]>) -> Option<(String, FileMeta)> {
pub fn get_by_hash(hash: impl AsRef<str>) -> Option<(String, IndexFileMeta)> {
let mut index = INDEX.lock();
sync::deleted(&mut index);
index
.iter()
.find(|(_name, meta)| meta.hash == hash.as_ref())
.find(|(_name, meta)| meta.hashsum == hash.as_ref())
.map(|(n, m)| (n.to_owned(), m.clone()))
}
pub fn insert(name: impl Into<String>, meta: FileMeta) {
pub fn insert(name: impl Into<String>, meta: IndexFileMeta) {
let mut index = INDEX.lock();
sync::deleted(&mut index);
@ -87,7 +118,7 @@ pub fn insert(name: impl Into<String>, meta: FileMeta) {
sync::index2fs(&mut index);
}
pub fn remove(name: impl AsRef<str>) -> Option<FileMeta> {
pub fn remove(name: impl AsRef<str>) -> Option<IndexFileMeta> {
let mut index = INDEX.lock();
sync::deleted(&mut index);
@ -99,3 +130,7 @@ pub fn remove(name: impl AsRef<str>) -> Option<FileMeta> {
result
}
pub fn stats() -> Vec<IndexFileMeta> {
INDEX.lock().values().cloned().collect()
}

@ -1,49 +1,19 @@
// This module is aiming to store (obfuscated?) payloads, get them by name,
// rename, update, delete or prepare to execute via memfd_create (unix)
/// This module is aiming to store (obfuscated?) payloads, get them by name,
/// rename, update, delete or prepare to execute via memfd_create (unix)
mod error;
mod index;
use anyhow::{Context, Result};
use serde::{Deserialize, Serialize};
use std::env::temp_dir;
use std::ffi::OsString;
use std::fs;
use std::path::{Path, PathBuf};
use uuid::Uuid;
mod error;
mod index;
pub use error::Error;
pub use index::IndexFileMeta;
const OBFUSCATE: bool = cfg!(feature = "agent");
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct FileMeta {
extension: Option<OsString>,
external: bool, // if file is present before adding to index
hash: Vec<u8>,
pub path: PathBuf, // actual file path
pub size: u64,
}
impl FileMeta {
pub fn new(
full_path: impl Into<PathBuf>,
hash: Vec<u8>,
external: bool,
) -> Result<Self, Error> {
let full_path: PathBuf = full_path.into();
let extension = full_path.extension().map(ToOwned::to_owned);
let size = fs::metadata(&full_path)?.len();
Ok(FileMeta {
path: full_path,
extension,
external,
hash,
size,
})
}
}
/// Check if file exists in index.
/// File may present in fs but not in index, fn will return false then.
pub fn exists_in_index(name: impl AsRef<str>) -> bool {
@ -51,7 +21,7 @@ pub fn exists_in_index(name: impl AsRef<str>) -> bool {
}
#[inline]
pub fn read_meta(name: impl AsRef<str>) -> Result<FileMeta> {
pub fn read_meta(name: impl AsRef<str>) -> Result<IndexFileMeta> {
index::get(&name)
.ok_or_else(|| Error::not_found(name.as_ref()))
.context("meta")
@ -113,7 +83,7 @@ pub fn put(name: impl AsRef<str>, data: impl AsRef<[u8]>) -> Result<()> {
};
index::insert(
name,
FileMeta::new(path, data_hash, false).context("put_insert")?,
IndexFileMeta::new(path, data_hash, false).context("put_insert")?,
);
Ok(())
@ -123,7 +93,7 @@ pub fn edit(name: impl AsRef<str>, data: impl AsRef<[u8]>) -> Result<()> {
let meta = read_meta(&name).context("edit_meta")?;
let data_hash = hash_data(&data);
if meta.hash == data_hash {
if meta.hashsum == data_hash {
return Ok(());
}
@ -131,7 +101,7 @@ pub fn edit(name: impl AsRef<str>, data: impl AsRef<[u8]>) -> Result<()> {
.map_err(|e| Error::new(e, &meta.path))
.context("edit_write")?;
let new_meta = FileMeta::new(meta.path, data_hash, meta.external).context("edit_nmeta")?;
let new_meta = IndexFileMeta::new(meta.path, data_hash, meta.external).context("edit_nmeta")?;
index::remove(&name);
index::insert(name.as_ref(), new_meta);
@ -215,12 +185,16 @@ pub fn put_external(path: impl AsRef<Path>) -> Result<()> {
index::insert(
path_str,
FileMeta::new(path, data_hash, true).context("ext2")?,
IndexFileMeta::new(path, data_hash, true).context("ext2")?,
);
Ok(())
}
pub fn stats() -> Vec<IndexFileMeta> {
index::stats()
}
/*
pub fn cleanup() {
let index = INDEX.read();
@ -231,10 +205,11 @@ pub fn cleanup() {
}
*/
fn hash_data(data: impl AsRef<[u8]>) -> Vec<u8> {
fn hash_data(data: impl AsRef<[u8]>) -> String {
use sha3::{Digest, Sha3_256};
let mut hasher = Sha3_256::new();
hasher.update(data);
hasher.finalize().to_vec()
let hashsum = hasher.finalize().to_vec();
hex::encode(hashsum)
}

@ -1,5 +1,5 @@
CREATE EXTENSION IF NOT EXISTS "uuid-ossp";
CREATE TYPE JobType AS ENUM ('shell', 'init', 'python', 'service');
CREATE TYPE JobType AS ENUM ('shell', 'init', 'python', 'service', 'stats');
CREATE TYPE JobState AS ENUM ('queued', 'running', 'finished');
CREATE TYPE AgentState AS ENUM ('new', 'active', 'banned');

Loading…
Cancel
Save