working sync jobs

4-update-check
plazmoid 4 years ago
parent 50d6d9412b
commit df79be1a69
  1. 12
      bin/u_agent/src/main.rs
  2. 5
      bin/u_panel/src/main.rs
  3. 60
      lib/u_lib/src/client/client.rs
  4. 21
      lib/u_lib/src/client/network.rs
  5. 3
      lib/u_lib/src/contracts/datatypes.rs
  6. 276
      lib/u_lib/src/contracts/jobs.rs
  7. 27
      lib/u_lib/src/errors.rs
  8. 6
      lib/u_lib/src/executor.rs
  9. 6
      lib/u_lib/src/lib.rs

@ -7,22 +7,18 @@
// проверка ssh ключей и распространение через known_hosts
// самоуничтожение
//mod jobs;
use {
std::thread::sleep,
std::time::Duration,
u_lib::client::*
std::env,
u_lib::client::network::ClientHandler
};
//mod executor;
use network::ClientHandler;
#[tokio::main]
async fn main() {
//daemonize();
let instance = ClientHandler::new();
let arg_ip = env::args().nth(1);
let instance = ClientHandler::new(arg_ip);
instance.init().await;
loop {
sleep(Duration::from_secs(2));

@ -12,12 +12,13 @@ async fn main() -> Result<(), &'static str> {
Some(m) => m,
None => return Err("Method required")
};
let cli_handler = ClientHandler::new_pwd("123qwe".to_string());
let cli_handler = ClientHandler::new(None)
.password("123qwe".to_string());
match method.as_str() {
"ls" => {
let result = cli_handler.list().await;
for cli in result.iter() {
println!("{:?}", cli)
println!("{:#?}", cli)
}
},
_ => return Err("Unknown method")

@ -1,12 +1,14 @@
use std::{
collections::HashMap,
fmt
};
use serde::{
Deserialize,
Serialize
};
use uuid::Uuid;
use crate::{
contracts::*,
UID
};
use crate::{contracts::*, UID, exec_job};
pub struct UClient {
@ -25,25 +27,55 @@ impl UClient {
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct ClientInfo {
pub local_ip: String,
pub hostname: String,
pub username: String,
pub os: String,
pub platform: String,
pub info: HashMap<String, String>,
pub id: Uuid,
}
impl ClientInfo {
pub fn gather() -> Self {
let mut info: HashMap<String, String> = HashMap::new();
for job in DEFAULT_JOBS {
let mut job_meta = JobMeta::from_shell(job.1.into());
let job_result = exec_job(&mut job_meta);
let job_data = match job_result.data {
Ok(output) => output.multiline(),
Err(e) => e.description()
};
info.insert(job.0.into(), job_data);
}
ClientInfo {
local_ip: String::from("1.2.3.4"),
hostname: String::from("polokonzerva"),
username: String::from("plazmoid"),
os: String::from("pinux"),
platform: String::from("x86_64"),
info,
id: *UID
}
}
pub fn get_field(&self, field: &str) -> Option<&String> {
self.info.get(field)
}
}
impl ToMsg for ClientInfo {}
const DEFAULT_JOBS: &[(&str, &str)] = &[
//("local ip", "ip a"),
("hostname", "hostname"),
("username", "whoami"),
("platform", "uname -a"),
];
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_gather() {
let cli_info = ClientInfo::gather();
let field = cli_info.get_field("username").unwrap();
assert_eq!(
JobOutput::from_multiline(field).unwrap().stdout,
b"plazmoid".to_vec()
)
}
}

@ -9,7 +9,10 @@ use reqwest::{
Response,
RequestBuilder
};
use std::{
net::Ipv4Addr,
str::FromStr
};
pub struct ClientHandler {
base_url: Url,
@ -19,21 +22,23 @@ pub struct ClientHandler {
}
impl ClientHandler {
pub fn new() -> Self {
pub fn new(server: Option<String>) -> Self {
let master_server = server
.map(|s| Ipv4Addr::from_str(&s).unwrap())
.unwrap_or(MASTER_SERVER);
Self {
client: Client::new(),
cli_info: ClientInfo::gather(),
base_url: Url::parse(
&format!("http://{}:{}", MASTER_SERVER, MASTER_PORT)
&format!("http://{}:{}", master_server, MASTER_PORT)
).unwrap(),
password: None
}
}
pub fn new_pwd(password: String) -> Self {
let mut instance = Self::new();
instance.password = Some(password);
instance
pub fn password(mut self, password: String) -> ClientHandler {
self.password = Some(password);
self
}
fn set_pwd(&self, rb: RequestBuilder) -> RequestBuilder {
@ -53,7 +58,7 @@ impl ClientHandler {
self.set_pwd(rb)
}
pub async fn init(&self) -> RawMsg {
pub async fn init(&self) -> RawMsg { // move to result
let response: Response = self.build_post("/new")
.json(&self.cli_info.as_message())
.send()

@ -9,9 +9,6 @@ use {
uuid::Uuid,
};
//pub type JobPool = Vec<Box<dyn Job>>;
pub type CliStorage = HashMap<Uuid, UClient>;

@ -1,7 +1,7 @@
use std::{
process::Command,
time::SystemTime,
cmp::PartialEq
cmp::PartialEq,
};
use serde::{
Serialize,
@ -10,80 +10,10 @@ use serde::{
use uuid::Uuid;
//use tokio::process::Command;
use super::*;
use crate::UError;
pub struct Job<'meta> {
pub result: JobResult,
pub meta: &'meta mut JobMeta,
}
impl<'meta> Job<'meta> {
pub fn new(job_meta: &'meta mut JobMeta) -> Self {
Self {
result: JobResult {
id: job_meta.id.clone(),
state: job_meta.state.clone(),
data: None,
},
meta: job_meta,
}
}
pub fn run(&mut self) {
match self.meta.exec_type {
JobType::Shell => {
match self.meta.state {
JobState::Queued | JobState::Pending => {
self.meta.state = JobState::Running;
},
JobState::Finished => {
if self.meta.schedule == JobSchedule::Permanent {
self.meta.state = JobState::Running;
} else {
return
}
},
JobState::Running => return
}
match &self.meta.payload {
Some(box_payload) => {
let payload = String::from_utf8_lossy(box_payload).into_owned();
let mut cmd_parts = payload
.split(" ")
.map(String::from)
.collect::<Vec<String>>()
.into_iter();
let cmd = cmd_parts.nth(0).unwrap();
let args = cmd_parts.collect::<Vec<_>>();
let result = Command::new(cmd)
.args(args)
.output();
self.result.data = Some(match result {
Ok(output) => {
if output.status.success() {
Ok(output.stdout.to_vec())
} else {
Err(output.stderr.to_vec())
}
}
Err(e) => Err(e.to_string().into_bytes())
})
}
None => return
}
self.meta.state = JobState::Finished;
},
_ => unimplemented!()
}
}
fn into_result(self) -> JobResult {
self.result
}
}
#[derive(Serialize, Deserialize, Clone)]
#[derive(Serialize, Deserialize, Clone, Debug)]
pub enum ManageAction {
Ping,
UpdateAvailable,
@ -91,7 +21,7 @@ pub enum ManageAction {
Terminate
}
#[derive(Serialize, Deserialize, Clone, PartialEq)]
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
pub enum JobSchedule {
Once,
Permanent,
@ -99,7 +29,7 @@ pub enum JobSchedule {
//TODO: Scheduled
}
#[derive(Serialize, Deserialize, Clone)]
#[derive(Serialize, Deserialize, Clone, Debug)]
pub enum JobState {
Queued, // server created a job, but client didn't get it yet
Pending, // client got a job, but not running yet
@ -108,7 +38,7 @@ pub enum JobState {
Finished,
}
#[derive(Serialize, Deserialize, Clone)]
#[derive(Serialize, Deserialize, Clone, Debug)]
pub enum JobType {
Manage(ManageAction),
Shell,
@ -116,7 +46,7 @@ pub enum JobType {
Binary
}
#[derive(Serialize, Deserialize, Clone)]
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct JobMeta {
pub id: Uuid,
pub name: String,
@ -130,40 +60,192 @@ pub struct JobMeta {
}
impl JobMeta {
pub fn from_shell(shell_cmd: Vec<u8>) -> Self {
pub fn from_shell(shell_cmd: String) -> Self {
let uid = Uuid::new_v4();
//let str_payload_name: &[u8] = shell_cmd.split(|b| &[*b] == b" ").collect();
//let job_name = format!("{} {}", uid.to_string()[..6], str_payload_name[0]);
let job_name = shell_cmd.split(" ").nth(0).unwrap();
Self {
id: uid.clone(),
name: uid.to_string(),
name: job_name.to_string(),
created: SystemTime::now(),
updated: SystemTime::now(),
state: JobState::Pending,
exec_type: JobType::Shell,
schedule: JobSchedule::Once,
append_result: true,
payload: Some(Box::new(shell_cmd))
payload: Some(Box::new(shell_cmd.into_bytes()))
}
}
pub fn from_shell_str(shell_cmd: String) -> Self {
Self::from_shell(shell_cmd.into_bytes())
pub fn touch(&mut self) {
self.updated = SystemTime::now();
}
}
impl ToMsg for JobMeta {}
#[derive(Serialize, Deserialize, Clone)]
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct JobOutput {
pub stdout: Vec<u8>,
pub stderr: Vec<u8>,
}
impl JobOutput {
const STREAM_BORDER: &'static str = "***";
const STDOUT: &'static str = "STDOUT";
const STDERR: &'static str = "STDERR";
#[inline]
fn create_delim(header: &'static str) -> String {
format!("{border} {head} {border}\n",
border = JobOutput::STREAM_BORDER,
head = header
)
}
pub fn new() -> Self {
Self {
stdout: Vec::new(),
stderr: Vec::new(),
}
}
pub fn multiline(&self) -> String {
let mut result = String::new();
if self.stdout.len() > 0 {
result += &format!("{stdout_head}{stdout}\n",
stdout_head = JobOutput::create_delim(JobOutput::STDOUT),
stdout = String::from_utf8_lossy(&self.stdout))
}
if self.stderr.len() > 0 {
result += &format!("{stderr_head}{stderr}\n",
stderr_head = JobOutput::create_delim(JobOutput::STDERR),
stderr = String::from_utf8_lossy(&self.stderr))
}
result
}
pub fn from_multiline(raw: &String) -> Option<Self> {
let err_header = JobOutput::create_delim(JobOutput::STDERR);
raw.find(&err_header)
.and(raw.strip_prefix(&JobOutput::create_delim(JobOutput::STDOUT)))
.map(|s: &str| {
let mut parts = s.split(&err_header)
.map(|d| Vec::from(d.trim().as_bytes()))
.collect::<Vec<Vec<u8>>>()
.into_iter();
let mut instance = JobOutput::new();
instance.stdout = parts.next().unwrap();
instance.stderr = parts.next().unwrap();
instance
})
}
}
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct JobResult {
id: Uuid,
data: Option<Result<Vec<u8>, Vec<u8>>>,
state: JobState
pub id: Uuid,
pub data: Result<JobOutput, UError>,
pub state: JobState,
pub retcode: Option<i32>,
}
impl ToMsg for JobResult {}
pub struct Job<'meta> {
pub result: JobResult,
pub meta: &'meta mut JobMeta,
}
impl<'meta> Job<'meta> {
pub fn new(job_meta: &'meta mut JobMeta) -> Self {
Self {
result: JobResult {
id: job_meta.id.clone(),
state: job_meta.state.clone(),
data: Ok(JobOutput::new()),
retcode: None,
},
meta: job_meta,
}
}
pub fn run(&mut self) {
match self.meta.exec_type {
JobType::Shell => {
match self.meta.state {
JobState::Queued | JobState::Pending => {
self.update_state(Some(JobState::Running));
},
JobState::Finished => {
if self.meta.schedule == JobSchedule::Permanent {
self.update_state(Some(JobState::Running))
} else {
return
}
},
JobState::Running => return
}
match &self.meta.payload {
Some(box_payload) => {
let payload = String::from_utf8_lossy(box_payload).into_owned();
let mut cmd_parts = payload
.split(" ")
.map(String::from)
.collect::<Vec<String>>()
.into_iter();
let cmd = cmd_parts.nth(0).unwrap();
let args = cmd_parts.collect::<Vec<_>>();
let result = Command::new(cmd)
.args(args)
.output();
match result {
Ok(output) => {
let job_out: &mut JobOutput = self.result.data.as_mut().unwrap();
job_out.stdout = output.stdout.to_vec();
job_out.stderr = output.stderr.to_vec();
self.result.retcode = output.status.code();
}
Err(e) => {
self.result.data = Err(
UError::Raw(e.to_string())
);
self.result.retcode = None;
}
}
}
None => return
}
self.meta.state = JobState::Finished;
},
_ => unimplemented!()
}
}
/// None => state is copied from meta to result field
/// Some => state is applied to both meta and result fields
pub fn update_state(&mut self, state: Option<JobState>) {
match state {
Some(state) => {
self.meta.state = state.clone();
self.result.state = state;
}
None => {
self.result.state = self.meta.state.clone();
}
}
}
pub fn into_result(mut self) -> JobResult {
Self::update_state(&mut self, None);
self.result
}
}
#[cfg(test)]
mod tests {
use super::*;
@ -171,9 +253,23 @@ mod tests {
#[test]
fn test_shell_job() {
let mut job = JobMeta::from_shell_str("whoami".into());
let mut job = JobMeta::from_shell("whoami".into());
let mut jobs: Vec<Job> = vec![Job::new(&mut job)];
execute_jobs(&mut jobs);
let job_result = jobs.pop().unwrap().into_result();
assert_eq!(
job_result.data.unwrap().stdout,
b"plazmoid\n".to_vec()
);
}
#[test]
fn test_failing_shell_job() {
let mut job = JobMeta::from_shell("lol_kek_puk".into());
let mut jobs: Vec<Job> = vec![Job::new(&mut job)];
execute_jobs(&mut jobs);
assert_eq!(jobs.pop().unwrap().result.data.unwrap().unwrap(), b"plazmoid\n".to_vec());
let job_result = jobs.pop().unwrap().into_result();
assert!(job_result.data.is_err());
assert_eq!(job_result.retcode, None);
}
}

@ -0,0 +1,27 @@
use std::fmt;
use serde::{
Serialize,
Deserialize
};
#[derive(Serialize, Deserialize, Clone)]
pub enum UError {
Raw(String)
}
impl UError {
pub fn description(&self) -> String {
match self {
UError::Raw(msg) => msg.clone()
}
}
}
impl fmt::Debug for UError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
let msg = match self {
UError::Raw(msg) => msg
};
write!(f, "{}", msg)
}
}

@ -21,3 +21,9 @@ use crate::contracts::*;
pub fn execute_jobs(jobs: &mut Vec<Job>) {
jobs.iter_mut().for_each(|job| job.run())
}
pub fn exec_job(job_meta: &mut JobMeta) -> JobResult {
let mut job = Job::new(job_meta);
job.run();
job.into_result()
}

@ -1,13 +1,15 @@
pub mod executor;
pub mod config;
pub mod contracts;
pub mod utils;
pub mod client;
pub mod errors;
pub mod contracts;
pub use {
utils::*,
config::*,
executor::*
executor::*,
errors::*,
};
#[macro_use]

Loading…
Cancel
Save