did some shit to migrate

started full refactoring
because this is big shit
it's easier to start from the very beginning
4-update-check
plazmoid 4 years ago
parent b0ca782fcb
commit c94912252a
  1. 2
      bin/u_agent/src/main.rs
  2. 2
      bin/u_panel/src/main.rs
  3. 1
      bin/u_server/src/handlers.rs
  4. 7
      bin/u_server/src/main.rs
  5. 3
      lib/u_lib/src/api.rs
  6. 58
      lib/u_lib/src/contracts/agent.rs
  7. 36
      lib/u_lib/src/contracts/mod.rs
  8. 142
      lib/u_lib/src/executor.rs
  9. 2
      lib/u_lib/src/lib.rs
  10. 0
      lib/u_lib/src/messaging.rs
  11. 60
      lib/u_lib/src/models/agent.rs
  12. 71
      lib/u_lib/src/models/jobs.rs
  13. 35
      lib/u_lib/src/models/mod.rs
  14. 73
      lib/u_lib/src/models/schema.rs
  15. 5
      migrations/2020-10-24-111622_create_all/down.sql
  16. 19
      migrations/2020-10-24-111622_create_all/up.sql

@ -13,7 +13,7 @@ use {
std::env,
u_lib::{
api::ClientHandler,
contracts::*,
models::*,
send_jobs_to_executor,
},
};

@ -18,7 +18,7 @@ async fn main() -> Result<(), &'static str> {
"ls" => {
let result = cli_handler.ls().await;
for cli in result.iter() {
println!("{:#?}", cli)
println!("{}", cli.0)
}
},
_ => return Err("Unknown method")

