half-way to nice arch

4-update-check
plazmoid 4 years ago
parent c94912252a
commit 28c3e96966
  1. 2
      lib/u_lib/Cargo.toml
  2. 92
      lib/u_lib/src/errors.rs
  3. 63
      lib/u_lib/src/executor.rs
  4. 2
      lib/u_lib/src/messaging.rs
  5. 15
      lib/u_lib/src/models/agent.rs
  6. 205
      lib/u_lib/src/models/jobs.rs
  7. 4
      lib/u_lib/src/utils.rs

@ -16,6 +16,8 @@ tokio = { version = "0.2.22", features = ["macros", "process"] }
reqwest = { version = "0.10.7", features = ["json"] }
futures = "0.3.5"
guess_host_triple = "0.1.2"
thiserror = "*"
async-trait = "*"
[dependencies.diesel]
version = "1.4.5"

@ -1,95 +1,39 @@
use std::fmt;
use std::error::Error as StdError;
use reqwest::Error as ReqError;
use serde::{
Serialize,
Deserialize
};
use thiserror::Error;
use uuid::Uuid;
//pub type BoxError = Box<(dyn StdError + Send + Sync + 'static)>;
pub type UResult<T> = std::result::Result<T, UError>;
#[derive(Error, Debug, Serialize, Deserialize, Clone)]
pub enum UError {
#[error("Error: {0}")]
Raw(&'static str),
#[derive(Serialize, Deserialize, Clone, Debug)]
pub enum JobErrType {
AlreadyRunning,
Finished,
System
}
#[error("Connection error: {0}")]
ConnectionError(String),
#[derive(Serialize, Deserialize, Clone, Debug)]
pub enum UErrType {
ConnectionError,
#[error("Parse error")]
ParseError,
JobError(JobErrType),
Unknown,
Raw(String)
}
#[derive(Serialize, Deserialize, Clone, Debug)]
struct Inner {
err_type: UErrType,
source: String,
}
#[error("Job error: {0}")]
JobError(String),
#[derive(Serialize, Deserialize, Clone)]
pub struct UError {
inner: Box<Inner>
}
#[error("Job is uncompleted yet")]
JobUncompleted,
impl UError {
pub fn new(err_type: UErrType, source: String) -> Self {
Self {
inner: Box::new(Inner {
source,
err_type
})
}
}
#[error("Job cannot be ran on this platform. Expected: {0}, got: {1}")]
InsuitablePlatform(String, String),
pub fn new_type(err_type: UErrType) -> Self {
UError::new(err_type, String::new())
}
}
impl fmt::Debug for UError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
let mut builder = f.debug_struct("errors::UError");
builder.field("kind", &self.inner.err_type);
builder.field("source", &self.inner.source);
builder.finish()
}
}
impl fmt::Display for UError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
let e_type = match self.inner.err_type {
UErrType::Raw(ref msg) => msg.clone(),
UErrType::ConnectionError => "Connection error".to_string(),
UErrType::ParseError => "Parse error".to_string(),
UErrType::JobError(ref inner) =>
(String::from("Job error: ") + match *inner {
JobErrType::AlreadyRunning => "job is already running",
JobErrType::Finished => "once-scheduled job is already finished",
JobErrType::System => "system error"
}),
UErrType::Unknown => "Unknown error".to_string(),
};
f.write_str(&e_type)?;
write!(f, ": {}", self.inner.source)
}
#[error("Task {0} doesn't exist")]
NoTask(Uuid)
}
impl From<ReqError> for UError {
fn from(e: ReqError) -> Self {
let err_type = if e.is_request() {
UErrType::ConnectionError
} else if e.is_decode() {
UErrType::ParseError
} else {
UErrType::Unknown
};
UError::new(err_type, e.to_string())
UError::ConnectionError(e.to_string())
}
}

