improve api, add integration tests

pull/9/head
plazmoid 2 years ago
parent ce708d0c98
commit d0d7d0aca5
  1. 699
      Cargo.lock
  2. 17
      bin/u_agent/src/lib.rs
  3. 16
      bin/u_panel/src/argparse.rs
  4. 6
      bin/u_panel/src/gui/mod.rs
  5. 4
      bin/u_panel/src/main.rs
  6. 12
      bin/u_server/src/db.rs
  7. 4
      integration/Cargo.toml
  8. 8
      integration/tests/fixtures/agent.rs
  9. 10
      integration/tests/fixtures/connections.rs
  10. 46
      integration/tests/integration/api.rs
  11. 1
      integration/tests/integration/mod.rs
  12. 54
      lib/u_lib/src/api.rs
  13. 2
      lib/u_lib/src/messaging.rs
  14. 16
      lib/u_lib/src/misc.rs
  15. 6
      lib/u_lib/src/models/jobs/meta.rs
  16. 2
      lib/u_lib/src/models/jobs/misc.rs
  17. 2
      lib/u_lib/src/models/payload.rs

699
Cargo.lock generated

File diff suppressed because it is too large Load Diff

@ -5,7 +5,7 @@ use std::process::exit;
use tokio::runtime::Builder;
use tokio::time::{sleep, Duration};
use u_lib::{
api::ClientHandler,
api::HttpClient,
cache::JobCache,
config::{get_self_id, EndpointsEnv, AGENT_ITERATION_INTERVAL},
error::ErrChan,
@ -16,13 +16,14 @@ use u_lib::{
models::AssignedJobById,
};
pub async fn process_request(jobs: Vec<AssignedJobById>, client: &ClientHandler) {
pub async fn process_request(jobs: Vec<AssignedJobById>, client: &HttpClient) {
if !jobs.is_empty() {
for jr in &jobs {
if !JobCache::contains(jr.job_id) {
info!("Fetching job: {}", &jr.job_id);
let fetched_job = loop {
match client.get_job(jr.job_id).await {
//todo: use payload cache
match client.get_job(jr.job_id, true).await {
Ok(result) => break result,
Err(err) => {
debug!("{:?} \nretrying...", err);
@ -58,12 +59,12 @@ pub async fn process_request(jobs: Vec<AssignedJobById>, client: &ClientHandler)
}
}
async fn error_reporting(client: ClientHandler) -> ! {
async fn error_reporting(client: HttpClient) -> ! {
loop {
match ErrChan::recv().await {
Some(err) => {
'retry: for _ in 0..3 {
match client.report(Reportable::Error(err.clone())).await {
match client.report(&Reportable::Error(err.clone())).await {
Ok(_) => break 'retry,
Err(e) => {
debug!("Reporting error: {:?}", e);
@ -77,7 +78,7 @@ async fn error_reporting(client: ClientHandler) -> ! {
}
}
async fn agent_loop(client: ClientHandler) -> ! {
async fn agent_loop(client: HttpClient) -> ! {
let self_id = get_self_id();
loop {
match client.get_personal_jobs(self_id).await {
@ -97,7 +98,7 @@ async fn agent_loop(client: ClientHandler) -> ! {
.collect();
if !result.is_empty() {
if let Err(err) = client.report(result).await {
if let Err(err) = client.report(&result).await {
ErrChan::send(err, "report").await;
}
}
@ -131,7 +132,7 @@ pub fn run_forever() -> ! {
.build()
.unwrap()
.block_on(async {
match ClientHandler::new(&env.u_server, None).await {
match HttpClient::new(&env.u_server, None).await {
Ok(client) => {
tokio::spawn(error_reporting(client.clone()));
agent_loop(client).await

@ -1,7 +1,7 @@
use serde_json::{from_str, to_value, Value};
use structopt::StructOpt;
use u_lib::{
api::ClientHandler,
api::HttpClient,
jobs::join_payload,
messaging::AsMsg,
models::{Agent, AssignedJob, RawJob},
@ -75,14 +75,14 @@ pub fn into_value<M: AsMsg>(data: M) -> Value {
to_value(data).unwrap()
}
pub async fn process_cmd(client: ClientHandler, args: Args) -> PanelResult<Value> {
pub async fn process_cmd(client: HttpClient, args: Args) -> PanelResult<Value> {
let catcher: UResult<Value> = (|| async {
Ok(match args.cmd {
Cmd::Agents(action) => match action {
RUD::Read { id } => into_value(client.get_agents(id).await?),
RUD::Update { item } => {
let agent = from_str::<Agent>(&item)?;
into_value(client.update_agent(agent).await?)
into_value(client.update_agent(&agent).await?)
}
RUD::Delete { id } => into_value(client.del(id).await?),
},
@ -92,17 +92,17 @@ pub async fn process_cmd(client: ClientHandler, args: Args) -> PanelResult<Value
let job = raw_job.validated()?;
let fat_job = join_payload(job)?;
into_value(client.upload_jobs(fat_job).await?)
into_value(client.upload_jobs(&fat_job).await?)
}
JobCRUD::RUD(RUD::Read { id }) => match id {
//todo: use vec not to break frontend api, possibly refactor later
Some(id) => into_value(vec![client.get_job(id).await?]),
Some(id) => into_value(vec![client.get_job(id, false).await?]),
None => into_value(client.get_jobs().await?),
},
JobCRUD::RUD(RUD::Update { item }) => {
let raw_job = from_str::<RawJob>(&item)?;
let job = raw_job.validated()?;
into_value(client.update_job(join_payload(job)?).await?)
into_value(client.update_job(&join_payload(job)?).await?)
}
JobCRUD::RUD(RUD::Delete { id }) => into_value(client.del(id).await?),
},
@ -110,11 +110,11 @@ pub async fn process_cmd(client: ClientHandler, args: Args) -> PanelResult<Value
JobMapCRUD::Create {
agent_id,
job_idents,
} => into_value(client.set_jobs(agent_id, job_idents).await?),
} => into_value(client.set_jobs(agent_id, &job_idents).await?),
JobMapCRUD::RUD(RUD::Read { id }) => into_value(client.get_agent_jobs(id).await?),
JobMapCRUD::RUD(RUD::Update { item }) => {
let assigned = from_str::<AssignedJob>(&item)?;
into_value(client.update_result(assigned).await?)
into_value(client.update_result(&assigned).await?)
}
JobMapCRUD::RUD(RUD::Delete { id }) => into_value(client.del(id).await?),
},

@ -8,7 +8,7 @@ use futures_util::StreamExt;
use rust_embed::RustEmbed;
use std::borrow::Cow;
use structopt::StructOpt;
use u_lib::{api::ClientHandler, unwrap_enum};
use u_lib::{api::HttpClient, unwrap_enum};
#[derive(RustEmbed)]
#[folder = "./src/gui/fe/dist/fe/"]
@ -42,7 +42,7 @@ async fn resources_adapter(path: web::Path<(String,)>) -> impl Responder {
#[post("/cmd/")]
async fn send_cmd(
mut body: web::Payload,
client: web::Data<ClientHandler>,
client: web::Data<HttpClient>,
) -> Result<impl Responder, Error> {
let mut bytes = web::BytesMut::new();
@ -74,7 +74,7 @@ async fn send_cmd(
Ok(response)
}
pub async fn serve(client: ClientHandler) -> anyhow::Result<()> {
pub async fn serve(client: HttpClient) -> anyhow::Result<()> {
info!("Connecting to u_server...");
client.ping().await?;

@ -7,14 +7,14 @@ extern crate tracing;
use anyhow::Result as AnyResult;
use argparse::{process_cmd, Args};
use structopt::StructOpt;
use u_lib::api::ClientHandler;
use u_lib::api::HttpClient;
use u_lib::config::AccessEnv;
use u_lib::logging::init_logger;
#[actix_web::main]
async fn main() -> AnyResult<()> {
let env = AccessEnv::load()?;
let client = ClientHandler::new(&env.u_server, Some(env.admin_auth_token)).await?;
let client = HttpClient::new(&env.u_server, Some(env.admin_auth_token)).await?;
let args = Args::from_args();
init_logger(None::<&str>);

@ -250,9 +250,15 @@ impl UDB<'_> {
}
pub fn upsert_agent(&mut self, agent: &Agent) -> Result<()> {
agent
.save_changes::<Agent>(self.conn)
.map_err(with_err_ctx(format!("Can't upsert agent {agent:?}")))?;
use schema::agents;
diesel::insert_into(agents::table)
.values(agent)
.on_conflict(agents::id)
.do_update()
.set(agent)
.execute(self.conn)
.map_err(with_err_ctx(format!("Can't insert agent {agent:?}")))?;
Ok(())
}

@ -10,14 +10,14 @@ edition = "2021"
futures = { version = "0.3", features = ["executor"] }
once_cell = "1.10.0"
reqwest = { workspace = true }
rstest = "0.12"
rstest = "0.17"
serde = { workspace = true }
serde_json = { workspace = true }
shlex = "1.0.0"
tokio = { workspace = true, features = ["macros", "rt-multi-thread", "process", "time"] }
tracing = { workspace = true }
uuid = { workspace = true, features = ["serde", "v4"] }
u_lib = { path = "../lib/u_lib", features = ["panel"] }
u_lib = { path = "../lib/u_lib", features = ["panel", "server"] }
[[test]]

@ -1,7 +1,7 @@
use super::connections::*;
use super::run_async;
use u_lib::{
api::ClientHandler, config::get_self_id, jobs::split_payload, messaging::Reportable, models::*,
api::HttpClient, config::get_self_id, jobs::split_payload, messaging::Reportable, models::*,
types::Id,
};
@ -11,7 +11,7 @@ pub struct RegisteredAgent {
#[fixture]
#[once]
pub fn registered_agent(client: &ClientHandler) -> RegisteredAgent {
pub fn registered_agent(client: &HttpClient) -> RegisteredAgent {
run_async(async {
let agent_id = get_self_id();
println!("registering agent {agent_id}");
@ -22,12 +22,12 @@ pub fn registered_agent(client: &ClientHandler) -> RegisteredAgent {
.pop()
.unwrap();
let job_id = resp.job_id;
let job = client.get_job(job_id).await.unwrap();
let job = client.get_job(job_id, true).await.unwrap();
assert_eq!(job.job.alias, Some("agent_hello".to_string()));
let mut agent_data = AssignedJob::from((&split_payload(job).unwrap().job, resp));
agent_data.set_result(&Agent::with_id(agent_id));
client
.report(Reportable::Assigned(agent_data))
.report(&Reportable::Assigned(agent_data))
.await
.unwrap();
RegisteredAgent { id: agent_id }

@ -1,20 +1,20 @@
use super::env::*;
use super::run_async;
pub use u_lib::api::ClientHandler;
pub use u_lib::api::HttpClient;
use u_lib::db::unpooled;
pub use u_lib::db::PgConnection;
#[fixture]
#[once]
pub fn client(env_default: EndpointsEnv) -> ClientHandler {
run_async(ClientHandler::new(&env_default.u_server, None)).unwrap()
pub fn client(env_default: EndpointsEnv) -> HttpClient {
run_async(HttpClient::new(&env_default.u_server, None)).unwrap()
}
#[fixture]
#[once]
pub fn client_panel(env_access: AccessEnv) -> ClientHandler {
run_async(ClientHandler::new(
pub fn client_panel(env_access: AccessEnv) -> HttpClient {
run_async(HttpClient::new(
&env_access.u_server,
Some(env_access.admin_auth_token),
))

@ -1 +1,45 @@
//async fn
// get_personal_jobs(&self, url_param: Id)
// report(&self, payload: impl OneOrVec<messaging::Reportable>)
// dl(&self, file: String)
// get_job(&self, job: Id)
// get_jobs(&self)
// get_agents(&self, agent: Option<Id>)
// update_agent(&self, agent: Agent)
// update_job(&self, job: FatJob)
// update_result(&self, result: AssignedJob)
// upload_jobs(&self, payload: impl OneOrVec<FatJob>)
// del(&self, item: Id)
// set_jobs(&self, agent: Id, job_idents: impl OneOrVec<String>)
// get_agent_jobs(&self, agent: Option<Id>)
// ping(&self)
use crate::fixtures::connections::*;
use u_lib::jobs::join_payload;
use u_lib::models::RawJob;
#[rstest]
#[tokio::test]
async fn test_jobs_endpoints(client_panel: &HttpClient) {
let job_alias = "henlo";
let job = RawJob::builder()
.with_shell("echo henlo")
.with_alias(job_alias)
.build()
.unwrap();
let job_id = job.job.id;
let mut fat_job = join_payload(job).unwrap();
client_panel.upload_jobs(&fat_job).await.unwrap();
let fetched_job = client_panel.get_job(job_id, false).await.unwrap();
assert_eq!(fat_job, fetched_job);
fat_job.job.alias = Some("henlo2".to_string());
client_panel.update_job(&fat_job).await.unwrap();
let fetched_job = client_panel.get_job(job_id, false).await.unwrap();
assert_eq!(fat_job, fetched_job);
client_panel.del(job_id).await.unwrap();
client_panel.get_job(job_id, false).await.unwrap(); // should fail with 404
}

@ -1,2 +1,3 @@
mod api;
mod behaviour;
mod connection;

@ -1,6 +1,6 @@
use std::collections::HashMap;
use std::fmt::Debug;
use std::net::SocketAddr;
use std::{collections::HashMap, time::Duration};
use anyhow::{Context, Result};
use reqwest::{header, header::HeaderMap, Certificate, Client, Identity, Method, Url};
@ -11,7 +11,7 @@ use crate::{
config::{get_self_id, MASTER_PORT},
conv::opt_to_string,
messaging::{self, AsMsg},
misc::OneOrVec,
misc::OneOrVecRef,
models::*,
types::Id,
UError, UResult,
@ -21,12 +21,12 @@ const AGENT_IDENTITY: &[u8] = include_bytes!("../../../certs/alice.p12");
const ROOT_CA_CERT: &[u8] = include_bytes!("../../../certs/ca.crt");
#[derive(Clone, Debug)]
pub struct ClientHandler {
pub struct HttpClient {
base_url: Url,
client: Client,
}
impl ClientHandler {
impl HttpClient {
pub async fn new(server: &str, password: Option<String>) -> UResult<Self> {
let identity = Identity::from_pkcs12_der(AGENT_IDENTITY, "").unwrap();
let mut default_headers =
@ -41,7 +41,8 @@ impl ClientHandler {
let client = Client::builder()
.identity(identity)
.default_headers(HeaderMap::try_from(&default_headers).unwrap())
.add_root_certificate(Certificate::from_pem(ROOT_CA_CERT).unwrap());
.add_root_certificate(Certificate::from_pem(ROOT_CA_CERT).unwrap())
.timeout(Duration::from_secs(20));
let dns_response = Client::new()
.request(
@ -76,18 +77,18 @@ impl ClientHandler {
}
async fn req<R: AsMsg + DeserializeOwned>(&self, url: impl AsRef<str>) -> Result<R> {
self.req_with_payload(url, ()).await
self.req_with_payload(url, &()).await
}
async fn req_with_payload<P: AsMsg, R: AsMsg + DeserializeOwned>(
&self,
url: impl AsRef<str>,
payload: P,
payload: &P,
) -> Result<R> {
let request = self
.client
.post(self.base_url.join(url.as_ref()).unwrap())
.json(&payload);
.json(payload);
let response = request
.send()
@ -112,23 +113,24 @@ impl ClientHandler {
}
// get jobs for client
pub async fn get_personal_jobs(&self, url_param: Id) -> Result<Vec<AssignedJobById>> {
self.req(format!("get_personal_jobs/{}", url_param)).await
pub async fn get_personal_jobs(&self, agent_id: Id) -> Result<Vec<AssignedJobById>> {
self.req(format!("get_personal_jobs/{}", agent_id)).await
}
// send something to server
pub async fn report(&self, payload: impl OneOrVec<messaging::Reportable>) -> Result<()> {
self.req_with_payload("report", payload.into_vec()).await
pub async fn report(&self, payload: impl OneOrVecRef<messaging::Reportable>) -> Result<()> {
self.req_with_payload("report", &payload.as_vec()).await
}
// download file
pub async fn dl(&self, file: String) -> Result<Vec<u8>> {
// download payload
pub async fn dl(&self, file: &str) -> Result<Vec<u8>> {
self.req(format!("dl/{file}")).await
}
/// get exact job
pub async fn get_job(&self, job: Id) -> Result<FatJob> {
self.req(format!("get_job/{job}")).await
pub async fn get_job(&self, job: Id, force_payload: bool) -> Result<FatJob> {
self.req(format!("get_job/{job}?force_payload={force_payload}"))
.await
}
/// get all available jobs
@ -139,7 +141,7 @@ impl ClientHandler {
//##########// Admin area //##########//
#[cfg(feature = "panel")]
impl ClientHandler {
impl HttpClient {
/// agent listing
pub async fn get_agents(&self, agent: Option<Id>) -> Result<Vec<Agent>> {
self.req(format!("get_agents/{}", opt_to_string(agent)))
@ -147,23 +149,23 @@ impl ClientHandler {
}
/// update agent
pub async fn update_agent(&self, agent: Agent) -> Result<()> {
pub async fn update_agent(&self, agent: &Agent) -> Result<()> {
self.req_with_payload("update_agent", agent).await
}
/// update job
pub async fn update_job(&self, job: FatJob) -> Result<()> {
pub async fn update_job(&self, job: &FatJob) -> Result<()> {
self.req_with_payload("update_job", job).await
}
/// update result
pub async fn update_result(&self, result: AssignedJob) -> Result<()> {
pub async fn update_result(&self, result: &AssignedJob) -> Result<()> {
self.req_with_payload("update_result", result).await
}
/// create and upload job
pub async fn upload_jobs(&self, payload: impl OneOrVec<FatJob>) -> Result<Vec<Id>> {
self.req_with_payload("upload_jobs", payload.into_vec())
pub async fn upload_jobs(&self, payload: impl OneOrVecRef<FatJob>) -> Result<Vec<Id>> {
self.req_with_payload("upload_jobs", &payload.as_vec())
.await
}
@ -173,8 +175,12 @@ impl ClientHandler {
}
/// set jobs for any agent
pub async fn set_jobs(&self, agent: Id, job_idents: impl OneOrVec<String>) -> Result<Vec<Id>> {
self.req_with_payload(format!("set_jobs/{agent}"), job_idents.into_vec())
pub async fn set_jobs(
&self,
agent: Id,
job_idents: impl OneOrVecRef<String>,
) -> Result<Vec<Id>> {
self.req_with_payload(format!("set_jobs/{agent}"), &job_idents.as_vec())
.await
}

@ -23,6 +23,8 @@ impl AsMsg for () {}
impl<M: AsMsg> AsMsg for Vec<M> {}
impl<'msg, M: AsMsg> AsMsg for &'msg [M] {}
impl<M: AsMsg> AsMsg for &M {}
#[derive(Serialize, Deserialize, Clone, Debug)]
pub enum Reportable {
Assigned(AssignedJob),

@ -14,6 +14,22 @@ impl<T> OneOrVec<T> for Vec<T> {
}
}
pub trait OneOrVecRef<T> {
fn as_vec(&self) -> Vec<&T>;
}
impl<T> OneOrVecRef<T> for &T {
fn as_vec(&self) -> Vec<&T> {
vec![self]
}
}
impl<T> OneOrVecRef<T> for &Vec<T> {
fn as_vec(&self) -> Vec<&T> {
self.iter().collect()
}
}
#[macro_export]
macro_rules! unwrap_enum {
($src:expr, $t:path) => {

@ -14,7 +14,7 @@ use serde::{Deserialize, Serialize};
use std::fs::metadata;
use std::process::Command;
#[derive(Debug, Clone, Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[cfg_attr(
feature = "server",
derive(Queryable, Identifiable, Insertable, AsChangeset),
@ -34,14 +34,14 @@ pub struct JobModel {
pub schedule: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct FatJob {
pub job: JobModel,
pub payload_meta: Option<PayloadMeta>,
pub payload_data: Option<Vec<u8>>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct ThinJob {
pub job: JobModel,
pub payload_meta: Option<PayloadMeta>,

@ -26,7 +26,7 @@ pub enum JobState {
Finished,
}
#[derive(Default, Serialize, Deserialize, Clone, Copy, Debug, PartialEq, Display)]
#[derive(Default, Serialize, Deserialize, Clone, Copy, Debug, PartialEq, Eq, Display)]
#[cfg_attr(
feature = "server",
derive(DbEnum),

@ -11,7 +11,7 @@ use diesel::Identifiable;
derive(Insertable, Queryable, Identifiable),
diesel(table_name = payloads)
)]
#[derive(Debug, Clone, Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct PayloadMeta {
pub id: Id,
pub mime_type: String,

Loading…
Cancel
Save