add readable fmt instead of hexlify every debug output

master
plazmoid 2 years ago
parent 4216f74876
commit a21bc40323
  1. 34
      Cargo.lock
  2. 14
      bin/u_panel/src/argparse.rs
  3. 32
      bin/u_server/src/db.rs
  4. 26
      bin/u_server/src/handlers.rs
  5. 26
      bin/u_server/src/u_server.rs
  6. 6
      images/u_agent.Dockerfile
  7. 2
      integration/docker-compose.yml
  8. 2
      integration/integration_tests.py
  9. 5
      integration/tests/fixtures/agent.rs
  10. 6
      integration/tests/helpers/panel.rs
  11. 4
      integration/tests/integration/behaviour.rs
  12. 2
      lib/u_lib/Cargo.toml
  13. 56
      lib/u_lib/src/api.rs
  14. 12
      lib/u_lib/src/cache.rs
  15. 7
      lib/u_lib/src/config.rs
  16. 17
      lib/u_lib/src/conv.rs
  17. 2
      lib/u_lib/src/lib.rs
  18. 4
      lib/u_lib/src/messaging/mod.rs
  19. 37
      lib/u_lib/src/models/agent.rs
  20. 70
      lib/u_lib/src/models/jobs/assigned.rs
  21. 44
      lib/u_lib/src/models/jobs/meta.rs
  22. 4
      lib/u_lib/src/models/jobs/misc.rs
  23. 12
      lib/u_lib/src/models/payload.rs
  24. 2
      lib/u_lib/src/types.rs
  25. 5
      lib/u_lib/src/ufs/mod.rs

34
Cargo.lock generated

