i suddenly fucked up when adding db

4-update-check
plazmoid 4 years ago
parent 0ea7dc0683
commit 1497af9c39
  1. 5
      Cargo.toml
  2. 29
      README.md
  3. 6
      bin/u_agent/src/main.rs
  4. 1
      bin/u_server/.env
  5. 36
      bin/u_server/Cargo.toml
  6. 5
      bin/u_server/diesel.toml
  7. 59
      bin/u_server/migrations/2020-10-24-111622_create_all/up.sql
  8. 42
      bin/u_server/src/db/db.rs
  9. 6
      bin/u_server/src/db/mod.rs
  10. 43
      bin/u_server/src/db/models.rs
  11. 73
      bin/u_server/src/db/schema.rs
  12. 3
      bin/u_server/src/errors.rs
  13. 55
      bin/u_server/src/handlers.rs
  14. 53
      bin/u_server/src/main.rs
  15. 1
      lib/u_lib/Cargo.toml
  16. 81
      lib/u_lib/src/client/client.rs
  17. 4
      lib/u_lib/src/client/mod.rs
  18. 3
      lib/u_lib/src/config.rs
  19. 78
      lib/u_lib/src/contracts/agent.rs
  20. 77
      lib/u_lib/src/contracts/datatypes.rs
  21. 29
      lib/u_lib/src/contracts/jobs.rs
  22. 40
      lib/u_lib/src/contracts/messaging.rs
  23. 7
      lib/u_lib/src/contracts/mod.rs
  24. 2
      lib/u_lib/src/errors.rs
  25. 2
      lib/u_lib/src/lib.rs
  26. 20
      lib/u_lib/src/network.rs
  27. 7
      lib/u_lib/src/utils.rs

@ -1,6 +1,9 @@
[workspace]
members = [
"bin/*",
"bin/u_agent",
"bin/u_panel",
"bin/u_run",
"bin/u_server",
"lib/u_lib"
]

@ -0,0 +1,29 @@
#UNKI
Контролируем собственные устройства (компы, ноут, телефон, ящики) через веб-интерфейс сервера,
к которому подключаются разбросанные по устройствам агенты.
Ничто не должно нарушать работоспособность и коммуникацию агентов с сервером,
поэтому подключение должно быть защищено от прослушки, модификации.
##Установка агента на устройство (u_run)
Утилита u_run осуществляет первичную сборку инфы о платформе, скачивание агента,
его установку и запуск. Также она
Для каждого устройства компилируется собственная версия агента в зависимости от ОС, процессора, битности и т.д..
В момент компиляции в агент встраивается сгенерированный уникальный сертификат, по которому будет происходить общение.
Исполняемый код шифруется блочным шифром (непопулярным), ключ получает при запуске и подключении к серверу.
##Взаимодействие (u_agent)
Агент висит в памяти в виде демона/сервиса/модуля ядра, запуск производится во время старта системы.
Раз в 5 секунд агент пингует сервер, показывая свою жизнеспособность,
а также запрашивая выставленные инструкции:
* скачать новый список джоб
* отправить результаты текущих джоб
## Веб-интерфейс (u_panel)
Панель управления для обменистрирования.
Представляет собой u_agent с веб-сервером, транслирующим команды u_server-у.
Запускается на localhost

