@ -1,4 +1,4 @@
use crate ::errors ::{ Error , SResult } ;
use crate ::error ::Error as ServerError ;
use diesel ::{ pg ::PgConnection , prelude ::* , result ::Error as DslError } ;
use once_cell ::sync ::OnceCell ;
use serde ::Deserialize ;
@ -9,9 +9,10 @@ use u_lib::{
} ;
use uuid ::Uuid ;
type Result < T > = std ::result ::Result < T , ServerError > ;
pub struct UDB {
pub conn : PgConnection ,
__p : ( ) ,
conn : PgConnection ,
}
static DB : OnceCell < Mutex < UDB > > = OnceCell ::new ( ) ;
@ -34,7 +35,6 @@ impl UDB {
) ;
let instance = UDB {
conn : PgConnection ::establish ( & db_url ) . unwrap ( ) ,
__p : ( ) ,
} ;
Mutex ::new ( instance )
} )
@ -42,69 +42,89 @@ impl UDB {
. unwrap ( )
}
pub fn insert_jobs ( & self , job_metas : & [ JobMeta ] ) -> S Result< ( ) > {
pub fn insert_jobs ( & self , job_metas : & [ JobMeta ] ) -> Result < ( ) > {
use schema ::jobs ;
diesel ::insert_into ( jobs ::table )
. values ( job_metas )
. execute ( & self . conn ) ? ;
. execute ( & self . conn )
. map_err ( with_err_ctx ( "Can't insert jobs" ) ) ? ;
Ok ( ( ) )
}
pub fn get_jobs ( & self , uid : Option < Uuid > ) -> S Result< Vec < JobMeta > > {
pub fn get_jobs ( & self , o uid : Option < Uuid > ) -> Result < Vec < JobMeta > > {
use schema ::jobs ;
let result = if uid . is_some ( ) {
jobs ::table
. filter ( jobs ::id . eq ( uid . unwrap ( ) ) )
. get_results ::< JobMeta > ( & self . conn ) ?
} else {
jobs ::table . load ::< JobMeta > ( & self . conn ) ?
} ;
Ok ( result )
match ouid {
Some ( uid ) = > jobs ::table
. filter ( jobs ::id . eq ( uid ) )
. get_results ::< JobMeta > ( & self . conn ) ,
None = > jobs ::table . load ::< JobMeta > ( & self . conn ) ,
}
. map_err ( with_err_ctx ( "Can't get exact jobs" ) )
}
pub fn find_job_by_alias ( & self , alias : & str ) -> S Result< JobMeta > {
pub fn find_job_by_alias ( & self , alias : & str ) -> Result < Option < JobMeta > > {
use schema ::jobs ;
let result = jobs ::table
. filter ( jobs ::alias . eq ( alias ) )
. first ::< JobMeta > ( & self . conn ) ? ;
. first ::< JobMeta > ( & self . conn )
. optional ( )
. map_err ( with_err_ctx ( format! ( "Can't find job by alias {alias}" ) ) ) ? ;
Ok ( result )
}
pub fn insert_agent ( & self , agent : & Agent ) -> S Result< ( ) > {
pub fn insert_agent ( & self , agent : & Agent ) -> Result < ( ) > {
use schema ::agents ;
diesel ::insert_into ( agents ::table )
. values ( agent )
. on_conflict ( agents ::id )
. do_update ( )
. set ( agent )
. execute ( & self . conn ) ? ;
. execute ( & self . conn )
. map_err ( with_err_ctx ( format! ( "Can't insert agent {agent:x?}" ) ) ) ? ;
Ok ( ( ) )
}
pub fn insert_result ( & self , result : & AssignedJob ) -> Result < ( ) > {
use schema ::results ;
diesel ::insert_into ( results ::table )
. values ( result )
. execute ( & self . conn )
. map_err ( with_err_ctx ( format! ( "Can't insert result {result:x?}" ) ) ) ? ;
Ok ( ( ) )
}
pub fn get_agents ( & self , uid : Option < Uuid > ) -> SResult < Vec < Agent > > {
pub fn get_agents ( & self , o uid : Option < Uuid > ) -> Result < Vec < Agent > > {
use schema ::agents ;
let result = if uid . is_some ( ) {
agents ::table
. filter ( agents ::id . eq ( uid . unwrap ( ) ) )
. load ::< Agent > ( & self . conn ) ?
} else {
agents ::table . load ::< Agent > ( & self . conn ) ?
} ;
Ok ( result )
match ouid {
Some ( uid ) = > agents ::table
. filter ( agents ::id . eq ( uid ) )
. load ::< Agent > ( & self . conn ) ,
None = > agents ::table . load ::< Agent > ( & self . conn ) ,
}
. map_err ( with_err_ctx ( format! ( "Can't get agent(s) {ouid:?}" ) ) )
}
pub fn update_job_status ( & self , uid : Uuid , status : JobState ) -> S Result< ( ) > {
pub fn update_job_status ( & self , uid : Uuid , status : JobState ) -> Result < ( ) > {
use schema ::results ;
diesel ::update ( results ::table )
. filter ( results ::id . eq ( uid ) )
. set ( results ::state . eq ( status ) )
. execute ( & self . conn ) ? ;
. execute ( & self . conn )
. map_err ( with_err_ctx ( format! ( "Can't update status of job {uid}" ) ) ) ? ;
Ok ( ( ) )
}
//TODO: filters possibly could work in a wrong way, check
pub fn get_exact_jobs ( & self , uid : Option < Uuid > , personal : bool ) -> S Result< Vec < AssignedJob > > {
pub fn get_exact_jobs ( & self , uid : Option < Uuid > , personal : bool ) -> Result < Vec < AssignedJob > > {
use schema ::results ;
let mut q = results ::table . into_boxed ( ) ;
/* if uid.is_some() {
q = q . filter ( results ::agent_id . eq ( uid . unwrap ( ) ) )
@ -121,32 +141,19 @@ impl UDB {
. or_filter ( results ::job_id . eq ( uid . unwrap ( ) ) )
. or_filter ( results ::id . eq ( uid . unwrap ( ) ) )
}
let result = q . load ::< AssignedJob > ( & self . conn ) ? ;
let result = q
. load ::< AssignedJob > ( & self . conn )
. map_err ( with_err_ctx ( "Can't get exact jobs" ) ) ? ;
Ok ( result )
}
pub fn set_jobs_for_agent ( & self , agent_uid : & Uuid , job_uids : & [ Uuid ] ) -> SResult < Vec < Uuid > > {
use schema ::{ agents ::dsl ::agents , jobs ::dsl ::jobs , results } ;
if let Err ( DslError ::NotFound ) = agents . find ( agent_uid ) . first ::< Agent > ( & self . conn ) {
return Err ( Error ::NotFound ( agent_uid . to_string ( ) ) ) ;
}
let not_found_jobs = job_uids
. iter ( )
. filter_map ( | job_uid | {
if let Err ( DslError ::NotFound ) = jobs . find ( job_uid ) . first ::< JobMeta > ( & self . conn ) {
Some ( job_uid . to_string ( ) )
} else {
None
}
} )
. collect ::< Vec < String > > ( ) ;
if ! not_found_jobs . is_empty ( ) {
return Err ( Error ::NotFound ( not_found_jobs . join ( ", " ) ) ) ;
}
pub fn set_jobs_for_agent ( & self , agent_uid : & Uuid , job_uids : & [ Uuid ] ) -> Result < Vec < Uuid > > {
use schema ::results ;
let job_requests = job_uids
. iter ( )
. map ( | job_uid | {
info ! ( "set_jobs_for_agent: set {} for {}" , job_uid , agent_uid ) ;
debug ! ( "set_jobs_for_agent: set {} for {}" , job_uid , agent_uid ) ;
AssignedJob {
job_id : * job_uid ,
agent_id : * agent_uid ,
@ -154,46 +161,84 @@ impl UDB {
}
} )
. collect ::< Vec < AssignedJob > > ( ) ;
diesel ::insert_into ( results ::table )
. values ( & job_requests )
. execute ( & self . conn ) ? ;
let assigned_uids = job_requests . iter ( ) . map ( | aj | aj . id ) . collect ( ) ;
Ok ( assigned_uids )
. execute ( & self . conn )
. map_err ( with_err_ctx ( format! (
"Can't setup jobs {job_uids:?} for agent {agent_uid:?}"
) ) ) ? ;
Ok ( job_requests . iter ( ) . map ( | aj | aj . id ) . collect ( ) )
}
pub fn del_jobs ( & self , uids : & [ Uuid ] ) -> SResult < usize > {
pub fn del_jobs ( & self , uids : & [ Uuid ] ) -> Result < usize > {
use schema ::jobs ;
let mut affected = 0 ;
for & uid in uids {
let deleted = diesel ::delete ( jobs ::table )
. filter ( jobs ::id . eq ( uid ) )
. execute ( & self . conn ) ? ;
. execute ( & self . conn )
. map_err ( with_err_ctx ( "Can't delete jobs" ) ) ? ;
affected + = deleted ;
}
Ok ( affected )
}
pub fn del_results ( & self , uids : & [ Uuid ] ) -> S Result< usize > {
pub fn del_results ( & self , uids : & [ Uuid ] ) -> Result < usize > {
use schema ::results ;
let mut affected = 0 ;
for & uid in uids {
let deleted = diesel ::delete ( results ::table )
. filter ( results ::id . eq ( uid ) )
. execute ( & self . conn ) ? ;
. execute ( & self . conn )
. map_err ( with_err_ctx ( "Can't delete results" ) ) ? ;
affected + = deleted ;
}
Ok ( affected )
}
pub fn del_agents ( & self , uids : & [ Uuid ] ) -> S Result< usize > {
pub fn del_agents ( & self , uids : & [ Uuid ] ) -> Result < usize > {
use schema ::agents ;
let mut affected = 0 ;
for & uid in uids {
let deleted = diesel ::delete ( agents ::table )
. filter ( agents ::id . eq ( uid ) )
. execute ( & self . conn ) ? ;
. execute ( & self . conn )
. map_err ( with_err_ctx ( "Can't delete agents" ) ) ? ;
affected + = deleted ;
}
Ok ( affected )
}
pub fn update_agent ( & self , agent : & Agent ) -> Result < ( ) > {
agent
. save_changes ::< Agent > ( & self . conn )
. map_err ( with_err_ctx ( format! ( "Can't update agent {agent:x?}" ) ) ) ? ;
Ok ( ( ) )
}
pub fn update_job ( & self , job : & JobMeta ) -> Result < ( ) > {
job . save_changes ::< JobMeta > ( & self . conn )
. map_err ( with_err_ctx ( format! ( "Can't update job {job:x?}" ) ) ) ? ;
Ok ( ( ) )
}
pub fn update_result ( & self , result : & AssignedJob ) -> Result < ( ) > {
debug ! (
"updating result: id = {}, job_id = {}, agent_id = {}" ,
result . id , result . job_id , result . agent_id
) ;
result
. save_changes ::< AssignedJob > ( & self . conn )
. map_err ( with_err_ctx ( format! ( "Can't update result {result:x?}" ) ) ) ? ;
Ok ( ( ) )
}
}
fn with_err_ctx ( msg : impl AsRef < str > ) -> impl Fn ( DslError ) -> ServerError {
move | err | ServerError ::DBErrorCtx ( format! ( "{}, reason: {err}" , msg . as_ref ( ) ) )
}