new makefile, add db table for errors, minor fixes

pull/1/head
plazmoid 3 years ago
parent 37ae67bd1c
commit 56bdb3bac7
  1. 26
      Makefile
  2. 48
      Makefile.toml
  3. 50
      bin/u_agent/src/lib.rs
  4. 16
      bin/u_panel/src/main.rs
  5. 10
      bin/u_server/src/db.rs
  6. 4
      bin/u_server/src/filters.rs
  7. 27
      bin/u_server/src/handlers.rs
  8. 6
      bin/u_server/src/u_server.rs
  9. 6
      integration/tests/behaviour.rs
  10. 6
      integration/tests/fixtures/agent.rs
  11. 2
      lib/u_lib/src/api.rs
  12. 36
      lib/u_lib/src/builder.rs
  13. 3
      lib/u_lib/src/errors.rs
  14. 14
      lib/u_lib/src/executor.rs
  15. 10
      lib/u_lib/src/messaging/mod.rs
  16. 21
      lib/u_lib/src/models/agent.rs
  17. 35
      lib/u_lib/src/models/errors.rs
  18. 28
      lib/u_lib/src/models/jobs/assigned.rs
  19. 41
      lib/u_lib/src/models/jobs/meta.rs
  20. 13
      lib/u_lib/src/models/mod.rs
  21. 13
      lib/u_lib/src/models/schema.rs
  22. 4
      lib/u_lib/src/utils/combined_result.rs
  23. 1
      migrations/2020-10-24-111622_create_all/down.sql
  24. 10
      migrations/2020-10-24-111622_create_all/up.sql

@ -1,26 +0,0 @@
.PHONY: _pre_build debug release run clean unit integration test
CARGO=./scripts/cargo_musl.sh
clean:
${CARGO} clean
_pre_build:
docker build -t unki/musllibs ./muslrust
debug: _pre_build
${CARGO} build
release: _pre_build
${CARGO} build --release
run: build
${CARGO} run
unit:
${CARGO} test --lib
integration:
cd ./integration && ./integration_tests.sh
test: unit integration

@ -0,0 +1,48 @@
[config]
default_to_workspace = false
[env]
CARGO = "./scripts/cargo_musl.sh"
[tasks.build_cargo_image]
script = "docker build -t unki/musllibs ./muslrust"
[tasks.clean]
command = "${CARGO}"
args = ["clean"]
[tasks.debug]
dependencies = ["build_cargo_image"]
command = "${CARGO}"
args = ["build"]
[tasks.release]
dependencies = ["build_cargo_image"]
command = "${CARGO}"
args = ["build", "--release"]
[tasks.run]
script = '''
echo "Only integration tests are supported."
exit 1
'''
[tasks.unit]
command = "${CARGO}"
args = ["test", "--lib", "${@}"]
[tasks.integration]
script = '''
cd ./integration
bash integration_tests.sh
'''
[tasks.gen_schema]
script = '''
cd ./integration
docker-compose up -d u_server
docker-compose down
'''
[tasks.test]
dependencies = ["unit", "integration"]

