Merge branch '14-integration-tests' into 'master'

Resolve "Integration tests"

Closes #14

See merge request root/unki!4
4-update-check
ortem 4 years ago
commit 84c63329bc
  1. 3
      .cargo/config.toml
  2. 4
      .env
  3. 8
      .gitignore
  4. 6
      Cargo.toml
  5. 18
      Makefile
  6. 1
      bin/u_agent/Cargo.toml
  7. 88
      bin/u_agent/src/lib.rs
  8. 88
      bin/u_agent/src/main.rs
  9. 3
      bin/u_panel/Cargo.toml
  10. 110
      bin/u_panel/src/main.rs
  11. 16
      bin/u_server/Cargo.toml
  12. 34
      bin/u_server/src/db.rs
  13. 88
      bin/u_server/src/filters.rs
  14. 220
      bin/u_server/src/handlers.rs
  15. 102
      bin/u_server/src/lib.rs
  16. 123
      bin/u_server/src/main.rs
  17. 23
      integration/Cargo.toml
  18. 89
      integration/docker-compose.yml
  19. 81
      integration/docker.py
  20. 66
      integration/docker_compose.py
  21. 4
      integration/images/tests_runner.Dockerfile
  22. 3
      integration/images/u_agent.Dockerfile
  23. 3
      integration/images/u_db.Dockerfile
  24. 3
      integration/images/u_server.Dockerfile
  25. 36
      integration/integration_tests.py
  26. 3
      integration/integration_tests.sh
  27. 3
      integration/src/main.rs
  28. 1
      integration/tests/behaviour.rs
  29. 48
      integration/tests/helpers/client.rs
  30. 5
      integration/tests/helpers/mod.rs
  31. 53
      integration/tests/helpers/panel.rs
  32. 80
      integration/tests/tests.rs
  33. 32
      integration/utils.py
  34. 3
      lib/u_lib/Cargo.toml
  35. 14
      lib/u_lib/build.rs
  36. 27
      lib/u_lib/src/api.rs
  37. 179
      lib/u_lib/src/builder.rs
  38. 5
      lib/u_lib/src/config.rs
  39. 6
      lib/u_lib/src/executor.rs
  40. 3
      lib/u_lib/src/lib.rs
  41. 6
      lib/u_lib/src/messaging.rs
  42. 3
      lib/u_lib/src/models/agent.rs
  43. 48
      lib/u_lib/src/models/jobs/misc.rs
  44. 13
      lib/u_lib/src/models/mod.rs
  45. 161
      lib/u_lib/src/utils.rs
  46. 40
      lib/u_lib/src/utils/combined_result.rs
  47. 19
      lib/u_lib/src/utils/conv.rs
  48. 71
      lib/u_lib/src/utils/misc.rs
  49. 11
      lib/u_lib/src/utils/mod.rs
  50. 38
      lib/u_lib/src/utils/tempfile.rs
  51. 40
      lib/u_lib/src/utils/vec_display.rs
  52. 17
      lib/u_lib/tests/api_macro.rs
  53. BIN
      lib/u_lib/tests/fixtures/echoer
  54. 161
      lib/u_lib/tests/jobs/execution.rs
  55. 43
      lib/u_lib/tests/jobs/output.rs
  56. 10
      lib/u_lib/tests/tests.rs
  57. 3
      scripts/cargo_musl.sh

@ -1,3 +0,0 @@
[build]
target = "x86_64-unknown-linux-gnu" # -musl"

@ -1 +1,3 @@
export DATABASE_URL=postgres://postgres:12348756@172.17.0.2/u_db ADMIN_AUTH_TOKEN=464af63dbd241969baa1e94b2461d94d
POSTGRES_PASSWORD=12348756
DATABASE_URL=postgres://postgres:${POSTGRES_PASSWORD}@u_db/u_db

8
.gitignore vendored

