diff --git a/bin/u_agent/src/lib.rs b/bin/u_agent/src/lib.rs index f64df3e..08cc6a3 100644 --- a/bin/u_agent/src/lib.rs +++ b/bin/u_agent/src/lib.rs @@ -7,17 +7,15 @@ use tokio::time::{sleep, Duration}; use u_lib::{ api::ClientHandler, cache::JobCache, - config::{get_self_uid, EndpointsEnv}, + config::{get_self_id, EndpointsEnv, AGENT_ITERATION_INTERVAL}, error::ErrChan, executor::pop_completed, - jobs::{fat_meta_to_thin, UnnamedJobsBatch}, + jobs::{fat_meta_to_thin, AnonymousJobBatch}, logging::init_logger, messaging::Reportable, models::AssignedJobById, }; -const ITERATION_LATENCY: u64 = 5; - pub async fn process_request(jobs: Vec, client: &ClientHandler) { if !jobs.is_empty() { for jr in &jobs { @@ -28,7 +26,7 @@ pub async fn process_request(jobs: Vec, client: &ClientHandler) Ok(result) => break result, Err(err) => { debug!("{:?} \nretrying...", err); - sleep(Duration::from_secs(ITERATION_LATENCY)).await; + sleep(AGENT_ITERATION_INTERVAL).await; } } }; @@ -54,7 +52,7 @@ pub async fn process_request(jobs: Vec, client: &ClientHandler) }) .collect::>(); - UnnamedJobsBatch::from_meta_with_id(meta_with_ids) + AnonymousJobBatch::from_meta_with_id(meta_with_ids) .spawn() .await; } @@ -80,9 +78,9 @@ async fn error_reporting(client: ClientHandler) -> ! { } async fn agent_loop(client: ClientHandler) -> ! { - let self_uid = get_self_uid(); + let self_id = get_self_id(); loop { - match client.get_personal_jobs(self_uid).await { + match client.get_personal_jobs(self_id).await { Ok(jobs) => { process_request(jobs, &client).await; } @@ -103,7 +101,7 @@ async fn agent_loop(client: ClientHandler) -> ! { ErrChan::send(err, "report").await; } } - sleep(Duration::from_secs(ITERATION_LATENCY)).await; + sleep(AGENT_ITERATION_INTERVAL).await; } } @@ -113,7 +111,7 @@ pub fn run_forever() -> ! { if cfg!(debug_assertions) { init_logger(Some(format!( "u_agent-{}", - get_self_uid() + get_self_id() .hyphenated() .to_string() .split("-") @@ -125,7 +123,7 @@ pub fn run_forever() -> ! { u_lib::unix::daemonize() } - info!("Starting agent {}", get_self_uid()); + info!("Starting agent {}", get_self_id()); Builder::new_multi_thread() .enable_all() diff --git a/bin/u_panel/src/argparse.rs b/bin/u_panel/src/argparse.rs index e80843a..f3a2dce 100644 --- a/bin/u_panel/src/argparse.rs +++ b/bin/u_panel/src/argparse.rs @@ -43,7 +43,7 @@ enum JobCmd { enum JobMapCRUD { Create { #[structopt(parse(try_from_str = parse_uuid))] - agent_uid: Uuid, + agent_id: Uuid, job_idents: Vec, }, @@ -55,14 +55,14 @@ enum JobMapCRUD { enum RUD { Read { #[structopt(parse(try_from_str = parse_uuid))] - uid: Option, + id: Option, }, Update { item: String, }, Delete { #[structopt(parse(try_from_str = parse_uuid))] - uid: Uuid, + id: Uuid, }, } @@ -78,12 +78,12 @@ pub async fn process_cmd(client: ClientHandler, args: Args) -> PanelResult = (|| async { Ok(match args.cmd { Cmd::Agents(action) => match action { - RUD::Read { uid } => into_value(client.get_agents(uid).await?), + RUD::Read { id } => into_value(client.get_agents(id).await?), RUD::Update { item } => { let agent = from_str::(&item)?; into_value(client.update_agent(agent).await?) } - RUD::Delete { uid } => into_value(client.del(uid).await?), + RUD::Delete { id } => into_value(client.del(id).await?), }, Cmd::Jobs(action) => match action { JobCRUD::Create { job } => { @@ -96,8 +96,8 @@ pub async fn process_cmd(client: ClientHandler, args: Args) -> PanelResult match uid { - Some(uid) => into_value(client.get_job(uid).await?), + JobCRUD::RUD(RUD::Read { id }) => match id { + Some(id) => into_value(client.get_job(id).await?), None => into_value(client.get_jobs().await?), }, JobCRUD::RUD(RUD::Update { item }) => { @@ -105,19 +105,19 @@ pub async fn process_cmd(client: ClientHandler, args: Args) -> PanelResult into_value(client.del(uid).await?), + JobCRUD::RUD(RUD::Delete { id }) => into_value(client.del(id).await?), }, Cmd::Map(action) => match action { JobMapCRUD::Create { - agent_uid, + agent_id, job_idents, - } => into_value(client.set_jobs(agent_uid, job_idents).await?), - JobMapCRUD::RUD(RUD::Read { uid }) => into_value(client.get_agent_jobs(uid).await?), + } => into_value(client.set_jobs(agent_id, job_idents).await?), + JobMapCRUD::RUD(RUD::Read { id }) => into_value(client.get_agent_jobs(id).await?), JobMapCRUD::RUD(RUD::Update { item }) => { let assigned = from_str::(&item)?; into_value(client.update_result(assigned).await?) } - JobMapCRUD::RUD(RUD::Delete { uid }) => into_value(client.del(uid).await?), + JobMapCRUD::RUD(RUD::Delete { id }) => into_value(client.del(id).await?), }, Cmd::Ping => into_value(client.ping().await?), Cmd::Serve => { diff --git a/bin/u_server/src/db.rs b/bin/u_server/src/db.rs index 98f9d95..dba691e 100644 --- a/bin/u_server/src/db.rs +++ b/bin/u_server/src/db.rs @@ -57,14 +57,14 @@ impl UDB<'_> { .map_err(with_err_ctx("Can't insert jobs")) } - pub fn get_job(&mut self, uid: Uuid) -> Result> { + pub fn get_job(&mut self, id: Uuid) -> Result> { use schema::jobs; jobs::table - .filter(jobs::id.eq(uid)) + .filter(jobs::id.eq(id)) .first(self.conn) .optional() - .map_err(with_err_ctx(format!("Can't get job {uid}"))) + .map_err(with_err_ctx(format!("Can't get job {id}"))) } pub fn get_jobs(&mut self) -> Result> { @@ -108,14 +108,14 @@ impl UDB<'_> { Ok(()) } - pub fn get_agent(&mut self, uid: Uuid) -> Result> { + pub fn get_agent(&mut self, id: Uuid) -> Result> { use schema::agents; agents::table - .filter(agents::id.eq(uid)) + .filter(agents::id.eq(id)) .first(self.conn) .optional() - .map_err(with_err_ctx(format!("Can't get agent {uid:?}"))) + .map_err(with_err_ctx(format!("Can't get agent {id:?}"))) } pub fn get_agents(&mut self) -> Result> { @@ -126,40 +126,36 @@ impl UDB<'_> { .map_err(with_err_ctx(format!("Can't get agents"))) } - pub fn update_job_status(&mut self, uid: Uuid, status: JobState) -> Result<()> { + pub fn update_job_status(&mut self, id: Uuid, status: JobState) -> Result<()> { use schema::results; diesel::update(results::table) - .filter(results::id.eq(uid)) + .filter(results::id.eq(id)) .set(results::state.eq(status)) .execute(self.conn) - .map_err(with_err_ctx(format!("Can't update status of job {uid}")))?; + .map_err(with_err_ctx(format!("Can't update status of job {id}")))?; Ok(()) } //TODO: filters possibly could work in a wrong way, check - pub fn get_exact_jobs( - &mut self, - uid: Option, - personal: bool, - ) -> Result> { + pub fn get_exact_jobs(&mut self, id: Option, personal: bool) -> Result> { use schema::results; let mut q = results::table.into_boxed(); - /*if uid.is_some() { - q = q.filter(results::agent_id.eq(uid.unwrap())) + /*if id.is_some() { + q = q.filter(results::agent_id.eq(id.unwrap())) }*/ if personal { q = q.filter( results::state .eq(JobState::Queued) - .and(results::agent_id.eq(uid.unwrap())), + .and(results::agent_id.eq(id.unwrap())), ) - } else if uid.is_some() { + } else if id.is_some() { q = q - .filter(results::agent_id.eq(uid.unwrap())) - .or_filter(results::job_id.eq(uid.unwrap())) - .or_filter(results::id.eq(uid.unwrap())) + .filter(results::agent_id.eq(id.unwrap())) + .or_filter(results::job_id.eq(id.unwrap())) + .or_filter(results::id.eq(id.unwrap())) } let result = q .load::(self.conn) @@ -167,38 +163,39 @@ impl UDB<'_> { Ok(result) } - pub fn set_jobs_for_agent(&mut self, agent_uid: Uuid, job_uids: &[Uuid]) -> Result> { + pub fn set_jobs_for_agent(&mut self, agent_id: Uuid, job_ids: &[Uuid]) -> Result> { use schema::{jobs, results}; - let agent_platform = match self.get_agent(agent_uid)? { + let agent_platform = match self.get_agent(agent_id)? { Some(agent) => Platform::new(&agent.platform), None => { return Err(Error::ProcessingError(format!( - "Agent {agent_uid} not found" + "Agent {agent_id} not found" ))) } }; - for uid in job_uids { - let job_platform = jobs::table - .select(jobs::platform) - .filter(jobs::id.eq(uid)) - .first(self.conn) - .map_err(with_err_ctx(format!("Can't find job {uid}")))?; + let jobs_meta = jobs::table + .select((jobs::id, jobs::alias, jobs::platform)) + .filter(jobs::id.eq_any(job_ids)) + .load::<(Uuid, Option, String)>(self.conn) + .map_err(with_err_ctx(format!("Can't find jobs {job_ids:?}")))?; - if !agent_platform.matches(&job_platform) { + for meta in &jobs_meta { + if !agent_platform.matches(&meta.2) { return Err(Error::InsuitablePlatform( agent_platform.into_string(), - job_platform, + meta.2.clone(), )); } } - let job_requests = job_uids - .iter() - .map(|job_uid| AssignedJob { - job_id: *job_uid, - agent_id: agent_uid, + let job_requests = jobs_meta + .into_iter() + .map(|(job_id, alias, _)| AssignedJob { + job_id, + agent_id, + alias, ..Default::default() }) .collect::>(); @@ -207,19 +204,19 @@ impl UDB<'_> { .values(&job_requests) .execute(self.conn) .map_err(with_err_ctx(format!( - "Can't setup jobs {job_uids:?} for agent {agent_uid:?}" + "Can't setup jobs {job_ids:?} for agent {agent_id:?}" )))?; Ok(job_requests.iter().map(|aj| aj.id).collect()) } - pub fn del_jobs(&mut self, uids: &[Uuid]) -> Result { + pub fn del_jobs(&mut self, ids: &[Uuid]) -> Result { use schema::jobs; let mut affected = 0; - for &uid in uids { + for id in ids { let deleted = diesel::delete(jobs::table) - .filter(jobs::id.eq(uid)) + .filter(jobs::id.eq(id)) .execute(self.conn) .map_err(with_err_ctx("Can't delete jobs"))?; affected += deleted; @@ -227,13 +224,13 @@ impl UDB<'_> { Ok(affected) } - pub fn del_results(&mut self, uids: &[Uuid]) -> Result { + pub fn del_results(&mut self, ids: &[Uuid]) -> Result { use schema::results; let mut affected = 0; - for &uid in uids { + for id in ids { let deleted = diesel::delete(results::table) - .filter(results::id.eq(uid)) + .filter(results::id.eq(id)) .execute(self.conn) .map_err(with_err_ctx("Can't delete results"))?; affected += deleted; @@ -241,13 +238,13 @@ impl UDB<'_> { Ok(affected) } - pub fn del_agents(&mut self, uids: &[Uuid]) -> Result { + pub fn del_agents(&mut self, ids: &[Uuid]) -> Result { use schema::agents; let mut affected = 0; - for &uid in uids { + for id in ids { let deleted = diesel::delete(agents::table) - .filter(agents::id.eq(uid)) + .filter(agents::id.eq(id)) .execute(self.conn) .map_err(with_err_ctx("Can't delete agents"))?; affected += deleted; diff --git a/bin/u_server/src/handlers.rs b/bin/u_server/src/handlers.rs index 5ed4acc..2e0da72 100644 --- a/bin/u_server/src/handlers.rs +++ b/bin/u_server/src/handlers.rs @@ -5,7 +5,7 @@ use crate::error::Error; use crate::ValidJobMeta; use u_lib::jobs::{fat_meta_to_thin, thin_meta_to_fat}; use u_lib::{ - messaging::{AsMsg, BaseMessage, Reportable}, + messaging::{AsMsg, Reportable}, misc::OneOrVec, models::*, }; @@ -18,11 +18,11 @@ type EndpResult = Result; pub struct Endpoints; impl Endpoints { - pub async fn get_agents(repo: Arc, uid: Option) -> EndpResult> { + pub async fn get_agents(repo: Arc, id: Option) -> EndpResult> { repo.interact(move |mut db| { - Ok(match uid { - Some(uid) => { - if let Some(agent) = db.get_agent(uid)? { + Ok(match id { + Some(id) => { + if let Some(agent) = db.get_agent(id)? { vec![agent] } else { vec![] @@ -35,8 +35,8 @@ impl Endpoints { .map_err(From::from) } - pub async fn get_job(repo: Arc, uid: Uuid) -> EndpResult { - let Some(job) = repo.interact(move |mut db| db.get_job(uid)).await? else { + pub async fn get_job(repo: Arc, id: Uuid) -> EndpResult { + let Some(job) = repo.interact(move |mut db| db.get_job(id)).await? else { return Err(not_found()) }; @@ -52,23 +52,23 @@ impl Endpoints { pub async fn get_agent_jobs( repo: Arc, - uid: Option, + id: Option, ) -> EndpResult> { - repo.interact(move |mut db| db.get_exact_jobs(uid, false)) + repo.interact(move |mut db| db.get_exact_jobs(id, false)) .await .map_err(From::from) } - pub async fn get_personal_jobs(repo: Arc, uid: Uuid) -> EndpResult> { + pub async fn get_personal_jobs(repo: Arc, id: Uuid) -> EndpResult> { repo.transaction(move |mut db| { - let agent = db.get_agent(uid)?; + let agent = db.get_agent(id)?; match agent { Some(mut agent) => { agent.touch(); db.update_agent(&agent)?; } None => { - let new_agent = Agent::with_id(uid); + let new_agent = Agent::with_id(id); db.insert_agent(&new_agent)?; @@ -76,11 +76,11 @@ impl Endpoints { .find_job_by_alias("agent_hello")? .expect("agent_hello job not found"); - db.set_jobs_for_agent(uid, &[job.id])?; + db.set_jobs_for_agent(id, &[job.id])?; } } - let assigned_jobs = db.get_exact_jobs(Some(uid), true)?; + let assigned_jobs = db.get_exact_jobs(Some(id), true)?; for job in &assigned_jobs { db.update_job_status(job.id, JobState::Running)?; @@ -94,10 +94,9 @@ impl Endpoints { pub async fn upload_jobs( repo: Arc, - msg: BaseMessage<'static, Vec>, + msg: Vec, ) -> EndpResult> { let jobs = msg - .into_inner() .into_iter() .map(|meta| Ok(fat_meta_to_thin(meta)?)) .collect::, Error>>()?; @@ -107,11 +106,11 @@ impl Endpoints { .map_err(From::from) } - pub async fn del(repo: Arc, uid: Uuid) -> EndpResult { + pub async fn del(repo: Arc, id: Uuid) -> EndpResult { repo.transaction(move |mut db| { let del_fns = &[UDB::del_agents, UDB::del_jobs, UDB::del_results]; for del_fn in del_fns { - let affected = del_fn(&mut db, &[uid])?; + let affected = del_fn(&mut db, &[id])?; if affected > 0 { return Ok(affected); } @@ -124,12 +123,11 @@ impl Endpoints { pub async fn set_jobs( repo: Arc, - agent_uid: Uuid, - job_idents: BaseMessage<'static, Vec>, + agent_id: Uuid, + job_idents: Vec, ) -> EndpResult> { repo.transaction(move |mut db| { job_idents - .into_inner() .into_iter() .map(|ident| { Uuid::parse_str(&ident).or_else(|_| { @@ -146,7 +144,7 @@ impl Endpoints { }) }) .collect::, Error>>() - .and_then(|j| db.set_jobs_for_agent(agent_uid, &j)) + .and_then(|j| db.set_jobs_for_agent(agent_id, &j)) }) .await .map_err(From::from) @@ -154,27 +152,30 @@ impl Endpoints { pub async fn report + AsMsg + Send + Sync + 'static>( repo: Arc, - msg: BaseMessage<'static, Data>, + msg: Data, + agent_id: Uuid ) -> EndpResult<()> { repo.transaction(move |mut db| { - let id = msg.id; - for entry in msg.into_inner().into_vec() { + for entry in msg.into_vec() { match entry { Reportable::Assigned(mut result) => { let result_agent_id = &result.agent_id; - if id != *result_agent_id { - warn!("Ids are not equal! actual id: {id}, id from job: {result_agent_id}"); + if agent_id != *result_agent_id { + warn!("Agent ids are not equal! actual id: {agent_id}, id from job: {result_agent_id}"); continue; } result.state = JobState::Finished; result.touch(); + + info!("agent {agent_id} updated job {}", result.id); + match result.exec_type { JobType::Init => match &result.result { Some(rbytes) => { let mut agent: Agent = match serde_json::from_slice(&rbytes) { Ok(a) => a, Err(e) => { - error!("Error deserializing agent from {id}: {e}"); + error!("Error deserializing agent data from {agent_id}: {e}"); continue; } }; @@ -191,7 +192,7 @@ impl Endpoints { db.update_result(&result)?; } Reportable::Error(e) => { - error!("agent {id} reported: {e}"); + error!("agent {agent_id} reported: {e}"); } Reportable::Dummy => (), }} @@ -203,19 +204,18 @@ impl Endpoints { pub async fn update_agent( repo: Arc, - agent: BaseMessage<'static, Agent>, + agent: Agent, ) -> EndpResult<()> { - repo.interact(move |mut db| db.update_agent(&agent.into_inner())) + repo.interact(move |mut db| db.update_agent(&agent)) .await?; Ok(()) } pub async fn update_job( repo: Arc, - job: BaseMessage<'static, ValidJobMeta>, + job: ValidJobMeta, ) -> EndpResult<()> { - let meta = job.into_inner(); - let thin_meta = fat_meta_to_thin(meta).map_err(Error::from)?; + let thin_meta = fat_meta_to_thin(job).map_err(Error::from)?; repo.interact(move |mut db| db.update_job(&thin_meta)) .await?; Ok(()) @@ -223,14 +223,14 @@ impl Endpoints { pub async fn update_assigned_job( repo: Arc, - assigned: BaseMessage<'static, AssignedJob>, + assigned: AssignedJob, ) -> EndpResult<()> { - repo.interact(move |mut db| db.update_result(&assigned.into_inner())) + repo.interact(move |mut db| db.update_result(&assigned)) .await?; Ok(()) } - pub async fn download(_file_uid: String) -> EndpResult> { + pub async fn download(_file_id: String) -> EndpResult> { todo!() } } diff --git a/bin/u_server/src/u_server.rs b/bin/u_server/src/u_server.rs index f38210e..b0f86d5 100644 --- a/bin/u_server/src/u_server.rs +++ b/bin/u_server/src/u_server.rs @@ -11,13 +11,12 @@ mod handlers; use db::PgRepo; use error::{Error as ServerError, RejResponse}; -use serde::de::DeserializeOwned; use std::{convert::Infallible, sync::Arc}; use u_lib::{ config, db::async_pool, jobs::fat_meta_to_thin, - messaging::{AsMsg, BaseMessage, Reportable}, + messaging::{AsMsg, Reportable}, models::*, }; use uuid::Uuid; @@ -32,15 +31,8 @@ use crate::handlers::Endpoints; type ValidJobMeta = FatJobMeta; -fn get_content() -> impl Filter,), Error = Rejection> + Clone -where - M: AsMsg + Sync + Send + DeserializeOwned + 'static, -{ - body::json::>() -} - fn into_message(msg: M) -> Json { - json(&msg.as_message()) + json(&msg) } pub fn init_endpoints( @@ -48,7 +40,7 @@ pub fn init_endpoints( db: PgRepo, ) -> impl Filter + Clone { let path = |p: &'static str| warp::post().and(warp::path(p)); - let infallible_none = |_| async { Ok::<_, Infallible>((None::,)) }; + let infallible_none = |_| async { Result::<(Option,), Infallible>::Ok((None,)) }; let with_db = { let adb = Arc::new(db); @@ -67,7 +59,7 @@ pub fn init_endpoints( let upload_jobs = path("upload_jobs") .and(with_db.clone()) - .and(get_content::>()) + .and(body::json::>()) .and_then(Endpoints::upload_jobs) .map(into_message); @@ -107,31 +99,32 @@ pub fn init_endpoints( let set_jobs = path("set_jobs") .and(with_db.clone()) .and(warp::path::param::()) - .and(get_content::>()) + .and(body::json::>()) .and_then(Endpoints::set_jobs) .map(into_message); let report = path("report") .and(with_db.clone()) - .and(get_content::>()) + .and(body::json::>()) + .and(warp::header("User-Agent")) .and_then(Endpoints::report) .map(ok); let update_agent = path("update_agent") .and(with_db.clone()) - .and(get_content::()) + .and(body::json::()) .and_then(Endpoints::update_agent) .map(ok); let update_job = path("update_job") .and(with_db.clone()) - .and(get_content::()) + .and(body::json::()) .and_then(Endpoints::update_job) .map(ok); let update_assigned_job = path("update_result") .and(with_db.clone()) - .and(get_content::()) + .and(body::json::()) .and_then(Endpoints::update_assigned_job) .map(ok); @@ -226,11 +219,11 @@ async fn handle_rejection(rej: Rejection) -> Result { fn logger(info: Info<'_>) { info!(target: "warp", - "{raddr} {agent_uid} \"{path}\" {status}", + "{raddr} {agent_id} \"{path}\" {status}", raddr = info.remote_addr().unwrap_or(([0, 0, 0, 0], 0).into()), path = info.path(), - agent_uid = info.user_agent() - .map(|uid: &str| uid.splitn(3, '-') + agent_id = info.user_agent() + .map(|id: &str| id.splitn(3, '-') .take(2) .collect::() ) diff --git a/images/u_agent.Dockerfile b/images/u_agent.Dockerfile index 3058997..ca455aa 100644 --- a/images/u_agent.Dockerfile +++ b/images/u_agent.Dockerfile @@ -1,3 +1,3 @@ -FROM centos:7 +FROM alpine:3.17 -RUN yum update -y \ No newline at end of file +RUN apk add bash \ No newline at end of file diff --git a/images/u_server.Dockerfile b/images/u_server.Dockerfile index 691a618..122ef4d 100644 --- a/images/u_server.Dockerfile +++ b/images/u_server.Dockerfile @@ -1,3 +1,3 @@ -FROM alpine:latest +FROM alpine:3.17 RUN apk add iproute2 bash \ No newline at end of file diff --git a/integration/tests/fixtures/agent.rs b/integration/tests/fixtures/agent.rs index 131e197..766f884 100644 --- a/integration/tests/fixtures/agent.rs +++ b/integration/tests/fixtures/agent.rs @@ -1,28 +1,28 @@ use crate::helpers::ENV; use u_lib::{ - api::ClientHandler, config::get_self_uid, jobs::fat_meta_to_thin, messaging::Reportable, + api::ClientHandler, config::get_self_id, jobs::fat_meta_to_thin, messaging::Reportable, models::*, }; use uuid::Uuid; pub struct RegisteredAgent { - pub uid: Uuid, + pub id: Uuid, } impl RegisteredAgent { pub async fn unregister(self) { let cli = ClientHandler::new(&ENV.u_server, None).await.unwrap(); - cli.del(self.uid).await.unwrap(); + cli.del(self.id).await.unwrap(); } } #[fixture] pub async fn register_agent() -> RegisteredAgent { let cli = ClientHandler::new(&ENV.u_server, None).await.unwrap(); - let agent_uid = get_self_uid(); - println!("registering agent {agent_uid}"); + let agent_id = get_self_id(); + println!("registering agent {agent_id}"); let resp = cli - .get_personal_jobs(agent_uid) + .get_personal_jobs(agent_id) .await .unwrap() .pop() @@ -31,7 +31,7 @@ pub async fn register_agent() -> RegisteredAgent { let job = cli.get_job(job_id).await.unwrap(); assert_eq!(job.alias, Some("agent_hello".to_string())); let mut agent_data = AssignedJob::from((&fat_meta_to_thin(job).unwrap(), resp)); - agent_data.set_result(&Agent::with_id(agent_uid)); + agent_data.set_result(&Agent::with_id(agent_id)); cli.report(Reportable::Assigned(agent_data)).await.unwrap(); - RegisteredAgent { uid: agent_uid } + RegisteredAgent { id: agent_id } } diff --git a/integration/tests/helpers/jobs.rs b/integration/tests/helpers/jobs.rs new file mode 100644 index 0000000..c266f27 --- /dev/null +++ b/integration/tests/helpers/jobs.rs @@ -0,0 +1,22 @@ +use std::{fmt::Debug, time::Duration}; +use tokio::time::sleep; + +pub async fn retry_with_interval( + retries: usize, + interval: Duration, + f: impl Fn() -> Result, +) -> T { + let mut err = String::new(); + for i in 0..retries { + eprintln!("retrier: {} attempt...", i + 1); + let result = f(); + match result { + Ok(r) => return r, + Err(e) => { + err = format!("{e:?}"); + sleep(interval).await; + } + } + } + panic!("{err}"); +} diff --git a/integration/tests/helpers/mod.rs b/integration/tests/helpers/mod.rs index 6bfc9d0..3ac8a05 100644 --- a/integration/tests/helpers/mod.rs +++ b/integration/tests/helpers/mod.rs @@ -1,3 +1,4 @@ +pub mod jobs; pub mod panel; pub use panel::Panel; diff --git a/integration/tests/integration/behaviour.rs b/integration/tests/integration/behaviour.rs index ecfe2cf..397b893 100644 --- a/integration/tests/integration/behaviour.rs +++ b/integration/tests/integration/behaviour.rs @@ -1,10 +1,9 @@ use crate::fixtures::agent::*; -use crate::helpers::Panel; +use crate::helpers::{jobs::retry_with_interval, Panel}; use rstest::rstest; use serde_json::{json, to_string}; -use std::time::Duration; -use tokio::time::sleep; +use u_lib::config::AGENT_ITERATION_INTERVAL; use u_lib::models::*; use uuid::Uuid; @@ -13,15 +12,15 @@ use uuid::Uuid; async fn registration(#[future] register_agent: RegisteredAgent) { let agent = register_agent.await; let agents: Vec = Panel::check_output("agents read"); - let found = agents.iter().find(|v| v.id == agent.uid); + let found = agents.iter().find(|v| v.id == agent.id); assert!(found.is_some()); - Panel::check_status(format!("agents delete {}", agent.uid)); + Panel::check_status(format!("agents delete {}", agent.id)); } #[tokio::test] async fn setup_tasks() { let agents: Vec = Panel::check_output("agents read"); - let agent_uid = agents[0].id; + let agent_id = agents[0].id; let job_alias = "passwd_contents"; let job = json!( {"alias": job_alias, "payload": b"cat /etc/passwd", "argv": "/bin/bash {}" } @@ -29,51 +28,52 @@ async fn setup_tasks() { Panel::check_status(["jobs", "create", &to_string(&job).unwrap()]); - let cmd = format!("map create {} {}", agent_uid, job_alias); - let assigned_uids: Vec = Panel::check_output(cmd); + let cmd = format!("map create {} {}", agent_id, job_alias); + let assigned_ids: Vec = Panel::check_output(cmd); - for _ in 0..3 { + retry_with_interval(3, AGENT_ITERATION_INTERVAL, || { let result = - Panel::check_output::>(format!("map read {}", assigned_uids[0])) + Panel::check_output::>(format!("map read {}", assigned_ids[0])) .remove(0); if result.state == JobState::Finished { - assert!(result.to_str_result().contains("root:x:0:0::/root")); + eprintln!("{}", result.to_str_result()); + assert!(result.to_str_result().contains("root:x:0:0")); + Ok(()) } else { - sleep(Duration::from_secs(5)).await; - eprintln!("waiting for task"); + Err("job didn't finish") } - } - panic!("Job didn't appear in the job map"); + }) + .await; } #[tokio::test] async fn large_payload() { - let agent_uid = Panel::check_output::>("agents read")[0].id; + let agent_id = Panel::check_output::>("agents read")[0].id; let job_alias = "large_payload"; let job = FatJobMeta::builder() .with_alias(job_alias) .with_payload_path("./tests/bin/echoer") - .with_shell("{} 'type echo'") + .with_shell("{} type echo") .build() .unwrap(); Panel::check_status(["jobs", "create", &to_string(&job).unwrap()]); - let cmd = format!("map create {agent_uid} {job_alias}"); - let assigned_uids: Vec = Panel::check_output(cmd); + let cmd = format!("map create {agent_id} {job_alias}"); + let assigned_ids: Vec = Panel::check_output(cmd); - for _ in 0..3 { + retry_with_interval(3, AGENT_ITERATION_INTERVAL, || { let result = - Panel::check_output::>(format!("map read {}", assigned_uids[0])) + Panel::check_output::>(format!("map read {}", assigned_ids[0])) .remove(0); if result.state == JobState::Finished { - assert_eq!(result.to_str_result(), "type echo"); + assert_eq!(result.to_str_result(), "type echo\n"); + Ok(()) } else { - sleep(Duration::from_secs(5)).await; - eprintln!("waiting for task"); + Err("job didn't finish") } - } - panic!("Job didn't appear in the job map"); + }) + .await; } diff --git a/lib/u_lib/src/api.rs b/lib/u_lib/src/api.rs index 34218e2..370ea8d 100644 --- a/lib/u_lib/src/api.rs +++ b/lib/u_lib/src/api.rs @@ -9,9 +9,9 @@ use serde_json::{from_str, Value}; use uuid::Uuid; use crate::{ - config::{get_self_uid, MASTER_PORT}, + config::{get_self_id, MASTER_PORT}, conv::opt_to_string, - messaging::{self, AsMsg, BaseMessage}, + messaging::{self, AsMsg}, misc::OneOrVec, models::{self}, UError, UResult, @@ -30,7 +30,7 @@ impl ClientHandler { pub async fn new(server: &str, password: Option) -> UResult { let identity = Identity::from_pkcs12_der(AGENT_IDENTITY, "").unwrap(); let mut default_headers = - HashMap::from([(header::USER_AGENT, get_self_uid().hyphenated().to_string())]); + HashMap::from([(header::USER_AGENT, get_self_id().hyphenated().to_string())]); if let Some(pwd) = password { default_headers.insert(header::AUTHORIZATION, format!("Bearer {pwd}")); @@ -86,7 +86,7 @@ impl ClientHandler { let request = self .client .post(self.base_url.join(url.as_ref()).unwrap()) - .json(&payload.as_message()); + .json(&payload); let response = request .send() @@ -100,12 +100,10 @@ impl ClientHandler { let resp = response.text().await.context("resp")?; let result = match is_success { - Ok(_) => from_str::>(&resp) - .map(|msg| msg.into_inner()) - .or_else(|e| match content_len { - Some(0) => Ok(Default::default()), - _ => Err(UError::NetError(e.to_string(), resp)), - }), + Ok(_) => from_str::(&resp).or_else(|e| match content_len { + Some(0) => Ok(Default::default()), + _ => Err(UError::NetError(e.to_string(), resp)), + }), Err(UError::NetError(err, _)) => Err(UError::NetError(err, resp)), _ => unreachable!(), } diff --git a/lib/u_lib/src/cache.rs b/lib/u_lib/src/cache.rs index 7d933e4..0a10b81 100644 --- a/lib/u_lib/src/cache.rs +++ b/lib/u_lib/src/cache.rs @@ -17,20 +17,20 @@ impl JobCache { JOB_CACHE.write().insert(job_meta.id, job_meta); } - pub fn contains(uid: Uuid) -> bool { - JOB_CACHE.read().contains_key(&uid) + pub fn contains(id: Uuid) -> bool { + JOB_CACHE.read().contains_key(&id) } - pub fn get<'jh>(uid: Uuid) -> Option> { - if !Self::contains(uid) { + pub fn get<'jh>(id: Uuid) -> Option> { + if !Self::contains(id) { return None; } let lock = JOB_CACHE.read(); - Some(JobCacheHolder(lock, uid)) + Some(JobCacheHolder(lock, id)) } - pub fn remove(uid: Uuid) { - JOB_CACHE.write().remove(&uid); + pub fn remove(id: Uuid) { + JOB_CACHE.write().remove(&id); } } diff --git a/lib/u_lib/src/config.rs b/lib/u_lib/src/config.rs index 7eab224..b189508 100644 --- a/lib/u_lib/src/config.rs +++ b/lib/u_lib/src/config.rs @@ -1,19 +1,22 @@ use envy::{from_env, prefixed, Result as EnvResult}; use lazy_static::lazy_static; use serde::Deserialize; +use std::time::Duration; use uuid::Uuid; pub use envy::Error; pub const MASTER_PORT: u16 = 63714; +pub const AGENT_ITERATION_INTERVAL: Duration = Duration::from_secs(5); + lazy_static! { - static ref UID: Uuid = Uuid::new_v4(); + static ref ID: Uuid = Uuid::new_v4(); } #[inline] -pub fn get_self_uid() -> Uuid { - *UID +pub fn get_self_id() -> Uuid { + *ID } #[derive(Deserialize)] diff --git a/lib/u_lib/src/jobs.rs b/lib/u_lib/src/jobs.rs index ea29e49..ea0efd3 100644 --- a/lib/u_lib/src/jobs.rs +++ b/lib/u_lib/src/jobs.rs @@ -10,12 +10,12 @@ use std::collections::HashMap; use std::process::exit; use tokio::process::Command; -pub struct UnnamedJobsBatch { +pub struct AnonymousJobBatch { waiter: Waiter, is_running: bool, } -impl UnnamedJobsBatch { +impl AnonymousJobBatch { pub fn from_meta_with_id(jobs: impl OneOrVec<(ThinJobMeta, AssignedJobById)>) -> Self { let jobs = jobs.into_vec(); let mut waiter = Waiter::new(); @@ -43,7 +43,7 @@ impl UnnamedJobsBatch { ) }) .collect(); - UnnamedJobsBatch::from_meta_with_id(jobs) + AnonymousJobBatch::from_meta_with_id(jobs) } /// Spawn jobs @@ -69,6 +69,82 @@ impl UnnamedJobsBatch { } } +/// Store jobs and get results by name +pub struct NamedJobBatch { + runner: Option, + job_names: Vec, + results: HashMap, +} + +impl NamedJobBatch { + pub fn from_shell( + named_jobs: impl OneOrVec<(&'static str, &'static str)>, + ) -> CombinedResult { + let mut result = CombinedResult::new(); + let jobs: Vec<_> = named_jobs + .into_vec() + .into_iter() + .filter_map(|(alias, cmd)| { + match FatJobMeta::builder() + .with_shell(cmd) + .with_alias(alias) + .build() + { + Ok(fat_meta) => match fat_meta_to_thin(fat_meta) { + Ok(thin_meta) => Some(thin_meta), + Err(e) => { + result.err(e); + None + } + }, + Err(e) => { + result.err(e); + None + } + } + }) + .collect(); + result.ok(Self::from_meta(jobs)); + result + } + + pub fn from_meta(named_jobs: impl OneOrVec) -> Self { + let (job_names, job_metas): (Vec<_>, Vec<_>) = named_jobs + .into_vec() + .into_iter() + .map(|meta| (meta.alias.clone().unwrap(), meta)) + .unzip(); + Self { + runner: Some(AnonymousJobBatch::from_meta(job_metas)), + job_names, + results: HashMap::new(), + } + } + + pub async fn wait(mut self) -> NamedJobBatch { + let results = self.runner.take().unwrap().wait().await; + for (name, result) in self.job_names.into_iter().zip(results.into_iter()) { + self.results.insert(name, result); + } + + NamedJobBatch:: { + runner: None, + job_names: vec![], + results: self.results, + } + } +} + +impl NamedJobBatch { + pub fn pop_opt(&mut self, name: &'static str) -> Option { + self.results.remove(name) + } + + pub fn pop(&mut self, name: &'static str) -> ExecResult { + self.pop_opt(name).unwrap() + } +} + pub async fn run_assigned_job(meta: ThinJobMeta, ids: AssignedJobById) -> ExecResult { let mut job = AssignedJob::from((&meta, ids)); match meta.exec_type { @@ -79,7 +155,7 @@ pub async fn run_assigned_job(meta: ThinJobMeta, ids: AssignedJobById) -> ExecRe let argv_with_exec = meta.argv.replace("{}", &prep_exec_path); (argv_with_exec, Some(prep_exec)) } else { - (meta.argv.clone(), None) + (meta.argv, None) } }; @@ -156,87 +232,11 @@ pub fn thin_meta_to_fat(meta: ThinJobMeta) -> Result, ufs::Erro }) } -/// Store jobs and get results by name -pub struct NamedJobsBatch { - runner: Option, - job_names: Vec, - results: HashMap, -} - -impl NamedJobsBatch { - pub fn from_shell( - named_jobs: impl OneOrVec<(&'static str, &'static str)>, - ) -> CombinedResult { - let mut result = CombinedResult::new(); - let jobs: Vec<_> = named_jobs - .into_vec() - .into_iter() - .filter_map(|(alias, cmd)| { - match FatJobMeta::builder() - .with_shell(cmd) - .with_alias(alias) - .build() - { - Ok(fat_meta) => match fat_meta_to_thin(fat_meta) { - Ok(thin_meta) => Some(thin_meta), - Err(e) => { - result.err(e); - None - } - }, - Err(e) => { - result.err(e); - None - } - } - }) - .collect(); - result.ok(Self::from_meta(jobs)); - result - } - - pub fn from_meta(named_jobs: impl OneOrVec) -> Self { - let (job_names, job_metas): (Vec<_>, Vec<_>) = named_jobs - .into_vec() - .into_iter() - .map(|meta| (meta.alias.clone().unwrap(), meta)) - .unzip(); - Self { - runner: Some(UnnamedJobsBatch::from_meta(job_metas)), - job_names, - results: HashMap::new(), - } - } - - pub async fn wait(mut self) -> NamedJobsBatch { - let results = self.runner.take().unwrap().wait().await; - for (name, result) in self.job_names.into_iter().zip(results.into_iter()) { - self.results.insert(name, result); - } - - NamedJobsBatch:: { - runner: None, - job_names: vec![], - results: self.results, - } - } -} - -impl NamedJobsBatch { - pub fn pop_opt(&mut self, name: &'static str) -> Option { - self.results.remove(name) - } - - pub fn pop(&mut self, name: &'static str) -> ExecResult { - self.pop_opt(name).unwrap() - } -} - #[cfg(test)] mod tests { use super::*; use crate::{ - jobs::{NamedJobsBatch, UnnamedJobsBatch}, + jobs::{AnonymousJobBatch, NamedJobBatch}, models::{misc::JobType, FatJobMeta}, unwrap_enum, UError, }; @@ -253,7 +253,7 @@ mod tests { .collect::>(); let now = SystemTime::now(); - UnnamedJobsBatch::from_meta(sleep_jobs).wait().await; + AnonymousJobBatch::from_meta(sleep_jobs).wait().await; assert!(now.elapsed().unwrap().as_secs() < SLEEP_SECS + 2) } @@ -292,7 +292,7 @@ mod tests { job = job.with_payload(p); } let job = fat_meta_to_thin(job.build().unwrap()).unwrap(); - let result = UnnamedJobsBatch::from_meta(job).wait_one().await.unwrap(); + let result = AnonymousJobBatch::from_meta(job).wait_one().await.unwrap(); let result = result.to_str_result(); assert_eq!(result.trim(), expected_result); Ok(()) @@ -303,10 +303,10 @@ mod tests { const SLEEP_SECS: u64 = 1; let now = SystemTime::now(); let longest_job = FatJobMeta::from_shell(format!("sleep {}", SLEEP_SECS)).unwrap(); - let longest_job = UnnamedJobsBatch::from_meta(fat_meta_to_thin(longest_job).unwrap()) + let longest_job = AnonymousJobBatch::from_meta(fat_meta_to_thin(longest_job).unwrap()) .spawn() .await; - let ls = UnnamedJobsBatch::from_meta( + let ls = AnonymousJobBatch::from_meta( fat_meta_to_thin(FatJobMeta::from_shell("ls").unwrap()).unwrap(), ) .wait_one() @@ -321,7 +321,7 @@ mod tests { .map(|f| fat_meta_to_thin(FatJobMeta::from_shell(format!("ls {f}")).unwrap()).unwrap()) .collect::>(); - let ls_subfolders = UnnamedJobsBatch::from_meta(subfolders_jobs).wait().await; + let ls_subfolders = AnonymousJobBatch::from_meta(subfolders_jobs).wait().await; for result in ls_subfolders { assert_eq!(result.unwrap().retcode.unwrap(), 0); @@ -352,7 +352,7 @@ mod tests { #[tokio::test] async fn test_failing_shell_job() -> TestResult { let job = fat_meta_to_thin(FatJobMeta::from_shell("lol_kek_puk").unwrap()).unwrap(); - let job_result = UnnamedJobsBatch::from_meta(job).wait_one().await.unwrap(); + let job_result = AnonymousJobBatch::from_meta(job).wait_one().await.unwrap(); let output = job_result.to_str_result(); assert!(output.contains("No such file")); assert!(job_result.retcode.is_none()); @@ -380,7 +380,7 @@ mod tests { #[tokio::test] async fn test_different_job_types() -> TestResult { - let mut jobs = NamedJobsBatch::from_meta( + let mut jobs = NamedJobBatch::from_meta( [ FatJobMeta::builder() .with_shell("sleep 3") diff --git a/lib/u_lib/src/messaging/base.rs b/lib/u_lib/src/messaging/base.rs deleted file mode 100644 index b3bc49c..0000000 --- a/lib/u_lib/src/messaging/base.rs +++ /dev/null @@ -1,56 +0,0 @@ -use crate::config::get_self_uid; -use serde::{Deserialize, Serialize}; -use std::{borrow::Cow, fmt::Debug}; -use uuid::Uuid; - -pub struct Moo<'cow, T: AsMsg + Clone>(pub Cow<'cow, T>); - -pub trait AsMsg: Clone + Serialize + Debug { - fn as_message(&self) -> BaseMessage<'_, Self> { - BaseMessage::new(self) - } -} - -impl<'cow, M: AsMsg> From for Moo<'cow, M> { - #[inline] - fn from(obj: M) -> Moo<'cow, M> { - Moo(Cow::Owned(obj)) - } -} - -impl<'cow, M: AsMsg> From<&'cow M> for Moo<'cow, M> { - #[inline] - fn from(obj: &'cow M) -> Moo<'cow, M> { - Moo(Cow::Borrowed(obj)) - } -} - -impl AsMsg for Vec {} -impl<'msg, M: AsMsg> AsMsg for &'msg [M] {} - -#[derive(Serialize, Deserialize, Debug)] -pub struct BaseMessage<'cow, I: AsMsg> { - pub id: Uuid, - inner: Cow<'cow, I>, -} - -impl<'cow, I: AsMsg> BaseMessage<'cow, I> { - pub fn new(inner: C) -> Self - where - C: Into>, - { - let Moo(inner) = inner.into(); - Self { - id: get_self_uid(), - inner, - } - } - - pub fn into_inner(self) -> I { - self.inner.into_owned() - } - - pub fn as_inner(&self) -> &I { - self.inner.as_ref() - } -} diff --git a/lib/u_lib/src/messaging/mod.rs b/lib/u_lib/src/messaging/mod.rs index e425630..9e089fb 100644 --- a/lib/u_lib/src/messaging/mod.rs +++ b/lib/u_lib/src/messaging/mod.rs @@ -1,13 +1,14 @@ -mod base; mod files; use crate::models::*; use crate::UError; -pub use base::{AsMsg, BaseMessage}; pub use files::*; use serde::{Deserialize, Serialize}; +use std::fmt::Debug; use uuid::Uuid; +pub trait AsMsg: Clone + Serialize + Debug {} + impl AsMsg for Agent {} impl AsMsg for AssignedJob {} impl AsMsg for AssignedJobById {} @@ -21,6 +22,9 @@ impl AsMsg for i32 {} impl AsMsg for u8 {} impl AsMsg for () {} +impl AsMsg for Vec {} +impl<'msg, M: AsMsg> AsMsg for &'msg [M] {} + #[derive(Serialize, Deserialize, Clone, Debug)] pub enum Reportable { Assigned(AssignedJob), diff --git a/lib/u_lib/src/models/agent.rs b/lib/u_lib/src/models/agent.rs index eb9e8d6..029b441 100644 --- a/lib/u_lib/src/models/agent.rs +++ b/lib/u_lib/src/models/agent.rs @@ -11,7 +11,7 @@ mod server { #[cfg(feature = "server")] use self::server::*; -use crate::{config::get_self_uid, executor::ExecResult, jobs::NamedJobsBatch, platform::Platform}; +use crate::{config::get_self_id, executor::ExecResult, jobs::NamedJobBatch, platform::Platform}; use uuid::Uuid; @@ -52,9 +52,9 @@ pub struct Agent { } impl Agent { - pub fn with_id(uid: Uuid) -> Self { + pub fn with_id(id: Uuid) -> Self { Self { - id: uid, + id, ..Default::default() } } @@ -80,7 +80,7 @@ impl Agent { ("username", "id -un"), ]; - let mut builder = NamedJobsBatch::from_shell(cmds).unwrap_one().wait().await; + let mut builder = NamedJobBatch::from_shell(cmds).unwrap_one().wait().await; let decoder = |job_result: ExecResult| job_result.unwrap().to_str_result().trim().to_string(); @@ -103,7 +103,7 @@ impl Default for Agent { fn default() -> Self { Self { alias: None, - id: get_self_uid(), + id: get_self_id(), hostname: String::new(), host_info: String::new(), is_root: false, diff --git a/lib/u_lib/src/models/jobs/assigned.rs b/lib/u_lib/src/models/jobs/assigned.rs index 57bee7f..d19cade 100644 --- a/lib/u_lib/src/models/jobs/assigned.rs +++ b/lib/u_lib/src/models/jobs/assigned.rs @@ -1,5 +1,5 @@ use super::{JobState, JobType, ThinJobMeta}; -use crate::config::get_self_uid; +use crate::config::get_self_id; #[cfg(feature = "server")] use crate::models::schema::*; #[cfg(feature = "server")] @@ -50,7 +50,7 @@ impl From<(&ThinJobMeta, AssignedJobById)> for AssignedJob { impl Default for AssignedJobById { fn default() -> Self { Self { - agent_id: get_self_uid(), + agent_id: get_self_id(), id: Uuid::new_v4(), job_id: Uuid::nil(), } diff --git a/lib/u_lib/src/models/payload.rs b/lib/u_lib/src/models/payload.rs index 3ec78bd..f8acf58 100644 --- a/lib/u_lib/src/models/payload.rs +++ b/lib/u_lib/src/models/payload.rs @@ -19,7 +19,6 @@ impl Payload { let data = ufs::read(ident)?; *self = Payload::Data(data); - Ok(()) } } @@ -32,7 +31,6 @@ impl Payload { ufs::put(&name, data)?; *self = Payload::Ident(name.as_ref().to_string()); - Ok(()) } } diff --git a/lib/u_lib/src/ufs/mod.rs b/lib/u_lib/src/ufs/mod.rs index c63f9d6..13de156 100644 --- a/lib/u_lib/src/ufs/mod.rs +++ b/lib/u_lib/src/ufs/mod.rs @@ -152,7 +152,7 @@ pub fn prepare_executable(name: impl AsRef) -> Result<(File, String), Error loop { let bytes_read = payload_src.read(&mut buffer)?; - payload_dest.write(&buffer)?; + payload_dest.write(&buffer[..bytes_read])?; if bytes_read != BUFFER_LEN { break; @@ -164,3 +164,11 @@ pub fn prepare_executable(name: impl AsRef) -> Result<(File, String), Error Err(e) => Err(Error::new(e, FAKE_EXEC_NAME)), } } + +pub fn cleanup() { + let index = INDEX.read(); + + index.values().for_each(|f| { + fs::remove_file(&f.path).ok(); + }); +} diff --git a/lib/u_lib/tests/fixtures/echoer.rs b/lib/u_lib/tests/fixtures/echoer.rs index cc7eb02..3958fcf 100644 --- a/lib/u_lib/tests/fixtures/echoer.rs +++ b/lib/u_lib/tests/fixtures/echoer.rs @@ -1,5 +1,5 @@ use std::env; fn main() { - println!("{}", env::args().nth(1).unwrap_or(String::new())); + println!("{}", env::args().skip(1).collect::>().join(" ")); }