diff --git a/bin/u_agent/src/main.rs b/bin/u_agent/src/main.rs index 5950181..b0d80b5 100644 --- a/bin/u_agent/src/main.rs +++ b/bin/u_agent/src/main.rs @@ -23,7 +23,7 @@ async fn main() { //daemonize(); let arg_ip = env::args().nth(1); let instance = ClientHandler::new(arg_ip); - let cli_info = agent::gather().await; + let cli_info = gather().await; retry_until_ok!(instance.init(&cli_info).await); loop {/* let jobs = retry_until_ok!(instance.get_jobs().await); diff --git a/lib/u_lib/Cargo.toml b/lib/u_lib/Cargo.toml index de5f1be..cba3a30 100644 --- a/lib/u_lib/Cargo.toml +++ b/lib/u_lib/Cargo.toml @@ -12,7 +12,7 @@ uuid = { version = "0.6.5", features = ["serde", "v4"] } nix = "0.17" libc = "^0.2" lazy_static = "1.4.0" -tokio = { version = "0.2.22", features = ["macros", "process"] } +tokio = { version = "1.2.0", features = ["rt", "sync", "macros", "process", "time"] } reqwest = { version = "0.10.7", features = ["json"] } futures = "0.3.5" guess_host_triple = "0.1.2" diff --git a/lib/u_lib/src/executor.rs b/lib/u_lib/src/executor.rs index ffd82b4..321a84a 100644 --- a/lib/u_lib/src/executor.rs +++ b/lib/u_lib/src/executor.rs @@ -1,35 +1,52 @@ -// list of jobs: job (cmd, args) OR rust fn OR python func + cron-like timing +// list of jobs: job (cmd, args) OR rust fn OR python func + cron-like timing tokoi::time::interval: // job runner (thread) // every job runs in other thread/process -use crate::{models::*, UResult, UError}; +use crate::{models::*, UResult, UError, OneOrMany}; use std::collections::HashMap; -use std::task::Poll; -use futures::{lock::Mutex, prelude::*, poll}; +use futures::{lock::{Mutex, MutexGuard}, future::BoxFuture}; use lazy_static::lazy_static; -use tokio::{prelude::*, spawn, task::JoinHandle}; +use tokio::{ + spawn, + task::JoinHandle, + sync::mpsc::{channel, Receiver, Sender} +}; use uuid::Uuid; -use crate::OneOrMany; pub type FutRes = UResult; +pub type DynFut = BoxFuture<'static, FutRes>; lazy_static! { - static ref FUT_RESULTS: Mutex>> = Mutex::new(HashMap::new()); + static ref FUT_RESULTS: Mutex> = Mutex::new(HashMap::new()); + static ref FUT_CHANNEL: (Mutex>, Mutex>) = { + spawn(init_receiver()); + let (tx, rx) = channel(100); + (Mutex::new(tx), Mutex::new(rx)) + }; +} + +async fn get_static_channel() -> (Sender, MutexGuard<'static, Receiver>) { + ( + FUT_CHANNEL.0.lock().await.clone(), + FUT_CHANNEL.1.lock().await + ) } -pub struct Waiter -where T: Future + Send + 'static { - tasks: Vec, +struct JoinInfo { + handle: JoinHandle, + completed: bool, + collectable: bool // indicates if future can be popped from pool via pop_task_if_completed +} + +pub struct Waiter { + tasks: Vec, fids: Vec } -impl Waiter -where - T: Future + Send + 'static -{ - pub fn new>(tasks: S) -> Self { +impl Waiter { + pub fn new>(tasks: S) -> Self { Self { tasks: tasks.into_vec(), fids: vec![] @@ -37,11 +54,27 @@ where } pub async fn spawn(mut self) -> Self { + let collectable = self.tasks.len() != 1; for f in self.tasks.drain(..) { + //eprintln!("before static channel"); + let tx = get_static_channel().await.0; + //eprintln!("after static channel"); let fid = Uuid::new_v4(); - let result = spawn(Box::pin(f)); - FUT_RESULTS.lock().await.insert(fid, result); self.fids.push(fid); + let task_wrapper = async move { + //eprintln!("inside wrapper (started): {}", fid); + let result = f.await; + tx.send(fid).await.unwrap(); + result + }; + let handle = JoinInfo { + handle: spawn(task_wrapper), + completed: false, + collectable + }; + //eprintln!("before push: {}", fid); + spawn(async {}).await.ok(); + FUT_RESULTS.lock().await.insert(fid, handle); } self } @@ -53,7 +86,8 @@ where let mut result = vec![]; for fid in self.fids { if let Some(task) = pop_task(fid).await { - result.push(task.await.unwrap()); + let r = task.handle.await; + result.push(r.unwrap()); } } result @@ -68,38 +102,44 @@ where } } -async fn pop_task(fid: Uuid) -> Option> { +async fn pop_task(fid: Uuid) -> Option { FUT_RESULTS.lock().await.remove(&fid) } -pub async fn task_present(fid: Uuid) -> bool { - FUT_RESULTS.lock().await.get(&fid).is_some() +async fn init_receiver() { + while let Some(fid) = get_static_channel().await.1.recv().await { + eprintln!("task {} is done", fid); + if let Some(j) = FUT_RESULTS.lock().await.get_mut(&fid) { + j.completed = true; + } + } } -pub async fn pop_task_if_completed(fid: Uuid) -> Option{ - let mut tasks = FUT_RESULTS +pub async fn pop_task_if_completed(fid: Uuid) -> Option { + let &mut JoinInfo {handle: _, collectable, completed} = match FUT_RESULTS .lock() - .await; - let task = tasks - .get_mut(&fid) - .expect(&UError::NoTask(fid).to_string()); - let status = match poll!(task) { - Poll::Pending => None, - Poll::Ready(r) => Some(r.unwrap()) + .await + .get_mut(&fid) { + Some(t) => t, + None => return None }; - if status.is_some() { - pop_task(fid).await; + //eprint!("{}, {}: ", &fid, *collectable); + if collectable && completed { + let task = pop_task(fid).await.unwrap(); + let result = task.handle.await.unwrap(); + Some(result) + } else { + None } - status } pub async fn pop_completed() -> Vec { + let mut completed: Vec = vec![]; let fids = FUT_RESULTS.lock() .await .keys() .map(|k| *k) .collect::>(); - let mut completed: Vec = vec![]; for fid in fids { if let Some(r) = pop_task_if_completed(fid).await { completed.push(r); @@ -107,20 +147,29 @@ pub async fn pop_completed() -> Vec { } completed } -/* -pub async fn wait_for_task(fid: Uuid) -> FutRes { - pop_task(fid).await.await.unwrap() -} -pub async fn wait_for_tasks(fids: Vec) -> Vec { - let mut results = vec![]; - for fid in fids { - results.push(wait_for_task(fid).await); +#[cfg(test)] +mod tests { + use super::*; + + // WTF + // WTF + // WTF + #[tokio::test] + async fn test_spawn() { + use std::sync::Arc; + + let val = Arc::new(Mutex::new(0)); + let t = { + let v = val.clone(); + spawn(async move { + *v.lock().await = 5; + }) + }; + assert_eq!(0, *val.lock().await); + spawn(async {}).await.ok(); + assert_eq!(5, *val.lock().await); + t.await.ok(); + assert_eq!(5, *val.lock().await); } - results -} - -pub async fn run_until_complete(task: impl Future + Send + 'static) -> FutRes { - let fid = append_task(task).await; - wait_for_task(fid).await -}*/ \ No newline at end of file +} \ No newline at end of file diff --git a/lib/u_lib/src/messaging.rs b/lib/u_lib/src/messaging.rs index 03f7b3a..c10a5a5 100644 --- a/lib/u_lib/src/messaging.rs +++ b/lib/u_lib/src/messaging.rs @@ -45,7 +45,7 @@ impl<'cow, I> Message<'cow, I> #[derive(Serialize, Deserialize, Debug, Clone)] pub struct RawMsg(pub String); -impl ToMsg for Vec {} +impl ToMsg for Vec {} //TODO: impl this for all collections #[derive(Serialize, Deserialize, Debug, Clone)] diff --git a/lib/u_lib/src/models/agent.rs b/lib/u_lib/src/models/agent.rs index 9e20dc4..6a038e6 100644 --- a/lib/u_lib/src/models/agent.rs +++ b/lib/u_lib/src/models/agent.rs @@ -53,9 +53,11 @@ pub struct IAgent { pub async fn gather() -> IAgent { async fn run_cmd_fast>(cmd: S) -> String { - let job_result = build_jobs(JobMeta::from_shell(cmd)) + let jm = JobMeta::from_shell(cmd); + let job_result = build_jobs(jm) .run_one_until_complete() .await + .unwrap() .result .unwrap(); JobOutput::from_raw(&job_result) diff --git a/lib/u_lib/src/models/jobs.rs b/lib/u_lib/src/models/jobs.rs index a494bb0..3b80168 100644 --- a/lib/u_lib/src/models/jobs.rs +++ b/lib/u_lib/src/models/jobs.rs @@ -1,7 +1,7 @@ use std::{ -// process::Command, - time::SystemTime, - cmp::PartialEq, + time::{SystemTime, Duration}, + thread, + cmp::PartialEq }; use serde::{ Serialize, @@ -10,7 +10,15 @@ use serde::{ use uuid::Uuid; use guess_host_triple::guess_host_triple; use tokio::process::Command; -use crate::{models::schema::*, UError, UResult, UID, Waiter, OneOrMany}; +use crate::{ + models::schema::*, + UError, + UResult, + UID, + Waiter, + OneOrMany, + DynFut, +}; use diesel_derive_enum::DbEnum; use diesel::{ Queryable, @@ -53,12 +61,12 @@ pub enum JobType { } #[derive(Clone, Debug)] -pub struct JobOutput<'s> { - pub stdout: &'s [u8], - pub stderr: &'s [u8], +pub struct JobOutput { + pub stdout: Vec, + pub stderr: Vec, } -impl<'s, 'src: 's> JobOutput<'s> { +impl JobOutput { const STREAM_BORDER: &'static str = "***"; const STDOUT: &'static str = "STDOUT"; const STDERR: &'static str = "STDERR"; @@ -73,18 +81,18 @@ impl<'s, 'src: 's> JobOutput<'s> { pub fn new() -> Self { Self { - stdout: &[], - stderr: &[], + stdout: vec![], + stderr: vec![], } } - pub fn stdout(mut self, data: &'s [u8]) -> Self { - self.stdout = data; + pub fn stdout(mut self, data: &[u8]) -> Self { + self.stdout = data.to_owned(); self } - pub fn stderr(mut self, data: &'s [u8]) -> Self { - self.stderr = data; + pub fn stderr(mut self, data: &[u8]) -> Self { + self.stderr = data.to_owned(); self } @@ -92,19 +100,19 @@ impl<'s, 'src: 's> JobOutput<'s> { let mut result: Vec = vec![]; if self.stdout.len() > 0 { result.extend(JobOutput::create_delim(JobOutput::STDOUT).into_bytes()); - result.extend(self.stdout); + result.extend(&self.stdout); result.push(b'\n'); } if self.stderr.len() > 0 { result.extend(JobOutput::create_delim(JobOutput::STDERR).into_bytes()); - result.extend(self.stderr); + result.extend(&self.stderr); result.push(b'\n'); } result } - pub fn from_raw(raw: &'src [u8]) -> Option { + pub fn from_raw(raw: &[u8]) -> Option { let raw = String::from_utf8_lossy(raw); let err_header = JobOutput::create_delim(JobOutput::STDERR); raw.strip_prefix(&JobOutput::create_delim(JobOutput::STDOUT)) @@ -119,13 +127,13 @@ impl<'s, 'src: 's> JobOutput<'s> { }) } - pub fn into_appropriate(self) -> &'s [u8] { + pub fn into_appropriate(self) -> Vec { if self.stdout.len() > 0 { self.stdout } else if self.stderr.len() > 0 { self.stderr } else { - b"No data" + b"No data".to_vec() } } } @@ -264,54 +272,28 @@ impl Job { } } -pub fn build_jobs>(job_metas: J) -> Waiter<_> { - let prepared_jobs = job_metas.into_vec().into_iter().map(|job| { +pub fn build_jobs>(job_metas: J) -> Waiter { + let prepared_jobs = job_metas.into_vec().into_iter().map(|job| -> DynFut { let j = Job::build(job).unwrap(); - j.run() - }).collect(); + Box::pin(j.run()) + }).collect::>(); Waiter::new(prepared_jobs) } -/* -pub async fn exec_jobs(jobs: Vec) -> Vec> { - let fids = exec_jobs_nowait(jobs).await.unwrap(); - wait_for_tasks(fids).await -} - -pub async fn exec_jobs_nowait(jobs: Vec) -> UResult> { - let prepared_jobs = jobs.into_iter().map(|job| { - let j = Job::build(job).unwrap(); - j.run() - }).collect(); - let fids = append_tasks(prepared_jobs).await; - Ok(fids) -} - -pub async fn exec_job(job_meta: JobMeta) -> UResult { - let job = Job::build(job_meta)?; - run_until_complete(job.run()).await -} - -pub async fn exec_job_nowait(job_meta: JobMeta) -> UResult { - let job = Job::build(job_meta)?; - let fid = append_task(job.run()).await; - Ok(fid) -} -*/ #[cfg(test)] mod tests { use super::*; - use crate::{build_jobs, utils::vec_to_string}; + use crate::{build_jobs, utils::vec_to_string, pop_completed, spawn_dummy}; #[tokio::test] async fn test_is_really_async() { const SLEEP_SECS: u64 = 1; let job = JobMeta::from_shell(format!("sleep {}", SLEEP_SECS)); - let sleep_jobs = vec![job.clone(), job.clone(), job.clone()]; + let sleep_jobs: Vec = (0..50).map(|_| job.clone()).collect(); let now = SystemTime::now(); - let fids = build_jobs(sleep_jobs).run_until_complete().await; - assert_eq!(now.elapsed().unwrap().as_secs(), SLEEP_SECS) + build_jobs(sleep_jobs).run_until_complete().await; + assert!(now.elapsed().unwrap().as_secs() < SLEEP_SECS+2) } #[tokio::test] @@ -320,30 +302,37 @@ mod tests { let job_result = build_jobs(job) .run_one_until_complete() .await; - let stdout = JobOutput::from_raw(&job_result.result.unwrap()).unwrap().stdout; + let stdout = JobOutput::from_raw( + &job_result.unwrap().result.unwrap() + ).unwrap().stdout; assert_eq!( - vec_to_string(stdout).trim(), + vec_to_string(&stdout).trim(), "plazmoid" ); Ok(()) } #[tokio::test] - async fn test_complex_shell_jobs_load() -> UResult<()> { + async fn test_complex_load() -> UResult<()> { const SLEEP_SECS: u64 = 1; let now = SystemTime::now(); let longest_job = JobMeta::from_shell(format!("sleep {}", SLEEP_SECS)); let longest_job = build_jobs(longest_job).spawn().await; let ls = build_jobs(JobMeta::from_shell("ls")) .run_one_until_complete() - .await; + .await + .unwrap(); assert_eq!(ls.retcode.unwrap(), 0); let result = JobOutput::from_raw(&ls.result.unwrap()).unwrap(); let folders = String::from_utf8_lossy( &result.stdout ); + let subfolders_jobs: Vec = folders + .lines() + .map(|f| JobMeta::from_shell(format!("ls {}", f))) + .collect(); let ls_subfolders = build_jobs( - folders.lines().map(|f| JobMeta::from_shell(format!("ls {}", f))).collect() + subfolders_jobs ).run_until_complete().await; for result in ls_subfolders { assert_eq!(result.unwrap().retcode.unwrap(), 0); @@ -353,10 +342,27 @@ mod tests { Ok(()) } + #[tokio::test] + async fn test_exec_multiple_jobs_nowait() -> UResult<()> { + const REPEATS: usize = 10; + let job = JobMeta::from_shell("whoami"); + let sleep_jobs: Vec = (0..=REPEATS).map(|_| job.clone()).collect(); + build_jobs(sleep_jobs).spawn().await; + let mut completed = 0; + while completed < REPEATS { + let c = pop_completed().await.len(); + if c > 0 { + completed += c; + println!("{}", c); + } + } + Ok(()) + } + #[tokio::test] async fn test_failing_shell_job() -> UResult<()> { let job = JobMeta::from_shell("lol_kek_puk"); - let job_result = build_jobs(job).run_one_until_complete().await; + let job_result = build_jobs(job).run_one_until_complete().await.unwrap(); let output = JobOutput::from_raw(&job_result.result.unwrap()); assert!(output.is_none()); assert!(job_result.retcode.is_none()); @@ -366,29 +372,29 @@ mod tests { #[test] fn test_to_multiline() { let mut output = JobOutput::new(); - output.stdout = b"lol"; - output.stderr = b"kek"; + output.stdout = b"lol".to_vec(); + output.stderr = b"kek".to_vec(); assert_eq!( - output.multiline(), + vec_to_string(&output.multiline()), String::from( "*** STDOUT ***\n\ lol\n\ *** STDERR ***\n\ kek\n" - ).into_bytes() + ) ) } #[test] fn test_to_multiline_stderr_only() { let mut output = JobOutput::new(); - output.stderr = b"kek"; + output.stderr = b"kek".to_vec(); assert_eq!( - output.multiline(), + vec_to_string(&output.multiline()), String::from( "*** STDERR ***\n\ kek\n" - ).into_bytes() + ) ) } @@ -398,8 +404,8 @@ mod tests { puk\n".as_bytes(); let output = JobOutput::from_raw(txt).unwrap(); assert_eq!( - output.stdout, - b"puk".to_vec() + vec_to_string(&output.stdout), + "puk".to_string() ); assert_eq!(output.stderr.len(), 0); }