You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

181 lines
5.8 KiB

use crate::db::UDB;
use diesel::SaveChangesDsl;
use hyper::Body;
use serde::Serialize;
use u_lib::{
messaging::{AsMsg, BaseMessage, Reportable},
models::*,
utils::{OneOrVec, Stripped},
ULocalError,
};
use uuid::Uuid;
use warp::{
http::{Response, StatusCode},
Rejection, Reply,
};
/// Builds an HTTP response carrying `body` with the status `code`.
///
/// Everything except the status line and the body is left at the defaults
/// of a freshly created response.
pub fn build_response(code: StatusCode, body: impl Into<Body>) -> Response<Body> {
    let mut response = Response::new(body.into());
    *response.status_mut() = code;
    response
}
/// Shorthand for a `200 OK` response carrying `body`.
pub fn build_ok(body: impl Into<Body>) -> Response<Body> {
    let status = StatusCode::OK;
    build_response(status, body)
}
/// Shorthand for a `400 Bad Request` response whose body is the textual
/// form (`to_string()`) of `body`, typically an error value.
pub fn build_err(body: impl ToString) -> Response<Body> {
    let text = body.to_string();
    build_response(StatusCode::BAD_REQUEST, text)
}
/// Converts `m` into its message form via `AsMsg::as_message` and returns it
/// as a JSON reply.
pub fn build_message<M: AsMsg + Serialize>(m: M) -> Response<Body> {
    let envelope = m.as_message();
    let reply = warp::reply::json(&envelope);
    reply.into_response()
}
/// Namespace type for the HTTP request handlers; it carries no state and all
/// handlers are associated functions on it.
pub struct Endpoints;
#[cfg_attr(test, automock)]
impl Endpoints {
    /// Inserts the agent `msg` through `UDB::insert_agent`.
    ///
    /// Replies `200 OK` with an empty body on success and `400 Bad Request`
    /// with the error text otherwise. This handler never returns
    /// `Err(Rejection)`.
    pub async fn add_agent(msg: Agent) -> Result<Response<Body>, Rejection> {
        info!("hnd: add_agent");
        UDB::lock_db()
            .insert_agent(&msg)
            .map(|_| build_ok(""))
            .or_else(|e| Ok(build_err(e)))
    }

    /// Replies with the result of `UDB::get_agents(uid)` as a JSON message,
    /// or with `400 Bad Request` and the error text if the query fails.
    pub async fn get_agents(uid: Option<Uuid>) -> Result<Response<Body>, Rejection> {
        info!("hnd: get_agents");
        UDB::lock_db()
            .get_agents(uid)
            .map(build_message)
            .or_else(|e| Ok(build_err(e)))
    }

    /// Replies with the result of `UDB::get_jobs(uid)` as a JSON message,
    /// or with `400 Bad Request` and the error text if the query fails.
    pub async fn get_jobs(uid: Option<Uuid>) -> Result<Response<Body>, Rejection> {
        info!("hnd: get_jobs");
        UDB::lock_db()
            .get_jobs(uid)
            .map(build_message)
            .or_else(|e| Ok(build_err(e)))
    }

    /// Replies with the result of `UDB::get_exact_jobs(uid, false)` as a JSON
    /// message, or with `400 Bad Request` and the error text on failure.
    ///
    /// NOTE(review): the meaning of the `false` flag is defined by
    /// `get_exact_jobs`, which is not visible here. Compare with
    /// `get_personal_jobs`, which passes `true`.
    pub async fn get_agent_jobs(uid: Option<Uuid>) -> Result<Response<Body>, Rejection> {
        info!("hnd: get_agent_jobs");
        UDB::lock_db()
            .get_exact_jobs(uid, false)
            .map(build_message)
            .or_else(|e| Ok(build_err(e)))
    }

    /// Returns the jobs selected by `get_exact_jobs(uid, true)` and marks each
    /// of them as `JobState::Running`.
    ///
    /// If `get_agents(uid)` yields nothing, an agent with id `uid` is inserted
    /// first and the job aliased `"agent_hello"` is assigned to it.
    ///
    /// Every database failure is reported as `400 Bad Request` with the error
    /// text. A request without `uid` that would require inserting a new agent
    /// also gets `400 Bad Request`. This handler never returns
    /// `Err(Rejection)`.
    pub async fn get_personal_jobs(uid: Option<Uuid>) -> Result<Response<Body>, Rejection> {
        info!("hnd: get_personal_jobs");
        // A single guard for the whole handler, so the "is the agent known?"
        // check and the following inserts/updates are not interleaved with
        // other handlers.
        let db = UDB::lock_db();

        let no_known_agents = match db.get_agents(uid) {
            Ok(agents) => agents.is_empty(),
            Err(e) => return Ok(build_err(e)),
        };
        if no_known_agents {
            // An agent cannot be inserted without an id to insert it under.
            let agent_id = match uid {
                Some(id) => id,
                None => return Ok(build_err("agent id is required to register a new agent")),
            };
            if let Err(e) = db.insert_agent(&Agent::with_id(agent_id)) {
                return Ok(build_err(e));
            }
            let hello_job = match db.find_job_by_alias("agent_hello") {
                Ok(job) => job,
                Err(e) => return Ok(build_err(e)),
            };
            if let Err(e) = db.set_jobs_for_agent(&agent_id, &[hello_job.id]) {
                return Ok(build_err(e));
            }
        }

        let jobs = match db.get_exact_jobs(uid, true) {
            Ok(jobs) => jobs,
            Err(e) => return Ok(build_err(e)),
        };
        // The jobs are about to be handed out, so flag them as running.
        for job in jobs.iter() {
            if let Err(e) = db.update_job_status(job.id, JobState::Running) {
                return Ok(build_err(e));
            }
        }
        Ok(build_message(jobs))
    }

