#[macro_use] extern crate tracing; #[cfg(test)] #[macro_use] extern crate rstest; // due to linking errors extern crate openssl; // don't touch anything extern crate diesel; // in this block mod db; mod error; mod handlers; use error::{Error as ServerError, RejResponse}; use serde::{de::DeserializeOwned, Deserialize}; use std::{convert::Infallible, path::PathBuf}; use u_lib::{ config::MASTER_PORT, logging::init_logger, messaging::{AsMsg, BaseMessage, Reportable}, models::*, utils::load_env, }; use uuid::Uuid; use warp::{ body, log::{custom, Info}, reply::{json, reply, Json, Response}, Filter, Rejection, Reply, }; use crate::db::UDB; use crate::handlers::Endpoints; #[derive(Deserialize)] struct ServEnv { admin_auth_token: String, } fn get_content() -> impl Filter,), Error = Rejection> + Clone where M: AsMsg + Sync + Send + DeserializeOwned + 'static, { body::content_length_limit(1024 * 64).and(body::json::>()) } fn into_message(msg: M) -> Json { json(&msg.as_message()) } pub fn init_endpoints( auth_token: &str, ) -> impl Filter + Clone { let path = |p: &'static str| warp::post().and(warp::path(p)); let infallible_none = |_| async { Ok::<_, Infallible>((None::,)) }; let get_agents = path("get_agents") .and( warp::path::param::() .map(Some) .or_else(infallible_none), ) .and_then(Endpoints::get_agents) .map(into_message); let upload_jobs = path("upload_jobs") .and(get_content::>()) .and_then(Endpoints::upload_jobs) .map(ok); let get_jobs = path("get_jobs") .and( warp::path::param::() .map(Some) .or_else(infallible_none), ) .and_then(Endpoints::get_jobs) .map(into_message); let get_agent_jobs = path("get_agent_jobs") .and( warp::path::param::() .map(Some) .or_else(infallible_none), ) .and_then(Endpoints::get_agent_jobs) .map(into_message); let get_personal_jobs = path("get_personal_jobs") .and(warp::path::param::()) .and_then(Endpoints::get_personal_jobs) .map(into_message); let del = path("del") .and(warp::path::param::()) .and_then(Endpoints::del) .map(ok); let set_jobs = path("set_jobs") .and(warp::path::param::()) .and(get_content::>()) .and_then(Endpoints::set_jobs) .map(into_message); let report = path("report") .and(get_content::>()) .and_then(Endpoints::report) .map(ok); let update_agent = path("update_item") .and(get_content::()) .and_then(Endpoints::update_agent) .map(ok); let update_job = path("update_item") .and(get_content::()) .and_then(Endpoints::update_job) .map(ok); let update_assigned_job = path("update_item") .and(get_content::()) .and_then(Endpoints::update_assigned_job) .map(ok); let download = path("download") .and(warp::path::param::()) .and_then(Endpoints::download) .map(ok); let ping = path("ping").map(reply); let auth_token = format!("Bearer {auth_token}",).into_boxed_str(); let auth_header = warp::header::exact("authorization", Box::leak(auth_token)); let auth_zone = (get_agents .or(get_jobs) .or(upload_jobs) .or(del) .or(set_jobs) .or(get_agent_jobs) .or(update_agent) .or(update_job) .or(update_assigned_job) .or(download) .or(ping)) .and(auth_header); let agent_zone = get_jobs.or(get_personal_jobs).or(report).or(download); auth_zone.or(agent_zone) } pub fn preload_jobs() -> Result<(), ServerError> { let job_alias = "agent_hello"; let if_job_exists = UDB::lock_db().find_job_by_alias(job_alias)?; if if_job_exists.is_none() { let agent_hello = JobMeta::builder() .with_type(JobType::Init) .with_alias(job_alias) .build() .unwrap(); UDB::lock_db().insert_jobs(&[agent_hello])? } Ok(()) } pub async fn serve() -> Result<(), ServerError> { init_logger(Some("u_server")); preload_jobs()?; let certs_dir = PathBuf::from("certs"); let env = load_env::().map_err(|e| ServerError::Other(e.to_string()))?; let routes = init_endpoints(&env.admin_auth_token) .recover(handle_rejection) .with(custom(logger)); warp::serve(routes) .tls() .cert_path(certs_dir.join("server.crt")) .key_path(certs_dir.join("server.key")) .client_auth_required_path(certs_dir.join("ca.crt")) .run(([0, 0, 0, 0], MASTER_PORT)) .await; Ok(()) } async fn handle_rejection(rej: Rejection) -> Result { let resp = if let Some(err) = rej.find::() { error!("{:x?}", err); RejResponse::bad_request(err.to_string()) } else if rej.is_not_found() { RejResponse::not_found("not found placeholder") } else { error!("{:?}", rej); RejResponse::internal() }; Ok(resp.into_response()) } fn logger(info: Info<'_>) { info!(target: "warp", "{raddr} {agent_uid} \"{path}\" {status}", raddr = info.remote_addr().unwrap_or(([0, 0, 0, 0], 0).into()), path = info.path(), agent_uid = info.user_agent() .map(|uid: &str| uid.splitn(3, '-') .take(2) .collect::() ) .unwrap_or_else(|| "NO_AGENT".to_string()), status = info.status() ); } fn ok(_: T) -> impl Reply { reply() } /* #[cfg(test)] mod tests { use super::*; use crate::handlers::Endpoints; use handlers::build_ok; use u_lib::messaging::{AsMsg, BaseMessage, Reportable}; use uuid::Uuid; use warp::test; #[rstest] #[case(Some(Uuid::new_v4()))] #[should_panic] #[case(None)] #[tokio::test] async fn test_get_agent_jobs_unauthorized(#[case] uid: Option) { let mock = Endpoints::faux(); when!(mock.get_agent_jobs).then_return(Ok(build_ok(""))); //mock.expect().with(eq(uid)).returning(|_| Ok(build_ok(""))); test::request() .path(&format!( "/get_agent_jobs/{}", uid.map(|u| u.simple().to_string()).unwrap_or(String::new()) )) .method("GET") .filter(&init_filters("")) .await .unwrap(); } #[tokio::test] async fn test_report_unauth_successful() { let mock = Endpoints::report(); mock.expect() .withf(|msg: &BaseMessage<'_, Vec>| msg.inner_ref()[0] == Reportable::Dummy) .returning(|_| Ok(build_ok(""))); test::request() .path("/report/") .method("POST") .json(&vec![Reportable::Dummy].as_message()) .filter(&init_filters("")) .await .unwrap(); mock.checkpoint(); } } */