@ -1,4 +1,6 @@
/target target/
**/*.rs.bk **/*.rs.bk
/.idea .idea/
/data data/
static/
**/*.pyc

@ -5,7 +5,8 @@ members = [
"bin/u_run", "bin/u_run",
"bin/u_server", "bin/u_server",
"lib/u_lib", "lib/u_lib",
"lib/u_api_proc_macro" "lib/u_api_proc_macro",
"integration"
] ]
[profile.release] [profile.release]
@ -13,5 +14,4 @@ panic = "abort"
[profile.dev] [profile.dev]
debug = true # Добавляет флаг `-g` для компилятора; debug = true # Добавляет флаг `-g` для компилятора;
opt-level = 0 opt-level = 0

@ -0,0 +1,18 @@
.PHONY: _pre_build debug release run clean
CARGO=./scripts/cargo_musl.sh
clean:
${CARGO} clean
_pre_build:
docker build -t unki/musllibs ./muslrust
debug: _pre_build
${CARGO} build
release: _pre_build
${CARGO} build --release
run: build
${CARGO} run

@ -13,4 +13,5 @@ log = "^0.4"
env_logger = "0.8.3" env_logger = "0.8.3"
uuid = "0.6.5" uuid = "0.6.5"
reqwest = { version = "0.11", features = ["json"] } reqwest = { version = "0.11", features = ["json"] }
openssl = "*"
u_lib = { version = "*", path = "../../lib/u_lib" } u_lib = { version = "*", path = "../../lib/u_lib" }

@ -0,0 +1,88 @@
// TODO:
// поддержка питона
// резолв адреса управляющего сервера через DoT
// кроссплатформенность (реализовать интерфейс для винды и никсов)
// проверка обнов
// самоуничтожение
#[macro_use]
extern crate log;
extern crate env_logger;
use std::env;
use tokio::time::{sleep, Duration};
use u_lib::{
api::ClientHandler,
builder::JobBuilder,
cache::JobCache,
executor::pop_completed,
models::{AssignedJob, ExecResult},
UID,
//daemonize
};
#[macro_export]
macro_rules! retry_until_ok {
( $body:expr ) => {
loop {
match $body {
Ok(r) => break r,
Err(e) => error!("{:?}", e),
};
sleep(Duration::from_secs(5)).await;
}
};
}
pub async fn process_request(job_requests: Vec<AssignedJob>, client: &ClientHandler) {
if job_requests.len() > 0 {
for jr in &job_requests {
if !JobCache::contains(&jr.job_id) {
info!("Fetching job: {}", &jr.job_id);
let fetched_job = retry_until_ok!(client.get_jobs(Some(jr.job_id)).await)
.pop()
.unwrap();
JobCache::insert(fetched_job);
}
}
info!(
"Scheduling jobs: \n{}",
job_requests
.iter()
.map(|j| j.job_id.to_string())
.collect::<Vec<String>>()
.join("\n")
);
let mut builder = JobBuilder::from_request(job_requests);
let errors = builder.pop_errors();
if errors.len() > 0 {
error!(
"Some errors encountered: \n{}",
errors
.iter()
.map(|j| j.to_string())
.collect::<Vec<String>>()
.join("\n")
);
}
builder.unwrap_one().spawn().await;
}
}
pub async fn run_forever() {
//daemonize();
env_logger::init();
let arg_ip = env::args().nth(1);
let instance = ClientHandler::new(arg_ip.as_deref());
info!("Connecting to the server");
loop {
let job_requests: Vec<AssignedJob> =
retry_until_ok!(instance.get_agent_jobs(Some(*UID)).await).into_builtin_vec();
process_request(job_requests, &instance).await;
let result: Vec<ExecResult> = pop_completed().await.into_iter().collect();
if result.len() > 0 {
retry_until_ok!(instance.report(&result).await);
}
sleep(Duration::from_secs(5)).await;
}
}

@ -1,89 +1,7 @@
// TODO: use tokio;
// поддержка питона use u_agent::run_forever;
// резолв адреса управляющего сервера через DoT
// кроссплатформенность (реализовать интерфейс для винды и никсов)
// проверка обнов
// самоуничтожение
#[macro_use]
extern crate log;
extern crate env_logger;
use std::env;
use tokio::time::{sleep, Duration};
use u_lib::{
api::ClientHandler,
builder::JobBuilder,
cache::JobCache,
executor::pop_completed,
models::{AssignedJob, ExecResult},
UID,
//daemonize
};
#[macro_export]
macro_rules! retry_until_ok {
( $body:expr ) => {
loop {
match $body {
Ok(r) => break r,
Err(e) => error!("{:?}", e),
};
sleep(Duration::from_secs(5)).await;
}
};
}
async fn process_request(job_requests: Vec<AssignedJob>, client: &ClientHandler) {
if job_requests.len() > 0 {
for jr in &job_requests {
if !JobCache::contains(&jr.job_id) {
info!("Fetching job: {}", &jr.job_id);
let fetched_job = retry_until_ok!(client.get_jobs(Some(jr.job_id)).await)
.pop()
.unwrap();
JobCache::insert(fetched_job);
}
}
info!(
"Scheduling jobs: \n{}",
job_requests
.iter()
.map(|j| j.job_id.to_string())
.collect::<Vec<String>>()
.join("\n")
);
let mut builder = JobBuilder::from_request(job_requests);
let errors = builder.pop_errors();
if errors.len() > 0 {
error!(
"Some errors encountered: \n{}",
errors
.iter()
.map(|j| j.to_string())
.collect::<Vec<String>>()
.join("\n")
);
}
builder.unwrap_one().spawn().await;
}
}
#[tokio::main] #[tokio::main]
async fn main() { async fn main() {
//daemonize(); run_forever().await;
env_logger::init();
let arg_ip = env::args().nth(1);
let instance = ClientHandler::new(arg_ip);
info!("Connecting to the server");
loop {
let job_requests: Vec<AssignedJob> =
retry_until_ok!(instance.get_agent_jobs(Some(*UID)).await);
process_request(job_requests, &instance).await;
let result: Vec<ExecResult> = pop_completed().await.into_iter().collect();
if result.len() > 0 {
retry_until_ok!(instance.report(&result).await)
}
sleep(Duration::from_secs(5)).await;
}
} }

@ -13,4 +13,7 @@ log = "^0.4"
env_logger = "0.7.1" env_logger = "0.7.1"
uuid = "0.6.5" uuid = "0.6.5"
reqwest = { version = "0.11", features = ["json"] } reqwest = { version = "0.11", features = ["json"] }
openssl = "*"
u_lib = { version = "*", path = "../../lib/u_lib" } u_lib = { version = "*", path = "../../lib/u_lib" }
serde_json = "1.0.4"
serde = { version = "1.0.114", features = ["derive"] }

@ -1,20 +1,25 @@
use serde::Serialize;
use std::env;
use std::fmt;
use structopt::StructOpt; use structopt::StructOpt;
use u_lib::{api::ClientHandler, models::JobMeta, UError}; use u_lib::{
api::ClientHandler, messaging::AsMsg, models::JobMeta, utils::init_env, UError, UResult,
};
use uuid::Uuid; use uuid::Uuid;
const DELIM: &'static str = "*************\n";
#[derive(StructOpt, Debug)] #[derive(StructOpt, Debug)]
struct Args { struct Args {
#[structopt(subcommand)] #[structopt(subcommand)]
cmd: Cmd, cmd: Cmd,
#[structopt(long)]
json: bool,
} }
#[derive(StructOpt, Debug)] #[derive(StructOpt, Debug)]
enum Cmd { enum Cmd {
Agents(LD), Agents(LD),
Jobs(JobALD), Jobs(JobALD),
Jobmap(JmALD), Jobmap(JobMapALD),
} }
#[derive(StructOpt, Debug)] #[derive(StructOpt, Debug)]
@ -23,6 +28,9 @@ enum JobALD {
#[structopt(long, parse(try_from_str = parse_uuid))] #[structopt(long, parse(try_from_str = parse_uuid))]
agent: Option<Uuid>, agent: Option<Uuid>,
#[structopt(long)]
alias: String,
#[structopt(subcommand)] #[structopt(subcommand)]
cmd: JobCmd, cmd: JobCmd,
}, },
@ -37,13 +45,12 @@ enum JobCmd {
} }
#[derive(StructOpt, Debug)] #[derive(StructOpt, Debug)]
enum JmALD { enum JobMapALD {
Add { Add {
#[structopt(parse(try_from_str = parse_uuid))] #[structopt(parse(try_from_str = parse_uuid))]
agent_uid: Uuid, agent_uid: Uuid,
#[structopt(parse(try_from_str = parse_uuid))] job_idents: Vec<String>,
job_uids: Vec<Uuid>,
}, },
List { List {
#[structopt(parse(try_from_str = parse_uuid))] #[structopt(parse(try_from_str = parse_uuid))]
@ -71,58 +78,71 @@ fn parse_uuid(src: &str) -> Result<Uuid, String> {
Uuid::parse_str(src).map_err(|e| e.to_string()) Uuid::parse_str(src).map_err(|e| e.to_string())
} }
async fn process_cmd(cmd: Cmd) -> Result<(), UError> { async fn process_cmd(args: Args) {
let cli_handler = ClientHandler::new(None).password("123qwe".to_string()); fn printer<Msg: AsMsg + fmt::Display>(data: UResult<Msg>, json: bool) {
match cmd { if json {
Cmd::Agents(action) => match action { #[derive(Serialize)]
LD::List { uid } => cli_handler #[serde(rename_all = "lowercase")]
.get_agents(uid) #[serde(tag = "status", content = "data")]
.await? enum DataResult<M> {
.into_iter() Ok(M),
.for_each(|r| println!("{}{}", DELIM, r)), Err(UError),
LD::Delete { uid } => { }
println!("{}", cli_handler.del(Some(uid)).await?);
let data = match data {
Ok(r) => DataResult::Ok(r),
Err(e) => DataResult::Err(e),
};
println!("{}", serde_json::to_string_pretty(&data).unwrap());
} else {
match data {
Ok(r) => println!("{}", r),
Err(e) => eprintln!("Error: {}", e),
} }
}
}
let token = env::var("ADMIN_AUTH_TOKEN").expect("Authentication token is not set");
let cli_handler = ClientHandler::new(None).password(token);
let json = args.json;
match args.cmd {
Cmd::Agents(action) => match action {
LD::List { uid } => printer(cli_handler.get_agents(uid).await, json),
LD::Delete { uid } => printer(cli_handler.del(Some(uid)).await, json),
}, },
Cmd::Jobs(action) => match action { Cmd::Jobs(action) => match action {
JobALD::Add { JobALD::Add {
cmd: JobCmd::Cmd(cmd), cmd: JobCmd::Cmd(cmd),
agent, alias,
agent: _agent,
} => { } => {
let job = JobMeta::from_shell(cmd.join(" "))?; let job = JobMeta::builder()
let job_uid = job.id; .with_shell(cmd.join(" "))
cli_handler.upload_jobs(&vec![job]).await?; .with_alias(alias)
if agent.is_some() { .build()
cli_handler.set_jobs(agent, &vec![job_uid]).await? .unwrap();
} printer(cli_handler.upload_jobs(&[job]).await, json);
}
JobALD::LD(LD::List { uid }) => cli_handler
.get_jobs(uid)
.await?
.into_iter()
.for_each(|r| println!("{}{}", DELIM, r)),
JobALD::LD(LD::Delete { uid }) => {
println!("{}", cli_handler.del(Some(uid)).await?)
} }
JobALD::LD(LD::List { uid }) => printer(cli_handler.get_jobs(uid).await, json),
JobALD::LD(LD::Delete { uid }) => printer(cli_handler.del(Some(uid)).await, json),
}, },
Cmd::Jobmap(action) => match action { Cmd::Jobmap(action) => match action {
JmALD::Add { JobMapALD::Add {
agent_uid, agent_uid,
job_uids, job_idents,
} => cli_handler.set_jobs(Some(agent_uid), &job_uids).await?, } => printer(
JmALD::List { uid } => cli_handler cli_handler.set_jobs(Some(agent_uid), &job_idents).await,
.get_agent_jobs(uid) json,
.await? ),
.into_iter() JobMapALD::List { uid } => printer(cli_handler.get_agent_jobs(uid).await, json),
.for_each(|r| println!("{}{}", DELIM, r)), JobMapALD::Delete { uid } => printer(cli_handler.del(Some(uid)).await, json),
JmALD::Delete { uid } => println!("{}", cli_handler.del(Some(uid)).await?),
}, },
} }
Ok(())
} }
#[tokio::main] #[tokio::main]
async fn main() -> Result<(), UError> { async fn main() {
init_env();
let args: Args = Args::from_args(); let args: Args = Args::from_args();
process_cmd(args.cmd).await process_cmd(args).await;
} }

@ -5,14 +5,16 @@ name = "u_server"
version = "0.1.0" version = "0.1.0"
[dependencies] [dependencies]
dotenv = "0.15.0"
env_logger = "0.7.1"
log = "0.4.11" log = "0.4.11"
simplelog = "0.10"
thiserror = "*" thiserror = "*"
warp = "0.2.4" warp = "0.2.4"
uuid = { version = "0.6.5", features = ["serde", "v4"] } uuid = { version = "0.6.5", features = ["serde", "v4"] }
once_cell = "1.7.2" once_cell = "1.7.2"
hyper = "0.13.10" hyper = "0.13.10"
mockall = "0.9.1"
mockall_double = "0.2"
openssl = "*"
[dependencies.diesel] [dependencies.diesel]
features = ["postgres", "uuid"] features = ["postgres", "uuid"]
@ -30,3 +32,13 @@ version = "0.2.22"
path = "../../lib/u_lib" path = "../../lib/u_lib"
version = "*" version = "*"
[dev-dependencies]
test-case = "1.1.0"
[lib]
name = "u_server_lib"
path = "src/lib.rs"
[[bin]]
name = "u_server"
path = "src/main.rs"

@ -1,5 +1,4 @@
use diesel::{pg::PgConnection, prelude::*, result::Error as DslError}; use diesel::{pg::PgConnection, prelude::*, result::Error as DslError};
use dotenv::dotenv;
use once_cell::sync::OnceCell; use once_cell::sync::OnceCell;
use std::{ use std::{
env, env,
@ -17,19 +16,19 @@ pub struct UDB {
static DB: OnceCell<Arc<Mutex<UDB>>> = OnceCell::new(); static DB: OnceCell<Arc<Mutex<UDB>>> = OnceCell::new();
pub fn lock_db() -> MutexGuard<'static, UDB> { #[cfg_attr(test, automock)]
DB.get_or_init(|| {
dotenv().unwrap();
let db_path = env::var("DATABASE_URL").unwrap();
let conn = PgConnection::establish(&db_path).unwrap();
let instance = UDB { conn };
Arc::new(Mutex::new(instance))
})
.lock()
.unwrap()
}
impl UDB { impl UDB {
pub fn lock_db() -> MutexGuard<'static, UDB> {
DB.get_or_init(|| {
let db_path = env::var("DATABASE_URL").unwrap();
let conn = PgConnection::establish(&db_path).unwrap();
let instance = UDB { conn };
Arc::new(Mutex::new(instance))
})
.lock()
.unwrap()
}
pub fn insert_jobs(&self, job_metas: &[JobMeta]) -> ULocalResult<()> { pub fn insert_jobs(&self, job_metas: &[JobMeta]) -> ULocalResult<()> {
use schema::jobs; use schema::jobs;
diesel::insert_into(jobs::table) diesel::insert_into(jobs::table)
@ -117,7 +116,11 @@ impl UDB {
Ok(result) Ok(result)
} }
pub fn set_jobs_for_agent(&self, agent_uid: &Uuid, job_uids: &[Uuid]) -> ULocalResult<()> { pub fn set_jobs_for_agent(
&self,
agent_uid: &Uuid,
job_uids: &[Uuid],
) -> ULocalResult<Vec<Uuid>> {
use schema::{agents::dsl::agents, jobs::dsl::jobs, results}; use schema::{agents::dsl::agents, jobs::dsl::jobs, results};
if let Err(DslError::NotFound) = agents.find(agent_uid).first::<Agent>(&self.conn) { if let Err(DslError::NotFound) = agents.find(agent_uid).first::<Agent>(&self.conn) {
return Err(ULocalError::NotFound(agent_uid.to_string())); return Err(ULocalError::NotFound(agent_uid.to_string()));
@ -146,7 +149,8 @@ impl UDB {
diesel::insert_into(results::table) diesel::insert_into(results::table)
.values(&job_requests) .values(&job_requests)
.execute(&self.conn)?; .execute(&self.conn)?;
Ok(()) let assigned_uids = job_requests.iter().map(|aj| aj.id).collect();
Ok(assigned_uids)
} }
pub fn del_jobs(&self, uids: &Vec<Uuid>) -> ULocalResult<usize> { pub fn del_jobs(&self, uids: &Vec<Uuid>) -> ULocalResult<usize> {

@ -0,0 +1,88 @@
use crate::handlers::Endpoints;
use serde::de::DeserializeOwned;
use std::env;
use u_lib::{
messaging::{AsMsg, BaseMessage},
models::*,
};
use uuid::Uuid;
use warp::{body, Filter, Rejection, Reply};
fn get_content<M>() -> impl Filter<Extract = (BaseMessage<'static, M>,), Error = Rejection> + Clone
where
M: AsMsg + Sync + Send + DeserializeOwned + 'static,
{
body::content_length_limit(1024 * 64).and(body::json::<BaseMessage<M>>())
}
pub fn make_filters() -> impl Filter<Extract = (impl Reply,), Error = Rejection> + Clone {
let infallible_none = |_| async { Ok::<(Option<Uuid>,), std::convert::Infallible>((None,)) };
let get_agents = warp::get()
.and(warp::path("get_agents"))
.and(
warp::path::param::<Uuid>()
.map(Some)
.or_else(infallible_none),
)
.and_then(Endpoints::get_agents);
let upload_jobs = warp::post()
.and(warp::path("upload_jobs"))
.and(get_content::<Vec<JobMeta>>())
.and_then(Endpoints::upload_jobs);
let get_jobs = warp::get()
.and(warp::path("get_jobs"))
.and(
warp::path::param::<Uuid>()
.map(Some)
.or_else(infallible_none),
)
.and_then(Endpoints::get_jobs);
let get_agent_jobs = warp::get()
.and(warp::path("get_agent_jobs"))
.and(
warp::path::param::<Uuid>()
.map(Some)
.or_else(infallible_none),
)
.and_then(|uid| Endpoints::get_agent_jobs(uid, false));
let get_personal_jobs = warp::get()
.and(warp::path("get_agent_jobs"))
.and(warp::path::param::<Uuid>().map(Some))
.and_then(|uid| Endpoints::get_agent_jobs(uid, true));
let del = warp::get()
.and(warp::path("del"))
.and(warp::path::param::<Uuid>())
.and_then(Endpoints::del);
let set_jobs = warp::post()
.and(warp::path("set_jobs"))
.and(warp::path::param::<Uuid>())
.and(get_content::<Vec<String>>())
.and_then(Endpoints::set_jobs);
let report = warp::post()
.and(warp::path("report"))
.and(get_content::<Vec<ExecResult>>().and_then(Endpoints::report));
let auth_token = format!("Bearer {}", env::var("ADMIN_AUTH_TOKEN").unwrap()).into_boxed_str();
let auth_header = warp::header::exact("authorization", Box::leak(auth_token));
let agent_zone = get_jobs.clone().or(get_personal_jobs).or(report);
let auth_zone = auth_header.and(
get_agents
.or(get_jobs)
.or(upload_jobs)
.or(del)
.or(set_jobs)
.or(get_agent_jobs),
);
agent_zone.or(auth_zone)
}

@ -1,10 +1,10 @@
use crate::db::{lock_db, UDB}; use crate::db::UDB;
use diesel::SaveChangesDsl; use diesel::SaveChangesDsl;
use hyper::Body; use hyper::Body;
use serde::Serialize; use serde::Serialize;
use u_lib::{ use u_lib::{
messaging::{AsMsg, BaseMessage}, messaging::{AsMsg, BaseMessage},
models::{Agent, AssignedJob, ExecResult, JobMeta, JobState}, models::{Agent, AgentState, AssignedJob, ExecResult, JobMeta, JobState},
ULocalError, ULocalError,
}; };
use uuid::Uuid; use uuid::Uuid;
@ -13,130 +13,156 @@ use warp::{
Rejection, Reply, Rejection, Reply,
}; };
fn build_response<S: Into<Body>>(code: StatusCode, body: S) -> Response<Body> { pub fn build_response<S: Into<Body>>(code: StatusCode, body: S) -> Response<Body> {
Response::builder().status(code).body(body.into()).unwrap() Response::builder().status(code).body(body.into()).unwrap()
} }
fn build_ok<S: Into<Body>>(body: S) -> Response<Body> { pub fn build_ok<S: Into<Body>>(body: S) -> Response<Body> {
build_response(StatusCode::OK, body) build_response(StatusCode::OK, body)
} }
fn build_err<S: ToString>(body: S) -> Response<Body> { pub fn build_err<S: ToString>(body: S) -> Response<Body> {
build_response(StatusCode::BAD_REQUEST, body.to_string()) build_response(StatusCode::BAD_REQUEST, body.to_string())
} }
fn build_message<M: AsMsg + Serialize>(m: M) -> Response<Body> { pub fn build_message<M: AsMsg + Serialize>(m: M) -> Response<Body> {
warp::reply::json(&m.as_message()).into_response() warp::reply::json(&m.as_message()).into_response()
} }
pub async fn add_agent(msg: Agent) -> Result<impl Reply, Rejection> { pub struct Endpoints;
debug!("hnd: add_agent");
lock_db()
.insert_agent(&msg)
.map(|_| build_ok(""))
.or_else(|e| Ok(build_err(e)))
}
pub async fn get_agents(uid: Option<Uuid>) -> Result<impl Reply, Rejection> { #[cfg_attr(test, automock)]
debug!("hnd: get_agents"); impl Endpoints {
lock_db() pub async fn add_agent(msg: Agent) -> Result<Response<Body>, Rejection> {
.get_agents(uid) info!("hnd: add_agent");
.map(|m| build_message(m)) UDB::lock_db()
.or_else(|e| Ok(build_err(e))) .insert_agent(&msg)
} .map(|_| build_ok(""))
.or_else(|e| Ok(build_err(e)))
}
pub async fn get_jobs(uid: Option<Uuid>) -> Result<impl Reply, Rejection> { pub async fn get_agents(uid: Option<Uuid>) -> Result<Response<Body>, Rejection> {
debug!("hnd: get_jobs"); info!("hnd: get_agents");
lock_db() UDB::lock_db()
.get_jobs(uid) .get_agents(uid)
.map(|m| build_message(m)) .map(|m| build_message(m))
.or_else(|e| Ok(build_err(e))) .or_else(|e| Ok(build_err(e)))
} }
pub async fn get_agent_jobs(uid: Option<Uuid>, personal: bool) -> Result<impl Reply, Rejection> { pub async fn get_jobs(uid: Option<Uuid>) -> Result<Response<Body>, Rejection> {
info!("hnd: get_agent_jobs {}", personal); info!("hnd: get_jobs");
if personal { UDB::lock_db()
let agents = lock_db().get_agents(uid).unwrap(); .get_jobs(uid)
if agents.len() == 0 { .map(|m| build_message(m))
let db = lock_db(); .or_else(|e| Ok(build_err(e)))
db.insert_agent(&Agent::with_id(uid.unwrap())).unwrap(); }
let job = db.find_job_by_alias("agent_hello").unwrap();
if let Err(e) = db.set_jobs_for_agent(&uid.unwrap(), &[job.id]) { pub async fn get_agent_jobs(
return Ok(build_err(e)); uid: Option<Uuid>,
personal: bool,
) -> Result<Response<Body>, Rejection> {
info!("hnd: get_agent_jobs {}", personal);
if personal {
let agents = UDB::lock_db().get_agents(uid).unwrap();
if agents.len() == 0 {
let db = UDB::lock_db();
db.insert_agent(&Agent::with_id(uid.unwrap())).unwrap();
let job = db.find_job_by_alias("agent_hello").unwrap();
if let Err(e) = db.set_jobs_for_agent(&uid.unwrap(), &[job.id]) {
return Ok(build_err(e));
}
} }
} }
} let result = UDB::lock_db().get_exact_jobs(uid, personal);
let result = lock_db().get_exact_jobs(uid, personal); match result {
match result { Ok(r) => {
Ok(r) => { if personal {
let db = lock_db(); let db = UDB::lock_db();
for j in r.iter() { for j in r.iter() {
db.update_job_status(j.id, JobState::Running).ok(); db.update_job_status(j.id, JobState::Running).ok();
}
}
Ok(build_message(r))
} }
Ok(build_message(r)) Err(e) => Ok(build_err(e)),
} }
Err(e) => Ok(build_err(e)),
} }
}
pub async fn upload_jobs(msg: BaseMessage<'_, Vec<JobMeta>>) -> Result<impl Reply, Rejection> { pub async fn upload_jobs(
debug!("hnd: upload_jobs"); msg: BaseMessage<'static, Vec<JobMeta>>,
lock_db() ) -> Result<Response<Body>, Rejection> {
.insert_jobs(&msg.into_inner()) info!("hnd: upload_jobs");
.map(|_| build_ok("")) UDB::lock_db()
.or_else(|e| Ok(build_err(e))) .insert_jobs(&msg.into_inner())
} .map(|_| build_ok(""))
.or_else(|e| Ok(build_err(e)))
}
pub async fn del(uid: Uuid) -> Result<impl Reply, Rejection> { pub async fn del(uid: Uuid) -> Result<Response<Body>, Rejection> {
debug!("hnd: del"); info!("hnd: del");
let db = lock_db(); let db = UDB::lock_db();
let del_fns = &[UDB::del_agents, UDB::del_jobs, UDB::del_results]; let del_fns = &[UDB::del_agents, UDB::del_jobs, UDB::del_results];
for del_fn in del_fns { for del_fn in del_fns {
let affected = del_fn(&db, &vec![uid]).unwrap(); let affected = del_fn(&db, &vec![uid]).unwrap();
if affected > 0 { if affected > 0 {
return Ok(build_ok(affected.to_string())); return Ok(build_ok(affected.to_string()));
}
} }
Ok(build_err("0"))
} }
Ok(build_err("0"))
}
pub async fn set_jobs( pub async fn set_jobs(
agent_uid: Uuid, agent_uid: Uuid,
msg: BaseMessage<'_, Vec<Uuid>>, msg: BaseMessage<'static, Vec<String>>,
) -> Result<impl Reply, Rejection> { ) -> Result<Response<Body>, Rejection> {
debug!("hnd: set_jobs"); info!("hnd: set_jobs_by_alias");
lock_db() let jobs: Result<Vec<Uuid>, ULocalError> = msg
.set_jobs_for_agent(&agent_uid, &msg.into_inner()) .into_inner()
.map(|_| build_ok("")) .into_iter()
.or_else(|e| Ok(build_err(e))) .map(|ident| {
} Uuid::parse_str(&ident)
.or_else(|_| UDB::lock_db().find_job_by_alias(&ident).map(|j| j.id))
})
.collect();
match jobs {
Ok(j) => UDB::lock_db()
.set_jobs_for_agent(&agent_uid, &j)
.map(|assigned_uids| build_message(assigned_uids))
.or_else(|e| Ok(build_err(e))),
Err(e) => Ok(build_err(e)),
}
}
pub async fn report(msg: BaseMessage<'_, Vec<ExecResult>>) -> Result<impl Reply, Rejection> { pub async fn report(
debug!("hnd: report"); msg: BaseMessage<'static, Vec<ExecResult>>,
let id = msg.id; ) -> Result<Response<Body>, Rejection> {
let mut failed = vec![]; info!("hnd: report");
for entry in msg.into_inner() { let id = msg.id;
match entry { let mut failed = vec![];
ExecResult::Assigned(res) => { for entry in msg.into_inner() {
if id != res.agent_id { match entry {
continue; ExecResult::Assigned(res) => {
if id != res.agent_id {
continue;
}
let db = UDB::lock_db();
if let Err(e) = res
.save_changes::<AssignedJob>(&db.conn)
.map_err(ULocalError::from)
{
failed.push(e.to_string())
}
} }
let db = lock_db(); ExecResult::Agent(mut a) => {
if let Err(e) = res a.state = AgentState::Active;
.save_changes::<AssignedJob>(&db.conn) Self::add_agent(a).await?;
.map_err(ULocalError::from)
{
failed.push(e.to_string())
} }
} }
ExecResult::Agent(a) => {
add_agent(a).await?;
}
} }
if failed.len() > 0 {
let err_msg = ULocalError::ProcessingError(failed.join(", "));
return Ok(build_err(err_msg));
}
Ok(build_ok(""))
} }
if failed.len() > 0 {
let err_msg = ULocalError::ProcessingError(failed.join(", "));
return Ok(build_err(err_msg));
}
Ok(build_ok(""))
} }

@ -0,0 +1,102 @@
#[macro_use]
extern crate log;
#[macro_use]
extern crate mockall;
#[macro_use]
extern crate mockall_double;
// because of linking errors
extern crate openssl;
#[macro_use]
extern crate diesel;
//
mod db;
mod filters;
mod handlers;
use db::UDB;
use filters::make_filters;
use u_lib::{config::MASTER_PORT, models::*, utils::init_env};
use warp::Filter;
const LOGFILE: &str = "u_server.log";
fn prefill_jobs() {
let agent_hello = JobMeta::builder()
.with_type(misc::JobType::Manage)
.with_alias("agent_hello")
.build()
.unwrap();
UDB::lock_db().insert_jobs(&[agent_hello]).ok();
}
fn init_logger() {
use simplelog::*;
use std::fs::OpenOptions;
let log_cfg = ConfigBuilder::new()
.set_time_format_str("%x %X")
.set_time_to_local(true)
.build();
let logfile = OpenOptions::new()
.append(true)
.create(true)
.open(LOGFILE)
.unwrap();
let loggers = vec![
WriteLogger::new(LevelFilter::Info, log_cfg.clone(), logfile) as Box<dyn SharedLogger>,
TermLogger::new(
LevelFilter::Info,
log_cfg,
TerminalMode::Stderr,
ColorChoice::Auto,
),
];
CombinedLogger::init(loggers).unwrap();
}
fn init_all() {
init_logger();
init_env();
prefill_jobs();
}
pub async fn serve() {
init_all();
let routes = make_filters();
warp::serve(routes.with(warp::log("warp")))
.run(([0, 0, 0, 0], MASTER_PORT))
.await;
}
#[cfg(test)]
mod tests {
use super::*;
#[double]
use crate::handlers::Endpoints;
use handlers::build_ok;
use mockall::predicate::*;
use test_case::test_case;
use uuid::Uuid;
use warp::test::request;
#[test_case(Some(Uuid::new_v4()))]
#[test_case(None => panics)]
#[tokio::test]
async fn test_get_agent_jobs_unauthorized(uid: Option<Uuid>) {
let mock = Endpoints::get_agent_jobs_context();
mock.expect()
.with(eq(uid), eq(uid.is_some()))
.returning(|_, _| Ok(build_ok("")));
request()
.path(&format!(
"/get_agent_jobs/{}",
uid.map(|u| u.simple().to_string()).unwrap_or(String::new())
))
.method("GET")
.filter(&make_filters())
.await
.unwrap();
mock.checkpoint();
}
}

@ -1,125 +1,6 @@
mod db; use u_server_lib::serve;
mod handlers;
use warp::{body, Filter, Rejection};
#[macro_use]
extern crate log;
extern crate env_logger;
use db::lock_db;
use serde::de::DeserializeOwned;
use u_lib::{
config::MASTER_PORT,
messaging::{AsMsg, BaseMessage},
models::*,
};
use uuid::Uuid;
fn get_content<M>() -> impl Filter<Extract = (BaseMessage<'static, M>,), Error = Rejection> + Clone
where
M: AsMsg + Sync + Send + DeserializeOwned + 'static,
{
body::content_length_limit(1024 * 64).and(body::json::<BaseMessage<M>>())
}
fn prefill_jobs() {
let agent_hello = JobMeta::builder()
.with_type(misc::JobType::Manage)
.with_alias("agent_hello")
.build()
.unwrap();
lock_db().insert_jobs(&[agent_hello]).ok();
}
fn init() {
env_logger::init();
prefill_jobs();
}
#[tokio::main] #[tokio::main]
async fn main() { async fn main() {
init(); serve().await;
let infallible_none = |_| async { Ok::<(Option<Uuid>,), std::convert::Infallible>((None,)) };
let get_agents = warp::get()
.and(warp::path("get_agents"))
.and(
warp::path::param::<Uuid>()
.map(Some)
.or_else(infallible_none),
)
.and_then(handlers::get_agents);
let upload_jobs = warp::post()
.and(warp::path("upload_jobs"))
.and(get_content::<Vec<JobMeta>>())
.and_then(handlers::upload_jobs);
let get_jobs = warp::get()
.and(warp::path("get_jobs"))
.and(
warp::path::param::<Uuid>()
.map(Some)
.or_else(infallible_none),
)
.and_then(handlers::get_jobs);
let get_agent_jobs = warp::get()
.and(warp::path("get_agent_jobs"))
.and(
warp::path::param::<Uuid>()
.map(Some)
.or_else(infallible_none),
)
.and_then(|uid| handlers::get_agent_jobs(uid, false));
let get_personal_jobs = warp::get()
.and(warp::path("get_agent_jobs"))
.and(warp::path::param::<Uuid>().map(Some))
.and_then(|uid| handlers::get_agent_jobs(uid, true));
let del = warp::get()
.and(warp::path("del"))
.and(warp::path::param::<Uuid>())
.and_then(handlers::del);
let set_jobs = warp::post()
.and(warp::path("set_jobs"))
.and(warp::path::param::<Uuid>())
.and(get_content::<Vec<Uuid>>())
.and_then(handlers::set_jobs);
let report = warp::post()
.and(warp::path("report"))
.and(get_content::<Vec<ExecResult>>().and_then(handlers::report));
let auth_token = warp::header::exact("authorization", "Bearer 123qwe");
let agent_zone = get_jobs.clone().or(get_personal_jobs).or(report);
let auth_zone = auth_token.and(
get_agents
.or(get_jobs)
.or(upload_jobs)
.or(del)
.or(set_jobs)
.or(get_agent_jobs),
);
let routes = agent_zone.or(auth_zone);
warp::serve(routes.with(warp::log("warp")))
.run(([0, 0, 0, 0], MASTER_PORT))
.await;
}
/*
#[cfg(test)]
mod tests {
use super::*;
/*
#[tokio::test]
async fn test_gather() {
}
*/
} }
*/

