32 changed files with 1270 additions and 728 deletions
File diff suppressed because it is too large
Load Diff
@ -1,6 +1,7 @@ |
|||||||
<mat-form-field class="info-dlg-field" floatLabel="always"> |
<mat-form-field style="box-sizing:border-box; width:100%" class="info-dlg-field" floatLabel="always"> |
||||||
<mat-label>Payload data</mat-label> |
<mat-label>Payload data</mat-label> |
||||||
<textarea matInput cdkTextareaAutosize *ngIf="!isTooBigPayload" [readonly]="isPreview" [(ngModel)]="decodedPayload"> |
<textarea class="code" matInput cdkTextareaAutosize="true" *ngIf="!isTooBigPayload" [readonly]="isPreview" |
||||||
|
[(ngModel)]="decodedPayload"> |
||||||
</textarea> |
</textarea> |
||||||
<input matInput *ngIf="isTooBigPayload" disabled placeholder="Payload is too big to display"> |
<input matInput *ngIf="isTooBigPayload" disabled placeholder="Payload is too big to display"> |
||||||
</mat-form-field> |
</mat-form-field> |
@ -1,167 +0,0 @@ |
|||||||
use crate::{models::AssignedJob, UResult}; |
|
||||||
use lazy_static::lazy_static; |
|
||||||
use std::collections::HashMap; |
|
||||||
use std::future::Future; |
|
||||||
use std::pin::Pin; |
|
||||||
use tokio::{ |
|
||||||
runtime::Handle, |
|
||||||
sync::mpsc::{channel, Receiver, Sender}, |
|
||||||
sync::Mutex, |
|
||||||
task::{spawn, spawn_blocking, JoinHandle}, |
|
||||||
}; |
|
||||||
use uuid::Uuid; |
|
||||||
|
|
||||||
pub type ExecResult = UResult<AssignedJob>; |
|
||||||
type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>; |
|
||||||
|
|
||||||
lazy_static! { |
|
||||||
static ref FUT_RESULTS: Mutex<HashMap<Uuid, JoinInfo>> = Mutex::new(HashMap::new()); |
|
||||||
static ref FUT_CHANNEL: (Sender<Uuid>, Mutex<Receiver<Uuid>>) = { |
|
||||||
spawn(init_receiver()); |
|
||||||
let (tx, rx) = channel(100); |
|
||||||
(tx, Mutex::new(rx)) |
|
||||||
}; |
|
||||||
} |
|
||||||
|
|
||||||
struct JoinInfo { |
|
||||||
handle: JoinHandle<JoinHandle<ExecResult>>, |
|
||||||
completed: bool, |
|
||||||
collectable: bool, // indicates if future can be popped from pool via pop_task_if_completed
|
|
||||||
} |
|
||||||
|
|
||||||
impl JoinInfo { |
|
||||||
async fn wait_result(self) -> ExecResult { |
|
||||||
self.handle.await.unwrap().await.unwrap() |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
fn get_sender() -> Sender<Uuid> { |
|
||||||
FUT_CHANNEL.0.clone() |
|
||||||
} |
|
||||||
|
|
||||||
pub struct Waiter { |
|
||||||
tasks: Vec<BoxFuture<'static, ExecResult>>, |
|
||||||
fids: Vec<Uuid>, |
|
||||||
} |
|
||||||
|
|
||||||
impl Waiter { |
|
||||||
pub fn new() -> Self { |
|
||||||
Self { |
|
||||||
tasks: vec![], |
|
||||||
fids: vec![], |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
pub fn push(&mut self, task: impl Future<Output = ExecResult> + Send + 'static) { |
|
||||||
self.tasks.push(Box::pin(task)); |
|
||||||
} |
|
||||||
|
|
||||||
/// Spawn prepared tasks
|
|
||||||
pub async fn spawn(mut self) -> Self { |
|
||||||
let collectable = true; //TODO: self.tasks.len() != 1;
|
|
||||||
for f in self.tasks.drain(..) { |
|
||||||
let handle = Handle::current(); |
|
||||||
let fid = Uuid::new_v4(); |
|
||||||
let tx = get_sender(); |
|
||||||
self.fids.push(fid); |
|
||||||
let task_wrapper = async move { |
|
||||||
debug!("inside wrapper (started): {}", fid); |
|
||||||
let result = f.await; |
|
||||||
tx.send(fid).await.unwrap(); |
|
||||||
result |
|
||||||
}; |
|
||||||
let handler = JoinInfo { |
|
||||||
handle: spawn_blocking(move || handle.spawn(task_wrapper)), |
|
||||||
completed: false, |
|
||||||
collectable, |
|
||||||
}; |
|
||||||
FUT_RESULTS.lock().await.insert(fid, handler); |
|
||||||
} |
|
||||||
self |
|
||||||
} |
|
||||||
|
|
||||||
/// Wait until a bunch of tasks is finished.
|
|
||||||
/// NOT GUARANTEED that all tasks will be returned due to
|
|
||||||
/// possibility to pop them in other places
|
|
||||||
pub async fn wait(self) -> Vec<ExecResult> { |
|
||||||
let mut result = vec![]; |
|
||||||
for fid in self.fids { |
|
||||||
if let Some(task) = pop_task(fid).await { |
|
||||||
result.push(task.wait_result().await); |
|
||||||
} |
|
||||||
} |
|
||||||
result |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
async fn init_receiver() { |
|
||||||
while let Some(fid) = FUT_CHANNEL.1.lock().await.recv().await { |
|
||||||
let mut lock = FUT_RESULTS.lock().await; |
|
||||||
if let Some(j) = lock.get_mut(&fid) { |
|
||||||
j.completed = true; |
|
||||||
} |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
async fn pop_task(fid: Uuid) -> Option<JoinInfo> { |
|
||||||
FUT_RESULTS.lock().await.remove(&fid) |
|
||||||
} |
|
||||||
|
|
||||||
pub async fn pop_task_if_completed(fid: Uuid) -> Option<ExecResult> { |
|
||||||
let &JoinInfo { |
|
||||||
handle: _, |
|
||||||
collectable, |
|
||||||
completed, |
|
||||||
} = match FUT_RESULTS.lock().await.get(&fid) { |
|
||||||
Some(t) => t, |
|
||||||
None => return None, |
|
||||||
}; |
|
||||||
if collectable && completed { |
|
||||||
let task = pop_task(fid).await.unwrap(); |
|
||||||
Some(task.wait_result().await) |
|
||||||
} else { |
|
||||||
None |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
pub async fn pop_completed() -> Vec<ExecResult> { |
|
||||||
let mut completed: Vec<ExecResult> = vec![]; |
|
||||||
let fids = FUT_RESULTS |
|
||||||
.lock() |
|
||||||
.await |
|
||||||
.keys() |
|
||||||
.copied() |
|
||||||
.collect::<Vec<Uuid>>(); |
|
||||||
for fid in fids { |
|
||||||
if let Some(r) = pop_task_if_completed(fid).await { |
|
||||||
completed.push(r) |
|
||||||
} |
|
||||||
} |
|
||||||
completed |
|
||||||
} |
|
||||||
|
|
||||||
#[cfg(test)] |
|
||||||
mod tests { |
|
||||||
use super::*; |
|
||||||
|
|
||||||
// WTF
|
|
||||||
// WTF
|
|
||||||
// WTF
|
|
||||||
#[tokio::test] |
|
||||||
async fn test_spawn() { |
|
||||||
use std::sync::Arc; |
|
||||||
|
|
||||||
let val = Arc::new(Mutex::new(0)); |
|
||||||
let t = { |
|
||||||
let v = val.clone(); |
|
||||||
spawn(async move { |
|
||||||
*v.lock().await = 5; |
|
||||||
}) |
|
||||||
}; |
|
||||||
assert_eq!(0, *val.lock().await); |
|
||||||
spawn(async {}).await.unwrap(); |
|
||||||
assert_eq!(5, *val.lock().await); |
|
||||||
t.await.unwrap(); |
|
||||||
assert_eq!(5, *val.lock().await); |
|
||||||
} |
|
||||||
} |
|
@ -0,0 +1,14 @@ |
|||||||
|
use serde::{Deserialize, Serialize}; |
||||||
|
|
||||||
|
use crate::{messaging::AsMsg, scheduler::EntryStat, types::Id, ufs::IndexFileMeta}; |
||||||
|
|
||||||
|
#[derive(Debug, Clone, Serialize, Deserialize)] |
||||||
|
pub struct Stats { |
||||||
|
pub agent_id: Id, |
||||||
|
pub running_jobs: Vec<String>, |
||||||
|
pub scheduled_jobs: Vec<EntryStat>, |
||||||
|
pub cached_jobs: Vec<Id>, |
||||||
|
pub cached_payloads: Vec<IndexFileMeta>, |
||||||
|
} |
||||||
|
|
||||||
|
impl AsMsg for Stats {} |
@ -0,0 +1,91 @@ |
|||||||
|
use crate::{models::PreparedJob, types::Id, u_runner::IdentifiableFuture}; |
||||||
|
use chrono::{DateTime, Utc}; |
||||||
|
use core::fmt; |
||||||
|
use serde::{Deserialize, Serialize}; |
||||||
|
use std::sync::Arc; |
||||||
|
|
||||||
|
pub trait SchedulerJob { |
||||||
|
fn call(&self) -> IdentifiableFuture<()>; |
||||||
|
} |
||||||
|
|
||||||
|
// scheduler job need to be called repeatedly,
|
||||||
|
// the only way to achieve this is to use Fn() -> Future
|
||||||
|
impl<F> SchedulerJob for F |
||||||
|
where |
||||||
|
F: Fn() -> IdentifiableFuture<()>, |
||||||
|
{ |
||||||
|
fn call(&self) -> IdentifiableFuture<()> { |
||||||
|
self() |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
#[derive(Clone)] |
||||||
|
pub struct Entry { |
||||||
|
pub entry_id: Id, |
||||||
|
pub schedule: Option<cron::Schedule>, |
||||||
|
pub next: Option<DateTime<Utc>>, |
||||||
|
pub runnable: EntryType, |
||||||
|
} |
||||||
|
|
||||||
|
#[derive(Clone, Debug, Serialize, Deserialize)] |
||||||
|
pub struct EntryStat { |
||||||
|
pub entry_id: Id, |
||||||
|
pub job_ident: String, |
||||||
|
pub schedule: String, |
||||||
|
pub next: Option<DateTime<Utc>>, |
||||||
|
} |
||||||
|
|
||||||
|
#[derive(Clone)] |
||||||
|
pub enum EntryType { |
||||||
|
Common(Arc<dyn SchedulerJob + Send + Sync>), |
||||||
|
URunner(PreparedJob), |
||||||
|
} |
||||||
|
|
||||||
|
impl<J: SchedulerJob + Send + Sync + 'static> From<J> for EntryType { |
||||||
|
fn from(value: J) -> Self { |
||||||
|
Self::Common(Arc::new(value)) |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
impl From<PreparedJob> for EntryType { |
||||||
|
fn from(value: PreparedJob) -> Self { |
||||||
|
Self::URunner(value) |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
impl Entry { |
||||||
|
pub fn set_next_run_time(&mut self) { |
||||||
|
self.next = self.get_next_run_time(); |
||||||
|
} |
||||||
|
|
||||||
|
pub fn get_next_run_time(&self) -> Option<DateTime<Utc>> { |
||||||
|
match &self.schedule { |
||||||
|
Some(schedule) => schedule.upcoming(Utc).next(), |
||||||
|
None => Some(Utc::now()), |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
pub fn as_stat(&self) -> EntryStat { |
||||||
|
let job_ident = match &self.runnable { |
||||||
|
EntryType::URunner(entry) => entry.job.meta.id.to_string(), |
||||||
|
EntryType::Common(entry) => entry.call().job_ident.to_string(), |
||||||
|
}; |
||||||
|
|
||||||
|
EntryStat { |
||||||
|
entry_id: self.entry_id, |
||||||
|
job_ident, |
||||||
|
schedule: self |
||||||
|
.schedule |
||||||
|
.as_ref() |
||||||
|
.map(|s| s.to_string()) |
||||||
|
.unwrap_or_default(), |
||||||
|
next: self.next.clone(), |
||||||
|
} |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
impl fmt::Debug for Entry { |
||||||
|
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
||||||
|
write!(f, "{} {:?} {:?}", self.entry_id, self.schedule, self.next) |
||||||
|
} |
||||||
|
} |
@ -0,0 +1,178 @@ |
|||||||
|
mod entry; |
||||||
|
|
||||||
|
use std::sync::Arc; |
||||||
|
use std::time::Duration; |
||||||
|
|
||||||
|
use chrono::Utc; |
||||||
|
use cron::Schedule; |
||||||
|
use entry::Entry; |
||||||
|
use once_cell::sync::Lazy; |
||||||
|
use tokio::sync::Mutex; |
||||||
|
use uuid::Uuid; |
||||||
|
|
||||||
|
use crate::jobs::AnonymousJobBatch; |
||||||
|
use crate::types::Id; |
||||||
|
|
||||||
|
use self::entry::EntryType; |
||||||
|
|
||||||
|
pub use entry::EntryStat; |
||||||
|
|
||||||
|
pub static SCHEDULER: Lazy<AsyncScheduler> = Lazy::new(|| AsyncScheduler::new()); |
||||||
|
|
||||||
|
#[derive(Clone)] |
||||||
|
pub struct AsyncScheduler { |
||||||
|
entries: Arc<Mutex<Vec<Entry>>>, |
||||||
|
} |
||||||
|
|
||||||
|
impl AsyncScheduler { |
||||||
|
pub fn new() -> AsyncScheduler { |
||||||
|
AsyncScheduler { |
||||||
|
entries: Arc::new(Mutex::new(Vec::new())), |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
pub async fn start_blocking(&self) -> ! { |
||||||
|
for entry in self.entries.lock().await.iter_mut() { |
||||||
|
entry.set_next_run_time(); |
||||||
|
} |
||||||
|
|
||||||
|
loop { |
||||||
|
let mut entries = self.entries.lock().await; |
||||||
|
entries.sort_by(|b, a| b.next.cmp(&a.next)); |
||||||
|
|
||||||
|
let wait_duration = if let Some(entry) = entries.first() { |
||||||
|
let wait_millis = (entry.next.as_ref().unwrap().timestamp_millis() as u64) |
||||||
|
.saturating_sub(Utc::now().timestamp_millis() as u64); |
||||||
|
|
||||||
|
Duration::from_millis(wait_millis) |
||||||
|
} else { |
||||||
|
Duration::from_secs(1) |
||||||
|
}; |
||||||
|
|
||||||
|
drop(entries); |
||||||
|
tokio::time::sleep(wait_duration).await; |
||||||
|
|
||||||
|
let mut entries = self.entries.lock().await; |
||||||
|
let mut job_batch = vec![]; |
||||||
|
|
||||||
|
for entry in &mut *entries { |
||||||
|
match entry.next.as_ref() { |
||||||
|
Some(next) => { |
||||||
|
if next.gt(&Utc::now()) { |
||||||
|
break; |
||||||
|
} |
||||||
|
match &entry.runnable { |
||||||
|
EntryType::Common(runnable) => { |
||||||
|
let fut = runnable.call(); |
||||||
|
tokio::spawn(fut); |
||||||
|
} |
||||||
|
EntryType::URunner(runnable) => { |
||||||
|
debug!("starting assigned job {:?}", runnable.ids); |
||||||
|
job_batch.push(runnable.clone()) |
||||||
|
} |
||||||
|
} |
||||||
|
entry.set_next_run_time(); |
||||||
|
} |
||||||
|
None => {} |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
AnonymousJobBatch::from_prepared_jobs(job_batch) |
||||||
|
.spawn() |
||||||
|
.await; |
||||||
|
|
||||||
|
entries.retain(|e| e.schedule.is_some()); |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
pub async fn add_job(&self, schedule: Option<Schedule>, runnable: impl Into<EntryType>) -> Id { |
||||||
|
let entry_id = Uuid::new_v4(); |
||||||
|
let runnable = runnable.into(); |
||||||
|
let mut entry = Entry { |
||||||
|
entry_id, |
||||||
|
schedule, |
||||||
|
next: None, |
||||||
|
runnable, |
||||||
|
}; |
||||||
|
|
||||||
|
entry.set_next_run_time(); |
||||||
|
self.entries.lock().await.push(entry); |
||||||
|
|
||||||
|
entry_id |
||||||
|
} |
||||||
|
|
||||||
|
pub async fn del_job(&self, entry_id: Id) { |
||||||
|
self.entries.lock().await.retain(|e| e.entry_id != entry_id); |
||||||
|
} |
||||||
|
|
||||||
|
pub async fn stats(&self) -> Vec<EntryStat> { |
||||||
|
self.entries |
||||||
|
.lock() |
||||||
|
.await |
||||||
|
.iter() |
||||||
|
.map(|entry| entry.as_stat()) |
||||||
|
.collect() |
||||||
|
} |
||||||
|
|
||||||
|
pub async fn start(&self) { |
||||||
|
let cloned = self.clone(); |
||||||
|
tokio::spawn(async move { |
||||||
|
cloned.start_blocking().await; |
||||||
|
}); |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
#[cfg(test)] |
||||||
|
mod tests { |
||||||
|
use std::time::Duration; |
||||||
|
use tokio::time::sleep; |
||||||
|
|
||||||
|
use crate::u_runner::IdentifiableFuture; |
||||||
|
|
||||||
|
use super::*; |
||||||
|
|
||||||
|
#[tokio::test(flavor = "multi_thread")] |
||||||
|
async fn scheduling() { |
||||||
|
let v = Arc::new(Mutex::new(0)); |
||||||
|
let scheduler = AsyncScheduler::new(); |
||||||
|
|
||||||
|
{ |
||||||
|
let v = v.clone(); |
||||||
|
scheduler |
||||||
|
.add_job(Some("*/1 * * * * * *".try_into().unwrap()), move || { |
||||||
|
let v = v.clone(); |
||||||
|
IdentifiableFuture::from_fut_with_ident("testf", async move { |
||||||
|
*v.lock().await += 1; |
||||||
|
}) |
||||||
|
}) |
||||||
|
.await; |
||||||
|
} |
||||||
|
|
||||||
|
scheduler |
||||||
|
.add_job(Some("*/1 * * * * * *".try_into().unwrap()), move || { |
||||||
|
IdentifiableFuture::from_fut_with_ident("testf", async move { |
||||||
|
println!("hello1"); |
||||||
|
}) |
||||||
|
}) |
||||||
|
.await; |
||||||
|
|
||||||
|
scheduler |
||||||
|
.add_job(Some("*/1 * * * * * *".try_into().unwrap()), move || { |
||||||
|
IdentifiableFuture::from_fut_with_ident("testf", async move { |
||||||
|
println!("hello2"); |
||||||
|
}) |
||||||
|
}) |
||||||
|
.await; |
||||||
|
|
||||||
|
scheduler.start().await; |
||||||
|
|
||||||
|
sleep(Duration::from_secs_f32(0.1)).await; // wait for scheduler
|
||||||
|
sleep(Duration::from_secs(1)).await; |
||||||
|
|
||||||
|
assert_eq!(*v.lock().await, 1); |
||||||
|
|
||||||
|
sleep(Duration::from_secs(1)).await; |
||||||
|
|
||||||
|
assert_eq!(*v.lock().await, 2); |
||||||
|
} |
||||||
|
} |
@ -0,0 +1,200 @@ |
|||||||
|
use crate::{models::AssignedJob, UResult}; |
||||||
|
use lazy_static::lazy_static; |
||||||
|
use std::future::Future; |
||||||
|
use std::pin::Pin; |
||||||
|
use std::task::Poll; |
||||||
|
use std::{collections::HashMap, task::Context}; |
||||||
|
use tokio::{ |
||||||
|
runtime::Handle, |
||||||
|
sync::mpsc::{channel, Receiver, Sender}, |
||||||
|
sync::Mutex, |
||||||
|
task::{spawn, spawn_blocking, JoinHandle}, |
||||||
|
}; |
||||||
|
use uuid::Uuid; |
||||||
|
|
||||||
|
pub type ExecResult = UResult<AssignedJob>; |
||||||
|
|
||||||
|
lazy_static! { |
||||||
|
static ref FUT_RESULTS: Mutex<HashMap<Uuid, JoinInfo>> = Mutex::new(HashMap::new()); |
||||||
|
static ref FUT_CHANNEL: (Sender<Uuid>, Mutex<Receiver<Uuid>>) = { |
||||||
|
spawn(init_receiver()); |
||||||
|
let (tx, rx) = channel(100); |
||||||
|
(tx, Mutex::new(rx)) |
||||||
|
}; |
||||||
|
} |
||||||
|
|
||||||
|
pub struct IdentifiableFuture<R> { |
||||||
|
pub job_ident: String, |
||||||
|
fut: Pin<Box<dyn Future<Output = R> + Send + Sync + 'static>>, |
||||||
|
} |
||||||
|
|
||||||
|
impl<R> IdentifiableFuture<R> { |
||||||
|
pub fn from_fut_with_ident( |
||||||
|
job_ident: impl Into<String>, |
||||||
|
job: impl Future<Output = R> + Send + Sync + 'static, |
||||||
|
) -> Self { |
||||||
|
Self { |
||||||
|
fut: Box::pin(job), |
||||||
|
job_ident: job_ident.into(), |
||||||
|
} |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
impl<R> Future for IdentifiableFuture<R> { |
||||||
|
type Output = R; |
||||||
|
|
||||||
|
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
||||||
|
self.fut.as_mut().poll(cx) |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
struct JoinInfo { |
||||||
|
job_ident: String, |
||||||
|
handle: JoinHandle<JoinHandle<ExecResult>>, |
||||||
|
completed: bool, |
||||||
|
// collectable: bool, // indicates if future can be popped from pool via pop_task_if_completed
|
||||||
|
} |
||||||
|
|
||||||
|
impl JoinInfo { |
||||||
|
async fn wait_result(self) -> ExecResult { |
||||||
|
self.handle.await.unwrap().await.unwrap() |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
fn get_sender() -> Sender<Uuid> { |
||||||
|
FUT_CHANNEL.0.clone() |
||||||
|
} |
||||||
|
|
||||||
|
/// Job runner. Has multiple ways of work.
|
||||||
|
/// - run 1 or more jobs and wait until they're all done
|
||||||
|
/// - run 1 or more jobs in background and collect results of completed jobs later
|
||||||
|
pub struct URunner { |
||||||
|
executables: Vec<IdentifiableFuture<ExecResult>>, |
||||||
|
fids: Vec<Uuid>, |
||||||
|
} |
||||||
|
|
||||||
|
impl URunner { |
||||||
|
pub fn new() -> Self { |
||||||
|
Self { |
||||||
|
executables: vec![], |
||||||
|
fids: vec![], |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
pub fn push(&mut self, job: IdentifiableFuture<ExecResult>) { |
||||||
|
self.executables.push(job); |
||||||
|
} |
||||||
|
|
||||||
|
/// Spawn prepared tasks
|
||||||
|
pub async fn spawn(mut self) -> Self { |
||||||
|
for executable in self.executables.drain(..) { |
||||||
|
let handle = Handle::current(); |
||||||
|
let fid = Uuid::new_v4(); |
||||||
|
let tx = get_sender(); |
||||||
|
let job_id = executable.job_ident.clone(); |
||||||
|
self.fids.push(fid); |
||||||
|
let job_wrapper = async move { |
||||||
|
debug!("inside wrapper (started): {}", fid); |
||||||
|
let result = executable.await; |
||||||
|
tx.send(fid).await.ok(); |
||||||
|
result |
||||||
|
}; |
||||||
|
let handler = JoinInfo { |
||||||
|
job_ident: job_id, |
||||||
|
handle: spawn_blocking(move || handle.spawn(job_wrapper)), |
||||||
|
completed: false, |
||||||
|
}; |
||||||
|
FUT_RESULTS.lock().await.insert(fid, handler); |
||||||
|
} |
||||||
|
self |
||||||
|
} |
||||||
|
|
||||||
|
/// Wait until a bunch of tasks is finished.
|
||||||
|
/// NOT GUARANTEED that all tasks will be returned due to
|
||||||
|
/// possibility to pop them in other places
|
||||||
|
pub async fn wait(self) -> Vec<ExecResult> { |
||||||
|
let mut result = vec![]; |
||||||
|
for fid in self.fids { |
||||||
|
if let Some(job) = Self::pop_job(fid).await { |
||||||
|
result.push(job.wait_result().await); |
||||||
|
} |
||||||
|
} |
||||||
|
result |
||||||
|
} |
||||||
|
|
||||||
|
pub async fn pop_job_if_completed(fid: Uuid) -> Option<ExecResult> { |
||||||
|
let Some(&JoinInfo { completed, .. }) = FUT_RESULTS.lock().await.get(&fid) else { |
||||||
|
return None; |
||||||
|
}; |
||||||
|
if completed { |
||||||
|
let job = Self::pop_job(fid).await.unwrap(); |
||||||
|
Some(job.wait_result().await) |
||||||
|
} else { |
||||||
|
None |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
pub async fn pop_completed() -> Vec<ExecResult> { |
||||||
|
let mut completed: Vec<ExecResult> = vec![]; |
||||||
|
let fids = FUT_RESULTS |
||||||
|
.lock() |
||||||
|
.await |
||||||
|
.keys() |
||||||
|
.copied() |
||||||
|
.collect::<Vec<Uuid>>(); |
||||||
|
for fid in fids { |
||||||
|
if let Some(r) = Self::pop_job_if_completed(fid).await { |
||||||
|
completed.push(r) |
||||||
|
} |
||||||
|
} |
||||||
|
completed |
||||||
|
} |
||||||
|
|
||||||
|
pub async fn stats() -> Vec<String> { |
||||||
|
FUT_RESULTS |
||||||
|
.lock() |
||||||
|
.await |
||||||
|
.values() |
||||||
|
.map(|v| v.job_ident.clone()) |
||||||
|
.collect() |
||||||
|
} |
||||||
|
|
||||||
|
async fn pop_job(fid: Uuid) -> Option<JoinInfo> { |
||||||
|
FUT_RESULTS.lock().await.remove(&fid) |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
async fn init_receiver() { |
||||||
|
while let Some(fid) = FUT_CHANNEL.1.lock().await.recv().await { |
||||||
|
let mut lock = FUT_RESULTS.lock().await; |
||||||
|
if let Some(j) = lock.get_mut(&fid) { |
||||||
|
j.completed = true; |
||||||
|
} |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
#[cfg(test)] |
||||||
|
mod tests { |
||||||
|
use super::*; |
||||||
|
|
||||||
|
// WTF
|
||||||
|
// WTF
|
||||||
|
// WTF
|
||||||
|
#[tokio::test] |
||||||
|
async fn test_spawn() { |
||||||
|
use std::sync::Arc; |
||||||
|
|
||||||
|
let val = Arc::new(Mutex::new(0)); |
||||||
|
let t = { |
||||||
|
let v = val.clone(); |
||||||
|
spawn(async move { |
||||||
|
*v.lock().await = 5; |
||||||
|
}) |
||||||
|
}; |
||||||
|
assert_eq!(0, *val.lock().await); |
||||||
|
spawn(async {}).await.unwrap(); |
||||||
|
assert_eq!(5, *val.lock().await); |
||||||
|
t.await.unwrap(); |
||||||
|
assert_eq!(5, *val.lock().await); |
||||||
|
} |
||||||
|
} |
Loading…
Reference in new issue