@ -1,70 +1,49 @@
use crate ::{
use crate ::{
cache ::JobCache ,
combined_result ::CombinedResult ,
combined_result ::CombinedResult ,
executor ::{ ExecResult , Waiter } ,
executor ::{ ExecResult , Waiter } ,
misc ::OneOrVec ,
misc ::OneOrVec ,
models ::{ Agent , AssignedJob , AssignedJobById , JobMeta , JobType } ,
models ::{ Agent , AssignedJob , AssignedJobById , FatJobMeta , JobType , Payload , ThinJobMeta } ,
platform ::Platform ,
proc_output ::ProcOutput ,
proc_output ::ProcOutput ,
tempfile ::TempFile ,
ufs ,
UError , UResult ,
} ;
} ;
use std ::collections ::HashMap ;
use std ::collections ::HashMap ;
use std ::process ::exit ;
use std ::process ::exit ;
use tokio ::process ::Command ;
use tokio ::process ::Command ;
pub struct JobRunner {
pub struct UnnamedJobsBatch {
waiter : Waiter ,
waiter : Waiter ,
is_running : bool ,
is_running : bool ,
}
}
impl JobRunner {
impl UnnamedJobsBatch {
pub fn from_jobs ( jobs : impl OneOrVec < AssignedJobById > ) -> CombinedResult < Self > {
pub fn from_meta_with_id ( jobs : impl OneOrVec < ( ThinJobMeta , AssignedJobById ) > ) -> Self {
let jobs = jobs . into_vec ( ) ;
let jobs = jobs . into_vec ( ) ;
let mut waiter = Waiter ::new ( ) ;
let mut waiter = Waiter ::new ( ) ;
let mut result = CombinedResult ::new ( ) ;
for ( meta , job ) in jobs {
for job in jobs {
waiter . push ( run_assigned_job ( meta , job ) ) ;
//waiting for try-blocks stabilization
let built_job : UResult < ( ) > = ( | | {
let meta = JobCache ::get ( job . job_id ) . ok_or ( UError ::NoJob ( job . job_id ) ) ? ;
let curr_platform = Platform ::current ( ) ;
if ! curr_platform . matches ( & meta . platform ) {
return Err ( UError ::InsuitablePlatform (
meta . platform . clone ( ) ,
curr_platform . into_string ( ) ,
) ) ;
}
let job = AssignedJob ::from ( ( & * meta , job ) ) ;
waiter . push ( run_assigned_job ( job ) ) ;
Ok ( ( ) )
} ) ( ) ;
if let Err ( e ) = built_job {
result . err ( e )
}
}
}
result . ok ( Self {
Self {
waiter ,
waiter ,
is_running : false ,
is_running : false ,
} ) ;
}
result
}
}
pub fn from_meta ( metas : impl OneOrVec < JobMeta > ) -> CombinedResult < Self > {
pub fn from_meta ( metas : impl OneOrVec < ThinJobMeta > ) -> Self {
let jobs = metas
let jobs : Vec < _ > = metas
. into_vec ( )
. into_vec ( )
. into_iter ( )
. into_iter ( )
. map ( | jm | {
. map ( | meta | {
let job_id = jm . id ;
let job_id = meta . id ;
if ! JobCache ::contains ( job_id ) {
(
JobCache ::insert ( jm ) ;
meta ,
}
AssignedJobById {
AssignedJobById {
job_id ,
job_id ,
.. Default ::default ( )
.. Default ::default ( )
}
} ,
)
} )
} )
. collect ::< Vec < AssignedJobById > > ( ) ;
. collect ( ) ;
JobRunner ::from_jobs ( jobs )
UnnamedJobsBatch ::from_meta_with_id ( jobs )
}
}
/// Spawn jobs
/// Spawn jobs
@ -90,24 +69,20 @@ impl JobRunner {
}
}
}
}
pub async fn run_assigned_job ( mut job : AssignedJob ) -> ExecResult {
pub async fn run_assigned_job ( meta : ThinJobMeta , ids : AssignedJobById ) -> ExecResult {
match job . exec_type {
let mut job = AssignedJob ::from ( ( & meta , ids ) ) ;
match meta . exec_type {
JobType ::Shell = > {
JobType ::Shell = > {
let ( argv , _payload ) = {
let ( argv , _prepared_payload ) = {
let meta = JobCache ::get ( job . job_id ) . unwrap ( ) ;
if let Some ( ref payload ) = meta . payload {
if let Some ( ref payload ) = meta . payload {
let extracted_payload = match TempFile ::write_exec ( payload ) {
let ( prep_exec , prep_exec_path ) = ufs ::prepare_executable ( payload ) ? ;
Ok ( p ) = > p ,
let argv_with_exec = meta . argv . replace ( "{}" , & prep_exec_path ) ;
Err ( e ) = > return Err ( UError ::Runtime ( e . to_string ( ) ) ) ,
( argv_with_exec , Some ( prep_exec ) )
} ;
(
meta . argv . replace ( "{}" , & extracted_payload . get_path ( ) ) ,
Some ( extracted_payload ) ,
)
} else {
} else {
( meta . argv . clone ( ) , None )
( meta . argv . clone ( ) , None )
}
}
} ;
} ;
let mut split_cmd = shlex ::split ( & argv ) . unwrap ( ) . into_iter ( ) ;
let mut split_cmd = shlex ::split ( & argv ) . unwrap ( ) . into_iter ( ) ;
let cmd = split_cmd . nth ( 0 ) . unwrap ( ) ;
let cmd = split_cmd . nth ( 0 ) . unwrap ( ) ;
let args = split_cmd . collect ::< Vec < String > > ( ) ;
let args = split_cmd . collect ::< Vec < String > > ( ) ;
@ -119,7 +94,7 @@ pub async fn run_assigned_job(mut job: AssignedJob) -> ExecResult {
) ,
) ,
Err ( e ) = > (
Err ( e ) = > (
ProcOutput ::new ( )
ProcOutput ::new ( )
. stderr ( e . to_string ( ) . into_bytes ( ) )
. set_s tderr ( e . to_string ( ) . into_bytes ( ) )
. into_vec ( ) ,
. into_vec ( ) ,
None ,
None ,
) ,
) ,
@ -138,61 +113,116 @@ pub async fn run_assigned_job(mut job: AssignedJob) -> ExecResult {
Ok ( job )
Ok ( job )
}
}
pub fn fat_meta_to_thin ( meta : FatJobMeta < true > ) -> Result < ThinJobMeta , ufs ::Error > {
let payload_ident = if let Some ( mut payload ) = meta . payload {
let job_name = match & meta . alias {
Some ( a ) = > a . to_string ( ) ,
None = > meta . id . simple ( ) . to_string ( ) ,
} ;
payload . write_self_into ( & job_name ) ? ;
Some ( job_name )
} else {
None
} ;
Ok ( ThinJobMeta {
alias : meta . alias ,
argv : meta . argv ,
id : meta . id ,
exec_type : meta . exec_type ,
platform : meta . platform ,
payload : payload_ident ,
schedule : meta . schedule ,
} )
}
pub fn thin_meta_to_fat ( meta : ThinJobMeta ) -> Result < FatJobMeta < true > , ufs ::Error > {
let payload = if let Some ( payload ) = meta . payload {
let mut fat_payload = Payload ::Ident ( payload ) ;
fat_payload . read_into_self ( ) ? ;
Some ( fat_payload )
} else {
None
} ;
Ok ( FatJobMeta {
alias : meta . alias ,
argv : meta . argv ,
id : meta . id ,
exec_type : meta . exec_type ,
platform : meta . platform ,
payload ,
schedule : meta . schedule ,
} )
}
/// Store jobs and get results by name
/// Store jobs and get results by name
pub struct NamedJobRunner {
pub struct NamedJobsBatch < const FINISHED : bool = false > {
runner : Option < JobRunner > ,
runner : Option < UnnamedJobsBatch > ,
job_names : Vec < & ' static str > ,
job_names : Vec < String > ,
results : HashMap < & ' static str , ExecResult > ,
results : HashMap < String , ExecResult > ,
}
}
impl NamedJobRunner {
impl NamedJobsBatch {
pub fn from_shell (
pub fn from_shell (
named_jobs : impl OneOrVec < ( & ' static str , & ' static str ) > ,
named_jobs : impl OneOrVec < ( & ' static str , & ' static str ) > ,
) -> CombinedResult < Self > {
) -> CombinedResult < Self > {
let mut result = CombinedResult ::new ( ) ;
let mut result = CombinedResult ::new ( ) ;
let jobs : Vec < ( & ' static str , JobMeta ) > = named_jobs
let jobs : Vec < _ > = named_jobs
. into_vec ( )
. into_vec ( )
. into_iter ( )
. into_iter ( )
. filter_map (
. filter_map ( | ( alias , cmd ) | {
| ( alias , cmd ) | match JobMeta ::builder ( ) . with_shell ( cmd ) . build ( ) {
match FatJobMeta ::builder ( )
Ok ( meta ) = > Some ( ( alias , meta ) ) ,
. with_shell ( cmd )
. with_alias ( alias )
. build ( )
{
Ok ( fat_meta ) = > match fat_meta_to_thin ( fat_meta ) {
Ok ( thin_meta ) = > Some ( thin_meta ) ,
Err ( e ) = > {
Err ( e ) = > {
result . err ( e ) ;
result . err ( e ) ;
None
None
}
}
} ,
} ,
)
Err ( e ) = > {
result . err ( e ) ;
None
}
}
} )
. collect ( ) ;
. collect ( ) ;
result . ok ( Self ::from_meta ( jobs ) ) ;
result . ok ( Self ::from_meta ( jobs ) ) ;
result
result
}
}
pub fn from_meta ( named_jobs : impl OneOrVec < ( & ' static str , JobMeta ) > ) -> Self {
pub fn from_meta ( named_jobs : impl OneOrVec < ThinJobMeta > ) -> Self {
let mut job_names = vec! [ ] ;
let ( job_names , job_metas ) : ( Vec < _ > , Vec < _ > ) = named_jobs
let job_metas : Vec < JobMeta > = named_jobs
. into_vec ( )
. into_vec ( )
. into_iter ( )
. into_iter ( )
. map ( | ( alias , mut meta ) | {
. map ( | meta | ( meta . alias . clone ( ) . unwrap ( ) , meta ) )
job_names . push ( alias ) ;
. unzip ( ) ;
meta . alias = Some ( alias . to_string ( ) ) ;
meta
} )
. collect ( ) ;
Self {
Self {
runner : Some ( JobRunner ::from_meta ( job_metas ) . unwrap_one ( ) ) ,
runner : Some ( UnnamedJobsBatch ::from_meta ( job_metas ) ) ,
job_names ,
job_names ,
results : HashMap ::new ( ) ,
results : HashMap ::new ( ) ,
}
}
}
}
pub async fn wait ( mut self ) -> Self {
pub async fn wait ( mut self ) -> NamedJobsBatch < true > {
let results = self . runner . take ( ) . unwrap ( ) . wait ( ) . await ;
let results = self . runner . take ( ) . unwrap ( ) . wait ( ) . await ;
for ( name , result ) in self . job_names . iter ( ) . zip ( results . into_iter ( ) ) {
for ( name , result ) in self . job_names . into_i ter ( ) . zip ( results . into_iter ( ) ) {
self . results . insert ( name , result ) ;
self . results . insert ( name , result ) ;
}
}
self
NamedJobsBatch ::< true > {
runner : None ,
job_names : vec ! [ ] ,
results : self . results ,
}
}
}
}
impl NamedJobsBatch < true > {
pub fn pop_opt ( & mut self , name : & ' static str ) -> Option < ExecResult > {
pub fn pop_opt ( & mut self , name : & ' static str ) -> Option < ExecResult > {
self . results . remove ( name )
self . results . remove ( name )
}
}
@ -206,9 +236,9 @@ impl NamedJobRunner {
mod tests {
mod tests {
use super ::* ;
use super ::* ;
use crate ::{
use crate ::{
models ::{ misc ::JobType , JobMeta } ,
jobs ::{ NamedJobsBatch , UnnamedJobsBatch } ,
runner ::{ JobRunner , NamedJobRunner } ,
models ::{ misc ::JobType , FatJobMeta } ,
unwrap_enum ,
unwrap_enum , UError ,
} ;
} ;
use std ::time ::SystemTime ;
use std ::time ::SystemTime ;
@ -217,10 +247,13 @@ mod tests {
#[ tokio::test ]
#[ tokio::test ]
async fn test_is_really_async ( ) {
async fn test_is_really_async ( ) {
const SLEEP_SECS : u64 = 1 ;
const SLEEP_SECS : u64 = 1 ;
let job = JobMeta ::from_shell ( format! ( "sleep {}" , SLEEP_SECS ) ) . unwrap ( ) ;
let job = FatJobMeta ::from_shell ( format! ( "sleep {SLEEP_SECS}" ) ) . unwrap ( ) ;
let sleep_jobs : Vec < JobMeta > = ( 0 .. 50 ) . map ( | _ | job . clone ( ) ) . collect ( ) ;
let sleep_jobs = ( 0 .. 50 )
. map ( | _ | fat_meta_to_thin ( job . clone ( ) ) . unwrap ( ) )
. collect ::< Vec < _ > > ( ) ;
let now = SystemTime ::now ( ) ;
let now = SystemTime ::now ( ) ;
JobRunner ::from_meta ( sleep_jobs ) . unwrap_one ( ) . wait ( ) . await ;
UnnamedJobsBatch ::from_meta ( sleep_jobs ) . wait ( ) . await ;
assert! ( now . elapsed ( ) . unwrap ( ) . as_secs ( ) < SLEEP_SECS + 2 )
assert! ( now . elapsed ( ) . unwrap ( ) . as_secs ( ) < SLEEP_SECS + 2 )
}
}
@ -254,16 +287,12 @@ mod tests {
#[ case ] payload : Option < & [ u8 ] > ,
#[ case ] payload : Option < & [ u8 ] > ,
#[ case ] expected_result : & str ,
#[ case ] expected_result : & str ,
) -> TestResult {
) -> TestResult {
let mut job = JobMeta ::builder ( ) . with_shell ( cmd ) ;
let mut job = Fat JobMeta ::builder ( ) . with_shell ( cmd ) ;
if let Some ( p ) = payload {
if let Some ( p ) = payload {
job = job . with_payload ( p ) ;
job = job . with_payload ( p ) ;
}
}
let job = job . build ( ) . unwrap ( ) ;
let job = fat_meta_to_thin ( job . build ( ) . unwrap ( ) ) . unwrap ( ) ;
let result = JobRunner ::from_meta ( job )
let result = UnnamedJobsBatch ::from_meta ( job ) . wait_one ( ) . await . unwrap ( ) ;
. unwrap_one ( )
. wait_one ( )
. await
. unwrap ( ) ;
let result = result . to_str_result ( ) ;
let result = result . to_str_result ( ) ;
assert_eq! ( result . trim ( ) , expected_result ) ;
assert_eq! ( result . trim ( ) , expected_result ) ;
Ok ( ( ) )
Ok ( ( ) )
@ -273,26 +302,31 @@ mod tests {
async fn test_complex_load ( ) -> TestResult {
async fn test_complex_load ( ) -> TestResult {
const SLEEP_SECS : u64 = 1 ;
const SLEEP_SECS : u64 = 1 ;
let now = SystemTime ::now ( ) ;
let now = SystemTime ::now ( ) ;
let longest_job = JobMeta ::from_shell ( format! ( "sleep {}" , SLEEP_SECS ) ) . unwrap ( ) ;
let longest_job = FatJobMeta ::from_shell ( format! ( "sleep {}" , SLEEP_SECS ) ) . unwrap ( ) ;
let longest_job = JobRunner ::from_meta ( longest_job ) . unwrap_one ( ) . spawn ( ) . await ;
let longest_job = UnnamedJobsBatch ::from_meta ( fat_meta_to_thin ( longest_job ) . unwrap ( ) )
let ls = JobRunner ::from_meta ( JobMeta ::from_shell ( "ls" ) ? )
. spawn ( )
. unwrap_one ( )
. await ;
let ls = UnnamedJobsBatch ::from_meta (
fat_meta_to_thin ( FatJobMeta ::from_shell ( "ls" ) . unwrap ( ) ) . unwrap ( ) ,
)
. wait_one ( )
. wait_one ( )
. await
. await
. unwrap ( ) ;
. unwrap ( ) ;
assert_eq! ( ls . retcode . unwrap ( ) , 0 ) ;
assert_eq! ( ls . retcode . unwrap ( ) , 0 ) ;
let folders = ls . to_str_result ( ) ;
let folders = ls . to_str_result ( ) ;
let subfolders_jobs : Vec < JobMeta > = folders
let subfolders_jobs = folders
. lines ( )
. lines ( )
. map ( | f | JobMeta ::from_shell ( format! ( "ls {}" , f ) ) . unwrap ( ) )
. map ( | f | fat_meta_to_thin ( FatJobMeta ::from_shell ( format! ( "ls {f}" ) ) . unwrap ( ) ) . unwrap ( ) )
. collect ( ) ;
. collect ::< Vec < _ > > ( ) ;
let ls_subfolders = JobRunner ::from_meta ( subfolders_jobs )
. unwrap_one ( )
let ls_subfolders = UnnamedJobsBatch ::from_meta ( subfolders_jobs ) . wait ( ) . await ;
. wait ( )
. await ;
for result in ls_subfolders {
for result in ls_subfolders {
assert_eq! ( result . unwrap ( ) . retcode . unwrap ( ) , 0 ) ;
assert_eq! ( result . unwrap ( ) . retcode . unwrap ( ) , 0 ) ;
}
}
longest_job . wait ( ) . await ;
longest_job . wait ( ) . await ;
assert_eq! ( now . elapsed ( ) . unwrap ( ) . as_secs ( ) , SLEEP_SECS ) ;
assert_eq! ( now . elapsed ( ) . unwrap ( ) . as_secs ( ) , SLEEP_SECS ) ;
Ok ( ( ) )
Ok ( ( ) )
@ -317,12 +351,8 @@ mod tests {
* /
* /
#[ tokio::test ]
#[ tokio::test ]
async fn test_failing_shell_job ( ) -> TestResult {
async fn test_failing_shell_job ( ) -> TestResult {
let job = JobMeta ::from_shell ( "lol_kek_puk" ) ? ;
let job = fat_meta_to_thin ( FatJobMeta ::from_shell ( "lol_kek_puk" ) . unwrap ( ) ) . unwrap ( ) ;
let job_result = JobRunner ::from_meta ( job )
let job_result = UnnamedJobsBatch ::from_meta ( job ) . wait_one ( ) . await . unwrap ( ) ;
. unwrap_one ( )
. wait_one ( )
. await
. unwrap ( ) ;
let output = job_result . to_str_result ( ) ;
let output = job_result . to_str_result ( ) ;
assert! ( output . contains ( "No such file" ) ) ;
assert! ( output . contains ( "No such file" ) ) ;
assert! ( job_result . retcode . is_none ( ) ) ;
assert! ( job_result . retcode . is_none ( ) ) ;
@ -338,29 +368,39 @@ mod tests {
#[ case ] payload : Option < & [ u8 ] > ,
#[ case ] payload : Option < & [ u8 ] > ,
#[ case ] err_str : & str ,
#[ case ] err_str : & str ,
) -> TestResult {
) -> TestResult {
let mut job = JobMeta ::builder ( ) . with_shell ( cmd ) ;
let mut job = Fat JobMeta ::builder ( ) . with_shell ( cmd ) ;
if let Some ( p ) = payload {
if let Some ( p ) = payload {
job = job . with_payload ( p ) ;
job = job . with_payload ( p ) ;
}
}
let err = job . build ( ) . unwrap_err ( ) ;
let err = job . build ( ) . unwrap_err ( ) ;
let err_msg = unwrap_enum ! ( err , UError ::JobArgs Error ) ;
let err_msg = unwrap_enum ! ( err , UError ::JobBuild Error ) ;
assert! ( err_msg . contains ( err_str ) ) ;
assert! ( err_msg . contains ( err_str ) ) ;
Ok ( ( ) )
Ok ( ( ) )
}
}
#[ tokio::test ]
#[ tokio::test ]
async fn test_different_job_types ( ) -> TestResult {
async fn test_different_job_types ( ) -> TestResult {
let mut jobs = NamedJobRunner ::from_meta ( vec! [
let mut jobs = NamedJobsBatch ::from_meta (
( "sleeper" , JobMeta ::from_shell ( "sleep 3" ) ? ) ,
[
(
FatJobMeta ::builder ( )
"gatherer" ,
. with_shell ( "sleep 3" )
JobMeta ::builder ( ) . with_type ( JobType ::Init ) . build ( ) ? ,
. with_alias ( "sleeper" )
) ,
. build ( )
] )
. unwrap ( ) ,
FatJobMeta ::builder ( )
. with_type ( JobType ::Init )
. with_alias ( "gatherer" )
. build ( )
. unwrap ( ) ,
]
. into_iter ( )
. map ( | meta | fat_meta_to_thin ( meta ) . unwrap ( ) )
. collect ::< Vec < _ > > ( ) ,
)
. wait ( )
. wait ( )
. await ;
. await ;
let gathered = jobs . pop ( "gatherer" ) ;
let gathered = jobs . pop ( "gatherer" ) . unwrap ( ) ;
assert_eq! ( gathered . unwrap ( ) . alias , None ) ;
assert_eq! ( gathered . alias . unwrap ( ) , "gatherer" ) ;
Ok ( ( ) )
Ok ( ( ) )
}
}
}
}