add cancellation on sighup

master
plazmoid 1 month ago
parent c448909f82
commit aaef6ed851
  1. 11
      Cargo.lock
  2. 3
      Cargo.toml
  3. 2
      README.md
  4. 3
      src/error.rs
  5. 32
      src/handlers.rs
  6. 23
      src/main.rs
  7. 9
      src/models.rs

11
Cargo.lock generated

@ -1338,6 +1338,15 @@ version = "1.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64"
[[package]]
name = "signal-hook-registry"
version = "1.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a9e9e0b4211b72e7b8b6e85c807d36c212bdb33ea8587f7569562a84df5465b1"
dependencies = [
"libc",
]
[[package]]
name = "slab"
version = "0.4.9"
@ -1504,6 +1513,7 @@ dependencies = [
"libc",
"mio",
"pin-project-lite",
"signal-hook-registry",
"socket2",
"tokio-macros",
"windows-sys 0.52.0",
@ -1646,6 +1656,7 @@ dependencies = [
"sled",
"thiserror",
"tokio",
"tokio-util",
]
[[package]]

@ -15,4 +15,5 @@ reqwest = "0.12.12"
serde = { version = "1.0.217", features = ["derive"] }
sled = "0.34.7"
thiserror = "2.0.11"
tokio = { version = "1.43.0", features = ["macros", "rt-multi-thread"] }
tokio = { version = "1.43.0", features = ["macros", "rt-multi-thread", "signal"] }
tokio-util = "0.7.13"

@ -13,7 +13,7 @@
- удаление дубликатов ссылок
- возможность указать размер пула обработчиков ссылок через URL_HANDLERS_POOL_SIZE
- логирование и чтение настроек логирования через RUST_LOG
- отмена загрузки ссылок на SIGHUP
Для отправки HTTP-запросов к сервису можно использовать дополнительную утилиту run.py:
```

@ -23,6 +23,9 @@ pub enum AppError {
#[error(transparent)]
Utf8Error(#[from] FromUtf8Error),
#[error("Cancelled: {0}")]
Cancelled(String),
}
impl IntoResponse for AppError {

@ -1,9 +1,10 @@
use std::collections::HashSet;
use axum::{extract::State, response::IntoResponse, Json};
use tokio::select;
use crate::{
error::AppResult,
error::{AppError, AppResult},
models::{AppStateShared, UrlReportResponse, UrlRequest, UrlResult},
repo::Repo,
};
@ -62,16 +63,24 @@ pub async fn upload_urls<R: Repo>(
tokio::spawn(async move {
let r_state = state.read().await;
let _sem = r_state.url_handling_semaphore.acquire().await.unwrap();
let ct = r_state.cancellation_token.clone();
info!("fetching {url}");
let response: AppResult<_> = (|| async {
let request = (|| async {
let resp = reqwest::get(url.to_string()).await?;
let status = resp.status().as_u16();
let result = resp.text().await?;
Ok((status, result))
})()
.await;
})();
let response: AppResult<_> = select! {
r = request => r,
_ = ct.cancelled() => {
info!("fetching {url} cancelled");
Err(AppError::Cancelled(url.to_string()))
}
};
match response {
Ok((status, resp)) => UrlResult {
@ -92,8 +101,14 @@ pub async fn upload_urls<R: Repo>(
.collect::<Vec<_>>()
};
let ct = state.read().await.cancellation_token.clone();
for task in tasks {
let result = task.await.unwrap();
let result = if let Some(result) = ct.run_until_cancelled(task).await {
result.unwrap()
} else {
continue;
};
let status = result.status.unwrap_or(0);
if status >= 200 && status < 400 {
@ -102,7 +117,12 @@ pub async fn upload_urls<R: Repo>(
failures.push(result);
}
}
state.write().await.repo.insert_urls(&successes)?;
info!("fetching completed");
let mut w_state = state.write().await;
w_state.repo.insert_urls(&successes)?;
w_state.regen_cancellation_token();
drop(w_state);
Ok(Json(UrlReportResponse {
successes,

@ -12,6 +12,7 @@ use error::{AppError, AppResult};
use models::AppState;
use repo::Repo;
use tokio::{net::TcpListener, sync::RwLock};
use tokio_util::sync::CancellationToken;
#[macro_use]
extern crate log;
@ -23,7 +24,27 @@ async fn run() -> AppResult<()> {
info!("{cfg:?}");
let repo = create_repo()?;
let state = Arc::new(RwLock::new(AppState::new(cfg.url_handlers_pool_size, repo)));
let ct = CancellationToken::new();
let state = Arc::new(RwLock::new(AppState::new(
cfg.url_handlers_pool_size,
repo,
ct.clone(),
)));
#[cfg(unix)]
{
use tokio::signal::unix::{signal, SignalKind};
let state = state.clone();
tokio::spawn(async move {
let mut signal = signal(SignalKind::hangup()).unwrap();
loop {
signal.recv().await;
state.read().await.cancellation_token.cancel();
info!("SIGHUP");
}
});
}
let app = Router::new()
.route(

@ -3,6 +3,7 @@ use std::{collections::HashSet, sync::Arc};
use axum::http::Uri;
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use tokio::sync::{RwLock, Semaphore};
use tokio_util::sync::CancellationToken;
use crate::repo::Repo;
@ -11,15 +12,21 @@ pub type AppStateShared<R> = Arc<RwLock<AppState<R>>>;
pub struct AppState<R: Repo> {
pub repo: R,
pub url_handling_semaphore: Semaphore,
pub cancellation_token: CancellationToken,
}
impl<R: Repo> AppState<R> {
pub fn new(url_handlers_pool_size: usize, repo: R) -> Self {
pub fn new(url_handlers_pool_size: usize, repo: R, ct: CancellationToken) -> Self {
Self {
repo,
url_handling_semaphore: Semaphore::new(url_handlers_pool_size),
cancellation_token: ct,
}
}
pub fn regen_cancellation_token(&mut self) {
self.cancellation_token = CancellationToken::new();
}
}
#[derive(Serialize)]

Loading…
Cancel
Save