integration tests beginning

14-integration-tests
plazmoid 3 years ago
parent 5bd1760d10
commit 5734145e8f
  1. 4
      .cargo/config.toml
  2. 1
      .env
  3. 1
      Cargo.toml
  4. 5
      bin/u_agent/Cargo.toml
  5. 3
      bin/u_agent/Dockerfile
  6. 88
      bin/u_agent/src/lib.rs
  7. 88
      bin/u_agent/src/main.rs
  8. 8
      bin/u_agent/tests/behaviour.rs
  9. 2
      bin/u_agent/tests/tests.rs
  10. 1
      bin/u_panel/Cargo.toml
  11. 4
      bin/u_server/Cargo.toml
  12. 3
      bin/u_server/Dockerfile
  13. 25
      bin/u_server/src/db.rs
  14. 204
      bin/u_server/src/handlers.rs
  15. 153
      bin/u_server/src/lib.rs
  16. 123
      bin/u_server/src/main.rs
  17. 2
      integration/.env
  18. 67
      integration/docker-compose.yml
  19. 76
      integration/docker.py
  20. 61
      integration/docker_compose.py
  21. 33
      integration/integration_tests.py
  22. 5
      integration/integration_tests.sh
  23. 3
      integration/tests_runner.Dockerfile
  24. 32
      integration/utils.py
  25. 2
      lib/u_lib/Cargo.toml
  26. 14
      lib/u_lib/build.rs
  27. 169
      lib/u_lib/src/builder.rs
  28. 3
      lib/u_lib/src/lib.rs
  29. 2
      lib/u_lib/src/messaging.rs
  30. 48
      lib/u_lib/src/models/jobs/misc.rs
  31. 17
      lib/u_lib/tests/api_macro.rs
  32. BIN
      lib/u_lib/tests/fixtures/echoer
  33. 161
      lib/u_lib/tests/jobs/execution.rs
  34. 43
      lib/u_lib/tests/jobs/output.rs
  35. 10
      lib/u_lib/tests/tests.rs
  36. 10
      scripts/cargo_musl.sh

@ -1,3 +1,3 @@
[build] [build]
target = "x86_64-unknown-linux-gnu" # -musl" target = "x86_64-unknown-linux-musl"
rustflags = ["-C", "target-feature=+crt-static"]

@ -1 +0,0 @@
export DATABASE_URL=postgres://postgres:12348756@172.17.0.2/u_db

@ -14,4 +14,3 @@ panic = "abort"
[profile.dev] [profile.dev]
debug = true # Добавляет флаг `-g` для компилятора; debug = true # Добавляет флаг `-g` для компилятора;
opt-level = 0 opt-level = 0

@ -13,4 +13,9 @@ log = "^0.4"
env_logger = "0.8.3" env_logger = "0.8.3"
uuid = "0.6.5" uuid = "0.6.5"
reqwest = { version = "0.11", features = ["json"] } reqwest = { version = "0.11", features = ["json"] }
openssl = { version = "0.10.32", features = ["vendored"] }
u_lib = { version = "*", path = "../../lib/u_lib" } u_lib = { version = "*", path = "../../lib/u_lib" }
[[test]]
name = "integration"
path = "tests/tests.rs"

@ -0,0 +1,3 @@
FROM centos:7
# FIX: use RUN so updates are baked into the image at build time.
# A CMD only runs when the container starts — and docker-compose
# overrides the command with the mounted agent binary anyway.
RUN yum -y update

@ -0,0 +1,88 @@
// TODO:
// поддержка питона
// резолв адреса управляющего сервера через DoT
// кроссплатформенность (реализовать интерфейс для винды и никсов)
// проверка обнов
// самоуничтожение
#[macro_use]
extern crate log;
extern crate env_logger;
use std::env;
use tokio::time::{sleep, Duration};
use u_lib::{
api::ClientHandler,
builder::JobBuilder,
cache::JobCache,
executor::pop_completed,
models::{AssignedJob, ExecResult},
UID,
//daemonize
};
/// Evaluates the fallible async expression in a loop until it yields `Ok`,
/// sleeping 5 seconds between attempts and logging every error.
/// Expands to the unwrapped `Ok` value; must be used inside an async context.
#[macro_export]
macro_rules! retry_until_ok {
    ( $body:expr ) => {
        loop {
            match $body {
                Ok(r) => break r,
                Err(e) => error!("{:?}", e),
            };
            sleep(Duration::from_secs(5)).await;
        }
    };
}
/// Fetches any jobs referenced by `job_requests` that are not yet in the
/// local cache, then builds and spawns all of them.
///
/// Fetch failures are retried forever (see `retry_until_ok!`); builder
/// errors are logged but do not abort the remaining jobs. Does nothing
/// for an empty request list.
pub async fn process_request(job_requests: Vec<AssignedJob>, client: &ClientHandler) {
    if !job_requests.is_empty() {
        // Populate the cache with any job bodies we have not seen yet.
        for jr in &job_requests {
            if !JobCache::contains(&jr.job_id) {
                info!("Fetching job: {}", &jr.job_id);
                let fetched_job = retry_until_ok!(client.get_jobs(Some(jr.job_id)).await)
                    .pop()
                    .unwrap();
                JobCache::insert(fetched_job);
            }
        }
        info!(
            "Scheduling jobs: \n{}",
            job_requests
                .iter()
                .map(|j| j.job_id.to_string())
                .collect::<Vec<String>>()
                .join("\n")
        );
        let mut builder = JobBuilder::from_request(job_requests);
        let errors = builder.pop_errors();
        if !errors.is_empty() {
            error!(
                "Some errors encountered: \n{}",
                errors
                    .iter()
                    .map(|j| j.to_string())
                    .collect::<Vec<String>>()
                    .join("\n")
            );
        }
        builder.unwrap_one().spawn().await;
    }
}
/// Main agent loop: polls the server for assigned jobs every 5 seconds,
/// executes them, and reports completed results back.
///
/// Never returns; all transport errors are retried indefinitely via
/// `retry_until_ok!`.
pub async fn run_forever() {
    //daemonize();
    env_logger::init();
    // The server address may optionally be passed as the first CLI argument.
    let arg_ip = env::args().nth(1);
    let instance = ClientHandler::new(arg_ip);
    info!("Connecting to the server");
    loop {
        let job_requests: Vec<AssignedJob> =
            retry_until_ok!(instance.get_agent_jobs(Some(*UID)).await);
        process_request(job_requests, &instance).await;
        let result: Vec<ExecResult> = pop_completed().await.into_iter().collect();
        if !result.is_empty() {
            retry_until_ok!(instance.report(&result).await)
        }
        sleep(Duration::from_secs(5)).await;
    }
}