@ -1,5 +1,4 @@
use u_lib::{
contracts::*,
models::*
};
use warp::{

@ -15,7 +15,6 @@ use env_logger;
use u_lib::{
MASTER_PORT,
contracts::*,
api::Paths,
models::*
};
@ -52,6 +51,11 @@ async fn main() {
.and(warp::path(Paths::ls))
.and(db.clone())
.and_then(handlers::get_agents);
let upload_job = warp::post()
.and(warp::path(Paths::upload_job))
.and(db.clone())
.and_then(handlers::upload_job);
/*
let get_jobs = warp::get()
.and(warp::path(Paths::get_jobs))
@ -86,6 +90,7 @@ async fn main() {
let auth_zone = auth_token
.and(get_agents
.or(upload_job)
// .or(set_jobs)
// .or(get_job_results)
)

@ -3,7 +3,6 @@
use crate::{
MASTER_SERVER,
MASTER_PORT,
contracts::*,
models::*,
UResult,
UError
@ -158,6 +157,8 @@ build_handler!(GET ls() -> ItemWrap<Vec<Agent>>);
build_handler!(GET get_jobs() -> ItemWrap<Vec<JobMeta>>);
// add client to server's db
build_handler!(POST init(IAgent) -> RawMsg);
// create and upload job (A)
//build_handler!(POST upload_jobs)
// ???
/*build_handler!(POST del() -> ());
// set jobs for client (A)

@ -1,58 +0,0 @@
use std::{
collections::HashMap,
time::SystemTime
};
use crate::{
contracts::*,
UID,
exec_job,
utils::vec_to_string,
models::*
};
use guess_host_triple::guess_host_triple;
pub async fn gather() -> IAgent {
async fn run_cmd_fast(cmd: String) -> String {
let job = exec_job(
JobMeta::from_shell_arc(cmd)
).await;
let job_result = match job.unwrap().data.unwrap() {
Ok(output) => output.multiline(),
Err(e) => e.to_string()
};
JobOutput::from_multiline(&job_result)
.map(|o| vec_to_string(&o.into_appropriate()))
.unwrap_or(job_result)
}
#[cfg(unix)]
IAgent {
alias: None,
id: UID.clone(),
hostname: run_cmd_fast("hostname".to_string()).await,
is_root: &run_cmd_fast("id -u".to_string()).await == "0",
is_root_allowed: false, //TODO
platform: guess_host_triple().unwrap_or("Error").to_string(),
status: None, //TODO
token: None, //TODO
username: run_cmd_fast("id -un".to_string()).await,
}
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn test_gather() {
let cli_info = gather().await;
assert_eq!(
&cli_info.username,
"plazmoid"
)
}
}

@ -1,36 +0,0 @@
pub mod jobs;
pub mod messaging;
pub mod agent;
pub use {
messaging::*,
jobs::*,
};
use std::{
borrow::Cow
};
use crate::models::*;
macro_rules! to_message {
($($type:ty),+) => { $(
impl ToMsg for $type {}
impl<'cow> From<$type> for Cow<'cow, $type> {
#[inline]
fn from(obj: $type) -> Cow<'cow, $type> {
Cow::Owned(obj)
}
}
impl<'cow> From<&'cow $type> for Cow<'cow, $type> {
#[inline]
fn from(obj: &'cow $type) -> Cow<'cow, $type> {
Cow::Borrowed(obj)
}
} )+
}
}
to_message!(IAgent, Agent, RawMsg, JobMeta, JobResult);

@ -2,142 +2,52 @@
// job runner (thread)
// every job runs in other thread/process
/*
use cron::Schedule as CronSchedule;
enum Schedule {
Persistent, // run forever, restart if stops (set max_retries)
Cron(CronSchedule),
Once
}
*/
use crate::{
contracts::*,
models::*,
UResult,
};
use std::{
sync::{Mutex, MutexGuard, Arc},
};
//use tokio::task::JoinHandle;
use futures::future::{join_all, JoinAll};
use futures::Future;
type Executables = Vec<Job>;
struct AsyncExecutor {
new_jobs: Executables,
promises: Executables
}
impl AsyncExecutor {
pub fn new() -> Self {
Self {
new_jobs: vec![],
promises: vec![]
}
}
/*
pub fn process_jobs(&mut self) {
new_jobs
if job.state() == JobState::Pending {
tokio::spawn(job.run);
}
}
*/
pub async fn apply_job(&mut self, new_job: Job) -> UResult<JobResult> {
/*let id = new_job.id();
let mut job_pool = self.jobs.lock().unwrap();
job_pool.push(new_job);
id*/
tokio::spawn(async move {
new_job.run().await
}).await.unwrap()
}
pub async fn apply_jobs(&mut self, new_jobs: Vec<Job>) -> Vec<UResult<JobResult>> {
/*
let mut job_pool = self.jobs.lock().unwrap();
job_pool.extend(
new_jobs.into_iter()
.map(|job| (job.id(), job))
);*/
let futures = new_jobs.into_iter().map(|mut job| {
job.run()
}).collect::<Vec<_>>();
join_all(futures).await
use std::collections::HashMap;
use std::pin::Pin;
use std::thread::sleep;
use std::time::{Duration, Instant};
use tokio::process::Command;
}
/*
pub fn get_job_result(&self, id: &Uuid) -> Option<JobResult> {
let mut job_pool = self.jobs.lock().unwrap();
let (state, result) = match job_pool.get(id) {
Some(job) => (job.state(), job.get_result()),
None => return None
};
if state == JobState::Finished {
job_pool.remove(&id);
}
Some(result)
}
use futures::{lock::Mutex, prelude::*};
use lazy_static::lazy_static;
use tokio::{prelude::*, spawn, task::JoinHandle};
use uuid::Uuid;
pub fn get_all_results(&self) -> Vec<JobResult> {
let mut job_pool = self.jobs.lock().unwrap();
let to_remove = job_pool.values()
.filter(|job| job.finished())
.map(|job| job.id())
.collect::<Vec<Uuid>>();
let results = job_pool.values()
.map(|job| job.get_result())
.collect();
to_remove.into_iter().for_each(|id| {
job_pool.remove(&id);
});
results
}*/
}
pub type FutRes = String;
type BoxFut<O> = Pin<Box<dyn Future<Output = O> + Send + 'static>>;
lazy_static! {
static ref EXECUTOR: Mutex<AsyncExecutor> =
Mutex::new(AsyncExecutor::new());
}
/*
pub fn get_job_result(id: &Uuid, wait: bool) -> Option<JobResult> {
let executor: MutexGuard<AsyncExecutor> = EXECUTOR.lock().unwrap();
loop {
let result = executor.get_job_result(&id);
if wait {
if result.is_some() && result.as_ref().unwrap().state == JobState::Finished {
return result
}
thread::sleep(Duration::from_secs(1))
} else {
return result
}
}
static ref FUT_RESULTS: Mutex<HashMap<Uuid, JoinHandle<FutRes>>> = Mutex::new(HashMap::new());
}
pub fn get_all_results() -> Vec<JobResult> {
let executor: MutexGuard<AsyncExecutor> = EXECUTOR.lock().unwrap();
executor.get_all_results()
pub async fn apply_tasks(tasks: Vec<impl Future<Output=FutRes> + Send + 'static>) -> Vec<Uuid> {
let mut fids = Vec::<Uuid>::new();
for f in tasks.into_iter() {
let fid = Uuid::new_v4();
fids.push(fid);
let result = spawn(Box::pin(f));
FUT_RESULTS.lock().await.insert(fid, result);
}
fids
}
*/
// run jobs without awaiting (now is waiting)
pub async fn send_jobs_to_executor(
jobs: Vec<JobMetaRef>
) -> Vec<UResult<JobResult>> {
let mut executor: MutexGuard<AsyncExecutor> = EXECUTOR.lock().unwrap();
let executables = jobs.into_iter()
.map(|job_meta| Job::new(job_meta))
.collect();
executor.apply_jobs(executables).await
pub async fn apply_task(task: impl Future<Output=FutRes> + Send + 'static) -> Uuid {
apply_tasks(vec![Box::pin(task)]).await[0]
}
// run job and await result
pub async fn exec_job(job_meta: JobMetaRef) -> UResult<JobResult> {
let mut executor: MutexGuard<AsyncExecutor> = EXECUTOR.lock().unwrap();
let job = Job::new(job_meta);
executor.apply_job(job).await
pub async fn pop(fid: Uuid) -> Option<JoinHandle<FutRes>> {
FUT_RESULTS.lock().await.remove(&fid)
}

@ -2,9 +2,9 @@ pub mod executor;
pub mod config;
pub mod utils;
pub mod errors;
pub mod contracts;
pub mod api;
pub mod models;
pub mod messaging;
pub use {
utils::*,

@ -3,15 +3,23 @@ use serde::{
Deserialize
};
use std::time::SystemTime;
use std::collections::HashMap;
use diesel::{
Queryable,
Identifiable,
Insertable
};
use crate::models::schema::*;
use uuid::Uuid;
;
use crate::{
models::*
UID,
exec_job,
utils::vec_to_string,
models::schema::*
};
type Uid = String;
use guess_host_triple::guess_host_triple;
use uuid::Uuid;
//belongs_to
#[derive(Clone, Debug, Serialize, Deserialize, Identifiable, Queryable)]
@ -43,3 +51,49 @@ pub struct IAgent {
pub token: Option<String>,
pub username: String
}
pub async fn gather() -> IAgent {
async fn run_cmd_fast(cmd: String) -> String {
let job = exec_job(
JobMeta::from_shell_arc(cmd)
).await;
let job_result = match job.unwrap().data.unwrap() {
Ok(output) => output.multiline(),
Err(e) => e.to_string()
};
JobOutput::from_multiline(&job_result)
.map(|o| vec_to_string(&o.into_appropriate()))
.unwrap_or(job_result)
}
#[cfg(unix)]
IAgent {
alias: None,
id: UID.clone(),
hostname: run_cmd_fast("hostname".to_string()).await,
is_root: &run_cmd_fast("id -u".to_string()).await == "0",
is_root_allowed: false, //TODO
platform: guess_host_triple().unwrap_or("Error").to_string(),
status: None, //TODO
token: None, //TODO
username: run_cmd_fast("id -un".to_string()).await,
}
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn test_gather() {
let cli_info = gather().await;
assert_eq!(
&cli_info.username,
"plazmoid"
)
}
}

@ -9,18 +9,20 @@ use serde::{
Deserialize
};
use uuid::Uuid;
use guess_host_triple::guess_host_triple;
use tokio::process::Command;
use super::*;
use crate::{
models::*
UError,
UErrType,
UErrType::JobError,
JobErrType,
UResult,
utils::format_err
utils::format_err,
UID
};
pub type JobMetaRef = Arc<Mutex<JobMeta>>;
//pub type JobMetaRef = Arc<Mutex<JobMeta>>;
#[derive(Serialize, Deserialize, Clone, Debug)]
pub enum ManageAction {
@ -48,7 +50,7 @@ pub enum JobState {
#[derive(Serialize, Deserialize, Clone, Debug)]
pub enum JobType {
Manage(ManageAction),
Manage,
Shell,
Python,
Binary
@ -57,13 +59,10 @@ pub enum JobType {
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct JobMeta {
pub id: Uuid,
pub name: String,
pub created: SystemTime,
pub updated: SystemTime,
pub state: JobState,
pub alias: String,
pub exec_type: JobType,
pub schedule: JobSchedule,
//pub append_result: bool, //true: append, false: rewrite
pub platform: String,
pub payload: Option<Box<Vec<u8>>>,
}
@ -72,12 +71,10 @@ impl JobMeta {
let job_name = shell_cmd.split(" ").nth(0).unwrap();
Self {
id: Uuid::new_v4(),
name: job_name.to_string(),
created: SystemTime::now(),
updated: SystemTime::now(),
state: JobState::Pending,
alias: job_name.to_string(),
exec_type: JobType::Shell,
schedule: JobSchedule::Once,
platform: guess_host_triple().unwrap_or("").to_string(),
payload: Some(Box::new(shell_cmd.into_bytes()))
}
}
@ -87,10 +84,6 @@ impl JobMeta {
Self::from_shell(shell_cmd)
))
}
pub fn touch(&mut self) {
self.updated = SystemTime::now();
}
}
@ -173,36 +166,42 @@ impl JobOutput {
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct JobResult {
pub id: Uuid,
pub data: Option<Result<JobOutput, UError>>,
//pub id: i32,
pub agent_id: Uuid,
pub job_id: Uuid,
pub result: Option<Result<JobOutput, UError>>,
pub state: JobState,
pub retcode: Option<i32>,
pub timestamp: SystemTime,
}
pub struct Job {
result: JobResult,
meta: JobMetaRef,
}
impl Job {
pub fn new(job_meta: JobMetaRef) -> Self {
let id = job_meta.lock().unwrap().id.clone();
let state = job_meta.lock().unwrap().state.clone();
Self {
result: JobResult {
id,
impl JobResult {
pub fn from_meta(meta: &JobMeta) -> Self {
let job_id = meta.id.clone();
let state = meta.state.clone();
JobResult {
agent_id: *UID,
job_id,
state: if state == JobState::Queued {
JobState::Pending
} else {
state
},
data: None,
result: None,
retcode: None,
timestamp: SystemTime::now()
},
meta: job_meta,
}
}
}
pub struct Job {
result: JobResult
}
impl Job {
pub fn new(job_meta: JobMeta) -> Self {
Self {
result: JobResult::from_meta(&job_meta),
}
}
@ -311,7 +310,7 @@ impl Job {
self.state() == JobState::Finished
}
pub fn into_result(mut self) -> JobResult {
pub fn into_result(mut self) -> JobResult { //TODO: Cow
self.result.timestamp = SystemTime::now();
self.result
}

@ -1,5 +1,38 @@
mod agent;
pub mod schema;
pub mod jobs;
pub use crate::{
models::{
agent::*,
jobs::*,
},
messaging::*,
};
pub use agent::*;
use std::{
borrow::Cow
};
macro_rules! to_message {
($($type:ty),+) => { $(
impl ToMsg for $type {}
impl<'cow> From<$type> for Cow<'cow, $type> {
#[inline]
fn from(obj: $type) -> Cow<'cow, $type> {
Cow::Owned(obj)
}
}
impl<'cow> From<&'cow $type> for Cow<'cow, $type> {
#[inline]
fn from(obj: &'cow $type) -> Cow<'cow, $type> {
Cow::Borrowed(obj)
}
} )+
}
}
to_message!(IAgent, Agent, RawMsg, JobMeta, JobResult);

@ -1,73 +0,0 @@
table! {
agents (id) {
alias -> Nullable<Text>,
agent_id -> Text,
hostname -> Text,
id -> Integer,
is_root -> Bool,
is_root_allowed -> Bool,
last_active -> Timestamp,
platform -> Text,
regtime -> Timestamp,
status -> Nullable<Text>,
token -> Nullable<Text>,
username -> Text,
}
}
table! {
certificates (id) {
agent_id -> Integer,
id -> Integer,
is_revoked -> Bool,
}
}
table! {
ip_addrs (id) {
agent_id -> Integer,
check_ts -> Timestamp,
gateway -> Nullable<Text>,
id -> Integer,
iface -> Text,
ip_addr -> Text,
is_gray -> Bool,
netmask -> Text,
}
}
table! {
jobs (id) {
alias -> Nullable<Text>,
id -> Integer,
job_type -> Text,
exec_type -> Text,
platform -> Nullable<Text>,
data -> Binary,
}
}
table! {
results (id) {
agent_id -> Integer,
created -> Timestamp,
id -> Integer,
job_id -> Integer,
result -> Nullable<Binary>,
status -> Nullable<Text>,
ts -> Timestamp,
}
}
joinable!(certificates -> agents (agent_id));
joinable!(ip_addrs -> agents (agent_id));
joinable!(results -> agents (agent_id));
joinable!(results -> jobs (job_id));
allow_tables_to_appear_in_same_query!(
agents,
certificates,
ip_addrs,
jobs,
results,
);

@ -1,4 +1 @@
DROP TABLE agents;
DROP TABLE ip_addrs;
DROP TABLE jobs;
DROP TABLE results;
DROP DATABASE u_db;

@ -1,4 +1,6 @@
CREATE EXTENSION IF NOT EXISTS "uuid-ossp";
CREATE TYPE JOB_TYPE AS ENUM ('shell', 'manage', 'binary', 'python');
CREATE TYPE TASK_STATUS AS ENUM ('queued', 'running', 'finished');
CREATE TABLE IF NOT EXISTS agents (
alias TEXT
@ -21,7 +23,7 @@ CREATE TABLE IF NOT EXISTS ip_addrs (
agent_id UUID NOT NULL
, check_ts TIMESTAMP NOT NULL
, gateway TEXT
, id SERIAL
, id UUID NOT NULL DEFAULT uuid_generate_v4()
, iface TEXT NOT NULL
, ip_addr TEXT NOT NULL
, is_gray BOOLEAN NOT NULL DEFAULT true
@ -32,13 +34,11 @@ CREATE TABLE IF NOT EXISTS ip_addrs (
CREATE TABLE IF NOT EXISTS jobs (
alias TEXT
, id SERIAL
, id UUID NOT NULL DEFAULT uuid_generate_v4()
-- Shell, Binary (with program download),
-- Python (with program and python download if not exist), Management
, job_type TEXT CHECK(job_type IN ('S','B','P','M')) NOT NULL DEFAULT 'S'
-- Executable type: ALL - no matter, W - windows, L = linux
, exec_type TEXT CHECK(exec_type IN ('ALL', 'W', 'L')) NOT NULL DEFAULT 'L'
, platform TEXT CHECK(platform IN ('x86', 'x64', 'aarch32', 'aarch64'))
, exec_type JOB_TYPE NOT NULL DEFAULT 'shell'
, platform TEXT NOT NULL
, path TEXT NOT NULL
, PRIMARY KEY(id)
);
@ -46,11 +46,12 @@ CREATE TABLE IF NOT EXISTS jobs (
CREATE TABLE IF NOT EXISTS results (
agent_id UUID NOT NULL
, created TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
, id SERIAL
, id UUID NOT NULL DEFAULT uuid_generate_v4()
, job_id INTEGER NOT NULL
, result TEXT
, retcode INTEGER
-- Queued, Pending, Running, Finished
, status TEXT CHECK(status IN ('Q', 'P', 'R', 'F'))
, state TASK_STATUS NOT NULL DEFAULT 'queued'
, ts TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
, FOREIGN KEY(agent_id) REFERENCES agents(id)
, FOREIGN KEY(job_id) REFERENCES jobs(id)
@ -59,7 +60,7 @@ CREATE TABLE IF NOT EXISTS results (
CREATE TABLE IF NOT EXISTS certificates (
agent_id UUID NOT NULL
, id SERIAL
, id UUID NOT NULL DEFAULT uuid_generate_v4()
, is_revoked BOOLEAN NOT NULL DEFAULT FALSE
, PRIMARY KEY(id)
, FOREIGN KEY(agent_id) REFERENCES agents(id)

Loading…
Cancel
Save