make assigning jobs work again

pull/9/head
plazmoid 1 year ago
parent 886d4833fb
commit e2ba50d947
  1. 16
      Cargo.lock
  2. 38
      bin/u_panel/src/argparse.rs
  3. 14
      bin/u_panel/src/gui/fe/src/app/components/dialogs/assign-job-dialog/assign-job-dialog.component.ts
  4. 5
      bin/u_panel/src/gui/fe/src/app/models/result.model.ts
  5. 4
      bin/u_panel/src/gui/fe/src/app/services/api.service.ts
  6. 77
      bin/u_server/src/db.rs
  7. 2
      bin/u_server/src/error.rs
  8. 51
      bin/u_server/src/handlers.rs
  9. 11
      bin/u_server/src/u_server.rs
  10. 4
      deploy/podman-compose.yml
  11. 4
      integration-tests/docker-compose.yml
  12. 26
      integration-tests/tests/integration_tests/behaviour.rs
  13. 2
      integration-tests/tests/integration_tests/endpoints.rs
  14. 20
      lib/u_lib/src/api.rs
  15. 5
      lib/u_lib/src/config.rs
  16. 1
      lib/u_lib/src/models/jobs/assigned.rs
  17. 28
      lib/u_lib/src/models/payload.rs

16
Cargo.lock generated

@ -391,9 +391,9 @@ checksum = "7f30e7476521f6f8af1a1c4c0b8cc94f0bee37d91763d0ca2665f299b6cd8aec"
[[package]]
name = "byteorder"
version = "1.4.3"
version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "14c189c53d098945499cdfa7ecc63567cf3886b3332b312a5b4585d8d3a6a610"
checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b"
[[package]]
name = "bytes"
@ -1287,9 +1287,9 @@ checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646"
[[package]]
name = "libc"
version = "0.2.148"
version = "0.2.149"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9cdc71e17332e86d2e1d38c1f99edcb6288ee11b815fb1a4b049eaa2114d369b"
checksum = "a08173bc88b7955d1b3145aa561539096c421ac8debde8cbc3612ec635fee29b"
[[package]]
name = "libflate"
@ -1721,9 +1721,9 @@ checksum = "dc375e1527247fe1a97d8b7156678dfe7c1af2fc075c9a4db3690ecd2a148068"
[[package]]
name = "proc-macro2"
version = "1.0.67"
version = "1.0.68"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3d433d9f1a3e8c1263d9456598b16fec66f4acc9a74dacffd35c7bb09b3a1328"
checksum = "5b1106fec09662ec6dd98ccac0f81cef56984d0b49f75c92d8cbad76e20c005c"
dependencies = [
"unicode-ident",
]
@ -3188,9 +3188,9 @@ checksum = "ed94fce61571a4006852b7389a063ab983c02eb1bb37b47f8272ce92d06d9538"
[[package]]
name = "winnow"
version = "0.5.15"
version = "0.5.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7c2e3184b9c4e92ad5167ca73039d0c42476302ab603e2fec4487511f38ccefc"
checksum = "037711d82167854aff2018dfd193aa0fef5370f456732f0d5a0c59b0f1b4b907"
dependencies = [
"memchr",
]

