Merge branch '8-hold-a-readonly-ref-on-a-cached-job' into 'master'

Resolve "Hold a readonly ref on a cached job"

Closes #8

See merge request root/unki!1
4-update-check
Administrator 4 years ago
commit 6af4cf84f8
  1. 29
      bin/u_agent/src/main.rs
  2. 43
      bin/u_panel/src/main.rs
  3. 32
      bin/u_server/src/db.rs
  4. 20
      bin/u_server/src/handlers.rs
  5. 10
      bin/u_server/src/main.rs
  6. 38
      lib/u_lib/src/api.rs
  7. 10
      lib/u_lib/src/errors.rs
  8. 1
      lib/u_lib/src/lib.rs
  9. 6
      lib/u_lib/src/models/agent.rs
  10. 135
      lib/u_lib/src/models/jobs.rs
  11. 7
      lib/u_lib/src/models/schema.rs
  12. 4
      migrations/2020-10-24-111622_create_all/up.sql

@ -2,9 +2,7 @@
// поддержка питона // поддержка питона
// резолв адреса управляющего сервера через DoT // резолв адреса управляющего сервера через DoT
// кроссплатформенность (реализовать интерфейс для винды и никсов) // кроссплатформенность (реализовать интерфейс для винды и никсов)
// перезапуск через memfd_create
// проверка обнов // проверка обнов
// проверка ssh ключей и распространение через known_hosts
// самоуничтожение // самоуничтожение
#[macro_use] #[macro_use]
@ -15,8 +13,10 @@ use std::env;
use u_lib::{ use u_lib::{
api::ClientHandler, api::ClientHandler,
models::{gather}, models::{gather},
build_jobs, build_jobs_with_result,
UID UID,
JobResult,
JobCache
}; };
use tokio::{time::{Duration, sleep}}; use tokio::{time::{Duration, sleep}};
@ -44,17 +44,22 @@ async fn main() {
retry_until_ok!(instance.init(&cli_info).await); retry_until_ok!(instance.init(&cli_info).await);
info!("Instanciated! Running main loop"); info!("Instanciated! Running main loop");
loop { loop {
let jobs = retry_until_ok!(instance.get_agent_jobs(Some(&*UID)).await); let job_requests: Vec<JobResult> =
if jobs.len() > 0 { retry_until_ok!(instance.get_agent_jobs(Some(&*UID)).await);
let job_uids: Vec<String> = jobs.iter() if job_requests.len() > 0 {
.map(|j| j.id.to_string()[..8].to_owned()) for jr in job_requests.iter() {
.collect(); if !JobCache::contains(&jr.job_id) {
info!("Fetched jobs: \n{}", job_uids.join("\n")); info!("Fetching job: {}", &jr.job_id);
let result = build_jobs(jobs) let fetched_job =
retry_until_ok!(instance.get_jobs(Some(&jr.job_id)).await).pop().unwrap();
JobCache::insert(fetched_job);
}
};
let result = build_jobs_with_result(job_requests)
.run_until_complete() .run_until_complete()
.await .await
.into_iter() .into_iter()
.map(|r| r.unwrap())//TODO: panic handler (send info on server) .map(|r| r.unwrap())
.collect(); .collect();
retry_until_ok!(instance.report( retry_until_ok!(instance.report(
&result &result

@ -1,7 +1,8 @@
use structopt::StructOpt; use structopt::StructOpt;
use u_lib::{ use u_lib::{
api::ClientHandler, api::ClientHandler,
models::JobMeta models::JobMeta,
UError
}; };
use std::path::PathBuf; use std::path::PathBuf;
use uuid::Uuid; use uuid::Uuid;
@ -51,10 +52,7 @@ enum JmALD {
}, },
List { List {
#[structopt(parse(try_from_str = parse_uuid))] #[structopt(parse(try_from_str = parse_uuid))]
uid: Option<Uuid>, uid: Option<Uuid>
#[structopt(short, long)]
results: bool
}, },
Delete { Delete {
#[structopt(parse(try_from_str = parse_uuid))] #[structopt(parse(try_from_str = parse_uuid))]
@ -78,46 +76,47 @@ fn parse_uuid(src: &str) -> Result<Uuid, String> {
Uuid::parse_str(src).map_err(|e| e.to_string()) Uuid::parse_str(src).map_err(|e| e.to_string())
} }
#[tokio::main] async fn process_cmd(cmd: Cmd) -> Result<(), UError> {
async fn main() -> Result<(), &'static str> {
let args: Args = Args::from_args();
let cli_handler = ClientHandler::new(None) let cli_handler = ClientHandler::new(None)
.password("123qwe".to_string()); .password("123qwe".to_string());
match args.cmd { match cmd {
Cmd::Agents(action) => match action { Cmd::Agents(action) => match action {
LD::List {uid} => cli_handler.get_agents(uid.as_ref()) LD::List {uid} => cli_handler.get_agents(uid.as_ref())
.await.unwrap().into_iter().for_each(|r| println!("{}{}", DELIM, r)), .await?.into_iter().for_each(|r| println!("{}{}", DELIM, r)),
LD::Delete {uid} => { LD::Delete {uid} => {
println!("{}", cli_handler.del(Some(&uid)).await.unwrap()); println!("{}", cli_handler.del(Some(&uid)).await?);
} }
}, },
Cmd::Jobs(action) => match action { Cmd::Jobs(action) => match action {
JobALD::Add {cmd: JobCmd::Cmd(cmd), agent} => { JobALD::Add {cmd: JobCmd::Cmd(cmd), agent} => {
let job = JobMeta::from_shell(cmd.join(" ")); let job = JobMeta::from_shell(cmd.join(" "));
let job_uid = job.id; let job_uid = job.id;
cli_handler.upload_jobs(&vec![job]).await.unwrap(); cli_handler.upload_jobs(&vec![job]).await?;
if agent.is_some() { if agent.is_some() {
cli_handler.set_jobs(&vec![job_uid], agent.as_ref()).await.unwrap() cli_handler.set_jobs(&vec![job_uid], agent.as_ref()).await?
} }
}, },
JobALD::LD(LD::List {uid}) => cli_handler.get_jobs(uid.as_ref()) JobALD::LD(LD::List {uid}) => cli_handler.get_jobs(uid.as_ref())
.await.unwrap().into_iter().for_each(|r| println!("{}{}", DELIM, r)), .await?.into_iter().for_each(|r| println!("{}{}", DELIM, r)),
JobALD::LD(LD::Delete {uid}) => { JobALD::LD(LD::Delete {uid}) => {
println!("{}", cli_handler.del(Some(&uid)).await.unwrap()) println!("{}", cli_handler.del(Some(&uid)).await?)
} }
} }
Cmd::Jobmap(action) => match action { Cmd::Jobmap(action) => match action {
JmALD::Add {agent_uid, job_uids} => cli_handler.set_jobs(&job_uids, Some(&agent_uid)) JmALD::Add {agent_uid, job_uids} => cli_handler.set_jobs(&job_uids, Some(&agent_uid))
.await.unwrap(), .await?,
JmALD::List {uid, results} => if results { JmALD::List {uid} => {
cli_handler.get_results(uid.as_ref())
.await.unwrap().into_iter().for_each(|r| println!("{}{}", DELIM, r))
} else {
cli_handler.get_agent_jobs(uid.as_ref()) cli_handler.get_agent_jobs(uid.as_ref())
.await.unwrap().into_iter().for_each(|r| println!("{}{}", DELIM, r)) .await?.into_iter().for_each(|r| println!("{}{}", DELIM, r))
}, },
JmALD::Delete {uid} => println!("{}", cli_handler.del(Some(&uid)).await.unwrap()) JmALD::Delete {uid} => println!("{}", cli_handler.del(Some(&uid)).await?)
} }
} }
Ok(()) Ok(())
} }
#[tokio::main]
async fn main() -> Result<(), UError> {
let args: Args = Args::from_args();
process_cmd(args.cmd).await
}

@ -67,37 +67,25 @@ impl UDB {
Ok(result) Ok(result)
} }
//TODO: belongs_to pub fn get_agent_jobs(&self, uid: Option<Uuid>, personal: bool) -> USrvResult<Vec<JobResult>> {
pub fn get_agent_jobs(&self, uid: Option<Uuid>, personal: bool) -> USrvResult<Vec<JobMeta>> { use schema::results;
use schema::{results, jobs};
let mut q = results::table let mut q = results::table
.inner_join(jobs::table)
.into_boxed(); .into_boxed();
if uid.is_some() { if uid.is_some() {
q = q.filter(results::agent_id.eq(uid.unwrap())) q = q.filter(results::agent_id.eq(uid.unwrap()))
} }
if personal { if personal {
q = q.filter(results::state.eq(JobState::Queued)) q = q.filter(
} results::state.eq(JobState::Queued)
let result = q.select( .and(results::agent_id.eq(uid.unwrap()))
(jobs::alias, jobs::id, jobs::exec_type, jobs::platform, jobs::payload)
) )
.get_results::<JobMeta>(&self.conn)?; } else if uid.is_some() {
Ok(result) q = q.filter(results::agent_id.eq(uid.unwrap()))
}
pub fn get_results(&self, uid: Option<Uuid>) -> USrvResult<Vec<JobResult>> {
use schema::results;
let result = if uid.is_some() {
results::table
.filter(results::agent_id.eq(uid.unwrap()))
.or_filter(results::job_id.eq(uid.unwrap())) .or_filter(results::job_id.eq(uid.unwrap()))
.or_filter(results::id.eq(uid.unwrap())) .or_filter(results::id.eq(uid.unwrap()))
.load::<JobResult>(&self.conn)?
} else { }
results::table let result = q.load::<JobResult>(&self.conn)?;
.load::<JobResult>(&self.conn)?
};
Ok(result) Ok(result)
} }

@ -66,20 +66,6 @@ pub async fn get_jobs(
} }
} }
pub async fn get_results(
uid: Option<Uuid>,
db: Storage) -> Result<impl Reply, Rejection>
{
match db.lock()
.unwrap()
.get_results(uid) {
Ok(r) => Ok(warp::reply::json(
&r.as_message()
)),
Err(e) => Err(warp::reject())
}
}
pub async fn get_agent_jobs( pub async fn get_agent_jobs(
uid: Option<Uuid>, uid: Option<Uuid>,
db: Storage, db: Storage,
@ -135,7 +121,7 @@ pub async fn set_jobs(
.unwrap() .unwrap()
.set_jobs_for_agent(&agent_uid, &msg.into_inner()) { .set_jobs_for_agent(&agent_uid, &msg.into_inner()) {
Ok(_) => build_empty_200(), Ok(_) => build_empty_200(),
Err(e) => build_response(StatusCode::BAD_REQUEST, dbg!(e)) Err(e) => build_response(StatusCode::BAD_REQUEST, e)
} }
} }
@ -160,7 +146,3 @@ pub async fn report(
} }
build_empty_200() build_empty_200()
} }
pub async fn dummy() -> Result<impl Reply, Rejection> {
Ok(String::from("ok"))
}