@ -1,89 +1,7 @@
// TODO: use tokio;
// поддержка питона use u_agent::run_forever;
// резолв адреса управляющего сервера через DoT
// кроссплатформенность (реализовать интерфейс для винды и никсов)
// проверка обнов
// самоуничтожение
#[macro_use]
extern crate log;
extern crate env_logger;
use std::env;
use tokio::time::{sleep, Duration};
use u_lib::{
api::ClientHandler,
builder::JobBuilder,
cache::JobCache,
executor::pop_completed,
models::{AssignedJob, ExecResult},
UID,
//daemonize
};
#[macro_export]
macro_rules! retry_until_ok {
( $body:expr ) => {
loop {
match $body {
Ok(r) => break r,
Err(e) => error!("{:?}", e),
};
sleep(Duration::from_secs(5)).await;
}
};
}
async fn process_request(job_requests: Vec<AssignedJob>, client: &ClientHandler) {
if job_requests.len() > 0 {
for jr in &job_requests {
if !JobCache::contains(&jr.job_id) {
info!("Fetching job: {}", &jr.job_id);
let fetched_job = retry_until_ok!(client.get_jobs(Some(jr.job_id)).await)
.pop()
.unwrap();
JobCache::insert(fetched_job);
}
}
info!(
"Scheduling jobs: \n{}",
job_requests
.iter()
.map(|j| j.job_id.to_string())
.collect::<Vec<String>>()
.join("\n")
);
let mut builder = JobBuilder::from_request(job_requests);
let errors = builder.pop_errors();
if errors.len() > 0 {
error!(
"Some errors encountered: \n{}",
errors
.iter()
.map(|j| j.to_string())
.collect::<Vec<String>>()
.join("\n")
);
}
builder.unwrap_one().spawn().await;
}
}
#[tokio::main] #[tokio::main]
async fn main() { async fn main() {
//daemonize(); run_forever().await;
env_logger::init();
let arg_ip = env::args().nth(1);
let instance = ClientHandler::new(arg_ip);
info!("Connecting to the server");
loop {
let job_requests: Vec<AssignedJob> =
retry_until_ok!(instance.get_agent_jobs(Some(*UID)).await);
process_request(job_requests, &instance).await;
let result: Vec<ExecResult> = pop_completed().await.into_iter().collect();
if result.len() > 0 {
retry_until_ok!(instance.report(&result).await)
}
sleep(Duration::from_secs(5)).await;
}
} }

@ -0,0 +1,8 @@
// NOTE(review): `process_request` is imported but not yet exercised —
// this is a placeholder until the first-connection scenario is written.
use u_agent::process_request;

// Common result alias for fallible tests.
type TestResult<R = ()> = Result<R, Box<dyn std::error::Error>>;

// Smoke test: currently only verifies the async test harness runs.
#[tokio::test]
async fn test_first_connection() -> TestResult {
    Ok(())
}

@ -0,0 +1,2 @@
extern crate u_agent;
mod behaviour;

@ -13,4 +13,5 @@ log = "^0.4"
env_logger = "0.7.1" env_logger = "0.7.1"
uuid = "0.6.5" uuid = "0.6.5"
reqwest = { version = "0.11", features = ["json"] } reqwest = { version = "0.11", features = ["json"] }
openssl = { version = "0.10.32", features = ["vendored"] }
u_lib = { version = "*", path = "../../lib/u_lib" } u_lib = { version = "*", path = "../../lib/u_lib" }

@ -13,6 +13,8 @@ warp = "0.2.4"
uuid = { version = "0.6.5", features = ["serde", "v4"] } uuid = { version = "0.6.5", features = ["serde", "v4"] }
once_cell = "1.7.2" once_cell = "1.7.2"
hyper = "0.13.10" hyper = "0.13.10"
mockall = "0.9.1"
mockall_double = "0.2"
[dependencies.diesel] [dependencies.diesel]
features = ["postgres", "uuid"] features = ["postgres", "uuid"]
@ -30,3 +32,5 @@ version = "0.2.22"
path = "../../lib/u_lib" path = "../../lib/u_lib"
version = "*" version = "*"
[dev-dependencies]
test-case = "1.1.0"

@ -0,0 +1,3 @@
FROM centos:7
# FIX: use RUN so updates are baked into the image at build time.
# A CMD only runs when the container starts — and docker-compose
# overrides the command with the mounted server binary anyway.
RUN yum -y update

