partially implemented some structs, but not the behaviour

4-update-check
plazmoid 3 years ago
parent 37ae67bd1c
commit 1124495540
  1. 1
      bin/u_server/Cargo.toml
  2. 22
      bin/u_server/src/filters.rs
  3. 30
      bin/u_server/src/handlers.rs
  4. 65
      bin/u_server/src/storages/files.rs
  5. 78
      bin/u_server/src/storages/mapper.rs
  6. 5
      bin/u_server/src/storages/mod.rs
  7. 23
      bin/u_server/src/u_server.rs
  8. 3
      bin/u_server/src/worker.rs

@ -15,6 +15,7 @@ hyper = "0.14"
mockall = "0.9.1" mockall = "0.9.1"
mockall_double = "0.2" mockall_double = "0.2"
openssl = "*" openssl = "*"
sha2 = "0.9.5"
[dependencies.diesel] [dependencies.diesel]
features = ["postgres", "uuid"] features = ["postgres", "uuid"]

@ -1,5 +1,6 @@
use crate::handlers::Endpoints; use crate::handlers::Endpoints;
use serde::de::DeserializeOwned; use serde::de::DeserializeOwned;
use std::convert::Infallible;
use std::env; use std::env;
use u_lib::{ use u_lib::{
messaging::{AsMsg, BaseMessage}, messaging::{AsMsg, BaseMessage},
@ -16,7 +17,7 @@ where
} }
pub fn make_filters() -> impl Filter<Extract = (impl Reply,), Error = Rejection> + Clone { pub fn make_filters() -> impl Filter<Extract = (impl Reply,), Error = Rejection> + Clone {
let infallible_none = |_| async { Ok::<(Option<Uuid>,), std::convert::Infallible>((None,)) }; let infallible_none = |_| async { Ok::<(Option<Uuid>,), Infallible>((None,)) };
let get_agents = warp::get() let get_agents = warp::get()
.and(warp::path("get_agents")) .and(warp::path("get_agents"))
@ -68,7 +69,18 @@ pub fn make_filters() -> impl Filter<Extract = (impl Reply,), Error = Rejection>
let report = warp::post() let report = warp::post()
.and(warp::path("report")) .and(warp::path("report"))
.and(get_content::<Vec<ExecResult>>().and_then(Endpoints::report)); .and(get_content::<Vec<ExecResult>>())
.and_then(Endpoints::report);
let download = warp::get()
.and(warp::path("dl"))
.and(warp::path::param::<Uuid>())
.and_then(Endpoints::download);
let download_request = warp::post()
.and(warp::path("dlr"))
.and(get_content::<String>())
.and_then(Endpoints::download_request);
let auth_token = format!("Bearer {}", env::var("ADMIN_AUTH_TOKEN").unwrap()).into_boxed_str(); let auth_token = format!("Bearer {}", env::var("ADMIN_AUTH_TOKEN").unwrap()).into_boxed_str();
let auth_header = warp::header::exact("authorization", Box::leak(auth_token)); let auth_header = warp::header::exact("authorization", Box::leak(auth_token));
@ -81,7 +93,11 @@ pub fn make_filters() -> impl Filter<Extract = (impl Reply,), Error = Rejection>
.or(get_agent_jobs)) .or(get_agent_jobs))
.and(auth_header); .and(auth_header);
let agent_zone = get_jobs.clone().or(get_personal_jobs).or(report); let agent_zone = get_jobs
.or(get_personal_jobs)
.or(report)
.or(download)
.or(download_request);
auth_zone.or(agent_zone) auth_zone.or(agent_zone)
} }

