finish payload crud & tests

pull/9/head
plazmoid 1 year ago
parent 32e96476cf
commit 45bba0dd9b
Changed files (20, lines changed in parentheses):

  1. Cargo.lock (59)
  2. bin/u_panel/src/argparse.rs (55)
  3. bin/u_server/src/db.rs (45)
  4. bin/u_server/src/error.rs (5)
  5. bin/u_server/src/handlers.rs (90)
  6. bin/u_server/src/u_server.rs (36)
  7. images/u_server.Dockerfile (2)
  8. integration-tests/docker-compose.yml (4)
  9. integration-tests/tests/integration_tests/api.rs (46)
 10. integration-tests/tests/integration_tests/behaviour.rs (2)
 11. integration-tests/tests/integration_tests/endpoints.rs (112)
 12. integration-tests/tests/integration_tests/mod.rs (2)
 13. lib/u_lib/src/api.rs (25)
 14. lib/u_lib/src/logging.rs (6)
 15. lib/u_lib/src/messaging.rs (1)
 16. lib/u_lib/src/models/jobs/meta.rs (32)
 17. lib/u_lib/src/models/payload.rs (49)
 18. lib/u_lib/src/models/schema.rs (4)
 19. lib/u_lib/src/ufs/mod.rs (8)
 20. migrations/2020-10-24-111622_create_all/up.sql (4)

Cargo.lock (generated, 59 lines changed)

@@ -815,9 +815,9 @@ dependencies = [
 [[package]]
 name = "flate2"
-version = "1.0.25"
+version = "1.0.26"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "a8a2db397cb1c8772f31494cb8917e48cd1e64f0fa7efac59fbd741a0a8ce841"
+checksum = "3b9429470923de8e8cbd4d2dc513535400b4b3fef0319fb5c4e1f520a7bef743"
 dependencies = [
  "crc32fast",
  "miniz_oxide",
@@ -1344,9 +1344,9 @@ dependencies = [
 [[package]]
 name = "linux-raw-sys"
-version = "0.3.4"
+version = "0.3.6"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "36eb31c1778188ae1e64398743890d0877fef36d11521ac60406b42016e8c2cf"
+checksum = "b64f40e5e03e0d54f03845c8197d0291253cdbedfb1cb46b13c2c117554a9f4c"

 [[package]]
 name = "local-channel"
@@ -1459,9 +1459,9 @@ dependencies = [
 [[package]]
 name = "miniz_oxide"
-version = "0.6.2"
+version = "0.7.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "b275950c28b37e794e8c55d88aeb5e139d0ce23fdbbeda68f8d7174abdf9e8fa"
+checksum = "e7810e0be55b428ada41041c41f32c9f1a42817901b4ccf45fa3d4b6561e74c7"
 dependencies = [
  "adler",
 ]
@@ -1479,17 +1479,21 @@ dependencies = [
 ]

 [[package]]
-name = "multiparty"
-version = "0.1.0"
+name = "multer"
+version = "2.1.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "ed1ec6589a6d4a1e0b33b4c0a3f6ee96dfba88ebdb3da51403fd7cf0a24a4b04"
+checksum = "01acbdc23469fd8fe07ab135923371d5f5a422fbf9c522158677c8eb15bc51c2"
 dependencies = [
  "bytes",
- "futures-core",
+ "encoding_rs",
+ "futures-util",
+ "http",
  "httparse",
+ "log",
  "memchr",
- "pin-project-lite",
- "try-lock",
+ "mime",
+ "spin 0.9.8",
+ "version_check",
 ]

 [[package]]
@@ -1873,9 +1877,9 @@ checksum = "a5996294f19bd3aae0453a862ad728f60e6600695733dd5df01da90c54363a3c"
 [[package]]
 name = "reqwest"
-version = "0.11.16"
+version = "0.11.17"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "27b71749df584b7f4cac2c426c127a7c785a5106cc98f7a8feb044115f0fa254"
+checksum = "13293b639a097af28fc8a90f22add145a9c954e49d77da06263d58cf44d5fb91"
 dependencies = [
  "base64 0.21.0",
  "bytes",
@@ -1923,7 +1927,7 @@ dependencies = [
  "cc",
  "libc",
  "once_cell",
- "spin",
+ "spin 0.5.2",
  "untrusted",
  "web-sys",
  "winapi",
@@ -2020,9 +2024,9 @@ dependencies = [
 [[package]]
 name = "rustix"
-version = "0.37.15"
+version = "0.37.17"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "a0661814f891c57c930a610266415528da53c4933e6dea5fb350cbfe048a9ece"
+checksum = "bc809f704c03a812ac71f22456c857be34185cac691a4316f27ab0f633bb9009"
 dependencies = [
  "bitflags",
  "errno 0.3.1",
@@ -2278,6 +2282,12 @@ version = "0.5.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d"

+[[package]]
+name = "spin"
+version = "0.9.8"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67"
+
 [[package]]
 name = "strsim"
 version = "0.8.0"
@@ -2606,10 +2616,11 @@ checksum = "b6bc1c9ce2b5135ac7f93c72918fc37feb872bdc6a5533a8b85eb4b86bfdae52"
 [[package]]
 name = "tracing"
-version = "0.1.38"
+version = "0.1.37"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "cf9cf6a813d3f40c88b0b6b6f29a5c95c6cdbf97c1f9cc53fb820200f5ad814d"
+checksum = "8ce8c33a8d48bd45d624a6e523445fd21ec13d3653cd51f681abf67418f54eb8"
 dependencies = [
+ "cfg-if 1.0.0",
  "log",
  "pin-project-lite",
  "tracing-attributes",
@@ -2878,9 +2889,9 @@ checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9"
 [[package]]
 name = "uuid"
-version = "1.3.1"
+version = "1.3.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "5b55a3fef2a1e3b3a00ce878640918820d3c51081576ac657d23af9fc7928fdb"
+checksum = "4dad5567ad0cf5b760e5665964bec1b47dfd077ba8a2544b513f3556d3d239a2"
 dependencies = [
  "getrandom",
  "serde",
@@ -2938,9 +2949,9 @@ dependencies = [
 [[package]]
 name = "warp"
-version = "0.3.4"
+version = "0.3.5"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "27e1a710288f0f91a98dd8a74f05b76a10768db245ce183edf64dc1afdc3016c"
+checksum = "ba431ef570df1287f7f8b07e376491ad54f84d26ac473489427231e1718e1f69"
 dependencies = [
  "bytes",
  "futures-channel",
@@ -2951,7 +2962,7 @@ dependencies = [
  "log",
  "mime",
  "mime_guess",
- "multiparty",
+ "multer",
  "percent-encoding",
  "pin-project",
  "rustls-pemfile",

bin/u_panel/src/argparse.rs

@@ -1,12 +1,7 @@
 use serde_json::{from_str, to_value, Value};
 use structopt::StructOpt;
 use u_lib::{
-    api::HttpClient,
-    messaging::AsMsg,
-    models::{Agent, AssignedJob, BriefMode, RawJob},
-    types::Id,
-    types::PanelResult,
-    UError, UResult,
+    api::HttpClient, messaging::AsMsg, models::*, types::Id, types::PanelResult, UError, UResult,
 };

 #[derive(StructOpt, Debug)]
@@ -20,17 +15,17 @@ pub struct Args {
 #[derive(StructOpt, Debug)]
 enum Cmd {
     Agents(RUD),
-    Jobs(JobCRUD),
+    Jobs(CRUD),
     Map(MapCRUD),
-    Payloads(RUD),
+    Payloads(PayloadCRUD),
     Ping,
     Serve,
 }

 #[derive(StructOpt, Debug)]
-enum JobCRUD {
+enum CRUD {
     Create {
-        job: String,
+        item: String,
     },
     #[structopt(flatten)]
     RUD(RUD),
@@ -48,6 +43,23 @@ enum MapCRUD {
     RUD(RUD),
 }

+#[derive(StructOpt, Debug)]
+enum PayloadCRUD {
+    Create {
+        item: String,
+    },
+    Read {
+        id: Option<String>,
+    },
+    Update {
+        item: String,
+    },
+    Delete {
+        #[structopt(parse(try_from_str = parse_uuid))]
+        id: Id,
+    },
+}
+
 #[derive(StructOpt, Debug)]
 enum RUD {
     Read {
@@ -84,7 +96,7 @@ pub async fn process_cmd(client: HttpClient, args: Args) -> PanelResult<Value> {
             RUD::Delete { id } => into_value(client.del(id).await?),
         },
         Cmd::Jobs(action) => match action {
-            JobCRUD::Create { job } => {
+            CRUD::Create { item: job } => {
                 let raw_job = from_str::<RawJob>(&job)
                     .map_err(|e| UError::DeserializeError(e.to_string(), job))?;
                 let mut job = raw_job.validated()?;
@@ -95,11 +107,11 @@ pub async fn process_cmd(client: HttpClient, args: Args) -> PanelResult<Value> {
                 into_value(client.upload_jobs([&job]).await?)
             }
-            JobCRUD::RUD(RUD::Read { id }) => match id {
+            CRUD::RUD(RUD::Read { id }) => match id {
                 Some(id) => into_value(vec![client.get_job(id, args.brief).await?]),
                 None => into_value(client.get_jobs().await?),
             },
-            JobCRUD::RUD(RUD::Update { item }) => {
+            CRUD::RUD(RUD::Update { item }) => {
                 let raw_job = from_str::<RawJob>(&item)
                     .map_err(|e| UError::DeserializeError(e.to_string(), item))?;
                 let mut job = raw_job.validated()?;
@@ -110,7 +122,7 @@ pub async fn process_cmd(client: HttpClient, args: Args) -> PanelResult<Value> {
                 into_value(client.update_job(&job).await?)
             }
-            JobCRUD::RUD(RUD::Delete { id }) => into_value(client.del(id).await?),
+            CRUD::RUD(RUD::Delete { id }) => into_value(client.del(id).await?),
         },
         Cmd::Map(action) => match action {
             MapCRUD::Create {
@@ -126,12 +138,21 @@ pub async fn process_cmd(client: HttpClient, args: Args) -> PanelResult<Value> {
             MapCRUD::RUD(RUD::Delete { id }) => into_value(client.del(id).await?),
         },
         Cmd::Payloads(action) => match action {
-            RUD::Read { id } => match id {
+            PayloadCRUD::Create { item } => {
+                let payload = from_str::<RawPayload>(&item)
+                    .map_err(|e| UError::DeserializeError(e.to_string(), item))?;
+                into_value(client.upload_payloads([&payload]).await?)
+            }
+            PayloadCRUD::Read { id } => match id {
                 None => into_value(client.get_payloads().await?),
                 Some(id) => into_value(client.get_payload(id, args.brief).await?),
             },
-            RUD::Update { item: _item } => todo!(),
-            RUD::Delete { id } => into_value(client.del(id).await?),
+            PayloadCRUD::Update { item } => {
+                let payload = from_str::<Payload>(&item)
+                    .map_err(|e| UError::DeserializeError(e.to_string(), item))?;
+                into_value(client.update_payload(&payload).await?)
+            }
+            PayloadCRUD::Delete { id } => into_value(client.del(id).await?),
         },
         Cmd::Ping => into_value(client.ping().await?),
         Cmd::Serve => {
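The new Payloads subcommands deserialize their item argument with serde_json, so the string passed on the command line has to match RawPayload for create and Payload for update. A minimal sketch of the shape create expects, with made-up values (RawPayload itself is defined in lib/u_lib/src/models/payload.rs further down in this commit):

    use serde_json::from_str;
    use u_lib::models::RawPayload;

    fn parse_create_item() {
        // `data` is a Vec<u8>, so serde_json expects an array of byte values, not a string
        let item = r#"{"name": "test1", "data": [113, 119, 101]}"#;
        let payload: RawPayload = from_str(item).expect("malformed payload JSON");
        assert_eq!(payload.name, "test1");
        assert_eq!(payload.data, b"qwe");
    }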

bin/u_server/src/db.rs

@@ -50,28 +50,24 @@ pub struct UDB<'c> {
 }

 impl UDB<'_> {
-    pub fn insert_jobs(&mut self, jobs: &[Job]) -> Result<()> {
-        use schema::{jobs, payloads};
-
-        let (jobs, payloads_opt): (Vec<_>, Vec<_>) =
-            jobs.iter().map(|j| (&j.job, j.payload.as_ref())).unzip();
-
-        let payloads = payloads_opt
-            .into_iter()
-            .filter_map(|p| p)
-            .collect::<Vec<_>>();
-
-        diesel::insert_into(payloads::table)
-            .values(payloads)
-            .execute(self.conn)
-            .map(drop)
-            .map_err(with_err_ctx("Can't insert payloads"))?;
+    pub fn insert_jobs(&mut self, jobs: &[JobModel]) -> Result<()> {
+        use schema::jobs;

         diesel::insert_into(jobs::table)
             .values(jobs)
             .execute(self.conn)
             .map(drop)
             .map_err(with_err_ctx("Can't insert jobs"))
     }
+
+    pub fn insert_payloads(&mut self, payloads: &[Payload]) -> Result<()> {
+        use schema::payloads;
+
+        diesel::insert_into(payloads::table)
+            .values(payloads)
+            .execute(self.conn)
+            .map(drop)
+            .map_err(with_err_ctx("Can't insert payloads"))
+    }

     pub fn get_job(&mut self, id: Id) -> Result<Option<Job>> {
@@ -105,6 +101,16 @@ impl UDB<'_> {
             .map_err(with_err_ctx(format!("Can't get payload {id}")))
     }

+    pub fn get_payload_by_name(&mut self, name: String) -> Result<Option<Payload>> {
+        use schema::payloads;
+
+        payloads::table
+            .filter(payloads::name.eq(&name))
+            .first(self.conn)
+            .optional()
+            .map_err(with_err_ctx(format!("Can't get payload by name {name}")))
+    }
+
     pub fn get_payloads(&mut self) -> Result<Vec<Payload>> {
         use schema::payloads;
@@ -113,7 +119,7 @@ impl UDB<'_> {
             .map_err(with_err_ctx("Can't get payloads"))
     }

-    pub fn find_job_by_alias(&mut self, alias: &str) -> Result<Option<Job>> {
+    pub fn get_job_by_alias(&mut self, alias: &str) -> Result<Option<Job>> {
         use schema::{jobs, payloads};

         let maybe_job_with_payload = jobs::table
@@ -303,6 +309,13 @@ impl UDB<'_> {
         Ok(())
     }

+    pub fn update_payload(&mut self, payload: &Payload) -> Result<()> {
+        payload
+            .save_changes::<Payload>(self.conn)
+            .map_err(with_err_ctx(format!("Can't update payload {payload:?}")))?;
+        Ok(())
+    }
+
     pub fn update_result(&mut self, result: &AssignedJob) -> Result<()> {
         debug!(
             "updating result: id = {}, job_id = {}, agent_id = {}",

bin/u_server/src/error.rs

@@ -1,6 +1,6 @@
 use diesel::result::Error as DslError;
 use thiserror::Error;
-use u_lib::ufs;
+use u_lib::{ufs, UError};
 use warp::{
     http::StatusCode,
     reject::Reject,
@@ -34,6 +34,9 @@ pub enum Error {
     #[error("{0}\nContext: {1}")]
     Contexted(Box<Error>, String),

+    #[error(transparent)]
+    UError(#[from] UError),
+
     #[error("Runtime error: {0}")]
     Runtime(String),
 }

bin/u_server/src/handlers.rs

@@ -83,15 +83,25 @@ impl Endpoints {
     pub async fn get_payload(
         repo: Arc<PgRepo>,
-        id: Id,
+        name_or_id: String,
         params: Option<PayloadFlags>,
     ) -> EndpResult<retypes::GetPayload> {
-        let Some(mut payload) = repo.interact(move |mut db| db.get_payload(id)).await? else {
-            return Err(not_found())
+        let mut payload = match repo
+            .interact(move |mut db| match Id::parse_str(&name_or_id) {
+                Ok(id) => db.get_payload(id),
+                Err(_) => db.get_payload_by_name(name_or_id),
+            })
+            .await?
+        {
+            Some(p) => p,
+            None => return Err(not_found()),
         };

         Ok(match params.map(|p| p.brief) {
-            Some(BriefMode::Yes) => payload,
+            Some(BriefMode::Yes) => {
+                payload.data = None;
+                payload
+            }
             None | Some(BriefMode::Auto) => {
                 payload.maybe_join_payload().map_err(Error::from)?;
                 payload
@@ -121,7 +131,7 @@ impl Endpoints {
             db.upsert_agent(&new_agent)?;

             let job = db
-                .find_job_by_alias("agent_hello")?
+                .get_job_by_alias("agent_hello")?
                 .expect("agent_hello job not found");

             db.set_jobs_for_agent(id, &[job.job.id])?;
@@ -154,7 +164,33 @@ impl Endpoints {
             })
             .collect::<Result<Vec<Job>, Error>>()?;

-        repo.interact(move |mut db| db.insert_jobs(&jobs))
+        let (jobs, payloads_opt): (Vec<_>, Vec<_>) =
+            jobs.into_iter().map(|j| (j.job, j.payload)).unzip();
+
+        let payloads = payloads_opt
+            .into_iter()
+            .filter_map(|p| p)
+            .collect::<Vec<_>>();
+
+        repo.transaction(move |mut db| {
+            db.insert_payloads(&payloads)?;
+            db.insert_jobs(&jobs)
+        })
+        .await
+        .map_err(From::from)
+    }
+
+    pub async fn upload_payloads(
+        repo: Arc<PgRepo>,
+        raw_payloads: Vec<RawPayload>,
+    ) -> EndpResult<retypes::UploadPayloads> {
+        let payloads = raw_payloads
+            .into_iter()
+            .map(|raw| raw.into_payload())
+            .collect::<Result<Vec<_>, _>>()
+            .map_err(Error::from)?;
+
+        repo.interact(move |mut db| db.insert_payloads(&payloads))
             .await
             .map_err(From::from)
     }
@@ -185,7 +221,7 @@ impl Endpoints {
             .into_iter()
             .map(|ident| {
                 Id::parse_str(&ident).or_else(|_| {
-                    let job_from_db = db.find_job_by_alias(&ident);
+                    let job_from_db = db.get_job_by_alias(&ident);
                     match job_from_db {
                         Ok(job) => match job {
                             Some(j) => Ok(j.job.id),
@@ -262,17 +298,15 @@ impl Endpoints {
     }

     pub async fn update_agent(repo: Arc<PgRepo>, agent: Agent) -> EndpResult<retypes::UpdateAgent> {
-        repo.interact(move |mut db| db.upsert_agent(&agent)).await?;
-        Ok(())
-    }
-
-    pub async fn update_job(repo: Arc<PgRepo>, mut job: Job) -> EndpResult<retypes::UpdateJob> {
-        if let Some(payload) = &mut job.payload {
-            payload.maybe_split_payload().map_err(Error::from)?;
-        }
-        repo.interact(move |mut db| db.update_job(&job.job)).await?;
-        Ok(())
+        repo.interact(move |mut db| db.upsert_agent(&agent))
+            .await
+            .map_err(From::from)
+    }
+
+    pub async fn update_job(repo: Arc<PgRepo>, job: Job) -> EndpResult<retypes::UpdateJob> {
+        repo.interact(move |mut db| db.update_job(&job.job))
+            .await
+            .map_err(From::from)
     }

     pub async fn update_assigned_job(
@@ -280,7 +314,27 @@ impl Endpoints {
         assigned: AssignedJob,
     ) -> EndpResult<retypes::UpdateResult> {
         repo.interact(move |mut db| db.update_result(&assigned))
-            .await?;
-        Ok(())
+            .await
+            .map_err(From::from)
+    }
+
+    pub async fn update_payload(
+        repo: Arc<PgRepo>,
+        payload: Payload,
+    ) -> EndpResult<retypes::UpdatePayload> {
+        match payload.data {
+            Some(data) => {
+                let mut well_formed_payload =
+                    Payload::from_data(data, Some(&payload.name)).map_err(Error::from)?;
+                well_formed_payload.id = payload.id;
+                debug!("wf payload: {well_formed_payload:?}");
+                repo.interact(move |mut db| db.update_payload(&well_formed_payload))
+                    .await
+                    .map_err(From::from)
+            }
+            None => return Ok(()),
+        }
     }
 }
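get_payload now receives the raw path segment as a String and picks the lookup by whether the segment parses as a UUID. The dispatch in isolation looks roughly like this, assuming Id is the uuid crate's Uuid as elsewhere in the codebase:

    use uuid::Uuid as Id;

    // Which UDB lookup the handler picks for a given `get_payload/{name_or_id}` segment.
    fn lookup_kind(name_or_id: &str) -> &'static str {
        match Id::parse_str(name_or_id) {
            Ok(_) => "db.get_payload(id)",
            Err(_) => "db.get_payload_by_name(name)",
        }
    }

    fn main() {
        assert_eq!(lookup_kind("550e8400-e29b-41d4-a716-446655440000"), "db.get_payload(id)");
        assert_eq!(lookup_kind("agent_hello"), "db.get_payload_by_name(name)");
    }

One consequence: a payload whose name happens to parse as a UUID is only reachable by id, so UUID-shaped names are best avoided.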

bin/u_server/src/u_server.rs

@@ -22,7 +22,7 @@ use warp::{
     Filter, Rejection, Reply,
 };

-const DEFAULT_RESP: &str = "null";
+const DEFAULT_RESPONSE: &str = "null";

 use crate::handlers::{Endpoints, PayloadFlags};
@@ -132,14 +132,26 @@ pub fn init_endpoints(
     let get_payload = path("get_payload")
         .and(with_db.clone())
-        .and(warp::path::param::<Id>())
+        .and(warp::path::param::<String>())
         .and(make_optional(serde_qs::warp::query::<PayloadFlags>(
             create_qs_cfg(),
         )))
         .and_then(Endpoints::get_payload)
         .map(into_message);

-    let ping = path("ping").map(|| DEFAULT_RESP);
+    let upload_payloads = path("upload_payloads")
+        .and(with_db.clone())
+        .and(body::json::<Vec<RawPayload>>())
+        .and_then(Endpoints::upload_payloads)
+        .map(ok);
+
+    let update_payload = path("update_payload")
+        .and(with_db.clone())
+        .and(body::json::<Payload>())
+        .and_then(Endpoints::update_payload)
+        .map(ok);
+
+    let ping = path("ping").map(|| DEFAULT_RESPONSE);

     let auth_token = format!("Bearer {auth_token}",).into_boxed_str();
     let auth_header = warp::header::exact("authorization", Box::leak(auth_token));
@@ -147,13 +159,17 @@ pub fn init_endpoints(
     let auth_zone = (get_agents
         .or(get_job.clone())
         .or(get_jobs.clone())
+        .or(get_payloads)
+        .or(get_payload)
         .or(upload_jobs)
+        .or(upload_payloads)
         .or(del)
         .or(set_jobs)
         .or(get_assigned_jobs)
-        .or(update_agent.or(update_job).or(update_assigned_job))
-        .or(get_payloads)
-        .or(get_payload)
+        .or(update_agent)
+        .or(update_job)
+        .or(update_assigned_job)
+        .or(update_payload)
         .or(ping))
     .and(auth_header);
@@ -165,14 +181,14 @@ pub fn init_endpoints(
 pub async fn preload_jobs(repo: &PgRepo) -> Result<(), ServerError> {
     repo.interact(|mut db| {
         let job_alias = "agent_hello";
-        let if_job_exists = db.find_job_by_alias(job_alias)?;
+        let if_job_exists = db.get_job_by_alias(job_alias)?;
         if if_job_exists.is_none() {
             let agent_hello = RawJob::builder()
                 .with_type(JobType::Init)
                 .with_alias(job_alias)
                 .build()
                 .unwrap();
-            db.insert_jobs(&[agent_hello])?;
+            db.insert_jobs(&[agent_hello.job])?;
         }
         Ok(())
     })
@@ -228,13 +244,13 @@ fn logger(info: Info<'_>) {
                 .take(2)
                 .collect::<String>()
         )
-        .unwrap_or_else(|| "NO_AGENT".to_string()),
+        .unwrap_or_else(|| "NO_AGENT_UID".to_string()),
         status = info.status()
     );
 }

 fn ok<T>(_: T) -> impl Reply {
-    DEFAULT_RESP
+    DEFAULT_RESPONSE
 }

 /*

images/u_server.Dockerfile

@@ -1,3 +1,3 @@
 FROM alpine:3.17

-RUN apk add iproute2 bash
+RUN apk add iproute2 bash file

integration-tests/docker-compose.yml

@@ -28,7 +28,7 @@ services:
       - ../.env
       - ../.env.private
     environment:
-      RUST_LOG: warp=info,u_server_lib=debug
+      RUST_LOG: warp=info,u_server_lib=debug,u_lib=debug
     healthcheck:
       test: ss -tlpn | grep 63714
       interval: 5s
@@ -100,5 +100,5 @@ services:
       - ../.env.private
     environment:
       RUST_BACKTRACE: 1
-      RUST_LOG: debug,hyper=info,reqwest=info
+      RUST_LOG: hyper=info,reqwest=info
       U_SERVER: u_server

integration-tests/tests/integration_tests/api.rs (deleted; superseded by endpoints.rs below)

@@ -1,46 +0,0 @@
-// get_personal_jobs(&self, url_param: Id)
-// report(&self, payload: impl OneOrVec<messaging::Reportable>)
-// dl(&self, file: String)
-// get_job(&self, job: Id)
-// get_jobs(&self)
-// get_agents(&self, agent: Option<Id>)
-// update_agent(&self, agent: Agent)
-// update_job(&self, job: FatJob)
-// update_result(&self, result: AssignedJob)
-// upload_jobs(&self, payload: impl OneOrVec<FatJob>)
-// del(&self, item: Id)
-// set_jobs(&self, agent: Id, job_idents: impl OneOrVec<String>)
-// get_agent_jobs(&self, agent: Option<Id>)
-// ping(&self)
-
-use crate::fixtures::connections::*;
-use u_lib::models::RawJob;
-
-#[rstest]
-#[tokio::test]
-async fn test_jobs_endpoints(client_panel: &HttpClient) {
-    let job_alias = "henlo";
-    let mut job = RawJob::builder()
-        .with_shell("echo henlo")
-        .with_alias(job_alias)
-        .build()
-        .unwrap();
-    let job_id = job.job.id;
-
-    client_panel.upload_jobs([&job]).await.unwrap();
-    let fetched_job = client_panel.get_brief_job(job_id).await.unwrap();
-    assert_eq!(job, fetched_job);
-
-    job.job.alias = Some("henlo2".to_string());
-    client_panel.update_job(&job).await.unwrap();
-    let fetched_job = client_panel.get_brief_job(job_id).await.unwrap();
-    assert_eq!(job, fetched_job);
-
-    client_panel.del(job_id).await.unwrap();
-    let not_found_err = client_panel.get_brief_job(job_id).await.unwrap_err();
-    assert!(not_found_err.to_string().contains("404 Not Found"))
-}

integration-tests/tests/integration_tests/behaviour.rs

@@ -23,7 +23,7 @@ async fn setup_tasks() {
     let job_alias = "passwd_contents";
     let job = RawJob::builder()
         .with_alias(job_alias)
-        .with_raw_payload(b"cat /etc/passwd".as_slice())
+        .with_raw_payload("cat /etc/passwd")
         .with_shell("/bin/bash {}")
         .with_target_platforms("*linux*")
         .build()

integration-tests/tests/integration_tests/endpoints.rs (new file)

@@ -0,0 +1,112 @@
+// get_personal_jobs(&self, url_param: Id)
+// report(&self, payload: impl OneOrVec<messaging::Reportable>)
+// dl(&self, file: String)
+// get_job(&self, job: Id)
+// get_jobs(&self)
+// get_agents(&self, agent: Option<Id>)
+// update_agent(&self, agent: Agent)
+// update_job(&self, job: FatJob)
+// update_result(&self, result: AssignedJob)
+// upload_jobs(&self, payload: impl OneOrVec<FatJob>)
+// del(&self, item: Id)
+// set_jobs(&self, agent: Id, job_idents: impl OneOrVec<String>)
+// get_agent_jobs(&self, agent: Option<Id>)
+// ping(&self)
+
+use crate::fixtures::connections::*;
+use std::iter::repeat;
+use u_lib::models::{BriefMode, RawJob, RawPayload, MAX_READABLE_PAYLOAD_SIZE};
+
+#[rstest]
+#[tokio::test]
+async fn jobs_upload_update_get_del(client_panel: &HttpClient) {
+    let job_alias = "henlo";
+    let mut job = RawJob::builder()
+        .with_shell("/bin/bash {}")
+        .with_raw_payload("echo henlo")
+        .with_alias(job_alias)
+        .build()
+        .unwrap();
+    let job_id = job.job.id;
+
+    client_panel.upload_jobs([&job]).await.unwrap();
+    let fetched_job = client_panel.get_full_job(job_id).await.unwrap();
+    assert_eq!(job, fetched_job);
+
+    // updating a job's payload by editing the embedded copy does nothing:
+    // editing is only allowed through the payload itself
+    *job.payload.as_mut().unwrap().data.as_mut().unwrap() = b"echo henlo2".to_vec();
+    client_panel.update_job(&job).await.unwrap();
+    let fetched_job = client_panel.get_full_job(job_id).await.unwrap();
+    assert_eq!(
+        fetched_job.payload.as_ref().unwrap().data.as_ref().unwrap(),
+        b"echo henlo"
+    );
+
+    client_panel.del(job_id).await.unwrap();
+    let not_found_err = client_panel.get_brief_job(job_id).await.unwrap_err();
+    assert!(not_found_err.to_string().contains("404 Not Found"))
+}
+
+#[rstest]
+#[tokio::test]
+async fn payloads_upload_update_get_del(client_panel: &HttpClient) {
+    let name = "test1".to_string();
+    let data = b"qweasdzxc".to_vec();
+
+    let payload = RawPayload {
+        name: name.clone(),
+        data: data.clone(),
+    };
+    client_panel.upload_payloads([&payload]).await.unwrap();
+
+    let mut fetched_payload = client_panel
+        .get_payload(&name, BriefMode::No)
+        .await
+        .unwrap();
+    let fetched_payload_auto = client_panel
+        .get_payload(&name, BriefMode::Auto)
+        .await
+        .unwrap();
+    assert_eq!(fetched_payload, fetched_payload_auto);
+    assert_eq!(fetched_payload.data.unwrap(), data);
+
+    let new_size = MAX_READABLE_PAYLOAD_SIZE + 1;
+    let big_data = repeat(1u8).take(new_size as usize).collect::<Vec<_>>();
+    fetched_payload.data = Some(big_data.clone());
+    client_panel.update_payload(&fetched_payload).await.unwrap();
+
+    let fetched_big_payload = client_panel
+        .get_payload(&name, BriefMode::Yes)
+        .await
+        .unwrap();
+    let fetched_big_payload_auto = client_panel
+        .get_payload(&name, BriefMode::Auto)
+        .await
+        .unwrap();
+    assert_eq!(fetched_big_payload, fetched_big_payload_auto);
+    assert_eq!(fetched_big_payload.size, new_size);
+    assert!(fetched_big_payload.data.is_none());
+
+    let fetched_big_payload_full = client_panel
+        .get_payload(&name, BriefMode::No)
+        .await
+        .unwrap();
+    assert_eq!(fetched_big_payload_full.data.unwrap(), big_data);
+
+    client_panel.del(fetched_big_payload_full.id).await.unwrap();
+    let not_found_err = client_panel
+        .get_payload(&name, BriefMode::Yes)
+        .await
+        .unwrap_err();
+    assert!(not_found_err.to_string().contains("404 Not Found"))
+}

integration-tests/tests/integration_tests/mod.rs

@@ -1,3 +1,3 @@
-mod api;
 mod behaviour;
 mod connection;
+mod endpoints;

lib/u_lib/src/api.rs

@@ -31,7 +31,9 @@ pub mod retypes {
     pub type UpdateAgent = ();
     pub type UpdateJob = ();
     pub type UpdateResult = ();
+    pub type UpdatePayload = ();
     pub type UploadJobs = ();
+    pub type UploadPayloads = ();
     pub type Del = ();
     pub type SetJobs = Vec<Id>;
     pub type GetAgentJobs = Vec<AssignedJob>;
@@ -199,12 +201,24 @@ impl HttpClient {
         self.req_with_payload("update_result", result).await
     }

+    pub async fn update_payload(&self, payload: &Payload) -> Result<retypes::UpdatePayload> {
+        self.req_with_payload("update_payload", payload).await
+    }
+
     /// create and upload job
     pub async fn upload_jobs(
         &self,
-        payload: impl IntoIterator<Item = &Job>,
+        jobs: impl IntoIterator<Item = &Job>,
     ) -> Result<retypes::UploadJobs> {
-        self.req_with_payload("upload_jobs", &payload.into_iter().collect::<Vec<_>>())
+        self.req_with_payload("upload_jobs", &jobs.into_iter().collect::<Vec<_>>())
+            .await
+    }
+
+    pub async fn upload_payloads(
+        &self,
+        payload: impl IntoIterator<Item = &RawPayload>,
+    ) -> Result<retypes::UploadPayloads> {
+        self.req_with_payload("upload_payloads", &payload.into_iter().collect::<Vec<_>>())
             .await
     }
@@ -239,7 +253,12 @@ impl HttpClient {
         self.req("get_payloads").await
     }

-    pub async fn get_payload(&self, payload: Id, brief: BriefMode) -> Result<retypes::GetPayload> {
+    pub async fn get_payload(
+        &self,
+        payload: impl AsRef<str>,
+        brief: BriefMode,
+    ) -> Result<retypes::GetPayload> {
+        let payload = payload.as_ref();
         self.req(format!("get_payload/{payload}?brief={brief}"))
             .await
     }
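Together with upload_payloads and the string-based get_payload, payloads now get the same client-side round trip as jobs. A hedged usage sketch mirroring the integration test above, assuming an already-configured HttpClient such as the client_panel fixture:

    use u_lib::{api::HttpClient, models::{BriefMode, RawPayload}};

    // `client` is an already-configured HttpClient; construction is outside this commit.
    async fn payload_round_trip(client: &HttpClient) {
        let payload = RawPayload {
            name: "smoke_test".to_string(),
            data: b"echo hi".to_vec(),
        };
        client.upload_payloads([&payload]).await.unwrap();

        // get_payload accepts the payload name or its id, both as strings
        let fetched = client.get_payload("smoke_test", BriefMode::No).await.unwrap();
        assert_eq!(fetched.data.as_ref().unwrap(), b"echo hi");

        client.del(fetched.id).await.unwrap();
    }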

lib/u_lib/src/logging.rs

@@ -10,10 +10,12 @@ pub fn init_logger(logfile: Option<&str>) {
         env::set_var("RUST_LOG", "info")
     }

+    let layer = fmt::layer().with_line_number(true);
+
     let output_layer = if cfg!(test) {
-        fmt::layer().with_test_writer().boxed()
+        layer.with_test_writer().boxed()
     } else {
-        fmt::layer().with_writer(stderr).boxed()
+        layer.with_writer(stderr).boxed()
     };

     let reg = registry()

lib/u_lib/src/messaging.rs

@@ -13,6 +13,7 @@ impl AsMsg for AssignedJobById {}
 impl AsMsg for JobModel {}
 impl AsMsg for Reportable {}
 impl AsMsg for Payload {}
+impl AsMsg for RawPayload {}
 impl AsMsg for Job {}
 impl AsMsg for Id {}
 impl AsMsg for String {}

lib/u_lib/src/models/jobs/meta.rs

@@ -27,7 +27,7 @@ pub struct JobModel {
     pub exec_type: JobType,
     /// target triple
     pub target_platforms: String,
-    pub payload: Option<Id>,
+    pub payload_id: Option<Id>,
     /// cron-like string
     pub schedule: Option<String>,
 }
@@ -144,8 +144,8 @@ impl<'p> JobBuilder<'p> {
         self
     }

-    pub fn with_raw_payload(mut self, raw_payload: impl Into<Cow<'p, [u8]>>) -> Self {
-        self.inner.raw_payload = Some(raw_payload.into());
+    pub fn with_raw_payload(mut self, raw_payload: impl AsPayload<'p>) -> Self {
+        self.inner.raw_payload = Some(raw_payload.as_payload());
         self.inner.payload_path = None;
         self
     }
@@ -185,7 +185,7 @@ impl<'p> JobBuilder<'p> {
             if payload_from_path.is_none() {
                 job.raw_payload
                     .as_ref()
-                    .map(|data| Payload::from_data(data))
+                    .map(|data| Payload::from_data(data, None))
                     .transpose()?
             } else {
                 payload_from_path
@@ -199,7 +199,7 @@ impl<'p> JobBuilder<'p> {
                 id: job.id,
                 exec_type: job.exec_type,
                 target_platforms: job.target_platforms,
-                payload: payload.as_ref().map(|p| p.id),
+                payload_id: payload.as_ref().map(|p| p.id),
                 schedule: job.schedule,
             },
             payload,
@@ -271,3 +271,25 @@ impl<'p> JobBuilder<'p> {
         }
     }
 }
+
+pub trait AsPayload<'p> {
+    fn as_payload(&self) -> Cow<'p, [u8]>;
+}
+
+impl<'p> AsPayload<'p> for &'p str {
+    fn as_payload(&self) -> Cow<'p, [u8]> {
+        Cow::Borrowed(self.as_bytes())
+    }
+}
+
+impl<'p, const N: usize> AsPayload<'p> for &'p [u8; N] {
+    fn as_payload(&self) -> Cow<'p, [u8]> {
+        Cow::Borrowed(*self)
+    }
+}
+
+impl<'p> AsPayload<'p> for &'p [u8] {
+    fn as_payload(&self) -> Cow<'p, [u8]> {
+        Cow::Borrowed(self)
+    }
+}
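The AsPayload impls are what let with_raw_payload take either a string literal or a byte literal, which is why the behaviour test above could drop the b"...".as_slice() conversion. A small sketch of both accepted forms, reusing only builder calls that appear in this commit (alias and target platform are omitted, as in the endpoints test):

    use u_lib::models::RawJob;

    fn with_raw_payload_accepts_str_and_bytes() {
        // &str impl: payload bytes come from the string
        let from_str = RawJob::builder()
            .with_shell("/bin/bash {}")
            .with_raw_payload("cat /etc/passwd")
            .build()
            .unwrap();

        // &[u8; N] impl: payload bytes are taken as-is
        let from_bytes = RawJob::builder()
            .with_shell("/bin/bash {}")
            .with_raw_payload(b"cat /etc/passwd")
            .build()
            .unwrap();

        assert_eq!(
            from_str.payload.as_ref().unwrap().data.as_ref().unwrap(),
            b"cat /etc/passwd"
        );
        assert_eq!(
            from_bytes.payload.as_ref().unwrap().data.as_ref().unwrap(),
            b"cat /etc/passwd"
        );
    }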

lib/u_lib/src/models/payload.rs

@@ -3,17 +3,30 @@ use anyhow::{Context, Result};
 use serde::{Deserialize, Serialize};
 use std::{fs::File, path::Path, process::Command};

-const MAX_READABLE_PAYLOAD_SIZE: i64 = 1024 * 32;
+pub const MAX_READABLE_PAYLOAD_SIZE: i64 = 1024 * 32;

 #[cfg(feature = "server")]
 use crate::models::schema::*;
 #[cfg(feature = "server")]
 use diesel::Identifiable;

+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct RawPayload {
+    pub name: String,
+    pub data: Vec<u8>,
+}
+
+impl RawPayload {
+    pub fn into_payload(self) -> Result<Payload, UError> {
+        Payload::from_data(self.data, Some(&self.name))
+    }
+}
+
 #[cfg_attr(
     feature = "server",
-    derive(Insertable, Queryable, Identifiable),
-    diesel(table_name = payloads)
+    derive(Insertable, Queryable, Identifiable, AsChangeset),
+    diesel(table_name = payloads),
+    diesel(treat_none_as_null = true)
 )]
 #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
 pub struct Payload {
@@ -21,7 +34,7 @@ pub struct Payload {
     pub mime_type: String,
     pub name: String,
     pub size: i64,
-    data: Option<Vec<u8>>, // when None, payload data is stored in ufs
+    pub data: Option<Vec<u8>>, // when None, payload data is stored in ufs
 }

 impl Payload {
@@ -58,21 +71,27 @@ impl Payload {
         Ok(())
     }

-    pub fn from_data(data: impl AsRef<[u8]>) -> Result<Payload, UError> {
-        let name = ufs::create_anonymous(data)?;
-        let meta = ufs::read_meta(&name)?;
+    pub fn from_data(data: impl AsRef<[u8]>, name: Option<&str>) -> Result<Payload, UError> {
+        let name = match name {
+            Some(name) => {
+                ufs::put(name, data).context("fr_put")?;
+                name.to_string()
+            }
+            None => ufs::create_anonymous(data).context("fr_anon")?,
+        };
+        let meta = ufs::read_meta(&name).context("fr_me")?;
+        debug!("from_data {meta:?}");

         let mut payload = Payload {
             id: Id::new_v4(),
-            mime_type: get_mime_type(&meta.path)?,
-            name: name.clone(),
+            mime_type: get_mime_type(&meta.path).context(format!("fr_mi {:?}", &meta.path))?,
+            name,
             size: meta.size as i64,
             data: None,
         };

-        if payload.is_human_readable() {
-            payload.join_payload()?;
-        }
+        payload.maybe_join_payload().context("fr_ma")?;

         Ok(payload)
     }
@@ -149,11 +168,15 @@ impl Payload {
 }

 fn get_mime_type(path: impl AsRef<Path>) -> Result<String, UError> {
+    let path = path.as_ref();
+    debug!("mime of {path:?}");
+
     Ok(bytes_to_string(
         &Command::new("file")
             .arg("-b")
             .arg("--mime-type")
-            .arg(path.as_ref())
+            .arg(path)
             .output()
             .map_err(|e| UError::IOError(e.to_string()))?
             .stdout,
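Payload::from_data now takes an optional name: with Some(name) the bytes are written into ufs under that name, with None an anonymous entry is created, and small human-readable payloads get their bytes joined back onto the model. A rough sketch of the two call shapes, assuming a writable ufs index and that the file utility classifies the sample bytes as text:

    use u_lib::models::Payload;

    fn named_and_anonymous_payloads() {
        // Named: the bytes are stored in ufs under "hello.sh" and the name is kept on the model
        let named = Payload::from_data(b"echo hello", Some("hello.sh")).unwrap();
        assert_eq!(named.name, "hello.sh");

        // Small text data stays joined onto the model via maybe_join_payload
        assert_eq!(named.data.as_ref().unwrap(), b"echo hello");

        // Anonymous: ufs picks a generated name, as JobBuilder does for inline raw payloads
        let anon = Payload::from_data(b"echo hello", None).unwrap();
        assert_ne!(anon.name, named.name);
    }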

lib/u_lib/src/models/schema.rs

@@ -46,7 +46,7 @@ diesel::table! {
         id -> Uuid,
         exec_type -> Jobtype,
         target_platforms -> Text,
-        payload -> Nullable<Uuid>,
+        payload_id -> Nullable<Uuid>,
         schedule -> Nullable<Text>,
     }
 }
@@ -82,7 +82,7 @@ diesel::table! {
     }
 }

-diesel::joinable!(jobs -> payloads (payload));
+diesel::joinable!(jobs -> payloads (payload_id));
 diesel::joinable!(results -> agents (agent_id));
 diesel::joinable!(results -> jobs (job_id));

lib/u_lib/src/ufs/mod.rs

@@ -15,7 +15,7 @@ pub use error::Error;
 const OBFUSCATE: bool = cfg!(feature = "agent");

-#[derive(Clone, Deserialize, Serialize)]
+#[derive(Debug, Clone, Deserialize, Serialize)]
 pub struct FileMeta {
     extension: Option<OsString>,
     external: bool, // if file is present before adding to index
@@ -86,9 +86,7 @@ pub fn put(name: impl AsRef<str>, data: impl AsRef<[u8]>) -> Result<()> {
     let name = name.as_ref();
     let data_hash = hash_data(&data);

-    if exists_in_index(&name) {
-        return Err(Error::already_exists(&name)).context("put_exists");
-    }
+    debug!("put: {name}");

     let path = match index::get_by_hash(&data_hash) {
         Some((_, meta)) => meta.path,
@@ -105,6 +103,8 @@ pub fn put(name: impl AsRef<str>, data: impl AsRef<[u8]>) -> Result<()> {
         path
     };

+    debug!("put: path: {path:?}");
+
     fs::write(&path, &data)
         .map_err(|e| Error::new(e, name))
         .context("put_write")?;

migrations/2020-10-24-111622_create_all/up.sql

@@ -38,10 +38,10 @@ CREATE TABLE IF NOT EXISTS jobs (
     id UUID NOT NULL,
    exec_type JobType NOT NULL DEFAULT 'shell',
    target_platforms TEXT NOT NULL,
-   payload UUID,
+   payload_id UUID,
    schedule TEXT,

-   FOREIGN KEY(payload) REFERENCES payloads(id) ON DELETE SET NULL,
+   FOREIGN KEY(payload_id) REFERENCES payloads(id) ON DELETE SET NULL,
    PRIMARY KEY(id)
 );