    /// Stores the uploaded job descriptions through `UDB::insert_jobs`.
    ///
    /// Replies `200 OK` with an empty body on success and `400 Bad Request`
    /// with the error text otherwise.
    pub async fn upload_jobs(
        msg: BaseMessage<'static, Vec<JobMeta>>,
    ) -> Result<Response<Body>, Rejection> {
        info!("hnd: upload_jobs");
        UDB::lock_db()
            .insert_jobs(&msg.into_inner())
            .map(|_| build_ok(""))
            .or_else(|e| Ok(build_err(e)))
    }

    /// Deletes the record identified by `uid`, trying agents, then jobs, then
    /// results.
    ///
    /// The search stops at the first delete that affects at least one row, and
    /// that row count is the reply. If none of the deletes affects anything
    /// the reply is `0`. A database failure is reported as `400 Bad Request`
    /// with the error text.
    pub async fn del(uid: Uuid) -> Result<Response<Body>, Rejection> {
        info!("hnd: del");
        let db = UDB::lock_db();
        let del_fns = &[UDB::del_agents, UDB::del_jobs, UDB::del_results];
        for del_fn in del_fns {
            match del_fn(&db, &[uid]) {
                // Only a single id is passed in, so the count fits in `i32`.
                Ok(affected) if affected > 0 => return Ok(build_message(affected as i32)),
                Ok(_) => (),
                Err(e) => return Ok(build_err(e)),
            }
        }
        Ok(build_message(0))
    }

    /// Assigns jobs to the agent `agent_uid`.
    ///
    /// Each entry of `msg` is either a job UUID in textual form or a job
    /// alias: entries that do not parse as a UUID are looked up with
    /// `UDB::find_job_by_alias`. If any lookup fails, nothing is assigned and
    /// the reply is `400 Bad Request` with the error text. Otherwise the reply
    /// is the result of `UDB::set_jobs_for_agent` as a JSON message.
    pub async fn set_jobs(
        agent_uid: Uuid,
        msg: BaseMessage<'static, Vec<String>>,
    ) -> Result<Response<Body>, Rejection> {
        info!("hnd: set_jobs_by_alias, agent: {}", agent_uid);
        let jobs: Result<Vec<Uuid>, ULocalError> = msg
            .into_inner()
            .into_iter()
            .map(|ident| {
                info!("hnd: set_jobs_by_alias, job: {}", ident);
                Uuid::parse_str(&ident)
                    .or_else(|_| UDB::lock_db().find_job_by_alias(&ident).map(|j| j.id))
            })
            .collect();
        match jobs {
            Ok(j) => UDB::lock_db()
                .set_jobs_for_agent(&agent_uid, &j)
                .map(build_message)
                .or_else(|e| Ok(build_err(e))),
            Err(e) => Ok(build_err(e)),
        }
    }

    /// Processes a batch of reports sent by the agent `msg.id`.
    ///
    /// * `Reportable::Assigned`: the job result is saved, unless its
    ///   `agent_id` differs from the sender id, in which case it is skipped.
    /// * `Reportable::Agent`: the agent is marked `AgentState::Active` and
    ///   passed to [`Endpoints::add_agent`].
    /// * `Reportable::Error`: the error is logged and stored through
    ///   `UDB::report_error`.
    /// * `Reportable::Dummy`: ignored.
    ///
    /// All entries are processed even if some of them fail. Failures of
    /// saving a result or storing an error are collected and returned
    /// together as `400 Bad Request` wrapping a
    /// `ULocalError::ProcessingError`; if there are none the reply is
    /// `200 OK` with an empty body.
    pub async fn report<Data: OneOrVec<Reportable> + AsMsg + 'static>(
        msg: BaseMessage<'static, Data>,
    ) -> Result<Response<Body>, Rejection> {
        info!("hnd: report");
        let id = msg.id;
        let mut failed = vec![];
        for entry in msg.into_inner().into_vec() {
            match entry {
                Reportable::Assigned(res) => {
                    // An agent may only update results that belong to it.
                    if id != res.agent_id {
                        continue;
                    }
                    let db = UDB::lock_db();
                    if let Err(e) = res
                        .save_changes::<AssignedJob>(&db.conn)
                        .map_err(ULocalError::from)
                    {
                        failed.push(e.to_string());
                    }
                }
                Reportable::Agent(mut a) => {
                    a.state = AgentState::Active;
                    // `add_agent` signals failure through the reply status
                    // rather than `Err`, so check it instead of dropping it.
                    let reply = Self::add_agent(a).await?;
                    if !reply.status().is_success() {
                        warn!("hnd: report, add_agent replied with status {}", reply.status());
                    }
                }
                Reportable::Error(e) => {
                    let err = AgentError::from_msg(e, id);
                    warn!(
                        "{} reported an error: {}",
                        err.agent_id,
                        Stripped(&err.msg.as_str())
                    );
                    // Bind the result first so the database guard is released
                    // before the outcome is inspected.
                    let stored = UDB::lock_db().report_error(&err);
                    if let Err(e) = stored {
                        failed.push(e.to_string());
                    }
                }
                Reportable::Dummy => (),
            }
        }
        if !failed.is_empty() {
            let err_msg = ULocalError::ProcessingError(failed.join(", "));
            return Ok(build_err(err_msg));
        }
        Ok(build_ok(""))
    }
}