Merge branch '9-tls' into 'master'

Resolve "TLS"

Closes #9

See merge request root/unki!5
4-update-check
ortem 3 years ago
commit 18343cc829
  1. 7
      .env
  2. 4
      .env.private.sample
  3. 5
      .gitignore
  4. 10
      Makefile
  5. 3
      bin/u_agent/Cargo.toml
  6. 8
      bin/u_agent/build.rs
  7. 6
      bin/u_agent/src/lib.rs
  8. 39
      bin/u_panel/src/main.rs
  9. 8
      bin/u_server/Cargo.toml
  10. 21
      bin/u_server/src/db.rs
  11. 19
      bin/u_server/src/filters.rs
  12. 31
      bin/u_server/src/handlers.rs
  13. 35
      bin/u_server/src/u_server.rs
  14. 17
      certs/gen_certs.sh
  15. 4
      integration/Cargo.toml
  16. 28
      integration/docker-compose.yml
  17. 2
      integration/tests/helpers/mod.rs
  18. 34
      integration/tests/helpers/panel.rs
  19. 87
      integration/tests/tests.rs
  20. 2
      lib/u_lib/Cargo.toml
  21. 25
      lib/u_lib/src/api.rs
  22. 27
      lib/u_lib/src/builder.rs
  23. 10
      lib/u_lib/src/datatypes.rs
  24. 4
      lib/u_lib/src/errors.rs
  25. 25
      lib/u_lib/src/executor.rs
  26. 1
      lib/u_lib/src/lib.rs
  27. 4
      lib/u_lib/src/messaging.rs
  28. 10
      lib/u_lib/src/models/agent.rs
  29. 10
      lib/u_lib/src/models/jobs/assigned.rs
  30. 9
      lib/u_lib/src/models/jobs/meta.rs
  31. 1
      lib/u_lib/src/models/mod.rs
  32. 3
      lib/u_lib/src/models/result.rs
  33. 4
      lib/u_lib/src/utils/combined_result.rs
  34. 2
      lib/u_lib/src/utils/misc.rs
  35. 2
      lib/u_lib/src/utils/vec_display.rs
  36. 7
      scripts/cargo_musl.sh

@ -1,3 +1,4 @@
ADMIN_AUTH_TOKEN=464af63dbd241969baa1e94b2461d94d
POSTGRES_PASSWORD=12348756
DATABASE_URL=postgres://postgres:${POSTGRES_PASSWORD}@u_db/u_db
DB_HOST=u_db
DB_NAME=u_db
DB_USER=postgres
RUST_BACKTRACE=1

@ -0,0 +1,4 @@
# remove '.sample' to activate
ADMIN_AUTH_TOKEN=
DB_PASSWORD=
POSTGRES_PASSWORD=${DB_PASSWORD}

5
.gitignore vendored

