From f206262618925ab50f2d6499ad3e4632dd971b23 Mon Sep 17 00:00:00 2001 From: plazmoid Date: Tue, 4 Feb 2025 20:17:21 +0300 Subject: [PATCH] add candle appending, configs --- .env | 5 ++ Cargo.lock | 17 ++++++ Cargo.toml | 2 + src/config.rs | 24 ++++++++ src/error.rs | 3 + src/main.rs | 105 +++++++++++++++++++++++++++----- src/models.rs | 88 +++++++++++++++------------ src/poloniex.rs | 29 ++++----- src/repo.rs | 157 +++++++++++++++++++++++++++++++----------------- 9 files changed, 304 insertions(+), 126 deletions(-) create mode 100644 .env create mode 100644 src/config.rs diff --git a/.env b/.env new file mode 100644 index 0000000..abfa398 --- /dev/null +++ b/.env @@ -0,0 +1,5 @@ +export PAIR="BTC_USDT" +export INTERVAL="MINUTE_1" +export POLONIEX_REST_URL="https://api.poloniex.com" +export POLONIEX_WS_URL="wss://ws.poloniex.com/ws/public" +export DB_NAME="poloniex_data.db" \ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock index 453a96b..cf3c6ff 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -228,6 +228,12 @@ dependencies = [ "syn 2.0.98", ] +[[package]] +name = "dotenv" +version = "0.15.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77c90badedccf4105eca100756a0b1289e191f6fcbdadd3cee1d2f614f97da8f" + [[package]] name = "encoding_rs" version = "0.8.35" @@ -237,6 +243,15 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "envy" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f47e0157f2cb54f5ae1bd371b30a2ae4311e1c028f575cd4e81de7353215965" +dependencies = [ + "serde", +] + [[package]] name = "equivalent" version = "1.0.1" @@ -935,6 +950,8 @@ version = "0.1.0" dependencies = [ "async-stream", "chrono", + "dotenv", + "envy", "futures-util", "reqwest", "reqwest-websocket", diff --git a/Cargo.toml b/Cargo.toml index cd826b0..d9ebc94 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,6 +6,8 @@ edition = "2021" [dependencies] async-stream = "0.3.6" chrono = { version = "0.4.39", features = ["serde"] } +dotenv = "0.15.0" +envy = "0.4.2" futures-util = "0.3.31" reqwest = { version = "0.12.12", features = ["json"] } reqwest-websocket = "0.4.4" diff --git a/src/config.rs b/src/config.rs new file mode 100644 index 0000000..04b6871 --- /dev/null +++ b/src/config.rs @@ -0,0 +1,24 @@ +use serde::{Deserialize, Deserializer}; +use url::Url; + +use crate::models::CandleInterval; + +#[derive(Deserialize)] +pub struct Config { + pub pair: String, + pub interval: CandleInterval, + #[serde(deserialize_with = "deser_url")] + pub poloniex_rest_url: Url, + #[serde(deserialize_with = "deser_url")] + pub poloniex_ws_url: Url, + pub db_name: String, +} + +fn deser_url<'de, D: Deserializer<'de>>(deserialize: D) -> Result { + let url = String::deserialize(deserialize)?; + url.parse().map_err(serde::de::Error::custom) +} + +pub fn get_config() -> Config { + envy::from_env().unwrap() +} diff --git a/src/error.rs b/src/error.rs index 40dbd1d..190dfaf 100644 --- a/src/error.rs +++ b/src/error.rs @@ -18,4 +18,7 @@ pub enum AppError { #[error(transparent)] DbError(#[from] rusqlite::Error), + + #[error(transparent)] + StrumError(#[from] strum::ParseError), } diff --git a/src/main.rs b/src/main.rs index 4439854..82ca70a 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,10 +1,12 @@ -use chrono::{NaiveDate, NaiveDateTime, TimeDelta, Utc}; +use chrono::{DateTime, NaiveDate, NaiveDateTime, TimeDelta, Utc}; +use config::get_config; use error::AppResult; use futures_util::StreamExt; -use models::{Candle, CandleInterval, Pair}; +use models::{Candle, CandleInterval, PoloniuxCandle, TradeDirection}; use poloniex::PoloniexClient; use repo::Repo; +mod config; mod error; mod models; mod poloniex; @@ -12,10 +14,10 @@ mod repo; async fn fetch_candles_until_now( poloniex_client: &PoloniexClient, - pair: &Pair, + pair: &str, interval: CandleInterval, mut start_time: NaiveDateTime, -) -> AppResult> { +) -> AppResult> { let mut result = vec![]; loop { @@ -51,28 +53,35 @@ async fn fetch_candles_until_now( } async fn _main() -> AppResult<()> { - let poloniex_client = PoloniexClient::new( - "https://api.poloniex.com", - "wss://ws.poloniex.com/ws/public", - )?; - let repo = Repo::new_init("poloniex_data.db")?; + dotenv::dotenv().ok(); + + let config = get_config(); + let poloniex_client = PoloniexClient::new(&config.poloniex_rest_url, &config.poloniex_ws_url); + let repo = Repo::new_init(config.db_name)?; let start_time = NaiveDate::from_ymd_opt(2024, 12, 1) .unwrap() .and_hms_opt(0, 0, 0) .unwrap(); - let pair = Pair::new("BTC", "USDT"); + let pair = config.pair.clone(); let candles = - fetch_candles_until_now(&poloniex_client, &pair, CandleInterval::M1, start_time).await?; + fetch_candles_until_now(&poloniex_client, &pair, config.interval, start_time).await?; - println!("fetched {} candles", candles.len()); + println!( + "{pair}: fetched {} candles with interval {}", + candles.len(), + config.interval.as_ref() + ); // нельзя так делать, нужно использовать транзакцию // и батч-вставку для уменьшения количества обращений к бд, // но в контексте тестового и так сойдёт for candle in candles { - repo.insert_candle(&candle)?; + repo.upsert_candle(&Candle { + candle, + pair: pair.to_string(), + })?; } let mut trades = poloniex_client.recent_trades_stream(&pair).await?; @@ -80,9 +89,75 @@ async fn _main() -> AppResult<()> { while let Some(t) = trades.next().await { println!("{t:?}"); - if let Ok(trade) = t { - repo.insert_trade(&trade)?; + let Ok(trade) = t else { break }; + let mut last_candle = repo.get_latest_candle_from_interval(&pair, config.interval)?; + let interval_delta = match last_candle.candle.interval { + CandleInterval::M1 => TimeDelta::minutes(1), + CandleInterval::M15 => TimeDelta::minutes(15), + CandleInterval::H1 => TimeDelta::hours(1), + CandleInterval::D1 => TimeDelta::days(1), + }; + + let is_buy = matches!(trade.taker_side, TradeDirection::Buy); + + // если трейд не входит в интервал последней свечи, то создаём новую свечу, иначе обновляем предыдущую + if trade.ts > (last_candle.candle.ts + interval_delta) { + let interval_secs = match last_candle.candle.interval { + CandleInterval::M1 => 60, + CandleInterval::M15 => 60 * 15, + CandleInterval::H1 => 60 * 60, + CandleInterval::D1 => 60 * 60 * 24, + }; + let new_candle_ts = DateTime::from_timestamp( + (trade.ts.and_utc().timestamp() / interval_secs) * interval_secs, + 0, + ) + .unwrap() + .naive_utc(); + + let new_candle = Candle { + candle: PoloniuxCandle { + low: trade.price, + high: trade.price, + open: trade.price, + close: trade.price, + amount: trade.amount, + quantity: trade.quantity, + buy_taker_amount: if is_buy { trade.amount } else { 0.0 }, + buy_taker_quantity: if is_buy { trade.quantity } else { 0.0 }, + trade_count: 1, + ts: trade.ts, + weighted_average: trade.amount / trade.quantity, + interval: config.interval, + start_time: new_candle_ts, + close_time: NaiveDateTime::UNIX_EPOCH, + }, + pair: trade.symbol.clone(), + }; + + repo.upsert_candle(&new_candle)?; + } else { + last_candle.candle.low = last_candle.candle.low.min(trade.price); + last_candle.candle.high = last_candle.candle.high.max(trade.price); + last_candle.candle.close = trade.price; + last_candle.candle.amount += trade.amount; + last_candle.candle.quantity += trade.quantity; + last_candle.candle.trade_count += 1; + last_candle.candle.ts = trade.ts; + last_candle.candle.weighted_average = + last_candle.candle.amount / last_candle.candle.quantity; + last_candle.candle.interval = config.interval; + last_candle.candle.close_time = trade.ts; + + if is_buy { + last_candle.candle.buy_taker_amount += trade.amount; + last_candle.candle.buy_taker_quantity += trade.quantity; + } + + repo.upsert_candle(&last_candle)?; } + + repo.insert_trade(&trade)?; } Ok(()) } diff --git a/src/models.rs b/src/models.rs index eb06a30..988212f 100644 --- a/src/models.rs +++ b/src/models.rs @@ -1,62 +1,64 @@ -use std::fmt::{self, Display}; - use chrono::{DateTime, NaiveDateTime}; use serde::{Deserialize, Deserializer, Serialize}; use serde_tuple::Deserialize_tuple; -pub struct Pair { - pub base: String, - pub quote: String, -} - -impl Pair { - pub fn new(base: &str, quote: &str) -> Self { - Self { - base: base.to_string(), - quote: quote.to_string(), - } - } -} - -impl Display for Pair { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "{}_{}", self.base, self.quote) - } -} - -#[derive(strum::AsRefStr, Clone, Copy)] +#[derive(strum::EnumString, strum::AsRefStr, Clone, Copy, Deserialize, Debug)] pub enum CandleInterval { #[strum(serialize = "MINUTE_1")] + #[serde(rename = "MINUTE_1")] M1, #[strum(serialize = "MINUTE_15")] + #[serde(rename = "MINUTE_15")] M15, #[strum(serialize = "HOUR_1")] + #[serde(rename = "HOUR_1")] H1, #[strum(serialize = "DAY_1")] + #[serde(rename = "DAY_1")] D1, } #[derive(Debug, Deserialize_tuple)] -pub struct Candle { - pub low: String, - pub high: String, - pub open: String, - pub close: String, - pub amount: String, - pub quantity: String, - pub buy_taker_amount: String, - pub buy_taker_quantity: String, +pub struct PoloniuxCandle { + #[serde(deserialize_with = "deser_str_to_int")] + pub low: f64, + #[serde(deserialize_with = "deser_str_to_int")] + pub high: f64, + #[serde(deserialize_with = "deser_str_to_int")] + pub open: f64, + #[serde(deserialize_with = "deser_str_to_int")] + pub close: f64, + #[serde(deserialize_with = "deser_str_to_int")] + pub amount: f64, + #[serde(deserialize_with = "deser_str_to_int")] + pub quantity: f64, + #[serde(deserialize_with = "deser_str_to_int")] + pub buy_taker_amount: f64, + #[serde(deserialize_with = "deser_str_to_int")] + pub buy_taker_quantity: f64, pub trade_count: i64, #[serde(deserialize_with = "deser_naive_dt")] pub ts: NaiveDateTime, - pub weighted_average: String, - pub interval: String, + #[serde(deserialize_with = "deser_str_to_int")] + pub weighted_average: f64, + pub interval: CandleInterval, #[serde(deserialize_with = "deser_naive_dt")] pub start_time: NaiveDateTime, #[serde(deserialize_with = "deser_naive_dt")] pub close_time: NaiveDateTime, } +pub struct Candle { + pub candle: PoloniuxCandle, + pub pair: String, +} + +fn deser_str_to_int<'de, D: Deserializer<'de>>(deserialize: D) -> Result { + let s = String::deserialize(deserialize)?; + s.parse() + .map_err(|e| serde::de::Error::custom(format!("{e}: {s}"))) +} + fn deser_naive_dt<'de, D: Deserializer<'de>>(deserialize: D) -> Result { let ts = Deserialize::deserialize(deserialize)?; DateTime::from_timestamp_millis(ts) @@ -68,17 +70,27 @@ fn deser_naive_dt<'de, D: Deserializer<'de>>(deserialize: D) -> Result AppResult { - let rest_base_url = rest_base_url.parse()?; - let ws_base_url = ws_base_url.parse()?; - - Ok(Self { - rest_base_url, - ws_base_url, - }) + pub fn new(rest_base_url: &Url, ws_base_url: &Url) -> Self { + Self { + rest_base_url: rest_base_url.to_owned(), + ws_base_url: ws_base_url.to_owned(), + } } pub async fn get_historical_candles( &self, - pair: &Pair, + pair: &str, interval: CandleInterval, start_date: NaiveDateTime, end_date: NaiveDateTime, - ) -> AppResult> { + ) -> AppResult> { let mut req = self .rest_base_url - .join(&format!("/markets/{}/candles", pair.to_string()))?; + .join(&format!("/markets/{}/candles", pair))?; req.query_pairs_mut() .append_pair( @@ -64,7 +60,7 @@ impl PoloniexClient { pub async fn recent_trades_stream( &self, - pair: &Pair, + pair: &str, ) -> AppResult>>>> { let mut ws = reqwest_websocket::websocket(self.ws_base_url.clone()).await?; @@ -77,7 +73,7 @@ impl PoloniexClient { ws.send(Message::Text(serde_json::to_string(&req)?)).await?; Ok(Box::pin(async_stream::stream! { - while let Some(message) = ws.try_next().await.unwrap() { + while let Some(message) = ws.try_next().await? { match message { Message::Text(text) => { if let Ok(sub) = serde_json::from_str::(&text) { @@ -85,7 +81,6 @@ impl PoloniexClient { continue } - dbg!(&text); let trades = serde_json::from_str::>(&text)?; for trade in trades.data { yield Ok(trade) diff --git a/src/repo.rs b/src/repo.rs index cce06b0..c2cf2d1 100644 --- a/src/repo.rs +++ b/src/repo.rs @@ -1,10 +1,11 @@ -use std::{fs, path::Path}; +use std::{fs, path::Path, str::FromStr}; +use chrono::DateTime; use rusqlite::{self, params, Connection}; use crate::{ error::{AppError, AppResult}, - models::{Candle, Trade}, + models::{Candle, CandleInterval, PoloniuxCandle, Trade}, }; pub struct Repo { @@ -20,49 +21,49 @@ impl Repo { let conn = Connection::open(path)?; - conn.execute( - " - CREATE TABLE IF NOT EXISTS trades( - symbol TEXT NOT NULL, - amount TEXT NOT NULL, - taker_side TEXT NOT NULL, - quantity TEXT NOT NULL, - create_time INT NOT NULL, - price TEXT NOT NULL, - id TEXT NOT NULL, - ts INT NOT NULL - ); - ", - [], - )?; - conn.execute( - " - CREATE TABLE IF NOT EXISTS candles( - low TEXT NOT NULL, - high TEXT NOT NULL, - open TEXT NOT NULL, - close TEXT NOT NULL, - amount TEXT NOT NULL, - quantity TEXT NOT NULL, - buy_taker_amount TEXT NOT NULL, - buy_taker_quantity TEXT NOT NULL, - trade_count INT NOT NULL, - ts INT NOT NULL, - weighted_average TEXT NOT NULL, - interval TEXT NOT NULL, - start_time INT NOT NULL, - close_time INT NOT NULL - ); - ", - [], + conn.execute_batch( + "BEGIN; + + CREATE TABLE IF NOT EXISTS trades( + symbol TEXT NOT NULL, + amount REAL NOT NULL, + taker_side TEXT NOT NULL, + quantity REAL NOT NULL, + create_time INT NOT NULL, + price REAL NOT NULL, + id TEXT NOT NULL, + ts INT NOT NULL + ); + + CREATE TABLE IF NOT EXISTS candles( + low REAL NOT NULL, + high REAL NOT NULL, + open REAL NOT NULL, + close REAL NOT NULL, + amount REAL NOT NULL, + quantity REAL NOT NULL, + buy_taker_amount REAL NOT NULL, + buy_taker_quantity REAL NOT NULL, + trade_count INT NOT NULL, + ts INT NOT NULL, + weighted_average REAL NOT NULL, + interval TEXT NOT NULL, + start_time INT NOT NULL, + close_time INT NOT NULL, + pair TEXT NOT NULL, + + PRIMARY KEY(pair, interval, start_time) + ); + + COMMIT;", )?; Ok(Self { conn }) } - pub fn insert_candle(&self, candle: &Candle) -> AppResult { + pub fn upsert_candle(&self, candle: &Candle) -> AppResult { let q = " - INSERT INTO candles( + REPLACE INTO candles( low, high, open, @@ -76,7 +77,8 @@ impl Repo { weighted_average, interval, start_time, - close_time + close_time, + pair ) VALUES ( ?1, ?2, @@ -91,27 +93,29 @@ impl Repo { ?11, ?12, ?13, - ?14 + ?14, + ?15 ) "; self.conn .execute( q, params![ - &candle.low, - &candle.high, - &candle.open, - &candle.close, - &candle.amount, - &candle.quantity, - &candle.buy_taker_amount, - &candle.buy_taker_quantity, - &candle.trade_count, - &candle.ts.and_utc().timestamp_millis(), - &candle.weighted_average, - &candle.interval, - &candle.start_time.and_utc().timestamp_millis(), - &candle.close_time.and_utc().timestamp_millis(), + &candle.candle.low, + &candle.candle.high, + &candle.candle.open, + &candle.candle.close, + &candle.candle.amount, + &candle.candle.quantity, + &candle.candle.buy_taker_amount, + &candle.candle.buy_taker_quantity, + &candle.candle.trade_count, + &candle.candle.ts.and_utc().timestamp_millis(), + &candle.candle.weighted_average, + &candle.candle.interval.as_ref(), + &candle.candle.start_time.and_utc().timestamp_millis(), + &candle.candle.close_time.and_utc().timestamp_millis(), + &candle.pair ], ) .map_err(AppError::from) @@ -146,7 +150,7 @@ impl Repo { params![ &trade.symbol, &trade.amount, - &trade.taker_side, + &trade.taker_side.as_ref(), &trade.quantity, &trade.create_time.and_utc().timestamp_millis(), &trade.price, @@ -156,4 +160,45 @@ impl Repo { ) .map_err(AppError::from) } + + pub fn get_latest_candle_from_interval( + &self, + pair: &str, + interval: CandleInterval, + ) -> AppResult { + let q = " + SELECT * FROM candles + WHERE pair = ?1 AND interval = ?2 + "; + + self.conn + .query_row(&q, params![pair, interval.as_ref()], |row| { + Ok(Candle { + candle: PoloniuxCandle { + low: row.get(0)?, + high: row.get(1)?, + open: row.get(2)?, + close: row.get(3)?, + amount: row.get(4)?, + quantity: row.get(5)?, + buy_taker_amount: row.get(6)?, + buy_taker_quantity: row.get(7)?, + trade_count: row.get(8)?, + ts: DateTime::from_timestamp(row.get(9)?, 0) + .unwrap() + .naive_local(), + weighted_average: row.get(10)?, + interval: FromStr::from_str(&row.get::<_, String>(11)?).unwrap(), + start_time: DateTime::from_timestamp(row.get(12)?, 0) + .unwrap() + .naive_local(), + close_time: DateTime::from_timestamp(row.get(13)?, 0) + .unwrap() + .naive_local(), + }, + pair: row.get(14)?, + }) + }) + .map_err(AppError::from) + } }