rename uid -> id, remove BaseMessage

master
plazmoid 2 years ago
parent d7ea1ffb85
commit 81cefee5bf
  1. 20
      bin/u_agent/src/lib.rs
  2. 24
      bin/u_panel/src/argparse.rs
  3. 91
      bin/u_server/src/db.rs
  4. 74
      bin/u_server/src/handlers.rs
  5. 33
      bin/u_server/src/u_server.rs
  6. 4
      images/u_agent.Dockerfile
  7. 2
      images/u_server.Dockerfile
  8. 16
      integration/tests/fixtures/agent.rs
  9. 22
      integration/tests/helpers/jobs.rs
  10. 1
      integration/tests/helpers/mod.rs
  11. 52
      integration/tests/integration/behaviour.rs
  12. 18
      lib/u_lib/src/api.rs
  13. 14
      lib/u_lib/src/cache.rs
  14. 9
      lib/u_lib/src/config.rs
  15. 176
      lib/u_lib/src/jobs.rs
  16. 56
      lib/u_lib/src/messaging/base.rs
  17. 8
      lib/u_lib/src/messaging/mod.rs
  18. 10
      lib/u_lib/src/models/agent.rs
  19. 4
      lib/u_lib/src/models/jobs/assigned.rs
  20. 2
      lib/u_lib/src/models/payload.rs
  21. 10
      lib/u_lib/src/ufs/mod.rs
  22. 2
      lib/u_lib/tests/fixtures/echoer.rs

