use crate::error::Error; use diesel::{pg::PgConnection, prelude::*, result::Error as DslError, Connection}; use std::mem::drop; use u_lib::{ db::PgAsyncPool, models::{schema, Agent, AssignedJob, BriefJob, JobModel, JobState, PayloadMeta}, platform::Platform, types::Id, }; type Result = std::result::Result; pub struct PgRepo { pool: PgAsyncPool, } impl PgRepo { pub fn new(pool: PgAsyncPool) -> PgRepo { PgRepo { pool } } pub async fn interact(&self, f: F) -> Result where F: for<'c> FnOnce(UDB<'c>) -> Result, F: Send + 'static, R: Send + 'static, { let connection = self.pool.get().await?; connection .interact(|conn| f(UDB { conn })) .await .expect("deadpool interaction failed") } pub async fn transaction(&self, f: F) -> Result where F: for<'c> FnOnce(UDB<'c>) -> Result, F: Send + 'static, R: Send + 'static, { let conn = self.pool.get().await?; conn.interact(|c| c.transaction(|conn| f(UDB { conn }))) .await .expect("deadpool interaction failed") } } pub struct UDB<'c> { conn: &'c mut PgConnection, } impl UDB<'_> { pub fn insert_jobs(&mut self, jobs: &[BriefJob]) -> Result<()> { use schema::{jobs, payloads}; let (jobs, payloads_opt): (Vec<_>, Vec<_>) = jobs .iter() .map(|j| (&j.job, j.payload_meta.as_ref())) .unzip(); let payloads = payloads_opt .into_iter() .filter_map(|p| p) .collect::>(); diesel::insert_into(payloads::table) .values(payloads) .execute(self.conn) .map(drop) .map_err(with_err_ctx("Can't insert payloads"))?; diesel::insert_into(jobs::table) .values(jobs) .execute(self.conn) .map(drop) .map_err(with_err_ctx("Can't insert jobs")) } pub fn get_job(&mut self, id: Id) -> Result> { use schema::{jobs, payloads}; let maybe_job_with_payload = jobs::table .left_join(payloads::table) .filter(jobs::id.eq(id)) .first::<(JobModel, Option)>(self.conn) .optional() .map_err(with_err_ctx(format!("Can't get job {id}")))?; Ok(maybe_job_with_payload.map(|(job, payload_meta)| BriefJob { job, payload_meta })) } pub fn get_jobs(&mut self) -> Result> { use schema::jobs; jobs::table .load(self.conn) .map_err(with_err_ctx("Can't get jobs")) } pub fn get_payload_meta(&mut self, id: Id) -> Result> { use schema::payloads; payloads::table .filter(payloads::id.eq(id)) .first(self.conn) .optional() .map_err(with_err_ctx(format!("Can't get payload {id}"))) } pub fn get_payload_metas(&mut self) -> Result> { use schema::payloads; payloads::table .load(self.conn) .map_err(with_err_ctx("Can't get payloads")) } pub fn find_job_by_alias(&mut self, alias: &str) -> Result> { use schema::{jobs, payloads}; let maybe_job_with_payload = jobs::table .left_join(payloads::table) .filter(jobs::alias.eq(alias)) .first::<(JobModel, Option)>(self.conn) .optional() .map_err(with_err_ctx(format!("Can't get job by alias {alias}")))?; Ok(maybe_job_with_payload.map(|(job, payload_meta)| BriefJob { job, payload_meta })) } pub fn insert_result(&mut self, result: &AssignedJob) -> Result<()> { use schema::results; diesel::insert_into(results::table) .values(result) .execute(self.conn) .map_err(with_err_ctx(format!("Can't insert result {result:?}")))?; Ok(()) } pub fn get_agent(&mut self, id: Id) -> Result> { use schema::agents; agents::table .filter(agents::id.eq(id)) .first(self.conn) .optional() .map_err(with_err_ctx(format!("Can't get agent {id:?}"))) } pub fn get_agents(&mut self) -> Result> { use schema::agents; agents::table .load::(self.conn) .map_err(with_err_ctx(format!("Can't get agents"))) } pub fn update_job_status(&mut self, id: Id, status: JobState) -> Result<()> { use schema::results; diesel::update(results::table) .filter(results::id.eq(id)) .set(results::state.eq(status)) .execute(self.conn) .map_err(with_err_ctx(format!("Can't update status of job {id}")))?; Ok(()) } //TODO: filters possibly could work in a wrong way, check pub fn get_assigned_jobs( &mut self, id: Option, personal: bool, ) -> Result> { use schema::results; let mut q = results::table.into_boxed(); /*if id.is_some() { q = q.filter(results::agent_id.eq(id.unwrap())) }*/ if personal { q = q.filter( results::state .eq(JobState::Queued) .and(results::agent_id.eq(id.unwrap())), ) } else if id.is_some() { q = q .filter(results::agent_id.eq(id.unwrap())) .or_filter(results::job_id.eq(id.unwrap())) .or_filter(results::id.eq(id.unwrap())) } let result = q .load::(self.conn) .map_err(with_err_ctx("Can't get exact jobs"))?; Ok(result) } pub fn set_jobs_for_agent(&mut self, agent_id: Id, job_ids: &[Id]) -> Result<()> { use schema::{jobs, results}; let agent_platform = match self.get_agent(agent_id)? { Some(agent) => Platform::new(&agent.platform), None => { return Err(Error::ProcessingError(format!( "Agent {agent_id} not found" ))) } }; let jobs_meta = jobs::table .select((jobs::id, jobs::alias, jobs::target_platforms)) .filter(jobs::id.eq_any(job_ids)) .load::<(Id, Option, String)>(self.conn) .map_err(with_err_ctx(format!("Can't find jobs {job_ids:?}")))?; for meta in &jobs_meta { if !agent_platform.matches(&meta.2) { return Err(Error::InsuitablePlatform( agent_platform.into_string(), meta.2.clone(), )); } } let job_requests = jobs_meta .into_iter() .map(|(job_id, alias, _)| AssignedJob { job_id, agent_id, alias, ..Default::default() }) .collect::>(); diesel::insert_into(results::table) .values(&job_requests) .execute(self.conn) .map(drop) .map_err(with_err_ctx(format!( "Can't setup jobs {job_ids:?} for agent {agent_id:?}" ))) } pub fn del_jobs(&mut self, ids: &[Id]) -> Result<()> { use schema::jobs; diesel::delete(jobs::table) .filter(jobs::id.eq_any(ids)) .execute(self.conn) .map(drop) .map_err(with_err_ctx("Can't delete jobs")) } pub fn del_results(&mut self, ids: &[Id]) -> Result<()> { use schema::results; diesel::delete(results::table) .filter(results::id.eq_any(ids)) .execute(self.conn) .map(drop) .map_err(with_err_ctx("Can't delete results")) } pub fn del_agents(&mut self, ids: &[Id]) -> Result<()> { use schema::agents; diesel::delete(agents::table) .filter(agents::id.eq_any(ids)) .execute(self.conn) .map(drop) .map_err(with_err_ctx("Can't delete agents")) } pub fn del_payloads(&mut self, ids: &[Id]) -> Result<()> { use schema::payloads; diesel::delete(payloads::table) .filter(payloads::id.eq_any(ids)) .execute(self.conn) .map(drop) .map_err(with_err_ctx("Can't delete payloads")) } pub fn upsert_agent(&mut self, agent: &Agent) -> Result<()> { use schema::agents; diesel::insert_into(agents::table) .values(agent) .on_conflict(agents::id) .do_update() .set(agent) .execute(self.conn) .map_err(with_err_ctx(format!("Can't insert agent {agent:?}")))?; Ok(()) } pub fn update_job(&mut self, job: &JobModel) -> Result<()> { job.save_changes::(self.conn) .map_err(with_err_ctx(format!("Can't update job {job:?}")))?; Ok(()) } pub fn update_result(&mut self, result: &AssignedJob) -> Result<()> { debug!( "updating result: id = {}, job_id = {}, agent_id = {}", result.id, result.job_id, result.agent_id ); result .save_changes::(self.conn) .map_err(with_err_ctx(format!("Can't update result {result:?}")))?; Ok(()) } } fn with_err_ctx(msg: impl AsRef) -> impl Fn(DslError) -> Error { move |err| Error::DBErrorCtx(format!("{}, reason: {err}", msg.as_ref())) }