@ -76,7 +76,7 @@ async fn main() {
let get_personal_jobs = warp::get() let get_personal_jobs = warp::get()
.and(warp::path(Paths::get_agent_jobs)) .and(warp::path(Paths::get_agent_jobs))
.and(warp::path::param::<Uuid>().map(Some).or_else(infallible_none)) .and(warp::path::param::<Uuid>().map(Some))
.and(db.clone()) .and(db.clone())
.and_then(|uid, db| handlers::get_agent_jobs(uid, db, true)); .and_then(|uid, db| handlers::get_agent_jobs(uid, db, true));
@ -99,15 +99,10 @@ async fn main() {
.and(db.clone()) .and(db.clone())
.and_then(handlers::report); .and_then(handlers::report);
let get_results = warp::get()
.and(warp::path(Paths::get_results))
.and(warp::path::param::<Uuid>().map(Some).or_else(infallible_none))
.and(db.clone())
.and_then(handlers::get_results);
let auth_token = warp::header::exact("authorization", "Bearer 123qwe"); let auth_token = warp::header::exact("authorization", "Bearer 123qwe");
let agent_zone = new_agent let agent_zone = new_agent
.or(get_jobs.clone())
.or(get_personal_jobs) .or(get_personal_jobs)
.or(report) .or(report)
; ;
@ -120,7 +115,6 @@ async fn main() {
.or(del) .or(del)
.or(set_jobs) .or(set_jobs)
.or(get_agent_jobs) .or(get_agent_jobs)
.or(get_results)
); );
let routes = agent_zone let routes = agent_zone

@ -11,7 +11,6 @@ use crate::{
use reqwest::{ use reqwest::{
Client, Client,
Url, Url,
Response,
RequestBuilder RequestBuilder
}; };
use std::{ use std::{
@ -95,14 +94,34 @@ macro_rules! build_handler {
let request = $crate::build_url_by_method!( let request = $crate::build_url_by_method!(
$method $path, $method $path,
pname = $($param_name)?, ptype = $($param_type)?, urlparam = $($url_param)? pname = $($param_name)?, ptype = $($param_type)?, urlparam = $($url_param)?
)(self $(, param as &$param_type)? $(, url_param as Option<&$url_param>)?); )(
self
$(, param as &$param_type)?
$(, url_param as Option<&$url_param>)?
);
let response = request.send().await?; let response = request.send().await?;
match response.error_for_status() { let content_len = response.content_length();
Ok(r) => Ok(r.json::<BaseMessage<$result>>() let is_success = match response.error_for_status_ref() {
Ok(_) => Ok(()),
Err(e) => Err(UError::from(e))
};
match is_success {
Ok(_) => response.json::<BaseMessage<$result>>()
.await .await
.map(|msg| msg.into_inner()) .map(|msg| msg.into_inner())
.unwrap_or_default()), .or_else(|e| {
Err(e) => Err(UError::from(e)) match content_len {
Some(0) => Ok(Default::default()),
_ => Err(UError::from(e))
}
}),
Err(UError::NetError(err_src, _)) => Err(
UError::NetError(
err_src,
response.text().await.unwrap()
)
),
_ => unreachable!()
} }
} }
} }
@ -162,8 +181,8 @@ impl ClientHandler {
////////////////// //////////////////
// client listing (A) // client listing (A)
build_handler!(GET get_agents/Uuid() -> Vec<Agent>); build_handler!(GET get_agents/Uuid() -> Vec<Agent>);
// get jobs for client (agent_id=Uuid) // get jobs for client
build_handler!(GET get_agent_jobs/Uuid() -> Vec<JobMeta>); build_handler!(GET get_agent_jobs/Uuid() -> Vec<JobResult>);
// get all available jobs (A) // get all available jobs (A)
build_handler!(GET get_jobs/Uuid() -> Vec<JobMeta>); build_handler!(GET get_jobs/Uuid() -> Vec<JobMeta>);
// add client to server's db // add client to server's db
@ -175,8 +194,5 @@ build_handler!(GET del/Uuid() -> String);
// set jobs for client (A) // set jobs for client (A)
// POST /set_jobs/Uuid json: Vec<Uuid> // POST /set_jobs/Uuid json: Vec<Uuid>
build_handler!(POST set_jobs/Uuid(Vec<Uuid>) -> ()); build_handler!(POST set_jobs/Uuid(Vec<Uuid>) -> ());
// get results (A)
// GET /get_job_results/Uuid
build_handler!(GET get_results/Uuid() -> Vec<JobResult>);
// report job result // report job result
build_handler!(POST report(Vec<JobResult>) -> ()); build_handler!(POST report(Vec<JobResult>) -> ());

@ -13,8 +13,8 @@ pub enum UError {
#[error("Error: {0}")] #[error("Error: {0}")]
Raw(&'static str), Raw(&'static str),
#[error("Connection error: {0}")] #[error("Connection error: {0}. Body: {1}")]
NetError(String), NetError(String, String),
#[error("Parse error")] #[error("Parse error")]
ParseError, ParseError,
@ -28,8 +28,8 @@ pub enum UError {
#[error("Job cannot be ran on this platform. Expected: {0}, got: {1}")] #[error("Job cannot be ran on this platform. Expected: {0}, got: {1}")]
InsuitablePlatform(String, String), InsuitablePlatform(String, String),
#[error("Task {0} doesn't exist")] #[error("Job {0} doesn't exist")]
NoTask(Uuid), NoJob(Uuid),
#[error("Error opening {0}: {1}")] #[error("Error opening {0}: {1}")]
FilesystemError(String, String) FilesystemError(String, String)
@ -37,6 +37,6 @@ pub enum UError {
impl From<ReqError> for UError { impl From<ReqError> for UError {
fn from(e: ReqError) -> Self { fn from(e: ReqError) -> Self {
UError::NetError(e.to_string()) UError::NetError(e.to_string(), String::new())
} }
} }

@ -21,5 +21,6 @@ extern crate lazy_static;
#[macro_use] #[macro_use]
extern crate diesel; extern crate diesel;
#[macro_use]
extern crate log; extern crate log;
extern crate env_logger; extern crate env_logger;

@ -88,10 +88,10 @@ pub async fn gather() -> IAgent {
id: UID.clone(), id: UID.clone(),
hostname: run_cmd_fast("hostname").await, hostname: run_cmd_fast("hostname").await,
is_root: &run_cmd_fast("id -u").await == "0", is_root: &run_cmd_fast("id -u").await == "0",
is_root_allowed: false, //TODO is_root_allowed: false,
platform: guess_host_triple().unwrap_or("Error").to_string(), platform: guess_host_triple().unwrap_or("Error").to_string(),
status: None, //TODO status: None, //wtf?
token: None, //TODO token: None,
username: run_cmd_fast("id -un").await, username: run_cmd_fast("id -un").await,
} }
} }

@ -1,11 +1,15 @@
use std::{ use std::{
time::{SystemTime, Duration}, time::{SystemTime, Duration},
thread, thread,
sync::{RwLock, RwLockReadGuard},
cmp::PartialEq, cmp::PartialEq,
fmt, fmt,
string::ToString, string::ToString,
path::PathBuf, path::PathBuf,
fs fs,
process::Output,
collections::HashMap,
ops::Deref,
}; };
use serde::{ use serde::{
Serialize, Serialize,
@ -13,10 +17,13 @@ use serde::{
}; };
use uuid::Uuid; use uuid::Uuid;
use guess_host_triple::guess_host_triple; use guess_host_triple::guess_host_triple;
use tokio::process::Command; use tokio::{
process::Command
};
use crate::{ use crate::{
utils::systime_to_string, utils::systime_to_string,
models::schema::*, models::schema::*,
Agent,
UError, UError,
UResult, UResult,
UID, UID,
@ -29,10 +36,49 @@ use diesel::{
Queryable, Queryable,
Identifiable, Identifiable,
Insertable, Insertable,
query_builder::AsChangeset
}; };
use strum::Display; use strum::Display;
type Cache = HashMap<Uuid, JobMeta>;
lazy_static! {
static ref JOB_CACHE: RwLock<Cache> = RwLock::new(HashMap::new());
}
pub struct JobCache;
impl JobCache {
pub fn insert(job_meta: JobMeta) {
JOB_CACHE.write().unwrap().insert(job_meta.id, job_meta);
}
pub fn contains(uid: &Uuid) -> bool {
JOB_CACHE.read().unwrap().contains_key(uid)
}
pub fn get(uid: &Uuid) -> Option<JobCacheHolder> {
if !Self::contains(uid) {
return None
}
let lock = JOB_CACHE.read().unwrap();
Some(JobCacheHolder(lock, uid))
}
pub fn remove(uid: &Uuid) {
JOB_CACHE.write().unwrap().remove(uid);
}
}
pub struct JobCacheHolder<'jm>(pub RwLockReadGuard<'jm, Cache>, pub &'jm Uuid);
impl<'jm> Deref for JobCacheHolder<'jm> {
type Target = JobMeta;
fn deref(&self) -> &Self::Target {
self.0.get(self.1).unwrap()
}
}
#[derive(Serialize, Deserialize, Clone, Debug)] #[derive(Serialize, Deserialize, Clone, Debug)]
pub enum ManageAction { pub enum ManageAction {
@ -46,7 +92,7 @@ pub enum ManageAction {
pub enum JobSchedule { pub enum JobSchedule {
Once, Once,
Permanent, Permanent,
//TODO: Scheduled //Scheduled
} }
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, DbEnum, Display)] #[derive(Serialize, Deserialize, Clone, Debug, PartialEq, DbEnum, Display)]
@ -96,13 +142,13 @@ impl JobOutput {
} }
} }
pub fn stdout(mut self, data: &[u8]) -> Self { pub fn stdout(mut self, data: Vec<u8>) -> Self {
self.stdout = data.to_owned(); self.stdout = data;
self self
} }
pub fn stderr(mut self, data: &[u8]) -> Self { pub fn stderr(mut self, data: Vec<u8>) -> Self {
self.stderr = data.to_owned(); self.stderr = data;
self self
} }
@ -128,12 +174,12 @@ impl JobOutput {
raw.strip_prefix(&JobOutput::create_delim(JobOutput::STDOUT)) raw.strip_prefix(&JobOutput::create_delim(JobOutput::STDOUT))
.map(|s: &str| { .map(|s: &str| {
let mut parts = s.split(&err_header) let mut parts = s.split(&err_header)
.map(|d| d.trim().as_bytes()) .map(|d| d.trim().as_bytes().to_vec())
.collect::<Vec<&[u8]>>() .collect::<Vec<Vec<u8>>>()
.into_iter(); .into_iter();
JobOutput::new() JobOutput::new()
.stdout(parts.next().unwrap()) .stdout(parts.next().unwrap())
.stderr(parts.next().unwrap_or(&[])) .stderr(parts.next().unwrap_or(vec![]))
}) })
} }
@ -158,7 +204,7 @@ impl JobOutput {
Insertable Insertable
)] )]
#[table_name = "jobs"] #[table_name = "jobs"]
pub struct JobMeta { // TODO: shell cmd how to exec payload pub struct JobMeta {
pub alias: String, pub alias: String,
pub id: Uuid, pub id: Uuid,
pub exec_type: JobType, pub exec_type: JobType,
@ -223,7 +269,7 @@ impl Default for JobMeta {
Queryable, Queryable,
Identifiable, Identifiable,
Insertable, Insertable,
AsChangeset AsChangeset,
)] )]
#[table_name = "results"] #[table_name = "results"]
pub struct JobResult { pub struct JobResult {
@ -257,11 +303,13 @@ impl fmt::Display for JobResult {
} }
impl JobResult { impl JobResult {
pub fn from_meta(job_id: Uuid) -> Self { pub fn from_meta(job_id: Uuid, result_id: Option<Uuid>) -> Self {
let mut inst = JobResult::default(); Self {
inst.agent_id = *UID; id: result_id.unwrap_or(Uuid::new_v4()),
inst.job_id = job_id; agent_id: *UID,
inst job_id,
..Default::default()
}
} }
//pub fn as_job_output(&self) -> JobOutput {} //pub fn as_job_output(&self) -> JobOutput {}
@ -289,17 +337,20 @@ pub struct Job {
} }
impl Job { impl Job {
fn build(job_meta: JobMeta) -> UResult<Self> { fn build(job_meta: &JobMeta, result_id: Uuid) -> UResult<Self> {
match job_meta.exec_type { match job_meta.exec_type {
JobType::Shell => { JobType::Shell => {
let curr_platform = guess_host_triple().unwrap_or("unknown").to_string(); let curr_platform = guess_host_triple().unwrap_or("unknown").to_string();
if job_meta.platform != curr_platform { if job_meta.platform != curr_platform {
return Err(UError::InsuitablePlatform(job_meta.platform, curr_platform)) return Err(UError::InsuitablePlatform(
job_meta.platform.clone(), curr_platform
))
} }
let job_meta = job_meta.clone();
Ok(Self { Ok(Self {
exec_type: job_meta.exec_type, exec_type: job_meta.exec_type,
payload: job_meta.payload, payload: job_meta.payload,
result: JobResult::from_meta(job_meta.id.clone()) result: JobResult::from_meta(job_meta.id.clone(), Some(result_id))
}) })
}, },
_ => todo!() _ => todo!()
@ -315,7 +366,7 @@ impl Job {
} }
None => unimplemented!() None => unimplemented!()
}; };
let mut cmd_parts = str_payload // TODO: WRONG let mut cmd_parts = str_payload
.split(" ") .split(" ")
.map(String::from) .map(String::from)
.collect::<Vec<String>>() .collect::<Vec<String>>()
@ -327,14 +378,14 @@ impl Job {
.output() .output()
.await; .await;
let (data, retcode) = match cmd_result { let (data, retcode) = match cmd_result {
Ok(output) => { Ok(Output {status, stdout, stderr}) => {
( (
Some(JobOutput::new() Some(JobOutput::new()
.stdout(&output.stdout) .stdout(stdout)
.stderr(&output.stderr) .stderr(stderr)
.multiline() .multiline()
), ),
output.status.code() status.code()
) )
} }
Err(e) => { Err(e) => {
@ -355,14 +406,38 @@ impl Job {
} }
} }
pub fn build_jobs<J: OneOrMany<JobMeta>>(job_metas: J) -> Waiter { pub fn build_jobs_with_result<J: OneOrMany<JobResult>>(job_requests: J) -> Waiter {
let prepared_jobs = job_metas.into_vec().into_iter().map(|job| -> DynFut { let prepared_jobs = job_requests.into_vec()
let j = Job::build(job).unwrap(); .into_iter()
Box::pin(j.run()) .filter_map(|jr| -> Option<DynFut> {
let job = {
let job_meta = JobCache::get(&jr.job_id);
if job_meta.is_none() {
Err(UError::NoJob(jr.job_id))
} else {
Job::build(&*job_meta.unwrap(), jr.id)
}
};
match job {
Ok(j) => Some(Box::pin(j.run())),
Err(e) => {
warn!("Job building error: {}", e);
None
}
}
}).collect::<Vec<DynFut>>(); }).collect::<Vec<DynFut>>();
Waiter::new(prepared_jobs) Waiter::new(prepared_jobs)
} }
pub fn build_jobs<J: OneOrMany<JobMeta>>(job_metas: J) -> Waiter {
let job_requests = job_metas.into_vec().into_iter().map(|jm| {
let j_uid = jm.id;
JobCache::insert(jm);
JobResult::from_meta(j_uid, None)
}).collect::<Vec<JobResult>>();
build_jobs_with_result(job_requests)
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {

@ -49,9 +49,9 @@ table! {
use crate::*; use crate::*;
jobs (id) { jobs (id) {
alias -> Nullable<Text>, alias -> Text,
id -> Uuid, id -> Uuid,
exec_type -> JobType, exec_type -> Jobtype,
platform -> Text, platform -> Text,
payload -> Nullable<Bytea>, payload -> Nullable<Bytea>,
} }
@ -67,8 +67,9 @@ table! {
id -> Uuid, id -> Uuid,
job_id -> Uuid, job_id -> Uuid,
result -> Nullable<Bytea>, result -> Nullable<Bytea>,
state -> Jobstate,
retcode -> Nullable<Int4>, retcode -> Nullable<Int4>,
ts -> Timestamp, updated -> Timestamp,
} }
} }

@ -52,8 +52,8 @@ CREATE TABLE IF NOT EXISTS results (
, state JobState NOT NULL DEFAULT 'queued' , state JobState NOT NULL DEFAULT 'queued'
, retcode INTEGER , retcode INTEGER
, updated TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP , updated TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
, FOREIGN KEY(agent_id) REFERENCES agents(id) , FOREIGN KEY(agent_id) REFERENCES agents(id) ON DELETE CASCADE
, FOREIGN KEY(job_id) REFERENCES jobs(id) , FOREIGN KEY(job_id) REFERENCES jobs(id) ON DELETE CASCADE
, PRIMARY KEY(id) , PRIMARY KEY(id)
); );

Loading…
Cancel
Save