4-update-check
plazmoid 4 years ago
parent 2864511f7f
commit 47a0ae4b2b
  1. 2
      bin/u_server/src/handlers.rs
  2. 4
      bin/u_server/src/main.rs
  3. 0
      cargo_musl.sh
  4. 2
      lib/u_lib/Cargo.toml
  5. 24
      lib/u_lib/src/client/client.rs
  6. 13
      lib/u_lib/src/client/network.rs
  7. 12
      lib/u_lib/src/contracts/datatypes.rs
  8. 200
      lib/u_lib/src/contracts/jobs.rs
  9. 9
      lib/u_lib/src/contracts/messaging.rs
  10. 2
      lib/u_lib/src/contracts/mod.rs
  11. 32
      lib/u_lib/src/errors.rs
  12. 136
      lib/u_lib/src/executor.rs

@ -42,7 +42,7 @@ pub async fn get_jobs(
pub async fn set_jobs(
uid: Option<Uuid>,
msg: Message<'_, CollectionWrapper<JobStorage>>,
msg: Message<'_, CollectionWrapper<JobMetaStorage>>,
db: Storage) -> Result<impl Reply, Rejection>
{
let mut clients = db.lock().await;

@ -46,14 +46,14 @@ async fn main() {
let set_jobs = warp::post()
.and(warp::path(Paths::set_jobs))
.and(warp::path::param::<Uuid>().map(Some))
.and(get_content::<CollectionWrapper<JobStorage>>())
.and(get_content::<CollectionWrapper<JobMetaStorage>>())
.and(db.clone())
.and_then(handlers::set_jobs);
let update_own_jobs = warp::post()
.and(warp::path(Paths::set_jobs))
.and(warp::path::param::<Uuid>().map(Some))
.and(get_content::<CollectionWrapper<JobStorage>>())
.and(get_content::<CollectionWrapper<JobMetaStorage>>())
.and(db.clone())
.and_then(handlers::set_jobs);

@ -14,4 +14,4 @@ libc = "^0.2"
lazy_static = "1.4.0"
tokio = { version = "0.2.22", features = ["macros", "process"] }
reqwest = { version = "0.10.7", features = ["json"] }
bytes = "0.5.6"
futures = "0.3.5"

@ -12,7 +12,7 @@ use crate::{contracts::*, UID, exec_job};
pub struct UClient {
pub client_info: ClientInfo,
pub jobs: JobStorage, // TODO: to futures
pub jobs: JobMetaStorage,
}
impl UClient {
@ -31,12 +31,12 @@ pub struct ClientInfo {
}
impl ClientInfo {
pub fn gather() -> Self {
pub async fn gather() -> Self {
let mut info: HashMap<String, String> = HashMap::new();
for job in DEFAULT_JOBS {
let mut job_meta = JobMeta::from_shell(job.1.into());
let job_result = exec_job(&mut job_meta);
let job_data = match job_result.data {
let job_meta = JobMeta::from_shell(job.1.into()).into_arc();
let job_result = exec_job(job_meta.clone()).await;
let job_data = match job_result.unwrap().data.unwrap() {
Ok(output) => output.multiline(),
Err(e) => e.to_string()
};
@ -64,17 +64,19 @@ const DEFAULT_JOBS: &[(&str, &str)] = &[
#[cfg(test)]
mod tests {
use super::*;
use crate::utils::vec_to_string;
use crate::{
utils::vec_to_string
};
use std::time::SystemTime;
#[test]
fn test_gather() {
let cli_info = ClientInfo::gather();
#[tokio::test]
async fn test_gather() {
let cli_info = ClientInfo::gather().await;
let field = cli_info.get_field("username").unwrap();
let stdout = JobOutput::from_multiline(field).unwrap().stdout;
assert_eq!(
&vec_to_string(&stdout),
"plazmoid"
"root"
)
}
}

@ -103,8 +103,15 @@ impl ClientHandler {
}
}
build_handler!(POST init(ClientInfo) -> RawMsg);
// A - admin only
// client listing (A)
build_handler!(GET ls() -> CollectionWrapper<Vec<ClientInfo>>);
// get jobs for client himself (A: id=client_id)
build_handler!(GET get_jobs() -> CollectionWrapper<JobMetaStorage>);
// add client to server's db
build_handler!(POST init(ClientInfo) -> RawMsg);
// ???
build_handler!(POST del() -> ());
build_handler!(GET get_jobs() -> CollectionWrapper<JobStorage>);
build_handler!(POST set_jobs(CollectionWrapper<JobStorage>) -> ());
// set jobs for client (A)
build_handler!(POST set_jobs(CollectionWrapper<JobMetaStorage>) -> ());
// get_results (A): user_id, job_id

@ -4,14 +4,18 @@ use {
Mutex,
MutexGuard
},
std::sync::Arc,
std::collections::HashMap,
std::{
sync::{Arc, Mutex as StdMutex},
collections::HashMap,
},
uuid::Uuid,
serde::{Serialize, Deserialize}
};
pub type CliStorage = HashMap<Uuid, UClient>;
pub type JobStorage = HashMap<Uuid, JobMeta>;
pub type JobMetaStorage = HashMap<Uuid, JobMeta>;
pub type JobMetaRef = Arc<StdMutex<JobMeta>>;
// because can't impl From<CollectionWrapper<...>> for Cow
#[derive(Serialize, Deserialize, Debug, Clone)]
@ -26,14 +30,12 @@ impl<T> From<T> for CollectionWrapper<T> {
impl<T: Clone> ToMsg for CollectionWrapper<T> {}
impl<'cow, T: Clone> From<CollectionWrapper<T>> for Cow<'cow, CollectionWrapper<T>> {
#[inline]
fn from(obj: CollectionWrapper<T>) -> Cow<'cow, CollectionWrapper<T>> {
Cow::Owned(obj)
}
}
impl<'cow, T: Clone> From<&'cow CollectionWrapper<T>> for Cow<'cow, CollectionWrapper<T>> {
#[inline]
fn from(obj: &'cow CollectionWrapper<T>) -> Cow<'cow, CollectionWrapper<T>> {
Cow::Borrowed(obj)
}

@ -1,16 +1,24 @@
use std::{
process::Command,
// process::Command,
time::SystemTime,
cmp::PartialEq,
sync::{Arc, Mutex, MutexGuard},
};
use serde::{
Serialize,
Deserialize
};
use uuid::Uuid;
//use tokio::process::Command;
use tokio::process::Command;
use super::*;
use crate::{UError, UErrType};
use crate::{
UError,
UErrType,
UErrType::JobError,
BoxError,
JobErrType,
UResult,
};
#[derive(Serialize, Deserialize, Clone, Debug)]
@ -29,7 +37,7 @@ pub enum JobSchedule {
//TODO: Scheduled
}
#[derive(Serialize, Deserialize, Clone, Debug)]
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
pub enum JobState {
Queued, // server created a job, but client didn't get it yet
Pending, // client got a job, but not running yet
@ -75,13 +83,15 @@ impl JobMeta {
}
}
pub fn into_arc(self) -> JobMetaRef {
Arc::new(Mutex::new(self))
}
pub fn touch(&mut self) {
self.updated = SystemTime::now();
}
}
impl ToMsg for JobMeta {}
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct JobOutput {
@ -109,6 +119,16 @@ impl JobOutput {
}
}
pub fn stdout(mut self, data: Vec<u8>) -> Self {
self.stdout = data;
self
}
pub fn stderr(mut self, data: Vec<u8>) -> Self {
self.stderr = data;
self
}
pub fn multiline(&self) -> String {
let mut result = String::new();
if self.stdout.len() > 0 {
@ -133,10 +153,9 @@ impl JobOutput {
.map(|d| Vec::from(d.trim().as_bytes()))
.collect::<Vec<Vec<u8>>>()
.into_iter();
let mut instance = JobOutput::new();
instance.stdout = parts.next().unwrap();
instance.stderr = parts.next().unwrap_or(vec![]);
instance
JobOutput::new()
.stdout(parts.next().unwrap())
.stderr(parts.next().unwrap_or(vec![]))
})
}
}
@ -144,83 +163,102 @@ impl JobOutput {
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct JobResult {
pub id: Uuid,
pub data: Result<JobOutput, UError>,
pub data: Option<Result<JobOutput, UError>>,
pub state: JobState,
pub retcode: Option<i32>,
pub date: SystemTime,
}
impl ToMsg for JobResult {}
pub struct Job<'meta> {
pub struct Job {
result: JobResult,
meta: &'meta mut JobMeta,
meta: JobMetaRef,
}
impl<'meta> Job<'meta> {
pub fn new(job_meta: &'meta mut JobMeta) -> Self {
impl Job {
pub fn new(job_meta: JobMetaRef) -> Self {
let id = job_meta.lock().unwrap().id.clone();
let state = job_meta.lock().unwrap().state.clone();
Self {
result: JobResult {
id: job_meta.id.clone(),
state: job_meta.state.clone(),
data: Ok(JobOutput::new()),
id,
state: if state == JobState::Queued {
JobState::Pending
} else {
state
},
data: None,
retcode: None,
},
meta: job_meta,
}
}
pub fn run(&mut self) {
match self.meta.exec_type {
pub async fn run(mut self) -> UResult<JobResult> {
match self.exec_type() {
JobType::Shell => {
match self.meta.state {
match self.state() {
JobState::Queued | JobState::Pending => {
self.update_state(Some(JobState::Running));
},
JobState::Finished => {
if self.meta.schedule == JobSchedule::Permanent {
if self.schedule() == JobSchedule::Permanent {
self.update_state(Some(JobState::Running))
} else {
return
return Err(UError::new_type(
JobError(JobErrType::Finished)
))
}
},
JobState::Running => return
JobState::Running => return Err(UError::new_type(
JobError(JobErrType::AlreadyRunning)
))
}
match &self.meta.payload {
let str_payload = match &self.lock().payload {
Some(box_payload) => {
let payload = String::from_utf8_lossy(box_payload).into_owned();
let mut cmd_parts = payload
String::from_utf8_lossy(box_payload).into_owned()
}
None => unimplemented!()
};
let mut cmd_parts = str_payload
.split(" ")
.map(String::from)
.collect::<Vec<String>>()
.into_iter();
let cmd = cmd_parts.nth(0).unwrap();
let args = cmd_parts.collect::<Vec<_>>();
let result = Command::new(cmd)
let cmd_result = Command::new(cmd)
.args(args)
.output();
match result {
.output()
.await;
let (data, retcode) = match cmd_result {
Ok(output) => {
let job_out: &mut JobOutput = self.result.data.as_mut().unwrap();
job_out.stdout = output.stdout.to_vec();
job_out.stderr = output.stderr.to_vec();
self.result.retcode = output.status.code();
(
Some(Ok(JobOutput::new()
.stdout(output.stdout.to_vec())
.stderr(output.stderr.to_vec()))
),
output.status.code()
)
}
Err(e) => {
self.result.data = Err(
UError::new(UErrType::JobError, e.to_string())
);
self.result.retcode = None;
}
}
}
None => return
(
Some(Err(UError::new(
UErrType::JobError(JobErrType::System),
e.to_string()
))),
None
)
}
self.meta.state = JobState::Finished;
};
self.update_state(Some(JobState::Finished));
self.result.data = data;
self.result.retcode = retcode;
},
_ => unimplemented!()
}
Ok(self.into_result())
}
/// None => state is copied from meta to result field
@ -228,17 +266,40 @@ impl<'meta> Job<'meta> {
pub fn update_state(&mut self, state: Option<JobState>) {
match state {
Some(state) => {
self.meta.state = state.clone();
self.meta.lock().unwrap().state = state.clone();
self.result.state = state;
}
None => {
self.result.state = self.meta.state.clone();
self.result.state = self.state();
}
}
}
fn lock(&self) -> MutexGuard<JobMeta> {
self.meta.lock().unwrap()
}
pub fn id(&self) -> Uuid {
self.lock().id.clone()
}
pub fn into_result(mut self) -> JobResult {
Self::update_state(&mut self, None);
pub fn state(&self) -> JobState {
self.lock().state.clone()
}
pub fn exec_type(&self) -> JobType {
self.lock().exec_type.clone()
}
pub fn schedule(&self) -> JobSchedule {
self.lock().schedule.clone()
}
pub fn finished(&self) -> bool {
self.state() == JobState::Finished
}
pub fn into_result(self) -> JobResult {
self.result
}
}
@ -248,30 +309,39 @@ impl<'meta> Job<'meta> {
mod tests {
use super::*;
use crate::{
execute_jobs,
send_jobs_to_executor,
exec_job,
utils::vec_to_string
};
#[test]
fn test_shell_job() {
let mut job = JobMeta::from_shell("whoami".into());
let mut jobs: Vec<Job> = vec![Job::new(&mut job)];
execute_jobs(&mut jobs);
let job_result = jobs.pop().unwrap().into_result();
#[tokio::test]
async fn test_is_really_async() {
let secs_to_sleep = 1;
let job = JobMeta::from_shell(format!("sleep {}", secs_to_sleep)).into_arc();
let sleep_jobs = vec![job.clone(), job.clone(), job.clone()];
let now = SystemTime::now();
send_jobs_to_executor(sleep_jobs).await;
assert_eq!(now.elapsed().unwrap().as_secs(), secs_to_sleep)
}
#[tokio::test]
async fn test_shell_job() -> UResult<()> {
let job = JobMeta::from_shell("whoami".into()).into_arc();
let job_result = exec_job(job.clone()).await.unwrap();
assert_eq!(
&vec_to_string(&job_result.data.unwrap().stdout),
"plazmoid"
vec_to_string(&job_result.data.unwrap()?.stdout).trim(),
"root"
);
Ok(())
}
#[test]
fn test_failing_shell_job() {
let mut job = JobMeta::from_shell("lol_kek_puk".into());
let mut jobs: Vec<Job> = vec![Job::new(&mut job)];
execute_jobs(&mut jobs);
let job_result = jobs.pop().unwrap().into_result();
assert!(job_result.data.is_err());
#[tokio::test]
async fn test_failing_shell_job() -> UResult<()> {
let job = JobMeta::from_shell("lol_kek_puk".into()).into_arc();
let job_result = exec_job(job.clone()).await.unwrap();
assert!(job_result.data.unwrap().is_err());
assert_eq!(job_result.retcode, None);
Ok(())
}
#[test]

@ -33,15 +33,6 @@ where I: Clone {
}
}
/* crutch
// because Vec is stored as &[] (wtf?)
pub fn new_owned(item: I) -> Self {
Self {
id: *UID,
item: Cow::Owned(item)
}
}*/
pub fn into_item(self) -> Cow<'cow, I> {
self.item
}

@ -34,4 +34,4 @@ macro_rules! to_message {
}
}
to_message!(ClientInfo, RawMsg);
to_message!(ClientInfo, RawMsg, JobMeta, JobResult);

@ -9,11 +9,19 @@ use serde::{
pub type BoxError = Box<(dyn StdError + Send + Sync + 'static)>;
pub type UResult<T> = std::result::Result<T, UError>;
#[derive(Serialize, Deserialize, Clone, Debug)]
pub enum JobErrType {
AlreadyRunning,
Finished,
System
}
#[derive(Serialize, Deserialize, Clone, Debug)]
pub enum UErrType {
ConnectionError,
ParseError,
JobError,
JobError(JobErrType),
Unknown,
Raw(String)
}
@ -38,12 +46,15 @@ impl UError {
})
}
}
pub fn new_type(err_type: UErrType) -> Self {
UError::new(err_type, String::new())
}
}
impl fmt::Debug for UError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
let mut builder = f.debug_struct("errors::UError");
builder.field("kind", &self.inner.err_type);
builder.field("source", &self.inner.source);
builder.finish()
@ -53,13 +64,18 @@ impl fmt::Debug for UError {
impl fmt::Display for UError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
let e_type = match self.inner.err_type {
UErrType::Raw(ref msg) => msg,
UErrType::ConnectionError => "Connection error",
UErrType::ParseError => "Parse error",
UErrType::JobError => "Job error",
UErrType::Unknown => "Unknown error",
UErrType::Raw(ref msg) => msg.clone(),
UErrType::ConnectionError => "Connection error".to_string(),
UErrType::ParseError => "Parse error".to_string(),
UErrType::JobError(ref inner) =>
(String::from("Job error: ") + match *inner {
JobErrType::AlreadyRunning => "job is already running",
JobErrType::Finished => "once-scheduled job is already finished",
JobErrType::System => "system error"
}),
UErrType::Unknown => "Unknown error".to_string(),
};
f.write_str(e_type)?;
f.write_str(&e_type)?;
write!(f, ": {}", self.inner.source)
}

@ -10,20 +10,138 @@ enum Schedule {
Once
}
*/
lazy_static! {
pub static ref EXECUTOR: Vec<>
use crate::{
contracts::*,
UResult,
};
use std::{
sync::{Mutex, MutexGuard, Arc},
thread,
time::Duration,
collections::HashMap,
};
use uuid::Uuid;
//use tokio::task::JoinHandle;
use futures::future::{join_all, JoinAll};
use futures::Future;
type Executables = Vec<Job>;
struct AsyncExecutor {
new_jobs: Executables,
promises: Executables
}
impl AsyncExecutor {
pub fn new() -> Self {
Self {
new_jobs: vec![],
promises: vec![]
}
}
/*
pub fn process_jobs(&mut self) {
new_jobs
if job.state() == JobState::Pending {
tokio::spawn(job.run);
}
}
*/
pub async fn apply_job(&mut self, mut new_job: Job) -> UResult<JobResult> {
/*let id = new_job.id();
let mut job_pool = self.jobs.lock().unwrap();
job_pool.push(new_job);
id*/
tokio::spawn(async move {
new_job.run().await
}).await.unwrap()
}
use crate::contracts::*;
pub async fn apply_jobs(&mut self, new_jobs: Vec<Job>) -> Vec<UResult<JobResult>> {
/*
let mut job_pool = self.jobs.lock().unwrap();
job_pool.extend(
new_jobs.into_iter()
.map(|job| (job.id(), job))
);*/
let futures = new_jobs.into_iter().map(|mut job| {
job.run()
}).collect::<Vec<_>>();
join_all(futures).await
}
/*
pub fn get_job_result(&self, id: &Uuid) -> Option<JobResult> {
let mut job_pool = self.jobs.lock().unwrap();
let (state, result) = match job_pool.get(id) {
Some(job) => (job.state(), job.get_result()),
None => return None
};
if state == JobState::Finished {
job_pool.remove(&id);
}
Some(result)
}
pub fn get_all_results(&self) -> Vec<JobResult> {
let mut job_pool = self.jobs.lock().unwrap();
let to_remove = job_pool.values()
.filter(|job| job.finished())
.map(|job| job.id())
.collect::<Vec<Uuid>>();
let results = job_pool.values()
.map(|job| job.get_result())
.collect();
to_remove.into_iter().for_each(|id| {
job_pool.remove(&id);
});
results
}*/
}
lazy_static! {
static ref EXECUTOR: Mutex<AsyncExecutor> =
Mutex::new(AsyncExecutor::new());
}
/*
pub fn get_job_result(id: &Uuid, wait: bool) -> Option<JobResult> {
let executor: MutexGuard<AsyncExecutor> = EXECUTOR.lock().unwrap();
loop {
let result = executor.get_job_result(&id);
if wait {
if result.is_some() && result.as_ref().unwrap().state == JobState::Finished {
return result
}
thread::sleep(Duration::from_secs(1))
} else {
return result
}
}
}
pub fn get_all_results() -> Vec<JobResult> {
let executor: MutexGuard<AsyncExecutor> = EXECUTOR.lock().unwrap();
executor.get_all_results()
}
*/
pub fn execute_jobs(jobs: &mut Vec<Job>) {
jobs.iter_mut().for_each(|job| job.run())
// run jobs without awaiting (now is waiting)
pub async fn send_jobs_to_executor(
jobs: Vec<JobMetaRef>
) -> Vec<UResult<JobResult>> {
let mut executor: MutexGuard<AsyncExecutor> = EXECUTOR.lock().unwrap();
let executables = jobs.into_iter()
.map(|job_meta| Job::new(job_meta))
.collect();
executor.apply_jobs(executables).await
}
pub fn exec_job(job_meta: &mut JobMeta) -> JobResult {
let mut job = Job::new(job_meta);
job.run();
job.into_result()
// run job and await result
pub async fn exec_job(job_meta: JobMetaRef) -> UResult<JobResult> {
let mut executor: MutexGuard<AsyncExecutor> = EXECUTOR.lock().unwrap();
let job = Job::new(job_meta);
executor.apply_job(job).await
}
Loading…
Cancel
Save