You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

143 lines
3.8 KiB

// list of jobs: job (cmd, args) OR rust fn OR python func + cron-like timing
// job runner (thread)
// every job runs in other thread/process
/*
use cron::Schedule as CronSchedule;
enum Schedule {
Persistent, // run forever, restart if stops (set max_retries)
Cron(CronSchedule),
Once
}
4 years ago
*/
4 years ago
use crate::{
contracts::*,
UResult,
};
use std::{
sync::{Mutex, MutexGuard, Arc},
};
//use tokio::task::JoinHandle;
use futures::future::{join_all, JoinAll};
use futures::Future;
type Executables = Vec<Job>;
struct AsyncExecutor {
new_jobs: Executables,
promises: Executables
}
4 years ago
impl AsyncExecutor {
pub fn new() -> Self {
Self {
new_jobs: vec![],
promises: vec![]
}
}
/*
pub fn process_jobs(&mut self) {
new_jobs
if job.state() == JobState::Pending {
tokio::spawn(job.run);
}
}
*/
pub async fn apply_job(&mut self, new_job: Job) -> UResult<JobResult> {
4 years ago
/*let id = new_job.id();
let mut job_pool = self.jobs.lock().unwrap();
job_pool.push(new_job);
id*/
tokio::spawn(async move {
new_job.run().await
}).await.unwrap()
}
pub async fn apply_jobs(&mut self, new_jobs: Vec<Job>) -> Vec<UResult<JobResult>> {
/*
let mut job_pool = self.jobs.lock().unwrap();
job_pool.extend(
new_jobs.into_iter()
.map(|job| (job.id(), job))
);*/
let futures = new_jobs.into_iter().map(|mut job| {
job.run()
}).collect::<Vec<_>>();
join_all(futures).await
}
/*
pub fn get_job_result(&self, id: &Uuid) -> Option<JobResult> {
let mut job_pool = self.jobs.lock().unwrap();
let (state, result) = match job_pool.get(id) {
Some(job) => (job.state(), job.get_result()),
None => return None
};
if state == JobState::Finished {
job_pool.remove(&id);
}
Some(result)
}
4 years ago
pub fn get_all_results(&self) -> Vec<JobResult> {
let mut job_pool = self.jobs.lock().unwrap();
let to_remove = job_pool.values()
.filter(|job| job.finished())
.map(|job| job.id())
.collect::<Vec<Uuid>>();
let results = job_pool.values()
.map(|job| job.get_result())
.collect();
to_remove.into_iter().for_each(|id| {
job_pool.remove(&id);
});
results
}*/
}
lazy_static! {
static ref EXECUTOR: Mutex<AsyncExecutor> =
Mutex::new(AsyncExecutor::new());
}
/*
pub fn get_job_result(id: &Uuid, wait: bool) -> Option<JobResult> {
let executor: MutexGuard<AsyncExecutor> = EXECUTOR.lock().unwrap();
loop {
let result = executor.get_job_result(&id);
if wait {
if result.is_some() && result.as_ref().unwrap().state == JobState::Finished {
return result
}
thread::sleep(Duration::from_secs(1))
} else {
return result
}
}
}
pub fn get_all_results() -> Vec<JobResult> {
let executor: MutexGuard<AsyncExecutor> = EXECUTOR.lock().unwrap();
executor.get_all_results()
}
*/
4 years ago
// run jobs without awaiting (now is waiting)
pub async fn send_jobs_to_executor(
jobs: Vec<JobMetaRef>
) -> Vec<UResult<JobResult>> {
let mut executor: MutexGuard<AsyncExecutor> = EXECUTOR.lock().unwrap();
let executables = jobs.into_iter()
.map(|job_meta| Job::new(job_meta))
.collect();
executor.apply_jobs(executables).await
}
4 years ago
// run job and await result
pub async fn exec_job(job_meta: JobMetaRef) -> UResult<JobResult> {
let mut executor: MutexGuard<AsyncExecutor> = EXECUTOR.lock().unwrap();
let job = Job::new(job_meta);
executor.apply_job(job).await
}