You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

282 lines
7.7 KiB

#[macro_use]
extern crate tracing;
#[cfg(test)]
#[macro_use]
extern crate rstest;
mod db;
mod error;
mod handlers;
use db::PgRepo;
use error::{Error as ServerError, RejResponse};
use serde::de::DeserializeOwned;
use std::{convert::Infallible, sync::Arc};
use u_lib::{
config,
db::async_pool,
messaging::{AsMsg, BaseMessage, Reportable},
models::*,
};
use uuid::Uuid;
use warp::{
body,
log::{custom, Info},
reply::{json, reply, Json, Response},
Filter, Rejection, Reply,
};
use crate::handlers::Endpoints;
/// Returns a filter that deserializes the JSON request body into a
/// [`BaseMessage`] carrying a payload of type `M`.
///
/// The request is rejected by warp's JSON body filter when the body cannot be
/// decoded into `BaseMessage<M>`.
fn get_content<M>() -> impl Filter<Extract = (BaseMessage<'static, M>,), Error = Rejection> + Clone
where
    M: AsMsg + Sync + Send + DeserializeOwned + 'static,
{
    body::json::<BaseMessage<'static, M>>()
}
/// Wraps `msg` via [`AsMsg::as_message`] and serializes the result as a JSON reply.
fn into_message<M: AsMsg>(msg: M) -> Json {
    let wrapped = msg.as_message();
    json(&wrapped)
}
/// Builds the full warp filter tree for the server.
///
/// Routes are split into two zones:
///
/// * the *auth zone* — every route in it requires an
///   `Authorization: Bearer <auth_token>` header;
/// * the *agent zone* — `get_jobs`, `get_personal_jobs`, `report` and
///   `download`, which are reachable without that header.
///
/// All routes are `POST` and are matched on their first path segment.
///
/// # Parameters
/// * `auth_token` — the bearer token expected in the `Authorization` header.
/// * `db` — repository handle shared (through an `Arc`) by every handler.
pub fn init_endpoints(
    auth_token: &str,
    db: PgRepo,
) -> impl Filter<Extract = (impl Reply,), Error = Rejection> + Clone {
    // Every endpoint is a POST on a fixed first path segment.
    let path = |p: &'static str| warp::post().and(warp::path(p));
    // Fallback used when the optional trailing UUID segment is absent or unparsable.
    let infallible_none = |_| async { Ok::<_, Infallible>((None::<Uuid>,)) };

    // Hands each handler its own reference-counted handle to the repository.
    let with_db = {
        let adb = Arc::new(db);
        warp::any().map(move || adb.clone())
    };

    let get_agents = path("get_agents")
        .and(with_db.clone())
        .and(
            warp::path::param::<Uuid>()
                .map(Some)
                .or_else(infallible_none),
        )
        .and_then(Endpoints::get_agents)
        .map(into_message);

    let upload_jobs = path("upload_jobs")
        .and(with_db.clone())
        .and(get_content::<Vec<JobMeta>>())
        .and_then(Endpoints::upload_jobs)
        .map(into_message);

    let get_jobs = path("get_jobs")
        .and(with_db.clone())
        .and(
            warp::path::param::<Uuid>()
                .map(Some)
                .or_else(infallible_none),
        )
        .and_then(Endpoints::get_jobs)
        .map(into_message);

    let get_agent_jobs = path("get_agent_jobs")
        .and(with_db.clone())
        .and(
            warp::path::param::<Uuid>()
                .map(Some)
                .or_else(infallible_none),
        )
        .and_then(Endpoints::get_agent_jobs)
        .map(into_message);

    let get_personal_jobs = path("get_personal_jobs")
        .and(with_db.clone())
        .and(warp::path::param::<Uuid>())
        .and_then(Endpoints::get_personal_jobs)
        .map(into_message);

    let del = path("del")
        .and(with_db.clone())
        .and(warp::path::param::<Uuid>())
        .and_then(Endpoints::del)
        .map(ok);

    let set_jobs = path("set_jobs")
        .and(with_db.clone())
        .and(warp::path::param::<Uuid>())
        .and(get_content::<Vec<String>>())
        .and_then(Endpoints::set_jobs)
        .map(into_message);

    let report = path("report")
        .and(with_db.clone())
        .and(get_content::<Vec<Reportable>>())
        .and_then(Endpoints::report)
        .map(ok);

    let update_agent = path("update_agent")
        .and(with_db.clone())
        .and(get_content::<Agent>())
        .and_then(Endpoints::update_agent)
        .map(ok);

    let update_job = path("update_job")
        .and(with_db.clone())
        .and(get_content::<JobMeta>())
        .and_then(Endpoints::update_job)
        .map(ok);

    // NOTE: the URL segment is "update_result", not "update_assigned_job".
    let update_assigned_job = path("update_result")
        .and(with_db.clone())
        .and(get_content::<AssignedJob>())
        .and_then(Endpoints::update_assigned_job)
        .map(ok);

    let download = path("download")
        .and(warp::path::param::<String>())
        .and_then(Endpoints::download)
        .map(ok);

    let ping = path("ping").map(reply);

    // `header::exact` requires `&'static str`, so the expected header value is
    // leaked on purpose; it has to outlive the returned filter anyway.
    let auth_token = format!("Bearer {auth_token}").into_boxed_str();
    let auth_header = warp::header::exact("authorization", Box::leak(auth_token));

    // The header check MUST come first: `a.and(b)` evaluates `a` before `b`,
    // so placing the routes on the left would run their handlers (including
    // destructive ones such as `del`) before the token is verified.
    // NOTE(review): with this order an unauthenticated request to an unknown
    // path carries a header rejection in addition to not-found; confirm that
    // `handle_rejection` maps that case as desired.
    let auth_zone = auth_header.and(
        get_agents
            .or(get_jobs.clone())
            .or(upload_jobs)
            .or(del)
            .or(set_jobs)
            .or(get_agent_jobs)
            .or(update_agent.or(update_job).or(update_assigned_job))
            .or(download)
            .or(ping),
    );

    // Routes reachable without the bearer token.
    let agent_zone = get_jobs.or(get_personal_jobs).or(report).or(download);

    auth_zone.or(agent_zone)
}
/// Makes sure the built-in `agent_hello` job is present in the repository.
///
/// Looks the job up by alias and, when it is missing, inserts a new
/// [`JobMeta`] of type [`JobType::Init`] with that alias.
///
/// # Errors
/// Propagates any error returned by the lookup or the insert.
///
/// # Panics
/// Panics if the `JobMeta` builder fails to build the job.
pub async fn preload_jobs(repo: &PgRepo) -> Result<(), ServerError> {
    repo.interact(|mut db| {
        const HELLO_ALIAS: &str = "agent_hello";

        // Nothing to do when the job has already been registered.
        if db.find_job_by_alias(HELLO_ALIAS)?.is_some() {
            return Ok(());
        }

        let hello_job = JobMeta::builder()
            .with_type(JobType::Init)
            .with_alias(HELLO_ALIAS)
            .build()
            .unwrap();
        db.insert_jobs(&[hello_job])?;
        Ok(())
    })
    .await
}
pub async fn serve() -> Result<(), ServerError> {
let env = config::db::load().unwrap();
let pool = async_pool(&env);
let db = PgRepo::new(pool);
preload_jobs(&db).await?;
let env = config::admin::load().map_err(|e| ServerError::Other(e.to_string()))?;
let routes = init_endpoints(&env.admin_auth_token, db)
.recover(handle_rejection)
.with(custom(logger));
let server_cert = include_bytes!("../../../certs/server.crt");
let server_key = include_bytes!("../../../certs/server.key");
let ca = include_bytes!("../../../certs/ca.crt");
warp::serve(routes)
3 years ago
.tls()
.cert(server_cert)
.key(server_key)
.client_auth_required(ca)
.run(([0, 0, 0, 0], config::MASTER_PORT))
.await;
Ok(())
}
/// Converts a warp [`Rejection`] into an HTTP response.
///
/// * A rejection carrying a [`ServerError`] is logged and answered with a
///   bad-request response containing the error text.
/// * A pure not-found rejection is answered with a not-found response.
/// * Anything else is logged and answered with an internal-error response.
///
/// This handler never fails, so the rejection chain always ends here.
async fn handle_rejection(rej: Rejection) -> Result<Response, Infallible> {
    let resp = match rej.find::<ServerError>() {
        Some(err) => {
            error!("{:x?}", err);
            RejResponse::bad_request(err.to_string())
        }
        None if rej.is_not_found() => RejResponse::not_found("not found placeholder"),
        None => {
            error!("{:?}", rej);
            RejResponse::internal()
        }
    };
    Ok(resp.into_response())
}
/// Access-log callback for `warp::log::custom`.
///
/// Emits one `info` event on the `warp` target with the remote address, a
/// shortened agent id, the request path and the response status.
fn logger(info: Info<'_>) {
    // Fall back to 0.0.0.0:0 when the peer address is unknown.
    let remote = info
        .remote_addr()
        .unwrap_or_else(|| ([0, 0, 0, 0], 0).into());

    // The User-Agent header is split on '-' and only its first two segments
    // are kept, concatenated without a separator.
    let agent = match info.user_agent() {
        Some(uid) => uid.splitn(3, '-').take(2).collect::<String>(),
        None => "NO_AGENT".to_string(),
    };

    info!(target: "warp",
        "{raddr} {agent_uid} \"{path}\" {status}",
        raddr = remote,
        path = info.path(),
        agent_uid = agent,
        status = info.status()
    );
}
/// Discards a handler's result and answers with warp's empty reply.
///
/// Used as the final `.map(ok)` step of routes whose handler output is not
/// sent back to the client.
fn ok<T>(_discarded: T) -> impl Reply {
    reply()
}
/*
#[cfg(test)]
mod tests {
use super::*;
use crate::handlers::Endpoints;
use handlers::build_ok;
use u_lib::messaging::{AsMsg, BaseMessage, Reportable};
use uuid::Uuid;
use warp::test;
#[rstest]
#[case(Some(Uuid::new_v4()))]
#[should_panic]
#[case(None)]
#[tokio::test]
async fn test_get_agent_jobs_unauthorized(#[case] uid: Option<Uuid>) {
let mock = Endpoints::faux();
when!(mock.get_agent_jobs).then_return(Ok(build_ok("")));
//mock.expect().with(eq(uid)).returning(|_| Ok(build_ok("")));
test::request()
.path(&format!(
"/get_agent_jobs/{}",
uid.map(|u| u.simple().to_string()).unwrap_or(String::new())
))
.method("GET")
.filter(&init_filters(""))
.await
.unwrap();
}
3 years ago
#[tokio::test]
async fn test_report_unauth_successful() {
let mock = Endpoints::report();
3 years ago
mock.expect()
.withf(|msg: &BaseMessage<'_, Vec<Reportable>>| msg.inner_ref()[0] == Reportable::Dummy)
3 years ago
.returning(|_| Ok(build_ok("")));
test::request()
3 years ago
.path("/report/")
.method("POST")
.json(&vec![Reportable::Dummy].as_message())
.filter(&init_filters(""))
3 years ago
.await
.unwrap();
mock.checkpoint();
}
}
*/