diff --git a/bin/u_panel/src/main.rs b/bin/u_panel/src/main.rs index 99cfed0..f6335a2 100644 --- a/bin/u_panel/src/main.rs +++ b/bin/u_panel/src/main.rs @@ -19,7 +19,7 @@ struct Args { enum Cmd { Agents(LD), Jobs(JobALD), - Jobmap(JmALD), + Jobmap(JobMapALD), } #[derive(StructOpt, Debug)] @@ -28,6 +28,9 @@ enum JobALD { #[structopt(long, parse(try_from_str = parse_uuid))] agent: Option, + #[structopt(long)] + alias: String, + #[structopt(subcommand)] cmd: JobCmd, }, @@ -42,13 +45,12 @@ enum JobCmd { } #[derive(StructOpt, Debug)] -enum JmALD { +enum JobMapALD { Add { #[structopt(parse(try_from_str = parse_uuid))] agent_uid: Uuid, - #[structopt(parse(try_from_str = parse_uuid))] - job_uids: Vec, + job_idents: Vec, }, List { #[structopt(parse(try_from_str = parse_uuid))] @@ -111,21 +113,29 @@ async fn process_cmd(args: Args) { Cmd::Jobs(action) => match action { JobALD::Add { cmd: JobCmd::Cmd(cmd), + alias, agent: _agent, } => { - let job = JobMeta::from_shell(cmd.join(" ")).unwrap(); + let job = JobMeta::builder() + .with_shell(cmd.join(" ")) + .with_alias(alias) + .build() + .unwrap(); printer(cli_handler.upload_jobs(&[job]).await, json); } JobALD::LD(LD::List { uid }) => printer(cli_handler.get_jobs(uid).await, json), JobALD::LD(LD::Delete { uid }) => printer(cli_handler.del(Some(uid)).await, json), }, Cmd::Jobmap(action) => match action { - JmALD::Add { + JobMapALD::Add { agent_uid, - job_uids, - } => printer(cli_handler.set_jobs(Some(agent_uid), &job_uids).await, json), - JmALD::List { uid } => printer(cli_handler.get_agent_jobs(uid).await, json), - JmALD::Delete { uid } => printer(cli_handler.del(Some(uid)).await, json), + job_idents, + } => printer( + cli_handler.set_jobs(Some(agent_uid), &job_idents).await, + json, + ), + JobMapALD::List { uid } => printer(cli_handler.get_agent_jobs(uid).await, json), + JobMapALD::Delete { uid } => printer(cli_handler.del(Some(uid)).await, json), }, } } diff --git a/bin/u_server/Cargo.toml b/bin/u_server/Cargo.toml index 28c5467..96a3651 100644 --- a/bin/u_server/Cargo.toml +++ b/bin/u_server/Cargo.toml @@ -5,8 +5,8 @@ name = "u_server" version = "0.1.0" [dependencies] -env_logger = "0.7.1" log = "0.4.11" +simplelog = "0.10" thiserror = "*" warp = "0.2.4" uuid = { version = "0.6.5", features = ["serde", "v4"] } diff --git a/bin/u_server/src/db.rs b/bin/u_server/src/db.rs index a3ab7fb..c104968 100644 --- a/bin/u_server/src/db.rs +++ b/bin/u_server/src/db.rs @@ -116,7 +116,11 @@ impl UDB { Ok(result) } - pub fn set_jobs_for_agent(&self, agent_uid: &Uuid, job_uids: &[Uuid]) -> ULocalResult<()> { + pub fn set_jobs_for_agent( + &self, + agent_uid: &Uuid, + job_uids: &[Uuid], + ) -> ULocalResult> { use schema::{agents::dsl::agents, jobs::dsl::jobs, results}; if let Err(DslError::NotFound) = agents.find(agent_uid).first::(&self.conn) { return Err(ULocalError::NotFound(agent_uid.to_string())); @@ -145,7 +149,8 @@ impl UDB { diesel::insert_into(results::table) .values(&job_requests) .execute(&self.conn)?; - Ok(()) + let assigned_uids = job_requests.iter().map(|aj| aj.id).collect(); + Ok(assigned_uids) } pub fn del_jobs(&self, uids: &Vec) -> ULocalResult { diff --git a/bin/u_server/src/filters.rs b/bin/u_server/src/filters.rs new file mode 100644 index 0000000..a9c51f0 --- /dev/null +++ b/bin/u_server/src/filters.rs @@ -0,0 +1,88 @@ +use crate::handlers::Endpoints; +use serde::de::DeserializeOwned; +use std::env; +use u_lib::{ + messaging::{AsMsg, BaseMessage}, + models::*, +}; +use uuid::Uuid; +use warp::{body, Filter, Rejection, Reply}; + +fn get_content() -> impl Filter,), Error = Rejection> + Clone +where + M: AsMsg + Sync + Send + DeserializeOwned + 'static, +{ + body::content_length_limit(1024 * 64).and(body::json::>()) +} + +pub fn make_filters() -> impl Filter + Clone { + let infallible_none = |_| async { Ok::<(Option,), std::convert::Infallible>((None,)) }; + + let get_agents = warp::get() + .and(warp::path("get_agents")) + .and( + warp::path::param::() + .map(Some) + .or_else(infallible_none), + ) + .and_then(Endpoints::get_agents); + + let upload_jobs = warp::post() + .and(warp::path("upload_jobs")) + .and(get_content::>()) + .and_then(Endpoints::upload_jobs); + + let get_jobs = warp::get() + .and(warp::path("get_jobs")) + .and( + warp::path::param::() + .map(Some) + .or_else(infallible_none), + ) + .and_then(Endpoints::get_jobs); + + let get_agent_jobs = warp::get() + .and(warp::path("get_agent_jobs")) + .and( + warp::path::param::() + .map(Some) + .or_else(infallible_none), + ) + .and_then(|uid| Endpoints::get_agent_jobs(uid, false)); + + let get_personal_jobs = warp::get() + .and(warp::path("get_agent_jobs")) + .and(warp::path::param::().map(Some)) + .and_then(|uid| Endpoints::get_agent_jobs(uid, true)); + + let del = warp::get() + .and(warp::path("del")) + .and(warp::path::param::()) + .and_then(Endpoints::del); + + let set_jobs = warp::post() + .and(warp::path("set_jobs")) + .and(warp::path::param::()) + .and(get_content::>()) + .and_then(Endpoints::set_jobs); + + let report = warp::post() + .and(warp::path("report")) + .and(get_content::>().and_then(Endpoints::report)); + + let auth_token = format!("Bearer {}", env::var("ADMIN_AUTH_TOKEN").unwrap()).into_boxed_str(); + let auth_header = warp::header::exact("authorization", Box::leak(auth_token)); + + let agent_zone = get_jobs.clone().or(get_personal_jobs).or(report); + + let auth_zone = auth_header.and( + get_agents + .or(get_jobs) + .or(upload_jobs) + .or(del) + .or(set_jobs) + .or(get_agent_jobs), + ); + + agent_zone.or(auth_zone) +} diff --git a/bin/u_server/src/handlers.rs b/bin/u_server/src/handlers.rs index 76d26de..f792188 100644 --- a/bin/u_server/src/handlers.rs +++ b/bin/u_server/src/handlers.rs @@ -34,7 +34,7 @@ pub struct Endpoints; #[cfg_attr(test, automock)] impl Endpoints { pub async fn add_agent(msg: Agent) -> Result, Rejection> { - debug!("hnd: add_agent"); + info!("hnd: add_agent"); UDB::lock_db() .insert_agent(&msg) .map(|_| build_ok("")) @@ -42,7 +42,7 @@ impl Endpoints { } pub async fn get_agents(uid: Option) -> Result, Rejection> { - debug!("hnd: get_agents"); + info!("hnd: get_agents"); UDB::lock_db() .get_agents(uid) .map(|m| build_message(m)) @@ -50,7 +50,7 @@ impl Endpoints { } pub async fn get_jobs(uid: Option) -> Result, Rejection> { - debug!("hnd: get_jobs"); + info!("hnd: get_jobs"); UDB::lock_db() .get_jobs(uid) .map(|m| build_message(m)) @@ -91,7 +91,7 @@ impl Endpoints { pub async fn upload_jobs( msg: BaseMessage<'static, Vec>, ) -> Result, Rejection> { - debug!("hnd: upload_jobs"); + info!("hnd: upload_jobs"); UDB::lock_db() .insert_jobs(&msg.into_inner()) .map(|_| build_ok("")) @@ -99,7 +99,7 @@ impl Endpoints { } pub async fn del(uid: Uuid) -> Result, Rejection> { - debug!("hnd: del"); + info!("hnd: del"); let db = UDB::lock_db(); let del_fns = &[UDB::del_agents, UDB::del_jobs, UDB::del_results]; for del_fn in del_fns { @@ -113,19 +113,30 @@ impl Endpoints { pub async fn set_jobs( agent_uid: Uuid, - msg: BaseMessage<'static, Vec>, + msg: BaseMessage<'static, Vec>, ) -> Result, Rejection> { - debug!("hnd: set_jobs"); - UDB::lock_db() - .set_jobs_for_agent(&agent_uid, &msg.into_inner()) - .map(|_| build_ok("")) - .or_else(|e| Ok(build_err(e))) + info!("hnd: set_jobs_by_alias"); + let jobs: Result, ULocalError> = msg + .into_inner() + .into_iter() + .map(|ident| { + Uuid::parse_str(&ident) + .or_else(|_| UDB::lock_db().find_job_by_alias(&ident).map(|j| j.id)) + }) + .collect(); + match jobs { + Ok(j) => UDB::lock_db() + .set_jobs_for_agent(&agent_uid, &j) + .map(|assigned_uids| build_message(assigned_uids)) + .or_else(|e| Ok(build_err(e))), + Err(e) => Ok(build_err(e)), + } } pub async fn report( msg: BaseMessage<'static, Vec>, ) -> Result, Rejection> { - debug!("hnd: report"); + info!("hnd: report"); let id = msg.id; let mut failed = vec![]; for entry in msg.into_inner() { diff --git a/bin/u_server/src/lib.rs b/bin/u_server/src/lib.rs index 1f6b6bb..6b24a9a 100644 --- a/bin/u_server/src/lib.rs +++ b/bin/u_server/src/lib.rs @@ -1,11 +1,5 @@ -mod db; -mod handlers; - -use warp::{body, Filter, Rejection, Reply}; - #[macro_use] extern crate log; -extern crate env_logger; #[macro_use] extern crate mockall; @@ -17,26 +11,16 @@ extern crate openssl; #[macro_use] extern crate diesel; // +mod db; +mod filters; +mod handlers; use db::UDB; -#[double] -use handlers::Endpoints; -use serde::de::DeserializeOwned; -use std::env; -use u_lib::{ - config::MASTER_PORT, - messaging::{AsMsg, BaseMessage}, - models::*, - utils::init_env, -}; -use uuid::Uuid; +use filters::make_filters; +use u_lib::{config::MASTER_PORT, models::*, utils::init_env}; +use warp::Filter; -fn get_content() -> impl Filter,), Error = Rejection> + Clone -where - M: AsMsg + Sync + Send + DeserializeOwned + 'static, -{ - body::content_length_limit(1024 * 64).and(body::json::>()) -} +const LOGFILE: &str = "u_server.log"; fn prefill_jobs() { let agent_hello = JobMeta::builder() @@ -47,86 +31,38 @@ fn prefill_jobs() { UDB::lock_db().insert_jobs(&[agent_hello]).ok(); } -fn init() { - init_env(); - env_logger::init(); - prefill_jobs(); +fn init_logger() { + use simplelog::*; + use std::fs::OpenOptions; + let log_cfg = ConfigBuilder::new() + .set_time_format_str("%x %X") + .set_time_to_local(true) + .build(); + let logfile = OpenOptions::new() + .append(true) + .create(true) + .open(LOGFILE) + .unwrap(); + let loggers = vec![ + WriteLogger::new(LevelFilter::Info, log_cfg.clone(), logfile) as Box, + TermLogger::new( + LevelFilter::Info, + log_cfg, + TerminalMode::Stderr, + ColorChoice::Auto, + ), + ]; + CombinedLogger::init(loggers).unwrap(); } -fn make_filters() -> impl Filter + Clone { - let infallible_none = |_| async { Ok::<(Option,), std::convert::Infallible>((None,)) }; - - let get_agents = warp::get() - .and(warp::path("get_agents")) - .and( - warp::path::param::() - .map(Some) - .or_else(infallible_none), - ) - .and_then(Endpoints::get_agents); - - let upload_jobs = warp::post() - .and(warp::path("upload_jobs")) - .and(get_content::>()) - .and_then(Endpoints::upload_jobs); - - let get_jobs = warp::get() - .and(warp::path("get_jobs")) - .and( - warp::path::param::() - .map(Some) - .or_else(infallible_none), - ) - .and_then(Endpoints::get_jobs); - - let get_agent_jobs = warp::get() - .and(warp::path("get_agent_jobs")) - .and( - warp::path::param::() - .map(Some) - .or_else(infallible_none), - ) - .and_then(|uid| Endpoints::get_agent_jobs(uid, false)); - - let get_personal_jobs = warp::get() - .and(warp::path("get_agent_jobs")) - .and(warp::path::param::().map(Some)) - .and_then(|uid| Endpoints::get_agent_jobs(uid, true)); - - let del = warp::get() - .and(warp::path("del")) - .and(warp::path::param::()) - .and_then(Endpoints::del); - - let set_jobs = warp::post() - .and(warp::path("set_jobs")) - .and(warp::path::param::()) - .and(get_content::>()) - .and_then(Endpoints::set_jobs); - - let report = warp::post() - .and(warp::path("report")) - .and(get_content::>().and_then(Endpoints::report)); - - let auth_token = format!("Bearer {}", env::var("ADMIN_AUTH_TOKEN").unwrap()).into_boxed_str(); - let auth_header = warp::header::exact("authorization", Box::leak(auth_token)); - - let agent_zone = get_jobs.clone().or(get_personal_jobs).or(report); - - let auth_zone = auth_header.and( - get_agents - .or(get_jobs) - .or(upload_jobs) - .or(del) - .or(set_jobs) - .or(get_agent_jobs), - ); - - agent_zone.or(auth_zone) +fn init_all() { + init_logger(); + init_env(); + prefill_jobs(); } pub async fn serve() { - init(); + init_all(); let routes = make_filters(); warp::serve(routes.with(warp::log("warp"))) .run(([0, 0, 0, 0], MASTER_PORT)) @@ -136,9 +72,12 @@ pub async fn serve() { #[cfg(test)] mod tests { use super::*; + #[double] + use crate::handlers::Endpoints; use handlers::build_ok; use mockall::predicate::*; use test_case::test_case; + use uuid::Uuid; use warp::test::request; #[test_case(Some(Uuid::new_v4()))] @@ -158,6 +97,6 @@ mod tests { .filter(&make_filters()) .await .unwrap(); - mock.checkpoint() + mock.checkpoint(); } } diff --git a/integration/docker-compose.yml b/integration/docker-compose.yml index 997f1bf..bd83d3e 100644 --- a/integration/docker-compose.yml +++ b/integration/docker-compose.yml @@ -20,7 +20,7 @@ services: expose: - '63714' environment: - RUST_LOG: warp + RUST_LOG: warp=info env_file: - ../.env healthcheck: diff --git a/integration/tests/helpers/panel.rs b/integration/tests/helpers/panel.rs index 1d616c7..f250332 100644 --- a/integration/tests/helpers/panel.rs +++ b/integration/tests/helpers/panel.rs @@ -21,8 +21,8 @@ impl Panel { from_slice(&result.stdout).unwrap() } - pub fn output(args: &str) -> Value { - let splitted = split(args).unwrap(); + pub fn output>(args: S) -> Value { + let splitted = split(args.into().as_ref()).unwrap(); Self::output_argv( splitted .iter() @@ -32,9 +32,22 @@ impl Panel { ) } - pub fn check_output(args: &str) -> Vec { + fn status_is_ok(data: &Value) { + assert_eq!( + data["status"], "ok", + "Panel failed with erroneous status: {}", + data["data"] + ); + } + + pub fn check_status>(args: S) { + let result = Self::output(args); + Self::status_is_ok(&result); + } + + pub fn check_output>(args: S) -> Vec { let result = Self::output(args); - assert_eq!(result["status"], "ok"); + Self::status_is_ok(&result); result["data"].as_array().unwrap().clone() } } diff --git a/integration/tests/tests.rs b/integration/tests/tests.rs index 5b92396..2cc9bce 100644 --- a/integration/tests/tests.rs +++ b/integration/tests/tests.rs @@ -3,6 +3,8 @@ mod helpers; use helpers::{AgentClient, Panel}; use serde_json::json; +use std::thread::sleep; +use std::time::Duration; use uuid::Uuid; type TestResult = Result>; @@ -28,27 +30,51 @@ async fn register_agent() -> Uuid { "state":"New", "token":null, "username":"root"} - }]} + } + ]} }; cli.post("report", &agent_data).await; agent_uid } #[tokio::test] -async fn test_first_connection() -> TestResult { - let uid = register_agent().await; +async fn test_registration() -> TestResult { + let agent_uid = register_agent().await; let agents = Panel::check_output("agents list"); - dbg!(&agents); - assert_eq!(agents.len(), 2); let found = agents .iter() - .find(|v| v["id"].as_str().unwrap() == uid.to_string()); + .find(|v| v["id"].as_str().unwrap() == agent_uid.to_string()); assert!(found.is_some()); Ok(()) } #[tokio::test] async fn test_setup_tasks() -> TestResult { - register_agent().await; + let agent_uid = Panel::check_output("agents list")[0]["id"].clone(); + dbg!(&agent_uid); + let job_alias = "passwd_contents"; + let cmd = format!("jobs add --alias {} 'cat /etc/passwd'", job_alias); + Panel::check_status(cmd); + let cmd = format!("jobmap add {} {}", agent_uid, job_alias); + let assigned_uids = Panel::check_output(cmd); + dbg!(&assigned_uids); + loop { + let result = Panel::check_output(format!("jobmap list {}", assigned_uids[0])); + dbg!(&result); + match result.get(0) { + Some(entry) if entry["state"] == "Finished" => { + println!("{}", result[0]); + break; + } + None => { + eprintln!("jobmap list is empty (bad bad bad)"); + continue; + } + _ => { + sleep(Duration::from_secs(1)); + eprintln!("waiting for task"); + } + } + } Ok(()) } diff --git a/lib/u_lib/src/api.rs b/lib/u_lib/src/api.rs index 1481ff6..636a0dc 100644 --- a/lib/u_lib/src/api.rs +++ b/lib/u_lib/src/api.rs @@ -2,7 +2,7 @@ use crate::{ config::{MASTER_PORT, MASTER_SERVER}, messaging::{AsMsg, BaseMessage}, - models::*, + models, utils::{opt_to_string, VecDisplay}, UError, UResult, }; @@ -52,24 +52,24 @@ impl ClientHandler { // // get jobs for client #[api_route("GET")] - fn get_agent_jobs(&self, url_param: Option) -> VecDisplay {} + fn get_agent_jobs(&self, url_param: Option) -> VecDisplay {} // // send something to server #[api_route("POST")] - fn report(&self, payload: &M) -> Empty {} + fn report(&self, payload: &M) -> models::Empty {} //##########// Admin area //##########// /// client listing #[api_route("GET")] - fn get_agents(&self, url_param: Option) -> VecDisplay {} + fn get_agents(&self, url_param: Option) -> VecDisplay {} // // get all available jobs #[api_route("GET")] - fn get_jobs(&self, url_param: Option) -> VecDisplay {} + fn get_jobs(&self, url_param: Option) -> VecDisplay {} // // create and upload job #[api_route("POST")] - fn upload_jobs(&self, payload: &[JobMeta]) -> Empty {} + fn upload_jobs(&self, payload: &[models::JobMeta]) -> models::Empty {} // // delete something #[api_route("GET")] @@ -77,5 +77,5 @@ impl ClientHandler { // // set jobs for client #[api_route("POST")] - fn set_jobs(&self, url_param: Option, payload: &[Uuid]) -> Empty {} + fn set_jobs(&self, url_param: Option, payload: &[String]) -> VecDisplay {} }