diff --git a/bin/u_agent/src/main.rs b/bin/u_agent/src/main.rs index 2f4179c..89e0c93 100644 --- a/bin/u_agent/src/main.rs +++ b/bin/u_agent/src/main.rs @@ -11,10 +11,14 @@ use { std::thread::sleep, std::time::Duration, std::env, - u_lib::client::{ - ClientInfo, - network::ClientHandler - } + u_lib::{ + client::{ + ClientInfo, + network::ClientHandler + }, + contracts::*, + send_jobs_to_executor, + }, }; #[tokio::main] @@ -27,7 +31,14 @@ async fn main() { loop { let jobs = retry_until_ok!(instance.get_jobs().await); if jobs.0.len() > 0 { - println!("{:?}", jobs); + let result = send_jobs_to_executor(jobs + .into_inner() + .values() + .collect() + ).await; + retry_until_ok!(instance.report( + IterWrap(result.into_iter().map(|r| r.unwrap()).collect()) + ).await) } sleep(Duration::from_secs(2)); } diff --git a/bin/u_server/src/handlers.rs b/bin/u_server/src/handlers.rs index 7ec2a49..ba00d19 100644 --- a/bin/u_server/src/handlers.rs +++ b/bin/u_server/src/handlers.rs @@ -2,14 +2,45 @@ use u_lib::contracts::*; use warp::{ Rejection, Reply, + reply::with_status, + http::StatusCode }; +use uuid::Uuid; + +pub async fn report( + msg: Message<'_, IterWrap>>, + db: Storage) -> Result +{ + let results = msg.item.into_inner(); + let mut storage = db.results().await; + results.into_iter().for_each(|new_result| { + match storage.get_mut(&new_result.id) { + Some(v) => v.push(new_result), + None => storage.insert(new_result.id, vec![new_result]) + } + }); + Ok(with_status(warp::reply(), StatusCode::OK)) +} + +pub async fn get_job_results( + uid: Uuid, + db: Storage) -> Result +{ + let storage = db.results().await; + match storage.get(&uid) { + Some(v) => Ok(warp::reply::json( + &Message::new(v.clone()) + )), + None => Err(warp::reject()) + } +} pub async fn add_client( msg: Message<'_, ClientInfo>, db: Storage) -> Result { let new_cli = msg.item; - let mut clients = db.lock().await; + let mut clients = db.clients().await; if clients.contains_key(&new_cli.id) { Ok(warp::reply::json( &RawMsg("Already exist".to_string()).as_message() @@ -28,7 +59,7 @@ pub async fn add_client( pub async fn get_jobs( db: Storage) -> Result { - let mut clients = db.lock().await; + let mut clients = db.clients().await; let cli = clients.get_mut(&msg.id).unwrap(); cli.jobs.iter_mut().for_each(|job: &mut JobMeta| { if job.state == JobState::Queued { @@ -42,10 +73,10 @@ pub async fn get_jobs( pub async fn set_jobs( uid: Option, - msg: Message<'_, CollectionWrapper>, + msg: Message<'_, IterWrap>, db: Storage) -> Result { - let mut clients = db.lock().await; + let mut clients = db.clients().await; let cli = clients.get_mut(&uid.unwrap_or(msg.id)).unwrap(); msg.item.0.into_iter().for_each(|(uuid, job)| { match cli.jobs.get_mut(&uuid) { @@ -57,13 +88,13 @@ pub async fn set_jobs( } -pub async fn listing(db: Storage) -> Result { - let clients = db.lock().await; +pub async fn ls(db: Storage) -> Result { + let clients = db.clients().await; let mut result: Vec = Vec::with_capacity(clients.len()); for cli in clients.values() { result.push(cli.client_info.clone()); } Ok(warp::reply::json( - &CollectionWrapper(result).as_message() + &IterWrap(result).as_message() )) } \ No newline at end of file diff --git a/bin/u_server/src/main.rs b/bin/u_server/src/main.rs index ed801e6..b522914 100644 --- a/bin/u_server/src/main.rs +++ b/bin/u_server/src/main.rs @@ -36,7 +36,7 @@ async fn main() { let ls = warp::get() .and(warp::path(Paths::ls)) .and(db.clone()) - .and_then(handlers::listing); + .and_then(handlers::ls); let get_jobs = warp::get() .and(warp::path(Paths::get_jobs)) @@ -46,27 +46,33 @@ async fn main() { let set_jobs = warp::post() .and(warp::path(Paths::set_jobs)) .and(warp::path::param::().map(Some)) - .and(get_content::>()) + .and(get_content::>()) .and(db.clone()) .and_then(handlers::set_jobs); - let update_own_jobs = warp::post() - .and(warp::path(Paths::set_jobs)) - .and(warp::path::param::().map(Some)) - .and(get_content::>()) + let get_job_results = warp::get() + .and(warp::path(Paths::get_job_results)) + .and(warp::path::param::()) .and(db.clone()) - .and_then(handlers::set_jobs); + .and_then(handlers::get_job_results); + + let report = warp::post() + .and(warp::path(Paths::report)) + .and(get_content::>>()) + .and(db.clone()) + .and_then(handlers::report); let auth_token = warp::header::exact("authorization", "Bearer 123qwe"); let agent_zone = new_client .or(get_jobs) - .or(update_own_jobs) + .or(report) ; let auth_zone = auth_token .and(ls .or(set_jobs) + .or(get_job_results) ); let routes = auth_zone diff --git a/lib/u_lib/src/client/client.rs b/lib/u_lib/src/client/client.rs index 83d17c2..2380d34 100644 --- a/lib/u_lib/src/client/client.rs +++ b/lib/u_lib/src/client/client.rs @@ -67,7 +67,6 @@ mod tests { use crate::{ utils::vec_to_string }; - use std::time::SystemTime; #[tokio::test] async fn test_gather() { diff --git a/lib/u_lib/src/client/network.rs b/lib/u_lib/src/client/network.rs index cd05bd1..4ea4e53 100644 --- a/lib/u_lib/src/client/network.rs +++ b/lib/u_lib/src/client/network.rs @@ -17,6 +17,7 @@ use std::{ net::Ipv4Addr, str::FromStr }; +use uuid::Uuid; pub struct Paths; @@ -34,20 +35,64 @@ macro_rules! get_result { }; } +#[macro_export] +macro_rules! build_url_by_method { + ( + POST $path:tt, + pname = $($param_name:literal)?, + ptype = $($param_type:ty)?, + urlparam = $($url_param:ty)? + ) => { + |instance: &ClientHandler $(, param: &$param_type)?| { + let request = ClientHandler::build_post( + instance, + &format!("{}/{}", + stringify!($path), + String::new() $(+ stringify!($url_param))? + ) + ); + request + $( .json::>(¶m.as_message()) )? + } + }; + ( + GET $path:tt, + pname = $($param_name:literal)?, + ptype = $($param_type:ty)?, + urlparam = $($url_param:ty)? + ) => { + |instance: &ClientHandler $(, param: &$param_type)?| { + let request = ClientHandler::build_get( + instance, + &format!("{}/{}", + stringify!($path), + String::new() $(+ stringify!($url_param))? + ) + ); + request + $( .query(&[(stringify!($param_name), param.to_string())]) )? + } + }; +} + // param and result must impl ToMsg #[macro_export] macro_rules! build_handler { - ( $method:tt $path:tt($($param:ty)?) -> $result:ty ) => { + ( + $method:tt + $path:tt( + $($param_name:literal:)? + $($param_type:ty)? + $(; $url_param:ty)? + ) -> $result:ty ) => { impl ClientHandler { - pub async fn $path(&self $(, param: &$param)?) -> UResult<$result> { - let builder = match stringify!($method) { - "POST" => ClientHandler::build_post, - "GET" => ClientHandler::build_get, - _ => panic!("Method '{}' is not allowed", stringify!($method)) - }; - let mut request = builder(self, stringify!($path)); - request = request - $( .json::>(¶m.as_message()) )? ; + pub async fn $path( + &self $(, param: &$param_type)? $(, url_param: &$url_param)? + ) -> UResult<$result> { + let request = $crate::build_url_by_method!( + $method $path, + pname = $($param_name)?, ptype = $($param_type)?, urlparam = $($url_param)? + )(self $(, param as &$param_type)? ); let response = request.send().await?; ($crate::get_result!($result)(response)).await } @@ -103,15 +148,21 @@ impl ClientHandler { } } +// method basic_path(json/query param; additional_url_param) -> return value // A - admin only // client listing (A) -build_handler!(GET ls() -> CollectionWrapper>); +build_handler!(GET ls() -> IterWrap>); // get jobs for client himself (A: id=client_id) -build_handler!(GET get_jobs() -> CollectionWrapper); +build_handler!(GET get_jobs() -> IterWrap); // add client to server's db build_handler!(POST init(ClientInfo) -> RawMsg); // ??? build_handler!(POST del() -> ()); // set jobs for client (A) -build_handler!(POST set_jobs(CollectionWrapper) -> ()); -// get_results (A): user_id, job_id +// POST /set_jobs/Uuid json: JobMetaStorage +build_handler!(POST set_jobs(IterWrap; Uuid) -> ()); +// get results (A) +// GET /get_job_results?job_id=Uuid +build_handler!(GET get_job_results("job_id":Uuid) -> IterWrap>); +// report job result +build_handler!(POST report(IterWrap>) -> ()); \ No newline at end of file diff --git a/lib/u_lib/src/contracts/datatypes.rs b/lib/u_lib/src/contracts/datatypes.rs index 8d2b353..110fc56 100644 --- a/lib/u_lib/src/contracts/datatypes.rs +++ b/lib/u_lib/src/contracts/datatypes.rs @@ -14,29 +14,36 @@ use { pub type CliStorage = HashMap; +pub type JobResultsStorage = HashMap>; pub type JobMetaStorage = HashMap; pub type JobMetaRef = Arc>; -// because can't impl From> for Cow +// because can't impl From> for Cow #[derive(Serialize, Deserialize, Debug, Clone)] -pub struct CollectionWrapper(pub T); +pub struct IterWrap(pub T); -impl From for CollectionWrapper { +impl IterWrap { + pub fn into_inner(self) -> T { + self.0 + } +} + +impl From for IterWrap { fn from(t: T) -> Self { - CollectionWrapper(t) + IterWrap(t) } } -impl ToMsg for CollectionWrapper {} +impl ToMsg for IterWrap {} -impl<'cow, T: Clone> From> for Cow<'cow, CollectionWrapper> { - fn from(obj: CollectionWrapper) -> Cow<'cow, CollectionWrapper> { +impl<'cow, T: Clone> From> for Cow<'cow, IterWrap> { + fn from(obj: IterWrap) -> Cow<'cow, IterWrap> { Cow::Owned(obj) } } -impl<'cow, T: Clone> From<&'cow CollectionWrapper> for Cow<'cow, CollectionWrapper> { - fn from(obj: &'cow CollectionWrapper) -> Cow<'cow, CollectionWrapper> { +impl<'cow, T: Clone> From<&'cow IterWrap> for Cow<'cow, IterWrap> { + fn from(obj: &'cow IterWrap) -> Cow<'cow, IterWrap> { Cow::Borrowed(obj) } } @@ -44,7 +51,8 @@ impl<'cow, T: Clone> From<&'cow CollectionWrapper> for Cow<'cow, CollectionWr #[derive(Clone)] pub struct Storage { - clients: Arc> + clients: Arc>, + jobs_results: Arc> } impl Storage { @@ -52,11 +60,18 @@ impl Storage { Self { clients: Arc::new( Mutex::new(HashMap::::new()) + ), + jobs_results: Arc::new( + Mutex::new(HashMap::>::new()) ) } } - pub async fn lock(&self) -> MutexGuard<'_, CliStorage> { + pub async fn clients(&self) -> MutexGuard<'_, CliStorage> { self.clients.lock().await } + + pub async fn results(&self) -> MutexGuard<'_, JobResultsStorage> { + self.jobs_results.lock().await + } } \ No newline at end of file diff --git a/lib/u_lib/src/contracts/jobs.rs b/lib/u_lib/src/contracts/jobs.rs index 172a039..7a077ed 100644 --- a/lib/u_lib/src/contracts/jobs.rs +++ b/lib/u_lib/src/contracts/jobs.rs @@ -166,7 +166,7 @@ pub struct JobResult { pub data: Option>, pub state: JobState, pub retcode: Option, - pub date: SystemTime, + pub timestamp: SystemTime, } @@ -189,6 +189,7 @@ impl Job { }, data: None, retcode: None, + timestamp: SystemTime::now() }, meta: job_meta, } @@ -299,7 +300,8 @@ impl Job { self.state() == JobState::Finished } - pub fn into_result(self) -> JobResult { + pub fn into_result(mut self) -> JobResult { + self.result.timestamp = SystemTime::now(); self.result } } diff --git a/lib/u_lib/src/executor.rs b/lib/u_lib/src/executor.rs index 29111f7..cec5d6e 100644 --- a/lib/u_lib/src/executor.rs +++ b/lib/u_lib/src/executor.rs @@ -18,11 +18,7 @@ use crate::{ }; use std::{ sync::{Mutex, MutexGuard, Arc}, - thread, - time::Duration, - collections::HashMap, }; -use uuid::Uuid; //use tokio::task::JoinHandle; use futures::future::{join_all, JoinAll}; use futures::Future; @@ -50,7 +46,7 @@ impl AsyncExecutor { } } */ - pub async fn apply_job(&mut self, mut new_job: Job) -> UResult { + pub async fn apply_job(&mut self, new_job: Job) -> UResult { /*let id = new_job.id(); let mut job_pool = self.jobs.lock().unwrap(); job_pool.push(new_job);