fix last integrations test

pull/9/head
plazmoid 1 year ago
parent 1b0cdae404
commit f5d2190fc4
  1. 31
      Cargo.lock
  2. 20
      bin/u_server/src/db.rs
  3. 65
      bin/u_server/src/handlers.rs
  4. 2
      bin/u_server/src/u_server.rs
  5. 4
      integration/tests/integration/api.rs
  6. 2
      lib/u_lib/Cargo.toml
  7. 18
      lib/u_lib/src/api.rs
  8. 9
      lib/u_lib/src/error/mod.rs
  9. 8
      lib/u_lib/src/jobs.rs
  10. 17
      lib/u_lib/src/models/jobs/assigned.rs
  11. 2
      lib/u_lib/src/ufs/error.rs
  12. 31
      lib/u_lib/src/ufs/mod.rs

31
Cargo.lock generated

@ -345,12 +345,6 @@ dependencies = [
"generic-array", "generic-array",
] ]
[[package]]
name = "boxfnonce"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5988cb1d626264ac94100be357308f29ff7cbdd3b36bda27f450a4ee3f713426"
[[package]] [[package]]
name = "brotli" name = "brotli"
version = "3.3.4" version = "3.3.4"
@ -610,11 +604,10 @@ dependencies = [
[[package]] [[package]]
name = "daemonize" name = "daemonize"
version = "0.4.1" version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "70c24513e34f53b640819f0ac9f705b673fcf4006d7aab8778bee72ebfc89815" checksum = "ab8bfdaacb3c887a54d41bdf48d3af8873b3f5566469f8ba21b92057509f116e"
dependencies = [ dependencies = [
"boxfnonce",
"libc", "libc",
] ]
@ -675,9 +668,9 @@ dependencies = [
[[package]] [[package]]
name = "diesel" name = "diesel"
version = "2.0.3" version = "2.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4391a22b19c916e50bec4d6140f29bdda3e3bb187223fe6e3ea0b6e4d1021c04" checksum = "72eb77396836a4505da85bae0712fa324b74acfe1876d7c2f7e694ef3d0ee373"
dependencies = [ dependencies = [
"bitflags", "bitflags",
"byteorder", "byteorder",
@ -980,9 +973,9 @@ dependencies = [
[[package]] [[package]]
name = "h2" name = "h2"
version = "0.3.17" version = "0.3.18"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "66b91535aa35fea1523ad1b86cb6b53c28e0ae566ba4a460f4457e936cad7c6f" checksum = "17f8a914c2987b688368b5138aa05321db91f4090cf26118185672ad588bce21"
dependencies = [ dependencies = [
"bytes", "bytes",
"fnv", "fnv",
@ -1340,9 +1333,9 @@ dependencies = [
[[package]] [[package]]
name = "linux-raw-sys" name = "linux-raw-sys"
version = "0.3.1" version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d59d8c75012853d2e872fb56bc8a2e53718e2cafe1a4c823143141c6d90c322f" checksum = "9b085a4f2cde5781fc4b1717f2e86c62f5cda49de7ba99a7c2eae02b61c9064c"
[[package]] [[package]]
name = "local-channel" name = "local-channel"
@ -1710,9 +1703,9 @@ checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de"
[[package]] [[package]]
name = "pq-sys" name = "pq-sys"
version = "0.4.7" version = "0.4.8"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3b845d6d8ec554f972a2c5298aad68953fd64e7441e846075450b44656a016d1" checksum = "31c0052426df997c0cbd30789eb44ca097e3541717a7b8fa36b1c464ee7edebd"
dependencies = [ dependencies = [
"vcpkg", "vcpkg",
] ]
@ -2010,9 +2003,9 @@ dependencies = [
[[package]] [[package]]
name = "rustix" name = "rustix"
version = "0.37.11" version = "0.37.12"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "85597d61f83914ddeba6a47b3b8ffe7365107221c2e557ed94426489fefb5f77" checksum = "722529a737f5a942fdbac3a46cee213053196737c5eaa3386d52e85b786f2659"
dependencies = [ dependencies = [
"bitflags", "bitflags",
"errno 0.3.1", "errno 0.3.1",

@ -1,10 +1,12 @@
use crate::error::Error; use crate::error::Error;
use diesel::{pg::PgConnection, prelude::*, result::Error as DslError, Connection}; use diesel::{pg::PgConnection, prelude::*, result::Error as DslError, Connection};
use std::mem::drop; use std::mem::drop;
use u_lib::db::PgAsyncPool; use u_lib::{
use u_lib::models::{schema, Agent, AssignedJob, JobModel, JobState, PayloadMeta, ThinJob}; db::PgAsyncPool,
use u_lib::platform::Platform; models::{schema, Agent, AssignedJob, JobModel, JobState, PayloadMeta, ThinJob},
use u_lib::types::Id; platform::Platform,
types::Id,
};
type Result<T> = std::result::Result<T, Error>; type Result<T> = std::result::Result<T, Error>;
@ -249,6 +251,16 @@ impl UDB<'_> {
.map_err(with_err_ctx("Can't delete agents")) .map_err(with_err_ctx("Can't delete agents"))
} }
pub fn del_payloads(&mut self, ids: &[Id]) -> Result<()> {
use schema::payloads;
diesel::delete(payloads::table)
.filter(payloads::id.eq_any(ids))
.execute(self.conn)
.map(drop)
.map_err(with_err_ctx("Can't delete payloads"))
}
pub fn upsert_agent(&mut self, agent: &Agent) -> Result<()> { pub fn upsert_agent(&mut self, agent: &Agent) -> Result<()> {
use schema::agents; use schema::agents;

@ -3,9 +3,9 @@ use std::sync::Arc;
use crate::db::{PgRepo, UDB}; use crate::db::{PgRepo, UDB};
use crate::error::Error; use crate::error::Error;
use serde::Deserialize; use serde::Deserialize;
use u_lib::jobs::join_payload;
use u_lib::{ use u_lib::{
jobs::split_payload, api::retypes,
jobs::{join_payload, split_payload},
messaging::{AsMsg, Reportable}, messaging::{AsMsg, Reportable},
misc::OneOrVec, misc::OneOrVec,
models::*, models::*,
@ -24,7 +24,7 @@ pub struct GetJobQuery {
pub struct Endpoints; pub struct Endpoints;
impl Endpoints { impl Endpoints {
pub async fn get_agents(repo: Arc<PgRepo>, id: Option<Id>) -> EndpResult<Vec<Agent>> { pub async fn get_agents(repo: Arc<PgRepo>, id: Option<Id>) -> EndpResult<retypes::GetAgents> {
repo.interact(move |mut db| { repo.interact(move |mut db| {
Ok(match id { Ok(match id {
Some(id) => { Some(id) => {
@ -41,7 +41,11 @@ impl Endpoints {
.map_err(From::from) .map_err(From::from)
} }
pub async fn get_job(repo: Arc<PgRepo>, id: Id, params: GetJobQuery) -> EndpResult<FatJob> { pub async fn get_job(
repo: Arc<PgRepo>,
id: Id,
params: GetJobQuery,
) -> EndpResult<retypes::GetJob> {
let Some(job) = repo.interact(move |mut db| db.get_job(id)).await? else { let Some(job) = repo.interact(move |mut db| db.get_job(id)).await? else {
return Err(not_found()) return Err(not_found())
}; };
@ -60,19 +64,25 @@ impl Endpoints {
Ok(join_payload(job).map_err(Error::from)?) Ok(join_payload(job).map_err(Error::from)?)
} }
pub async fn get_jobs(repo: Arc<PgRepo>) -> EndpResult<Vec<JobModel>> { pub async fn get_jobs(repo: Arc<PgRepo>) -> EndpResult<retypes::GetJobs> {
repo.interact(move |mut db| db.get_jobs()) repo.interact(move |mut db| db.get_jobs())
.await .await
.map_err(From::from) .map_err(From::from)
} }
pub async fn get_agent_jobs(repo: Arc<PgRepo>, id: Option<Id>) -> EndpResult<Vec<AssignedJob>> { pub async fn get_agent_jobs(
repo: Arc<PgRepo>,
id: Option<Id>,
) -> EndpResult<retypes::GetAgentJobs> {
repo.interact(move |mut db| db.get_exact_jobs(id, false)) repo.interact(move |mut db| db.get_exact_jobs(id, false))
.await .await
.map_err(From::from) .map_err(From::from)
} }
pub async fn get_personal_jobs(repo: Arc<PgRepo>, id: Id) -> EndpResult<Vec<AssignedJob>> { pub async fn get_personal_jobs(
repo: Arc<PgRepo>,
id: Id,
) -> EndpResult<retypes::GetPersonalJobs> {
repo.transaction(move |mut db| { repo.transaction(move |mut db| {
let agent = db.get_agent(id)?; let agent = db.get_agent(id)?;
match agent { match agent {
@ -100,13 +110,19 @@ impl Endpoints {
db.update_job_status(job.id, JobState::Running)?; db.update_job_status(job.id, JobState::Running)?;
} }
Ok(assigned_jobs) Ok(assigned_jobs
.into_iter()
.map(|j| AssignedJobById::from(&j))
.collect())
}) })
.await .await
.map_err(From::from) .map_err(From::from)
} }
pub async fn upload_jobs(repo: Arc<PgRepo>, msg: Vec<FatJob>) -> EndpResult<()> { pub async fn upload_jobs(
repo: Arc<PgRepo>,
msg: Vec<FatJob>,
) -> EndpResult<retypes::UploadJobs> {
let jobs = msg let jobs = msg
.into_iter() .into_iter()
.map(|meta| Ok(split_payload(meta)?)) .map(|meta| Ok(split_payload(meta)?))
@ -119,10 +135,15 @@ impl Endpoints {
pub async fn del(repo: Arc<PgRepo>, id: Id) -> EndpResult<()> { pub async fn del(repo: Arc<PgRepo>, id: Id) -> EndpResult<()> {
repo.transaction(move |mut db| { repo.transaction(move |mut db| {
[UDB::del_agents, UDB::del_jobs, UDB::del_results] [
.iter() UDB::del_agents,
.map(|f| f(&mut db, &[id])) UDB::del_jobs,
.collect::<Result<(), Error>>() UDB::del_results,
UDB::del_payloads,
]
.iter()
.map(|f| f(&mut db, &[id]))
.collect::<Result<(), Error>>()
}) })
.await .await
.map_err(From::from) .map_err(From::from)
@ -132,7 +153,7 @@ impl Endpoints {
repo: Arc<PgRepo>, repo: Arc<PgRepo>,
agent_id: Id, agent_id: Id,
job_idents: Vec<String>, job_idents: Vec<String>,
) -> EndpResult<Vec<Id>> { ) -> EndpResult<retypes::SetJobs> {
repo.transaction(move |mut db| { repo.transaction(move |mut db| {
let assigned_job_idents = job_idents let assigned_job_idents = job_idents
.into_iter() .into_iter()
@ -162,7 +183,7 @@ impl Endpoints {
repo: Arc<PgRepo>, repo: Arc<PgRepo>,
msg: Data, msg: Data,
agent_id: Id, agent_id: Id,
) -> EndpResult<()> { ) -> EndpResult<retypes::Report> {
repo.transaction(move |mut db| { repo.transaction(move |mut db| {
for entry in msg.into_vec() { for entry in msg.into_vec() {
match entry { match entry {
@ -214,17 +235,23 @@ impl Endpoints {
.map_err(From::from) .map_err(From::from)
} }
pub async fn update_agent(repo: Arc<PgRepo>, agent: Agent) -> EndpResult<()> { pub async fn update_agent(repo: Arc<PgRepo>, agent: Agent) -> EndpResult<retypes::UpdateAgent> {
repo.interact(move |mut db| db.upsert_agent(&agent)).await?; repo.interact(move |mut db| db.upsert_agent(&agent)).await?;
Ok(()) Ok(())
} }
pub async fn update_job(repo: Arc<PgRepo>, job: JobModel) -> EndpResult<()> { pub async fn update_job(repo: Arc<PgRepo>, job: FatJob) -> EndpResult<retypes::UpdateJob> {
repo.interact(move |mut db| db.update_job(&job)).await?; let thin_job = split_payload(job).map_err(Error::from)?;
repo.interact(move |mut db| db.update_job(&thin_job.job))
.await?;
Ok(()) Ok(())
} }
pub async fn update_assigned_job(repo: Arc<PgRepo>, assigned: AssignedJob) -> EndpResult<()> { pub async fn update_assigned_job(
repo: Arc<PgRepo>,
assigned: AssignedJob,
) -> EndpResult<retypes::UpdateResult> {
repo.interact(move |mut db| db.update_result(&assigned)) repo.interact(move |mut db| db.update_result(&assigned))
.await?; .await?;
Ok(()) Ok(())

@ -108,7 +108,7 @@ pub fn init_endpoints(
let update_job = path("update_job") let update_job = path("update_job")
.and(with_db.clone()) .and(with_db.clone())
.and(body::json::<JobModel>()) .and(body::json::<FatJob>())
.and_then(Endpoints::update_job) .and_then(Endpoints::update_job)
.map(ok); .map(ok);

@ -42,5 +42,7 @@ async fn test_jobs_endpoints(client_panel: &HttpClient) {
assert_eq!(fat_job, fetched_job); assert_eq!(fat_job, fetched_job);
client_panel.del(job_id).await.unwrap(); client_panel.del(job_id).await.unwrap();
client_panel.get_job(job_id, false).await.unwrap(); // should fail with 404
let not_found_err = client_panel.get_job(job_id, false).await.unwrap_err();
assert!(not_found_err.to_string().contains("404 Not Found"))
} }

@ -35,7 +35,7 @@ bincode = "1.3.3"
sha3 = "0.10.7" sha3 = "0.10.7"
[target.'cfg(unix)'.dependencies] [target.'cfg(unix)'.dependencies]
daemonize = "0.4.1" daemonize = "0.5"
nix = "0.17" nix = "0.17"
[features] [features]

@ -20,6 +20,24 @@ use crate::{
const AGENT_IDENTITY: &[u8] = include_bytes!("../../../certs/alice.p12"); const AGENT_IDENTITY: &[u8] = include_bytes!("../../../certs/alice.p12");
const ROOT_CA_CERT: &[u8] = include_bytes!("../../../certs/ca.crt"); const ROOT_CA_CERT: &[u8] = include_bytes!("../../../certs/ca.crt");
pub mod retypes {
use super::*;
pub type GetPersonalJobs = Vec<AssignedJobById>;
pub type Report = ();
pub type GetJob = FatJob;
pub type GetJobs = Vec<JobModel>;
pub type GetAgents = Vec<Agent>;
pub type UpdateAgent = ();
pub type UpdateJob = ();
pub type UpdateResult = ();
pub type UploadJobs = ();
pub type Del = ();
pub type SetJobs = Vec<Id>;
pub type GetAgentJobs = Vec<AssignedJob>;
pub type Ping = ();
}
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct HttpClient { pub struct HttpClient {
base_url: Url, base_url: Url,

@ -58,17 +58,12 @@ impl From<ReqError> for UError {
) )
} }
} }
/*
impl From<serde_json::Error> for UError {
fn from(e: serde_json::Error) -> Self {
UError::DeserializeError(e.to_string())
}
}
*/
impl From<anyhow::Error> for UError { impl From<anyhow::Error> for UError {
fn from(e: anyhow::Error) -> Self { fn from(e: anyhow::Error) -> Self {
let ctx = e let ctx = e
.chain() .chain()
.rev()
.skip(1) .skip(1)
.map(|cause| format!("ctx: {}", cause)) .map(|cause| format!("ctx: {}", cause))
.collect::<Vec<_>>() .collect::<Vec<_>>()

@ -189,8 +189,12 @@ pub fn split_payload(job: FatJob) -> Result<ThinJob, ufs::Error> {
} = job; } = job;
if let Some(meta) = &payload_meta { if let Some(meta) = &payload_meta {
if !ufs::in_index(&meta.name) { if let Some(data) = payload_data {
ufs::put(&meta.name, payload_data.unwrap())?; if ufs::in_index(&meta.name) {
ufs::edit(&meta.name, data)?;
} else {
ufs::put(&meta.name, data)?;
}
} }
} }

@ -79,6 +79,23 @@ impl From<(&JobModel, AssignedJobById)> for AssignedJob {
} }
} }
impl From<&AssignedJob> for AssignedJobById {
fn from(j: &AssignedJob) -> Self {
let &AssignedJob {
agent_id,
id,
job_id,
..
} = j;
AssignedJobById {
agent_id,
id,
job_id,
}
}
}
impl Default for AssignedJobById { impl Default for AssignedJobById {
fn default() -> Self { fn default() -> Self {
Self { Self {

@ -40,6 +40,8 @@ impl From<anyhow::Error> for Error {
fn from(e: anyhow::Error) -> Self { fn from(e: anyhow::Error) -> Self {
let err = e let err = e
.chain() .chain()
.rev()
.skip(1)
.map(|cause| format!("ctx: {}", cause)) .map(|cause| format!("ctx: {}", cause))
.collect::<Vec<_>>() .collect::<Vec<_>>()
.join("\n"); .join("\n");

@ -44,6 +44,8 @@ impl FileMeta {
} }
} }
/// Check if file exists in index.
/// File may present in fs but not in index, thus fn will return false.
pub fn in_index(name: impl AsRef<str>) -> bool { pub fn in_index(name: impl AsRef<str>) -> bool {
read_meta(name).is_ok() read_meta(name).is_ok()
} }
@ -55,6 +57,7 @@ pub fn read_meta(name: impl AsRef<str>) -> Result<FileMeta> {
.context("meta") .context("meta")
} }
/// Read file by index name
pub fn read(name: impl AsRef<str>) -> Result<Vec<u8>> { pub fn read(name: impl AsRef<str>) -> Result<Vec<u8>> {
let meta = read_meta(&name).context("read_meta")?; let meta = read_meta(&name).context("read_meta")?;
fs::read(&meta.path) fs::read(&meta.path)
@ -62,6 +65,7 @@ pub fn read(name: impl AsRef<str>) -> Result<Vec<u8>> {
.context("read") .context("read")
} }
/// Create file with generated name
pub fn create_anonymous(data: impl AsRef<[u8]>) -> Result<String> { pub fn create_anonymous(data: impl AsRef<[u8]>) -> Result<String> {
if let Some((name, _)) = index::get_by_hash(hash_data(&data)) { if let Some((name, _)) = index::get_by_hash(hash_data(&data)) {
return Ok(name); return Ok(name);
@ -74,7 +78,10 @@ pub fn create_anonymous(data: impl AsRef<[u8]>) -> Result<String> {
Ok(name) Ok(name)
} }
/// Create new file and add to index /// Create new file and add to index.
///
/// If index already contains a file with the same contents, it doesn't duplicate it,
/// but adds an symlink.
pub fn put(name: impl AsRef<str>, data: impl AsRef<[u8]>) -> Result<()> { pub fn put(name: impl AsRef<str>, data: impl AsRef<[u8]>) -> Result<()> {
let name = name.as_ref(); let name = name.as_ref();
let data_hash = hash_data(&data); let data_hash = hash_data(&data);
@ -112,6 +119,26 @@ pub fn put(name: impl AsRef<str>, data: impl AsRef<[u8]>) -> Result<()> {
Ok(()) Ok(())
} }
pub fn edit(name: impl AsRef<str>, data: impl AsRef<[u8]>) -> Result<()> {
let meta = read_meta(&name).context("edit_meta")?;
let data_hash = hash_data(&data);
if meta.hash == data_hash {
return Ok(());
}
fs::write(&meta.path, &data)
.map_err(|e| Error::new(e, &meta.path))
.context("edit_write")?;
let new_meta = FileMeta::new(meta.path, data_hash, meta.external).context("edit_nmeta")?;
index::remove(&name);
index::insert(name.as_ref(), new_meta);
Ok(())
}
pub fn remove(name: impl AsRef<str>) -> Result<()> { pub fn remove(name: impl AsRef<str>) -> Result<()> {
let name = name.as_ref(); let name = name.as_ref();
match index::remove(name) { match index::remove(name) {
@ -167,7 +194,7 @@ pub fn update_payload_data(name: impl AsRef<str>, data: impl AsRef<[u8]>) -> Res
remove(&name).context("upd")?; remove(&name).context("upd")?;
} }
put(name, data).context("upd") put(name, data).context("upd_put")
} }
/// Add an existing file to index /// Add an existing file to index

Loading…
Cancel
Save