Merge branch '11-move-db-related-methods-to-u_lib-models' into 'master'

split jobs into submodules, reworked db interface

Closes #11

See merge request root/unki!2
4-update-check
Administrator 4 years ago
commit 06b731b221
  1. 3
      .cargo/config.toml
  2. 1
      bin/u_server/Cargo.toml
  3. 142
      bin/u_server/src/db.rs
  4. 16
      bin/u_server/src/errors.rs
  5. 136
      bin/u_server/src/handlers.rs
  6. 96
      bin/u_server/src/main.rs
  7. 1
      lib/u_lib/Cargo.toml
  8. 31
      lib/u_lib/src/api.rs
  9. 21
      lib/u_lib/src/errors.rs
  10. 42
      lib/u_lib/src/executor.rs
  11. 18
      lib/u_lib/src/lib.rs
  12. 43
      lib/u_lib/src/models/agent.rs
  13. 570
      lib/u_lib/src/models/jobs.rs
  14. 116
      lib/u_lib/src/models/jobs/builder.rs
  15. 47
      lib/u_lib/src/models/jobs/cache.rs
  16. 68
      lib/u_lib/src/models/jobs/meta.rs
  17. 164
      lib/u_lib/src/models/jobs/misc.rs
  18. 7
      lib/u_lib/src/models/jobs/mod.rs
  19. 155
      lib/u_lib/src/models/jobs/result.rs

@ -0,0 +1,3 @@
[build]
target = "x86_64-unknown-linux-gnu" # -musl"

@ -11,6 +11,7 @@ log = "0.4.11"
thiserror = "*" thiserror = "*"
warp = "0.2.4" warp = "0.2.4"
uuid = { version = "0.6.5", features = ["serde", "v4"] } uuid = { version = "0.6.5", features = ["serde", "v4"] }
once_cell = "1.7.2"
[dependencies.diesel] [dependencies.diesel]
features = ["postgres", "uuid"] features = ["postgres", "uuid"]

