You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

162 lines
5.5 KiB

use crate::db::UDB;
use crate::error::Error;
use u_lib::{
messaging::{AsMsg, BaseMessage, Reportable},
models::*,
utils::OneOrVec,
};
use uuid::Uuid;
use warp::Rejection;
/// Result type shared by the endpoint handlers in this file: the success
/// value `T`, or a warp `Rejection` on failure.
type EndpResult<T> = Result<T, Rejection>;
/// Unit struct used purely as a namespace: the endpoint handlers are
/// associated `async` functions on it, and none of them takes `self`.
pub struct Endpoints;
impl Endpoints {
/// Hands `msg` to `UDB::insert_agent`.
///
/// # Errors
/// A failure reported by `insert_agent` is converted into a `Rejection`.
pub async fn add_agent(msg: Agent) -> EndpResult<()> {
    let db = UDB::lock_db();
    db.insert_agent(&msg)?;
    Ok(())
}
/// Returns the agents produced by `UDB::get_agents`, forwarding `uid`
/// unchanged as its argument.
///
/// # Errors
/// A failure reported by `get_agents` is converted into a `Rejection`.
pub async fn get_agents(uid: Option<Uuid>) -> EndpResult<Vec<Agent>> {
    let agents = UDB::lock_db().get_agents(uid)?;
    Ok(agents)
}
pub async fn get_jobs(uid: Option<Uuid>) -> EndpResult<Vec<JobMeta>> {
UDB::lock_db().get_jobs(uid).map_err(From::from)
}
/// Returns the assigned jobs produced by `UDB::get_exact_jobs(uid, false)`.
///
/// The second argument is `false` here, whereas `get_personal_jobs` passes
/// `true`; its exact meaning is defined by `UDB::get_exact_jobs`.
///
/// # Errors
/// A failure reported by `get_exact_jobs` is converted into a `Rejection`.
pub async fn get_agent_jobs(uid: Option<Uuid>) -> EndpResult<Vec<AssignedJob>> {
    let db = UDB::lock_db();
    let assigned = db.get_exact_jobs(uid, false)?;
    Ok(assigned)
}
pub async fn get_personal_jobs(uid: Uuid) -> EndpResult<Vec<AssignedJob>> {
let db = UDB::lock_db();
let mut agents = db.get_agents(Some(uid))?;
if agents.is_empty() {
let new_agent = Agent::with_id(uid);
db.insert_agent(&new_agent)?;
let job = db
.find_job_by_alias("agent_hello")?
.expect("agent_hello job not found");
db.set_jobs_for_agent(&uid, &[job.id])?;
} else {
let mut agent = agents.pop().unwrap();
agent.touch();
db.update_agent(&agent)?;
}
let result = db.get_exact_jobs(Some(uid), true)?;
for j in result.iter() {
db.update_job_status(j.id, JobState::Running)?;
}
Ok(result)
}
/// Unwraps the job descriptions carried by `msg` and passes them to
/// `UDB::insert_jobs`, returning the `Uuid`s that call yields (presumably the
/// ids of the stored jobs — confirm against `insert_jobs`).
///
/// # Errors
/// A failure reported by `insert_jobs` is converted into a `Rejection`.
pub async fn upload_jobs(msg: BaseMessage<'static, Vec<JobMeta>>) -> EndpResult<Vec<Uuid>> {
    let jobs = msg.into_inner();
    let inserted = UDB::lock_db().insert_jobs(&jobs)?;
    Ok(inserted)
}
/// Tries to delete `uid` via `del_agents`, then `del_jobs`, then
/// `del_results`, stopping at the first call that reports a non-zero count.
///
/// Returns that count, or `0` when none of the three calls affected anything.
///
/// # Errors
/// The first failing delete call aborts the sequence; its error is converted
/// into a `Rejection`.
pub async fn del(uid: Uuid) -> EndpResult<usize> {
    let db = UDB::lock_db();
    let deleters = [UDB::del_agents, UDB::del_jobs, UDB::del_results];
    let mut removed = 0;
    for deleter in deleters.iter() {
        removed = deleter(&db, &[uid])?;
        if removed != 0 {
            break;
        }
    }
    Ok(removed)
}
pub async fn set_jobs(
agent_uid: Uuid,
msg: BaseMessage<'static, Vec<String>>,
) -> EndpResult<Vec<Uuid>> {
msg.into_inner()
.into_iter()
.map(|ident| {
Uuid::parse_str(&ident).or_else(|_| {
let job_from_db = UDB::lock_db().find_job_by_alias(&ident);
match job_from_db {
Ok(job) => match job {
Some(j) => Ok(j.id),
None => Err(Error::ProcessingError(format!("unknown ident {ident}"))),
},
Err(e) => Err(e),
}
})
})
.collect::<Result<Vec<Uuid>, Error>>()
.and_then(|j| UDB::lock_db().set_jobs_for_agent(&agent_uid, &j))
.map_err(From::from)
}
pub async fn report<Data: OneOrVec<Reportable> + AsMsg + 'static>(
msg: BaseMessage<'static, Data>,
) -> EndpResult<()> {
let id = msg.id;
for entry in msg.into_inner().into_vec() {
match entry {
Reportable::Assigned(mut result) => {
let result_agent_id = &result.agent_id;
if id != *result_agent_id {
warn!("Ids are not equal! actual id: {id}, id from job: {result_agent_id}");
continue;
}
result.state = JobState::Finished;
result.touch();
match result.exec_type {
JobType::Init => match &result.result {
Some(rbytes) => {
let mut agent: Agent = match serde_json::from_slice(&rbytes) {
Ok(a) => a,
Err(e) => {
warn!("Error deserializing agent from {id}: {e}");
continue;
}
};
agent.state = AgentState::Active;
Self::add_agent(agent).await?;
}
None => warn!("Empty agent data"),
},
JobType::Shell => (),
JobType::Terminate => todo!(),
JobType::Update => todo!(),
}
UDB::lock_db().update_result(&result)?;
}
Reportable::Error(e) => {
warn!("{} reported an error: {}", id, e);
}
Reportable::Dummy => (),
}
}
Ok(())
}
/// Unwraps the `Agent` carried by the message and passes it to
/// `UDB::update_agent`; whatever that call returns on success is discarded.
///
/// # Errors
/// A failure reported by `update_agent` is converted into a `Rejection`.
pub async fn update_agent(agent: BaseMessage<'static, Agent>) -> EndpResult<()> {
    let db = UDB::lock_db();
    let updated = agent.into_inner();
    db.update_agent(&updated)?;
    Ok(())
}
pub async fn update_job(job: BaseMessage<'static, JobMeta>) -> EndpResult<()> {
UDB::lock_db().update_job(&job.into_inner())?;
Ok(())
}
pub async fn update_assigned_job(
assigned: BaseMessage<'static, AssignedJob>,
) -> EndpResult<()> {
UDB::lock_db().update_result(&assigned.into_inner())?;
Ok(())
}
/// Stub for an endpoint that would return raw bytes for `_file_uid`
/// (presumably a file download, judging by the name — not yet specified).
///
/// # Panics
/// Not implemented: the body is `todo!()`, so the returned future panics as
/// soon as it is polled.
pub async fn download(_file_uid: String) -> EndpResult<Vec<u8>> {
todo!()
}
}