You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
321 lines
10 KiB
321 lines
10 KiB
use crate::{ |
|
UError, |
|
UErrorBt, |
|
UResult, |
|
cache::JobCache, |
|
executor::{Waiter, DynFut}, |
|
messaging::Reportable, |
|
models::{Agent, AssignedJob, JobMeta, JobType}, |
|
utils::{CombinedResult, OneOrVec} |
|
}; |
|
use guess_host_triple::guess_host_triple; |
|
use std::collections::HashMap; |
|
|
|
pub struct JobBuilder { |
|
waiter: Waiter, |
|
} |
|
|
|
impl JobBuilder { |
|
pub fn from_request(job_requests: impl OneOrVec<AssignedJob>) -> CombinedResult<Self> { |
|
let job_requests = job_requests.into_vec(); |
|
let mut prepared: Vec<DynFut> = vec![]; |
|
let mut result = CombinedResult::<JobBuilder, UErrorBt>::new(); |
|
for req in job_requests { |
|
let job_meta = JobCache::get(req.job_id); |
|
if job_meta.is_none() { |
|
result.err(UError::NoJob(req.job_id).into_bt()); |
|
continue; |
|
} |
|
let job_meta = job_meta.unwrap(); |
|
//waiting for try-blocks stabilization |
|
let built_req = (|| -> UResult<()> { |
|
Ok(match job_meta.exec_type { |
|
JobType::Shell => { |
|
let meta = JobCache::get(req.job_id).ok_or(UError::NoJob(req.job_id))?; |
|
let curr_platform = guess_host_triple().unwrap_or("unknown").to_string(); |
|
//TODO: extend platform checking (partial check) |
|
if meta.platform != curr_platform { |
|
return Err(UError::InsuitablePlatform( |
|
meta.platform.clone(), |
|
curr_platform, |
|
).into()); |
|
} |
|
let job = AssignedJob::new(req.job_id, Some(&req)); |
|
prepared.push(Box::pin(job.run())) |
|
} |
|
JobType::Manage => prepared.push(Box::pin(Agent::run())), |
|
_ => todo!(), |
|
}) |
|
})(); |
|
if let Err(e) = built_req { |
|
result.err(e) |
|
} |
|
} |
|
result.ok(Self { |
|
waiter: Waiter::new(prepared), |
|
}); |
|
result |
|
} |
|
|
|
pub fn from_meta(job_metas: impl OneOrVec<JobMeta>) -> CombinedResult<Self> { |
|
let job_requests = job_metas |
|
.into_vec() |
|
.into_iter() |
|
.map(|jm| { |
|
let j_uid = jm.id; |
|
JobCache::insert(jm); |
|
AssignedJob::new(j_uid, None) |
|
}) |
|
.collect::<Vec<AssignedJob>>(); |
|
JobBuilder::from_request(job_requests) |
|
} |
|
|
|
/// Spawn jobs and pop results later |
|
pub async fn spawn(mut self) -> Self { |
|
self.waiter = self.waiter.spawn().await; |
|
self |
|
} |
|
|
|
/// Spawn jobs and wait for result |
|
pub async fn wait(self) -> Vec<Reportable> { |
|
self.waiter.spawn().await.wait().await |
|
} |
|
|
|
/// Spawn one job and wait for result |
|
pub async fn wait_one(self) -> Reportable { |
|
self.waiter.spawn().await.wait().await.pop().unwrap() |
|
} |
|
} |
|
|
|
/// Store jobs and get results by name |
|
pub struct NamedJobBuilder { |
|
builder: Option<JobBuilder>, |
|
job_names: Vec<&'static str>, |
|
results: HashMap<&'static str, Reportable>, |
|
} |
|
|
|
impl NamedJobBuilder { |
|
pub fn from_shell( |
|
named_jobs: impl OneOrVec<(&'static str, &'static str)>, |
|
) -> CombinedResult<Self> { |
|
let mut result = CombinedResult::new(); |
|
let jobs: Vec<(&'static str, JobMeta)> = named_jobs |
|
.into_vec() |
|
.into_iter() |
|
.filter_map( |
|
|(alias, cmd)| match JobMeta::builder().with_shell(cmd).build() { |
|
Ok(meta) => Some((alias, meta)), |
|
Err(e) => { |
|
result.err(e); |
|
None |
|
} |
|
}, |
|
) |
|
.collect(); |
|
result.ok(Self::from_meta(jobs)); |
|
result |
|
} |
|
|
|
pub fn from_meta(named_jobs: impl OneOrVec<(&'static str, JobMeta)>) -> Self { |
|
let mut job_names = vec![]; |
|
let job_metas: Vec<JobMeta> = named_jobs |
|
.into_vec() |
|
.into_iter() |
|
.map(|(alias, meta)| { |
|
job_names.push(alias); |
|
meta |
|
}) |
|
.collect(); |
|
Self { |
|
builder: Some(JobBuilder::from_meta(job_metas).unwrap_one()), |
|
job_names, |
|
results: HashMap::new(), |
|
} |
|
} |
|
|
|
pub async fn wait(mut self) -> Self { |
|
let results = self.builder.take().unwrap().wait().await; |
|
for (name, result) in self.job_names.iter().zip(results.into_iter()) { |
|
self.results.insert(name, result); |
|
} |
|
self |
|
} |
|
|
|
pub fn pop_opt(&mut self, name: &'static str) -> Option<Reportable> { |
|
self.results.remove(name) |
|
} |
|
|
|
pub fn pop(&mut self, name: &'static str) -> Reportable { |
|
self.pop_opt(name).unwrap() |
|
} |
|
} |
|
|
|
#[cfg(test)] |
|
mod tests { |
|
|
|
use super::*; |
|
use test_case::test_case; |
|
use std::{time::SystemTime}; |
|
use crate::{ |
|
errors::UError, |
|
models::{ |
|
JobMeta, |
|
misc::JobType |
|
}, |
|
builder::{JobBuilder, NamedJobBuilder}, |
|
unwrap_enum, |
|
}; |
|
|
|
type TestResult<R = ()> = Result<R, Box<dyn std::error::Error>>; |
|
|
|
#[tokio::test] |
|
async fn test_is_really_async() { |
|
const SLEEP_SECS: u64 = 1; |
|
let job = JobMeta::from_shell(format!("sleep {}", SLEEP_SECS)).unwrap(); |
|
let sleep_jobs: Vec<JobMeta> = (0..50).map(|_| job.clone()).collect(); |
|
let now = SystemTime::now(); |
|
JobBuilder::from_meta(sleep_jobs).unwrap_one().wait().await; |
|
assert!(now.elapsed().unwrap().as_secs() < SLEEP_SECS + 2) |
|
} |
|
|
|
#[test_case( |
|
"/bin/sh {}", |
|
Some(b"echo test01 > /tmp/asd; cat /tmp/asd"), |
|
"test01" |
|
;"sh payload" |
|
)] |
|
#[test_case( |
|
r#"/usr/bin/python3 -c 'print("test02")'"#, |
|
None, |
|
"test02" |
|
;"python cmd" |
|
)] |
|
#[test_case( |
|
"/{}", |
|
Some( |
|
br#"#!/bin/sh |
|
TMPPATH=/tmp/lol |
|
mkdir -p $TMPPATH |
|
echo test03 > $TMPPATH/t |
|
cat $TMPPATH/t |
|
rm -rf $TMPPATH"# |
|
), |
|
"test03" |
|
;"sh multiline payload" |
|
)] |
|
#[test_case( |
|
"/{} 'some msg as arg'", |
|
Some(include_bytes!("../tests/fixtures/echoer")), |
|
"some msg as arg" |
|
;"standalone binary with args" |
|
)] |
|
#[tokio::test] |
|
async fn test_shell_job(cmd: &str, payload: Option<&[u8]>, expected_result: &str) -> TestResult { |
|
let mut job = JobMeta::builder().with_shell(cmd); |
|
if let Some(p) = payload { |
|
job = job.with_payload(p); |
|
} |
|
let job = job.build().unwrap(); |
|
let job_result = JobBuilder::from_meta(job).unwrap_one().wait_one().await; |
|
let result = unwrap_enum!(job_result, Reportable::Assigned); |
|
let result = result.to_string_result(); |
|
assert_eq!(result.trim(), expected_result); |
|
Ok(()) |
|
} |
|
|
|
#[tokio::test] |
|
async fn test_complex_load() -> TestResult { |
|
const SLEEP_SECS: u64 = 1; |
|
let now = SystemTime::now(); |
|
let longest_job = JobMeta::from_shell(format!("sleep {}", SLEEP_SECS)).unwrap(); |
|
let longest_job = JobBuilder::from_meta(longest_job).unwrap_one().spawn().await; |
|
let ls = JobBuilder::from_meta(JobMeta::from_shell("ls")?).unwrap_one() |
|
.wait_one() |
|
.await; |
|
let ls = unwrap_enum!(ls, Reportable::Assigned); |
|
assert_eq!(ls.retcode.unwrap(), 0); |
|
let folders = ls.to_string_result(); |
|
let subfolders_jobs: Vec<JobMeta> = folders |
|
.lines() |
|
.map(|f| JobMeta::from_shell(format!("ls {}", f)).unwrap()) |
|
.collect(); |
|
let ls_subfolders = JobBuilder::from_meta(subfolders_jobs) |
|
.unwrap_one() |
|
.wait() |
|
.await; |
|
for result in ls_subfolders { |
|
let result = unwrap_enum!(result, Reportable::Assigned); |
|
assert_eq!(result.retcode.unwrap(), 0); |
|
} |
|
longest_job.wait().await; |
|
assert_eq!(now.elapsed().unwrap().as_secs(), SLEEP_SECS); |
|
Ok(()) |
|
} |
|
/* |
|
#[tokio::test] |
|
async fn test_exec_multiple_jobs_nowait() -> UResult<()> { |
|
const REPEATS: usize = 10; |
|
let job = JobMeta::from_shell("whoami"); |
|
let sleep_jobs: Vec<JobMeta> = (0..=REPEATS).map(|_| job.clone()).collect(); |
|
build_jobs(sleep_jobs).spawn().await; |
|
let mut completed = 0; |
|
while completed < REPEATS { |
|
let c = pop_completed().await.len(); |
|
if c > 0 { |
|
completed += c; |
|
println!("{}", c); |
|
} |
|
} |
|
Ok(()) |
|
} |
|
*/ |
|
#[tokio::test] |
|
async fn test_failing_shell_job() -> TestResult { |
|
let job = JobMeta::from_shell("lol_kek_puk")?; |
|
let job_result = JobBuilder::from_meta(job) |
|
.unwrap_one() |
|
.wait_one() |
|
.await; |
|
let job_result = unwrap_enum!(job_result, Reportable::Assigned); |
|
let output = job_result.to_string_result(); |
|
assert!(output.contains("No such file")); |
|
assert!(job_result.retcode.is_none()); |
|
Ok(()) |
|
} |
|
|
|
#[test_case( |
|
"/bin/bash {}", |
|
None, |
|
"contains executable" |
|
; "no binary" |
|
)] |
|
#[test_case( |
|
"/bin/bash", |
|
Some(b"whoami"), |
|
"contains no executable" |
|
; "no path to binary" |
|
)] |
|
#[tokio::test] |
|
async fn test_job_building_failed(cmd: &str, payload: Option<&[u8]>, err_str: &str) -> TestResult { |
|
let mut job = JobMeta::builder().with_shell(cmd); |
|
if let Some(p) = payload { |
|
job = job.with_payload(p); |
|
} |
|
let err = job.build().unwrap_err(); |
|
let err_msg = unwrap_enum!(err.err, UError::JobArgsError); |
|
assert!(err_msg.contains(err_str)); |
|
Ok(()) |
|
} |
|
|
|
#[tokio::test] |
|
async fn test_different_job_types() -> TestResult { |
|
let mut jobs = NamedJobBuilder::from_meta(vec![ |
|
("sleeper", JobMeta::from_shell("sleep 3")?), |
|
("gatherer", JobMeta::builder().with_type(JobType::Manage).build()?) |
|
]).wait().await; |
|
let gathered = jobs.pop("gatherer"); |
|
assert_eq!(unwrap_enum!(gathered, Reportable::Agent).alias, None); |
|
Ok(()) |
|
} |
|
|
|
} |