@ -17,19 +17,20 @@ pub struct UDB {
static DB: OnceCell<Arc<Mutex<UDB>>> = OnceCell::new(); static DB: OnceCell<Arc<Mutex<UDB>>> = OnceCell::new();
pub fn lock_db() -> MutexGuard<'static, UDB> { #[cfg_attr(test, automock)]
DB.get_or_init(|| {
dotenv().unwrap();
let db_path = env::var("DATABASE_URL").unwrap();
let conn = PgConnection::establish(&db_path).unwrap();
let instance = UDB { conn };
Arc::new(Mutex::new(instance))
})
.lock()
.unwrap()
}
impl UDB { impl UDB {
pub fn lock_db() -> MutexGuard<'static, UDB> {
DB.get_or_init(|| {
dotenv().unwrap();
let db_path = env::var("DATABASE_URL").unwrap();
let conn = PgConnection::establish(&db_path).unwrap();
let instance = UDB { conn };
Arc::new(Mutex::new(instance))
})
.lock()
.unwrap()
}
pub fn insert_jobs(&self, job_metas: &[JobMeta]) -> ULocalResult<()> { pub fn insert_jobs(&self, job_metas: &[JobMeta]) -> ULocalResult<()> {
use schema::jobs; use schema::jobs;
diesel::insert_into(jobs::table) diesel::insert_into(jobs::table)

@ -1,4 +1,4 @@
use crate::db::{lock_db, UDB}; use crate::db::UDB;
use diesel::SaveChangesDsl; use diesel::SaveChangesDsl;
use hyper::Body; use hyper::Body;
use serde::Serialize; use serde::Serialize;
@ -13,130 +13,142 @@ use warp::{
Rejection, Reply, Rejection, Reply,
}; };
fn build_response<S: Into<Body>>(code: StatusCode, body: S) -> Response<Body> { pub fn build_response<S: Into<Body>>(code: StatusCode, body: S) -> Response<Body> {
Response::builder().status(code).body(body.into()).unwrap() Response::builder().status(code).body(body.into()).unwrap()
} }
fn build_ok<S: Into<Body>>(body: S) -> Response<Body> { pub fn build_ok<S: Into<Body>>(body: S) -> Response<Body> {
build_response(StatusCode::OK, body) build_response(StatusCode::OK, body)
} }
fn build_err<S: ToString>(body: S) -> Response<Body> { pub fn build_err<S: ToString>(body: S) -> Response<Body> {
build_response(StatusCode::BAD_REQUEST, body.to_string()) build_response(StatusCode::BAD_REQUEST, body.to_string())
} }
fn build_message<M: AsMsg + Serialize>(m: M) -> Response<Body> { pub fn build_message<M: AsMsg + Serialize>(m: M) -> Response<Body> {
warp::reply::json(&m.as_message()).into_response() warp::reply::json(&m.as_message()).into_response()
} }
pub async fn add_agent(msg: Agent) -> Result<impl Reply, Rejection> { pub struct Endpoints;
debug!("hnd: add_agent");
lock_db()
.insert_agent(&msg)
.map(|_| build_ok(""))
.or_else(|e| Ok(build_err(e)))
}
pub async fn get_agents(uid: Option<Uuid>) -> Result<impl Reply, Rejection> { #[cfg_attr(test, automock)]
debug!("hnd: get_agents"); impl Endpoints {
lock_db() pub async fn add_agent(msg: Agent) -> Result<Response<Body>, Rejection> {
.get_agents(uid) debug!("hnd: add_agent");
.map(|m| build_message(m)) UDB::lock_db()
.or_else(|e| Ok(build_err(e))) .insert_agent(&msg)
} .map(|_| build_ok(""))
.or_else(|e| Ok(build_err(e)))
}
pub async fn get_jobs(uid: Option<Uuid>) -> Result<impl Reply, Rejection> { pub async fn get_agents(uid: Option<Uuid>) -> Result<Response<Body>, Rejection> {
debug!("hnd: get_jobs"); debug!("hnd: get_agents");
lock_db() UDB::lock_db()
.get_jobs(uid) .get_agents(uid)
.map(|m| build_message(m)) .map(|m| build_message(m))
.or_else(|e| Ok(build_err(e))) .or_else(|e| Ok(build_err(e)))
} }
pub async fn get_agent_jobs(uid: Option<Uuid>, personal: bool) -> Result<impl Reply, Rejection> { pub async fn get_jobs(uid: Option<Uuid>) -> Result<Response<Body>, Rejection> {
info!("hnd: get_agent_jobs {}", personal); debug!("hnd: get_jobs");
if personal { UDB::lock_db()
let agents = lock_db().get_agents(uid).unwrap(); .get_jobs(uid)
if agents.len() == 0 { .map(|m| build_message(m))
let db = lock_db(); .or_else(|e| Ok(build_err(e)))
db.insert_agent(&Agent::with_id(uid.unwrap())).unwrap(); }
let job = db.find_job_by_alias("agent_hello").unwrap();
if let Err(e) = db.set_jobs_for_agent(&uid.unwrap(), &[job.id]) { pub async fn get_agent_jobs(
return Ok(build_err(e)); uid: Option<Uuid>,
personal: bool,
) -> Result<Response<Body>, Rejection> {
info!("hnd: get_agent_jobs {}", personal);
if personal {
let agents = UDB::lock_db().get_agents(uid).unwrap();
if agents.len() == 0 {
let db = UDB::lock_db();
db.insert_agent(&Agent::with_id(uid.unwrap())).unwrap();
let job = db.find_job_by_alias("agent_hello").unwrap();
if let Err(e) = db.set_jobs_for_agent(&uid.unwrap(), &[job.id]) {
return Ok(build_err(e));
}
} }
} }
} let result = UDB::lock_db().get_exact_jobs(uid, personal);
let result = lock_db().get_exact_jobs(uid, personal); match result {
match result { Ok(r) => {
Ok(r) => { let db = UDB::lock_db();
let db = lock_db(); for j in r.iter() {
for j in r.iter() { db.update_job_status(j.id, JobState::Running).ok();
db.update_job_status(j.id, JobState::Running).ok(); }
Ok(build_message(r))
} }
Ok(build_message(r)) Err(e) => Ok(build_err(e)),
} }
Err(e) => Ok(build_err(e)),
} }
}
pub async fn upload_jobs(msg: BaseMessage<'_, Vec<JobMeta>>) -> Result<impl Reply, Rejection> { pub async fn upload_jobs(
debug!("hnd: upload_jobs"); msg: BaseMessage<'static, Vec<JobMeta>>,
lock_db() ) -> Result<Response<Body>, Rejection> {
.insert_jobs(&msg.into_inner()) debug!("hnd: upload_jobs");
.map(|_| build_ok("")) UDB::lock_db()
.or_else(|e| Ok(build_err(e))) .insert_jobs(&msg.into_inner())
} .map(|_| build_ok(""))
.or_else(|e| Ok(build_err(e)))
}
pub async fn del(uid: Uuid) -> Result<impl Reply, Rejection> { pub async fn del(uid: Uuid) -> Result<Response<Body>, Rejection> {
debug!("hnd: del"); debug!("hnd: del");
let db = lock_db(); let db = UDB::lock_db();
let del_fns = &[UDB::del_agents, UDB::del_jobs, UDB::del_results]; let del_fns = &[UDB::del_agents, UDB::del_jobs, UDB::del_results];
for del_fn in del_fns { for del_fn in del_fns {
let affected = del_fn(&db, &vec![uid]).unwrap(); let affected = del_fn(&db, &vec![uid]).unwrap();
if affected > 0 { if affected > 0 {
return Ok(build_ok(affected.to_string())); return Ok(build_ok(affected.to_string()));
}
} }
Ok(build_err("0"))
} }
Ok(build_err("0"))
}
pub async fn set_jobs( pub async fn set_jobs(
agent_uid: Uuid, agent_uid: Uuid,
msg: BaseMessage<'_, Vec<Uuid>>, msg: BaseMessage<'static, Vec<Uuid>>,
) -> Result<impl Reply, Rejection> { ) -> Result<Response<Body>, Rejection> {
debug!("hnd: set_jobs"); debug!("hnd: set_jobs");
lock_db() UDB::lock_db()
.set_jobs_for_agent(&agent_uid, &msg.into_inner()) .set_jobs_for_agent(&agent_uid, &msg.into_inner())
.map(|_| build_ok("")) .map(|_| build_ok(""))
.or_else(|e| Ok(build_err(e))) .or_else(|e| Ok(build_err(e)))
} }
pub async fn report(msg: BaseMessage<'_, Vec<ExecResult>>) -> Result<impl Reply, Rejection> { pub async fn report(
debug!("hnd: report"); msg: BaseMessage<'static, Vec<ExecResult>>,
let id = msg.id; ) -> Result<Response<Body>, Rejection> {
let mut failed = vec![]; debug!("hnd: report");
for entry in msg.into_inner() { let id = msg.id;
match entry { let mut failed = vec![];
ExecResult::Assigned(res) => { for entry in msg.into_inner() {
if id != res.agent_id { match entry {
continue; ExecResult::Assigned(res) => {
if id != res.agent_id {
continue;
}
let db = UDB::lock_db();
if let Err(e) = res
.save_changes::<AssignedJob>(&db.conn)
.map_err(ULocalError::from)
{
failed.push(e.to_string())
}
} }
let db = lock_db(); ExecResult::Agent(a) => {
if let Err(e) = res Self::add_agent(a).await?;
.save_changes::<AssignedJob>(&db.conn)
.map_err(ULocalError::from)
{
failed.push(e.to_string())
} }
} }
ExecResult::Agent(a) => {
add_agent(a).await?;
}
} }
if failed.len() > 0 {
let err_msg = ULocalError::ProcessingError(failed.join(", "));
return Ok(build_err(err_msg));
}
Ok(build_ok(""))
} }
if failed.len() > 0 {
let err_msg = ULocalError::ProcessingError(failed.join(", "));
return Ok(build_err(err_msg));
}
Ok(build_ok(""))
} }