@ -0,0 +1,23 @@
[package]
name = "integration"
version = "0.1.0"
authors = ["plazmoid <kronos44@mail.ru>"]
edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
tokio = { version = "1.2.0", features = ["macros", "rt-multi-thread", "process", "time"] }
log = "^0.4"
env_logger = "0.8.3"
uuid = { version = "0.6.5", features = ["serde", "v4"] }
reqwest = { version = "0.11", features = ["json"] }
serde_json = "1.0"
serde = { version = "1.0.114", features = ["derive"] }
futures = "0.3.5"
shlex = "1.0.0"
[[test]]
name = "integration"
path = "tests/tests.rs"

@ -0,0 +1,89 @@
version: "2.1"
networks:
u_net:
services:
u_server:
image: unki/u_server
networks:
- u_net
volumes:
- ../target/x86_64-unknown-linux-musl/release/u_server:/u_server
- ../:/unki/
working_dir: /unki
command: bash -c "diesel setup && diesel migration run && /u_server"
depends_on:
u_db:
condition: service_healthy
expose:
- '63714'
environment:
RUST_LOG: warp=info
env_file:
- ../.env
healthcheck:
test: /bin/ss -tlpn | grep 63714
interval: 5s
timeout: 2s
retries: 2
u_db:
image: unki/u_db
networks:
- u_net
expose:
- '5432'
env_file:
- ../.env
healthcheck:
test: /bin/ss -tlpn | grep 5432
interval: 5s
timeout: 2s
retries: 2
u_agent_1:
image: unki/u_agent
networks:
- u_net
volumes:
- ../target/x86_64-unknown-linux-musl/release/u_agent:/u_agent
command: /u_agent u_server
depends_on:
u_server:
condition: service_healthy
u_agent_2:
image: unki/u_agent
networks:
- u_net
volumes:
- ../target/x86_64-unknown-linux-musl/release/u_agent:/u_agent
command: /u_agent u_server
depends_on:
u_server:
condition: service_healthy
tests_runner:
image: unki/tests_runner
networks:
- u_net
volumes:
- ./:/tests/
- ../target/x86_64-unknown-linux-musl/release/u_panel:/u_panel
- ~/.cargo/registry:/root/.cargo/registry
working_dir:
/tests/
env_file:
- ../.env
depends_on:
u_agent_1:
condition: service_started
u_agent_2:
condition: service_started
u_server:
condition: service_healthy
environment:
RUST_BACKTRACE: 1
U_SERVER: u_server

