From bb5e7aa04c8312854baf2eae4964c550fde738fa Mon Sep 17 00:00:00 2001 From: plazmoid Date: Tue, 16 Feb 2021 03:15:39 +0500 Subject: [PATCH] removed itemwrap bump tokio to 1.2 version finished big fat rework --- bin/u_agent/Cargo.toml | 6 ++-- bin/u_agent/src/main.rs | 68 +++++++++++++++++++----------------- bin/u_panel/Cargo.toml | 4 +-- bin/u_panel/src/main.rs | 2 +- bin/u_server/src/handlers.rs | 16 ++++----- bin/u_server/src/main.rs | 12 +++---- lib/u_lib/Cargo.toml | 6 ++-- lib/u_lib/src/api.rs | 8 ++--- lib/u_lib/src/executor.rs | 21 ++++++----- lib/u_lib/src/lib.rs | 5 +++ lib/u_lib/src/messaging.rs | 47 ++++++++++--------------- lib/u_lib/src/models/jobs.rs | 4 +-- 12 files changed, 99 insertions(+), 100 deletions(-) diff --git a/bin/u_agent/Cargo.toml b/bin/u_agent/Cargo.toml index c6dcc20..88b7024 100644 --- a/bin/u_agent/Cargo.toml +++ b/bin/u_agent/Cargo.toml @@ -7,10 +7,10 @@ edition = "2018" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -tokio = { version = "0.2.22", features = ["macros", "rt-core", "process", "blocking"] } +tokio = { version = "1.2.0", features = ["macros", "rt-multi-thread", "process", "time"] } sysinfo = "0.10.5" log = "^0.4" -env_logger = "0.7.1" +env_logger = "0.8.3" uuid = "0.8.1" -reqwest = { version = "0.10.7", features = ["json"] } +reqwest = { version = "0.11", features = ["json"] } u_lib = { version = "*", path = "../../lib/u_lib" } \ No newline at end of file diff --git a/bin/u_agent/src/main.rs b/bin/u_agent/src/main.rs index b0d80b5..b5a721e 100644 --- a/bin/u_agent/src/main.rs +++ b/bin/u_agent/src/main.rs @@ -7,51 +7,55 @@ // проверка ssh ключей и распространение через known_hosts // самоуничтожение -use { - std::thread::sleep, - std::time::Duration, - std::env, - u_lib::{ - api::ClientHandler, - models::*, - send_jobs_to_executor, - }, +#[macro_use] +extern crate log; +extern crate env_logger; + +use std::env; +use u_lib::{ + api::ClientHandler, + models::{gather}, + build_jobs, }; +use tokio::{time::{Duration, sleep}}; + +#[macro_export] +macro_rules! retry_until_ok { + ( $body:expr ) => { + loop { + match $body { + Ok(r) => break r, + Err(e) => eprintln!("{:?}", e) + }; + sleep(Duration::from_secs(2)).await; + } + } +} #[tokio::main] async fn main() { //daemonize(); + env_logger::init(); let arg_ip = env::args().nth(1); let instance = ClientHandler::new(arg_ip); + debug!("Gathering info"); let cli_info = gather().await; + debug!("Connecting to the server"); retry_until_ok!(instance.init(&cli_info).await); + debug!("Instanciated! Running main loop"); loop {/* - let jobs = retry_until_ok!(instance.get_jobs().await); - if jobs.0.len() > 0 { - let result = send_jobs_to_executor(jobs - .into_inner() - .values() - .collect() - ).await; + let jobs = retry_until_ok!(instance.get_jobs().await).unwrap(); + if jobs.len() > 0 { + let result = build_jobs(jobs) + .run_until_complete() + .await; retry_until_ok!(instance.report( - ItemWrap(result.into_iter().map(|r| r.unwrap()).collect()) + result.into_iter().map(|r| r.unwrap()).collect() ).await) }*/ - //let jobs = retry_until_ok!(instance.get_jobs().await); - //println!("{:?}", jobs); - sleep(Duration::from_secs(2)); + let jobs = retry_until_ok!(instance.get_jobs().await); + println!("{:?}", jobs); + sleep(Duration::from_secs(2)).await; } } -#[macro_export] -macro_rules! retry_until_ok { - ( $body:expr ) => { - loop { - match $body { - Ok(r) => break r, - Err(e) => println!("{:?}", e) - }; - sleep(Duration::from_secs(2)); - } - } -} diff --git a/bin/u_panel/Cargo.toml b/bin/u_panel/Cargo.toml index d9a4b71..de289c5 100644 --- a/bin/u_panel/Cargo.toml +++ b/bin/u_panel/Cargo.toml @@ -7,9 +7,9 @@ edition = "2018" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -tokio = { version = "0.2.22", features = ["macros", "rt-core", "process", "blocking"] } +tokio = { version = "1.2.0", features = ["macros", "rt-multi-thread", "process"] } log = "^0.4" env_logger = "0.7.1" uuid = "0.8.1" -reqwest = { version = "0.10.7", features = ["json"] } +reqwest = { version = "0.11", features = ["json"] } u_lib = { version = "*", path = "../../lib/u_lib" } diff --git a/bin/u_panel/src/main.rs b/bin/u_panel/src/main.rs index 24e8626..414e1b3 100644 --- a/bin/u_panel/src/main.rs +++ b/bin/u_panel/src/main.rs @@ -18,7 +18,7 @@ async fn main() -> Result<(), &'static str> { "ls" => { let result = cli_handler.ls().await; for cli in result.iter() { - println!("{}", cli.0) + println!("{:?}", cli) } }, _ => return Err("Unknown method") diff --git a/bin/u_server/src/handlers.rs b/bin/u_server/src/handlers.rs index 0decc93..a70a599 100644 --- a/bin/u_server/src/handlers.rs +++ b/bin/u_server/src/handlers.rs @@ -12,11 +12,12 @@ use crate::db::{ }; pub async fn add_agent( - msg: Message<'_, IAgent>, + msg: BaseMessage<'_, IAgent>, db: Storage) -> Result { - let agent = msg.item; - let result = db.lock().unwrap().new_agent(agent.into_owned()); + let result = db.lock() + .unwrap() + .new_agent(msg.into_item()); match result { Ok(_) => Ok(warp::reply::json( &RawMsg("Added".to_string()).as_message() @@ -50,7 +51,7 @@ pub async fn get_job_results( let storage = db.results().await; match storage.get(&uid) { Some(v) => Ok(warp::reply::json( - &Message::new(v.clone()) + &BaseMessage::new(v.clone()) )), None => Err(warp::reject()) } @@ -67,13 +68,13 @@ pub async fn get_jobs( } }); Ok(warp::reply::json( - &Message::new(cli.jobs.clone()) + &BaseMessage::new(cli.jobs.clone()) )) } pub async fn set_jobs( uid: Option, - msg: Message<'_, ItemWrap>, + msg: BaseMessage<'_, ItemWrap>, db: Storage) -> Result { let mut clients = db.clients().await; @@ -92,10 +93,9 @@ pub async fn get_agents(db: Storage) -> Result { let result = db.lock().unwrap().get_agents(); match result { Ok(r) => Ok(warp::reply::json( - &ItemWrap(r).as_message() + &r.into_message() )), Err(e) => { - error!("{}", &e); Err(warp::reject()) } } diff --git a/bin/u_server/src/main.rs b/bin/u_server/src/main.rs index b9e32e2..70ec83b 100644 --- a/bin/u_server/src/main.rs +++ b/bin/u_server/src/main.rs @@ -11,7 +11,7 @@ use warp::{ #[macro_use] extern crate log; -use env_logger; +extern crate env_logger; use u_lib::{ MASTER_PORT, @@ -24,13 +24,13 @@ use serde::{ }; fn get_content() - -> impl Filter,), + -> impl Filter,), Error = Rejection> + Clone where M: ToMsg + Sync + Send + DeserializeOwned + 'static { body::content_length_limit(1024*64) - .and(body::json::>()) + .and(body::json::>()) } @@ -51,12 +51,12 @@ async fn main() { .and(warp::path(Paths::ls)) .and(db.clone()) .and_then(handlers::get_agents); - +/* let upload_job = warp::post() .and(warp::path(Paths::upload_job)) .and(db.clone()) .and_then(handlers::upload_job); -/* + let get_jobs = warp::get() .and(warp::path(Paths::get_jobs)) .and(db.clone()) @@ -90,7 +90,7 @@ async fn main() { let auth_zone = auth_token .and(get_agents - .or(upload_job) +// .or(upload_job) // .or(set_jobs) // .or(get_job_results) ) diff --git a/lib/u_lib/Cargo.toml b/lib/u_lib/Cargo.toml index cba3a30..124d191 100644 --- a/lib/u_lib/Cargo.toml +++ b/lib/u_lib/Cargo.toml @@ -12,11 +12,13 @@ uuid = { version = "0.6.5", features = ["serde", "v4"] } nix = "0.17" libc = "^0.2" lazy_static = "1.4.0" -tokio = { version = "1.2.0", features = ["rt", "sync", "macros", "process", "time"] } -reqwest = { version = "0.10.7", features = ["json"] } +tokio = { version = "1.2.0", features = ["rt-multi-thread", "sync", "macros", "process", "time"] } +reqwest = { version = "0.11", features = ["json"] } futures = "0.3.5" guess_host_triple = "0.1.2" thiserror = "*" +log = "*" +env_logger = "0.8.3" diesel-derive-enum = { version = "1", features = ["postgres"] } [dependencies.diesel] diff --git a/lib/u_lib/src/api.rs b/lib/u_lib/src/api.rs index 61c00d1..fc6c6b0 100644 --- a/lib/u_lib/src/api.rs +++ b/lib/u_lib/src/api.rs @@ -27,7 +27,7 @@ macro_rules! get_result { ( $result:ty ) => { |response: Response| async { response - .json::>() + .json::>() .await .map(|msg| msg.into_item()) .map_err(|e| UError::from(e)) @@ -52,7 +52,7 @@ macro_rules! build_url_by_method { ) ); request - $( .json::>(¶m.as_message()) )? + $( .json::>(¶m.as_message()) )? } }; ( @@ -152,9 +152,9 @@ impl ClientHandler { // method basic_path(json/query param; additional_url_param) -> return value // A - admin only // client listing (A) -build_handler!(GET ls() -> ItemWrap>); +build_handler!(GET ls() -> Vec); // get jobs for client himself (A: id=client_id) -build_handler!(GET get_jobs() -> ItemWrap>); +build_handler!(GET get_jobs() -> Vec); // add client to server's db build_handler!(POST init(IAgent) -> RawMsg); // create and upload job (A) diff --git a/lib/u_lib/src/executor.rs b/lib/u_lib/src/executor.rs index 321a84a..0c4d92c 100644 --- a/lib/u_lib/src/executor.rs +++ b/lib/u_lib/src/executor.rs @@ -20,20 +20,20 @@ pub type DynFut = BoxFuture<'static, FutRes>; lazy_static! { static ref FUT_RESULTS: Mutex> = Mutex::new(HashMap::new()); - static ref FUT_CHANNEL: (Mutex>, Mutex>) = { + /*static ref FUT_CHANNEL: (Mutex>, Mutex>) = { spawn(init_receiver()); let (tx, rx) = channel(100); (Mutex::new(tx), Mutex::new(rx)) - }; + };*/ } - +/* async fn get_static_channel() -> (Sender, MutexGuard<'static, Receiver>) { ( FUT_CHANNEL.0.lock().await.clone(), FUT_CHANNEL.1.lock().await ) } - +*/ struct JoinInfo { handle: JoinHandle, completed: bool, @@ -57,18 +57,18 @@ impl Waiter { let collectable = self.tasks.len() != 1; for f in self.tasks.drain(..) { //eprintln!("before static channel"); - let tx = get_static_channel().await.0; + //let tx = get_static_channel().await.0; //eprintln!("after static channel"); let fid = Uuid::new_v4(); self.fids.push(fid); - let task_wrapper = async move { + /*let task_wrapper = async move { //eprintln!("inside wrapper (started): {}", fid); let result = f.await; tx.send(fid).await.unwrap(); result - }; + };*/ let handle = JoinInfo { - handle: spawn(task_wrapper), + handle: spawn(f), completed: false, collectable }; @@ -105,16 +105,15 @@ impl Waiter { async fn pop_task(fid: Uuid) -> Option { FUT_RESULTS.lock().await.remove(&fid) } - +/* async fn init_receiver() { while let Some(fid) = get_static_channel().await.1.recv().await { - eprintln!("task {} is done", fid); if let Some(j) = FUT_RESULTS.lock().await.get_mut(&fid) { j.completed = true; } } } - +*/ pub async fn pop_task_if_completed(fid: Uuid) -> Option { let &mut JoinInfo {handle: _, collectable, completed} = match FUT_RESULTS .lock() diff --git a/lib/u_lib/src/lib.rs b/lib/u_lib/src/lib.rs index fa9216d..8cf8352 100644 --- a/lib/u_lib/src/lib.rs +++ b/lib/u_lib/src/lib.rs @@ -1,3 +1,4 @@ +#![allow(non_upper_case_globals)] pub mod executor; pub mod config; pub mod utils; @@ -19,3 +20,7 @@ extern crate lazy_static; #[macro_use] extern crate diesel; + +#[macro_use] +extern crate log; +extern crate env_logger; \ No newline at end of file diff --git a/lib/u_lib/src/messaging.rs b/lib/u_lib/src/messaging.rs index c10a5a5..673be6d 100644 --- a/lib/u_lib/src/messaging.rs +++ b/lib/u_lib/src/messaging.rs @@ -10,23 +10,24 @@ use uuid::Uuid; use crate::{UID}; pub trait ToMsg: Clone { //+ Serialize + DeserializeOwned { - fn as_message<'m>(&'m self) -> Message<'m, Self> + fn as_message<'m>(&'m self) -> BaseMessage<'m, Self> where Cow<'m, Self>: From<&'m Self> { - Message::new(self) + BaseMessage::new(self) } -} -// 1. Cow<'_, ItemWrap> - failed, Message::new needs val or ref -// 2. ItemWrap> - can't impl From> for Cow + fn into_message(self) -> BaseMessage<'static, Self> { + BaseMessage::new_cow(self) + } +} #[derive(Serialize, Deserialize, Debug)] -pub struct Message<'cow, I> +pub struct BaseMessage<'cow, I> where I: ToMsg { pub id: Uuid, pub item: Cow<'cow, I> } -impl<'cow, I> Message<'cow, I> +impl<'cow, I> BaseMessage<'cow, I> where I: ToMsg { pub fn new(item: C) -> Self @@ -37,6 +38,13 @@ impl<'cow, I> Message<'cow, I> } } + pub fn new_cow(item: I) -> Self { + Self { + id: UID.clone(), + item: Cow::Owned(item) + } + } + pub fn into_item(self) -> I { self.item.into_owned() } @@ -45,26 +53,7 @@ impl<'cow, I> Message<'cow, I> #[derive(Serialize, Deserialize, Debug, Clone)] pub struct RawMsg(pub String); -impl ToMsg for Vec {} //TODO: impl this for all collections - - -#[derive(Serialize, Deserialize, Debug, Clone)] -pub struct ItemWrap(pub T); - -impl ToMsg for ItemWrap {} - -impl<'cow, T: ToMsg> From> for Cow<'cow, ItemWrap> { - fn from(obj: ItemWrap) -> Cow<'cow, ItemWrap> { - Cow::Owned(obj) - } -} - -impl<'cow, T: ToMsg> From<&'cow ItemWrap> for Cow<'cow, ItemWrap> { - fn from(obj: &'cow ItemWrap) -> Cow<'cow, ItemWrap> { - Cow::Borrowed(obj) - } -} - +impl ToMsg for Vec {} /* #[cfg(test)] mod tests { @@ -73,11 +62,11 @@ mod tests { #[test] fn test_create_message_owned() { let item = String::from("QWEDSA"); - let msg_raw = Message { + let msg_raw = BaseMessage { id: *UID, item: Cow::Owned(item.clone()) }; - let msg = Message::new(item); + let msg = BaseMessage::new(item); assert_eq!(msg_raw.item, msg.item); } }*/ \ No newline at end of file diff --git a/lib/u_lib/src/models/jobs.rs b/lib/u_lib/src/models/jobs.rs index 3b80168..a307057 100644 --- a/lib/u_lib/src/models/jobs.rs +++ b/lib/u_lib/src/models/jobs.rs @@ -341,7 +341,7 @@ mod tests { assert_eq!(now.elapsed().unwrap().as_secs(), SLEEP_SECS); Ok(()) } - +/* #[tokio::test] async fn test_exec_multiple_jobs_nowait() -> UResult<()> { const REPEATS: usize = 10; @@ -358,7 +358,7 @@ mod tests { } Ok(()) } - +*/ #[tokio::test] async fn test_failing_shell_job() -> UResult<()> { let job = JobMeta::from_shell("lol_kek_puk");