@ -16,7 +16,7 @@ pub struct Args {
enum Cmd {
Agents(RUD),
Jobs(CRUD),
Map(MapCRUD),
Map(AssignedCRUD),
Payloads(PayloadCRUD),
Ping,
Serve,
@ -32,12 +32,9 @@ enum CRUD {
}
#[derive(StructOpt, Debug)]
enum MapCRUD {
enum AssignedCRUD {
Create {
#[structopt(parse(try_from_str = parse_uuid))]
agent_id: Id,
job_idents: Vec<String>,
item: String,
},
#[structopt(flatten)]
RUD(RUD),
@ -55,7 +52,7 @@ enum PayloadCRUD {
item: String,
},
Delete {
#[structopt(parse(try_from_str = parse_uuid))]
#[structopt(parse(try_from_str = parse::uuid))]
id: Id,
},
}
@ -63,20 +60,24 @@ enum PayloadCRUD {
#[derive(StructOpt, Debug)]
enum RUD {
Read {
#[structopt(parse(try_from_str = parse_uuid))]
#[structopt(parse(try_from_str = parse::uuid))]
id: Option<Id>,
},
Update {
item: String,
},
Delete {
#[structopt(parse(try_from_str = parse_uuid))]
#[structopt(parse(try_from_str = parse::uuid))]
id: Id,
},
}
fn parse_uuid(src: &str) -> Result<Id, String> {
mod parse {
use super::*;
pub fn uuid(src: &str) -> Result<Id, String> {
Id::parse_str(src).map_err(|e| e.to_string())
}
}
pub fn into_value<M: AsMsg>(data: M) -> Value {
@ -125,17 +126,20 @@ pub async fn process_cmd(client: HttpClient, args: Args) -> PanelResult<Value> {
CRUD::RUD(RUD::Delete { id }) => into_value(client.del(id).await?),
},
Cmd::Map(action) => match action {
MapCRUD::Create {
agent_id,
job_idents,
} => into_value(client.set_jobs(agent_id, &job_idents).await?),
MapCRUD::RUD(RUD::Read { id }) => into_value(client.get_assigned_jobs(id).await?),
MapCRUD::RUD(RUD::Update { item }) => {
AssignedCRUD::Create { item } => {
let payload = serde_json::from_str::<Vec<AssignedJobById>>(&item)
.map_err(|e| UError::DeserializeError(e.to_string(), item))?;
into_value(client.assign_jobs(&payload).await?)
}
AssignedCRUD::RUD(RUD::Read { id }) => {
into_value(client.get_assigned_jobs(id).await?)
}
AssignedCRUD::RUD(RUD::Update { item }) => {
let assigned = from_str::<AssignedJob>(&item)
.map_err(|e| UError::DeserializeError(e.to_string(), item))?;
into_value(client.update_result(&assigned).await?)
}
MapCRUD::RUD(RUD::Delete { id }) => into_value(client.del(id).await?),
AssignedCRUD::RUD(RUD::Delete { id }) => into_value(client.del(id).await?),
},
Cmd::Payloads(action) => match action {
PayloadCRUD::Create { item } => {

@ -1,5 +1,6 @@
import { Component, Inject } from '@angular/core';
import { MAT_DIALOG_DATA } from '@angular/material/dialog';
import { AssignedJobByIdModel } from 'src/app/models';
import { ApiTableService } from '../../../services';
@Component({
@ -21,8 +22,15 @@ export class AssignJobDialogComponent {
}
assignSelectedJobs() {
const job_ids = this.selected_rows.map(row => row.split(' ', 1)[0]).join(' ');
const request = `${this.agent_id} ${job_ids}`
this.dataSource.createResult(request)
const assigned_jobs: AssignedJobByIdModel[] = this.selected_rows.map(row => {
const job_id = row.split(' ', 1)[0];
return {
job_id: job_id,
agent_id: this.agent_id
}
});
this.dataSource.createResult(assigned_jobs).subscribe(_ => {
alert("Created")
});
}
}

@ -11,3 +11,8 @@ export interface ResultModel {
retcode: number | null,
updated: UTCDate,
}
export interface AssignedJobByIdModel {
job_id: string,
agent_id: string
}

@ -1,7 +1,7 @@
import { environment } from 'src/environments/environment';
import { HttpClient, HttpErrorResponse } from '@angular/common/http';
import { Observable, map, catchError, throwError } from 'rxjs';
import { ApiModel, PayloadModel, Empty, Area, AgentModel, JobModel, ResultModel, Job, NewPayloadModel } from '../models';
import { ApiModel, PayloadModel, Empty, Area, AgentModel, JobModel, ResultModel, Job, NewPayloadModel, AssignedJobByIdModel } from '../models';
import { Injectable, Inject } from '@angular/core';
import { ErrorService } from './error.service';
@ -115,7 +115,7 @@ export class ApiTableService {
return this.filterErrStatus(this.req(`${area} create '${serialized}'`))
}
createResult(item: string): Observable<string[]> {
createResult(item: AssignedJobByIdModel[]): Observable<string[]> {
return this.create(item, 'map')
}

@ -1,12 +1,14 @@
use crate::error::Error;
use diesel::{pg::PgConnection, prelude::*, result::Error as DslError, Connection};
use std::collections::{HashMap, HashSet};
use std::mem::drop;
use u_lib::{
db::PgAsyncPool,
models::{schema, Agent, AssignedJob, Job, JobMeta, JobState, Payload},
models::{schema, Agent, AssignedJob, AssignedJobById, Job, JobMeta, JobState, Payload},
platform::Platform,
types::Id,
};
use uuid::Uuid;
type Result<T> = std::result::Result<T, Error>;
@ -215,39 +217,72 @@ impl UDB<'_> {
Ok(result)
}
pub fn set_jobs_for_agent(&mut self, agent_id: Id, job_ids: &[Id]) -> Result<()> {
// todo: move to handlers
pub fn assign_jobs(&mut self, assigned_jobs: &[AssignedJobById]) -> Result<()> {
use schema::{jobs, results};
let agent_platform = match self.get_agent(agent_id)? {
struct JobBriefMeta {
alias: Option<String>,
target_platform: String,
}
let assigned_job_ids = HashSet::<Uuid>::from_iter(assigned_jobs.iter().map(|a| a.job_id));
let jobs_meta = HashMap::<Id, JobBriefMeta>::from_iter(
jobs::table
.select((jobs::id, jobs::alias, jobs::target_platforms))
.filter(jobs::id.eq_any(&assigned_job_ids))
.load::<(Id, Option<String>, String)>(self.conn)
.map_err(with_err_ctx(format!(
"Can't find jobs {:?}",
assigned_job_ids
)))?
.into_iter()
.map(|(id, alias, target_platform)| {
(
id,
JobBriefMeta {
alias,
target_platform,
},
)
}),
);
let existing_job_ids = HashSet::from_iter(jobs_meta.keys().copied());
if assigned_job_ids != existing_job_ids {
return Err(Error::ProcessingError(format!(
"Jobs not found: {:?}",
assigned_job_ids.difference(&existing_job_ids),
)));
}
for ajob in assigned_jobs {
let meta = &jobs_meta[&ajob.job_id];
let agent_platform = match self.get_agent(ajob.agent_id)? {
Some(agent) => Platform::new(&agent.platform),
None => {
return Err(Error::ProcessingError(format!(
"Agent {agent_id} not found"
"Agent {} not found",
ajob.agent_id
)))
}
};
let jobs_meta = jobs::table
.select((jobs::id, jobs::alias, jobs::target_platforms))
.filter(jobs::id.eq_any(job_ids))
.load::<(Id, Option<String>, String)>(self.conn)
.map_err(with_err_ctx(format!("Can't find jobs {job_ids:?}")))?;
for meta in &jobs_meta {
if !agent_platform.matches(&meta.2) {
if !agent_platform.matches(&meta.target_platform) {
return Err(Error::InsuitablePlatform(
agent_platform.into_string(),
meta.2.clone(),
meta.target_platform.clone(),
));
}
}
let job_requests = jobs_meta
let job_requests = assigned_jobs
.into_iter()
.map(|(job_id, alias, _)| AssignedJob {
job_id,
agent_id,
alias,
.map(|a| AssignedJob {
job_id: a.job_id,
agent_id: a.agent_id,
alias: jobs_meta[&a.job_id].alias.clone(),
..Default::default()
})
.collect::<Vec<AssignedJob>>();
@ -256,9 +291,7 @@ impl UDB<'_> {
.values(&job_requests)
.execute(self.conn)
.map(drop)
.map_err(with_err_ctx(format!(
"Can't setup jobs {job_ids:?} for agent {agent_id:?}"
)))
.map_err(with_err_ctx("Can't assign jobs"))
}
pub fn del_jobs(&mut self, ids: &[Id]) -> Result<()> {

@ -13,7 +13,7 @@ pub enum Error {
#[error("Configs error: {0}")]
ConfigError(#[from] u_lib::config::Error),
#[error("Error processing {0}")]
#[error("Processing error: {0}")]
ProcessingError(String),
#[error(transparent)]

@ -115,10 +115,10 @@ impl Endpoints {
pub async fn get_personal_jobs(
repo: Arc<PgRepo>,
id: Id,
agent_id: Id,
) -> EndpResult<api_types::GetPersonalJobs> {
repo.transaction(move |mut db| {
let agent = db.get_agent(id)?;
let agent = db.get_agent(agent_id)?;
match agent {
Some(mut agent) => {
agent.touch();
@ -126,7 +126,7 @@ impl Endpoints {
}
None => {
let mut new_agent = Agent::empty();
new_agent.id = id;
new_agent.id = agent_id;
db.upsert_agent(&new_agent)?;
@ -134,11 +134,16 @@ impl Endpoints {
.get_job_by_alias("agent_hello")?
.expect("agent_hello job not found");
db.set_jobs_for_agent(id, &[job.meta.id])?;
let assigned_job = AssignedJobById {
agent_id,
job_id: job.meta.id,
..Default::default()
};
db.assign_jobs(&[assigned_job])?;
}
}
let assigned_jobs = db.get_assigned_jobs(Some(id), true)?;
let assigned_jobs = db.get_assigned_jobs(Some(agent_id), true)?;
for job in &assigned_jobs {
db.update_job_status(job.id, JobState::Running)?;
@ -159,21 +164,19 @@ impl Endpoints {
) -> EndpResult<api_types::UploadJobs> {
let mut checked_jobs = vec![];
for mut job in jobs {
debug!("{job:?}");
if let Some(payload) = &mut job.payload {
payload.maybe_split_payload().map_err(Error::from)?;
} else if let Some(pld_id) = job.meta.payload_id {
if repo
if !repo
.interact(move |mut db| db.payload_exists(pld_id))
.await?
{
checked_jobs.push(job)
} else {
Err(Error::ProcessingError(format!(
"Payload {pld_id} not found"
)))?
}
}
checked_jobs.push(job)
}
let (jobs, payloads_opt): (Vec<_>, Vec<_>) = checked_jobs
@ -196,7 +199,7 @@ impl Endpoints {
.map_err(From::from)
}
pub async fn upload_payloads(
pub async fn upload_payload(
repo: Arc<PgRepo>,
raw_payload: RawPayload,
) -> EndpResult<api_types::UploadPayloads> {
@ -223,31 +226,13 @@ impl Endpoints {
.map_err(From::from)
}
pub async fn set_jobs(
pub async fn assign_jobs(
repo: Arc<PgRepo>,
agent_id: Id,
job_idents: Vec<String>,
) -> EndpResult<api_types::SetJobs> {
assigned_jobs: Vec<AssignedJobById>,
) -> EndpResult<()> {
repo.transaction(move |mut db| {
let assigned_job_idents = job_idents
.into_iter()
.map(|ident| {
Id::parse_str(&ident).or_else(|_| {
let job_from_db = db.get_job_by_alias(&ident);
match job_from_db {
Ok(job) => match job {
Some(j) => Ok(j.meta.id),
None => {
Err(Error::ProcessingError(format!("unknown ident {ident}")))
}
},
Err(e) => Err(e),
}
})
})
.collect::<Result<Vec<Id>, Error>>()?;
db.set_jobs_for_agent(agent_id, &assigned_job_idents)?;
Ok(assigned_job_idents)
db.assign_jobs(&assigned_jobs)?;
Ok(())
})
.await
.map_err(From::from)

@ -92,11 +92,10 @@ pub fn init_endpoints(
.and_then(Endpoints::del)
.map(ok);
let set_jobs = path("set_jobs")
let assign_jobs = path("assign_jobs")
.and(with_db.clone())
.and(warp::path::param::<Id>())
.and(body::json::<Vec<String>>())
.and_then(Endpoints::set_jobs)
.and(body::json::<Vec<AssignedJobById>>())
.and_then(Endpoints::assign_jobs)
.map(into_message);
let report = path("report")
@ -141,7 +140,7 @@ pub fn init_endpoints(
let upload_payload = path("upload_payload")
.and(with_db.clone())
.and(body::json::<RawPayload>())
.and_then(Endpoints::upload_payloads)
.and_then(Endpoints::upload_payload)
.map(ok);
let update_payload = path("update_payload")
@ -163,7 +162,7 @@ pub fn init_endpoints(
.or(upload_jobs)
.or(upload_payload)
.or(del)
.or(set_jobs)
.or(assign_jobs)
.or(get_assigned_jobs)
.or(update_agent)
.or(update_job)

@ -18,14 +18,14 @@ services:
u_db:
condition: service_healthy
ports:
- 63714:63714
- 9990:9990
env_file:
- ./.env
- ./.env.private
environment:
RUST_LOG: warp=info,u_server_lib=debug
healthcheck:
test: ss -tlpn | grep 63714
test: ss -tlpn | grep 9990
interval: 5s
timeout: 2s
retries: 2

@ -23,14 +23,14 @@ services:
u_db:
condition: service_healthy
ports:
- 63714:63714
- 9990:9990
env_file:
- ../.env
- ../.env.private
environment:
RUST_LOG: warp=info,u_server_lib=debug,u_lib=debug
healthcheck:
test: ss -tlpn | grep 63714
test: ss -tlpn | grep 9990
interval: 5s
timeout: 2s
retries: 2

@ -5,7 +5,6 @@ use rstest::rstest;
use serde_json::to_string;
use u_lib::config::AGENT_ITERATION_INTERVAL;
use u_lib::models::*;
use uuid::Uuid;
#[rstest]
#[tokio::test]
@ -28,16 +27,21 @@ async fn setup_tasks() {
.with_target_platforms("*linux*")
.try_into_job()
.unwrap();
let job_id = job.meta.id;
Panel::check_status(["jobs", "create", &to_string(&RawJob::from(job)).unwrap()]);
let cmd = format!("map create {agent_id} {job_alias}");
let assigned_ids: Vec<Uuid> = Panel::check_output(cmd);
let assigned = AssignedJobById {
agent_id,
job_id,
..Default::default()
};
Panel::check_status(["map", "create", &to_string(&[assigned]).unwrap()]);
retry_with_interval(5, AGENT_ITERATION_INTERVAL, || {
let result =
Panel::check_output::<Vec<AssignedJob>>(format!("map read {}", assigned_ids[0]))
.remove(0);
Panel::check_output::<Vec<AssignedJob>>(format!("map read {}", job_id)).remove(0);
if result.state == JobState::Finished {
eprintln!("{}", result.to_str_result());
assert!(result.to_str_result().contains("root:x:0:0"));
@ -61,16 +65,20 @@ async fn large_payload() {
.with_target_platforms(&agent.platform)
.try_into_job()
.unwrap();
let job_id = job.meta.id;
Panel::check_status(["jobs", "create", &to_string(&RawJob::from(job)).unwrap()]);
let cmd = format!("map create {agent_id} {job_alias}");
let assigned_ids: Vec<Uuid> = Panel::check_output(cmd);
let assigned = AssignedJobById {
agent_id,
job_id,
..Default::default()
};
Panel::check_status(["map", "create", &to_string(&[assigned]).unwrap()]);
retry_with_interval(5, AGENT_ITERATION_INTERVAL, || {
let result =
Panel::check_output::<Vec<AssignedJob>>(format!("map read {}", assigned_ids[0]))
.remove(0);
Panel::check_output::<Vec<AssignedJob>>(format!("map read {}", job_id)).remove(0);
if result.state == JobState::Finished {
assert_eq!(result.to_str_result(), "type echo\n");
Ok(())

@ -9,7 +9,7 @@
// update_result(&self, result: AssignedJob)
// upload_jobs(&self, payload: impl OneOrVec<FatJob>)
// del(&self, item: Id)
// set_jobs(&self, agent: Id, job_idents: impl OneOrVec<String>)
// assign_jobs(&self, agent: Id, job_idents: impl OneOrVec<String>)
// get_agent_jobs(&self, agent: Option<Id>)
// ping(&self)

@ -35,7 +35,7 @@ pub mod api_types {
pub type UploadJobs = ();
pub type UploadPayloads = ();
pub type Del = ();
pub type SetJobs = Vec<Id>;
pub type SetJobs = ();
pub type GetAgentJobs = Vec<AssignedJob>;
pub type Ping = ();
pub type GetPayloads = Vec<Payload>;
@ -229,24 +229,20 @@ impl HttpClient {
}
/// set jobs for any agent
pub async fn set_jobs(
pub async fn assign_jobs(
&self,
agent: Id,
job_idents: impl IntoIterator<Item = impl Into<String>>,
assigned: impl IntoIterator<Item = &AssignedJobById>,
) -> Result<api_types::SetJobs> {
self.req_with_payload(
format!("set_jobs/{agent}"),
&job_idents
.into_iter()
.map(|i| i.into())
.collect::<Vec<String>>(),
format!("assign_jobs"),
&assigned.into_iter().collect::<Vec<_>>(),
)
.await
}
/// get jobs for any agent
pub async fn get_assigned_jobs(&self, agent: Option<Id>) -> Result<api_types::GetAgentJobs> {
self.req(format!("get_assigned_jobs/{}", opt_to_string(agent)))
/// get jobs for any agent by job_id, agent_id or result_id
pub async fn get_assigned_jobs(&self, id: Option<Id>) -> Result<api_types::GetAgentJobs> {
self.req(format!("get_assigned_jobs/{}", opt_to_string(id)))
.await
}

@ -1,3 +1,4 @@
use crate::types::Id;
use envy::{from_env, prefixed, Result as EnvResult};
use lazy_static::lazy_static;
use serde::Deserialize;
@ -5,9 +6,7 @@ use std::time::Duration;
pub use envy::Error;
use crate::types::Id;
pub const MASTER_PORT: u16 = 63714;
pub const MASTER_PORT: u16 = 9990;
pub const AGENT_ITERATION_INTERVAL: Duration = Duration::from_secs(5);

@ -56,6 +56,7 @@ impl Debug for AssignedJob {
#[derive(Serialize, Deserialize, Clone, Copy, Debug)]
pub struct AssignedJobById {
pub agent_id: Id,
#[serde(default)]
pub id: Id,
pub job_id: Id,
}

@ -1,7 +1,11 @@
use crate::{conv::bytes_to_string, types::Id, ufs, UError};
use crate::{
conv::{bytes_to_string, bytes_to_string_truncated},
types::Id,
ufs, UError,
};
use anyhow::{Context, Result};
use serde::{Deserialize, Serialize};
use std::{fs::File, path::Path, process::Command};
use std::{fmt::Debug, fs::File, path::Path, process::Command};
pub const MAX_READABLE_PAYLOAD_SIZE: i64 = 1024 * 32;
@ -28,7 +32,7 @@ impl RawPayload {
diesel(table_name = payloads),
diesel(treat_none_as_null = true)
)]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[derive(Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct Payload {
pub id: Id,
pub mime_type: String,
@ -37,6 +41,24 @@ pub struct Payload {
pub data: Option<Vec<u8>>, // when None, payload data is stored in ufs
}
impl Debug for Payload {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Payload")
.field("id", &self.id)
.field("mime_type", &self.mime_type)
.field("name", &self.name)
.field("size", &self.size)
.field(
"data",
&self
.data
.as_ref()
.map(|data| bytes_to_string_truncated(data, 256)),
)
.finish()
}
}
impl Payload {
pub fn is_human_readable(&self) -> bool {
self.size < MAX_READABLE_PAYLOAD_SIZE && self.mime_type.starts_with("text/")

Loading…
Cancel
Save