initial stats

pull/12/head
plazmoid 11 months ago
parent b75871e25d
commit 2816e72cbd
  1. 49
      Cargo.lock
  2. 15
      bin/u_agent/src/lib.rs
  3. 6
      bin/u_server/src/handlers.rs
  4. 3
      lib/u_lib/Cargo.toml
  5. 10
      lib/u_lib/src/cache.rs
  6. 45
      lib/u_lib/src/jobs.rs
  7. 3
      lib/u_lib/src/lib.rs
  8. 2
      lib/u_lib/src/models/jobs/assigned.rs
  9. 2
      lib/u_lib/src/models/jobs/meta.rs
  10. 2
      lib/u_lib/src/models/jobs/misc.rs
  11. 1
      lib/u_lib/src/models/mod.rs
  12. 2
      lib/u_lib/src/models/payload.rs
  13. 14
      lib/u_lib/src/models/stats.rs
  14. 47
      lib/u_lib/src/scheduler/entry.rs
  15. 77
      lib/u_lib/src/scheduler/mod.rs
  16. 85
      lib/u_lib/src/scheduler/spawner.rs
  17. 88
      lib/u_lib/src/u_runner.rs
  18. 47
      lib/u_lib/src/ufs/index.rs
  19. 52
      lib/u_lib/src/ufs/mod.rs

49
Cargo.lock generated