@ -0,0 +1,81 @@
import subprocess
from utils import *
BASE_IMAGE_DIR = 'images'
DOCKERFILES = {
'u_agent': {
'ctx': BASE_IMAGE_DIR,
'dockerfile_prefix': 'u_agent'
},
'u_server': {
'ctx': BASE_IMAGE_DIR,
'dockerfile_prefix': 'u_server'
},
'u_db': {
'ctx': BASE_IMAGE_DIR,
'dockerfile_prefix': 'u_db'
},
'tests_runner': {
'ctx': BASE_IMAGE_DIR,
'dockerfile_prefix': 'tests_runner'
},
}
def docker(args):
cmd = ['docker'] + args
log(f'Running docker command: {cmd}')
return subprocess.run(
cmd,
check=True,
)
def print_errors(errors):
err_msg = '\n'.join(
' {container}: {error}'.format(container=item['container'],
error=item['error'])
for item in errors)
err('There are some errors in next containers:\n%s' % err_msg)
def check_state(containers):
errors = []
for container in containers:
ret, out = subprocess.getstatusoutput(
'docker inspect --format \'{{ .State.Running }}\' %s'
% container)
out = out.strip()
if ret == 0:
if out == 'true':
continue
else:
errors.append({'container': container,
'error': 'Bad state: Running=%s' % out})
else:
errors.append({'container': container,
'error': out})
return errors
def rebuild_images_if_needed(force_rebuild=False):
for img_name, data in DOCKERFILES.items():
ctx = data['ctx']
df_prefix = data.get('dockerfile_prefix')
df_suffix = 'Dockerfile'
img_name = f'unki/{img_name}'
log(f'Building docker image {img_name}')
cmd = [
'build',
'-t',
img_name,
ctx,
]
if df_prefix:
cmd += ['-f', f'{ctx}/{df_prefix}.{df_suffix}']
if force_rebuild:
cmd += ['--no-cache']
docker(cmd)

@ -0,0 +1,66 @@
import subprocess
import shlex
from utils import *
from docker import docker, check_state, print_errors
class Compose:
ALL_CONTAINERS = [
'u_agent_1',
'u_agent_2',
'u_server',
'u_db',
'tests_runner',
]
def __init__(self):
self.container_tpl = 'integration_%s_1'
self.cmd_container = self.container_tpl % 'tests_runner'
self.ALL_CONTAINERS = [self.container_tpl % c for c in self.ALL_CONTAINERS]
def _call(self, *args):
subprocess.check_call([
'docker-compose',
'--no-ansi',
] + list(args)
)
def up(self):
log('Instanciating cluster')
self._call('up', '-d')
log('Ok')
def down(self):
log('Shutting down cluster')
self._call('down')
log('Ok')
def stop(self):
log('Stopping cluster')
self._call('stop')
log('Ok')
def run(self, cmd):
container = self.cmd_container
if isinstance(cmd, str):
cmd = shlex.split(cmd)
log(f'Running command "{cmd}" in container {container}')
result = docker([
'exec',
'-ti',
container
] + cmd)
log('Ok')
return result
def is_alive(self):
log('Check if all containers are alive')
errors = check_state(self.ALL_CONTAINERS)
log('Check done')
if errors:
print_errors(errors)
raise TestsError('Error during `is_alive` check')
else:
log('All containers are alive')

