From 45bba0dd9bc3d0a77ed8ea656ee46e5d3f52df15 Mon Sep 17 00:00:00 2001 From: plazmoid Date: Sun, 30 Apr 2023 00:50:19 +0300 Subject: [PATCH] finish payload crud & tests --- Cargo.lock | 59 +++++---- bin/u_panel/src/argparse.rs | 55 ++++++--- bin/u_server/src/db.rs | 45 ++++--- bin/u_server/src/error.rs | 5 +- bin/u_server/src/handlers.rs | 90 +++++++++++--- bin/u_server/src/u_server.rs | 36 ++++-- images/u_server.Dockerfile | 2 +- integration-tests/docker-compose.yml | 4 +- .../tests/integration_tests/api.rs | 46 ------- .../tests/integration_tests/behaviour.rs | 2 +- .../tests/integration_tests/endpoints.rs | 112 ++++++++++++++++++ .../tests/integration_tests/mod.rs | 2 +- lib/u_lib/src/api.rs | 25 +++- lib/u_lib/src/logging.rs | 6 +- lib/u_lib/src/messaging.rs | 1 + lib/u_lib/src/models/jobs/meta.rs | 32 ++++- lib/u_lib/src/models/payload.rs | 49 ++++++-- lib/u_lib/src/models/schema.rs | 4 +- lib/u_lib/src/ufs/mod.rs | 8 +- .../2020-10-24-111622_create_all/up.sql | 4 +- 20 files changed, 419 insertions(+), 168 deletions(-) delete mode 100644 integration-tests/tests/integration_tests/api.rs create mode 100644 integration-tests/tests/integration_tests/endpoints.rs diff --git a/Cargo.lock b/Cargo.lock index 6f15ca8..c65af22 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -815,9 +815,9 @@ dependencies = [ [[package]] name = "flate2" -version = "1.0.25" +version = "1.0.26" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a8a2db397cb1c8772f31494cb8917e48cd1e64f0fa7efac59fbd741a0a8ce841" +checksum = "3b9429470923de8e8cbd4d2dc513535400b4b3fef0319fb5c4e1f520a7bef743" dependencies = [ "crc32fast", "miniz_oxide", @@ -1344,9 +1344,9 @@ dependencies = [ [[package]] name = "linux-raw-sys" -version = "0.3.4" +version = "0.3.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "36eb31c1778188ae1e64398743890d0877fef36d11521ac60406b42016e8c2cf" +checksum = "b64f40e5e03e0d54f03845c8197d0291253cdbedfb1cb46b13c2c117554a9f4c" [[package]] name = "local-channel" @@ -1459,9 +1459,9 @@ dependencies = [ [[package]] name = "miniz_oxide" -version = "0.6.2" +version = "0.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b275950c28b37e794e8c55d88aeb5e139d0ce23fdbbeda68f8d7174abdf9e8fa" +checksum = "e7810e0be55b428ada41041c41f32c9f1a42817901b4ccf45fa3d4b6561e74c7" dependencies = [ "adler", ] @@ -1479,17 +1479,21 @@ dependencies = [ ] [[package]] -name = "multiparty" -version = "0.1.0" +name = "multer" +version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ed1ec6589a6d4a1e0b33b4c0a3f6ee96dfba88ebdb3da51403fd7cf0a24a4b04" +checksum = "01acbdc23469fd8fe07ab135923371d5f5a422fbf9c522158677c8eb15bc51c2" dependencies = [ "bytes", - "futures-core", + "encoding_rs", + "futures-util", + "http", "httparse", + "log", "memchr", - "pin-project-lite", - "try-lock", + "mime", + "spin 0.9.8", + "version_check", ] [[package]] @@ -1873,9 +1877,9 @@ checksum = "a5996294f19bd3aae0453a862ad728f60e6600695733dd5df01da90c54363a3c" [[package]] name = "reqwest" -version = "0.11.16" +version = "0.11.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "27b71749df584b7f4cac2c426c127a7c785a5106cc98f7a8feb044115f0fa254" +checksum = "13293b639a097af28fc8a90f22add145a9c954e49d77da06263d58cf44d5fb91" dependencies = [ "base64 0.21.0", "bytes", @@ -1923,7 +1927,7 @@ dependencies = [ "cc", "libc", "once_cell", - "spin", + "spin 0.5.2", "untrusted", "web-sys", "winapi", @@ -2020,9 +2024,9 @@ dependencies 
= [ [[package]] name = "rustix" -version = "0.37.15" +version = "0.37.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a0661814f891c57c930a610266415528da53c4933e6dea5fb350cbfe048a9ece" +checksum = "bc809f704c03a812ac71f22456c857be34185cac691a4316f27ab0f633bb9009" dependencies = [ "bitflags", "errno 0.3.1", @@ -2278,6 +2282,12 @@ version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d" +[[package]] +name = "spin" +version = "0.9.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" + [[package]] name = "strsim" version = "0.8.0" @@ -2606,10 +2616,11 @@ checksum = "b6bc1c9ce2b5135ac7f93c72918fc37feb872bdc6a5533a8b85eb4b86bfdae52" [[package]] name = "tracing" -version = "0.1.38" +version = "0.1.37" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cf9cf6a813d3f40c88b0b6b6f29a5c95c6cdbf97c1f9cc53fb820200f5ad814d" +checksum = "8ce8c33a8d48bd45d624a6e523445fd21ec13d3653cd51f681abf67418f54eb8" dependencies = [ + "cfg-if 1.0.0", "log", "pin-project-lite", "tracing-attributes", @@ -2878,9 +2889,9 @@ checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9" [[package]] name = "uuid" -version = "1.3.1" +version = "1.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5b55a3fef2a1e3b3a00ce878640918820d3c51081576ac657d23af9fc7928fdb" +checksum = "4dad5567ad0cf5b760e5665964bec1b47dfd077ba8a2544b513f3556d3d239a2" dependencies = [ "getrandom", "serde", @@ -2938,9 +2949,9 @@ dependencies = [ [[package]] name = "warp" -version = "0.3.4" +version = "0.3.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "27e1a710288f0f91a98dd8a74f05b76a10768db245ce183edf64dc1afdc3016c" +checksum = "ba431ef570df1287f7f8b07e376491ad54f84d26ac473489427231e1718e1f69" dependencies = [ "bytes", "futures-channel", @@ -2951,7 +2962,7 @@ dependencies = [ "log", "mime", "mime_guess", - "multiparty", + "multer", "percent-encoding", "pin-project", "rustls-pemfile", diff --git a/bin/u_panel/src/argparse.rs b/bin/u_panel/src/argparse.rs index 0863996..e528aad 100644 --- a/bin/u_panel/src/argparse.rs +++ b/bin/u_panel/src/argparse.rs @@ -1,12 +1,7 @@ use serde_json::{from_str, to_value, Value}; use structopt::StructOpt; use u_lib::{ - api::HttpClient, - messaging::AsMsg, - models::{Agent, AssignedJob, BriefMode, RawJob}, - types::Id, - types::PanelResult, - UError, UResult, + api::HttpClient, messaging::AsMsg, models::*, types::Id, types::PanelResult, UError, UResult, }; #[derive(StructOpt, Debug)] @@ -20,17 +15,17 @@ pub struct Args { #[derive(StructOpt, Debug)] enum Cmd { Agents(RUD), - Jobs(JobCRUD), + Jobs(CRUD), Map(MapCRUD), - Payloads(RUD), + Payloads(PayloadCRUD), Ping, Serve, } #[derive(StructOpt, Debug)] -enum JobCRUD { +enum CRUD { Create { - job: String, + item: String, }, #[structopt(flatten)] RUD(RUD), @@ -48,6 +43,23 @@ enum MapCRUD { RUD(RUD), } +#[derive(StructOpt, Debug)] +enum PayloadCRUD { + Create { + item: String, + }, + Read { + id: Option, + }, + Update { + item: String, + }, + Delete { + #[structopt(parse(try_from_str = parse_uuid))] + id: Id, + }, +} + #[derive(StructOpt, Debug)] enum RUD { Read { @@ -84,7 +96,7 @@ pub async fn process_cmd(client: HttpClient, args: Args) -> PanelResult { RUD::Delete { id } => into_value(client.del(id).await?), }, Cmd::Jobs(action) => match action { 
- JobCRUD::Create { job } => { + CRUD::Create { item: job } => { let raw_job = from_str::(&job) .map_err(|e| UError::DeserializeError(e.to_string(), job))?; let mut job = raw_job.validated()?; @@ -95,11 +107,11 @@ pub async fn process_cmd(client: HttpClient, args: Args) -> PanelResult { into_value(client.upload_jobs([&job]).await?) } - JobCRUD::RUD(RUD::Read { id }) => match id { + CRUD::RUD(RUD::Read { id }) => match id { Some(id) => into_value(vec![client.get_job(id, args.brief).await?]), None => into_value(client.get_jobs().await?), }, - JobCRUD::RUD(RUD::Update { item }) => { + CRUD::RUD(RUD::Update { item }) => { let raw_job = from_str::(&item) .map_err(|e| UError::DeserializeError(e.to_string(), item))?; let mut job = raw_job.validated()?; @@ -110,7 +122,7 @@ pub async fn process_cmd(client: HttpClient, args: Args) -> PanelResult { into_value(client.update_job(&job).await?) } - JobCRUD::RUD(RUD::Delete { id }) => into_value(client.del(id).await?), + CRUD::RUD(RUD::Delete { id }) => into_value(client.del(id).await?), }, Cmd::Map(action) => match action { MapCRUD::Create { @@ -126,12 +138,21 @@ pub async fn process_cmd(client: HttpClient, args: Args) -> PanelResult { MapCRUD::RUD(RUD::Delete { id }) => into_value(client.del(id).await?), }, Cmd::Payloads(action) => match action { - RUD::Read { id } => match id { + PayloadCRUD::Create { item } => { + let payload = from_str::(&item) + .map_err(|e| UError::DeserializeError(e.to_string(), item))?; + into_value(client.upload_payloads([&payload]).await?) + } + PayloadCRUD::Read { id } => match id { None => into_value(client.get_payloads().await?), Some(id) => into_value(client.get_payload(id, args.brief).await?), }, - RUD::Update { item: _item } => todo!(), - RUD::Delete { id } => into_value(client.del(id).await?), + PayloadCRUD::Update { item } => { + let payload = from_str::(&item) + .map_err(|e| UError::DeserializeError(e.to_string(), item))?; + into_value(client.update_payload(&payload).await?) 
+ } + PayloadCRUD::Delete { id } => into_value(client.del(id).await?), }, Cmd::Ping => into_value(client.ping().await?), Cmd::Serve => { diff --git a/bin/u_server/src/db.rs b/bin/u_server/src/db.rs index a417b49..9228529 100644 --- a/bin/u_server/src/db.rs +++ b/bin/u_server/src/db.rs @@ -50,28 +50,24 @@ pub struct UDB<'c> { } impl UDB<'_> { - pub fn insert_jobs(&mut self, jobs: &[Job]) -> Result<()> { - use schema::{jobs, payloads}; + pub fn insert_jobs(&mut self, jobs: &[JobModel]) -> Result<()> { + use schema::jobs; - let (jobs, payloads_opt): (Vec<_>, Vec<_>) = - jobs.iter().map(|j| (&j.job, j.payload.as_ref())).unzip(); + diesel::insert_into(jobs::table) + .values(jobs) + .execute(self.conn) + .map(drop) + .map_err(with_err_ctx("Can't insert jobs")) + } - let payloads = payloads_opt - .into_iter() - .filter_map(|p| p) - .collect::>(); + pub fn insert_payloads(&mut self, payloads: &[Payload]) -> Result<()> { + use schema::payloads; diesel::insert_into(payloads::table) .values(payloads) .execute(self.conn) .map(drop) - .map_err(with_err_ctx("Can't insert payloads"))?; - - diesel::insert_into(jobs::table) - .values(jobs) - .execute(self.conn) - .map(drop) - .map_err(with_err_ctx("Can't insert jobs")) + .map_err(with_err_ctx("Can't insert payloads")) } pub fn get_job(&mut self, id: Id) -> Result> { @@ -105,6 +101,16 @@ impl UDB<'_> { .map_err(with_err_ctx(format!("Can't get payload {id}"))) } + pub fn get_payload_by_name(&mut self, name: String) -> Result> { + use schema::payloads; + + payloads::table + .filter(payloads::name.eq(&name)) + .first(self.conn) + .optional() + .map_err(with_err_ctx(format!("Can't get payload by name {name}"))) + } + pub fn get_payloads(&mut self) -> Result> { use schema::payloads; @@ -113,7 +119,7 @@ impl UDB<'_> { .map_err(with_err_ctx("Can't get payloads")) } - pub fn find_job_by_alias(&mut self, alias: &str) -> Result> { + pub fn get_job_by_alias(&mut self, alias: &str) -> Result> { use schema::{jobs, payloads}; let maybe_job_with_payload = jobs::table @@ -303,6 +309,13 @@ impl UDB<'_> { Ok(()) } + pub fn update_payload(&mut self, payload: &Payload) -> Result<()> { + payload + .save_changes::(self.conn) + .map_err(with_err_ctx(format!("Can't update payload {payload:?}")))?; + Ok(()) + } + pub fn update_result(&mut self, result: &AssignedJob) -> Result<()> { debug!( "updating result: id = {}, job_id = {}, agent_id = {}", diff --git a/bin/u_server/src/error.rs b/bin/u_server/src/error.rs index 4a958dc..b2c4382 100644 --- a/bin/u_server/src/error.rs +++ b/bin/u_server/src/error.rs @@ -1,6 +1,6 @@ use diesel::result::Error as DslError; use thiserror::Error; -use u_lib::ufs; +use u_lib::{ufs, UError}; use warp::{ http::StatusCode, reject::Reject, @@ -34,6 +34,9 @@ pub enum Error { #[error("{0}\nContext: {1}")] Contexted(Box, String), + #[error(transparent)] + UError(#[from] UError), + #[error("Runtime error: {0}")] Runtime(String), } diff --git a/bin/u_server/src/handlers.rs b/bin/u_server/src/handlers.rs index 1a52b61..2ce2e2e 100644 --- a/bin/u_server/src/handlers.rs +++ b/bin/u_server/src/handlers.rs @@ -83,15 +83,25 @@ impl Endpoints { pub async fn get_payload( repo: Arc, - id: Id, + name_or_id: String, params: Option, ) -> EndpResult { - let Some(mut payload) = repo.interact(move |mut db| db.get_payload(id)).await? else { - return Err(not_found()) + let mut payload = match repo + .interact(move |mut db| match Id::parse_str(&name_or_id) { + Ok(id) => db.get_payload(id), + Err(_) => db.get_payload_by_name(name_or_id), + }) + .await? 
+ { + Some(p) => p, + None => return Err(not_found()), }; Ok(match params.map(|p| p.brief) { - Some(BriefMode::Yes) => payload, + Some(BriefMode::Yes) => { + payload.data = None; + payload + } None | Some(BriefMode::Auto) => { payload.maybe_join_payload().map_err(Error::from)?; payload @@ -121,7 +131,7 @@ impl Endpoints { db.upsert_agent(&new_agent)?; let job = db - .find_job_by_alias("agent_hello")? + .get_job_by_alias("agent_hello")? .expect("agent_hello job not found"); db.set_jobs_for_agent(id, &[job.job.id])?; @@ -154,7 +164,33 @@ impl Endpoints { }) .collect::, Error>>()?; - repo.interact(move |mut db| db.insert_jobs(&jobs)) + let (jobs, payloads_opt): (Vec<_>, Vec<_>) = + jobs.into_iter().map(|j| (j.job, j.payload)).unzip(); + + let payloads = payloads_opt + .into_iter() + .filter_map(|p| p) + .collect::>(); + + repo.transaction(move |mut db| { + db.insert_payloads(&payloads)?; + db.insert_jobs(&jobs) + }) + .await + .map_err(From::from) + } + + pub async fn upload_payloads( + repo: Arc, + raw_payloads: Vec, + ) -> EndpResult { + let payloads = raw_payloads + .into_iter() + .map(|raw| raw.into_payload()) + .collect::, _>>() + .map_err(Error::from)?; + + repo.interact(move |mut db| db.insert_payloads(&payloads)) .await .map_err(From::from) } @@ -185,7 +221,7 @@ impl Endpoints { .into_iter() .map(|ident| { Id::parse_str(&ident).or_else(|_| { - let job_from_db = db.find_job_by_alias(&ident); + let job_from_db = db.get_job_by_alias(&ident); match job_from_db { Ok(job) => match job { Some(j) => Ok(j.job.id), @@ -262,17 +298,15 @@ impl Endpoints { } pub async fn update_agent(repo: Arc, agent: Agent) -> EndpResult { - repo.interact(move |mut db| db.upsert_agent(&agent)).await?; - Ok(()) + repo.interact(move |mut db| db.upsert_agent(&agent)) + .await + .map_err(From::from) } - pub async fn update_job(repo: Arc, mut job: Job) -> EndpResult { - if let Some(payload) = &mut job.payload { - payload.maybe_split_payload().map_err(Error::from)?; - } - - repo.interact(move |mut db| db.update_job(&job.job)).await?; - Ok(()) + pub async fn update_job(repo: Arc, job: Job) -> EndpResult { + repo.interact(move |mut db| db.update_job(&job.job)) + .await + .map_err(From::from) } pub async fn update_assigned_job( @@ -280,7 +314,27 @@ impl Endpoints { assigned: AssignedJob, ) -> EndpResult { repo.interact(move |mut db| db.update_result(&assigned)) - .await?; - Ok(()) + .await + .map_err(From::from) + } + + pub async fn update_payload( + repo: Arc, + payload: Payload, + ) -> EndpResult { + match payload.data { + Some(data) => { + let mut well_formed_payload = + Payload::from_data(data, Some(&payload.name)).map_err(Error::from)?; + well_formed_payload.id = payload.id; + + debug!("wf payload: {well_formed_payload:?}"); + + repo.interact(move |mut db| db.update_payload(&well_formed_payload)) + .await + .map_err(From::from) + } + None => return Ok(()), + } } } diff --git a/bin/u_server/src/u_server.rs b/bin/u_server/src/u_server.rs index ce1cd9e..87dfe6a 100644 --- a/bin/u_server/src/u_server.rs +++ b/bin/u_server/src/u_server.rs @@ -22,7 +22,7 @@ use warp::{ Filter, Rejection, Reply, }; -const DEFAULT_RESP: &str = "null"; +const DEFAULT_RESPONSE: &str = "null"; use crate::handlers::{Endpoints, PayloadFlags}; @@ -132,14 +132,26 @@ pub fn init_endpoints( let get_payload = path("get_payload") .and(with_db.clone()) - .and(warp::path::param::()) + .and(warp::path::param::()) .and(make_optional(serde_qs::warp::query::( create_qs_cfg(), ))) .and_then(Endpoints::get_payload) .map(into_message); - let ping = 
path("ping").map(|| DEFAULT_RESP); + let upload_payloads = path("upload_payloads") + .and(with_db.clone()) + .and(body::json::>()) + .and_then(Endpoints::upload_payloads) + .map(ok); + + let update_payload = path("update_payload") + .and(with_db.clone()) + .and(body::json::()) + .and_then(Endpoints::update_payload) + .map(ok); + + let ping = path("ping").map(|| DEFAULT_RESPONSE); let auth_token = format!("Bearer {auth_token}",).into_boxed_str(); let auth_header = warp::header::exact("authorization", Box::leak(auth_token)); @@ -147,13 +159,17 @@ pub fn init_endpoints( let auth_zone = (get_agents .or(get_job.clone()) .or(get_jobs.clone()) + .or(get_payloads) + .or(get_payload) .or(upload_jobs) + .or(upload_payloads) .or(del) .or(set_jobs) .or(get_assigned_jobs) - .or(update_agent.or(update_job).or(update_assigned_job)) - .or(get_payloads) - .or(get_payload) + .or(update_agent) + .or(update_job) + .or(update_assigned_job) + .or(update_payload) .or(ping)) .and(auth_header); @@ -165,14 +181,14 @@ pub fn init_endpoints( pub async fn preload_jobs(repo: &PgRepo) -> Result<(), ServerError> { repo.interact(|mut db| { let job_alias = "agent_hello"; - let if_job_exists = db.find_job_by_alias(job_alias)?; + let if_job_exists = db.get_job_by_alias(job_alias)?; if if_job_exists.is_none() { let agent_hello = RawJob::builder() .with_type(JobType::Init) .with_alias(job_alias) .build() .unwrap(); - db.insert_jobs(&[agent_hello])?; + db.insert_jobs(&[agent_hello.job])?; } Ok(()) }) @@ -228,13 +244,13 @@ fn logger(info: Info<'_>) { .take(2) .collect::() ) - .unwrap_or_else(|| "NO_AGENT".to_string()), + .unwrap_or_else(|| "NO_AGENT_UID".to_string()), status = info.status() ); } fn ok(_: T) -> impl Reply { - DEFAULT_RESP + DEFAULT_RESPONSE } /* diff --git a/images/u_server.Dockerfile b/images/u_server.Dockerfile index 122ef4d..d9f21d6 100644 --- a/images/u_server.Dockerfile +++ b/images/u_server.Dockerfile @@ -1,3 +1,3 @@ FROM alpine:3.17 -RUN apk add iproute2 bash \ No newline at end of file +RUN apk add iproute2 bash file \ No newline at end of file diff --git a/integration-tests/docker-compose.yml b/integration-tests/docker-compose.yml index db44663..bbd3424 100644 --- a/integration-tests/docker-compose.yml +++ b/integration-tests/docker-compose.yml @@ -28,7 +28,7 @@ services: - ../.env - ../.env.private environment: - RUST_LOG: warp=info,u_server_lib=debug + RUST_LOG: warp=info,u_server_lib=debug,u_lib=debug healthcheck: test: ss -tlpn | grep 63714 interval: 5s @@ -100,5 +100,5 @@ services: - ../.env.private environment: RUST_BACKTRACE: 1 - RUST_LOG: debug,hyper=info,reqwest=info + RUST_LOG: hyper=info,reqwest=info U_SERVER: u_server \ No newline at end of file diff --git a/integration-tests/tests/integration_tests/api.rs b/integration-tests/tests/integration_tests/api.rs deleted file mode 100644 index 734ea31..0000000 --- a/integration-tests/tests/integration_tests/api.rs +++ /dev/null @@ -1,46 +0,0 @@ -// get_personal_jobs(&self, url_param: Id) -// report(&self, payload: impl OneOrVec) -// dl(&self, file: String) -// get_job(&self, job: Id) -// get_jobs(&self) -// get_agents(&self, agent: Option) -// update_agent(&self, agent: Agent) -// update_job(&self, job: FatJob) -// update_result(&self, result: AssignedJob) -// upload_jobs(&self, payload: impl OneOrVec) -// del(&self, item: Id) -// set_jobs(&self, agent: Id, job_idents: impl OneOrVec) -// get_agent_jobs(&self, agent: Option) -// ping(&self) - -use crate::fixtures::connections::*; -use u_lib::models::RawJob; - -#[rstest] -#[tokio::test] -async fn 
test_jobs_endpoints(client_panel: &HttpClient) { - let job_alias = "henlo"; - let mut job = RawJob::builder() - .with_shell("echo henlo") - .with_alias(job_alias) - .build() - .unwrap(); - - let job_id = job.job.id; - - client_panel.upload_jobs([&job]).await.unwrap(); - - let fetched_job = client_panel.get_brief_job(job_id).await.unwrap(); - assert_eq!(job, fetched_job); - - job.job.alias = Some("henlo2".to_string()); - client_panel.update_job(&job).await.unwrap(); - - let fetched_job = client_panel.get_brief_job(job_id).await.unwrap(); - assert_eq!(job, fetched_job); - - client_panel.del(job_id).await.unwrap(); - - let not_found_err = client_panel.get_brief_job(job_id).await.unwrap_err(); - assert!(not_found_err.to_string().contains("404 Not Found")) -} diff --git a/integration-tests/tests/integration_tests/behaviour.rs b/integration-tests/tests/integration_tests/behaviour.rs index 51ea208..f3d94ed 100644 --- a/integration-tests/tests/integration_tests/behaviour.rs +++ b/integration-tests/tests/integration_tests/behaviour.rs @@ -23,7 +23,7 @@ async fn setup_tasks() { let job_alias = "passwd_contents"; let job = RawJob::builder() .with_alias(job_alias) - .with_raw_payload(b"cat /etc/passwd".as_slice()) + .with_raw_payload("cat /etc/passwd") .with_shell("/bin/bash {}") .with_target_platforms("*linux*") .build() diff --git a/integration-tests/tests/integration_tests/endpoints.rs b/integration-tests/tests/integration_tests/endpoints.rs new file mode 100644 index 0000000..96bf960 --- /dev/null +++ b/integration-tests/tests/integration_tests/endpoints.rs @@ -0,0 +1,112 @@ +// get_personal_jobs(&self, url_param: Id) +// report(&self, payload: impl OneOrVec) +// dl(&self, file: String) +// get_job(&self, job: Id) +// get_jobs(&self) +// get_agents(&self, agent: Option) +// update_agent(&self, agent: Agent) +// update_job(&self, job: FatJob) +// update_result(&self, result: AssignedJob) +// upload_jobs(&self, payload: impl OneOrVec) +// del(&self, item: Id) +// set_jobs(&self, agent: Id, job_idents: impl OneOrVec) +// get_agent_jobs(&self, agent: Option) +// ping(&self) + +use crate::fixtures::connections::*; +use std::iter::repeat; +use u_lib::models::{BriefMode, RawJob, RawPayload, MAX_READABLE_PAYLOAD_SIZE}; + +#[rstest] +#[tokio::test] +async fn jobs_upload_update_get_del(client_panel: &HttpClient) { + let job_alias = "henlo"; + let mut job = RawJob::builder() + .with_shell("/bin/bash {}") + .with_raw_payload("echo henlo") + .with_alias(job_alias) + .build() + .unwrap(); + + let job_id = job.job.id; + + client_panel.upload_jobs([&job]).await.unwrap(); + + let fetched_job = client_panel.get_full_job(job_id).await.unwrap(); + assert_eq!(job, fetched_job); + + // update job's payload by edit existing does nothing, + // editing is only allowed from payload itself + *job.payload.as_mut().unwrap().data.as_mut().unwrap() = b"echo henlo2".to_vec(); + client_panel.update_job(&job).await.unwrap(); + + let fetched_job = client_panel.get_full_job(job_id).await.unwrap(); + assert_eq!( + fetched_job.payload.as_ref().unwrap().data.as_ref().unwrap(), + b"echo henlo" + ); + + client_panel.del(job_id).await.unwrap(); + + let not_found_err = client_panel.get_brief_job(job_id).await.unwrap_err(); + assert!(not_found_err.to_string().contains("404 Not Found")) +} + +#[rstest] +#[tokio::test] +async fn payloads_upload_update_get_del(client_panel: &HttpClient) { + let name = "test1".to_string(); + let data = b"qweasdzxc".to_vec(); + let payload = RawPayload { + name: name.clone(), + data: data.clone(), + }; + + 
client_panel.upload_payloads([&payload]).await.unwrap(); + + let mut fetched_payload = client_panel + .get_payload(&name, BriefMode::No) + .await + .unwrap(); + let fetched_payload_auto = client_panel + .get_payload(&name, BriefMode::Auto) + .await + .unwrap(); + + assert_eq!(fetched_payload, fetched_payload_auto); + assert_eq!(fetched_payload.data.unwrap(), data); + + let new_size = MAX_READABLE_PAYLOAD_SIZE + 1; + let big_data = repeat(1u8).take(new_size as usize).collect::>(); + + fetched_payload.data = Some(big_data.clone()); + client_panel.update_payload(&fetched_payload).await.unwrap(); + + let fetched_big_payload = client_panel + .get_payload(&name, BriefMode::Yes) + .await + .unwrap(); + let fetched_big_payload_auto = client_panel + .get_payload(&name, BriefMode::Auto) + .await + .unwrap(); + + assert_eq!(fetched_big_payload, fetched_big_payload_auto); + assert_eq!(fetched_big_payload.size, new_size); + assert!(fetched_big_payload.data.is_none()); + + let fetched_big_payload_full = client_panel + .get_payload(&name, BriefMode::No) + .await + .unwrap(); + + assert_eq!(fetched_big_payload_full.data.unwrap(), big_data); + + client_panel.del(fetched_big_payload_full.id).await.unwrap(); + + let not_found_err = client_panel + .get_payload(&name, BriefMode::Yes) + .await + .unwrap_err(); + assert!(not_found_err.to_string().contains("404 Not Found")) +} diff --git a/integration-tests/tests/integration_tests/mod.rs b/integration-tests/tests/integration_tests/mod.rs index 117235a..bbc23d4 100644 --- a/integration-tests/tests/integration_tests/mod.rs +++ b/integration-tests/tests/integration_tests/mod.rs @@ -1,3 +1,3 @@ -mod api; mod behaviour; mod connection; +mod endpoints; diff --git a/lib/u_lib/src/api.rs b/lib/u_lib/src/api.rs index 9e5c70b..deca48a 100644 --- a/lib/u_lib/src/api.rs +++ b/lib/u_lib/src/api.rs @@ -31,7 +31,9 @@ pub mod retypes { pub type UpdateAgent = (); pub type UpdateJob = (); pub type UpdateResult = (); + pub type UpdatePayload = (); pub type UploadJobs = (); + pub type UploadPayloads = (); pub type Del = (); pub type SetJobs = Vec; pub type GetAgentJobs = Vec; @@ -199,12 +201,24 @@ impl HttpClient { self.req_with_payload("update_result", result).await } + pub async fn update_payload(&self, payload: &Payload) -> Result { + self.req_with_payload("update_payload", payload).await + } + /// create and upload job pub async fn upload_jobs( &self, - payload: impl IntoIterator, + jobs: impl IntoIterator, ) -> Result { - self.req_with_payload("upload_jobs", &payload.into_iter().collect::>()) + self.req_with_payload("upload_jobs", &jobs.into_iter().collect::>()) + .await + } + + pub async fn upload_payloads( + &self, + payload: impl IntoIterator, + ) -> Result { + self.req_with_payload("upload_payloads", &payload.into_iter().collect::>()) .await } @@ -239,7 +253,12 @@ impl HttpClient { self.req("get_payloads").await } - pub async fn get_payload(&self, payload: Id, brief: BriefMode) -> Result { + pub async fn get_payload( + &self, + payload: impl AsRef, + brief: BriefMode, + ) -> Result { + let payload = payload.as_ref(); self.req(format!("get_payload/{payload}?brief={brief}")) .await } diff --git a/lib/u_lib/src/logging.rs b/lib/u_lib/src/logging.rs index 018df38..e552f18 100644 --- a/lib/u_lib/src/logging.rs +++ b/lib/u_lib/src/logging.rs @@ -10,10 +10,12 @@ pub fn init_logger(logfile: Option<&str>) { env::set_var("RUST_LOG", "info") } + let layer = fmt::layer().with_line_number(true); + let output_layer = if cfg!(test) { - fmt::layer().with_test_writer().boxed() + 
layer.with_test_writer().boxed() } else { - fmt::layer().with_writer(stderr).boxed() + layer.with_writer(stderr).boxed() }; let reg = registry() diff --git a/lib/u_lib/src/messaging.rs b/lib/u_lib/src/messaging.rs index 721bbb8..b7a0318 100644 --- a/lib/u_lib/src/messaging.rs +++ b/lib/u_lib/src/messaging.rs @@ -13,6 +13,7 @@ impl AsMsg for AssignedJobById {} impl AsMsg for JobModel {} impl AsMsg for Reportable {} impl AsMsg for Payload {} +impl AsMsg for RawPayload {} impl AsMsg for Job {} impl AsMsg for Id {} impl AsMsg for String {} diff --git a/lib/u_lib/src/models/jobs/meta.rs b/lib/u_lib/src/models/jobs/meta.rs index 1d79122..c419cf5 100644 --- a/lib/u_lib/src/models/jobs/meta.rs +++ b/lib/u_lib/src/models/jobs/meta.rs @@ -27,7 +27,7 @@ pub struct JobModel { pub exec_type: JobType, /// target triple pub target_platforms: String, - pub payload: Option, + pub payload_id: Option, /// cron-like string pub schedule: Option, } @@ -144,8 +144,8 @@ impl<'p> JobBuilder<'p> { self } - pub fn with_raw_payload(mut self, raw_payload: impl Into>) -> Self { - self.inner.raw_payload = Some(raw_payload.into()); + pub fn with_raw_payload(mut self, raw_payload: impl AsPayload<'p>) -> Self { + self.inner.raw_payload = Some(raw_payload.as_payload()); self.inner.payload_path = None; self } @@ -185,7 +185,7 @@ impl<'p> JobBuilder<'p> { if payload_from_path.is_none() { job.raw_payload .as_ref() - .map(|data| Payload::from_data(data)) + .map(|data| Payload::from_data(data, None)) .transpose()? } else { payload_from_path @@ -199,7 +199,7 @@ impl<'p> JobBuilder<'p> { id: job.id, exec_type: job.exec_type, target_platforms: job.target_platforms, - payload: payload.as_ref().map(|p| p.id), + payload_id: payload.as_ref().map(|p| p.id), schedule: job.schedule, }, payload, @@ -271,3 +271,25 @@ impl<'p> JobBuilder<'p> { } } } + +pub trait AsPayload<'p> { + fn as_payload(&self) -> Cow<'p, [u8]>; +} + +impl<'p> AsPayload<'p> for &'p str { + fn as_payload(&self) -> Cow<'p, [u8]> { + Cow::Borrowed(self.as_bytes()) + } +} + +impl<'p, const N: usize> AsPayload<'p> for &'p [u8; N] { + fn as_payload(&self) -> Cow<'p, [u8]> { + Cow::Borrowed(*self) + } +} + +impl<'p> AsPayload<'p> for &'p [u8] { + fn as_payload(&self) -> Cow<'p, [u8]> { + Cow::Borrowed(self) + } +} diff --git a/lib/u_lib/src/models/payload.rs b/lib/u_lib/src/models/payload.rs index 2b162fe..211a5aa 100644 --- a/lib/u_lib/src/models/payload.rs +++ b/lib/u_lib/src/models/payload.rs @@ -3,17 +3,30 @@ use anyhow::{Context, Result}; use serde::{Deserialize, Serialize}; use std::{fs::File, path::Path, process::Command}; -const MAX_READABLE_PAYLOAD_SIZE: i64 = 1024 * 32; +pub const MAX_READABLE_PAYLOAD_SIZE: i64 = 1024 * 32; #[cfg(feature = "server")] use crate::models::schema::*; #[cfg(feature = "server")] use diesel::Identifiable; +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct RawPayload { + pub name: String, + pub data: Vec, +} + +impl RawPayload { + pub fn into_payload(self) -> Result { + Payload::from_data(self.data, Some(&self.name)) + } +} + #[cfg_attr( feature = "server", - derive(Insertable, Queryable, Identifiable), - diesel(table_name = payloads) + derive(Insertable, Queryable, Identifiable, AsChangeset), + diesel(table_name = payloads), + diesel(treat_none_as_null = true) )] #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] pub struct Payload { @@ -21,7 +34,7 @@ pub struct Payload { pub mime_type: String, pub name: String, pub size: i64, - data: Option>, // when None, payload data is stored in ufs + pub data: Option>, // when 
None, payload data is stored in ufs } impl Payload { @@ -58,21 +71,27 @@ impl Payload { Ok(()) } - pub fn from_data(data: impl AsRef<[u8]>) -> Result { - let name = ufs::create_anonymous(data)?; - let meta = ufs::read_meta(&name)?; + pub fn from_data(data: impl AsRef<[u8]>, name: Option<&str>) -> Result { + let name = match name { + Some(name) => { + ufs::put(name, data).context("fr_put")?; + name.to_string() + } + None => ufs::create_anonymous(data).context("fr_anon")?, + }; + let meta = ufs::read_meta(&name).context("fr_me")?; + + debug!("from_data {meta:?}"); let mut payload = Payload { id: Id::new_v4(), - mime_type: get_mime_type(&meta.path)?, - name: name.clone(), + mime_type: get_mime_type(&meta.path).context(format!("fr_mi {:?}", &meta.path))?, + name, size: meta.size as i64, data: None, }; - if payload.is_human_readable() { - payload.join_payload()?; - } + payload.maybe_join_payload().context("fr_ma")?; Ok(payload) } @@ -149,11 +168,15 @@ impl Payload { } fn get_mime_type(path: impl AsRef) -> Result { + let path = path.as_ref(); + + debug!("mime of {path:?}"); + Ok(bytes_to_string( &Command::new("file") .arg("-b") .arg("--mime-type") - .arg(path.as_ref()) + .arg(path) .output() .map_err(|e| UError::IOError(e.to_string()))? .stdout, diff --git a/lib/u_lib/src/models/schema.rs b/lib/u_lib/src/models/schema.rs index 9908f4d..53a01fc 100644 --- a/lib/u_lib/src/models/schema.rs +++ b/lib/u_lib/src/models/schema.rs @@ -46,7 +46,7 @@ diesel::table! { id -> Uuid, exec_type -> Jobtype, target_platforms -> Text, - payload -> Nullable, + payload_id -> Nullable, schedule -> Nullable, } } @@ -82,7 +82,7 @@ diesel::table! { } } -diesel::joinable!(jobs -> payloads (payload)); +diesel::joinable!(jobs -> payloads (payload_id)); diesel::joinable!(results -> agents (agent_id)); diesel::joinable!(results -> jobs (job_id)); diff --git a/lib/u_lib/src/ufs/mod.rs b/lib/u_lib/src/ufs/mod.rs index c6023c5..96f0bbd 100644 --- a/lib/u_lib/src/ufs/mod.rs +++ b/lib/u_lib/src/ufs/mod.rs @@ -15,7 +15,7 @@ pub use error::Error; const OBFUSCATE: bool = cfg!(feature = "agent"); -#[derive(Clone, Deserialize, Serialize)] +#[derive(Debug, Clone, Deserialize, Serialize)] pub struct FileMeta { extension: Option, external: bool, // if file is present before adding to index @@ -86,9 +86,7 @@ pub fn put(name: impl AsRef, data: impl AsRef<[u8]>) -> Result<()> { let name = name.as_ref(); let data_hash = hash_data(&data); - if exists_in_index(&name) { - return Err(Error::already_exists(&name)).context("put_exists"); - } + debug!("put: {name}"); let path = match index::get_by_hash(&data_hash) { Some((_, meta)) => meta.path, @@ -105,6 +103,8 @@ pub fn put(name: impl AsRef, data: impl AsRef<[u8]>) -> Result<()> { path }; + debug!("put: path: {path:?}"); + fs::write(&path, &data) .map_err(|e| Error::new(e, name)) .context("put_write")?; diff --git a/migrations/2020-10-24-111622_create_all/up.sql b/migrations/2020-10-24-111622_create_all/up.sql index 49fd6cf..819c7b6 100644 --- a/migrations/2020-10-24-111622_create_all/up.sql +++ b/migrations/2020-10-24-111622_create_all/up.sql @@ -38,10 +38,10 @@ CREATE TABLE IF NOT EXISTS jobs ( id UUID NOT NULL, exec_type JobType NOT NULL DEFAULT 'shell', target_platforms TEXT NOT NULL, - payload UUID, + payload_id UUID, schedule TEXT, - FOREIGN KEY(payload) REFERENCES payloads(id) ON DELETE SET NULL, + FOREIGN KEY(payload_id) REFERENCES payloads(id) ON DELETE SET NULL, PRIMARY KEY(id) );
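
Below is a minimal sketch of driving the payload CRUD added in this patch through u_lib's HttpClient, mirroring the integration test in endpoints.rs. The HttpClient construction, the shape of the UResult alias, and the payload_roundtrip helper are assumptions for illustration; only the method names and model fields come from the diff above.

use u_lib::{
    api::HttpClient,
    models::{BriefMode, RawPayload},
    UResult,
};

// Assumed helper for illustration only: HttpClient construction is not shown in
// this diff, and UResult<T> is assumed to be u_lib's Result alias over UError.
async fn payload_roundtrip(client: &HttpClient) -> UResult<()> {
    // upload_payloads: the server turns each RawPayload into a well-formed Payload
    // (mime type and size are derived server-side via Payload::from_data)
    let new_payload = RawPayload {
        name: "hello".to_string(),
        data: b"echo hello".to_vec(),
    };
    client.upload_payloads([&new_payload]).await?;

    // get_payload accepts either a name or an id as of this patch;
    // BriefMode::No returns the payload with its data included
    let mut payload = client.get_payload("hello", BriefMode::No).await?;

    // update_payload: send the full data; the handler rebuilds size and mime type
    payload.data = Some(b"echo hello, world".to_vec());
    client.update_payload(&payload).await?;

    // deletion goes through the generic del endpoint by id
    client.del(payload.id).await?;

    Ok(())
}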