removed itemwrap

bump tokio to 1.2 version
finished big fat rework
4-update-check
plazmoid 4 years ago
parent e921ee98a5
commit bb5e7aa04c
  1. 6
      bin/u_agent/Cargo.toml
  2. 68
      bin/u_agent/src/main.rs
  3. 4
      bin/u_panel/Cargo.toml
  4. 2
      bin/u_panel/src/main.rs
  5. 16
      bin/u_server/src/handlers.rs
  6. 12
      bin/u_server/src/main.rs
  7. 6
      lib/u_lib/Cargo.toml
  8. 8
      lib/u_lib/src/api.rs
  9. 21
      lib/u_lib/src/executor.rs
  10. 5
      lib/u_lib/src/lib.rs
  11. 47
      lib/u_lib/src/messaging.rs
  12. 4
      lib/u_lib/src/models/jobs.rs

@ -7,10 +7,10 @@ edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
tokio = { version = "0.2.22", features = ["macros", "rt-core", "process", "blocking"] }
tokio = { version = "1.2.0", features = ["macros", "rt-multi-thread", "process", "time"] }
sysinfo = "0.10.5"
log = "^0.4"
env_logger = "0.7.1"
env_logger = "0.8.3"
uuid = "0.8.1"
reqwest = { version = "0.10.7", features = ["json"] }
reqwest = { version = "0.11", features = ["json"] }
u_lib = { version = "*", path = "../../lib/u_lib" }