@ -182,7 +182,7 @@ dependencies = [
"serde_urlencoded",
"smallvec",
"socket2",
"time 0.3.17",
"time 0.3.18",
"url",
]
@ -462,7 +462,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e859cd57d0710d9e06c381b550c06e76992472a8c6d527aecd2fc673dcc231fb"
dependencies = [
"percent-encoding",
"time 0.3.17",
"time 0.3.18",
"version_check",
]
@ -555,9 +555,9 @@ dependencies = [
[[package]]
name = "cxx"
version = "1.0.90"
version = "1.0.91"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "90d59d9acd2a682b4e40605a242f6670eaa58c5957471cbf85e8aa6a0b97a5e8"
checksum = "86d3488e7665a7a483b57e25bdd90d0aeb2bc7608c8d0346acf2ad3f1caf1d62"
dependencies = [
"cc",
"cxxbridge-flags",
@ -567,9 +567,9 @@ dependencies = [
[[package]]
name = "cxx-build"
version = "1.0.90"
version = "1.0.91"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ebfa40bda659dd5c864e65f4c9a2b0aff19bea56b017b9b77c73d3766a453a38"
checksum = "48fcaf066a053a41a81dfb14d57d99738b767febb8b735c3016e469fac5da690"
dependencies = [
"cc",
"codespan-reporting",
@ -582,15 +582,15 @@ dependencies = [
[[package]]
name = "cxxbridge-flags"
version = "1.0.90"
version = "1.0.91"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "457ce6757c5c70dc6ecdbda6925b958aae7f959bda7d8fb9bde889e34a09dc03"
checksum = "a2ef98b8b717a829ca5603af80e1f9e2e48013ab227b68ef37872ef84ee479bf"
[[package]]
name = "cxxbridge-macro"
version = "1.0.90"
version = "1.0.91"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ebf883b7aacd7b2aeb2a7b338648ee19f57c140d4ee8e52c68979c6b2f7f2263"
checksum = "086c685979a698443656e5cf7856c95c642295a38599f12fb1ff76fb28d19892"
dependencies = [
"proc-macro2",
"quote",
@ -1257,9 +1257,9 @@ checksum = "201de327520df007757c1f0adce6e827fe8562fbc28bfd9c15571c66ca1f5f79"
[[package]]
name = "libflate"
version = "1.2.0"
version = "1.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "05605ab2bce11bcfc0e9c635ff29ef8b2ea83f29be257ee7d730cac3ee373093"
checksum = "97822bf791bd4d5b403713886a5fbe8bf49520fe78e323b0dc480ca1a03e50b0"
dependencies = [
"adler32",
"crc32fast",
@ -1268,9 +1268,9 @@ dependencies = [
[[package]]
name = "libflate_lz77"
version = "1.1.0"
version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "39a734c0493409afcd49deee13c006a04e3586b9761a03543c6272c9c51f2f5a"
checksum = "a52d3a8bfc85f250440e4424db7d857e241a3aebbbe301f3eb606ab15c39acbf"
dependencies = [
"rle-decode-fast",
]
@ -2345,9 +2345,9 @@ dependencies = [
[[package]]
name = "time"
version = "0.3.17"
version = "0.3.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a561bf4617eebd33bca6434b988f39ed798e527f51a1e797d0ee4f61c0a38376"
checksum = "af0097eaf301d576d0b2aead7a59facab6d53cc636340f0291fab8446a2e8613"
dependencies = [
"itoa",
"serde",
@ -2509,7 +2509,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "09d48f71a791638519505cefafe162606f706c25592e4bde4d97600c0195312e"
dependencies = [
"crossbeam-channel",
"time 0.3.17",
"time 0.3.18",
"tracing-subscriber",
]

@ -2,12 +2,12 @@ use serde_json::{from_str, to_value, Value};
use structopt::StructOpt;
use u_lib::{
api::ClientHandler,
datatypes::PanelResult,
messaging::AsMsg,
models::{Agent, AssignedJob, FatJobMeta},
types::Id,
types::PanelResult,
UError, UResult,
};
use uuid::Uuid;
#[derive(StructOpt, Debug)]
pub struct Args {
@ -43,7 +43,7 @@ enum JobCmd {
enum JobMapCRUD {
Create {
#[structopt(parse(try_from_str = parse_uuid))]
agent_id: Uuid,
agent_id: Id,
job_idents: Vec<String>,
},
@ -55,19 +55,19 @@ enum JobMapCRUD {
enum RUD {
Read {
#[structopt(parse(try_from_str = parse_uuid))]
id: Option<Uuid>,
id: Option<Id>,
},
Update {
item: String,
},
Delete {
#[structopt(parse(try_from_str = parse_uuid))]
id: Uuid,
id: Id,
},
}
fn parse_uuid(src: &str) -> Result<Uuid, String> {
Uuid::parse_str(src).map_err(|e| e.to_string())
fn parse_uuid(src: &str) -> Result<Id, String> {
Id::parse_str(src).map_err(|e| e.to_string())
}
pub fn into_value<M: AsMsg>(data: M) -> Value {

@ -3,7 +3,7 @@ use diesel::{pg::PgConnection, prelude::*, result::Error as DslError, Connection
use u_lib::db::PgAsyncPool;
use u_lib::models::{schema, Agent, AssignedJob, JobState, ThinJobMeta};
use u_lib::platform::Platform;
use uuid::Uuid;
use u_lib::types::Id;
type Result<T> = std::result::Result<T, Error>;
@ -47,7 +47,7 @@ pub struct UDB<'c> {
}
impl UDB<'_> {
pub fn insert_jobs(&mut self, job_metas: &[ThinJobMeta]) -> Result<Vec<Uuid>> {
pub fn insert_jobs(&mut self, job_metas: &[ThinJobMeta]) -> Result<Vec<Id>> {
use schema::jobs;
diesel::insert_into(jobs::table)
@ -57,7 +57,7 @@ impl UDB<'_> {
.map_err(with_err_ctx("Can't insert jobs"))
}
pub fn get_job(&mut self, id: Uuid) -> Result<Option<ThinJobMeta>> {
pub fn get_job(&mut self, id: Id) -> Result<Option<ThinJobMeta>> {
use schema::jobs;
jobs::table
@ -94,7 +94,7 @@ impl UDB<'_> {
.do_update()
.set(agent)
.execute(self.conn)
.map_err(with_err_ctx(format!("Can't insert agent {agent:x?}")))?;
.map_err(with_err_ctx(format!("Can't insert agent {agent:?}")))?;
Ok(())
}
@ -104,11 +104,11 @@ impl UDB<'_> {
diesel::insert_into(results::table)
.values(result)
.execute(self.conn)
.map_err(with_err_ctx(format!("Can't insert result {result:x?}")))?;
.map_err(with_err_ctx(format!("Can't insert result {result:?}")))?;
Ok(())
}
pub fn get_agent(&mut self, id: Uuid) -> Result<Option<Agent>> {
pub fn get_agent(&mut self, id: Id) -> Result<Option<Agent>> {
use schema::agents;
agents::table
@ -126,7 +126,7 @@ impl UDB<'_> {
.map_err(with_err_ctx(format!("Can't get agents")))
}
pub fn update_job_status(&mut self, id: Uuid, status: JobState) -> Result<()> {
pub fn update_job_status(&mut self, id: Id, status: JobState) -> Result<()> {
use schema::results;
diesel::update(results::table)
@ -138,7 +138,7 @@ impl UDB<'_> {
}
//TODO: filters possibly could work in a wrong way, check
pub fn get_exact_jobs(&mut self, id: Option<Uuid>, personal: bool) -> Result<Vec<AssignedJob>> {
pub fn get_exact_jobs(&mut self, id: Option<Id>, personal: bool) -> Result<Vec<AssignedJob>> {
use schema::results;
let mut q = results::table.into_boxed();
@ -163,7 +163,7 @@ impl UDB<'_> {
Ok(result)
}
pub fn set_jobs_for_agent(&mut self, agent_id: Uuid, job_ids: &[Uuid]) -> Result<Vec<Uuid>> {
pub fn set_jobs_for_agent(&mut self, agent_id: Id, job_ids: &[Id]) -> Result<Vec<Id>> {
use schema::{jobs, results};
let agent_platform = match self.get_agent(agent_id)? {
@ -178,7 +178,7 @@ impl UDB<'_> {
let jobs_meta = jobs::table
.select((jobs::id, jobs::alias, jobs::platform))
.filter(jobs::id.eq_any(job_ids))
.load::<(Uuid, Option<String>, String)>(self.conn)
.load::<(Id, Option<String>, String)>(self.conn)
.map_err(with_err_ctx(format!("Can't find jobs {job_ids:?}")))?;
for meta in &jobs_meta {
@ -210,7 +210,7 @@ impl UDB<'_> {
Ok(job_requests.iter().map(|aj| aj.id).collect())
}
pub fn del_jobs(&mut self, ids: &[Uuid]) -> Result<usize> {
pub fn del_jobs(&mut self, ids: &[Id]) -> Result<usize> {
use schema::jobs;
let mut affected = 0;
@ -224,7 +224,7 @@ impl UDB<'_> {
Ok(affected)
}
pub fn del_results(&mut self, ids: &[Uuid]) -> Result<usize> {
pub fn del_results(&mut self, ids: &[Id]) -> Result<usize> {
use schema::results;
let mut affected = 0;
@ -238,7 +238,7 @@ impl UDB<'_> {
Ok(affected)
}
pub fn del_agents(&mut self, ids: &[Uuid]) -> Result<usize> {
pub fn del_agents(&mut self, ids: &[Id]) -> Result<usize> {
use schema::agents;
let mut affected = 0;
@ -255,13 +255,13 @@ impl UDB<'_> {
pub fn update_agent(&mut self, agent: &Agent) -> Result<()> {
agent
.save_changes::<Agent>(self.conn)
.map_err(with_err_ctx(format!("Can't update agent {agent:x?}")))?;
.map_err(with_err_ctx(format!("Can't update agent {agent:?}")))?;
Ok(())
}
pub fn update_job(&mut self, job: &ThinJobMeta) -> Result<()> {
job.save_changes::<ThinJobMeta>(self.conn)
.map_err(with_err_ctx(format!("Can't update job {job:x?}")))?;
.map_err(with_err_ctx(format!("Can't update job {job:?}")))?;
Ok(())
}
@ -272,7 +272,7 @@ impl UDB<'_> {
);
result
.save_changes::<AssignedJob>(self.conn)
.map_err(with_err_ctx(format!("Can't update result {result:x?}")))?;
.map_err(with_err_ctx(format!("Can't update result {result:?}")))?;
Ok(())
}
}

@ -3,13 +3,13 @@ use std::sync::Arc;
use crate::db::{PgRepo, UDB};
use crate::error::Error;
use crate::ValidJobMeta;
use u_lib::jobs::{fat_meta_to_thin, thin_meta_to_fat};
use u_lib::{
jobs::{fat_meta_to_thin, thin_meta_to_fat},
messaging::{AsMsg, Reportable},
misc::OneOrVec,
models::*,
types::Id
};
use uuid::Uuid;
use warp::reject::not_found;
use warp::Rejection;
@ -18,7 +18,7 @@ type EndpResult<T> = Result<T, Rejection>;
pub struct Endpoints;
impl Endpoints {
pub async fn get_agents(repo: Arc<PgRepo>, id: Option<Uuid>) -> EndpResult<Vec<Agent>> {
pub async fn get_agents(repo: Arc<PgRepo>, id: Option<Id>) -> EndpResult<Vec<Agent>> {
repo.interact(move |mut db| {
Ok(match id {
Some(id) => {
@ -35,7 +35,7 @@ impl Endpoints {
.map_err(From::from)
}
pub async fn get_job(repo: Arc<PgRepo>, id: Uuid) -> EndpResult<ValidJobMeta> {
pub async fn get_job(repo: Arc<PgRepo>, id: Id) -> EndpResult<ValidJobMeta> {
let Some(job) = repo.interact(move |mut db| db.get_job(id)).await? else {
return Err(not_found())
};
@ -52,14 +52,14 @@ impl Endpoints {
pub async fn get_agent_jobs(
repo: Arc<PgRepo>,
id: Option<Uuid>,
id: Option<Id>,
) -> EndpResult<Vec<AssignedJob>> {
repo.interact(move |mut db| db.get_exact_jobs(id, false))
.await
.map_err(From::from)
}
pub async fn get_personal_jobs(repo: Arc<PgRepo>, id: Uuid) -> EndpResult<Vec<AssignedJob>> {
pub async fn get_personal_jobs(repo: Arc<PgRepo>, id: Id) -> EndpResult<Vec<AssignedJob>> {
repo.transaction(move |mut db| {
let agent = db.get_agent(id)?;
match agent {
@ -95,7 +95,7 @@ impl Endpoints {
pub async fn upload_jobs(
repo: Arc<PgRepo>,
msg: Vec<ValidJobMeta>,
) -> EndpResult<Vec<Uuid>> {
) -> EndpResult<Vec<Id>> {
let jobs = msg
.into_iter()
.map(|meta| Ok(fat_meta_to_thin(meta)?))
@ -106,7 +106,7 @@ impl Endpoints {
.map_err(From::from)
}
pub async fn del(repo: Arc<PgRepo>, id: Uuid) -> EndpResult<usize> {
pub async fn del(repo: Arc<PgRepo>, id: Id) -> EndpResult<usize> {
repo.transaction(move |mut db| {
let del_fns = &[UDB::del_agents, UDB::del_jobs, UDB::del_results];
for del_fn in del_fns {
@ -123,14 +123,14 @@ impl Endpoints {
pub async fn set_jobs(
repo: Arc<PgRepo>,
agent_id: Uuid,
agent_id: Id,
job_idents: Vec<String>,
) -> EndpResult<Vec<Uuid>> {
) -> EndpResult<Vec<Id>> {
repo.transaction(move |mut db| {
job_idents
.into_iter()
.map(|ident| {
Uuid::parse_str(&ident).or_else(|_| {
Id::parse_str(&ident).or_else(|_| {
let job_from_db = db.find_job_by_alias(&ident);
match job_from_db {
Ok(job) => match job {
@ -143,7 +143,7 @@ impl Endpoints {
}
})
})
.collect::<Result<Vec<Uuid>, Error>>()
.collect::<Result<Vec<Id>, Error>>()
.and_then(|j| db.set_jobs_for_agent(agent_id, &j))
})
.await
@ -153,7 +153,7 @@ impl Endpoints {
pub async fn report<Data: OneOrVec<Reportable> + AsMsg + Send + Sync + 'static>(
repo: Arc<PgRepo>,
msg: Data,
agent_id: Uuid
agent_id: Id
) -> EndpResult<()> {
repo.transaction(move |mut db| {
for entry in msg.into_vec() {

@ -18,8 +18,8 @@ use u_lib::{
jobs::fat_meta_to_thin,
messaging::{AsMsg, Reportable},
models::*,
types::Id,
};
use uuid::Uuid;
use warp::{
body,
log::{custom, Info},
@ -40,7 +40,7 @@ pub fn init_endpoints(
db: PgRepo,
) -> impl Filter<Extract = (impl Reply,), Error = Rejection> + Clone {
let path = |p: &'static str| warp::post().and(warp::path(p));
let infallible_none = |_| async { Result::<(Option<Uuid>,), Infallible>::Ok((None,)) };
let infallible_none = |_| async { Result::<(Option<Id>,), Infallible>::Ok((None,)) };
let with_db = {
let adb = Arc::new(db);
@ -49,11 +49,7 @@ pub fn init_endpoints(
let get_agents = path("get_agents")
.and(with_db.clone())
.and(
warp::path::param::<Uuid>()
.map(Some)
.or_else(infallible_none),
)
.and(warp::path::param::<Id>().map(Some).or_else(infallible_none))
.and_then(Endpoints::get_agents)
.map(into_message);
@ -65,7 +61,7 @@ pub fn init_endpoints(
let get_job = path("get_job")
.and(with_db.clone())
.and(warp::path::param::<Uuid>())
.and(warp::path::param::<Id>())
.and_then(Endpoints::get_job)
.map(into_message);
@ -76,29 +72,25 @@ pub fn init_endpoints(
let get_agent_jobs = path("get_agent_jobs")
.and(with_db.clone())
.and(
warp::path::param::<Uuid>()
.map(Some)
.or_else(infallible_none),
)
.and(warp::path::param::<Id>().map(Some).or_else(infallible_none))
.and_then(Endpoints::get_agent_jobs)
.map(into_message);
let get_personal_jobs = path("get_personal_jobs")
.and(with_db.clone())
.and(warp::path::param::<Uuid>())
.and(warp::path::param::<Id>())
.and_then(Endpoints::get_personal_jobs)
.map(into_message);
let del = path("del")
.and(with_db.clone())
.and(warp::path::param::<Uuid>())
.and(warp::path::param::<Id>())
.and_then(Endpoints::del)
.map(ok);
let set_jobs = path("set_jobs")
.and(with_db.clone())
.and(warp::path::param::<Uuid>())
.and(warp::path::param::<Id>())
.and(body::json::<Vec<String>>())
.and_then(Endpoints::set_jobs)
.map(into_message);
@ -206,7 +198,7 @@ pub async fn serve() -> Result<(), ServerError> {
async fn handle_rejection(rej: Rejection) -> Result<Response, Infallible> {
let resp = if let Some(err) = rej.find::<ServerError>() {
error!("{:x?}", err);
error!("{:?}", err);
RejResponse::bad_request(err.to_string())
} else if rej.is_not_found() {
RejResponse::not_found("not found placeholder")

@ -1,3 +1,5 @@
FROM alpine:3.17
FROM ubuntu:xenial
RUN apk add bash
RUN apt update && apt upgrade -y
#todo: without this, request to 1.1.1.1 fails due to invalid cert (?), research more
RUN apt install curl -y

@ -66,7 +66,7 @@ services:
- ../target/x86_64-unknown-linux-musl/${PROFILE:-debug}/u_agent:/unki/u_agent
- ../logs:/unki/logs:rw
working_dir: /unki
command: bash -c "/unki/u_agent u_server && sleep 3600"
command: bash -c "/unki/u_agent u_server; sleep 3600"
env_file:
- ../.env
environment:

@ -64,7 +64,7 @@ def run_tests():
if not only_setup_cluster:
CLUSTER.run('cargo test --test integration')
except Exception as e:
CLUSTER.print_containers_logs()
#CLUSTER.print_containers_logs()
fail(e)
finally:
_cleanup()

@ -1,12 +1,11 @@
use crate::helpers::ENV;
use u_lib::{
api::ClientHandler, config::get_self_id, jobs::fat_meta_to_thin, messaging::Reportable,
models::*,
models::*, types::Id,
};
use uuid::Uuid;
pub struct RegisteredAgent {
pub id: Uuid,
pub id: Id,
}
impl RegisteredAgent {

@ -2,7 +2,7 @@ use serde::de::DeserializeOwned;
use serde_json::{from_slice, Value};
use std::fmt::{Debug, Display};
use std::process::{Command, Output};
use u_lib::{conv::bytes_to_string, datatypes::PanelResult, proc_output::ProcOutput};
use u_lib::{conv::bytes_to_string, proc_output::ProcOutput, types::PanelResult};
const PANEL_BINARY: &str = "/u_panel";
@ -40,8 +40,8 @@ impl Panel {
.as_ref(),
);
match &result {
PanelResult::Ok(r) => eprintln!("+<< {r:02x?}"),
PanelResult::Err(e) => eprintln!("!<< {e:02x?}"),
PanelResult::Ok(r) => eprintln!("+<< {r:?}"),
PanelResult::Err(e) => eprintln!("!<< {e:?}"),
}
result
}

@ -31,7 +31,7 @@ async fn setup_tasks() {
let cmd = format!("map create {} {}", agent_id, job_alias);
let assigned_ids: Vec<Uuid> = Panel::check_output(cmd);
retry_with_interval(3, AGENT_ITERATION_INTERVAL, || {
retry_with_interval(5, AGENT_ITERATION_INTERVAL, || {
let result =
Panel::check_output::<Vec<AssignedJob>>(format!("map read {}", assigned_ids[0]))
.remove(0);
@ -64,7 +64,7 @@ async fn large_payload() {
let cmd = format!("map create {agent_id} {job_alias}");
let assigned_ids: Vec<Uuid> = Panel::check_output(cmd);
retry_with_interval(3, AGENT_ITERATION_INTERVAL, || {
retry_with_interval(5, AGENT_ITERATION_INTERVAL, || {
let result =
Panel::check_output::<Vec<AssignedJob>>(format!("map read {}", assigned_ids[0]))
.remove(0);

@ -10,7 +10,7 @@ edition = "2021"
anyhow = { workspace = true }
chrono = "0.4.19"
diesel = { workspace = true, optional = true }
diesel-derive-enum = { version = "2.0.0-rc.0", features = ["postgres"], optional = true }
diesel-derive-enum = { version = "2.0.0", features = ["postgres"], optional = true }
deadpool-diesel = { workspace = true, optional = true }
dotenv = "0.15.0"
envy = "0.4.2"

@ -6,14 +6,14 @@ use anyhow::{Context, Result};
use reqwest::{header, header::HeaderMap, Certificate, Client, Identity, Method, Url};
use serde::de::DeserializeOwned;
use serde_json::{from_str, Value};
use uuid::Uuid;
use crate::{
config::{get_self_id, MASTER_PORT},
conv::opt_to_string,
messaging::{self, AsMsg},
misc::OneOrVec,
models::{self},
models::*,
types::Id,
UError, UResult,
};
@ -36,23 +36,24 @@ impl ClientHandler {
default_headers.insert(header::AUTHORIZATION, format!("Bearer {pwd}"));
}
let dns_response = Client::new()
.request(
Method::GET,
format!("https://1.1.1.1/dns-query?name={server}&type=A"),
)
.header(header::ACCEPT, "application/dns-json")
.send()
.await?
.text()
.await?;
// todo: don't rely only on dns resolve
let client = {
let client = Client::builder()
.identity(identity)
.default_headers(HeaderMap::try_from(&default_headers).unwrap())
.add_root_certificate(Certificate::from_pem(ROOT_CA_CERT).unwrap());
let dns_response = Client::new()
.request(
Method::GET,
format!("https://1.1.1.1/dns-query?name={server}&type=A"),
)
.header(header::ACCEPT, "application/dns-json")
.send()
.await?
.text()
.await?;
match from_str::<Value>(&dns_response).unwrap()["Answer"]
.get(0)
.and_then(|a| a.get("data"))
@ -115,7 +116,7 @@ impl ClientHandler {
}
// get jobs for client
pub async fn get_personal_jobs(&self, url_param: Uuid) -> Result<Vec<models::AssignedJobById>> {
pub async fn get_personal_jobs(&self, url_param: Id) -> Result<Vec<AssignedJobById>> {
self.req(format!("get_personal_jobs/{}", url_param)).await
}
@ -130,12 +131,12 @@ impl ClientHandler {
}
/// get exact job
pub async fn get_job(&self, job: Uuid) -> Result<models::FatJobMeta<true>> {
pub async fn get_job(&self, job: Id) -> Result<FatJobMeta<true>> {
self.req(format!("get_job/{job}")).await
}
/// get all available jobs
pub async fn get_jobs(&self) -> Result<Vec<models::ThinJobMeta>> {
pub async fn get_jobs(&self) -> Result<Vec<ThinJobMeta>> {
self.req("get_jobs").await
}
}
@ -144,52 +145,45 @@ impl ClientHandler {
#[cfg(feature = "panel")]
impl ClientHandler {
/// agent listing
pub async fn get_agents(&self, agent: Option<Uuid>) -> Result<Vec<models::Agent>> {
pub async fn get_agents(&self, agent: Option<Id>) -> Result<Vec<Agent>> {
self.req(format!("get_agents/{}", opt_to_string(agent)))
.await
}
/// update agent
pub async fn update_agent(&self, agent: models::Agent) -> Result<()> {
pub async fn update_agent(&self, agent: Agent) -> Result<()> {
self.req_with_payload("update_agent", agent).await
}
/// update job
pub async fn update_job(&self, job: models::FatJobMeta<true>) -> Result<()> {
pub async fn update_job(&self, job: FatJobMeta<true>) -> Result<()> {
self.req_with_payload("update_job", job).await
}
/// update result
pub async fn update_result(&self, result: models::AssignedJob) -> Result<()> {
pub async fn update_result(&self, result: AssignedJob) -> Result<()> {
self.req_with_payload("update_result", result).await
}
/// create and upload job
pub async fn upload_jobs(
&self,
payload: impl OneOrVec<models::FatJobMeta<true>>,
) -> Result<Vec<Uuid>> {
pub async fn upload_jobs(&self, payload: impl OneOrVec<FatJobMeta<true>>) -> Result<Vec<Id>> {
self.req_with_payload("upload_jobs", payload.into_vec())
.await
}
/// delete something
pub async fn del(&self, item: Uuid) -> Result<i32> {
pub async fn del(&self, item: Id) -> Result<i32> {
self.req(format!("del/{item}")).await
}
/// set jobs for any agent
pub async fn set_jobs(
&self,
agent: Uuid,
job_idents: impl OneOrVec<String>,
) -> Result<Vec<Uuid>> {
pub async fn set_jobs(&self, agent: Id, job_idents: impl OneOrVec<String>) -> Result<Vec<Id>> {
self.req_with_payload(format!("set_jobs/{agent}"), job_idents.into_vec())
.await
}
/// get jobs for any agent
pub async fn get_agent_jobs(&self, agent: Option<Uuid>) -> Result<Vec<models::AssignedJob>> {
pub async fn get_agent_jobs(&self, agent: Option<Id>) -> Result<Vec<AssignedJob>> {
self.req(format!("get_agent_jobs/{}", opt_to_string(agent)))
.await
}

@ -1,10 +1,10 @@
use crate::models::ThinJobMeta;
use crate::types::Id;
use lazy_static::lazy_static;
use parking_lot::{RwLock, RwLockReadGuard};
use std::{collections::HashMap, ops::Deref};
use uuid::Uuid;
type Cache = HashMap<Uuid, ThinJobMeta>;
type Cache = HashMap<Id, ThinJobMeta>;
lazy_static! {
static ref JOB_CACHE: RwLock<Cache> = RwLock::new(HashMap::new());
@ -17,11 +17,11 @@ impl JobCache {
JOB_CACHE.write().insert(job_meta.id, job_meta);
}
pub fn contains(id: Uuid) -> bool {
pub fn contains(id: Id) -> bool {
JOB_CACHE.read().contains_key(&id)
}
pub fn get<'jh>(id: Uuid) -> Option<JobCacheHolder<'jh>> {
pub fn get<'jh>(id: Id) -> Option<JobCacheHolder<'jh>> {
if !Self::contains(id) {
return None;
}
@ -29,12 +29,12 @@ impl JobCache {
Some(JobCacheHolder(lock, id))
}
pub fn remove(id: Uuid) {
pub fn remove(id: Id) {
JOB_CACHE.write().remove(&id);
}
}
pub struct JobCacheHolder<'jh>(pub RwLockReadGuard<'jh, Cache>, pub Uuid);
pub struct JobCacheHolder<'jh>(pub RwLockReadGuard<'jh, Cache>, pub Id);
impl<'jh> Deref for JobCacheHolder<'jh> {
type Target = ThinJobMeta;

@ -2,20 +2,21 @@ use envy::{from_env, prefixed, Result as EnvResult};
use lazy_static::lazy_static;
use serde::Deserialize;
use std::time::Duration;
use uuid::Uuid;
pub use envy::Error;
use crate::types::Id;
pub const MASTER_PORT: u16 = 63714;
pub const AGENT_ITERATION_INTERVAL: Duration = Duration::from_secs(5);
lazy_static! {
static ref ID: Uuid = Uuid::new_v4();
static ref ID: Id = Id::new_v4();
}
#[inline]
pub fn get_self_id() -> Uuid {
pub fn get_self_id() -> Id {
*ID
}

@ -1,8 +1,8 @@
use chrono::{offset::Local, DateTime};
use std::time::SystemTime;
pub fn bytes_to_string(v: &[u8]) -> String {
String::from_utf8_lossy(v).to_string()
pub fn bytes_to_string(data: &[u8]) -> String {
String::from_utf8_lossy(data).to_string()
}
pub fn opt_to_string<T: ToString>(item: Option<T>) -> String {
@ -12,8 +12,17 @@ pub fn opt_to_string<T: ToString>(item: Option<T>) -> String {
}
}
pub fn systime_to_string(time: &SystemTime) -> String {
DateTime::<Local>::from(*time)
pub fn systime_to_string(time: SystemTime) -> String {
DateTime::<Local>::from(time)
.format("%d/%m/%Y %T")
.to_string()
}
pub fn bytes_to_string_truncated(data: &[u8], max_len: usize) -> String {
if data.len() > max_len {
let truncated = &data[..max_len];
String::from_utf8_lossy(truncated).to_string() + " <truncated>"
} else {
String::from_utf8_lossy(&data).to_string()
}
}

@ -4,7 +4,6 @@ pub mod cache;
pub mod combined_result;
pub mod config;
pub mod conv;
pub mod datatypes;
#[cfg(feature = "server")]
pub mod db;
pub mod error;
@ -16,6 +15,7 @@ pub mod misc;
pub mod models;
pub mod platform;
pub mod proc_output;
pub mod types;
pub mod ufs;
#[cfg(unix)]
pub mod unix;

@ -1,11 +1,11 @@
mod files;
use crate::models::*;
use crate::types::Id;
use crate::UError;
pub use files::*;
use serde::{Deserialize, Serialize};
use std::fmt::Debug;
use uuid::Uuid;
pub trait AsMsg: Clone + Serialize + Debug {}
@ -17,7 +17,7 @@ impl AsMsg for FatJobMeta<true> {}
impl AsMsg for Reportable {}
impl AsMsg for String {}
impl AsMsg for ThinJobMeta {}
impl AsMsg for Uuid {}
impl AsMsg for Id {}
impl AsMsg for i32 {}
impl AsMsg for u8 {}
impl AsMsg for () {}

@ -1,4 +1,5 @@
use serde::{Deserialize, Serialize};
use std::fmt;
use std::time::SystemTime;
use strum::Display;
@ -11,15 +12,16 @@ mod server {
#[cfg(feature = "server")]
use self::server::*;
use crate::{config::get_self_id, executor::ExecResult, jobs::NamedJobBatch, platform::Platform};
use uuid::Uuid;
use crate::{
config::get_self_id, conv::systime_to_string, executor::ExecResult, jobs::NamedJobBatch,
platform::Platform, types::Id,
};
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Display)]
#[cfg_attr(
feature = "server",
derive(DbEnum),
DieselTypePath = "sql_types::Agentstate"
ExistingTypePath = "sql_types::Agentstate"
)]
pub enum AgentState {
New,
@ -28,7 +30,7 @@ pub enum AgentState {
}
//belongs_to
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[derive(Clone, Serialize, Deserialize, PartialEq)]
#[cfg_attr(
feature = "server",
derive(Identifiable, Queryable, Insertable, AsChangeset),
@ -38,7 +40,7 @@ pub struct Agent {
pub alias: Option<String>,
pub hostname: String,
pub host_info: String,
pub id: Uuid,
pub id: Id,
pub ip_gray: Option<String>,
pub ip_white: Option<String>,
pub is_root: bool,
@ -52,7 +54,7 @@ pub struct Agent {
}
impl Agent {
pub fn with_id(id: Uuid) -> Self {
pub fn with_id(id: Id) -> Self {
Self {
id,
..Default::default()
@ -119,3 +121,24 @@ impl Default for Agent {
}
}
}
impl fmt::Debug for Agent {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Agent")
.field("alias", &self.alias)
.field("hostname", &self.hostname)
.field("host_info", &self.host_info)
.field("id", &self.id.to_string())
.field("ip_gray", &self.ip_gray)
.field("ip_white", &self.ip_white)
.field("is_root", &self.is_root)
.field("is_root_allowed", &self.is_root_allowed)
.field("last_active", &systime_to_string(self.last_active))
.field("platform", &self.platform)
.field("regtime", &systime_to_string(self.regtime))
.field("state", &self.state)
.field("token", &self.token)
.field("username", &self.username)
.finish()
}
}

@ -1,25 +1,28 @@
use super::{JobState, JobType, ThinJobMeta};
use crate::config::get_self_id;
#[cfg(feature = "server")]
use crate::models::schema::*;
use crate::{
config::get_self_id,
conv::{bytes_to_string_truncated, systime_to_string},
types::Id,
};
#[cfg(feature = "server")]
use diesel::{Identifiable, Insertable, Queryable};
use serde::{Deserialize, Serialize};
use std::{borrow::Cow, time::SystemTime};
use uuid::Uuid;
use std::{borrow::Cow, fmt::Debug, time::SystemTime};
#[derive(Serialize, Deserialize, Clone, PartialEq, Debug)]
#[derive(Serialize, Deserialize, Clone, PartialEq)]
#[cfg_attr(
feature = "server",
derive(Queryable, Identifiable, Insertable, AsChangeset),
diesel(table_name = results)
)]
pub struct AssignedJob {
pub agent_id: Uuid,
pub agent_id: Id,
pub alias: Option<String>,
pub created: SystemTime,
pub id: Uuid,
pub job_id: Uuid,
pub id: Id,
pub job_id: Id,
pub result: Option<Vec<u8>>,
pub state: JobState,
pub exec_type: JobType,
@ -27,19 +30,48 @@ pub struct AssignedJob {
pub updated: SystemTime,
}
impl Debug for AssignedJob {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("AssignedJob")
.field("agent_id", &self.agent_id.to_string())
.field("alias", &self.alias)
.field("created", &systime_to_string(self.created))
.field("id", &self.id.to_string())
.field("job_id", &self.job_id.to_string())
.field(
"result",
&self
.result
.as_ref()
.map(|r| bytes_to_string_truncated(&r, 256)),
)
.field("state", &self.state)
.field("exec_type", &self.exec_type)
.field("retcode", &self.retcode)
.field("updated", &systime_to_string(self.updated))
.finish()
}
}
#[derive(Serialize, Deserialize, Clone, Copy, Debug)]
pub struct AssignedJobById {
pub agent_id: Uuid,
pub id: Uuid,
pub job_id: Uuid,
pub agent_id: Id,
pub id: Id,
pub job_id: Id,
}
impl From<(&ThinJobMeta, AssignedJobById)> for AssignedJob {
fn from((meta, assigned_job_by_id): (&ThinJobMeta, AssignedJobById)) -> Self {
fn from((meta, ids): (&ThinJobMeta, AssignedJobById)) -> Self {
let AssignedJobById {
agent_id,
id,
job_id,
} = ids;
AssignedJob {
id: assigned_job_by_id.id,
agent_id: assigned_job_by_id.agent_id,
job_id: assigned_job_by_id.job_id,
id,
agent_id,
job_id,
alias: meta.alias.clone(),
exec_type: meta.exec_type,
..Default::default()
@ -51,8 +83,8 @@ impl Default for AssignedJobById {
fn default() -> Self {
Self {
agent_id: get_self_id(),
id: Uuid::new_v4(),
job_id: Uuid::nil(),
id: Id::new_v4(),
job_id: Id::nil(),
}
}
}
@ -60,11 +92,11 @@ impl Default for AssignedJobById {
impl Default for AssignedJob {
fn default() -> Self {
Self {
agent_id: Uuid::nil(),
agent_id: Id::nil(),
alias: None,
created: SystemTime::now(),
id: Uuid::new_v4(),
job_id: Uuid::nil(),
id: Id::new_v4(),
job_id: Id::nil(),
result: None,
state: JobState::Queued,
retcode: None,

@ -1,15 +1,17 @@
use std::fmt;
use super::JobType;
use crate::models::payload::Payload;
#[cfg(feature = "server")]
use crate::models::schema::*;
use crate::platform::Platform;
use crate::types::Id;
use crate::{UError, UResult};
#[cfg(feature = "server")]
use diesel::{Identifiable, Insertable, Queryable};
use serde::{Deserialize, Serialize};
use uuid::Uuid;
#[derive(Serialize, Deserialize, Clone, Debug)]
#[derive(Serialize, Deserialize, Clone)]
#[cfg_attr(
feature = "server",
derive(Queryable, Identifiable, Insertable, AsChangeset),
@ -20,7 +22,7 @@ pub struct ThinJobMeta {
/// string like `bash -c {} -a 1 --arg2`,
/// where {} is replaced by executable's tmp path
pub argv: String,
pub id: Uuid,
pub id: Id,
pub exec_type: JobType,
/// target triple
pub platform: String,
@ -29,7 +31,21 @@ pub struct ThinJobMeta {
pub schedule: Option<String>,
}
#[derive(Serialize, Deserialize, Clone, Debug)]
impl fmt::Debug for ThinJobMeta {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("ThinJobMeta")
.field("alias", &self.alias)
.field("argv", &self.argv)
.field("id", &self.id.to_string())
.field("exec_type", &self.exec_type)
.field("platform", &self.platform)
.field("payload", &self.payload)
.field("schedule", &self.schedule)
.finish()
}
}
#[derive(Serialize, Deserialize, Clone)]
pub struct FatJobMeta<const VALIDATED: bool = false> {
#[serde(default)]
pub alias: Option<String>,
@ -39,8 +55,8 @@ pub struct FatJobMeta<const VALIDATED: bool = false> {
#[serde(default)]
pub argv: String,
#[serde(default = "Uuid::new_v4")]
pub id: Uuid,
#[serde(default = "Id::new_v4")]
pub id: Id,
#[serde(default)]
pub exec_type: JobType,
@ -57,6 +73,20 @@ pub struct FatJobMeta<const VALIDATED: bool = false> {
pub schedule: Option<String>,
}
impl<const VALIDATED: bool> fmt::Debug for FatJobMeta<VALIDATED> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("FatJobMeta")
.field("alias", &self.alias)
.field("argv", &self.argv)
.field("id", &self.id.to_string())
.field("exec_type", &self.exec_type)
.field("platform", &self.platform)
.field("payload", &self.payload)
.field("schedule", &self.schedule)
.finish()
}
}
impl FatJobMeta {
pub fn validated(self) -> UResult<FatJobMeta<true>> {
JobMetaBuilder { inner: self }.build()
@ -74,7 +104,7 @@ impl FatJobMeta {
impl<const VALIDATED: bool> Default for FatJobMeta<VALIDATED> {
fn default() -> Self {
Self {
id: Uuid::new_v4(),
id: Id::new_v4(),
alias: None,
argv: String::new(),
exec_type: JobType::Shell,

@ -13,7 +13,7 @@ use self::server::*;
#[cfg_attr(
feature = "server",
derive(DbEnum),
DieselTypePath = "sql_types::Jobstate"
ExistingTypePath = "sql_types::Jobstate"
)]
pub enum JobState {
/// server created a job, but client didn't get it yet
@ -30,7 +30,7 @@ pub enum JobState {
#[cfg_attr(
feature = "server",
derive(DbEnum),
DieselTypePath = "sql_types::Jobtype"
ExistingTypePath = "sql_types::Jobtype"
)]
pub enum JobType {
Init,

@ -1,4 +1,4 @@
use crate::ufs;
use crate::{conv::bytes_to_string_truncated, ufs};
use serde::{Deserialize, Serialize};
use std::{fmt, path::PathBuf};
@ -64,16 +64,10 @@ impl fmt::Debug for Payload {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Data(data) => {
const MAX_RESULT_LEN: usize = 256;
let mut dbg = &mut f.debug_tuple("Data");
let data = bytes_to_string_truncated(data, 256);
let readable_data = if data.len() > MAX_RESULT_LEN {
let truncated = &data[..MAX_RESULT_LEN];
String::from_utf8_lossy(truncated).to_string() + " <truncated>"
} else {
String::from_utf8_lossy(&data).to_string()
};
dbg = dbg.field(&readable_data);
dbg = dbg.field(&data);
dbg.finish()
}
Self::Ident(ident) => f.debug_tuple("Ident").field(ident).finish(),

@ -24,3 +24,5 @@ impl<M: Serialize> ToString for PanelResult<M> {
serde_json::to_string(self).unwrap()
}
}
pub type Id = uuid::Uuid;

@ -165,6 +165,11 @@ pub fn prepare_executable(name: impl AsRef<str>) -> Result<(File, String), Error
}
}
#[cfg(windows)]
pub fn prepare_executable(name: impl AsRef<str>) -> Result<(File, String), Error> {
todo!()
}
pub fn cleanup() {
let index = INDEX.read();

Loading…
Cancel
Save