From 47a0ae4b2b2c821d7d9ccbae46fc25ba6d1e76b1 Mon Sep 17 00:00:00 2001 From: plazmoid Date: Sun, 13 Sep 2020 03:05:03 +0500 Subject: [PATCH] EXEQUTEOR --- bin/u_server/src/handlers.rs | 2 +- bin/u_server/src/main.rs | 4 +- cargo_static_docker.sh => cargo_musl.sh | 0 lib/u_lib/Cargo.toml | 2 +- lib/u_lib/src/client/client.rs | 24 +-- lib/u_lib/src/client/network.rs | 15 +- lib/u_lib/src/contracts/datatypes.rs | 12 +- lib/u_lib/src/contracts/jobs.rs | 220 ++++++++++++++++-------- lib/u_lib/src/contracts/messaging.rs | 9 - lib/u_lib/src/contracts/mod.rs | 2 +- lib/u_lib/src/errors.rs | 34 +++- lib/u_lib/src/executor.rs | 136 ++++++++++++++- 12 files changed, 333 insertions(+), 127 deletions(-) rename cargo_static_docker.sh => cargo_musl.sh (100%) diff --git a/bin/u_server/src/handlers.rs b/bin/u_server/src/handlers.rs index 505b7bf..7ec2a49 100644 --- a/bin/u_server/src/handlers.rs +++ b/bin/u_server/src/handlers.rs @@ -42,7 +42,7 @@ pub async fn get_jobs( pub async fn set_jobs( uid: Option, - msg: Message<'_, CollectionWrapper>, + msg: Message<'_, CollectionWrapper>, db: Storage) -> Result { let mut clients = db.lock().await; diff --git a/bin/u_server/src/main.rs b/bin/u_server/src/main.rs index 1bd3dcb..ed801e6 100644 --- a/bin/u_server/src/main.rs +++ b/bin/u_server/src/main.rs @@ -46,14 +46,14 @@ async fn main() { let set_jobs = warp::post() .and(warp::path(Paths::set_jobs)) .and(warp::path::param::().map(Some)) - .and(get_content::>()) + .and(get_content::>()) .and(db.clone()) .and_then(handlers::set_jobs); let update_own_jobs = warp::post() .and(warp::path(Paths::set_jobs)) .and(warp::path::param::().map(Some)) - .and(get_content::>()) + .and(get_content::>()) .and(db.clone()) .and_then(handlers::set_jobs); diff --git a/cargo_static_docker.sh b/cargo_musl.sh similarity index 100% rename from cargo_static_docker.sh rename to cargo_musl.sh diff --git a/lib/u_lib/Cargo.toml b/lib/u_lib/Cargo.toml index fc16abc..beb15c3 100644 --- a/lib/u_lib/Cargo.toml +++ b/lib/u_lib/Cargo.toml @@ -14,4 +14,4 @@ libc = "^0.2" lazy_static = "1.4.0" tokio = { version = "0.2.22", features = ["macros", "process"] } reqwest = { version = "0.10.7", features = ["json"] } -bytes = "0.5.6" +futures = "0.3.5" diff --git a/lib/u_lib/src/client/client.rs b/lib/u_lib/src/client/client.rs index 6ec31bf..83d17c2 100644 --- a/lib/u_lib/src/client/client.rs +++ b/lib/u_lib/src/client/client.rs @@ -12,7 +12,7 @@ use crate::{contracts::*, UID, exec_job}; pub struct UClient { pub client_info: ClientInfo, - pub jobs: JobStorage, // TODO: to futures + pub jobs: JobMetaStorage, } impl UClient { @@ -31,12 +31,12 @@ pub struct ClientInfo { } impl ClientInfo { - pub fn gather() -> Self { + pub async fn gather() -> Self { let mut info: HashMap = HashMap::new(); for job in DEFAULT_JOBS { - let mut job_meta = JobMeta::from_shell(job.1.into()); - let job_result = exec_job(&mut job_meta); - let job_data = match job_result.data { + let job_meta = JobMeta::from_shell(job.1.into()).into_arc(); + let job_result = exec_job(job_meta.clone()).await; + let job_data = match job_result.unwrap().data.unwrap() { Ok(output) => output.multiline(), Err(e) => e.to_string() }; @@ -64,17 +64,19 @@ const DEFAULT_JOBS: &[(&str, &str)] = &[ #[cfg(test)] mod tests { use super::*; - use crate::utils::vec_to_string; + use crate::{ + utils::vec_to_string + }; + use std::time::SystemTime; - #[test] - fn test_gather() { - let cli_info = ClientInfo::gather(); + #[tokio::test] + async fn test_gather() { + let cli_info = ClientInfo::gather().await; let field = cli_info.get_field("username").unwrap(); let stdout = JobOutput::from_multiline(field).unwrap().stdout; assert_eq!( &vec_to_string(&stdout), - "plazmoid" + "root" ) } - } diff --git a/lib/u_lib/src/client/network.rs b/lib/u_lib/src/client/network.rs index f24e5d2..cd05bd1 100644 --- a/lib/u_lib/src/client/network.rs +++ b/lib/u_lib/src/client/network.rs @@ -47,7 +47,7 @@ macro_rules! build_handler { }; let mut request = builder(self, stringify!($path)); request = request - $(.json::>(¶m.as_message()))?; + $( .json::>(¶m.as_message()) )? ; let response = request.send().await?; ($crate::get_result!($result)(response)).await } @@ -103,8 +103,15 @@ impl ClientHandler { } } -build_handler!(POST init(ClientInfo) -> RawMsg); +// A - admin only +// client listing (A) build_handler!(GET ls() -> CollectionWrapper>); +// get jobs for client himself (A: id=client_id) +build_handler!(GET get_jobs() -> CollectionWrapper); +// add client to server's db +build_handler!(POST init(ClientInfo) -> RawMsg); +// ??? build_handler!(POST del() -> ()); -build_handler!(GET get_jobs() -> CollectionWrapper); -build_handler!(POST set_jobs(CollectionWrapper) -> ()); +// set jobs for client (A) +build_handler!(POST set_jobs(CollectionWrapper) -> ()); +// get_results (A): user_id, job_id diff --git a/lib/u_lib/src/contracts/datatypes.rs b/lib/u_lib/src/contracts/datatypes.rs index 065bf84..8d2b353 100644 --- a/lib/u_lib/src/contracts/datatypes.rs +++ b/lib/u_lib/src/contracts/datatypes.rs @@ -4,14 +4,18 @@ use { Mutex, MutexGuard }, - std::sync::Arc, - std::collections::HashMap, + std::{ + sync::{Arc, Mutex as StdMutex}, + collections::HashMap, + }, uuid::Uuid, serde::{Serialize, Deserialize} }; + pub type CliStorage = HashMap; -pub type JobStorage = HashMap; +pub type JobMetaStorage = HashMap; +pub type JobMetaRef = Arc>; // because can't impl From> for Cow #[derive(Serialize, Deserialize, Debug, Clone)] @@ -26,14 +30,12 @@ impl From for CollectionWrapper { impl ToMsg for CollectionWrapper {} impl<'cow, T: Clone> From> for Cow<'cow, CollectionWrapper> { - #[inline] fn from(obj: CollectionWrapper) -> Cow<'cow, CollectionWrapper> { Cow::Owned(obj) } } impl<'cow, T: Clone> From<&'cow CollectionWrapper> for Cow<'cow, CollectionWrapper> { - #[inline] fn from(obj: &'cow CollectionWrapper) -> Cow<'cow, CollectionWrapper> { Cow::Borrowed(obj) } diff --git a/lib/u_lib/src/contracts/jobs.rs b/lib/u_lib/src/contracts/jobs.rs index d05dd11..172a039 100644 --- a/lib/u_lib/src/contracts/jobs.rs +++ b/lib/u_lib/src/contracts/jobs.rs @@ -1,16 +1,24 @@ use std::{ - process::Command, +// process::Command, time::SystemTime, cmp::PartialEq, + sync::{Arc, Mutex, MutexGuard}, }; use serde::{ Serialize, Deserialize }; use uuid::Uuid; -//use tokio::process::Command; +use tokio::process::Command; use super::*; -use crate::{UError, UErrType}; +use crate::{ + UError, + UErrType, + UErrType::JobError, + BoxError, + JobErrType, + UResult, +}; #[derive(Serialize, Deserialize, Clone, Debug)] @@ -29,7 +37,7 @@ pub enum JobSchedule { //TODO: Scheduled } -#[derive(Serialize, Deserialize, Clone, Debug)] +#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] pub enum JobState { Queued, // server created a job, but client didn't get it yet Pending, // client got a job, but not running yet @@ -75,13 +83,15 @@ impl JobMeta { } } + pub fn into_arc(self) -> JobMetaRef { + Arc::new(Mutex::new(self)) + } + pub fn touch(&mut self) { self.updated = SystemTime::now(); } } -impl ToMsg for JobMeta {} - #[derive(Serialize, Deserialize, Clone, Debug)] pub struct JobOutput { @@ -109,6 +119,16 @@ impl JobOutput { } } + pub fn stdout(mut self, data: Vec) -> Self { + self.stdout = data; + self + } + + pub fn stderr(mut self, data: Vec) -> Self { + self.stderr = data; + self + } + pub fn multiline(&self) -> String { let mut result = String::new(); if self.stdout.len() > 0 { @@ -133,10 +153,9 @@ impl JobOutput { .map(|d| Vec::from(d.trim().as_bytes())) .collect::>>() .into_iter(); - let mut instance = JobOutput::new(); - instance.stdout = parts.next().unwrap(); - instance.stderr = parts.next().unwrap_or(vec![]); - instance + JobOutput::new() + .stdout(parts.next().unwrap()) + .stderr(parts.next().unwrap_or(vec![])) }) } } @@ -144,83 +163,102 @@ impl JobOutput { #[derive(Serialize, Deserialize, Clone, Debug)] pub struct JobResult { pub id: Uuid, - pub data: Result, + pub data: Option>, pub state: JobState, pub retcode: Option, + pub date: SystemTime, } -impl ToMsg for JobResult {} - -pub struct Job<'meta> { +pub struct Job { result: JobResult, - meta: &'meta mut JobMeta, + meta: JobMetaRef, } -impl<'meta> Job<'meta> { - pub fn new(job_meta: &'meta mut JobMeta) -> Self { +impl Job { + pub fn new(job_meta: JobMetaRef) -> Self { + let id = job_meta.lock().unwrap().id.clone(); + let state = job_meta.lock().unwrap().state.clone(); Self { result: JobResult { - id: job_meta.id.clone(), - state: job_meta.state.clone(), - data: Ok(JobOutput::new()), + id, + state: if state == JobState::Queued { + JobState::Pending + } else { + state + }, + data: None, retcode: None, }, meta: job_meta, } } - pub fn run(&mut self) { - match self.meta.exec_type { + pub async fn run(mut self) -> UResult { + match self.exec_type() { JobType::Shell => { - match self.meta.state { + match self.state() { JobState::Queued | JobState::Pending => { self.update_state(Some(JobState::Running)); }, JobState::Finished => { - if self.meta.schedule == JobSchedule::Permanent { + if self.schedule() == JobSchedule::Permanent { self.update_state(Some(JobState::Running)) } else { - return + return Err(UError::new_type( + JobError(JobErrType::Finished) + )) } }, - JobState::Running => return + JobState::Running => return Err(UError::new_type( + JobError(JobErrType::AlreadyRunning) + )) } - match &self.meta.payload { + let str_payload = match &self.lock().payload { Some(box_payload) => { - let payload = String::from_utf8_lossy(box_payload).into_owned(); - let mut cmd_parts = payload - .split(" ") - .map(String::from) - .collect::>() - .into_iter(); - let cmd = cmd_parts.nth(0).unwrap(); - let args = cmd_parts.collect::>(); - let result = Command::new(cmd) - .args(args) - .output(); - match result { - Ok(output) => { - let job_out: &mut JobOutput = self.result.data.as_mut().unwrap(); - job_out.stdout = output.stdout.to_vec(); - job_out.stderr = output.stderr.to_vec(); - self.result.retcode = output.status.code(); - } - Err(e) => { - self.result.data = Err( - UError::new(UErrType::JobError, e.to_string()) - ); - self.result.retcode = None; - } - } + String::from_utf8_lossy(box_payload).into_owned() } - None => return - } - self.meta.state = JobState::Finished; + None => unimplemented!() + }; + let mut cmd_parts = str_payload + .split(" ") + .map(String::from) + .collect::>() + .into_iter(); + let cmd = cmd_parts.nth(0).unwrap(); + let args = cmd_parts.collect::>(); + let cmd_result = Command::new(cmd) + .args(args) + .output() + .await; + let (data, retcode) = match cmd_result { + Ok(output) => { + ( + Some(Ok(JobOutput::new() + .stdout(output.stdout.to_vec()) + .stderr(output.stderr.to_vec())) + ), + output.status.code() + ) + } + Err(e) => { + ( + Some(Err(UError::new( + UErrType::JobError(JobErrType::System), + e.to_string() + ))), + None + ) + } + }; + self.update_state(Some(JobState::Finished)); + self.result.data = data; + self.result.retcode = retcode; }, _ => unimplemented!() } + Ok(self.into_result()) } /// None => state is copied from meta to result field @@ -228,17 +266,40 @@ impl<'meta> Job<'meta> { pub fn update_state(&mut self, state: Option) { match state { Some(state) => { - self.meta.state = state.clone(); + self.meta.lock().unwrap().state = state.clone(); self.result.state = state; } None => { - self.result.state = self.meta.state.clone(); + self.result.state = self.state(); } } } - pub fn into_result(mut self) -> JobResult { - Self::update_state(&mut self, None); + fn lock(&self) -> MutexGuard { + self.meta.lock().unwrap() + } + + pub fn id(&self) -> Uuid { + self.lock().id.clone() + } + + pub fn state(&self) -> JobState { + self.lock().state.clone() + } + + pub fn exec_type(&self) -> JobType { + self.lock().exec_type.clone() + } + + pub fn schedule(&self) -> JobSchedule { + self.lock().schedule.clone() + } + + pub fn finished(&self) -> bool { + self.state() == JobState::Finished + } + + pub fn into_result(self) -> JobResult { self.result } } @@ -248,30 +309,39 @@ impl<'meta> Job<'meta> { mod tests { use super::*; use crate::{ - execute_jobs, + send_jobs_to_executor, + exec_job, utils::vec_to_string }; - #[test] - fn test_shell_job() { - let mut job = JobMeta::from_shell("whoami".into()); - let mut jobs: Vec = vec![Job::new(&mut job)]; - execute_jobs(&mut jobs); - let job_result = jobs.pop().unwrap().into_result(); + #[tokio::test] + async fn test_is_really_async() { + let secs_to_sleep = 1; + let job = JobMeta::from_shell(format!("sleep {}", secs_to_sleep)).into_arc(); + let sleep_jobs = vec![job.clone(), job.clone(), job.clone()]; + let now = SystemTime::now(); + send_jobs_to_executor(sleep_jobs).await; + assert_eq!(now.elapsed().unwrap().as_secs(), secs_to_sleep) + } + + #[tokio::test] + async fn test_shell_job() -> UResult<()> { + let job = JobMeta::from_shell("whoami".into()).into_arc(); + let job_result = exec_job(job.clone()).await.unwrap(); assert_eq!( - &vec_to_string(&job_result.data.unwrap().stdout), - "plazmoid" + vec_to_string(&job_result.data.unwrap()?.stdout).trim(), + "root" ); + Ok(()) } - #[test] - fn test_failing_shell_job() { - let mut job = JobMeta::from_shell("lol_kek_puk".into()); - let mut jobs: Vec = vec![Job::new(&mut job)]; - execute_jobs(&mut jobs); - let job_result = jobs.pop().unwrap().into_result(); - assert!(job_result.data.is_err()); + #[tokio::test] + async fn test_failing_shell_job() -> UResult<()> { + let job = JobMeta::from_shell("lol_kek_puk".into()).into_arc(); + let job_result = exec_job(job.clone()).await.unwrap(); + assert!(job_result.data.unwrap().is_err()); assert_eq!(job_result.retcode, None); + Ok(()) } #[test] diff --git a/lib/u_lib/src/contracts/messaging.rs b/lib/u_lib/src/contracts/messaging.rs index 0fa2fc7..fa5c862 100644 --- a/lib/u_lib/src/contracts/messaging.rs +++ b/lib/u_lib/src/contracts/messaging.rs @@ -33,15 +33,6 @@ where I: Clone { } } - /* crutch - // because Vec is stored as &[] (wtf?) - pub fn new_owned(item: I) -> Self { - Self { - id: *UID, - item: Cow::Owned(item) - } - }*/ - pub fn into_item(self) -> Cow<'cow, I> { self.item } diff --git a/lib/u_lib/src/contracts/mod.rs b/lib/u_lib/src/contracts/mod.rs index 369798a..87f031f 100644 --- a/lib/u_lib/src/contracts/mod.rs +++ b/lib/u_lib/src/contracts/mod.rs @@ -34,4 +34,4 @@ macro_rules! to_message { } } -to_message!(ClientInfo, RawMsg); \ No newline at end of file +to_message!(ClientInfo, RawMsg, JobMeta, JobResult); \ No newline at end of file diff --git a/lib/u_lib/src/errors.rs b/lib/u_lib/src/errors.rs index e388f58..114cac5 100644 --- a/lib/u_lib/src/errors.rs +++ b/lib/u_lib/src/errors.rs @@ -9,11 +9,19 @@ use serde::{ pub type BoxError = Box<(dyn StdError + Send + Sync + 'static)>; pub type UResult = std::result::Result; + +#[derive(Serialize, Deserialize, Clone, Debug)] +pub enum JobErrType { + AlreadyRunning, + Finished, + System +} + #[derive(Serialize, Deserialize, Clone, Debug)] pub enum UErrType { ConnectionError, ParseError, - JobError, + JobError(JobErrType), Unknown, Raw(String) } @@ -38,12 +46,15 @@ impl UError { }) } } + + pub fn new_type(err_type: UErrType) -> Self { + UError::new(err_type, String::new()) + } } impl fmt::Debug for UError { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { let mut builder = f.debug_struct("errors::UError"); - builder.field("kind", &self.inner.err_type); builder.field("source", &self.inner.source); builder.finish() @@ -53,13 +64,18 @@ impl fmt::Debug for UError { impl fmt::Display for UError { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { let e_type = match self.inner.err_type { - UErrType::Raw(ref msg) => msg, - UErrType::ConnectionError => "Connection error", - UErrType::ParseError => "Parse error", - UErrType::JobError => "Job error", - UErrType::Unknown => "Unknown error", + UErrType::Raw(ref msg) => msg.clone(), + UErrType::ConnectionError => "Connection error".to_string(), + UErrType::ParseError => "Parse error".to_string(), + UErrType::JobError(ref inner) => + (String::from("Job error: ") + match *inner { + JobErrType::AlreadyRunning => "job is already running", + JobErrType::Finished => "once-scheduled job is already finished", + JobErrType::System => "system error" + }), + UErrType::Unknown => "Unknown error".to_string(), }; - f.write_str(e_type)?; + f.write_str(&e_type)?; write!(f, ": {}", self.inner.source) } @@ -76,4 +92,4 @@ impl From for UError { }; UError::new(err_type, e.to_string()) } -} \ No newline at end of file +} diff --git a/lib/u_lib/src/executor.rs b/lib/u_lib/src/executor.rs index 00cc3b0..29111f7 100644 --- a/lib/u_lib/src/executor.rs +++ b/lib/u_lib/src/executor.rs @@ -10,20 +10,138 @@ enum Schedule { Once } +*/ -lazy_static! { - pub static ref EXECUTOR: Vec<> +use crate::{ + contracts::*, + UResult, +}; +use std::{ + sync::{Mutex, MutexGuard, Arc}, + thread, + time::Duration, + collections::HashMap, +}; +use uuid::Uuid; +//use tokio::task::JoinHandle; +use futures::future::{join_all, JoinAll}; +use futures::Future; + + +type Executables = Vec; + +struct AsyncExecutor { + new_jobs: Executables, + promises: Executables } + +impl AsyncExecutor { + pub fn new() -> Self { + Self { + new_jobs: vec![], + promises: vec![] + } + } +/* + pub fn process_jobs(&mut self) { + new_jobs + if job.state() == JobState::Pending { + tokio::spawn(job.run); + } + } */ + pub async fn apply_job(&mut self, mut new_job: Job) -> UResult { + /*let id = new_job.id(); + let mut job_pool = self.jobs.lock().unwrap(); + job_pool.push(new_job); + id*/ + tokio::spawn(async move { + new_job.run().await + }).await.unwrap() + } + + pub async fn apply_jobs(&mut self, new_jobs: Vec) -> Vec> { + /* + let mut job_pool = self.jobs.lock().unwrap(); + job_pool.extend( + new_jobs.into_iter() + .map(|job| (job.id(), job)) + );*/ + let futures = new_jobs.into_iter().map(|mut job| { + job.run() + }).collect::>(); + join_all(futures).await + + } +/* + pub fn get_job_result(&self, id: &Uuid) -> Option { + let mut job_pool = self.jobs.lock().unwrap(); + let (state, result) = match job_pool.get(id) { + Some(job) => (job.state(), job.get_result()), + None => return None + }; + if state == JobState::Finished { + job_pool.remove(&id); + } + Some(result) + } -use crate::contracts::*; + pub fn get_all_results(&self) -> Vec { + let mut job_pool = self.jobs.lock().unwrap(); + let to_remove = job_pool.values() + .filter(|job| job.finished()) + .map(|job| job.id()) + .collect::>(); + let results = job_pool.values() + .map(|job| job.get_result()) + .collect(); + to_remove.into_iter().for_each(|id| { + job_pool.remove(&id); + }); + results + }*/ +} + +lazy_static! { + static ref EXECUTOR: Mutex = + Mutex::new(AsyncExecutor::new()); +} +/* +pub fn get_job_result(id: &Uuid, wait: bool) -> Option { + let executor: MutexGuard = EXECUTOR.lock().unwrap(); + loop { + let result = executor.get_job_result(&id); + if wait { + if result.is_some() && result.as_ref().unwrap().state == JobState::Finished { + return result + } + thread::sleep(Duration::from_secs(1)) + } else { + return result + } + } +} + +pub fn get_all_results() -> Vec { + let executor: MutexGuard = EXECUTOR.lock().unwrap(); + executor.get_all_results() +} +*/ -pub fn execute_jobs(jobs: &mut Vec) { - jobs.iter_mut().for_each(|job| job.run()) +// run jobs without awaiting (now is waiting) +pub async fn send_jobs_to_executor( + jobs: Vec +) -> Vec> { + let mut executor: MutexGuard = EXECUTOR.lock().unwrap(); + let executables = jobs.into_iter() + .map(|job_meta| Job::new(job_meta)) + .collect(); + executor.apply_jobs(executables).await } -pub fn exec_job(job_meta: &mut JobMeta) -> JobResult { - let mut job = Job::new(job_meta); - job.run(); - job.into_result() +// run job and await result +pub async fn exec_job(job_meta: JobMetaRef) -> UResult { + let mut executor: MutexGuard = EXECUTOR.lock().unwrap(); + let job = Job::new(job_meta); + executor.apply_job(job).await } \ No newline at end of file