@ -14,32 +14,28 @@ use u_lib::{
builder::JobBuilder, builder::JobBuilder,
cache::JobCache, cache::JobCache,
executor::pop_completed, executor::pop_completed,
models::{AssignedJob, ExecResult}, messaging::Reportable,
models::AssignedJob,
UID, UID,
//daemonize //daemonize
}; };
#[macro_export] const ITERATION_LATENCY: u64 = 5;
macro_rules! retry_until_ok {
( $body:expr ) => {
loop {
match $body {
Ok(r) => break r,
Err(e) => error!("{:?}", e),
};
sleep(Duration::from_secs(5)).await;
}
};
}
pub async fn process_request(job_requests: Vec<AssignedJob>, client: &ClientHandler) { pub async fn process_request(job_requests: Vec<AssignedJob>, client: &ClientHandler) {
if job_requests.len() > 0 { if job_requests.len() > 0 {
for jr in &job_requests { for jr in &job_requests {
if !JobCache::contains(&jr.job_id) { if !JobCache::contains(&jr.job_id) {
info!("Fetching job: {}", &jr.job_id); info!("Fetching job: {}", &jr.job_id);
let fetched_job = retry_until_ok!(client.get_jobs(Some(jr.job_id)).await) let fetched_job = loop {
.pop() match client.get_jobs(Some(jr.job_id)).await {
.unwrap(); Ok(mut result) => break result.pop().unwrap(),
Err(err) => {
error!("{:?} \nretrying...", err);
sleep(Duration::from_secs(ITERATION_LATENCY)).await;
}
}
};
JobCache::insert(fetched_job); JobCache::insert(fetched_job);
} }
} }
@ -71,16 +67,24 @@ pub async fn run_forever() {
//daemonize(); //daemonize();
env_logger::init(); env_logger::init();
let arg_ip = env::args().nth(1); let arg_ip = env::args().nth(1);
let instance = ClientHandler::new(arg_ip.as_deref()); let client = ClientHandler::new(arg_ip.as_deref());
info!("Connecting to the server"); info!("Connecting to the server");
loop { loop {
let job_requests: Vec<AssignedJob> = match client.get_personal_jobs(Some(*UID)).await {
retry_until_ok!(instance.get_personal_jobs(Some(*UID)).await).into_builtin_vec(); Ok(resp) => {
process_request(job_requests, &instance).await; let job_requests = resp.into_builtin_vec();
let result: Vec<ExecResult> = pop_completed().await.into_iter().collect(); process_request(job_requests, &client).await;
}
Err(err) => {
error!("{:?}", err);
}
}
let result: Vec<Reportable> = pop_completed().await.into_iter().collect();
if result.len() > 0 { if result.len() > 0 {
retry_until_ok!(instance.report(&result).await); if let Err(err) = client.report(&result).await {
error!("{:?}", err);
}
} }
sleep(Duration::from_secs(5)).await; sleep(Duration::from_secs(ITERATION_LATENCY)).await;
} }
} }

@ -1,9 +1,10 @@
use std::env; use std::env;
use std::fmt; use std::fmt;
use std::process;
use structopt::StructOpt; use structopt::StructOpt;
use u_lib::{ use u_lib::{
api::ClientHandler, datatypes::DataResult, messaging::AsMsg, models::JobMeta, utils::init_env, api::ClientHandler, datatypes::DataResult, messaging::AsMsg, models::JobMeta, utils::init_env,
UResult, UError, UResult,
}; };
use uuid::Uuid; use uuid::Uuid;
@ -78,7 +79,7 @@ fn parse_uuid(src: &str) -> Result<Uuid, String> {
Uuid::parse_str(src).map_err(|e| e.to_string()) Uuid::parse_str(src).map_err(|e| e.to_string())
} }
async fn process_cmd(args: Args) { async fn process_cmd(args: Args) -> UResult<()> {
struct Printer { struct Printer {
json: bool, json: bool,
} }
@ -100,7 +101,7 @@ async fn process_cmd(args: Args) {
} }
} }
let token = env::var("ADMIN_AUTH_TOKEN").expect("Authentication token is not set"); let token = env::var("ADMIN_AUTH_TOKEN").map_err(|_| UError::WrongToken)?;
let cli_handler = ClientHandler::new(None).password(token); let cli_handler = ClientHandler::new(None).password(token);
let printer = Printer { json: args.json }; let printer = Printer { json: args.json };
match args.cmd { match args.cmd {
@ -117,8 +118,7 @@ async fn process_cmd(args: Args) {
let job = JobMeta::builder() let job = JobMeta::builder()
.with_shell(cmd.join(" ")) .with_shell(cmd.join(" "))
.with_alias(alias) .with_alias(alias)
.build() .build()?;
.unwrap();
printer.print(cli_handler.upload_jobs(&[job]).await); printer.print(cli_handler.upload_jobs(&[job]).await);
} }
JobALD::LD(LD::List { uid }) => printer.print(cli_handler.get_jobs(uid).await), JobALD::LD(LD::List { uid }) => printer.print(cli_handler.get_jobs(uid).await),
@ -133,11 +133,15 @@ async fn process_cmd(args: Args) {
JobMapALD::Delete { uid } => printer.print(cli_handler.del(Some(uid)).await), JobMapALD::Delete { uid } => printer.print(cli_handler.del(Some(uid)).await),
}, },
} }
Ok(())
} }
#[tokio::main] #[tokio::main]
async fn main() { async fn main() {
init_env(); init_env();
let args: Args = Args::from_args(); let args: Args = Args::from_args();
process_cmd(args).await; if let Err(e) = process_cmd(args).await {
eprintln!("Error: {}", e);
process::exit(1)
}
} }

