You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 
 

311 lines
10 KiB

use crate::{
cache::JobCache,
executor::{DynFut, Waiter},
messaging::Reportable,
models::{Agent, AssignedJob, JobMeta, JobType, RawJobMeta},
utils::{CombinedResult, OneOrVec},
UError, UResult,
};
use guess_host_triple::guess_host_triple;
use std::collections::HashMap;
/// Prepares job futures and runs them through a [`Waiter`].
///
/// Instances are created with `from_request` or `from_meta` and then driven
/// with `spawn`, `wait` or `wait_one`.
pub struct JobBuilder {
/// Wraps the futures prepared by the constructors; spawned and awaited by
/// `spawn` / `wait` / `wait_one`.
waiter: Waiter,
}
impl JobBuilder {
    /// Prepares the jobs described by `job_requests` for execution.
    ///
    /// The metadata of every request is looked up in [`JobCache`] by `job_id`:
    ///
    /// * no cached metadata: `UError::NoJob` is recorded and the request is skipped;
    /// * `JobType::Shell`: the job's `platform` must be equal to the current host
    ///   triple (`"unknown"` when it cannot be guessed), otherwise
    ///   `UError::InsuitablePlatform` is recorded and the request is skipped;
    /// * `JobType::Manage`: an `Agent::run()` future is scheduled.
    ///
    /// Errors are pushed into the returned [`CombinedResult`] next to a builder
    /// that wraps every future prepared successfully.
    ///
    /// # Panics
    ///
    /// Hits `todo!()` for any job type other than `Shell` and `Manage`.
    pub fn from_request(job_requests: impl OneOrVec<AssignedJob>) -> CombinedResult<Self> {
        let job_requests = job_requests.into_vec();
        let mut prepared: Vec<DynFut> = Vec::with_capacity(job_requests.len());
        let mut result = CombinedResult::<JobBuilder, _>::new();
        // The host triple is the same for every request, so resolve it once.
        let curr_platform = guess_host_triple().unwrap_or("unknown").to_string();
        for req in job_requests {
            let job_meta = match JobCache::get(req.job_id) {
                Some(meta) => meta,
                None => {
                    result.err(UError::NoJob(req.job_id));
                    continue;
                }
            };
            //waiting for try-blocks stabilization
            let built_req = (|| -> UResult<()> {
                match job_meta.exec_type {
                    JobType::Shell => {
                        // `job_meta` is already at hand: no second cache lookup needed.
                        //TODO: extend platform checking (partial check)
                        if job_meta.platform != curr_platform {
                            return Err(UError::InsuitablePlatform(
                                job_meta.platform.clone(),
                                curr_platform.clone(),
                            )
                            .into());
                        }
                        let job = AssignedJob::new(req.job_id, Some(&req));
                        prepared.push(Box::pin(job.run()));
                    }
                    JobType::Manage => prepared.push(Box::pin(Agent::run())),
                    _ => todo!(),
                }
                Ok(())
            })();
            if let Err(e) = built_req {
                result.err(e)
            }
        }
        result.ok(Self {
            waiter: Waiter::new(prepared),
        });
        result
    }

    /// Stores every given meta in [`JobCache`] and builds the jobs from them.
    ///
    /// Each meta is turned into an `AssignedJob` that refers to it by id (without
    /// an originating request) and the batch is handed to [`Self::from_request`].
    pub fn from_meta(job_metas: impl OneOrVec<JobMeta>) -> CombinedResult<Self> {
        let job_requests = job_metas
            .into_vec()
            .into_iter()
            .map(|jm| {
                let j_uid = jm.id;
                JobCache::insert(jm);
                AssignedJob::new(j_uid, None)
            })
            .collect::<Vec<AssignedJob>>();
        JobBuilder::from_request(job_requests)
    }

    /// Spawn jobs and pop results later
    pub async fn spawn(mut self) -> Self {
        self.waiter = self.waiter.spawn().await;
        self
    }

    /// Spawn jobs and wait for result
    pub async fn wait(self) -> Vec<Reportable> {
        self.waiter.spawn().await.wait().await
    }

