remove redundant brief structs

pull/9/head
plazmoid 1 year ago
parent a38e3a1561
commit 32e96476cf
  1. 27
      Cargo.lock
  2. 16
      bin/u_agent/src/lib.rs
  3. 25
      bin/u_panel/src/argparse.rs
  4. 10
      bin/u_panel/src/gui/fe/src/app/components/dialogs/job-info-dialog/job-info-dialog.component.ts
  5. 6
      bin/u_panel/src/gui/fe/src/app/models/job.model.ts
  6. 12
      bin/u_panel/src/gui/fe/src/app/models/payload.model.ts
  7. 4
      bin/u_panel/src/gui/fe/src/app/services/api.service.ts
  8. 29
      bin/u_server/src/db.rs
  9. 26
      bin/u_server/src/error.rs
  10. 89
      bin/u_server/src/handlers.rs
  11. 6
      bin/u_server/src/u_server.rs
  12. 10
      integration-tests/tests/fixtures/agent.rs
  13. 5
      integration-tests/tests/helpers/panel.rs
  14. 14
      integration-tests/tests/integration_tests/api.rs
  15. 4
      integration-tests/tests/integration_tests/behaviour.rs
  16. 28
      lib/u_lib/src/api.rs
  17. 4
      lib/u_lib/src/cache.rs
  18. 66
      lib/u_lib/src/jobs.rs
  19. 4
      lib/u_lib/src/logging.rs
  20. 7
      lib/u_lib/src/messaging.rs
  21. 92
      lib/u_lib/src/models/jobs/meta.rs
  22. 13
      lib/u_lib/src/models/mod.rs
  23. 156
      lib/u_lib/src/models/payload.rs
  24. 1
      lib/u_lib/src/models/schema.rs
  25. 53
      lib/u_lib/src/ufs/mod.rs
  26. 1
      migrations/2020-10-24-111622_create_all/up.sql

27
Cargo.lock generated

