can i ever sleep?

- fix envs naming and handling
- refactor db interface to use deadpool
- bump deps
- add migrator
- simplify schema generation
- make daemonize work
- workspaced more deps
- split integration-tests and deploy ymls
- cleanup dead code
- fix images naming for podman
master
plazmoid 2 years ago
parent 4bac5ac6e9
commit 699896f335
  1. 7
      .env
  2. 8
      Cargo.toml
  3. 14
      Makefile.toml
  4. 13
      bin/migrator/Cargo.toml
  5. 140
      bin/migrator/src/database.rs
  6. 88
      bin/migrator/src/main.rs
  7. 82
      bin/migrator/src/query_helper.rs
  8. 1
      bin/u_agent/Cargo.toml
  9. 25
      bin/u_agent/src/lib.rs
  10. 7
      bin/u_agent/src/main.rs
  11. 12
      bin/u_panel/src/main.rs
  12. 2
      bin/u_panel/src/server/mod.rs
  13. 6
      bin/u_server/Cargo.toml
  14. 136
      bin/u_server/src/db.rs
  15. 3
      bin/u_server/src/error.rs
  16. 100
      bin/u_server/src/handlers.rs
  17. 10
      bin/u_server/src/main.rs
  18. 63
      bin/u_server/src/u_server.rs
  19. 53
      deploy/podman-compose.yml
  20. 9
      deploy/start_server.sh
  21. 17
      images/integration-tests/u_db.Dockerfile
  22. 7
      images/integration-tests/u_db_entrypoint.sh
  23. 2
      images/tests_runner.Dockerfile
  24. 0
      images/u_agent.Dockerfile
  25. 8
      images/u_db.Dockerfile
  26. 0
      images/u_server.Dockerfile
  27. 20
      integration/docker-compose.yml
  28. 4
      integration/docker.py
  29. 4
      integration/tests/helpers/mod.rs
  30. 2
      integration/tests/integration/connection.rs
  31. 7
      lib/u_lib/Cargo.toml
  32. 13
      lib/u_lib/src/config.rs
  33. 75
      lib/u_lib/src/config/mod.rs
  34. 26
      lib/u_lib/src/db.rs
  35. 72
      lib/u_lib/src/errors/mod.rs
  36. 71
      lib/u_lib/src/errors/variants.rs
  37. 18
      lib/u_lib/src/lib.rs
  38. 18
      lib/u_lib/src/models/agent.rs
  39. 3
      lib/u_lib/src/models/jobs/assigned.rs
  40. 2
      lib/u_lib/src/models/jobs/meta.rs
  41. 16
      lib/u_lib/src/models/jobs/misc.rs
  42. 25
      lib/u_lib/src/models/schema.rs
  43. 29
      lib/u_lib/src/utils/env.rs
  44. 24
      lib/u_lib/src/utils/fmt/hexlify.rs
  45. 5
      lib/u_lib/src/utils/fmt/mod.rs
  46. 80
      lib/u_lib/src/utils/fmt/stripped.rs
  47. 6
      lib/u_lib/src/utils/mod.rs
  48. 2
      lib/u_lib/src/utils/tempfile.rs
  49. 3
      sample.env.private
  50. 20
      scripts/deploy.sh
  51. 20
      scripts/gen_schema.sh
  52. 5
      scripts/start_server.sh
  53. 6
      scripts/u_db_entrypoint.sh

@ -1,5 +1,6 @@
DB_HOST=u_db
DB_NAME=u_db
DB_USER=postgres
POSTGRES_HOST=u_db
POSTGRES_DATABASE=u_db
POSTGRES_USER=u_ser
POSTGRES_PORT=5432
RUST_BACKTRACE=1
U_SERVER=u_server

