Compare commits
7 Commits
master
...
14-integra
Author | SHA1 | Date |
---|---|---|
plazmoid | 4bfcdd2b23 | 4 years ago |
plazmoid | 4e88f49f96 | 4 years ago |
plazmoid | 3e0c9ecd77 | 4 years ago |
plazmoid | 7735596bf0 | 4 years ago |
plazmoid | 5248ae7ac4 | 4 years ago |
plazmoid | 8cccc82fc2 | 4 years ago |
plazmoid | 5734145e8f | 4 years ago |
57 changed files with 1569 additions and 797 deletions
@ -1,3 +0,0 @@ |
||||
[build] |
||||
target = "x86_64-unknown-linux-gnu" # -musl" |
||||
|
@ -1 +1,3 @@ |
||||
export DATABASE_URL=postgres://postgres:12348756@172.17.0.2/u_db |
||||
ADMIN_AUTH_TOKEN=464af63dbd241969baa1e94b2461d94d |
||||
POSTGRES_PASSWORD=12348756 |
||||
DATABASE_URL=postgres://postgres:${POSTGRES_PASSWORD}@u_db/u_db |
||||
|
@ -1,4 +1,6 @@ |
||||
/target |
||||
target/ |
||||
**/*.rs.bk |
||||
/.idea |
||||
/data |
||||
.idea/ |
||||
data/ |
||||
static/ |
||||
**/*.pyc |
@ -0,0 +1,18 @@ |
||||
.PHONY: _pre_build debug release run clean |
||||
|
||||
CARGO=./scripts/cargo_musl.sh
|
||||
|
||||
clean: |
||||
${CARGO} clean
|
||||
|
||||
_pre_build: |
||||
docker build -t unki/musllibs ./muslrust
|
||||
|
||||
debug: _pre_build |
||||
${CARGO} build
|
||||
|
||||
release: _pre_build |
||||
${CARGO} build --release
|
||||
|
||||
run: build |
||||
${CARGO} run
|
@ -0,0 +1,88 @@ |
||||
// TODO:
|
||||
// поддержка питона
|
||||
// резолв адреса управляющего сервера через DoT
|
||||
// кроссплатформенность (реализовать интерфейс для винды и никсов)
|
||||
// проверка обнов
|
||||
// самоуничтожение
|
||||
|
||||
#[macro_use] |
||||
extern crate log; |
||||
extern crate env_logger; |
||||
|
||||
use std::env; |
||||
use tokio::time::{sleep, Duration}; |
||||
use u_lib::{ |
||||
api::ClientHandler, |
||||
builder::JobBuilder, |
||||
cache::JobCache, |
||||
executor::pop_completed, |
||||
models::{AssignedJob, ExecResult}, |
||||
UID, |
||||
//daemonize
|
||||
}; |
||||
|
||||
#[macro_export] |
||||
macro_rules! retry_until_ok { |
||||
( $body:expr ) => { |
||||
loop { |
||||
match $body { |
||||
Ok(r) => break r, |
||||
Err(e) => error!("{:?}", e), |
||||
}; |
||||
sleep(Duration::from_secs(5)).await; |
||||
} |
||||
}; |
||||
} |
||||
|
||||
pub async fn process_request(job_requests: Vec<AssignedJob>, client: &ClientHandler) { |
||||
if job_requests.len() > 0 { |
||||
for jr in &job_requests { |
||||
if !JobCache::contains(&jr.job_id) { |
||||
info!("Fetching job: {}", &jr.job_id); |
||||
let fetched_job = retry_until_ok!(client.get_jobs(Some(jr.job_id)).await) |
||||
.pop() |
||||
.unwrap(); |
||||
JobCache::insert(fetched_job); |
||||
} |
||||
} |
||||
info!( |
||||
"Scheduling jobs: \n{}", |
||||
job_requests |
||||
.iter() |
||||
.map(|j| j.job_id.to_string()) |
||||
.collect::<Vec<String>>() |
||||
.join("\n") |
||||
); |
||||
let mut builder = JobBuilder::from_request(job_requests); |
||||
let errors = builder.pop_errors(); |
||||
if errors.len() > 0 { |
||||
error!( |
||||
"Some errors encountered: \n{}", |
||||
errors |
||||
.iter() |
||||
.map(|j| j.to_string()) |
||||
.collect::<Vec<String>>() |
||||
.join("\n") |
||||
); |
||||
} |
||||
builder.unwrap_one().spawn().await; |
||||
} |
||||
} |
||||
|
||||
pub async fn run_forever() { |
||||
//daemonize();
|
||||
env_logger::init(); |
||||
let arg_ip = env::args().nth(1); |
||||
let instance = ClientHandler::new(arg_ip.as_deref()); |
||||
info!("Connecting to the server"); |
||||
loop { |
||||
let job_requests: Vec<AssignedJob> = |
||||
retry_until_ok!(instance.get_agent_jobs(Some(*UID)).await).into_builtin_vec(); |
||||
process_request(job_requests, &instance).await; |
||||
let result: Vec<ExecResult> = pop_completed().await.into_iter().collect(); |
||||
if result.len() > 0 { |
||||
retry_until_ok!(instance.report(&result).await); |
||||
} |
||||
sleep(Duration::from_secs(5)).await; |
||||
} |
||||
} |
@ -1,89 +1,7 @@ |
||||
// TODO:
|
||||
// поддержка питона
|
||||
// резолв адреса управляющего сервера через DoT
|
||||
// кроссплатформенность (реализовать интерфейс для винды и никсов)
|
||||
// проверка обнов
|
||||
// самоуничтожение
|
||||
|
||||
#[macro_use] |
||||
extern crate log; |
||||
extern crate env_logger; |
||||
|
||||
use std::env; |
||||
use tokio::time::{sleep, Duration}; |
||||
use u_lib::{ |
||||
api::ClientHandler, |
||||
builder::JobBuilder, |
||||
cache::JobCache, |
||||
executor::pop_completed, |
||||
models::{AssignedJob, ExecResult}, |
||||
UID, |
||||
//daemonize
|
||||
}; |
||||
|
||||
#[macro_export] |
||||
macro_rules! retry_until_ok { |
||||
( $body:expr ) => { |
||||
loop { |
||||
match $body { |
||||
Ok(r) => break r, |
||||
Err(e) => error!("{:?}", e), |
||||
}; |
||||
sleep(Duration::from_secs(5)).await; |
||||
} |
||||
}; |
||||
} |
||||
|
||||
async fn process_request(job_requests: Vec<AssignedJob>, client: &ClientHandler) { |
||||
if job_requests.len() > 0 { |
||||
for jr in &job_requests { |
||||
if !JobCache::contains(&jr.job_id) { |
||||
info!("Fetching job: {}", &jr.job_id); |
||||
let fetched_job = retry_until_ok!(client.get_jobs(Some(jr.job_id)).await) |
||||
.pop() |
||||
.unwrap(); |
||||
JobCache::insert(fetched_job); |
||||
} |
||||
} |
||||
info!( |
||||
"Scheduling jobs: \n{}", |
||||
job_requests |
||||
.iter() |
||||
.map(|j| j.job_id.to_string()) |
||||
.collect::<Vec<String>>() |
||||
.join("\n") |
||||
); |
||||
let mut builder = JobBuilder::from_request(job_requests); |
||||
let errors = builder.pop_errors(); |
||||
if errors.len() > 0 { |
||||
error!( |
||||
"Some errors encountered: \n{}", |
||||
errors |
||||
.iter() |
||||
.map(|j| j.to_string()) |
||||
.collect::<Vec<String>>() |
||||
.join("\n") |
||||
); |
||||
} |
||||
builder.unwrap_one().spawn().await; |
||||
} |
||||
} |
||||
use tokio; |
||||
use u_agent::run_forever; |
||||
|
||||
#[tokio::main] |
||||
async fn main() { |
||||
//daemonize();
|
||||
env_logger::init(); |
||||
let arg_ip = env::args().nth(1); |
||||
let instance = ClientHandler::new(arg_ip); |
||||
info!("Connecting to the server"); |
||||
loop { |
||||
let job_requests: Vec<AssignedJob> = |
||||
retry_until_ok!(instance.get_agent_jobs(Some(*UID)).await); |
||||
process_request(job_requests, &instance).await; |
||||
let result: Vec<ExecResult> = pop_completed().await.into_iter().collect(); |
||||
if result.len() > 0 { |
||||
retry_until_ok!(instance.report(&result).await) |
||||
} |
||||
sleep(Duration::from_secs(5)).await; |
||||
} |
||||
run_forever().await; |
||||
} |
||||
|
@ -0,0 +1,88 @@ |
||||
use crate::handlers::Endpoints; |
||||
use serde::de::DeserializeOwned; |
||||
use std::env; |
||||
use u_lib::{ |
||||
messaging::{AsMsg, BaseMessage}, |
||||
models::*, |
||||
}; |
||||
use uuid::Uuid; |
||||
use warp::{body, Filter, Rejection, Reply}; |
||||
|
||||
fn get_content<M>() -> impl Filter<Extract = (BaseMessage<'static, M>,), Error = Rejection> + Clone |
||||
where |
||||
M: AsMsg + Sync + Send + DeserializeOwned + 'static, |
||||
{ |
||||
body::content_length_limit(1024 * 64).and(body::json::<BaseMessage<M>>()) |
||||
} |
||||
|
||||
pub fn make_filters() -> impl Filter<Extract = (impl Reply,), Error = Rejection> + Clone { |
||||
let infallible_none = |_| async { Ok::<(Option<Uuid>,), std::convert::Infallible>((None,)) }; |
||||
|
||||
let get_agents = warp::get() |
||||
.and(warp::path("get_agents")) |
||||
.and( |
||||
warp::path::param::<Uuid>() |
||||
.map(Some) |
||||
.or_else(infallible_none), |
||||
) |
||||
.and_then(Endpoints::get_agents); |
||||
|
||||
let upload_jobs = warp::post() |
||||
.and(warp::path("upload_jobs")) |
||||
.and(get_content::<Vec<JobMeta>>()) |
||||
.and_then(Endpoints::upload_jobs); |
||||
|
||||
let get_jobs = warp::get() |
||||
.and(warp::path("get_jobs")) |
||||
.and( |
||||
warp::path::param::<Uuid>() |
||||
.map(Some) |
||||
.or_else(infallible_none), |
||||
) |
||||
.and_then(Endpoints::get_jobs); |
||||
|
||||
let get_agent_jobs = warp::get() |
||||
.and(warp::path("get_agent_jobs")) |
||||
.and( |
||||
warp::path::param::<Uuid>() |
||||
.map(Some) |
||||
.or_else(infallible_none), |
||||
) |
||||
.and_then(|uid| Endpoints::get_agent_jobs(uid, false)); |
||||
|
||||
let get_personal_jobs = warp::get() |
||||
.and(warp::path("get_agent_jobs")) |
||||
.and(warp::path::param::<Uuid>().map(Some)) |
||||
.and_then(|uid| Endpoints::get_agent_jobs(uid, true)); |
||||
|
||||
let del = warp::get() |
||||
.and(warp::path("del")) |
||||
.and(warp::path::param::<Uuid>()) |
||||
.and_then(Endpoints::del); |
||||
|
||||
let set_jobs = warp::post() |
||||
.and(warp::path("set_jobs")) |
||||
.and(warp::path::param::<Uuid>()) |
||||
.and(get_content::<Vec<String>>()) |
||||
.and_then(Endpoints::set_jobs); |
||||
|
||||
let report = warp::post() |
||||
.and(warp::path("report")) |
||||
.and(get_content::<Vec<ExecResult>>().and_then(Endpoints::report)); |
||||
|
||||
let auth_token = format!("Bearer {}", env::var("ADMIN_AUTH_TOKEN").unwrap()).into_boxed_str(); |
||||
let auth_header = warp::header::exact("authorization", Box::leak(auth_token)); |
||||
|
||||
let agent_zone = get_jobs.clone().or(get_personal_jobs).or(report); |
||||
|
||||
let auth_zone = auth_header.and( |
||||
get_agents |
||||
.or(get_jobs) |
||||
.or(upload_jobs) |
||||
.or(del) |
||||
.or(set_jobs) |
||||
.or(get_agent_jobs), |
||||
); |
||||
|
||||
agent_zone.or(auth_zone) |
||||
} |
@ -0,0 +1,102 @@ |
||||
#[macro_use] |
||||
extern crate log; |
||||
|
||||
#[macro_use] |
||||
extern crate mockall; |
||||
#[macro_use] |
||||
extern crate mockall_double; |
||||
|
||||
// because of linking errors
|
||||
extern crate openssl; |
||||
#[macro_use] |
||||
extern crate diesel; |
||||
//
|
||||
mod db; |
||||
mod filters; |
||||
mod handlers; |
||||
|
||||
use db::UDB; |
||||
use filters::make_filters; |
||||
use u_lib::{config::MASTER_PORT, models::*, utils::init_env}; |
||||
use warp::Filter; |
||||
|
||||
const LOGFILE: &str = "u_server.log"; |
||||
|
||||
fn prefill_jobs() { |
||||
let agent_hello = JobMeta::builder() |
||||
.with_type(misc::JobType::Manage) |
||||
.with_alias("agent_hello") |
||||
.build() |
||||
.unwrap(); |
||||
UDB::lock_db().insert_jobs(&[agent_hello]).ok(); |
||||
} |
||||
|
||||
fn init_logger() { |
||||
use simplelog::*; |
||||
use std::fs::OpenOptions; |
||||
let log_cfg = ConfigBuilder::new() |
||||
.set_time_format_str("%x %X") |
||||
.set_time_to_local(true) |
||||
.build(); |
||||
let logfile = OpenOptions::new() |
||||
.append(true) |
||||
.create(true) |
||||
.open(LOGFILE) |
||||
.unwrap(); |
||||
let loggers = vec![ |
||||
WriteLogger::new(LevelFilter::Info, log_cfg.clone(), logfile) as Box<dyn SharedLogger>, |
||||
TermLogger::new( |
||||
LevelFilter::Info, |
||||
log_cfg, |
||||
TerminalMode::Stderr, |
||||
ColorChoice::Auto, |
||||
), |
||||
]; |
||||
CombinedLogger::init(loggers).unwrap(); |
||||
} |
||||
|
||||
fn init_all() { |
||||
init_logger(); |
||||
init_env(); |
||||
prefill_jobs(); |
||||
} |
||||
|
||||
pub async fn serve() { |
||||
init_all(); |
||||
let routes = make_filters(); |
||||
warp::serve(routes.with(warp::log("warp"))) |
||||
.run(([0, 0, 0, 0], MASTER_PORT)) |
||||
.await; |
||||
} |
||||
|
||||
#[cfg(test)] |
||||
mod tests { |
||||
use super::*; |
||||
#[double] |
||||
use crate::handlers::Endpoints; |
||||
use handlers::build_ok; |
||||
use mockall::predicate::*; |
||||
use test_case::test_case; |
||||
use uuid::Uuid; |
||||
use warp::test::request; |
||||
|
||||
#[test_case(Some(Uuid::new_v4()))] |
||||
#[test_case(None => panics)] |
||||
#[tokio::test] |
||||
async fn test_get_agent_jobs_unauthorized(uid: Option<Uuid>) { |
||||
let mock = Endpoints::get_agent_jobs_context(); |
||||
mock.expect() |
||||
.with(eq(uid), eq(uid.is_some())) |
||||
.returning(|_, _| Ok(build_ok(""))); |
||||
request() |
||||
.path(&format!( |
||||
"/get_agent_jobs/{}", |
||||
uid.map(|u| u.simple().to_string()).unwrap_or(String::new()) |
||||
)) |
||||
.method("GET") |
||||
.filter(&make_filters()) |
||||
.await |
||||
.unwrap(); |
||||
mock.checkpoint(); |
||||
} |
||||
} |
@ -0,0 +1,23 @@ |
||||
[package] |
||||
name = "integration" |
||||
version = "0.1.0" |
||||
authors = ["plazmoid <kronos44@mail.ru>"] |
||||
edition = "2018" |
||||
|
||||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html |
||||
|
||||
[dependencies] |
||||
tokio = { version = "1.2.0", features = ["macros", "rt-multi-thread", "process", "time"] } |
||||
log = "^0.4" |
||||
env_logger = "0.8.3" |
||||
uuid = { version = "0.6.5", features = ["serde", "v4"] } |
||||
reqwest = { version = "0.11", features = ["json"] } |
||||
serde_json = "1.0" |
||||
serde = { version = "1.0.114", features = ["derive"] } |
||||
futures = "0.3.5" |
||||
shlex = "1.0.0" |
||||
|
||||
|
||||
[[test]] |
||||
name = "integration" |
||||
path = "tests/tests.rs" |
@ -0,0 +1,89 @@ |
||||
version: "2.1" |
||||
|
||||
networks: |
||||
u_net: |
||||
|
||||
services: |
||||
|
||||
u_server: |
||||
image: unki/u_server |
||||
networks: |
||||
- u_net |
||||
volumes: |
||||
- ../target/x86_64-unknown-linux-musl/release/u_server:/u_server |
||||
- ../:/unki/ |
||||
working_dir: /unki |
||||
command: bash -c "diesel setup && diesel migration run && /u_server" |
||||
depends_on: |
||||
u_db: |
||||
condition: service_healthy |
||||
expose: |
||||
- '63714' |
||||
environment: |
||||
RUST_LOG: warp=info |
||||
env_file: |
||||
- ../.env |
||||
healthcheck: |
||||
test: /bin/ss -tlpn | grep 63714 |
||||
interval: 5s |
||||
timeout: 2s |
||||
retries: 2 |
||||
|
||||
u_db: |
||||
image: unki/u_db |
||||
networks: |
||||
- u_net |
||||
expose: |
||||
- '5432' |
||||
env_file: |
||||
- ../.env |
||||
healthcheck: |
||||
test: /bin/ss -tlpn | grep 5432 |
||||
interval: 5s |
||||
timeout: 2s |
||||
retries: 2 |
||||
|
||||
u_agent_1: |
||||
image: unki/u_agent |
||||
networks: |
||||
- u_net |
||||
volumes: |
||||
- ../target/x86_64-unknown-linux-musl/release/u_agent:/u_agent |
||||
command: /u_agent u_server |
||||
depends_on: |
||||
u_server: |
||||
condition: service_healthy |
||||
|
||||
u_agent_2: |
||||
image: unki/u_agent |
||||
networks: |
||||
- u_net |
||||
volumes: |
||||
- ../target/x86_64-unknown-linux-musl/release/u_agent:/u_agent |
||||
command: /u_agent u_server |
||||
depends_on: |
||||
u_server: |
||||
condition: service_healthy |
||||
|
||||
tests_runner: |
||||
image: unki/tests_runner |
||||
networks: |
||||
- u_net |
||||
volumes: |
||||
- ./:/tests/ |
||||
- ../target/x86_64-unknown-linux-musl/release/u_panel:/u_panel |
||||
- ~/.cargo/registry:/root/.cargo/registry |
||||
working_dir: |
||||
/tests/ |
||||
env_file: |
||||
- ../.env |
||||
depends_on: |
||||
u_agent_1: |
||||
condition: service_started |
||||
u_agent_2: |
||||
condition: service_started |
||||
u_server: |
||||
condition: service_healthy |
||||
environment: |
||||
RUST_BACKTRACE: 1 |
||||
U_SERVER: u_server |
@ -0,0 +1,81 @@ |
||||
import subprocess |
||||
from utils import * |
||||
|
||||
BASE_IMAGE_DIR = 'images' |
||||
|
||||
DOCKERFILES = { |
||||
'u_agent': { |
||||
'ctx': BASE_IMAGE_DIR, |
||||
'dockerfile_prefix': 'u_agent' |
||||
}, |
||||
'u_server': { |
||||
'ctx': BASE_IMAGE_DIR, |
||||
'dockerfile_prefix': 'u_server' |
||||
}, |
||||
'u_db': { |
||||
'ctx': BASE_IMAGE_DIR, |
||||
'dockerfile_prefix': 'u_db' |
||||
}, |
||||
'tests_runner': { |
||||
'ctx': BASE_IMAGE_DIR, |
||||
'dockerfile_prefix': 'tests_runner' |
||||
}, |
||||
} |
||||
|
||||
|
||||
def docker(args): |
||||
cmd = ['docker'] + args |
||||
log(f'Running docker command: {cmd}') |
||||
return subprocess.run( |
||||
cmd, |
||||
check=True, |
||||
) |
||||
|
||||
|
||||
def print_errors(errors): |
||||
err_msg = '\n'.join( |
||||
' {container}: {error}'.format(container=item['container'], |
||||
error=item['error']) |
||||
for item in errors) |
||||
|
||||
err('There are some errors in next containers:\n%s' % err_msg) |
||||
|
||||
|
||||
def check_state(containers): |
||||
errors = [] |
||||
for container in containers: |
||||
ret, out = subprocess.getstatusoutput( |
||||
'docker inspect --format \'{{ .State.Running }}\' %s' |
||||
% container) |
||||
out = out.strip() |
||||
if ret == 0: |
||||
if out == 'true': |
||||
continue |
||||
else: |
||||
errors.append({'container': container, |
||||
'error': 'Bad state: Running=%s' % out}) |
||||
else: |
||||
errors.append({'container': container, |
||||
'error': out}) |
||||
|
||||
return errors |
||||
|
||||
|
||||
def rebuild_images_if_needed(force_rebuild=False): |
||||
for img_name, data in DOCKERFILES.items(): |
||||
ctx = data['ctx'] |
||||
df_prefix = data.get('dockerfile_prefix') |
||||
df_suffix = 'Dockerfile' |
||||
img_name = f'unki/{img_name}' |
||||
log(f'Building docker image {img_name}') |
||||
cmd = [ |
||||
'build', |
||||
'-t', |
||||
img_name, |
||||
ctx, |
||||
] |
||||
if df_prefix: |
||||
cmd += ['-f', f'{ctx}/{df_prefix}.{df_suffix}'] |
||||
if force_rebuild: |
||||
cmd += ['--no-cache'] |
||||
docker(cmd) |
@ -0,0 +1,66 @@ |
||||
import subprocess |
||||
import shlex |
||||
from utils import * |
||||
from docker import docker, check_state, print_errors |
||||
|
||||
|
||||
class Compose: |
||||
ALL_CONTAINERS = [ |
||||
'u_agent_1', |
||||
'u_agent_2', |
||||
'u_server', |
||||
'u_db', |
||||
'tests_runner', |
||||
] |
||||
|
||||
def __init__(self): |
||||
self.container_tpl = 'integration_%s_1' |
||||
self.cmd_container = self.container_tpl % 'tests_runner' |
||||
self.ALL_CONTAINERS = [self.container_tpl % c for c in self.ALL_CONTAINERS] |
||||
|
||||
def _call(self, *args): |
||||
subprocess.check_call([ |
||||
'docker-compose', |
||||
'--no-ansi', |
||||
] + list(args) |
||||
) |
||||
|
||||
def up(self): |
||||
log('Instanciating cluster') |
||||
self._call('up', '-d') |
||||
log('Ok') |
||||
|
||||
def down(self): |
||||
log('Shutting down cluster') |
||||
self._call('down') |
||||
log('Ok') |
||||
|
||||
def stop(self): |
||||
log('Stopping cluster') |
||||
self._call('stop') |
||||
log('Ok') |
||||
|
||||
def run(self, cmd): |
||||
container = self.cmd_container |
||||
if isinstance(cmd, str): |
||||
cmd = shlex.split(cmd) |
||||
log(f'Running command "{cmd}" in container {container}') |
||||
result = docker([ |
||||
'exec', |
||||
'-ti', |
||||
container |
||||
] + cmd) |
||||
log('Ok') |
||||
return result |
||||
|
||||
def is_alive(self): |
||||
log('Check if all containers are alive') |
||||
|
||||
errors = check_state(self.ALL_CONTAINERS) |
||||
log('Check done') |
||||
|
||||
if errors: |
||||
print_errors(errors) |
||||
raise TestsError('Error during `is_alive` check') |
||||
else: |
||||
log('All containers are alive') |
@ -0,0 +1,4 @@ |
||||
FROM rust:1.53 |
||||
|
||||
RUN rustup target add x86_64-unknown-linux-musl |
||||
CMD ["sleep", "3600"] |
@ -0,0 +1,3 @@ |
||||
FROM centos:7 |
||||
|
||||
RUN yum update -y |
@ -0,0 +1,3 @@ |
||||
FROM postgres:13.3 |
||||
|
||||
RUN apt update && apt -y upgrade && apt install -y iproute2 |
@ -0,0 +1,3 @@ |
||||
FROM rust:1.53 |
||||
|
||||
RUN cargo install diesel_cli --no-default-features --features postgres |
@ -0,0 +1,36 @@ |
||||
import signal |
||||
import sys |
||||
from utils import * |
||||
from docker import rebuild_images_if_needed |
||||
from docker_compose import Compose |
||||
|
||||
|
||||
cluster = Compose() |
||||
|
||||
|
||||
def abort_handler(s, _): |
||||
warn(f'Received signal: {s}') |
||||
warn(f'Gracefully stopping...') |
||||
cluster.down() |
||||
|
||||
|
||||
def run_tests(): |
||||
force_rebuild = '--rebuild' in sys.argv |
||||
preserve_containers = '--preserve' in sys.argv |
||||
for s in (signal.SIGTERM, signal.SIGINT, signal.SIGHUP): |
||||
signal.signal(s, abort_handler) |
||||
rebuild_images_if_needed(force_rebuild) |
||||
try: |
||||
cluster.up() |
||||
cluster.is_alive() |
||||
cluster.run('cargo test --test integration') |
||||
except Exception as e: |
||||
err(e) |
||||
sys.exit(1) |
||||
finally: |
||||
if not preserve_containers: |
||||
cluster.down() |
||||
|
||||
|
||||
if __name__ == '__main__': |
||||
run_tests() |
@ -0,0 +1,3 @@ |
||||
#!/bin/bash |
||||
set -e |
||||
python integration_tests.py $@ |
@ -0,0 +1,3 @@ |
||||
fn main() { |
||||
println!("Hello, world!"); |
||||
} |
@ -0,0 +1 @@ |
||||
|
@ -0,0 +1,48 @@ |
||||
use reqwest::{Client, RequestBuilder, Url}; |
||||
use serde::Serialize; |
||||
use serde_json::{from_str, json, Value}; |
||||
|
||||
const SERVER: &str = "u_server"; |
||||
const PORT: &str = "63714"; |
||||
|
||||
pub struct AgentClient { |
||||
client: Client, |
||||
base_url: Url, |
||||
} |
||||
|
||||
impl AgentClient { |
||||
pub fn new() -> Self { |
||||
Self { |
||||
client: Client::new(), |
||||
base_url: Url::parse(&format!("http://{}:{}", SERVER, PORT)).unwrap(), |
||||
} |
||||
} |
||||
|
||||
async fn process_request(&self, req: RequestBuilder, resp_needed: bool) -> Value { |
||||
let resp = req.send().await.unwrap(); |
||||
if let Err(e) = resp.error_for_status_ref() { |
||||
panic!( |
||||
"Server responded with code {}\nError: {}", |
||||
e.status() |
||||
.map(|s| s.to_string()) |
||||
.unwrap_or(String::from("<none>")), |
||||
e.to_string() |
||||
); |
||||
} |
||||
if !resp_needed { |
||||
return json!([]); |
||||
} |
||||
let resp: Value = from_str(&resp.text().await.unwrap()).unwrap(); |
||||
resp.get("inner").unwrap().get(0).unwrap().clone() |
||||
} |
||||
|
||||
pub async fn get<S: AsRef<str>>(&self, url: S) -> Value { |
||||
let req = self.client.get(self.base_url.join(url.as_ref()).unwrap()); |
||||
self.process_request(req, true).await |
||||
} |
||||
|
||||
pub async fn post<S: AsRef<str>, B: Serialize>(&self, url: S, body: &B) -> Value { |
||||
let req = self.client.post(self.base_url.join(url.as_ref()).unwrap()); |
||||
self.process_request(req.json(body), false).await |
||||
} |
||||
} |
@ -0,0 +1,5 @@ |
||||
pub mod client; |
||||
pub mod panel; |
||||
|
||||
pub use client::AgentClient; |
||||
pub use panel::Panel; |
@ -0,0 +1,53 @@ |
||||
use serde_json::{from_slice, Value}; |
||||
use shlex::split; |
||||
use std::process::{Command, Output}; |
||||
|
||||
const PANEL_BINARY: &str = "/u_panel"; |
||||
|
||||
pub struct Panel; |
||||
|
||||
impl Panel { |
||||
fn run(args: &[&str]) -> Output { |
||||
Command::new(PANEL_BINARY) |
||||
.arg("--json") |
||||
.args(args) |
||||
.output() |
||||
.unwrap() |
||||
} |
||||
|
||||
pub fn output_argv(args: &[&str]) -> Value { |
||||
let result = Self::run(args); |
||||
assert!(result.status.success()); |
||||
from_slice(&result.stdout).unwrap() |
||||
} |
||||
|
||||
pub fn output<S: Into<String>>(args: S) -> Value { |
||||
let splitted = split(args.into().as_ref()).unwrap(); |
||||
Self::output_argv( |
||||
splitted |
||||
.iter() |
||||
.map(|s| s.as_ref()) |
||||
.collect::<Vec<&str>>() |
||||
.as_ref(), |
||||
) |
||||
} |
||||
|
||||
fn status_is_ok(data: &Value) { |
||||
assert_eq!( |
||||
data["status"], "ok", |
||||
"Panel failed with erroneous status: {}", |
||||
data["data"] |
||||
); |
||||
} |
||||
|
||||
pub fn check_status<S: Into<String>>(args: S) { |
||||
let result = Self::output(args); |
||||
Self::status_is_ok(&result); |
||||
} |
||||
|
||||
pub fn check_output<S: Into<String>>(args: S) -> Vec<Value> { |
||||
let result = Self::output(args); |
||||
Self::status_is_ok(&result); |
||||
result["data"].as_array().unwrap().clone() |
||||
} |
||||
} |
@ -0,0 +1,80 @@ |
||||
mod helpers; |
||||
|
||||
use helpers::{AgentClient, Panel}; |
||||
|
||||
use serde_json::json; |
||||
use std::thread::sleep; |
||||
use std::time::Duration; |
||||
use uuid::Uuid; |
||||
|
||||
type TestResult<R = ()> = Result<R, Box<dyn std::error::Error>>; |
||||
|
||||
async fn register_agent() -> Uuid { |
||||
let cli = AgentClient::new(); |
||||
let agent_uid = Uuid::new_v4(); |
||||
let resp = cli.get(format!("get_agent_jobs/{}", agent_uid)).await; |
||||
let job_id = &resp["job_id"]; |
||||
let resp = cli.get(format!("get_jobs/{}", job_id)).await; |
||||
assert_eq!(&resp["alias"], "agent_hello"); |
||||
let agent_data = json! { |
||||
{"id": &agent_uid,"inner":[ |
||||
{"Agent": |
||||
{"alias":null, |
||||
"hostname":"3b1030fa6324", |
||||
"id":&agent_uid, |
||||
"is_root":false, |
||||
"is_root_allowed":false, |
||||
"last_active":{"secs_since_epoch":1625271265,"nanos_since_epoch":92814921}, |
||||
"platform":"x86_64-unknown-linux-gnu", |
||||
"regtime":{"secs_since_epoch":1625271265,"nanos_since_epoch":92814945}, |
||||
"state":"New", |
||||
"token":null, |
||||
"username":"root"} |
||||
} |
||||
]} |
||||
}; |
||||
cli.post("report", &agent_data).await; |
||||
agent_uid |
||||
} |
||||
|
||||
#[tokio::test] |
||||
async fn test_registration() -> TestResult { |
||||
let agent_uid = register_agent().await; |
||||
let agents = Panel::check_output("agents list"); |
||||
let found = agents |
||||
.iter() |
||||
.find(|v| v["id"].as_str().unwrap() == agent_uid.to_string()); |
||||
assert!(found.is_some()); |
||||
Ok(()) |
||||
} |
||||
|
||||
#[tokio::test] |
||||
async fn test_setup_tasks() -> TestResult { |
||||
let agent_uid = Panel::check_output("agents list")[0]["id"].clone(); |
||||
dbg!(&agent_uid); |
||||
let job_alias = "passwd_contents"; |
||||
let cmd = format!("jobs add --alias {} 'cat /etc/passwd'", job_alias); |
||||
Panel::check_status(cmd); |
||||
let cmd = format!("jobmap add {} {}", agent_uid, job_alias); |
||||
let assigned_uids = Panel::check_output(cmd); |
||||
dbg!(&assigned_uids); |
||||
loop { |
||||
let result = Panel::check_output(format!("jobmap list {}", assigned_uids[0])); |
||||
dbg!(&result); |
||||
match result.get(0) { |
||||
Some(entry) if entry["state"] == "Finished" => { |
||||
println!("{}", result[0]); |
||||
break; |
||||
} |
||||
None => { |
||||
eprintln!("jobmap list is empty (bad bad bad)"); |
||||
continue; |
||||
} |
||||
_ => { |
||||
sleep(Duration::from_secs(1)); |
||||
eprintln!("waiting for task"); |
||||
} |
||||
} |
||||
} |
||||
Ok(()) |
||||
} |
@ -0,0 +1,32 @@ |
||||
from termcolor import colored |
||||
|
||||
__all__ = ['log', 'warn', 'err', 'TestsError'] |
||||
|
||||
|
||||
class TestsError(Exception): |
||||
pass |
||||
|
||||
|
||||
COLORS = { |
||||
'question': colored('[?]', 'magenta'), |
||||
'info': colored('[~]', 'green'), |
||||
'warning': colored('[!]', 'yellow'), |
||||
'error': colored('[X]', 'red'), |
||||
} |
||||
|
||||
|
||||
def warn(msg): |
||||
log(msg, log_lvl='w') |
||||
|
||||
|
||||
def err(msg): |
||||
log(msg, log_lvl='e') |
||||
|
||||
|
||||
def log(msg, log_lvl='i'): |
||||
for lvl, text in COLORS.items(): |
||||
if lvl.startswith(log_lvl): |
||||
print(f'{text} {msg}') |
||||
break |
||||
else: |
||||
ValueError('Unknown log level') |
@ -0,0 +1,14 @@ |
||||
use std::path::PathBuf; |
||||
use std::process::Command; |
||||
|
||||
fn main() { |
||||
let echoer = PathBuf::from("./tests/fixtures/echoer"); |
||||
let mut echoer_src = echoer.clone(); |
||||
echoer_src.set_extension("rs"); |
||||
Command::new("rustc") |
||||
.args(&[echoer_src.to_str().unwrap(), "-o", echoer.to_str().unwrap()]) |
||||
.status() |
||||
.unwrap(); |
||||
println!("cargo:rerun-if-changed={}", echoer_src.display()); |
||||
println!("cargo:rerun-if-changed={}", echoer.display()); |
||||
} |
@ -1,161 +0,0 @@ |
||||
use crate::UError; |
||||
use chrono::{offset::Local, DateTime}; |
||||
use nix::{ |
||||
sys::signal::{signal, SigHandler, Signal}, |
||||
unistd::{chdir, close as fdclose, fork, getppid, setsid, ForkResult}, |
||||
}; |
||||
use std::{ |
||||
env::temp_dir, fs, ops::Drop, os::unix::fs::PermissionsExt, path::PathBuf, process::exit, |
||||
time::SystemTime, |
||||
}; |
||||
use uuid::Uuid; |
||||
|
||||
pub trait OneOrMany<T> { |
||||
fn into_vec(self) -> Vec<T>; |
||||
} |
||||
|
||||
impl<T> OneOrMany<T> for T { |
||||
fn into_vec(self) -> Vec<T> { |
||||
vec![self] |
||||
} |
||||
} |
||||
|
||||
impl<T> OneOrMany<T> for Vec<T> { |
||||
fn into_vec(self) -> Vec<T> { |
||||
self |
||||
} |
||||
} |
||||
|
||||
pub fn daemonize() { |
||||
if getppid().as_raw() != 1 { |
||||
setsig(Signal::SIGTTOU, SigHandler::SigIgn); |
||||
setsig(Signal::SIGTTIN, SigHandler::SigIgn); |
||||
setsig(Signal::SIGTSTP, SigHandler::SigIgn); |
||||
} |
||||
for fd in 0..=2 { |
||||
match fdclose(fd) { |
||||
_ => (), |
||||
} |
||||
} |
||||
match chdir("/") { |
||||
_ => (), |
||||
}; |
||||
|
||||
match fork() { |
||||
Ok(ForkResult::Parent { .. }) => { |
||||
exit(0); |
||||
} |
||||
Ok(ForkResult::Child) => match setsid() { |
||||
_ => (), |
||||
}, |
||||
Err(_) => exit(255), |
||||
} |
||||
} |
||||
|
||||
pub fn setsig(sig: Signal, hnd: SigHandler) { |
||||
unsafe { |
||||
signal(sig, hnd).unwrap(); |
||||
} |
||||
} |
||||
|
||||
pub fn vec_to_string(v: &[u8]) -> String { |
||||
String::from_utf8_lossy(v).to_string() |
||||
} |
||||
|
||||
pub fn opt_to_string<T: ToString>(item: Option<T>) -> String { |
||||
match item { |
||||
Some(s) => s.to_string(), |
||||
None => String::new(), |
||||
} |
||||
} |
||||
|
||||
pub fn systime_to_string(time: &SystemTime) -> String { |
||||
DateTime::<Local>::from(*time) |
||||
.format("%d/%m/%Y %T") |
||||
.to_string() |
||||
} |
||||
|
||||
pub struct TempFile { |
||||
path: PathBuf, |
||||
} |
||||
|
||||
impl TempFile { |
||||
pub fn get_path(&self) -> String { |
||||
self.path.to_string_lossy().to_string() |
||||
} |
||||
|
||||
pub fn new() -> Self { |
||||
let name = Uuid::simple(&Uuid::new_v4()).to_string(); |
||||
let mut path = temp_dir(); |
||||
path.push(name); |
||||
Self { path } |
||||
} |
||||
|
||||
pub fn write_all(&self, data: &[u8]) -> Result<(), String> { |
||||
fs::write(&self.path, data).map_err(|e| e.to_string()) |
||||
} |
||||
|
||||
pub fn write_exec(data: &[u8]) -> Result<Self, (String, String)> { |
||||
let this = Self::new(); |
||||
let path = this.get_path(); |
||||
this.write_all(data).map_err(|e| (path.clone(), e))?; |
||||
let perms = fs::Permissions::from_mode(0o555); |
||||
fs::set_permissions(&path, perms).map_err(|e| (path, e.to_string()))?; |
||||
Ok(this) |
||||
} |
||||
} |
||||
|
||||
impl Drop for TempFile { |
||||
fn drop(&mut self) { |
||||
fs::remove_file(&self.path).ok(); |
||||
} |
||||
} |
||||
|
||||
pub struct CombinedResult<T, E = UError> { |
||||
ok: Vec<T>, |
||||
err: Vec<E>, |
||||
} |
||||
|
||||
impl<T, E> CombinedResult<T, E> { |
||||
pub fn new() -> Self { |
||||
Self { |
||||
ok: vec![], |
||||
err: vec![], |
||||
} |
||||
} |
||||
|
||||
pub fn ok<I: OneOrMany<T>>(&mut self, result: I) { |
||||
self.ok.extend(result.into_vec()); |
||||
} |
||||
|
||||
pub fn err<I: OneOrMany<E>>(&mut self, err: I) { |
||||
self.err.extend(err.into_vec()); |
||||
} |
||||
|
||||
pub fn unwrap(self) -> Vec<T> { |
||||
let err_len = self.err.len(); |
||||
if err_len > 0 { |
||||
panic!("CombinedResult has {} errors", err_len); |
||||
} |
||||
self.ok |
||||
} |
||||
|
||||
pub fn unwrap_one(self) -> T { |
||||
self.unwrap().pop().unwrap() |
||||
} |
||||
|
||||
pub fn pop_errors(&mut self) -> Vec<E> { |
||||
self.err.drain(..).collect() |
||||
} |
||||
} |
||||
|
||||
#[macro_export] |
||||
macro_rules! unwrap_enum { |
||||
($src:ident, $t:path) => { |
||||
if let $t(result) = $src { |
||||
result |
||||
} else { |
||||
panic!("wrong type") |
||||
} |
||||
}; |
||||
} |
@ -0,0 +1,40 @@ |
||||
use crate::utils::OneOrVec; |
||||
use crate::UError; |
||||
|
||||
pub struct CombinedResult<T, E = UError> { |
||||
ok: Vec<T>, |
||||
err: Vec<E>, |
||||
} |
||||
|
||||
impl<T, E> CombinedResult<T, E> { |
||||
pub fn new() -> Self { |
||||
Self { |
||||
ok: vec![], |
||||
err: vec![], |
||||
} |
||||
} |
||||
|
||||
pub fn ok<I: OneOrVec<T>>(&mut self, result: I) { |
||||
self.ok.extend(result.into_vec()); |
||||
} |
||||
|
||||
pub fn err<I: OneOrVec<E>>(&mut self, err: I) { |
||||
self.err.extend(err.into_vec()); |
||||
} |
||||
|
||||
pub fn unwrap(self) -> Vec<T> { |
||||
let err_len = self.err.len(); |
||||
if err_len > 0 { |
||||
panic!("CombinedResult has {} errors", err_len); |
||||
} |
||||
self.ok |
||||
} |
||||
|
||||
pub fn unwrap_one(self) -> T { |
||||
self.unwrap().pop().unwrap() |
||||
} |
||||
|
||||
pub fn pop_errors(&mut self) -> Vec<E> { |
||||
self.err.drain(..).collect() |
||||
} |
||||
} |
@ -0,0 +1,19 @@ |
||||
use chrono::{offset::Local, DateTime}; |
||||
use std::time::SystemTime; |
||||
|
||||
pub fn bytes_to_string(v: &[u8]) -> String { |
||||
String::from_utf8_lossy(v).to_string() |
||||
} |
||||
|
||||
pub fn opt_to_string<T: ToString>(item: Option<T>) -> String { |
||||
match item { |
||||
Some(s) => s.to_string(), |
||||
None => String::new(), |
||||
} |
||||
} |
||||
|
||||
pub fn systime_to_string(time: &SystemTime) -> String { |
||||
DateTime::<Local>::from(*time) |
||||
.format("%d/%m/%Y %T") |
||||
.to_string() |
||||
} |
@ -0,0 +1,71 @@ |
||||
use nix::{ |
||||
sys::signal::{signal, SigHandler, Signal}, |
||||
unistd::{chdir, close as fdclose, fork, getppid, setsid, ForkResult}, |
||||
}; |
||||
use std::process::exit; |
||||
|
||||
pub trait OneOrVec<T> { |
||||
fn into_vec(self) -> Vec<T>; |
||||
} |
||||
|
||||
impl<T> OneOrVec<T> for T { |
||||
fn into_vec(self) -> Vec<T> { |
||||
vec![self] |
||||
} |
||||
} |
||||
|
||||
impl<T> OneOrVec<T> for Vec<T> { |
||||
fn into_vec(self) -> Vec<T> { |
||||
self |
||||
} |
||||
} |
||||
|
||||
#[macro_export] |
||||
macro_rules! unwrap_enum { |
||||
($src:ident, $t:path) => { |
||||
if let $t(result) = $src { |
||||
result |
||||
} else { |
||||
panic!("wrong type") |
||||
} |
||||
}; |
||||
} |
||||
|
||||
pub fn daemonize() { |
||||
if getppid().as_raw() != 1 { |
||||
setsig(Signal::SIGTTOU, SigHandler::SigIgn); |
||||
setsig(Signal::SIGTTIN, SigHandler::SigIgn); |
||||
setsig(Signal::SIGTSTP, SigHandler::SigIgn); |
||||
} |
||||
for fd in 0..=2 { |
||||
match fdclose(fd) { |
||||
_ => (), |
||||
} |
||||
} |
||||
match chdir("/") { |
||||
_ => (), |
||||
}; |
||||
|
||||
match fork() { |
||||
Ok(ForkResult::Parent { .. }) => { |
||||
exit(0); |
||||
} |
||||
Ok(ForkResult::Child) => match setsid() { |
||||
_ => (), |
||||
}, |
||||
Err(_) => exit(255), |
||||
} |
||||
} |
||||
|
||||
pub fn setsig(sig: Signal, hnd: SigHandler) { |
||||
unsafe { |
||||
signal(sig, hnd).unwrap(); |
||||
} |
||||
} |
||||
|
||||
pub fn init_env() { |
||||
let envs = [".env"]; |
||||
for envfile in &envs { |
||||
dotenv::from_filename(envfile).ok(); |
||||
} |
||||
} |
@ -0,0 +1,11 @@ |
||||
pub mod combined_result; |
||||
pub mod conv; |
||||
pub mod misc; |
||||
pub mod tempfile; |
||||
pub mod vec_display; |
||||
|
||||
pub use combined_result::*; |
||||
pub use conv::*; |
||||
pub use misc::*; |
||||
pub use tempfile::*; |
||||
pub use vec_display::*; |
@ -0,0 +1,38 @@ |
||||
use std::{env::temp_dir, fs, ops::Drop, os::unix::fs::PermissionsExt, path::PathBuf}; |
||||
use uuid::Uuid; |
||||
|
||||
pub struct TempFile { |
||||
path: PathBuf, |
||||
} |
||||
|
||||
impl TempFile { |
||||
pub fn get_path(&self) -> String { |
||||
self.path.to_string_lossy().to_string() |
||||
} |
||||
|
||||
pub fn new() -> Self { |
||||
let name = Uuid::simple(&Uuid::new_v4()).to_string(); |
||||
let mut path = temp_dir(); |
||||
path.push(name); |
||||
Self { path } |
||||
} |
||||
|
||||
pub fn write_all(&self, data: &[u8]) -> Result<(), String> { |
||||
fs::write(&self.path, data).map_err(|e| e.to_string()) |
||||
} |
||||
|
||||
pub fn write_exec(data: &[u8]) -> Result<Self, (String, String)> { |
||||
let this = Self::new(); |
||||
let path = this.get_path(); |
||||
this.write_all(data).map_err(|e| (path.clone(), e))?; |
||||
let perms = fs::Permissions::from_mode(0o555); |
||||
fs::set_permissions(&path, perms).map_err(|e| (path, e.to_string()))?; |
||||
Ok(this) |
||||
} |
||||
} |
||||
|
||||
impl Drop for TempFile { |
||||
fn drop(&mut self) { |
||||
fs::remove_file(&self.path).ok(); |
||||
} |
||||
} |
@ -0,0 +1,40 @@ |
||||
use crate::{messaging::AsMsg, utils::OneOrVec}; |
||||
use serde::{Deserialize, Serialize}; |
||||
use std::fmt::{self, Display, Formatter}; |
||||
use std::ops::{Deref, DerefMut}; |
||||
|
||||
#[derive(Serialize, Deserialize, Clone, Default)] |
||||
pub struct VecDisplay<T: AsMsg + Display>(pub Vec<T>); |
||||
|
||||
impl<T: AsMsg + Display> VecDisplay<T> { |
||||
pub fn new<I: OneOrVec<T>>(inner: I) -> Self { |
||||
VecDisplay(inner.into_vec()) |
||||
} |
||||
|
||||
pub fn into_builtin_vec(self) -> Vec<T> { |
||||
self.0 |
||||
} |
||||
} |
||||
|
||||
impl<T: AsMsg + Display> Deref for VecDisplay<T> { |
||||
type Target = Vec<T>; |
||||
|
||||
fn deref(&self) -> &Self::Target { |
||||
&self.0 |
||||
} |
||||
} |
||||
|
||||
impl<T: AsMsg + Display> DerefMut for VecDisplay<T> { |
||||
fn deref_mut(&mut self) -> &mut Self::Target { |
||||
&mut self.0 |
||||
} |
||||
} |
||||
|
||||
impl<T: AsMsg + Display> Display for VecDisplay<T> { |
||||
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { |
||||
for (i, itm) in self.0.iter().enumerate() { |
||||
writeln!(f, "### {}:\n{}\n", i, itm)?; |
||||
} |
||||
Ok(()) |
||||
} |
||||
} |
@ -1,17 +0,0 @@ |
||||
/* |
||||
use std::fmt::Display; |
||||
use u_api_proc_macro::api_route; |
||||
use uuid::Uuid; |
||||
|
||||
struct Paths; |
||||
struct ClientHandler; |
||||
|
||||
#[test] |
||||
fn test_api_proc_macro() { |
||||
#[api_route("GET", Uuid)] |
||||
fn list<T: Display>(&self, msg: T) -> String {} |
||||
|
||||
#[api_route("POST", Uuid)] |
||||
fn report<T: Display>(&self, msg: T) -> String {} |
||||
} |
||||
*/ |
Binary file not shown.
@ -1,161 +0,0 @@ |
||||
use std::{time::SystemTime}; |
||||
use u_lib::{ |
||||
errors::UError, |
||||
models::{ |
||||
jobs::{JobMeta}, |
||||
ExecResult, |
||||
misc::JobType |
||||
}, |
||||
builder::{JobBuilder, NamedJobBuilder} |
||||
}; |
||||
|
||||
type TestResult<R = ()> = Result<R, Box<dyn std::error::Error>>; |
||||
|
||||
#[tokio::test] |
||||
async fn test_is_really_async() { |
||||
const SLEEP_SECS: u64 = 1; |
||||
let job = JobMeta::from_shell(format!("sleep {}", SLEEP_SECS)).unwrap(); |
||||
let sleep_jobs: Vec<JobMeta> = (0..50).map(|_| job.clone()).collect(); |
||||
let now = SystemTime::now(); |
||||
JobBuilder::from_meta(sleep_jobs).unwrap_one().wait().await; |
||||
assert!(now.elapsed().unwrap().as_secs() < SLEEP_SECS + 2) |
||||
} |
||||
|
||||
#[test_case(
|
||||
"/bin/sh {}",
|
||||
Some(b"echo test01 > /tmp/asd; cat /tmp/asd"),
|
||||
"test01" |
||||
;"sh payload" |
||||
)] |
||||
#[test_case(
|
||||
r#"/usr/bin/python -c 'print("test02")'"#,
|
||||
None,
|
||||
"test02" |
||||
;"python cmd" |
||||
)] |
||||
#[test_case(
|
||||
"/{}",
|
||||
Some( |
||||
br#"#!/bin/sh |
||||
TMPPATH=/tmp/lol |
||||
mkdir -p $TMPPATH |
||||
echo test03 > $TMPPATH/t |
||||
cat $TMPPATH/t"# |
||||
), |
||||
"test03" |
||||
;"sh multiline payload" |
||||
)] |
||||
#[test_case(
|
||||
"/{} 'some msg as arg'", |
||||
Some(include_bytes!("../fixtures/echoer")), |
||||
"some msg as arg" |
||||
;"standalone binary with args" |
||||
)] |
||||
#[tokio::test] |
||||
async fn test_shell_job(cmd: &str, payload: Option<&[u8]>, expected_result: &str) -> TestResult { |
||||
let mut job = JobMeta::builder().with_shell(cmd);
|
||||
if let Some(p) = payload { |
||||
job = job.with_payload(p); |
||||
} |
||||
let job = job.build().unwrap();
|
||||
let job_result = JobBuilder::from_meta(job).unwrap_one().wait_one().await; |
||||
let result = unwrap_enum!(job_result, ExecResult::Assigned); |
||||
let result = result.to_string_result().unwrap(); |
||||
assert_eq!(result.trim(), expected_result); |
||||
Ok(()) |
||||
} |
||||
|
||||
#[tokio::test] |
||||
async fn test_complex_load() -> TestResult { |
||||
const SLEEP_SECS: u64 = 1; |
||||
let now = SystemTime::now(); |
||||
let longest_job = JobMeta::from_shell(format!("sleep {}", SLEEP_SECS)).unwrap(); |
||||
let longest_job = JobBuilder::from_meta(longest_job).unwrap_one().spawn().await;
|
||||
let ls = JobBuilder::from_meta(JobMeta::from_shell("ls")?).unwrap_one() |
||||
.wait_one() |
||||
.await; |
||||
let ls = unwrap_enum!(ls, ExecResult::Assigned); |
||||
assert_eq!(ls.retcode.unwrap(), 0); |
||||
let folders = ls.to_string_result().unwrap(); |
||||
let subfolders_jobs: Vec<JobMeta> = folders |
||||
.lines() |
||||
.map(|f| JobMeta::from_shell(format!("ls {}", f)).unwrap()) |
||||
.collect(); |
||||
let ls_subfolders = JobBuilder::from_meta(subfolders_jobs) |
||||
.unwrap_one() |
||||
.wait() |
||||
.await; |
||||
for result in ls_subfolders { |
||||
let result = unwrap_enum!(result, ExecResult::Assigned); |
||||
assert_eq!(result.retcode.unwrap(), 0); |
||||
} |
||||
longest_job.wait().await; |
||||
assert_eq!(now.elapsed().unwrap().as_secs(), SLEEP_SECS); |
||||
Ok(()) |
||||
} |
||||
/* |
||||
#[tokio::test] |
||||
async fn test_exec_multiple_jobs_nowait() -> UResult<()> { |
||||
const REPEATS: usize = 10; |
||||
let job = JobMeta::from_shell("whoami"); |
||||
let sleep_jobs: Vec<JobMeta> = (0..=REPEATS).map(|_| job.clone()).collect(); |
||||
build_jobs(sleep_jobs).spawn().await; |
||||
let mut completed = 0; |
||||
while completed < REPEATS { |
||||
let c = pop_completed().await.len(); |
||||
if c > 0 { |
||||
completed += c; |
||||
println!("{}", c); |
||||
} |
||||
} |
||||
Ok(()) |
||||
} |
||||
*/ |
||||
#[tokio::test] |
||||
async fn test_failing_shell_job() -> TestResult { |
||||
let job = JobMeta::from_shell("lol_kek_puk")?; |
||||
let job_result = JobBuilder::from_meta(job) |
||||
.unwrap_one() |
||||
.wait_one() |
||||
.await; |
||||
let job_result = unwrap_enum!(job_result, ExecResult::Assigned); |
||||
let output = job_result.to_string_result().unwrap(); |
||||
assert!(output.contains("No such file")); |
||||
assert!(job_result.retcode.is_none()); |
||||
Ok(()) |
||||
} |
||||
|
||||
#[test_case(
|
||||
"/bin/bash {}",
|
||||
None,
|
||||
"contains executable" |
||||
; "no binary" |
||||
)] |
||||
#[test_case(
|
||||
"/bin/bash",
|
||||
Some(b"whoami"),
|
||||
"contains no executable" |
||||
; "no path to binary" |
||||
)] |
||||
#[tokio::test] |
||||
async fn test_job_building_failed(cmd: &str, payload: Option<&[u8]>, err_str: &str) -> TestResult { |
||||
let mut job = JobMeta::builder().with_shell(cmd); |
||||
if let Some(p) = payload { |
||||
job = job.with_payload(p); |
||||
} |
||||
let err = job.build().unwrap_err(); |
||||
let err_msg = unwrap_enum!(err, UError::JobArgsError); |
||||
assert!(err_msg.contains(err_str)); |
||||
Ok(()) |
||||
} |
||||
|
||||
#[tokio::test] |
||||
async fn test_different_job_types() -> TestResult { |
||||
let mut jobs = NamedJobBuilder::from_meta(vec![ |
||||
("sleeper", JobMeta::from_shell("sleep 3")?), |
||||
("gatherer", JobMeta::builder().with_type(JobType::Manage).build()?) |
||||
]).wait().await; |
||||
let gathered = jobs.pop("gatherer"); |
||||
assert_eq!(unwrap_enum!(gathered, ExecResult::Agent).alias, None); |
||||
Ok(()) |
||||
} |
@ -1,43 +0,0 @@ |
||||
use u_lib::{models::JobOutput, utils::vec_to_string}; |
||||
|
||||
const STDOUT: &str = "<***STDOUT***>"; |
||||
const STDERR: &str = "<***STDERR***>"; |
||||
|
||||
#[test_case(
|
||||
"lol", |
||||
"kek", |
||||
&format!("{}lol{}kek", STDOUT, STDERR) |
||||
;"stdout stderr" |
||||
)] |
||||
#[test_case(
|
||||
"", |
||||
"kek", |
||||
&format!("{}kek", STDERR) |
||||
;"stderr" |
||||
)] |
||||
fn test_to_combined(stdout: &str, stderr: &str, result: &str) { |
||||
let output = JobOutput::new() |
||||
.stdout(stdout.as_bytes().to_vec()) |
||||
.stderr(stderr.as_bytes().to_vec()); |
||||
assert_eq!(&vec_to_string(&output.into_combined()), result) |
||||
} |
||||
|
||||
#[test_case(
|
||||
&format!("{}lal{}kik", STDOUT, STDERR), |
||||
"lal\nkik" |
||||
;"stdout stderr" |
||||
)] |
||||
#[test_case(
|
||||
&format!("{}qeq", STDOUT), |
||||
"qeq" |
||||
;"stdout" |
||||
)] |
||||
#[test_case(
|
||||
&format!("{}vev", STDERR), |
||||
"vev" |
||||
;"stderr" |
||||
)] |
||||
fn test_from_combined(src: &str, result: &str) { |
||||
let output = JobOutput::from_combined(src.as_bytes()).unwrap(); |
||||
assert_eq!(vec_to_string(&output.to_appropriate()).trim(), result); |
||||
} |
@ -1,10 +0,0 @@ |
||||
#[macro_use] |
||||
extern crate test_case; |
||||
|
||||
#[macro_use] |
||||
extern crate u_lib; |
||||
|
||||
mod jobs { |
||||
mod execution; |
||||
mod output; |
||||
} |
@ -1,10 +1,11 @@ |
||||
#!/bin/bash |
||||
set -e |
||||
source $(dirname $0)/rootdir.sh #set ROOTDIR |
||||
umask 002 |
||||
docker run \ |
||||
-v $ROOTDIR:/volume \ |
||||
-v cargo-cache:/root/.cargo/registry \ |
||||
-w /volume \ |
||||
-it \ |
||||
clux/muslrust \ |
||||
unki/musllibs \ |
||||
cargo $@ |
||||
|
Loading…
Reference in new issue