@ -1,24 +1,15 @@
// list of jobs: job (cmd, args) OR rust fn OR python func + cron-like timing
// job runner (thread)
// every job runs in other thread/process
/*
enum Schedule {
Persistent, // run forever, restart if stops (set max_retries)
Cron(CronSchedule),
Once
}
*/
use crate::{
models::*,
UResult,
};
use crate::{models::*, UResult, UError};
use std::collections::HashMap;
use std::pin::Pin;
use std::thread::sleep;
use std::time::{Duration, Instant};
use std::slice::Iter;
use std::task::Poll;
use tokio::process::Command;
use futures::{lock::Mutex, prelude::*};
@ -26,14 +17,14 @@ use lazy_static::lazy_static;
use tokio::{prelude::*, spawn, task::JoinHandle};
use uuid::Uuid;
pub type FutRes = String;
pub type FutRes = UResult<JobResult>;
type BoxFut<O> = Pin<Box<dyn Future<Output = O> + Send + 'static>>;
lazy_static! {
static ref FUT_RESULTS: Mutex<HashMap<Uuid, JoinHandle<FutRes>>> = Mutex::new(HashMap::new());
}
pub async fn apply_tasks(tasks: Vec<impl Future<Output=FutRes> + Send + 'static>) -> Vec<Uuid> {
pub async fn append_tasks(tasks: Vec<impl Future<Output=FutRes> + Send + 'static>) -> Vec<Uuid> {
let mut fids = Vec::<Uuid>::new();
for f in tasks.into_iter() {
let fid = Uuid::new_v4();
@ -44,10 +35,46 @@ pub async fn apply_tasks(tasks: Vec<impl Future<Output=FutRes> + Send + 'static>
fids
}
pub async fn apply_task(task: impl Future<Output=FutRes> + Send + 'static) -> Uuid {
apply_tasks(vec![Box::pin(task)]).await[0]
pub async fn append_task(task: impl Future<Output=FutRes> + Send + 'static) -> Uuid {
append_tasks(vec![Box::pin(task)]).await[0]
}
pub async fn pop_task(fid: Uuid) -> JoinHandle<FutRes> {
FUT_RESULTS.lock().await.remove(&fid).expect(&UError::NoTask(fid).to_string())
}
async fn task_present(fid: Uuid) -> bool {
FUT_RESULTS.lock().await.get(&fid).is_some()
}
pub async fn pop_task_if_completed(fid: Uuid) -> Option<FutRes>{
let task = FUT_RESULTS
.lock()
.await
.get_mut(&fid)
.expect(&UError::NoTask(fid).to_string());
let status = match task.poll() {
Poll::Pending => None,
Poll::Ready(r) => Some(r.unwrap())
};
if status.is_some() {
pop_task(fid);
}
status
}
pub async fn pop_completed(fids: Option<Vec<Uuid>>) -> Vec<Option<FutRes>> {
match fids {
Some(v) => v,
None => FUT_RESULTS.lock()
.await
.keys()
.map(|k| *k)
.collect::<Vec<Uuid>>()
}.into_iter().map(pop_task_if_completed).collect()
}
pub async fn pop(fid: Uuid) -> Option<JoinHandle<FutRes>> {
FUT_RESULTS.lock().await.remove(&fid)
pub async fn run_until_complete(task: impl Future<Output=FutRes> + Send + 'static) -> FutRes {
let task_fid = append_task(task).await;
pop_task(task_fid).await.await.unwrap()
}

@ -1,7 +1,7 @@
use serde::{
Serialize,
Deserialize,
de::DeserializeOwned,
//de::DeserializeOwned,
};
use std::{
borrow::Cow,

@ -9,13 +9,14 @@ use diesel::{
Identifiable,
Insertable
};
;
use crate::{
models::*
models::*,
UID,
exec_job,
utils::vec_to_string,
models::schema::*
models::schema::*,
executor::*,
jobs::create_job
};
use guess_host_triple::guess_host_triple;
@ -56,10 +57,8 @@ pub struct IAgent {
pub async fn gather() -> IAgent {
async fn run_cmd_fast(cmd: String) -> String {
let job = exec_job(
JobMeta::from_shell_arc(cmd)
).await;
let job_result = match job.unwrap().data.unwrap() {
let job_result = exec_job(JobMeta::from_shell(cmd)).await;
let job_result = match job_result.unwrap().result.unwrap() {
Ok(output) => output.multiline(),
Err(e) => e.to_string()
};

@ -2,7 +2,6 @@ use std::{
// process::Command,
time::SystemTime,
cmp::PartialEq,
sync::{Arc, Mutex, MutexGuard},
};
use serde::{
Serialize,
@ -11,19 +10,12 @@ use serde::{
use uuid::Uuid;
use guess_host_triple::guess_host_triple;
use tokio::process::Command;
use crate::{
models::*
UError,
UErrType,
UErrType::JobError,
JobErrType,
UResult,
utils::format_err,
UID
};
use crate::{models::schema::*, UError, UResult, UID, run_until_complete};
use async_trait::async_trait;
//pub type JobMetaRef = Arc<Mutex<JobMeta>>;
#[derive(Serialize, Deserialize, Clone, Debug)]
pub enum ManageAction {
Ping,
@ -53,40 +45,10 @@ pub enum JobType {
Manage,
Shell,
Python,
Binary
}
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct JobMeta {
pub id: Uuid,
pub alias: String,
pub exec_type: JobType,
pub schedule: JobSchedule,
pub platform: String,
pub payload: Option<Box<Vec<u8>>>,
}
impl JobMeta {
pub fn from_shell(shell_cmd: String) -> Self {
let job_name = shell_cmd.split(" ").nth(0).unwrap();
Self {
id: Uuid::new_v4(),
alias: job_name.to_string(),
exec_type: JobType::Shell,
schedule: JobSchedule::Once,
platform: guess_host_triple().unwrap_or("").to_string(),
payload: Some(Box::new(shell_cmd.into_bytes()))
}
}
pub fn from_shell_arc(shell_cmd: String) -> JobMetaRef {
Arc::new(Mutex::new(
Self::from_shell(shell_cmd)
))
}
Binary,
Dummy
}
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct JobOutput {
pub stdout: Vec<u8>,
@ -127,8 +89,8 @@ impl JobOutput {
let mut result = String::new();
if self.stdout.len() > 0 {
result += &format!("{stdout_head}{stdout}\n",
stdout_head = JobOutput::create_delim(JobOutput::STDOUT),
stdout = String::from_utf8_lossy(&self.stdout))
stdout_head = JobOutput::create_delim(JobOutput::STDOUT),
stdout = String::from_utf8_lossy(&self.stdout))
}
if self.stderr.len() > 0 {
@ -159,14 +121,38 @@ impl JobOutput {
} else if self.stderr.len() > 0 {
self.stderr
} else {
format_err("No data").as_bytes().to_vec()
UError::Raw("No data").to_string().as_bytes().to_vec()
}
}
}
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct JobMeta {
pub id: Uuid,
pub alias: String,
pub exec_type: JobType,
pub schedule: JobSchedule,
pub platform: String,
pub payload: Option<Box<Vec<u8>>>,
}
impl JobMeta {
pub fn from_shell(shell_cmd: String) -> Self {
let job_name = shell_cmd.split(" ").nth(0).unwrap();
Self {
id: Uuid::new_v4(),
alias: job_name.to_string(),
exec_type: JobType::Shell,
schedule: JobSchedule::Once,
platform: guess_host_triple().unwrap_or("unknown").to_string(),
payload: Some(Box::new(shell_cmd.into_bytes()))
}
}
}
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct JobResult {
//pub id: i32,
pub agent_id: Uuid,
pub job_id: Uuid,
pub result: Option<Result<JobOutput, UError>>,
@ -178,15 +164,10 @@ pub struct JobResult {
impl JobResult {
pub fn from_meta(meta: &JobMeta) -> Self {
let job_id = meta.id.clone();
let state = meta.state.clone();
JobResult {
agent_id: *UID,
job_id,
state: if state == JobState::Queued {
JobState::Pending
} else {
state
},
state: JobState::Running,
result: None,
retcode: None,
timestamp: SystemTime::now()
@ -194,45 +175,53 @@ impl JobResult {
}
}
impl Default for JobResult {
fn default() -> Self {
Self {
agent_id: Uuid::nil(),
job_id: Uuid::nil(),
result: None,
state: JobState::Running,
retcode: None,
timestamp: SystemTime::now()
}
}
}
pub struct Job {
exec_type: JobType,
payload: Option<Box<Vec<u8>>>,
result: JobResult
}
impl Job {
pub fn new(job_meta: JobMeta) -> Self {
Self {
result: JobResult::from_meta(&job_meta),
fn build(job_meta: JobMeta) -> UResult<Self> {
match job_meta.exec_type {
JobType::Shell => {
let curr_platform = guess_host_triple().unwrap_or("unknown").to_string();
if job_meta.platform != curr_platform {
return Err(UError::InsuitablePlatform(job_meta.platform, curr_platform))
}
Ok(Self {
exec_type: job_meta.exec_type,
payload: job_meta.payload,
result: JobResult::from_meta(&job_meta)
})
},
_ => todo!()
}
}
pub async fn run(mut self) -> UResult<JobResult> {
match self.exec_type() {
async fn run(mut self) -> UResult<JobResult> {
match job_meta.exec_type {
JobType::Shell => {
match self.state() {
JobState::Queued | JobState::Pending => {
self.update_state(Some(JobState::Running));
},
JobState::Finished => {
if self.schedule() == JobSchedule::Permanent {
self.update_state(Some(JobState::Running))
} else {
return Err(UError::new_type(
JobError(JobErrType::Finished)
))
}
},
JobState::Running => return Err(UError::new_type(
JobError(JobErrType::AlreadyRunning)
))
}
let str_payload = match &self.lock().payload {
let str_payload = match &self.payload {
Some(box_payload) => {
String::from_utf8_lossy(box_payload).into_owned()
}
None => unimplemented!()
};
let mut cmd_parts = str_payload
let mut cmd_parts = str_payload // WRONG
.split(" ")
.map(String::from)
.collect::<Vec<String>>()
@ -255,65 +244,24 @@ impl Job {
}
Err(e) => {
(
Some(Err(UError::new(
UErrType::JobError(JobErrType::System),
e.to_string()
))),
Some(Err(UError::JobError(e.to_string()))),
None
)
}
};
self.update_state(Some(JobState::Finished));
self.result.data = data;
self.result.result = data;
self.result.retcode = retcode;
self.result.timestamp = SystemTime::now();
},
_ => unimplemented!()
_ => todo!()
}
Ok(self.into_result())
}
/// None => state is copied from meta to result field
/// Some => state is applied to both meta and result fields
pub fn update_state(&mut self, state: Option<JobState>) {
match state {
Some(state) => {
self.meta.lock().unwrap().state = state.clone();
self.result.state = state;
}
None => {
self.result.state = self.state();
}
}
}
fn lock(&self) -> MutexGuard<JobMeta> {
self.meta.lock().unwrap()
}
pub fn id(&self) -> Uuid {
self.lock().id.clone()
}
pub fn state(&self) -> JobState {
self.lock().state.clone()
}
pub fn exec_type(&self) -> JobType {
self.lock().exec_type.clone()
}
pub fn schedule(&self) -> JobSchedule {
self.lock().schedule.clone()
}
pub fn finished(&self) -> bool {
self.state() == JobState::Finished
Ok(self.result)
}
}
pub fn into_result(mut self) -> JobResult { //TODO: Cow
self.result.timestamp = SystemTime::now();
self.result
}
pub async fn exec_job(job_meta: JobMeta) -> UResult<JobResult> {
let job = create_job(job_meta);
run_until_complete(job.run()).await
}
@ -322,7 +270,6 @@ mod tests {
use super::*;
use crate::{
send_jobs_to_executor,
exec_job,
utils::vec_to_string
};

@ -49,7 +49,3 @@ pub fn setsig(sig: Signal, hnd: SigHandler) {
pub fn vec_to_string(v: &Vec<u8>) -> String {
String::from_utf8_lossy(v).to_string()
}
pub fn format_err(s: &str) -> String {
format!("Error: {}", s)
}
Loading…
Cancel
Save