@ -1,9 +1,12 @@
use crate::db::UDB; use crate::{
db::UDB,
storages::{FileIndex, Mapper},
};
use diesel::SaveChangesDsl; use diesel::SaveChangesDsl;
use hyper::Body; use hyper::Body;
use serde::Serialize; use serde::Serialize;
use u_lib::{ use u_lib::{
messaging::{AsMsg, BaseMessage}, messaging::{AsMsg, BaseMessage, DownloadInfo},
models::{Agent, AgentState, AssignedJob, ExecResult, JobMeta, JobState}, models::{Agent, AgentState, AssignedJob, ExecResult, JobMeta, JobState},
ULocalError, ULocalError,
}; };
@ -25,6 +28,10 @@ pub fn build_err<S: ToString>(body: S) -> Response<Body> {
build_response(StatusCode::BAD_REQUEST, body.to_string()) build_response(StatusCode::BAD_REQUEST, body.to_string())
} }
pub fn build_not_found() -> Response<Body> {
build_response(StatusCode::NOT_FOUND, "")
}
pub fn build_message<M: AsMsg + Serialize>(m: M) -> Response<Body> { pub fn build_message<M: AsMsg + Serialize>(m: M) -> Response<Body> {
warp::reply::json(&m.as_message()).into_response() warp::reply::json(&m.as_message()).into_response()
} }
@ -168,4 +175,23 @@ impl Endpoints {
} }
Ok(build_ok("")) Ok(build_ok(""))
} }
pub async fn download_request(
msg: BaseMessage<'static, String>,
) -> Result<Response<Body>, Rejection> {
let agent_id = msg.id;
FileIndex::
}
pub async fn download(uid: Uuid) -> Result<Response<Body>, Rejection> {
let data = match Mapper::get(uid) {
Some(mut r) => {
let mut buf = vec![];
r.read_to_end(&mut buf).unwrap();
build_ok(buf)
}
None => build_not_found(),
};
Ok(data)
}
} }

@ -0,0 +1,65 @@
use crate::worker::Worker;
use once_cell::sync::Lazy;
use sha2::{Digest, Sha512};
use std::fs;
use std::path::PathBuf;
use u_lib::utils::{Hexlify, SharedStorage};
pub struct FileMeta {
hashsum: String,
path: PathBuf,
name: String,
}
static FILES: Lazy<SharedStorage<String, FileMeta>> = SharedStorage::new();
pub struct FileIndex;
impl FileIndex {
pub fn search_by_name(pat: impl Into<String>) -> Option<String> {
let name = pat.into();
let (key, _) = FILES.lock().iter().find(|(_, v)| v.name == name)?;
Some(*key)
}
pub fn get(key: impl Into<String>) -> Option<FileMeta> {
let name = key.into();
FILES.get(&name).and_then(|r| Some(*r))
}
}
impl Worker for FileIndex {
fn process(&mut self) {
let hexlify = |data: &[u8]| format!("{:x}", Hexlify(data));
let mut hasher = Sha512::new();
for file in fs::read_dir("./files/hosted/").unwrap() {
let file = match file {
Ok(f) => f.path(),
Err(e) => {
warn!("Error reading file: {}", e);
continue;
}
};
if file.is_file() {
let file_data = fs::read(file).unwrap();
hasher.update(file_data);
let hashsum = hasher.finalize_reset();
let shrinked_sum = hashsum
.iter()
.cloned()
.take(32)
.map(|b| b ^ 0x5a)
.collect::<Vec<u8>>();
let shrinked_sum = hexlify(&shrinked_sum);
let hashsum = hexlify(&hashsum);
let name = file.file_name().unwrap().to_string_lossy().into_owned();
let meta = FileMeta {
hashsum,
name,
path: file,
};
FILES.lock().insert(shrinked_sum, meta);
}
}
}
}

