From 6fe0e71959480760d30a19c64e0a2bc0bb596e70 Mon Sep 17 00:00:00 2001 From: plazmoid Date: Mon, 26 Sep 2022 02:50:10 +0500 Subject: [PATCH] refactor again - deduplicate deps - simplify & unify Job* interfaces - unify api with OneOrVec - remove some Display impls - remove ips db table --- Cargo.toml | 10 + bin/u_agent/Cargo.toml | 10 +- bin/u_agent/src/lib.rs | 42 ++-- bin/u_panel/Cargo.toml | 27 ++- bin/u_panel/src/argparse.rs | 8 +- bin/u_run/Cargo.toml | 2 +- bin/u_server/Cargo.toml | 19 +- bin/u_server/src/db.rs | 2 + bin/u_server/src/handlers.rs | 42 ++-- bin/u_server/src/u_server.rs | 4 +- .../integration-tests/tests_runner.Dockerfile | 3 +- images/integration-tests/u_db.Dockerfile | 5 + integration/Cargo.toml | 22 +- integration/docker-compose.yml | 23 +- integration/docker.py | 79 +++++++ integration/docker_compose.py | 72 ------- integration/integration_tests.py | 55 +++-- integration/tests/fixtures/agent.rs | 11 +- lib/u_lib/Cargo.toml | 41 ++-- lib/u_lib/src/api.rs | 21 +- lib/u_lib/src/errors/chan.rs | 5 +- lib/u_lib/src/errors/variants.rs | 11 +- lib/u_lib/src/executor.rs | 51 +++-- lib/u_lib/src/lib.rs | 2 +- lib/u_lib/src/messaging/base.rs | 5 +- lib/u_lib/src/messaging/mod.rs | 12 +- lib/u_lib/src/models/agent.rs | 44 ++-- lib/u_lib/src/models/jobs/assigned.rs | 119 ++++------ lib/u_lib/src/models/jobs/meta.rs | 117 +++------- lib/u_lib/src/models/jobs/misc.rs | 32 +-- lib/u_lib/src/models/schema.rs | 42 +--- lib/u_lib/src/{builder.rs => runner.rs} | 203 +++++++++++------- lib/u_lib/src/utils/fmt/stripped.rs | 7 +- lib/u_lib/src/utils/mod.rs | 2 - lib/u_lib/src/utils/platform.rs | 4 + .../2020-10-24-111622_create_all/up.sql | 22 +- 36 files changed, 584 insertions(+), 592 deletions(-) delete mode 100644 integration/docker_compose.py rename lib/u_lib/src/{builder.rs => runner.rs} (54%) diff --git a/Cargo.toml b/Cargo.toml index 02c9d71..ece9d2c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,6 +8,16 @@ members = [ "integration" ] +[workspace.dependencies] +reqwest = { version = "0.11", features = ["json"] } +serde = { version = "1.0", features = ["derive"] } +serde_json = "1.0" +tokio = { version = "1.11", features = ["macros"] } +tracing = "0.1.35" +tracing-appender = "0.2.0" +tracing-subscriber = { version = "0.3.0", features = ["env-filter"]} +uuid = "0.6.5" + [profile.release] panic = "abort" strip = "symbols" diff --git a/bin/u_agent/Cargo.toml b/bin/u_agent/Cargo.toml index a7cfb91..77259f2 100644 --- a/bin/u_agent/Cargo.toml +++ b/bin/u_agent/Cargo.toml @@ -7,10 +7,10 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -tokio = { version = "1.2.0", features = ["macros", "rt-multi-thread", "process", "time"] } -sysinfo = "0.10.5" log = "^0.4" -uuid = "0.6.5" -reqwest = { version = "0.11", features = ["json"] } -u_lib = { version = "*", path = "../../lib/u_lib" } +reqwest = { workspace = true } +sysinfo = "0.10.5" +tokio = { workspace = true, features = ["macros", "rt-multi-thread", "process", "time"] } +uuid = { workspace = true } +u_lib = { path = "../../lib/u_lib" } diff --git a/bin/u_agent/src/lib.rs b/bin/u_agent/src/lib.rs index 597df48..1725e58 100644 --- a/bin/u_agent/src/lib.rs +++ b/bin/u_agent/src/lib.rs @@ -9,16 +9,16 @@ extern crate log; use std::sync::Arc; use tokio::time::{sleep, Duration}; use u_lib::{ - api::ClientHandler, builder::JobBuilder, cache::JobCache, config::get_self_uid, - errors::ErrChan, executor::pop_completed, logging::init_logger, messaging::Reportable, - models::AssignedJob, utils::load_env_default, UError, + api::ClientHandler, cache::JobCache, config::get_self_uid, errors::ErrChan, + executor::pop_completed, logging::init_logger, messaging::Reportable, models::AssignedJobById, + runner::JobRunner, utils::load_env_default, }; const ITERATION_LATENCY: u64 = 5; -pub async fn process_request(job_requests: Vec, client: &ClientHandler) { - if !job_requests.is_empty() { - for jr in &job_requests { +pub async fn process_request(jobs: Vec, client: &ClientHandler) { + if !jobs.is_empty() { + for jr in &jobs { if !JobCache::contains(jr.job_id) { info!("Fetching job: {}", &jr.job_id); let fetched_job = loop { @@ -35,20 +35,19 @@ pub async fn process_request(job_requests: Vec, client: &ClientHand } info!( "Scheduling jobs: {}", - job_requests - .iter() + jobs.iter() .map(|j| j.job_id.to_string()) .collect::>() .join(", ") ); - let mut builder = JobBuilder::from_request(job_requests); - let errors = builder.pop_errors(); + let mut runner = JobRunner::from_jobs(jobs); + let errors = runner.pop_errors(); if !errors.is_empty() { for e in errors { ErrChan::send(e, "ebld").await; } } - builder.unwrap_one().spawn().await; + runner.unwrap_one().spawn().await; } } @@ -57,7 +56,7 @@ async fn error_reporting(client: Arc) -> ! { match ErrChan::recv().await { Some(err) => { 'retry: for _ in 0..3 { - match client.report(&[Reportable::Error(err.to_string())]).await { + match client.report(Reportable::Error(err.clone())).await { Ok(_) => break 'retry, Err(e) => { debug!("Reporting error: {:?}", e); @@ -71,17 +70,24 @@ async fn error_reporting(client: Arc) -> ! { } } -async fn do_stuff(client: Arc) -> ! { +async fn agent_loop(client: Arc) -> ! { loop { match client.get_personal_jobs(get_self_uid()).await { - Ok(resp) => { - process_request(resp, &client).await; + Ok(jobs) => { + process_request(jobs, &client).await; } Err(err) => ErrChan::send(err, "processing").await, } - let result: Vec = pop_completed().await.into_iter().collect(); + let result: Vec = pop_completed() + .await + .into_iter() + .map(|result| match result { + Ok(r) => Reportable::Assigned(r), + Err(e) => Reportable::Error(e), + }) + .collect(); if !result.is_empty() { - if let Err(err) = client.report(&result).await { + if let Err(err) = client.report(result).await { ErrChan::send(err, "report").await; } } @@ -110,5 +116,5 @@ pub async fn run_forever() -> ! { // } } info!("Startup"); - do_stuff(client).await + agent_loop(client).await } diff --git a/bin/u_panel/Cargo.toml b/bin/u_panel/Cargo.toml index 1e69c0c..618d1bc 100644 --- a/bin/u_panel/Cargo.toml +++ b/bin/u_panel/Cargo.toml @@ -7,23 +7,22 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +actix-cors = "0.6.1" actix-web = "4.1" -structopt = "0.3.21" -uuid = "0.6.5" -serde_json = "1.0.4" -serde = { version = "1.0.114", features = ["derive"] } -tokio = { version = "1.11.0", features = ["rt", "rt-multi-thread"] } -u_lib = { version = "*", path = "../../lib/u_lib", features = ["panel"] } anyhow = "1.0.44" -strum = { version = "0.22.0", features = ["derive"] } +futures-util = "0.3.21" +mime_guess = "2.0.4" once_cell = "1.8.0" -tracing = "0.1.29" -tracing-subscriber = { version = "0.3.3", features = ["env-filter"]} -signal-hook = "0.3.12" -tracing-appender = "0.2.0" rust-embed = { version = "6.3.0", features = ["debug-embed", "compression"] } -mime_guess = "2.0.4" +serde = { workspace = true } +serde_json = { workspace = true } +strum = { version = "0.22.0", features = ["derive"] } +tokio = { workspace = true, features = ["rt", "rt-multi-thread"] } +tracing = { workspace = true } +tracing-subscriber = { workspace = true } +tracing-appender = { workspace = true } shlex = "1.1.0" +structopt = "0.3.21" thiserror = "1.0.31" -futures-util = "0.3.21" -actix-cors = "0.6.1" +uuid = { workspace = true } +u_lib = { version = "*", path = "../../lib/u_lib", features = ["panel"] } diff --git a/bin/u_panel/src/argparse.rs b/bin/u_panel/src/argparse.rs index b414fe7..ce810ba 100644 --- a/bin/u_panel/src/argparse.rs +++ b/bin/u_panel/src/argparse.rs @@ -4,7 +4,7 @@ use u_lib::{ api::ClientHandler, datatypes::PanelResult, messaging::AsMsg, - models::{Agent, AssignedJob, JobMeta, RawJobMeta}, + models::{Agent, AssignedJob, JobMeta}, UError, UResult, }; use uuid::Uuid; @@ -96,9 +96,9 @@ pub async fn process_cmd(client: ClientHandler, args: Args) -> UResult { }, Cmd::Jobs(action) => match action { JobCRUD::Create { job } => { - let raw_job = serde_json::from_str::(&job)?; + let raw_job = serde_json::from_str::(&job)?; let job = raw_job.into_builder().build()?; - to_json(client.upload_jobs(&[job]).await) + to_json(client.upload_jobs(job).await) } JobCRUD::RUD(RUD::Read { uid }) => to_json(client.get_jobs(uid).await), JobCRUD::RUD(RUD::Update { item }) => { @@ -111,7 +111,7 @@ pub async fn process_cmd(client: ClientHandler, args: Args) -> UResult { JobMapCRUD::Create { agent_uid, job_idents, - } => to_json(client.set_jobs(agent_uid, &job_idents).await), + } => to_json(client.set_jobs(agent_uid, job_idents).await), JobMapCRUD::RUD(RUD::Read { uid }) => to_json(client.get_agent_jobs(uid).await), JobMapCRUD::RUD(RUD::Update { item }) => { let assigned = serde_json::from_str::(&item)?; diff --git a/bin/u_run/Cargo.toml b/bin/u_run/Cargo.toml index 9756ea2..1c3b145 100644 --- a/bin/u_run/Cargo.toml +++ b/bin/u_run/Cargo.toml @@ -7,5 +7,5 @@ edition = "2018" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -nix = "0.17" libc = "^0.2" +nix = "0.17" diff --git a/bin/u_server/Cargo.toml b/bin/u_server/Cargo.toml index 5b07891..b825bbf 100644 --- a/bin/u_server/Cargo.toml +++ b/bin/u_server/Cargo.toml @@ -5,17 +5,18 @@ name = "u_server" version = "0.1.0" [dependencies] -tracing = "0.1.35" -thiserror = "*" -warp = { version = "0.3.1", features = ["tls"] } -uuid = { version = "0.6.5", features = ["serde", "v4"] } -once_cell = "1.7.2" +diesel = { version = "1.4.5", features = ["postgres", "uuid"] } hyper = "0.14" +once_cell = "1.7.2" openssl = "*" -diesel = { version = "1.4.5", features = ["postgres", "uuid"] } -serde = { version = "1.0", features = ["derive"] } -tokio = { version = "1.9", features = ["macros"] } -u_lib = { path = "../../lib/u_lib", version = "*", features = ["server"] } +serde = { workspace = true } +serde_json = { workspace = true } +thiserror = "*" +tracing = { workspace = true } +tokio = { workspace = true, features = ["macros"] } +uuid = { workspace = true, features = ["serde", "v4"] } +u_lib = { path = "../../lib/u_lib", features = ["server"] } +warp = { version = "0.3.1", features = ["tls"] } [dev-dependencies] diff --git a/bin/u_server/src/db.rs b/bin/u_server/src/db.rs index 8eb3a55..09dcad3 100644 --- a/bin/u_server/src/db.rs +++ b/bin/u_server/src/db.rs @@ -11,6 +11,7 @@ use uuid::Uuid; pub struct UDB { pub conn: PgConnection, + __p: (), } static DB: OnceCell> = OnceCell::new(); @@ -33,6 +34,7 @@ impl UDB { ); let instance = UDB { conn: PgConnection::establish(&db_url).unwrap(), + __p: (), }; Mutex::new(instance) }) diff --git a/bin/u_server/src/handlers.rs b/bin/u_server/src/handlers.rs index 4e1eebe..3b0db33 100644 --- a/bin/u_server/src/handlers.rs +++ b/bin/u_server/src/handlers.rs @@ -6,7 +6,7 @@ use diesel::SaveChangesDsl; use u_lib::{ messaging::{AsMsg, BaseMessage, Reportable}, models::*, - utils::{OneOrVec, Stripped}, + utils::OneOrVec, }; use uuid::Uuid; use warp::Rejection; @@ -88,36 +88,46 @@ impl Endpoints { msg: BaseMessage<'static, Data>, ) -> EndpResult<()> { let id = msg.id; - let mut failed = vec![]; for entry in msg.into_inner().into_vec() { match entry { Reportable::Assigned(mut result) => { - if id != result.agent_id { + let result_agent_id = &result.agent_id; + if id != *result_agent_id { + warn!("Ids are not equal! actual id: {id}, job id: {result_agent_id}"); continue; } result.state = JobState::Finished; result.updated = SystemTime::now(); + match result.exec_type { + JobType::Init => match &result.result { + Some(rbytes) => { + let mut agent: Agent = match serde_json::from_slice(&rbytes) { + Ok(a) => a, + Err(e) => { + warn!("Error deserializing agent from {id}: {e}"); + continue; + } + }; + agent.state = AgentState::Active; + Self::add_agent(agent).await?; + } + None => warn!("Empty agent data"), + }, + JobType::Shell => (), + JobType::Terminate => todo!(), + JobType::Update => todo!(), + } let db = UDB::lock_db(); - if let Err(e) = result + result .save_changes::(&db.conn) - .map_err(Error::from) - { - failed.push(e.to_string()) - } - } - Reportable::Agent(mut a) => { - a.state = AgentState::Active; - Self::add_agent(a).await?; + .map_err(Error::from)?; } Reportable::Error(e) => { - warn!("{} reported an error: {}", id, Stripped(&e.as_str())); + warn!("{} reported an error: {}", id, e); } Reportable::Dummy => (), } } - if !failed.is_empty() { - return Err(Error::ProcessingError(failed.join(", ")).into()); - } Ok(()) } diff --git a/bin/u_server/src/u_server.rs b/bin/u_server/src/u_server.rs index d2e84d5..8c11e0a 100644 --- a/bin/u_server/src/u_server.rs +++ b/bin/u_server/src/u_server.rs @@ -156,8 +156,8 @@ pub fn prefill_jobs() -> SResult<()> { match if_job_exists { Ok(_) => Ok(()), Err(Error::DBError(diesel::result::Error::NotFound)) => { - let agent_hello = RawJobMeta::builder() - .with_type(misc::JobType::Manage) + let agent_hello = JobMeta::builder() + .with_type(JobType::Init) .with_alias(job_alias) .build() .unwrap(); diff --git a/images/integration-tests/tests_runner.Dockerfile b/images/integration-tests/tests_runner.Dockerfile index 4714c7d..cefab17 100644 --- a/images/integration-tests/tests_runner.Dockerfile +++ b/images/integration-tests/tests_runner.Dockerfile @@ -1,4 +1,5 @@ -FROM rust:1.62 +FROM rust:1.64 RUN rustup target add x86_64-unknown-linux-musl +RUN mkdir -p /tests && chmod 777 /tests CMD ["sleep", "3600"] \ No newline at end of file diff --git a/images/integration-tests/u_db.Dockerfile b/images/integration-tests/u_db.Dockerfile index 8578b44..5f1812f 100644 --- a/images/integration-tests/u_db.Dockerfile +++ b/images/integration-tests/u_db.Dockerfile @@ -1,6 +1,7 @@ FROM postgres:14.5 ENV DEBIAN_FRONTEND=noninteractive + RUN apt update && apt upgrade -y RUN apt install -y curl build-essential libpq-dev iproute2 RUN curl https://sh.rustup.rs -sSf | sh -s -- -y --default-toolchain stable --profile minimal @@ -9,4 +10,8 @@ RUN rustup target add x86_64-unknown-linux-musl RUN cargo install diesel_cli --no-default-features --features postgres RUN mkdir -p /unki +ENV LC_ALL en_US.UTF-8 +ENV LANG en_US.UTF-8 +ENV LANGUAGE en_US.UTF-8 +RUN apt install -y locales locales-all COPY u_db_entrypoint.sh /unki/ diff --git a/integration/Cargo.toml b/integration/Cargo.toml index eda6095..706d97d 100644 --- a/integration/Cargo.toml +++ b/integration/Cargo.toml @@ -7,20 +7,16 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -tokio = { version = "1.2.0", features = ["macros", "rt-multi-thread", "process", "time"] } -tracing = "0.1.35" -uuid = { version = "0.6.5", features = ["serde", "v4"] } -reqwest = { version = "0.11", features = ["json"] } -serde_json = "1.0" -serde = { version = "1.0.114", features = ["derive"] } -shlex = "1.0.0" -rstest = "0.12" once_cell = "1.10.0" - -[dependencies.u_lib] -path = "../lib/u_lib" -version = "*" -features = ["panel"] +reqwest = { workspace = true } +rstest = "0.12" +serde = { workspace = true } +serde_json = { workspace = true } +shlex = "1.0.0" +tokio = { workspace = true, features = ["macros", "rt-multi-thread", "process", "time"] } +tracing = { workspace = true } +uuid = { workspace = true, features = ["serde", "v4"] } +u_lib = { path = "../lib/u_lib", features = ["panel"] } [[test]] diff --git a/integration/docker-compose.yml b/integration/docker-compose.yml index 04bc87b..d148d39 100644 --- a/integration/docker-compose.yml +++ b/integration/docker-compose.yml @@ -1,4 +1,8 @@ -version: "2.1" +version: "3.4" + +x-global: + user: &user + "${DOCKER_UID:-1000}:${DOCKER_GID:-1000}" networks: u_net: @@ -6,7 +10,7 @@ networks: services: u_server: - user: "${DOCKER_UID:-1000}:${DOCKER_GID:-1000}" + user: *user image: unki/u_server networks: - u_net @@ -53,7 +57,7 @@ services: retries: 3 u_agent: - user: "${DOCKER_UID:-1000}:${DOCKER_GID:-1000}" + user: *user image: unki/u_agent networks: - u_net @@ -71,18 +75,19 @@ services: condition: service_healthy tests_runner: - user: "${DOCKER_UID:-1000}:${DOCKER_GID:-1000}" + user: *user image: unki/tests_runner networks: - u_net volumes: - - ./:/tests/ - - ../certs:/certs + - ../__Cargo_integration.toml:/tests/Cargo.toml + - ./:/tests/integration/ + - ../certs:/tests/certs - ../target/x86_64-unknown-linux-musl/${PROFILE:-debug}/u_panel:/u_panel - - ../lib/u_lib:/lib/u_lib - - ../logs:/tests/logs:rw + - ../lib/u_lib:/tests/lib/u_lib + - ../logs:/tests/integration/logs:rw working_dir: - /tests/ + /tests/integration/ depends_on: u_agent: condition: service_started diff --git a/integration/docker.py b/integration/docker.py index 05182bd..355e517 100644 --- a/integration/docker.py +++ b/integration/docker.py @@ -1,4 +1,5 @@ import subprocess +import shlex from utils import * @@ -78,3 +79,81 @@ def rebuild_images_if_needed(force_rebuild=False): if force_rebuild: cmd += ['--no-cache'] docker(cmd) + + +class Compose: + ALL_IMAGES = [ + 'u_agent', + 'u_server', + 'u_db', + 'tests_runner', + ] + + def __init__(self): + self.container_tpl = 'integration_%s_%d' + self.cmd_container = self.container_tpl % ('tests_runner', 1) + self.ALL_CONTAINERS = [self.container_tpl % + (c, 1) for c in self.ALL_IMAGES] + self.scaled_svc = {} + self.scale("u_agent", 2) + + def scale(self, svc, count): + for c in range(1, count): + new_container = self.container_tpl % (svc, c + 1) + self.ALL_CONTAINERS.append(new_container) + self.scaled_svc[svc] = count + + def _call(self, *args): + cmd = [ + 'docker-compose', + '--ansi=never', + ] + list(args) + log(f'Running docker-compose command: {cmd}') + subprocess.check_call(cmd) + + def up(self): + log(f'Instanciating cluster: {self.ALL_CONTAINERS}') + scaled = [f"{k}={v}" for k, v in self.scaled_svc.items()] + if len(scaled) > 0: + scaled.insert(0, '--scale') + self._call('up', '-d', *scaled) + + def down(self): + log('Shutting down cluster') + self._call('down') + + def stop(self): + log('Stopping cluster') + self._call('stop') + + def run(self, cmd): + container = self.cmd_container + if isinstance(cmd, str): + cmd = shlex.split(cmd) + result = docker([ + 'exec', + '-ti', + container + ] + cmd) + return result + + def is_alive(self): + log('Check if all containers are alive') + + errors = check_state(self.ALL_CONTAINERS) + + if errors: + print_errors(errors) + raise TestsError('Error during `is_alive` check') + else: + log('All containers are alive') + + def print_containers_logs(self): + for container in self.ALL_CONTAINERS: + try: + docker([ + 'logs', + container + ]) + except Exception: + pass diff --git a/integration/docker_compose.py b/integration/docker_compose.py deleted file mode 100644 index 0c445dd..0000000 --- a/integration/docker_compose.py +++ /dev/null @@ -1,72 +0,0 @@ -import subprocess -import shlex -from utils import * -from docker import docker, check_state, print_errors - - -class Compose: - ALL_CONTAINERS = [ - 'u_agent', - 'u_server', - 'u_db', - 'tests_runner', - ] - - def __init__(self): - self.container_tpl = 'integration_%s_%d' - self.cmd_container = self.container_tpl % ('tests_runner', 1) - self.ALL_CONTAINERS = [self.container_tpl % - (c, 1) for c in self.ALL_CONTAINERS] - self.scaled_svc = {} - self.scale("u_agent", 2) - - def scale(self, svc, count): - for c in range(1, count): - new_container = self.container_tpl % (svc, c + 1) - self.ALL_CONTAINERS.append(new_container) - self.scaled_svc[svc] = count - - def _call(self, *args): - cmd = [ - 'docker-compose', - '--ansi=never', - ] + list(args) - log(f'Running docker-compose command: {cmd}') - subprocess.check_call(cmd) - - def up(self): - log('Instanciating cluster') - scaled = [f"{k}={v}" for k, v in self.scaled_svc.items()] - if len(scaled) > 0: - scaled.insert(0, '--scale') - self._call('up', '-d', *scaled) - - def down(self): - log('Shutting down cluster') - self._call('down') - - def stop(self): - log('Stopping cluster') - self._call('stop') - - def run(self, cmd): - container = self.cmd_container - if isinstance(cmd, str): - cmd = shlex.split(cmd) - result = docker([ - 'exec', - '-ti', - container - ] + cmd) - return result - - def is_alive(self): - log('Check if all containers are alive') - - errors = check_state(self.ALL_CONTAINERS) - - if errors: - print_errors(errors) - raise TestsError('Error during `is_alive` check') - else: - log('All containers are alive') diff --git a/integration/integration_tests.py b/integration/integration_tests.py index 3402c9a..43c54ff 100644 --- a/integration/integration_tests.py +++ b/integration/integration_tests.py @@ -1,49 +1,68 @@ import signal import sys -from utils import * -from docker import rebuild_images_if_needed -from docker_compose import Compose +import toml +from docker import rebuild_images_if_needed, Compose +from pathlib import Path +from utils import * -cluster = Compose() +CARGO_INTEGRATION_TOML = Path('../__Cargo_integration.toml') +CLUSTER = Compose() -def abort_handler(s, _): - warn(f'Received signal: {s}') - warn(f'Gracefully stopping...') - cluster.down() +def fail(msg): + err(msg) + sys.exit(1) def usage_exit(): usage = f"""Usage: python {__file__.split('/')[-1]} [--rebuild] [--preserve] [--no-run]""" - print(usage) - sys.exit(1) + fail(usage) + + +def create_integration_workspace(): + if CARGO_INTEGRATION_TOML.exists(): + CARGO_INTEGRATION_TOML.unlink() + workspace = toml.load('../Cargo.toml') + workspace['workspace']['members'] = ['integration'] + with open(CARGO_INTEGRATION_TOML, 'w') as fo: + toml.dump(workspace, fo) def run_tests(): - allowed_args = set(["--rebuild", "--preserve", "--no-run"]) + allowed_args = set(["--rebuild", "--preserve", "--no-run", "--release"]) args = sys.argv[1:] if not set(args).issubset(allowed_args): usage_exit() force_rebuild = '--rebuild' in args preserve_containers = '--preserve' in args only_setup_cluster = '--no-run' in args + + def _cleanup(): + if not preserve_containers and not only_setup_cluster: + CLUSTER.down() + CARGO_INTEGRATION_TOML.unlink(missing_ok=True) + + def abort_handler(s, _): + warn(f'Received signal: {s}, gracefully stopping...') + _cleanup() + for s in (signal.SIGTERM, signal.SIGINT, signal.SIGHUP): signal.signal(s, abort_handler) rebuild_images_if_needed(force_rebuild) + create_integration_workspace() try: - cluster.up() - cluster.is_alive() + CLUSTER.up() + CLUSTER.is_alive() if not only_setup_cluster: - cluster.run('cargo test --test integration') + CLUSTER.run('cargo test --test integration') except Exception as e: - err(e) - sys.exit(1) + # CLUSTER.print_containers_logs() + fail(e) finally: - if not preserve_containers and not only_setup_cluster: - cluster.down() + _cleanup() if __name__ == '__main__': diff --git a/integration/tests/fixtures/agent.rs b/integration/tests/fixtures/agent.rs index b80366d..1096aac 100644 --- a/integration/tests/fixtures/agent.rs +++ b/integration/tests/fixtures/agent.rs @@ -24,12 +24,13 @@ pub async fn register_agent() -> RegisteredAgent { .pop() .unwrap(); let job_id = resp.job_id; - let resp = cli.get_jobs(Some(job_id)).await.unwrap().pop().unwrap(); - assert_eq!(resp.alias, Some("agent_hello".to_string())); - let agent_data = Agent { + let job = cli.get_jobs(Some(job_id)).await.unwrap().pop().unwrap(); + assert_eq!(job.alias, Some("agent_hello".to_string())); + let mut agent_data = AssignedJob::from(&job); + agent_data.set_result(&Agent { id: agent_uid, ..Default::default() - }; - cli.report(&[Reportable::Agent(agent_data)]).await.unwrap(); + }); + cli.report(Reportable::Assigned(agent_data)).await.unwrap(); RegisteredAgent { uid: agent_uid } } diff --git a/lib/u_lib/Cargo.toml b/lib/u_lib/Cargo.toml index 63de9f6..27e2c96 100644 --- a/lib/u_lib/Cargo.toml +++ b/lib/u_lib/Cargo.toml @@ -7,37 +7,34 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -dotenv = "0.15.0" -serde = { version = "1.0.114", features = ["derive"] } -uuid = { version = "0.6.5", features = ["serde", "v4"] } -nix = "0.17" -libc = "^0.2" -lazy_static = "1.4.0" -futures = "0.3.5" -thiserror = "*" -diesel-derive-enum = { version = "1", features = ["postgres"], optional = true } +anyhow = "1.0.58" chrono = "0.4.19" -strum = { version = "0.20", features = ["derive"] } -once_cell = "1.7.2" -shlex = "1.0.0" -crossbeam = "0.8.1" diesel = { version = "1.4.5", features = ["postgres", "uuid"], optional = true } +diesel-derive-enum = { version = "1", features = ["postgres"], optional = true } +dotenv = "0.15.0" envy = "0.4.2" -serde_json = "1.0.81" -tracing-subscriber = { version = "0.3.14", features = ["env-filter"] } -tracing-appender = "0.2.2" +futures = "0.3.5" +guess_host_triple = "0.1.2" +libc = "^0.2" +lazy_static = "1.4.0" log = "*" -anyhow = "1.0.58" +nix = "0.17" +once_cell = "1.7.2" platforms = "3.0.1" +reqwest = { workspace = true, features = ["native-tls"] } +shlex = "1.0.0" +serde = { workspace = true } +serde_json = { workspace = true } +strum = { version = "0.20", features = ["derive"] } +thiserror = "*" +tokio = { workspace = true, features = ["rt-multi-thread", "sync", "macros", "process", "time"] } +tracing-appender = { workspace = true } +tracing-subscriber = { workspace = true, features = ["env-filter"] } +uuid = { workspace = true, features = ["serde", "v4"] } [features] panel = [] server = ["dep:diesel", "dep:diesel-derive-enum"] -[target.'cfg(not(target_arch = "wasm32"))'.dependencies] -reqwest = { version = "0.11", features = ["json", "native-tls"] } -tokio = { version = "1.2.0", features = ["rt-multi-thread", "sync", "macros", "process", "time"] } -guess_host_triple = "0.1.2" - [dev-dependencies] rstest = "0.12" diff --git a/lib/u_lib/src/api.rs b/lib/u_lib/src/api.rs index a860365..be723b4 100644 --- a/lib/u_lib/src/api.rs +++ b/lib/u_lib/src/api.rs @@ -5,7 +5,7 @@ use crate::{ config::MASTER_PORT, messaging::{self, AsMsg, BaseMessage, Empty}, models::{self}, - utils::opt_to_string, + utils::{opt_to_string, OneOrVec}, UError, }; use anyhow::{Context, Result}; @@ -78,14 +78,14 @@ impl ClientHandler { } // get jobs for client - pub async fn get_personal_jobs(&self, url_param: Uuid) -> Result> { + pub async fn get_personal_jobs(&self, url_param: Uuid) -> Result> { self._req(format!("get_personal_jobs/{}", url_param), Empty) .await } // send something to server - pub async fn report(&self, payload: &[messaging::Reportable]) -> Result { - self._req("report", payload).await + pub async fn report(&self, payload: impl OneOrVec) -> Result { + self._req("report", payload.into_vec()).await } // download file @@ -115,8 +115,8 @@ impl ClientHandler { } /// create and upload job - pub async fn upload_jobs(&self, payload: &[models::JobMeta]) -> Result { - self._req("upload_jobs", payload).await + pub async fn upload_jobs(&self, payload: impl OneOrVec) -> Result { + self._req("upload_jobs", payload.into_vec()).await } /// delete something @@ -125,8 +125,13 @@ impl ClientHandler { } /// set jobs for any agent - pub async fn set_jobs(&self, agent: Uuid, job_idents: &[String]) -> Result> { - self._req(format!("set_jobs/{agent}"), job_idents).await + pub async fn set_jobs( + &self, + agent: Uuid, + job_idents: impl OneOrVec, + ) -> Result> { + self._req(format!("set_jobs/{agent}"), job_idents.into_vec()) + .await } /// get jobs for any agent diff --git a/lib/u_lib/src/errors/chan.rs b/lib/u_lib/src/errors/chan.rs index 10668a4..ef3b7ea 100644 --- a/lib/u_lib/src/errors/chan.rs +++ b/lib/u_lib/src/errors/chan.rs @@ -1,3 +1,4 @@ +use crate::UError; use anyhow::Error; use once_cell::sync::OnceCell; use tokio::sync::mpsc::{channel, error::TryRecvError, Receiver, Sender}; @@ -28,9 +29,9 @@ impl ErrChan { Self::get().await.tx.try_send(err).unwrap(); } - pub async fn recv() -> Option { + pub async fn recv() -> Option { match Self::get().await.rx.try_recv() { - Ok(r) => Some(r), + Ok(err) => Some(UError::from(err)), Err(TryRecvError::Disconnected) => panic!("err chan disconnected"), Err(TryRecvError::Empty) => None, } diff --git a/lib/u_lib/src/errors/variants.rs b/lib/u_lib/src/errors/variants.rs index 2a9a48f..ff65f45 100644 --- a/lib/u_lib/src/errors/variants.rs +++ b/lib/u_lib/src/errors/variants.rs @@ -6,7 +6,7 @@ use uuid::Uuid; pub type UResult = std::result::Result; -#[derive(Error, Debug, Serialize, Deserialize, Clone)] +#[derive(PartialEq, Eq, Error, Debug, Serialize, Deserialize, Clone)] pub enum UError { #[error("Runtime error: {0}")] Runtime(String), @@ -60,3 +60,12 @@ impl From for UError { UError::DeserializeError(e.to_string()) } } + +impl From for UError { + fn from(e: anyhow::Error) -> Self { + match e.downcast::() { + Ok(err) => err, + Err(err) => UError::Runtime(err.to_string()), + } + } +} diff --git a/lib/u_lib/src/executor.rs b/lib/u_lib/src/executor.rs index c34e08b..169002f 100644 --- a/lib/u_lib/src/executor.rs +++ b/lib/u_lib/src/executor.rs @@ -1,15 +1,16 @@ -use crate::{messaging::Reportable, utils::OneOrVec}; +use crate::{models::AssignedJob, UResult}; use futures::{future::BoxFuture, lock::Mutex}; use lazy_static::lazy_static; use std::collections::HashMap; +use std::future::Future; use tokio::{ - spawn, + runtime::Handle, sync::mpsc::{channel, Receiver, Sender}, - task::JoinHandle, + task::{spawn, spawn_blocking, JoinHandle}, }; use uuid::Uuid; -pub type DynFut = BoxFuture<'static, Reportable>; +pub type ExecResult = UResult; lazy_static! { static ref FUT_RESULTS: Mutex> = Mutex::new(HashMap::new()); @@ -21,33 +22,45 @@ lazy_static! { } struct JoinInfo { - handle: JoinHandle, + handle: JoinHandle>, completed: bool, collectable: bool, // indicates if future can be popped from pool via pop_task_if_completed } +impl JoinInfo { + async fn wait_result(self) -> ExecResult { + self.handle.await.unwrap().await.unwrap() + } +} + fn get_sender() -> Sender { FUT_CHANNEL.0.clone() } pub struct Waiter { - tasks: Vec, + tasks: Vec>, fids: Vec, } impl Waiter { - pub fn new(tasks: impl OneOrVec) -> Self { + pub fn new() -> Self { Self { - tasks: tasks.into_vec(), + tasks: vec![], fids: vec![], } } + pub fn push(&mut self, task: impl Future + Send + 'static) { + self.tasks.push(Box::pin(task)); + } + + /// Spawn prepared tasks pub async fn spawn(mut self) -> Self { let collectable = true; //TODO: self.tasks.len() != 1; for f in self.tasks.drain(..) { - let tx = get_sender(); + let handle = Handle::current(); let fid = Uuid::new_v4(); + let tx = get_sender(); self.fids.push(fid); let task_wrapper = async move { debug!("inside wrapper (started): {}", fid); @@ -56,7 +69,7 @@ impl Waiter { result }; let handler = JoinInfo { - handle: spawn(task_wrapper), + handle: spawn_blocking(move || handle.spawn(task_wrapper)), completed: false, collectable, }; @@ -68,12 +81,11 @@ impl Waiter { /// Wait until a bunch of tasks is finished. /// NOT GUARANTEED that all tasks will be returned due to /// possibility to pop them in other places - pub async fn wait(self) -> Vec { + pub async fn wait(self) -> Vec { let mut result = vec![]; for fid in self.fids { if let Some(task) = pop_task(fid).await { - let r = task.handle.await; - result.push(r.unwrap()); + result.push(task.wait_result().await); } } result @@ -94,26 +106,25 @@ async fn pop_task(fid: Uuid) -> Option { FUT_RESULTS.lock().await.remove(&fid) } -pub async fn pop_task_if_completed(fid: Uuid) -> Option { - let &mut JoinInfo { +pub async fn pop_task_if_completed(fid: Uuid) -> Option { + let &JoinInfo { handle: _, collectable, completed, - } = match FUT_RESULTS.lock().await.get_mut(&fid) { + } = match FUT_RESULTS.lock().await.get(&fid) { Some(t) => t, None => return None, }; if collectable && completed { let task = pop_task(fid).await.unwrap(); - let result = task.handle.await.unwrap(); - Some(result) + Some(task.wait_result().await) } else { None } } -pub async fn pop_completed() -> Vec { - let mut completed: Vec = vec![]; +pub async fn pop_completed() -> Vec { + let mut completed: Vec = vec![]; let fids = FUT_RESULTS .lock() .await diff --git a/lib/u_lib/src/lib.rs b/lib/u_lib/src/lib.rs index b6a94f8..62b1f0b 100644 --- a/lib/u_lib/src/lib.rs +++ b/lib/u_lib/src/lib.rs @@ -3,7 +3,6 @@ #[path = "."] pub mod exports { pub mod api; - pub mod builder; pub mod cache; pub mod config; pub mod datatypes; @@ -12,6 +11,7 @@ pub mod exports { pub mod logging; pub mod messaging; pub mod models; + pub mod runner; pub mod utils; } diff --git a/lib/u_lib/src/messaging/base.rs b/lib/u_lib/src/messaging/base.rs index 06b73a6..862f1ff 100644 --- a/lib/u_lib/src/messaging/base.rs +++ b/lib/u_lib/src/messaging/base.rs @@ -1,8 +1,8 @@ use crate::config::get_self_uid; -use crate::utils::VecDisplay; +//use crate::utils::VecDisplay; use serde::{Deserialize, Serialize}; use std::borrow::Cow; -use std::fmt::Display; +//use std::fmt::Display; use uuid::Uuid; pub struct Moo<'cow, T: AsMsg + Clone>(pub Cow<'cow, T>); @@ -28,7 +28,6 @@ impl<'cow, M: AsMsg> From<&'cow M> for Moo<'cow, M> { } impl AsMsg for Vec {} -impl AsMsg for VecDisplay {} impl<'msg, M: AsMsg> AsMsg for &'msg [M] {} #[derive(Serialize, Deserialize, Debug)] diff --git a/lib/u_lib/src/messaging/mod.rs b/lib/u_lib/src/messaging/mod.rs index e79ef34..b1342bf 100644 --- a/lib/u_lib/src/messaging/mod.rs +++ b/lib/u_lib/src/messaging/mod.rs @@ -2,14 +2,15 @@ mod base; mod files; use crate::models::*; +use crate::UError; pub use base::{AsMsg, BaseMessage}; pub use files::*; use serde::{Deserialize, Serialize}; -use std::fmt; use uuid::Uuid; impl AsMsg for Agent {} impl AsMsg for AssignedJob {} +impl AsMsg for AssignedJobById {} impl AsMsg for DownloadInfo {} impl AsMsg for Reportable {} impl AsMsg for JobMeta {} @@ -22,16 +23,9 @@ impl AsMsg for u8 {} #[derive(Serialize, Deserialize, Clone, Default, Debug)] pub struct Empty; -impl fmt::Display for Empty { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!(f, "") - } -} - #[derive(Serialize, Deserialize, Clone, PartialEq, Debug)] pub enum Reportable { Assigned(AssignedJob), - Agent(Agent), Dummy, - Error(String), + Error(UError), } diff --git a/lib/u_lib/src/models/agent.rs b/lib/u_lib/src/models/agent.rs index 71483ac..b00a677 100644 --- a/lib/u_lib/src/models/agent.rs +++ b/lib/u_lib/src/models/agent.rs @@ -6,14 +6,13 @@ use serde::{Deserialize, Serialize}; use std::{fmt, time::SystemTime}; use strum::Display; -#[cfg(not(target_arch = "wasm32"))] -use crate::builder::NamedJobBuilder; #[cfg(feature = "server")] use crate::models::schema::*; + use crate::{ config::get_self_uid, - messaging::Reportable, - unwrap_enum, + executor::ExecResult, + runner::NamedJobRunner, utils::{systime_to_string, Platform}, }; @@ -43,6 +42,8 @@ pub struct Agent { pub alias: Option, pub hostname: String, pub id: Uuid, + pub ip_gray: Option, + pub ip_white: Option, pub is_root: bool, pub is_root_allowed: bool, pub last_active: SystemTime, @@ -55,18 +56,17 @@ pub struct Agent { impl fmt::Display for Agent { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - let mut out = format!("Agent: {}", self.id); + write!(f, "Agent: {}", self.id)?; if let Some(ref alias) = self.alias { - out += &format!(" ({})", alias) + write!(f, " ({})", alias)? } - out += &format!("\nUsername: {}", self.username); - out += &format!("\nHostname: {}", self.hostname); - out += &format!("\nIs root: {}", self.is_root); - out += &format!("\nRoot allowed: {}", self.is_root_allowed); - out += &format!("\nLast active: {}", systime_to_string(&self.last_active)); - out += &format!("\nPlatform: {}", self.platform); - out += &format!("\nState: {}", self.state); - write!(f, "{}", out) + writeln!(f, "\nUsername: {}", self.username)?; + writeln!(f, "Hostname: {}", self.hostname)?; + writeln!(f, "Is root: {}", self.is_root)?; + writeln!(f, "Root allowed: {}", self.is_root_allowed)?; + writeln!(f, "Last active: {}", systime_to_string(&self.last_active))?; + writeln!(f, "Platform: {}", self.platform)?; + writeln!(f, "State: {}", self.state) } } @@ -81,7 +81,7 @@ impl Agent { #[cfg(unix)] pub async fn gather() -> Self { - let mut builder = NamedJobBuilder::from_shell(vec![ + let mut builder = NamedJobRunner::from_shell(vec![ ("hostname", "hostname"), ("is_root", "id -u"), ("username", "id -un"), @@ -89,10 +89,8 @@ impl Agent { .unwrap_one() .wait() .await; - let decoder = |job_result: Reportable| { - let assoc_job = unwrap_enum!(job_result, Reportable::Assigned); - assoc_job.to_string_result().trim().to_string() - }; + let decoder = + |job_result: ExecResult| job_result.unwrap().to_string_result().trim().to_string(); Self { hostname: decoder(builder.pop("hostname")), @@ -105,11 +103,11 @@ impl Agent { #[cfg(not(unix))] pub async fn gather() -> Self { - Self::default() + todo!() } - pub async fn run() -> Reportable { - Reportable::Agent(Agent::gather().await) + pub async fn run() -> Agent { + Agent::gather().await } } @@ -127,6 +125,8 @@ impl Default for Agent { state: AgentState::New, token: None, username: String::new(), + ip_gray: None, + ip_white: None, } } } diff --git a/lib/u_lib/src/models/jobs/assigned.rs b/lib/u_lib/src/models/jobs/assigned.rs index 1122031..2230f70 100644 --- a/lib/u_lib/src/models/jobs/assigned.rs +++ b/lib/u_lib/src/models/jobs/assigned.rs @@ -1,18 +1,12 @@ -use super::JobState; +use super::{JobMeta, JobState, JobType}; #[cfg(feature = "server")] use crate::models::schema::*; #[cfg(not(target_arch = "wasm32"))] -use crate::{cache::JobCache, utils::TempFile}; -use crate::{ - config::get_self_uid, - errors::UError, - messaging::Reportable, - utils::{systime_to_string, ProcOutput}, -}; +use crate::{config::get_self_uid, utils::ProcOutput}; #[cfg(feature = "server")] use diesel::{Identifiable, Insertable, Queryable}; use serde::{Deserialize, Serialize}; -use std::{fmt, time::SystemTime}; +use std::time::SystemTime; use uuid::Uuid; #[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] @@ -21,7 +15,6 @@ use uuid::Uuid; derive(Queryable, Identifiable, Insertable, AsChangeset), table_name = "results" )] - pub struct AssignedJob { pub agent_id: Uuid, pub alias: Option, @@ -30,29 +23,50 @@ pub struct AssignedJob { pub job_id: Uuid, pub result: Option>, pub state: JobState, + pub exec_type: JobType, pub retcode: Option, pub updated: SystemTime, } -impl fmt::Display for AssignedJob { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - let mut out = format!("Result: {}", self.id); - out += &format!("\nAgent: {}", self.agent_id); - out += &format!("\nJob: {}", self.job_id); - if let Some(ref alias) = self.alias { - out += &format!("\nAlias: {}", alias); +#[derive(Serialize, Deserialize, Clone, Copy)] +pub struct AssignedJobById { + pub agent_id: Uuid, + pub id: Uuid, + pub job_id: Uuid, +} + +impl From<(&JobMeta, AssignedJobById)> for AssignedJob { + fn from((meta, pj): (&JobMeta, AssignedJobById)) -> Self { + AssignedJob { + id: pj.id, + agent_id: pj.agent_id, + job_id: pj.job_id, + alias: meta.alias.clone(), + exec_type: meta.exec_type, + ..Default::default() + } + } +} + +impl From<&JobMeta> for AssignedJob { + fn from(meta: &JobMeta) -> Self { + AssignedJob { + job_id: meta.id, + agent_id: get_self_uid(), + alias: meta.alias.clone(), + exec_type: meta.exec_type, + ..Default::default() } - out += &format!("\nUpdated: {}", systime_to_string(&self.updated)); - out += &format!("\nState: {}", self.state); - if self.state == JobState::Finished { - if let Some(ref retcode) = self.retcode { - out += &format!("\nReturn code: {}", retcode); - } - if let Some(ref result) = self.result { - out += &format!("\nResult: {}", String::from_utf8_lossy(result)); - } + } +} + +impl Default for AssignedJobById { + fn default() -> Self { + Self { + agent_id: get_self_uid(), + id: Uuid::new_v4(), + job_id: Uuid::nil(), } - write!(f, "{}", out) } } @@ -68,58 +82,13 @@ impl Default for AssignedJob { state: JobState::Queued, retcode: None, updated: SystemTime::now(), + exec_type: JobType::default(), } } } #[cfg(not(target_arch = "wasm32"))] impl AssignedJob { - pub async fn run(mut self) -> Reportable { - use tokio::process::Command; - let (argv, _payload) = { - let meta = JobCache::get(self.job_id).unwrap(); - if let Some(ref payload) = meta.payload { - let extracted_payload = match TempFile::write_exec(payload) { - Ok(p) => p, - Err(e) => return Reportable::Error(UError::Runtime(e.to_string()).to_string()), - }; - ( - meta.argv.replace("{}", &extracted_payload.get_path()), - Some(extracted_payload), - ) - } else { - (meta.argv.clone(), None) - } - }; - let mut split_cmd = shlex::split(&argv).unwrap().into_iter(); - let cmd = split_cmd.nth(0).unwrap(); - let args = split_cmd.collect::>(); - let cmd_result = Command::new(cmd).args(args).output().await; - let (data, retcode) = match cmd_result { - Ok(output) => ( - ProcOutput::from_output(&output).into_combined(), - output.status.code(), - ), - Err(e) => ( - ProcOutput::new() - .stderr(e.to_string().into_bytes()) - .into_combined(), - None, - ), - }; - self.result = Some(data); - self.retcode = retcode; - Reportable::Assigned(self) - } - - pub fn new(job_id: Uuid, other: Option<&Self>) -> Self { - Self { - agent_id: get_self_uid(), - job_id, - ..other.unwrap_or(&Default::default()).clone() - } - } - pub fn as_job_output(&self) -> Option { self.result .as_ref() @@ -139,4 +108,8 @@ impl AssignedJob { pub fn to_string_result(&self) -> String { String::from_utf8_lossy(&self.to_raw_result()).into_owned() } + + pub fn set_result(&mut self, result: &S) { + self.result = Some(serde_json::to_vec(result).unwrap()); + } } diff --git a/lib/u_lib/src/models/jobs/meta.rs b/lib/u_lib/src/models/jobs/meta.rs index 2b76941..e839f8e 100644 --- a/lib/u_lib/src/models/jobs/meta.rs +++ b/lib/u_lib/src/models/jobs/meta.rs @@ -2,13 +2,11 @@ use super::JobType; #[cfg(feature = "server")] use crate::models::schema::*; use crate::utils::Platform; -use crate::{utils::Stripped, UError, UResult}; +use crate::{UError, UResult}; #[cfg(feature = "server")] use diesel::{Identifiable, Insertable, Queryable}; use serde::{Deserialize, Serialize}; -use std::path::PathBuf; -use std::str::from_utf8; -use std::{fmt, fs}; +use std::fs; use uuid::Uuid; #[derive(Serialize, Deserialize, Clone, Debug)] @@ -18,37 +16,37 @@ use uuid::Uuid; table_name = "jobs" )] pub struct JobMeta { + #[serde(default)] pub alias: Option, + /// string like `bash -c {} -a 1 --arg2`, /// where {} is replaced by executable's tmp path + #[serde(default)] pub argv: String, + + #[serde(default = "Uuid::new_v4")] pub id: Uuid, - pub exec_type: JobType, - //pub schedule: JobSchedule, - pub platform: String, - pub payload: Option>, -} -#[derive(Deserialize, Debug)] -pub struct RawJobMeta { - #[serde(default)] - pub alias: Option, - #[serde(default)] - pub argv: String, - #[serde(default)] - pub id: UuidDefaultUnique, #[serde(default)] pub exec_type: JobType, - //pub schedule: JobSchedule, + + ///target triple #[serde(default)] - pub platform: Platform, + pub platform: String, + #[serde(default)] pub payload: Option>, + + /// if payload should be read from external resource #[serde(default)] - pub payload_path: Option, + pub payload_path: Option, + + ///cron-like string + #[serde(default)] + pub schedule: Option, } -impl RawJobMeta { +impl JobMeta { pub fn builder() -> JobMetaBuilder { JobMetaBuilder::default() } @@ -62,29 +60,6 @@ impl RawJobMeta { } } -impl fmt::Display for JobMeta { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - let mut out = format!("Job: {}", self.id); - if let Some(ref alias) = self.alias { - out += &format!(" ({})", alias); - } - out += &format!("\nArgv: {}", self.argv); - out += &format!("\nExecutable type: {}", self.exec_type); - out += &format!("\nPlatform: {}", self.platform); - if let Some(ref payload) = self.payload { - if self.exec_type == JobType::Shell { - let payload = if let Ok(str_payload) = from_utf8(payload) { - Stripped(&str_payload).to_string() - } else { - Stripped(&payload).to_string() - }; - out += &format!("\nPayload: {}", payload); - } - } - write!(f, "{}", out) - } -} - impl Default for JobMeta { fn default() -> Self { Self { @@ -94,19 +69,7 @@ impl Default for JobMeta { exec_type: JobType::Shell, platform: Platform::current().into_string(), payload: None, - } - } -} - -impl Default for RawJobMeta { - fn default() -> Self { - Self { - id: UuidDefaultUnique::default(), - alias: None, - argv: String::new(), - exec_type: JobType::Shell, - platform: Platform::current(), - payload: None, + schedule: None, payload_path: None, } } @@ -114,12 +77,13 @@ impl Default for RawJobMeta { #[derive(Default)] pub struct JobMetaBuilder { - inner: RawJobMeta, + inner: JobMeta, } impl JobMetaBuilder { pub fn with_shell(mut self, shell_cmd: impl Into) -> Self { self.inner.argv = shell_cmd.into(); + self.inner.exec_type = JobType::Shell; self } @@ -128,7 +92,7 @@ impl JobMetaBuilder { self } - pub fn with_payload_src(mut self, path: impl Into) -> Self { + pub fn with_payload_src(mut self, path: impl Into) -> Self { self.inner.payload_path = Some(path.into()); self } @@ -148,6 +112,7 @@ impl JobMetaBuilder { match inner.exec_type { JobType::Shell => { if inner.argv.is_empty() { + // TODO: fix detecting inner.argv = String::from("/bin/bash -c {}") } let argv_parts = @@ -157,9 +122,8 @@ impl JobMetaBuilder { return Err(empty_err.into()); } if let Some(path) = &inner.payload_path { - let data = fs::read(path.clone()).map_err(|e| { - UError::FSError(path.to_string_lossy().to_string(), e.to_string()) - })?; + let data = fs::read(path) + .map_err(|e| UError::FSError(path.to_string(), e.to_string()))?; inner.payload = Some(data) } match inner.payload.as_ref() { @@ -181,38 +145,15 @@ impl JobMetaBuilder { } } }; - if !inner.platform.check() { + if !Platform::new(&inner.platform).check() { return Err(UError::JobArgsError(format!( "Unknown platform {}", - inner.platform.into_string() + inner.platform ))); } Ok(inner.into()) } - JobType::Manage => Ok(inner.into()), - _ => todo!(), - } - } -} - -impl From for JobMeta { - fn from(rjm: RawJobMeta) -> Self { - JobMeta { - alias: rjm.alias, - argv: rjm.argv, - id: rjm.id.0, - exec_type: rjm.exec_type, - platform: rjm.platform.into_string(), - payload: rjm.payload, + _ => Ok(inner.into()), } } } - -#[derive(Deserialize, Debug)] -pub struct UuidDefaultUnique(Uuid); - -impl Default for UuidDefaultUnique { - fn default() -> Self { - Self(Uuid::new_v4()) - } -} diff --git a/lib/u_lib/src/models/jobs/misc.rs b/lib/u_lib/src/models/jobs/misc.rs index df91f0d..ec9b866 100644 --- a/lib/u_lib/src/models/jobs/misc.rs +++ b/lib/u_lib/src/models/jobs/misc.rs @@ -3,21 +3,6 @@ use diesel_derive_enum::DbEnum; use serde::{Deserialize, Serialize}; use strum::Display; -#[derive(Serialize, Deserialize, Clone, Debug)] -pub enum ManageAction { - Ping, - UpdateAvailable, - JobsResultsRequest, - Terminate, -} - -#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] -pub enum JobSchedule { - Once, - Permanent, - //Scheduled -} - #[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Display)] #[cfg_attr( feature = "server", @@ -26,13 +11,17 @@ pub enum JobSchedule { DieselType = "Jobstate" )] pub enum JobState { - Queued, // server created a job, but client didn't get it yet - //Pending, // client got a job, but not running yet - Running, // client is currently running a job + /// server created a job, but client didn't get it yet + Queued, + + // client got a job, but not running yet + //Pending, + /// client is currently running a job + Running, Finished, } -#[derive(Default, Serialize, Deserialize, Clone, Debug, PartialEq, Display)] +#[derive(Default, Serialize, Deserialize, Clone, Copy, Debug, PartialEq, Display)] #[cfg_attr( feature = "server", derive(DbEnum), @@ -40,8 +29,9 @@ pub enum JobState { DieselType = "Jobtype" )] pub enum JobType { - Manage, + Init, #[default] Shell, - Python, + Terminate, + Update, } diff --git a/lib/u_lib/src/models/schema.rs b/lib/u_lib/src/models/schema.rs index 83c1f59..fd2db7b 100644 --- a/lib/u_lib/src/models/schema.rs +++ b/lib/u_lib/src/models/schema.rs @@ -5,6 +5,8 @@ table! { alias -> Nullable, hostname -> Text, id -> Uuid, + ip_gray -> Nullable, + ip_white -> Nullable, is_root -> Bool, is_root_allowed -> Bool, last_active -> Timestamp, @@ -26,32 +28,6 @@ table! { } } -table! { - use crate::schema_exports::*; - - errors (id) { - agent_id -> Uuid, - created -> Timestamp, - id -> Uuid, - msg -> Nullable, - } -} - -table! { - use crate::schema_exports::*; - - ip_addrs (id) { - agent_id -> Uuid, - check_ts -> Timestamp, - gateway -> Nullable, - id -> Uuid, - iface -> Text, - ip_addr -> Text, - is_gray -> Bool, - netmask -> Text, - } -} - table! { use crate::schema_exports::*; @@ -62,6 +38,8 @@ table! { exec_type -> Jobtype, platform -> Text, payload -> Nullable, + payload_path -> Nullable, + schedule -> Nullable, } } @@ -76,22 +54,14 @@ table! { job_id -> Uuid, result -> Nullable, state -> Jobstate, + exec_type -> Jobtype, retcode -> Nullable, updated -> Timestamp, } } joinable!(certificates -> agents (agent_id)); -joinable!(errors -> agents (agent_id)); -joinable!(ip_addrs -> agents (agent_id)); joinable!(results -> agents (agent_id)); joinable!(results -> jobs (job_id)); -allow_tables_to_appear_in_same_query!( - agents, - certificates, - errors, - ip_addrs, - jobs, - results, -); +allow_tables_to_appear_in_same_query!(agents, certificates, jobs, results,); diff --git a/lib/u_lib/src/builder.rs b/lib/u_lib/src/runner.rs similarity index 54% rename from lib/u_lib/src/builder.rs rename to lib/u_lib/src/runner.rs index 84e46c5..133a6c0 100644 --- a/lib/u_lib/src/builder.rs +++ b/lib/u_lib/src/runner.rs @@ -1,97 +1,136 @@ use crate::{ cache::JobCache, - executor::{DynFut, Waiter}, - messaging::Reportable, - models::{Agent, AssignedJob, JobMeta, JobType, RawJobMeta}, + executor::{ExecResult, Waiter}, + models::{Agent, AssignedJob, AssignedJobById, JobMeta, JobType}, utils::{CombinedResult, OneOrVec, Platform}, + utils::{ProcOutput, TempFile}, UError, UResult, }; use std::collections::HashMap; +use std::process::exit; +use tokio::process::Command; -pub struct JobBuilder { +pub struct JobRunner { waiter: Waiter, } -impl JobBuilder { - pub fn from_request(job_requests: impl OneOrVec) -> CombinedResult { - let job_requests = job_requests.into_vec(); - let mut prepared: Vec = vec![]; - let mut result = CombinedResult::::new(); - for req in job_requests { - let job_meta = JobCache::get(req.job_id); - if job_meta.is_none() { - result.err(UError::NoJob(req.job_id)); - continue; - } - let job_meta = job_meta.unwrap(); +impl JobRunner { + pub fn from_jobs(jobs: impl OneOrVec) -> CombinedResult { + let jobs = jobs.into_vec(); + let mut waiter = Waiter::new(); + let mut result = CombinedResult::::new(); + for job in jobs { //waiting for try-blocks stabilization - let built_req = (|| -> UResult<()> { - Ok(match job_meta.exec_type { - JobType::Shell => { - let meta = JobCache::get(req.job_id).ok_or(UError::NoJob(req.job_id))?; - let curr_platform = Platform::current(); - if !curr_platform.matches(&meta.platform) { - return Err(UError::InsuitablePlatform( - meta.platform.clone(), - curr_platform.into_string(), - ) - .into()); - } - let job = AssignedJob::new(req.job_id, Some(&req)); - prepared.push(Box::pin(job.run())) - } - JobType::Manage => prepared.push(Box::pin(Agent::run())), - _ => todo!(), - }) + let built_job = (|| -> UResult<()> { + let meta = JobCache::get(job.job_id).ok_or(UError::NoJob(job.job_id))?; + let curr_platform = Platform::current(); + if !curr_platform.matches(&meta.platform) { + return Err(UError::InsuitablePlatform( + meta.platform.clone(), + curr_platform.into_string(), + ) + .into()); + } + let job = AssignedJob::from((&*meta, job)); + waiter.push(run_assigned_job(job)); + Ok(()) })(); - if let Err(e) = built_req { + if let Err(e) = built_job { result.err(e) } } - result.ok(Self { - waiter: Waiter::new(prepared), - }); + result.ok(Self { waiter }); result } - pub fn from_meta(job_metas: impl OneOrVec) -> CombinedResult { - let job_requests = job_metas + pub fn from_meta(metas: impl OneOrVec) -> CombinedResult { + let jobs = metas .into_vec() .into_iter() .map(|jm| { - let j_uid = jm.id; + let job_uid = jm.id; JobCache::insert(jm); - AssignedJob::new(j_uid, None) + AssignedJobById { + job_id: job_uid, + ..Default::default() + } }) - .collect::>(); - JobBuilder::from_request(job_requests) + .collect::>(); + JobRunner::from_jobs(jobs) } - /// Spawn jobs and pop results later + /// Spawn jobs pub async fn spawn(mut self) -> Self { self.waiter = self.waiter.spawn().await; self } /// Spawn jobs and wait for result - pub async fn wait(self) -> Vec { + pub async fn wait(self) -> Vec { self.waiter.spawn().await.wait().await } /// Spawn one job and wait for result - pub async fn wait_one(self) -> Reportable { + pub async fn wait_one(self) -> ExecResult { self.waiter.spawn().await.wait().await.pop().unwrap() } } +pub async fn run_assigned_job(mut job: AssignedJob) -> ExecResult { + match job.exec_type { + JobType::Shell => { + let (argv, _payload) = { + let meta = JobCache::get(job.job_id).unwrap(); + if let Some(ref payload) = meta.payload { + let extracted_payload = match TempFile::write_exec(payload) { + Ok(p) => p, + Err(e) => return Err(UError::Runtime(e.to_string())), + }; + ( + meta.argv.replace("{}", &extracted_payload.get_path()), + Some(extracted_payload), + ) + } else { + (meta.argv.clone(), None) + } + }; + let mut split_cmd = shlex::split(&argv).unwrap().into_iter(); + let cmd = split_cmd.nth(0).unwrap(); + let args = split_cmd.collect::>(); + let cmd_result = Command::new(cmd).args(args).output().await; + let (data, retcode) = match cmd_result { + Ok(output) => ( + ProcOutput::from_output(&output).into_combined(), + output.status.code(), + ), + Err(e) => ( + ProcOutput::new() + .stderr(e.to_string().into_bytes()) + .into_combined(), + None, + ), + }; + job.result = Some(data); + job.retcode = retcode; + } + JobType::Init => { + job.set_result(&Agent::run().await); + job.retcode = Some(0); + } + JobType::Update => todo!(), + JobType::Terminate => exit(0), + }; + Ok(job) +} + /// Store jobs and get results by name -pub struct NamedJobBuilder { - builder: Option, +pub struct NamedJobRunner { + builder: Option, job_names: Vec<&'static str>, - results: HashMap<&'static str, Reportable>, + results: HashMap<&'static str, ExecResult>, } -impl NamedJobBuilder { +impl NamedJobRunner { pub fn from_shell( named_jobs: impl OneOrVec<(&'static str, &'static str)>, ) -> CombinedResult { @@ -100,7 +139,7 @@ impl NamedJobBuilder { .into_vec() .into_iter() .filter_map( - |(alias, cmd)| match RawJobMeta::builder().with_shell(cmd).build() { + |(alias, cmd)| match JobMeta::builder().with_shell(cmd).build() { Ok(meta) => Some((alias, meta)), Err(e) => { result.err(e); @@ -124,7 +163,7 @@ impl NamedJobBuilder { }) .collect(); Self { - builder: Some(JobBuilder::from_meta(job_metas).unwrap_one()), + builder: Some(JobRunner::from_meta(job_metas).unwrap_one()), job_names, results: HashMap::new(), } @@ -138,11 +177,11 @@ impl NamedJobBuilder { self } - pub fn pop_opt(&mut self, name: &'static str) -> Option { + pub fn pop_opt(&mut self, name: &'static str) -> Option { self.results.remove(name) } - pub fn pop(&mut self, name: &'static str) -> Reportable { + pub fn pop(&mut self, name: &'static str) -> ExecResult { self.pop_opt(name).unwrap() } } @@ -151,8 +190,8 @@ impl NamedJobBuilder { mod tests { use super::*; use crate::{ - builder::{JobBuilder, NamedJobBuilder}, models::{misc::JobType, JobMeta}, + runner::{JobRunner, NamedJobRunner}, unwrap_enum, }; use std::time::SystemTime; @@ -162,10 +201,10 @@ mod tests { #[tokio::test] async fn test_is_really_async() { const SLEEP_SECS: u64 = 1; - let job = RawJobMeta::from_shell(format!("sleep {}", SLEEP_SECS)).unwrap(); + let job = JobMeta::from_shell(format!("sleep {}", SLEEP_SECS)).unwrap(); let sleep_jobs: Vec = (0..50).map(|_| job.clone()).collect(); let now = SystemTime::now(); - JobBuilder::from_meta(sleep_jobs).unwrap_one().wait().await; + JobRunner::from_meta(sleep_jobs).unwrap_one().wait().await; assert!(now.elapsed().unwrap().as_secs() < SLEEP_SECS + 2) } @@ -199,13 +238,16 @@ mod tests { #[case] payload: Option<&[u8]>, #[case] expected_result: &str, ) -> TestResult { - let mut job = RawJobMeta::builder().with_shell(cmd); + let mut job = JobMeta::builder().with_shell(cmd); if let Some(p) = payload { job = job.with_payload(p); } let job = job.build().unwrap(); - let job_result = JobBuilder::from_meta(job).unwrap_one().wait_one().await; - let result = unwrap_enum!(job_result, Reportable::Assigned); + let result = JobRunner::from_meta(job) + .unwrap_one() + .wait_one() + .await + .unwrap(); let result = result.to_string_result(); assert_eq!(result.trim(), expected_result); Ok(()) @@ -215,29 +257,25 @@ mod tests { async fn test_complex_load() -> TestResult { const SLEEP_SECS: u64 = 1; let now = SystemTime::now(); - let longest_job = RawJobMeta::from_shell(format!("sleep {}", SLEEP_SECS)).unwrap(); - let longest_job = JobBuilder::from_meta(longest_job) - .unwrap_one() - .spawn() - .await; - let ls = JobBuilder::from_meta(RawJobMeta::from_shell("ls")?) + let longest_job = JobMeta::from_shell(format!("sleep {}", SLEEP_SECS)).unwrap(); + let longest_job = JobRunner::from_meta(longest_job).unwrap_one().spawn().await; + let ls = JobRunner::from_meta(JobMeta::from_shell("ls")?) .unwrap_one() .wait_one() - .await; - let ls = unwrap_enum!(ls, Reportable::Assigned); + .await + .unwrap(); assert_eq!(ls.retcode.unwrap(), 0); let folders = ls.to_string_result(); let subfolders_jobs: Vec = folders .lines() - .map(|f| RawJobMeta::from_shell(format!("ls {}", f)).unwrap()) + .map(|f| JobMeta::from_shell(format!("ls {}", f)).unwrap()) .collect(); - let ls_subfolders = JobBuilder::from_meta(subfolders_jobs) + let ls_subfolders = JobRunner::from_meta(subfolders_jobs) .unwrap_one() .wait() .await; for result in ls_subfolders { - let result = unwrap_enum!(result, Reportable::Assigned); - assert_eq!(result.retcode.unwrap(), 0); + assert_eq!(result.unwrap().retcode.unwrap(), 0); } longest_job.wait().await; assert_eq!(now.elapsed().unwrap().as_secs(), SLEEP_SECS); @@ -263,9 +301,12 @@ mod tests { */ #[tokio::test] async fn test_failing_shell_job() -> TestResult { - let job = RawJobMeta::from_shell("lol_kek_puk")?; - let job_result = JobBuilder::from_meta(job).unwrap_one().wait_one().await; - let job_result = unwrap_enum!(job_result, Reportable::Assigned); + let job = JobMeta::from_shell("lol_kek_puk")?; + let job_result = JobRunner::from_meta(job) + .unwrap_one() + .wait_one() + .await + .unwrap(); let output = job_result.to_string_result(); assert!(output.contains("No such file")); assert!(job_result.retcode.is_none()); @@ -281,7 +322,7 @@ mod tests { #[case] payload: Option<&[u8]>, #[case] err_str: &str, ) -> TestResult { - let mut job = RawJobMeta::builder().with_shell(cmd); + let mut job = JobMeta::builder().with_shell(cmd); if let Some(p) = payload { job = job.with_payload(p); } @@ -293,17 +334,17 @@ mod tests { #[tokio::test] async fn test_different_job_types() -> TestResult { - let mut jobs = NamedJobBuilder::from_meta(vec![ - ("sleeper", RawJobMeta::from_shell("sleep 3")?), + let mut jobs = NamedJobRunner::from_meta(vec![ + ("sleeper", JobMeta::from_shell("sleep 3")?), ( "gatherer", - RawJobMeta::builder().with_type(JobType::Manage).build()?, + JobMeta::builder().with_type(JobType::Init).build()?, ), ]) .wait() .await; let gathered = jobs.pop("gatherer"); - assert_eq!(unwrap_enum!(gathered, Reportable::Agent).alias, None); + assert_eq!(gathered.unwrap().alias, None); Ok(()) } } diff --git a/lib/u_lib/src/utils/fmt/stripped.rs b/lib/u_lib/src/utils/fmt/stripped.rs index 21d0032..44af3ca 100644 --- a/lib/u_lib/src/utils/fmt/stripped.rs +++ b/lib/u_lib/src/utils/fmt/stripped.rs @@ -38,9 +38,10 @@ impl<'a> Strippable for &'a Vec { self.iter() } } -pub struct Stripped<'inner, Inner: Strippable + 'inner>(pub &'inner Inner); -impl<'inner, Inner: Strippable + 'inner> Stripped<'inner, Inner> { +pub struct Stripped<'i, Inner: Strippable + 'i>(pub &'i Inner); + +impl<'i, Inner: Strippable + 'i> Stripped<'i, Inner> { fn iter(&self) -> Inner::TypeIter { self.0.iterator() } @@ -54,7 +55,7 @@ impl<'inner, Inner: Strippable + 'inner> Stripped<'inner, Inner> { } } -impl<'inner, Inner: Strippable + 'inner> fmt::Display for Stripped<'inner, Inner> { +impl<'i, Inner: Strippable + 'i> fmt::Display for Stripped<'i, Inner> { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { let placeholder = self.placeholder(); for c in self.iter().take(MAX_DATA_LEN - placeholder.len()) { diff --git a/lib/u_lib/src/utils/mod.rs b/lib/u_lib/src/utils/mod.rs index 4d6e004..5d9f9b4 100644 --- a/lib/u_lib/src/utils/mod.rs +++ b/lib/u_lib/src/utils/mod.rs @@ -10,7 +10,6 @@ pub mod storage; pub mod tempfile; #[cfg(unix)] pub mod unix; -pub mod vec_display; pub use combined_result::*; pub use conv::*; @@ -22,7 +21,6 @@ pub use proc_output::*; pub use storage::*; #[cfg(not(target_arch = "wasm32"))] pub use tempfile::*; -pub use vec_display::*; #[cfg(unix)] pub use unix::*; diff --git a/lib/u_lib/src/utils/platform.rs b/lib/u_lib/src/utils/platform.rs index 7e13301..6ecb82b 100644 --- a/lib/u_lib/src/utils/platform.rs +++ b/lib/u_lib/src/utils/platform.rs @@ -7,6 +7,10 @@ use std::str::FromStr; pub struct Platform(String); impl Platform { + pub fn new(p: impl Into) -> Self { + Self(p.into()) + } + pub fn current() -> Platform { Self(guess_host_triple().unwrap_or("unknown").to_string()) } diff --git a/migrations/2020-10-24-111622_create_all/up.sql b/migrations/2020-10-24-111622_create_all/up.sql index a26d094..c2d5ac9 100644 --- a/migrations/2020-10-24-111622_create_all/up.sql +++ b/migrations/2020-10-24-111622_create_all/up.sql @@ -1,5 +1,5 @@ CREATE EXTENSION IF NOT EXISTS "uuid-ossp"; -CREATE TYPE JobType AS ENUM ('shell', 'manage', 'python'); +CREATE TYPE JobType AS ENUM ('shell', 'init', 'python'); CREATE TYPE JobState AS ENUM ('queued', 'running', 'finished'); CREATE TYPE AgentState AS ENUM ('new', 'active', 'banned'); @@ -7,6 +7,8 @@ CREATE TABLE IF NOT EXISTS agents ( alias TEXT , hostname TEXT NOT NULL , id UUID NOT NULL DEFAULT uuid_generate_v4() + , ip_gray TEXT + , ip_white TEXT , is_root BOOLEAN NOT NULL DEFAULT false , is_root_allowed BOOLEAN NOT NULL DEFAULT false , last_active TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP @@ -17,20 +19,8 @@ CREATE TABLE IF NOT EXISTS agents ( -- is needed to processing requests , token TEXT , username TEXT NOT NULL - , PRIMARY KEY(id) -); -CREATE TABLE IF NOT EXISTS ip_addrs ( - agent_id UUID NOT NULL - , check_ts TIMESTAMP NOT NULL - , gateway TEXT - , id UUID NOT NULL DEFAULT uuid_generate_v4() - , iface TEXT NOT NULL - , ip_addr TEXT NOT NULL - , is_gray BOOLEAN NOT NULL DEFAULT true - , netmask TEXT NOT NULL , PRIMARY KEY(id) - , FOREIGN KEY(agent_id) REFERENCES agents(id) ); CREATE TABLE IF NOT EXISTS jobs ( @@ -40,6 +30,9 @@ CREATE TABLE IF NOT EXISTS jobs ( , exec_type JobType NOT NULL DEFAULT 'shell' , platform TEXT NOT NULL , payload BYTEA + , payload_path TEXT + , schedule TEXT + , PRIMARY KEY(id) ); @@ -51,8 +44,10 @@ CREATE TABLE IF NOT EXISTS results ( , job_id UUID NOT NULL , result BYTEA , state JobState NOT NULL DEFAULT 'queued' + , exec_type JobType NOT NULL DEFAULT 'shell' , retcode INTEGER , updated TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP + , FOREIGN KEY(agent_id) REFERENCES agents(id) ON DELETE CASCADE , FOREIGN KEY(job_id) REFERENCES jobs(id) ON DELETE CASCADE , PRIMARY KEY(id) @@ -62,6 +57,7 @@ CREATE TABLE IF NOT EXISTS certificates ( agent_id UUID NOT NULL , id UUID NOT NULL DEFAULT uuid_generate_v4() , is_revoked BOOLEAN NOT NULL DEFAULT FALSE + , PRIMARY KEY(id) , FOREIGN KEY(agent_id) REFERENCES agents(id) ); \ No newline at end of file