@ -0,0 +1,4 @@
FROM rust:1.53
RUN rustup target add x86_64-unknown-linux-musl
CMD ["sleep", "3600"]

@ -0,0 +1,3 @@
FROM centos:7
RUN yum update -y

@ -0,0 +1,3 @@
FROM postgres:13.3
RUN apt update && apt -y upgrade && apt install -y iproute2

@ -0,0 +1,3 @@
FROM rust:1.53
RUN cargo install diesel_cli --no-default-features --features postgres

@ -0,0 +1,36 @@
import signal
import sys
from utils import *
from docker import rebuild_images_if_needed
from docker_compose import Compose
cluster = Compose()
def abort_handler(s, _):
warn(f'Received signal: {s}')
warn(f'Gracefully stopping...')
cluster.down()
def run_tests():
force_rebuild = '--rebuild' in sys.argv
preserve_containers = '--preserve' in sys.argv
for s in (signal.SIGTERM, signal.SIGINT, signal.SIGHUP):
signal.signal(s, abort_handler)
rebuild_images_if_needed(force_rebuild)
try:
cluster.up()
cluster.is_alive()
cluster.run('cargo test --test integration')
except Exception as e:
err(e)
sys.exit(1)
finally:
if not preserve_containers:
cluster.down()
if __name__ == '__main__':
run_tests()

@ -0,0 +1,3 @@
#!/bin/bash
set -e
python integration_tests.py $@

@ -0,0 +1,3 @@
fn main() {
println!("Hello, world!");
}

@ -0,0 +1,48 @@
use reqwest::{Client, RequestBuilder, Url};
use serde::Serialize;
use serde_json::{from_str, json, Value};
const SERVER: &str = "u_server";
const PORT: &str = "63714";
pub struct AgentClient {
client: Client,
base_url: Url,
}
impl AgentClient {
pub fn new() -> Self {
Self {
client: Client::new(),
base_url: Url::parse(&format!("http://{}:{}", SERVER, PORT)).unwrap(),
}
}
async fn process_request(&self, req: RequestBuilder, resp_needed: bool) -> Value {
let resp = req.send().await.unwrap();
if let Err(e) = resp.error_for_status_ref() {
panic!(
"Server responded with code {}\nError: {}",
e.status()
.map(|s| s.to_string())
.unwrap_or(String::from("<none>")),
e.to_string()
);
}
if !resp_needed {
return json!([]);
}
let resp: Value = from_str(&resp.text().await.unwrap()).unwrap();
resp.get("inner").unwrap().get(0).unwrap().clone()
}
pub async fn get<S: AsRef<str>>(&self, url: S) -> Value {
let req = self.client.get(self.base_url.join(url.as_ref()).unwrap());
self.process_request(req, true).await
}
pub async fn post<S: AsRef<str>, B: Serialize>(&self, url: S, body: &B) -> Value {
let req = self.client.post(self.base_url.join(url.as_ref()).unwrap());
self.process_request(req.json(body), false).await
}
}

@ -0,0 +1,5 @@
pub mod client;
pub mod panel;
pub use client::AgentClient;
pub use panel::Panel;

@ -0,0 +1,53 @@
use serde_json::{from_slice, Value};
use shlex::split;
use std::process::{Command, Output};
const PANEL_BINARY: &str = "/u_panel";
pub struct Panel;
impl Panel {
fn run(args: &[&str]) -> Output {
Command::new(PANEL_BINARY)
.arg("--json")
.args(args)
.output()
.unwrap()
}
pub fn output_argv(args: &[&str]) -> Value {
let result = Self::run(args);
assert!(result.status.success());
from_slice(&result.stdout).unwrap()
}
pub fn output<S: Into<String>>(args: S) -> Value {
let splitted = split(args.into().as_ref()).unwrap();
Self::output_argv(
splitted
.iter()
.map(|s| s.as_ref())
.collect::<Vec<&str>>()
.as_ref(),
)
}
fn status_is_ok(data: &Value) {
assert_eq!(
data["status"], "ok",
"Panel failed with erroneous status: {}",
data["data"]
);
}
pub fn check_status<S: Into<String>>(args: S) {
let result = Self::output(args);
Self::status_is_ok(&result);
}
pub fn check_output<S: Into<String>>(args: S) -> Vec<Value> {
let result = Self::output(args);
Self::status_is_ok(&result);
result["data"].as_array().unwrap().clone()
}
}

@ -0,0 +1,80 @@
mod helpers;
use helpers::{AgentClient, Panel};
use serde_json::json;
use std::thread::sleep;
use std::time::Duration;
use uuid::Uuid;
type TestResult<R = ()> = Result<R, Box<dyn std::error::Error>>;
async fn register_agent() -> Uuid {
let cli = AgentClient::new();
let agent_uid = Uuid::new_v4();
let resp = cli.get(format!("get_agent_jobs/{}", agent_uid)).await;
let job_id = &resp["job_id"];
let resp = cli.get(format!("get_jobs/{}", job_id)).await;
assert_eq!(&resp["alias"], "agent_hello");
let agent_data = json! {
{"id": &agent_uid,"inner":[
{"Agent":
{"alias":null,
"hostname":"3b1030fa6324",
"id":&agent_uid,
"is_root":false,
"is_root_allowed":false,
"last_active":{"secs_since_epoch":1625271265,"nanos_since_epoch":92814921},
"platform":"x86_64-unknown-linux-gnu",
"regtime":{"secs_since_epoch":1625271265,"nanos_since_epoch":92814945},
"state":"New",
"token":null,
"username":"root"}
}
]}
};
cli.post("report", &agent_data).await;
agent_uid
}
#[tokio::test]
async fn test_registration() -> TestResult {
let agent_uid = register_agent().await;
let agents = Panel::check_output("agents list");
let found = agents
.iter()
.find(|v| v["id"].as_str().unwrap() == agent_uid.to_string());
assert!(found.is_some());
Ok(())
}
#[tokio::test]
async fn test_setup_tasks() -> TestResult {
let agent_uid = Panel::check_output("agents list")[0]["id"].clone();
dbg!(&agent_uid);
let job_alias = "passwd_contents";
let cmd = format!("jobs add --alias {} 'cat /etc/passwd'", job_alias);
Panel::check_status(cmd);
let cmd = format!("jobmap add {} {}", agent_uid, job_alias);
let assigned_uids = Panel::check_output(cmd);
dbg!(&assigned_uids);
loop {
let result = Panel::check_output(format!("jobmap list {}", assigned_uids[0]));
dbg!(&result);
match result.get(0) {
Some(entry) if entry["state"] == "Finished" => {
println!("{}", result[0]);
break;
}
None => {
eprintln!("jobmap list is empty (bad bad bad)");
continue;
}
_ => {
sleep(Duration::from_secs(1));
eprintln!("waiting for task");
}
}
}
Ok(())
}

@ -0,0 +1,32 @@
from termcolor import colored
__all__ = ['log', 'warn', 'err', 'TestsError']
class TestsError(Exception):
pass
COLORS = {
'question': colored('[?]', 'magenta'),
'info': colored('[~]', 'green'),
'warning': colored('[!]', 'yellow'),
'error': colored('[X]', 'red'),
}
def warn(msg):
log(msg, log_lvl='w')
def err(msg):
log(msg, log_lvl='e')
def log(msg, log_lvl='i'):
for lvl, text in COLORS.items():
if lvl.startswith(log_lvl):
print(f'{text} {msg}')
break
else:
ValueError('Unknown log level')

@ -7,6 +7,7 @@ edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies] [dependencies]
dotenv = "0.15.0"
serde = { version = "1.0.114", features = ["derive"] } serde = { version = "1.0.114", features = ["derive"] }
uuid = { version = "0.6.5", features = ["serde", "v4"] } uuid = { version = "0.6.5", features = ["serde", "v4"] }
nix = "0.17" nix = "0.17"
@ -14,10 +15,12 @@ libc = "^0.2"
lazy_static = "1.4.0" lazy_static = "1.4.0"
tokio = { version = "1.2.0", features = ["rt-multi-thread", "sync", "macros", "process", "time"] } tokio = { version = "1.2.0", features = ["rt-multi-thread", "sync", "macros", "process", "time"] }
reqwest = { version = "0.11", features = ["json"] } reqwest = { version = "0.11", features = ["json"] }
openssl = "*"
futures = "0.3.5" futures = "0.3.5"
guess_host_triple = "0.1.2" guess_host_triple = "0.1.2"
thiserror = "*" thiserror = "*"
log = "*" log = "*"
mockall = "0.9.1"
env_logger = "0.8.3" env_logger = "0.8.3"
diesel-derive-enum = { version = "1", features = ["postgres"] } diesel-derive-enum = { version = "1", features = ["postgres"] }
chrono = "0.4.19" chrono = "0.4.19"

@ -0,0 +1,14 @@
use std::path::PathBuf;
use std::process::Command;
fn main() {
let echoer = PathBuf::from("./tests/fixtures/echoer");
let mut echoer_src = echoer.clone();
echoer_src.set_extension("rs");
Command::new("rustc")
.args(&[echoer_src.to_str().unwrap(), "-o", echoer.to_str().unwrap()])
.status()
.unwrap();
println!("cargo:rerun-if-changed={}", echoer_src.display());
println!("cargo:rerun-if-changed={}", echoer.display());
}