@ -0,0 +1,153 @@
mod db;
mod handlers;
use warp::{body, Filter, Rejection, Reply};
#[macro_use]
extern crate log;
extern crate env_logger;
#[macro_use]
extern crate mockall;
#[macro_use]
extern crate mockall_double;
use db::UDB;
#[double]
use handlers::Endpoints;
use serde::de::DeserializeOwned;
use u_lib::{
config::MASTER_PORT,
messaging::{AsMsg, BaseMessage},
models::*,
};
use uuid::Uuid;
/// Warp filter that extracts a JSON-encoded `BaseMessage<M>` request body,
/// rejecting payloads larger than 64 KiB.
fn get_content<M>() -> impl Filter<Extract = (BaseMessage<'static, M>,), Error = Rejection> + Clone
where
    M: AsMsg + Sync + Send + DeserializeOwned + 'static,
{
    body::content_length_limit(1024 * 64).and(body::json::<BaseMessage<M>>())
}
/// Seeds the database with the built-in `agent_hello` management job.
/// Insertion errors (e.g. the job already exists) are deliberately ignored.
fn prefill_jobs() {
    let agent_hello = JobMeta::builder()
        .with_type(misc::JobType::Manage)
        .with_alias("agent_hello")
        .build()
        .unwrap();
    UDB::lock_db().insert_jobs(&[agent_hello]).ok();
}
/// One-time process setup: logging first, then DB prefill.
fn init() {
    env_logger::init();
    prefill_jobs();
}
/// Builds the complete warp routing table.
///
/// Routes in the "agent zone" (get_jobs, personal get_agent_jobs, report)
/// are unauthenticated; everything else requires the bearer token header.
fn make_filters() -> impl Filter<Extract = (impl Reply,), Error = Rejection> + Clone {
    // Fallback extractor: a missing path parameter becomes `None`.
    let infallible_none = |_| async { Ok::<(Option<Uuid>,), std::convert::Infallible>((None,)) };
    let get_agents = warp::get()
        .and(warp::path("get_agents"))
        .and(
            warp::path::param::<Uuid>()
                .map(Some)
                .or_else(infallible_none),
        )
        .and_then(Endpoints::get_agents);
    let upload_jobs = warp::post()
        .and(warp::path("upload_jobs"))
        .and(get_content::<Vec<JobMeta>>())
        .and_then(Endpoints::upload_jobs);
    let get_jobs = warp::get()
        .and(warp::path("get_jobs"))
        .and(
            warp::path::param::<Uuid>()
                .map(Some)
                .or_else(infallible_none),
        )
        .and_then(Endpoints::get_jobs);
    let get_agent_jobs = warp::get()
        .and(warp::path("get_agent_jobs"))
        .and(
            warp::path::param::<Uuid>()
                .map(Some)
                .or_else(infallible_none),
        )
        .and_then(|uid| Endpoints::get_agent_jobs(uid, false));
    let get_personal_jobs = warp::get()
        .and(warp::path("get_agent_jobs"))
        .and(warp::path::param::<Uuid>().map(Some))
        .and_then(|uid| Endpoints::get_agent_jobs(uid, true));
    let del = warp::get()
        .and(warp::path("del"))
        .and(warp::path::param::<Uuid>())
        .and_then(Endpoints::del);
    let set_jobs = warp::post()
        .and(warp::path("set_jobs"))
        .and(warp::path::param::<Uuid>())
        .and(get_content::<Vec<Uuid>>())
        .and_then(Endpoints::set_jobs);
    // FIX: compose the body extractor and handler like every other route
    // (`.and(...).and_then(...)`) instead of nesting `and_then` inside `and`.
    let report = warp::post()
        .and(warp::path("report"))
        .and(get_content::<Vec<ExecResult>>())
        .and_then(Endpoints::report);
    let auth_token = warp::header::exact("authorization", "Bearer 123qwe");
    let agent_zone = get_jobs.clone().or(get_personal_jobs).or(report);
    let auth_zone = auth_token.and(
        get_agents
            .or(get_jobs)
            .or(upload_jobs)
            .or(del)
            .or(set_jobs)
            .or(get_agent_jobs),
    );
    agent_zone.or(auth_zone)
}
/// Entry point: initializes logging/DB state and serves the API on all
/// interfaces at `MASTER_PORT`, with warp request logging enabled.
pub async fn serve() {
    init();
    let routes = make_filters();
    warp::serve(routes.with(warp::log("warp")))
        .run(([0, 0, 0, 0], MASTER_PORT))
        .await;
}
#[cfg(test)]
mod tests {
    use super::*;
    use handlers::build_ok;
    use mockall::predicate::*;
    use test_case::test_case;
    use warp::test::request;

    // Routing test for /get_agent_jobs with and without a uid path param;
    // the handler itself is replaced by a mockall mock, so only the filter
    // wiring is exercised. The `None` case is expected to panic (the route
    // rejects the request, so the mocked handler is never satisfied).
    #[test_case(Some(Uuid::new_v4()))]
    #[test_case(None => panics)]
    #[tokio::test]
    async fn test_get_agent_jobs_unauthorized(uid: Option<Uuid>) {
        let mock = Endpoints::get_agent_jobs_context();
        mock.expect()
            .with(eq(uid), eq(uid.is_some()))
            .returning(|_, _| Ok(build_ok("")));
        request()
            .path(&format!(
                "/get_agent_jobs/{}",
                uid.map(|u| u.simple().to_string()).unwrap_or(String::new())
            ))
            .method("GET")
            .filter(&make_filters())
            .await
            .unwrap();
        mock.checkpoint()
    }
}

@ -1,125 +1,6 @@
mod db; use u_server::serve;
mod handlers;
use warp::{body, Filter, Rejection};
#[macro_use]
extern crate log;
extern crate env_logger;
use db::lock_db;
use serde::de::DeserializeOwned;
use u_lib::{
config::MASTER_PORT,
messaging::{AsMsg, BaseMessage},
models::*,
};
use uuid::Uuid;
fn get_content<M>() -> impl Filter<Extract = (BaseMessage<'static, M>,), Error = Rejection> + Clone
where
M: AsMsg + Sync + Send + DeserializeOwned + 'static,
{
body::content_length_limit(1024 * 64).and(body::json::<BaseMessage<M>>())
}
fn prefill_jobs() {
let agent_hello = JobMeta::builder()
.with_type(misc::JobType::Manage)
.with_alias("agent_hello")
.build()
.unwrap();
lock_db().insert_jobs(&[agent_hello]).ok();
}
fn init() {
env_logger::init();
prefill_jobs();
}
#[tokio::main] #[tokio::main]
async fn main() { async fn main() {
init(); serve().await;
let infallible_none = |_| async { Ok::<(Option<Uuid>,), std::convert::Infallible>((None,)) };
let get_agents = warp::get()
.and(warp::path("get_agents"))
.and(
warp::path::param::<Uuid>()
.map(Some)
.or_else(infallible_none),
)
.and_then(handlers::get_agents);
let upload_jobs = warp::post()
.and(warp::path("upload_jobs"))
.and(get_content::<Vec<JobMeta>>())
.and_then(handlers::upload_jobs);
let get_jobs = warp::get()
.and(warp::path("get_jobs"))
.and(
warp::path::param::<Uuid>()
.map(Some)
.or_else(infallible_none),
)
.and_then(handlers::get_jobs);
let get_agent_jobs = warp::get()
.and(warp::path("get_agent_jobs"))
.and(
warp::path::param::<Uuid>()
.map(Some)
.or_else(infallible_none),
)
.and_then(|uid| handlers::get_agent_jobs(uid, false));
let get_personal_jobs = warp::get()
.and(warp::path("get_agent_jobs"))
.and(warp::path::param::<Uuid>().map(Some))
.and_then(|uid| handlers::get_agent_jobs(uid, true));
let del = warp::get()
.and(warp::path("del"))
.and(warp::path::param::<Uuid>())
.and_then(handlers::del);
let set_jobs = warp::post()
.and(warp::path("set_jobs"))
.and(warp::path::param::<Uuid>())
.and(get_content::<Vec<Uuid>>())
.and_then(handlers::set_jobs);
let report = warp::post()
.and(warp::path("report"))
.and(get_content::<Vec<ExecResult>>().and_then(handlers::report));
let auth_token = warp::header::exact("authorization", "Bearer 123qwe");
let agent_zone = get_jobs.clone().or(get_personal_jobs).or(report);
let auth_zone = auth_token.and(
get_agents
.or(get_jobs)
.or(upload_jobs)
.or(del)
.or(set_jobs)
.or(get_agent_jobs),
);
let routes = agent_zone.or(auth_zone);
warp::serve(routes.with(warp::log("warp")))
.run(([0, 0, 0, 0], MASTER_PORT))
.await;
}
/*
#[cfg(test)]
mod tests {
use super::*;
/*
#[tokio::test]
async fn test_gather() {
}
*/
} }
*/

@ -0,0 +1,2 @@
DATABASE_URL=postgres://postgres:12348756@u_db/u_db
PG_PASSWORD=12348756

@ -0,0 +1,67 @@
# Integration-test cluster: server + DB + two agents + a test runner,
# all on an isolated bridge network. Indentation restored — the committed
# rendering had lost the YAML structure.
version: "2.1"

networks:
  u_net:
    ipam:
      config:
        - subnet: 10.16.0.0/24

services:
  u_server:
    image: unki/u_server
    networks:
      u_net:
        # Fixed address so agents can be pointed at the server by IP.
        ipv4_address: 10.16.0.200
    volumes:
      - ../target/x86_64-unknown-linux-musl/release/u_server:/u_server
      - ./.env:/.env
    command: /u_server
    depends_on:
      u_db:
        condition: service_started
    expose:
      - '63714'
    environment:
      RUST_LOG: warp

  u_db:
    image: postgres:13.3
    networks:
      - u_net
    expose:
      - '5432'
    environment:
      - POSTGRES_PASSWORD=${PG_PASSWORD}

  u_agent_1:
    image: unki/u_agent
    networks:
      - u_net
    volumes:
      - ../target/x86_64-unknown-linux-musl/release/u_agent:/u_agent
    command: /u_agent 10.16.0.200
    depends_on:
      u_server:
        condition: service_started

  u_agent_2:
    image: unki/u_agent
    networks:
      - u_net
    volumes:
      - ../target/x86_64-unknown-linux-musl/release/u_agent:/u_agent
    command: /u_agent 10.16.0.200
    depends_on:
      u_server:
        condition: service_started

  tests_runner:
    image: unki/tests_runner
    networks:
      - u_net
    volumes:
      - ../:/unki/
    depends_on:
      u_server:
        condition: service_started

@ -0,0 +1,76 @@
import subprocess
from utils import *
# Image build matrix: image name -> build context plus an optional
# Dockerfile name prefix (resolved to "<ctx>/<prefix>.Dockerfile" by
# rebuild_images_if_needed; without a prefix the context's default
# Dockerfile is used).
DOCKERFILES = {
    'u_agent': {
        'ctx': '../bin/u_agent',
    },
    'u_server': {
        'ctx': '../bin/u_server'
    },
    'tests_runner': {
        'ctx': './',
        'dockerfile_prefix': 'tests_runner'
    },
}
def docker(args):
    """Run the docker CLI with the given argument list and return its stdout.

    A non-zero exit status is logged via err() and the
    CalledProcessError is re-raised for the caller to handle.
    """
    try:
        full_cmd = ['docker'] + args
        #log(f'Running docker cmd: {full_cmd}')
        output = subprocess.check_output(full_cmd)
    except subprocess.CalledProcessError as exc:
        err(str(exc))
        raise
    return output
def print_errors(errors):
    """Log the per-container failures collected by check_state()."""
    details = [
        ' {container}: {error}'.format(container=entry['container'],
                                       error=entry['error'])
        for entry in errors
    ]
    err('There are some errors in next containers:\n%s' % '\n'.join(details))
def check_state(containers):
    """Inspect each container and collect those that are not running.

    Returns a list of {'container': name, 'error': reason} dicts;
    an empty list means every container is up.
    """
    problems = []
    inspect_cmd = 'docker inspect --format \'{{ .State.Running }}\' %s'
    for name in containers:
        status, output = subprocess.getstatusoutput(inspect_cmd % name)
        output = output.strip()
        if status != 0:
            # docker itself failed — report its raw output.
            problems.append({'container': name,
                            'error': output})
        elif output != 'true':
            problems.append({'container': name,
                            'error': 'Bad state: Running=%s' % output})
    return problems
def rebuild_images_if_needed(force_rebuild=False):
    """Build every image declared in DOCKERFILES via `docker build`.

    With force_rebuild=True the docker layer cache is bypassed.
    """
    suffix = 'Dockerfile'
    for image, info in DOCKERFILES.items():
        context = info['ctx']
        prefix = info.get('dockerfile_prefix')
        tag = f'unki/{image}'
        log(f'Building docker image {tag}')
        build_args = ['build', '-t', tag, context]
        if prefix:
            # Non-default Dockerfile name inside the build context.
            build_args += ['-f', f'{context}/{prefix}.{suffix}']
        if force_rebuild:
            build_args += ['--no-cache']
        docker(build_args)

@ -0,0 +1,61 @@
import subprocess
from utils import *
from docker import docker, check_state, print_errors
class Compose:
    """Thin wrapper around docker-compose for the integration-test cluster."""

    # Every container the cluster is expected to run.
    ALL_CONTAINERS = [
        'u_agent_1',
        'u_agent_2',
        'u_server',
        'u_db',
        'tests_runner',
    ]

    def __init__(self):
        # Container inside which test commands are executed.
        self.cmd_container = 'tests_runner'

    def _call(self, *args):
        """Invoke docker-compose; raises CalledProcessError on failure."""
        subprocess.check_call([
            'docker-compose',
            '--no-ansi',
        ] + list(args)
        )

    def up(self):
        log('Instantiating cluster')
        self._call('up')
        log('Ok')

    def down(self):
        log('Shutting down cluster')
        self._call('down')
        log('Ok')

    def stop(self):
        log('Stopping cluster')
        self._call('stop')
        log('Ok')

    def run(self, cmd):
        """Execute a shell command inside the test-runner container.

        BUG FIX: arguments are passed to `docker exec` as an argv list, so
        the container name must not carry literal quotes, and the command
        string must be run through `sh -c` — previously the whole quoted
        string was passed as a single (nonexistent) binary name.
        """
        container = self.cmd_container
        log(f'Running command "{cmd}" in container {container}')
        docker([
            'exec',
            '-i',
            container,
            'sh', '-c', cmd,
        ])
        log('Ok')

    def is_alive(self):
        """Verify all cluster containers are running; raise TestsError if not."""
        log('Check if all containers are alive')
        errors = check_state(self.ALL_CONTAINERS)
        log('Check done')
        if errors:
            print_errors(errors)
            raise TestsError('Error during `is_alive` check')
        else:
            log('All containers are alive')

@ -0,0 +1,33 @@
import signal
import sys
from utils import *
from docker import rebuild_images_if_needed
from docker_compose import Compose
cluster = Compose()
def abort_handler(s, _):
    """Signal handler: tear the cluster down, then exit.

    BUG FIX: without an explicit exit the interrupted test flow would
    resume right after the handler returns, on a cluster that was just
    torn down.
    """
    warn(f'Received signal: {s}')
    warn(f'Gracefully stopping...')
    cluster.down()
    sys.exit(1)
def run_tests():
    """Build images, bring the cluster up, run the cargo integration
    suite inside it, and always tear the cluster down afterwards."""
    # Install graceful-shutdown handlers before anything starts.
    handled = (signal.SIGTERM, signal.SIGINT, signal.SIGHUP)
    for sig in handled:
        signal.signal(sig, abort_handler)
    rebuild_images_if_needed()
    try:
        cluster.up()
        cluster.is_alive()
        cluster.run('cargo test --test integration')
    except Exception as e:
        err(e)
        sys.exit(1)
    finally:
        # Tear down whether tests passed, failed, or raised.
        cluster.down()


if __name__ == '__main__':
    run_tests()

@ -0,0 +1,5 @@
#!/bin/bash
# Build static musl release binaries, then drive the dockerized
# integration tests. Aborts on the first failing command.
set -e
CARGO=cargo
$CARGO build --release --target x86_64-unknown-linux-musl
python integration_tests.py

@ -0,0 +1,3 @@
FROM rust:1.53
# FIX: use RUN so the musl target is installed into the image at build
# time. With CMD it would only run (and then exit) when the container
# starts, leaving the target unavailable for the exec'd `cargo test`.
RUN rustup target add x86_64-unknown-linux-musl

@ -0,0 +1,32 @@
from termcolor import colored
__all__ = ['log', 'warn', 'err', 'TestsError']
class TestsError(Exception):
    # Raised by the harness when cluster health checks fail.
    pass


# Colored log-level prefixes; the keys double as level names that
# log() matches by prefix against its `log_lvl` argument.
COLORS = {
    'question': colored('[?]', 'yellow'),
    'info': colored('[~]', 'green'),
    'warning': colored('[!]', 'magenta'),
    'error': colored('[X]', 'red'),
}
def warn(msg):
    # Log `msg` at warning level.
    log(msg, log_lvl='w')


def err(msg):
    # Log `msg` at error level.
    log(msg, log_lvl='e')
def log(msg, log_lvl='i'):
    """Print `msg` with the colored prefix whose level name starts with
    `log_lvl` (e.g. 'i' -> info, 'w' -> warning, 'e' -> error).

    Raises:
        ValueError: if `log_lvl` matches no known level.
    """
    for lvl, text in COLORS.items():
        if lvl.startswith(log_lvl):
            print(f'{text} {msg}')
            break
    else:
        # BUG FIX: the exception was instantiated but never raised,
        # silently swallowing unknown log levels.
        raise ValueError('Unknown log level')

@ -14,10 +14,12 @@ libc = "^0.2"
lazy_static = "1.4.0" lazy_static = "1.4.0"
tokio = { version = "1.2.0", features = ["rt-multi-thread", "sync", "macros", "process", "time"] } tokio = { version = "1.2.0", features = ["rt-multi-thread", "sync", "macros", "process", "time"] }
reqwest = { version = "0.11", features = ["json"] } reqwest = { version = "0.11", features = ["json"] }
openssl = { version = "0.10.32", features = ["vendored"] }
futures = "0.3.5" futures = "0.3.5"
guess_host_triple = "0.1.2" guess_host_triple = "0.1.2"
thiserror = "*" thiserror = "*"
log = "*" log = "*"
mockall = "0.9.1"
env_logger = "0.8.3" env_logger = "0.8.3"
diesel-derive-enum = { version = "1", features = ["postgres"] } diesel-derive-enum = { version = "1", features = ["postgres"] }
chrono = "0.4.19" chrono = "0.4.19"

@ -0,0 +1,14 @@
use std::path::PathBuf;
use std::process::Command;
/// Build script: compiles the `echoer` test fixture with rustc so the
/// test suite can embed and execute the resulting binary.
fn main() {
    let echoer = PathBuf::from("./tests/fixtures/echoer");
    let mut echoer_src = echoer.clone();
    echoer_src.set_extension("rs");
    let status = Command::new("rustc")
        .args(&[echoer_src.to_str().unwrap(), "-o", echoer.to_str().unwrap()])
        .status()
        .unwrap();
    // BUG FIX: previously only spawn failures were caught; a failing
    // rustc exit code silently left a stale or missing fixture binary.
    assert!(
        status.success(),
        "failed to compile fixture {}",
        echoer_src.display()
    );
    println!("cargo:rerun-if-changed={}", echoer_src.display());
    println!("cargo:rerun-if-changed={}", echoer.display());
}

@ -144,3 +144,172 @@ impl NamedJobBuilder {
self.pop_opt(name).unwrap() self.pop_opt(name).unwrap()
} }
} }
#[cfg(test)]
mod tests {
    use test_case::test_case;
    use std::{time::SystemTime};
    use crate::{
        errors::UError,
        models::{
            jobs::{JobMeta},
            ExecResult,
            misc::JobType
        },
        builder::{JobBuilder, NamedJobBuilder},
        unwrap_enum,
    };

