finished executor

realised that tokio::spawn sucks
4-update-check
plazmoid 4 years ago
parent 369359761e
commit e921ee98a5
  1. 2
      bin/u_agent/src/main.rs
  2. 2
      lib/u_lib/Cargo.toml
  3. 145
      lib/u_lib/src/executor.rs
  4. 2
      lib/u_lib/src/messaging.rs
  5. 4
      lib/u_lib/src/models/agent.rs
  6. 142
      lib/u_lib/src/models/jobs.rs

@ -23,7 +23,7 @@ async fn main() {
//daemonize();
let arg_ip = env::args().nth(1);
let instance = ClientHandler::new(arg_ip);
let cli_info = agent::gather().await;
let cli_info = gather().await;
retry_until_ok!(instance.init(&cli_info).await);
loop {/*
let jobs = retry_until_ok!(instance.get_jobs().await);

@ -12,7 +12,7 @@ uuid = { version = "0.6.5", features = ["serde", "v4"] }
nix = "0.17"
libc = "^0.2"
lazy_static = "1.4.0"
tokio = { version = "0.2.22", features = ["macros", "process"] }
tokio = { version = "1.2.0", features = ["rt", "sync", "macros", "process", "time"] }
reqwest = { version = "0.10.7", features = ["json"] }
futures = "0.3.5"
guess_host_triple = "0.1.2"

@ -1,35 +1,52 @@
// list of jobs: job (cmd, args) OR rust fn OR python func + cron-like timing
// list of jobs: job (cmd, args) OR rust fn OR python func + cron-like timing tokoi::time::interval:
// job runner (thread)
// every job runs in other thread/process
use crate::{models::*, UResult, UError};
use crate::{models::*, UResult, UError, OneOrMany};
use std::collections::HashMap;
use std::task::Poll;
use futures::{lock::Mutex, prelude::*, poll};
use futures::{lock::{Mutex, MutexGuard}, future::BoxFuture};
use lazy_static::lazy_static;
use tokio::{prelude::*, spawn, task::JoinHandle};
use tokio::{
spawn,
task::JoinHandle,
sync::mpsc::{channel, Receiver, Sender}
};
use uuid::Uuid;
use crate::OneOrMany;
pub type FutRes = UResult<JobResult>;
pub type DynFut = BoxFuture<'static, FutRes>;
lazy_static! {
static ref FUT_RESULTS: Mutex<HashMap<Uuid, JoinHandle<FutRes>>> = Mutex::new(HashMap::new());
static ref FUT_RESULTS: Mutex<HashMap<Uuid, JoinInfo>> = Mutex::new(HashMap::new());
static ref FUT_CHANNEL: (Mutex<Sender<Uuid>>, Mutex<Receiver<Uuid>>) = {
spawn(init_receiver());
let (tx, rx) = channel(100);
(Mutex::new(tx), Mutex::new(rx))
};
}
async fn get_static_channel() -> (Sender<Uuid>, MutexGuard<'static, Receiver<Uuid>>) {
(
FUT_CHANNEL.0.lock().await.clone(),
FUT_CHANNEL.1.lock().await
)
}
struct JoinInfo {
handle: JoinHandle<FutRes>,
completed: bool,
collectable: bool // indicates if future can be popped from pool via pop_task_if_completed
}
pub struct Waiter<T>
where T: Future<Output=FutRes> + Send + 'static {
tasks: Vec<T>,
pub struct Waiter {
tasks: Vec<DynFut>,
fids: Vec<Uuid>
}
impl<T> Waiter<T>
where
T: Future<Output=FutRes> + Send + 'static
{
pub fn new<S: OneOrMany<T>>(tasks: S) -> Self {
impl Waiter {
pub fn new<S: OneOrMany<DynFut>>(tasks: S) -> Self {
Self {
tasks: tasks.into_vec(),
fids: vec![]
@ -37,11 +54,27 @@ where
}
pub async fn spawn(mut self) -> Self {
let collectable = self.tasks.len() != 1;
for f in self.tasks.drain(..) {
//eprintln!("before static channel");
let tx = get_static_channel().await.0;
//eprintln!("after static channel");
let fid = Uuid::new_v4();
let result = spawn(Box::pin(f));
FUT_RESULTS.lock().await.insert(fid, result);
self.fids.push(fid);
let task_wrapper = async move {
//eprintln!("inside wrapper (started): {}", fid);
let result = f.await;
tx.send(fid).await.unwrap();
result
};
let handle = JoinInfo {
handle: spawn(task_wrapper),
completed: false,
collectable
};
//eprintln!("before push: {}", fid);
spawn(async {}).await.ok();
FUT_RESULTS.lock().await.insert(fid, handle);
}
self
}
@ -53,7 +86,8 @@ where
let mut result = vec![];
for fid in self.fids {
if let Some(task) = pop_task(fid).await {
result.push(task.await.unwrap());
let r = task.handle.await;
result.push(r.unwrap());
}
}
result
@ -68,38 +102,44 @@ where
}
}
async fn pop_task(fid: Uuid) -> Option<JoinHandle<FutRes>> {
async fn pop_task(fid: Uuid) -> Option<JoinInfo> {
FUT_RESULTS.lock().await.remove(&fid)
}
pub async fn task_present(fid: Uuid) -> bool {
FUT_RESULTS.lock().await.get(&fid).is_some()
async fn init_receiver() {
while let Some(fid) = get_static_channel().await.1.recv().await {
eprintln!("task {} is done", fid);
if let Some(j) = FUT_RESULTS.lock().await.get_mut(&fid) {
j.completed = true;
}
}
}
pub async fn pop_task_if_completed(fid: Uuid) -> Option<FutRes>{
let mut tasks = FUT_RESULTS
pub async fn pop_task_if_completed(fid: Uuid) -> Option<FutRes> {
let &mut JoinInfo {handle: _, collectable, completed} = match FUT_RESULTS
.lock()
.await;
let task = tasks
.get_mut(&fid)
.expect(&UError::NoTask(fid).to_string());
let status = match poll!(task) {
Poll::Pending => None,
Poll::Ready(r) => Some(r.unwrap())
.await
.get_mut(&fid) {
Some(t) => t,
None => return None
};
if status.is_some() {
pop_task(fid).await;
//eprint!("{}, {}: ", &fid, *collectable);
if collectable && completed {
let task = pop_task(fid).await.unwrap();
let result = task.handle.await.unwrap();
Some(result)
} else {
None
}
status
}
pub async fn pop_completed() -> Vec<FutRes> {
let mut completed: Vec<FutRes> = vec![];
let fids = FUT_RESULTS.lock()
.await
.keys()
.map(|k| *k)
.collect::<Vec<Uuid>>();
let mut completed: Vec<FutRes> = vec![];
for fid in fids {
if let Some(r) = pop_task_if_completed(fid).await {
completed.push(r);
@ -107,20 +147,29 @@ pub async fn pop_completed() -> Vec<FutRes> {
}
completed
}
/*
pub async fn wait_for_task(fid: Uuid) -> FutRes {
pop_task(fid).await.await.unwrap()
}
pub async fn wait_for_tasks(fids: Vec<Uuid>) -> Vec<FutRes> {
let mut results = vec![];
for fid in fids {
results.push(wait_for_task(fid).await);
#[cfg(test)]
mod tests {
use super::*;
// WTF
// WTF
// WTF
#[tokio::test]
async fn test_spawn() {
use std::sync::Arc;
let val = Arc::new(Mutex::new(0));
let t = {
let v = val.clone();
spawn(async move {
*v.lock().await = 5;
})
};
assert_eq!(0, *val.lock().await);
spawn(async {}).await.ok();
assert_eq!(5, *val.lock().await);
t.await.ok();
assert_eq!(5, *val.lock().await);
}
results
}
pub async fn run_until_complete(task: impl Future<Output=FutRes> + Send + 'static) -> FutRes {
let fid = append_task(task).await;
wait_for_task(fid).await
}*/

@ -45,7 +45,7 @@ impl<'cow, I> Message<'cow, I>
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct RawMsg(pub String);
impl<T: ToMsg> ToMsg for Vec<T> {}
impl<T: ToMsg> ToMsg for Vec<T> {} //TODO: impl this for all collections
#[derive(Serialize, Deserialize, Debug, Clone)]

@ -53,9 +53,11 @@ pub struct IAgent {
pub async fn gather() -> IAgent {
async fn run_cmd_fast<S: Into<String>>(cmd: S) -> String {
let job_result = build_jobs(JobMeta::from_shell(cmd))
let jm = JobMeta::from_shell(cmd);
let job_result = build_jobs(jm)
.run_one_until_complete()
.await
.unwrap()
.result
.unwrap();
JobOutput::from_raw(&job_result)

@ -1,7 +1,7 @@
use std::{
// process::Command,
time::SystemTime,
cmp::PartialEq,
time::{SystemTime, Duration},
thread,
cmp::PartialEq
};
use serde::{
Serialize,
@ -10,7 +10,15 @@ use serde::{
use uuid::Uuid;
use guess_host_triple::guess_host_triple;
use tokio::process::Command;
use crate::{models::schema::*, UError, UResult, UID, Waiter, OneOrMany};
use crate::{
models::schema::*,
UError,
UResult,
UID,
Waiter,
OneOrMany,
DynFut,
};
use diesel_derive_enum::DbEnum;
use diesel::{
Queryable,
@ -53,12 +61,12 @@ pub enum JobType {
}
#[derive(Clone, Debug)]
pub struct JobOutput<'s> {
pub stdout: &'s [u8],
pub stderr: &'s [u8],
pub struct JobOutput {
pub stdout: Vec<u8>,
pub stderr: Vec<u8>,
}
impl<'s, 'src: 's> JobOutput<'s> {
impl JobOutput {
const STREAM_BORDER: &'static str = "***";
const STDOUT: &'static str = "STDOUT";
const STDERR: &'static str = "STDERR";
@ -73,18 +81,18 @@ impl<'s, 'src: 's> JobOutput<'s> {
pub fn new() -> Self {
Self {
stdout: &[],
stderr: &[],
stdout: vec![],
stderr: vec![],
}
}
pub fn stdout(mut self, data: &'s [u8]) -> Self {
self.stdout = data;
pub fn stdout(mut self, data: &[u8]) -> Self {
self.stdout = data.to_owned();
self
}
pub fn stderr(mut self, data: &'s [u8]) -> Self {
self.stderr = data;
pub fn stderr(mut self, data: &[u8]) -> Self {
self.stderr = data.to_owned();
self
}
@ -92,19 +100,19 @@ impl<'s, 'src: 's> JobOutput<'s> {
let mut result: Vec<u8> = vec![];
if self.stdout.len() > 0 {
result.extend(JobOutput::create_delim(JobOutput::STDOUT).into_bytes());
result.extend(self.stdout);
result.extend(&self.stdout);
result.push(b'\n');
}
if self.stderr.len() > 0 {
result.extend(JobOutput::create_delim(JobOutput::STDERR).into_bytes());
result.extend(self.stderr);
result.extend(&self.stderr);
result.push(b'\n');
}
result
}
pub fn from_raw(raw: &'src [u8]) -> Option<Self> {
pub fn from_raw(raw: &[u8]) -> Option<Self> {
let raw = String::from_utf8_lossy(raw);
let err_header = JobOutput::create_delim(JobOutput::STDERR);
raw.strip_prefix(&JobOutput::create_delim(JobOutput::STDOUT))
@ -119,13 +127,13 @@ impl<'s, 'src: 's> JobOutput<'s> {
})
}
pub fn into_appropriate(self) -> &'s [u8] {
pub fn into_appropriate(self) -> Vec<u8> {
if self.stdout.len() > 0 {
self.stdout
} else if self.stderr.len() > 0 {
self.stderr
} else {
b"No data"
b"No data".to_vec()
}
}
}
@ -264,54 +272,28 @@ impl Job {
}
}
pub fn build_jobs<J: OneOrMany<JobMeta>>(job_metas: J) -> Waiter<_> {
let prepared_jobs = job_metas.into_vec().into_iter().map(|job| {
pub fn build_jobs<J: OneOrMany<JobMeta>>(job_metas: J) -> Waiter {
let prepared_jobs = job_metas.into_vec().into_iter().map(|job| -> DynFut {
let j = Job::build(job).unwrap();
j.run()
}).collect();
Box::pin(j.run())
}).collect::<Vec<DynFut>>();
Waiter::new(prepared_jobs)
}
/*
pub async fn exec_jobs(jobs: Vec<JobMeta>) -> Vec<UResult<JobResult>> {
let fids = exec_jobs_nowait(jobs).await.unwrap();
wait_for_tasks(fids).await
}
pub async fn exec_jobs_nowait(jobs: Vec<JobMeta>) -> UResult<Vec<Uuid>> {
let prepared_jobs = jobs.into_iter().map(|job| {
let j = Job::build(job).unwrap();
j.run()
}).collect();
let fids = append_tasks(prepared_jobs).await;
Ok(fids)
}
pub async fn exec_job(job_meta: JobMeta) -> UResult<JobResult> {
let job = Job::build(job_meta)?;
run_until_complete(job.run()).await
}
pub async fn exec_job_nowait(job_meta: JobMeta) -> UResult<Uuid> {
let job = Job::build(job_meta)?;
let fid = append_task(job.run()).await;
Ok(fid)
}
*/
#[cfg(test)]
mod tests {
use super::*;
use crate::{build_jobs, utils::vec_to_string};
use crate::{build_jobs, utils::vec_to_string, pop_completed, spawn_dummy};
#[tokio::test]
async fn test_is_really_async() {
const SLEEP_SECS: u64 = 1;
let job = JobMeta::from_shell(format!("sleep {}", SLEEP_SECS));
let sleep_jobs = vec![job.clone(), job.clone(), job.clone()];
let sleep_jobs: Vec<JobMeta> = (0..50).map(|_| job.clone()).collect();
let now = SystemTime::now();
let fids = build_jobs(sleep_jobs).run_until_complete().await;
assert_eq!(now.elapsed().unwrap().as_secs(), SLEEP_SECS)
build_jobs(sleep_jobs).run_until_complete().await;
assert!(now.elapsed().unwrap().as_secs() < SLEEP_SECS+2)
}
#[tokio::test]
@ -320,30 +302,37 @@ mod tests {
let job_result = build_jobs(job)
.run_one_until_complete()
.await;
let stdout = JobOutput::from_raw(&job_result.result.unwrap()).unwrap().stdout;
let stdout = JobOutput::from_raw(
&job_result.unwrap().result.unwrap()
).unwrap().stdout;
assert_eq!(
vec_to_string(stdout).trim(),
vec_to_string(&stdout).trim(),
"plazmoid"
);
Ok(())
}
#[tokio::test]
async fn test_complex_shell_jobs_load() -> UResult<()> {
async fn test_complex_load() -> UResult<()> {
const SLEEP_SECS: u64 = 1;
let now = SystemTime::now();
let longest_job = JobMeta::from_shell(format!("sleep {}", SLEEP_SECS));
let longest_job = build_jobs(longest_job).spawn().await;
let ls = build_jobs(JobMeta::from_shell("ls"))
.run_one_until_complete()
.await;
.await
.unwrap();
assert_eq!(ls.retcode.unwrap(), 0);
let result = JobOutput::from_raw(&ls.result.unwrap()).unwrap();
let folders = String::from_utf8_lossy(
&result.stdout
);
let subfolders_jobs: Vec<JobMeta> = folders
.lines()
.map(|f| JobMeta::from_shell(format!("ls {}", f)))
.collect();
let ls_subfolders = build_jobs(
folders.lines().map(|f| JobMeta::from_shell(format!("ls {}", f))).collect()
subfolders_jobs
).run_until_complete().await;
for result in ls_subfolders {
assert_eq!(result.unwrap().retcode.unwrap(), 0);
@ -353,10 +342,27 @@ mod tests {
Ok(())
}
#[tokio::test]
async fn test_exec_multiple_jobs_nowait() -> UResult<()> {
const REPEATS: usize = 10;
let job = JobMeta::from_shell("whoami");
let sleep_jobs: Vec<JobMeta> = (0..=REPEATS).map(|_| job.clone()).collect();
build_jobs(sleep_jobs).spawn().await;
let mut completed = 0;
while completed < REPEATS {
let c = pop_completed().await.len();
if c > 0 {
completed += c;
println!("{}", c);
}
}
Ok(())
}
#[tokio::test]
async fn test_failing_shell_job() -> UResult<()> {
let job = JobMeta::from_shell("lol_kek_puk");
let job_result = build_jobs(job).run_one_until_complete().await;
let job_result = build_jobs(job).run_one_until_complete().await.unwrap();
let output = JobOutput::from_raw(&job_result.result.unwrap());
assert!(output.is_none());
assert!(job_result.retcode.is_none());
@ -366,29 +372,29 @@ mod tests {
#[test]
fn test_to_multiline() {
let mut output = JobOutput::new();
output.stdout = b"lol";
output.stderr = b"kek";
output.stdout = b"lol".to_vec();
output.stderr = b"kek".to_vec();
assert_eq!(
output.multiline(),
vec_to_string(&output.multiline()),
String::from(
"*** STDOUT ***\n\
lol\n\
*** STDERR ***\n\
kek\n"
).into_bytes()
)
)
}
#[test]
fn test_to_multiline_stderr_only() {
let mut output = JobOutput::new();
output.stderr = b"kek";
output.stderr = b"kek".to_vec();
assert_eq!(
output.multiline(),
vec_to_string(&output.multiline()),
String::from(
"*** STDERR ***\n\
kek\n"
).into_bytes()
)
)
}
@ -398,8 +404,8 @@ mod tests {
puk\n".as_bytes();
let output = JobOutput::from_raw(txt).unwrap();
assert_eq!(
output.stdout,
b"puk".to_vec()
vec_to_string(&output.stdout),
"puk".to_string()
);
assert_eq!(output.stderr.len(), 0);
}

Loading…
Cancel
Save