From f5d2190fc4e85920e722858b12cfcc6f35fa6dd2 Mon Sep 17 00:00:00 2001
From: plazmoid
Date: Thu, 20 Apr 2023 00:13:20 +0300
Subject: [PATCH] fix last integration test

---
 Cargo.lock                            | 31 +++++--------
 bin/u_server/src/db.rs                | 20 +++++++--
 bin/u_server/src/handlers.rs          | 65 +++++++++++++++++++--------
 bin/u_server/src/u_server.rs          |  2 +-
 integration/tests/integration/api.rs  |  4 +-
 lib/u_lib/Cargo.toml                  |  2 +-
 lib/u_lib/src/api.rs                  | 18 ++++++++
 lib/u_lib/src/error/mod.rs            |  9 +---
 lib/u_lib/src/jobs.rs                 |  8 +++-
 lib/u_lib/src/models/jobs/assigned.rs | 17 +++++++
 lib/u_lib/src/ufs/error.rs            |  2 +
 lib/u_lib/src/ufs/mod.rs              | 31 ++++++++++++-
 12 files changed, 153 insertions(+), 56 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 1cd2c46..08a23ef 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -345,12 +345,6 @@ dependencies = [
  "generic-array",
 ]
 
-[[package]]
-name = "boxfnonce"
-version = "0.1.1"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "5988cb1d626264ac94100be357308f29ff7cbdd3b36bda27f450a4ee3f713426"
-
 [[package]]
 name = "brotli"
 version = "3.3.4"
@@ -610,11 +604,10 @@ dependencies = [
 
 [[package]]
 name = "daemonize"
-version = "0.4.1"
+version = "0.5.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "70c24513e34f53b640819f0ac9f705b673fcf4006d7aab8778bee72ebfc89815"
+checksum = "ab8bfdaacb3c887a54d41bdf48d3af8873b3f5566469f8ba21b92057509f116e"
 dependencies = [
- "boxfnonce",
  "libc",
 ]
 
@@ -675,9 +668,9 @@ dependencies = [
 
 [[package]]
 name = "diesel"
-version = "2.0.3"
+version = "2.0.4"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "4391a22b19c916e50bec4d6140f29bdda3e3bb187223fe6e3ea0b6e4d1021c04"
+checksum = "72eb77396836a4505da85bae0712fa324b74acfe1876d7c2f7e694ef3d0ee373"
 dependencies = [
  "bitflags",
  "byteorder",
@@ -980,9 +973,9 @@ dependencies = [
 
 [[package]]
 name = "h2"
-version = "0.3.17"
+version = "0.3.18"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "66b91535aa35fea1523ad1b86cb6b53c28e0ae566ba4a460f4457e936cad7c6f"
+checksum = "17f8a914c2987b688368b5138aa05321db91f4090cf26118185672ad588bce21"
 dependencies = [
  "bytes",
  "fnv",
@@ -1340,9 +1333,9 @@ dependencies = [
 
 [[package]]
 name = "linux-raw-sys"
-version = "0.3.1"
+version = "0.3.3"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "d59d8c75012853d2e872fb56bc8a2e53718e2cafe1a4c823143141c6d90c322f"
+checksum = "9b085a4f2cde5781fc4b1717f2e86c62f5cda49de7ba99a7c2eae02b61c9064c"
 
 [[package]]
 name = "local-channel"
@@ -1710,9 +1703,9 @@ checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de"
 
 [[package]]
 name = "pq-sys"
-version = "0.4.7"
+version = "0.4.8"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "3b845d6d8ec554f972a2c5298aad68953fd64e7441e846075450b44656a016d1"
+checksum = "31c0052426df997c0cbd30789eb44ca097e3541717a7b8fa36b1c464ee7edebd"
 dependencies = [
  "vcpkg",
 ]
@@ -2010,9 +2003,9 @@ dependencies = [
 
 [[package]]
 name = "rustix"
-version = "0.37.11"
+version = "0.37.12"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "85597d61f83914ddeba6a47b3b8ffe7365107221c2e557ed94426489fefb5f77"
+checksum = "722529a737f5a942fdbac3a46cee213053196737c5eaa3386d52e85b786f2659"
 dependencies = [
  "bitflags",
  "errno 0.3.1",
diff --git a/bin/u_server/src/db.rs b/bin/u_server/src/db.rs
index 3de24e6..053bf26 100644
--- a/bin/u_server/src/db.rs
+++ b/bin/u_server/src/db.rs
@@ -1,10 +1,12 @@
 use crate::error::Error;
 use diesel::{pg::PgConnection, prelude::*, result::Error as DslError, Connection};
 use std::mem::drop;
-use u_lib::db::PgAsyncPool;
-use u_lib::models::{schema, Agent, AssignedJob, JobModel, JobState, PayloadMeta, ThinJob};
-use u_lib::platform::Platform;
-use u_lib::types::Id;
+use u_lib::{
+    db::PgAsyncPool,
+    models::{schema, Agent, AssignedJob, JobModel, JobState, PayloadMeta, ThinJob},
+    platform::Platform,
+    types::Id,
+};
 
 type Result<T> = std::result::Result<T, Error>;
 
@@ -249,6 +251,16 @@ impl UDB<'_> {
             .map_err(with_err_ctx("Can't delete agents"))
     }
 
+    pub fn del_payloads(&mut self, ids: &[Id]) -> Result<()> {
+        use schema::payloads;
+
+        diesel::delete(payloads::table)
+            .filter(payloads::id.eq_any(ids))
+            .execute(self.conn)
+            .map(drop)
+            .map_err(with_err_ctx("Can't delete payloads"))
+    }
+
     pub fn upsert_agent(&mut self, agent: &Agent) -> Result<()> {
         use schema::agents;
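Note: `del_payloads` keeps exactly the `fn(&mut UDB, &[Id]) -> Result<()>` shape of `del_agents`, `del_jobs` and `del_results`, so that `Endpoints::del` in the next file can fold an array of all four helpers over a single id slice. A self-contained sketch of that fold with invented toy types (`Db`, `Id = u32`), not the project's code:

    type Id = u32;

    struct Db {
        rows: Vec<Id>,
    }

    impl Db {
        fn del_jobs(&mut self, ids: &[Id]) -> Result<(), String> {
            self.rows.retain(|r| !ids.contains(r));
            Ok(())
        }

        fn del_payloads(&mut self, ids: &[Id]) -> Result<(), String> {
            self.rows.retain(|r| !ids.contains(r));
            Ok(())
        }
    }

    fn main() -> Result<(), String> {
        let mut db = Db { rows: vec![1, 2, 3] };

        // fn items with identical signatures coerce to one fn-pointer type,
        // so they can sit in a single array and be applied uniformly
        [Db::del_jobs, Db::del_payloads]
            .iter()
            .map(|f| f(&mut db, &[2]))
            .collect::<Result<(), String>>()?;

        assert_eq!(db.rows, vec![1, 3]);
        Ok(())
    }

Collecting `Result<(), E>` items into a single `Result<(), E>` stops at the first failing helper, which is what lets `del` run the whole cleanup inside one `repo.transaction` call and presumably roll back on any error.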
diff --git a/bin/u_server/src/handlers.rs b/bin/u_server/src/handlers.rs
index bc82823..0fd739f 100644
--- a/bin/u_server/src/handlers.rs
+++ b/bin/u_server/src/handlers.rs
@@ -3,9 +3,9 @@ use std::sync::Arc;
 use crate::db::{PgRepo, UDB};
 use crate::error::Error;
 use serde::Deserialize;
-use u_lib::jobs::join_payload;
 use u_lib::{
-    jobs::split_payload,
+    api::retypes,
+    jobs::{join_payload, split_payload},
     messaging::{AsMsg, Reportable},
     misc::OneOrVec,
     models::*,
@@ -24,7 +24,7 @@ pub struct GetJobQuery {
 pub struct Endpoints;
 
 impl Endpoints {
-    pub async fn get_agents(repo: Arc<PgRepo>, id: Option<Id>) -> EndpResult<Vec<Agent>> {
+    pub async fn get_agents(repo: Arc<PgRepo>, id: Option<Id>) -> EndpResult<retypes::GetAgents> {
         repo.interact(move |mut db| {
             Ok(match id {
                 Some(id) => {
@@ -41,7 +41,11 @@ impl Endpoints {
         .map_err(From::from)
     }
 
-    pub async fn get_job(repo: Arc<PgRepo>, id: Id, params: GetJobQuery) -> EndpResult<FatJob> {
+    pub async fn get_job(
+        repo: Arc<PgRepo>,
+        id: Id,
+        params: GetJobQuery,
+    ) -> EndpResult<retypes::GetJob> {
         let Some(job) = repo.interact(move |mut db| db.get_job(id)).await? else {
             return Err(not_found())
         };
@@ -60,19 +64,25 @@ impl Endpoints {
         Ok(join_payload(job).map_err(Error::from)?)
     }
 
-    pub async fn get_jobs(repo: Arc<PgRepo>) -> EndpResult<Vec<ThinJob>> {
+    pub async fn get_jobs(repo: Arc<PgRepo>) -> EndpResult<retypes::GetJobs> {
         repo.interact(move |mut db| db.get_jobs())
             .await
             .map_err(From::from)
     }
 
-    pub async fn get_agent_jobs(repo: Arc<PgRepo>, id: Option<Id>) -> EndpResult<Vec<AssignedJob>> {
+    pub async fn get_agent_jobs(
+        repo: Arc<PgRepo>,
+        id: Option<Id>,
+    ) -> EndpResult<retypes::GetAgentJobs> {
         repo.interact(move |mut db| db.get_exact_jobs(id, false))
             .await
             .map_err(From::from)
     }
 
-    pub async fn get_personal_jobs(repo: Arc<PgRepo>, id: Id) -> EndpResult<Vec<AssignedJob>> {
+    pub async fn get_personal_jobs(
+        repo: Arc<PgRepo>,
+        id: Id,
+    ) -> EndpResult<retypes::GetPersonalJobs> {
         repo.transaction(move |mut db| {
             let agent = db.get_agent(id)?;
             match agent {
@@ -100,13 +110,19 @@ impl Endpoints {
                 db.update_job_status(job.id, JobState::Running)?;
             }
 
-            Ok(assigned_jobs)
+            Ok(assigned_jobs
+                .into_iter()
+                .map(|j| AssignedJobById::from(&j))
+                .collect())
         })
         .await
        .map_err(From::from)
     }
 
-    pub async fn upload_jobs(repo: Arc<PgRepo>, msg: Vec<FatJob>) -> EndpResult<()> {
+    pub async fn upload_jobs(
+        repo: Arc<PgRepo>,
+        msg: Vec<FatJob>,
+    ) -> EndpResult<retypes::UploadJobs> {
         let jobs = msg
             .into_iter()
             .map(|meta| Ok(split_payload(meta)?))
             .collect::<Result<Vec<ThinJob>, Error>>()?;
@@ -119,10 +135,15 @@ impl Endpoints {
     pub async fn del(repo: Arc<PgRepo>, id: Id) -> EndpResult<()> {
         repo.transaction(move |mut db| {
-            [UDB::del_agents, UDB::del_jobs, UDB::del_results]
-                .iter()
-                .map(|f| f(&mut db, &[id]))
-                .collect::<Result<(), Error>>()
+            [
+                UDB::del_agents,
+                UDB::del_jobs,
+                UDB::del_results,
+                UDB::del_payloads,
+            ]
+            .iter()
+            .map(|f| f(&mut db, &[id]))
+            .collect::<Result<(), Error>>()
         })
         .await
         .map_err(From::from)
     }
@@ -132,7 +153,7 @@ impl Endpoints {
         repo: Arc<PgRepo>,
         agent_id: Id,
         job_idents: Vec<String>,
-    ) -> EndpResult<Vec<Id>> {
+    ) -> EndpResult<retypes::SetJobs> {
         repo.transaction(move |mut db| {
             let assigned_job_idents = job_idents
                 .into_iter()
@@ -162,7 +183,7 @@ impl Endpoints {
         repo: Arc<PgRepo>,
         msg: Data,
         agent_id: Id,
-    ) -> EndpResult<()> {
+    ) -> EndpResult<retypes::Report> {
         repo.transaction(move |mut db| {
             for entry in msg.into_vec() {
                 match entry {
@@ -214,17 +235,23 @@ impl Endpoints {
         .map_err(From::from)
     }
 
-    pub async fn update_agent(repo: Arc<PgRepo>, agent: Agent) -> EndpResult<()> {
+    pub async fn update_agent(repo: Arc<PgRepo>, agent: Agent) -> EndpResult<retypes::UpdateAgent> {
         repo.interact(move |mut db| db.upsert_agent(&agent)).await?;
         Ok(())
     }
 
-    pub async fn update_job(repo: Arc<PgRepo>, job: JobModel) -> EndpResult<()> {
-        repo.interact(move |mut db| db.update_job(&job)).await?;
+    pub async fn update_job(repo: Arc<PgRepo>, job: FatJob) -> EndpResult<retypes::UpdateJob> {
+        let thin_job = split_payload(job).map_err(Error::from)?;
+
+        repo.interact(move |mut db| db.update_job(&thin_job.job))
+            .await?;
         Ok(())
     }
 
-    pub async fn update_assigned_job(repo: Arc<PgRepo>, assigned: AssignedJob) -> EndpResult<()> {
+    pub async fn update_assigned_job(
+        repo: Arc<PgRepo>,
+        assigned: AssignedJob,
+    ) -> EndpResult<retypes::UpdateResult> {
         repo.interact(move |mut db| db.update_result(&assigned))
             .await?;
         Ok(())

diff --git a/bin/u_server/src/u_server.rs b/bin/u_server/src/u_server.rs
index ee3f440..3ad73a1 100644
--- a/bin/u_server/src/u_server.rs
+++ b/bin/u_server/src/u_server.rs
@@ -108,7 +108,7 @@ pub fn init_endpoints(
     let update_job = path("update_job")
         .and(with_db.clone())
-        .and(body::json::<JobModel>())
+        .and(body::json::<FatJob>())
         .and_then(Endpoints::update_job)
         .map(ok);
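Note: the signature churn above is the point of the new `retypes` module (added in lib/u_lib/src/api.rs further down): each endpoint's response type is named by exactly one alias, shared by the server handlers and the `HttpClient` callers. A minimal, self-contained toy model of the idea, with invented types, std-only:

    // Toy model of the retypes idea; all names here are invented.
    mod retypes {
        // one alias per endpoint: change it here and every user follows
        pub type GetJob = super::FatJob;
    }

    #[derive(Debug, Clone, PartialEq)]
    pub struct FatJob {
        pub id: u32,
        pub payload: Option<Vec<u8>>,
    }

    // "server side": the handler names its response type via the alias...
    fn get_job_handler(id: u32) -> retypes::GetJob {
        FatJob { id, payload: None }
    }

    // ..."client side": the caller names the very same alias, so the two
    // ends cannot silently disagree about the wire type.
    fn client_get_job(id: u32) -> retypes::GetJob {
        get_job_handler(id)
    }

    fn main() {
        assert_eq!(client_get_job(7), FatJob { id: 7, payload: None });
    }

The warp filter change in u_server.rs is the same discipline applied to the request side: `body::json::<FatJob>()` has to name the type that `Endpoints::update_job` now takes.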
diff --git a/integration/tests/integration/api.rs b/integration/tests/integration/api.rs
index f438fcf..17d42cd 100644
--- a/integration/tests/integration/api.rs
+++ b/integration/tests/integration/api.rs
@@ -42,5 +42,7 @@ async fn test_jobs_endpoints(client_panel: &HttpClient) {
     assert_eq!(fat_job, fetched_job);
 
     client_panel.del(job_id).await.unwrap();
-    client_panel.get_job(job_id, false).await.unwrap(); // should fail with 404
+
+    let not_found_err = client_panel.get_job(job_id, false).await.unwrap_err();
+    assert!(not_found_err.to_string().contains("404 Not Found"))
 }

diff --git a/lib/u_lib/Cargo.toml b/lib/u_lib/Cargo.toml
index f5fc62f..8cf77d7 100644
--- a/lib/u_lib/Cargo.toml
+++ b/lib/u_lib/Cargo.toml
@@ -35,7 +35,7 @@ bincode = "1.3.3"
 sha3 = "0.10.7"
 
 [target.'cfg(unix)'.dependencies]
-daemonize = "0.4.1"
+daemonize = "0.5"
 nix = "0.17"
 
 [features]

diff --git a/lib/u_lib/src/api.rs b/lib/u_lib/src/api.rs
index 2fbadf7..d656832 100644
--- a/lib/u_lib/src/api.rs
+++ b/lib/u_lib/src/api.rs
@@ -20,6 +20,24 @@ use crate::{
 const AGENT_IDENTITY: &[u8] = include_bytes!("../../../certs/alice.p12");
 const ROOT_CA_CERT: &[u8] = include_bytes!("../../../certs/ca.crt");
 
+pub mod retypes {
+    use super::*;
+
+    pub type GetPersonalJobs = Vec<AssignedJobById>;
+    pub type Report = ();
+    pub type GetJob = FatJob;
+    pub type GetJobs = Vec<ThinJob>;
+    pub type GetAgents = Vec<Agent>;
+    pub type UpdateAgent = ();
+    pub type UpdateJob = ();
+    pub type UpdateResult = ();
+    pub type UploadJobs = ();
+    pub type Del = ();
+    pub type SetJobs = Vec<Id>;
+    pub type GetAgentJobs = Vec<AssignedJob>;
+    pub type Ping = ();
+}
+
 #[derive(Clone, Debug)]
 pub struct HttpClient {
     base_url: Url,

diff --git a/lib/u_lib/src/error/mod.rs b/lib/u_lib/src/error/mod.rs
index 82a5700..b9678a2 100644
--- a/lib/u_lib/src/error/mod.rs
+++ b/lib/u_lib/src/error/mod.rs
@@ -58,17 +58,12 @@ impl From<ReqError> for UError {
         )
     }
 }
-/*
-impl From<serde_json::Error> for UError {
-    fn from(e: serde_json::Error) -> Self {
-        UError::DeserializeError(e.to_string())
-    }
-}
-*/
+
 impl From<anyhow::Error> for UError {
     fn from(e: anyhow::Error) -> Self {
         let ctx = e
             .chain()
+            .rev()
             .skip(1)
             .map(|cause| format!("ctx: {}", cause))
             .collect::<Vec<String>>()

diff --git a/lib/u_lib/src/jobs.rs b/lib/u_lib/src/jobs.rs
index 67e3494..94c2c0f 100644
--- a/lib/u_lib/src/jobs.rs
+++ b/lib/u_lib/src/jobs.rs
@@ -189,8 +189,12 @@ pub fn split_payload(job: FatJob) -> Result<ThinJob> {
     } = job;
 
     if let Some(meta) = &payload_meta {
-        if !ufs::in_index(&meta.name) {
-            ufs::put(&meta.name, payload_data.unwrap())?;
+        if let Some(data) = payload_data {
+            if ufs::in_index(&meta.name) {
+                ufs::edit(&meta.name, data)?;
+            } else {
+                ufs::put(&meta.name, data)?;
+            }
         }
     }

diff --git a/lib/u_lib/src/models/jobs/assigned.rs b/lib/u_lib/src/models/jobs/assigned.rs
index 041451c..946fc4b 100644
--- a/lib/u_lib/src/models/jobs/assigned.rs
+++ b/lib/u_lib/src/models/jobs/assigned.rs
@@ -79,6 +79,23 @@ impl From<(&JobModel, AssignedJobById)> for AssignedJob {
     }
 }
 
+impl From<&AssignedJob> for AssignedJobById {
+    fn from(j: &AssignedJob) -> Self {
+        let &AssignedJob {
+            agent_id,
+            id,
+            job_id,
+            ..
+        } = j;
+
+        AssignedJobById {
+            agent_id,
+            id,
+            job_id,
+        }
+    }
+}
+
 impl Default for AssignedJobById {
     fn default() -> Self {
         Self {

diff --git a/lib/u_lib/src/ufs/error.rs b/lib/u_lib/src/ufs/error.rs
index 90baac9..3a5cbd8 100644
--- a/lib/u_lib/src/ufs/error.rs
+++ b/lib/u_lib/src/ufs/error.rs
@@ -40,6 +40,8 @@ impl From<anyhow::Error> for Error {
     fn from(e: anyhow::Error) -> Self {
         let err = e
             .chain()
+            .rev()
+            .skip(1)
             .map(|cause| format!("ctx: {}", cause))
             .collect::<Vec<String>>()
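Note: both `From<anyhow::Error>` impls (lib/u_lib/src/error/mod.rs above and ufs/error.rs here) gained `.rev().skip(1)`. `chain()` yields the outermost error first, so after `rev()` the contexts read root-first, and `skip(1)` now drops the root cause itself rather than the outermost wrapper, presumably because the receiving error type already carries the root cause. A runnable illustration (needs the `anyhow` crate; the message strings are invented):

    use anyhow::anyhow;

    fn main() {
        // invented error stack: a root cause wrapped in two context layers
        let err = anyhow!("root cause")
            .context("mid layer")
            .context("outer layer");

        let ctx = err
            .chain() // yields: "outer layer", "mid layer", "root cause"
            .rev() // innermost first: "root cause", "mid layer", "outer layer"
            .skip(1) // drop the root cause, keep only the added contexts
            .map(|cause| format!("ctx: {}", cause))
            .collect::<Vec<String>>()
            .join("\n");

        assert_eq!(ctx, "ctx: mid layer\nctx: outer layer");
        println!("{ctx}");
    }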
diff --git a/lib/u_lib/src/ufs/mod.rs b/lib/u_lib/src/ufs/mod.rs
index 4ab8c40..51a8ff6 100644
--- a/lib/u_lib/src/ufs/mod.rs
+++ b/lib/u_lib/src/ufs/mod.rs
@@ -44,6 +44,8 @@ impl FileMeta {
     }
 }
 
+/// Check if a file exists in the index.
+/// A file may be present in the fs but not in the index; in that case this fn returns false.
 pub fn in_index(name: impl AsRef<str>) -> bool {
     read_meta(name).is_ok()
 }
@@ -55,6 +57,7 @@ pub fn read_meta(name: impl AsRef<str>) -> Result<FileMeta> {
         .context("meta")
 }
 
+/// Read a file by its index name
 pub fn read(name: impl AsRef<str>) -> Result<Vec<u8>> {
     let meta = read_meta(&name).context("read_meta")?;
     fs::read(&meta.path)
         .map_err(|e| Error::new(e, &meta.path))
         .context("read")
 }
 
+/// Create a file with a generated name
 pub fn create_anonymous(data: impl AsRef<[u8]>) -> Result<String> {
     if let Some((name, _)) = index::get_by_hash(hash_data(&data)) {
         return Ok(name);
     }
@@ -74,7 +78,10 @@ pub fn create_anonymous(data: impl AsRef<[u8]>) -> Result<String> {
     Ok(name)
 }
 
-/// Create new file and add to index
+/// Create a new file and add it to the index.
+///
+/// If the index already contains a file with the same contents, the data is not duplicated;
+/// a symlink is added instead.
 pub fn put(name: impl AsRef<str>, data: impl AsRef<[u8]>) -> Result<()> {
     let name = name.as_ref();
     let data_hash = hash_data(&data);
@@ -112,6 +119,26 @@ pub fn put(name: impl AsRef<str>, data: impl AsRef<[u8]>) -> Result<()> {
     Ok(())
 }
 
+pub fn edit(name: impl AsRef<str>, data: impl AsRef<[u8]>) -> Result<()> {
+    let meta = read_meta(&name).context("edit_meta")?;
+    let data_hash = hash_data(&data);
+
+    if meta.hash == data_hash {
+        return Ok(());
+    }
+
+    fs::write(&meta.path, &data)
+        .map_err(|e| Error::new(e, &meta.path))
+        .context("edit_write")?;
+
+    let new_meta = FileMeta::new(meta.path, data_hash, meta.external).context("edit_nmeta")?;
+
+    index::remove(&name);
+    index::insert(name.as_ref(), new_meta);
+
+    Ok(())
+}
+
 pub fn remove(name: impl AsRef<str>) -> Result<()> {
     let name = name.as_ref();
     match index::remove(name) {
@@ -167,7 +194,7 @@ pub fn update_payload_data(name: impl AsRef<str>, data: impl AsRef<[u8]>) -> Res
         remove(&name).context("upd")?;
     }
 
-    put(name, data).context("upd")
+    put(name, data).context("upd_put")
 }
 
 /// Add an existing file to index
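Note: the new `ufs::edit` plus the clarified `put` doc pin down the index semantics that `split_payload` (jobs.rs above) now relies on: `put` dedups identical contents behind one backing file via a symlink, while `edit` rewrites the bytes and re-indexes the new hash, short-circuiting when the hash is unchanged. A self-contained toy model of that behavior (std-only, in-memory; the real store hashes with sha3 and keeps files on disk):

    use std::collections::HashMap;

    struct FileMeta {
        hash: u64,
        path: String,
    }

    #[derive(Default)]
    struct Ufs {
        index: HashMap<String, FileMeta>,
    }

    // stand-in for the real sha3-based hash_data
    fn hash_data(data: &[u8]) -> u64 {
        data.iter()
            .fold(0u64, |h, b| h.wrapping_mul(31).wrapping_add(*b as u64))
    }

    impl Ufs {
        fn in_index(&self, name: &str) -> bool {
            self.index.contains_key(name)
        }

        fn put(&mut self, name: &str, data: &[u8]) {
            let hash = hash_data(data);
            // dedup: if identical contents are already stored, reuse that blob's
            // path (the real `put` adds a symlink to the existing file instead)
            let path = self
                .index
                .values()
                .find(|m| m.hash == hash)
                .map(|m| m.path.clone())
                .unwrap_or_else(|| format!("/tmp/u_{hash:x}"));
            self.index.insert(name.to_string(), FileMeta { hash, path });
        }

        fn edit(&mut self, name: &str, data: &[u8]) {
            let hash = hash_data(data);
            // same index name, new bytes: re-hash and re-index; the real `edit`
            // also rewrites the backing file and returns early on an equal hash
            if let Some(meta) = self.index.get_mut(name) {
                meta.hash = hash;
            }
        }
    }

    fn main() {
        let mut ufs = Ufs::default();
        ufs.put("a", b"payload");
        ufs.put("b", b"payload"); // same contents: both names share one backing path
        assert_eq!(ufs.index["a"].path, ufs.index["b"].path);

        ufs.edit("a", b"new bytes");
        assert!(ufs.in_index("a"));
        assert_ne!(ufs.index["a"].hash, ufs.index["b"].hash);
    }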