    // Common result alias for fallible tests.
    type TestResult<R = ()> = Result<R, Box<dyn std::error::Error>>;

    // 50 concurrent `sleep 1` jobs must finish in ~1s wall time,
    // proving jobs run concurrently rather than sequentially.
    #[tokio::test]
    async fn test_is_really_async() {
        const SLEEP_SECS: u64 = 1;
        let job = JobMeta::from_shell(format!("sleep {}", SLEEP_SECS)).unwrap();
        let sleep_jobs: Vec<JobMeta> = (0..50).map(|_| job.clone()).collect();
        let now = SystemTime::now();
        JobBuilder::from_meta(sleep_jobs).unwrap_one().wait().await;
        assert!(now.elapsed().unwrap().as_secs() < SLEEP_SECS + 2)
    }

    // Shell-job matrix: (command template, optional payload bytes,
    // expected trimmed stdout). `{}` in the template is substituted with
    // the path of the materialized payload.
    #[test_case(
        "/bin/sh {}",
        Some(b"echo test01 > /tmp/asd; cat /tmp/asd"),
        "test01"
        ;"sh payload"
    )]
    #[test_case(
        r#"/usr/bin/python -c 'print("test02")'"#,
        None,
        "test02"
        ;"python cmd"
    )]
    #[test_case(
        "/{}",
        Some(
            br#"#!/bin/sh
TMPPATH=/tmp/lol
mkdir -p $TMPPATH
echo test03 > $TMPPATH/t
cat $TMPPATH/t"#
        ),
        "test03"
        ;"sh multiline payload"
    )]
    #[test_case(
        "/{} 'some msg as arg'",
        Some(include_bytes!("../tests/fixtures/echoer")),
        "some msg as arg"
        ;"standalone binary with args"
    )]
    #[tokio::test]
    async fn test_shell_job(cmd: &str, payload: Option<&[u8]>, expected_result: &str) -> TestResult {
        let mut job = JobMeta::builder().with_shell(cmd);
        if let Some(p) = payload {
            job = job.with_payload(p);
        }
        let job = job.build().unwrap();
        let job_result = JobBuilder::from_meta(job).unwrap_one().wait_one().await;
        let result = unwrap_enum!(job_result, ExecResult::Assigned);
        let result = result.to_string_result().unwrap();
        assert_eq!(result.trim(), expected_result);
        Ok(())
    }

    // Mixed workload: a long-running job spawned in the background must
    // not block shorter jobs; total wall time equals the longest job.
    #[tokio::test]
    async fn test_complex_load() -> TestResult {
        const SLEEP_SECS: u64 = 1;
        let now = SystemTime::now();
        let longest_job = JobMeta::from_shell(format!("sleep {}", SLEEP_SECS)).unwrap();
        let longest_job = JobBuilder::from_meta(longest_job).unwrap_one().spawn().await;
        let ls = JobBuilder::from_meta(JobMeta::from_shell("ls")?).unwrap_one()
            .wait_one()
            .await;
        let ls = unwrap_enum!(ls, ExecResult::Assigned);
        assert_eq!(ls.retcode.unwrap(), 0);
        let folders = ls.to_string_result().unwrap();
        // One `ls` job per entry of the previous listing.
        let subfolders_jobs: Vec<JobMeta> = folders
            .lines()
            .map(|f| JobMeta::from_shell(format!("ls {}", f)).unwrap())
            .collect();
        let ls_subfolders = JobBuilder::from_meta(subfolders_jobs)
            .unwrap_one()
            .wait()
            .await;
        for result in ls_subfolders {
            let result = unwrap_enum!(result, ExecResult::Assigned);
            assert_eq!(result.retcode.unwrap(), 0);
        }
        longest_job.wait().await;
        assert_eq!(now.elapsed().unwrap().as_secs(), SLEEP_SECS);
        Ok(())
    }

    /*
    #[tokio::test]
    async fn test_exec_multiple_jobs_nowait() -> UResult<()> {
        const REPEATS: usize = 10;
        let job = JobMeta::from_shell("whoami");
        let sleep_jobs: Vec<JobMeta> = (0..=REPEATS).map(|_| job.clone()).collect();
        build_jobs(sleep_jobs).spawn().await;
        let mut completed = 0;
        while completed < REPEATS {
            let c = pop_completed().await.len();
            if c > 0 {
                completed += c;
                println!("{}", c);
            }
        }
        Ok(())
    }
    */

    // A nonexistent binary yields no retcode and an error message in the
    // combined output.
    #[tokio::test]
    async fn test_failing_shell_job() -> TestResult {
        let job = JobMeta::from_shell("lol_kek_puk")?;
        let job_result = JobBuilder::from_meta(job)
            .unwrap_one()
            .wait_one()
            .await;
        let job_result = unwrap_enum!(job_result, ExecResult::Assigned);
        let output = job_result.to_string_result().unwrap();
        assert!(output.contains("No such file"));
        assert!(job_result.retcode.is_none());
        Ok(())
    }

    // Invalid meta combinations must fail at build time with a
    // JobArgsError containing the given message fragment.
    #[test_case(
        "/bin/bash {}",
        None,
        "contains executable"
        ; "no binary"
    )]
    #[test_case(
        "/bin/bash",
        Some(b"whoami"),
        "contains no executable"
        ; "no path to binary"
    )]
    #[tokio::test]
    async fn test_job_building_failed(cmd: &str, payload: Option<&[u8]>, err_str: &str) -> TestResult {
        let mut job = JobMeta::builder().with_shell(cmd);
        if let Some(p) = payload {
            job = job.with_payload(p);
        }
        let err = job.build().unwrap_err();
        let err_msg = unwrap_enum!(err, UError::JobArgsError);
        assert!(err_msg.contains(err_str));
        Ok(())
    }

    // Shell and Manage job types can run side by side; the Manage job
    // yields an Agent result with no alias.
    #[tokio::test]
    async fn test_different_job_types() -> TestResult {
        let mut jobs = NamedJobBuilder::from_meta(vec![
            ("sleeper", JobMeta::from_shell("sleep 3")?),
            ("gatherer", JobMeta::builder().with_type(JobType::Manage).build()?)
        ]).wait().await;
        let gathered = jobs.pop("gatherer");
        assert_eq!(unwrap_enum!(gathered, ExecResult::Agent).alias, None);
        Ok(())
    }
}

