32 changed files with 1270 additions and 728 deletions
File diff suppressed because it is too large
Load Diff
@ -1,6 +1,7 @@ |
||||
<mat-form-field class="info-dlg-field" floatLabel="always"> |
||||
<mat-form-field style="box-sizing:border-box; width:100%" class="info-dlg-field" floatLabel="always"> |
||||
<mat-label>Payload data</mat-label> |
||||
<textarea matInput cdkTextareaAutosize *ngIf="!isTooBigPayload" [readonly]="isPreview" [(ngModel)]="decodedPayload"> |
||||
<textarea class="code" matInput cdkTextareaAutosize="true" *ngIf="!isTooBigPayload" [readonly]="isPreview" |
||||
[(ngModel)]="decodedPayload"> |
||||
</textarea> |
||||
<input matInput *ngIf="isTooBigPayload" disabled placeholder="Payload is too big to display"> |
||||
</mat-form-field> |
@ -1,167 +0,0 @@ |
||||
use crate::{models::AssignedJob, UResult}; |
||||
use lazy_static::lazy_static; |
||||
use std::collections::HashMap; |
||||
use std::future::Future; |
||||
use std::pin::Pin; |
||||
use tokio::{ |
||||
runtime::Handle, |
||||
sync::mpsc::{channel, Receiver, Sender}, |
||||
sync::Mutex, |
||||
task::{spawn, spawn_blocking, JoinHandle}, |
||||
}; |
||||
use uuid::Uuid; |
||||
|
||||
pub type ExecResult = UResult<AssignedJob>; |
||||
type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>; |
||||
|
||||
lazy_static! { |
||||
static ref FUT_RESULTS: Mutex<HashMap<Uuid, JoinInfo>> = Mutex::new(HashMap::new()); |
||||
static ref FUT_CHANNEL: (Sender<Uuid>, Mutex<Receiver<Uuid>>) = { |
||||
spawn(init_receiver()); |
||||
let (tx, rx) = channel(100); |
||||
(tx, Mutex::new(rx)) |
||||
}; |
||||
} |
||||
|
||||
struct JoinInfo { |
||||
handle: JoinHandle<JoinHandle<ExecResult>>, |
||||
completed: bool, |
||||
collectable: bool, // indicates if future can be popped from pool via pop_task_if_completed
|
||||
} |
||||
|
||||
impl JoinInfo { |
||||
async fn wait_result(self) -> ExecResult { |
||||
self.handle.await.unwrap().await.unwrap() |
||||
} |
||||
} |
||||
|
||||
fn get_sender() -> Sender<Uuid> { |
||||
FUT_CHANNEL.0.clone() |
||||
} |
||||
|
||||
pub struct Waiter { |
||||
tasks: Vec<BoxFuture<'static, ExecResult>>, |
||||
fids: Vec<Uuid>, |
||||
} |
||||
|
||||
impl Waiter { |
||||
pub fn new() -> Self { |
||||
Self { |
||||
tasks: vec![], |
||||
fids: vec![], |
||||
} |
||||
} |
||||
|
||||
pub fn push(&mut self, task: impl Future<Output = ExecResult> + Send + 'static) { |
||||
self.tasks.push(Box::pin(task)); |
||||
} |
||||
|
||||
/// Spawn prepared tasks
|
||||
pub async fn spawn(mut self) -> Self { |
||||
let collectable = true; //TODO: self.tasks.len() != 1;
|
||||
for f in self.tasks.drain(..) { |
||||
let handle = Handle::current(); |
||||
let fid = Uuid::new_v4(); |
||||
let tx = get_sender(); |
||||
self.fids.push(fid); |
||||
let task_wrapper = async move { |
||||
debug!("inside wrapper (started): {}", fid); |
||||
let result = f.await; |
||||
tx.send(fid).await.unwrap(); |
||||
result |
||||
}; |
||||
let handler = JoinInfo { |
||||
handle: spawn_blocking(move || handle.spawn(task_wrapper)), |
||||
completed: false, |
||||
collectable, |
||||
}; |
||||
FUT_RESULTS.lock().await.insert(fid, handler); |
||||
} |
||||
self |
||||
} |
||||
|
||||
/// Wait until a bunch of tasks is finished.
|
||||
/// NOT GUARANTEED that all tasks will be returned due to
|
||||
/// possibility to pop them in other places
|
||||
pub async fn wait(self) -> Vec<ExecResult> { |
||||
let mut result = vec![]; |
||||
for fid in self.fids { |
||||
if let Some(task) = pop_task(fid).await { |
||||
result.push(task.wait_result().await); |
||||
} |
||||
} |
||||
result |
||||
} |
||||
} |
||||
|
||||
async fn init_receiver() { |
||||
while let Some(fid) = FUT_CHANNEL.1.lock().await.recv().await { |
||||
let mut lock = FUT_RESULTS.lock().await; |
||||
if let Some(j) = lock.get_mut(&fid) { |
||||
j.completed = true; |
||||
} |
||||
} |
||||
} |
||||
|
||||
async fn pop_task(fid: Uuid) -> Option<JoinInfo> { |
||||
FUT_RESULTS.lock().await.remove(&fid) |
||||
} |
||||
|
||||
pub async fn pop_task_if_completed(fid: Uuid) -> Option<ExecResult> { |
||||
let &JoinInfo { |
||||
handle: _, |
||||
collectable, |
||||
completed, |
||||
} = match FUT_RESULTS.lock().await.get(&fid) { |
||||
Some(t) => t, |
||||
None => return None, |
||||
}; |
||||
if collectable && completed { |
||||
let task = pop_task(fid).await.unwrap(); |
||||
Some(task.wait_result().await) |
||||
} else { |
||||
None |
||||
} |
||||
} |
||||
|
||||
pub async fn pop_completed() -> Vec<ExecResult> { |
||||
let mut completed: Vec<ExecResult> = vec![]; |
||||
let fids = FUT_RESULTS |
||||
.lock() |
||||
.await |
||||
.keys() |
||||
.copied() |
||||
.collect::<Vec<Uuid>>(); |
||||
for fid in fids { |
||||
if let Some(r) = pop_task_if_completed(fid).await { |
||||
completed.push(r) |
||||
} |
||||
} |
||||
completed |
||||
} |
||||
|
||||
#[cfg(test)] |
||||
mod tests { |
||||
use super::*; |
||||
|
||||
// WTF
|
||||
// WTF
|
||||
// WTF
|
||||
#[tokio::test] |
||||
async fn test_spawn() { |
||||
use std::sync::Arc; |
||||
|
||||
let val = Arc::new(Mutex::new(0)); |
||||
let t = { |
||||
let v = val.clone(); |
||||
spawn(async move { |
||||
*v.lock().await = 5; |
||||
}) |
||||
}; |
||||
assert_eq!(0, *val.lock().await); |
||||
spawn(async {}).await.unwrap(); |
||||
assert_eq!(5, *val.lock().await); |
||||
t.await.unwrap(); |
||||
assert_eq!(5, *val.lock().await); |
||||
} |
||||
} |
@ -0,0 +1,14 @@ |
||||
use serde::{Deserialize, Serialize}; |
||||
|
||||
use crate::{messaging::AsMsg, scheduler::EntryStat, types::Id, ufs::IndexFileMeta}; |
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)] |
||||
pub struct Stats { |
||||
pub agent_id: Id, |
||||
pub running_jobs: Vec<String>, |
||||
pub scheduled_jobs: Vec<EntryStat>, |
||||
pub cached_jobs: Vec<Id>, |
||||
pub cached_payloads: Vec<IndexFileMeta>, |
||||
} |
||||
|
||||
impl AsMsg for Stats {} |
@ -0,0 +1,91 @@ |
||||
use crate::{models::PreparedJob, types::Id, u_runner::IdentifiableFuture}; |
||||
use chrono::{DateTime, Utc}; |
||||
use core::fmt; |
||||
use serde::{Deserialize, Serialize}; |
||||
use std::sync::Arc; |
||||
|
||||
pub trait SchedulerJob { |
||||
fn call(&self) -> IdentifiableFuture<()>; |
||||
} |
||||
|
||||
// scheduler job need to be called repeatedly,
|
||||
// the only way to achieve this is to use Fn() -> Future
|
||||
impl<F> SchedulerJob for F |
||||
where |
||||
F: Fn() -> IdentifiableFuture<()>, |
||||
{ |
||||
fn call(&self) -> IdentifiableFuture<()> { |
||||
self() |
||||
} |
||||
} |
||||
|
||||
#[derive(Clone)] |
||||
pub struct Entry { |
||||
pub entry_id: Id, |
||||
pub schedule: Option<cron::Schedule>, |
||||
pub next: Option<DateTime<Utc>>, |
||||
pub runnable: EntryType, |
||||
} |
||||
|
||||
#[derive(Clone, Debug, Serialize, Deserialize)] |
||||
pub struct EntryStat { |
||||
pub entry_id: Id, |
||||
pub job_ident: String, |
||||
pub schedule: String, |
||||
pub next: Option<DateTime<Utc>>, |
||||
} |
||||
|
||||
#[derive(Clone)] |
||||
pub enum EntryType { |
||||
Common(Arc<dyn SchedulerJob + Send + Sync>), |
||||
URunner(PreparedJob), |
||||
} |
||||
|
||||
impl<J: SchedulerJob + Send + Sync + 'static> From<J> for EntryType { |
||||
fn from(value: J) -> Self { |
||||
Self::Common(Arc::new(value)) |
||||
} |
||||
} |
||||
|
||||
impl From<PreparedJob> for EntryType { |
||||
fn from(value: PreparedJob) -> Self { |
||||
Self::URunner(value) |
||||
} |
||||
} |
||||
|
||||
impl Entry { |
||||
pub fn set_next_run_time(&mut self) { |
||||
self.next = self.get_next_run_time(); |
||||
} |
||||
|
||||
pub fn get_next_run_time(&self) -> Option<DateTime<Utc>> { |
||||
match &self.schedule { |
||||
Some(schedule) => schedule.upcoming(Utc).next(), |
||||
None => Some(Utc::now()), |
||||
} |
||||
} |
||||
|
||||
pub fn as_stat(&self) -> EntryStat { |
||||
let job_ident = match &self.runnable { |
||||
EntryType::URunner(entry) => entry.job.meta.id.to_string(), |
||||
EntryType::Common(entry) => entry.call().job_ident.to_string(), |
||||
}; |
||||
|
||||
EntryStat { |
||||
entry_id: self.entry_id, |
||||
job_ident, |
||||
schedule: self |
||||
.schedule |
||||
.as_ref() |
||||
.map(|s| s.to_string()) |
||||
.unwrap_or_default(), |
||||
next: self.next.clone(), |
||||
} |
||||
} |
||||
} |
||||
|
||||
impl fmt::Debug for Entry { |
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
||||
write!(f, "{} {:?} {:?}", self.entry_id, self.schedule, self.next) |
||||
} |
||||
} |
@ -0,0 +1,178 @@ |
||||
mod entry; |
||||
|
||||
use std::sync::Arc; |
||||
use std::time::Duration; |
||||
|
||||
use chrono::Utc; |
||||
use cron::Schedule; |
||||
use entry::Entry; |
||||
use once_cell::sync::Lazy; |
||||
use tokio::sync::Mutex; |
||||
use uuid::Uuid; |
||||
|
||||
use crate::jobs::AnonymousJobBatch; |
||||
use crate::types::Id; |
||||
|
||||
use self::entry::EntryType; |
||||
|
||||
pub use entry::EntryStat; |
||||
|
||||
pub static SCHEDULER: Lazy<AsyncScheduler> = Lazy::new(|| AsyncScheduler::new()); |
||||
|
||||
#[derive(Clone)] |
||||
pub struct AsyncScheduler { |
||||
entries: Arc<Mutex<Vec<Entry>>>, |
||||
} |
||||
|
||||
impl AsyncScheduler { |
||||
pub fn new() -> AsyncScheduler { |
||||
AsyncScheduler { |
||||
entries: Arc::new(Mutex::new(Vec::new())), |
||||
} |
||||
} |
||||
|
||||
pub async fn start_blocking(&self) -> ! { |
||||
for entry in self.entries.lock().await.iter_mut() { |
||||
entry.set_next_run_time(); |
||||
} |
||||
|
||||
loop { |
||||
let mut entries = self.entries.lock().await; |
||||
entries.sort_by(|b, a| b.next.cmp(&a.next)); |
||||
|
||||
let wait_duration = if let Some(entry) = entries.first() { |
||||
let wait_millis = (entry.next.as_ref().unwrap().timestamp_millis() as u64) |
||||
.saturating_sub(Utc::now().timestamp_millis() as u64); |
||||
|
||||
Duration::from_millis(wait_millis) |
||||
} else { |
||||
Duration::from_secs(1) |
||||
}; |
||||
|
||||
drop(entries); |
||||
tokio::time::sleep(wait_duration).await; |
||||
|
||||
let mut entries = self.entries.lock().await; |
||||
let mut job_batch = vec![]; |
||||
|
||||
for entry in &mut *entries { |
||||
match entry.next.as_ref() { |
||||
Some(next) => { |
||||
if next.gt(&Utc::now()) { |
||||
break; |
||||
} |
||||
match &entry.runnable { |
||||
EntryType::Common(runnable) => { |
||||
let fut = runnable.call(); |
||||
tokio::spawn(fut); |
||||
} |
||||
EntryType::URunner(runnable) => { |
||||
debug!("starting assigned job {:?}", runnable.ids); |
||||
job_batch.push(runnable.clone()) |
||||
} |
||||
} |
||||
entry.set_next_run_time(); |
||||
} |
||||
None => {} |
||||
} |
||||
} |
||||
|
||||
AnonymousJobBatch::from_prepared_jobs(job_batch) |
||||
.spawn() |
||||
.await; |
||||
|
||||
entries.retain(|e| e.schedule.is_some()); |
||||
} |
||||
} |
||||
|
||||
pub async fn add_job(&self, schedule: Option<Schedule>, runnable: impl Into<EntryType>) -> Id { |
||||
let entry_id = Uuid::new_v4(); |
||||
let runnable = runnable.into(); |
||||
let mut entry = Entry { |
||||
entry_id, |
||||
schedule, |
||||
next: None, |
||||
runnable, |
||||
}; |
||||
|
||||
entry.set_next_run_time(); |
||||
self.entries.lock().await.push(entry); |
||||
|
||||
entry_id |
||||
} |
||||
|
||||
pub async fn del_job(&self, entry_id: Id) { |
||||
self.entries.lock().await.retain(|e| e.entry_id != entry_id); |
||||
} |
||||
|
||||
pub async fn stats(&self) -> Vec<EntryStat> { |
||||
self.entries |
||||
.lock() |
||||
.await |
||||
.iter() |
||||
.map(|entry| entry.as_stat()) |
||||
.collect() |
||||
} |
||||
|
||||
pub async fn start(&self) { |
||||
let cloned = self.clone(); |
||||
tokio::spawn(async move { |
||||
cloned.start_blocking().await; |
||||
}); |
||||
} |
||||
} |
||||
|
||||
#[cfg(test)] |
||||
mod tests { |
||||
use std::time::Duration; |
||||
use tokio::time::sleep; |
||||
|
||||
use crate::u_runner::IdentifiableFuture; |
||||
|
||||
use super::*; |
||||
|
||||
#[tokio::test(flavor = "multi_thread")] |
||||
async fn scheduling() { |
||||
let v = Arc::new(Mutex::new(0)); |
||||
let scheduler = AsyncScheduler::new(); |
||||
|
||||
{ |
||||
let v = v.clone(); |
||||
scheduler |
||||
.add_job(Some("*/1 * * * * * *".try_into().unwrap()), move || { |
||||
let v = v.clone(); |
||||
IdentifiableFuture::from_fut_with_ident("testf", async move { |
||||
*v.lock().await += 1; |
||||
}) |
||||
}) |
||||
.await; |
||||
} |
||||
|
||||
scheduler |
||||
.add_job(Some("*/1 * * * * * *".try_into().unwrap()), move || { |
||||
IdentifiableFuture::from_fut_with_ident("testf", async move { |
||||
println!("hello1"); |
||||
}) |
||||
}) |
||||
.await; |
||||
|
||||
scheduler |
||||
.add_job(Some("*/1 * * * * * *".try_into().unwrap()), move || { |
||||
IdentifiableFuture::from_fut_with_ident("testf", async move { |
||||
println!("hello2"); |
||||
}) |
||||
}) |
||||
.await; |
||||
|
||||
scheduler.start().await; |
||||
|
||||
sleep(Duration::from_secs_f32(0.1)).await; // wait for scheduler
|
||||
sleep(Duration::from_secs(1)).await; |
||||
|
||||
assert_eq!(*v.lock().await, 1); |
||||
|
||||
sleep(Duration::from_secs(1)).await; |
||||
|
||||
assert_eq!(*v.lock().await, 2); |
||||
} |
||||
} |
@ -0,0 +1,200 @@ |
||||
use crate::{models::AssignedJob, UResult}; |
||||
use lazy_static::lazy_static; |
||||
use std::future::Future; |
||||
use std::pin::Pin; |
||||
use std::task::Poll; |
||||
use std::{collections::HashMap, task::Context}; |
||||
use tokio::{ |
||||
runtime::Handle, |
||||
sync::mpsc::{channel, Receiver, Sender}, |
||||
sync::Mutex, |
||||
task::{spawn, spawn_blocking, JoinHandle}, |
||||
}; |
||||
use uuid::Uuid; |
||||
|
||||
pub type ExecResult = UResult<AssignedJob>; |
||||
|
||||
lazy_static! { |
||||
static ref FUT_RESULTS: Mutex<HashMap<Uuid, JoinInfo>> = Mutex::new(HashMap::new()); |
||||
static ref FUT_CHANNEL: (Sender<Uuid>, Mutex<Receiver<Uuid>>) = { |
||||
spawn(init_receiver()); |
||||
let (tx, rx) = channel(100); |
||||
(tx, Mutex::new(rx)) |
||||
}; |
||||
} |
||||
|
||||
pub struct IdentifiableFuture<R> { |
||||
pub job_ident: String, |
||||
fut: Pin<Box<dyn Future<Output = R> + Send + Sync + 'static>>, |
||||
} |
||||
|
||||
impl<R> IdentifiableFuture<R> { |
||||
pub fn from_fut_with_ident( |
||||
job_ident: impl Into<String>, |
||||
job: impl Future<Output = R> + Send + Sync + 'static, |
||||
) -> Self { |
||||
Self { |
||||
fut: Box::pin(job), |
||||
job_ident: job_ident.into(), |
||||
} |
||||
} |
||||
} |
||||
|
||||
impl<R> Future for IdentifiableFuture<R> { |
||||
type Output = R; |
||||
|
||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
||||
self.fut.as_mut().poll(cx) |
||||
} |
||||
} |
||||
|
||||
struct JoinInfo { |
||||
job_ident: String, |
||||
handle: JoinHandle<JoinHandle<ExecResult>>, |
||||
completed: bool, |
||||
// collectable: bool, // indicates if future can be popped from pool via pop_task_if_completed
|
||||
} |
||||
|
||||
impl JoinInfo { |
||||
async fn wait_result(self) -> ExecResult { |
||||
self.handle.await.unwrap().await.unwrap() |
||||
} |
||||
} |
||||
|
||||
fn get_sender() -> Sender<Uuid> { |
||||
FUT_CHANNEL.0.clone() |
||||
} |
||||
|
||||
/// Job runner. Has multiple ways of work.
|
||||
/// - run 1 or more jobs and wait until they're all done
|
||||
/// - run 1 or more jobs in background and collect results of completed jobs later
|
||||
pub struct URunner { |
||||
executables: Vec<IdentifiableFuture<ExecResult>>, |
||||
fids: Vec<Uuid>, |
||||
} |
||||
|
||||
impl URunner { |
||||
pub fn new() -> Self { |
||||
Self { |
||||
executables: vec![], |
||||
fids: vec![], |
||||
} |
||||
} |
||||
|
||||
pub fn push(&mut self, job: IdentifiableFuture<ExecResult>) { |
||||
self.executables.push(job); |
||||
} |
||||
|
||||
/// Spawn prepared tasks
|
||||
pub async fn spawn(mut self) -> Self { |
||||
for executable in self.executables.drain(..) { |
||||
let handle = Handle::current(); |
||||
let fid = Uuid::new_v4(); |
||||
let tx = get_sender(); |
||||
let job_id = executable.job_ident.clone(); |
||||
self.fids.push(fid); |
||||
let job_wrapper = async move { |
||||
debug!("inside wrapper (started): {}", fid); |
||||
let result = executable.await; |
||||
tx.send(fid).await.ok(); |
||||
result |
||||
}; |
||||
let handler = JoinInfo { |
||||
job_ident: job_id, |
||||
handle: spawn_blocking(move || handle.spawn(job_wrapper)), |
||||
completed: false, |
||||
}; |
||||
FUT_RESULTS.lock().await.insert(fid, handler); |
||||
} |
||||
self |
||||
} |
||||
|
||||
/// Wait until a bunch of tasks is finished.
|
||||
/// NOT GUARANTEED that all tasks will be returned due to
|
||||
/// possibility to pop them in other places
|
||||
pub async fn wait(self) -> Vec<ExecResult> { |
||||
let mut result = vec![]; |
||||
for fid in self.fids { |
||||
if let Some(job) = Self::pop_job(fid).await { |
||||
result.push(job.wait_result().await); |
||||
} |
||||
} |
||||
result |
||||
} |
||||
|
||||
pub async fn pop_job_if_completed(fid: Uuid) -> Option<ExecResult> { |
||||
let Some(&JoinInfo { completed, .. }) = FUT_RESULTS.lock().await.get(&fid) else { |
||||
return None; |
||||
}; |
||||
if completed { |
||||
let job = Self::pop_job(fid).await.unwrap(); |
||||
Some(job.wait_result().await) |
||||
} else { |
||||
None |
||||
} |
||||
} |
||||
|
||||
pub async fn pop_completed() -> Vec<ExecResult> { |
||||
let mut completed: Vec<ExecResult> = vec![]; |
||||
let fids = FUT_RESULTS |
||||
.lock() |
||||
.await |
||||
.keys() |
||||
.copied() |
||||
.collect::<Vec<Uuid>>(); |
||||
for fid in fids { |
||||
if let Some(r) = Self::pop_job_if_completed(fid).await { |
||||
completed.push(r) |
||||
} |
||||
} |
||||
completed |
||||
} |
||||
|
||||
pub async fn stats() -> Vec<String> { |
||||
FUT_RESULTS |
||||
.lock() |
||||
.await |
||||
.values() |
||||
.map(|v| v.job_ident.clone()) |
||||
.collect() |
||||
} |
||||
|
||||
async fn pop_job(fid: Uuid) -> Option<JoinInfo> { |
||||
FUT_RESULTS.lock().await.remove(&fid) |
||||
} |
||||
} |
||||
|
||||
async fn init_receiver() { |
||||
while let Some(fid) = FUT_CHANNEL.1.lock().await.recv().await { |
||||
let mut lock = FUT_RESULTS.lock().await; |
||||
if let Some(j) = lock.get_mut(&fid) { |
||||
j.completed = true; |
||||
} |
||||
} |
||||
} |
||||
|
||||
#[cfg(test)] |
||||
mod tests { |
||||
use super::*; |
||||
|
||||
// WTF
|
||||
// WTF
|
||||
// WTF
|
||||
#[tokio::test] |
||||
async fn test_spawn() { |
||||
use std::sync::Arc; |
||||
|
||||
let val = Arc::new(Mutex::new(0)); |
||||
let t = { |
||||
let v = val.clone(); |
||||
spawn(async move { |
||||
*v.lock().await = 5; |
||||
}) |
||||
}; |
||||
assert_eq!(0, *val.lock().await); |
||||
spawn(async {}).await.unwrap(); |
||||
assert_eq!(5, *val.lock().await); |
||||
t.await.unwrap(); |
||||
assert_eq!(5, *val.lock().await); |
||||
} |
||||
} |
Loading…
Reference in new issue