@ -2,5 +2,8 @@ target/
**/*.rs.bk
.idea/
data/
static/
**/*.pyc
certs/*
*.log
echoer
.env.private

@ -1,4 +1,4 @@
.PHONY: _pre_build debug release run clean
.PHONY: _pre_build debug release run clean unit-tests integration-tests test
CARGO=./scripts/cargo_musl.sh
@ -16,3 +16,11 @@ release: _pre_build
run: build
${CARGO} run
unit-tests:
${CARGO} test --lib
integration-tests:
cd ./integration && ./integration_tests.sh
test: unit-tests integration-tests

@ -15,3 +15,6 @@ uuid = "0.6.5"
reqwest = { version = "0.11", features = ["json"] }
openssl = "*"
u_lib = { version = "*", path = "../../lib/u_lib" }
[build-dependencies]
openssl = "*"

@ -0,0 +1,8 @@
use std::path::PathBuf;
fn main() {
let server_cert = PathBuf::from("../../certs/ca.crt");
if !server_cert.exists() {
panic!("CA certificate doesn't exist. Create it first with certs/gen_certs.sh");
}
}

@ -46,12 +46,12 @@ pub async fn process_request(job_requests: Vec<AssignedJob>, client: &ClientHand
}
}
info!(
"Scheduling jobs: \n{}",
"Scheduling jobs: {}",
job_requests
.iter()
.map(|j| j.job_id.to_string())
.collect::<Vec<String>>()
.join("\n")
.join(", ")
);
let mut builder = JobBuilder::from_request(job_requests);
let errors = builder.pop_errors();
@ -77,7 +77,7 @@ pub async fn run_forever() {
info!("Connecting to the server");
loop {
let job_requests: Vec<AssignedJob> =
retry_until_ok!(instance.get_agent_jobs(Some(*UID)).await).into_builtin_vec();
retry_until_ok!(instance.get_personal_jobs(Some(*UID)).await).into_builtin_vec();
process_request(job_requests, &instance).await;
let result: Vec<ExecResult> = pop_completed().await.into_iter().collect();
if result.len() > 0 {

@ -1,9 +1,9 @@
use serde::Serialize;
use std::env;
use std::fmt;
use structopt::StructOpt;
use u_lib::{
api::ClientHandler, messaging::AsMsg, models::JobMeta, utils::init_env, UError, UResult,
api::ClientHandler, datatypes::DataResult, messaging::AsMsg, models::JobMeta, utils::init_env,
UResult,
};
use uuid::Uuid;
@ -79,16 +79,13 @@ fn parse_uuid(src: &str) -> Result<Uuid, String> {
}
async fn process_cmd(args: Args) {
fn printer<Msg: AsMsg + fmt::Display>(data: UResult<Msg>, json: bool) {
if json {
#[derive(Serialize)]
#[serde(rename_all = "lowercase")]
#[serde(tag = "status", content = "data")]
enum DataResult<M> {
Ok(M),
Err(UError),
struct Printer {
json: bool,
}
impl Printer {
pub fn print<Msg: AsMsg + fmt::Display>(&self, data: UResult<Msg>) {
if self.json {
let data = match data {
Ok(r) => DataResult::Ok(r),
Err(e) => DataResult::Err(e),
@ -101,14 +98,15 @@ async fn process_cmd(args: Args) {
}
}
}
}
let token = env::var("ADMIN_AUTH_TOKEN").expect("Authentication token is not set");
let cli_handler = ClientHandler::new(None).password(token);
let json = args.json;
let printer = Printer { json: args.json };
match args.cmd {
Cmd::Agents(action) => match action {
LD::List { uid } => printer(cli_handler.get_agents(uid).await, json),
LD::Delete { uid } => printer(cli_handler.del(Some(uid)).await, json),
LD::List { uid } => printer.print(cli_handler.get_agents(uid).await),
LD::Delete { uid } => printer.print(cli_handler.del(Some(uid)).await),
},
Cmd::Jobs(action) => match action {
JobALD::Add {
@ -121,21 +119,18 @@ async fn process_cmd(args: Args) {
.with_alias(alias)
.build()
.unwrap();
printer(cli_handler.upload_jobs(&[job]).await, json);
printer.print(cli_handler.upload_jobs(&[job]).await);
}
JobALD::LD(LD::List { uid }) => printer(cli_handler.get_jobs(uid).await, json),
JobALD::LD(LD::Delete { uid }) => printer(cli_handler.del(Some(uid)).await, json),
JobALD::LD(LD::List { uid }) => printer.print(cli_handler.get_jobs(uid).await),
JobALD::LD(LD::Delete { uid }) => printer.print(cli_handler.del(Some(uid)).await),
},
Cmd::Jobmap(action) => match action {
JobMapALD::Add {
agent_uid,
job_idents,
} => printer(
cli_handler.set_jobs(Some(agent_uid), &job_idents).await,
json,
),
JobMapALD::List { uid } => printer(cli_handler.get_agent_jobs(uid).await, json),
JobMapALD::Delete { uid } => printer(cli_handler.del(Some(uid)).await, json),
} => printer.print(cli_handler.set_jobs(Some(agent_uid), &job_idents).await),
JobMapALD::List { uid } => printer.print(cli_handler.get_agent_jobs(uid).await),
JobMapALD::Delete { uid } => printer.print(cli_handler.del(Some(uid)).await),
},
}
}

@ -8,10 +8,10 @@ version = "0.1.0"
log = "0.4.11"
simplelog = "0.10"
thiserror = "*"
warp = "0.2.4"
warp = { version = "0.3.1", features = ["tls"] }
uuid = { version = "0.6.5", features = ["serde", "v4"] }
once_cell = "1.7.2"
hyper = "0.13.10"
hyper = "0.14"
mockall = "0.9.1"
mockall_double = "0.2"
openssl = "*"
@ -26,7 +26,7 @@ version = "1.0.114"
[dependencies.tokio]
features = ["macros"]
version = "0.2.22"
version = "1.9"
[dependencies.u_lib]
path = "../../lib/u_lib"
@ -37,7 +37,7 @@ test-case = "1.1.0"
[lib]
name = "u_server_lib"
path = "src/lib.rs"
path = "src/u_server.rs"
[[bin]]
name = "u_server"

@ -20,8 +20,16 @@ static DB: OnceCell<Arc<Mutex<UDB>>> = OnceCell::new();
impl UDB {
pub fn lock_db() -> MutexGuard<'static, UDB> {
DB.get_or_init(|| {
let db_path = env::var("DATABASE_URL").unwrap();
let conn = PgConnection::establish(&db_path).unwrap();
let _getenv = |v| env::var(v).unwrap();
let db_host = _getenv("DB_HOST");
let db_name = _getenv("DB_NAME");
let db_user = _getenv("DB_USER");
let db_password = _getenv("DB_PASSWORD");
let db_url = format!(
"postgres://{}:{}@{}/{}",
db_user, db_password, db_host, db_name
);
let conn = PgConnection::establish(&db_url).unwrap();
let instance = UDB { conn };
Arc::new(Mutex::new(instance))
})
@ -97,9 +105,9 @@ impl UDB {
) -> ULocalResult<Vec<AssignedJob>> {
use schema::results;
let mut q = results::table.into_boxed();
if uid.is_some() {
/*if uid.is_some() {
q = q.filter(results::agent_id.eq(uid.unwrap()))
}
}*/
if personal {
q = q.filter(
results::state
@ -140,10 +148,13 @@ impl UDB {
}
let job_requests = job_uids
.iter()
.map(|job_uid| AssignedJob {
.map(|job_uid| {
info!("set_jobs_for_agent: set {} for {}", job_uid, agent_uid);
AssignedJob {
job_id: *job_uid,
agent_id: *agent_uid,
..Default::default()
}
})
.collect::<Vec<AssignedJob>>();
diesel::insert_into(results::table)

@ -48,12 +48,12 @@ pub fn make_filters() -> impl Filter<Extract = (impl Reply,), Error = Rejection>
.map(Some)
.or_else(infallible_none),
)
.and_then(|uid| Endpoints::get_agent_jobs(uid, false));
.and_then(|uid| Endpoints::get_agent_jobs(uid));
let get_personal_jobs = warp::get()
.and(warp::path("get_agent_jobs"))
.and(warp::path("get_personal_jobs"))
.and(warp::path::param::<Uuid>().map(Some))
.and_then(|uid| Endpoints::get_agent_jobs(uid, true));
.and_then(|uid| Endpoints::get_personal_jobs(uid));
let del = warp::get()
.and(warp::path("del"))
@ -73,16 +73,15 @@ pub fn make_filters() -> impl Filter<Extract = (impl Reply,), Error = Rejection>
let auth_token = format!("Bearer {}", env::var("ADMIN_AUTH_TOKEN").unwrap()).into_boxed_str();
let auth_header = warp::header::exact("authorization", Box::leak(auth_token));
let agent_zone = get_jobs.clone().or(get_personal_jobs).or(report);
let auth_zone = auth_header.and(
get_agents
let auth_zone = (get_agents
.or(get_jobs)
.or(upload_jobs)
.or(del)
.or(set_jobs)
.or(get_agent_jobs),
);
.or(get_agent_jobs))
.and(auth_header);
let agent_zone = get_jobs.clone().or(get_personal_jobs).or(report);
agent_zone.or(auth_zone)
auth_zone.or(agent_zone)
}

@ -57,12 +57,16 @@ impl Endpoints {
.or_else(|e| Ok(build_err(e)))
}
pub async fn get_agent_jobs(
uid: Option<Uuid>,
personal: bool,
) -> Result<Response<Body>, Rejection> {
info!("hnd: get_agent_jobs {}", personal);
if personal {
pub async fn get_agent_jobs(uid: Option<Uuid>) -> Result<Response<Body>, Rejection> {
info!("hnd: get_agent_jobs");
UDB::lock_db()
.get_exact_jobs(uid, false)
.map(|m| build_message(m))
.or_else(|e| Ok(build_err(e)))
}
pub async fn get_personal_jobs(uid: Option<Uuid>) -> Result<Response<Body>, Rejection> {
info!("hnd: get_personal_jobs");
let agents = UDB::lock_db().get_agents(uid).unwrap();
if agents.len() == 0 {
let db = UDB::lock_db();
@ -72,15 +76,12 @@ impl Endpoints {
return Ok(build_err(e));
}
}
}
let result = UDB::lock_db().get_exact_jobs(uid, personal);
let result = UDB::lock_db().get_exact_jobs(uid, true);
match result {
Ok(r) => {
if personal {
let db = UDB::lock_db();
for j in r.iter() {
db.update_job_status(j.id, JobState::Running).ok();
}
db.update_job_status(j.id, JobState::Running).unwrap();
}
Ok(build_message(r))
}
@ -105,21 +106,22 @@ impl Endpoints {
for del_fn in del_fns {
let affected = del_fn(&db, &vec![uid]).unwrap();
if affected > 0 {
return Ok(build_ok(affected.to_string()));
return Ok(build_message(affected as i32));
}
}
Ok(build_err("0"))
Ok(build_message(0))
}
pub async fn set_jobs(
agent_uid: Uuid,
msg: BaseMessage<'static, Vec<String>>,
) -> Result<Response<Body>, Rejection> {
info!("hnd: set_jobs_by_alias");
info!("hnd: set_jobs_by_alias, agent: {}", agent_uid);
let jobs: Result<Vec<Uuid>, ULocalError> = msg
.into_inner()
.into_iter()
.map(|ident| {
info!("hnd: set_jobs_by_alias, job: {}", ident);
Uuid::parse_str(&ident)
.or_else(|_| UDB::lock_db().find_job_by_alias(&ident).map(|j| j.id))
})
@ -157,6 +159,7 @@ impl Endpoints {
a.state = AgentState::Active;
Self::add_agent(a).await?;
}
ExecResult::Dummy => (),
}
}
if failed.len() > 0 {

@ -43,14 +43,10 @@ fn init_logger() {
.create(true)
.open(LOGFILE)
.unwrap();
let level = LevelFilter::Info;
let loggers = vec![
WriteLogger::new(LevelFilter::Info, log_cfg.clone(), logfile) as Box<dyn SharedLogger>,
TermLogger::new(
LevelFilter::Info,
log_cfg,
TerminalMode::Stderr,
ColorChoice::Auto,
),
WriteLogger::new(level, log_cfg.clone(), logfile) as Box<dyn SharedLogger>,
TermLogger::new(level, log_cfg, TerminalMode::Stderr, ColorChoice::Auto),
];
CombinedLogger::init(loggers).unwrap();
}
@ -65,6 +61,10 @@ pub async fn serve() {
init_all();
let routes = make_filters();
warp::serve(routes.with(warp::log("warp")))
.tls()
.cert_path("./certs/server.crt")
.key_path("./certs/server.key")
.client_auth_required_path("./certs/ca.crt")
.run(([0, 0, 0, 0], MASTER_PORT))
.await;
}
@ -77,6 +77,7 @@ mod tests {
use handlers::build_ok;
use mockall::predicate::*;
use test_case::test_case;
use u_lib::messaging::{AsMsg, BaseMessage};
use uuid::Uuid;
use warp::test::request;
@ -85,9 +86,7 @@ mod tests {
#[tokio::test]
async fn test_get_agent_jobs_unauthorized(uid: Option<Uuid>) {
let mock = Endpoints::get_agent_jobs_context();
mock.expect()
.with(eq(uid), eq(uid.is_some()))
.returning(|_, _| Ok(build_ok("")));
mock.expect().with(eq(uid)).returning(|_| Ok(build_ok("")));
request()
.path(&format!(
"/get_agent_jobs/{}",
@ -99,4 +98,20 @@ mod tests {
.unwrap();
mock.checkpoint();
}
#[tokio::test]
async fn test_report_unauth_successful() {
let mock = Endpoints::report_context();
mock.expect()
.withf(|msg: &BaseMessage<'_, Vec<ExecResult>>| msg.inner_ref()[0] == ExecResult::Dummy)
.returning(|_| Ok(build_ok("")));
request()
.path("/report/")
.method("POST")
.json(&vec![ExecResult::Dummy].as_message())
.filter(&make_filters())
.await
.unwrap();
mock.checkpoint();
}
}

@ -0,0 +1,17 @@
set -ex
DIR=.
V3_CFG=v3.ext
cat > $DIR/$V3_CFG << EOF
authorityKeyIdentifier=keyid,issuer
basicConstraints=CA:FALSE
keyUsage = digitalSignature, nonRepudiation, keyEncipherment, dataEncipherment, keyAgreement, keyCertSign, cRLSign
EOF
openssl req -x509 -newkey rsa:4096 -keyout $DIR/ca.key -out $DIR/ca.crt -nodes -days 365 -subj "/CN=root"
openssl req -newkey rsa:4096 -keyout $DIR/alice.key -out $DIR/alice.csr -nodes -days 365 -subj "/CN=alice"
openssl req -newkey rsa:4096 -keyout $DIR/server.key -out $DIR/server.csr -nodes -days 365 -subj "/CN=u_server"
openssl x509 -req -in $DIR/alice.csr -CA $DIR/ca.crt -CAkey $DIR/ca.key -out $DIR/alice.crt -set_serial 01 -days 365 -extfile $DIR/$V3_CFG
openssl x509 -req -in $DIR/server.csr -CA $DIR/ca.crt -CAkey $DIR/ca.key -out $DIR/server.crt -set_serial 01 -days 365 -extfile $DIR/$V3_CFG
openssl pkcs12 -export -out $DIR/alice.p12 -inkey $DIR/alice.key -in $DIR/alice.crt -passin pass: -passout pass:

@ -17,6 +17,10 @@ serde = { version = "1.0.114", features = ["derive"] }
futures = "0.3.5"
shlex = "1.0.0"
[dependencies.u_lib]
path = "../lib/u_lib"
version = "*"
[[test]]
name = "integration"

@ -13,16 +13,19 @@ services:
- ../target/x86_64-unknown-linux-musl/release/u_server:/u_server
- ../:/unki/
working_dir: /unki
command: bash -c "diesel setup && diesel migration run && /u_server"
command: bash -c "
export DATABASE_URL=postgres://$${DB_USER}:$${DB_PASSWORD}@$${DB_HOST}/$${DB_NAME} &&
diesel setup && diesel migration run && /u_server"
depends_on:
u_db:
condition: service_healthy
expose:
- '63714'
environment:
RUST_LOG: warp=info
env_file:
- ../.env
- ../.env.private
environment:
RUST_LOG: trace
healthcheck:
test: /bin/ss -tlpn | grep 63714
interval: 5s
@ -37,6 +40,7 @@ services:
- '5432'
env_file:
- ../.env
- ../.env.private
healthcheck:
test: /bin/ss -tlpn | grep 5432
interval: 5s
@ -50,6 +54,10 @@ services:
volumes:
- ../target/x86_64-unknown-linux-musl/release/u_agent:/u_agent
command: /u_agent u_server
env_file:
- ../.env
environment:
RUST_LOG: u_agent=debug
depends_on:
u_server:
condition: service_healthy
@ -61,6 +69,10 @@ services:
volumes:
- ../target/x86_64-unknown-linux-musl/release/u_agent:/u_agent
command: /u_agent u_server
env_file:
- ../.env
environment:
RUST_LOG: u_agent=debug
depends_on:
u_server:
condition: service_healthy
@ -70,13 +82,14 @@ services:
networks:
- u_net
volumes:
- ~/.cargo/registry:/root/.cargo/registry
- ./:/tests/
- ../certs:/certs
- ../target/x86_64-unknown-linux-musl/release/u_panel:/u_panel
- ~/.cargo/registry:/root/.cargo/registry
- ../lib/u_lib:/lib/u_lib
- ../lib/u_api_proc_macro:/lib/u_api_proc_macro
working_dir:
/tests/
env_file:
- ../.env
depends_on:
u_agent_1:
condition: service_started
@ -84,6 +97,9 @@ services:
condition: service_started
u_server:
condition: service_healthy
env_file:
- ../.env
- ../.env.private
environment:
RUST_BACKTRACE: 1
U_SERVER: u_server

@ -1,5 +1,3 @@
pub mod client;
pub mod panel;
pub use client::AgentClient;
pub use panel::Panel;

@ -1,9 +1,13 @@
use serde_json::{from_slice, Value};
use serde::de::DeserializeOwned;
use serde_json::from_slice;
use shlex::split;
use std::process::{Command, Output};
use u_lib::{datatypes::DataResult, messaging::AsMsg};
const PANEL_BINARY: &str = "/u_panel";
type PanelResult<T> = Result<DataResult<T>, String>;
pub struct Panel;
impl Panel {
@ -15,13 +19,13 @@ impl Panel {
.unwrap()
}
pub fn output_argv(args: &[&str]) -> Value {
pub fn output_argv<T: AsMsg + DeserializeOwned>(args: &[&str]) -> PanelResult<T> {
let result = Self::run(args);
assert!(result.status.success());
from_slice(&result.stdout).unwrap()
from_slice(&result.stdout).map_err(|e| e.to_string())
}
pub fn output<S: Into<String>>(args: S) -> Value {
pub fn output<T: AsMsg + DeserializeOwned>(args: impl Into<String>) -> PanelResult<T> {
let splitted = split(args.into().as_ref()).unwrap();
Self::output_argv(
splitted
@ -32,22 +36,20 @@ impl Panel {
)
}
fn status_is_ok(data: &Value) {
assert_eq!(
data["status"], "ok",
"Panel failed with erroneous status: {}",
data["data"]
);
fn status_is_ok<T: AsMsg + DeserializeOwned>(data: PanelResult<T>) -> T {
match data.unwrap() {
DataResult::Ok(r) => r,
DataResult::Err(e) => panic!("Panel failed with erroneous status: {}", e),
}
}
pub fn check_status<S: Into<String>>(args: S) {
let result = Self::output(args);
Self::status_is_ok(&result);
pub fn check_status<'s, T: AsMsg + DeserializeOwned>(args: &'s str) {
let result: PanelResult<T> = Self::output(args);
Self::status_is_ok(result);
}
pub fn check_output<S: Into<String>>(args: S) -> Vec<Value> {
pub fn check_output<T: AsMsg + DeserializeOwned>(args: impl Into<String>) -> T {
let result = Self::output(args);
Self::status_is_ok(&result);
result["data"].as_array().unwrap().clone()
Self::status_is_ok(result)
}
}

@ -1,80 +1,67 @@
mod helpers;
use helpers::{AgentClient, Panel};
use helpers::Panel;
use serde_json::json;
use std::error::Error;
use std::thread::sleep;
use std::time::Duration;
use u_lib::{api::ClientHandler, models::*};
use uuid::Uuid;
type TestResult<R = ()> = Result<R, Box<dyn std::error::Error>>;
type TestResult<R = ()> = Result<R, Box<dyn Error>>;
async fn register_agent() -> Uuid {
let cli = AgentClient::new();
let cli = ClientHandler::new(None);
let agent_uid = Uuid::new_v4();
let resp = cli.get(format!("get_agent_jobs/{}", agent_uid)).await;
let job_id = &resp["job_id"];
let resp = cli.get(format!("get_jobs/{}", job_id)).await;
assert_eq!(&resp["alias"], "agent_hello");
let agent_data = json! {
{"id": &agent_uid,"inner":[
{"Agent":
{"alias":null,
"hostname":"3b1030fa6324",
"id":&agent_uid,
"is_root":false,
"is_root_allowed":false,
"last_active":{"secs_since_epoch":1625271265,"nanos_since_epoch":92814921},
"platform":"x86_64-unknown-linux-gnu",
"regtime":{"secs_since_epoch":1625271265,"nanos_since_epoch":92814945},
"state":"New",
"token":null,
"username":"root"}
}
]}
let resp = cli
.get_personal_jobs(Some(agent_uid))
.await
.unwrap()
.pop()
.unwrap();
let job_id = resp.job_id;
let resp = cli.get_jobs(Some(job_id)).await.unwrap().pop().unwrap();
assert_eq!(resp.alias, Some("agent_hello".to_string()));
let agent_data = Agent {
id: agent_uid,
..Default::default()
};
cli.post("report", &agent_data).await;
cli.report(&vec![ExecResult::Agent(agent_data)])
.await
.unwrap();
agent_uid
}
#[tokio::test]
async fn test_registration() -> TestResult {
let agent_uid = register_agent().await;
let agents = Panel::check_output("agents list");
let found = agents
.iter()
.find(|v| v["id"].as_str().unwrap() == agent_uid.to_string());
let agents: Vec<Agent> = Panel::check_output("agents list");
let found = agents.iter().find(|v| v.id == agent_uid);
assert!(found.is_some());
//teardown
Panel::check_status::<i32>(&format!("agents delete {}", agent_uid));
Ok(())
}
#[tokio::test]
async fn test_setup_tasks() -> TestResult {
let agent_uid = Panel::check_output("agents list")[0]["id"].clone();
dbg!(&agent_uid);
//some independent agents should present
let agents: Vec<Agent> = Panel::check_output("agents list");
let agent_uid = agents[0].id;
let job_alias = "passwd_contents";
let cmd = format!("jobs add --alias {} 'cat /etc/passwd'", job_alias);
Panel::check_status(cmd);
Panel::check_status::<Empty>(&cmd);
let cmd = format!("jobmap add {} {}", agent_uid, job_alias);
let assigned_uids = Panel::check_output(cmd);
dbg!(&assigned_uids);
loop {
let result = Panel::check_output(format!("jobmap list {}", assigned_uids[0]));
dbg!(&result);
match result.get(0) {
Some(entry) if entry["state"] == "Finished" => {
println!("{}", result[0]);
break;
}
None => {
eprintln!("jobmap list is empty (bad bad bad)");
continue;
}
_ => {
sleep(Duration::from_secs(1));
let assigned_uids: Vec<Uuid> = Panel::check_output(cmd);
for _ in 0..3 {
let result: Vec<AssignedJob> =
Panel::check_output(format!("jobmap list {}", assigned_uids[0]));
if result[0].state == JobState::Finished {
return Ok(());
} else {
sleep(Duration::from_secs(5));
eprintln!("waiting for task");
}
}
}
Ok(())
panic!()
}

@ -14,7 +14,7 @@ nix = "0.17"
libc = "^0.2"
lazy_static = "1.4.0"
tokio = { version = "1.2.0", features = ["rt-multi-thread", "sync", "macros", "process", "time"] }
reqwest = { version = "0.11", features = ["json"] }
reqwest = { version = "0.11", features = ["json", "native-tls"] }
openssl = "*"
futures = "0.3.5"
guess_host_triple = "0.1.2"

@ -6,11 +6,14 @@ use crate::{
utils::{opt_to_string, VecDisplay},
UError, UResult,
};
use reqwest::{Client, RequestBuilder, Url};
use reqwest::{Certificate, Client, Identity, RequestBuilder, Url};
use std::env;
use u_api_proc_macro::api_route;
use uuid::Uuid;
const AGENT_IDENTITY: &[u8] = include_bytes!("../../../certs/alice.p12");
const ROOT_CA_CERT: &[u8] = include_bytes!("../../../certs/ca.crt");
pub struct ClientHandler {
base_url: Url,
client: Client,
@ -21,9 +24,15 @@ impl ClientHandler {
pub fn new(server: Option<&str>) -> Self {
let env_server = env::var("U_SERVER").unwrap_or(String::from(MASTER_SERVER));
let master_server = server.unwrap_or(env_server.as_str());
let identity = Identity::from_pkcs12_der(AGENT_IDENTITY, "").unwrap();
let client = Client::builder()
.identity(identity)
.add_root_certificate(Certificate::from_pem(ROOT_CA_CERT).unwrap())
.build()
.unwrap();
Self {
client: Client::new(),
base_url: Url::parse(&format!("http://{}:{}", master_server, MASTER_PORT)).unwrap(),
client,
base_url: Url::parse(&format!("https://{}:{}", master_server, MASTER_PORT)).unwrap(),
password: None,
}
}
@ -52,7 +61,7 @@ impl ClientHandler {
//
// get jobs for client
#[api_route("GET")]
fn get_agent_jobs(&self, url_param: Option<Uuid>) -> VecDisplay<models::AssignedJob> {}
fn get_personal_jobs(&self, url_param: Option<Uuid>) -> VecDisplay<models::AssignedJob> {}
//
// send something to server
#[api_route("POST")]
@ -73,9 +82,13 @@ impl ClientHandler {
//
// delete something
#[api_route("GET")]
fn del(&self, url_param: Option<Uuid>) -> String {}
fn del(&self, url_param: Option<Uuid>) -> i32 {}
//
// set jobs for client
// set jobs for any client
#[api_route("POST")]
fn set_jobs(&self, url_param: Option<Uuid>, payload: &[String]) -> VecDisplay<Uuid> {}
//
// get jobs for any client
#[api_route("GET")]
fn get_agent_jobs(&self, url_param: Option<Uuid>) -> VecDisplay<models::AssignedJob> {}
}

@ -1,7 +1,7 @@
use crate::{
cache::JobCache,
executor::{FutRes, Waiter, DynFut},
models::{Agent, AssignedJob, JobMeta, JobType},
executor::{Waiter, DynFut},
models::{Agent, AssignedJob, JobMeta, JobType, ExecResult},
utils::{CombinedResult, OneOrVec},
UError,
};
@ -13,7 +13,7 @@ pub struct JobBuilder {
}
impl JobBuilder {
pub fn from_request<J: OneOrVec<AssignedJob>>(job_requests: J) -> CombinedResult<Self> {
pub fn from_request(job_requests: impl OneOrVec<AssignedJob>) -> CombinedResult<Self> {
let job_requests = job_requests.into_vec();
let mut prepared: Vec<DynFut> = vec![];
let mut result = CombinedResult::new();
@ -24,6 +24,7 @@ impl JobBuilder {
continue;
}
let job_meta = job_meta.unwrap();
//waiting for try-blocks stabilization
let built_req = (|| {
Ok(match job_meta.exec_type {
JobType::Shell => {
@ -52,7 +53,7 @@ impl JobBuilder {
result
}
pub fn from_meta<J: OneOrVec<JobMeta>>(job_metas: J) -> CombinedResult<Self> {
pub fn from_meta(job_metas: impl OneOrVec<JobMeta>) -> CombinedResult<Self> {
let job_requests = job_metas
.into_vec()
.into_iter()
@ -72,12 +73,12 @@ impl JobBuilder {
}
/// Spawn jobs and wait for result
pub async fn wait(self) -> Vec<FutRes> {
pub async fn wait(self) -> Vec<ExecResult> {
self.jobs.spawn().await.wait().await
}
/// Spawn one job and wait for result
pub async fn wait_one(self) -> FutRes {
pub async fn wait_one(self) -> ExecResult {
self.jobs.spawn().await.wait().await.pop().unwrap()
}
}
@ -86,12 +87,12 @@ impl JobBuilder {
pub struct NamedJobBuilder {
builder: Option<JobBuilder>,
job_names: Vec<&'static str>,
results: HashMap<&'static str, FutRes>,
results: HashMap<&'static str, ExecResult>,
}
impl NamedJobBuilder {
pub fn from_shell<J: OneOrVec<(&'static str, &'static str)>>(
named_jobs: J,
pub fn from_shell(
named_jobs: impl OneOrVec<(&'static str, &'static str)>,
) -> CombinedResult<Self> {
let mut result = CombinedResult::new();
let jobs: Vec<(&'static str, JobMeta)> = named_jobs
@ -111,7 +112,7 @@ impl NamedJobBuilder {
result
}
pub fn from_meta<J: OneOrVec<(&'static str, JobMeta)>>(named_jobs: J) -> Self {
pub fn from_meta(named_jobs: impl OneOrVec<(&'static str, JobMeta)>) -> Self {
let mut job_names = vec![];
let job_metas: Vec<JobMeta> = named_jobs
.into_vec()
@ -136,11 +137,11 @@ impl NamedJobBuilder {
self
}
pub fn pop_opt(&mut self, name: &'static str) -> Option<FutRes> {
pub fn pop_opt(&mut self, name: &'static str) -> Option<ExecResult> {
self.results.remove(name)
}
pub fn pop(&mut self, name: &'static str) -> FutRes {
pub fn pop(&mut self, name: &'static str) -> ExecResult {
self.pop_opt(name).unwrap()
}
}
@ -180,7 +181,7 @@ mod tests {
;"sh payload"
)]
#[test_case(
r#"/usr/bin/python -c 'print("test02")'"#,
r#"/usr/bin/python3 -c 'print("test02")'"#,
None,
"test02"
;"python cmd"

@ -0,0 +1,10 @@
use crate::UError;
use serde::{Deserialize, Serialize};
#[derive(Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
#[serde(tag = "status", content = "data")]
pub enum DataResult<M> {
Ok(M),
Err(UError),
}

@ -10,7 +10,7 @@ pub type ULocalResult<T> = std::result::Result<T, ULocalError>;
#[derive(Error, Debug, Serialize, Deserialize, Clone)]
pub enum UError {
#[error("Error: {0}")]
Raw(&'static str),
Raw(String),
#[error("Connection error: {0}. Body: {1}")]
NetError(String, String),
@ -22,7 +22,7 @@ pub enum UError {
JobError(String),
#[error("Argument parsing failed: {0}")]
JobArgsError(&'static str),
JobArgsError(String),
#[error("Job is uncompleted yet")]
JobUncompleted,

@ -9,8 +9,7 @@ use tokio::{
};
use uuid::Uuid;
pub type FutRes = ExecResult;
pub type DynFut = BoxFuture<'static, FutRes>;
pub type DynFut = BoxFuture<'static, ExecResult>;
lazy_static! {
static ref FUT_RESULTS: Mutex<HashMap<Uuid, JoinInfo>> = Mutex::new(HashMap::new());
@ -22,7 +21,7 @@ lazy_static! {
}
struct JoinInfo {
handle: JoinHandle<FutRes>,
handle: JoinHandle<ExecResult>,
completed: bool,
collectable: bool, // indicates if future can be popped from pool via pop_task_if_completed
}
@ -37,7 +36,7 @@ pub struct Waiter {
}
impl Waiter {
pub fn new<S: OneOrVec<DynFut>>(tasks: S) -> Self {
pub fn new(tasks: impl OneOrVec<DynFut>) -> Self {
Self {
tasks: tasks.into_vec(),
fids: vec![],
@ -66,10 +65,10 @@ impl Waiter {
self
}
// wait until a bunch of tasks is finished
// NOT GUARANTEED that all tasks will be returned due to
// possibility to pop them in other places
pub async fn wait(self) -> Vec<FutRes> {
/// Wait until a bunch of tasks is finished.
/// NOT GUARANTEED that all tasks will be returned due to
/// possibility to pop them in other places
pub async fn wait(self) -> Vec<ExecResult> {
let mut result = vec![];
for fid in self.fids {
if let Some(task) = pop_task(fid).await {
@ -83,10 +82,8 @@ impl Waiter {
async fn init_receiver() {
while let Some(fid) = FUT_CHANNEL.1.lock().await.recv().await {
//info!("init_receiver: next val: {}", &fid);
if let Some(mut lock) = FUT_RESULTS.try_lock() {
if let Some(j) = lock.get_mut(&fid) {
//info!("init_receiver: marked as completed");
j.completed = true;
}
}
@ -96,12 +93,8 @@ async fn init_receiver() {
async fn pop_task(fid: Uuid) -> Option<JoinInfo> {
FUT_RESULTS.lock().await.remove(&fid)
}
/*
async fn insert_task(fid: Uuid) {
FUT_RESULTS.lock().await.remove(&fid)
}
*/
pub async fn pop_task_if_completed(fid: Uuid) -> Option<FutRes> {
pub async fn pop_task_if_completed(fid: Uuid) -> Option<ExecResult> {
let &mut JoinInfo {
handle: _,
collectable,

@ -3,6 +3,7 @@ pub mod api;
pub mod builder;
pub mod cache;
pub mod config;
pub mod datatypes;
pub mod errors;
pub mod executor;
pub mod messaging;

@ -55,4 +55,8 @@ impl<'cow, I: AsMsg> BaseMessage<'cow, I> {
pub fn into_inner(self) -> I {
self.inner.into_owned()
}
pub fn inner_ref(&self) -> &I {
self.inner.as_ref()
}
}

@ -26,7 +26,15 @@ pub enum AgentState {
//belongs_to
#[derive(
Clone, Debug, Serialize, Deserialize, Identifiable, Queryable, Insertable, AsChangeset,
Clone,
Debug,
Serialize,
Deserialize,
Identifiable,
Queryable,
Insertable,
AsChangeset,
PartialEq,
)]
#[table_name = "agents"]
pub struct Agent {

@ -12,7 +12,15 @@ use tokio::process::Command;
use uuid::Uuid;
#[derive(
Serialize, Deserialize, Clone, Debug, Queryable, Identifiable, Insertable, AsChangeset,
Serialize,
Deserialize,
Clone,
Debug,
Queryable,
Identifiable,
Insertable,
AsChangeset,
PartialEq,
)]
#[table_name = "results"]
pub struct AssignedJob {

@ -115,8 +115,8 @@ impl JobMetaBuilder {
match inner.exec_type {
JobType::Shell => {
let argv_parts =
shlex::split(&inner.argv).ok_or(UError::JobArgsError("Shlex failed"))?;
let empty_err = UError::JobArgsError("Empty argv");
shlex::split(&inner.argv).ok_or(UError::JobArgsError("Shlex failed".into()))?;
let empty_err = UError::JobArgsError("Empty argv".into());
if argv_parts.get(0).ok_or(empty_err.clone())?.len() == 0 {
return Err(empty_err);
}
@ -124,7 +124,7 @@ impl JobMetaBuilder {
Some(_) => {
if !inner.argv.contains("{}") {
return Err(UError::JobArgsError(
"Argv contains no executable placeholder",
"Argv contains no executable placeholder".into(),
));
} else {
()
@ -133,7 +133,8 @@ impl JobMetaBuilder {
None => {
if inner.argv.contains("{}") {
return Err(UError::JobArgsError(
"No payload provided, but argv contains executable placeholder",
"No payload provided, but argv contains executable placeholder"
.into(),
));
} else {
()

@ -17,6 +17,7 @@ impl AsMsg for JobMeta {}
impl AsMsg for String {}
impl AsMsg for Uuid {}
impl AsMsg for Empty {}
impl AsMsg for i32 {}
#[derive(Serialize, Deserialize, Clone, Default, Debug)]
pub struct Empty;

@ -1,8 +1,9 @@
use crate::models::{Agent, AssignedJob};
use serde::{Deserialize, Serialize};
#[derive(Serialize, Deserialize, Clone)]
#[derive(Serialize, Deserialize, Clone, PartialEq)]
pub enum ExecResult {
Assigned(AssignedJob),
Agent(Agent),
Dummy,
}

@ -14,11 +14,11 @@ impl<T, E> CombinedResult<T, E> {
}
}
pub fn ok<I: OneOrVec<T>>(&mut self, result: I) {
pub fn ok(&mut self, result: impl OneOrVec<T>) {
self.ok.extend(result.into_vec());
}
pub fn err<I: OneOrVec<E>>(&mut self, err: I) {
pub fn err(&mut self, err: impl OneOrVec<E>) {
self.err.extend(err.into_vec());
}

@ -64,7 +64,7 @@ pub fn setsig(sig: Signal, hnd: SigHandler) {
}
pub fn init_env() {
let envs = [".env"];
let envs = [".env", ".env.private"];
for envfile in &envs {
dotenv::from_filename(envfile).ok();
}

@ -7,7 +7,7 @@ use std::ops::{Deref, DerefMut};
pub struct VecDisplay<T: AsMsg + Display>(pub Vec<T>);
impl<T: AsMsg + Display> VecDisplay<T> {
pub fn new<I: OneOrVec<T>>(inner: I) -> Self {
pub fn new(inner: impl OneOrVec<T>) -> Self {
VecDisplay(inner.into_vec())
}

@ -1,11 +1,12 @@
#!/bin/bash
set -e
set -ex
source $(dirname $0)/rootdir.sh #set ROOTDIR
umask 002
ARGS=$@
docker run \
--env-file $ROOTDIR/.env \
-v $ROOTDIR:/volume \
-v cargo-cache:/root/.cargo/registry \
-w /volume \
-it \
unki/musllibs \
cargo $@
bash -c "umask 0000; cargo $ARGS"

Loading…
Cancel
Save