From 35ca8d0a78842ad82818a3f0cd9d0bf4650d66cc Mon Sep 17 00:00:00 2001 From: plazmoid Date: Thu, 4 Mar 2021 20:04:29 +0500 Subject: [PATCH] split jobs into submodules, reworked db interface --- .cargo/config.toml | 3 + bin/u_server/Cargo.toml | 1 + bin/u_server/src/db.rs | 148 +++---- bin/u_server/src/errors.rs | 16 - bin/u_server/src/handlers.rs | 136 +++---- bin/u_server/src/main.rs | 118 +++--- lib/u_lib/Cargo.toml | 1 + lib/u_lib/src/api.rs | 31 +- lib/u_lib/src/errors.rs | 23 +- lib/u_lib/src/executor.rs | 44 ++- lib/u_lib/src/lib.rs | 20 +- lib/u_lib/src/models/agent.rs | 87 ++-- lib/u_lib/src/models/jobs.rs | 570 --------------------------- lib/u_lib/src/models/jobs/builder.rs | 116 ++++++ lib/u_lib/src/models/jobs/cache.rs | 47 +++ lib/u_lib/src/models/jobs/meta.rs | 68 ++++ lib/u_lib/src/models/jobs/misc.rs | 164 ++++++++ lib/u_lib/src/models/jobs/mod.rs | 7 + lib/u_lib/src/models/jobs/result.rs | 155 ++++++++ 19 files changed, 830 insertions(+), 925 deletions(-) create mode 100644 .cargo/config.toml delete mode 100644 bin/u_server/src/errors.rs delete mode 100644 lib/u_lib/src/models/jobs.rs create mode 100644 lib/u_lib/src/models/jobs/builder.rs create mode 100644 lib/u_lib/src/models/jobs/cache.rs create mode 100644 lib/u_lib/src/models/jobs/meta.rs create mode 100644 lib/u_lib/src/models/jobs/misc.rs create mode 100644 lib/u_lib/src/models/jobs/mod.rs create mode 100644 lib/u_lib/src/models/jobs/result.rs diff --git a/.cargo/config.toml b/.cargo/config.toml new file mode 100644 index 0000000..664ed6c --- /dev/null +++ b/.cargo/config.toml @@ -0,0 +1,3 @@ +[build] +target = "x86_64-unknown-linux-gnu" # -musl" + diff --git a/bin/u_server/Cargo.toml b/bin/u_server/Cargo.toml index 498adc0..238348b 100644 --- a/bin/u_server/Cargo.toml +++ b/bin/u_server/Cargo.toml @@ -11,6 +11,7 @@ log = "0.4.11" thiserror = "*" warp = "0.2.4" uuid = { version = "0.6.5", features = ["serde", "v4"] } +once_cell = "1.7.2" [dependencies.diesel] features = ["postgres", "uuid"] diff --git a/bin/u_server/src/db.rs b/bin/u_server/src/db.rs index e2a9f24..27fc8dd 100644 --- a/bin/u_server/src/db.rs +++ b/bin/u_server/src/db.rs @@ -1,39 +1,56 @@ -use diesel::{ - pg::PgConnection, - prelude::*, - result::Error as DslError -}; +use diesel::{pg::PgConnection, prelude::*, result::Error as DslError}; use dotenv::dotenv; +use once_cell::sync::OnceCell; use std::{ env, - sync::{Arc, Mutex} -}; -use crate::{ - errors::* + sync::{Arc, Mutex, MutexGuard}, }; use u_lib::{ - models::* + models::{schema, Agent, ExactJob, IAgent, JobMeta, JobState}, + ULocalError, ULocalResult, }; use uuid::Uuid; -pub type Storage = Arc>; - pub struct UDB { - pub conn: PgConnection + pub conn: PgConnection, } -impl UDB { - pub fn new() -> USrvResult { +static DB: OnceCell>> = OnceCell::new(); + +pub fn lock_db() -> MutexGuard<'static, UDB> { + DB.get_or_init(|| { dotenv().unwrap(); let db_path = env::var("DATABASE_URL").unwrap(); let conn = PgConnection::establish(&db_path).unwrap(); - let instance = UDB { - conn + let instance = UDB { conn }; + Arc::new(Mutex::new(instance)) + }) + .lock() + .unwrap() +} + +impl UDB { + pub fn insert_jobs(&self, job_metas: &Vec) -> ULocalResult<()> { + use schema::jobs; + diesel::insert_into(jobs::table) + .values(job_metas) + .execute(&self.conn)?; + Ok(()) + } + + pub fn get_jobs(&self, uid: Option) -> ULocalResult> { + use schema::jobs; + let result = if uid.is_some() { + jobs::table + .filter(jobs::id.eq(uid.unwrap())) + .get_results::(&self.conn)? + } else { + jobs::table.load::(&self.conn)? }; - Ok(Arc::new(Mutex::new(instance))) + Ok(result) } - pub fn new_agent(&self, agent: &IAgent) -> USrvResult<()> { + pub fn insert_agents(&self, agent: &IAgent) -> ULocalResult<()> { use schema::agents; diesel::insert_into(agents::table) .values(agent) @@ -41,33 +58,19 @@ impl UDB { Ok(()) } - pub fn get_agents(&self, uid: Option) -> USrvResult> { + pub fn get_agents(&self, uid: Option) -> ULocalResult> { use schema::agents; let result = if uid.is_some() { agents::table .filter(agents::id.eq(uid.unwrap())) .load::(&self.conn)? } else { - agents::table - .load::(&self.conn)? - }; - Ok(result) - } - - pub fn get_jobs(&self, uid: Option) -> USrvResult> { - use schema::jobs; - let result = if uid.is_some() { - jobs::table - .filter(jobs::id.eq(uid.unwrap())) - .get_results::(&self.conn)? - } else { - jobs::table - .load::(&self.conn)? + agents::table.load::(&self.conn)? }; Ok(result) } - pub fn update_job_status(&self, uid: Uuid, status: JobState) -> USrvResult<()> { + pub fn update_job_status(&self, uid: Uuid, status: JobState) -> ULocalResult<()> { use schema::results; diesel::update(results::table) .filter(results::id.eq(uid)) @@ -76,63 +79,62 @@ impl UDB { Ok(()) } - pub fn get_agent_jobs(&self, uid: Option, personal: bool) -> USrvResult> { + //TODO: filters possibly could work in a wrong way, check + pub fn get_exact_jobs(&self, uid: Option, personal: bool) -> ULocalResult> { use schema::results; - let mut q = results::table - .into_boxed(); + let mut q = results::table.into_boxed(); if uid.is_some() { q = q.filter(results::agent_id.eq(uid.unwrap())) } if personal { q = q.filter( - results::state.eq(JobState::Queued) - .and(results::agent_id.eq(uid.unwrap())) + results::state + .eq(JobState::Queued) + .and(results::agent_id.eq(uid.unwrap())), ) } else if uid.is_some() { - q = q.filter(results::agent_id.eq(uid.unwrap())) + q = q + .filter(results::agent_id.eq(uid.unwrap())) .or_filter(results::job_id.eq(uid.unwrap())) .or_filter(results::id.eq(uid.unwrap())) - } let result = q.load::(&self.conn)?; Ok(result) } - pub fn add_jobs(&self, job_metas: &Vec) -> USrvResult<()> { - use schema::jobs; - diesel::insert_into(jobs::table) - .values(job_metas) - .execute(&self.conn)?; - Ok(()) - } - - pub fn set_jobs_for_agent(&self, agent_uid: &Uuid, job_uids: &Vec) -> USrvResult<()> { + pub fn set_jobs_for_agent(&self, agent_uid: &Uuid, job_uids: &Vec) -> ULocalResult<()> { use schema::{agents::dsl::agents, jobs::dsl::jobs, results}; if let Err(DslError::NotFound) = agents.find(agent_uid).first::(&self.conn) { - return Err(USrvError::NotFound(agent_uid.to_string())) + return Err(ULocalError::NotFound(agent_uid.to_string())); } - let not_found_jobs = job_uids.iter().filter_map(|job_uid| { - if let Err(DslError::NotFound) = jobs.find(job_uid).first::(&self.conn) { - Some(job_uid.to_string()) - } else { None } - }).collect::>(); + let not_found_jobs = job_uids + .iter() + .filter_map(|job_uid| { + if let Err(DslError::NotFound) = jobs.find(job_uid).first::(&self.conn) { + Some(job_uid.to_string()) + } else { + None + } + }) + .collect::>(); if not_found_jobs.len() > 0 { - return Err(USrvError::NotFound(not_found_jobs.join(", "))); + return Err(ULocalError::NotFound(not_found_jobs.join(", "))); } - let job_requests = job_uids.iter().map(|job_uid| { - ExactJob { + let job_requests = job_uids + .iter() + .map(|job_uid| ExactJob { job_id: *job_uid, agent_id: *agent_uid, ..Default::default() - } - }).collect::>(); + }) + .collect::>(); diesel::insert_into(results::table) .values(&job_requests) .execute(&self.conn)?; Ok(()) } - pub fn del_jobs(&self, uids: &Vec) -> USrvResult { + pub fn del_jobs(&self, uids: &Vec) -> ULocalResult { use schema::jobs; let mut affected = 0; for &uid in uids { @@ -144,24 +146,24 @@ impl UDB { Ok(affected) } - pub fn del_agents(&self, uids: &Vec) -> USrvResult { - use schema::agents; + pub fn del_results(&self, uids: &Vec) -> ULocalResult { + use schema::results; let mut affected = 0; for &uid in uids { - let deleted = diesel::delete(agents::table) - .filter(agents::id.eq(uid)) + let deleted = diesel::delete(results::table) + .filter(results::id.eq(uid)) .execute(&self.conn)?; affected += deleted; } Ok(affected) } - pub fn del_results(&self, uids: &Vec) -> USrvResult { - use schema::results; + pub fn del_agents(&self, uids: &Vec) -> ULocalResult { + use schema::agents; let mut affected = 0; for &uid in uids { - let deleted = diesel::delete(results::table) - .filter(results::id.eq(uid)) + let deleted = diesel::delete(agents::table) + .filter(agents::id.eq(uid)) .execute(&self.conn)?; affected += deleted; } @@ -199,4 +201,4 @@ mod tests { ) } } -*/ \ No newline at end of file +*/ diff --git a/bin/u_server/src/errors.rs b/bin/u_server/src/errors.rs deleted file mode 100644 index 64f51c3..0000000 --- a/bin/u_server/src/errors.rs +++ /dev/null @@ -1,16 +0,0 @@ -use thiserror::Error; -use diesel::result::Error as DslError; - -pub type USrvResult = Result; - -#[derive(Error, Debug)] -pub enum USrvError { - #[error("{0} is not found")] - NotFound(String), - - #[error("Error processing {0}")] - ProcessingError(String), - - #[error(transparent)] - DBError(#[from] DslError) -} diff --git a/bin/u_server/src/handlers.rs b/bin/u_server/src/handlers.rs index 37c58c7..da95833 100644 --- a/bin/u_server/src/handlers.rs +++ b/bin/u_server/src/handlers.rs @@ -1,119 +1,78 @@ +use crate::db::{lock_db, UDB}; +use diesel::SaveChangesDsl; use std::fmt::Display; use u_lib::{ - models::* + messaging::{BaseMessage, ToMsg}, + models::{ExactJob, IAgent, JobMeta, JobState}, + ULocalError, }; +use uuid::Uuid; use warp::{ - Rejection, - Reply, - reply::{with_status}, - http::{StatusCode, Response} + http::{Response, StatusCode}, + reply::with_status, + Rejection, Reply, }; -use crate::db::{ - Storage, - UDB, -}; -use uuid::Uuid; -use diesel::SaveChangesDsl; -use crate::errors::USrvError; fn build_response(code: StatusCode, body: S) -> Result, Rejection> { Ok(Response::builder() .status(code) - .body(format!("{}", body)).unwrap()) + .body(format!("{}", body)) + .unwrap()) } fn build_empty_200() -> Result, Rejection> { build_response(StatusCode::OK, "") } -pub async fn add_agent( - msg: BaseMessage<'_, IAgent>, - db: Storage) -> Result -{ - match db.lock() - .unwrap() - .new_agent(&msg.into_inner()) { +pub async fn add_agent(msg: BaseMessage<'_, IAgent>) -> Result { + match lock_db().insert_agents(&msg.into_inner()) { Ok(_) => build_empty_200(), - Err(e) => build_response(StatusCode::BAD_REQUEST, e) + Err(e) => build_response(StatusCode::BAD_REQUEST, e), } } -pub async fn get_agents( - uid: Option, - db: Storage -) -> Result { - match db.lock() - .unwrap() - .get_agents(uid) { - Ok(r) => Ok(warp::reply::json( - &r.as_message() - )), - Err(e) => Err(warp::reject()) +pub async fn get_agents(uid: Option) -> Result { + match lock_db().get_agents(uid) { + Ok(r) => Ok(warp::reply::json(&r.as_message())), + Err(e) => Err(warp::reject()), } } -pub async fn get_jobs( - uid: Option, - db: Storage) -> Result -{ - match db.lock() - .unwrap() - .get_jobs(uid) { - Ok(r) => Ok(warp::reply::json( - &r.as_message() - )), - Err(e) => Err(warp::reject()) +pub async fn get_jobs(uid: Option) -> Result { + match lock_db().get_jobs(uid) { + Ok(r) => Ok(warp::reply::json(&r.as_message())), + Err(e) => Err(warp::reject()), } } -pub async fn get_agent_jobs( - uid: Option, - db: Storage, - personal: bool) -> Result -{ - let result = db.lock() - .unwrap() - .get_agent_jobs(uid, personal); +pub async fn get_agent_jobs(uid: Option, personal: bool) -> Result { + let result = lock_db().get_exact_jobs(uid, personal); match result { Ok(r) => { - let _db = db.lock().unwrap(); + let _db = lock_db(); for j in r.iter() { _db.update_job_status(j.id, JobState::Running).ok(); } - Ok(warp::reply::json( - &r.as_message() - )) - }, - Err(e) => Err(warp::reject()) + Ok(warp::reply::json(&r.as_message())) + } + Err(e) => Err(warp::reject()), } } -pub async fn upload_jobs( - msg: BaseMessage<'_, Vec>, - db: Storage) -> Result -{ - match db.lock() - .unwrap() - .add_jobs(&msg.into_inner()) { +pub async fn upload_jobs(msg: BaseMessage<'_, Vec>) -> Result { + match lock_db().insert_jobs(&msg.into_inner()) { Ok(_) => build_empty_200(), - Err(e) => build_response(StatusCode::BAD_REQUEST, e) + Err(e) => build_response(StatusCode::BAD_REQUEST, e), } } -pub async fn del( - uid: Uuid, - db: Storage) -> Result -{ - let db = db.lock().unwrap(); - let del_fns = &[ - UDB::del_agents, - UDB::del_jobs, - UDB::del_results, - ]; +pub async fn del(uid: Uuid) -> Result { + let db = lock_db(); + let del_fns = &[UDB::del_agents, UDB::del_jobs, UDB::del_results]; for del_fn in del_fns { let affected = del_fn(&db, &vec![uid]).unwrap(); if affected > 0 { - return build_response(StatusCode::OK, affected) + return build_response(StatusCode::OK, affected); } } build_response(StatusCode::BAD_REQUEST, 0) @@ -122,33 +81,30 @@ pub async fn del( pub async fn set_jobs( agent_uid: Uuid, msg: BaseMessage<'_, Vec>, - db: Storage) -> Result -{ - match db.lock() - .unwrap() - .set_jobs_for_agent(&agent_uid, &msg.into_inner()) { +) -> Result { + match lock_db().set_jobs_for_agent(&agent_uid, &msg.into_inner()) { Ok(_) => build_empty_200(), - Err(e) => build_response(StatusCode::BAD_REQUEST, e) + Err(e) => build_response(StatusCode::BAD_REQUEST, e), } } -pub async fn report( - msg: BaseMessage<'_, Vec>, - db: Storage) -> Result -{ - let db = db.lock().unwrap(); +pub async fn report(msg: BaseMessage<'_, Vec>) -> Result { + let db = lock_db(); let id = msg.id; let mut failed = vec![]; for res in msg.into_inner() { if id != res.agent_id { - continue + continue; } - if let Err(e) = res.save_changes::(&db.conn).map_err(USrvError::from) { + if let Err(e) = res + .save_changes::(&db.conn) + .map_err(ULocalError::from) + { failed.push(e.to_string()) } } if failed.len() > 0 { - let err_msg = USrvError::ProcessingError(failed.join(", ")); + let err_msg = ULocalError::ProcessingError(failed.join(", ")); return build_response(StatusCode::BAD_REQUEST, err_msg); } build_empty_200() diff --git a/bin/u_server/src/main.rs b/bin/u_server/src/main.rs index abd0531..d6453df 100644 --- a/bin/u_server/src/main.rs +++ b/bin/u_server/src/main.rs @@ -1,102 +1,89 @@ -mod handlers; mod db; -mod errors; +mod handlers; -use warp::{ - Filter, - Rejection, - Reply, - body -}; +use warp::{body, Filter, Rejection, Reply}; -extern crate log; extern crate env_logger; +extern crate log; -use u_lib::{ - MASTER_PORT, - api::Paths, - models::* -}; -use db::*; -use serde::{ - de::DeserializeOwned -}; +use db::lock_db; +use serde::de::DeserializeOwned; +use u_lib::{api::Paths, config::MASTER_PORT, models::*}; use uuid::Uuid; -fn get_content() - -> impl Filter,), - Error = Rejection> + Clone +fn get_content() -> impl Filter,), Error = Rejection> + Clone where - M: ToMsg + Sync + Send + DeserializeOwned + 'static + M: ToMsg + Sync + Send + DeserializeOwned + 'static, { - body::content_length_limit(1024*64) - .and(body::json::>()) + body::content_length_limit(1024 * 64).and(body::json::>()) } +fn init() { + env_logger::init(); + lock_db(); +} #[tokio::main] async fn main() { - env_logger::init(); - - let base_db = UDB::new().unwrap(); - let db = warp::any().map(move || base_db.clone()); - let infallible_none = |_| async { - Ok::<(Option,), std::convert::Infallible>((None,)) - }; + init(); + let infallible_none = |_| async { Ok::<(Option,), std::convert::Infallible>((None,)) }; let new_agent = warp::post() .and(warp::path(Paths::init)) .and(get_content::()) - .and(db.clone()) .and_then(handlers::add_agent); let get_agents = warp::get() .and(warp::path(Paths::get_agents)) - .and(warp::path::param::().map(Some).or_else(infallible_none)) - .and(db.clone()) + .and( + warp::path::param::() + .map(Some) + .or_else(infallible_none), + ) .and_then(handlers::get_agents); let upload_jobs = warp::post() .and(warp::path(Paths::upload_jobs)) .and(get_content::>()) - .and(db.clone()) .and_then(handlers::upload_jobs); let get_jobs = warp::get() .and(warp::path(Paths::get_jobs)) - .and(warp::path::param::().map(Some).or_else(infallible_none)) - .and(db.clone()) + .and( + warp::path::param::() + .map(Some) + .or_else(infallible_none), + ) .and_then(handlers::get_jobs); let get_agent_jobs = warp::get() .and(warp::path(Paths::get_agent_jobs)) - .and(warp::path::param::().map(Some).or_else(infallible_none)) - .and(db.clone()) - .and_then(|uid, db| handlers::get_agent_jobs(uid, db, false)); + .and( + warp::path::param::() + .map(Some) + .or_else(infallible_none), + ) + .and_then(|uid| handlers::get_agent_jobs(uid, false)); let get_personal_jobs = warp::get() .and(warp::path(Paths::get_agent_jobs)) .and(warp::path::param::().map(Some)) - .and(db.clone()) - .and_then(|uid, db| handlers::get_agent_jobs(uid, db, true)); + .and_then(|uid| handlers::get_agent_jobs(uid, true)); let del = warp::get() .and(warp::path(Paths::del)) .and(warp::path::param::()) - .and(db.clone()) .and_then(handlers::del); let set_jobs = warp::post() .and(warp::path(Paths::set_jobs)) .and(warp::path::param::()) .and(get_content::>()) - .and(db.clone()) .and_then(handlers::set_jobs); let report = warp::post() .and(warp::path(Paths::report)) .and(get_content::>()) - .and(db.clone()) .and_then(handlers::report); let auth_token = warp::header::exact("authorization", "Bearer 123qwe"); @@ -104,32 +91,29 @@ async fn main() { let agent_zone = new_agent .or(get_jobs.clone()) .or(get_personal_jobs) - .or(report) - ; - - let auth_zone = auth_token - .and( - get_agents - .or(get_jobs) - .or(upload_jobs) - .or(del) - .or(set_jobs) - .or(get_agent_jobs) - ); - - let routes = agent_zone - .or(auth_zone); + .or(report); + + let auth_zone = auth_token.and( + get_agents + .or(get_jobs) + .or(upload_jobs) + .or(del) + .or(set_jobs) + .or(get_agent_jobs), + ); + + let routes = agent_zone.or(auth_zone); warp::serve(routes.with(warp::log("warp"))) - .run(([0,0,0,0], MASTER_PORT)).await; + .run(([0, 0, 0, 0], MASTER_PORT)) + .await; } - #[cfg(test)] mod tests { use super::*; -/* - #[tokio::test] - async fn test_gather() { - } -*/ + /* + #[tokio::test] + async fn test_gather() { + } + */ } diff --git a/lib/u_lib/Cargo.toml b/lib/u_lib/Cargo.toml index d9a0376..231741e 100644 --- a/lib/u_lib/Cargo.toml +++ b/lib/u_lib/Cargo.toml @@ -22,6 +22,7 @@ env_logger = "0.8.3" diesel-derive-enum = { version = "1", features = ["postgres"] } chrono = "0.4.19" strum = { version = "0.20", features = ["derive"] } +once_cell = "1.7.2" [dependencies.diesel] version = "1.4.5" diff --git a/lib/u_lib/src/api.rs b/lib/u_lib/src/api.rs index ac0fc11..bf143b8 100644 --- a/lib/u_lib/src/api.rs +++ b/lib/u_lib/src/api.rs @@ -1,22 +1,12 @@ #[allow(non_upper_case_globals)] - use crate::{ - MASTER_SERVER, - MASTER_PORT, + config::{MASTER_PORT, MASTER_SERVER}, models::*, - UResult, - UError, - utils::opt_to_string -}; -use reqwest::{ - Client, - Url, - RequestBuilder -}; -use std::{ - net::Ipv4Addr, - str::FromStr + utils::opt_to_string, + UError, UResult, }; +use reqwest::{Client, RequestBuilder, Url}; +use std::{net::Ipv4Addr, str::FromStr}; use uuid::Uuid; pub struct Paths; @@ -132,11 +122,10 @@ macro_rules! build_handler { }; } - pub struct ClientHandler { base_url: Url, client: Client, - password: Option + password: Option, } impl ClientHandler { @@ -146,10 +135,8 @@ impl ClientHandler { .unwrap_or(MASTER_SERVER); Self { client: Client::new(), - base_url: Url::parse( - &format!("http://{}:{}", master_server, MASTER_PORT) - ).unwrap(), - password: None + base_url: Url::parse(&format!("http://{}:{}", master_server, MASTER_PORT)).unwrap(), + password: None, } } @@ -161,7 +148,7 @@ impl ClientHandler { fn set_pwd(&self, rb: RequestBuilder) -> RequestBuilder { match &self.password { Some(p) => rb.bearer_auth(p), - None => rb + None => rb, } } diff --git a/lib/u_lib/src/errors.rs b/lib/u_lib/src/errors.rs index 6dc51bf..799ee3d 100644 --- a/lib/u_lib/src/errors.rs +++ b/lib/u_lib/src/errors.rs @@ -1,12 +1,11 @@ +use diesel::result::Error as DslError; use reqwest::Error as ReqError; -use serde::{ - Serialize, - Deserialize -}; +use serde::{Deserialize, Serialize}; use thiserror::Error; use uuid::Uuid; pub type UResult = std::result::Result; +pub type ULocalResult = std::result::Result; #[derive(Error, Debug, Serialize, Deserialize, Clone)] pub enum UError { @@ -32,11 +31,23 @@ pub enum UError { NoJob(Uuid), #[error("Error opening {0}: {1}")] - FilesystemError(String, String) + FilesystemError(String, String), } impl From for UError { fn from(e: ReqError) -> Self { UError::NetError(e.to_string(), String::new()) } -} \ No newline at end of file +} + +#[derive(Error, Debug)] +pub enum ULocalError { + #[error("{0} is not found")] + NotFound(String), + + #[error("Error processing {0}")] + ProcessingError(String), + + #[error(transparent)] + DBError(#[from] DslError), +} diff --git a/lib/u_lib/src/executor.rs b/lib/u_lib/src/executor.rs index ce6f672..05fad1b 100644 --- a/lib/u_lib/src/executor.rs +++ b/lib/u_lib/src/executor.rs @@ -2,16 +2,16 @@ // job runner (thread) // every job runs in other thread/process -use crate::{models::*, UResult, OneOrMany}; +use crate::{models::*, utils::OneOrMany, UResult}; use std::collections::HashMap; -use futures::{lock::{Mutex, MutexGuard}, future::BoxFuture}; +use futures::{future::BoxFuture, lock::Mutex}; use lazy_static::lazy_static; use tokio::{ spawn, + sync::mpsc::{channel, Receiver, Sender}, task::JoinHandle, - sync::mpsc::{channel, Receiver, Sender} }; use uuid::Uuid; @@ -27,33 +27,33 @@ lazy_static! { }; } -async fn get_sender() -> Sender { +fn get_sender() -> Sender { FUT_CHANNEL.0.clone() } struct JoinInfo { handle: JoinHandle, completed: bool, - collectable: bool // indicates if future can be popped from pool via pop_task_if_completed + collectable: bool, // indicates if future can be popped from pool via pop_task_if_completed } pub struct Waiter { tasks: Vec, - fids: Vec + fids: Vec, } impl Waiter { pub fn new>(tasks: S) -> Self { Self { tasks: tasks.into_vec(), - fids: vec![] + fids: vec![], } } pub async fn spawn(mut self) -> Self { let collectable = true; //TODO: self.tasks.len() != 1; for f in self.tasks.drain(..) { - let tx = get_sender().await; + let tx = get_sender(); let fid = Uuid::new_v4(); self.fids.push(fid); let task_wrapper = async move { @@ -66,7 +66,7 @@ impl Waiter { let handle = JoinInfo { handle: spawn(task_wrapper), completed: false, - collectable + collectable, }; debug!("before push: {}", fid); //spawn(async {}).await.ok(); @@ -98,10 +98,6 @@ impl Waiter { } } -async fn pop_task(fid: Uuid) -> Option { - FUT_RESULTS.lock().await.remove(&fid) -} - async fn init_receiver() { while let Some(fid) = FUT_CHANNEL.1.lock().await.recv().await { //info!("init_receiver: next val: {}", &fid); @@ -114,13 +110,18 @@ async fn init_receiver() { } } -pub async fn pop_task_if_completed(fid: Uuid) -> Option { - let &mut JoinInfo {handle: _, collectable, completed} = match FUT_RESULTS - .lock() - .await - .get_mut(&fid) { +async fn pop_task(fid: Uuid) -> Option { + FUT_RESULTS.lock().await.remove(&fid) +} + +async fn pop_task_if_completed(fid: Uuid) -> Option { + let &mut JoinInfo { + handle: _, + collectable, + completed, + } = match FUT_RESULTS.lock().await.get_mut(&fid) { Some(t) => t, - None => return None + None => return None, }; if collectable && completed { let task = pop_task(fid).await.unwrap(); @@ -133,7 +134,8 @@ pub async fn pop_task_if_completed(fid: Uuid) -> Option { pub async fn pop_completed() -> Vec { let mut completed: Vec = vec![]; - let fids = FUT_RESULTS.lock() + let fids = FUT_RESULTS + .lock() .await .keys() .map(|k| *k) @@ -170,4 +172,4 @@ mod tests { t.await.ok(); assert_eq!(5, *val.lock().await); } -} \ No newline at end of file +} diff --git a/lib/u_lib/src/lib.rs b/lib/u_lib/src/lib.rs index 8cf8352..548b99c 100644 --- a/lib/u_lib/src/lib.rs +++ b/lib/u_lib/src/lib.rs @@ -1,19 +1,15 @@ #![allow(non_upper_case_globals)] -pub mod executor; +pub mod api; pub mod config; -pub mod utils; pub mod errors; -pub mod api; -pub mod models; +pub mod executor; pub mod messaging; +pub mod models; +pub mod utils; -pub use { - utils::*, - config::*, - executor::*, - errors::*, - models::* -}; +pub use config::UID; +pub use errors::{UError, ULocalError, ULocalResult, UResult}; +pub use models::{Jobstate, Jobtype}; // for schema #[macro_use] extern crate lazy_static; @@ -23,4 +19,4 @@ extern crate diesel; #[macro_use] extern crate log; -extern crate env_logger; \ No newline at end of file +extern crate env_logger; diff --git a/lib/u_lib/src/models/agent.rs b/lib/u_lib/src/models/agent.rs index e5cf7d9..61bc57b 100644 --- a/lib/u_lib/src/models/agent.rs +++ b/lib/u_lib/src/models/agent.rs @@ -1,20 +1,15 @@ -use serde::{ - Serialize, - Deserialize -}; -use std::time::SystemTime; +use diesel::{Identifiable, Insertable, Queryable}; +use serde::{Deserialize, Serialize}; use std::fmt; -use diesel::{ - Queryable, - Identifiable, - Insertable -}; +use std::time::SystemTime; use crate::{ - models::*, + models::{ + jobs::{build_jobs, JobMeta, JobOutput}, + schema::*, + }, + utils::{systime_to_string, vec_to_string}, UID, - utils::*, - models::schema::*, }; use guess_host_triple::guess_host_triple; @@ -34,12 +29,12 @@ pub struct Agent { pub regtime: SystemTime, pub status: Option, pub token: Option, - pub username: String + pub username: String, } impl fmt::Display for Agent { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - let mut out = format!("Agent {}", self.id); + let mut out = format!("Agent: {}", self.id); if self.alias.is_some() { out += &format!(" ({})", self.alias.as_ref().unwrap()) } @@ -64,50 +59,46 @@ pub struct IAgent { pub platform: String, pub status: Option, pub token: Option, - pub username: String + pub username: String, } +impl IAgent { + pub async fn gather() -> Self { + async fn run_cmd_fast>(cmd: S) -> String { + let jm = JobMeta::from_shell(cmd); + let job_result = build_jobs(jm) + .run_one_until_complete() + .await + .unwrap() + .result + .unwrap(); + JobOutput::from_raw(&job_result) + .map(|o| vec_to_string(&o.into_appropriate())) + .unwrap_or(String::from_utf8_lossy(&job_result).to_string()) + } -pub async fn gather() -> IAgent { - async fn run_cmd_fast>(cmd: S) -> String { - let jm = JobMeta::from_shell(cmd); - let job_result = build_jobs(jm) - .run_one_until_complete() - .await - .unwrap() - .result - .unwrap(); - JobOutput::from_raw(&job_result) - .map(|o| vec_to_string(&o.into_appropriate())) - .unwrap_or(String::from_utf8_lossy(&job_result).to_string()) - } - - #[cfg(unix)] - IAgent { - alias: None, - id: UID.clone(), - hostname: run_cmd_fast("hostname").await, - is_root: &run_cmd_fast("id -u").await == "0", - is_root_allowed: false, - platform: guess_host_triple().unwrap_or("Error").to_string(), - status: None, //wtf? - token: None, - username: run_cmd_fast("id -un").await, + #[cfg(unix)] + Self { + alias: None, + id: UID.clone(), + hostname: run_cmd_fast("hostname").await, + is_root: &run_cmd_fast("id -u").await == "0", + is_root_allowed: false, + platform: guess_host_triple().unwrap_or("Error").to_string(), + status: None, //wtf? + token: None, + username: run_cmd_fast("id -un").await, + } } } - #[cfg(test)] mod tests { use super::*; #[tokio::test] async fn test_gather() { - let cli_info = gather().await; - assert_eq!( - &cli_info.username, - "plazmoid" - ) + let cli_info = IAgent::gather().await; + assert_eq!(&cli_info.username, "plazmoid") } - } diff --git a/lib/u_lib/src/models/jobs.rs b/lib/u_lib/src/models/jobs.rs deleted file mode 100644 index 76efbbd..0000000 --- a/lib/u_lib/src/models/jobs.rs +++ /dev/null @@ -1,570 +0,0 @@ -use std::{ - time::{SystemTime, Duration}, - thread, - sync::{RwLock, RwLockReadGuard}, - cmp::PartialEq, - fmt, - string::ToString, - path::PathBuf, - fs, - process::Output, - collections::HashMap, - ops::Deref, -}; -use serde::{ - Serialize, - Deserialize -}; -use uuid::Uuid; -use guess_host_triple::guess_host_triple; -use tokio::{ - process::Command -}; -use crate::{ - utils::systime_to_string, - models::schema::*, - Agent, - UError, - UResult, - UID, - Waiter, - OneOrMany, - DynFut, -}; -use diesel_derive_enum::DbEnum; -use diesel::{ - Queryable, - Identifiable, - Insertable, -}; -use strum::Display; - -type Cache = HashMap; - -lazy_static! { - static ref JOB_CACHE: RwLock = RwLock::new(HashMap::new()); -} - -pub struct JobCache; - -impl JobCache { - pub fn insert(job_meta: JobMeta) { - JOB_CACHE.write().unwrap().insert(job_meta.id, job_meta); - } - - pub fn contains(uid: &Uuid) -> bool { - JOB_CACHE.read().unwrap().contains_key(uid) - } - - pub fn get(uid: &Uuid) -> Option { - if !Self::contains(uid) { - return None - } - let lock = JOB_CACHE.read().unwrap(); - Some(JobCacheHolder(lock, uid)) - } - - pub fn remove(uid: &Uuid) { - JOB_CACHE.write().unwrap().remove(uid); - } -} - -pub struct JobCacheHolder<'jm>(pub RwLockReadGuard<'jm, Cache>, pub &'jm Uuid); - -impl<'jm> Deref for JobCacheHolder<'jm> { - type Target = JobMeta; - - fn deref(&self) -> &Self::Target { - self.0.get(self.1).unwrap() - } -} - - -#[derive(Serialize, Deserialize, Clone, Debug)] -pub enum ManageAction { - Ping, - UpdateAvailable, - JobsResultsRequest, - Terminate -} - -#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] -pub enum JobSchedule { - Once, - Permanent, - //Scheduled -} - -#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, DbEnum, Display)] -#[PgType = "JobState"] -#[DieselType = "Jobstate"] -pub enum JobState { - Queued, // server created a job, but client didn't get it yet - //Pending, // client got a job, but not running yet - Running, // client is currently running a job - Finished, -} - -#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, DbEnum, Display)] -#[PgType = "JobType"] -#[DieselType = "Jobtype"] -pub enum JobType { - Manage, - Shell, - Python, - Binary, - Dummy -} - -#[derive(Clone, Debug)] -pub struct JobOutput { - pub stdout: Vec, - pub stderr: Vec, -} - -impl JobOutput { - const STREAM_BORDER: &'static str = "***"; - const STDOUT: &'static str = "STDOUT"; - const STDERR: &'static str = "STDERR"; - - #[inline] - fn create_delim(header: &'static str) -> String { - format!("{border} {head} {border}\n", - border = JobOutput::STREAM_BORDER, - head = header - ) - } - - pub fn new() -> Self { - Self { - stdout: vec![], - stderr: vec![], - } - } - - pub fn stdout(mut self, data: Vec) -> Self { - self.stdout = data; - self - } - - pub fn stderr(mut self, data: Vec) -> Self { - self.stderr = data; - self - } - - pub fn multiline(&self) -> Vec { - let mut result: Vec = vec![]; - if self.stdout.len() > 0 { - result.extend(JobOutput::create_delim(JobOutput::STDOUT).into_bytes()); - result.extend(&self.stdout); - result.push(b'\n'); - } - - if self.stderr.len() > 0 { - result.extend(JobOutput::create_delim(JobOutput::STDERR).into_bytes()); - result.extend(&self.stderr); - result.push(b'\n'); - } - result - } - - pub fn from_raw(raw: &[u8]) -> Option { - let raw = String::from_utf8_lossy(raw); - let err_header = JobOutput::create_delim(JobOutput::STDERR); - raw.strip_prefix(&JobOutput::create_delim(JobOutput::STDOUT)) - .map(|s: &str| { - let mut parts = s.split(&err_header) - .map(|d| d.trim().as_bytes().to_vec()) - .collect::>>() - .into_iter(); - JobOutput::new() - .stdout(parts.next().unwrap()) - .stderr(parts.next().unwrap_or(vec![])) - }) - } - - pub fn into_appropriate(self) -> Vec { - if self.stdout.len() > 0 { - self.stdout - } else if self.stderr.len() > 0 { - self.stderr - } else { - b"No data".to_vec() - } - } -} - -#[derive( - Serialize, - Deserialize, - Clone, - Debug, - Queryable, - Identifiable, - Insertable -)] -#[table_name = "jobs"] -pub struct JobMeta { - pub alias: String, - pub id: Uuid, - pub exec_type: JobType, - //pub schedule: JobSchedule, - pub platform: String, - pub payload: Option>, -} - -impl fmt::Display for JobMeta { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - let mut out = format!("Job {}", self.id); - out += &format!(" ({})", self.alias); - out += &format!("\nExecutable type: {}", self.exec_type); - out += &format!("\nPlatform: {}", self.platform); - if self.exec_type == JobType::Shell && self.payload.is_some() { - out += &format!("\nPayload: {}", String::from_utf8_lossy(self.payload.as_ref().unwrap())); - } - write!(f, "{}", out) - } -} - -impl JobMeta { - pub fn from_shell>(shell_cmd: S) -> Self { - let shell_cmd = shell_cmd.into(); - let job_name = shell_cmd.split(" ").nth(0).unwrap(); - Self { - alias: job_name.to_string(), - payload: Some(shell_cmd.into_bytes()), - ..Default::default() - } - } -/* - pub fn from_file(path: PathBuf) -> UResult { - let data = fs::read(path) - .map_err(|e| UError::FilesystemError( - path.to_string_lossy().to_string(), - e.to_string() - ))?; - let filename = path.file_name().unwrap().to_str().unwrap(); - - }*/ -} - -impl Default for JobMeta { - fn default() -> Self { - Self { - id: Uuid::new_v4(), - alias: String::new(), - exec_type: JobType::Shell, - platform: guess_host_triple().unwrap_or("unknown").to_string(), - payload: None - } - } -} - - -#[derive( - Serialize, - Deserialize, - Clone, - Debug, - Queryable, - Identifiable, - Insertable, - AsChangeset, -)] -#[table_name = "results"] -pub struct ExactJob { - pub agent_id: Uuid, - pub created: SystemTime, - pub id: Uuid, - pub job_id: Uuid, - pub result: Option>, - pub state: JobState, - pub retcode: Option, - pub updated: SystemTime, -} - -impl fmt::Display for ExactJob { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - let mut out = format!("Result {}", self.id); - out += &format!("\nAgent {}", self.agent_id); - out += &format!("\nJob: {}", self.job_id); - out += &format!("\nUpdated: {}", systime_to_string(&self.updated)); - out += &format!("\nState: {}", self.state); - if self.state == JobState::Finished { - if self.retcode.is_some() { - out += &format!("\nReturn code: {}", self.retcode.unwrap()); - } - if self.result.is_some() { - out += &format!("\nResult: {}", String::from_utf8_lossy(self.result.as_ref().unwrap())); - } - } - write!(f, "{}", out) - } -} - -impl ExactJob { - pub fn from_meta(job_id: Uuid, result_id: Option) -> Self { - Self { - id: result_id.unwrap_or(Uuid::new_v4()), - agent_id: *UID, - job_id, - ..Default::default() - } - } - - //pub fn as_job_output(&self) -> JobOutput {} -} - -impl Default for ExactJob { - fn default() -> Self { - Self { - agent_id: Uuid::nil(), - created: SystemTime::now(), - id: Uuid::new_v4(), - job_id: Uuid::nil(), - result: None, - state: JobState::Queued, - retcode: None, - updated: SystemTime::now() - } - } -} - -pub struct Job { - exec_type: JobType, - payload: Option>, - result: ExactJob -} - -impl Job { - fn build(job_meta: &JobMeta, result_id: Uuid) -> UResult { - match job_meta.exec_type { - JobType::Shell => { - let curr_platform = guess_host_triple().unwrap_or("unknown").to_string(); - if job_meta.platform != curr_platform { - return Err(UError::InsuitablePlatform( - job_meta.platform.clone(), curr_platform - )) - } - let job_meta = job_meta.clone(); - Ok(Self { - exec_type: job_meta.exec_type, - payload: job_meta.payload, - result: ExactJob::from_meta(job_meta.id.clone(), Some(result_id)) - }) - }, - _ => todo!() - } - } - - async fn run(mut self) -> UResult { - match self.exec_type { - JobType::Shell => { - let str_payload = match &self.payload { - Some(box_payload) => { - String::from_utf8_lossy(box_payload).into_owned() - } - None => unimplemented!() - }; - let mut cmd_parts = str_payload - .split(" ") - .map(String::from) - .collect::>() - .into_iter(); - let cmd = cmd_parts.nth(0).unwrap(); - let args = cmd_parts.collect::>(); - let cmd_result = Command::new(cmd) - .args(args) - .output() - .await; - let (data, retcode) = match cmd_result { - Ok(Output {status, stdout, stderr}) => { - ( - Some(JobOutput::new() - .stdout(stdout) - .stderr(stderr) - .multiline() - ), - status.code() - ) - } - Err(e) => { - ( - Some(UError::JobError(e.to_string()).to_string().into_bytes()), - None - ) - } - }; - self.result.result = data; - self.result.retcode = retcode; - self.result.updated = SystemTime::now(); - self.result.state = JobState::Finished; - }, - _ => todo!() - } - Ok(self.result) - } -} - -pub fn build_jobs_with_result>(job_requests: J) -> Waiter { - let prepared_jobs = job_requests.into_vec() - .into_iter() - .filter_map(|jr| -> Option { - let job = { - let job_meta = JobCache::get(&jr.job_id); - if job_meta.is_none() { - Err(UError::NoJob(jr.job_id)) - } else { - Job::build(&*job_meta.unwrap(), jr.id) - } - }; - match job { - Ok(j) => Some(Box::pin(j.run())), - Err(e) => { - warn!("Job building error: {}", e); - None - } - } - }).collect::>(); - Waiter::new(prepared_jobs) -} - -pub fn build_jobs>(job_metas: J) -> Waiter { - let job_requests = job_metas.into_vec().into_iter().map(|jm| { - let j_uid = jm.id; - JobCache::insert(jm); - ExactJob::from_meta(j_uid, None) - }).collect::>(); - build_jobs_with_result(job_requests) -} - - -#[cfg(test)] -mod tests { - use super::*; - use crate::{build_jobs, utils::vec_to_string, pop_completed}; - - #[tokio::test] - async fn test_is_really_async() { - const SLEEP_SECS: u64 = 1; - let job = JobMeta::from_shell(format!("sleep {}", SLEEP_SECS)); - let sleep_jobs: Vec = (0..50).map(|_| job.clone()).collect(); - let now = SystemTime::now(); - build_jobs(sleep_jobs).run_until_complete().await; - assert!(now.elapsed().unwrap().as_secs() < SLEEP_SECS+2) - } - - #[tokio::test] - async fn test_shell_job() -> UResult<()> { - let job = JobMeta::from_shell("whoami"); - let job_result = build_jobs(job) - .run_one_until_complete() - .await; - let stdout = JobOutput::from_raw( - &job_result.unwrap().result.unwrap() - ).unwrap().stdout; - assert_eq!( - vec_to_string(&stdout).trim(), - "plazmoid" - ); - Ok(()) - } - - #[tokio::test] - async fn test_complex_load() -> UResult<()> { - const SLEEP_SECS: u64 = 1; - let now = SystemTime::now(); - let longest_job = JobMeta::from_shell(format!("sleep {}", SLEEP_SECS)); - let longest_job = build_jobs(longest_job).spawn().await; - let ls = build_jobs(JobMeta::from_shell("ls")) - .run_one_until_complete() - .await - .unwrap(); - assert_eq!(ls.retcode.unwrap(), 0); - let result = JobOutput::from_raw(&ls.result.unwrap()).unwrap(); - let folders = String::from_utf8_lossy( - &result.stdout - ); - let subfolders_jobs: Vec = folders - .lines() - .map(|f| JobMeta::from_shell(format!("ls {}", f))) - .collect(); - let ls_subfolders = build_jobs( - subfolders_jobs - ).run_until_complete().await; - for result in ls_subfolders { - assert_eq!(result.unwrap().retcode.unwrap(), 0); - } - longest_job.wait().await; - assert_eq!(now.elapsed().unwrap().as_secs(), SLEEP_SECS); - Ok(()) - } -/* - #[tokio::test] - async fn test_exec_multiple_jobs_nowait() -> UResult<()> { - const REPEATS: usize = 10; - let job = JobMeta::from_shell("whoami"); - let sleep_jobs: Vec = (0..=REPEATS).map(|_| job.clone()).collect(); - build_jobs(sleep_jobs).spawn().await; - let mut completed = 0; - while completed < REPEATS { - let c = pop_completed().await.len(); - if c > 0 { - completed += c; - println!("{}", c); - } - } - Ok(()) - } -*/ - #[tokio::test] - async fn test_failing_shell_job() -> UResult<()> { - let job = JobMeta::from_shell("lol_kek_puk"); - let job_result = build_jobs(job).run_one_until_complete().await.unwrap(); - let output = JobOutput::from_raw(&job_result.result.unwrap()); - assert!(output.is_none()); - assert!(job_result.retcode.is_none()); - Ok(()) - } - - #[test] - fn test_to_multiline() { - let mut output = JobOutput::new(); - output.stdout = b"lol".to_vec(); - output.stderr = b"kek".to_vec(); - assert_eq!( - vec_to_string(&output.multiline()), - String::from( - "*** STDOUT ***\n\ - lol\n\ - *** STDERR ***\n\ - kek\n" - ) - ) - } - - #[test] - fn test_to_multiline_stderr_only() { - let mut output = JobOutput::new(); - output.stderr = b"kek".to_vec(); - assert_eq!( - vec_to_string(&output.multiline()), - String::from( - "*** STDERR ***\n\ - kek\n" - ) - ) - } - - #[test] - fn test_from_multiline() { - let txt = "*** STDOUT ***\n\ - puk\n".as_bytes(); - let output = JobOutput::from_raw(txt).unwrap(); - assert_eq!( - vec_to_string(&output.stdout), - "puk".to_string() - ); - assert_eq!(output.stderr.len(), 0); - } -} diff --git a/lib/u_lib/src/models/jobs/builder.rs b/lib/u_lib/src/models/jobs/builder.rs new file mode 100644 index 0000000..0b6bd1d --- /dev/null +++ b/lib/u_lib/src/models/jobs/builder.rs @@ -0,0 +1,116 @@ +use super::{ExactJob, JobCache, JobMeta, JobOutput, JobState, JobType}; +use crate::{ + executor::{DynFut, Waiter}, + utils::OneOrMany, + UError, UResult, +}; +use guess_host_triple::guess_host_triple; +use std::{process::Output, time::SystemTime}; +use tokio::process::Command; +use uuid::Uuid; + +pub struct Job { + exec_type: JobType, + payload: Option>, + result: ExactJob, +} + +impl Job { + fn build(job_meta: &JobMeta, result_id: Uuid) -> UResult { + match job_meta.exec_type { + JobType::Shell => { + let curr_platform = guess_host_triple().unwrap_or("unknown").to_string(); + if job_meta.platform != curr_platform { + return Err(UError::InsuitablePlatform( + job_meta.platform.clone(), + curr_platform, + )); + } + let job_meta = job_meta.clone(); + Ok(Self { + exec_type: job_meta.exec_type, + payload: job_meta.payload, + result: ExactJob::from_meta(job_meta.id.clone(), Some(result_id)), + }) + } + _ => todo!(), + } + } + + async fn run(mut self) -> UResult { + match self.exec_type { + JobType::Shell => { + let str_payload = match &self.payload { + Some(box_payload) => String::from_utf8_lossy(box_payload).into_owned(), + None => unimplemented!(), + }; + let mut cmd_parts = str_payload + .split(" ") + .map(String::from) + .collect::>() + .into_iter(); + let cmd = cmd_parts.nth(0).unwrap(); + let args = cmd_parts.collect::>(); + let cmd_result = Command::new(cmd).args(args).output().await; + let (data, retcode) = match cmd_result { + Ok(Output { + status, + stdout, + stderr, + }) => ( + JobOutput::new().stdout(stdout).stderr(stderr).multiline(), + status.code(), + ), + Err(e) => ( + UError::JobError(e.to_string()).to_string().into_bytes(), + None, + ), + }; + self.result.result = Some(data); + self.result.retcode = retcode; + self.result.updated = SystemTime::now(); + self.result.state = JobState::Finished; + } + _ => todo!(), + } + Ok(self.result) + } +} + +pub fn build_jobs_with_result>(job_requests: J) -> Waiter { + let prepared_jobs = job_requests + .into_vec() + .into_iter() + .filter_map(|jr| -> Option { + let job = { + let job_meta = JobCache::get(&jr.job_id); + if job_meta.is_none() { + Err(UError::NoJob(jr.job_id)) + } else { + Job::build(&*job_meta.unwrap(), jr.id) + } + }; + match job { + Ok(j) => Some(Box::pin(j.run())), + Err(e) => { + warn!("Job building error: {}", e); + None + } + } + }) + .collect::>(); + Waiter::new(prepared_jobs) +} + +pub fn build_jobs>(job_metas: J) -> Waiter { + let job_requests = job_metas + .into_vec() + .into_iter() + .map(|jm| { + let j_uid = jm.id; + JobCache::insert(jm); + ExactJob::from_meta(j_uid, None) + }) + .collect::>(); + build_jobs_with_result(job_requests) +} diff --git a/lib/u_lib/src/models/jobs/cache.rs b/lib/u_lib/src/models/jobs/cache.rs new file mode 100644 index 0000000..a486906 --- /dev/null +++ b/lib/u_lib/src/models/jobs/cache.rs @@ -0,0 +1,47 @@ +use crate::models::JobMeta; +use std::{ + collections::HashMap, + ops::Deref, + sync::{RwLock, RwLockReadGuard}, +}; +use uuid::Uuid; + +type Cache = HashMap; + +lazy_static! { + static ref JOB_CACHE: RwLock = RwLock::new(HashMap::new()); +} + +pub struct JobCache; + +impl JobCache { + pub fn insert(job_meta: JobMeta) { + JOB_CACHE.write().unwrap().insert(job_meta.id, job_meta); + } + + pub fn contains(uid: &Uuid) -> bool { + JOB_CACHE.read().unwrap().contains_key(uid) + } + + pub fn get(uid: &Uuid) -> Option { + if !Self::contains(uid) { + return None; + } + let lock = JOB_CACHE.read().unwrap(); + Some(JobCacheHolder(lock, uid)) + } + + pub fn remove(uid: &Uuid) { + JOB_CACHE.write().unwrap().remove(uid); + } +} + +pub struct JobCacheHolder<'jm>(pub RwLockReadGuard<'jm, Cache>, pub &'jm Uuid); + +impl<'jm> Deref for JobCacheHolder<'jm> { + type Target = JobMeta; + + fn deref(&self) -> &Self::Target { + self.0.get(self.1).unwrap() + } +} diff --git a/lib/u_lib/src/models/jobs/meta.rs b/lib/u_lib/src/models/jobs/meta.rs new file mode 100644 index 0000000..3e1f01b --- /dev/null +++ b/lib/u_lib/src/models/jobs/meta.rs @@ -0,0 +1,68 @@ +use super::JobType; +use crate::models::schema::*; +use diesel::{Identifiable, Insertable, Queryable}; +use guess_host_triple::guess_host_triple; +use serde::{Deserialize, Serialize}; +use std::fmt; +use uuid::Uuid; + +#[derive(Serialize, Deserialize, Clone, Debug, Queryable, Identifiable, Insertable)] +#[table_name = "jobs"] +pub struct JobMeta { + pub alias: String, + pub id: Uuid, + pub exec_type: JobType, + //pub schedule: JobSchedule, + pub platform: String, + pub payload: Option>, +} + +impl fmt::Display for JobMeta { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + let mut out = format!("Job {}", self.id); + out += &format!(" ({})", self.alias); + out += &format!("\nExecutable type: {}", self.exec_type); + out += &format!("\nPlatform: {}", self.platform); + if self.exec_type == JobType::Shell && self.payload.is_some() { + out += &format!( + "\nPayload: {}", + String::from_utf8_lossy(self.payload.as_ref().unwrap()) + ); + } + write!(f, "{}", out) + } +} + +impl JobMeta { + pub fn from_shell>(shell_cmd: S) -> Self { + let shell_cmd = shell_cmd.into(); + let job_name = shell_cmd.split(" ").nth(0).unwrap(); + Self { + alias: job_name.to_string(), + payload: Some(shell_cmd.into_bytes()), + ..Default::default() + } + } + /* + pub fn from_file(path: PathBuf) -> UResult { + let data = fs::read(path) + .map_err(|e| UError::FilesystemError( + path.to_string_lossy().to_string(), + e.to_string() + ))?; + let filename = path.file_name().unwrap().to_str().unwrap(); + + }*/ +} + +impl Default for JobMeta { + fn default() -> Self { + Self { + id: Uuid::new_v4(), + alias: String::new(), + exec_type: JobType::Shell, + platform: guess_host_triple().unwrap_or("unknown").to_string(), + payload: None, + } + } +} diff --git a/lib/u_lib/src/models/jobs/misc.rs b/lib/u_lib/src/models/jobs/misc.rs new file mode 100644 index 0000000..552603d --- /dev/null +++ b/lib/u_lib/src/models/jobs/misc.rs @@ -0,0 +1,164 @@ +use diesel_derive_enum::DbEnum; +use serde::{Deserialize, Serialize}; +use strum::Display; + +#[derive(Serialize, Deserialize, Clone, Debug)] +pub enum ManageAction { + Ping, + UpdateAvailable, + JobsResultsRequest, + Terminate, +} + +#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] +pub enum JobSchedule { + Once, + Permanent, + //Scheduled +} + +#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, DbEnum, Display)] +#[PgType = "JobState"] +#[DieselType = "Jobstate"] +pub enum JobState { + Queued, // server created a job, but client didn't get it yet + //Pending, // client got a job, but not running yet + Running, // client is currently running a job + Finished, +} + +#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, DbEnum, Display)] +#[PgType = "JobType"] +#[DieselType = "Jobtype"] +pub enum JobType { + Manage, + Shell, + Python, + Binary, + Dummy, +} + +#[derive(Clone, Debug)] +pub struct JobOutput { + pub stdout: Vec, + pub stderr: Vec, +} + +impl JobOutput { + const STREAM_BORDER: &'static str = "***"; + const STDOUT: &'static str = "STDOUT"; + const STDERR: &'static str = "STDERR"; + + #[inline] + fn create_delim(header: &'static str) -> String { + format!( + "{border} {head} {border}\n", + border = JobOutput::STREAM_BORDER, + head = header + ) + } + + pub fn new() -> Self { + Self { + stdout: vec![], + stderr: vec![], + } + } + + pub fn stdout(mut self, data: Vec) -> Self { + self.stdout = data; + self + } + + pub fn stderr(mut self, data: Vec) -> Self { + self.stderr = data; + self + } + + pub fn multiline(&self) -> Vec { + let mut result: Vec = vec![]; + if self.stdout.len() > 0 { + result.extend(JobOutput::create_delim(JobOutput::STDOUT).into_bytes()); + result.extend(&self.stdout); + result.push(b'\n'); + } + + if self.stderr.len() > 0 { + result.extend(JobOutput::create_delim(JobOutput::STDERR).into_bytes()); + result.extend(&self.stderr); + result.push(b'\n'); + } + result + } + + pub fn from_raw(raw: &[u8]) -> Option { + let raw = String::from_utf8_lossy(raw); + let err_header = JobOutput::create_delim(JobOutput::STDERR); + raw.strip_prefix(&JobOutput::create_delim(JobOutput::STDOUT)) + .map(|s: &str| { + let mut parts = s + .split(&err_header) + .map(|d| d.trim().as_bytes().to_vec()) + .collect::>>() + .into_iter(); + JobOutput::new() + .stdout(parts.next().unwrap()) + .stderr(parts.next().unwrap_or(vec![])) + }) + } + + pub fn into_appropriate(self) -> Vec { + if self.stdout.len() > 0 { + self.stdout + } else if self.stderr.len() > 0 { + self.stderr + } else { + b"No data".to_vec() + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::utils::vec_to_string; + + #[test] + fn test_to_multiline() { + let mut output = JobOutput::new(); + output.stdout = b"lol".to_vec(); + output.stderr = b"kek".to_vec(); + assert_eq!( + vec_to_string(&output.multiline()), + String::from( + "*** STDOUT ***\n\ + lol\n\ + *** STDERR ***\n\ + kek\n" + ) + ) + } + + #[test] + fn test_to_multiline_stderr_only() { + let mut output = JobOutput::new(); + output.stderr = b"kek".to_vec(); + assert_eq!( + vec_to_string(&output.multiline()), + String::from( + "*** STDERR ***\n\ + kek\n" + ) + ) + } + + #[test] + fn test_from_multiline() { + let txt = "*** STDOUT ***\n\ + puk\n" + .as_bytes(); + let output = JobOutput::from_raw(txt).unwrap(); + assert_eq!(vec_to_string(&output.stdout), "puk".to_string()); + assert_eq!(output.stderr.len(), 0); + } +} diff --git a/lib/u_lib/src/models/jobs/mod.rs b/lib/u_lib/src/models/jobs/mod.rs new file mode 100644 index 0000000..7e1fec6 --- /dev/null +++ b/lib/u_lib/src/models/jobs/mod.rs @@ -0,0 +1,7 @@ +pub mod builder; +pub mod cache; +pub mod meta; +pub mod misc; +pub mod result; + +pub use {builder::*, cache::*, meta::*, misc::*, result::*}; diff --git a/lib/u_lib/src/models/jobs/result.rs b/lib/u_lib/src/models/jobs/result.rs new file mode 100644 index 0000000..e9d9bb0 --- /dev/null +++ b/lib/u_lib/src/models/jobs/result.rs @@ -0,0 +1,155 @@ +use super::JobState; +use crate::{models::schema::*, utils::systime_to_string, UID}; +use diesel::{Identifiable, Insertable, Queryable}; +use serde::{Deserialize, Serialize}; +use std::{fmt, time::SystemTime}; +use uuid::Uuid; + +#[derive( + Serialize, Deserialize, Clone, Debug, Queryable, Identifiable, Insertable, AsChangeset, +)] +#[table_name = "results"] +pub struct ExactJob { + pub agent_id: Uuid, + pub created: SystemTime, + pub id: Uuid, + pub job_id: Uuid, + pub result: Option>, + pub state: JobState, + pub retcode: Option, + pub updated: SystemTime, +} + +impl Default for ExactJob { + fn default() -> Self { + Self { + agent_id: Uuid::nil(), + created: SystemTime::now(), + id: Uuid::new_v4(), + job_id: Uuid::nil(), + result: None, + state: JobState::Queued, + retcode: None, + updated: SystemTime::now(), + } + } +} + +impl fmt::Display for ExactJob { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + let mut out = format!("Result {}", self.id); + out += &format!("\nAgent {}", self.agent_id); + out += &format!("\nJob: {}", self.job_id); + out += &format!("\nUpdated: {}", systime_to_string(&self.updated)); + out += &format!("\nState: {}", self.state); + if self.state == JobState::Finished { + if self.retcode.is_some() { + out += &format!("\nReturn code: {}", self.retcode.unwrap()); + } + if self.result.is_some() { + out += &format!( + "\nResult: {}", + String::from_utf8_lossy(self.result.as_ref().unwrap()) + ); + } + } + write!(f, "{}", out) + } +} + +impl ExactJob { + pub fn from_meta(job_id: Uuid, result_id: Option) -> Self { + Self { + id: result_id.unwrap_or(Uuid::new_v4()), + agent_id: *UID, + job_id, + ..Default::default() + } + } + + //pub fn as_job_output(&self) -> JobOutput {} +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::{ + models::jobs::{build_jobs, JobMeta, JobOutput}, + utils::vec_to_string, + UResult, + }; + + #[tokio::test] + async fn test_is_really_async() { + const SLEEP_SECS: u64 = 1; + let job = JobMeta::from_shell(format!("sleep {}", SLEEP_SECS)); + let sleep_jobs: Vec = (0..50).map(|_| job.clone()).collect(); + let now = SystemTime::now(); + build_jobs(sleep_jobs).run_until_complete().await; + assert!(now.elapsed().unwrap().as_secs() < SLEEP_SECS + 2) + } + + #[tokio::test] + async fn test_shell_job() -> UResult<()> { + let job = JobMeta::from_shell("whoami"); + let job_result = build_jobs(job).run_one_until_complete().await; + let stdout = JobOutput::from_raw(&job_result.unwrap().result.unwrap()) + .unwrap() + .stdout; + assert_eq!(vec_to_string(&stdout).trim(), "plazmoid"); + Ok(()) + } + + #[tokio::test] + async fn test_complex_load() -> UResult<()> { + const SLEEP_SECS: u64 = 1; + let now = SystemTime::now(); + let longest_job = JobMeta::from_shell(format!("sleep {}", SLEEP_SECS)); + let longest_job = build_jobs(longest_job).spawn().await; + let ls = build_jobs(JobMeta::from_shell("ls")) + .run_one_until_complete() + .await + .unwrap(); + assert_eq!(ls.retcode.unwrap(), 0); + let result = JobOutput::from_raw(&ls.result.unwrap()).unwrap(); + let folders = String::from_utf8_lossy(&result.stdout); + let subfolders_jobs: Vec = folders + .lines() + .map(|f| JobMeta::from_shell(format!("ls {}", f))) + .collect(); + let ls_subfolders = build_jobs(subfolders_jobs).run_until_complete().await; + for result in ls_subfolders { + assert_eq!(result.unwrap().retcode.unwrap(), 0); + } + longest_job.wait().await; + assert_eq!(now.elapsed().unwrap().as_secs(), SLEEP_SECS); + Ok(()) + } + /* + #[tokio::test] + async fn test_exec_multiple_jobs_nowait() -> UResult<()> { + const REPEATS: usize = 10; + let job = JobMeta::from_shell("whoami"); + let sleep_jobs: Vec = (0..=REPEATS).map(|_| job.clone()).collect(); + build_jobs(sleep_jobs).spawn().await; + let mut completed = 0; + while completed < REPEATS { + let c = pop_completed().await.len(); + if c > 0 { + completed += c; + println!("{}", c); + } + } + Ok(()) + } + */ + #[tokio::test] + async fn test_failing_shell_job() -> UResult<()> { + let job = JobMeta::from_shell("lol_kek_puk"); + let job_result = build_jobs(job).run_one_until_complete().await.unwrap(); + let output = JobOutput::from_raw(&job_result.result.unwrap()); + assert!(output.is_none()); + assert!(job_result.retcode.is_none()); + Ok(()) + } +}