You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 
 

218 lines
6.4 KiB

use diesel::{pg::PgConnection, prelude::*, result::Error as DslError};
use once_cell::sync::OnceCell;
use std::{
env,
sync::{Arc, Mutex, MutexGuard},
};
use u_lib::{
models::{schema, Agent, AssignedJob, JobMeta, JobState},
ULocalError, ULocalResult,
};
use uuid::Uuid;
/// Database access object wrapping a single PostgreSQL connection.
pub struct UDB {
/// Open diesel connection that the query methods of `UDB` execute against.
pub conn: PgConnection,
}
/// Process-wide `UDB` instance; stays empty until `UDB::lock_db` initialises it on first use.
static DB: OnceCell<Arc<Mutex<UDB>>> = OnceCell::new();
#[cfg_attr(test, automock)]
impl UDB {
/// Returns exclusive access to the process-wide [`UDB`] instance.
///
/// On the first call the connection is opened using the URL stored in the
/// `DATABASE_URL` environment variable; later calls reuse that instance.
/// The returned guard keeps the global mutex locked until it is dropped.
///
/// # Panics
///
/// Panics if `DATABASE_URL` is unset or not valid Unicode, if the connection
/// cannot be established, or if the mutex was poisoned by a panic in another
/// holder of the guard. The return type leaves no room for an error value, so
/// each case panics with a message naming the step that failed.
pub fn lock_db() -> MutexGuard<'static, UDB> {
    DB.get_or_init(|| {
        // The URL itself is deliberately kept out of the panic messages: it
        // may embed credentials.
        let db_url = env::var("DATABASE_URL")
            .expect("DATABASE_URL must be set to a valid database connection URL");
        let conn = PgConnection::establish(&db_url)
            .expect("failed to connect to the database given by DATABASE_URL");
        Arc::new(Mutex::new(UDB { conn }))
    })
    .lock()
    .expect("global UDB mutex is poisoned")
}
pub fn insert_jobs(&self, job_metas: &[JobMeta]) -> ULocalResult<()> {
use schema::jobs;
diesel::insert_into(jobs::table)
.values(job_metas)
.execute(&self.conn)?;
Ok(())
}
/// Loads job definitions from the `jobs` table.
///
/// * `uid` — when `Some`, only rows whose `id` equals it are loaded;
///   when `None`, every row of the table is loaded.
///
/// # Errors
///
/// Returns the converted diesel error if the query fails.
pub fn get_jobs(&self, uid: Option<Uuid>) -> ULocalResult<Vec<JobMeta>> {
    use schema::jobs;
    // Matching on the option binds the id directly, so no `unwrap` is needed.
    let found = match uid {
        Some(id) => jobs::table
            .filter(jobs::id.eq(id))
            .get_results::<JobMeta>(&self.conn)?,
        None => jobs::table.load::<JobMeta>(&self.conn)?,
    };
    Ok(found)
}
/// Looks up the job whose `alias` column equals `alias`.
///
/// Only the first matching row is returned. When the query fails — which
/// includes diesel's `NotFound` for "no such row" — the error is converted
/// by `?` into the error type of `ULocalResult`.
pub fn find_job_by_alias(&self, alias: &str) -> ULocalResult<JobMeta> {
    use schema::jobs;

    let by_alias = jobs::table.filter(jobs::alias.eq(alias));
    let job = by_alias.first::<JobMeta>(&self.conn)?;
    Ok(job)
}
/// Inserts `agent` into the `agents` table, or — when a row with the same
/// `id` already exists — overwrites that row with the values of `agent`.
pub fn insert_agent(&self, agent: &Agent) -> ULocalResult<()> {
    use schema::agents;

    // Insert-or-update keyed on the `id` column.
    let upsert = diesel::insert_into(agents::table)
        .values(agent)
        .on_conflict(agents::id)
        .do_update()
        .set(agent);
    upsert.execute(&self.conn)?;
    Ok(())
}
/// Loads agents from the `agents` table.
///
/// * `uid` — when `Some`, only rows whose `id` equals it are loaded;
///   when `None`, every row of the table is loaded.
///
/// # Errors
///
/// Returns the converted diesel error if the query fails.
pub fn get_agents(&self, uid: Option<Uuid>) -> ULocalResult<Vec<Agent>> {
    use schema::agents;
    // Matching on the option binds the id directly, so no `unwrap` is needed.
    let found = match uid {
        Some(id) => agents::table
            .filter(agents::id.eq(id))
            .load::<Agent>(&self.conn)?,
        None => agents::table.load::<Agent>(&self.conn)?,
    };
    Ok(found)
}
/// Sets the `state` column to `status` for the `results` row whose `id`
/// equals `uid`.
///
/// The number of updated rows is discarded, so a `uid` that matches no row
/// is not reported as an error by this method.
pub fn update_job_status(&self, uid: Uuid, status: JobState) -> ULocalResult<()> {
    use schema::results;

    let target = results::table.filter(results::id.eq(uid));
    let new_state = results::state.eq(status);
    diesel::update(target).set(new_state).execute(&self.conn)?;
    Ok(())
}
/// Loads assigned jobs (rows of the `results` table), optionally narrowed
/// by `uid`.
///
/// | `uid`      | `personal` | rows returned                                              |
/// |------------|------------|------------------------------------------------------------|
/// | `Some(id)` | `true`     | state is `Queued` **and** `agent_id == id`                 |
/// | `Some(id)` | `false`    | `agent_id == id` **or** `job_id == id` **or** `id == id`   |
/// | `None`     | `true`     | state is `Queued` (there is no agent to narrow by)         |
/// | `None`     | `false`    | every row                                                  |
///
/// The `None`/`true` combination used to panic on `uid.unwrap()`; it now
/// applies only the state condition.
///
/// # Errors
///
/// Returns the converted diesel error if the query fails.
pub fn get_exact_jobs(
    &self,
    uid: Option<Uuid>,
    personal: bool,
) -> ULocalResult<Vec<AssignedJob>> {
    use schema::results;
    let q = results::table.into_boxed();
    let q = match (uid, personal) {
        (Some(id), true) => q.filter(
            results::state
                .eq(JobState::Queued)
                .and(results::agent_id.eq(id)),
        ),
        // The alternatives are OR-ed inside ONE predicate so the grouping is
        // explicit; the former `filter(..).or_filter(..)` chain left it to the
        // query builder, which is what the old TODO was unsure about.
        (Some(id), false) => q.filter(
            results::agent_id
                .eq(id)
                .or(results::job_id.eq(id))
                .or(results::id.eq(id)),
        ),
        (None, true) => q.filter(results::state.eq(JobState::Queued)),
        (None, false) => q,
    };
    let result = q.load::<AssignedJob>(&self.conn)?;
    Ok(result)
}
/// Assigns the jobs `job_uids` to the agent `agent_uid` by inserting one
/// `results` row per job (all other `AssignedJob` fields take their
/// `Default` values).
///
/// Before inserting, the agent and every job are looked up by primary key.
///
/// # Errors
///
/// * `ULocalError::NotFound(agent_uid)` if the agent does not exist.
/// * `ULocalError::NotFound("<id>, <id>, ...")` listing every job id that
///   does not exist.
/// * The converted diesel error if any lookup or the insert fails for
///   another reason. Lookup failures other than "not found" used to be
///   ignored silently; they are now returned to the caller.
pub fn set_jobs_for_agent(&self, agent_uid: &Uuid, job_uids: &[Uuid]) -> ULocalResult<()> {
    use schema::{agents::dsl::agents, jobs::dsl::jobs, results};

    match agents.find(agent_uid).first::<Agent>(&self.conn) {
        Ok(_) => {}
        Err(DslError::NotFound) => return Err(ULocalError::NotFound(agent_uid.to_string())),
        Err(other) => return Err(other.into()),
    }

    // Collect ALL missing job ids so the caller gets the full list at once.
    let mut not_found_jobs: Vec<String> = Vec::new();
    for job_uid in job_uids {
        match jobs.find(job_uid).first::<JobMeta>(&self.conn) {
            Ok(_) => {}
            Err(DslError::NotFound) => not_found_jobs.push(job_uid.to_string()),
            Err(other) => return Err(other.into()),
        }
    }
    if !not_found_jobs.is_empty() {
        return Err(ULocalError::NotFound(not_found_jobs.join(", ")));
    }

    let job_requests = job_uids
        .iter()
        .map(|job_uid| AssignedJob {
            job_id: *job_uid,
            agent_id: *agent_uid,
            ..Default::default()
        })
        .collect::<Vec<AssignedJob>>();
    diesel::insert_into(results::table)
        .values(&job_requests)
        .execute(&self.conn)?;
    Ok(())
}
/// Deletes the rows of the `jobs` table whose `id` is listed in `uids` and
/// returns how many rows were removed.
///
/// All ids are handled by a single DELETE statement (instead of one
/// statement per id), so the call needs one round trip to the database and
/// a failure cannot leave only part of the list deleted.
///
/// `uids` stays a `&Vec<Uuid>` so the signature seen by callers and by the
/// generated mock is unchanged.
pub fn del_jobs(&self, uids: &Vec<Uuid>) -> ULocalResult<usize> {
    use schema::jobs;
    // Nothing to delete: do not touch the database at all.
    if uids.is_empty() {
        return Ok(0);
    }
    let affected = diesel::delete(jobs::table)
        .filter(jobs::id.eq_any(uids))
        .execute(&self.conn)?;
    Ok(affected)
}
/// Deletes the rows of the `results` table whose `id` is listed in `uids`
/// and returns how many rows were removed.
///
/// All ids are handled by a single DELETE statement (instead of one
/// statement per id), so the call needs one round trip to the database and
/// a failure cannot leave only part of the list deleted.
///
/// `uids` stays a `&Vec<Uuid>` so the signature seen by callers and by the
/// generated mock is unchanged.
pub fn del_results(&self, uids: &Vec<Uuid>) -> ULocalResult<usize> {
    use schema::results;
    // Nothing to delete: do not touch the database at all.
    if uids.is_empty() {
        return Ok(0);
    }
    let affected = diesel::delete(results::table)
        .filter(results::id.eq_any(uids))
        .execute(&self.conn)?;
    Ok(affected)
}
/// Deletes the rows of the `agents` table whose `id` is listed in `uids`
/// and returns how many rows were removed.
///
/// All ids are handled by a single DELETE statement (instead of one
/// statement per id), so the call needs one round trip to the database and
/// a failure cannot leave only part of the list deleted.
///
/// `uids` stays a `&Vec<Uuid>` so the signature seen by callers and by the
/// generated mock is unchanged.
pub fn del_agents(&self, uids: &Vec<Uuid>) -> ULocalResult<usize> {
    use schema::agents;
    // Nothing to delete: do not touch the database at all.
    if uids.is_empty() {
        return Ok(0);
    }
    let affected = diesel::delete(agents::table)
        .filter(agents::id.eq_any(uids))
        .execute(&self.conn)?;
    Ok(affected)
}
}
/*
#[cfg(test)]
mod tests {
use super::*;
fn setup_db() -> Storage {
return UDB::new().unwrap();
}
#[tokio::test]
async fn test_add_agent() {
let db = setup_db();
let agent = IAgent {
alias: None,
id: "000-000".to_string(),
hostname: "test".to_string(),
is_root: false,
is_root_allowed: false,
platform: "linux".to_string(),
status: None,
token: None,
username: "test".to_string()
};
db.lock().unwrap().new_agent(agent).unwrap();
let result = db.lock().unwrap().get_agents().unwrap();
assert_eq!(
result[0].username,
"test".to_string()
)
}
}
*/