From 544c07cf8d558457f448c398446b887a4aa52da4 Mon Sep 17 00:00:00 2001 From: plazmoid Date: Fri, 7 Oct 2022 04:57:49 +0500 Subject: [PATCH] 4:43 AM commit back: - fix broken deps - add connection check to panel - pretty-print panel errors - touch timestamps of updates - simplify u_server api - simplify u_lib api front: - add routing - use async instead of rxjs - define tables' columns in templates, not in ts - use different templates for different tables --- Cargo.toml | 3 +- bin/u_panel/src/argparse.rs | 4 +- .../server/fe/src/app/app-routing.module.ts | 11 ++- .../src/server/fe/src/app/app.component.html | 17 ++-- .../fe/src/app/core/services/api.service.ts | 36 +++++--- .../src/app/core/tables/agent.component.html | 68 +++++++++++++++ .../fe/src/app/core/tables/agent.component.ts | 83 ++++++++++--------- .../tables/dialogs/agent-info-dialog.html | 3 + .../tables/dialogs/agent_info.component.ts | 2 +- .../fe/src/app/core/tables/job.component.html | 68 +++++++++++++++ .../fe/src/app/core/tables/job.component.ts | 39 +-------- .../src/app/core/tables/result.component.html | 68 +++++++++++++++ .../src/app/core/tables/result.component.ts | 47 +++-------- .../src/app/core/tables/table.component.html | 37 --------- .../src/app/core/tables/table.component.less | 3 +- .../fe/src/app/core/tables/table.component.ts | 44 ++++------ .../src/server/fe/src/app/core/utils.ts | 4 + bin/u_panel/src/server/mod.rs | 10 ++- bin/u_server/Cargo.toml | 2 +- bin/u_server/src/db.rs | 11 +-- bin/u_server/src/handlers.rs | 21 ++--- bin/u_server/src/u_server.rs | 17 ++-- integration/tests/fixtures/agent.rs | 5 +- lib/u_lib/Cargo.toml | 2 +- lib/u_lib/src/api.rs | 50 ++++++----- lib/u_lib/src/config.rs | 1 + lib/u_lib/src/logging.rs | 1 + lib/u_lib/src/messaging/mod.rs | 5 +- lib/u_lib/src/models/agent.rs | 4 + lib/u_lib/src/models/jobs/assigned.rs | 4 + lib/u_lib/src/runner.rs | 33 +++++--- 31 files changed, 427 insertions(+), 276 deletions(-) create mode 100644 bin/u_panel/src/server/fe/src/app/core/tables/agent.component.html create mode 100644 bin/u_panel/src/server/fe/src/app/core/tables/job.component.html create mode 100644 bin/u_panel/src/server/fe/src/app/core/tables/result.component.html delete mode 100644 bin/u_panel/src/server/fe/src/app/core/tables/table.component.html diff --git a/Cargo.toml b/Cargo.toml index 8f5feed..bf4d243 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,10 +9,11 @@ members = [ ] [workspace.dependencies] -anyhow = "1.0.58" +anyhow = "=1.0.63" reqwest = { version = "0.11", features = ["json"] } serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" +thiserror = "=1.0.31" tokio = { version = "1.11", features = ["macros"] } tracing = "0.1.35" tracing-appender = "0.2.0" diff --git a/bin/u_panel/src/argparse.rs b/bin/u_panel/src/argparse.rs index 80261f4..267f383 100644 --- a/bin/u_panel/src/argparse.rs +++ b/bin/u_panel/src/argparse.rs @@ -20,6 +20,7 @@ enum Cmd { Agents(RUD), Jobs(JobCRUD), Map(JobMapCRUD), + Ping, //TUI(TUIArgs), Serve, } @@ -119,13 +120,14 @@ pub async fn process_cmd(client: ClientHandler, args: Args) -> UResult { } JobMapCRUD::RUD(RUD::Delete { uid }) => to_json(client.del(uid).await), }, + Cmd::Ping => to_json(client.ping().await), /*Cmd::TUI(args) => crate::tui::init_tui(&args) .await .map_err(|e| UError::PanelError(e.to_string()))?,*/ Cmd::Serve => { crate::server::serve(client) .await - .map_err(|e| UError::PanelError(e.to_string()))?; + .map_err(|e| UError::PanelError(format!("{e:?}")))?; String::new() } }) diff --git a/bin/u_panel/src/server/fe/src/app/app-routing.module.ts b/bin/u_panel/src/server/fe/src/app/app-routing.module.ts index 0297262..0ec7bf4 100644 --- a/bin/u_panel/src/server/fe/src/app/app-routing.module.ts +++ b/bin/u_panel/src/server/fe/src/app/app-routing.module.ts @@ -1,7 +1,16 @@ import { NgModule } from '@angular/core'; import { RouterModule, Routes } from '@angular/router'; +import { AgentComponent } from './core/tables/agent.component'; +import { JobComponent } from './core/tables/job.component'; +import { ResultComponent } from './core/tables/result.component'; +import { AgentInfoDialogComponent } from './core/tables/dialogs/agent_info.component'; -const routes: Routes = []; +const routes: Routes = [ + { path: '', redirectTo: 'agents', pathMatch: 'full' }, + { path: 'agents', component: AgentComponent }, + { path: 'jobs', component: JobComponent }, + { path: 'results', component: ResultComponent }, +]; @NgModule({ imports: [RouterModule.forRoot(routes)], diff --git a/bin/u_panel/src/server/fe/src/app/app.component.html b/bin/u_panel/src/server/fe/src/app/app.component.html index 6aedf2e..8bb207a 100644 --- a/bin/u_panel/src/server/fe/src/app/app.component.html +++ b/bin/u_panel/src/server/fe/src/app/app.component.html @@ -1,11 +1,6 @@ - - - - - - - - - - - \ No newline at end of file + + \ No newline at end of file diff --git a/bin/u_panel/src/server/fe/src/app/core/services/api.service.ts b/bin/u_panel/src/server/fe/src/app/core/services/api.service.ts index cc04493..a7a609b 100644 --- a/bin/u_panel/src/server/fe/src/app/core/services/api.service.ts +++ b/bin/u_panel/src/server/fe/src/app/core/services/api.service.ts @@ -1,7 +1,7 @@ import { Injectable } from '@angular/core'; import { environment } from 'src/environments/environment'; import { HttpClient } from '@angular/common/http'; -import { Observable } from 'rxjs'; +import { firstValueFrom } from 'rxjs'; interface ServerResponse { status: "ok" | "err", @@ -17,27 +17,37 @@ export class ApiTableService { requestUrl = `${environment.server}/cmd/`; - req(cmd: string): Observable> { - return this.http.post>(this.requestUrl, cmd); + async req(cmd: string): Promise> { + return await firstValueFrom(this.http.post>(this.requestUrl, cmd)) } - getOne(id: string): Observable> { - return this.req(`${this.area} read ${id}`) + async getOne(id: string): Promise> { + const resp = await this.req(`${this.area} read ${id}`) + if (resp.data.length === 0) { + return { + status: 'err', + data: `${id} not found in ${this.area}` + } + } + return { + status: resp.status, + data: resp.data[0] + } } - getMany(): Observable> { - return this.req(`${this.area} read`) + async getMany(): Promise> { + return await this.req(`${this.area} read`) } - update(item: T): Observable> { - return this.req(`${this.area} update '${JSON.stringify(item)}'`) + async update(item: T): Promise> { + return await this.req(`${this.area} update '${JSON.stringify(item)}'`) } - delete(id: string): Observable> { - return this.req(`${this.area} delete ${id}`) + async delete(id: string): Promise> { + return await this.req(`${this.area} delete ${id}`) } - create(item: string): Observable> { - return this.req(`${this.area} create ${item}`) + async create(item: string): Promise> { + return await this.req(`${this.area} create ${item}`) } } \ No newline at end of file diff --git a/bin/u_panel/src/server/fe/src/app/core/tables/agent.component.html b/bin/u_panel/src/server/fe/src/app/core/tables/agent.component.html new file mode 100644 index 0000000..80318c1 --- /dev/null +++ b/bin/u_panel/src/server/fe/src/app/core/tables/agent.component.html @@ -0,0 +1,68 @@ +
+ +
+
+ +
+ + Filter + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
ID + {{row.id}} + Alias + {{row.alias}} + User + {{row.username}} + Hostname + {{row.hostname}} + Last active + {{row.last_active.secs_since_epoch * 1000 | date:'long'}} + + +
No data
+
+ + +
\ No newline at end of file diff --git a/bin/u_panel/src/server/fe/src/app/core/tables/agent.component.ts b/bin/u_panel/src/server/fe/src/app/core/tables/agent.component.ts index dea2e61..c7ae033 100644 --- a/bin/u_panel/src/server/fe/src/app/core/tables/agent.component.ts +++ b/bin/u_panel/src/server/fe/src/app/core/tables/agent.component.ts @@ -1,57 +1,62 @@ -import { Component, OnInit } from '@angular/core'; +import { Component, OnDestroy, OnInit } from '@angular/core'; import { TablesComponent } from './table.component'; import { AgentModel } from '../models'; import { AgentInfoDialogComponent } from './dialogs/agent_info.component'; import { HttpClient } from '@angular/common/http'; import { MatDialog } from '@angular/material/dialog'; import { epochToStr } from '../utils'; +import { ActivatedRoute, Router } from '@angular/router'; +import { emitErr } from '../utils'; +import { Subscription } from 'rxjs'; @Component({ selector: 'agent-table', - templateUrl: './table.component.html', + templateUrl: './agent.component.html', styleUrls: ['./table.component.less'] }) -export class AgentComponent extends TablesComponent { +export class AgentComponent extends TablesComponent implements OnDestroy, OnInit { - constructor(public override _httpClient: HttpClient, public override info_dlg: MatDialog) { + dialogSubscr!: Subscription; + area = 'agents' as const; + + displayedColumns = ['id', 'alias', 'username', 'hostname', 'last_active', 'actions'] + + constructor( + public override _httpClient: HttpClient, + public override info_dlg: MatDialog, + public route: ActivatedRoute, + public router: Router + ) { super(_httpClient, info_dlg); } - area = 'agents' as const; + override ngOnInit(): void { + super.ngOnInit() + this.dialogSubscr = this.route.queryParams.subscribe(params => { + const id = params['id'] + if (id) { + this.show_item_dialog(id); + } + }) + } + + show_item_dialog(id: string) { + this.data_source!.getOne(id).then(resp => { + if (resp.status === 'ok') { + const dialog = this.info_dlg.open(AgentInfoDialogComponent, { + data: resp.data as AgentModel + }); + + dialog.afterClosed().subscribe(result => { + this.router.navigate(['.'], { relativeTo: this.route }) + }) + } else { + emitErr(resp.data) + } + }).catch(emitErr) + } - columns = [ - { - def: "id", - name: "ID", - cell: (cell: AgentModel) => cell.id - }, - { - def: "alias", - name: "Alias", - cell: (cell: AgentModel) => cell.alias ?? "" - }, - { - def: "username", - name: "User", - cell: (cell: AgentModel) => cell.username - }, - { - def: "hostname", - name: "Host", - cell: (cell: AgentModel) => cell.hostname - }, - { - def: "last_active", - name: "Last active", - cell: (cell: AgentModel) => epochToStr(cell.last_active.secs_since_epoch) - }, - ] - - displayedColumns = this.columns.map((c) => c.def); - - show_item_dialog(obj: AgentModel) { - const dialog = this.info_dlg.open(AgentInfoDialogComponent, { - data: obj - }); + ngOnDestroy(): void { + this.dialogSubscr.unsubscribe() } } diff --git a/bin/u_panel/src/server/fe/src/app/core/tables/dialogs/agent-info-dialog.html b/bin/u_panel/src/server/fe/src/app/core/tables/dialogs/agent-info-dialog.html index eaf5e5a..2afdeab 100644 --- a/bin/u_panel/src/server/fe/src/app/core/tables/dialogs/agent-info-dialog.html +++ b/bin/u_panel/src/server/fe/src/app/core/tables/dialogs/agent-info-dialog.html @@ -1,9 +1,12 @@
+

ID: {{data.id}}

Alias: {{data.alias}}

Username: {{data.username}}

Hostname: {{data.hostname}}

Platform: {{data.platform}}

+

Registration time: {{data.regtime.secs_since_epoch * 1000 | date:'long'}}

+

Last active time: {{data.last_active.secs_since_epoch * 1000 | date:'long'}}

diff --git a/bin/u_panel/src/server/fe/src/app/core/tables/dialogs/agent_info.component.ts b/bin/u_panel/src/server/fe/src/app/core/tables/dialogs/agent_info.component.ts index 1c318ca..1d85364 100644 --- a/bin/u_panel/src/server/fe/src/app/core/tables/dialogs/agent_info.component.ts +++ b/bin/u_panel/src/server/fe/src/app/core/tables/dialogs/agent_info.component.ts @@ -8,4 +8,4 @@ import { AgentModel } from '../../models/agent.model'; }) export class AgentInfoDialogComponent { constructor(@Inject(MAT_DIALOG_DATA) public data: AgentModel) { } -} \ No newline at end of file +} \ No newline at end of file diff --git a/bin/u_panel/src/server/fe/src/app/core/tables/job.component.html b/bin/u_panel/src/server/fe/src/app/core/tables/job.component.html new file mode 100644 index 0000000..1234aa6 --- /dev/null +++ b/bin/u_panel/src/server/fe/src/app/core/tables/job.component.html @@ -0,0 +1,68 @@ +
+ +
+
+ +
+ + Filter + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
ID + {{row.id}} + Alias + {{row.alias}} + Cmd-line args + {{row.argv}} + Platform + {{row.platform}} + Payload + {{row.payload}} + Type + {{row.exec_type}} +
No data
+
+ + +
\ No newline at end of file diff --git a/bin/u_panel/src/server/fe/src/app/core/tables/job.component.ts b/bin/u_panel/src/server/fe/src/app/core/tables/job.component.ts index 28f9c92..0403bea 100644 --- a/bin/u_panel/src/server/fe/src/app/core/tables/job.component.ts +++ b/bin/u_panel/src/server/fe/src/app/core/tables/job.component.ts @@ -4,45 +4,12 @@ import { JobModel } from '../models'; @Component({ selector: 'job-table', - templateUrl: './table.component.html', + templateUrl: './job.component.html', styleUrls: ['./table.component.less'] }) export class JobComponent extends TablesComponent { area = 'jobs' as const; + displayedColumns = ['id', 'alias', 'argv', 'platform', 'payload', 'exec_type'] - columns = [ - { - def: "id", - name: "ID", - cell: (cell: JobModel) => cell.id - }, - { - def: "alias", - name: "Alias", - cell: (cell: JobModel) => cell.alias - }, - { - def: "argv", - name: "Cmd-line args", - cell: (cell: JobModel) => cell.argv - }, - { - def: "platform", - name: "Platform", - cell: (cell: JobModel) => cell.platform - }, - { - def: "payload", - name: "Payload", - cell: (cell: JobModel) => `${cell.payload}` - }, - { - def: "etype", - name: "Type", - cell: (cell: JobModel) => cell.exec_type - }, - ] - displayedColumns = this.columns.map((c) => c.def); - - show_item_dialog(obj: JobModel) { } + show_item_dialog(id: string) { } } diff --git a/bin/u_panel/src/server/fe/src/app/core/tables/result.component.html b/bin/u_panel/src/server/fe/src/app/core/tables/result.component.html new file mode 100644 index 0000000..e9abbd7 --- /dev/null +++ b/bin/u_panel/src/server/fe/src/app/core/tables/result.component.html @@ -0,0 +1,68 @@ +
+ +
+
+ +
+ + Filter + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
ID + {{row.id}} + Alias + {{row.alias}} + Agent + {{row.agent_id}} + Job + {{row.job_id}} + State + {{row.state}} {{(row.state === "Finished") ? '(' + row.retcode + ')' : ''}} + ID + {{row.updated.secs_since_epoch * 1000| date:'long'}} +
No data
+
+ + +
\ No newline at end of file diff --git a/bin/u_panel/src/server/fe/src/app/core/tables/result.component.ts b/bin/u_panel/src/server/fe/src/app/core/tables/result.component.ts index 3b7cade..b477094 100644 --- a/bin/u_panel/src/server/fe/src/app/core/tables/result.component.ts +++ b/bin/u_panel/src/server/fe/src/app/core/tables/result.component.ts @@ -4,48 +4,23 @@ import { ResultModel } from '../models'; import { epochToStr } from '../utils'; @Component({ - selector: 'result-table', - templateUrl: './table.component.html', + selector: 'results-table', + templateUrl: './result.component.html', styleUrls: ['./table.component.less'] }) export class ResultComponent extends TablesComponent { area = 'map' as const; - columns = [ - { - def: "id", - name: "ID", - cell: (cell: ResultModel) => cell.id - }, - { - def: "alias", - name: "Alias", - cell: (cell: ResultModel) => cell.alias - }, - { - def: "agent_id", - name: "Agent ID", - cell: (cell: ResultModel) => cell.agent_id - }, - { - def: "job_id", - name: "Job ID", - cell: (cell: ResultModel) => cell.job_id - }, - { - def: "state", - name: "State", - cell: (cell: ResultModel) => `${cell.state} `.concat((cell.state === "Finished") ? `(${cell.retcode})` : '') - }, - { - def: "last_updated", - name: "Last updated", - cell: (cell: ResultModel) => epochToStr(cell.updated.secs_since_epoch) - }, - ] - displayedColumns = this.columns.map((c) => c.def); + displayedColumns = [ + 'id', + 'alias', + 'agent_id', + 'job_id', + 'state', + 'last_updated' + ]; - show_item_dialog(obj: ResultModel) { + show_item_dialog(id: string) { } } diff --git a/bin/u_panel/src/server/fe/src/app/core/tables/table.component.html b/bin/u_panel/src/server/fe/src/app/core/tables/table.component.html deleted file mode 100644 index c54abd9..0000000 --- a/bin/u_panel/src/server/fe/src/app/core/tables/table.component.html +++ /dev/null @@ -1,37 +0,0 @@ -
- -
-
- -
- - Filter - - - - - - - - - - - - - - - - - -
- {{column.name}} - - {{column.cell(row)}} -
No data
-
- - -
\ No newline at end of file diff --git a/bin/u_panel/src/server/fe/src/app/core/tables/table.component.less b/bin/u_panel/src/server/fe/src/app/core/tables/table.component.less index 931742f..6b3ee6b 100644 --- a/bin/u_panel/src/server/fe/src/app/core/tables/table.component.less +++ b/bin/u_panel/src/server/fe/src/app/core/tables/table.component.less @@ -12,7 +12,7 @@ left: 0; bottom: 56px; right: 0; - background: rgba(0, 0, 0, 0.15); + //background: rgba(0, 0, 0, 0.15); z-index: 1; display: flex; align-items: center; @@ -25,5 +25,4 @@ .data-table-row:hover { background: whitesmoke; - cursor: pointer; } \ No newline at end of file diff --git a/bin/u_panel/src/server/fe/src/app/core/tables/table.component.ts b/bin/u_panel/src/server/fe/src/app/core/tables/table.component.ts index 39107d7..f0cb6da 100644 --- a/bin/u_panel/src/server/fe/src/app/core/tables/table.component.ts +++ b/bin/u_panel/src/server/fe/src/app/core/tables/table.component.ts @@ -9,7 +9,7 @@ import { MatDialog } from '@angular/material/dialog'; @Directive() export abstract class TablesComponent implements OnInit { abstract area: "agents" | "jobs" | "map"; - data_source!: ApiTableService | null; + data_source!: ApiTableService; table_data!: MatTableDataSource; isLoadingResults = true; @@ -20,34 +20,20 @@ export abstract class TablesComponent implements OnInit { ngOnInit() { this.data_source = new ApiTableService(this._httpClient, this.area); - this.fetch_many(); - // If the user changes the sort order, reset back to the first page. - //this.sort.sortChange.subscribe(() => (this.paginator.pageIndex = 0)); - + this.fetchMany(); } - fetch_many() { - timer(0) - .pipe( - startWith({}), - switchMap(() => { - this.isLoadingResults = true; - return this.data_source!.getMany().pipe(catchError(() => observableOf(null))); - }), - map(data => { - this.isLoadingResults = false; - - if (data === null) { - return "no data returned" - } - - // Only refresh the result length if there is new data. In case of rate - // limit errors, we do not want to reset the paginator to zero, as that - // would prevent users from re-triggering requests. - return data.data; - }), - ) - .subscribe(data => { if (typeof data !== 'string') { this.table_data.data = data } else { alert(`Error: ${data}`) } }); + async fetchMany() { + this.isLoadingResults = true; + //possibly needs try/catch + const data = await this.data_source!.getMany(); + this.isLoadingResults = false; + + if (typeof data.data !== 'string') { + this.table_data.data = data.data + } else { + alert(`Error: ${data}`) + }; } apply_filter(event: Event) { @@ -55,10 +41,8 @@ export abstract class TablesComponent implements OnInit { this.table_data.filter = filterValue.trim().toLowerCase(); } - abstract show_item_dialog(obj: T): void; - - abstract columns: ColumnDef[]; abstract displayedColumns: string[]; + abstract show_item_dialog(id: string): void; } type ColumnDef = { diff --git a/bin/u_panel/src/server/fe/src/app/core/utils.ts b/bin/u_panel/src/server/fe/src/app/core/utils.ts index 27aadd8..128a6a2 100644 --- a/bin/u_panel/src/server/fe/src/app/core/utils.ts +++ b/bin/u_panel/src/server/fe/src/app/core/utils.ts @@ -1,3 +1,7 @@ export function epochToStr(epoch: number): string { return new Date(epoch * 1000).toLocaleString('en-GB') +} + +export function emitErr(e: any) { + alert(e) } \ No newline at end of file diff --git a/bin/u_panel/src/server/mod.rs b/bin/u_panel/src/server/mod.rs index 49b08f7..1640e45 100644 --- a/bin/u_panel/src/server/mod.rs +++ b/bin/u_panel/src/server/mod.rs @@ -77,9 +77,12 @@ async fn send_cmd( ) } -pub async fn serve(client: ClientHandler) -> std::io::Result<()> { +pub async fn serve(client: ClientHandler) -> anyhow::Result<()> { + info!("Connecting to u_server..."); + client.ping().await?; + let addr = "127.0.0.1:8080"; - info!("Serving at http://{}", addr); + info!("Connected, instanciating u_panel at http://{}", addr); HttpServer::new(move || { App::new() @@ -92,5 +95,6 @@ pub async fn serve(client: ClientHandler) -> std::io::Result<()> { }) .bind(addr)? .run() - .await + .await?; + Ok(()) } diff --git a/bin/u_server/Cargo.toml b/bin/u_server/Cargo.toml index 474ecb1..1209d12 100644 --- a/bin/u_server/Cargo.toml +++ b/bin/u_server/Cargo.toml @@ -12,7 +12,7 @@ once_cell = "1.7.2" openssl = "*" serde = { workspace = true } serde_json = { workspace = true } -thiserror = "*" +thiserror = { workspace = true } tracing = { workspace = true } tokio = { workspace = true, features = ["macros"] } uuid = { workspace = true, features = ["serde", "v4"] } diff --git a/bin/u_server/src/db.rs b/bin/u_server/src/db.rs index e72b2f9..e18824f 100644 --- a/bin/u_server/src/db.rs +++ b/bin/u_server/src/db.rs @@ -152,13 +152,10 @@ impl UDB { let job_requests = job_uids .iter() - .map(|job_uid| { - debug!("set_jobs_for_agent: set {} for {}", job_uid, agent_uid); - AssignedJob { - job_id: *job_uid, - agent_id: *agent_uid, - ..Default::default() - } + .map(|job_uid| AssignedJob { + job_id: *job_uid, + agent_id: *agent_uid, + ..Default::default() }) .collect::>(); diff --git a/bin/u_server/src/handlers.rs b/bin/u_server/src/handlers.rs index 43139ab..39f3398 100644 --- a/bin/u_server/src/handlers.rs +++ b/bin/u_server/src/handlers.rs @@ -1,5 +1,3 @@ -use std::time::SystemTime; - use crate::db::UDB; use crate::error::Error; use u_lib::{ @@ -33,19 +31,22 @@ impl Endpoints { .map_err(From::from) } - pub async fn get_personal_jobs(uid: Option) -> EndpResult> { - let agents = UDB::lock_db().get_agents(uid)?; + pub async fn get_personal_jobs(uid: Uuid) -> EndpResult> { + let db = UDB::lock_db(); + let mut agents = db.get_agents(Some(uid))?; if agents.is_empty() { - let db = UDB::lock_db(); - db.insert_agent(&Agent::with_id(uid.unwrap()))?; + db.insert_agent(&Agent::with_id(uid))?; let job = db .find_job_by_alias("agent_hello")? .expect("agent_hello job not found"); - db.set_jobs_for_agent(&uid.unwrap(), &[job.id])?; + db.set_jobs_for_agent(&uid, &[job.id])?; + } else { + let mut agent = agents.pop().unwrap(); + agent.touch(); + db.update_agent(&agent)?; } - let result = UDB::lock_db().get_exact_jobs(uid, true)?; + let result = db.get_exact_jobs(Some(uid), true)?; - let db = UDB::lock_db(); for j in result.iter() { db.update_job_status(j.id, JobState::Running)?; } @@ -106,7 +107,7 @@ impl Endpoints { continue; } result.state = JobState::Finished; - result.updated = SystemTime::now(); + result.touch(); match result.exec_type { JobType::Init => match &result.result { Some(rbytes) => { diff --git a/bin/u_server/src/u_server.rs b/bin/u_server/src/u_server.rs index 4417e8f..70a5b9f 100644 --- a/bin/u_server/src/u_server.rs +++ b/bin/u_server/src/u_server.rs @@ -91,7 +91,7 @@ pub fn init_endpoints( .map(into_message); let get_personal_jobs = path("get_personal_jobs") - .and(warp::path::param::().map(Some)) + .and(warp::path::param::()) .and_then(Endpoints::get_personal_jobs) .map(into_message); @@ -131,6 +131,8 @@ pub fn init_endpoints( .and_then(Endpoints::download) .map(ok); + let ping = path("ping").map(reply); + let auth_token = format!("Bearer {auth_token}",).into_boxed_str(); let auth_header = warp::header::exact("authorization", Box::leak(auth_token)); @@ -143,7 +145,8 @@ pub fn init_endpoints( .or(update_agent) .or(update_job) .or(update_assigned_job) - .or(download)) + .or(download) + .or(ping)) .and(auth_header); let agent_zone = get_jobs.or(get_personal_jobs).or(report).or(download); @@ -151,7 +154,7 @@ pub fn init_endpoints( auth_zone.or(agent_zone) } -pub fn prefill_jobs() -> Result<(), ServerError> { +pub fn preload_jobs() -> Result<(), ServerError> { let job_alias = "agent_hello"; let if_job_exists = UDB::lock_db().find_job_by_alias(job_alias)?; if if_job_exists.is_none() { @@ -167,7 +170,7 @@ pub fn prefill_jobs() -> Result<(), ServerError> { pub async fn serve() -> Result<(), ServerError> { init_logger(Some("u_server")); - prefill_jobs()?; + preload_jobs()?; let certs_dir = PathBuf::from("certs"); let env = load_env::().map_err(|e| ServerError::Other(e.to_string()))?; @@ -192,6 +195,7 @@ async fn handle_rejection(rej: Rejection) -> Result { } else if rej.is_not_found() { RejResponse::not_found("not found placeholder") } else { + error!("{:?}", rej); RejResponse::internal() }; Ok(resp.into_response()) @@ -199,7 +203,7 @@ async fn handle_rejection(rej: Rejection) -> Result { fn logger(info: Info<'_>) { info!(target: "warp", - "{raddr} {agent_uid} \"{path}\"", + "{raddr} {agent_uid} \"{path}\" {status}", raddr = info.remote_addr().unwrap_or(([0, 0, 0, 0], 0).into()), path = info.path(), agent_uid = info.user_agent() @@ -207,7 +211,8 @@ fn logger(info: Info<'_>) { .take(2) .collect::() ) - .unwrap_or_else(|| "NO_AGENT".to_string()) + .unwrap_or_else(|| "NO_AGENT".to_string()), + status = info.status() ); } diff --git a/integration/tests/fixtures/agent.rs b/integration/tests/fixtures/agent.rs index 5f825d1..2e6cdce 100644 --- a/integration/tests/fixtures/agent.rs +++ b/integration/tests/fixtures/agent.rs @@ -29,10 +29,7 @@ pub async fn register_agent() -> RegisteredAgent { assert_eq!(job.alias, Some("agent_hello".to_string())); let mut agent_data = AssignedJob::from(&job); agent_data.agent_id = agent_uid; - agent_data.set_result(&Agent { - id: agent_uid, - ..Default::default() - }); + agent_data.set_result(&Agent::with_id(agent_uid)); cli.report(Reportable::Assigned(agent_data)).await.unwrap(); RegisteredAgent { uid: agent_uid } } diff --git a/lib/u_lib/Cargo.toml b/lib/u_lib/Cargo.toml index 24b943f..1696f34 100644 --- a/lib/u_lib/Cargo.toml +++ b/lib/u_lib/Cargo.toml @@ -26,7 +26,7 @@ shlex = "1.0.0" serde = { workspace = true } serde_json = { workspace = true } strum = { version = "0.20", features = ["derive"] } -thiserror = "*" +thiserror = { workspace = true } tokio = { workspace = true, features = ["rt-multi-thread", "sync", "macros", "process", "time"] } tracing-appender = { workspace = true } tracing-subscriber = { workspace = true, features = ["env-filter"] } diff --git a/lib/u_lib/src/api.rs b/lib/u_lib/src/api.rs index b18cc5c..b64913c 100644 --- a/lib/u_lib/src/api.rs +++ b/lib/u_lib/src/api.rs @@ -3,7 +3,7 @@ use std::fmt::Debug; use crate::{ config::{get_self_uid, MASTER_PORT}, - messaging::{self, AsMsg, BaseMessage, Empty}, + messaging::{self, AsMsg, BaseMessage}, models::{self}, utils::{opt_to_string, OneOrVec}, UError, @@ -48,17 +48,24 @@ impl ClientHandler { } } - async fn _req( + async fn req(&self, url: impl AsRef) -> Result { + self.req_with_payload(url, ()).await + } + + async fn req_with_payload( &self, url: impl AsRef, payload: P, - ) -> Result { + ) -> Result { let request = self .client .post(self.base_url.join(url.as_ref()).unwrap()) .json(&payload.as_message()); - let response = request.send().await.context("send")?; + let response = request + .send() + .await + .context("error while sending request")?; let content_len = response.content_length(); let is_success = match response.error_for_status_ref() { Ok(_) => Ok(()), @@ -67,7 +74,7 @@ impl ClientHandler { let resp = response.text().await.context("resp")?; debug!("url = {}, resp = {}", url.as_ref(), resp); match is_success { - Ok(_) => from_str::>(&resp) + Ok(_) => from_str::>(&resp) .map(|msg| msg.into_inner()) .or_else(|e| match content_len { Some(0) => Ok(Default::default()), @@ -81,24 +88,22 @@ impl ClientHandler { // get jobs for client pub async fn get_personal_jobs(&self, url_param: Uuid) -> Result> { - self._req(format!("get_personal_jobs/{}", url_param), Empty) - .await + self.req(format!("get_personal_jobs/{}", url_param)).await } // send something to server - pub async fn report(&self, payload: impl OneOrVec) -> Result { - self._req("report", payload.into_vec()).await + pub async fn report(&self, payload: impl OneOrVec) -> Result<()> { + self.req_with_payload("report", payload.into_vec()).await } // download file pub async fn dl(&self, file: String) -> Result> { - self._req(format!("dl/{file}"), Empty).await + self.req(format!("dl/{file}")).await } /// get all available jobs pub async fn get_jobs(&self, job: Option) -> Result> { - self._req(format!("get_jobs/{}", opt_to_string(job)), Empty) - .await + self.req(format!("get_jobs/{}", opt_to_string(job))).await } } @@ -107,23 +112,24 @@ impl ClientHandler { impl ClientHandler { /// agent listing pub async fn get_agents(&self, agent: Option) -> Result> { - self._req(format!("get_agents/{}", opt_to_string(agent)), Empty) + self.req(format!("get_agents/{}", opt_to_string(agent))) .await } /// update something - pub async fn update_item(&self, item: impl AsMsg + Debug) -> Result { - self._req("update_item", item).await + pub async fn update_item(&self, item: impl AsMsg + Debug) -> Result<()> { + self.req_with_payload("update_item", item).await } /// create and upload job - pub async fn upload_jobs(&self, payload: impl OneOrVec) -> Result { - self._req("upload_jobs", payload.into_vec()).await + pub async fn upload_jobs(&self, payload: impl OneOrVec) -> Result<()> { + self.req_with_payload("upload_jobs", payload.into_vec()) + .await } /// delete something pub async fn del(&self, item: Uuid) -> Result { - self._req(format!("del/{item}"), Empty).await + self.req(format!("del/{item}")).await } /// set jobs for any agent @@ -132,13 +138,17 @@ impl ClientHandler { agent: Uuid, job_idents: impl OneOrVec, ) -> Result> { - self._req(format!("set_jobs/{agent}"), job_idents.into_vec()) + self.req_with_payload(format!("set_jobs/{agent}"), job_idents.into_vec()) .await } /// get jobs for any agent pub async fn get_agent_jobs(&self, agent: Option) -> Result> { - self._req(format!("get_agent_jobs/{}", opt_to_string(agent)), Empty) + self.req(format!("get_agent_jobs/{}", opt_to_string(agent))) .await } + + pub async fn ping(&self) -> Result<()> { + self.req("ping").await + } } diff --git a/lib/u_lib/src/config.rs b/lib/u_lib/src/config.rs index 50d3408..3ab8fce 100644 --- a/lib/u_lib/src/config.rs +++ b/lib/u_lib/src/config.rs @@ -7,6 +7,7 @@ lazy_static! { static ref UID: Uuid = Uuid::new_v4(); } +#[inline] pub fn get_self_uid() -> Uuid { *UID } diff --git a/lib/u_lib/src/logging.rs b/lib/u_lib/src/logging.rs index 7c4b991..9c4f1db 100644 --- a/lib/u_lib/src/logging.rs +++ b/lib/u_lib/src/logging.rs @@ -12,6 +12,7 @@ pub fn init_logger(logfile: Option + Send + Sync + 'static>) { let reg = registry() .with(EnvFilter::from_default_env()) .with(fmt::layer()); + match logfile { Some(file) => reg .with( diff --git a/lib/u_lib/src/messaging/mod.rs b/lib/u_lib/src/messaging/mod.rs index b1342bf..f1e82e5 100644 --- a/lib/u_lib/src/messaging/mod.rs +++ b/lib/u_lib/src/messaging/mod.rs @@ -16,12 +16,9 @@ impl AsMsg for Reportable {} impl AsMsg for JobMeta {} impl AsMsg for String {} impl AsMsg for Uuid {} -impl AsMsg for Empty {} impl AsMsg for i32 {} impl AsMsg for u8 {} - -#[derive(Serialize, Deserialize, Clone, Default, Debug)] -pub struct Empty; +impl AsMsg for () {} #[derive(Serialize, Deserialize, Clone, PartialEq, Debug)] pub enum Reportable { diff --git a/lib/u_lib/src/models/agent.rs b/lib/u_lib/src/models/agent.rs index 227ce90..5ad5549 100644 --- a/lib/u_lib/src/models/agent.rs +++ b/lib/u_lib/src/models/agent.rs @@ -79,6 +79,10 @@ impl Agent { } } + pub fn touch(&mut self) { + self.last_active = SystemTime::now(); + } + #[cfg(unix)] pub async fn gather() -> Self { let mut builder = NamedJobRunner::from_shell(vec![ diff --git a/lib/u_lib/src/models/jobs/assigned.rs b/lib/u_lib/src/models/jobs/assigned.rs index 2230f70..78850a4 100644 --- a/lib/u_lib/src/models/jobs/assigned.rs +++ b/lib/u_lib/src/models/jobs/assigned.rs @@ -112,4 +112,8 @@ impl AssignedJob { pub fn set_result(&mut self, result: &S) { self.result = Some(serde_json::to_vec(result).unwrap()); } + + pub fn touch(&mut self) { + self.updated = SystemTime::now() + } } diff --git a/lib/u_lib/src/runner.rs b/lib/u_lib/src/runner.rs index 133a6c0..6383cea 100644 --- a/lib/u_lib/src/runner.rs +++ b/lib/u_lib/src/runner.rs @@ -12,24 +12,24 @@ use tokio::process::Command; pub struct JobRunner { waiter: Waiter, + is_running: bool, } impl JobRunner { pub fn from_jobs(jobs: impl OneOrVec) -> CombinedResult { let jobs = jobs.into_vec(); let mut waiter = Waiter::new(); - let mut result = CombinedResult::::new(); + let mut result = CombinedResult::new(); for job in jobs { //waiting for try-blocks stabilization - let built_job = (|| -> UResult<()> { + let built_job: UResult<()> = (|| { let meta = JobCache::get(job.job_id).ok_or(UError::NoJob(job.job_id))?; let curr_platform = Platform::current(); if !curr_platform.matches(&meta.platform) { return Err(UError::InsuitablePlatform( meta.platform.clone(), curr_platform.into_string(), - ) - .into()); + )); } let job = AssignedJob::from((&*meta, job)); waiter.push(run_assigned_job(job)); @@ -39,7 +39,10 @@ impl JobRunner { result.err(e) } } - result.ok(Self { waiter }); + result.ok(Self { + waiter, + is_running: false, + }); result } @@ -48,10 +51,10 @@ impl JobRunner { .into_vec() .into_iter() .map(|jm| { - let job_uid = jm.id; + let job_id = jm.id; JobCache::insert(jm); AssignedJobById { - job_id: job_uid, + job_id, ..Default::default() } }) @@ -62,17 +65,23 @@ impl JobRunner { /// Spawn jobs pub async fn spawn(mut self) -> Self { self.waiter = self.waiter.spawn().await; + self.is_running = true; self } /// Spawn jobs and wait for result pub async fn wait(self) -> Vec { - self.waiter.spawn().await.wait().await + let waiter = if !self.is_running { + self.spawn().await.waiter + } else { + self.waiter + }; + waiter.wait().await } /// Spawn one job and wait for result pub async fn wait_one(self) -> ExecResult { - self.waiter.spawn().await.wait().await.pop().unwrap() + self.wait().await.pop().unwrap() } } @@ -125,7 +134,7 @@ pub async fn run_assigned_job(mut job: AssignedJob) -> ExecResult { /// Store jobs and get results by name pub struct NamedJobRunner { - builder: Option, + runner: Option, job_names: Vec<&'static str>, results: HashMap<&'static str, ExecResult>, } @@ -163,14 +172,14 @@ impl NamedJobRunner { }) .collect(); Self { - builder: Some(JobRunner::from_meta(job_metas).unwrap_one()), + runner: Some(JobRunner::from_meta(job_metas).unwrap_one()), job_names, results: HashMap::new(), } } pub async fn wait(mut self) -> Self { - let results = self.builder.take().unwrap().wait().await; + let results = self.runner.take().unwrap().wait().await; for (name, result) in self.job_names.iter().zip(results.into_iter()) { self.results.insert(name, result); }