diff --git a/bin/migrator/src/main.rs b/bin/migrator/src/main.rs index f6f4a5b..6043216 100644 --- a/bin/migrator/src/main.rs +++ b/bin/migrator/src/main.rs @@ -11,14 +11,14 @@ use diesel::migration::Migration; use diesel::{migration, pg::PgConnection, Connection}; use diesel_migrations::{embed_migrations, EmbeddedMigrations, MigrationHarness}; use std::error::Error; -use u_lib::config; +use u_lib::config::DBEnv; use u_lib::db::generate_postgres_url; const MIGRATIONS: EmbeddedMigrations = embed_migrations!(); fn main() -> Result<(), Box> { let action = action::parse_command_line()?; - let dbconfig = config::db::load()?; + let dbconfig = DBEnv::load()?; database::setup_database().unwrap(); diff --git a/bin/u_agent/src/lib.rs b/bin/u_agent/src/lib.rs index 1fd93e8..f64df3e 100644 --- a/bin/u_agent/src/lib.rs +++ b/bin/u_agent/src/lib.rs @@ -1,7 +1,3 @@ -// TODO: -// поддержка питона -// резолв адреса управляющего сервера через DoT - #[macro_use] extern crate log; @@ -11,13 +7,13 @@ use tokio::time::{sleep, Duration}; use u_lib::{ api::ClientHandler, cache::JobCache, - config::{endpoints, get_self_uid}, + config::{get_self_uid, EndpointsEnv}, error::ErrChan, executor::pop_completed, + jobs::{fat_meta_to_thin, UnnamedJobsBatch}, logging::init_logger, messaging::Reportable, models::AssignedJobById, - runner::JobRunner, }; const ITERATION_LATENCY: u64 = 5; @@ -28,15 +24,18 @@ pub async fn process_request(jobs: Vec, client: &ClientHandler) if !JobCache::contains(jr.job_id) { info!("Fetching job: {}", &jr.job_id); let fetched_job = loop { - match client.get_jobs(Some(jr.job_id)).await { - Ok(mut result) => break result.pop().unwrap(), + match client.get_job(jr.job_id).await { + Ok(result) => break result, Err(err) => { debug!("{:?} \nretrying...", err); sleep(Duration::from_secs(ITERATION_LATENCY)).await; } } }; - JobCache::insert(fetched_job); + match fat_meta_to_thin(fetched_job) { + Ok(thin_meta) => JobCache::insert(thin_meta), + Err(e) => ErrChan::send(e, "pld").await, + } } } info!( @@ -46,14 +45,18 @@ pub async fn process_request(jobs: Vec, client: &ClientHandler) .collect::>() .join(", ") ); - let mut runner = JobRunner::from_jobs(jobs); - let errors = runner.pop_errors(); - if !errors.is_empty() { - for e in errors { - ErrChan::send(e, "ebld").await; - } - } - runner.unwrap_one().spawn().await; + + let meta_with_ids = jobs + .into_iter() + .map(|job| { + let meta = JobCache::get(job.job_id).unwrap().clone(); + (meta, job) + }) + .collect::>(); + + UnnamedJobsBatch::from_meta_with_id(meta_with_ids) + .spawn() + .await; } } @@ -77,13 +80,15 @@ async fn error_reporting(client: ClientHandler) -> ! { } async fn agent_loop(client: ClientHandler) -> ! { + let self_uid = get_self_uid(); loop { - match client.get_personal_jobs(get_self_uid()).await { + match client.get_personal_jobs(self_uid).await { Ok(jobs) => { process_request(jobs, &client).await; } Err(err) => ErrChan::send(err, "processing").await, } + let result: Vec = pop_completed() .await .into_iter() @@ -92,6 +97,7 @@ async fn agent_loop(client: ClientHandler) -> ! { Err(e) => Reportable::Error(e), }) .collect(); + if !result.is_empty() { if let Err(err) = client.report(result).await { ErrChan::send(err, "report").await; @@ -102,7 +108,7 @@ async fn agent_loop(client: ClientHandler) -> ! { } pub fn run_forever() -> ! { - let env = endpoints::load().unwrap(); + let env = EndpointsEnv::load(); if cfg!(debug_assertions) { init_logger(Some(format!( diff --git a/bin/u_panel/src/argparse.rs b/bin/u_panel/src/argparse.rs index ef26792..e80843a 100644 --- a/bin/u_panel/src/argparse.rs +++ b/bin/u_panel/src/argparse.rs @@ -4,7 +4,7 @@ use u_lib::{ api::ClientHandler, datatypes::PanelResult, messaging::AsMsg, - models::{Agent, AssignedJob, JobMeta}, + models::{Agent, AssignedJob, FatJobMeta}, UError, UResult, }; use uuid::Uuid; @@ -87,13 +87,21 @@ pub async fn process_cmd(client: ClientHandler, args: Args) -> PanelResult match action { JobCRUD::Create { job } => { - let raw_job = from_str::(&job)?; - let job = raw_job.validated()?; + let raw_job = from_str::(&job)?; + let mut job = raw_job.validated()?; + + if let Some(payload) = &mut job.payload { + payload.read_into_self()?; + } + into_value(client.upload_jobs(job).await?) } - JobCRUD::RUD(RUD::Read { uid }) => into_value(client.get_jobs(uid).await?), + JobCRUD::RUD(RUD::Read { uid }) => match uid { + Some(uid) => into_value(client.get_job(uid).await?), + None => into_value(client.get_jobs().await?), + }, JobCRUD::RUD(RUD::Update { item }) => { - let raw_job = from_str::(&item)?; + let raw_job = from_str::(&item)?; let job = raw_job.validated()?; into_value(client.update_job(job).await?) } diff --git a/bin/u_panel/src/gui/fe/src/app/core/models/job.model.ts b/bin/u_panel/src/gui/fe/src/app/core/models/job.model.ts index 9db303f..2b84d70 100644 --- a/bin/u_panel/src/gui/fe/src/app/core/models/job.model.ts +++ b/bin/u_panel/src/gui/fe/src/app/core/models/job.model.ts @@ -7,6 +7,5 @@ export interface JobModel extends ApiModel { exec_type: string, platform: string, payload: number[] | null, - payload_path: string | null, schedule: string | null, } \ No newline at end of file diff --git a/bin/u_panel/src/gui/fe/src/app/core/tables/dialogs/job-info-dialog.html b/bin/u_panel/src/gui/fe/src/app/core/tables/dialogs/job-info-dialog.html index 79108e6..7bb97f6 100644 --- a/bin/u_panel/src/gui/fe/src/app/core/tables/dialogs/job-info-dialog.html +++ b/bin/u_panel/src/gui/fe/src/app/core/tables/dialogs/job-info-dialog.html @@ -29,12 +29,6 @@ -
- - Payload path - - -
Payload diff --git a/bin/u_panel/src/main.rs b/bin/u_panel/src/main.rs index fa0c980..f707aef 100644 --- a/bin/u_panel/src/main.rs +++ b/bin/u_panel/src/main.rs @@ -8,12 +8,12 @@ use anyhow::Result as AnyResult; use argparse::{process_cmd, Args}; use structopt::StructOpt; use u_lib::api::ClientHandler; -use u_lib::config::admin; +use u_lib::config::AccessEnv; use u_lib::logging::init_logger; #[actix_web::main] async fn main() -> AnyResult<()> { - let env = admin::load()?; + let env = AccessEnv::load()?; let client = ClientHandler::new(&env.u_server, Some(env.admin_auth_token)).await?; let args = Args::from_args(); diff --git a/bin/u_server/src/db.rs b/bin/u_server/src/db.rs index cabbb8a..98f9d95 100644 --- a/bin/u_server/src/db.rs +++ b/bin/u_server/src/db.rs @@ -1,10 +1,11 @@ -use crate::error::Error as ServerError; +use crate::error::Error; use diesel::{pg::PgConnection, prelude::*, result::Error as DslError, Connection}; use u_lib::db::PgAsyncPool; -use u_lib::models::{schema, Agent, AssignedJob, JobMeta, JobState}; +use u_lib::models::{schema, Agent, AssignedJob, JobState, ThinJobMeta}; +use u_lib::platform::Platform; use uuid::Uuid; -type Result = std::result::Result; +type Result = std::result::Result; pub struct PgRepo { pool: PgAsyncPool, @@ -46,37 +47,42 @@ pub struct UDB<'c> { } impl UDB<'_> { - pub fn insert_jobs(&mut self, job_metas: &[JobMeta]) -> Result> { + pub fn insert_jobs(&mut self, job_metas: &[ThinJobMeta]) -> Result> { use schema::jobs; diesel::insert_into(jobs::table) .values(job_metas) .get_results(self.conn) - .map(|rows| rows.iter().map(|job: &JobMeta| job.id).collect()) + .map(|rows| rows.iter().map(|job: &ThinJobMeta| job.id).collect()) .map_err(with_err_ctx("Can't insert jobs")) } - pub fn get_jobs(&mut self, ouid: Option) -> Result> { + pub fn get_job(&mut self, uid: Uuid) -> Result> { use schema::jobs; - match ouid { - Some(uid) => jobs::table - .filter(jobs::id.eq(uid)) - .get_results::(self.conn), - None => jobs::table.load::(self.conn), - } - .map_err(with_err_ctx("Can't get exact jobs")) + jobs::table + .filter(jobs::id.eq(uid)) + .first(self.conn) + .optional() + .map_err(with_err_ctx(format!("Can't get job {uid}"))) } - pub fn find_job_by_alias(&mut self, alias: &str) -> Result> { + pub fn get_jobs(&mut self) -> Result> { use schema::jobs; - let result = jobs::table + jobs::table + .load(self.conn) + .map_err(with_err_ctx("Can't get jobs")) + } + + pub fn find_job_by_alias(&mut self, alias: &str) -> Result> { + use schema::jobs; + + jobs::table .filter(jobs::alias.eq(alias)) - .first::(self.conn) + .first(self.conn) .optional() - .map_err(with_err_ctx(format!("Can't find job by alias {alias}")))?; - Ok(result) + .map_err(with_err_ctx(format!("Can't find job by alias {alias}"))) } pub fn insert_agent(&mut self, agent: &Agent) -> Result<()> { @@ -102,16 +108,22 @@ impl UDB<'_> { Ok(()) } - pub fn get_agents(&mut self, ouid: Option) -> Result> { + pub fn get_agent(&mut self, uid: Uuid) -> Result> { use schema::agents; - match ouid { - Some(uid) => agents::table - .filter(agents::id.eq(uid)) - .load::(self.conn), - None => agents::table.load::(self.conn), - } - .map_err(with_err_ctx(format!("Can't get agent(s) {ouid:?}"))) + agents::table + .filter(agents::id.eq(uid)) + .first(self.conn) + .optional() + .map_err(with_err_ctx(format!("Can't get agent {uid:?}"))) + } + + pub fn get_agents(&mut self) -> Result> { + use schema::agents; + + agents::table + .load::(self.conn) + .map_err(with_err_ctx(format!("Can't get agents"))) } pub fn update_job_status(&mut self, uid: Uuid, status: JobState) -> Result<()> { @@ -155,14 +167,38 @@ impl UDB<'_> { Ok(result) } - pub fn set_jobs_for_agent(&mut self, agent_uid: &Uuid, job_uids: &[Uuid]) -> Result> { - use schema::results; + pub fn set_jobs_for_agent(&mut self, agent_uid: Uuid, job_uids: &[Uuid]) -> Result> { + use schema::{jobs, results}; + + let agent_platform = match self.get_agent(agent_uid)? { + Some(agent) => Platform::new(&agent.platform), + None => { + return Err(Error::ProcessingError(format!( + "Agent {agent_uid} not found" + ))) + } + }; + + for uid in job_uids { + let job_platform = jobs::table + .select(jobs::platform) + .filter(jobs::id.eq(uid)) + .first(self.conn) + .map_err(with_err_ctx(format!("Can't find job {uid}")))?; + + if !agent_platform.matches(&job_platform) { + return Err(Error::InsuitablePlatform( + agent_platform.into_string(), + job_platform, + )); + } + } let job_requests = job_uids .iter() .map(|job_uid| AssignedJob { job_id: *job_uid, - agent_id: *agent_uid, + agent_id: agent_uid, ..Default::default() }) .collect::>(); @@ -226,8 +262,8 @@ impl UDB<'_> { Ok(()) } - pub fn update_job(&mut self, job: &JobMeta) -> Result<()> { - job.save_changes::(self.conn) + pub fn update_job(&mut self, job: &ThinJobMeta) -> Result<()> { + job.save_changes::(self.conn) .map_err(with_err_ctx(format!("Can't update job {job:x?}")))?; Ok(()) } @@ -244,6 +280,6 @@ impl UDB<'_> { } } -fn with_err_ctx(msg: impl AsRef) -> impl Fn(DslError) -> ServerError { - move |err| ServerError::DBErrorCtx(format!("{}, reason: {err}", msg.as_ref())) +fn with_err_ctx(msg: impl AsRef) -> impl Fn(DslError) -> Error { + move |err| Error::DBErrorCtx(format!("{}, reason: {err}", msg.as_ref())) } diff --git a/bin/u_server/src/error.rs b/bin/u_server/src/error.rs index e989a0a..0d4a1d4 100644 --- a/bin/u_server/src/error.rs +++ b/bin/u_server/src/error.rs @@ -1,5 +1,6 @@ use diesel::result::Error as DslError; use thiserror::Error; +use u_lib::ufs; use warp::{ http::StatusCode, reject::Reject, @@ -9,6 +10,9 @@ use warp::{ #[derive(Error, Debug)] pub enum Error { + #[error("Configs error: {0}")] + ConfigError(#[from] u_lib::config::Error), + #[error("Error processing {0}")] ProcessingError(String), @@ -21,8 +25,11 @@ pub enum Error { #[error("Deadpool error: {0}")] DeadpoolError(#[from] deadpool_diesel::PoolError), - #[error("General error: {0}")] - Other(String), + #[error(transparent)] + FSError(#[from] ufs::Error), + + #[error("Job cannot be ran on this platform. Expected: {0}, got: {1}")] + InsuitablePlatform(String, String), } impl Reject for Error {} diff --git a/bin/u_server/src/handlers.rs b/bin/u_server/src/handlers.rs index bc4eb37..5ed4acc 100644 --- a/bin/u_server/src/handlers.rs +++ b/bin/u_server/src/handlers.rs @@ -2,12 +2,15 @@ use std::sync::Arc; use crate::db::{PgRepo, UDB}; use crate::error::Error; +use crate::ValidJobMeta; +use u_lib::jobs::{fat_meta_to_thin, thin_meta_to_fat}; use u_lib::{ messaging::{AsMsg, BaseMessage, Reportable}, misc::OneOrVec, models::*, }; use uuid::Uuid; +use warp::reject::not_found; use warp::Rejection; type EndpResult = Result; @@ -16,13 +19,33 @@ pub struct Endpoints; impl Endpoints { pub async fn get_agents(repo: Arc, uid: Option) -> EndpResult> { - repo.interact(move |mut db| db.get_agents(uid)) - .await - .map_err(From::from) + repo.interact(move |mut db| { + Ok(match uid { + Some(uid) => { + if let Some(agent) = db.get_agent(uid)? { + vec![agent] + } else { + vec![] + } + } + None => db.get_agents()?, + }) + }) + .await + .map_err(From::from) + } + + pub async fn get_job(repo: Arc, uid: Uuid) -> EndpResult { + let Some(job) = repo.interact(move |mut db| db.get_job(uid)).await? else { + return Err(not_found()) + }; + + let fat_meta = thin_meta_to_fat(job).map_err(Error::from)?; + Ok(fat_meta) } - pub async fn get_jobs(repo: Arc, uid: Option) -> EndpResult> { - repo.interact(move |mut db| db.get_jobs(uid)) + pub async fn get_jobs(repo: Arc) -> EndpResult> { + repo.interact(move |mut db| db.get_jobs()) .await .map_err(From::from) } @@ -38,25 +61,32 @@ impl Endpoints { pub async fn get_personal_jobs(repo: Arc, uid: Uuid) -> EndpResult> { repo.transaction(move |mut db| { - let mut agents = db.get_agents(Some(uid))?; - if agents.is_empty() { - let new_agent = Agent::with_id(uid); - db.insert_agent(&new_agent)?; - let job = db - .find_job_by_alias("agent_hello")? - .expect("agent_hello job not found"); - db.set_jobs_for_agent(&uid, &[job.id])?; - } else { - let mut agent = agents.pop().unwrap(); - agent.touch(); - db.update_agent(&agent)?; + let agent = db.get_agent(uid)?; + match agent { + Some(mut agent) => { + agent.touch(); + db.update_agent(&agent)?; + } + None => { + let new_agent = Agent::with_id(uid); + + db.insert_agent(&new_agent)?; + + let job = db + .find_job_by_alias("agent_hello")? + .expect("agent_hello job not found"); + + db.set_jobs_for_agent(uid, &[job.id])?; + } } - let result = db.get_exact_jobs(Some(uid), true)?; - for j in result.iter() { - db.update_job_status(j.id, JobState::Running)?; + let assigned_jobs = db.get_exact_jobs(Some(uid), true)?; + + for job in &assigned_jobs { + db.update_job_status(job.id, JobState::Running)?; } - Ok(result) + + Ok(assigned_jobs) }) .await .map_err(From::from) @@ -64,9 +94,15 @@ impl Endpoints { pub async fn upload_jobs( repo: Arc, - msg: BaseMessage<'static, Vec>, + msg: BaseMessage<'static, Vec>, ) -> EndpResult> { - repo.interact(move |mut db| db.insert_jobs(&msg.into_inner())) + let jobs = msg + .into_inner() + .into_iter() + .map(|meta| Ok(fat_meta_to_thin(meta)?)) + .collect::, Error>>()?; + + repo.interact(move |mut db| db.insert_jobs(&jobs)) .await .map_err(From::from) } @@ -89,10 +125,11 @@ impl Endpoints { pub async fn set_jobs( repo: Arc, agent_uid: Uuid, - msg: BaseMessage<'static, Vec>, + job_idents: BaseMessage<'static, Vec>, ) -> EndpResult> { repo.transaction(move |mut db| { - msg.into_inner() + job_idents + .into_inner() .into_iter() .map(|ident| { Uuid::parse_str(&ident).or_else(|_| { @@ -109,7 +146,7 @@ impl Endpoints { }) }) .collect::, Error>>() - .and_then(|j| db.set_jobs_for_agent(&agent_uid, &j)) + .and_then(|j| db.set_jobs_for_agent(agent_uid, &j)) }) .await .map_err(From::from) @@ -137,14 +174,14 @@ impl Endpoints { let mut agent: Agent = match serde_json::from_slice(&rbytes) { Ok(a) => a, Err(e) => { - warn!("Error deserializing agent from {id}: {e}"); + error!("Error deserializing agent from {id}: {e}"); continue; } }; agent.state = AgentState::Active; db.insert_agent(&agent)?; } - None => warn!("Empty agent data"), + None => error!("Empty agent data"), }, JobType::Shell => (), JobType::Service => (), @@ -154,7 +191,7 @@ impl Endpoints { db.update_result(&result)?; } Reportable::Error(e) => { - warn!("{} reported an error: {}", id, e); + error!("agent {id} reported: {e}"); } Reportable::Dummy => (), }} @@ -175,9 +212,11 @@ impl Endpoints { pub async fn update_job( repo: Arc, - job: BaseMessage<'static, JobMeta>, + job: BaseMessage<'static, ValidJobMeta>, ) -> EndpResult<()> { - repo.interact(move |mut db| db.update_job(&job.into_inner())) + let meta = job.into_inner(); + let thin_meta = fat_meta_to_thin(meta).map_err(Error::from)?; + repo.interact(move |mut db| db.update_job(&thin_meta)) .await?; Ok(()) } diff --git a/bin/u_server/src/u_server.rs b/bin/u_server/src/u_server.rs index 2e60db4..f38210e 100644 --- a/bin/u_server/src/u_server.rs +++ b/bin/u_server/src/u_server.rs @@ -16,6 +16,7 @@ use std::{convert::Infallible, sync::Arc}; use u_lib::{ config, db::async_pool, + jobs::fat_meta_to_thin, messaging::{AsMsg, BaseMessage, Reportable}, models::*, }; @@ -29,6 +30,8 @@ use warp::{ use crate::handlers::Endpoints; +type ValidJobMeta = FatJobMeta; + fn get_content() -> impl Filter,), Error = Rejection> + Clone where M: AsMsg + Sync + Send + DeserializeOwned + 'static, @@ -64,17 +67,18 @@ pub fn init_endpoints( let upload_jobs = path("upload_jobs") .and(with_db.clone()) - .and(get_content::>()) + .and(get_content::>()) .and_then(Endpoints::upload_jobs) .map(into_message); + let get_job = path("get_job") + .and(with_db.clone()) + .and(warp::path::param::()) + .and_then(Endpoints::get_job) + .map(into_message); + let get_jobs = path("get_jobs") .and(with_db.clone()) - .and( - warp::path::param::() - .map(Some) - .or_else(infallible_none), - ) .and_then(Endpoints::get_jobs) .map(into_message); @@ -121,7 +125,7 @@ pub fn init_endpoints( let update_job = path("update_job") .and(with_db.clone()) - .and(get_content::()) + .and(get_content::()) .and_then(Endpoints::update_job) .map(ok); @@ -142,6 +146,7 @@ pub fn init_endpoints( let auth_header = warp::header::exact("authorization", Box::leak(auth_token)); let auth_zone = (get_agents + .or(get_job.clone()) .or(get_jobs.clone()) .or(upload_jobs) .or(del) @@ -152,7 +157,11 @@ pub fn init_endpoints( .or(ping)) .and(auth_header); - let agent_zone = get_jobs.or(get_personal_jobs).or(report).or(download); + let agent_zone = get_job + .or(get_jobs) + .or(get_personal_jobs) + .or(report) + .or(download); auth_zone.or(agent_zone) } @@ -162,11 +171,13 @@ pub async fn preload_jobs(repo: &PgRepo) -> Result<(), ServerError> { let job_alias = "agent_hello"; let if_job_exists = db.find_job_by_alias(job_alias)?; if if_job_exists.is_none() { - let agent_hello = JobMeta::builder() - .with_type(JobType::Init) - .with_alias(job_alias) - .build() - .unwrap(); + let agent_hello = fat_meta_to_thin( + FatJobMeta::builder() + .with_type(JobType::Init) + .with_alias(job_alias) + .build() + .unwrap(), + )?; db.insert_jobs(&[agent_hello])?; } Ok(()) @@ -175,13 +186,13 @@ pub async fn preload_jobs(repo: &PgRepo) -> Result<(), ServerError> { } pub async fn serve() -> Result<(), ServerError> { - let env = config::db::load().unwrap(); + let env = config::DBEnv::load()?; let pool = async_pool(&env); let db = PgRepo::new(pool); preload_jobs(&db).await?; - let env = config::admin::load().map_err(|e| ServerError::Other(e.to_string()))?; + let env = config::AccessEnv::load()?; let routes = init_endpoints(&env.admin_auth_token, db) .recover(handle_rejection) .with(custom(logger)); diff --git a/images/musl-libs.Dockerfile b/images/musl-libs.Dockerfile index c2e0b01..1ddf2c1 100644 --- a/images/musl-libs.Dockerfile +++ b/images/musl-libs.Dockerfile @@ -65,7 +65,6 @@ RUN curl -sSL https://zlib.net/zlib-$ZLIB_VER.tar.gz | tar xz && \ # Build openssl (used in curl and pq) # Would like to use zlib here, but can't seem to get it to work properly -# TODO: fix so that it works RUN curl -sSL https://www.openssl.org/source/old/1.0.2/openssl-$SSL_VER.tar.gz | tar xz && \ cd openssl-$SSL_VER && \ ./Configure no-zlib no-shared -fPIC --prefix=$PREFIX --openssldir=$PREFIX/ssl linux-x86_64 && \ diff --git a/images/tests_runner.Dockerfile b/images/tests_runner.Dockerfile index 2d98f31..35355c5 100644 --- a/images/tests_runner.Dockerfile +++ b/images/tests_runner.Dockerfile @@ -1,4 +1,4 @@ -FROM rust:1.64 +FROM rust:1.67 RUN rustup target add x86_64-unknown-linux-musl RUN mkdir -p /tests && chmod 777 /tests diff --git a/integration/tests/fixtures/agent.rs b/integration/tests/fixtures/agent.rs index dc2689d..131e197 100644 --- a/integration/tests/fixtures/agent.rs +++ b/integration/tests/fixtures/agent.rs @@ -1,5 +1,8 @@ use crate::helpers::ENV; -use u_lib::{api::ClientHandler, messaging::Reportable, models::*}; +use u_lib::{ + api::ClientHandler, config::get_self_uid, jobs::fat_meta_to_thin, messaging::Reportable, + models::*, +}; use uuid::Uuid; pub struct RegisteredAgent { @@ -16,7 +19,7 @@ impl RegisteredAgent { #[fixture] pub async fn register_agent() -> RegisteredAgent { let cli = ClientHandler::new(&ENV.u_server, None).await.unwrap(); - let agent_uid = Uuid::new_v4(); + let agent_uid = get_self_uid(); println!("registering agent {agent_uid}"); let resp = cli .get_personal_jobs(agent_uid) @@ -25,10 +28,9 @@ pub async fn register_agent() -> RegisteredAgent { .pop() .unwrap(); let job_id = resp.job_id; - let job = cli.get_jobs(Some(job_id)).await.unwrap().pop().unwrap(); + let job = cli.get_job(job_id).await.unwrap(); assert_eq!(job.alias, Some("agent_hello".to_string())); - let mut agent_data = AssignedJob::from(&job); - agent_data.agent_id = agent_uid; + let mut agent_data = AssignedJob::from((&fat_meta_to_thin(job).unwrap(), resp)); agent_data.set_result(&Agent::with_id(agent_uid)); cli.report(Reportable::Assigned(agent_data)).await.unwrap(); RegisteredAgent { uid: agent_uid } diff --git a/integration/tests/helpers/mod.rs b/integration/tests/helpers/mod.rs index 3747bb1..6bfc9d0 100644 --- a/integration/tests/helpers/mod.rs +++ b/integration/tests/helpers/mod.rs @@ -3,6 +3,6 @@ pub mod panel; pub use panel::Panel; use once_cell::sync::Lazy; -use u_lib::config::endpoints::{load, EndpointsEnv}; +use u_lib::config::EndpointsEnv; -pub static ENV: Lazy = Lazy::new(|| load().unwrap()); +pub static ENV: Lazy = Lazy::new(|| EndpointsEnv::load()); diff --git a/integration/tests/helpers/panel.rs b/integration/tests/helpers/panel.rs index 2de4465..1d76cc8 100644 --- a/integration/tests/helpers/panel.rs +++ b/integration/tests/helpers/panel.rs @@ -15,23 +15,23 @@ impl Panel { pub fn output_argv(argv: &[&str]) -> PanelResult { let result = Self::run(argv); - let output = ProcOutput::from_output(&result).into_vec(); - from_slice(&output) - .map_err(|e| { + let output = ProcOutput::from_output(&result); + + match from_slice(output.get_stdout()) { + Ok(r) => r, + Err(e) => { eprintln!( - "Failed to decode panel response: '{}'", - bytes_to_string(&output) + "Failed to decode panel response: ###'{}'###", + bytes_to_string(output.get_stdout()) ); - e.to_string() - }) - .unwrap() + panic!("{e}") + } + } } - pub fn output( - args: impl Into + Display, - ) -> PanelResult { - eprintln!(">>> {PANEL_BINARY} {}", &args); - let splitted = shlex::split(args.into().as_ref()).unwrap(); + pub fn output(args: impl IntoArgs) -> PanelResult { + eprintln!(">>> {PANEL_BINARY} {}", args.display()); + let splitted = args.into_args(); let result = Self::output_argv( splitted .iter() @@ -40,8 +40,8 @@ impl Panel { .as_ref(), ); match &result { - PanelResult::Ok(r) => eprintln!("<<<+ {r:02x?}"), - PanelResult::Err(e) => eprintln!("<< eprintln!("+<< {r:02x?}"), + PanelResult::Err(e) => eprintln!("!<< {e:02x?}"), } result } @@ -49,17 +49,58 @@ impl Panel { fn status_is_ok(data: PanelResult) -> T { match data { PanelResult::Ok(r) => r, - PanelResult::Err(e) => panic!("Panel failed: {}", e), + PanelResult::Err(e) => panic!("Panel failed: {e}"), } } - pub fn check_status(args: impl Into + Display) { + pub fn check_status(args: impl IntoArgs) { let result: PanelResult = Self::output(args); Self::status_is_ok(result); } - pub fn check_output(args: impl Into + Display) -> T { + pub fn check_output(args: impl IntoArgs) -> T { let result = Self::output(args); Self::status_is_ok(result) } } + +pub trait IntoArgs { + fn into_args(self) -> Vec; + fn display(&self) -> String; +} + +impl IntoArgs for String { + fn into_args(self) -> Vec { + >::as_ref(&self).into_args() + } + + fn display(&self) -> String { + self.clone() + } +} + +impl IntoArgs for &str { + fn into_args(self) -> Vec { + shlex::split(self.as_ref()).unwrap() + } + + fn display(&self) -> String { + self.to_string() + } +} + +impl IntoArgs for [S; N] +where + S: Display, +{ + fn into_args(self) -> Vec { + self.into_iter().map(|s| s.to_string()).collect() + } + + fn display(&self) -> String { + self.iter() + .map(|s| format!(r#""{s}""#)) + .collect::>() + .join(" ") + } +} diff --git a/integration/tests/integration/behaviour.rs b/integration/tests/integration/behaviour.rs index 35d77b4..ecfe2cf 100644 --- a/integration/tests/integration/behaviour.rs +++ b/integration/tests/integration/behaviour.rs @@ -3,45 +3,41 @@ use crate::helpers::Panel; use rstest::rstest; use serde_json::{json, to_string}; -use std::error::Error; -use std::fs; use std::time::Duration; use tokio::time::sleep; use u_lib::models::*; use uuid::Uuid; -type TestResult = Result>; - #[rstest] #[tokio::test] -async fn registration(#[future] register_agent: RegisteredAgent) -> TestResult { +async fn registration(#[future] register_agent: RegisteredAgent) { let agent = register_agent.await; let agents: Vec = Panel::check_output("agents read"); let found = agents.iter().find(|v| v.id == agent.uid); assert!(found.is_some()); Panel::check_status(format!("agents delete {}", agent.uid)); - Ok(()) } #[tokio::test] -async fn setup_tasks() -> TestResult { +async fn setup_tasks() { let agents: Vec = Panel::check_output("agents read"); let agent_uid = agents[0].id; let job_alias = "passwd_contents"; let job = json!( {"alias": job_alias, "payload": b"cat /etc/passwd", "argv": "/bin/bash {}" } ); - let cmd = format!("jobs create '{}'", to_string(&job).unwrap()); - Panel::check_status(cmd); + + Panel::check_status(["jobs", "create", &to_string(&job).unwrap()]); let cmd = format!("map create {} {}", agent_uid, job_alias); let assigned_uids: Vec = Panel::check_output(cmd); for _ in 0..3 { - let result: Vec = - Panel::check_output(format!("map read {}", assigned_uids[0])); - if result[0].state == JobState::Finished { - return Ok(()); + let result = + Panel::check_output::>(format!("map read {}", assigned_uids[0])) + .remove(0); + if result.state == JobState::Finished { + assert!(result.to_str_result().contains("root:x:0:0::/root")); } else { sleep(Duration::from_secs(5)).await; eprintln!("waiting for task"); @@ -51,25 +47,29 @@ async fn setup_tasks() -> TestResult { } #[tokio::test] -async fn large_payload() -> TestResult { +async fn large_payload() { let agent_uid = Panel::check_output::>("agents read")[0].id; let job_alias = "large_payload"; - let payload = fs::read("./tests/bin/echoer").unwrap(); - let job = json!( - {"alias": job_alias, "payload": payload, "argv": "/bin/bash {}" } - ); - let cmd = format!("jobs create '{}'", to_string(&job).unwrap()); - Panel::check_status(cmd); - let cmd = format!("map create {} {}", agent_uid, job_alias); + let job = FatJobMeta::builder() + .with_alias(job_alias) + .with_payload_path("./tests/bin/echoer") + .with_shell("{} 'type echo'") + .build() + .unwrap(); + + Panel::check_status(["jobs", "create", &to_string(&job).unwrap()]); + + let cmd = format!("map create {agent_uid} {job_alias}"); let assigned_uids: Vec = Panel::check_output(cmd); for _ in 0..3 { - let result: Vec = - Panel::check_output(format!("map read {}", assigned_uids[0])); - if result[0].state == JobState::Finished { - return Ok(()); + let result = + Panel::check_output::>(format!("map read {}", assigned_uids[0])) + .remove(0); + if result.state == JobState::Finished { + assert_eq!(result.to_str_result(), "type echo"); } else { sleep(Duration::from_secs(5)).await; eprintln!("waiting for task"); diff --git a/lib/u_lib/Cargo.toml b/lib/u_lib/Cargo.toml index 1324ec6..6affbe7 100644 --- a/lib/u_lib/Cargo.toml +++ b/lib/u_lib/Cargo.toml @@ -31,6 +31,7 @@ tracing = { workspace = true } tracing-appender = { workspace = true } tracing-subscriber = { workspace = true, features = ["env-filter"] } uuid = { workspace = true, features = ["serde", "v4"] } +parking_lot = "0.12.1" [target.'cfg(unix)'.dependencies] daemonize = "0.4.1" diff --git a/lib/u_lib/src/api.rs b/lib/u_lib/src/api.rs index 444394e..34218e2 100644 --- a/lib/u_lib/src/api.rs +++ b/lib/u_lib/src/api.rs @@ -98,8 +98,8 @@ impl ClientHandler { Err(e) => Err(UError::from(e)), }; let resp = response.text().await.context("resp")?; - debug!("url = {}, resp = {}", url.as_ref(), resp); - match is_success { + + let result = match is_success { Ok(_) => from_str::>(&resp) .map(|msg| msg.into_inner()) .or_else(|e| match content_len { @@ -109,7 +109,11 @@ impl ClientHandler { Err(UError::NetError(err, _)) => Err(UError::NetError(err, resp)), _ => unreachable!(), } - .map_err(From::from) + .map_err(From::from); + + debug!("url = {}, resp = {:?}", url.as_ref(), result); + + result } // get jobs for client @@ -127,9 +131,14 @@ impl ClientHandler { self.req(format!("dl/{file}")).await } + /// get exact job + pub async fn get_job(&self, job: Uuid) -> Result> { + self.req(format!("get_job/{job}")).await + } + /// get all available jobs - pub async fn get_jobs(&self, job: Option) -> Result> { - self.req(format!("get_jobs/{}", opt_to_string(job))).await + pub async fn get_jobs(&self) -> Result> { + self.req("get_jobs").await } } @@ -148,7 +157,7 @@ impl ClientHandler { } /// update job - pub async fn update_job(&self, job: models::JobMeta) -> Result<()> { + pub async fn update_job(&self, job: models::FatJobMeta) -> Result<()> { self.req_with_payload("update_job", job).await } @@ -158,7 +167,10 @@ impl ClientHandler { } /// create and upload job - pub async fn upload_jobs(&self, payload: impl OneOrVec) -> Result> { + pub async fn upload_jobs( + &self, + payload: impl OneOrVec>, + ) -> Result> { self.req_with_payload("upload_jobs", payload.into_vec()) .await } diff --git a/lib/u_lib/src/cache.rs b/lib/u_lib/src/cache.rs index 9d173ee..7d933e4 100644 --- a/lib/u_lib/src/cache.rs +++ b/lib/u_lib/src/cache.rs @@ -1,13 +1,10 @@ -use crate::models::JobMeta; +use crate::models::ThinJobMeta; use lazy_static::lazy_static; -use std::{ - collections::HashMap, - ops::Deref, - sync::{RwLock, RwLockReadGuard}, -}; +use parking_lot::{RwLock, RwLockReadGuard}; +use std::{collections::HashMap, ops::Deref}; use uuid::Uuid; -type Cache = HashMap; +type Cache = HashMap; lazy_static! { static ref JOB_CACHE: RwLock = RwLock::new(HashMap::new()); @@ -16,31 +13,31 @@ lazy_static! { pub struct JobCache; impl JobCache { - pub fn insert(job_meta: JobMeta) { - JOB_CACHE.write().unwrap().insert(job_meta.id, job_meta); + pub fn insert(job_meta: ThinJobMeta) { + JOB_CACHE.write().insert(job_meta.id, job_meta); } pub fn contains(uid: Uuid) -> bool { - JOB_CACHE.read().unwrap().contains_key(&uid) + JOB_CACHE.read().contains_key(&uid) } pub fn get<'jh>(uid: Uuid) -> Option> { if !Self::contains(uid) { return None; } - let lock = JOB_CACHE.read().unwrap(); + let lock = JOB_CACHE.read(); Some(JobCacheHolder(lock, uid)) } pub fn remove(uid: Uuid) { - JOB_CACHE.write().unwrap().remove(&uid); + JOB_CACHE.write().remove(&uid); } } pub struct JobCacheHolder<'jh>(pub RwLockReadGuard<'jh, Cache>, pub Uuid); impl<'jh> Deref for JobCacheHolder<'jh> { - type Target = JobMeta; + type Target = ThinJobMeta; fn deref(&self) -> &Self::Target { self.0.get(&self.1).unwrap() diff --git a/lib/u_lib/src/config/mod.rs b/lib/u_lib/src/config.rs similarity index 50% rename from lib/u_lib/src/config/mod.rs rename to lib/u_lib/src/config.rs index 89f176d..7eab224 100644 --- a/lib/u_lib/src/config/mod.rs +++ b/lib/u_lib/src/config.rs @@ -3,6 +3,8 @@ use lazy_static::lazy_static; use serde::Deserialize; use uuid::Uuid; +pub use envy::Error; + pub const MASTER_PORT: u16 = 63714; lazy_static! { @@ -14,49 +16,43 @@ pub fn get_self_uid() -> Uuid { *UID } -pub mod endpoints { - use super::*; - - #[derive(Deserialize)] - pub struct EndpointsEnv { - #[serde(default = "default_host")] - pub u_server: String, - } +#[derive(Deserialize)] +pub struct EndpointsEnv { + #[serde(default = "default_host")] + pub u_server: String, +} - pub fn load() -> EnvResult { +impl EndpointsEnv { + pub fn load() -> EndpointsEnv { dot(); - from_env() + from_env().unwrap() } } -pub mod db { - use super::*; - - #[derive(Deserialize)] - pub struct DBEnv { - pub database: String, - pub host: String, - pub user: String, - pub password: String, - pub port: u16, - } +#[derive(Deserialize)] +pub struct DBEnv { + pub database: String, + pub host: String, + pub user: String, + pub password: String, + pub port: u16, +} +impl DBEnv { pub fn load() -> EnvResult { dot(); prefixed("POSTGRES_").from_env() } } -pub mod admin { - use super::*; - - #[derive(Deserialize)] - pub struct AccessEnv { - pub admin_auth_token: String, - #[serde(default = "default_host")] - pub u_server: String, - } +#[derive(Deserialize)] +pub struct AccessEnv { + pub admin_auth_token: String, + #[serde(default = "default_host")] + pub u_server: String, +} +impl AccessEnv { pub fn load() -> EnvResult { dot(); from_env() diff --git a/lib/u_lib/src/db.rs b/lib/u_lib/src/db.rs index 4bd907d..3e874d2 100644 --- a/lib/u_lib/src/db.rs +++ b/lib/u_lib/src/db.rs @@ -2,7 +2,7 @@ use deadpool_diesel::{Manager as DManager, Pool as DPool, Runtime}; use diesel::pg::PgConnection; use std::time::Duration; -use crate::config::db::DBEnv; +use crate::config::DBEnv; pub type PgAsyncPool = DPool>; diff --git a/lib/u_lib/src/error/mod.rs b/lib/u_lib/src/error/mod.rs index df928ff..0a63f1d 100644 --- a/lib/u_lib/src/error/mod.rs +++ b/lib/u_lib/src/error/mod.rs @@ -2,6 +2,7 @@ mod chan; pub use chan::*; +use crate::ufs; use reqwest::Error as ReqError; use serde::{Deserialize, Serialize}; use thiserror::Error; @@ -9,7 +10,7 @@ use uuid::Uuid; pub type UResult = std::result::Result; -#[derive(PartialEq, Error, Debug, Serialize, Deserialize, Clone)] +#[derive(Error, Debug, Serialize, Deserialize, Clone)] pub enum UError { #[error("Runtime error: {0}")] Runtime(String), @@ -23,20 +24,14 @@ pub enum UError { #[error("Job error: {0}")] JobError(String), - #[error("Argument parsing failed: {0}")] - JobArgsError(String), - - #[error("Job is uncompleted yet")] - JobUncompleted, - - #[error("Job cannot be ran on this platform. Expected: {0}, got: {1}")] - InsuitablePlatform(String, String), + #[error("Job build failed: {0}")] + JobBuildError(String), #[error("Job {0} doesn't exist")] NoJob(Uuid), - #[error("FS error while processing {0}: {1}")] - FSError(String, String), + #[error(transparent)] + FSError(#[from] ufs::Error), #[error("Wrong auth token")] WrongToken, diff --git a/lib/u_lib/src/runner.rs b/lib/u_lib/src/jobs.rs similarity index 52% rename from lib/u_lib/src/runner.rs rename to lib/u_lib/src/jobs.rs index 69bb898..ea29e49 100644 --- a/lib/u_lib/src/runner.rs +++ b/lib/u_lib/src/jobs.rs @@ -1,70 +1,49 @@ use crate::{ - cache::JobCache, combined_result::CombinedResult, executor::{ExecResult, Waiter}, misc::OneOrVec, - models::{Agent, AssignedJob, AssignedJobById, JobMeta, JobType}, - platform::Platform, + models::{Agent, AssignedJob, AssignedJobById, FatJobMeta, JobType, Payload, ThinJobMeta}, proc_output::ProcOutput, - tempfile::TempFile, - UError, UResult, + ufs, }; use std::collections::HashMap; use std::process::exit; use tokio::process::Command; -pub struct JobRunner { +pub struct UnnamedJobsBatch { waiter: Waiter, is_running: bool, } -impl JobRunner { - pub fn from_jobs(jobs: impl OneOrVec) -> CombinedResult { +impl UnnamedJobsBatch { + pub fn from_meta_with_id(jobs: impl OneOrVec<(ThinJobMeta, AssignedJobById)>) -> Self { let jobs = jobs.into_vec(); let mut waiter = Waiter::new(); - let mut result = CombinedResult::new(); - for job in jobs { - //waiting for try-blocks stabilization - let built_job: UResult<()> = (|| { - let meta = JobCache::get(job.job_id).ok_or(UError::NoJob(job.job_id))?; - let curr_platform = Platform::current(); - if !curr_platform.matches(&meta.platform) { - return Err(UError::InsuitablePlatform( - meta.platform.clone(), - curr_platform.into_string(), - )); - } - let job = AssignedJob::from((&*meta, job)); - waiter.push(run_assigned_job(job)); - Ok(()) - })(); - if let Err(e) = built_job { - result.err(e) - } + for (meta, job) in jobs { + waiter.push(run_assigned_job(meta, job)); } - result.ok(Self { + Self { waiter, is_running: false, - }); - result + } } - pub fn from_meta(metas: impl OneOrVec) -> CombinedResult { - let jobs = metas + pub fn from_meta(metas: impl OneOrVec) -> Self { + let jobs: Vec<_> = metas .into_vec() .into_iter() - .map(|jm| { - let job_id = jm.id; - if !JobCache::contains(job_id) { - JobCache::insert(jm); - } - AssignedJobById { - job_id, - ..Default::default() - } + .map(|meta| { + let job_id = meta.id; + ( + meta, + AssignedJobById { + job_id, + ..Default::default() + }, + ) }) - .collect::>(); - JobRunner::from_jobs(jobs) + .collect(); + UnnamedJobsBatch::from_meta_with_id(jobs) } /// Spawn jobs @@ -90,24 +69,20 @@ impl JobRunner { } } -pub async fn run_assigned_job(mut job: AssignedJob) -> ExecResult { - match job.exec_type { +pub async fn run_assigned_job(meta: ThinJobMeta, ids: AssignedJobById) -> ExecResult { + let mut job = AssignedJob::from((&meta, ids)); + match meta.exec_type { JobType::Shell => { - let (argv, _payload) = { - let meta = JobCache::get(job.job_id).unwrap(); + let (argv, _prepared_payload) = { if let Some(ref payload) = meta.payload { - let extracted_payload = match TempFile::write_exec(payload) { - Ok(p) => p, - Err(e) => return Err(UError::Runtime(e.to_string())), - }; - ( - meta.argv.replace("{}", &extracted_payload.get_path()), - Some(extracted_payload), - ) + let (prep_exec, prep_exec_path) = ufs::prepare_executable(payload)?; + let argv_with_exec = meta.argv.replace("{}", &prep_exec_path); + (argv_with_exec, Some(prep_exec)) } else { (meta.argv.clone(), None) } }; + let mut split_cmd = shlex::split(&argv).unwrap().into_iter(); let cmd = split_cmd.nth(0).unwrap(); let args = split_cmd.collect::>(); @@ -119,7 +94,7 @@ pub async fn run_assigned_job(mut job: AssignedJob) -> ExecResult { ), Err(e) => ( ProcOutput::new() - .stderr(e.to_string().into_bytes()) + .set_stderr(e.to_string().into_bytes()) .into_vec(), None, ), @@ -138,61 +113,116 @@ pub async fn run_assigned_job(mut job: AssignedJob) -> ExecResult { Ok(job) } +pub fn fat_meta_to_thin(meta: FatJobMeta) -> Result { + let payload_ident = if let Some(mut payload) = meta.payload { + let job_name = match &meta.alias { + Some(a) => a.to_string(), + None => meta.id.simple().to_string(), + }; + payload.write_self_into(&job_name)?; + Some(job_name) + } else { + None + }; + + Ok(ThinJobMeta { + alias: meta.alias, + argv: meta.argv, + id: meta.id, + exec_type: meta.exec_type, + platform: meta.platform, + payload: payload_ident, + schedule: meta.schedule, + }) +} + +pub fn thin_meta_to_fat(meta: ThinJobMeta) -> Result, ufs::Error> { + let payload = if let Some(payload) = meta.payload { + let mut fat_payload = Payload::Ident(payload); + fat_payload.read_into_self()?; + Some(fat_payload) + } else { + None + }; + + Ok(FatJobMeta { + alias: meta.alias, + argv: meta.argv, + id: meta.id, + exec_type: meta.exec_type, + platform: meta.platform, + payload, + schedule: meta.schedule, + }) +} + /// Store jobs and get results by name -pub struct NamedJobRunner { - runner: Option, - job_names: Vec<&'static str>, - results: HashMap<&'static str, ExecResult>, +pub struct NamedJobsBatch { + runner: Option, + job_names: Vec, + results: HashMap, } -impl NamedJobRunner { +impl NamedJobsBatch { pub fn from_shell( named_jobs: impl OneOrVec<(&'static str, &'static str)>, ) -> CombinedResult { let mut result = CombinedResult::new(); - let jobs: Vec<(&'static str, JobMeta)> = named_jobs + let jobs: Vec<_> = named_jobs .into_vec() .into_iter() - .filter_map( - |(alias, cmd)| match JobMeta::builder().with_shell(cmd).build() { - Ok(meta) => Some((alias, meta)), + .filter_map(|(alias, cmd)| { + match FatJobMeta::builder() + .with_shell(cmd) + .with_alias(alias) + .build() + { + Ok(fat_meta) => match fat_meta_to_thin(fat_meta) { + Ok(thin_meta) => Some(thin_meta), + Err(e) => { + result.err(e); + None + } + }, Err(e) => { result.err(e); None } - }, - ) + } + }) .collect(); result.ok(Self::from_meta(jobs)); result } - pub fn from_meta(named_jobs: impl OneOrVec<(&'static str, JobMeta)>) -> Self { - let mut job_names = vec![]; - let job_metas: Vec = named_jobs + pub fn from_meta(named_jobs: impl OneOrVec) -> Self { + let (job_names, job_metas): (Vec<_>, Vec<_>) = named_jobs .into_vec() .into_iter() - .map(|(alias, mut meta)| { - job_names.push(alias); - meta.alias = Some(alias.to_string()); - meta - }) - .collect(); + .map(|meta| (meta.alias.clone().unwrap(), meta)) + .unzip(); Self { - runner: Some(JobRunner::from_meta(job_metas).unwrap_one()), + runner: Some(UnnamedJobsBatch::from_meta(job_metas)), job_names, results: HashMap::new(), } } - pub async fn wait(mut self) -> Self { + pub async fn wait(mut self) -> NamedJobsBatch { let results = self.runner.take().unwrap().wait().await; - for (name, result) in self.job_names.iter().zip(results.into_iter()) { + for (name, result) in self.job_names.into_iter().zip(results.into_iter()) { self.results.insert(name, result); } - self + + NamedJobsBatch:: { + runner: None, + job_names: vec![], + results: self.results, + } } +} +impl NamedJobsBatch { pub fn pop_opt(&mut self, name: &'static str) -> Option { self.results.remove(name) } @@ -206,9 +236,9 @@ impl NamedJobRunner { mod tests { use super::*; use crate::{ - models::{misc::JobType, JobMeta}, - runner::{JobRunner, NamedJobRunner}, - unwrap_enum, + jobs::{NamedJobsBatch, UnnamedJobsBatch}, + models::{misc::JobType, FatJobMeta}, + unwrap_enum, UError, }; use std::time::SystemTime; @@ -217,10 +247,13 @@ mod tests { #[tokio::test] async fn test_is_really_async() { const SLEEP_SECS: u64 = 1; - let job = JobMeta::from_shell(format!("sleep {}", SLEEP_SECS)).unwrap(); - let sleep_jobs: Vec = (0..50).map(|_| job.clone()).collect(); + let job = FatJobMeta::from_shell(format!("sleep {SLEEP_SECS}")).unwrap(); + let sleep_jobs = (0..50) + .map(|_| fat_meta_to_thin(job.clone()).unwrap()) + .collect::>(); + let now = SystemTime::now(); - JobRunner::from_meta(sleep_jobs).unwrap_one().wait().await; + UnnamedJobsBatch::from_meta(sleep_jobs).wait().await; assert!(now.elapsed().unwrap().as_secs() < SLEEP_SECS + 2) } @@ -254,16 +287,12 @@ mod tests { #[case] payload: Option<&[u8]>, #[case] expected_result: &str, ) -> TestResult { - let mut job = JobMeta::builder().with_shell(cmd); + let mut job = FatJobMeta::builder().with_shell(cmd); if let Some(p) = payload { job = job.with_payload(p); } - let job = job.build().unwrap(); - let result = JobRunner::from_meta(job) - .unwrap_one() - .wait_one() - .await - .unwrap(); + let job = fat_meta_to_thin(job.build().unwrap()).unwrap(); + let result = UnnamedJobsBatch::from_meta(job).wait_one().await.unwrap(); let result = result.to_str_result(); assert_eq!(result.trim(), expected_result); Ok(()) @@ -273,26 +302,31 @@ mod tests { async fn test_complex_load() -> TestResult { const SLEEP_SECS: u64 = 1; let now = SystemTime::now(); - let longest_job = JobMeta::from_shell(format!("sleep {}", SLEEP_SECS)).unwrap(); - let longest_job = JobRunner::from_meta(longest_job).unwrap_one().spawn().await; - let ls = JobRunner::from_meta(JobMeta::from_shell("ls")?) - .unwrap_one() - .wait_one() - .await - .unwrap(); + let longest_job = FatJobMeta::from_shell(format!("sleep {}", SLEEP_SECS)).unwrap(); + let longest_job = UnnamedJobsBatch::from_meta(fat_meta_to_thin(longest_job).unwrap()) + .spawn() + .await; + let ls = UnnamedJobsBatch::from_meta( + fat_meta_to_thin(FatJobMeta::from_shell("ls").unwrap()).unwrap(), + ) + .wait_one() + .await + .unwrap(); + assert_eq!(ls.retcode.unwrap(), 0); + let folders = ls.to_str_result(); - let subfolders_jobs: Vec = folders + let subfolders_jobs = folders .lines() - .map(|f| JobMeta::from_shell(format!("ls {}", f)).unwrap()) - .collect(); - let ls_subfolders = JobRunner::from_meta(subfolders_jobs) - .unwrap_one() - .wait() - .await; + .map(|f| fat_meta_to_thin(FatJobMeta::from_shell(format!("ls {f}")).unwrap()).unwrap()) + .collect::>(); + + let ls_subfolders = UnnamedJobsBatch::from_meta(subfolders_jobs).wait().await; + for result in ls_subfolders { assert_eq!(result.unwrap().retcode.unwrap(), 0); } + longest_job.wait().await; assert_eq!(now.elapsed().unwrap().as_secs(), SLEEP_SECS); Ok(()) @@ -317,12 +351,8 @@ mod tests { */ #[tokio::test] async fn test_failing_shell_job() -> TestResult { - let job = JobMeta::from_shell("lol_kek_puk")?; - let job_result = JobRunner::from_meta(job) - .unwrap_one() - .wait_one() - .await - .unwrap(); + let job = fat_meta_to_thin(FatJobMeta::from_shell("lol_kek_puk").unwrap()).unwrap(); + let job_result = UnnamedJobsBatch::from_meta(job).wait_one().await.unwrap(); let output = job_result.to_str_result(); assert!(output.contains("No such file")); assert!(job_result.retcode.is_none()); @@ -338,29 +368,39 @@ mod tests { #[case] payload: Option<&[u8]>, #[case] err_str: &str, ) -> TestResult { - let mut job = JobMeta::builder().with_shell(cmd); + let mut job = FatJobMeta::builder().with_shell(cmd); if let Some(p) = payload { job = job.with_payload(p); } let err = job.build().unwrap_err(); - let err_msg = unwrap_enum!(err, UError::JobArgsError); + let err_msg = unwrap_enum!(err, UError::JobBuildError); assert!(err_msg.contains(err_str)); Ok(()) } #[tokio::test] async fn test_different_job_types() -> TestResult { - let mut jobs = NamedJobRunner::from_meta(vec![ - ("sleeper", JobMeta::from_shell("sleep 3")?), - ( - "gatherer", - JobMeta::builder().with_type(JobType::Init).build()?, - ), - ]) + let mut jobs = NamedJobsBatch::from_meta( + [ + FatJobMeta::builder() + .with_shell("sleep 3") + .with_alias("sleeper") + .build() + .unwrap(), + FatJobMeta::builder() + .with_type(JobType::Init) + .with_alias("gatherer") + .build() + .unwrap(), + ] + .into_iter() + .map(|meta| fat_meta_to_thin(meta).unwrap()) + .collect::>(), + ) .wait() .await; - let gathered = jobs.pop("gatherer"); - assert_eq!(gathered.unwrap().alias, None); + let gathered = jobs.pop("gatherer").unwrap(); + assert_eq!(gathered.alias.unwrap(), "gatherer"); Ok(()) } } diff --git a/lib/u_lib/src/lib.rs b/lib/u_lib/src/lib.rs index 456e34b..31adf56 100644 --- a/lib/u_lib/src/lib.rs +++ b/lib/u_lib/src/lib.rs @@ -9,15 +9,14 @@ pub mod datatypes; pub mod db; pub mod error; pub mod executor; +pub mod jobs; pub mod logging; pub mod messaging; pub mod misc; pub mod models; pub mod platform; pub mod proc_output; -pub mod runner; -pub mod storage; -pub mod tempfile; +pub mod ufs; #[cfg(unix)] pub mod unix; diff --git a/lib/u_lib/src/messaging/base.rs b/lib/u_lib/src/messaging/base.rs index b397621..b3bc49c 100644 --- a/lib/u_lib/src/messaging/base.rs +++ b/lib/u_lib/src/messaging/base.rs @@ -1,11 +1,11 @@ use crate::config::get_self_uid; use serde::{Deserialize, Serialize}; -use std::borrow::Cow; +use std::{borrow::Cow, fmt::Debug}; use uuid::Uuid; pub struct Moo<'cow, T: AsMsg + Clone>(pub Cow<'cow, T>); -pub trait AsMsg: Clone + Serialize { +pub trait AsMsg: Clone + Serialize + Debug { fn as_message(&self) -> BaseMessage<'_, Self> { BaseMessage::new(self) } @@ -50,7 +50,7 @@ impl<'cow, I: AsMsg> BaseMessage<'cow, I> { self.inner.into_owned() } - pub fn inner_ref(&self) -> &I { + pub fn as_inner(&self) -> &I { self.inner.as_ref() } } diff --git a/lib/u_lib/src/messaging/mod.rs b/lib/u_lib/src/messaging/mod.rs index f1e82e5..e425630 100644 --- a/lib/u_lib/src/messaging/mod.rs +++ b/lib/u_lib/src/messaging/mod.rs @@ -12,15 +12,16 @@ impl AsMsg for Agent {} impl AsMsg for AssignedJob {} impl AsMsg for AssignedJobById {} impl AsMsg for DownloadInfo {} +impl AsMsg for FatJobMeta {} impl AsMsg for Reportable {} -impl AsMsg for JobMeta {} impl AsMsg for String {} +impl AsMsg for ThinJobMeta {} impl AsMsg for Uuid {} impl AsMsg for i32 {} impl AsMsg for u8 {} impl AsMsg for () {} -#[derive(Serialize, Deserialize, Clone, PartialEq, Debug)] +#[derive(Serialize, Deserialize, Clone, Debug)] pub enum Reportable { Assigned(AssignedJob), Dummy, diff --git a/lib/u_lib/src/misc.rs b/lib/u_lib/src/misc.rs index 7fddf4c..bbc6484 100644 --- a/lib/u_lib/src/misc.rs +++ b/lib/u_lib/src/misc.rs @@ -20,7 +20,7 @@ macro_rules! unwrap_enum { if let $t(result) = $src { result } else { - panic!("wrong type") + panic!("wrong type {}", stringify!($t)) } }; } diff --git a/lib/u_lib/src/models/agent.rs b/lib/u_lib/src/models/agent.rs index 775688c..eb9e8d6 100644 --- a/lib/u_lib/src/models/agent.rs +++ b/lib/u_lib/src/models/agent.rs @@ -11,9 +11,7 @@ mod server { #[cfg(feature = "server")] use self::server::*; -use crate::{ - config::get_self_uid, executor::ExecResult, platform::Platform, runner::NamedJobRunner, -}; +use crate::{config::get_self_uid, executor::ExecResult, jobs::NamedJobsBatch, platform::Platform}; use uuid::Uuid; @@ -82,7 +80,7 @@ impl Agent { ("username", "id -un"), ]; - let mut builder = NamedJobRunner::from_shell(cmds).unwrap_one().wait().await; + let mut builder = NamedJobsBatch::from_shell(cmds).unwrap_one().wait().await; let decoder = |job_result: ExecResult| job_result.unwrap().to_str_result().trim().to_string(); @@ -91,7 +89,7 @@ impl Agent { host_info: decoder(builder.pop("host_info")), is_root: &decoder(builder.pop("is_root")) == "0", username: decoder(builder.pop("username")), - platform: Platform::current_as_string(), + platform: Platform::current().into_string(), ..Default::default() } } diff --git a/lib/u_lib/src/models/jobs/assigned.rs b/lib/u_lib/src/models/jobs/assigned.rs index 2646ea9..57bee7f 100644 --- a/lib/u_lib/src/models/jobs/assigned.rs +++ b/lib/u_lib/src/models/jobs/assigned.rs @@ -1,4 +1,4 @@ -use super::{JobMeta, JobState, JobType}; +use super::{JobState, JobType, ThinJobMeta}; use crate::config::get_self_uid; #[cfg(feature = "server")] use crate::models::schema::*; @@ -8,7 +8,7 @@ use serde::{Deserialize, Serialize}; use std::{borrow::Cow, time::SystemTime}; use uuid::Uuid; -#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] +#[derive(Serialize, Deserialize, Clone, PartialEq, Debug)] #[cfg_attr( feature = "server", derive(Queryable, Identifiable, Insertable, AsChangeset), @@ -27,31 +27,19 @@ pub struct AssignedJob { pub updated: SystemTime, } -#[derive(Serialize, Deserialize, Clone, Copy)] +#[derive(Serialize, Deserialize, Clone, Copy, Debug)] pub struct AssignedJobById { pub agent_id: Uuid, pub id: Uuid, pub job_id: Uuid, } -impl From<(&JobMeta, AssignedJobById)> for AssignedJob { - fn from((meta, pj): (&JobMeta, AssignedJobById)) -> Self { +impl From<(&ThinJobMeta, AssignedJobById)> for AssignedJob { + fn from((meta, assigned_job_by_id): (&ThinJobMeta, AssignedJobById)) -> Self { AssignedJob { - id: pj.id, - agent_id: pj.agent_id, - job_id: pj.job_id, - alias: meta.alias.clone(), - exec_type: meta.exec_type, - ..Default::default() - } - } -} - -impl From<&JobMeta> for AssignedJob { - fn from(meta: &JobMeta) -> Self { - AssignedJob { - job_id: meta.id, - agent_id: get_self_uid(), + id: assigned_job_by_id.id, + agent_id: assigned_job_by_id.agent_id, + job_id: assigned_job_by_id.job_id, alias: meta.alias.clone(), exec_type: meta.exec_type, ..Default::default() @@ -90,11 +78,11 @@ impl AssignedJob { pub fn to_raw_result(&self) -> &[u8] { match self.result.as_ref() { Some(r) => r, - None => b"No data yet", + None => b"No data", } } - pub fn to_str_result(&self) -> Cow<'_, str> { + pub fn to_str_result(&self) -> Cow { String::from_utf8_lossy(self.to_raw_result()) } diff --git a/lib/u_lib/src/models/jobs/meta.rs b/lib/u_lib/src/models/jobs/meta.rs index 8fa53a4..09f55c6 100644 --- a/lib/u_lib/src/models/jobs/meta.rs +++ b/lib/u_lib/src/models/jobs/meta.rs @@ -1,4 +1,5 @@ use super::JobType; +use crate::models::payload::Payload; #[cfg(feature = "server")] use crate::models::schema::*; use crate::platform::Platform; @@ -6,7 +7,6 @@ use crate::{UError, UResult}; #[cfg(feature = "server")] use diesel::{Identifiable, Insertable, Queryable}; use serde::{Deserialize, Serialize}; -use std::fs; use uuid::Uuid; #[derive(Serialize, Deserialize, Clone, Debug)] @@ -15,7 +15,22 @@ use uuid::Uuid; derive(Queryable, Identifiable, Insertable, AsChangeset), diesel(table_name = jobs) )] -pub struct JobMeta { +pub struct ThinJobMeta { + pub alias: Option, + /// string like `bash -c {} -a 1 --arg2`, + /// where {} is replaced by executable's tmp path + pub argv: String, + pub id: Uuid, + pub exec_type: JobType, + /// target triple + pub platform: String, + pub payload: Option, + /// cron-like string + pub schedule: Option, +} + +#[derive(Serialize, Deserialize, Clone, Debug)] +pub struct FatJobMeta { #[serde(default)] pub alias: Option, @@ -30,37 +45,33 @@ pub struct JobMeta { #[serde(default)] pub exec_type: JobType, - ///target triple + /// target triple #[serde(default = "Platform::current_as_string")] pub platform: String, #[serde(default)] - pub payload: Option>, - - /// if payload should be read from external resource - #[serde(default)] - pub payload_path: Option, + pub payload: Option, - ///cron-like string + /// cron-like string #[serde(default)] pub schedule: Option, } -impl JobMeta { - pub fn builder() -> JobMetaBuilder { - JobMetaBuilder::default() - } - - pub fn validated(self) -> UResult { +impl FatJobMeta { + pub fn validated(self) -> UResult> { JobMetaBuilder { inner: self }.build() } - pub fn from_shell(cmd: impl Into) -> UResult { + pub fn from_shell(cmd: impl Into) -> UResult> { Self::builder().with_shell(cmd).build() } + + pub fn builder() -> JobMetaBuilder { + JobMetaBuilder::default() + } } -impl Default for JobMeta { +impl Default for FatJobMeta { fn default() -> Self { Self { id: Uuid::new_v4(), @@ -70,14 +81,13 @@ impl Default for JobMeta { platform: Platform::current_as_string(), payload: None, schedule: None, - payload_path: None, } } } #[derive(Default)] pub struct JobMetaBuilder { - inner: JobMeta, + inner: FatJobMeta, } impl JobMetaBuilder { @@ -88,12 +98,12 @@ impl JobMetaBuilder { } pub fn with_payload(mut self, payload: impl Into>) -> Self { - self.inner.payload = Some(payload.into()); + self.inner.payload = Some(Payload::from_payload(payload)); self } - pub fn with_payload_src(mut self, path: impl Into) -> Self { - self.inner.payload_path = Some(path.into()); + pub fn with_payload_path(mut self, path: impl Into) -> Self { + self.inner.payload = Some(Payload::Ident(path.into())); self } @@ -107,54 +117,75 @@ impl JobMetaBuilder { self } - pub fn build(self) -> UResult { + pub fn build(self) -> UResult> { let mut inner = self.inner; + let validated = |jmeta: FatJobMeta| FatJobMeta:: { + alias: jmeta.alias, + argv: jmeta.argv, + id: jmeta.id, + exec_type: jmeta.exec_type, + platform: jmeta.platform, + payload: jmeta.payload, + schedule: jmeta.schedule, + }; + match inner.exec_type { JobType::Shell => { + const ARGV_STR_LEN: usize = 2048; + if inner.argv.is_empty() { // TODO: fix detecting inner.argv = String::from("echo 'hello, world!'") + } else if inner.argv.len() > ARGV_STR_LEN { + return Err(UError::JobBuildError(format!( + "argv length limit ({ARGV_STR_LEN}) exceeded" + ))); } - let argv_parts = - shlex::split(&inner.argv).ok_or(UError::JobArgsError("Shlex failed".into()))?; - let empty_err = UError::JobArgsError("Empty argv".into()); + + let argv_parts = shlex::split(&inner.argv) + .ok_or(UError::JobBuildError("Shlex failed".into()))?; + let empty_err = UError::JobBuildError("Empty argv".into()); + if argv_parts.get(0).ok_or(empty_err.clone())?.is_empty() { return Err(empty_err.into()); } - if let Some(path) = &inner.payload_path { - let data = fs::read(path) - .map_err(|e| UError::FSError(path.to_string(), e.to_string()))?; - inner.payload = Some(data) + + if let Some(payload) = &mut inner.payload { + payload.add_to_index()?; } + match inner.payload.as_ref() { - Some(p) if p.len() > 0 => { - if !inner.argv.contains("{}") { - return Err(UError::JobArgsError( - "Argv contains no executable placeholder".into(), - ) - .into()); + Some(p) => { + if let Payload::Data(d) = p { + if !d.is_empty() && !inner.argv.contains("{}") { + return Err(UError::JobBuildError( + "Argv contains no executable placeholder".into(), + ) + .into()); + } } } None => { if inner.argv.contains("{}") { - return Err(UError::JobArgsError( + return Err(UError::JobBuildError( "No payload provided, but argv contains executable placeholder" .into(), ) .into()); } } - _ => (), }; - if !Platform::new(&inner.platform).check() { - return Err(UError::JobArgsError(format!( + + if Platform::new(&inner.platform).find_valid().is_none() { + return Err(UError::JobBuildError(format!( "Unknown platform {}", inner.platform ))); } - Ok(inner.into()) + + Ok(validated(inner)) } - _ => Ok(inner.into()), + _ => Ok(validated(inner)), } } } diff --git a/lib/u_lib/src/models/mod.rs b/lib/u_lib/src/models/mod.rs index bb6e50e..3000f8b 100644 --- a/lib/u_lib/src/models/mod.rs +++ b/lib/u_lib/src/models/mod.rs @@ -1,6 +1,7 @@ mod agent; mod jobs; +mod payload; #[cfg(feature = "server")] pub mod schema; -pub use crate::models::{agent::*, jobs::*}; +pub use crate::models::{agent::*, jobs::*, payload::*}; diff --git a/lib/u_lib/src/models/payload.rs b/lib/u_lib/src/models/payload.rs new file mode 100644 index 0000000..3ec78bd --- /dev/null +++ b/lib/u_lib/src/models/payload.rs @@ -0,0 +1,84 @@ +use crate::ufs; +use serde::{Deserialize, Serialize}; +use std::{fmt, path::PathBuf}; + +#[derive(Clone, Deserialize, Serialize)] +#[serde(untagged)] +pub enum Payload { + /// Raw payload data + Data(Vec), + /// Payload identifier in ufs + Ident(String), +} + +impl Payload { + pub fn read_into_self(&mut self) -> Result<(), ufs::Error> { + match self { + Payload::Data(_) => Ok(()), + Payload::Ident(ident) => { + let data = ufs::read(ident)?; + + *self = Payload::Data(data); + + Ok(()) + } + } + } + + pub fn write_self_into(&mut self, name: impl AsRef) -> Result<(), ufs::Error> { + match self { + Payload::Ident(_) => Ok(()), + Payload::Data(data) => { + ufs::put(&name, data)?; + + *self = Payload::Ident(name.as_ref().to_string()); + + Ok(()) + } + } + } + + pub fn from_payload(data: impl Into>) -> Self { + Payload::Data(data.into()) + } + + pub fn from_path(path: impl Into) -> Result { + let path: PathBuf = path.into(); + + if !path.exists() || path.is_dir() { + return Err(ufs::Error::not_found(path)); + } + + ufs::put_existing(&path)?; + + Ok(Payload::Ident(path.to_string_lossy().to_string())) + } + + pub fn add_to_index(&self) -> Result<(), ufs::Error> { + match self { + Payload::Ident(ident) => Payload::from_path(ident).map(|_| ()), + _ => Ok(()), + } + } +} + +impl fmt::Debug for Payload { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::Data(data) => { + const MAX_RESULT_LEN: usize = 256; + let mut dbg = &mut f.debug_tuple("Data"); + + let readable_data = if data.len() > MAX_RESULT_LEN { + let truncated = &data[..MAX_RESULT_LEN]; + String::from_utf8_lossy(truncated).to_string() + " " + } else { + String::from_utf8_lossy(&data).to_string() + }; + dbg = dbg.field(&readable_data); + dbg.finish() + } + Self::Ident(ident) => f.debug_tuple("Ident").field(ident).finish(), + } + } +} diff --git a/lib/u_lib/src/models/schema.rs b/lib/u_lib/src/models/schema.rs index b36d99b..7e48a3b 100644 --- a/lib/u_lib/src/models/schema.rs +++ b/lib/u_lib/src/models/schema.rs @@ -56,8 +56,7 @@ diesel::table! { id -> Uuid, exec_type -> Jobtype, platform -> Text, - payload -> Nullable, - payload_path -> Nullable, + payload -> Nullable, schedule -> Nullable, } } @@ -85,9 +84,4 @@ diesel::joinable!(certificates -> agents (agent_id)); diesel::joinable!(results -> agents (agent_id)); diesel::joinable!(results -> jobs (job_id)); -diesel::allow_tables_to_appear_in_same_query!( - agents, - certificates, - jobs, - results, -); +diesel::allow_tables_to_appear_in_same_query!(agents, certificates, jobs, results,); diff --git a/lib/u_lib/src/platform.rs b/lib/u_lib/src/platform.rs index 04aa4ae..529f41a 100644 --- a/lib/u_lib/src/platform.rs +++ b/lib/u_lib/src/platform.rs @@ -1,5 +1,5 @@ use guess_host_triple::guess_host_triple; -use platforms::{Platform as _Platform, PlatformReq}; +use platforms::{Platform as LibPlatform, PlatformReq}; use serde::Deserialize; use std::str::FromStr; @@ -8,11 +8,11 @@ pub struct Platform(String); impl Platform { pub fn new(p: impl Into) -> Self { - Self(p.into()) + Platform(p.into()) } pub fn current() -> Platform { - Self(guess_host_triple().unwrap_or("unknown").to_string()) + Self(guess_host_triple().unwrap().to_string()) } pub fn current_as_string() -> String { @@ -20,27 +20,23 @@ impl Platform { } pub fn matches(&self, pf: impl AsRef) -> bool { + // this finder needs a full triple, so when the .0 is empty, return true + // this is fucked up tbh + let Some(platform_to_match_against) = self.find_valid() else { + return self.0.is_empty() + }; + match PlatformReq::from_str(pf.as_ref()) { - Ok(p) => p.matches(&_Platform::find(&self.0).unwrap()), + Ok(p) => p.matches(&platform_to_match_against), Err(_) => false, } } - pub fn check(&self) -> bool { - PlatformReq::from_str(&self.0).is_ok() + pub fn find_valid(&self) -> Option<&'static LibPlatform> { + LibPlatform::find(&self.0) } pub fn into_string(self) -> String { self.0 } - - pub fn any() -> Platform { - Self(String::from("*")) - } -} - -impl Default for Platform { - fn default() -> Self { - Self::any() - } } diff --git a/lib/u_lib/src/proc_output.rs b/lib/u_lib/src/proc_output.rs index 01e0f06..06de5a6 100644 --- a/lib/u_lib/src/proc_output.rs +++ b/lib/u_lib/src/proc_output.rs @@ -7,12 +7,12 @@ pub struct ProcOutput { } impl ProcOutput { - const STDERR_DELIMETER: &[u8] = b"\n[STDERR]\n"; + const STDERR_DELIMETER: &[u8] = b"[STDERR]\n"; pub fn from_output(output: &Output) -> Self { Self::new() - .stdout(output.stdout.to_vec()) - .stderr(output.stderr.to_vec()) + .set_stdout(output.stdout.to_vec()) + .set_stderr(output.stderr.to_vec()) } pub fn new() -> Self { @@ -22,42 +22,59 @@ impl ProcOutput { } } - pub fn stdout(mut self, data: Vec) -> Self { + pub fn set_stdout(mut self, data: Vec) -> Self { self.stdout = data; self } - pub fn stderr(mut self, data: Vec) -> Self { + pub fn set_stderr(mut self, data: Vec) -> Self { self.stderr = data; self } + pub fn get_stdout(&self) -> &[u8] { + &self.stdout + } + + pub fn get_stderr(&self) -> &[u8] { + &self.stderr + } + pub fn into_vec(self) -> Vec { let mut result: Vec = vec![]; - if !self.stdout.is_empty() { + let stdout_is_empty = self.stdout.is_empty(); + if !stdout_is_empty { result.extend(self.stdout); } if !self.stderr.is_empty() { + if !stdout_is_empty && *result.last().unwrap() != b'\n' { + result.extend(b"\n"); + } result.extend(Self::STDERR_DELIMETER); result.extend(self.stderr); } result } - pub fn from_raw_proc_output(raw: &[u8]) -> Option { + pub fn into_readable(self) -> String { + String::from_utf8_lossy(&self.into_vec()).to_string() + } + + pub fn from_raw_proc_output(raw: &[u8]) -> Self { let stderr_delim_len = Self::STDERR_DELIMETER.len(); - raw.windows(stderr_delim_len) + let split_pos = raw + .windows(stderr_delim_len) .position(|w| w == Self::STDERR_DELIMETER) - .map(|split_pos| { - let (stdout, stderr) = raw.split_at(split_pos); - let result = Self::new().stdout(stdout.to_vec()); - if stderr.len() <= stderr_delim_len { - result.stderr(stderr[stderr_delim_len..].to_vec()) - } else { - result - } - }) + .unwrap_or(raw.len()); + + let (stdout, stderr) = raw.split_at(split_pos); + let result = Self::new().set_stdout(stdout.to_vec()); + if stderr.len() >= stderr_delim_len { + result.set_stderr(stderr[stderr_delim_len..].to_vec()) + } else { + result + } } } @@ -73,23 +90,23 @@ mod tests { #[case::stdout_stderr( "lol", "kek", - &format!("lol{}kek", STDERR_DELIMETER) + &format!("lol\n{}kek", STDERR_DELIMETER) )] #[case::stderr( "", "kek", &format!("{}kek", STDERR_DELIMETER) )] - fn test_to_combined(#[case] stdout: &str, #[case] stderr: &str, #[case] result: &str) { + fn test_to_combined(#[case] stdout: &str, #[case] stderr: &str, #[case] expected_result: &str) { let output = ProcOutput::new() - .stdout(stdout.as_bytes().to_vec()) - .stderr(stderr.as_bytes().to_vec()); - assert_eq!(&bytes_to_string(&output.into_vec()), result) + .set_stdout(stdout.as_bytes().to_vec()) + .set_stderr(stderr.as_bytes().to_vec()); + assert_eq!(bytes_to_string(&output.into_vec()), expected_result) } #[rstest] #[case::stdout_stderr( - &format!("lal{}kik", STDERR_DELIMETER), + &format!("lal\n{}kik", STDERR_DELIMETER), )] #[case::stdout( &format!("qeq"), @@ -97,8 +114,8 @@ mod tests { #[case::stderr( &format!("{}vev", STDERR_DELIMETER), )] - fn test_from_combined(#[case] src_result: &str) { - let output = ProcOutput::from_raw_proc_output(src_result.as_bytes()).unwrap(); - assert_eq!(bytes_to_string(&output.into_vec()).trim(), src_result); + fn test_from_combined(#[case] expected_result: &str) { + let output = ProcOutput::from_raw_proc_output(expected_result.as_bytes()); + assert_eq!(bytes_to_string(&output.into_vec()).trim(), expected_result); } } diff --git a/lib/u_lib/src/storage.rs b/lib/u_lib/src/storage.rs deleted file mode 100644 index d96faba..0000000 --- a/lib/u_lib/src/storage.rs +++ /dev/null @@ -1,39 +0,0 @@ -use once_cell::sync::Lazy; -use std::cmp::Eq; -use std::collections::HashMap; -use std::hash::Hash; -use std::ops::Deref; -use std::sync::Arc; -use std::sync::{Mutex, MutexGuard}; - -//improve this later, replace job cacher with it -//possibly add different backends (memory, disk) -pub struct SharedStorage(Arc>>); - -impl SharedStorage { - pub fn new() -> Lazy> { - Lazy::new(|| SharedStorage(Arc::new(Mutex::new(HashMap::new())))) - } - - pub fn lock(&self) -> MutexGuard<'_, HashMap> { - self.0.lock().unwrap() - } - - pub fn get<'get, 'slf: 'get>(&'slf self, key: &'get Key) -> Option> { - if !self.lock().contains_key(key) { - return None; - } - let lock = self.lock(); - Some(RefHolder(lock, key)) - } -} - -pub struct RefHolder<'h, Key, Val>(pub MutexGuard<'h, HashMap>, pub &'h Key); - -impl<'h, Key: Eq + Hash, Val> Deref for RefHolder<'h, Key, Val> { - type Target = Val; - - fn deref(&self) -> &Self::Target { - self.0.get(self.1).unwrap() - } -} diff --git a/lib/u_lib/src/tempfile.rs b/lib/u_lib/src/tempfile.rs deleted file mode 100644 index 0dad26f..0000000 --- a/lib/u_lib/src/tempfile.rs +++ /dev/null @@ -1,78 +0,0 @@ -use crate::{UError, UResult}; -#[cfg(unix)] -use std::os::unix::fs::PermissionsExt; -use std::{env::temp_dir, fs, ops::Drop, path::PathBuf}; -use uuid::Uuid; - -pub struct TempFile { - path: PathBuf, -} - -impl TempFile { - pub fn get_path(&self) -> String { - self.path.to_string_lossy().to_string() - } - - pub fn new() -> Self { - let name = Uuid::new_v4().simple().to_string(); - let mut path = temp_dir(); - path.push(name); - Self { path } - } - - pub fn write_all(&self, data: &[u8]) -> UResult<()> { - fs::write(&self.path, data).map_err(|e| UError::FSError(self.get_path(), e.to_string()))?; - Ok(()) - } - - pub fn write_exec(data: &[u8]) -> UResult { - let this = Self::new(); - let path = this.get_path(); - dbg!(&path); - this.write_all(data)?; - - #[cfg(unix)] - { - let perms = fs::Permissions::from_mode(0o555); - fs::set_permissions(&path, perms).map_err(|e| UError::FSError(path, e.to_string()))?; - } - Ok(this) - } -} - -impl Drop for TempFile { - fn drop(&mut self) { - fs::remove_file(&self.path).unwrap(); - } -} - -#[cfg(test)] -mod tests { - use super::*; - use crate::conv::bytes_to_string; - use std::path::Path; - use std::process::Command; - - #[test] - fn test_file_is_not_busy() { - let binary = include_bytes!("../tests/fixtures/echoer"); - for _ in 0..100 { - let executable = TempFile::write_exec(binary).unwrap(); - let path = executable.get_path(); - let result = Command::new(path).arg("qwe").output().unwrap(); - assert_eq!(bytes_to_string(result.stdout.as_ref()).trim(), "qwe"); - } - } - - #[test] - fn test_file_removed_after_dropping() { - let path; - { - let file = TempFile::new(); - file.write_all(b"asdqwe").unwrap(); - path = file.get_path(); - assert!(Path::new(&path).exists()) - } - assert!(!Path::new(&path).exists()) - } -} diff --git a/lib/u_lib/src/ufs/error.rs b/lib/u_lib/src/ufs/error.rs new file mode 100644 index 0000000..28f985a --- /dev/null +++ b/lib/u_lib/src/ufs/error.rs @@ -0,0 +1,37 @@ +use serde::{Deserialize, Serialize}; +use std::fmt::Display; +use std::io; +use std::path::Path; + +#[derive(thiserror::Error, Debug, Deserialize, Serialize, Clone)] +#[error("Filesystem error while processing '{path}': {err}")] +pub struct Error { + err: String, + path: String, +} + +impl Error { + pub fn new(err: impl Display, path: impl AsRef) -> Self { + Error { + err: err.to_string(), + path: path.as_ref().to_string_lossy().to_string(), + } + } + + pub fn not_found(path: impl AsRef) -> Self { + Error::new("Not found", path) + } + + pub fn already_exists(path: impl AsRef) -> Self { + Error::new("Already exists", path) + } +} + +impl From for Error { + fn from(e: io::Error) -> Self { + Error { + err: e.to_string(), + path: String::new(), + } + } +} diff --git a/lib/u_lib/src/ufs/mod.rs b/lib/u_lib/src/ufs/mod.rs new file mode 100644 index 0000000..c63f9d6 --- /dev/null +++ b/lib/u_lib/src/ufs/mod.rs @@ -0,0 +1,166 @@ +// This module is aiming to store obfuscated payloads, get them by name, +// delete or prepare to execute via memfd_create (unix) + +use once_cell::sync::Lazy; +use parking_lot::RwLock; +use std::collections::HashMap; +use std::env::temp_dir; +use std::ffi::{CString, OsString}; +use std::fs::{self, File}; +use std::path::{Path, PathBuf}; +use uuid::Uuid; + +mod error; +pub use error::Error; + +// INDEX format: given_name -> payload_meta +static INDEX: Lazy>> = Lazy::new(|| RwLock::new(HashMap::new())); + +struct FileMeta { + path: PathBuf, + obfuscated: bool, + extension: Option, +} + +/// Remove deleted files from index +pub fn sync_index() { + let mut index = INDEX.write(); + + let files_to_delete: Vec = index + .iter() + .filter_map(|(name, meta)| { + if meta.path.exists() { + None + } else { + Some(name.to_string()) + } + }) + .collect(); + + files_to_delete.into_iter().for_each(|f| { + index.remove(&f); + }); +} + +pub fn in_index(name: impl AsRef) -> bool { + sync_index(); + + INDEX.read().get(name.as_ref()).is_some() +} + +pub fn read(name: impl AsRef) -> Result, Error> { + sync_index(); + + let name = name.as_ref(); + let index = INDEX.read(); + let meta = index.get(name).ok_or_else(|| Error::not_found(name))?; + + fs::read(&meta.path).map_err(|e| Error::new(e, name)) +} + +/// Create new file and add to index +pub fn put(name: impl AsRef, data: impl AsRef<[u8]>) -> Result<(), Error> { + let name = name.as_ref(); + let obfuscate = !cfg!(feature = "server") && !cfg!(feature = "panel"); + + if in_index(&name) { + return Err(Error::already_exists(&name)); + } + + let path = { + let exec_name = if obfuscate { + PathBuf::from(Uuid::new_v4().simple().to_string()) + } else { + PathBuf::from(name) + }; + + let mut path = temp_dir(); + path.push(exec_name); + path + }; + + let extension = path.file_stem().map(ToOwned::to_owned); + + fs::write(&path, data).map_err(|e| Error::new(e, name))?; + + let mut index = INDEX.write(); + index.insert( + name.to_string(), + FileMeta { + path, + obfuscated: obfuscate, + extension, + }, + ); + + Ok(()) +} + +/// Add existing file to index +pub fn put_existing(path: impl AsRef) -> Result<(), Error> { + let path = path.as_ref(); + let path_str = path.as_os_str().to_string_lossy().to_string(); + + if !path.exists() || path.is_dir() { + return Err(Error::not_found(path)); + } + + if in_index(&path_str) { + return Err(Error::already_exists(&path)); + } + + let mut index = INDEX.write(); + index.insert( + path_str, + FileMeta { + path: path.to_owned(), + obfuscated: false, + extension: path.file_stem().map(ToOwned::to_owned), + }, + ); + + Ok(()) +} + +#[cfg(unix)] +pub fn prepare_executable(name: impl AsRef) -> Result<(File, String), Error> { + use libc::getpid; + use nix::sys::memfd::*; + use std::io::{Read, Write}; + use std::os::fd::FromRawFd; + + const FAKE_EXEC_NAME: &str = "/usr/sbin/lvmetad"; + const BUFFER_LEN: usize = 4096; + + sync_index(); + + let mut buffer: [u8; BUFFER_LEN] = [0; BUFFER_LEN]; + let name = name.as_ref(); + let index = INDEX.read(); + let payload_meta = index.get(name).ok_or_else(|| Error::not_found(name))?; + + let fd = memfd_create( + CString::new(FAKE_EXEC_NAME).unwrap().as_c_str(), + MemFdCreateFlag::empty(), + ); + + match fd { + Ok(fd) => { + let mut payload_src = + File::open(&payload_meta.path).map_err(|e| Error::new(e, &payload_meta.path))?; + let mut payload_dest = unsafe { File::from_raw_fd(fd) }; + + loop { + let bytes_read = payload_src.read(&mut buffer)?; + payload_dest.write(&buffer)?; + + if bytes_read != BUFFER_LEN { + break; + } + } + let payload_path = format!("/proc/{}/fd/{}", unsafe { getpid() }, fd); + Ok((payload_dest, payload_path)) + } + Err(e) => Err(Error::new(e, FAKE_EXEC_NAME)), + } +} diff --git a/migrations/2020-10-24-111622_create_all/up.sql b/migrations/2020-10-24-111622_create_all/up.sql index 619f61a..41bf5e7 100644 --- a/migrations/2020-10-24-111622_create_all/up.sql +++ b/migrations/2020-10-24-111622_create_all/up.sql @@ -28,8 +28,7 @@ CREATE TABLE IF NOT EXISTS jobs ( id UUID NOT NULL DEFAULT uuid_generate_v4(), exec_type JobType NOT NULL DEFAULT 'shell', platform TEXT NOT NULL, - payload BYTEA, - payload_path TEXT, + payload TEXT, schedule TEXT, PRIMARY KEY(id) diff --git a/spec.txt b/spec.txt index 7b833d0..9427f7b 100644 --- a/spec.txt +++ b/spec.txt @@ -3,5 +3,4 @@ Upload/download files More tests Agent update (use more JobType's) Bump wine version to test agent on windows -Store downloaded payload on disk instead of ram Improve web interface