exequteor v2

4-update-check
plazmoid 4 years ago
parent 28c3e96966
commit aedd8f774c
  1. 1
      diesel.toml
  2. 2
      lib/u_lib/Cargo.toml
  3. 56
      lib/u_lib/src/executor.rs
  4. 22
      lib/u_lib/src/models/agent.rs
  5. 182
      lib/u_lib/src/models/jobs.rs
  6. 9
      migrations/2020-10-24-111622_create_all/down.sql
  7. 15
      migrations/2020-10-24-111622_create_all/up.sql

@ -3,3 +3,4 @@
[print_schema]
file = "lib/u_lib/src/models/schema.rs"
import_types = ["diesel::sql_types::*", "crate::*"]

@ -17,7 +17,7 @@ reqwest = { version = "0.10.7", features = ["json"] }
futures = "0.3.5"
guess_host_triple = "0.1.2"
thiserror = "*"
async-trait = "*"
diesel-derive-enum = { version = "1", features = ["postgres"] }
[dependencies.diesel]
version = "1.4.5"

@ -8,73 +8,91 @@ use std::collections::HashMap;
use std::pin::Pin;
use std::thread::sleep;
use std::time::{Duration, Instant};
use std::slice::Iter;
use std::task::Poll;
use tokio::process::Command;
use futures::{lock::Mutex, prelude::*};
use futures::{lock::Mutex, prelude::*, poll};
use lazy_static::lazy_static;
use tokio::{prelude::*, spawn, task::JoinHandle};
use uuid::Uuid;
pub type FutRes = UResult<JobResult>;
type BoxFut<O> = Pin<Box<dyn Future<Output = O> + Send + 'static>>;
lazy_static! {
static ref FUT_RESULTS: Mutex<HashMap<Uuid, JoinHandle<FutRes>>> = Mutex::new(HashMap::new());
}
//TODO: waiter struct
pub async fn append_task(task: impl Future<Output=FutRes> + Send + 'static) -> Uuid {
let fid = Uuid::new_v4();
let result = spawn(Box::pin(task));
FUT_RESULTS.lock().await.insert(fid, result);
fid
}
pub async fn append_tasks(tasks: Vec<impl Future<Output=FutRes> + Send + 'static>) -> Vec<Uuid> {
let mut fids = Vec::<Uuid>::new();
for f in tasks.into_iter() {
let fid = Uuid::new_v4();
let fid = append_task(f).await;
fids.push(fid);
let result = spawn(Box::pin(f));
FUT_RESULTS.lock().await.insert(fid, result);
}
fids
}
pub async fn append_task(task: impl Future<Output=FutRes> + Send + 'static) -> Uuid {
append_tasks(vec![Box::pin(task)]).await[0]
}
pub async fn pop_task(fid: Uuid) -> JoinHandle<FutRes> {
FUT_RESULTS.lock().await.remove(&fid).expect(&UError::NoTask(fid).to_string())
}
async fn task_present(fid: Uuid) -> bool {
pub async fn task_present(fid: Uuid) -> bool {
FUT_RESULTS.lock().await.get(&fid).is_some()
}
pub async fn pop_task_if_completed(fid: Uuid) -> Option<FutRes>{
let task = FUT_RESULTS
let mut tasks = FUT_RESULTS
.lock()
.await
.await;
let task = tasks
.get_mut(&fid)
.expect(&UError::NoTask(fid).to_string());
let status = match task.poll() {
let status = match poll!(task) {
Poll::Pending => None,
Poll::Ready(r) => Some(r.unwrap())
};
if status.is_some() {
pop_task(fid);
pop_task(fid).await;
}
status
}
pub async fn pop_completed(fids: Option<Vec<Uuid>>) -> Vec<Option<FutRes>> {
match fids {
let fids = match fids {
Some(v) => v,
None => FUT_RESULTS.lock()
.await
.keys()
.map(|k| *k)
.collect::<Vec<Uuid>>()
}.into_iter().map(pop_task_if_completed).collect()
};
let mut completed: Vec<Option<FutRes>> = vec![];
for fid in fids {
completed.push(pop_task_if_completed(fid).await)
}
completed
}
pub async fn wait_for_task(fid: Uuid) -> FutRes {
pop_task(fid).await.await.unwrap()
}
pub async fn wait_for_tasks(fids: Vec<Uuid>) -> Vec<FutRes> {
let mut results = vec![];
for fid in fids {
results.push(wait_for_task(fid).await);
}
results
}
pub async fn run_until_complete(task: impl Future<Output=FutRes> + Send + 'static) -> FutRes {
let task_fid = append_task(task).await;
pop_task(task_fid).await.await.unwrap()
let fid = append_task(task).await;
wait_for_task(fid).await
}

@ -3,7 +3,6 @@ use serde::{
Deserialize
};
use std::time::SystemTime;
use std::collections::HashMap;
use diesel::{
Queryable,
Identifiable,
@ -16,7 +15,7 @@ use crate::{
utils::vec_to_string,
models::schema::*,
executor::*,
jobs::create_job
jobs::exec_job
};
use guess_host_triple::guess_host_triple;
@ -55,29 +54,24 @@ pub struct IAgent {
pub async fn gather() -> IAgent {
async fn run_cmd_fast(cmd: String) -> String {
let job_result = exec_job(JobMeta::from_shell(cmd)).await;
let job_result = match job_result.unwrap().result.unwrap() {
Ok(output) => output.multiline(),
Err(e) => e.to_string()
};
JobOutput::from_multiline(&job_result)
async fn run_cmd_fast<S: Into<String>>(cmd: S) -> String {
let job_result = exec_job(JobMeta::from_shell(cmd)).await.unwrap().result.unwrap();
JobOutput::from_raw(&job_result)
.map(|o| vec_to_string(&o.into_appropriate()))
.unwrap_or(job_result)
.unwrap_or(String::from_utf8_lossy(&job_result).to_string())
}
#[cfg(unix)]
IAgent {
alias: None,
id: UID.clone(),
hostname: run_cmd_fast("hostname".to_string()).await,
is_root: &run_cmd_fast("id -u".to_string()).await == "0",
hostname: run_cmd_fast("hostname").await,
is_root: &run_cmd_fast("id -u").await == "0",
is_root_allowed: false, //TODO
platform: guess_host_triple().unwrap_or("Error").to_string(),
status: None, //TODO
token: None, //TODO
username: run_cmd_fast("id -un".to_string()).await,
username: run_cmd_fast("id -un").await,
}
}

@ -10,10 +10,13 @@ use serde::{
use uuid::Uuid;
use guess_host_triple::guess_host_triple;
use tokio::process::Command;
use crate::{models::schema::*, UError, UResult, UID, run_until_complete};
use async_trait::async_trait;
//pub type JobMetaRef = Arc<Mutex<JobMeta>>;
use crate::{models::schema::*, UError, UResult, UID, run_until_complete, append_tasks, append_task, wait_for_tasks};
use diesel_derive_enum::DbEnum;
use diesel::{
Queryable,
Identifiable,
Insertable
};
#[derive(Serialize, Deserialize, Clone, Debug)]
@ -31,16 +34,16 @@ pub enum JobSchedule {
//TODO: Scheduled
}
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
pub enum JobState {
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, DbEnum)]
pub enum Jobstate {
Queued, // server created a job, but client didn't get it yet
Pending, // client got a job, but not running yet
//Pending, // client got a job, but not running yet
Running, // client is currently running a job
// Rerunning, // if job is cycled
Finished,
}
#[derive(Serialize, Deserialize, Clone, Debug)]
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, DbEnum)]
#[PgType = "JobType"]
pub enum JobType {
Manage,
Shell,
@ -49,7 +52,7 @@ pub enum JobType {
Dummy
}
#[derive(Serialize, Deserialize, Clone, Debug)]
#[derive(Clone, Debug)]
pub struct JobOutput {
pub stdout: Vec<u8>,
pub stderr: Vec<u8>,
@ -85,23 +88,24 @@ impl JobOutput {
self
}
pub fn multiline(&self) -> String {
let mut result = String::new();
pub fn multiline(&self) -> Vec<u8> {
let mut result: Vec<u8> = vec![];
if self.stdout.len() > 0 {
result += &format!("{stdout_head}{stdout}\n",
stdout_head = JobOutput::create_delim(JobOutput::STDOUT),
stdout = String::from_utf8_lossy(&self.stdout))
result.extend(JobOutput::create_delim(JobOutput::STDOUT).into_bytes());
result.extend(&self.stdout);
result.push(b'\n');
}
if self.stderr.len() > 0 {
result += &format!("{stderr_head}{stderr}\n",
stderr_head = JobOutput::create_delim(JobOutput::STDERR),
stderr = String::from_utf8_lossy(&self.stderr))
result.extend(JobOutput::create_delim(JobOutput::STDERR).into_bytes());
result.extend(&self.stderr);
result.push(b'\n');
}
result
}
pub fn from_multiline(raw: &String) -> Option<Self> {
pub fn from_raw(raw: &[u8]) -> Option<Self> {
let raw = String::from_utf8_lossy(raw);
let err_header = JobOutput::create_delim(JobOutput::STDERR);
raw.strip_prefix(&JobOutput::create_delim(JobOutput::STDOUT))
.map(|s: &str| {
@ -121,29 +125,31 @@ impl JobOutput {
} else if self.stderr.len() > 0 {
self.stderr
} else {
UError::Raw("No data").to_string().as_bytes().to_vec()
UError::Raw("No data").to_string().into_bytes()
}
}
}
#[derive(Serialize, Deserialize, Clone, Debug)]
#[derive(Serialize, Deserialize, Clone, Debug, Queryable, Identifiable)]
#[table_name = "jobs"]
pub struct JobMeta {
pub id: Uuid,
pub alias: String,
pub id: Uuid,
pub exec_type: JobType,
pub schedule: JobSchedule,
//pub schedule: JobSchedule,
pub platform: String,
pub payload: Option<Box<Vec<u8>>>,
}
impl JobMeta {
pub fn from_shell(shell_cmd: String) -> Self {
pub fn from_shell<S: Into<String>>(shell_cmd: S) -> Self {
let shell_cmd = shell_cmd.into();
let job_name = shell_cmd.split(" ").nth(0).unwrap();
Self {
id: Uuid::new_v4(),
alias: job_name.to_string(),
exec_type: JobType::Shell,
schedule: JobSchedule::Once,
//schedule: JobSchedule::Once,
platform: guess_host_triple().unwrap_or("unknown").to_string(),
payload: Some(Box::new(shell_cmd.into_bytes()))
}
@ -151,27 +157,23 @@ impl JobMeta {
}
#[derive(Serialize, Deserialize, Clone, Debug)]
#[derive(Serialize, Deserialize, Clone, Debug, Queryable, Insertable)]
#[table_name = "results"]
pub struct JobResult {
pub agent_id: Uuid,
pub job_id: Uuid,
pub result: Option<Result<JobOutput, UError>>,
pub state: JobState,
pub result: Option<Vec<u8>>,
// pub state: Jobstate,
pub retcode: Option<i32>,
pub timestamp: SystemTime,
pub ts: SystemTime,
}
impl JobResult {
pub fn from_meta(meta: &JobMeta) -> Self {
let job_id = meta.id.clone();
JobResult {
agent_id: *UID,
job_id,
state: JobState::Running,
result: None,
retcode: None,
timestamp: SystemTime::now()
}
pub fn from_meta(job_id: Uuid) -> Self {
let mut inst = JobResult::default();
inst.agent_id = *UID;
inst.job_id = job_id;
inst
}
}
@ -181,9 +183,9 @@ impl Default for JobResult {
agent_id: Uuid::nil(),
job_id: Uuid::nil(),
result: None,
state: JobState::Running,
// state: Jobstate::Running,
retcode: None,
timestamp: SystemTime::now()
ts: SystemTime::now()
}
}
}
@ -205,7 +207,7 @@ impl Job {
Ok(Self {
exec_type: job_meta.exec_type,
payload: job_meta.payload,
result: JobResult::from_meta(&job_meta)
result: JobResult::from_meta(job_meta.id.clone())
})
},
_ => todo!()
@ -213,7 +215,7 @@ impl Job {
}
async fn run(mut self) -> UResult<JobResult> {
match job_meta.exec_type {
match self.exec_type {
JobType::Shell => {
let str_payload = match &self.payload {
Some(box_payload) => {
@ -235,23 +237,24 @@ impl Job {
let (data, retcode) = match cmd_result {
Ok(output) => {
(
Some(Ok(JobOutput::new()
Some(JobOutput::new()
.stdout(output.stdout.to_vec())
.stderr(output.stderr.to_vec()))
.stderr(output.stderr.to_vec())
.multiline()
),
output.status.code()
)
}
Err(e) => {
(
Some(Err(UError::JobError(e.to_string()))),
Some(UError::JobError(e.to_string()).to_string().into_bytes()),
None
)
}
};
self.result.result = data;
self.result.retcode = retcode;
self.result.timestamp = SystemTime::now();
self.result.ts = SystemTime::now();
},
_ => todo!()
}
@ -259,47 +262,92 @@ impl Job {
}
}
pub async fn exec_jobs(jobs: Vec<JobMeta>) -> Vec<UResult<JobResult>> {
let fids = exec_jobs_nowait(jobs).await.unwrap();
wait_for_tasks(fids).await
}
pub async fn exec_jobs_nowait(jobs: Vec<JobMeta>) -> UResult<Vec<Uuid>> {
let prepared_jobs = jobs.into_iter().map(|job| {
let j = Job::build(job).unwrap();
j.run()
}).collect();
let fids = append_tasks(prepared_jobs).await;
Ok(fids)
}
pub async fn exec_job(job_meta: JobMeta) -> UResult<JobResult> {
let job = create_job(job_meta);
let job = Job::build(job_meta)?;
run_until_complete(job.run()).await
}
pub async fn exec_job_nowait(job_meta: JobMeta) -> UResult<Uuid> {
let job = Job::build(job_meta)?;
let fid = append_task(job.run()).await;
Ok(fid)
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{
send_jobs_to_executor,
utils::vec_to_string
};
use crate::{exec_job, utils::vec_to_string, wait_for_task, append_task};
#[tokio::test]
async fn test_is_really_async() {
let secs_to_sleep = 1;
let job = JobMeta::from_shell_arc(format!("sleep {}", secs_to_sleep));
const SLEEP_SECS: u64 = 1;
let job = JobMeta::from_shell(format!("sleep {}", SLEEP_SECS));
let sleep_jobs = vec![job.clone(), job.clone(), job.clone()];
let now = SystemTime::now();
send_jobs_to_executor(sleep_jobs).await;
assert_eq!(now.elapsed().unwrap().as_secs(), secs_to_sleep)
let fids = exec_jobs_nowait(sleep_jobs).await.unwrap();
for f in fids.into_iter() {
wait_for_task(f).await;
}
assert_eq!(now.elapsed().unwrap().as_secs(), SLEEP_SECS)
}
#[tokio::test]
async fn test_shell_job() -> UResult<()> {
let job = JobMeta::from_shell_arc("whoami".into());
let job_result = exec_job(job.clone()).await.unwrap();
let job = JobMeta::from_shell("whoami");
let job_result = exec_job(job).await.unwrap();
let stdout = JobOutput::from_raw(&job_result.result.unwrap()).unwrap().stdout;
assert_eq!(
vec_to_string(&job_result.data.unwrap()?.stdout).trim(),
vec_to_string(&stdout).trim(),
"plazmoid"
);
Ok(())
}
#[tokio::test]
async fn test_complex_shell_jobs_load() -> UResult<()> {
const SLEEP_SECS: u64 = 1;
let now = SystemTime::now();
let longest_job = JobMeta::from_shell(format!("sleep {}", SLEEP_SECS));
let longest_job_id = exec_job_nowait(longest_job).await.unwrap();
let ls = exec_job(JobMeta::from_shell("ls")).await.unwrap();
assert_eq!(ls.retcode.unwrap(), 0);
let result = JobOutput::from_raw(&ls.result.unwrap()).unwrap();
let folders = String::from_utf8_lossy(
&result.stdout
);
let ls_subfolders = exec_jobs(
folders.lines().map(|f| JobMeta::from_shell(format!("ls {}", f))).collect()
).await;
for result in ls_subfolders {
assert_eq!(result.unwrap().retcode.unwrap(), 0);
}
wait_for_task(longest_job_id).await;
assert_eq!(now.elapsed().unwrap().as_secs(), SLEEP_SECS);
Ok(())
}
#[tokio::test]
async fn test_failing_shell_job() -> UResult<()> {
let job = JobMeta::from_shell_arc("lol_kek_puk".into());
let job_result = exec_job(job.clone()).await.unwrap();
assert!(job_result.data.unwrap().is_err());
assert_eq!(job_result.retcode, None);
let job = JobMeta::from_shell("lol_kek_puk");
let job_result = exec_job(job).await.unwrap();
let output = JobOutput::from_raw(&job_result.result.unwrap());
assert!(output.is_none());
assert!(job_result.retcode.is_none());
Ok(())
}
@ -315,7 +363,7 @@ mod tests {
lol\n\
*** STDERR ***\n\
kek\n"
)
).into_bytes()
)
}
@ -328,15 +376,15 @@ mod tests {
String::from(
"*** STDERR ***\n\
kek\n"
)
).into_bytes()
)
}
#[test]
fn test_from_multiline() {
let txt = "*** STDOUT ***\n\
puk\n".to_string();
let output = JobOutput::from_multiline(&txt).unwrap();
puk\n".as_bytes();
let output = JobOutput::from_raw(txt).unwrap();
assert_eq!(
output.stdout,
b"puk".to_vec()

@ -1 +1,8 @@
DROP DATABASE u_db;
DROP TABLE ip_addrs;
DROP TABLE results;
DROP TABLE certificates;
DROP TABLE jobs;
DROP TABLE agents;
DROP TYPE IF EXISTS JobState;
DROP TYPE IF EXISTS JobType;

@ -1,6 +1,6 @@
CREATE EXTENSION IF NOT EXISTS "uuid-ossp";
CREATE TYPE JOB_TYPE AS ENUM ('shell', 'manage', 'binary', 'python');
CREATE TYPE TASK_STATUS AS ENUM ('queued', 'running', 'finished');
CREATE TYPE JobType AS ENUM ('shell', 'manage', 'binary', 'python');
CREATE TYPE JobState AS ENUM ('queued', 'running', 'finished');
CREATE TABLE IF NOT EXISTS agents (
alias TEXT
@ -37,9 +37,9 @@ CREATE TABLE IF NOT EXISTS jobs (
, id UUID NOT NULL DEFAULT uuid_generate_v4()
-- Shell, Binary (with program download),
-- Python (with program and python download if not exist), Management
, exec_type JOB_TYPE NOT NULL DEFAULT 'shell'
, exec_type JobType NOT NULL DEFAULT 'shell'
, platform TEXT NOT NULL
, path TEXT NOT NULL
, payload BYTEA
, PRIMARY KEY(id)
);
@ -47,11 +47,10 @@ CREATE TABLE IF NOT EXISTS results (
agent_id UUID NOT NULL
, created TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
, id UUID NOT NULL DEFAULT uuid_generate_v4()
, job_id INTEGER NOT NULL
, result TEXT
, job_id UUID NOT NULL
, result BYTEA
, retcode INTEGER
-- Queued, Pending, Running, Finished
, state TASK_STATUS NOT NULL DEFAULT 'queued'
-- , state JobState NOT NULL DEFAULT 'queued'
, ts TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
, FOREIGN KEY(agent_id) REFERENCES agents(id)
, FOREIGN KEY(job_id) REFERENCES jobs(id)

Loading…
Cancel
Save