@ -2020,9 +2020,9 @@ dependencies = [
[[package]]
name = "rustix"
version = "0.37.14"
version = "0.37.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d9b864d3c18a5785a05953adeed93e2dca37ed30f18e69bba9f30079d51f363f"
checksum = "a0661814f891c57c930a610266415528da53c4933e6dea5fb350cbfe048a9ece"
dependencies = [
"bitflags",
"errno 0.3.1",
@ -2503,9 +2503,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20"
[[package]]
name = "tokio"
version = "1.27.0"
version = "1.28.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d0de47a4eecbe11f498978a9b29d792f0d2692d1dd003650c24c76510e3bc001"
checksum = "c3c786bf8134e5a3a166db9b29ab8f48134739014a3eca7bc6bfa95d673b136f"
dependencies = [
"autocfg",
"bytes",
@ -2517,14 +2517,14 @@ dependencies = [
"signal-hook-registry",
"socket2",
"tokio-macros",
"windows-sys 0.45.0",
"windows-sys 0.48.0",
]
[[package]]
name = "tokio-macros"
version = "2.0.0"
version = "2.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "61a573bdc87985e9d6ddeed1b3d864e8a302c847e40d647746df2f1de209d1ce"
checksum = "630bdcf245f78637c13ec01ffae6187cca34625e8c63150d424b59e55af2675e"
dependencies = [
"proc-macro2",
"quote",
@ -2554,9 +2554,9 @@ dependencies = [
[[package]]
name = "tokio-stream"
version = "0.1.12"
version = "0.1.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8fb52b74f05dbf495a8fba459fdc331812b96aa086d9eb78101fa0d4569c3313"
checksum = "397c988d37662c7dda6d2208364a706264bf3d6138b11d436cbac0ad38832842"
dependencies = [
"futures-core",
"pin-project-lite",
@ -2577,9 +2577,9 @@ dependencies = [
[[package]]
name = "tokio-util"
version = "0.7.7"
version = "0.7.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5427d89453009325de0d8f342c9490009f76e999cb7672d77e46267448f7e6b2"
checksum = "806fe8c2c87eccc8b3267cbae29ed3ab2d0bd37fca70ab622e46aaa9375ddb7d"
dependencies = [
"bytes",
"futures-core",
@ -2606,11 +2606,10 @@ checksum = "b6bc1c9ce2b5135ac7f93c72918fc37feb872bdc6a5533a8b85eb4b86bfdae52"
[[package]]
name = "tracing"
version = "0.1.37"
version = "0.1.38"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8ce8c33a8d48bd45d624a6e523445fd21ec13d3653cd51f681abf67418f54eb8"
checksum = "cf9cf6a813d3f40c88b0b6b6f29a5c95c6cdbf97c1f9cc53fb820200f5ad814d"
dependencies = [
"cfg-if 1.0.0",
"log",
"pin-project-lite",
"tracing-attributes",

@ -9,8 +9,8 @@ use u_lib::{
cache::JobCache,
config::{get_self_id, EndpointsEnv, AGENT_ITERATION_INTERVAL},
error::ErrChan,
executor::pop_completed,
jobs::{split_payload, AnonymousJobBatch},
executor,
jobs::AnonymousJobBatch,
logging::init_logger,
messaging::Reportable,
models::AssignedJobById,
@ -21,7 +21,7 @@ async fn process_request(jobs: Vec<AssignedJobById>, client: &HttpClient) {
for jr in &jobs {
if !JobCache::contains(jr.job_id) {
info!("Fetching job: {}", &jr.job_id);
let fetched_job = loop {
let mut fetched_job = loop {
//todo: use payload cache
match client.get_full_job(jr.job_id).await {
Ok(result) => break result,
@ -31,11 +31,13 @@ async fn process_request(jobs: Vec<AssignedJobById>, client: &HttpClient) {
}
}
};
match split_payload(fetched_job) {
Ok(job_payload_meta) => JobCache::insert(job_payload_meta),
Err(e) => ErrChan::send(e, "pld").await,
if let Some(payload) = &mut fetched_job.payload {
if let Err(e) = payload.maybe_split_payload() {
ErrChan::send(e, "pld").await;
}
}
JobCache::insert(fetched_job);
}
}
info!(
"Scheduling jobs: {}",
@ -88,7 +90,7 @@ async fn agent_loop(client: HttpClient) -> ! {
Err(err) => ErrChan::send(err, "processing").await,
}
let result: Vec<Reportable> = pop_completed()
let result: Vec<Reportable> = executor::pop_completed()
.await
.into_iter()
.map(|result| match result {

@ -2,9 +2,8 @@ use serde_json::{from_str, to_value, Value};
use structopt::StructOpt;
use u_lib::{
api::HttpClient,
jobs::join_payload,
messaging::AsMsg,
models::{Agent, AssignedJob, BriefMode, BriefOrFullJob, RawJob},
models::{Agent, AssignedJob, BriefMode, RawJob},
types::Id,
types::PanelResult,
UError, UResult,
@ -88,14 +87,13 @@ pub async fn process_cmd(client: HttpClient, args: Args) -> PanelResult<Value> {
JobCRUD::Create { job } => {
let raw_job = from_str::<RawJob>(&job)
.map_err(|e| UError::DeserializeError(e.to_string(), job))?;
let job = raw_job.validated()?;
let full_job = join_payload(job)?;
let mut job = raw_job.validated()?;
into_value(
client
.upload_jobs([&BriefOrFullJob::Full(full_job)])
.await?,
)
if let Some(payload) = &mut job.payload {
payload.join_payload()?;
}
into_value(client.upload_jobs([&job]).await?)
}
JobCRUD::RUD(RUD::Read { id }) => match id {
Some(id) => into_value(vec![client.get_job(id, args.brief).await?]),
@ -104,10 +102,13 @@ pub async fn process_cmd(client: HttpClient, args: Args) -> PanelResult<Value> {
JobCRUD::RUD(RUD::Update { item }) => {
let raw_job = from_str::<RawJob>(&item)
.map_err(|e| UError::DeserializeError(e.to_string(), item))?;
let job = raw_job.validated()?;
let full_job = join_payload(job)?;
let mut job = raw_job.validated()?;
if let Some(payload) = &mut job.payload {
payload.join_payload()?;
}
into_value(client.update_job(&BriefOrFullJob::Full(full_job)).await?)
into_value(client.update_job(&job).await?)
}
JobCRUD::RUD(RUD::Delete { id }) => into_value(client.del(id).await?),
},

@ -1,9 +1,9 @@
import { Component, Inject } from '@angular/core';
import { MAT_DIALOG_DATA } from '@angular/material/dialog';
import { EventEmitter } from '@angular/core';
import { BriefOrFullJobModel } from '../../../models/job.model';
import { Job } from '../../../models/job.model';
import { ApiTableService } from 'src/app/services';
import { BriefOrFullPayloadModel, isFullPayload } from 'src/app/models';
import { PayloadModel } from 'src/app/models';
@Component({
selector: 'job-info-dialog',
@ -19,7 +19,7 @@ export class JobInfoDialogComponent {
onSave = new EventEmitter();
constructor(@Inject(MAT_DIALOG_DATA) public data: BriefOrFullJobModel, dataSource: ApiTableService) {
constructor(@Inject(MAT_DIALOG_DATA) public data: Job, dataSource: ApiTableService) {
if (data.payload !== null) {
this.showPayload(data.payload)
}
@ -29,8 +29,8 @@ export class JobInfoDialogComponent {
})
}
showPayload(payload: BriefOrFullPayloadModel) {
if (isFullPayload(payload)) {
showPayload(payload: PayloadModel) {
if (payload.data !== null) {
this.decodedPayload = new TextDecoder().decode(new Uint8Array(payload.data))
} else {
this.isTooBigPayload = true

@ -1,4 +1,4 @@
import { BriefOrFullPayloadModel } from './'
import { PayloadModel } from './'
export interface JobModel {
alias: string | null,
@ -10,7 +10,7 @@ export interface JobModel {
schedule: string | null,
}
export interface BriefOrFullJobModel {
export interface Job {
job: JobModel,
payload: BriefOrFullPayloadModel | null,
payload: PayloadModel | null,
}

@ -3,15 +3,5 @@ export interface PayloadModel {
mime_type: string,
name: string,
size: number,
}
export interface FullPayloadModel {
meta: PayloadModel,
data: number[]
}
export type BriefOrFullPayloadModel = PayloadModel | FullPayloadModel;
export function isFullPayload(payload: BriefOrFullPayloadModel): payload is FullPayloadModel {
return (payload as FullPayloadModel).data !== undefined
data: number[] | null
}

@ -1,7 +1,7 @@
import { environment } from 'src/environments/environment';
import { HttpClient, HttpErrorResponse } from '@angular/common/http';
import { Observable, map, catchError, throwError } from 'rxjs';
import { ApiModel, getAreaByModel, PayloadModel, Empty, Area, AgentModel, JobModel, ResultModel, BriefOrFullJobModel } from '../models';
import { ApiModel, getAreaByModel, PayloadModel, Empty, Area, AgentModel, JobModel, ResultModel, Job } from '../models';
import { Injectable, Inject } from '@angular/core';
import { ErrorService } from './error.service';
@ -53,7 +53,7 @@ export class ApiTableService {
return this.getOne(id, 'agents')
}
getJob(id: string): Observable<BriefOrFullJobModel> {
getJob(id: string): Observable<Job> {
return this.getOne(id, 'jobs')
}

@ -3,7 +3,7 @@ use diesel::{pg::PgConnection, prelude::*, result::Error as DslError, Connection
use std::mem::drop;
use u_lib::{
db::PgAsyncPool,
models::{schema, Agent, AssignedJob, BriefJob, JobModel, JobState, PayloadMeta},
models::{schema, Agent, AssignedJob, Job, JobModel, JobState, Payload},
platform::Platform,
types::Id,
};
@ -50,13 +50,11 @@ pub struct UDB<'c> {
}
impl UDB<'_> {
pub fn insert_jobs(&mut self, jobs: &[BriefJob]) -> Result<()> {
pub fn insert_jobs(&mut self, jobs: &[Job]) -> Result<()> {
use schema::{jobs, payloads};
let (jobs, payloads_opt): (Vec<_>, Vec<_>) = jobs
.iter()
.map(|j| (&j.job, j.payload_meta.as_ref()))
.unzip();
let (jobs, payloads_opt): (Vec<_>, Vec<_>) =
jobs.iter().map(|j| (&j.job, j.payload.as_ref())).unzip();
let payloads = payloads_opt
.into_iter()
@ -76,17 +74,17 @@ impl UDB<'_> {
.map_err(with_err_ctx("Can't insert jobs"))
}
pub fn get_job(&mut self, id: Id) -> Result<Option<BriefJob>> {
pub fn get_job(&mut self, id: Id) -> Result<Option<Job>> {
use schema::{jobs, payloads};
let maybe_job_with_payload = jobs::table
.left_join(payloads::table)
.filter(jobs::id.eq(id))
.first::<(JobModel, Option<PayloadMeta>)>(self.conn)
.first::<(JobModel, Option<Payload>)>(self.conn)
.optional()
.map_err(with_err_ctx(format!("Can't get job {id}")))?;
Ok(maybe_job_with_payload.map(|(job, payload_meta)| BriefJob { job, payload_meta }))
Ok(maybe_job_with_payload.map(|(job, payload)| Job { job, payload }))
}
pub fn get_jobs(&mut self) -> Result<Vec<JobModel>> {
@ -97,7 +95,7 @@ impl UDB<'_> {
.map_err(with_err_ctx("Can't get jobs"))
}
pub fn get_payload_meta(&mut self, id: Id) -> Result<Option<PayloadMeta>> {
pub fn get_payload(&mut self, id: Id) -> Result<Option<Payload>> {
use schema::payloads;
payloads::table
@ -107,7 +105,7 @@ impl UDB<'_> {
.map_err(with_err_ctx(format!("Can't get payload {id}")))
}
pub fn get_payload_metas(&mut self) -> Result<Vec<PayloadMeta>> {
pub fn get_payloads(&mut self) -> Result<Vec<Payload>> {
use schema::payloads;
payloads::table
@ -115,17 +113,20 @@ impl UDB<'_> {
.map_err(with_err_ctx("Can't get payloads"))
}
pub fn find_job_by_alias(&mut self, alias: &str) -> Result<Option<BriefJob>> {
pub fn find_job_by_alias(&mut self, alias: &str) -> Result<Option<Job>> {
use schema::{jobs, payloads};
let maybe_job_with_payload = jobs::table
.left_join(payloads::table)
.filter(jobs::alias.eq(alias))
.first::<(JobModel, Option<PayloadMeta>)>(self.conn)
.first::<(JobModel, Option<Payload>)>(self.conn)
.optional()
.map_err(with_err_ctx(format!("Can't get job by alias {alias}")))?;
Ok(maybe_job_with_payload.map(|(job, payload_meta)| BriefJob { job, payload_meta }))
Ok(maybe_job_with_payload.map(|(job, payload_meta)| Job {
job,
payload: payload_meta,
}))
}
pub fn insert_result(&mut self, result: &AssignedJob) -> Result<()> {

@ -30,6 +30,12 @@ pub enum Error {
#[error("Job cannot be ran on this platform. Expected: {0}, got: {1}")]
InsuitablePlatform(String, String),
#[error("{0}\nContext: {1}")]
Contexted(Box<Error>, String),
#[error("Runtime error: {0}")]
Runtime(String),
}
impl Reject for Error {}
@ -67,3 +73,23 @@ impl Reply for RejResponse {
with_status(self.message, self.status).into_response()
}
}
impl From<anyhow::Error> for Error {
fn from(e: anyhow::Error) -> Self {
let ctx = e
.chain()
.rev()
.skip(1)
.map(|cause| format!("ctx: {}", cause))
.collect::<Vec<_>>()
.join("\n");
match e.downcast::<Error>() {
Ok(err) => Error::Contexted(Box::new(err), ctx),
Err(err) => match err.downcast::<ufs::Error>() {
Ok(err) => Error::Contexted(Box::new(Error::FSError(err)), ctx),
Err(err) => Error::Runtime(err.to_string()),
},
}
}
}

@ -3,19 +3,10 @@ use std::sync::Arc;
use crate::db::{PgRepo, UDB};
use crate::error::Error;
use serde::Deserialize;
use u_lib::ufs;
use u_lib::{
api::retypes,
jobs::{join_payload, split_payload},
messaging::Reportable,
models::*,
types::Id,
};
use u_lib::{api::retypes, messaging::Reportable, models::*, types::Id};
use warp::reject::not_found;
use warp::Rejection;
const MAX_READABLE_PAYLOAD_SIZE: i64 = 1024 * 32;
type EndpResult<T> = Result<T, Rejection>;
#[derive(Deserialize)]
@ -48,29 +39,24 @@ impl Endpoints {
id: Id,
params: Option<PayloadFlags>,
) -> EndpResult<retypes::GetJob> {
let Some(job) = repo.interact(move |mut db| db.get_job(id)).await? else {
let Some(mut job) = repo.interact(move |mut db| db.get_job(id)).await? else {
return Err(not_found())
};
let make_full_job = |j| -> Result<BriefOrFullJob, Error> {
let full_job = join_payload(j).map_err(Error::from)?;
Ok(BriefOrFullJob::Full(full_job))
};
Ok(match params.map(|p| p.brief) {
Some(BriefMode::Yes) => BriefOrFullJob::Brief(job),
Some(BriefMode::Yes) => job,
Some(BriefMode::Auto) | None => {
if job
.payload_meta
.as_ref()
.map(|m| m.size > MAX_READABLE_PAYLOAD_SIZE)
.unwrap_or(false)
{
BriefOrFullJob::Brief(job)
} else {
make_full_job(job)?
if let Some(payload) = &mut job.payload {
payload.maybe_join_payload().map_err(Error::from)?;
}
job
}
Some(BriefMode::No) => {
if let Some(payload) = &mut job.payload {
payload.join_payload().map_err(Error::from)?;
}
job
}
Some(BriefMode::No) => make_full_job(job)?,
})
}
@ -90,7 +76,7 @@ impl Endpoints {
}
pub async fn get_payloads(repo: Arc<PgRepo>) -> EndpResult<retypes::GetPayloads> {
repo.interact(move |mut db| db.get_payload_metas())
repo.interact(move |mut db| db.get_payloads())
.await
.map_err(From::from)
}
@ -100,24 +86,19 @@ impl Endpoints {
id: Id,
params: Option<PayloadFlags>,
) -> EndpResult<retypes::GetPayload> {
let Some(meta) = repo.interact(move |mut db| db.get_payload_meta(id)).await? else {
let Some(mut payload) = repo.interact(move |mut db| db.get_payload(id)).await? else {
return Err(not_found())
};
Ok(match params.map(|p| p.brief) {
Some(BriefMode::Yes) => BriefOrFullPayload::Brief(meta),
None | Some(BriefMode::Auto) if meta.size > MAX_READABLE_PAYLOAD_SIZE => {
BriefOrFullPayload::Brief(meta)
Some(BriefMode::Yes) => payload,
None | Some(BriefMode::Auto) => {
payload.maybe_join_payload().map_err(Error::from)?;
payload
}
_ => {
let payload_data = ufs::read(&meta.name).map_err(|e| {
error!("payload reading failed: {}", e);
Error::from(e.downcast::<Error>().expect("wrong error type"))
})?;
BriefOrFullPayload::Full(FullPayload {
meta,
data: payload_data,
})
payload.join_payload().map_err(Error::from)?;
payload
}
})
}
@ -162,17 +143,16 @@ impl Endpoints {
.map_err(From::from)
}
pub async fn upload_jobs(
repo: Arc<PgRepo>,
msg: Vec<BriefOrFullJob>,
) -> EndpResult<retypes::UploadJobs> {
pub async fn upload_jobs(repo: Arc<PgRepo>, msg: Vec<Job>) -> EndpResult<retypes::UploadJobs> {
let jobs = msg
.into_iter()
.map(|meta| match meta {
BriefOrFull::Full(job) => Ok(split_payload(job)?),
BriefOrFull::Brief(job) => Ok(job),
.map(|mut job| {
if let Some(payload) = &mut job.payload {
payload.maybe_split_payload()?;
}
Ok(job)
})
.collect::<Result<Vec<BriefJob>, Error>>()?;
.collect::<Result<Vec<Job>, Error>>()?;
repo.interact(move |mut db| db.insert_jobs(&jobs))
.await
@ -286,17 +266,12 @@ impl Endpoints {
Ok(())
}
pub async fn update_job(
repo: Arc<PgRepo>,
job: BriefOrFullJob,
) -> EndpResult<retypes::UpdateJob> {
let thin_job = match job {
BriefOrFullJob::Full(job) => split_payload(job).map_err(Error::from)?,
BriefOrFullJob::Brief(job) => job,
};
pub async fn update_job(repo: Arc<PgRepo>, mut job: Job) -> EndpResult<retypes::UpdateJob> {
if let Some(payload) = &mut job.payload {
payload.maybe_split_payload().map_err(Error::from)?;
}
repo.interact(move |mut db| db.update_job(&thin_job.job))
.await?;
repo.interact(move |mut db| db.update_job(&job.job)).await?;
Ok(())
}

@ -57,7 +57,7 @@ pub fn init_endpoints(
let upload_jobs = path("upload_jobs")
.and(with_db.clone())
.and(body::json::<Vec<BriefOrFullJob>>())
.and(body::json::<Vec<Job>>())
.and_then(Endpoints::upload_jobs)
.map(into_message);
@ -115,7 +115,7 @@ pub fn init_endpoints(
let update_job = path("update_job")
.and(with_db.clone())
.and(body::json::<BriefOrFullJob>())
.and(body::json::<Job>())
.and_then(Endpoints::update_job)
.map(ok);
@ -167,7 +167,7 @@ pub async fn preload_jobs(repo: &PgRepo) -> Result<(), ServerError> {
let job_alias = "agent_hello";
let if_job_exists = db.find_job_by_alias(job_alias)?;
if if_job_exists.is_none() {
let agent_hello = RawJob::brief_job_builder()
let agent_hello = RawJob::builder()
.with_type(JobType::Init)
.with_alias(job_alias)
.build()

@ -1,7 +1,6 @@
use super::connections::*;
use super::run_async;
use u_lib::unwrap_enum;
use u_lib::{api::HttpClient, jobs::split_payload, messaging::Reportable, models::*, types::Id};
use u_lib::{api::HttpClient, messaging::Reportable, models::*, types::Id};
pub struct RegisteredAgent {
pub id: Id,
@ -23,14 +22,17 @@ pub fn registered_agent(client: &HttpClient) -> RegisteredAgent {
.unwrap();
let job_id = resp.job_id;
let job = client.get_job(job_id, BriefMode::No).await.unwrap();
let job = unwrap_enum!(job, BriefOrFull::Full);
assert_eq!(job.job.alias, Some("agent_hello".to_string()));
let mut agent_data = AssignedJob::from((&split_payload(job).unwrap().job, resp));
let mut agent_data = AssignedJob::from((&job.job, resp));
agent_data.set_result(&agent);
client
.report([Reportable::Assigned(agent_data)])
.await
.unwrap();
RegisteredAgent { id: agent_id }
})
}

@ -23,10 +23,7 @@ impl Panel {
let stderr = output.get_stderr();
if !stderr.is_empty() {
println!(
"\n*** PANEL DEBUG OUTPUT START***\n{}\n*** PANEL DEBUG OUTPUT END ***\n",
String::from_utf8_lossy(stderr)
);
println!("\n{}\n", String::from_utf8_lossy(stderr));
}
match from_slice(output.get_stdout()) {

@ -14,13 +14,13 @@
// ping(&self)
use crate::fixtures::connections::*;
use u_lib::models::{BriefOrFullJob, RawJob};
use u_lib::models::RawJob;
#[rstest]
#[tokio::test]
async fn test_jobs_endpoints(client_panel: &HttpClient) {
let job_alias = "henlo";
let mut job = RawJob::brief_job_builder()
let mut job = RawJob::builder()
.with_shell("echo henlo")
.with_alias(job_alias)
.build()
@ -28,19 +28,13 @@ async fn test_jobs_endpoints(client_panel: &HttpClient) {
let job_id = job.job.id;
client_panel
.upload_jobs([&BriefOrFullJob::Brief(job.clone())])
.await
.unwrap();
client_panel.upload_jobs([&job]).await.unwrap();
let fetched_job = client_panel.get_brief_job(job_id).await.unwrap();
assert_eq!(job, fetched_job);
job.job.alias = Some("henlo2".to_string());
client_panel
.update_job(&BriefOrFullJob::Brief(job.clone()))
.await
.unwrap();
client_panel.update_job(&job).await.unwrap();
let fetched_job = client_panel.get_brief_job(job_id).await.unwrap();
assert_eq!(job, fetched_job);

@ -21,7 +21,7 @@ async fn setup_tasks() {
let agents: Vec<Agent> = Panel::check_output("agents read");
let agent_id = agents[0].id;
let job_alias = "passwd_contents";
let job = RawJob::brief_job_builder()
let job = RawJob::builder()
.with_alias(job_alias)
.with_raw_payload(b"cat /etc/passwd".as_slice())
.with_shell("/bin/bash {}")
@ -54,7 +54,7 @@ async fn large_payload() {
let agent = &Panel::check_output::<Vec<Agent>>("agents read")[0];
let agent_id = agent.id;
let job_alias = "large_payload";
let job = RawJob::brief_job_builder()
let job = RawJob::builder()
.with_alias(job_alias)
.with_payload_path("./tests/bin/echoer")
.with_shell("{} type echo")

@ -7,7 +7,6 @@ use reqwest::{header, header::HeaderMap, Certificate, Client, Identity, Method,
use serde::de::DeserializeOwned;
use serde_json::{from_str, Value};
use crate::unwrap_enum;
use crate::{
config::{get_self_id, MASTER_PORT},
conv::opt_to_string,
@ -25,9 +24,8 @@ pub mod retypes {
pub type GetPersonalJobs = Vec<AssignedJobById>;
pub type Report = ();
pub type GetJob = BriefOrFullJob;
pub type GetFullJob = FullJob;
pub type GetBriefJob = BriefJob;
pub type GetJob = Job;
pub type GetBriefJob = Job;
pub type GetJobs = Vec<JobModel>;
pub type GetAgents = Vec<Agent>;
pub type UpdateAgent = ();
@ -38,8 +36,8 @@ pub mod retypes {
pub type SetJobs = Vec<Id>;
pub type GetAgentJobs = Vec<AssignedJob>;
pub type Ping = ();
pub type GetPayloads = Vec<PayloadMeta>;
pub type GetPayload = BriefOrFullPayload;
pub type GetPayloads = Vec<Payload>;
pub type GetPayload = Payload;
}
#[derive(Clone, Debug)]
@ -113,8 +111,6 @@ impl HttpClient {
.post(self.base_url.join(url).unwrap())
.json(payload);
debug!("url = {url}");
let response = request
.send()
.await
@ -136,7 +132,7 @@ impl HttpClient {
}
.map_err(From::from);
debug!("response = {:?}", result);
debug!("url = {url}, response = {result:?}");
result
}
@ -165,14 +161,12 @@ impl HttpClient {
self.req(format!("get_job/{job}?brief={brief}")).await
}
pub async fn get_full_job(&self, job: Id) -> Result<retypes::GetFullJob> {
let job = self.get_job(job, BriefMode::No).await?;
Ok(unwrap_enum!(job, BriefOrFullJob::Full))
pub async fn get_full_job(&self, job: Id) -> Result<retypes::GetJob> {
self.get_job(job, BriefMode::No).await
}
pub async fn get_brief_job(&self, job: Id) -> Result<retypes::GetBriefJob> {
let job = self.get_job(job, BriefMode::Yes).await?;
Ok(unwrap_enum!(job, BriefOrFullJob::Brief))
pub async fn get_brief_job(&self, job: Id) -> Result<retypes::GetJob> {
self.get_job(job, BriefMode::Yes).await
}
/// get all available jobs
@ -196,7 +190,7 @@ impl HttpClient {
}
/// update job
pub async fn update_job(&self, job: &BriefOrFullJob) -> Result<retypes::UpdateJob> {
pub async fn update_job(&self, job: &Job) -> Result<retypes::UpdateJob> {
self.req_with_payload("update_job", job).await
}
@ -208,7 +202,7 @@ impl HttpClient {
/// create and upload job
pub async fn upload_jobs(
&self,
payload: impl IntoIterator<Item = &BriefOrFullJob>,
payload: impl IntoIterator<Item = &Job>,
) -> Result<retypes::UploadJobs> {
self.req_with_payload("upload_jobs", &payload.into_iter().collect::<Vec<_>>())
.await

@ -1,10 +1,10 @@
use crate::models::BriefJob;
use crate::models::Job;
use crate::types::Id;
use lazy_static::lazy_static;
use parking_lot::{RwLock, RwLockReadGuard};
use std::{collections::HashMap, ops::Deref};
type Val = BriefJob;
type Val = Job;
type Cache = HashMap<Id, Val>;
lazy_static! {

@ -1,11 +1,8 @@
use crate::{
combined_result::CombinedResult,
executor::{ExecResult, Waiter},
models::{
Agent, AssignedJob, AssignedJobById, BriefJob, FullJob, FullPayload, JobType, RawJob,
},
models::{Agent, AssignedJob, AssignedJobById, Job, JobType, RawJob},
proc_output::ProcOutput,
ufs,
};
use std::collections::HashMap;
use std::process::exit;
@ -17,7 +14,7 @@ pub struct AnonymousJobBatch {
}
impl AnonymousJobBatch {
pub fn from_meta_with_id(jobs: impl IntoIterator<Item = (BriefJob, AssignedJobById)>) -> Self {
pub fn from_meta_with_id(jobs: impl IntoIterator<Item = (Job, AssignedJobById)>) -> Self {
let mut waiter = Waiter::new();
for (job, ids) in jobs {
waiter.push(run_assigned_job(job, ids));
@ -28,7 +25,7 @@ impl AnonymousJobBatch {
}
}
pub fn from_meta(jobs: impl IntoIterator<Item = BriefJob>) -> Self {
pub fn from_meta(jobs: impl IntoIterator<Item = Job>) -> Self {
let jobs_ids: Vec<_> = jobs
.into_iter()
.map(|job| {
@ -47,6 +44,7 @@ impl AnonymousJobBatch {
/// Spawn jobs
pub async fn spawn(mut self) -> Self {
debug!("spawning jobs");
self.waiter = self.waiter.spawn().await;
self.is_running = true;
self
@ -81,11 +79,7 @@ impl NamedJobBatch {
let jobs: Vec<_> = named_jobs
.into_iter()
.filter_map(|(alias, cmd)| {
match RawJob::brief_job_builder()
.with_shell(cmd)
.with_alias(alias)
.build()
{
match RawJob::builder().with_shell(cmd).with_alias(alias).build() {
Ok(jpm) => Some(jpm),
Err(e) => {
result.push_err(e);
@ -98,7 +92,7 @@ impl NamedJobBatch {
result
}
pub fn from_meta(named_jobs: Vec<BriefJob>) -> Self {
pub fn from_meta(named_jobs: Vec<Job>) -> Self {
let (job_names, jobs): (Vec<_>, Vec<_>) = named_jobs
.into_iter()
.map(|job| (job.job.alias.clone().unwrap(), job))
@ -134,14 +128,14 @@ impl NamedJobBatch<true> {
}
}
pub async fn run_assigned_job(job: BriefJob, ids: AssignedJobById) -> ExecResult {
let BriefJob { job, payload_meta } = job;
pub async fn run_assigned_job(job: Job, ids: AssignedJobById) -> ExecResult {
let Job { job, payload } = job;
let mut result = AssignedJob::from((&job, ids));
match job.exec_type {
JobType::Shell => {
let (argv, _prepared_payload) = {
if let Some(meta) = payload_meta {
let (prep_exec, prep_exec_path) = ufs::prepare_executable(meta.name)?;
if let Some(payload) = payload {
let (prep_exec, prep_exec_path) = payload.prepare_executable()?;
let argv_with_exec = job.argv.replace("{}", &prep_exec_path);
(argv_with_exec, Some(prep_exec))
} else {
@ -181,38 +175,6 @@ pub async fn run_assigned_job(job: BriefJob, ids: AssignedJobById) -> ExecResult
Ok(result)
}
pub fn split_payload(job: FullJob) -> Result<BriefJob, ufs::Error> {
let FullJob { job, payload } = job;
if let Some(payload) = &payload {
if ufs::exists_in_index(&payload.meta.name) {
ufs::edit(&payload.meta.name, &payload.data)?;
} else {
ufs::put(&payload.meta.name, &payload.data)?;
}
}
Ok(BriefJob {
job,
payload_meta: payload.map(|p| p.meta),
})
}
pub fn join_payload(job: BriefJob) -> Result<FullJob, ufs::Error> {
let BriefJob { job, payload_meta } = job;
let payload = match payload_meta {
Some(meta) => {
let payload_data = ufs::read(&meta.name)?;
Some(FullPayload {
meta,
data: payload_data,
})
}
None => None,
};
Ok(FullJob { job, payload })
}
#[cfg(test)]
mod tests {
use crate::{
@ -265,7 +227,7 @@ mod tests {
#[case] payload: Option<&[u8]>,
#[case] expected_result: &str,
) -> TestResult {
let mut job = RawJob::brief_job_builder().with_shell(cmd);
let mut job = RawJob::builder().with_shell(cmd);
if let Some(p) = payload {
job = job.with_raw_payload(p);
}
@ -348,7 +310,7 @@ mod tests {
#[case] payload: Option<&[u8]>,
#[case] err_str: &str,
) -> TestResult {
let mut job = RawJob::brief_job_builder().with_shell(cmd);
let mut job = RawJob::builder().with_shell(cmd);
if let Some(p) = payload {
job = job.with_raw_payload(p);
}
@ -361,12 +323,12 @@ mod tests {
#[tokio::test]
async fn test_different_job_types() -> TestResult {
let mut jobs = NamedJobBatch::from_meta(vec![
RawJob::brief_job_builder()
RawJob::builder()
.with_shell("sleep 3")
.with_alias("sleeper")
.build()
.unwrap(),
RawJob::brief_job_builder()
RawJob::builder()
.with_type(JobType::Init)
.with_alias("gatherer")
.build()

@ -1,5 +1,5 @@
use std::env;
use std::io::{stderr, stdout};
use std::io::stderr;
use std::path::Path;
use tracing_appender::rolling;
@ -11,7 +11,7 @@ pub fn init_logger(logfile: Option<&str>) {
}
let output_layer = if cfg!(test) {
fmt::layer().with_writer(stdout).with_test_writer().boxed()
fmt::layer().with_test_writer().boxed()
} else {
fmt::layer().with_writer(stderr).boxed()
};

@ -11,12 +11,9 @@ impl AsMsg for Agent {}
impl AsMsg for AssignedJob {}
impl AsMsg for AssignedJobById {}
impl AsMsg for JobModel {}
impl AsMsg for FullJob {}
impl AsMsg for Reportable {}
impl AsMsg for PayloadMeta {}
impl AsMsg for FullPayload {}
impl AsMsg for BriefJob {}
impl<B: AsMsg, F: AsMsg> AsMsg for BriefOrFull<B, F> {}
impl AsMsg for Payload {}
impl AsMsg for Job {}
impl AsMsg for Id {}
impl AsMsg for String {}
impl AsMsg for Vec<u8> {}

@ -3,10 +3,10 @@ use std::fmt;
use super::JobType;
#[cfg(feature = "server")]
use crate::models::schema::*;
use crate::models::{FullPayload, PayloadMeta};
use crate::models::Payload;
use crate::platform;
use crate::types::Id;
use crate::{ufs, UError, UResult};
use crate::{UError, UResult};
#[cfg(feature = "server")]
use diesel::{Identifiable, Insertable, Queryable};
use serde::{Deserialize, Serialize};
@ -33,15 +33,9 @@ pub struct JobModel {
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct FullJob {
pub struct Job {
pub job: JobModel,
pub payload: Option<FullPayload>,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct BriefJob {
pub job: JobModel,
pub payload_meta: Option<PayloadMeta>,
pub payload: Option<Payload>,
}
#[derive(Serialize, Deserialize, Clone)]
@ -105,9 +99,12 @@ impl fmt::Debug for RawJob<'_> {
}
}
impl From<BriefJob> for RawJob<'_> {
fn from(job: BriefJob) -> Self {
let BriefJob { job, payload_meta } = job;
impl From<Job> for RawJob<'_> {
fn from(job: Job) -> Self {
let Job {
job,
payload: payload_meta,
} = job;
RawJob {
alias: job.alias,
argv: job.argv,
@ -122,15 +119,15 @@ impl From<BriefJob> for RawJob<'_> {
}
impl<'p> RawJob<'p> {
pub fn validated(self) -> UResult<BriefJob> {
pub fn validated(self) -> UResult<Job> {
JobBuilder { inner: self }.build()
}
pub fn from_shell(cmd: impl Into<String>) -> UResult<BriefJob> {
Self::brief_job_builder().with_shell(cmd).build()
pub fn from_shell(cmd: impl Into<String>) -> UResult<Job> {
Self::builder().with_shell(cmd).build()
}
pub fn brief_job_builder() -> JobBuilder<'p> {
pub fn builder() -> JobBuilder<'p> {
JobBuilder::default()
}
}
@ -174,29 +171,40 @@ impl<'p> JobBuilder<'p> {
self
}
pub fn build(self) -> UResult<BriefJob> {
pub fn build(self) -> UResult<Job> {
let mut inner = self.inner;
let raw_into_job = |raw: RawJob| -> UResult<BriefJob> {
let payload_meta = raw
fn _build(job: RawJob) -> UResult<Job> {
let payload = {
let payload_from_path = job
.payload_path
.as_ref()
.map(|payload_ident| PayloadMeta::from_existing_meta(payload_ident))
.map(|path| Payload::from_path(path))
.transpose()?;
Ok(BriefJob {
if payload_from_path.is_none() {
job.raw_payload
.as_ref()
.map(|data| Payload::from_data(data))
.transpose()?
} else {
payload_from_path
}
};
Ok(Job {
job: JobModel {
alias: raw.alias,
argv: raw.argv,
id: raw.id,
exec_type: raw.exec_type,
target_platforms: raw.target_platforms,
payload: payload_meta.as_ref().map(|meta| meta.id),
schedule: raw.schedule,
alias: job.alias,
argv: job.argv,
id: job.id,
exec_type: job.exec_type,
target_platforms: job.target_platforms,
payload: payload.as_ref().map(|p| p.id),
schedule: job.schedule,
},
payload_meta,
payload,
})
};
}
match inner.exec_type {
JobType::Shell => {
@ -219,23 +227,14 @@ impl<'p> JobBuilder<'p> {
return Err(empty_err.into());
}
if let Some(path) = &inner.payload_path {
ufs::put_external(path)?;
}
if let Some(raw_payload) = &inner.raw_payload {
match inner.payload_path {
Some(_) => {
if inner.raw_payload.is_some() && inner.payload_path.is_some() {
return Err(UError::JobBuildError(
"Can't use both raw payload with payload path".to_string(),
))
}
None => inner.payload_path = Some(ufs::create_anonymous(raw_payload)?),
}
));
}
match inner.payload_path.as_ref() {
Some(_) => {
Some(_) | None if inner.raw_payload.is_some() => {
if !inner.argv.contains("{}") {
return Err(UError::JobBuildError(
"Argv contains no executable placeholder".into(),
@ -244,7 +243,7 @@ impl<'p> JobBuilder<'p> {
}
}
None => {
if inner.argv.contains("{}") {
if inner.argv.contains("{}") && inner.raw_payload.is_none() {
return Err(UError::JobBuildError(
"No payload provided, but argv contains executable placeholder"
.into(),
@ -252,6 +251,7 @@ impl<'p> JobBuilder<'p> {
.into());
}
}
_ => (),
};
if inner.target_platforms.is_empty() {
@ -265,9 +265,9 @@ impl<'p> JobBuilder<'p> {
)));
}
raw_into_job(inner)
_build(inner)
}
_ => raw_into_job(inner),
_ => _build(inner),
}
}
}

@ -4,21 +4,10 @@ mod payload;
#[cfg(feature = "server")]
pub mod schema;
use crate::messaging::AsMsg;
pub use crate::models::{agent::*, jobs::*, payload::*};
use serde::{Deserialize, Serialize};
use serde::Deserialize;
use strum::{Display as StrumDisplay, EnumString};
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(untagged)]
// Wrapper carrying either a brief or a full representation of a message.
// `#[serde(untagged)]` means variants (de)serialize purely by shape, with no tag.
// NOTE(review): this type and its aliases are removed by this commit
// ("remove redundant brief structs").
pub enum BriefOrFull<B: AsMsg, F: AsMsg> {
Brief(B),
Full(F),
}
pub type BriefOrFullJob = BriefOrFull<BriefJob, FullJob>;
pub type BriefOrFullPayload = BriefOrFull<PayloadMeta, FullPayload>;
#[derive(Default, Debug, StrumDisplay, EnumString, Deserialize)]
pub enum BriefMode {
Yes,

@ -1,6 +1,9 @@
use crate::{conv::bytes_to_string, types::Id, ufs, UError};
use anyhow::{Context, Result};
use serde::{Deserialize, Serialize};
use std::process::Command;
use std::{fs::File, path::Path, process::Command};
const MAX_READABLE_PAYLOAD_SIZE: i64 = 1024 * 32;
#[cfg(feature = "server")]
use crate::models::schema::*;
@ -13,39 +16,148 @@ use diesel::Identifiable;
diesel(table_name = payloads)
)]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct PayloadMeta {
pub struct Payload {
pub id: Id,
pub mime_type: String,
pub name: String,
pub size: i64,
data: Option<Vec<u8>>, // when None, payload data is stored in ufs
}
impl Payload {
/// Whether the payload is small text, i.e. suitable for inline display.
pub fn is_human_readable(&self) -> bool {
    self.mime_type.starts_with("text/") && self.size < MAX_READABLE_PAYLOAD_SIZE
}
/// Move non-human-readable payload bytes out of the struct and into ufs.
/// Small text payloads stay inline; anything else gets written (or rewritten)
/// to the ufs entry named by `self.name`.
pub fn maybe_split_payload(&mut self) -> Result<()> {
    if !self.is_human_readable() {
        if let Some(bytes) = self.data.take() {
            // Rewrite an existing entry, otherwise create a new one.
            match ufs::exists_in_index(&self.name) {
                true => ufs::edit(&self.name, bytes)?,
                false => ufs::put(&self.name, bytes)?,
            }
        }
    }
    Ok(())
}
/// Ensure the payload bytes are loaded inline, reading from ufs if needed.
pub fn join_payload(&mut self) -> Result<()> {
    match self.data {
        // Already inline — nothing to do.
        Some(_) => Ok(()),
        None => {
            self.data = Some(ufs::read(&self.name)?);
            Ok(())
        }
    }
}
/// Load payload bytes inline, but only when the payload is small text.
pub fn maybe_join_payload(&mut self) -> Result<()> {
    if !self.is_human_readable() {
        return Ok(());
    }
    self.join_payload()
}
pub fn from_data(data: impl AsRef<[u8]>) -> Result<Payload, UError> {
let name = ufs::create_anonymous(data)?;
let meta = ufs::read_meta(&name)?;
let mut payload = Payload {
id: Id::new_v4(),
mime_type: get_mime_type(&meta.path)?,
name: name.clone(),
size: meta.size as i64,
data: None,
};
if payload.is_human_readable() {
payload.join_payload()?;
}
impl PayloadMeta {
pub fn from_existing_meta(payload_ufs_ident: &str) -> Result<PayloadMeta, UError> {
let ufs_meta = ufs::read_meta(&payload_ufs_ident)?;
let mime_type = bytes_to_string(
Ok(payload)
}
/// Create a payload from an existing file on disk, registering it in ufs
/// under its path.
///
/// # Errors
/// Fails if the file cannot be registered/read or mime detection fails.
// Fixes: `&payload_path` double-borrowed a `&str` (clippy `needless_borrow`).
pub fn from_path(payload_path: &str) -> Result<Payload, UError> {
    ufs::put_external(payload_path)?;
    let meta = ufs::read_meta(payload_path)?;
    let mut payload = Payload {
        id: Id::new_v4(),
        mime_type: get_mime_type(&meta.path)?,
        name: payload_path.to_owned(),
        size: meta.size as i64,
        data: None,
    };
    // Small text payloads are kept inline for display purposes.
    if payload.is_human_readable() {
        payload.join_payload()?;
    }
    Ok(payload)
}
/// Prepare executable file: unpack, decipher if needed and send under memfd
#[cfg(unix)]
pub fn prepare_executable(&self) -> Result<(File, String)> {
use libc::getpid;
use nix::sys::memfd::*;
use std::ffi::CString;
use std::io::{Read, Write};
use std::os::fd::FromRawFd;
const FAKE_EXEC_NAME: &str = "/usr/sbin/lvmetad";
const BUFFER_LEN: usize = 4096;
let mut buffer: [u8; BUFFER_LEN] = [0; BUFFER_LEN];
let mut payload_src = if let Some(data) = &self.data {
Box::new(data.as_slice()) as Box<dyn Read>
} else {
let payload_path = ufs::read_meta(&self.name).context("prep")?.path;
let file = File::open(&payload_path).map_err(|e| ufs::Error::new(e, &payload_path))?;
Box::new(file) as Box<dyn Read>
};
let fd = memfd_create(
CString::new(FAKE_EXEC_NAME).unwrap().as_c_str(),
MemFdCreateFlag::empty(),
);
match fd {
Ok(fd) => {
let mut payload_dest = unsafe { File::from_raw_fd(fd) };
loop {
let bytes_read = payload_src.read(&mut buffer)?;
payload_dest.write(&buffer[..bytes_read])?;
if bytes_read != BUFFER_LEN {
break;
}
}
let payload_path = format!("/proc/{}/fd/{}", unsafe { getpid() }, fd);
Ok((payload_dest, payload_path))
}
Err(e) => Err(ufs::Error::new(e, FAKE_EXEC_NAME)).context("prep"),
}
}
#[cfg(windows)]
// Windows counterpart is not implemented yet; memfd is Linux-specific, so a
// different delivery mechanism (e.g. a temp file) would be required here.
pub fn prepare_executable(&self) -> Result<(File, String)> {
todo!()
}
}
fn get_mime_type(path: impl AsRef<Path>) -> Result<String, UError> {
Ok(bytes_to_string(
&Command::new("file")
.arg("-b")
.arg("--mime-type")
.arg(&ufs_meta.path)
.arg(path.as_ref())
.output()
.map_err(|e| UError::IOError(e.to_string()))?
.stdout,
)
.trim()
.to_string();
Ok(PayloadMeta {
id: Id::new_v4(),
mime_type,
name: payload_ufs_ident.to_owned(),
size: ufs_meta.size as i64,
})
}
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct FullPayload {
pub meta: PayloadMeta,
pub data: Vec<u8>,
.to_string())
}

@ -59,6 +59,7 @@ diesel::table! {
mime_type -> Text,
name -> Text,
size -> Int8,
data -> Nullable<Bytea>,
}
}

@ -4,8 +4,8 @@
use anyhow::{Context, Result};
use serde::{Deserialize, Serialize};
use std::env::temp_dir;
use std::ffi::{CString, OsString};
use std::fs::{self, File};
use std::ffi::OsString;
use std::fs;
use std::path::{Path, PathBuf};
use uuid::Uuid;
@ -18,9 +18,9 @@ const OBFUSCATE: bool = cfg!(feature = "agent");
#[derive(Clone, Deserialize, Serialize)]
pub struct FileMeta {
extension: Option<OsString>,
external: bool,
external: bool, // if file is present before adding to index
hash: Vec<u8>,
pub path: PathBuf,
pub path: PathBuf, // actual file path
pub size: u64,
}
@ -221,51 +221,6 @@ pub fn put_external(path: impl AsRef<Path>) -> Result<()> {
Ok(())
}
/// Prepare executable file: unpack, decipher if needed and send under memfd
#[cfg(unix)]
pub fn prepare_executable(name: impl AsRef<str>) -> Result<(File, String)> {
use libc::getpid;
use nix::sys::memfd::*;
use std::io::{Read, Write};
use std::os::fd::FromRawFd;
const FAKE_EXEC_NAME: &str = "/usr/sbin/lvmetad";
const BUFFER_LEN: usize = 4096;
let mut buffer: [u8; BUFFER_LEN] = [0; BUFFER_LEN];
let payload_meta = read_meta(name).context("prep")?;
let fd = memfd_create(
CString::new(FAKE_EXEC_NAME).unwrap().as_c_str(),
MemFdCreateFlag::empty(),
);
match fd {
Ok(fd) => {
let mut payload_src =
File::open(&payload_meta.path).map_err(|e| Error::new(e, &payload_meta.path))?;
let mut payload_dest = unsafe { File::from_raw_fd(fd) };
loop {
let bytes_read = payload_src.read(&mut buffer)?;
payload_dest.write(&buffer[..bytes_read])?;
if bytes_read != BUFFER_LEN {
break;
}
}
let payload_path = format!("/proc/{}/fd/{}", unsafe { getpid() }, fd);
Ok((payload_dest, payload_path))
}
Err(e) => Err(Error::new(e, FAKE_EXEC_NAME)).context("prep"),
}
}
#[cfg(windows)]
// Windows counterpart is not implemented; memfd is Linux-specific.
pub fn prepare_executable(name: impl AsRef<str>) -> Result<(File, String)> {
todo!()
}
/*
pub fn cleanup() {
let index = INDEX.read();

@ -27,6 +27,7 @@ CREATE TABLE IF NOT EXISTS payloads (
mime_type TEXT NOT NULL,
name TEXT NOT NULL UNIQUE,
size BIGINT NOT NULL,
data BYTEA,
PRIMARY KEY(id)
);

Loading…
Cancel
Save