From bb28f211230823978b183a45e569b946da411dc0 Mon Sep 17 00:00:00 2001 From: plazmoid Date: Wed, 26 Apr 2023 19:02:08 +0300 Subject: [PATCH] remove redundant brief structs --- Cargo.lock | 27 ++- bin/u_agent/src/lib.rs | 12 +- bin/u_panel/src/argparse.rs | 25 +-- .../job-info-dialog.component.ts | 10 +- .../src/gui/fe/src/app/models/job.model.ts | 6 +- .../gui/fe/src/app/models/payload.model.ts | 12 +- .../gui/fe/src/app/services/api.service.ts | 4 +- bin/u_server/src/db.rs | 29 ++-- bin/u_server/src/error.rs | 26 +++ bin/u_server/src/handlers.rs | 89 ++++------ bin/u_server/src/u_server.rs | 6 +- integration-tests/tests/fixtures/agent.rs | 10 +- .../tests/integration_tests/api.rs | 14 +- .../tests/integration_tests/behaviour.rs | 4 +- lib/u_lib/src/api.rs | 24 ++- lib/u_lib/src/cache.rs | 4 +- lib/u_lib/src/jobs.rs | 66 ++----- lib/u_lib/src/logging.rs | 4 +- lib/u_lib/src/messaging.rs | 7 +- lib/u_lib/src/models/jobs/meta.rs | 102 +++++------ lib/u_lib/src/models/mod.rs | 13 +- lib/u_lib/src/models/payload.rs | 164 +++++++++++++++--- lib/u_lib/src/models/schema.rs | 1 + lib/u_lib/src/ufs/mod.rs | 53 +----- .../2020-10-24-111622_create_all/up.sql | 1 + 25 files changed, 358 insertions(+), 355 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 3cdbe68..6f15ca8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2020,9 +2020,9 @@ dependencies = [ [[package]] name = "rustix" -version = "0.37.14" +version = "0.37.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d9b864d3c18a5785a05953adeed93e2dca37ed30f18e69bba9f30079d51f363f" +checksum = "a0661814f891c57c930a610266415528da53c4933e6dea5fb350cbfe048a9ece" dependencies = [ "bitflags", "errno 0.3.1", @@ -2503,9 +2503,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.27.0" +version = "1.28.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d0de47a4eecbe11f498978a9b29d792f0d2692d1dd003650c24c76510e3bc001" +checksum = "c3c786bf8134e5a3a166db9b29ab8f48134739014a3eca7bc6bfa95d673b136f" dependencies = [ "autocfg", "bytes", @@ -2517,14 +2517,14 @@ dependencies = [ "signal-hook-registry", "socket2", "tokio-macros", - "windows-sys 0.45.0", + "windows-sys 0.48.0", ] [[package]] name = "tokio-macros" -version = "2.0.0" +version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "61a573bdc87985e9d6ddeed1b3d864e8a302c847e40d647746df2f1de209d1ce" +checksum = "630bdcf245f78637c13ec01ffae6187cca34625e8c63150d424b59e55af2675e" dependencies = [ "proc-macro2", "quote", @@ -2554,9 +2554,9 @@ dependencies = [ [[package]] name = "tokio-stream" -version = "0.1.12" +version = "0.1.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8fb52b74f05dbf495a8fba459fdc331812b96aa086d9eb78101fa0d4569c3313" +checksum = "397c988d37662c7dda6d2208364a706264bf3d6138b11d436cbac0ad38832842" dependencies = [ "futures-core", "pin-project-lite", @@ -2577,9 +2577,9 @@ dependencies = [ [[package]] name = "tokio-util" -version = "0.7.7" +version = "0.7.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5427d89453009325de0d8f342c9490009f76e999cb7672d77e46267448f7e6b2" +checksum = "806fe8c2c87eccc8b3267cbae29ed3ab2d0bd37fca70ab622e46aaa9375ddb7d" dependencies = [ "bytes", "futures-core", @@ -2606,11 +2606,10 @@ checksum = "b6bc1c9ce2b5135ac7f93c72918fc37feb872bdc6a5533a8b85eb4b86bfdae52" [[package]] name = "tracing" -version = "0.1.37" +version = "0.1.38" source = 
"registry+https://github.com/rust-lang/crates.io-index" -checksum = "8ce8c33a8d48bd45d624a6e523445fd21ec13d3653cd51f681abf67418f54eb8" +checksum = "cf9cf6a813d3f40c88b0b6b6f29a5c95c6cdbf97c1f9cc53fb820200f5ad814d" dependencies = [ - "cfg-if 1.0.0", "log", "pin-project-lite", "tracing-attributes", diff --git a/bin/u_agent/src/lib.rs b/bin/u_agent/src/lib.rs index 942b1b6..d63e5b8 100644 --- a/bin/u_agent/src/lib.rs +++ b/bin/u_agent/src/lib.rs @@ -10,7 +10,7 @@ use u_lib::{ config::{get_self_id, EndpointsEnv, AGENT_ITERATION_INTERVAL}, error::ErrChan, executor::pop_completed, - jobs::{split_payload, AnonymousJobBatch}, + jobs::AnonymousJobBatch, logging::init_logger, messaging::Reportable, models::AssignedJobById, @@ -21,7 +21,7 @@ async fn process_request(jobs: Vec, client: &HttpClient) { for jr in &jobs { if !JobCache::contains(jr.job_id) { info!("Fetching job: {}", &jr.job_id); - let fetched_job = loop { + let mut fetched_job = loop { //todo: use payload cache match client.get_full_job(jr.job_id).await { Ok(result) => break result, @@ -31,9 +31,11 @@ async fn process_request(jobs: Vec, client: &HttpClient) { } } }; - match split_payload(fetched_job) { - Ok(job_payload_meta) => JobCache::insert(job_payload_meta), - Err(e) => ErrChan::send(e, "pld").await, + if let Some(payload) = &mut fetched_job.payload { + match payload.maybe_split_payload() { + Ok(_) => JobCache::insert(fetched_job), + Err(e) => ErrChan::send(e, "pld").await, + } } } } diff --git a/bin/u_panel/src/argparse.rs b/bin/u_panel/src/argparse.rs index 66bf9d1..0863996 100644 --- a/bin/u_panel/src/argparse.rs +++ b/bin/u_panel/src/argparse.rs @@ -2,9 +2,8 @@ use serde_json::{from_str, to_value, Value}; use structopt::StructOpt; use u_lib::{ api::HttpClient, - jobs::join_payload, messaging::AsMsg, - models::{Agent, AssignedJob, BriefMode, BriefOrFullJob, RawJob}, + models::{Agent, AssignedJob, BriefMode, RawJob}, types::Id, types::PanelResult, UError, UResult, @@ -88,14 +87,13 @@ pub async fn process_cmd(client: HttpClient, args: Args) -> PanelResult { JobCRUD::Create { job } => { let raw_job = from_str::(&job) .map_err(|e| UError::DeserializeError(e.to_string(), job))?; - let job = raw_job.validated()?; - let full_job = join_payload(job)?; + let mut job = raw_job.validated()?; - into_value( - client - .upload_jobs([&BriefOrFullJob::Full(full_job)]) - .await?, - ) + if let Some(payload) = &mut job.payload { + payload.join_payload()?; + } + + into_value(client.upload_jobs([&job]).await?) } JobCRUD::RUD(RUD::Read { id }) => match id { Some(id) => into_value(vec![client.get_job(id, args.brief).await?]), @@ -104,10 +102,13 @@ pub async fn process_cmd(client: HttpClient, args: Args) -> PanelResult { JobCRUD::RUD(RUD::Update { item }) => { let raw_job = from_str::(&item) .map_err(|e| UError::DeserializeError(e.to_string(), item))?; - let job = raw_job.validated()?; - let full_job = join_payload(job)?; + let mut job = raw_job.validated()?; + + if let Some(payload) = &mut job.payload { + payload.join_payload()?; + } - into_value(client.update_job(&BriefOrFullJob::Full(full_job)).await?) + into_value(client.update_job(&job).await?) 
} JobCRUD::RUD(RUD::Delete { id }) => into_value(client.del(id).await?), }, diff --git a/bin/u_panel/src/gui/fe/src/app/components/dialogs/job-info-dialog/job-info-dialog.component.ts b/bin/u_panel/src/gui/fe/src/app/components/dialogs/job-info-dialog/job-info-dialog.component.ts index 02a197e..c1ab012 100644 --- a/bin/u_panel/src/gui/fe/src/app/components/dialogs/job-info-dialog/job-info-dialog.component.ts +++ b/bin/u_panel/src/gui/fe/src/app/components/dialogs/job-info-dialog/job-info-dialog.component.ts @@ -1,9 +1,9 @@ import { Component, Inject } from '@angular/core'; import { MAT_DIALOG_DATA } from '@angular/material/dialog'; import { EventEmitter } from '@angular/core'; -import { BriefOrFullJobModel } from '../../../models/job.model'; +import { Job } from '../../../models/job.model'; import { ApiTableService } from 'src/app/services'; -import { BriefOrFullPayloadModel, isFullPayload } from 'src/app/models'; +import { PayloadModel } from 'src/app/models'; @Component({ selector: 'job-info-dialog', @@ -19,7 +19,7 @@ export class JobInfoDialogComponent { onSave = new EventEmitter(); - constructor(@Inject(MAT_DIALOG_DATA) public data: BriefOrFullJobModel, dataSource: ApiTableService) { + constructor(@Inject(MAT_DIALOG_DATA) public data: Job, dataSource: ApiTableService) { if (data.payload !== null) { this.showPayload(data.payload) } @@ -29,8 +29,8 @@ export class JobInfoDialogComponent { }) } - showPayload(payload: BriefOrFullPayloadModel) { - if (isFullPayload(payload)) { + showPayload(payload: PayloadModel) { + if (payload.data !== null) { this.decodedPayload = new TextDecoder().decode(new Uint8Array(payload.data)) } else { this.isTooBigPayload = true diff --git a/bin/u_panel/src/gui/fe/src/app/models/job.model.ts b/bin/u_panel/src/gui/fe/src/app/models/job.model.ts index 492db84..301ad5a 100644 --- a/bin/u_panel/src/gui/fe/src/app/models/job.model.ts +++ b/bin/u_panel/src/gui/fe/src/app/models/job.model.ts @@ -1,4 +1,4 @@ -import { BriefOrFullPayloadModel } from './' +import { PayloadModel } from './' export interface JobModel { alias: string | null, @@ -10,7 +10,7 @@ export interface JobModel { schedule: string | null, } -export interface BriefOrFullJobModel { +export interface Job { job: JobModel, - payload: BriefOrFullPayloadModel | null, + payload: PayloadModel | null, } \ No newline at end of file diff --git a/bin/u_panel/src/gui/fe/src/app/models/payload.model.ts b/bin/u_panel/src/gui/fe/src/app/models/payload.model.ts index 4e1e9c7..e947ac8 100644 --- a/bin/u_panel/src/gui/fe/src/app/models/payload.model.ts +++ b/bin/u_panel/src/gui/fe/src/app/models/payload.model.ts @@ -3,15 +3,5 @@ export interface PayloadModel { mime_type: string, name: string, size: number, -} - -export interface FullPayloadModel { - meta: PayloadModel, - data: number[] -} - -export type BriefOrFullPayloadModel = PayloadModel | FullPayloadModel; - -export function isFullPayload(payload: BriefOrFullPayloadModel): payload is FullPayloadModel { - return (payload as FullPayloadModel).data !== undefined + data: number[] | null } \ No newline at end of file diff --git a/bin/u_panel/src/gui/fe/src/app/services/api.service.ts b/bin/u_panel/src/gui/fe/src/app/services/api.service.ts index cee6ecb..4f2dc8a 100644 --- a/bin/u_panel/src/gui/fe/src/app/services/api.service.ts +++ b/bin/u_panel/src/gui/fe/src/app/services/api.service.ts @@ -1,7 +1,7 @@ import { environment } from 'src/environments/environment'; import { HttpClient, HttpErrorResponse } from '@angular/common/http'; import { Observable, map, catchError, 
throwError } from 'rxjs'; -import { ApiModel, getAreaByModel, PayloadModel, Empty, Area, AgentModel, JobModel, ResultModel, BriefOrFullJobModel } from '../models'; +import { ApiModel, getAreaByModel, PayloadModel, Empty, Area, AgentModel, JobModel, ResultModel, Job } from '../models'; import { Injectable, Inject } from '@angular/core'; import { ErrorService } from './error.service'; @@ -53,7 +53,7 @@ export class ApiTableService { return this.getOne(id, 'agents') } - getJob(id: string): Observable { + getJob(id: string): Observable { return this.getOne(id, 'jobs') } diff --git a/bin/u_server/src/db.rs b/bin/u_server/src/db.rs index 0f7dfe6..a417b49 100644 --- a/bin/u_server/src/db.rs +++ b/bin/u_server/src/db.rs @@ -3,7 +3,7 @@ use diesel::{pg::PgConnection, prelude::*, result::Error as DslError, Connection use std::mem::drop; use u_lib::{ db::PgAsyncPool, - models::{schema, Agent, AssignedJob, BriefJob, JobModel, JobState, PayloadMeta}, + models::{schema, Agent, AssignedJob, Job, JobModel, JobState, Payload}, platform::Platform, types::Id, }; @@ -50,13 +50,11 @@ pub struct UDB<'c> { } impl UDB<'_> { - pub fn insert_jobs(&mut self, jobs: &[BriefJob]) -> Result<()> { + pub fn insert_jobs(&mut self, jobs: &[Job]) -> Result<()> { use schema::{jobs, payloads}; - let (jobs, payloads_opt): (Vec<_>, Vec<_>) = jobs - .iter() - .map(|j| (&j.job, j.payload_meta.as_ref())) - .unzip(); + let (jobs, payloads_opt): (Vec<_>, Vec<_>) = + jobs.iter().map(|j| (&j.job, j.payload.as_ref())).unzip(); let payloads = payloads_opt .into_iter() @@ -76,17 +74,17 @@ impl UDB<'_> { .map_err(with_err_ctx("Can't insert jobs")) } - pub fn get_job(&mut self, id: Id) -> Result> { + pub fn get_job(&mut self, id: Id) -> Result> { use schema::{jobs, payloads}; let maybe_job_with_payload = jobs::table .left_join(payloads::table) .filter(jobs::id.eq(id)) - .first::<(JobModel, Option)>(self.conn) + .first::<(JobModel, Option)>(self.conn) .optional() .map_err(with_err_ctx(format!("Can't get job {id}")))?; - Ok(maybe_job_with_payload.map(|(job, payload_meta)| BriefJob { job, payload_meta })) + Ok(maybe_job_with_payload.map(|(job, payload)| Job { job, payload })) } pub fn get_jobs(&mut self) -> Result> { @@ -97,7 +95,7 @@ impl UDB<'_> { .map_err(with_err_ctx("Can't get jobs")) } - pub fn get_payload_meta(&mut self, id: Id) -> Result> { + pub fn get_payload(&mut self, id: Id) -> Result> { use schema::payloads; payloads::table @@ -107,7 +105,7 @@ impl UDB<'_> { .map_err(with_err_ctx(format!("Can't get payload {id}"))) } - pub fn get_payload_metas(&mut self) -> Result> { + pub fn get_payloads(&mut self) -> Result> { use schema::payloads; payloads::table @@ -115,17 +113,20 @@ impl UDB<'_> { .map_err(with_err_ctx("Can't get payloads")) } - pub fn find_job_by_alias(&mut self, alias: &str) -> Result> { + pub fn find_job_by_alias(&mut self, alias: &str) -> Result> { use schema::{jobs, payloads}; let maybe_job_with_payload = jobs::table .left_join(payloads::table) .filter(jobs::alias.eq(alias)) - .first::<(JobModel, Option)>(self.conn) + .first::<(JobModel, Option)>(self.conn) .optional() .map_err(with_err_ctx(format!("Can't get job by alias {alias}")))?; - Ok(maybe_job_with_payload.map(|(job, payload_meta)| BriefJob { job, payload_meta })) + Ok(maybe_job_with_payload.map(|(job, payload_meta)| Job { + job, + payload: payload_meta, + })) } pub fn insert_result(&mut self, result: &AssignedJob) -> Result<()> { diff --git a/bin/u_server/src/error.rs b/bin/u_server/src/error.rs index 26eb199..4a958dc 100644 --- a/bin/u_server/src/error.rs +++ 
b/bin/u_server/src/error.rs @@ -30,6 +30,12 @@ pub enum Error { #[error("Job cannot be ran on this platform. Expected: {0}, got: {1}")] InsuitablePlatform(String, String), + + #[error("{0}\nContext: {1}")] + Contexted(Box, String), + + #[error("Runtime error: {0}")] + Runtime(String), } impl Reject for Error {} @@ -67,3 +73,23 @@ impl Reply for RejResponse { with_status(self.message, self.status).into_response() } } + +impl From for Error { + fn from(e: anyhow::Error) -> Self { + let ctx = e + .chain() + .rev() + .skip(1) + .map(|cause| format!("ctx: {}", cause)) + .collect::>() + .join("\n"); + + match e.downcast::() { + Ok(err) => Error::Contexted(Box::new(err), ctx), + Err(err) => match err.downcast::() { + Ok(err) => Error::Contexted(Box::new(Error::FSError(err)), ctx), + Err(err) => Error::Runtime(err.to_string()), + }, + } + } +} diff --git a/bin/u_server/src/handlers.rs b/bin/u_server/src/handlers.rs index e1d6b7c..1a52b61 100644 --- a/bin/u_server/src/handlers.rs +++ b/bin/u_server/src/handlers.rs @@ -3,19 +3,10 @@ use std::sync::Arc; use crate::db::{PgRepo, UDB}; use crate::error::Error; use serde::Deserialize; -use u_lib::ufs; -use u_lib::{ - api::retypes, - jobs::{join_payload, split_payload}, - messaging::Reportable, - models::*, - types::Id, -}; +use u_lib::{api::retypes, messaging::Reportable, models::*, types::Id}; use warp::reject::not_found; use warp::Rejection; -const MAX_READABLE_PAYLOAD_SIZE: i64 = 1024 * 32; - type EndpResult = Result; #[derive(Deserialize)] @@ -48,29 +39,24 @@ impl Endpoints { id: Id, params: Option, ) -> EndpResult { - let Some(job) = repo.interact(move |mut db| db.get_job(id)).await? else { + let Some(mut job) = repo.interact(move |mut db| db.get_job(id)).await? else { return Err(not_found()) }; - let make_full_job = |j| -> Result { - let full_job = join_payload(j).map_err(Error::from)?; - Ok(BriefOrFullJob::Full(full_job)) - }; Ok(match params.map(|p| p.brief) { - Some(BriefMode::Yes) => BriefOrFullJob::Brief(job), + Some(BriefMode::Yes) => job, Some(BriefMode::Auto) | None => { - if job - .payload_meta - .as_ref() - .map(|m| m.size > MAX_READABLE_PAYLOAD_SIZE) - .unwrap_or(false) - { - BriefOrFullJob::Brief(job) - } else { - make_full_job(job)? + if let Some(payload) = &mut job.payload { + payload.maybe_join_payload().map_err(Error::from)?; } + job + } + Some(BriefMode::No) => { + if let Some(payload) = &mut job.payload { + payload.join_payload().map_err(Error::from)?; + } + job } - Some(BriefMode::No) => make_full_job(job)?, }) } @@ -90,7 +76,7 @@ impl Endpoints { } pub async fn get_payloads(repo: Arc) -> EndpResult { - repo.interact(move |mut db| db.get_payload_metas()) + repo.interact(move |mut db| db.get_payloads()) .await .map_err(From::from) } @@ -100,24 +86,19 @@ impl Endpoints { id: Id, params: Option, ) -> EndpResult { - let Some(meta) = repo.interact(move |mut db| db.get_payload_meta(id)).await? else { + let Some(mut payload) = repo.interact(move |mut db| db.get_payload(id)).await? 
else { return Err(not_found()) }; Ok(match params.map(|p| p.brief) { - Some(BriefMode::Yes) => BriefOrFullPayload::Brief(meta), - None | Some(BriefMode::Auto) if meta.size > MAX_READABLE_PAYLOAD_SIZE => { - BriefOrFullPayload::Brief(meta) + Some(BriefMode::Yes) => payload, + None | Some(BriefMode::Auto) => { + payload.maybe_join_payload().map_err(Error::from)?; + payload } _ => { - let payload_data = ufs::read(&meta.name).map_err(|e| { - error!("payload reading failed: {}", e); - Error::from(e.downcast::().expect("wrong error type")) - })?; - BriefOrFullPayload::Full(FullPayload { - meta, - data: payload_data, - }) + payload.join_payload().map_err(Error::from)?; + payload } }) } @@ -162,17 +143,16 @@ impl Endpoints { .map_err(From::from) } - pub async fn upload_jobs( - repo: Arc, - msg: Vec, - ) -> EndpResult { + pub async fn upload_jobs(repo: Arc, msg: Vec) -> EndpResult { let jobs = msg .into_iter() - .map(|meta| match meta { - BriefOrFull::Full(job) => Ok(split_payload(job)?), - BriefOrFull::Brief(job) => Ok(job), + .map(|mut job| { + if let Some(payload) = &mut job.payload { + payload.maybe_split_payload()?; + } + Ok(job) }) - .collect::, Error>>()?; + .collect::, Error>>()?; repo.interact(move |mut db| db.insert_jobs(&jobs)) .await @@ -286,17 +266,12 @@ impl Endpoints { Ok(()) } - pub async fn update_job( - repo: Arc, - job: BriefOrFullJob, - ) -> EndpResult { - let thin_job = match job { - BriefOrFullJob::Full(job) => split_payload(job).map_err(Error::from)?, - BriefOrFullJob::Brief(job) => job, - }; + pub async fn update_job(repo: Arc, mut job: Job) -> EndpResult { + if let Some(payload) = &mut job.payload { + payload.maybe_split_payload().map_err(Error::from)?; + } - repo.interact(move |mut db| db.update_job(&thin_job.job)) - .await?; + repo.interact(move |mut db| db.update_job(&job.job)).await?; Ok(()) } diff --git a/bin/u_server/src/u_server.rs b/bin/u_server/src/u_server.rs index 5d06e96..ce1cd9e 100644 --- a/bin/u_server/src/u_server.rs +++ b/bin/u_server/src/u_server.rs @@ -57,7 +57,7 @@ pub fn init_endpoints( let upload_jobs = path("upload_jobs") .and(with_db.clone()) - .and(body::json::>()) + .and(body::json::>()) .and_then(Endpoints::upload_jobs) .map(into_message); @@ -115,7 +115,7 @@ pub fn init_endpoints( let update_job = path("update_job") .and(with_db.clone()) - .and(body::json::()) + .and(body::json::()) .and_then(Endpoints::update_job) .map(ok); @@ -167,7 +167,7 @@ pub async fn preload_jobs(repo: &PgRepo) -> Result<(), ServerError> { let job_alias = "agent_hello"; let if_job_exists = db.find_job_by_alias(job_alias)?; if if_job_exists.is_none() { - let agent_hello = RawJob::brief_job_builder() + let agent_hello = RawJob::builder() .with_type(JobType::Init) .with_alias(job_alias) .build() diff --git a/integration-tests/tests/fixtures/agent.rs b/integration-tests/tests/fixtures/agent.rs index f8264c8..e0dc8e2 100644 --- a/integration-tests/tests/fixtures/agent.rs +++ b/integration-tests/tests/fixtures/agent.rs @@ -1,7 +1,6 @@ use super::connections::*; use super::run_async; -use u_lib::unwrap_enum; -use u_lib::{api::HttpClient, jobs::split_payload, messaging::Reportable, models::*, types::Id}; +use u_lib::{api::HttpClient, messaging::Reportable, models::*, types::Id}; pub struct RegisteredAgent { pub id: Id, @@ -23,14 +22,17 @@ pub fn registered_agent(client: &HttpClient) -> RegisteredAgent { .unwrap(); let job_id = resp.job_id; let job = client.get_job(job_id, BriefMode::No).await.unwrap(); - let job = unwrap_enum!(job, BriefOrFull::Full); + assert_eq!(job.job.alias, 
Some("agent_hello".to_string())); - let mut agent_data = AssignedJob::from((&split_payload(job).unwrap().job, resp)); + + let mut agent_data = AssignedJob::from((&job.job, resp)); agent_data.set_result(&agent); + client .report([Reportable::Assigned(agent_data)]) .await .unwrap(); + RegisteredAgent { id: agent_id } }) } diff --git a/integration-tests/tests/integration_tests/api.rs b/integration-tests/tests/integration_tests/api.rs index 7051e9a..734ea31 100644 --- a/integration-tests/tests/integration_tests/api.rs +++ b/integration-tests/tests/integration_tests/api.rs @@ -14,13 +14,13 @@ // ping(&self) use crate::fixtures::connections::*; -use u_lib::models::{BriefOrFullJob, RawJob}; +use u_lib::models::RawJob; #[rstest] #[tokio::test] async fn test_jobs_endpoints(client_panel: &HttpClient) { let job_alias = "henlo"; - let mut job = RawJob::brief_job_builder() + let mut job = RawJob::builder() .with_shell("echo henlo") .with_alias(job_alias) .build() @@ -28,19 +28,13 @@ async fn test_jobs_endpoints(client_panel: &HttpClient) { let job_id = job.job.id; - client_panel - .upload_jobs([&BriefOrFullJob::Brief(job.clone())]) - .await - .unwrap(); + client_panel.upload_jobs([&job]).await.unwrap(); let fetched_job = client_panel.get_brief_job(job_id).await.unwrap(); assert_eq!(job, fetched_job); job.job.alias = Some("henlo2".to_string()); - client_panel - .update_job(&BriefOrFullJob::Brief(job.clone())) - .await - .unwrap(); + client_panel.update_job(&job).await.unwrap(); let fetched_job = client_panel.get_brief_job(job_id).await.unwrap(); assert_eq!(job, fetched_job); diff --git a/integration-tests/tests/integration_tests/behaviour.rs b/integration-tests/tests/integration_tests/behaviour.rs index 99facbb..51ea208 100644 --- a/integration-tests/tests/integration_tests/behaviour.rs +++ b/integration-tests/tests/integration_tests/behaviour.rs @@ -21,7 +21,7 @@ async fn setup_tasks() { let agents: Vec = Panel::check_output("agents read"); let agent_id = agents[0].id; let job_alias = "passwd_contents"; - let job = RawJob::brief_job_builder() + let job = RawJob::builder() .with_alias(job_alias) .with_raw_payload(b"cat /etc/passwd".as_slice()) .with_shell("/bin/bash {}") @@ -54,7 +54,7 @@ async fn large_payload() { let agent = &Panel::check_output::>("agents read")[0]; let agent_id = agent.id; let job_alias = "large_payload"; - let job = RawJob::brief_job_builder() + let job = RawJob::builder() .with_alias(job_alias) .with_payload_path("./tests/bin/echoer") .with_shell("{} type echo") diff --git a/lib/u_lib/src/api.rs b/lib/u_lib/src/api.rs index 2a21165..09e25a7 100644 --- a/lib/u_lib/src/api.rs +++ b/lib/u_lib/src/api.rs @@ -7,7 +7,6 @@ use reqwest::{header, header::HeaderMap, Certificate, Client, Identity, Method, use serde::de::DeserializeOwned; use serde_json::{from_str, Value}; -use crate::unwrap_enum; use crate::{ config::{get_self_id, MASTER_PORT}, conv::opt_to_string, @@ -25,9 +24,8 @@ pub mod retypes { pub type GetPersonalJobs = Vec; pub type Report = (); - pub type GetJob = BriefOrFullJob; - pub type GetFullJob = FullJob; - pub type GetBriefJob = BriefJob; + pub type GetJob = Job; + pub type GetBriefJob = Job; pub type GetJobs = Vec; pub type GetAgents = Vec; pub type UpdateAgent = (); @@ -38,8 +36,8 @@ pub mod retypes { pub type SetJobs = Vec; pub type GetAgentJobs = Vec; pub type Ping = (); - pub type GetPayloads = Vec; - pub type GetPayload = BriefOrFullPayload; + pub type GetPayloads = Vec; + pub type GetPayload = Payload; } #[derive(Clone, Debug)] @@ -165,14 +163,12 @@ impl HttpClient { 
self.req(format!("get_job/{job}?brief={brief}")).await } - pub async fn get_full_job(&self, job: Id) -> Result { - let job = self.get_job(job, BriefMode::No).await?; - Ok(unwrap_enum!(job, BriefOrFullJob::Full)) + pub async fn get_full_job(&self, job: Id) -> Result { + self.get_job(job, BriefMode::No).await } - pub async fn get_brief_job(&self, job: Id) -> Result { - let job = self.get_job(job, BriefMode::Yes).await?; - Ok(unwrap_enum!(job, BriefOrFullJob::Brief)) + pub async fn get_brief_job(&self, job: Id) -> Result { + self.get_job(job, BriefMode::Yes).await } /// get all available jobs @@ -196,7 +192,7 @@ impl HttpClient { } /// update job - pub async fn update_job(&self, job: &BriefOrFullJob) -> Result { + pub async fn update_job(&self, job: &Job) -> Result { self.req_with_payload("update_job", job).await } @@ -208,7 +204,7 @@ impl HttpClient { /// create and upload job pub async fn upload_jobs( &self, - payload: impl IntoIterator, + payload: impl IntoIterator, ) -> Result { self.req_with_payload("upload_jobs", &payload.into_iter().collect::>()) .await diff --git a/lib/u_lib/src/cache.rs b/lib/u_lib/src/cache.rs index 3d8f4b4..c81414c 100644 --- a/lib/u_lib/src/cache.rs +++ b/lib/u_lib/src/cache.rs @@ -1,10 +1,10 @@ -use crate::models::BriefJob; +use crate::models::Job; use crate::types::Id; use lazy_static::lazy_static; use parking_lot::{RwLock, RwLockReadGuard}; use std::{collections::HashMap, ops::Deref}; -type Val = BriefJob; +type Val = Job; type Cache = HashMap; lazy_static! { diff --git a/lib/u_lib/src/jobs.rs b/lib/u_lib/src/jobs.rs index 5c01b91..4c433cc 100644 --- a/lib/u_lib/src/jobs.rs +++ b/lib/u_lib/src/jobs.rs @@ -1,11 +1,8 @@ use crate::{ combined_result::CombinedResult, executor::{ExecResult, Waiter}, - models::{ - Agent, AssignedJob, AssignedJobById, BriefJob, FullJob, FullPayload, JobType, RawJob, - }, + models::{Agent, AssignedJob, AssignedJobById, Job, JobType, RawJob}, proc_output::ProcOutput, - ufs, }; use std::collections::HashMap; use std::process::exit; @@ -17,7 +14,7 @@ pub struct AnonymousJobBatch { } impl AnonymousJobBatch { - pub fn from_meta_with_id(jobs: impl IntoIterator) -> Self { + pub fn from_meta_with_id(jobs: impl IntoIterator) -> Self { let mut waiter = Waiter::new(); for (job, ids) in jobs { waiter.push(run_assigned_job(job, ids)); @@ -28,7 +25,7 @@ impl AnonymousJobBatch { } } - pub fn from_meta(jobs: impl IntoIterator) -> Self { + pub fn from_meta(jobs: impl IntoIterator) -> Self { let jobs_ids: Vec<_> = jobs .into_iter() .map(|job| { @@ -47,6 +44,7 @@ impl AnonymousJobBatch { /// Spawn jobs pub async fn spawn(mut self) -> Self { + debug!("spawning jobs"); self.waiter = self.waiter.spawn().await; self.is_running = true; self @@ -81,11 +79,7 @@ impl NamedJobBatch { let jobs: Vec<_> = named_jobs .into_iter() .filter_map(|(alias, cmd)| { - match RawJob::brief_job_builder() - .with_shell(cmd) - .with_alias(alias) - .build() - { + match RawJob::builder().with_shell(cmd).with_alias(alias).build() { Ok(jpm) => Some(jpm), Err(e) => { result.push_err(e); @@ -98,7 +92,7 @@ impl NamedJobBatch { result } - pub fn from_meta(named_jobs: Vec) -> Self { + pub fn from_meta(named_jobs: Vec) -> Self { let (job_names, jobs): (Vec<_>, Vec<_>) = named_jobs .into_iter() .map(|job| (job.job.alias.clone().unwrap(), job)) @@ -134,14 +128,14 @@ impl NamedJobBatch { } } -pub async fn run_assigned_job(job: BriefJob, ids: AssignedJobById) -> ExecResult { - let BriefJob { job, payload_meta } = job; +pub async fn run_assigned_job(job: Job, ids: AssignedJobById) -> ExecResult 
{ + let Job { job, payload } = job; let mut result = AssignedJob::from((&job, ids)); match job.exec_type { JobType::Shell => { let (argv, _prepared_payload) = { - if let Some(meta) = payload_meta { - let (prep_exec, prep_exec_path) = ufs::prepare_executable(meta.name)?; + if let Some(payload) = payload { + let (prep_exec, prep_exec_path) = payload.prepare_executable()?; let argv_with_exec = job.argv.replace("{}", &prep_exec_path); (argv_with_exec, Some(prep_exec)) } else { @@ -181,38 +175,6 @@ pub async fn run_assigned_job(job: BriefJob, ids: AssignedJobById) -> ExecResult Ok(result) } -pub fn split_payload(job: FullJob) -> Result { - let FullJob { job, payload } = job; - - if let Some(payload) = &payload { - if ufs::exists_in_index(&payload.meta.name) { - ufs::edit(&payload.meta.name, &payload.data)?; - } else { - ufs::put(&payload.meta.name, &payload.data)?; - } - } - - Ok(BriefJob { - job, - payload_meta: payload.map(|p| p.meta), - }) -} - -pub fn join_payload(job: BriefJob) -> Result { - let BriefJob { job, payload_meta } = job; - let payload = match payload_meta { - Some(meta) => { - let payload_data = ufs::read(&meta.name)?; - Some(FullPayload { - meta, - data: payload_data, - }) - } - None => None, - }; - Ok(FullJob { job, payload }) -} - #[cfg(test)] mod tests { use crate::{ @@ -265,7 +227,7 @@ mod tests { #[case] payload: Option<&[u8]>, #[case] expected_result: &str, ) -> TestResult { - let mut job = RawJob::brief_job_builder().with_shell(cmd); + let mut job = RawJob::builder().with_shell(cmd); if let Some(p) = payload { job = job.with_raw_payload(p); } @@ -348,7 +310,7 @@ mod tests { #[case] payload: Option<&[u8]>, #[case] err_str: &str, ) -> TestResult { - let mut job = RawJob::brief_job_builder().with_shell(cmd); + let mut job = RawJob::builder().with_shell(cmd); if let Some(p) = payload { job = job.with_raw_payload(p); } @@ -361,12 +323,12 @@ mod tests { #[tokio::test] async fn test_different_job_types() -> TestResult { let mut jobs = NamedJobBatch::from_meta(vec![ - RawJob::brief_job_builder() + RawJob::builder() .with_shell("sleep 3") .with_alias("sleeper") .build() .unwrap(), - RawJob::brief_job_builder() + RawJob::builder() .with_type(JobType::Init) .with_alias("gatherer") .build() diff --git a/lib/u_lib/src/logging.rs b/lib/u_lib/src/logging.rs index ae32a4f..018df38 100644 --- a/lib/u_lib/src/logging.rs +++ b/lib/u_lib/src/logging.rs @@ -1,5 +1,5 @@ use std::env; -use std::io::{stderr, stdout}; +use std::io::stderr; use std::path::Path; use tracing_appender::rolling; @@ -11,7 +11,7 @@ pub fn init_logger(logfile: Option<&str>) { } let output_layer = if cfg!(test) { - fmt::layer().with_writer(stdout).with_test_writer().boxed() + fmt::layer().with_test_writer().boxed() } else { fmt::layer().with_writer(stderr).boxed() }; diff --git a/lib/u_lib/src/messaging.rs b/lib/u_lib/src/messaging.rs index cea2be9..721bbb8 100644 --- a/lib/u_lib/src/messaging.rs +++ b/lib/u_lib/src/messaging.rs @@ -11,12 +11,9 @@ impl AsMsg for Agent {} impl AsMsg for AssignedJob {} impl AsMsg for AssignedJobById {} impl AsMsg for JobModel {} -impl AsMsg for FullJob {} impl AsMsg for Reportable {} -impl AsMsg for PayloadMeta {} -impl AsMsg for FullPayload {} -impl AsMsg for BriefJob {} -impl AsMsg for BriefOrFull {} +impl AsMsg for Payload {} +impl AsMsg for Job {} impl AsMsg for Id {} impl AsMsg for String {} impl AsMsg for Vec {} diff --git a/lib/u_lib/src/models/jobs/meta.rs b/lib/u_lib/src/models/jobs/meta.rs index 6039ad1..1d79122 100644 --- a/lib/u_lib/src/models/jobs/meta.rs +++ 
b/lib/u_lib/src/models/jobs/meta.rs @@ -3,10 +3,10 @@ use std::fmt; use super::JobType; #[cfg(feature = "server")] use crate::models::schema::*; -use crate::models::{FullPayload, PayloadMeta}; +use crate::models::Payload; use crate::platform; use crate::types::Id; -use crate::{ufs, UError, UResult}; +use crate::{UError, UResult}; #[cfg(feature = "server")] use diesel::{Identifiable, Insertable, Queryable}; use serde::{Deserialize, Serialize}; @@ -33,15 +33,9 @@ pub struct JobModel { } #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] -pub struct FullJob { +pub struct Job { pub job: JobModel, - pub payload: Option, -} - -#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] -pub struct BriefJob { - pub job: JobModel, - pub payload_meta: Option, + pub payload: Option, } #[derive(Serialize, Deserialize, Clone)] @@ -105,9 +99,12 @@ impl fmt::Debug for RawJob<'_> { } } -impl From for RawJob<'_> { - fn from(job: BriefJob) -> Self { - let BriefJob { job, payload_meta } = job; +impl From for RawJob<'_> { + fn from(job: Job) -> Self { + let Job { + job, + payload: payload_meta, + } = job; RawJob { alias: job.alias, argv: job.argv, @@ -122,15 +119,15 @@ impl From for RawJob<'_> { } impl<'p> RawJob<'p> { - pub fn validated(self) -> UResult { + pub fn validated(self) -> UResult { JobBuilder { inner: self }.build() } - pub fn from_shell(cmd: impl Into) -> UResult { - Self::brief_job_builder().with_shell(cmd).build() + pub fn from_shell(cmd: impl Into) -> UResult { + Self::builder().with_shell(cmd).build() } - pub fn brief_job_builder() -> JobBuilder<'p> { + pub fn builder() -> JobBuilder<'p> { JobBuilder::default() } } @@ -174,29 +171,40 @@ impl<'p> JobBuilder<'p> { self } - pub fn build(self) -> UResult { + pub fn build(self) -> UResult { let mut inner = self.inner; - let raw_into_job = |raw: RawJob| -> UResult { - let payload_meta = raw - .payload_path - .as_ref() - .map(|payload_ident| PayloadMeta::from_existing_meta(payload_ident)) - .transpose()?; + fn _build(job: RawJob) -> UResult { + let payload = { + let payload_from_path = job + .payload_path + .as_ref() + .map(|path| Payload::from_path(path)) + .transpose()?; + + if payload_from_path.is_none() { + job.raw_payload + .as_ref() + .map(|data| Payload::from_data(data)) + .transpose()? 
+ } else { + payload_from_path + } + }; - Ok(BriefJob { + Ok(Job { job: JobModel { - alias: raw.alias, - argv: raw.argv, - id: raw.id, - exec_type: raw.exec_type, - target_platforms: raw.target_platforms, - payload: payload_meta.as_ref().map(|meta| meta.id), - schedule: raw.schedule, + alias: job.alias, + argv: job.argv, + id: job.id, + exec_type: job.exec_type, + target_platforms: job.target_platforms, + payload: payload.as_ref().map(|p| p.id), + schedule: job.schedule, }, - payload_meta, + payload, }) - }; + } match inner.exec_type { JobType::Shell => { @@ -219,23 +227,14 @@ impl<'p> JobBuilder<'p> { return Err(empty_err.into()); } - if let Some(path) = &inner.payload_path { - ufs::put_external(path)?; - } - - if let Some(raw_payload) = &inner.raw_payload { - match inner.payload_path { - Some(_) => { - return Err(UError::JobBuildError( - "Can't use both raw payload with payload path".to_string(), - )) - } - None => inner.payload_path = Some(ufs::create_anonymous(raw_payload)?), - } + if inner.raw_payload.is_some() && inner.payload_path.is_some() { + return Err(UError::JobBuildError( + "Can't use both raw payload with payload path".to_string(), + )); } match inner.payload_path.as_ref() { - Some(_) => { + Some(_) | None if inner.raw_payload.is_some() => { if !inner.argv.contains("{}") { return Err(UError::JobBuildError( "Argv contains no executable placeholder".into(), @@ -244,7 +243,7 @@ impl<'p> JobBuilder<'p> { } } None => { - if inner.argv.contains("{}") { + if inner.argv.contains("{}") && inner.raw_payload.is_none() { return Err(UError::JobBuildError( "No payload provided, but argv contains executable placeholder" .into(), @@ -252,6 +251,7 @@ impl<'p> JobBuilder<'p> { .into()); } } + _ => (), }; if inner.target_platforms.is_empty() { @@ -265,9 +265,9 @@ impl<'p> JobBuilder<'p> { ))); } - raw_into_job(inner) + _build(inner) } - _ => raw_into_job(inner), + _ => _build(inner), } } } diff --git a/lib/u_lib/src/models/mod.rs b/lib/u_lib/src/models/mod.rs index e8f315d..44d56a8 100644 --- a/lib/u_lib/src/models/mod.rs +++ b/lib/u_lib/src/models/mod.rs @@ -4,21 +4,10 @@ mod payload; #[cfg(feature = "server")] pub mod schema; -use crate::messaging::AsMsg; pub use crate::models::{agent::*, jobs::*, payload::*}; -use serde::{Deserialize, Serialize}; +use serde::Deserialize; use strum::{Display as StrumDisplay, EnumString}; -#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] -#[serde(untagged)] -pub enum BriefOrFull { - Brief(B), - Full(F), -} - -pub type BriefOrFullJob = BriefOrFull; -pub type BriefOrFullPayload = BriefOrFull; - #[derive(Default, Debug, StrumDisplay, EnumString, Deserialize)] pub enum BriefMode { Yes, diff --git a/lib/u_lib/src/models/payload.rs b/lib/u_lib/src/models/payload.rs index 88cf6fa..2b162fe 100644 --- a/lib/u_lib/src/models/payload.rs +++ b/lib/u_lib/src/models/payload.rs @@ -1,6 +1,9 @@ use crate::{conv::bytes_to_string, types::Id, ufs, UError}; +use anyhow::{Context, Result}; use serde::{Deserialize, Serialize}; -use std::process::Command; +use std::{fs::File, path::Path, process::Command}; + +const MAX_READABLE_PAYLOAD_SIZE: i64 = 1024 * 32; #[cfg(feature = "server")] use crate::models::schema::*; @@ -13,39 +16,148 @@ use diesel::Identifiable; diesel(table_name = payloads) )] #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] -pub struct PayloadMeta { +pub struct Payload { pub id: Id, pub mime_type: String, pub name: String, pub size: i64, + data: Option>, // when None, payload data is stored in ufs } -impl PayloadMeta { - pub fn 
from_existing_meta(payload_ufs_ident: &str) -> Result { - let ufs_meta = ufs::read_meta(&payload_ufs_ident)?; - let mime_type = bytes_to_string( - &Command::new("file") - .arg("-b") - .arg("--mime-type") - .arg(&ufs_meta.path) - .output() - .map_err(|e| UError::IOError(e.to_string()))? - .stdout, - ) - .trim() - .to_string(); - - Ok(PayloadMeta { +impl Payload { + pub fn is_human_readable(&self) -> bool { + self.size < MAX_READABLE_PAYLOAD_SIZE && self.mime_type.starts_with("text/") + } + + pub fn maybe_split_payload(&mut self) -> Result<()> { + if self.is_human_readable() { + return Ok(()); + } + + if let Some(data) = self.data.take() { + if ufs::exists_in_index(&self.name) { + ufs::edit(&self.name, data)?; + } else { + ufs::put(&self.name, data)?; + } + } + Ok(()) + } + + pub fn join_payload(&mut self) -> Result<()> { + if self.data.is_none() { + self.data = Some(ufs::read(&self.name)?); + } + Ok(()) + } + + pub fn maybe_join_payload(&mut self) -> Result<()> { + if self.is_human_readable() { + self.join_payload()?; + } + Ok(()) + } + + pub fn from_data(data: impl AsRef<[u8]>) -> Result { + let name = ufs::create_anonymous(data)?; + let meta = ufs::read_meta(&name)?; + + let mut payload = Payload { id: Id::new_v4(), - mime_type, - name: payload_ufs_ident.to_owned(), - size: ufs_meta.size as i64, - }) + mime_type: get_mime_type(&meta.path)?, + name: name.clone(), + size: meta.size as i64, + data: None, + }; + + if payload.is_human_readable() { + payload.join_payload()?; + } + + Ok(payload) + } + + pub fn from_path(payload_path: &str) -> Result { + ufs::put_external(payload_path)?; + + let meta = ufs::read_meta(&payload_path)?; + + let mut payload = Payload { + id: Id::new_v4(), + mime_type: get_mime_type(&meta.path)?, + name: payload_path.to_owned(), + size: meta.size as i64, + data: None, + }; + + if payload.is_human_readable() { + payload.join_payload()?; + } + + Ok(payload) + } + + /// Prepare executable file: unpack, decipher if needed and send under memfd + #[cfg(unix)] + pub fn prepare_executable(&self) -> Result<(File, String)> { + use libc::getpid; + use nix::sys::memfd::*; + use std::ffi::CString; + use std::io::{Read, Write}; + use std::os::fd::FromRawFd; + + const FAKE_EXEC_NAME: &str = "/usr/sbin/lvmetad"; + const BUFFER_LEN: usize = 4096; + + let mut buffer: [u8; BUFFER_LEN] = [0; BUFFER_LEN]; + let mut payload_src = if let Some(data) = &self.data { + Box::new(data.as_slice()) as Box + } else { + let payload_path = ufs::read_meta(&self.name).context("prep")?.path; + let file = File::open(&payload_path).map_err(|e| ufs::Error::new(e, &payload_path))?; + Box::new(file) as Box + }; + + let fd = memfd_create( + CString::new(FAKE_EXEC_NAME).unwrap().as_c_str(), + MemFdCreateFlag::empty(), + ); + + match fd { + Ok(fd) => { + let mut payload_dest = unsafe { File::from_raw_fd(fd) }; + + loop { + let bytes_read = payload_src.read(&mut buffer)?; + payload_dest.write(&buffer[..bytes_read])?; + + if bytes_read != BUFFER_LEN { + break; + } + } + let payload_path = format!("/proc/{}/fd/{}", unsafe { getpid() }, fd); + Ok((payload_dest, payload_path)) + } + Err(e) => Err(ufs::Error::new(e, FAKE_EXEC_NAME)).context("prep"), + } + } + + #[cfg(windows)] + pub fn prepare_executable(&self) -> Result<(File, String)> { + todo!() } } -#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] -pub struct FullPayload { - pub meta: PayloadMeta, - pub data: Vec, +fn get_mime_type(path: impl AsRef) -> Result { + Ok(bytes_to_string( + &Command::new("file") + .arg("-b") + .arg("--mime-type") + 
.arg(path.as_ref()) + .output() + .map_err(|e| UError::IOError(e.to_string()))? + .stdout, + ) + .trim() + .to_string()) } diff --git a/lib/u_lib/src/models/schema.rs b/lib/u_lib/src/models/schema.rs index 5fac813..9908f4d 100644 --- a/lib/u_lib/src/models/schema.rs +++ b/lib/u_lib/src/models/schema.rs @@ -59,6 +59,7 @@ diesel::table! { mime_type -> Text, name -> Text, size -> Int8, + data -> Nullable, } } diff --git a/lib/u_lib/src/ufs/mod.rs b/lib/u_lib/src/ufs/mod.rs index e59d110..c6023c5 100644 --- a/lib/u_lib/src/ufs/mod.rs +++ b/lib/u_lib/src/ufs/mod.rs @@ -4,8 +4,8 @@ use anyhow::{Context, Result}; use serde::{Deserialize, Serialize}; use std::env::temp_dir; -use std::ffi::{CString, OsString}; -use std::fs::{self, File}; +use std::ffi::OsString; +use std::fs; use std::path::{Path, PathBuf}; use uuid::Uuid; @@ -18,9 +18,9 @@ const OBFUSCATE: bool = cfg!(feature = "agent"); #[derive(Clone, Deserialize, Serialize)] pub struct FileMeta { extension: Option, - external: bool, + external: bool, // if file is present before adding to index hash: Vec, - pub path: PathBuf, + pub path: PathBuf, // actual file path pub size: u64, } @@ -221,51 +221,6 @@ pub fn put_external(path: impl AsRef) -> Result<()> { Ok(()) } -/// Prepare executable file: unpack, decipher if needed and send under memfd -#[cfg(unix)] -pub fn prepare_executable(name: impl AsRef) -> Result<(File, String)> { - use libc::getpid; - use nix::sys::memfd::*; - use std::io::{Read, Write}; - use std::os::fd::FromRawFd; - - const FAKE_EXEC_NAME: &str = "/usr/sbin/lvmetad"; - const BUFFER_LEN: usize = 4096; - - let mut buffer: [u8; BUFFER_LEN] = [0; BUFFER_LEN]; - let payload_meta = read_meta(name).context("prep")?; - - let fd = memfd_create( - CString::new(FAKE_EXEC_NAME).unwrap().as_c_str(), - MemFdCreateFlag::empty(), - ); - - match fd { - Ok(fd) => { - let mut payload_src = - File::open(&payload_meta.path).map_err(|e| Error::new(e, &payload_meta.path))?; - let mut payload_dest = unsafe { File::from_raw_fd(fd) }; - - loop { - let bytes_read = payload_src.read(&mut buffer)?; - payload_dest.write(&buffer[..bytes_read])?; - - if bytes_read != BUFFER_LEN { - break; - } - } - let payload_path = format!("/proc/{}/fd/{}", unsafe { getpid() }, fd); - Ok((payload_dest, payload_path)) - } - Err(e) => Err(Error::new(e, FAKE_EXEC_NAME)).context("prep"), - } -} - -#[cfg(windows)] -pub fn prepare_executable(name: impl AsRef) -> Result<(File, String)> { - todo!() -} - /* pub fn cleanup() { let index = INDEX.read(); diff --git a/migrations/2020-10-24-111622_create_all/up.sql b/migrations/2020-10-24-111622_create_all/up.sql index 7ddd216..49fd6cf 100644 --- a/migrations/2020-10-24-111622_create_all/up.sql +++ b/migrations/2020-10-24-111622_create_all/up.sql @@ -27,6 +27,7 @@ CREATE TABLE IF NOT EXISTS payloads ( mime_type TEXT NOT NULL, name TEXT NOT NULL UNIQUE, size BIGINT NOT NULL, + data BYTEA, PRIMARY KEY(id) );