From 1ce14d37a5b3fa2bdab62d254e24ddcc241215fe Mon Sep 17 00:00:00 2001 From: Administrator Date: Thu, 25 Feb 2021 21:50:43 +0000 Subject: [PATCH] Resolve "Hold a readonly ref on a cached job" --- bin/u_agent/src/main.rs | 29 ++-- bin/u_panel/src/main.rs | 43 +++--- bin/u_server/src/db.rs | 34 ++--- bin/u_server/src/handlers.rs | 20 +-- bin/u_server/src/main.rs | 10 +- lib/u_lib/src/api.rs | 38 +++-- lib/u_lib/src/errors.rs | 10 +- lib/u_lib/src/lib.rs | 1 + lib/u_lib/src/models/agent.rs | 6 +- lib/u_lib/src/models/jobs.rs | 137 ++++++++++++++---- lib/u_lib/src/models/schema.rs | 7 +- .../2020-10-24-111622_create_all/up.sql | 4 +- 12 files changed, 200 insertions(+), 139 deletions(-) diff --git a/bin/u_agent/src/main.rs b/bin/u_agent/src/main.rs index f72b211..0281762 100644 --- a/bin/u_agent/src/main.rs +++ b/bin/u_agent/src/main.rs @@ -2,9 +2,7 @@ // поддержка питона // резолв адреса управляющего сервера через DoT // кроссплатформенность (реализовать интерфейс для винды и никсов) -// перезапуск через memfd_create // проверка обнов -// проверка ssh ключей и распространение через known_hosts // самоуничтожение #[macro_use] @@ -15,8 +13,10 @@ use std::env; use u_lib::{ api::ClientHandler, models::{gather}, - build_jobs, - UID + build_jobs_with_result, + UID, + JobResult, + JobCache }; use tokio::{time::{Duration, sleep}}; @@ -44,17 +44,22 @@ async fn main() { retry_until_ok!(instance.init(&cli_info).await); info!("Instanciated! Running main loop"); loop { - let jobs = retry_until_ok!(instance.get_agent_jobs(Some(&*UID)).await); - if jobs.len() > 0 { - let job_uids: Vec = jobs.iter() - .map(|j| j.id.to_string()[..8].to_owned()) - .collect(); - info!("Fetched jobs: \n{}", job_uids.join("\n")); - let result = build_jobs(jobs) + let job_requests: Vec = + retry_until_ok!(instance.get_agent_jobs(Some(&*UID)).await); + if job_requests.len() > 0 { + for jr in job_requests.iter() { + if !JobCache::contains(&jr.job_id) { + info!("Fetching job: {}", &jr.job_id); + let fetched_job = + retry_until_ok!(instance.get_jobs(Some(&jr.job_id)).await).pop().unwrap(); + JobCache::insert(fetched_job); + } + }; + let result = build_jobs_with_result(job_requests) .run_until_complete() .await .into_iter() - .map(|r| r.unwrap())//TODO: panic handler (send info on server) + .map(|r| r.unwrap()) .collect(); retry_until_ok!(instance.report( &result diff --git a/bin/u_panel/src/main.rs b/bin/u_panel/src/main.rs index 0fecbb8..1a873f7 100644 --- a/bin/u_panel/src/main.rs +++ b/bin/u_panel/src/main.rs @@ -1,7 +1,8 @@ use structopt::StructOpt; use u_lib::{ api::ClientHandler, - models::JobMeta + models::JobMeta, + UError }; use std::path::PathBuf; use uuid::Uuid; @@ -51,10 +52,7 @@ enum JmALD { }, List { #[structopt(parse(try_from_str = parse_uuid))] - uid: Option, - - #[structopt(short, long)] - results: bool + uid: Option }, Delete { #[structopt(parse(try_from_str = parse_uuid))] @@ -78,46 +76,47 @@ fn parse_uuid(src: &str) -> Result { Uuid::parse_str(src).map_err(|e| e.to_string()) } -#[tokio::main] -async fn main() -> Result<(), &'static str> { - let args: Args = Args::from_args(); +async fn process_cmd(cmd: Cmd) -> Result<(), UError> { let cli_handler = ClientHandler::new(None) .password("123qwe".to_string()); - match args.cmd { + match cmd { Cmd::Agents(action) => match action { LD::List {uid} => cli_handler.get_agents(uid.as_ref()) - .await.unwrap().into_iter().for_each(|r| println!("{}{}", DELIM, r)), + .await?.into_iter().for_each(|r| println!("{}{}", DELIM, r)), LD::Delete {uid} => { - println!("{}", cli_handler.del(Some(&uid)).await.unwrap()); + println!("{}", cli_handler.del(Some(&uid)).await?); } }, Cmd::Jobs(action) => match action { JobALD::Add {cmd: JobCmd::Cmd(cmd), agent} => { let job = JobMeta::from_shell(cmd.join(" ")); let job_uid = job.id; - cli_handler.upload_jobs(&vec![job]).await.unwrap(); + cli_handler.upload_jobs(&vec![job]).await?; if agent.is_some() { - cli_handler.set_jobs(&vec![job_uid], agent.as_ref()).await.unwrap() + cli_handler.set_jobs(&vec![job_uid], agent.as_ref()).await? } }, JobALD::LD(LD::List {uid}) => cli_handler.get_jobs(uid.as_ref()) - .await.unwrap().into_iter().for_each(|r| println!("{}{}", DELIM, r)), + .await?.into_iter().for_each(|r| println!("{}{}", DELIM, r)), JobALD::LD(LD::Delete {uid}) => { - println!("{}", cli_handler.del(Some(&uid)).await.unwrap()) + println!("{}", cli_handler.del(Some(&uid)).await?) } } Cmd::Jobmap(action) => match action { JmALD::Add {agent_uid, job_uids} => cli_handler.set_jobs(&job_uids, Some(&agent_uid)) - .await.unwrap(), - JmALD::List {uid, results} => if results { - cli_handler.get_results(uid.as_ref()) - .await.unwrap().into_iter().for_each(|r| println!("{}{}", DELIM, r)) - } else { + .await?, + JmALD::List {uid} => { cli_handler.get_agent_jobs(uid.as_ref()) - .await.unwrap().into_iter().for_each(|r| println!("{}{}", DELIM, r)) + .await?.into_iter().for_each(|r| println!("{}{}", DELIM, r)) }, - JmALD::Delete {uid} => println!("{}", cli_handler.del(Some(&uid)).await.unwrap()) + JmALD::Delete {uid} => println!("{}", cli_handler.del(Some(&uid)).await?) } } Ok(()) } + +#[tokio::main] +async fn main() -> Result<(), UError> { + let args: Args = Args::from_args(); + process_cmd(args.cmd).await +} diff --git a/bin/u_server/src/db.rs b/bin/u_server/src/db.rs index 797d0e8..943669b 100644 --- a/bin/u_server/src/db.rs +++ b/bin/u_server/src/db.rs @@ -67,37 +67,25 @@ impl UDB { Ok(result) } - //TODO: belongs_to - pub fn get_agent_jobs(&self, uid: Option, personal: bool) -> USrvResult> { - use schema::{results, jobs}; + pub fn get_agent_jobs(&self, uid: Option, personal: bool) -> USrvResult> { + use schema::results; let mut q = results::table - .inner_join(jobs::table) .into_boxed(); if uid.is_some() { q = q.filter(results::agent_id.eq(uid.unwrap())) } if personal { - q = q.filter(results::state.eq(JobState::Queued)) - } - let result = q.select( - (jobs::alias, jobs::id, jobs::exec_type, jobs::platform, jobs::payload) - ) - .get_results::(&self.conn)?; - Ok(result) - } - - pub fn get_results(&self, uid: Option) -> USrvResult> { - use schema::results; - let result = if uid.is_some() { - results::table - .filter(results::agent_id.eq(uid.unwrap())) + q = q.filter( + results::state.eq(JobState::Queued) + .and(results::agent_id.eq(uid.unwrap())) + ) + } else if uid.is_some() { + q = q.filter(results::agent_id.eq(uid.unwrap())) .or_filter(results::job_id.eq(uid.unwrap())) .or_filter(results::id.eq(uid.unwrap())) - .load::(&self.conn)? - } else { - results::table - .load::(&self.conn)? - }; + + } + let result = q.load::(&self.conn)?; Ok(result) } diff --git a/bin/u_server/src/handlers.rs b/bin/u_server/src/handlers.rs index 0b912d2..8a586f2 100644 --- a/bin/u_server/src/handlers.rs +++ b/bin/u_server/src/handlers.rs @@ -66,20 +66,6 @@ pub async fn get_jobs( } } -pub async fn get_results( - uid: Option, - db: Storage) -> Result -{ - match db.lock() - .unwrap() - .get_results(uid) { - Ok(r) => Ok(warp::reply::json( - &r.as_message() - )), - Err(e) => Err(warp::reject()) - } -} - pub async fn get_agent_jobs( uid: Option, db: Storage, @@ -135,7 +121,7 @@ pub async fn set_jobs( .unwrap() .set_jobs_for_agent(&agent_uid, &msg.into_inner()) { Ok(_) => build_empty_200(), - Err(e) => build_response(StatusCode::BAD_REQUEST, dbg!(e)) + Err(e) => build_response(StatusCode::BAD_REQUEST, e) } } @@ -160,7 +146,3 @@ pub async fn report( } build_empty_200() } - -pub async fn dummy() -> Result { - Ok(String::from("ok")) -} diff --git a/bin/u_server/src/main.rs b/bin/u_server/src/main.rs index 8b93631..771a22a 100644 --- a/bin/u_server/src/main.rs +++ b/bin/u_server/src/main.rs @@ -76,7 +76,7 @@ async fn main() { let get_personal_jobs = warp::get() .and(warp::path(Paths::get_agent_jobs)) - .and(warp::path::param::().map(Some).or_else(infallible_none)) + .and(warp::path::param::().map(Some)) .and(db.clone()) .and_then(|uid, db| handlers::get_agent_jobs(uid, db, true)); @@ -99,15 +99,10 @@ async fn main() { .and(db.clone()) .and_then(handlers::report); - let get_results = warp::get() - .and(warp::path(Paths::get_results)) - .and(warp::path::param::().map(Some).or_else(infallible_none)) - .and(db.clone()) - .and_then(handlers::get_results); - let auth_token = warp::header::exact("authorization", "Bearer 123qwe"); let agent_zone = new_agent + .or(get_jobs.clone()) .or(get_personal_jobs) .or(report) ; @@ -120,7 +115,6 @@ async fn main() { .or(del) .or(set_jobs) .or(get_agent_jobs) - .or(get_results) ); let routes = agent_zone diff --git a/lib/u_lib/src/api.rs b/lib/u_lib/src/api.rs index 0ff71c5..f45e660 100644 --- a/lib/u_lib/src/api.rs +++ b/lib/u_lib/src/api.rs @@ -11,7 +11,6 @@ use crate::{ use reqwest::{ Client, Url, - Response, RequestBuilder }; use std::{ @@ -95,14 +94,34 @@ macro_rules! build_handler { let request = $crate::build_url_by_method!( $method $path, pname = $($param_name)?, ptype = $($param_type)?, urlparam = $($url_param)? - )(self $(, param as &$param_type)? $(, url_param as Option<&$url_param>)?); + )( + self + $(, param as &$param_type)? + $(, url_param as Option<&$url_param>)? + ); let response = request.send().await?; - match response.error_for_status() { - Ok(r) => Ok(r.json::>() + let content_len = response.content_length(); + let is_success = match response.error_for_status_ref() { + Ok(_) => Ok(()), + Err(e) => Err(UError::from(e)) + }; + match is_success { + Ok(_) => response.json::>() .await .map(|msg| msg.into_inner()) - .unwrap_or_default()), - Err(e) => Err(UError::from(e)) + .or_else(|e| { + match content_len { + Some(0) => Ok(Default::default()), + _ => Err(UError::from(e)) + } + }), + Err(UError::NetError(err_src, _)) => Err( + UError::NetError( + err_src, + response.text().await.unwrap() + ) + ), + _ => unreachable!() } } } @@ -162,8 +181,8 @@ impl ClientHandler { ////////////////// // client listing (A) build_handler!(GET get_agents/Uuid() -> Vec); -// get jobs for client (agent_id=Uuid) -build_handler!(GET get_agent_jobs/Uuid() -> Vec); +// get jobs for client +build_handler!(GET get_agent_jobs/Uuid() -> Vec); // get all available jobs (A) build_handler!(GET get_jobs/Uuid() -> Vec); // add client to server's db @@ -175,8 +194,5 @@ build_handler!(GET del/Uuid() -> String); // set jobs for client (A) // POST /set_jobs/Uuid json: Vec build_handler!(POST set_jobs/Uuid(Vec) -> ()); -// get results (A) -// GET /get_job_results/Uuid -build_handler!(GET get_results/Uuid() -> Vec); // report job result build_handler!(POST report(Vec) -> ()); diff --git a/lib/u_lib/src/errors.rs b/lib/u_lib/src/errors.rs index 9103bae..6dc51bf 100644 --- a/lib/u_lib/src/errors.rs +++ b/lib/u_lib/src/errors.rs @@ -13,8 +13,8 @@ pub enum UError { #[error("Error: {0}")] Raw(&'static str), - #[error("Connection error: {0}")] - NetError(String), + #[error("Connection error: {0}. Body: {1}")] + NetError(String, String), #[error("Parse error")] ParseError, @@ -28,8 +28,8 @@ pub enum UError { #[error("Job cannot be ran on this platform. Expected: {0}, got: {1}")] InsuitablePlatform(String, String), - #[error("Task {0} doesn't exist")] - NoTask(Uuid), + #[error("Job {0} doesn't exist")] + NoJob(Uuid), #[error("Error opening {0}: {1}")] FilesystemError(String, String) @@ -37,6 +37,6 @@ pub enum UError { impl From for UError { fn from(e: ReqError) -> Self { - UError::NetError(e.to_string()) + UError::NetError(e.to_string(), String::new()) } } \ No newline at end of file diff --git a/lib/u_lib/src/lib.rs b/lib/u_lib/src/lib.rs index e7a91b6..8cf8352 100644 --- a/lib/u_lib/src/lib.rs +++ b/lib/u_lib/src/lib.rs @@ -21,5 +21,6 @@ extern crate lazy_static; #[macro_use] extern crate diesel; +#[macro_use] extern crate log; extern crate env_logger; \ No newline at end of file diff --git a/lib/u_lib/src/models/agent.rs b/lib/u_lib/src/models/agent.rs index 599aa55..e5cf7d9 100644 --- a/lib/u_lib/src/models/agent.rs +++ b/lib/u_lib/src/models/agent.rs @@ -88,10 +88,10 @@ pub async fn gather() -> IAgent { id: UID.clone(), hostname: run_cmd_fast("hostname").await, is_root: &run_cmd_fast("id -u").await == "0", - is_root_allowed: false, //TODO + is_root_allowed: false, platform: guess_host_triple().unwrap_or("Error").to_string(), - status: None, //TODO - token: None, //TODO + status: None, //wtf? + token: None, username: run_cmd_fast("id -un").await, } } diff --git a/lib/u_lib/src/models/jobs.rs b/lib/u_lib/src/models/jobs.rs index f77144f..e927d8c 100644 --- a/lib/u_lib/src/models/jobs.rs +++ b/lib/u_lib/src/models/jobs.rs @@ -1,11 +1,15 @@ use std::{ time::{SystemTime, Duration}, thread, + sync::{RwLock, RwLockReadGuard}, cmp::PartialEq, fmt, string::ToString, path::PathBuf, - fs + fs, + process::Output, + collections::HashMap, + ops::Deref, }; use serde::{ Serialize, @@ -13,10 +17,13 @@ use serde::{ }; use uuid::Uuid; use guess_host_triple::guess_host_triple; -use tokio::process::Command; +use tokio::{ + process::Command +}; use crate::{ utils::systime_to_string, models::schema::*, + Agent, UError, UResult, UID, @@ -29,10 +36,49 @@ use diesel::{ Queryable, Identifiable, Insertable, - query_builder::AsChangeset }; use strum::Display; +type Cache = HashMap; + +lazy_static! { + static ref JOB_CACHE: RwLock = RwLock::new(HashMap::new()); +} + +pub struct JobCache; + +impl JobCache { + pub fn insert(job_meta: JobMeta) { + JOB_CACHE.write().unwrap().insert(job_meta.id, job_meta); + } + + pub fn contains(uid: &Uuid) -> bool { + JOB_CACHE.read().unwrap().contains_key(uid) + } + + pub fn get(uid: &Uuid) -> Option { + if !Self::contains(uid) { + return None + } + let lock = JOB_CACHE.read().unwrap(); + Some(JobCacheHolder(lock, uid)) + } + + pub fn remove(uid: &Uuid) { + JOB_CACHE.write().unwrap().remove(uid); + } +} + +pub struct JobCacheHolder<'jm>(pub RwLockReadGuard<'jm, Cache>, pub &'jm Uuid); + +impl<'jm> Deref for JobCacheHolder<'jm> { + type Target = JobMeta; + + fn deref(&self) -> &Self::Target { + self.0.get(self.1).unwrap() + } +} + #[derive(Serialize, Deserialize, Clone, Debug)] pub enum ManageAction { @@ -46,7 +92,7 @@ pub enum ManageAction { pub enum JobSchedule { Once, Permanent, - //TODO: Scheduled + //Scheduled } #[derive(Serialize, Deserialize, Clone, Debug, PartialEq, DbEnum, Display)] @@ -96,13 +142,13 @@ impl JobOutput { } } - pub fn stdout(mut self, data: &[u8]) -> Self { - self.stdout = data.to_owned(); + pub fn stdout(mut self, data: Vec) -> Self { + self.stdout = data; self } - pub fn stderr(mut self, data: &[u8]) -> Self { - self.stderr = data.to_owned(); + pub fn stderr(mut self, data: Vec) -> Self { + self.stderr = data; self } @@ -128,12 +174,12 @@ impl JobOutput { raw.strip_prefix(&JobOutput::create_delim(JobOutput::STDOUT)) .map(|s: &str| { let mut parts = s.split(&err_header) - .map(|d| d.trim().as_bytes()) - .collect::>() + .map(|d| d.trim().as_bytes().to_vec()) + .collect::>>() .into_iter(); JobOutput::new() .stdout(parts.next().unwrap()) - .stderr(parts.next().unwrap_or(&[])) + .stderr(parts.next().unwrap_or(vec![])) }) } @@ -158,7 +204,7 @@ impl JobOutput { Insertable )] #[table_name = "jobs"] -pub struct JobMeta { // TODO: shell cmd how to exec payload +pub struct JobMeta { pub alias: String, pub id: Uuid, pub exec_type: JobType, @@ -223,7 +269,7 @@ impl Default for JobMeta { Queryable, Identifiable, Insertable, - AsChangeset + AsChangeset, )] #[table_name = "results"] pub struct JobResult { @@ -257,11 +303,13 @@ impl fmt::Display for JobResult { } impl JobResult { - pub fn from_meta(job_id: Uuid) -> Self { - let mut inst = JobResult::default(); - inst.agent_id = *UID; - inst.job_id = job_id; - inst + pub fn from_meta(job_id: Uuid, result_id: Option) -> Self { + Self { + id: result_id.unwrap_or(Uuid::new_v4()), + agent_id: *UID, + job_id, + ..Default::default() + } } //pub fn as_job_output(&self) -> JobOutput {} @@ -289,17 +337,20 @@ pub struct Job { } impl Job { - fn build(job_meta: JobMeta) -> UResult { + fn build(job_meta: &JobMeta, result_id: Uuid) -> UResult { match job_meta.exec_type { JobType::Shell => { let curr_platform = guess_host_triple().unwrap_or("unknown").to_string(); if job_meta.platform != curr_platform { - return Err(UError::InsuitablePlatform(job_meta.platform, curr_platform)) + return Err(UError::InsuitablePlatform( + job_meta.platform.clone(), curr_platform + )) } + let job_meta = job_meta.clone(); Ok(Self { exec_type: job_meta.exec_type, payload: job_meta.payload, - result: JobResult::from_meta(job_meta.id.clone()) + result: JobResult::from_meta(job_meta.id.clone(), Some(result_id)) }) }, _ => todo!() @@ -315,7 +366,7 @@ impl Job { } None => unimplemented!() }; - let mut cmd_parts = str_payload // TODO: WRONG + let mut cmd_parts = str_payload .split(" ") .map(String::from) .collect::>() @@ -327,14 +378,14 @@ impl Job { .output() .await; let (data, retcode) = match cmd_result { - Ok(output) => { + Ok(Output {status, stdout, stderr}) => { ( Some(JobOutput::new() - .stdout(&output.stdout) - .stderr(&output.stderr) + .stdout(stdout) + .stderr(stderr) .multiline() ), - output.status.code() + status.code() ) } Err(e) => { @@ -355,14 +406,38 @@ impl Job { } } -pub fn build_jobs>(job_metas: J) -> Waiter { - let prepared_jobs = job_metas.into_vec().into_iter().map(|job| -> DynFut { - let j = Job::build(job).unwrap(); - Box::pin(j.run()) - }).collect::>(); +pub fn build_jobs_with_result>(job_requests: J) -> Waiter { + let prepared_jobs = job_requests.into_vec() + .into_iter() + .filter_map(|jr| -> Option { + let job = { + let job_meta = JobCache::get(&jr.job_id); + if job_meta.is_none() { + Err(UError::NoJob(jr.job_id)) + } else { + Job::build(&*job_meta.unwrap(), jr.id) + } + }; + match job { + Ok(j) => Some(Box::pin(j.run())), + Err(e) => { + warn!("Job building error: {}", e); + None + } + } + }).collect::>(); Waiter::new(prepared_jobs) } +pub fn build_jobs>(job_metas: J) -> Waiter { + let job_requests = job_metas.into_vec().into_iter().map(|jm| { + let j_uid = jm.id; + JobCache::insert(jm); + JobResult::from_meta(j_uid, None) + }).collect::>(); + build_jobs_with_result(job_requests) +} + #[cfg(test)] mod tests { diff --git a/lib/u_lib/src/models/schema.rs b/lib/u_lib/src/models/schema.rs index f459e0f..aa58e92 100644 --- a/lib/u_lib/src/models/schema.rs +++ b/lib/u_lib/src/models/schema.rs @@ -49,9 +49,9 @@ table! { use crate::*; jobs (id) { - alias -> Nullable, + alias -> Text, id -> Uuid, - exec_type -> JobType, + exec_type -> Jobtype, platform -> Text, payload -> Nullable, } @@ -67,8 +67,9 @@ table! { id -> Uuid, job_id -> Uuid, result -> Nullable, + state -> Jobstate, retcode -> Nullable, - ts -> Timestamp, + updated -> Timestamp, } } diff --git a/migrations/2020-10-24-111622_create_all/up.sql b/migrations/2020-10-24-111622_create_all/up.sql index 471cc6e..a57754b 100644 --- a/migrations/2020-10-24-111622_create_all/up.sql +++ b/migrations/2020-10-24-111622_create_all/up.sql @@ -52,8 +52,8 @@ CREATE TABLE IF NOT EXISTS results ( , state JobState NOT NULL DEFAULT 'queued' , retcode INTEGER , updated TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP - , FOREIGN KEY(agent_id) REFERENCES agents(id) - , FOREIGN KEY(job_id) REFERENCES jobs(id) + , FOREIGN KEY(agent_id) REFERENCES agents(id) ON DELETE CASCADE + , FOREIGN KEY(job_id) REFERENCES jobs(id) ON DELETE CASCADE , PRIMARY KEY(id) );