diff --git a/bin/u_agent/src/main.rs b/bin/u_agent/src/main.rs index 3acc6de..5950181 100644 --- a/bin/u_agent/src/main.rs +++ b/bin/u_agent/src/main.rs @@ -13,7 +13,7 @@ use { std::env, u_lib::{ api::ClientHandler, - contracts::*, + models::*, send_jobs_to_executor, }, }; diff --git a/bin/u_panel/src/main.rs b/bin/u_panel/src/main.rs index 8801333..24e8626 100644 --- a/bin/u_panel/src/main.rs +++ b/bin/u_panel/src/main.rs @@ -18,7 +18,7 @@ async fn main() -> Result<(), &'static str> { "ls" => { let result = cli_handler.ls().await; for cli in result.iter() { - println!("{:#?}", cli) + println!("{}", cli.0) } }, _ => return Err("Unknown method") diff --git a/bin/u_server/src/handlers.rs b/bin/u_server/src/handlers.rs index 67cdf5f..0decc93 100644 --- a/bin/u_server/src/handlers.rs +++ b/bin/u_server/src/handlers.rs @@ -1,5 +1,4 @@ use u_lib::{ - contracts::*, models::* }; use warp::{ diff --git a/bin/u_server/src/main.rs b/bin/u_server/src/main.rs index 31971f2..b9e32e2 100644 --- a/bin/u_server/src/main.rs +++ b/bin/u_server/src/main.rs @@ -15,7 +15,6 @@ use env_logger; use u_lib::{ MASTER_PORT, - contracts::*, api::Paths, models::* }; @@ -52,6 +51,11 @@ async fn main() { .and(warp::path(Paths::ls)) .and(db.clone()) .and_then(handlers::get_agents); + + let upload_job = warp::post() + .and(warp::path(Paths::upload_job)) + .and(db.clone()) + .and_then(handlers::upload_job); /* let get_jobs = warp::get() .and(warp::path(Paths::get_jobs)) @@ -86,6 +90,7 @@ async fn main() { let auth_zone = auth_token .and(get_agents + .or(upload_job) // .or(set_jobs) // .or(get_job_results) ) diff --git a/lib/u_lib/src/api.rs b/lib/u_lib/src/api.rs index 58c71c5..61c00d1 100644 --- a/lib/u_lib/src/api.rs +++ b/lib/u_lib/src/api.rs @@ -3,7 +3,6 @@ use crate::{ MASTER_SERVER, MASTER_PORT, - contracts::*, models::*, UResult, UError @@ -158,6 +157,8 @@ build_handler!(GET ls() -> ItemWrap>); build_handler!(GET get_jobs() -> ItemWrap>); // add client to server's db build_handler!(POST init(IAgent) -> RawMsg); +// create and upload job (A) +//build_handler!(POST upload_jobs) // ??? /*build_handler!(POST del() -> ()); // set jobs for client (A) diff --git a/lib/u_lib/src/contracts/agent.rs b/lib/u_lib/src/contracts/agent.rs deleted file mode 100644 index 91d3f85..0000000 --- a/lib/u_lib/src/contracts/agent.rs +++ /dev/null @@ -1,58 +0,0 @@ -use std::{ - collections::HashMap, - time::SystemTime -}; -use crate::{ - contracts::*, - UID, - exec_job, - utils::vec_to_string, - models::* -}; - -use guess_host_triple::guess_host_triple; - -pub async fn gather() -> IAgent { - - async fn run_cmd_fast(cmd: String) -> String { - let job = exec_job( - JobMeta::from_shell_arc(cmd) - ).await; - let job_result = match job.unwrap().data.unwrap() { - Ok(output) => output.multiline(), - Err(e) => e.to_string() - }; - JobOutput::from_multiline(&job_result) - .map(|o| vec_to_string(&o.into_appropriate())) - .unwrap_or(job_result) - } - - #[cfg(unix)] - IAgent { - alias: None, - id: UID.clone(), - hostname: run_cmd_fast("hostname".to_string()).await, - is_root: &run_cmd_fast("id -u".to_string()).await == "0", - is_root_allowed: false, //TODO - platform: guess_host_triple().unwrap_or("Error").to_string(), - status: None, //TODO - token: None, //TODO - username: run_cmd_fast("id -un".to_string()).await, - } -} - - -#[cfg(test)] -mod tests { - use super::*; - - #[tokio::test] - async fn test_gather() { - let cli_info = gather().await; - assert_eq!( - &cli_info.username, - "plazmoid" - ) - } - -} diff --git a/lib/u_lib/src/contracts/mod.rs b/lib/u_lib/src/contracts/mod.rs deleted file mode 100644 index 4399f6e..0000000 --- a/lib/u_lib/src/contracts/mod.rs +++ /dev/null @@ -1,36 +0,0 @@ -pub mod jobs; -pub mod messaging; -pub mod agent; - -pub use { - messaging::*, - jobs::*, -}; - -use std::{ - borrow::Cow -}; -use crate::models::*; - -macro_rules! to_message { - ($($type:ty),+) => { $( - - impl ToMsg for $type {} - - impl<'cow> From<$type> for Cow<'cow, $type> { - #[inline] - fn from(obj: $type) -> Cow<'cow, $type> { - Cow::Owned(obj) - } - } - - impl<'cow> From<&'cow $type> for Cow<'cow, $type> { - #[inline] - fn from(obj: &'cow $type) -> Cow<'cow, $type> { - Cow::Borrowed(obj) - } - } )+ - } -} - -to_message!(IAgent, Agent, RawMsg, JobMeta, JobResult); diff --git a/lib/u_lib/src/executor.rs b/lib/u_lib/src/executor.rs index cec5d6e..d32772a 100644 --- a/lib/u_lib/src/executor.rs +++ b/lib/u_lib/src/executor.rs @@ -2,142 +2,52 @@ // job runner (thread) // every job runs in other thread/process /* -use cron::Schedule as CronSchedule; enum Schedule { Persistent, // run forever, restart if stops (set max_retries) Cron(CronSchedule), Once } - */ use crate::{ - contracts::*, + models::*, UResult, }; -use std::{ - sync::{Mutex, MutexGuard, Arc}, -}; -//use tokio::task::JoinHandle; -use futures::future::{join_all, JoinAll}; -use futures::Future; +use std::collections::HashMap; +use std::pin::Pin; +use std::thread::sleep; +use std::time::{Duration, Instant}; +use tokio::process::Command; -type Executables = Vec; +use futures::{lock::Mutex, prelude::*}; +use lazy_static::lazy_static; +use tokio::{prelude::*, spawn, task::JoinHandle}; +use uuid::Uuid; -struct AsyncExecutor { - new_jobs: Executables, - promises: Executables -} - -impl AsyncExecutor { - pub fn new() -> Self { - Self { - new_jobs: vec![], - promises: vec![] - } - } -/* - pub fn process_jobs(&mut self) { - new_jobs - if job.state() == JobState::Pending { - tokio::spawn(job.run); - } - } -*/ - pub async fn apply_job(&mut self, new_job: Job) -> UResult { - /*let id = new_job.id(); - let mut job_pool = self.jobs.lock().unwrap(); - job_pool.push(new_job); - id*/ - tokio::spawn(async move { - new_job.run().await - }).await.unwrap() - } - - pub async fn apply_jobs(&mut self, new_jobs: Vec) -> Vec> { - /* - let mut job_pool = self.jobs.lock().unwrap(); - job_pool.extend( - new_jobs.into_iter() - .map(|job| (job.id(), job)) - );*/ - let futures = new_jobs.into_iter().map(|mut job| { - job.run() - }).collect::>(); - join_all(futures).await - - } -/* - pub fn get_job_result(&self, id: &Uuid) -> Option { - let mut job_pool = self.jobs.lock().unwrap(); - let (state, result) = match job_pool.get(id) { - Some(job) => (job.state(), job.get_result()), - None => return None - }; - if state == JobState::Finished { - job_pool.remove(&id); - } - Some(result) - } - - pub fn get_all_results(&self) -> Vec { - let mut job_pool = self.jobs.lock().unwrap(); - let to_remove = job_pool.values() - .filter(|job| job.finished()) - .map(|job| job.id()) - .collect::>(); - let results = job_pool.values() - .map(|job| job.get_result()) - .collect(); - to_remove.into_iter().for_each(|id| { - job_pool.remove(&id); - }); - results - }*/ -} +pub type FutRes = String; +type BoxFut = Pin + Send + 'static>>; lazy_static! { - static ref EXECUTOR: Mutex = - Mutex::new(AsyncExecutor::new()); -} -/* -pub fn get_job_result(id: &Uuid, wait: bool) -> Option { - let executor: MutexGuard = EXECUTOR.lock().unwrap(); - loop { - let result = executor.get_job_result(&id); - if wait { - if result.is_some() && result.as_ref().unwrap().state == JobState::Finished { - return result - } - thread::sleep(Duration::from_secs(1)) - } else { - return result - } - } + static ref FUT_RESULTS: Mutex>> = Mutex::new(HashMap::new()); } -pub fn get_all_results() -> Vec { - let executor: MutexGuard = EXECUTOR.lock().unwrap(); - executor.get_all_results() +pub async fn apply_tasks(tasks: Vec + Send + 'static>) -> Vec { + let mut fids = Vec::::new(); + for f in tasks.into_iter() { + let fid = Uuid::new_v4(); + fids.push(fid); + let result = spawn(Box::pin(f)); + FUT_RESULTS.lock().await.insert(fid, result); + } + fids } -*/ -// run jobs without awaiting (now is waiting) -pub async fn send_jobs_to_executor( - jobs: Vec -) -> Vec> { - let mut executor: MutexGuard = EXECUTOR.lock().unwrap(); - let executables = jobs.into_iter() - .map(|job_meta| Job::new(job_meta)) - .collect(); - executor.apply_jobs(executables).await +pub async fn apply_task(task: impl Future + Send + 'static) -> Uuid { + apply_tasks(vec![Box::pin(task)]).await[0] } -// run job and await result -pub async fn exec_job(job_meta: JobMetaRef) -> UResult { - let mut executor: MutexGuard = EXECUTOR.lock().unwrap(); - let job = Job::new(job_meta); - executor.apply_job(job).await +pub async fn pop(fid: Uuid) -> Option> { + FUT_RESULTS.lock().await.remove(&fid) } \ No newline at end of file diff --git a/lib/u_lib/src/lib.rs b/lib/u_lib/src/lib.rs index 444a331..fa9216d 100644 --- a/lib/u_lib/src/lib.rs +++ b/lib/u_lib/src/lib.rs @@ -2,9 +2,9 @@ pub mod executor; pub mod config; pub mod utils; pub mod errors; -pub mod contracts; pub mod api; pub mod models; +pub mod messaging; pub use { utils::*, @@ -18,4 +18,4 @@ pub use { extern crate lazy_static; #[macro_use] -extern crate diesel; \ No newline at end of file +extern crate diesel; diff --git a/lib/u_lib/src/contracts/messaging.rs b/lib/u_lib/src/messaging.rs similarity index 100% rename from lib/u_lib/src/contracts/messaging.rs rename to lib/u_lib/src/messaging.rs diff --git a/lib/u_lib/src/models/agent.rs b/lib/u_lib/src/models/agent.rs index 4a440f0..2bf426a 100644 --- a/lib/u_lib/src/models/agent.rs +++ b/lib/u_lib/src/models/agent.rs @@ -3,15 +3,23 @@ use serde::{ Deserialize }; use std::time::SystemTime; +use std::collections::HashMap; use diesel::{ Queryable, Identifiable, Insertable }; -use crate::models::schema::*; -use uuid::Uuid; +; +use crate::{ + models::* + UID, + exec_job, + utils::vec_to_string, + models::schema::* +}; -type Uid = String; +use guess_host_triple::guess_host_triple; +use uuid::Uuid; //belongs_to #[derive(Clone, Debug, Serialize, Deserialize, Identifiable, Queryable)] @@ -42,4 +50,50 @@ pub struct IAgent { pub status: Option, pub token: Option, pub username: String -} \ No newline at end of file +} + + +pub async fn gather() -> IAgent { + + async fn run_cmd_fast(cmd: String) -> String { + let job = exec_job( + JobMeta::from_shell_arc(cmd) + ).await; + let job_result = match job.unwrap().data.unwrap() { + Ok(output) => output.multiline(), + Err(e) => e.to_string() + }; + JobOutput::from_multiline(&job_result) + .map(|o| vec_to_string(&o.into_appropriate())) + .unwrap_or(job_result) + } + + #[cfg(unix)] + IAgent { + alias: None, + id: UID.clone(), + hostname: run_cmd_fast("hostname".to_string()).await, + is_root: &run_cmd_fast("id -u".to_string()).await == "0", + is_root_allowed: false, //TODO + platform: guess_host_triple().unwrap_or("Error").to_string(), + status: None, //TODO + token: None, //TODO + username: run_cmd_fast("id -un".to_string()).await, + } +} + + +#[cfg(test)] +mod tests { + use super::*; + + #[tokio::test] + async fn test_gather() { + let cli_info = gather().await; + assert_eq!( + &cli_info.username, + "plazmoid" + ) + } + +} diff --git a/lib/u_lib/src/contracts/jobs.rs b/lib/u_lib/src/models/jobs.rs similarity index 89% rename from lib/u_lib/src/contracts/jobs.rs rename to lib/u_lib/src/models/jobs.rs index 240dc6d..d086c02 100644 --- a/lib/u_lib/src/contracts/jobs.rs +++ b/lib/u_lib/src/models/jobs.rs @@ -9,18 +9,20 @@ use serde::{ Deserialize }; use uuid::Uuid; +use guess_host_triple::guess_host_triple; use tokio::process::Command; -use super::*; use crate::{ + models::* UError, UErrType, UErrType::JobError, JobErrType, UResult, - utils::format_err + utils::format_err, + UID }; -pub type JobMetaRef = Arc>; +//pub type JobMetaRef = Arc>; #[derive(Serialize, Deserialize, Clone, Debug)] pub enum ManageAction { @@ -48,7 +50,7 @@ pub enum JobState { #[derive(Serialize, Deserialize, Clone, Debug)] pub enum JobType { - Manage(ManageAction), + Manage, Shell, Python, Binary @@ -57,13 +59,10 @@ pub enum JobType { #[derive(Serialize, Deserialize, Clone, Debug)] pub struct JobMeta { pub id: Uuid, - pub name: String, - pub created: SystemTime, - pub updated: SystemTime, - pub state: JobState, + pub alias: String, pub exec_type: JobType, pub schedule: JobSchedule, - //pub append_result: bool, //true: append, false: rewrite + pub platform: String, pub payload: Option>>, } @@ -72,12 +71,10 @@ impl JobMeta { let job_name = shell_cmd.split(" ").nth(0).unwrap(); Self { id: Uuid::new_v4(), - name: job_name.to_string(), - created: SystemTime::now(), - updated: SystemTime::now(), - state: JobState::Pending, + alias: job_name.to_string(), exec_type: JobType::Shell, schedule: JobSchedule::Once, + platform: guess_host_triple().unwrap_or("").to_string(), payload: Some(Box::new(shell_cmd.into_bytes())) } } @@ -87,10 +84,6 @@ impl JobMeta { Self::from_shell(shell_cmd) )) } - - pub fn touch(&mut self) { - self.updated = SystemTime::now(); - } } @@ -173,36 +166,42 @@ impl JobOutput { #[derive(Serialize, Deserialize, Clone, Debug)] pub struct JobResult { - pub id: Uuid, - pub data: Option>, + //pub id: i32, + pub agent_id: Uuid, + pub job_id: Uuid, + pub result: Option>, pub state: JobState, pub retcode: Option, pub timestamp: SystemTime, } +impl JobResult { + pub fn from_meta(meta: &JobMeta) -> Self { + let job_id = meta.id.clone(); + let state = meta.state.clone(); + JobResult { + agent_id: *UID, + job_id, + state: if state == JobState::Queued { + JobState::Pending + } else { + state + }, + result: None, + retcode: None, + timestamp: SystemTime::now() + } + } +} pub struct Job { - result: JobResult, - meta: JobMetaRef, + result: JobResult } impl Job { - pub fn new(job_meta: JobMetaRef) -> Self { - let id = job_meta.lock().unwrap().id.clone(); - let state = job_meta.lock().unwrap().state.clone(); + pub fn new(job_meta: JobMeta) -> Self { Self { - result: JobResult { - id, - state: if state == JobState::Queued { - JobState::Pending - } else { - state - }, - data: None, - retcode: None, - timestamp: SystemTime::now() - }, - meta: job_meta, + result: JobResult::from_meta(&job_meta), } } @@ -311,7 +310,7 @@ impl Job { self.state() == JobState::Finished } - pub fn into_result(mut self) -> JobResult { + pub fn into_result(mut self) -> JobResult { //TODO: Cow self.result.timestamp = SystemTime::now(); self.result } diff --git a/lib/u_lib/src/models/mod.rs b/lib/u_lib/src/models/mod.rs index 6758fca..665e543 100644 --- a/lib/u_lib/src/models/mod.rs +++ b/lib/u_lib/src/models/mod.rs @@ -1,5 +1,38 @@ mod agent; pub mod schema; +pub mod jobs; +pub use crate::{ + models::{ + agent::*, + jobs::*, + }, + messaging::*, +}; -pub use agent::*; +use std::{ + borrow::Cow +}; + +macro_rules! to_message { + ($($type:ty),+) => { $( + + impl ToMsg for $type {} + + impl<'cow> From<$type> for Cow<'cow, $type> { + #[inline] + fn from(obj: $type) -> Cow<'cow, $type> { + Cow::Owned(obj) + } + } + + impl<'cow> From<&'cow $type> for Cow<'cow, $type> { + #[inline] + fn from(obj: &'cow $type) -> Cow<'cow, $type> { + Cow::Borrowed(obj) + } + } )+ + } +} + +to_message!(IAgent, Agent, RawMsg, JobMeta, JobResult); diff --git a/lib/u_lib/src/models/schema.rs b/lib/u_lib/src/models/schema.rs index ed89d5c..e69de29 100644 --- a/lib/u_lib/src/models/schema.rs +++ b/lib/u_lib/src/models/schema.rs @@ -1,73 +0,0 @@ -table! { - agents (id) { - alias -> Nullable, - agent_id -> Text, - hostname -> Text, - id -> Integer, - is_root -> Bool, - is_root_allowed -> Bool, - last_active -> Timestamp, - platform -> Text, - regtime -> Timestamp, - status -> Nullable, - token -> Nullable, - username -> Text, - } -} - -table! { - certificates (id) { - agent_id -> Integer, - id -> Integer, - is_revoked -> Bool, - } -} - -table! { - ip_addrs (id) { - agent_id -> Integer, - check_ts -> Timestamp, - gateway -> Nullable, - id -> Integer, - iface -> Text, - ip_addr -> Text, - is_gray -> Bool, - netmask -> Text, - } -} - -table! { - jobs (id) { - alias -> Nullable, - id -> Integer, - job_type -> Text, - exec_type -> Text, - platform -> Nullable, - data -> Binary, - } -} - -table! { - results (id) { - agent_id -> Integer, - created -> Timestamp, - id -> Integer, - job_id -> Integer, - result -> Nullable, - status -> Nullable, - ts -> Timestamp, - } -} - -joinable!(certificates -> agents (agent_id)); -joinable!(ip_addrs -> agents (agent_id)); -joinable!(results -> agents (agent_id)); -joinable!(results -> jobs (job_id)); - -allow_tables_to_appear_in_same_query!( - agents, - certificates, - ip_addrs, - jobs, - results, -); diff --git a/migrations/2020-10-24-111622_create_all/down.sql b/migrations/2020-10-24-111622_create_all/down.sql index 7d59dc2..faef607 100644 --- a/migrations/2020-10-24-111622_create_all/down.sql +++ b/migrations/2020-10-24-111622_create_all/down.sql @@ -1,4 +1 @@ -DROP TABLE agents; -DROP TABLE ip_addrs; -DROP TABLE jobs; -DROP TABLE results; \ No newline at end of file +DROP DATABASE u_db; diff --git a/migrations/2020-10-24-111622_create_all/up.sql b/migrations/2020-10-24-111622_create_all/up.sql index f88007b..ea352d1 100644 --- a/migrations/2020-10-24-111622_create_all/up.sql +++ b/migrations/2020-10-24-111622_create_all/up.sql @@ -1,4 +1,6 @@ CREATE EXTENSION IF NOT EXISTS "uuid-ossp"; +CREATE TYPE JOB_TYPE AS ENUM ('shell', 'manage', 'binary', 'python'); +CREATE TYPE TASK_STATUS AS ENUM ('queued', 'running', 'finished'); CREATE TABLE IF NOT EXISTS agents ( alias TEXT @@ -21,7 +23,7 @@ CREATE TABLE IF NOT EXISTS ip_addrs ( agent_id UUID NOT NULL , check_ts TIMESTAMP NOT NULL , gateway TEXT - , id SERIAL + , id UUID NOT NULL DEFAULT uuid_generate_v4() , iface TEXT NOT NULL , ip_addr TEXT NOT NULL , is_gray BOOLEAN NOT NULL DEFAULT true @@ -32,13 +34,11 @@ CREATE TABLE IF NOT EXISTS ip_addrs ( CREATE TABLE IF NOT EXISTS jobs ( alias TEXT - , id SERIAL + , id UUID NOT NULL DEFAULT uuid_generate_v4() -- Shell, Binary (with program download), -- Python (with program and python download if not exist), Management - , job_type TEXT CHECK(job_type IN ('S','B','P','M')) NOT NULL DEFAULT 'S' - -- Executable type: ALL - no matter, W - windows, L = linux - , exec_type TEXT CHECK(exec_type IN ('ALL', 'W', 'L')) NOT NULL DEFAULT 'L' - , platform TEXT CHECK(platform IN ('x86', 'x64', 'aarch32', 'aarch64')) + , exec_type JOB_TYPE NOT NULL DEFAULT 'shell' + , platform TEXT NOT NULL , path TEXT NOT NULL , PRIMARY KEY(id) ); @@ -46,11 +46,12 @@ CREATE TABLE IF NOT EXISTS jobs ( CREATE TABLE IF NOT EXISTS results ( agent_id UUID NOT NULL , created TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP - , id SERIAL + , id UUID NOT NULL DEFAULT uuid_generate_v4() , job_id INTEGER NOT NULL , result TEXT + , retcode INTEGER -- Queued, Pending, Running, Finished - , status TEXT CHECK(status IN ('Q', 'P', 'R', 'F')) + , state TASK_STATUS NOT NULL DEFAULT 'queued' , ts TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP , FOREIGN KEY(agent_id) REFERENCES agents(id) , FOREIGN KEY(job_id) REFERENCES jobs(id) @@ -59,7 +60,7 @@ CREATE TABLE IF NOT EXISTS results ( CREATE TABLE IF NOT EXISTS certificates ( agent_id UUID NOT NULL - , id SERIAL + , id UUID NOT NULL DEFAULT uuid_generate_v4() , is_revoked BOOLEAN NOT NULL DEFAULT FALSE , PRIMARY KEY(id) , FOREIGN KEY(agent_id) REFERENCES agents(id)