diff --git a/Cargo.lock b/Cargo.lock index 381971d..794eaa4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -219,9 +219,9 @@ checksum = "aae1277d39aeec15cb388266ecc24b11c80469deae6067e17a1a7aa9e5c1f234" [[package]] name = "ahash" -version = "0.8.5" +version = "0.8.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cd7d5a2cecb58716e47d67d5703a249964b14c7be1ec3cad3affc295b2d1c35d" +checksum = "91429305e9f0a25f6205c5b8e0d2db09e0708a7a6df0f42212bb56c32c8ac97a" dependencies = [ "cfg-if 1.0.0", "getrandom", @@ -443,6 +443,7 @@ dependencies = [ "iana-time-zone", "js-sys", "num-traits", + "serde", "wasm-bindgen", "windows-targets", ] @@ -1477,7 +1478,7 @@ dependencies = [ "log", "memchr", "mime", - "spin 0.9.8", + "spin", "version_check", ] @@ -1931,17 +1932,16 @@ checksum = "4389f1d5789befaf6029ebd9f7dac4af7f7e3d61b69d4f30e2ac02b57e7712b0" [[package]] name = "ring" -version = "0.16.20" +version = "0.17.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3053cf52e236a3ed746dfc745aa9cacf1b791d846bdaf412f60a8d7d6e17c8fc" +checksum = "fb0205304757e5d899b9c2e448b867ffd03ae7f988002e47cd24954391394d0b" dependencies = [ "cc", + "getrandom", "libc", - "once_cell", - "spin 0.5.2", + "spin", "untrusted", - "web-sys", - "winapi", + "windows-sys", ] [[package]] @@ -2054,9 +2054,9 @@ dependencies = [ [[package]] name = "rustls" -version = "0.21.7" +version = "0.21.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cd8d6c9f025a446bc4d18ad9632e69aec8f287aa84499ee335599fabd20c3fd8" +checksum = "446e14c5cda4f3f30fe71863c34ec70f5ac79d6087097ad0bb433e1be5edf04c" dependencies = [ "log", "ring", @@ -2075,9 +2075,9 @@ dependencies = [ [[package]] name = "rustls-webpki" -version = "0.101.6" +version = "0.101.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3c7d5dece342910d9ba34d259310cae3e0154b873b35408b787b59bce53d34fe" +checksum = "8b6275d1ee7a1cd780b64aca7726599a1dbc893b1e64144529e55c3c2f745765" dependencies = [ "ring", "untrusted", @@ -2121,9 +2121,9 @@ checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" [[package]] name = "sct" -version = "0.7.0" +version = "0.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d53dcdb7c9f8158937a7981b48accfd39a43af418591a5d008c7b22b5e1b7ca4" +checksum = "da046153aa2352493d6cb7da4b6e5c0c057d8a1d0a9aa8560baffdd945acd414" dependencies = [ "ring", "untrusted", @@ -2315,12 +2315,6 @@ dependencies = [ "windows-sys", ] -[[package]] -name = "spin" -version = "0.5.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d" - [[package]] name = "spin" version = "0.9.8" @@ -2801,7 +2795,6 @@ name = "u_lib" version = "0.1.0" dependencies = [ "anyhow", - "async-trait", "bincode", "chrono", "cron", @@ -2933,9 +2926,9 @@ checksum = "e51733f11c9c4f72aa0c160008246859e340b00807569a0da0e7a1079b27ba85" [[package]] name = "untrusted" -version = "0.7.1" +version = "0.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a156c684c91ea7d62626509bce3cb4e1d9ed5c4d978f7b4352658f96a4c26b4a" +checksum = "8ecb6da28b8a351d773b68d5825ac39017e680750f980f3a1a85cd8dd28a47c1" [[package]] name = "url" @@ -3254,18 +3247,18 @@ dependencies = [ [[package]] name = "zerocopy" -version = "0.7.11" +version = "0.7.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4c19fae0c8a9efc6a8281f2e623db8af1db9e57852e04cde3e754dd2dc29340f" +checksum = "69c48d63854f77746c68a5fbb4aa17f3997ece1cb301689a257af8cb80610d21" dependencies = [ "zerocopy-derive", ] [[package]] name = "zerocopy-derive" -version = "0.7.11" +version = "0.7.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fc56589e9ddd1f1c28d4b4b5c773ce232910a6bb67a70133d61c9e347585efe9" +checksum = "c258c1040279e4f88763a113de72ce32dde2d50e2a94573f15dd534cea36a16d" dependencies = [ "proc-macro2", "quote", diff --git a/bin/u_agent/src/lib.rs b/bin/u_agent/src/lib.rs index 7c69c2e..709bf59 100644 --- a/bin/u_agent/src/lib.rs +++ b/bin/u_agent/src/lib.rs @@ -5,7 +5,7 @@ use tokio::runtime::Builder; use tokio::time::{sleep, Duration}; use u_lib::models::PreparedJob; use u_lib::scheduler::SCHEDULER; -use u_lib::u_runner::URunner; +use u_lib::u_runner::{IdentifiableFuture, URunner}; use u_lib::{ api::HttpClient, cache::JobCache, @@ -143,17 +143,26 @@ pub fn run_forever() -> ! { SCHEDULER .add_job(Some("*/3 * * * * * *".try_into().unwrap()), move || { let client = client.clone(); - error_reporting(client.clone()) + + IdentifiableFuture::from_fut_with_ident("error_reporting", async move { + error_reporting(client.clone()).await + }) }) .await; } - SCHEDULER - .add_job(Some("*/3 * * * * * *".try_into().unwrap()), move || { - let client = client.clone(); - agent_loop(client) - }) - .await; + { + let client = client.clone(); + SCHEDULER + .add_job(Some("*/3 * * * * * *".try_into().unwrap()), move || { + let client = client.clone(); + + IdentifiableFuture::from_fut_with_ident("agent_loop", async move { + agent_loop(client).await + }) + }) + .await; + } SCHEDULER.start_blocking().await }) diff --git a/bin/u_server/src/handlers.rs b/bin/u_server/src/handlers.rs index e1a8ec7..f9aac78 100644 --- a/bin/u_server/src/handlers.rs +++ b/bin/u_server/src/handlers.rs @@ -277,9 +277,9 @@ impl Endpoints { JobType::Shell => { result.state = JobState::Finished }, - JobType::Service => (), - JobType::Terminate => todo!(), - JobType::Update => todo!(), + JobType::Stats => (), + JobType::Terminate => (), + JobType::Update => (), } db.update_result(&result)?; } diff --git a/lib/u_lib/Cargo.toml b/lib/u_lib/Cargo.toml index e091b62..841049c 100644 --- a/lib/u_lib/Cargo.toml +++ b/lib/u_lib/Cargo.toml @@ -8,7 +8,7 @@ edition = "2021" [dependencies] anyhow = { workspace = true } -chrono = "0.4.19" +chrono = { version = "0.4.19", features = ["serde"] } diesel = { workspace = true, optional = true } diesel-derive-enum = { version = "2.0.0", features = ["postgres"], optional = true } deadpool-diesel = { workspace = true, optional = true } @@ -34,7 +34,6 @@ parking_lot = "0.12.1" bincode = "1.3.3" sha3 = "0.10.7" cron = "0.12.0" -async-trait = "0.1.74" [target.'cfg(unix)'.dependencies] daemonize = "0.5" diff --git a/lib/u_lib/src/cache.rs b/lib/u_lib/src/cache.rs index 3031b0f..ef88ce1 100644 --- a/lib/u_lib/src/cache.rs +++ b/lib/u_lib/src/cache.rs @@ -22,7 +22,7 @@ impl JobCache { JOB_CACHE.read().contains_key(&id) } - pub fn get<'jh>(id: Id) -> Option> { + pub fn get(id: Id) -> Option { if !Self::contains(id) { return None; } @@ -33,11 +33,15 @@ impl JobCache { pub fn remove(id: Id) { JOB_CACHE.write().remove(&id); } + + pub fn stats() -> Vec { + JOB_CACHE.read().values().map(|job| job.meta.id).collect() + } } -pub struct JobCacheHolder<'jh>(pub RwLockReadGuard<'jh, Cache>, pub Id); +pub struct JobCacheHolder(pub RwLockReadGuard<'static, Cache>, pub Id); -impl<'jh> Deref for JobCacheHolder<'jh> { +impl Deref for JobCacheHolder { type Target = Val; fn deref(&self) -> &Self::Target { diff --git a/lib/u_lib/src/jobs.rs b/lib/u_lib/src/jobs.rs index 9e4f42d..cc83d18 100644 --- a/lib/u_lib/src/jobs.rs +++ b/lib/u_lib/src/jobs.rs @@ -1,8 +1,14 @@ use crate::{ + cache::JobCache, combined_result::CombinedResult, - models::{Agent, AssignedJob, AssignedJobById, Job, JobType, PreparedJob, RawJob}, + config::get_self_id, + models::{ + stats::Stats, Agent, AssignedJob, AssignedJobById, Job, JobType, PreparedJob, RawJob, + }, proc_output::ProcOutput, - u_runner::{ExecResult, URunner}, + scheduler::SCHEDULER, + u_runner::{ExecResult, IdentifiableFuture, URunner}, + ufs, }; use std::collections::HashMap; use std::process::exit; @@ -17,7 +23,10 @@ impl AnonymousJobBatch { pub fn from_prepared_jobs(jobs: impl IntoIterator) -> Self { let mut runner = URunner::new(); for job in jobs { - runner.push(run_assigned_job(job)); + runner.push(IdentifiableFuture::from_fut_with_ident( + job.ids.id, + run_assigned_job(job), + )); } Self { runner, @@ -25,7 +34,7 @@ impl AnonymousJobBatch { } } - pub fn from_meta(jobs: impl IntoIterator) -> Self { + pub fn from_jobs(jobs: impl IntoIterator) -> Self { let jobs_ids: Vec<_> = jobs .into_iter() .map(|job| PreparedJob { @@ -99,7 +108,7 @@ impl NamedJobBatch { .map(|job| (job.meta.alias.clone().unwrap(), job)) .unzip(); Self { - runner: Some(AnonymousJobBatch::from_meta(jobs)), + runner: Some(AnonymousJobBatch::from_jobs(jobs)), job_names, results: HashMap::new(), } @@ -173,8 +182,18 @@ pub async fn run_assigned_job(prepared_job: PreparedJob) -> ExecResult { result.set_result(&agent); result.retcode = Some(0); } - JobType::Service => todo!(), - JobType::Update => todo!(), + JobType::Stats => { + let stats = Stats { + agent_id: get_self_id(), + running_jobs: URunner::stats().await, + scheduled_jobs: SCHEDULER.stats().await, + cached_jobs: JobCache::stats(), + cached_payloads: ufs::stats(), + }; + result.set_result(&stats); + result.retcode = Some(0) + } + JobType::Update => (), JobType::Terminate => exit(0), }; Ok(result) @@ -198,7 +217,7 @@ mod tests { let sleep_jobs = (0..50).map(|_| job.clone()).collect::>(); let now = SystemTime::now(); - AnonymousJobBatch::from_meta(sleep_jobs).wait().await; + AnonymousJobBatch::from_jobs(sleep_jobs).wait().await; assert!(now.elapsed().unwrap().as_secs() < SLEEP_SECS + 2) } @@ -237,7 +256,7 @@ mod tests { raw_job = raw_job.with_raw_payload(p); } let job = raw_job.try_into_job().unwrap(); - let result = AnonymousJobBatch::from_meta([job]) + let result = AnonymousJobBatch::from_jobs([job]) .wait_one() .await .unwrap(); @@ -251,8 +270,8 @@ mod tests { const SLEEP_SECS: u64 = 1; let now = SystemTime::now(); let longest_job = RawJob::from_shell(format!("sleep {}", SLEEP_SECS)).unwrap(); - let longest_job = AnonymousJobBatch::from_meta([longest_job]).spawn().await; - let ls = AnonymousJobBatch::from_meta([RawJob::from_shell("ls").unwrap()]) + let longest_job = AnonymousJobBatch::from_jobs([longest_job]).spawn().await; + let ls = AnonymousJobBatch::from_jobs([RawJob::from_shell("ls").unwrap()]) .wait_one() .await .unwrap(); @@ -265,7 +284,7 @@ mod tests { .map(|f| RawJob::from_shell(format!("ls {f}")).unwrap()) .collect::>(); - let ls_subfolders = AnonymousJobBatch::from_meta(subfolders_jobs).wait().await; + let ls_subfolders = AnonymousJobBatch::from_jobs(subfolders_jobs).wait().await; for result in ls_subfolders { assert_eq!(result.unwrap().retcode.unwrap(), 0); @@ -296,7 +315,7 @@ mod tests { #[tokio::test] async fn test_failing_shell_job() -> TestResult { let job = RawJob::from_shell("lol_kek_puk").unwrap(); - let job_result = AnonymousJobBatch::from_meta([job]) + let job_result = AnonymousJobBatch::from_jobs([job]) .wait_one() .await .unwrap(); diff --git a/lib/u_lib/src/lib.rs b/lib/u_lib/src/lib.rs index 92b8382..59c30f5 100644 --- a/lib/u_lib/src/lib.rs +++ b/lib/u_lib/src/lib.rs @@ -38,6 +38,3 @@ extern crate tracing; #[cfg(test)] #[macro_use] extern crate rstest; - -#[macro_use] -extern crate async_trait; diff --git a/lib/u_lib/src/models/jobs/assigned.rs b/lib/u_lib/src/models/jobs/assigned.rs index dfbb254..397cf56 100644 --- a/lib/u_lib/src/models/jobs/assigned.rs +++ b/lib/u_lib/src/models/jobs/assigned.rs @@ -145,7 +145,7 @@ impl AssignedJob { String::from_utf8_lossy(self.to_raw_result()) } - pub fn set_result(&mut self, result: &S) { + pub fn set_result(&mut self, result: &impl Serialize) { self.result = Some(serde_json::to_vec(result).unwrap()); } diff --git a/lib/u_lib/src/models/jobs/meta.rs b/lib/u_lib/src/models/jobs/meta.rs index 0426661..a450f97 100644 --- a/lib/u_lib/src/models/jobs/meta.rs +++ b/lib/u_lib/src/models/jobs/meta.rs @@ -103,7 +103,7 @@ impl JobMeta { return Err(mk_err("Argv contains no executable placeholder")); } - if self.argv.contains("{}") && self.payload_id.is_none() { + if self.payload_id.is_none() && self.argv.contains("{}") { return Err(mk_err( "No payload provided, but argv contains executable placeholder", )); diff --git a/lib/u_lib/src/models/jobs/misc.rs b/lib/u_lib/src/models/jobs/misc.rs index faf108e..0fbca99 100644 --- a/lib/u_lib/src/models/jobs/misc.rs +++ b/lib/u_lib/src/models/jobs/misc.rs @@ -35,7 +35,7 @@ pub enum JobState { )] pub enum JobType { Init, - Service, + Stats, #[default] Shell, Terminate, diff --git a/lib/u_lib/src/models/mod.rs b/lib/u_lib/src/models/mod.rs index 61c9a56..0911500 100644 --- a/lib/u_lib/src/models/mod.rs +++ b/lib/u_lib/src/models/mod.rs @@ -3,6 +3,7 @@ mod jobs; mod payload; #[cfg(feature = "server")] pub mod schema; +pub mod stats; pub use crate::models::{agent::*, jobs::*, payload::*}; use serde::Deserialize; diff --git a/lib/u_lib/src/models/payload.rs b/lib/u_lib/src/models/payload.rs index 84db3d7..f947a28 100644 --- a/lib/u_lib/src/models/payload.rs +++ b/lib/u_lib/src/models/payload.rs @@ -185,7 +185,7 @@ impl Payload { #[cfg(windows)] pub fn prepare_executable(&self) -> Result<(File, String)> { - todo!() + compile_error!("допилить") } } diff --git a/lib/u_lib/src/models/stats.rs b/lib/u_lib/src/models/stats.rs new file mode 100644 index 0000000..d282286 --- /dev/null +++ b/lib/u_lib/src/models/stats.rs @@ -0,0 +1,14 @@ +use serde::Serialize; + +use crate::{messaging::AsMsg, scheduler::EntryStat, types::Id, ufs::IndexFileMeta}; + +#[derive(Debug, Clone, Serialize)] +pub struct Stats { + pub agent_id: Id, + pub running_jobs: Vec, + pub scheduled_jobs: Vec, + pub cached_jobs: Vec, + pub cached_payloads: Vec, +} + +impl AsMsg for Stats {} diff --git a/lib/u_lib/src/scheduler/entry.rs b/lib/u_lib/src/scheduler/entry.rs index 2d3c422..b235b51 100644 --- a/lib/u_lib/src/scheduler/entry.rs +++ b/lib/u_lib/src/scheduler/entry.rs @@ -1,33 +1,40 @@ -use crate::{models::PreparedJob, types::Id}; +use crate::{models::PreparedJob, types::Id, u_runner::IdentifiableFuture}; use chrono::{DateTime, Utc}; use core::fmt; -use std::{future::Future, pin::Pin, sync::Arc}; - -pub type JobFuture = Box + Send>; +use serde::Serialize; +use std::sync::Arc; pub trait SchedulerJob { - fn call(&self) -> Pin; + fn call(&self) -> IdentifiableFuture<()>; } -impl SchedulerJob for F +// scheduler job need to be called repeatedly, +// the only way to achieve this is to use Fn() -> Future +impl SchedulerJob for F where - F: Fn() -> T, - T: Future + Send + 'static, + F: Fn() -> IdentifiableFuture<()>, { - fn call(&self) -> Pin { - Box::pin(self()) + fn call(&self) -> IdentifiableFuture<()> { + self() } } #[derive(Clone)] pub struct Entry { pub id: Id, - pub job_id: Id, pub schedule: Option, pub next: Option>, pub runnable: EntryType, } +#[derive(Clone, Debug, Serialize)] +pub struct EntryStat { + pub id: Id, + pub job_id: String, + pub schedule: String, + pub next: Option>, +} + #[derive(Clone)] pub enum EntryType { Common(Arc), @@ -57,6 +64,24 @@ impl Entry { None => Some(Utc::now()), } } + + pub fn as_stat(&self) -> EntryStat { + let job_id = match &self.runnable { + EntryType::URunner(entry) => entry.job.meta.id.to_string(), + EntryType::Common(entry) => entry.call().job_id.to_string(), + }; + + EntryStat { + id: self.id, + job_id, + schedule: self + .schedule + .as_ref() + .map(|s| s.to_string()) + .unwrap_or_default(), + next: self.next.clone(), + } + } } impl fmt::Debug for Entry { diff --git a/lib/u_lib/src/scheduler/mod.rs b/lib/u_lib/src/scheduler/mod.rs index 0d2cc8c..29e40c7 100644 --- a/lib/u_lib/src/scheduler/mod.rs +++ b/lib/u_lib/src/scheduler/mod.rs @@ -1,5 +1,4 @@ mod entry; -mod spawner; use std::sync::Arc; use std::time::Duration; @@ -9,12 +8,15 @@ use cron::Schedule; use entry::Entry; use once_cell::sync::Lazy; use tokio::sync::Mutex; +use tokio::time::timeout; use uuid::Uuid; +use crate::jobs::AnonymousJobBatch; use crate::types::Id; use self::entry::EntryType; -use self::spawner::{CommonSpawner, Spawner, URunnerSpawner}; + +pub use entry::EntryStat; pub static SCHEDULER: Lazy = Lazy::new(|| AsyncScheduler::new()); @@ -52,18 +54,33 @@ impl AsyncScheduler { tokio::time::sleep(wait_duration).await; let mut entries = self.entries.lock().await; - - CommonSpawner { - entries: &mut entries, + let mut job_batch = vec![]; + + for entry in &mut *entries { + match entry.next.as_ref() { + Some(next) => { + match &entry.runnable { + EntryType::Common(runnable) => { + let cancel_timeout = + next.timestamp_millis() - Utc::now().timestamp_millis(); + let cancel_timeout = Duration::from_millis(cancel_timeout as u64); + + tokio::spawn(timeout(cancel_timeout, runnable.call())); + } + EntryType::URunner(runnable) => job_batch.push(runnable.clone()), + } + entry.set_next_run_time(); + } + None => {} + } + if entry.next.as_ref().unwrap().gt(&Utc::now()) { + break; + } } - .spawn() - .await; - URunnerSpawner { - entries: &mut entries, - } - .spawn() - .await; + AnonymousJobBatch::from_prepared_jobs(job_batch) + .spawn() + .await; entries.retain(|e| e.schedule.is_some()); } @@ -72,13 +89,8 @@ impl AsyncScheduler { pub async fn add_job(&self, schedule: Option, runnable: impl Into) -> Id { let entry_id = Uuid::new_v4(); let runnable = runnable.into(); - let job_id = match &runnable { - EntryType::URunner(j) => j.job.meta.id, - _ => Id::new_v4(), - }; let mut entry = Entry { id: entry_id, - job_id, schedule, next: None, runnable, @@ -94,6 +106,15 @@ impl AsyncScheduler { self.entries.lock().await.retain(|e| e.id != entry_id); } + pub async fn stats(&self) -> Vec { + self.entries + .lock() + .await + .iter() + .map(|entry| entry.as_stat()) + .collect() + } + pub async fn start(&self) { let cloned = self.clone(); tokio::spawn(async move { @@ -107,6 +128,8 @@ mod tests { use std::time::Duration; use tokio::time::sleep; + use crate::u_runner::IdentifiableFuture; + use super::*; #[tokio::test(flavor = "multi_thread")] @@ -119,29 +142,27 @@ mod tests { scheduler .add_job(Some("*/1 * * * * * *".try_into().unwrap()), move || { let v = v.clone(); - async move { + IdentifiableFuture::from_fut_with_ident("testf", async move { *v.lock().await += 1; - } + }) }) .await; } scheduler - .add_job( - Some("*/1 * * * * * *".try_into().unwrap()), - move || async move { + .add_job(Some("*/1 * * * * * *".try_into().unwrap()), move || { + IdentifiableFuture::from_fut_with_ident("testf", async move { println!("hello1"); - }, - ) + }) + }) .await; scheduler - .add_job( - Some("*/1 * * * * * *".try_into().unwrap()), - move || async move { + .add_job(Some("*/1 * * * * * *".try_into().unwrap()), move || { + IdentifiableFuture::from_fut_with_ident("testf", async move { println!("hello2"); - }, - ) + }) + }) .await; scheduler.start().await; @@ -170,10 +191,10 @@ mod tests { scheduler .add_job(Some("*/1 * * * * * *".try_into().unwrap()), move || { let counter = counter.clone(); - async move { + IdentifiableFuture::from_fut_with_ident("testf", async move { sleep(Duration::from_secs(2)).await; *counter.lock().await += 1; - } + }) }) .await; } diff --git a/lib/u_lib/src/scheduler/spawner.rs b/lib/u_lib/src/scheduler/spawner.rs deleted file mode 100644 index 597efef..0000000 --- a/lib/u_lib/src/scheduler/spawner.rs +++ /dev/null @@ -1,85 +0,0 @@ -use std::time::Duration; - -use crate::jobs::AnonymousJobBatch; - -use super::entry::{Entry, EntryType}; -use chrono::Utc; -use tokio::time::timeout; - -#[async_trait] -pub trait Spawner { - async fn spawn(self); -} - -/// Spawn jobs in scheduler thru `tokio::spawn` -pub struct CommonSpawner<'e> { - pub entries: &'e mut [Entry], -} - -#[async_trait] -impl Spawner for CommonSpawner<'_> { - async fn spawn(self) { - for entry in self - .entries - .iter_mut() - .filter(|e| matches!(e.runnable, EntryType::Common(_))) - { - if let EntryType::Common(runnable) = &entry.runnable { - match entry.next.as_ref() { - Some(next) => { - let cancel_timeout = - next.timestamp_millis() - Utc::now().timestamp_millis(); - let cancel_timeout = Duration::from_millis(cancel_timeout as u64); - - tokio::spawn(timeout(cancel_timeout, runnable.call())); - - entry.set_next_run_time(); - } - None => { - error!("not implemented yet"); - todo!(); - } - } - } - if entry.next.as_ref().unwrap().gt(&Utc::now()) { - break; - } - } - } -} - -/// Spawn jobs in scheduler thru `URunner` -pub struct URunnerSpawner<'e> { - pub entries: &'e mut [Entry], -} - -#[async_trait] -impl Spawner for URunnerSpawner<'_> { - async fn spawn(self) { - let mut job_batch = vec![]; - for entry in self - .entries - .iter_mut() - .filter(|e| matches!(e.runnable, EntryType::URunner(_))) - { - if let EntryType::URunner(runnable) = &entry.runnable { - match entry.next.as_ref() { - Some(_) => { - job_batch.push(runnable.clone()); - entry.set_next_run_time(); - } - None => { - error!("not implemented yet"); - todo!(); - } - } - } - if entry.next.as_ref().unwrap().gt(&Utc::now()) { - break; - } - } - AnonymousJobBatch::from_prepared_jobs(job_batch) - .spawn() - .await; - } -} diff --git a/lib/u_lib/src/u_runner.rs b/lib/u_lib/src/u_runner.rs index 712c1f8..3696885 100644 --- a/lib/u_lib/src/u_runner.rs +++ b/lib/u_lib/src/u_runner.rs @@ -1,8 +1,9 @@ use crate::{models::AssignedJob, UResult}; use lazy_static::lazy_static; -use std::collections::HashMap; use std::future::Future; use std::pin::Pin; +use std::task::Poll; +use std::{collections::HashMap, task::Context}; use tokio::{ runtime::Handle, sync::mpsc::{channel, Receiver, Sender}, @@ -12,7 +13,6 @@ use tokio::{ use uuid::Uuid; pub type ExecResult = UResult; -type BoxFuture<'a, T> = Pin + Send + 'a>>; lazy_static! { static ref FUT_RESULTS: Mutex> = Mutex::new(HashMap::new()); @@ -23,10 +23,36 @@ lazy_static! { }; } +pub struct IdentifiableFuture { + pub job_id: String, + fut: Pin + Send + Sync + 'static>>, +} + +impl IdentifiableFuture { + pub fn from_fut_with_ident( + job_id: impl Into, + job: impl Future + Send + Sync + 'static, + ) -> Self { + Self { + fut: Box::pin(job), + job_id: job_id.into(), + } + } +} + +impl Future for IdentifiableFuture { + type Output = R; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + self.fut.as_mut().poll(cx) + } +} + struct JoinInfo { + job_id: String, handle: JoinHandle>, completed: bool, - collectable: bool, // indicates if future can be popped from pool via pop_task_if_completed + // collectable: bool, // indicates if future can be popped from pool via pop_task_if_completed } impl JoinInfo { @@ -43,40 +69,40 @@ fn get_sender() -> Sender { /// - run 1 or more jobs and wait until they're all done /// - run 1 or more jobs in background and collect results of completed jobs later pub struct URunner { - tasks: Vec>, + executables: Vec>, fids: Vec, } impl URunner { pub fn new() -> Self { Self { - tasks: vec![], + executables: vec![], fids: vec![], } } - pub fn push(&mut self, task: impl Future + Send + 'static) { - self.tasks.push(Box::pin(task)); + pub fn push(&mut self, job: IdentifiableFuture) { + self.executables.push(job); } /// Spawn prepared tasks pub async fn spawn(mut self) -> Self { - let collectable = true; //TODO: self.tasks.len() != 1; - for f in self.tasks.drain(..) { + for executable in self.executables.drain(..) { let handle = Handle::current(); let fid = Uuid::new_v4(); let tx = get_sender(); + let job_id = executable.job_id.clone(); self.fids.push(fid); - let task_wrapper = async move { + let job_wrapper = async move { debug!("inside wrapper (started): {}", fid); - let result = f.await; - tx.send(fid).await.unwrap(); + let result = executable.await; + tx.send(fid).await.ok(); result }; let handler = JoinInfo { - handle: spawn_blocking(move || handle.spawn(task_wrapper)), + job_id, + handle: spawn_blocking(move || handle.spawn(job_wrapper)), completed: false, - collectable, }; FUT_RESULTS.lock().await.insert(fid, handler); } @@ -89,25 +115,20 @@ impl URunner { pub async fn wait(self) -> Vec { let mut result = vec![]; for fid in self.fids { - if let Some(task) = Self::pop_task(fid).await { - result.push(task.wait_result().await); + if let Some(job) = Self::pop_job(fid).await { + result.push(job.wait_result().await); } } result } - pub async fn pop_task_if_completed(fid: Uuid) -> Option { - let &JoinInfo { - handle: _, - collectable, - completed, - } = match FUT_RESULTS.lock().await.get(&fid) { - Some(t) => t, - None => return None, + pub async fn pop_job_if_completed(fid: Uuid) -> Option { + let Some(&JoinInfo { completed, .. }) = FUT_RESULTS.lock().await.get(&fid) else { + return None; }; - if collectable && completed { - let task = Self::pop_task(fid).await.unwrap(); - Some(task.wait_result().await) + if completed { + let job = Self::pop_job(fid).await.unwrap(); + Some(job.wait_result().await) } else { None } @@ -122,14 +143,23 @@ impl URunner { .copied() .collect::>(); for fid in fids { - if let Some(r) = Self::pop_task_if_completed(fid).await { + if let Some(r) = Self::pop_job_if_completed(fid).await { completed.push(r) } } completed } - async fn pop_task(fid: Uuid) -> Option { + pub async fn stats() -> Vec { + FUT_RESULTS + .lock() + .await + .values() + .map(|v| v.job_id.clone()) + .collect() + } + + async fn pop_job(fid: Uuid) -> Option { FUT_RESULTS.lock().await.remove(&fid) } } diff --git a/lib/u_lib/src/ufs/index.rs b/lib/u_lib/src/ufs/index.rs index 9a58ef6..dbfdbb2 100644 --- a/lib/u_lib/src/ufs/index.rs +++ b/lib/u_lib/src/ufs/index.rs @@ -1,13 +1,15 @@ -use super::{Error, FileMeta}; +use super::Error; use once_cell::sync::Lazy; use parking_lot::Mutex; +use serde::{Deserialize, Serialize}; use std::collections::HashMap; use std::env::temp_dir; +use std::ffi::OsString; use std::fs; use std::path::PathBuf; // index format: given_name -> payload_meta -type Index = HashMap; +type Index = HashMap; static IDX_FILE_NAME: Lazy = Lazy::new(|| temp_dir().join(".i")); static INDEX: Lazy> = Lazy::new(|| { @@ -28,6 +30,35 @@ static INDEX: Lazy> = Lazy::new(|| { Mutex::new(idx) }); +#[derive(Debug, Clone, Deserialize, Serialize)] +pub struct IndexFileMeta { + extension: Option, + pub external: bool, // if file is present before adding to index + pub hash: Vec, + pub path: PathBuf, // actual file path + pub size: u64, +} + +impl IndexFileMeta { + pub fn new( + full_path: impl Into, + hash: Vec, + external: bool, + ) -> Result { + let full_path: PathBuf = full_path.into(); + let extension = full_path.extension().map(ToOwned::to_owned); + let size = fs::metadata(&full_path)?.len(); + + Ok(IndexFileMeta { + path: full_path, + extension, + external, + hash, + size, + }) + } +} + mod sync { use super::{Index, IDX_FILE_NAME}; use std::fs; @@ -57,7 +88,7 @@ mod sync { } } -pub fn get(name: impl AsRef) -> Option { +pub fn get(name: impl AsRef) -> Option { let mut index = INDEX.lock(); sync::deleted(&mut index); @@ -65,7 +96,7 @@ pub fn get(name: impl AsRef) -> Option { index.get(name.as_ref()).cloned() } -pub fn get_by_hash(hash: impl AsRef<[u8]>) -> Option<(String, FileMeta)> { +pub fn get_by_hash(hash: impl AsRef<[u8]>) -> Option<(String, IndexFileMeta)> { let mut index = INDEX.lock(); sync::deleted(&mut index); @@ -76,7 +107,7 @@ pub fn get_by_hash(hash: impl AsRef<[u8]>) -> Option<(String, FileMeta)> { .map(|(n, m)| (n.to_owned(), m.clone())) } -pub fn insert(name: impl Into, meta: FileMeta) { +pub fn insert(name: impl Into, meta: IndexFileMeta) { let mut index = INDEX.lock(); sync::deleted(&mut index); @@ -87,7 +118,7 @@ pub fn insert(name: impl Into, meta: FileMeta) { sync::index2fs(&mut index); } -pub fn remove(name: impl AsRef) -> Option { +pub fn remove(name: impl AsRef) -> Option { let mut index = INDEX.lock(); sync::deleted(&mut index); @@ -99,3 +130,7 @@ pub fn remove(name: impl AsRef) -> Option { result } + +pub fn stats() -> Vec { + INDEX.lock().values().cloned().collect() +} diff --git a/lib/u_lib/src/ufs/mod.rs b/lib/u_lib/src/ufs/mod.rs index 96f0bbd..5fce83f 100644 --- a/lib/u_lib/src/ufs/mod.rs +++ b/lib/u_lib/src/ufs/mod.rs @@ -1,49 +1,19 @@ -// This module is aiming to store (obfuscated?) payloads, get them by name, -// rename, update, delete or prepare to execute via memfd_create (unix) +/// This module is aiming to store (obfuscated?) payloads, get them by name, +/// rename, update, delete or prepare to execute via memfd_create (unix) +mod error; +mod index; use anyhow::{Context, Result}; -use serde::{Deserialize, Serialize}; use std::env::temp_dir; -use std::ffi::OsString; use std::fs; use std::path::{Path, PathBuf}; use uuid::Uuid; -mod error; -mod index; pub use error::Error; +pub use index::IndexFileMeta; const OBFUSCATE: bool = cfg!(feature = "agent"); -#[derive(Debug, Clone, Deserialize, Serialize)] -pub struct FileMeta { - extension: Option, - external: bool, // if file is present before adding to index - hash: Vec, - pub path: PathBuf, // actual file path - pub size: u64, -} - -impl FileMeta { - pub fn new( - full_path: impl Into, - hash: Vec, - external: bool, - ) -> Result { - let full_path: PathBuf = full_path.into(); - let extension = full_path.extension().map(ToOwned::to_owned); - let size = fs::metadata(&full_path)?.len(); - - Ok(FileMeta { - path: full_path, - extension, - external, - hash, - size, - }) - } -} - /// Check if file exists in index. /// File may present in fs but not in index, fn will return false then. pub fn exists_in_index(name: impl AsRef) -> bool { @@ -51,7 +21,7 @@ pub fn exists_in_index(name: impl AsRef) -> bool { } #[inline] -pub fn read_meta(name: impl AsRef) -> Result { +pub fn read_meta(name: impl AsRef) -> Result { index::get(&name) .ok_or_else(|| Error::not_found(name.as_ref())) .context("meta") @@ -113,7 +83,7 @@ pub fn put(name: impl AsRef, data: impl AsRef<[u8]>) -> Result<()> { }; index::insert( name, - FileMeta::new(path, data_hash, false).context("put_insert")?, + IndexFileMeta::new(path, data_hash, false).context("put_insert")?, ); Ok(()) @@ -131,7 +101,7 @@ pub fn edit(name: impl AsRef, data: impl AsRef<[u8]>) -> Result<()> { .map_err(|e| Error::new(e, &meta.path)) .context("edit_write")?; - let new_meta = FileMeta::new(meta.path, data_hash, meta.external).context("edit_nmeta")?; + let new_meta = IndexFileMeta::new(meta.path, data_hash, meta.external).context("edit_nmeta")?; index::remove(&name); index::insert(name.as_ref(), new_meta); @@ -215,12 +185,16 @@ pub fn put_external(path: impl AsRef) -> Result<()> { index::insert( path_str, - FileMeta::new(path, data_hash, true).context("ext2")?, + IndexFileMeta::new(path, data_hash, true).context("ext2")?, ); Ok(()) } +pub fn stats() -> Vec { + index::stats() +} + /* pub fn cleanup() { let index = INDEX.read();