You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

286 lines
9.2 KiB

use std::sync::Arc;
use crate::db::{PgRepo, UDB};
use crate::error::Error;
use serde::Deserialize;
use u_lib::{api::retypes, messaging::Reportable, models::*, types::Id};
use warp::reject::not_found;
use warp::Rejection;
type EndpResult<T> = Result<T, Rejection>;
#[derive(Deserialize)]
pub struct PayloadFlags {
brief: BriefMode,
}
pub struct Endpoints;
impl Endpoints {
pub async fn get_agents(repo: Arc<PgRepo>, id: Option<Id>) -> EndpResult<retypes::GetAgents> {
repo.interact(move |mut db| {
Ok(match id {
Some(id) => {
if let Some(agent) = db.get_agent(id)? {
vec![agent]
} else {
vec![]
}
}
None => db.get_agents()?,
})
})
.await
.map_err(From::from)
}
pub async fn get_job(
repo: Arc<PgRepo>,
id: Id,
params: Option<PayloadFlags>,
) -> EndpResult<retypes::GetJob> {
let Some(mut job) = repo.interact(move |mut db| db.get_job(id)).await? else {
return Err(not_found())
};
Ok(match params.map(|p| p.brief) {
Some(BriefMode::Yes) => job,
Some(BriefMode::Auto) | None => {
if let Some(payload) = &mut job.payload {
payload.maybe_join_payload().map_err(Error::from)?;
}
job
}
Some(BriefMode::No) => {
if let Some(payload) = &mut job.payload {
payload.join_payload().map_err(Error::from)?;
}
job
}
})
}
pub async fn get_jobs(repo: Arc<PgRepo>) -> EndpResult<retypes::GetJobs> {
repo.interact(move |mut db| db.get_jobs())
.await
.map_err(From::from)
}
pub async fn get_assigned_jobs(
repo: Arc<PgRepo>,
id: Option<Id>,
) -> EndpResult<retypes::GetAgentJobs> {
repo.interact(move |mut db| db.get_assigned_jobs(id, false))
.await
.map_err(From::from)
}
pub async fn get_payloads(repo: Arc<PgRepo>) -> EndpResult<retypes::GetPayloads> {
repo.interact(move |mut db| db.get_payloads())
.await
.map_err(From::from)
}
pub async fn get_payload(
repo: Arc<PgRepo>,
id: Id,
params: Option<PayloadFlags>,
) -> EndpResult<retypes::GetPayload> {
let Some(mut payload) = repo.interact(move |mut db| db.get_payload(id)).await? else {
return Err(not_found())
};
Ok(match params.map(|p| p.brief) {
Some(BriefMode::Yes) => payload,
None | Some(BriefMode::Auto) => {
payload.maybe_join_payload().map_err(Error::from)?;
payload
}
_ => {
payload.join_payload().map_err(Error::from)?;
payload
}
})
}
pub async fn get_personal_jobs(
repo: Arc<PgRepo>,
id: Id,
) -> EndpResult<retypes::GetPersonalJobs> {
repo.transaction(move |mut db| {
let agent = db.get_agent(id)?;
match agent {
Some(mut agent) => {
agent.touch();
db.upsert_agent(&agent)?;
}
None => {
let mut new_agent = Agent::empty();
new_agent.id = id;
db.upsert_agent(&new_agent)?;
let job = db
.find_job_by_alias("agent_hello")?
.expect("agent_hello job not found");
db.set_jobs_for_agent(id, &[job.job.id])?;
}
}
let assigned_jobs = db.get_assigned_jobs(Some(id), true)?;
for job in &assigned_jobs {
db.update_job_status(job.id, JobState::Running)?;
}
Ok(assigned_jobs
.into_iter()
.map(|j| AssignedJobById::from(&j))
.collect())
})
.await
.map_err(From::from)
}
pub async fn upload_jobs(repo: Arc<PgRepo>, msg: Vec<Job>) -> EndpResult<retypes::UploadJobs> {
let jobs = msg
.into_iter()
.map(|mut job| {
if let Some(payload) = &mut job.payload {
payload.maybe_split_payload()?;
}
Ok(job)
})
.collect::<Result<Vec<Job>, Error>>()?;
repo.interact(move |mut db| db.insert_jobs(&jobs))
.await
.map_err(From::from)
}
pub async fn del(repo: Arc<PgRepo>, id: Id) -> EndpResult<()> {
repo.transaction(move |mut db| {
[
UDB::del_agents,
UDB::del_jobs,
UDB::del_results,
UDB::del_payloads,
]
.iter()
.map(|f| f(&mut db, &[id]))
.collect::<Result<(), Error>>()
})
.await
.map_err(From::from)
}
pub async fn set_jobs(
repo: Arc<PgRepo>,
agent_id: Id,
job_idents: Vec<String>,
) -> EndpResult<retypes::SetJobs> {
repo.transaction(move |mut db| {
let assigned_job_idents = job_idents
.into_iter()
.map(|ident| {
Id::parse_str(&ident).or_else(|_| {
let job_from_db = db.find_job_by_alias(&ident);
match job_from_db {
Ok(job) => match job {
Some(j) => Ok(j.job.id),
None => {
Err(Error::ProcessingError(format!("unknown ident {ident}")))
}
},
Err(e) => Err(e),
}
})
})
.collect::<Result<Vec<Id>, Error>>()?;
db.set_jobs_for_agent(agent_id, &assigned_job_idents)?;
Ok(assigned_job_idents)
})
.await
.map_err(From::from)
}
pub async fn report(
repo: Arc<PgRepo>,
msg: Vec<Reportable>,
agent_id: Id,
) -> EndpResult<retypes::Report> {
repo.transaction(move |mut db| {
for entry in msg {
match entry {
Reportable::Assigned(mut result) => {
let result_agent_id = &result.agent_id;
if agent_id != *result_agent_id {
warn!("Agent ids are not equal! actual id: {agent_id}, id from job: {result_agent_id}");
continue;
}
result.touch();
info!("agent {agent_id} updated job {}", result.id);
match result.exec_type {
JobType::Init => {
result.state = JobState::Finished;
match &result.result {
Some(rbytes) => {
let mut agent: Agent = match serde_json::from_slice(&rbytes) {
Ok(a) => a,
Err(e) => {
error!("Error deserializing agent data from {agent_id}: {e}");
continue;
}
};
agent.state = AgentState::Active;
db.upsert_agent(&agent)?;
}
None => error!("Empty agent data"),
}},
JobType::Shell => {
result.state = JobState::Finished
},
JobType::Service => (),
JobType::Terminate => todo!(),
JobType::Update => todo!(),
}
db.update_result(&result)?;
}
Reportable::Error(e) => {
error!("agent {agent_id} reported: {e}");
}
Reportable::Dummy => (),
}}
Ok(())
})
.await
.map_err(From::from)
}
pub async fn update_agent(repo: Arc<PgRepo>, agent: Agent) -> EndpResult<retypes::UpdateAgent> {
repo.interact(move |mut db| db.upsert_agent(&agent)).await?;
Ok(())
}
pub async fn update_job(repo: Arc<PgRepo>, mut job: Job) -> EndpResult<retypes::UpdateJob> {
if let Some(payload) = &mut job.payload {
payload.maybe_split_payload().map_err(Error::from)?;
}
repo.interact(move |mut db| db.update_job(&job.job)).await?;
Ok(())
}
pub async fn update_assigned_job(
repo: Arc<PgRepo>,
assigned: AssignedJob,
) -> EndpResult<retypes::UpdateResult> {
repo.interact(move |mut db| db.update_result(&assigned))
.await?;
Ok(())
}
}