diff --git a/Cargo.lock b/Cargo.lock index 8d0a08b..9e3b502 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1195,6 +1195,7 @@ dependencies = [ name = "integration" version = "0.1.0" dependencies = [ + "futures", "once_cell", "reqwest", "rstest", @@ -2690,6 +2691,7 @@ dependencies = [ "deadpool-diesel", "diesel", "hyper", + "mime_guess", "once_cell", "openssl", "rstest", diff --git a/Cargo.toml b/Cargo.toml index 224420d..850aad0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,6 +13,7 @@ members = [ anyhow = "=1.0.63" deadpool-diesel = "0.4.0" diesel = { version = "2", features = ["postgres", "uuid"] } +mime_guess = "2.0" openssl = "0.10" reqwest = { version = "0.11", features = ["json"] } serde = { version = "1.0", features = ["derive"] } diff --git a/bin/u_agent/src/lib.rs b/bin/u_agent/src/lib.rs index 08cc6a3..7f08635 100644 --- a/bin/u_agent/src/lib.rs +++ b/bin/u_agent/src/lib.rs @@ -10,7 +10,7 @@ use u_lib::{ config::{get_self_id, EndpointsEnv, AGENT_ITERATION_INTERVAL}, error::ErrChan, executor::pop_completed, - jobs::{fat_meta_to_thin, AnonymousJobBatch}, + jobs::{split_payload, AnonymousJobBatch}, logging::init_logger, messaging::Reportable, models::AssignedJobById, @@ -30,8 +30,8 @@ pub async fn process_request(jobs: Vec, client: &ClientHandler) } } }; - match fat_meta_to_thin(fetched_job) { - Ok(thin_meta) => JobCache::insert(thin_meta), + match split_payload(fetched_job) { + Ok(job_payload_meta) => JobCache::insert(job_payload_meta), Err(e) => ErrChan::send(e, "pld").await, } } @@ -109,7 +109,7 @@ pub fn run_forever() -> ! { let env = EndpointsEnv::load(); if cfg!(debug_assertions) { - init_logger(Some(format!( + let logfile_uid = format!( "u_agent-{}", get_self_id() .hyphenated() @@ -117,7 +117,8 @@ pub fn run_forever() -> ! { .split("-") .next() .unwrap() - ))); + ); + init_logger(Some(logfile_uid)); } else { #[cfg(unix)] u_lib::unix::daemonize() diff --git a/bin/u_panel/Cargo.toml b/bin/u_panel/Cargo.toml index e6705f6..083c7e0 100644 --- a/bin/u_panel/Cargo.toml +++ b/bin/u_panel/Cargo.toml @@ -11,7 +11,7 @@ actix-cors = "0.6.1" actix-web = "4.1" anyhow = { workspace = true } futures-util = "0.3.21" -mime_guess = "2.0.4" +mime_guess = { workspace = true } once_cell = "1.8.0" rust-embed = { version = "6.3.0", features = ["debug-embed", "compression"] } serde = { workspace = true } diff --git a/bin/u_panel/src/argparse.rs b/bin/u_panel/src/argparse.rs index 8a13e99..eeaf168 100644 --- a/bin/u_panel/src/argparse.rs +++ b/bin/u_panel/src/argparse.rs @@ -2,8 +2,9 @@ use serde_json::{from_str, to_value, Value}; use structopt::StructOpt; use u_lib::{ api::ClientHandler, + jobs::join_payload, messaging::AsMsg, - models::{Agent, AssignedJob, FatJobMeta}, + models::{Agent, AssignedJob, RawJob}, types::Id, types::PanelResult, UError, UResult, @@ -87,14 +88,11 @@ pub async fn process_cmd(client: ClientHandler, args: Args) -> PanelResult match action { JobCRUD::Create { job } => { - let raw_job = from_str::(&job)?; - let mut job = raw_job.validated()?; - - if let Some(payload) = &mut job.payload { - payload.read_into_self()?; - } + let raw_job = from_str::(&job)?; + let job = raw_job.validated()?; + let fat_job = join_payload(job)?; - into_value(client.upload_jobs(job).await?) + into_value(client.upload_jobs(fat_job).await?) } JobCRUD::RUD(RUD::Read { id }) => match id { //todo: use vec not to break frontend api, possibly refactor later @@ -102,9 +100,9 @@ pub async fn process_cmd(client: ClientHandler, args: Args) -> PanelResult into_value(client.get_jobs().await?), }, JobCRUD::RUD(RUD::Update { item }) => { - let raw_job = from_str::(&item)?; + let raw_job = from_str::(&item)?; let job = raw_job.validated()?; - into_value(client.update_job(job).await?) + into_value(client.update_job(join_payload(job)?).await?) } JobCRUD::RUD(RUD::Delete { id }) => into_value(client.del(id).await?), }, diff --git a/bin/u_server/Cargo.toml b/bin/u_server/Cargo.toml index ee1e2a8..9b800b2 100644 --- a/bin/u_server/Cargo.toml +++ b/bin/u_server/Cargo.toml @@ -9,6 +9,7 @@ anyhow = { workspace = true } diesel = { workspace = true } deadpool-diesel = { workspace = true } hyper = "0.14" +mime_guess = { workspace = true } once_cell = "1.7.2" openssl = { workspace = true } serde = { workspace = true } diff --git a/bin/u_server/src/db.rs b/bin/u_server/src/db.rs index 23b1dc1..6d43f96 100644 --- a/bin/u_server/src/db.rs +++ b/bin/u_server/src/db.rs @@ -1,7 +1,8 @@ use crate::error::Error; use diesel::{pg::PgConnection, prelude::*, result::Error as DslError, Connection}; +use std::mem::drop; use u_lib::db::PgAsyncPool; -use u_lib::models::{schema, Agent, AssignedJob, JobState, ThinJobMeta}; +use u_lib::models::{schema, Agent, AssignedJob, JobModel, JobState, PayloadMeta, ThinJob}; use u_lib::platform::Platform; use u_lib::types::Id; @@ -47,27 +48,46 @@ pub struct UDB<'c> { } impl UDB<'_> { - pub fn insert_jobs(&mut self, job_metas: &[ThinJobMeta]) -> Result> { - use schema::jobs; + pub fn insert_jobs(&mut self, jobs: &[ThinJob]) -> Result<()> { + use schema::{jobs, payloads}; + + let (jobs, payloads_opt): (Vec<_>, Vec<_>) = jobs + .iter() + .map(|j| (&j.job, j.payload_meta.as_ref())) + .unzip(); + + let payloads = payloads_opt + .into_iter() + .filter_map(|p| p) + .collect::>(); + + diesel::insert_into(payloads::table) + .values(payloads) + .execute(self.conn) + .map(drop) + .map_err(with_err_ctx("Can't insert payloads"))?; diesel::insert_into(jobs::table) - .values(job_metas) - .get_results(self.conn) - .map(|rows| rows.iter().map(|job: &ThinJobMeta| job.id).collect()) + .values(jobs) + .execute(self.conn) + .map(drop) .map_err(with_err_ctx("Can't insert jobs")) } - pub fn get_job(&mut self, id: Id) -> Result> { - use schema::jobs; + pub fn get_job(&mut self, id: Id) -> Result> { + use schema::{jobs, payloads}; - jobs::table + let maybe_job_with_payload = jobs::table + .left_join(payloads::table) .filter(jobs::id.eq(id)) - .first(self.conn) + .first::<(JobModel, Option)>(self.conn) .optional() - .map_err(with_err_ctx(format!("Can't get job {id}"))) + .map_err(with_err_ctx(format!("Can't get job {id}")))?; + + Ok(maybe_job_with_payload.map(|(job, payload_meta)| ThinJob { job, payload_meta })) } - pub fn get_jobs(&mut self) -> Result> { + pub fn get_jobs(&mut self) -> Result> { use schema::jobs; jobs::table @@ -75,27 +95,17 @@ impl UDB<'_> { .map_err(with_err_ctx("Can't get jobs")) } - pub fn find_job_by_alias(&mut self, alias: &str) -> Result> { - use schema::jobs; + pub fn find_job_by_alias(&mut self, alias: &str) -> Result> { + use schema::{jobs, payloads}; - jobs::table + let maybe_job_with_payload = jobs::table + .left_join(payloads::table) .filter(jobs::alias.eq(alias)) - .first(self.conn) + .first::<(JobModel, Option)>(self.conn) .optional() - .map_err(with_err_ctx(format!("Can't find job by alias {alias}"))) - } - - pub fn insert_agent(&mut self, agent: &Agent) -> Result<()> { - use schema::agents; + .map_err(with_err_ctx(format!("Can't get job by alias {alias}")))?; - diesel::insert_into(agents::table) - .values(agent) - .on_conflict(agents::id) - .do_update() - .set(agent) - .execute(self.conn) - .map_err(with_err_ctx(format!("Can't insert agent {agent:?}")))?; - Ok(()) + Ok(maybe_job_with_payload.map(|(job, payload_meta)| ThinJob { job, payload_meta })) } pub fn insert_result(&mut self, result: &AssignedJob) -> Result<()> { @@ -163,7 +173,7 @@ impl UDB<'_> { Ok(result) } - pub fn set_jobs_for_agent(&mut self, agent_id: Id, job_ids: &[Id]) -> Result> { + pub fn set_jobs_for_agent(&mut self, agent_id: Id, job_ids: &[Id]) -> Result<()> { use schema::{jobs, results}; let agent_platform = match self.get_agent(agent_id)? { @@ -203,64 +213,51 @@ impl UDB<'_> { diesel::insert_into(results::table) .values(&job_requests) .execute(self.conn) + .map(drop) .map_err(with_err_ctx(format!( "Can't setup jobs {job_ids:?} for agent {agent_id:?}" - )))?; - - Ok(job_requests.iter().map(|aj| aj.id).collect()) + ))) } - pub fn del_jobs(&mut self, ids: &[Id]) -> Result { + pub fn del_jobs(&mut self, ids: &[Id]) -> Result<()> { use schema::jobs; - let mut affected = 0; - for id in ids { - let deleted = diesel::delete(jobs::table) - .filter(jobs::id.eq(id)) - .execute(self.conn) - .map_err(with_err_ctx("Can't delete jobs"))?; - affected += deleted; - } - Ok(affected) + diesel::delete(jobs::table) + .filter(jobs::id.eq_any(ids)) + .execute(self.conn) + .map(drop) + .map_err(with_err_ctx("Can't delete jobs")) } - pub fn del_results(&mut self, ids: &[Id]) -> Result { + pub fn del_results(&mut self, ids: &[Id]) -> Result<()> { use schema::results; - let mut affected = 0; - for id in ids { - let deleted = diesel::delete(results::table) - .filter(results::id.eq(id)) - .execute(self.conn) - .map_err(with_err_ctx("Can't delete results"))?; - affected += deleted; - } - Ok(affected) + diesel::delete(results::table) + .filter(results::id.eq_any(ids)) + .execute(self.conn) + .map(drop) + .map_err(with_err_ctx("Can't delete results")) } - pub fn del_agents(&mut self, ids: &[Id]) -> Result { + pub fn del_agents(&mut self, ids: &[Id]) -> Result<()> { use schema::agents; - let mut affected = 0; - for id in ids { - let deleted = diesel::delete(agents::table) - .filter(agents::id.eq(id)) - .execute(self.conn) - .map_err(with_err_ctx("Can't delete agents"))?; - affected += deleted; - } - Ok(affected) + diesel::delete(agents::table) + .filter(agents::id.eq_any(ids)) + .execute(self.conn) + .map(drop) + .map_err(with_err_ctx("Can't delete agents")) } - pub fn update_agent(&mut self, agent: &Agent) -> Result<()> { + pub fn upsert_agent(&mut self, agent: &Agent) -> Result<()> { agent .save_changes::(self.conn) - .map_err(with_err_ctx(format!("Can't update agent {agent:?}")))?; + .map_err(with_err_ctx(format!("Can't upsert agent {agent:?}")))?; Ok(()) } - pub fn update_job(&mut self, job: &ThinJobMeta) -> Result<()> { - job.save_changes::(self.conn) + pub fn update_job(&mut self, job: &JobModel) -> Result<()> { + job.save_changes::(self.conn) .map_err(with_err_ctx(format!("Can't update job {job:?}")))?; Ok(()) } diff --git a/bin/u_server/src/handlers.rs b/bin/u_server/src/handlers.rs index c1f037d..be2a260 100644 --- a/bin/u_server/src/handlers.rs +++ b/bin/u_server/src/handlers.rs @@ -2,19 +2,25 @@ use std::sync::Arc; use crate::db::{PgRepo, UDB}; use crate::error::Error; -use crate::ValidJobMeta; +use serde::Deserialize; +use u_lib::jobs::join_payload; use u_lib::{ - jobs::{fat_meta_to_thin, thin_meta_to_fat}, + jobs::split_payload, messaging::{AsMsg, Reportable}, misc::OneOrVec, models::*, - types::Id + types::Id, }; use warp::reject::not_found; use warp::Rejection; type EndpResult = Result; +#[derive(Deserialize)] +pub struct GetJobQuery { + force_payload: bool, +} + pub struct Endpoints; impl Endpoints { @@ -35,25 +41,32 @@ impl Endpoints { .map_err(From::from) } - pub async fn get_job(repo: Arc, id: Id) -> EndpResult { + pub async fn get_job(repo: Arc, id: Id, params: GetJobQuery) -> EndpResult { let Some(job) = repo.interact(move |mut db| db.get_job(id)).await? else { return Err(not_found()) }; - let fat_meta = thin_meta_to_fat(job).map_err(Error::from)?; - Ok(fat_meta) + if let Some(meta) = &job.payload_meta { + let max_readable_payload_size = 1024 * 8; + if !params.force_payload && meta.size > max_readable_payload_size { + return Ok(FatJob { + job: job.job, + payload_meta: job.payload_meta, + payload_data: None, + }); + } + } + + Ok(join_payload(job).map_err(Error::from)?) } - pub async fn get_jobs(repo: Arc) -> EndpResult> { + pub async fn get_jobs(repo: Arc) -> EndpResult> { repo.interact(move |mut db| db.get_jobs()) .await .map_err(From::from) } - pub async fn get_agent_jobs( - repo: Arc, - id: Option, - ) -> EndpResult> { + pub async fn get_agent_jobs(repo: Arc, id: Option) -> EndpResult> { repo.interact(move |mut db| db.get_exact_jobs(id, false)) .await .map_err(From::from) @@ -65,18 +78,18 @@ impl Endpoints { match agent { Some(mut agent) => { agent.touch(); - db.update_agent(&agent)?; + db.upsert_agent(&agent)?; } None => { let new_agent = Agent::with_id(id); - db.insert_agent(&new_agent)?; + db.upsert_agent(&new_agent)?; let job = db .find_job_by_alias("agent_hello")? .expect("agent_hello job not found"); - db.set_jobs_for_agent(id, &[job.id])?; + db.set_jobs_for_agent(id, &[job.job.id])?; } } @@ -92,30 +105,23 @@ impl Endpoints { .map_err(From::from) } - pub async fn upload_jobs( - repo: Arc, - msg: Vec, - ) -> EndpResult> { + pub async fn upload_jobs(repo: Arc, msg: Vec) -> EndpResult<()> { let jobs = msg .into_iter() - .map(|meta| Ok(fat_meta_to_thin(meta)?)) - .collect::, Error>>()?; + .map(|meta| Ok(split_payload(meta)?)) + .collect::, Error>>()?; repo.interact(move |mut db| db.insert_jobs(&jobs)) .await .map_err(From::from) } - pub async fn del(repo: Arc, id: Id) -> EndpResult { + pub async fn del(repo: Arc, id: Id) -> EndpResult<()> { repo.transaction(move |mut db| { - let del_fns = &[UDB::del_agents, UDB::del_jobs, UDB::del_results]; - for del_fn in del_fns { - let affected = del_fn(&mut db, &[id])?; - if affected > 0 { - return Ok(affected); - } - } - Ok(0) + [UDB::del_agents, UDB::del_jobs, UDB::del_results] + .iter() + .map(|f| f(&mut db, &[id])) + .collect::>() }) .await .map_err(From::from) @@ -125,7 +131,7 @@ impl Endpoints { repo: Arc, agent_id: Id, job_idents: Vec, - ) -> EndpResult> { + ) -> EndpResult<()> { repo.transaction(move |mut db| { job_idents .into_iter() @@ -134,7 +140,7 @@ impl Endpoints { let job_from_db = db.find_job_by_alias(&ident); match job_from_db { Ok(job) => match job { - Some(j) => Ok(j.id), + Some(j) => Ok(j.job.id), None => { Err(Error::ProcessingError(format!("unknown ident {ident}"))) } @@ -153,7 +159,7 @@ impl Endpoints { pub async fn report + AsMsg + Send + Sync + 'static>( repo: Arc, msg: Data, - agent_id: Id + agent_id: Id, ) -> EndpResult<()> { repo.transaction(move |mut db| { for entry in msg.into_vec() { @@ -165,13 +171,13 @@ impl Endpoints { continue; } result.touch(); - + info!("agent {agent_id} updated job {}", result.id); match result.exec_type { JobType::Init => { result.state = JobState::Finished; - + match &result.result { Some(rbytes) => { let mut agent: Agent = match serde_json::from_slice(&rbytes) { @@ -182,7 +188,7 @@ impl Endpoints { } }; agent.state = AgentState::Active; - db.insert_agent(&agent)?; + db.upsert_agent(&agent)?; } None => error!("Empty agent data"), }}, @@ -206,29 +212,17 @@ impl Endpoints { .map_err(From::from) } - pub async fn update_agent( - repo: Arc, - agent: Agent, - ) -> EndpResult<()> { - repo.interact(move |mut db| db.update_agent(&agent)) - .await?; + pub async fn update_agent(repo: Arc, agent: Agent) -> EndpResult<()> { + repo.interact(move |mut db| db.upsert_agent(&agent)).await?; Ok(()) } - pub async fn update_job( - repo: Arc, - job: ValidJobMeta, - ) -> EndpResult<()> { - let thin_meta = fat_meta_to_thin(job).map_err(Error::from)?; - repo.interact(move |mut db| db.update_job(&thin_meta)) - .await?; + pub async fn update_job(repo: Arc, job: JobModel) -> EndpResult<()> { + repo.interact(move |mut db| db.update_job(&job)).await?; Ok(()) } - pub async fn update_assigned_job( - repo: Arc, - assigned: AssignedJob, - ) -> EndpResult<()> { + pub async fn update_assigned_job(repo: Arc, assigned: AssignedJob) -> EndpResult<()> { repo.interact(move |mut db| db.update_result(&assigned)) .await?; Ok(()) diff --git a/bin/u_server/src/u_server.rs b/bin/u_server/src/u_server.rs index 34b37d4..40b2656 100644 --- a/bin/u_server/src/u_server.rs +++ b/bin/u_server/src/u_server.rs @@ -15,7 +15,6 @@ use std::{convert::Infallible, sync::Arc}; use u_lib::{ config, db::async_pool, - jobs::fat_meta_to_thin, messaging::{AsMsg, Reportable}, models::*, types::Id, @@ -27,9 +26,7 @@ use warp::{ Filter, Rejection, Reply, }; -use crate::handlers::Endpoints; - -type ValidJobMeta = FatJobMeta; +use crate::handlers::{Endpoints, GetJobQuery}; fn into_message(msg: M) -> Json { json(&msg) @@ -55,13 +52,14 @@ pub fn init_endpoints( let upload_jobs = path("upload_jobs") .and(with_db.clone()) - .and(body::json::>()) + .and(body::json::>()) .and_then(Endpoints::upload_jobs) .map(into_message); let get_job = path("get_job") .and(with_db.clone()) .and(warp::path::param::()) + .and(warp::query::()) .and_then(Endpoints::get_job) .map(into_message); @@ -110,7 +108,7 @@ pub fn init_endpoints( let update_job = path("update_job") .and(with_db.clone()) - .and(body::json::()) + .and(body::json::()) .and_then(Endpoints::update_job) .map(ok); @@ -156,13 +154,11 @@ pub async fn preload_jobs(repo: &PgRepo) -> Result<(), ServerError> { let job_alias = "agent_hello"; let if_job_exists = db.find_job_by_alias(job_alias)?; if if_job_exists.is_none() { - let agent_hello = fat_meta_to_thin( - FatJobMeta::builder() - .with_type(JobType::Init) - .with_alias(job_alias) - .build() - .unwrap(), - )?; + let agent_hello = RawJob::builder() + .with_type(JobType::Init) + .with_alias(job_alias) + .build() + .unwrap(); db.insert_jobs(&[agent_hello])?; } Ok(()) diff --git a/integration/Cargo.toml b/integration/Cargo.toml index 706d97d..16ea59a 100644 --- a/integration/Cargo.toml +++ b/integration/Cargo.toml @@ -7,6 +7,7 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +futures = { version = "0.3", features = ["executor"] } once_cell = "1.10.0" reqwest = { workspace = true } rstest = "0.12" diff --git a/integration/tests/fixtures/agent.rs b/integration/tests/fixtures/agent.rs index af023f3..7e3f3df 100644 --- a/integration/tests/fixtures/agent.rs +++ b/integration/tests/fixtures/agent.rs @@ -1,36 +1,35 @@ -use crate::helpers::ENV; +use super::connections::*; +use super::run_async; use u_lib::{ - api::ClientHandler, config::get_self_id, jobs::fat_meta_to_thin, messaging::Reportable, - models::*, types::Id, + api::ClientHandler, config::get_self_id, jobs::split_payload, messaging::Reportable, models::*, + types::Id, }; pub struct RegisteredAgent { pub id: Id, } -impl RegisteredAgent { - pub async fn unregister(self) { - let cli = ClientHandler::new(&ENV.u_server, None).await.unwrap(); - cli.del(self.id).await.unwrap(); - } -} - #[fixture] -pub async fn register_agent() -> RegisteredAgent { - let cli = ClientHandler::new(&ENV.u_server, None).await.unwrap(); - let agent_id = get_self_id(); - println!("registering agent {agent_id}"); - let resp = cli - .get_personal_jobs(agent_id) - .await - .unwrap() - .pop() - .unwrap(); - let job_id = resp.job_id; - let job = cli.get_job(job_id).await.unwrap(); - assert_eq!(job.alias, Some("agent_hello".to_string())); - let mut agent_data = AssignedJob::from((&fat_meta_to_thin(job).unwrap(), resp)); - agent_data.set_result(&Agent::with_id(agent_id)); - cli.report(Reportable::Assigned(agent_data)).await.unwrap(); - RegisteredAgent { id: agent_id } +#[once] +pub fn registered_agent(client: &ClientHandler) -> RegisteredAgent { + run_async(async { + let agent_id = get_self_id(); + println!("registering agent {agent_id}"); + let resp = client + .get_personal_jobs(agent_id) + .await + .unwrap() + .pop() + .unwrap(); + let job_id = resp.job_id; + let job = client.get_job(job_id).await.unwrap(); + assert_eq!(job.job.alias, Some("agent_hello".to_string())); + let mut agent_data = AssignedJob::from((&split_payload(job).unwrap().job, resp)); + agent_data.set_result(&Agent::with_id(agent_id)); + client + .report(Reportable::Assigned(agent_data)) + .await + .unwrap(); + RegisteredAgent { id: agent_id } + }) } diff --git a/integration/tests/fixtures/connections.rs b/integration/tests/fixtures/connections.rs new file mode 100644 index 0000000..d4c6986 --- /dev/null +++ b/integration/tests/fixtures/connections.rs @@ -0,0 +1,28 @@ +use super::env::*; +use super::run_async; + +pub use u_lib::api::ClientHandler; +use u_lib::db::unpooled; +pub use u_lib::db::PgConnection; + +#[fixture] +#[once] +pub fn client(env_default: EndpointsEnv) -> ClientHandler { + run_async(ClientHandler::new(&env_default.u_server, None)).unwrap() +} + +#[fixture] +#[once] +pub fn client_panel(env_access: AccessEnv) -> ClientHandler { + run_async(ClientHandler::new( + &env_access.u_server, + Some(env_access.admin_auth_token), + )) + .unwrap() +} + +#[fixture] +#[once] +pub fn db(env_db: DBEnv) -> PgConnection { + unpooled(&env_db) +} diff --git a/integration/tests/fixtures/env.rs b/integration/tests/fixtures/env.rs new file mode 100644 index 0000000..4738400 --- /dev/null +++ b/integration/tests/fixtures/env.rs @@ -0,0 +1,16 @@ +pub use u_lib::config::{AccessEnv, DBEnv, EndpointsEnv}; + +#[fixture] +pub fn env_default() -> EndpointsEnv { + EndpointsEnv::load() +} + +#[fixture] +pub fn env_access() -> AccessEnv { + AccessEnv::load().unwrap() +} + +#[fixture] +pub fn env_db() -> DBEnv { + DBEnv::load().unwrap() +} diff --git a/integration/tests/fixtures/mod.rs b/integration/tests/fixtures/mod.rs index f17bc55..c2cdc85 100644 --- a/integration/tests/fixtures/mod.rs +++ b/integration/tests/fixtures/mod.rs @@ -1 +1,9 @@ pub mod agent; +pub mod connections; +pub mod env; + +use std::future::Future; + +fn run_async(fut: impl Future) -> R { + futures::executor::block_on(fut) +} diff --git a/integration/tests/helpers/mod.rs b/integration/tests/helpers/mod.rs index 3ac8a05..19ca72b 100644 --- a/integration/tests/helpers/mod.rs +++ b/integration/tests/helpers/mod.rs @@ -2,8 +2,3 @@ pub mod jobs; pub mod panel; pub use panel::Panel; - -use once_cell::sync::Lazy; -use u_lib::config::EndpointsEnv; - -pub static ENV: Lazy = Lazy::new(|| EndpointsEnv::load()); diff --git a/integration/tests/integration/api.rs b/integration/tests/integration/api.rs new file mode 100644 index 0000000..12af506 --- /dev/null +++ b/integration/tests/integration/api.rs @@ -0,0 +1 @@ +//async fn diff --git a/integration/tests/integration/behaviour.rs b/integration/tests/integration/behaviour.rs index 8faa9b3..a5f4620 100644 --- a/integration/tests/integration/behaviour.rs +++ b/integration/tests/integration/behaviour.rs @@ -9,12 +9,11 @@ use uuid::Uuid; #[rstest] #[tokio::test] -async fn registration(#[future] register_agent: RegisteredAgent) { - let agent = register_agent.await; +async fn registration(registered_agent: &RegisteredAgent) { let agents: Vec = Panel::check_output("agents read"); - let found = agents.iter().find(|v| v.id == agent.id); + let found = agents.iter().find(|v| v.id == registered_agent.id); assert!(found.is_some()); - Panel::check_status(format!("agents delete {}", agent.id)); + Panel::check_status(format!("agents delete {}", registered_agent.id)); } #[tokio::test] @@ -52,14 +51,14 @@ async fn large_payload() { let job_alias = "large_payload"; - let job = FatJobMeta::builder() + let job = RawJob::builder() .with_alias(job_alias) .with_payload_path("./tests/bin/echoer") .with_shell("{} type echo") .build() .unwrap(); - Panel::check_status(["jobs", "create", &to_string(&job).unwrap()]); + Panel::check_status(["jobs", "create", &to_string(&RawJob::from(job)).unwrap()]); let cmd = format!("map create {agent_id} {job_alias}"); let assigned_ids: Vec = Panel::check_output(cmd); diff --git a/integration/tests/integration/connection.rs b/integration/tests/integration/connection.rs index 156a6a1..2822f86 100644 --- a/integration/tests/integration/connection.rs +++ b/integration/tests/integration/connection.rs @@ -1,14 +1,15 @@ -use crate::helpers::ENV; +use crate::fixtures::env::*; use u_lib::config::MASTER_PORT; +#[rstest] #[tokio::test] -async fn non_auth_connection_dropped() { +async fn non_auth_connection_dropped(env_default: EndpointsEnv) { let client = reqwest::ClientBuilder::new() .danger_accept_invalid_certs(true) .build() .unwrap(); match client - .get(format!("https://{}:{}", &ENV.u_server, MASTER_PORT)) + .get(format!("https://{}:{}", &env_default.u_server, MASTER_PORT)) .send() .await { diff --git a/lib/u_lib/src/api.rs b/lib/u_lib/src/api.rs index 9d306a9..c1d18d4 100644 --- a/lib/u_lib/src/api.rs +++ b/lib/u_lib/src/api.rs @@ -75,11 +75,11 @@ impl ClientHandler { }) } - async fn req(&self, url: impl AsRef) -> Result { + async fn req(&self, url: impl AsRef) -> Result { self.req_with_payload(url, ()).await } - async fn req_with_payload( + async fn req_with_payload( &self, url: impl AsRef, payload: P, @@ -93,7 +93,6 @@ impl ClientHandler { .send() .await .context("error while sending request")?; - let content_len = response.content_length(); let is_success = match response.error_for_status_ref() { Ok(_) => Ok(()), Err(e) => Err(UError::from(e)), @@ -101,10 +100,7 @@ impl ClientHandler { let resp = response.text().await.context("resp")?; let result = match is_success { - Ok(_) => from_str::(&resp).or_else(|e| match content_len { - Some(0) => Ok(Default::default()), - _ => Err(UError::NetError(e.to_string(), resp)), - }), + Ok(_) => from_str::(&resp).map_err(|e| UError::NetError(e.to_string(), resp)), Err(UError::NetError(err, _)) => Err(UError::NetError(err, resp)), _ => unreachable!(), } @@ -131,12 +127,12 @@ impl ClientHandler { } /// get exact job - pub async fn get_job(&self, job: Id) -> Result> { + pub async fn get_job(&self, job: Id) -> Result { self.req(format!("get_job/{job}")).await } /// get all available jobs - pub async fn get_jobs(&self) -> Result> { + pub async fn get_jobs(&self) -> Result> { self.req("get_jobs").await } } @@ -156,7 +152,7 @@ impl ClientHandler { } /// update job - pub async fn update_job(&self, job: FatJobMeta) -> Result<()> { + pub async fn update_job(&self, job: FatJob) -> Result<()> { self.req_with_payload("update_job", job).await } @@ -166,7 +162,7 @@ impl ClientHandler { } /// create and upload job - pub async fn upload_jobs(&self, payload: impl OneOrVec>) -> Result> { + pub async fn upload_jobs(&self, payload: impl OneOrVec) -> Result> { self.req_with_payload("upload_jobs", payload.into_vec()) .await } diff --git a/lib/u_lib/src/cache.rs b/lib/u_lib/src/cache.rs index 17ccfaa..c025db4 100644 --- a/lib/u_lib/src/cache.rs +++ b/lib/u_lib/src/cache.rs @@ -1,10 +1,11 @@ -use crate::models::ThinJobMeta; +use crate::models::ThinJob; use crate::types::Id; use lazy_static::lazy_static; use parking_lot::{RwLock, RwLockReadGuard}; use std::{collections::HashMap, ops::Deref}; -type Cache = HashMap; +type Val = ThinJob; +type Cache = HashMap; lazy_static! { static ref JOB_CACHE: RwLock = RwLock::new(HashMap::new()); @@ -13,8 +14,8 @@ lazy_static! { pub struct JobCache; impl JobCache { - pub fn insert(job_meta: ThinJobMeta) { - JOB_CACHE.write().insert(job_meta.id, job_meta); + pub fn insert(job: Val) { + JOB_CACHE.write().insert(job.job.id, job); } pub fn contains(id: Id) -> bool { @@ -37,7 +38,7 @@ impl JobCache { pub struct JobCacheHolder<'jh>(pub RwLockReadGuard<'jh, Cache>, pub Id); impl<'jh> Deref for JobCacheHolder<'jh> { - type Target = ThinJobMeta; + type Target = Val; fn deref(&self) -> &Self::Target { self.0.get(&self.1).unwrap() diff --git a/lib/u_lib/src/db.rs b/lib/u_lib/src/db.rs index 3e874d2..0c5d8ef 100644 --- a/lib/u_lib/src/db.rs +++ b/lib/u_lib/src/db.rs @@ -1,5 +1,6 @@ use deadpool_diesel::{Manager as DManager, Pool as DPool, Runtime}; -use diesel::pg::PgConnection; +pub use diesel::pg::PgConnection; +use diesel::Connection; use std::time::Duration; use crate::config::DBEnv; @@ -24,3 +25,9 @@ pub fn async_pool(config: &DBEnv) -> PgAsyncPool { .build() .unwrap() } + +pub fn unpooled(config: &DBEnv) -> PgConnection { + let db_url = generate_postgres_url(config); + + PgConnection::establish(&db_url).unwrap() +} diff --git a/lib/u_lib/src/jobs.rs b/lib/u_lib/src/jobs.rs index ea0efd3..c81bd3d 100644 --- a/lib/u_lib/src/jobs.rs +++ b/lib/u_lib/src/jobs.rs @@ -2,7 +2,7 @@ use crate::{ combined_result::CombinedResult, executor::{ExecResult, Waiter}, misc::OneOrVec, - models::{Agent, AssignedJob, AssignedJobById, FatJobMeta, JobType, Payload, ThinJobMeta}, + models::{Agent, AssignedJob, AssignedJobById, FatJob, JobType, RawJob, ThinJob}, proc_output::ProcOutput, ufs, }; @@ -16,11 +16,10 @@ pub struct AnonymousJobBatch { } impl AnonymousJobBatch { - pub fn from_meta_with_id(jobs: impl OneOrVec<(ThinJobMeta, AssignedJobById)>) -> Self { - let jobs = jobs.into_vec(); + pub fn from_meta_with_id(jobs: impl OneOrVec<(ThinJob, AssignedJobById)>) -> Self { let mut waiter = Waiter::new(); - for (meta, job) in jobs { - waiter.push(run_assigned_job(meta, job)); + for (job, ids) in jobs.into_vec() { + waiter.push(run_assigned_job(job, ids)); } Self { waiter, @@ -28,14 +27,14 @@ impl AnonymousJobBatch { } } - pub fn from_meta(metas: impl OneOrVec) -> Self { - let jobs: Vec<_> = metas + pub fn from_meta(jobs: impl OneOrVec) -> Self { + let jobs_ids: Vec<_> = jobs .into_vec() .into_iter() - .map(|meta| { - let job_id = meta.id; + .map(|job| { + let job_id = job.job.id; ( - meta, + job, AssignedJobById { job_id, ..Default::default() @@ -43,7 +42,7 @@ impl AnonymousJobBatch { ) }) .collect(); - AnonymousJobBatch::from_meta_with_id(jobs) + AnonymousJobBatch::from_meta_with_id(jobs_ids) } /// Spawn jobs @@ -85,18 +84,8 @@ impl NamedJobBatch { .into_vec() .into_iter() .filter_map(|(alias, cmd)| { - match FatJobMeta::builder() - .with_shell(cmd) - .with_alias(alias) - .build() - { - Ok(fat_meta) => match fat_meta_to_thin(fat_meta) { - Ok(thin_meta) => Some(thin_meta), - Err(e) => { - result.err(e); - None - } - }, + match RawJob::builder().with_shell(cmd).with_alias(alias).build() { + Ok(jpm) => Some(jpm), Err(e) => { result.err(e); None @@ -108,14 +97,14 @@ impl NamedJobBatch { result } - pub fn from_meta(named_jobs: impl OneOrVec) -> Self { - let (job_names, job_metas): (Vec<_>, Vec<_>) = named_jobs + pub fn from_meta(named_jobs: impl OneOrVec) -> Self { + let (job_names, jobs): (Vec<_>, Vec<_>) = named_jobs .into_vec() .into_iter() - .map(|meta| (meta.alias.clone().unwrap(), meta)) + .map(|job| (job.job.alias.clone().unwrap(), job)) .unzip(); Self { - runner: Some(AnonymousJobBatch::from_meta(job_metas)), + runner: Some(AnonymousJobBatch::from_meta(jobs)), job_names, results: HashMap::new(), } @@ -145,17 +134,18 @@ impl NamedJobBatch { } } -pub async fn run_assigned_job(meta: ThinJobMeta, ids: AssignedJobById) -> ExecResult { - let mut job = AssignedJob::from((&meta, ids)); - match meta.exec_type { +pub async fn run_assigned_job(job: ThinJob, ids: AssignedJobById) -> ExecResult { + let ThinJob { job, payload_meta } = job; + let mut result = AssignedJob::from((&job, ids)); + match job.exec_type { JobType::Shell => { let (argv, _prepared_payload) = { - if let Some(ref payload) = meta.payload { - let (prep_exec, prep_exec_path) = ufs::prepare_executable(payload)?; - let argv_with_exec = meta.argv.replace("{}", &prep_exec_path); + if let Some(meta) = payload_meta { + let (prep_exec, prep_exec_path) = ufs::prepare_executable(meta.name)?; + let argv_with_exec = job.argv.replace("{}", &prep_exec_path); (argv_with_exec, Some(prep_exec)) } else { - (meta.argv, None) + (job.argv, None) } }; @@ -175,69 +165,55 @@ pub async fn run_assigned_job(meta: ThinJobMeta, ids: AssignedJobById) -> ExecRe None, ), }; - job.result = Some(data); - job.retcode = retcode; + result.result = Some(data); + result.retcode = retcode; } JobType::Init => { - job.set_result(&Agent::run().await); - job.retcode = Some(0); + result.set_result(&Agent::run().await); + result.retcode = Some(0); } JobType::Service => todo!(), JobType::Update => todo!(), JobType::Terminate => exit(0), }; - Ok(job) + Ok(result) } -pub fn fat_meta_to_thin(meta: FatJobMeta) -> Result { - let payload_ident = if let Some(mut payload) = meta.payload { - let job_name = match &meta.alias { - Some(a) => a.to_string(), - None => meta.id.simple().to_string(), - }; - payload.write_self_into(&job_name)?; - Some(job_name) - } else { - None - }; +pub fn split_payload(job: FatJob) -> Result { + let FatJob { + job, + payload_meta, + payload_data, + } = job; - Ok(ThinJobMeta { - alias: meta.alias, - argv: meta.argv, - id: meta.id, - exec_type: meta.exec_type, - platform: meta.platform, - payload: payload_ident, - schedule: meta.schedule, - }) -} + if let Some(meta) = &payload_meta { + if !ufs::in_index(&meta.name) { + ufs::put(&meta.name, payload_data.unwrap())?; + } + } -pub fn thin_meta_to_fat(meta: ThinJobMeta) -> Result, ufs::Error> { - let payload = if let Some(payload) = meta.payload { - let mut fat_payload = Payload::Ident(payload); - fat_payload.read_into_self()?; - Some(fat_payload) - } else { - None - }; + Ok(ThinJob { job, payload_meta }) +} - Ok(FatJobMeta { - alias: meta.alias, - argv: meta.argv, - id: meta.id, - exec_type: meta.exec_type, - platform: meta.platform, - payload, - schedule: meta.schedule, +pub fn join_payload(job: ThinJob) -> Result { + let ThinJob { job, payload_meta } = job; + let payload_data = payload_meta + .as_ref() + .map(|p| ufs::read(&p.name)) + .transpose()?; + + Ok(FatJob { + job, + payload_meta, + payload_data, }) } #[cfg(test)] mod tests { - use super::*; use crate::{ jobs::{AnonymousJobBatch, NamedJobBatch}, - models::{misc::JobType, FatJobMeta}, + models::{misc::JobType, RawJob}, unwrap_enum, UError, }; use std::time::SystemTime; @@ -247,10 +223,8 @@ mod tests { #[tokio::test] async fn test_is_really_async() { const SLEEP_SECS: u64 = 1; - let job = FatJobMeta::from_shell(format!("sleep {SLEEP_SECS}")).unwrap(); - let sleep_jobs = (0..50) - .map(|_| fat_meta_to_thin(job.clone()).unwrap()) - .collect::>(); + let job = RawJob::from_shell(format!("sleep {SLEEP_SECS}")).unwrap(); + let sleep_jobs = (0..50).map(|_| job.clone()).collect::>(); let now = SystemTime::now(); AnonymousJobBatch::from_meta(sleep_jobs).wait().await; @@ -287,11 +261,11 @@ mod tests { #[case] payload: Option<&[u8]>, #[case] expected_result: &str, ) -> TestResult { - let mut job = FatJobMeta::builder().with_shell(cmd); + let mut job = RawJob::builder().with_shell(cmd); if let Some(p) = payload { job = job.with_payload(p); } - let job = fat_meta_to_thin(job.build().unwrap()).unwrap(); + let job = job.build().unwrap(); let result = AnonymousJobBatch::from_meta(job).wait_one().await.unwrap(); let result = result.to_str_result(); assert_eq!(result.trim(), expected_result); @@ -302,23 +276,19 @@ mod tests { async fn test_complex_load() -> TestResult { const SLEEP_SECS: u64 = 1; let now = SystemTime::now(); - let longest_job = FatJobMeta::from_shell(format!("sleep {}", SLEEP_SECS)).unwrap(); - let longest_job = AnonymousJobBatch::from_meta(fat_meta_to_thin(longest_job).unwrap()) - .spawn() - .await; - let ls = AnonymousJobBatch::from_meta( - fat_meta_to_thin(FatJobMeta::from_shell("ls").unwrap()).unwrap(), - ) - .wait_one() - .await - .unwrap(); + let longest_job = RawJob::from_shell(format!("sleep {}", SLEEP_SECS)).unwrap(); + let longest_job = AnonymousJobBatch::from_meta(longest_job).spawn().await; + let ls = AnonymousJobBatch::from_meta(RawJob::from_shell("ls").unwrap()) + .wait_one() + .await + .unwrap(); assert_eq!(ls.retcode.unwrap(), 0); let folders = ls.to_str_result(); let subfolders_jobs = folders .lines() - .map(|f| fat_meta_to_thin(FatJobMeta::from_shell(format!("ls {f}")).unwrap()).unwrap()) + .map(|f| RawJob::from_shell(format!("ls {f}")).unwrap()) .collect::>(); let ls_subfolders = AnonymousJobBatch::from_meta(subfolders_jobs).wait().await; @@ -351,7 +321,7 @@ mod tests { */ #[tokio::test] async fn test_failing_shell_job() -> TestResult { - let job = fat_meta_to_thin(FatJobMeta::from_shell("lol_kek_puk").unwrap()).unwrap(); + let job = RawJob::from_shell("lol_kek_puk").unwrap(); let job_result = AnonymousJobBatch::from_meta(job).wait_one().await.unwrap(); let output = job_result.to_str_result(); assert!(output.contains("No such file")); @@ -368,7 +338,7 @@ mod tests { #[case] payload: Option<&[u8]>, #[case] err_str: &str, ) -> TestResult { - let mut job = FatJobMeta::builder().with_shell(cmd); + let mut job = RawJob::builder().with_shell(cmd); if let Some(p) = payload { job = job.with_payload(p); } @@ -380,23 +350,18 @@ mod tests { #[tokio::test] async fn test_different_job_types() -> TestResult { - let mut jobs = NamedJobBatch::from_meta( - [ - FatJobMeta::builder() - .with_shell("sleep 3") - .with_alias("sleeper") - .build() - .unwrap(), - FatJobMeta::builder() - .with_type(JobType::Init) - .with_alias("gatherer") - .build() - .unwrap(), - ] - .into_iter() - .map(|meta| fat_meta_to_thin(meta).unwrap()) - .collect::>(), - ) + let mut jobs = NamedJobBatch::from_meta(vec![ + RawJob::builder() + .with_shell("sleep 3") + .with_alias("sleeper") + .build() + .unwrap(), + RawJob::builder() + .with_type(JobType::Init) + .with_alias("gatherer") + .build() + .unwrap(), + ]) .wait() .await; let gathered = jobs.pop("gatherer").unwrap(); diff --git a/lib/u_lib/src/messaging/mod.rs b/lib/u_lib/src/messaging.rs similarity index 80% rename from lib/u_lib/src/messaging/mod.rs rename to lib/u_lib/src/messaging.rs index 1dc699f..88089e2 100644 --- a/lib/u_lib/src/messaging/mod.rs +++ b/lib/u_lib/src/messaging.rs @@ -1,22 +1,20 @@ -mod files; - use crate::models::*; use crate::types::Id; use crate::UError; -pub use files::*; use serde::{Deserialize, Serialize}; use std::fmt::Debug; +/// Represents types that could be used in client-server interaction pub trait AsMsg: Clone + Serialize + Debug {} impl AsMsg for Agent {} impl AsMsg for AssignedJob {} impl AsMsg for AssignedJobById {} -impl AsMsg for DownloadInfo {} -impl AsMsg for FatJobMeta {} +impl AsMsg for JobModel {} +impl AsMsg for FatJob {} impl AsMsg for Reportable {} impl AsMsg for String {} -impl AsMsg for ThinJobMeta {} +impl AsMsg for ThinJob {} impl AsMsg for Id {} impl AsMsg for i32 {} impl AsMsg for u8 {} diff --git a/lib/u_lib/src/messaging/files.rs b/lib/u_lib/src/messaging/files.rs deleted file mode 100644 index 4cdb804..0000000 --- a/lib/u_lib/src/messaging/files.rs +++ /dev/null @@ -1,8 +0,0 @@ -use serde::{Deserialize, Serialize}; -use uuid::Uuid; - -#[derive(Serialize, Deserialize, Clone, Debug, Default)] -pub struct DownloadInfo { - hashsum: String, - dl_fid: Uuid, -} diff --git a/lib/u_lib/src/misc.rs b/lib/u_lib/src/misc.rs index bbc6484..7d185f1 100644 --- a/lib/u_lib/src/misc.rs +++ b/lib/u_lib/src/misc.rs @@ -20,7 +20,7 @@ macro_rules! unwrap_enum { if let $t(result) = $src { result } else { - panic!("wrong type {}", stringify!($t)) + panic!("wrong type '{}'", stringify!($t)) } }; } diff --git a/lib/u_lib/src/models/jobs/assigned.rs b/lib/u_lib/src/models/jobs/assigned.rs index 0e302ef..041451c 100644 --- a/lib/u_lib/src/models/jobs/assigned.rs +++ b/lib/u_lib/src/models/jobs/assigned.rs @@ -1,4 +1,4 @@ -use super::{JobState, JobType, ThinJobMeta}; +use super::{JobModel, JobState, JobType}; #[cfg(feature = "server")] use crate::models::schema::*; use crate::{ @@ -60,8 +60,8 @@ pub struct AssignedJobById { pub job_id: Id, } -impl From<(&ThinJobMeta, AssignedJobById)> for AssignedJob { - fn from((meta, ids): (&ThinJobMeta, AssignedJobById)) -> Self { +impl From<(&JobModel, AssignedJobById)> for AssignedJob { + fn from((job, ids): (&JobModel, AssignedJobById)) -> Self { let AssignedJobById { agent_id, id, @@ -72,8 +72,8 @@ impl From<(&ThinJobMeta, AssignedJobById)> for AssignedJob { id, agent_id, job_id, - alias: meta.alias.clone(), - exec_type: meta.exec_type, + alias: job.alias.clone(), + exec_type: job.exec_type, ..Default::default() } } diff --git a/lib/u_lib/src/models/jobs/meta.rs b/lib/u_lib/src/models/jobs/meta.rs index 2feca3d..5f525f0 100644 --- a/lib/u_lib/src/models/jobs/meta.rs +++ b/lib/u_lib/src/models/jobs/meta.rs @@ -1,23 +1,26 @@ use std::fmt; use super::JobType; -use crate::models::payload::Payload; +use crate::conv::bytes_to_string; #[cfg(feature = "server")] use crate::models::schema::*; +use crate::models::PayloadMeta; use crate::platform::Platform; use crate::types::Id; -use crate::{UError, UResult}; +use crate::{ufs, UError, UResult}; #[cfg(feature = "server")] use diesel::{Identifiable, Insertable, Queryable}; use serde::{Deserialize, Serialize}; +use std::fs::metadata; +use std::process::Command; -#[derive(Serialize, Deserialize, Clone)] +#[derive(Debug, Clone, Serialize, Deserialize)] #[cfg_attr( feature = "server", derive(Queryable, Identifiable, Insertable, AsChangeset), diesel(table_name = jobs) )] -pub struct ThinJobMeta { +pub struct JobModel { pub alias: Option, /// string like `bash -c {} -a 1 --arg2`, /// where {} is replaced by executable's tmp path @@ -26,27 +29,40 @@ pub struct ThinJobMeta { pub exec_type: JobType, /// target triple pub platform: String, - pub payload: Option, + pub payload: Option, /// cron-like string pub schedule: Option, } -impl fmt::Debug for ThinJobMeta { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("ThinJobMeta") - .field("alias", &self.alias) - .field("argv", &self.argv) - .field("id", &self.id.to_string()) - .field("exec_type", &self.exec_type) - .field("platform", &self.platform) - .field("payload", &self.payload) - .field("schedule", &self.schedule) - .finish() - } +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct FatJob { + pub job: JobModel, + pub payload_meta: Option, + pub payload_data: Option>, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ThinJob { + pub job: JobModel, + pub payload_meta: Option, } -#[derive(Serialize, Deserialize, Clone)] -pub struct FatJobMeta { +// impl fmt::Debug for ThinJobMeta { +// fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { +// f.debug_struct("ThinJobMeta") +// .field("alias", &self.alias) +// .field("argv", &self.argv) +// .field("id", &self.id.to_string()) +// .field("exec_type", &self.exec_type) +// .field("platform", &self.platform) +// .field("payload", &self.payload) +// .field("schedule", &self.schedule) +// .finish() +// } +// } + +#[derive(Serialize, Deserialize, Clone, Default)] +pub struct RawJob { #[serde(default)] pub alias: Option, @@ -66,16 +82,16 @@ pub struct FatJobMeta { pub platform: String, #[serde(default)] - pub payload: Option, + pub payload: Option, /// cron-like string #[serde(default)] pub schedule: Option, } -impl fmt::Debug for FatJobMeta { +impl fmt::Debug for RawJob { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("FatJobMeta") + f.debug_struct("RawJob") .field("alias", &self.alias) .field("argv", &self.argv) .field("id", &self.id.to_string()) @@ -87,53 +103,75 @@ impl fmt::Debug for FatJobMeta { } } -impl FatJobMeta { - pub fn validated(self) -> UResult> { - JobMetaBuilder { inner: self }.build() +impl From for RawJob { + fn from(job: ThinJob) -> Self { + let ThinJob { job, payload_meta } = job; + RawJob { + alias: job.alias, + argv: job.argv, + id: job.id, + exec_type: job.exec_type, + platform: job.platform, + payload: payload_meta.map(|m| m.name), + schedule: job.schedule, + } } +} - pub fn from_shell(cmd: impl Into) -> UResult> { - Self::builder().with_shell(cmd).build() +impl RawJob { + pub fn validated(self) -> UResult { + JobBuilder { + inner: self, + raw_payload: None, + } + .build() } - pub fn builder() -> JobMetaBuilder { - JobMetaBuilder::default() + pub fn from_shell(cmd: impl Into) -> UResult { + Self::builder().with_shell(cmd).build() } -} -impl Default for FatJobMeta { - fn default() -> Self { - Self { - id: Id::new_v4(), - alias: None, - argv: String::new(), - exec_type: JobType::Shell, - platform: Platform::current_as_string(), - payload: None, - schedule: None, - } + pub fn builder<'p>() -> JobBuilder<'p> { + JobBuilder::default() } } +// impl Default for RawJob { +// fn default() -> Self { +// Self { +// id: Id::new_v4(), +// alias: None, +// argv: String::new(), +// exec_type: JobType::Shell, +// platform: Platform::current_as_string(), +// payload: None, +// schedule: None, +// } +// } +//} + #[derive(Default)] -pub struct JobMetaBuilder { - inner: FatJobMeta, +pub struct JobBuilder<'p> { + inner: RawJob, + raw_payload: Option<&'p [u8]>, } -impl JobMetaBuilder { +impl<'p> JobBuilder<'p> { pub fn with_shell(mut self, shell_cmd: impl Into) -> Self { self.inner.argv = shell_cmd.into(); self.inner.exec_type = JobType::Shell; self } - pub fn with_payload(mut self, payload: impl Into>) -> Self { - self.inner.payload = Some(Payload::from_payload(payload)); + pub fn with_payload(mut self, raw_payload: &'p [u8]) -> Self { + self.raw_payload = Some(raw_payload); + self.inner.payload = None; self } pub fn with_payload_path(mut self, path: impl Into) -> Self { - self.inner.payload = Some(Payload::Ident(path.into())); + self.inner.payload = Some(path.into()); + self.raw_payload = None; self } @@ -147,16 +185,42 @@ impl JobMetaBuilder { self } - pub fn build(self) -> UResult> { + pub fn build(self) -> UResult { let mut inner = self.inner; - let validated = |jmeta: FatJobMeta| FatJobMeta:: { - alias: jmeta.alias, - argv: jmeta.argv, - id: jmeta.id, - exec_type: jmeta.exec_type, - platform: jmeta.platform, - payload: jmeta.payload, - schedule: jmeta.schedule, + + let raw_into_job = |raw: RawJob| -> UResult { + let payload_id = raw.payload.as_ref().map(|_| Id::new_v4()); + + Ok(ThinJob { + job: JobModel { + alias: raw.alias, + argv: raw.argv, + id: raw.id, + exec_type: raw.exec_type, + platform: raw.platform, + payload: payload_id, + schedule: raw.schedule, + }, + payload_meta: raw + .payload + .map(|payload_path| { + Ok::<_, UError>(PayloadMeta { + id: payload_id.unwrap(), + mime_type: bytes_to_string( + &Command::new("file") + .arg("-b") + .arg("--mime-type") + .arg(&payload_path) + .output() + .map_err(|e| UError::JobBuildError(e.to_string()))? + .stdout, + ), + name: payload_path.clone(), + size: metadata(payload_path).unwrap().len() as i64, + }) + }) + .transpose()?, + }) }; match inner.exec_type { @@ -180,19 +244,28 @@ impl JobMetaBuilder { return Err(empty_err.into()); } - if let Some(payload) = &mut inner.payload { - payload.add_to_index()?; + if let Some(payload_path) = &inner.payload { + ufs::put_external(payload_path)?; + } + + if let Some(raw) = self.raw_payload { + match inner.payload { + Some(_) => { + return Err(UError::JobBuildError( + "Can't use both raw payload with payload path".to_string(), + )) + } + None => inner.payload = Some(ufs::create(raw)?), + } } match inner.payload.as_ref() { - Some(p) => { - if let Payload::Data(d) = p { - if !d.is_empty() && !inner.argv.contains("{}") { - return Err(UError::JobBuildError( - "Argv contains no executable placeholder".into(), - ) - .into()); - } + Some(_) => { + if !inner.argv.contains("{}") { + return Err(UError::JobBuildError( + "Argv contains no executable placeholder".into(), + ) + .into()); } } None => { @@ -213,9 +286,9 @@ impl JobMetaBuilder { ))); } - Ok(validated(inner)) + raw_into_job(inner) } - _ => Ok(validated(inner)), + _ => raw_into_job(inner), } } } diff --git a/lib/u_lib/src/models/payload.rs b/lib/u_lib/src/models/payload.rs index 4543ec3..46fcec8 100644 --- a/lib/u_lib/src/models/payload.rs +++ b/lib/u_lib/src/models/payload.rs @@ -1,76 +1,20 @@ -use crate::{conv::bytes_to_string_truncated, ufs}; +use crate::types::Id; use serde::{Deserialize, Serialize}; -use std::{fmt, path::PathBuf}; -#[derive(Clone, Deserialize, Serialize)] -#[serde(untagged)] -pub enum Payload { - /// Raw payload data - Data(Vec), - /// Payload identifier in ufs - Ident(String), -} - -impl Payload { - pub fn read_into_self(&mut self) -> Result<(), ufs::Error> { - match self { - Payload::Data(_) => Ok(()), - Payload::Ident(ident) => { - let data = ufs::read(ident)?; - - *self = Payload::Data(data); - Ok(()) - } - } - } - - pub fn write_self_into(&mut self, name: impl AsRef) -> Result<(), ufs::Error> { - match self { - Payload::Ident(_) => Ok(()), - Payload::Data(data) => { - ufs::put(&name, data)?; - - *self = Payload::Ident(name.as_ref().to_string()); - Ok(()) - } - } - } - - pub fn from_payload(data: impl Into>) -> Self { - Payload::Data(data.into()) - } - - pub fn from_path(path: impl Into) -> Result { - let path: PathBuf = path.into(); - - if !path.exists() || path.is_dir() { - return Err(ufs::Error::not_found(path)); - } - - ufs::put_existing(&path)?; - - Ok(Payload::Ident(path.to_string_lossy().to_string())) - } - - pub fn add_to_index(&self) -> Result<(), ufs::Error> { - match self { - Payload::Ident(ident) => Payload::from_path(ident).map(|_| ()), - _ => Ok(()), - } - } -} - -impl fmt::Debug for Payload { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { - Self::Data(data) => { - let mut dbg = &mut f.debug_tuple("Data"); - let data = bytes_to_string_truncated(data, 256); - - dbg = dbg.field(&data); - dbg.finish() - } - Self::Ident(ident) => f.debug_tuple("Ident").field(ident).finish(), - } - } +#[cfg(feature = "server")] +use crate::models::schema::*; +#[cfg(feature = "server")] +use diesel::Identifiable; + +#[cfg_attr( + feature = "server", + derive(Insertable, Queryable, Identifiable), + diesel(table_name = payloads) +)] +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct PayloadMeta { + pub id: Id, + pub mime_type: String, + pub name: String, + pub size: i64, } diff --git a/lib/u_lib/src/models/schema.rs b/lib/u_lib/src/models/schema.rs index 7e48a3b..6e5a3ef 100644 --- a/lib/u_lib/src/models/schema.rs +++ b/lib/u_lib/src/models/schema.rs @@ -36,16 +36,6 @@ diesel::table! { } } -diesel::table! { - use crate::schema_exports::*; - - certificates (id) { - agent_id -> Uuid, - id -> Uuid, - is_revoked -> Bool, - } -} - diesel::table! { use crate::schema_exports::*; use super::sql_types::Jobtype; @@ -56,11 +46,22 @@ diesel::table! { id -> Uuid, exec_type -> Jobtype, platform -> Text, - payload -> Nullable, + payload -> Nullable, schedule -> Nullable, } } +diesel::table! { + use crate::schema_exports::*; + + payloads (id) { + id -> Uuid, + mime_type -> Text, + name -> Text, + size -> Int8, + } +} + diesel::table! { use crate::schema_exports::*; use super::sql_types::Jobstate; @@ -80,8 +81,8 @@ diesel::table! { } } -diesel::joinable!(certificates -> agents (agent_id)); +diesel::joinable!(jobs -> payloads (payload)); diesel::joinable!(results -> agents (agent_id)); diesel::joinable!(results -> jobs (job_id)); -diesel::allow_tables_to_appear_in_same_query!(agents, certificates, jobs, results,); +diesel::allow_tables_to_appear_in_same_query!(agents, jobs, payloads, results,); diff --git a/lib/u_lib/src/ufs/mod.rs b/lib/u_lib/src/ufs/mod.rs index d8ec748..be35999 100644 --- a/lib/u_lib/src/ufs/mod.rs +++ b/lib/u_lib/src/ufs/mod.rs @@ -1,5 +1,5 @@ // This module is aiming to store obfuscated payloads, get them by name, -// delete or prepare to execute via memfd_create (unix) +// rename, update, delete or prepare to execute via memfd_create (unix) use once_cell::sync::Lazy; use parking_lot::RwLock; @@ -20,6 +20,7 @@ struct FileMeta { path: PathBuf, obfuscated: bool, extension: Option, + external: bool, } /// Remove deleted files from index @@ -58,8 +59,18 @@ pub fn read(name: impl AsRef) -> Result, Error> { fs::read(&meta.path).map_err(|e| Error::new(e, name)) } +pub fn create(data: impl AsRef<[u8]>) -> Result { + let name = Uuid::new_v4().simple().to_string(); + + put(&name, data)?; + + Ok(name) +} + /// Create new file and add to index pub fn put(name: impl AsRef, data: impl AsRef<[u8]>) -> Result<(), Error> { + sync_index(); + let name = name.as_ref(); let obfuscate = !cfg!(feature = "server") && !cfg!(feature = "panel"); @@ -83,21 +94,83 @@ pub fn put(name: impl AsRef, data: impl AsRef<[u8]>) -> Result<(), Error> { fs::write(&path, data).map_err(|e| Error::new(e, name))?; - let mut index = INDEX.write(); - index.insert( + INDEX.write().insert( name.to_string(), FileMeta { path, obfuscated: obfuscate, extension, + external: false, }, ); Ok(()) } -/// Add existing file to index -pub fn put_existing(path: impl AsRef) -> Result<(), Error> { +pub fn remove(name: impl AsRef) -> Result<(), Error> { + sync_index(); + + let name = name.as_ref(); + + match INDEX.write().remove(name) { + Some(value) => fs::remove_file(value.path).map_err(|e| Error::new(e, name)), + None => Ok(()), + } +} + +pub fn rename(old_name: impl AsRef, new_name: impl AsRef) -> Result<(), Error> { + sync_index(); + + let old_name = old_name.as_ref(); + let new_name = new_name.as_ref(); + + if old_name == new_name { + return Ok(()); + } + + if !in_index(old_name) { + return Err(Error::not_found(old_name)); + } + + if in_index(new_name) { + return Err(Error::already_exists(new_name)); + } + + let mut value = INDEX.write().remove(old_name).unwrap(); + + if !value.obfuscated { + let old_path = value.path.clone(); + + value.path.pop(); + value.path.push(new_name); + + fs::rename(old_path, &value.path).map_err(|e| Error::new(e, &value.path))?; + } + + INDEX.write().insert(new_name.to_string(), value); + + Ok(()) +} + +pub fn update_payload_data(name: impl AsRef, data: impl AsRef<[u8]>) -> Result<(), Error> { + sync_index(); + + let name = name.as_ref(); + let external = INDEX.read().get(name).map(|v| v.external).unwrap_or(false); + + if external { + INDEX.write().remove(name); + } else if in_index(&name) { + remove(&name)?; + } + + put(name, data) +} + +/// Add an existing file to index +pub fn put_external(path: impl AsRef) -> Result<(), Error> { + sync_index(); + let path = path.as_ref(); let path_str = path.as_os_str().to_string_lossy().to_string(); @@ -109,19 +182,20 @@ pub fn put_existing(path: impl AsRef) -> Result<(), Error> { return Err(Error::already_exists(&path)); } - let mut index = INDEX.write(); - index.insert( + INDEX.write().insert( path_str, FileMeta { path: path.to_owned(), obfuscated: false, extension: path.file_stem().map(ToOwned::to_owned), + external: true, }, ); Ok(()) } +/// Prepare executable file: unpack, decipher if needed and send under memfd #[cfg(unix)] pub fn prepare_executable(name: impl AsRef) -> Result<(File, String), Error> { use libc::getpid; diff --git a/migrations/2020-10-24-111622_create_all/down.sql b/migrations/2020-10-24-111622_create_all/down.sql index 3ded775..ec60637 100644 --- a/migrations/2020-10-24-111622_create_all/down.sql +++ b/migrations/2020-10-24-111622_create_all/down.sql @@ -1,8 +1,7 @@ -DROP TABLE ip_addrs; -DROP TABLE results; -DROP TABLE certificates; -DROP TABLE jobs; -DROP TABLE agents; +DROP TABLE IF EXISTS results; +DROP TABLE IF EXISTS jobs; +DROP TABLE IF EXISTS payloads; +DROP TABLE IF EXISTS agents; DROP TYPE IF EXISTS JobState; DROP TYPE IF EXISTS JobType; diff --git a/migrations/2020-10-24-111622_create_all/up.sql b/migrations/2020-10-24-111622_create_all/up.sql index 41bf5e7..8e0af5a 100644 --- a/migrations/2020-10-24-111622_create_all/up.sql +++ b/migrations/2020-10-24-111622_create_all/up.sql @@ -7,7 +7,7 @@ CREATE TABLE IF NOT EXISTS agents ( alias TEXT, hostname TEXT NOT NULL, host_info TEXT NOT NULL, - id UUID NOT NULL DEFAULT uuid_generate_v4(), + id UUID NOT NULL, ip_gray TEXT, ip_white TEXT, is_root BOOLEAN NOT NULL DEFAULT false, @@ -22,15 +22,25 @@ CREATE TABLE IF NOT EXISTS agents ( PRIMARY KEY(id) ); +CREATE TABLE IF NOT EXISTS payloads ( + id UUID NOT NULL, + mime_type TEXT NOT NULL, + name TEXT NOT NULL UNIQUE, + size BIGINT NOT NULL, + + PRIMARY KEY(id) +); + CREATE TABLE IF NOT EXISTS jobs ( alias TEXT, argv TEXT NOT NULL, - id UUID NOT NULL DEFAULT uuid_generate_v4(), + id UUID NOT NULL, exec_type JobType NOT NULL DEFAULT 'shell', platform TEXT NOT NULL, - payload TEXT, + payload UUID, schedule TEXT, + FOREIGN KEY(payload) REFERENCES payloads(id) ON DELETE SET NULL, PRIMARY KEY(id) ); @@ -38,7 +48,7 @@ CREATE TABLE IF NOT EXISTS results ( agent_id UUID NOT NULL, alias TEXT, created TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, - id UUID NOT NULL DEFAULT uuid_generate_v4(), + id UUID NOT NULL, job_id UUID NOT NULL, result BYTEA, state JobState NOT NULL DEFAULT 'queued', @@ -49,13 +59,4 @@ CREATE TABLE IF NOT EXISTS results ( FOREIGN KEY(agent_id) REFERENCES agents(id) ON DELETE CASCADE, FOREIGN KEY(job_id) REFERENCES jobs(id) ON DELETE CASCADE, PRIMARY KEY(id) -); - -CREATE TABLE IF NOT EXISTS certificates ( - agent_id UUID NOT NULL, - id UUID NOT NULL DEFAULT uuid_generate_v4(), - is_revoked BOOLEAN NOT NULL DEFAULT FALSE, - - PRIMARY KEY(id), - FOREIGN KEY(agent_id) REFERENCES agents(id) ); \ No newline at end of file