@ -26,3 +26,6 @@ extern crate diesel;
#[macro_use] #[macro_use]
extern crate log; extern crate log;
extern crate env_logger; extern crate env_logger;
#[macro_use]
extern crate mockall;

@ -3,7 +3,7 @@ use serde::{Deserialize, Serialize};
use std::borrow::Cow; use std::borrow::Cow;
use uuid::Uuid; use uuid::Uuid;
pub struct Moo<'cow, T: Clone>(pub Cow<'cow, T>); pub struct Moo<'cow, T: AsMsg + Clone>(pub Cow<'cow, T>);
pub trait AsMsg: Clone + Serialize { pub trait AsMsg: Clone + Serialize {
fn as_message<'m>(&'m self) -> BaseMessage<'m, Self> fn as_message<'m>(&'m self) -> BaseMessage<'m, Self>

@ -136,3 +136,51 @@ impl JobOutput {
result result
} }
} }
#[cfg(test)]
mod tests {
    use crate::{models::JobOutput, utils::vec_to_string};
    use test_case::test_case;

    // Markers that delimit the stdout/stderr sections of a combined buffer
    // (exercised by the cases below).
    const STDOUT: &str = "<***STDOUT***>";
    const STDERR: &str = "<***STDERR***>";

    #[test_case(
        "lol",
        "kek",
        &format!("{}lol{}kek", STDOUT, STDERR)
        ;"stdout stderr"
    )]
    #[test_case(
        "",
        "kek",
        &format!("{}kek", STDERR)
        ;"stderr"
    )]
    fn test_to_combined(stdout: &str, stderr: &str, result: &str) {
        // Feed both streams in, then check the marker-prefixed merge.
        let merged = JobOutput::new()
            .stdout(stdout.as_bytes().to_vec())
            .stderr(stderr.as_bytes().to_vec())
            .into_combined();
        assert_eq!(&vec_to_string(&merged), result)
    }

    #[test_case(
        &format!("{}lal{}kik", STDOUT, STDERR),
        "lal\nkik"
        ;"stdout stderr"
    )]
    #[test_case(
        &format!("{}qeq", STDOUT),
        "qeq"
        ;"stdout"
    )]
    #[test_case(
        &format!("{}vev", STDERR),
        "vev"
        ;"stderr"
    )]
    fn test_from_combined(src: &str, result: &str) {
        // Round-trip: parse a marker-tagged buffer back into streams.
        let parsed = JobOutput::from_combined(src.as_bytes()).unwrap();
        let rendered = vec_to_string(&parsed.to_appropriate());
        assert_eq!(rendered.trim(), result);
    }
}