@ -1,39 +1,56 @@
use diesel::{ use diesel::{pg::PgConnection, prelude::*, result::Error as DslError};
pg::PgConnection,
prelude::*,
result::Error as DslError
};
use dotenv::dotenv; use dotenv::dotenv;
use once_cell::sync::OnceCell;
use std::{ use std::{
env, env,
sync::{Arc, Mutex} sync::{Arc, Mutex, MutexGuard},
};
use crate::{
errors::*
}; };
use u_lib::{ use u_lib::{
models::* models::{schema, Agent, ExactJob, IAgent, JobMeta, JobState},
ULocalError, ULocalResult,
}; };
use uuid::Uuid; use uuid::Uuid;
pub type Storage = Arc<Mutex<UDB>>;
pub struct UDB { pub struct UDB {
pub conn: PgConnection pub conn: PgConnection,
} }
impl UDB { static DB: OnceCell<Arc<Mutex<UDB>>> = OnceCell::new();
pub fn new() -> USrvResult<Storage> {
pub fn lock_db() -> MutexGuard<'static, UDB> {
DB.get_or_init(|| {
dotenv().unwrap(); dotenv().unwrap();
let db_path = env::var("DATABASE_URL").unwrap(); let db_path = env::var("DATABASE_URL").unwrap();
let conn = PgConnection::establish(&db_path).unwrap(); let conn = PgConnection::establish(&db_path).unwrap();
let instance = UDB { let instance = UDB { conn };
conn Arc::new(Mutex::new(instance))
})
.lock()
.unwrap()
}
impl UDB {
pub fn insert_jobs(&self, job_metas: &Vec<JobMeta>) -> ULocalResult<()> {
use schema::jobs;
diesel::insert_into(jobs::table)
.values(job_metas)
.execute(&self.conn)?;
Ok(())
}
pub fn get_jobs(&self, uid: Option<Uuid>) -> ULocalResult<Vec<JobMeta>> {
use schema::jobs;
let result = if uid.is_some() {
jobs::table
.filter(jobs::id.eq(uid.unwrap()))
.get_results::<JobMeta>(&self.conn)?
} else {
jobs::table.load::<JobMeta>(&self.conn)?
}; };
Ok(Arc::new(Mutex::new(instance))) Ok(result)
} }
pub fn new_agent(&self, agent: &IAgent) -> USrvResult<()> { pub fn insert_agents(&self, agent: &IAgent) -> ULocalResult<()> {
use schema::agents; use schema::agents;
diesel::insert_into(agents::table) diesel::insert_into(agents::table)
.values(agent) .values(agent)
@ -41,33 +58,19 @@ impl UDB {
Ok(()) Ok(())
} }
pub fn get_agents(&self, uid: Option<Uuid>) -> USrvResult<Vec<Agent>> { pub fn get_agents(&self, uid: Option<Uuid>) -> ULocalResult<Vec<Agent>> {
use schema::agents; use schema::agents;
let result = if uid.is_some() { let result = if uid.is_some() {
agents::table agents::table
.filter(agents::id.eq(uid.unwrap())) .filter(agents::id.eq(uid.unwrap()))
.load::<Agent>(&self.conn)? .load::<Agent>(&self.conn)?
} else { } else {
agents::table agents::table.load::<Agent>(&self.conn)?
.load::<Agent>(&self.conn)?
}; };
Ok(result) Ok(result)
} }
pub fn get_jobs(&self, uid: Option<Uuid>) -> USrvResult<Vec<JobMeta>> { pub fn update_job_status(&self, uid: Uuid, status: JobState) -> ULocalResult<()> {
use schema::jobs;
let result = if uid.is_some() {
jobs::table
.filter(jobs::id.eq(uid.unwrap()))
.get_results::<JobMeta>(&self.conn)?
} else {
jobs::table
.load::<JobMeta>(&self.conn)?
};
Ok(result)
}
pub fn update_job_status(&self, uid: Uuid, status: JobState) -> USrvResult<()> {
use schema::results; use schema::results;
diesel::update(results::table) diesel::update(results::table)
.filter(results::id.eq(uid)) .filter(results::id.eq(uid))
@ -76,63 +79,62 @@ impl UDB {
Ok(()) Ok(())
} }
pub fn get_agent_jobs(&self, uid: Option<Uuid>, personal: bool) -> USrvResult<Vec<ExactJob>> { //TODO: filters possibly could work in a wrong way, check
pub fn get_exact_jobs(&self, uid: Option<Uuid>, personal: bool) -> ULocalResult<Vec<ExactJob>> {
use schema::results; use schema::results;
let mut q = results::table let mut q = results::table.into_boxed();
.into_boxed();
if uid.is_some() { if uid.is_some() {
q = q.filter(results::agent_id.eq(uid.unwrap())) q = q.filter(results::agent_id.eq(uid.unwrap()))
} }
if personal { if personal {
q = q.filter( q = q.filter(
results::state.eq(JobState::Queued) results::state
.and(results::agent_id.eq(uid.unwrap())) .eq(JobState::Queued)
.and(results::agent_id.eq(uid.unwrap())),
) )
} else if uid.is_some() { } else if uid.is_some() {
q = q.filter(results::agent_id.eq(uid.unwrap())) q = q
.filter(results::agent_id.eq(uid.unwrap()))
.or_filter(results::job_id.eq(uid.unwrap())) .or_filter(results::job_id.eq(uid.unwrap()))
.or_filter(results::id.eq(uid.unwrap())) .or_filter(results::id.eq(uid.unwrap()))
} }
let result = q.load::<ExactJob>(&self.conn)?; let result = q.load::<ExactJob>(&self.conn)?;
Ok(result) Ok(result)
} }
pub fn add_jobs(&self, job_metas: &Vec<JobMeta>) -> USrvResult<()> { pub fn set_jobs_for_agent(&self, agent_uid: &Uuid, job_uids: &Vec<Uuid>) -> ULocalResult<()> {
use schema::jobs;
diesel::insert_into(jobs::table)
.values(job_metas)
.execute(&self.conn)?;
Ok(())
}
pub fn set_jobs_for_agent(&self, agent_uid: &Uuid, job_uids: &Vec<Uuid>) -> USrvResult<()> {
use schema::{agents::dsl::agents, jobs::dsl::jobs, results}; use schema::{agents::dsl::agents, jobs::dsl::jobs, results};
if let Err(DslError::NotFound) = agents.find(agent_uid).first::<Agent>(&self.conn) { if let Err(DslError::NotFound) = agents.find(agent_uid).first::<Agent>(&self.conn) {
return Err(USrvError::NotFound(agent_uid.to_string())) return Err(ULocalError::NotFound(agent_uid.to_string()));
} }
let not_found_jobs = job_uids.iter().filter_map(|job_uid| { let not_found_jobs = job_uids
.iter()
.filter_map(|job_uid| {
if let Err(DslError::NotFound) = jobs.find(job_uid).first::<JobMeta>(&self.conn) { if let Err(DslError::NotFound) = jobs.find(job_uid).first::<JobMeta>(&self.conn) {
Some(job_uid.to_string()) Some(job_uid.to_string())
} else { None } } else {
}).collect::<Vec<String>>(); None
}
})
.collect::<Vec<String>>();
if not_found_jobs.len() > 0 { if not_found_jobs.len() > 0 {
return Err(USrvError::NotFound(not_found_jobs.join(", "))); return Err(ULocalError::NotFound(not_found_jobs.join(", ")));
} }
let job_requests = job_uids.iter().map(|job_uid| { let job_requests = job_uids
ExactJob { .iter()
.map(|job_uid| ExactJob {
job_id: *job_uid, job_id: *job_uid,
agent_id: *agent_uid, agent_id: *agent_uid,
..Default::default() ..Default::default()
} })
}).collect::<Vec<ExactJob>>(); .collect::<Vec<ExactJob>>();
diesel::insert_into(results::table) diesel::insert_into(results::table)
.values(&job_requests) .values(&job_requests)
.execute(&self.conn)?; .execute(&self.conn)?;
Ok(()) Ok(())
} }
pub fn del_jobs(&self, uids: &Vec<Uuid>) -> USrvResult<usize> { pub fn del_jobs(&self, uids: &Vec<Uuid>) -> ULocalResult<usize> {
use schema::jobs; use schema::jobs;
let mut affected = 0; let mut affected = 0;
for &uid in uids { for &uid in uids {
@ -144,24 +146,24 @@ impl UDB {
Ok(affected) Ok(affected)
} }
pub fn del_agents(&self, uids: &Vec<Uuid>) -> USrvResult<usize> { pub fn del_results(&self, uids: &Vec<Uuid>) -> ULocalResult<usize> {
use schema::agents; use schema::results;
let mut affected = 0; let mut affected = 0;
for &uid in uids { for &uid in uids {
let deleted = diesel::delete(agents::table) let deleted = diesel::delete(results::table)
.filter(agents::id.eq(uid)) .filter(results::id.eq(uid))
.execute(&self.conn)?; .execute(&self.conn)?;
affected += deleted; affected += deleted;
} }
Ok(affected) Ok(affected)
} }
pub fn del_results(&self, uids: &Vec<Uuid>) -> USrvResult<usize> { pub fn del_agents(&self, uids: &Vec<Uuid>) -> ULocalResult<usize> {
use schema::results; use schema::agents;
let mut affected = 0; let mut affected = 0;
for &uid in uids { for &uid in uids {
let deleted = diesel::delete(results::table) let deleted = diesel::delete(agents::table)
.filter(results::id.eq(uid)) .filter(agents::id.eq(uid))
.execute(&self.conn)?; .execute(&self.conn)?;
affected += deleted; affected += deleted;
} }

@ -1,16 +0,0 @@
use thiserror::Error;
use diesel::result::Error as DslError;
pub type USrvResult<T> = Result<T, USrvError>;
#[derive(Error, Debug)]
pub enum USrvError {
#[error("{0} is not found")]
NotFound(String),
#[error("Error processing {0}")]
ProcessingError(String),
#[error(transparent)]
DBError(#[from] DslError)
}

@ -1,119 +1,78 @@
use crate::db::{lock_db, UDB};
use diesel::SaveChangesDsl;
use std::fmt::Display; use std::fmt::Display;
use u_lib::{ use u_lib::{
models::* messaging::{BaseMessage, ToMsg},
models::{ExactJob, IAgent, JobMeta, JobState},
ULocalError,
}; };
use uuid::Uuid;
use warp::{ use warp::{
Rejection, http::{Response, StatusCode},
Reply, reply::with_status,
reply::{with_status}, Rejection, Reply,
http::{StatusCode, Response}
}; };
use crate::db::{
Storage,
UDB,
};
use uuid::Uuid;
use diesel::SaveChangesDsl;
use crate::errors::USrvError;
fn build_response<S: Display>(code: StatusCode, body: S) -> Result<Response<String>, Rejection> { fn build_response<S: Display>(code: StatusCode, body: S) -> Result<Response<String>, Rejection> {
Ok(Response::builder() Ok(Response::builder()
.status(code) .status(code)
.body(format!("{}", body)).unwrap()) .body(format!("{}", body))
.unwrap())
} }
fn build_empty_200() -> Result<Response<String>, Rejection> { fn build_empty_200() -> Result<Response<String>, Rejection> {
build_response(StatusCode::OK, "") build_response(StatusCode::OK, "")
} }
pub async fn add_agent( pub async fn add_agent(msg: BaseMessage<'_, IAgent>) -> Result<impl Reply, Rejection> {
msg: BaseMessage<'_, IAgent>, match lock_db().insert_agents(&msg.into_inner()) {
db: Storage) -> Result<impl Reply, Rejection>
{
match db.lock()
.unwrap()
.new_agent(&msg.into_inner()) {
Ok(_) => build_empty_200(), Ok(_) => build_empty_200(),
Err(e) => build_response(StatusCode::BAD_REQUEST, e) Err(e) => build_response(StatusCode::BAD_REQUEST, e),
} }
} }
pub async fn get_agents( pub async fn get_agents(uid: Option<Uuid>) -> Result<impl Reply, Rejection> {
uid: Option<Uuid>, match lock_db().get_agents(uid) {
db: Storage Ok(r) => Ok(warp::reply::json(&r.as_message())),
) -> Result<impl Reply, Rejection> { Err(e) => Err(warp::reject()),
match db.lock()
.unwrap()
.get_agents(uid) {
Ok(r) => Ok(warp::reply::json(
&r.as_message()
)),
Err(e) => Err(warp::reject())
} }
} }
pub async fn get_jobs( pub async fn get_jobs(uid: Option<Uuid>) -> Result<impl Reply, Rejection> {
uid: Option<Uuid>, match lock_db().get_jobs(uid) {
db: Storage) -> Result<impl Reply, Rejection> Ok(r) => Ok(warp::reply::json(&r.as_message())),
{ Err(e) => Err(warp::reject()),
match db.lock()
.unwrap()
.get_jobs(uid) {
Ok(r) => Ok(warp::reply::json(
&r.as_message()
)),
Err(e) => Err(warp::reject())
} }
} }
pub async fn get_agent_jobs( pub async fn get_agent_jobs(uid: Option<Uuid>, personal: bool) -> Result<impl Reply, Rejection> {
uid: Option<Uuid>, let result = lock_db().get_exact_jobs(uid, personal);
db: Storage,
personal: bool) -> Result<impl Reply, Rejection>
{
let result = db.lock()
.unwrap()
.get_agent_jobs(uid, personal);
match result { match result {
Ok(r) => { Ok(r) => {
let _db = db.lock().unwrap(); let _db = lock_db();
for j in r.iter() { for j in r.iter() {
_db.update_job_status(j.id, JobState::Running).ok(); _db.update_job_status(j.id, JobState::Running).ok();
} }
Ok(warp::reply::json( Ok(warp::reply::json(&r.as_message()))
&r.as_message() }
)) Err(e) => Err(warp::reject()),
},
Err(e) => Err(warp::reject())
} }
} }
pub async fn upload_jobs( pub async fn upload_jobs(msg: BaseMessage<'_, Vec<JobMeta>>) -> Result<impl Reply, Rejection> {
msg: BaseMessage<'_, Vec<JobMeta>>, match lock_db().insert_jobs(&msg.into_inner()) {
db: Storage) -> Result<impl Reply, Rejection>
{
match db.lock()
.unwrap()
.add_jobs(&msg.into_inner()) {
Ok(_) => build_empty_200(), Ok(_) => build_empty_200(),
Err(e) => build_response(StatusCode::BAD_REQUEST, e) Err(e) => build_response(StatusCode::BAD_REQUEST, e),
} }
} }
pub async fn del( pub async fn del(uid: Uuid) -> Result<impl Reply, Rejection> {
uid: Uuid, let db = lock_db();
db: Storage) -> Result<impl Reply, Rejection> let del_fns = &[UDB::del_agents, UDB::del_jobs, UDB::del_results];
{
let db = db.lock().unwrap();
let del_fns = &[
UDB::del_agents,
UDB::del_jobs,
UDB::del_results,
];
for del_fn in del_fns { for del_fn in del_fns {
let affected = del_fn(&db, &vec![uid]).unwrap(); let affected = del_fn(&db, &vec![uid]).unwrap();
if affected > 0 { if affected > 0 {
return build_response(StatusCode::OK, affected) return build_response(StatusCode::OK, affected);
} }
} }
build_response(StatusCode::BAD_REQUEST, 0) build_response(StatusCode::BAD_REQUEST, 0)
@ -122,33 +81,30 @@ pub async fn del(
pub async fn set_jobs( pub async fn set_jobs(
agent_uid: Uuid, agent_uid: Uuid,
msg: BaseMessage<'_, Vec<Uuid>>, msg: BaseMessage<'_, Vec<Uuid>>,
db: Storage) -> Result<impl Reply, Rejection> ) -> Result<impl Reply, Rejection> {
{ match lock_db().set_jobs_for_agent(&agent_uid, &msg.into_inner()) {
match db.lock()
.unwrap()
.set_jobs_for_agent(&agent_uid, &msg.into_inner()) {
Ok(_) => build_empty_200(), Ok(_) => build_empty_200(),
Err(e) => build_response(StatusCode::BAD_REQUEST, e) Err(e) => build_response(StatusCode::BAD_REQUEST, e),
} }
} }
pub async fn report( pub async fn report(msg: BaseMessage<'_, Vec<ExactJob>>) -> Result<impl Reply, Rejection> {
msg: BaseMessage<'_, Vec<ExactJob>>, let db = lock_db();
db: Storage) -> Result<impl Reply, Rejection>
{
let db = db.lock().unwrap();
let id = msg.id; let id = msg.id;
let mut failed = vec![]; let mut failed = vec![];
for res in msg.into_inner() { for res in msg.into_inner() {
if id != res.agent_id { if id != res.agent_id {
continue continue;
} }
if let Err(e) = res.save_changes::<ExactJob>(&db.conn).map_err(USrvError::from) { if let Err(e) = res
.save_changes::<ExactJob>(&db.conn)
.map_err(ULocalError::from)
{
failed.push(e.to_string()) failed.push(e.to_string())
} }
} }
if failed.len() > 0 { if failed.len() > 0 {
let err_msg = USrvError::ProcessingError(failed.join(", ")); let err_msg = ULocalError::ProcessingError(failed.join(", "));
return build_response(StatusCode::BAD_REQUEST, err_msg); return build_response(StatusCode::BAD_REQUEST, err_msg);
} }
build_empty_200() build_empty_200()

@ -1,102 +1,89 @@
mod handlers;
mod db; mod db;
mod errors; mod handlers;
use warp::{ use warp::{body, Filter, Rejection, Reply};
Filter,
Rejection,
Reply,
body
};
extern crate log;
extern crate env_logger; extern crate env_logger;
extern crate log;
use u_lib::{ use db::lock_db;
MASTER_PORT, use serde::de::DeserializeOwned;
api::Paths, use u_lib::{api::Paths, config::MASTER_PORT, models::*};
models::*
};
use db::*;
use serde::{
de::DeserializeOwned
};
use uuid::Uuid; use uuid::Uuid;
fn get_content<M>() fn get_content<M>() -> impl Filter<Extract = (BaseMessage<'static, M>,), Error = Rejection> + Clone
-> impl Filter<Extract = (BaseMessage<'static, M>,),
Error = Rejection> + Clone
where where
M: ToMsg + Sync + Send + DeserializeOwned + 'static M: ToMsg + Sync + Send + DeserializeOwned + 'static,
{ {
body::content_length_limit(1024*64) body::content_length_limit(1024 * 64).and(body::json::<BaseMessage<M>>())
.and(body::json::<BaseMessage<M>>())
} }
fn init() {
env_logger::init();
lock_db();
}
#[tokio::main] #[tokio::main]
async fn main() { async fn main() {
env_logger::init(); init();
let infallible_none = |_| async { Ok::<(Option<Uuid>,), std::convert::Infallible>((None,)) };
let base_db = UDB::new().unwrap();
let db = warp::any().map(move || base_db.clone());
let infallible_none = |_| async {
Ok::<(Option<Uuid>,), std::convert::Infallible>((None,))
};
let new_agent = warp::post() let new_agent = warp::post()
.and(warp::path(Paths::init)) .and(warp::path(Paths::init))
.and(get_content::<IAgent>()) .and(get_content::<IAgent>())
.and(db.clone())
.and_then(handlers::add_agent); .and_then(handlers::add_agent);
let get_agents = warp::get() let get_agents = warp::get()
.and(warp::path(Paths::get_agents)) .and(warp::path(Paths::get_agents))
.and(warp::path::param::<Uuid>().map(Some).or_else(infallible_none)) .and(
.and(db.clone()) warp::path::param::<Uuid>()
.map(Some)
.or_else(infallible_none),
)
.and_then(handlers::get_agents); .and_then(handlers::get_agents);
let upload_jobs = warp::post() let upload_jobs = warp::post()
.and(warp::path(Paths::upload_jobs)) .and(warp::path(Paths::upload_jobs))
.and(get_content::<Vec<JobMeta>>()) .and(get_content::<Vec<JobMeta>>())
.and(db.clone())
.and_then(handlers::upload_jobs); .and_then(handlers::upload_jobs);
let get_jobs = warp::get() let get_jobs = warp::get()
.and(warp::path(Paths::get_jobs)) .and(warp::path(Paths::get_jobs))
.and(warp::path::param::<Uuid>().map(Some).or_else(infallible_none)) .and(
.and(db.clone()) warp::path::param::<Uuid>()
.map(Some)
.or_else(infallible_none),
)
.and_then(handlers::get_jobs); .and_then(handlers::get_jobs);
let get_agent_jobs = warp::get() let get_agent_jobs = warp::get()
.and(warp::path(Paths::get_agent_jobs)) .and(warp::path(Paths::get_agent_jobs))
.and(warp::path::param::<Uuid>().map(Some).or_else(infallible_none)) .and(
.and(db.clone()) warp::path::param::<Uuid>()
.and_then(|uid, db| handlers::get_agent_jobs(uid, db, false)); .map(Some)
.or_else(infallible_none),
)
.and_then(|uid| handlers::get_agent_jobs(uid, false));
let get_personal_jobs = warp::get() let get_personal_jobs = warp::get()
.and(warp::path(Paths::get_agent_jobs)) .and(warp::path(Paths::get_agent_jobs))
.and(warp::path::param::<Uuid>().map(Some)) .and(warp::path::param::<Uuid>().map(Some))
.and(db.clone()) .and_then(|uid| handlers::get_agent_jobs(uid, true));
.and_then(|uid, db| handlers::get_agent_jobs(uid, db, true));
let del = warp::get() let del = warp::get()
.and(warp::path(Paths::del)) .and(warp::path(Paths::del))
.and(warp::path::param::<Uuid>()) .and(warp::path::param::<Uuid>())
.and(db.clone())
.and_then(handlers::del); .and_then(handlers::del);
let set_jobs = warp::post() let set_jobs = warp::post()
.and(warp::path(Paths::set_jobs)) .and(warp::path(Paths::set_jobs))
.and(warp::path::param::<Uuid>()) .and(warp::path::param::<Uuid>())
.and(get_content::<Vec<Uuid>>()) .and(get_content::<Vec<Uuid>>())
.and(db.clone())
.and_then(handlers::set_jobs); .and_then(handlers::set_jobs);
let report = warp::post() let report = warp::post()
.and(warp::path(Paths::report)) .and(warp::path(Paths::report))
.and(get_content::<Vec<ExactJob>>()) .and(get_content::<Vec<ExactJob>>())
.and(db.clone())
.and_then(handlers::report); .and_then(handlers::report);
let auth_token = warp::header::exact("authorization", "Bearer 123qwe"); let auth_token = warp::header::exact("authorization", "Bearer 123qwe");
@ -104,32 +91,29 @@ async fn main() {
let agent_zone = new_agent let agent_zone = new_agent
.or(get_jobs.clone()) .or(get_jobs.clone())
.or(get_personal_jobs) .or(get_personal_jobs)
.or(report) .or(report);
;
let auth_zone = auth_token let auth_zone = auth_token.and(
.and(
get_agents get_agents
.or(get_jobs) .or(get_jobs)
.or(upload_jobs) .or(upload_jobs)
.or(del) .or(del)
.or(set_jobs) .or(set_jobs)
.or(get_agent_jobs) .or(get_agent_jobs),
); );
let routes = agent_zone let routes = agent_zone.or(auth_zone);
.or(auth_zone);
warp::serve(routes.with(warp::log("warp"))) warp::serve(routes.with(warp::log("warp")))
.run(([0,0,0,0], MASTER_PORT)).await; .run(([0, 0, 0, 0], MASTER_PORT))
.await;
} }
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
/* /*
#[tokio::test] #[tokio::test]
async fn test_gather() { async fn test_gather() {
} }
*/ */
} }

@ -22,6 +22,7 @@ env_logger = "0.8.3"
diesel-derive-enum = { version = "1", features = ["postgres"] } diesel-derive-enum = { version = "1", features = ["postgres"] }
chrono = "0.4.19" chrono = "0.4.19"
strum = { version = "0.20", features = ["derive"] } strum = { version = "0.20", features = ["derive"] }
once_cell = "1.7.2"
[dependencies.diesel] [dependencies.diesel]
version = "1.4.5" version = "1.4.5"

@ -1,22 +1,12 @@
#[allow(non_upper_case_globals)] #[allow(non_upper_case_globals)]
use crate::{ use crate::{
MASTER_SERVER, config::{MASTER_PORT, MASTER_SERVER},
MASTER_PORT,
models::*, models::*,
UResult, utils::opt_to_string,
UError, UError, UResult,
utils::opt_to_string
};
use reqwest::{
Client,
Url,
RequestBuilder
};
use std::{
net::Ipv4Addr,
str::FromStr
}; };
use reqwest::{Client, RequestBuilder, Url};
use std::{net::Ipv4Addr, str::FromStr};
use uuid::Uuid; use uuid::Uuid;
pub struct Paths; pub struct Paths;
@ -132,11 +122,10 @@ macro_rules! build_handler {
}; };
} }
pub struct ClientHandler { pub struct ClientHandler {
base_url: Url, base_url: Url,
client: Client, client: Client,
password: Option<String> password: Option<String>,
} }
impl ClientHandler { impl ClientHandler {
@ -146,10 +135,8 @@ impl ClientHandler {
.unwrap_or(MASTER_SERVER); .unwrap_or(MASTER_SERVER);
Self { Self {
client: Client::new(), client: Client::new(),
base_url: Url::parse( base_url: Url::parse(&format!("http://{}:{}", master_server, MASTER_PORT)).unwrap(),
&format!("http://{}:{}", master_server, MASTER_PORT) password: None,
).unwrap(),
password: None
} }
} }
@ -161,7 +148,7 @@ impl ClientHandler {
fn set_pwd(&self, rb: RequestBuilder) -> RequestBuilder { fn set_pwd(&self, rb: RequestBuilder) -> RequestBuilder {
match &self.password { match &self.password {
Some(p) => rb.bearer_auth(p), Some(p) => rb.bearer_auth(p),
None => rb None => rb,
} }
} }

@ -1,12 +1,11 @@
use diesel::result::Error as DslError;
use reqwest::Error as ReqError; use reqwest::Error as ReqError;
use serde::{ use serde::{Deserialize, Serialize};
Serialize,
Deserialize
};
use thiserror::Error; use thiserror::Error;
use uuid::Uuid; use uuid::Uuid;
pub type UResult<T> = std::result::Result<T, UError>; pub type UResult<T> = std::result::Result<T, UError>;
pub type ULocalResult<T> = std::result::Result<T, ULocalError>;
#[derive(Error, Debug, Serialize, Deserialize, Clone)] #[derive(Error, Debug, Serialize, Deserialize, Clone)]
pub enum UError { pub enum UError {
@ -32,7 +31,7 @@ pub enum UError {
NoJob(Uuid), NoJob(Uuid),
#[error("Error opening {0}: {1}")] #[error("Error opening {0}: {1}")]
FilesystemError(String, String) FilesystemError(String, String),
} }
impl From<ReqError> for UError { impl From<ReqError> for UError {
@ -40,3 +39,15 @@ impl From<ReqError> for UError {
UError::NetError(e.to_string(), String::new()) UError::NetError(e.to_string(), String::new())
} }
} }
#[derive(Error, Debug)]
pub enum ULocalError {
#[error("{0} is not found")]
NotFound(String),
#[error("Error processing {0}")]
ProcessingError(String),
#[error(transparent)]
DBError(#[from] DslError),
}

@ -2,16 +2,16 @@
// job runner (thread) // job runner (thread)
// every job runs in other thread/process // every job runs in other thread/process
use crate::{models::*, UResult, OneOrMany}; use crate::{models::*, utils::OneOrMany, UResult};
use std::collections::HashMap; use std::collections::HashMap;
use futures::{lock::{Mutex, MutexGuard}, future::BoxFuture}; use futures::{future::BoxFuture, lock::Mutex};
use lazy_static::lazy_static; use lazy_static::lazy_static;
use tokio::{ use tokio::{
spawn, spawn,
sync::mpsc::{channel, Receiver, Sender},
task::JoinHandle, task::JoinHandle,
sync::mpsc::{channel, Receiver, Sender}
}; };
use uuid::Uuid; use uuid::Uuid;
@ -27,33 +27,33 @@ lazy_static! {
}; };
} }
async fn get_sender() -> Sender<Uuid> { fn get_sender() -> Sender<Uuid> {
FUT_CHANNEL.0.clone() FUT_CHANNEL.0.clone()
} }
struct JoinInfo { struct JoinInfo {
handle: JoinHandle<FutRes>, handle: JoinHandle<FutRes>,
completed: bool, completed: bool,
collectable: bool // indicates if future can be popped from pool via pop_task_if_completed collectable: bool, // indicates if future can be popped from pool via pop_task_if_completed
} }
pub struct Waiter { pub struct Waiter {
tasks: Vec<DynFut>, tasks: Vec<DynFut>,
fids: Vec<Uuid> fids: Vec<Uuid>,
} }
impl Waiter { impl Waiter {
pub fn new<S: OneOrMany<DynFut>>(tasks: S) -> Self { pub fn new<S: OneOrMany<DynFut>>(tasks: S) -> Self {
Self { Self {
tasks: tasks.into_vec(), tasks: tasks.into_vec(),
fids: vec![] fids: vec![],
} }
} }
pub async fn spawn(mut self) -> Self { pub async fn spawn(mut self) -> Self {
let collectable = true; //TODO: self.tasks.len() != 1; let collectable = true; //TODO: self.tasks.len() != 1;
for f in self.tasks.drain(..) { for f in self.tasks.drain(..) {
let tx = get_sender().await; let tx = get_sender();
let fid = Uuid::new_v4(); let fid = Uuid::new_v4();
self.fids.push(fid); self.fids.push(fid);
let task_wrapper = async move { let task_wrapper = async move {
@ -66,7 +66,7 @@ impl Waiter {
let handle = JoinInfo { let handle = JoinInfo {
handle: spawn(task_wrapper), handle: spawn(task_wrapper),
completed: false, completed: false,
collectable collectable,
}; };
debug!("before push: {}", fid); debug!("before push: {}", fid);
//spawn(async {}).await.ok(); //spawn(async {}).await.ok();
@ -98,10 +98,6 @@ impl Waiter {
} }
} }
async fn pop_task(fid: Uuid) -> Option<JoinInfo> {
FUT_RESULTS.lock().await.remove(&fid)
}
async fn init_receiver() { async fn init_receiver() {
while let Some(fid) = FUT_CHANNEL.1.lock().await.recv().await { while let Some(fid) = FUT_CHANNEL.1.lock().await.recv().await {
//info!("init_receiver: next val: {}", &fid); //info!("init_receiver: next val: {}", &fid);
@ -114,13 +110,18 @@ async fn init_receiver() {
} }
} }
pub async fn pop_task_if_completed(fid: Uuid) -> Option<FutRes> { async fn pop_task(fid: Uuid) -> Option<JoinInfo> {
let &mut JoinInfo {handle: _, collectable, completed} = match FUT_RESULTS FUT_RESULTS.lock().await.remove(&fid)
.lock() }
.await
.get_mut(&fid) { async fn pop_task_if_completed(fid: Uuid) -> Option<FutRes> {
let &mut JoinInfo {
handle: _,
collectable,
completed,
} = match FUT_RESULTS.lock().await.get_mut(&fid) {
Some(t) => t, Some(t) => t,
None => return None None => return None,
}; };
if collectable && completed { if collectable && completed {
let task = pop_task(fid).await.unwrap(); let task = pop_task(fid).await.unwrap();
@ -133,7 +134,8 @@ pub async fn pop_task_if_completed(fid: Uuid) -> Option<FutRes> {
pub async fn pop_completed() -> Vec<FutRes> { pub async fn pop_completed() -> Vec<FutRes> {
let mut completed: Vec<FutRes> = vec![]; let mut completed: Vec<FutRes> = vec![];
let fids = FUT_RESULTS.lock() let fids = FUT_RESULTS
.lock()
.await .await
.keys() .keys()
.map(|k| *k) .map(|k| *k)

@ -1,19 +1,15 @@
#![allow(non_upper_case_globals)] #![allow(non_upper_case_globals)]
pub mod executor; pub mod api;
pub mod config; pub mod config;
pub mod utils;
pub mod errors; pub mod errors;
pub mod api; pub mod executor;
pub mod models;
pub mod messaging; pub mod messaging;
pub mod models;
pub mod utils;
pub use { pub use config::UID;
utils::*, pub use errors::{UError, ULocalError, ULocalResult, UResult};
config::*, pub use models::{Jobstate, Jobtype}; // for schema
executor::*,
errors::*,
models::*
};
#[macro_use] #[macro_use]
extern crate lazy_static; extern crate lazy_static;

@ -1,20 +1,15 @@
use serde::{ use diesel::{Identifiable, Insertable, Queryable};
Serialize, use serde::{Deserialize, Serialize};
Deserialize
};
use std::time::SystemTime;
use std::fmt; use std::fmt;
use diesel::{ use std::time::SystemTime;
Queryable,
Identifiable,
Insertable
};
use crate::{ use crate::{
models::*, models::{
jobs::{build_jobs, JobMeta, JobOutput},
schema::*,
},
utils::{systime_to_string, vec_to_string},
UID, UID,
utils::*,
models::schema::*,
}; };
use guess_host_triple::guess_host_triple; use guess_host_triple::guess_host_triple;
@ -34,12 +29,12 @@ pub struct Agent {
pub regtime: SystemTime, pub regtime: SystemTime,
pub status: Option<String>, pub status: Option<String>,
pub token: Option<String>, pub token: Option<String>,
pub username: String pub username: String,
} }
impl fmt::Display for Agent { impl fmt::Display for Agent {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
let mut out = format!("Agent {}", self.id); let mut out = format!("Agent: {}", self.id);
if self.alias.is_some() { if self.alias.is_some() {
out += &format!(" ({})", self.alias.as_ref().unwrap()) out += &format!(" ({})", self.alias.as_ref().unwrap())
} }
@ -64,11 +59,11 @@ pub struct IAgent {
pub platform: String, pub platform: String,
pub status: Option<String>, pub status: Option<String>,
pub token: Option<String>, pub token: Option<String>,
pub username: String pub username: String,
} }
impl IAgent {
pub async fn gather() -> IAgent { pub async fn gather() -> Self {
async fn run_cmd_fast<S: Into<String>>(cmd: S) -> String { async fn run_cmd_fast<S: Into<String>>(cmd: S) -> String {
let jm = JobMeta::from_shell(cmd); let jm = JobMeta::from_shell(cmd);
let job_result = build_jobs(jm) let job_result = build_jobs(jm)
@ -83,7 +78,7 @@ pub async fn gather() -> IAgent {
} }
#[cfg(unix)] #[cfg(unix)]
IAgent { Self {
alias: None, alias: None,
id: UID.clone(), id: UID.clone(),
hostname: run_cmd_fast("hostname").await, hostname: run_cmd_fast("hostname").await,
@ -94,20 +89,16 @@ pub async fn gather() -> IAgent {
token: None, token: None,
username: run_cmd_fast("id -un").await, username: run_cmd_fast("id -un").await,
} }
}
} }
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
#[tokio::test] #[tokio::test]
async fn test_gather() { async fn test_gather() {
let cli_info = gather().await; let cli_info = IAgent::gather().await;
assert_eq!( assert_eq!(&cli_info.username, "plazmoid")
&cli_info.username,
"plazmoid"
)
} }
} }

@ -1,570 +0,0 @@
use std::{
time::{SystemTime, Duration},
thread,
sync::{RwLock, RwLockReadGuard},
cmp::PartialEq,
fmt,
string::ToString,
path::PathBuf,
fs,
process::Output,
collections::HashMap,
ops::Deref,
};
use serde::{
Serialize,
Deserialize
};
use uuid::Uuid;
use guess_host_triple::guess_host_triple;
use tokio::{
process::Command
};
use crate::{
utils::systime_to_string,
models::schema::*,
Agent,
UError,
UResult,
UID,
Waiter,
OneOrMany,
DynFut,
};
use diesel_derive_enum::DbEnum;
use diesel::{
Queryable,
Identifiable,
Insertable,
};
use strum::Display;
type Cache = HashMap<Uuid, JobMeta>;
lazy_static! {
static ref JOB_CACHE: RwLock<Cache> = RwLock::new(HashMap::new());
}
pub struct JobCache;
impl JobCache {
pub fn insert(job_meta: JobMeta) {
JOB_CACHE.write().unwrap().insert(job_meta.id, job_meta);
}
pub fn contains(uid: &Uuid) -> bool {
JOB_CACHE.read().unwrap().contains_key(uid)
}
pub fn get(uid: &Uuid) -> Option<JobCacheHolder> {
if !Self::contains(uid) {
return None
}
let lock = JOB_CACHE.read().unwrap();
Some(JobCacheHolder(lock, uid))
}
pub fn remove(uid: &Uuid) {
JOB_CACHE.write().unwrap().remove(uid);
}
}
pub struct JobCacheHolder<'jm>(pub RwLockReadGuard<'jm, Cache>, pub &'jm Uuid);
impl<'jm> Deref for JobCacheHolder<'jm> {
type Target = JobMeta;
fn deref(&self) -> &Self::Target {
self.0.get(self.1).unwrap()
}
}
#[derive(Serialize, Deserialize, Clone, Debug)]
pub enum ManageAction {
Ping,
UpdateAvailable,
JobsResultsRequest,
Terminate
}
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
pub enum JobSchedule {
Once,
Permanent,
//Scheduled
}
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, DbEnum, Display)]
#[PgType = "JobState"]
#[DieselType = "Jobstate"]
pub enum JobState {
Queued, // server created a job, but client didn't get it yet
//Pending, // client got a job, but not running yet
Running, // client is currently running a job
Finished,
}
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, DbEnum, Display)]
#[PgType = "JobType"]
#[DieselType = "Jobtype"]
pub enum JobType {
Manage,
Shell,
Python,
Binary,
Dummy
}
#[derive(Clone, Debug)]
pub struct JobOutput {
pub stdout: Vec<u8>,
pub stderr: Vec<u8>,
}
impl JobOutput {
const STREAM_BORDER: &'static str = "***";
const STDOUT: &'static str = "STDOUT";
const STDERR: &'static str = "STDERR";
#[inline]
fn create_delim(header: &'static str) -> String {
format!("{border} {head} {border}\n",
border = JobOutput::STREAM_BORDER,
head = header
)
}
pub fn new() -> Self {
Self {
stdout: vec![],
stderr: vec![],
}
}
pub fn stdout(mut self, data: Vec<u8>) -> Self {
self.stdout = data;
self
}
pub fn stderr(mut self, data: Vec<u8>) -> Self {
self.stderr = data;
self
}
pub fn multiline(&self) -> Vec<u8> {
let mut result: Vec<u8> = vec![];
if self.stdout.len() > 0 {
result.extend(JobOutput::create_delim(JobOutput::STDOUT).into_bytes());
result.extend(&self.stdout);
result.push(b'\n');
}
if self.stderr.len() > 0 {
result.extend(JobOutput::create_delim(JobOutput::STDERR).into_bytes());
result.extend(&self.stderr);
result.push(b'\n');
}
result
}
pub fn from_raw(raw: &[u8]) -> Option<Self> {
let raw = String::from_utf8_lossy(raw);
let err_header = JobOutput::create_delim(JobOutput::STDERR);
raw.strip_prefix(&JobOutput::create_delim(JobOutput::STDOUT))
.map(|s: &str| {
let mut parts = s.split(&err_header)
.map(|d| d.trim().as_bytes().to_vec())
.collect::<Vec<Vec<u8>>>()
.into_iter();
JobOutput::new()
.stdout(parts.next().unwrap())
.stderr(parts.next().unwrap_or(vec![]))
})
}
pub fn into_appropriate(self) -> Vec<u8> {
if self.stdout.len() > 0 {
self.stdout
} else if self.stderr.len() > 0 {
self.stderr
} else {
b"No data".to_vec()
}
}
}
#[derive(
Serialize,
Deserialize,
Clone,
Debug,
Queryable,
Identifiable,
Insertable
)]
#[table_name = "jobs"]
pub struct JobMeta {
pub alias: String,
pub id: Uuid,
pub exec_type: JobType,
//pub schedule: JobSchedule,
pub platform: String,
pub payload: Option<Vec<u8>>,
}
impl fmt::Display for JobMeta {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
let mut out = format!("Job {}", self.id);
out += &format!(" ({})", self.alias);
out += &format!("\nExecutable type: {}", self.exec_type);
out += &format!("\nPlatform: {}", self.platform);
if self.exec_type == JobType::Shell && self.payload.is_some() {
out += &format!("\nPayload: {}", String::from_utf8_lossy(self.payload.as_ref().unwrap()));
}
write!(f, "{}", out)
}
}
impl JobMeta {
pub fn from_shell<S: Into<String>>(shell_cmd: S) -> Self {
let shell_cmd = shell_cmd.into();
let job_name = shell_cmd.split(" ").nth(0).unwrap();
Self {
alias: job_name.to_string(),
payload: Some(shell_cmd.into_bytes()),
..Default::default()
}
}
/*
pub fn from_file(path: PathBuf) -> UResult<Self> {
let data = fs::read(path)
.map_err(|e| UError::FilesystemError(
path.to_string_lossy().to_string(),
e.to_string()
))?;
let filename = path.file_name().unwrap().to_str().unwrap();
}*/
}
impl Default for JobMeta {
fn default() -> Self {
Self {
id: Uuid::new_v4(),
alias: String::new(),
exec_type: JobType::Shell,
platform: guess_host_triple().unwrap_or("unknown").to_string(),
payload: None
}
}
}
#[derive(
Serialize,
Deserialize,
Clone,
Debug,
Queryable,
Identifiable,
Insertable,
AsChangeset,
)]
#[table_name = "results"]
pub struct ExactJob {
pub agent_id: Uuid,
pub created: SystemTime,
pub id: Uuid,
pub job_id: Uuid,
pub result: Option<Vec<u8>>,
pub state: JobState,
pub retcode: Option<i32>,
pub updated: SystemTime,
}
impl fmt::Display for ExactJob {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
let mut out = format!("Result {}", self.id);
out += &format!("\nAgent {}", self.agent_id);
out += &format!("\nJob: {}", self.job_id);
out += &format!("\nUpdated: {}", systime_to_string(&self.updated));
out += &format!("\nState: {}", self.state);
if self.state == JobState::Finished {
if self.retcode.is_some() {
out += &format!("\nReturn code: {}", self.retcode.unwrap());
}
if self.result.is_some() {
out += &format!("\nResult: {}", String::from_utf8_lossy(self.result.as_ref().unwrap()));
}
}
write!(f, "{}", out)
}
}
impl ExactJob {
pub fn from_meta(job_id: Uuid, result_id: Option<Uuid>) -> Self {
Self {
id: result_id.unwrap_or(Uuid::new_v4()),
agent_id: *UID,
job_id,
..Default::default()
}
}
//pub fn as_job_output(&self) -> JobOutput {}
}
impl Default for ExactJob {
fn default() -> Self {
Self {
agent_id: Uuid::nil(),
created: SystemTime::now(),
id: Uuid::new_v4(),
job_id: Uuid::nil(),
result: None,
state: JobState::Queued,
retcode: None,
updated: SystemTime::now()
}
}
}
pub struct Job {
exec_type: JobType,
payload: Option<Vec<u8>>,
result: ExactJob
}
impl Job {
fn build(job_meta: &JobMeta, result_id: Uuid) -> UResult<Self> {
match job_meta.exec_type {
JobType::Shell => {
let curr_platform = guess_host_triple().unwrap_or("unknown").to_string();
if job_meta.platform != curr_platform {
return Err(UError::InsuitablePlatform(
job_meta.platform.clone(), curr_platform
))
}
let job_meta = job_meta.clone();
Ok(Self {
exec_type: job_meta.exec_type,
payload: job_meta.payload,
result: ExactJob::from_meta(job_meta.id.clone(), Some(result_id))
})
},
_ => todo!()
}
}
async fn run(mut self) -> UResult<ExactJob> {
match self.exec_type {
JobType::Shell => {
let str_payload = match &self.payload {
Some(box_payload) => {
String::from_utf8_lossy(box_payload).into_owned()
}
None => unimplemented!()
};
let mut cmd_parts = str_payload
.split(" ")
.map(String::from)
.collect::<Vec<String>>()
.into_iter();
let cmd = cmd_parts.nth(0).unwrap();
let args = cmd_parts.collect::<Vec<_>>();
let cmd_result = Command::new(cmd)
.args(args)
.output()
.await;
let (data, retcode) = match cmd_result {
Ok(Output {status, stdout, stderr}) => {
(
Some(JobOutput::new()
.stdout(stdout)
.stderr(stderr)
.multiline()
),
status.code()
)
}
Err(e) => {
(
Some(UError::JobError(e.to_string()).to_string().into_bytes()),
None
)
}
};
self.result.result = data;
self.result.retcode = retcode;
self.result.updated = SystemTime::now();
self.result.state = JobState::Finished;
},
_ => todo!()
}
Ok(self.result)
}
}
pub fn build_jobs_with_result<J: OneOrMany<ExactJob>>(job_requests: J) -> Waiter {
let prepared_jobs = job_requests.into_vec()
.into_iter()
.filter_map(|jr| -> Option<DynFut> {
let job = {
let job_meta = JobCache::get(&jr.job_id);
if job_meta.is_none() {
Err(UError::NoJob(jr.job_id))
} else {
Job::build(&*job_meta.unwrap(), jr.id)
}
};
match job {
Ok(j) => Some(Box::pin(j.run())),
Err(e) => {
warn!("Job building error: {}", e);
None
}
}
}).collect::<Vec<DynFut>>();
Waiter::new(prepared_jobs)
}
pub fn build_jobs<J: OneOrMany<JobMeta>>(job_metas: J) -> Waiter {
let job_requests = job_metas.into_vec().into_iter().map(|jm| {
let j_uid = jm.id;
JobCache::insert(jm);
ExactJob::from_meta(j_uid, None)
}).collect::<Vec<ExactJob>>();
build_jobs_with_result(job_requests)
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{build_jobs, utils::vec_to_string, pop_completed};
#[tokio::test]
async fn test_is_really_async() {
const SLEEP_SECS: u64 = 1;
let job = JobMeta::from_shell(format!("sleep {}", SLEEP_SECS));
let sleep_jobs: Vec<JobMeta> = (0..50).map(|_| job.clone()).collect();
let now = SystemTime::now();
build_jobs(sleep_jobs).run_until_complete().await;
assert!(now.elapsed().unwrap().as_secs() < SLEEP_SECS+2)
}
#[tokio::test]
async fn test_shell_job() -> UResult<()> {
let job = JobMeta::from_shell("whoami");
let job_result = build_jobs(job)
.run_one_until_complete()
.await;
let stdout = JobOutput::from_raw(
&job_result.unwrap().result.unwrap()
).unwrap().stdout;
assert_eq!(
vec_to_string(&stdout).trim(),
"plazmoid"
);
Ok(())
}
#[tokio::test]
async fn test_complex_load() -> UResult<()> {
const SLEEP_SECS: u64 = 1;
let now = SystemTime::now();
let longest_job = JobMeta::from_shell(format!("sleep {}", SLEEP_SECS));
let longest_job = build_jobs(longest_job).spawn().await;
let ls = build_jobs(JobMeta::from_shell("ls"))
.run_one_until_complete()
.await
.unwrap();
assert_eq!(ls.retcode.unwrap(), 0);
let result = JobOutput::from_raw(&ls.result.unwrap()).unwrap();
let folders = String::from_utf8_lossy(
&result.stdout
);
let subfolders_jobs: Vec<JobMeta> = folders
.lines()
.map(|f| JobMeta::from_shell(format!("ls {}", f)))
.collect();
let ls_subfolders = build_jobs(
subfolders_jobs
).run_until_complete().await;
for result in ls_subfolders {
assert_eq!(result.unwrap().retcode.unwrap(), 0);
}
longest_job.wait().await;
assert_eq!(now.elapsed().unwrap().as_secs(), SLEEP_SECS);
Ok(())
}
/*
#[tokio::test]
async fn test_exec_multiple_jobs_nowait() -> UResult<()> {
const REPEATS: usize = 10;
let job = JobMeta::from_shell("whoami");
let sleep_jobs: Vec<JobMeta> = (0..=REPEATS).map(|_| job.clone()).collect();
build_jobs(sleep_jobs).spawn().await;
let mut completed = 0;
while completed < REPEATS {
let c = pop_completed().await.len();
if c > 0 {
completed += c;
println!("{}", c);
}
}
Ok(())
}
*/
#[tokio::test]
async fn test_failing_shell_job() -> UResult<()> {
let job = JobMeta::from_shell("lol_kek_puk");
let job_result = build_jobs(job).run_one_until_complete().await.unwrap();
let output = JobOutput::from_raw(&job_result.result.unwrap());
assert!(output.is_none());
assert!(job_result.retcode.is_none());
Ok(())
}
#[test]
fn test_to_multiline() {
let mut output = JobOutput::new();
output.stdout = b"lol".to_vec();
output.stderr = b"kek".to_vec();
assert_eq!(
vec_to_string(&output.multiline()),
String::from(
"*** STDOUT ***\n\
lol\n\
*** STDERR ***\n\
kek\n"
)
)
}
#[test]
fn test_to_multiline_stderr_only() {
let mut output = JobOutput::new();
output.stderr = b"kek".to_vec();
assert_eq!(
vec_to_string(&output.multiline()),
String::from(
"*** STDERR ***\n\
kek\n"
)
)
}
#[test]
fn test_from_multiline() {
let txt = "*** STDOUT ***\n\
puk\n".as_bytes();
let output = JobOutput::from_raw(txt).unwrap();
assert_eq!(
vec_to_string(&output.stdout),
"puk".to_string()
);
assert_eq!(output.stderr.len(), 0);
}
}

@ -0,0 +1,116 @@
use super::{ExactJob, JobCache, JobMeta, JobOutput, JobState, JobType};
use crate::{
executor::{DynFut, Waiter},
utils::OneOrMany,
UError, UResult,
};
use guess_host_triple::guess_host_triple;
use std::{process::Output, time::SystemTime};
use tokio::process::Command;
use uuid::Uuid;
pub struct Job {
exec_type: JobType,
payload: Option<Vec<u8>>,
result: ExactJob,
}
impl Job {
fn build(job_meta: &JobMeta, result_id: Uuid) -> UResult<Self> {
match job_meta.exec_type {
JobType::Shell => {
let curr_platform = guess_host_triple().unwrap_or("unknown").to_string();
if job_meta.platform != curr_platform {
return Err(UError::InsuitablePlatform(
job_meta.platform.clone(),
curr_platform,
));
}
let job_meta = job_meta.clone();
Ok(Self {
exec_type: job_meta.exec_type,
payload: job_meta.payload,
result: ExactJob::from_meta(job_meta.id.clone(), Some(result_id)),
})
}
_ => todo!(),
}
}
async fn run(mut self) -> UResult<ExactJob> {
match self.exec_type {
JobType::Shell => {
let str_payload = match &self.payload {
Some(box_payload) => String::from_utf8_lossy(box_payload).into_owned(),
None => unimplemented!(),
};
let mut cmd_parts = str_payload
.split(" ")
.map(String::from)
.collect::<Vec<String>>()
.into_iter();
let cmd = cmd_parts.nth(0).unwrap();
let args = cmd_parts.collect::<Vec<_>>();
let cmd_result = Command::new(cmd).args(args).output().await;
let (data, retcode) = match cmd_result {
Ok(Output {
status,
stdout,
stderr,
}) => (
JobOutput::new().stdout(stdout).stderr(stderr).multiline(),
status.code(),
),
Err(e) => (
UError::JobError(e.to_string()).to_string().into_bytes(),
None,
),
};
self.result.result = Some(data);
self.result.retcode = retcode;
self.result.updated = SystemTime::now();
self.result.state = JobState::Finished;
}
_ => todo!(),
}
Ok(self.result)
}
}
pub fn build_jobs_with_result<J: OneOrMany<ExactJob>>(job_requests: J) -> Waiter {
let prepared_jobs = job_requests
.into_vec()
.into_iter()
.filter_map(|jr| -> Option<DynFut> {
let job = {
let job_meta = JobCache::get(&jr.job_id);
if job_meta.is_none() {
Err(UError::NoJob(jr.job_id))
} else {
Job::build(&*job_meta.unwrap(), jr.id)
}
};
match job {
Ok(j) => Some(Box::pin(j.run())),
Err(e) => {
warn!("Job building error: {}", e);
None
}
}
})
.collect::<Vec<DynFut>>();
Waiter::new(prepared_jobs)
}
pub fn build_jobs<J: OneOrMany<JobMeta>>(job_metas: J) -> Waiter {
let job_requests = job_metas
.into_vec()
.into_iter()
.map(|jm| {
let j_uid = jm.id;
JobCache::insert(jm);
ExactJob::from_meta(j_uid, None)
})
.collect::<Vec<ExactJob>>();
build_jobs_with_result(job_requests)
}

@ -0,0 +1,47 @@
use crate::models::JobMeta;
use std::{
collections::HashMap,
ops::Deref,
sync::{RwLock, RwLockReadGuard},
};
use uuid::Uuid;
type Cache = HashMap<Uuid, JobMeta>;
lazy_static! {
static ref JOB_CACHE: RwLock<Cache> = RwLock::new(HashMap::new());
}
pub struct JobCache;
impl JobCache {
pub fn insert(job_meta: JobMeta) {
JOB_CACHE.write().unwrap().insert(job_meta.id, job_meta);
}
pub fn contains(uid: &Uuid) -> bool {
JOB_CACHE.read().unwrap().contains_key(uid)
}
pub fn get(uid: &Uuid) -> Option<JobCacheHolder> {
if !Self::contains(uid) {
return None;
}
let lock = JOB_CACHE.read().unwrap();
Some(JobCacheHolder(lock, uid))
}
pub fn remove(uid: &Uuid) {
JOB_CACHE.write().unwrap().remove(uid);
}
}
pub struct JobCacheHolder<'jm>(pub RwLockReadGuard<'jm, Cache>, pub &'jm Uuid);
impl<'jm> Deref for JobCacheHolder<'jm> {
type Target = JobMeta;
fn deref(&self) -> &Self::Target {
self.0.get(self.1).unwrap()
}
}

@ -0,0 +1,68 @@
use super::JobType;
use crate::models::schema::*;
use diesel::{Identifiable, Insertable, Queryable};
use guess_host_triple::guess_host_triple;
use serde::{Deserialize, Serialize};
use std::fmt;
use uuid::Uuid;
#[derive(Serialize, Deserialize, Clone, Debug, Queryable, Identifiable, Insertable)]
#[table_name = "jobs"]
pub struct JobMeta {
pub alias: String,
pub id: Uuid,
pub exec_type: JobType,
//pub schedule: JobSchedule,
pub platform: String,
pub payload: Option<Vec<u8>>,
}
impl fmt::Display for JobMeta {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
let mut out = format!("Job {}", self.id);
out += &format!(" ({})", self.alias);
out += &format!("\nExecutable type: {}", self.exec_type);
out += &format!("\nPlatform: {}", self.platform);
if self.exec_type == JobType::Shell && self.payload.is_some() {
out += &format!(
"\nPayload: {}",
String::from_utf8_lossy(self.payload.as_ref().unwrap())
);
}
write!(f, "{}", out)
}
}
impl JobMeta {
pub fn from_shell<S: Into<String>>(shell_cmd: S) -> Self {
let shell_cmd = shell_cmd.into();
let job_name = shell_cmd.split(" ").nth(0).unwrap();
Self {
alias: job_name.to_string(),
payload: Some(shell_cmd.into_bytes()),
..Default::default()
}
}
/*
pub fn from_file(path: PathBuf) -> UResult<Self> {
let data = fs::read(path)
.map_err(|e| UError::FilesystemError(
path.to_string_lossy().to_string(),
e.to_string()
))?;
let filename = path.file_name().unwrap().to_str().unwrap();
}*/
}
impl Default for JobMeta {
fn default() -> Self {
Self {
id: Uuid::new_v4(),
alias: String::new(),
exec_type: JobType::Shell,
platform: guess_host_triple().unwrap_or("unknown").to_string(),
payload: None,
}
}
}

@ -0,0 +1,164 @@
use diesel_derive_enum::DbEnum;
use serde::{Deserialize, Serialize};
use strum::Display;
#[derive(Serialize, Deserialize, Clone, Debug)]
pub enum ManageAction {
Ping,
UpdateAvailable,
JobsResultsRequest,
Terminate,
}
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
pub enum JobSchedule {
Once,
Permanent,
//Scheduled
}
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, DbEnum, Display)]
#[PgType = "JobState"]
#[DieselType = "Jobstate"]
pub enum JobState {
Queued, // server created a job, but client didn't get it yet
//Pending, // client got a job, but not running yet
Running, // client is currently running a job
Finished,
}
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, DbEnum, Display)]
#[PgType = "JobType"]
#[DieselType = "Jobtype"]
pub enum JobType {
Manage,
Shell,
Python,
Binary,
Dummy,
}
#[derive(Clone, Debug)]
pub struct JobOutput {
pub stdout: Vec<u8>,
pub stderr: Vec<u8>,
}
impl JobOutput {
const STREAM_BORDER: &'static str = "***";
const STDOUT: &'static str = "STDOUT";
const STDERR: &'static str = "STDERR";
#[inline]
fn create_delim(header: &'static str) -> String {
format!(
"{border} {head} {border}\n",
border = JobOutput::STREAM_BORDER,
head = header
)
}
pub fn new() -> Self {
Self {
stdout: vec![],
stderr: vec![],
}
}
pub fn stdout(mut self, data: Vec<u8>) -> Self {
self.stdout = data;
self
}
pub fn stderr(mut self, data: Vec<u8>) -> Self {
self.stderr = data;
self
}
pub fn multiline(&self) -> Vec<u8> {
let mut result: Vec<u8> = vec![];
if self.stdout.len() > 0 {
result.extend(JobOutput::create_delim(JobOutput::STDOUT).into_bytes());
result.extend(&self.stdout);
result.push(b'\n');
}
if self.stderr.len() > 0 {
result.extend(JobOutput::create_delim(JobOutput::STDERR).into_bytes());
result.extend(&self.stderr);
result.push(b'\n');
}
result
}
pub fn from_raw(raw: &[u8]) -> Option<Self> {
let raw = String::from_utf8_lossy(raw);
let err_header = JobOutput::create_delim(JobOutput::STDERR);
raw.strip_prefix(&JobOutput::create_delim(JobOutput::STDOUT))
.map(|s: &str| {
let mut parts = s
.split(&err_header)
.map(|d| d.trim().as_bytes().to_vec())
.collect::<Vec<Vec<u8>>>()
.into_iter();
JobOutput::new()
.stdout(parts.next().unwrap())
.stderr(parts.next().unwrap_or(vec![]))
})
}
pub fn into_appropriate(self) -> Vec<u8> {
if self.stdout.len() > 0 {
self.stdout
} else if self.stderr.len() > 0 {
self.stderr
} else {
b"No data".to_vec()
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::utils::vec_to_string;
#[test]
fn test_to_multiline() {
let mut output = JobOutput::new();
output.stdout = b"lol".to_vec();
output.stderr = b"kek".to_vec();
assert_eq!(
vec_to_string(&output.multiline()),
String::from(
"*** STDOUT ***\n\
lol\n\
*** STDERR ***\n\
kek\n"
)
)
}
#[test]
fn test_to_multiline_stderr_only() {
let mut output = JobOutput::new();
output.stderr = b"kek".to_vec();
assert_eq!(
vec_to_string(&output.multiline()),
String::from(
"*** STDERR ***\n\
kek\n"
)
)
}
#[test]
fn test_from_multiline() {
let txt = "*** STDOUT ***\n\
puk\n"
.as_bytes();
let output = JobOutput::from_raw(txt).unwrap();
assert_eq!(vec_to_string(&output.stdout), "puk".to_string());
assert_eq!(output.stderr.len(), 0);
}
}

@ -0,0 +1,7 @@
pub mod builder;
pub mod cache;
pub mod meta;
pub mod misc;
pub mod result;
pub use {builder::*, cache::*, meta::*, misc::*, result::*};

@ -0,0 +1,155 @@
use super::JobState;
use crate::{models::schema::*, utils::systime_to_string, UID};
use diesel::{Identifiable, Insertable, Queryable};
use serde::{Deserialize, Serialize};
use std::{fmt, time::SystemTime};
use uuid::Uuid;
#[derive(
Serialize, Deserialize, Clone, Debug, Queryable, Identifiable, Insertable, AsChangeset,
)]
#[table_name = "results"]
pub struct ExactJob {
pub agent_id: Uuid,
pub created: SystemTime,
pub id: Uuid,
pub job_id: Uuid,
pub result: Option<Vec<u8>>,
pub state: JobState,
pub retcode: Option<i32>,
pub updated: SystemTime,
}
impl Default for ExactJob {
fn default() -> Self {
Self {
agent_id: Uuid::nil(),
created: SystemTime::now(),
id: Uuid::new_v4(),
job_id: Uuid::nil(),
result: None,
state: JobState::Queued,
retcode: None,
updated: SystemTime::now(),
}
}
}
impl fmt::Display for ExactJob {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
let mut out = format!("Result {}", self.id);
out += &format!("\nAgent {}", self.agent_id);
out += &format!("\nJob: {}", self.job_id);
out += &format!("\nUpdated: {}", systime_to_string(&self.updated));
out += &format!("\nState: {}", self.state);
if self.state == JobState::Finished {
if self.retcode.is_some() {
out += &format!("\nReturn code: {}", self.retcode.unwrap());
}
if self.result.is_some() {
out += &format!(
"\nResult: {}",
String::from_utf8_lossy(self.result.as_ref().unwrap())
);
}
}
write!(f, "{}", out)
}
}
impl ExactJob {
pub fn from_meta(job_id: Uuid, result_id: Option<Uuid>) -> Self {
Self {
id: result_id.unwrap_or(Uuid::new_v4()),
agent_id: *UID,
job_id,
..Default::default()
}
}
//pub fn as_job_output(&self) -> JobOutput {}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{
models::jobs::{build_jobs, JobMeta, JobOutput},
utils::vec_to_string,
UResult,
};
#[tokio::test]
async fn test_is_really_async() {
const SLEEP_SECS: u64 = 1;
let job = JobMeta::from_shell(format!("sleep {}", SLEEP_SECS));
let sleep_jobs: Vec<JobMeta> = (0..50).map(|_| job.clone()).collect();
let now = SystemTime::now();
build_jobs(sleep_jobs).run_until_complete().await;
assert!(now.elapsed().unwrap().as_secs() < SLEEP_SECS + 2)
}
#[tokio::test]
async fn test_shell_job() -> UResult<()> {
let job = JobMeta::from_shell("whoami");
let job_result = build_jobs(job).run_one_until_complete().await;
let stdout = JobOutput::from_raw(&job_result.unwrap().result.unwrap())
.unwrap()
.stdout;
assert_eq!(vec_to_string(&stdout).trim(), "plazmoid");
Ok(())
}
#[tokio::test]
async fn test_complex_load() -> UResult<()> {
const SLEEP_SECS: u64 = 1;
let now = SystemTime::now();
let longest_job = JobMeta::from_shell(format!("sleep {}", SLEEP_SECS));
let longest_job = build_jobs(longest_job).spawn().await;
let ls = build_jobs(JobMeta::from_shell("ls"))
.run_one_until_complete()
.await
.unwrap();
assert_eq!(ls.retcode.unwrap(), 0);
let result = JobOutput::from_raw(&ls.result.unwrap()).unwrap();
let folders = String::from_utf8_lossy(&result.stdout);
let subfolders_jobs: Vec<JobMeta> = folders
.lines()
.map(|f| JobMeta::from_shell(format!("ls {}", f)))
.collect();
let ls_subfolders = build_jobs(subfolders_jobs).run_until_complete().await;
for result in ls_subfolders {
assert_eq!(result.unwrap().retcode.unwrap(), 0);
}
longest_job.wait().await;
assert_eq!(now.elapsed().unwrap().as_secs(), SLEEP_SECS);
Ok(())
}
/*
#[tokio::test]
async fn test_exec_multiple_jobs_nowait() -> UResult<()> {
const REPEATS: usize = 10;
let job = JobMeta::from_shell("whoami");
let sleep_jobs: Vec<JobMeta> = (0..=REPEATS).map(|_| job.clone()).collect();
build_jobs(sleep_jobs).spawn().await;
let mut completed = 0;
while completed < REPEATS {
let c = pop_completed().await.len();
if c > 0 {
completed += c;
println!("{}", c);
}
}
Ok(())
}
*/
#[tokio::test]
async fn test_failing_shell_job() -> UResult<()> {
let job = JobMeta::from_shell("lol_kek_puk");
let job_result = build_jobs(job).run_one_until_complete().await.unwrap();
let output = JobOutput::from_raw(&job_result.result.unwrap());
assert!(output.is_none());
assert!(job_result.retcode.is_none());
Ok(())
}
}
Loading…
Cancel
Save