You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

108 lines
3.3 KiB

// TODO:
// поддержка питона
// резолв адреса управляющего сервера через DoT
// кроссплатформенность (реализовать интерфейс для винды и никсов)
#[macro_use]
extern crate log;
extern crate env_logger;
use std::env;
use std::panic;
use std::sync::Arc;
use tokio::time::{sleep, Duration};
use u_lib::{
api::ClientHandler,
builder::JobBuilder,
cache::JobCache,
errors::ErrChan,
executor::pop_completed,
messaging::Reportable,
models::AssignedJob,
UError,
UID,
//daemonize
};
const ITERATION_LATENCY: u64 = 5;
pub async fn process_request(job_requests: Vec<AssignedJob>, client: &ClientHandler) {
if !job_requests.is_empty() {
for jr in &job_requests {
if !JobCache::contains(jr.job_id) {
debug!("Fetching job: {}", &jr.job_id);
let fetched_job = loop {
match client.get_jobs(Some(jr.job_id)).await {
Ok(mut result) => break result.pop().unwrap(),
Err(err) => {
debug!("{:?} \nretrying...", err);
sleep(Duration::from_secs(ITERATION_LATENCY)).await;
}
}
};
JobCache::insert(fetched_job);
}
}
debug!(
"Scheduling jobs: {}",
job_requests
.iter()
.map(|j| j.job_id.to_string())
.collect::<Vec<String>>()
.join(", ")
);
let mut builder = JobBuilder::from_request(job_requests);
let errors = builder.pop_errors();
if !errors.is_empty() {
errors.into_iter().for_each(ErrChan::send)
}
builder.unwrap_one().spawn().await;
}
}
async fn error_reporting(client: Arc<ClientHandler>) -> ! {
loop {
let err = ErrChan::recv();
debug!("Error encountered: {:?}", err);
'retry: for _ in 0..3 {
match client.report(&[Reportable::Error(err.to_string())]).await {
Ok(_) => break 'retry,
Err(e) => {
debug!("Reporting error: {:?}", e);
sleep(Duration::from_secs(10)).await;
}
}
}
}
}
async fn do_stuff(client: Arc<ClientHandler>) -> ! {
loop {
match client.get_personal_jobs(Some(*UID)).await {
Ok(resp) => {
let job_requests = resp.into_builtin_vec();
process_request(job_requests, &client).await;
}
Err(err) => ErrChan::send(err),
}
let result: Vec<Reportable> = pop_completed().await.into_iter().collect();
if !result.is_empty() {
if let Err(err) = client.report(&result).await {
ErrChan::send(err)
}
}
sleep(Duration::from_secs(ITERATION_LATENCY)).await;
}
}
pub async fn run_forever() {
//daemonize();
env_logger::init();
let arg_ip = env::args().nth(1);
let client = Arc::new(ClientHandler::new(arg_ip.as_deref()));
panic::set_hook(Box::new(|panic_info| {
ErrChan::send(UError::Panic(panic_info.to_string()))
}));
tokio::spawn(error_reporting(client.clone()));
do_stuff(client).await;
}