@ -2,12 +2,12 @@
use crate::{ use crate::{
config::{MASTER_PORT, MASTER_SERVER}, config::{MASTER_PORT, MASTER_SERVER},
messaging::{AsMsg, BaseMessage}, messaging::{AsMsg, BaseMessage},
models::*, models,
utils::opt_to_string, utils::{opt_to_string, VecDisplay},
UError, UResult, UError, UResult,
}; };
use reqwest::{Client, RequestBuilder, Url}; use reqwest::{Client, RequestBuilder, Url};
use std::{net::Ipv4Addr, str::FromStr}; use std::env;
use u_api_proc_macro::api_route; use u_api_proc_macro::api_route;
use uuid::Uuid; use uuid::Uuid;
@ -18,10 +18,9 @@ pub struct ClientHandler {
} }
impl ClientHandler { impl ClientHandler {
pub fn new(server: Option<String>) -> Self { pub fn new(server: Option<&str>) -> Self {
let master_server = server let env_server = env::var("U_SERVER").unwrap_or(String::from(MASTER_SERVER));
.map(|s| Ipv4Addr::from_str(&s).unwrap()) let master_server = server.unwrap_or(env_server.as_str());
.unwrap_or(MASTER_SERVER);
Self { Self {
client: Client::new(), client: Client::new(),
base_url: Url::parse(&format!("http://{}:{}", master_server, MASTER_PORT)).unwrap(), base_url: Url::parse(&format!("http://{}:{}", master_server, MASTER_PORT)).unwrap(),
@ -53,24 +52,24 @@ impl ClientHandler {
// //
// get jobs for client // get jobs for client
#[api_route("GET")] #[api_route("GET")]
fn get_agent_jobs(&self, url_param: Option<Uuid>) -> Vec<AssignedJob> {} fn get_agent_jobs(&self, url_param: Option<Uuid>) -> VecDisplay<models::AssignedJob> {}
// //
// send something to server // send something to server
#[api_route("POST")] #[api_route("POST")]
fn report<M: AsMsg>(&self, payload: &M) {} fn report<M: AsMsg>(&self, payload: &M) -> models::Empty {}
//#/////////#// Admin area //#////////#// //##########// Admin area //##########//
/// client listing /// client listing
#[api_route("GET")] #[api_route("GET")]
fn get_agents(&self, url_param: Option<Uuid>) -> Vec<Agent> {} fn get_agents(&self, url_param: Option<Uuid>) -> VecDisplay<models::Agent> {}
// //
// get all available jobs // get all available jobs
#[api_route("GET")] #[api_route("GET")]
fn get_jobs(&self, url_param: Option<Uuid>) -> Vec<JobMeta> {} fn get_jobs(&self, url_param: Option<Uuid>) -> VecDisplay<models::JobMeta> {}
// //
// create and upload job // create and upload job
#[api_route("POST")] #[api_route("POST")]
fn upload_jobs(&self, payload: &Vec<JobMeta>) {} fn upload_jobs(&self, payload: &[models::JobMeta]) -> models::Empty {}
// //
// delete something // delete something
#[api_route("GET")] #[api_route("GET")]
@ -78,5 +77,5 @@ impl ClientHandler {
// //
// set jobs for client // set jobs for client
#[api_route("POST")] #[api_route("POST")]
fn set_jobs(&self, url_param: Option<Uuid>, payload: &Vec<Uuid>) {} fn set_jobs(&self, url_param: Option<Uuid>, payload: &[String]) -> VecDisplay<Uuid> {}
} }

@ -2,7 +2,7 @@ use crate::{
cache::JobCache, cache::JobCache,
executor::{FutRes, Waiter, DynFut}, executor::{FutRes, Waiter, DynFut},
models::{Agent, AssignedJob, JobMeta, JobType}, models::{Agent, AssignedJob, JobMeta, JobType},
utils::{CombinedResult, OneOrMany}, utils::{CombinedResult, OneOrVec},
UError, UError,
}; };
use guess_host_triple::guess_host_triple; use guess_host_triple::guess_host_triple;
@ -13,7 +13,7 @@ pub struct JobBuilder {
} }
impl JobBuilder { impl JobBuilder {
pub fn from_request<J: OneOrMany<AssignedJob>>(job_requests: J) -> CombinedResult<Self> { pub fn from_request<J: OneOrVec<AssignedJob>>(job_requests: J) -> CombinedResult<Self> {
let job_requests = job_requests.into_vec(); let job_requests = job_requests.into_vec();
let mut prepared: Vec<DynFut> = vec![]; let mut prepared: Vec<DynFut> = vec![];
let mut result = CombinedResult::new(); let mut result = CombinedResult::new();
@ -52,7 +52,7 @@ impl JobBuilder {
result result
} }
pub fn from_meta<J: OneOrMany<JobMeta>>(job_metas: J) -> CombinedResult<Self> { pub fn from_meta<J: OneOrVec<JobMeta>>(job_metas: J) -> CombinedResult<Self> {
let job_requests = job_metas let job_requests = job_metas
.into_vec() .into_vec()
.into_iter() .into_iter()
@ -90,7 +90,7 @@ pub struct NamedJobBuilder {
} }
impl NamedJobBuilder { impl NamedJobBuilder {
pub fn from_shell<J: OneOrMany<(&'static str, &'static str)>>( pub fn from_shell<J: OneOrVec<(&'static str, &'static str)>>(
named_jobs: J, named_jobs: J,
) -> CombinedResult<Self> { ) -> CombinedResult<Self> {
let mut result = CombinedResult::new(); let mut result = CombinedResult::new();
@ -111,7 +111,7 @@ impl NamedJobBuilder {
result result
} }
pub fn from_meta<J: OneOrMany<(&'static str, JobMeta)>>(named_jobs: J) -> Self { pub fn from_meta<J: OneOrVec<(&'static str, JobMeta)>>(named_jobs: J) -> Self {
let mut job_names = vec![]; let mut job_names = vec![];
let job_metas: Vec<JobMeta> = named_jobs let job_metas: Vec<JobMeta> = named_jobs
.into_vec() .into_vec()
@ -144,3 +144,172 @@ impl NamedJobBuilder {
self.pop_opt(name).unwrap() self.pop_opt(name).unwrap()
} }
} }
#[cfg(test)]
mod tests {
use test_case::test_case;
use std::{time::SystemTime};
use crate::{
errors::UError,
models::{
jobs::{JobMeta},
ExecResult,
misc::JobType
},
builder::{JobBuilder, NamedJobBuilder},
unwrap_enum,
};
type TestResult<R = ()> = Result<R, Box<dyn std::error::Error>>;
#[tokio::test]
async fn test_is_really_async() {
const SLEEP_SECS: u64 = 1;
let job = JobMeta::from_shell(format!("sleep {}", SLEEP_SECS)).unwrap();
let sleep_jobs: Vec<JobMeta> = (0..50).map(|_| job.clone()).collect();
let now = SystemTime::now();
JobBuilder::from_meta(sleep_jobs).unwrap_one().wait().await;
assert!(now.elapsed().unwrap().as_secs() < SLEEP_SECS + 2)
}
#[test_case(
"/bin/sh {}",
Some(b"echo test01 > /tmp/asd; cat /tmp/asd"),
"test01"
;"sh payload"
)]
#[test_case(
r#"/usr/bin/python -c 'print("test02")'"#,
None,
"test02"
;"python cmd"
)]
#[test_case(
"/{}",
Some(
br#"#!/bin/sh
TMPPATH=/tmp/lol
mkdir -p $TMPPATH
echo test03 > $TMPPATH/t
cat $TMPPATH/t"#
),
"test03"
;"sh multiline payload"
)]
#[test_case(
"/{} 'some msg as arg'",
Some(include_bytes!("../tests/fixtures/echoer")),
"some msg as arg"
;"standalone binary with args"
)]
#[tokio::test]
async fn test_shell_job(cmd: &str, payload: Option<&[u8]>, expected_result: &str) -> TestResult {
let mut job = JobMeta::builder().with_shell(cmd);
if let Some(p) = payload {
job = job.with_payload(p);
}
let job = job.build().unwrap();
let job_result = JobBuilder::from_meta(job).unwrap_one().wait_one().await;
let result = unwrap_enum!(job_result, ExecResult::Assigned);
let result = result.to_string_result().unwrap();
assert_eq!(result.trim(), expected_result);
Ok(())
}
#[tokio::test]
async fn test_complex_load() -> TestResult {
const SLEEP_SECS: u64 = 1;
let now = SystemTime::now();
let longest_job = JobMeta::from_shell(format!("sleep {}", SLEEP_SECS)).unwrap();
let longest_job = JobBuilder::from_meta(longest_job).unwrap_one().spawn().await;
let ls = JobBuilder::from_meta(JobMeta::from_shell("ls")?).unwrap_one()
.wait_one()
.await;
let ls = unwrap_enum!(ls, ExecResult::Assigned);
assert_eq!(ls.retcode.unwrap(), 0);
let folders = ls.to_string_result().unwrap();
let subfolders_jobs: Vec<JobMeta> = folders
.lines()
.map(|f| JobMeta::from_shell(format!("ls {}", f)).unwrap())
.collect();
let ls_subfolders = JobBuilder::from_meta(subfolders_jobs)
.unwrap_one()
.wait()
.await;
for result in ls_subfolders {
let result = unwrap_enum!(result, ExecResult::Assigned);
assert_eq!(result.retcode.unwrap(), 0);
}
longest_job.wait().await;
assert_eq!(now.elapsed().unwrap().as_secs(), SLEEP_SECS);
Ok(())
}
/*
#[tokio::test]
async fn test_exec_multiple_jobs_nowait() -> UResult<()> {
const REPEATS: usize = 10;
let job = JobMeta::from_shell("whoami");
let sleep_jobs: Vec<JobMeta> = (0..=REPEATS).map(|_| job.clone()).collect();
build_jobs(sleep_jobs).spawn().await;
let mut completed = 0;
while completed < REPEATS {
let c = pop_completed().await.len();
if c > 0 {
completed += c;
println!("{}", c);
}
}
Ok(())
}
*/
#[tokio::test]
async fn test_failing_shell_job() -> TestResult {
let job = JobMeta::from_shell("lol_kek_puk")?;
let job_result = JobBuilder::from_meta(job)
.unwrap_one()
.wait_one()
.await;
let job_result = unwrap_enum!(job_result, ExecResult::Assigned);
let output = job_result.to_string_result().unwrap();
assert!(output.contains("No such file"));
assert!(job_result.retcode.is_none());
Ok(())
}
#[test_case(
"/bin/bash {}",
None,
"contains executable"
; "no binary"
)]
#[test_case(
"/bin/bash",
Some(b"whoami"),
"contains no executable"
; "no path to binary"
)]
#[tokio::test]
async fn test_job_building_failed(cmd: &str, payload: Option<&[u8]>, err_str: &str) -> TestResult {
let mut job = JobMeta::builder().with_shell(cmd);
if let Some(p) = payload {
job = job.with_payload(p);
}
let err = job.build().unwrap_err();
let err_msg = unwrap_enum!(err, UError::JobArgsError);
assert!(err_msg.contains(err_str));
Ok(())
}
#[tokio::test]
async fn test_different_job_types() -> TestResult {
let mut jobs = NamedJobBuilder::from_meta(vec![
("sleeper", JobMeta::from_shell("sleep 3")?),
("gatherer", JobMeta::builder().with_type(JobType::Manage).build()?)
]).wait().await;
let gathered = jobs.pop("gatherer");
assert_eq!(unwrap_enum!(gathered, ExecResult::Agent).alias, None);
Ok(())
}
}

@ -1,9 +1,8 @@
use std::net::Ipv4Addr;
use uuid::Uuid; use uuid::Uuid;
pub const MASTER_SERVER: Ipv4Addr = Ipv4Addr::LOCALHOST; //Ipv4Addr::new(3,9,16,40) pub const MASTER_SERVER: &str = "127.0.0.1"; //Ipv4Addr::new(3,9,16,40)
pub const MASTER_PORT: u16 = 63714; pub const MASTER_PORT: u16 = 63714;
lazy_static! { lazy_static! {
pub static ref UID: Uuid = Uuid::new_v4(); pub static ref UID: Uuid = Uuid::new_v4();
} }

