From aedd8f774c10c90c3f4e825e569703f9a2adf199 Mon Sep 17 00:00:00 2001 From: plazmoid Date: Tue, 12 Jan 2021 12:29:28 +0500 Subject: [PATCH] exequteor v2 --- diesel.toml | 1 + lib/u_lib/Cargo.toml | 2 +- lib/u_lib/src/executor.rs | 56 ++++-- lib/u_lib/src/models/agent.rs | 22 +-- lib/u_lib/src/models/jobs.rs | 182 +++++++++++------- .../2020-10-24-111622_create_all/down.sql | 9 +- .../2020-10-24-111622_create_all/up.sql | 15 +- 7 files changed, 177 insertions(+), 110 deletions(-) diff --git a/diesel.toml b/diesel.toml index 09a260f..6693302 100644 --- a/diesel.toml +++ b/diesel.toml @@ -3,3 +3,4 @@ [print_schema] file = "lib/u_lib/src/models/schema.rs" +import_types = ["diesel::sql_types::*", "crate::*"] \ No newline at end of file diff --git a/lib/u_lib/Cargo.toml b/lib/u_lib/Cargo.toml index 0777a37..de5f1be 100644 --- a/lib/u_lib/Cargo.toml +++ b/lib/u_lib/Cargo.toml @@ -17,7 +17,7 @@ reqwest = { version = "0.10.7", features = ["json"] } futures = "0.3.5" guess_host_triple = "0.1.2" thiserror = "*" -async-trait = "*" +diesel-derive-enum = { version = "1", features = ["postgres"] } [dependencies.diesel] version = "1.4.5" diff --git a/lib/u_lib/src/executor.rs b/lib/u_lib/src/executor.rs index 8ba1880..ad8f5c0 100644 --- a/lib/u_lib/src/executor.rs +++ b/lib/u_lib/src/executor.rs @@ -8,73 +8,91 @@ use std::collections::HashMap; use std::pin::Pin; use std::thread::sleep; use std::time::{Duration, Instant}; -use std::slice::Iter; use std::task::Poll; use tokio::process::Command; -use futures::{lock::Mutex, prelude::*}; +use futures::{lock::Mutex, prelude::*, poll}; use lazy_static::lazy_static; use tokio::{prelude::*, spawn, task::JoinHandle}; use uuid::Uuid; pub type FutRes = UResult; -type BoxFut = Pin + Send + 'static>>; lazy_static! { static ref FUT_RESULTS: Mutex>> = Mutex::new(HashMap::new()); } +//TODO: waiter struct +pub async fn append_task(task: impl Future + Send + 'static) -> Uuid { + let fid = Uuid::new_v4(); + let result = spawn(Box::pin(task)); + FUT_RESULTS.lock().await.insert(fid, result); + fid +} + pub async fn append_tasks(tasks: Vec + Send + 'static>) -> Vec { let mut fids = Vec::::new(); for f in tasks.into_iter() { - let fid = Uuid::new_v4(); + let fid = append_task(f).await; fids.push(fid); - let result = spawn(Box::pin(f)); - FUT_RESULTS.lock().await.insert(fid, result); } fids } -pub async fn append_task(task: impl Future + Send + 'static) -> Uuid { - append_tasks(vec![Box::pin(task)]).await[0] -} - pub async fn pop_task(fid: Uuid) -> JoinHandle { FUT_RESULTS.lock().await.remove(&fid).expect(&UError::NoTask(fid).to_string()) } -async fn task_present(fid: Uuid) -> bool { +pub async fn task_present(fid: Uuid) -> bool { FUT_RESULTS.lock().await.get(&fid).is_some() } pub async fn pop_task_if_completed(fid: Uuid) -> Option{ - let task = FUT_RESULTS + let mut tasks = FUT_RESULTS .lock() - .await + .await; + let task = tasks .get_mut(&fid) .expect(&UError::NoTask(fid).to_string()); - let status = match task.poll() { + let status = match poll!(task) { Poll::Pending => None, Poll::Ready(r) => Some(r.unwrap()) }; if status.is_some() { - pop_task(fid); + pop_task(fid).await; } status } pub async fn pop_completed(fids: Option>) -> Vec> { - match fids { + let fids = match fids { Some(v) => v, None => FUT_RESULTS.lock() .await .keys() .map(|k| *k) .collect::>() - }.into_iter().map(pop_task_if_completed).collect() + }; + let mut completed: Vec> = vec![]; + for fid in fids { + completed.push(pop_task_if_completed(fid).await) + } + completed +} + +pub async fn wait_for_task(fid: Uuid) -> FutRes { + pop_task(fid).await.await.unwrap() +} + +pub async fn wait_for_tasks(fids: Vec) -> Vec { + let mut results = vec![]; + for fid in fids { + results.push(wait_for_task(fid).await); + } + results } pub async fn run_until_complete(task: impl Future + Send + 'static) -> FutRes { - let task_fid = append_task(task).await; - pop_task(task_fid).await.await.unwrap() + let fid = append_task(task).await; + wait_for_task(fid).await } \ No newline at end of file diff --git a/lib/u_lib/src/models/agent.rs b/lib/u_lib/src/models/agent.rs index ec1697e..3bfcb1d 100644 --- a/lib/u_lib/src/models/agent.rs +++ b/lib/u_lib/src/models/agent.rs @@ -3,7 +3,6 @@ use serde::{ Deserialize }; use std::time::SystemTime; -use std::collections::HashMap; use diesel::{ Queryable, Identifiable, @@ -16,7 +15,7 @@ use crate::{ utils::vec_to_string, models::schema::*, executor::*, - jobs::create_job + jobs::exec_job }; use guess_host_triple::guess_host_triple; @@ -55,29 +54,24 @@ pub struct IAgent { pub async fn gather() -> IAgent { - - async fn run_cmd_fast(cmd: String) -> String { - let job_result = exec_job(JobMeta::from_shell(cmd)).await; - let job_result = match job_result.unwrap().result.unwrap() { - Ok(output) => output.multiline(), - Err(e) => e.to_string() - }; - JobOutput::from_multiline(&job_result) + async fn run_cmd_fast>(cmd: S) -> String { + let job_result = exec_job(JobMeta::from_shell(cmd)).await.unwrap().result.unwrap(); + JobOutput::from_raw(&job_result) .map(|o| vec_to_string(&o.into_appropriate())) - .unwrap_or(job_result) + .unwrap_or(String::from_utf8_lossy(&job_result).to_string()) } #[cfg(unix)] IAgent { alias: None, id: UID.clone(), - hostname: run_cmd_fast("hostname".to_string()).await, - is_root: &run_cmd_fast("id -u".to_string()).await == "0", + hostname: run_cmd_fast("hostname").await, + is_root: &run_cmd_fast("id -u").await == "0", is_root_allowed: false, //TODO platform: guess_host_triple().unwrap_or("Error").to_string(), status: None, //TODO token: None, //TODO - username: run_cmd_fast("id -un".to_string()).await, + username: run_cmd_fast("id -un").await, } } diff --git a/lib/u_lib/src/models/jobs.rs b/lib/u_lib/src/models/jobs.rs index 3420587..00c7f17 100644 --- a/lib/u_lib/src/models/jobs.rs +++ b/lib/u_lib/src/models/jobs.rs @@ -10,10 +10,13 @@ use serde::{ use uuid::Uuid; use guess_host_triple::guess_host_triple; use tokio::process::Command; -use crate::{models::schema::*, UError, UResult, UID, run_until_complete}; - -use async_trait::async_trait; -//pub type JobMetaRef = Arc>; +use crate::{models::schema::*, UError, UResult, UID, run_until_complete, append_tasks, append_task, wait_for_tasks}; +use diesel_derive_enum::DbEnum; +use diesel::{ + Queryable, + Identifiable, + Insertable +}; #[derive(Serialize, Deserialize, Clone, Debug)] @@ -31,16 +34,16 @@ pub enum JobSchedule { //TODO: Scheduled } -#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] -pub enum JobState { +#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, DbEnum)] +pub enum Jobstate { Queued, // server created a job, but client didn't get it yet - Pending, // client got a job, but not running yet + //Pending, // client got a job, but not running yet Running, // client is currently running a job -// Rerunning, // if job is cycled Finished, } -#[derive(Serialize, Deserialize, Clone, Debug)] +#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, DbEnum)] +#[PgType = "JobType"] pub enum JobType { Manage, Shell, @@ -49,7 +52,7 @@ pub enum JobType { Dummy } -#[derive(Serialize, Deserialize, Clone, Debug)] +#[derive(Clone, Debug)] pub struct JobOutput { pub stdout: Vec, pub stderr: Vec, @@ -85,23 +88,24 @@ impl JobOutput { self } - pub fn multiline(&self) -> String { - let mut result = String::new(); + pub fn multiline(&self) -> Vec { + let mut result: Vec = vec![]; if self.stdout.len() > 0 { - result += &format!("{stdout_head}{stdout}\n", - stdout_head = JobOutput::create_delim(JobOutput::STDOUT), - stdout = String::from_utf8_lossy(&self.stdout)) + result.extend(JobOutput::create_delim(JobOutput::STDOUT).into_bytes()); + result.extend(&self.stdout); + result.push(b'\n'); } if self.stderr.len() > 0 { - result += &format!("{stderr_head}{stderr}\n", - stderr_head = JobOutput::create_delim(JobOutput::STDERR), - stderr = String::from_utf8_lossy(&self.stderr)) + result.extend(JobOutput::create_delim(JobOutput::STDERR).into_bytes()); + result.extend(&self.stderr); + result.push(b'\n'); } result } - pub fn from_multiline(raw: &String) -> Option { + pub fn from_raw(raw: &[u8]) -> Option { + let raw = String::from_utf8_lossy(raw); let err_header = JobOutput::create_delim(JobOutput::STDERR); raw.strip_prefix(&JobOutput::create_delim(JobOutput::STDOUT)) .map(|s: &str| { @@ -121,29 +125,31 @@ impl JobOutput { } else if self.stderr.len() > 0 { self.stderr } else { - UError::Raw("No data").to_string().as_bytes().to_vec() + UError::Raw("No data").to_string().into_bytes() } } } -#[derive(Serialize, Deserialize, Clone, Debug)] +#[derive(Serialize, Deserialize, Clone, Debug, Queryable, Identifiable)] +#[table_name = "jobs"] pub struct JobMeta { - pub id: Uuid, pub alias: String, + pub id: Uuid, pub exec_type: JobType, - pub schedule: JobSchedule, + //pub schedule: JobSchedule, pub platform: String, pub payload: Option>>, } impl JobMeta { - pub fn from_shell(shell_cmd: String) -> Self { + pub fn from_shell>(shell_cmd: S) -> Self { + let shell_cmd = shell_cmd.into(); let job_name = shell_cmd.split(" ").nth(0).unwrap(); Self { id: Uuid::new_v4(), alias: job_name.to_string(), exec_type: JobType::Shell, - schedule: JobSchedule::Once, + //schedule: JobSchedule::Once, platform: guess_host_triple().unwrap_or("unknown").to_string(), payload: Some(Box::new(shell_cmd.into_bytes())) } @@ -151,27 +157,23 @@ impl JobMeta { } -#[derive(Serialize, Deserialize, Clone, Debug)] +#[derive(Serialize, Deserialize, Clone, Debug, Queryable, Insertable)] +#[table_name = "results"] pub struct JobResult { pub agent_id: Uuid, pub job_id: Uuid, - pub result: Option>, - pub state: JobState, + pub result: Option>, +// pub state: Jobstate, pub retcode: Option, - pub timestamp: SystemTime, + pub ts: SystemTime, } impl JobResult { - pub fn from_meta(meta: &JobMeta) -> Self { - let job_id = meta.id.clone(); - JobResult { - agent_id: *UID, - job_id, - state: JobState::Running, - result: None, - retcode: None, - timestamp: SystemTime::now() - } + pub fn from_meta(job_id: Uuid) -> Self { + let mut inst = JobResult::default(); + inst.agent_id = *UID; + inst.job_id = job_id; + inst } } @@ -181,9 +183,9 @@ impl Default for JobResult { agent_id: Uuid::nil(), job_id: Uuid::nil(), result: None, - state: JobState::Running, +// state: Jobstate::Running, retcode: None, - timestamp: SystemTime::now() + ts: SystemTime::now() } } } @@ -205,7 +207,7 @@ impl Job { Ok(Self { exec_type: job_meta.exec_type, payload: job_meta.payload, - result: JobResult::from_meta(&job_meta) + result: JobResult::from_meta(job_meta.id.clone()) }) }, _ => todo!() @@ -213,7 +215,7 @@ impl Job { } async fn run(mut self) -> UResult { - match job_meta.exec_type { + match self.exec_type { JobType::Shell => { let str_payload = match &self.payload { Some(box_payload) => { @@ -235,23 +237,24 @@ impl Job { let (data, retcode) = match cmd_result { Ok(output) => { ( - Some(Ok(JobOutput::new() + Some(JobOutput::new() .stdout(output.stdout.to_vec()) - .stderr(output.stderr.to_vec())) + .stderr(output.stderr.to_vec()) + .multiline() ), output.status.code() ) } Err(e) => { ( - Some(Err(UError::JobError(e.to_string()))), + Some(UError::JobError(e.to_string()).to_string().into_bytes()), None ) } }; self.result.result = data; self.result.retcode = retcode; - self.result.timestamp = SystemTime::now(); + self.result.ts = SystemTime::now(); }, _ => todo!() } @@ -259,47 +262,92 @@ impl Job { } } +pub async fn exec_jobs(jobs: Vec) -> Vec> { + let fids = exec_jobs_nowait(jobs).await.unwrap(); + wait_for_tasks(fids).await +} + +pub async fn exec_jobs_nowait(jobs: Vec) -> UResult> { + let prepared_jobs = jobs.into_iter().map(|job| { + let j = Job::build(job).unwrap(); + j.run() + }).collect(); + let fids = append_tasks(prepared_jobs).await; + Ok(fids) +} + pub async fn exec_job(job_meta: JobMeta) -> UResult { - let job = create_job(job_meta); + let job = Job::build(job_meta)?; run_until_complete(job.run()).await } +pub async fn exec_job_nowait(job_meta: JobMeta) -> UResult { + let job = Job::build(job_meta)?; + let fid = append_task(job.run()).await; + Ok(fid) +} + #[cfg(test)] mod tests { use super::*; - use crate::{ - send_jobs_to_executor, - utils::vec_to_string - }; + use crate::{exec_job, utils::vec_to_string, wait_for_task, append_task}; #[tokio::test] async fn test_is_really_async() { - let secs_to_sleep = 1; - let job = JobMeta::from_shell_arc(format!("sleep {}", secs_to_sleep)); + const SLEEP_SECS: u64 = 1; + let job = JobMeta::from_shell(format!("sleep {}", SLEEP_SECS)); let sleep_jobs = vec![job.clone(), job.clone(), job.clone()]; let now = SystemTime::now(); - send_jobs_to_executor(sleep_jobs).await; - assert_eq!(now.elapsed().unwrap().as_secs(), secs_to_sleep) + let fids = exec_jobs_nowait(sleep_jobs).await.unwrap(); + for f in fids.into_iter() { + wait_for_task(f).await; + } + assert_eq!(now.elapsed().unwrap().as_secs(), SLEEP_SECS) } #[tokio::test] async fn test_shell_job() -> UResult<()> { - let job = JobMeta::from_shell_arc("whoami".into()); - let job_result = exec_job(job.clone()).await.unwrap(); + let job = JobMeta::from_shell("whoami"); + let job_result = exec_job(job).await.unwrap(); + let stdout = JobOutput::from_raw(&job_result.result.unwrap()).unwrap().stdout; assert_eq!( - vec_to_string(&job_result.data.unwrap()?.stdout).trim(), + vec_to_string(&stdout).trim(), "plazmoid" ); Ok(()) } + #[tokio::test] + async fn test_complex_shell_jobs_load() -> UResult<()> { + const SLEEP_SECS: u64 = 1; + let now = SystemTime::now(); + let longest_job = JobMeta::from_shell(format!("sleep {}", SLEEP_SECS)); + let longest_job_id = exec_job_nowait(longest_job).await.unwrap(); + let ls = exec_job(JobMeta::from_shell("ls")).await.unwrap(); + assert_eq!(ls.retcode.unwrap(), 0); + let result = JobOutput::from_raw(&ls.result.unwrap()).unwrap(); + let folders = String::from_utf8_lossy( + &result.stdout + ); + let ls_subfolders = exec_jobs( + folders.lines().map(|f| JobMeta::from_shell(format!("ls {}", f))).collect() + ).await; + for result in ls_subfolders { + assert_eq!(result.unwrap().retcode.unwrap(), 0); + } + wait_for_task(longest_job_id).await; + assert_eq!(now.elapsed().unwrap().as_secs(), SLEEP_SECS); + Ok(()) + } + #[tokio::test] async fn test_failing_shell_job() -> UResult<()> { - let job = JobMeta::from_shell_arc("lol_kek_puk".into()); - let job_result = exec_job(job.clone()).await.unwrap(); - assert!(job_result.data.unwrap().is_err()); - assert_eq!(job_result.retcode, None); + let job = JobMeta::from_shell("lol_kek_puk"); + let job_result = exec_job(job).await.unwrap(); + let output = JobOutput::from_raw(&job_result.result.unwrap()); + assert!(output.is_none()); + assert!(job_result.retcode.is_none()); Ok(()) } @@ -315,7 +363,7 @@ mod tests { lol\n\ *** STDERR ***\n\ kek\n" - ) + ).into_bytes() ) } @@ -328,15 +376,15 @@ mod tests { String::from( "*** STDERR ***\n\ kek\n" - ) + ).into_bytes() ) } #[test] fn test_from_multiline() { let txt = "*** STDOUT ***\n\ - puk\n".to_string(); - let output = JobOutput::from_multiline(&txt).unwrap(); + puk\n".as_bytes(); + let output = JobOutput::from_raw(txt).unwrap(); assert_eq!( output.stdout, b"puk".to_vec() diff --git a/migrations/2020-10-24-111622_create_all/down.sql b/migrations/2020-10-24-111622_create_all/down.sql index faef607..4c1d485 100644 --- a/migrations/2020-10-24-111622_create_all/down.sql +++ b/migrations/2020-10-24-111622_create_all/down.sql @@ -1 +1,8 @@ -DROP DATABASE u_db; +DROP TABLE ip_addrs; +DROP TABLE results; +DROP TABLE certificates; +DROP TABLE jobs; +DROP TABLE agents; + +DROP TYPE IF EXISTS JobState; +DROP TYPE IF EXISTS JobType; \ No newline at end of file diff --git a/migrations/2020-10-24-111622_create_all/up.sql b/migrations/2020-10-24-111622_create_all/up.sql index ea352d1..d3dcdd9 100644 --- a/migrations/2020-10-24-111622_create_all/up.sql +++ b/migrations/2020-10-24-111622_create_all/up.sql @@ -1,6 +1,6 @@ CREATE EXTENSION IF NOT EXISTS "uuid-ossp"; -CREATE TYPE JOB_TYPE AS ENUM ('shell', 'manage', 'binary', 'python'); -CREATE TYPE TASK_STATUS AS ENUM ('queued', 'running', 'finished'); +CREATE TYPE JobType AS ENUM ('shell', 'manage', 'binary', 'python'); +CREATE TYPE JobState AS ENUM ('queued', 'running', 'finished'); CREATE TABLE IF NOT EXISTS agents ( alias TEXT @@ -37,9 +37,9 @@ CREATE TABLE IF NOT EXISTS jobs ( , id UUID NOT NULL DEFAULT uuid_generate_v4() -- Shell, Binary (with program download), -- Python (with program and python download if not exist), Management - , exec_type JOB_TYPE NOT NULL DEFAULT 'shell' + , exec_type JobType NOT NULL DEFAULT 'shell' , platform TEXT NOT NULL - , path TEXT NOT NULL + , payload BYTEA , PRIMARY KEY(id) ); @@ -47,11 +47,10 @@ CREATE TABLE IF NOT EXISTS results ( agent_id UUID NOT NULL , created TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP , id UUID NOT NULL DEFAULT uuid_generate_v4() - , job_id INTEGER NOT NULL - , result TEXT + , job_id UUID NOT NULL + , result BYTEA , retcode INTEGER - -- Queued, Pending, Running, Finished - , state TASK_STATUS NOT NULL DEFAULT 'queued' + -- , state JobState NOT NULL DEFAULT 'queued' , ts TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP , FOREIGN KEY(agent_id) REFERENCES agents(id) , FOREIGN KEY(job_id) REFERENCES jobs(id)