@ -5,7 +5,7 @@ use std::{
sync::{Arc, Mutex, MutexGuard}, sync::{Arc, Mutex, MutexGuard},
}; };
use u_lib::{ use u_lib::{
models::{schema, Agent, AssignedJob, JobMeta, JobState}, models::{schema, Agent, AgentError, AssignedJob, JobMeta, JobState},
ULocalError, ULocalResult, ULocalError, ULocalResult,
}; };
use uuid::Uuid; use uuid::Uuid;
@ -37,6 +37,14 @@ impl UDB {
.unwrap() .unwrap()
} }
pub fn report_error(&self, error: &AgentError) -> ULocalResult<()> {
use schema::errors;
diesel::insert_into(errors::table)
.values(error)
.execute(&self.conn)?;
Ok(())
}
pub fn insert_jobs(&self, job_metas: &[JobMeta]) -> ULocalResult<()> { pub fn insert_jobs(&self, job_metas: &[JobMeta]) -> ULocalResult<()> {
use schema::jobs; use schema::jobs;
diesel::insert_into(jobs::table) diesel::insert_into(jobs::table)

@ -2,7 +2,7 @@ use crate::handlers::Endpoints;
use serde::de::DeserializeOwned; use serde::de::DeserializeOwned;
use std::env; use std::env;
use u_lib::{ use u_lib::{
messaging::{AsMsg, BaseMessage}, messaging::{AsMsg, BaseMessage, Reportable},
models::*, models::*,
}; };
use uuid::Uuid; use uuid::Uuid;
@ -68,7 +68,7 @@ pub fn make_filters() -> impl Filter<Extract = (impl Reply,), Error = Rejection>
let report = warp::post() let report = warp::post()
.and(warp::path("report")) .and(warp::path("report"))
.and(get_content::<Vec<ExecResult>>().and_then(Endpoints::report)); .and(get_content::<Vec<Reportable>>().and_then(Endpoints::report));
let auth_token = format!("Bearer {}", env::var("ADMIN_AUTH_TOKEN").unwrap()).into_boxed_str(); let auth_token = format!("Bearer {}", env::var("ADMIN_AUTH_TOKEN").unwrap()).into_boxed_str();
let auth_header = warp::header::exact("authorization", Box::leak(auth_token)); let auth_header = warp::header::exact("authorization", Box::leak(auth_token));

@ -3,8 +3,9 @@ use diesel::SaveChangesDsl;
use hyper::Body; use hyper::Body;
use serde::Serialize; use serde::Serialize;
use u_lib::{ use u_lib::{
messaging::{AsMsg, BaseMessage}, messaging::{AsMsg, BaseMessage, Reportable},
models::{Agent, AgentState, AssignedJob, ExecResult, JobMeta, JobState}, models::*,
utils::OneOrVec,
ULocalError, ULocalError,
}; };
use uuid::Uuid; use uuid::Uuid;
@ -13,15 +14,15 @@ use warp::{
Rejection, Reply, Rejection, Reply,
}; };
pub fn build_response<S: Into<Body>>(code: StatusCode, body: S) -> Response<Body> { pub fn build_response(code: StatusCode, body: impl Into<Body>) -> Response<Body> {
Response::builder().status(code).body(body.into()).unwrap() Response::builder().status(code).body(body.into()).unwrap()
} }
pub fn build_ok<S: Into<Body>>(body: S) -> Response<Body> { pub fn build_ok(body: impl Into<Body>) -> Response<Body> {
build_response(StatusCode::OK, body) build_response(StatusCode::OK, body)
} }
pub fn build_err<S: ToString>(body: S) -> Response<Body> { pub fn build_err(body: impl ToString) -> Response<Body> {
build_response(StatusCode::BAD_REQUEST, body.to_string()) build_response(StatusCode::BAD_REQUEST, body.to_string())
} }
@ -135,15 +136,15 @@ impl Endpoints {
} }
} }
pub async fn report( pub async fn report<Data: OneOrVec<Reportable> + AsMsg + 'static>(
msg: BaseMessage<'static, Vec<ExecResult>>, msg: BaseMessage<'static, Data>,
) -> Result<Response<Body>, Rejection> { ) -> Result<Response<Body>, Rejection> {
info!("hnd: report"); info!("hnd: report");
let id = msg.id; let id = msg.id;
let mut failed = vec![]; let mut failed = vec![];
for entry in msg.into_inner() { for entry in msg.into_inner().into_vec() {
match entry { match entry {
ExecResult::Assigned(res) => { Reportable::Assigned(res) => {
if id != res.agent_id { if id != res.agent_id {
continue; continue;
} }
@ -155,11 +156,15 @@ impl Endpoints {
failed.push(e.to_string()) failed.push(e.to_string())
} }
} }
ExecResult::Agent(mut a) => { Reportable::Agent(mut a) => {
a.state = AgentState::Active; a.state = AgentState::Active;
Self::add_agent(a).await?; Self::add_agent(a).await?;
} }
ExecResult::Dummy => (), Reportable::Error(e) => {
let err = AgentError::from_msg(e, id);
UDB::lock_db().report_error(&err).unwrap();
}
Reportable::Dummy => (),
} }
} }
if failed.len() > 0 { if failed.len() > 0 {

@ -77,7 +77,7 @@ mod tests {
use handlers::build_ok; use handlers::build_ok;
use mockall::predicate::*; use mockall::predicate::*;
use test_case::test_case; use test_case::test_case;
use u_lib::messaging::{AsMsg, BaseMessage}; use u_lib::messaging::{AsMsg, BaseMessage, Reportable};
use uuid::Uuid; use uuid::Uuid;
use warp::test::request; use warp::test::request;
@ -103,12 +103,12 @@ mod tests {
async fn test_report_unauth_successful() { async fn test_report_unauth_successful() {
let mock = Endpoints::report_context(); let mock = Endpoints::report_context();
mock.expect() mock.expect()
.withf(|msg: &BaseMessage<'_, Vec<ExecResult>>| msg.inner_ref()[0] == ExecResult::Dummy) .withf(|msg: &BaseMessage<'_, Vec<Reportable>>| msg.inner_ref()[0] == Reportable::Dummy)
.returning(|_| Ok(build_ok(""))); .returning(|_| Ok(build_ok("")));
request() request()
.path("/report/") .path("/report/")
.method("POST") .method("POST")
.json(&vec![ExecResult::Dummy].as_message()) .json(&vec![Reportable::Dummy].as_message())
.filter(&make_filters()) .filter(&make_filters())
.await .await
.unwrap(); .unwrap();

@ -3,9 +3,9 @@ use crate::helpers::Panel;
use rstest::rstest; use rstest::rstest;
use std::error::Error; use std::error::Error;
use std::thread::sleep;
use std::time::Duration; use std::time::Duration;
use u_lib::models::*; use tokio::time::sleep;
use u_lib::{messaging::Empty, models::*};
use uuid::Uuid; use uuid::Uuid;
type TestResult<R = ()> = Result<R, Box<dyn Error>>; type TestResult<R = ()> = Result<R, Box<dyn Error>>;
@ -38,7 +38,7 @@ async fn test_setup_tasks() -> TestResult {
if result[0].state == JobState::Finished { if result[0].state == JobState::Finished {
return Ok(()); return Ok(());
} else { } else {
sleep(Duration::from_secs(5)); sleep(Duration::from_secs(5)).await;
eprintln!("waiting for task"); eprintln!("waiting for task");
} }
} }

@ -1,4 +1,4 @@
use u_lib::{api::ClientHandler, models::*}; use u_lib::{api::ClientHandler, messaging::Reportable, models::*};
use uuid::Uuid; use uuid::Uuid;
pub struct RegisteredAgent { pub struct RegisteredAgent {
@ -29,8 +29,6 @@ pub async fn register_agent() -> RegisteredAgent {
id: agent_uid, id: agent_uid,
..Default::default() ..Default::default()
}; };
cli.report(&vec![ExecResult::Agent(agent_data)]) cli.report(&[Reportable::Agent(agent_data)]).await.unwrap();
.await
.unwrap();
RegisteredAgent { uid: agent_uid } RegisteredAgent { uid: agent_uid }
} }

@ -66,7 +66,7 @@ impl ClientHandler {
// //
// send something to server // send something to server
#[api_route("POST")] #[api_route("POST")]
async fn report<M: AsMsg>(&self, payload: &M) -> messaging::Empty {} async fn report(&self, payload: &[messaging::Reportable]) -> messaging::Empty {}
// //
// download file // download file
#[api_route("GET")] #[api_route("GET")]

@ -1,6 +1,7 @@
use crate::{ use crate::{
UError, UResult, cache::JobCache, executor::{Waiter, DynFut}, UError, UResult, cache::JobCache, executor::{Waiter, DynFut},
models::{Agent, AssignedJob, JobMeta, JobType, ExecResult}, models::{Agent, AssignedJob, JobMeta, JobType},
messaging::Reportable,
utils::{CombinedResult, OneOrVec} utils::{CombinedResult, OneOrVec}
}; };
use guess_host_triple::guess_host_triple; use guess_host_triple::guess_host_triple;
@ -72,12 +73,12 @@ impl JobBuilder {
} }
/// Spawn jobs and wait for result /// Spawn jobs and wait for result
pub async fn wait(self) -> Vec<ExecResult> { pub async fn wait(self) -> Vec<Reportable> {
self.waiter.spawn().await.wait().await self.waiter.spawn().await.wait().await
} }
/// Spawn one job and wait for result /// Spawn one job and wait for result
pub async fn wait_one(self) -> ExecResult { pub async fn wait_one(self) -> Reportable {
self.waiter.spawn().await.wait().await.pop().unwrap() self.waiter.spawn().await.wait().await.pop().unwrap()
} }
} }
@ -86,7 +87,7 @@ impl JobBuilder {
pub struct NamedJobBuilder { pub struct NamedJobBuilder {
builder: Option<JobBuilder>, builder: Option<JobBuilder>,
job_names: Vec<&'static str>, job_names: Vec<&'static str>,
results: HashMap<&'static str, ExecResult>, results: HashMap<&'static str, Reportable>,
} }
impl NamedJobBuilder { impl NamedJobBuilder {
@ -136,11 +137,11 @@ impl NamedJobBuilder {
self self
} }
pub fn pop_opt(&mut self, name: &'static str) -> Option<ExecResult> { pub fn pop_opt(&mut self, name: &'static str) -> Option<Reportable> {
self.results.remove(name) self.results.remove(name)
} }
pub fn pop(&mut self, name: &'static str) -> ExecResult { pub fn pop(&mut self, name: &'static str) -> Reportable {
self.pop_opt(name).unwrap() self.pop_opt(name).unwrap()
} }
} }
@ -148,13 +149,13 @@ impl NamedJobBuilder {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*;
use test_case::test_case; use test_case::test_case;
use std::{time::SystemTime}; use std::{time::SystemTime};
use crate::{ use crate::{
errors::UError, errors::UError,
models::{ models::{
jobs::{JobMeta}, JobMeta,
ExecResult,
misc::JobType misc::JobType
}, },
builder::{JobBuilder, NamedJobBuilder}, builder::{JobBuilder, NamedJobBuilder},
@ -192,7 +193,8 @@ mod tests {
TMPPATH=/tmp/lol TMPPATH=/tmp/lol
mkdir -p $TMPPATH mkdir -p $TMPPATH
echo test03 > $TMPPATH/t echo test03 > $TMPPATH/t
cat $TMPPATH/t"# cat $TMPPATH/t
rm -rf $TMPPATH"#
), ),
"test03" "test03"
;"sh multiline payload" ;"sh multiline payload"
@ -211,8 +213,8 @@ mod tests {
} }
let job = job.build().unwrap(); let job = job.build().unwrap();
let job_result = JobBuilder::from_meta(job).unwrap_one().wait_one().await; let job_result = JobBuilder::from_meta(job).unwrap_one().wait_one().await;
let result = unwrap_enum!(job_result, ExecResult::Assigned); let result = unwrap_enum!(job_result, Reportable::Assigned);
let result = result.to_string_result().unwrap(); let result = result.to_string_result();
assert_eq!(result.trim(), expected_result); assert_eq!(result.trim(), expected_result);
Ok(()) Ok(())
} }
@ -226,9 +228,9 @@ mod tests {
let ls = JobBuilder::from_meta(JobMeta::from_shell("ls")?).unwrap_one() let ls = JobBuilder::from_meta(JobMeta::from_shell("ls")?).unwrap_one()
.wait_one() .wait_one()
.await; .await;
let ls = unwrap_enum!(ls, ExecResult::Assigned); let ls = unwrap_enum!(ls, Reportable::Assigned);
assert_eq!(ls.retcode.unwrap(), 0); assert_eq!(ls.retcode.unwrap(), 0);
let folders = ls.to_string_result().unwrap(); let folders = ls.to_string_result();
let subfolders_jobs: Vec<JobMeta> = folders let subfolders_jobs: Vec<JobMeta> = folders
.lines() .lines()
.map(|f| JobMeta::from_shell(format!("ls {}", f)).unwrap()) .map(|f| JobMeta::from_shell(format!("ls {}", f)).unwrap())
@ -238,7 +240,7 @@ mod tests {
.wait() .wait()
.await; .await;
for result in ls_subfolders { for result in ls_subfolders {
let result = unwrap_enum!(result, ExecResult::Assigned); let result = unwrap_enum!(result, Reportable::Assigned);
assert_eq!(result.retcode.unwrap(), 0); assert_eq!(result.retcode.unwrap(), 0);
} }
longest_job.wait().await; longest_job.wait().await;
@ -270,8 +272,8 @@ mod tests {
.unwrap_one() .unwrap_one()
.wait_one() .wait_one()
.await; .await;
let job_result = unwrap_enum!(job_result, ExecResult::Assigned); let job_result = unwrap_enum!(job_result, Reportable::Assigned);
let output = job_result.to_string_result().unwrap(); let output = job_result.to_string_result();
assert!(output.contains("No such file")); assert!(output.contains("No such file"));
assert!(job_result.retcode.is_none()); assert!(job_result.retcode.is_none());
Ok(()) Ok(())
@ -308,7 +310,7 @@ mod tests {
("gatherer", JobMeta::builder().with_type(JobType::Manage).build()?) ("gatherer", JobMeta::builder().with_type(JobType::Manage).build()?)
]).wait().await; ]).wait().await;
let gathered = jobs.pop("gatherer"); let gathered = jobs.pop("gatherer");
assert_eq!(unwrap_enum!(gathered, ExecResult::Agent).alias, None); assert_eq!(unwrap_enum!(gathered, Reportable::Agent).alias, None);
Ok(()) Ok(())
} }

