bugfix in pulling candles

master
plazmoid 4 weeks ago
parent 2a193ba016
commit d7a17e7c32
  1. 1
      .env
  2. 2
      src/config.rs
  3. 61
      src/main.rs
  4. 13
      src/models.rs

@ -1,3 +1,4 @@
export START_FROM="2025-02-01T00:00:00"
export PAIRS="BTC_USDT,TRX_USDT,ETH_USDT,DOGE_USDT,BCH_USDT"
export INTERVALS="MINUTE_1,MINUTE_15,HOUR_1,DAY_1"
export DATABASE_URL="sqlite://./poloniex_data.db"

@ -1,3 +1,4 @@
use chrono::NaiveDateTime;
use serde::{Deserialize, Deserializer};
use url::Url;
@ -8,6 +9,7 @@ use crate::{
#[derive(Deserialize)]
pub struct Config {
pub start_from: NaiveDateTime,
pub pairs: Vec<String>,
pub intervals: Vec<CandleInterval>,
#[serde(deserialize_with = "deser_url")]

@ -1,6 +1,6 @@
use std::sync::Arc;
use chrono::{DateTime, NaiveDate, NaiveDateTime, TimeDelta, Utc};
use chrono::{DateTime, NaiveDateTime, Utc};
use config::get_config;
use error::AppResult;
use futures_util::{future::try_join_all, StreamExt};
@ -21,49 +21,35 @@ async fn fetch_candles_until_now(
market_client: Arc<impl Market>,
pair: String,
interval: CandleInterval,
mut start_time: NaiveDateTime,
start_time: NaiveDateTime,
) -> AppResult<(Vec<Candle>, String)> {
let mut result = vec![];
let limit = 500;
let mut end_time = Utc::now().naive_utc();
let mut is_completed = false;
// max limit is 500
let limit = 500;
while !is_completed {
println!(
"{}:{}: pulling candles from {start_time}",
"{}:{}: pulling candles from [{start_time}, {end_time}]",
pair,
interval.as_ref()
);
let candles = market_client
.get_historical_candles(&pair, interval, start_time, Utc::now().naive_utc(), limit)
.get_historical_candles(&pair, interval, start_time, end_time, limit)
.await?;
let Some(last_candle) = candles.last() else {
// больше нет свечей, скачали все возможные
break;
};
let last_candle_ts = last_candle.ts.and_utc().timestamp();
let now = Utc::now().timestamp();
if last_candle_ts < now {
// если какие-то свечки недополучили из-за лимитов,
// смещаем запрашиваемый временной отрезок вправо
start_time = last_candle.ts + TimeDelta::seconds(1);
// если какие-то свечки недополучили из-за лимитов,
// смещаем запрашиваемый временной отрезок влево
let time_offset = interval.as_timedelta() * limit as i32;
end_time -= time_offset;
if start_time.and_utc().timestamp() >= now {
is_completed = true;
}
} else {
if end_time < start_time {
is_completed = true;
}
let candles_count = candles.len();
result.extend(candles);
if candles_count < limit {
is_completed = true;
}
}
println!(
@ -82,12 +68,7 @@ async fn calculate_new_candles(
) -> AppResult<()> {
let is_buy = matches!(trade.taker_side, TradeDirection::Buy);
let insert_new_candle = || async {
let interval_secs = match interval {
CandleInterval::M1 => 60,
CandleInterval::M15 => 60 * 15,
CandleInterval::H1 => 60 * 60,
CandleInterval::D1 => 60 * 60 * 24,
};
let interval_secs = interval.as_timedelta().num_seconds();
let new_candle_ts = DateTime::from_timestamp(
(trade.ts.and_utc().timestamp() / interval_secs) * interval_secs,
0,
@ -125,12 +106,7 @@ async fn calculate_new_candles(
.await?;
if let Some(mut last_candle) = last_candle {
let interval_delta = match last_candle.candle.interval {
CandleInterval::M1 => TimeDelta::minutes(1),
CandleInterval::M15 => TimeDelta::minutes(15),
CandleInterval::H1 => TimeDelta::hours(1),
CandleInterval::D1 => TimeDelta::days(1),
};
let interval_delta = last_candle.candle.interval.as_timedelta();
// если трейд не входит в интервал последней свечи, то создаём новую свечу, иначе обновляем предыдущую
if trade.ts > (last_candle.candle.ts + interval_delta) {
@ -173,11 +149,6 @@ async fn _main() -> AppResult<()> {
&config.poloniex_ws_url,
));
let repo = Arc::new(SqliteRepo::new_migrate(&config.database_url).await?);
let base_start_time = NaiveDate::from_ymd_opt(2024, 12, 1)
.unwrap()
.and_hms_opt(0, 0, 0)
.unwrap();
let mut fetchers = vec![];
for pair in &config.pairs {
@ -189,7 +160,7 @@ async fn _main() -> AppResult<()> {
match last_candle {
Some(c) => c.candle.start_time,
None => base_start_time,
None => config.start_from,
}
};

@ -1,4 +1,4 @@
use chrono::{DateTime, NaiveDateTime};
use chrono::{DateTime, NaiveDateTime, TimeDelta};
use serde::{Deserialize, Deserializer, Serialize};
use serde_tuple::Deserialize_tuple;
use sqlx::prelude::{FromRow, Type};
@ -19,6 +19,17 @@ pub enum CandleInterval {
D1,
}
impl CandleInterval {
pub fn as_timedelta(&self) -> TimeDelta {
match self {
CandleInterval::M1 => TimeDelta::minutes(1),
CandleInterval::M15 => TimeDelta::minutes(15),
CandleInterval::H1 => TimeDelta::hours(1),
CandleInterval::D1 => TimeDelta::days(1),
}
}
}
#[derive(Debug, Deserialize_tuple, FromRow)]
pub struct Candle {
#[serde(deserialize_with = "deser_str_to_int")]

Loading…
Cancel
Save