Scheduling & stats #12

Merged
root merged 4 commits from 10-scheduling into master 1 year ago
  1. 760
      Cargo.lock
  2. 183
      bin/u_agent/src/lib.rs
  3. 2
      bin/u_panel/src/gui/fe/src/app/app.module.ts
  4. 4
      bin/u_panel/src/gui/fe/src/app/components/dialogs/base-info-dialog.component.less
  5. 5
      bin/u_panel/src/gui/fe/src/app/components/payload-overview/payload-overview.component.html
  6. 2
      bin/u_panel/src/gui/fe/src/index.html
  7. 45
      bin/u_server/src/handlers.rs
  8. 2
      integration-tests/docker-compose.yml
  9. 2
      integration-tests/tests/fixtures/agent.rs
  10. 2
      integration-tests/tests/helpers/panel.rs
  11. 51
      integration-tests/tests/integration_tests/behaviour.rs
  12. 4
      lib/u_lib/Cargo.toml
  13. 7
      lib/u_lib/src/api.rs
  14. 10
      lib/u_lib/src/cache.rs
  15. 167
      lib/u_lib/src/executor.rs
  16. 92
      lib/u_lib/src/jobs.rs
  17. 3
      lib/u_lib/src/lib.rs
  18. 4
      lib/u_lib/src/models/agent.rs
  19. 33
      lib/u_lib/src/models/jobs/assigned.rs
  20. 11
      lib/u_lib/src/models/jobs/meta.rs
  21. 2
      lib/u_lib/src/models/jobs/misc.rs
  22. 3
      lib/u_lib/src/models/mod.rs
  23. 2
      lib/u_lib/src/models/payload.rs
  24. 7
      lib/u_lib/src/models/schema.rs
  25. 14
      lib/u_lib/src/models/stats.rs
  26. 2
      lib/u_lib/src/proc_output.rs
  27. 91
      lib/u_lib/src/scheduler/entry.rs
  28. 178
      lib/u_lib/src/scheduler/mod.rs
  29. 200
      lib/u_lib/src/u_runner.rs
  30. 49
      lib/u_lib/src/ufs/index.rs
  31. 59
      lib/u_lib/src/ufs/mod.rs
  32. 2
      migrations/2020-10-24-111622_create_all/up.sql

760
Cargo.lock generated

File diff suppressed because it is too large Load Diff

@ -1,110 +1,102 @@
#[macro_use] #[macro_use]
extern crate log; extern crate log;
use std::process::exit;
use tokio::runtime::Builder; use tokio::runtime::Builder;
use tokio::time::{sleep, Duration}; use tokio::time::{sleep, Duration};
use u_lib::models::PreparedJob;
use u_lib::scheduler::SCHEDULER;
use u_lib::u_runner::{IdentifiableFuture, URunner};
use u_lib::{ use u_lib::{
api::HttpClient, api::HttpClient,
cache::JobCache, cache::JobCache,
config::{get_self_id, EndpointsEnv, AGENT_ITERATION_INTERVAL}, config::{get_self_id, EndpointsEnv, AGENT_ITERATION_INTERVAL},
error::ErrChan, error::ErrChan,
executor,
jobs::AnonymousJobBatch,
logging::init_logger, logging::init_logger,
messaging::Reportable, messaging::Reportable,
models::AssignedJobById, models::AssignedJobById,
}; };
async fn process_request(jobs: Vec<AssignedJobById>, client: &HttpClient) { async fn process_request(assigned_jobs: Vec<AssignedJobById>, client: &HttpClient) {
if !jobs.is_empty() { for asgn_job in assigned_jobs {
for jr in &jobs { if !JobCache::contains(asgn_job.job_id) {
if !JobCache::contains(jr.job_id) { info!("Fetching job: {}", &asgn_job.job_id);
info!("Fetching job: {}", &jr.job_id); let mut fetched_job = loop {
let mut fetched_job = loop { //todo: use payload cache
//todo: use payload cache match client.get_full_job(asgn_job.job_id).await {
match client.get_full_job(jr.job_id).await { Ok(result) => break result,
Ok(result) => break result, Err(err) => {
Err(err) => { debug!("{:?} \nretrying...", err);
debug!("{:?} \nretrying...", err); sleep(AGENT_ITERATION_INTERVAL).await;
sleep(AGENT_ITERATION_INTERVAL).await;
}
}
};
if let Some(payload) = &mut fetched_job.payload {
if let Err(e) = payload.maybe_split_payload() {
ErrChan::send(e, "pld").await;
} }
} }
JobCache::insert(fetched_job); };
if let Some(payload) = &mut fetched_job.payload {
if let Err(e) = payload.maybe_split_payload() {
ErrChan::send(e, "pay").await;
continue;
}
} }
JobCache::insert(fetched_job);
} }
info!(
"Scheduling jobs: {}",
jobs.iter()
.map(|j| j.job_id.to_string())
.collect::<Vec<String>>()
.join(", ")
);
let meta_with_ids = jobs let job = match JobCache::get(asgn_job.job_id).as_deref() {
.into_iter() Some(job) => job.clone(),
.map(|job| { None => continue,
let meta = JobCache::get(job.job_id).unwrap().clone(); };
(meta, job)
})
.collect::<Vec<_>>();
AnonymousJobBatch::from_meta_with_id(meta_with_ids) info!("Scheduling job {}", job.meta.id.to_string());
.spawn()
.await;
}
}
async fn error_reporting(client: HttpClient) -> ! { let schedule = match job.meta.schedule.clone() {
loop { Some(sched) => {
match ErrChan::recv().await { if sched.is_empty() {
Some(err) => { None
'retry: for _ in 0..3 { } else {
match client.report([Reportable::Error(err.clone())]).await { match sched.as_str().try_into() {
Ok(_) => break 'retry, Ok(s) => Some(s),
Err(e) => { Err(err) => {
debug!("Reporting error: {:?}", e); ErrChan::send(err, "sch").await;
sleep(Duration::from_secs(10)).await; continue;
} }
} }
} }
} }
None => sleep(Duration::from_secs(3)).await, None => None,
} };
SCHEDULER
.add_job(schedule, PreparedJob { job, ids: asgn_job })
.await;
}
}
async fn error_reporting(client: HttpClient) {
while let Some(err) = ErrChan::recv().await {
let _ = client.report([Reportable::Error(err.clone())]).await;
} }
} }
async fn agent_loop(client: HttpClient) -> ! { async fn agent_loop(client: HttpClient) {
let self_id = get_self_id(); let self_id = get_self_id();
loop {
match client.get_personal_jobs(self_id).await { match client.get_personal_jobs(self_id).await {
Ok(jobs) => { Ok(jobs) => {
process_request(jobs, &client).await; process_request(jobs, &client).await;
}
Err(err) => ErrChan::send(err, "processing").await,
} }
Err(err) => ErrChan::send(err, "pro").await,
}
let result: Vec<Reportable> = executor::pop_completed() let result: Vec<Reportable> = URunner::pop_completed()
.await .await
.into_iter() .into_iter()
.map(|result| match result { .map(|result| match result {
Ok(r) => Reportable::Assigned(r), Ok(r) => Reportable::Assigned(r),
Err(e) => Reportable::Error(e), Err(e) => Reportable::Error(e),
}) })
.collect(); .collect();
if !result.is_empty() { if !result.is_empty() {
if let Err(err) = client.report(result).await { if let Err(err) = client.report(result).await {
ErrChan::send(err, "report").await; ErrChan::send(err, "rep").await;
}
} }
sleep(AGENT_ITERATION_INTERVAL).await;
} }
} }
@ -134,15 +126,42 @@ pub fn run_forever() -> ! {
.build() .build()
.unwrap() .unwrap()
.block_on(async { .block_on(async {
match HttpClient::new(&env.u_server, None).await { let client = loop {
Ok(client) => { match HttpClient::new(&env.u_server, None).await {
tokio::spawn(error_reporting(client.clone())); Ok(client) => break client,
agent_loop(client).await Err(e) => {
} error!("client init failed: {}", e);
Err(e) => { sleep(Duration::from_secs(5)).await;
error!("client init failed: {}", e); continue;
exit(7) // todo: wtf? }
} }
};
{
let client = client.clone();
SCHEDULER
.add_job(Some("*/3 * * * * * *".try_into().unwrap()), move || {
let client = client.clone();
IdentifiableFuture::from_fut_with_ident("error_reporting", async move {
error_reporting(client.clone()).await
})
})
.await;
}
{
let client = client.clone();
SCHEDULER
.add_job(Some("*/3 * * * * * *".try_into().unwrap()), move || {
let client = client.clone();
IdentifiableFuture::from_fut_with_ident("agent_loop", async move {
agent_loop(client).await
})
})
.await;
} }
SCHEDULER.start_blocking().await
}) })
} }

