use crate::{messaging::Reportable, utils::OneOrVec}; use futures::{future::BoxFuture, lock::Mutex}; use lazy_static::lazy_static; use std::collections::HashMap; use tokio::{ spawn, sync::mpsc::{channel, Receiver, Sender}, task::JoinHandle, }; use uuid::Uuid; pub type DynFut = BoxFuture<'static, Reportable>; lazy_static! { static ref FUT_RESULTS: Mutex> = Mutex::new(HashMap::new()); static ref FUT_CHANNEL: (Sender, Mutex>) = { spawn(init_receiver()); let (tx, rx) = channel(100); (tx, Mutex::new(rx)) }; } struct JoinInfo { handle: JoinHandle, completed: bool, collectable: bool, // indicates if future can be popped from pool via pop_task_if_completed } fn get_sender() -> Sender { FUT_CHANNEL.0.clone() } pub struct Waiter { tasks: Vec, fids: Vec, } impl Waiter { pub fn new(tasks: impl OneOrVec) -> Self { Self { tasks: tasks.into_vec(), fids: vec![], } } pub async fn spawn(mut self) -> Self { let collectable = true; //TODO: self.tasks.len() != 1; for f in self.tasks.drain(..) { let tx = get_sender(); let fid = Uuid::new_v4(); self.fids.push(fid); let task_wrapper = async move { debug!("inside wrapper (started): {}", fid); let result = f.await; tx.send(fid).await.unwrap(); result }; let handler = JoinInfo { handle: spawn(task_wrapper), completed: false, collectable, }; FUT_RESULTS.lock().await.insert(fid, handler); } self } /// Wait until a bunch of tasks is finished. /// NOT GUARANTEED that all tasks will be returned due to /// possibility to pop them in other places pub async fn wait(self) -> Vec { let mut result = vec![]; for fid in self.fids { if let Some(task) = pop_task(fid).await { let r = task.handle.await; result.push(r.unwrap()); } } result } } async fn init_receiver() { while let Some(fid) = FUT_CHANNEL.1.lock().await.recv().await { if let Some(mut lock) = FUT_RESULTS.try_lock() { if let Some(j) = lock.get_mut(&fid) { j.completed = true; } } } } async fn pop_task(fid: Uuid) -> Option { FUT_RESULTS.lock().await.remove(&fid) } pub async fn pop_task_if_completed(fid: Uuid) -> Option { let &mut JoinInfo { handle: _, collectable, completed, } = match FUT_RESULTS.lock().await.get_mut(&fid) { Some(t) => t, None => return None, }; if collectable && completed { let task = pop_task(fid).await.unwrap(); let result = task.handle.await.unwrap(); Some(result) } else { None } } pub async fn pop_completed() -> Vec { let mut completed: Vec = vec![]; let fids = FUT_RESULTS .lock() .await .keys() .copied() .collect::>(); for fid in fids { if let Some(r) = pop_task_if_completed(fid).await { completed.push(r) } } completed } #[cfg(test)] mod tests { use super::*; // WTF // WTF // WTF #[tokio::test] async fn test_spawn() { use std::sync::Arc; let val = Arc::new(Mutex::new(0)); let t = { let v = val.clone(); spawn(async move { *v.lock().await = 5; }) }; assert_eq!(0, *val.lock().await); spawn(async {}).await.unwrap(); assert_eq!(5, *val.lock().await); t.await.unwrap(); assert_eq!(5, *val.lock().await); } }