From 9ccf69e692c7f27f7331560a578a90a5a995e9a0 Mon Sep 17 00:00:00 2001 From: plazmoid Date: Thu, 3 Sep 2020 03:28:14 +0500 Subject: [PATCH] almost shareable jobs --- bin/u_agent/src/main.rs | 19 +++++++++++- bin/u_server/src/handlers.rs | 32 ++++++++++++++++++++ bin/u_server/src/main.rs | 45 +++++++++++++++++++++++----- lib/u_lib/src/client/client.rs | 4 +-- lib/u_lib/src/client/network.rs | 6 ++-- lib/u_lib/src/contracts/datatypes.rs | 6 ++++ lib/u_lib/src/contracts/mod.rs | 2 +- 7 files changed, 100 insertions(+), 14 deletions(-) diff --git a/bin/u_agent/src/main.rs b/bin/u_agent/src/main.rs index c7fad73..57f19fb 100644 --- a/bin/u_agent/src/main.rs +++ b/bin/u_agent/src/main.rs @@ -23,8 +23,25 @@ async fn main() { let arg_ip = env::args().nth(1); let instance = ClientHandler::new(arg_ip); let cli_info = ClientInfo::gather(); - instance.init(&cli_info).await; + retry_until_ok!(instance.init(&cli_info).await); loop { + let jobs = retry_until_ok!(instance.get_jobs().await); + if jobs.len() > 0 { + println!("{:?}", jobs); + } sleep(Duration::from_secs(2)); } } + +#[macro_export] +macro_rules! retry_until_ok { + ( $body:stmt ) => { + loop { + match $body { + Ok(r) => break r, + Err(e) => println!("{:?}", e) + }; + sleep(Duration::from_secs(2)); + } + } +} diff --git a/bin/u_server/src/handlers.rs b/bin/u_server/src/handlers.rs index 3f7128c..41b1623 100644 --- a/bin/u_server/src/handlers.rs +++ b/bin/u_server/src/handlers.rs @@ -25,6 +25,38 @@ pub async fn add_client( } } +pub async fn get_jobs( + db: Storage) -> Result +{ + let mut clients = db.lock().await; + let cli = clients.get_mut(&msg.id).unwrap(); + cli.jobs.iter_mut().for_each(|job: &mut JobMeta| { + if job.state == JobState::Queued { + job.state = JobState::Pending + } + }); + Ok(warp::reply::json( + &Message::new(cli.jobs.clone()) + )) +} + +pub async fn set_jobs( + uid: Option, + msg: Message<'_, JobStorageWrapper>, + db: Storage) -> Result +{ + let mut clients = db.lock().await; + let cli = clients.get_mut(&uid.unwrap_or(msg.id)).unwrap(); + msg.item.0.into_iter().for_each(|(uuid, job)| { + match cli.jobs.get_mut(&uuid) { + Some(cli_job) => *cli_job = job, + None => cli.jobs.push(job) + }; + }); + Ok(()) + +} + pub async fn listing(db: Storage) -> Result { let clients = db.lock().await; let mut result: Vec = Vec::with_capacity(clients.len()); diff --git a/bin/u_server/src/main.rs b/bin/u_server/src/main.rs index 7e9581b..4184022 100644 --- a/bin/u_server/src/main.rs +++ b/bin/u_server/src/main.rs @@ -10,12 +10,14 @@ use env_logger; use u_lib::{ MASTER_PORT, contracts::*, + client::network::Paths }; +use uuid::Uuid; -fn get_content() -> impl Filter,), +fn get_content() -> impl Filter,), Error = Rejection> + Clone { - body::content_length_limit(1024*64).and(body::json::>()) + body::content_length_limit(1024*64).and(body::json::>()) } @@ -26,22 +28,49 @@ async fn main() { let db = warp::any().map(move || base_db.clone()); let new_client = warp::post() - .and(warp::path("init")) - .and(get_content()) + .and(warp::path(Paths::init)) + .and(get_content::()) .and(db.clone()) .and_then(handlers::add_client); let ls = warp::get() - .and(warp::path("ls")) + .and(warp::path(Paths::ls)) .and(db.clone()) .and_then(handlers::listing); + let get_jobs = warp::get() + .and(warp::path(Paths::get_jobs)) + .and(db.clone()) + .and_then(handlers::get_jobs); + + let set_jobs = warp::post() + .and(warp::path(Paths::set_jobs)) + .and(warp::path::param::().map(Some)) + .and(get_content::()) + .and(db.clone()) + .and_then(handlers::set_jobs); + + let update_own_jobs = warp::post() + .and(warp::path(Paths::set_jobs)) + .and(warp::path::param::().map(Some)) + .and(get_content::()) + .and(db.clone()) + .and_then(handlers::set_jobs); + let auth_token = warp::header::exact("authorization", "Bearer 123qwe"); - let auth_zone = auth_token.and(ls); + let agent_zone = new_client + .or(get_jobs) + .or(update_own_jobs) + ; + + let auth_zone = auth_token + .and(ls + .or(set_jobs) + ); - let routes = new_client - .or(auth_zone) + let routes = auth_zone + .or(agent_zone) .with(warp::log("warp")); warp::serve(routes) .run(([0,0,0,0], MASTER_PORT)).await; diff --git a/lib/u_lib/src/client/client.rs b/lib/u_lib/src/client/client.rs index a6828d8..6ec31bf 100644 --- a/lib/u_lib/src/client/client.rs +++ b/lib/u_lib/src/client/client.rs @@ -12,14 +12,14 @@ use crate::{contracts::*, UID, exec_job}; pub struct UClient { pub client_info: ClientInfo, - pub jobs: Vec, // TODO: to futures + pub jobs: JobStorage, // TODO: to futures } impl UClient { pub fn new(client_info: ClientInfo) -> Self { Self { client_info, - jobs: Vec::new() + jobs: HashMap::new() } } } diff --git a/lib/u_lib/src/client/network.rs b/lib/u_lib/src/client/network.rs index f6ce57c..951f23a 100644 --- a/lib/u_lib/src/client/network.rs +++ b/lib/u_lib/src/client/network.rs @@ -21,7 +21,7 @@ use std::{ pub struct Paths; #[macro_export] -macro_rules! epilogue { +macro_rules! get_result { ( () ) => ( |_| async Ok(()) ); ( $result:ty ) => { |response: Response| async { @@ -49,7 +49,7 @@ macro_rules! build_handler { request = request $(.json::>(¶m.as_message()))?; let response = request.send().await?; - ($crate::epilogue!($result)(response)).await + ($crate::get_result!($result)(response)).await } } @@ -106,3 +106,5 @@ impl ClientHandler { build_handler!(POST init(ClientInfo) -> RawMsg); build_handler!(GET ls() -> Vec); build_handler!(POST del() -> ()); +build_handler!(GET get_jobs() -> JobStorageWrapper); +build_handler!(POST set_jobs(JobStorageWrapper) -> ()); diff --git a/lib/u_lib/src/contracts/datatypes.rs b/lib/u_lib/src/contracts/datatypes.rs index 3655f8a..0f97109 100644 --- a/lib/u_lib/src/contracts/datatypes.rs +++ b/lib/u_lib/src/contracts/datatypes.rs @@ -7,9 +7,15 @@ use { std::sync::Arc, std::collections::HashMap, uuid::Uuid, + serde::{Serialize, Deserialize} }; pub type CliStorage = HashMap; +pub type JobStorage = HashMap; + +// because can't impl From> for Cow +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct JobStorageWrapper(pub JobStorage); #[derive(Clone)] diff --git a/lib/u_lib/src/contracts/mod.rs b/lib/u_lib/src/contracts/mod.rs index 369798a..fdbc66c 100644 --- a/lib/u_lib/src/contracts/mod.rs +++ b/lib/u_lib/src/contracts/mod.rs @@ -34,4 +34,4 @@ macro_rules! to_message { } } -to_message!(ClientInfo, RawMsg); \ No newline at end of file +to_message!(ClientInfo, RawMsg, JobStorageWrapper); \ No newline at end of file