parent
c8ce2aca60
commit
b75871e25d
14 changed files with 695 additions and 254 deletions
@ -0,0 +1,66 @@ |
||||
use crate::{models::PreparedJob, types::Id}; |
||||
use chrono::{DateTime, Utc}; |
||||
use core::fmt; |
||||
use std::{future::Future, pin::Pin, sync::Arc}; |
||||
|
||||
pub type JobFuture = Box<dyn Future<Output = ()> + Send>; |
||||
|
||||
pub trait SchedulerJob { |
||||
fn call(&self) -> Pin<JobFuture>; |
||||
} |
||||
|
||||
impl<F, T> SchedulerJob for F |
||||
where |
||||
F: Fn() -> T, |
||||
T: Future<Output = ()> + Send + 'static, |
||||
{ |
||||
fn call(&self) -> Pin<JobFuture> { |
||||
Box::pin(self()) |
||||
} |
||||
} |
||||
|
||||
#[derive(Clone)] |
||||
pub struct Entry { |
||||
pub id: Id, |
||||
pub job_id: Id, |
||||
pub schedule: Option<cron::Schedule>, |
||||
pub next: Option<DateTime<Utc>>, |
||||
pub runnable: EntryType, |
||||
} |
||||
|
||||
#[derive(Clone)] |
||||
pub enum EntryType { |
||||
Common(Arc<dyn SchedulerJob + Send + Sync>), |
||||
URunner(PreparedJob), |
||||
} |
||||
|
||||
impl<J: SchedulerJob + Send + Sync + 'static> From<J> for EntryType { |
||||
fn from(value: J) -> Self { |
||||
Self::Common(Arc::new(value)) |
||||
} |
||||
} |
||||
|
||||
impl From<PreparedJob> for EntryType { |
||||
fn from(value: PreparedJob) -> Self { |
||||
Self::URunner(value) |
||||
} |
||||
} |
||||
|
||||
impl Entry { |
||||
pub fn set_next_run_time(&mut self) { |
||||
self.next = self.get_next_run_time(); |
||||
} |
||||
|
||||
pub fn get_next_run_time(&self) -> Option<DateTime<Utc>> { |
||||
match &self.schedule { |
||||
Some(schedule) => schedule.upcoming(Utc).next(), |
||||
None => Some(Utc::now()), |
||||
} |
||||
} |
||||
} |
||||
|
||||
impl fmt::Debug for Entry { |
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
||||
write!(f, "{} {:?} {:?}", self.id, self.schedule, self.next) |
||||
} |
||||
} |
@ -0,0 +1,187 @@ |
||||
mod entry; |
||||
mod spawner; |
||||
|
||||
use std::sync::Arc; |
||||
use std::time::Duration; |
||||
|
||||
use chrono::Utc; |
||||
use cron::Schedule; |
||||
use entry::Entry; |
||||
use once_cell::sync::Lazy; |
||||
use tokio::sync::Mutex; |
||||
use uuid::Uuid; |
||||
|
||||
use crate::types::Id; |
||||
|
||||
use self::entry::EntryType; |
||||
use self::spawner::{CommonSpawner, Spawner, URunnerSpawner}; |
||||
|
||||
pub static SCHEDULER: Lazy<AsyncScheduler> = Lazy::new(|| AsyncScheduler::new()); |
||||
|
||||
#[derive(Clone)] |
||||
pub struct AsyncScheduler { |
||||
entries: Arc<Mutex<Vec<Entry>>>, |
||||
} |
||||
|
||||
impl AsyncScheduler { |
||||
pub fn new() -> AsyncScheduler { |
||||
AsyncScheduler { |
||||
entries: Arc::new(Mutex::new(Vec::new())), |
||||
} |
||||
} |
||||
|
||||
pub async fn start_blocking(&self) -> ! { |
||||
for entry in self.entries.lock().await.iter_mut() { |
||||
entry.set_next_run_time(); |
||||
} |
||||
|
||||
loop { |
||||
let mut entries = self.entries.lock().await; |
||||
entries.sort_by(|b, a| b.next.cmp(&a.next)); |
||||
|
||||
let wait_duration = if let Some(entry) = entries.first() { |
||||
let wait_millis = (entry.next.as_ref().unwrap().timestamp_millis() as u64) |
||||
.saturating_sub(Utc::now().timestamp_millis() as u64); |
||||
|
||||
Duration::from_millis(wait_millis) |
||||
} else { |
||||
Duration::from_secs(1) |
||||
}; |
||||
|
||||
drop(entries); |
||||
tokio::time::sleep(wait_duration).await; |
||||
|
||||
let mut entries = self.entries.lock().await; |
||||
|
||||
CommonSpawner { |
||||
entries: &mut entries, |
||||
} |
||||
.spawn() |
||||
.await; |
||||
|
||||
URunnerSpawner { |
||||
entries: &mut entries, |
||||
} |
||||
.spawn() |
||||
.await; |
||||
|
||||
entries.retain(|e| e.schedule.is_some()); |
||||
} |
||||
} |
||||
|
||||
pub async fn add_job(&self, schedule: Option<Schedule>, runnable: impl Into<EntryType>) -> Id { |
||||
let entry_id = Uuid::new_v4(); |
||||
let runnable = runnable.into(); |
||||
let job_id = match &runnable { |
||||
EntryType::URunner(j) => j.job.meta.id, |
||||
_ => Id::new_v4(), |
||||
}; |
||||
let mut entry = Entry { |
||||
id: entry_id, |
||||
job_id, |
||||
schedule, |
||||
next: None, |
||||
runnable, |
||||
}; |
||||
|
||||
entry.set_next_run_time(); |
||||
self.entries.lock().await.push(entry); |
||||
|
||||
entry_id |
||||
} |
||||
|
||||
pub async fn del_job(&self, entry_id: Id) { |
||||
self.entries.lock().await.retain(|e| e.id != entry_id); |
||||
} |
||||
|
||||
pub async fn start(&self) { |
||||
let cloned = self.clone(); |
||||
tokio::spawn(async move { |
||||
cloned.start_blocking().await; |
||||
}); |
||||
} |
||||
} |
||||
|
||||
#[cfg(test)] |
||||
mod tests { |
||||
use std::time::Duration; |
||||
use tokio::time::sleep; |
||||
|
||||
use super::*; |
||||
|
||||
#[tokio::test(flavor = "multi_thread")] |
||||
async fn scheduling() { |
||||
let v = Arc::new(Mutex::new(0)); |
||||
let scheduler = AsyncScheduler::new(); |
||||
|
||||
{ |
||||
let v = v.clone(); |
||||
scheduler |
||||
.add_job(Some("*/1 * * * * * *".try_into().unwrap()), move || { |
||||
let v = v.clone(); |
||||
async move { |
||||
*v.lock().await += 1; |
||||
} |
||||
}) |
||||
.await; |
||||
} |
||||
|
||||
scheduler |
||||
.add_job( |
||||
Some("*/1 * * * * * *".try_into().unwrap()), |
||||
move || async move { |
||||
println!("hello1"); |
||||
}, |
||||
) |
||||
.await; |
||||
|
||||
scheduler |
||||
.add_job( |
||||
Some("*/1 * * * * * *".try_into().unwrap()), |
||||
move || async move { |
||||
println!("hello2"); |
||||
}, |
||||
) |
||||
.await; |
||||
|
||||
scheduler.start().await; |
||||
|
||||
sleep(Duration::from_secs_f32(0.1)).await; // wait for scheduler
|
||||
sleep(Duration::from_secs(1)).await; |
||||
|
||||
assert_eq!(*v.lock().await, 1); |
||||
|
||||
sleep(Duration::from_secs(1)).await; |
||||
|
||||
assert_eq!(*v.lock().await, 2); |
||||
} |
||||
|
||||
/// Run task every second.
|
||||
/// Task will be cancelled if not completed until next run
|
||||
/// (and this task lasts 2 secs),
|
||||
/// so the counter value should still be 0
|
||||
#[tokio::test(flavor = "multi_thread")] |
||||
async fn cancellation() { |
||||
let counter = Arc::new(Mutex::new(0)); |
||||
let scheduler = AsyncScheduler::new(); |
||||
|
||||
{ |
||||
let counter = counter.clone(); |
||||
scheduler |
||||
.add_job(Some("*/1 * * * * * *".try_into().unwrap()), move || { |
||||
let counter = counter.clone(); |
||||
async move { |
||||
sleep(Duration::from_secs(2)).await; |
||||
*counter.lock().await += 1; |
||||
} |
||||
}) |
||||
.await; |
||||
} |
||||
|
||||
scheduler.start().await; |
||||
|
||||
sleep(Duration::from_secs(3)).await; |
||||
|
||||
assert_eq!(*counter.lock().await, 0); |
||||
} |
||||
} |
@ -0,0 +1,85 @@ |
||||
use std::time::Duration; |
||||
|
||||
use crate::jobs::AnonymousJobBatch; |
||||
|
||||
use super::entry::{Entry, EntryType}; |
||||
use chrono::Utc; |
||||
use tokio::time::timeout; |
||||
|
||||
#[async_trait] |
||||
pub trait Spawner { |
||||
async fn spawn(self); |
||||
} |
||||
|
||||
/// Spawn jobs in scheduler thru `tokio::spawn`
|
||||
pub struct CommonSpawner<'e> { |
||||
pub entries: &'e mut [Entry], |
||||
} |
||||
|
||||
#[async_trait] |
||||
impl Spawner for CommonSpawner<'_> { |
||||
async fn spawn(self) { |
||||
for entry in self |
||||
.entries |
||||
.iter_mut() |
||||
.filter(|e| matches!(e.runnable, EntryType::Common(_))) |
||||
{ |
||||
if let EntryType::Common(runnable) = &entry.runnable { |
||||
match entry.next.as_ref() { |
||||
Some(next) => { |
||||
let cancel_timeout = |
||||
next.timestamp_millis() - Utc::now().timestamp_millis(); |
||||
let cancel_timeout = Duration::from_millis(cancel_timeout as u64); |
||||
|
||||
tokio::spawn(timeout(cancel_timeout, runnable.call())); |
||||
|
||||
entry.set_next_run_time(); |
||||
} |
||||
None => { |
||||
error!("not implemented yet"); |
||||
todo!(); |
||||
} |
||||
} |
||||
} |
||||
if entry.next.as_ref().unwrap().gt(&Utc::now()) { |
||||
break; |
||||
} |
||||
} |
||||
} |
||||
} |
||||
|
||||
/// Spawn jobs in scheduler thru `URunner`
|
||||
pub struct URunnerSpawner<'e> { |
||||
pub entries: &'e mut [Entry], |
||||
} |
||||
|
||||
#[async_trait] |
||||
impl Spawner for URunnerSpawner<'_> { |
||||
async fn spawn(self) { |
||||
let mut job_batch = vec![]; |
||||
for entry in self |
||||
.entries |
||||
.iter_mut() |
||||
.filter(|e| matches!(e.runnable, EntryType::URunner(_))) |
||||
{ |
||||
if let EntryType::URunner(runnable) = &entry.runnable { |
||||
match entry.next.as_ref() { |
||||
Some(_) => { |
||||
job_batch.push(runnable.clone()); |
||||
entry.set_next_run_time(); |
||||
} |
||||
None => { |
||||
error!("not implemented yet"); |
||||
todo!(); |
||||
} |
||||
} |
||||
} |
||||
if entry.next.as_ref().unwrap().gt(&Utc::now()) { |
||||
break; |
||||
} |
||||
} |
||||
AnonymousJobBatch::from_prepared_jobs(job_batch) |
||||
.spawn() |
||||
.await; |
||||
} |
||||
} |
Loading…
Reference in new issue