@ -13,7 +13,7 @@ use {
std::env,
u_lib::{
client::{
ClientInfo,
Agent,
network::ClientHandler
},
contracts::*,
@ -26,7 +26,7 @@ async fn main() {
//daemonize();
let arg_ip = env::args().nth(1);
let instance = ClientHandler::new(arg_ip);
let cli_info = ClientInfo::gather();
let cli_info = Agent::gather();
retry_until_ok!(instance.init(&cli_info).await);
loop {
let jobs = retry_until_ok!(instance.get_jobs().await);
@ -37,7 +37,7 @@ async fn main() {
.collect()
).await;
retry_until_ok!(instance.report(
IterWrap(result.into_iter().map(|r| r.unwrap()).collect())
ItemWrap(result.into_iter().map(|r| r.unwrap()).collect())
).await)
}
sleep(Duration::from_secs(2));

@ -0,0 +1 @@
DATABASE_URL=./u_server.db

@ -1,16 +1,32 @@
[package]
name = "u_server"
version = "0.1.0"
authors = ["plazmoid <kronos44@mail.ru>"]
edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
name = "u_server"
version = "0.1.0"
[dependencies]
warp = "0.2.4"
serde = { version = "1.0.114", features = ["derive"] }
tokio = { version = "0.2.22", features = ["macros"] }
log = "0.4.11"
dotenv = "0.15.0"
env_logger = "0.7.1"
uuid = "0.8.1"
u_lib = { version = "*", path = "../../lib/u_lib" }
log = "0.4.11"
anyhow = "*"
warp = "0.2.4"
[dependencies.diesel]
features = ["sqlite", "uuid"]
version = "1.4.5"
[dependencies.uuid]
features = ["serde", "v4"]
version = "*"
[dependencies.serde]
features = ["derive"]
version = "1.0.114"
[dependencies.tokio]
features = ["macros"]
version = "0.2.22"
[dependencies.u_lib]
path = "../../lib/u_lib"
version = "*"

@ -0,0 +1,5 @@
# For documentation on how to configure this file,
# see diesel.rs/guides/configuring-diesel-cli
[print_schema]
file = "src/db/schema.rs"

@ -0,0 +1,59 @@
CREATE TABLE IF NOT EXISTS agents (
alias TEXT
, agent_id TEXT NOT NULL UNIQUE
, hostname TEXT NOT NULL
, id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL
, is_root BOOLEAN NOT NULL DEFAULT 0
, is_root_allowed BOOLEAN NOT NULL DEFAULT 0
, last_active TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
-- target triplet
, platform TEXT NOT NULL
, regtime TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
, status TEXT
-- is needed to processing requests
, token TEXT
, username TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS ip_addrs (
agent_id INTEGER NOT NULL
, check_ts DATETIME NOT NULL
, gateway TEXT
, id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL
, iface TEXT NOT NULL
, ip_addr TEXT NOT NULL
, is_gray BOOLEAN NOT NULL DEFAULT 1
, netmask TEXT NOT NULL
, FOREIGN KEY(agent_id) REFERENCES agents(id)
);
CREATE TABLE IF NOT EXISTS jobs (
alias TEXT
, id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL
-- Shell, Binary (with program download), Python (with program and python download if not exist), Management
, job_type TEXT CHECK(job_type IN ('S','B','P','M')) NOT NULL DEFAULT 'S'
-- Executable type: ALL - no matter, W - windows, L = linux
, exec_type TEXT CHECK(exec_type IN ('ALL', 'W', 'L')) NOT NULL DEFAULT 'L'
, platform TEXT CHECK(platform IN ('x86', 'x64', 'aarch32', 'aarch64'))
, data BLOB NOT NULL
);
CREATE TABLE IF NOT EXISTS results (
agent_id INTEGER NOT NULL
, created TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
, id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL
, job_id INTEGER NOT NULL
, result BLOB
-- Queued, Pending, Running, Finished
, status TEXT CHECK(status IN ('Q', 'P', 'R', 'F'))
, ts TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
, FOREIGN KEY(agent_id) REFERENCES agents(id)
, FOREIGN KEY(job_id) REFERENCES jobs(id)
);
CREATE TABLE IF NOT EXISTS certificates (
agent_id INTEGER NOT NULL
, id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL
, is_revoked BOOLEAN NOT NULL DEFAULT FALSE
, FOREIGN KEY(agent_id) REFERENCES agents(id)
);

@ -0,0 +1,42 @@
use diesel::{
sqlite::SqliteConnection,
prelude::*
};
use dotenv::dotenv;
use std::{
env,
sync::{Arc, Mutex}
};
use crate::{
errors::USrvResult,
db::IAgent
};
use super::schema;
pub type Storage = Arc<Mutex<UDB>>;
pub struct UDB {
conn: SqliteConnection
}
impl UDB {
pub fn new() -> USrvResult<Storage> {
dotenv()?;
let db_path = env::var("DATABASE_URL")?;
let conn = SqliteConnection::establish(&db_path)?;
conn.execute("PRAGMA foreign_keys = ON;")?;
let instance = UDB {
conn
};
Ok(Arc::new(Mutex::new(instance)))
}
pub fn new_agent(&self, agent: IAgent) -> USrvResult<()> {
use schema::agents;
diesel::insert_into(agents::table)
.values(agent)
.execute(&self.conn)?;
Ok(())
}
}

@ -0,0 +1,6 @@
pub mod db;
mod schema;
mod models;
pub use db::*;
pub use models::*;

@ -0,0 +1,43 @@
use diesel::{
Insertable,
Queryable,
Identifiable
};
use serde::{
Deserialize,
Serialize
};
use u_lib::Uid;
use std::time::SystemTime;
use crate::db::schema::*;
//belongs_to
#[derive(Identifiable, Queryable, Serialize)]
#[table_name = "agents"]
pub struct QAgent {
pub alias: Option<String>,
pub agent_id: Uid,
pub hostname: String,
pub id: i32,
pub is_root: bool,
pub is_root_allowed: bool,
pub last_active: SystemTime,
pub platform: String,
pub regtime: SystemTime,
pub status: Option<String>,
pub token: Option<String>,
pub username: String
}
#[derive(Insertable, Deserialize, Clone)]
#[table_name = "agents"]
pub struct IAgent {
pub agent_id: Uid,
pub hostname: String,
pub is_root: bool,
pub is_root_allowed: bool,
pub platform: String,
pub status: Option<String>,
pub token: Option<String>,
pub username: String
}

@ -0,0 +1,73 @@
table! {
agents (id) {
alias -> Nullable<Text>,
agent_id -> Text,
hostname -> Text,
id -> Integer,
is_root -> Bool,
is_root_allowed -> Bool,
last_active -> Timestamp,
platform -> Text,
regtime -> Timestamp,
status -> Nullable<Text>,
token -> Nullable<Text>,
username -> Text,
}
}
table! {
certificates (id) {
agent_id -> Integer,
id -> Integer,
is_revoked -> Bool,
}
}
table! {
ip_addrs (id) {
agent_id -> Integer,
check_ts -> Timestamp,
gateway -> Nullable<Text>,
id -> Integer,
iface -> Text,
ip_addr -> Text,
is_gray -> Bool,
netmask -> Text,
}
}
table! {
jobs (id) {
alias -> Nullable<Text>,
id -> Integer,
job_type -> Text,
exec_type -> Text,
platform -> Nullable<Text>,
data -> Binary,
}
}
table! {
results (id) {
agent_id -> Integer,
created -> Timestamp,
id -> Integer,
job_id -> Integer,
result -> Nullable<Binary>,
status -> Nullable<Text>,
ts -> Timestamp,
}
}
joinable!(certificates -> agents (agent_id));
joinable!(ip_addrs -> agents (agent_id));
joinable!(results -> agents (agent_id));
joinable!(results -> jobs (job_id));
allow_tables_to_appear_in_same_query!(
agents,
certificates,
ip_addrs,
jobs,
results,
);

@ -0,0 +1,3 @@
use anyhow::Result as AnyResult;
pub type USrvResult<T> = AnyResult<T>;

@ -5,10 +5,30 @@ use warp::{
reply::with_status,
http::StatusCode
};
use uuid::Uuid;
use crate::db::{
Storage,
IAgent
};
pub async fn add_agent(
msg: Message<'_, IAgent>,
db: Storage) -> Result<impl Reply, Rejection>
{
let agent = msg.item;
let result = db.lock().unwrap().new_agent(agent.into_owned());
match result {
Ok(_) => Ok(warp::reply::json(
&RawMsg("Added".to_string()).as_message()
)),
Err(e) => Ok(warp::reply::json( //TODO: rejection
&RawMsg("Already exist".to_string()).as_message()
))
}
}
/*
pub async fn report(
msg: Message<'_, IterWrap<Vec<JobResult>>>,
msg: Payload<Vec<JobResult>>,
db: Storage) -> Result<impl Reply, Rejection>
{
let results = msg.item.into_inner();
@ -35,27 +55,6 @@ pub async fn get_job_results(
}
}
pub async fn add_client(
msg: Message<'_, ClientInfo>,
db: Storage) -> Result<impl Reply, Rejection>
{
let new_cli = msg.item;
let mut clients = db.clients().await;
if clients.contains_key(&new_cli.id) {
Ok(warp::reply::json(
&RawMsg("Already exist".to_string()).as_message()
))
} else {
clients.insert(
new_cli.id.clone(),
UClient::new(new_cli.into_owned())
);
Ok(warp::reply::json(
&RawMsg("Added".to_string()).as_message()
))
}
}
pub async fn get_jobs(
db: Storage) -> Result<impl Reply, Rejection>
{
@ -73,7 +72,7 @@ pub async fn get_jobs(
pub async fn set_jobs(
uid: Option<Uuid>,
msg: Message<'_, IterWrap<JobMetaStorage>>,
msg: Message<'_, ItemWrap<JobMetaStorage>>,
db: Storage) -> Result<impl Reply, Rejection>
{
let mut clients = db.clients().await;
@ -90,11 +89,15 @@ pub async fn set_jobs(
pub async fn ls(db: Storage) -> Result<impl Reply, Rejection> {
let clients = db.clients().await;
let mut result: Vec<ClientInfo> = Vec::with_capacity(clients.len());
let mut result: Vec<Agent> = Vec::with_capacity(clients.len());
for cli in clients.values() {
result.push(cli.client_info.clone());
}
Ok(warp::reply::json(
&IterWrap(result).as_message()
&ItemWrap(result).as_message()
))
}*/
pub async fn dummy() -> Result<impl Reply, Rejection> {
Ok(String::from("ok"))
}

@ -1,4 +1,6 @@
mod handlers;
mod db;
mod errors;
use warp::{
Filter,
@ -10,29 +12,41 @@ use env_logger;
use u_lib::{
MASTER_PORT,
contracts::*,
client::network::Paths
network::Paths
};
use uuid::Uuid;
use db::*;
use serde::{
de::DeserializeOwned
};
#[macro_use]
extern crate diesel;
fn get_content<M: ToMsg>() -> impl Filter<Extract = (Message<'static, M>,),
Error = Rejection> + Clone {
body::content_length_limit(1024*64).and(body::json::<Message<M>>())
fn get_content<M>()
-> impl Filter<Extract = (Message<'static, M>,),
Error = Rejection> + Clone
where
M: Clone + Sync + Send + DeserializeOwned + 'static
{
body::content_length_limit(1024*64)
.and(body::json::<Message<M>>())
}
#[tokio::main]
async fn main() {
env_logger::init();
let base_db = Storage::new();
let base_db = UDB::new().unwrap();
let db = warp::any().map(move || base_db.clone());
let new_client = warp::post()
.and(warp::path(Paths::init))
.and(get_content::<ClientInfo>())
.and(db.clone())
.and_then(handlers::add_client);
//.and(get_content::<IAgent>())
//.and(db.clone())
.and_then(handlers::dummy);
/*
let ls = warp::get()
.and(warp::path(Paths::ls))
.and(db.clone())
@ -46,7 +60,7 @@ async fn main() {
let set_jobs = warp::post()
.and(warp::path(Paths::set_jobs))
.and(warp::path::param::<Uuid>().map(Some))
.and(get_content::<IterWrap<JobMetaStorage>>())
.and(get_content::<JobMetaStorage>())
.and(db.clone())
.and_then(handlers::set_jobs);
@ -58,22 +72,23 @@ async fn main() {
let report = warp::post()
.and(warp::path(Paths::report))
.and(get_content::<IterWrap<Vec<JobResult>>>())
.and(get_content::<Vec<JobResult>>())
.and(db.clone())
.and_then(handlers::report);
*/
let auth_token = warp::header::exact("authorization", "Bearer 123qwe");
let agent_zone = new_client
.or(get_jobs)
.or(report)
// .or(get_jobs)
// .or(report)
;
let auth_zone = auth_token
.and(ls
.or(set_jobs)
.or(get_job_results)
);
// .and(ls
// .or(set_jobs)
// .or(get_job_results)
// )
;
let routes = auth_zone
.or(agent_zone)

@ -15,3 +15,4 @@ lazy_static = "1.4.0"
tokio = { version = "0.2.22", features = ["macros", "process"] }
reqwest = { version = "0.10.7", features = ["json"] }
futures = "0.3.5"
guess_host_triple = "0.1.2"

@ -1,81 +0,0 @@
use std::{
collections::HashMap
};
use serde::{
Deserialize,
Serialize
};
use uuid::Uuid;
use crate::{contracts::*, UID, exec_job};
pub struct UClient {
pub client_info: ClientInfo,
pub jobs: JobMetaStorage,
}
impl UClient {
pub fn new(client_info: ClientInfo) -> Self {
Self {
client_info,
jobs: HashMap::new()
}
}
}
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct ClientInfo {
pub info: HashMap<String, String>,
pub id: Uuid,
}
impl ClientInfo {
pub async fn gather() -> Self {
let mut info: HashMap<String, String> = HashMap::new();
for job in DEFAULT_JOBS {
let job_meta = JobMeta::from_shell(job.1.into()).into_arc();
let job_result = exec_job(job_meta.clone()).await;
let job_data = match job_result.unwrap().data.unwrap() {
Ok(output) => output.multiline(),
Err(e) => e.to_string()
};
info.insert(job.0.into(), job_data);
}
ClientInfo {
info,
id: *UID
}
}
pub fn get_field(&self, field: &str) -> Option<&String> {
self.info.get(field)
}
}
const DEFAULT_JOBS: &[(&str, &str)] = &[
//("local ip", "ip a"),
("hostname", "hostname"),
("username", "whoami"),
("platform", "uname -a"),
];
#[cfg(test)]
mod tests {
use super::*;
use crate::{
utils::vec_to_string
};
#[tokio::test]
async fn test_gather() {
let cli_info = ClientInfo::gather().await;
let field = cli_info.get_field("username").unwrap();
let stdout = JobOutput::from_multiline(field).unwrap().stdout;
assert_eq!(
&vec_to_string(&stdout),
"root"
)
}
}

@ -1,4 +0,0 @@
pub mod network;
pub mod client;
pub use client::*;

@ -4,6 +4,7 @@ use uuid::Uuid;
pub const MASTER_SERVER: Ipv4Addr = Ipv4Addr::LOCALHOST; //Ipv4Addr::new(3,9,16,40)
pub const MASTER_PORT: u16 = 63714;
pub type Uid = String;
lazy_static! {
pub static ref UID: Uuid = Uuid::new_v4();
pub static ref UID: Uid = Uuid::new_v4().to_string();
}

@ -0,0 +1,78 @@
use std::{
collections::HashMap,
time::SystemTime
};
use serde::{
Deserialize,
Serialize
};
use guess_host_triple::guess_host_triple;
use crate::{
contracts::*,
UID,
Uid,
exec_job,
utils::vec_to_string
};
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Agent {
pub agent_id: Uid,
pub hostname: String,
pub is_root: bool,
pub is_root_allowed: bool,
pub platform: String,
pub status: Option<String>,
pub token: Option<String>,
pub username: String
}
impl Agent {
pub async fn gather() -> Self {
async fn run_cmd_fast(cmd: String) -> String {
let job = exec_job(
JobMeta::from_shell_arc(cmd)
).await;
let job_result = match job.unwrap().data.unwrap() {
Ok(output) => output.multiline(),
Err(e) => e.to_string()
};
JobOutput::from_multiline(&job_result)
.map(|o| vec_to_string(&o.into_appropriate()))
.unwrap_or(job_result)
}
#[cfg(unix)]
Agent {
agent_id: UID.clone().to_string(),
hostname: run_cmd_fast("hostname".to_string()).await,
is_root: &run_cmd_fast("id -u".to_string()).await == "0",
is_root_allowed: false, //TODO
platform: guess_host_triple().unwrap_or("Error").to_string(),
status: None, //TODO
token: None, //TODO
username: run_cmd_fast("id -un".to_string()).await,
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn test_gather() {
let cli_info = Agent::gather().await;
assert_eq!(
&cli_info.username,
"root"
)
}
}

@ -1,77 +0,0 @@
use {
super::*,
tokio::sync::{
Mutex,
MutexGuard
},
std::{
sync::{Arc, Mutex as StdMutex},
collections::HashMap,
},
uuid::Uuid,
serde::{Serialize, Deserialize}
};
pub type CliStorage = HashMap<Uuid, UClient>;
pub type JobResultsStorage = HashMap<Uuid, Vec<JobResult>>;
pub type JobMetaStorage = HashMap<Uuid, JobMeta>;
pub type JobMetaRef = Arc<StdMutex<JobMeta>>;
// because can't impl From<IterWrap<...>> for Cow
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct IterWrap<T>(pub T);
impl<T> IterWrap<T> {
pub fn into_inner(self) -> T {
self.0
}
}
impl<T> From<T> for IterWrap<T> {
fn from(t: T) -> Self {
IterWrap(t)
}
}
impl<T: Clone> ToMsg for IterWrap<T> {}
impl<'cow, T: Clone> From<IterWrap<T>> for Cow<'cow, IterWrap<T>> {
fn from(obj: IterWrap<T>) -> Cow<'cow, IterWrap<T>> {
Cow::Owned(obj)
}
}
impl<'cow, T: Clone> From<&'cow IterWrap<T>> for Cow<'cow, IterWrap<T>> {
fn from(obj: &'cow IterWrap<T>) -> Cow<'cow, IterWrap<T>> {
Cow::Borrowed(obj)
}
}
#[derive(Clone)]
pub struct Storage {
clients: Arc<Mutex<CliStorage>>,
jobs_results: Arc<Mutex<JobResultsStorage>>
}
impl Storage {
pub fn new() -> Self {
Self {
clients: Arc::new(
Mutex::new(HashMap::<Uuid, UClient>::new())
),
jobs_results: Arc::new(
Mutex::new(HashMap::<Uuid, Vec<JobResult>>::new())
)
}
}
pub async fn clients(&self) -> MutexGuard<'_, CliStorage> {
self.clients.lock().await
}
pub async fn results(&self) -> MutexGuard<'_, JobResultsStorage> {
self.jobs_results.lock().await
}
}

@ -15,11 +15,12 @@ use crate::{
UError,
UErrType,
UErrType::JobError,
BoxError,
JobErrType,
UResult,
utils::format_err
};
pub type JobMetaRef = Arc<Mutex<JobMeta>>;
#[derive(Serialize, Deserialize, Clone, Debug)]
pub enum ManageAction {
@ -33,7 +34,6 @@ pub enum ManageAction {
pub enum JobSchedule {
Once,
Permanent,
Terminate // to terminate jobs obvsl
//TODO: Scheduled
}
@ -63,7 +63,7 @@ pub struct JobMeta {
pub state: JobState,
pub exec_type: JobType,
pub schedule: JobSchedule,
pub append_result: bool, //true: append, false: rewrite
//pub append_result: bool, //true: append, false: rewrite
pub payload: Option<Box<Vec<u8>>>,
}
@ -78,13 +78,14 @@ impl JobMeta {
state: JobState::Pending,
exec_type: JobType::Shell,
schedule: JobSchedule::Once,
append_result: true,
payload: Some(Box::new(shell_cmd.into_bytes()))
}
}
pub fn into_arc(self) -> JobMetaRef {
Arc::new(Mutex::new(self))
pub fn from_shell_arc(shell_cmd: String) -> JobMetaRef {
Arc::new(Mutex::new(
Self::from_shell(shell_cmd)
))
}
pub fn touch(&mut self) {
@ -158,6 +159,16 @@ impl JobOutput {
.stderr(parts.next().unwrap_or(vec![]))
})
}
pub fn into_appropriate(self) -> Vec<u8> {
if self.stdout.len() > 0 {
self.stdout
} else if self.stderr.len() > 0 {
self.stderr
} else {
format_err("No data").as_bytes().to_vec()
}
}
}
#[derive(Serialize, Deserialize, Clone, Debug)]
@ -319,7 +330,7 @@ mod tests {
#[tokio::test]
async fn test_is_really_async() {
let secs_to_sleep = 1;
let job = JobMeta::from_shell(format!("sleep {}", secs_to_sleep)).into_arc();
let job = JobMeta::from_shell_arc(format!("sleep {}", secs_to_sleep));
let sleep_jobs = vec![job.clone(), job.clone(), job.clone()];
let now = SystemTime::now();
send_jobs_to_executor(sleep_jobs).await;
@ -328,7 +339,7 @@ mod tests {
#[tokio::test]
async fn test_shell_job() -> UResult<()> {
let job = JobMeta::from_shell("whoami".into()).into_arc();
let job = JobMeta::from_shell_arc("whoami".into());
let job_result = exec_job(job.clone()).await.unwrap();
assert_eq!(
vec_to_string(&job_result.data.unwrap()?.stdout).trim(),
@ -339,7 +350,7 @@ mod tests {
#[tokio::test]
async fn test_failing_shell_job() -> UResult<()> {
let job = JobMeta::from_shell("lol_kek_puk".into()).into_arc();
let job = JobMeta::from_shell_arc("lol_kek_puk".into());
let job_result = exec_job(job.clone()).await.unwrap();
assert!(job_result.data.unwrap().is_err());
assert_eq!(job_result.retcode, None);

@ -6,7 +6,8 @@ use serde::{
use std::{
borrow::Cow
};
use crate::UID;
use crate::{UID, Uid};
pub trait ToMsg
where Self: Clone {
@ -19,16 +20,17 @@ where Self: Clone {
#[derive(Serialize, Deserialize, Debug)]
pub struct Message<'cow, I>
where I: Clone {
pub id: Uuid,
pub id: Uid,
pub item: Cow<'cow, I>
}
impl<'cow, I> Message<'cow, I>
where I: Clone {
where I: Clone
{
pub fn new<C>(item: C) -> Self
where C: Into<Cow<'cow, I>> {
Self {
id: *UID,
id: UID.clone().to_string(),
item: item.into()
}
}
@ -42,6 +44,36 @@ where I: Clone {
pub struct RawMsg(pub String);
// because can't impl From<ItemWrap<...>> for Cow
#[derive(Serialize, Deserialize, Debug, Clone)]
struct ItemWrap<T>(T);
impl<T> ItemWrap<T> {
pub fn into_inner(self) -> T {
self.0
}
}
impl<T> From<T> for ItemWrap<T> {
fn from(t: T) -> Self {
ItemWrap(t)
}
}
impl<T: Clone> ToMsg for ItemWrap<T> {}
impl<'cow, T: Clone> From<ItemWrap<T>> for Cow<'cow, ItemWrap<T>> {
fn from(obj: ItemWrap<T>) -> Cow<'cow, ItemWrap<T>> {
Cow::Owned(obj)
}
}
impl<'cow, T: Clone> From<&'cow ItemWrap<T>> for Cow<'cow, ItemWrap<T>> {
fn from(obj: &'cow ItemWrap<T>) -> Cow<'cow, ItemWrap<T>> {
Cow::Borrowed(obj)
}
}
/*
#[cfg(test)]
mod tests {

@ -1,12 +1,11 @@
pub mod jobs;
pub mod messaging;
pub mod datatypes;
pub mod agent;
pub use {
crate::client::client::*,
messaging::*,
jobs::*,
datatypes::*
agent::*
};
use std::{
@ -34,4 +33,4 @@ macro_rules! to_message {
}
}
to_message!(ClientInfo, RawMsg, JobMeta, JobResult);
to_message!(Agent, RawMsg, JobMeta, JobResult);

@ -6,7 +6,7 @@ use serde::{
Deserialize
};
pub type BoxError = Box<(dyn StdError + Send + Sync + 'static)>;
//pub type BoxError = Box<(dyn StdError + Send + Sync + 'static)>;
pub type UResult<T> = std::result::Result<T, UError>;

@ -1,9 +1,9 @@
pub mod executor;
pub mod config;
pub mod utils;
pub mod client;
pub mod errors;
pub mod contracts;
pub mod network;
pub use {
utils::*,

@ -75,7 +75,7 @@ macro_rules! build_url_by_method {
};
}
// param and result must impl ToMsg
// param_type and result must impl ToMsg
#[macro_export]
macro_rules! build_handler {
(
@ -84,7 +84,8 @@ macro_rules! build_handler {
$($param_name:literal:)?
$($param_type:ty)?
$(; $url_param:ty)?
) -> $result:ty ) => {
) -> $result:ty
) => {
impl ClientHandler {
pub async fn $path(
&self $(, param: &$param_type)? $(, url_param: &$url_param)?
@ -151,18 +152,19 @@ impl ClientHandler {
// method basic_path(json/query param; additional_url_param) -> return value
// A - admin only
// client listing (A)
build_handler!(GET ls() -> IterWrap<Vec<ClientInfo>>);
// build_handler!(GET ls() -> Vec<QAgent>);
// get jobs for client himself (A: id=client_id)
build_handler!(GET get_jobs() -> IterWrap<JobMetaStorage>);
//build_handler!(GET get_jobs() -> JobMetaStorage);
// add client to server's db
build_handler!(POST init(ClientInfo) -> RawMsg);
build_handler!(POST init(Agent) -> RawMsg);
// ???
build_handler!(POST del() -> ());
/*build_handler!(POST del() -> ());
// set jobs for client (A)
// POST /set_jobs/Uuid json: JobMetaStorage
build_handler!(POST set_jobs(IterWrap<JobMetaStorage>; Uuid) -> ());
build_handler!(POST set_jobs(JobMetaStorage; Uuid) -> ());
// get results (A)
// GET /get_job_results?job_id=Uuid
build_handler!(GET get_job_results("job_id":Uuid) -> IterWrap<Vec<JobResult>>);
build_handler!(GET get_job_results("job_id":Uuid) -> Vec<JobResult>);
// report job result
build_handler!(POST report(IterWrap<Vec<JobResult>>) -> ());
build_handler!(POST report(Vec<JobResult>) -> ());
*/

@ -50,7 +50,6 @@ pub fn vec_to_string(v: &Vec<u8>) -> String {
String::from_utf8_lossy(v).to_string()
}
/*
pub fn generate_auth_token() -> String {
}*/
pub fn format_err(s: &str) -> String {
format!("Error: {}", s)
}
Loading…
Cancel
Save