You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

400 lines
11 KiB

use std::{
// process::Command,
time::SystemTime,
cmp::PartialEq,
sync::{Arc, Mutex, MutexGuard},
};
use serde::{
Serialize,
Deserialize
};
use uuid::Uuid;
use tokio::process::Command;
use super::*;
use crate::{
UError,
UErrType,
UErrType::JobError,
JobErrType,
UResult,
utils::format_err
};
pub type JobMetaRef = Arc<Mutex<JobMeta>>;
#[derive(Serialize, Deserialize, Clone, Debug)]
pub enum ManageAction {
Ping,
UpdateAvailable,
JobsResultsRequest,
Terminate
}
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
pub enum JobSchedule {
Once,
Permanent,
//TODO: Scheduled
}
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
pub enum JobState {
Queued, // server created a job, but client didn't get it yet
Pending, // client got a job, but not running yet
Running, // client is currently running a job
// Rerunning, // if job is cycled
Finished,
}
#[derive(Serialize, Deserialize, Clone, Debug)]
pub enum JobType {
Manage(ManageAction),
Shell,
Python,
Binary
}
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct JobMeta {
pub id: Uuid,
pub name: String,
pub created: SystemTime,
pub updated: SystemTime,
pub state: JobState,
pub exec_type: JobType,
pub schedule: JobSchedule,
//pub append_result: bool, //true: append, false: rewrite
pub payload: Option<Box<Vec<u8>>>,
}
impl JobMeta {
pub fn from_shell(shell_cmd: String) -> Self {
let job_name = shell_cmd.split(" ").nth(0).unwrap();
Self {
id: Uuid::new_v4(),
name: job_name.to_string(),
created: SystemTime::now(),
updated: SystemTime::now(),
state: JobState::Pending,
exec_type: JobType::Shell,
schedule: JobSchedule::Once,
payload: Some(Box::new(shell_cmd.into_bytes()))
}
}
pub fn from_shell_arc(shell_cmd: String) -> JobMetaRef {
Arc::new(Mutex::new(
Self::from_shell(shell_cmd)
))
}
pub fn touch(&mut self) {
self.updated = SystemTime::now();
}
}
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct JobOutput {
pub stdout: Vec<u8>,
pub stderr: Vec<u8>,
}
impl JobOutput {
const STREAM_BORDER: &'static str = "***";
const STDOUT: &'static str = "STDOUT";
const STDERR: &'static str = "STDERR";
#[inline]
fn create_delim(header: &'static str) -> String {
format!("{border} {head} {border}\n",
border = JobOutput::STREAM_BORDER,
head = header
)
}
pub fn new() -> Self {
Self {
stdout: Vec::new(),
stderr: Vec::new(),
}
}
pub fn stdout(mut self, data: Vec<u8>) -> Self {
self.stdout = data;
self
}
pub fn stderr(mut self, data: Vec<u8>) -> Self {
self.stderr = data;
self
}
pub fn multiline(&self) -> String {
let mut result = String::new();
if self.stdout.len() > 0 {
result += &format!("{stdout_head}{stdout}\n",
stdout_head = JobOutput::create_delim(JobOutput::STDOUT),
stdout = String::from_utf8_lossy(&self.stdout))
}
if self.stderr.len() > 0 {
result += &format!("{stderr_head}{stderr}\n",
stderr_head = JobOutput::create_delim(JobOutput::STDERR),
stderr = String::from_utf8_lossy(&self.stderr))
}
result
}
pub fn from_multiline(raw: &String) -> Option<Self> {
let err_header = JobOutput::create_delim(JobOutput::STDERR);
raw.strip_prefix(&JobOutput::create_delim(JobOutput::STDOUT))
.map(|s: &str| {
let mut parts = s.split(&err_header)
.map(|d| Vec::from(d.trim().as_bytes()))
.collect::<Vec<Vec<u8>>>()
.into_iter();
JobOutput::new()
.stdout(parts.next().unwrap())
.stderr(parts.next().unwrap_or(vec![]))
})
}
pub fn into_appropriate(self) -> Vec<u8> {
if self.stdout.len() > 0 {
self.stdout
} else if self.stderr.len() > 0 {
self.stderr
} else {
format_err("No data").as_bytes().to_vec()
}
}
}
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct JobResult {
pub id: Uuid,
pub data: Option<Result<JobOutput, UError>>,
pub state: JobState,
pub retcode: Option<i32>,
pub timestamp: SystemTime,
}
pub struct Job {
result: JobResult,
meta: JobMetaRef,
}
impl Job {
pub fn new(job_meta: JobMetaRef) -> Self {
let id = job_meta.lock().unwrap().id.clone();
let state = job_meta.lock().unwrap().state.clone();
Self {
result: JobResult {
id,
state: if state == JobState::Queued {
JobState::Pending
} else {
state
},
data: None,
retcode: None,
timestamp: SystemTime::now()
},
meta: job_meta,
}
}
pub async fn run(mut self) -> UResult<JobResult> {
match self.exec_type() {
JobType::Shell => {
match self.state() {
JobState::Queued | JobState::Pending => {
self.update_state(Some(JobState::Running));
},
JobState::Finished => {
if self.schedule() == JobSchedule::Permanent {
self.update_state(Some(JobState::Running))
} else {
return Err(UError::new_type(
JobError(JobErrType::Finished)
))
}
},
JobState::Running => return Err(UError::new_type(
JobError(JobErrType::AlreadyRunning)
))
}
let str_payload = match &self.lock().payload {
Some(box_payload) => {
String::from_utf8_lossy(box_payload).into_owned()
}
None => unimplemented!()
};
let mut cmd_parts = str_payload
.split(" ")
.map(String::from)
.collect::<Vec<String>>()
.into_iter();
let cmd = cmd_parts.nth(0).unwrap();
let args = cmd_parts.collect::<Vec<_>>();
let cmd_result = Command::new(cmd)
.args(args)
.output()
.await;
let (data, retcode) = match cmd_result {
Ok(output) => {
(
Some(Ok(JobOutput::new()
.stdout(output.stdout.to_vec())
.stderr(output.stderr.to_vec()))
),
output.status.code()
)
}
Err(e) => {
(
Some(Err(UError::new(
UErrType::JobError(JobErrType::System),
e.to_string()
))),
None
)
}
};
self.update_state(Some(JobState::Finished));
self.result.data = data;
self.result.retcode = retcode;
},
_ => unimplemented!()
}
Ok(self.into_result())
}
/// None => state is copied from meta to result field
/// Some => state is applied to both meta and result fields
pub fn update_state(&mut self, state: Option<JobState>) {
match state {
Some(state) => {
self.meta.lock().unwrap().state = state.clone();
self.result.state = state;
}
None => {
self.result.state = self.state();
}
}
}
fn lock(&self) -> MutexGuard<JobMeta> {
self.meta.lock().unwrap()
}
pub fn id(&self) -> Uuid {
self.lock().id.clone()
}
pub fn state(&self) -> JobState {
self.lock().state.clone()
}
pub fn exec_type(&self) -> JobType {
self.lock().exec_type.clone()
}
pub fn schedule(&self) -> JobSchedule {
self.lock().schedule.clone()
}
pub fn finished(&self) -> bool {
self.state() == JobState::Finished
}
pub fn into_result(mut self) -> JobResult {
self.result.timestamp = SystemTime::now();
self.result
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{
send_jobs_to_executor,
exec_job,
utils::vec_to_string
};
#[tokio::test]
async fn test_is_really_async() {
let secs_to_sleep = 1;
let job = JobMeta::from_shell_arc(format!("sleep {}", secs_to_sleep));
let sleep_jobs = vec![job.clone(), job.clone(), job.clone()];
let now = SystemTime::now();
send_jobs_to_executor(sleep_jobs).await;
assert_eq!(now.elapsed().unwrap().as_secs(), secs_to_sleep)
}
#[tokio::test]
async fn test_shell_job() -> UResult<()> {
let job = JobMeta::from_shell_arc("whoami".into());
let job_result = exec_job(job.clone()).await.unwrap();
assert_eq!(
vec_to_string(&job_result.data.unwrap()?.stdout).trim(),
"plazmoid"
);
Ok(())
}
#[tokio::test]
async fn test_failing_shell_job() -> UResult<()> {
let job = JobMeta::from_shell_arc("lol_kek_puk".into());
let job_result = exec_job(job.clone()).await.unwrap();
assert!(job_result.data.unwrap().is_err());
assert_eq!(job_result.retcode, None);
Ok(())
}
#[test]
fn test_to_multiline() {
let mut output = JobOutput::new();
output.stdout = b"lol".to_vec();
output.stderr = b"kek".to_vec();
assert_eq!(
output.multiline(),
String::from(
"*** STDOUT ***\n\
lol\n\
*** STDERR ***\n\
kek\n"
)
)
}
#[test]
fn test_to_multiline_stderr_only() {
let mut output = JobOutput::new();
output.stderr = b"kek".to_vec();
assert_eq!(
output.multiline(),
String::from(
"*** STDERR ***\n\
kek\n"
)
)
}
#[test]
fn test_from_multiline() {
let txt = "*** STDOUT ***\n\
puk\n".to_string();
let output = JobOutput::from_multiline(&txt).unwrap();
assert_eq!(
output.stdout,
b"puk".to_vec()
);
assert_eq!(output.stderr.len(), 0);
}
}