From 0cad9c107ac01c9dd122ea114543c26ed926801e Mon Sep 17 00:00:00 2001 From: plazmoid Date: Fri, 26 Feb 2021 20:16:18 +0500 Subject: [PATCH] improved executor and some fixes --- bin/u_agent/Cargo.toml | 2 +- bin/u_agent/src/main.rs | 55 ++++++++++++++++++++++-------------- bin/u_server/src/db.rs | 17 ++++++++--- bin/u_server/src/handlers.rs | 21 +++++++++----- bin/u_server/src/main.rs | 2 +- lib/u_lib/src/api.rs | 4 +-- lib/u_lib/src/executor.rs | 55 ++++++++++++++++++------------------ lib/u_lib/src/models/jobs.rs | 22 +++++++-------- lib/u_lib/src/models/mod.rs | 4 +-- 9 files changed, 105 insertions(+), 77 deletions(-) diff --git a/bin/u_agent/Cargo.toml b/bin/u_agent/Cargo.toml index 88b7024..a0fa747 100644 --- a/bin/u_agent/Cargo.toml +++ b/bin/u_agent/Cargo.toml @@ -11,6 +11,6 @@ tokio = { version = "1.2.0", features = ["macros", "rt-multi-thread", "process", sysinfo = "0.10.5" log = "^0.4" env_logger = "0.8.3" -uuid = "0.8.1" +uuid = "0.6.5" reqwest = { version = "0.11", features = ["json"] } u_lib = { version = "*", path = "../../lib/u_lib" } \ No newline at end of file diff --git a/bin/u_agent/src/main.rs b/bin/u_agent/src/main.rs index 0281762..05d76b9 100644 --- a/bin/u_agent/src/main.rs +++ b/bin/u_agent/src/main.rs @@ -14,9 +14,11 @@ use u_lib::{ api::ClientHandler, models::{gather}, build_jobs_with_result, + pop_completed, UID, - JobResult, - JobCache + ExactJob, + JobCache, + //daemonize }; use tokio::{time::{Duration, sleep}}; @@ -33,6 +35,28 @@ macro_rules! retry_until_ok { } } +async fn process_request(job_requests: Vec, client: &ClientHandler) { + if job_requests.len() > 0 { + for jr in job_requests.iter() { + if !JobCache::contains(&jr.job_id) { + info!("Fetching job: {}", &jr.job_id); + let fetched_job = + retry_until_ok!(client.get_jobs(Some(&jr.job_id)).await).pop().unwrap(); + JobCache::insert(fetched_job); + } + }; + info!( + "Scheduling jobs: \n{}", + job_requests.iter() + .map(|j| j.job_id.to_string()) + .collect::>().join("\n") + ); + build_jobs_with_result(job_requests) + .spawn() + .await; + } +} + #[tokio::main] async fn main() { //daemonize(); @@ -44,26 +68,15 @@ async fn main() { retry_until_ok!(instance.init(&cli_info).await); info!("Instanciated! Running main loop"); loop { - let job_requests: Vec = + let job_requests: Vec = retry_until_ok!(instance.get_agent_jobs(Some(&*UID)).await); - if job_requests.len() > 0 { - for jr in job_requests.iter() { - if !JobCache::contains(&jr.job_id) { - info!("Fetching job: {}", &jr.job_id); - let fetched_job = - retry_until_ok!(instance.get_jobs(Some(&jr.job_id)).await).pop().unwrap(); - JobCache::insert(fetched_job); - } - }; - let result = build_jobs_with_result(job_requests) - .run_until_complete() - .await - .into_iter() - .map(|r| r.unwrap()) - .collect(); - retry_until_ok!(instance.report( - &result - ).await) + process_request(job_requests, &instance).await; + let result: Vec = pop_completed().await + .into_iter() + .map(|r| r.unwrap()) + .collect(); + if result.len() > 0 { + retry_until_ok!(instance.report(&result).await) } sleep(Duration::from_secs(5)).await; } diff --git a/bin/u_server/src/db.rs b/bin/u_server/src/db.rs index 943669b..e2a9f24 100644 --- a/bin/u_server/src/db.rs +++ b/bin/u_server/src/db.rs @@ -67,7 +67,16 @@ impl UDB { Ok(result) } - pub fn get_agent_jobs(&self, uid: Option, personal: bool) -> USrvResult> { + pub fn update_job_status(&self, uid: Uuid, status: JobState) -> USrvResult<()> { + use schema::results; + diesel::update(results::table) + .filter(results::id.eq(uid)) + .set(results::state.eq(status)) + .execute(&self.conn)?; + Ok(()) + } + + pub fn get_agent_jobs(&self, uid: Option, personal: bool) -> USrvResult> { use schema::results; let mut q = results::table .into_boxed(); @@ -85,7 +94,7 @@ impl UDB { .or_filter(results::id.eq(uid.unwrap())) } - let result = q.load::(&self.conn)?; + let result = q.load::(&self.conn)?; Ok(result) } @@ -111,12 +120,12 @@ impl UDB { return Err(USrvError::NotFound(not_found_jobs.join(", "))); } let job_requests = job_uids.iter().map(|job_uid| { - JobResult { + ExactJob { job_id: *job_uid, agent_id: *agent_uid, ..Default::default() } - }).collect::>(); + }).collect::>(); diesel::insert_into(results::table) .values(&job_requests) .execute(&self.conn)?; diff --git a/bin/u_server/src/handlers.rs b/bin/u_server/src/handlers.rs index 8a586f2..37c58c7 100644 --- a/bin/u_server/src/handlers.rs +++ b/bin/u_server/src/handlers.rs @@ -71,12 +71,19 @@ pub async fn get_agent_jobs( db: Storage, personal: bool) -> Result { - match db.lock() + let result = db.lock() .unwrap() - .get_agent_jobs(uid, personal) { - Ok(r) => Ok(warp::reply::json( - &r.as_message() - )), + .get_agent_jobs(uid, personal); + match result { + Ok(r) => { + let _db = db.lock().unwrap(); + for j in r.iter() { + _db.update_job_status(j.id, JobState::Running).ok(); + } + Ok(warp::reply::json( + &r.as_message() + )) + }, Err(e) => Err(warp::reject()) } } @@ -126,7 +133,7 @@ pub async fn set_jobs( } pub async fn report( - msg: BaseMessage<'_, Vec>, + msg: BaseMessage<'_, Vec>, db: Storage) -> Result { let db = db.lock().unwrap(); @@ -136,7 +143,7 @@ pub async fn report( if id != res.agent_id { continue } - if let Err(e) = res.save_changes::(&db.conn).map_err(USrvError::from) { + if let Err(e) = res.save_changes::(&db.conn).map_err(USrvError::from) { failed.push(e.to_string()) } } diff --git a/bin/u_server/src/main.rs b/bin/u_server/src/main.rs index 771a22a..abd0531 100644 --- a/bin/u_server/src/main.rs +++ b/bin/u_server/src/main.rs @@ -95,7 +95,7 @@ async fn main() { let report = warp::post() .and(warp::path(Paths::report)) - .and(get_content::>()) + .and(get_content::>()) .and(db.clone()) .and_then(handlers::report); diff --git a/lib/u_lib/src/api.rs b/lib/u_lib/src/api.rs index f45e660..ac0fc11 100644 --- a/lib/u_lib/src/api.rs +++ b/lib/u_lib/src/api.rs @@ -182,7 +182,7 @@ impl ClientHandler { // client listing (A) build_handler!(GET get_agents/Uuid() -> Vec); // get jobs for client -build_handler!(GET get_agent_jobs/Uuid() -> Vec); +build_handler!(GET get_agent_jobs/Uuid() -> Vec); // get all available jobs (A) build_handler!(GET get_jobs/Uuid() -> Vec); // add client to server's db @@ -195,4 +195,4 @@ build_handler!(GET del/Uuid() -> String); // POST /set_jobs/Uuid json: Vec build_handler!(POST set_jobs/Uuid(Vec) -> ()); // report job result -build_handler!(POST report(Vec) -> ()); +build_handler!(POST report(Vec) -> ()); diff --git a/lib/u_lib/src/executor.rs b/lib/u_lib/src/executor.rs index 54d02e3..ce6f672 100644 --- a/lib/u_lib/src/executor.rs +++ b/lib/u_lib/src/executor.rs @@ -11,29 +11,26 @@ use lazy_static::lazy_static; use tokio::{ spawn, task::JoinHandle, - //sync::mpsc::{channel, Receiver, Sender} + sync::mpsc::{channel, Receiver, Sender} }; use uuid::Uuid; -pub type FutRes = UResult; +pub type FutRes = UResult; pub type DynFut = BoxFuture<'static, FutRes>; lazy_static! { static ref FUT_RESULTS: Mutex> = Mutex::new(HashMap::new()); - /*static ref FUT_CHANNEL: (Mutex>, Mutex>) = { + static ref FUT_CHANNEL: (Sender, Mutex>) = { spawn(init_receiver()); let (tx, rx) = channel(100); - (Mutex::new(tx), Mutex::new(rx)) - };*/ + (tx, Mutex::new(rx)) + }; } -/* -async fn get_static_channel() -> (Sender, MutexGuard<'static, Receiver>) { - ( - FUT_CHANNEL.0.lock().await.clone(), - FUT_CHANNEL.1.lock().await - ) + +async fn get_sender() -> Sender { + FUT_CHANNEL.0.clone() } -*/ + struct JoinInfo { handle: JoinHandle, completed: bool, @@ -54,26 +51,25 @@ impl Waiter { } pub async fn spawn(mut self) -> Self { - let collectable = self.tasks.len() != 1; + let collectable = true; //TODO: self.tasks.len() != 1; for f in self.tasks.drain(..) { - //eprintln!("before static channel"); - //let tx = get_static_channel().await.0; - //eprintln!("after static channel"); + let tx = get_sender().await; let fid = Uuid::new_v4(); self.fids.push(fid); - /*let task_wrapper = async move { - //eprintln!("inside wrapper (started): {}", fid); + let task_wrapper = async move { + debug!("inside wrapper (started): {}", fid); let result = f.await; tx.send(fid).await.unwrap(); result - };*/ + }; + debug!("before JoinInfo"); let handle = JoinInfo { - handle: spawn(f), + handle: spawn(task_wrapper), completed: false, collectable }; - //eprintln!("before push: {}", fid); - spawn(async {}).await.ok(); + debug!("before push: {}", fid); + //spawn(async {}).await.ok(); FUT_RESULTS.lock().await.insert(fid, handle); } self @@ -105,15 +101,19 @@ impl Waiter { async fn pop_task(fid: Uuid) -> Option { FUT_RESULTS.lock().await.remove(&fid) } -/* + async fn init_receiver() { - while let Some(fid) = get_static_channel().await.1.recv().await { - if let Some(j) = FUT_RESULTS.lock().await.get_mut(&fid) { - j.completed = true; + while let Some(fid) = FUT_CHANNEL.1.lock().await.recv().await { + //info!("init_receiver: next val: {}", &fid); + if let Some(mut lock) = FUT_RESULTS.try_lock() { + if let Some(j) = lock.get_mut(&fid) { + //info!("init_receiver: marked as completed"); + j.completed = true; + } } } } -*/ + pub async fn pop_task_if_completed(fid: Uuid) -> Option { let &mut JoinInfo {handle: _, collectable, completed} = match FUT_RESULTS .lock() @@ -122,7 +122,6 @@ pub async fn pop_task_if_completed(fid: Uuid) -> Option { Some(t) => t, None => return None }; - //eprint!("{}, {}: ", &fid, *collectable); if collectable && completed { let task = pop_task(fid).await.unwrap(); let result = task.handle.await.unwrap(); diff --git a/lib/u_lib/src/models/jobs.rs b/lib/u_lib/src/models/jobs.rs index e927d8c..76efbbd 100644 --- a/lib/u_lib/src/models/jobs.rs +++ b/lib/u_lib/src/models/jobs.rs @@ -272,7 +272,7 @@ impl Default for JobMeta { AsChangeset, )] #[table_name = "results"] -pub struct JobResult { +pub struct ExactJob { pub agent_id: Uuid, pub created: SystemTime, pub id: Uuid, @@ -283,7 +283,7 @@ pub struct JobResult { pub updated: SystemTime, } -impl fmt::Display for JobResult { +impl fmt::Display for ExactJob { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { let mut out = format!("Result {}", self.id); out += &format!("\nAgent {}", self.agent_id); @@ -302,7 +302,7 @@ impl fmt::Display for JobResult { } } -impl JobResult { +impl ExactJob { pub fn from_meta(job_id: Uuid, result_id: Option) -> Self { Self { id: result_id.unwrap_or(Uuid::new_v4()), @@ -315,7 +315,7 @@ impl JobResult { //pub fn as_job_output(&self) -> JobOutput {} } -impl Default for JobResult { +impl Default for ExactJob { fn default() -> Self { Self { agent_id: Uuid::nil(), @@ -333,7 +333,7 @@ impl Default for JobResult { pub struct Job { exec_type: JobType, payload: Option>, - result: JobResult + result: ExactJob } impl Job { @@ -350,14 +350,14 @@ impl Job { Ok(Self { exec_type: job_meta.exec_type, payload: job_meta.payload, - result: JobResult::from_meta(job_meta.id.clone(), Some(result_id)) + result: ExactJob::from_meta(job_meta.id.clone(), Some(result_id)) }) }, _ => todo!() } } - async fn run(mut self) -> UResult { + async fn run(mut self) -> UResult { match self.exec_type { JobType::Shell => { let str_payload = match &self.payload { @@ -406,7 +406,7 @@ impl Job { } } -pub fn build_jobs_with_result>(job_requests: J) -> Waiter { +pub fn build_jobs_with_result>(job_requests: J) -> Waiter { let prepared_jobs = job_requests.into_vec() .into_iter() .filter_map(|jr| -> Option { @@ -433,8 +433,8 @@ pub fn build_jobs>(job_metas: J) -> Waiter { let job_requests = job_metas.into_vec().into_iter().map(|jm| { let j_uid = jm.id; JobCache::insert(jm); - JobResult::from_meta(j_uid, None) - }).collect::>(); + ExactJob::from_meta(j_uid, None) + }).collect::>(); build_jobs_with_result(job_requests) } @@ -442,7 +442,7 @@ pub fn build_jobs>(job_metas: J) -> Waiter { #[cfg(test)] mod tests { use super::*; - use crate::{build_jobs, utils::vec_to_string, pop_completed, spawn_dummy}; + use crate::{build_jobs, utils::vec_to_string, pop_completed}; #[tokio::test] async fn test_is_really_async() { diff --git a/lib/u_lib/src/models/mod.rs b/lib/u_lib/src/models/mod.rs index 98bdcfd..ae54c3d 100644 --- a/lib/u_lib/src/models/mod.rs +++ b/lib/u_lib/src/models/mod.rs @@ -38,11 +38,11 @@ to_message!( Agent, IAgent, JobMeta, - JobResult, + ExactJob, String, Vec, Vec, - Vec, + Vec, Vec, () );