@ -12,6 +12,7 @@ import { MatSelectModule } from '@angular/material/select';
import { MatProgressSpinnerModule } from '@angular/material/progress-spinner'; import { MatProgressSpinnerModule } from '@angular/material/progress-spinner';
import { HttpClientModule } from '@angular/common/http'; import { HttpClientModule } from '@angular/common/http';
import { MatDialogModule } from '@angular/material/dialog'; import { MatDialogModule } from '@angular/material/dialog';
import { MatGridListModule } from '@angular/material/grid-list';
import { MatIconModule } from '@angular/material/icon'; import { MatIconModule } from '@angular/material/icon';
import { FormsModule } from '@angular/forms'; import { FormsModule } from '@angular/forms';
import { AgentComponent, JobComponent, ResultComponent, PayloadComponent } from './components/tables'; import { AgentComponent, JobComponent, ResultComponent, PayloadComponent } from './components/tables';
@ -62,6 +63,7 @@ import { NewPayloadDialogComponent } from './components/dialogs/new-payload-dial
MatSnackBarModule, MatSnackBarModule,
MatSelectModule, MatSelectModule,
MatListModule, MatListModule,
MatGridListModule,
FormsModule, FormsModule,
BrowserAnimationsModule BrowserAnimationsModule
], ],

@ -12,3 +12,7 @@ div.info-dialog-forms-box-smol {
float: left; float: left;
margin-right: 10px; margin-right: 10px;
} }
.code {
font-family: "Roboto Mono", monospace;
}

@ -1,6 +1,7 @@
<mat-form-field class="info-dlg-field" floatLabel="always"> <mat-form-field style="box-sizing:border-box; width:100%" class="info-dlg-field" floatLabel="always">
<mat-label>Payload data</mat-label> <mat-label>Payload data</mat-label>
<textarea matInput cdkTextareaAutosize *ngIf="!isTooBigPayload" [readonly]="isPreview" [(ngModel)]="decodedPayload"> <textarea class="code" matInput cdkTextareaAutosize="true" *ngIf="!isTooBigPayload" [readonly]="isPreview"
[(ngModel)]="decodedPayload">
</textarea> </textarea>
<input matInput *ngIf="isTooBigPayload" disabled placeholder="Payload is too big to display"> <input matInput *ngIf="isTooBigPayload" disabled placeholder="Payload is too big to display">
</mat-form-field> </mat-form-field>

@ -7,7 +7,7 @@
<meta name="viewport" content="width=device-width, initial-scale=1"> <meta name="viewport" content="width=device-width, initial-scale=1">
<link rel="icon" type="image/x-icon" href="favicon.ico"> <link rel="icon" type="image/x-icon" href="favicon.ico">
<link rel="preconnect" href="https://fonts.gstatic.com"> <link rel="preconnect" href="https://fonts.gstatic.com">
<link href="https://fonts.googleapis.com/css2?family=Roboto:wght@300;400;500&display=swap" rel="stylesheet"> <link href="https://fonts.googleapis.com/css2?family=Roboto+Mono:wght@300;400;500&display=swap" rel="stylesheet">
<link href="https://fonts.googleapis.com/icon?family=Material+Icons" rel="stylesheet"> <link href="https://fonts.googleapis.com/icon?family=Material+Icons" rel="stylesheet">
</head> </head>

@ -249,7 +249,10 @@ impl Endpoints {
Reportable::Assigned(mut result) => { Reportable::Assigned(mut result) => {
let result_agent_id = &result.agent_id; let result_agent_id = &result.agent_id;
if agent_id != *result_agent_id { if agent_id != *result_agent_id {
warn!("Agent ids are not equal! actual id: {agent_id}, id from job: {result_agent_id}"); warn!(
"Agent ids are not equal! actual id: {agent_id}, \
id from job: {result_agent_id}"
);
continue; continue;
} }
result.touch(); result.touch();
@ -260,26 +263,23 @@ impl Endpoints {
JobType::Init => { JobType::Init => {
result.state = JobState::Finished; result.state = JobState::Finished;
match &result.result { let mut agent: Agent = match result.deserialize() {
Some(rbytes) => { Ok(a) => a,
let mut agent: Agent = match serde_json::from_slice(&rbytes) { Err(e) => {
Ok(a) => a, error!(
Err(e) => { "Error deserializing agent \
error!("Error deserializing agent data from {agent_id}: {e}"); data from {agent_id}: {e}"
continue; );
} continue;
}; }
agent.state = AgentState::Active; };
db.upsert_agent(&agent)?; agent.state = AgentState::Active;
} db.upsert_agent(&agent)?;
None => error!("Empty agent data"), }
}}, JobType::Shell => result.state = JobState::Finished,
JobType::Shell => { JobType::Stats => result.state = JobState::Finished,
result.state = JobState::Finished JobType::Terminate => (),
}, JobType::Update => (),
JobType::Service => (),
JobType::Terminate => todo!(),
JobType::Update => todo!(),
} }
db.update_result(&result)?; db.update_result(&result)?;
} }
@ -287,7 +287,8 @@ impl Endpoints {
error!("agent {agent_id} reported: {e}"); error!("agent {agent_id} reported: {e}");
} }
Reportable::Dummy => (), Reportable::Dummy => (),
}} }
}
Ok(()) Ok(())
}) })
.await .await

@ -40,7 +40,7 @@ services:
networks: networks:
- u_net - u_net
ports: ports:
- 54321:5432 - 5432:5432
env_file: env_file:
- ../.env - ../.env
- ../.env.private - ../.env.private

