You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

200 lines
5.3 KiB

use crate::{models::AssignedJob, UResult};
use lazy_static::lazy_static;
use std::future::Future;
use std::pin::Pin;
use std::task::Poll;
use std::{collections::HashMap, task::Context};
use tokio::{
runtime::Handle,
sync::mpsc::{channel, Receiver, Sender},
sync::Mutex,
task::{spawn, spawn_blocking, JoinHandle},
};
use uuid::Uuid;
pub type ExecResult = UResult<AssignedJob>;
lazy_static! {
static ref FUT_RESULTS: Mutex<HashMap<Uuid, JoinInfo>> = Mutex::new(HashMap::new());
static ref FUT_CHANNEL: (Sender<Uuid>, Mutex<Receiver<Uuid>>) = {
spawn(init_receiver());
let (tx, rx) = channel(100);
(tx, Mutex::new(rx))
};
}
pub struct IdentifiableFuture<R> {
pub job_ident: String,
fut: Pin<Box<dyn Future<Output = R> + Send + Sync + 'static>>,
}
impl<R> IdentifiableFuture<R> {
pub fn from_fut_with_ident(
job_ident: impl Into<String>,
job: impl Future<Output = R> + Send + Sync + 'static,
) -> Self {
Self {
fut: Box::pin(job),
job_ident: job_ident.into(),
}
}
}
impl<R> Future for IdentifiableFuture<R> {
type Output = R;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.fut.as_mut().poll(cx)
}
}
struct JoinInfo {
job_ident: String,
handle: JoinHandle<JoinHandle<ExecResult>>,
completed: bool,
// collectable: bool, // indicates if future can be popped from pool via pop_task_if_completed
}
impl JoinInfo {
async fn wait_result(self) -> ExecResult {
self.handle.await.unwrap().await.unwrap()
}
}
fn get_sender() -> Sender<Uuid> {
FUT_CHANNEL.0.clone()
}
/// Job runner. Has multiple ways of work.
/// - run 1 or more jobs and wait until they're all done
/// - run 1 or more jobs in background and collect results of completed jobs later
pub struct URunner {
executables: Vec<IdentifiableFuture<ExecResult>>,
fids: Vec<Uuid>,
}
impl URunner {
pub fn new() -> Self {
Self {
executables: vec![],
fids: vec![],
}
}
pub fn push(&mut self, job: IdentifiableFuture<ExecResult>) {
self.executables.push(job);
}
/// Spawn prepared tasks
pub async fn spawn(mut self) -> Self {
for executable in self.executables.drain(..) {
let handle = Handle::current();
let fid = Uuid::new_v4();
let tx = get_sender();
let job_id = executable.job_ident.clone();
self.fids.push(fid);
let job_wrapper = async move {
debug!("inside wrapper (started): {}", fid);
let result = executable.await;
tx.send(fid).await.ok();
result
};
let handler = JoinInfo {
job_ident: job_id,
handle: spawn_blocking(move || handle.spawn(job_wrapper)),
completed: false,
};
FUT_RESULTS.lock().await.insert(fid, handler);
}
self
}
/// Wait until a bunch of tasks is finished.
/// NOT GUARANTEED that all tasks will be returned due to
/// possibility to pop them in other places
pub async fn wait(self) -> Vec<ExecResult> {
let mut result = vec![];
for fid in self.fids {
if let Some(job) = Self::pop_job(fid).await {
result.push(job.wait_result().await);
}
}
result
}
pub async fn pop_job_if_completed(fid: Uuid) -> Option<ExecResult> {
let Some(&JoinInfo { completed, .. }) = FUT_RESULTS.lock().await.get(&fid) else {
return None;
};
if completed {
let job = Self::pop_job(fid).await.unwrap();
Some(job.wait_result().await)
} else {
None
}
}
pub async fn pop_completed() -> Vec<ExecResult> {
let mut completed: Vec<ExecResult> = vec![];
let fids = FUT_RESULTS
.lock()
.await
.keys()
.copied()
.collect::<Vec<Uuid>>();
for fid in fids {
if let Some(r) = Self::pop_job_if_completed(fid).await {
completed.push(r)
}
}
completed
}
pub async fn stats() -> Vec<String> {
FUT_RESULTS
.lock()
.await
.values()
.map(|v| v.job_ident.clone())
.collect()
}
async fn pop_job(fid: Uuid) -> Option<JoinInfo> {
FUT_RESULTS.lock().await.remove(&fid)
}
}
async fn init_receiver() {
while let Some(fid) = FUT_CHANNEL.1.lock().await.recv().await {
let mut lock = FUT_RESULTS.lock().await;
if let Some(j) = lock.get_mut(&fid) {
j.completed = true;
}
}
}
#[cfg(test)]
mod tests {
use super::*;
// WTF
// WTF
// WTF
#[tokio::test]
async fn test_spawn() {
use std::sync::Arc;
let val = Arc::new(Mutex::new(0));
let t = {
let v = val.clone();
spawn(async move {
*v.lock().await = 5;
})
};
assert_eq!(0, *val.lock().await);
spawn(async {}).await.unwrap();
assert_eq!(5, *val.lock().await);
t.await.unwrap();
assert_eq!(5, *val.lock().await);
}
}