From d7a17e7c324bedf01f8386efb4ab1853426a0bcf Mon Sep 17 00:00:00 2001 From: plazmoid Date: Mon, 10 Feb 2025 13:35:01 +0300 Subject: [PATCH] bugfix in pulling candles --- .env | 1 + src/config.rs | 2 ++ src/main.rs | 61 ++++++++++++++------------------------------------- src/models.rs | 13 ++++++++++- 4 files changed, 31 insertions(+), 46 deletions(-) diff --git a/.env b/.env index d990498..01bed97 100644 --- a/.env +++ b/.env @@ -1,3 +1,4 @@ +export START_FROM="2025-02-01T00:00:00" export PAIRS="BTC_USDT,TRX_USDT,ETH_USDT,DOGE_USDT,BCH_USDT" export INTERVALS="MINUTE_1,MINUTE_15,HOUR_1,DAY_1" export DATABASE_URL="sqlite://./poloniex_data.db" diff --git a/src/config.rs b/src/config.rs index e35a09a..375ba23 100644 --- a/src/config.rs +++ b/src/config.rs @@ -1,3 +1,4 @@ +use chrono::NaiveDateTime; use serde::{Deserialize, Deserializer}; use url::Url; @@ -8,6 +9,7 @@ use crate::{ #[derive(Deserialize)] pub struct Config { + pub start_from: NaiveDateTime, pub pairs: Vec, pub intervals: Vec, #[serde(deserialize_with = "deser_url")] diff --git a/src/main.rs b/src/main.rs index ea8c9a7..bcb7faf 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,6 +1,6 @@ use std::sync::Arc; -use chrono::{DateTime, NaiveDate, NaiveDateTime, TimeDelta, Utc}; +use chrono::{DateTime, NaiveDateTime, Utc}; use config::get_config; use error::AppResult; use futures_util::{future::try_join_all, StreamExt}; @@ -21,49 +21,35 @@ async fn fetch_candles_until_now( market_client: Arc, pair: String, interval: CandleInterval, - mut start_time: NaiveDateTime, + start_time: NaiveDateTime, ) -> AppResult<(Vec, String)> { let mut result = vec![]; - let limit = 500; + let mut end_time = Utc::now().naive_utc(); let mut is_completed = false; + // max limit is 500 + let limit = 500; + while !is_completed { println!( - "{}:{}: pulling candles from {start_time}", + "{}:{}: pulling candles from [{start_time}, {end_time}]", pair, interval.as_ref() ); let candles = market_client - .get_historical_candles(&pair, interval, start_time, Utc::now().naive_utc(), limit) + .get_historical_candles(&pair, interval, start_time, end_time, limit) .await?; - let Some(last_candle) = candles.last() else { - // больше нет свечей, скачали все возможные - break; - }; - - let last_candle_ts = last_candle.ts.and_utc().timestamp(); - let now = Utc::now().timestamp(); - - if last_candle_ts < now { - // если какие-то свечки недополучили из-за лимитов, - // смещаем запрашиваемый временной отрезок вправо - start_time = last_candle.ts + TimeDelta::seconds(1); + // если какие-то свечки недополучили из-за лимитов, + // смещаем запрашиваемый временной отрезок влево + let time_offset = interval.as_timedelta() * limit as i32; + end_time -= time_offset; - if start_time.and_utc().timestamp() >= now { - is_completed = true; - } - } else { + if end_time < start_time { is_completed = true; } - let candles_count = candles.len(); - result.extend(candles); - - if candles_count < limit { - is_completed = true; - } } println!( @@ -82,12 +68,7 @@ async fn calculate_new_candles( ) -> AppResult<()> { let is_buy = matches!(trade.taker_side, TradeDirection::Buy); let insert_new_candle = || async { - let interval_secs = match interval { - CandleInterval::M1 => 60, - CandleInterval::M15 => 60 * 15, - CandleInterval::H1 => 60 * 60, - CandleInterval::D1 => 60 * 60 * 24, - }; + let interval_secs = interval.as_timedelta().num_seconds(); let new_candle_ts = DateTime::from_timestamp( (trade.ts.and_utc().timestamp() / interval_secs) * interval_secs, 0, @@ -125,12 +106,7 @@ async fn calculate_new_candles( .await?; if let Some(mut last_candle) = last_candle { - let interval_delta = match last_candle.candle.interval { - CandleInterval::M1 => TimeDelta::minutes(1), - CandleInterval::M15 => TimeDelta::minutes(15), - CandleInterval::H1 => TimeDelta::hours(1), - CandleInterval::D1 => TimeDelta::days(1), - }; + let interval_delta = last_candle.candle.interval.as_timedelta(); // если трейд не входит в интервал последней свечи, то создаём новую свечу, иначе обновляем предыдущую if trade.ts > (last_candle.candle.ts + interval_delta) { @@ -173,11 +149,6 @@ async fn _main() -> AppResult<()> { &config.poloniex_ws_url, )); let repo = Arc::new(SqliteRepo::new_migrate(&config.database_url).await?); - let base_start_time = NaiveDate::from_ymd_opt(2024, 12, 1) - .unwrap() - .and_hms_opt(0, 0, 0) - .unwrap(); - let mut fetchers = vec![]; for pair in &config.pairs { @@ -189,7 +160,7 @@ async fn _main() -> AppResult<()> { match last_candle { Some(c) => c.candle.start_time, - None => base_start_time, + None => config.start_from, } }; diff --git a/src/models.rs b/src/models.rs index dda8e8c..0ce89ed 100644 --- a/src/models.rs +++ b/src/models.rs @@ -1,4 +1,4 @@ -use chrono::{DateTime, NaiveDateTime}; +use chrono::{DateTime, NaiveDateTime, TimeDelta}; use serde::{Deserialize, Deserializer, Serialize}; use serde_tuple::Deserialize_tuple; use sqlx::prelude::{FromRow, Type}; @@ -19,6 +19,17 @@ pub enum CandleInterval { D1, } +impl CandleInterval { + pub fn as_timedelta(&self) -> TimeDelta { + match self { + CandleInterval::M1 => TimeDelta::minutes(1), + CandleInterval::M15 => TimeDelta::minutes(15), + CandleInterval::H1 => TimeDelta::hours(1), + CandleInterval::D1 => TimeDelta::days(1), + } + } +} + #[derive(Debug, Deserialize_tuple, FromRow)] pub struct Candle { #[serde(deserialize_with = "deser_str_to_int")]