@ -1,7 +1,7 @@
use crate::{utils::OneOrMany, models::ExecResult}; use crate::{models::ExecResult, utils::OneOrVec};
use futures::{future::BoxFuture, lock::Mutex}; use futures::{future::BoxFuture, lock::Mutex};
use lazy_static::lazy_static; use lazy_static::lazy_static;
use std::{collections::HashMap}; use std::collections::HashMap;
use tokio::{ use tokio::{
spawn, spawn,
sync::mpsc::{channel, Receiver, Sender}, sync::mpsc::{channel, Receiver, Sender},
@ -37,7 +37,7 @@ pub struct Waiter {
} }
impl Waiter { impl Waiter {
pub fn new<S: OneOrMany<DynFut>>(tasks: S) -> Self { pub fn new<S: OneOrVec<DynFut>>(tasks: S) -> Self {
Self { Self {
tasks: tasks.into_vec(), tasks: tasks.into_vec(),
fids: vec![], fids: vec![],

@ -26,3 +26,6 @@ extern crate diesel;
#[macro_use] #[macro_use]
extern crate log; extern crate log;
extern crate env_logger; extern crate env_logger;
#[macro_use]
extern crate mockall;

@ -1,9 +1,11 @@
use crate::utils::VecDisplay;
use crate::UID; use crate::UID;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::borrow::Cow; use std::borrow::Cow;
use std::fmt::Display;
use uuid::Uuid; use uuid::Uuid;
pub struct Moo<'cow, T: Clone>(pub Cow<'cow, T>); pub struct Moo<'cow, T: AsMsg + Clone>(pub Cow<'cow, T>);
pub trait AsMsg: Clone + Serialize { pub trait AsMsg: Clone + Serialize {
fn as_message<'m>(&'m self) -> BaseMessage<'m, Self> fn as_message<'m>(&'m self) -> BaseMessage<'m, Self>
@ -29,6 +31,8 @@ impl<'cow, M: AsMsg> From<&'cow M> for Moo<'cow, M> {
} }
impl<M: AsMsg> AsMsg for Vec<M> {} impl<M: AsMsg> AsMsg for Vec<M> {}
impl<M: AsMsg + Display> AsMsg for VecDisplay<M> {}
impl<'msg, M: AsMsg> AsMsg for &'msg [M] {}
#[derive(Serialize, Deserialize, Debug)] #[derive(Serialize, Deserialize, Debug)]
pub struct BaseMessage<'cow, I: AsMsg> { pub struct BaseMessage<'cow, I: AsMsg> {

@ -79,10 +79,9 @@ impl Agent {
.await; .await;
let decoder = |job_result: ExecResult| { let decoder = |job_result: ExecResult| {
let assoc_job = unwrap_enum!(job_result, ExecResult::Assigned); let assoc_job = unwrap_enum!(job_result, ExecResult::Assigned);
assoc_job.to_string_result().unwrap() assoc_job.to_string_result().unwrap().trim().to_string()
}; };
#[cfg(unix)]
Self { Self {
hostname: decoder(builder.pop("hostname")), hostname: decoder(builder.pop("hostname")),
is_root: &decoder(builder.pop("is_root")) == "0", is_root: &decoder(builder.pop("is_root")) == "0",

@ -136,3 +136,51 @@ impl JobOutput {
result result
} }
} }
#[cfg(test)]
mod tests {
use crate::{models::JobOutput, utils::bytes_to_string};
use test_case::test_case;
const STDOUT: &str = "<***STDOUT***>";
const STDERR: &str = "<***STDERR***>";
#[test_case(
"lol",
"kek",
&format!("{}lol{}kek", STDOUT, STDERR)
;"stdout stderr"
)]
#[test_case(
"",
"kek",
&format!("{}kek", STDERR)
;"stderr"
)]
fn test_to_combined(stdout: &str, stderr: &str, result: &str) {
let output = JobOutput::new()
.stdout(stdout.as_bytes().to_vec())
.stderr(stderr.as_bytes().to_vec());
assert_eq!(&bytes_to_string(&output.into_combined()), result)
}
#[test_case(
&format!("{}lal{}kik", STDOUT, STDERR),
"lal\nkik"
;"stdout stderr"
)]
#[test_case(
&format!("{}qeq", STDOUT),
"qeq"
;"stdout"
)]
#[test_case(
&format!("{}vev", STDERR),
"vev"
;"stderr"
)]
fn test_from_combined(src: &str, result: &str) {
let output = JobOutput::from_combined(src.as_bytes()).unwrap();
assert_eq!(bytes_to_string(&output.to_appropriate()).trim(), result);
}
}

@ -6,6 +6,8 @@ pub mod schema;
use crate::messaging::AsMsg; use crate::messaging::AsMsg;
pub use crate::models::result::ExecResult; pub use crate::models::result::ExecResult;
pub use crate::models::{agent::*, jobs::*}; pub use crate::models::{agent::*, jobs::*};
use serde::{Deserialize, Serialize};
use std::fmt;
use uuid::Uuid; use uuid::Uuid;
impl AsMsg for Agent {} impl AsMsg for Agent {}
@ -14,4 +16,13 @@ impl AsMsg for ExecResult {}
impl AsMsg for JobMeta {} impl AsMsg for JobMeta {}
impl AsMsg for String {} impl AsMsg for String {}
impl AsMsg for Uuid {} impl AsMsg for Uuid {}
impl AsMsg for () {} impl AsMsg for Empty {}
#[derive(Serialize, Deserialize, Clone, Default, Debug)]
pub struct Empty;
impl fmt::Display for Empty {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "<empty>")
}
}

@ -1,161 +0,0 @@
use crate::UError;
use chrono::{offset::Local, DateTime};
use nix::{
sys::signal::{signal, SigHandler, Signal},
unistd::{chdir, close as fdclose, fork, getppid, setsid, ForkResult},
};
use std::{
env::temp_dir, fs, ops::Drop, os::unix::fs::PermissionsExt, path::PathBuf, process::exit,
time::SystemTime,
};
use uuid::Uuid;
pub trait OneOrMany<T> {
fn into_vec(self) -> Vec<T>;
}
impl<T> OneOrMany<T> for T {
fn into_vec(self) -> Vec<T> {
vec![self]
}
}
impl<T> OneOrMany<T> for Vec<T> {
fn into_vec(self) -> Vec<T> {
self
}
}
pub fn daemonize() {
if getppid().as_raw() != 1 {
setsig(Signal::SIGTTOU, SigHandler::SigIgn);
setsig(Signal::SIGTTIN, SigHandler::SigIgn);
setsig(Signal::SIGTSTP, SigHandler::SigIgn);
}
for fd in 0..=2 {
match fdclose(fd) {
_ => (),
}
}
match chdir("/") {
_ => (),
};
match fork() {
Ok(ForkResult::Parent { .. }) => {
exit(0);
}
Ok(ForkResult::Child) => match setsid() {
_ => (),
},
Err(_) => exit(255),
}
}
pub fn setsig(sig: Signal, hnd: SigHandler) {
unsafe {
signal(sig, hnd).unwrap();
}
}
pub fn vec_to_string(v: &[u8]) -> String {
String::from_utf8_lossy(v).to_string()
}
pub fn opt_to_string<T: ToString>(item: Option<T>) -> String {
match item {
Some(s) => s.to_string(),
None => String::new(),
}
}
pub fn systime_to_string(time: &SystemTime) -> String {
DateTime::<Local>::from(*time)
.format("%d/%m/%Y %T")
.to_string()
}
pub struct TempFile {
path: PathBuf,
}
impl TempFile {
pub fn get_path(&self) -> String {
self.path.to_string_lossy().to_string()
}
pub fn new() -> Self {
let name = Uuid::simple(&Uuid::new_v4()).to_string();
let mut path = temp_dir();
path.push(name);
Self { path }
}
pub fn write_all(&self, data: &[u8]) -> Result<(), String> {
fs::write(&self.path, data).map_err(|e| e.to_string())
}
pub fn write_exec(data: &[u8]) -> Result<Self, (String, String)> {
let this = Self::new();
let path = this.get_path();
this.write_all(data).map_err(|e| (path.clone(), e))?;
let perms = fs::Permissions::from_mode(0o555);
fs::set_permissions(&path, perms).map_err(|e| (path, e.to_string()))?;
Ok(this)
}
}
impl Drop for TempFile {
fn drop(&mut self) {
fs::remove_file(&self.path).ok();
}
}
pub struct CombinedResult<T, E = UError> {
ok: Vec<T>,
err: Vec<E>,
}
impl<T, E> CombinedResult<T, E> {
pub fn new() -> Self {
Self {
ok: vec![],
err: vec![],
}
}
pub fn ok<I: OneOrMany<T>>(&mut self, result: I) {
self.ok.extend(result.into_vec());
}
pub fn err<I: OneOrMany<E>>(&mut self, err: I) {
self.err.extend(err.into_vec());
}
pub fn unwrap(self) -> Vec<T> {
let err_len = self.err.len();
if err_len > 0 {
panic!("CombinedResult has {} errors", err_len);
}
self.ok
}
pub fn unwrap_one(self) -> T {
self.unwrap().pop().unwrap()
}
pub fn pop_errors(&mut self) -> Vec<E> {
self.err.drain(..).collect()
}
}
#[macro_export]
macro_rules! unwrap_enum {
($src:ident, $t:path) => {
if let $t(result) = $src {
result
} else {
panic!("wrong type")
}
};
}

@ -0,0 +1,40 @@
use crate::utils::OneOrVec;
use crate::UError;
pub struct CombinedResult<T, E = UError> {
ok: Vec<T>,
err: Vec<E>,
}
impl<T, E> CombinedResult<T, E> {
pub fn new() -> Self {
Self {
ok: vec![],
err: vec![],
}
}
pub fn ok<I: OneOrVec<T>>(&mut self, result: I) {
self.ok.extend(result.into_vec());
}
pub fn err<I: OneOrVec<E>>(&mut self, err: I) {
self.err.extend(err.into_vec());
}
pub fn unwrap(self) -> Vec<T> {
let err_len = self.err.len();
if err_len > 0 {
panic!("CombinedResult has {} errors", err_len);
}
self.ok
}
pub fn unwrap_one(self) -> T {
self.unwrap().pop().unwrap()
}
pub fn pop_errors(&mut self) -> Vec<E> {
self.err.drain(..).collect()
}
}

@ -0,0 +1,19 @@
use chrono::{offset::Local, DateTime};
use std::time::SystemTime;
pub fn bytes_to_string(v: &[u8]) -> String {
String::from_utf8_lossy(v).to_string()
}
pub fn opt_to_string<T: ToString>(item: Option<T>) -> String {
match item {
Some(s) => s.to_string(),
None => String::new(),
}
}
pub fn systime_to_string(time: &SystemTime) -> String {
DateTime::<Local>::from(*time)
.format("%d/%m/%Y %T")
.to_string()
}

