improved executor and some fixes

4-update-check
plazmoid 4 years ago
parent 6af4cf84f8
commit 0cad9c107a
  1. 2
      bin/u_agent/Cargo.toml
  2. 55
      bin/u_agent/src/main.rs
  3. 17
      bin/u_server/src/db.rs
  4. 21
      bin/u_server/src/handlers.rs
  5. 2
      bin/u_server/src/main.rs
  6. 4
      lib/u_lib/src/api.rs
  7. 55
      lib/u_lib/src/executor.rs
  8. 22
      lib/u_lib/src/models/jobs.rs
  9. 4
      lib/u_lib/src/models/mod.rs

@ -11,6 +11,6 @@ tokio = { version = "1.2.0", features = ["macros", "rt-multi-thread", "process",
sysinfo = "0.10.5"
log = "^0.4"
env_logger = "0.8.3"
uuid = "0.8.1"
uuid = "0.6.5"
reqwest = { version = "0.11", features = ["json"] }
u_lib = { version = "*", path = "../../lib/u_lib" }

@ -14,9 +14,11 @@ use u_lib::{
api::ClientHandler,
models::{gather},
build_jobs_with_result,
pop_completed,
UID,
JobResult,
JobCache
ExactJob,
JobCache,
//daemonize
};
use tokio::{time::{Duration, sleep}};
@ -33,6 +35,28 @@ macro_rules! retry_until_ok {
}
}
async fn process_request(job_requests: Vec<ExactJob>, client: &ClientHandler) {
if job_requests.len() > 0 {
for jr in job_requests.iter() {
if !JobCache::contains(&jr.job_id) {
info!("Fetching job: {}", &jr.job_id);
let fetched_job =
retry_until_ok!(client.get_jobs(Some(&jr.job_id)).await).pop().unwrap();
JobCache::insert(fetched_job);
}
};
info!(
"Scheduling jobs: \n{}",
job_requests.iter()
.map(|j| j.job_id.to_string())
.collect::<Vec<String>>().join("\n")
);
build_jobs_with_result(job_requests)
.spawn()
.await;
}
}
#[tokio::main]
async fn main() {
//daemonize();
@ -44,26 +68,15 @@ async fn main() {
retry_until_ok!(instance.init(&cli_info).await);
info!("Instanciated! Running main loop");
loop {
let job_requests: Vec<JobResult> =
let job_requests: Vec<ExactJob> =
retry_until_ok!(instance.get_agent_jobs(Some(&*UID)).await);
if job_requests.len() > 0 {
for jr in job_requests.iter() {
if !JobCache::contains(&jr.job_id) {
info!("Fetching job: {}", &jr.job_id);
let fetched_job =
retry_until_ok!(instance.get_jobs(Some(&jr.job_id)).await).pop().unwrap();
JobCache::insert(fetched_job);
}
};
let result = build_jobs_with_result(job_requests)
.run_until_complete()
.await
.into_iter()
.map(|r| r.unwrap())
.collect();
retry_until_ok!(instance.report(
&result
).await)
process_request(job_requests, &instance).await;
let result: Vec<ExactJob> = pop_completed().await
.into_iter()
.map(|r| r.unwrap())
.collect();
if result.len() > 0 {
retry_until_ok!(instance.report(&result).await)
}
sleep(Duration::from_secs(5)).await;
}

@ -67,7 +67,16 @@ impl UDB {
Ok(result)
}
pub fn get_agent_jobs(&self, uid: Option<Uuid>, personal: bool) -> USrvResult<Vec<JobResult>> {
pub fn update_job_status(&self, uid: Uuid, status: JobState) -> USrvResult<()> {
use schema::results;
diesel::update(results::table)
.filter(results::id.eq(uid))
.set(results::state.eq(status))
.execute(&self.conn)?;
Ok(())
}
pub fn get_agent_jobs(&self, uid: Option<Uuid>, personal: bool) -> USrvResult<Vec<ExactJob>> {
use schema::results;
let mut q = results::table
.into_boxed();
@ -85,7 +94,7 @@ impl UDB {
.or_filter(results::id.eq(uid.unwrap()))
}
let result = q.load::<JobResult>(&self.conn)?;
let result = q.load::<ExactJob>(&self.conn)?;
Ok(result)
}
@ -111,12 +120,12 @@ impl UDB {
return Err(USrvError::NotFound(not_found_jobs.join(", ")));
}
let job_requests = job_uids.iter().map(|job_uid| {
JobResult {
ExactJob {
job_id: *job_uid,
agent_id: *agent_uid,
..Default::default()
}
}).collect::<Vec<JobResult>>();
}).collect::<Vec<ExactJob>>();
diesel::insert_into(results::table)
.values(&job_requests)
.execute(&self.conn)?;

@ -71,12 +71,19 @@ pub async fn get_agent_jobs(
db: Storage,
personal: bool) -> Result<impl Reply, Rejection>
{
match db.lock()
let result = db.lock()
.unwrap()
.get_agent_jobs(uid, personal) {
Ok(r) => Ok(warp::reply::json(
&r.as_message()
)),
.get_agent_jobs(uid, personal);
match result {
Ok(r) => {
let _db = db.lock().unwrap();
for j in r.iter() {
_db.update_job_status(j.id, JobState::Running).ok();
}
Ok(warp::reply::json(
&r.as_message()
))
},
Err(e) => Err(warp::reject())
}
}
@ -126,7 +133,7 @@ pub async fn set_jobs(
}
pub async fn report(
msg: BaseMessage<'_, Vec<JobResult>>,
msg: BaseMessage<'_, Vec<ExactJob>>,
db: Storage) -> Result<impl Reply, Rejection>
{
let db = db.lock().unwrap();
@ -136,7 +143,7 @@ pub async fn report(
if id != res.agent_id {
continue
}
if let Err(e) = res.save_changes::<JobResult>(&db.conn).map_err(USrvError::from) {
if let Err(e) = res.save_changes::<ExactJob>(&db.conn).map_err(USrvError::from) {
failed.push(e.to_string())
}
}

@ -95,7 +95,7 @@ async fn main() {
let report = warp::post()
.and(warp::path(Paths::report))
.and(get_content::<Vec<JobResult>>())
.and(get_content::<Vec<ExactJob>>())
.and(db.clone())
.and_then(handlers::report);

@ -182,7 +182,7 @@ impl ClientHandler {
// client listing (A)
build_handler!(GET get_agents/Uuid() -> Vec<Agent>);
// get jobs for client
build_handler!(GET get_agent_jobs/Uuid() -> Vec<JobResult>);
build_handler!(GET get_agent_jobs/Uuid() -> Vec<ExactJob>);
// get all available jobs (A)
build_handler!(GET get_jobs/Uuid() -> Vec<JobMeta>);
// add client to server's db
@ -195,4 +195,4 @@ build_handler!(GET del/Uuid() -> String);
// POST /set_jobs/Uuid json: Vec<Uuid>
build_handler!(POST set_jobs/Uuid(Vec<Uuid>) -> ());
// report job result
build_handler!(POST report(Vec<JobResult>) -> ());
build_handler!(POST report(Vec<ExactJob>) -> ());

@ -11,29 +11,26 @@ use lazy_static::lazy_static;
use tokio::{
spawn,
task::JoinHandle,
//sync::mpsc::{channel, Receiver, Sender}
sync::mpsc::{channel, Receiver, Sender}
};
use uuid::Uuid;
pub type FutRes = UResult<JobResult>;
pub type FutRes = UResult<ExactJob>;
pub type DynFut = BoxFuture<'static, FutRes>;
lazy_static! {
static ref FUT_RESULTS: Mutex<HashMap<Uuid, JoinInfo>> = Mutex::new(HashMap::new());
/*static ref FUT_CHANNEL: (Mutex<Sender<Uuid>>, Mutex<Receiver<Uuid>>) = {
static ref FUT_CHANNEL: (Sender<Uuid>, Mutex<Receiver<Uuid>>) = {
spawn(init_receiver());
let (tx, rx) = channel(100);
(Mutex::new(tx), Mutex::new(rx))
};*/
(tx, Mutex::new(rx))
};
}
/*
async fn get_static_channel() -> (Sender<Uuid>, MutexGuard<'static, Receiver<Uuid>>) {
(
FUT_CHANNEL.0.lock().await.clone(),
FUT_CHANNEL.1.lock().await
)
async fn get_sender() -> Sender<Uuid> {
FUT_CHANNEL.0.clone()
}
*/
struct JoinInfo {
handle: JoinHandle<FutRes>,
completed: bool,
@ -54,26 +51,25 @@ impl Waiter {
}
pub async fn spawn(mut self) -> Self {
let collectable = self.tasks.len() != 1;
let collectable = true; //TODO: self.tasks.len() != 1;
for f in self.tasks.drain(..) {
//eprintln!("before static channel");
//let tx = get_static_channel().await.0;
//eprintln!("after static channel");
let tx = get_sender().await;
let fid = Uuid::new_v4();
self.fids.push(fid);
/*let task_wrapper = async move {
//eprintln!("inside wrapper (started): {}", fid);
let task_wrapper = async move {
debug!("inside wrapper (started): {}", fid);
let result = f.await;
tx.send(fid).await.unwrap();
result
};*/
};
debug!("before JoinInfo");
let handle = JoinInfo {
handle: spawn(f),
handle: spawn(task_wrapper),
completed: false,
collectable
};
//eprintln!("before push: {}", fid);
spawn(async {}).await.ok();
debug!("before push: {}", fid);
//spawn(async {}).await.ok();
FUT_RESULTS.lock().await.insert(fid, handle);
}
self
@ -105,15 +101,19 @@ impl Waiter {
async fn pop_task(fid: Uuid) -> Option<JoinInfo> {
FUT_RESULTS.lock().await.remove(&fid)
}
/*
async fn init_receiver() {
while let Some(fid) = get_static_channel().await.1.recv().await {
if let Some(j) = FUT_RESULTS.lock().await.get_mut(&fid) {
j.completed = true;
while let Some(fid) = FUT_CHANNEL.1.lock().await.recv().await {
//info!("init_receiver: next val: {}", &fid);
if let Some(mut lock) = FUT_RESULTS.try_lock() {
if let Some(j) = lock.get_mut(&fid) {
//info!("init_receiver: marked as completed");
j.completed = true;
}
}
}
}
*/
pub async fn pop_task_if_completed(fid: Uuid) -> Option<FutRes> {
let &mut JoinInfo {handle: _, collectable, completed} = match FUT_RESULTS
.lock()
@ -122,7 +122,6 @@ pub async fn pop_task_if_completed(fid: Uuid) -> Option<FutRes> {
Some(t) => t,
None => return None
};
//eprint!("{}, {}: ", &fid, *collectable);
if collectable && completed {
let task = pop_task(fid).await.unwrap();
let result = task.handle.await.unwrap();

@ -272,7 +272,7 @@ impl Default for JobMeta {
AsChangeset,
)]
#[table_name = "results"]
pub struct JobResult {
pub struct ExactJob {
pub agent_id: Uuid,
pub created: SystemTime,
pub id: Uuid,
@ -283,7 +283,7 @@ pub struct JobResult {
pub updated: SystemTime,
}
impl fmt::Display for JobResult {
impl fmt::Display for ExactJob {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
let mut out = format!("Result {}", self.id);
out += &format!("\nAgent {}", self.agent_id);
@ -302,7 +302,7 @@ impl fmt::Display for JobResult {
}
}
impl JobResult {
impl ExactJob {
pub fn from_meta(job_id: Uuid, result_id: Option<Uuid>) -> Self {
Self {
id: result_id.unwrap_or(Uuid::new_v4()),
@ -315,7 +315,7 @@ impl JobResult {
//pub fn as_job_output(&self) -> JobOutput {}
}
impl Default for JobResult {
impl Default for ExactJob {
fn default() -> Self {
Self {
agent_id: Uuid::nil(),
@ -333,7 +333,7 @@ impl Default for JobResult {
pub struct Job {
exec_type: JobType,
payload: Option<Vec<u8>>,
result: JobResult
result: ExactJob
}
impl Job {
@ -350,14 +350,14 @@ impl Job {
Ok(Self {
exec_type: job_meta.exec_type,
payload: job_meta.payload,
result: JobResult::from_meta(job_meta.id.clone(), Some(result_id))
result: ExactJob::from_meta(job_meta.id.clone(), Some(result_id))
})
},
_ => todo!()
}
}
async fn run(mut self) -> UResult<JobResult> {
async fn run(mut self) -> UResult<ExactJob> {
match self.exec_type {
JobType::Shell => {
let str_payload = match &self.payload {
@ -406,7 +406,7 @@ impl Job {
}
}
pub fn build_jobs_with_result<J: OneOrMany<JobResult>>(job_requests: J) -> Waiter {
pub fn build_jobs_with_result<J: OneOrMany<ExactJob>>(job_requests: J) -> Waiter {
let prepared_jobs = job_requests.into_vec()
.into_iter()
.filter_map(|jr| -> Option<DynFut> {
@ -433,8 +433,8 @@ pub fn build_jobs<J: OneOrMany<JobMeta>>(job_metas: J) -> Waiter {
let job_requests = job_metas.into_vec().into_iter().map(|jm| {
let j_uid = jm.id;
JobCache::insert(jm);
JobResult::from_meta(j_uid, None)
}).collect::<Vec<JobResult>>();
ExactJob::from_meta(j_uid, None)
}).collect::<Vec<ExactJob>>();
build_jobs_with_result(job_requests)
}
@ -442,7 +442,7 @@ pub fn build_jobs<J: OneOrMany<JobMeta>>(job_metas: J) -> Waiter {
#[cfg(test)]
mod tests {
use super::*;
use crate::{build_jobs, utils::vec_to_string, pop_completed, spawn_dummy};
use crate::{build_jobs, utils::vec_to_string, pop_completed};
#[tokio::test]
async fn test_is_really_async() {

@ -38,11 +38,11 @@ to_message!(
Agent,
IAgent,
JobMeta,
JobResult,
ExactJob,
String,
Vec<Agent>,
Vec<JobMeta>,
Vec<JobResult>,
Vec<ExactJob>,
Vec<Uuid>,
()
);

Loading…
Cancel
Save