use crate::{UError, UErrorBt, UResult, cache::JobCache, executor::{Waiter, DynFut}, messaging::Reportable, models::{Agent, AssignedJob, JobMeta, JobType}, utils::{CombinedResult, OneOrVec}}; use guess_host_triple::guess_host_triple; use std::collections::HashMap; pub struct JobBuilder { waiter: Waiter, } impl JobBuilder { pub fn from_request(job_requests: impl OneOrVec) -> CombinedResult { let job_requests = job_requests.into_vec(); let mut prepared: Vec = vec![]; let mut result = CombinedResult::::new(); for req in job_requests { let job_meta = JobCache::get(req.job_id); if job_meta.is_none() { result.err(UError::NoJob(req.job_id).into_bt()); continue; } let job_meta = job_meta.unwrap(); //waiting for try-blocks stabilization let built_req = (|| -> UResult<()> { Ok(match job_meta.exec_type { JobType::Shell => { let meta = JobCache::get(req.job_id).ok_or(UError::NoJob(req.job_id))?; let curr_platform = guess_host_triple().unwrap_or("unknown").to_string(); //TODO: extend platform checking (partial check) if meta.platform != curr_platform { return Err(UError::InsuitablePlatform( meta.platform.clone(), curr_platform, ).into()); } let job = AssignedJob::new(req.job_id, Some(&req)); prepared.push(Box::pin(job.run())) } JobType::Manage => prepared.push(Box::pin(Agent::run())), _ => todo!(), }) })(); if let Err(e) = built_req { result.err(e) } } result.ok(Self { waiter: Waiter::new(prepared), }); result } pub fn from_meta(job_metas: impl OneOrVec) -> CombinedResult { let job_requests = job_metas .into_vec() .into_iter() .map(|jm| { let j_uid = jm.id; JobCache::insert(jm); AssignedJob::new(j_uid, None) }) .collect::>(); JobBuilder::from_request(job_requests) } /// Spawn jobs and pop results later pub async fn spawn(mut self) -> Self { self.waiter = self.waiter.spawn().await; self } /// Spawn jobs and wait for result pub async fn wait(self) -> Vec { self.waiter.spawn().await.wait().await } /// Spawn one job and wait for result pub async fn wait_one(self) -> Reportable { self.waiter.spawn().await.wait().await.pop().unwrap() } } /// Store jobs and get results by name pub struct NamedJobBuilder { builder: Option, job_names: Vec<&'static str>, results: HashMap<&'static str, Reportable>, } impl NamedJobBuilder { pub fn from_shell( named_jobs: impl OneOrVec<(&'static str, &'static str)>, ) -> CombinedResult { let mut result = CombinedResult::new(); let jobs: Vec<(&'static str, JobMeta)> = named_jobs .into_vec() .into_iter() .filter_map( |(alias, cmd)| match JobMeta::builder().with_shell(cmd).build() { Ok(meta) => Some((alias, meta)), Err(e) => { result.err(e); None } }, ) .collect(); result.ok(Self::from_meta(jobs)); result } pub fn from_meta(named_jobs: impl OneOrVec<(&'static str, JobMeta)>) -> Self { let mut job_names = vec![]; let job_metas: Vec = named_jobs .into_vec() .into_iter() .map(|(alias, meta)| { job_names.push(alias); meta }) .collect(); Self { builder: Some(JobBuilder::from_meta(job_metas).unwrap_one()), job_names, results: HashMap::new(), } } pub async fn wait(mut self) -> Self { let results = self.builder.take().unwrap().wait().await; for (name, result) in self.job_names.iter().zip(results.into_iter()) { self.results.insert(name, result); } self } pub fn pop_opt(&mut self, name: &'static str) -> Option { self.results.remove(name) } pub fn pop(&mut self, name: &'static str) -> Reportable { self.pop_opt(name).unwrap() } } #[cfg(test)] mod tests { use super::*; use test_case::test_case; use std::{time::SystemTime}; use crate::{ errors::UError, models::{ JobMeta, misc::JobType }, builder::{JobBuilder, NamedJobBuilder}, unwrap_enum, }; type TestResult = Result>; #[tokio::test] async fn test_is_really_async() { const SLEEP_SECS: u64 = 1; let job = JobMeta::from_shell(format!("sleep {}", SLEEP_SECS)).unwrap(); let sleep_jobs: Vec = (0..50).map(|_| job.clone()).collect(); let now = SystemTime::now(); JobBuilder::from_meta(sleep_jobs).unwrap_one().wait().await; assert!(now.elapsed().unwrap().as_secs() < SLEEP_SECS + 2) } #[test_case( "/bin/sh {}", Some(b"echo test01 > /tmp/asd; cat /tmp/asd"), "test01" ;"sh payload" )] #[test_case( r#"/usr/bin/python3 -c 'print("test02")'"#, None, "test02" ;"python cmd" )] #[test_case( "/{}", Some( br#"#!/bin/sh TMPPATH=/tmp/lol mkdir -p $TMPPATH echo test03 > $TMPPATH/t cat $TMPPATH/t rm -rf $TMPPATH"# ), "test03" ;"sh multiline payload" )] #[test_case( "/{} 'some msg as arg'", Some(include_bytes!("../tests/fixtures/echoer")), "some msg as arg" ;"standalone binary with args" )] #[tokio::test] async fn test_shell_job(cmd: &str, payload: Option<&[u8]>, expected_result: &str) -> TestResult { let mut job = JobMeta::builder().with_shell(cmd); if let Some(p) = payload { job = job.with_payload(p); } let job = job.build().unwrap(); let job_result = JobBuilder::from_meta(job).unwrap_one().wait_one().await; let result = unwrap_enum!(job_result, Reportable::Assigned); let result = result.to_string_result(); assert_eq!(result.trim(), expected_result); Ok(()) } #[tokio::test] async fn test_complex_load() -> TestResult { const SLEEP_SECS: u64 = 1; let now = SystemTime::now(); let longest_job = JobMeta::from_shell(format!("sleep {}", SLEEP_SECS)).unwrap(); let longest_job = JobBuilder::from_meta(longest_job).unwrap_one().spawn().await; let ls = JobBuilder::from_meta(JobMeta::from_shell("ls")?).unwrap_one() .wait_one() .await; let ls = unwrap_enum!(ls, Reportable::Assigned); assert_eq!(ls.retcode.unwrap(), 0); let folders = ls.to_string_result(); let subfolders_jobs: Vec = folders .lines() .map(|f| JobMeta::from_shell(format!("ls {}", f)).unwrap()) .collect(); let ls_subfolders = JobBuilder::from_meta(subfolders_jobs) .unwrap_one() .wait() .await; for result in ls_subfolders { let result = unwrap_enum!(result, Reportable::Assigned); assert_eq!(result.retcode.unwrap(), 0); } longest_job.wait().await; assert_eq!(now.elapsed().unwrap().as_secs(), SLEEP_SECS); Ok(()) } /* #[tokio::test] async fn test_exec_multiple_jobs_nowait() -> UResult<()> { const REPEATS: usize = 10; let job = JobMeta::from_shell("whoami"); let sleep_jobs: Vec = (0..=REPEATS).map(|_| job.clone()).collect(); build_jobs(sleep_jobs).spawn().await; let mut completed = 0; while completed < REPEATS { let c = pop_completed().await.len(); if c > 0 { completed += c; println!("{}", c); } } Ok(()) } */ #[tokio::test] async fn test_failing_shell_job() -> TestResult { let job = JobMeta::from_shell("lol_kek_puk")?; let job_result = JobBuilder::from_meta(job) .unwrap_one() .wait_one() .await; let job_result = unwrap_enum!(job_result, Reportable::Assigned); let output = job_result.to_string_result(); assert!(output.contains("No such file")); assert!(job_result.retcode.is_none()); Ok(()) } #[test_case( "/bin/bash {}", None, "contains executable" ; "no binary" )] #[test_case( "/bin/bash", Some(b"whoami"), "contains no executable" ; "no path to binary" )] #[tokio::test] async fn test_job_building_failed(cmd: &str, payload: Option<&[u8]>, err_str: &str) -> TestResult { let mut job = JobMeta::builder().with_shell(cmd); if let Some(p) = payload { job = job.with_payload(p); } let err = job.build().unwrap_err(); let err_msg = unwrap_enum!(err.err, UError::JobArgsError); assert!(err_msg.contains(err_str)); Ok(()) } #[tokio::test] async fn test_different_job_types() -> TestResult { let mut jobs = NamedJobBuilder::from_meta(vec![ ("sleeper", JobMeta::from_shell("sleep 3")?), ("gatherer", JobMeta::builder().with_type(JobType::Manage).build()?) ]).wait().await; let gathered = jobs.pop("gatherer"); assert_eq!(unwrap_enum!(gathered, Reportable::Agent).alias, None); Ok(()) } }