diff --git a/bin/u_server/Cargo.toml b/bin/u_server/Cargo.toml index 043022a..c9b1dde 100644 --- a/bin/u_server/Cargo.toml +++ b/bin/u_server/Cargo.toml @@ -15,6 +15,7 @@ hyper = "0.14" mockall = "0.9.1" mockall_double = "0.2" openssl = "*" +sha2 = "0.9.5" [dependencies.diesel] features = ["postgres", "uuid"] @@ -41,4 +42,4 @@ path = "src/u_server.rs" [[bin]] name = "u_server" -path = "src/main.rs" \ No newline at end of file +path = "src/main.rs" diff --git a/bin/u_server/src/filters.rs b/bin/u_server/src/filters.rs index 59527a0..3fe321b 100644 --- a/bin/u_server/src/filters.rs +++ b/bin/u_server/src/filters.rs @@ -1,5 +1,6 @@ use crate::handlers::Endpoints; use serde::de::DeserializeOwned; +use std::convert::Infallible; use std::env; use u_lib::{ messaging::{AsMsg, BaseMessage}, @@ -16,7 +17,7 @@ where } pub fn make_filters() -> impl Filter + Clone { - let infallible_none = |_| async { Ok::<(Option,), std::convert::Infallible>((None,)) }; + let infallible_none = |_| async { Ok::<(Option,), Infallible>((None,)) }; let get_agents = warp::get() .and(warp::path("get_agents")) @@ -68,7 +69,18 @@ pub fn make_filters() -> impl Filter let report = warp::post() .and(warp::path("report")) - .and(get_content::>().and_then(Endpoints::report)); + .and(get_content::>()) + .and_then(Endpoints::report); + + let download = warp::get() + .and(warp::path("dl")) + .and(warp::path::param::()) + .and_then(Endpoints::download); + + let download_request = warp::post() + .and(warp::path("dlr")) + .and(get_content::()) + .and_then(Endpoints::download_request); let auth_token = format!("Bearer {}", env::var("ADMIN_AUTH_TOKEN").unwrap()).into_boxed_str(); let auth_header = warp::header::exact("authorization", Box::leak(auth_token)); @@ -81,7 +93,11 @@ pub fn make_filters() -> impl Filter .or(get_agent_jobs)) .and(auth_header); - let agent_zone = get_jobs.clone().or(get_personal_jobs).or(report); + let agent_zone = get_jobs + .or(get_personal_jobs) + .or(report) + .or(download) + .or(download_request); auth_zone.or(agent_zone) } diff --git a/bin/u_server/src/handlers.rs b/bin/u_server/src/handlers.rs index db8ff4c..d4f5b0e 100644 --- a/bin/u_server/src/handlers.rs +++ b/bin/u_server/src/handlers.rs @@ -1,9 +1,12 @@ -use crate::db::UDB; +use crate::{ + db::UDB, + storages::{FileIndex, Mapper}, +}; use diesel::SaveChangesDsl; use hyper::Body; use serde::Serialize; use u_lib::{ - messaging::{AsMsg, BaseMessage}, + messaging::{AsMsg, BaseMessage, DownloadInfo}, models::{Agent, AgentState, AssignedJob, ExecResult, JobMeta, JobState}, ULocalError, }; @@ -25,6 +28,10 @@ pub fn build_err(body: S) -> Response { build_response(StatusCode::BAD_REQUEST, body.to_string()) } +pub fn build_not_found() -> Response { + build_response(StatusCode::NOT_FOUND, "") +} + pub fn build_message(m: M) -> Response { warp::reply::json(&m.as_message()).into_response() } @@ -168,4 +175,23 @@ impl Endpoints { } Ok(build_ok("")) } + + pub async fn download_request( + msg: BaseMessage<'static, String>, + ) -> Result, Rejection> { + let agent_id = msg.id; + FileIndex:: + } + + pub async fn download(uid: Uuid) -> Result, Rejection> { + let data = match Mapper::get(uid) { + Some(mut r) => { + let mut buf = vec![]; + r.read_to_end(&mut buf).unwrap(); + build_ok(buf) + } + None => build_not_found(), + }; + Ok(data) + } } diff --git a/bin/u_server/src/storages/files.rs b/bin/u_server/src/storages/files.rs new file mode 100644 index 0000000..0fe441d --- /dev/null +++ b/bin/u_server/src/storages/files.rs @@ -0,0 +1,65 @@ +use crate::worker::Worker; +use once_cell::sync::Lazy; +use sha2::{Digest, Sha512}; +use std::fs; +use std::path::PathBuf; +use u_lib::utils::{Hexlify, SharedStorage}; + +pub struct FileMeta { + hashsum: String, + path: PathBuf, + name: String, +} + +static FILES: Lazy> = SharedStorage::new(); + +pub struct FileIndex; + +impl FileIndex { + pub fn search_by_name(pat: impl Into) -> Option { + let name = pat.into(); + let (key, _) = FILES.lock().iter().find(|(_, v)| v.name == name)?; + Some(*key) + } + + pub fn get(key: impl Into) -> Option { + let name = key.into(); + FILES.get(&name).and_then(|r| Some(*r)) + } +} + +impl Worker for FileIndex { + fn process(&mut self) { + let hexlify = |data: &[u8]| format!("{:x}", Hexlify(data)); + let mut hasher = Sha512::new(); + for file in fs::read_dir("./files/hosted/").unwrap() { + let file = match file { + Ok(f) => f.path(), + Err(e) => { + warn!("Error reading file: {}", e); + continue; + } + }; + if file.is_file() { + let file_data = fs::read(file).unwrap(); + hasher.update(file_data); + let hashsum = hasher.finalize_reset(); + let shrinked_sum = hashsum + .iter() + .cloned() + .take(32) + .map(|b| b ^ 0x5a) + .collect::>(); + let shrinked_sum = hexlify(&shrinked_sum); + let hashsum = hexlify(&hashsum); + let name = file.file_name().unwrap().to_string_lossy().into_owned(); + let meta = FileMeta { + hashsum, + name, + path: file, + }; + FILES.lock().insert(shrinked_sum, meta); + } + } + } +} diff --git a/bin/u_server/src/storages/mapper.rs b/bin/u_server/src/storages/mapper.rs new file mode 100644 index 0000000..985c6f2 --- /dev/null +++ b/bin/u_server/src/storages/mapper.rs @@ -0,0 +1,78 @@ +use once_cell::sync::Lazy; +use std::collections::HashSet; +use std::io::Read; +use std::iter::Iterator; +use std::time::{Duration, SystemTime}; +use u_lib::utils::SharedStorage; +use uuid::Uuid; + +type Data = Box; + +/// This struct holds uids of various readable objects (files, streams, etc...) +/// with timeout and permissions for any uids to read data +struct MappedData { + //ids that allowed to read the data + for_ids: HashSet, + created: SystemTime, + timeout: Duration, + data: Data, +} + +impl MappedData { + pub fn new( + ids: impl Iterator, + data: impl Read + Send + Sync + 'static, + timeout: Duration, + ) -> Self { + MappedData { + for_ids: ids.collect(), + created: SystemTime::now(), + timeout, + data: Box::new(data), + } + } +} + +static MAPPER: Lazy> = SharedStorage::new(); + +pub struct Mapper; + +impl Mapper { + fn remove_overdue() { + MAPPER + .lock() + .retain(|_, v| v.created.elapsed().unwrap() < v.timeout); + } + + pub fn get(item_uid: Uuid, getter_uid: Uuid) -> Option { + Self::remove_overdue(); + let allowed_ids = MAPPER.lock().get(&item_uid)?.for_ids; + if allowed_ids.contains(&getter_uid) { + MAPPER.lock().remove(&item_uid).map(|d| d.data) + } else { + None + } + } + + pub fn set( + for_ids: impl Iterator, + data: impl Read + Send + Sync + 'static, + timeout: Option, + ) -> Uuid { + Self::remove_overdue(); + let uid = Uuid::new_v4(); + let timeout = timeout.unwrap_or(Duration::from_secs(60)); + let mapped = MappedData::new(for_ids, data, timeout); + MAPPER.lock().insert(uid, mapped); + uid + } +} + +// init: +// fill index struct with files allowed to download (probably auto-refresh index on file adding) +// hashmap +// download: +// 1) agent sends download_request with hash of wanted file +// 2) handler checks if this agent is allowed to dl file +// 3) if all ok, create in entry in mapper and return its uid and hashsum to agent +// 4) agent downloads file by uid, uid destructed after that diff --git a/bin/u_server/src/storages/mod.rs b/bin/u_server/src/storages/mod.rs new file mode 100644 index 0000000..cf23190 --- /dev/null +++ b/bin/u_server/src/storages/mod.rs @@ -0,0 +1,5 @@ +pub mod files; +pub mod mapper; + +pub use files::FileIndex; +pub use mapper::Mapper; diff --git a/bin/u_server/src/u_server.rs b/bin/u_server/src/u_server.rs index cbba6b1..98ceb6a 100644 --- a/bin/u_server/src/u_server.rs +++ b/bin/u_server/src/u_server.rs @@ -14,13 +14,25 @@ extern crate diesel; mod db; mod filters; mod handlers; +mod storages; +mod worker; use db::UDB; use filters::make_filters; +use std::thread; +use std::time::Duration; +use storages::FileIndex; use u_lib::{config::MASTER_PORT, models::*, utils::init_env}; use warp::Filter; +use worker::Worker; -const LOGFILE: &str = "u_server.log"; +fn init_workers() { + let workers: Vec> = vec![Box::new(FileIndex)]; + thread::spawn(move || loop { + workers.into_iter().for_each(|w| w.process()); + thread::sleep(Duration::from_secs(5)); + }); +} fn prefill_jobs() { let agent_hello = JobMeta::builder() @@ -28,12 +40,19 @@ fn prefill_jobs() { .with_alias("agent_hello") .build() .unwrap(); - UDB::lock_db().insert_jobs(&[agent_hello]).ok(); + let update_agent = JobMeta::builder() + .with_type(misc::JobType::Update) + .with_alias("update_agent") + .build() + .unwrap(); + let all = [agent_hello, update_agent]; + UDB::lock_db().insert_jobs(&all).ok(); } fn init_logger() { use simplelog::*; use std::fs::OpenOptions; + const LOGFILE: &str = "u_server.log"; let log_cfg = ConfigBuilder::new() .set_time_format_str("%x %X") .set_time_to_local(true) diff --git a/bin/u_server/src/worker.rs b/bin/u_server/src/worker.rs new file mode 100644 index 0000000..4e5f201 --- /dev/null +++ b/bin/u_server/src/worker.rs @@ -0,0 +1,3 @@ +pub trait Worker { + fn process(&mut self); +}