#[macro_use] extern crate tracing; #[cfg(test)] #[macro_use] extern crate rstest; mod db; mod error; mod handlers; use db::PgRepo; use error::{Error as ServerError, RejResponse}; use std::{convert::Infallible, sync::Arc}; use u_lib::{ config, db::async_pool, jobs::fat_meta_to_thin, messaging::{AsMsg, Reportable}, models::*, types::Id, }; use warp::{ body, log::{custom, Info}, reply::{json, reply, Json, Response}, Filter, Rejection, Reply, }; use crate::handlers::Endpoints; type ValidJobMeta = FatJobMeta; fn into_message(msg: M) -> Json { json(&msg) } pub fn init_endpoints( auth_token: &str, db: PgRepo, ) -> impl Filter + Clone { let path = |p: &'static str| warp::post().and(warp::path(p)); let infallible_none = |_| async { Result::<(Option,), Infallible>::Ok((None,)) }; let with_db = { let adb = Arc::new(db); warp::any().map(move || adb.clone()) }; let get_agents = path("get_agents") .and(with_db.clone()) .and(warp::path::param::().map(Some).or_else(infallible_none)) .and_then(Endpoints::get_agents) .map(into_message); let upload_jobs = path("upload_jobs") .and(with_db.clone()) .and(body::json::>()) .and_then(Endpoints::upload_jobs) .map(into_message); let get_job = path("get_job") .and(with_db.clone()) .and(warp::path::param::()) .and_then(Endpoints::get_job) .map(into_message); let get_jobs = path("get_jobs") .and(with_db.clone()) .and_then(Endpoints::get_jobs) .map(into_message); let get_agent_jobs = path("get_agent_jobs") .and(with_db.clone()) .and(warp::path::param::().map(Some).or_else(infallible_none)) .and_then(Endpoints::get_agent_jobs) .map(into_message); let get_personal_jobs = path("get_personal_jobs") .and(with_db.clone()) .and(warp::path::param::()) .and_then(Endpoints::get_personal_jobs) .map(into_message); let del = path("del") .and(with_db.clone()) .and(warp::path::param::()) .and_then(Endpoints::del) .map(ok); let set_jobs = path("set_jobs") .and(with_db.clone()) .and(warp::path::param::()) .and(body::json::>()) .and_then(Endpoints::set_jobs) .map(into_message); let report = path("report") .and(with_db.clone()) .and(body::json::>()) .and(warp::header("User-Agent")) .and_then(Endpoints::report) .map(ok); let update_agent = path("update_agent") .and(with_db.clone()) .and(body::json::()) .and_then(Endpoints::update_agent) .map(ok); let update_job = path("update_job") .and(with_db.clone()) .and(body::json::()) .and_then(Endpoints::update_job) .map(ok); let update_assigned_job = path("update_result") .and(with_db.clone()) .and(body::json::()) .and_then(Endpoints::update_assigned_job) .map(ok); let download = path("download") .and(warp::path::param::()) .and_then(Endpoints::download) .map(ok); let ping = path("ping").map(reply); let auth_token = format!("Bearer {auth_token}",).into_boxed_str(); let auth_header = warp::header::exact("authorization", Box::leak(auth_token)); let auth_zone = (get_agents .or(get_job.clone()) .or(get_jobs.clone()) .or(upload_jobs) .or(del) .or(set_jobs) .or(get_agent_jobs) .or(update_agent.or(update_job).or(update_assigned_job)) .or(download) .or(ping)) .and(auth_header); let agent_zone = get_job .or(get_jobs) .or(get_personal_jobs) .or(report) .or(download); auth_zone.or(agent_zone) } pub async fn preload_jobs(repo: &PgRepo) -> Result<(), ServerError> { repo.interact(|mut db| { let job_alias = "agent_hello"; let if_job_exists = db.find_job_by_alias(job_alias)?; if if_job_exists.is_none() { let agent_hello = fat_meta_to_thin( FatJobMeta::builder() .with_type(JobType::Init) .with_alias(job_alias) .build() .unwrap(), )?; db.insert_jobs(&[agent_hello])?; } Ok(()) }) .await } pub async fn serve() -> Result<(), ServerError> { let env = config::DBEnv::load()?; let pool = async_pool(&env); let db = PgRepo::new(pool); preload_jobs(&db).await?; let env = config::AccessEnv::load()?; let routes = init_endpoints(&env.admin_auth_token, db) .recover(handle_rejection) .with(custom(logger)); let server_cert = include_bytes!("../../../certs/server.crt"); let server_key = include_bytes!("../../../certs/server.key"); let ca = include_bytes!("../../../certs/ca.crt"); warp::serve(routes) .tls() .cert(server_cert) .key(server_key) .client_auth_required(ca) .run(([0, 0, 0, 0], config::MASTER_PORT)) .await; Ok(()) } async fn handle_rejection(rej: Rejection) -> Result { let resp = if let Some(err) = rej.find::() { error!("{:?}", err); RejResponse::bad_request(err.to_string()) } else if rej.is_not_found() { RejResponse::not_found("not found placeholder") } else { error!("{:?}", rej); RejResponse::internal() }; Ok(resp.into_response()) } fn logger(info: Info<'_>) { info!(target: "warp", "{raddr} {agent_id} \"{path}\" {status}", raddr = info.remote_addr().unwrap_or(([0, 0, 0, 0], 0).into()), path = info.path(), agent_id = info.user_agent() .map(|id: &str| id.splitn(3, '-') .take(2) .collect::() ) .unwrap_or_else(|| "NO_AGENT".to_string()), status = info.status() ); } fn ok(_: T) -> impl Reply { reply() } /* #[cfg(test)] mod tests { use super::*; use crate::handlers::Endpoints; use handlers::build_ok; use u_lib::messaging::{AsMsg, BaseMessage, Reportable}; use uuid::Uuid; use warp::test; #[rstest] #[case(Some(Uuid::new_v4()))] #[should_panic] #[case(None)] #[tokio::test] async fn test_get_agent_jobs_unauthorized(#[case] uid: Option) { let mock = Endpoints::faux(); when!(mock.get_agent_jobs).then_return(Ok(build_ok(""))); //mock.expect().with(eq(uid)).returning(|_| Ok(build_ok(""))); test::request() .path(&format!( "/get_agent_jobs/{}", uid.map(|u| u.simple().to_string()).unwrap_or(String::new()) )) .method("GET") .filter(&init_filters("")) .await .unwrap(); } #[tokio::test] async fn test_report_unauth_successful() { let mock = Endpoints::report(); mock.expect() .withf(|msg: &BaseMessage<'_, Vec>| msg.inner_ref()[0] == Reportable::Dummy) .returning(|_| Ok(build_ok(""))); test::request() .path("/report/") .method("POST") .json(&vec![Reportable::Dummy].as_message()) .filter(&init_filters("")) .await .unwrap(); mock.checkpoint(); } } */