diff --git a/.cargo/config.toml b/.cargo/config.toml deleted file mode 100644 index 664ed6c..0000000 --- a/.cargo/config.toml +++ /dev/null @@ -1,3 +0,0 @@ -[build] -target = "x86_64-unknown-linux-gnu" # -musl" - diff --git a/.env b/.env index 2ac5089..001cab7 100644 --- a/.env +++ b/.env @@ -1 +1,3 @@ -export DATABASE_URL=postgres://postgres:12348756@172.17.0.2/u_db +ADMIN_AUTH_TOKEN=464af63dbd241969baa1e94b2461d94d +POSTGRES_PASSWORD=12348756 +DATABASE_URL=postgres://postgres:${POSTGRES_PASSWORD}@u_db/u_db diff --git a/.gitignore b/.gitignore index f8abd6f..fa726f9 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,6 @@ -/target +target/ **/*.rs.bk -/.idea -/data \ No newline at end of file +.idea/ +data/ +static/ +**/*.pyc \ No newline at end of file diff --git a/Cargo.toml b/Cargo.toml index e1a1cda..ede90c2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,7 +5,8 @@ members = [ "bin/u_run", "bin/u_server", "lib/u_lib", - "lib/u_api_proc_macro" + "lib/u_api_proc_macro", + "integration" ] [profile.release] @@ -13,5 +14,4 @@ panic = "abort" [profile.dev] debug = true # Добавляет флаг `-g` для компилятора; -opt-level = 0 - +opt-level = 0 \ No newline at end of file diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..8c51d78 --- /dev/null +++ b/Makefile @@ -0,0 +1,18 @@ +.PHONY: _pre_build debug release run clean + +CARGO=./scripts/cargo_musl.sh + +clean: + ${CARGO} clean + +_pre_build: + docker build -t unki/musllibs ./muslrust + +debug: _pre_build + ${CARGO} build + +release: _pre_build + ${CARGO} build --release + +run: build + ${CARGO} run \ No newline at end of file diff --git a/bin/u_agent/Cargo.toml b/bin/u_agent/Cargo.toml index a0fa747..c963c2b 100644 --- a/bin/u_agent/Cargo.toml +++ b/bin/u_agent/Cargo.toml @@ -13,4 +13,5 @@ log = "^0.4" env_logger = "0.8.3" uuid = "0.6.5" reqwest = { version = "0.11", features = ["json"] } +openssl = "*" u_lib = { version = "*", path = "../../lib/u_lib" } \ No newline at end of file diff --git a/bin/u_agent/src/lib.rs b/bin/u_agent/src/lib.rs new file mode 100644 index 0000000..876a1b6 --- /dev/null +++ b/bin/u_agent/src/lib.rs @@ -0,0 +1,88 @@ +// TODO: +// поддержка питона +// резолв адреса управляющего сервера через DoT +// кроссплатформенность (реализовать интерфейс для винды и никсов) +// проверка обнов +// самоуничтожение + +#[macro_use] +extern crate log; +extern crate env_logger; + +use std::env; +use tokio::time::{sleep, Duration}; +use u_lib::{ + api::ClientHandler, + builder::JobBuilder, + cache::JobCache, + executor::pop_completed, + models::{AssignedJob, ExecResult}, + UID, + //daemonize +}; + +#[macro_export] +macro_rules! retry_until_ok { + ( $body:expr ) => { + loop { + match $body { + Ok(r) => break r, + Err(e) => error!("{:?}", e), + }; + sleep(Duration::from_secs(5)).await; + } + }; +} + +pub async fn process_request(job_requests: Vec, client: &ClientHandler) { + if job_requests.len() > 0 { + for jr in &job_requests { + if !JobCache::contains(&jr.job_id) { + info!("Fetching job: {}", &jr.job_id); + let fetched_job = retry_until_ok!(client.get_jobs(Some(jr.job_id)).await) + .pop() + .unwrap(); + JobCache::insert(fetched_job); + } + } + info!( + "Scheduling jobs: \n{}", + job_requests + .iter() + .map(|j| j.job_id.to_string()) + .collect::>() + .join("\n") + ); + let mut builder = JobBuilder::from_request(job_requests); + let errors = builder.pop_errors(); + if errors.len() > 0 { + error!( + "Some errors encountered: \n{}", + errors + .iter() + .map(|j| j.to_string()) + .collect::>() + .join("\n") + ); + } + builder.unwrap_one().spawn().await; + } +} + +pub async fn run_forever() { + //daemonize(); + env_logger::init(); + let arg_ip = env::args().nth(1); + let instance = ClientHandler::new(arg_ip.as_deref()); + info!("Connecting to the server"); + loop { + let job_requests: Vec = + retry_until_ok!(instance.get_agent_jobs(Some(*UID)).await).into_builtin_vec(); + process_request(job_requests, &instance).await; + let result: Vec = pop_completed().await.into_iter().collect(); + if result.len() > 0 { + retry_until_ok!(instance.report(&result).await); + } + sleep(Duration::from_secs(5)).await; + } +} diff --git a/bin/u_agent/src/main.rs b/bin/u_agent/src/main.rs index a854863..43166a8 100644 --- a/bin/u_agent/src/main.rs +++ b/bin/u_agent/src/main.rs @@ -1,89 +1,7 @@ -// TODO: -// поддержка питона -// резолв адреса управляющего сервера через DoT -// кроссплатформенность (реализовать интерфейс для винды и никсов) -// проверка обнов -// самоуничтожение - -#[macro_use] -extern crate log; -extern crate env_logger; - -use std::env; -use tokio::time::{sleep, Duration}; -use u_lib::{ - api::ClientHandler, - builder::JobBuilder, - cache::JobCache, - executor::pop_completed, - models::{AssignedJob, ExecResult}, - UID, - //daemonize -}; - -#[macro_export] -macro_rules! retry_until_ok { - ( $body:expr ) => { - loop { - match $body { - Ok(r) => break r, - Err(e) => error!("{:?}", e), - }; - sleep(Duration::from_secs(5)).await; - } - }; -} - -async fn process_request(job_requests: Vec, client: &ClientHandler) { - if job_requests.len() > 0 { - for jr in &job_requests { - if !JobCache::contains(&jr.job_id) { - info!("Fetching job: {}", &jr.job_id); - let fetched_job = retry_until_ok!(client.get_jobs(Some(jr.job_id)).await) - .pop() - .unwrap(); - JobCache::insert(fetched_job); - } - } - info!( - "Scheduling jobs: \n{}", - job_requests - .iter() - .map(|j| j.job_id.to_string()) - .collect::>() - .join("\n") - ); - let mut builder = JobBuilder::from_request(job_requests); - let errors = builder.pop_errors(); - if errors.len() > 0 { - error!( - "Some errors encountered: \n{}", - errors - .iter() - .map(|j| j.to_string()) - .collect::>() - .join("\n") - ); - } - builder.unwrap_one().spawn().await; - } -} +use tokio; +use u_agent::run_forever; #[tokio::main] async fn main() { - //daemonize(); - env_logger::init(); - let arg_ip = env::args().nth(1); - let instance = ClientHandler::new(arg_ip); - info!("Connecting to the server"); - loop { - let job_requests: Vec = - retry_until_ok!(instance.get_agent_jobs(Some(*UID)).await); - process_request(job_requests, &instance).await; - let result: Vec = pop_completed().await.into_iter().collect(); - if result.len() > 0 { - retry_until_ok!(instance.report(&result).await) - } - sleep(Duration::from_secs(5)).await; - } + run_forever().await; } diff --git a/bin/u_panel/Cargo.toml b/bin/u_panel/Cargo.toml index 734d36c..1f15786 100644 --- a/bin/u_panel/Cargo.toml +++ b/bin/u_panel/Cargo.toml @@ -13,4 +13,7 @@ log = "^0.4" env_logger = "0.7.1" uuid = "0.6.5" reqwest = { version = "0.11", features = ["json"] } +openssl = "*" u_lib = { version = "*", path = "../../lib/u_lib" } +serde_json = "1.0.4" +serde = { version = "1.0.114", features = ["derive"] } diff --git a/bin/u_panel/src/main.rs b/bin/u_panel/src/main.rs index e6b8990..f6335a2 100644 --- a/bin/u_panel/src/main.rs +++ b/bin/u_panel/src/main.rs @@ -1,20 +1,25 @@ +use serde::Serialize; +use std::env; +use std::fmt; use structopt::StructOpt; -use u_lib::{api::ClientHandler, models::JobMeta, UError}; +use u_lib::{ + api::ClientHandler, messaging::AsMsg, models::JobMeta, utils::init_env, UError, UResult, +}; use uuid::Uuid; -const DELIM: &'static str = "*************\n"; - #[derive(StructOpt, Debug)] struct Args { #[structopt(subcommand)] cmd: Cmd, + #[structopt(long)] + json: bool, } #[derive(StructOpt, Debug)] enum Cmd { Agents(LD), Jobs(JobALD), - Jobmap(JmALD), + Jobmap(JobMapALD), } #[derive(StructOpt, Debug)] @@ -23,6 +28,9 @@ enum JobALD { #[structopt(long, parse(try_from_str = parse_uuid))] agent: Option, + #[structopt(long)] + alias: String, + #[structopt(subcommand)] cmd: JobCmd, }, @@ -37,13 +45,12 @@ enum JobCmd { } #[derive(StructOpt, Debug)] -enum JmALD { +enum JobMapALD { Add { #[structopt(parse(try_from_str = parse_uuid))] agent_uid: Uuid, - #[structopt(parse(try_from_str = parse_uuid))] - job_uids: Vec, + job_idents: Vec, }, List { #[structopt(parse(try_from_str = parse_uuid))] @@ -71,58 +78,71 @@ fn parse_uuid(src: &str) -> Result { Uuid::parse_str(src).map_err(|e| e.to_string()) } -async fn process_cmd(cmd: Cmd) -> Result<(), UError> { - let cli_handler = ClientHandler::new(None).password("123qwe".to_string()); - match cmd { - Cmd::Agents(action) => match action { - LD::List { uid } => cli_handler - .get_agents(uid) - .await? - .into_iter() - .for_each(|r| println!("{}{}", DELIM, r)), - LD::Delete { uid } => { - println!("{}", cli_handler.del(Some(uid)).await?); +async fn process_cmd(args: Args) { + fn printer(data: UResult, json: bool) { + if json { + #[derive(Serialize)] + #[serde(rename_all = "lowercase")] + #[serde(tag = "status", content = "data")] + enum DataResult { + Ok(M), + Err(UError), + } + + let data = match data { + Ok(r) => DataResult::Ok(r), + Err(e) => DataResult::Err(e), + }; + println!("{}", serde_json::to_string_pretty(&data).unwrap()); + } else { + match data { + Ok(r) => println!("{}", r), + Err(e) => eprintln!("Error: {}", e), } + } + } + + let token = env::var("ADMIN_AUTH_TOKEN").expect("Authentication token is not set"); + let cli_handler = ClientHandler::new(None).password(token); + let json = args.json; + match args.cmd { + Cmd::Agents(action) => match action { + LD::List { uid } => printer(cli_handler.get_agents(uid).await, json), + LD::Delete { uid } => printer(cli_handler.del(Some(uid)).await, json), }, Cmd::Jobs(action) => match action { JobALD::Add { cmd: JobCmd::Cmd(cmd), - agent, + alias, + agent: _agent, } => { - let job = JobMeta::from_shell(cmd.join(" "))?; - let job_uid = job.id; - cli_handler.upload_jobs(&vec![job]).await?; - if agent.is_some() { - cli_handler.set_jobs(agent, &vec![job_uid]).await? - } - } - JobALD::LD(LD::List { uid }) => cli_handler - .get_jobs(uid) - .await? - .into_iter() - .for_each(|r| println!("{}{}", DELIM, r)), - JobALD::LD(LD::Delete { uid }) => { - println!("{}", cli_handler.del(Some(uid)).await?) + let job = JobMeta::builder() + .with_shell(cmd.join(" ")) + .with_alias(alias) + .build() + .unwrap(); + printer(cli_handler.upload_jobs(&[job]).await, json); } + JobALD::LD(LD::List { uid }) => printer(cli_handler.get_jobs(uid).await, json), + JobALD::LD(LD::Delete { uid }) => printer(cli_handler.del(Some(uid)).await, json), }, Cmd::Jobmap(action) => match action { - JmALD::Add { + JobMapALD::Add { agent_uid, - job_uids, - } => cli_handler.set_jobs(Some(agent_uid), &job_uids).await?, - JmALD::List { uid } => cli_handler - .get_agent_jobs(uid) - .await? - .into_iter() - .for_each(|r| println!("{}{}", DELIM, r)), - JmALD::Delete { uid } => println!("{}", cli_handler.del(Some(uid)).await?), + job_idents, + } => printer( + cli_handler.set_jobs(Some(agent_uid), &job_idents).await, + json, + ), + JobMapALD::List { uid } => printer(cli_handler.get_agent_jobs(uid).await, json), + JobMapALD::Delete { uid } => printer(cli_handler.del(Some(uid)).await, json), }, } - Ok(()) } #[tokio::main] -async fn main() -> Result<(), UError> { +async fn main() { + init_env(); let args: Args = Args::from_args(); - process_cmd(args.cmd).await + process_cmd(args).await; } diff --git a/bin/u_server/Cargo.toml b/bin/u_server/Cargo.toml index 6ef7838..96a3651 100644 --- a/bin/u_server/Cargo.toml +++ b/bin/u_server/Cargo.toml @@ -5,14 +5,16 @@ name = "u_server" version = "0.1.0" [dependencies] -dotenv = "0.15.0" -env_logger = "0.7.1" log = "0.4.11" +simplelog = "0.10" thiserror = "*" warp = "0.2.4" uuid = { version = "0.6.5", features = ["serde", "v4"] } once_cell = "1.7.2" hyper = "0.13.10" +mockall = "0.9.1" +mockall_double = "0.2" +openssl = "*" [dependencies.diesel] features = ["postgres", "uuid"] @@ -30,3 +32,13 @@ version = "0.2.22" path = "../../lib/u_lib" version = "*" +[dev-dependencies] +test-case = "1.1.0" + +[lib] +name = "u_server_lib" +path = "src/lib.rs" + +[[bin]] +name = "u_server" +path = "src/main.rs" \ No newline at end of file diff --git a/bin/u_server/src/db.rs b/bin/u_server/src/db.rs index 1302443..c104968 100644 --- a/bin/u_server/src/db.rs +++ b/bin/u_server/src/db.rs @@ -1,5 +1,4 @@ use diesel::{pg::PgConnection, prelude::*, result::Error as DslError}; -use dotenv::dotenv; use once_cell::sync::OnceCell; use std::{ env, @@ -17,19 +16,19 @@ pub struct UDB { static DB: OnceCell>> = OnceCell::new(); -pub fn lock_db() -> MutexGuard<'static, UDB> { - DB.get_or_init(|| { - dotenv().unwrap(); - let db_path = env::var("DATABASE_URL").unwrap(); - let conn = PgConnection::establish(&db_path).unwrap(); - let instance = UDB { conn }; - Arc::new(Mutex::new(instance)) - }) - .lock() - .unwrap() -} - +#[cfg_attr(test, automock)] impl UDB { + pub fn lock_db() -> MutexGuard<'static, UDB> { + DB.get_or_init(|| { + let db_path = env::var("DATABASE_URL").unwrap(); + let conn = PgConnection::establish(&db_path).unwrap(); + let instance = UDB { conn }; + Arc::new(Mutex::new(instance)) + }) + .lock() + .unwrap() + } + pub fn insert_jobs(&self, job_metas: &[JobMeta]) -> ULocalResult<()> { use schema::jobs; diesel::insert_into(jobs::table) @@ -117,7 +116,11 @@ impl UDB { Ok(result) } - pub fn set_jobs_for_agent(&self, agent_uid: &Uuid, job_uids: &[Uuid]) -> ULocalResult<()> { + pub fn set_jobs_for_agent( + &self, + agent_uid: &Uuid, + job_uids: &[Uuid], + ) -> ULocalResult> { use schema::{agents::dsl::agents, jobs::dsl::jobs, results}; if let Err(DslError::NotFound) = agents.find(agent_uid).first::(&self.conn) { return Err(ULocalError::NotFound(agent_uid.to_string())); @@ -146,7 +149,8 @@ impl UDB { diesel::insert_into(results::table) .values(&job_requests) .execute(&self.conn)?; - Ok(()) + let assigned_uids = job_requests.iter().map(|aj| aj.id).collect(); + Ok(assigned_uids) } pub fn del_jobs(&self, uids: &Vec) -> ULocalResult { diff --git a/bin/u_server/src/filters.rs b/bin/u_server/src/filters.rs new file mode 100644 index 0000000..a9c51f0 --- /dev/null +++ b/bin/u_server/src/filters.rs @@ -0,0 +1,88 @@ +use crate::handlers::Endpoints; +use serde::de::DeserializeOwned; +use std::env; +use u_lib::{ + messaging::{AsMsg, BaseMessage}, + models::*, +}; +use uuid::Uuid; +use warp::{body, Filter, Rejection, Reply}; + +fn get_content() -> impl Filter,), Error = Rejection> + Clone +where + M: AsMsg + Sync + Send + DeserializeOwned + 'static, +{ + body::content_length_limit(1024 * 64).and(body::json::>()) +} + +pub fn make_filters() -> impl Filter + Clone { + let infallible_none = |_| async { Ok::<(Option,), std::convert::Infallible>((None,)) }; + + let get_agents = warp::get() + .and(warp::path("get_agents")) + .and( + warp::path::param::() + .map(Some) + .or_else(infallible_none), + ) + .and_then(Endpoints::get_agents); + + let upload_jobs = warp::post() + .and(warp::path("upload_jobs")) + .and(get_content::>()) + .and_then(Endpoints::upload_jobs); + + let get_jobs = warp::get() + .and(warp::path("get_jobs")) + .and( + warp::path::param::() + .map(Some) + .or_else(infallible_none), + ) + .and_then(Endpoints::get_jobs); + + let get_agent_jobs = warp::get() + .and(warp::path("get_agent_jobs")) + .and( + warp::path::param::() + .map(Some) + .or_else(infallible_none), + ) + .and_then(|uid| Endpoints::get_agent_jobs(uid, false)); + + let get_personal_jobs = warp::get() + .and(warp::path("get_agent_jobs")) + .and(warp::path::param::().map(Some)) + .and_then(|uid| Endpoints::get_agent_jobs(uid, true)); + + let del = warp::get() + .and(warp::path("del")) + .and(warp::path::param::()) + .and_then(Endpoints::del); + + let set_jobs = warp::post() + .and(warp::path("set_jobs")) + .and(warp::path::param::()) + .and(get_content::>()) + .and_then(Endpoints::set_jobs); + + let report = warp::post() + .and(warp::path("report")) + .and(get_content::>().and_then(Endpoints::report)); + + let auth_token = format!("Bearer {}", env::var("ADMIN_AUTH_TOKEN").unwrap()).into_boxed_str(); + let auth_header = warp::header::exact("authorization", Box::leak(auth_token)); + + let agent_zone = get_jobs.clone().or(get_personal_jobs).or(report); + + let auth_zone = auth_header.and( + get_agents + .or(get_jobs) + .or(upload_jobs) + .or(del) + .or(set_jobs) + .or(get_agent_jobs), + ); + + agent_zone.or(auth_zone) +} diff --git a/bin/u_server/src/handlers.rs b/bin/u_server/src/handlers.rs index df6e58f..f792188 100644 --- a/bin/u_server/src/handlers.rs +++ b/bin/u_server/src/handlers.rs @@ -1,10 +1,10 @@ -use crate::db::{lock_db, UDB}; +use crate::db::UDB; use diesel::SaveChangesDsl; use hyper::Body; use serde::Serialize; use u_lib::{ messaging::{AsMsg, BaseMessage}, - models::{Agent, AssignedJob, ExecResult, JobMeta, JobState}, + models::{Agent, AgentState, AssignedJob, ExecResult, JobMeta, JobState}, ULocalError, }; use uuid::Uuid; @@ -13,130 +13,156 @@ use warp::{ Rejection, Reply, }; -fn build_response>(code: StatusCode, body: S) -> Response { +pub fn build_response>(code: StatusCode, body: S) -> Response { Response::builder().status(code).body(body.into()).unwrap() } -fn build_ok>(body: S) -> Response { +pub fn build_ok>(body: S) -> Response { build_response(StatusCode::OK, body) } -fn build_err(body: S) -> Response { +pub fn build_err(body: S) -> Response { build_response(StatusCode::BAD_REQUEST, body.to_string()) } -fn build_message(m: M) -> Response { +pub fn build_message(m: M) -> Response { warp::reply::json(&m.as_message()).into_response() } -pub async fn add_agent(msg: Agent) -> Result { - debug!("hnd: add_agent"); - lock_db() - .insert_agent(&msg) - .map(|_| build_ok("")) - .or_else(|e| Ok(build_err(e))) -} +pub struct Endpoints; -pub async fn get_agents(uid: Option) -> Result { - debug!("hnd: get_agents"); - lock_db() - .get_agents(uid) - .map(|m| build_message(m)) - .or_else(|e| Ok(build_err(e))) -} +#[cfg_attr(test, automock)] +impl Endpoints { + pub async fn add_agent(msg: Agent) -> Result, Rejection> { + info!("hnd: add_agent"); + UDB::lock_db() + .insert_agent(&msg) + .map(|_| build_ok("")) + .or_else(|e| Ok(build_err(e))) + } -pub async fn get_jobs(uid: Option) -> Result { - debug!("hnd: get_jobs"); - lock_db() - .get_jobs(uid) - .map(|m| build_message(m)) - .or_else(|e| Ok(build_err(e))) -} + pub async fn get_agents(uid: Option) -> Result, Rejection> { + info!("hnd: get_agents"); + UDB::lock_db() + .get_agents(uid) + .map(|m| build_message(m)) + .or_else(|e| Ok(build_err(e))) + } -pub async fn get_agent_jobs(uid: Option, personal: bool) -> Result { - info!("hnd: get_agent_jobs {}", personal); - if personal { - let agents = lock_db().get_agents(uid).unwrap(); - if agents.len() == 0 { - let db = lock_db(); - db.insert_agent(&Agent::with_id(uid.unwrap())).unwrap(); - let job = db.find_job_by_alias("agent_hello").unwrap(); - if let Err(e) = db.set_jobs_for_agent(&uid.unwrap(), &[job.id]) { - return Ok(build_err(e)); + pub async fn get_jobs(uid: Option) -> Result, Rejection> { + info!("hnd: get_jobs"); + UDB::lock_db() + .get_jobs(uid) + .map(|m| build_message(m)) + .or_else(|e| Ok(build_err(e))) + } + + pub async fn get_agent_jobs( + uid: Option, + personal: bool, + ) -> Result, Rejection> { + info!("hnd: get_agent_jobs {}", personal); + if personal { + let agents = UDB::lock_db().get_agents(uid).unwrap(); + if agents.len() == 0 { + let db = UDB::lock_db(); + db.insert_agent(&Agent::with_id(uid.unwrap())).unwrap(); + let job = db.find_job_by_alias("agent_hello").unwrap(); + if let Err(e) = db.set_jobs_for_agent(&uid.unwrap(), &[job.id]) { + return Ok(build_err(e)); + } } } - } - let result = lock_db().get_exact_jobs(uid, personal); - match result { - Ok(r) => { - let db = lock_db(); - for j in r.iter() { - db.update_job_status(j.id, JobState::Running).ok(); + let result = UDB::lock_db().get_exact_jobs(uid, personal); + match result { + Ok(r) => { + if personal { + let db = UDB::lock_db(); + for j in r.iter() { + db.update_job_status(j.id, JobState::Running).ok(); + } + } + Ok(build_message(r)) } - Ok(build_message(r)) + Err(e) => Ok(build_err(e)), } - Err(e) => Ok(build_err(e)), } -} -pub async fn upload_jobs(msg: BaseMessage<'_, Vec>) -> Result { - debug!("hnd: upload_jobs"); - lock_db() - .insert_jobs(&msg.into_inner()) - .map(|_| build_ok("")) - .or_else(|e| Ok(build_err(e))) -} + pub async fn upload_jobs( + msg: BaseMessage<'static, Vec>, + ) -> Result, Rejection> { + info!("hnd: upload_jobs"); + UDB::lock_db() + .insert_jobs(&msg.into_inner()) + .map(|_| build_ok("")) + .or_else(|e| Ok(build_err(e))) + } -pub async fn del(uid: Uuid) -> Result { - debug!("hnd: del"); - let db = lock_db(); - let del_fns = &[UDB::del_agents, UDB::del_jobs, UDB::del_results]; - for del_fn in del_fns { - let affected = del_fn(&db, &vec![uid]).unwrap(); - if affected > 0 { - return Ok(build_ok(affected.to_string())); + pub async fn del(uid: Uuid) -> Result, Rejection> { + info!("hnd: del"); + let db = UDB::lock_db(); + let del_fns = &[UDB::del_agents, UDB::del_jobs, UDB::del_results]; + for del_fn in del_fns { + let affected = del_fn(&db, &vec![uid]).unwrap(); + if affected > 0 { + return Ok(build_ok(affected.to_string())); + } } + Ok(build_err("0")) } - Ok(build_err("0")) -} -pub async fn set_jobs( - agent_uid: Uuid, - msg: BaseMessage<'_, Vec>, -) -> Result { - debug!("hnd: set_jobs"); - lock_db() - .set_jobs_for_agent(&agent_uid, &msg.into_inner()) - .map(|_| build_ok("")) - .or_else(|e| Ok(build_err(e))) -} + pub async fn set_jobs( + agent_uid: Uuid, + msg: BaseMessage<'static, Vec>, + ) -> Result, Rejection> { + info!("hnd: set_jobs_by_alias"); + let jobs: Result, ULocalError> = msg + .into_inner() + .into_iter() + .map(|ident| { + Uuid::parse_str(&ident) + .or_else(|_| UDB::lock_db().find_job_by_alias(&ident).map(|j| j.id)) + }) + .collect(); + match jobs { + Ok(j) => UDB::lock_db() + .set_jobs_for_agent(&agent_uid, &j) + .map(|assigned_uids| build_message(assigned_uids)) + .or_else(|e| Ok(build_err(e))), + Err(e) => Ok(build_err(e)), + } + } -pub async fn report(msg: BaseMessage<'_, Vec>) -> Result { - debug!("hnd: report"); - let id = msg.id; - let mut failed = vec![]; - for entry in msg.into_inner() { - match entry { - ExecResult::Assigned(res) => { - if id != res.agent_id { - continue; + pub async fn report( + msg: BaseMessage<'static, Vec>, + ) -> Result, Rejection> { + info!("hnd: report"); + let id = msg.id; + let mut failed = vec![]; + for entry in msg.into_inner() { + match entry { + ExecResult::Assigned(res) => { + if id != res.agent_id { + continue; + } + let db = UDB::lock_db(); + if let Err(e) = res + .save_changes::(&db.conn) + .map_err(ULocalError::from) + { + failed.push(e.to_string()) + } } - let db = lock_db(); - if let Err(e) = res - .save_changes::(&db.conn) - .map_err(ULocalError::from) - { - failed.push(e.to_string()) + ExecResult::Agent(mut a) => { + a.state = AgentState::Active; + Self::add_agent(a).await?; } } - ExecResult::Agent(a) => { - add_agent(a).await?; - } } + if failed.len() > 0 { + let err_msg = ULocalError::ProcessingError(failed.join(", ")); + return Ok(build_err(err_msg)); + } + Ok(build_ok("")) } - if failed.len() > 0 { - let err_msg = ULocalError::ProcessingError(failed.join(", ")); - return Ok(build_err(err_msg)); - } - Ok(build_ok("")) } diff --git a/bin/u_server/src/lib.rs b/bin/u_server/src/lib.rs new file mode 100644 index 0000000..6b24a9a --- /dev/null +++ b/bin/u_server/src/lib.rs @@ -0,0 +1,102 @@ +#[macro_use] +extern crate log; + +#[macro_use] +extern crate mockall; +#[macro_use] +extern crate mockall_double; + +// because of linking errors +extern crate openssl; +#[macro_use] +extern crate diesel; +// +mod db; +mod filters; +mod handlers; + +use db::UDB; +use filters::make_filters; +use u_lib::{config::MASTER_PORT, models::*, utils::init_env}; +use warp::Filter; + +const LOGFILE: &str = "u_server.log"; + +fn prefill_jobs() { + let agent_hello = JobMeta::builder() + .with_type(misc::JobType::Manage) + .with_alias("agent_hello") + .build() + .unwrap(); + UDB::lock_db().insert_jobs(&[agent_hello]).ok(); +} + +fn init_logger() { + use simplelog::*; + use std::fs::OpenOptions; + let log_cfg = ConfigBuilder::new() + .set_time_format_str("%x %X") + .set_time_to_local(true) + .build(); + let logfile = OpenOptions::new() + .append(true) + .create(true) + .open(LOGFILE) + .unwrap(); + let loggers = vec![ + WriteLogger::new(LevelFilter::Info, log_cfg.clone(), logfile) as Box, + TermLogger::new( + LevelFilter::Info, + log_cfg, + TerminalMode::Stderr, + ColorChoice::Auto, + ), + ]; + CombinedLogger::init(loggers).unwrap(); +} + +fn init_all() { + init_logger(); + init_env(); + prefill_jobs(); +} + +pub async fn serve() { + init_all(); + let routes = make_filters(); + warp::serve(routes.with(warp::log("warp"))) + .run(([0, 0, 0, 0], MASTER_PORT)) + .await; +} + +#[cfg(test)] +mod tests { + use super::*; + #[double] + use crate::handlers::Endpoints; + use handlers::build_ok; + use mockall::predicate::*; + use test_case::test_case; + use uuid::Uuid; + use warp::test::request; + + #[test_case(Some(Uuid::new_v4()))] + #[test_case(None => panics)] + #[tokio::test] + async fn test_get_agent_jobs_unauthorized(uid: Option) { + let mock = Endpoints::get_agent_jobs_context(); + mock.expect() + .with(eq(uid), eq(uid.is_some())) + .returning(|_, _| Ok(build_ok(""))); + request() + .path(&format!( + "/get_agent_jobs/{}", + uid.map(|u| u.simple().to_string()).unwrap_or(String::new()) + )) + .method("GET") + .filter(&make_filters()) + .await + .unwrap(); + mock.checkpoint(); + } +} diff --git a/bin/u_server/src/main.rs b/bin/u_server/src/main.rs index c606e09..05ec4e0 100644 --- a/bin/u_server/src/main.rs +++ b/bin/u_server/src/main.rs @@ -1,125 +1,6 @@ -mod db; -mod handlers; - -use warp::{body, Filter, Rejection}; - -#[macro_use] -extern crate log; -extern crate env_logger; - -use db::lock_db; -use serde::de::DeserializeOwned; -use u_lib::{ - config::MASTER_PORT, - messaging::{AsMsg, BaseMessage}, - models::*, -}; -use uuid::Uuid; - -fn get_content() -> impl Filter,), Error = Rejection> + Clone -where - M: AsMsg + Sync + Send + DeserializeOwned + 'static, -{ - body::content_length_limit(1024 * 64).and(body::json::>()) -} - -fn prefill_jobs() { - let agent_hello = JobMeta::builder() - .with_type(misc::JobType::Manage) - .with_alias("agent_hello") - .build() - .unwrap(); - lock_db().insert_jobs(&[agent_hello]).ok(); -} - -fn init() { - env_logger::init(); - prefill_jobs(); -} +use u_server_lib::serve; #[tokio::main] async fn main() { - init(); - let infallible_none = |_| async { Ok::<(Option,), std::convert::Infallible>((None,)) }; - - let get_agents = warp::get() - .and(warp::path("get_agents")) - .and( - warp::path::param::() - .map(Some) - .or_else(infallible_none), - ) - .and_then(handlers::get_agents); - - let upload_jobs = warp::post() - .and(warp::path("upload_jobs")) - .and(get_content::>()) - .and_then(handlers::upload_jobs); - - let get_jobs = warp::get() - .and(warp::path("get_jobs")) - .and( - warp::path::param::() - .map(Some) - .or_else(infallible_none), - ) - .and_then(handlers::get_jobs); - - let get_agent_jobs = warp::get() - .and(warp::path("get_agent_jobs")) - .and( - warp::path::param::() - .map(Some) - .or_else(infallible_none), - ) - .and_then(|uid| handlers::get_agent_jobs(uid, false)); - - let get_personal_jobs = warp::get() - .and(warp::path("get_agent_jobs")) - .and(warp::path::param::().map(Some)) - .and_then(|uid| handlers::get_agent_jobs(uid, true)); - - let del = warp::get() - .and(warp::path("del")) - .and(warp::path::param::()) - .and_then(handlers::del); - - let set_jobs = warp::post() - .and(warp::path("set_jobs")) - .and(warp::path::param::()) - .and(get_content::>()) - .and_then(handlers::set_jobs); - - let report = warp::post() - .and(warp::path("report")) - .and(get_content::>().and_then(handlers::report)); - - let auth_token = warp::header::exact("authorization", "Bearer 123qwe"); - - let agent_zone = get_jobs.clone().or(get_personal_jobs).or(report); - - let auth_zone = auth_token.and( - get_agents - .or(get_jobs) - .or(upload_jobs) - .or(del) - .or(set_jobs) - .or(get_agent_jobs), - ); - - let routes = agent_zone.or(auth_zone); - warp::serve(routes.with(warp::log("warp"))) - .run(([0, 0, 0, 0], MASTER_PORT)) - .await; -} -/* -#[cfg(test)] -mod tests { - use super::*; - /* - #[tokio::test] - async fn test_gather() { - } - */ + serve().await; } -*/ diff --git a/integration/Cargo.toml b/integration/Cargo.toml new file mode 100644 index 0000000..dfccd4c --- /dev/null +++ b/integration/Cargo.toml @@ -0,0 +1,23 @@ +[package] +name = "integration" +version = "0.1.0" +authors = ["plazmoid "] +edition = "2018" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +tokio = { version = "1.2.0", features = ["macros", "rt-multi-thread", "process", "time"] } +log = "^0.4" +env_logger = "0.8.3" +uuid = { version = "0.6.5", features = ["serde", "v4"] } +reqwest = { version = "0.11", features = ["json"] } +serde_json = "1.0" +serde = { version = "1.0.114", features = ["derive"] } +futures = "0.3.5" +shlex = "1.0.0" + + +[[test]] +name = "integration" +path = "tests/tests.rs" \ No newline at end of file diff --git a/integration/docker-compose.yml b/integration/docker-compose.yml new file mode 100644 index 0000000..bd83d3e --- /dev/null +++ b/integration/docker-compose.yml @@ -0,0 +1,89 @@ +version: "2.1" + +networks: + u_net: + +services: + + u_server: + image: unki/u_server + networks: + - u_net + volumes: + - ../target/x86_64-unknown-linux-musl/release/u_server:/u_server + - ../:/unki/ + working_dir: /unki + command: bash -c "diesel setup && diesel migration run && /u_server" + depends_on: + u_db: + condition: service_healthy + expose: + - '63714' + environment: + RUST_LOG: warp=info + env_file: + - ../.env + healthcheck: + test: /bin/ss -tlpn | grep 63714 + interval: 5s + timeout: 2s + retries: 2 + + u_db: + image: unki/u_db + networks: + - u_net + expose: + - '5432' + env_file: + - ../.env + healthcheck: + test: /bin/ss -tlpn | grep 5432 + interval: 5s + timeout: 2s + retries: 2 + + u_agent_1: + image: unki/u_agent + networks: + - u_net + volumes: + - ../target/x86_64-unknown-linux-musl/release/u_agent:/u_agent + command: /u_agent u_server + depends_on: + u_server: + condition: service_healthy + + u_agent_2: + image: unki/u_agent + networks: + - u_net + volumes: + - ../target/x86_64-unknown-linux-musl/release/u_agent:/u_agent + command: /u_agent u_server + depends_on: + u_server: + condition: service_healthy + + tests_runner: + image: unki/tests_runner + networks: + - u_net + volumes: + - ./:/tests/ + - ../target/x86_64-unknown-linux-musl/release/u_panel:/u_panel + - ~/.cargo/registry:/root/.cargo/registry + working_dir: + /tests/ + env_file: + - ../.env + depends_on: + u_agent_1: + condition: service_started + u_agent_2: + condition: service_started + u_server: + condition: service_healthy + environment: + RUST_BACKTRACE: 1 + U_SERVER: u_server \ No newline at end of file diff --git a/integration/docker.py b/integration/docker.py new file mode 100644 index 0000000..68abd69 --- /dev/null +++ b/integration/docker.py @@ -0,0 +1,81 @@ +import subprocess +from utils import * + +BASE_IMAGE_DIR = 'images' + +DOCKERFILES = { + 'u_agent': { + 'ctx': BASE_IMAGE_DIR, + 'dockerfile_prefix': 'u_agent' + }, + 'u_server': { + 'ctx': BASE_IMAGE_DIR, + 'dockerfile_prefix': 'u_server' + }, + 'u_db': { + 'ctx': BASE_IMAGE_DIR, + 'dockerfile_prefix': 'u_db' + }, + 'tests_runner': { + 'ctx': BASE_IMAGE_DIR, + 'dockerfile_prefix': 'tests_runner' + }, +} + + +def docker(args): + cmd = ['docker'] + args + log(f'Running docker command: {cmd}') + return subprocess.run( + cmd, + check=True, + ) + + +def print_errors(errors): + err_msg = '\n'.join( + ' {container}: {error}'.format(container=item['container'], + error=item['error']) + for item in errors) + + err('There are some errors in next containers:\n%s' % err_msg) + + +def check_state(containers): + errors = [] + for container in containers: + ret, out = subprocess.getstatusoutput( + 'docker inspect --format \'{{ .State.Running }}\' %s' + % container) + out = out.strip() + if ret == 0: + if out == 'true': + continue + else: + errors.append({'container': container, + 'error': 'Bad state: Running=%s' % out}) + else: + errors.append({'container': container, + 'error': out}) + + return errors + + +def rebuild_images_if_needed(force_rebuild=False): + for img_name, data in DOCKERFILES.items(): + ctx = data['ctx'] + df_prefix = data.get('dockerfile_prefix') + df_suffix = 'Dockerfile' + img_name = f'unki/{img_name}' + log(f'Building docker image {img_name}') + cmd = [ + 'build', + '-t', + img_name, + ctx, + ] + if df_prefix: + cmd += ['-f', f'{ctx}/{df_prefix}.{df_suffix}'] + if force_rebuild: + cmd += ['--no-cache'] + docker(cmd) diff --git a/integration/docker_compose.py b/integration/docker_compose.py new file mode 100644 index 0000000..47e3916 --- /dev/null +++ b/integration/docker_compose.py @@ -0,0 +1,66 @@ +import subprocess +import shlex +from utils import * +from docker import docker, check_state, print_errors + + +class Compose: + ALL_CONTAINERS = [ + 'u_agent_1', + 'u_agent_2', + 'u_server', + 'u_db', + 'tests_runner', + ] + + def __init__(self): + self.container_tpl = 'integration_%s_1' + self.cmd_container = self.container_tpl % 'tests_runner' + self.ALL_CONTAINERS = [self.container_tpl % c for c in self.ALL_CONTAINERS] + + def _call(self, *args): + subprocess.check_call([ + 'docker-compose', + '--no-ansi', + ] + list(args) + ) + + def up(self): + log('Instanciating cluster') + self._call('up', '-d') + log('Ok') + + def down(self): + log('Shutting down cluster') + self._call('down') + log('Ok') + + def stop(self): + log('Stopping cluster') + self._call('stop') + log('Ok') + + def run(self, cmd): + container = self.cmd_container + if isinstance(cmd, str): + cmd = shlex.split(cmd) + log(f'Running command "{cmd}" in container {container}') + result = docker([ + 'exec', + '-ti', + container + ] + cmd) + log('Ok') + return result + + def is_alive(self): + log('Check if all containers are alive') + + errors = check_state(self.ALL_CONTAINERS) + log('Check done') + + if errors: + print_errors(errors) + raise TestsError('Error during `is_alive` check') + else: + log('All containers are alive') diff --git a/integration/images/tests_runner.Dockerfile b/integration/images/tests_runner.Dockerfile new file mode 100644 index 0000000..2411bd6 --- /dev/null +++ b/integration/images/tests_runner.Dockerfile @@ -0,0 +1,4 @@ +FROM rust:1.53 + +RUN rustup target add x86_64-unknown-linux-musl +CMD ["sleep", "3600"] \ No newline at end of file diff --git a/integration/images/u_agent.Dockerfile b/integration/images/u_agent.Dockerfile new file mode 100644 index 0000000..3058997 --- /dev/null +++ b/integration/images/u_agent.Dockerfile @@ -0,0 +1,3 @@ +FROM centos:7 + +RUN yum update -y \ No newline at end of file diff --git a/integration/images/u_db.Dockerfile b/integration/images/u_db.Dockerfile new file mode 100644 index 0000000..8577c8d --- /dev/null +++ b/integration/images/u_db.Dockerfile @@ -0,0 +1,3 @@ +FROM postgres:13.3 + +RUN apt update && apt -y upgrade && apt install -y iproute2 \ No newline at end of file diff --git a/integration/images/u_server.Dockerfile b/integration/images/u_server.Dockerfile new file mode 100644 index 0000000..ea82ccd --- /dev/null +++ b/integration/images/u_server.Dockerfile @@ -0,0 +1,3 @@ +FROM rust:1.53 + +RUN cargo install diesel_cli --no-default-features --features postgres \ No newline at end of file diff --git a/integration/integration_tests.py b/integration/integration_tests.py new file mode 100644 index 0000000..e4fe460 --- /dev/null +++ b/integration/integration_tests.py @@ -0,0 +1,36 @@ +import signal +import sys +from utils import * +from docker import rebuild_images_if_needed +from docker_compose import Compose + + +cluster = Compose() + + +def abort_handler(s, _): + warn(f'Received signal: {s}') + warn(f'Gracefully stopping...') + cluster.down() + + +def run_tests(): + force_rebuild = '--rebuild' in sys.argv + preserve_containers = '--preserve' in sys.argv + for s in (signal.SIGTERM, signal.SIGINT, signal.SIGHUP): + signal.signal(s, abort_handler) + rebuild_images_if_needed(force_rebuild) + try: + cluster.up() + cluster.is_alive() + cluster.run('cargo test --test integration') + except Exception as e: + err(e) + sys.exit(1) + finally: + if not preserve_containers: + cluster.down() + + +if __name__ == '__main__': + run_tests() diff --git a/integration/integration_tests.sh b/integration/integration_tests.sh new file mode 100755 index 0000000..3f3eca1 --- /dev/null +++ b/integration/integration_tests.sh @@ -0,0 +1,3 @@ +#!/bin/bash +set -e +python integration_tests.py $@ diff --git a/integration/src/main.rs b/integration/src/main.rs new file mode 100644 index 0000000..e7a11a9 --- /dev/null +++ b/integration/src/main.rs @@ -0,0 +1,3 @@ +fn main() { + println!("Hello, world!"); +} diff --git a/integration/tests/behaviour.rs b/integration/tests/behaviour.rs new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/integration/tests/behaviour.rs @@ -0,0 +1 @@ + diff --git a/integration/tests/helpers/client.rs b/integration/tests/helpers/client.rs new file mode 100644 index 0000000..01c5c89 --- /dev/null +++ b/integration/tests/helpers/client.rs @@ -0,0 +1,48 @@ +use reqwest::{Client, RequestBuilder, Url}; +use serde::Serialize; +use serde_json::{from_str, json, Value}; + +const SERVER: &str = "u_server"; +const PORT: &str = "63714"; + +pub struct AgentClient { + client: Client, + base_url: Url, +} + +impl AgentClient { + pub fn new() -> Self { + Self { + client: Client::new(), + base_url: Url::parse(&format!("http://{}:{}", SERVER, PORT)).unwrap(), + } + } + + async fn process_request(&self, req: RequestBuilder, resp_needed: bool) -> Value { + let resp = req.send().await.unwrap(); + if let Err(e) = resp.error_for_status_ref() { + panic!( + "Server responded with code {}\nError: {}", + e.status() + .map(|s| s.to_string()) + .unwrap_or(String::from("")), + e.to_string() + ); + } + if !resp_needed { + return json!([]); + } + let resp: Value = from_str(&resp.text().await.unwrap()).unwrap(); + resp.get("inner").unwrap().get(0).unwrap().clone() + } + + pub async fn get>(&self, url: S) -> Value { + let req = self.client.get(self.base_url.join(url.as_ref()).unwrap()); + self.process_request(req, true).await + } + + pub async fn post, B: Serialize>(&self, url: S, body: &B) -> Value { + let req = self.client.post(self.base_url.join(url.as_ref()).unwrap()); + self.process_request(req.json(body), false).await + } +} diff --git a/integration/tests/helpers/mod.rs b/integration/tests/helpers/mod.rs new file mode 100644 index 0000000..0957ad9 --- /dev/null +++ b/integration/tests/helpers/mod.rs @@ -0,0 +1,5 @@ +pub mod client; +pub mod panel; + +pub use client::AgentClient; +pub use panel::Panel; diff --git a/integration/tests/helpers/panel.rs b/integration/tests/helpers/panel.rs new file mode 100644 index 0000000..f250332 --- /dev/null +++ b/integration/tests/helpers/panel.rs @@ -0,0 +1,53 @@ +use serde_json::{from_slice, Value}; +use shlex::split; +use std::process::{Command, Output}; + +const PANEL_BINARY: &str = "/u_panel"; + +pub struct Panel; + +impl Panel { + fn run(args: &[&str]) -> Output { + Command::new(PANEL_BINARY) + .arg("--json") + .args(args) + .output() + .unwrap() + } + + pub fn output_argv(args: &[&str]) -> Value { + let result = Self::run(args); + assert!(result.status.success()); + from_slice(&result.stdout).unwrap() + } + + pub fn output>(args: S) -> Value { + let splitted = split(args.into().as_ref()).unwrap(); + Self::output_argv( + splitted + .iter() + .map(|s| s.as_ref()) + .collect::>() + .as_ref(), + ) + } + + fn status_is_ok(data: &Value) { + assert_eq!( + data["status"], "ok", + "Panel failed with erroneous status: {}", + data["data"] + ); + } + + pub fn check_status>(args: S) { + let result = Self::output(args); + Self::status_is_ok(&result); + } + + pub fn check_output>(args: S) -> Vec { + let result = Self::output(args); + Self::status_is_ok(&result); + result["data"].as_array().unwrap().clone() + } +} diff --git a/integration/tests/tests.rs b/integration/tests/tests.rs new file mode 100644 index 0000000..2cc9bce --- /dev/null +++ b/integration/tests/tests.rs @@ -0,0 +1,80 @@ +mod helpers; + +use helpers::{AgentClient, Panel}; + +use serde_json::json; +use std::thread::sleep; +use std::time::Duration; +use uuid::Uuid; + +type TestResult = Result>; + +async fn register_agent() -> Uuid { + let cli = AgentClient::new(); + let agent_uid = Uuid::new_v4(); + let resp = cli.get(format!("get_agent_jobs/{}", agent_uid)).await; + let job_id = &resp["job_id"]; + let resp = cli.get(format!("get_jobs/{}", job_id)).await; + assert_eq!(&resp["alias"], "agent_hello"); + let agent_data = json! { + {"id": &agent_uid,"inner":[ + {"Agent": + {"alias":null, + "hostname":"3b1030fa6324", + "id":&agent_uid, + "is_root":false, + "is_root_allowed":false, + "last_active":{"secs_since_epoch":1625271265,"nanos_since_epoch":92814921}, + "platform":"x86_64-unknown-linux-gnu", + "regtime":{"secs_since_epoch":1625271265,"nanos_since_epoch":92814945}, + "state":"New", + "token":null, + "username":"root"} + } + ]} + }; + cli.post("report", &agent_data).await; + agent_uid +} + +#[tokio::test] +async fn test_registration() -> TestResult { + let agent_uid = register_agent().await; + let agents = Panel::check_output("agents list"); + let found = agents + .iter() + .find(|v| v["id"].as_str().unwrap() == agent_uid.to_string()); + assert!(found.is_some()); + Ok(()) +} + +#[tokio::test] +async fn test_setup_tasks() -> TestResult { + let agent_uid = Panel::check_output("agents list")[0]["id"].clone(); + dbg!(&agent_uid); + let job_alias = "passwd_contents"; + let cmd = format!("jobs add --alias {} 'cat /etc/passwd'", job_alias); + Panel::check_status(cmd); + let cmd = format!("jobmap add {} {}", agent_uid, job_alias); + let assigned_uids = Panel::check_output(cmd); + dbg!(&assigned_uids); + loop { + let result = Panel::check_output(format!("jobmap list {}", assigned_uids[0])); + dbg!(&result); + match result.get(0) { + Some(entry) if entry["state"] == "Finished" => { + println!("{}", result[0]); + break; + } + None => { + eprintln!("jobmap list is empty (bad bad bad)"); + continue; + } + _ => { + sleep(Duration::from_secs(1)); + eprintln!("waiting for task"); + } + } + } + Ok(()) +} diff --git a/integration/utils.py b/integration/utils.py new file mode 100644 index 0000000..0015df2 --- /dev/null +++ b/integration/utils.py @@ -0,0 +1,32 @@ +from termcolor import colored + +__all__ = ['log', 'warn', 'err', 'TestsError'] + + +class TestsError(Exception): + pass + + +COLORS = { + 'question': colored('[?]', 'magenta'), + 'info': colored('[~]', 'green'), + 'warning': colored('[!]', 'yellow'), + 'error': colored('[X]', 'red'), +} + + +def warn(msg): + log(msg, log_lvl='w') + + +def err(msg): + log(msg, log_lvl='e') + + +def log(msg, log_lvl='i'): + for lvl, text in COLORS.items(): + if lvl.startswith(log_lvl): + print(f'{text} {msg}') + break + else: + ValueError('Unknown log level') diff --git a/lib/u_lib/Cargo.toml b/lib/u_lib/Cargo.toml index 295b216..dad63a9 100644 --- a/lib/u_lib/Cargo.toml +++ b/lib/u_lib/Cargo.toml @@ -7,6 +7,7 @@ edition = "2018" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +dotenv = "0.15.0" serde = { version = "1.0.114", features = ["derive"] } uuid = { version = "0.6.5", features = ["serde", "v4"] } nix = "0.17" @@ -14,10 +15,12 @@ libc = "^0.2" lazy_static = "1.4.0" tokio = { version = "1.2.0", features = ["rt-multi-thread", "sync", "macros", "process", "time"] } reqwest = { version = "0.11", features = ["json"] } +openssl = "*" futures = "0.3.5" guess_host_triple = "0.1.2" thiserror = "*" log = "*" +mockall = "0.9.1" env_logger = "0.8.3" diesel-derive-enum = { version = "1", features = ["postgres"] } chrono = "0.4.19" diff --git a/lib/u_lib/build.rs b/lib/u_lib/build.rs new file mode 100644 index 0000000..cf1757d --- /dev/null +++ b/lib/u_lib/build.rs @@ -0,0 +1,14 @@ +use std::path::PathBuf; +use std::process::Command; + +fn main() { + let echoer = PathBuf::from("./tests/fixtures/echoer"); + let mut echoer_src = echoer.clone(); + echoer_src.set_extension("rs"); + Command::new("rustc") + .args(&[echoer_src.to_str().unwrap(), "-o", echoer.to_str().unwrap()]) + .status() + .unwrap(); + println!("cargo:rerun-if-changed={}", echoer_src.display()); + println!("cargo:rerun-if-changed={}", echoer.display()); +} diff --git a/lib/u_lib/src/api.rs b/lib/u_lib/src/api.rs index 5e0cd3c..636a0dc 100644 --- a/lib/u_lib/src/api.rs +++ b/lib/u_lib/src/api.rs @@ -2,12 +2,12 @@ use crate::{ config::{MASTER_PORT, MASTER_SERVER}, messaging::{AsMsg, BaseMessage}, - models::*, - utils::opt_to_string, + models, + utils::{opt_to_string, VecDisplay}, UError, UResult, }; use reqwest::{Client, RequestBuilder, Url}; -use std::{net::Ipv4Addr, str::FromStr}; +use std::env; use u_api_proc_macro::api_route; use uuid::Uuid; @@ -18,10 +18,9 @@ pub struct ClientHandler { } impl ClientHandler { - pub fn new(server: Option) -> Self { - let master_server = server - .map(|s| Ipv4Addr::from_str(&s).unwrap()) - .unwrap_or(MASTER_SERVER); + pub fn new(server: Option<&str>) -> Self { + let env_server = env::var("U_SERVER").unwrap_or(String::from(MASTER_SERVER)); + let master_server = server.unwrap_or(env_server.as_str()); Self { client: Client::new(), base_url: Url::parse(&format!("http://{}:{}", master_server, MASTER_PORT)).unwrap(), @@ -53,24 +52,24 @@ impl ClientHandler { // // get jobs for client #[api_route("GET")] - fn get_agent_jobs(&self, url_param: Option) -> Vec {} + fn get_agent_jobs(&self, url_param: Option) -> VecDisplay {} // // send something to server #[api_route("POST")] - fn report(&self, payload: &M) {} + fn report(&self, payload: &M) -> models::Empty {} - //#/////////#// Admin area //#////////#// + //##########// Admin area //##########// /// client listing #[api_route("GET")] - fn get_agents(&self, url_param: Option) -> Vec {} + fn get_agents(&self, url_param: Option) -> VecDisplay {} // // get all available jobs #[api_route("GET")] - fn get_jobs(&self, url_param: Option) -> Vec {} + fn get_jobs(&self, url_param: Option) -> VecDisplay {} // // create and upload job #[api_route("POST")] - fn upload_jobs(&self, payload: &Vec) {} + fn upload_jobs(&self, payload: &[models::JobMeta]) -> models::Empty {} // // delete something #[api_route("GET")] @@ -78,5 +77,5 @@ impl ClientHandler { // // set jobs for client #[api_route("POST")] - fn set_jobs(&self, url_param: Option, payload: &Vec) {} + fn set_jobs(&self, url_param: Option, payload: &[String]) -> VecDisplay {} } diff --git a/lib/u_lib/src/builder.rs b/lib/u_lib/src/builder.rs index 7948b33..14b172d 100644 --- a/lib/u_lib/src/builder.rs +++ b/lib/u_lib/src/builder.rs @@ -2,7 +2,7 @@ use crate::{ cache::JobCache, executor::{FutRes, Waiter, DynFut}, models::{Agent, AssignedJob, JobMeta, JobType}, - utils::{CombinedResult, OneOrMany}, + utils::{CombinedResult, OneOrVec}, UError, }; use guess_host_triple::guess_host_triple; @@ -13,7 +13,7 @@ pub struct JobBuilder { } impl JobBuilder { - pub fn from_request>(job_requests: J) -> CombinedResult { + pub fn from_request>(job_requests: J) -> CombinedResult { let job_requests = job_requests.into_vec(); let mut prepared: Vec = vec![]; let mut result = CombinedResult::new(); @@ -52,7 +52,7 @@ impl JobBuilder { result } - pub fn from_meta>(job_metas: J) -> CombinedResult { + pub fn from_meta>(job_metas: J) -> CombinedResult { let job_requests = job_metas .into_vec() .into_iter() @@ -90,7 +90,7 @@ pub struct NamedJobBuilder { } impl NamedJobBuilder { - pub fn from_shell>( + pub fn from_shell>( named_jobs: J, ) -> CombinedResult { let mut result = CombinedResult::new(); @@ -111,7 +111,7 @@ impl NamedJobBuilder { result } - pub fn from_meta>(named_jobs: J) -> Self { + pub fn from_meta>(named_jobs: J) -> Self { let mut job_names = vec![]; let job_metas: Vec = named_jobs .into_vec() @@ -144,3 +144,172 @@ impl NamedJobBuilder { self.pop_opt(name).unwrap() } } + +#[cfg(test)] +mod tests { + + use test_case::test_case; + use std::{time::SystemTime}; + use crate::{ + errors::UError, + models::{ + jobs::{JobMeta}, + ExecResult, + misc::JobType + }, + builder::{JobBuilder, NamedJobBuilder}, + unwrap_enum, + }; + + type TestResult = Result>; + + #[tokio::test] + async fn test_is_really_async() { + const SLEEP_SECS: u64 = 1; + let job = JobMeta::from_shell(format!("sleep {}", SLEEP_SECS)).unwrap(); + let sleep_jobs: Vec = (0..50).map(|_| job.clone()).collect(); + let now = SystemTime::now(); + JobBuilder::from_meta(sleep_jobs).unwrap_one().wait().await; + assert!(now.elapsed().unwrap().as_secs() < SLEEP_SECS + 2) + } + + #[test_case( + "/bin/sh {}", + Some(b"echo test01 > /tmp/asd; cat /tmp/asd"), + "test01" + ;"sh payload" + )] + #[test_case( + r#"/usr/bin/python -c 'print("test02")'"#, + None, + "test02" + ;"python cmd" + )] + #[test_case( + "/{}", + Some( + br#"#!/bin/sh + TMPPATH=/tmp/lol + mkdir -p $TMPPATH + echo test03 > $TMPPATH/t + cat $TMPPATH/t"# + ), + "test03" + ;"sh multiline payload" + )] + #[test_case( + "/{} 'some msg as arg'", + Some(include_bytes!("../tests/fixtures/echoer")), + "some msg as arg" + ;"standalone binary with args" + )] + #[tokio::test] + async fn test_shell_job(cmd: &str, payload: Option<&[u8]>, expected_result: &str) -> TestResult { + let mut job = JobMeta::builder().with_shell(cmd); + if let Some(p) = payload { + job = job.with_payload(p); + } + let job = job.build().unwrap(); + let job_result = JobBuilder::from_meta(job).unwrap_one().wait_one().await; + let result = unwrap_enum!(job_result, ExecResult::Assigned); + let result = result.to_string_result().unwrap(); + assert_eq!(result.trim(), expected_result); + Ok(()) + } + + #[tokio::test] + async fn test_complex_load() -> TestResult { + const SLEEP_SECS: u64 = 1; + let now = SystemTime::now(); + let longest_job = JobMeta::from_shell(format!("sleep {}", SLEEP_SECS)).unwrap(); + let longest_job = JobBuilder::from_meta(longest_job).unwrap_one().spawn().await; + let ls = JobBuilder::from_meta(JobMeta::from_shell("ls")?).unwrap_one() + .wait_one() + .await; + let ls = unwrap_enum!(ls, ExecResult::Assigned); + assert_eq!(ls.retcode.unwrap(), 0); + let folders = ls.to_string_result().unwrap(); + let subfolders_jobs: Vec = folders + .lines() + .map(|f| JobMeta::from_shell(format!("ls {}", f)).unwrap()) + .collect(); + let ls_subfolders = JobBuilder::from_meta(subfolders_jobs) + .unwrap_one() + .wait() + .await; + for result in ls_subfolders { + let result = unwrap_enum!(result, ExecResult::Assigned); + assert_eq!(result.retcode.unwrap(), 0); + } + longest_job.wait().await; + assert_eq!(now.elapsed().unwrap().as_secs(), SLEEP_SECS); + Ok(()) + } + /* + #[tokio::test] + async fn test_exec_multiple_jobs_nowait() -> UResult<()> { + const REPEATS: usize = 10; + let job = JobMeta::from_shell("whoami"); + let sleep_jobs: Vec = (0..=REPEATS).map(|_| job.clone()).collect(); + build_jobs(sleep_jobs).spawn().await; + let mut completed = 0; + while completed < REPEATS { + let c = pop_completed().await.len(); + if c > 0 { + completed += c; + println!("{}", c); + } + } + Ok(()) + } + */ + #[tokio::test] + async fn test_failing_shell_job() -> TestResult { + let job = JobMeta::from_shell("lol_kek_puk")?; + let job_result = JobBuilder::from_meta(job) + .unwrap_one() + .wait_one() + .await; + let job_result = unwrap_enum!(job_result, ExecResult::Assigned); + let output = job_result.to_string_result().unwrap(); + assert!(output.contains("No such file")); + assert!(job_result.retcode.is_none()); + Ok(()) + } + + #[test_case( + "/bin/bash {}", + None, + "contains executable" + ; "no binary" + )] + #[test_case( + "/bin/bash", + Some(b"whoami"), + "contains no executable" + ; "no path to binary" + )] + #[tokio::test] + async fn test_job_building_failed(cmd: &str, payload: Option<&[u8]>, err_str: &str) -> TestResult { + let mut job = JobMeta::builder().with_shell(cmd); + if let Some(p) = payload { + job = job.with_payload(p); + } + let err = job.build().unwrap_err(); + let err_msg = unwrap_enum!(err, UError::JobArgsError); + assert!(err_msg.contains(err_str)); + Ok(()) + } + + #[tokio::test] + async fn test_different_job_types() -> TestResult { + let mut jobs = NamedJobBuilder::from_meta(vec![ + ("sleeper", JobMeta::from_shell("sleep 3")?), + ("gatherer", JobMeta::builder().with_type(JobType::Manage).build()?) + ]).wait().await; + let gathered = jobs.pop("gatherer"); + assert_eq!(unwrap_enum!(gathered, ExecResult::Agent).alias, None); + Ok(()) + } + +} \ No newline at end of file diff --git a/lib/u_lib/src/config.rs b/lib/u_lib/src/config.rs index 7db464c..88fc795 100644 --- a/lib/u_lib/src/config.rs +++ b/lib/u_lib/src/config.rs @@ -1,9 +1,8 @@ -use std::net::Ipv4Addr; use uuid::Uuid; -pub const MASTER_SERVER: Ipv4Addr = Ipv4Addr::LOCALHOST; //Ipv4Addr::new(3,9,16,40) +pub const MASTER_SERVER: &str = "127.0.0.1"; //Ipv4Addr::new(3,9,16,40) pub const MASTER_PORT: u16 = 63714; lazy_static! { pub static ref UID: Uuid = Uuid::new_v4(); -} \ No newline at end of file +} diff --git a/lib/u_lib/src/executor.rs b/lib/u_lib/src/executor.rs index c4d6492..95ab8aa 100644 --- a/lib/u_lib/src/executor.rs +++ b/lib/u_lib/src/executor.rs @@ -1,7 +1,7 @@ -use crate::{utils::OneOrMany, models::ExecResult}; +use crate::{models::ExecResult, utils::OneOrVec}; use futures::{future::BoxFuture, lock::Mutex}; use lazy_static::lazy_static; -use std::{collections::HashMap}; +use std::collections::HashMap; use tokio::{ spawn, sync::mpsc::{channel, Receiver, Sender}, @@ -37,7 +37,7 @@ pub struct Waiter { } impl Waiter { - pub fn new>(tasks: S) -> Self { + pub fn new>(tasks: S) -> Self { Self { tasks: tasks.into_vec(), fids: vec![], diff --git a/lib/u_lib/src/lib.rs b/lib/u_lib/src/lib.rs index d615391..00d1c18 100644 --- a/lib/u_lib/src/lib.rs +++ b/lib/u_lib/src/lib.rs @@ -26,3 +26,6 @@ extern crate diesel; #[macro_use] extern crate log; extern crate env_logger; + +#[macro_use] +extern crate mockall; diff --git a/lib/u_lib/src/messaging.rs b/lib/u_lib/src/messaging.rs index 7d7a34c..3af1933 100644 --- a/lib/u_lib/src/messaging.rs +++ b/lib/u_lib/src/messaging.rs @@ -1,9 +1,11 @@ +use crate::utils::VecDisplay; use crate::UID; use serde::{Deserialize, Serialize}; use std::borrow::Cow; +use std::fmt::Display; use uuid::Uuid; -pub struct Moo<'cow, T: Clone>(pub Cow<'cow, T>); +pub struct Moo<'cow, T: AsMsg + Clone>(pub Cow<'cow, T>); pub trait AsMsg: Clone + Serialize { fn as_message<'m>(&'m self) -> BaseMessage<'m, Self> @@ -29,6 +31,8 @@ impl<'cow, M: AsMsg> From<&'cow M> for Moo<'cow, M> { } impl AsMsg for Vec {} +impl AsMsg for VecDisplay {} +impl<'msg, M: AsMsg> AsMsg for &'msg [M] {} #[derive(Serialize, Deserialize, Debug)] pub struct BaseMessage<'cow, I: AsMsg> { diff --git a/lib/u_lib/src/models/agent.rs b/lib/u_lib/src/models/agent.rs index fe213b9..2c66f1e 100644 --- a/lib/u_lib/src/models/agent.rs +++ b/lib/u_lib/src/models/agent.rs @@ -79,10 +79,9 @@ impl Agent { .await; let decoder = |job_result: ExecResult| { let assoc_job = unwrap_enum!(job_result, ExecResult::Assigned); - assoc_job.to_string_result().unwrap() + assoc_job.to_string_result().unwrap().trim().to_string() }; - #[cfg(unix)] Self { hostname: decoder(builder.pop("hostname")), is_root: &decoder(builder.pop("is_root")) == "0", diff --git a/lib/u_lib/src/models/jobs/misc.rs b/lib/u_lib/src/models/jobs/misc.rs index 9c45195..7451548 100644 --- a/lib/u_lib/src/models/jobs/misc.rs +++ b/lib/u_lib/src/models/jobs/misc.rs @@ -136,3 +136,51 @@ impl JobOutput { result } } + +#[cfg(test)] +mod tests { + use crate::{models::JobOutput, utils::bytes_to_string}; + use test_case::test_case; + + const STDOUT: &str = "<***STDOUT***>"; + const STDERR: &str = "<***STDERR***>"; + + #[test_case( + "lol", + "kek", + &format!("{}lol{}kek", STDOUT, STDERR) + ;"stdout stderr" + )] + #[test_case( + "", + "kek", + &format!("{}kek", STDERR) + ;"stderr" + )] + fn test_to_combined(stdout: &str, stderr: &str, result: &str) { + let output = JobOutput::new() + .stdout(stdout.as_bytes().to_vec()) + .stderr(stderr.as_bytes().to_vec()); + assert_eq!(&bytes_to_string(&output.into_combined()), result) + } + + #[test_case( + &format!("{}lal{}kik", STDOUT, STDERR), + "lal\nkik" + ;"stdout stderr" + )] + #[test_case( + &format!("{}qeq", STDOUT), + "qeq" + ;"stdout" + )] + #[test_case( + &format!("{}vev", STDERR), + "vev" + ;"stderr" + )] + fn test_from_combined(src: &str, result: &str) { + let output = JobOutput::from_combined(src.as_bytes()).unwrap(); + assert_eq!(bytes_to_string(&output.to_appropriate()).trim(), result); + } +} diff --git a/lib/u_lib/src/models/mod.rs b/lib/u_lib/src/models/mod.rs index 9406e9a..d30a33a 100644 --- a/lib/u_lib/src/models/mod.rs +++ b/lib/u_lib/src/models/mod.rs @@ -6,6 +6,8 @@ pub mod schema; use crate::messaging::AsMsg; pub use crate::models::result::ExecResult; pub use crate::models::{agent::*, jobs::*}; +use serde::{Deserialize, Serialize}; +use std::fmt; use uuid::Uuid; impl AsMsg for Agent {} @@ -14,4 +16,13 @@ impl AsMsg for ExecResult {} impl AsMsg for JobMeta {} impl AsMsg for String {} impl AsMsg for Uuid {} -impl AsMsg for () {} +impl AsMsg for Empty {} + +#[derive(Serialize, Deserialize, Clone, Default, Debug)] +pub struct Empty; + +impl fmt::Display for Empty { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "") + } +} diff --git a/lib/u_lib/src/utils.rs b/lib/u_lib/src/utils.rs deleted file mode 100644 index ebf3ebe..0000000 --- a/lib/u_lib/src/utils.rs +++ /dev/null @@ -1,161 +0,0 @@ -use crate::UError; -use chrono::{offset::Local, DateTime}; -use nix::{ - sys::signal::{signal, SigHandler, Signal}, - unistd::{chdir, close as fdclose, fork, getppid, setsid, ForkResult}, -}; -use std::{ - env::temp_dir, fs, ops::Drop, os::unix::fs::PermissionsExt, path::PathBuf, process::exit, - time::SystemTime, -}; -use uuid::Uuid; - -pub trait OneOrMany { - fn into_vec(self) -> Vec; -} - -impl OneOrMany for T { - fn into_vec(self) -> Vec { - vec![self] - } -} - -impl OneOrMany for Vec { - fn into_vec(self) -> Vec { - self - } -} - -pub fn daemonize() { - if getppid().as_raw() != 1 { - setsig(Signal::SIGTTOU, SigHandler::SigIgn); - setsig(Signal::SIGTTIN, SigHandler::SigIgn); - setsig(Signal::SIGTSTP, SigHandler::SigIgn); - } - for fd in 0..=2 { - match fdclose(fd) { - _ => (), - } - } - match chdir("/") { - _ => (), - }; - - match fork() { - Ok(ForkResult::Parent { .. }) => { - exit(0); - } - Ok(ForkResult::Child) => match setsid() { - _ => (), - }, - Err(_) => exit(255), - } -} - -pub fn setsig(sig: Signal, hnd: SigHandler) { - unsafe { - signal(sig, hnd).unwrap(); - } -} - -pub fn vec_to_string(v: &[u8]) -> String { - String::from_utf8_lossy(v).to_string() -} - -pub fn opt_to_string(item: Option) -> String { - match item { - Some(s) => s.to_string(), - None => String::new(), - } -} - -pub fn systime_to_string(time: &SystemTime) -> String { - DateTime::::from(*time) - .format("%d/%m/%Y %T") - .to_string() -} - -pub struct TempFile { - path: PathBuf, -} - -impl TempFile { - pub fn get_path(&self) -> String { - self.path.to_string_lossy().to_string() - } - - pub fn new() -> Self { - let name = Uuid::simple(&Uuid::new_v4()).to_string(); - let mut path = temp_dir(); - path.push(name); - Self { path } - } - - pub fn write_all(&self, data: &[u8]) -> Result<(), String> { - fs::write(&self.path, data).map_err(|e| e.to_string()) - } - - pub fn write_exec(data: &[u8]) -> Result { - let this = Self::new(); - let path = this.get_path(); - this.write_all(data).map_err(|e| (path.clone(), e))?; - let perms = fs::Permissions::from_mode(0o555); - fs::set_permissions(&path, perms).map_err(|e| (path, e.to_string()))?; - Ok(this) - } -} - -impl Drop for TempFile { - fn drop(&mut self) { - fs::remove_file(&self.path).ok(); - } -} - -pub struct CombinedResult { - ok: Vec, - err: Vec, -} - -impl CombinedResult { - pub fn new() -> Self { - Self { - ok: vec![], - err: vec![], - } - } - - pub fn ok>(&mut self, result: I) { - self.ok.extend(result.into_vec()); - } - - pub fn err>(&mut self, err: I) { - self.err.extend(err.into_vec()); - } - - pub fn unwrap(self) -> Vec { - let err_len = self.err.len(); - if err_len > 0 { - panic!("CombinedResult has {} errors", err_len); - } - self.ok - } - - pub fn unwrap_one(self) -> T { - self.unwrap().pop().unwrap() - } - - pub fn pop_errors(&mut self) -> Vec { - self.err.drain(..).collect() - } -} - -#[macro_export] -macro_rules! unwrap_enum { - ($src:ident, $t:path) => { - if let $t(result) = $src { - result - } else { - panic!("wrong type") - } - }; -} diff --git a/lib/u_lib/src/utils/combined_result.rs b/lib/u_lib/src/utils/combined_result.rs new file mode 100644 index 0000000..687f541 --- /dev/null +++ b/lib/u_lib/src/utils/combined_result.rs @@ -0,0 +1,40 @@ +use crate::utils::OneOrVec; +use crate::UError; + +pub struct CombinedResult { + ok: Vec, + err: Vec, +} + +impl CombinedResult { + pub fn new() -> Self { + Self { + ok: vec![], + err: vec![], + } + } + + pub fn ok>(&mut self, result: I) { + self.ok.extend(result.into_vec()); + } + + pub fn err>(&mut self, err: I) { + self.err.extend(err.into_vec()); + } + + pub fn unwrap(self) -> Vec { + let err_len = self.err.len(); + if err_len > 0 { + panic!("CombinedResult has {} errors", err_len); + } + self.ok + } + + pub fn unwrap_one(self) -> T { + self.unwrap().pop().unwrap() + } + + pub fn pop_errors(&mut self) -> Vec { + self.err.drain(..).collect() + } +} diff --git a/lib/u_lib/src/utils/conv.rs b/lib/u_lib/src/utils/conv.rs new file mode 100644 index 0000000..9cb2166 --- /dev/null +++ b/lib/u_lib/src/utils/conv.rs @@ -0,0 +1,19 @@ +use chrono::{offset::Local, DateTime}; +use std::time::SystemTime; + +pub fn bytes_to_string(v: &[u8]) -> String { + String::from_utf8_lossy(v).to_string() +} + +pub fn opt_to_string(item: Option) -> String { + match item { + Some(s) => s.to_string(), + None => String::new(), + } +} + +pub fn systime_to_string(time: &SystemTime) -> String { + DateTime::::from(*time) + .format("%d/%m/%Y %T") + .to_string() +} diff --git a/lib/u_lib/src/utils/misc.rs b/lib/u_lib/src/utils/misc.rs new file mode 100644 index 0000000..4f93a85 --- /dev/null +++ b/lib/u_lib/src/utils/misc.rs @@ -0,0 +1,71 @@ +use nix::{ + sys::signal::{signal, SigHandler, Signal}, + unistd::{chdir, close as fdclose, fork, getppid, setsid, ForkResult}, +}; +use std::process::exit; + +pub trait OneOrVec { + fn into_vec(self) -> Vec; +} + +impl OneOrVec for T { + fn into_vec(self) -> Vec { + vec![self] + } +} + +impl OneOrVec for Vec { + fn into_vec(self) -> Vec { + self + } +} + +#[macro_export] +macro_rules! unwrap_enum { + ($src:ident, $t:path) => { + if let $t(result) = $src { + result + } else { + panic!("wrong type") + } + }; +} + +pub fn daemonize() { + if getppid().as_raw() != 1 { + setsig(Signal::SIGTTOU, SigHandler::SigIgn); + setsig(Signal::SIGTTIN, SigHandler::SigIgn); + setsig(Signal::SIGTSTP, SigHandler::SigIgn); + } + for fd in 0..=2 { + match fdclose(fd) { + _ => (), + } + } + match chdir("/") { + _ => (), + }; + + match fork() { + Ok(ForkResult::Parent { .. }) => { + exit(0); + } + Ok(ForkResult::Child) => match setsid() { + _ => (), + }, + Err(_) => exit(255), + } +} + +pub fn setsig(sig: Signal, hnd: SigHandler) { + unsafe { + signal(sig, hnd).unwrap(); + } +} + +pub fn init_env() { + let envs = [".env"]; + for envfile in &envs { + dotenv::from_filename(envfile).ok(); + } +} diff --git a/lib/u_lib/src/utils/mod.rs b/lib/u_lib/src/utils/mod.rs new file mode 100644 index 0000000..13f94d3 --- /dev/null +++ b/lib/u_lib/src/utils/mod.rs @@ -0,0 +1,11 @@ +pub mod combined_result; +pub mod conv; +pub mod misc; +pub mod tempfile; +pub mod vec_display; + +pub use combined_result::*; +pub use conv::*; +pub use misc::*; +pub use tempfile::*; +pub use vec_display::*; diff --git a/lib/u_lib/src/utils/tempfile.rs b/lib/u_lib/src/utils/tempfile.rs new file mode 100644 index 0000000..e76baa8 --- /dev/null +++ b/lib/u_lib/src/utils/tempfile.rs @@ -0,0 +1,38 @@ +use std::{env::temp_dir, fs, ops::Drop, os::unix::fs::PermissionsExt, path::PathBuf}; +use uuid::Uuid; + +pub struct TempFile { + path: PathBuf, +} + +impl TempFile { + pub fn get_path(&self) -> String { + self.path.to_string_lossy().to_string() + } + + pub fn new() -> Self { + let name = Uuid::simple(&Uuid::new_v4()).to_string(); + let mut path = temp_dir(); + path.push(name); + Self { path } + } + + pub fn write_all(&self, data: &[u8]) -> Result<(), String> { + fs::write(&self.path, data).map_err(|e| e.to_string()) + } + + pub fn write_exec(data: &[u8]) -> Result { + let this = Self::new(); + let path = this.get_path(); + this.write_all(data).map_err(|e| (path.clone(), e))?; + let perms = fs::Permissions::from_mode(0o555); + fs::set_permissions(&path, perms).map_err(|e| (path, e.to_string()))?; + Ok(this) + } +} + +impl Drop for TempFile { + fn drop(&mut self) { + fs::remove_file(&self.path).ok(); + } +} diff --git a/lib/u_lib/src/utils/vec_display.rs b/lib/u_lib/src/utils/vec_display.rs new file mode 100644 index 0000000..117ee44 --- /dev/null +++ b/lib/u_lib/src/utils/vec_display.rs @@ -0,0 +1,40 @@ +use crate::{messaging::AsMsg, utils::OneOrVec}; +use serde::{Deserialize, Serialize}; +use std::fmt::{self, Display, Formatter}; +use std::ops::{Deref, DerefMut}; + +#[derive(Serialize, Deserialize, Clone, Default)] +pub struct VecDisplay(pub Vec); + +impl VecDisplay { + pub fn new>(inner: I) -> Self { + VecDisplay(inner.into_vec()) + } + + pub fn into_builtin_vec(self) -> Vec { + self.0 + } +} + +impl Deref for VecDisplay { + type Target = Vec; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl DerefMut for VecDisplay { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } +} + +impl Display for VecDisplay { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + for (i, itm) in self.0.iter().enumerate() { + writeln!(f, "### {}:\n{}\n", i, itm)?; + } + Ok(()) + } +} diff --git a/lib/u_lib/tests/api_macro.rs b/lib/u_lib/tests/api_macro.rs deleted file mode 100644 index e5fd094..0000000 --- a/lib/u_lib/tests/api_macro.rs +++ /dev/null @@ -1,17 +0,0 @@ -/* -use std::fmt::Display; -use u_api_proc_macro::api_route; -use uuid::Uuid; - -struct Paths; -struct ClientHandler; - -#[test] -fn test_api_proc_macro() { - #[api_route("GET", Uuid)] - fn list(&self, msg: T) -> String {} - - #[api_route("POST", Uuid)] - fn report(&self, msg: T) -> String {} -} -*/ diff --git a/lib/u_lib/tests/fixtures/echoer b/lib/u_lib/tests/fixtures/echoer deleted file mode 100755 index 4c13f8d..0000000 Binary files a/lib/u_lib/tests/fixtures/echoer and /dev/null differ diff --git a/lib/u_lib/tests/jobs/execution.rs b/lib/u_lib/tests/jobs/execution.rs deleted file mode 100644 index 598dd8b..0000000 --- a/lib/u_lib/tests/jobs/execution.rs +++ /dev/null @@ -1,161 +0,0 @@ -use std::{time::SystemTime}; -use u_lib::{ - errors::UError, - models::{ - jobs::{JobMeta}, - ExecResult, - misc::JobType - }, - builder::{JobBuilder, NamedJobBuilder} -}; - -type TestResult = Result>; - -#[tokio::test] -async fn test_is_really_async() { - const SLEEP_SECS: u64 = 1; - let job = JobMeta::from_shell(format!("sleep {}", SLEEP_SECS)).unwrap(); - let sleep_jobs: Vec = (0..50).map(|_| job.clone()).collect(); - let now = SystemTime::now(); - JobBuilder::from_meta(sleep_jobs).unwrap_one().wait().await; - assert!(now.elapsed().unwrap().as_secs() < SLEEP_SECS + 2) -} - -#[test_case( - "/bin/sh {}", - Some(b"echo test01 > /tmp/asd; cat /tmp/asd"), - "test01" - ;"sh payload" -)] -#[test_case( - r#"/usr/bin/python -c 'print("test02")'"#, - None, - "test02" - ;"python cmd" -)] -#[test_case( - "/{}", - Some( -br#"#!/bin/sh -TMPPATH=/tmp/lol -mkdir -p $TMPPATH -echo test03 > $TMPPATH/t -cat $TMPPATH/t"# - ), - "test03" - ;"sh multiline payload" -)] -#[test_case( - "/{} 'some msg as arg'", - Some(include_bytes!("../fixtures/echoer")), - "some msg as arg" - ;"standalone binary with args" -)] -#[tokio::test] -async fn test_shell_job(cmd: &str, payload: Option<&[u8]>, expected_result: &str) -> TestResult { - let mut job = JobMeta::builder().with_shell(cmd); - if let Some(p) = payload { - job = job.with_payload(p); - } - let job = job.build().unwrap(); - let job_result = JobBuilder::from_meta(job).unwrap_one().wait_one().await; - let result = unwrap_enum!(job_result, ExecResult::Assigned); - let result = result.to_string_result().unwrap(); - assert_eq!(result.trim(), expected_result); - Ok(()) -} - -#[tokio::test] -async fn test_complex_load() -> TestResult { - const SLEEP_SECS: u64 = 1; - let now = SystemTime::now(); - let longest_job = JobMeta::from_shell(format!("sleep {}", SLEEP_SECS)).unwrap(); - let longest_job = JobBuilder::from_meta(longest_job).unwrap_one().spawn().await; - let ls = JobBuilder::from_meta(JobMeta::from_shell("ls")?).unwrap_one() - .wait_one() - .await; - let ls = unwrap_enum!(ls, ExecResult::Assigned); - assert_eq!(ls.retcode.unwrap(), 0); - let folders = ls.to_string_result().unwrap(); - let subfolders_jobs: Vec = folders - .lines() - .map(|f| JobMeta::from_shell(format!("ls {}", f)).unwrap()) - .collect(); - let ls_subfolders = JobBuilder::from_meta(subfolders_jobs) - .unwrap_one() - .wait() - .await; - for result in ls_subfolders { - let result = unwrap_enum!(result, ExecResult::Assigned); - assert_eq!(result.retcode.unwrap(), 0); - } - longest_job.wait().await; - assert_eq!(now.elapsed().unwrap().as_secs(), SLEEP_SECS); - Ok(()) -} -/* -#[tokio::test] -async fn test_exec_multiple_jobs_nowait() -> UResult<()> { - const REPEATS: usize = 10; - let job = JobMeta::from_shell("whoami"); - let sleep_jobs: Vec = (0..=REPEATS).map(|_| job.clone()).collect(); - build_jobs(sleep_jobs).spawn().await; - let mut completed = 0; - while completed < REPEATS { - let c = pop_completed().await.len(); - if c > 0 { - completed += c; - println!("{}", c); - } - } - Ok(()) -} -*/ -#[tokio::test] -async fn test_failing_shell_job() -> TestResult { - let job = JobMeta::from_shell("lol_kek_puk")?; - let job_result = JobBuilder::from_meta(job) - .unwrap_one() - .wait_one() - .await; - let job_result = unwrap_enum!(job_result, ExecResult::Assigned); - let output = job_result.to_string_result().unwrap(); - assert!(output.contains("No such file")); - assert!(job_result.retcode.is_none()); - Ok(()) -} - -#[test_case( - "/bin/bash {}", - None, - "contains executable" - ; "no binary" -)] -#[test_case( - "/bin/bash", - Some(b"whoami"), - "contains no executable" - ; "no path to binary" -)] -#[tokio::test] -async fn test_job_building_failed(cmd: &str, payload: Option<&[u8]>, err_str: &str) -> TestResult { - let mut job = JobMeta::builder().with_shell(cmd); - if let Some(p) = payload { - job = job.with_payload(p); - } - let err = job.build().unwrap_err(); - let err_msg = unwrap_enum!(err, UError::JobArgsError); - assert!(err_msg.contains(err_str)); - Ok(()) -} - -#[tokio::test] -async fn test_different_job_types() -> TestResult { - let mut jobs = NamedJobBuilder::from_meta(vec![ - ("sleeper", JobMeta::from_shell("sleep 3")?), - ("gatherer", JobMeta::builder().with_type(JobType::Manage).build()?) - ]).wait().await; - let gathered = jobs.pop("gatherer"); - assert_eq!(unwrap_enum!(gathered, ExecResult::Agent).alias, None); - Ok(()) -} diff --git a/lib/u_lib/tests/jobs/output.rs b/lib/u_lib/tests/jobs/output.rs deleted file mode 100644 index 0fc1522..0000000 --- a/lib/u_lib/tests/jobs/output.rs +++ /dev/null @@ -1,43 +0,0 @@ -use u_lib::{models::JobOutput, utils::vec_to_string}; - -const STDOUT: &str = "<***STDOUT***>"; -const STDERR: &str = "<***STDERR***>"; - -#[test_case( - "lol", - "kek", - &format!("{}lol{}kek", STDOUT, STDERR) - ;"stdout stderr" -)] -#[test_case( - "", - "kek", - &format!("{}kek", STDERR) - ;"stderr" -)] -fn test_to_combined(stdout: &str, stderr: &str, result: &str) { - let output = JobOutput::new() - .stdout(stdout.as_bytes().to_vec()) - .stderr(stderr.as_bytes().to_vec()); - assert_eq!(&vec_to_string(&output.into_combined()), result) -} - -#[test_case( - &format!("{}lal{}kik", STDOUT, STDERR), - "lal\nkik" - ;"stdout stderr" -)] -#[test_case( - &format!("{}qeq", STDOUT), - "qeq" - ;"stdout" -)] -#[test_case( - &format!("{}vev", STDERR), - "vev" - ;"stderr" -)] -fn test_from_combined(src: &str, result: &str) { - let output = JobOutput::from_combined(src.as_bytes()).unwrap(); - assert_eq!(vec_to_string(&output.to_appropriate()).trim(), result); -} diff --git a/lib/u_lib/tests/tests.rs b/lib/u_lib/tests/tests.rs deleted file mode 100644 index b0e8959..0000000 --- a/lib/u_lib/tests/tests.rs +++ /dev/null @@ -1,10 +0,0 @@ -#[macro_use] -extern crate test_case; - -#[macro_use] -extern crate u_lib; - -mod jobs { - mod execution; - mod output; -} diff --git a/scripts/cargo_musl.sh b/scripts/cargo_musl.sh index 49cf0cd..c33121b 100755 --- a/scripts/cargo_musl.sh +++ b/scripts/cargo_musl.sh @@ -1,10 +1,11 @@ #!/bin/bash set -e source $(dirname $0)/rootdir.sh #set ROOTDIR +umask 002 docker run \ -v $ROOTDIR:/volume \ -v cargo-cache:/root/.cargo/registry \ -w /volume \ -it \ - clux/muslrust \ + unki/musllibs \ cargo $@