almost shareable jobs

4-update-check
plazmoid 4 years ago
parent 8e081158a5
commit 9ccf69e692
  1. 19
      bin/u_agent/src/main.rs
  2. 32
      bin/u_server/src/handlers.rs
  3. 45
      bin/u_server/src/main.rs
  4. 4
      lib/u_lib/src/client/client.rs
  5. 6
      lib/u_lib/src/client/network.rs
  6. 6
      lib/u_lib/src/contracts/datatypes.rs
  7. 2
      lib/u_lib/src/contracts/mod.rs

@ -23,8 +23,25 @@ async fn main() {
let arg_ip = env::args().nth(1);
let instance = ClientHandler::new(arg_ip);
let cli_info = ClientInfo::gather();
instance.init(&cli_info).await;
retry_until_ok!(instance.init(&cli_info).await);
loop {
let jobs = retry_until_ok!(instance.get_jobs().await);
if jobs.len() > 0 {
println!("{:?}", jobs);
}
sleep(Duration::from_secs(2));
}
}
#[macro_export]
macro_rules! retry_until_ok {
( $body:stmt ) => {
loop {
match $body {
Ok(r) => break r,
Err(e) => println!("{:?}", e)
};
sleep(Duration::from_secs(2));
}
}
}

@ -25,6 +25,38 @@ pub async fn add_client(
}
}
pub async fn get_jobs(
db: Storage) -> Result<impl Reply, Rejection>
{
let mut clients = db.lock().await;
let cli = clients.get_mut(&msg.id).unwrap();
cli.jobs.iter_mut().for_each(|job: &mut JobMeta| {
if job.state == JobState::Queued {
job.state = JobState::Pending
}
});
Ok(warp::reply::json(
&Message::new(cli.jobs.clone())
))
}
pub async fn set_jobs(
uid: Option<Uuid>,
msg: Message<'_, JobStorageWrapper>,
db: Storage) -> Result<impl Reply, Rejection>
{
let mut clients = db.lock().await;
let cli = clients.get_mut(&uid.unwrap_or(msg.id)).unwrap();
msg.item.0.into_iter().for_each(|(uuid, job)| {
match cli.jobs.get_mut(&uuid) {
Some(cli_job) => *cli_job = job,
None => cli.jobs.push(job)
};
});
Ok(())
}
pub async fn listing(db: Storage) -> Result<impl Reply, Rejection> {
let clients = db.lock().await;
let mut result: Vec<ClientInfo> = Vec::with_capacity(clients.len());

@ -10,12 +10,14 @@ use env_logger;
use u_lib::{
MASTER_PORT,
contracts::*,
client::network::Paths
};
use uuid::Uuid;
fn get_content() -> impl Filter<Extract = (Message<'static, ClientInfo>,),
fn get_content<M: ToMsg>() -> impl Filter<Extract = (Message<'static, M>,),
Error = Rejection> + Clone {
body::content_length_limit(1024*64).and(body::json::<Message<ClientInfo>>())
body::content_length_limit(1024*64).and(body::json::<Message<M>>())
}
@ -26,22 +28,49 @@ async fn main() {
let db = warp::any().map(move || base_db.clone());
let new_client = warp::post()
.and(warp::path("init"))
.and(get_content())
.and(warp::path(Paths::init))
.and(get_content::<ClientInfo>())
.and(db.clone())
.and_then(handlers::add_client);
let ls = warp::get()
.and(warp::path("ls"))
.and(warp::path(Paths::ls))
.and(db.clone())
.and_then(handlers::listing);
let get_jobs = warp::get()
.and(warp::path(Paths::get_jobs))
.and(db.clone())
.and_then(handlers::get_jobs);
let set_jobs = warp::post()
.and(warp::path(Paths::set_jobs))
.and(warp::path::param::<Uuid>().map(Some))
.and(get_content::<JobStorageWrapper>())
.and(db.clone())
.and_then(handlers::set_jobs);
let update_own_jobs = warp::post()
.and(warp::path(Paths::set_jobs))
.and(warp::path::param::<Uuid>().map(Some))
.and(get_content::<JobStorageWrapper>())
.and(db.clone())
.and_then(handlers::set_jobs);
let auth_token = warp::header::exact("authorization", "Bearer 123qwe");
let auth_zone = auth_token.and(ls);
let agent_zone = new_client
.or(get_jobs)
.or(update_own_jobs)
;
let auth_zone = auth_token
.and(ls
.or(set_jobs)
);
let routes = new_client
.or(auth_zone)
let routes = auth_zone
.or(agent_zone)
.with(warp::log("warp"));
warp::serve(routes)
.run(([0,0,0,0], MASTER_PORT)).await;

@ -12,14 +12,14 @@ use crate::{contracts::*, UID, exec_job};
pub struct UClient {
pub client_info: ClientInfo,
pub jobs: Vec<JobMeta>, // TODO: to futures
pub jobs: JobStorage, // TODO: to futures
}
impl UClient {
pub fn new(client_info: ClientInfo) -> Self {
Self {
client_info,
jobs: Vec::new()
jobs: HashMap::new()
}
}
}

@ -21,7 +21,7 @@ use std::{
pub struct Paths;
#[macro_export]
macro_rules! epilogue {
macro_rules! get_result {
( () ) => ( |_| async Ok(()) );
( $result:ty ) => {
|response: Response| async {
@ -49,7 +49,7 @@ macro_rules! build_handler {
request = request
$(.json::<Message<'_, $param>>(&param.as_message()))?;
let response = request.send().await?;
($crate::epilogue!($result)(response)).await
($crate::get_result!($result)(response)).await
}
}
@ -106,3 +106,5 @@ impl ClientHandler {
build_handler!(POST init(ClientInfo) -> RawMsg);
build_handler!(GET ls() -> Vec<ClientInfo>);
build_handler!(POST del() -> ());
build_handler!(GET get_jobs() -> JobStorageWrapper);
build_handler!(POST set_jobs(JobStorageWrapper) -> ());

@ -7,9 +7,15 @@ use {
std::sync::Arc,
std::collections::HashMap,
uuid::Uuid,
serde::{Serialize, Deserialize}
};
pub type CliStorage = HashMap<Uuid, UClient>;
pub type JobStorage = HashMap<Uuid, JobMeta>;
// because can't impl From<HashMap<...>> for Cow
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct JobStorageWrapper(pub JobStorage);
#[derive(Clone)]

@ -34,4 +34,4 @@ macro_rules! to_message {
}
}
to_message!(ClientInfo, RawMsg);
to_message!(ClientInfo, RawMsg, JobStorageWrapper);
Loading…
Cancel
Save