diff --git a/.gitignore b/.gitignore index 53eaa21..f8abd6f 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,4 @@ /target **/*.rs.bk +/.idea +/data \ No newline at end of file diff --git a/Cargo.toml b/Cargo.toml index fd7382c..e1a1cda 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,6 +5,7 @@ members = [ "bin/u_run", "bin/u_server", "lib/u_lib", + "lib/u_api_proc_macro" ] [profile.release] diff --git a/bin/u_agent/src/main.rs b/bin/u_agent/src/main.rs index 05d76b9..a854863 100644 --- a/bin/u_agent/src/main.rs +++ b/bin/u_agent/src/main.rs @@ -10,17 +10,16 @@ extern crate log; extern crate env_logger; use std::env; +use tokio::time::{sleep, Duration}; use u_lib::{ api::ClientHandler, - models::{gather}, - build_jobs_with_result, - pop_completed, + builder::JobBuilder, + cache::JobCache, + executor::pop_completed, + models::{AssignedJob, ExecResult}, UID, - ExactJob, - JobCache, //daemonize }; -use tokio::{time::{Duration, sleep}}; #[macro_export] macro_rules! retry_until_ok { @@ -28,32 +27,45 @@ macro_rules! retry_until_ok { loop { match $body { Ok(r) => break r, - Err(e) => error!("{:?}", e) + Err(e) => error!("{:?}", e), }; sleep(Duration::from_secs(5)).await; } - } + }; } -async fn process_request(job_requests: Vec, client: &ClientHandler) { +async fn process_request(job_requests: Vec, client: &ClientHandler) { if job_requests.len() > 0 { - for jr in job_requests.iter() { + for jr in &job_requests { if !JobCache::contains(&jr.job_id) { info!("Fetching job: {}", &jr.job_id); - let fetched_job = - retry_until_ok!(client.get_jobs(Some(&jr.job_id)).await).pop().unwrap(); + let fetched_job = retry_until_ok!(client.get_jobs(Some(jr.job_id)).await) + .pop() + .unwrap(); JobCache::insert(fetched_job); } - }; + } info!( "Scheduling jobs: \n{}", - job_requests.iter() + job_requests + .iter() .map(|j| j.job_id.to_string()) - .collect::>().join("\n") + .collect::>() + .join("\n") ); - build_jobs_with_result(job_requests) - .spawn() - .await; + let mut builder = JobBuilder::from_request(job_requests); + let errors = builder.pop_errors(); + if errors.len() > 0 { + error!( + "Some errors encountered: \n{}", + errors + .iter() + .map(|j| j.to_string()) + .collect::>() + .join("\n") + ); + } + builder.unwrap_one().spawn().await; } } @@ -63,22 +75,15 @@ async fn main() { env_logger::init(); let arg_ip = env::args().nth(1); let instance = ClientHandler::new(arg_ip); - let cli_info = gather().await; info!("Connecting to the server"); - retry_until_ok!(instance.init(&cli_info).await); - info!("Instanciated! Running main loop"); loop { - let job_requests: Vec = - retry_until_ok!(instance.get_agent_jobs(Some(&*UID)).await); + let job_requests: Vec = + retry_until_ok!(instance.get_agent_jobs(Some(*UID)).await); process_request(job_requests, &instance).await; - let result: Vec = pop_completed().await - .into_iter() - .map(|r| r.unwrap()) - .collect(); + let result: Vec = pop_completed().await.into_iter().collect(); if result.len() > 0 { retry_until_ok!(instance.report(&result).await) } sleep(Duration::from_secs(5)).await; } } - diff --git a/bin/u_panel/src/main.rs b/bin/u_panel/src/main.rs index 1a873f7..e6b8990 100644 --- a/bin/u_panel/src/main.rs +++ b/bin/u_panel/src/main.rs @@ -1,10 +1,5 @@ use structopt::StructOpt; -use u_lib::{ - api::ClientHandler, - models::JobMeta, - UError -}; -use std::path::PathBuf; +use u_lib::{api::ClientHandler, models::JobMeta, UError}; use uuid::Uuid; const DELIM: &'static str = "*************\n"; @@ -12,14 +7,14 @@ const DELIM: &'static str = "*************\n"; #[derive(StructOpt, Debug)] struct Args { #[structopt(subcommand)] - cmd: Cmd + cmd: Cmd, } #[derive(StructOpt, Debug)] enum Cmd { Agents(LD), Jobs(JobALD), - Jobmap(JmALD) + Jobmap(JmALD), } #[derive(StructOpt, Debug)] @@ -29,16 +24,16 @@ enum JobALD { agent: Option, #[structopt(subcommand)] - cmd: JobCmd + cmd: JobCmd, }, #[structopt(flatten)] - LD(LD) + LD(LD), } #[derive(StructOpt, Debug)] enum JobCmd { #[structopt(external_subcommand)] - Cmd(Vec) + Cmd(Vec), } #[derive(StructOpt, Debug)] @@ -48,16 +43,16 @@ enum JmALD { agent_uid: Uuid, #[structopt(parse(try_from_str = parse_uuid))] - job_uids: Vec + job_uids: Vec, }, List { #[structopt(parse(try_from_str = parse_uuid))] - uid: Option + uid: Option, }, Delete { #[structopt(parse(try_from_str = parse_uuid))] - uid: Uuid - } + uid: Uuid, + }, } #[derive(StructOpt, Debug)] @@ -68,8 +63,8 @@ enum LD { }, Delete { #[structopt(parse(try_from_str = parse_uuid))] - uid: Uuid - } + uid: Uuid, + }, } fn parse_uuid(src: &str) -> Result { @@ -77,40 +72,51 @@ fn parse_uuid(src: &str) -> Result { } async fn process_cmd(cmd: Cmd) -> Result<(), UError> { - let cli_handler = ClientHandler::new(None) - .password("123qwe".to_string()); + let cli_handler = ClientHandler::new(None).password("123qwe".to_string()); match cmd { Cmd::Agents(action) => match action { - LD::List {uid} => cli_handler.get_agents(uid.as_ref()) - .await?.into_iter().for_each(|r| println!("{}{}", DELIM, r)), - LD::Delete {uid} => { - println!("{}", cli_handler.del(Some(&uid)).await?); + LD::List { uid } => cli_handler + .get_agents(uid) + .await? + .into_iter() + .for_each(|r| println!("{}{}", DELIM, r)), + LD::Delete { uid } => { + println!("{}", cli_handler.del(Some(uid)).await?); } }, Cmd::Jobs(action) => match action { - JobALD::Add {cmd: JobCmd::Cmd(cmd), agent} => { - let job = JobMeta::from_shell(cmd.join(" ")); + JobALD::Add { + cmd: JobCmd::Cmd(cmd), + agent, + } => { + let job = JobMeta::from_shell(cmd.join(" "))?; let job_uid = job.id; cli_handler.upload_jobs(&vec![job]).await?; if agent.is_some() { - cli_handler.set_jobs(&vec![job_uid], agent.as_ref()).await? + cli_handler.set_jobs(agent, &vec![job_uid]).await? } - }, - JobALD::LD(LD::List {uid}) => cli_handler.get_jobs(uid.as_ref()) - .await?.into_iter().for_each(|r| println!("{}{}", DELIM, r)), - JobALD::LD(LD::Delete {uid}) => { - println!("{}", cli_handler.del(Some(&uid)).await?) } - } + JobALD::LD(LD::List { uid }) => cli_handler + .get_jobs(uid) + .await? + .into_iter() + .for_each(|r| println!("{}{}", DELIM, r)), + JobALD::LD(LD::Delete { uid }) => { + println!("{}", cli_handler.del(Some(uid)).await?) + } + }, Cmd::Jobmap(action) => match action { - JmALD::Add {agent_uid, job_uids} => cli_handler.set_jobs(&job_uids, Some(&agent_uid)) - .await?, - JmALD::List {uid} => { - cli_handler.get_agent_jobs(uid.as_ref()) - .await?.into_iter().for_each(|r| println!("{}{}", DELIM, r)) - }, - JmALD::Delete {uid} => println!("{}", cli_handler.del(Some(&uid)).await?) - } + JmALD::Add { + agent_uid, + job_uids, + } => cli_handler.set_jobs(Some(agent_uid), &job_uids).await?, + JmALD::List { uid } => cli_handler + .get_agent_jobs(uid) + .await? + .into_iter() + .for_each(|r| println!("{}{}", DELIM, r)), + JmALD::Delete { uid } => println!("{}", cli_handler.del(Some(uid)).await?), + }, } Ok(()) } diff --git a/bin/u_run/src/main.rs b/bin/u_run/src/main.rs index de6c406..501e23d 100644 --- a/bin/u_run/src/main.rs +++ b/bin/u_run/src/main.rs @@ -1,17 +1,12 @@ +use libc::{c_void, fdopen, fwrite, getpid}; use nix::{ sys::memfd::{memfd_create, MemFdCreateFlag}, - unistd::execv -}; -use libc::{ - fdopen, - fwrite, - c_void, - getpid + unistd::execv, }; use std::{ - ffi::{CString, CStr}, - fs::{File, read_dir}, - io::Read + ffi::{CStr, CString}, + fs::{read_dir, File}, + io::Read, }; fn get_uagent_data() -> Option> { @@ -23,7 +18,7 @@ fn get_uagent_data() -> Option> { let mut file = File::open(filename).unwrap(); let mut data: Vec = vec![]; file.read_to_end(&mut data).unwrap(); - return Some(data) + return Some(data); } } None @@ -35,10 +30,7 @@ fn get_proc_name() -> CString { fn create_memfd() -> i32 { let name = get_proc_name(); - let fd = memfd_create( - &name, - MemFdCreateFlag::empty() - ).unwrap(); + let fd = memfd_create(&name, MemFdCreateFlag::empty()).unwrap(); assert!(fd > 0); fd } @@ -57,32 +49,18 @@ fn main() { let ad_len = agent_data.len(); let fd = create_memfd(); let file = unsafe { - let res = fdopen(fd, - CStr::from_bytes_with_nul(b"w\0") - .unwrap().as_ptr() - ); + let res = fdopen(fd, CStr::from_bytes_with_nul(b"w\0").unwrap().as_ptr()); if res.is_null() { panic!("WTF!"); } res }; - let res_len = unsafe { - fwrite( - agent_data.as_ptr() as *const c_void, - 1, ad_len, file - ) - }; + let res_len = unsafe { fwrite(agent_data.as_ptr() as *const c_void, 1, ad_len, file) }; if res_len != ad_len { - panic!("write wasn't successful: {}, need {}", - res_len, ad_len); + panic!("write wasn't successful: {}, need {}", res_len, ad_len); } - let exec_path: String = format!( - "/proc/{}/fd/{}", unsafe { getpid() }, fd - ); + let exec_path: String = format!("/proc/{}/fd/{}", unsafe { getpid() }, fd); let proc_name = get_proc_name(); let args = [proc_name.as_c_str()]; - execv( - &CString::new(exec_path).unwrap(), - &args - ).unwrap(); + execv(&CString::new(exec_path).unwrap(), &args).unwrap(); } diff --git a/bin/u_server/Cargo.toml b/bin/u_server/Cargo.toml index 238348b..6ef7838 100644 --- a/bin/u_server/Cargo.toml +++ b/bin/u_server/Cargo.toml @@ -12,6 +12,7 @@ thiserror = "*" warp = "0.2.4" uuid = { version = "0.6.5", features = ["serde", "v4"] } once_cell = "1.7.2" +hyper = "0.13.10" [dependencies.diesel] features = ["postgres", "uuid"] diff --git a/bin/u_server/src/db.rs b/bin/u_server/src/db.rs index 27fc8dd..1302443 100644 --- a/bin/u_server/src/db.rs +++ b/bin/u_server/src/db.rs @@ -6,7 +6,7 @@ use std::{ sync::{Arc, Mutex, MutexGuard}, }; use u_lib::{ - models::{schema, Agent, ExactJob, IAgent, JobMeta, JobState}, + models::{schema, Agent, AssignedJob, JobMeta, JobState}, ULocalError, ULocalResult, }; use uuid::Uuid; @@ -30,7 +30,7 @@ pub fn lock_db() -> MutexGuard<'static, UDB> { } impl UDB { - pub fn insert_jobs(&self, job_metas: &Vec) -> ULocalResult<()> { + pub fn insert_jobs(&self, job_metas: &[JobMeta]) -> ULocalResult<()> { use schema::jobs; diesel::insert_into(jobs::table) .values(job_metas) @@ -50,10 +50,21 @@ impl UDB { Ok(result) } - pub fn insert_agents(&self, agent: &IAgent) -> ULocalResult<()> { + pub fn find_job_by_alias(&self, alias: &str) -> ULocalResult { + use schema::jobs; + let result = jobs::table + .filter(jobs::alias.eq(alias)) + .first::(&self.conn)?; + Ok(result) + } + + pub fn insert_agent(&self, agent: &Agent) -> ULocalResult<()> { use schema::agents; diesel::insert_into(agents::table) .values(agent) + .on_conflict(agents::id) + .do_update() + .set(agent) .execute(&self.conn)?; Ok(()) } @@ -80,7 +91,11 @@ impl UDB { } //TODO: filters possibly could work in a wrong way, check - pub fn get_exact_jobs(&self, uid: Option, personal: bool) -> ULocalResult> { + pub fn get_exact_jobs( + &self, + uid: Option, + personal: bool, + ) -> ULocalResult> { use schema::results; let mut q = results::table.into_boxed(); if uid.is_some() { @@ -98,11 +113,11 @@ impl UDB { .or_filter(results::job_id.eq(uid.unwrap())) .or_filter(results::id.eq(uid.unwrap())) } - let result = q.load::(&self.conn)?; + let result = q.load::(&self.conn)?; Ok(result) } - pub fn set_jobs_for_agent(&self, agent_uid: &Uuid, job_uids: &Vec) -> ULocalResult<()> { + pub fn set_jobs_for_agent(&self, agent_uid: &Uuid, job_uids: &[Uuid]) -> ULocalResult<()> { use schema::{agents::dsl::agents, jobs::dsl::jobs, results}; if let Err(DslError::NotFound) = agents.find(agent_uid).first::(&self.conn) { return Err(ULocalError::NotFound(agent_uid.to_string())); @@ -122,12 +137,12 @@ impl UDB { } let job_requests = job_uids .iter() - .map(|job_uid| ExactJob { + .map(|job_uid| AssignedJob { job_id: *job_uid, agent_id: *agent_uid, ..Default::default() }) - .collect::>(); + .collect::>(); diesel::insert_into(results::table) .values(&job_requests) .execute(&self.conn)?; diff --git a/bin/u_server/src/handlers.rs b/bin/u_server/src/handlers.rs index da95833..df6e58f 100644 --- a/bin/u_server/src/handlers.rs +++ b/bin/u_server/src/handlers.rs @@ -1,111 +1,142 @@ use crate::db::{lock_db, UDB}; use diesel::SaveChangesDsl; -use std::fmt::Display; +use hyper::Body; +use serde::Serialize; use u_lib::{ - messaging::{BaseMessage, ToMsg}, - models::{ExactJob, IAgent, JobMeta, JobState}, + messaging::{AsMsg, BaseMessage}, + models::{Agent, AssignedJob, ExecResult, JobMeta, JobState}, ULocalError, }; use uuid::Uuid; use warp::{ http::{Response, StatusCode}, - reply::with_status, Rejection, Reply, }; -fn build_response(code: StatusCode, body: S) -> Result, Rejection> { - Ok(Response::builder() - .status(code) - .body(format!("{}", body)) - .unwrap()) +fn build_response>(code: StatusCode, body: S) -> Response { + Response::builder().status(code).body(body.into()).unwrap() } -fn build_empty_200() -> Result, Rejection> { - build_response(StatusCode::OK, "") +fn build_ok>(body: S) -> Response { + build_response(StatusCode::OK, body) } -pub async fn add_agent(msg: BaseMessage<'_, IAgent>) -> Result { - match lock_db().insert_agents(&msg.into_inner()) { - Ok(_) => build_empty_200(), - Err(e) => build_response(StatusCode::BAD_REQUEST, e), - } +fn build_err(body: S) -> Response { + build_response(StatusCode::BAD_REQUEST, body.to_string()) +} + +fn build_message(m: M) -> Response { + warp::reply::json(&m.as_message()).into_response() +} + +pub async fn add_agent(msg: Agent) -> Result { + debug!("hnd: add_agent"); + lock_db() + .insert_agent(&msg) + .map(|_| build_ok("")) + .or_else(|e| Ok(build_err(e))) } pub async fn get_agents(uid: Option) -> Result { - match lock_db().get_agents(uid) { - Ok(r) => Ok(warp::reply::json(&r.as_message())), - Err(e) => Err(warp::reject()), - } + debug!("hnd: get_agents"); + lock_db() + .get_agents(uid) + .map(|m| build_message(m)) + .or_else(|e| Ok(build_err(e))) } pub async fn get_jobs(uid: Option) -> Result { - match lock_db().get_jobs(uid) { - Ok(r) => Ok(warp::reply::json(&r.as_message())), - Err(e) => Err(warp::reject()), - } + debug!("hnd: get_jobs"); + lock_db() + .get_jobs(uid) + .map(|m| build_message(m)) + .or_else(|e| Ok(build_err(e))) } pub async fn get_agent_jobs(uid: Option, personal: bool) -> Result { + info!("hnd: get_agent_jobs {}", personal); + if personal { + let agents = lock_db().get_agents(uid).unwrap(); + if agents.len() == 0 { + let db = lock_db(); + db.insert_agent(&Agent::with_id(uid.unwrap())).unwrap(); + let job = db.find_job_by_alias("agent_hello").unwrap(); + if let Err(e) = db.set_jobs_for_agent(&uid.unwrap(), &[job.id]) { + return Ok(build_err(e)); + } + } + } let result = lock_db().get_exact_jobs(uid, personal); match result { Ok(r) => { - let _db = lock_db(); + let db = lock_db(); for j in r.iter() { - _db.update_job_status(j.id, JobState::Running).ok(); + db.update_job_status(j.id, JobState::Running).ok(); } - Ok(warp::reply::json(&r.as_message())) + Ok(build_message(r)) } - Err(e) => Err(warp::reject()), + Err(e) => Ok(build_err(e)), } } pub async fn upload_jobs(msg: BaseMessage<'_, Vec>) -> Result { - match lock_db().insert_jobs(&msg.into_inner()) { - Ok(_) => build_empty_200(), - Err(e) => build_response(StatusCode::BAD_REQUEST, e), - } + debug!("hnd: upload_jobs"); + lock_db() + .insert_jobs(&msg.into_inner()) + .map(|_| build_ok("")) + .or_else(|e| Ok(build_err(e))) } pub async fn del(uid: Uuid) -> Result { + debug!("hnd: del"); let db = lock_db(); let del_fns = &[UDB::del_agents, UDB::del_jobs, UDB::del_results]; for del_fn in del_fns { let affected = del_fn(&db, &vec![uid]).unwrap(); if affected > 0 { - return build_response(StatusCode::OK, affected); + return Ok(build_ok(affected.to_string())); } } - build_response(StatusCode::BAD_REQUEST, 0) + Ok(build_err("0")) } pub async fn set_jobs( agent_uid: Uuid, msg: BaseMessage<'_, Vec>, ) -> Result { - match lock_db().set_jobs_for_agent(&agent_uid, &msg.into_inner()) { - Ok(_) => build_empty_200(), - Err(e) => build_response(StatusCode::BAD_REQUEST, e), - } + debug!("hnd: set_jobs"); + lock_db() + .set_jobs_for_agent(&agent_uid, &msg.into_inner()) + .map(|_| build_ok("")) + .or_else(|e| Ok(build_err(e))) } -pub async fn report(msg: BaseMessage<'_, Vec>) -> Result { - let db = lock_db(); +pub async fn report(msg: BaseMessage<'_, Vec>) -> Result { + debug!("hnd: report"); let id = msg.id; let mut failed = vec![]; - for res in msg.into_inner() { - if id != res.agent_id { - continue; - } - if let Err(e) = res - .save_changes::(&db.conn) - .map_err(ULocalError::from) - { - failed.push(e.to_string()) + for entry in msg.into_inner() { + match entry { + ExecResult::Assigned(res) => { + if id != res.agent_id { + continue; + } + let db = lock_db(); + if let Err(e) = res + .save_changes::(&db.conn) + .map_err(ULocalError::from) + { + failed.push(e.to_string()) + } + } + ExecResult::Agent(a) => { + add_agent(a).await?; + } } } if failed.len() > 0 { let err_msg = ULocalError::ProcessingError(failed.join(", ")); - return build_response(StatusCode::BAD_REQUEST, err_msg); + return Ok(build_err(err_msg)); } - build_empty_200() + Ok(build_ok("")) } diff --git a/bin/u_server/src/main.rs b/bin/u_server/src/main.rs index d6453df..c606e09 100644 --- a/bin/u_server/src/main.rs +++ b/bin/u_server/src/main.rs @@ -1,26 +1,40 @@ mod db; mod handlers; -use warp::{body, Filter, Rejection, Reply}; +use warp::{body, Filter, Rejection}; -extern crate env_logger; +#[macro_use] extern crate log; +extern crate env_logger; use db::lock_db; use serde::de::DeserializeOwned; -use u_lib::{api::Paths, config::MASTER_PORT, models::*}; +use u_lib::{ + config::MASTER_PORT, + messaging::{AsMsg, BaseMessage}, + models::*, +}; use uuid::Uuid; fn get_content() -> impl Filter,), Error = Rejection> + Clone where - M: ToMsg + Sync + Send + DeserializeOwned + 'static, + M: AsMsg + Sync + Send + DeserializeOwned + 'static, { body::content_length_limit(1024 * 64).and(body::json::>()) } +fn prefill_jobs() { + let agent_hello = JobMeta::builder() + .with_type(misc::JobType::Manage) + .with_alias("agent_hello") + .build() + .unwrap(); + lock_db().insert_jobs(&[agent_hello]).ok(); +} + fn init() { env_logger::init(); - lock_db(); + prefill_jobs(); } #[tokio::main] @@ -28,13 +42,8 @@ async fn main() { init(); let infallible_none = |_| async { Ok::<(Option,), std::convert::Infallible>((None,)) }; - let new_agent = warp::post() - .and(warp::path(Paths::init)) - .and(get_content::()) - .and_then(handlers::add_agent); - let get_agents = warp::get() - .and(warp::path(Paths::get_agents)) + .and(warp::path("get_agents")) .and( warp::path::param::() .map(Some) @@ -43,12 +52,12 @@ async fn main() { .and_then(handlers::get_agents); let upload_jobs = warp::post() - .and(warp::path(Paths::upload_jobs)) + .and(warp::path("upload_jobs")) .and(get_content::>()) .and_then(handlers::upload_jobs); let get_jobs = warp::get() - .and(warp::path(Paths::get_jobs)) + .and(warp::path("get_jobs")) .and( warp::path::param::() .map(Some) @@ -57,7 +66,7 @@ async fn main() { .and_then(handlers::get_jobs); let get_agent_jobs = warp::get() - .and(warp::path(Paths::get_agent_jobs)) + .and(warp::path("get_agent_jobs")) .and( warp::path::param::() .map(Some) @@ -66,32 +75,28 @@ async fn main() { .and_then(|uid| handlers::get_agent_jobs(uid, false)); let get_personal_jobs = warp::get() - .and(warp::path(Paths::get_agent_jobs)) + .and(warp::path("get_agent_jobs")) .and(warp::path::param::().map(Some)) .and_then(|uid| handlers::get_agent_jobs(uid, true)); let del = warp::get() - .and(warp::path(Paths::del)) + .and(warp::path("del")) .and(warp::path::param::()) .and_then(handlers::del); let set_jobs = warp::post() - .and(warp::path(Paths::set_jobs)) + .and(warp::path("set_jobs")) .and(warp::path::param::()) .and(get_content::>()) .and_then(handlers::set_jobs); let report = warp::post() - .and(warp::path(Paths::report)) - .and(get_content::>()) - .and_then(handlers::report); + .and(warp::path("report")) + .and(get_content::>().and_then(handlers::report)); let auth_token = warp::header::exact("authorization", "Bearer 123qwe"); - let agent_zone = new_agent - .or(get_jobs.clone()) - .or(get_personal_jobs) - .or(report); + let agent_zone = get_jobs.clone().or(get_personal_jobs).or(report); let auth_zone = auth_token.and( get_agents @@ -107,7 +112,7 @@ async fn main() { .run(([0, 0, 0, 0], MASTER_PORT)) .await; } - +/* #[cfg(test)] mod tests { use super::*; @@ -117,3 +122,4 @@ mod tests { } */ } +*/ diff --git a/diesel.toml b/diesel.toml index 6693302..7fa37e0 100644 --- a/diesel.toml +++ b/diesel.toml @@ -3,4 +3,4 @@ [print_schema] file = "lib/u_lib/src/models/schema.rs" -import_types = ["diesel::sql_types::*", "crate::*"] \ No newline at end of file +import_types = ["crate::schema_exports::*"] \ No newline at end of file diff --git a/lib/u_api_proc_macro/Cargo.toml b/lib/u_api_proc_macro/Cargo.toml new file mode 100644 index 0000000..c0e8456 --- /dev/null +++ b/lib/u_api_proc_macro/Cargo.toml @@ -0,0 +1,16 @@ +[package] +name = "u_api_proc_macro" +version = "0.1.0" +authors = ["plazmoid "] +edition = "2018" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[lib] +proc-macro = true + +[dependencies] +syn = { version = "1.0", features = ["full", "extra-traits"] } +quote = "1.0" +strum = { version = "0.20", features = ["derive"] } +proc-macro2 = "1.0" \ No newline at end of file diff --git a/lib/u_api_proc_macro/src/lib.rs b/lib/u_api_proc_macro/src/lib.rs new file mode 100644 index 0000000..46f1efa --- /dev/null +++ b/lib/u_api_proc_macro/src/lib.rs @@ -0,0 +1,179 @@ +use proc_macro::TokenStream; +use proc_macro2::{Ident, TokenStream as TokenStream2}; +use quote::quote; +use std::{collections::HashMap, str::FromStr}; +use strum::EnumString; +use syn::{ + parse_macro_input, punctuated::Punctuated, AttributeArgs, FnArg, ItemFn, Lit, NestedMeta, + ReturnType, Signature, Token, Type, +}; + +#[derive(EnumString, Debug)] +enum ReqMethod { + GET, + POST, +} + +#[derive(Debug)] +struct Endpoint { + method: ReqMethod, +} + +#[derive(Debug)] +struct FnArgs { + url_param: Option, + payload: Option, +} + +#[proc_macro_attribute] +pub fn api_route(args: TokenStream, item: TokenStream) -> TokenStream { + let args: AttributeArgs = parse_macro_input!(args); + let input: ItemFn = parse_macro_input!(item); + let Signature { + ident, + inputs, + generics, + output, + .. + } = input.sig; + let (impl_generics, _, _) = generics.split_for_impl(); + let FnArgs { url_param, payload } = parse_fn_args(inputs); + let Endpoint { method } = parse_attr_args(args); + let url_path = build_url_path(&ident, &url_param); + let return_ty = match output { + ReturnType::Type(_, ty) => quote!(#ty), + ReturnType::Default => quote!(()), + }; + let request = match method { + ReqMethod::GET => build_get(url_path), + ReqMethod::POST => build_post(url_path, &payload), + }; + let url_param = match url_param { + Some(p) => quote!(, param: #p), + None => TokenStream2::new(), + }; + let payload = match payload { + Some(p) => quote!(, payload: #p), + None => TokenStream2::new(), + }; + let q = quote! { + pub async fn #ident #impl_generics( + &self #url_param #payload + ) -> UResult<#return_ty> { + let request = { + #request + }; + let response = request.send().await?; + let content_len = response.content_length(); + let is_success = match response.error_for_status_ref() { + Ok(_) => Ok(()), + Err(e) => Err(UError::from(e)) + }; + match is_success { + Ok(_) => response.json::>() + .await + .map(|msg| msg.into_inner()) + .or_else(|e| { + match content_len { + Some(0) => Ok(Default::default()), + _ => Err(UError::from(e)) + } + }), + Err(UError::NetError(err_src, _)) => Err( + UError::NetError( + err_src, + response.text().await.unwrap() + ) + ), + _ => unreachable!() + } + } + }; + //eprintln!("#!#! RESULT:\n{}", q); + q.into() +} + +fn parse_fn_args(raw: Punctuated) -> FnArgs { + let mut arg: HashMap = raw + .into_iter() + .filter_map(|arg| { + if let FnArg::Typed(argt) = arg { + let mut arg_name = String::new(); + // did you think I won't overplay you? won't destroy? + |arg_ident| -> TokenStream { + let q: TokenStream = quote!(#arg_ident).into(); + arg_name = parse_macro_input!(q as Ident).to_string(); + TokenStream::new() + }(argt.pat); + if &arg_name != "url_param" && &arg_name != "payload" { + panic!("Wrong arg name: {}", &arg_name) + } + let arg_type = *argt.ty.clone(); + Some((arg_name, arg_type)) + } else { + None + } + }) + .collect(); + FnArgs { + url_param: arg.remove("url_param"), + payload: arg.remove("payload"), + } +} + +fn build_get(url: TokenStream2) -> TokenStream2 { + quote! { + let request = self.build_get(#url); + request + } +} + +fn build_post(url: TokenStream2, payload: &Option) -> TokenStream2 { + let pld = match payload { + Some(_) => quote! { + .json(&payload.as_message()) + }, + None => TokenStream2::new(), + }; + quote! { + let request = self.build_post(#url); + request #pld + } +} + +fn build_url_path(path: &Ident, url_param: &Option) -> TokenStream2 { + let url_param = match url_param { + Some(_) => quote! { + + &opt_to_string(param) + }, + None => TokenStream2::new(), + }; + quote! { + &format!( + "{}/{}", + stringify!(#path), + String::new() #url_param + ) + } +} + +fn parse_attr_args(args: AttributeArgs) -> Endpoint { + let mut args = args.into_iter(); + let method = match args.next() { + Some(method) => match method { + NestedMeta::Lit(l) => { + if let Lit::Str(s) = l { + match ReqMethod::from_str(&s.value()) { + Ok(v) => v, + Err(_) => panic!("Unknown method"), + } + } else { + panic!("Method must be a str") + } + } + _ => panic!("Method must be on the first place"), + }, + None => panic!("Method required"), + }; + Endpoint { method } +} diff --git a/lib/u_api_proc_macro/tests/tests.rs b/lib/u_api_proc_macro/tests/tests.rs new file mode 100644 index 0000000..7c4b404 --- /dev/null +++ b/lib/u_api_proc_macro/tests/tests.rs @@ -0,0 +1,15 @@ +/* +use std::fmt::Display; +use u_api_proc_macro::api_route; + +type UResult = Result; + +struct ClientHandler; +struct Paths; + +#[test] +fn test1() { + #[api_route("GET", Uuid)] + fn list(url_param: T) {} +} +*/ diff --git a/lib/u_lib/Cargo.toml b/lib/u_lib/Cargo.toml index 231741e..295b216 100644 --- a/lib/u_lib/Cargo.toml +++ b/lib/u_lib/Cargo.toml @@ -23,7 +23,12 @@ diesel-derive-enum = { version = "1", features = ["postgres"] } chrono = "0.4.19" strum = { version = "0.20", features = ["derive"] } once_cell = "1.7.2" +shlex = "1.0.0" +u_api_proc_macro = { version = "*", path = "../u_api_proc_macro" } [dependencies.diesel] version = "1.4.5" -features = ["postgres", "uuid"] \ No newline at end of file +features = ["postgres", "uuid"] + +[dev-dependencies] +test-case = "1.1.0" \ No newline at end of file diff --git a/lib/u_lib/src/api.rs b/lib/u_lib/src/api.rs index bf143b8..5e0cd3c 100644 --- a/lib/u_lib/src/api.rs +++ b/lib/u_lib/src/api.rs @@ -1,127 +1,16 @@ #[allow(non_upper_case_globals)] use crate::{ config::{MASTER_PORT, MASTER_SERVER}, + messaging::{AsMsg, BaseMessage}, models::*, utils::opt_to_string, UError, UResult, }; use reqwest::{Client, RequestBuilder, Url}; use std::{net::Ipv4Addr, str::FromStr}; +use u_api_proc_macro::api_route; use uuid::Uuid; -pub struct Paths; - -#[macro_export] -macro_rules! build_url_by_method { - ( - POST $path:tt, - pname = $($param_name:literal)?, - ptype = $($param_type:ty)?, - urlparam = $($url_param:ty)? - ) => { - | - instance: &ClientHandler - $(, param: &$param_type)? - $(, url: Option<&$url_param>)? - | { - let request = ClientHandler::build_post( - instance, - &format!("{}/{}", - stringify!($path), - String::new() - $(+ - &opt_to_string(url as Option<&$url_param>) - )? - ) - ); - request - $( .json::>(¶m.as_message()) )? - } - }; - ( - GET $path:tt, - pname = $($param_name:literal)?, - ptype = $($param_type:ty)?, - urlparam = $($url_param:ty)? - ) => { - | - instance: &ClientHandler - $(, param: &$param_type)? - $(, url: Option<&$url_param>)? - | { - let request = ClientHandler::build_get( - instance, - &format!("{}/{}", - stringify!($path), - String::new() - $(+ - &opt_to_string(url as Option<&$url_param>) - )? - ) - ); - request - $( .query(&[(stringify!($param_name), param.to_string())]) )? - } - }; -} - -// param_type and result must impl ToMsg -#[macro_export] -macro_rules! build_handler { - ( - $method:tt - $path:tt $( /$url_param:tt )? ( - $( $param_name:literal: )? - $( $param_type:ty )? - ) -> $result:ty - ) => { - impl ClientHandler { - pub async fn $path( - &self - $(, param: &$param_type)? - $(, url_param: Option<&$url_param>)? - ) -> UResult<$result> { - let request = $crate::build_url_by_method!( - $method $path, - pname = $($param_name)?, ptype = $($param_type)?, urlparam = $($url_param)? - )( - self - $(, param as &$param_type)? - $(, url_param as Option<&$url_param>)? - ); - let response = request.send().await?; - let content_len = response.content_length(); - let is_success = match response.error_for_status_ref() { - Ok(_) => Ok(()), - Err(e) => Err(UError::from(e)) - }; - match is_success { - Ok(_) => response.json::>() - .await - .map(|msg| msg.into_inner()) - .or_else(|e| { - match content_len { - Some(0) => Ok(Default::default()), - _ => Err(UError::from(e)) - } - }), - Err(UError::NetError(err_src, _)) => Err( - UError::NetError( - err_src, - response.text().await.unwrap() - ) - ), - _ => unreachable!() - } - } - } - - impl Paths { - pub const $path: &'static str = stringify!($path); - } - }; -} - pub struct ClientHandler { base_url: Url, client: Client, @@ -161,25 +50,33 @@ impl ClientHandler { let rb = self.client.post(self.base_url.join(url).unwrap()); self.set_pwd(rb) } + // + // get jobs for client + #[api_route("GET")] + fn get_agent_jobs(&self, url_param: Option) -> Vec {} + // + // send something to server + #[api_route("POST")] + fn report(&self, payload: &M) {} + + //#/////////#// Admin area //#////////#// + /// client listing + #[api_route("GET")] + fn get_agents(&self, url_param: Option) -> Vec {} + // + // get all available jobs + #[api_route("GET")] + fn get_jobs(&self, url_param: Option) -> Vec {} + // + // create and upload job + #[api_route("POST")] + fn upload_jobs(&self, payload: &Vec) {} + // + // delete something + #[api_route("GET")] + fn del(&self, url_param: Option) -> String {} + // + // set jobs for client + #[api_route("POST")] + fn set_jobs(&self, url_param: Option, payload: &Vec) {} } -////////////////// -// method basic_path(json/query param; additional_url_param) -> return value -// A - admin only -////////////////// -// client listing (A) -build_handler!(GET get_agents/Uuid() -> Vec); -// get jobs for client -build_handler!(GET get_agent_jobs/Uuid() -> Vec); -// get all available jobs (A) -build_handler!(GET get_jobs/Uuid() -> Vec); -// add client to server's db -build_handler!(POST init(IAgent) -> ()); -// create and upload job (A) -build_handler!(POST upload_jobs(Vec) -> ()); -// delete something (A) -build_handler!(GET del/Uuid() -> String); -// set jobs for client (A) -// POST /set_jobs/Uuid json: Vec -build_handler!(POST set_jobs/Uuid(Vec) -> ()); -// report job result -build_handler!(POST report(Vec) -> ()); diff --git a/lib/u_lib/src/builder.rs b/lib/u_lib/src/builder.rs new file mode 100644 index 0000000..7948b33 --- /dev/null +++ b/lib/u_lib/src/builder.rs @@ -0,0 +1,146 @@ +use crate::{ + cache::JobCache, + executor::{FutRes, Waiter, DynFut}, + models::{Agent, AssignedJob, JobMeta, JobType}, + utils::{CombinedResult, OneOrMany}, + UError, +}; +use guess_host_triple::guess_host_triple; +use std::collections::HashMap; + +pub struct JobBuilder { + jobs: Waiter, +} + +impl JobBuilder { + pub fn from_request>(job_requests: J) -> CombinedResult { + let job_requests = job_requests.into_vec(); + let mut prepared: Vec = vec![]; + let mut result = CombinedResult::new(); + for req in job_requests { + let job_meta = JobCache::get(&req.job_id); + if job_meta.is_none() { + result.err(UError::NoJob(req.job_id)); + continue; + } + let job_meta = job_meta.unwrap(); + let built_req = (|| { + Ok(match job_meta.exec_type { + JobType::Shell => { + let meta = JobCache::get(&req.job_id).ok_or(UError::NoJob(req.job_id))?; + let curr_platform = guess_host_triple().unwrap_or("unknown").to_string(); + if meta.platform != curr_platform { + return Err(UError::InsuitablePlatform( + meta.platform.clone(), + curr_platform, + )); + } + let job = AssignedJob::new(req.job_id, Some(&req)); + prepared.push(Box::pin(job.run())) + } + JobType::Manage => prepared.push(Box::pin(Agent::run())), + _ => todo!(), + }) + })(); + if let Err(e) = built_req { + result.err(e) + } + } + result.ok(Self { + jobs: Waiter::new(prepared), + }); + result + } + + pub fn from_meta>(job_metas: J) -> CombinedResult { + let job_requests = job_metas + .into_vec() + .into_iter() + .map(|jm| { + let j_uid = jm.id; + JobCache::insert(jm); + AssignedJob::new(j_uid, None) + }) + .collect::>(); + JobBuilder::from_request(job_requests) + } + + /// Spawn jobs and pop results later + pub async fn spawn(mut self) -> Self { + self.jobs = self.jobs.spawn().await; + self + } + + /// Spawn jobs and wait for result + pub async fn wait(self) -> Vec { + self.jobs.spawn().await.wait().await + } + + /// Spawn one job and wait for result + pub async fn wait_one(self) -> FutRes { + self.jobs.spawn().await.wait().await.pop().unwrap() + } +} + +/// Store jobs and get results by name +pub struct NamedJobBuilder { + builder: Option, + job_names: Vec<&'static str>, + results: HashMap<&'static str, FutRes>, +} + +impl NamedJobBuilder { + pub fn from_shell>( + named_jobs: J, + ) -> CombinedResult { + let mut result = CombinedResult::new(); + let jobs: Vec<(&'static str, JobMeta)> = named_jobs + .into_vec() + .into_iter() + .filter_map( + |(alias, cmd)| match JobMeta::builder().with_shell(cmd).build() { + Ok(meta) => Some((alias, meta)), + Err(e) => { + result.err(e); + None + } + }, + ) + .collect(); + result.ok(Self::from_meta(jobs)); + result + } + + pub fn from_meta>(named_jobs: J) -> Self { + let mut job_names = vec![]; + let job_metas: Vec = named_jobs + .into_vec() + .into_iter() + .map(|(alias, meta)| { + job_names.push(alias); + meta + }) + .collect(); + Self { + builder: Some(JobBuilder::from_meta(job_metas).unwrap_one()), + job_names, + results: HashMap::new(), + } + } + + pub async fn wait(mut self) -> Self { + let results = self.builder.take().unwrap().wait().await; + for (name, result) in self.job_names.iter().zip(results.into_iter()) { + self.results.insert(name, result); + } + self + } + + pub fn pop_opt(&mut self, name: &'static str) -> Option { + self.results.remove(name) + } + + pub fn pop(&mut self, name: &'static str) -> FutRes { + self.pop_opt(name).unwrap() + } +} diff --git a/lib/u_lib/src/models/jobs/cache.rs b/lib/u_lib/src/cache.rs similarity index 100% rename from lib/u_lib/src/models/jobs/cache.rs rename to lib/u_lib/src/cache.rs diff --git a/lib/u_lib/src/errors.rs b/lib/u_lib/src/errors.rs index 799ee3d..a180311 100644 --- a/lib/u_lib/src/errors.rs +++ b/lib/u_lib/src/errors.rs @@ -21,6 +21,9 @@ pub enum UError { #[error("Job error: {0}")] JobError(String), + #[error("Argument parsing failed: {0}")] + JobArgsError(&'static str), + #[error("Job is uncompleted yet")] JobUncompleted, @@ -31,7 +34,7 @@ pub enum UError { NoJob(Uuid), #[error("Error opening {0}: {1}")] - FilesystemError(String, String), + FSError(String, String), } impl From for UError { diff --git a/lib/u_lib/src/executor.rs b/lib/u_lib/src/executor.rs index 05fad1b..c4d6492 100644 --- a/lib/u_lib/src/executor.rs +++ b/lib/u_lib/src/executor.rs @@ -1,13 +1,7 @@ -// list of jobs: job (cmd, args) OR rust fn OR python func + cron-like timing tokoi::time::interval: -// job runner (thread) -// every job runs in other thread/process - -use crate::{models::*, utils::OneOrMany, UResult}; - -use std::collections::HashMap; - +use crate::{utils::OneOrMany, models::ExecResult}; use futures::{future::BoxFuture, lock::Mutex}; use lazy_static::lazy_static; +use std::{collections::HashMap}; use tokio::{ spawn, sync::mpsc::{channel, Receiver, Sender}, @@ -15,7 +9,7 @@ use tokio::{ }; use uuid::Uuid; -pub type FutRes = UResult; +pub type FutRes = ExecResult; pub type DynFut = BoxFuture<'static, FutRes>; lazy_static! { @@ -27,16 +21,16 @@ lazy_static! { }; } -fn get_sender() -> Sender { - FUT_CHANNEL.0.clone() -} - struct JoinInfo { handle: JoinHandle, completed: bool, collectable: bool, // indicates if future can be popped from pool via pop_task_if_completed } +fn get_sender() -> Sender { + FUT_CHANNEL.0.clone() +} + pub struct Waiter { tasks: Vec, fids: Vec, @@ -62,14 +56,11 @@ impl Waiter { tx.send(fid).await.unwrap(); result }; - debug!("before JoinInfo"); let handle = JoinInfo { handle: spawn(task_wrapper), completed: false, collectable, }; - debug!("before push: {}", fid); - //spawn(async {}).await.ok(); FUT_RESULTS.lock().await.insert(fid, handle); } self @@ -88,14 +79,6 @@ impl Waiter { } result } - - pub async fn run_until_complete(self) -> Vec { - self.spawn().await.wait().await - } - - pub async fn run_one_until_complete(self) -> FutRes { - self.run_until_complete().await.pop().unwrap() - } } async fn init_receiver() { @@ -113,8 +96,12 @@ async fn init_receiver() { async fn pop_task(fid: Uuid) -> Option { FUT_RESULTS.lock().await.remove(&fid) } - -async fn pop_task_if_completed(fid: Uuid) -> Option { +/* +async fn insert_task(fid: Uuid) { + FUT_RESULTS.lock().await.remove(&fid) +} +*/ +pub async fn pop_task_if_completed(fid: Uuid) -> Option { let &mut JoinInfo { handle: _, collectable, @@ -132,8 +119,8 @@ async fn pop_task_if_completed(fid: Uuid) -> Option { } } -pub async fn pop_completed() -> Vec { - let mut completed: Vec = vec![]; +pub async fn pop_completed() -> Vec { + let mut completed: Vec = vec![]; let fids = FUT_RESULTS .lock() .await @@ -142,7 +129,7 @@ pub async fn pop_completed() -> Vec { .collect::>(); for fid in fids { if let Some(r) = pop_task_if_completed(fid).await { - completed.push(r); + completed.push(r) } } completed diff --git a/lib/u_lib/src/lib.rs b/lib/u_lib/src/lib.rs index 548b99c..d615391 100644 --- a/lib/u_lib/src/lib.rs +++ b/lib/u_lib/src/lib.rs @@ -1,5 +1,7 @@ #![allow(non_upper_case_globals)] pub mod api; +pub mod builder; +pub mod cache; pub mod config; pub mod errors; pub mod executor; @@ -9,7 +11,11 @@ pub mod utils; pub use config::UID; pub use errors::{UError, ULocalError, ULocalResult, UResult}; -pub use models::{Jobstate, Jobtype}; // for schema + +pub mod schema_exports { + pub use crate::models::{Agentstate, Jobstate, Jobtype}; + pub use diesel::sql_types::*; +} #[macro_use] extern crate lazy_static; diff --git a/lib/u_lib/src/messaging.rs b/lib/u_lib/src/messaging.rs index e026739..7d7a34c 100644 --- a/lib/u_lib/src/messaging.rs +++ b/lib/u_lib/src/messaging.rs @@ -1,43 +1,54 @@ -use serde::{ - Serialize, - Deserialize, -}; -use std::{ - borrow::Cow, -}; +use crate::UID; +use serde::{Deserialize, Serialize}; +use std::borrow::Cow; use uuid::Uuid; -use crate::{UID}; -//this is only for impl From for Cow pub struct Moo<'cow, T: Clone>(pub Cow<'cow, T>); -pub trait ToMsg: Clone { +pub trait AsMsg: Clone + Serialize { fn as_message<'m>(&'m self) -> BaseMessage<'m, Self> - where Moo<'m, Self>: From<&'m Self> { + where + Moo<'m, Self>: From<&'m Self>, + { BaseMessage::new(self) } } +impl<'cow, M: AsMsg> From for Moo<'cow, M> { + #[inline] + fn from(obj: M) -> Moo<'cow, M> { + Moo(Cow::Owned(obj)) + } +} + +impl<'cow, M: AsMsg> From<&'cow M> for Moo<'cow, M> { + #[inline] + fn from(obj: &'cow M) -> Moo<'cow, M> { + Moo(Cow::Borrowed(obj)) + } +} + +impl AsMsg for Vec {} + #[derive(Serialize, Deserialize, Debug)] -pub struct BaseMessage<'cow, I> -where I: ToMsg { +pub struct BaseMessage<'cow, I: AsMsg> { pub id: Uuid, - inner: Cow<'cow, I> + inner: Cow<'cow, I>, } -impl<'cow, I> BaseMessage<'cow, I> - where I: ToMsg -{ +impl<'cow, I: AsMsg> BaseMessage<'cow, I> { pub fn new(inner: C) -> Self - where C: Into> { + where + C: Into>, + { let Moo(inner) = inner.into(); Self { id: UID.clone(), - inner + inner, } } pub fn into_inner(self) -> I { self.inner.into_owned() } -} \ No newline at end of file +} diff --git a/lib/u_lib/src/models/agent.rs b/lib/u_lib/src/models/agent.rs index 61bc57b..fe213b9 100644 --- a/lib/u_lib/src/models/agent.rs +++ b/lib/u_lib/src/models/agent.rs @@ -1,22 +1,33 @@ -use diesel::{Identifiable, Insertable, Queryable}; +use diesel::{AsChangeset, Identifiable, Insertable, Queryable}; +use diesel_derive_enum::DbEnum; use serde::{Deserialize, Serialize}; -use std::fmt; -use std::time::SystemTime; +use std::{fmt, time::SystemTime}; +use strum::Display; use crate::{ - models::{ - jobs::{build_jobs, JobMeta, JobOutput}, - schema::*, - }, - utils::{systime_to_string, vec_to_string}, + builder::NamedJobBuilder, + models::{schema::*, ExecResult}, + unwrap_enum, + utils::systime_to_string, UID, }; use guess_host_triple::guess_host_triple; use uuid::Uuid; +#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, DbEnum, Display)] +#[PgType = "AgentState"] +#[DieselType = "Agentstate"] +pub enum AgentState { + New, + Active, + Banned, +} + //belongs_to -#[derive(Clone, Debug, Serialize, Deserialize, Identifiable, Queryable)] +#[derive( + Clone, Debug, Serialize, Deserialize, Identifiable, Queryable, Insertable, AsChangeset, +)] #[table_name = "agents"] pub struct Agent { pub alias: Option, @@ -27,7 +38,7 @@ pub struct Agent { pub last_active: SystemTime, pub platform: String, pub regtime: SystemTime, - pub status: Option, + pub state: AgentState, pub token: Option, pub username: String, } @@ -38,56 +49,69 @@ impl fmt::Display for Agent { if self.alias.is_some() { out += &format!(" ({})", self.alias.as_ref().unwrap()) } + out += &format!("\nUsername: {}", self.username); out += &format!("\nHostname: {}", self.hostname); out += &format!("\nIs root: {}", self.is_root); out += &format!("\nRoot allowed: {}", self.is_root_allowed); out += &format!("\nLast active: {}", systime_to_string(&self.last_active)); out += &format!("\nPlatform: {}", self.platform); - out += &format!("\nUsername: {}", self.username); + out += &format!("\nState: {}", self.state); write!(f, "{}", out) } } -#[derive(Clone, Debug, Serialize, Deserialize, Insertable)] -#[table_name = "agents"] -pub struct IAgent { - pub alias: Option, - pub id: Uuid, - pub hostname: String, - pub is_root: bool, - pub is_root_allowed: bool, - pub platform: String, - pub status: Option, - pub token: Option, - pub username: String, -} +impl Agent { + pub fn with_id(uid: Uuid) -> Self { + Self { + id: uid, + ..Default::default() + } + } -impl IAgent { pub async fn gather() -> Self { - async fn run_cmd_fast>(cmd: S) -> String { - let jm = JobMeta::from_shell(cmd); - let job_result = build_jobs(jm) - .run_one_until_complete() - .await - .unwrap() - .result - .unwrap(); - JobOutput::from_raw(&job_result) - .map(|o| vec_to_string(&o.into_appropriate())) - .unwrap_or(String::from_utf8_lossy(&job_result).to_string()) + let mut builder = NamedJobBuilder::from_shell(vec![ + ("hostname", "hostname"), + ("is_root", "id -u"), + ("username", "id -un"), + ]) + .unwrap_one() + .wait() + .await; + let decoder = |job_result: ExecResult| { + let assoc_job = unwrap_enum!(job_result, ExecResult::Assigned); + assoc_job.to_string_result().unwrap() + }; + + #[cfg(unix)] + Self { + hostname: decoder(builder.pop("hostname")), + is_root: &decoder(builder.pop("is_root")) == "0", + username: decoder(builder.pop("username")), + platform: guess_host_triple().unwrap_or("unknown").to_string(), + ..Default::default() } + } + pub async fn run() -> ExecResult { #[cfg(unix)] + ExecResult::Agent(Agent::gather().await) + } +} + +impl Default for Agent { + fn default() -> Self { Self { alias: None, id: UID.clone(), - hostname: run_cmd_fast("hostname").await, - is_root: &run_cmd_fast("id -u").await == "0", + hostname: String::new(), + is_root: false, is_root_allowed: false, - platform: guess_host_triple().unwrap_or("Error").to_string(), - status: None, //wtf? + last_active: SystemTime::now(), + platform: String::new(), + regtime: SystemTime::now(), + state: AgentState::New, token: None, - username: run_cmd_fast("id -un").await, + username: String::new(), } } } @@ -98,7 +122,7 @@ mod tests { #[tokio::test] async fn test_gather() { - let cli_info = IAgent::gather().await; - assert_eq!(&cli_info.username, "plazmoid") + let cli_info = Agent::gather().await; + assert_eq!(cli_info.alias, None) } } diff --git a/lib/u_lib/src/models/jobs/assigned.rs b/lib/u_lib/src/models/jobs/assigned.rs new file mode 100644 index 0000000..d9350c4 --- /dev/null +++ b/lib/u_lib/src/models/jobs/assigned.rs @@ -0,0 +1,143 @@ +use super::JobState; +use crate::{ + cache::JobCache, + models::{schema::*, ExecResult, JobOutput}, + utils::{systime_to_string, TempFile}, + UID, +}; +use diesel::{Identifiable, Insertable, Queryable}; +use serde::{Deserialize, Serialize}; +use std::{fmt, process::Output, string::FromUtf8Error, time::SystemTime}; +use tokio::process::Command; +use uuid::Uuid; + +#[derive( + Serialize, Deserialize, Clone, Debug, Queryable, Identifiable, Insertable, AsChangeset, +)] +#[table_name = "results"] +pub struct AssignedJob { + pub agent_id: Uuid, + pub alias: Option, + pub created: SystemTime, + pub id: Uuid, + pub job_id: Uuid, + pub result: Option>, + pub state: JobState, + pub retcode: Option, + pub updated: SystemTime, +} + +impl fmt::Display for AssignedJob { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + let mut out = format!("Result: {}", self.id); + out += &format!("\nAgent: {}", self.agent_id); + out += &format!("\nJob: {}", self.job_id); + if self.alias.is_some() { + out += &format!("\nAlias: {}", self.alias.as_ref().unwrap()); + } + out += &format!("\nUpdated: {}", systime_to_string(&self.updated)); + out += &format!("\nState: {}", self.state); + if self.state == JobState::Finished { + if self.retcode.is_some() { + out += &format!("\nReturn code: {}", self.retcode.unwrap()); + } + if self.result.is_some() { + out += &format!( + "\nResult: {}", + String::from_utf8_lossy(self.result.as_ref().unwrap()) + ); + } + } + write!(f, "{}", out) + } +} + +impl Default for AssignedJob { + fn default() -> Self { + Self { + agent_id: Uuid::nil(), + alias: None, + created: SystemTime::now(), + id: Uuid::new_v4(), + job_id: Uuid::nil(), + result: None, + state: JobState::Queued, + retcode: None, + updated: SystemTime::now(), + } + } +} + +impl AssignedJob { + pub async fn run(mut self) -> ExecResult { + let (argv, _payload) = { + let meta = JobCache::get(&self.job_id).unwrap(); + let extracted_payload = meta + .payload + .as_ref() + .and_then(|p| TempFile::write_exec(p).ok()); + let argv = if let Some(ref p) = &extracted_payload { + meta.argv.replace("{}", &p.get_path()) + } else { + meta.argv.clone() + }; + (argv, extracted_payload) + }; + let mut split_cmd = shlex::split(&argv).unwrap().into_iter(); + let cmd = split_cmd.nth(0).unwrap(); + let args = split_cmd.collect::>(); + let cmd_result = Command::new(cmd).args(args).output().await; + let (data, retcode) = match cmd_result { + Ok(Output { + status, + stdout, + stderr, + }) => ( + JobOutput::new() + .stdout(stdout) + .stderr(stderr) + .into_combined(), + status.code(), + ), + Err(e) => ( + JobOutput::new() + .stderr(e.to_string().into_bytes()) + .into_combined(), + None, + ), + }; + self.result = Some(data); + self.retcode = retcode; + self.updated = SystemTime::now(); + self.state = JobState::Finished; + ExecResult::Assigned(self) + } + + pub fn new(job_id: Uuid, other: Option<&Self>) -> Self { + Self { + agent_id: *UID, + job_id, + ..other.unwrap_or(&Default::default()).clone() + } + } + + pub fn as_job_output(&self) -> Option { + self.result + .as_ref() + .and_then(|r| JobOutput::from_combined(r)) + } + + pub fn to_raw_result(&self) -> Vec { + match self.result.as_ref() { + Some(r) => match JobOutput::from_combined(r) { + Some(o) => o.to_appropriate(), + None => r.clone(), + }, + None => b"No data".to_vec(), + } + } + + pub fn to_string_result(&self) -> Result { + String::from_utf8(self.to_raw_result()) + } +} diff --git a/lib/u_lib/src/models/jobs/builder.rs b/lib/u_lib/src/models/jobs/builder.rs deleted file mode 100644 index 0b6bd1d..0000000 --- a/lib/u_lib/src/models/jobs/builder.rs +++ /dev/null @@ -1,116 +0,0 @@ -use super::{ExactJob, JobCache, JobMeta, JobOutput, JobState, JobType}; -use crate::{ - executor::{DynFut, Waiter}, - utils::OneOrMany, - UError, UResult, -}; -use guess_host_triple::guess_host_triple; -use std::{process::Output, time::SystemTime}; -use tokio::process::Command; -use uuid::Uuid; - -pub struct Job { - exec_type: JobType, - payload: Option>, - result: ExactJob, -} - -impl Job { - fn build(job_meta: &JobMeta, result_id: Uuid) -> UResult { - match job_meta.exec_type { - JobType::Shell => { - let curr_platform = guess_host_triple().unwrap_or("unknown").to_string(); - if job_meta.platform != curr_platform { - return Err(UError::InsuitablePlatform( - job_meta.platform.clone(), - curr_platform, - )); - } - let job_meta = job_meta.clone(); - Ok(Self { - exec_type: job_meta.exec_type, - payload: job_meta.payload, - result: ExactJob::from_meta(job_meta.id.clone(), Some(result_id)), - }) - } - _ => todo!(), - } - } - - async fn run(mut self) -> UResult { - match self.exec_type { - JobType::Shell => { - let str_payload = match &self.payload { - Some(box_payload) => String::from_utf8_lossy(box_payload).into_owned(), - None => unimplemented!(), - }; - let mut cmd_parts = str_payload - .split(" ") - .map(String::from) - .collect::>() - .into_iter(); - let cmd = cmd_parts.nth(0).unwrap(); - let args = cmd_parts.collect::>(); - let cmd_result = Command::new(cmd).args(args).output().await; - let (data, retcode) = match cmd_result { - Ok(Output { - status, - stdout, - stderr, - }) => ( - JobOutput::new().stdout(stdout).stderr(stderr).multiline(), - status.code(), - ), - Err(e) => ( - UError::JobError(e.to_string()).to_string().into_bytes(), - None, - ), - }; - self.result.result = Some(data); - self.result.retcode = retcode; - self.result.updated = SystemTime::now(); - self.result.state = JobState::Finished; - } - _ => todo!(), - } - Ok(self.result) - } -} - -pub fn build_jobs_with_result>(job_requests: J) -> Waiter { - let prepared_jobs = job_requests - .into_vec() - .into_iter() - .filter_map(|jr| -> Option { - let job = { - let job_meta = JobCache::get(&jr.job_id); - if job_meta.is_none() { - Err(UError::NoJob(jr.job_id)) - } else { - Job::build(&*job_meta.unwrap(), jr.id) - } - }; - match job { - Ok(j) => Some(Box::pin(j.run())), - Err(e) => { - warn!("Job building error: {}", e); - None - } - } - }) - .collect::>(); - Waiter::new(prepared_jobs) -} - -pub fn build_jobs>(job_metas: J) -> Waiter { - let job_requests = job_metas - .into_vec() - .into_iter() - .map(|jm| { - let j_uid = jm.id; - JobCache::insert(jm); - ExactJob::from_meta(j_uid, None) - }) - .collect::>(); - build_jobs_with_result(job_requests) -} diff --git a/lib/u_lib/src/models/jobs/meta.rs b/lib/u_lib/src/models/jobs/meta.rs index 3e1f01b..0fd1c41 100644 --- a/lib/u_lib/src/models/jobs/meta.rs +++ b/lib/u_lib/src/models/jobs/meta.rs @@ -1,5 +1,5 @@ use super::JobType; -use crate::models::schema::*; +use crate::{models::schema::*, UError, UResult}; use diesel::{Identifiable, Insertable, Queryable}; use guess_host_triple::guess_host_triple; use serde::{Deserialize, Serialize}; @@ -9,7 +9,10 @@ use uuid::Uuid; #[derive(Serialize, Deserialize, Clone, Debug, Queryable, Identifiable, Insertable)] #[table_name = "jobs"] pub struct JobMeta { - pub alias: String, + pub alias: Option, + /// string like `bash -c {} -a 1 --arg2`, + /// where {} is replaced by executable's tmp path + pub argv: String, pub id: Uuid, pub exec_type: JobType, //pub schedule: JobSchedule, @@ -17,30 +20,130 @@ pub struct JobMeta { pub payload: Option>, } +impl JobMeta { + pub fn builder() -> JobMetaBuilder { + JobMetaBuilder::default() + } + + pub fn from_shell>(cmd: S) -> UResult { + Self::builder().with_shell(cmd).build() + } +} + impl fmt::Display for JobMeta { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - let mut out = format!("Job {}", self.id); - out += &format!(" ({})", self.alias); + let mut out = format!("Job: {}", self.id); + if self.alias.is_some() { + out += &format!(" ({})", self.alias.as_ref().unwrap()); + } + out += &format!("\nArgv: {}", self.argv); out += &format!("\nExecutable type: {}", self.exec_type); out += &format!("\nPlatform: {}", self.platform); if self.exec_type == JobType::Shell && self.payload.is_some() { + let payload = self.payload.as_ref().unwrap(); + let pld_len = { + let pl = payload.len(); + if pl > 20 { + 20 + } else { + pl + } + }; + let pld_beginning = payload + .iter() + .take(pld_len) + .map(|u| *u) + .collect::>(); out += &format!( - "\nPayload: {}", - String::from_utf8_lossy(self.payload.as_ref().unwrap()) + "\nPayload: {}{}", + String::from_utf8_lossy(&pld_beginning), + if pld_len <= 20 { "" } else { " <...>" } ); } write!(f, "{}", out) } } -impl JobMeta { - pub fn from_shell>(shell_cmd: S) -> Self { - let shell_cmd = shell_cmd.into(); - let job_name = shell_cmd.split(" ").nth(0).unwrap(); +impl Default for JobMeta { + fn default() -> Self { Self { - alias: job_name.to_string(), - payload: Some(shell_cmd.into_bytes()), - ..Default::default() + id: Uuid::new_v4(), + alias: None, + argv: String::from("/bin/bash -c {}"), + exec_type: JobType::Shell, + platform: guess_host_triple().unwrap_or("unknown").to_string(), + payload: None, + } + } +} + +pub struct JobMetaBuilder { + inner: JobMeta, +} + +impl Default for JobMetaBuilder { + fn default() -> Self { + Self { + inner: JobMeta::default(), + } + } +} + +impl JobMetaBuilder { + pub fn with_shell>(mut self, shell_cmd: S) -> Self { + self.inner.argv = shell_cmd.into(); + self + } + + pub fn with_payload>>(mut self, payload: C) -> Self { + self.inner.payload = Some(payload.into()); + self + } + + pub fn with_alias>(mut self, alias: S) -> Self { + self.inner.alias = Some(alias.into()); + self + } + + pub fn with_type(mut self, e_type: JobType) -> Self { + self.inner.exec_type = e_type; + self + } + + pub fn build(self) -> UResult { + let inner = self.inner; + match inner.exec_type { + JobType::Shell => { + let argv_parts = + shlex::split(&inner.argv).ok_or(UError::JobArgsError("Shlex failed"))?; + let empty_err = UError::JobArgsError("Empty argv"); + if argv_parts.get(0).ok_or(empty_err.clone())?.len() == 0 { + return Err(empty_err); + } + match inner.payload.as_ref() { + Some(_) => { + if !inner.argv.contains("{}") { + return Err(UError::JobArgsError( + "Argv contains no executable placeholder", + )); + } else { + () + } + } + None => { + if inner.argv.contains("{}") { + return Err(UError::JobArgsError( + "No payload provided, but argv contains executable placeholder", + )); + } else { + () + } + } + }; + Ok(inner) + } + JobType::Manage => Ok(inner), + _ => todo!(), } } /* @@ -54,15 +157,3 @@ impl JobMeta { }*/ } - -impl Default for JobMeta { - fn default() -> Self { - Self { - id: Uuid::new_v4(), - alias: String::new(), - exec_type: JobType::Shell, - platform: guess_host_triple().unwrap_or("unknown").to_string(), - payload: None, - } - } -} diff --git a/lib/u_lib/src/models/jobs/misc.rs b/lib/u_lib/src/models/jobs/misc.rs index 552603d..9c45195 100644 --- a/lib/u_lib/src/models/jobs/misc.rs +++ b/lib/u_lib/src/models/jobs/misc.rs @@ -34,8 +34,6 @@ pub enum JobType { Manage, Shell, Python, - Binary, - Dummy, } #[derive(Clone, Debug)] @@ -50,12 +48,13 @@ impl JobOutput { const STDERR: &'static str = "STDERR"; #[inline] - fn create_delim(header: &'static str) -> String { + fn create_delim(header: &'static str) -> Vec { format!( - "{border} {head} {border}\n", - border = JobOutput::STREAM_BORDER, + "<{border}{head}{border}>", + border = Self::STREAM_BORDER, head = header ) + .into_bytes() } pub fn new() -> Self { @@ -75,90 +74,65 @@ impl JobOutput { self } - pub fn multiline(&self) -> Vec { + pub fn into_combined(self) -> Vec { let mut result: Vec = vec![]; if self.stdout.len() > 0 { - result.extend(JobOutput::create_delim(JobOutput::STDOUT).into_bytes()); - result.extend(&self.stdout); - result.push(b'\n'); + result.extend(Self::create_delim(Self::STDOUT)); + result.extend(self.stdout); } if self.stderr.len() > 0 { - result.extend(JobOutput::create_delim(JobOutput::STDERR).into_bytes()); - result.extend(&self.stderr); - result.push(b'\n'); + result.extend(Self::create_delim(Self::STDERR)); + result.extend(self.stderr); } result } - pub fn from_raw(raw: &[u8]) -> Option { - let raw = String::from_utf8_lossy(raw); - let err_header = JobOutput::create_delim(JobOutput::STDERR); - raw.strip_prefix(&JobOutput::create_delim(JobOutput::STDOUT)) - .map(|s: &str| { - let mut parts = s - .split(&err_header) - .map(|d| d.trim().as_bytes().to_vec()) - .collect::>>() - .into_iter(); - JobOutput::new() - .stdout(parts.next().unwrap()) - .stderr(parts.next().unwrap_or(vec![])) + pub fn from_combined(raw: &[u8]) -> Option { + enum ParseFirst { + Stdout, + Stderr, + } + fn split_by_subslice<'s>(slice: &'s [u8], subslice: &[u8]) -> Option<(&'s [u8], &'s [u8])> { + slice + .windows(subslice.len()) + .position(|w| w == subslice) + .map(|split_pos| { + let splitted = slice.split_at(split_pos); + (&splitted.0[..split_pos], &splitted.1[subslice.len()..]) + }) + } + let splitter = |p: ParseFirst| { + let (first_hdr, second_hdr) = match p { + ParseFirst::Stdout => (Self::STDOUT, Self::STDERR), + ParseFirst::Stderr => (Self::STDERR, Self::STDOUT), + }; + let first_hdr = Self::create_delim(first_hdr); + let second_hdr = Self::create_delim(second_hdr); + split_by_subslice(raw, &first_hdr).map(|(_, p2)| { + match split_by_subslice(p2, &second_hdr) { + Some((p2_1, p2_2)) => Self::new().stdout(p2_1.to_vec()).stderr(p2_2.to_vec()), + None => Self::new().stdout(p2.to_vec()), + } }) + }; + splitter(ParseFirst::Stdout).or(splitter(ParseFirst::Stderr)) } - pub fn into_appropriate(self) -> Vec { + pub fn to_appropriate(&self) -> Vec { + let mut result: Vec = vec![]; if self.stdout.len() > 0 { - self.stdout - } else if self.stderr.len() > 0 { - self.stderr - } else { - b"No data".to_vec() + result.extend(&self.stdout); } - } -} - -#[cfg(test)] -mod tests { - use super::*; - use crate::utils::vec_to_string; - - #[test] - fn test_to_multiline() { - let mut output = JobOutput::new(); - output.stdout = b"lol".to_vec(); - output.stderr = b"kek".to_vec(); - assert_eq!( - vec_to_string(&output.multiline()), - String::from( - "*** STDOUT ***\n\ - lol\n\ - *** STDERR ***\n\ - kek\n" - ) - ) - } - - #[test] - fn test_to_multiline_stderr_only() { - let mut output = JobOutput::new(); - output.stderr = b"kek".to_vec(); - assert_eq!( - vec_to_string(&output.multiline()), - String::from( - "*** STDERR ***\n\ - kek\n" - ) - ) - } - - #[test] - fn test_from_multiline() { - let txt = "*** STDOUT ***\n\ - puk\n" - .as_bytes(); - let output = JobOutput::from_raw(txt).unwrap(); - assert_eq!(vec_to_string(&output.stdout), "puk".to_string()); - assert_eq!(output.stderr.len(), 0); + if self.stderr.len() > 0 { + if result.len() > 0 { + result.push(b'\n'); + } + result.extend(&self.stderr); + } + if result.len() == 0 { + result.extend(b"No data"); + } + result } } diff --git a/lib/u_lib/src/models/jobs/mod.rs b/lib/u_lib/src/models/jobs/mod.rs index 7e1fec6..7111111 100644 --- a/lib/u_lib/src/models/jobs/mod.rs +++ b/lib/u_lib/src/models/jobs/mod.rs @@ -1,7 +1,5 @@ -pub mod builder; -pub mod cache; +pub mod assigned; pub mod meta; pub mod misc; -pub mod result; -pub use {builder::*, cache::*, meta::*, misc::*, result::*}; +pub use {assigned::*, meta::*, misc::*}; diff --git a/lib/u_lib/src/models/jobs/result.rs b/lib/u_lib/src/models/jobs/result.rs deleted file mode 100644 index e9d9bb0..0000000 --- a/lib/u_lib/src/models/jobs/result.rs +++ /dev/null @@ -1,155 +0,0 @@ -use super::JobState; -use crate::{models::schema::*, utils::systime_to_string, UID}; -use diesel::{Identifiable, Insertable, Queryable}; -use serde::{Deserialize, Serialize}; -use std::{fmt, time::SystemTime}; -use uuid::Uuid; - -#[derive( - Serialize, Deserialize, Clone, Debug, Queryable, Identifiable, Insertable, AsChangeset, -)] -#[table_name = "results"] -pub struct ExactJob { - pub agent_id: Uuid, - pub created: SystemTime, - pub id: Uuid, - pub job_id: Uuid, - pub result: Option>, - pub state: JobState, - pub retcode: Option, - pub updated: SystemTime, -} - -impl Default for ExactJob { - fn default() -> Self { - Self { - agent_id: Uuid::nil(), - created: SystemTime::now(), - id: Uuid::new_v4(), - job_id: Uuid::nil(), - result: None, - state: JobState::Queued, - retcode: None, - updated: SystemTime::now(), - } - } -} - -impl fmt::Display for ExactJob { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - let mut out = format!("Result {}", self.id); - out += &format!("\nAgent {}", self.agent_id); - out += &format!("\nJob: {}", self.job_id); - out += &format!("\nUpdated: {}", systime_to_string(&self.updated)); - out += &format!("\nState: {}", self.state); - if self.state == JobState::Finished { - if self.retcode.is_some() { - out += &format!("\nReturn code: {}", self.retcode.unwrap()); - } - if self.result.is_some() { - out += &format!( - "\nResult: {}", - String::from_utf8_lossy(self.result.as_ref().unwrap()) - ); - } - } - write!(f, "{}", out) - } -} - -impl ExactJob { - pub fn from_meta(job_id: Uuid, result_id: Option) -> Self { - Self { - id: result_id.unwrap_or(Uuid::new_v4()), - agent_id: *UID, - job_id, - ..Default::default() - } - } - - //pub fn as_job_output(&self) -> JobOutput {} -} - -#[cfg(test)] -mod tests { - use super::*; - use crate::{ - models::jobs::{build_jobs, JobMeta, JobOutput}, - utils::vec_to_string, - UResult, - }; - - #[tokio::test] - async fn test_is_really_async() { - const SLEEP_SECS: u64 = 1; - let job = JobMeta::from_shell(format!("sleep {}", SLEEP_SECS)); - let sleep_jobs: Vec = (0..50).map(|_| job.clone()).collect(); - let now = SystemTime::now(); - build_jobs(sleep_jobs).run_until_complete().await; - assert!(now.elapsed().unwrap().as_secs() < SLEEP_SECS + 2) - } - - #[tokio::test] - async fn test_shell_job() -> UResult<()> { - let job = JobMeta::from_shell("whoami"); - let job_result = build_jobs(job).run_one_until_complete().await; - let stdout = JobOutput::from_raw(&job_result.unwrap().result.unwrap()) - .unwrap() - .stdout; - assert_eq!(vec_to_string(&stdout).trim(), "plazmoid"); - Ok(()) - } - - #[tokio::test] - async fn test_complex_load() -> UResult<()> { - const SLEEP_SECS: u64 = 1; - let now = SystemTime::now(); - let longest_job = JobMeta::from_shell(format!("sleep {}", SLEEP_SECS)); - let longest_job = build_jobs(longest_job).spawn().await; - let ls = build_jobs(JobMeta::from_shell("ls")) - .run_one_until_complete() - .await - .unwrap(); - assert_eq!(ls.retcode.unwrap(), 0); - let result = JobOutput::from_raw(&ls.result.unwrap()).unwrap(); - let folders = String::from_utf8_lossy(&result.stdout); - let subfolders_jobs: Vec = folders - .lines() - .map(|f| JobMeta::from_shell(format!("ls {}", f))) - .collect(); - let ls_subfolders = build_jobs(subfolders_jobs).run_until_complete().await; - for result in ls_subfolders { - assert_eq!(result.unwrap().retcode.unwrap(), 0); - } - longest_job.wait().await; - assert_eq!(now.elapsed().unwrap().as_secs(), SLEEP_SECS); - Ok(()) - } - /* - #[tokio::test] - async fn test_exec_multiple_jobs_nowait() -> UResult<()> { - const REPEATS: usize = 10; - let job = JobMeta::from_shell("whoami"); - let sleep_jobs: Vec = (0..=REPEATS).map(|_| job.clone()).collect(); - build_jobs(sleep_jobs).spawn().await; - let mut completed = 0; - while completed < REPEATS { - let c = pop_completed().await.len(); - if c > 0 { - completed += c; - println!("{}", c); - } - } - Ok(()) - } - */ - #[tokio::test] - async fn test_failing_shell_job() -> UResult<()> { - let job = JobMeta::from_shell("lol_kek_puk"); - let job_result = build_jobs(job).run_one_until_complete().await.unwrap(); - let output = JobOutput::from_raw(&job_result.result.unwrap()); - assert!(output.is_none()); - assert!(job_result.retcode.is_none()); - Ok(()) - } -} diff --git a/lib/u_lib/src/models/mod.rs b/lib/u_lib/src/models/mod.rs index ae54c3d..9406e9a 100644 --- a/lib/u_lib/src/models/mod.rs +++ b/lib/u_lib/src/models/mod.rs @@ -1,48 +1,17 @@ mod agent; -pub mod schema; pub mod jobs; +mod result; +pub mod schema; -pub use crate::{ - models::{ - agent::*, - jobs::*, - }, - messaging::*, -}; +use crate::messaging::AsMsg; +pub use crate::models::result::ExecResult; +pub use crate::models::{agent::*, jobs::*}; use uuid::Uuid; -use std::borrow::Cow; - -// with this macro, a type can be used as message (see api) -macro_rules! to_message { - ($($type:ty),+) => { $( - - impl ToMsg for $type {} - - impl<'cow> From<$type> for Moo<'cow, $type> { - #[inline] - fn from(obj: $type) -> Moo<'cow, $type> { - Moo(Cow::Owned(obj)) - } - } - - impl<'cow> From<&'cow $type> for Moo<'cow, $type> { - #[inline] - fn from(obj: &'cow $type) -> Moo<'cow, $type> { - Moo(Cow::Borrowed(obj)) - } - } )+ - } -} -to_message!( - Agent, - IAgent, - JobMeta, - ExactJob, - String, - Vec, - Vec, - Vec, - Vec, - () -); +impl AsMsg for Agent {} +impl AsMsg for AssignedJob {} +impl AsMsg for ExecResult {} +impl AsMsg for JobMeta {} +impl AsMsg for String {} +impl AsMsg for Uuid {} +impl AsMsg for () {} diff --git a/lib/u_lib/src/models/result.rs b/lib/u_lib/src/models/result.rs new file mode 100644 index 0000000..5f77f8f --- /dev/null +++ b/lib/u_lib/src/models/result.rs @@ -0,0 +1,8 @@ +use crate::models::{Agent, AssignedJob}; +use serde::{Deserialize, Serialize}; + +#[derive(Serialize, Deserialize, Clone)] +pub enum ExecResult { + Assigned(AssignedJob), + Agent(Agent), +} diff --git a/lib/u_lib/src/models/schema.rs b/lib/u_lib/src/models/schema.rs index aa58e92..be2edf1 100644 --- a/lib/u_lib/src/models/schema.rs +++ b/lib/u_lib/src/models/schema.rs @@ -1,6 +1,5 @@ table! { - use diesel::sql_types::*; - use crate::*; + use crate::schema_exports::*; agents (id) { alias -> Nullable, @@ -11,15 +10,14 @@ table! { last_active -> Timestamp, platform -> Text, regtime -> Timestamp, - status -> Nullable, + state -> Agentstate, token -> Nullable, username -> Text, } } table! { - use diesel::sql_types::*; - use crate::*; + use crate::schema_exports::*; certificates (id) { agent_id -> Uuid, @@ -29,8 +27,7 @@ table! { } table! { - use diesel::sql_types::*; - use crate::*; + use crate::schema_exports::*; ip_addrs (id) { agent_id -> Uuid, @@ -45,11 +42,11 @@ table! { } table! { - use diesel::sql_types::*; - use crate::*; + use crate::schema_exports::*; jobs (id) { - alias -> Text, + alias -> Nullable, + argv -> Text, id -> Uuid, exec_type -> Jobtype, platform -> Text, @@ -58,11 +55,11 @@ table! { } table! { - use diesel::sql_types::*; - use crate::*; + use crate::schema_exports::*; results (id) { agent_id -> Uuid, + alias -> Nullable, created -> Timestamp, id -> Uuid, job_id -> Uuid, diff --git a/lib/u_lib/src/utils.rs b/lib/u_lib/src/utils.rs index 33b2bbb..ebf3ebe 100644 --- a/lib/u_lib/src/utils.rs +++ b/lib/u_lib/src/utils.rs @@ -1,24 +1,14 @@ +use crate::UError; +use chrono::{offset::Local, DateTime}; use nix::{ - unistd::{ - getppid, - setsid, - fork, - ForkResult, - close as fdclose, - chdir - }, - sys::signal::{ - signal, - Signal, - SigHandler - } + sys::signal::{signal, SigHandler, Signal}, + unistd::{chdir, close as fdclose, fork, getppid, setsid, ForkResult}, }; -use std::process::exit; -use std::time::SystemTime; -use chrono::{ - DateTime, - offset::Local +use std::{ + env::temp_dir, fs, ops::Drop, os::unix::fs::PermissionsExt, path::PathBuf, process::exit, + time::SystemTime, }; +use uuid::Uuid; pub trait OneOrMany { fn into_vec(self) -> Vec; @@ -43,20 +33,22 @@ pub fn daemonize() { setsig(Signal::SIGTSTP, SigHandler::SigIgn); } for fd in 0..=2 { - match fdclose(fd) { _ => () } + match fdclose(fd) { + _ => (), + } } - match chdir("/") { _ => () }; + match chdir("/") { + _ => (), + }; match fork() { - Ok(ForkResult::Parent {..}) => { + Ok(ForkResult::Parent { .. }) => { exit(0); - }, - Ok(ForkResult::Child) => { - match setsid() { _ => () } - } - Err(_) => { - exit(255) } + Ok(ForkResult::Child) => match setsid() { + _ => (), + }, + Err(_) => exit(255), } } @@ -73,10 +65,97 @@ pub fn vec_to_string(v: &[u8]) -> String { pub fn opt_to_string(item: Option) -> String { match item { Some(s) => s.to_string(), - None => String::new() + None => String::new(), } } pub fn systime_to_string(time: &SystemTime) -> String { - DateTime::::from(*time).format("%d/%m/%Y %T").to_string() -} \ No newline at end of file + DateTime::::from(*time) + .format("%d/%m/%Y %T") + .to_string() +} + +pub struct TempFile { + path: PathBuf, +} + +impl TempFile { + pub fn get_path(&self) -> String { + self.path.to_string_lossy().to_string() + } + + pub fn new() -> Self { + let name = Uuid::simple(&Uuid::new_v4()).to_string(); + let mut path = temp_dir(); + path.push(name); + Self { path } + } + + pub fn write_all(&self, data: &[u8]) -> Result<(), String> { + fs::write(&self.path, data).map_err(|e| e.to_string()) + } + + pub fn write_exec(data: &[u8]) -> Result { + let this = Self::new(); + let path = this.get_path(); + this.write_all(data).map_err(|e| (path.clone(), e))?; + let perms = fs::Permissions::from_mode(0o555); + fs::set_permissions(&path, perms).map_err(|e| (path, e.to_string()))?; + Ok(this) + } +} + +impl Drop for TempFile { + fn drop(&mut self) { + fs::remove_file(&self.path).ok(); + } +} + +pub struct CombinedResult { + ok: Vec, + err: Vec, +} + +impl CombinedResult { + pub fn new() -> Self { + Self { + ok: vec![], + err: vec![], + } + } + + pub fn ok>(&mut self, result: I) { + self.ok.extend(result.into_vec()); + } + + pub fn err>(&mut self, err: I) { + self.err.extend(err.into_vec()); + } + + pub fn unwrap(self) -> Vec { + let err_len = self.err.len(); + if err_len > 0 { + panic!("CombinedResult has {} errors", err_len); + } + self.ok + } + + pub fn unwrap_one(self) -> T { + self.unwrap().pop().unwrap() + } + + pub fn pop_errors(&mut self) -> Vec { + self.err.drain(..).collect() + } +} + +#[macro_export] +macro_rules! unwrap_enum { + ($src:ident, $t:path) => { + if let $t(result) = $src { + result + } else { + panic!("wrong type") + } + }; +} diff --git a/lib/u_lib/tests/api_macro.rs b/lib/u_lib/tests/api_macro.rs new file mode 100644 index 0000000..e5fd094 --- /dev/null +++ b/lib/u_lib/tests/api_macro.rs @@ -0,0 +1,17 @@ +/* +use std::fmt::Display; +use u_api_proc_macro::api_route; +use uuid::Uuid; + +struct Paths; +struct ClientHandler; + +#[test] +fn test_api_proc_macro() { + #[api_route("GET", Uuid)] + fn list(&self, msg: T) -> String {} + + #[api_route("POST", Uuid)] + fn report(&self, msg: T) -> String {} +} +*/ diff --git a/lib/u_lib/tests/fixtures/echoer b/lib/u_lib/tests/fixtures/echoer new file mode 100755 index 0000000..4c13f8d Binary files /dev/null and b/lib/u_lib/tests/fixtures/echoer differ diff --git a/lib/u_lib/tests/fixtures/echoer.rs b/lib/u_lib/tests/fixtures/echoer.rs new file mode 100644 index 0000000..cc7eb02 --- /dev/null +++ b/lib/u_lib/tests/fixtures/echoer.rs @@ -0,0 +1,5 @@ +use std::env; + +fn main() { + println!("{}", env::args().nth(1).unwrap_or(String::new())); +} diff --git a/lib/u_lib/tests/jobs/execution.rs b/lib/u_lib/tests/jobs/execution.rs new file mode 100644 index 0000000..598dd8b --- /dev/null +++ b/lib/u_lib/tests/jobs/execution.rs @@ -0,0 +1,161 @@ +use std::{time::SystemTime}; +use u_lib::{ + errors::UError, + models::{ + jobs::{JobMeta}, + ExecResult, + misc::JobType + }, + builder::{JobBuilder, NamedJobBuilder} +}; + +type TestResult = Result>; + +#[tokio::test] +async fn test_is_really_async() { + const SLEEP_SECS: u64 = 1; + let job = JobMeta::from_shell(format!("sleep {}", SLEEP_SECS)).unwrap(); + let sleep_jobs: Vec = (0..50).map(|_| job.clone()).collect(); + let now = SystemTime::now(); + JobBuilder::from_meta(sleep_jobs).unwrap_one().wait().await; + assert!(now.elapsed().unwrap().as_secs() < SLEEP_SECS + 2) +} + +#[test_case( + "/bin/sh {}", + Some(b"echo test01 > /tmp/asd; cat /tmp/asd"), + "test01" + ;"sh payload" +)] +#[test_case( + r#"/usr/bin/python -c 'print("test02")'"#, + None, + "test02" + ;"python cmd" +)] +#[test_case( + "/{}", + Some( +br#"#!/bin/sh +TMPPATH=/tmp/lol +mkdir -p $TMPPATH +echo test03 > $TMPPATH/t +cat $TMPPATH/t"# + ), + "test03" + ;"sh multiline payload" +)] +#[test_case( + "/{} 'some msg as arg'", + Some(include_bytes!("../fixtures/echoer")), + "some msg as arg" + ;"standalone binary with args" +)] +#[tokio::test] +async fn test_shell_job(cmd: &str, payload: Option<&[u8]>, expected_result: &str) -> TestResult { + let mut job = JobMeta::builder().with_shell(cmd); + if let Some(p) = payload { + job = job.with_payload(p); + } + let job = job.build().unwrap(); + let job_result = JobBuilder::from_meta(job).unwrap_one().wait_one().await; + let result = unwrap_enum!(job_result, ExecResult::Assigned); + let result = result.to_string_result().unwrap(); + assert_eq!(result.trim(), expected_result); + Ok(()) +} + +#[tokio::test] +async fn test_complex_load() -> TestResult { + const SLEEP_SECS: u64 = 1; + let now = SystemTime::now(); + let longest_job = JobMeta::from_shell(format!("sleep {}", SLEEP_SECS)).unwrap(); + let longest_job = JobBuilder::from_meta(longest_job).unwrap_one().spawn().await; + let ls = JobBuilder::from_meta(JobMeta::from_shell("ls")?).unwrap_one() + .wait_one() + .await; + let ls = unwrap_enum!(ls, ExecResult::Assigned); + assert_eq!(ls.retcode.unwrap(), 0); + let folders = ls.to_string_result().unwrap(); + let subfolders_jobs: Vec = folders + .lines() + .map(|f| JobMeta::from_shell(format!("ls {}", f)).unwrap()) + .collect(); + let ls_subfolders = JobBuilder::from_meta(subfolders_jobs) + .unwrap_one() + .wait() + .await; + for result in ls_subfolders { + let result = unwrap_enum!(result, ExecResult::Assigned); + assert_eq!(result.retcode.unwrap(), 0); + } + longest_job.wait().await; + assert_eq!(now.elapsed().unwrap().as_secs(), SLEEP_SECS); + Ok(()) +} +/* +#[tokio::test] +async fn test_exec_multiple_jobs_nowait() -> UResult<()> { + const REPEATS: usize = 10; + let job = JobMeta::from_shell("whoami"); + let sleep_jobs: Vec = (0..=REPEATS).map(|_| job.clone()).collect(); + build_jobs(sleep_jobs).spawn().await; + let mut completed = 0; + while completed < REPEATS { + let c = pop_completed().await.len(); + if c > 0 { + completed += c; + println!("{}", c); + } + } + Ok(()) +} +*/ +#[tokio::test] +async fn test_failing_shell_job() -> TestResult { + let job = JobMeta::from_shell("lol_kek_puk")?; + let job_result = JobBuilder::from_meta(job) + .unwrap_one() + .wait_one() + .await; + let job_result = unwrap_enum!(job_result, ExecResult::Assigned); + let output = job_result.to_string_result().unwrap(); + assert!(output.contains("No such file")); + assert!(job_result.retcode.is_none()); + Ok(()) +} + +#[test_case( + "/bin/bash {}", + None, + "contains executable" + ; "no binary" +)] +#[test_case( + "/bin/bash", + Some(b"whoami"), + "contains no executable" + ; "no path to binary" +)] +#[tokio::test] +async fn test_job_building_failed(cmd: &str, payload: Option<&[u8]>, err_str: &str) -> TestResult { + let mut job = JobMeta::builder().with_shell(cmd); + if let Some(p) = payload { + job = job.with_payload(p); + } + let err = job.build().unwrap_err(); + let err_msg = unwrap_enum!(err, UError::JobArgsError); + assert!(err_msg.contains(err_str)); + Ok(()) +} + +#[tokio::test] +async fn test_different_job_types() -> TestResult { + let mut jobs = NamedJobBuilder::from_meta(vec![ + ("sleeper", JobMeta::from_shell("sleep 3")?), + ("gatherer", JobMeta::builder().with_type(JobType::Manage).build()?) + ]).wait().await; + let gathered = jobs.pop("gatherer"); + assert_eq!(unwrap_enum!(gathered, ExecResult::Agent).alias, None); + Ok(()) +} diff --git a/lib/u_lib/tests/jobs/output.rs b/lib/u_lib/tests/jobs/output.rs new file mode 100644 index 0000000..0fc1522 --- /dev/null +++ b/lib/u_lib/tests/jobs/output.rs @@ -0,0 +1,43 @@ +use u_lib::{models::JobOutput, utils::vec_to_string}; + +const STDOUT: &str = "<***STDOUT***>"; +const STDERR: &str = "<***STDERR***>"; + +#[test_case( + "lol", + "kek", + &format!("{}lol{}kek", STDOUT, STDERR) + ;"stdout stderr" +)] +#[test_case( + "", + "kek", + &format!("{}kek", STDERR) + ;"stderr" +)] +fn test_to_combined(stdout: &str, stderr: &str, result: &str) { + let output = JobOutput::new() + .stdout(stdout.as_bytes().to_vec()) + .stderr(stderr.as_bytes().to_vec()); + assert_eq!(&vec_to_string(&output.into_combined()), result) +} + +#[test_case( + &format!("{}lal{}kik", STDOUT, STDERR), + "lal\nkik" + ;"stdout stderr" +)] +#[test_case( + &format!("{}qeq", STDOUT), + "qeq" + ;"stdout" +)] +#[test_case( + &format!("{}vev", STDERR), + "vev" + ;"stderr" +)] +fn test_from_combined(src: &str, result: &str) { + let output = JobOutput::from_combined(src.as_bytes()).unwrap(); + assert_eq!(vec_to_string(&output.to_appropriate()).trim(), result); +} diff --git a/lib/u_lib/tests/tests.rs b/lib/u_lib/tests/tests.rs new file mode 100644 index 0000000..b0e8959 --- /dev/null +++ b/lib/u_lib/tests/tests.rs @@ -0,0 +1,10 @@ +#[macro_use] +extern crate test_case; + +#[macro_use] +extern crate u_lib; + +mod jobs { + mod execution; + mod output; +} diff --git a/migrations/00000000000000_diesel_initial_setup/down.sql b/migrations/00000000000000_diesel_initial_setup/down.sql new file mode 100644 index 0000000..a9f5260 --- /dev/null +++ b/migrations/00000000000000_diesel_initial_setup/down.sql @@ -0,0 +1,6 @@ +-- This file was automatically created by Diesel to setup helper functions +-- and other internal bookkeeping. This file is safe to edit, any future +-- changes will be added to existing projects as new migrations. + +DROP FUNCTION IF EXISTS diesel_manage_updated_at(_tbl regclass); +DROP FUNCTION IF EXISTS diesel_set_updated_at(); diff --git a/migrations/00000000000000_diesel_initial_setup/up.sql b/migrations/00000000000000_diesel_initial_setup/up.sql new file mode 100644 index 0000000..d68895b --- /dev/null +++ b/migrations/00000000000000_diesel_initial_setup/up.sql @@ -0,0 +1,36 @@ +-- This file was automatically created by Diesel to setup helper functions +-- and other internal bookkeeping. This file is safe to edit, any future +-- changes will be added to existing projects as new migrations. + + + + +-- Sets up a trigger for the given table to automatically set a column called +-- `updated_at` whenever the row is modified (unless `updated_at` was included +-- in the modified columns) +-- +-- # Example +-- +-- ```sql +-- CREATE TABLE users (id SERIAL PRIMARY KEY, updated_at TIMESTAMP NOT NULL DEFAULT NOW()); +-- +-- SELECT diesel_manage_updated_at('users'); +-- ``` +CREATE OR REPLACE FUNCTION diesel_manage_updated_at(_tbl regclass) RETURNS VOID AS $$ +BEGIN + EXECUTE format('CREATE TRIGGER set_updated_at BEFORE UPDATE ON %s + FOR EACH ROW EXECUTE PROCEDURE diesel_set_updated_at()', _tbl); +END; +$$ LANGUAGE plpgsql; + +CREATE OR REPLACE FUNCTION diesel_set_updated_at() RETURNS trigger AS $$ +BEGIN + IF ( + NEW IS DISTINCT FROM OLD AND + NEW.updated_at IS NOT DISTINCT FROM OLD.updated_at + ) THEN + NEW.updated_at := current_timestamp; + END IF; + RETURN NEW; +END; +$$ LANGUAGE plpgsql; diff --git a/migrations/2020-10-24-111622_create_all/down.sql b/migrations/2020-10-24-111622_create_all/down.sql index 4c1d485..3ded775 100644 --- a/migrations/2020-10-24-111622_create_all/down.sql +++ b/migrations/2020-10-24-111622_create_all/down.sql @@ -5,4 +5,5 @@ DROP TABLE jobs; DROP TABLE agents; DROP TYPE IF EXISTS JobState; -DROP TYPE IF EXISTS JobType; \ No newline at end of file +DROP TYPE IF EXISTS JobType; +DROP TYPE IF EXISTS AgentState; \ No newline at end of file diff --git a/migrations/2020-10-24-111622_create_all/up.sql b/migrations/2020-10-24-111622_create_all/up.sql index a57754b..ef956ac 100644 --- a/migrations/2020-10-24-111622_create_all/up.sql +++ b/migrations/2020-10-24-111622_create_all/up.sql @@ -1,9 +1,10 @@ CREATE EXTENSION IF NOT EXISTS "uuid-ossp"; -CREATE TYPE JobType AS ENUM ('shell', 'manage', 'binary', 'python'); +CREATE TYPE JobType AS ENUM ('shell', 'manage', 'python'); CREATE TYPE JobState AS ENUM ('queued', 'running', 'finished'); +CREATE TYPE AgentState AS ENUM ('new', 'active', 'banned'); CREATE TABLE IF NOT EXISTS agents ( - alias TEXT + alias TEXT UNIQUE , hostname TEXT NOT NULL , id UUID NOT NULL DEFAULT uuid_generate_v4() , is_root BOOLEAN NOT NULL DEFAULT false @@ -12,7 +13,7 @@ CREATE TABLE IF NOT EXISTS agents ( -- target triplet , platform TEXT NOT NULL , regtime TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP - , status TEXT + , state AgentState NOT NULL DEFAULT 'new' -- is needed to processing requests , token TEXT , username TEXT NOT NULL @@ -20,7 +21,7 @@ CREATE TABLE IF NOT EXISTS agents ( ); CREATE TABLE IF NOT EXISTS ip_addrs ( - agent_id UUID NOT NULL + agent_id UUID NOT NULL , check_ts TIMESTAMP NOT NULL , gateway TEXT , id UUID NOT NULL DEFAULT uuid_generate_v4() @@ -33,7 +34,8 @@ CREATE TABLE IF NOT EXISTS ip_addrs ( ); CREATE TABLE IF NOT EXISTS jobs ( - alias TEXT NOT NULL + alias TEXT UNIQUE + , argv TEXT NOT NULL , id UUID NOT NULL DEFAULT uuid_generate_v4() -- Shell, Binary (with program download), -- Python (with program and python download if not exist), Management @@ -44,7 +46,8 @@ CREATE TABLE IF NOT EXISTS jobs ( ); CREATE TABLE IF NOT EXISTS results ( - agent_id UUID NOT NULL + agent_id UUID NOT NULL + , alias TEXT UNIQUE , created TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP , id UUID NOT NULL DEFAULT uuid_generate_v4() , job_id UUID NOT NULL @@ -58,7 +61,7 @@ CREATE TABLE IF NOT EXISTS results ( ); CREATE TABLE IF NOT EXISTS certificates ( - agent_id UUID NOT NULL + agent_id UUID NOT NULL , id UUID NOT NULL DEFAULT uuid_generate_v4() , is_revoked BOOLEAN NOT NULL DEFAULT FALSE , PRIMARY KEY(id)