@ -25,7 +25,7 @@ pub fn registered_agent(client: &HttpClient) -> RegisteredAgent {
assert_eq!(job.meta.alias, Some("agent_hello".to_string())); assert_eq!(job.meta.alias, Some("agent_hello".to_string()));
let mut agent_data = AssignedJob::from((&job.meta, resp)); let mut agent_data = AssignedJob::from(&PreparedJob { job, ids: resp });
agent_data.set_result(&agent); agent_data.set_result(&agent);
client client

@ -80,7 +80,7 @@ pub trait IntoArgs {
impl IntoArgs for String { impl IntoArgs for String {
fn into_args(self) -> Vec<String> { fn into_args(self) -> Vec<String> {
<Self as AsRef<str>>::as_ref(&self).into_args() self.as_str().into_args()
} }
fn display(&self) -> String { fn display(&self) -> String {

@ -16,7 +16,7 @@ async fn registration(registered_agent: &RegisteredAgent) {
} }
#[tokio::test] #[tokio::test]
async fn setup_tasks() { async fn setup_jobs() {
let agents: Vec<Agent> = Panel::check_output("agents read"); let agents: Vec<Agent> = Panel::check_output("agents read");
let agent_id = agents[0].id; let agent_id = agents[0].id;
let job_alias = "passwd_contents"; let job_alias = "passwd_contents";
@ -88,3 +88,52 @@ async fn large_payload() {
}) })
.await; .await;
} }
#[tokio::test]
async fn gather_stats() {
let agent = &Panel::check_output::<Vec<Agent>>("agents read")[0];
let job_alias = "stats";
let job = RawJob::default()
.with_type(JobType::Stats)
.with_alias(job_alias)
.with_target_platforms(&agent.platform)
.try_into_job()
.unwrap();
let job_id = job.meta.id;
let agent_id = agent.id;
Panel::check_status(["jobs", "create", &to_string(&RawJob::from(job)).unwrap()]);
let assigned = AssignedJobById {
agent_id,
job_id,
..Default::default()
};
Panel::check_status(["map", "create", &to_string(&[assigned]).unwrap()]);
let result = retry_with_interval(5, AGENT_ITERATION_INTERVAL, || {
let result =
Panel::check_output::<Vec<AssignedJob>>(["map", "read", &job_id.to_string()]).remove(0);
if result.state == JobState::Finished {
Ok(result)
} else {
Err("job didn't finish")
}
})
.await;
let stats = result.deserialize::<Stats>().unwrap();
assert_eq!(stats.agent_id, agent_id);
assert!(stats.cached_jobs.contains(&job_id));
assert!(stats
.scheduled_jobs
.iter()
.find(|j| j.job_ident == "error_reporting")
.is_some());
assert!(stats
.scheduled_jobs
.iter()
.find(|j| j.job_ident == "agent_loop")
.is_some())
}

@ -8,7 +8,7 @@ edition = "2021"
[dependencies] [dependencies]
anyhow = { workspace = true } anyhow = { workspace = true }
chrono = "0.4.19" chrono = { version = "0.4.19", features = ["serde"] }
diesel = { workspace = true, optional = true } diesel = { workspace = true, optional = true }
diesel-derive-enum = { version = "2.0.0", features = ["postgres"], optional = true } diesel-derive-enum = { version = "2.0.0", features = ["postgres"], optional = true }
deadpool-diesel = { workspace = true, optional = true } deadpool-diesel = { workspace = true, optional = true }
@ -33,6 +33,8 @@ uuid = { workspace = true, features = ["serde", "v4"] }
parking_lot = "0.12.1" parking_lot = "0.12.1"
bincode = "1.3.3" bincode = "1.3.3"
sha3 = "0.10.7" sha3 = "0.10.7"
cron = "0.12.0"
hex = "0.4.3"
[target.'cfg(unix)'.dependencies] [target.'cfg(unix)'.dependencies]
daemonize = "0.5" daemonize = "0.5"

@ -233,11 +233,8 @@ impl HttpClient {
&self, &self,
assigned: impl IntoIterator<Item = &AssignedJobById>, assigned: impl IntoIterator<Item = &AssignedJobById>,
) -> Result<api_types::SetJobs> { ) -> Result<api_types::SetJobs> {
self.req_with_payload( self.req_with_payload("assign_jobs", &assigned.into_iter().collect::<Vec<_>>())
format!("assign_jobs"), .await
&assigned.into_iter().collect::<Vec<_>>(),
)
.await
} }
/// get jobs for any agent by job_id, agent_id or result_id /// get jobs for any agent by job_id, agent_id or result_id

@ -22,7 +22,7 @@ impl JobCache {
JOB_CACHE.read().contains_key(&id) JOB_CACHE.read().contains_key(&id)
} }
pub fn get<'jh>(id: Id) -> Option<JobCacheHolder<'jh>> { pub fn get(id: Id) -> Option<JobCacheHolder> {
if !Self::contains(id) { if !Self::contains(id) {
return None; return None;
} }
@ -33,11 +33,15 @@ impl JobCache {
pub fn remove(id: Id) { pub fn remove(id: Id) {
JOB_CACHE.write().remove(&id); JOB_CACHE.write().remove(&id);
} }
pub fn stats() -> Vec<Id> {
JOB_CACHE.read().values().map(|job| job.meta.id).collect()
}
} }
pub struct JobCacheHolder<'jh>(pub RwLockReadGuard<'jh, Cache>, pub Id); pub struct JobCacheHolder(pub RwLockReadGuard<'static, Cache>, pub Id);
impl<'jh> Deref for JobCacheHolder<'jh> { impl Deref for JobCacheHolder {
type Target = Val; type Target = Val;
fn deref(&self) -> &Self::Target { fn deref(&self) -> &Self::Target {

@ -1,167 +0,0 @@
use crate::{models::AssignedJob, UResult};
use lazy_static::lazy_static;
use std::collections::HashMap;
use std::future::Future;
use std::pin::Pin;
use tokio::{
runtime::Handle,
sync::mpsc::{channel, Receiver, Sender},
sync::Mutex,
task::{spawn, spawn_blocking, JoinHandle},
};
use uuid::Uuid;
pub type ExecResult = UResult<AssignedJob>;
type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;
lazy_static! {
static ref FUT_RESULTS: Mutex<HashMap<Uuid, JoinInfo>> = Mutex::new(HashMap::new());
static ref FUT_CHANNEL: (Sender<Uuid>, Mutex<Receiver<Uuid>>) = {
spawn(init_receiver());
let (tx, rx) = channel(100);
(tx, Mutex::new(rx))
};
}
struct JoinInfo {
handle: JoinHandle<JoinHandle<ExecResult>>,
completed: bool,
collectable: bool, // indicates if future can be popped from pool via pop_task_if_completed
}
impl JoinInfo {
async fn wait_result(self) -> ExecResult {
self.handle.await.unwrap().await.unwrap()
}
}
fn get_sender() -> Sender<Uuid> {
FUT_CHANNEL.0.clone()
}
pub struct Waiter {
tasks: Vec<BoxFuture<'static, ExecResult>>,
fids: Vec<Uuid>,
}
impl Waiter {
pub fn new() -> Self {
Self {
tasks: vec![],
fids: vec![],
}
}
pub fn push(&mut self, task: impl Future<Output = ExecResult> + Send + 'static) {
self.tasks.push(Box::pin(task));
}
/// Spawn prepared tasks
pub async fn spawn(mut self) -> Self {
let collectable = true; //TODO: self.tasks.len() != 1;
for f in self.tasks.drain(..) {
let handle = Handle::current();
let fid = Uuid::new_v4();
let tx = get_sender();
self.fids.push(fid);
let task_wrapper = async move {
debug!("inside wrapper (started): {}", fid);
let result = f.await;
tx.send(fid).await.unwrap();
result
};
let handler = JoinInfo {
handle: spawn_blocking(move || handle.spawn(task_wrapper)),
completed: false,
collectable,
};
FUT_RESULTS.lock().await.insert(fid, handler);
}
self
}
/// Wait until a bunch of tasks is finished.
/// NOT GUARANTEED that all tasks will be returned due to
/// possibility to pop them in other places
pub async fn wait(self) -> Vec<ExecResult> {
let mut result = vec![];
for fid in self.fids {
if let Some(task) = pop_task(fid).await {
result.push(task.wait_result().await);
}
}
result
}
}
async fn init_receiver() {
while let Some(fid) = FUT_CHANNEL.1.lock().await.recv().await {
let mut lock = FUT_RESULTS.lock().await;
if let Some(j) = lock.get_mut(&fid) {
j.completed = true;
}
}
}
async fn pop_task(fid: Uuid) -> Option<JoinInfo> {
FUT_RESULTS.lock().await.remove(&fid)
}
pub async fn pop_task_if_completed(fid: Uuid) -> Option<ExecResult> {
let &JoinInfo {
handle: _,
collectable,
completed,
} = match FUT_RESULTS.lock().await.get(&fid) {
Some(t) => t,
None => return None,
};
if collectable && completed {
let task = pop_task(fid).await.unwrap();
Some(task.wait_result().await)
} else {
None
}
}
pub async fn pop_completed() -> Vec<ExecResult> {
let mut completed: Vec<ExecResult> = vec![];
let fids = FUT_RESULTS
.lock()
.await
.keys()
.copied()
.collect::<Vec<Uuid>>();
for fid in fids {
if let Some(r) = pop_task_if_completed(fid).await {
completed.push(r)
}
}
completed
}
#[cfg(test)]
mod tests {
use super::*;
// WTF
// WTF
// WTF
#[tokio::test]
async fn test_spawn() {
use std::sync::Arc;
let val = Arc::new(Mutex::new(0));
let t = {
let v = val.clone();
spawn(async move {
*v.lock().await = 5;
})
};
assert_eq!(0, *val.lock().await);
spawn(async {}).await.unwrap();
assert_eq!(5, *val.lock().await);
t.await.unwrap();
assert_eq!(5, *val.lock().await);
}
}

@ -1,63 +1,67 @@
use crate::{ use crate::{
cache::JobCache,
combined_result::CombinedResult, combined_result::CombinedResult,
executor::{ExecResult, Waiter}, config::get_self_id,
models::{Agent, AssignedJob, AssignedJobById, Job, JobType, RawJob}, models::{Agent, AssignedJob, AssignedJobById, Job, JobType, PreparedJob, RawJob, Stats},
proc_output::ProcOutput, proc_output::ProcOutput,
scheduler::SCHEDULER,
u_runner::{ExecResult, IdentifiableFuture, URunner},
ufs,
}; };
use std::collections::HashMap; use std::collections::HashMap;
use std::process::exit; use std::process::exit;
use tokio::process::Command; use tokio::process::Command;
pub struct AnonymousJobBatch { pub struct AnonymousJobBatch {
waiter: Waiter, runner: URunner,
is_running: bool, is_running: bool,
} }
impl AnonymousJobBatch { impl AnonymousJobBatch {
pub fn from_meta_with_id(jobs: impl IntoIterator<Item = (Job, AssignedJobById)>) -> Self { pub fn from_prepared_jobs(jobs: impl IntoIterator<Item = PreparedJob>) -> Self {
let mut waiter = Waiter::new(); let mut runner = URunner::new();
for (job, ids) in jobs { for job in jobs {
waiter.push(run_assigned_job(job, ids)); runner.push(IdentifiableFuture::from_fut_with_ident(
job.ids.id,
run_assigned_job(job),
));
} }
Self { Self {
waiter, runner,
is_running: false, is_running: false,
} }
} }
pub fn from_meta(jobs: impl IntoIterator<Item = Job>) -> Self { pub fn from_jobs(jobs: impl IntoIterator<Item = Job>) -> Self {
let jobs_ids: Vec<_> = jobs let jobs_ids: Vec<_> = jobs
.into_iter() .into_iter()
.map(|job| { .map(|job| PreparedJob {
let job_id = job.meta.id; ids: AssignedJobById {
( job_id: job.meta.id,
job, ..Default::default()
AssignedJobById { },
job_id, job,
..Default::default()
},
)
}) })
.collect(); .collect();
AnonymousJobBatch::from_meta_with_id(jobs_ids) AnonymousJobBatch::from_prepared_jobs(jobs_ids)
} }
/// Spawn jobs /// Spawn jobs
pub async fn spawn(mut self) -> Self { pub async fn spawn(mut self) -> Self {
debug!("spawning jobs"); debug!("spawning jobs");
self.waiter = self.waiter.spawn().await; self.runner = self.runner.spawn().await;
self.is_running = true; self.is_running = true;
self self
} }
/// Spawn jobs and wait for result /// Spawn jobs and wait for result
pub async fn wait(self) -> Vec<ExecResult> { pub async fn wait(self) -> Vec<ExecResult> {
let waiter = if !self.is_running { let runner = if !self.is_running {
self.spawn().await.waiter self.spawn().await.runner
} else { } else {
self.waiter self.runner
}; };
waiter.wait().await runner.wait().await
} }
/// Spawn one job and wait for result /// Spawn one job and wait for result
@ -102,7 +106,7 @@ impl NamedJobBatch {
.map(|job| (job.meta.alias.clone().unwrap(), job)) .map(|job| (job.meta.alias.clone().unwrap(), job))
.unzip(); .unzip();
Self { Self {
runner: Some(AnonymousJobBatch::from_meta(jobs)), runner: Some(AnonymousJobBatch::from_jobs(jobs)),
job_names, job_names,
results: HashMap::new(), results: HashMap::new(),
} }
@ -132,9 +136,9 @@ impl NamedJobBatch<true> {
} }
} }
pub async fn run_assigned_job(job: Job, ids: AssignedJobById) -> ExecResult { pub async fn run_assigned_job(prepared_job: PreparedJob) -> ExecResult {
let Job { meta, payload } = job; let mut result = AssignedJob::from(&prepared_job);
let mut result = AssignedJob::from((&meta, ids)); let Job { meta, payload } = prepared_job.job;
match meta.exec_type { match meta.exec_type {
JobType::Shell => { JobType::Shell => {
let (argv, _prepared_payload) = { let (argv, _prepared_payload) = {
@ -150,7 +154,11 @@ pub async fn run_assigned_job(job: Job, ids: AssignedJobById) -> ExecResult {
let mut split_cmd = shlex::split(&argv).unwrap().into_iter(); let mut split_cmd = shlex::split(&argv).unwrap().into_iter();
let cmd = split_cmd.nth(0).unwrap(); let cmd = split_cmd.nth(0).unwrap();
let args = split_cmd.collect::<Vec<String>>(); let args = split_cmd.collect::<Vec<String>>();
let cmd_result = Command::new(cmd).args(args).output().await; let cmd_result = Command::new(cmd)
.kill_on_drop(true)
.args(args)
.output()
.await;
let (data, retcode) = match cmd_result { let (data, retcode) = match cmd_result {
Ok(output) => ( Ok(output) => (
ProcOutput::from_output(&output).into_vec(), ProcOutput::from_output(&output).into_vec(),
@ -172,8 +180,18 @@ pub async fn run_assigned_job(job: Job, ids: AssignedJobById) -> ExecResult {
result.set_result(&agent); result.set_result(&agent);
result.retcode = Some(0); result.retcode = Some(0);
} }
JobType::Service => todo!(), JobType::Stats => {
JobType::Update => todo!(), let stats = Stats {
agent_id: get_self_id(),
running_jobs: URunner::stats().await,
scheduled_jobs: SCHEDULER.stats().await,
cached_jobs: JobCache::stats(),
cached_payloads: ufs::stats(),
};
result.set_result(&stats);
result.retcode = Some(0)
}
JobType::Update => (),
JobType::Terminate => exit(0), JobType::Terminate => exit(0),
}; };
Ok(result) Ok(result)
@ -197,7 +215,7 @@ mod tests {
let sleep_jobs = (0..50).map(|_| job.clone()).collect::<Vec<_>>(); let sleep_jobs = (0..50).map(|_| job.clone()).collect::<Vec<_>>();
let now = SystemTime::now(); let now = SystemTime::now();
AnonymousJobBatch::from_meta(sleep_jobs).wait().await; AnonymousJobBatch::from_jobs(sleep_jobs).wait().await;
assert!(now.elapsed().unwrap().as_secs() < SLEEP_SECS + 2) assert!(now.elapsed().unwrap().as_secs() < SLEEP_SECS + 2)
} }
@ -236,7 +254,7 @@ mod tests {
raw_job = raw_job.with_raw_payload(p); raw_job = raw_job.with_raw_payload(p);
} }
let job = raw_job.try_into_job().unwrap(); let job = raw_job.try_into_job().unwrap();
let result = AnonymousJobBatch::from_meta([job]) let result = AnonymousJobBatch::from_jobs([job])
.wait_one() .wait_one()
.await .await
.unwrap(); .unwrap();
@ -250,8 +268,8 @@ mod tests {
const SLEEP_SECS: u64 = 1; const SLEEP_SECS: u64 = 1;
let now = SystemTime::now(); let now = SystemTime::now();
let longest_job = RawJob::from_shell(format!("sleep {}", SLEEP_SECS)).unwrap(); let longest_job = RawJob::from_shell(format!("sleep {}", SLEEP_SECS)).unwrap();
let longest_job = AnonymousJobBatch::from_meta([longest_job]).spawn().await; let longest_job = AnonymousJobBatch::from_jobs([longest_job]).spawn().await;
let ls = AnonymousJobBatch::from_meta([RawJob::from_shell("ls").unwrap()]) let ls = AnonymousJobBatch::from_jobs([RawJob::from_shell("ls").unwrap()])
.wait_one() .wait_one()
.await .await
.unwrap(); .unwrap();
@ -264,7 +282,7 @@ mod tests {
.map(|f| RawJob::from_shell(format!("ls {f}")).unwrap()) .map(|f| RawJob::from_shell(format!("ls {f}")).unwrap())
.collect::<Vec<_>>(); .collect::<Vec<_>>();
let ls_subfolders = AnonymousJobBatch::from_meta(subfolders_jobs).wait().await; let ls_subfolders = AnonymousJobBatch::from_jobs(subfolders_jobs).wait().await;
for result in ls_subfolders { for result in ls_subfolders {
assert_eq!(result.unwrap().retcode.unwrap(), 0); assert_eq!(result.unwrap().retcode.unwrap(), 0);
@ -295,7 +313,7 @@ mod tests {
#[tokio::test] #[tokio::test]
async fn test_failing_shell_job() -> TestResult { async fn test_failing_shell_job() -> TestResult {
let job = RawJob::from_shell("lol_kek_puk").unwrap(); let job = RawJob::from_shell("lol_kek_puk").unwrap();
let job_result = AnonymousJobBatch::from_meta([job]) let job_result = AnonymousJobBatch::from_jobs([job])
.wait_one() .wait_one()
.await .await
.unwrap(); .unwrap();

@ -7,7 +7,6 @@ pub mod conv;
#[cfg(feature = "server")] #[cfg(feature = "server")]
pub mod db; pub mod db;
pub mod error; pub mod error;
pub mod executor;
pub mod jobs; pub mod jobs;
pub mod logging; pub mod logging;
pub mod messaging; pub mod messaging;
@ -15,7 +14,9 @@ pub mod misc;
pub mod models; pub mod models;
pub mod platform; pub mod platform;
pub mod proc_output; pub mod proc_output;
pub mod scheduler;
pub mod types; pub mod types;
pub mod u_runner;
pub mod ufs; pub mod ufs;
#[cfg(unix)] #[cfg(unix)]
pub mod unix; pub mod unix;

@ -13,8 +13,8 @@ mod server {
use self::server::*; use self::server::*;
use crate::{ use crate::{
config::get_self_id, conv::systime_to_string, executor::ExecResult, jobs::NamedJobBatch, config::get_self_id, conv::systime_to_string, jobs::NamedJobBatch, platform, types::Id,
platform, types::Id, u_runner::ExecResult,
}; };
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Display)] #[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Display)]

@ -1,16 +1,19 @@
use super::{JobMeta, JobState, JobType}; use super::{JobState, JobType};
#[cfg(feature = "server")] #[cfg(feature = "server")]
use crate::models::schema::*; use crate::models::schema::*;
use crate::{ use crate::{
config::get_self_id, config::get_self_id,
conv::{bytes_to_string_truncated, systime_to_string}, conv::{bytes_to_string_truncated, systime_to_string},
models::Job,
types::Id, types::Id,
UError, UResult,
}; };
#[cfg(feature = "server")] #[cfg(feature = "server")]
use diesel::{Identifiable, Insertable, Queryable}; use diesel::{Identifiable, Insertable, Queryable};
use serde::{Deserialize, Serialize}; use serde::{de::DeserializeOwned, Deserialize, Serialize};
use std::{borrow::Cow, fmt::Debug, time::SystemTime}; use std::{borrow::Cow, fmt::Debug, time::SystemTime};
/// A job assigned to some agents, stores execution state and result
#[derive(Serialize, Deserialize, Clone, PartialEq)] #[derive(Serialize, Deserialize, Clone, PartialEq)]
#[cfg_attr( #[cfg_attr(
feature = "server", feature = "server",
@ -53,6 +56,13 @@ impl Debug for AssignedJob {
} }
} }
#[derive(Debug, Clone)]
pub struct PreparedJob {
pub job: Job,
pub ids: AssignedJobById,
}
/// Lightweight linking struct between agent job and assigned job
#[derive(Serialize, Deserialize, Clone, Copy, Debug)] #[derive(Serialize, Deserialize, Clone, Copy, Debug)]
pub struct AssignedJobById { pub struct AssignedJobById {
pub agent_id: Id, pub agent_id: Id,
@ -61,20 +71,20 @@ pub struct AssignedJobById {
pub job_id: Id, pub job_id: Id,
} }
impl From<(&JobMeta, AssignedJobById)> for AssignedJob { impl From<&PreparedJob> for AssignedJob {
fn from((job, ids): (&JobMeta, AssignedJobById)) -> Self { fn from(prep_job: &PreparedJob) -> Self {
let AssignedJobById { let AssignedJobById {
agent_id, agent_id,
id, id,
job_id, job_id,
} = ids; } = prep_job.ids;
AssignedJob { AssignedJob {
id, id,
agent_id, agent_id,
job_id, job_id,
alias: job.alias.clone(), alias: prep_job.job.meta.alias.clone(),
exec_type: job.exec_type, exec_type: prep_job.job.meta.exec_type,
..Default::default() ..Default::default()
} }
} }
@ -136,7 +146,14 @@ impl AssignedJob {
String::from_utf8_lossy(self.to_raw_result()) String::from_utf8_lossy(self.to_raw_result())
} }
pub fn set_result<S: Serialize>(&mut self, result: &S) { pub fn deserialize<T: DeserializeOwned>(&self) -> UResult<T> {
let body = self.result.as_deref().unwrap_or_default();
serde_json::from_slice(body).map_err(|e| {
UError::DeserializeError(e.to_string(), bytes_to_string_truncated(body, 1024))
})
}
pub fn set_result(&mut self, result: &impl Serialize) {
self.result = Some(serde_json::to_vec(result).unwrap()); self.result = Some(serde_json::to_vec(result).unwrap());
} }

@ -7,6 +7,7 @@ use crate::models::Payload;
use crate::platform; use crate::platform;
use crate::types::Id; use crate::types::Id;
use crate::{UError, UResult}; use crate::{UError, UResult};
use cron::Schedule;
#[cfg(feature = "server")] #[cfg(feature = "server")]
use diesel::{Identifiable, Insertable, Queryable}; use diesel::{Identifiable, Insertable, Queryable};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
@ -102,7 +103,7 @@ impl JobMeta {
return Err(mk_err("Argv contains no executable placeholder")); return Err(mk_err("Argv contains no executable placeholder"));
} }
if self.argv.contains("{}") && self.payload_id.is_none() { if self.payload_id.is_none() && self.argv.contains("{}") {
return Err(mk_err( return Err(mk_err(
"No payload provided, but argv contains executable placeholder", "No payload provided, but argv contains executable placeholder",
)); ));
@ -119,16 +120,24 @@ impl JobMeta {
))); )));
} }
if let Some(schedule) = &self.schedule {
Schedule::try_from(schedule.as_str())
.map_err(|e| mk_err(format!("bad schedule: {e}")))?;
}
Ok(self) Ok(self)
} }
} }
/// An abstract valid-constructed job that can be assigned to any agent.
/// Contains job meta and payload meta
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct Job { pub struct Job {
pub meta: JobMeta, pub meta: JobMeta,
pub payload: Option<Payload>, pub payload: Option<Payload>,
} }
/// Job that has only been deserialized without checks
#[derive(Serialize, Deserialize, Clone, Default)] #[derive(Serialize, Deserialize, Clone, Default)]
pub struct RawJob<'p> { pub struct RawJob<'p> {
#[serde(default)] #[serde(default)]

@ -35,7 +35,7 @@ pub enum JobState {
)] )]
pub enum JobType { pub enum JobType {
Init, Init,
Service, Stats,
#[default] #[default]
Shell, Shell,
Terminate, Terminate,

@ -3,8 +3,9 @@ mod jobs;
mod payload; mod payload;
#[cfg(feature = "server")] #[cfg(feature = "server")]
pub mod schema; pub mod schema;
mod stats;
pub use crate::models::{agent::*, jobs::*, payload::*}; pub use crate::models::{agent::*, jobs::*, payload::*, stats::*};
use serde::Deserialize; use serde::Deserialize;
use strum::{Display as StrumDisplay, EnumString}; use strum::{Display as StrumDisplay, EnumString};

@ -185,7 +185,7 @@ impl Payload {
#[cfg(windows)] #[cfg(windows)]
pub fn prepare_executable(&self) -> Result<(File, String)> { pub fn prepare_executable(&self) -> Result<(File, String)> {
todo!() compile_error!("допилить")
} }
} }

@ -86,4 +86,9 @@ diesel::joinable!(jobs -> payloads (payload_id));
diesel::joinable!(results -> agents (agent_id)); diesel::joinable!(results -> agents (agent_id));
diesel::joinable!(results -> jobs (job_id)); diesel::joinable!(results -> jobs (job_id));
diesel::allow_tables_to_appear_in_same_query!(agents, jobs, payloads, results,); diesel::allow_tables_to_appear_in_same_query!(
agents,
jobs,
payloads,
results,
);

@ -0,0 +1,14 @@
use serde::{Deserialize, Serialize};
use crate::{messaging::AsMsg, scheduler::EntryStat, types::Id, ufs::IndexFileMeta};
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Stats {
pub agent_id: Id,
pub running_jobs: Vec<String>,
pub scheduled_jobs: Vec<EntryStat>,
pub cached_jobs: Vec<Id>,
pub cached_payloads: Vec<IndexFileMeta>,
}
impl AsMsg for Stats {}

@ -7,7 +7,7 @@ pub struct ProcOutput {
} }
impl ProcOutput { impl ProcOutput {
const STDERR_DELIMETER: &[u8] = b"[STDERR]\n"; const STDERR_DELIMETER: &'static [u8] = b"[STDERR]\n";
pub fn from_output(output: &Output) -> Self { pub fn from_output(output: &Output) -> Self {
Self::new() Self::new()

@ -0,0 +1,91 @@
use crate::{models::PreparedJob, types::Id, u_runner::IdentifiableFuture};
use chrono::{DateTime, Utc};
use core::fmt;
use serde::{Deserialize, Serialize};
use std::sync::Arc;
pub trait SchedulerJob {
fn call(&self) -> IdentifiableFuture<()>;
}
// scheduler job need to be called repeatedly,
// the only way to achieve this is to use Fn() -> Future
impl<F> SchedulerJob for F
where
F: Fn() -> IdentifiableFuture<()>,
{
fn call(&self) -> IdentifiableFuture<()> {
self()
}
}
#[derive(Clone)]
pub struct Entry {
pub entry_id: Id,
pub schedule: Option<cron::Schedule>,
pub next: Option<DateTime<Utc>>,
pub runnable: EntryType,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct EntryStat {
pub entry_id: Id,
pub job_ident: String,
pub schedule: String,
pub next: Option<DateTime<Utc>>,
}
#[derive(Clone)]
pub enum EntryType {
Common(Arc<dyn SchedulerJob + Send + Sync>),
URunner(PreparedJob),
}
impl<J: SchedulerJob + Send + Sync + 'static> From<J> for EntryType {
fn from(value: J) -> Self {
Self::Common(Arc::new(value))
}
}
impl From<PreparedJob> for EntryType {
fn from(value: PreparedJob) -> Self {
Self::URunner(value)
}
}
impl Entry {
pub fn set_next_run_time(&mut self) {
self.next = self.get_next_run_time();
}
pub fn get_next_run_time(&self) -> Option<DateTime<Utc>> {
match &self.schedule {
Some(schedule) => schedule.upcoming(Utc).next(),
None => Some(Utc::now()),
}
}
pub fn as_stat(&self) -> EntryStat {
let job_ident = match &self.runnable {
EntryType::URunner(entry) => entry.job.meta.id.to_string(),
EntryType::Common(entry) => entry.call().job_ident.to_string(),
};
EntryStat {
entry_id: self.entry_id,
job_ident,
schedule: self
.schedule
.as_ref()
.map(|s| s.to_string())
.unwrap_or_default(),
next: self.next.clone(),
}
}
}
impl fmt::Debug for Entry {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{} {:?} {:?}", self.entry_id, self.schedule, self.next)
}
}

@ -0,0 +1,178 @@
mod entry;
use std::sync::Arc;
use std::time::Duration;
use chrono::Utc;
use cron::Schedule;
use entry::Entry;
use once_cell::sync::Lazy;
use tokio::sync::Mutex;
use uuid::Uuid;
use crate::jobs::AnonymousJobBatch;
use crate::types::Id;
use self::entry::EntryType;
pub use entry::EntryStat;
pub static SCHEDULER: Lazy<AsyncScheduler> = Lazy::new(|| AsyncScheduler::new());
#[derive(Clone)]
pub struct AsyncScheduler {
entries: Arc<Mutex<Vec<Entry>>>,
}
impl AsyncScheduler {
pub fn new() -> AsyncScheduler {
AsyncScheduler {
entries: Arc::new(Mutex::new(Vec::new())),
}
}
pub async fn start_blocking(&self) -> ! {
for entry in self.entries.lock().await.iter_mut() {
entry.set_next_run_time();
}
loop {
let mut entries = self.entries.lock().await;
entries.sort_by(|b, a| b.next.cmp(&a.next));
let wait_duration = if let Some(entry) = entries.first() {
let wait_millis = (entry.next.as_ref().unwrap().timestamp_millis() as u64)
.saturating_sub(Utc::now().timestamp_millis() as u64);
Duration::from_millis(wait_millis)
} else {
Duration::from_secs(1)
};
drop(entries);
tokio::time::sleep(wait_duration).await;
let mut entries = self.entries.lock().await;
let mut job_batch = vec![];
for entry in &mut *entries {
match entry.next.as_ref() {
Some(next) => {
if next.gt(&Utc::now()) {
break;
}
match &entry.runnable {
EntryType::Common(runnable) => {
let fut = runnable.call();
tokio::spawn(fut);
}
EntryType::URunner(runnable) => {
debug!("starting assigned job {:?}", runnable.ids);
job_batch.push(runnable.clone())
}
}
entry.set_next_run_time();
}
None => {}
}
}
AnonymousJobBatch::from_prepared_jobs(job_batch)
.spawn()
.await;
entries.retain(|e| e.schedule.is_some());
}
}
pub async fn add_job(&self, schedule: Option<Schedule>, runnable: impl Into<EntryType>) -> Id {
let entry_id = Uuid::new_v4();
let runnable = runnable.into();
let mut entry = Entry {
entry_id,
schedule,
next: None,
runnable,
};
entry.set_next_run_time();
self.entries.lock().await.push(entry);
entry_id
}
pub async fn del_job(&self, entry_id: Id) {
self.entries.lock().await.retain(|e| e.entry_id != entry_id);
}
pub async fn stats(&self) -> Vec<EntryStat> {
self.entries
.lock()
.await
.iter()
.map(|entry| entry.as_stat())
.collect()
}
pub async fn start(&self) {
let cloned = self.clone();
tokio::spawn(async move {
cloned.start_blocking().await;
});
}
}
#[cfg(test)]
mod tests {
use std::time::Duration;
use tokio::time::sleep;
use crate::u_runner::IdentifiableFuture;
use super::*;
#[tokio::test(flavor = "multi_thread")]
async fn scheduling() {
let v = Arc::new(Mutex::new(0));
let scheduler = AsyncScheduler::new();
{
let v = v.clone();
scheduler
.add_job(Some("*/1 * * * * * *".try_into().unwrap()), move || {
let v = v.clone();
IdentifiableFuture::from_fut_with_ident("testf", async move {
*v.lock().await += 1;
})
})
.await;
}
scheduler
.add_job(Some("*/1 * * * * * *".try_into().unwrap()), move || {
IdentifiableFuture::from_fut_with_ident("testf", async move {
println!("hello1");
})
})
.await;
scheduler
.add_job(Some("*/1 * * * * * *".try_into().unwrap()), move || {
IdentifiableFuture::from_fut_with_ident("testf", async move {
println!("hello2");
})
})
.await;
scheduler.start().await;
sleep(Duration::from_secs_f32(0.1)).await; // wait for scheduler
sleep(Duration::from_secs(1)).await;
assert_eq!(*v.lock().await, 1);
sleep(Duration::from_secs(1)).await;
assert_eq!(*v.lock().await, 2);
}
}

@ -0,0 +1,200 @@
use crate::{models::AssignedJob, UResult};
use lazy_static::lazy_static;
use std::future::Future;
use std::pin::Pin;
use std::task::Poll;
use std::{collections::HashMap, task::Context};
use tokio::{
runtime::Handle,
sync::mpsc::{channel, Receiver, Sender},
sync::Mutex,
task::{spawn, spawn_blocking, JoinHandle},
};
use uuid::Uuid;
pub type ExecResult = UResult<AssignedJob>;
lazy_static! {
static ref FUT_RESULTS: Mutex<HashMap<Uuid, JoinInfo>> = Mutex::new(HashMap::new());
static ref FUT_CHANNEL: (Sender<Uuid>, Mutex<Receiver<Uuid>>) = {
spawn(init_receiver());
let (tx, rx) = channel(100);
(tx, Mutex::new(rx))
};
}
pub struct IdentifiableFuture<R> {
pub job_ident: String,
fut: Pin<Box<dyn Future<Output = R> + Send + Sync + 'static>>,
}
impl<R> IdentifiableFuture<R> {
pub fn from_fut_with_ident(
job_ident: impl Into<String>,
job: impl Future<Output = R> + Send + Sync + 'static,
) -> Self {
Self {
fut: Box::pin(job),
job_ident: job_ident.into(),
}
}
}
impl<R> Future for IdentifiableFuture<R> {
type Output = R;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.fut.as_mut().poll(cx)
}
}
struct JoinInfo {
job_ident: String,
handle: JoinHandle<JoinHandle<ExecResult>>,
completed: bool,
// collectable: bool, // indicates if future can be popped from pool via pop_task_if_completed
}
impl JoinInfo {
async fn wait_result(self) -> ExecResult {
self.handle.await.unwrap().await.unwrap()
}
}
fn get_sender() -> Sender<Uuid> {
FUT_CHANNEL.0.clone()
}
/// Job runner. Has multiple ways of work.
/// - run 1 or more jobs and wait until they're all done
/// - run 1 or more jobs in background and collect results of completed jobs later
pub struct URunner {
executables: Vec<IdentifiableFuture<ExecResult>>,
fids: Vec<Uuid>,
}
impl URunner {
pub fn new() -> Self {
Self {
executables: vec![],
fids: vec![],
}
}
pub fn push(&mut self, job: IdentifiableFuture<ExecResult>) {
self.executables.push(job);
}
/// Spawn prepared tasks
pub async fn spawn(mut self) -> Self {
for executable in self.executables.drain(..) {
let handle = Handle::current();
let fid = Uuid::new_v4();
let tx = get_sender();
let job_id = executable.job_ident.clone();
self.fids.push(fid);
let job_wrapper = async move {
debug!("inside wrapper (started): {}", fid);
let result = executable.await;
tx.send(fid).await.ok();
result
};
let handler = JoinInfo {
job_ident: job_id,
handle: spawn_blocking(move || handle.spawn(job_wrapper)),
completed: false,
};
FUT_RESULTS.lock().await.insert(fid, handler);
}
self
}
/// Wait until a bunch of tasks is finished.
/// NOT GUARANTEED that all tasks will be returned due to
/// possibility to pop them in other places
pub async fn wait(self) -> Vec<ExecResult> {
let mut result = vec![];
for fid in self.fids {
if let Some(job) = Self::pop_job(fid).await {
result.push(job.wait_result().await);
}
}
result
}
pub async fn pop_job_if_completed(fid: Uuid) -> Option<ExecResult> {
let Some(&JoinInfo { completed, .. }) = FUT_RESULTS.lock().await.get(&fid) else {
return None;
};
if completed {
let job = Self::pop_job(fid).await.unwrap();
Some(job.wait_result().await)
} else {
None
}
}
pub async fn pop_completed() -> Vec<ExecResult> {
let mut completed: Vec<ExecResult> = vec![];
let fids = FUT_RESULTS
.lock()
.await
.keys()
.copied()
.collect::<Vec<Uuid>>();
for fid in fids {
if let Some(r) = Self::pop_job_if_completed(fid).await {
completed.push(r)
}
}
completed
}
pub async fn stats() -> Vec<String> {
FUT_RESULTS
.lock()
.await
.values()
.map(|v| v.job_ident.clone())
.collect()
}
async fn pop_job(fid: Uuid) -> Option<JoinInfo> {
FUT_RESULTS.lock().await.remove(&fid)
}
}
async fn init_receiver() {
while let Some(fid) = FUT_CHANNEL.1.lock().await.recv().await {
let mut lock = FUT_RESULTS.lock().await;
if let Some(j) = lock.get_mut(&fid) {
j.completed = true;
}
}
}
#[cfg(test)]
mod tests {
use super::*;
// WTF
// WTF
// WTF
#[tokio::test]
async fn test_spawn() {
use std::sync::Arc;
let val = Arc::new(Mutex::new(0));
let t = {
let v = val.clone();
spawn(async move {
*v.lock().await = 5;
})
};
assert_eq!(0, *val.lock().await);
spawn(async {}).await.unwrap();
assert_eq!(5, *val.lock().await);
t.await.unwrap();
assert_eq!(5, *val.lock().await);
}
}

@ -1,13 +1,15 @@
use super::{Error, FileMeta}; use super::Error;
use once_cell::sync::Lazy; use once_cell::sync::Lazy;
use parking_lot::Mutex; use parking_lot::Mutex;
use serde::{Deserialize, Serialize};
use std::collections::HashMap; use std::collections::HashMap;
use std::env::temp_dir; use std::env::temp_dir;
use std::ffi::OsString;
use std::fs; use std::fs;
use std::path::PathBuf; use std::path::PathBuf;
// index format: given_name -> payload_meta // index format: given_name -> payload_meta
type Index = HashMap<String, FileMeta>; type Index = HashMap<String, IndexFileMeta>;
static IDX_FILE_NAME: Lazy<PathBuf> = Lazy::new(|| temp_dir().join(".i")); static IDX_FILE_NAME: Lazy<PathBuf> = Lazy::new(|| temp_dir().join(".i"));
static INDEX: Lazy<Mutex<Index>> = Lazy::new(|| { static INDEX: Lazy<Mutex<Index>> = Lazy::new(|| {
@ -28,6 +30,35 @@ static INDEX: Lazy<Mutex<Index>> = Lazy::new(|| {
Mutex::new(idx) Mutex::new(idx)
}); });
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct IndexFileMeta {
extension: Option<OsString>,
pub external: bool, // if file is present before adding to index
pub hashsum: String,
pub path: PathBuf, // actual file path
pub size: u64,
}
impl IndexFileMeta {
pub fn new(
full_path: impl Into<PathBuf>,
hashsum: String,
external: bool,
) -> Result<Self, Error> {
let full_path: PathBuf = full_path.into();
let extension = full_path.extension().map(ToOwned::to_owned);
let size = fs::metadata(&full_path)?.len();
Ok(IndexFileMeta {
path: full_path,
extension,
external,
hashsum,
size,
})
}
}
mod sync { mod sync {
use super::{Index, IDX_FILE_NAME}; use super::{Index, IDX_FILE_NAME};
use std::fs; use std::fs;
@ -57,7 +88,7 @@ mod sync {
} }
} }
pub fn get(name: impl AsRef<str>) -> Option<FileMeta> { pub fn get(name: impl AsRef<str>) -> Option<IndexFileMeta> {
let mut index = INDEX.lock(); let mut index = INDEX.lock();
sync::deleted(&mut index); sync::deleted(&mut index);
@ -65,18 +96,18 @@ pub fn get(name: impl AsRef<str>) -> Option<FileMeta> {
index.get(name.as_ref()).cloned() index.get(name.as_ref()).cloned()
} }
pub fn get_by_hash(hash: impl AsRef<[u8]>) -> Option<(String, FileMeta)> { pub fn get_by_hash(hash: impl AsRef<str>) -> Option<(String, IndexFileMeta)> {
let mut index = INDEX.lock(); let mut index = INDEX.lock();
sync::deleted(&mut index); sync::deleted(&mut index);
index index
.iter() .iter()
.find(|(_name, meta)| meta.hash == hash.as_ref()) .find(|(_name, meta)| meta.hashsum == hash.as_ref())
.map(|(n, m)| (n.to_owned(), m.clone())) .map(|(n, m)| (n.to_owned(), m.clone()))
} }
pub fn insert(name: impl Into<String>, meta: FileMeta) { pub fn insert(name: impl Into<String>, meta: IndexFileMeta) {
let mut index = INDEX.lock(); let mut index = INDEX.lock();
sync::deleted(&mut index); sync::deleted(&mut index);
@ -87,7 +118,7 @@ pub fn insert(name: impl Into<String>, meta: FileMeta) {
sync::index2fs(&mut index); sync::index2fs(&mut index);
} }
pub fn remove(name: impl AsRef<str>) -> Option<FileMeta> { pub fn remove(name: impl AsRef<str>) -> Option<IndexFileMeta> {
let mut index = INDEX.lock(); let mut index = INDEX.lock();
sync::deleted(&mut index); sync::deleted(&mut index);
@ -99,3 +130,7 @@ pub fn remove(name: impl AsRef<str>) -> Option<FileMeta> {
result result
} }
pub fn stats() -> Vec<IndexFileMeta> {
INDEX.lock().values().cloned().collect()
}

@ -1,49 +1,19 @@
// This module is aiming to store (obfuscated?) payloads, get them by name, /// This module is aiming to store (obfuscated?) payloads, get them by name,
// rename, update, delete or prepare to execute via memfd_create (unix) /// rename, update, delete or prepare to execute via memfd_create (unix)
mod error;
mod index;
use anyhow::{Context, Result}; use anyhow::{Context, Result};
use serde::{Deserialize, Serialize};
use std::env::temp_dir; use std::env::temp_dir;
use std::ffi::OsString;
use std::fs; use std::fs;
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use uuid::Uuid; use uuid::Uuid;
mod error;
mod index;
pub use error::Error; pub use error::Error;
pub use index::IndexFileMeta;
const OBFUSCATE: bool = cfg!(feature = "agent"); const OBFUSCATE: bool = cfg!(feature = "agent");
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct FileMeta {
extension: Option<OsString>,
external: bool, // if file is present before adding to index
hash: Vec<u8>,
pub path: PathBuf, // actual file path
pub size: u64,
}
impl FileMeta {
pub fn new(
full_path: impl Into<PathBuf>,
hash: Vec<u8>,
external: bool,
) -> Result<Self, Error> {
let full_path: PathBuf = full_path.into();
let extension = full_path.extension().map(ToOwned::to_owned);
let size = fs::metadata(&full_path)?.len();
Ok(FileMeta {
path: full_path,
extension,
external,
hash,
size,
})
}
}
/// Check if file exists in index. /// Check if file exists in index.
/// File may present in fs but not in index, fn will return false then. /// File may present in fs but not in index, fn will return false then.
pub fn exists_in_index(name: impl AsRef<str>) -> bool { pub fn exists_in_index(name: impl AsRef<str>) -> bool {
@ -51,7 +21,7 @@ pub fn exists_in_index(name: impl AsRef<str>) -> bool {
} }
#[inline] #[inline]
pub fn read_meta(name: impl AsRef<str>) -> Result<FileMeta> { pub fn read_meta(name: impl AsRef<str>) -> Result<IndexFileMeta> {
index::get(&name) index::get(&name)
.ok_or_else(|| Error::not_found(name.as_ref())) .ok_or_else(|| Error::not_found(name.as_ref()))
.context("meta") .context("meta")
@ -113,7 +83,7 @@ pub fn put(name: impl AsRef<str>, data: impl AsRef<[u8]>) -> Result<()> {
}; };
index::insert( index::insert(
name, name,
FileMeta::new(path, data_hash, false).context("put_insert")?, IndexFileMeta::new(path, data_hash, false).context("put_insert")?,
); );
Ok(()) Ok(())
@ -123,7 +93,7 @@ pub fn edit(name: impl AsRef<str>, data: impl AsRef<[u8]>) -> Result<()> {
let meta = read_meta(&name).context("edit_meta")?; let meta = read_meta(&name).context("edit_meta")?;
let data_hash = hash_data(&data); let data_hash = hash_data(&data);
if meta.hash == data_hash { if meta.hashsum == data_hash {
return Ok(()); return Ok(());
} }
@ -131,7 +101,7 @@ pub fn edit(name: impl AsRef<str>, data: impl AsRef<[u8]>) -> Result<()> {
.map_err(|e| Error::new(e, &meta.path)) .map_err(|e| Error::new(e, &meta.path))
.context("edit_write")?; .context("edit_write")?;
let new_meta = FileMeta::new(meta.path, data_hash, meta.external).context("edit_nmeta")?; let new_meta = IndexFileMeta::new(meta.path, data_hash, meta.external).context("edit_nmeta")?;
index::remove(&name); index::remove(&name);
index::insert(name.as_ref(), new_meta); index::insert(name.as_ref(), new_meta);
@ -215,12 +185,16 @@ pub fn put_external(path: impl AsRef<Path>) -> Result<()> {
index::insert( index::insert(
path_str, path_str,
FileMeta::new(path, data_hash, true).context("ext2")?, IndexFileMeta::new(path, data_hash, true).context("ext2")?,
); );
Ok(()) Ok(())
} }
pub fn stats() -> Vec<IndexFileMeta> {
index::stats()
}
/* /*
pub fn cleanup() { pub fn cleanup() {
let index = INDEX.read(); let index = INDEX.read();
@ -231,10 +205,11 @@ pub fn cleanup() {
} }
*/ */
fn hash_data(data: impl AsRef<[u8]>) -> Vec<u8> { fn hash_data(data: impl AsRef<[u8]>) -> String {
use sha3::{Digest, Sha3_256}; use sha3::{Digest, Sha3_256};
let mut hasher = Sha3_256::new(); let mut hasher = Sha3_256::new();
hasher.update(data); hasher.update(data);
hasher.finalize().to_vec() let hashsum = hasher.finalize().to_vec();
hex::encode(hashsum)
} }

@ -1,5 +1,5 @@
CREATE EXTENSION IF NOT EXISTS "uuid-ossp"; CREATE EXTENSION IF NOT EXISTS "uuid-ossp";
CREATE TYPE JobType AS ENUM ('shell', 'init', 'python', 'service'); CREATE TYPE JobType AS ENUM ('shell', 'init', 'python', 'service', 'stats');
CREATE TYPE JobState AS ENUM ('queued', 'running', 'finished'); CREATE TYPE JobState AS ENUM ('queued', 'running', 'finished');
CREATE TYPE AgentState AS ENUM ('new', 'active', 'banned'); CREATE TYPE AgentState AS ENUM ('new', 'active', 'banned');

Loading…
Cancel
Save