diff --git a/.env b/.env index abfa398..0cb9884 100644 --- a/.env +++ b/.env @@ -1,5 +1,6 @@ -export PAIR="BTC_USDT" -export INTERVAL="MINUTE_1" +export PAIRS="BTC_USDT,TRX_USDT,ETH_USDT,DOGE_USDT,BCH_USDT" +export INTERVALS="MINUTE_1,MINUTE_15,HOUR_1,DAY_1" +export DB_NAME="./poloniex_data.db" + export POLONIEX_REST_URL="https://api.poloniex.com" -export POLONIEX_WS_URL="wss://ws.poloniex.com/ws/public" -export DB_NAME="poloniex_data.db" \ No newline at end of file +export POLONIEX_WS_URL="wss://ws.poloniex.com/ws/public" \ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock index cf3c6ff..7389f91 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -54,6 +54,17 @@ dependencies = [ "syn 2.0.98", ] +[[package]] +name = "async-trait" +version = "0.1.86" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "644dd749086bf3771a2fbc5f256fdb982d53f011c7d5d560304eafeecebce79d" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.98", +] + [[package]] name = "async-tungstenite" version = "0.28.2" @@ -949,6 +960,7 @@ name = "poloniex_dump" version = "0.1.0" dependencies = [ "async-stream", + "async-trait", "chrono", "dotenv", "envy", diff --git a/Cargo.toml b/Cargo.toml index d9ebc94..cf0fb8f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,6 +5,7 @@ edition = "2021" [dependencies] async-stream = "0.3.6" +async-trait = "0.1.86" chrono = { version = "0.4.39", features = ["serde"] } dotenv = "0.15.0" envy = "0.4.2" diff --git a/src/config.rs b/src/config.rs index 04b6871..86b7ed4 100644 --- a/src/config.rs +++ b/src/config.rs @@ -1,12 +1,15 @@ use serde::{Deserialize, Deserializer}; use url::Url; -use crate::models::CandleInterval; +use crate::{ + error::{AppError, AppResult}, + models::CandleInterval, +}; #[derive(Deserialize)] pub struct Config { - pub pair: String, - pub interval: CandleInterval, + pub pairs: Vec, + pub intervals: Vec, #[serde(deserialize_with = "deser_url")] pub poloniex_rest_url: Url, #[serde(deserialize_with = "deser_url")] @@ -19,6 +22,6 @@ fn deser_url<'de, D: Deserializer<'de>>(deserialize: D) -> Result url.parse().map_err(serde::de::Error::custom) } -pub fn get_config() -> Config { - envy::from_env().unwrap() +pub fn get_config() -> AppResult { + envy::from_env().map_err(AppError::from) } diff --git a/src/error.rs b/src/error.rs index 190dfaf..fac890d 100644 --- a/src/error.rs +++ b/src/error.rs @@ -21,4 +21,7 @@ pub enum AppError { #[error(transparent)] StrumError(#[from] strum::ParseError), + + #[error(transparent)] + ConfigError(#[from] envy::Error), } diff --git a/src/main.rs b/src/main.rs index e3ad5a7..1ece8c0 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,30 +1,35 @@ +use std::sync::Arc; + use chrono::{DateTime, NaiveDate, NaiveDateTime, TimeDelta, Utc}; use config::get_config; use error::AppResult; -use futures_util::StreamExt; -use models::{Candle, CandleInterval, PoloniuxCandle, TradeDirection}; -use poloniex::PoloniexClient; -use repo::Repo; +use futures_util::{future::try_join_all, StreamExt}; +use markets::{poloniex::PoloniexClient, Market}; +use models::{Candle, CandleExtended, CandleInterval, TradeDirection}; +use repos::{sqlite::SqliteRepo, Repo}; mod config; mod error; +mod markets; mod models; -mod poloniex; -mod repo; +mod repos; + +#[macro_use] +extern crate async_trait; async fn fetch_candles_until_now( - poloniex_client: &PoloniexClient, - pair: &str, + market_client: Arc, + pair: String, interval: CandleInterval, mut start_time: NaiveDateTime, -) -> AppResult> { +) -> AppResult<(Vec, String)> { let mut result = vec![]; let limit = 500; loop { println!("pulling candles from {start_time}"); - let candles = poloniex_client - .get_historical_candles(pair, interval, start_time, Utc::now().naive_utc(), limit) + let candles = market_client + .get_historical_candles(&pair, interval, start_time, Utc::now().naive_utc(), limit) .await?; let Some(last_candle) = candles.last() else { @@ -56,48 +61,22 @@ async fn fetch_candles_until_now( } } - Ok(result) + Ok((result, pair.to_string())) } -async fn _main() -> AppResult<()> { - dotenv::dotenv().ok(); - - let config = get_config(); - let poloniex_client = PoloniexClient::new(&config.poloniex_rest_url, &config.poloniex_ws_url); - let repo = Repo::new_init(config.db_name)?; - - let start_time = NaiveDate::from_ymd_opt(2024, 12, 1) - .unwrap() - .and_hms_opt(0, 0, 0) - .unwrap(); - - let pair = config.pair.clone(); - let candles = - fetch_candles_until_now(&poloniex_client, &pair, config.interval, start_time).await?; - - println!( - "{pair}: fetched {} candles with interval {}", - candles.len(), - config.interval.as_ref() - ); - - // нельзя так делать, нужно использовать транзакцию - // и батч-вставку для уменьшения количества обращений к бд, - // но в контексте тестового и так сойдёт - for candle in candles { - repo.upsert_candle(&Candle { - candle, - pair: pair.to_string(), - })?; - } - - let mut trades = poloniex_client.recent_trades_stream(&pair).await?; +async fn trades_processor( + repo: Arc, + market_client: Arc, + pairs: &[String], + interval: CandleInterval, +) -> AppResult<()> { + let mut trades = market_client.recent_trades_stream(&pairs).await?; while let Some(t) = trades.next().await { println!("{t:?}"); let Ok(trade) = t else { break }; - let mut last_candle = repo.get_latest_candle_from_interval(&pair, config.interval)?; + let mut last_candle = repo.get_latest_candle_from_interval(&trade.symbol, interval)?; let interval_delta = match last_candle.candle.interval { CandleInterval::M1 => TimeDelta::minutes(1), CandleInterval::M15 => TimeDelta::minutes(15), @@ -122,8 +101,8 @@ async fn _main() -> AppResult<()> { .unwrap() .naive_utc(); - let new_candle = Candle { - candle: PoloniuxCandle { + let new_candle = CandleExtended { + candle: Candle { low: trade.price, high: trade.price, open: trade.price, @@ -135,7 +114,7 @@ async fn _main() -> AppResult<()> { trade_count: 1, ts: trade.ts, weighted_average: trade.amount / trade.quantity, - interval: config.interval, + interval, start_time: new_candle_ts, close_time: NaiveDateTime::UNIX_EPOCH, }, @@ -153,7 +132,7 @@ async fn _main() -> AppResult<()> { last_candle.candle.ts = trade.ts; last_candle.candle.weighted_average = last_candle.candle.amount / last_candle.candle.quantity; - last_candle.candle.interval = config.interval; + last_candle.candle.interval = interval; last_candle.candle.close_time = trade.ts; if is_buy { @@ -166,6 +145,72 @@ async fn _main() -> AppResult<()> { repo.insert_trade(&trade)?; } + + Ok(()) +} + +async fn _main() -> AppResult<()> { + dotenv::dotenv().ok(); + + let config = get_config()?; + let poloniex_client = Arc::new(PoloniexClient::new( + &config.poloniex_rest_url, + &config.poloniex_ws_url, + )); + let repo = Arc::new(SqliteRepo::new_init(config.db_name)?); + + let start_time = NaiveDate::from_ymd_opt(2024, 12, 1) + .unwrap() + .and_hms_opt(0, 0, 0) + .unwrap(); + + let mut fetchers = vec![]; + + for pair in &config.pairs { + for interval in &config.intervals { + let fetcher = fetch_candles_until_now( + poloniex_client.clone(), + pair.to_string(), + *interval, + start_time, + ); + fetchers.push(fetcher); + } + } + let fetched_candles = try_join_all(fetchers).await?; + + // println!( + // "{pair}: fetched {} candles with interval {}", + // candles.len(), + // config.interval.as_ref() + // ); + + // нельзя так делать, нужно использовать транзакцию + // и батч-вставку для уменьшения количества обращений к бд, + // но в контексте тестового и так сойдёт + for (candles, pair) in fetched_candles { + for candle in candles { + repo.upsert_candle(&CandleExtended { + candle, + pair: pair.clone(), + })?; + } + } + + for interval in &config.intervals { + tokio::spawn({ + let poloniex_client = poloniex_client.clone(); + let repo = repo.clone(); + let pairs = config.pairs.clone(); + let interval = *interval; + async move { + let result = trades_processor(repo, poloniex_client, &pairs, interval).await; + if let Err(e) = result { + eprintln!("processor stopped with error: {e}") + } + } + }); + } Ok(()) } diff --git a/src/markets/mod.rs b/src/markets/mod.rs new file mode 100644 index 0000000..ebb68df --- /dev/null +++ b/src/markets/mod.rs @@ -0,0 +1,28 @@ +use std::pin::Pin; + +use chrono::NaiveDateTime; +use futures_util::Stream; + +use crate::{ + error::AppResult, + models::{Candle, CandleInterval, Trade}, +}; + +pub mod poloniex; + +#[async_trait] +pub trait Market { + async fn get_historical_candles( + &self, + pair: &str, + interval: CandleInterval, + start_date: NaiveDateTime, + end_date: NaiveDateTime, + limit: usize, + ) -> AppResult>; + + async fn recent_trades_stream( + &self, + pairs: &[String], + ) -> AppResult> + Send>>>; +} diff --git a/src/poloniex.rs b/src/markets/poloniex.rs similarity index 87% rename from src/poloniex.rs rename to src/markets/poloniex.rs index 95816ee..924f661 100644 --- a/src/poloniex.rs +++ b/src/markets/poloniex.rs @@ -8,11 +8,13 @@ use reqwest_websocket::Message; use crate::{ error::{AppError, AppResult}, models::{ - CandleInterval, PoloniuxCandle, SubscriptionRequest, SubscriptionResponse, + Candle, CandleInterval, SubscriptionRequest, SubscriptionResponse, SubscriptionResponseData, Trade, }, }; +use super::Market; + pub struct PoloniexClient { rest_base_url: Url, ws_base_url: Url, @@ -25,15 +27,18 @@ impl PoloniexClient { ws_base_url: ws_base_url.to_owned(), } } +} - pub async fn get_historical_candles( +#[async_trait] +impl Market for PoloniexClient { + async fn get_historical_candles( &self, pair: &str, interval: CandleInterval, start_date: NaiveDateTime, end_date: NaiveDateTime, limit: usize, - ) -> AppResult> { + ) -> AppResult> { let mut req = self .rest_base_url .join(&format!("/markets/{}/candles", pair))?; @@ -59,16 +64,16 @@ impl PoloniexClient { result.json().await.map_err(AppError::from) } - pub async fn recent_trades_stream( + async fn recent_trades_stream( &self, - pair: &str, - ) -> AppResult>>>> { + pairs: &[String], + ) -> AppResult> + Send>>> { let mut ws = reqwest_websocket::websocket(self.ws_base_url.clone()).await?; let req = SubscriptionRequest { event: "subscribe".to_string(), channel: vec!["trades".to_string()], - symbols: vec![pair.to_string()], + symbols: pairs.to_vec(), }; ws.send(Message::Text(serde_json::to_string(&req)?)).await?; diff --git a/src/models.rs b/src/models.rs index 988212f..6628518 100644 --- a/src/models.rs +++ b/src/models.rs @@ -19,7 +19,7 @@ pub enum CandleInterval { } #[derive(Debug, Deserialize_tuple)] -pub struct PoloniuxCandle { +pub struct Candle { #[serde(deserialize_with = "deser_str_to_int")] pub low: f64, #[serde(deserialize_with = "deser_str_to_int")] @@ -48,8 +48,8 @@ pub struct PoloniuxCandle { pub close_time: NaiveDateTime, } -pub struct Candle { - pub candle: PoloniuxCandle, +pub struct CandleExtended { + pub candle: Candle, pub pair: String, } diff --git a/src/repos/mod.rs b/src/repos/mod.rs new file mode 100644 index 0000000..2acbcc3 --- /dev/null +++ b/src/repos/mod.rs @@ -0,0 +1,18 @@ +use crate::{ + error::AppResult, + models::{CandleExtended, CandleInterval, Trade}, +}; + +pub mod sqlite; + +pub trait Repo { + fn upsert_candle(&self, candle: &CandleExtended) -> AppResult; + + fn insert_trade(&self, trade: &Trade) -> AppResult; + + fn get_latest_candle_from_interval( + &self, + pair: &str, + interval: CandleInterval, + ) -> AppResult; +} diff --git a/src/repo.rs b/src/repos/sqlite.rs similarity index 92% rename from src/repo.rs rename to src/repos/sqlite.rs index c2cf2d1..9fca5ae 100644 --- a/src/repo.rs +++ b/src/repos/sqlite.rs @@ -5,14 +5,16 @@ use rusqlite::{self, params, Connection}; use crate::{ error::{AppError, AppResult}, - models::{Candle, CandleInterval, PoloniuxCandle, Trade}, + models::{Candle, CandleExtended, CandleInterval, Trade}, }; -pub struct Repo { +use super::Repo; + +pub struct SqliteRepo { conn: Connection, } -impl Repo { +impl SqliteRepo { pub fn new_init(db_path: impl AsRef) -> AppResult { let path = db_path.as_ref(); @@ -60,8 +62,10 @@ impl Repo { Ok(Self { conn }) } +} - pub fn upsert_candle(&self, candle: &Candle) -> AppResult { +impl Repo for SqliteRepo { + fn upsert_candle(&self, candle: &CandleExtended) -> AppResult { let q = " REPLACE INTO candles( low, @@ -121,7 +125,7 @@ impl Repo { .map_err(AppError::from) } - pub fn insert_trade(&self, trade: &Trade) -> AppResult { + fn insert_trade(&self, trade: &Trade) -> AppResult { let q = " INSERT INTO trades( symbol, @@ -161,11 +165,11 @@ impl Repo { .map_err(AppError::from) } - pub fn get_latest_candle_from_interval( + fn get_latest_candle_from_interval( &self, pair: &str, interval: CandleInterval, - ) -> AppResult { + ) -> AppResult { let q = " SELECT * FROM candles WHERE pair = ?1 AND interval = ?2 @@ -173,8 +177,8 @@ impl Repo { self.conn .query_row(&q, params![pair, interval.as_ref()], |row| { - Ok(Candle { - candle: PoloniuxCandle { + Ok(CandleExtended { + candle: Candle { low: row.get(0)?, high: row.get(1)?, open: row.get(2)?,