You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

167 lines
4.9 KiB

#[macro_use]
extern crate log;
use tokio::runtime::Builder;
use tokio::time::{sleep, Duration};
use u_lib::models::PreparedJob;
use u_lib::scheduler::SCHEDULER;
use u_lib::u_runner::{IdentifiableFuture, URunner};
use u_lib::{
api::HttpClient,
cache::JobCache,
config::{get_self_id, EndpointsEnv, AGENT_ITERATION_INTERVAL},
error::ErrChan,
logging::init_logger,
messaging::Reportable,
models::AssignedJobById,
};
async fn process_request(assigned_jobs: Vec<AssignedJobById>, client: &HttpClient) {
for asgn_job in assigned_jobs {
if !JobCache::contains(asgn_job.job_id) {
info!("Fetching job: {}", &asgn_job.job_id);
let mut fetched_job = loop {
//todo: use payload cache
match client.get_full_job(asgn_job.job_id).await {
Ok(result) => break result,
Err(err) => {
debug!("{:?} \nretrying...", err);
sleep(AGENT_ITERATION_INTERVAL).await;
}
}
};
if let Some(payload) = &mut fetched_job.payload {
if let Err(e) = payload.maybe_split_payload() {
ErrChan::send(e, "pay").await;
continue;
}
}
JobCache::insert(fetched_job);
}
let job = match JobCache::get(asgn_job.job_id).as_deref() {
Some(job) => job.clone(),
None => continue,
};
info!("Scheduling job {}", job.meta.id.to_string());
let schedule = match job.meta.schedule.clone() {
Some(sched) => {
if sched.is_empty() {
None
} else {
match sched.as_str().try_into() {
Ok(s) => Some(s),
Err(err) => {
ErrChan::send(err, "sch").await;
continue;
}
}
}
}
None => None,
};
SCHEDULER
.add_job(schedule, PreparedJob { job, ids: asgn_job })
.await;
}
}
async fn error_reporting(client: HttpClient) {
while let Some(err) = ErrChan::recv().await {
let _ = client.report([Reportable::Error(err.clone())]).await;
}
}
async fn agent_loop(client: HttpClient) {
let self_id = get_self_id();
match client.get_personal_jobs(self_id).await {
Ok(jobs) => {
process_request(jobs, &client).await;
}
Err(err) => ErrChan::send(err, "pro").await,
}
let result: Vec<Reportable> = URunner::pop_completed()
.await
.into_iter()
.map(|result| match result {
Ok(r) => Reportable::Assigned(r),
Err(e) => Reportable::Error(e),
})
.collect();
if !result.is_empty() {
if let Err(err) = client.report(result).await {
ErrChan::send(err, "rep").await;
}
}
}
pub fn run_forever() -> ! {
let env = EndpointsEnv::load();
if cfg!(debug_assertions) {
let logfile_uid = format!(
"u_agent-{}",
get_self_id()
.hyphenated()
.to_string()
.split("-")
.next()
.unwrap()
);
init_logger(Some(&logfile_uid));
} else {
#[cfg(unix)]
u_lib::unix::daemonize()
}
info!("Starting agent {}", get_self_id());
Builder::new_multi_thread()
.enable_all()
.build()
.unwrap()
.block_on(async {
let client = loop {
match HttpClient::new(&env.u_server, None).await {
Ok(client) => break client,
Err(e) => {
error!("client init failed: {}", e);
sleep(Duration::from_secs(5)).await;
continue;
}
}
};
{
let client = client.clone();
SCHEDULER
.add_job(Some("*/3 * * * * * *".try_into().unwrap()), move || {
let client = client.clone();
IdentifiableFuture::from_fut_with_ident("error_reporting", async move {
error_reporting(client.clone()).await
})
})
.await;
}
{
let client = client.clone();
SCHEDULER
.add_job(Some("*/3 * * * * * *".try_into().unwrap()), move || {
let client = client.clone();
IdentifiableFuture::from_fut_with_ident("agent_loop", async move {
agent_loop(client).await
})
})
.await;
}
SCHEDULER.start_blocking().await
})
}