@ -1,17 +0,0 @@
/*
use std::fmt::Display;
use u_api_proc_macro::api_route;
use uuid::Uuid;
struct Paths;
struct ClientHandler;
#[test]
fn test_api_proc_macro() {
#[api_route("GET", Uuid)]
fn list<T: Display>(&self, msg: T) -> String {}
#[api_route("POST", Uuid)]
fn report<T: Display>(&self, msg: T) -> String {}
}
*/

Binary file not shown.

@ -1,161 +0,0 @@
use std::{time::SystemTime};
use u_lib::{
errors::UError,
models::{
jobs::{JobMeta},
ExecResult,
misc::JobType
},
builder::{JobBuilder, NamedJobBuilder}
};
type TestResult<R = ()> = Result<R, Box<dyn std::error::Error>>;
#[tokio::test]
async fn test_is_really_async() {
    const SLEEP_SECS: u64 = 1;
    // Fifty one-second sleeps: if they run concurrently, the whole batch
    // finishes in roughly one second instead of fifty.
    let meta = JobMeta::from_shell(format!("sleep {}", SLEEP_SECS)).unwrap();
    let batch: Vec<JobMeta> = vec![meta; 50];
    let started = SystemTime::now();
    JobBuilder::from_meta(batch).unwrap_one().wait().await;
    assert!(started.elapsed().unwrap().as_secs() < SLEEP_SECS + 2)
}
#[test_case(
"/bin/sh {}",
Some(b"echo test01 > /tmp/asd; cat /tmp/asd"),
"test01"
;"sh payload"
)]
#[test_case(
r#"/usr/bin/python -c 'print("test02")'"#,
None,
"test02"
;"python cmd"
)]
#[test_case(
"/{}",
Some(
br#"#!/bin/sh
TMPPATH=/tmp/lol
mkdir -p $TMPPATH
echo test03 > $TMPPATH/t
cat $TMPPATH/t"#
),
"test03"
;"sh multiline payload"
)]
#[test_case(
"/{} 'some msg as arg'",
Some(include_bytes!("../fixtures/echoer")),
"some msg as arg"
;"standalone binary with args"
)]
#[tokio::test]
// Builds a shell job (optionally with a payload substituted into `{}`),
// executes it, and checks the trimmed output against `expected_result`.
async fn test_shell_job(cmd: &str, payload: Option<&[u8]>, expected_result: &str) -> TestResult {
    let mut builder = JobMeta::builder().with_shell(cmd);
    if let Some(data) = payload {
        builder = builder.with_payload(data);
    }
    let meta = builder.build().unwrap();
    let outcome = JobBuilder::from_meta(meta).unwrap_one().wait_one().await;
    let assigned = unwrap_enum!(outcome, ExecResult::Assigned);
    let text = assigned.to_string_result().unwrap();
    assert_eq!(text.trim(), expected_result);
    Ok(())
}
#[tokio::test]
// Spawns a long sleeper in the background, then runs `ls` plus one `ls` job
// per listed entry while the sleeper is still running. If everything really
// overlaps, total wall time stays close to the sleeper's duration.
async fn test_complex_load() -> TestResult {
    const SLEEP_SECS: u64 = 1;
    let now = SystemTime::now();
    // Longest job runs detached; we join it at the end.
    let longest_job = JobMeta::from_shell(format!("sleep {}", SLEEP_SECS)).unwrap();
    let longest_job = JobBuilder::from_meta(longest_job).unwrap_one().spawn().await;
    let ls = JobBuilder::from_meta(JobMeta::from_shell("ls")?).unwrap_one()
        .wait_one()
        .await;
    let ls = unwrap_enum!(ls, ExecResult::Assigned);
    assert_eq!(ls.retcode.unwrap(), 0);
    let folders = ls.to_string_result().unwrap();
    // One follow-up `ls` job per entry printed by the first one.
    let subfolders_jobs: Vec<JobMeta> = folders
        .lines()
        .map(|f| JobMeta::from_shell(format!("ls {}", f)).unwrap())
        .collect();
    let ls_subfolders = JobBuilder::from_meta(subfolders_jobs)
        .unwrap_one()
        .wait()
        .await;
    for result in ls_subfolders {
        let result = unwrap_enum!(result, ExecResult::Assigned);
        assert_eq!(result.retcode.unwrap(), 0);
    }
    longest_job.wait().await;
    // Was `assert_eq!(elapsed, SLEEP_SECS)`: exact wall-clock equality is
    // flaky on a loaded machine (`as_secs` truncates, so anything >= 2s total
    // failed). Assert the intended property instead — the batch overlapped
    // with the sleeper — with the same slack bound as test_is_really_async.
    let elapsed = now.elapsed().unwrap().as_secs();
    assert!(elapsed >= SLEEP_SECS && elapsed < SLEEP_SECS + 2);
    Ok(())
}
/*
#[tokio::test]
async fn test_exec_multiple_jobs_nowait() -> UResult<()> {
const REPEATS: usize = 10;
let job = JobMeta::from_shell("whoami");
let sleep_jobs: Vec<JobMeta> = (0..=REPEATS).map(|_| job.clone()).collect();
build_jobs(sleep_jobs).spawn().await;
let mut completed = 0;
while completed < REPEATS {
let c = pop_completed().await.len();
if c > 0 {
completed += c;
println!("{}", c);
}
}
Ok(())
}
*/
#[tokio::test]
async fn test_failing_shell_job() -> TestResult {
// A command name that should not exist anywhere on PATH.
let job = JobMeta::from_shell("lol_kek_puk")?;
let job_result = JobBuilder::from_meta(job)
.unwrap_one()
.wait_one()
.await;
// The job is still reported as Assigned even though the spawn failed.
let job_result = unwrap_enum!(job_result, ExecResult::Assigned);
let output = job_result.to_string_result().unwrap();
// The OS error text ends up in the captured output...
assert!(output.contains("No such file"));
// ...and a process that never started has no exit code.
assert!(job_result.retcode.is_none());
Ok(())
}
#[test_case(
"/bin/bash {}",
None,
"contains executable"
; "no binary"
)]
#[test_case(
"/bin/bash",
Some(b"whoami"),
"contains no executable"
; "no path to binary"
)]
#[tokio::test]
// Each case supplies an inconsistent cmd/payload pair; building the meta
// must fail with a JobArgsError whose message contains `err_str`.
async fn test_job_building_failed(cmd: &str, payload: Option<&[u8]>, err_str: &str) -> TestResult {
let mut job = JobMeta::builder().with_shell(cmd);
if let Some(p) = payload {
job = job.with_payload(p);
}
let err = job.build().unwrap_err();
let err_msg = unwrap_enum!(err, UError::JobArgsError);
assert!(err_msg.contains(err_str));
Ok(())
}
#[tokio::test]
async fn test_different_job_types() -> TestResult {
// Run a shell job and a Manage-type job side by side.
let mut jobs = NamedJobBuilder::from_meta(vec![
("sleeper", JobMeta::from_shell("sleep 3")?),
("gatherer", JobMeta::builder().with_type(JobType::Manage).build()?)
]).wait().await;
let gathered = jobs.pop("gatherer");
// The manage job yields an Agent result; alias is unset by default.
assert_eq!(unwrap_enum!(gathered, ExecResult::Agent).alias, None);
Ok(())
}

