server hardwork

4-update-check
plazmoid 4 years ago
parent 2fc52aafb6
commit 54707c1a92
  1. 35
      bin/u_server/src/db.rs
  2. 57
      bin/u_server/src/handlers.rs
  3. 32
      bin/u_server/src/main.rs
  4. 40
      lib/u_lib/src/api.rs
  5. 4
      lib/u_lib/src/executor.rs
  6. 3
      lib/u_lib/src/messaging.rs
  7. 37
      lib/u_lib/src/models/jobs.rs
  8. 5
      lib/u_lib/src/models/mod.rs
  9. 2
      migrations/2020-10-24-111622_create_all/up.sql

@ -12,6 +12,7 @@ use crate::{
errors::USrvResult
};
use u_lib::models::*;
use uuid::Uuid;
pub type Storage = Arc<Mutex<UDB>>;
@ -30,7 +31,7 @@ impl UDB {
Ok(Arc::new(Mutex::new(instance)))
}
pub fn new_agent(&self, agent: IAgent) -> USrvResult<()> {
pub fn new_agent(&self, agent: &IAgent) -> USrvResult<()> {
use schema::agents;
diesel::insert_into(agents::table)
.values(agent)
@ -44,6 +45,38 @@ impl UDB {
.load::<Agent>(&self.conn)?;
Ok(result)
}
pub fn get_jobs(&self, uid: Option<Uuid>) -> USrvResult<Vec<JobMeta>> {
use schema::jobs;
let result = if uid.is_some() {
jobs::table
.filter(jobs::id.like(uid.unwrap()))
} else {
jobs::table
}
.load::<JobMeta>(&self.conn)?;
Ok(result)
}
pub fn get_agent_jobs(&self, uid: Option<Uuid>) -> USrvResult<Vec<JobMeta>> {
use schema::{results, jobs};
let result = if uid.is_some() {
jobs::table
.filter(jobs::id.like(uid.unwrap()))
} else {
jobs::table
}
.load::<Agent>(&self.conn)?;
Ok(result)
}
pub fn add_jobs(&self, jobs: &Vec<JobMeta>) -> USrvResult<()> {
use schema::jobs;
diesel::insert_into(jobs::table)
.values(jobs)
.execute(&self.conn)?;
Ok(())
}
}
#[cfg(test)]

@ -4,30 +4,58 @@ use u_lib::{
use warp::{
Rejection,
Reply,
reply::with_status,
reply::{with_status, WithStatus},
http::StatusCode
};
use crate::db::{
Storage
};
use uuid::Uuid;
pub async fn add_agent(
msg: BaseMessage<'_, IAgent>,
db: Storage) -> Result<impl Reply, Rejection>
{
let result = db.lock()
match db.lock()
.unwrap()
.new_agent(msg.into_item());
match result {
Ok(_) => Ok(warp::reply::json(
&RawMsg("Added".to_string()).as_message()
.new_agent(&msg.into_item()) {
Ok(_) => Ok(warp::reply()),
//Err(e) => Ok(with_status(s.to_string(), StatusCode::BAD_REQUEST)) TODO
Err(e) => Err(warp::reject())
}
}
pub async fn get_agents(db: Storage) -> Result<impl Reply, Rejection> {
match db.lock()
.unwrap()
.get_agents() {
Ok(r) => Ok(warp::reply::json(
&r.as_message()
)),
Err(e) => Ok(warp::reply::json( //TODO: rejection
&RawMsg("Already exist".to_string()).as_message()
))
Err(e) => Err(warp::reject())
}
}
pub async fn get_jobs(
uid: Option<Uuid>,
db: Storage) -> Result<impl Reply, Rejection>
{
}
pub async fn get_agent_jobs(
uid: Option<Uuid>,
db: Storage) -> Result<impl Reply, Rejection>
{
}
pub async fn upload_jobs(
msg: BaseMessage<'_, Vec<JobMeta>>,
db: Storage) -> Result<impl Reply, Rejection>
{
}
/*
pub async fn report(
msg: Payload<Vec<JobResult>>,
@ -89,17 +117,6 @@ pub async fn set_jobs(
}
*/
pub async fn get_agents(db: Storage) -> Result<impl Reply, Rejection> {
let result = db.lock().unwrap().get_agents();
match result {
Ok(r) => Ok(warp::reply::json(
&r.as_message()
)),
Err(e) => {
Err(warp::reject())
}
}
}
pub async fn dummy() -> Result<impl Reply, Rejection> {
Ok(String::from("ok"))

@ -22,6 +22,7 @@ use db::*;
use serde::{
de::DeserializeOwned
};
use uuid::Uuid;
fn get_content<M>()
-> impl Filter<Extract = (BaseMessage<'static, M>,),
@ -51,17 +52,31 @@ async fn main() {
.and(warp::path(Paths::get_agents))
.and(db.clone())
.and_then(handlers::get_agents);
/*
let upload_job = warp::post()
.and(warp::path(Paths::upload_job))
let upload_jobs = warp::post()
.and(warp::path(Paths::upload_jobs))
.and(get_content::<Vec<JobMeta>>())
.and(db.clone())
.and_then(handlers::upload_job);
.and_then(handlers::upload_jobs);
let get_jobs = warp::get()
.and(warp::path(Paths::get_jobs))
.and(warp::path::param::<Option<Uuid>>())
.and(db.clone())
.and_then(handlers::get_jobs);
let get_agent_jobs = warp::get()
.and(warp::path(Paths::get_agent_jobs))
.and(warp::path::param::<Option<Uuid>>())
.and(db.clone())
.and_then(handlers::get_agent_jobs);
let del = warp::get()
.and(warp::path(Paths::del))
.and(warp::path::param::<Uuid>())
.and(db.clone())
.and_then(handlers::del);
/*
let set_jobs = warp::post()
.and(warp::path(Paths::set_jobs))
.and(warp::path::param::<Uuid>().map(Some))
@ -84,17 +99,18 @@ async fn main() {
let auth_token = warp::header::exact("authorization", "Bearer 123qwe");
let agent_zone = new_client
// .or(get_jobs)
.or(get_agent_jobs)
// .or(report)
;
let auth_zone = auth_token
.and(get_agents
// .or(upload_job)
.or(get_jobs)
.or(upload_jobs)
.or(del)
// .or(set_jobs)
// .or(get_job_results)
)
;
);
let routes = auth_zone
.or(agent_zone);

@ -29,12 +29,16 @@ macro_rules! build_url_by_method {
ptype = $($param_type:ty)?,
urlparam = $($url_param:ty)?
) => {
|instance: &ClientHandler $(, param: &$param_type)?| {
|
instance: &ClientHandler
$(, param: &$param_type)?
$(, url: &$url_param)?
| {
let request = ClientHandler::build_post(
instance,
&format!("{}/{}",
stringify!($path),
String::new() $(+ stringify!($url_param))?
String::new() $(+ &(url as &$url_param).to_string())?
)
);
request
@ -47,12 +51,16 @@ macro_rules! build_url_by_method {
ptype = $($param_type:ty)?,
urlparam = $($url_param:ty)?
) => {
|instance: &ClientHandler $(, param: &$param_type)?| {
|
instance: &ClientHandler
$(, param: &$param_type)?
$(, url: &$url_param)?
| {
let request = ClientHandler::build_get(
instance,
&format!("{}/{}",
stringify!($path),
String::new() $(+ stringify!($url_param))?
String::new() $(+ &(url as &$url_param).to_string())?
)
);
request
@ -73,12 +81,14 @@ macro_rules! build_handler {
) => {
impl ClientHandler {
pub async fn $path(
&self $(, param: &$param_type)? // $(, url_param: &$url_param)?
&self
$(, param: &$param_type)?
$(, url_param: &$url_param)?
) -> UResult<$result> {
let request = $crate::build_url_by_method!(
$method $path,
pname = $($param_name)?, ptype = $($param_type)?, urlparam = $($url_param)?
)(self $(, param as &$param_type)? );
)(self $(, param as &$param_type)? $(, url_param as &$url_param)?);
let response = request.send().await?;
match response.error_for_status() {
Ok(r) => r.json::<BaseMessage<$result>>()
@ -146,20 +156,20 @@ impl ClientHandler {
// client listing (A)
build_handler!(GET get_agents() -> Vec<Agent>);
// get jobs for client (agent_id=Uuid)
build_handler!(GET get_jobs("agent_id":Uuid) -> Vec<JobMeta>);
build_handler!(GET get_agent_jobs/Uuid() -> Vec<JobMeta>);
// get all available jobs (A)
build_handler!(GET get_jobs/Uuid() -> Vec<JobMeta>);
// add client to server's db
build_handler!(POST init(IAgent) -> RawMsg);
build_handler!(POST init(IAgent) -> String);
// create and upload job (A)
build_handler!(POST add_jobs(Vec<JobMeta>) -> ());
build_handler!(POST upload_jobs(Vec<JobMeta>) -> ());
// delete something (A)
build_handler!(GET del/Uuid() -> ());
/*
// set jobs for client (A)
// POST /set_jobs/Uuid json: JobMetaStorage
build_handler!(POST set_jobs(JobMetaStorage; Uuid) -> ());
// POST /set_jobs/Uuid json: Vec<Uuid>
build_handler!(POST set_jobs/Uuid(Vec<Uuid>) -> ());
// get results (A)
// GET /get_job_results?job_id=Uuid
build_handler!(GET get_job_results("job_id":Uuid) -> Vec<JobResult>);
// GET /get_job_results/Uuid
build_handler!(GET get_job_results/Uuid() -> Vec<JobResult>);
// report job result
build_handler!(POST report(Vec<JobResult>) -> ());
*/

@ -2,7 +2,7 @@
// job runner (thread)
// every job runs in other thread/process
use crate::{models::*, UResult, UError, OneOrMany};
use crate::{models::*, UResult, OneOrMany};
use std::collections::HashMap;
@ -11,7 +11,7 @@ use lazy_static::lazy_static;
use tokio::{
spawn,
task::JoinHandle,
sync::mpsc::{channel, Receiver, Sender}
//sync::mpsc::{channel, Receiver, Sender}
};
use uuid::Uuid;

@ -42,9 +42,6 @@ impl<'cow, I> BaseMessage<'cow, I>
}
}
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct RawMsg(pub String);
/*
#[cfg(test)]
mod tests {

@ -43,7 +43,9 @@ pub enum JobSchedule {
}
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, DbEnum)]
pub enum Jobstate {
#[PgType = "JobState"]
#[DieselType = "Jobstate"]
pub enum JobState {
Queued, // server created a job, but client didn't get it yet
//Pending, // client got a job, but not running yet
Running, // client is currently running a job
@ -52,6 +54,7 @@ pub enum Jobstate {
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, DbEnum)]
#[PgType = "JobType"]
#[DieselType = "Job_type"]
pub enum JobType {
Manage,
Shell,
@ -138,7 +141,15 @@ impl JobOutput {
}
}
#[derive(Serialize, Deserialize, Clone, Debug, Queryable, Identifiable)]
#[derive(
Serialize,
Deserialize,
Clone,
Debug,
Queryable,
Identifiable,
Insertable
)]
#[table_name = "jobs"]
pub struct JobMeta {
pub alias: String,
@ -146,7 +157,7 @@ pub struct JobMeta {
pub exec_type: JobType,
//pub schedule: JobSchedule,
pub platform: String,
pub payload: Option<Box<Vec<u8>>>,
pub payload: Option<Vec<u8>>,
}
impl JobMeta {
@ -159,19 +170,28 @@ impl JobMeta {
exec_type: JobType::Shell,
//schedule: JobSchedule::Once,
platform: guess_host_triple().unwrap_or("unknown").to_string(),
payload: Some(Box::new(shell_cmd.into_bytes()))
payload: Some(shell_cmd.into_bytes())
}
}
}
#[derive(Serialize, Deserialize, Clone, Debug, Queryable, Insertable)]
#[derive(
Serialize,
Deserialize,
Clone,
Debug,
Queryable,
Identifiable,
Insertable
)]
#[table_name = "results"]
pub struct JobResult {
pub agent_id: Uuid,
pub id: Uuid,
pub job_id: Uuid,
pub result: Option<Vec<u8>>,
// pub state: Jobstate,
pub state: JobState,
pub retcode: Option<i32>,
pub ts: SystemTime,
}
@ -191,9 +211,10 @@ impl Default for JobResult {
fn default() -> Self {
Self {
agent_id: Uuid::nil(),
id: Uuid::new_v4(),
job_id: Uuid::nil(),
result: None,
// state: Jobstate::Running,
state: JobState::Queued,
retcode: None,
ts: SystemTime::now()
}
@ -202,7 +223,7 @@ impl Default for JobResult {
pub struct Job {
exec_type: JobType,
payload: Option<Box<Vec<u8>>>,
payload: Option<Vec<u8>>,
result: JobResult
}

@ -13,6 +13,7 @@ pub use crate::{
use std::{
borrow::Cow
};
use uuid::Uuid;
macro_rules! to_message {
($($type:ty),+) => { $(
@ -40,8 +41,10 @@ to_message!(
IAgent,
JobMeta,
JobResult,
RawMsg,
String,
Vec<Agent>,
Vec<JobMeta>,
Vec<JobResult>,
Vec<Uuid>,
()
);

@ -50,7 +50,7 @@ CREATE TABLE IF NOT EXISTS results (
, job_id UUID NOT NULL
, result BYTEA
, retcode INTEGER
-- , state JobState NOT NULL DEFAULT 'queued'
, state JobState NOT NULL DEFAULT 'queued'
, ts TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
, FOREIGN KEY(agent_id) REFERENCES agents(id)
, FOREIGN KEY(job_id) REFERENCES jobs(id)

Loading…
Cancel
Save