improve ufs index, fix unit and most integration tests, remove futures

pull/9/head
plazmoid 2 years ago
parent d0d7d0aca5
commit 1b0cdae404
  1. 127
      Cargo.lock
  2. 10
      Makefile.toml
  3. 2
      bin/u_agent/Cargo.toml
  4. 12
      bin/u_panel/src/argparse.rs
  5. 2
      bin/u_server/src/db.rs
  6. 12
      bin/u_server/src/handlers.rs
  7. 2
      bin/u_server/src/u_server.rs
  8. 1
      integration/Cargo.toml
  9. 2
      integration/integration_tests.sh
  10. 10
      integration/tests/fixtures/agent.rs
  11. 12
      integration/tests/fixtures/mod.rs
  12. 4
      integration/tests/helpers/panel.rs
  13. 1
      integration/tests/integration/api.rs
  14. 22
      integration/tests/integration/behaviour.rs
  15. 4
      lib/u_lib/Cargo.toml
  16. 17
      lib/u_lib/src/api.rs
  17. 33
      lib/u_lib/src/error/mod.rs
  18. 7
      lib/u_lib/src/executor.rs
  19. 8
      lib/u_lib/src/jobs.rs
  20. 55
      lib/u_lib/src/models/agent.rs
  21. 146
      lib/u_lib/src/models/jobs/meta.rs
  22. 2
      lib/u_lib/src/models/schema.rs
  23. 26
      lib/u_lib/src/platform.rs
  24. 15
      lib/u_lib/src/ufs/error.rs
  25. 101
      lib/u_lib/src/ufs/index.rs
  26. 203
      lib/u_lib/src/ufs/mod.rs
  27. 2
      migrations/2020-10-24-111622_create_all/up.sql

127
Cargo.lock generated

