refactor again

- deduplicate deps
- simplify & unify Job* interfaces
- unify api with OneOrVec
- remove some Display impls
- remove ips db table
pull/1/head
plazmoid 2 years ago
parent a594348a30
commit 6fe0e71959
36 changed files:

Cargo.toml | 10
bin/u_agent/Cargo.toml | 10
bin/u_agent/src/lib.rs | 42
bin/u_panel/Cargo.toml | 27
bin/u_panel/src/argparse.rs | 8
bin/u_run/Cargo.toml | 2
bin/u_server/Cargo.toml | 19
bin/u_server/src/db.rs | 2
bin/u_server/src/handlers.rs | 42
bin/u_server/src/u_server.rs | 4
images/integration-tests/tests_runner.Dockerfile | 3
images/integration-tests/u_db.Dockerfile | 5
integration/Cargo.toml | 22
integration/docker-compose.yml | 23
integration/docker.py | 79
integration/docker_compose.py | 72
integration/integration_tests.py | 55
integration/tests/fixtures/agent.rs | 11
lib/u_lib/Cargo.toml | 41
lib/u_lib/src/api.rs | 21
lib/u_lib/src/errors/chan.rs | 5
lib/u_lib/src/errors/variants.rs | 11
lib/u_lib/src/executor.rs | 51
lib/u_lib/src/lib.rs | 2
lib/u_lib/src/messaging/base.rs | 5
lib/u_lib/src/messaging/mod.rs | 12
lib/u_lib/src/models/agent.rs | 44
lib/u_lib/src/models/jobs/assigned.rs | 119
lib/u_lib/src/models/jobs/meta.rs | 117
lib/u_lib/src/models/jobs/misc.rs | 32
lib/u_lib/src/models/schema.rs | 42
lib/u_lib/src/runner.rs | 203
lib/u_lib/src/utils/fmt/stripped.rs | 7
lib/u_lib/src/utils/mod.rs | 2
lib/u_lib/src/utils/platform.rs | 4
migrations/2020-10-24-111622_create_all/up.sql | 22

@ -8,6 +8,16 @@ members = [
"integration"
]
[workspace.dependencies]
reqwest = { version = "0.11", features = ["json"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
tokio = { version = "1.11", features = ["macros"] }
tracing = "0.1.35"
tracing-appender = "0.2.0"
tracing-subscriber = { version = "0.3.0", features = ["env-filter"]}
uuid = "0.6.5"
[profile.release]
panic = "abort"
strip = "symbols"

@ -7,10 +7,10 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
tokio = { version = "1.2.0", features = ["macros", "rt-multi-thread", "process", "time"] }
sysinfo = "0.10.5"
log = "^0.4"
uuid = "0.6.5"
reqwest = { version = "0.11", features = ["json"] }
u_lib = { version = "*", path = "../../lib/u_lib" }
reqwest = { workspace = true }
sysinfo = "0.10.5"
tokio = { workspace = true, features = ["macros", "rt-multi-thread", "process", "time"] }
uuid = { workspace = true }
u_lib = { path = "../../lib/u_lib" }

@ -9,16 +9,16 @@ extern crate log;
use std::sync::Arc;
use tokio::time::{sleep, Duration};
use u_lib::{
api::ClientHandler, builder::JobBuilder, cache::JobCache, config::get_self_uid,
errors::ErrChan, executor::pop_completed, logging::init_logger, messaging::Reportable,
models::AssignedJob, utils::load_env_default, UError,
api::ClientHandler, cache::JobCache, config::get_self_uid, errors::ErrChan,
executor::pop_completed, logging::init_logger, messaging::Reportable, models::AssignedJobById,
runner::JobRunner, utils::load_env_default,
};
const ITERATION_LATENCY: u64 = 5;
pub async fn process_request(job_requests: Vec<AssignedJob>, client: &ClientHandler) {
if !job_requests.is_empty() {
for jr in &job_requests {
pub async fn process_request(jobs: Vec<AssignedJobById>, client: &ClientHandler) {
if !jobs.is_empty() {
for jr in &jobs {
if !JobCache::contains(jr.job_id) {
info!("Fetching job: {}", &jr.job_id);
let fetched_job = loop {
@ -35,20 +35,19 @@ pub async fn process_request(job_requests: Vec<AssignedJob>, client: &ClientHand
}
info!(
"Scheduling jobs: {}",
job_requests
.iter()
jobs.iter()
.map(|j| j.job_id.to_string())
.collect::<Vec<String>>()
.join(", ")
);
let mut builder = JobBuilder::from_request(job_requests);
let errors = builder.pop_errors();
let mut runner = JobRunner::from_jobs(jobs);
let errors = runner.pop_errors();
if !errors.is_empty() {
for e in errors {
ErrChan::send(e, "ebld").await;
}
}
builder.unwrap_one().spawn().await;
runner.unwrap_one().spawn().await;
}
}
@ -57,7 +56,7 @@ async fn error_reporting(client: Arc<ClientHandler>) -> ! {
match ErrChan::recv().await {
Some(err) => {
'retry: for _ in 0..3 {
match client.report(&[Reportable::Error(err.to_string())]).await {
match client.report(Reportable::Error(err.clone())).await {
Ok(_) => break 'retry,
Err(e) => {
debug!("Reporting error: {:?}", e);
@ -71,17 +70,24 @@ async fn error_reporting(client: Arc<ClientHandler>) -> ! {
}
}
async fn do_stuff(client: Arc<ClientHandler>) -> ! {
async fn agent_loop(client: Arc<ClientHandler>) -> ! {
loop {
match client.get_personal_jobs(get_self_uid()).await {
Ok(resp) => {
process_request(resp, &client).await;
Ok(jobs) => {
process_request(jobs, &client).await;
}
Err(err) => ErrChan::send(err, "processing").await,
}
let result: Vec<Reportable> = pop_completed().await.into_iter().collect();
let result: Vec<Reportable> = pop_completed()
.await
.into_iter()
.map(|result| match result {
Ok(r) => Reportable::Assigned(r),
Err(e) => Reportable::Error(e),
})
.collect();
if !result.is_empty() {
if let Err(err) = client.report(&result).await {
if let Err(err) = client.report(result).await {
ErrChan::send(err, "report").await;
}
}
@ -110,5 +116,5 @@ pub async fn run_forever() -> ! {
// }
}
info!("Startup");
do_stuff(client).await
agent_loop(client).await
}

@ -7,23 +7,22 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
actix-cors = "0.6.1"
actix-web = "4.1"
structopt = "0.3.21"
uuid = "0.6.5"
serde_json = "1.0.4"
serde = { version = "1.0.114", features = ["derive"] }
tokio = { version = "1.11.0", features = ["rt", "rt-multi-thread"] }
u_lib = { version = "*", path = "../../lib/u_lib", features = ["panel"] }
anyhow = "1.0.44"
strum = { version = "0.22.0", features = ["derive"] }
futures-util = "0.3.21"
mime_guess = "2.0.4"
once_cell = "1.8.0"
tracing = "0.1.29"
tracing-subscriber = { version = "0.3.3", features = ["env-filter"]}
signal-hook = "0.3.12"
tracing-appender = "0.2.0"
rust-embed = { version = "6.3.0", features = ["debug-embed", "compression"] }
mime_guess = "2.0.4"
serde = { workspace = true }
serde_json = { workspace = true }
strum = { version = "0.22.0", features = ["derive"] }
tokio = { workspace = true, features = ["rt", "rt-multi-thread"] }
tracing = { workspace = true }
tracing-subscriber = { workspace = true }
tracing-appender = { workspace = true }
shlex = "1.1.0"
structopt = "0.3.21"
thiserror = "1.0.31"
futures-util = "0.3.21"
actix-cors = "0.6.1"
uuid = { workspace = true }
u_lib = { version = "*", path = "../../lib/u_lib", features = ["panel"] }

@ -4,7 +4,7 @@ use u_lib::{
api::ClientHandler,
datatypes::PanelResult,
messaging::AsMsg,
models::{Agent, AssignedJob, JobMeta, RawJobMeta},
models::{Agent, AssignedJob, JobMeta},
UError, UResult,
};
use uuid::Uuid;
@ -96,9 +96,9 @@ pub async fn process_cmd(client: ClientHandler, args: Args) -> UResult<String> {
},
Cmd::Jobs(action) => match action {
JobCRUD::Create { job } => {
let raw_job = serde_json::from_str::<RawJobMeta>(&job)?;
let raw_job = serde_json::from_str::<JobMeta>(&job)?;
let job = raw_job.into_builder().build()?;
to_json(client.upload_jobs(&[job]).await)
to_json(client.upload_jobs(job).await)
}
JobCRUD::RUD(RUD::Read { uid }) => to_json(client.get_jobs(uid).await),
JobCRUD::RUD(RUD::Update { item }) => {
@ -111,7 +111,7 @@ pub async fn process_cmd(client: ClientHandler, args: Args) -> UResult<String> {
JobMapCRUD::Create {
agent_uid,
job_idents,
} => to_json(client.set_jobs(agent_uid, &job_idents).await),
} => to_json(client.set_jobs(agent_uid, job_idents).await),
JobMapCRUD::RUD(RUD::Read { uid }) => to_json(client.get_agent_jobs(uid).await),
JobMapCRUD::RUD(RUD::Update { item }) => {
let assigned = serde_json::from_str::<AssignedJob>(&item)?;

@ -7,5 +7,5 @@ edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
nix = "0.17"
libc = "^0.2"
nix = "0.17"

@ -5,17 +5,18 @@ name = "u_server"
version = "0.1.0"
[dependencies]
tracing = "0.1.35"
thiserror = "*"
warp = { version = "0.3.1", features = ["tls"] }
uuid = { version = "0.6.5", features = ["serde", "v4"] }
once_cell = "1.7.2"
diesel = { version = "1.4.5", features = ["postgres", "uuid"] }
hyper = "0.14"
once_cell = "1.7.2"
openssl = "*"
diesel = { version = "1.4.5", features = ["postgres", "uuid"] }
serde = { version = "1.0", features = ["derive"] }
tokio = { version = "1.9", features = ["macros"] }
u_lib = { path = "../../lib/u_lib", version = "*", features = ["server"] }
serde = { workspace = true }
serde_json = { workspace = true }
thiserror = "*"
tracing = { workspace = true }
tokio = { workspace = true, features = ["macros"] }
uuid = { workspace = true, features = ["serde", "v4"] }
u_lib = { path = "../../lib/u_lib", features = ["server"] }
warp = { version = "0.3.1", features = ["tls"] }
[dev-dependencies]

@ -11,6 +11,7 @@ use uuid::Uuid;
pub struct UDB {
pub conn: PgConnection,
__p: (),
}
static DB: OnceCell<Mutex<UDB>> = OnceCell::new();
@ -33,6 +34,7 @@ impl UDB {
);
let instance = UDB {
conn: PgConnection::establish(&db_url).unwrap(),
__p: (),
};
Mutex::new(instance)
})

@ -6,7 +6,7 @@ use diesel::SaveChangesDsl;
use u_lib::{
messaging::{AsMsg, BaseMessage, Reportable},
models::*,
utils::{OneOrVec, Stripped},
utils::OneOrVec,
};
use uuid::Uuid;
use warp::Rejection;
@ -88,36 +88,46 @@ impl Endpoints {
msg: BaseMessage<'static, Data>,
) -> EndpResult<()> {
let id = msg.id;
let mut failed = vec![];
for entry in msg.into_inner().into_vec() {
match entry {
Reportable::Assigned(mut result) => {
if id != result.agent_id {
let result_agent_id = &result.agent_id;
if id != *result_agent_id {
warn!("Ids are not equal! actual id: {id}, job id: {result_agent_id}");
continue;
}
result.state = JobState::Finished;
result.updated = SystemTime::now();
match result.exec_type {
JobType::Init => match &result.result {
Some(rbytes) => {
let mut agent: Agent = match serde_json::from_slice(&rbytes) {
Ok(a) => a,
Err(e) => {
warn!("Error deserializing agent from {id}: {e}");
continue;
}
};
agent.state = AgentState::Active;
Self::add_agent(agent).await?;
}
None => warn!("Empty agent data"),
},
JobType::Shell => (),
JobType::Terminate => todo!(),
JobType::Update => todo!(),
}
let db = UDB::lock_db();
if let Err(e) = result
result
.save_changes::<AssignedJob>(&db.conn)
.map_err(Error::from)
{
failed.push(e.to_string())
}
}
Reportable::Agent(mut a) => {
a.state = AgentState::Active;
Self::add_agent(a).await?;
.map_err(Error::from)?;
}
Reportable::Error(e) => {
warn!("{} reported an error: {}", id, Stripped(&e.as_str()));
warn!("{} reported an error: {}", id, e);
}
Reportable::Dummy => (),
}
}
if !failed.is_empty() {
return Err(Error::ProcessingError(failed.join(", ")).into());
}
Ok(())
}

@ -156,8 +156,8 @@ pub fn prefill_jobs() -> SResult<()> {
match if_job_exists {
Ok(_) => Ok(()),
Err(Error::DBError(diesel::result::Error::NotFound)) => {
let agent_hello = RawJobMeta::builder()
.with_type(misc::JobType::Manage)
let agent_hello = JobMeta::builder()
.with_type(JobType::Init)
.with_alias(job_alias)
.build()
.unwrap();

@ -1,4 +1,5 @@
FROM rust:1.62
FROM rust:1.64
RUN rustup target add x86_64-unknown-linux-musl
RUN mkdir -p /tests && chmod 777 /tests
CMD ["sleep", "3600"]

@ -1,6 +1,7 @@
FROM postgres:14.5
ENV DEBIAN_FRONTEND=noninteractive
RUN apt update && apt upgrade -y
RUN apt install -y curl build-essential libpq-dev iproute2
RUN curl https://sh.rustup.rs -sSf | sh -s -- -y --default-toolchain stable --profile minimal
@ -9,4 +10,8 @@ RUN rustup target add x86_64-unknown-linux-musl
RUN cargo install diesel_cli --no-default-features --features postgres
RUN mkdir -p /unki
ENV LC_ALL en_US.UTF-8
ENV LANG en_US.UTF-8
ENV LANGUAGE en_US.UTF-8
RUN apt install -y locales locales-all
COPY u_db_entrypoint.sh /unki/

@ -7,20 +7,16 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
tokio = { version = "1.2.0", features = ["macros", "rt-multi-thread", "process", "time"] }
tracing = "0.1.35"
uuid = { version = "0.6.5", features = ["serde", "v4"] }
reqwest = { version = "0.11", features = ["json"] }
serde_json = "1.0"
serde = { version = "1.0.114", features = ["derive"] }
shlex = "1.0.0"
rstest = "0.12"
once_cell = "1.10.0"
[dependencies.u_lib]
path = "../lib/u_lib"
version = "*"
features = ["panel"]
reqwest = { workspace = true }
rstest = "0.12"
serde = { workspace = true }
serde_json = { workspace = true }
shlex = "1.0.0"
tokio = { workspace = true, features = ["macros", "rt-multi-thread", "process", "time"] }
tracing = { workspace = true }
uuid = { workspace = true, features = ["serde", "v4"] }
u_lib = { path = "../lib/u_lib", features = ["panel"] }
[[test]]

@ -1,4 +1,8 @@
version: "2.1"
version: "3.4"
x-global:
user: &user
"${DOCKER_UID:-1000}:${DOCKER_GID:-1000}"
networks:
u_net:
@ -6,7 +10,7 @@ networks:
services:
u_server:
user: "${DOCKER_UID:-1000}:${DOCKER_GID:-1000}"
user: *user
image: unki/u_server
networks:
- u_net
@ -53,7 +57,7 @@ services:
retries: 3
u_agent:
user: "${DOCKER_UID:-1000}:${DOCKER_GID:-1000}"
user: *user
image: unki/u_agent
networks:
- u_net
@ -71,18 +75,19 @@ services:
condition: service_healthy
tests_runner:
user: "${DOCKER_UID:-1000}:${DOCKER_GID:-1000}"
user: *user
image: unki/tests_runner
networks:
- u_net
volumes:
- ./:/tests/
- ../certs:/certs
- ../__Cargo_integration.toml:/tests/Cargo.toml
- ./:/tests/integration/
- ../certs:/tests/certs
- ../target/x86_64-unknown-linux-musl/${PROFILE:-debug}/u_panel:/u_panel
- ../lib/u_lib:/lib/u_lib
- ../logs:/tests/logs:rw
- ../lib/u_lib:/tests/lib/u_lib
- ../logs:/tests/integration/logs:rw
working_dir:
/tests/
/tests/integration/
depends_on:
u_agent:
condition: service_started

@ -1,4 +1,5 @@
import subprocess
import shlex
from utils import *
@ -78,3 +79,81 @@ def rebuild_images_if_needed(force_rebuild=False):
if force_rebuild:
cmd += ['--no-cache']
docker(cmd)
class Compose:
ALL_IMAGES = [
'u_agent',
'u_server',
'u_db',
'tests_runner',
]
def __init__(self):
self.container_tpl = 'integration_%s_%d'
self.cmd_container = self.container_tpl % ('tests_runner', 1)
self.ALL_CONTAINERS = [self.container_tpl %
(c, 1) for c in self.ALL_IMAGES]
self.scaled_svc = {}
self.scale("u_agent", 2)
def scale(self, svc, count):
for c in range(1, count):
new_container = self.container_tpl % (svc, c + 1)
self.ALL_CONTAINERS.append(new_container)
self.scaled_svc[svc] = count
def _call(self, *args):
cmd = [
'docker-compose',
'--ansi=never',
] + list(args)
log(f'Running docker-compose command: {cmd}')
subprocess.check_call(cmd)
def up(self):
log(f'Instantiating cluster: {self.ALL_CONTAINERS}')
scaled = [f"{k}={v}" for k, v in self.scaled_svc.items()]
if len(scaled) > 0:
scaled.insert(0, '--scale')
self._call('up', '-d', *scaled)
def down(self):
log('Shutting down cluster')
self._call('down')
def stop(self):
log('Stopping cluster')
self._call('stop')
def run(self, cmd):
container = self.cmd_container
if isinstance(cmd, str):
cmd = shlex.split(cmd)
result = docker([
'exec',
'-ti',
container
] + cmd)
return result
def is_alive(self):
log('Check if all containers are alive')
errors = check_state(self.ALL_CONTAINERS)
if errors:
print_errors(errors)
raise TestsError('Error during `is_alive` check')
else:
log('All containers are alive')
def print_containers_logs(self):
for container in self.ALL_CONTAINERS:
try:
docker([
'logs',
container
])
except Exception:
pass

@ -1,72 +0,0 @@
import subprocess
import shlex
from utils import *
from docker import docker, check_state, print_errors
class Compose:
ALL_CONTAINERS = [
'u_agent',
'u_server',
'u_db',
'tests_runner',
]
def __init__(self):
self.container_tpl = 'integration_%s_%d'
self.cmd_container = self.container_tpl % ('tests_runner', 1)
self.ALL_CONTAINERS = [self.container_tpl %
(c, 1) for c in self.ALL_CONTAINERS]
self.scaled_svc = {}
self.scale("u_agent", 2)
def scale(self, svc, count):
for c in range(1, count):
new_container = self.container_tpl % (svc, c + 1)
self.ALL_CONTAINERS.append(new_container)
self.scaled_svc[svc] = count
def _call(self, *args):
cmd = [
'docker-compose',
'--ansi=never',
] + list(args)
log(f'Running docker-compose command: {cmd}')
subprocess.check_call(cmd)
def up(self):
log('Instantiating cluster')
scaled = [f"{k}={v}" for k, v in self.scaled_svc.items()]
if len(scaled) > 0:
scaled.insert(0, '--scale')
self._call('up', '-d', *scaled)
def down(self):
log('Shutting down cluster')
self._call('down')
def stop(self):
log('Stopping cluster')
self._call('stop')
def run(self, cmd):
container = self.cmd_container
if isinstance(cmd, str):
cmd = shlex.split(cmd)
result = docker([
'exec',
'-ti',
container
] + cmd)
return result
def is_alive(self):
log('Check if all containers are alive')
errors = check_state(self.ALL_CONTAINERS)
if errors:
print_errors(errors)
raise TestsError('Error during `is_alive` check')
else:
log('All containers are alive')

@ -1,49 +1,68 @@
import signal
import sys
from utils import *
from docker import rebuild_images_if_needed
from docker_compose import Compose
import toml
from docker import rebuild_images_if_needed, Compose
from pathlib import Path
from utils import *
cluster = Compose()
CARGO_INTEGRATION_TOML = Path('../__Cargo_integration.toml')
CLUSTER = Compose()
def abort_handler(s, _):
warn(f'Received signal: {s}')
warn(f'Gracefully stopping...')
cluster.down()
def fail(msg):
err(msg)
sys.exit(1)
def usage_exit():
usage = f"""Usage:
python {__file__.split('/')[-1]} [--rebuild] [--preserve] [--no-run] [--release]"""
print(usage)
sys.exit(1)
fail(usage)
def create_integration_workspace():
if CARGO_INTEGRATION_TOML.exists():
CARGO_INTEGRATION_TOML.unlink()
workspace = toml.load('../Cargo.toml')
workspace['workspace']['members'] = ['integration']
with open(CARGO_INTEGRATION_TOML, 'w') as fo:
toml.dump(workspace, fo)
def run_tests():
allowed_args = set(["--rebuild", "--preserve", "--no-run"])
allowed_args = set(["--rebuild", "--preserve", "--no-run", "--release"])
args = sys.argv[1:]
if not set(args).issubset(allowed_args):
usage_exit()
force_rebuild = '--rebuild' in args
preserve_containers = '--preserve' in args
only_setup_cluster = '--no-run' in args
def _cleanup():
if not preserve_containers and not only_setup_cluster:
CLUSTER.down()
CARGO_INTEGRATION_TOML.unlink(missing_ok=True)
def abort_handler(s, _):
warn(f'Received signal: {s}, gracefully stopping...')
_cleanup()
for s in (signal.SIGTERM, signal.SIGINT, signal.SIGHUP):
signal.signal(s, abort_handler)
rebuild_images_if_needed(force_rebuild)
create_integration_workspace()
try:
cluster.up()
cluster.is_alive()
CLUSTER.up()
CLUSTER.is_alive()
if not only_setup_cluster:
cluster.run('cargo test --test integration')
CLUSTER.run('cargo test --test integration')
except Exception as e:
err(e)
sys.exit(1)
# CLUSTER.print_containers_logs()
fail(e)
finally:
if not preserve_containers and not only_setup_cluster:
cluster.down()
_cleanup()
if __name__ == '__main__':

@ -24,12 +24,13 @@ pub async fn register_agent() -> RegisteredAgent {
.pop()
.unwrap();
let job_id = resp.job_id;
let resp = cli.get_jobs(Some(job_id)).await.unwrap().pop().unwrap();
assert_eq!(resp.alias, Some("agent_hello".to_string()));
let agent_data = Agent {
let job = cli.get_jobs(Some(job_id)).await.unwrap().pop().unwrap();
assert_eq!(job.alias, Some("agent_hello".to_string()));
let mut agent_data = AssignedJob::from(&job);
agent_data.set_result(&Agent {
id: agent_uid,
..Default::default()
};
cli.report(&[Reportable::Agent(agent_data)]).await.unwrap();
});
cli.report(Reportable::Assigned(agent_data)).await.unwrap();
RegisteredAgent { uid: agent_uid }
}

@ -7,37 +7,34 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
dotenv = "0.15.0"
serde = { version = "1.0.114", features = ["derive"] }
uuid = { version = "0.6.5", features = ["serde", "v4"] }
nix = "0.17"
libc = "^0.2"
lazy_static = "1.4.0"
futures = "0.3.5"
thiserror = "*"
diesel-derive-enum = { version = "1", features = ["postgres"], optional = true }
anyhow = "1.0.58"
chrono = "0.4.19"
strum = { version = "0.20", features = ["derive"] }
once_cell = "1.7.2"
shlex = "1.0.0"
crossbeam = "0.8.1"
diesel = { version = "1.4.5", features = ["postgres", "uuid"], optional = true }
diesel-derive-enum = { version = "1", features = ["postgres"], optional = true }
dotenv = "0.15.0"
envy = "0.4.2"
serde_json = "1.0.81"
tracing-subscriber = { version = "0.3.14", features = ["env-filter"] }
tracing-appender = "0.2.2"
futures = "0.3.5"
guess_host_triple = "0.1.2"
libc = "^0.2"
lazy_static = "1.4.0"
log = "*"
anyhow = "1.0.58"
nix = "0.17"
once_cell = "1.7.2"
platforms = "3.0.1"
reqwest = { workspace = true, features = ["native-tls"] }
shlex = "1.0.0"
serde = { workspace = true }
serde_json = { workspace = true }
strum = { version = "0.20", features = ["derive"] }
thiserror = "*"
tokio = { workspace = true, features = ["rt-multi-thread", "sync", "macros", "process", "time"] }
tracing-appender = { workspace = true }
tracing-subscriber = { workspace = true, features = ["env-filter"] }
uuid = { workspace = true, features = ["serde", "v4"] }
[features]
panel = []
server = ["dep:diesel", "dep:diesel-derive-enum"]
[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
reqwest = { version = "0.11", features = ["json", "native-tls"] }
tokio = { version = "1.2.0", features = ["rt-multi-thread", "sync", "macros", "process", "time"] }
guess_host_triple = "0.1.2"
[dev-dependencies]
rstest = "0.12"

@ -5,7 +5,7 @@ use crate::{
config::MASTER_PORT,
messaging::{self, AsMsg, BaseMessage, Empty},
models::{self},
utils::opt_to_string,
utils::{opt_to_string, OneOrVec},
UError,
};
use anyhow::{Context, Result};
@ -78,14 +78,14 @@ impl ClientHandler {
}
// get jobs for client
pub async fn get_personal_jobs(&self, url_param: Uuid) -> Result<Vec<models::AssignedJob>> {
pub async fn get_personal_jobs(&self, url_param: Uuid) -> Result<Vec<models::AssignedJobById>> {
self._req(format!("get_personal_jobs/{}", url_param), Empty)
.await
}
// send something to server
pub async fn report(&self, payload: &[messaging::Reportable]) -> Result<Empty> {
self._req("report", payload).await
pub async fn report(&self, payload: impl OneOrVec<messaging::Reportable>) -> Result<Empty> {
self._req("report", payload.into_vec()).await
}
// download file
@ -115,8 +115,8 @@ impl ClientHandler {
}
/// create and upload job
pub async fn upload_jobs(&self, payload: &[models::JobMeta]) -> Result<Empty> {
self._req("upload_jobs", payload).await
pub async fn upload_jobs(&self, payload: impl OneOrVec<models::JobMeta>) -> Result<Empty> {
self._req("upload_jobs", payload.into_vec()).await
}
/// delete something
@ -125,8 +125,13 @@ impl ClientHandler {
}
/// set jobs for any agent
pub async fn set_jobs(&self, agent: Uuid, job_idents: &[String]) -> Result<Vec<Uuid>> {
self._req(format!("set_jobs/{agent}"), job_idents).await
pub async fn set_jobs(
&self,
agent: Uuid,
job_idents: impl OneOrVec<String>,
) -> Result<Vec<Uuid>> {
self._req(format!("set_jobs/{agent}"), job_idents.into_vec())
.await
}
/// get jobs for any agent

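The `OneOrVec` bound these endpoints now take lives in `lib/u_lib/src/utils` and is not shown in this diff. A minimal sketch of such a trait, assuming blanket impls for a single value and a `Vec` (the trait name matches the call sites above; the impls are a guess):

```rust
// Plausible shape of u_lib's OneOrVec helper (the real definition is in
// lib/u_lib/src/utils): callers may pass either one T or a Vec<T>, and
// the API normalizes everything to Vec<T> before sending.
pub trait OneOrVec<T> {
    fn into_vec(self) -> Vec<T>;
}

impl<T> OneOrVec<T> for T {
    fn into_vec(self) -> Vec<T> {
        vec![self] // a single value becomes a one-element Vec
    }
}

impl<T> OneOrVec<T> for Vec<T> {
    fn into_vec(self) -> Vec<T> {
        self // a Vec passes through unchanged
    }
}
```

This is what lets `client.report(Reportable::Error(err))` and `client.report(results_vec)` share one signature.
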
@ -1,3 +1,4 @@
use crate::UError;
use anyhow::Error;
use once_cell::sync::OnceCell;
use tokio::sync::mpsc::{channel, error::TryRecvError, Receiver, Sender};
@ -28,9 +29,9 @@ impl ErrChan {
Self::get().await.tx.try_send(err).unwrap();
}
pub async fn recv() -> Option<ChanError> {
pub async fn recv() -> Option<UError> {
match Self::get().await.rx.try_recv() {
Ok(r) => Some(r),
Ok(err) => Some(UError::from(err)),
Err(TryRecvError::Disconnected) => panic!("err chan disconnected"),
Err(TryRecvError::Empty) => None,
}

@ -6,7 +6,7 @@ use uuid::Uuid;
pub type UResult<T> = std::result::Result<T, UError>;
#[derive(Error, Debug, Serialize, Deserialize, Clone)]
#[derive(PartialEq, Eq, Error, Debug, Serialize, Deserialize, Clone)]
pub enum UError {
#[error("Runtime error: {0}")]
Runtime(String),
@ -60,3 +60,12 @@ impl From<serde_json::Error> for UError {
UError::DeserializeError(e.to_string())
}
}
impl From<anyhow::Error> for UError {
fn from(e: anyhow::Error) -> Self {
match e.downcast::<UError>() {
Ok(err) => err,
Err(err) => UError::Runtime(err.to_string()),
}
}
}

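The new `From<anyhow::Error>` impl exists so that `ErrChan::recv` (see `errors/chan.rs` above) can hand back a typed `UError` even though the channel stores `anyhow::Error`. A small illustration of the two paths, assuming the derives shown above (not part of the diff):

```rust
// Illustration only: a UError wrapped into anyhow::Error round-trips via
// downcast; any foreign error collapses into UError::Runtime(..).
use anyhow::Error;

fn demo() {
    let wrapped = Error::new(UError::Runtime("job failed".into()));
    assert_eq!(UError::from(wrapped), UError::Runtime("job failed".into()));

    let foreign = Error::msg("connection reset");
    assert!(matches!(UError::from(foreign), UError::Runtime(_)));
}
```
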
@ -1,15 +1,16 @@
use crate::{messaging::Reportable, utils::OneOrVec};
use crate::{models::AssignedJob, UResult};
use futures::{future::BoxFuture, lock::Mutex};
use lazy_static::lazy_static;
use std::collections::HashMap;
use std::future::Future;
use tokio::{
spawn,
runtime::Handle,
sync::mpsc::{channel, Receiver, Sender},
task::JoinHandle,
task::{spawn, spawn_blocking, JoinHandle},
};
use uuid::Uuid;
pub type DynFut = BoxFuture<'static, Reportable>;
pub type ExecResult = UResult<AssignedJob>;
lazy_static! {
static ref FUT_RESULTS: Mutex<HashMap<Uuid, JoinInfo>> = Mutex::new(HashMap::new());
@ -21,33 +22,45 @@ lazy_static! {
}
struct JoinInfo {
handle: JoinHandle<Reportable>,
handle: JoinHandle<JoinHandle<ExecResult>>,
completed: bool,
collectable: bool, // indicates if future can be popped from pool via pop_task_if_completed
}
impl JoinInfo {
async fn wait_result(self) -> ExecResult {
self.handle.await.unwrap().await.unwrap()
}
}
fn get_sender() -> Sender<Uuid> {
FUT_CHANNEL.0.clone()
}
pub struct Waiter {
tasks: Vec<DynFut>,
tasks: Vec<BoxFuture<'static, ExecResult>>,
fids: Vec<Uuid>,
}
impl Waiter {
pub fn new(tasks: impl OneOrVec<DynFut>) -> Self {
pub fn new() -> Self {
Self {
tasks: tasks.into_vec(),
tasks: vec![],
fids: vec![],
}
}
pub fn push(&mut self, task: impl Future<Output = ExecResult> + Send + 'static) {
self.tasks.push(Box::pin(task));
}
/// Spawn prepared tasks
pub async fn spawn(mut self) -> Self {
let collectable = true; //TODO: self.tasks.len() != 1;
for f in self.tasks.drain(..) {
let tx = get_sender();
let handle = Handle::current();
let fid = Uuid::new_v4();
let tx = get_sender();
self.fids.push(fid);
let task_wrapper = async move {
debug!("inside wrapper (started): {}", fid);
@ -56,7 +69,7 @@ impl Waiter {
result
};
let handler = JoinInfo {
handle: spawn(task_wrapper),
handle: spawn_blocking(move || handle.spawn(task_wrapper)),
completed: false,
collectable,
};
@ -68,12 +81,11 @@ impl Waiter {
/// Wait until a bunch of tasks is finished.
/// NOT GUARANTEED that all tasks will be returned due to
/// possibility to pop them in other places
pub async fn wait(self) -> Vec<Reportable> {
pub async fn wait(self) -> Vec<ExecResult> {
let mut result = vec![];
for fid in self.fids {
if let Some(task) = pop_task(fid).await {
let r = task.handle.await;
result.push(r.unwrap());
result.push(task.wait_result().await);
}
}
result
@ -94,26 +106,25 @@ async fn pop_task(fid: Uuid) -> Option<JoinInfo> {
FUT_RESULTS.lock().await.remove(&fid)
}
pub async fn pop_task_if_completed(fid: Uuid) -> Option<Reportable> {
let &mut JoinInfo {
pub async fn pop_task_if_completed(fid: Uuid) -> Option<ExecResult> {
let &JoinInfo {
handle: _,
collectable,
completed,
} = match FUT_RESULTS.lock().await.get_mut(&fid) {
} = match FUT_RESULTS.lock().await.get(&fid) {
Some(t) => t,
None => return None,
};
if collectable && completed {
let task = pop_task(fid).await.unwrap();
let result = task.handle.await.unwrap();
Some(result)
Some(task.wait_result().await)
} else {
None
}
}
pub async fn pop_completed() -> Vec<Reportable> {
let mut completed: Vec<Reportable> = vec![];
pub async fn pop_completed() -> Vec<ExecResult> {
let mut completed: Vec<ExecResult> = vec![];
let fids = FUT_RESULTS
.lock()
.await

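`JoinInfo.handle` is now a `JoinHandle<JoinHandle<ExecResult>>`: the task is spawned from a blocking thread via the runtime `Handle`, so collecting a result takes two awaits, which is exactly what `wait_result` does above. A minimal standalone illustration of that pattern (not u_lib code):

```rust
// Standalone sketch of the nested-handle pattern used by JoinInfo:
// spawn_blocking returns a handle whose output is itself a JoinHandle.
use tokio::{runtime::Handle, task::spawn_blocking};

#[tokio::main]
async fn main() {
    let rt = Handle::current();
    let nested = spawn_blocking(move || rt.spawn(async { 21 * 2 }));
    let result = nested.await.unwrap().await.unwrap(); // two awaits
    assert_eq!(result, 42);
}
```
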
@ -3,7 +3,6 @@
#[path = "."]
pub mod exports {
pub mod api;
pub mod builder;
pub mod cache;
pub mod config;
pub mod datatypes;
@ -12,6 +11,7 @@ pub mod exports {
pub mod logging;
pub mod messaging;
pub mod models;
pub mod runner;
pub mod utils;
}

@ -1,8 +1,8 @@
use crate::config::get_self_uid;
use crate::utils::VecDisplay;
//use crate::utils::VecDisplay;
use serde::{Deserialize, Serialize};
use std::borrow::Cow;
use std::fmt::Display;
//use std::fmt::Display;
use uuid::Uuid;
pub struct Moo<'cow, T: AsMsg + Clone>(pub Cow<'cow, T>);
@ -28,7 +28,6 @@ impl<'cow, M: AsMsg> From<&'cow M> for Moo<'cow, M> {
}
impl<M: AsMsg> AsMsg for Vec<M> {}
impl<M: AsMsg + Display> AsMsg for VecDisplay<M> {}
impl<'msg, M: AsMsg> AsMsg for &'msg [M] {}
#[derive(Serialize, Deserialize, Debug)]

@ -2,14 +2,15 @@ mod base;
mod files;
use crate::models::*;
use crate::UError;
pub use base::{AsMsg, BaseMessage};
pub use files::*;
use serde::{Deserialize, Serialize};
use std::fmt;
use uuid::Uuid;
impl AsMsg for Agent {}
impl AsMsg for AssignedJob {}
impl AsMsg for AssignedJobById {}
impl AsMsg for DownloadInfo {}
impl AsMsg for Reportable {}
impl AsMsg for JobMeta {}
@ -22,16 +23,9 @@ impl AsMsg for u8 {}
#[derive(Serialize, Deserialize, Clone, Default, Debug)]
pub struct Empty;
impl fmt::Display for Empty {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "<empty>")
}
}
#[derive(Serialize, Deserialize, Clone, PartialEq, Debug)]
pub enum Reportable {
Assigned(AssignedJob),
Agent(Agent),
Dummy,
Error(String),
Error(UError),
}

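`Reportable::Error` now carries a structured `UError` instead of a flat `String`; since both types derive `Serialize`, `Deserialize` and `PartialEq`, a report survives the wire round trip intact. A small sketch under those derives (illustrative, not repo code):

```rust
// Sketch: a structured error report serializes and comes back equal.
use u_lib::{messaging::Reportable, UError};

fn demo() -> serde_json::Result<()> {
    let report = Reportable::Error(UError::Runtime("oops".into()));
    let json = serde_json::to_string(&report)?;
    let back: Reportable = serde_json::from_str(&json)?;
    assert_eq!(back, report);
    Ok(())
}
```
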
@ -6,14 +6,13 @@ use serde::{Deserialize, Serialize};
use std::{fmt, time::SystemTime};
use strum::Display;
#[cfg(not(target_arch = "wasm32"))]
use crate::builder::NamedJobBuilder;
#[cfg(feature = "server")]
use crate::models::schema::*;
use crate::{
config::get_self_uid,
messaging::Reportable,
unwrap_enum,
executor::ExecResult,
runner::NamedJobRunner,
utils::{systime_to_string, Platform},
};
@ -43,6 +42,8 @@ pub struct Agent {
pub alias: Option<String>,
pub hostname: String,
pub id: Uuid,
pub ip_gray: Option<String>,
pub ip_white: Option<String>,
pub is_root: bool,
pub is_root_allowed: bool,
pub last_active: SystemTime,
@ -55,18 +56,17 @@ pub struct Agent {
impl fmt::Display for Agent {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
let mut out = format!("Agent: {}", self.id);
write!(f, "Agent: {}", self.id)?;
if let Some(ref alias) = self.alias {
out += &format!(" ({})", alias)
write!(f, " ({})", alias)?
}
out += &format!("\nUsername: {}", self.username);
out += &format!("\nHostname: {}", self.hostname);
out += &format!("\nIs root: {}", self.is_root);
out += &format!("\nRoot allowed: {}", self.is_root_allowed);
out += &format!("\nLast active: {}", systime_to_string(&self.last_active));
out += &format!("\nPlatform: {}", self.platform);
out += &format!("\nState: {}", self.state);
write!(f, "{}", out)
writeln!(f, "\nUsername: {}", self.username)?;
writeln!(f, "Hostname: {}", self.hostname)?;
writeln!(f, "Is root: {}", self.is_root)?;
writeln!(f, "Root allowed: {}", self.is_root_allowed)?;
writeln!(f, "Last active: {}", systime_to_string(&self.last_active))?;
writeln!(f, "Platform: {}", self.platform)?;
writeln!(f, "State: {}", self.state)
}
}
@ -81,7 +81,7 @@ impl Agent {
#[cfg(unix)]
pub async fn gather() -> Self {
let mut builder = NamedJobBuilder::from_shell(vec![
let mut builder = NamedJobRunner::from_shell(vec![
("hostname", "hostname"),
("is_root", "id -u"),
("username", "id -un"),
@ -89,10 +89,8 @@ impl Agent {
.unwrap_one()
.wait()
.await;
let decoder = |job_result: Reportable| {
let assoc_job = unwrap_enum!(job_result, Reportable::Assigned);
assoc_job.to_string_result().trim().to_string()
};
let decoder =
|job_result: ExecResult| job_result.unwrap().to_string_result().trim().to_string();
Self {
hostname: decoder(builder.pop("hostname")),
@ -105,11 +103,11 @@ impl Agent {
#[cfg(not(unix))]
pub async fn gather() -> Self {
Self::default()
todo!()
}
pub async fn run() -> Reportable {
Reportable::Agent(Agent::gather().await)
pub async fn run() -> Agent {
Agent::gather().await
}
}
@ -127,6 +125,8 @@ impl Default for Agent {
state: AgentState::New,
token: None,
username: String::new(),
ip_gray: None,
ip_white: None,
}
}
}

@ -1,18 +1,12 @@
use super::JobState;
use super::{JobMeta, JobState, JobType};
#[cfg(feature = "server")]
use crate::models::schema::*;
#[cfg(not(target_arch = "wasm32"))]
use crate::{cache::JobCache, utils::TempFile};
use crate::{
config::get_self_uid,
errors::UError,
messaging::Reportable,
utils::{systime_to_string, ProcOutput},
};
use crate::{config::get_self_uid, utils::ProcOutput};
#[cfg(feature = "server")]
use diesel::{Identifiable, Insertable, Queryable};
use serde::{Deserialize, Serialize};
use std::{fmt, time::SystemTime};
use std::time::SystemTime;
use uuid::Uuid;
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
@ -21,7 +15,6 @@ use uuid::Uuid;
derive(Queryable, Identifiable, Insertable, AsChangeset),
table_name = "results"
)]
pub struct AssignedJob {
pub agent_id: Uuid,
pub alias: Option<String>,
@ -30,29 +23,50 @@ pub struct AssignedJob {
pub job_id: Uuid,
pub result: Option<Vec<u8>>,
pub state: JobState,
pub exec_type: JobType,
pub retcode: Option<i32>,
pub updated: SystemTime,
}
impl fmt::Display for AssignedJob {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
let mut out = format!("Result: {}", self.id);
out += &format!("\nAgent: {}", self.agent_id);
out += &format!("\nJob: {}", self.job_id);
if let Some(ref alias) = self.alias {
out += &format!("\nAlias: {}", alias);
#[derive(Serialize, Deserialize, Clone, Copy)]
pub struct AssignedJobById {
pub agent_id: Uuid,
pub id: Uuid,
pub job_id: Uuid,
}
impl From<(&JobMeta, AssignedJobById)> for AssignedJob {
fn from((meta, pj): (&JobMeta, AssignedJobById)) -> Self {
AssignedJob {
id: pj.id,
agent_id: pj.agent_id,
job_id: pj.job_id,
alias: meta.alias.clone(),
exec_type: meta.exec_type,
..Default::default()
}
}
}
impl From<&JobMeta> for AssignedJob {
fn from(meta: &JobMeta) -> Self {
AssignedJob {
job_id: meta.id,
agent_id: get_self_uid(),
alias: meta.alias.clone(),
exec_type: meta.exec_type,
..Default::default()
}
out += &format!("\nUpdated: {}", systime_to_string(&self.updated));
out += &format!("\nState: {}", self.state);
if self.state == JobState::Finished {
if let Some(ref retcode) = self.retcode {
out += &format!("\nReturn code: {}", retcode);
}
if let Some(ref result) = self.result {
out += &format!("\nResult: {}", String::from_utf8_lossy(result));
}
}
}
impl Default for AssignedJobById {
fn default() -> Self {
Self {
agent_id: get_self_uid(),
id: Uuid::new_v4(),
job_id: Uuid::nil(),
}
write!(f, "{}", out)
}
}
@ -68,58 +82,13 @@ impl Default for AssignedJob {
state: JobState::Queued,
retcode: None,
updated: SystemTime::now(),
exec_type: JobType::default(),
}
}
}
#[cfg(not(target_arch = "wasm32"))]
impl AssignedJob {
pub async fn run(mut self) -> Reportable {
use tokio::process::Command;
let (argv, _payload) = {
let meta = JobCache::get(self.job_id).unwrap();
if let Some(ref payload) = meta.payload {
let extracted_payload = match TempFile::write_exec(payload) {
Ok(p) => p,
Err(e) => return Reportable::Error(UError::Runtime(e.to_string()).to_string()),
};
(
meta.argv.replace("{}", &extracted_payload.get_path()),
Some(extracted_payload),
)
} else {
(meta.argv.clone(), None)
}
};
let mut split_cmd = shlex::split(&argv).unwrap().into_iter();
let cmd = split_cmd.nth(0).unwrap();
let args = split_cmd.collect::<Vec<String>>();
let cmd_result = Command::new(cmd).args(args).output().await;
let (data, retcode) = match cmd_result {
Ok(output) => (
ProcOutput::from_output(&output).into_combined(),
output.status.code(),
),
Err(e) => (
ProcOutput::new()
.stderr(e.to_string().into_bytes())
.into_combined(),
None,
),
};
self.result = Some(data);
self.retcode = retcode;
Reportable::Assigned(self)
}
pub fn new(job_id: Uuid, other: Option<&Self>) -> Self {
Self {
agent_id: get_self_uid(),
job_id,
..other.unwrap_or(&Default::default()).clone()
}
}
pub fn as_job_output(&self) -> Option<ProcOutput> {
self.result
.as_ref()
@ -139,4 +108,8 @@ impl AssignedJob {
pub fn to_string_result(&self) -> String {
String::from_utf8_lossy(&self.to_raw_result()).into_owned()
}
pub fn set_result<S: Serialize>(&mut self, result: &S) {
self.result = Some(serde_json::to_vec(result).unwrap());
}
}

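`AssignedJobById` is the slim id-only triple the server now hands out, while the full `AssignedJob` row is materialized agent-side from the cached `JobMeta` via the `From` impls above. A sketch of that flow (the `meta` value is assumed to come from `JobCache`; the payload string is illustrative):

```rust
// Sketch: expand the wire type into a full result row and attach data.
use u_lib::models::{AssignedJob, AssignedJobById, JobMeta};

fn materialize(meta: &JobMeta) -> AssignedJob {
    let by_id = AssignedJobById {
        job_id: meta.id,
        ..Default::default() // agent_id = self uid, fresh result id
    };
    let mut job = AssignedJob::from((meta, by_id));
    job.set_result(&"hello"); // stored as serialized JSON bytes
    job
}
```
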
@ -2,13 +2,11 @@ use super::JobType;
#[cfg(feature = "server")]
use crate::models::schema::*;
use crate::utils::Platform;
use crate::{utils::Stripped, UError, UResult};
use crate::{UError, UResult};
#[cfg(feature = "server")]
use diesel::{Identifiable, Insertable, Queryable};
use serde::{Deserialize, Serialize};
use std::path::PathBuf;
use std::str::from_utf8;
use std::{fmt, fs};
use std::fs;
use uuid::Uuid;
#[derive(Serialize, Deserialize, Clone, Debug)]
@ -18,37 +16,37 @@ use uuid::Uuid;
table_name = "jobs"
)]
pub struct JobMeta {
#[serde(default)]
pub alias: Option<String>,
/// string like `bash -c {} -a 1 --arg2`,
/// where {} is replaced by executable's tmp path
#[serde(default)]
pub argv: String,
#[serde(default = "Uuid::new_v4")]
pub id: Uuid,
pub exec_type: JobType,
//pub schedule: JobSchedule,
pub platform: String,
pub payload: Option<Vec<u8>>,
}
#[derive(Deserialize, Debug)]
pub struct RawJobMeta {
#[serde(default)]
pub alias: Option<String>,
#[serde(default)]
pub argv: String,
#[serde(default)]
pub id: UuidDefaultUnique,
#[serde(default)]
pub exec_type: JobType,
//pub schedule: JobSchedule,
///target triple
#[serde(default)]
pub platform: Platform,
pub platform: String,
#[serde(default)]
pub payload: Option<Vec<u8>>,
/// if payload should be read from external resource
#[serde(default)]
pub payload_path: Option<PathBuf>,
pub payload_path: Option<String>,
///cron-like string
#[serde(default)]
pub schedule: Option<String>,
}
impl RawJobMeta {
impl JobMeta {
pub fn builder() -> JobMetaBuilder {
JobMetaBuilder::default()
}
@ -62,29 +60,6 @@ impl RawJobMeta {
}
}
impl fmt::Display for JobMeta {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
let mut out = format!("Job: {}", self.id);
if let Some(ref alias) = self.alias {
out += &format!(" ({})", alias);
}
out += &format!("\nArgv: {}", self.argv);
out += &format!("\nExecutable type: {}", self.exec_type);
out += &format!("\nPlatform: {}", self.platform);
if let Some(ref payload) = self.payload {
if self.exec_type == JobType::Shell {
let payload = if let Ok(str_payload) = from_utf8(payload) {
Stripped(&str_payload).to_string()
} else {
Stripped(&payload).to_string()
};
out += &format!("\nPayload: {}", payload);
}
}
write!(f, "{}", out)
}
}
impl Default for JobMeta {
fn default() -> Self {
Self {
@ -94,19 +69,7 @@ impl Default for JobMeta {
exec_type: JobType::Shell,
platform: Platform::current().into_string(),
payload: None,
}
}
}
impl Default for RawJobMeta {
fn default() -> Self {
Self {
id: UuidDefaultUnique::default(),
alias: None,
argv: String::new(),
exec_type: JobType::Shell,
platform: Platform::current(),
payload: None,
schedule: None,
payload_path: None,
}
}
@ -114,12 +77,13 @@ impl Default for RawJobMeta {
#[derive(Default)]
pub struct JobMetaBuilder {
inner: RawJobMeta,
inner: JobMeta,
}
impl JobMetaBuilder {
pub fn with_shell(mut self, shell_cmd: impl Into<String>) -> Self {
self.inner.argv = shell_cmd.into();
self.inner.exec_type = JobType::Shell;
self
}
@ -128,7 +92,7 @@ impl JobMetaBuilder {
self
}
pub fn with_payload_src(mut self, path: impl Into<PathBuf>) -> Self {
pub fn with_payload_src(mut self, path: impl Into<String>) -> Self {
self.inner.payload_path = Some(path.into());
self
}
@ -148,6 +112,7 @@ impl JobMetaBuilder {
match inner.exec_type {
JobType::Shell => {
if inner.argv.is_empty() {
// TODO: fix detecting
inner.argv = String::from("/bin/bash -c {}")
}
let argv_parts =
@ -157,9 +122,8 @@ impl JobMetaBuilder {
return Err(empty_err.into());
}
if let Some(path) = &inner.payload_path {
let data = fs::read(path.clone()).map_err(|e| {
UError::FSError(path.to_string_lossy().to_string(), e.to_string())
})?;
let data = fs::read(path)
.map_err(|e| UError::FSError(path.to_string(), e.to_string()))?;
inner.payload = Some(data)
}
match inner.payload.as_ref() {
@ -181,38 +145,15 @@ impl JobMetaBuilder {
}
}
};
if !inner.platform.check() {
if !Platform::new(&inner.platform).check() {
return Err(UError::JobArgsError(format!(
"Unknown platform {}",
inner.platform.into_string()
inner.platform
)));
}
Ok(inner.into())
}
JobType::Manage => Ok(inner.into()),
_ => todo!(),
}
}
}
impl From<RawJobMeta> for JobMeta {
fn from(rjm: RawJobMeta) -> Self {
JobMeta {
alias: rjm.alias,
argv: rjm.argv,
id: rjm.id.0,
exec_type: rjm.exec_type,
platform: rjm.platform.into_string(),
payload: rjm.payload,
_ => Ok(inner.into()),
}
}
}
#[derive(Deserialize, Debug)]
pub struct UuidDefaultUnique(Uuid);
impl Default for UuidDefaultUnique {
fn default() -> Self {
Self(Uuid::new_v4())
}
}

@ -3,21 +3,6 @@ use diesel_derive_enum::DbEnum;
use serde::{Deserialize, Serialize};
use strum::Display;
#[derive(Serialize, Deserialize, Clone, Debug)]
pub enum ManageAction {
Ping,
UpdateAvailable,
JobsResultsRequest,
Terminate,
}
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
pub enum JobSchedule {
Once,
Permanent,
//Scheduled
}
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Display)]
#[cfg_attr(
feature = "server",
@ -26,13 +11,17 @@ pub enum JobSchedule {
DieselType = "Jobstate"
)]
pub enum JobState {
Queued, // server created a job, but client didn't get it yet
//Pending, // client got a job, but not running yet
Running, // client is currently running a job
/// server created a job, but client didn't get it yet
Queued,
// client got a job, but not running yet
//Pending,
/// client is currently running a job
Running,
Finished,
}
#[derive(Default, Serialize, Deserialize, Clone, Debug, PartialEq, Display)]
#[derive(Default, Serialize, Deserialize, Clone, Copy, Debug, PartialEq, Display)]
#[cfg_attr(
feature = "server",
derive(DbEnum),
@ -40,8 +29,9 @@ pub enum JobState {
DieselType = "Jobtype"
)]
pub enum JobType {
Manage,
Init,
#[default]
Shell,
Python,
Terminate,
Update,
}

@ -5,6 +5,8 @@ table! {
alias -> Nullable<Text>,
hostname -> Text,
id -> Uuid,
ip_gray -> Nullable<Text>,
ip_white -> Nullable<Text>,
is_root -> Bool,
is_root_allowed -> Bool,
last_active -> Timestamp,
@ -26,32 +28,6 @@ table! {
}
}
table! {
use crate::schema_exports::*;
errors (id) {
agent_id -> Uuid,
created -> Timestamp,
id -> Uuid,
msg -> Nullable<Text>,
}
}
table! {
use crate::schema_exports::*;
ip_addrs (id) {
agent_id -> Uuid,
check_ts -> Timestamp,
gateway -> Nullable<Text>,
id -> Uuid,
iface -> Text,
ip_addr -> Text,
is_gray -> Bool,
netmask -> Text,
}
}
table! {
use crate::schema_exports::*;
@ -62,6 +38,8 @@ table! {
exec_type -> Jobtype,
platform -> Text,
payload -> Nullable<Bytea>,
payload_path -> Nullable<Text>,
schedule -> Nullable<Text>,
}
}
@ -76,22 +54,14 @@ table! {
job_id -> Uuid,
result -> Nullable<Bytea>,
state -> Jobstate,
exec_type -> Jobtype,
retcode -> Nullable<Int4>,
updated -> Timestamp,
}
}
joinable!(certificates -> agents (agent_id));
joinable!(errors -> agents (agent_id));
joinable!(ip_addrs -> agents (agent_id));
joinable!(results -> agents (agent_id));
joinable!(results -> jobs (job_id));
allow_tables_to_appear_in_same_query!(
agents,
certificates,
errors,
ip_addrs,
jobs,
results,
);
allow_tables_to_appear_in_same_query!(agents, certificates, jobs, results,);

@ -1,97 +1,136 @@
use crate::{
cache::JobCache,
executor::{DynFut, Waiter},
messaging::Reportable,
models::{Agent, AssignedJob, JobMeta, JobType, RawJobMeta},
executor::{ExecResult, Waiter},
models::{Agent, AssignedJob, AssignedJobById, JobMeta, JobType},
utils::{CombinedResult, OneOrVec, Platform},
utils::{ProcOutput, TempFile},
UError, UResult,
};
use std::collections::HashMap;
use std::process::exit;
use tokio::process::Command;
pub struct JobBuilder {
pub struct JobRunner {
waiter: Waiter,
}
impl JobBuilder {
pub fn from_request(job_requests: impl OneOrVec<AssignedJob>) -> CombinedResult<Self> {
let job_requests = job_requests.into_vec();
let mut prepared: Vec<DynFut> = vec![];
let mut result = CombinedResult::<JobBuilder, _>::new();
for req in job_requests {
let job_meta = JobCache::get(req.job_id);
if job_meta.is_none() {
result.err(UError::NoJob(req.job_id));
continue;
}
let job_meta = job_meta.unwrap();
impl JobRunner {
pub fn from_jobs(jobs: impl OneOrVec<AssignedJobById>) -> CombinedResult<Self> {
let jobs = jobs.into_vec();
let mut waiter = Waiter::new();
let mut result = CombinedResult::<Self, _>::new();
for job in jobs {
//waiting for try-blocks stabilization
let built_req = (|| -> UResult<()> {
Ok(match job_meta.exec_type {
JobType::Shell => {
let meta = JobCache::get(req.job_id).ok_or(UError::NoJob(req.job_id))?;
let curr_platform = Platform::current();
if !curr_platform.matches(&meta.platform) {
return Err(UError::InsuitablePlatform(
meta.platform.clone(),
curr_platform.into_string(),
)
.into());
}
let job = AssignedJob::new(req.job_id, Some(&req));
prepared.push(Box::pin(job.run()))
}
JobType::Manage => prepared.push(Box::pin(Agent::run())),
_ => todo!(),
})
let built_job = (|| -> UResult<()> {
let meta = JobCache::get(job.job_id).ok_or(UError::NoJob(job.job_id))?;
let curr_platform = Platform::current();
if !curr_platform.matches(&meta.platform) {
return Err(UError::InsuitablePlatform(
meta.platform.clone(),
curr_platform.into_string(),
)
.into());
}
let job = AssignedJob::from((&*meta, job));
waiter.push(run_assigned_job(job));
Ok(())
})();
if let Err(e) = built_req {
if let Err(e) = built_job {
result.err(e)
}
}
result.ok(Self {
waiter: Waiter::new(prepared),
});
result.ok(Self { waiter });
result
}
pub fn from_meta(job_metas: impl OneOrVec<JobMeta>) -> CombinedResult<Self> {
let job_requests = job_metas
pub fn from_meta(metas: impl OneOrVec<JobMeta>) -> CombinedResult<Self> {
let jobs = metas
.into_vec()
.into_iter()
.map(|jm| {
let j_uid = jm.id;
let job_uid = jm.id;
JobCache::insert(jm);
AssignedJob::new(j_uid, None)
AssignedJobById {
job_id: job_uid,
..Default::default()
}
})
.collect::<Vec<AssignedJob>>();
JobBuilder::from_request(job_requests)
.collect::<Vec<AssignedJobById>>();
JobRunner::from_jobs(jobs)
}
/// Spawn jobs and pop results later
/// Spawn jobs
pub async fn spawn(mut self) -> Self {
self.waiter = self.waiter.spawn().await;
self
}
/// Spawn jobs and wait for result
pub async fn wait(self) -> Vec<Reportable> {
pub async fn wait(self) -> Vec<ExecResult> {
self.waiter.spawn().await.wait().await
}
/// Spawn one job and wait for result
pub async fn wait_one(self) -> Reportable {
pub async fn wait_one(self) -> ExecResult {
self.waiter.spawn().await.wait().await.pop().unwrap()
}
}
pub async fn run_assigned_job(mut job: AssignedJob) -> ExecResult {
match job.exec_type {
JobType::Shell => {
let (argv, _payload) = {
let meta = JobCache::get(job.job_id).unwrap();
if let Some(ref payload) = meta.payload {
let extracted_payload = match TempFile::write_exec(payload) {
Ok(p) => p,
Err(e) => return Err(UError::Runtime(e.to_string())),
};
(
meta.argv.replace("{}", &extracted_payload.get_path()),
Some(extracted_payload),
)
} else {
(meta.argv.clone(), None)
}
};
let mut split_cmd = shlex::split(&argv).unwrap().into_iter();
let cmd = split_cmd.nth(0).unwrap();
let args = split_cmd.collect::<Vec<String>>();
let cmd_result = Command::new(cmd).args(args).output().await;
let (data, retcode) = match cmd_result {
Ok(output) => (
ProcOutput::from_output(&output).into_combined(),
output.status.code(),
),
Err(e) => (
ProcOutput::new()
.stderr(e.to_string().into_bytes())
.into_combined(),
None,
),
};
job.result = Some(data);
job.retcode = retcode;
}
JobType::Init => {
job.set_result(&Agent::run().await);
job.retcode = Some(0);
}
JobType::Update => todo!(),
JobType::Terminate => exit(0),
};
Ok(job)
}
/// Store jobs and get results by name
pub struct NamedJobBuilder {
builder: Option<JobBuilder>,
pub struct NamedJobRunner {
builder: Option<JobRunner>,
job_names: Vec<&'static str>,
results: HashMap<&'static str, Reportable>,
results: HashMap<&'static str, ExecResult>,
}
impl NamedJobBuilder {
impl NamedJobRunner {
pub fn from_shell(
named_jobs: impl OneOrVec<(&'static str, &'static str)>,
) -> CombinedResult<Self> {
@ -100,7 +139,7 @@ impl NamedJobBuilder {
.into_vec()
.into_iter()
.filter_map(
|(alias, cmd)| match RawJobMeta::builder().with_shell(cmd).build() {
|(alias, cmd)| match JobMeta::builder().with_shell(cmd).build() {
Ok(meta) => Some((alias, meta)),
Err(e) => {
result.err(e);
@ -124,7 +163,7 @@ impl NamedJobBuilder {
})
.collect();
Self {
builder: Some(JobBuilder::from_meta(job_metas).unwrap_one()),
builder: Some(JobRunner::from_meta(job_metas).unwrap_one()),
job_names,
results: HashMap::new(),
}
@ -138,11 +177,11 @@ impl NamedJobBuilder {
self
}
pub fn pop_opt(&mut self, name: &'static str) -> Option<Reportable> {
pub fn pop_opt(&mut self, name: &'static str) -> Option<ExecResult> {
self.results.remove(name)
}
pub fn pop(&mut self, name: &'static str) -> Reportable {
pub fn pop(&mut self, name: &'static str) -> ExecResult {
self.pop_opt(name).unwrap()
}
}
@ -151,8 +190,8 @@ impl NamedJobBuilder {
mod tests {
use super::*;
use crate::{
builder::{JobBuilder, NamedJobBuilder},
models::{misc::JobType, JobMeta},
runner::{JobRunner, NamedJobRunner},
unwrap_enum,
};
use std::time::SystemTime;
@ -162,10 +201,10 @@ mod tests {
#[tokio::test]
async fn test_is_really_async() {
const SLEEP_SECS: u64 = 1;
let job = RawJobMeta::from_shell(format!("sleep {}", SLEEP_SECS)).unwrap();
let job = JobMeta::from_shell(format!("sleep {}", SLEEP_SECS)).unwrap();
let sleep_jobs: Vec<JobMeta> = (0..50).map(|_| job.clone()).collect();
let now = SystemTime::now();
JobBuilder::from_meta(sleep_jobs).unwrap_one().wait().await;
JobRunner::from_meta(sleep_jobs).unwrap_one().wait().await;
assert!(now.elapsed().unwrap().as_secs() < SLEEP_SECS + 2)
}
@ -199,13 +238,16 @@ mod tests {
#[case] payload: Option<&[u8]>,
#[case] expected_result: &str,
) -> TestResult {
let mut job = RawJobMeta::builder().with_shell(cmd);
let mut job = JobMeta::builder().with_shell(cmd);
if let Some(p) = payload {
job = job.with_payload(p);
}
let job = job.build().unwrap();
let job_result = JobBuilder::from_meta(job).unwrap_one().wait_one().await;
let result = unwrap_enum!(job_result, Reportable::Assigned);
let result = JobRunner::from_meta(job)
.unwrap_one()
.wait_one()
.await
.unwrap();
let result = result.to_string_result();
assert_eq!(result.trim(), expected_result);
Ok(())
@ -215,29 +257,25 @@ mod tests {
async fn test_complex_load() -> TestResult {
const SLEEP_SECS: u64 = 1;
let now = SystemTime::now();
let longest_job = RawJobMeta::from_shell(format!("sleep {}", SLEEP_SECS)).unwrap();
let longest_job = JobBuilder::from_meta(longest_job)
.unwrap_one()
.spawn()
.await;
let ls = JobBuilder::from_meta(RawJobMeta::from_shell("ls")?)
let longest_job = JobMeta::from_shell(format!("sleep {}", SLEEP_SECS)).unwrap();
let longest_job = JobRunner::from_meta(longest_job).unwrap_one().spawn().await;
let ls = JobRunner::from_meta(JobMeta::from_shell("ls")?)
.unwrap_one()
.wait_one()
.await;
let ls = unwrap_enum!(ls, Reportable::Assigned);
.await
.unwrap();
assert_eq!(ls.retcode.unwrap(), 0);
let folders = ls.to_string_result();
let subfolders_jobs: Vec<JobMeta> = folders
.lines()
.map(|f| RawJobMeta::from_shell(format!("ls {}", f)).unwrap())
.map(|f| JobMeta::from_shell(format!("ls {}", f)).unwrap())
.collect();
let ls_subfolders = JobBuilder::from_meta(subfolders_jobs)
let ls_subfolders = JobRunner::from_meta(subfolders_jobs)
.unwrap_one()
.wait()
.await;
for result in ls_subfolders {
let result = unwrap_enum!(result, Reportable::Assigned);
assert_eq!(result.retcode.unwrap(), 0);
assert_eq!(result.unwrap().retcode.unwrap(), 0);
}
longest_job.wait().await;
assert_eq!(now.elapsed().unwrap().as_secs(), SLEEP_SECS);
@ -263,9 +301,12 @@ mod tests {
*/
#[tokio::test]
async fn test_failing_shell_job() -> TestResult {
let job = RawJobMeta::from_shell("lol_kek_puk")?;
let job_result = JobBuilder::from_meta(job).unwrap_one().wait_one().await;
let job_result = unwrap_enum!(job_result, Reportable::Assigned);
let job = JobMeta::from_shell("lol_kek_puk")?;
let job_result = JobRunner::from_meta(job)
.unwrap_one()
.wait_one()
.await
.unwrap();
let output = job_result.to_string_result();
assert!(output.contains("No such file"));
assert!(job_result.retcode.is_none());
@ -281,7 +322,7 @@ mod tests {
#[case] payload: Option<&[u8]>,
#[case] err_str: &str,
) -> TestResult {
let mut job = RawJobMeta::builder().with_shell(cmd);
let mut job = JobMeta::builder().with_shell(cmd);
if let Some(p) = payload {
job = job.with_payload(p);
}
@ -293,17 +334,17 @@ mod tests {
#[tokio::test]
async fn test_different_job_types() -> TestResult {
let mut jobs = NamedJobBuilder::from_meta(vec![
("sleeper", RawJobMeta::from_shell("sleep 3")?),
let mut jobs = NamedJobRunner::from_meta(vec![
("sleeper", JobMeta::from_shell("sleep 3")?),
(
"gatherer",
RawJobMeta::builder().with_type(JobType::Manage).build()?,
JobMeta::builder().with_type(JobType::Init).build()?,
),
])
.wait()
.await;
let gathered = jobs.pop("gatherer");
assert_eq!(unwrap_enum!(gathered, Reportable::Agent).alias, None);
assert_eq!(gathered.unwrap().alias, None);
Ok(())
}
}

@ -38,9 +38,10 @@ impl<'a> Strippable for &'a Vec<u8> {
self.iter()
}
}
pub struct Stripped<'inner, Inner: Strippable + 'inner>(pub &'inner Inner);
impl<'inner, Inner: Strippable + 'inner> Stripped<'inner, Inner> {
pub struct Stripped<'i, Inner: Strippable + 'i>(pub &'i Inner);
impl<'i, Inner: Strippable + 'i> Stripped<'i, Inner> {
fn iter(&self) -> Inner::TypeIter {
self.0.iterator()
}
@ -54,7 +55,7 @@ impl<'inner, Inner: Strippable + 'inner> Stripped<'inner, Inner> {
}
}
impl<'inner, Inner: Strippable + 'inner> fmt::Display for Stripped<'inner, Inner> {
impl<'i, Inner: Strippable + 'i> fmt::Display for Stripped<'i, Inner> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
let placeholder = self.placeholder();
for c in self.iter().take(MAX_DATA_LEN - placeholder.len()) {

@ -10,7 +10,6 @@ pub mod storage;
pub mod tempfile;
#[cfg(unix)]
pub mod unix;
pub mod vec_display;
pub use combined_result::*;
pub use conv::*;
@ -22,7 +21,6 @@ pub use proc_output::*;
pub use storage::*;
#[cfg(not(target_arch = "wasm32"))]
pub use tempfile::*;
pub use vec_display::*;
#[cfg(unix)]
pub use unix::*;

@ -7,6 +7,10 @@ use std::str::FromStr;
pub struct Platform(String);
impl Platform {
pub fn new(p: impl Into<String>) -> Self {
Self(p.into())
}
pub fn current() -> Platform {
Self(guess_host_triple().unwrap_or("unknown").to_string())
}

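`Platform::check` (called in `JobMetaBuilder::build`) and `Platform::matches` (called in `JobRunner`) sit outside the hunk above. A hypothetical sketch of their semantics, assuming the `platforms` crate added to `lib/u_lib/Cargo.toml` backs the validity check:

```rust
// Hypothetical reconstruction; the real methods live in this file but
// are not shown in this diff.
impl Platform {
    /// Assumed: valid if the string names a known target triple.
    pub fn check(&self) -> bool {
        platforms::Platform::find(&self.0).is_some()
    }

    /// Assumed: an empty target matches any host; otherwise a substring
    /// match against the stored triple.
    pub fn matches(&self, target: &str) -> bool {
        target.is_empty() || self.0.contains(target)
    }
}
```
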
@ -1,5 +1,5 @@
CREATE EXTENSION IF NOT EXISTS "uuid-ossp";
CREATE TYPE JobType AS ENUM ('shell', 'manage', 'python');
CREATE TYPE JobType AS ENUM ('shell', 'init', 'python');
CREATE TYPE JobState AS ENUM ('queued', 'running', 'finished');
CREATE TYPE AgentState AS ENUM ('new', 'active', 'banned');
@ -7,6 +7,8 @@ CREATE TABLE IF NOT EXISTS agents (
alias TEXT
, hostname TEXT NOT NULL
, id UUID NOT NULL DEFAULT uuid_generate_v4()
, ip_gray TEXT
, ip_white TEXT
, is_root BOOLEAN NOT NULL DEFAULT false
, is_root_allowed BOOLEAN NOT NULL DEFAULT false
, last_active TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
@ -17,20 +19,8 @@ CREATE TABLE IF NOT EXISTS agents (
-- is needed for processing requests
, token TEXT
, username TEXT NOT NULL
, PRIMARY KEY(id)
);
CREATE TABLE IF NOT EXISTS ip_addrs (
agent_id UUID NOT NULL
, check_ts TIMESTAMP NOT NULL
, gateway TEXT
, id UUID NOT NULL DEFAULT uuid_generate_v4()
, iface TEXT NOT NULL
, ip_addr TEXT NOT NULL
, is_gray BOOLEAN NOT NULL DEFAULT true
, netmask TEXT NOT NULL
, PRIMARY KEY(id)
, FOREIGN KEY(agent_id) REFERENCES agents(id)
);
CREATE TABLE IF NOT EXISTS jobs (
@ -40,6 +30,9 @@ CREATE TABLE IF NOT EXISTS jobs (
, exec_type JobType NOT NULL DEFAULT 'shell'
, platform TEXT NOT NULL
, payload BYTEA
, payload_path TEXT
, schedule TEXT
, PRIMARY KEY(id)
);
@ -51,8 +44,10 @@ CREATE TABLE IF NOT EXISTS results (
, job_id UUID NOT NULL
, result BYTEA
, state JobState NOT NULL DEFAULT 'queued'
, exec_type JobType NOT NULL DEFAULT 'shell'
, retcode INTEGER
, updated TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
, FOREIGN KEY(agent_id) REFERENCES agents(id) ON DELETE CASCADE
, FOREIGN KEY(job_id) REFERENCES jobs(id) ON DELETE CASCADE
, PRIMARY KEY(id)
@ -62,6 +57,7 @@ CREATE TABLE IF NOT EXISTS certificates (
agent_id UUID NOT NULL
, id UUID NOT NULL DEFAULT uuid_generate_v4()
, is_revoked BOOLEAN NOT NULL DEFAULT FALSE
, PRIMARY KEY(id)
, FOREIGN KEY(agent_id) REFERENCES agents(id)
);