@ -1,43 +0,0 @@
use u_lib::{models::JobOutput, utils::vec_to_string};
// Markers that delimit the stdout/stderr sections of a combined output
// buffer, as exercised by the test cases below.
const STDOUT: &str = "<***STDOUT***>";
const STDERR: &str = "<***STDERR***>";
#[test_case(
"lol",
"kek",
&format!("{}lol{}kek", STDOUT, STDERR)
;"stdout stderr"
)]
#[test_case(
"",
"kek",
&format!("{}kek", STDERR)
;"stderr"
)]
// Feed both streams in, then check the marker-prefixed merge.
fn test_to_combined(stdout: &str, stderr: &str, result: &str) {
    let merged = JobOutput::new()
        .stdout(stdout.as_bytes().to_vec())
        .stderr(stderr.as_bytes().to_vec())
        .into_combined();
    assert_eq!(&vec_to_string(&merged), result)
}
#[test_case(
&format!("{}lal{}kik", STDOUT, STDERR),
"lal\nkik"
;"stdout stderr"
)]
#[test_case(
&format!("{}qeq", STDOUT),
"qeq"
;"stdout"
)]
#[test_case(
&format!("{}vev", STDERR),
"vev"
;"stderr"
)]
// Round-trip: parse a marker-tagged buffer back into separate streams.
fn test_from_combined(src: &str, result: &str) {
    let parsed = JobOutput::from_combined(src.as_bytes()).unwrap();
    let rendered = vec_to_string(&parsed.to_appropriate());
    assert_eq!(rendered.trim(), result);
}

@ -1,10 +0,0 @@
#[macro_use]
extern crate test_case;
#[macro_use]
extern crate u_lib;
// Integration-test entry point: each submodule groups related job tests.
mod jobs {
mod execution;
mod output;
}

@ -1,10 +0,0 @@
#!/bin/bash
# Run cargo inside the clux/muslrust container so binaries are built against
# musl. All script arguments are forwarded to cargo, e.g.
#   ./cargo_musl.sh build --release
set -e
source "$(dirname "$0")"/rootdir.sh #set ROOTDIR
# "$@" (quoted) preserves each argument as a single word; the unquoted $@
# used previously re-split arguments containing spaces. $ROOTDIR is quoted
# for the same reason. The cargo registry is cached in a named volume.
docker run \
    -v "$ROOTDIR":/volume \
    -v cargo-cache:/root/.cargo/registry \
    -w /volume \
    -it \
    clux/muslrust \
    cargo "$@"
Loading…
Cancel
Save