diff --git a/.env b/.env index 001cab7..60ff916 100644 --- a/.env +++ b/.env @@ -1,3 +1,4 @@ -ADMIN_AUTH_TOKEN=464af63dbd241969baa1e94b2461d94d -POSTGRES_PASSWORD=12348756 -DATABASE_URL=postgres://postgres:${POSTGRES_PASSWORD}@u_db/u_db +DB_HOST=u_db +DB_NAME=u_db +DB_USER=postgres +RUST_BACKTRACE=1 \ No newline at end of file diff --git a/.env.private.sample b/.env.private.sample new file mode 100644 index 0000000..28abc30 --- /dev/null +++ b/.env.private.sample @@ -0,0 +1,4 @@ +# remove '.sample' to activate +ADMIN_AUTH_TOKEN= +DB_PASSWORD= +POSTGRES_PASSWORD=${DB_PASSWORD} \ No newline at end of file diff --git a/.gitignore b/.gitignore index fa726f9..0f6b5f5 100644 --- a/.gitignore +++ b/.gitignore @@ -2,5 +2,8 @@ target/ **/*.rs.bk .idea/ data/ -static/ -**/*.pyc \ No newline at end of file +**/*.pyc +certs/* +*.log +echoer +.env.private \ No newline at end of file diff --git a/Makefile b/Makefile index 8c51d78..969e896 100644 --- a/Makefile +++ b/Makefile @@ -1,4 +1,4 @@ -.PHONY: _pre_build debug release run clean +.PHONY: _pre_build debug release run clean unit-tests integration-tests test CARGO=./scripts/cargo_musl.sh @@ -15,4 +15,12 @@ release: _pre_build ${CARGO} build --release run: build - ${CARGO} run \ No newline at end of file + ${CARGO} run + +unit-tests: + ${CARGO} test --lib + +integration-tests: + cd ./integration && ./integration_tests.sh + +test: unit-tests integration-tests \ No newline at end of file diff --git a/bin/u_agent/Cargo.toml b/bin/u_agent/Cargo.toml index c963c2b..3f7a884 100644 --- a/bin/u_agent/Cargo.toml +++ b/bin/u_agent/Cargo.toml @@ -14,4 +14,7 @@ env_logger = "0.8.3" uuid = "0.6.5" reqwest = { version = "0.11", features = ["json"] } openssl = "*" -u_lib = { version = "*", path = "../../lib/u_lib" } \ No newline at end of file +u_lib = { version = "*", path = "../../lib/u_lib" } + +[build-dependencies] +openssl = "*" \ No newline at end of file diff --git a/bin/u_agent/build.rs b/bin/u_agent/build.rs new file mode 100644 index 0000000..a9a9abd --- /dev/null +++ b/bin/u_agent/build.rs @@ -0,0 +1,8 @@ +use std::path::PathBuf; + +fn main() { + let server_cert = PathBuf::from("../../certs/ca.crt"); + if !server_cert.exists() { + panic!("CA certificate doesn't exist. Create it first with certs/gen_certs.sh"); + } +} diff --git a/bin/u_agent/src/lib.rs b/bin/u_agent/src/lib.rs index 876a1b6..5db6b54 100644 --- a/bin/u_agent/src/lib.rs +++ b/bin/u_agent/src/lib.rs @@ -46,12 +46,12 @@ pub async fn process_request(job_requests: Vec, client: &ClientHand } } info!( - "Scheduling jobs: \n{}", + "Scheduling jobs: {}", job_requests .iter() .map(|j| j.job_id.to_string()) .collect::>() - .join("\n") + .join(", ") ); let mut builder = JobBuilder::from_request(job_requests); let errors = builder.pop_errors(); @@ -77,7 +77,7 @@ pub async fn run_forever() { info!("Connecting to the server"); loop { let job_requests: Vec = - retry_until_ok!(instance.get_agent_jobs(Some(*UID)).await).into_builtin_vec(); + retry_until_ok!(instance.get_personal_jobs(Some(*UID)).await).into_builtin_vec(); process_request(job_requests, &instance).await; let result: Vec = pop_completed().await.into_iter().collect(); if result.len() > 0 { diff --git a/bin/u_panel/src/main.rs b/bin/u_panel/src/main.rs index f6335a2..4b86905 100644 --- a/bin/u_panel/src/main.rs +++ b/bin/u_panel/src/main.rs @@ -1,9 +1,9 @@ -use serde::Serialize; use std::env; use std::fmt; use structopt::StructOpt; use u_lib::{ - api::ClientHandler, messaging::AsMsg, models::JobMeta, utils::init_env, UError, UResult, + api::ClientHandler, datatypes::DataResult, messaging::AsMsg, models::JobMeta, utils::init_env, + UResult, }; use uuid::Uuid; @@ -79,36 +79,34 @@ fn parse_uuid(src: &str) -> Result { } async fn process_cmd(args: Args) { - fn printer(data: UResult, json: bool) { - if json { - #[derive(Serialize)] - #[serde(rename_all = "lowercase")] - #[serde(tag = "status", content = "data")] - enum DataResult { - Ok(M), - Err(UError), - } + struct Printer { + json: bool, + } - let data = match data { - Ok(r) => DataResult::Ok(r), - Err(e) => DataResult::Err(e), - }; - println!("{}", serde_json::to_string_pretty(&data).unwrap()); - } else { - match data { - Ok(r) => println!("{}", r), - Err(e) => eprintln!("Error: {}", e), + impl Printer { + pub fn print(&self, data: UResult) { + if self.json { + let data = match data { + Ok(r) => DataResult::Ok(r), + Err(e) => DataResult::Err(e), + }; + println!("{}", serde_json::to_string_pretty(&data).unwrap()); + } else { + match data { + Ok(r) => println!("{}", r), + Err(e) => eprintln!("Error: {}", e), + } } } } let token = env::var("ADMIN_AUTH_TOKEN").expect("Authentication token is not set"); let cli_handler = ClientHandler::new(None).password(token); - let json = args.json; + let printer = Printer { json: args.json }; match args.cmd { Cmd::Agents(action) => match action { - LD::List { uid } => printer(cli_handler.get_agents(uid).await, json), - LD::Delete { uid } => printer(cli_handler.del(Some(uid)).await, json), + LD::List { uid } => printer.print(cli_handler.get_agents(uid).await), + LD::Delete { uid } => printer.print(cli_handler.del(Some(uid)).await), }, Cmd::Jobs(action) => match action { JobALD::Add { @@ -121,21 +119,18 @@ async fn process_cmd(args: Args) { .with_alias(alias) .build() .unwrap(); - printer(cli_handler.upload_jobs(&[job]).await, json); + printer.print(cli_handler.upload_jobs(&[job]).await); } - JobALD::LD(LD::List { uid }) => printer(cli_handler.get_jobs(uid).await, json), - JobALD::LD(LD::Delete { uid }) => printer(cli_handler.del(Some(uid)).await, json), + JobALD::LD(LD::List { uid }) => printer.print(cli_handler.get_jobs(uid).await), + JobALD::LD(LD::Delete { uid }) => printer.print(cli_handler.del(Some(uid)).await), }, Cmd::Jobmap(action) => match action { JobMapALD::Add { agent_uid, job_idents, - } => printer( - cli_handler.set_jobs(Some(agent_uid), &job_idents).await, - json, - ), - JobMapALD::List { uid } => printer(cli_handler.get_agent_jobs(uid).await, json), - JobMapALD::Delete { uid } => printer(cli_handler.del(Some(uid)).await, json), + } => printer.print(cli_handler.set_jobs(Some(agent_uid), &job_idents).await), + JobMapALD::List { uid } => printer.print(cli_handler.get_agent_jobs(uid).await), + JobMapALD::Delete { uid } => printer.print(cli_handler.del(Some(uid)).await), }, } } diff --git a/bin/u_server/Cargo.toml b/bin/u_server/Cargo.toml index 96a3651..043022a 100644 --- a/bin/u_server/Cargo.toml +++ b/bin/u_server/Cargo.toml @@ -8,10 +8,10 @@ version = "0.1.0" log = "0.4.11" simplelog = "0.10" thiserror = "*" -warp = "0.2.4" +warp = { version = "0.3.1", features = ["tls"] } uuid = { version = "0.6.5", features = ["serde", "v4"] } once_cell = "1.7.2" -hyper = "0.13.10" +hyper = "0.14" mockall = "0.9.1" mockall_double = "0.2" openssl = "*" @@ -26,7 +26,7 @@ version = "1.0.114" [dependencies.tokio] features = ["macros"] -version = "0.2.22" +version = "1.9" [dependencies.u_lib] path = "../../lib/u_lib" @@ -37,7 +37,7 @@ test-case = "1.1.0" [lib] name = "u_server_lib" -path = "src/lib.rs" +path = "src/u_server.rs" [[bin]] name = "u_server" diff --git a/bin/u_server/src/db.rs b/bin/u_server/src/db.rs index c104968..fa39ad4 100644 --- a/bin/u_server/src/db.rs +++ b/bin/u_server/src/db.rs @@ -20,8 +20,16 @@ static DB: OnceCell>> = OnceCell::new(); impl UDB { pub fn lock_db() -> MutexGuard<'static, UDB> { DB.get_or_init(|| { - let db_path = env::var("DATABASE_URL").unwrap(); - let conn = PgConnection::establish(&db_path).unwrap(); + let _getenv = |v| env::var(v).unwrap(); + let db_host = _getenv("DB_HOST"); + let db_name = _getenv("DB_NAME"); + let db_user = _getenv("DB_USER"); + let db_password = _getenv("DB_PASSWORD"); + let db_url = format!( + "postgres://{}:{}@{}/{}", + db_user, db_password, db_host, db_name + ); + let conn = PgConnection::establish(&db_url).unwrap(); let instance = UDB { conn }; Arc::new(Mutex::new(instance)) }) @@ -97,9 +105,9 @@ impl UDB { ) -> ULocalResult> { use schema::results; let mut q = results::table.into_boxed(); - if uid.is_some() { + /*if uid.is_some() { q = q.filter(results::agent_id.eq(uid.unwrap())) - } + }*/ if personal { q = q.filter( results::state @@ -140,10 +148,13 @@ impl UDB { } let job_requests = job_uids .iter() - .map(|job_uid| AssignedJob { - job_id: *job_uid, - agent_id: *agent_uid, - ..Default::default() + .map(|job_uid| { + info!("set_jobs_for_agent: set {} for {}", job_uid, agent_uid); + AssignedJob { + job_id: *job_uid, + agent_id: *agent_uid, + ..Default::default() + } }) .collect::>(); diesel::insert_into(results::table) diff --git a/bin/u_server/src/filters.rs b/bin/u_server/src/filters.rs index a9c51f0..59527a0 100644 --- a/bin/u_server/src/filters.rs +++ b/bin/u_server/src/filters.rs @@ -48,12 +48,12 @@ pub fn make_filters() -> impl Filter .map(Some) .or_else(infallible_none), ) - .and_then(|uid| Endpoints::get_agent_jobs(uid, false)); + .and_then(|uid| Endpoints::get_agent_jobs(uid)); let get_personal_jobs = warp::get() - .and(warp::path("get_agent_jobs")) + .and(warp::path("get_personal_jobs")) .and(warp::path::param::().map(Some)) - .and_then(|uid| Endpoints::get_agent_jobs(uid, true)); + .and_then(|uid| Endpoints::get_personal_jobs(uid)); let del = warp::get() .and(warp::path("del")) @@ -73,16 +73,15 @@ pub fn make_filters() -> impl Filter let auth_token = format!("Bearer {}", env::var("ADMIN_AUTH_TOKEN").unwrap()).into_boxed_str(); let auth_header = warp::header::exact("authorization", Box::leak(auth_token)); - let agent_zone = get_jobs.clone().or(get_personal_jobs).or(report); + let auth_zone = (get_agents + .or(get_jobs) + .or(upload_jobs) + .or(del) + .or(set_jobs) + .or(get_agent_jobs)) + .and(auth_header); - let auth_zone = auth_header.and( - get_agents - .or(get_jobs) - .or(upload_jobs) - .or(del) - .or(set_jobs) - .or(get_agent_jobs), - ); + let agent_zone = get_jobs.clone().or(get_personal_jobs).or(report); - agent_zone.or(auth_zone) + auth_zone.or(agent_zone) } diff --git a/bin/u_server/src/handlers.rs b/bin/u_server/src/handlers.rs index f792188..db8ff4c 100644 --- a/bin/u_server/src/handlers.rs +++ b/bin/u_server/src/handlers.rs @@ -57,30 +57,31 @@ impl Endpoints { .or_else(|e| Ok(build_err(e))) } - pub async fn get_agent_jobs( - uid: Option, - personal: bool, - ) -> Result, Rejection> { - info!("hnd: get_agent_jobs {}", personal); - if personal { - let agents = UDB::lock_db().get_agents(uid).unwrap(); - if agents.len() == 0 { - let db = UDB::lock_db(); - db.insert_agent(&Agent::with_id(uid.unwrap())).unwrap(); - let job = db.find_job_by_alias("agent_hello").unwrap(); - if let Err(e) = db.set_jobs_for_agent(&uid.unwrap(), &[job.id]) { - return Ok(build_err(e)); - } + pub async fn get_agent_jobs(uid: Option) -> Result, Rejection> { + info!("hnd: get_agent_jobs"); + UDB::lock_db() + .get_exact_jobs(uid, false) + .map(|m| build_message(m)) + .or_else(|e| Ok(build_err(e))) + } + + pub async fn get_personal_jobs(uid: Option) -> Result, Rejection> { + info!("hnd: get_personal_jobs"); + let agents = UDB::lock_db().get_agents(uid).unwrap(); + if agents.len() == 0 { + let db = UDB::lock_db(); + db.insert_agent(&Agent::with_id(uid.unwrap())).unwrap(); + let job = db.find_job_by_alias("agent_hello").unwrap(); + if let Err(e) = db.set_jobs_for_agent(&uid.unwrap(), &[job.id]) { + return Ok(build_err(e)); } } - let result = UDB::lock_db().get_exact_jobs(uid, personal); + let result = UDB::lock_db().get_exact_jobs(uid, true); match result { Ok(r) => { - if personal { - let db = UDB::lock_db(); - for j in r.iter() { - db.update_job_status(j.id, JobState::Running).ok(); - } + let db = UDB::lock_db(); + for j in r.iter() { + db.update_job_status(j.id, JobState::Running).unwrap(); } Ok(build_message(r)) } @@ -105,21 +106,22 @@ impl Endpoints { for del_fn in del_fns { let affected = del_fn(&db, &vec![uid]).unwrap(); if affected > 0 { - return Ok(build_ok(affected.to_string())); + return Ok(build_message(affected as i32)); } } - Ok(build_err("0")) + Ok(build_message(0)) } pub async fn set_jobs( agent_uid: Uuid, msg: BaseMessage<'static, Vec>, ) -> Result, Rejection> { - info!("hnd: set_jobs_by_alias"); + info!("hnd: set_jobs_by_alias, agent: {}", agent_uid); let jobs: Result, ULocalError> = msg .into_inner() .into_iter() .map(|ident| { + info!("hnd: set_jobs_by_alias, job: {}", ident); Uuid::parse_str(&ident) .or_else(|_| UDB::lock_db().find_job_by_alias(&ident).map(|j| j.id)) }) @@ -157,6 +159,7 @@ impl Endpoints { a.state = AgentState::Active; Self::add_agent(a).await?; } + ExecResult::Dummy => (), } } if failed.len() > 0 { diff --git a/bin/u_server/src/lib.rs b/bin/u_server/src/u_server.rs similarity index 67% rename from bin/u_server/src/lib.rs rename to bin/u_server/src/u_server.rs index 6b24a9a..cbba6b1 100644 --- a/bin/u_server/src/lib.rs +++ b/bin/u_server/src/u_server.rs @@ -43,14 +43,10 @@ fn init_logger() { .create(true) .open(LOGFILE) .unwrap(); + let level = LevelFilter::Info; let loggers = vec![ - WriteLogger::new(LevelFilter::Info, log_cfg.clone(), logfile) as Box, - TermLogger::new( - LevelFilter::Info, - log_cfg, - TerminalMode::Stderr, - ColorChoice::Auto, - ), + WriteLogger::new(level, log_cfg.clone(), logfile) as Box, + TermLogger::new(level, log_cfg, TerminalMode::Stderr, ColorChoice::Auto), ]; CombinedLogger::init(loggers).unwrap(); } @@ -65,6 +61,10 @@ pub async fn serve() { init_all(); let routes = make_filters(); warp::serve(routes.with(warp::log("warp"))) + .tls() + .cert_path("./certs/server.crt") + .key_path("./certs/server.key") + .client_auth_required_path("./certs/ca.crt") .run(([0, 0, 0, 0], MASTER_PORT)) .await; } @@ -77,6 +77,7 @@ mod tests { use handlers::build_ok; use mockall::predicate::*; use test_case::test_case; + use u_lib::messaging::{AsMsg, BaseMessage}; use uuid::Uuid; use warp::test::request; @@ -85,9 +86,7 @@ mod tests { #[tokio::test] async fn test_get_agent_jobs_unauthorized(uid: Option) { let mock = Endpoints::get_agent_jobs_context(); - mock.expect() - .with(eq(uid), eq(uid.is_some())) - .returning(|_, _| Ok(build_ok(""))); + mock.expect().with(eq(uid)).returning(|_| Ok(build_ok(""))); request() .path(&format!( "/get_agent_jobs/{}", @@ -99,4 +98,20 @@ mod tests { .unwrap(); mock.checkpoint(); } + + #[tokio::test] + async fn test_report_unauth_successful() { + let mock = Endpoints::report_context(); + mock.expect() + .withf(|msg: &BaseMessage<'_, Vec>| msg.inner_ref()[0] == ExecResult::Dummy) + .returning(|_| Ok(build_ok(""))); + request() + .path("/report/") + .method("POST") + .json(&vec![ExecResult::Dummy].as_message()) + .filter(&make_filters()) + .await + .unwrap(); + mock.checkpoint(); + } } diff --git a/certs/gen_certs.sh b/certs/gen_certs.sh new file mode 100755 index 0000000..a0e9d52 --- /dev/null +++ b/certs/gen_certs.sh @@ -0,0 +1,17 @@ +set -ex +DIR=. +V3_CFG=v3.ext + +cat > $DIR/$V3_CFG << EOF +authorityKeyIdentifier=keyid,issuer +basicConstraints=CA:FALSE +keyUsage = digitalSignature, nonRepudiation, keyEncipherment, dataEncipherment, keyAgreement, keyCertSign, cRLSign +EOF + + +openssl req -x509 -newkey rsa:4096 -keyout $DIR/ca.key -out $DIR/ca.crt -nodes -days 365 -subj "/CN=root" +openssl req -newkey rsa:4096 -keyout $DIR/alice.key -out $DIR/alice.csr -nodes -days 365 -subj "/CN=alice" +openssl req -newkey rsa:4096 -keyout $DIR/server.key -out $DIR/server.csr -nodes -days 365 -subj "/CN=u_server" +openssl x509 -req -in $DIR/alice.csr -CA $DIR/ca.crt -CAkey $DIR/ca.key -out $DIR/alice.crt -set_serial 01 -days 365 -extfile $DIR/$V3_CFG +openssl x509 -req -in $DIR/server.csr -CA $DIR/ca.crt -CAkey $DIR/ca.key -out $DIR/server.crt -set_serial 01 -days 365 -extfile $DIR/$V3_CFG +openssl pkcs12 -export -out $DIR/alice.p12 -inkey $DIR/alice.key -in $DIR/alice.crt -passin pass: -passout pass: \ No newline at end of file diff --git a/integration/Cargo.toml b/integration/Cargo.toml index dfccd4c..28028a2 100644 --- a/integration/Cargo.toml +++ b/integration/Cargo.toml @@ -17,6 +17,10 @@ serde = { version = "1.0.114", features = ["derive"] } futures = "0.3.5" shlex = "1.0.0" +[dependencies.u_lib] +path = "../lib/u_lib" +version = "*" + [[test]] name = "integration" diff --git a/integration/docker-compose.yml b/integration/docker-compose.yml index bd83d3e..c0381e3 100644 --- a/integration/docker-compose.yml +++ b/integration/docker-compose.yml @@ -13,16 +13,19 @@ services: - ../target/x86_64-unknown-linux-musl/release/u_server:/u_server - ../:/unki/ working_dir: /unki - command: bash -c "diesel setup && diesel migration run && /u_server" + command: bash -c " + export DATABASE_URL=postgres://$${DB_USER}:$${DB_PASSWORD}@$${DB_HOST}/$${DB_NAME} && + diesel setup && diesel migration run && /u_server" depends_on: u_db: condition: service_healthy expose: - '63714' - environment: - RUST_LOG: warp=info env_file: - ../.env + - ../.env.private + environment: + RUST_LOG: trace healthcheck: test: /bin/ss -tlpn | grep 63714 interval: 5s @@ -37,6 +40,7 @@ services: - '5432' env_file: - ../.env + - ../.env.private healthcheck: test: /bin/ss -tlpn | grep 5432 interval: 5s @@ -50,6 +54,10 @@ services: volumes: - ../target/x86_64-unknown-linux-musl/release/u_agent:/u_agent command: /u_agent u_server + env_file: + - ../.env + environment: + RUST_LOG: u_agent=debug depends_on: u_server: condition: service_healthy @@ -61,6 +69,10 @@ services: volumes: - ../target/x86_64-unknown-linux-musl/release/u_agent:/u_agent command: /u_agent u_server + env_file: + - ../.env + environment: + RUST_LOG: u_agent=debug depends_on: u_server: condition: service_healthy @@ -70,13 +82,14 @@ services: networks: - u_net volumes: + - ~/.cargo/registry:/root/.cargo/registry - ./:/tests/ + - ../certs:/certs - ../target/x86_64-unknown-linux-musl/release/u_panel:/u_panel - - ~/.cargo/registry:/root/.cargo/registry + - ../lib/u_lib:/lib/u_lib + - ../lib/u_api_proc_macro:/lib/u_api_proc_macro working_dir: /tests/ - env_file: - - ../.env depends_on: u_agent_1: condition: service_started @@ -84,6 +97,9 @@ services: condition: service_started u_server: condition: service_healthy + env_file: + - ../.env + - ../.env.private environment: RUST_BACKTRACE: 1 U_SERVER: u_server \ No newline at end of file diff --git a/integration/tests/helpers/mod.rs b/integration/tests/helpers/mod.rs index 0957ad9..783b365 100644 --- a/integration/tests/helpers/mod.rs +++ b/integration/tests/helpers/mod.rs @@ -1,5 +1,3 @@ -pub mod client; pub mod panel; -pub use client::AgentClient; pub use panel::Panel; diff --git a/integration/tests/helpers/panel.rs b/integration/tests/helpers/panel.rs index f250332..41008f9 100644 --- a/integration/tests/helpers/panel.rs +++ b/integration/tests/helpers/panel.rs @@ -1,9 +1,13 @@ -use serde_json::{from_slice, Value}; +use serde::de::DeserializeOwned; +use serde_json::from_slice; use shlex::split; use std::process::{Command, Output}; +use u_lib::{datatypes::DataResult, messaging::AsMsg}; const PANEL_BINARY: &str = "/u_panel"; +type PanelResult = Result, String>; + pub struct Panel; impl Panel { @@ -15,13 +19,13 @@ impl Panel { .unwrap() } - pub fn output_argv(args: &[&str]) -> Value { + pub fn output_argv(args: &[&str]) -> PanelResult { let result = Self::run(args); assert!(result.status.success()); - from_slice(&result.stdout).unwrap() + from_slice(&result.stdout).map_err(|e| e.to_string()) } - pub fn output>(args: S) -> Value { + pub fn output(args: impl Into) -> PanelResult { let splitted = split(args.into().as_ref()).unwrap(); Self::output_argv( splitted @@ -32,22 +36,20 @@ impl Panel { ) } - fn status_is_ok(data: &Value) { - assert_eq!( - data["status"], "ok", - "Panel failed with erroneous status: {}", - data["data"] - ); + fn status_is_ok(data: PanelResult) -> T { + match data.unwrap() { + DataResult::Ok(r) => r, + DataResult::Err(e) => panic!("Panel failed with erroneous status: {}", e), + } } - pub fn check_status>(args: S) { - let result = Self::output(args); - Self::status_is_ok(&result); + pub fn check_status<'s, T: AsMsg + DeserializeOwned>(args: &'s str) { + let result: PanelResult = Self::output(args); + Self::status_is_ok(result); } - pub fn check_output>(args: S) -> Vec { + pub fn check_output(args: impl Into) -> T { let result = Self::output(args); - Self::status_is_ok(&result); - result["data"].as_array().unwrap().clone() + Self::status_is_ok(result) } } diff --git a/integration/tests/tests.rs b/integration/tests/tests.rs index 2cc9bce..636a4be 100644 --- a/integration/tests/tests.rs +++ b/integration/tests/tests.rs @@ -1,80 +1,67 @@ mod helpers; -use helpers::{AgentClient, Panel}; +use helpers::Panel; -use serde_json::json; +use std::error::Error; use std::thread::sleep; use std::time::Duration; +use u_lib::{api::ClientHandler, models::*}; use uuid::Uuid; -type TestResult = Result>; +type TestResult = Result>; async fn register_agent() -> Uuid { - let cli = AgentClient::new(); + let cli = ClientHandler::new(None); let agent_uid = Uuid::new_v4(); - let resp = cli.get(format!("get_agent_jobs/{}", agent_uid)).await; - let job_id = &resp["job_id"]; - let resp = cli.get(format!("get_jobs/{}", job_id)).await; - assert_eq!(&resp["alias"], "agent_hello"); - let agent_data = json! { - {"id": &agent_uid,"inner":[ - {"Agent": - {"alias":null, - "hostname":"3b1030fa6324", - "id":&agent_uid, - "is_root":false, - "is_root_allowed":false, - "last_active":{"secs_since_epoch":1625271265,"nanos_since_epoch":92814921}, - "platform":"x86_64-unknown-linux-gnu", - "regtime":{"secs_since_epoch":1625271265,"nanos_since_epoch":92814945}, - "state":"New", - "token":null, - "username":"root"} - } - ]} + let resp = cli + .get_personal_jobs(Some(agent_uid)) + .await + .unwrap() + .pop() + .unwrap(); + let job_id = resp.job_id; + let resp = cli.get_jobs(Some(job_id)).await.unwrap().pop().unwrap(); + assert_eq!(resp.alias, Some("agent_hello".to_string())); + let agent_data = Agent { + id: agent_uid, + ..Default::default() }; - cli.post("report", &agent_data).await; + cli.report(&vec![ExecResult::Agent(agent_data)]) + .await + .unwrap(); agent_uid } #[tokio::test] async fn test_registration() -> TestResult { let agent_uid = register_agent().await; - let agents = Panel::check_output("agents list"); - let found = agents - .iter() - .find(|v| v["id"].as_str().unwrap() == agent_uid.to_string()); + let agents: Vec = Panel::check_output("agents list"); + let found = agents.iter().find(|v| v.id == agent_uid); assert!(found.is_some()); + //teardown + Panel::check_status::(&format!("agents delete {}", agent_uid)); Ok(()) } #[tokio::test] async fn test_setup_tasks() -> TestResult { - let agent_uid = Panel::check_output("agents list")[0]["id"].clone(); - dbg!(&agent_uid); + //some independent agents should present + let agents: Vec = Panel::check_output("agents list"); + let agent_uid = agents[0].id; let job_alias = "passwd_contents"; let cmd = format!("jobs add --alias {} 'cat /etc/passwd'", job_alias); - Panel::check_status(cmd); + Panel::check_status::(&cmd); let cmd = format!("jobmap add {} {}", agent_uid, job_alias); - let assigned_uids = Panel::check_output(cmd); - dbg!(&assigned_uids); - loop { - let result = Panel::check_output(format!("jobmap list {}", assigned_uids[0])); - dbg!(&result); - match result.get(0) { - Some(entry) if entry["state"] == "Finished" => { - println!("{}", result[0]); - break; - } - None => { - eprintln!("jobmap list is empty (bad bad bad)"); - continue; - } - _ => { - sleep(Duration::from_secs(1)); - eprintln!("waiting for task"); - } + let assigned_uids: Vec = Panel::check_output(cmd); + for _ in 0..3 { + let result: Vec = + Panel::check_output(format!("jobmap list {}", assigned_uids[0])); + if result[0].state == JobState::Finished { + return Ok(()); + } else { + sleep(Duration::from_secs(5)); + eprintln!("waiting for task"); } } - Ok(()) + panic!() } diff --git a/lib/u_lib/Cargo.toml b/lib/u_lib/Cargo.toml index dad63a9..5d56eff 100644 --- a/lib/u_lib/Cargo.toml +++ b/lib/u_lib/Cargo.toml @@ -14,7 +14,7 @@ nix = "0.17" libc = "^0.2" lazy_static = "1.4.0" tokio = { version = "1.2.0", features = ["rt-multi-thread", "sync", "macros", "process", "time"] } -reqwest = { version = "0.11", features = ["json"] } +reqwest = { version = "0.11", features = ["json", "native-tls"] } openssl = "*" futures = "0.3.5" guess_host_triple = "0.1.2" diff --git a/lib/u_lib/src/api.rs b/lib/u_lib/src/api.rs index 636a0dc..82ba4bf 100644 --- a/lib/u_lib/src/api.rs +++ b/lib/u_lib/src/api.rs @@ -6,11 +6,14 @@ use crate::{ utils::{opt_to_string, VecDisplay}, UError, UResult, }; -use reqwest::{Client, RequestBuilder, Url}; +use reqwest::{Certificate, Client, Identity, RequestBuilder, Url}; use std::env; use u_api_proc_macro::api_route; use uuid::Uuid; +const AGENT_IDENTITY: &[u8] = include_bytes!("../../../certs/alice.p12"); +const ROOT_CA_CERT: &[u8] = include_bytes!("../../../certs/ca.crt"); + pub struct ClientHandler { base_url: Url, client: Client, @@ -21,9 +24,15 @@ impl ClientHandler { pub fn new(server: Option<&str>) -> Self { let env_server = env::var("U_SERVER").unwrap_or(String::from(MASTER_SERVER)); let master_server = server.unwrap_or(env_server.as_str()); + let identity = Identity::from_pkcs12_der(AGENT_IDENTITY, "").unwrap(); + let client = Client::builder() + .identity(identity) + .add_root_certificate(Certificate::from_pem(ROOT_CA_CERT).unwrap()) + .build() + .unwrap(); Self { - client: Client::new(), - base_url: Url::parse(&format!("http://{}:{}", master_server, MASTER_PORT)).unwrap(), + client, + base_url: Url::parse(&format!("https://{}:{}", master_server, MASTER_PORT)).unwrap(), password: None, } } @@ -52,7 +61,7 @@ impl ClientHandler { // // get jobs for client #[api_route("GET")] - fn get_agent_jobs(&self, url_param: Option) -> VecDisplay {} + fn get_personal_jobs(&self, url_param: Option) -> VecDisplay {} // // send something to server #[api_route("POST")] @@ -73,9 +82,13 @@ impl ClientHandler { // // delete something #[api_route("GET")] - fn del(&self, url_param: Option) -> String {} + fn del(&self, url_param: Option) -> i32 {} // - // set jobs for client + // set jobs for any client #[api_route("POST")] fn set_jobs(&self, url_param: Option, payload: &[String]) -> VecDisplay {} + // + // get jobs for any client + #[api_route("GET")] + fn get_agent_jobs(&self, url_param: Option) -> VecDisplay {} } diff --git a/lib/u_lib/src/builder.rs b/lib/u_lib/src/builder.rs index 14b172d..da94225 100644 --- a/lib/u_lib/src/builder.rs +++ b/lib/u_lib/src/builder.rs @@ -1,7 +1,7 @@ use crate::{ cache::JobCache, - executor::{FutRes, Waiter, DynFut}, - models::{Agent, AssignedJob, JobMeta, JobType}, + executor::{Waiter, DynFut}, + models::{Agent, AssignedJob, JobMeta, JobType, ExecResult}, utils::{CombinedResult, OneOrVec}, UError, }; @@ -13,7 +13,7 @@ pub struct JobBuilder { } impl JobBuilder { - pub fn from_request>(job_requests: J) -> CombinedResult { + pub fn from_request(job_requests: impl OneOrVec) -> CombinedResult { let job_requests = job_requests.into_vec(); let mut prepared: Vec = vec![]; let mut result = CombinedResult::new(); @@ -24,6 +24,7 @@ impl JobBuilder { continue; } let job_meta = job_meta.unwrap(); + //waiting for try-blocks stabilization let built_req = (|| { Ok(match job_meta.exec_type { JobType::Shell => { @@ -52,7 +53,7 @@ impl JobBuilder { result } - pub fn from_meta>(job_metas: J) -> CombinedResult { + pub fn from_meta(job_metas: impl OneOrVec) -> CombinedResult { let job_requests = job_metas .into_vec() .into_iter() @@ -72,12 +73,12 @@ impl JobBuilder { } /// Spawn jobs and wait for result - pub async fn wait(self) -> Vec { + pub async fn wait(self) -> Vec { self.jobs.spawn().await.wait().await } /// Spawn one job and wait for result - pub async fn wait_one(self) -> FutRes { + pub async fn wait_one(self) -> ExecResult { self.jobs.spawn().await.wait().await.pop().unwrap() } } @@ -86,12 +87,12 @@ impl JobBuilder { pub struct NamedJobBuilder { builder: Option, job_names: Vec<&'static str>, - results: HashMap<&'static str, FutRes>, + results: HashMap<&'static str, ExecResult>, } impl NamedJobBuilder { - pub fn from_shell>( - named_jobs: J, + pub fn from_shell( + named_jobs: impl OneOrVec<(&'static str, &'static str)>, ) -> CombinedResult { let mut result = CombinedResult::new(); let jobs: Vec<(&'static str, JobMeta)> = named_jobs @@ -111,7 +112,7 @@ impl NamedJobBuilder { result } - pub fn from_meta>(named_jobs: J) -> Self { + pub fn from_meta(named_jobs: impl OneOrVec<(&'static str, JobMeta)>) -> Self { let mut job_names = vec![]; let job_metas: Vec = named_jobs .into_vec() @@ -136,11 +137,11 @@ impl NamedJobBuilder { self } - pub fn pop_opt(&mut self, name: &'static str) -> Option { + pub fn pop_opt(&mut self, name: &'static str) -> Option { self.results.remove(name) } - pub fn pop(&mut self, name: &'static str) -> FutRes { + pub fn pop(&mut self, name: &'static str) -> ExecResult { self.pop_opt(name).unwrap() } } @@ -180,7 +181,7 @@ mod tests { ;"sh payload" )] #[test_case( - r#"/usr/bin/python -c 'print("test02")'"#, + r#"/usr/bin/python3 -c 'print("test02")'"#, None, "test02" ;"python cmd" diff --git a/lib/u_lib/src/datatypes.rs b/lib/u_lib/src/datatypes.rs new file mode 100644 index 0000000..d980734 --- /dev/null +++ b/lib/u_lib/src/datatypes.rs @@ -0,0 +1,10 @@ +use crate::UError; +use serde::{Deserialize, Serialize}; + +#[derive(Serialize, Deserialize)] +#[serde(rename_all = "lowercase")] +#[serde(tag = "status", content = "data")] +pub enum DataResult { + Ok(M), + Err(UError), +} diff --git a/lib/u_lib/src/errors.rs b/lib/u_lib/src/errors.rs index a180311..206e363 100644 --- a/lib/u_lib/src/errors.rs +++ b/lib/u_lib/src/errors.rs @@ -10,7 +10,7 @@ pub type ULocalResult = std::result::Result; #[derive(Error, Debug, Serialize, Deserialize, Clone)] pub enum UError { #[error("Error: {0}")] - Raw(&'static str), + Raw(String), #[error("Connection error: {0}. Body: {1}")] NetError(String, String), @@ -22,7 +22,7 @@ pub enum UError { JobError(String), #[error("Argument parsing failed: {0}")] - JobArgsError(&'static str), + JobArgsError(String), #[error("Job is uncompleted yet")] JobUncompleted, diff --git a/lib/u_lib/src/executor.rs b/lib/u_lib/src/executor.rs index 95ab8aa..1c128e7 100644 --- a/lib/u_lib/src/executor.rs +++ b/lib/u_lib/src/executor.rs @@ -9,8 +9,7 @@ use tokio::{ }; use uuid::Uuid; -pub type FutRes = ExecResult; -pub type DynFut = BoxFuture<'static, FutRes>; +pub type DynFut = BoxFuture<'static, ExecResult>; lazy_static! { static ref FUT_RESULTS: Mutex> = Mutex::new(HashMap::new()); @@ -22,7 +21,7 @@ lazy_static! { } struct JoinInfo { - handle: JoinHandle, + handle: JoinHandle, completed: bool, collectable: bool, // indicates if future can be popped from pool via pop_task_if_completed } @@ -37,7 +36,7 @@ pub struct Waiter { } impl Waiter { - pub fn new>(tasks: S) -> Self { + pub fn new(tasks: impl OneOrVec) -> Self { Self { tasks: tasks.into_vec(), fids: vec![], @@ -66,10 +65,10 @@ impl Waiter { self } - // wait until a bunch of tasks is finished - // NOT GUARANTEED that all tasks will be returned due to - // possibility to pop them in other places - pub async fn wait(self) -> Vec { + /// Wait until a bunch of tasks is finished. + /// NOT GUARANTEED that all tasks will be returned due to + /// possibility to pop them in other places + pub async fn wait(self) -> Vec { let mut result = vec![]; for fid in self.fids { if let Some(task) = pop_task(fid).await { @@ -83,10 +82,8 @@ impl Waiter { async fn init_receiver() { while let Some(fid) = FUT_CHANNEL.1.lock().await.recv().await { - //info!("init_receiver: next val: {}", &fid); if let Some(mut lock) = FUT_RESULTS.try_lock() { if let Some(j) = lock.get_mut(&fid) { - //info!("init_receiver: marked as completed"); j.completed = true; } } @@ -96,12 +93,8 @@ async fn init_receiver() { async fn pop_task(fid: Uuid) -> Option { FUT_RESULTS.lock().await.remove(&fid) } -/* -async fn insert_task(fid: Uuid) { - FUT_RESULTS.lock().await.remove(&fid) -} -*/ -pub async fn pop_task_if_completed(fid: Uuid) -> Option { + +pub async fn pop_task_if_completed(fid: Uuid) -> Option { let &mut JoinInfo { handle: _, collectable, diff --git a/lib/u_lib/src/lib.rs b/lib/u_lib/src/lib.rs index 00d1c18..55f4e1e 100644 --- a/lib/u_lib/src/lib.rs +++ b/lib/u_lib/src/lib.rs @@ -3,6 +3,7 @@ pub mod api; pub mod builder; pub mod cache; pub mod config; +pub mod datatypes; pub mod errors; pub mod executor; pub mod messaging; diff --git a/lib/u_lib/src/messaging.rs b/lib/u_lib/src/messaging.rs index 3af1933..5330927 100644 --- a/lib/u_lib/src/messaging.rs +++ b/lib/u_lib/src/messaging.rs @@ -55,4 +55,8 @@ impl<'cow, I: AsMsg> BaseMessage<'cow, I> { pub fn into_inner(self) -> I { self.inner.into_owned() } + + pub fn inner_ref(&self) -> &I { + self.inner.as_ref() + } } diff --git a/lib/u_lib/src/models/agent.rs b/lib/u_lib/src/models/agent.rs index 2c66f1e..16a78db 100644 --- a/lib/u_lib/src/models/agent.rs +++ b/lib/u_lib/src/models/agent.rs @@ -26,7 +26,15 @@ pub enum AgentState { //belongs_to #[derive( - Clone, Debug, Serialize, Deserialize, Identifiable, Queryable, Insertable, AsChangeset, + Clone, + Debug, + Serialize, + Deserialize, + Identifiable, + Queryable, + Insertable, + AsChangeset, + PartialEq, )] #[table_name = "agents"] pub struct Agent { diff --git a/lib/u_lib/src/models/jobs/assigned.rs b/lib/u_lib/src/models/jobs/assigned.rs index d9350c4..b11c50b 100644 --- a/lib/u_lib/src/models/jobs/assigned.rs +++ b/lib/u_lib/src/models/jobs/assigned.rs @@ -12,7 +12,15 @@ use tokio::process::Command; use uuid::Uuid; #[derive( - Serialize, Deserialize, Clone, Debug, Queryable, Identifiable, Insertable, AsChangeset, + Serialize, + Deserialize, + Clone, + Debug, + Queryable, + Identifiable, + Insertable, + AsChangeset, + PartialEq, )] #[table_name = "results"] pub struct AssignedJob { diff --git a/lib/u_lib/src/models/jobs/meta.rs b/lib/u_lib/src/models/jobs/meta.rs index 0fd1c41..50e1b65 100644 --- a/lib/u_lib/src/models/jobs/meta.rs +++ b/lib/u_lib/src/models/jobs/meta.rs @@ -115,8 +115,8 @@ impl JobMetaBuilder { match inner.exec_type { JobType::Shell => { let argv_parts = - shlex::split(&inner.argv).ok_or(UError::JobArgsError("Shlex failed"))?; - let empty_err = UError::JobArgsError("Empty argv"); + shlex::split(&inner.argv).ok_or(UError::JobArgsError("Shlex failed".into()))?; + let empty_err = UError::JobArgsError("Empty argv".into()); if argv_parts.get(0).ok_or(empty_err.clone())?.len() == 0 { return Err(empty_err); } @@ -124,7 +124,7 @@ impl JobMetaBuilder { Some(_) => { if !inner.argv.contains("{}") { return Err(UError::JobArgsError( - "Argv contains no executable placeholder", + "Argv contains no executable placeholder".into(), )); } else { () @@ -133,7 +133,8 @@ impl JobMetaBuilder { None => { if inner.argv.contains("{}") { return Err(UError::JobArgsError( - "No payload provided, but argv contains executable placeholder", + "No payload provided, but argv contains executable placeholder" + .into(), )); } else { () diff --git a/lib/u_lib/src/models/mod.rs b/lib/u_lib/src/models/mod.rs index d30a33a..29b14de 100644 --- a/lib/u_lib/src/models/mod.rs +++ b/lib/u_lib/src/models/mod.rs @@ -17,6 +17,7 @@ impl AsMsg for JobMeta {} impl AsMsg for String {} impl AsMsg for Uuid {} impl AsMsg for Empty {} +impl AsMsg for i32 {} #[derive(Serialize, Deserialize, Clone, Default, Debug)] pub struct Empty; diff --git a/lib/u_lib/src/models/result.rs b/lib/u_lib/src/models/result.rs index 5f77f8f..849405a 100644 --- a/lib/u_lib/src/models/result.rs +++ b/lib/u_lib/src/models/result.rs @@ -1,8 +1,9 @@ use crate::models::{Agent, AssignedJob}; use serde::{Deserialize, Serialize}; -#[derive(Serialize, Deserialize, Clone)] +#[derive(Serialize, Deserialize, Clone, PartialEq)] pub enum ExecResult { Assigned(AssignedJob), Agent(Agent), + Dummy, } diff --git a/lib/u_lib/src/utils/combined_result.rs b/lib/u_lib/src/utils/combined_result.rs index 687f541..4699ae6 100644 --- a/lib/u_lib/src/utils/combined_result.rs +++ b/lib/u_lib/src/utils/combined_result.rs @@ -14,11 +14,11 @@ impl CombinedResult { } } - pub fn ok>(&mut self, result: I) { + pub fn ok(&mut self, result: impl OneOrVec) { self.ok.extend(result.into_vec()); } - pub fn err>(&mut self, err: I) { + pub fn err(&mut self, err: impl OneOrVec) { self.err.extend(err.into_vec()); } diff --git a/lib/u_lib/src/utils/misc.rs b/lib/u_lib/src/utils/misc.rs index 4f93a85..375a6ad 100644 --- a/lib/u_lib/src/utils/misc.rs +++ b/lib/u_lib/src/utils/misc.rs @@ -64,7 +64,7 @@ pub fn setsig(sig: Signal, hnd: SigHandler) { } pub fn init_env() { - let envs = [".env"]; + let envs = [".env", ".env.private"]; for envfile in &envs { dotenv::from_filename(envfile).ok(); } diff --git a/lib/u_lib/src/utils/vec_display.rs b/lib/u_lib/src/utils/vec_display.rs index 117ee44..4b9f83c 100644 --- a/lib/u_lib/src/utils/vec_display.rs +++ b/lib/u_lib/src/utils/vec_display.rs @@ -7,7 +7,7 @@ use std::ops::{Deref, DerefMut}; pub struct VecDisplay(pub Vec); impl VecDisplay { - pub fn new>(inner: I) -> Self { + pub fn new(inner: impl OneOrVec) -> Self { VecDisplay(inner.into_vec()) } diff --git a/scripts/cargo_musl.sh b/scripts/cargo_musl.sh index a3446ec..0a8b3aa 100755 --- a/scripts/cargo_musl.sh +++ b/scripts/cargo_musl.sh @@ -1,12 +1,12 @@ #!/bin/bash -set -e +set -ex source $(dirname $0)/rootdir.sh #set ROOTDIR -umask 002 +ARGS=$@ docker run \ + --env-file $ROOTDIR/.env \ -v $ROOTDIR:/volume \ -v cargo-cache:/root/.cargo/registry \ -w /volume \ -it \ unki/musllibs \ - cargo $@ - + bash -c "umask 0000; cargo $ARGS"