@ -219,9 +219,9 @@ checksum = "aae1277d39aeec15cb388266ecc24b11c80469deae6067e17a1a7aa9e5c1f234"
[[package]] [[package]]
name = "ahash" name = "ahash"
version = "0.8.5" version = "0.8.6"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cd7d5a2cecb58716e47d67d5703a249964b14c7be1ec3cad3affc295b2d1c35d" checksum = "91429305e9f0a25f6205c5b8e0d2db09e0708a7a6df0f42212bb56c32c8ac97a"
dependencies = [ dependencies = [
"cfg-if 1.0.0", "cfg-if 1.0.0",
"getrandom", "getrandom",
@ -443,6 +443,7 @@ dependencies = [
"iana-time-zone", "iana-time-zone",
"js-sys", "js-sys",
"num-traits", "num-traits",
"serde",
"wasm-bindgen", "wasm-bindgen",
"windows-targets", "windows-targets",
] ]
@ -1477,7 +1478,7 @@ dependencies = [
"log", "log",
"memchr", "memchr",
"mime", "mime",
"spin 0.9.8", "spin",
"version_check", "version_check",
] ]
@ -1931,17 +1932,16 @@ checksum = "4389f1d5789befaf6029ebd9f7dac4af7f7e3d61b69d4f30e2ac02b57e7712b0"
[[package]] [[package]]
name = "ring" name = "ring"
version = "0.16.20" version = "0.17.5"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3053cf52e236a3ed746dfc745aa9cacf1b791d846bdaf412f60a8d7d6e17c8fc" checksum = "fb0205304757e5d899b9c2e448b867ffd03ae7f988002e47cd24954391394d0b"
dependencies = [ dependencies = [
"cc", "cc",
"getrandom",
"libc", "libc",
"once_cell", "spin",
"spin 0.5.2",
"untrusted", "untrusted",
"web-sys", "windows-sys",
"winapi",
] ]
[[package]] [[package]]
@ -2054,9 +2054,9 @@ dependencies = [
[[package]] [[package]]
name = "rustls" name = "rustls"
version = "0.21.7" version = "0.21.8"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cd8d6c9f025a446bc4d18ad9632e69aec8f287aa84499ee335599fabd20c3fd8" checksum = "446e14c5cda4f3f30fe71863c34ec70f5ac79d6087097ad0bb433e1be5edf04c"
dependencies = [ dependencies = [
"log", "log",
"ring", "ring",
@ -2075,9 +2075,9 @@ dependencies = [
[[package]] [[package]]
name = "rustls-webpki" name = "rustls-webpki"
version = "0.101.6" version = "0.101.7"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3c7d5dece342910d9ba34d259310cae3e0154b873b35408b787b59bce53d34fe" checksum = "8b6275d1ee7a1cd780b64aca7726599a1dbc893b1e64144529e55c3c2f745765"
dependencies = [ dependencies = [
"ring", "ring",
"untrusted", "untrusted",
@ -2121,9 +2121,9 @@ checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49"
[[package]] [[package]]
name = "sct" name = "sct"
version = "0.7.0" version = "0.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d53dcdb7c9f8158937a7981b48accfd39a43af418591a5d008c7b22b5e1b7ca4" checksum = "da046153aa2352493d6cb7da4b6e5c0c057d8a1d0a9aa8560baffdd945acd414"
dependencies = [ dependencies = [
"ring", "ring",
"untrusted", "untrusted",
@ -2315,12 +2315,6 @@ dependencies = [
"windows-sys", "windows-sys",
] ]
[[package]]
name = "spin"
version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d"
[[package]] [[package]]
name = "spin" name = "spin"
version = "0.9.8" version = "0.9.8"
@ -2801,7 +2795,6 @@ name = "u_lib"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"async-trait",
"bincode", "bincode",
"chrono", "chrono",
"cron", "cron",
@ -2933,9 +2926,9 @@ checksum = "e51733f11c9c4f72aa0c160008246859e340b00807569a0da0e7a1079b27ba85"
[[package]] [[package]]
name = "untrusted" name = "untrusted"
version = "0.7.1" version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a156c684c91ea7d62626509bce3cb4e1d9ed5c4d978f7b4352658f96a4c26b4a" checksum = "8ecb6da28b8a351d773b68d5825ac39017e680750f980f3a1a85cd8dd28a47c1"
[[package]] [[package]]
name = "url" name = "url"
@ -3254,18 +3247,18 @@ dependencies = [
[[package]] [[package]]
name = "zerocopy" name = "zerocopy"
version = "0.7.11" version = "0.7.14"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4c19fae0c8a9efc6a8281f2e623db8af1db9e57852e04cde3e754dd2dc29340f" checksum = "69c48d63854f77746c68a5fbb4aa17f3997ece1cb301689a257af8cb80610d21"
dependencies = [ dependencies = [
"zerocopy-derive", "zerocopy-derive",
] ]
[[package]] [[package]]
name = "zerocopy-derive" name = "zerocopy-derive"
version = "0.7.11" version = "0.7.14"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fc56589e9ddd1f1c28d4b4b5c773ce232910a6bb67a70133d61c9e347585efe9" checksum = "c258c1040279e4f88763a113de72ce32dde2d50e2a94573f15dd534cea36a16d"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",

@ -5,7 +5,7 @@ use tokio::runtime::Builder;
use tokio::time::{sleep, Duration}; use tokio::time::{sleep, Duration};
use u_lib::models::PreparedJob; use u_lib::models::PreparedJob;
use u_lib::scheduler::SCHEDULER; use u_lib::scheduler::SCHEDULER;
use u_lib::u_runner::URunner; use u_lib::u_runner::{IdentifiableFuture, URunner};
use u_lib::{ use u_lib::{
api::HttpClient, api::HttpClient,
cache::JobCache, cache::JobCache,
@ -143,17 +143,26 @@ pub fn run_forever() -> ! {
SCHEDULER SCHEDULER
.add_job(Some("*/3 * * * * * *".try_into().unwrap()), move || { .add_job(Some("*/3 * * * * * *".try_into().unwrap()), move || {
let client = client.clone(); let client = client.clone();
error_reporting(client.clone())
IdentifiableFuture::from_fut_with_ident("error_reporting", async move {
error_reporting(client.clone()).await
})
}) })
.await; .await;
} }
{
let client = client.clone();
SCHEDULER SCHEDULER
.add_job(Some("*/3 * * * * * *".try_into().unwrap()), move || { .add_job(Some("*/3 * * * * * *".try_into().unwrap()), move || {
let client = client.clone(); let client = client.clone();
agent_loop(client)
IdentifiableFuture::from_fut_with_ident("agent_loop", async move {
agent_loop(client).await
})
}) })
.await; .await;
}
SCHEDULER.start_blocking().await SCHEDULER.start_blocking().await
}) })

@ -277,9 +277,9 @@ impl Endpoints {
JobType::Shell => { JobType::Shell => {
result.state = JobState::Finished result.state = JobState::Finished
}, },
JobType::Service => (), JobType::Stats => (),
JobType::Terminate => todo!(), JobType::Terminate => (),
JobType::Update => todo!(), JobType::Update => (),
} }
db.update_result(&result)?; db.update_result(&result)?;
} }

@ -8,7 +8,7 @@ edition = "2021"
[dependencies] [dependencies]
anyhow = { workspace = true } anyhow = { workspace = true }
chrono = "0.4.19" chrono = { version = "0.4.19", features = ["serde"] }
diesel = { workspace = true, optional = true } diesel = { workspace = true, optional = true }
diesel-derive-enum = { version = "2.0.0", features = ["postgres"], optional = true } diesel-derive-enum = { version = "2.0.0", features = ["postgres"], optional = true }
deadpool-diesel = { workspace = true, optional = true } deadpool-diesel = { workspace = true, optional = true }
@ -34,7 +34,6 @@ parking_lot = "0.12.1"
bincode = "1.3.3" bincode = "1.3.3"
sha3 = "0.10.7" sha3 = "0.10.7"
cron = "0.12.0" cron = "0.12.0"
async-trait = "0.1.74"
[target.'cfg(unix)'.dependencies] [target.'cfg(unix)'.dependencies]
daemonize = "0.5" daemonize = "0.5"

@ -22,7 +22,7 @@ impl JobCache {
JOB_CACHE.read().contains_key(&id) JOB_CACHE.read().contains_key(&id)
} }
pub fn get<'jh>(id: Id) -> Option<JobCacheHolder<'jh>> { pub fn get(id: Id) -> Option<JobCacheHolder> {
if !Self::contains(id) { if !Self::contains(id) {
return None; return None;
} }
@ -33,11 +33,15 @@ impl JobCache {
pub fn remove(id: Id) { pub fn remove(id: Id) {
JOB_CACHE.write().remove(&id); JOB_CACHE.write().remove(&id);
} }
pub fn stats() -> Vec<Id> {
JOB_CACHE.read().values().map(|job| job.meta.id).collect()
}
} }
pub struct JobCacheHolder<'jh>(pub RwLockReadGuard<'jh, Cache>, pub Id); pub struct JobCacheHolder(pub RwLockReadGuard<'static, Cache>, pub Id);
impl<'jh> Deref for JobCacheHolder<'jh> { impl Deref for JobCacheHolder {
type Target = Val; type Target = Val;
fn deref(&self) -> &Self::Target { fn deref(&self) -> &Self::Target {

@ -1,8 +1,14 @@
use crate::{ use crate::{
cache::JobCache,
combined_result::CombinedResult, combined_result::CombinedResult,
models::{Agent, AssignedJob, AssignedJobById, Job, JobType, PreparedJob, RawJob}, config::get_self_id,
models::{
stats::Stats, Agent, AssignedJob, AssignedJobById, Job, JobType, PreparedJob, RawJob,
},
proc_output::ProcOutput, proc_output::ProcOutput,
u_runner::{ExecResult, URunner}, scheduler::SCHEDULER,
u_runner::{ExecResult, IdentifiableFuture, URunner},
ufs,
}; };
use std::collections::HashMap; use std::collections::HashMap;
use std::process::exit; use std::process::exit;
@ -17,7 +23,10 @@ impl AnonymousJobBatch {
pub fn from_prepared_jobs(jobs: impl IntoIterator<Item = PreparedJob>) -> Self { pub fn from_prepared_jobs(jobs: impl IntoIterator<Item = PreparedJob>) -> Self {
let mut runner = URunner::new(); let mut runner = URunner::new();
for job in jobs { for job in jobs {
runner.push(run_assigned_job(job)); runner.push(IdentifiableFuture::from_fut_with_ident(
job.ids.id,
run_assigned_job(job),
));
} }
Self { Self {
runner, runner,
@ -25,7 +34,7 @@ impl AnonymousJobBatch {
} }
} }
pub fn from_meta(jobs: impl IntoIterator<Item = Job>) -> Self { pub fn from_jobs(jobs: impl IntoIterator<Item = Job>) -> Self {
let jobs_ids: Vec<_> = jobs let jobs_ids: Vec<_> = jobs
.into_iter() .into_iter()
.map(|job| PreparedJob { .map(|job| PreparedJob {
@ -99,7 +108,7 @@ impl NamedJobBatch {
.map(|job| (job.meta.alias.clone().unwrap(), job)) .map(|job| (job.meta.alias.clone().unwrap(), job))
.unzip(); .unzip();
Self { Self {
runner: Some(AnonymousJobBatch::from_meta(jobs)), runner: Some(AnonymousJobBatch::from_jobs(jobs)),
job_names, job_names,
results: HashMap::new(), results: HashMap::new(),
} }
@ -173,8 +182,18 @@ pub async fn run_assigned_job(prepared_job: PreparedJob) -> ExecResult {
result.set_result(&agent); result.set_result(&agent);
result.retcode = Some(0); result.retcode = Some(0);
} }
JobType::Service => todo!(), JobType::Stats => {
JobType::Update => todo!(), let stats = Stats {
agent_id: get_self_id(),
running_jobs: URunner::stats().await,
scheduled_jobs: SCHEDULER.stats().await,
cached_jobs: JobCache::stats(),
cached_payloads: ufs::stats(),
};
result.set_result(&stats);
result.retcode = Some(0)
}
JobType::Update => (),
JobType::Terminate => exit(0), JobType::Terminate => exit(0),
}; };
Ok(result) Ok(result)
@ -198,7 +217,7 @@ mod tests {
let sleep_jobs = (0..50).map(|_| job.clone()).collect::<Vec<_>>(); let sleep_jobs = (0..50).map(|_| job.clone()).collect::<Vec<_>>();
let now = SystemTime::now(); let now = SystemTime::now();
AnonymousJobBatch::from_meta(sleep_jobs).wait().await; AnonymousJobBatch::from_jobs(sleep_jobs).wait().await;
assert!(now.elapsed().unwrap().as_secs() < SLEEP_SECS + 2) assert!(now.elapsed().unwrap().as_secs() < SLEEP_SECS + 2)
} }
@ -237,7 +256,7 @@ mod tests {
raw_job = raw_job.with_raw_payload(p); raw_job = raw_job.with_raw_payload(p);
} }
let job = raw_job.try_into_job().unwrap(); let job = raw_job.try_into_job().unwrap();
let result = AnonymousJobBatch::from_meta([job]) let result = AnonymousJobBatch::from_jobs([job])
.wait_one() .wait_one()
.await .await
.unwrap(); .unwrap();
@ -251,8 +270,8 @@ mod tests {
const SLEEP_SECS: u64 = 1; const SLEEP_SECS: u64 = 1;
let now = SystemTime::now(); let now = SystemTime::now();
let longest_job = RawJob::from_shell(format!("sleep {}", SLEEP_SECS)).unwrap(); let longest_job = RawJob::from_shell(format!("sleep {}", SLEEP_SECS)).unwrap();
let longest_job = AnonymousJobBatch::from_meta([longest_job]).spawn().await; let longest_job = AnonymousJobBatch::from_jobs([longest_job]).spawn().await;
let ls = AnonymousJobBatch::from_meta([RawJob::from_shell("ls").unwrap()]) let ls = AnonymousJobBatch::from_jobs([RawJob::from_shell("ls").unwrap()])
.wait_one() .wait_one()
.await .await
.unwrap(); .unwrap();
@ -265,7 +284,7 @@ mod tests {
.map(|f| RawJob::from_shell(format!("ls {f}")).unwrap()) .map(|f| RawJob::from_shell(format!("ls {f}")).unwrap())
.collect::<Vec<_>>(); .collect::<Vec<_>>();
let ls_subfolders = AnonymousJobBatch::from_meta(subfolders_jobs).wait().await; let ls_subfolders = AnonymousJobBatch::from_jobs(subfolders_jobs).wait().await;
for result in ls_subfolders { for result in ls_subfolders {
assert_eq!(result.unwrap().retcode.unwrap(), 0); assert_eq!(result.unwrap().retcode.unwrap(), 0);
@ -296,7 +315,7 @@ mod tests {
#[tokio::test] #[tokio::test]
async fn test_failing_shell_job() -> TestResult { async fn test_failing_shell_job() -> TestResult {
let job = RawJob::from_shell("lol_kek_puk").unwrap(); let job = RawJob::from_shell("lol_kek_puk").unwrap();
let job_result = AnonymousJobBatch::from_meta([job]) let job_result = AnonymousJobBatch::from_jobs([job])
.wait_one() .wait_one()
.await .await
.unwrap(); .unwrap();

@ -38,6 +38,3 @@ extern crate tracing;
#[cfg(test)] #[cfg(test)]
#[macro_use] #[macro_use]
extern crate rstest; extern crate rstest;
#[macro_use]
extern crate async_trait;

@ -145,7 +145,7 @@ impl AssignedJob {
String::from_utf8_lossy(self.to_raw_result()) String::from_utf8_lossy(self.to_raw_result())
} }
pub fn set_result<S: Serialize>(&mut self, result: &S) { pub fn set_result(&mut self, result: &impl Serialize) {
self.result = Some(serde_json::to_vec(result).unwrap()); self.result = Some(serde_json::to_vec(result).unwrap());
} }

@ -103,7 +103,7 @@ impl JobMeta {
return Err(mk_err("Argv contains no executable placeholder")); return Err(mk_err("Argv contains no executable placeholder"));
} }
if self.argv.contains("{}") && self.payload_id.is_none() { if self.payload_id.is_none() && self.argv.contains("{}") {
return Err(mk_err( return Err(mk_err(
"No payload provided, but argv contains executable placeholder", "No payload provided, but argv contains executable placeholder",
)); ));

@ -35,7 +35,7 @@ pub enum JobState {
)] )]
pub enum JobType { pub enum JobType {
Init, Init,
Service, Stats,
#[default] #[default]
Shell, Shell,
Terminate, Terminate,

@ -3,6 +3,7 @@ mod jobs;
mod payload; mod payload;
#[cfg(feature = "server")] #[cfg(feature = "server")]
pub mod schema; pub mod schema;
pub mod stats;
pub use crate::models::{agent::*, jobs::*, payload::*}; pub use crate::models::{agent::*, jobs::*, payload::*};
use serde::Deserialize; use serde::Deserialize;

@ -185,7 +185,7 @@ impl Payload {
#[cfg(windows)] #[cfg(windows)]
pub fn prepare_executable(&self) -> Result<(File, String)> { pub fn prepare_executable(&self) -> Result<(File, String)> {
todo!() compile_error!("допилить")
} }
} }

@ -0,0 +1,14 @@
use serde::Serialize;
use crate::{messaging::AsMsg, scheduler::EntryStat, types::Id, ufs::IndexFileMeta};
#[derive(Debug, Clone, Serialize)]
pub struct Stats {
pub agent_id: Id,
pub running_jobs: Vec<String>,
pub scheduled_jobs: Vec<EntryStat>,
pub cached_jobs: Vec<Id>,
pub cached_payloads: Vec<IndexFileMeta>,
}
impl AsMsg for Stats {}

@ -1,33 +1,40 @@
use crate::{models::PreparedJob, types::Id}; use crate::{models::PreparedJob, types::Id, u_runner::IdentifiableFuture};
use chrono::{DateTime, Utc}; use chrono::{DateTime, Utc};
use core::fmt; use core::fmt;
use std::{future::Future, pin::Pin, sync::Arc}; use serde::Serialize;
use std::sync::Arc;
pub type JobFuture = Box<dyn Future<Output = ()> + Send>;
pub trait SchedulerJob { pub trait SchedulerJob {
fn call(&self) -> Pin<JobFuture>; fn call(&self) -> IdentifiableFuture<()>;
} }
impl<F, T> SchedulerJob for F // scheduler job need to be called repeatedly,
// the only way to achieve this is to use Fn() -> Future
impl<F> SchedulerJob for F
where where
F: Fn() -> T, F: Fn() -> IdentifiableFuture<()>,
T: Future<Output = ()> + Send + 'static,
{ {
fn call(&self) -> Pin<JobFuture> { fn call(&self) -> IdentifiableFuture<()> {
Box::pin(self()) self()
} }
} }
#[derive(Clone)] #[derive(Clone)]
pub struct Entry { pub struct Entry {
pub id: Id, pub id: Id,
pub job_id: Id,
pub schedule: Option<cron::Schedule>, pub schedule: Option<cron::Schedule>,
pub next: Option<DateTime<Utc>>, pub next: Option<DateTime<Utc>>,
pub runnable: EntryType, pub runnable: EntryType,
} }
#[derive(Clone, Debug, Serialize)]
pub struct EntryStat {
pub id: Id,
pub job_id: String,
pub schedule: String,
pub next: Option<DateTime<Utc>>,
}
#[derive(Clone)] #[derive(Clone)]
pub enum EntryType { pub enum EntryType {
Common(Arc<dyn SchedulerJob + Send + Sync>), Common(Arc<dyn SchedulerJob + Send + Sync>),
@ -57,6 +64,24 @@ impl Entry {
None => Some(Utc::now()), None => Some(Utc::now()),
} }
} }
pub fn as_stat(&self) -> EntryStat {
let job_id = match &self.runnable {
EntryType::URunner(entry) => entry.job.meta.id.to_string(),
EntryType::Common(entry) => entry.call().job_id.to_string(),
};
EntryStat {
id: self.id,
job_id,
schedule: self
.schedule
.as_ref()
.map(|s| s.to_string())
.unwrap_or_default(),
next: self.next.clone(),
}
}
} }
impl fmt::Debug for Entry { impl fmt::Debug for Entry {

@ -1,5 +1,4 @@
mod entry; mod entry;
mod spawner;
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
@ -9,12 +8,15 @@ use cron::Schedule;
use entry::Entry; use entry::Entry;
use once_cell::sync::Lazy; use once_cell::sync::Lazy;
use tokio::sync::Mutex; use tokio::sync::Mutex;
use tokio::time::timeout;
use uuid::Uuid; use uuid::Uuid;
use crate::jobs::AnonymousJobBatch;
use crate::types::Id; use crate::types::Id;
use self::entry::EntryType; use self::entry::EntryType;
use self::spawner::{CommonSpawner, Spawner, URunnerSpawner};
pub use entry::EntryStat;
pub static SCHEDULER: Lazy<AsyncScheduler> = Lazy::new(|| AsyncScheduler::new()); pub static SCHEDULER: Lazy<AsyncScheduler> = Lazy::new(|| AsyncScheduler::new());
@ -52,16 +54,31 @@ impl AsyncScheduler {
tokio::time::sleep(wait_duration).await; tokio::time::sleep(wait_duration).await;
let mut entries = self.entries.lock().await; let mut entries = self.entries.lock().await;
let mut job_batch = vec![];
CommonSpawner { for entry in &mut *entries {
entries: &mut entries, match entry.next.as_ref() {
} Some(next) => {
.spawn() match &entry.runnable {
.await; EntryType::Common(runnable) => {
let cancel_timeout =
next.timestamp_millis() - Utc::now().timestamp_millis();
let cancel_timeout = Duration::from_millis(cancel_timeout as u64);
URunnerSpawner { tokio::spawn(timeout(cancel_timeout, runnable.call()));
entries: &mut entries, }
EntryType::URunner(runnable) => job_batch.push(runnable.clone()),
}
entry.set_next_run_time();
} }
None => {}
}
if entry.next.as_ref().unwrap().gt(&Utc::now()) {
break;
}
}
AnonymousJobBatch::from_prepared_jobs(job_batch)
.spawn() .spawn()
.await; .await;
@ -72,13 +89,8 @@ impl AsyncScheduler {
pub async fn add_job(&self, schedule: Option<Schedule>, runnable: impl Into<EntryType>) -> Id { pub async fn add_job(&self, schedule: Option<Schedule>, runnable: impl Into<EntryType>) -> Id {
let entry_id = Uuid::new_v4(); let entry_id = Uuid::new_v4();
let runnable = runnable.into(); let runnable = runnable.into();
let job_id = match &runnable {
EntryType::URunner(j) => j.job.meta.id,
_ => Id::new_v4(),
};
let mut entry = Entry { let mut entry = Entry {
id: entry_id, id: entry_id,
job_id,
schedule, schedule,
next: None, next: None,
runnable, runnable,
@ -94,6 +106,15 @@ impl AsyncScheduler {
self.entries.lock().await.retain(|e| e.id != entry_id); self.entries.lock().await.retain(|e| e.id != entry_id);
} }
pub async fn stats(&self) -> Vec<EntryStat> {
self.entries
.lock()
.await
.iter()
.map(|entry| entry.as_stat())
.collect()
}
pub async fn start(&self) { pub async fn start(&self) {
let cloned = self.clone(); let cloned = self.clone();
tokio::spawn(async move { tokio::spawn(async move {
@ -107,6 +128,8 @@ mod tests {
use std::time::Duration; use std::time::Duration;
use tokio::time::sleep; use tokio::time::sleep;
use crate::u_runner::IdentifiableFuture;
use super::*; use super::*;
#[tokio::test(flavor = "multi_thread")] #[tokio::test(flavor = "multi_thread")]
@ -119,29 +142,27 @@ mod tests {
scheduler scheduler
.add_job(Some("*/1 * * * * * *".try_into().unwrap()), move || { .add_job(Some("*/1 * * * * * *".try_into().unwrap()), move || {
let v = v.clone(); let v = v.clone();
async move { IdentifiableFuture::from_fut_with_ident("testf", async move {
*v.lock().await += 1; *v.lock().await += 1;
} })
}) })
.await; .await;
} }
scheduler scheduler
.add_job( .add_job(Some("*/1 * * * * * *".try_into().unwrap()), move || {
Some("*/1 * * * * * *".try_into().unwrap()), IdentifiableFuture::from_fut_with_ident("testf", async move {
move || async move {
println!("hello1"); println!("hello1");
}, })
) })
.await; .await;
scheduler scheduler
.add_job( .add_job(Some("*/1 * * * * * *".try_into().unwrap()), move || {
Some("*/1 * * * * * *".try_into().unwrap()), IdentifiableFuture::from_fut_with_ident("testf", async move {
move || async move {
println!("hello2"); println!("hello2");
}, })
) })
.await; .await;
scheduler.start().await; scheduler.start().await;
@ -170,10 +191,10 @@ mod tests {
scheduler scheduler
.add_job(Some("*/1 * * * * * *".try_into().unwrap()), move || { .add_job(Some("*/1 * * * * * *".try_into().unwrap()), move || {
let counter = counter.clone(); let counter = counter.clone();
async move { IdentifiableFuture::from_fut_with_ident("testf", async move {
sleep(Duration::from_secs(2)).await; sleep(Duration::from_secs(2)).await;
*counter.lock().await += 1; *counter.lock().await += 1;
} })
}) })
.await; .await;
} }

@ -1,85 +0,0 @@
use std::time::Duration;
use crate::jobs::AnonymousJobBatch;
use super::entry::{Entry, EntryType};
use chrono::Utc;
use tokio::time::timeout;
#[async_trait]
pub trait Spawner {
async fn spawn(self);
}
/// Spawn jobs in scheduler thru `tokio::spawn`
pub struct CommonSpawner<'e> {
pub entries: &'e mut [Entry],
}
#[async_trait]
impl Spawner for CommonSpawner<'_> {
async fn spawn(self) {
for entry in self
.entries
.iter_mut()
.filter(|e| matches!(e.runnable, EntryType::Common(_)))
{
if let EntryType::Common(runnable) = &entry.runnable {
match entry.next.as_ref() {
Some(next) => {
let cancel_timeout =
next.timestamp_millis() - Utc::now().timestamp_millis();
let cancel_timeout = Duration::from_millis(cancel_timeout as u64);
tokio::spawn(timeout(cancel_timeout, runnable.call()));
entry.set_next_run_time();
}
None => {
error!("not implemented yet");
todo!();
}
}
}
if entry.next.as_ref().unwrap().gt(&Utc::now()) {
break;
}
}
}
}
/// Spawn jobs in scheduler thru `URunner`
pub struct URunnerSpawner<'e> {
pub entries: &'e mut [Entry],
}
#[async_trait]
impl Spawner for URunnerSpawner<'_> {
async fn spawn(self) {
let mut job_batch = vec![];
for entry in self
.entries
.iter_mut()
.filter(|e| matches!(e.runnable, EntryType::URunner(_)))
{
if let EntryType::URunner(runnable) = &entry.runnable {
match entry.next.as_ref() {
Some(_) => {
job_batch.push(runnable.clone());
entry.set_next_run_time();
}
None => {
error!("not implemented yet");
todo!();
}
}
}
if entry.next.as_ref().unwrap().gt(&Utc::now()) {
break;
}
}
AnonymousJobBatch::from_prepared_jobs(job_batch)
.spawn()
.await;
}
}

@ -1,8 +1,9 @@
use crate::{models::AssignedJob, UResult}; use crate::{models::AssignedJob, UResult};
use lazy_static::lazy_static; use lazy_static::lazy_static;
use std::collections::HashMap;
use std::future::Future; use std::future::Future;
use std::pin::Pin; use std::pin::Pin;
use std::task::Poll;
use std::{collections::HashMap, task::Context};
use tokio::{ use tokio::{
runtime::Handle, runtime::Handle,
sync::mpsc::{channel, Receiver, Sender}, sync::mpsc::{channel, Receiver, Sender},
@ -12,7 +13,6 @@ use tokio::{
use uuid::Uuid; use uuid::Uuid;
pub type ExecResult = UResult<AssignedJob>; pub type ExecResult = UResult<AssignedJob>;
type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;
lazy_static! { lazy_static! {
static ref FUT_RESULTS: Mutex<HashMap<Uuid, JoinInfo>> = Mutex::new(HashMap::new()); static ref FUT_RESULTS: Mutex<HashMap<Uuid, JoinInfo>> = Mutex::new(HashMap::new());
@ -23,10 +23,36 @@ lazy_static! {
}; };
} }
pub struct IdentifiableFuture<R> {
pub job_id: String,
fut: Pin<Box<dyn Future<Output = R> + Send + Sync + 'static>>,
}
impl<R> IdentifiableFuture<R> {
pub fn from_fut_with_ident(
job_id: impl Into<String>,
job: impl Future<Output = R> + Send + Sync + 'static,
) -> Self {
Self {
fut: Box::pin(job),
job_id: job_id.into(),
}
}
}
impl<R> Future for IdentifiableFuture<R> {
type Output = R;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.fut.as_mut().poll(cx)
}
}
struct JoinInfo { struct JoinInfo {
job_id: String,
handle: JoinHandle<JoinHandle<ExecResult>>, handle: JoinHandle<JoinHandle<ExecResult>>,
completed: bool, completed: bool,
collectable: bool, // indicates if future can be popped from pool via pop_task_if_completed // collectable: bool, // indicates if future can be popped from pool via pop_task_if_completed
} }
impl JoinInfo { impl JoinInfo {
@ -43,40 +69,40 @@ fn get_sender() -> Sender<Uuid> {
/// - run 1 or more jobs and wait until they're all done /// - run 1 or more jobs and wait until they're all done
/// - run 1 or more jobs in background and collect results of completed jobs later /// - run 1 or more jobs in background and collect results of completed jobs later
pub struct URunner { pub struct URunner {
tasks: Vec<BoxFuture<'static, ExecResult>>, executables: Vec<IdentifiableFuture<ExecResult>>,
fids: Vec<Uuid>, fids: Vec<Uuid>,
} }
impl URunner { impl URunner {
pub fn new() -> Self { pub fn new() -> Self {
Self { Self {
tasks: vec![], executables: vec![],
fids: vec![], fids: vec![],
} }
} }
pub fn push(&mut self, task: impl Future<Output = ExecResult> + Send + 'static) { pub fn push(&mut self, job: IdentifiableFuture<ExecResult>) {
self.tasks.push(Box::pin(task)); self.executables.push(job);
} }
/// Spawn prepared tasks /// Spawn prepared tasks
pub async fn spawn(mut self) -> Self { pub async fn spawn(mut self) -> Self {
let collectable = true; //TODO: self.tasks.len() != 1; for executable in self.executables.drain(..) {
for f in self.tasks.drain(..) {
let handle = Handle::current(); let handle = Handle::current();
let fid = Uuid::new_v4(); let fid = Uuid::new_v4();
let tx = get_sender(); let tx = get_sender();
let job_id = executable.job_id.clone();
self.fids.push(fid); self.fids.push(fid);
let task_wrapper = async move { let job_wrapper = async move {
debug!("inside wrapper (started): {}", fid); debug!("inside wrapper (started): {}", fid);
let result = f.await; let result = executable.await;
tx.send(fid).await.unwrap(); tx.send(fid).await.ok();
result result
}; };
let handler = JoinInfo { let handler = JoinInfo {
handle: spawn_blocking(move || handle.spawn(task_wrapper)), job_id,
handle: spawn_blocking(move || handle.spawn(job_wrapper)),
completed: false, completed: false,
collectable,
}; };
FUT_RESULTS.lock().await.insert(fid, handler); FUT_RESULTS.lock().await.insert(fid, handler);
} }
@ -89,25 +115,20 @@ impl URunner {
pub async fn wait(self) -> Vec<ExecResult> { pub async fn wait(self) -> Vec<ExecResult> {
let mut result = vec![]; let mut result = vec![];
for fid in self.fids { for fid in self.fids {
if let Some(task) = Self::pop_task(fid).await { if let Some(job) = Self::pop_job(fid).await {
result.push(task.wait_result().await); result.push(job.wait_result().await);
} }
} }
result result
} }
pub async fn pop_task_if_completed(fid: Uuid) -> Option<ExecResult> { pub async fn pop_job_if_completed(fid: Uuid) -> Option<ExecResult> {
let &JoinInfo { let Some(&JoinInfo { completed, .. }) = FUT_RESULTS.lock().await.get(&fid) else {
handle: _, return None;
collectable,
completed,
} = match FUT_RESULTS.lock().await.get(&fid) {
Some(t) => t,
None => return None,
}; };
if collectable && completed { if completed {
let task = Self::pop_task(fid).await.unwrap(); let job = Self::pop_job(fid).await.unwrap();
Some(task.wait_result().await) Some(job.wait_result().await)
} else { } else {
None None
} }
@ -122,14 +143,23 @@ impl URunner {
.copied() .copied()
.collect::<Vec<Uuid>>(); .collect::<Vec<Uuid>>();
for fid in fids { for fid in fids {
if let Some(r) = Self::pop_task_if_completed(fid).await { if let Some(r) = Self::pop_job_if_completed(fid).await {
completed.push(r) completed.push(r)
} }
} }
completed completed
} }
async fn pop_task(fid: Uuid) -> Option<JoinInfo> { pub async fn stats() -> Vec<String> {
FUT_RESULTS
.lock()
.await
.values()
.map(|v| v.job_id.clone())
.collect()
}
async fn pop_job(fid: Uuid) -> Option<JoinInfo> {
FUT_RESULTS.lock().await.remove(&fid) FUT_RESULTS.lock().await.remove(&fid)
} }
} }

@ -1,13 +1,15 @@
use super::{Error, FileMeta}; use super::Error;
use once_cell::sync::Lazy; use once_cell::sync::Lazy;
use parking_lot::Mutex; use parking_lot::Mutex;
use serde::{Deserialize, Serialize};
use std::collections::HashMap; use std::collections::HashMap;
use std::env::temp_dir; use std::env::temp_dir;
use std::ffi::OsString;
use std::fs; use std::fs;
use std::path::PathBuf; use std::path::PathBuf;
// index format: given_name -> payload_meta // index format: given_name -> payload_meta
type Index = HashMap<String, FileMeta>; type Index = HashMap<String, IndexFileMeta>;
static IDX_FILE_NAME: Lazy<PathBuf> = Lazy::new(|| temp_dir().join(".i")); static IDX_FILE_NAME: Lazy<PathBuf> = Lazy::new(|| temp_dir().join(".i"));
static INDEX: Lazy<Mutex<Index>> = Lazy::new(|| { static INDEX: Lazy<Mutex<Index>> = Lazy::new(|| {
@ -28,6 +30,35 @@ static INDEX: Lazy<Mutex<Index>> = Lazy::new(|| {
Mutex::new(idx) Mutex::new(idx)
}); });
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct IndexFileMeta {
extension: Option<OsString>,
pub external: bool, // if file is present before adding to index
pub hash: Vec<u8>,
pub path: PathBuf, // actual file path
pub size: u64,
}
impl IndexFileMeta {
pub fn new(
full_path: impl Into<PathBuf>,
hash: Vec<u8>,
external: bool,
) -> Result<Self, Error> {
let full_path: PathBuf = full_path.into();
let extension = full_path.extension().map(ToOwned::to_owned);
let size = fs::metadata(&full_path)?.len();
Ok(IndexFileMeta {
path: full_path,
extension,
external,
hash,
size,
})
}
}
mod sync { mod sync {
use super::{Index, IDX_FILE_NAME}; use super::{Index, IDX_FILE_NAME};
use std::fs; use std::fs;
@ -57,7 +88,7 @@ mod sync {
} }
} }
pub fn get(name: impl AsRef<str>) -> Option<FileMeta> { pub fn get(name: impl AsRef<str>) -> Option<IndexFileMeta> {
let mut index = INDEX.lock(); let mut index = INDEX.lock();
sync::deleted(&mut index); sync::deleted(&mut index);
@ -65,7 +96,7 @@ pub fn get(name: impl AsRef<str>) -> Option<FileMeta> {
index.get(name.as_ref()).cloned() index.get(name.as_ref()).cloned()
} }
pub fn get_by_hash(hash: impl AsRef<[u8]>) -> Option<(String, FileMeta)> { pub fn get_by_hash(hash: impl AsRef<[u8]>) -> Option<(String, IndexFileMeta)> {
let mut index = INDEX.lock(); let mut index = INDEX.lock();
sync::deleted(&mut index); sync::deleted(&mut index);
@ -76,7 +107,7 @@ pub fn get_by_hash(hash: impl AsRef<[u8]>) -> Option<(String, FileMeta)> {
.map(|(n, m)| (n.to_owned(), m.clone())) .map(|(n, m)| (n.to_owned(), m.clone()))
} }
pub fn insert(name: impl Into<String>, meta: FileMeta) { pub fn insert(name: impl Into<String>, meta: IndexFileMeta) {
let mut index = INDEX.lock(); let mut index = INDEX.lock();
sync::deleted(&mut index); sync::deleted(&mut index);
@ -87,7 +118,7 @@ pub fn insert(name: impl Into<String>, meta: FileMeta) {
sync::index2fs(&mut index); sync::index2fs(&mut index);
} }
pub fn remove(name: impl AsRef<str>) -> Option<FileMeta> { pub fn remove(name: impl AsRef<str>) -> Option<IndexFileMeta> {
let mut index = INDEX.lock(); let mut index = INDEX.lock();
sync::deleted(&mut index); sync::deleted(&mut index);
@ -99,3 +130,7 @@ pub fn remove(name: impl AsRef<str>) -> Option<FileMeta> {
result result
} }
pub fn stats() -> Vec<IndexFileMeta> {
INDEX.lock().values().cloned().collect()
}

@ -1,49 +1,19 @@
// This module is aiming to store (obfuscated?) payloads, get them by name, /// This module is aiming to store (obfuscated?) payloads, get them by name,
// rename, update, delete or prepare to execute via memfd_create (unix) /// rename, update, delete or prepare to execute via memfd_create (unix)
mod error;
mod index;
use anyhow::{Context, Result}; use anyhow::{Context, Result};
use serde::{Deserialize, Serialize};
use std::env::temp_dir; use std::env::temp_dir;
use std::ffi::OsString;
use std::fs; use std::fs;
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use uuid::Uuid; use uuid::Uuid;
mod error;
mod index;
pub use error::Error; pub use error::Error;
pub use index::IndexFileMeta;
const OBFUSCATE: bool = cfg!(feature = "agent"); const OBFUSCATE: bool = cfg!(feature = "agent");
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct FileMeta {
extension: Option<OsString>,
external: bool, // if file is present before adding to index
hash: Vec<u8>,
pub path: PathBuf, // actual file path
pub size: u64,
}
impl FileMeta {
pub fn new(
full_path: impl Into<PathBuf>,
hash: Vec<u8>,
external: bool,
) -> Result<Self, Error> {
let full_path: PathBuf = full_path.into();
let extension = full_path.extension().map(ToOwned::to_owned);
let size = fs::metadata(&full_path)?.len();
Ok(FileMeta {
path: full_path,
extension,
external,
hash,
size,
})
}
}
/// Check if file exists in index. /// Check if file exists in index.
/// File may present in fs but not in index, fn will return false then. /// File may present in fs but not in index, fn will return false then.
pub fn exists_in_index(name: impl AsRef<str>) -> bool { pub fn exists_in_index(name: impl AsRef<str>) -> bool {
@ -51,7 +21,7 @@ pub fn exists_in_index(name: impl AsRef<str>) -> bool {
} }
#[inline] #[inline]
pub fn read_meta(name: impl AsRef<str>) -> Result<FileMeta> { pub fn read_meta(name: impl AsRef<str>) -> Result<IndexFileMeta> {
index::get(&name) index::get(&name)
.ok_or_else(|| Error::not_found(name.as_ref())) .ok_or_else(|| Error::not_found(name.as_ref()))
.context("meta") .context("meta")
@ -113,7 +83,7 @@ pub fn put(name: impl AsRef<str>, data: impl AsRef<[u8]>) -> Result<()> {
}; };
index::insert( index::insert(
name, name,
FileMeta::new(path, data_hash, false).context("put_insert")?, IndexFileMeta::new(path, data_hash, false).context("put_insert")?,
); );
Ok(()) Ok(())
@ -131,7 +101,7 @@ pub fn edit(name: impl AsRef<str>, data: impl AsRef<[u8]>) -> Result<()> {
.map_err(|e| Error::new(e, &meta.path)) .map_err(|e| Error::new(e, &meta.path))
.context("edit_write")?; .context("edit_write")?;
let new_meta = FileMeta::new(meta.path, data_hash, meta.external).context("edit_nmeta")?; let new_meta = IndexFileMeta::new(meta.path, data_hash, meta.external).context("edit_nmeta")?;
index::remove(&name); index::remove(&name);
index::insert(name.as_ref(), new_meta); index::insert(name.as_ref(), new_meta);
@ -215,12 +185,16 @@ pub fn put_external(path: impl AsRef<Path>) -> Result<()> {
index::insert( index::insert(
path_str, path_str,
FileMeta::new(path, data_hash, true).context("ext2")?, IndexFileMeta::new(path, data_hash, true).context("ext2")?,
); );
Ok(()) Ok(())
} }
pub fn stats() -> Vec<IndexFileMeta> {
index::stats()
}
/* /*
pub fn cleanup() { pub fn cleanup() {
let index = INDEX.read(); let index = INDEX.read();

Loading…
Cancel
Save