@ -0,0 +1,78 @@
use once_cell::sync::Lazy;
use std::collections::HashSet;
use std::io::Read;
use std::iter::Iterator;
use std::time::{Duration, SystemTime};
use u_lib::utils::SharedStorage;
use uuid::Uuid;
type Data = Box<dyn Read + Send + Sync>;
/// This struct holds uids of various readable objects (files, streams, etc...)
/// with timeout and permissions for any uids to read data
struct MappedData {
//ids that allowed to read the data
for_ids: HashSet<Uuid>,
created: SystemTime,
timeout: Duration,
data: Data,
}
impl MappedData {
pub fn new(
ids: impl Iterator<Item = Uuid>,
data: impl Read + Send + Sync + 'static,
timeout: Duration,
) -> Self {
MappedData {
for_ids: ids.collect(),
created: SystemTime::now(),
timeout,
data: Box::new(data),
}
}
}
static MAPPER: Lazy<SharedStorage<Uuid, MappedData>> = SharedStorage::new();
pub struct Mapper;
impl Mapper {
fn remove_overdue() {
MAPPER
.lock()
.retain(|_, v| v.created.elapsed().unwrap() < v.timeout);
}
pub fn get(item_uid: Uuid, getter_uid: Uuid) -> Option<Data> {
Self::remove_overdue();
let allowed_ids = MAPPER.lock().get(&item_uid)?.for_ids;
if allowed_ids.contains(&getter_uid) {
MAPPER.lock().remove(&item_uid).map(|d| d.data)
} else {
None
}
}
pub fn set(
for_ids: impl Iterator<Item = Uuid>,
data: impl Read + Send + Sync + 'static,
timeout: Option<Duration>,
) -> Uuid {
Self::remove_overdue();
let uid = Uuid::new_v4();
let timeout = timeout.unwrap_or(Duration::from_secs(60));
let mapped = MappedData::new(for_ids, data, timeout);
MAPPER.lock().insert(uid, mapped);
uid
}
}
// init:
// fill index struct with files allowed to download (probably auto-refresh index on file adding)
// hashmap<stripped_hash_of_file_name, PathBuf>
// download:
// 1) agent sends download_request with hash of wanted file
// 2) handler checks if this agent is allowed to dl file
// 3) if all ok, create in entry in mapper and return its uid and hashsum to agent
// 4) agent downloads file by uid, uid destructed after that

@ -0,0 +1,5 @@
pub mod files;
pub mod mapper;
pub use files::FileIndex;
pub use mapper::Mapper;

@ -14,13 +14,25 @@ extern crate diesel;
mod db; mod db;
mod filters; mod filters;
mod handlers; mod handlers;
mod storages;
mod worker;
use db::UDB; use db::UDB;
use filters::make_filters; use filters::make_filters;
use std::thread;
use std::time::Duration;
use storages::FileIndex;
use u_lib::{config::MASTER_PORT, models::*, utils::init_env}; use u_lib::{config::MASTER_PORT, models::*, utils::init_env};
use warp::Filter; use warp::Filter;
use worker::Worker;
const LOGFILE: &str = "u_server.log"; fn init_workers() {
let workers: Vec<Box<dyn Worker + Send>> = vec![Box::new(FileIndex)];
thread::spawn(move || loop {
workers.into_iter().for_each(|w| w.process());
thread::sleep(Duration::from_secs(5));
});
}
fn prefill_jobs() { fn prefill_jobs() {
let agent_hello = JobMeta::builder() let agent_hello = JobMeta::builder()
@ -28,12 +40,19 @@ fn prefill_jobs() {
.with_alias("agent_hello") .with_alias("agent_hello")
.build() .build()
.unwrap(); .unwrap();
UDB::lock_db().insert_jobs(&[agent_hello]).ok(); let update_agent = JobMeta::builder()
.with_type(misc::JobType::Update)
.with_alias("update_agent")
.build()
.unwrap();
let all = [agent_hello, update_agent];
UDB::lock_db().insert_jobs(&all).ok();
} }
fn init_logger() { fn init_logger() {
use simplelog::*; use simplelog::*;
use std::fs::OpenOptions; use std::fs::OpenOptions;
const LOGFILE: &str = "u_server.log";
let log_cfg = ConfigBuilder::new() let log_cfg = ConfigBuilder::new()
.set_time_format_str("%x %X") .set_time_format_str("%x %X")
.set_time_to_local(true) .set_time_to_local(true)

@ -0,0 +1,3 @@
pub trait Worker {
fn process(&mut self);
}
Loading…
Cancel
Save