server logger added

14-integration-tests
plazmoid 3 years ago
parent 4e88f49f96
commit 4bfcdd2b23
  1. 30
      bin/u_panel/src/main.rs
  2. 2
      bin/u_server/Cargo.toml
  3. 9
      bin/u_server/src/db.rs
  4. 88
      bin/u_server/src/filters.rs
  5. 35
      bin/u_server/src/handlers.rs
  6. 137
      bin/u_server/src/lib.rs
  7. 2
      integration/docker-compose.yml
  8. 21
      integration/tests/helpers/panel.rs
  9. 40
      integration/tests/tests.rs
  10. 14
      lib/u_lib/src/api.rs

@ -19,7 +19,7 @@ struct Args {
enum Cmd { enum Cmd {
Agents(LD), Agents(LD),
Jobs(JobALD), Jobs(JobALD),
Jobmap(JmALD), Jobmap(JobMapALD),
} }
#[derive(StructOpt, Debug)] #[derive(StructOpt, Debug)]
@ -28,6 +28,9 @@ enum JobALD {
#[structopt(long, parse(try_from_str = parse_uuid))] #[structopt(long, parse(try_from_str = parse_uuid))]
agent: Option<Uuid>, agent: Option<Uuid>,
#[structopt(long)]
alias: String,
#[structopt(subcommand)] #[structopt(subcommand)]
cmd: JobCmd, cmd: JobCmd,
}, },
@ -42,13 +45,12 @@ enum JobCmd {
} }
#[derive(StructOpt, Debug)] #[derive(StructOpt, Debug)]
enum JmALD { enum JobMapALD {
Add { Add {
#[structopt(parse(try_from_str = parse_uuid))] #[structopt(parse(try_from_str = parse_uuid))]
agent_uid: Uuid, agent_uid: Uuid,
#[structopt(parse(try_from_str = parse_uuid))] job_idents: Vec<String>,
job_uids: Vec<Uuid>,
}, },
List { List {
#[structopt(parse(try_from_str = parse_uuid))] #[structopt(parse(try_from_str = parse_uuid))]
@ -111,21 +113,29 @@ async fn process_cmd(args: Args) {
Cmd::Jobs(action) => match action { Cmd::Jobs(action) => match action {
JobALD::Add { JobALD::Add {
cmd: JobCmd::Cmd(cmd), cmd: JobCmd::Cmd(cmd),
alias,
agent: _agent, agent: _agent,
} => { } => {
let job = JobMeta::from_shell(cmd.join(" ")).unwrap(); let job = JobMeta::builder()
.with_shell(cmd.join(" "))
.with_alias(alias)
.build()
.unwrap();
printer(cli_handler.upload_jobs(&[job]).await, json); printer(cli_handler.upload_jobs(&[job]).await, json);
} }
JobALD::LD(LD::List { uid }) => printer(cli_handler.get_jobs(uid).await, json), JobALD::LD(LD::List { uid }) => printer(cli_handler.get_jobs(uid).await, json),
JobALD::LD(LD::Delete { uid }) => printer(cli_handler.del(Some(uid)).await, json), JobALD::LD(LD::Delete { uid }) => printer(cli_handler.del(Some(uid)).await, json),
}, },
Cmd::Jobmap(action) => match action { Cmd::Jobmap(action) => match action {
JmALD::Add { JobMapALD::Add {
agent_uid, agent_uid,
job_uids, job_idents,
} => printer(cli_handler.set_jobs(Some(agent_uid), &job_uids).await, json), } => printer(
JmALD::List { uid } => printer(cli_handler.get_agent_jobs(uid).await, json), cli_handler.set_jobs(Some(agent_uid), &job_idents).await,
JmALD::Delete { uid } => printer(cli_handler.del(Some(uid)).await, json), json,
),
JobMapALD::List { uid } => printer(cli_handler.get_agent_jobs(uid).await, json),
JobMapALD::Delete { uid } => printer(cli_handler.del(Some(uid)).await, json),
}, },
} }
} }

@ -5,8 +5,8 @@ name = "u_server"
version = "0.1.0" version = "0.1.0"
[dependencies] [dependencies]
env_logger = "0.7.1"
log = "0.4.11" log = "0.4.11"
simplelog = "0.10"
thiserror = "*" thiserror = "*"
warp = "0.2.4" warp = "0.2.4"
uuid = { version = "0.6.5", features = ["serde", "v4"] } uuid = { version = "0.6.5", features = ["serde", "v4"] }

@ -116,7 +116,11 @@ impl UDB {
Ok(result) Ok(result)
} }
pub fn set_jobs_for_agent(&self, agent_uid: &Uuid, job_uids: &[Uuid]) -> ULocalResult<()> { pub fn set_jobs_for_agent(
&self,
agent_uid: &Uuid,
job_uids: &[Uuid],
) -> ULocalResult<Vec<Uuid>> {
use schema::{agents::dsl::agents, jobs::dsl::jobs, results}; use schema::{agents::dsl::agents, jobs::dsl::jobs, results};
if let Err(DslError::NotFound) = agents.find(agent_uid).first::<Agent>(&self.conn) { if let Err(DslError::NotFound) = agents.find(agent_uid).first::<Agent>(&self.conn) {
return Err(ULocalError::NotFound(agent_uid.to_string())); return Err(ULocalError::NotFound(agent_uid.to_string()));
@ -145,7 +149,8 @@ impl UDB {
diesel::insert_into(results::table) diesel::insert_into(results::table)
.values(&job_requests) .values(&job_requests)
.execute(&self.conn)?; .execute(&self.conn)?;
Ok(()) let assigned_uids = job_requests.iter().map(|aj| aj.id).collect();
Ok(assigned_uids)
} }
pub fn del_jobs(&self, uids: &Vec<Uuid>) -> ULocalResult<usize> { pub fn del_jobs(&self, uids: &Vec<Uuid>) -> ULocalResult<usize> {

@ -0,0 +1,88 @@
use crate::handlers::Endpoints;
use serde::de::DeserializeOwned;
use std::env;
use u_lib::{
messaging::{AsMsg, BaseMessage},
models::*,
};
use uuid::Uuid;
use warp::{body, Filter, Rejection, Reply};
fn get_content<M>() -> impl Filter<Extract = (BaseMessage<'static, M>,), Error = Rejection> + Clone
where
M: AsMsg + Sync + Send + DeserializeOwned + 'static,
{
body::content_length_limit(1024 * 64).and(body::json::<BaseMessage<M>>())
}
pub fn make_filters() -> impl Filter<Extract = (impl Reply,), Error = Rejection> + Clone {
let infallible_none = |_| async { Ok::<(Option<Uuid>,), std::convert::Infallible>((None,)) };
let get_agents = warp::get()
.and(warp::path("get_agents"))
.and(
warp::path::param::<Uuid>()
.map(Some)
.or_else(infallible_none),
)
.and_then(Endpoints::get_agents);
let upload_jobs = warp::post()
.and(warp::path("upload_jobs"))
.and(get_content::<Vec<JobMeta>>())
.and_then(Endpoints::upload_jobs);
let get_jobs = warp::get()
.and(warp::path("get_jobs"))
.and(
warp::path::param::<Uuid>()
.map(Some)
.or_else(infallible_none),
)
.and_then(Endpoints::get_jobs);
let get_agent_jobs = warp::get()
.and(warp::path("get_agent_jobs"))
.and(
warp::path::param::<Uuid>()
.map(Some)
.or_else(infallible_none),
)
.and_then(|uid| Endpoints::get_agent_jobs(uid, false));
let get_personal_jobs = warp::get()
.and(warp::path("get_agent_jobs"))
.and(warp::path::param::<Uuid>().map(Some))
.and_then(|uid| Endpoints::get_agent_jobs(uid, true));
let del = warp::get()
.and(warp::path("del"))
.and(warp::path::param::<Uuid>())
.and_then(Endpoints::del);
let set_jobs = warp::post()
.and(warp::path("set_jobs"))
.and(warp::path::param::<Uuid>())
.and(get_content::<Vec<String>>())
.and_then(Endpoints::set_jobs);
let report = warp::post()
.and(warp::path("report"))
.and(get_content::<Vec<ExecResult>>().and_then(Endpoints::report));
let auth_token = format!("Bearer {}", env::var("ADMIN_AUTH_TOKEN").unwrap()).into_boxed_str();
let auth_header = warp::header::exact("authorization", Box::leak(auth_token));
let agent_zone = get_jobs.clone().or(get_personal_jobs).or(report);
let auth_zone = auth_header.and(
get_agents
.or(get_jobs)
.or(upload_jobs)
.or(del)
.or(set_jobs)
.or(get_agent_jobs),
);
agent_zone.or(auth_zone)
}

@ -34,7 +34,7 @@ pub struct Endpoints;
#[cfg_attr(test, automock)] #[cfg_attr(test, automock)]
impl Endpoints { impl Endpoints {
pub async fn add_agent(msg: Agent) -> Result<Response<Body>, Rejection> { pub async fn add_agent(msg: Agent) -> Result<Response<Body>, Rejection> {
debug!("hnd: add_agent"); info!("hnd: add_agent");
UDB::lock_db() UDB::lock_db()
.insert_agent(&msg) .insert_agent(&msg)
.map(|_| build_ok("")) .map(|_| build_ok(""))
@ -42,7 +42,7 @@ impl Endpoints {
} }
pub async fn get_agents(uid: Option<Uuid>) -> Result<Response<Body>, Rejection> { pub async fn get_agents(uid: Option<Uuid>) -> Result<Response<Body>, Rejection> {
debug!("hnd: get_agents"); info!("hnd: get_agents");
UDB::lock_db() UDB::lock_db()
.get_agents(uid) .get_agents(uid)
.map(|m| build_message(m)) .map(|m| build_message(m))
@ -50,7 +50,7 @@ impl Endpoints {
} }
pub async fn get_jobs(uid: Option<Uuid>) -> Result<Response<Body>, Rejection> { pub async fn get_jobs(uid: Option<Uuid>) -> Result<Response<Body>, Rejection> {
debug!("hnd: get_jobs"); info!("hnd: get_jobs");
UDB::lock_db() UDB::lock_db()
.get_jobs(uid) .get_jobs(uid)
.map(|m| build_message(m)) .map(|m| build_message(m))
@ -91,7 +91,7 @@ impl Endpoints {
pub async fn upload_jobs( pub async fn upload_jobs(
msg: BaseMessage<'static, Vec<JobMeta>>, msg: BaseMessage<'static, Vec<JobMeta>>,
) -> Result<Response<Body>, Rejection> { ) -> Result<Response<Body>, Rejection> {
debug!("hnd: upload_jobs"); info!("hnd: upload_jobs");
UDB::lock_db() UDB::lock_db()
.insert_jobs(&msg.into_inner()) .insert_jobs(&msg.into_inner())
.map(|_| build_ok("")) .map(|_| build_ok(""))
@ -99,7 +99,7 @@ impl Endpoints {
} }
pub async fn del(uid: Uuid) -> Result<Response<Body>, Rejection> { pub async fn del(uid: Uuid) -> Result<Response<Body>, Rejection> {
debug!("hnd: del"); info!("hnd: del");
let db = UDB::lock_db(); let db = UDB::lock_db();
let del_fns = &[UDB::del_agents, UDB::del_jobs, UDB::del_results]; let del_fns = &[UDB::del_agents, UDB::del_jobs, UDB::del_results];
for del_fn in del_fns { for del_fn in del_fns {
@ -113,19 +113,30 @@ impl Endpoints {
pub async fn set_jobs( pub async fn set_jobs(
agent_uid: Uuid, agent_uid: Uuid,
msg: BaseMessage<'static, Vec<Uuid>>, msg: BaseMessage<'static, Vec<String>>,
) -> Result<Response<Body>, Rejection> { ) -> Result<Response<Body>, Rejection> {
debug!("hnd: set_jobs"); info!("hnd: set_jobs_by_alias");
UDB::lock_db() let jobs: Result<Vec<Uuid>, ULocalError> = msg
.set_jobs_for_agent(&agent_uid, &msg.into_inner()) .into_inner()
.map(|_| build_ok("")) .into_iter()
.or_else(|e| Ok(build_err(e))) .map(|ident| {
Uuid::parse_str(&ident)
.or_else(|_| UDB::lock_db().find_job_by_alias(&ident).map(|j| j.id))
})
.collect();
match jobs {
Ok(j) => UDB::lock_db()
.set_jobs_for_agent(&agent_uid, &j)
.map(|assigned_uids| build_message(assigned_uids))
.or_else(|e| Ok(build_err(e))),
Err(e) => Ok(build_err(e)),
}
} }
pub async fn report( pub async fn report(
msg: BaseMessage<'static, Vec<ExecResult>>, msg: BaseMessage<'static, Vec<ExecResult>>,
) -> Result<Response<Body>, Rejection> { ) -> Result<Response<Body>, Rejection> {
debug!("hnd: report"); info!("hnd: report");
let id = msg.id; let id = msg.id;
let mut failed = vec![]; let mut failed = vec![];
for entry in msg.into_inner() { for entry in msg.into_inner() {

@ -1,11 +1,5 @@
mod db;
mod handlers;
use warp::{body, Filter, Rejection, Reply};
#[macro_use] #[macro_use]
extern crate log; extern crate log;
extern crate env_logger;
#[macro_use] #[macro_use]
extern crate mockall; extern crate mockall;
@ -17,26 +11,16 @@ extern crate openssl;
#[macro_use] #[macro_use]
extern crate diesel; extern crate diesel;
// //
mod db;
mod filters;
mod handlers;
use db::UDB; use db::UDB;
#[double] use filters::make_filters;
use handlers::Endpoints; use u_lib::{config::MASTER_PORT, models::*, utils::init_env};
use serde::de::DeserializeOwned; use warp::Filter;
use std::env;
use u_lib::{
config::MASTER_PORT,
messaging::{AsMsg, BaseMessage},
models::*,
utils::init_env,
};
use uuid::Uuid;
fn get_content<M>() -> impl Filter<Extract = (BaseMessage<'static, M>,), Error = Rejection> + Clone const LOGFILE: &str = "u_server.log";
where
M: AsMsg + Sync + Send + DeserializeOwned + 'static,
{
body::content_length_limit(1024 * 64).and(body::json::<BaseMessage<M>>())
}
fn prefill_jobs() { fn prefill_jobs() {
let agent_hello = JobMeta::builder() let agent_hello = JobMeta::builder()
@ -47,86 +31,38 @@ fn prefill_jobs() {
UDB::lock_db().insert_jobs(&[agent_hello]).ok(); UDB::lock_db().insert_jobs(&[agent_hello]).ok();
} }
fn init() { fn init_logger() {
init_env(); use simplelog::*;
env_logger::init(); use std::fs::OpenOptions;
prefill_jobs(); let log_cfg = ConfigBuilder::new()
.set_time_format_str("%x %X")
.set_time_to_local(true)
.build();
let logfile = OpenOptions::new()
.append(true)
.create(true)
.open(LOGFILE)
.unwrap();
let loggers = vec![
WriteLogger::new(LevelFilter::Info, log_cfg.clone(), logfile) as Box<dyn SharedLogger>,
TermLogger::new(
LevelFilter::Info,
log_cfg,
TerminalMode::Stderr,
ColorChoice::Auto,
),
];
CombinedLogger::init(loggers).unwrap();
} }
fn make_filters() -> impl Filter<Extract = (impl Reply,), Error = Rejection> + Clone { fn init_all() {
let infallible_none = |_| async { Ok::<(Option<Uuid>,), std::convert::Infallible>((None,)) }; init_logger();
init_env();
let get_agents = warp::get() prefill_jobs();
.and(warp::path("get_agents"))
.and(
warp::path::param::<Uuid>()
.map(Some)
.or_else(infallible_none),
)
.and_then(Endpoints::get_agents);
let upload_jobs = warp::post()
.and(warp::path("upload_jobs"))
.and(get_content::<Vec<JobMeta>>())
.and_then(Endpoints::upload_jobs);
let get_jobs = warp::get()
.and(warp::path("get_jobs"))
.and(
warp::path::param::<Uuid>()
.map(Some)
.or_else(infallible_none),
)
.and_then(Endpoints::get_jobs);
let get_agent_jobs = warp::get()
.and(warp::path("get_agent_jobs"))
.and(
warp::path::param::<Uuid>()
.map(Some)
.or_else(infallible_none),
)
.and_then(|uid| Endpoints::get_agent_jobs(uid, false));
let get_personal_jobs = warp::get()
.and(warp::path("get_agent_jobs"))
.and(warp::path::param::<Uuid>().map(Some))
.and_then(|uid| Endpoints::get_agent_jobs(uid, true));
let del = warp::get()
.and(warp::path("del"))
.and(warp::path::param::<Uuid>())
.and_then(Endpoints::del);
let set_jobs = warp::post()
.and(warp::path("set_jobs"))
.and(warp::path::param::<Uuid>())
.and(get_content::<Vec<Uuid>>())
.and_then(Endpoints::set_jobs);
let report = warp::post()
.and(warp::path("report"))
.and(get_content::<Vec<ExecResult>>().and_then(Endpoints::report));
let auth_token = format!("Bearer {}", env::var("ADMIN_AUTH_TOKEN").unwrap()).into_boxed_str();
let auth_header = warp::header::exact("authorization", Box::leak(auth_token));
let agent_zone = get_jobs.clone().or(get_personal_jobs).or(report);
let auth_zone = auth_header.and(
get_agents
.or(get_jobs)
.or(upload_jobs)
.or(del)
.or(set_jobs)
.or(get_agent_jobs),
);
agent_zone.or(auth_zone)
} }
pub async fn serve() { pub async fn serve() {
init(); init_all();
let routes = make_filters(); let routes = make_filters();
warp::serve(routes.with(warp::log("warp"))) warp::serve(routes.with(warp::log("warp")))
.run(([0, 0, 0, 0], MASTER_PORT)) .run(([0, 0, 0, 0], MASTER_PORT))
@ -136,9 +72,12 @@ pub async fn serve() {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
#[double]
use crate::handlers::Endpoints;
use handlers::build_ok; use handlers::build_ok;
use mockall::predicate::*; use mockall::predicate::*;
use test_case::test_case; use test_case::test_case;
use uuid::Uuid;
use warp::test::request; use warp::test::request;
#[test_case(Some(Uuid::new_v4()))] #[test_case(Some(Uuid::new_v4()))]
@ -158,6 +97,6 @@ mod tests {
.filter(&make_filters()) .filter(&make_filters())
.await .await
.unwrap(); .unwrap();
mock.checkpoint() mock.checkpoint();
} }
} }

@ -20,7 +20,7 @@ services:
expose: expose:
- '63714' - '63714'
environment: environment:
RUST_LOG: warp RUST_LOG: warp=info
env_file: env_file:
- ../.env - ../.env
healthcheck: healthcheck:

@ -21,8 +21,8 @@ impl Panel {
from_slice(&result.stdout).unwrap() from_slice(&result.stdout).unwrap()
} }
pub fn output(args: &str) -> Value { pub fn output<S: Into<String>>(args: S) -> Value {
let splitted = split(args).unwrap(); let splitted = split(args.into().as_ref()).unwrap();
Self::output_argv( Self::output_argv(
splitted splitted
.iter() .iter()
@ -32,9 +32,22 @@ impl Panel {
) )
} }
pub fn check_output(args: &str) -> Vec<Value> { fn status_is_ok(data: &Value) {
assert_eq!(
data["status"], "ok",
"Panel failed with erroneous status: {}",
data["data"]
);
}
pub fn check_status<S: Into<String>>(args: S) {
let result = Self::output(args);
Self::status_is_ok(&result);
}
pub fn check_output<S: Into<String>>(args: S) -> Vec<Value> {
let result = Self::output(args); let result = Self::output(args);
assert_eq!(result["status"], "ok"); Self::status_is_ok(&result);
result["data"].as_array().unwrap().clone() result["data"].as_array().unwrap().clone()
} }
} }

@ -3,6 +3,8 @@ mod helpers;
use helpers::{AgentClient, Panel}; use helpers::{AgentClient, Panel};
use serde_json::json; use serde_json::json;
use std::thread::sleep;
use std::time::Duration;
use uuid::Uuid; use uuid::Uuid;
type TestResult<R = ()> = Result<R, Box<dyn std::error::Error>>; type TestResult<R = ()> = Result<R, Box<dyn std::error::Error>>;
@ -28,27 +30,51 @@ async fn register_agent() -> Uuid {
"state":"New", "state":"New",
"token":null, "token":null,
"username":"root"} "username":"root"}
}]} }
]}
}; };
cli.post("report", &agent_data).await; cli.post("report", &agent_data).await;
agent_uid agent_uid
} }
#[tokio::test] #[tokio::test]
async fn test_first_connection() -> TestResult { async fn test_registration() -> TestResult {
let uid = register_agent().await; let agent_uid = register_agent().await;
let agents = Panel::check_output("agents list"); let agents = Panel::check_output("agents list");
dbg!(&agents);
assert_eq!(agents.len(), 2);
let found = agents let found = agents
.iter() .iter()
.find(|v| v["id"].as_str().unwrap() == uid.to_string()); .find(|v| v["id"].as_str().unwrap() == agent_uid.to_string());
assert!(found.is_some()); assert!(found.is_some());
Ok(()) Ok(())
} }
#[tokio::test] #[tokio::test]
async fn test_setup_tasks() -> TestResult { async fn test_setup_tasks() -> TestResult {
register_agent().await; let agent_uid = Panel::check_output("agents list")[0]["id"].clone();
dbg!(&agent_uid);
let job_alias = "passwd_contents";
let cmd = format!("jobs add --alias {} 'cat /etc/passwd'", job_alias);
Panel::check_status(cmd);
let cmd = format!("jobmap add {} {}", agent_uid, job_alias);
let assigned_uids = Panel::check_output(cmd);
dbg!(&assigned_uids);
loop {
let result = Panel::check_output(format!("jobmap list {}", assigned_uids[0]));
dbg!(&result);
match result.get(0) {
Some(entry) if entry["state"] == "Finished" => {
println!("{}", result[0]);
break;
}
None => {
eprintln!("jobmap list is empty (bad bad bad)");
continue;
}
_ => {
sleep(Duration::from_secs(1));
eprintln!("waiting for task");
}
}
}
Ok(()) Ok(())
} }

@ -2,7 +2,7 @@
use crate::{ use crate::{
config::{MASTER_PORT, MASTER_SERVER}, config::{MASTER_PORT, MASTER_SERVER},
messaging::{AsMsg, BaseMessage}, messaging::{AsMsg, BaseMessage},
models::*, models,
utils::{opt_to_string, VecDisplay}, utils::{opt_to_string, VecDisplay},
UError, UResult, UError, UResult,
}; };
@ -52,24 +52,24 @@ impl ClientHandler {
// //
// get jobs for client // get jobs for client
#[api_route("GET")] #[api_route("GET")]
fn get_agent_jobs(&self, url_param: Option<Uuid>) -> VecDisplay<AssignedJob> {} fn get_agent_jobs(&self, url_param: Option<Uuid>) -> VecDisplay<models::AssignedJob> {}
// //
// send something to server // send something to server
#[api_route("POST")] #[api_route("POST")]
fn report<M: AsMsg>(&self, payload: &M) -> Empty {} fn report<M: AsMsg>(&self, payload: &M) -> models::Empty {}
//##########// Admin area //##########// //##########// Admin area //##########//
/// client listing /// client listing
#[api_route("GET")] #[api_route("GET")]
fn get_agents(&self, url_param: Option<Uuid>) -> VecDisplay<Agent> {} fn get_agents(&self, url_param: Option<Uuid>) -> VecDisplay<models::Agent> {}
// //
// get all available jobs // get all available jobs
#[api_route("GET")] #[api_route("GET")]
fn get_jobs(&self, url_param: Option<Uuid>) -> VecDisplay<JobMeta> {} fn get_jobs(&self, url_param: Option<Uuid>) -> VecDisplay<models::JobMeta> {}
// //
// create and upload job // create and upload job
#[api_route("POST")] #[api_route("POST")]
fn upload_jobs(&self, payload: &[JobMeta]) -> Empty {} fn upload_jobs(&self, payload: &[models::JobMeta]) -> models::Empty {}
// //
// delete something // delete something
#[api_route("GET")] #[api_route("GET")]
@ -77,5 +77,5 @@ impl ClientHandler {
// //
// set jobs for client // set jobs for client
#[api_route("POST")] #[api_route("POST")]
fn set_jobs(&self, url_param: Option<Uuid>, payload: &[Uuid]) -> Empty {} fn set_jobs(&self, url_param: Option<Uuid>, payload: &[String]) -> VecDisplay<Uuid> {}
} }

Loading…
Cancel
Save