Compare commits
No commits in common. 'f8cf7965e6e405e09d436f7bde0f011bfdc4e069' and 'c8ce2aca6078dc89182bf8ac6732e1194118779f' have entirely different histories.
f8cf7965e6
...
c8ce2aca60
32 changed files with 728 additions and 1270 deletions
File diff suppressed because it is too large
Load Diff
@ -1,7 +1,6 @@ |
|||||||
<mat-form-field style="box-sizing:border-box; width:100%" class="info-dlg-field" floatLabel="always"> |
<mat-form-field class="info-dlg-field" floatLabel="always"> |
||||||
<mat-label>Payload data</mat-label> |
<mat-label>Payload data</mat-label> |
||||||
<textarea class="code" matInput cdkTextareaAutosize="true" *ngIf="!isTooBigPayload" [readonly]="isPreview" |
<textarea matInput cdkTextareaAutosize *ngIf="!isTooBigPayload" [readonly]="isPreview" [(ngModel)]="decodedPayload"> |
||||||
[(ngModel)]="decodedPayload"> |
|
||||||
</textarea> |
</textarea> |
||||||
<input matInput *ngIf="isTooBigPayload" disabled placeholder="Payload is too big to display"> |
<input matInput *ngIf="isTooBigPayload" disabled placeholder="Payload is too big to display"> |
||||||
</mat-form-field> |
</mat-form-field> |
@ -0,0 +1,167 @@ |
|||||||
|
use crate::{models::AssignedJob, UResult}; |
||||||
|
use lazy_static::lazy_static; |
||||||
|
use std::collections::HashMap; |
||||||
|
use std::future::Future; |
||||||
|
use std::pin::Pin; |
||||||
|
use tokio::{ |
||||||
|
runtime::Handle, |
||||||
|
sync::mpsc::{channel, Receiver, Sender}, |
||||||
|
sync::Mutex, |
||||||
|
task::{spawn, spawn_blocking, JoinHandle}, |
||||||
|
}; |
||||||
|
use uuid::Uuid; |
||||||
|
|
||||||
|
pub type ExecResult = UResult<AssignedJob>; |
||||||
|
type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>; |
||||||
|
|
||||||
|
lazy_static! { |
||||||
|
static ref FUT_RESULTS: Mutex<HashMap<Uuid, JoinInfo>> = Mutex::new(HashMap::new()); |
||||||
|
static ref FUT_CHANNEL: (Sender<Uuid>, Mutex<Receiver<Uuid>>) = { |
||||||
|
spawn(init_receiver()); |
||||||
|
let (tx, rx) = channel(100); |
||||||
|
(tx, Mutex::new(rx)) |
||||||
|
}; |
||||||
|
} |
||||||
|
|
||||||
|
struct JoinInfo { |
||||||
|
handle: JoinHandle<JoinHandle<ExecResult>>, |
||||||
|
completed: bool, |
||||||
|
collectable: bool, // indicates if future can be popped from pool via pop_task_if_completed
|
||||||
|
} |
||||||
|
|
||||||
|
impl JoinInfo { |
||||||
|
async fn wait_result(self) -> ExecResult { |
||||||
|
self.handle.await.unwrap().await.unwrap() |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
fn get_sender() -> Sender<Uuid> { |
||||||
|
FUT_CHANNEL.0.clone() |
||||||
|
} |
||||||
|
|
||||||
|
pub struct Waiter { |
||||||
|
tasks: Vec<BoxFuture<'static, ExecResult>>, |
||||||
|
fids: Vec<Uuid>, |
||||||
|
} |
||||||
|
|
||||||
|
impl Waiter { |
||||||
|
pub fn new() -> Self { |
||||||
|
Self { |
||||||
|
tasks: vec![], |
||||||
|
fids: vec![], |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
pub fn push(&mut self, task: impl Future<Output = ExecResult> + Send + 'static) { |
||||||
|
self.tasks.push(Box::pin(task)); |
||||||
|
} |
||||||
|
|
||||||
|
/// Spawn prepared tasks
|
||||||
|
pub async fn spawn(mut self) -> Self { |
||||||
|
let collectable = true; //TODO: self.tasks.len() != 1;
|
||||||
|
for f in self.tasks.drain(..) { |
||||||
|
let handle = Handle::current(); |
||||||
|
let fid = Uuid::new_v4(); |
||||||
|
let tx = get_sender(); |
||||||
|
self.fids.push(fid); |
||||||
|
let task_wrapper = async move { |
||||||
|
debug!("inside wrapper (started): {}", fid); |
||||||
|
let result = f.await; |
||||||
|
tx.send(fid).await.unwrap(); |
||||||
|
result |
||||||
|
}; |
||||||
|
let handler = JoinInfo { |
||||||
|
handle: spawn_blocking(move || handle.spawn(task_wrapper)), |
||||||
|
completed: false, |
||||||
|
collectable, |
||||||
|
}; |
||||||
|
FUT_RESULTS.lock().await.insert(fid, handler); |
||||||
|
} |
||||||
|
self |
||||||
|
} |
||||||
|
|
||||||
|
/// Wait until a bunch of tasks is finished.
|
||||||
|
/// NOT GUARANTEED that all tasks will be returned due to
|
||||||
|
/// possibility to pop them in other places
|
||||||
|
pub async fn wait(self) -> Vec<ExecResult> { |
||||||
|
let mut result = vec![]; |
||||||
|
for fid in self.fids { |
||||||
|
if let Some(task) = pop_task(fid).await { |
||||||
|
result.push(task.wait_result().await); |
||||||
|
} |
||||||
|
} |
||||||
|
result |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
async fn init_receiver() { |
||||||
|
while let Some(fid) = FUT_CHANNEL.1.lock().await.recv().await { |
||||||
|
let mut lock = FUT_RESULTS.lock().await; |
||||||
|
if let Some(j) = lock.get_mut(&fid) { |
||||||
|
j.completed = true; |
||||||
|
} |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
async fn pop_task(fid: Uuid) -> Option<JoinInfo> { |
||||||
|
FUT_RESULTS.lock().await.remove(&fid) |
||||||
|
} |
||||||
|
|
||||||
|
pub async fn pop_task_if_completed(fid: Uuid) -> Option<ExecResult> { |
||||||
|
let &JoinInfo { |
||||||
|
handle: _, |
||||||
|
collectable, |
||||||
|
completed, |
||||||
|
} = match FUT_RESULTS.lock().await.get(&fid) { |
||||||
|
Some(t) => t, |
||||||
|
None => return None, |
||||||
|
}; |
||||||
|
if collectable && completed { |
||||||
|
let task = pop_task(fid).await.unwrap(); |
||||||
|
Some(task.wait_result().await) |
||||||
|
} else { |
||||||
|
None |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
pub async fn pop_completed() -> Vec<ExecResult> { |
||||||
|
let mut completed: Vec<ExecResult> = vec![]; |
||||||
|
let fids = FUT_RESULTS |
||||||
|
.lock() |
||||||
|
.await |
||||||
|
.keys() |
||||||
|
.copied() |
||||||
|
.collect::<Vec<Uuid>>(); |
||||||
|
for fid in fids { |
||||||
|
if let Some(r) = pop_task_if_completed(fid).await { |
||||||
|
completed.push(r) |
||||||
|
} |
||||||
|
} |
||||||
|
completed |
||||||
|
} |
||||||
|
|
||||||
|
#[cfg(test)] |
||||||
|
mod tests { |
||||||
|
use super::*; |
||||||
|
|
||||||
|
// WTF
|
||||||
|
// WTF
|
||||||
|
// WTF
|
||||||
|
#[tokio::test] |
||||||
|
async fn test_spawn() { |
||||||
|
use std::sync::Arc; |
||||||
|
|
||||||
|
let val = Arc::new(Mutex::new(0)); |
||||||
|
let t = { |
||||||
|
let v = val.clone(); |
||||||
|
spawn(async move { |
||||||
|
*v.lock().await = 5; |
||||||
|
}) |
||||||
|
}; |
||||||
|
assert_eq!(0, *val.lock().await); |
||||||
|
spawn(async {}).await.unwrap(); |
||||||
|
assert_eq!(5, *val.lock().await); |
||||||
|
t.await.unwrap(); |
||||||
|
assert_eq!(5, *val.lock().await); |
||||||
|
} |
||||||
|
} |
@ -1,14 +0,0 @@ |
|||||||
use serde::{Deserialize, Serialize}; |
|
||||||
|
|
||||||
use crate::{messaging::AsMsg, scheduler::EntryStat, types::Id, ufs::IndexFileMeta}; |
|
||||||
|
|
||||||
#[derive(Debug, Clone, Serialize, Deserialize)] |
|
||||||
pub struct Stats { |
|
||||||
pub agent_id: Id, |
|
||||||
pub running_jobs: Vec<String>, |
|
||||||
pub scheduled_jobs: Vec<EntryStat>, |
|
||||||
pub cached_jobs: Vec<Id>, |
|
||||||
pub cached_payloads: Vec<IndexFileMeta>, |
|
||||||
} |
|
||||||
|
|
||||||
impl AsMsg for Stats {} |
|
@ -1,91 +0,0 @@ |
|||||||
use crate::{models::PreparedJob, types::Id, u_runner::IdentifiableFuture}; |
|
||||||
use chrono::{DateTime, Utc}; |
|
||||||
use core::fmt; |
|
||||||
use serde::{Deserialize, Serialize}; |
|
||||||
use std::sync::Arc; |
|
||||||
|
|
||||||
pub trait SchedulerJob { |
|
||||||
fn call(&self) -> IdentifiableFuture<()>; |
|
||||||
} |
|
||||||
|
|
||||||
// scheduler job need to be called repeatedly,
|
|
||||||
// the only way to achieve this is to use Fn() -> Future
|
|
||||||
impl<F> SchedulerJob for F |
|
||||||
where |
|
||||||
F: Fn() -> IdentifiableFuture<()>, |
|
||||||
{ |
|
||||||
fn call(&self) -> IdentifiableFuture<()> { |
|
||||||
self() |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
#[derive(Clone)] |
|
||||||
pub struct Entry { |
|
||||||
pub entry_id: Id, |
|
||||||
pub schedule: Option<cron::Schedule>, |
|
||||||
pub next: Option<DateTime<Utc>>, |
|
||||||
pub runnable: EntryType, |
|
||||||
} |
|
||||||
|
|
||||||
#[derive(Clone, Debug, Serialize, Deserialize)] |
|
||||||
pub struct EntryStat { |
|
||||||
pub entry_id: Id, |
|
||||||
pub job_ident: String, |
|
||||||
pub schedule: String, |
|
||||||
pub next: Option<DateTime<Utc>>, |
|
||||||
} |
|
||||||
|
|
||||||
#[derive(Clone)] |
|
||||||
pub enum EntryType { |
|
||||||
Common(Arc<dyn SchedulerJob + Send + Sync>), |
|
||||||
URunner(PreparedJob), |
|
||||||
} |
|
||||||
|
|
||||||
impl<J: SchedulerJob + Send + Sync + 'static> From<J> for EntryType { |
|
||||||
fn from(value: J) -> Self { |
|
||||||
Self::Common(Arc::new(value)) |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
impl From<PreparedJob> for EntryType { |
|
||||||
fn from(value: PreparedJob) -> Self { |
|
||||||
Self::URunner(value) |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
impl Entry { |
|
||||||
pub fn set_next_run_time(&mut self) { |
|
||||||
self.next = self.get_next_run_time(); |
|
||||||
} |
|
||||||
|
|
||||||
pub fn get_next_run_time(&self) -> Option<DateTime<Utc>> { |
|
||||||
match &self.schedule { |
|
||||||
Some(schedule) => schedule.upcoming(Utc).next(), |
|
||||||
None => Some(Utc::now()), |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
pub fn as_stat(&self) -> EntryStat { |
|
||||||
let job_ident = match &self.runnable { |
|
||||||
EntryType::URunner(entry) => entry.job.meta.id.to_string(), |
|
||||||
EntryType::Common(entry) => entry.call().job_ident.to_string(), |
|
||||||
}; |
|
||||||
|
|
||||||
EntryStat { |
|
||||||
entry_id: self.entry_id, |
|
||||||
job_ident, |
|
||||||
schedule: self |
|
||||||
.schedule |
|
||||||
.as_ref() |
|
||||||
.map(|s| s.to_string()) |
|
||||||
.unwrap_or_default(), |
|
||||||
next: self.next.clone(), |
|
||||||
} |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
impl fmt::Debug for Entry { |
|
||||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
|
||||||
write!(f, "{} {:?} {:?}", self.entry_id, self.schedule, self.next) |
|
||||||
} |
|
||||||
} |
|
@ -1,178 +0,0 @@ |
|||||||
mod entry; |
|
||||||
|
|
||||||
use std::sync::Arc; |
|
||||||
use std::time::Duration; |
|
||||||
|
|
||||||
use chrono::Utc; |
|
||||||
use cron::Schedule; |
|
||||||
use entry::Entry; |
|
||||||
use once_cell::sync::Lazy; |
|
||||||
use tokio::sync::Mutex; |
|
||||||
use uuid::Uuid; |
|
||||||
|
|
||||||
use crate::jobs::AnonymousJobBatch; |
|
||||||
use crate::types::Id; |
|
||||||
|
|
||||||
use self::entry::EntryType; |
|
||||||
|
|
||||||
pub use entry::EntryStat; |
|
||||||
|
|
||||||
pub static SCHEDULER: Lazy<AsyncScheduler> = Lazy::new(|| AsyncScheduler::new()); |
|
||||||
|
|
||||||
#[derive(Clone)] |
|
||||||
pub struct AsyncScheduler { |
|
||||||
entries: Arc<Mutex<Vec<Entry>>>, |
|
||||||
} |
|
||||||
|
|
||||||
impl AsyncScheduler { |
|
||||||
pub fn new() -> AsyncScheduler { |
|
||||||
AsyncScheduler { |
|
||||||
entries: Arc::new(Mutex::new(Vec::new())), |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
pub async fn start_blocking(&self) -> ! { |
|
||||||
for entry in self.entries.lock().await.iter_mut() { |
|
||||||
entry.set_next_run_time(); |
|
||||||
} |
|
||||||
|
|
||||||
loop { |
|
||||||
let mut entries = self.entries.lock().await; |
|
||||||
entries.sort_by(|b, a| b.next.cmp(&a.next)); |
|
||||||
|
|
||||||
let wait_duration = if let Some(entry) = entries.first() { |
|
||||||
let wait_millis = (entry.next.as_ref().unwrap().timestamp_millis() as u64) |
|
||||||
.saturating_sub(Utc::now().timestamp_millis() as u64); |
|
||||||
|
|
||||||
Duration::from_millis(wait_millis) |
|
||||||
} else { |
|
||||||
Duration::from_secs(1) |
|
||||||
}; |
|
||||||
|
|
||||||
drop(entries); |
|
||||||
tokio::time::sleep(wait_duration).await; |
|
||||||
|
|
||||||
let mut entries = self.entries.lock().await; |
|
||||||
let mut job_batch = vec![]; |
|
||||||
|
|
||||||
for entry in &mut *entries { |
|
||||||
match entry.next.as_ref() { |
|
||||||
Some(next) => { |
|
||||||
if next.gt(&Utc::now()) { |
|
||||||
break; |
|
||||||
} |
|
||||||
match &entry.runnable { |
|
||||||
EntryType::Common(runnable) => { |
|
||||||
let fut = runnable.call(); |
|
||||||
tokio::spawn(fut); |
|
||||||
} |
|
||||||
EntryType::URunner(runnable) => { |
|
||||||
debug!("starting assigned job {:?}", runnable.ids); |
|
||||||
job_batch.push(runnable.clone()) |
|
||||||
} |
|
||||||
} |
|
||||||
entry.set_next_run_time(); |
|
||||||
} |
|
||||||
None => {} |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
AnonymousJobBatch::from_prepared_jobs(job_batch) |
|
||||||
.spawn() |
|
||||||
.await; |
|
||||||
|
|
||||||
entries.retain(|e| e.schedule.is_some()); |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
pub async fn add_job(&self, schedule: Option<Schedule>, runnable: impl Into<EntryType>) -> Id { |
|
||||||
let entry_id = Uuid::new_v4(); |
|
||||||
let runnable = runnable.into(); |
|
||||||
let mut entry = Entry { |
|
||||||
entry_id, |
|
||||||
schedule, |
|
||||||
next: None, |
|
||||||
runnable, |
|
||||||
}; |
|
||||||
|
|
||||||
entry.set_next_run_time(); |
|
||||||
self.entries.lock().await.push(entry); |
|
||||||
|
|
||||||
entry_id |
|
||||||
} |
|
||||||
|
|
||||||
pub async fn del_job(&self, entry_id: Id) { |
|
||||||
self.entries.lock().await.retain(|e| e.entry_id != entry_id); |
|
||||||
} |
|
||||||
|
|
||||||
pub async fn stats(&self) -> Vec<EntryStat> { |
|
||||||
self.entries |
|
||||||
.lock() |
|
||||||
.await |
|
||||||
.iter() |
|
||||||
.map(|entry| entry.as_stat()) |
|
||||||
.collect() |
|
||||||
} |
|
||||||
|
|
||||||
pub async fn start(&self) { |
|
||||||
let cloned = self.clone(); |
|
||||||
tokio::spawn(async move { |
|
||||||
cloned.start_blocking().await; |
|
||||||
}); |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
#[cfg(test)] |
|
||||||
mod tests { |
|
||||||
use std::time::Duration; |
|
||||||
use tokio::time::sleep; |
|
||||||
|
|
||||||
use crate::u_runner::IdentifiableFuture; |
|
||||||
|
|
||||||
use super::*; |
|
||||||
|
|
||||||
#[tokio::test(flavor = "multi_thread")] |
|
||||||
async fn scheduling() { |
|
||||||
let v = Arc::new(Mutex::new(0)); |
|
||||||
let scheduler = AsyncScheduler::new(); |
|
||||||
|
|
||||||
{ |
|
||||||
let v = v.clone(); |
|
||||||
scheduler |
|
||||||
.add_job(Some("*/1 * * * * * *".try_into().unwrap()), move || { |
|
||||||
let v = v.clone(); |
|
||||||
IdentifiableFuture::from_fut_with_ident("testf", async move { |
|
||||||
*v.lock().await += 1; |
|
||||||
}) |
|
||||||
}) |
|
||||||
.await; |
|
||||||
} |
|
||||||
|
|
||||||
scheduler |
|
||||||
.add_job(Some("*/1 * * * * * *".try_into().unwrap()), move || { |
|
||||||
IdentifiableFuture::from_fut_with_ident("testf", async move { |
|
||||||
println!("hello1"); |
|
||||||
}) |
|
||||||
}) |
|
||||||
.await; |
|
||||||
|
|
||||||
scheduler |
|
||||||
.add_job(Some("*/1 * * * * * *".try_into().unwrap()), move || { |
|
||||||
IdentifiableFuture::from_fut_with_ident("testf", async move { |
|
||||||
println!("hello2"); |
|
||||||
}) |
|
||||||
}) |
|
||||||
.await; |
|
||||||
|
|
||||||
scheduler.start().await; |
|
||||||
|
|
||||||
sleep(Duration::from_secs_f32(0.1)).await; // wait for scheduler
|
|
||||||
sleep(Duration::from_secs(1)).await; |
|
||||||
|
|
||||||
assert_eq!(*v.lock().await, 1); |
|
||||||
|
|
||||||
sleep(Duration::from_secs(1)).await; |
|
||||||
|
|
||||||
assert_eq!(*v.lock().await, 2); |
|
||||||
} |
|
||||||
} |
|
@ -1,200 +0,0 @@ |
|||||||
use crate::{models::AssignedJob, UResult}; |
|
||||||
use lazy_static::lazy_static; |
|
||||||
use std::future::Future; |
|
||||||
use std::pin::Pin; |
|
||||||
use std::task::Poll; |
|
||||||
use std::{collections::HashMap, task::Context}; |
|
||||||
use tokio::{ |
|
||||||
runtime::Handle, |
|
||||||
sync::mpsc::{channel, Receiver, Sender}, |
|
||||||
sync::Mutex, |
|
||||||
task::{spawn, spawn_blocking, JoinHandle}, |
|
||||||
}; |
|
||||||
use uuid::Uuid; |
|
||||||
|
|
||||||
pub type ExecResult = UResult<AssignedJob>; |
|
||||||
|
|
||||||
lazy_static! { |
|
||||||
static ref FUT_RESULTS: Mutex<HashMap<Uuid, JoinInfo>> = Mutex::new(HashMap::new()); |
|
||||||
static ref FUT_CHANNEL: (Sender<Uuid>, Mutex<Receiver<Uuid>>) = { |
|
||||||
spawn(init_receiver()); |
|
||||||
let (tx, rx) = channel(100); |
|
||||||
(tx, Mutex::new(rx)) |
|
||||||
}; |
|
||||||
} |
|
||||||
|
|
||||||
pub struct IdentifiableFuture<R> { |
|
||||||
pub job_ident: String, |
|
||||||
fut: Pin<Box<dyn Future<Output = R> + Send + Sync + 'static>>, |
|
||||||
} |
|
||||||
|
|
||||||
impl<R> IdentifiableFuture<R> { |
|
||||||
pub fn from_fut_with_ident( |
|
||||||
job_ident: impl Into<String>, |
|
||||||
job: impl Future<Output = R> + Send + Sync + 'static, |
|
||||||
) -> Self { |
|
||||||
Self { |
|
||||||
fut: Box::pin(job), |
|
||||||
job_ident: job_ident.into(), |
|
||||||
} |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
impl<R> Future for IdentifiableFuture<R> { |
|
||||||
type Output = R; |
|
||||||
|
|
||||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
|
||||||
self.fut.as_mut().poll(cx) |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
struct JoinInfo { |
|
||||||
job_ident: String, |
|
||||||
handle: JoinHandle<JoinHandle<ExecResult>>, |
|
||||||
completed: bool, |
|
||||||
// collectable: bool, // indicates if future can be popped from pool via pop_task_if_completed
|
|
||||||
} |
|
||||||
|
|
||||||
impl JoinInfo { |
|
||||||
async fn wait_result(self) -> ExecResult { |
|
||||||
self.handle.await.unwrap().await.unwrap() |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
fn get_sender() -> Sender<Uuid> { |
|
||||||
FUT_CHANNEL.0.clone() |
|
||||||
} |
|
||||||
|
|
||||||
/// Job runner. Has multiple ways of work.
|
|
||||||
/// - run 1 or more jobs and wait until they're all done
|
|
||||||
/// - run 1 or more jobs in background and collect results of completed jobs later
|
|
||||||
pub struct URunner { |
|
||||||
executables: Vec<IdentifiableFuture<ExecResult>>, |
|
||||||
fids: Vec<Uuid>, |
|
||||||
} |
|
||||||
|
|
||||||
impl URunner { |
|
||||||
pub fn new() -> Self { |
|
||||||
Self { |
|
||||||
executables: vec![], |
|
||||||
fids: vec![], |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
pub fn push(&mut self, job: IdentifiableFuture<ExecResult>) { |
|
||||||
self.executables.push(job); |
|
||||||
} |
|
||||||
|
|
||||||
/// Spawn prepared tasks
|
|
||||||
pub async fn spawn(mut self) -> Self { |
|
||||||
for executable in self.executables.drain(..) { |
|
||||||
let handle = Handle::current(); |
|
||||||
let fid = Uuid::new_v4(); |
|
||||||
let tx = get_sender(); |
|
||||||
let job_id = executable.job_ident.clone(); |
|
||||||
self.fids.push(fid); |
|
||||||
let job_wrapper = async move { |
|
||||||
debug!("inside wrapper (started): {}", fid); |
|
||||||
let result = executable.await; |
|
||||||
tx.send(fid).await.ok(); |
|
||||||
result |
|
||||||
}; |
|
||||||
let handler = JoinInfo { |
|
||||||
job_ident: job_id, |
|
||||||
handle: spawn_blocking(move || handle.spawn(job_wrapper)), |
|
||||||
completed: false, |
|
||||||
}; |
|
||||||
FUT_RESULTS.lock().await.insert(fid, handler); |
|
||||||
} |
|
||||||
self |
|
||||||
} |
|
||||||
|
|
||||||
/// Wait until a bunch of tasks is finished.
|
|
||||||
/// NOT GUARANTEED that all tasks will be returned due to
|
|
||||||
/// possibility to pop them in other places
|
|
||||||
pub async fn wait(self) -> Vec<ExecResult> { |
|
||||||
let mut result = vec![]; |
|
||||||
for fid in self.fids { |
|
||||||
if let Some(job) = Self::pop_job(fid).await { |
|
||||||
result.push(job.wait_result().await); |
|
||||||
} |
|
||||||
} |
|
||||||
result |
|
||||||
} |
|
||||||
|
|
||||||
pub async fn pop_job_if_completed(fid: Uuid) -> Option<ExecResult> { |
|
||||||
let Some(&JoinInfo { completed, .. }) = FUT_RESULTS.lock().await.get(&fid) else { |
|
||||||
return None; |
|
||||||
}; |
|
||||||
if completed { |
|
||||||
let job = Self::pop_job(fid).await.unwrap(); |
|
||||||
Some(job.wait_result().await) |
|
||||||
} else { |
|
||||||
None |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
pub async fn pop_completed() -> Vec<ExecResult> { |
|
||||||
let mut completed: Vec<ExecResult> = vec![]; |
|
||||||
let fids = FUT_RESULTS |
|
||||||
.lock() |
|
||||||
.await |
|
||||||
.keys() |
|
||||||
.copied() |
|
||||||
.collect::<Vec<Uuid>>(); |
|
||||||
for fid in fids { |
|
||||||
if let Some(r) = Self::pop_job_if_completed(fid).await { |
|
||||||
completed.push(r) |
|
||||||
} |
|
||||||
} |
|
||||||
completed |
|
||||||
} |
|
||||||
|
|
||||||
pub async fn stats() -> Vec<String> { |
|
||||||
FUT_RESULTS |
|
||||||
.lock() |
|
||||||
.await |
|
||||||
.values() |
|
||||||
.map(|v| v.job_ident.clone()) |
|
||||||
.collect() |
|
||||||
} |
|
||||||
|
|
||||||
async fn pop_job(fid: Uuid) -> Option<JoinInfo> { |
|
||||||
FUT_RESULTS.lock().await.remove(&fid) |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
async fn init_receiver() { |
|
||||||
while let Some(fid) = FUT_CHANNEL.1.lock().await.recv().await { |
|
||||||
let mut lock = FUT_RESULTS.lock().await; |
|
||||||
if let Some(j) = lock.get_mut(&fid) { |
|
||||||
j.completed = true; |
|
||||||
} |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
#[cfg(test)] |
|
||||||
mod tests { |
|
||||||
use super::*; |
|
||||||
|
|
||||||
// WTF
|
|
||||||
// WTF
|
|
||||||
// WTF
|
|
||||||
#[tokio::test] |
|
||||||
async fn test_spawn() { |
|
||||||
use std::sync::Arc; |
|
||||||
|
|
||||||
let val = Arc::new(Mutex::new(0)); |
|
||||||
let t = { |
|
||||||
let v = val.clone(); |
|
||||||
spawn(async move { |
|
||||||
*v.lock().await = 5; |
|
||||||
}) |
|
||||||
}; |
|
||||||
assert_eq!(0, *val.lock().await); |
|
||||||
spawn(async {}).await.unwrap(); |
|
||||||
assert_eq!(5, *val.lock().await); |
|
||||||
t.await.unwrap(); |
|
||||||
assert_eq!(5, *val.lock().await); |
|
||||||
} |
|
||||||
} |
|
Loading…
Reference in new issue