add more interfaces & fixes

master
plazmoid 1 month ago
parent ebc5b14e42
commit 65cb0d875b
  1. 9
      .env
  2. 12
      Cargo.lock
  3. 1
      Cargo.toml
  4. 13
      src/config.rs
  5. 3
      src/error.rs
  6. 145
      src/main.rs
  7. 28
      src/markets/mod.rs
  8. 19
      src/markets/poloniex.rs
  9. 6
      src/models.rs
  10. 18
      src/repos/mod.rs
  11. 22
      src/repos/sqlite.rs

@ -1,5 +1,6 @@
export PAIR="BTC_USDT" export PAIRS="BTC_USDT,TRX_USDT,ETH_USDT,DOGE_USDT,BCH_USDT"
export INTERVAL="MINUTE_1" export INTERVALS="MINUTE_1,MINUTE_15,HOUR_1,DAY_1"
export DB_NAME="./poloniex_data.db"
export POLONIEX_REST_URL="https://api.poloniex.com" export POLONIEX_REST_URL="https://api.poloniex.com"
export POLONIEX_WS_URL="wss://ws.poloniex.com/ws/public" export POLONIEX_WS_URL="wss://ws.poloniex.com/ws/public"
export DB_NAME="poloniex_data.db"

12
Cargo.lock generated

@ -54,6 +54,17 @@ dependencies = [
"syn 2.0.98", "syn 2.0.98",
] ]
[[package]]
name = "async-trait"
version = "0.1.86"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "644dd749086bf3771a2fbc5f256fdb982d53f011c7d5d560304eafeecebce79d"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.98",
]
[[package]] [[package]]
name = "async-tungstenite" name = "async-tungstenite"
version = "0.28.2" version = "0.28.2"
@ -949,6 +960,7 @@ name = "poloniex_dump"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"async-stream", "async-stream",
"async-trait",
"chrono", "chrono",
"dotenv", "dotenv",
"envy", "envy",

@ -5,6 +5,7 @@ edition = "2021"
[dependencies] [dependencies]
async-stream = "0.3.6" async-stream = "0.3.6"
async-trait = "0.1.86"
chrono = { version = "0.4.39", features = ["serde"] } chrono = { version = "0.4.39", features = ["serde"] }
dotenv = "0.15.0" dotenv = "0.15.0"
envy = "0.4.2" envy = "0.4.2"

@ -1,12 +1,15 @@
use serde::{Deserialize, Deserializer}; use serde::{Deserialize, Deserializer};
use url::Url; use url::Url;
use crate::models::CandleInterval; use crate::{
error::{AppError, AppResult},
models::CandleInterval,
};
#[derive(Deserialize)] #[derive(Deserialize)]
pub struct Config { pub struct Config {
pub pair: String, pub pairs: Vec<String>,
pub interval: CandleInterval, pub intervals: Vec<CandleInterval>,
#[serde(deserialize_with = "deser_url")] #[serde(deserialize_with = "deser_url")]
pub poloniex_rest_url: Url, pub poloniex_rest_url: Url,
#[serde(deserialize_with = "deser_url")] #[serde(deserialize_with = "deser_url")]
@ -19,6 +22,6 @@ fn deser_url<'de, D: Deserializer<'de>>(deserialize: D) -> Result<Url, D::Error>
url.parse().map_err(serde::de::Error::custom) url.parse().map_err(serde::de::Error::custom)
} }
pub fn get_config() -> Config { pub fn get_config() -> AppResult<Config> {
envy::from_env().unwrap() envy::from_env().map_err(AppError::from)
} }