@ -7,51 +7,55 @@
// проверка ssh ключей и распространение через known_hosts
// самоуничтожение
use {
std::thread::sleep,
std::time::Duration,
std::env,
u_lib::{
api::ClientHandler,
models::*,
send_jobs_to_executor,
},
#[macro_use]
extern crate log;
extern crate env_logger;
use std::env;
use u_lib::{
api::ClientHandler,
models::{gather},
build_jobs,
};
use tokio::{time::{Duration, sleep}};
#[macro_export]
macro_rules! retry_until_ok {
( $body:expr ) => {
loop {
match $body {
Ok(r) => break r,
Err(e) => eprintln!("{:?}", e)
};
sleep(Duration::from_secs(2)).await;
}
}
}
#[tokio::main]
async fn main() {
//daemonize();
env_logger::init();
let arg_ip = env::args().nth(1);
let instance = ClientHandler::new(arg_ip);
debug!("Gathering info");
let cli_info = gather().await;
debug!("Connecting to the server");
retry_until_ok!(instance.init(&cli_info).await);
debug!("Instanciated! Running main loop");
loop {/*
let jobs = retry_until_ok!(instance.get_jobs().await);
if jobs.0.len() > 0 {
let result = send_jobs_to_executor(jobs
.into_inner()
.values()
.collect()
).await;
let jobs = retry_until_ok!(instance.get_jobs().await).unwrap();
if jobs.len() > 0 {
let result = build_jobs(jobs)
.run_until_complete()
.await;
retry_until_ok!(instance.report(
ItemWrap(result.into_iter().map(|r| r.unwrap()).collect())
result.into_iter().map(|r| r.unwrap()).collect()
).await)
}*/
//let jobs = retry_until_ok!(instance.get_jobs().await);
//println!("{:?}", jobs);
sleep(Duration::from_secs(2));
let jobs = retry_until_ok!(instance.get_jobs().await);
println!("{:?}", jobs);
sleep(Duration::from_secs(2)).await;
}
}
#[macro_export]
macro_rules! retry_until_ok {
( $body:expr ) => {
loop {
match $body {
Ok(r) => break r,
Err(e) => println!("{:?}", e)
};
sleep(Duration::from_secs(2));
}
}
}

@ -7,9 +7,9 @@ edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
tokio = { version = "0.2.22", features = ["macros", "rt-core", "process", "blocking"] }
tokio = { version = "1.2.0", features = ["macros", "rt-multi-thread", "process"] }
log = "^0.4"
env_logger = "0.7.1"
uuid = "0.8.1"
reqwest = { version = "0.10.7", features = ["json"] }
reqwest = { version = "0.11", features = ["json"] }
u_lib = { version = "*", path = "../../lib/u_lib" }

@ -18,7 +18,7 @@ async fn main() -> Result<(), &'static str> {
"ls" => {
let result = cli_handler.ls().await;
for cli in result.iter() {
println!("{}", cli.0)
println!("{:?}", cli)
}
},
_ => return Err("Unknown method")

@ -12,11 +12,12 @@ use crate::db::{
};
pub async fn add_agent(
msg: Message<'_, IAgent>,
msg: BaseMessage<'_, IAgent>,
db: Storage) -> Result<impl Reply, Rejection>
{
let agent = msg.item;
let result = db.lock().unwrap().new_agent(agent.into_owned());
let result = db.lock()
.unwrap()
.new_agent(msg.into_item());
match result {
Ok(_) => Ok(warp::reply::json(
&RawMsg("Added".to_string()).as_message()
@ -50,7 +51,7 @@ pub async fn get_job_results(
let storage = db.results().await;
match storage.get(&uid) {
Some(v) => Ok(warp::reply::json(
&Message::new(v.clone())
&BaseMessage::new(v.clone())
)),
None => Err(warp::reject())
}
@ -67,13 +68,13 @@ pub async fn get_jobs(
}
});
Ok(warp::reply::json(
&Message::new(cli.jobs.clone())
&BaseMessage::new(cli.jobs.clone())
))
}
pub async fn set_jobs(
uid: Option<Uuid>,
msg: Message<'_, ItemWrap<JobMetaStorage>>,
msg: BaseMessage<'_, ItemWrap<JobMetaStorage>>,
db: Storage) -> Result<impl Reply, Rejection>
{
let mut clients = db.clients().await;
@ -92,10 +93,9 @@ pub async fn get_agents(db: Storage) -> Result<impl Reply, Rejection> {
let result = db.lock().unwrap().get_agents();
match result {
Ok(r) => Ok(warp::reply::json(
&ItemWrap(r).as_message()
&r.into_message()
)),
Err(e) => {
error!("{}", &e);
Err(warp::reject())
}
}

@ -11,7 +11,7 @@ use warp::{
#[macro_use]
extern crate log;
use env_logger;
extern crate env_logger;
use u_lib::{
MASTER_PORT,
@ -24,13 +24,13 @@ use serde::{
};
fn get_content<M>()
-> impl Filter<Extract = (Message<'static, M>,),
-> impl Filter<Extract = (BaseMessage<'static, M>,),
Error = Rejection> + Clone
where
M: ToMsg + Sync + Send + DeserializeOwned + 'static
{
body::content_length_limit(1024*64)
.and(body::json::<Message<M>>())
.and(body::json::<BaseMessage<M>>())
}
@ -51,12 +51,12 @@ async fn main() {
.and(warp::path(Paths::ls))
.and(db.clone())
.and_then(handlers::get_agents);
/*
let upload_job = warp::post()
.and(warp::path(Paths::upload_job))
.and(db.clone())
.and_then(handlers::upload_job);
/*
let get_jobs = warp::get()
.and(warp::path(Paths::get_jobs))
.and(db.clone())
@ -90,7 +90,7 @@ async fn main() {
let auth_zone = auth_token
.and(get_agents
.or(upload_job)
// .or(upload_job)
// .or(set_jobs)
// .or(get_job_results)
)

@ -12,11 +12,13 @@ uuid = { version = "0.6.5", features = ["serde", "v4"] }
nix = "0.17"
libc = "^0.2"
lazy_static = "1.4.0"
tokio = { version = "1.2.0", features = ["rt", "sync", "macros", "process", "time"] }
reqwest = { version = "0.10.7", features = ["json"] }
tokio = { version = "1.2.0", features = ["rt-multi-thread", "sync", "macros", "process", "time"] }
reqwest = { version = "0.11", features = ["json"] }
futures = "0.3.5"
guess_host_triple = "0.1.2"
thiserror = "*"
log = "*"
env_logger = "0.8.3"
diesel-derive-enum = { version = "1", features = ["postgres"] }
[dependencies.diesel]

@ -27,7 +27,7 @@ macro_rules! get_result {
( $result:ty ) => {
|response: Response| async {
response
.json::<Message<$result>>()
.json::<BaseMessage<$result>>()
.await
.map(|msg| msg.into_item())
.map_err(|e| UError::from(e))
@ -52,7 +52,7 @@ macro_rules! build_url_by_method {
)
);
request
$( .json::<Message<'_, $param_type>>(&param.as_message()) )?
$( .json::<BaseMessage<'_, $param_type>>(&param.as_message()) )?
}
};
(
@ -152,9 +152,9 @@ impl ClientHandler {
// method basic_path(json/query param; additional_url_param) -> return value
// A - admin only
// client listing (A)
build_handler!(GET ls() -> ItemWrap<Vec<Agent>>);
build_handler!(GET ls() -> Vec<Agent>);
// get jobs for client himself (A: id=client_id)
build_handler!(GET get_jobs() -> ItemWrap<Vec<JobMeta>>);
build_handler!(GET get_jobs() -> Vec<JobMeta>);
// add client to server's db
build_handler!(POST init(IAgent) -> RawMsg);
// create and upload job (A)

@ -20,20 +20,20 @@ pub type DynFut = BoxFuture<'static, FutRes>;
lazy_static! {
static ref FUT_RESULTS: Mutex<HashMap<Uuid, JoinInfo>> = Mutex::new(HashMap::new());
static ref FUT_CHANNEL: (Mutex<Sender<Uuid>>, Mutex<Receiver<Uuid>>) = {
/*static ref FUT_CHANNEL: (Mutex<Sender<Uuid>>, Mutex<Receiver<Uuid>>) = {
spawn(init_receiver());
let (tx, rx) = channel(100);
(Mutex::new(tx), Mutex::new(rx))
};
};*/
}
/*
async fn get_static_channel() -> (Sender<Uuid>, MutexGuard<'static, Receiver<Uuid>>) {
(
FUT_CHANNEL.0.lock().await.clone(),
FUT_CHANNEL.1.lock().await
)
}
*/
struct JoinInfo {
handle: JoinHandle<FutRes>,
completed: bool,
@ -57,18 +57,18 @@ impl Waiter {
let collectable = self.tasks.len() != 1;
for f in self.tasks.drain(..) {
//eprintln!("before static channel");
let tx = get_static_channel().await.0;
//let tx = get_static_channel().await.0;
//eprintln!("after static channel");
let fid = Uuid::new_v4();
self.fids.push(fid);
let task_wrapper = async move {
/*let task_wrapper = async move {
//eprintln!("inside wrapper (started): {}", fid);
let result = f.await;
tx.send(fid).await.unwrap();
result
};
};*/
let handle = JoinInfo {
handle: spawn(task_wrapper),
handle: spawn(f),
completed: false,
collectable
};
@ -105,16 +105,15 @@ impl Waiter {
async fn pop_task(fid: Uuid) -> Option<JoinInfo> {
FUT_RESULTS.lock().await.remove(&fid)
}
/*
async fn init_receiver() {
while let Some(fid) = get_static_channel().await.1.recv().await {
eprintln!("task {} is done", fid);
if let Some(j) = FUT_RESULTS.lock().await.get_mut(&fid) {
j.completed = true;
}
}
}
*/
pub async fn pop_task_if_completed(fid: Uuid) -> Option<FutRes> {
let &mut JoinInfo {handle: _, collectable, completed} = match FUT_RESULTS
.lock()

@ -1,3 +1,4 @@
#![allow(non_upper_case_globals)]
pub mod executor;
pub mod config;
pub mod utils;
@ -19,3 +20,7 @@ extern crate lazy_static;
#[macro_use]
extern crate diesel;
#[macro_use]
extern crate log;
extern crate env_logger;

@ -10,23 +10,24 @@ use uuid::Uuid;
use crate::{UID};
pub trait ToMsg: Clone { //+ Serialize + DeserializeOwned {
fn as_message<'m>(&'m self) -> Message<'m, Self>
fn as_message<'m>(&'m self) -> BaseMessage<'m, Self>
where Cow<'m, Self>: From<&'m Self> {
Message::new(self)
BaseMessage::new(self)
}
}
// 1. Cow<'_, ItemWrap<I>> - failed, Message::new needs val or ref
// 2. ItemWrap<Cow<'_, I>> - can't impl From<Vec<...>> for Cow
fn into_message(self) -> BaseMessage<'static, Self> {
BaseMessage::new_cow(self)
}
}
#[derive(Serialize, Deserialize, Debug)]
pub struct Message<'cow, I>
pub struct BaseMessage<'cow, I>
where I: ToMsg {
pub id: Uuid,
pub item: Cow<'cow, I>
}
impl<'cow, I> Message<'cow, I>
impl<'cow, I> BaseMessage<'cow, I>
where I: ToMsg
{
pub fn new<C>(item: C) -> Self
@ -37,6 +38,13 @@ impl<'cow, I> Message<'cow, I>
}
}
pub fn new_cow(item: I) -> Self {
Self {
id: UID.clone(),
item: Cow::Owned(item)
}
}
pub fn into_item(self) -> I {
self.item.into_owned()
}
@ -45,26 +53,7 @@ impl<'cow, I> Message<'cow, I>
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct RawMsg(pub String);
impl<T: ToMsg> ToMsg for Vec<T> {} //TODO: impl this for all collections
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct ItemWrap<T: ToMsg>(pub T);
impl<T: ToMsg> ToMsg for ItemWrap<T> {}
impl<'cow, T: ToMsg> From<ItemWrap<T>> for Cow<'cow, ItemWrap<T>> {
fn from(obj: ItemWrap<T>) -> Cow<'cow, ItemWrap<T>> {
Cow::Owned(obj)
}
}
impl<'cow, T: ToMsg> From<&'cow ItemWrap<T>> for Cow<'cow, ItemWrap<T>> {
fn from(obj: &'cow ItemWrap<T>) -> Cow<'cow, ItemWrap<T>> {
Cow::Borrowed(obj)
}
}
impl<T: ToMsg> ToMsg for Vec<T> {}
/*
#[cfg(test)]
mod tests {
@ -73,11 +62,11 @@ mod tests {
#[test]
fn test_create_message_owned() {
let item = String::from("QWEDSA");
let msg_raw = Message {
let msg_raw = BaseMessage {
id: *UID,
item: Cow::Owned(item.clone())
};
let msg = Message::new(item);
let msg = BaseMessage::new(item);
assert_eq!(msg_raw.item, msg.item);
}
}*/

@ -341,7 +341,7 @@ mod tests {
assert_eq!(now.elapsed().unwrap().as_secs(), SLEEP_SECS);
Ok(())
}
/*
#[tokio::test]
async fn test_exec_multiple_jobs_nowait() -> UResult<()> {
const REPEATS: usize = 10;
@ -358,7 +358,7 @@ mod tests {
}
Ok(())
}
*/
#[tokio::test]
async fn test_failing_shell_job() -> UResult<()> {
let job = JobMeta::from_shell("lol_kek_puk");

Loading…
Cancel
Save