@ -35,6 +35,9 @@ pub enum UError {
#[error("Error opening {0}: {1}")] #[error("Error opening {0}: {1}")]
FSError(String, String), FSError(String, String),
#[error("Wrong auth token")]
WrongToken,
} }
impl From<ReqError> for UError { impl From<ReqError> for UError {

@ -1,4 +1,4 @@
use crate::{models::ExecResult, utils::OneOrVec}; use crate::{messaging::Reportable, utils::OneOrVec};
use futures::{future::BoxFuture, lock::Mutex}; use futures::{future::BoxFuture, lock::Mutex};
use lazy_static::lazy_static; use lazy_static::lazy_static;
use std::collections::HashMap; use std::collections::HashMap;
@ -9,7 +9,7 @@ use tokio::{
}; };
use uuid::Uuid; use uuid::Uuid;
pub type DynFut = BoxFuture<'static, ExecResult>; pub type DynFut = BoxFuture<'static, Reportable>;
lazy_static! { lazy_static! {
static ref FUT_RESULTS: Mutex<HashMap<Uuid, JoinInfo>> = Mutex::new(HashMap::new()); static ref FUT_RESULTS: Mutex<HashMap<Uuid, JoinInfo>> = Mutex::new(HashMap::new());
@ -21,7 +21,7 @@ lazy_static! {
} }
struct JoinInfo { struct JoinInfo {
handle: JoinHandle<ExecResult>, handle: JoinHandle<Reportable>,
completed: bool, completed: bool,
collectable: bool, // indicates if future can be popped from pool via pop_task_if_completed collectable: bool, // indicates if future can be popped from pool via pop_task_if_completed
} }
@ -68,7 +68,7 @@ impl Waiter {
/// Wait until a bunch of tasks is finished. /// Wait until a bunch of tasks is finished.
/// NOT GUARANTEED that all tasks will be returned due to /// NOT GUARANTEED that all tasks will be returned due to
/// possibility to pop them in other places /// possibility to pop them in other places
pub async fn wait(self) -> Vec<ExecResult> { pub async fn wait(self) -> Vec<Reportable> {
let mut result = vec![]; let mut result = vec![];
for fid in self.fids { for fid in self.fids {
if let Some(task) = pop_task(fid).await { if let Some(task) = pop_task(fid).await {
@ -94,7 +94,7 @@ async fn pop_task(fid: Uuid) -> Option<JoinInfo> {
FUT_RESULTS.lock().await.remove(&fid) FUT_RESULTS.lock().await.remove(&fid)
} }
pub async fn pop_task_if_completed(fid: Uuid) -> Option<ExecResult> { pub async fn pop_task_if_completed(fid: Uuid) -> Option<Reportable> {
let &mut JoinInfo { let &mut JoinInfo {
handle: _, handle: _,
collectable, collectable,
@ -112,8 +112,8 @@ pub async fn pop_task_if_completed(fid: Uuid) -> Option<ExecResult> {
} }
} }
pub async fn pop_completed() -> Vec<ExecResult> { pub async fn pop_completed() -> Vec<Reportable> {
let mut completed: Vec<ExecResult> = vec![]; let mut completed: Vec<Reportable> = vec![];
let fids = FUT_RESULTS let fids = FUT_RESULTS
.lock() .lock()
.await .await

@ -11,7 +11,7 @@ use uuid::Uuid;
impl AsMsg for Agent {} impl AsMsg for Agent {}
impl AsMsg for AssignedJob {} impl AsMsg for AssignedJob {}
impl AsMsg for DownloadInfo {} impl AsMsg for DownloadInfo {}
impl AsMsg for ExecResult {} impl AsMsg for Reportable {}
impl AsMsg for JobMeta {} impl AsMsg for JobMeta {}
impl AsMsg for String {} impl AsMsg for String {}
impl AsMsg for Uuid {} impl AsMsg for Uuid {}
@ -27,3 +27,11 @@ impl fmt::Display for Empty {
write!(f, "<empty>") write!(f, "<empty>")
} }
} }
#[derive(Serialize, Deserialize, Clone, PartialEq)]
pub enum Reportable {
Assigned(AssignedJob),
Agent(Agent),
Dummy,
Error(String),
}

@ -5,11 +5,8 @@ use std::{fmt, time::SystemTime};
use strum::Display; use strum::Display;
use crate::{ use crate::{
builder::NamedJobBuilder, builder::NamedJobBuilder, messaging::Reportable, models::schema::*, unwrap_enum,
models::{schema::*, ExecResult}, utils::systime_to_string, UID,
unwrap_enum,
utils::systime_to_string,
UID,
}; };
use guess_host_triple::guess_host_triple; use guess_host_triple::guess_host_triple;
@ -54,8 +51,8 @@ pub struct Agent {
impl fmt::Display for Agent { impl fmt::Display for Agent {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
let mut out = format!("Agent: {}", self.id); let mut out = format!("Agent: {}", self.id);
if self.alias.is_some() { if let Some(ref alias) = self.alias {
out += &format!(" ({})", self.alias.as_ref().unwrap()) out += &format!(" ({})", alias)
} }
out += &format!("\nUsername: {}", self.username); out += &format!("\nUsername: {}", self.username);
out += &format!("\nHostname: {}", self.hostname); out += &format!("\nHostname: {}", self.hostname);
@ -85,9 +82,9 @@ impl Agent {
.unwrap_one() .unwrap_one()
.wait() .wait()
.await; .await;
let decoder = |job_result: ExecResult| { let decoder = |job_result: Reportable| {
let assoc_job = unwrap_enum!(job_result, ExecResult::Assigned); let assoc_job = unwrap_enum!(job_result, Reportable::Assigned);
assoc_job.to_string_result().unwrap().trim().to_string() assoc_job.to_string_result().trim().to_string()
}; };
Self { Self {
@ -99,9 +96,9 @@ impl Agent {
} }
} }
pub async fn run() -> ExecResult { pub async fn run() -> Reportable {
#[cfg(unix)] #[cfg(unix)]
ExecResult::Agent(Agent::gather().await) Reportable::Agent(Agent::gather().await)
} }
} }

@ -0,0 +1,35 @@
use crate::models::schema::*;
use diesel::{Insertable, Queryable};
use serde::{Deserialize, Serialize};
use std::time::SystemTime;
use uuid::Uuid;
#[derive(Serialize, Deserialize, Clone, Debug, Queryable, Insertable, PartialEq)]
#[table_name = "errors"]
pub struct AgentError {
pub agent_id: Uuid,
pub created: SystemTime,
pub id: Uuid,
pub msg: String,
}
impl AgentError {
pub fn from_msg(msg: impl Into<String>, agent_id: Uuid) -> Self {
AgentError {
agent_id,
msg: msg.into(),
..Default::default()
}
}
}
impl Default for AgentError {
fn default() -> Self {
Self {
agent_id: Uuid::new_v4(),
created: SystemTime::now(),
id: Uuid::new_v4(),
msg: String::new(),
}
}
}

@ -1,13 +1,14 @@
use super::JobState; use super::JobState;
use crate::{ use crate::{
cache::JobCache, cache::JobCache,
models::{schema::*, ExecResult, JobOutput}, messaging::Reportable,
models::{schema::*, JobOutput},
utils::{systime_to_string, TempFile}, utils::{systime_to_string, TempFile},
UID, UID,
}; };
use diesel::{Identifiable, Insertable, Queryable}; use diesel::{Identifiable, Insertable, Queryable};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::{fmt, process::Output, string::FromUtf8Error, time::SystemTime}; use std::{fmt, process::Output, time::SystemTime};
use tokio::process::Command; use tokio::process::Command;
use uuid::Uuid; use uuid::Uuid;
@ -40,20 +41,17 @@ impl fmt::Display for AssignedJob {
let mut out = format!("Result: {}", self.id); let mut out = format!("Result: {}", self.id);
out += &format!("\nAgent: {}", self.agent_id); out += &format!("\nAgent: {}", self.agent_id);
out += &format!("\nJob: {}", self.job_id); out += &format!("\nJob: {}", self.job_id);
if self.alias.is_some() { if let Some(ref alias) = self.alias {
out += &format!("\nAlias: {}", self.alias.as_ref().unwrap()); out += &format!("\nAlias: {}", alias);
} }
out += &format!("\nUpdated: {}", systime_to_string(&self.updated)); out += &format!("\nUpdated: {}", systime_to_string(&self.updated));
out += &format!("\nState: {}", self.state); out += &format!("\nState: {}", self.state);
if self.state == JobState::Finished { if self.state == JobState::Finished {
if self.retcode.is_some() { if let Some(ref retcode) = self.retcode {
out += &format!("\nReturn code: {}", self.retcode.unwrap()); out += &format!("\nReturn code: {}", retcode);
} }
if self.result.is_some() { if let Some(ref result) = self.result {
out += &format!( out += &format!("\nResult: {}", String::from_utf8_lossy(result));
"\nResult: {}",
String::from_utf8_lossy(self.result.as_ref().unwrap())
);
} }
} }
write!(f, "{}", out) write!(f, "{}", out)
@ -77,7 +75,7 @@ impl Default for AssignedJob {
} }
impl AssignedJob { impl AssignedJob {
pub async fn run(mut self) -> ExecResult { pub async fn run(mut self) -> Reportable {
let (argv, _payload) = { let (argv, _payload) = {
let meta = JobCache::get(&self.job_id).unwrap(); let meta = JobCache::get(&self.job_id).unwrap();
let extracted_payload = meta let extracted_payload = meta
@ -118,7 +116,7 @@ impl AssignedJob {
self.retcode = retcode; self.retcode = retcode;
self.updated = SystemTime::now(); self.updated = SystemTime::now();
self.state = JobState::Finished; self.state = JobState::Finished;
ExecResult::Assigned(self) Reportable::Assigned(self)
} }
pub fn new(job_id: Uuid, other: Option<&Self>) -> Self { pub fn new(job_id: Uuid, other: Option<&Self>) -> Self {
@ -145,7 +143,7 @@ impl AssignedJob {
} }
} }
pub fn to_string_result(&self) -> Result<String, FromUtf8Error> { pub fn to_string_result(&self) -> String {
String::from_utf8(self.to_raw_result()) String::from_utf8_lossy(&self.to_raw_result()).into_owned()
} }
} }

@ -33,32 +33,29 @@ impl JobMeta {
impl fmt::Display for JobMeta { impl fmt::Display for JobMeta {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
let mut out = format!("Job: {}", self.id); let mut out = format!("Job: {}", self.id);
if self.alias.is_some() { if let Some(ref alias) = self.alias {
out += &format!(" ({})", self.alias.as_ref().unwrap()); out += &format!(" ({})", alias);
} }
out += &format!("\nArgv: {}", self.argv); out += &format!("\nArgv: {}", self.argv);
out += &format!("\nExecutable type: {}", self.exec_type); out += &format!("\nExecutable type: {}", self.exec_type);
out += &format!("\nPlatform: {}", self.platform); out += &format!("\nPlatform: {}", self.platform);
if self.exec_type == JobType::Shell && self.payload.is_some() { if let Some(ref payload) = self.payload {
let payload = self.payload.as_ref().unwrap(); if self.exec_type == JobType::Shell {
let (pld_len, large) = { let (pld_len, large) = {
let pl = payload.len(); let pl = payload.len();
if pl > 20 { if pl > 20 {
(20, true) (20, true)
} else { } else {
(pl, false) (pl, false)
} }
}; };
let pld_beginning = payload let pld_beginning = &payload[..pld_len];
.iter() out += &format!(
.take(pld_len) "\nPayload: {}{}",
.map(|u| *u) String::from_utf8_lossy(pld_beginning),
.collect::<Vec<u8>>(); if large { "" } else { " <...>" }
out += &format!( );
"\nPayload: {}{}", }
String::from_utf8_lossy(&pld_beginning),
if large { "" } else { " <...>" }
);
} }
write!(f, "{}", out) write!(f, "{}", out)
} }

@ -1,13 +1,6 @@
mod agent; mod agent;
pub mod jobs; mod errors;
mod jobs;
pub mod schema; pub mod schema;
pub use crate::models::{agent::*, jobs::*}; pub use crate::models::{agent::*, errors::*, jobs::*};
use serde::{Deserialize, Serialize};
#[derive(Serialize, Deserialize, Clone, PartialEq)]
pub enum ExecResult {
Assigned(AssignedJob),
Agent(Agent),
Dummy,
}

@ -26,6 +26,17 @@ table! {
} }
} }
table! {
use crate::schema_exports::*;
errors (id) {
agent_id -> Uuid,
created -> Timestamp,
id -> Uuid,
msg -> Nullable<Text>,
}
}
table! { table! {
use crate::schema_exports::*; use crate::schema_exports::*;
@ -71,6 +82,7 @@ table! {
} }
joinable!(certificates -> agents (agent_id)); joinable!(certificates -> agents (agent_id));
joinable!(errors -> agents (agent_id));
joinable!(ip_addrs -> agents (agent_id)); joinable!(ip_addrs -> agents (agent_id));
joinable!(results -> agents (agent_id)); joinable!(results -> agents (agent_id));
joinable!(results -> jobs (job_id)); joinable!(results -> jobs (job_id));
@ -78,6 +90,7 @@ joinable!(results -> jobs (job_id));
allow_tables_to_appear_in_same_query!( allow_tables_to_appear_in_same_query!(
agents, agents,
certificates, certificates,
errors,
ip_addrs, ip_addrs,
jobs, jobs,
results, results,

@ -30,6 +30,10 @@ impl<T, E> CombinedResult<T, E> {
self.ok self.ok
} }
pub fn has_err(&self) -> bool {
self.err.len() > 0
}
pub fn unwrap_one(self) -> T { pub fn unwrap_one(self) -> T {
self.unwrap().pop().unwrap() self.unwrap().pop().unwrap()
} }

@ -3,6 +3,7 @@ DROP TABLE results;
DROP TABLE certificates; DROP TABLE certificates;
DROP TABLE jobs; DROP TABLE jobs;
DROP TABLE agents; DROP TABLE agents;
DROP TABLE errors;
DROP TYPE IF EXISTS JobState; DROP TYPE IF EXISTS JobState;
DROP TYPE IF EXISTS JobType; DROP TYPE IF EXISTS JobType;

@ -66,4 +66,14 @@ CREATE TABLE IF NOT EXISTS certificates (
, is_revoked BOOLEAN NOT NULL DEFAULT FALSE , is_revoked BOOLEAN NOT NULL DEFAULT FALSE
, PRIMARY KEY(id) , PRIMARY KEY(id)
, FOREIGN KEY(agent_id) REFERENCES agents(id) , FOREIGN KEY(agent_id) REFERENCES agents(id)
);
CREATE TABLE IF NOT EXISTS errors (
agent_id UUID NOT NULL
, created TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
, id UUID NOT NULL DEFAULT uuid_generate_v4()
, msg TEXT
, FOREIGN KEY(agent_id) REFERENCES agents(id) ON DELETE CASCADE
, PRIMARY KEY(id)
); );
Loading…
Cancel
Save