initial payload table impl

pull/9/head
plazmoid 2 years ago
parent 7e267b0074
commit ce708d0c98
Cargo.lock | 2
Cargo.toml | 1
bin/u_agent/src/lib.rs | 11
bin/u_panel/Cargo.toml | 2
bin/u_panel/src/argparse.rs | 18
bin/u_server/Cargo.toml | 1
bin/u_server/src/db.rs | 131
bin/u_server/src/handlers.rs | 94
bin/u_server/src/u_server.rs | 22
integration/Cargo.toml | 1
integration/tests/fixtures/agent.rs | 53
integration/tests/fixtures/connections.rs | 28
integration/tests/fixtures/env.rs | 16
integration/tests/fixtures/mod.rs | 8
integration/tests/helpers/mod.rs | 5
integration/tests/integration/api.rs | 1
integration/tests/integration/behaviour.rs | 11
integration/tests/integration/connection.rs | 7
lib/u_lib/src/api.rs | 18
lib/u_lib/src/cache.rs | 11
lib/u_lib/src/db.rs | 9
lib/u_lib/src/jobs.rs | 193
lib/u_lib/src/messaging.rs | 10
lib/u_lib/src/messaging/files.rs | 8
lib/u_lib/src/misc.rs | 2
lib/u_lib/src/models/jobs/assigned.rs | 10
lib/u_lib/src/models/jobs/meta.rs | 209
lib/u_lib/src/models/payload.rs | 90
lib/u_lib/src/models/schema.rs | 27
lib/u_lib/src/ufs/mod.rs | 88
migrations/2020-10-24-111622_create_all/down.sql | 9
migrations/2020-10-24-111622_create_all/up.sql | 27

Cargo.lock (generated) | 2

@ -1195,6 +1195,7 @@ dependencies = [
name = "integration"
version = "0.1.0"
dependencies = [
"futures",
"once_cell",
"reqwest",
"rstest",
@ -2690,6 +2691,7 @@ dependencies = [
"deadpool-diesel",
"diesel",
"hyper",
"mime_guess",
"once_cell",
"openssl",
"rstest",

Cargo.toml | 1
@ -13,6 +13,7 @@ members = [
anyhow = "=1.0.63"
deadpool-diesel = "0.4.0"
diesel = { version = "2", features = ["postgres", "uuid"] }
mime_guess = "2.0"
openssl = "0.10"
reqwest = { version = "0.11", features = ["json"] }
serde = { version = "1.0", features = ["derive"] }

bin/u_agent/src/lib.rs | 11
@ -10,7 +10,7 @@ use u_lib::{
config::{get_self_id, EndpointsEnv, AGENT_ITERATION_INTERVAL},
error::ErrChan,
executor::pop_completed,
jobs::{fat_meta_to_thin, AnonymousJobBatch},
jobs::{split_payload, AnonymousJobBatch},
logging::init_logger,
messaging::Reportable,
models::AssignedJobById,
@ -30,8 +30,8 @@ pub async fn process_request(jobs: Vec<AssignedJobById>, client: &ClientHandler)
}
}
};
match fat_meta_to_thin(fetched_job) {
Ok(thin_meta) => JobCache::insert(thin_meta),
match split_payload(fetched_job) {
Ok(job_payload_meta) => JobCache::insert(job_payload_meta),
Err(e) => ErrChan::send(e, "pld").await,
}
}
@ -109,7 +109,7 @@ pub fn run_forever() -> ! {
let env = EndpointsEnv::load();
if cfg!(debug_assertions) {
init_logger(Some(format!(
let logfile_uid = format!(
"u_agent-{}",
get_self_id()
.hyphenated()
@ -117,7 +117,8 @@ pub fn run_forever() -> ! {
.split("-")
.next()
.unwrap()
)));
);
init_logger(Some(logfile_uid));
} else {
#[cfg(unix)]
u_lib::unix::daemonize()

bin/u_panel/Cargo.toml | 2
@ -11,7 +11,7 @@ actix-cors = "0.6.1"
actix-web = "4.1"
anyhow = { workspace = true }
futures-util = "0.3.21"
mime_guess = "2.0.4"
mime_guess = { workspace = true }
once_cell = "1.8.0"
rust-embed = { version = "6.3.0", features = ["debug-embed", "compression"] }
serde = { workspace = true }

bin/u_panel/src/argparse.rs | 18
@ -2,8 +2,9 @@ use serde_json::{from_str, to_value, Value};
use structopt::StructOpt;
use u_lib::{
api::ClientHandler,
jobs::join_payload,
messaging::AsMsg,
models::{Agent, AssignedJob, FatJobMeta},
models::{Agent, AssignedJob, RawJob},
types::Id,
types::PanelResult,
UError, UResult,
@ -87,14 +88,11 @@ pub async fn process_cmd(client: ClientHandler, args: Args) -> PanelResult<Value
},
Cmd::Jobs(action) => match action {
JobCRUD::Create { job } => {
let raw_job = from_str::<FatJobMeta>(&job)?;
let mut job = raw_job.validated()?;
if let Some(payload) = &mut job.payload {
payload.read_into_self()?;
}
let raw_job = from_str::<RawJob>(&job)?;
let job = raw_job.validated()?;
let fat_job = join_payload(job)?;
into_value(client.upload_jobs(job).await?)
into_value(client.upload_jobs(fat_job).await?)
}
JobCRUD::RUD(RUD::Read { id }) => match id {
//todo: use vec not to break frontend api, possibly refactor later
@ -102,9 +100,9 @@ pub async fn process_cmd(client: ClientHandler, args: Args) -> PanelResult<Value
None => into_value(client.get_jobs().await?),
},
JobCRUD::RUD(RUD::Update { item }) => {
let raw_job = from_str::<FatJobMeta>(&item)?;
let raw_job = from_str::<RawJob>(&item)?;
let job = raw_job.validated()?;
into_value(client.update_job(job).await?)
into_value(client.update_job(join_payload(job)?).await?)
}
JobCRUD::RUD(RUD::Delete { id }) => into_value(client.del(id).await?),
},
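
Review note: the panel no longer inlines payload bytes into the job JSON. It parses a RawJob, validates it into a ThinJob (which indexes the payload file in ufs), and only then joins the bytes back for upload. A minimal sketch of that flow, assuming a u_lib consumer; the JSON body and payload path are illustrative, not from this diff:

use serde_json::from_str;
use u_lib::{api::ClientHandler, jobs::join_payload, models::RawJob, UResult};

async fn create_job(client: &ClientHandler) -> UResult<()> {
    // `payload` is now a plain path string, not an inline Payload enum.
    let raw = r#"{"argv": "/bin/sh {}", "alias": "demo", "payload": "./payload.bin"}"#;
    let thin = from_str::<RawJob>(raw)?.validated()?; // indexes ./payload.bin in ufs
    let fat = join_payload(thin)?; // reads the bytes back into a FatJob
    client.upload_jobs(fat).await?; // ships job + payload in one request
    Ok(())
}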

bin/u_server/Cargo.toml | 1
@ -9,6 +9,7 @@ anyhow = { workspace = true }
diesel = { workspace = true }
deadpool-diesel = { workspace = true }
hyper = "0.14"
mime_guess = { workspace = true }
once_cell = "1.7.2"
openssl = { workspace = true }
serde = { workspace = true }

bin/u_server/src/db.rs | 131
@ -1,7 +1,8 @@
use crate::error::Error;
use diesel::{pg::PgConnection, prelude::*, result::Error as DslError, Connection};
use std::mem::drop;
use u_lib::db::PgAsyncPool;
use u_lib::models::{schema, Agent, AssignedJob, JobState, ThinJobMeta};
use u_lib::models::{schema, Agent, AssignedJob, JobModel, JobState, PayloadMeta, ThinJob};
use u_lib::platform::Platform;
use u_lib::types::Id;
@ -47,27 +48,46 @@ pub struct UDB<'c> {
}
impl UDB<'_> {
pub fn insert_jobs(&mut self, job_metas: &[ThinJobMeta]) -> Result<Vec<Id>> {
use schema::jobs;
pub fn insert_jobs(&mut self, jobs: &[ThinJob]) -> Result<()> {
use schema::{jobs, payloads};
let (jobs, payloads_opt): (Vec<_>, Vec<_>) = jobs
.iter()
.map(|j| (&j.job, j.payload_meta.as_ref()))
.unzip();
let payloads = payloads_opt
.into_iter()
.filter_map(|p| p)
.collect::<Vec<_>>();
diesel::insert_into(payloads::table)
.values(payloads)
.execute(self.conn)
.map(drop)
.map_err(with_err_ctx("Can't insert payloads"))?;
diesel::insert_into(jobs::table)
.values(job_metas)
.get_results(self.conn)
.map(|rows| rows.iter().map(|job: &ThinJobMeta| job.id).collect())
.values(jobs)
.execute(self.conn)
.map(drop)
.map_err(with_err_ctx("Can't insert jobs"))
}
pub fn get_job(&mut self, id: Id) -> Result<Option<ThinJobMeta>> {
use schema::jobs;
pub fn get_job(&mut self, id: Id) -> Result<Option<ThinJob>> {
use schema::{jobs, payloads};
jobs::table
let maybe_job_with_payload = jobs::table
.left_join(payloads::table)
.filter(jobs::id.eq(id))
.first(self.conn)
.first::<(JobModel, Option<PayloadMeta>)>(self.conn)
.optional()
.map_err(with_err_ctx(format!("Can't get job {id}")))
.map_err(with_err_ctx(format!("Can't get job {id}")))?;
Ok(maybe_job_with_payload.map(|(job, payload_meta)| ThinJob { job, payload_meta }))
}
pub fn get_jobs(&mut self) -> Result<Vec<ThinJobMeta>> {
pub fn get_jobs(&mut self) -> Result<Vec<JobModel>> {
use schema::jobs;
jobs::table
@ -75,27 +95,17 @@ impl UDB<'_> {
.map_err(with_err_ctx("Can't get jobs"))
}
pub fn find_job_by_alias(&mut self, alias: &str) -> Result<Option<ThinJobMeta>> {
use schema::jobs;
pub fn find_job_by_alias(&mut self, alias: &str) -> Result<Option<ThinJob>> {
use schema::{jobs, payloads};
jobs::table
let maybe_job_with_payload = jobs::table
.left_join(payloads::table)
.filter(jobs::alias.eq(alias))
.first(self.conn)
.first::<(JobModel, Option<PayloadMeta>)>(self.conn)
.optional()
.map_err(with_err_ctx(format!("Can't find job by alias {alias}")))
}
pub fn insert_agent(&mut self, agent: &Agent) -> Result<()> {
use schema::agents;
.map_err(with_err_ctx(format!("Can't get job by alias {alias}")))?;
diesel::insert_into(agents::table)
.values(agent)
.on_conflict(agents::id)
.do_update()
.set(agent)
.execute(self.conn)
.map_err(with_err_ctx(format!("Can't insert agent {agent:?}")))?;
Ok(())
Ok(maybe_job_with_payload.map(|(job, payload_meta)| ThinJob { job, payload_meta }))
}
pub fn insert_result(&mut self, result: &AssignedJob) -> Result<()> {
@ -163,7 +173,7 @@ impl UDB<'_> {
Ok(result)
}
pub fn set_jobs_for_agent(&mut self, agent_id: Id, job_ids: &[Id]) -> Result<Vec<Id>> {
pub fn set_jobs_for_agent(&mut self, agent_id: Id, job_ids: &[Id]) -> Result<()> {
use schema::{jobs, results};
let agent_platform = match self.get_agent(agent_id)? {
@ -203,64 +213,51 @@ impl UDB<'_> {
diesel::insert_into(results::table)
.values(&job_requests)
.execute(self.conn)
.map(drop)
.map_err(with_err_ctx(format!(
"Can't setup jobs {job_ids:?} for agent {agent_id:?}"
)))?;
Ok(job_requests.iter().map(|aj| aj.id).collect())
)))
}
pub fn del_jobs(&mut self, ids: &[Id]) -> Result<usize> {
pub fn del_jobs(&mut self, ids: &[Id]) -> Result<()> {
use schema::jobs;
let mut affected = 0;
for id in ids {
let deleted = diesel::delete(jobs::table)
.filter(jobs::id.eq(id))
.execute(self.conn)
.map_err(with_err_ctx("Can't delete jobs"))?;
affected += deleted;
}
Ok(affected)
diesel::delete(jobs::table)
.filter(jobs::id.eq_any(ids))
.execute(self.conn)
.map(drop)
.map_err(with_err_ctx("Can't delete jobs"))
}
pub fn del_results(&mut self, ids: &[Id]) -> Result<usize> {
pub fn del_results(&mut self, ids: &[Id]) -> Result<()> {
use schema::results;
let mut affected = 0;
for id in ids {
let deleted = diesel::delete(results::table)
.filter(results::id.eq(id))
.execute(self.conn)
.map_err(with_err_ctx("Can't delete results"))?;
affected += deleted;
}
Ok(affected)
diesel::delete(results::table)
.filter(results::id.eq_any(ids))
.execute(self.conn)
.map(drop)
.map_err(with_err_ctx("Can't delete results"))
}
pub fn del_agents(&mut self, ids: &[Id]) -> Result<usize> {
pub fn del_agents(&mut self, ids: &[Id]) -> Result<()> {
use schema::agents;
let mut affected = 0;
for id in ids {
let deleted = diesel::delete(agents::table)
.filter(agents::id.eq(id))
.execute(self.conn)
.map_err(with_err_ctx("Can't delete agents"))?;
affected += deleted;
}
Ok(affected)
diesel::delete(agents::table)
.filter(agents::id.eq_any(ids))
.execute(self.conn)
.map(drop)
.map_err(with_err_ctx("Can't delete agents"))
}
pub fn update_agent(&mut self, agent: &Agent) -> Result<()> {
pub fn upsert_agent(&mut self, agent: &Agent) -> Result<()> {
agent
.save_changes::<Agent>(self.conn)
.map_err(with_err_ctx(format!("Can't update agent {agent:?}")))?;
.map_err(with_err_ctx(format!("Can't upsert agent {agent:?}")))?;
Ok(())
}
pub fn update_job(&mut self, job: &ThinJobMeta) -> Result<()> {
job.save_changes::<ThinJobMeta>(self.conn)
pub fn update_job(&mut self, job: &JobModel) -> Result<()> {
job.save_changes::<JobModel>(self.conn)
.map_err(with_err_ctx(format!("Can't update job {job:?}")))?;
Ok(())
}
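
Two patterns worth noting in this rewrite: payload rows are inserted before the job rows that reference them (satisfying the new jobs.payload foreign key), and the old per-id delete loops collapse into single eq_any statements, with the affected-row counts dropped from the return types. A sketch of the batched delete, assuming a diesel 2 PgConnection:

use diesel::prelude::*;
use u_lib::models::schema::jobs;
use u_lib::types::Id;

fn delete_jobs(conn: &mut PgConnection, ids: &[Id]) -> QueryResult<()> {
    // One round trip instead of N: DELETE FROM jobs WHERE id = ANY($1)
    diesel::delete(jobs::table)
        .filter(jobs::id.eq_any(ids))
        .execute(conn)
        .map(drop)
}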

bin/u_server/src/handlers.rs | 94
@ -2,19 +2,25 @@ use std::sync::Arc;
use crate::db::{PgRepo, UDB};
use crate::error::Error;
use crate::ValidJobMeta;
use serde::Deserialize;
use u_lib::jobs::join_payload;
use u_lib::{
jobs::{fat_meta_to_thin, thin_meta_to_fat},
jobs::split_payload,
messaging::{AsMsg, Reportable},
misc::OneOrVec,
models::*,
types::Id
types::Id,
};
use warp::reject::not_found;
use warp::Rejection;
type EndpResult<T> = Result<T, Rejection>;
#[derive(Deserialize)]
pub struct GetJobQuery {
force_payload: bool,
}
pub struct Endpoints;
impl Endpoints {
@ -35,25 +41,32 @@ impl Endpoints {
.map_err(From::from)
}
pub async fn get_job(repo: Arc<PgRepo>, id: Id) -> EndpResult<ValidJobMeta> {
pub async fn get_job(repo: Arc<PgRepo>, id: Id, params: GetJobQuery) -> EndpResult<FatJob> {
let Some(job) = repo.interact(move |mut db| db.get_job(id)).await? else {
return Err(not_found())
};
let fat_meta = thin_meta_to_fat(job).map_err(Error::from)?;
Ok(fat_meta)
if let Some(meta) = &job.payload_meta {
let max_readable_payload_size = 1024 * 8;
if !params.force_payload && meta.size > max_readable_payload_size {
return Ok(FatJob {
job: job.job,
payload_meta: job.payload_meta,
payload_data: None,
});
}
}
Ok(join_payload(job).map_err(Error::from)?)
}
pub async fn get_jobs(repo: Arc<PgRepo>) -> EndpResult<Vec<ThinJobMeta>> {
pub async fn get_jobs(repo: Arc<PgRepo>) -> EndpResult<Vec<JobModel>> {
repo.interact(move |mut db| db.get_jobs())
.await
.map_err(From::from)
}
pub async fn get_agent_jobs(
repo: Arc<PgRepo>,
id: Option<Id>,
) -> EndpResult<Vec<AssignedJob>> {
pub async fn get_agent_jobs(repo: Arc<PgRepo>, id: Option<Id>) -> EndpResult<Vec<AssignedJob>> {
repo.interact(move |mut db| db.get_exact_jobs(id, false))
.await
.map_err(From::from)
@ -65,18 +78,18 @@ impl Endpoints {
match agent {
Some(mut agent) => {
agent.touch();
db.update_agent(&agent)?;
db.upsert_agent(&agent)?;
}
None => {
let new_agent = Agent::with_id(id);
db.insert_agent(&new_agent)?;
db.upsert_agent(&new_agent)?;
let job = db
.find_job_by_alias("agent_hello")?
.expect("agent_hello job not found");
db.set_jobs_for_agent(id, &[job.id])?;
db.set_jobs_for_agent(id, &[job.job.id])?;
}
}
@ -92,30 +105,23 @@ impl Endpoints {
.map_err(From::from)
}
pub async fn upload_jobs(
repo: Arc<PgRepo>,
msg: Vec<ValidJobMeta>,
) -> EndpResult<Vec<Id>> {
pub async fn upload_jobs(repo: Arc<PgRepo>, msg: Vec<FatJob>) -> EndpResult<()> {
let jobs = msg
.into_iter()
.map(|meta| Ok(fat_meta_to_thin(meta)?))
.collect::<Result<Vec<ThinJobMeta>, Error>>()?;
.map(|meta| Ok(split_payload(meta)?))
.collect::<Result<Vec<ThinJob>, Error>>()?;
repo.interact(move |mut db| db.insert_jobs(&jobs))
.await
.map_err(From::from)
}
pub async fn del(repo: Arc<PgRepo>, id: Id) -> EndpResult<usize> {
pub async fn del(repo: Arc<PgRepo>, id: Id) -> EndpResult<()> {
repo.transaction(move |mut db| {
let del_fns = &[UDB::del_agents, UDB::del_jobs, UDB::del_results];
for del_fn in del_fns {
let affected = del_fn(&mut db, &[id])?;
if affected > 0 {
return Ok(affected);
}
}
Ok(0)
[UDB::del_agents, UDB::del_jobs, UDB::del_results]
.iter()
.map(|f| f(&mut db, &[id]))
.collect::<Result<(), Error>>()
})
.await
.map_err(From::from)
@ -125,7 +131,7 @@ impl Endpoints {
repo: Arc<PgRepo>,
agent_id: Id,
job_idents: Vec<String>,
) -> EndpResult<Vec<Id>> {
) -> EndpResult<()> {
repo.transaction(move |mut db| {
job_idents
.into_iter()
@ -134,7 +140,7 @@ impl Endpoints {
let job_from_db = db.find_job_by_alias(&ident);
match job_from_db {
Ok(job) => match job {
Some(j) => Ok(j.id),
Some(j) => Ok(j.job.id),
None => {
Err(Error::ProcessingError(format!("unknown ident {ident}")))
}
@ -153,7 +159,7 @@ impl Endpoints {
pub async fn report<Data: OneOrVec<Reportable> + AsMsg + Send + Sync + 'static>(
repo: Arc<PgRepo>,
msg: Data,
agent_id: Id
agent_id: Id,
) -> EndpResult<()> {
repo.transaction(move |mut db| {
for entry in msg.into_vec() {
@ -182,7 +188,7 @@ impl Endpoints {
}
};
agent.state = AgentState::Active;
db.insert_agent(&agent)?;
db.upsert_agent(&agent)?;
}
None => error!("Empty agent data"),
}},
@ -206,29 +212,17 @@ impl Endpoints {
.map_err(From::from)
}
pub async fn update_agent(
repo: Arc<PgRepo>,
agent: Agent,
) -> EndpResult<()> {
repo.interact(move |mut db| db.update_agent(&agent))
.await?;
pub async fn update_agent(repo: Arc<PgRepo>, agent: Agent) -> EndpResult<()> {
repo.interact(move |mut db| db.upsert_agent(&agent)).await?;
Ok(())
}
pub async fn update_job(
repo: Arc<PgRepo>,
job: ValidJobMeta,
) -> EndpResult<()> {
let thin_meta = fat_meta_to_thin(job).map_err(Error::from)?;
repo.interact(move |mut db| db.update_job(&thin_meta))
.await?;
pub async fn update_job(repo: Arc<PgRepo>, job: JobModel) -> EndpResult<()> {
repo.interact(move |mut db| db.update_job(&job)).await?;
Ok(())
}
pub async fn update_assigned_job(
repo: Arc<PgRepo>,
assigned: AssignedJob,
) -> EndpResult<()> {
pub async fn update_assigned_job(repo: Arc<PgRepo>, assigned: AssignedJob) -> EndpResult<()> {
repo.interact(move |mut db| db.update_result(&assigned))
.await?;
Ok(())
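
The new GetJobQuery gives callers control over payload inlining: small payloads (under the 8 KiB threshold above) are joined into the response, larger ones only when force_payload is set. A hedged client-side sketch; the URL shape follows the get_job route, the host is a placeholder, and real traffic goes through ClientHandler with its TLS setup rather than a bare reqwest client:

use u_lib::models::FatJob;
use u_lib::types::Id;

async fn fetch_job(server: &str, id: Id) -> reqwest::Result<FatJob> {
    // force_payload is a required query flag; with force_payload=false,
    // payloads over 8 KiB come back with payload_data: None.
    let url = format!("https://{server}/get_job/{id}?force_payload=true");
    reqwest::Client::new().get(url).send().await?.json().await
}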

bin/u_server/src/u_server.rs | 22
@ -15,7 +15,6 @@ use std::{convert::Infallible, sync::Arc};
use u_lib::{
config,
db::async_pool,
jobs::fat_meta_to_thin,
messaging::{AsMsg, Reportable},
models::*,
types::Id,
@ -27,9 +26,7 @@ use warp::{
Filter, Rejection, Reply,
};
use crate::handlers::Endpoints;
type ValidJobMeta = FatJobMeta<true>;
use crate::handlers::{Endpoints, GetJobQuery};
fn into_message<M: AsMsg>(msg: M) -> Json {
json(&msg)
@ -55,13 +52,14 @@ pub fn init_endpoints(
let upload_jobs = path("upload_jobs")
.and(with_db.clone())
.and(body::json::<Vec<ValidJobMeta>>())
.and(body::json::<Vec<FatJob>>())
.and_then(Endpoints::upload_jobs)
.map(into_message);
let get_job = path("get_job")
.and(with_db.clone())
.and(warp::path::param::<Id>())
.and(warp::query::<GetJobQuery>())
.and_then(Endpoints::get_job)
.map(into_message);
@ -110,7 +108,7 @@ pub fn init_endpoints(
let update_job = path("update_job")
.and(with_db.clone())
.and(body::json::<ValidJobMeta>())
.and(body::json::<JobModel>())
.and_then(Endpoints::update_job)
.map(ok);
@ -156,13 +154,11 @@ pub async fn preload_jobs(repo: &PgRepo) -> Result<(), ServerError> {
let job_alias = "agent_hello";
let if_job_exists = db.find_job_by_alias(job_alias)?;
if if_job_exists.is_none() {
let agent_hello = fat_meta_to_thin(
FatJobMeta::builder()
.with_type(JobType::Init)
.with_alias(job_alias)
.build()
.unwrap(),
)?;
let agent_hello = RawJob::builder()
.with_type(JobType::Init)
.with_alias(job_alias)
.build()
.unwrap();
db.insert_jobs(&[agent_hello])?;
}
Ok(())

integration/Cargo.toml | 1
@ -7,6 +7,7 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
futures = { version = "0.3", features = ["executor"] }
once_cell = "1.10.0"
reqwest = { workspace = true }
rstest = "0.12"

integration/tests/fixtures/agent.rs | 53
@ -1,36 +1,35 @@
use crate::helpers::ENV;
use super::connections::*;
use super::run_async;
use u_lib::{
api::ClientHandler, config::get_self_id, jobs::fat_meta_to_thin, messaging::Reportable,
models::*, types::Id,
api::ClientHandler, config::get_self_id, jobs::split_payload, messaging::Reportable, models::*,
types::Id,
};
pub struct RegisteredAgent {
pub id: Id,
}
impl RegisteredAgent {
pub async fn unregister(self) {
let cli = ClientHandler::new(&ENV.u_server, None).await.unwrap();
cli.del(self.id).await.unwrap();
}
}
#[fixture]
pub async fn register_agent() -> RegisteredAgent {
let cli = ClientHandler::new(&ENV.u_server, None).await.unwrap();
let agent_id = get_self_id();
println!("registering agent {agent_id}");
let resp = cli
.get_personal_jobs(agent_id)
.await
.unwrap()
.pop()
.unwrap();
let job_id = resp.job_id;
let job = cli.get_job(job_id).await.unwrap();
assert_eq!(job.alias, Some("agent_hello".to_string()));
let mut agent_data = AssignedJob::from((&fat_meta_to_thin(job).unwrap(), resp));
agent_data.set_result(&Agent::with_id(agent_id));
cli.report(Reportable::Assigned(agent_data)).await.unwrap();
RegisteredAgent { id: agent_id }
#[once]
pub fn registered_agent(client: &ClientHandler) -> RegisteredAgent {
run_async(async {
let agent_id = get_self_id();
println!("registering agent {agent_id}");
let resp = client
.get_personal_jobs(agent_id)
.await
.unwrap()
.pop()
.unwrap();
let job_id = resp.job_id;
let job = client.get_job(job_id).await.unwrap();
assert_eq!(job.job.alias, Some("agent_hello".to_string()));
let mut agent_data = AssignedJob::from((&split_payload(job).unwrap().job, resp));
agent_data.set_result(&Agent::with_id(agent_id));
client
.report(Reportable::Assigned(agent_data))
.await
.unwrap();
RegisteredAgent { id: agent_id }
})
}

integration/tests/fixtures/connections.rs | 28
@ -0,0 +1,28 @@
use super::env::*;
use super::run_async;
pub use u_lib::api::ClientHandler;
use u_lib::db::unpooled;
pub use u_lib::db::PgConnection;
#[fixture]
#[once]
pub fn client(env_default: EndpointsEnv) -> ClientHandler {
run_async(ClientHandler::new(&env_default.u_server, None)).unwrap()
}
#[fixture]
#[once]
pub fn client_panel(env_access: AccessEnv) -> ClientHandler {
run_async(ClientHandler::new(
&env_access.u_server,
Some(env_access.admin_auth_token),
))
.unwrap()
}
#[fixture]
#[once]
pub fn db(env_db: DBEnv) -> PgConnection {
unpooled(&env_db)
}

integration/tests/fixtures/env.rs | 16
@ -0,0 +1,16 @@
pub use u_lib::config::{AccessEnv, DBEnv, EndpointsEnv};
#[fixture]
pub fn env_default() -> EndpointsEnv {
EndpointsEnv::load()
}
#[fixture]
pub fn env_access() -> AccessEnv {
AccessEnv::load().unwrap()
}
#[fixture]
pub fn env_db() -> DBEnv {
DBEnv::load().unwrap()
}

integration/tests/fixtures/mod.rs | 8
@ -1 +1,9 @@
pub mod agent;
pub mod connections;
pub mod env;
use std::future::Future;
fn run_async<R>(fut: impl Future<Output = R>) -> R {
futures::executor::block_on(fut)
}
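
These are rstest #[once] fixtures: each is resolved a single time per test binary and handed to tests as a &'static reference, which is why registered_agent takes &ClientHandler and wraps its async body in run_async. Illustrative usage in a test (the test itself is hypothetical):

use rstest::rstest;

use crate::fixtures::connections::*;

#[rstest]
#[tokio::test]
async fn lists_jobs(client: &ClientHandler) {
    // `client` is &'static ClientHandler; ClientHandler::new already ran
    // inside run_async() when the #[once] fixture was first resolved.
    let jobs = client.get_jobs().await.unwrap();
    assert!(jobs.iter().all(|j| !j.argv.is_empty()));
}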

integration/tests/helpers/mod.rs | 5
@ -2,8 +2,3 @@ pub mod jobs;
pub mod panel;
pub use panel::Panel;
use once_cell::sync::Lazy;
use u_lib::config::EndpointsEnv;
pub static ENV: Lazy<EndpointsEnv> = Lazy::new(|| EndpointsEnv::load());

integration/tests/integration/behaviour.rs | 11
@ -9,12 +9,11 @@ use uuid::Uuid;
#[rstest]
#[tokio::test]
async fn registration(#[future] register_agent: RegisteredAgent) {
let agent = register_agent.await;
async fn registration(registered_agent: &RegisteredAgent) {
let agents: Vec<Agent> = Panel::check_output("agents read");
let found = agents.iter().find(|v| v.id == agent.id);
let found = agents.iter().find(|v| v.id == registered_agent.id);
assert!(found.is_some());
Panel::check_status(format!("agents delete {}", agent.id));
Panel::check_status(format!("agents delete {}", registered_agent.id));
}
#[tokio::test]
@ -52,14 +51,14 @@ async fn large_payload() {
let job_alias = "large_payload";
let job = FatJobMeta::builder()
let job = RawJob::builder()
.with_alias(job_alias)
.with_payload_path("./tests/bin/echoer")
.with_shell("{} type echo")
.build()
.unwrap();
Panel::check_status(["jobs", "create", &to_string(&job).unwrap()]);
Panel::check_status(["jobs", "create", &to_string(&RawJob::from(job)).unwrap()]);
let cmd = format!("map create {agent_id} {job_alias}");
let assigned_ids: Vec<Uuid> = Panel::check_output(cmd);

integration/tests/integration/connection.rs | 7
@ -1,14 +1,15 @@
use crate::helpers::ENV;
use crate::fixtures::env::*;
use u_lib::config::MASTER_PORT;
#[rstest]
#[tokio::test]
async fn non_auth_connection_dropped() {
async fn non_auth_connection_dropped(env_default: EndpointsEnv) {
let client = reqwest::ClientBuilder::new()
.danger_accept_invalid_certs(true)
.build()
.unwrap();
match client
.get(format!("https://{}:{}", &ENV.u_server, MASTER_PORT))
.get(format!("https://{}:{}", &env_default.u_server, MASTER_PORT))
.send()
.await
{

lib/u_lib/src/api.rs | 18
@ -75,11 +75,11 @@ impl ClientHandler {
})
}
async fn req<R: AsMsg + DeserializeOwned + Default>(&self, url: impl AsRef<str>) -> Result<R> {
async fn req<R: AsMsg + DeserializeOwned>(&self, url: impl AsRef<str>) -> Result<R> {
self.req_with_payload(url, ()).await
}
async fn req_with_payload<P: AsMsg, R: AsMsg + DeserializeOwned + Default>(
async fn req_with_payload<P: AsMsg, R: AsMsg + DeserializeOwned>(
&self,
url: impl AsRef<str>,
payload: P,
@ -93,7 +93,6 @@ impl ClientHandler {
.send()
.await
.context("error while sending request")?;
let content_len = response.content_length();
let is_success = match response.error_for_status_ref() {
Ok(_) => Ok(()),
Err(e) => Err(UError::from(e)),
@ -101,10 +100,7 @@ impl ClientHandler {
let resp = response.text().await.context("resp")?;
let result = match is_success {
Ok(_) => from_str::<R>(&resp).or_else(|e| match content_len {
Some(0) => Ok(Default::default()),
_ => Err(UError::NetError(e.to_string(), resp)),
}),
Ok(_) => from_str::<R>(&resp).map_err(|e| UError::NetError(e.to_string(), resp)),
Err(UError::NetError(err, _)) => Err(UError::NetError(err, resp)),
_ => unreachable!(),
}
@ -131,12 +127,12 @@ impl ClientHandler {
}
/// get exact job
pub async fn get_job(&self, job: Id) -> Result<FatJobMeta<true>> {
pub async fn get_job(&self, job: Id) -> Result<FatJob> {
self.req(format!("get_job/{job}")).await
}
/// get all available jobs
pub async fn get_jobs(&self) -> Result<Vec<ThinJobMeta>> {
pub async fn get_jobs(&self) -> Result<Vec<JobModel>> {
self.req("get_jobs").await
}
}
@ -156,7 +152,7 @@ impl ClientHandler {
}
/// update job
pub async fn update_job(&self, job: FatJobMeta<true>) -> Result<()> {
pub async fn update_job(&self, job: FatJob) -> Result<()> {
self.req_with_payload("update_job", job).await
}
@ -166,7 +162,7 @@ impl ClientHandler {
}
/// create and upload job
pub async fn upload_jobs(&self, payload: impl OneOrVec<FatJobMeta<true>>) -> Result<Vec<Id>> {
pub async fn upload_jobs(&self, payload: impl OneOrVec<FatJob>) -> Result<Vec<Id>> {
self.req_with_payload("upload_jobs", payload.into_vec())
.await
}

lib/u_lib/src/cache.rs | 11
@ -1,10 +1,11 @@
use crate::models::ThinJobMeta;
use crate::models::ThinJob;
use crate::types::Id;
use lazy_static::lazy_static;
use parking_lot::{RwLock, RwLockReadGuard};
use std::{collections::HashMap, ops::Deref};
type Cache = HashMap<Id, ThinJobMeta>;
type Val = ThinJob;
type Cache = HashMap<Id, Val>;
lazy_static! {
static ref JOB_CACHE: RwLock<Cache> = RwLock::new(HashMap::new());
@ -13,8 +14,8 @@ lazy_static! {
pub struct JobCache;
impl JobCache {
pub fn insert(job_meta: ThinJobMeta) {
JOB_CACHE.write().insert(job_meta.id, job_meta);
pub fn insert(job: Val) {
JOB_CACHE.write().insert(job.job.id, job);
}
pub fn contains(id: Id) -> bool {
@ -37,7 +38,7 @@ impl JobCache {
pub struct JobCacheHolder<'jh>(pub RwLockReadGuard<'jh, Cache>, pub Id);
impl<'jh> Deref for JobCacheHolder<'jh> {
type Target = ThinJobMeta;
type Target = Val;
fn deref(&self) -> &Self::Target {
self.0.get(&self.1).unwrap()

lib/u_lib/src/db.rs | 9
@ -1,5 +1,6 @@
use deadpool_diesel::{Manager as DManager, Pool as DPool, Runtime};
use diesel::pg::PgConnection;
pub use diesel::pg::PgConnection;
use diesel::Connection;
use std::time::Duration;
use crate::config::DBEnv;
@ -24,3 +25,9 @@ pub fn async_pool(config: &DBEnv) -> PgAsyncPool {
.build()
.unwrap()
}
pub fn unpooled(config: &DBEnv) -> PgConnection {
let db_url = generate_postgres_url(config);
PgConnection::establish(&db_url).unwrap()
}

lib/u_lib/src/jobs.rs | 193
@ -2,7 +2,7 @@ use crate::{
combined_result::CombinedResult,
executor::{ExecResult, Waiter},
misc::OneOrVec,
models::{Agent, AssignedJob, AssignedJobById, FatJobMeta, JobType, Payload, ThinJobMeta},
models::{Agent, AssignedJob, AssignedJobById, FatJob, JobType, RawJob, ThinJob},
proc_output::ProcOutput,
ufs,
};
@ -16,11 +16,10 @@ pub struct AnonymousJobBatch {
}
impl AnonymousJobBatch {
pub fn from_meta_with_id(jobs: impl OneOrVec<(ThinJobMeta, AssignedJobById)>) -> Self {
let jobs = jobs.into_vec();
pub fn from_meta_with_id(jobs: impl OneOrVec<(ThinJob, AssignedJobById)>) -> Self {
let mut waiter = Waiter::new();
for (meta, job) in jobs {
waiter.push(run_assigned_job(meta, job));
for (job, ids) in jobs.into_vec() {
waiter.push(run_assigned_job(job, ids));
}
Self {
waiter,
@ -28,14 +27,14 @@ impl AnonymousJobBatch {
}
}
pub fn from_meta(metas: impl OneOrVec<ThinJobMeta>) -> Self {
let jobs: Vec<_> = metas
pub fn from_meta(jobs: impl OneOrVec<ThinJob>) -> Self {
let jobs_ids: Vec<_> = jobs
.into_vec()
.into_iter()
.map(|meta| {
let job_id = meta.id;
.map(|job| {
let job_id = job.job.id;
(
meta,
job,
AssignedJobById {
job_id,
..Default::default()
@ -43,7 +42,7 @@ impl AnonymousJobBatch {
)
})
.collect();
AnonymousJobBatch::from_meta_with_id(jobs)
AnonymousJobBatch::from_meta_with_id(jobs_ids)
}
/// Spawn jobs
@ -85,18 +84,8 @@ impl NamedJobBatch {
.into_vec()
.into_iter()
.filter_map(|(alias, cmd)| {
match FatJobMeta::builder()
.with_shell(cmd)
.with_alias(alias)
.build()
{
Ok(fat_meta) => match fat_meta_to_thin(fat_meta) {
Ok(thin_meta) => Some(thin_meta),
Err(e) => {
result.err(e);
None
}
},
match RawJob::builder().with_shell(cmd).with_alias(alias).build() {
Ok(jpm) => Some(jpm),
Err(e) => {
result.err(e);
None
@ -108,14 +97,14 @@ impl NamedJobBatch {
result
}
pub fn from_meta(named_jobs: impl OneOrVec<ThinJobMeta>) -> Self {
let (job_names, job_metas): (Vec<_>, Vec<_>) = named_jobs
pub fn from_meta(named_jobs: impl OneOrVec<ThinJob>) -> Self {
let (job_names, jobs): (Vec<_>, Vec<_>) = named_jobs
.into_vec()
.into_iter()
.map(|meta| (meta.alias.clone().unwrap(), meta))
.map(|job| (job.job.alias.clone().unwrap(), job))
.unzip();
Self {
runner: Some(AnonymousJobBatch::from_meta(job_metas)),
runner: Some(AnonymousJobBatch::from_meta(jobs)),
job_names,
results: HashMap::new(),
}
@ -145,17 +134,18 @@ impl NamedJobBatch<true> {
}
}
pub async fn run_assigned_job(meta: ThinJobMeta, ids: AssignedJobById) -> ExecResult {
let mut job = AssignedJob::from((&meta, ids));
match meta.exec_type {
pub async fn run_assigned_job(job: ThinJob, ids: AssignedJobById) -> ExecResult {
let ThinJob { job, payload_meta } = job;
let mut result = AssignedJob::from((&job, ids));
match job.exec_type {
JobType::Shell => {
let (argv, _prepared_payload) = {
if let Some(ref payload) = meta.payload {
let (prep_exec, prep_exec_path) = ufs::prepare_executable(payload)?;
let argv_with_exec = meta.argv.replace("{}", &prep_exec_path);
if let Some(meta) = payload_meta {
let (prep_exec, prep_exec_path) = ufs::prepare_executable(meta.name)?;
let argv_with_exec = job.argv.replace("{}", &prep_exec_path);
(argv_with_exec, Some(prep_exec))
} else {
(meta.argv, None)
(job.argv, None)
}
};
@ -175,69 +165,55 @@ pub async fn run_assigned_job(meta: ThinJobMeta, ids: AssignedJobById) -> ExecRe
None,
),
};
job.result = Some(data);
job.retcode = retcode;
result.result = Some(data);
result.retcode = retcode;
}
JobType::Init => {
job.set_result(&Agent::run().await);
job.retcode = Some(0);
result.set_result(&Agent::run().await);
result.retcode = Some(0);
}
JobType::Service => todo!(),
JobType::Update => todo!(),
JobType::Terminate => exit(0),
};
Ok(job)
Ok(result)
}
pub fn fat_meta_to_thin(meta: FatJobMeta<true>) -> Result<ThinJobMeta, ufs::Error> {
let payload_ident = if let Some(mut payload) = meta.payload {
let job_name = match &meta.alias {
Some(a) => a.to_string(),
None => meta.id.simple().to_string(),
};
payload.write_self_into(&job_name)?;
Some(job_name)
} else {
None
};
pub fn split_payload(job: FatJob) -> Result<ThinJob, ufs::Error> {
let FatJob {
job,
payload_meta,
payload_data,
} = job;
Ok(ThinJobMeta {
alias: meta.alias,
argv: meta.argv,
id: meta.id,
exec_type: meta.exec_type,
platform: meta.platform,
payload: payload_ident,
schedule: meta.schedule,
})
}
if let Some(meta) = &payload_meta {
if !ufs::in_index(&meta.name) {
ufs::put(&meta.name, payload_data.unwrap())?;
}
}
pub fn thin_meta_to_fat(meta: ThinJobMeta) -> Result<FatJobMeta<true>, ufs::Error> {
let payload = if let Some(payload) = meta.payload {
let mut fat_payload = Payload::Ident(payload);
fat_payload.read_into_self()?;
Some(fat_payload)
} else {
None
};
Ok(ThinJob { job, payload_meta })
}
Ok(FatJobMeta {
alias: meta.alias,
argv: meta.argv,
id: meta.id,
exec_type: meta.exec_type,
platform: meta.platform,
payload,
schedule: meta.schedule,
pub fn join_payload(job: ThinJob) -> Result<FatJob, ufs::Error> {
let ThinJob { job, payload_meta } = job;
let payload_data = payload_meta
.as_ref()
.map(|p| ufs::read(&p.name))
.transpose()?;
Ok(FatJob {
job,
payload_meta,
payload_data,
})
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{
jobs::{AnonymousJobBatch, NamedJobBatch},
models::{misc::JobType, FatJobMeta},
models::{misc::JobType, RawJob},
unwrap_enum, UError,
};
use std::time::SystemTime;
@ -247,10 +223,8 @@ mod tests {
#[tokio::test]
async fn test_is_really_async() {
const SLEEP_SECS: u64 = 1;
let job = FatJobMeta::from_shell(format!("sleep {SLEEP_SECS}")).unwrap();
let sleep_jobs = (0..50)
.map(|_| fat_meta_to_thin(job.clone()).unwrap())
.collect::<Vec<_>>();
let job = RawJob::from_shell(format!("sleep {SLEEP_SECS}")).unwrap();
let sleep_jobs = (0..50).map(|_| job.clone()).collect::<Vec<_>>();
let now = SystemTime::now();
AnonymousJobBatch::from_meta(sleep_jobs).wait().await;
@ -287,11 +261,11 @@ mod tests {
#[case] payload: Option<&[u8]>,
#[case] expected_result: &str,
) -> TestResult {
let mut job = FatJobMeta::builder().with_shell(cmd);
let mut job = RawJob::builder().with_shell(cmd);
if let Some(p) = payload {
job = job.with_payload(p);
}
let job = fat_meta_to_thin(job.build().unwrap()).unwrap();
let job = job.build().unwrap();
let result = AnonymousJobBatch::from_meta(job).wait_one().await.unwrap();
let result = result.to_str_result();
assert_eq!(result.trim(), expected_result);
@ -302,23 +276,19 @@ mod tests {
async fn test_complex_load() -> TestResult {
const SLEEP_SECS: u64 = 1;
let now = SystemTime::now();
let longest_job = FatJobMeta::from_shell(format!("sleep {}", SLEEP_SECS)).unwrap();
let longest_job = AnonymousJobBatch::from_meta(fat_meta_to_thin(longest_job).unwrap())
.spawn()
.await;
let ls = AnonymousJobBatch::from_meta(
fat_meta_to_thin(FatJobMeta::from_shell("ls").unwrap()).unwrap(),
)
.wait_one()
.await
.unwrap();
let longest_job = RawJob::from_shell(format!("sleep {}", SLEEP_SECS)).unwrap();
let longest_job = AnonymousJobBatch::from_meta(longest_job).spawn().await;
let ls = AnonymousJobBatch::from_meta(RawJob::from_shell("ls").unwrap())
.wait_one()
.await
.unwrap();
assert_eq!(ls.retcode.unwrap(), 0);
let folders = ls.to_str_result();
let subfolders_jobs = folders
.lines()
.map(|f| fat_meta_to_thin(FatJobMeta::from_shell(format!("ls {f}")).unwrap()).unwrap())
.map(|f| RawJob::from_shell(format!("ls {f}")).unwrap())
.collect::<Vec<_>>();
let ls_subfolders = AnonymousJobBatch::from_meta(subfolders_jobs).wait().await;
@ -351,7 +321,7 @@ mod tests {
*/
#[tokio::test]
async fn test_failing_shell_job() -> TestResult {
let job = fat_meta_to_thin(FatJobMeta::from_shell("lol_kek_puk").unwrap()).unwrap();
let job = RawJob::from_shell("lol_kek_puk").unwrap();
let job_result = AnonymousJobBatch::from_meta(job).wait_one().await.unwrap();
let output = job_result.to_str_result();
assert!(output.contains("No such file"));
@ -368,7 +338,7 @@ mod tests {
#[case] payload: Option<&[u8]>,
#[case] err_str: &str,
) -> TestResult {
let mut job = FatJobMeta::builder().with_shell(cmd);
let mut job = RawJob::builder().with_shell(cmd);
if let Some(p) = payload {
job = job.with_payload(p);
}
@ -380,23 +350,18 @@ mod tests {
#[tokio::test]
async fn test_different_job_types() -> TestResult {
let mut jobs = NamedJobBatch::from_meta(
[
FatJobMeta::builder()
.with_shell("sleep 3")
.with_alias("sleeper")
.build()
.unwrap(),
FatJobMeta::builder()
.with_type(JobType::Init)
.with_alias("gatherer")
.build()
.unwrap(),
]
.into_iter()
.map(|meta| fat_meta_to_thin(meta).unwrap())
.collect::<Vec<_>>(),
)
let mut jobs = NamedJobBatch::from_meta(vec![
RawJob::builder()
.with_shell("sleep 3")
.with_alias("sleeper")
.build()
.unwrap(),
RawJob::builder()
.with_type(JobType::Init)
.with_alias("gatherer")
.build()
.unwrap(),
])
.wait()
.await;
let gathered = jobs.pop("gatherer").unwrap();
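
split_payload and join_payload replace the old fat_meta_to_thin/thin_meta_to_fat pair: instead of rewriting job metadata field by field, they move payload bytes between the wire form (FatJob) and the ufs-backed thin form (ThinJob). A rough round trip, assuming a FatJob that just arrived over the network:

use u_lib::jobs::{join_payload, split_payload};
use u_lib::models::{FatJob, ThinJob};
use u_lib::ufs;

fn store_and_reload(incoming: FatJob) -> Result<FatJob, ufs::Error> {
    // split_payload: writes payload_data into ufs under payload_meta.name
    // (skipped if that name is already indexed), keeping only the metadata.
    let thin: ThinJob = split_payload(incoming)?;
    // join_payload: reads the bytes back by name and reattaches them.
    join_payload(thin)
}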

lib/u_lib/src/messaging.rs | 10
@ -1,22 +1,20 @@
mod files;
use crate::models::*;
use crate::types::Id;
use crate::UError;
pub use files::*;
use serde::{Deserialize, Serialize};
use std::fmt::Debug;
/// Represents types that could be used in client-server interaction
pub trait AsMsg: Clone + Serialize + Debug {}
impl AsMsg for Agent {}
impl AsMsg for AssignedJob {}
impl AsMsg for AssignedJobById {}
impl AsMsg for DownloadInfo {}
impl AsMsg for FatJobMeta<true> {}
impl AsMsg for JobModel {}
impl AsMsg for FatJob {}
impl AsMsg for Reportable {}
impl AsMsg for String {}
impl AsMsg for ThinJobMeta {}
impl AsMsg for ThinJob {}
impl AsMsg for Id {}
impl AsMsg for i32 {}
impl AsMsg for u8 {}

lib/u_lib/src/messaging/files.rs | 8
@ -1,8 +0,0 @@
use serde::{Deserialize, Serialize};
use uuid::Uuid;
#[derive(Serialize, Deserialize, Clone, Debug, Default)]
pub struct DownloadInfo {
hashsum: String,
dl_fid: Uuid,
}

lib/u_lib/src/misc.rs | 2
@ -20,7 +20,7 @@ macro_rules! unwrap_enum {
if let $t(result) = $src {
result
} else {
panic!("wrong type {}", stringify!($t))
panic!("wrong type '{}'", stringify!($t))
}
};
}

lib/u_lib/src/models/jobs/assigned.rs | 10
@ -1,4 +1,4 @@
use super::{JobState, JobType, ThinJobMeta};
use super::{JobModel, JobState, JobType};
#[cfg(feature = "server")]
use crate::models::schema::*;
use crate::{
@ -60,8 +60,8 @@ pub struct AssignedJobById {
pub job_id: Id,
}
impl From<(&ThinJobMeta, AssignedJobById)> for AssignedJob {
fn from((meta, ids): (&ThinJobMeta, AssignedJobById)) -> Self {
impl From<(&JobModel, AssignedJobById)> for AssignedJob {
fn from((job, ids): (&JobModel, AssignedJobById)) -> Self {
let AssignedJobById {
agent_id,
id,
@ -72,8 +72,8 @@ impl From<(&ThinJobMeta, AssignedJobById)> for AssignedJob {
id,
agent_id,
job_id,
alias: meta.alias.clone(),
exec_type: meta.exec_type,
alias: job.alias.clone(),
exec_type: job.exec_type,
..Default::default()
}
}

lib/u_lib/src/models/jobs/meta.rs | 209
@ -1,23 +1,26 @@
use std::fmt;
use super::JobType;
use crate::models::payload::Payload;
use crate::conv::bytes_to_string;
#[cfg(feature = "server")]
use crate::models::schema::*;
use crate::models::PayloadMeta;
use crate::platform::Platform;
use crate::types::Id;
use crate::{UError, UResult};
use crate::{ufs, UError, UResult};
#[cfg(feature = "server")]
use diesel::{Identifiable, Insertable, Queryable};
use serde::{Deserialize, Serialize};
use std::fs::metadata;
use std::process::Command;
#[derive(Serialize, Deserialize, Clone)]
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(
feature = "server",
derive(Queryable, Identifiable, Insertable, AsChangeset),
diesel(table_name = jobs)
)]
pub struct ThinJobMeta {
pub struct JobModel {
pub alias: Option<String>,
/// string like `bash -c {} -a 1 --arg2`,
/// where {} is replaced by executable's tmp path
@ -26,27 +29,40 @@ pub struct ThinJobMeta {
pub exec_type: JobType,
/// target triple
pub platform: String,
pub payload: Option<String>,
pub payload: Option<Id>,
/// cron-like string
pub schedule: Option<String>,
}
impl fmt::Debug for ThinJobMeta {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("ThinJobMeta")
.field("alias", &self.alias)
.field("argv", &self.argv)
.field("id", &self.id.to_string())
.field("exec_type", &self.exec_type)
.field("platform", &self.platform)
.field("payload", &self.payload)
.field("schedule", &self.schedule)
.finish()
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FatJob {
pub job: JobModel,
pub payload_meta: Option<PayloadMeta>,
pub payload_data: Option<Vec<u8>>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ThinJob {
pub job: JobModel,
pub payload_meta: Option<PayloadMeta>,
}
#[derive(Serialize, Deserialize, Clone)]
pub struct FatJobMeta<const VALIDATED: bool = false> {
// impl fmt::Debug for ThinJobMeta {
// fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
// f.debug_struct("ThinJobMeta")
// .field("alias", &self.alias)
// .field("argv", &self.argv)
// .field("id", &self.id.to_string())
// .field("exec_type", &self.exec_type)
// .field("platform", &self.platform)
// .field("payload", &self.payload)
// .field("schedule", &self.schedule)
// .finish()
// }
// }
#[derive(Serialize, Deserialize, Clone, Default)]
pub struct RawJob {
#[serde(default)]
pub alias: Option<String>,
@ -66,16 +82,16 @@ pub struct FatJobMeta<const VALIDATED: bool = false> {
pub platform: String,
#[serde(default)]
pub payload: Option<Payload>,
pub payload: Option<String>,
/// cron-like string
#[serde(default)]
pub schedule: Option<String>,
}
impl<const VALIDATED: bool> fmt::Debug for FatJobMeta<VALIDATED> {
impl fmt::Debug for RawJob {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("FatJobMeta")
f.debug_struct("RawJob")
.field("alias", &self.alias)
.field("argv", &self.argv)
.field("id", &self.id.to_string())
@ -87,53 +103,75 @@ impl<const VALIDATED: bool> fmt::Debug for FatJobMeta<VALIDATED> {
}
}
impl FatJobMeta {
pub fn validated(self) -> UResult<FatJobMeta<true>> {
JobMetaBuilder { inner: self }.build()
impl From<ThinJob> for RawJob {
fn from(job: ThinJob) -> Self {
let ThinJob { job, payload_meta } = job;
RawJob {
alias: job.alias,
argv: job.argv,
id: job.id,
exec_type: job.exec_type,
platform: job.platform,
payload: payload_meta.map(|m| m.name),
schedule: job.schedule,
}
}
}
pub fn from_shell(cmd: impl Into<String>) -> UResult<FatJobMeta<true>> {
Self::builder().with_shell(cmd).build()
impl RawJob {
pub fn validated(self) -> UResult<ThinJob> {
JobBuilder {
inner: self,
raw_payload: None,
}
.build()
}
pub fn builder() -> JobMetaBuilder {
JobMetaBuilder::default()
pub fn from_shell(cmd: impl Into<String>) -> UResult<ThinJob> {
Self::builder().with_shell(cmd).build()
}
}
impl<const VALIDATED: bool> Default for FatJobMeta<VALIDATED> {
fn default() -> Self {
Self {
id: Id::new_v4(),
alias: None,
argv: String::new(),
exec_type: JobType::Shell,
platform: Platform::current_as_string(),
payload: None,
schedule: None,
}
pub fn builder<'p>() -> JobBuilder<'p> {
JobBuilder::default()
}
}
// impl Default for RawJob {
// fn default() -> Self {
// Self {
// id: Id::new_v4(),
// alias: None,
// argv: String::new(),
// exec_type: JobType::Shell,
// platform: Platform::current_as_string(),
// payload: None,
// schedule: None,
// }
// }
//}
#[derive(Default)]
pub struct JobMetaBuilder {
inner: FatJobMeta<false>,
pub struct JobBuilder<'p> {
inner: RawJob,
raw_payload: Option<&'p [u8]>,
}
impl JobMetaBuilder {
impl<'p> JobBuilder<'p> {
pub fn with_shell(mut self, shell_cmd: impl Into<String>) -> Self {
self.inner.argv = shell_cmd.into();
self.inner.exec_type = JobType::Shell;
self
}
pub fn with_payload(mut self, payload: impl Into<Vec<u8>>) -> Self {
self.inner.payload = Some(Payload::from_payload(payload));
pub fn with_payload(mut self, raw_payload: &'p [u8]) -> Self {
self.raw_payload = Some(raw_payload);
self.inner.payload = None;
self
}
pub fn with_payload_path(mut self, path: impl Into<String>) -> Self {
self.inner.payload = Some(Payload::Ident(path.into()));
self.inner.payload = Some(path.into());
self.raw_payload = None;
self
}
@ -147,16 +185,42 @@ impl JobMetaBuilder {
self
}
pub fn build(self) -> UResult<FatJobMeta<true>> {
pub fn build(self) -> UResult<ThinJob> {
let mut inner = self.inner;
let validated = |jmeta: FatJobMeta<false>| FatJobMeta::<true> {
alias: jmeta.alias,
argv: jmeta.argv,
id: jmeta.id,
exec_type: jmeta.exec_type,
platform: jmeta.platform,
payload: jmeta.payload,
schedule: jmeta.schedule,
let raw_into_job = |raw: RawJob| -> UResult<ThinJob> {
let payload_id = raw.payload.as_ref().map(|_| Id::new_v4());
Ok(ThinJob {
job: JobModel {
alias: raw.alias,
argv: raw.argv,
id: raw.id,
exec_type: raw.exec_type,
platform: raw.platform,
payload: payload_id,
schedule: raw.schedule,
},
payload_meta: raw
.payload
.map(|payload_path| {
Ok::<_, UError>(PayloadMeta {
id: payload_id.unwrap(),
mime_type: bytes_to_string(
&Command::new("file")
.arg("-b")
.arg("--mime-type")
.arg(&payload_path)
.output()
.map_err(|e| UError::JobBuildError(e.to_string()))?
.stdout,
),
name: payload_path.clone(),
size: metadata(payload_path).unwrap().len() as i64,
})
})
.transpose()?,
})
};
match inner.exec_type {
@ -180,19 +244,28 @@ impl JobMetaBuilder {
return Err(empty_err.into());
}
if let Some(payload) = &mut inner.payload {
payload.add_to_index()?;
if let Some(payload_path) = &inner.payload {
ufs::put_external(payload_path)?;
}
if let Some(raw) = self.raw_payload {
match inner.payload {
Some(_) => {
return Err(UError::JobBuildError(
"Can't use both raw payload with payload path".to_string(),
))
}
None => inner.payload = Some(ufs::create(raw)?),
}
}
match inner.payload.as_ref() {
Some(p) => {
if let Payload::Data(d) = p {
if !d.is_empty() && !inner.argv.contains("{}") {
return Err(UError::JobBuildError(
"Argv contains no executable placeholder".into(),
)
.into());
}
Some(_) => {
if !inner.argv.contains("{}") {
return Err(UError::JobBuildError(
"Argv contains no executable placeholder".into(),
)
.into());
}
}
None => {
@ -213,9 +286,9 @@ impl JobMetaBuilder {
)));
}
Ok(validated(inner))
raw_into_job(inner)
}
_ => Ok(validated(inner)),
_ => raw_into_job(inner),
}
}
}
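
The builder accepts either a payload path (indexed via ufs::put_external at build time) or raw bytes (written via ufs::create), but not both. A quick illustration; the paths and bytes are made up:

use u_lib::models::RawJob;
use u_lib::UResult;

fn build_examples() -> UResult<()> {
    // Path-based payload: the file must exist when build() runs.
    let _from_path = RawJob::builder()
        .with_shell("/bin/sh {}")
        .with_payload_path("./tests/bin/echoer")
        .build()?;

    // Raw bytes: stored in ufs under a freshly generated uuid name.
    let _from_bytes = RawJob::builder()
        .with_shell("/bin/sh {}")
        .with_payload(b"#!/bin/sh\necho ok")
        .build()?;

    // Supplying both path and bytes fails with UError::JobBuildError, as does
    // a payload-bearing argv without the "{}" placeholder.
    Ok(())
}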

lib/u_lib/src/models/payload.rs | 90
@ -1,76 +1,20 @@
use crate::{conv::bytes_to_string_truncated, ufs};
use crate::types::Id;
use serde::{Deserialize, Serialize};
use std::{fmt, path::PathBuf};
#[derive(Clone, Deserialize, Serialize)]
#[serde(untagged)]
pub enum Payload {
/// Raw payload data
Data(Vec<u8>),
/// Payload identifier in ufs
Ident(String),
}
impl Payload {
pub fn read_into_self(&mut self) -> Result<(), ufs::Error> {
match self {
Payload::Data(_) => Ok(()),
Payload::Ident(ident) => {
let data = ufs::read(ident)?;
*self = Payload::Data(data);
Ok(())
}
}
}
pub fn write_self_into(&mut self, name: impl AsRef<str>) -> Result<(), ufs::Error> {
match self {
Payload::Ident(_) => Ok(()),
Payload::Data(data) => {
ufs::put(&name, data)?;
*self = Payload::Ident(name.as_ref().to_string());
Ok(())
}
}
}
pub fn from_payload(data: impl Into<Vec<u8>>) -> Self {
Payload::Data(data.into())
}
pub fn from_path(path: impl Into<PathBuf>) -> Result<Self, ufs::Error> {
let path: PathBuf = path.into();
if !path.exists() || path.is_dir() {
return Err(ufs::Error::not_found(path));
}
ufs::put_existing(&path)?;
Ok(Payload::Ident(path.to_string_lossy().to_string()))
}
pub fn add_to_index(&self) -> Result<(), ufs::Error> {
match self {
Payload::Ident(ident) => Payload::from_path(ident).map(|_| ()),
_ => Ok(()),
}
}
}
impl fmt::Debug for Payload {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Data(data) => {
let mut dbg = &mut f.debug_tuple("Data");
let data = bytes_to_string_truncated(data, 256);
dbg = dbg.field(&data);
dbg.finish()
}
Self::Ident(ident) => f.debug_tuple("Ident").field(ident).finish(),
}
}
#[cfg(feature = "server")]
use crate::models::schema::*;
#[cfg(feature = "server")]
use diesel::Identifiable;
#[cfg_attr(
feature = "server",
derive(Insertable, Queryable, Identifiable),
diesel(table_name = payloads)
)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PayloadMeta {
pub id: Id,
pub mime_type: String,
pub name: String,
pub size: i64,
}

lib/u_lib/src/models/schema.rs | 27
@ -36,16 +36,6 @@ diesel::table! {
}
}
diesel::table! {
use crate::schema_exports::*;
certificates (id) {
agent_id -> Uuid,
id -> Uuid,
is_revoked -> Bool,
}
}
diesel::table! {
use crate::schema_exports::*;
use super::sql_types::Jobtype;
@ -56,11 +46,22 @@ diesel::table! {
id -> Uuid,
exec_type -> Jobtype,
platform -> Text,
payload -> Nullable<Text>,
payload -> Nullable<Uuid>,
schedule -> Nullable<Text>,
}
}
diesel::table! {
use crate::schema_exports::*;
payloads (id) {
id -> Uuid,
mime_type -> Text,
name -> Text,
size -> Int8,
}
}
diesel::table! {
use crate::schema_exports::*;
use super::sql_types::Jobstate;
@ -80,8 +81,8 @@ diesel::table! {
}
}
diesel::joinable!(certificates -> agents (agent_id));
diesel::joinable!(jobs -> payloads (payload));
diesel::joinable!(results -> agents (agent_id));
diesel::joinable!(results -> jobs (job_id));
diesel::allow_tables_to_appear_in_same_query!(agents, certificates, jobs, results,);
diesel::allow_tables_to_appear_in_same_query!(agents, jobs, payloads, results,);

lib/u_lib/src/ufs/mod.rs | 88
@ -1,5 +1,5 @@
// This module is aiming to store obfuscated payloads, get them by name,
// delete or prepare to execute via memfd_create (unix)
// rename, update, delete or prepare to execute via memfd_create (unix)
use once_cell::sync::Lazy;
use parking_lot::RwLock;
@ -20,6 +20,7 @@ struct FileMeta {
path: PathBuf,
obfuscated: bool,
extension: Option<OsString>,
external: bool,
}
/// Remove deleted files from index
@ -58,8 +59,18 @@ pub fn read(name: impl AsRef<str>) -> Result<Vec<u8>, Error> {
fs::read(&meta.path).map_err(|e| Error::new(e, name))
}
pub fn create(data: impl AsRef<[u8]>) -> Result<String, Error> {
let name = Uuid::new_v4().simple().to_string();
put(&name, data)?;
Ok(name)
}
/// Create new file and add to index
pub fn put(name: impl AsRef<str>, data: impl AsRef<[u8]>) -> Result<(), Error> {
sync_index();
let name = name.as_ref();
let obfuscate = !cfg!(feature = "server") && !cfg!(feature = "panel");
@ -83,21 +94,83 @@ pub fn put(name: impl AsRef<str>, data: impl AsRef<[u8]>) -> Result<(), Error> {
fs::write(&path, data).map_err(|e| Error::new(e, name))?;
let mut index = INDEX.write();
index.insert(
INDEX.write().insert(
name.to_string(),
FileMeta {
path,
obfuscated: obfuscate,
extension,
external: false,
},
);
Ok(())
}
/// Add existing file to index
pub fn put_existing(path: impl AsRef<Path>) -> Result<(), Error> {
pub fn remove(name: impl AsRef<str>) -> Result<(), Error> {
sync_index();
let name = name.as_ref();
match INDEX.write().remove(name) {
Some(value) => fs::remove_file(value.path).map_err(|e| Error::new(e, name)),
None => Ok(()),
}
}
pub fn rename(old_name: impl AsRef<str>, new_name: impl AsRef<str>) -> Result<(), Error> {
sync_index();
let old_name = old_name.as_ref();
let new_name = new_name.as_ref();
if old_name == new_name {
return Ok(());
}
if !in_index(old_name) {
return Err(Error::not_found(old_name));
}
if in_index(new_name) {
return Err(Error::already_exists(new_name));
}
let mut value = INDEX.write().remove(old_name).unwrap();
if !value.obfuscated {
let old_path = value.path.clone();
value.path.pop();
value.path.push(new_name);
fs::rename(old_path, &value.path).map_err(|e| Error::new(e, &value.path))?;
}
INDEX.write().insert(new_name.to_string(), value);
Ok(())
}
pub fn update_payload_data(name: impl AsRef<str>, data: impl AsRef<[u8]>) -> Result<(), Error> {
sync_index();
let name = name.as_ref();
let external = INDEX.read().get(name).map(|v| v.external).unwrap_or(false);
if external {
INDEX.write().remove(name);
} else if in_index(&name) {
remove(&name)?;
}
put(name, data)
}
/// Add an existing file to index
pub fn put_external(path: impl AsRef<Path>) -> Result<(), Error> {
sync_index();
let path = path.as_ref();
let path_str = path.as_os_str().to_string_lossy().to_string();
@ -109,19 +182,20 @@ pub fn put_existing(path: impl AsRef<Path>) -> Result<(), Error> {
return Err(Error::already_exists(&path));
}
let mut index = INDEX.write();
index.insert(
INDEX.write().insert(
path_str,
FileMeta {
path: path.to_owned(),
obfuscated: false,
extension: path.file_stem().map(ToOwned::to_owned),
external: true,
},
);
Ok(())
}
/// Prepare executable file: unpack, decipher if needed and send under memfd
#[cfg(unix)]
pub fn prepare_executable(name: impl AsRef<str>) -> Result<(File, String), Error> {
use libc::getpid;
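
ufs grows from a write-once store into a small managed index: create() generates uuid names, rename() and update_payload_data() mutate entries, and external files (added via put_external) are never obfuscated and never deleted from disk. A lifecycle sketch under those assumptions:

use u_lib::ufs;

fn payload_lifecycle() -> Result<(), ufs::Error> {
    // create(): store bytes under a generated uuid name, return the name.
    let name = ufs::create(b"echo hello")?;
    assert_eq!(ufs::read(&name)?, b"echo hello".to_vec());

    // rename(): also moves the backing file for non-obfuscated entries.
    ufs::rename(&name, "hello_payload")?;

    // update_payload_data(): re-put the data; an external entry would only
    // be dropped from the index, its original file left untouched.
    ufs::update_payload_data("hello_payload", b"echo bye")?;
    ufs::remove("hello_payload")
}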

migrations/2020-10-24-111622_create_all/down.sql | 9
@ -1,8 +1,7 @@
DROP TABLE ip_addrs;
DROP TABLE results;
DROP TABLE certificates;
DROP TABLE jobs;
DROP TABLE agents;
DROP TABLE IF EXISTS results;
DROP TABLE IF EXISTS jobs;
DROP TABLE IF EXISTS payloads;
DROP TABLE IF EXISTS agents;
DROP TYPE IF EXISTS JobState;
DROP TYPE IF EXISTS JobType;

migrations/2020-10-24-111622_create_all/up.sql | 27
@ -7,7 +7,7 @@ CREATE TABLE IF NOT EXISTS agents (
alias TEXT,
hostname TEXT NOT NULL,
host_info TEXT NOT NULL,
id UUID NOT NULL DEFAULT uuid_generate_v4(),
id UUID NOT NULL,
ip_gray TEXT,
ip_white TEXT,
is_root BOOLEAN NOT NULL DEFAULT false,
@ -22,15 +22,25 @@ CREATE TABLE IF NOT EXISTS agents (
PRIMARY KEY(id)
);
CREATE TABLE IF NOT EXISTS payloads (
id UUID NOT NULL,
mime_type TEXT NOT NULL,
name TEXT NOT NULL UNIQUE,
size BIGINT NOT NULL,
PRIMARY KEY(id)
);
CREATE TABLE IF NOT EXISTS jobs (
alias TEXT,
argv TEXT NOT NULL,
id UUID NOT NULL DEFAULT uuid_generate_v4(),
id UUID NOT NULL,
exec_type JobType NOT NULL DEFAULT 'shell',
platform TEXT NOT NULL,
payload TEXT,
payload UUID,
schedule TEXT,
FOREIGN KEY(payload) REFERENCES payloads(id) ON DELETE SET NULL,
PRIMARY KEY(id)
);
@ -38,7 +48,7 @@ CREATE TABLE IF NOT EXISTS results (
agent_id UUID NOT NULL,
alias TEXT,
created TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
id UUID NOT NULL DEFAULT uuid_generate_v4(),
id UUID NOT NULL,
job_id UUID NOT NULL,
result BYTEA,
state JobState NOT NULL DEFAULT 'queued',
@ -50,12 +60,3 @@ CREATE TABLE IF NOT EXISTS results (
FOREIGN KEY(job_id) REFERENCES jobs(id) ON DELETE CASCADE,
PRIMARY KEY(id)
);
CREATE TABLE IF NOT EXISTS certificates (
agent_id UUID NOT NULL,
id UUID NOT NULL DEFAULT uuid_generate_v4(),
is_revoked BOOLEAN NOT NULL DEFAULT FALSE,
PRIMARY KEY(id),
FOREIGN KEY(agent_id) REFERENCES agents(id)
);
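
Note the ON DELETE SET NULL on jobs.payload: deleting a payload row orphans the referencing job instead of cascading into it. A hypothetical session showing the effect (uuids are placeholders):

INSERT INTO payloads (id, mime_type, name, size)
VALUES ('00000000-0000-0000-0000-000000000001', 'text/x-shellscript', 'echoer', 42);

INSERT INTO jobs (alias, argv, id, platform, payload)
VALUES ('demo', '/bin/sh {}', '00000000-0000-0000-0000-000000000002',
        'x86_64-unknown-linux-gnu', '00000000-0000-0000-0000-000000000001');

DELETE FROM payloads WHERE name = 'echoer';

-- The job survives, its payload column now NULL:
SELECT id, payload FROM jobs WHERE alias = 'demo';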