use crate::{utils::OneOrMany, models::ExecResult}; use futures::{future::BoxFuture, lock::Mutex}; use lazy_static::lazy_static; use std::{collections::HashMap}; use tokio::{ spawn, sync::mpsc::{channel, Receiver, Sender}, task::JoinHandle, }; use uuid::Uuid; pub type FutRes = ExecResult; pub type DynFut = BoxFuture<'static, FutRes>; lazy_static! { static ref FUT_RESULTS: Mutex> = Mutex::new(HashMap::new()); static ref FUT_CHANNEL: (Sender, Mutex>) = { spawn(init_receiver()); let (tx, rx) = channel(100); (tx, Mutex::new(rx)) }; } struct JoinInfo { handle: JoinHandle, completed: bool, collectable: bool, // indicates if future can be popped from pool via pop_task_if_completed } fn get_sender() -> Sender { FUT_CHANNEL.0.clone() } pub struct Waiter { tasks: Vec, fids: Vec, } impl Waiter { pub fn new>(tasks: S) -> Self { Self { tasks: tasks.into_vec(), fids: vec![], } } pub async fn spawn(mut self) -> Self { let collectable = true; //TODO: self.tasks.len() != 1; for f in self.tasks.drain(..) { let tx = get_sender(); let fid = Uuid::new_v4(); self.fids.push(fid); let task_wrapper = async move { debug!("inside wrapper (started): {}", fid); let result = f.await; tx.send(fid).await.unwrap(); result }; let handle = JoinInfo { handle: spawn(task_wrapper), completed: false, collectable, }; FUT_RESULTS.lock().await.insert(fid, handle); } self } // wait until a bunch of tasks is finished // NOT GUARANTEED that all tasks will be returned due to // possibility to pop them in other places pub async fn wait(self) -> Vec { let mut result = vec![]; for fid in self.fids { if let Some(task) = pop_task(fid).await { let r = task.handle.await; result.push(r.unwrap()); } } result } } async fn init_receiver() { while let Some(fid) = FUT_CHANNEL.1.lock().await.recv().await { //info!("init_receiver: next val: {}", &fid); if let Some(mut lock) = FUT_RESULTS.try_lock() { if let Some(j) = lock.get_mut(&fid) { //info!("init_receiver: marked as completed"); j.completed = true; } } } } async fn pop_task(fid: Uuid) -> Option { FUT_RESULTS.lock().await.remove(&fid) } /* async fn insert_task(fid: Uuid) { FUT_RESULTS.lock().await.remove(&fid) } */ pub async fn pop_task_if_completed(fid: Uuid) -> Option { let &mut JoinInfo { handle: _, collectable, completed, } = match FUT_RESULTS.lock().await.get_mut(&fid) { Some(t) => t, None => return None, }; if collectable && completed { let task = pop_task(fid).await.unwrap(); let result = task.handle.await.unwrap(); Some(result) } else { None } } pub async fn pop_completed() -> Vec { let mut completed: Vec = vec![]; let fids = FUT_RESULTS .lock() .await .keys() .map(|k| *k) .collect::>(); for fid in fids { if let Some(r) = pop_task_if_completed(fid).await { completed.push(r) } } completed } #[cfg(test)] mod tests { use super::*; // WTF // WTF // WTF #[tokio::test] async fn test_spawn() { use std::sync::Arc; let val = Arc::new(Mutex::new(0)); let t = { let v = val.clone(); spawn(async move { *v.lock().await = 5; }) }; assert_eq!(0, *val.lock().await); spawn(async {}).await.ok(); assert_eq!(5, *val.lock().await); t.await.ok(); assert_eq!(5, *val.lock().await); } }