improving web interfaces

4-update-check
plazmoid 4 years ago
parent 47a0ae4b2b
commit 0ea7dc0683
  1. 21
      bin/u_agent/src/main.rs
  2. 45
      bin/u_server/src/handlers.rs
  3. 22
      bin/u_server/src/main.rs
  4. 1
      lib/u_lib/src/client/client.rs
  5. 79
      lib/u_lib/src/client/network.rs
  6. 37
      lib/u_lib/src/contracts/datatypes.rs
  7. 6
      lib/u_lib/src/contracts/jobs.rs
  8. 6
      lib/u_lib/src/executor.rs

@ -11,10 +11,14 @@ use {
std::thread::sleep, std::thread::sleep,
std::time::Duration, std::time::Duration,
std::env, std::env,
u_lib::client::{ u_lib::{
ClientInfo, client::{
network::ClientHandler ClientInfo,
} network::ClientHandler
},
contracts::*,
send_jobs_to_executor,
},
}; };
#[tokio::main] #[tokio::main]
@ -27,7 +31,14 @@ async fn main() {
loop { loop {
let jobs = retry_until_ok!(instance.get_jobs().await); let jobs = retry_until_ok!(instance.get_jobs().await);
if jobs.0.len() > 0 { if jobs.0.len() > 0 {
println!("{:?}", jobs); let result = send_jobs_to_executor(jobs
.into_inner()
.values()
.collect()
).await;
retry_until_ok!(instance.report(
IterWrap(result.into_iter().map(|r| r.unwrap()).collect())
).await)
} }
sleep(Duration::from_secs(2)); sleep(Duration::from_secs(2));
} }

@ -2,14 +2,45 @@ use u_lib::contracts::*;
use warp::{ use warp::{
Rejection, Rejection,
Reply, Reply,
reply::with_status,
http::StatusCode
}; };
use uuid::Uuid;
pub async fn report(
msg: Message<'_, IterWrap<Vec<JobResult>>>,
db: Storage) -> Result<impl Reply, Rejection>
{
let results = msg.item.into_inner();
let mut storage = db.results().await;
results.into_iter().for_each(|new_result| {
match storage.get_mut(&new_result.id) {
Some(v) => v.push(new_result),
None => storage.insert(new_result.id, vec![new_result])
}
});
Ok(with_status(warp::reply(), StatusCode::OK))
}
pub async fn get_job_results(
uid: Uuid,
db: Storage) -> Result<impl Reply, Rejection>
{
let storage = db.results().await;
match storage.get(&uid) {
Some(v) => Ok(warp::reply::json(
&Message::new(v.clone())
)),
None => Err(warp::reject())
}
}
pub async fn add_client( pub async fn add_client(
msg: Message<'_, ClientInfo>, msg: Message<'_, ClientInfo>,
db: Storage) -> Result<impl Reply, Rejection> db: Storage) -> Result<impl Reply, Rejection>
{ {
let new_cli = msg.item; let new_cli = msg.item;
let mut clients = db.lock().await; let mut clients = db.clients().await;
if clients.contains_key(&new_cli.id) { if clients.contains_key(&new_cli.id) {
Ok(warp::reply::json( Ok(warp::reply::json(
&RawMsg("Already exist".to_string()).as_message() &RawMsg("Already exist".to_string()).as_message()
@ -28,7 +59,7 @@ pub async fn add_client(
pub async fn get_jobs( pub async fn get_jobs(
db: Storage) -> Result<impl Reply, Rejection> db: Storage) -> Result<impl Reply, Rejection>
{ {
let mut clients = db.lock().await; let mut clients = db.clients().await;
let cli = clients.get_mut(&msg.id).unwrap(); let cli = clients.get_mut(&msg.id).unwrap();
cli.jobs.iter_mut().for_each(|job: &mut JobMeta| { cli.jobs.iter_mut().for_each(|job: &mut JobMeta| {
if job.state == JobState::Queued { if job.state == JobState::Queued {
@ -42,10 +73,10 @@ pub async fn get_jobs(
pub async fn set_jobs( pub async fn set_jobs(
uid: Option<Uuid>, uid: Option<Uuid>,
msg: Message<'_, CollectionWrapper<JobMetaStorage>>, msg: Message<'_, IterWrap<JobMetaStorage>>,
db: Storage) -> Result<impl Reply, Rejection> db: Storage) -> Result<impl Reply, Rejection>
{ {
let mut clients = db.lock().await; let mut clients = db.clients().await;
let cli = clients.get_mut(&uid.unwrap_or(msg.id)).unwrap(); let cli = clients.get_mut(&uid.unwrap_or(msg.id)).unwrap();
msg.item.0.into_iter().for_each(|(uuid, job)| { msg.item.0.into_iter().for_each(|(uuid, job)| {
match cli.jobs.get_mut(&uuid) { match cli.jobs.get_mut(&uuid) {
@ -57,13 +88,13 @@ pub async fn set_jobs(
} }
pub async fn listing(db: Storage) -> Result<impl Reply, Rejection> { pub async fn ls(db: Storage) -> Result<impl Reply, Rejection> {
let clients = db.lock().await; let clients = db.clients().await;
let mut result: Vec<ClientInfo> = Vec::with_capacity(clients.len()); let mut result: Vec<ClientInfo> = Vec::with_capacity(clients.len());
for cli in clients.values() { for cli in clients.values() {
result.push(cli.client_info.clone()); result.push(cli.client_info.clone());
} }
Ok(warp::reply::json( Ok(warp::reply::json(
&CollectionWrapper(result).as_message() &IterWrap(result).as_message()
)) ))
} }

@ -36,7 +36,7 @@ async fn main() {
let ls = warp::get() let ls = warp::get()
.and(warp::path(Paths::ls)) .and(warp::path(Paths::ls))
.and(db.clone()) .and(db.clone())
.and_then(handlers::listing); .and_then(handlers::ls);
let get_jobs = warp::get() let get_jobs = warp::get()
.and(warp::path(Paths::get_jobs)) .and(warp::path(Paths::get_jobs))
@ -46,27 +46,33 @@ async fn main() {
let set_jobs = warp::post() let set_jobs = warp::post()
.and(warp::path(Paths::set_jobs)) .and(warp::path(Paths::set_jobs))
.and(warp::path::param::<Uuid>().map(Some)) .and(warp::path::param::<Uuid>().map(Some))
.and(get_content::<CollectionWrapper<JobMetaStorage>>()) .and(get_content::<IterWrap<JobMetaStorage>>())
.and(db.clone()) .and(db.clone())
.and_then(handlers::set_jobs); .and_then(handlers::set_jobs);
let update_own_jobs = warp::post() let get_job_results = warp::get()
.and(warp::path(Paths::set_jobs)) .and(warp::path(Paths::get_job_results))
.and(warp::path::param::<Uuid>().map(Some)) .and(warp::path::param::<Uuid>())
.and(get_content::<CollectionWrapper<JobMetaStorage>>())
.and(db.clone()) .and(db.clone())
.and_then(handlers::set_jobs); .and_then(handlers::get_job_results);
let report = warp::post()
.and(warp::path(Paths::report))
.and(get_content::<IterWrap<Vec<JobResult>>>())
.and(db.clone())
.and_then(handlers::report);
let auth_token = warp::header::exact("authorization", "Bearer 123qwe"); let auth_token = warp::header::exact("authorization", "Bearer 123qwe");
let agent_zone = new_client let agent_zone = new_client
.or(get_jobs) .or(get_jobs)
.or(update_own_jobs) .or(report)
; ;
let auth_zone = auth_token let auth_zone = auth_token
.and(ls .and(ls
.or(set_jobs) .or(set_jobs)
.or(get_job_results)
); );
let routes = auth_zone let routes = auth_zone

@ -67,7 +67,6 @@ mod tests {
use crate::{ use crate::{
utils::vec_to_string utils::vec_to_string
}; };
use std::time::SystemTime;
#[tokio::test] #[tokio::test]
async fn test_gather() { async fn test_gather() {

@ -17,6 +17,7 @@ use std::{
net::Ipv4Addr, net::Ipv4Addr,
str::FromStr str::FromStr
}; };
use uuid::Uuid;
pub struct Paths; pub struct Paths;
@ -34,20 +35,64 @@ macro_rules! get_result {
}; };
} }
#[macro_export]
macro_rules! build_url_by_method {
(
POST $path:tt,
pname = $($param_name:literal)?,
ptype = $($param_type:ty)?,
urlparam = $($url_param:ty)?
) => {
|instance: &ClientHandler $(, param: &$param_type)?| {
let request = ClientHandler::build_post(
instance,
&format!("{}/{}",
stringify!($path),
String::new() $(+ stringify!($url_param))?
)
);
request
$( .json::<Message<'_, $param_type>>(&param.as_message()) )?
}
};
(
GET $path:tt,
pname = $($param_name:literal)?,
ptype = $($param_type:ty)?,
urlparam = $($url_param:ty)?
) => {
|instance: &ClientHandler $(, param: &$param_type)?| {
let request = ClientHandler::build_get(
instance,
&format!("{}/{}",
stringify!($path),
String::new() $(+ stringify!($url_param))?
)
);
request
$( .query(&[(stringify!($param_name), param.to_string())]) )?
}
};
}
// param and result must impl ToMsg // param and result must impl ToMsg
#[macro_export] #[macro_export]
macro_rules! build_handler { macro_rules! build_handler {
( $method:tt $path:tt($($param:ty)?) -> $result:ty ) => { (
$method:tt
$path:tt(
$($param_name:literal:)?
$($param_type:ty)?
$(; $url_param:ty)?
) -> $result:ty ) => {
impl ClientHandler { impl ClientHandler {
pub async fn $path(&self $(, param: &$param)?) -> UResult<$result> { pub async fn $path(
let builder = match stringify!($method) { &self $(, param: &$param_type)? $(, url_param: &$url_param)?
"POST" => ClientHandler::build_post, ) -> UResult<$result> {
"GET" => ClientHandler::build_get, let request = $crate::build_url_by_method!(
_ => panic!("Method '{}' is not allowed", stringify!($method)) $method $path,
}; pname = $($param_name)?, ptype = $($param_type)?, urlparam = $($url_param)?
let mut request = builder(self, stringify!($path)); )(self $(, param as &$param_type)? );
request = request
$( .json::<Message<'_, $param>>(&param.as_message()) )? ;
let response = request.send().await?; let response = request.send().await?;
($crate::get_result!($result)(response)).await ($crate::get_result!($result)(response)).await
} }
@ -103,15 +148,21 @@ impl ClientHandler {
} }
} }
// method basic_path(json/query param; additional_url_param) -> return value
// A - admin only // A - admin only
// client listing (A) // client listing (A)
build_handler!(GET ls() -> CollectionWrapper<Vec<ClientInfo>>); build_handler!(GET ls() -> IterWrap<Vec<ClientInfo>>);
// get jobs for client himself (A: id=client_id) // get jobs for client himself (A: id=client_id)
build_handler!(GET get_jobs() -> CollectionWrapper<JobMetaStorage>); build_handler!(GET get_jobs() -> IterWrap<JobMetaStorage>);
// add client to server's db // add client to server's db
build_handler!(POST init(ClientInfo) -> RawMsg); build_handler!(POST init(ClientInfo) -> RawMsg);
// ??? // ???
build_handler!(POST del() -> ()); build_handler!(POST del() -> ());
// set jobs for client (A) // set jobs for client (A)
build_handler!(POST set_jobs(CollectionWrapper<JobMetaStorage>) -> ()); // POST /set_jobs/Uuid json: JobMetaStorage
// get_results (A): user_id, job_id build_handler!(POST set_jobs(IterWrap<JobMetaStorage>; Uuid) -> ());
// get results (A)
// GET /get_job_results?job_id=Uuid
build_handler!(GET get_job_results("job_id":Uuid) -> IterWrap<Vec<JobResult>>);
// report job result
build_handler!(POST report(IterWrap<Vec<JobResult>>) -> ());

@ -14,29 +14,36 @@ use {
pub type CliStorage = HashMap<Uuid, UClient>; pub type CliStorage = HashMap<Uuid, UClient>;
pub type JobResultsStorage = HashMap<Uuid, Vec<JobResult>>;
pub type JobMetaStorage = HashMap<Uuid, JobMeta>; pub type JobMetaStorage = HashMap<Uuid, JobMeta>;
pub type JobMetaRef = Arc<StdMutex<JobMeta>>; pub type JobMetaRef = Arc<StdMutex<JobMeta>>;
// because can't impl From<CollectionWrapper<...>> for Cow // because can't impl From<IterWrap<...>> for Cow
#[derive(Serialize, Deserialize, Debug, Clone)] #[derive(Serialize, Deserialize, Debug, Clone)]
pub struct CollectionWrapper<T>(pub T); pub struct IterWrap<T>(pub T);
impl<T> From<T> for CollectionWrapper<T> { impl<T> IterWrap<T> {
pub fn into_inner(self) -> T {
self.0
}
}
impl<T> From<T> for IterWrap<T> {
fn from(t: T) -> Self { fn from(t: T) -> Self {
CollectionWrapper(t) IterWrap(t)
} }
} }
impl<T: Clone> ToMsg for CollectionWrapper<T> {} impl<T: Clone> ToMsg for IterWrap<T> {}
impl<'cow, T: Clone> From<CollectionWrapper<T>> for Cow<'cow, CollectionWrapper<T>> { impl<'cow, T: Clone> From<IterWrap<T>> for Cow<'cow, IterWrap<T>> {
fn from(obj: CollectionWrapper<T>) -> Cow<'cow, CollectionWrapper<T>> { fn from(obj: IterWrap<T>) -> Cow<'cow, IterWrap<T>> {
Cow::Owned(obj) Cow::Owned(obj)
} }
} }
impl<'cow, T: Clone> From<&'cow CollectionWrapper<T>> for Cow<'cow, CollectionWrapper<T>> { impl<'cow, T: Clone> From<&'cow IterWrap<T>> for Cow<'cow, IterWrap<T>> {
fn from(obj: &'cow CollectionWrapper<T>) -> Cow<'cow, CollectionWrapper<T>> { fn from(obj: &'cow IterWrap<T>) -> Cow<'cow, IterWrap<T>> {
Cow::Borrowed(obj) Cow::Borrowed(obj)
} }
} }
@ -44,7 +51,8 @@ impl<'cow, T: Clone> From<&'cow CollectionWrapper<T>> for Cow<'cow, CollectionWr
#[derive(Clone)] #[derive(Clone)]
pub struct Storage { pub struct Storage {
clients: Arc<Mutex<CliStorage>> clients: Arc<Mutex<CliStorage>>,
jobs_results: Arc<Mutex<JobResultsStorage>>
} }
impl Storage { impl Storage {
@ -52,11 +60,18 @@ impl Storage {
Self { Self {
clients: Arc::new( clients: Arc::new(
Mutex::new(HashMap::<Uuid, UClient>::new()) Mutex::new(HashMap::<Uuid, UClient>::new())
),
jobs_results: Arc::new(
Mutex::new(HashMap::<Uuid, Vec<JobResult>>::new())
) )
} }
} }
pub async fn lock(&self) -> MutexGuard<'_, CliStorage> { pub async fn clients(&self) -> MutexGuard<'_, CliStorage> {
self.clients.lock().await self.clients.lock().await
} }
pub async fn results(&self) -> MutexGuard<'_, JobResultsStorage> {
self.jobs_results.lock().await
}
} }

@ -166,7 +166,7 @@ pub struct JobResult {
pub data: Option<Result<JobOutput, UError>>, pub data: Option<Result<JobOutput, UError>>,
pub state: JobState, pub state: JobState,
pub retcode: Option<i32>, pub retcode: Option<i32>,
pub date: SystemTime, pub timestamp: SystemTime,
} }
@ -189,6 +189,7 @@ impl Job {
}, },
data: None, data: None,
retcode: None, retcode: None,
timestamp: SystemTime::now()
}, },
meta: job_meta, meta: job_meta,
} }
@ -299,7 +300,8 @@ impl Job {
self.state() == JobState::Finished self.state() == JobState::Finished
} }
pub fn into_result(self) -> JobResult { pub fn into_result(mut self) -> JobResult {
self.result.timestamp = SystemTime::now();
self.result self.result
} }
} }

@ -18,11 +18,7 @@ use crate::{
}; };
use std::{ use std::{
sync::{Mutex, MutexGuard, Arc}, sync::{Mutex, MutexGuard, Arc},
thread,
time::Duration,
collections::HashMap,
}; };
use uuid::Uuid;
//use tokio::task::JoinHandle; //use tokio::task::JoinHandle;
use futures::future::{join_all, JoinAll}; use futures::future::{join_all, JoinAll};
use futures::Future; use futures::Future;
@ -50,7 +46,7 @@ impl AsyncExecutor {
} }
} }
*/ */
pub async fn apply_job(&mut self, mut new_job: Job) -> UResult<JobResult> { pub async fn apply_job(&mut self, new_job: Job) -> UResult<JobResult> {
/*let id = new_job.id(); /*let id = new_job.id();
let mut job_pool = self.jobs.lock().unwrap(); let mut job_pool = self.jobs.lock().unwrap();
job_pool.push(new_job); job_pool.push(new_job);

Loading…
Cancel
Save