waiter struct

fixed joboutput
4-update-check
plazmoid 4 years ago
parent aedd8f774c
commit 369359761e
  1. 90
      lib/u_lib/src/executor.rs
  2. 8
      lib/u_lib/src/models/agent.rs
  3. 84
      lib/u_lib/src/models/jobs.rs
  4. 86
      lib/u_lib/src/models/schema.rs
  5. 17
      lib/u_lib/src/utils.rs

@ -5,16 +5,13 @@
use crate::{models::*, UResult, UError}; use crate::{models::*, UResult, UError};
use std::collections::HashMap; use std::collections::HashMap;
use std::pin::Pin;
use std::thread::sleep;
use std::time::{Duration, Instant};
use std::task::Poll; use std::task::Poll;
use tokio::process::Command;
use futures::{lock::Mutex, prelude::*, poll}; use futures::{lock::Mutex, prelude::*, poll};
use lazy_static::lazy_static; use lazy_static::lazy_static;
use tokio::{prelude::*, spawn, task::JoinHandle}; use tokio::{prelude::*, spawn, task::JoinHandle};
use uuid::Uuid; use uuid::Uuid;
use crate::OneOrMany;
pub type FutRes = UResult<JobResult>; pub type FutRes = UResult<JobResult>;
@ -22,25 +19,57 @@ lazy_static! {
static ref FUT_RESULTS: Mutex<HashMap<Uuid, JoinHandle<FutRes>>> = Mutex::new(HashMap::new()); static ref FUT_RESULTS: Mutex<HashMap<Uuid, JoinHandle<FutRes>>> = Mutex::new(HashMap::new());
} }
//TODO: waiter struct pub struct Waiter<T>
pub async fn append_task(task: impl Future<Output=FutRes> + Send + 'static) -> Uuid { where T: Future<Output=FutRes> + Send + 'static {
let fid = Uuid::new_v4(); tasks: Vec<T>,
let result = spawn(Box::pin(task)); fids: Vec<Uuid>
FUT_RESULTS.lock().await.insert(fid, result);
fid
} }
pub async fn append_tasks(tasks: Vec<impl Future<Output=FutRes> + Send + 'static>) -> Vec<Uuid> { impl<T> Waiter<T>
let mut fids = Vec::<Uuid>::new(); where
for f in tasks.into_iter() { T: Future<Output=FutRes> + Send + 'static
let fid = append_task(f).await; {
fids.push(fid); pub fn new<S: OneOrMany<T>>(tasks: S) -> Self {
Self {
tasks: tasks.into_vec(),
fids: vec![]
}
}
pub async fn spawn(mut self) -> Self {
for f in self.tasks.drain(..) {
let fid = Uuid::new_v4();
let result = spawn(Box::pin(f));
FUT_RESULTS.lock().await.insert(fid, result);
self.fids.push(fid);
}
self
}
// wait until a bunch of tasks is finished
// NOT GUARANTEED that all tasks will be returned due to
// possibility to pop them in other places
pub async fn wait(self) -> Vec<FutRes> {
let mut result = vec![];
for fid in self.fids {
if let Some(task) = pop_task(fid).await {
result.push(task.await.unwrap());
}
}
result
}
pub async fn run_until_complete(self) -> Vec<FutRes> {
self.spawn().await.wait().await
}
pub async fn run_one_until_complete(self) -> FutRes {
self.run_until_complete().await.pop().unwrap()
} }
fids
} }
pub async fn pop_task(fid: Uuid) -> JoinHandle<FutRes> { async fn pop_task(fid: Uuid) -> Option<JoinHandle<FutRes>> {
FUT_RESULTS.lock().await.remove(&fid).expect(&UError::NoTask(fid).to_string()) FUT_RESULTS.lock().await.remove(&fid)
} }
pub async fn task_present(fid: Uuid) -> bool { pub async fn task_present(fid: Uuid) -> bool {
@ -64,22 +93,21 @@ pub async fn pop_task_if_completed(fid: Uuid) -> Option<FutRes>{
status status
} }
pub async fn pop_completed(fids: Option<Vec<Uuid>>) -> Vec<Option<FutRes>> { pub async fn pop_completed() -> Vec<FutRes> {
let fids = match fids { let fids = FUT_RESULTS.lock()
Some(v) => v, .await
None => FUT_RESULTS.lock() .keys()
.await .map(|k| *k)
.keys() .collect::<Vec<Uuid>>();
.map(|k| *k) let mut completed: Vec<FutRes> = vec![];
.collect::<Vec<Uuid>>()
};
let mut completed: Vec<Option<FutRes>> = vec![];
for fid in fids { for fid in fids {
completed.push(pop_task_if_completed(fid).await) if let Some(r) = pop_task_if_completed(fid).await {
completed.push(r);
}
} }
completed completed
} }
/*
pub async fn wait_for_task(fid: Uuid) -> FutRes { pub async fn wait_for_task(fid: Uuid) -> FutRes {
pop_task(fid).await.await.unwrap() pop_task(fid).await.await.unwrap()
} }
@ -95,4 +123,4 @@ pub async fn wait_for_tasks(fids: Vec<Uuid>) -> Vec<FutRes> {
pub async fn run_until_complete(task: impl Future<Output=FutRes> + Send + 'static) -> FutRes { pub async fn run_until_complete(task: impl Future<Output=FutRes> + Send + 'static) -> FutRes {
let fid = append_task(task).await; let fid = append_task(task).await;
wait_for_task(fid).await wait_for_task(fid).await
} }*/

@ -14,8 +14,6 @@ use crate::{
UID, UID,
utils::vec_to_string, utils::vec_to_string,
models::schema::*, models::schema::*,
executor::*,
jobs::exec_job
}; };
use guess_host_triple::guess_host_triple; use guess_host_triple::guess_host_triple;
@ -55,7 +53,11 @@ pub struct IAgent {
pub async fn gather() -> IAgent { pub async fn gather() -> IAgent {
async fn run_cmd_fast<S: Into<String>>(cmd: S) -> String { async fn run_cmd_fast<S: Into<String>>(cmd: S) -> String {
let job_result = exec_job(JobMeta::from_shell(cmd)).await.unwrap().result.unwrap(); let job_result = build_jobs(JobMeta::from_shell(cmd))
.run_one_until_complete()
.await
.result
.unwrap();
JobOutput::from_raw(&job_result) JobOutput::from_raw(&job_result)
.map(|o| vec_to_string(&o.into_appropriate())) .map(|o| vec_to_string(&o.into_appropriate()))
.unwrap_or(String::from_utf8_lossy(&job_result).to_string()) .unwrap_or(String::from_utf8_lossy(&job_result).to_string())

@ -10,7 +10,7 @@ use serde::{
use uuid::Uuid; use uuid::Uuid;
use guess_host_triple::guess_host_triple; use guess_host_triple::guess_host_triple;
use tokio::process::Command; use tokio::process::Command;
use crate::{models::schema::*, UError, UResult, UID, run_until_complete, append_tasks, append_task, wait_for_tasks}; use crate::{models::schema::*, UError, UResult, UID, Waiter, OneOrMany};
use diesel_derive_enum::DbEnum; use diesel_derive_enum::DbEnum;
use diesel::{ use diesel::{
Queryable, Queryable,
@ -53,12 +53,12 @@ pub enum JobType {
} }
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct JobOutput { pub struct JobOutput<'s> {
pub stdout: Vec<u8>, pub stdout: &'s [u8],
pub stderr: Vec<u8>, pub stderr: &'s [u8],
} }
impl JobOutput { impl<'s, 'src: 's> JobOutput<'s> {
const STREAM_BORDER: &'static str = "***"; const STREAM_BORDER: &'static str = "***";
const STDOUT: &'static str = "STDOUT"; const STDOUT: &'static str = "STDOUT";
const STDERR: &'static str = "STDERR"; const STDERR: &'static str = "STDERR";
@ -73,17 +73,17 @@ impl JobOutput {
pub fn new() -> Self { pub fn new() -> Self {
Self { Self {
stdout: Vec::new(), stdout: &[],
stderr: Vec::new(), stderr: &[],
} }
} }
pub fn stdout(mut self, data: Vec<u8>) -> Self { pub fn stdout(mut self, data: &'s [u8]) -> Self {
self.stdout = data; self.stdout = data;
self self
} }
pub fn stderr(mut self, data: Vec<u8>) -> Self { pub fn stderr(mut self, data: &'s [u8]) -> Self {
self.stderr = data; self.stderr = data;
self self
} }
@ -92,40 +92,40 @@ impl JobOutput {
let mut result: Vec<u8> = vec![]; let mut result: Vec<u8> = vec![];
if self.stdout.len() > 0 { if self.stdout.len() > 0 {
result.extend(JobOutput::create_delim(JobOutput::STDOUT).into_bytes()); result.extend(JobOutput::create_delim(JobOutput::STDOUT).into_bytes());
result.extend(&self.stdout); result.extend(self.stdout);
result.push(b'\n'); result.push(b'\n');
} }
if self.stderr.len() > 0 { if self.stderr.len() > 0 {
result.extend(JobOutput::create_delim(JobOutput::STDERR).into_bytes()); result.extend(JobOutput::create_delim(JobOutput::STDERR).into_bytes());
result.extend(&self.stderr); result.extend(self.stderr);
result.push(b'\n'); result.push(b'\n');
} }
result result
} }
pub fn from_raw(raw: &[u8]) -> Option<Self> { pub fn from_raw(raw: &'src [u8]) -> Option<Self> {
let raw = String::from_utf8_lossy(raw); let raw = String::from_utf8_lossy(raw);
let err_header = JobOutput::create_delim(JobOutput::STDERR); let err_header = JobOutput::create_delim(JobOutput::STDERR);
raw.strip_prefix(&JobOutput::create_delim(JobOutput::STDOUT)) raw.strip_prefix(&JobOutput::create_delim(JobOutput::STDOUT))
.map(|s: &str| { .map(|s: &str| {
let mut parts = s.split(&err_header) let mut parts = s.split(&err_header)
.map(|d| Vec::from(d.trim().as_bytes())) .map(|d| d.trim().as_bytes())
.collect::<Vec<Vec<u8>>>() .collect::<Vec<&[u8]>>()
.into_iter(); .into_iter();
JobOutput::new() JobOutput::new()
.stdout(parts.next().unwrap()) .stdout(parts.next().unwrap())
.stderr(parts.next().unwrap_or(vec![])) .stderr(parts.next().unwrap_or(&[]))
}) })
} }
pub fn into_appropriate(self) -> Vec<u8> { pub fn into_appropriate(self) -> &'s [u8] {
if self.stdout.len() > 0 { if self.stdout.len() > 0 {
self.stdout self.stdout
} else if self.stderr.len() > 0 { } else if self.stderr.len() > 0 {
self.stderr self.stderr
} else { } else {
UError::Raw("No data").to_string().into_bytes() b"No data"
} }
} }
} }
@ -175,6 +175,8 @@ impl JobResult {
inst.job_id = job_id; inst.job_id = job_id;
inst inst
} }
//pub fn as_job_output(&self) -> JobOutput {}
} }
impl Default for JobResult { impl Default for JobResult {
@ -238,8 +240,8 @@ impl Job {
Ok(output) => { Ok(output) => {
( (
Some(JobOutput::new() Some(JobOutput::new()
.stdout(output.stdout.to_vec()) .stdout(&output.stdout)
.stderr(output.stderr.to_vec()) .stderr(&output.stderr)
.multiline() .multiline()
), ),
output.status.code() output.status.code()
@ -262,6 +264,15 @@ impl Job {
} }
} }
pub fn build_jobs<J: OneOrMany<JobMeta>>(job_metas: J) -> Waiter<_> {
let prepared_jobs = job_metas.into_vec().into_iter().map(|job| {
let j = Job::build(job).unwrap();
j.run()
}).collect();
Waiter::new(prepared_jobs)
}
/*
pub async fn exec_jobs(jobs: Vec<JobMeta>) -> Vec<UResult<JobResult>> { pub async fn exec_jobs(jobs: Vec<JobMeta>) -> Vec<UResult<JobResult>> {
let fids = exec_jobs_nowait(jobs).await.unwrap(); let fids = exec_jobs_nowait(jobs).await.unwrap();
wait_for_tasks(fids).await wait_for_tasks(fids).await
@ -286,12 +297,12 @@ pub async fn exec_job_nowait(job_meta: JobMeta) -> UResult<Uuid> {
let fid = append_task(job.run()).await; let fid = append_task(job.run()).await;
Ok(fid) Ok(fid)
} }
*/
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
use crate::{exec_job, utils::vec_to_string, wait_for_task, append_task}; use crate::{build_jobs, utils::vec_to_string};
#[tokio::test] #[tokio::test]
async fn test_is_really_async() { async fn test_is_really_async() {
@ -299,20 +310,19 @@ mod tests {
let job = JobMeta::from_shell(format!("sleep {}", SLEEP_SECS)); let job = JobMeta::from_shell(format!("sleep {}", SLEEP_SECS));
let sleep_jobs = vec![job.clone(), job.clone(), job.clone()]; let sleep_jobs = vec![job.clone(), job.clone(), job.clone()];
let now = SystemTime::now(); let now = SystemTime::now();
let fids = exec_jobs_nowait(sleep_jobs).await.unwrap(); let fids = build_jobs(sleep_jobs).run_until_complete().await;
for f in fids.into_iter() {
wait_for_task(f).await;
}
assert_eq!(now.elapsed().unwrap().as_secs(), SLEEP_SECS) assert_eq!(now.elapsed().unwrap().as_secs(), SLEEP_SECS)
} }
#[tokio::test] #[tokio::test]
async fn test_shell_job() -> UResult<()> { async fn test_shell_job() -> UResult<()> {
let job = JobMeta::from_shell("whoami"); let job = JobMeta::from_shell("whoami");
let job_result = exec_job(job).await.unwrap(); let job_result = build_jobs(job)
.run_one_until_complete()
.await;
let stdout = JobOutput::from_raw(&job_result.result.unwrap()).unwrap().stdout; let stdout = JobOutput::from_raw(&job_result.result.unwrap()).unwrap().stdout;
assert_eq!( assert_eq!(
vec_to_string(&stdout).trim(), vec_to_string(stdout).trim(),
"plazmoid" "plazmoid"
); );
Ok(()) Ok(())
@ -323,20 +333,22 @@ mod tests {
const SLEEP_SECS: u64 = 1; const SLEEP_SECS: u64 = 1;
let now = SystemTime::now(); let now = SystemTime::now();
let longest_job = JobMeta::from_shell(format!("sleep {}", SLEEP_SECS)); let longest_job = JobMeta::from_shell(format!("sleep {}", SLEEP_SECS));
let longest_job_id = exec_job_nowait(longest_job).await.unwrap(); let longest_job = build_jobs(longest_job).spawn().await;
let ls = exec_job(JobMeta::from_shell("ls")).await.unwrap(); let ls = build_jobs(JobMeta::from_shell("ls"))
.run_one_until_complete()
.await;
assert_eq!(ls.retcode.unwrap(), 0); assert_eq!(ls.retcode.unwrap(), 0);
let result = JobOutput::from_raw(&ls.result.unwrap()).unwrap(); let result = JobOutput::from_raw(&ls.result.unwrap()).unwrap();
let folders = String::from_utf8_lossy( let folders = String::from_utf8_lossy(
&result.stdout &result.stdout
); );
let ls_subfolders = exec_jobs( let ls_subfolders = build_jobs(
folders.lines().map(|f| JobMeta::from_shell(format!("ls {}", f))).collect() folders.lines().map(|f| JobMeta::from_shell(format!("ls {}", f))).collect()
).await; ).run_until_complete().await;
for result in ls_subfolders { for result in ls_subfolders {
assert_eq!(result.unwrap().retcode.unwrap(), 0); assert_eq!(result.unwrap().retcode.unwrap(), 0);
} }
wait_for_task(longest_job_id).await; longest_job.wait().await;
assert_eq!(now.elapsed().unwrap().as_secs(), SLEEP_SECS); assert_eq!(now.elapsed().unwrap().as_secs(), SLEEP_SECS);
Ok(()) Ok(())
} }
@ -344,7 +356,7 @@ mod tests {
#[tokio::test] #[tokio::test]
async fn test_failing_shell_job() -> UResult<()> { async fn test_failing_shell_job() -> UResult<()> {
let job = JobMeta::from_shell("lol_kek_puk"); let job = JobMeta::from_shell("lol_kek_puk");
let job_result = exec_job(job).await.unwrap(); let job_result = build_jobs(job).run_one_until_complete().await;
let output = JobOutput::from_raw(&job_result.result.unwrap()); let output = JobOutput::from_raw(&job_result.result.unwrap());
assert!(output.is_none()); assert!(output.is_none());
assert!(job_result.retcode.is_none()); assert!(job_result.retcode.is_none());
@ -354,8 +366,8 @@ mod tests {
#[test] #[test]
fn test_to_multiline() { fn test_to_multiline() {
let mut output = JobOutput::new(); let mut output = JobOutput::new();
output.stdout = b"lol".to_vec(); output.stdout = b"lol";
output.stderr = b"kek".to_vec(); output.stderr = b"kek";
assert_eq!( assert_eq!(
output.multiline(), output.multiline(),
String::from( String::from(
@ -370,7 +382,7 @@ mod tests {
#[test] #[test]
fn test_to_multiline_stderr_only() { fn test_to_multiline_stderr_only() {
let mut output = JobOutput::new(); let mut output = JobOutput::new();
output.stderr = b"kek".to_vec(); output.stderr = b"kek";
assert_eq!( assert_eq!(
output.multiline(), output.multiline(),
String::from( String::from(

@ -0,0 +1,86 @@
table! {
use diesel::sql_types::*;
use crate::*;
agents (id) {
alias -> Nullable<Text>,
hostname -> Text,
id -> Uuid,
is_root -> Bool,
is_root_allowed -> Bool,
last_active -> Timestamp,
platform -> Text,
regtime -> Timestamp,
status -> Nullable<Text>,
token -> Nullable<Text>,
username -> Text,
}
}
table! {
use diesel::sql_types::*;
use crate::*;
certificates (id) {
agent_id -> Uuid,
id -> Uuid,
is_revoked -> Bool,
}
}
table! {
use diesel::sql_types::*;
use crate::*;
ip_addrs (id) {
agent_id -> Uuid,
check_ts -> Timestamp,
gateway -> Nullable<Text>,
id -> Uuid,
iface -> Text,
ip_addr -> Text,
is_gray -> Bool,
netmask -> Text,
}
}
table! {
use diesel::sql_types::*;
use crate::*;
jobs (id) {
alias -> Nullable<Text>,
id -> Uuid,
exec_type -> JobType,
platform -> Text,
payload -> Nullable<Bytea>,
}
}
table! {
use diesel::sql_types::*;
use crate::*;
results (id) {
agent_id -> Uuid,
created -> Timestamp,
id -> Uuid,
job_id -> Uuid,
result -> Nullable<Bytea>,
retcode -> Nullable<Int4>,
ts -> Timestamp,
}
}
joinable!(certificates -> agents (agent_id));
joinable!(ip_addrs -> agents (agent_id));
joinable!(results -> agents (agent_id));
joinable!(results -> jobs (job_id));
allow_tables_to_appear_in_same_query!(
agents,
certificates,
ip_addrs,
jobs,
results,
);

@ -15,6 +15,21 @@ use nix::{
}; };
use std::process::exit; use std::process::exit;
pub trait OneOrMany<T> {
fn into_vec(self) -> Vec<T>;
}
impl<T> OneOrMany<T> for T {
fn into_vec(self) -> Vec<T> {
vec![self]
}
}
impl<T> OneOrMany<T> for Vec<T> {
fn into_vec(self) -> Vec<T> {
self
}
}
pub fn daemonize() { pub fn daemonize() {
if getppid().as_raw() != 1 { if getppid().as_raw() != 1 {
@ -46,6 +61,6 @@ pub fn setsig(sig: Signal, hnd: SigHandler) {
} }
} }
pub fn vec_to_string(v: &Vec<u8>) -> String { pub fn vec_to_string(v: &[u8]) -> String {
String::from_utf8_lossy(v).to_string() String::from_utf8_lossy(v).to_string()
} }
Loading…
Cancel
Save