@ -0,0 +1,71 @@
use nix::{
sys::signal::{signal, SigHandler, Signal},
unistd::{chdir, close as fdclose, fork, getppid, setsid, ForkResult},
};
use std::process::exit;
pub trait OneOrVec<T> {
fn into_vec(self) -> Vec<T>;
}
impl<T> OneOrVec<T> for T {
fn into_vec(self) -> Vec<T> {
vec![self]
}
}
impl<T> OneOrVec<T> for Vec<T> {
fn into_vec(self) -> Vec<T> {
self
}
}
#[macro_export]
macro_rules! unwrap_enum {
($src:ident, $t:path) => {
if let $t(result) = $src {
result
} else {
panic!("wrong type")
}
};
}
pub fn daemonize() {
if getppid().as_raw() != 1 {
setsig(Signal::SIGTTOU, SigHandler::SigIgn);
setsig(Signal::SIGTTIN, SigHandler::SigIgn);
setsig(Signal::SIGTSTP, SigHandler::SigIgn);
}
for fd in 0..=2 {
match fdclose(fd) {
_ => (),
}
}
match chdir("/") {
_ => (),
};
match fork() {
Ok(ForkResult::Parent { .. }) => {
exit(0);
}
Ok(ForkResult::Child) => match setsid() {
_ => (),
},
Err(_) => exit(255),
}
}
pub fn setsig(sig: Signal, hnd: SigHandler) {
unsafe {
signal(sig, hnd).unwrap();
}
}
pub fn init_env() {
let envs = [".env"];
for envfile in &envs {
dotenv::from_filename(envfile).ok();
}
}

@ -0,0 +1,11 @@
pub mod combined_result;
pub mod conv;
pub mod misc;
pub mod tempfile;
pub mod vec_display;
pub use combined_result::*;
pub use conv::*;
pub use misc::*;
pub use tempfile::*;
pub use vec_display::*;

@ -0,0 +1,38 @@
use std::{env::temp_dir, fs, ops::Drop, os::unix::fs::PermissionsExt, path::PathBuf};
use uuid::Uuid;
pub struct TempFile {
path: PathBuf,
}
impl TempFile {
pub fn get_path(&self) -> String {
self.path.to_string_lossy().to_string()
}
pub fn new() -> Self {
let name = Uuid::simple(&Uuid::new_v4()).to_string();
let mut path = temp_dir();
path.push(name);
Self { path }
}
pub fn write_all(&self, data: &[u8]) -> Result<(), String> {
fs::write(&self.path, data).map_err(|e| e.to_string())
}
pub fn write_exec(data: &[u8]) -> Result<Self, (String, String)> {
let this = Self::new();
let path = this.get_path();
this.write_all(data).map_err(|e| (path.clone(), e))?;
let perms = fs::Permissions::from_mode(0o555);
fs::set_permissions(&path, perms).map_err(|e| (path, e.to_string()))?;
Ok(this)
}
}
impl Drop for TempFile {
fn drop(&mut self) {
fs::remove_file(&self.path).ok();
}
}

@ -0,0 +1,40 @@
use crate::{messaging::AsMsg, utils::OneOrVec};
use serde::{Deserialize, Serialize};
use std::fmt::{self, Display, Formatter};
use std::ops::{Deref, DerefMut};
#[derive(Serialize, Deserialize, Clone, Default)]
pub struct VecDisplay<T: AsMsg + Display>(pub Vec<T>);
impl<T: AsMsg + Display> VecDisplay<T> {
pub fn new<I: OneOrVec<T>>(inner: I) -> Self {
VecDisplay(inner.into_vec())
}
pub fn into_builtin_vec(self) -> Vec<T> {
self.0
}
}
impl<T: AsMsg + Display> Deref for VecDisplay<T> {
type Target = Vec<T>;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl<T: AsMsg + Display> DerefMut for VecDisplay<T> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}
impl<T: AsMsg + Display> Display for VecDisplay<T> {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
for (i, itm) in self.0.iter().enumerate() {
writeln!(f, "### {}:\n{}\n", i, itm)?;
}
Ok(())
}
}

@ -1,17 +0,0 @@
/*
use std::fmt::Display;
use u_api_proc_macro::api_route;
use uuid::Uuid;
struct Paths;
struct ClientHandler;
#[test]
fn test_api_proc_macro() {
#[api_route("GET", Uuid)]
fn list<T: Display>(&self, msg: T) -> String {}
#[api_route("POST", Uuid)]
fn report<T: Display>(&self, msg: T) -> String {}
}
*/

Binary file not shown.

@ -1,161 +0,0 @@
use std::{time::SystemTime};
use u_lib::{
errors::UError,
models::{
jobs::{JobMeta},
ExecResult,
misc::JobType
},
builder::{JobBuilder, NamedJobBuilder}
};
type TestResult<R = ()> = Result<R, Box<dyn std::error::Error>>;
#[tokio::test]
async fn test_is_really_async() {
const SLEEP_SECS: u64 = 1;
let job = JobMeta::from_shell(format!("sleep {}", SLEEP_SECS)).unwrap();
let sleep_jobs: Vec<JobMeta> = (0..50).map(|_| job.clone()).collect();
let now = SystemTime::now();
JobBuilder::from_meta(sleep_jobs).unwrap_one().wait().await;
assert!(now.elapsed().unwrap().as_secs() < SLEEP_SECS + 2)
}
#[test_case(
"/bin/sh {}",
Some(b"echo test01 > /tmp/asd; cat /tmp/asd"),
"test01"
;"sh payload"
)]
#[test_case(
r#"/usr/bin/python -c 'print("test02")'"#,
None,
"test02"
;"python cmd"
)]
#[test_case(
"/{}",
Some(
br#"#!/bin/sh
TMPPATH=/tmp/lol
mkdir -p $TMPPATH
echo test03 > $TMPPATH/t
cat $TMPPATH/t"#
),
"test03"
;"sh multiline payload"
)]
#[test_case(
"/{} 'some msg as arg'",
Some(include_bytes!("../fixtures/echoer")),
"some msg as arg"
;"standalone binary with args"
)]
#[tokio::test]
async fn test_shell_job(cmd: &str, payload: Option<&[u8]>, expected_result: &str) -> TestResult {
let mut job = JobMeta::builder().with_shell(cmd);
if let Some(p) = payload {
job = job.with_payload(p);
}
let job = job.build().unwrap();
let job_result = JobBuilder::from_meta(job).unwrap_one().wait_one().await;
let result = unwrap_enum!(job_result, ExecResult::Assigned);
let result = result.to_string_result().unwrap();
assert_eq!(result.trim(), expected_result);
Ok(())
}
#[tokio::test]
async fn test_complex_load() -> TestResult {
const SLEEP_SECS: u64 = 1;
let now = SystemTime::now();
let longest_job = JobMeta::from_shell(format!("sleep {}", SLEEP_SECS)).unwrap();
let longest_job = JobBuilder::from_meta(longest_job).unwrap_one().spawn().await;
let ls = JobBuilder::from_meta(JobMeta::from_shell("ls")?).unwrap_one()
.wait_one()
.await;
let ls = unwrap_enum!(ls, ExecResult::Assigned);
assert_eq!(ls.retcode.unwrap(), 0);
let folders = ls.to_string_result().unwrap();
let subfolders_jobs: Vec<JobMeta> = folders
.lines()
.map(|f| JobMeta::from_shell(format!("ls {}", f)).unwrap())
.collect();
let ls_subfolders = JobBuilder::from_meta(subfolders_jobs)
.unwrap_one()
.wait()
.await;
for result in ls_subfolders {
let result = unwrap_enum!(result, ExecResult::Assigned);
assert_eq!(result.retcode.unwrap(), 0);
}
longest_job.wait().await;
assert_eq!(now.elapsed().unwrap().as_secs(), SLEEP_SECS);
Ok(())
}
/*
#[tokio::test]
async fn test_exec_multiple_jobs_nowait() -> UResult<()> {
const REPEATS: usize = 10;
let job = JobMeta::from_shell("whoami");
let sleep_jobs: Vec<JobMeta> = (0..=REPEATS).map(|_| job.clone()).collect();
build_jobs(sleep_jobs).spawn().await;
let mut completed = 0;
while completed < REPEATS {
let c = pop_completed().await.len();
if c > 0 {
completed += c;
println!("{}", c);
}
}
Ok(())
}
*/
#[tokio::test]
async fn test_failing_shell_job() -> TestResult {
let job = JobMeta::from_shell("lol_kek_puk")?;
let job_result = JobBuilder::from_meta(job)
.unwrap_one()
.wait_one()
.await;
let job_result = unwrap_enum!(job_result, ExecResult::Assigned);
let output = job_result.to_string_result().unwrap();
assert!(output.contains("No such file"));
assert!(job_result.retcode.is_none());
Ok(())
}
#[test_case(
"/bin/bash {}",
None,
"contains executable"
; "no binary"
)]
#[test_case(
"/bin/bash",
Some(b"whoami"),
"contains no executable"
; "no path to binary"
)]
#[tokio::test]
async fn test_job_building_failed(cmd: &str, payload: Option<&[u8]>, err_str: &str) -> TestResult {
let mut job = JobMeta::builder().with_shell(cmd);
if let Some(p) = payload {
job = job.with_payload(p);
}
let err = job.build().unwrap_err();
let err_msg = unwrap_enum!(err, UError::JobArgsError);
assert!(err_msg.contains(err_str));
Ok(())
}
#[tokio::test]
async fn test_different_job_types() -> TestResult {
let mut jobs = NamedJobBuilder::from_meta(vec![
("sleeper", JobMeta::from_shell("sleep 3")?),
("gatherer", JobMeta::builder().with_type(JobType::Manage).build()?)
]).wait().await;
let gathered = jobs.pop("gatherer");
assert_eq!(unwrap_enum!(gathered, ExecResult::Agent).alias, None);
Ok(())
}

@ -1,43 +0,0 @@
use u_lib::{models::JobOutput, utils::vec_to_string};
const STDOUT: &str = "<***STDOUT***>";
const STDERR: &str = "<***STDERR***>";
#[test_case(
"lol",
"kek",
&format!("{}lol{}kek", STDOUT, STDERR)
;"stdout stderr"
)]
#[test_case(
"",
"kek",
&format!("{}kek", STDERR)
;"stderr"
)]
fn test_to_combined(stdout: &str, stderr: &str, result: &str) {
let output = JobOutput::new()
.stdout(stdout.as_bytes().to_vec())
.stderr(stderr.as_bytes().to_vec());
assert_eq!(&vec_to_string(&output.into_combined()), result)
}
#[test_case(
&format!("{}lal{}kik", STDOUT, STDERR),
"lal\nkik"
;"stdout stderr"
)]
#[test_case(
&format!("{}qeq", STDOUT),
"qeq"
;"stdout"
)]
#[test_case(
&format!("{}vev", STDERR),
"vev"
;"stderr"
)]
fn test_from_combined(src: &str, result: &str) {
let output = JobOutput::from_combined(src.as_bytes()).unwrap();
assert_eq!(vec_to_string(&output.to_appropriate()).trim(), result);
}

@ -1,10 +0,0 @@
#[macro_use]
extern crate test_case;
#[macro_use]
extern crate u_lib;
mod jobs {
mod execution;
mod output;
}

@ -1,10 +1,11 @@
#!/bin/bash #!/bin/bash
set -e set -e
source $(dirname $0)/rootdir.sh #set ROOTDIR source $(dirname $0)/rootdir.sh #set ROOTDIR
umask 002
docker run \ docker run \
-v $ROOTDIR:/volume \ -v $ROOTDIR:/volume \
-v cargo-cache:/root/.cargo/registry \ -v cargo-cache:/root/.cargo/registry \
-w /volume \ -w /volume \
-it \ -it \
clux/muslrust \ unki/musllibs \
cargo $@ cargo $@

Loading…
Cancel
Save