split jobs into submodules, reworked db interface

4-update-check
plazmoid 4 years ago
parent 0cad9c107a
commit 35ca8d0a78
  1. 3
      .cargo/config.toml
  2. 1
      bin/u_server/Cargo.toml
  3. 142
      bin/u_server/src/db.rs
  4. 16
      bin/u_server/src/errors.rs
  5. 136
      bin/u_server/src/handlers.rs
  6. 92
      bin/u_server/src/main.rs
  7. 1
      lib/u_lib/Cargo.toml
  8. 31
      lib/u_lib/src/api.rs
  9. 21
      lib/u_lib/src/errors.rs
  10. 42
      lib/u_lib/src/executor.rs
  11. 18
      lib/u_lib/src/lib.rs
  12. 43
      lib/u_lib/src/models/agent.rs
  13. 570
      lib/u_lib/src/models/jobs.rs
  14. 116
      lib/u_lib/src/models/jobs/builder.rs
  15. 47
      lib/u_lib/src/models/jobs/cache.rs
  16. 68
      lib/u_lib/src/models/jobs/meta.rs
  17. 164
      lib/u_lib/src/models/jobs/misc.rs
  18. 7
      lib/u_lib/src/models/jobs/mod.rs
  19. 155
      lib/u_lib/src/models/jobs/result.rs

@ -0,0 +1,3 @@
[build]
target = "x86_64-unknown-linux-gnu" # -musl"

@ -11,6 +11,7 @@ log = "0.4.11"
thiserror = "*"
warp = "0.2.4"
uuid = { version = "0.6.5", features = ["serde", "v4"] }
once_cell = "1.7.2"
[dependencies.diesel]
features = ["postgres", "uuid"]

