diff --git a/lib/u_lib/Cargo.toml b/lib/u_lib/Cargo.toml index 4d65188..0777a37 100644 --- a/lib/u_lib/Cargo.toml +++ b/lib/u_lib/Cargo.toml @@ -16,6 +16,8 @@ tokio = { version = "0.2.22", features = ["macros", "process"] } reqwest = { version = "0.10.7", features = ["json"] } futures = "0.3.5" guess_host_triple = "0.1.2" +thiserror = "*" +async-trait = "*" [dependencies.diesel] version = "1.4.5" diff --git a/lib/u_lib/src/errors.rs b/lib/u_lib/src/errors.rs index f7fe1c4..310b56a 100644 --- a/lib/u_lib/src/errors.rs +++ b/lib/u_lib/src/errors.rs @@ -1,95 +1,39 @@ -use std::fmt; -use std::error::Error as StdError; use reqwest::Error as ReqError; use serde::{ Serialize, Deserialize }; +use thiserror::Error; +use uuid::Uuid; -//pub type BoxError = Box<(dyn StdError + Send + Sync + 'static)>; pub type UResult = std::result::Result; +#[derive(Error, Debug, Serialize, Deserialize, Clone)] +pub enum UError { + #[error("Error: {0}")] + Raw(&'static str), -#[derive(Serialize, Deserialize, Clone, Debug)] -pub enum JobErrType { - AlreadyRunning, - Finished, - System -} + #[error("Connection error: {0}")] + ConnectionError(String), -#[derive(Serialize, Deserialize, Clone, Debug)] -pub enum UErrType { - ConnectionError, + #[error("Parse error")] ParseError, - JobError(JobErrType), - Unknown, - Raw(String) -} - -#[derive(Serialize, Deserialize, Clone, Debug)] -struct Inner { - err_type: UErrType, - source: String, -} - -#[derive(Serialize, Deserialize, Clone)] -pub struct UError { - inner: Box -} - -impl UError { - pub fn new(err_type: UErrType, source: String) -> Self { - Self { - inner: Box::new(Inner { - source, - err_type - }) - } - } - pub fn new_type(err_type: UErrType) -> Self { - UError::new(err_type, String::new()) - } -} + #[error("Job error: {0}")] + JobError(String), -impl fmt::Debug for UError { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - let mut builder = f.debug_struct("errors::UError"); - builder.field("kind", &self.inner.err_type); - builder.field("source", &self.inner.source); - builder.finish() - } -} + #[error("Job is uncompleted yet")] + JobUncompleted, -impl fmt::Display for UError { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - let e_type = match self.inner.err_type { - UErrType::Raw(ref msg) => msg.clone(), - UErrType::ConnectionError => "Connection error".to_string(), - UErrType::ParseError => "Parse error".to_string(), - UErrType::JobError(ref inner) => - (String::from("Job error: ") + match *inner { - JobErrType::AlreadyRunning => "job is already running", - JobErrType::Finished => "once-scheduled job is already finished", - JobErrType::System => "system error" - }), - UErrType::Unknown => "Unknown error".to_string(), - }; - f.write_str(&e_type)?; + #[error("Job cannot be ran on this platform. Expected: {0}, got: {1}")] + InsuitablePlatform(String, String), - write!(f, ": {}", self.inner.source) - } + #[error("Task {0} doesn't exist")] + NoTask(Uuid) } impl From for UError { fn from(e: ReqError) -> Self { - let err_type = if e.is_request() { - UErrType::ConnectionError - } else if e.is_decode() { - UErrType::ParseError - } else { - UErrType::Unknown - }; - UError::new(err_type, e.to_string()) + UError::ConnectionError(e.to_string()) } -} +} \ No newline at end of file diff --git a/lib/u_lib/src/executor.rs b/lib/u_lib/src/executor.rs index d32772a..8ba1880 100644 --- a/lib/u_lib/src/executor.rs +++ b/lib/u_lib/src/executor.rs @@ -1,24 +1,15 @@ // list of jobs: job (cmd, args) OR rust fn OR python func + cron-like timing // job runner (thread) // every job runs in other thread/process -/* -enum Schedule { - Persistent, // run forever, restart if stops (set max_retries) - Cron(CronSchedule), - Once -} -*/ - -use crate::{ - models::*, - UResult, -}; +use crate::{models::*, UResult, UError}; use std::collections::HashMap; use std::pin::Pin; use std::thread::sleep; use std::time::{Duration, Instant}; +use std::slice::Iter; +use std::task::Poll; use tokio::process::Command; use futures::{lock::Mutex, prelude::*}; @@ -26,14 +17,14 @@ use lazy_static::lazy_static; use tokio::{prelude::*, spawn, task::JoinHandle}; use uuid::Uuid; -pub type FutRes = String; +pub type FutRes = UResult; type BoxFut = Pin + Send + 'static>>; lazy_static! { static ref FUT_RESULTS: Mutex>> = Mutex::new(HashMap::new()); } -pub async fn apply_tasks(tasks: Vec + Send + 'static>) -> Vec { +pub async fn append_tasks(tasks: Vec + Send + 'static>) -> Vec { let mut fids = Vec::::new(); for f in tasks.into_iter() { let fid = Uuid::new_v4(); @@ -44,10 +35,46 @@ pub async fn apply_tasks(tasks: Vec + Send + 'static> fids } -pub async fn apply_task(task: impl Future + Send + 'static) -> Uuid { - apply_tasks(vec![Box::pin(task)]).await[0] +pub async fn append_task(task: impl Future + Send + 'static) -> Uuid { + append_tasks(vec![Box::pin(task)]).await[0] +} + +pub async fn pop_task(fid: Uuid) -> JoinHandle { + FUT_RESULTS.lock().await.remove(&fid).expect(&UError::NoTask(fid).to_string()) +} + +async fn task_present(fid: Uuid) -> bool { + FUT_RESULTS.lock().await.get(&fid).is_some() +} + +pub async fn pop_task_if_completed(fid: Uuid) -> Option{ + let task = FUT_RESULTS + .lock() + .await + .get_mut(&fid) + .expect(&UError::NoTask(fid).to_string()); + let status = match task.poll() { + Poll::Pending => None, + Poll::Ready(r) => Some(r.unwrap()) + }; + if status.is_some() { + pop_task(fid); + } + status +} + +pub async fn pop_completed(fids: Option>) -> Vec> { + match fids { + Some(v) => v, + None => FUT_RESULTS.lock() + .await + .keys() + .map(|k| *k) + .collect::>() + }.into_iter().map(pop_task_if_completed).collect() } -pub async fn pop(fid: Uuid) -> Option> { - FUT_RESULTS.lock().await.remove(&fid) +pub async fn run_until_complete(task: impl Future + Send + 'static) -> FutRes { + let task_fid = append_task(task).await; + pop_task(task_fid).await.await.unwrap() } \ No newline at end of file diff --git a/lib/u_lib/src/messaging.rs b/lib/u_lib/src/messaging.rs index c92b84b..03f7b3a 100644 --- a/lib/u_lib/src/messaging.rs +++ b/lib/u_lib/src/messaging.rs @@ -1,7 +1,7 @@ use serde::{ Serialize, Deserialize, - de::DeserializeOwned, + //de::DeserializeOwned, }; use std::{ borrow::Cow, diff --git a/lib/u_lib/src/models/agent.rs b/lib/u_lib/src/models/agent.rs index 2bf426a..ec1697e 100644 --- a/lib/u_lib/src/models/agent.rs +++ b/lib/u_lib/src/models/agent.rs @@ -9,13 +9,14 @@ use diesel::{ Identifiable, Insertable }; -; + use crate::{ - models::* + models::*, UID, - exec_job, utils::vec_to_string, - models::schema::* + models::schema::*, + executor::*, + jobs::create_job }; use guess_host_triple::guess_host_triple; @@ -56,10 +57,8 @@ pub struct IAgent { pub async fn gather() -> IAgent { async fn run_cmd_fast(cmd: String) -> String { - let job = exec_job( - JobMeta::from_shell_arc(cmd) - ).await; - let job_result = match job.unwrap().data.unwrap() { + let job_result = exec_job(JobMeta::from_shell(cmd)).await; + let job_result = match job_result.unwrap().result.unwrap() { Ok(output) => output.multiline(), Err(e) => e.to_string() }; diff --git a/lib/u_lib/src/models/jobs.rs b/lib/u_lib/src/models/jobs.rs index d086c02..3420587 100644 --- a/lib/u_lib/src/models/jobs.rs +++ b/lib/u_lib/src/models/jobs.rs @@ -2,7 +2,6 @@ use std::{ // process::Command, time::SystemTime, cmp::PartialEq, - sync::{Arc, Mutex, MutexGuard}, }; use serde::{ Serialize, @@ -11,19 +10,12 @@ use serde::{ use uuid::Uuid; use guess_host_triple::guess_host_triple; use tokio::process::Command; -use crate::{ - models::* - UError, - UErrType, - UErrType::JobError, - JobErrType, - UResult, - utils::format_err, - UID -}; +use crate::{models::schema::*, UError, UResult, UID, run_until_complete}; +use async_trait::async_trait; //pub type JobMetaRef = Arc>; + #[derive(Serialize, Deserialize, Clone, Debug)] pub enum ManageAction { Ping, @@ -53,40 +45,10 @@ pub enum JobType { Manage, Shell, Python, - Binary -} - -#[derive(Serialize, Deserialize, Clone, Debug)] -pub struct JobMeta { - pub id: Uuid, - pub alias: String, - pub exec_type: JobType, - pub schedule: JobSchedule, - pub platform: String, - pub payload: Option>>, -} - -impl JobMeta { - pub fn from_shell(shell_cmd: String) -> Self { - let job_name = shell_cmd.split(" ").nth(0).unwrap(); - Self { - id: Uuid::new_v4(), - alias: job_name.to_string(), - exec_type: JobType::Shell, - schedule: JobSchedule::Once, - platform: guess_host_triple().unwrap_or("").to_string(), - payload: Some(Box::new(shell_cmd.into_bytes())) - } - } - - pub fn from_shell_arc(shell_cmd: String) -> JobMetaRef { - Arc::new(Mutex::new( - Self::from_shell(shell_cmd) - )) - } + Binary, + Dummy } - #[derive(Serialize, Deserialize, Clone, Debug)] pub struct JobOutput { pub stdout: Vec, @@ -127,8 +89,8 @@ impl JobOutput { let mut result = String::new(); if self.stdout.len() > 0 { result += &format!("{stdout_head}{stdout}\n", - stdout_head = JobOutput::create_delim(JobOutput::STDOUT), - stdout = String::from_utf8_lossy(&self.stdout)) + stdout_head = JobOutput::create_delim(JobOutput::STDOUT), + stdout = String::from_utf8_lossy(&self.stdout)) } if self.stderr.len() > 0 { @@ -159,14 +121,38 @@ impl JobOutput { } else if self.stderr.len() > 0 { self.stderr } else { - format_err("No data").as_bytes().to_vec() + UError::Raw("No data").to_string().as_bytes().to_vec() + } + } +} + +#[derive(Serialize, Deserialize, Clone, Debug)] +pub struct JobMeta { + pub id: Uuid, + pub alias: String, + pub exec_type: JobType, + pub schedule: JobSchedule, + pub platform: String, + pub payload: Option>>, +} + +impl JobMeta { + pub fn from_shell(shell_cmd: String) -> Self { + let job_name = shell_cmd.split(" ").nth(0).unwrap(); + Self { + id: Uuid::new_v4(), + alias: job_name.to_string(), + exec_type: JobType::Shell, + schedule: JobSchedule::Once, + platform: guess_host_triple().unwrap_or("unknown").to_string(), + payload: Some(Box::new(shell_cmd.into_bytes())) } } } + #[derive(Serialize, Deserialize, Clone, Debug)] pub struct JobResult { - //pub id: i32, pub agent_id: Uuid, pub job_id: Uuid, pub result: Option>, @@ -178,15 +164,10 @@ pub struct JobResult { impl JobResult { pub fn from_meta(meta: &JobMeta) -> Self { let job_id = meta.id.clone(); - let state = meta.state.clone(); JobResult { agent_id: *UID, job_id, - state: if state == JobState::Queued { - JobState::Pending - } else { - state - }, + state: JobState::Running, result: None, retcode: None, timestamp: SystemTime::now() @@ -194,45 +175,53 @@ impl JobResult { } } +impl Default for JobResult { + fn default() -> Self { + Self { + agent_id: Uuid::nil(), + job_id: Uuid::nil(), + result: None, + state: JobState::Running, + retcode: None, + timestamp: SystemTime::now() + } + } +} + pub struct Job { + exec_type: JobType, + payload: Option>>, result: JobResult } impl Job { - pub fn new(job_meta: JobMeta) -> Self { - Self { - result: JobResult::from_meta(&job_meta), + fn build(job_meta: JobMeta) -> UResult { + match job_meta.exec_type { + JobType::Shell => { + let curr_platform = guess_host_triple().unwrap_or("unknown").to_string(); + if job_meta.platform != curr_platform { + return Err(UError::InsuitablePlatform(job_meta.platform, curr_platform)) + } + Ok(Self { + exec_type: job_meta.exec_type, + payload: job_meta.payload, + result: JobResult::from_meta(&job_meta) + }) + }, + _ => todo!() } } - pub async fn run(mut self) -> UResult { - match self.exec_type() { + async fn run(mut self) -> UResult { + match job_meta.exec_type { JobType::Shell => { - match self.state() { - JobState::Queued | JobState::Pending => { - self.update_state(Some(JobState::Running)); - }, - JobState::Finished => { - if self.schedule() == JobSchedule::Permanent { - self.update_state(Some(JobState::Running)) - } else { - return Err(UError::new_type( - JobError(JobErrType::Finished) - )) - } - }, - JobState::Running => return Err(UError::new_type( - JobError(JobErrType::AlreadyRunning) - )) - } - - let str_payload = match &self.lock().payload { + let str_payload = match &self.payload { Some(box_payload) => { String::from_utf8_lossy(box_payload).into_owned() } None => unimplemented!() }; - let mut cmd_parts = str_payload + let mut cmd_parts = str_payload // WRONG .split(" ") .map(String::from) .collect::>() @@ -255,65 +244,24 @@ impl Job { } Err(e) => { ( - Some(Err(UError::new( - UErrType::JobError(JobErrType::System), - e.to_string() - ))), + Some(Err(UError::JobError(e.to_string()))), None ) } }; - self.update_state(Some(JobState::Finished)); - self.result.data = data; + self.result.result = data; self.result.retcode = retcode; + self.result.timestamp = SystemTime::now(); }, - _ => unimplemented!() + _ => todo!() } - Ok(self.into_result()) - } - - /// None => state is copied from meta to result field - /// Some => state is applied to both meta and result fields - pub fn update_state(&mut self, state: Option) { - match state { - Some(state) => { - self.meta.lock().unwrap().state = state.clone(); - self.result.state = state; - } - None => { - self.result.state = self.state(); - } - } - } - - fn lock(&self) -> MutexGuard { - self.meta.lock().unwrap() - } - - pub fn id(&self) -> Uuid { - self.lock().id.clone() - } - - pub fn state(&self) -> JobState { - self.lock().state.clone() - } - - pub fn exec_type(&self) -> JobType { - self.lock().exec_type.clone() - } - - pub fn schedule(&self) -> JobSchedule { - self.lock().schedule.clone() - } - - pub fn finished(&self) -> bool { - self.state() == JobState::Finished + Ok(self.result) } +} - pub fn into_result(mut self) -> JobResult { //TODO: Cow - self.result.timestamp = SystemTime::now(); - self.result - } +pub async fn exec_job(job_meta: JobMeta) -> UResult { + let job = create_job(job_meta); + run_until_complete(job.run()).await } @@ -322,7 +270,6 @@ mod tests { use super::*; use crate::{ send_jobs_to_executor, - exec_job, utils::vec_to_string }; diff --git a/lib/u_lib/src/utils.rs b/lib/u_lib/src/utils.rs index 08020e1..5dde561 100644 --- a/lib/u_lib/src/utils.rs +++ b/lib/u_lib/src/utils.rs @@ -48,8 +48,4 @@ pub fn setsig(sig: Signal, hnd: SigHandler) { pub fn vec_to_string(v: &Vec) -> String { String::from_utf8_lossy(v).to_string() -} - -pub fn format_err(s: &str) -> String { - format!("Error: {}", s) } \ No newline at end of file