diff --git a/lib/u_lib/src/executor.rs b/lib/u_lib/src/executor.rs index ad8f5c0..ffd82b4 100644 --- a/lib/u_lib/src/executor.rs +++ b/lib/u_lib/src/executor.rs @@ -5,16 +5,13 @@ use crate::{models::*, UResult, UError}; use std::collections::HashMap; -use std::pin::Pin; -use std::thread::sleep; -use std::time::{Duration, Instant}; use std::task::Poll; -use tokio::process::Command; use futures::{lock::Mutex, prelude::*, poll}; use lazy_static::lazy_static; use tokio::{prelude::*, spawn, task::JoinHandle}; use uuid::Uuid; +use crate::OneOrMany; pub type FutRes = UResult; @@ -22,25 +19,57 @@ lazy_static! { static ref FUT_RESULTS: Mutex>> = Mutex::new(HashMap::new()); } -//TODO: waiter struct -pub async fn append_task(task: impl Future + Send + 'static) -> Uuid { - let fid = Uuid::new_v4(); - let result = spawn(Box::pin(task)); - FUT_RESULTS.lock().await.insert(fid, result); - fid +pub struct Waiter +where T: Future + Send + 'static { + tasks: Vec, + fids: Vec } -pub async fn append_tasks(tasks: Vec + Send + 'static>) -> Vec { - let mut fids = Vec::::new(); - for f in tasks.into_iter() { - let fid = append_task(f).await; - fids.push(fid); +impl Waiter +where + T: Future + Send + 'static +{ + pub fn new>(tasks: S) -> Self { + Self { + tasks: tasks.into_vec(), + fids: vec![] + } + } + + pub async fn spawn(mut self) -> Self { + for f in self.tasks.drain(..) { + let fid = Uuid::new_v4(); + let result = spawn(Box::pin(f)); + FUT_RESULTS.lock().await.insert(fid, result); + self.fids.push(fid); + } + self + } + + // wait until a bunch of tasks is finished + // NOT GUARANTEED that all tasks will be returned due to + // possibility to pop them in other places + pub async fn wait(self) -> Vec { + let mut result = vec![]; + for fid in self.fids { + if let Some(task) = pop_task(fid).await { + result.push(task.await.unwrap()); + } + } + result + } + + pub async fn run_until_complete(self) -> Vec { + self.spawn().await.wait().await + } + + pub async fn run_one_until_complete(self) -> FutRes { + self.run_until_complete().await.pop().unwrap() } - fids } -pub async fn pop_task(fid: Uuid) -> JoinHandle { - FUT_RESULTS.lock().await.remove(&fid).expect(&UError::NoTask(fid).to_string()) +async fn pop_task(fid: Uuid) -> Option> { + FUT_RESULTS.lock().await.remove(&fid) } pub async fn task_present(fid: Uuid) -> bool { @@ -64,22 +93,21 @@ pub async fn pop_task_if_completed(fid: Uuid) -> Option{ status } -pub async fn pop_completed(fids: Option>) -> Vec> { - let fids = match fids { - Some(v) => v, - None => FUT_RESULTS.lock() - .await - .keys() - .map(|k| *k) - .collect::>() - }; - let mut completed: Vec> = vec![]; +pub async fn pop_completed() -> Vec { + let fids = FUT_RESULTS.lock() + .await + .keys() + .map(|k| *k) + .collect::>(); + let mut completed: Vec = vec![]; for fid in fids { - completed.push(pop_task_if_completed(fid).await) + if let Some(r) = pop_task_if_completed(fid).await { + completed.push(r); + } } completed } - +/* pub async fn wait_for_task(fid: Uuid) -> FutRes { pop_task(fid).await.await.unwrap() } @@ -95,4 +123,4 @@ pub async fn wait_for_tasks(fids: Vec) -> Vec { pub async fn run_until_complete(task: impl Future + Send + 'static) -> FutRes { let fid = append_task(task).await; wait_for_task(fid).await -} \ No newline at end of file +}*/ \ No newline at end of file diff --git a/lib/u_lib/src/models/agent.rs b/lib/u_lib/src/models/agent.rs index 3bfcb1d..9e20dc4 100644 --- a/lib/u_lib/src/models/agent.rs +++ b/lib/u_lib/src/models/agent.rs @@ -14,8 +14,6 @@ use crate::{ UID, utils::vec_to_string, models::schema::*, - executor::*, - jobs::exec_job }; use guess_host_triple::guess_host_triple; @@ -55,7 +53,11 @@ pub struct IAgent { pub async fn gather() -> IAgent { async fn run_cmd_fast>(cmd: S) -> String { - let job_result = exec_job(JobMeta::from_shell(cmd)).await.unwrap().result.unwrap(); + let job_result = build_jobs(JobMeta::from_shell(cmd)) + .run_one_until_complete() + .await + .result + .unwrap(); JobOutput::from_raw(&job_result) .map(|o| vec_to_string(&o.into_appropriate())) .unwrap_or(String::from_utf8_lossy(&job_result).to_string()) diff --git a/lib/u_lib/src/models/jobs.rs b/lib/u_lib/src/models/jobs.rs index 00c7f17..a494bb0 100644 --- a/lib/u_lib/src/models/jobs.rs +++ b/lib/u_lib/src/models/jobs.rs @@ -10,7 +10,7 @@ use serde::{ use uuid::Uuid; use guess_host_triple::guess_host_triple; use tokio::process::Command; -use crate::{models::schema::*, UError, UResult, UID, run_until_complete, append_tasks, append_task, wait_for_tasks}; +use crate::{models::schema::*, UError, UResult, UID, Waiter, OneOrMany}; use diesel_derive_enum::DbEnum; use diesel::{ Queryable, @@ -53,12 +53,12 @@ pub enum JobType { } #[derive(Clone, Debug)] -pub struct JobOutput { - pub stdout: Vec, - pub stderr: Vec, +pub struct JobOutput<'s> { + pub stdout: &'s [u8], + pub stderr: &'s [u8], } -impl JobOutput { +impl<'s, 'src: 's> JobOutput<'s> { const STREAM_BORDER: &'static str = "***"; const STDOUT: &'static str = "STDOUT"; const STDERR: &'static str = "STDERR"; @@ -73,17 +73,17 @@ impl JobOutput { pub fn new() -> Self { Self { - stdout: Vec::new(), - stderr: Vec::new(), + stdout: &[], + stderr: &[], } } - pub fn stdout(mut self, data: Vec) -> Self { + pub fn stdout(mut self, data: &'s [u8]) -> Self { self.stdout = data; self } - pub fn stderr(mut self, data: Vec) -> Self { + pub fn stderr(mut self, data: &'s [u8]) -> Self { self.stderr = data; self } @@ -92,40 +92,40 @@ impl JobOutput { let mut result: Vec = vec![]; if self.stdout.len() > 0 { result.extend(JobOutput::create_delim(JobOutput::STDOUT).into_bytes()); - result.extend(&self.stdout); + result.extend(self.stdout); result.push(b'\n'); } if self.stderr.len() > 0 { result.extend(JobOutput::create_delim(JobOutput::STDERR).into_bytes()); - result.extend(&self.stderr); + result.extend(self.stderr); result.push(b'\n'); } result } - pub fn from_raw(raw: &[u8]) -> Option { + pub fn from_raw(raw: &'src [u8]) -> Option { let raw = String::from_utf8_lossy(raw); let err_header = JobOutput::create_delim(JobOutput::STDERR); raw.strip_prefix(&JobOutput::create_delim(JobOutput::STDOUT)) .map(|s: &str| { let mut parts = s.split(&err_header) - .map(|d| Vec::from(d.trim().as_bytes())) - .collect::>>() + .map(|d| d.trim().as_bytes()) + .collect::>() .into_iter(); JobOutput::new() .stdout(parts.next().unwrap()) - .stderr(parts.next().unwrap_or(vec![])) + .stderr(parts.next().unwrap_or(&[])) }) } - pub fn into_appropriate(self) -> Vec { + pub fn into_appropriate(self) -> &'s [u8] { if self.stdout.len() > 0 { self.stdout } else if self.stderr.len() > 0 { self.stderr } else { - UError::Raw("No data").to_string().into_bytes() + b"No data" } } } @@ -175,6 +175,8 @@ impl JobResult { inst.job_id = job_id; inst } + + //pub fn as_job_output(&self) -> JobOutput {} } impl Default for JobResult { @@ -238,8 +240,8 @@ impl Job { Ok(output) => { ( Some(JobOutput::new() - .stdout(output.stdout.to_vec()) - .stderr(output.stderr.to_vec()) + .stdout(&output.stdout) + .stderr(&output.stderr) .multiline() ), output.status.code() @@ -262,6 +264,15 @@ impl Job { } } +pub fn build_jobs>(job_metas: J) -> Waiter<_> { + let prepared_jobs = job_metas.into_vec().into_iter().map(|job| { + let j = Job::build(job).unwrap(); + j.run() + }).collect(); + Waiter::new(prepared_jobs) +} + +/* pub async fn exec_jobs(jobs: Vec) -> Vec> { let fids = exec_jobs_nowait(jobs).await.unwrap(); wait_for_tasks(fids).await @@ -286,12 +297,12 @@ pub async fn exec_job_nowait(job_meta: JobMeta) -> UResult { let fid = append_task(job.run()).await; Ok(fid) } - +*/ #[cfg(test)] mod tests { use super::*; - use crate::{exec_job, utils::vec_to_string, wait_for_task, append_task}; + use crate::{build_jobs, utils::vec_to_string}; #[tokio::test] async fn test_is_really_async() { @@ -299,20 +310,19 @@ mod tests { let job = JobMeta::from_shell(format!("sleep {}", SLEEP_SECS)); let sleep_jobs = vec![job.clone(), job.clone(), job.clone()]; let now = SystemTime::now(); - let fids = exec_jobs_nowait(sleep_jobs).await.unwrap(); - for f in fids.into_iter() { - wait_for_task(f).await; - } + let fids = build_jobs(sleep_jobs).run_until_complete().await; assert_eq!(now.elapsed().unwrap().as_secs(), SLEEP_SECS) } #[tokio::test] async fn test_shell_job() -> UResult<()> { let job = JobMeta::from_shell("whoami"); - let job_result = exec_job(job).await.unwrap(); + let job_result = build_jobs(job) + .run_one_until_complete() + .await; let stdout = JobOutput::from_raw(&job_result.result.unwrap()).unwrap().stdout; assert_eq!( - vec_to_string(&stdout).trim(), + vec_to_string(stdout).trim(), "plazmoid" ); Ok(()) @@ -323,20 +333,22 @@ mod tests { const SLEEP_SECS: u64 = 1; let now = SystemTime::now(); let longest_job = JobMeta::from_shell(format!("sleep {}", SLEEP_SECS)); - let longest_job_id = exec_job_nowait(longest_job).await.unwrap(); - let ls = exec_job(JobMeta::from_shell("ls")).await.unwrap(); + let longest_job = build_jobs(longest_job).spawn().await; + let ls = build_jobs(JobMeta::from_shell("ls")) + .run_one_until_complete() + .await; assert_eq!(ls.retcode.unwrap(), 0); let result = JobOutput::from_raw(&ls.result.unwrap()).unwrap(); let folders = String::from_utf8_lossy( &result.stdout ); - let ls_subfolders = exec_jobs( + let ls_subfolders = build_jobs( folders.lines().map(|f| JobMeta::from_shell(format!("ls {}", f))).collect() - ).await; + ).run_until_complete().await; for result in ls_subfolders { assert_eq!(result.unwrap().retcode.unwrap(), 0); } - wait_for_task(longest_job_id).await; + longest_job.wait().await; assert_eq!(now.elapsed().unwrap().as_secs(), SLEEP_SECS); Ok(()) } @@ -344,7 +356,7 @@ mod tests { #[tokio::test] async fn test_failing_shell_job() -> UResult<()> { let job = JobMeta::from_shell("lol_kek_puk"); - let job_result = exec_job(job).await.unwrap(); + let job_result = build_jobs(job).run_one_until_complete().await; let output = JobOutput::from_raw(&job_result.result.unwrap()); assert!(output.is_none()); assert!(job_result.retcode.is_none()); @@ -354,8 +366,8 @@ mod tests { #[test] fn test_to_multiline() { let mut output = JobOutput::new(); - output.stdout = b"lol".to_vec(); - output.stderr = b"kek".to_vec(); + output.stdout = b"lol"; + output.stderr = b"kek"; assert_eq!( output.multiline(), String::from( @@ -370,7 +382,7 @@ mod tests { #[test] fn test_to_multiline_stderr_only() { let mut output = JobOutput::new(); - output.stderr = b"kek".to_vec(); + output.stderr = b"kek"; assert_eq!( output.multiline(), String::from( diff --git a/lib/u_lib/src/models/schema.rs b/lib/u_lib/src/models/schema.rs index e69de29..f459e0f 100644 --- a/lib/u_lib/src/models/schema.rs +++ b/lib/u_lib/src/models/schema.rs @@ -0,0 +1,86 @@ +table! { + use diesel::sql_types::*; + use crate::*; + + agents (id) { + alias -> Nullable, + hostname -> Text, + id -> Uuid, + is_root -> Bool, + is_root_allowed -> Bool, + last_active -> Timestamp, + platform -> Text, + regtime -> Timestamp, + status -> Nullable, + token -> Nullable, + username -> Text, + } +} + +table! { + use diesel::sql_types::*; + use crate::*; + + certificates (id) { + agent_id -> Uuid, + id -> Uuid, + is_revoked -> Bool, + } +} + +table! { + use diesel::sql_types::*; + use crate::*; + + ip_addrs (id) { + agent_id -> Uuid, + check_ts -> Timestamp, + gateway -> Nullable, + id -> Uuid, + iface -> Text, + ip_addr -> Text, + is_gray -> Bool, + netmask -> Text, + } +} + +table! { + use diesel::sql_types::*; + use crate::*; + + jobs (id) { + alias -> Nullable, + id -> Uuid, + exec_type -> JobType, + platform -> Text, + payload -> Nullable, + } +} + +table! { + use diesel::sql_types::*; + use crate::*; + + results (id) { + agent_id -> Uuid, + created -> Timestamp, + id -> Uuid, + job_id -> Uuid, + result -> Nullable, + retcode -> Nullable, + ts -> Timestamp, + } +} + +joinable!(certificates -> agents (agent_id)); +joinable!(ip_addrs -> agents (agent_id)); +joinable!(results -> agents (agent_id)); +joinable!(results -> jobs (job_id)); + +allow_tables_to_appear_in_same_query!( + agents, + certificates, + ip_addrs, + jobs, + results, +); diff --git a/lib/u_lib/src/utils.rs b/lib/u_lib/src/utils.rs index 5dde561..7e2c361 100644 --- a/lib/u_lib/src/utils.rs +++ b/lib/u_lib/src/utils.rs @@ -15,6 +15,21 @@ use nix::{ }; use std::process::exit; +pub trait OneOrMany { + fn into_vec(self) -> Vec; +} + +impl OneOrMany for T { + fn into_vec(self) -> Vec { + vec![self] + } +} + +impl OneOrMany for Vec { + fn into_vec(self) -> Vec { + self + } +} pub fn daemonize() { if getppid().as_raw() != 1 { @@ -46,6 +61,6 @@ pub fn setsig(sig: Signal, hnd: SigHandler) { } } -pub fn vec_to_string(v: &Vec) -> String { +pub fn vec_to_string(v: &[u8]) -> String { String::from_utf8_lossy(v).to_string() } \ No newline at end of file