@ -1,39 +1,56 @@
use diesel::{
pg::PgConnection,
prelude::*,
result::Error as DslError
};
use diesel::{pg::PgConnection, prelude::*, result::Error as DslError};
use dotenv::dotenv;
use once_cell::sync::OnceCell;
use std::{
env,
sync::{Arc, Mutex}
};
use crate::{
errors::*
sync::{Arc, Mutex, MutexGuard},
};
use u_lib::{
models::*
models::{schema, Agent, ExactJob, IAgent, JobMeta, JobState},
ULocalError, ULocalResult,
};
use uuid::Uuid;
pub type Storage = Arc<Mutex<UDB>>;
pub struct UDB {
pub conn: PgConnection
pub conn: PgConnection,
}
impl UDB {
pub fn new() -> USrvResult<Storage> {
static DB: OnceCell<Arc<Mutex<UDB>>> = OnceCell::new();
pub fn lock_db() -> MutexGuard<'static, UDB> {
DB.get_or_init(|| {
dotenv().unwrap();
let db_path = env::var("DATABASE_URL").unwrap();
let conn = PgConnection::establish(&db_path).unwrap();
let instance = UDB {
conn
let instance = UDB { conn };
Arc::new(Mutex::new(instance))
})
.lock()
.unwrap()
}
impl UDB {
pub fn insert_jobs(&self, job_metas: &Vec<JobMeta>) -> ULocalResult<()> {
use schema::jobs;
diesel::insert_into(jobs::table)
.values(job_metas)
.execute(&self.conn)?;
Ok(())
}
pub fn get_jobs(&self, uid: Option<Uuid>) -> ULocalResult<Vec<JobMeta>> {
use schema::jobs;
let result = if uid.is_some() {
jobs::table
.filter(jobs::id.eq(uid.unwrap()))
.get_results::<JobMeta>(&self.conn)?
} else {
jobs::table.load::<JobMeta>(&self.conn)?
};
Ok(Arc::new(Mutex::new(instance)))
Ok(result)
}
pub fn new_agent(&self, agent: &IAgent) -> USrvResult<()> {
pub fn insert_agents(&self, agent: &IAgent) -> ULocalResult<()> {
use schema::agents;
diesel::insert_into(agents::table)
.values(agent)
@ -41,33 +58,19 @@ impl UDB {
Ok(())
}
pub fn get_agents(&self, uid: Option<Uuid>) -> USrvResult<Vec<Agent>> {
pub fn get_agents(&self, uid: Option<Uuid>) -> ULocalResult<Vec<Agent>> {
use schema::agents;
let result = if uid.is_some() {
agents::table
.filter(agents::id.eq(uid.unwrap()))
.load::<Agent>(&self.conn)?
} else {
agents::table
.load::<Agent>(&self.conn)?
agents::table.load::<Agent>(&self.conn)?
};
Ok(result)
}
pub fn get_jobs(&self, uid: Option<Uuid>) -> USrvResult<Vec<JobMeta>> {
use schema::jobs;
let result = if uid.is_some() {
jobs::table
.filter(jobs::id.eq(uid.unwrap()))
.get_results::<JobMeta>(&self.conn)?
} else {
jobs::table
.load::<JobMeta>(&self.conn)?
};
Ok(result)
}
pub fn update_job_status(&self, uid: Uuid, status: JobState) -> USrvResult<()> {
pub fn update_job_status(&self, uid: Uuid, status: JobState) -> ULocalResult<()> {
use schema::results;
diesel::update(results::table)
.filter(results::id.eq(uid))
@ -76,63 +79,62 @@ impl UDB {
Ok(())
}
pub fn get_agent_jobs(&self, uid: Option<Uuid>, personal: bool) -> USrvResult<Vec<ExactJob>> {
//TODO: filters possibly could work in a wrong way, check
pub fn get_exact_jobs(&self, uid: Option<Uuid>, personal: bool) -> ULocalResult<Vec<ExactJob>> {
use schema::results;
let mut q = results::table
.into_boxed();
let mut q = results::table.into_boxed();
if uid.is_some() {
q = q.filter(results::agent_id.eq(uid.unwrap()))
}
if personal {
q = q.filter(
results::state.eq(JobState::Queued)
.and(results::agent_id.eq(uid.unwrap()))
results::state
.eq(JobState::Queued)
.and(results::agent_id.eq(uid.unwrap())),
)
} else if uid.is_some() {
q = q.filter(results::agent_id.eq(uid.unwrap()))
q = q
.filter(results::agent_id.eq(uid.unwrap()))
.or_filter(results::job_id.eq(uid.unwrap()))
.or_filter(results::id.eq(uid.unwrap()))
}
let result = q.load::<ExactJob>(&self.conn)?;
Ok(result)
}
pub fn add_jobs(&self, job_metas: &Vec<JobMeta>) -> USrvResult<()> {
use schema::jobs;
diesel::insert_into(jobs::table)
.values(job_metas)
.execute(&self.conn)?;
Ok(())
}
pub fn set_jobs_for_agent(&self, agent_uid: &Uuid, job_uids: &Vec<Uuid>) -> USrvResult<()> {
pub fn set_jobs_for_agent(&self, agent_uid: &Uuid, job_uids: &Vec<Uuid>) -> ULocalResult<()> {
use schema::{agents::dsl::agents, jobs::dsl::jobs, results};
if let Err(DslError::NotFound) = agents.find(agent_uid).first::<Agent>(&self.conn) {
return Err(USrvError::NotFound(agent_uid.to_string()))
return Err(ULocalError::NotFound(agent_uid.to_string()));
}
let not_found_jobs = job_uids.iter().filter_map(|job_uid| {
let not_found_jobs = job_uids
.iter()
.filter_map(|job_uid| {
if let Err(DslError::NotFound) = jobs.find(job_uid).first::<JobMeta>(&self.conn) {
Some(job_uid.to_string())
} else { None }
}).collect::<Vec<String>>();
} else {
None
}
})
.collect::<Vec<String>>();
if not_found_jobs.len() > 0 {
return Err(USrvError::NotFound(not_found_jobs.join(", ")));
return Err(ULocalError::NotFound(not_found_jobs.join(", ")));
}
let job_requests = job_uids.iter().map(|job_uid| {
ExactJob {
let job_requests = job_uids
.iter()
.map(|job_uid| ExactJob {
job_id: *job_uid,
agent_id: *agent_uid,
..Default::default()
}
}).collect::<Vec<ExactJob>>();
})
.collect::<Vec<ExactJob>>();
diesel::insert_into(results::table)
.values(&job_requests)
.execute(&self.conn)?;
Ok(())
}
pub fn del_jobs(&self, uids: &Vec<Uuid>) -> USrvResult<usize> {
pub fn del_jobs(&self, uids: &Vec<Uuid>) -> ULocalResult<usize> {
use schema::jobs;
let mut affected = 0;
for &uid in uids {
@ -144,24 +146,24 @@ impl UDB {
Ok(affected)
}
pub fn del_agents(&self, uids: &Vec<Uuid>) -> USrvResult<usize> {
use schema::agents;
pub fn del_results(&self, uids: &Vec<Uuid>) -> ULocalResult<usize> {
use schema::results;
let mut affected = 0;
for &uid in uids {
let deleted = diesel::delete(agents::table)
.filter(agents::id.eq(uid))
let deleted = diesel::delete(results::table)
.filter(results::id.eq(uid))
.execute(&self.conn)?;
affected += deleted;
}
Ok(affected)
}
pub fn del_results(&self, uids: &Vec<Uuid>) -> USrvResult<usize> {
use schema::results;
pub fn del_agents(&self, uids: &Vec<Uuid>) -> ULocalResult<usize> {
use schema::agents;
let mut affected = 0;
for &uid in uids {
let deleted = diesel::delete(results::table)
.filter(results::id.eq(uid))
let deleted = diesel::delete(agents::table)
.filter(agents::id.eq(uid))
.execute(&self.conn)?;
affected += deleted;
}

@ -1,16 +0,0 @@
use thiserror::Error;
use diesel::result::Error as DslError;
pub type USrvResult<T> = Result<T, USrvError>;
#[derive(Error, Debug)]
pub enum USrvError {
#[error("{0} is not found")]
NotFound(String),
#[error("Error processing {0}")]
ProcessingError(String),
#[error(transparent)]
DBError(#[from] DslError)
}

@ -1,119 +1,78 @@
use crate::db::{lock_db, UDB};
use diesel::SaveChangesDsl;
use std::fmt::Display;
use u_lib::{
models::*
messaging::{BaseMessage, ToMsg},
models::{ExactJob, IAgent, JobMeta, JobState},
ULocalError,
};
use uuid::Uuid;
use warp::{
Rejection,
Reply,
reply::{with_status},
http::{StatusCode, Response}
http::{Response, StatusCode},
reply::with_status,
Rejection, Reply,
};
use crate::db::{
Storage,
UDB,
};
use uuid::Uuid;
use diesel::SaveChangesDsl;
use crate::errors::USrvError;
fn build_response<S: Display>(code: StatusCode, body: S) -> Result<Response<String>, Rejection> {
Ok(Response::builder()
.status(code)
.body(format!("{}", body)).unwrap())
.body(format!("{}", body))
.unwrap())
}
fn build_empty_200() -> Result<Response<String>, Rejection> {
build_response(StatusCode::OK, "")
}
pub async fn add_agent(
msg: BaseMessage<'_, IAgent>,
db: Storage) -> Result<impl Reply, Rejection>
{
match db.lock()
.unwrap()
.new_agent(&msg.into_inner()) {
pub async fn add_agent(msg: BaseMessage<'_, IAgent>) -> Result<impl Reply, Rejection> {
match lock_db().insert_agents(&msg.into_inner()) {
Ok(_) => build_empty_200(),
Err(e) => build_response(StatusCode::BAD_REQUEST, e)
Err(e) => build_response(StatusCode::BAD_REQUEST, e),
}
}
pub async fn get_agents(
uid: Option<Uuid>,
db: Storage
) -> Result<impl Reply, Rejection> {
match db.lock()
.unwrap()
.get_agents(uid) {
Ok(r) => Ok(warp::reply::json(
&r.as_message()
)),
Err(e) => Err(warp::reject())
pub async fn get_agents(uid: Option<Uuid>) -> Result<impl Reply, Rejection> {
match lock_db().get_agents(uid) {
Ok(r) => Ok(warp::reply::json(&r.as_message())),
Err(e) => Err(warp::reject()),
}
}
pub async fn get_jobs(
uid: Option<Uuid>,
db: Storage) -> Result<impl Reply, Rejection>
{
match db.lock()
.unwrap()
.get_jobs(uid) {
Ok(r) => Ok(warp::reply::json(
&r.as_message()
)),
Err(e) => Err(warp::reject())
pub async fn get_jobs(uid: Option<Uuid>) -> Result<impl Reply, Rejection> {
match lock_db().get_jobs(uid) {
Ok(r) => Ok(warp::reply::json(&r.as_message())),
Err(e) => Err(warp::reject()),
}
}
pub async fn get_agent_jobs(
uid: Option<Uuid>,
db: Storage,
personal: bool) -> Result<impl Reply, Rejection>
{
let result = db.lock()
.unwrap()
.get_agent_jobs(uid, personal);
pub async fn get_agent_jobs(uid: Option<Uuid>, personal: bool) -> Result<impl Reply, Rejection> {
let result = lock_db().get_exact_jobs(uid, personal);
match result {
Ok(r) => {
let _db = db.lock().unwrap();
let _db = lock_db();
for j in r.iter() {
_db.update_job_status(j.id, JobState::Running).ok();
}
Ok(warp::reply::json(
&r.as_message()
))
},
Err(e) => Err(warp::reject())
Ok(warp::reply::json(&r.as_message()))
}
Err(e) => Err(warp::reject()),
}
}
pub async fn upload_jobs(
msg: BaseMessage<'_, Vec<JobMeta>>,
db: Storage) -> Result<impl Reply, Rejection>
{
match db.lock()
.unwrap()
.add_jobs(&msg.into_inner()) {
pub async fn upload_jobs(msg: BaseMessage<'_, Vec<JobMeta>>) -> Result<impl Reply, Rejection> {
match lock_db().insert_jobs(&msg.into_inner()) {
Ok(_) => build_empty_200(),
Err(e) => build_response(StatusCode::BAD_REQUEST, e)
Err(e) => build_response(StatusCode::BAD_REQUEST, e),
}
}
pub async fn del(
uid: Uuid,
db: Storage) -> Result<impl Reply, Rejection>
{
let db = db.lock().unwrap();
let del_fns = &[
UDB::del_agents,
UDB::del_jobs,
UDB::del_results,
];
pub async fn del(uid: Uuid) -> Result<impl Reply, Rejection> {
let db = lock_db();
let del_fns = &[UDB::del_agents, UDB::del_jobs, UDB::del_results];
for del_fn in del_fns {
let affected = del_fn(&db, &vec![uid]).unwrap();
if affected > 0 {
return build_response(StatusCode::OK, affected)
return build_response(StatusCode::OK, affected);
}
}
build_response(StatusCode::BAD_REQUEST, 0)
@ -122,33 +81,30 @@ pub async fn del(
pub async fn set_jobs(
agent_uid: Uuid,
msg: BaseMessage<'_, Vec<Uuid>>,
db: Storage) -> Result<impl Reply, Rejection>
{
match db.lock()
.unwrap()
.set_jobs_for_agent(&agent_uid, &msg.into_inner()) {
) -> Result<impl Reply, Rejection> {
match lock_db().set_jobs_for_agent(&agent_uid, &msg.into_inner()) {
Ok(_) => build_empty_200(),
Err(e) => build_response(StatusCode::BAD_REQUEST, e)
Err(e) => build_response(StatusCode::BAD_REQUEST, e),
}
}
pub async fn report(
msg: BaseMessage<'_, Vec<ExactJob>>,
db: Storage) -> Result<impl Reply, Rejection>
{
let db = db.lock().unwrap();
pub async fn report(msg: BaseMessage<'_, Vec<ExactJob>>) -> Result<impl Reply, Rejection> {
let db = lock_db();
let id = msg.id;
let mut failed = vec![];
for res in msg.into_inner() {
if id != res.agent_id {
continue
continue;
}
if let Err(e) = res.save_changes::<ExactJob>(&db.conn).map_err(USrvError::from) {
if let Err(e) = res
.save_changes::<ExactJob>(&db.conn)
.map_err(ULocalError::from)
{
failed.push(e.to_string())
}
}
if failed.len() > 0 {
let err_msg = USrvError::ProcessingError(failed.join(", "));
let err_msg = ULocalError::ProcessingError(failed.join(", "));
return build_response(StatusCode::BAD_REQUEST, err_msg);
}
build_empty_200()

@ -1,102 +1,89 @@
mod handlers;
mod db;
mod errors;
mod handlers;
use warp::{
Filter,
Rejection,
Reply,
body
};
use warp::{body, Filter, Rejection, Reply};
extern crate log;
extern crate env_logger;
extern crate log;
use u_lib::{
MASTER_PORT,
api::Paths,
models::*
};
use db::*;
use serde::{
de::DeserializeOwned
};
use db::lock_db;
use serde::de::DeserializeOwned;
use u_lib::{api::Paths, config::MASTER_PORT, models::*};
use uuid::Uuid;
fn get_content<M>()
-> impl Filter<Extract = (BaseMessage<'static, M>,),
Error = Rejection> + Clone
fn get_content<M>() -> impl Filter<Extract = (BaseMessage<'static, M>,), Error = Rejection> + Clone
where
M: ToMsg + Sync + Send + DeserializeOwned + 'static
M: ToMsg + Sync + Send + DeserializeOwned + 'static,
{
body::content_length_limit(1024*64)
.and(body::json::<BaseMessage<M>>())
body::content_length_limit(1024 * 64).and(body::json::<BaseMessage<M>>())
}
fn init() {
env_logger::init();
lock_db();
}
#[tokio::main]
async fn main() {
env_logger::init();
let base_db = UDB::new().unwrap();
let db = warp::any().map(move || base_db.clone());
let infallible_none = |_| async {
Ok::<(Option<Uuid>,), std::convert::Infallible>((None,))
};
init();
let infallible_none = |_| async { Ok::<(Option<Uuid>,), std::convert::Infallible>((None,)) };
let new_agent = warp::post()
.and(warp::path(Paths::init))
.and(get_content::<IAgent>())
.and(db.clone())
.and_then(handlers::add_agent);
let get_agents = warp::get()
.and(warp::path(Paths::get_agents))
.and(warp::path::param::<Uuid>().map(Some).or_else(infallible_none))
.and(db.clone())
.and(
warp::path::param::<Uuid>()
.map(Some)
.or_else(infallible_none),
)
.and_then(handlers::get_agents);
let upload_jobs = warp::post()
.and(warp::path(Paths::upload_jobs))
.and(get_content::<Vec<JobMeta>>())
.and(db.clone())
.and_then(handlers::upload_jobs);
let get_jobs = warp::get()
.and(warp::path(Paths::get_jobs))
.and(warp::path::param::<Uuid>().map(Some).or_else(infallible_none))
.and(db.clone())
.and(
warp::path::param::<Uuid>()
.map(Some)
.or_else(infallible_none),
)
.and_then(handlers::get_jobs);
let get_agent_jobs = warp::get()
.and(warp::path(Paths::get_agent_jobs))
.and(warp::path::param::<Uuid>().map(Some).or_else(infallible_none))
.and(db.clone())
.and_then(|uid, db| handlers::get_agent_jobs(uid, db, false));
.and(
warp::path::param::<Uuid>()
.map(Some)
.or_else(infallible_none),
)
.and_then(|uid| handlers::get_agent_jobs(uid, false));
let get_personal_jobs = warp::get()
.and(warp::path(Paths::get_agent_jobs))
.and(warp::path::param::<Uuid>().map(Some))
.and(db.clone())
.and_then(|uid, db| handlers::get_agent_jobs(uid, db, true));
.and_then(|uid| handlers::get_agent_jobs(uid, true));
let del = warp::get()
.and(warp::path(Paths::del))
.and(warp::path::param::<Uuid>())
.and(db.clone())
.and_then(handlers::del);
let set_jobs = warp::post()
.and(warp::path(Paths::set_jobs))
.and(warp::path::param::<Uuid>())
.and(get_content::<Vec<Uuid>>())
.and(db.clone())
.and_then(handlers::set_jobs);
let report = warp::post()
.and(warp::path(Paths::report))
.and(get_content::<Vec<ExactJob>>())
.and(db.clone())
.and_then(handlers::report);
let auth_token = warp::header::exact("authorization", "Bearer 123qwe");
@ -104,26 +91,23 @@ async fn main() {
let agent_zone = new_agent
.or(get_jobs.clone())
.or(get_personal_jobs)
.or(report)
;
.or(report);
let auth_zone = auth_token
.and(
let auth_zone = auth_token.and(
get_agents
.or(get_jobs)
.or(upload_jobs)
.or(del)
.or(set_jobs)
.or(get_agent_jobs)
.or(get_agent_jobs),
);
let routes = agent_zone
.or(auth_zone);
let routes = agent_zone.or(auth_zone);
warp::serve(routes.with(warp::log("warp")))
.run(([0,0,0,0], MASTER_PORT)).await;
.run(([0, 0, 0, 0], MASTER_PORT))
.await;
}
#[cfg(test)]
mod tests {
use super::*;

@ -22,6 +22,7 @@ env_logger = "0.8.3"
diesel-derive-enum = { version = "1", features = ["postgres"] }
chrono = "0.4.19"
strum = { version = "0.20", features = ["derive"] }
once_cell = "1.7.2"
[dependencies.diesel]
version = "1.4.5"

@ -1,22 +1,12 @@
#[allow(non_upper_case_globals)]
use crate::{
MASTER_SERVER,
MASTER_PORT,
config::{MASTER_PORT, MASTER_SERVER},
models::*,
UResult,
UError,
utils::opt_to_string
};
use reqwest::{
Client,
Url,
RequestBuilder
};
use std::{
net::Ipv4Addr,
str::FromStr
utils::opt_to_string,
UError, UResult,
};
use reqwest::{Client, RequestBuilder, Url};
use std::{net::Ipv4Addr, str::FromStr};
use uuid::Uuid;
pub struct Paths;
@ -132,11 +122,10 @@ macro_rules! build_handler {
};
}
pub struct ClientHandler {
base_url: Url,
client: Client,
password: Option<String>
password: Option<String>,
}
impl ClientHandler {
@ -146,10 +135,8 @@ impl ClientHandler {
.unwrap_or(MASTER_SERVER);
Self {
client: Client::new(),
base_url: Url::parse(
&format!("http://{}:{}", master_server, MASTER_PORT)
).unwrap(),
password: None
base_url: Url::parse(&format!("http://{}:{}", master_server, MASTER_PORT)).unwrap(),
password: None,
}
}
@ -161,7 +148,7 @@ impl ClientHandler {
fn set_pwd(&self, rb: RequestBuilder) -> RequestBuilder {
match &self.password {
Some(p) => rb.bearer_auth(p),
None => rb
None => rb,
}
}

@ -1,12 +1,11 @@
use diesel::result::Error as DslError;
use reqwest::Error as ReqError;
use serde::{
Serialize,
Deserialize
};
use serde::{Deserialize, Serialize};
use thiserror::Error;
use uuid::Uuid;
pub type UResult<T> = std::result::Result<T, UError>;
pub type ULocalResult<T> = std::result::Result<T, ULocalError>;
#[derive(Error, Debug, Serialize, Deserialize, Clone)]
pub enum UError {
@ -32,7 +31,7 @@ pub enum UError {
NoJob(Uuid),
#[error("Error opening {0}: {1}")]
FilesystemError(String, String)
FilesystemError(String, String),
}
impl From<ReqError> for UError {
@ -40,3 +39,15 @@ impl From<ReqError> for UError {
UError::NetError(e.to_string(), String::new())
}
}
#[derive(Error, Debug)]
pub enum ULocalError {
#[error("{0} is not found")]
NotFound(String),
#[error("Error processing {0}")]
ProcessingError(String),
#[error(transparent)]
DBError(#[from] DslError),
}

@ -2,16 +2,16 @@
// job runner (thread)
// every job runs in other thread/process
use crate::{models::*, UResult, OneOrMany};
use crate::{models::*, utils::OneOrMany, UResult};
use std::collections::HashMap;
use futures::{lock::{Mutex, MutexGuard}, future::BoxFuture};
use futures::{future::BoxFuture, lock::Mutex};
use lazy_static::lazy_static;
use tokio::{
spawn,
sync::mpsc::{channel, Receiver, Sender},
task::JoinHandle,
sync::mpsc::{channel, Receiver, Sender}
};
use uuid::Uuid;
@ -27,33 +27,33 @@ lazy_static! {
};
}
async fn get_sender() -> Sender<Uuid> {
fn get_sender() -> Sender<Uuid> {
FUT_CHANNEL.0.clone()
}
struct JoinInfo {
handle: JoinHandle<FutRes>,
completed: bool,
collectable: bool // indicates if future can be popped from pool via pop_task_if_completed
collectable: bool, // indicates if future can be popped from pool via pop_task_if_completed
}
pub struct Waiter {
tasks: Vec<DynFut>,
fids: Vec<Uuid>
fids: Vec<Uuid>,
}
impl Waiter {
pub fn new<S: OneOrMany<DynFut>>(tasks: S) -> Self {
Self {
tasks: tasks.into_vec(),
fids: vec![]
fids: vec![],
}
}
pub async fn spawn(mut self) -> Self {
let collectable = true; //TODO: self.tasks.len() != 1;
for f in self.tasks.drain(..) {
let tx = get_sender().await;
let tx = get_sender();
let fid = Uuid::new_v4();
self.fids.push(fid);
let task_wrapper = async move {
@ -66,7 +66,7 @@ impl Waiter {
let handle = JoinInfo {
handle: spawn(task_wrapper),
completed: false,
collectable
collectable,
};
debug!("before push: {}", fid);
//spawn(async {}).await.ok();
@ -98,10 +98,6 @@ impl Waiter {
}
}
async fn pop_task(fid: Uuid) -> Option<JoinInfo> {
FUT_RESULTS.lock().await.remove(&fid)
}
async fn init_receiver() {
while let Some(fid) = FUT_CHANNEL.1.lock().await.recv().await {
//info!("init_receiver: next val: {}", &fid);
@ -114,13 +110,18 @@ async fn init_receiver() {
}
}
pub async fn pop_task_if_completed(fid: Uuid) -> Option<FutRes> {
let &mut JoinInfo {handle: _, collectable, completed} = match FUT_RESULTS
.lock()
.await
.get_mut(&fid) {
async fn pop_task(fid: Uuid) -> Option<JoinInfo> {
FUT_RESULTS.lock().await.remove(&fid)
}
async fn pop_task_if_completed(fid: Uuid) -> Option<FutRes> {
let &mut JoinInfo {
handle: _,
collectable,
completed,
} = match FUT_RESULTS.lock().await.get_mut(&fid) {
Some(t) => t,
None => return None
None => return None,
};
if collectable && completed {
let task = pop_task(fid).await.unwrap();
@ -133,7 +134,8 @@ pub async fn pop_task_if_completed(fid: Uuid) -> Option<FutRes> {
pub async fn pop_completed() -> Vec<FutRes> {
let mut completed: Vec<FutRes> = vec![];
let fids = FUT_RESULTS.lock()
let fids = FUT_RESULTS
.lock()
.await
.keys()
.map(|k| *k)

@ -1,19 +1,15 @@
#![allow(non_upper_case_globals)]
pub mod executor;
pub mod api;
pub mod config;
pub mod utils;
pub mod errors;
pub mod api;
pub mod models;
pub mod executor;
pub mod messaging;
pub mod models;
pub mod utils;
pub use {
utils::*,
config::*,
executor::*,
errors::*,
models::*
};
pub use config::UID;
pub use errors::{UError, ULocalError, ULocalResult, UResult};
pub use models::{Jobstate, Jobtype}; // for schema
#[macro_use]
extern crate lazy_static;

@ -1,20 +1,15 @@
use serde::{
Serialize,
Deserialize
};
use std::time::SystemTime;
use diesel::{Identifiable, Insertable, Queryable};
use serde::{Deserialize, Serialize};
use std::fmt;
use diesel::{
Queryable,
Identifiable,
Insertable
};
use std::time::SystemTime;
use crate::{
models::*,
models::{
jobs::{build_jobs, JobMeta, JobOutput},
schema::*,
},
utils::{systime_to_string, vec_to_string},
UID,
utils::*,
models::schema::*,
};
use guess_host_triple::guess_host_triple;
@ -34,12 +29,12 @@ pub struct Agent {
pub regtime: SystemTime,
pub status: Option<String>,
pub token: Option<String>,
pub username: String
pub username: String,
}
impl fmt::Display for Agent {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
let mut out = format!("Agent {}", self.id);
let mut out = format!("Agent: {}", self.id);
if self.alias.is_some() {
out += &format!(" ({})", self.alias.as_ref().unwrap())
}
@ -64,11 +59,11 @@ pub struct IAgent {
pub platform: String,
pub status: Option<String>,
pub token: Option<String>,
pub username: String
pub username: String,
}
pub async fn gather() -> IAgent {
impl IAgent {
pub async fn gather() -> Self {
async fn run_cmd_fast<S: Into<String>>(cmd: S) -> String {
let jm = JobMeta::from_shell(cmd);
let job_result = build_jobs(jm)
@ -83,7 +78,7 @@ pub async fn gather() -> IAgent {
}
#[cfg(unix)]
IAgent {
Self {
alias: None,
id: UID.clone(),
hostname: run_cmd_fast("hostname").await,
@ -95,7 +90,7 @@ pub async fn gather() -> IAgent {
username: run_cmd_fast("id -un").await,
}
}
}
#[cfg(test)]
mod tests {
@ -103,11 +98,7 @@ mod tests {
#[tokio::test]
async fn test_gather() {
let cli_info = gather().await;
assert_eq!(
&cli_info.username,
"plazmoid"
)
let cli_info = IAgent::gather().await;
assert_eq!(&cli_info.username, "plazmoid")
}
}

@ -1,570 +0,0 @@
use std::{
time::{SystemTime, Duration},
thread,
sync::{RwLock, RwLockReadGuard},
cmp::PartialEq,
fmt,
string::ToString,
path::PathBuf,
fs,
process::Output,
collections::HashMap,
ops::Deref,
};
use serde::{
Serialize,
Deserialize
};
use uuid::Uuid;
use guess_host_triple::guess_host_triple;
use tokio::{
process::Command
};
use crate::{
utils::systime_to_string,
models::schema::*,
Agent,
UError,
UResult,
UID,
Waiter,
OneOrMany,
DynFut,
};
use diesel_derive_enum::DbEnum;
use diesel::{
Queryable,
Identifiable,
Insertable,
};
use strum::Display;
type Cache = HashMap<Uuid, JobMeta>;
lazy_static! {
static ref JOB_CACHE: RwLock<Cache> = RwLock::new(HashMap::new());
}
pub struct JobCache;
impl JobCache {
pub fn insert(job_meta: JobMeta) {
JOB_CACHE.write().unwrap().insert(job_meta.id, job_meta);
}
pub fn contains(uid: &Uuid) -> bool {
JOB_CACHE.read().unwrap().contains_key(uid)
}
pub fn get(uid: &Uuid) -> Option<JobCacheHolder> {
if !Self::contains(uid) {
return None
}
let lock = JOB_CACHE.read().unwrap();
Some(JobCacheHolder(lock, uid))
}
pub fn remove(uid: &Uuid) {
JOB_CACHE.write().unwrap().remove(uid);
}
}
pub struct JobCacheHolder<'jm>(pub RwLockReadGuard<'jm, Cache>, pub &'jm Uuid);
impl<'jm> Deref for JobCacheHolder<'jm> {
type Target = JobMeta;
fn deref(&self) -> &Self::Target {
self.0.get(self.1).unwrap()
}
}
#[derive(Serialize, Deserialize, Clone, Debug)]
pub enum ManageAction {
Ping,
UpdateAvailable,
JobsResultsRequest,
Terminate
}
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
pub enum JobSchedule {
Once,
Permanent,
//Scheduled
}
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, DbEnum, Display)]
#[PgType = "JobState"]
#[DieselType = "Jobstate"]
pub enum JobState {
Queued, // server created a job, but client didn't get it yet
//Pending, // client got a job, but not running yet
Running, // client is currently running a job
Finished,
}
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, DbEnum, Display)]
#[PgType = "JobType"]
#[DieselType = "Jobtype"]
pub enum JobType {
Manage,
Shell,
Python,
Binary,
Dummy
}
#[derive(Clone, Debug)]
pub struct JobOutput {
pub stdout: Vec<u8>,
pub stderr: Vec<u8>,
}
impl JobOutput {
const STREAM_BORDER: &'static str = "***";
const STDOUT: &'static str = "STDOUT";
const STDERR: &'static str = "STDERR";
#[inline]
fn create_delim(header: &'static str) -> String {
format!("{border} {head} {border}\n",
border = JobOutput::STREAM_BORDER,
head = header
)
}
pub fn new() -> Self {
Self {
stdout: vec![],
stderr: vec![],
}
}
pub fn stdout(mut self, data: Vec<u8>) -> Self {
self.stdout = data;
self
}
pub fn stderr(mut self, data: Vec<u8>) -> Self {
self.stderr = data;
self
}
pub fn multiline(&self) -> Vec<u8> {
let mut result: Vec<u8> = vec![];
if self.stdout.len() > 0 {
result.extend(JobOutput::create_delim(JobOutput::STDOUT).into_bytes());
result.extend(&self.stdout);
result.push(b'\n');
}
if self.stderr.len() > 0 {
result.extend(JobOutput::create_delim(JobOutput::STDERR).into_bytes());
result.extend(&self.stderr);
result.push(b'\n');
}
result
}
pub fn from_raw(raw: &[u8]) -> Option<Self> {
let raw = String::from_utf8_lossy(raw);
let err_header = JobOutput::create_delim(JobOutput::STDERR);
raw.strip_prefix(&JobOutput::create_delim(JobOutput::STDOUT))
.map(|s: &str| {
let mut parts = s.split(&err_header)
.map(|d| d.trim().as_bytes().to_vec())
.collect::<Vec<Vec<u8>>>()
.into_iter();
JobOutput::new()
.stdout(parts.next().unwrap())
.stderr(parts.next().unwrap_or(vec![]))
})
}
pub fn into_appropriate(self) -> Vec<u8> {
if self.stdout.len() > 0 {
self.stdout
} else if self.stderr.len() > 0 {
self.stderr
} else {
b"No data".to_vec()
}
}
}
#[derive(
Serialize,
Deserialize,
Clone,
Debug,
Queryable,
Identifiable,
Insertable
)]
#[table_name = "jobs"]
pub struct JobMeta {
pub alias: String,
pub id: Uuid,
pub exec_type: JobType,
//pub schedule: JobSchedule,
pub platform: String,
pub payload: Option<Vec<u8>>,
}
impl fmt::Display for JobMeta {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
let mut out = format!("Job {}", self.id);
out += &format!(" ({})", self.alias);
out += &format!("\nExecutable type: {}", self.exec_type);
out += &format!("\nPlatform: {}", self.platform);
if self.exec_type == JobType::Shell && self.payload.is_some() {
out += &format!("\nPayload: {}", String::from_utf8_lossy(self.payload.as_ref().unwrap()));
}
write!(f, "{}", out)
}
}
impl JobMeta {
pub fn from_shell<S: Into<String>>(shell_cmd: S) -> Self {
let shell_cmd = shell_cmd.into();
let job_name = shell_cmd.split(" ").nth(0).unwrap();
Self {
alias: job_name.to_string(),
payload: Some(shell_cmd.into_bytes()),
..Default::default()
}
}
/*
pub fn from_file(path: PathBuf) -> UResult<Self> {
let data = fs::read(path)
.map_err(|e| UError::FilesystemError(
path.to_string_lossy().to_string(),
e.to_string()
))?;
let filename = path.file_name().unwrap().to_str().unwrap();
}*/
}
impl Default for JobMeta {
fn default() -> Self {
Self {
id: Uuid::new_v4(),
alias: String::new(),
exec_type: JobType::Shell,
platform: guess_host_triple().unwrap_or("unknown").to_string(),
payload: None
}
}
}
#[derive(
Serialize,
Deserialize,
Clone,
Debug,
Queryable,
Identifiable,
Insertable,
AsChangeset,
)]
#[table_name = "results"]
pub struct ExactJob {
pub agent_id: Uuid,
pub created: SystemTime,
pub id: Uuid,
pub job_id: Uuid,
pub result: Option<Vec<u8>>,
pub state: JobState,
pub retcode: Option<i32>,
pub updated: SystemTime,
}
impl fmt::Display for ExactJob {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
let mut out = format!("Result {}", self.id);
out += &format!("\nAgent {}", self.agent_id);
out += &format!("\nJob: {}", self.job_id);
out += &format!("\nUpdated: {}", systime_to_string(&self.updated));
out += &format!("\nState: {}", self.state);
if self.state == JobState::Finished {
if self.retcode.is_some() {
out += &format!("\nReturn code: {}", self.retcode.unwrap());
}
if self.result.is_some() {
out += &format!("\nResult: {}", String::from_utf8_lossy(self.result.as_ref().unwrap()));
}
}
write!(f, "{}", out)
}
}
impl ExactJob {
pub fn from_meta(job_id: Uuid, result_id: Option<Uuid>) -> Self {
Self {
id: result_id.unwrap_or(Uuid::new_v4()),
agent_id: *UID,
job_id,
..Default::default()
}
}
//pub fn as_job_output(&self) -> JobOutput {}
}
impl Default for ExactJob {
fn default() -> Self {
Self {
agent_id: Uuid::nil(),
created: SystemTime::now(),
id: Uuid::new_v4(),
job_id: Uuid::nil(),
result: None,
state: JobState::Queued,
retcode: None,
updated: SystemTime::now()
}
}
}
pub struct Job {
exec_type: JobType,
payload: Option<Vec<u8>>,
result: ExactJob
}
impl Job {
fn build(job_meta: &JobMeta, result_id: Uuid) -> UResult<Self> {
match job_meta.exec_type {
JobType::Shell => {
let curr_platform = guess_host_triple().unwrap_or("unknown").to_string();
if job_meta.platform != curr_platform {
return Err(UError::InsuitablePlatform(
job_meta.platform.clone(), curr_platform
))
}
let job_meta = job_meta.clone();
Ok(Self {
exec_type: job_meta.exec_type,
payload: job_meta.payload,
result: ExactJob::from_meta(job_meta.id.clone(), Some(result_id))
})
},
_ => todo!()
}
}
async fn run(mut self) -> UResult<ExactJob> {
match self.exec_type {
JobType::Shell => {
let str_payload = match &self.payload {
Some(box_payload) => {
String::from_utf8_lossy(box_payload).into_owned()
}
None => unimplemented!()
};
let mut cmd_parts = str_payload
.split(" ")
.map(String::from)
.collect::<Vec<String>>()
.into_iter();
let cmd = cmd_parts.nth(0).unwrap();
let args = cmd_parts.collect::<Vec<_>>();
let cmd_result = Command::new(cmd)
.args(args)
.output()
.await;
let (data, retcode) = match cmd_result {
Ok(Output {status, stdout, stderr}) => {
(
Some(JobOutput::new()
.stdout(stdout)
.stderr(stderr)
.multiline()
),
status.code()
)
}
Err(e) => {
(
Some(UError::JobError(e.to_string()).to_string().into_bytes()),
None
)
}
};
self.result.result = data;
self.result.retcode = retcode;
self.result.updated = SystemTime::now();
self.result.state = JobState::Finished;
},
_ => todo!()
}
Ok(self.result)
}
}
pub fn build_jobs_with_result<J: OneOrMany<ExactJob>>(job_requests: J) -> Waiter {
let prepared_jobs = job_requests.into_vec()
.into_iter()
.filter_map(|jr| -> Option<DynFut> {
let job = {
let job_meta = JobCache::get(&jr.job_id);
if job_meta.is_none() {
Err(UError::NoJob(jr.job_id))
} else {
Job::build(&*job_meta.unwrap(), jr.id)
}
};
match job {
Ok(j) => Some(Box::pin(j.run())),
Err(e) => {
warn!("Job building error: {}", e);
None
}
}
}).collect::<Vec<DynFut>>();
Waiter::new(prepared_jobs)
}
pub fn build_jobs<J: OneOrMany<JobMeta>>(job_metas: J) -> Waiter {
let job_requests = job_metas.into_vec().into_iter().map(|jm| {
let j_uid = jm.id;
JobCache::insert(jm);
ExactJob::from_meta(j_uid, None)
}).collect::<Vec<ExactJob>>();
build_jobs_with_result(job_requests)
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{build_jobs, utils::vec_to_string, pop_completed};
#[tokio::test]
async fn test_is_really_async() {
const SLEEP_SECS: u64 = 1;
let job = JobMeta::from_shell(format!("sleep {}", SLEEP_SECS));
let sleep_jobs: Vec<JobMeta> = (0..50).map(|_| job.clone()).collect();
let now = SystemTime::now();
build_jobs(sleep_jobs).run_until_complete().await;
assert!(now.elapsed().unwrap().as_secs() < SLEEP_SECS+2)
}
#[tokio::test]
async fn test_shell_job() -> UResult<()> {
let job = JobMeta::from_shell("whoami");
let job_result = build_jobs(job)
.run_one_until_complete()
.await;
let stdout = JobOutput::from_raw(
&job_result.unwrap().result.unwrap()
).unwrap().stdout;
assert_eq!(
vec_to_string(&stdout).trim(),
"plazmoid"
);
Ok(())
}
#[tokio::test]
async fn test_complex_load() -> UResult<()> {
const SLEEP_SECS: u64 = 1;
let now = SystemTime::now();
let longest_job = JobMeta::from_shell(format!("sleep {}", SLEEP_SECS));
let longest_job = build_jobs(longest_job).spawn().await;
let ls = build_jobs(JobMeta::from_shell("ls"))
.run_one_until_complete()
.await
.unwrap();
assert_eq!(ls.retcode.unwrap(), 0);
let result = JobOutput::from_raw(&ls.result.unwrap()).unwrap();
let folders = String::from_utf8_lossy(
&result.stdout
);
let subfolders_jobs: Vec<JobMeta> = folders
.lines()
.map(|f| JobMeta::from_shell(format!("ls {}", f)))
.collect();
let ls_subfolders = build_jobs(
subfolders_jobs
).run_until_complete().await;
for result in ls_subfolders {
assert_eq!(result.unwrap().retcode.unwrap(), 0);
}
longest_job.wait().await;
assert_eq!(now.elapsed().unwrap().as_secs(), SLEEP_SECS);
Ok(())
}
/*
#[tokio::test]
async fn test_exec_multiple_jobs_nowait() -> UResult<()> {
const REPEATS: usize = 10;
let job = JobMeta::from_shell("whoami");
let sleep_jobs: Vec<JobMeta> = (0..=REPEATS).map(|_| job.clone()).collect();
build_jobs(sleep_jobs).spawn().await;
let mut completed = 0;
while completed < REPEATS {
let c = pop_completed().await.len();
if c > 0 {
completed += c;
println!("{}", c);
}
}
Ok(())
}
*/
#[tokio::test]
async fn test_failing_shell_job() -> UResult<()> {
let job = JobMeta::from_shell("lol_kek_puk");
let job_result = build_jobs(job).run_one_until_complete().await.unwrap();
let output = JobOutput::from_raw(&job_result.result.unwrap());
assert!(output.is_none());
assert!(job_result.retcode.is_none());
Ok(())
}
#[test]
fn test_to_multiline() {
let mut output = JobOutput::new();
output.stdout = b"lol".to_vec();
output.stderr = b"kek".to_vec();
assert_eq!(
vec_to_string(&output.multiline()),
String::from(
"*** STDOUT ***\n\
lol\n\
*** STDERR ***\n\
kek\n"
)
)
}
#[test]
fn test_to_multiline_stderr_only() {
let mut output = JobOutput::new();
output.stderr = b"kek".to_vec();
assert_eq!(
vec_to_string(&output.multiline()),
String::from(
"*** STDERR ***\n\
kek\n"
)
)
}
#[test]
fn test_from_multiline() {
let txt = "*** STDOUT ***\n\
puk\n".as_bytes();
let output = JobOutput::from_raw(txt).unwrap();
assert_eq!(
vec_to_string(&output.stdout),
"puk".to_string()
);
assert_eq!(output.stderr.len(), 0);
}
}

@ -0,0 +1,116 @@
use super::{ExactJob, JobCache, JobMeta, JobOutput, JobState, JobType};
use crate::{
executor::{DynFut, Waiter},
utils::OneOrMany,
UError, UResult,
};
use guess_host_triple::guess_host_triple;
use std::{process::Output, time::SystemTime};
use tokio::process::Command;
use uuid::Uuid;
pub struct Job {
exec_type: JobType,
payload: Option<Vec<u8>>,
result: ExactJob,
}
impl Job {
fn build(job_meta: &JobMeta, result_id: Uuid) -> UResult<Self> {
match job_meta.exec_type {
JobType::Shell => {
let curr_platform = guess_host_triple().unwrap_or("unknown").to_string();
if job_meta.platform != curr_platform {
return Err(UError::InsuitablePlatform(
job_meta.platform.clone(),
curr_platform,
));
}
let job_meta = job_meta.clone();
Ok(Self {
exec_type: job_meta.exec_type,
payload: job_meta.payload,
result: ExactJob::from_meta(job_meta.id.clone(), Some(result_id)),
})
}
_ => todo!(),
}
}
async fn run(mut self) -> UResult<ExactJob> {
match self.exec_type {
JobType::Shell => {
let str_payload = match &self.payload {
Some(box_payload) => String::from_utf8_lossy(box_payload).into_owned(),
None => unimplemented!(),
};
let mut cmd_parts = str_payload
.split(" ")
.map(String::from)
.collect::<Vec<String>>()
.into_iter();
let cmd = cmd_parts.nth(0).unwrap();
let args = cmd_parts.collect::<Vec<_>>();
let cmd_result = Command::new(cmd).args(args).output().await;
let (data, retcode) = match cmd_result {
Ok(Output {
status,
stdout,
stderr,
}) => (
JobOutput::new().stdout(stdout).stderr(stderr).multiline(),
status.code(),
),
Err(e) => (
UError::JobError(e.to_string()).to_string().into_bytes(),
None,
),
};
self.result.result = Some(data);
self.result.retcode = retcode;
self.result.updated = SystemTime::now();
self.result.state = JobState::Finished;
}
_ => todo!(),
}
Ok(self.result)
}
}
pub fn build_jobs_with_result<J: OneOrMany<ExactJob>>(job_requests: J) -> Waiter {
let prepared_jobs = job_requests
.into_vec()
.into_iter()
.filter_map(|jr| -> Option<DynFut> {
let job = {
let job_meta = JobCache::get(&jr.job_id);
if job_meta.is_none() {
Err(UError::NoJob(jr.job_id))
} else {
Job::build(&*job_meta.unwrap(), jr.id)
}
};
match job {
Ok(j) => Some(Box::pin(j.run())),
Err(e) => {
warn!("Job building error: {}", e);
None
}
}
})
.collect::<Vec<DynFut>>();
Waiter::new(prepared_jobs)
}
pub fn build_jobs<J: OneOrMany<JobMeta>>(job_metas: J) -> Waiter {
let job_requests = job_metas
.into_vec()
.into_iter()
.map(|jm| {
let j_uid = jm.id;
JobCache::insert(jm);
ExactJob::from_meta(j_uid, None)
})
.collect::<Vec<ExactJob>>();
build_jobs_with_result(job_requests)
}

@ -0,0 +1,47 @@
use crate::models::JobMeta;
use std::{
collections::HashMap,
ops::Deref,
sync::{RwLock, RwLockReadGuard},
};
use uuid::Uuid;
type Cache = HashMap<Uuid, JobMeta>;
lazy_static! {
static ref JOB_CACHE: RwLock<Cache> = RwLock::new(HashMap::new());
}
pub struct JobCache;
impl JobCache {
pub fn insert(job_meta: JobMeta) {
JOB_CACHE.write().unwrap().insert(job_meta.id, job_meta);
}
pub fn contains(uid: &Uuid) -> bool {
JOB_CACHE.read().unwrap().contains_key(uid)
}
pub fn get(uid: &Uuid) -> Option<JobCacheHolder> {
if !Self::contains(uid) {
return None;
}
let lock = JOB_CACHE.read().unwrap();
Some(JobCacheHolder(lock, uid))
}
pub fn remove(uid: &Uuid) {
JOB_CACHE.write().unwrap().remove(uid);
}
}
pub struct JobCacheHolder<'jm>(pub RwLockReadGuard<'jm, Cache>, pub &'jm Uuid);
impl<'jm> Deref for JobCacheHolder<'jm> {
type Target = JobMeta;
fn deref(&self) -> &Self::Target {
self.0.get(self.1).unwrap()
}
}

@ -0,0 +1,68 @@
use super::JobType;
use crate::models::schema::*;
use diesel::{Identifiable, Insertable, Queryable};
use guess_host_triple::guess_host_triple;
use serde::{Deserialize, Serialize};
use std::fmt;
use uuid::Uuid;
#[derive(Serialize, Deserialize, Clone, Debug, Queryable, Identifiable, Insertable)]
#[table_name = "jobs"]
pub struct JobMeta {
pub alias: String,
pub id: Uuid,
pub exec_type: JobType,
//pub schedule: JobSchedule,
pub platform: String,
pub payload: Option<Vec<u8>>,
}
impl fmt::Display for JobMeta {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
let mut out = format!("Job {}", self.id);
out += &format!(" ({})", self.alias);
out += &format!("\nExecutable type: {}", self.exec_type);
out += &format!("\nPlatform: {}", self.platform);
if self.exec_type == JobType::Shell && self.payload.is_some() {
out += &format!(
"\nPayload: {}",
String::from_utf8_lossy(self.payload.as_ref().unwrap())
);
}
write!(f, "{}", out)
}
}
impl JobMeta {
pub fn from_shell<S: Into<String>>(shell_cmd: S) -> Self {
let shell_cmd = shell_cmd.into();
let job_name = shell_cmd.split(" ").nth(0).unwrap();
Self {
alias: job_name.to_string(),
payload: Some(shell_cmd.into_bytes()),
..Default::default()
}
}
/*
pub fn from_file(path: PathBuf) -> UResult<Self> {
let data = fs::read(path)
.map_err(|e| UError::FilesystemError(
path.to_string_lossy().to_string(),
e.to_string()
))?;
let filename = path.file_name().unwrap().to_str().unwrap();
}*/
}
impl Default for JobMeta {
fn default() -> Self {
Self {
id: Uuid::new_v4(),
alias: String::new(),
exec_type: JobType::Shell,
platform: guess_host_triple().unwrap_or("unknown").to_string(),
payload: None,
}
}
}

@ -0,0 +1,164 @@
use diesel_derive_enum::DbEnum;
use serde::{Deserialize, Serialize};
use strum::Display;
#[derive(Serialize, Deserialize, Clone, Debug)]
pub enum ManageAction {
Ping,
UpdateAvailable,
JobsResultsRequest,
Terminate,
}
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
pub enum JobSchedule {
Once,
Permanent,
//Scheduled
}
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, DbEnum, Display)]
#[PgType = "JobState"]
#[DieselType = "Jobstate"]
pub enum JobState {
Queued, // server created a job, but client didn't get it yet
//Pending, // client got a job, but not running yet
Running, // client is currently running a job
Finished,
}
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, DbEnum, Display)]
#[PgType = "JobType"]
#[DieselType = "Jobtype"]
pub enum JobType {
Manage,
Shell,
Python,
Binary,
Dummy,
}
#[derive(Clone, Debug)]
pub struct JobOutput {
pub stdout: Vec<u8>,
pub stderr: Vec<u8>,
}
impl JobOutput {
const STREAM_BORDER: &'static str = "***";
const STDOUT: &'static str = "STDOUT";
const STDERR: &'static str = "STDERR";
#[inline]
fn create_delim(header: &'static str) -> String {
format!(
"{border} {head} {border}\n",
border = JobOutput::STREAM_BORDER,
head = header
)
}
pub fn new() -> Self {
Self {
stdout: vec![],
stderr: vec![],
}
}
pub fn stdout(mut self, data: Vec<u8>) -> Self {
self.stdout = data;
self
}
pub fn stderr(mut self, data: Vec<u8>) -> Self {
self.stderr = data;
self
}
pub fn multiline(&self) -> Vec<u8> {
let mut result: Vec<u8> = vec![];
if self.stdout.len() > 0 {
result.extend(JobOutput::create_delim(JobOutput::STDOUT).into_bytes());
result.extend(&self.stdout);
result.push(b'\n');
}
if self.stderr.len() > 0 {
result.extend(JobOutput::create_delim(JobOutput::STDERR).into_bytes());
result.extend(&self.stderr);
result.push(b'\n');
}
result
}
pub fn from_raw(raw: &[u8]) -> Option<Self> {
let raw = String::from_utf8_lossy(raw);
let err_header = JobOutput::create_delim(JobOutput::STDERR);
raw.strip_prefix(&JobOutput::create_delim(JobOutput::STDOUT))
.map(|s: &str| {
let mut parts = s
.split(&err_header)
.map(|d| d.trim().as_bytes().to_vec())
.collect::<Vec<Vec<u8>>>()
.into_iter();
JobOutput::new()
.stdout(parts.next().unwrap())
.stderr(parts.next().unwrap_or(vec![]))
})
}
pub fn into_appropriate(self) -> Vec<u8> {
if self.stdout.len() > 0 {
self.stdout
} else if self.stderr.len() > 0 {
self.stderr
} else {
b"No data".to_vec()
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::utils::vec_to_string;
#[test]
fn test_to_multiline() {
let mut output = JobOutput::new();
output.stdout = b"lol".to_vec();
output.stderr = b"kek".to_vec();
assert_eq!(
vec_to_string(&output.multiline()),
String::from(
"*** STDOUT ***\n\
lol\n\
*** STDERR ***\n\
kek\n"
)
)
}
#[test]
fn test_to_multiline_stderr_only() {
let mut output = JobOutput::new();
output.stderr = b"kek".to_vec();
assert_eq!(
vec_to_string(&output.multiline()),
String::from(
"*** STDERR ***\n\
kek\n"
)
)
}
#[test]
fn test_from_multiline() {
let txt = "*** STDOUT ***\n\
puk\n"
.as_bytes();
let output = JobOutput::from_raw(txt).unwrap();
assert_eq!(vec_to_string(&output.stdout), "puk".to_string());
assert_eq!(output.stderr.len(), 0);
}
}

@ -0,0 +1,7 @@
pub mod builder;
pub mod cache;
pub mod meta;
pub mod misc;
pub mod result;
pub use {builder::*, cache::*, meta::*, misc::*, result::*};

@ -0,0 +1,155 @@
use super::JobState;
use crate::{models::schema::*, utils::systime_to_string, UID};
use diesel::{Identifiable, Insertable, Queryable};
use serde::{Deserialize, Serialize};
use std::{fmt, time::SystemTime};
use uuid::Uuid;
#[derive(
Serialize, Deserialize, Clone, Debug, Queryable, Identifiable, Insertable, AsChangeset,
)]
#[table_name = "results"]
pub struct ExactJob {
pub agent_id: Uuid,
pub created: SystemTime,
pub id: Uuid,
pub job_id: Uuid,
pub result: Option<Vec<u8>>,
pub state: JobState,
pub retcode: Option<i32>,
pub updated: SystemTime,
}
impl Default for ExactJob {
fn default() -> Self {
Self {
agent_id: Uuid::nil(),
created: SystemTime::now(),
id: Uuid::new_v4(),
job_id: Uuid::nil(),
result: None,
state: JobState::Queued,
retcode: None,
updated: SystemTime::now(),
}
}
}
impl fmt::Display for ExactJob {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
let mut out = format!("Result {}", self.id);
out += &format!("\nAgent {}", self.agent_id);
out += &format!("\nJob: {}", self.job_id);
out += &format!("\nUpdated: {}", systime_to_string(&self.updated));
out += &format!("\nState: {}", self.state);
if self.state == JobState::Finished {
if self.retcode.is_some() {
out += &format!("\nReturn code: {}", self.retcode.unwrap());
}
if self.result.is_some() {
out += &format!(
"\nResult: {}",
String::from_utf8_lossy(self.result.as_ref().unwrap())
);
}
}
write!(f, "{}", out)
}
}
impl ExactJob {
pub fn from_meta(job_id: Uuid, result_id: Option<Uuid>) -> Self {
Self {
id: result_id.unwrap_or(Uuid::new_v4()),
agent_id: *UID,
job_id,
..Default::default()
}
}
//pub fn as_job_output(&self) -> JobOutput {}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{
models::jobs::{build_jobs, JobMeta, JobOutput},
utils::vec_to_string,
UResult,
};
#[tokio::test]
async fn test_is_really_async() {
const SLEEP_SECS: u64 = 1;
let job = JobMeta::from_shell(format!("sleep {}", SLEEP_SECS));
let sleep_jobs: Vec<JobMeta> = (0..50).map(|_| job.clone()).collect();
let now = SystemTime::now();
build_jobs(sleep_jobs).run_until_complete().await;
assert!(now.elapsed().unwrap().as_secs() < SLEEP_SECS + 2)
}
#[tokio::test]
async fn test_shell_job() -> UResult<()> {
let job = JobMeta::from_shell("whoami");
let job_result = build_jobs(job).run_one_until_complete().await;
let stdout = JobOutput::from_raw(&job_result.unwrap().result.unwrap())
.unwrap()
.stdout;
assert_eq!(vec_to_string(&stdout).trim(), "plazmoid");
Ok(())
}
#[tokio::test]
async fn test_complex_load() -> UResult<()> {
const SLEEP_SECS: u64 = 1;
let now = SystemTime::now();
let longest_job = JobMeta::from_shell(format!("sleep {}", SLEEP_SECS));
let longest_job = build_jobs(longest_job).spawn().await;
let ls = build_jobs(JobMeta::from_shell("ls"))
.run_one_until_complete()
.await
.unwrap();
assert_eq!(ls.retcode.unwrap(), 0);
let result = JobOutput::from_raw(&ls.result.unwrap()).unwrap();
let folders = String::from_utf8_lossy(&result.stdout);
let subfolders_jobs: Vec<JobMeta> = folders
.lines()
.map(|f| JobMeta::from_shell(format!("ls {}", f)))
.collect();
let ls_subfolders = build_jobs(subfolders_jobs).run_until_complete().await;
for result in ls_subfolders {
assert_eq!(result.unwrap().retcode.unwrap(), 0);
}
longest_job.wait().await;
assert_eq!(now.elapsed().unwrap().as_secs(), SLEEP_SECS);
Ok(())
}
/*
#[tokio::test]
async fn test_exec_multiple_jobs_nowait() -> UResult<()> {
const REPEATS: usize = 10;
let job = JobMeta::from_shell("whoami");
let sleep_jobs: Vec<JobMeta> = (0..=REPEATS).map(|_| job.clone()).collect();
build_jobs(sleep_jobs).spawn().await;
let mut completed = 0;
while completed < REPEATS {
let c = pop_completed().await.len();
if c > 0 {
completed += c;
println!("{}", c);
}
}
Ok(())
}
*/
#[tokio::test]
async fn test_failing_shell_job() -> UResult<()> {
let job = JobMeta::from_shell("lol_kek_puk");
let job_result = build_jobs(job).run_one_until_complete().await.unwrap();
let output = JobOutput::from_raw(&job_result.result.unwrap());
assert!(output.is_none());
assert!(job_result.retcode.is_none());
Ok(())
}
}
Loading…
Cancel
Save