@ -1,15 +1,19 @@
[workspace]
members = [
"bin/migrator",
"bin/u_agent",
"bin/u_panel",
"bin/u_run",
"bin/u_server",
"lib/u_lib",
"integration"
"integration",
]
[workspace.dependencies]
anyhow = "=1.0.63"
deadpool-diesel = "0.4.0"
diesel = { version = "2", features = ["postgres", "uuid"] }
openssl = "0.10"
reqwest = { version = "0.11", features = ["json"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
@ -18,7 +22,7 @@ tokio = { version = "1.11", features = ["macros"] }
tracing = "0.1.35"
tracing-appender = "0.2.0"
tracing-subscriber = { version = "0.3.0", features = ["env-filter"]}
uuid = "0.6.5"
uuid = "1.2.1"
[profile.release]
panic = "abort"

@ -81,19 +81,11 @@ cd ./integration
bash integration_tests.sh ${@}
'''
[tasks.gen_schema]
script = '''
docker run --rm \
--env-file=$PWD/.env \
--env-file=$PWD/.env.private \
-v $PWD:/unki \
-w /unki \
unki/u_db \
/unki/images/integration-tests/u_db_entrypoint.sh || true
'''
[tasks.test]
dependencies = ["unit", "integration"]
[tasks.gen_schema]
script = './scripts/gen_schema.sh'
[tasks.deploy]
script = './scripts/deploy.sh'

@ -0,0 +1,13 @@
[package]
name = "migrator"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
diesel = { workspace = true, features = ["postgres", "serde_json"] }
diesel_migrations = { version = "2.0.0", features = ["postgres"] }
openssl = { workspace = true }
u_lib = { path = "../../lib/u_lib" }
url = "2.3.1"

@ -0,0 +1,140 @@
use super::query_helper;
use diesel::dsl::sql;
use diesel::sql_types::Bool;
use diesel::*;
use std::env;
use std::error::Error;
type DatabaseResult<T> = Result<T, Box<dyn Error>>;
/// Database backend inferred from the scheme of a connection URL.
pub enum Backend {
    Pg,
}

impl Backend {
    /// Picks the backend matching the scheme of `database_url`.
    ///
    /// Panics when the URL does not use a recognized postgres scheme.
    pub fn for_url(database_url: &str) -> Self {
        let is_postgres = ["postgres://", "postgresql://"]
            .iter()
            .any(|scheme| database_url.starts_with(scheme));
        if is_postgres {
            Backend::Pg
        } else {
            panic!(
                "At least one backend must be specified for use with this crate. \
                You may omit the unneeded dependencies in the following command. \n\n \
                ex. `cargo install diesel_cli --no-default-features --features mysql postgres sqlite` \n"
            )
        }
    }
}
/// A live connection wrapped in the backend inferred from its URL.
pub enum InferConnection {
    Pg(PgConnection),
}

impl InferConnection {
    /// Opens a connection to `database_url` using the inferred backend.
    pub fn establish(database_url: &str) -> DatabaseResult<Self> {
        match Backend::for_url(database_url) {
            Backend::Pg => {
                let conn = PgConnection::establish(database_url)?;
                Ok(InferConnection::Pg(conn))
            }
        }
    }
}
/// Drops the configured database and then creates it again from scratch.
pub fn reset_database() -> DatabaseResult<()> {
    drop_database(&database_url())?;
    setup_database()
}

/// Ensures the configured database exists, creating it when missing.
pub fn setup_database() -> DatabaseResult<()> {
    create_database_if_needed(&database_url())?;
    Ok(())
}

/// Drops the configured database (CLI-facing entry point).
pub fn drop_database_command() -> DatabaseResult<()> {
    drop_database(&database_url())
}
/// Creates the database specified in the connection url. It returns an error
/// if it was unable to create the database.
fn create_database_if_needed(database_url: &str) -> DatabaseResult<()> {
    match Backend::for_url(database_url) {
        Backend::Pg => {
            // Being able to connect means the database already exists.
            if PgConnection::establish(database_url).is_ok() {
                return Ok(());
            }
            let (database, postgres_url) = change_database_of_url(database_url, "postgres");
            println!("Creating database: {}", database);
            // Create via the maintenance database on the same server.
            let mut conn = PgConnection::establish(&postgres_url)?;
            query_helper::create_database(&database).execute(&mut conn)?;
        }
    }
    Ok(())
}
/// Drops the database specified in the connection url. It returns an error
/// if it was unable to drop the database.
fn drop_database(database_url: &str) -> DatabaseResult<()> {
    match Backend::for_url(database_url) {
        Backend::Pg => {
            let (database, postgres_url) = change_database_of_url(database_url, "postgres");
            // Connect to the maintenance database; the target may be dropped.
            let mut conn = PgConnection::establish(&postgres_url)?;
            if !pg_database_exists(&mut conn, &database)? {
                return Ok(());
            }
            println!("Dropping database: {}", database);
            query_helper::drop_database(&database)
                .if_exists()
                .execute(&mut conn)?;
        }
    }
    Ok(())
}
// Minimal mapping of the `pg_database` system catalog — only the columns
// needed by `pg_database_exists` below.
table! {
    pg_database (datname) {
        datname -> Text,
        datistemplate -> Bool,
    }
}
/// Returns whether a non-template database named `database_name` exists.
fn pg_database_exists(conn: &mut PgConnection, database_name: &str) -> QueryResult<bool> {
    use self::pg_database::dsl::*;

    let row = pg_database
        .select(datname)
        .filter(datistemplate.eq(false))
        .filter(datname.eq(database_name))
        .get_result::<String>(conn)
        .optional()?;
    Ok(row.is_some())
}
/// Returns true if the `__diesel_schema_migrations` table exists in the
/// database we connect to, returns false if it does not.
///
/// Returns an error when the connection cannot be established or the
/// query fails.
pub fn schema_table_exists(database_url: &str) -> DatabaseResult<bool> {
    // Propagate connection failures instead of panicking: the function
    // already returns DatabaseResult, so `?` is the right tool here.
    match InferConnection::establish(database_url)? {
        InferConnection::Pg(mut conn) => select(sql::<Bool>(
            "EXISTS \
                    (SELECT 1 \
                     FROM information_schema.tables \
                     WHERE table_name = '__diesel_schema_migrations')",
        ))
        .get_result(&mut conn),
    }
    .map_err(Into::into)
}
/// Reads the connection string from the `DATABASE_URL` environment variable.
///
/// # Panics
/// Panics with an actionable message when the variable is unset or not
/// valid unicode (previously a bare `unwrap` with no context).
pub fn database_url() -> String {
    env::var("DATABASE_URL").expect("DATABASE_URL must be set to run the migrator")
}
/// Splits `database_url` into the database name (its last path segment) and
/// a url pointing at `default_database` on the same server.
///
/// NOTE(review): `Url::join` replaces the last path segment, so this assumes
/// the url carries the database as a trailing path component with no
/// trailing slash — confirm against the urls callers actually pass.
fn change_database_of_url(database_url: &str, default_database: &str) -> (String, String) {
    // Panics on malformed urls; presumably inputs were already accepted by
    // Backend::for_url in the callers — verify.
    let base = ::url::Url::parse(database_url).unwrap();
    let database = base.path_segments().unwrap().last().unwrap().to_owned();
    let mut new_url = base.join(default_database).unwrap();
    // Preserve connection options passed as query parameters.
    new_url.set_query(base.query());
    (database, new_url.into())
}

@ -0,0 +1,88 @@
// due to linking errors
extern crate openssl;
// don't touch anything
extern crate diesel;
// in this block
pub mod database;
pub mod query_helper;
use diesel::migration::Migration;
use diesel::{migration, pg::PgConnection, Connection};
use diesel_migrations::{embed_migrations, EmbeddedMigrations, MigrationHarness};
use std::error::Error;
use u_lib::config;
use u_lib::db::generate_postgres_url;
const MIGRATIONS: EmbeddedMigrations = embed_migrations!();
/// Entry point: parse the CLI action, make sure the database exists,
/// connect and execute the action.
fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
    let action = action::parse_command_line()?;
    let dbconfig = config::db::load()?;
    // Propagate setup failures instead of panicking. The boxed error from
    // setup_database is not Send + Sync, so carry it over as its message.
    database::setup_database().map_err(|e| e.to_string())?;
    let conn = PgConnection::establish(&generate_postgres_url(&dbconfig))?;
    run(action, conn)
}
/// Executes the selected migration action against `conn`.
fn run(action: action::Action, mut conn: PgConnection) -> migration::Result<()> {
    use action::Action::*;

    match action {
        ListPending => {
            let pending = conn.pending_migrations(MIGRATIONS)?;
            if pending.is_empty() {
                println!("No pending migrations.");
            }
            pending
                .iter()
                .for_each(|m| println!("Pending migration: {}", m.name()));
        }
        MigrateUp => {
            let applied = conn.run_pending_migrations(MIGRATIONS)?;
            if applied.is_empty() {
                println!("No pending migrations.");
            }
            applied
                .iter()
                .for_each(|v| println!("Applied migration: {}", v));
        }
        MigrateDown => {
            let reverted = conn.revert_last_migration(MIGRATIONS)?;
            println!("Reverted migration: {}", reverted);
        }
    }

    Ok(())
}
mod action {
    use std::convert::TryFrom;

    /// What the migrator should do, selected by the first CLI argument.
    pub enum Action {
        ListPending,
        MigrateUp,
        MigrateDown,
    }

    impl TryFrom<&str> for Action {
        type Error = ();

        /// Maps "" / "list", "up" and "down" to actions; anything else errors.
        fn try_from(value: &str) -> Result<Self, Self::Error> {
            match value {
                "" | "list" => Ok(Self::ListPending),
                "up" => Ok(Self::MigrateUp),
                "down" => Ok(Self::MigrateDown),
                _ => Err(()),
            }
        }
    }

    /// Reads the first CLI argument (empty string when absent) and converts
    /// it into an `Action`, producing a readable error message otherwise.
    pub fn parse_command_line() -> Result<Action, String> {
        let arg = std::env::args().nth(1).unwrap_or_default();
        Action::try_from(arg.as_str()).map_err(|_| {
            format!(
                "unrecognized command line argument: {} (expected 'up', 'down', 'list')",
                arg
            )
        })
    }
}

@ -0,0 +1,82 @@
use diesel::backend::Backend;
use diesel::query_builder::*;
use diesel::result::QueryResult;
use diesel::RunQueryDsl;
/// A `DROP DATABASE` statement, optionally guarded by `IF EXISTS`.
#[derive(Debug, Clone)]
pub struct DropDatabaseStatement {
    db_name: String,
    if_exists: bool,
}

impl DropDatabaseStatement {
    /// Builds a plain `DROP DATABASE <db_name>` statement.
    pub fn new(db_name: &str) -> Self {
        Self {
            db_name: db_name.to_owned(),
            if_exists: false,
        }
    }

    /// Adds the `IF EXISTS` guard so dropping a missing database is a no-op.
    pub fn if_exists(self) -> Self {
        Self {
            if_exists: true,
            ..self
        }
    }
}
impl<DB: Backend> QueryFragment<DB> for DropDatabaseStatement {
    // Renders `DROP DATABASE [IF EXISTS] <name>`; push_identifier quotes and
    // escapes the database name for the target backend.
    fn walk_ast<'b>(&'b self, mut out: AstPass<'_, 'b, DB>) -> QueryResult<()> {
        out.push_sql("DROP DATABASE ");
        if self.if_exists {
            out.push_sql("IF EXISTS ");
        }
        out.push_identifier(&self.db_name)?;
        Ok(())
    }
}
// Lets callers run the statement directly via `.execute(conn)`.
impl<Conn> RunQueryDsl<Conn> for DropDatabaseStatement {}

// No static query id: the SQL text varies with the database name, so the
// statement must not be cached under a single id.
impl QueryId for DropDatabaseStatement {
    type QueryId = ();
    const HAS_STATIC_QUERY_ID: bool = false;
}
/// A `CREATE DATABASE` statement for bootstrapping a fresh database.
#[derive(Debug, Clone)]
pub struct CreateDatabaseStatement {
    db_name: String,
}

impl CreateDatabaseStatement {
    /// Builds `CREATE DATABASE <db_name>`.
    pub fn new(db_name: &str) -> Self {
        Self {
            db_name: db_name.to_owned(),
        }
    }
}
impl<DB: Backend> QueryFragment<DB> for CreateDatabaseStatement {
    // Renders `CREATE DATABASE <name>` with the name quoted/escaped by
    // push_identifier for the target backend.
    fn walk_ast<'b>(&'b self, mut out: AstPass<'_, 'b, DB>) -> QueryResult<()> {
        out.push_sql("CREATE DATABASE ");
        out.push_identifier(&self.db_name)?;
        Ok(())
    }
}
// Lets callers run the statement directly via `.execute(conn)`.
impl<Conn> RunQueryDsl<Conn> for CreateDatabaseStatement {}

// No static query id: the SQL text varies with the database name, so the
// statement must not be cached under a single id.
impl QueryId for CreateDatabaseStatement {
    type QueryId = ();
    const HAS_STATIC_QUERY_ID: bool = false;
}
/// Convenience constructor for a `DROP DATABASE` statement.
pub fn drop_database(db_name: &str) -> DropDatabaseStatement {
    DropDatabaseStatement::new(db_name)
}

/// Convenience constructor for a `CREATE DATABASE` statement.
pub fn create_database(db_name: &str) -> CreateDatabaseStatement {
    CreateDatabaseStatement::new(db_name)
}

@ -7,6 +7,7 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
daemonize = "0.4.1"
log = "^0.4"
reqwest = { workspace = true }
sysinfo = "0.10.5"

@ -5,13 +5,15 @@
#[macro_use]
extern crate log;
//use daemonize::Daemonize;
use daemonize::Daemonize;
use std::sync::Arc;
use tokio::runtime::Builder;
use tokio::time::{sleep, Duration};
use u_lib::config;
use u_lib::{
api::ClientHandler, cache::JobCache, config::get_self_uid, errors::ErrChan,
executor::pop_completed, logging::init_logger, messaging::Reportable, models::AssignedJobById,
runner::JobRunner, utils::load_env_default,
runner::JobRunner,
};
const ITERATION_LATENCY: u64 = 5;
@ -95,10 +97,9 @@ async fn agent_loop(client: Arc<ClientHandler>) -> ! {
}
}
pub async fn run_forever() -> ! {
let env = load_env_default().unwrap();
pub fn run_forever() {
let env = config::endpoints::load().unwrap();
let client = Arc::new(ClientHandler::new(&env.u_server, None));
tokio::spawn(error_reporting(client.clone()));
if cfg!(debug_assertions) {
init_logger(Some(format!(
@ -110,11 +111,17 @@ pub async fn run_forever() -> ! {
.next()
.unwrap()
)));
// } else {
// if let Err(e) = Daemonize::new().start() {
// ErrChan::send(UError::Runtime(e.to_string()), "deeeemon").await
// }
} else {
Daemonize::new().start().unwrap();
}
info!("Starting agent {}", get_self_uid());
Builder::new_multi_thread()
.enable_all()
.build()
.unwrap()
.block_on(async {
tokio::spawn(error_reporting(client.clone()));
agent_loop(client).await
})
}

@ -1,6 +1,3 @@
use u_agent::run_forever;
#[tokio::main]
async fn main() {
run_forever().await;
fn main() {
u_agent::run_forever();
}

@ -6,22 +6,14 @@ extern crate tracing;
use anyhow::Result as AnyResult;
use argparse::{process_cmd, Args};
use serde::Deserialize;
use structopt::StructOpt;
use u_lib::api::ClientHandler;
use u_lib::config::admin;
use u_lib::logging::init_logger;
use u_lib::utils::{env::default_host, load_env};
#[derive(Deserialize)]
struct AccessEnv {
admin_auth_token: String,
#[serde(default = "default_host")]
u_server: String,
}
#[actix_web::main]
async fn main() -> AnyResult<()> {
let env = load_env::<AccessEnv>()?;
let env = admin::load()?;
let client = ClientHandler::new(&env.u_server, Some(env.admin_auth_token));
let args = Args::from_args();

@ -78,7 +78,7 @@ pub async fn serve(client: ClientHandler) -> anyhow::Result<()> {
info!("Connecting to u_server...");
client.ping().await?;
let addr = "127.0.0.1:8080";
let addr = "127.0.0.1:7799";
info!("Connected, instanciating u_panel at http://{}", addr);
HttpServer::new(move || {

@ -6,10 +6,11 @@ version = "0.1.0"
[dependencies]
anyhow = { workspace = true }
diesel = { version = "1.4.5", features = ["postgres", "uuid"] }
diesel = { workspace = true }
deadpool-diesel = { workspace = true }
hyper = "0.14"
once_cell = "1.7.2"
openssl = "*"
openssl = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
thiserror = { workspace = true }
@ -19,7 +20,6 @@ uuid = { workspace = true, features = ["serde", "v4"] }
u_lib = { path = "../../lib/u_lib", features = ["server"] }
warp = { version = "0.3.1", features = ["tls"] }
[dev-dependencies]
rstest = "0.12"

@ -1,81 +1,85 @@
use crate::error::Error as ServerError;
use diesel::{pg::PgConnection, prelude::*, result::Error as DslError};
use once_cell::sync::OnceCell;
use serde::Deserialize;
use std::sync::{Mutex, MutexGuard};
use u_lib::{
models::{schema, Agent, AssignedJob, JobMeta, JobState},
utils::load_env,
};
use diesel::{pg::PgConnection, prelude::*, result::Error as DslError, Connection};
use u_lib::db::PgAsyncPool;
use u_lib::models::{schema, Agent, AssignedJob, JobMeta, JobState};
use uuid::Uuid;
type Result<T> = std::result::Result<T, ServerError>;
pub struct UDB {
conn: PgConnection,
pub struct PgRepo {
pool: PgAsyncPool,
}
static DB: OnceCell<Mutex<UDB>> = OnceCell::new();
impl PgRepo {
pub fn new(pool: PgAsyncPool) -> PgRepo {
PgRepo { pool }
}
#[derive(Deserialize)]
struct DBEnv {
db_host: String,
db_name: String,
db_user: String,
db_password: String,
pub async fn interact<F, R>(&self, f: F) -> Result<R>
where
F: for<'c> FnOnce(UDB<'c>) -> Result<R>,
F: Send + 'static,
R: Send + 'static,
{
let connection = self.pool.get().await?;
connection
.interact(|conn| f(UDB { conn }))
.await
.expect("deadpool interaction failed")
}
impl UDB {
pub fn lock_db() -> MutexGuard<'static, Self> {
DB.get_or_init(|| {
let env = load_env::<DBEnv>().unwrap();
let db_url = format!(
"postgres://{}:{}@{}/{}",
env.db_user, env.db_password, env.db_host, env.db_name
);
let instance = UDB {
conn: PgConnection::establish(&db_url).unwrap(),
};
Mutex::new(instance)
})
.lock()
.unwrap()
pub async fn transaction<F, R>(&self, f: F) -> Result<R>
where
F: for<'c> FnOnce(UDB<'c>) -> Result<R>,
F: Send + 'static,
R: Send + 'static,
{
let conn = self.pool.get().await?;
conn.interact(|c| c.transaction(|conn| f(UDB { conn })))
.await
.expect("deadpool interaction failed")
}
}
pub struct UDB<'c> {
conn: &'c mut PgConnection,
}
pub fn insert_jobs(&self, job_metas: &[JobMeta]) -> Result<Vec<Uuid>> {
impl UDB<'_> {
pub fn insert_jobs(&mut self, job_metas: &[JobMeta]) -> Result<Vec<Uuid>> {
use schema::jobs;
diesel::insert_into(jobs::table)
.values(job_metas)
.get_results(&self.conn)
.get_results(self.conn)
.map(|rows| rows.iter().map(|job: &JobMeta| job.id).collect())
.map_err(with_err_ctx("Can't insert jobs"))
}
pub fn get_jobs(&self, ouid: Option<Uuid>) -> Result<Vec<JobMeta>> {
pub fn get_jobs(&mut self, ouid: Option<Uuid>) -> Result<Vec<JobMeta>> {
use schema::jobs;
match ouid {
Some(uid) => jobs::table
.filter(jobs::id.eq(uid))
.get_results::<JobMeta>(&self.conn),
None => jobs::table.load::<JobMeta>(&self.conn),
.get_results::<JobMeta>(self.conn),
None => jobs::table.load::<JobMeta>(self.conn),
}
.map_err(with_err_ctx("Can't get exact jobs"))
}
pub fn find_job_by_alias(&self, alias: &str) -> Result<Option<JobMeta>> {
pub fn find_job_by_alias(&mut self, alias: &str) -> Result<Option<JobMeta>> {
use schema::jobs;
let result = jobs::table
.filter(jobs::alias.eq(alias))
.first::<JobMeta>(&self.conn)
.first::<JobMeta>(self.conn)
.optional()
.map_err(with_err_ctx(format!("Can't find job by alias {alias}")))?;
Ok(result)
}
pub fn insert_agent(&self, agent: &Agent) -> Result<()> {
pub fn insert_agent(&mut self, agent: &Agent) -> Result<()> {
use schema::agents;
diesel::insert_into(agents::table)
@ -83,46 +87,50 @@ impl UDB {
.on_conflict(agents::id)
.do_update()
.set(agent)
.execute(&self.conn)
.execute(self.conn)
.map_err(with_err_ctx(format!("Can't insert agent {agent:x?}")))?;
Ok(())
}
pub fn insert_result(&self, result: &AssignedJob) -> Result<()> {
pub fn insert_result(&mut self, result: &AssignedJob) -> Result<()> {
use schema::results;
diesel::insert_into(results::table)
.values(result)
.execute(&self.conn)
.execute(self.conn)
.map_err(with_err_ctx(format!("Can't insert result {result:x?}")))?;
Ok(())
}
pub fn get_agents(&self, ouid: Option<Uuid>) -> Result<Vec<Agent>> {
pub fn get_agents(&mut self, ouid: Option<Uuid>) -> Result<Vec<Agent>> {
use schema::agents;
match ouid {
Some(uid) => agents::table
.filter(agents::id.eq(uid))
.load::<Agent>(&self.conn),
None => agents::table.load::<Agent>(&self.conn),
.load::<Agent>(self.conn),
None => agents::table.load::<Agent>(self.conn),
}
.map_err(with_err_ctx(format!("Can't get agent(s) {ouid:?}")))
}
pub fn update_job_status(&self, uid: Uuid, status: JobState) -> Result<()> {
pub fn update_job_status(&mut self, uid: Uuid, status: JobState) -> Result<()> {
use schema::results;
diesel::update(results::table)
.filter(results::id.eq(uid))
.set(results::state.eq(status))
.execute(&self.conn)
.execute(self.conn)
.map_err(with_err_ctx(format!("Can't update status of job {uid}")))?;
Ok(())
}
//TODO: filters possibly could work in a wrong way, check
pub fn get_exact_jobs(&self, uid: Option<Uuid>, personal: bool) -> Result<Vec<AssignedJob>> {
pub fn get_exact_jobs(
&mut self,
uid: Option<Uuid>,
personal: bool,
) -> Result<Vec<AssignedJob>> {
use schema::results;
let mut q = results::table.into_boxed();
@ -142,12 +150,12 @@ impl UDB {
.or_filter(results::id.eq(uid.unwrap()))
}
let result = q
.load::<AssignedJob>(&self.conn)
.load::<AssignedJob>(self.conn)
.map_err(with_err_ctx("Can't get exact jobs"))?;
Ok(result)
}
pub fn set_jobs_for_agent(&self, agent_uid: &Uuid, job_uids: &[Uuid]) -> Result<Vec<Uuid>> {
pub fn set_jobs_for_agent(&mut self, agent_uid: &Uuid, job_uids: &[Uuid]) -> Result<Vec<Uuid>> {
use schema::results;
let job_requests = job_uids
@ -161,7 +169,7 @@ impl UDB {
diesel::insert_into(results::table)
.values(&job_requests)
.execute(&self.conn)
.execute(self.conn)
.map_err(with_err_ctx(format!(
"Can't setup jobs {job_uids:?} for agent {agent_uid:?}"
)))?;
@ -169,68 +177,68 @@ impl UDB {
Ok(job_requests.iter().map(|aj| aj.id).collect())
}
pub fn del_jobs(&self, uids: &[Uuid]) -> Result<usize> {
pub fn del_jobs(&mut self, uids: &[Uuid]) -> Result<usize> {
use schema::jobs;
let mut affected = 0;
for &uid in uids {
let deleted = diesel::delete(jobs::table)
.filter(jobs::id.eq(uid))
.execute(&self.conn)
.execute(self.conn)
.map_err(with_err_ctx("Can't delete jobs"))?;
affected += deleted;
}
Ok(affected)
}
pub fn del_results(&self, uids: &[Uuid]) -> Result<usize> {
pub fn del_results(&mut self, uids: &[Uuid]) -> Result<usize> {
use schema::results;
let mut affected = 0;
for &uid in uids {
let deleted = diesel::delete(results::table)
.filter(results::id.eq(uid))
.execute(&self.conn)
.execute(self.conn)
.map_err(with_err_ctx("Can't delete results"))?;
affected += deleted;
}
Ok(affected)
}
pub fn del_agents(&self, uids: &[Uuid]) -> Result<usize> {
pub fn del_agents(&mut self, uids: &[Uuid]) -> Result<usize> {
use schema::agents;
let mut affected = 0;
for &uid in uids {
let deleted = diesel::delete(agents::table)
.filter(agents::id.eq(uid))
.execute(&self.conn)
.execute(self.conn)
.map_err(with_err_ctx("Can't delete agents"))?;
affected += deleted;
}
Ok(affected)
}
pub fn update_agent(&self, agent: &Agent) -> Result<()> {
pub fn update_agent(&mut self, agent: &Agent) -> Result<()> {
agent
.save_changes::<Agent>(&self.conn)
.save_changes::<Agent>(self.conn)
.map_err(with_err_ctx(format!("Can't update agent {agent:x?}")))?;
Ok(())
}
pub fn update_job(&self, job: &JobMeta) -> Result<()> {
job.save_changes::<JobMeta>(&self.conn)
pub fn update_job(&mut self, job: &JobMeta) -> Result<()> {
job.save_changes::<JobMeta>(self.conn)
.map_err(with_err_ctx(format!("Can't update job {job:x?}")))?;
Ok(())
}
pub fn update_result(&self, result: &AssignedJob) -> Result<()> {
pub fn update_result(&mut self, result: &AssignedJob) -> Result<()> {
debug!(
"updating result: id = {}, job_id = {}, agent_id = {}",
result.id, result.job_id, result.agent_id
);
result
.save_changes::<AssignedJob>(&self.conn)
.save_changes::<AssignedJob>(self.conn)
.map_err(with_err_ctx(format!("Can't update result {result:x?}")))?;
Ok(())
}

@ -18,6 +18,9 @@ pub enum Error {
#[error("DB error: {0}")]
DBErrorCtx(String),
#[error("Deadpool error: {0}")]
DeadpoolError(#[from] deadpool_diesel::PoolError),
#[error("General error: {0}")]
Other(String),
}

@ -1,4 +1,6 @@
use crate::db::UDB;
use std::sync::Arc;
use crate::db::{PgRepo, UDB};
use crate::error::Error;
use u_lib::{
messaging::{AsMsg, BaseMessage, Reportable},
@ -13,26 +15,29 @@ type EndpResult<T> = Result<T, Rejection>;
pub struct Endpoints;
impl Endpoints {
pub async fn add_agent(msg: Agent) -> EndpResult<()> {
UDB::lock_db().insert_agent(&msg).map_err(From::from)
}
pub async fn get_agents(uid: Option<Uuid>) -> EndpResult<Vec<Agent>> {
UDB::lock_db().get_agents(uid).map_err(From::from)
pub async fn get_agents(repo: Arc<PgRepo>, uid: Option<Uuid>) -> EndpResult<Vec<Agent>> {
repo.interact(move |mut db| db.get_agents(uid))
.await
.map_err(From::from)
}
pub async fn get_jobs(uid: Option<Uuid>) -> EndpResult<Vec<JobMeta>> {
UDB::lock_db().get_jobs(uid).map_err(From::from)
pub async fn get_jobs(repo: Arc<PgRepo>, uid: Option<Uuid>) -> EndpResult<Vec<JobMeta>> {
repo.interact(move |mut db| db.get_jobs(uid))
.await
.map_err(From::from)
}
pub async fn get_agent_jobs(uid: Option<Uuid>) -> EndpResult<Vec<AssignedJob>> {
UDB::lock_db()
.get_exact_jobs(uid, false)
pub async fn get_agent_jobs(
repo: Arc<PgRepo>,
uid: Option<Uuid>,
) -> EndpResult<Vec<AssignedJob>> {
repo.interact(move |mut db| db.get_exact_jobs(uid, false))
.await
.map_err(From::from)
}
pub async fn get_personal_jobs(uid: Uuid) -> EndpResult<Vec<AssignedJob>> {
let db = UDB::lock_db();
pub async fn get_personal_jobs(repo: Arc<PgRepo>, uid: Uuid) -> EndpResult<Vec<AssignedJob>> {
repo.transaction(move |mut db| {
let mut agents = db.get_agents(Some(uid))?;
if agents.is_empty() {
let new_agent = Agent::with_id(uid);
@ -52,52 +57,69 @@ impl Endpoints {
db.update_job_status(j.id, JobState::Running)?;
}
Ok(result)
})
.await
.map_err(From::from)
}
pub async fn upload_jobs(msg: BaseMessage<'static, Vec<JobMeta>>) -> EndpResult<Vec<Uuid>> {
UDB::lock_db()
.insert_jobs(&msg.into_inner())
pub async fn upload_jobs(
repo: Arc<PgRepo>,
msg: BaseMessage<'static, Vec<JobMeta>>,
) -> EndpResult<Vec<Uuid>> {
repo.interact(move |mut db| db.insert_jobs(&msg.into_inner()))
.await
.map_err(From::from)
}
pub async fn del(uid: Uuid) -> EndpResult<usize> {
let db = UDB::lock_db();
pub async fn del(repo: Arc<PgRepo>, uid: Uuid) -> EndpResult<usize> {
repo.transaction(move |mut db| {
let del_fns = &[UDB::del_agents, UDB::del_jobs, UDB::del_results];
for del_fn in del_fns {
let affected = del_fn(&db, &[uid])?;
let affected = del_fn(&mut db, &[uid])?;
if affected > 0 {
return Ok(affected);
}
}
Ok(0)
})
.await
.map_err(From::from)
}
pub async fn set_jobs(
repo: Arc<PgRepo>,
agent_uid: Uuid,
msg: BaseMessage<'static, Vec<String>>,
) -> EndpResult<Vec<Uuid>> {
repo.transaction(move |mut db| {
msg.into_inner()
.into_iter()
.map(|ident| {
Uuid::parse_str(&ident).or_else(|_| {
let job_from_db = UDB::lock_db().find_job_by_alias(&ident);
let job_from_db = db.find_job_by_alias(&ident);
match job_from_db {
Ok(job) => match job {
Some(j) => Ok(j.id),
None => Err(Error::ProcessingError(format!("unknown ident {ident}"))),
None => {
Err(Error::ProcessingError(format!("unknown ident {ident}")))
}
},
Err(e) => Err(e),
}
})
})
.collect::<Result<Vec<Uuid>, Error>>()
.and_then(|j| UDB::lock_db().set_jobs_for_agent(&agent_uid, &j))
.and_then(|j| db.set_jobs_for_agent(&agent_uid, &j))
})
.await
.map_err(From::from)
}
pub async fn report<Data: OneOrVec<Reportable> + AsMsg + 'static>(
pub async fn report<Data: OneOrVec<Reportable> + AsMsg + Send + Sync + 'static>(
repo: Arc<PgRepo>,
msg: BaseMessage<'static, Data>,
) -> EndpResult<()> {
repo.transaction(move |mut db| {
let id = msg.id;
for entry in msg.into_inner().into_vec() {
match entry {
@ -120,7 +142,7 @@ impl Endpoints {
}
};
agent.state = AgentState::Active;
Self::add_agent(agent).await?;
db.insert_agent(&agent)?;
}
None => warn!("Empty agent data"),
},
@ -128,31 +150,43 @@ impl Endpoints {
JobType::Terminate => todo!(),
JobType::Update => todo!(),
}
UDB::lock_db().update_result(&result)?;
db.update_result(&result)?;
}
Reportable::Error(e) => {
warn!("{} reported an error: {}", id, e);
}
Reportable::Dummy => (),
}
}
}}
Ok(())
})
.await
.map_err(From::from)
}
pub async fn update_agent(agent: BaseMessage<'static, Agent>) -> EndpResult<()> {
UDB::lock_db().update_agent(&agent.into_inner())?;
pub async fn update_agent(
repo: Arc<PgRepo>,
agent: BaseMessage<'static, Agent>,
) -> EndpResult<()> {
repo.interact(move |mut db| db.update_agent(&agent.into_inner()))
.await?;
Ok(())
}
pub async fn update_job(job: BaseMessage<'static, JobMeta>) -> EndpResult<()> {
UDB::lock_db().update_job(&job.into_inner())?;
pub async fn update_job(
repo: Arc<PgRepo>,
job: BaseMessage<'static, JobMeta>,
) -> EndpResult<()> {
repo.interact(move |mut db| db.update_job(&job.into_inner()))
.await?;
Ok(())
}
pub async fn update_assigned_job(
repo: Arc<PgRepo>,
assigned: BaseMessage<'static, AssignedJob>,
) -> EndpResult<()> {
UDB::lock_db().update_result(&assigned.into_inner())?;
repo.interact(move |mut db| db.update_result(&assigned.into_inner()))
.await?;
Ok(())
}

@ -1,11 +1,17 @@
use u_server_lib::serve;
// due to linking errors
extern crate openssl;
// don't touch anything
extern crate diesel;
// in this block
#[macro_use]
extern crate tracing;
#[tokio::main]
async fn main() {
if let Err(e) = serve().await {
u_lib::logging::init_logger(Some("u_server"));
if let Err(e) = u_server_lib::serve().await {
error!("U_SERVER error: {}", e);
}
}

@ -5,25 +5,19 @@ extern crate tracing;
#[macro_use]
extern crate rstest;
// due to linking errors
extern crate openssl;
// don't touch anything
extern crate diesel;
// in this block
mod db;
mod error;
mod handlers;
use db::PgRepo;
use error::{Error as ServerError, RejResponse};
use serde::{de::DeserializeOwned, Deserialize};
use std::{convert::Infallible, path::PathBuf};
use serde::de::DeserializeOwned;
use std::{convert::Infallible, path::PathBuf, sync::Arc};
use u_lib::{
config::MASTER_PORT,
logging::init_logger,
config,
db::async_pool,
messaging::{AsMsg, BaseMessage, Reportable},
models::*,
utils::load_env,
};
use uuid::Uuid;
use warp::{
@ -33,19 +27,13 @@ use warp::{
Filter, Rejection, Reply,
};
use crate::db::UDB;
use crate::handlers::Endpoints;
#[derive(Deserialize)]
struct ServEnv {
admin_auth_token: String,
}
fn get_content<M>() -> impl Filter<Extract = (BaseMessage<'static, M>,), Error = Rejection> + Clone
where
M: AsMsg + Sync + Send + DeserializeOwned + 'static,
{
body::content_length_limit(1024 * 64).and(body::json::<BaseMessage<M>>())
body::json::<BaseMessage<M>>()
}
fn into_message<M: AsMsg>(msg: M) -> Json {
@ -54,11 +42,16 @@ fn into_message<M: AsMsg>(msg: M) -> Json {
pub fn init_endpoints(
auth_token: &str,
db: PgRepo,
) -> impl Filter<Extract = (impl Reply,), Error = Rejection> + Clone {
let path = |p: &'static str| warp::post().and(warp::path(p));
let infallible_none = |_| async { Ok::<_, Infallible>((None::<Uuid>,)) };
let adb = Arc::new(db);
let with_db = warp::any().map(move || adb.clone());
let get_agents = path("get_agents")
.and(with_db.clone())
.and(
warp::path::param::<Uuid>()
.map(Some)
@ -68,11 +61,13 @@ pub fn init_endpoints(
.map(into_message);
let upload_jobs = path("upload_jobs")
.and(with_db.clone())
.and(get_content::<Vec<JobMeta>>())
.and_then(Endpoints::upload_jobs)
.map(into_message);
let get_jobs = path("get_jobs")
.and(with_db.clone())
.and(
warp::path::param::<Uuid>()
.map(Some)
@ -82,6 +77,7 @@ pub fn init_endpoints(
.map(into_message);
let get_agent_jobs = path("get_agent_jobs")
.and(with_db.clone())
.and(
warp::path::param::<Uuid>()
.map(Some)
@ -91,37 +87,44 @@ pub fn init_endpoints(
.map(into_message);
let get_personal_jobs = path("get_personal_jobs")
.and(with_db.clone())
.and(warp::path::param::<Uuid>())
.and_then(Endpoints::get_personal_jobs)
.map(into_message);
let del = path("del")
.and(with_db.clone())
.and(warp::path::param::<Uuid>())
.and_then(Endpoints::del)
.map(ok);
let set_jobs = path("set_jobs")
.and(with_db.clone())
.and(warp::path::param::<Uuid>())
.and(get_content::<Vec<String>>())
.and_then(Endpoints::set_jobs)
.map(into_message);
let report = path("report")
.and(with_db.clone())
.and(get_content::<Vec<Reportable>>())
.and_then(Endpoints::report)
.map(ok);
let update_agent = path("update_agent")
.and(with_db.clone())
.and(get_content::<Agent>())
.and_then(Endpoints::update_agent)
.map(ok);
let update_job = path("update_job")
.and(with_db.clone())
.and(get_content::<JobMeta>())
.and_then(Endpoints::update_job)
.map(ok);
let update_assigned_job = path("update_result")
.and(with_db.clone())
.and(get_content::<AssignedJob>())
.and_then(Endpoints::update_assigned_job)
.map(ok);
@ -137,7 +140,7 @@ pub fn init_endpoints(
let auth_header = warp::header::exact("authorization", Box::leak(auth_token));
let auth_zone = (get_agents
.or(get_jobs)
.or(get_jobs.clone())
.or(upload_jobs)
.or(del)
.or(set_jobs)
@ -152,27 +155,33 @@ pub fn init_endpoints(
auth_zone.or(agent_zone)
}
pub fn preload_jobs() -> Result<(), ServerError> {
pub async fn preload_jobs(repo: &PgRepo) -> Result<(), ServerError> {
repo.interact(|mut db| {
let job_alias = "agent_hello";
let if_job_exists = UDB::lock_db().find_job_by_alias(job_alias)?;
let if_job_exists = db.find_job_by_alias(job_alias)?;
if if_job_exists.is_none() {
let agent_hello = JobMeta::builder()
.with_type(JobType::Init)
.with_alias(job_alias)
.build()
.unwrap();
UDB::lock_db().insert_jobs(&[agent_hello])?;
db.insert_jobs(&[agent_hello])?;
}
Ok(())
})
.await
}
pub async fn serve() -> Result<(), ServerError> {
init_logger(Some("u_server"));
preload_jobs()?;
let env = config::db::load().unwrap();
let pool = async_pool(&env);
let db = PgRepo::new(pool);
preload_jobs(&db).await?;
let certs_dir = PathBuf::from("certs");
let env = load_env::<ServEnv>().map_err(|e| ServerError::Other(e.to_string()))?;
let routes = init_endpoints(&env.admin_auth_token)
let env = config::admin::load().map_err(|e| ServerError::Other(e.to_string()))?;
let routes = init_endpoints(&env.admin_auth_token, db)
.recover(handle_rejection)
.with(custom(logger));
@ -181,7 +190,7 @@ pub async fn serve() -> Result<(), ServerError> {
.cert_path(certs_dir.join("server.crt"))
.key_path(certs_dir.join("server.key"))
.client_auth_required_path(certs_dir.join("ca.crt"))
.run(([0, 0, 0, 0], MASTER_PORT))
.run(([0, 0, 0, 0], config::MASTER_PORT))
.await;
Ok(())
}

@ -0,0 +1,53 @@
# Deployment compose file: u_server + Postgres, run via podman-compose.
version: "3.4"

networks:
  u_net:

services:
  u_server:
    image: localhost/unki/u_server
    networks:
      - u_net
    volumes:
      - ./u_server:/unki/u_server
      - ./certs:/unki/certs
      - ./logs:/unki/logs:rw
    working_dir: /unki
    command: /unki/u_server
    # do not start until the database container reports healthy
    depends_on:
      u_db:
        condition: service_healthy
    ports:
      - 63714:63714
    env_file:
      - ./.env
      - ./.env.private
    environment:
      RUST_LOG: warp=info,u_server_lib=debug
    healthcheck:
      # healthy once the server's listening socket is up
      test: ss -tlpn | grep 63714
      interval: 5s
      timeout: 2s
      retries: 2

  u_db:
    image: localhost/unki/u_db
    networks:
      - u_net
    env_file:
      - ./.env
      - ./.env.private
    volumes:
      - ./migrator:/migrator
      - ./data:/var/lib/postgresql/data
      - type: bind
        source: ./u_db_entrypoint.sh
        target: /u_db_entrypoint.sh
    command: /u_db_entrypoint.sh
    healthcheck:
      # test if db's port is open and db is created
      # NOTE(review): the official postgres image reads POSTGRES_DB, not
      # POSTGRES_DATABASE — confirm the env naming matches the entrypoint.
      test: ss -tlpn | grep 5432 && psql -lqt -U $${POSTGRES_USER} | grep -qw $${POSTGRES_DATABASE}
      interval: 5s
      timeout: 5s
      retries: 3

@ -0,0 +1,9 @@
#!/bin/bash
# Builds the server and db images, then (re)starts the podman-compose stack.
# Abort on the first failed command so a broken image build does not
# tear down and restart the currently running deployment.
set -e

# Run containers as the invoking user so bind-mounted files keep sane ownership.
export DOCKER_UID=$(id -u)
export DOCKER_GID=$(id -g)

docker build -t localhost/unki/u_db -f u_db.Dockerfile .
docker build -t localhost/unki/u_server -f u_server.Dockerfile .

podman-compose down -v
podman-compose up -d

@ -1,17 +0,0 @@
FROM postgres:14.5
ENV DEBIAN_FRONTEND=noninteractive
RUN apt update && apt upgrade -y
RUN apt install -y curl build-essential libpq-dev iproute2
RUN curl https://sh.rustup.rs -sSf | sh -s -- -y --default-toolchain stable --profile minimal
ENV PATH /root/.cargo/bin:$PATH
RUN rustup target add x86_64-unknown-linux-musl
RUN cargo install diesel_cli --no-default-features --features postgres
RUN mkdir -p /unki
ENV LC_ALL en_US.UTF-8
ENV LANG en_US.UTF-8
ENV LANGUAGE en_US.UTF-8
RUN apt install -y locales locales-all
COPY u_db_entrypoint.sh /unki/

@ -1,7 +0,0 @@
set -m
export DATABASE_URL=postgres://${DB_USER}:${DB_PASSWORD}@127.0.0.1/${DB_NAME}
touch /unki/Cargo.toml
/usr/local/bin/docker-entrypoint.sh postgres &
sleep 10 && diesel setup && diesel migration run
[[ $1 == "svc" ]] && fg %1

@ -2,4 +2,4 @@ FROM rust:1.64
RUN rustup target add x86_64-unknown-linux-musl
RUN mkdir -p /tests && chmod 777 /tests
CMD ["sleep", "3600"]
ENTRYPOINT ["sleep", "3600"]

@ -0,0 +1,8 @@
FROM postgres:14.5

# Non-interactive apt so the build never blocks on prompts.
ENV DEBIAN_FRONTEND=noninteractive

# Single layer: upgrade base packages and add locales + iproute2
# (iproute2 provides `ss`, used by the compose healthcheck);
# drop apt lists to keep the image small.
RUN apt update && apt upgrade -y && \
    apt install -y locales locales-all iproute2 && \
    rm -rf /var/lib/apt/lists/*

ENV LC_ALL en_US.UTF-8
ENV LANG en_US.UTF-8
ENV LANGUAGE en_US.UTF-8

@ -11,7 +11,7 @@ services:
u_server:
user: *user
image: unki/u_server
image: localhost/unki/u_server
networks:
- u_net
volumes:
@ -37,7 +37,7 @@ services:
retries: 2
u_db:
image: unki/u_db
image: localhost/unki/u_db
networks:
- u_net
ports:
@ -45,27 +45,29 @@ services:
env_file:
- ../.env
- ../.env.private
working_dir: /unki
volumes:
- ../migrations:/unki/migrations
command: /unki/u_db_entrypoint.sh svc
- ../target/x86_64-unknown-linux-musl/${PROFILE:-debug}/migrator:/migrator
- type: bind
source: ../scripts/u_db_entrypoint.sh
target: /u_db_entrypoint.sh
command: /u_db_entrypoint.sh
healthcheck:
# healthy once Postgres is listening on 5432 and the target database exists
test: ss -tlpn | grep 5432 && psql -lqt -U $${DB_USER} | grep -qw $${DB_NAME}
test: ss -tlpn | grep 5432 && psql -lqt -U $${POSTGRES_USER} | grep -qw $${POSTGRES_DATABASE}
interval: 5s
timeout: 5s
retries: 3
u_agent:
user: *user
image: unki/u_agent
image: localhost/unki/u_agent
networks:
- u_net
volumes:
- ../target/x86_64-unknown-linux-musl/${PROFILE:-debug}/u_agent:/unki/u_agent
- ../logs:/unki/logs:rw
working_dir: /unki
command: /unki/u_agent u_server
command: bash -c "/unki/u_agent u_server && sleep 3600"
env_file:
- ../.env
environment:
@ -76,7 +78,7 @@ services:
tests_runner:
user: *user
image: unki/tests_runner
image: localhost/unki/tests_runner
networks:
- u_net
volumes:

@ -3,7 +3,7 @@ import shlex
from utils import *
BASE_IMAGE_DIR = '../images/integration-tests'
BASE_IMAGE_DIR = '../images/'
DOCKERFILES = [
{
@ -68,7 +68,7 @@ def rebuild_images_if_needed(force_rebuild=False):
ctx = img['ctx']
name = img.get('name')
df_suffix = 'Dockerfile'
img_name = f'unki/{name}'
img_name = f'localhost/unki/{name}'
log(f'Building docker image {img_name}')
cmd = [
'build',

@ -3,6 +3,6 @@ pub mod panel;
pub use panel::Panel;
use once_cell::sync::Lazy;
use u_lib::utils::{env::DefaultEnv, load_env_default};
use u_lib::config::endpoints::{load, EndpointsEnv};
pub static ENV: Lazy<DefaultEnv> = Lazy::new(|| load_env_default().unwrap());
pub static ENV: Lazy<EndpointsEnv> = Lazy::new(|| load().unwrap());

@ -15,7 +15,7 @@ async fn test_non_auth_connection_dropped() {
Err(e) => {
let err = e.to_string();
println!("captured err: {err}");
assert!(err.contains("certificate required"));
assert!(err.contains("certificate required") || err.contains("channel closed"));
}
_ => panic!("no error occured on foreign client connection"),
}

@ -9,8 +9,9 @@ edition = "2021"
[dependencies]
anyhow = { workspace = true }
chrono = "0.4.19"
diesel = { version = "1.4.5", features = ["postgres", "uuid"], optional = true }
diesel-derive-enum = { version = "1", features = ["postgres"], optional = true }
diesel = { workspace = true, optional = true }
diesel-derive-enum = { version = "2.0.0-rc.0", features = ["postgres"], optional = true }
deadpool-diesel = { workspace = true, optional = true }
dotenv = "0.15.0"
envy = "0.4.2"
futures = "0.3.5"
@ -34,7 +35,7 @@ uuid = { workspace = true, features = ["serde", "v4"] }
[features]
panel = []
server = ["dep:diesel", "dep:diesel-derive-enum"]
server = ["dep:diesel", "dep:diesel-derive-enum", "dep:deadpool-diesel"]
[dev-dependencies]
rstest = "0.12"

@ -1,13 +0,0 @@
use lazy_static::lazy_static;
use uuid::Uuid;
pub const MASTER_PORT: u16 = 63714;
lazy_static! {
static ref UID: Uuid = Uuid::new_v4();
}
#[inline]
pub fn get_self_uid() -> Uuid {
*UID
}

@ -0,0 +1,75 @@
use envy::{from_env, prefixed, Result as EnvResult};
use lazy_static::lazy_static;
use serde::Deserialize;
use uuid::Uuid;
/// Port the master server (u_server) listens on; shared by server and clients.
pub const MASTER_PORT: u16 = 63714;

lazy_static! {
    // Process-wide agent identifier, generated once per run (not persisted).
    static ref UID: Uuid = Uuid::new_v4();
}

/// Returns this process's unique id, stable for the lifetime of the process.
#[inline]
pub fn get_self_uid() -> Uuid {
    *UID
}
/// Environment settings for plain (agent/panel) clients.
pub mod endpoints {
    use super::*;

    #[derive(Deserialize)]
    pub struct EndpointsEnv {
        // Read from the `U_SERVER` env var (envy maps fields to upper-case
        // names); falls back to `default_host()` when unset.
        #[serde(default = "default_host")]
        pub u_server: String,
    }

    /// Loads `.env`/`.env.private` files, then deserializes from process env.
    pub fn load() -> EnvResult<EndpointsEnv> {
        dot();
        from_env()
    }
}
/// Postgres connection settings, read from `POSTGRES_*` environment variables.
pub mod db {
    use super::*;

    #[derive(Deserialize)]
    pub struct DBEnv {
        pub database: String,
        pub host: String,
        pub user: String,
        pub password: String,
        pub port: u16,
    }

    /// Loads `.env`/`.env.private` files, then deserializes the
    /// `POSTGRES_`-prefixed env vars (e.g. `POSTGRES_DATABASE`).
    pub fn load() -> EnvResult<DBEnv> {
        dot();
        prefixed("POSTGRES_").from_env()
    }
}
/// Environment settings for privileged (admin/panel) access.
pub mod admin {
    use super::*;

    #[derive(Deserialize)]
    pub struct AccessEnv {
        // Read from the `ADMIN_AUTH_TOKEN` env var; no default — loading
        // fails if it is unset.
        pub admin_auth_token: String,
        // Read from `U_SERVER`; falls back to `default_host()` when unset.
        #[serde(default = "default_host")]
        pub u_server: String,
    }

    /// Loads `.env`/`.env.private` files, then deserializes from process env.
    pub fn load() -> EnvResult<AccessEnv> {
        dot();
        from_env()
    }
}
/// Best-effort load of local dotenv files into the process environment;
/// missing files are silently ignored.
fn dot() {
    [".env", ".env.private"]
        .iter()
        .for_each(|envfile| {
            dotenv::from_filename(envfile).ok();
        });
}
/// Fallback server host used when `U_SERVER` is not set in the environment.
pub fn default_host() -> String {
    String::from("ortem.xyz")
}

@ -0,0 +1,26 @@
use deadpool_diesel::{Manager as DManager, Pool as DPool, Runtime};
use diesel::pg::PgConnection;
use std::time::Duration;
use crate::config::db::DBEnv;
pub type PgAsyncPool = DPool<DManager<PgConnection>>;
pub fn generate_postgres_url(config: &DBEnv) -> String {
format!(
"postgres://{}:{}@{}:{}/{}",
config.user, config.password, config.host, config.port, config.database
)
}
/// Creates a deadpool-backed async Postgres pool from the given DB settings.
///
/// # Panics
/// Panics if the pool cannot be built. This is a startup-time invariant:
/// the builder fails only on invalid configuration, not on connect errors,
/// so an `expect` with a clear message beats an anonymous `unwrap`.
pub fn async_pool(config: &DBEnv) -> PgAsyncPool {
    let db_url = generate_postgres_url(config);
    let manager = DManager::new(db_url, Runtime::Tokio1);
    DPool::builder(manager)
        .max_size(8)
        // Callers wait up to 5 minutes for a free connection before erroring.
        .wait_timeout(Some(Duration::from_secs(5 * 60)))
        .runtime(Runtime::Tokio1)
        .build()
        .expect("invalid database pool configuration")
}

@ -1,5 +1,73 @@
mod chan;
mod variants;
pub use chan::*;
pub use variants::*;
use reqwest::Error as ReqError;
use serde::{Deserialize, Serialize};
use thiserror::Error;
use uuid::Uuid;
/// Convenience alias: every fallible operation in the project returns this.
pub type UResult<T> = std::result::Result<T, UError>;

/// Unified error type shared across agent, server and panel.
/// Serializable so errors can travel over the wire inside reports.
#[derive(PartialEq, Error, Debug, Serialize, Deserialize, Clone)]
pub enum UError {
    #[error("Runtime error: {0}")]
    Runtime(String),

    // (reason, response body) — body may be empty when unknown
    #[error("Connection error: {0}. Body: {1}")]
    NetError(String, String),

    #[error("Parse error")]
    ParseError,

    #[error("Job error: {0}")]
    JobError(String),

    #[error("Argument parsing failed: {0}")]
    JobArgsError(String),

    #[error("Job is uncompleted yet")]
    JobUncompleted,

    // (expected platform, actual platform)
    #[error("Job cannot be ran on this platform. Expected: {0}, got: {1}")]
    InsuitablePlatform(String, String),

    #[error("Job {0} doesn't exist")]
    NoJob(Uuid),

    // (path being processed, underlying error)
    #[error("FS error while processing {0}: {1}")]
    FSError(String, String),

    #[error("Wrong auth token")]
    WrongToken,

    #[error("Panicked: {0}")]
    Panic(String),

    #[error("Panel error: {0}")]
    PanelError(String),

    #[error("Deserialize from json error: {0}")]
    DeserializeError(String),
}
impl From<ReqError> for UError {
fn from(e: ReqError) -> Self {
UError::NetError(e.to_string(), String::new())
}
}
impl From<serde_json::Error> for UError {
    /// Converts a serde_json failure into a `DeserializeError` message.
    fn from(e: serde_json::Error) -> Self {
        let details = e.to_string();
        Self::DeserializeError(details)
    }
}
impl From<anyhow::Error> for UError {
    /// Recovers a `UError` carried inside an `anyhow::Error` when possible;
    /// otherwise degrades to a generic `Runtime` error with the message text.
    fn from(e: anyhow::Error) -> Self {
        e.downcast::<UError>()
            .unwrap_or_else(|other| UError::Runtime(other.to_string()))
    }
}

@ -1,71 +0,0 @@
#[cfg(not(target_arch = "wasm32"))]
use reqwest::Error as ReqError;
use serde::{Deserialize, Serialize};
use thiserror::Error;
use uuid::Uuid;
pub type UResult<T> = std::result::Result<T, UError>;
#[derive(PartialEq, Eq, Error, Debug, Serialize, Deserialize, Clone)]
pub enum UError {
#[error("Runtime error: {0}")]
Runtime(String),
#[error("Connection error: {0}. Body: {1}")]
NetError(String, String),
#[error("Parse error")]
ParseError,
#[error("Job error: {0}")]
JobError(String),
#[error("Argument parsing failed: {0}")]
JobArgsError(String),
#[error("Job is uncompleted yet")]
JobUncompleted,
#[error("Job cannot be ran on this platform. Expected: {0}, got: {1}")]
InsuitablePlatform(String, String),
#[error("Job {0} doesn't exist")]
NoJob(Uuid),
#[error("FS error while processing {0}: {1}")]
FSError(String, String),
#[error("Wrong auth token")]
WrongToken,
#[error("Panicked: {0}")]
Panic(String),
#[error("Panel error: {0}")]
PanelError(String),
#[error("Deserialize from json error: {0}")]
DeserializeError(String),
}
#[cfg(not(target_arch = "wasm32"))]
impl From<ReqError> for UError {
fn from(e: ReqError) -> Self {
UError::NetError(e.to_string(), String::new())
}
}
impl From<serde_json::Error> for UError {
fn from(e: serde_json::Error) -> Self {
UError::DeserializeError(e.to_string())
}
}
impl From<anyhow::Error> for UError {
fn from(e: anyhow::Error) -> Self {
match e.downcast::<UError>() {
Ok(err) => err,
Err(err) => UError::Runtime(err.to_string()),
}
}
}

@ -1,11 +1,10 @@
#![allow(non_upper_case_globals)]
#[cfg(not(target_arch = "wasm32"))]
#[path = "."]
pub mod exports {
pub mod api;
pub mod cache;
pub mod config;
pub mod datatypes;
#[cfg(feature = "server")]
pub mod db;
pub mod errors;
pub mod executor;
pub mod logging;
@ -13,24 +12,11 @@ pub mod exports {
pub mod models;
pub mod runner;
pub mod utils;
}
#[cfg(target_arch = "wasm32")]
#[path = "."]
pub mod exports {
pub mod config;
pub mod errors;
pub mod messaging;
pub mod models;
pub mod utils;
}
pub use errors::{UError, UResult};
pub use exports::*;
#[cfg(feature = "server")]
pub mod schema_exports {
pub use crate::models::{Agentstate, Jobstate, Jobtype};
pub use diesel::sql_types::*;
}

@ -1,13 +1,15 @@
#[cfg(feature = "server")]
use diesel::{AsChangeset, Identifiable, Insertable, Queryable};
#[cfg(feature = "server")]
use diesel_derive_enum::DbEnum;
use serde::{Deserialize, Serialize};
use std::time::SystemTime;
use strum::Display;
#[cfg(feature = "server")]
use crate::models::schema::*;
mod server {
pub use crate::models::schema::*;
pub use diesel::{AsChangeset, Identifiable, Insertable, Queryable};
pub use diesel_derive_enum::DbEnum;
}
#[cfg(feature = "server")]
use self::server::*;
use crate::{config::get_self_uid, executor::ExecResult, runner::NamedJobRunner, utils::Platform};
@ -17,8 +19,7 @@ use uuid::Uuid;
#[cfg_attr(
feature = "server",
derive(DbEnum),
PgType = "AgentState",
DieselType = "Agentstate"
DieselTypePath = "sql_types::Agentstate"
)]
pub enum AgentState {
New,
@ -31,7 +32,7 @@ pub enum AgentState {
#[cfg_attr(
feature = "server",
derive(Identifiable, Queryable, Insertable, AsChangeset),
table_name = "agents"
diesel(table_name = agents)
)]
pub struct Agent {
pub alias: Option<String>,
@ -50,7 +51,6 @@ pub struct Agent {
pub username: String,
}
#[cfg(not(target_arch = "wasm32"))]
impl Agent {
pub fn with_id(uid: Uuid) -> Self {
Self {

@ -1,5 +1,4 @@
use super::{JobMeta, JobState, JobType};
#[cfg(not(target_arch = "wasm32"))]
use crate::config::get_self_uid;
#[cfg(feature = "server")]
use crate::models::schema::*;
@ -13,7 +12,7 @@ use uuid::Uuid;
#[cfg_attr(
feature = "server",
derive(Queryable, Identifiable, Insertable, AsChangeset),
table_name = "results"
diesel(table_name = results)
)]
pub struct AssignedJob {
pub agent_id: Uuid,

@ -13,7 +13,7 @@ use uuid::Uuid;
#[cfg_attr(
feature = "server",
derive(Queryable, Identifiable, Insertable, AsChangeset),
table_name = "jobs"
diesel(table_name = jobs)
)]
pub struct JobMeta {
#[serde(default)]

@ -1,14 +1,19 @@
#[cfg(feature = "server")]
use diesel_derive_enum::DbEnum;
use serde::{Deserialize, Serialize};
use strum::Display;
#[cfg(feature = "server")]
mod server {
pub use crate::models::schema::*;
pub use diesel_derive_enum::DbEnum;
}
#[cfg(feature = "server")]
use self::server::*;
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Display)]
#[cfg_attr(
feature = "server",
derive(DbEnum),
PgType = "JobState",
DieselType = "Jobstate"
DieselTypePath = "sql_types::Jobstate"
)]
pub enum JobState {
/// server created a job, but client didn't get it yet
@ -25,8 +30,7 @@ pub enum JobState {
#[cfg_attr(
feature = "server",
derive(DbEnum),
PgType = "JobType",
DieselType = "Jobtype"
DieselTypePath = "sql_types::Jobtype"
)]
pub enum JobType {
Init,

@ -1,7 +1,22 @@
// @generated automatically by Diesel CLI.
pub mod sql_types {
#[derive(diesel::sql_types::SqlType)]
#[diesel(postgres_type(name = "agentstate"))]
pub struct Agentstate;
#[derive(diesel::sql_types::SqlType)]
#[diesel(postgres_type(name = "jobstate"))]
pub struct Jobstate;
#[derive(diesel::sql_types::SqlType)]
#[diesel(postgres_type(name = "jobtype"))]
pub struct Jobtype;
}
diesel::table! {
use crate::schema_exports::*;
use super::sql_types::Agentstate;
agents (id) {
alias -> Nullable<Text>,
@ -33,6 +48,7 @@ diesel::table! {
diesel::table! {
use crate::schema_exports::*;
use super::sql_types::Jobtype;
jobs (id) {
alias -> Nullable<Text>,
@ -48,6 +64,8 @@ diesel::table! {
diesel::table! {
use crate::schema_exports::*;
use super::sql_types::Jobstate;
use super::sql_types::Jobtype;
results (id) {
agent_id -> Uuid,
@ -67,4 +85,9 @@ diesel::joinable!(certificates -> agents (agent_id));
diesel::joinable!(results -> agents (agent_id));
diesel::joinable!(results -> jobs (job_id));
diesel::allow_tables_to_appear_in_same_query!(agents, certificates, jobs, results,);
diesel::allow_tables_to_appear_in_same_query!(
agents,
certificates,
jobs,
results,
);

@ -1,29 +0,0 @@
use envy::{from_env, Result as EnvResult};
use serde::{de::DeserializeOwned, Deserialize};
#[derive(Deserialize)]
pub struct DefaultEnv {
#[serde(default = "default_host")]
pub u_server: String,
}
pub fn load_env<E: DeserializeOwned>() -> EnvResult<E> {
dot();
from_env()
}
pub fn load_env_default() -> EnvResult<DefaultEnv> {
dot();
from_env()
}
fn dot() {
let envs = [".env", ".env.private"];
for envfile in &envs {
dotenv::from_filename(envfile).ok();
}
}
pub fn default_host() -> String {
"ortem.xyz".to_string()
}

@ -1,24 +0,0 @@
use std::fmt;
/// Adapter that renders a borrowed byte slice as lowercase hex
/// through the `{:x}` (`LowerHex`) formatting trait.
pub struct Hexlify<'b>(pub &'b [u8]);

impl<'a> fmt::LowerHex for Hexlify<'a> {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        // Each byte becomes exactly two lowercase hex digits.
        self.0
            .iter()
            .try_for_each(|byte| write!(f, "{:02x}", byte))
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_hexlify() {
        let data = b"\x5a\x6b\x23\x4f\xa3\x7f\x9e";
        let expected = "5a6b234fa37f9e";
        assert_eq!(format!("{:x}", Hexlify(data)), expected);
    }
}

@ -1,5 +0,0 @@
mod hexlify;
mod stripped;
pub use hexlify::*;
pub use stripped::*;

@ -1,80 +0,0 @@
use std::fmt;
use std::iter::Iterator;
use std::slice::Iter as SliceIter;
use std::str::Chars;
const MAX_DATA_LEN: usize = 2000;

/// A value that knows its length and can be walked as displayable items.
pub trait Strippable {
    type Item: fmt::Display;
    type TypeIter: Iterator<Item = Self::Item>;

    fn length(&self) -> usize;
    fn iterator(&self) -> Self::TypeIter;
}

impl<'a> Strippable for &'a str {
    type Item = char;
    type TypeIter = Chars<'a>;

    fn length(&self) -> usize {
        self.len()
    }

    fn iterator(&self) -> Self::TypeIter {
        self.chars()
    }
}

impl<'a> Strippable for &'a Vec<u8> {
    type Item = &'a u8;
    type TypeIter = SliceIter<'a, u8>;

    fn length(&self) -> usize {
        self.len()
    }

    fn iterator(&self) -> Self::TypeIter {
        self.iter()
    }
}

/// Display wrapper that truncates long payloads to `MAX_DATA_LEN` formatted
/// items, appending a ` <...>` marker when the data was cut off.
pub struct Stripped<'i, Inner: Strippable + 'i>(pub &'i Inner);

impl<'i, Inner: Strippable + 'i> Stripped<'i, Inner> {
    fn iter(&self) -> Inner::TypeIter {
        self.0.iterator()
    }

    /// Marker to append; empty when the payload fits within the limit.
    fn placeholder(&self) -> &'static str {
        match self.0.length() {
            n if n >= MAX_DATA_LEN => " <...>",
            _ => "",
        }
    }
}

impl<'i, Inner: Strippable + 'i> fmt::Display for Stripped<'i, Inner> {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let suffix = self.placeholder();
        // Reserve room for the marker so output never exceeds MAX_DATA_LEN.
        let budget = MAX_DATA_LEN - suffix.len();
        for item in self.iter().take(budget) {
            write!(f, "{}", item)?;
        }
        f.write_str(suffix)
    }
}
#[cfg(test)]
mod tests {
use super::*;
use rstest::*;
#[rstest]
#[case("abc", 3)]
#[case("abcde".repeat(50), MAX_DATA_LEN)]
fn test_strip(#[case] input: impl Into<String>, #[case] result_len: usize) {
let s = input.into();
assert_eq!(Stripped(&s.as_str()).to_string().len(), result_len);
}
}

@ -1,25 +1,19 @@
pub mod combined_result;
pub mod conv;
pub mod env;
pub mod fmt;
pub mod misc;
pub mod platform;
pub mod proc_output;
pub mod storage;
#[cfg(not(target_arch = "wasm32"))]
pub mod tempfile;
#[cfg(unix)]
pub mod unix;
pub use combined_result::*;
pub use conv::*;
pub use env::{load_env, load_env_default};
pub use fmt::*;
pub use misc::*;
pub use platform::*;
pub use proc_output::*;
pub use storage::*;
#[cfg(not(target_arch = "wasm32"))]
pub use tempfile::*;
#[cfg(unix)]

@ -14,7 +14,7 @@ impl TempFile {
}
pub fn new() -> Self {
let name = Uuid::simple(&Uuid::new_v4()).to_string();
let name = Uuid::new_v4().simple().to_string();
let mut path = temp_dir();
path.push(name);
Self { path }

@ -1,4 +1,3 @@
# remove '.sample' to activate
ADMIN_AUTH_TOKEN=
DB_PASSWORD=
POSTGRES_PASSWORD=${DB_PASSWORD}
POSTGRES_PASSWORD=

@ -7,14 +7,14 @@ REMOTE_DIR=/srv/usrv
REMOTE_PATH=$SERVER:$REMOTE_DIR
RSYNC="rsync -arzh --progress"
ssh $SERVER mkdir -p $REMOTE_DIR/{release,deploy}
$RSYNC $ROOTDIR/target/x86_64-unknown-linux-musl/release/u_server $REMOTE_PATH/release/u_server
$RSYNC $ROOTDIR/certs/server.{crt,key} $REMOTE_PATH/certs
$RSYNC $ROOTDIR/certs/ca.crt $REMOTE_PATH/certs
$RSYNC $ROOTDIR/migrations/ $REMOTE_PATH/migrations
ssh $SERVER mkdir -p $REMOTE_DIR/data
$RSYNC $ROOTDIR/target/x86_64-unknown-linux-musl/release/{u_server,migrator} $REMOTE_PATH/
$RSYNC $ROOTDIR/certs/server.{crt,key} $REMOTE_PATH/certs/
$RSYNC $ROOTDIR/certs/ca.crt $REMOTE_PATH/certs/
$RSYNC $ROOTDIR/.env* $REMOTE_PATH/
$RSYNC $ROOTDIR/integration/docker-compose.yml $REMOTE_PATH/deploy/
$RSYNC $ROOTDIR/images/integration-tests/u_db* $REMOTE_PATH/deploy/
$RSYNC $ROOTDIR/images/integration-tests/u_server.Dockerfile $REMOTE_PATH/deploy/
$RSYNC $ROOTDIR/scripts/start_server.sh $REMOTE_PATH/start_server.sh
ssh $SERVER "cd $REMOTE_DIR/deploy && ./start_server.sh"
$RSYNC $ROOTDIR/deploy/* $REMOTE_PATH/
$RSYNC $ROOTDIR/images/{u_server,u_db}.Dockerfile $REMOTE_PATH/
$RSYNC $ROOTDIR/scripts/u_db_entrypoint.sh $REMOTE_PATH/
ssh $SERVER "cd $REMOTE_DIR/ && sudo ./start_server.sh"

@ -0,0 +1,20 @@
#!/bin/bash
# Spins up a throwaway Postgres container, runs diesel migrations against it
# (regenerating the schema), then tears the container down.
set -e

. ./.env
. ./.env.private

export DATABASE_URL=postgres://${POSTGRES_USER}:${POSTGRES_PASSWORD}@127.0.0.1/${POSTGRES_DATABASE}

CONTAINER_NAME=pg-schema

# With `set -e`, a diesel failure would previously skip `docker stop` and
# leak the container; the EXIT trap guarantees cleanup on any exit path.
cleanup() {
    docker stop $CONTAINER_NAME >/dev/null 2>&1 || true
}
trap cleanup EXIT

docker run --rm -d \
    --env-file=$PWD/.env \
    --env-file=$PWD/.env.private \
    --name $CONTAINER_NAME \
    -p 5432:5432 \
    postgres:14.5

# Wait (max 10s) until the server reports readiness in its logs.
timeout 10s grep -q 'system is ready to accept connections' <(docker logs --follow $CONTAINER_NAME)
sleep 0.5

diesel setup && diesel migration run

@ -1,5 +0,0 @@
#!/bin/bash
docker build -t unki/u_db -f u_db.Dockerfile .
docker build -t unki/u_server -f u_server.Dockerfile .
docker-compose down
docker-compose up -d u_server

@ -0,0 +1,6 @@
# Entrypoint: start postgres in the background, run migrations once it is
# ready, then put postgres back in the foreground as the main process.
set -m
export DATABASE_URL=postgres://${POSTGRES_USER}:${POSTGRES_PASSWORD}@127.0.0.1/${POSTGRES_DATABASE}

/usr/local/bin/docker-entrypoint.sh postgres &

# Poll for readiness (max ~30s) instead of a fixed `sleep 8`, which raced
# against slow first-boot initdb runs.
for _ in $(seq 1 30); do
    pg_isready -q -h 127.0.0.1 -U "${POSTGRES_USER}" && break
    sleep 1
done

/migrator up
fg %1
Loading…
Cancel
Save