diff --git a/bin/u_agent/src/lib.rs b/bin/u_agent/src/lib.rs index 286b3ac..a7405ab 100644 --- a/bin/u_agent/src/lib.rs +++ b/bin/u_agent/src/lib.rs @@ -7,7 +7,6 @@ extern crate log; use daemonize::Daemonize; -use std::panic; use std::sync::Arc; use tokio::time::{sleep, Duration}; use u_lib::{ @@ -22,7 +21,7 @@ pub async fn process_request(job_requests: Vec, client: &ClientHand if !job_requests.is_empty() { for jr in &job_requests { if !JobCache::contains(jr.job_id) { - debug!("Fetching job: {}", &jr.job_id); + info!("Fetching job: {}", &jr.job_id); let fetched_job = loop { match client.get_jobs(Some(jr.job_id)).await { Ok(mut result) => break result.pop().unwrap(), @@ -35,7 +34,7 @@ pub async fn process_request(job_requests: Vec, client: &ClientHand JobCache::insert(fetched_job); } } - debug!( + info!( "Scheduling jobs: {}", job_requests .iter() @@ -46,7 +45,9 @@ pub async fn process_request(job_requests: Vec, client: &ClientHand let mut builder = JobBuilder::from_request(job_requests); let errors = builder.pop_errors(); if !errors.is_empty() { - errors.into_iter().for_each(ErrChan::send) + for e in errors { + ErrChan::send(e, "ebld").await; + } } builder.unwrap_one().spawn().await; } @@ -54,32 +55,35 @@ pub async fn process_request(job_requests: Vec, client: &ClientHand async fn error_reporting(client: Arc) -> ! { loop { - let err = ErrChan::recv(); - debug!("Error encountered: {:?}", err); - 'retry: for _ in 0..3 { - match client.report(&[Reportable::Error(err.to_string())]).await { - Ok(_) => break 'retry, - Err(e) => { - debug!("Reporting error: {:?}", e); - sleep(Duration::from_secs(10)).await; + match ErrChan::recv().await { + Some(err) => { + 'retry: for _ in 0..3 { + match client.report(&[Reportable::Error(err.to_string())]).await { + Ok(_) => break 'retry, + Err(e) => { + debug!("Reporting error: {:?}", e); + sleep(Duration::from_secs(10)).await; + } + } } } + None => sleep(Duration::from_secs(3)).await, } } } async fn do_stuff(client: Arc) -> ! { loop { - match client.get_personal_jobs(Some(get_self_uid())).await { + match client.get_personal_jobs(get_self_uid()).await { Ok(resp) => { process_request(resp, &client).await; } - Err(err) => ErrChan::send(err), + Err(err) => ErrChan::send(err, "personal").await, } let result: Vec = pop_completed().await.into_iter().collect(); if !result.is_empty() { if let Err(err) = client.report(&result).await { - ErrChan::send(err) + ErrChan::send(err, "report").await; } } sleep(Duration::from_secs(ITERATION_LATENCY)).await; @@ -87,11 +91,12 @@ async fn do_stuff(client: Arc) -> ! { } pub async fn run_forever() -> ! { - panic::set_hook(Box::new(|panic_info| { - ErrChan::send(UError::Panic(panic_info.to_string())) - })); + let env = load_env_default().unwrap(); + let client = Arc::new(ClientHandler::new(&env.u_server, None)); + tokio::spawn(error_reporting(client.clone())); + if cfg!(debug_assertions) { - init_logger(format!( + init_logger(Some(format!( "u_agent-{}", get_self_uid() .hyphenated() @@ -99,14 +104,12 @@ pub async fn run_forever() -> ! { .split("-") .next() .unwrap() - )); + ))); } else { if let Err(e) = Daemonize::new().start() { - ErrChan::send(UError::Runtime(e.to_string())) + ErrChan::send(UError::Runtime(e.to_string()), "deeeemon").await } } - let env = load_env_default().unwrap(); - let client = Arc::new(ClientHandler::new(&env.u_server, None)); - tokio::spawn(error_reporting(client.clone())); + info!("Startup"); do_stuff(client).await } diff --git a/bin/u_panel/Cargo.toml b/bin/u_panel/Cargo.toml index 21456d0..4ba454a 100644 --- a/bin/u_panel/Cargo.toml +++ b/bin/u_panel/Cargo.toml @@ -10,7 +10,6 @@ edition = "2021" actix-web = "3.3.2" backtrace = "0.3.61" structopt = "0.3.21" -log = "^0.4" uuid = "0.6.5" serde_json = "1.0.4" serde = { version = "1.0.114", features = ["derive"] } diff --git a/bin/u_panel/src/argparse.rs b/bin/u_panel/src/argparse.rs index 371cb77..e89710b 100644 --- a/bin/u_panel/src/argparse.rs +++ b/bin/u_panel/src/argparse.rs @@ -1,3 +1,4 @@ +use anyhow::Result as AnyResult; use structopt::StructOpt; use u_lib::{ api::ClientHandler, @@ -76,10 +77,10 @@ fn parse_uuid(src: &str) -> Result { } pub async fn process_cmd(client: ClientHandler, args: Args) -> UResult { - fn to_json(data: UResult) -> String { + fn to_json(data: AnyResult) -> String { let data = match data { Ok(r) => PanelResult::Ok(r), - Err(e) => PanelResult::Err(e), + Err(e) => PanelResult::Err(e.downcast().expect("unknown error type")), }; serde_json::to_string(&data).unwrap() } diff --git a/bin/u_panel/src/main.rs b/bin/u_panel/src/main.rs index 413a0b7..0726a88 100644 --- a/bin/u_panel/src/main.rs +++ b/bin/u_panel/src/main.rs @@ -10,6 +10,7 @@ use argparse::{process_cmd, Args}; use serde::Deserialize; use structopt::StructOpt; use u_lib::api::ClientHandler; +use u_lib::logging::init_logger; use u_lib::utils::{env::default_host, load_env}; #[derive(Deserialize)] @@ -25,6 +26,8 @@ async fn main() -> AnyResult<()> { let client = ClientHandler::new(&env.u_server, Some(env.admin_auth_token)); let args = Args::from_args(); - process_cmd(client, args).await?; + init_logger(None::<&str>); + let result = process_cmd(client, args).await?; + println!("{result}"); Ok(()) } diff --git a/bin/u_panel/src/server/mod.rs b/bin/u_panel/src/server/mod.rs index f5a5c30..a427567 100644 --- a/bin/u_panel/src/server/mod.rs +++ b/bin/u_panel/src/server/mod.rs @@ -68,7 +68,7 @@ async fn send_cmd( #[actix_web::main] pub async fn serve(client: ClientHandler) -> std::io::Result<()> { - init_logger("u_panel"); + init_logger(Some("u_panel")); let addr = "127.0.0.1:8080"; info!("Serving at http://{}", addr); diff --git a/bin/u_server/Cargo.toml b/bin/u_server/Cargo.toml index 41e4693..d2e5f75 100644 --- a/bin/u_server/Cargo.toml +++ b/bin/u_server/Cargo.toml @@ -5,7 +5,7 @@ name = "u_server" version = "0.1.0" [dependencies] -log = "0.4.11" +tracing = "0.1.35" thiserror = "*" warp = { version = "0.3.1", features = ["tls"] } uuid = { version = "0.6.5", features = ["serde", "v4"] } diff --git a/bin/u_server/src/handlers.rs b/bin/u_server/src/handlers.rs index efcb78a..41c2411 100644 --- a/bin/u_server/src/handlers.rs +++ b/bin/u_server/src/handlers.rs @@ -112,7 +112,7 @@ impl Endpoints { err.agent_id, Stripped(&err.msg.as_str()) ); - UDB::lock_db().report_error(&err)?; + //UDB::lock_db().report_error(&err)?; } Reportable::Dummy => (), } diff --git a/bin/u_server/src/main.rs b/bin/u_server/src/main.rs index 84d2139..77877ce 100644 --- a/bin/u_server/src/main.rs +++ b/bin/u_server/src/main.rs @@ -1,6 +1,11 @@ use u_server_lib::serve; +#[macro_use] +extern crate tracing; + #[tokio::main] -async fn main() -> Result<(), String> { - serve().await.map_err(|e| e.to_string()) +async fn main() { + if let Err(e) = serve().await { + error!("U_SERVER error: {}", e); + } } diff --git a/bin/u_server/src/u_server.rs b/bin/u_server/src/u_server.rs index d7ec8b3..406159c 100644 --- a/bin/u_server/src/u_server.rs +++ b/bin/u_server/src/u_server.rs @@ -1,5 +1,5 @@ #[macro_use] -extern crate log; +extern crate tracing; #[cfg(test)] #[macro_use] @@ -131,7 +131,7 @@ pub fn init_endpoints( .map(ok); let auth_token = format!("Bearer {auth_token}",).into_boxed_str(); - let auth_header = warp::header::exact("authorization", Box::leak(auth_token)); + let auth_header = warp::header::exact("Authorization", Box::leak(auth_token)); let auth_zone = (get_agents .or(get_jobs) @@ -160,7 +160,7 @@ pub fn prefill_jobs() -> SResult<()> { } pub async fn serve() -> SResult<()> { - init_logger("u_server"); + init_logger(Some("u_server")); prefill_jobs()?; let env = load_env::().map_err(|e| Error::Other(e.to_string()))?; diff --git a/integration/Cargo.toml b/integration/Cargo.toml index 5e3f6fd..eda6095 100644 --- a/integration/Cargo.toml +++ b/integration/Cargo.toml @@ -8,7 +8,7 @@ edition = "2021" [dependencies] tokio = { version = "1.2.0", features = ["macros", "rt-multi-thread", "process", "time"] } -log = "^0.4" +tracing = "0.1.35" uuid = { version = "0.6.5", features = ["serde", "v4"] } reqwest = { version = "0.11", features = ["json"] } serde_json = "1.0" @@ -20,6 +20,7 @@ once_cell = "1.10.0" [dependencies.u_lib] path = "../lib/u_lib" version = "*" +features = ["panel"] [[test]] diff --git a/integration/docker-compose.yml b/integration/docker-compose.yml index bca0fdd..04bc87b 100644 --- a/integration/docker-compose.yml +++ b/integration/docker-compose.yml @@ -25,7 +25,7 @@ services: - ../.env - ../.env.private environment: - RUST_LOG: trace + RUST_LOG: info healthcheck: test: ss -tlpn | grep 63714 interval: 5s @@ -36,8 +36,8 @@ services: image: unki/u_db networks: - u_net - expose: - - '5432' + ports: + - 54321:5432 env_file: - ../.env - ../.env.private @@ -58,12 +58,14 @@ services: networks: - u_net volumes: - - ../target/x86_64-unknown-linux-musl/${PROFILE:-debug}/u_agent:/u_agent - command: /u_agent u_server + - ../target/x86_64-unknown-linux-musl/${PROFILE:-debug}/u_agent:/unki/u_agent + - ../logs:/unki/logs:rw + working_dir: /unki + command: /unki/u_agent u_server env_file: - ../.env environment: - RUST_LOG: u_agent=debug + RUST_LOG: u_agent=debug,u_lib=debug depends_on: u_server: condition: service_healthy @@ -78,6 +80,7 @@ services: - ../certs:/certs - ../target/x86_64-unknown-linux-musl/${PROFILE:-debug}/u_panel:/u_panel - ../lib/u_lib:/lib/u_lib + - ../logs:/tests/logs:rw working_dir: /tests/ depends_on: diff --git a/integration/tests/fixtures/agent.rs b/integration/tests/fixtures/agent.rs index 4160af7..b80366d 100644 --- a/integration/tests/fixtures/agent.rs +++ b/integration/tests/fixtures/agent.rs @@ -18,7 +18,7 @@ pub async fn register_agent() -> RegisteredAgent { let cli = ClientHandler::new(&ENV.u_server, None); let agent_uid = Uuid::new_v4(); let resp = cli - .get_personal_jobs(Some(agent_uid)) + .get_personal_jobs(agent_uid) .await .unwrap() .pop() diff --git a/integration/tests/helpers/panel.rs b/integration/tests/helpers/panel.rs index 526b028..d381898 100644 --- a/integration/tests/helpers/panel.rs +++ b/integration/tests/helpers/panel.rs @@ -13,11 +13,7 @@ pub struct Panel; impl Panel { fn run(args: &[&str]) -> Output { - Command::new(PANEL_BINARY) - .arg("--json") - .args(args) - .output() - .unwrap() + Command::new(PANEL_BINARY).args(args).output().unwrap() } pub fn output_argv(argv: &[&str]) -> PanelResult { diff --git a/integration/tests/integration/behaviour.rs b/integration/tests/integration/behaviour.rs index 6ff1bf3..6472505 100644 --- a/integration/tests/integration/behaviour.rs +++ b/integration/tests/integration/behaviour.rs @@ -2,6 +2,7 @@ use crate::fixtures::agent::*; use crate::helpers::Panel; use rstest::rstest; +use serde_json::{json, to_string}; use std::error::Error; use std::time::Duration; use tokio::time::sleep; @@ -14,7 +15,7 @@ type TestResult = Result>; #[tokio::test] async fn test_registration(#[future] register_agent: RegisteredAgent) -> TestResult { let agent = register_agent.await; - let agents: Vec = Panel::check_output("agents list"); + let agents: Vec = Panel::check_output("agents read"); let found = agents.iter().find(|v| v.id == agent.uid); assert!(found.is_some()); Panel::check_status(format!("agents delete {}", agent.uid)); @@ -23,17 +24,22 @@ async fn test_registration(#[future] register_agent: RegisteredAgent) -> TestRes #[tokio::test] async fn test_setup_tasks() -> TestResult { - //some independent agents should present - let agents: Vec = Panel::check_output("agents list"); - let agent_uid = agents[0].id; + let agents: Vec = Panel::check_output("agents read"); + let agent_uid = match agents.get(0) { + Some(a) => a.id, + None => panic!("Some independent agents should present"), + }; let job_alias = "passwd_contents"; - let cmd = format!("jobs add --alias {job_alias} 'cat /etc/passwd'"); + let job = json!( + {"alias": job_alias, "payload": b"cat /etc/passwd" } + ); + let cmd = format!("jobs create '{}'", to_string(&job).unwrap()); Panel::check_status(cmd); - let cmd = format!("map add {} {}", agent_uid, job_alias); + let cmd = format!("map create {} {}", agent_uid, job_alias); let assigned_uids: Vec = Panel::check_output(cmd); for _ in 0..3 { let result: Vec = - Panel::check_output(format!("map list {}", assigned_uids[0])); + Panel::check_output(format!("map read {}", assigned_uids[0])); if result[0].state == JobState::Finished { return Ok(()); } else { diff --git a/lib/u_lib/Cargo.toml b/lib/u_lib/Cargo.toml index 88a2cc9..54f1c42 100644 --- a/lib/u_lib/Cargo.toml +++ b/lib/u_lib/Cargo.toml @@ -15,7 +15,6 @@ libc = "^0.2" lazy_static = "1.4.0" futures = "0.3.5" thiserror = "*" -log = "*" diesel-derive-enum = { version = "1", features = ["postgres"] } chrono = "0.4.19" strum = { version = "0.20", features = ["derive"] } @@ -27,6 +26,8 @@ envy = "0.4.2" serde_json = "1.0.81" tracing-subscriber = { version = "0.3.14", features = ["env-filter"] } tracing-appender = "0.2.2" +log = "*" +anyhow = "1.0.58" [features] panel = [] diff --git a/lib/u_lib/src/api.rs b/lib/u_lib/src/api.rs index 7bc0e6d..f5c86fd 100644 --- a/lib/u_lib/src/api.rs +++ b/lib/u_lib/src/api.rs @@ -1,20 +1,23 @@ use std::collections::HashMap; +use std::fmt::Debug; use crate::{ config::MASTER_PORT, messaging::{self, AsMsg, BaseMessage, Empty}, - models::{self, Agent}, + models::{self}, utils::opt_to_string, - UError, UResult, + UError, }; +use anyhow::{Context, Result}; use reqwest::{header::HeaderMap, Certificate, Client, Identity, Url}; use serde::de::DeserializeOwned; +use serde_json::from_str; use uuid::Uuid; const AGENT_IDENTITY: &[u8] = include_bytes!("../../../certs/alice.p12"); const ROOT_CA_CERT: &[u8] = include_bytes!("../../../certs/ca.crt"); -#[derive(Clone)] +#[derive(Clone, Debug)] pub struct ClientHandler { base_url: Url, client: Client, @@ -43,50 +46,50 @@ impl ClientHandler { } } - async fn _req( + async fn _req( &self, - url: impl AsRef, + url: impl AsRef + Debug, payload: P, - ) -> UResult { + ) -> Result { let request = self .client .post(self.base_url.join(url.as_ref()).unwrap()) .json(&payload.as_message()); - let response = request.send().await?; + let response = request.send().await.context("send")?; + let content_len = response.content_length(); let is_success = match response.error_for_status_ref() { Ok(_) => Ok(()), Err(e) => Err(UError::from(e)), }; - let resp = response.text().await?; + let resp = response.text().await.context("resp")?; + debug!("url = {}, resp = {}", url.as_ref(), resp); match is_success { - Ok(_) => serde_json::from_str::>(&resp) + Ok(_) => from_str::>(&resp) .map(|msg| msg.into_inner()) - .or_else(|e| Err(UError::NetError(e.to_string(), resp.clone()))), + .or_else(|e| match content_len { + Some(0) => Ok(Default::default()), + _ => Err(UError::NetError(e.to_string(), resp)), + }), Err(UError::NetError(err, _)) => Err(UError::NetError(err, resp)), _ => unreachable!(), } + .map_err(From::from) } // get jobs for client - pub async fn get_personal_jobs( - &self, - url_param: Option, - ) -> UResult> { - self._req( - format!("get_personal_jobs/{}", opt_to_string(url_param)), - Empty, - ) - .await + pub async fn get_personal_jobs(&self, url_param: Uuid) -> Result> { + self._req(format!("get_personal_jobs/{}", url_param), Empty) + .await } // send something to server - pub async fn report(&self, payload: &[messaging::Reportable]) -> UResult { + pub async fn report(&self, payload: &[messaging::Reportable]) -> Result { self._req("report", payload).await } // download file - pub async fn dl(&self, file: String) -> UResult> { + pub async fn dl(&self, file: String) -> Result> { self._req(format!("dl/{file}"), Empty).await } } @@ -95,40 +98,40 @@ impl ClientHandler { #[cfg(feature = "panel")] impl ClientHandler { /// agent listing - pub async fn get_agents(&self, agent: Option) -> UResult> { + pub async fn get_agents(&self, agent: Option) -> Result> { self._req(format!("get_agents/{}", opt_to_string(agent)), Empty) .await } /// update something - pub async fn update_item(&self, item: impl AsMsg) -> UResult { + pub async fn update_item(&self, item: impl AsMsg + Debug) -> Result { self._req("update_item", item).await } /// get all available jobs - pub async fn get_jobs(&self, job: Option) -> UResult> { + pub async fn get_jobs(&self, job: Option) -> Result> { self._req(format!("get_jobs/{}", opt_to_string(job)), Empty) .await } /// create and upload job - pub async fn upload_jobs(&self, payload: &[models::JobMeta]) -> UResult { + pub async fn upload_jobs(&self, payload: &[models::JobMeta]) -> Result { self._req("upload_jobs", payload).await } /// delete something - pub async fn del(&self, item: Uuid) -> UResult { + pub async fn del(&self, item: Uuid) -> Result { self._req(format!("del/{item}"), Empty).await } /// set jobs for any agent - pub async fn set_jobs(&self, agent: Uuid, job_idents: &[String]) -> UResult> { + pub async fn set_jobs(&self, agent: Uuid, job_idents: &[String]) -> Result> { self._req(format!("set_jobs/{agent}"), job_idents).await } /// get jobs for any agent - pub async fn get_agent_jobs(&self, agent: Option) -> UResult> { - self._req(format!("set_jobs/{}", opt_to_string(agent)), Empty) + pub async fn get_agent_jobs(&self, agent: Option) -> Result> { + self._req(format!("get_personal_jobs/{}", opt_to_string(agent)), Empty) .await } } diff --git a/lib/u_lib/src/builder.rs b/lib/u_lib/src/builder.rs index 1c1f6ba..01b949d 100644 --- a/lib/u_lib/src/builder.rs +++ b/lib/u_lib/src/builder.rs @@ -17,7 +17,7 @@ impl JobBuilder { pub fn from_request(job_requests: impl OneOrVec) -> CombinedResult { let job_requests = job_requests.into_vec(); let mut prepared: Vec = vec![]; - let mut result = CombinedResult::::new(); + let mut result = CombinedResult::::new(); for req in job_requests { let job_meta = JobCache::get(req.job_id); if job_meta.is_none() { diff --git a/lib/u_lib/src/errors/chan.rs b/lib/u_lib/src/errors/chan.rs index 1c2b995..10668a4 100644 --- a/lib/u_lib/src/errors/chan.rs +++ b/lib/u_lib/src/errors/chan.rs @@ -1,9 +1,10 @@ -use crate::UError; -use crossbeam::channel::{self, Receiver, Sender}; +use anyhow::Error; use once_cell::sync::OnceCell; +use tokio::sync::mpsc::{channel, error::TryRecvError, Receiver, Sender}; +use tokio::sync::{Mutex, MutexGuard}; -type ChanError = UError; -static ERR_CHAN: OnceCell = OnceCell::new(); +type ChanError = Error; +static ERR_CHAN: OnceCell> = OnceCell::new(); pub struct ErrChan { tx: Sender, @@ -11,18 +12,27 @@ pub struct ErrChan { } impl ErrChan { - fn get() -> &'static Self { - ERR_CHAN.get_or_init(|| { - let (tx, rx) = channel::bounded(20); - Self { tx, rx } - }) + async fn get() -> MutexGuard<'static, Self> { + ERR_CHAN + .get_or_init(|| { + let (tx, rx) = channel(20); + Mutex::new(Self { tx, rx }) + }) + .lock() + .await } - pub fn send(msg: ChanError) { - Self::get().tx.send(msg).unwrap() + pub async fn send(err: impl Into, ctx: impl AsRef) { + let err = err.into(); + error!("Encountered an error at '{}': {:?}", ctx.as_ref(), err); + Self::get().await.tx.try_send(err).unwrap(); } - pub fn recv() -> ChanError { - Self::get().rx.recv().unwrap() + pub async fn recv() -> Option { + match Self::get().await.rx.try_recv() { + Ok(r) => Some(r), + Err(TryRecvError::Disconnected) => panic!("err chan disconnected"), + Err(TryRecvError::Empty) => None, + } } } diff --git a/lib/u_lib/src/logging.rs b/lib/u_lib/src/logging.rs index cbd7dac..7c4b991 100644 --- a/lib/u_lib/src/logging.rs +++ b/lib/u_lib/src/logging.rs @@ -2,15 +2,26 @@ use std::env; use std::path::Path; use tracing_appender::rolling; -use tracing_subscriber::EnvFilter; +use tracing_subscriber::{fmt, prelude::*, registry, EnvFilter}; -pub fn init_logger(logfile: impl AsRef + Send + Sync + 'static) { +pub fn init_logger(logfile: Option + Send + Sync + 'static>) { if env::var("RUST_LOG").is_err() { env::set_var("RUST_LOG", "info") } - tracing_subscriber::fmt::fmt() - .with_env_filter(EnvFilter::from_default_env()) - .with_writer(move || rolling::never(".", logfile.as_ref().with_extension("log"))) - .init(); + let reg = registry() + .with(EnvFilter::from_default_env()) + .with(fmt::layer()); + match logfile { + Some(file) => reg + .with( + fmt::layer() + .with_writer(move || { + rolling::never("logs", file.as_ref().with_extension("log")) + }) + .with_ansi(false), + ) + .init(), + None => reg.init(), + }; } diff --git a/lib/u_lib/src/messaging/mod.rs b/lib/u_lib/src/messaging/mod.rs index 64485ae..e79ef34 100644 --- a/lib/u_lib/src/messaging/mod.rs +++ b/lib/u_lib/src/messaging/mod.rs @@ -28,7 +28,7 @@ impl fmt::Display for Empty { } } -#[derive(Serialize, Deserialize, Clone, PartialEq)] +#[derive(Serialize, Deserialize, Clone, PartialEq, Debug)] pub enum Reportable { Assigned(AssignedJob), Agent(Agent), diff --git a/lib/u_lib/src/models/jobs/meta.rs b/lib/u_lib/src/models/jobs/meta.rs index 6074c31..657be72 100644 --- a/lib/u_lib/src/models/jobs/meta.rs +++ b/lib/u_lib/src/models/jobs/meta.rs @@ -25,13 +25,20 @@ pub struct JobMeta { #[derive(Deserialize)] pub struct RawJobMeta { + #[serde(default)] pub alias: Option, + #[serde(default)] pub argv: String, + #[serde(default)] pub id: Uuid, + #[serde(default)] pub exec_type: JobType, //pub schedule: JobSchedule, + #[serde(default)] pub platform: String, + #[serde(default)] pub payload: Option>, + #[serde(default)] pub payload_path: Option, } diff --git a/lib/u_lib/src/utils/combined_result.rs b/lib/u_lib/src/utils/combined_result.rs index b9a7125..12dd4d6 100644 --- a/lib/u_lib/src/utils/combined_result.rs +++ b/lib/u_lib/src/utils/combined_result.rs @@ -1,9 +1,9 @@ use std::fmt::Debug; use crate::utils::OneOrVec; -use crate::UError; +use anyhow::Error; -pub struct CombinedResult { +pub struct CombinedResult { ok: Vec, err: Vec, } @@ -20,8 +20,13 @@ impl CombinedResult { self.ok.extend(result.into_vec()); } - pub fn err(&mut self, err: impl OneOrVec) { - self.err.extend(err.into_vec()); + pub fn err>(&mut self, err: impl OneOrVec) { + self.err.extend( + err.into_vec() + .into_iter() + .map(Into::into) + .collect::>(), + ); } pub fn unwrap(self) -> Vec { diff --git a/lib/u_lib/src/utils/fmt/stripped.rs b/lib/u_lib/src/utils/fmt/stripped.rs index b1ed949..21d0032 100644 --- a/lib/u_lib/src/utils/fmt/stripped.rs +++ b/lib/u_lib/src/utils/fmt/stripped.rs @@ -3,7 +3,7 @@ use std::iter::Iterator; use std::slice::Iter as SliceIter; use std::str::Chars; -const MAX_DATA_LEN: usize = 200; +const MAX_DATA_LEN: usize = 2000; pub trait Strippable { type Item: fmt::Display; diff --git a/lib/u_lib/src/utils/proc_output.rs b/lib/u_lib/src/utils/proc_output.rs index a9c4135..d060e89 100644 --- a/lib/u_lib/src/utils/proc_output.rs +++ b/lib/u_lib/src/utils/proc_output.rs @@ -108,7 +108,7 @@ impl ProcOutput { altered = true; } if !altered { - result.extend(b"No data"); + result.extend(b""); } result } diff --git a/migrations/2020-10-24-111622_create_all/up.sql b/migrations/2020-10-24-111622_create_all/up.sql index ec2949f..d319d65 100644 --- a/migrations/2020-10-24-111622_create_all/up.sql +++ b/migrations/2020-10-24-111622_create_all/up.sql @@ -4,7 +4,7 @@ CREATE TYPE JobState AS ENUM ('queued', 'running', 'finished'); CREATE TYPE AgentState AS ENUM ('new', 'active', 'banned'); CREATE TABLE IF NOT EXISTS agents ( - alias TEXT UNIQUE + alias TEXT , hostname TEXT NOT NULL , id UUID NOT NULL DEFAULT uuid_generate_v4() , is_root BOOLEAN NOT NULL DEFAULT false @@ -34,11 +34,9 @@ CREATE TABLE IF NOT EXISTS ip_addrs ( ); CREATE TABLE IF NOT EXISTS jobs ( - alias TEXT UNIQUE + alias TEXT , argv TEXT NOT NULL , id UUID NOT NULL DEFAULT uuid_generate_v4() - -- Shell, Binary (with program download), - -- Python (with program and python download if not exist), Management , exec_type JobType NOT NULL DEFAULT 'shell' , platform TEXT NOT NULL , payload BYTEA @@ -47,7 +45,7 @@ CREATE TABLE IF NOT EXISTS jobs ( CREATE TABLE IF NOT EXISTS results ( agent_id UUID NOT NULL - , alias TEXT UNIQUE + , alias TEXT , created TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP , id UUID NOT NULL DEFAULT uuid_generate_v4() , job_id UUID NOT NULL