diff --git a/Makefile b/Makefile deleted file mode 100644 index ea72f7c..0000000 --- a/Makefile +++ /dev/null @@ -1,26 +0,0 @@ -.PHONY: _pre_build debug release run clean unit integration test - -CARGO=./scripts/cargo_musl.sh - -clean: - ${CARGO} clean - -_pre_build: - docker build -t unki/musllibs ./muslrust - -debug: _pre_build - ${CARGO} build - -release: _pre_build - ${CARGO} build --release - -run: build - ${CARGO} run - -unit: - ${CARGO} test --lib - -integration: - cd ./integration && ./integration_tests.sh - -test: unit integration \ No newline at end of file diff --git a/Makefile.toml b/Makefile.toml new file mode 100644 index 0000000..87cffed --- /dev/null +++ b/Makefile.toml @@ -0,0 +1,48 @@ +[config] +default_to_workspace = false + +[env] +CARGO = "./scripts/cargo_musl.sh" + +[tasks.build_cargo_image] +script = "docker build -t unki/musllibs ./muslrust" + +[tasks.clean] +command = "${CARGO}" +args = ["clean"] + +[tasks.debug] +dependencies = ["build_cargo_image"] +command = "${CARGO}" +args = ["build"] + +[tasks.release] +dependencies = ["build_cargo_image"] +command = "${CARGO}" +args = ["build", "--release"] + +[tasks.run] +script = ''' +echo "Only integration tests are supported." +exit 1 +''' + +[tasks.unit] +command = "${CARGO}" +args = ["test", "--lib", "${@}"] + +[tasks.integration] +script = ''' +cd ./integration +bash integration_tests.sh +''' + +[tasks.gen_schema] +script = ''' +cd ./integration +docker-compose up -d u_server +docker-compose down +''' + +[tasks.test] +dependencies = ["unit", "integration"] \ No newline at end of file diff --git a/bin/u_agent/src/lib.rs b/bin/u_agent/src/lib.rs index fefe9dc..eeb1b0d 100644 --- a/bin/u_agent/src/lib.rs +++ b/bin/u_agent/src/lib.rs @@ -14,32 +14,28 @@ use u_lib::{ builder::JobBuilder, cache::JobCache, executor::pop_completed, - models::{AssignedJob, ExecResult}, + messaging::Reportable, + models::AssignedJob, UID, //daemonize }; -#[macro_export] -macro_rules! retry_until_ok { - ( $body:expr ) => { - loop { - match $body { - Ok(r) => break r, - Err(e) => error!("{:?}", e), - }; - sleep(Duration::from_secs(5)).await; - } - }; -} +const ITERATION_LATENCY: u64 = 5; pub async fn process_request(job_requests: Vec, client: &ClientHandler) { if job_requests.len() > 0 { for jr in &job_requests { if !JobCache::contains(&jr.job_id) { info!("Fetching job: {}", &jr.job_id); - let fetched_job = retry_until_ok!(client.get_jobs(Some(jr.job_id)).await) - .pop() - .unwrap(); + let fetched_job = loop { + match client.get_jobs(Some(jr.job_id)).await { + Ok(mut result) => break result.pop().unwrap(), + Err(err) => { + error!("{:?} \nretrying...", err); + sleep(Duration::from_secs(ITERATION_LATENCY)).await; + } + } + }; JobCache::insert(fetched_job); } } @@ -71,16 +67,24 @@ pub async fn run_forever() { //daemonize(); env_logger::init(); let arg_ip = env::args().nth(1); - let instance = ClientHandler::new(arg_ip.as_deref()); + let client = ClientHandler::new(arg_ip.as_deref()); info!("Connecting to the server"); loop { - let job_requests: Vec = - retry_until_ok!(instance.get_personal_jobs(Some(*UID)).await).into_builtin_vec(); - process_request(job_requests, &instance).await; - let result: Vec = pop_completed().await.into_iter().collect(); + match client.get_personal_jobs(Some(*UID)).await { + Ok(resp) => { + let job_requests = resp.into_builtin_vec(); + process_request(job_requests, &client).await; + } + Err(err) => { + error!("{:?}", err); + } + } + let result: Vec = pop_completed().await.into_iter().collect(); if result.len() > 0 { - retry_until_ok!(instance.report(&result).await); + if let Err(err) = client.report(&result).await { + error!("{:?}", err); + } } - sleep(Duration::from_secs(5)).await; + sleep(Duration::from_secs(ITERATION_LATENCY)).await; } } diff --git a/bin/u_panel/src/main.rs b/bin/u_panel/src/main.rs index 4b86905..39f0a7b 100644 --- a/bin/u_panel/src/main.rs +++ b/bin/u_panel/src/main.rs @@ -1,9 +1,10 @@ use std::env; use std::fmt; +use std::process; use structopt::StructOpt; use u_lib::{ api::ClientHandler, datatypes::DataResult, messaging::AsMsg, models::JobMeta, utils::init_env, - UResult, + UError, UResult, }; use uuid::Uuid; @@ -78,7 +79,7 @@ fn parse_uuid(src: &str) -> Result { Uuid::parse_str(src).map_err(|e| e.to_string()) } -async fn process_cmd(args: Args) { +async fn process_cmd(args: Args) -> UResult<()> { struct Printer { json: bool, } @@ -100,7 +101,7 @@ async fn process_cmd(args: Args) { } } - let token = env::var("ADMIN_AUTH_TOKEN").expect("Authentication token is not set"); + let token = env::var("ADMIN_AUTH_TOKEN").map_err(|_| UError::WrongToken)?; let cli_handler = ClientHandler::new(None).password(token); let printer = Printer { json: args.json }; match args.cmd { @@ -117,8 +118,7 @@ async fn process_cmd(args: Args) { let job = JobMeta::builder() .with_shell(cmd.join(" ")) .with_alias(alias) - .build() - .unwrap(); + .build()?; printer.print(cli_handler.upload_jobs(&[job]).await); } JobALD::LD(LD::List { uid }) => printer.print(cli_handler.get_jobs(uid).await), @@ -133,11 +133,15 @@ async fn process_cmd(args: Args) { JobMapALD::Delete { uid } => printer.print(cli_handler.del(Some(uid)).await), }, } + Ok(()) } #[tokio::main] async fn main() { init_env(); let args: Args = Args::from_args(); - process_cmd(args).await; + if let Err(e) = process_cmd(args).await { + eprintln!("Error: {}", e); + process::exit(1) + } } diff --git a/bin/u_server/src/db.rs b/bin/u_server/src/db.rs index fa39ad4..d5db645 100644 --- a/bin/u_server/src/db.rs +++ b/bin/u_server/src/db.rs @@ -5,7 +5,7 @@ use std::{ sync::{Arc, Mutex, MutexGuard}, }; use u_lib::{ - models::{schema, Agent, AssignedJob, JobMeta, JobState}, + models::{schema, Agent, AgentError, AssignedJob, JobMeta, JobState}, ULocalError, ULocalResult, }; use uuid::Uuid; @@ -37,6 +37,14 @@ impl UDB { .unwrap() } + pub fn report_error(&self, error: &AgentError) -> ULocalResult<()> { + use schema::errors; + diesel::insert_into(errors::table) + .values(error) + .execute(&self.conn)?; + Ok(()) + } + pub fn insert_jobs(&self, job_metas: &[JobMeta]) -> ULocalResult<()> { use schema::jobs; diesel::insert_into(jobs::table) diff --git a/bin/u_server/src/filters.rs b/bin/u_server/src/filters.rs index 59527a0..126f11b 100644 --- a/bin/u_server/src/filters.rs +++ b/bin/u_server/src/filters.rs @@ -2,7 +2,7 @@ use crate::handlers::Endpoints; use serde::de::DeserializeOwned; use std::env; use u_lib::{ - messaging::{AsMsg, BaseMessage}, + messaging::{AsMsg, BaseMessage, Reportable}, models::*, }; use uuid::Uuid; @@ -68,7 +68,7 @@ pub fn make_filters() -> impl Filter let report = warp::post() .and(warp::path("report")) - .and(get_content::>().and_then(Endpoints::report)); + .and(get_content::>().and_then(Endpoints::report)); let auth_token = format!("Bearer {}", env::var("ADMIN_AUTH_TOKEN").unwrap()).into_boxed_str(); let auth_header = warp::header::exact("authorization", Box::leak(auth_token)); diff --git a/bin/u_server/src/handlers.rs b/bin/u_server/src/handlers.rs index db8ff4c..d769206 100644 --- a/bin/u_server/src/handlers.rs +++ b/bin/u_server/src/handlers.rs @@ -3,8 +3,9 @@ use diesel::SaveChangesDsl; use hyper::Body; use serde::Serialize; use u_lib::{ - messaging::{AsMsg, BaseMessage}, - models::{Agent, AgentState, AssignedJob, ExecResult, JobMeta, JobState}, + messaging::{AsMsg, BaseMessage, Reportable}, + models::*, + utils::OneOrVec, ULocalError, }; use uuid::Uuid; @@ -13,15 +14,15 @@ use warp::{ Rejection, Reply, }; -pub fn build_response>(code: StatusCode, body: S) -> Response { +pub fn build_response(code: StatusCode, body: impl Into) -> Response { Response::builder().status(code).body(body.into()).unwrap() } -pub fn build_ok>(body: S) -> Response { +pub fn build_ok(body: impl Into) -> Response { build_response(StatusCode::OK, body) } -pub fn build_err(body: S) -> Response { +pub fn build_err(body: impl ToString) -> Response { build_response(StatusCode::BAD_REQUEST, body.to_string()) } @@ -135,15 +136,15 @@ impl Endpoints { } } - pub async fn report( - msg: BaseMessage<'static, Vec>, + pub async fn report + AsMsg + 'static>( + msg: BaseMessage<'static, Data>, ) -> Result, Rejection> { info!("hnd: report"); let id = msg.id; let mut failed = vec![]; - for entry in msg.into_inner() { + for entry in msg.into_inner().into_vec() { match entry { - ExecResult::Assigned(res) => { + Reportable::Assigned(res) => { if id != res.agent_id { continue; } @@ -155,11 +156,15 @@ impl Endpoints { failed.push(e.to_string()) } } - ExecResult::Agent(mut a) => { + Reportable::Agent(mut a) => { a.state = AgentState::Active; Self::add_agent(a).await?; } - ExecResult::Dummy => (), + Reportable::Error(e) => { + let err = AgentError::from_msg(e, id); + UDB::lock_db().report_error(&err).unwrap(); + } + Reportable::Dummy => (), } } if failed.len() > 0 { diff --git a/bin/u_server/src/u_server.rs b/bin/u_server/src/u_server.rs index cbba6b1..e246bca 100644 --- a/bin/u_server/src/u_server.rs +++ b/bin/u_server/src/u_server.rs @@ -77,7 +77,7 @@ mod tests { use handlers::build_ok; use mockall::predicate::*; use test_case::test_case; - use u_lib::messaging::{AsMsg, BaseMessage}; + use u_lib::messaging::{AsMsg, BaseMessage, Reportable}; use uuid::Uuid; use warp::test::request; @@ -103,12 +103,12 @@ mod tests { async fn test_report_unauth_successful() { let mock = Endpoints::report_context(); mock.expect() - .withf(|msg: &BaseMessage<'_, Vec>| msg.inner_ref()[0] == ExecResult::Dummy) + .withf(|msg: &BaseMessage<'_, Vec>| msg.inner_ref()[0] == Reportable::Dummy) .returning(|_| Ok(build_ok(""))); request() .path("/report/") .method("POST") - .json(&vec![ExecResult::Dummy].as_message()) + .json(&vec![Reportable::Dummy].as_message()) .filter(&make_filters()) .await .unwrap(); diff --git a/integration/tests/behaviour.rs b/integration/tests/behaviour.rs index 6139d5e..616c7a9 100644 --- a/integration/tests/behaviour.rs +++ b/integration/tests/behaviour.rs @@ -3,9 +3,9 @@ use crate::helpers::Panel; use rstest::rstest; use std::error::Error; -use std::thread::sleep; use std::time::Duration; -use u_lib::models::*; +use tokio::time::sleep; +use u_lib::{messaging::Empty, models::*}; use uuid::Uuid; type TestResult = Result>; @@ -38,7 +38,7 @@ async fn test_setup_tasks() -> TestResult { if result[0].state == JobState::Finished { return Ok(()); } else { - sleep(Duration::from_secs(5)); + sleep(Duration::from_secs(5)).await; eprintln!("waiting for task"); } } diff --git a/integration/tests/fixtures/agent.rs b/integration/tests/fixtures/agent.rs index 928b661..596f9a8 100644 --- a/integration/tests/fixtures/agent.rs +++ b/integration/tests/fixtures/agent.rs @@ -1,4 +1,4 @@ -use u_lib::{api::ClientHandler, models::*}; +use u_lib::{api::ClientHandler, messaging::Reportable, models::*}; use uuid::Uuid; pub struct RegisteredAgent { @@ -29,8 +29,6 @@ pub async fn register_agent() -> RegisteredAgent { id: agent_uid, ..Default::default() }; - cli.report(&vec![ExecResult::Agent(agent_data)]) - .await - .unwrap(); + cli.report(&[Reportable::Agent(agent_data)]).await.unwrap(); RegisteredAgent { uid: agent_uid } } diff --git a/lib/u_lib/src/api.rs b/lib/u_lib/src/api.rs index 763f5f7..7dbb774 100644 --- a/lib/u_lib/src/api.rs +++ b/lib/u_lib/src/api.rs @@ -66,7 +66,7 @@ impl ClientHandler { // // send something to server #[api_route("POST")] - async fn report(&self, payload: &M) -> messaging::Empty {} + async fn report(&self, payload: &[messaging::Reportable]) -> messaging::Empty {} // // download file #[api_route("GET")] diff --git a/lib/u_lib/src/builder.rs b/lib/u_lib/src/builder.rs index 5657745..4075a24 100644 --- a/lib/u_lib/src/builder.rs +++ b/lib/u_lib/src/builder.rs @@ -1,6 +1,7 @@ use crate::{ UError, UResult, cache::JobCache, executor::{Waiter, DynFut}, - models::{Agent, AssignedJob, JobMeta, JobType, ExecResult}, + models::{Agent, AssignedJob, JobMeta, JobType}, + messaging::Reportable, utils::{CombinedResult, OneOrVec} }; use guess_host_triple::guess_host_triple; @@ -72,12 +73,12 @@ impl JobBuilder { } /// Spawn jobs and wait for result - pub async fn wait(self) -> Vec { + pub async fn wait(self) -> Vec { self.waiter.spawn().await.wait().await } /// Spawn one job and wait for result - pub async fn wait_one(self) -> ExecResult { + pub async fn wait_one(self) -> Reportable { self.waiter.spawn().await.wait().await.pop().unwrap() } } @@ -86,7 +87,7 @@ impl JobBuilder { pub struct NamedJobBuilder { builder: Option, job_names: Vec<&'static str>, - results: HashMap<&'static str, ExecResult>, + results: HashMap<&'static str, Reportable>, } impl NamedJobBuilder { @@ -136,11 +137,11 @@ impl NamedJobBuilder { self } - pub fn pop_opt(&mut self, name: &'static str) -> Option { + pub fn pop_opt(&mut self, name: &'static str) -> Option { self.results.remove(name) } - pub fn pop(&mut self, name: &'static str) -> ExecResult { + pub fn pop(&mut self, name: &'static str) -> Reportable { self.pop_opt(name).unwrap() } } @@ -148,13 +149,13 @@ impl NamedJobBuilder { #[cfg(test)] mod tests { + use super::*; use test_case::test_case; use std::{time::SystemTime}; use crate::{ errors::UError, models::{ - jobs::{JobMeta}, - ExecResult, + JobMeta, misc::JobType }, builder::{JobBuilder, NamedJobBuilder}, @@ -192,7 +193,8 @@ mod tests { TMPPATH=/tmp/lol mkdir -p $TMPPATH echo test03 > $TMPPATH/t - cat $TMPPATH/t"# + cat $TMPPATH/t + rm -rf $TMPPATH"# ), "test03" ;"sh multiline payload" @@ -211,8 +213,8 @@ mod tests { } let job = job.build().unwrap(); let job_result = JobBuilder::from_meta(job).unwrap_one().wait_one().await; - let result = unwrap_enum!(job_result, ExecResult::Assigned); - let result = result.to_string_result().unwrap(); + let result = unwrap_enum!(job_result, Reportable::Assigned); + let result = result.to_string_result(); assert_eq!(result.trim(), expected_result); Ok(()) } @@ -226,9 +228,9 @@ mod tests { let ls = JobBuilder::from_meta(JobMeta::from_shell("ls")?).unwrap_one() .wait_one() .await; - let ls = unwrap_enum!(ls, ExecResult::Assigned); + let ls = unwrap_enum!(ls, Reportable::Assigned); assert_eq!(ls.retcode.unwrap(), 0); - let folders = ls.to_string_result().unwrap(); + let folders = ls.to_string_result(); let subfolders_jobs: Vec = folders .lines() .map(|f| JobMeta::from_shell(format!("ls {}", f)).unwrap()) @@ -238,7 +240,7 @@ mod tests { .wait() .await; for result in ls_subfolders { - let result = unwrap_enum!(result, ExecResult::Assigned); + let result = unwrap_enum!(result, Reportable::Assigned); assert_eq!(result.retcode.unwrap(), 0); } longest_job.wait().await; @@ -270,8 +272,8 @@ mod tests { .unwrap_one() .wait_one() .await; - let job_result = unwrap_enum!(job_result, ExecResult::Assigned); - let output = job_result.to_string_result().unwrap(); + let job_result = unwrap_enum!(job_result, Reportable::Assigned); + let output = job_result.to_string_result(); assert!(output.contains("No such file")); assert!(job_result.retcode.is_none()); Ok(()) @@ -308,7 +310,7 @@ mod tests { ("gatherer", JobMeta::builder().with_type(JobType::Manage).build()?) ]).wait().await; let gathered = jobs.pop("gatherer"); - assert_eq!(unwrap_enum!(gathered, ExecResult::Agent).alias, None); + assert_eq!(unwrap_enum!(gathered, Reportable::Agent).alias, None); Ok(()) } diff --git a/lib/u_lib/src/errors.rs b/lib/u_lib/src/errors.rs index 206e363..5f46197 100644 --- a/lib/u_lib/src/errors.rs +++ b/lib/u_lib/src/errors.rs @@ -35,6 +35,9 @@ pub enum UError { #[error("Error opening {0}: {1}")] FSError(String, String), + + #[error("Wrong auth token")] + WrongToken, } impl From for UError { diff --git a/lib/u_lib/src/executor.rs b/lib/u_lib/src/executor.rs index 1c128e7..4155cf1 100644 --- a/lib/u_lib/src/executor.rs +++ b/lib/u_lib/src/executor.rs @@ -1,4 +1,4 @@ -use crate::{models::ExecResult, utils::OneOrVec}; +use crate::{messaging::Reportable, utils::OneOrVec}; use futures::{future::BoxFuture, lock::Mutex}; use lazy_static::lazy_static; use std::collections::HashMap; @@ -9,7 +9,7 @@ use tokio::{ }; use uuid::Uuid; -pub type DynFut = BoxFuture<'static, ExecResult>; +pub type DynFut = BoxFuture<'static, Reportable>; lazy_static! { static ref FUT_RESULTS: Mutex> = Mutex::new(HashMap::new()); @@ -21,7 +21,7 @@ lazy_static! { } struct JoinInfo { - handle: JoinHandle, + handle: JoinHandle, completed: bool, collectable: bool, // indicates if future can be popped from pool via pop_task_if_completed } @@ -68,7 +68,7 @@ impl Waiter { /// Wait until a bunch of tasks is finished. /// NOT GUARANTEED that all tasks will be returned due to /// possibility to pop them in other places - pub async fn wait(self) -> Vec { + pub async fn wait(self) -> Vec { let mut result = vec![]; for fid in self.fids { if let Some(task) = pop_task(fid).await { @@ -94,7 +94,7 @@ async fn pop_task(fid: Uuid) -> Option { FUT_RESULTS.lock().await.remove(&fid) } -pub async fn pop_task_if_completed(fid: Uuid) -> Option { +pub async fn pop_task_if_completed(fid: Uuid) -> Option { let &mut JoinInfo { handle: _, collectable, @@ -112,8 +112,8 @@ pub async fn pop_task_if_completed(fid: Uuid) -> Option { } } -pub async fn pop_completed() -> Vec { - let mut completed: Vec = vec![]; +pub async fn pop_completed() -> Vec { + let mut completed: Vec = vec![]; let fids = FUT_RESULTS .lock() .await diff --git a/lib/u_lib/src/messaging/mod.rs b/lib/u_lib/src/messaging/mod.rs index 12898ce..64485ae 100644 --- a/lib/u_lib/src/messaging/mod.rs +++ b/lib/u_lib/src/messaging/mod.rs @@ -11,7 +11,7 @@ use uuid::Uuid; impl AsMsg for Agent {} impl AsMsg for AssignedJob {} impl AsMsg for DownloadInfo {} -impl AsMsg for ExecResult {} +impl AsMsg for Reportable {} impl AsMsg for JobMeta {} impl AsMsg for String {} impl AsMsg for Uuid {} @@ -27,3 +27,11 @@ impl fmt::Display for Empty { write!(f, "") } } + +#[derive(Serialize, Deserialize, Clone, PartialEq)] +pub enum Reportable { + Assigned(AssignedJob), + Agent(Agent), + Dummy, + Error(String), +} diff --git a/lib/u_lib/src/models/agent.rs b/lib/u_lib/src/models/agent.rs index 16a78db..bcc46e9 100644 --- a/lib/u_lib/src/models/agent.rs +++ b/lib/u_lib/src/models/agent.rs @@ -5,11 +5,8 @@ use std::{fmt, time::SystemTime}; use strum::Display; use crate::{ - builder::NamedJobBuilder, - models::{schema::*, ExecResult}, - unwrap_enum, - utils::systime_to_string, - UID, + builder::NamedJobBuilder, messaging::Reportable, models::schema::*, unwrap_enum, + utils::systime_to_string, UID, }; use guess_host_triple::guess_host_triple; @@ -54,8 +51,8 @@ pub struct Agent { impl fmt::Display for Agent { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { let mut out = format!("Agent: {}", self.id); - if self.alias.is_some() { - out += &format!(" ({})", self.alias.as_ref().unwrap()) + if let Some(ref alias) = self.alias { + out += &format!(" ({})", alias) } out += &format!("\nUsername: {}", self.username); out += &format!("\nHostname: {}", self.hostname); @@ -85,9 +82,9 @@ impl Agent { .unwrap_one() .wait() .await; - let decoder = |job_result: ExecResult| { - let assoc_job = unwrap_enum!(job_result, ExecResult::Assigned); - assoc_job.to_string_result().unwrap().trim().to_string() + let decoder = |job_result: Reportable| { + let assoc_job = unwrap_enum!(job_result, Reportable::Assigned); + assoc_job.to_string_result().trim().to_string() }; Self { @@ -99,9 +96,9 @@ impl Agent { } } - pub async fn run() -> ExecResult { + pub async fn run() -> Reportable { #[cfg(unix)] - ExecResult::Agent(Agent::gather().await) + Reportable::Agent(Agent::gather().await) } } diff --git a/lib/u_lib/src/models/errors.rs b/lib/u_lib/src/models/errors.rs new file mode 100644 index 0000000..a916dd0 --- /dev/null +++ b/lib/u_lib/src/models/errors.rs @@ -0,0 +1,35 @@ +use crate::models::schema::*; +use diesel::{Insertable, Queryable}; +use serde::{Deserialize, Serialize}; +use std::time::SystemTime; +use uuid::Uuid; + +#[derive(Serialize, Deserialize, Clone, Debug, Queryable, Insertable, PartialEq)] +#[table_name = "errors"] +pub struct AgentError { + pub agent_id: Uuid, + pub created: SystemTime, + pub id: Uuid, + pub msg: String, +} + +impl AgentError { + pub fn from_msg(msg: impl Into, agent_id: Uuid) -> Self { + AgentError { + agent_id, + msg: msg.into(), + ..Default::default() + } + } +} + +impl Default for AgentError { + fn default() -> Self { + Self { + agent_id: Uuid::new_v4(), + created: SystemTime::now(), + id: Uuid::new_v4(), + msg: String::new(), + } + } +} diff --git a/lib/u_lib/src/models/jobs/assigned.rs b/lib/u_lib/src/models/jobs/assigned.rs index b11c50b..a4a0d35 100644 --- a/lib/u_lib/src/models/jobs/assigned.rs +++ b/lib/u_lib/src/models/jobs/assigned.rs @@ -1,13 +1,14 @@ use super::JobState; use crate::{ cache::JobCache, - models::{schema::*, ExecResult, JobOutput}, + messaging::Reportable, + models::{schema::*, JobOutput}, utils::{systime_to_string, TempFile}, UID, }; use diesel::{Identifiable, Insertable, Queryable}; use serde::{Deserialize, Serialize}; -use std::{fmt, process::Output, string::FromUtf8Error, time::SystemTime}; +use std::{fmt, process::Output, time::SystemTime}; use tokio::process::Command; use uuid::Uuid; @@ -40,20 +41,17 @@ impl fmt::Display for AssignedJob { let mut out = format!("Result: {}", self.id); out += &format!("\nAgent: {}", self.agent_id); out += &format!("\nJob: {}", self.job_id); - if self.alias.is_some() { - out += &format!("\nAlias: {}", self.alias.as_ref().unwrap()); + if let Some(ref alias) = self.alias { + out += &format!("\nAlias: {}", alias); } out += &format!("\nUpdated: {}", systime_to_string(&self.updated)); out += &format!("\nState: {}", self.state); if self.state == JobState::Finished { - if self.retcode.is_some() { - out += &format!("\nReturn code: {}", self.retcode.unwrap()); + if let Some(ref retcode) = self.retcode { + out += &format!("\nReturn code: {}", retcode); } - if self.result.is_some() { - out += &format!( - "\nResult: {}", - String::from_utf8_lossy(self.result.as_ref().unwrap()) - ); + if let Some(ref result) = self.result { + out += &format!("\nResult: {}", String::from_utf8_lossy(result)); } } write!(f, "{}", out) @@ -77,7 +75,7 @@ impl Default for AssignedJob { } impl AssignedJob { - pub async fn run(mut self) -> ExecResult { + pub async fn run(mut self) -> Reportable { let (argv, _payload) = { let meta = JobCache::get(&self.job_id).unwrap(); let extracted_payload = meta @@ -118,7 +116,7 @@ impl AssignedJob { self.retcode = retcode; self.updated = SystemTime::now(); self.state = JobState::Finished; - ExecResult::Assigned(self) + Reportable::Assigned(self) } pub fn new(job_id: Uuid, other: Option<&Self>) -> Self { @@ -145,7 +143,7 @@ impl AssignedJob { } } - pub fn to_string_result(&self) -> Result { - String::from_utf8(self.to_raw_result()) + pub fn to_string_result(&self) -> String { + String::from_utf8_lossy(&self.to_raw_result()).into_owned() } } diff --git a/lib/u_lib/src/models/jobs/meta.rs b/lib/u_lib/src/models/jobs/meta.rs index a0d9d8a..4cb07cc 100644 --- a/lib/u_lib/src/models/jobs/meta.rs +++ b/lib/u_lib/src/models/jobs/meta.rs @@ -33,32 +33,29 @@ impl JobMeta { impl fmt::Display for JobMeta { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { let mut out = format!("Job: {}", self.id); - if self.alias.is_some() { - out += &format!(" ({})", self.alias.as_ref().unwrap()); + if let Some(ref alias) = self.alias { + out += &format!(" ({})", alias); } out += &format!("\nArgv: {}", self.argv); out += &format!("\nExecutable type: {}", self.exec_type); out += &format!("\nPlatform: {}", self.platform); - if self.exec_type == JobType::Shell && self.payload.is_some() { - let payload = self.payload.as_ref().unwrap(); - let (pld_len, large) = { - let pl = payload.len(); - if pl > 20 { - (20, true) - } else { - (pl, false) - } - }; - let pld_beginning = payload - .iter() - .take(pld_len) - .map(|u| *u) - .collect::>(); - out += &format!( - "\nPayload: {}{}", - String::from_utf8_lossy(&pld_beginning), - if large { "" } else { " <...>" } - ); + if let Some(ref payload) = self.payload { + if self.exec_type == JobType::Shell { + let (pld_len, large) = { + let pl = payload.len(); + if pl > 20 { + (20, true) + } else { + (pl, false) + } + }; + let pld_beginning = &payload[..pld_len]; + out += &format!( + "\nPayload: {}{}", + String::from_utf8_lossy(pld_beginning), + if large { "" } else { " <...>" } + ); + } } write!(f, "{}", out) } diff --git a/lib/u_lib/src/models/mod.rs b/lib/u_lib/src/models/mod.rs index d8092f3..bd44781 100644 --- a/lib/u_lib/src/models/mod.rs +++ b/lib/u_lib/src/models/mod.rs @@ -1,13 +1,6 @@ mod agent; -pub mod jobs; +mod errors; +mod jobs; pub mod schema; -pub use crate::models::{agent::*, jobs::*}; -use serde::{Deserialize, Serialize}; - -#[derive(Serialize, Deserialize, Clone, PartialEq)] -pub enum ExecResult { - Assigned(AssignedJob), - Agent(Agent), - Dummy, -} +pub use crate::models::{agent::*, errors::*, jobs::*}; diff --git a/lib/u_lib/src/models/schema.rs b/lib/u_lib/src/models/schema.rs index be2edf1..83c1f59 100644 --- a/lib/u_lib/src/models/schema.rs +++ b/lib/u_lib/src/models/schema.rs @@ -26,6 +26,17 @@ table! { } } +table! { + use crate::schema_exports::*; + + errors (id) { + agent_id -> Uuid, + created -> Timestamp, + id -> Uuid, + msg -> Nullable, + } +} + table! { use crate::schema_exports::*; @@ -71,6 +82,7 @@ table! { } joinable!(certificates -> agents (agent_id)); +joinable!(errors -> agents (agent_id)); joinable!(ip_addrs -> agents (agent_id)); joinable!(results -> agents (agent_id)); joinable!(results -> jobs (job_id)); @@ -78,6 +90,7 @@ joinable!(results -> jobs (job_id)); allow_tables_to_appear_in_same_query!( agents, certificates, + errors, ip_addrs, jobs, results, diff --git a/lib/u_lib/src/utils/combined_result.rs b/lib/u_lib/src/utils/combined_result.rs index 4699ae6..2baf35f 100644 --- a/lib/u_lib/src/utils/combined_result.rs +++ b/lib/u_lib/src/utils/combined_result.rs @@ -30,6 +30,10 @@ impl CombinedResult { self.ok } + pub fn has_err(&self) -> bool { + self.err.len() > 0 + } + pub fn unwrap_one(self) -> T { self.unwrap().pop().unwrap() } diff --git a/migrations/2020-10-24-111622_create_all/down.sql b/migrations/2020-10-24-111622_create_all/down.sql index 3ded775..c3bfd8c 100644 --- a/migrations/2020-10-24-111622_create_all/down.sql +++ b/migrations/2020-10-24-111622_create_all/down.sql @@ -3,6 +3,7 @@ DROP TABLE results; DROP TABLE certificates; DROP TABLE jobs; DROP TABLE agents; +DROP TABLE errors; DROP TYPE IF EXISTS JobState; DROP TYPE IF EXISTS JobType; diff --git a/migrations/2020-10-24-111622_create_all/up.sql b/migrations/2020-10-24-111622_create_all/up.sql index ef956ac..ec2949f 100644 --- a/migrations/2020-10-24-111622_create_all/up.sql +++ b/migrations/2020-10-24-111622_create_all/up.sql @@ -66,4 +66,14 @@ CREATE TABLE IF NOT EXISTS certificates ( , is_revoked BOOLEAN NOT NULL DEFAULT FALSE , PRIMARY KEY(id) , FOREIGN KEY(agent_id) REFERENCES agents(id) +); + + +CREATE TABLE IF NOT EXISTS errors ( + agent_id UUID NOT NULL + , created TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP + , id UUID NOT NULL DEFAULT uuid_generate_v4() + , msg TEXT + , FOREIGN KEY(agent_id) REFERENCES agents(id) ON DELETE CASCADE + , PRIMARY KEY(id) ); \ No newline at end of file