From df79be1a69fe2d6e631f8ef29c06d4b54c213f25 Mon Sep 17 00:00:00 2001 From: plazmoid Date: Mon, 31 Aug 2020 13:15:41 +0500 Subject: [PATCH] working sync jobs --- bin/u_agent/src/main.rs | 12 +- bin/u_panel/src/main.rs | 5 +- lib/u_lib/src/client/client.rs | 62 ++++-- lib/u_lib/src/client/network.rs | 21 +- lib/u_lib/src/contracts/datatypes.rs | 3 - lib/u_lib/src/contracts/jobs.rs | 278 ++++++++++++++++++--------- lib/u_lib/src/errors.rs | 27 +++ lib/u_lib/src/executor.rs | 6 + lib/u_lib/src/lib.rs | 6 +- 9 files changed, 291 insertions(+), 129 deletions(-) create mode 100644 lib/u_lib/src/errors.rs diff --git a/bin/u_agent/src/main.rs b/bin/u_agent/src/main.rs index 5ec5a77..675de6d 100644 --- a/bin/u_agent/src/main.rs +++ b/bin/u_agent/src/main.rs @@ -7,22 +7,18 @@ // проверка ssh ключей и распространение через known_hosts // самоуничтожение -//mod jobs; - use { std::thread::sleep, std::time::Duration, - u_lib::client::* + std::env, + u_lib::client::network::ClientHandler }; -//mod executor; - -use network::ClientHandler; - #[tokio::main] async fn main() { //daemonize(); - let instance = ClientHandler::new(); + let arg_ip = env::args().nth(1); + let instance = ClientHandler::new(arg_ip); instance.init().await; loop { sleep(Duration::from_secs(2)); diff --git a/bin/u_panel/src/main.rs b/bin/u_panel/src/main.rs index 6669231..6d318a3 100644 --- a/bin/u_panel/src/main.rs +++ b/bin/u_panel/src/main.rs @@ -12,12 +12,13 @@ async fn main() -> Result<(), &'static str> { Some(m) => m, None => return Err("Method required") }; - let cli_handler = ClientHandler::new_pwd("123qwe".to_string()); + let cli_handler = ClientHandler::new(None) + .password("123qwe".to_string()); match method.as_str() { "ls" => { let result = cli_handler.list().await; for cli in result.iter() { - println!("{:?}", cli) + println!("{:#?}", cli) } }, _ => return Err("Unknown method") diff --git a/lib/u_lib/src/client/client.rs b/lib/u_lib/src/client/client.rs index eb8841e..b9f2d4e 100644 --- a/lib/u_lib/src/client/client.rs +++ b/lib/u_lib/src/client/client.rs @@ -1,12 +1,14 @@ +use std::{ + collections::HashMap, + fmt +}; + use serde::{ Deserialize, Serialize }; use uuid::Uuid; -use crate::{ - contracts::*, - UID -}; +use crate::{contracts::*, UID, exec_job}; pub struct UClient { @@ -25,25 +27,55 @@ impl UClient { #[derive(Serialize, Deserialize, Debug, Clone)] pub struct ClientInfo { - pub local_ip: String, - pub hostname: String, - pub username: String, - pub os: String, - pub platform: String, + pub info: HashMap, pub id: Uuid, } impl ClientInfo { pub fn gather() -> Self { + let mut info: HashMap = HashMap::new(); + for job in DEFAULT_JOBS { + let mut job_meta = JobMeta::from_shell(job.1.into()); + let job_result = exec_job(&mut job_meta); + let job_data = match job_result.data { + Ok(output) => output.multiline(), + Err(e) => e.description() + }; + info.insert(job.0.into(), job_data); + } ClientInfo { - local_ip: String::from("1.2.3.4"), - hostname: String::from("polokonzerva"), - username: String::from("plazmoid"), - os: String::from("pinux"), - platform: String::from("x86_64"), + info, id: *UID } } + + pub fn get_field(&self, field: &str) -> Option<&String> { + self.info.get(field) + } } -impl ToMsg for ClientInfo {} \ No newline at end of file +impl ToMsg for ClientInfo {} + +const DEFAULT_JOBS: &[(&str, &str)] = &[ + //("local ip", "ip a"), + ("hostname", "hostname"), + ("username", "whoami"), + ("platform", "uname -a"), +]; + + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_gather() { + let cli_info = ClientInfo::gather(); + let field = cli_info.get_field("username").unwrap(); + assert_eq!( + JobOutput::from_multiline(field).unwrap().stdout, + b"plazmoid".to_vec() + ) + } + +} diff --git a/lib/u_lib/src/client/network.rs b/lib/u_lib/src/client/network.rs index b2c34cd..f45bf3d 100644 --- a/lib/u_lib/src/client/network.rs +++ b/lib/u_lib/src/client/network.rs @@ -9,7 +9,10 @@ use reqwest::{ Response, RequestBuilder }; - +use std::{ + net::Ipv4Addr, + str::FromStr +}; pub struct ClientHandler { base_url: Url, @@ -19,21 +22,23 @@ pub struct ClientHandler { } impl ClientHandler { - pub fn new() -> Self { + pub fn new(server: Option) -> Self { + let master_server = server + .map(|s| Ipv4Addr::from_str(&s).unwrap()) + .unwrap_or(MASTER_SERVER); Self { client: Client::new(), cli_info: ClientInfo::gather(), base_url: Url::parse( - &format!("http://{}:{}", MASTER_SERVER, MASTER_PORT) + &format!("http://{}:{}", master_server, MASTER_PORT) ).unwrap(), password: None } } - pub fn new_pwd(password: String) -> Self { - let mut instance = Self::new(); - instance.password = Some(password); - instance + pub fn password(mut self, password: String) -> ClientHandler { + self.password = Some(password); + self } fn set_pwd(&self, rb: RequestBuilder) -> RequestBuilder { @@ -53,7 +58,7 @@ impl ClientHandler { self.set_pwd(rb) } - pub async fn init(&self) -> RawMsg { + pub async fn init(&self) -> RawMsg { // move to result let response: Response = self.build_post("/new") .json(&self.cli_info.as_message()) .send() diff --git a/lib/u_lib/src/contracts/datatypes.rs b/lib/u_lib/src/contracts/datatypes.rs index 44ca8d3..3655f8a 100644 --- a/lib/u_lib/src/contracts/datatypes.rs +++ b/lib/u_lib/src/contracts/datatypes.rs @@ -9,9 +9,6 @@ use { uuid::Uuid, }; - -//pub type JobPool = Vec>; - pub type CliStorage = HashMap; diff --git a/lib/u_lib/src/contracts/jobs.rs b/lib/u_lib/src/contracts/jobs.rs index e0d83aa..93e8c0b 100644 --- a/lib/u_lib/src/contracts/jobs.rs +++ b/lib/u_lib/src/contracts/jobs.rs @@ -1,7 +1,7 @@ use std::{ process::Command, time::SystemTime, - cmp::PartialEq + cmp::PartialEq, }; use serde::{ Serialize, @@ -10,80 +10,10 @@ use serde::{ use uuid::Uuid; //use tokio::process::Command; use super::*; +use crate::UError; -pub struct Job<'meta> { - pub result: JobResult, - pub meta: &'meta mut JobMeta, -} - -impl<'meta> Job<'meta> { - pub fn new(job_meta: &'meta mut JobMeta) -> Self { - Self { - result: JobResult { - id: job_meta.id.clone(), - state: job_meta.state.clone(), - data: None, - }, - meta: job_meta, - } - } - - pub fn run(&mut self) { - match self.meta.exec_type { - JobType::Shell => { - match self.meta.state { - JobState::Queued | JobState::Pending => { - self.meta.state = JobState::Running; - }, - JobState::Finished => { - if self.meta.schedule == JobSchedule::Permanent { - self.meta.state = JobState::Running; - } else { - return - } - }, - JobState::Running => return - } - - match &self.meta.payload { - Some(box_payload) => { - let payload = String::from_utf8_lossy(box_payload).into_owned(); - let mut cmd_parts = payload - .split(" ") - .map(String::from) - .collect::>() - .into_iter(); - let cmd = cmd_parts.nth(0).unwrap(); - let args = cmd_parts.collect::>(); - let result = Command::new(cmd) - .args(args) - .output(); - self.result.data = Some(match result { - Ok(output) => { - if output.status.success() { - Ok(output.stdout.to_vec()) - } else { - Err(output.stderr.to_vec()) - } - } - Err(e) => Err(e.to_string().into_bytes()) - }) - } - None => return - } - self.meta.state = JobState::Finished; - }, - _ => unimplemented!() - } - } - - fn into_result(self) -> JobResult { - self.result - } -} - -#[derive(Serialize, Deserialize, Clone)] +#[derive(Serialize, Deserialize, Clone, Debug)] pub enum ManageAction { Ping, UpdateAvailable, @@ -91,7 +21,7 @@ pub enum ManageAction { Terminate } -#[derive(Serialize, Deserialize, Clone, PartialEq)] +#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] pub enum JobSchedule { Once, Permanent, @@ -99,7 +29,7 @@ pub enum JobSchedule { //TODO: Scheduled } -#[derive(Serialize, Deserialize, Clone)] +#[derive(Serialize, Deserialize, Clone, Debug)] pub enum JobState { Queued, // server created a job, but client didn't get it yet Pending, // client got a job, but not running yet @@ -108,7 +38,7 @@ pub enum JobState { Finished, } -#[derive(Serialize, Deserialize, Clone)] +#[derive(Serialize, Deserialize, Clone, Debug)] pub enum JobType { Manage(ManageAction), Shell, @@ -116,7 +46,7 @@ pub enum JobType { Binary } -#[derive(Serialize, Deserialize, Clone)] +#[derive(Serialize, Deserialize, Clone, Debug)] pub struct JobMeta { pub id: Uuid, pub name: String, @@ -130,40 +60,192 @@ pub struct JobMeta { } impl JobMeta { - pub fn from_shell(shell_cmd: Vec) -> Self { + pub fn from_shell(shell_cmd: String) -> Self { let uid = Uuid::new_v4(); - //let str_payload_name: &[u8] = shell_cmd.split(|b| &[*b] == b" ").collect(); - //let job_name = format!("{} {}", uid.to_string()[..6], str_payload_name[0]); + let job_name = shell_cmd.split(" ").nth(0).unwrap(); Self { id: uid.clone(), - name: uid.to_string(), + name: job_name.to_string(), created: SystemTime::now(), updated: SystemTime::now(), state: JobState::Pending, exec_type: JobType::Shell, schedule: JobSchedule::Once, append_result: true, - payload: Some(Box::new(shell_cmd)) + payload: Some(Box::new(shell_cmd.into_bytes())) } } - pub fn from_shell_str(shell_cmd: String) -> Self { - Self::from_shell(shell_cmd.into_bytes()) + pub fn touch(&mut self) { + self.updated = SystemTime::now(); } } impl ToMsg for JobMeta {} -#[derive(Serialize, Deserialize, Clone)] + +#[derive(Serialize, Deserialize, Clone, Debug)] +pub struct JobOutput { + pub stdout: Vec, + pub stderr: Vec, +} + +impl JobOutput { + const STREAM_BORDER: &'static str = "***"; + const STDOUT: &'static str = "STDOUT"; + const STDERR: &'static str = "STDERR"; + + #[inline] + fn create_delim(header: &'static str) -> String { + format!("{border} {head} {border}\n", + border = JobOutput::STREAM_BORDER, + head = header + ) + } + + pub fn new() -> Self { + Self { + stdout: Vec::new(), + stderr: Vec::new(), + } + } + + pub fn multiline(&self) -> String { + let mut result = String::new(); + if self.stdout.len() > 0 { + result += &format!("{stdout_head}{stdout}\n", + stdout_head = JobOutput::create_delim(JobOutput::STDOUT), + stdout = String::from_utf8_lossy(&self.stdout)) + } + + if self.stderr.len() > 0 { + result += &format!("{stderr_head}{stderr}\n", + stderr_head = JobOutput::create_delim(JobOutput::STDERR), + stderr = String::from_utf8_lossy(&self.stderr)) + } + result + } + + pub fn from_multiline(raw: &String) -> Option { + let err_header = JobOutput::create_delim(JobOutput::STDERR); + raw.find(&err_header) + .and(raw.strip_prefix(&JobOutput::create_delim(JobOutput::STDOUT))) + .map(|s: &str| { + let mut parts = s.split(&err_header) + .map(|d| Vec::from(d.trim().as_bytes())) + .collect::>>() + .into_iter(); + let mut instance = JobOutput::new(); + instance.stdout = parts.next().unwrap(); + instance.stderr = parts.next().unwrap(); + instance + }) + } +} + +#[derive(Serialize, Deserialize, Clone, Debug)] pub struct JobResult { - id: Uuid, - data: Option, Vec>>, - state: JobState + pub id: Uuid, + pub data: Result, + pub state: JobState, + pub retcode: Option, } impl ToMsg for JobResult {} +pub struct Job<'meta> { + pub result: JobResult, + pub meta: &'meta mut JobMeta, +} + +impl<'meta> Job<'meta> { + pub fn new(job_meta: &'meta mut JobMeta) -> Self { + Self { + result: JobResult { + id: job_meta.id.clone(), + state: job_meta.state.clone(), + data: Ok(JobOutput::new()), + retcode: None, + }, + meta: job_meta, + } + } + + pub fn run(&mut self) { + match self.meta.exec_type { + JobType::Shell => { + match self.meta.state { + JobState::Queued | JobState::Pending => { + self.update_state(Some(JobState::Running)); + }, + JobState::Finished => { + if self.meta.schedule == JobSchedule::Permanent { + self.update_state(Some(JobState::Running)) + } else { + return + } + }, + JobState::Running => return + } + + match &self.meta.payload { + Some(box_payload) => { + let payload = String::from_utf8_lossy(box_payload).into_owned(); + let mut cmd_parts = payload + .split(" ") + .map(String::from) + .collect::>() + .into_iter(); + let cmd = cmd_parts.nth(0).unwrap(); + let args = cmd_parts.collect::>(); + let result = Command::new(cmd) + .args(args) + .output(); + match result { + Ok(output) => { + let job_out: &mut JobOutput = self.result.data.as_mut().unwrap(); + job_out.stdout = output.stdout.to_vec(); + job_out.stderr = output.stderr.to_vec(); + self.result.retcode = output.status.code(); + } + Err(e) => { + self.result.data = Err( + UError::Raw(e.to_string()) + ); + self.result.retcode = None; + } + } + } + None => return + } + self.meta.state = JobState::Finished; + }, + _ => unimplemented!() + } + } + + /// None => state is copied from meta to result field + /// Some => state is applied to both meta and result fields + pub fn update_state(&mut self, state: Option) { + match state { + Some(state) => { + self.meta.state = state.clone(); + self.result.state = state; + } + None => { + self.result.state = self.meta.state.clone(); + } + } + } + + pub fn into_result(mut self) -> JobResult { + Self::update_state(&mut self, None); + self.result + } +} + + #[cfg(test)] mod tests { use super::*; @@ -171,9 +253,23 @@ mod tests { #[test] fn test_shell_job() { - let mut job = JobMeta::from_shell_str("whoami".into()); + let mut job = JobMeta::from_shell("whoami".into()); let mut jobs: Vec = vec![Job::new(&mut job)]; execute_jobs(&mut jobs); - assert_eq!(jobs.pop().unwrap().result.data.unwrap().unwrap(), b"plazmoid\n".to_vec()); + let job_result = jobs.pop().unwrap().into_result(); + assert_eq!( + job_result.data.unwrap().stdout, + b"plazmoid\n".to_vec() + ); } -} \ No newline at end of file + + #[test] + fn test_failing_shell_job() { + let mut job = JobMeta::from_shell("lol_kek_puk".into()); + let mut jobs: Vec = vec![Job::new(&mut job)]; + execute_jobs(&mut jobs); + let job_result = jobs.pop().unwrap().into_result(); + assert!(job_result.data.is_err()); + assert_eq!(job_result.retcode, None); + } +} diff --git a/lib/u_lib/src/errors.rs b/lib/u_lib/src/errors.rs new file mode 100644 index 0000000..e38e167 --- /dev/null +++ b/lib/u_lib/src/errors.rs @@ -0,0 +1,27 @@ +use std::fmt; +use serde::{ + Serialize, + Deserialize +}; + +#[derive(Serialize, Deserialize, Clone)] +pub enum UError { + Raw(String) +} + +impl UError { + pub fn description(&self) -> String { + match self { + UError::Raw(msg) => msg.clone() + } + } +} + +impl fmt::Debug for UError { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + let msg = match self { + UError::Raw(msg) => msg + }; + write!(f, "{}", msg) + } +} \ No newline at end of file diff --git a/lib/u_lib/src/executor.rs b/lib/u_lib/src/executor.rs index a3d80f2..00cc3b0 100644 --- a/lib/u_lib/src/executor.rs +++ b/lib/u_lib/src/executor.rs @@ -20,4 +20,10 @@ use crate::contracts::*; pub fn execute_jobs(jobs: &mut Vec) { jobs.iter_mut().for_each(|job| job.run()) +} + +pub fn exec_job(job_meta: &mut JobMeta) -> JobResult { + let mut job = Job::new(job_meta); + job.run(); + job.into_result() } \ No newline at end of file diff --git a/lib/u_lib/src/lib.rs b/lib/u_lib/src/lib.rs index 0628d95..823982e 100644 --- a/lib/u_lib/src/lib.rs +++ b/lib/u_lib/src/lib.rs @@ -1,13 +1,15 @@ pub mod executor; pub mod config; -pub mod contracts; pub mod utils; pub mod client; +pub mod errors; +pub mod contracts; pub use { utils::*, config::*, - executor::* + executor::*, + errors::*, }; #[macro_use]