    /// Spawn one job and wait for result
    ///
    /// The last element of the list returned by the waiter is handed back.
    ///
    /// # Panics
    ///
    /// Panics when the waiter returns no results at all.
    pub async fn wait_one(self) -> Reportable {
        self.wait()
            .await
            .pop()
            .expect("wait_one: the waiter returned no results")
    }
}
/// Store jobs and get results by name
///
/// Jobs are registered under a `&'static str` alias; once `wait` has finished,
/// each result can be taken out by its alias with `pop` / `pop_opt`.
pub struct NamedJobBuilder {
/// Underlying builder; taken out (leaving `None`) when `wait` runs.
builder: Option<JobBuilder>,
/// Job aliases in the order the jobs were passed to `from_meta`.
job_names: Vec<&'static str>,
/// Results keyed by alias, filled in by `wait`.
results: HashMap<&'static str, Reportable>,
}
impl NamedJobBuilder {
    /// Builds named shell jobs from `(alias, command)` pairs.
    ///
    /// A pair whose meta fails to build is dropped and its error is recorded in
    /// the returned [`CombinedResult`]; the remaining pairs are passed to
    /// [`Self::from_meta`].
    pub fn from_shell(
        named_jobs: impl OneOrVec<(&'static str, &'static str)>,
    ) -> CombinedResult<Self> {
        let mut result = CombinedResult::new();
        let jobs: Vec<(&'static str, JobMeta)> = named_jobs
            .into_vec()
            .into_iter()
            .filter_map(
                |(alias, cmd)| match RawJobMeta::builder().with_shell(cmd).build() {
                    Ok(meta) => Some((alias, meta)),
                    Err(e) => {
                        result.err(e);
                        None
                    }
                },
            )
            .collect();
        result.ok(Self::from_meta(jobs));
        result
    }

    /// Builds named jobs from `(alias, meta)` pairs.
    ///
    /// Aliases are remembered in input order so that `wait` can pair them with
    /// the results. The inner builder is obtained with
    /// `JobBuilder::from_meta(..).unwrap_one()`.
    pub fn from_meta(named_jobs: impl OneOrVec<(&'static str, JobMeta)>) -> Self {
        let (job_names, job_metas): (Vec<&'static str>, Vec<JobMeta>) =
            named_jobs.into_vec().into_iter().unzip();
        Self {
            builder: Some(JobBuilder::from_meta(job_metas).unwrap_one()),
            job_names,
            results: HashMap::new(),
        }
    }

    /// Runs all jobs, waits for them and stores each result under its alias.
    ///
    /// Aliases and results are paired positionally. Calling `wait` again on the
    /// returned value is a no-op: the inner builder has already been consumed.
    pub async fn wait(mut self) -> Self {
        if let Some(builder) = self.builder.take() {
            let results = builder.wait().await;
            self.results
                .extend(self.job_names.iter().copied().zip(results));
        }
        self
    }

    /// Removes and returns the result stored under `name`, if there is one.
    pub fn pop_opt(&mut self, name: &str) -> Option<Reportable> {
        self.results.remove(name)
    }

    /// Removes and returns the result stored under `name`.
    ///
    /// # Panics
    ///
    /// Panics when no result is stored under `name`.
    pub fn pop(&mut self, name: &str) -> Reportable {
        self.pop_opt(name)
            .unwrap_or_else(|| panic!("no stored result for job '{}'", name))
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        builder::{JobBuilder, NamedJobBuilder},
        models::{misc::JobType, JobMeta},
        unwrap_enum,
    };
    // `Instant` is monotonic, unlike `SystemTime`, so elapsed-time checks cannot be
    // skewed by wall-clock adjustments and `elapsed()` is infallible.
    use std::time::Instant;

    type TestResult<R = ()> = Result<R, Box<dyn std::error::Error>>;

    // 50 one-second sleeps must finish in well under 50 seconds when run together.
    #[tokio::test]
    async fn test_is_really_async() {
        const SLEEP_SECS: u64 = 1;
        let job = RawJobMeta::from_shell(format!("sleep {}", SLEEP_SECS)).unwrap();
        let sleep_jobs: Vec<JobMeta> = (0..50).map(|_| job.clone()).collect();
        let now = Instant::now();
        JobBuilder::from_meta(sleep_jobs).unwrap_one().wait().await;
        assert!(now.elapsed().as_secs() < SLEEP_SECS + 2)
    }

    // Shell jobs with and without a payload must produce the expected output.
    #[rstest]
    #[case::sh_payload(
        "/bin/sh {}",
        Some(b"echo test01 > /tmp/asd; cat /tmp/asd".as_slice()),
        "test01"
    )]
    #[case::python_cmd(r#"/usr/bin/python3 -c 'print("test02")'"#, None, "test02")]
    #[case::sh_multiline_payload(
        "/{}",
        Some(
            br#"#!/bin/sh
TMPPATH=/tmp/lol
mkdir -p $TMPPATH
echo test03 > $TMPPATH/t
cat $TMPPATH/t
rm -rf $TMPPATH"#.as_slice()
        ),
        "test03"
    )]
    #[case::standalone_binary_with_args(
        "/{} 'some msg as arg'",
        Some(include_bytes!("../tests/fixtures/echoer").as_slice()),
        "some msg as arg"
    )]
    #[tokio::test]
    async fn test_shell_job(
        #[case] cmd: &str,
        #[case] payload: Option<&[u8]>,
        #[case] expected_result: &str,
    ) -> TestResult {
        let mut job = RawJobMeta::builder().with_shell(cmd);
        if let Some(p) = payload {
            job = job.with_payload(p);
        }
        let job = job.build().unwrap();
        let job_result = JobBuilder::from_meta(job).unwrap_one().wait_one().await;
        let result = unwrap_enum!(job_result, Reportable::Assigned);
        let result = result.to_string_result();
        assert_eq!(result.trim(), expected_result);
        Ok(())
    }

    // A spawned long job must keep running while other jobs are built and awaited:
    // the whole scenario has to take about as long as the long job alone.
    #[tokio::test]
    async fn test_complex_load() -> TestResult {
        const SLEEP_SECS: u64 = 1;
        let now = Instant::now();
        let longest_job = RawJobMeta::from_shell(format!("sleep {}", SLEEP_SECS)).unwrap();
        let longest_job = JobBuilder::from_meta(longest_job)
            .unwrap_one()
            .spawn()
            .await;
        let ls = JobBuilder::from_meta(RawJobMeta::from_shell("ls")?)
            .unwrap_one()
            .wait_one()
            .await;
        let ls = unwrap_enum!(ls, Reportable::Assigned);
        assert_eq!(ls.retcode.unwrap(), 0);
        let folders = ls.to_string_result();
        let subfolders_jobs: Vec<JobMeta> = folders
            .lines()
            .map(|f| RawJobMeta::from_shell(format!("ls {}", f)).unwrap())
            .collect();
        let ls_subfolders = JobBuilder::from_meta(subfolders_jobs)
            .unwrap_one()
            .wait()
            .await;
        for result in ls_subfolders {
            let result = unwrap_enum!(result, Reportable::Assigned);
            assert_eq!(result.retcode.unwrap(), 0);
        }
        longest_job.wait().await;
        assert_eq!(now.elapsed().as_secs(), SLEEP_SECS);
        Ok(())
    }

    /*
    #[tokio::test]
    async fn test_exec_multiple_jobs_nowait() -> UResult<()> {
    const REPEATS: usize = 10;
    let job = JobMeta::from_shell("whoami");
    let sleep_jobs: Vec<JobMeta> = (0..=REPEATS).map(|_| job.clone()).collect();
    build_jobs(sleep_jobs).spawn().await;
    let mut completed = 0;
    while completed < REPEATS {
    let c = pop_completed().await.len();
    if c > 0 {
    completed += c;
    println!("{}", c);
    }
    }
    Ok(())
    }
    */

    // A command that does not exist must report an error text and no return code.
    #[tokio::test]
    async fn test_failing_shell_job() -> TestResult {
        let job = RawJobMeta::from_shell("lol_kek_puk")?;
        let job_result = JobBuilder::from_meta(job).unwrap_one().wait_one().await;
        let job_result = unwrap_enum!(job_result, Reportable::Assigned);
        let output = job_result.to_string_result();
        assert!(output.contains("No such file"));
        assert!(job_result.retcode.is_none());
        Ok(())
    }

    // A mismatch between the `{}` placeholder and the payload must fail the build.
    #[rstest]
    #[case::no_binary("/bin/bash {}", None, "contains executable")]
    #[case::no_path_to_binary("/bin/bash", Some(b"whoami".as_slice()), "contains no executable")]
    #[tokio::test]
    async fn test_job_building_failed(
        #[case] cmd: &str,
        #[case] payload: Option<&[u8]>,
        #[case] err_str: &str,
    ) -> TestResult {
        let mut job = RawJobMeta::builder().with_shell(cmd);
        if let Some(p) = payload {
            job = job.with_payload(p);
        }
        let err = job.build().unwrap_err();
        let err_msg = unwrap_enum!(err, UError::JobArgsError);
        assert!(err_msg.contains(err_str));
        Ok(())
    }

    // Shell and manage jobs can be mixed; the manage job must yield an agent report.
    #[tokio::test]
    async fn test_different_job_types() -> TestResult {
        let mut jobs = NamedJobBuilder::from_meta(vec![
            ("sleeper", RawJobMeta::from_shell("sleep 3")?),
            (
                "gatherer",
                RawJobMeta::builder().with_type(JobType::Manage).build()?,
            ),
        ])
        .wait()
        .await;
        let gathered = jobs.pop("gatherer");
        assert_eq!(unwrap_enum!(gathered, Reportable::Agent).alias, None);
        Ok(())
    }
}