@ -21,4 +21,7 @@ pub enum AppError {
#[error(transparent)] #[error(transparent)]
StrumError(#[from] strum::ParseError), StrumError(#[from] strum::ParseError),
#[error(transparent)]
ConfigError(#[from] envy::Error),
} }

@ -1,30 +1,35 @@
use std::sync::Arc;
use chrono::{DateTime, NaiveDate, NaiveDateTime, TimeDelta, Utc}; use chrono::{DateTime, NaiveDate, NaiveDateTime, TimeDelta, Utc};
use config::get_config; use config::get_config;
use error::AppResult; use error::AppResult;
use futures_util::StreamExt; use futures_util::{future::try_join_all, StreamExt};
use models::{Candle, CandleInterval, PoloniuxCandle, TradeDirection}; use markets::{poloniex::PoloniexClient, Market};
use poloniex::PoloniexClient; use models::{Candle, CandleExtended, CandleInterval, TradeDirection};
use repo::Repo; use repos::{sqlite::SqliteRepo, Repo};
mod config; mod config;
mod error; mod error;
mod markets;
mod models; mod models;
mod poloniex; mod repos;
mod repo;
#[macro_use]
extern crate async_trait;
async fn fetch_candles_until_now( async fn fetch_candles_until_now(
poloniex_client: &PoloniexClient, market_client: Arc<impl Market>,
pair: &str, pair: String,
interval: CandleInterval, interval: CandleInterval,
mut start_time: NaiveDateTime, mut start_time: NaiveDateTime,
) -> AppResult<Vec<PoloniuxCandle>> { ) -> AppResult<(Vec<Candle>, String)> {
let mut result = vec![]; let mut result = vec![];
let limit = 500; let limit = 500;
loop { loop {
println!("pulling candles from {start_time}"); println!("pulling candles from {start_time}");
let candles = poloniex_client let candles = market_client
.get_historical_candles(pair, interval, start_time, Utc::now().naive_utc(), limit) .get_historical_candles(&pair, interval, start_time, Utc::now().naive_utc(), limit)
.await?; .await?;
let Some(last_candle) = candles.last() else { let Some(last_candle) = candles.last() else {
@ -56,48 +61,22 @@ async fn fetch_candles_until_now(
} }
} }
Ok(result) Ok((result, pair.to_string()))
} }
async fn _main() -> AppResult<()> { async fn trades_processor(
dotenv::dotenv().ok(); repo: Arc<impl Repo>,
market_client: Arc<impl Market>,
let config = get_config(); pairs: &[String],
let poloniex_client = PoloniexClient::new(&config.poloniex_rest_url, &config.poloniex_ws_url); interval: CandleInterval,
let repo = Repo::new_init(config.db_name)?; ) -> AppResult<()> {
let mut trades = market_client.recent_trades_stream(&pairs).await?;
let start_time = NaiveDate::from_ymd_opt(2024, 12, 1)
.unwrap()
.and_hms_opt(0, 0, 0)
.unwrap();
let pair = config.pair.clone();
let candles =
fetch_candles_until_now(&poloniex_client, &pair, config.interval, start_time).await?;
println!(
"{pair}: fetched {} candles with interval {}",
candles.len(),
config.interval.as_ref()
);
// нельзя так делать, нужно использовать транзакцию
// и батч-вставку для уменьшения количества обращений к бд,
// но в контексте тестового и так сойдёт
for candle in candles {
repo.upsert_candle(&Candle {
candle,
pair: pair.to_string(),
})?;
}
let mut trades = poloniex_client.recent_trades_stream(&pair).await?;
while let Some(t) = trades.next().await { while let Some(t) = trades.next().await {
println!("{t:?}"); println!("{t:?}");
let Ok(trade) = t else { break }; let Ok(trade) = t else { break };
let mut last_candle = repo.get_latest_candle_from_interval(&pair, config.interval)?; let mut last_candle = repo.get_latest_candle_from_interval(&trade.symbol, interval)?;
let interval_delta = match last_candle.candle.interval { let interval_delta = match last_candle.candle.interval {
CandleInterval::M1 => TimeDelta::minutes(1), CandleInterval::M1 => TimeDelta::minutes(1),
CandleInterval::M15 => TimeDelta::minutes(15), CandleInterval::M15 => TimeDelta::minutes(15),
@ -122,8 +101,8 @@ async fn _main() -> AppResult<()> {
.unwrap() .unwrap()
.naive_utc(); .naive_utc();
let new_candle = Candle { let new_candle = CandleExtended {
candle: PoloniuxCandle { candle: Candle {
low: trade.price, low: trade.price,
high: trade.price, high: trade.price,
open: trade.price, open: trade.price,
@ -135,7 +114,7 @@ async fn _main() -> AppResult<()> {
trade_count: 1, trade_count: 1,
ts: trade.ts, ts: trade.ts,
weighted_average: trade.amount / trade.quantity, weighted_average: trade.amount / trade.quantity,
interval: config.interval, interval,
start_time: new_candle_ts, start_time: new_candle_ts,
close_time: NaiveDateTime::UNIX_EPOCH, close_time: NaiveDateTime::UNIX_EPOCH,
}, },
@ -153,7 +132,7 @@ async fn _main() -> AppResult<()> {
last_candle.candle.ts = trade.ts; last_candle.candle.ts = trade.ts;
last_candle.candle.weighted_average = last_candle.candle.weighted_average =
last_candle.candle.amount / last_candle.candle.quantity; last_candle.candle.amount / last_candle.candle.quantity;
last_candle.candle.interval = config.interval; last_candle.candle.interval = interval;
last_candle.candle.close_time = trade.ts; last_candle.candle.close_time = trade.ts;
if is_buy { if is_buy {
@ -166,6 +145,72 @@ async fn _main() -> AppResult<()> {
repo.insert_trade(&trade)?; repo.insert_trade(&trade)?;
} }
Ok(())
}
async fn _main() -> AppResult<()> {
dotenv::dotenv().ok();
let config = get_config()?;
let poloniex_client = Arc::new(PoloniexClient::new(
&config.poloniex_rest_url,
&config.poloniex_ws_url,
));
let repo = Arc::new(SqliteRepo::new_init(config.db_name)?);
let start_time = NaiveDate::from_ymd_opt(2024, 12, 1)
.unwrap()
.and_hms_opt(0, 0, 0)
.unwrap();
let mut fetchers = vec![];
for pair in &config.pairs {
for interval in &config.intervals {
let fetcher = fetch_candles_until_now(
poloniex_client.clone(),
pair.to_string(),
*interval,
start_time,
);
fetchers.push(fetcher);
}
}
let fetched_candles = try_join_all(fetchers).await?;
// println!(
// "{pair}: fetched {} candles with interval {}",
// candles.len(),
// config.interval.as_ref()
// );
// нельзя так делать, нужно использовать транзакцию
// и батч-вставку для уменьшения количества обращений к бд,
// но в контексте тестового и так сойдёт
for (candles, pair) in fetched_candles {
for candle in candles {
repo.upsert_candle(&CandleExtended {
candle,
pair: pair.clone(),
})?;
}
}
for interval in &config.intervals {
tokio::spawn({
let poloniex_client = poloniex_client.clone();
let repo = repo.clone();
let pairs = config.pairs.clone();
let interval = *interval;
async move {
let result = trades_processor(repo, poloniex_client, &pairs, interval).await;
if let Err(e) = result {
eprintln!("processor stopped with error: {e}")
}
}
});
}
Ok(()) Ok(())
} }

@ -0,0 +1,28 @@
use std::pin::Pin;
use chrono::NaiveDateTime;
use futures_util::Stream;
use crate::{
error::AppResult,
models::{Candle, CandleInterval, Trade},
};
pub mod poloniex;
#[async_trait]
pub trait Market {
async fn get_historical_candles(
&self,
pair: &str,
interval: CandleInterval,
start_date: NaiveDateTime,
end_date: NaiveDateTime,
limit: usize,
) -> AppResult<Vec<Candle>>;
async fn recent_trades_stream(
&self,
pairs: &[String],
) -> AppResult<Pin<Box<dyn Stream<Item = AppResult<Trade>> + Send>>>;
}

@ -8,11 +8,13 @@ use reqwest_websocket::Message;
use crate::{ use crate::{
error::{AppError, AppResult}, error::{AppError, AppResult},
models::{ models::{
CandleInterval, PoloniuxCandle, SubscriptionRequest, SubscriptionResponse, Candle, CandleInterval, SubscriptionRequest, SubscriptionResponse,
SubscriptionResponseData, Trade, SubscriptionResponseData, Trade,
}, },
}; };
use super::Market;
pub struct PoloniexClient { pub struct PoloniexClient {
rest_base_url: Url, rest_base_url: Url,
ws_base_url: Url, ws_base_url: Url,
@ -25,15 +27,18 @@ impl PoloniexClient {
ws_base_url: ws_base_url.to_owned(), ws_base_url: ws_base_url.to_owned(),
} }
} }
}
pub async fn get_historical_candles( #[async_trait]
impl Market for PoloniexClient {
async fn get_historical_candles(
&self, &self,
pair: &str, pair: &str,
interval: CandleInterval, interval: CandleInterval,
start_date: NaiveDateTime, start_date: NaiveDateTime,
end_date: NaiveDateTime, end_date: NaiveDateTime,
limit: usize, limit: usize,
) -> AppResult<Vec<PoloniuxCandle>> { ) -> AppResult<Vec<Candle>> {
let mut req = self let mut req = self
.rest_base_url .rest_base_url
.join(&format!("/markets/{}/candles", pair))?; .join(&format!("/markets/{}/candles", pair))?;
@ -59,16 +64,16 @@ impl PoloniexClient {
result.json().await.map_err(AppError::from) result.json().await.map_err(AppError::from)
} }
pub async fn recent_trades_stream( async fn recent_trades_stream(
&self, &self,
pair: &str, pairs: &[String],
) -> AppResult<Pin<Box<dyn Stream<Item = AppResult<Trade>>>>> { ) -> AppResult<Pin<Box<dyn Stream<Item = AppResult<Trade>> + Send>>> {
let mut ws = reqwest_websocket::websocket(self.ws_base_url.clone()).await?; let mut ws = reqwest_websocket::websocket(self.ws_base_url.clone()).await?;
let req = SubscriptionRequest { let req = SubscriptionRequest {
event: "subscribe".to_string(), event: "subscribe".to_string(),
channel: vec!["trades".to_string()], channel: vec!["trades".to_string()],
symbols: vec![pair.to_string()], symbols: pairs.to_vec(),
}; };
ws.send(Message::Text(serde_json::to_string(&req)?)).await?; ws.send(Message::Text(serde_json::to_string(&req)?)).await?;

@ -19,7 +19,7 @@ pub enum CandleInterval {
} }
#[derive(Debug, Deserialize_tuple)] #[derive(Debug, Deserialize_tuple)]
pub struct PoloniuxCandle { pub struct Candle {
#[serde(deserialize_with = "deser_str_to_int")] #[serde(deserialize_with = "deser_str_to_int")]
pub low: f64, pub low: f64,
#[serde(deserialize_with = "deser_str_to_int")] #[serde(deserialize_with = "deser_str_to_int")]
@ -48,8 +48,8 @@ pub struct PoloniuxCandle {
pub close_time: NaiveDateTime, pub close_time: NaiveDateTime,
} }
pub struct Candle { pub struct CandleExtended {
pub candle: PoloniuxCandle, pub candle: Candle,
pub pair: String, pub pair: String,
} }

@ -0,0 +1,18 @@
use crate::{
error::AppResult,
models::{CandleExtended, CandleInterval, Trade},
};
pub mod sqlite;
pub trait Repo {
fn upsert_candle(&self, candle: &CandleExtended) -> AppResult<usize>;
fn insert_trade(&self, trade: &Trade) -> AppResult<usize>;
fn get_latest_candle_from_interval(
&self,
pair: &str,
interval: CandleInterval,
) -> AppResult<CandleExtended>;
}

@ -5,14 +5,16 @@ use rusqlite::{self, params, Connection};
use crate::{ use crate::{
error::{AppError, AppResult}, error::{AppError, AppResult},
models::{Candle, CandleInterval, PoloniuxCandle, Trade}, models::{Candle, CandleExtended, CandleInterval, Trade},
}; };
pub struct Repo { use super::Repo;
pub struct SqliteRepo {
conn: Connection, conn: Connection,
} }
impl Repo { impl SqliteRepo {
pub fn new_init(db_path: impl AsRef<Path>) -> AppResult<Self> { pub fn new_init(db_path: impl AsRef<Path>) -> AppResult<Self> {
let path = db_path.as_ref(); let path = db_path.as_ref();
@ -60,8 +62,10 @@ impl Repo {
Ok(Self { conn }) Ok(Self { conn })
} }
}
pub fn upsert_candle(&self, candle: &Candle) -> AppResult<usize> { impl Repo for SqliteRepo {
fn upsert_candle(&self, candle: &CandleExtended) -> AppResult<usize> {
let q = " let q = "
REPLACE INTO candles( REPLACE INTO candles(
low, low,
@ -121,7 +125,7 @@ impl Repo {
.map_err(AppError::from) .map_err(AppError::from)
} }
pub fn insert_trade(&self, trade: &Trade) -> AppResult<usize> { fn insert_trade(&self, trade: &Trade) -> AppResult<usize> {
let q = " let q = "
INSERT INTO trades( INSERT INTO trades(
symbol, symbol,
@ -161,11 +165,11 @@ impl Repo {
.map_err(AppError::from) .map_err(AppError::from)
} }
pub fn get_latest_candle_from_interval( fn get_latest_candle_from_interval(
&self, &self,
pair: &str, pair: &str,
interval: CandleInterval, interval: CandleInterval,
) -> AppResult<Candle> { ) -> AppResult<CandleExtended> {
let q = " let q = "
SELECT * FROM candles SELECT * FROM candles
WHERE pair = ?1 AND interval = ?2 WHERE pair = ?1 AND interval = ?2
@ -173,8 +177,8 @@ impl Repo {
self.conn self.conn
.query_row(&q, params![pair, interval.as_ref()], |row| { .query_row(&q, params![pair, interval.as_ref()], |row| {
Ok(Candle { Ok(CandleExtended {
candle: PoloniuxCandle { candle: Candle {
low: row.get(0)?, low: row.get(0)?,
high: row.get(1)?, high: row.get(1)?,
open: row.get(2)?, open: row.get(2)?,
Loading…
Cancel
Save