@ -289,7 +289,7 @@ checksum = "b9ccdd8f2a161be9bd5c023df56f1b2a0bd1d83872ae53b71a84a12c9bf6e842"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn 2.0.13", "syn 2.0.15",
] ]
[[package]] [[package]]
@ -321,6 +321,15 @@ version = "0.21.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a4a4ddaa51a5bc52a6948f74c06d20aaaddb71924eab79b8c97a8c556e942d6a" checksum = "a4a4ddaa51a5bc52a6948f74c06d20aaaddb71924eab79b8c97a8c556e942d6a"
[[package]]
name = "bincode"
version = "1.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b1f45e9417d87227c7a56d22e471c6206462cba514c7590c09aff4cf6d1ddcad"
dependencies = [
"serde",
]
[[package]] [[package]]
name = "bitflags" name = "bitflags"
version = "1.3.2" version = "1.3.2"
@ -504,9 +513,9 @@ dependencies = [
[[package]] [[package]]
name = "crossbeam-channel" name = "crossbeam-channel"
version = "0.5.7" version = "0.5.8"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cf2b3e8478797446514c91ef04bafcb59faba183e621ad488df88983cc14128c" checksum = "a33c2bf77f2df06183c3aa30d1e96c0695a313d4f9c453cc3762a6db39f99200"
dependencies = [ dependencies = [
"cfg-if 1.0.0", "cfg-if 1.0.0",
"crossbeam-utils", "crossbeam-utils",
@ -579,7 +588,7 @@ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
"scratch", "scratch",
"syn 2.0.13", "syn 2.0.15",
] ]
[[package]] [[package]]
@ -596,7 +605,7 @@ checksum = "2345488264226bf682893e25de0769f3360aac9957980ec49361b083ddaa5bc5"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn 2.0.13", "syn 2.0.15",
] ]
[[package]] [[package]]
@ -773,13 +782,13 @@ dependencies = [
[[package]] [[package]]
name = "errno" name = "errno"
version = "0.3.0" version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "50d6a0976c999d473fe89ad888d5a284e55366d9dc9038b1ba2aa15128c4afa0" checksum = "4bcfec3a70f97c962c307b2d2c56e358cf1d00b558d74262b5f929ee8cc7e73a"
dependencies = [ dependencies = [
"errno-dragonfly", "errno-dragonfly",
"libc", "libc",
"windows-sys 0.45.0", "windows-sys 0.48.0",
] ]
[[package]] [[package]]
@ -897,7 +906,7 @@ checksum = "89ca545a94061b6365f2c7355b4b32bd20df3ff95f02da9329b34ccc3bd6ee72"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn 2.0.13", "syn 2.0.15",
] ]
[[package]] [[package]]
@ -948,9 +957,9 @@ dependencies = [
[[package]] [[package]]
name = "getrandom" name = "getrandom"
version = "0.2.8" version = "0.2.9"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c05aeb6a22b8f62540c194aac980f2115af067bfe15a0734d7277a768d396b31" checksum = "c85e1d9ab2eadba7e5040d4e09cbd6d072b76a557ad64e797c2cb9d4da21d7e4"
dependencies = [ dependencies = [
"cfg-if 1.0.0", "cfg-if 1.0.0",
"libc", "libc",
@ -971,9 +980,9 @@ dependencies = [
[[package]] [[package]]
name = "h2" name = "h2"
version = "0.3.16" version = "0.3.17"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5be7b54589b581f624f566bf5d8eb2bab1db736c51528720b6bd36b96b55924d" checksum = "66b91535aa35fea1523ad1b86cb6b53c28e0ae566ba4a460f4457e936cad7c6f"
dependencies = [ dependencies = [
"bytes", "bytes",
"fnv", "fnv",
@ -1094,9 +1103,9 @@ checksum = "c4a1e36c821dbe04574f602848a19f742f4fb3c98d40449f11bcad18d6b17421"
[[package]] [[package]]
name = "hyper" name = "hyper"
version = "0.14.25" version = "0.14.26"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cc5e554ff619822309ffd57d8734d77cd5ce6238bc956f037ea06c58238c9899" checksum = "ab302d72a6f11a3b910431ff93aae7e773078c769f0a3ef15fb9ec692ed147d4"
dependencies = [ dependencies = [
"bytes", "bytes",
"futures-channel", "futures-channel",
@ -1220,7 +1229,6 @@ dependencies = [
name = "integration" name = "integration"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"futures",
"once_cell", "once_cell",
"reqwest", "reqwest",
"rstest 0.17.0", "rstest 0.17.0",
@ -1235,13 +1243,13 @@ dependencies = [
[[package]] [[package]]
name = "io-lifetimes" name = "io-lifetimes"
version = "1.0.9" version = "1.0.10"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "09270fd4fa1111bc614ed2246c7ef56239a3063d5be0d1ec3b589c505d400aeb" checksum = "9c66c74d2ae7e79a5a8f7ac924adbe38ee42a859c6539ad869eb51f0b52dc220"
dependencies = [ dependencies = [
"hermit-abi 0.3.1", "hermit-abi 0.3.1",
"libc", "libc",
"windows-sys 0.45.0", "windows-sys 0.48.0",
] ]
[[package]] [[package]]
@ -1274,6 +1282,15 @@ dependencies = [
"wasm-bindgen", "wasm-bindgen",
] ]
[[package]]
name = "keccak"
version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3afef3b6eff9ce9d8ff9b3601125eec7f0c8cbac7abd14f355d053fa56c98768"
dependencies = [
"cpufeatures",
]
[[package]] [[package]]
name = "language-tags" name = "language-tags"
version = "0.3.2" version = "0.3.2"
@ -1558,9 +1575,9 @@ checksum = "b7e5500299e16ebb147ae15a00a942af264cf3688f47923b8fc2cd5858f23ad3"
[[package]] [[package]]
name = "openssl" name = "openssl"
version = "0.10.49" version = "0.10.50"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4d2f106ab837a24e03672c59b1239669a0596406ff657c3c0835b6b7f0f35a33" checksum = "7e30d8bc91859781f0a943411186324d580f2bbeb71b452fe91ae344806af3f1"
dependencies = [ dependencies = [
"bitflags", "bitflags",
"cfg-if 1.0.0", "cfg-if 1.0.0",
@ -1579,7 +1596,7 @@ checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn 2.0.13", "syn 2.0.15",
] ]
[[package]] [[package]]
@ -1590,9 +1607,9 @@ checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf"
[[package]] [[package]]
name = "openssl-sys" name = "openssl-sys"
version = "0.9.84" version = "0.9.85"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3a20eace9dc2d82904039cb76dcf50fb1a0bba071cfd1629720b5d6f1ddba0fa" checksum = "0d3d193fb1488ad46ffe3aaabc912cc931d02ee8518fe2959aea8ef52718b0c0"
dependencies = [ dependencies = [
"cc", "cc",
"libc", "libc",
@ -1993,16 +2010,16 @@ dependencies = [
[[package]] [[package]]
name = "rustix" name = "rustix"
version = "0.37.7" version = "0.37.11"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2aae838e49b3d63e9274e1c01833cc8139d3fec468c3b84688c628f44b1ae11d" checksum = "85597d61f83914ddeba6a47b3b8ffe7365107221c2e557ed94426489fefb5f77"
dependencies = [ dependencies = [
"bitflags", "bitflags",
"errno 0.3.0", "errno 0.3.1",
"io-lifetimes", "io-lifetimes",
"libc", "libc",
"linux-raw-sys", "linux-raw-sys",
"windows-sys 0.45.0", "windows-sys 0.48.0",
] ]
[[package]] [[package]]
@ -2109,29 +2126,29 @@ checksum = "bebd363326d05ec3e2f532ab7660680f3b02130d780c299bca73469d521bc0ed"
[[package]] [[package]]
name = "serde" name = "serde"
version = "1.0.159" version = "1.0.160"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3c04e8343c3daeec41f58990b9d77068df31209f2af111e059e9fe9646693065" checksum = "bb2f3770c8bce3bcda7e149193a069a0f4365bda1fa5cd88e03bca26afc1216c"
dependencies = [ dependencies = [
"serde_derive", "serde_derive",
] ]
[[package]] [[package]]
name = "serde_derive" name = "serde_derive"
version = "1.0.159" version = "1.0.160"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4c614d17805b093df4b147b51339e7e44bf05ef59fba1e45d83500bcfb4d8585" checksum = "291a097c63d8497e00160b166a967a4a79c64f3facdd01cbd7502231688d77df"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn 2.0.13", "syn 2.0.15",
] ]
[[package]] [[package]]
name = "serde_json" name = "serde_json"
version = "1.0.95" version = "1.0.96"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d721eca97ac802aa7777b701877c8004d950fc142651367300d21c1cc0194744" checksum = "057d394a50403bcac12672b2b18fb387ab6d289d957dab67dd201875391e52f1"
dependencies = [ dependencies = [
"itoa", "itoa",
"ryu", "ryu",
@ -2172,6 +2189,16 @@ dependencies = [
"digest", "digest",
] ]
[[package]]
name = "sha3"
version = "0.10.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "54c2bb1a323307527314a36bfb73f24febb08ce2b8a554bf4ffd6f51ad15198c"
dependencies = [
"digest",
"keccak",
]
[[package]] [[package]]
name = "sharded-slab" name = "sharded-slab"
version = "0.1.4" version = "0.1.4"
@ -2312,9 +2339,9 @@ dependencies = [
[[package]] [[package]]
name = "syn" name = "syn"
version = "2.0.13" version = "2.0.15"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4c9da457c5285ac1f936ebd076af6dac17a61cfe7826f2076b4d015cf47bc8ec" checksum = "a34fcf3e8b60f57e6a14301a2e916d323af98b0ea63c599441eec8558660c822"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
@ -2477,7 +2504,7 @@ checksum = "61a573bdc87985e9d6ddeed1b3d864e8a302c847e40d647746df2f1de209d1ce"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn 2.0.13", "syn 2.0.15",
] ]
[[package]] [[package]]
@ -2675,6 +2702,7 @@ name = "u_lib"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"bincode",
"chrono", "chrono",
"daemonize", "daemonize",
"deadpool-diesel", "deadpool-diesel",
@ -2682,7 +2710,6 @@ dependencies = [
"diesel-derive-enum", "diesel-derive-enum",
"dotenv", "dotenv",
"envy", "envy",
"futures",
"guess_host_triple", "guess_host_triple",
"lazy_static", "lazy_static",
"libc", "libc",
@ -2694,6 +2721,7 @@ dependencies = [
"rstest 0.12.0", "rstest 0.12.0",
"serde", "serde",
"serde_json", "serde_json",
"sha3",
"shlex", "shlex",
"strum 0.20.0", "strum 0.20.0",
"thiserror", "thiserror",
@ -2826,9 +2854,9 @@ checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9"
[[package]] [[package]]
name = "uuid" name = "uuid"
version = "1.3.0" version = "1.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1674845326ee10d37ca60470760d4288a6f80f304007d92e5c53bab78c9cfd79" checksum = "5b55a3fef2a1e3b3a00ce878640918820d3c51081576ac657d23af9fc7928fdb"
dependencies = [ dependencies = [
"getrandom", "getrandom",
"serde", "serde",
@ -3078,6 +3106,15 @@ dependencies = [
"windows-targets 0.42.2", "windows-targets 0.42.2",
] ]
[[package]]
name = "windows-sys"
version = "0.48.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "677d2418bec65e3338edb076e806bc1ec15693c5d0104683f2efe857f61056a9"
dependencies = [
"windows-targets 0.48.0",
]
[[package]] [[package]]
name = "windows-targets" name = "windows-targets"
version = "0.42.2" version = "0.42.2"
@ -3212,9 +3249,9 @@ dependencies = [
[[package]] [[package]]
name = "zstd-safe" name = "zstd-safe"
version = "6.0.4+zstd.1.5.4" version = "6.0.5+zstd.1.5.4"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7afb4b54b8910cf5447638cb54bf4e8a65cbedd783af98b98c62ffe91f185543" checksum = "d56d9e60b4b1758206c238a10165fbcae3ca37b01744e394c463463f6529d23b"
dependencies = [ dependencies = [
"libc", "libc",
"zstd-sys", "zstd-sys",
@ -3222,9 +3259,9 @@ dependencies = [
[[package]] [[package]]
name = "zstd-sys" name = "zstd-sys"
version = "2.0.7+zstd.1.5.4" version = "2.0.8+zstd.1.5.5"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "94509c3ba2fe55294d752b79842c530ccfab760192521df74a081a78d2b3c7f5" checksum = "5556e6ee25d32df2586c098bbfa278803692a20d0ab9565e049480d52707ec8c"
dependencies = [ dependencies = [
"cc", "cc",
"libc", "libc",

@ -69,11 +69,14 @@ clear = true
[tasks.run] [tasks.run]
disabled = true disabled = true
[tasks.unit] [tasks.unit-tests]
command = "${CARGO}" command = "${CARGO}"
args = ["test", "--target", "${TARGET}", "--lib", "--", "${@}"] args = ["test", "--target", "${TARGET}", "--lib", "--", "${@}"]
[tasks.integration] [tasks.ut]
alias = "unit-tests"
[tasks.integration-tests]
dependencies = ["cargo_update"] dependencies = ["cargo_update"]
script = ''' script = '''
[[ ! -d "./target/${TARGET}/${PROFILE_OVERRIDE}" ]] && echo 'No target folder. Build project first' && exit 1 [[ ! -d "./target/${TARGET}/${PROFILE_OVERRIDE}" ]] && echo 'No target folder. Build project first' && exit 1
@ -81,6 +84,9 @@ cd ./integration
bash integration_tests.sh ${@} bash integration_tests.sh ${@}
''' '''
[tasks.it]
alias = "integration-tests"
[tasks.test] [tasks.test]
dependencies = ["unit", "integration"] dependencies = ["unit", "integration"]

@ -12,5 +12,5 @@ reqwest = { workspace = true }
sysinfo = "0.10.5" sysinfo = "0.10.5"
tokio = { workspace = true, features = ["macros", "rt-multi-thread", "process", "time"] } tokio = { workspace = true, features = ["macros", "rt-multi-thread", "process", "time"] }
uuid = { workspace = true } uuid = { workspace = true }
u_lib = { path = "../../lib/u_lib" } u_lib = { path = "../../lib/u_lib", features = ["agent"] }

@ -81,14 +81,16 @@ pub async fn process_cmd(client: HttpClient, args: Args) -> PanelResult<Value> {
Cmd::Agents(action) => match action { Cmd::Agents(action) => match action {
RUD::Read { id } => into_value(client.get_agents(id).await?), RUD::Read { id } => into_value(client.get_agents(id).await?),
RUD::Update { item } => { RUD::Update { item } => {
let agent = from_str::<Agent>(&item)?; let agent = from_str::<Agent>(&item)
.map_err(|e| UError::DeserializeError(e.to_string(), item))?;
into_value(client.update_agent(&agent).await?) into_value(client.update_agent(&agent).await?)
} }
RUD::Delete { id } => into_value(client.del(id).await?), RUD::Delete { id } => into_value(client.del(id).await?),
}, },
Cmd::Jobs(action) => match action { Cmd::Jobs(action) => match action {
JobCRUD::Create { job } => { JobCRUD::Create { job } => {
let raw_job = from_str::<RawJob>(&job)?; let raw_job = from_str::<RawJob>(&job)
.map_err(|e| UError::DeserializeError(e.to_string(), job))?;
let job = raw_job.validated()?; let job = raw_job.validated()?;
let fat_job = join_payload(job)?; let fat_job = join_payload(job)?;
@ -100,7 +102,8 @@ pub async fn process_cmd(client: HttpClient, args: Args) -> PanelResult<Value> {
None => into_value(client.get_jobs().await?), None => into_value(client.get_jobs().await?),
}, },
JobCRUD::RUD(RUD::Update { item }) => { JobCRUD::RUD(RUD::Update { item }) => {
let raw_job = from_str::<RawJob>(&item)?; let raw_job = from_str::<RawJob>(&item)
.map_err(|e| UError::DeserializeError(e.to_string(), item))?;
let job = raw_job.validated()?; let job = raw_job.validated()?;
into_value(client.update_job(&join_payload(job)?).await?) into_value(client.update_job(&join_payload(job)?).await?)
} }
@ -113,7 +116,8 @@ pub async fn process_cmd(client: HttpClient, args: Args) -> PanelResult<Value> {
} => into_value(client.set_jobs(agent_id, &job_idents).await?), } => into_value(client.set_jobs(agent_id, &job_idents).await?),
JobMapCRUD::RUD(RUD::Read { id }) => into_value(client.get_agent_jobs(id).await?), JobMapCRUD::RUD(RUD::Read { id }) => into_value(client.get_agent_jobs(id).await?),
JobMapCRUD::RUD(RUD::Update { item }) => { JobMapCRUD::RUD(RUD::Update { item }) => {
let assigned = from_str::<AssignedJob>(&item)?; let assigned = from_str::<AssignedJob>(&item)
.map_err(|e| UError::DeserializeError(e.to_string(), item))?;
into_value(client.update_result(&assigned).await?) into_value(client.update_result(&assigned).await?)
} }
JobMapCRUD::RUD(RUD::Delete { id }) => into_value(client.del(id).await?), JobMapCRUD::RUD(RUD::Delete { id }) => into_value(client.del(id).await?),

@ -186,7 +186,7 @@ impl UDB<'_> {
}; };
let jobs_meta = jobs::table let jobs_meta = jobs::table
.select((jobs::id, jobs::alias, jobs::platform)) .select((jobs::id, jobs::alias, jobs::target_platforms))
.filter(jobs::id.eq_any(job_ids)) .filter(jobs::id.eq_any(job_ids))
.load::<(Id, Option<String>, String)>(self.conn) .load::<(Id, Option<String>, String)>(self.conn)
.map_err(with_err_ctx(format!("Can't find jobs {job_ids:?}")))?; .map_err(with_err_ctx(format!("Can't find jobs {job_ids:?}")))?;

@ -81,7 +81,8 @@ impl Endpoints {
db.upsert_agent(&agent)?; db.upsert_agent(&agent)?;
} }
None => { None => {
let new_agent = Agent::with_id(id); let mut new_agent = Agent::empty();
new_agent.id = id;
db.upsert_agent(&new_agent)?; db.upsert_agent(&new_agent)?;
@ -131,9 +132,9 @@ impl Endpoints {
repo: Arc<PgRepo>, repo: Arc<PgRepo>,
agent_id: Id, agent_id: Id,
job_idents: Vec<String>, job_idents: Vec<String>,
) -> EndpResult<()> { ) -> EndpResult<Vec<Id>> {
repo.transaction(move |mut db| { repo.transaction(move |mut db| {
job_idents let assigned_job_idents = job_idents
.into_iter() .into_iter()
.map(|ident| { .map(|ident| {
Id::parse_str(&ident).or_else(|_| { Id::parse_str(&ident).or_else(|_| {
@ -149,8 +150,9 @@ impl Endpoints {
} }
}) })
}) })
.collect::<Result<Vec<Id>, Error>>() .collect::<Result<Vec<Id>, Error>>()?;
.and_then(|j| db.set_jobs_for_agent(agent_id, &j)) db.set_jobs_for_agent(agent_id, &assigned_job_idents)?;
Ok(assigned_job_idents)
}) })
.await .await
.map_err(From::from) .map_err(From::from)

@ -221,7 +221,7 @@ fn logger(info: Info<'_>) {
} }
fn ok<T>(_: T) -> impl Reply { fn ok<T>(_: T) -> impl Reply {
reply() "null"
} }
/* /*

@ -7,7 +7,6 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies] [dependencies]
futures = { version = "0.3", features = ["executor"] }
once_cell = "1.10.0" once_cell = "1.10.0"
reqwest = { workspace = true } reqwest = { workspace = true }
rstest = "0.17" rstest = "0.17"

@ -2,5 +2,7 @@
set -e set -e
export DOCKER_UID=$(id -u) export DOCKER_UID=$(id -u)
export DOCKER_GID=$(id -g) export DOCKER_GID=$(id -g)
rm ../logs/u_agent*
[[ "$@" =~ "--release" ]] && export PROFILE=release || export PROFILE=debug [[ "$@" =~ "--release" ]] && export PROFILE=release || export PROFILE=debug
python integration_tests.py $@ python integration_tests.py $@

@ -1,9 +1,6 @@
use super::connections::*; use super::connections::*;
use super::run_async; use super::run_async;
use u_lib::{ use u_lib::{api::HttpClient, jobs::split_payload, messaging::Reportable, models::*, types::Id};
api::HttpClient, config::get_self_id, jobs::split_payload, messaging::Reportable, models::*,
types::Id,
};
pub struct RegisteredAgent { pub struct RegisteredAgent {
pub id: Id, pub id: Id,
@ -13,7 +10,8 @@ pub struct RegisteredAgent {
#[once] #[once]
pub fn registered_agent(client: &HttpClient) -> RegisteredAgent { pub fn registered_agent(client: &HttpClient) -> RegisteredAgent {
run_async(async { run_async(async {
let agent_id = get_self_id(); let agent = Agent::with_current_platform();
let agent_id = agent.id;
println!("registering agent {agent_id}"); println!("registering agent {agent_id}");
let resp = client let resp = client
.get_personal_jobs(agent_id) .get_personal_jobs(agent_id)
@ -25,7 +23,7 @@ pub fn registered_agent(client: &HttpClient) -> RegisteredAgent {
let job = client.get_job(job_id, true).await.unwrap(); let job = client.get_job(job_id, true).await.unwrap();
assert_eq!(job.job.alias, Some("agent_hello".to_string())); assert_eq!(job.job.alias, Some("agent_hello".to_string()));
let mut agent_data = AssignedJob::from((&split_payload(job).unwrap().job, resp)); let mut agent_data = AssignedJob::from((&split_payload(job).unwrap().job, resp));
agent_data.set_result(&Agent::with_id(agent_id)); agent_data.set_result(&agent);
client client
.report(&Reportable::Assigned(agent_data)) .report(&Reportable::Assigned(agent_data))
.await .await

@ -3,7 +3,15 @@ pub mod connections;
pub mod env; pub mod env;
use std::future::Future; use std::future::Future;
use std::thread;
use tokio::runtime::Runtime;
fn run_async<R>(fut: impl Future<Output = R>) -> R { // tokio runtime cannot be created inside another runtime,
futures::executor::block_on(fut) // so i create a separate non-'static thread not to interfere
fn run_async<R: Send>(fut: impl Future<Output = R> + Send) -> R {
thread::scope(|s| {
s.spawn(|| Runtime::new().unwrap().block_on(fut))
.join()
.expect("async task failed")
})
} }

@ -40,8 +40,8 @@ impl Panel {
.as_ref(), .as_ref(),
); );
match &result { match &result {
PanelResult::Ok(r) => eprintln!("+<< {r:?}"), PanelResult::Ok(r) => eprintln!("+<< {r:#?}"),
PanelResult::Err(e) => eprintln!("!<< {e:?}"), PanelResult::Err(e) => eprintln!("!<< {e:#?}"),
} }
result result
} }

@ -26,6 +26,7 @@ async fn test_jobs_endpoints(client_panel: &HttpClient) {
.with_alias(job_alias) .with_alias(job_alias)
.build() .build()
.unwrap(); .unwrap();
let job_id = job.job.id; let job_id = job.job.id;
let mut fat_job = join_payload(job).unwrap(); let mut fat_job = join_payload(job).unwrap();

@ -2,7 +2,7 @@ use crate::fixtures::agent::*;
use crate::helpers::{jobs::retry_with_interval, Panel}; use crate::helpers::{jobs::retry_with_interval, Panel};
use rstest::rstest; use rstest::rstest;
use serde_json::{json, to_string}; use serde_json::to_string;
use u_lib::config::AGENT_ITERATION_INTERVAL; use u_lib::config::AGENT_ITERATION_INTERVAL;
use u_lib::models::*; use u_lib::models::*;
use uuid::Uuid; use uuid::Uuid;
@ -21,13 +21,17 @@ async fn setup_tasks() {
let agents: Vec<Agent> = Panel::check_output("agents read"); let agents: Vec<Agent> = Panel::check_output("agents read");
let agent_id = agents[0].id; let agent_id = agents[0].id;
let job_alias = "passwd_contents"; let job_alias = "passwd_contents";
let job = json!( let job = RawJob::builder()
{"alias": job_alias, "payload": b"cat /etc/passwd", "argv": "/bin/bash {}" } .with_alias(job_alias)
); .with_raw_payload(b"cat /etc/passwd".as_slice())
.with_shell("/bin/bash {}")
.with_target_platforms("*linux*")
.build()
.unwrap();
Panel::check_status(["jobs", "create", &to_string(&job).unwrap()]); Panel::check_status(["jobs", "create", &to_string(&RawJob::from(job)).unwrap()]);
let cmd = format!("map create {} {}", agent_id, job_alias); let cmd = format!("map create {agent_id} {job_alias}");
let assigned_ids: Vec<Uuid> = Panel::check_output(cmd); let assigned_ids: Vec<Uuid> = Panel::check_output(cmd);
retry_with_interval(5, AGENT_ITERATION_INTERVAL, || { retry_with_interval(5, AGENT_ITERATION_INTERVAL, || {
@ -47,14 +51,14 @@ async fn setup_tasks() {
#[tokio::test] #[tokio::test]
async fn large_payload() { async fn large_payload() {
let agent_id = Panel::check_output::<Vec<Agent>>("agents read")[0].id; let agent = &Panel::check_output::<Vec<Agent>>("agents read")[0];
let agent_id = agent.id;
let job_alias = "large_payload"; let job_alias = "large_payload";
let job = RawJob::builder() let job = RawJob::builder()
.with_alias(job_alias) .with_alias(job_alias)
.with_payload_path("./tests/bin/echoer") .with_payload_path("./tests/bin/echoer")
.with_shell("{} type echo") .with_shell("{} type echo")
.with_target_platforms(&agent.platform)
.build() .build()
.unwrap(); .unwrap();

@ -14,7 +14,6 @@ diesel-derive-enum = { version = "2.0.0", features = ["postgres"], optional = tr
deadpool-diesel = { workspace = true, optional = true } deadpool-diesel = { workspace = true, optional = true }
dotenv = "0.15.0" dotenv = "0.15.0"
envy = "0.4.2" envy = "0.4.2"
futures = "0.3.5"
guess_host_triple = "0.1.2" guess_host_triple = "0.1.2"
libc = "^0.2" libc = "^0.2"
lazy_static = "1.4.0" lazy_static = "1.4.0"
@ -32,12 +31,15 @@ tracing-appender = { workspace = true }
tracing-subscriber = { workspace = true, features = ["env-filter"] } tracing-subscriber = { workspace = true, features = ["env-filter"] }
uuid = { workspace = true, features = ["serde", "v4"] } uuid = { workspace = true, features = ["serde", "v4"] }
parking_lot = "0.12.1" parking_lot = "0.12.1"
bincode = "1.3.3"
sha3 = "0.10.7"
[target.'cfg(unix)'.dependencies] [target.'cfg(unix)'.dependencies]
daemonize = "0.4.1" daemonize = "0.4.1"
nix = "0.17" nix = "0.17"
[features] [features]
agent = []
panel = [] panel = []
server = ["dep:diesel", "dep:diesel-derive-enum", "dep:deadpool-diesel"] server = ["dep:diesel", "dep:diesel-derive-enum", "dep:deadpool-diesel"]

@ -85,29 +85,34 @@ impl HttpClient {
url: impl AsRef<str>, url: impl AsRef<str>,
payload: &P, payload: &P,
) -> Result<R> { ) -> Result<R> {
let url = url.as_ref();
let request = self let request = self
.client .client
.post(self.base_url.join(url.as_ref()).unwrap()) .post(self.base_url.join(url).unwrap())
.json(payload); .json(payload);
let response = request let response = request
.send() .send()
.await .await
.context("error while sending request")?; .context("error while sending request")?;
let is_success = match response.error_for_status_ref() { let is_success = match response.error_for_status_ref() {
Ok(_) => Ok(()), Ok(_) => Ok(()),
Err(e) => Err(UError::from(e)), Err(e) => Err(UError::from(e)),
}; };
let resp = response.text().await.context("resp")?; let resp = response.text().await.context("resp")?;
let result = match is_success { let result = match is_success {
Ok(_) => from_str::<R>(&resp).map_err(|e| UError::NetError(e.to_string(), resp)), Ok(_) => {
Err(UError::NetError(err, _)) => Err(UError::NetError(err, resp)), from_str::<R>(&resp).map_err(|e| UError::DeserializeError(e.to_string(), resp))
}
Err(UError::NetError(err, url, _)) => Err(UError::NetError(err, url, resp)),
_ => unreachable!(), _ => unreachable!(),
} }
.map_err(From::from); .map_err(From::from);
debug!("url = {}, resp = {:?}", url.as_ref(), result); debug!("url = {}, resp = {:?}", url, result);
result result
} }
@ -164,13 +169,13 @@ impl HttpClient {
} }
/// create and upload job /// create and upload job
pub async fn upload_jobs(&self, payload: impl OneOrVecRef<FatJob>) -> Result<Vec<Id>> { pub async fn upload_jobs(&self, payload: impl OneOrVecRef<FatJob>) -> Result<()> {
self.req_with_payload("upload_jobs", &payload.as_vec()) self.req_with_payload("upload_jobs", &payload.as_vec())
.await .await
} }
/// delete something /// delete something
pub async fn del(&self, item: Id) -> Result<i32> { pub async fn del(&self, item: Id) -> Result<()> {
self.req(format!("del/{item}")).await self.req(format!("del/{item}")).await
} }

@ -15,8 +15,8 @@ pub enum UError {
#[error("Runtime error: {0}")] #[error("Runtime error: {0}")]
Runtime(String), Runtime(String),
#[error("Connection error: {0}. Body: {1}")] #[error("Connection error: {0}; url: {1}; body: '''{2}'''")]
NetError(String, String), NetError(String, String, String),
#[error("Parse error")] #[error("Parse error")]
ParseError, ParseError,
@ -42,27 +42,44 @@ pub enum UError {
#[error("Panel error: {0}")] #[error("Panel error: {0}")]
PanelError(String), PanelError(String),
#[error("Deserialize from json error: {0}")] #[error("Deserialize from json error: {0}, body: {1}")]
DeserializeError(String), DeserializeError(String, String),
#[error("{0}\n{1}")]
Contexted(Box<UError>, String),
} }
impl From<ReqError> for UError { impl From<ReqError> for UError {
fn from(e: ReqError) -> Self { fn from(e: ReqError) -> Self {
UError::NetError(e.to_string(), String::new()) UError::NetError(
e.to_string(),
e.url().map(|u| u.to_string()).unwrap_or_default(),
String::new(),
)
} }
} }
/*
impl From<serde_json::Error> for UError { impl From<serde_json::Error> for UError {
fn from(e: serde_json::Error) -> Self { fn from(e: serde_json::Error) -> Self {
UError::DeserializeError(e.to_string()) UError::DeserializeError(e.to_string())
} }
} }
*/
impl From<anyhow::Error> for UError { impl From<anyhow::Error> for UError {
fn from(e: anyhow::Error) -> Self { fn from(e: anyhow::Error) -> Self {
let ctx = e
.chain()
.skip(1)
.map(|cause| format!("ctx: {}", cause))
.collect::<Vec<_>>()
.join("\n");
match e.downcast::<UError>() { match e.downcast::<UError>() {
Ok(err) => err, Ok(err) => UError::Contexted(Box::new(err), ctx),
Err(err) => match err.downcast::<ufs::Error>() {
Ok(err) => UError::Contexted(Box::new(UError::FSError(err)), ctx),
Err(err) => UError::Runtime(err.to_string()), Err(err) => UError::Runtime(err.to_string()),
},
} }
} }
} }

@ -1,16 +1,18 @@
use crate::{models::AssignedJob, UResult}; use crate::{models::AssignedJob, UResult};
use futures::{future::BoxFuture, lock::Mutex};
use lazy_static::lazy_static; use lazy_static::lazy_static;
use std::collections::HashMap; use std::collections::HashMap;
use std::future::Future; use std::future::Future;
use std::pin::Pin;
use tokio::{ use tokio::{
runtime::Handle, runtime::Handle,
sync::mpsc::{channel, Receiver, Sender}, sync::mpsc::{channel, Receiver, Sender},
sync::Mutex,
task::{spawn, spawn_blocking, JoinHandle}, task::{spawn, spawn_blocking, JoinHandle},
}; };
use uuid::Uuid; use uuid::Uuid;
pub type ExecResult = UResult<AssignedJob>; pub type ExecResult = UResult<AssignedJob>;
type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;
lazy_static! { lazy_static! {
static ref FUT_RESULTS: Mutex<HashMap<Uuid, JoinInfo>> = Mutex::new(HashMap::new()); static ref FUT_RESULTS: Mutex<HashMap<Uuid, JoinInfo>> = Mutex::new(HashMap::new());
@ -94,12 +96,11 @@ impl Waiter {
async fn init_receiver() { async fn init_receiver() {
while let Some(fid) = FUT_CHANNEL.1.lock().await.recv().await { while let Some(fid) = FUT_CHANNEL.1.lock().await.recv().await {
if let Some(mut lock) = FUT_RESULTS.try_lock() { let mut lock = FUT_RESULTS.lock().await;
if let Some(j) = lock.get_mut(&fid) { if let Some(j) = lock.get_mut(&fid) {
j.completed = true; j.completed = true;
} }
} }
}
} }
async fn pop_task(fid: Uuid) -> Option<JoinInfo> { async fn pop_task(fid: Uuid) -> Option<JoinInfo> {

@ -169,7 +169,9 @@ pub async fn run_assigned_job(job: ThinJob, ids: AssignedJobById) -> ExecResult
result.retcode = retcode; result.retcode = retcode;
} }
JobType::Init => { JobType::Init => {
result.set_result(&Agent::run().await); let agent = Agent::gather().await;
debug!("gathered info from agent: {agent:?}");
result.set_result(&agent);
result.retcode = Some(0); result.retcode = Some(0);
} }
JobType::Service => todo!(), JobType::Service => todo!(),
@ -263,7 +265,7 @@ mod tests {
) -> TestResult { ) -> TestResult {
let mut job = RawJob::builder().with_shell(cmd); let mut job = RawJob::builder().with_shell(cmd);
if let Some(p) = payload { if let Some(p) = payload {
job = job.with_payload(p); job = job.with_raw_payload(p);
} }
let job = job.build().unwrap(); let job = job.build().unwrap();
let result = AnonymousJobBatch::from_meta(job).wait_one().await.unwrap(); let result = AnonymousJobBatch::from_meta(job).wait_one().await.unwrap();
@ -340,7 +342,7 @@ mod tests {
) -> TestResult { ) -> TestResult {
let mut job = RawJob::builder().with_shell(cmd); let mut job = RawJob::builder().with_shell(cmd);
if let Some(p) = payload { if let Some(p) = payload {
job = job.with_payload(p); job = job.with_raw_payload(p);
} }
let err = job.build().unwrap_err(); let err = job.build().unwrap_err();
let err_msg = unwrap_enum!(err, UError::JobBuildError); let err_msg = unwrap_enum!(err, UError::JobBuildError);

@ -14,7 +14,7 @@ use self::server::*;
use crate::{ use crate::{
config::get_self_id, conv::systime_to_string, executor::ExecResult, jobs::NamedJobBatch, config::get_self_id, conv::systime_to_string, executor::ExecResult, jobs::NamedJobBatch,
platform::Platform, types::Id, platform, types::Id,
}; };
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Display)] #[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Display)]
@ -54,10 +54,29 @@ pub struct Agent {
} }
impl Agent { impl Agent {
pub fn with_id(id: Id) -> Self { pub fn empty() -> Self {
Self { Self {
id, alias: None,
..Default::default() id: get_self_id(),
hostname: String::new(),
host_info: String::new(),
is_root: false,
is_root_allowed: false,
last_active: SystemTime::now(),
platform: String::new(),
regtime: SystemTime::now(),
state: AgentState::New,
token: None,
username: String::new(),
ip_gray: None,
ip_white: None,
}
}
pub fn with_current_platform() -> Self {
Self {
platform: platform::current_as_string(),
..Self::empty()
} }
} }
@ -91,33 +110,7 @@ impl Agent {
host_info: decoder(builder.pop("host_info")), host_info: decoder(builder.pop("host_info")),
is_root: &decoder(builder.pop("is_root")) == "0", is_root: &decoder(builder.pop("is_root")) == "0",
username: decoder(builder.pop("username")), username: decoder(builder.pop("username")),
platform: Platform::current().into_string(), ..Self::with_current_platform()
..Default::default()
}
}
pub async fn run() -> Agent {
Agent::gather().await
}
}
impl Default for Agent {
fn default() -> Self {
Self {
alias: None,
id: get_self_id(),
hostname: String::new(),
host_info: String::new(),
is_root: false,
is_root_allowed: false,
last_active: SystemTime::now(),
platform: String::new(),
regtime: SystemTime::now(),
state: AgentState::New,
token: None,
username: String::new(),
ip_gray: None,
ip_white: None,
} }
} }
} }

@ -5,13 +5,13 @@ use crate::conv::bytes_to_string;
#[cfg(feature = "server")] #[cfg(feature = "server")]
use crate::models::schema::*; use crate::models::schema::*;
use crate::models::PayloadMeta; use crate::models::PayloadMeta;
use crate::platform::Platform; use crate::platform;
use crate::types::Id; use crate::types::Id;
use crate::{ufs, UError, UResult}; use crate::{ufs, UError, UResult};
#[cfg(feature = "server")] #[cfg(feature = "server")]
use diesel::{Identifiable, Insertable, Queryable}; use diesel::{Identifiable, Insertable, Queryable};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::fs::metadata; use std::borrow::Cow;
use std::process::Command; use std::process::Command;
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
@ -28,7 +28,7 @@ pub struct JobModel {
pub id: Id, pub id: Id,
pub exec_type: JobType, pub exec_type: JobType,
/// target triple /// target triple
pub platform: String, pub target_platforms: String,
pub payload: Option<Id>, pub payload: Option<Id>,
/// cron-like string /// cron-like string
pub schedule: Option<String>, pub schedule: Option<String>,
@ -47,22 +47,8 @@ pub struct ThinJob {
pub payload_meta: Option<PayloadMeta>, pub payload_meta: Option<PayloadMeta>,
} }
// impl fmt::Debug for ThinJobMeta { #[derive(Serialize, Deserialize, Clone)]
// fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { pub struct RawJob<'p> {
// f.debug_struct("ThinJobMeta")
// .field("alias", &self.alias)
// .field("argv", &self.argv)
// .field("id", &self.id.to_string())
// .field("exec_type", &self.exec_type)
// .field("platform", &self.platform)
// .field("payload", &self.payload)
// .field("schedule", &self.schedule)
// .finish()
// }
// }
#[derive(Serialize, Deserialize, Clone, Default)]
pub struct RawJob {
#[serde(default)] #[serde(default)]
pub alias: Option<String>, pub alias: Option<String>,
@ -78,32 +64,51 @@ pub struct RawJob {
pub exec_type: JobType, pub exec_type: JobType,
/// target triple /// target triple
#[serde(default = "Platform::current_as_string")] #[serde(default = "platform::current_as_string")]
pub platform: String, pub target_platforms: String,
#[serde(default)] #[serde(default)]
pub payload: Option<String>, pub payload_path: Option<String>,
#[serde(default)]
pub raw_payload: Option<Cow<'p, [u8]>>,
/// cron-like string /// cron-like string
#[serde(default)] #[serde(default)]
pub schedule: Option<String>, pub schedule: Option<String>,
} }
impl fmt::Debug for RawJob { impl Default for RawJob<'_> {
fn default() -> Self {
Self {
alias: None,
argv: String::new(),
id: Id::new_v4(),
exec_type: JobType::default(),
target_platforms: String::new(),
payload_path: None,
raw_payload: None,
schedule: None,
}
}
}
impl fmt::Debug for RawJob<'_> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("RawJob") f.debug_struct("RawJob")
.field("alias", &self.alias) .field("alias", &self.alias)
.field("argv", &self.argv) .field("argv", &self.argv)
.field("id", &self.id.to_string()) .field("id", &self.id.to_string())
.field("exec_type", &self.exec_type) .field("exec_type", &self.exec_type)
.field("platform", &self.platform) .field("platform", &self.target_platforms)
.field("payload", &self.payload) .field("payload_path", &self.payload_path)
.field("raw_payload", &self.raw_payload)
.field("schedule", &self.schedule) .field("schedule", &self.schedule)
.finish() .finish()
} }
} }
impl From<ThinJob> for RawJob { impl From<ThinJob> for RawJob<'_> {
fn from(job: ThinJob) -> Self { fn from(job: ThinJob) -> Self {
let ThinJob { job, payload_meta } = job; let ThinJob { job, payload_meta } = job;
RawJob { RawJob {
@ -111,49 +116,31 @@ impl From<ThinJob> for RawJob {
argv: job.argv, argv: job.argv,
id: job.id, id: job.id,
exec_type: job.exec_type, exec_type: job.exec_type,
platform: job.platform, target_platforms: job.target_platforms,
payload: payload_meta.map(|m| m.name), payload_path: payload_meta.map(|m| m.name),
raw_payload: None,
schedule: job.schedule, schedule: job.schedule,
} }
} }
} }
impl RawJob { impl<'p> RawJob<'p> {
pub fn validated(self) -> UResult<ThinJob> { pub fn validated(self) -> UResult<ThinJob> {
JobBuilder { JobBuilder { inner: self }.build()
inner: self,
raw_payload: None,
}
.build()
} }
pub fn from_shell(cmd: impl Into<String>) -> UResult<ThinJob> { pub fn from_shell(cmd: impl Into<String>) -> UResult<ThinJob> {
Self::builder().with_shell(cmd).build() Self::builder().with_shell(cmd).build()
} }
pub fn builder<'p>() -> JobBuilder<'p> { pub fn builder() -> JobBuilder<'p> {
JobBuilder::default() JobBuilder::default()
} }
} }
// impl Default for RawJob {
// fn default() -> Self {
// Self {
// id: Id::new_v4(),
// alias: None,
// argv: String::new(),
// exec_type: JobType::Shell,
// platform: Platform::current_as_string(),
// payload: None,
// schedule: None,
// }
// }
//}
#[derive(Default)] #[derive(Default)]
pub struct JobBuilder<'p> { pub struct JobBuilder<'p> {
inner: RawJob, inner: RawJob<'p>,
raw_payload: Option<&'p [u8]>,
} }
impl<'p> JobBuilder<'p> { impl<'p> JobBuilder<'p> {
@ -163,15 +150,15 @@ impl<'p> JobBuilder<'p> {
self self
} }
pub fn with_payload(mut self, raw_payload: &'p [u8]) -> Self { pub fn with_raw_payload(mut self, raw_payload: impl Into<Cow<'p, [u8]>>) -> Self {
self.raw_payload = Some(raw_payload); self.inner.raw_payload = Some(raw_payload.into());
self.inner.payload = None; self.inner.payload_path = None;
self self
} }
pub fn with_payload_path(mut self, path: impl Into<String>) -> Self { pub fn with_payload_path(mut self, path: impl Into<String>) -> Self {
self.inner.payload = Some(path.into()); self.inner.payload_path = Some(path.into());
self.raw_payload = None; self.inner.raw_payload = None;
self self
} }
@ -185,11 +172,16 @@ impl<'p> JobBuilder<'p> {
self self
} }
pub fn with_target_platforms(mut self, platform: impl Into<String>) -> Self {
self.inner.target_platforms = platform.into();
self
}
pub fn build(self) -> UResult<ThinJob> { pub fn build(self) -> UResult<ThinJob> {
let mut inner = self.inner; let mut inner = self.inner;
let raw_into_job = |raw: RawJob| -> UResult<ThinJob> { let raw_into_job = |raw: RawJob| -> UResult<ThinJob> {
let payload_id = raw.payload.as_ref().map(|_| Id::new_v4()); let payload_id = raw.payload_path.as_ref().map(|_| Id::new_v4());
Ok(ThinJob { Ok(ThinJob {
job: JobModel { job: JobModel {
@ -197,27 +189,29 @@ impl<'p> JobBuilder<'p> {
argv: raw.argv, argv: raw.argv,
id: raw.id, id: raw.id,
exec_type: raw.exec_type, exec_type: raw.exec_type,
platform: raw.platform, target_platforms: raw.target_platforms,
payload: payload_id, payload: payload_id,
schedule: raw.schedule, schedule: raw.schedule,
}, },
payload_meta: raw payload_meta: raw
.payload .payload_path
.map(|payload_path| { .map(|payload_ident| {
Ok::<_, UError>(PayloadMeta { let ufs_meta = ufs::read_meta(&payload_ident)?;
let payload_meta = PayloadMeta {
id: payload_id.unwrap(), id: payload_id.unwrap(),
mime_type: bytes_to_string( mime_type: bytes_to_string(
&Command::new("file") &Command::new("file")
.arg("-b") .arg("-b")
.arg("--mime-type") .arg("--mime-type")
.arg(&payload_path) .arg(&ufs_meta.path)
.output() .output()
.map_err(|e| UError::JobBuildError(e.to_string()))? .map_err(|e| UError::JobBuildError(e.to_string()))?
.stdout, .stdout,
), ),
name: payload_path.clone(), name: payload_ident.clone(),
size: metadata(payload_path).unwrap().len() as i64, size: ufs_meta.size as i64,
}) };
Ok::<_, UError>(payload_meta)
}) })
.transpose()?, .transpose()?,
}) })
@ -244,22 +238,22 @@ impl<'p> JobBuilder<'p> {
return Err(empty_err.into()); return Err(empty_err.into());
} }
if let Some(payload_path) = &inner.payload { if let Some(path) = &inner.payload_path {
ufs::put_external(payload_path)?; ufs::put_external(path)?;
} }
if let Some(raw) = self.raw_payload { if let Some(raw_payload) = &inner.raw_payload {
match inner.payload { match inner.payload_path {
Some(_) => { Some(_) => {
return Err(UError::JobBuildError( return Err(UError::JobBuildError(
"Can't use both raw payload with payload path".to_string(), "Can't use both raw payload with payload path".to_string(),
)) ))
} }
None => inner.payload = Some(ufs::create(raw)?), None => inner.payload_path = Some(ufs::create_anonymous(raw_payload)?),
} }
} }
match inner.payload.as_ref() { match inner.payload_path.as_ref() {
Some(_) => { Some(_) => {
if !inner.argv.contains("{}") { if !inner.argv.contains("{}") {
return Err(UError::JobBuildError( return Err(UError::JobBuildError(
@ -279,10 +273,14 @@ impl<'p> JobBuilder<'p> {
} }
}; };
if Platform::new(&inner.platform).find_valid().is_none() { if inner.target_platforms.is_empty() {
inner.target_platforms = "*".to_string();
}
if !platform::is_valid_glob(&inner.target_platforms) {
return Err(UError::JobBuildError(format!( return Err(UError::JobBuildError(format!(
"Unknown platform {}", "Unknown platform '{}'",
inner.platform inner.target_platforms
))); )));
} }

@ -45,7 +45,7 @@ diesel::table! {
argv -> Text, argv -> Text,
id -> Uuid, id -> Uuid,
exec_type -> Jobtype, exec_type -> Jobtype,
platform -> Text, target_platforms -> Text,
payload -> Nullable<Uuid>, payload -> Nullable<Uuid>,
schedule -> Nullable<Text>, schedule -> Nullable<Text>,
} }

@ -11,18 +11,10 @@ impl Platform {
Platform(p.into()) Platform(p.into())
} }
pub fn current() -> Platform {
Self(guess_host_triple().unwrap().to_string())
}
pub fn current_as_string() -> String {
Self::current().into_string()
}
pub fn matches(&self, pf: impl AsRef<str>) -> bool { pub fn matches(&self, pf: impl AsRef<str>) -> bool {
// this finder needs a full triple, so when the .0 is empty, return true // this finder needs a full triple, so when the .0 is empty, return true
// this is fucked up tbh // this is fucked up tbh
let Some(platform_to_match_against) = self.find_valid() else { let Some(platform_to_match_against) = LibPlatform::find(&self.0) else {
return self.0.is_empty() return self.0.is_empty()
}; };
@ -32,11 +24,19 @@ impl Platform {
} }
} }
pub fn find_valid(&self) -> Option<&'static LibPlatform> {
LibPlatform::find(&self.0)
}
pub fn into_string(self) -> String { pub fn into_string(self) -> String {
self.0 self.0
} }
} }
pub fn current() -> Platform {
Platform(guess_host_triple().unwrap().to_string())
}
pub fn current_as_string() -> String {
current().into_string()
}
pub fn is_valid_glob(platform_req: impl AsRef<str>) -> bool {
PlatformReq::from_str(platform_req.as_ref()).is_ok()
}

@ -35,3 +35,18 @@ impl From<io::Error> for Error {
} }
} }
} }
impl From<anyhow::Error> for Error {
fn from(e: anyhow::Error) -> Self {
let err = e
.chain()
.map(|cause| format!("ctx: {}", cause))
.collect::<Vec<_>>()
.join("\n");
Error {
err,
path: String::new(),
}
}
}

@ -0,0 +1,101 @@
use super::{Error, FileMeta};
use once_cell::sync::Lazy;
use parking_lot::Mutex;
use std::collections::HashMap;
use std::env::temp_dir;
use std::fs;
use std::path::PathBuf;
// index format: given_name -> payload_meta
type Index = HashMap<String, FileMeta>;
static IDX_FILE_NAME: Lazy<PathBuf> = Lazy::new(|| temp_dir().join(".i"));
static INDEX: Lazy<Mutex<Index>> = Lazy::new(|| {
let idx_name = &*IDX_FILE_NAME;
let deserialized_idx = fs::read(idx_name)
.map_err(|e| Error::new(e, idx_name))
.and_then(|raw_idx| {
bincode::deserialize::<Index>(&raw_idx).map_err(|e| Error::new(e, idx_name))
});
let idx = match deserialized_idx {
Ok(idx) => idx,
Err(e) => {
error!("index loading failed: {e}");
HashMap::new()
}
};
Mutex::new(idx)
});
mod sync {
use super::{Index, IDX_FILE_NAME};
use std::fs;
pub(super) fn deleted(index: &mut Index) {
let files_to_delete: Vec<String> = index
.iter()
.filter_map(|(name, meta)| {
if meta.path.exists() {
None
} else {
Some(name.to_string())
}
})
.collect();
files_to_delete.into_iter().for_each(|f| {
index.remove(&f);
});
}
pub(super) fn index2fs(index: &Index) {
let serialized_idx = bincode::serialize(index).expect("broken index");
if let Err(e) = fs::write(&*IDX_FILE_NAME, serialized_idx) {
error!("index dumping failed: {e}");
}
}
}
pub fn get(name: impl AsRef<str>) -> Option<FileMeta> {
let mut index = INDEX.lock();
sync::deleted(&mut index);
index.get(name.as_ref()).cloned()
}
pub fn get_by_hash(hash: impl AsRef<[u8]>) -> Option<(String, FileMeta)> {
let mut index = INDEX.lock();
sync::deleted(&mut index);
index
.iter()
.find(|(_name, meta)| meta.hash == hash.as_ref())
.map(|(n, m)| (n.to_owned(), m.clone()))
}
pub fn insert(name: impl Into<String>, meta: FileMeta) {
let mut index = INDEX.lock();
sync::deleted(&mut index);
index.insert(name.into(), meta);
#[cfg(any(feature = "panel", feature = "server"))]
sync::index2fs(&mut index);
}
pub fn remove(name: impl AsRef<str>) -> Option<FileMeta> {
let mut index = INDEX.lock();
sync::deleted(&mut index);
let result = index.remove(name.as_ref());
#[cfg(any(feature = "panel", feature = "server"))]
sync::index2fs(&mut index);
result
}

@ -1,9 +1,8 @@
// This module is aiming to store obfuscated payloads, get them by name, // This module is aiming to store obfuscated payloads, get them by name,
// rename, update, delete or prepare to execute via memfd_create (unix) // rename, update, delete or prepare to execute via memfd_create (unix)
use once_cell::sync::Lazy; use anyhow::{Context, Result};
use parking_lot::RwLock; use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::env::temp_dir; use std::env::temp_dir;
use std::ffi::{CString, OsString}; use std::ffi::{CString, OsString};
use std::fs::{self, File}; use std::fs::{self, File};
@ -11,75 +10,84 @@ use std::path::{Path, PathBuf};
use uuid::Uuid; use uuid::Uuid;
mod error; mod error;
mod index;
pub use error::Error; pub use error::Error;
// INDEX format: given_name -> payload_meta const OBFUSCATE: bool = cfg!(feature = "agent");
static INDEX: Lazy<RwLock<HashMap<String, FileMeta>>> = Lazy::new(|| RwLock::new(HashMap::new()));
struct FileMeta { #[derive(Clone, Deserialize, Serialize)]
path: PathBuf, pub struct FileMeta {
obfuscated: bool,
extension: Option<OsString>, extension: Option<OsString>,
external: bool, external: bool,
hash: Vec<u8>,
pub path: PathBuf,
pub size: u64,
} }
/// Remove deleted files from index impl FileMeta {
pub fn sync_index() { pub fn new(
let mut index = INDEX.write(); full_path: impl Into<PathBuf>,
hash: Vec<u8>,
external: bool,
) -> Result<Self, Error> {
let full_path: PathBuf = full_path.into();
let extension = full_path.extension().map(ToOwned::to_owned);
let size = fs::metadata(&full_path)?.len();
let files_to_delete: Vec<String> = index Ok(FileMeta {
.iter() path: full_path,
.filter_map(|(name, meta)| { extension,
if meta.path.exists() { external,
None hash,
} else { size,
Some(name.to_string())
}
}) })
.collect(); }
files_to_delete.into_iter().for_each(|f| {
index.remove(&f);
});
} }
pub fn in_index(name: impl AsRef<str>) -> bool { pub fn in_index(name: impl AsRef<str>) -> bool {
sync_index(); read_meta(name).is_ok()
INDEX.read().get(name.as_ref()).is_some()
} }
pub fn read(name: impl AsRef<str>) -> Result<Vec<u8>, Error> { #[inline]
sync_index(); pub fn read_meta(name: impl AsRef<str>) -> Result<FileMeta> {
index::get(&name)
let name = name.as_ref(); .ok_or_else(|| Error::not_found(name.as_ref()))
let index = INDEX.read(); .context("meta")
let meta = index.get(name).ok_or_else(|| Error::not_found(name))?; }
fs::read(&meta.path).map_err(|e| Error::new(e, name)) pub fn read(name: impl AsRef<str>) -> Result<Vec<u8>> {
let meta = read_meta(&name).context("read_meta")?;
fs::read(&meta.path)
.map_err(|e| Error::new(e, name.as_ref()))
.context("read")
} }
pub fn create(data: impl AsRef<[u8]>) -> Result<String, Error> { pub fn create_anonymous(data: impl AsRef<[u8]>) -> Result<String> {
if let Some((name, _)) = index::get_by_hash(hash_data(&data)) {
return Ok(name);
}
let name = Uuid::new_v4().simple().to_string(); let name = Uuid::new_v4().simple().to_string();
put(&name, data)?; put(&name, data).context("cr_anon")?;
Ok(name) Ok(name)
} }
/// Create new file and add to index /// Create new file and add to index
pub fn put(name: impl AsRef<str>, data: impl AsRef<[u8]>) -> Result<(), Error> { pub fn put(name: impl AsRef<str>, data: impl AsRef<[u8]>) -> Result<()> {
sync_index();
let name = name.as_ref(); let name = name.as_ref();
let obfuscate = !cfg!(feature = "server") && !cfg!(feature = "panel"); let data_hash = hash_data(&data);
if in_index(&name) { if in_index(&name) {
return Err(Error::already_exists(&name)); return Err(Error::already_exists(&name)).context("put_exists");
} }
let path = match index::get_by_hash(&data_hash) {
Some((_, meta)) => meta.path,
None => {
let path = { let path = {
let exec_name = if obfuscate { let exec_name = if OBFUSCATE {
PathBuf::from(Uuid::new_v4().simple().to_string()) PathBuf::from(Uuid::new_v4().simple().to_string())
} else { } else {
PathBuf::from(name) PathBuf::from(name)
@ -90,37 +98,32 @@ pub fn put(name: impl AsRef<str>, data: impl AsRef<[u8]>) -> Result<(), Error> {
path path
}; };
let extension = path.file_stem().map(ToOwned::to_owned); fs::write(&path, &data)
.map_err(|e| Error::new(e, name))
fs::write(&path, data).map_err(|e| Error::new(e, name))?; .context("put_write")?;
path
INDEX.write().insert( }
name.to_string(), };
FileMeta { index::insert(
path, name,
obfuscated: obfuscate, FileMeta::new(path, data_hash, false).context("put_insert")?,
extension,
external: false,
},
); );
Ok(()) Ok(())
} }
pub fn remove(name: impl AsRef<str>) -> Result<(), Error> { pub fn remove(name: impl AsRef<str>) -> Result<()> {
sync_index();
let name = name.as_ref(); let name = name.as_ref();
match index::remove(name) {
match INDEX.write().remove(name) { Some(value) if !value.external => fs::remove_file(value.path)
Some(value) => fs::remove_file(value.path).map_err(|e| Error::new(e, name)), .map_err(|e| Error::new(e, name))
None => Ok(()), .context("remove"),
_ => Ok(()),
} }
} }
pub fn rename(old_name: impl AsRef<str>, new_name: impl AsRef<str>) -> Result<(), Error> { // todo: don't rename external files
sync_index(); pub fn rename(old_name: impl AsRef<str>, new_name: impl AsRef<str>) -> Result<()> {
let old_name = old_name.as_ref(); let old_name = old_name.as_ref();
let new_name = new_name.as_ref(); let new_name = new_name.as_ref();
@ -129,67 +132,63 @@ pub fn rename(old_name: impl AsRef<str>, new_name: impl AsRef<str>) -> Result<()
} }
if !in_index(old_name) { if !in_index(old_name) {
return Err(Error::not_found(old_name)); return Err(Error::not_found(old_name)).context("rename");
} }
if in_index(new_name) { if in_index(new_name) {
return Err(Error::already_exists(new_name)); return Err(Error::already_exists(new_name)).context("rename");
} }
let mut value = INDEX.write().remove(old_name).unwrap(); let mut value = index::remove(old_name).unwrap();
if !value.obfuscated { if !OBFUSCATE {
let old_path = value.path.clone(); let old_path = value.path.clone();
value.path.pop(); value.path.pop();
value.path.push(new_name); value.path.push(new_name);
fs::rename(old_path, &value.path).map_err(|e| Error::new(e, &value.path))?; fs::rename(old_path, &value.path)
.map_err(|e| Error::new(e, &value.path))
.context("rename")?;
} }
INDEX.write().insert(new_name.to_string(), value); index::insert(new_name, value);
Ok(()) Ok(())
} }
pub fn update_payload_data(name: impl AsRef<str>, data: impl AsRef<[u8]>) -> Result<(), Error> { pub fn update_payload_data(name: impl AsRef<str>, data: impl AsRef<[u8]>) -> Result<()> {
sync_index();
let name = name.as_ref(); let name = name.as_ref();
let external = INDEX.read().get(name).map(|v| v.external).unwrap_or(false); let external = index::get(name).map(|v| v.external).unwrap_or(false);
if external { if external {
INDEX.write().remove(name); index::remove(name);
} else if in_index(&name) { } else if in_index(&name) {
remove(&name)?; remove(&name).context("upd")?;
} }
put(name, data) put(name, data).context("upd")
} }
/// Add an existing file to index /// Add an existing file to index
pub fn put_external(path: impl AsRef<Path>) -> Result<(), Error> { pub fn put_external(path: impl AsRef<Path>) -> Result<()> {
sync_index();
let path = path.as_ref(); let path = path.as_ref();
let path_str = path.as_os_str().to_string_lossy().to_string(); let path_str = path.as_os_str().to_string_lossy().to_string();
if !path.exists() || path.is_dir() { if in_index(&path_str) {
return Err(Error::not_found(path)); return Ok(());
} }
if in_index(&path_str) { if !path.exists() || path.is_dir() {
return Err(Error::already_exists(&path)); return Err(Error::not_found(path)).context("ext1");
} }
INDEX.write().insert( let file_data = fs::read(&path_str).unwrap();
let data_hash = hash_data(&file_data);
index::insert(
path_str, path_str,
FileMeta { FileMeta::new(path, data_hash, true).context("ext2")?,
path: path.to_owned(),
obfuscated: false,
extension: path.file_stem().map(ToOwned::to_owned),
external: true,
},
); );
Ok(()) Ok(())
@ -197,7 +196,7 @@ pub fn put_external(path: impl AsRef<Path>) -> Result<(), Error> {
/// Prepare executable file: unpack, decipher if needed and send under memfd /// Prepare executable file: unpack, decipher if needed and send under memfd
#[cfg(unix)] #[cfg(unix)]
pub fn prepare_executable(name: impl AsRef<str>) -> Result<(File, String), Error> { pub fn prepare_executable(name: impl AsRef<str>) -> Result<(File, String)> {
use libc::getpid; use libc::getpid;
use nix::sys::memfd::*; use nix::sys::memfd::*;
use std::io::{Read, Write}; use std::io::{Read, Write};
@ -206,12 +205,8 @@ pub fn prepare_executable(name: impl AsRef<str>) -> Result<(File, String), Error
const FAKE_EXEC_NAME: &str = "/usr/sbin/lvmetad"; const FAKE_EXEC_NAME: &str = "/usr/sbin/lvmetad";
const BUFFER_LEN: usize = 4096; const BUFFER_LEN: usize = 4096;
sync_index();
let mut buffer: [u8; BUFFER_LEN] = [0; BUFFER_LEN]; let mut buffer: [u8; BUFFER_LEN] = [0; BUFFER_LEN];
let name = name.as_ref(); let payload_meta = read_meta(name).context("prep")?;
let index = INDEX.read();
let payload_meta = index.get(name).ok_or_else(|| Error::not_found(name))?;
let fd = memfd_create( let fd = memfd_create(
CString::new(FAKE_EXEC_NAME).unwrap().as_c_str(), CString::new(FAKE_EXEC_NAME).unwrap().as_c_str(),
@ -235,15 +230,16 @@ pub fn prepare_executable(name: impl AsRef<str>) -> Result<(File, String), Error
let payload_path = format!("/proc/{}/fd/{}", unsafe { getpid() }, fd); let payload_path = format!("/proc/{}/fd/{}", unsafe { getpid() }, fd);
Ok((payload_dest, payload_path)) Ok((payload_dest, payload_path))
} }
Err(e) => Err(Error::new(e, FAKE_EXEC_NAME)), Err(e) => Err(Error::new(e, FAKE_EXEC_NAME)).context("prep"),
} }
} }
#[cfg(windows)] #[cfg(windows)]
pub fn prepare_executable(name: impl AsRef<str>) -> Result<(File, String), Error> { pub fn prepare_executable(name: impl AsRef<str>) -> Result<(File, String)> {
todo!() todo!()
} }
/*
pub fn cleanup() { pub fn cleanup() {
let index = INDEX.read(); let index = INDEX.read();
@ -251,3 +247,12 @@ pub fn cleanup() {
fs::remove_file(&f.path).ok(); fs::remove_file(&f.path).ok();
}); });
} }
*/
fn hash_data(data: impl AsRef<[u8]>) -> Vec<u8> {
use sha3::{Digest, Sha3_256};
let mut hasher = Sha3_256::new();
hasher.update(data);
hasher.finalize().to_vec()
}

@ -36,7 +36,7 @@ CREATE TABLE IF NOT EXISTS jobs (
argv TEXT NOT NULL, argv TEXT NOT NULL,
id UUID NOT NULL, id UUID NOT NULL,
exec_type JobType NOT NULL DEFAULT 'shell', exec_type JobType NOT NULL DEFAULT 'shell',
platform TEXT NOT NULL, target_platforms TEXT NOT NULL,
payload UUID, payload UUID,
schedule TEXT, schedule TEXT,

Loading…
Cancel
Save