@ -7,17 +7,15 @@ use tokio::time::{sleep, Duration};
use u_lib::{ use u_lib::{
api::ClientHandler, api::ClientHandler,
cache::JobCache, cache::JobCache,
config::{get_self_uid, EndpointsEnv}, config::{get_self_id, EndpointsEnv, AGENT_ITERATION_INTERVAL},
error::ErrChan, error::ErrChan,
executor::pop_completed, executor::pop_completed,
jobs::{fat_meta_to_thin, UnnamedJobsBatch}, jobs::{fat_meta_to_thin, AnonymousJobBatch},
logging::init_logger, logging::init_logger,
messaging::Reportable, messaging::Reportable,
models::AssignedJobById, models::AssignedJobById,
}; };
const ITERATION_LATENCY: u64 = 5;
pub async fn process_request(jobs: Vec<AssignedJobById>, client: &ClientHandler) { pub async fn process_request(jobs: Vec<AssignedJobById>, client: &ClientHandler) {
if !jobs.is_empty() { if !jobs.is_empty() {
for jr in &jobs { for jr in &jobs {
@ -28,7 +26,7 @@ pub async fn process_request(jobs: Vec<AssignedJobById>, client: &ClientHandler)
Ok(result) => break result, Ok(result) => break result,
Err(err) => { Err(err) => {
debug!("{:?} \nretrying...", err); debug!("{:?} \nretrying...", err);
sleep(Duration::from_secs(ITERATION_LATENCY)).await; sleep(AGENT_ITERATION_INTERVAL).await;
} }
} }
}; };
@ -54,7 +52,7 @@ pub async fn process_request(jobs: Vec<AssignedJobById>, client: &ClientHandler)
}) })
.collect::<Vec<_>>(); .collect::<Vec<_>>();
UnnamedJobsBatch::from_meta_with_id(meta_with_ids) AnonymousJobBatch::from_meta_with_id(meta_with_ids)
.spawn() .spawn()
.await; .await;
} }
@ -80,9 +78,9 @@ async fn error_reporting(client: ClientHandler) -> ! {
} }
async fn agent_loop(client: ClientHandler) -> ! { async fn agent_loop(client: ClientHandler) -> ! {
let self_uid = get_self_uid(); let self_id = get_self_id();
loop { loop {
match client.get_personal_jobs(self_uid).await { match client.get_personal_jobs(self_id).await {
Ok(jobs) => { Ok(jobs) => {
process_request(jobs, &client).await; process_request(jobs, &client).await;
} }
@ -103,7 +101,7 @@ async fn agent_loop(client: ClientHandler) -> ! {
ErrChan::send(err, "report").await; ErrChan::send(err, "report").await;
} }
} }
sleep(Duration::from_secs(ITERATION_LATENCY)).await; sleep(AGENT_ITERATION_INTERVAL).await;
} }
} }
@ -113,7 +111,7 @@ pub fn run_forever() -> ! {
if cfg!(debug_assertions) { if cfg!(debug_assertions) {
init_logger(Some(format!( init_logger(Some(format!(
"u_agent-{}", "u_agent-{}",
get_self_uid() get_self_id()
.hyphenated() .hyphenated()
.to_string() .to_string()
.split("-") .split("-")
@ -125,7 +123,7 @@ pub fn run_forever() -> ! {
u_lib::unix::daemonize() u_lib::unix::daemonize()
} }
info!("Starting agent {}", get_self_uid()); info!("Starting agent {}", get_self_id());
Builder::new_multi_thread() Builder::new_multi_thread()
.enable_all() .enable_all()

@ -43,7 +43,7 @@ enum JobCmd {
enum JobMapCRUD { enum JobMapCRUD {
Create { Create {
#[structopt(parse(try_from_str = parse_uuid))] #[structopt(parse(try_from_str = parse_uuid))]
agent_uid: Uuid, agent_id: Uuid,
job_idents: Vec<String>, job_idents: Vec<String>,
}, },
@ -55,14 +55,14 @@ enum JobMapCRUD {
enum RUD { enum RUD {
Read { Read {
#[structopt(parse(try_from_str = parse_uuid))] #[structopt(parse(try_from_str = parse_uuid))]
uid: Option<Uuid>, id: Option<Uuid>,
}, },
Update { Update {
item: String, item: String,
}, },
Delete { Delete {
#[structopt(parse(try_from_str = parse_uuid))] #[structopt(parse(try_from_str = parse_uuid))]
uid: Uuid, id: Uuid,
}, },
} }
@ -78,12 +78,12 @@ pub async fn process_cmd(client: ClientHandler, args: Args) -> PanelResult<Value
let catcher: UResult<Value> = (|| async { let catcher: UResult<Value> = (|| async {
Ok(match args.cmd { Ok(match args.cmd {
Cmd::Agents(action) => match action { Cmd::Agents(action) => match action {
RUD::Read { uid } => into_value(client.get_agents(uid).await?), RUD::Read { id } => into_value(client.get_agents(id).await?),
RUD::Update { item } => { RUD::Update { item } => {
let agent = from_str::<Agent>(&item)?; let agent = from_str::<Agent>(&item)?;
into_value(client.update_agent(agent).await?) into_value(client.update_agent(agent).await?)
} }
RUD::Delete { uid } => into_value(client.del(uid).await?), RUD::Delete { id } => into_value(client.del(id).await?),
}, },
Cmd::Jobs(action) => match action { Cmd::Jobs(action) => match action {
JobCRUD::Create { job } => { JobCRUD::Create { job } => {
@ -96,8 +96,8 @@ pub async fn process_cmd(client: ClientHandler, args: Args) -> PanelResult<Value
into_value(client.upload_jobs(job).await?) into_value(client.upload_jobs(job).await?)
} }
JobCRUD::RUD(RUD::Read { uid }) => match uid { JobCRUD::RUD(RUD::Read { id }) => match id {
Some(uid) => into_value(client.get_job(uid).await?), Some(id) => into_value(client.get_job(id).await?),
None => into_value(client.get_jobs().await?), None => into_value(client.get_jobs().await?),
}, },
JobCRUD::RUD(RUD::Update { item }) => { JobCRUD::RUD(RUD::Update { item }) => {
@ -105,19 +105,19 @@ pub async fn process_cmd(client: ClientHandler, args: Args) -> PanelResult<Value
let job = raw_job.validated()?; let job = raw_job.validated()?;
into_value(client.update_job(job).await?) into_value(client.update_job(job).await?)
} }
JobCRUD::RUD(RUD::Delete { uid }) => into_value(client.del(uid).await?), JobCRUD::RUD(RUD::Delete { id }) => into_value(client.del(id).await?),
}, },
Cmd::Map(action) => match action { Cmd::Map(action) => match action {
JobMapCRUD::Create { JobMapCRUD::Create {
agent_uid, agent_id,
job_idents, job_idents,
} => into_value(client.set_jobs(agent_uid, job_idents).await?), } => into_value(client.set_jobs(agent_id, job_idents).await?),
JobMapCRUD::RUD(RUD::Read { uid }) => into_value(client.get_agent_jobs(uid).await?), JobMapCRUD::RUD(RUD::Read { id }) => into_value(client.get_agent_jobs(id).await?),
JobMapCRUD::RUD(RUD::Update { item }) => { JobMapCRUD::RUD(RUD::Update { item }) => {
let assigned = from_str::<AssignedJob>(&item)?; let assigned = from_str::<AssignedJob>(&item)?;
into_value(client.update_result(assigned).await?) into_value(client.update_result(assigned).await?)
} }
JobMapCRUD::RUD(RUD::Delete { uid }) => into_value(client.del(uid).await?), JobMapCRUD::RUD(RUD::Delete { id }) => into_value(client.del(id).await?),
}, },
Cmd::Ping => into_value(client.ping().await?), Cmd::Ping => into_value(client.ping().await?),
Cmd::Serve => { Cmd::Serve => {

@ -57,14 +57,14 @@ impl UDB<'_> {
.map_err(with_err_ctx("Can't insert jobs")) .map_err(with_err_ctx("Can't insert jobs"))
} }
pub fn get_job(&mut self, uid: Uuid) -> Result<Option<ThinJobMeta>> { pub fn get_job(&mut self, id: Uuid) -> Result<Option<ThinJobMeta>> {
use schema::jobs; use schema::jobs;
jobs::table jobs::table
.filter(jobs::id.eq(uid)) .filter(jobs::id.eq(id))
.first(self.conn) .first(self.conn)
.optional() .optional()
.map_err(with_err_ctx(format!("Can't get job {uid}"))) .map_err(with_err_ctx(format!("Can't get job {id}")))
} }
pub fn get_jobs(&mut self) -> Result<Vec<ThinJobMeta>> { pub fn get_jobs(&mut self) -> Result<Vec<ThinJobMeta>> {
@ -108,14 +108,14 @@ impl UDB<'_> {
Ok(()) Ok(())
} }
pub fn get_agent(&mut self, uid: Uuid) -> Result<Option<Agent>> { pub fn get_agent(&mut self, id: Uuid) -> Result<Option<Agent>> {
use schema::agents; use schema::agents;
agents::table agents::table
.filter(agents::id.eq(uid)) .filter(agents::id.eq(id))
.first(self.conn) .first(self.conn)
.optional() .optional()
.map_err(with_err_ctx(format!("Can't get agent {uid:?}"))) .map_err(with_err_ctx(format!("Can't get agent {id:?}")))
} }
pub fn get_agents(&mut self) -> Result<Vec<Agent>> { pub fn get_agents(&mut self) -> Result<Vec<Agent>> {
@ -126,40 +126,36 @@ impl UDB<'_> {
.map_err(with_err_ctx(format!("Can't get agents"))) .map_err(with_err_ctx(format!("Can't get agents")))
} }
pub fn update_job_status(&mut self, uid: Uuid, status: JobState) -> Result<()> { pub fn update_job_status(&mut self, id: Uuid, status: JobState) -> Result<()> {
use schema::results; use schema::results;
diesel::update(results::table) diesel::update(results::table)
.filter(results::id.eq(uid)) .filter(results::id.eq(id))
.set(results::state.eq(status)) .set(results::state.eq(status))
.execute(self.conn) .execute(self.conn)
.map_err(with_err_ctx(format!("Can't update status of job {uid}")))?; .map_err(with_err_ctx(format!("Can't update status of job {id}")))?;
Ok(()) Ok(())
} }
//TODO: filters possibly could work in a wrong way, check //TODO: filters possibly could work in a wrong way, check
pub fn get_exact_jobs( pub fn get_exact_jobs(&mut self, id: Option<Uuid>, personal: bool) -> Result<Vec<AssignedJob>> {
&mut self,
uid: Option<Uuid>,
personal: bool,
) -> Result<Vec<AssignedJob>> {
use schema::results; use schema::results;
let mut q = results::table.into_boxed(); let mut q = results::table.into_boxed();
/*if uid.is_some() { /*if id.is_some() {
q = q.filter(results::agent_id.eq(uid.unwrap())) q = q.filter(results::agent_id.eq(id.unwrap()))
}*/ }*/
if personal { if personal {
q = q.filter( q = q.filter(
results::state results::state
.eq(JobState::Queued) .eq(JobState::Queued)
.and(results::agent_id.eq(uid.unwrap())), .and(results::agent_id.eq(id.unwrap())),
) )
} else if uid.is_some() { } else if id.is_some() {
q = q q = q
.filter(results::agent_id.eq(uid.unwrap())) .filter(results::agent_id.eq(id.unwrap()))
.or_filter(results::job_id.eq(uid.unwrap())) .or_filter(results::job_id.eq(id.unwrap()))
.or_filter(results::id.eq(uid.unwrap())) .or_filter(results::id.eq(id.unwrap()))
} }
let result = q let result = q
.load::<AssignedJob>(self.conn) .load::<AssignedJob>(self.conn)
@ -167,38 +163,39 @@ impl UDB<'_> {
Ok(result) Ok(result)
} }
pub fn set_jobs_for_agent(&mut self, agent_uid: Uuid, job_uids: &[Uuid]) -> Result<Vec<Uuid>> { pub fn set_jobs_for_agent(&mut self, agent_id: Uuid, job_ids: &[Uuid]) -> Result<Vec<Uuid>> {
use schema::{jobs, results}; use schema::{jobs, results};
let agent_platform = match self.get_agent(agent_uid)? { let agent_platform = match self.get_agent(agent_id)? {
Some(agent) => Platform::new(&agent.platform), Some(agent) => Platform::new(&agent.platform),
None => { None => {
return Err(Error::ProcessingError(format!( return Err(Error::ProcessingError(format!(
"Agent {agent_uid} not found" "Agent {agent_id} not found"
))) )))
} }
}; };
for uid in job_uids { let jobs_meta = jobs::table
let job_platform = jobs::table .select((jobs::id, jobs::alias, jobs::platform))
.select(jobs::platform) .filter(jobs::id.eq_any(job_ids))
.filter(jobs::id.eq(uid)) .load::<(Uuid, Option<String>, String)>(self.conn)
.first(self.conn) .map_err(with_err_ctx(format!("Can't find jobs {job_ids:?}")))?;
.map_err(with_err_ctx(format!("Can't find job {uid}")))?;
if !agent_platform.matches(&job_platform) { for meta in &jobs_meta {
if !agent_platform.matches(&meta.2) {
return Err(Error::InsuitablePlatform( return Err(Error::InsuitablePlatform(
agent_platform.into_string(), agent_platform.into_string(),
job_platform, meta.2.clone(),
)); ));
} }
} }
let job_requests = job_uids let job_requests = jobs_meta
.iter() .into_iter()
.map(|job_uid| AssignedJob { .map(|(job_id, alias, _)| AssignedJob {
job_id: *job_uid, job_id,
agent_id: agent_uid, agent_id,
alias,
..Default::default() ..Default::default()
}) })
.collect::<Vec<AssignedJob>>(); .collect::<Vec<AssignedJob>>();
@ -207,19 +204,19 @@ impl UDB<'_> {
.values(&job_requests) .values(&job_requests)
.execute(self.conn) .execute(self.conn)
.map_err(with_err_ctx(format!( .map_err(with_err_ctx(format!(
"Can't setup jobs {job_uids:?} for agent {agent_uid:?}" "Can't setup jobs {job_ids:?} for agent {agent_id:?}"
)))?; )))?;
Ok(job_requests.iter().map(|aj| aj.id).collect()) Ok(job_requests.iter().map(|aj| aj.id).collect())
} }
pub fn del_jobs(&mut self, uids: &[Uuid]) -> Result<usize> { pub fn del_jobs(&mut self, ids: &[Uuid]) -> Result<usize> {
use schema::jobs; use schema::jobs;
let mut affected = 0; let mut affected = 0;
for &uid in uids { for id in ids {
let deleted = diesel::delete(jobs::table) let deleted = diesel::delete(jobs::table)
.filter(jobs::id.eq(uid)) .filter(jobs::id.eq(id))
.execute(self.conn) .execute(self.conn)
.map_err(with_err_ctx("Can't delete jobs"))?; .map_err(with_err_ctx("Can't delete jobs"))?;
affected += deleted; affected += deleted;
@ -227,13 +224,13 @@ impl UDB<'_> {
Ok(affected) Ok(affected)
} }
pub fn del_results(&mut self, uids: &[Uuid]) -> Result<usize> { pub fn del_results(&mut self, ids: &[Uuid]) -> Result<usize> {
use schema::results; use schema::results;
let mut affected = 0; let mut affected = 0;
for &uid in uids { for id in ids {
let deleted = diesel::delete(results::table) let deleted = diesel::delete(results::table)
.filter(results::id.eq(uid)) .filter(results::id.eq(id))
.execute(self.conn) .execute(self.conn)
.map_err(with_err_ctx("Can't delete results"))?; .map_err(with_err_ctx("Can't delete results"))?;
affected += deleted; affected += deleted;
@ -241,13 +238,13 @@ impl UDB<'_> {
Ok(affected) Ok(affected)
} }
pub fn del_agents(&mut self, uids: &[Uuid]) -> Result<usize> { pub fn del_agents(&mut self, ids: &[Uuid]) -> Result<usize> {
use schema::agents; use schema::agents;
let mut affected = 0; let mut affected = 0;
for &uid in uids { for id in ids {
let deleted = diesel::delete(agents::table) let deleted = diesel::delete(agents::table)
.filter(agents::id.eq(uid)) .filter(agents::id.eq(id))
.execute(self.conn) .execute(self.conn)
.map_err(with_err_ctx("Can't delete agents"))?; .map_err(with_err_ctx("Can't delete agents"))?;
affected += deleted; affected += deleted;

@ -5,7 +5,7 @@ use crate::error::Error;
use crate::ValidJobMeta; use crate::ValidJobMeta;
use u_lib::jobs::{fat_meta_to_thin, thin_meta_to_fat}; use u_lib::jobs::{fat_meta_to_thin, thin_meta_to_fat};
use u_lib::{ use u_lib::{
messaging::{AsMsg, BaseMessage, Reportable}, messaging::{AsMsg, Reportable},
misc::OneOrVec, misc::OneOrVec,
models::*, models::*,
}; };
@ -18,11 +18,11 @@ type EndpResult<T> = Result<T, Rejection>;
pub struct Endpoints; pub struct Endpoints;
impl Endpoints { impl Endpoints {
pub async fn get_agents(repo: Arc<PgRepo>, uid: Option<Uuid>) -> EndpResult<Vec<Agent>> { pub async fn get_agents(repo: Arc<PgRepo>, id: Option<Uuid>) -> EndpResult<Vec<Agent>> {
repo.interact(move |mut db| { repo.interact(move |mut db| {
Ok(match uid { Ok(match id {
Some(uid) => { Some(id) => {
if let Some(agent) = db.get_agent(uid)? { if let Some(agent) = db.get_agent(id)? {
vec![agent] vec![agent]
} else { } else {
vec![] vec![]
@ -35,8 +35,8 @@ impl Endpoints {
.map_err(From::from) .map_err(From::from)
} }
pub async fn get_job(repo: Arc<PgRepo>, uid: Uuid) -> EndpResult<ValidJobMeta> { pub async fn get_job(repo: Arc<PgRepo>, id: Uuid) -> EndpResult<ValidJobMeta> {
let Some(job) = repo.interact(move |mut db| db.get_job(uid)).await? else { let Some(job) = repo.interact(move |mut db| db.get_job(id)).await? else {
return Err(not_found()) return Err(not_found())
}; };
@ -52,23 +52,23 @@ impl Endpoints {
pub async fn get_agent_jobs( pub async fn get_agent_jobs(
repo: Arc<PgRepo>, repo: Arc<PgRepo>,
uid: Option<Uuid>, id: Option<Uuid>,
) -> EndpResult<Vec<AssignedJob>> { ) -> EndpResult<Vec<AssignedJob>> {
repo.interact(move |mut db| db.get_exact_jobs(uid, false)) repo.interact(move |mut db| db.get_exact_jobs(id, false))
.await .await
.map_err(From::from) .map_err(From::from)
} }
pub async fn get_personal_jobs(repo: Arc<PgRepo>, uid: Uuid) -> EndpResult<Vec<AssignedJob>> { pub async fn get_personal_jobs(repo: Arc<PgRepo>, id: Uuid) -> EndpResult<Vec<AssignedJob>> {
repo.transaction(move |mut db| { repo.transaction(move |mut db| {
let agent = db.get_agent(uid)?; let agent = db.get_agent(id)?;
match agent { match agent {
Some(mut agent) => { Some(mut agent) => {
agent.touch(); agent.touch();
db.update_agent(&agent)?; db.update_agent(&agent)?;
} }
None => { None => {
let new_agent = Agent::with_id(uid); let new_agent = Agent::with_id(id);
db.insert_agent(&new_agent)?; db.insert_agent(&new_agent)?;
@ -76,11 +76,11 @@ impl Endpoints {
.find_job_by_alias("agent_hello")? .find_job_by_alias("agent_hello")?
.expect("agent_hello job not found"); .expect("agent_hello job not found");
db.set_jobs_for_agent(uid, &[job.id])?; db.set_jobs_for_agent(id, &[job.id])?;
} }
} }
let assigned_jobs = db.get_exact_jobs(Some(uid), true)?; let assigned_jobs = db.get_exact_jobs(Some(id), true)?;
for job in &assigned_jobs { for job in &assigned_jobs {
db.update_job_status(job.id, JobState::Running)?; db.update_job_status(job.id, JobState::Running)?;
@ -94,10 +94,9 @@ impl Endpoints {
pub async fn upload_jobs( pub async fn upload_jobs(
repo: Arc<PgRepo>, repo: Arc<PgRepo>,
msg: BaseMessage<'static, Vec<ValidJobMeta>>, msg: Vec<ValidJobMeta>,
) -> EndpResult<Vec<Uuid>> { ) -> EndpResult<Vec<Uuid>> {
let jobs = msg let jobs = msg
.into_inner()
.into_iter() .into_iter()
.map(|meta| Ok(fat_meta_to_thin(meta)?)) .map(|meta| Ok(fat_meta_to_thin(meta)?))
.collect::<Result<Vec<ThinJobMeta>, Error>>()?; .collect::<Result<Vec<ThinJobMeta>, Error>>()?;
@ -107,11 +106,11 @@ impl Endpoints {
.map_err(From::from) .map_err(From::from)
} }
pub async fn del(repo: Arc<PgRepo>, uid: Uuid) -> EndpResult<usize> { pub async fn del(repo: Arc<PgRepo>, id: Uuid) -> EndpResult<usize> {
repo.transaction(move |mut db| { repo.transaction(move |mut db| {
let del_fns = &[UDB::del_agents, UDB::del_jobs, UDB::del_results]; let del_fns = &[UDB::del_agents, UDB::del_jobs, UDB::del_results];
for del_fn in del_fns { for del_fn in del_fns {
let affected = del_fn(&mut db, &[uid])?; let affected = del_fn(&mut db, &[id])?;
if affected > 0 { if affected > 0 {
return Ok(affected); return Ok(affected);
} }
@ -124,12 +123,11 @@ impl Endpoints {
pub async fn set_jobs( pub async fn set_jobs(
repo: Arc<PgRepo>, repo: Arc<PgRepo>,
agent_uid: Uuid, agent_id: Uuid,
job_idents: BaseMessage<'static, Vec<String>>, job_idents: Vec<String>,
) -> EndpResult<Vec<Uuid>> { ) -> EndpResult<Vec<Uuid>> {
repo.transaction(move |mut db| { repo.transaction(move |mut db| {
job_idents job_idents
.into_inner()
.into_iter() .into_iter()
.map(|ident| { .map(|ident| {
Uuid::parse_str(&ident).or_else(|_| { Uuid::parse_str(&ident).or_else(|_| {
@ -146,7 +144,7 @@ impl Endpoints {
}) })
}) })
.collect::<Result<Vec<Uuid>, Error>>() .collect::<Result<Vec<Uuid>, Error>>()
.and_then(|j| db.set_jobs_for_agent(agent_uid, &j)) .and_then(|j| db.set_jobs_for_agent(agent_id, &j))
}) })
.await .await
.map_err(From::from) .map_err(From::from)
@ -154,27 +152,30 @@ impl Endpoints {
pub async fn report<Data: OneOrVec<Reportable> + AsMsg + Send + Sync + 'static>( pub async fn report<Data: OneOrVec<Reportable> + AsMsg + Send + Sync + 'static>(
repo: Arc<PgRepo>, repo: Arc<PgRepo>,
msg: BaseMessage<'static, Data>, msg: Data,
agent_id: Uuid
) -> EndpResult<()> { ) -> EndpResult<()> {
repo.transaction(move |mut db| { repo.transaction(move |mut db| {
let id = msg.id; for entry in msg.into_vec() {
for entry in msg.into_inner().into_vec() {
match entry { match entry {
Reportable::Assigned(mut result) => { Reportable::Assigned(mut result) => {
let result_agent_id = &result.agent_id; let result_agent_id = &result.agent_id;
if id != *result_agent_id { if agent_id != *result_agent_id {
warn!("Ids are not equal! actual id: {id}, id from job: {result_agent_id}"); warn!("Agent ids are not equal! actual id: {agent_id}, id from job: {result_agent_id}");
continue; continue;
} }
result.state = JobState::Finished; result.state = JobState::Finished;
result.touch(); result.touch();
info!("agent {agent_id} updated job {}", result.id);
match result.exec_type { match result.exec_type {
JobType::Init => match &result.result { JobType::Init => match &result.result {
Some(rbytes) => { Some(rbytes) => {
let mut agent: Agent = match serde_json::from_slice(&rbytes) { let mut agent: Agent = match serde_json::from_slice(&rbytes) {
Ok(a) => a, Ok(a) => a,
Err(e) => { Err(e) => {
error!("Error deserializing agent from {id}: {e}"); error!("Error deserializing agent data from {agent_id}: {e}");
continue; continue;
} }
}; };
@ -191,7 +192,7 @@ impl Endpoints {
db.update_result(&result)?; db.update_result(&result)?;
} }
Reportable::Error(e) => { Reportable::Error(e) => {
error!("agent {id} reported: {e}"); error!("agent {agent_id} reported: {e}");
} }
Reportable::Dummy => (), Reportable::Dummy => (),
}} }}
@ -203,19 +204,18 @@ impl Endpoints {
pub async fn update_agent( pub async fn update_agent(
repo: Arc<PgRepo>, repo: Arc<PgRepo>,
agent: BaseMessage<'static, Agent>, agent: Agent,
) -> EndpResult<()> { ) -> EndpResult<()> {
repo.interact(move |mut db| db.update_agent(&agent.into_inner())) repo.interact(move |mut db| db.update_agent(&agent))
.await?; .await?;
Ok(()) Ok(())
} }
pub async fn update_job( pub async fn update_job(
repo: Arc<PgRepo>, repo: Arc<PgRepo>,
job: BaseMessage<'static, ValidJobMeta>, job: ValidJobMeta,
) -> EndpResult<()> { ) -> EndpResult<()> {
let meta = job.into_inner(); let thin_meta = fat_meta_to_thin(job).map_err(Error::from)?;
let thin_meta = fat_meta_to_thin(meta).map_err(Error::from)?;
repo.interact(move |mut db| db.update_job(&thin_meta)) repo.interact(move |mut db| db.update_job(&thin_meta))
.await?; .await?;
Ok(()) Ok(())
@ -223,14 +223,14 @@ impl Endpoints {
pub async fn update_assigned_job( pub async fn update_assigned_job(
repo: Arc<PgRepo>, repo: Arc<PgRepo>,
assigned: BaseMessage<'static, AssignedJob>, assigned: AssignedJob,
) -> EndpResult<()> { ) -> EndpResult<()> {
repo.interact(move |mut db| db.update_result(&assigned.into_inner())) repo.interact(move |mut db| db.update_result(&assigned))
.await?; .await?;
Ok(()) Ok(())
} }
pub async fn download(_file_uid: String) -> EndpResult<Vec<u8>> { pub async fn download(_file_id: String) -> EndpResult<Vec<u8>> {
todo!() todo!()
} }
} }

@ -11,13 +11,12 @@ mod handlers;
use db::PgRepo; use db::PgRepo;
use error::{Error as ServerError, RejResponse}; use error::{Error as ServerError, RejResponse};
use serde::de::DeserializeOwned;
use std::{convert::Infallible, sync::Arc}; use std::{convert::Infallible, sync::Arc};
use u_lib::{ use u_lib::{
config, config,
db::async_pool, db::async_pool,
jobs::fat_meta_to_thin, jobs::fat_meta_to_thin,
messaging::{AsMsg, BaseMessage, Reportable}, messaging::{AsMsg, Reportable},
models::*, models::*,
}; };
use uuid::Uuid; use uuid::Uuid;
@ -32,15 +31,8 @@ use crate::handlers::Endpoints;
type ValidJobMeta = FatJobMeta<true>; type ValidJobMeta = FatJobMeta<true>;
fn get_content<M>() -> impl Filter<Extract = (BaseMessage<'static, M>,), Error = Rejection> + Clone
where
M: AsMsg + Sync + Send + DeserializeOwned + 'static,
{
body::json::<BaseMessage<M>>()
}
fn into_message<M: AsMsg>(msg: M) -> Json { fn into_message<M: AsMsg>(msg: M) -> Json {
json(&msg.as_message()) json(&msg)
} }
pub fn init_endpoints( pub fn init_endpoints(
@ -48,7 +40,7 @@ pub fn init_endpoints(
db: PgRepo, db: PgRepo,
) -> impl Filter<Extract = (impl Reply,), Error = Rejection> + Clone { ) -> impl Filter<Extract = (impl Reply,), Error = Rejection> + Clone {
let path = |p: &'static str| warp::post().and(warp::path(p)); let path = |p: &'static str| warp::post().and(warp::path(p));
let infallible_none = |_| async { Ok::<_, Infallible>((None::<Uuid>,)) }; let infallible_none = |_| async { Result::<(Option<Uuid>,), Infallible>::Ok((None,)) };
let with_db = { let with_db = {
let adb = Arc::new(db); let adb = Arc::new(db);
@ -67,7 +59,7 @@ pub fn init_endpoints(
let upload_jobs = path("upload_jobs") let upload_jobs = path("upload_jobs")
.and(with_db.clone()) .and(with_db.clone())
.and(get_content::<Vec<ValidJobMeta>>()) .and(body::json::<Vec<ValidJobMeta>>())
.and_then(Endpoints::upload_jobs) .and_then(Endpoints::upload_jobs)
.map(into_message); .map(into_message);
@ -107,31 +99,32 @@ pub fn init_endpoints(
let set_jobs = path("set_jobs") let set_jobs = path("set_jobs")
.and(with_db.clone()) .and(with_db.clone())
.and(warp::path::param::<Uuid>()) .and(warp::path::param::<Uuid>())
.and(get_content::<Vec<String>>()) .and(body::json::<Vec<String>>())
.and_then(Endpoints::set_jobs) .and_then(Endpoints::set_jobs)
.map(into_message); .map(into_message);
let report = path("report") let report = path("report")
.and(with_db.clone()) .and(with_db.clone())
.and(get_content::<Vec<Reportable>>()) .and(body::json::<Vec<Reportable>>())
.and(warp::header("User-Agent"))
.and_then(Endpoints::report) .and_then(Endpoints::report)
.map(ok); .map(ok);
let update_agent = path("update_agent") let update_agent = path("update_agent")
.and(with_db.clone()) .and(with_db.clone())
.and(get_content::<Agent>()) .and(body::json::<Agent>())
.and_then(Endpoints::update_agent) .and_then(Endpoints::update_agent)
.map(ok); .map(ok);
let update_job = path("update_job") let update_job = path("update_job")
.and(with_db.clone()) .and(with_db.clone())
.and(get_content::<ValidJobMeta>()) .and(body::json::<ValidJobMeta>())
.and_then(Endpoints::update_job) .and_then(Endpoints::update_job)
.map(ok); .map(ok);
let update_assigned_job = path("update_result") let update_assigned_job = path("update_result")
.and(with_db.clone()) .and(with_db.clone())
.and(get_content::<AssignedJob>()) .and(body::json::<AssignedJob>())
.and_then(Endpoints::update_assigned_job) .and_then(Endpoints::update_assigned_job)
.map(ok); .map(ok);
@ -226,11 +219,11 @@ async fn handle_rejection(rej: Rejection) -> Result<Response, Infallible> {
fn logger(info: Info<'_>) { fn logger(info: Info<'_>) {
info!(target: "warp", info!(target: "warp",
"{raddr} {agent_uid} \"{path}\" {status}", "{raddr} {agent_id} \"{path}\" {status}",
raddr = info.remote_addr().unwrap_or(([0, 0, 0, 0], 0).into()), raddr = info.remote_addr().unwrap_or(([0, 0, 0, 0], 0).into()),
path = info.path(), path = info.path(),
agent_uid = info.user_agent() agent_id = info.user_agent()
.map(|uid: &str| uid.splitn(3, '-') .map(|id: &str| id.splitn(3, '-')
.take(2) .take(2)
.collect::<String>() .collect::<String>()
) )

@ -1,3 +1,3 @@
FROM centos:7 FROM alpine:3.17
RUN yum update -y RUN apk add bash

@ -1,3 +1,3 @@
FROM alpine:latest FROM alpine:3.17
RUN apk add iproute2 bash RUN apk add iproute2 bash

@ -1,28 +1,28 @@
use crate::helpers::ENV; use crate::helpers::ENV;
use u_lib::{ use u_lib::{
api::ClientHandler, config::get_self_uid, jobs::fat_meta_to_thin, messaging::Reportable, api::ClientHandler, config::get_self_id, jobs::fat_meta_to_thin, messaging::Reportable,
models::*, models::*,
}; };
use uuid::Uuid; use uuid::Uuid;
pub struct RegisteredAgent { pub struct RegisteredAgent {
pub uid: Uuid, pub id: Uuid,
} }
impl RegisteredAgent { impl RegisteredAgent {
pub async fn unregister(self) { pub async fn unregister(self) {
let cli = ClientHandler::new(&ENV.u_server, None).await.unwrap(); let cli = ClientHandler::new(&ENV.u_server, None).await.unwrap();
cli.del(self.uid).await.unwrap(); cli.del(self.id).await.unwrap();
} }
} }
#[fixture] #[fixture]
pub async fn register_agent() -> RegisteredAgent { pub async fn register_agent() -> RegisteredAgent {
let cli = ClientHandler::new(&ENV.u_server, None).await.unwrap(); let cli = ClientHandler::new(&ENV.u_server, None).await.unwrap();
let agent_uid = get_self_uid(); let agent_id = get_self_id();
println!("registering agent {agent_uid}"); println!("registering agent {agent_id}");
let resp = cli let resp = cli
.get_personal_jobs(agent_uid) .get_personal_jobs(agent_id)
.await .await
.unwrap() .unwrap()
.pop() .pop()
@ -31,7 +31,7 @@ pub async fn register_agent() -> RegisteredAgent {
let job = cli.get_job(job_id).await.unwrap(); let job = cli.get_job(job_id).await.unwrap();
assert_eq!(job.alias, Some("agent_hello".to_string())); assert_eq!(job.alias, Some("agent_hello".to_string()));
let mut agent_data = AssignedJob::from((&fat_meta_to_thin(job).unwrap(), resp)); let mut agent_data = AssignedJob::from((&fat_meta_to_thin(job).unwrap(), resp));
agent_data.set_result(&Agent::with_id(agent_uid)); agent_data.set_result(&Agent::with_id(agent_id));
cli.report(Reportable::Assigned(agent_data)).await.unwrap(); cli.report(Reportable::Assigned(agent_data)).await.unwrap();
RegisteredAgent { uid: agent_uid } RegisteredAgent { id: agent_id }
} }

@ -0,0 +1,22 @@
use std::{fmt::Debug, time::Duration};
use tokio::time::sleep;
/// Calls `f` up to `retries` times, returning the first `Ok` value.
///
/// Waits `interval` between consecutive attempts. Unlike a naive loop, no
/// delay is added after the *final* failure — we are about to panic, so
/// sleeping first would only slow the test down.
///
/// # Panics
/// Panics with the `Debug` rendering of the last observed error if every
/// attempt fails. Note that `retries == 0` panics immediately with an
/// empty message, since `f` is never invoked.
pub async fn retry_with_interval<T, E: Debug>(
    retries: usize,
    interval: Duration,
    f: impl Fn() -> Result<T, E>,
) -> T {
    // Debug rendering of the most recent error; used as the panic message.
    let mut err = String::new();
    for i in 0..retries {
        eprintln!("retrier: {} attempt...", i + 1);
        match f() {
            Ok(r) => return r,
            Err(e) => {
                err = format!("{e:?}");
                // Only back off if another attempt will actually follow.
                if i + 1 < retries {
                    sleep(interval).await;
                }
            }
        }
    }
    panic!("{err}");
}

@ -1,3 +1,4 @@
pub mod jobs;
pub mod panel; pub mod panel;
pub use panel::Panel; pub use panel::Panel;

@ -1,10 +1,9 @@
use crate::fixtures::agent::*; use crate::fixtures::agent::*;
use crate::helpers::Panel; use crate::helpers::{jobs::retry_with_interval, Panel};
use rstest::rstest; use rstest::rstest;
use serde_json::{json, to_string}; use serde_json::{json, to_string};
use std::time::Duration; use u_lib::config::AGENT_ITERATION_INTERVAL;
use tokio::time::sleep;
use u_lib::models::*; use u_lib::models::*;
use uuid::Uuid; use uuid::Uuid;
@ -13,15 +12,15 @@ use uuid::Uuid;
async fn registration(#[future] register_agent: RegisteredAgent) { async fn registration(#[future] register_agent: RegisteredAgent) {
let agent = register_agent.await; let agent = register_agent.await;
let agents: Vec<Agent> = Panel::check_output("agents read"); let agents: Vec<Agent> = Panel::check_output("agents read");
let found = agents.iter().find(|v| v.id == agent.uid); let found = agents.iter().find(|v| v.id == agent.id);
assert!(found.is_some()); assert!(found.is_some());
Panel::check_status(format!("agents delete {}", agent.uid)); Panel::check_status(format!("agents delete {}", agent.id));
} }
#[tokio::test] #[tokio::test]
async fn setup_tasks() { async fn setup_tasks() {
let agents: Vec<Agent> = Panel::check_output("agents read"); let agents: Vec<Agent> = Panel::check_output("agents read");
let agent_uid = agents[0].id; let agent_id = agents[0].id;
let job_alias = "passwd_contents"; let job_alias = "passwd_contents";
let job = json!( let job = json!(
{"alias": job_alias, "payload": b"cat /etc/passwd", "argv": "/bin/bash {}" } {"alias": job_alias, "payload": b"cat /etc/passwd", "argv": "/bin/bash {}" }
@ -29,51 +28,52 @@ async fn setup_tasks() {
Panel::check_status(["jobs", "create", &to_string(&job).unwrap()]); Panel::check_status(["jobs", "create", &to_string(&job).unwrap()]);
let cmd = format!("map create {} {}", agent_uid, job_alias); let cmd = format!("map create {} {}", agent_id, job_alias);
let assigned_uids: Vec<Uuid> = Panel::check_output(cmd); let assigned_ids: Vec<Uuid> = Panel::check_output(cmd);
for _ in 0..3 { retry_with_interval(3, AGENT_ITERATION_INTERVAL, || {
let result = let result =
Panel::check_output::<Vec<AssignedJob>>(format!("map read {}", assigned_uids[0])) Panel::check_output::<Vec<AssignedJob>>(format!("map read {}", assigned_ids[0]))
.remove(0); .remove(0);
if result.state == JobState::Finished { if result.state == JobState::Finished {
assert!(result.to_str_result().contains("root:x:0:0::/root")); eprintln!("{}", result.to_str_result());
assert!(result.to_str_result().contains("root:x:0:0"));
Ok(())
} else { } else {
sleep(Duration::from_secs(5)).await; Err("job didn't finish")
eprintln!("waiting for task");
} }
} })
panic!("Job didn't appear in the job map"); .await;
} }
#[tokio::test] #[tokio::test]
async fn large_payload() { async fn large_payload() {
let agent_uid = Panel::check_output::<Vec<Agent>>("agents read")[0].id; let agent_id = Panel::check_output::<Vec<Agent>>("agents read")[0].id;
let job_alias = "large_payload"; let job_alias = "large_payload";
let job = FatJobMeta::builder() let job = FatJobMeta::builder()
.with_alias(job_alias) .with_alias(job_alias)
.with_payload_path("./tests/bin/echoer") .with_payload_path("./tests/bin/echoer")
.with_shell("{} 'type echo'") .with_shell("{} type echo")
.build() .build()
.unwrap(); .unwrap();
Panel::check_status(["jobs", "create", &to_string(&job).unwrap()]); Panel::check_status(["jobs", "create", &to_string(&job).unwrap()]);
let cmd = format!("map create {agent_uid} {job_alias}"); let cmd = format!("map create {agent_id} {job_alias}");
let assigned_uids: Vec<Uuid> = Panel::check_output(cmd); let assigned_ids: Vec<Uuid> = Panel::check_output(cmd);
for _ in 0..3 { retry_with_interval(3, AGENT_ITERATION_INTERVAL, || {
let result = let result =
Panel::check_output::<Vec<AssignedJob>>(format!("map read {}", assigned_uids[0])) Panel::check_output::<Vec<AssignedJob>>(format!("map read {}", assigned_ids[0]))
.remove(0); .remove(0);
if result.state == JobState::Finished { if result.state == JobState::Finished {
assert_eq!(result.to_str_result(), "type echo"); assert_eq!(result.to_str_result(), "type echo\n");
Ok(())
} else { } else {
sleep(Duration::from_secs(5)).await; Err("job didn't finish")
eprintln!("waiting for task");
} }
} })
panic!("Job didn't appear in the job map"); .await;
} }

@ -9,9 +9,9 @@ use serde_json::{from_str, Value};
use uuid::Uuid; use uuid::Uuid;
use crate::{ use crate::{
config::{get_self_uid, MASTER_PORT}, config::{get_self_id, MASTER_PORT},
conv::opt_to_string, conv::opt_to_string,
messaging::{self, AsMsg, BaseMessage}, messaging::{self, AsMsg},
misc::OneOrVec, misc::OneOrVec,
models::{self}, models::{self},
UError, UResult, UError, UResult,
@ -30,7 +30,7 @@ impl ClientHandler {
pub async fn new(server: &str, password: Option<String>) -> UResult<Self> { pub async fn new(server: &str, password: Option<String>) -> UResult<Self> {
let identity = Identity::from_pkcs12_der(AGENT_IDENTITY, "").unwrap(); let identity = Identity::from_pkcs12_der(AGENT_IDENTITY, "").unwrap();
let mut default_headers = let mut default_headers =
HashMap::from([(header::USER_AGENT, get_self_uid().hyphenated().to_string())]); HashMap::from([(header::USER_AGENT, get_self_id().hyphenated().to_string())]);
if let Some(pwd) = password { if let Some(pwd) = password {
default_headers.insert(header::AUTHORIZATION, format!("Bearer {pwd}")); default_headers.insert(header::AUTHORIZATION, format!("Bearer {pwd}"));
@ -86,7 +86,7 @@ impl ClientHandler {
let request = self let request = self
.client .client
.post(self.base_url.join(url.as_ref()).unwrap()) .post(self.base_url.join(url.as_ref()).unwrap())
.json(&payload.as_message()); .json(&payload);
let response = request let response = request
.send() .send()
@ -100,12 +100,10 @@ impl ClientHandler {
let resp = response.text().await.context("resp")?; let resp = response.text().await.context("resp")?;
let result = match is_success { let result = match is_success {
Ok(_) => from_str::<BaseMessage<R>>(&resp) Ok(_) => from_str::<R>(&resp).or_else(|e| match content_len {
.map(|msg| msg.into_inner()) Some(0) => Ok(Default::default()),
.or_else(|e| match content_len { _ => Err(UError::NetError(e.to_string(), resp)),
Some(0) => Ok(Default::default()), }),
_ => Err(UError::NetError(e.to_string(), resp)),
}),
Err(UError::NetError(err, _)) => Err(UError::NetError(err, resp)), Err(UError::NetError(err, _)) => Err(UError::NetError(err, resp)),
_ => unreachable!(), _ => unreachable!(),
} }

@ -17,20 +17,20 @@ impl JobCache {
JOB_CACHE.write().insert(job_meta.id, job_meta); JOB_CACHE.write().insert(job_meta.id, job_meta);
} }
pub fn contains(uid: Uuid) -> bool { pub fn contains(id: Uuid) -> bool {
JOB_CACHE.read().contains_key(&uid) JOB_CACHE.read().contains_key(&id)
} }
pub fn get<'jh>(uid: Uuid) -> Option<JobCacheHolder<'jh>> { pub fn get<'jh>(id: Uuid) -> Option<JobCacheHolder<'jh>> {
if !Self::contains(uid) { if !Self::contains(id) {
return None; return None;
} }
let lock = JOB_CACHE.read(); let lock = JOB_CACHE.read();
Some(JobCacheHolder(lock, uid)) Some(JobCacheHolder(lock, id))
} }
pub fn remove(uid: Uuid) { pub fn remove(id: Uuid) {
JOB_CACHE.write().remove(&uid); JOB_CACHE.write().remove(&id);
} }
} }

@ -1,19 +1,22 @@
use envy::{from_env, prefixed, Result as EnvResult}; use envy::{from_env, prefixed, Result as EnvResult};
use lazy_static::lazy_static; use lazy_static::lazy_static;
use serde::Deserialize; use serde::Deserialize;
use std::time::Duration;
use uuid::Uuid; use uuid::Uuid;
pub use envy::Error; pub use envy::Error;
pub const MASTER_PORT: u16 = 63714; pub const MASTER_PORT: u16 = 63714;
pub const AGENT_ITERATION_INTERVAL: Duration = Duration::from_secs(5);
lazy_static! { lazy_static! {
static ref UID: Uuid = Uuid::new_v4(); static ref ID: Uuid = Uuid::new_v4();
} }
#[inline] #[inline]
pub fn get_self_uid() -> Uuid { pub fn get_self_id() -> Uuid {
*UID *ID
} }
#[derive(Deserialize)] #[derive(Deserialize)]

@ -10,12 +10,12 @@ use std::collections::HashMap;
use std::process::exit; use std::process::exit;
use tokio::process::Command; use tokio::process::Command;
pub struct UnnamedJobsBatch { pub struct AnonymousJobBatch {
waiter: Waiter, waiter: Waiter,
is_running: bool, is_running: bool,
} }
impl UnnamedJobsBatch { impl AnonymousJobBatch {
pub fn from_meta_with_id(jobs: impl OneOrVec<(ThinJobMeta, AssignedJobById)>) -> Self { pub fn from_meta_with_id(jobs: impl OneOrVec<(ThinJobMeta, AssignedJobById)>) -> Self {
let jobs = jobs.into_vec(); let jobs = jobs.into_vec();
let mut waiter = Waiter::new(); let mut waiter = Waiter::new();
@ -43,7 +43,7 @@ impl UnnamedJobsBatch {
) )
}) })
.collect(); .collect();
UnnamedJobsBatch::from_meta_with_id(jobs) AnonymousJobBatch::from_meta_with_id(jobs)
} }
/// Spawn jobs /// Spawn jobs
@ -69,6 +69,82 @@ impl UnnamedJobsBatch {
} }
} }
/// Store jobs and get results by name.
///
/// `FINISHED` is a type-state flag: `false` while the batch still has to run,
/// `true` once `wait` has collected the results (see `impl NamedJobBatch<true>`).
pub struct NamedJobBatch<const FINISHED: bool = false> {
    // Runner for the underlying jobs; taken out (`Option::take`) when the batch is awaited.
    runner: Option<AnonymousJobBatch>,
    // Aliases of the queued jobs, in the same order as the runner's jobs.
    job_names: Vec<String>,
    // Results keyed by job alias; empty until `wait` fills it.
    results: HashMap<String, ExecResult>,
}
impl NamedJobBatch {
    /// Build a batch from `(alias, shell command)` pairs.
    ///
    /// Jobs whose meta fails to build, or fails the fat->thin conversion, are
    /// dropped; their errors are accumulated in the returned `CombinedResult`
    /// alongside the (possibly partial) batch.
    pub fn from_shell(
        named_jobs: impl OneOrVec<(&'static str, &'static str)>,
    ) -> CombinedResult<Self> {
        let mut result = CombinedResult::new();
        let jobs: Vec<_> = named_jobs
            .into_vec()
            .into_iter()
            .filter_map(|(alias, cmd)| {
                match FatJobMeta::builder()
                    .with_shell(cmd)
                    .with_alias(alias)
                    .build()
                {
                    Ok(fat_meta) => match fat_meta_to_thin(fat_meta) {
                        Ok(thin_meta) => Some(thin_meta),
                        Err(e) => {
                            // Fat->thin conversion failed: record the error, skip the job.
                            result.err(e);
                            None
                        }
                    },
                    Err(e) => {
                        // Meta validation failed: record the error, skip the job.
                        result.err(e);
                        None
                    }
                }
            })
            .collect();
        result.ok(Self::from_meta(jobs));
        result
    }

    /// Build a batch from already-prepared metas.
    ///
    /// Panics if any meta has no alias (`meta.alias` must be `Some` here).
    pub fn from_meta(named_jobs: impl OneOrVec<ThinJobMeta>) -> Self {
        let (job_names, job_metas): (Vec<_>, Vec<_>) = named_jobs
            .into_vec()
            .into_iter()
            .map(|meta| (meta.alias.clone().unwrap(), meta))
            .unzip();
        Self {
            runner: Some(AnonymousJobBatch::from_meta(job_metas)),
            job_names,
            results: HashMap::new(),
        }
    }

    /// Run all jobs to completion and key their results by alias.
    ///
    /// Results are matched to names positionally via `zip`, relying on the
    /// runner preserving job order.
    pub async fn wait(mut self) -> NamedJobBatch<true> {
        let results = self.runner.take().unwrap().wait().await;
        for (name, result) in self.job_names.into_iter().zip(results.into_iter()) {
            self.results.insert(name, result);
        }
        NamedJobBatch::<true> {
            runner: None,
            job_names: vec![],
            results: self.results,
        }
    }
}
/// Accessors available only once every job in the batch has finished.
impl NamedJobBatch<true> {
    /// Take the result stored under `name`, if any.
    pub fn pop_opt(&mut self, name: &'static str) -> Option<ExecResult> {
        self.results.remove(name)
    }

    /// Take the result stored under `name`; panics when it is absent
    /// or was already popped.
    pub fn pop(&mut self, name: &'static str) -> ExecResult {
        self.results.remove(name).unwrap()
    }
}
pub async fn run_assigned_job(meta: ThinJobMeta, ids: AssignedJobById) -> ExecResult { pub async fn run_assigned_job(meta: ThinJobMeta, ids: AssignedJobById) -> ExecResult {
let mut job = AssignedJob::from((&meta, ids)); let mut job = AssignedJob::from((&meta, ids));
match meta.exec_type { match meta.exec_type {
@ -79,7 +155,7 @@ pub async fn run_assigned_job(meta: ThinJobMeta, ids: AssignedJobById) -> ExecRe
let argv_with_exec = meta.argv.replace("{}", &prep_exec_path); let argv_with_exec = meta.argv.replace("{}", &prep_exec_path);
(argv_with_exec, Some(prep_exec)) (argv_with_exec, Some(prep_exec))
} else { } else {
(meta.argv.clone(), None) (meta.argv, None)
} }
}; };
@ -156,87 +232,11 @@ pub fn thin_meta_to_fat(meta: ThinJobMeta) -> Result<FatJobMeta<true>, ufs::Erro
}) })
} }
/// Store jobs and get results by name.
///
/// `FINISHED` is a type-state flag: `false` while the batch still has to run,
/// `true` once `wait` has collected the results.
pub struct NamedJobsBatch<const FINISHED: bool = false> {
    // Runner for the underlying jobs; taken out (`Option::take`) when the batch is awaited.
    runner: Option<UnnamedJobsBatch>,
    // Aliases of the queued jobs, in the same order as the runner's jobs.
    job_names: Vec<String>,
    // Results keyed by job alias; empty until `wait` fills it.
    results: HashMap<String, ExecResult>,
}

impl NamedJobsBatch {
    /// Build a batch from `(alias, shell command)` pairs.
    ///
    /// Jobs whose meta fails to build, or fails the fat->thin conversion, are
    /// dropped; their errors are accumulated in the returned `CombinedResult`
    /// alongside the (possibly partial) batch.
    pub fn from_shell(
        named_jobs: impl OneOrVec<(&'static str, &'static str)>,
    ) -> CombinedResult<Self> {
        let mut result = CombinedResult::new();
        let jobs: Vec<_> = named_jobs
            .into_vec()
            .into_iter()
            .filter_map(|(alias, cmd)| {
                match FatJobMeta::builder()
                    .with_shell(cmd)
                    .with_alias(alias)
                    .build()
                {
                    Ok(fat_meta) => match fat_meta_to_thin(fat_meta) {
                        Ok(thin_meta) => Some(thin_meta),
                        Err(e) => {
                            // Fat->thin conversion failed: record the error, skip the job.
                            result.err(e);
                            None
                        }
                    },
                    Err(e) => {
                        // Meta validation failed: record the error, skip the job.
                        result.err(e);
                        None
                    }
                }
            })
            .collect();
        result.ok(Self::from_meta(jobs));
        result
    }

    /// Build a batch from already-prepared metas.
    ///
    /// Panics if any meta has no alias (`meta.alias` must be `Some` here).
    pub fn from_meta(named_jobs: impl OneOrVec<ThinJobMeta>) -> Self {
        let (job_names, job_metas): (Vec<_>, Vec<_>) = named_jobs
            .into_vec()
            .into_iter()
            .map(|meta| (meta.alias.clone().unwrap(), meta))
            .unzip();
        Self {
            runner: Some(UnnamedJobsBatch::from_meta(job_metas)),
            job_names,
            results: HashMap::new(),
        }
    }

    /// Run all jobs to completion and key their results by alias.
    ///
    /// Results are matched to names positionally via `zip`, relying on the
    /// runner preserving job order.
    pub async fn wait(mut self) -> NamedJobsBatch<true> {
        let results = self.runner.take().unwrap().wait().await;
        for (name, result) in self.job_names.into_iter().zip(results.into_iter()) {
            self.results.insert(name, result);
        }
        NamedJobsBatch::<true> {
            runner: None,
            job_names: vec![],
            results: self.results,
        }
    }
}

impl NamedJobsBatch<true> {
    /// Take the result stored under `name`, if any.
    pub fn pop_opt(&mut self, name: &'static str) -> Option<ExecResult> {
        self.results.remove(name)
    }

    /// Take the result stored under `name`; panics when it is absent.
    pub fn pop(&mut self, name: &'static str) -> ExecResult {
        self.pop_opt(name).unwrap()
    }
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
use crate::{ use crate::{
jobs::{NamedJobsBatch, UnnamedJobsBatch}, jobs::{AnonymousJobBatch, NamedJobBatch},
models::{misc::JobType, FatJobMeta}, models::{misc::JobType, FatJobMeta},
unwrap_enum, UError, unwrap_enum, UError,
}; };
@ -253,7 +253,7 @@ mod tests {
.collect::<Vec<_>>(); .collect::<Vec<_>>();
let now = SystemTime::now(); let now = SystemTime::now();
UnnamedJobsBatch::from_meta(sleep_jobs).wait().await; AnonymousJobBatch::from_meta(sleep_jobs).wait().await;
assert!(now.elapsed().unwrap().as_secs() < SLEEP_SECS + 2) assert!(now.elapsed().unwrap().as_secs() < SLEEP_SECS + 2)
} }
@ -292,7 +292,7 @@ mod tests {
job = job.with_payload(p); job = job.with_payload(p);
} }
let job = fat_meta_to_thin(job.build().unwrap()).unwrap(); let job = fat_meta_to_thin(job.build().unwrap()).unwrap();
let result = UnnamedJobsBatch::from_meta(job).wait_one().await.unwrap(); let result = AnonymousJobBatch::from_meta(job).wait_one().await.unwrap();
let result = result.to_str_result(); let result = result.to_str_result();
assert_eq!(result.trim(), expected_result); assert_eq!(result.trim(), expected_result);
Ok(()) Ok(())
@ -303,10 +303,10 @@ mod tests {
const SLEEP_SECS: u64 = 1; const SLEEP_SECS: u64 = 1;
let now = SystemTime::now(); let now = SystemTime::now();
let longest_job = FatJobMeta::from_shell(format!("sleep {}", SLEEP_SECS)).unwrap(); let longest_job = FatJobMeta::from_shell(format!("sleep {}", SLEEP_SECS)).unwrap();
let longest_job = UnnamedJobsBatch::from_meta(fat_meta_to_thin(longest_job).unwrap()) let longest_job = AnonymousJobBatch::from_meta(fat_meta_to_thin(longest_job).unwrap())
.spawn() .spawn()
.await; .await;
let ls = UnnamedJobsBatch::from_meta( let ls = AnonymousJobBatch::from_meta(
fat_meta_to_thin(FatJobMeta::from_shell("ls").unwrap()).unwrap(), fat_meta_to_thin(FatJobMeta::from_shell("ls").unwrap()).unwrap(),
) )
.wait_one() .wait_one()
@ -321,7 +321,7 @@ mod tests {
.map(|f| fat_meta_to_thin(FatJobMeta::from_shell(format!("ls {f}")).unwrap()).unwrap()) .map(|f| fat_meta_to_thin(FatJobMeta::from_shell(format!("ls {f}")).unwrap()).unwrap())
.collect::<Vec<_>>(); .collect::<Vec<_>>();
let ls_subfolders = UnnamedJobsBatch::from_meta(subfolders_jobs).wait().await; let ls_subfolders = AnonymousJobBatch::from_meta(subfolders_jobs).wait().await;
for result in ls_subfolders { for result in ls_subfolders {
assert_eq!(result.unwrap().retcode.unwrap(), 0); assert_eq!(result.unwrap().retcode.unwrap(), 0);
@ -352,7 +352,7 @@ mod tests {
#[tokio::test] #[tokio::test]
async fn test_failing_shell_job() -> TestResult { async fn test_failing_shell_job() -> TestResult {
let job = fat_meta_to_thin(FatJobMeta::from_shell("lol_kek_puk").unwrap()).unwrap(); let job = fat_meta_to_thin(FatJobMeta::from_shell("lol_kek_puk").unwrap()).unwrap();
let job_result = UnnamedJobsBatch::from_meta(job).wait_one().await.unwrap(); let job_result = AnonymousJobBatch::from_meta(job).wait_one().await.unwrap();
let output = job_result.to_str_result(); let output = job_result.to_str_result();
assert!(output.contains("No such file")); assert!(output.contains("No such file"));
assert!(job_result.retcode.is_none()); assert!(job_result.retcode.is_none());
@ -380,7 +380,7 @@ mod tests {
#[tokio::test] #[tokio::test]
async fn test_different_job_types() -> TestResult { async fn test_different_job_types() -> TestResult {
let mut jobs = NamedJobsBatch::from_meta( let mut jobs = NamedJobBatch::from_meta(
[ [
FatJobMeta::builder() FatJobMeta::builder()
.with_shell("sleep 3") .with_shell("sleep 3")

@ -1,56 +0,0 @@
use crate::config::get_self_uid;
use serde::{Deserialize, Serialize};
use std::{borrow::Cow, fmt::Debug};
use uuid::Uuid;
// Cow wrapper that lets `BaseMessage::new` accept either an owned or a
// borrowed message through a single `Into<Moo>` bound.
pub struct Moo<'cow, T: AsMsg + Clone>(pub Cow<'cow, T>);

/// Marker trait for types that may be sent as messages.
pub trait AsMsg: Clone + Serialize + Debug {
    /// Wrap a borrow of `self` into a `BaseMessage` envelope.
    fn as_message(&self) -> BaseMessage<'_, Self> {
        BaseMessage::new(self)
    }
}

// Owned value -> owned Cow.
impl<'cow, M: AsMsg> From<M> for Moo<'cow, M> {
    #[inline]
    fn from(obj: M) -> Moo<'cow, M> {
        Moo(Cow::Owned(obj))
    }
}

// Borrowed value -> borrowed Cow.
impl<'cow, M: AsMsg> From<&'cow M> for Moo<'cow, M> {
    #[inline]
    fn from(obj: &'cow M) -> Moo<'cow, M> {
        Moo(Cow::Borrowed(obj))
    }
}

// Collections of messages are themselves messages.
impl<M: AsMsg> AsMsg for Vec<M> {}
impl<'msg, M: AsMsg> AsMsg for &'msg [M] {}
/// Envelope pairing a serializable payload with the sending agent's id.
#[derive(Serialize, Deserialize, Debug)]
pub struct BaseMessage<'cow, I: AsMsg> {
    // Id of the sending agent, stamped from `get_self_uid()` at construction.
    pub id: Uuid,
    inner: Cow<'cow, I>,
}

impl<'cow, I: AsMsg> BaseMessage<'cow, I> {
    /// Wrap `inner` (owned or borrowed, via the `Moo` conversion) and stamp
    /// it with this process' agent id.
    pub fn new<C>(inner: C) -> Self
    where
        C: Into<Moo<'cow, I>>,
    {
        let Moo(inner) = inner.into();
        Self {
            id: get_self_uid(),
            inner,
        }
    }

    /// Consume the envelope, returning an owned payload
    /// (clones when the Cow was borrowed).
    pub fn into_inner(self) -> I {
        self.inner.into_owned()
    }

    /// Borrow the payload without consuming the envelope.
    pub fn as_inner(&self) -> &I {
        self.inner.as_ref()
    }
}

@ -1,13 +1,14 @@
mod base;
mod files; mod files;
use crate::models::*; use crate::models::*;
use crate::UError; use crate::UError;
pub use base::{AsMsg, BaseMessage};
pub use files::*; pub use files::*;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::fmt::Debug;
use uuid::Uuid; use uuid::Uuid;
pub trait AsMsg: Clone + Serialize + Debug {}
impl AsMsg for Agent {} impl AsMsg for Agent {}
impl AsMsg for AssignedJob {} impl AsMsg for AssignedJob {}
impl AsMsg for AssignedJobById {} impl AsMsg for AssignedJobById {}
@ -21,6 +22,9 @@ impl AsMsg for i32 {}
impl AsMsg for u8 {} impl AsMsg for u8 {}
impl AsMsg for () {} impl AsMsg for () {}
impl<M: AsMsg> AsMsg for Vec<M> {}
impl<'msg, M: AsMsg> AsMsg for &'msg [M] {}
#[derive(Serialize, Deserialize, Clone, Debug)] #[derive(Serialize, Deserialize, Clone, Debug)]
pub enum Reportable { pub enum Reportable {
Assigned(AssignedJob), Assigned(AssignedJob),

@ -11,7 +11,7 @@ mod server {
#[cfg(feature = "server")] #[cfg(feature = "server")]
use self::server::*; use self::server::*;
use crate::{config::get_self_uid, executor::ExecResult, jobs::NamedJobsBatch, platform::Platform}; use crate::{config::get_self_id, executor::ExecResult, jobs::NamedJobBatch, platform::Platform};
use uuid::Uuid; use uuid::Uuid;
@ -52,9 +52,9 @@ pub struct Agent {
} }
impl Agent { impl Agent {
pub fn with_id(uid: Uuid) -> Self { pub fn with_id(id: Uuid) -> Self {
Self { Self {
id: uid, id,
..Default::default() ..Default::default()
} }
} }
@ -80,7 +80,7 @@ impl Agent {
("username", "id -un"), ("username", "id -un"),
]; ];
let mut builder = NamedJobsBatch::from_shell(cmds).unwrap_one().wait().await; let mut builder = NamedJobBatch::from_shell(cmds).unwrap_one().wait().await;
let decoder = let decoder =
|job_result: ExecResult| job_result.unwrap().to_str_result().trim().to_string(); |job_result: ExecResult| job_result.unwrap().to_str_result().trim().to_string();
@ -103,7 +103,7 @@ impl Default for Agent {
fn default() -> Self { fn default() -> Self {
Self { Self {
alias: None, alias: None,
id: get_self_uid(), id: get_self_id(),
hostname: String::new(), hostname: String::new(),
host_info: String::new(), host_info: String::new(),
is_root: false, is_root: false,

@ -1,5 +1,5 @@
use super::{JobState, JobType, ThinJobMeta}; use super::{JobState, JobType, ThinJobMeta};
use crate::config::get_self_uid; use crate::config::get_self_id;
#[cfg(feature = "server")] #[cfg(feature = "server")]
use crate::models::schema::*; use crate::models::schema::*;
#[cfg(feature = "server")] #[cfg(feature = "server")]
@ -50,7 +50,7 @@ impl From<(&ThinJobMeta, AssignedJobById)> for AssignedJob {
impl Default for AssignedJobById { impl Default for AssignedJobById {
fn default() -> Self { fn default() -> Self {
Self { Self {
agent_id: get_self_uid(), agent_id: get_self_id(),
id: Uuid::new_v4(), id: Uuid::new_v4(),
job_id: Uuid::nil(), job_id: Uuid::nil(),
} }

@ -19,7 +19,6 @@ impl Payload {
let data = ufs::read(ident)?; let data = ufs::read(ident)?;
*self = Payload::Data(data); *self = Payload::Data(data);
Ok(()) Ok(())
} }
} }
@ -32,7 +31,6 @@ impl Payload {
ufs::put(&name, data)?; ufs::put(&name, data)?;
*self = Payload::Ident(name.as_ref().to_string()); *self = Payload::Ident(name.as_ref().to_string());
Ok(()) Ok(())
} }
} }

@ -152,7 +152,7 @@ pub fn prepare_executable(name: impl AsRef<str>) -> Result<(File, String), Error
loop { loop {
let bytes_read = payload_src.read(&mut buffer)?; let bytes_read = payload_src.read(&mut buffer)?;
payload_dest.write(&buffer)?; payload_dest.write(&buffer[..bytes_read])?;
if bytes_read != BUFFER_LEN { if bytes_read != BUFFER_LEN {
break; break;
@ -164,3 +164,11 @@ pub fn prepare_executable(name: impl AsRef<str>) -> Result<(File, String), Error
Err(e) => Err(Error::new(e, FAKE_EXEC_NAME)), Err(e) => Err(Error::new(e, FAKE_EXEC_NAME)),
} }
} }
/// Remove every indexed payload file from disk.
///
/// Takes the write lock and drains the index while deleting, so no entry is
/// left behind pointing at a file that no longer exists. A read-only pass
/// that deleted the files but kept the entries would leave the index stale.
pub fn cleanup() {
    let mut index = INDEX.write();
    for (_, f) in index.drain() {
        // Best-effort removal: a file that is already gone is not an error here.
        fs::remove_file(&f.path).ok();
    }
}

@ -1,5 +1,5 @@
use std::env; use std::env;
fn main() { fn main() {
println!("{}", env::args().nth(1).unwrap_or(String::new())); println!("{}", env::args().skip(1).collect::<Vec<String>>().join(" "));
} }

Loading…
Cancel
Save