diff --git a/Cargo.lock b/Cargo.lock index bcacca2..a898fde 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1338,6 +1338,15 @@ version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64" +[[package]] +name = "signal-hook-registry" +version = "1.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a9e9e0b4211b72e7b8b6e85c807d36c212bdb33ea8587f7569562a84df5465b1" +dependencies = [ + "libc", +] + [[package]] name = "slab" version = "0.4.9" @@ -1504,6 +1513,7 @@ dependencies = [ "libc", "mio", "pin-project-lite", + "signal-hook-registry", "socket2", "tokio-macros", "windows-sys 0.52.0", @@ -1646,6 +1656,7 @@ dependencies = [ "sled", "thiserror", "tokio", + "tokio-util", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 781dd0d..59eb04e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,4 +15,5 @@ reqwest = "0.12.12" serde = { version = "1.0.217", features = ["derive"] } sled = "0.34.7" thiserror = "2.0.11" -tokio = { version = "1.43.0", features = ["macros", "rt-multi-thread"] } +tokio = { version = "1.43.0", features = ["macros", "rt-multi-thread", "signal"] } +tokio-util = "0.7.13" diff --git a/README.md b/README.md index 1f6e2bc..3c2a82a 100644 --- a/README.md +++ b/README.md @@ -13,7 +13,7 @@ - удаление дубликатов ссылок - возможность указать размер пула обработчиков ссылок через URL_HANDLERS_POOL_SIZE - логирование и чтение настроек логирования через RUST_LOG - +- отмена загрузки ссылок на SIGHUP Для отправки HTTP-запросов к сервису можно использовать дополнительную утилиту run.py: ``` diff --git a/src/error.rs b/src/error.rs index 0a09ad3..b9616b5 100644 --- a/src/error.rs +++ b/src/error.rs @@ -23,6 +23,9 @@ pub enum AppError { #[error(transparent)] Utf8Error(#[from] FromUtf8Error), + + #[error("Cancelled: {0}")] + Cancelled(String), } impl IntoResponse for AppError { diff --git a/src/handlers.rs b/src/handlers.rs index 853d7a4..e6dd3c7 100644 --- a/src/handlers.rs +++ b/src/handlers.rs @@ -1,9 +1,10 @@ use std::collections::HashSet; use axum::{extract::State, response::IntoResponse, Json}; +use tokio::select; use crate::{ - error::AppResult, + error::{AppError, AppResult}, models::{AppStateShared, UrlReportResponse, UrlRequest, UrlResult}, repo::Repo, }; @@ -62,16 +63,24 @@ pub async fn upload_urls( tokio::spawn(async move { let r_state = state.read().await; let _sem = r_state.url_handling_semaphore.acquire().await.unwrap(); + let ct = r_state.cancellation_token.clone(); info!("fetching {url}"); - let response: AppResult<_> = (|| async { + let request = (|| async { let resp = reqwest::get(url.to_string()).await?; let status = resp.status().as_u16(); let result = resp.text().await?; Ok((status, result)) - })() - .await; + })(); + + let response: AppResult<_> = select! { + r = request => r, + _ = ct.cancelled() => { + info!("fetching {url} cancelled"); + Err(AppError::Cancelled(url.to_string())) + } + }; match response { Ok((status, resp)) => UrlResult { @@ -92,8 +101,14 @@ pub async fn upload_urls( .collect::>() }; + let ct = state.read().await.cancellation_token.clone(); for task in tasks { - let result = task.await.unwrap(); + let result = if let Some(result) = ct.run_until_cancelled(task).await { + result.unwrap() + } else { + continue; + }; + let status = result.status.unwrap_or(0); if status >= 200 && status < 400 { @@ -102,7 +117,12 @@ pub async fn upload_urls( failures.push(result); } } - state.write().await.repo.insert_urls(&successes)?; + info!("fetching completed"); + + let mut w_state = state.write().await; + w_state.repo.insert_urls(&successes)?; + w_state.regen_cancellation_token(); + drop(w_state); Ok(Json(UrlReportResponse { successes, diff --git a/src/main.rs b/src/main.rs index 731f773..edd23a8 100644 --- a/src/main.rs +++ b/src/main.rs @@ -12,6 +12,7 @@ use error::{AppError, AppResult}; use models::AppState; use repo::Repo; use tokio::{net::TcpListener, sync::RwLock}; +use tokio_util::sync::CancellationToken; #[macro_use] extern crate log; @@ -23,7 +24,27 @@ async fn run() -> AppResult<()> { info!("{cfg:?}"); let repo = create_repo()?; - let state = Arc::new(RwLock::new(AppState::new(cfg.url_handlers_pool_size, repo))); + let ct = CancellationToken::new(); + let state = Arc::new(RwLock::new(AppState::new( + cfg.url_handlers_pool_size, + repo, + ct.clone(), + ))); + + #[cfg(unix)] + { + use tokio::signal::unix::{signal, SignalKind}; + + let state = state.clone(); + tokio::spawn(async move { + let mut signal = signal(SignalKind::hangup()).unwrap(); + loop { + signal.recv().await; + state.read().await.cancellation_token.cancel(); + info!("SIGHUP"); + } + }); + } let app = Router::new() .route( diff --git a/src/models.rs b/src/models.rs index b4f7b1a..2bb8df9 100644 --- a/src/models.rs +++ b/src/models.rs @@ -3,6 +3,7 @@ use std::{collections::HashSet, sync::Arc}; use axum::http::Uri; use serde::{Deserialize, Deserializer, Serialize, Serializer}; use tokio::sync::{RwLock, Semaphore}; +use tokio_util::sync::CancellationToken; use crate::repo::Repo; @@ -11,15 +12,21 @@ pub type AppStateShared = Arc>>; pub struct AppState { pub repo: R, pub url_handling_semaphore: Semaphore, + pub cancellation_token: CancellationToken, } impl AppState { - pub fn new(url_handlers_pool_size: usize, repo: R) -> Self { + pub fn new(url_handlers_pool_size: usize, repo: R, ct: CancellationToken) -> Self { Self { repo, url_handling_semaphore: Semaphore::new(url_handlers_pool_size), + cancellation_token: ct, } } + + pub fn regen_cancellation_token(&mut self) { + self.cancellation_token = CancellationToken::new(); + } } #[derive(Serialize)]