add candle appending, configs

master
plazmoid 1 month ago
parent ff280584d5
commit f206262618
  1. 5
      .env
  2. 17
      Cargo.lock
  3. 2
      Cargo.toml
  4. 24
      src/config.rs
  5. 3
      src/error.rs
  6. 105
      src/main.rs
  7. 88
      src/models.rs
  8. 29
      src/poloniex.rs
  9. 131
      src/repo.rs

@ -0,0 +1,5 @@
export PAIR="BTC_USDT"
export INTERVAL="MINUTE_1"
export POLONIEX_REST_URL="https://api.poloniex.com"
export POLONIEX_WS_URL="wss://ws.poloniex.com/ws/public"
export DB_NAME="poloniex_data.db"

17
Cargo.lock generated

@ -228,6 +228,12 @@ dependencies = [
"syn 2.0.98", "syn 2.0.98",
] ]
[[package]]
name = "dotenv"
version = "0.15.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "77c90badedccf4105eca100756a0b1289e191f6fcbdadd3cee1d2f614f97da8f"
[[package]] [[package]]
name = "encoding_rs" name = "encoding_rs"
version = "0.8.35" version = "0.8.35"
@ -237,6 +243,15 @@ dependencies = [
"cfg-if", "cfg-if",
] ]
[[package]]
name = "envy"
version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3f47e0157f2cb54f5ae1bd371b30a2ae4311e1c028f575cd4e81de7353215965"
dependencies = [
"serde",
]
[[package]] [[package]]
name = "equivalent" name = "equivalent"
version = "1.0.1" version = "1.0.1"
@ -935,6 +950,8 @@ version = "0.1.0"
dependencies = [ dependencies = [
"async-stream", "async-stream",
"chrono", "chrono",
"dotenv",
"envy",
"futures-util", "futures-util",
"reqwest", "reqwest",
"reqwest-websocket", "reqwest-websocket",

@ -6,6 +6,8 @@ edition = "2021"
[dependencies] [dependencies]
async-stream = "0.3.6" async-stream = "0.3.6"
chrono = { version = "0.4.39", features = ["serde"] } chrono = { version = "0.4.39", features = ["serde"] }
dotenv = "0.15.0"
envy = "0.4.2"
futures-util = "0.3.31" futures-util = "0.3.31"
reqwest = { version = "0.12.12", features = ["json"] } reqwest = { version = "0.12.12", features = ["json"] }
reqwest-websocket = "0.4.4" reqwest-websocket = "0.4.4"

@ -0,0 +1,24 @@
use serde::{Deserialize, Deserializer};
use url::Url;
use crate::models::CandleInterval;
#[derive(Deserialize)]
pub struct Config {
pub pair: String,
pub interval: CandleInterval,
#[serde(deserialize_with = "deser_url")]
pub poloniex_rest_url: Url,
#[serde(deserialize_with = "deser_url")]
pub poloniex_ws_url: Url,
pub db_name: String,
}
fn deser_url<'de, D: Deserializer<'de>>(deserialize: D) -> Result<Url, D::Error> {
let url = String::deserialize(deserialize)?;
url.parse().map_err(serde::de::Error::custom)
}
pub fn get_config() -> Config {
envy::from_env().unwrap()
}

@ -18,4 +18,7 @@ pub enum AppError {
#[error(transparent)] #[error(transparent)]
DbError(#[from] rusqlite::Error), DbError(#[from] rusqlite::Error),
#[error(transparent)]
StrumError(#[from] strum::ParseError),
} }

@ -1,10 +1,12 @@
use chrono::{NaiveDate, NaiveDateTime, TimeDelta, Utc}; use chrono::{DateTime, NaiveDate, NaiveDateTime, TimeDelta, Utc};
use config::get_config;
use error::AppResult; use error::AppResult;
use futures_util::StreamExt; use futures_util::StreamExt;
use models::{Candle, CandleInterval, Pair}; use models::{Candle, CandleInterval, PoloniuxCandle, TradeDirection};
use poloniex::PoloniexClient; use poloniex::PoloniexClient;
use repo::Repo; use repo::Repo;
mod config;
mod error; mod error;
mod models; mod models;
mod poloniex; mod poloniex;
@ -12,10 +14,10 @@ mod repo;
async fn fetch_candles_until_now( async fn fetch_candles_until_now(
poloniex_client: &PoloniexClient, poloniex_client: &PoloniexClient,
pair: &Pair, pair: &str,
interval: CandleInterval, interval: CandleInterval,
mut start_time: NaiveDateTime, mut start_time: NaiveDateTime,
) -> AppResult<Vec<Candle>> { ) -> AppResult<Vec<PoloniuxCandle>> {
let mut result = vec![]; let mut result = vec![];
loop { loop {
@ -51,28 +53,35 @@ async fn fetch_candles_until_now(
} }
async fn _main() -> AppResult<()> { async fn _main() -> AppResult<()> {
let poloniex_client = PoloniexClient::new( dotenv::dotenv().ok();
"https://api.poloniex.com",
"wss://ws.poloniex.com/ws/public", let config = get_config();
)?; let poloniex_client = PoloniexClient::new(&config.poloniex_rest_url, &config.poloniex_ws_url);
let repo = Repo::new_init("poloniex_data.db")?; let repo = Repo::new_init(config.db_name)?;
let start_time = NaiveDate::from_ymd_opt(2024, 12, 1) let start_time = NaiveDate::from_ymd_opt(2024, 12, 1)
.unwrap() .unwrap()
.and_hms_opt(0, 0, 0) .and_hms_opt(0, 0, 0)
.unwrap(); .unwrap();
let pair = Pair::new("BTC", "USDT"); let pair = config.pair.clone();
let candles = let candles =
fetch_candles_until_now(&poloniex_client, &pair, CandleInterval::M1, start_time).await?; fetch_candles_until_now(&poloniex_client, &pair, config.interval, start_time).await?;
println!("fetched {} candles", candles.len()); println!(
"{pair}: fetched {} candles with interval {}",
candles.len(),
config.interval.as_ref()
);
// нельзя так делать, нужно использовать транзакцию // нельзя так делать, нужно использовать транзакцию
// и батч-вставку для уменьшения количества обращений к бд, // и батч-вставку для уменьшения количества обращений к бд,
// но в контексте тестового и так сойдёт // но в контексте тестового и так сойдёт
for candle in candles { for candle in candles {
repo.insert_candle(&candle)?; repo.upsert_candle(&Candle {
candle,
pair: pair.to_string(),
})?;
} }
let mut trades = poloniex_client.recent_trades_stream(&pair).await?; let mut trades = poloniex_client.recent_trades_stream(&pair).await?;
@ -80,9 +89,75 @@ async fn _main() -> AppResult<()> {
while let Some(t) = trades.next().await { while let Some(t) = trades.next().await {
println!("{t:?}"); println!("{t:?}");
if let Ok(trade) = t { let Ok(trade) = t else { break };
repo.insert_trade(&trade)?; let mut last_candle = repo.get_latest_candle_from_interval(&pair, config.interval)?;
let interval_delta = match last_candle.candle.interval {
CandleInterval::M1 => TimeDelta::minutes(1),
CandleInterval::M15 => TimeDelta::minutes(15),
CandleInterval::H1 => TimeDelta::hours(1),
CandleInterval::D1 => TimeDelta::days(1),
};
let is_buy = matches!(trade.taker_side, TradeDirection::Buy);
// если трейд не входит в интервал последней свечи, то создаём новую свечу, иначе обновляем предыдущую
if trade.ts > (last_candle.candle.ts + interval_delta) {
let interval_secs = match last_candle.candle.interval {
CandleInterval::M1 => 60,
CandleInterval::M15 => 60 * 15,
CandleInterval::H1 => 60 * 60,
CandleInterval::D1 => 60 * 60 * 24,
};
let new_candle_ts = DateTime::from_timestamp(
(trade.ts.and_utc().timestamp() / interval_secs) * interval_secs,
0,
)
.unwrap()
.naive_utc();
let new_candle = Candle {
candle: PoloniuxCandle {
low: trade.price,
high: trade.price,
open: trade.price,
close: trade.price,
amount: trade.amount,
quantity: trade.quantity,
buy_taker_amount: if is_buy { trade.amount } else { 0.0 },
buy_taker_quantity: if is_buy { trade.quantity } else { 0.0 },
trade_count: 1,
ts: trade.ts,
weighted_average: trade.amount / trade.quantity,
interval: config.interval,
start_time: new_candle_ts,
close_time: NaiveDateTime::UNIX_EPOCH,
},
pair: trade.symbol.clone(),
};
repo.upsert_candle(&new_candle)?;
} else {
last_candle.candle.low = last_candle.candle.low.min(trade.price);
last_candle.candle.high = last_candle.candle.high.max(trade.price);
last_candle.candle.close = trade.price;
last_candle.candle.amount += trade.amount;
last_candle.candle.quantity += trade.quantity;
last_candle.candle.trade_count += 1;
last_candle.candle.ts = trade.ts;
last_candle.candle.weighted_average =
last_candle.candle.amount / last_candle.candle.quantity;
last_candle.candle.interval = config.interval;
last_candle.candle.close_time = trade.ts;
if is_buy {
last_candle.candle.buy_taker_amount += trade.amount;
last_candle.candle.buy_taker_quantity += trade.quantity;
} }
repo.upsert_candle(&last_candle)?;
}
repo.insert_trade(&trade)?;
} }
Ok(()) Ok(())
} }

@ -1,62 +1,64 @@
use std::fmt::{self, Display};
use chrono::{DateTime, NaiveDateTime}; use chrono::{DateTime, NaiveDateTime};
use serde::{Deserialize, Deserializer, Serialize}; use serde::{Deserialize, Deserializer, Serialize};
use serde_tuple::Deserialize_tuple; use serde_tuple::Deserialize_tuple;
pub struct Pair { #[derive(strum::EnumString, strum::AsRefStr, Clone, Copy, Deserialize, Debug)]
pub base: String,
pub quote: String,
}
impl Pair {
pub fn new(base: &str, quote: &str) -> Self {
Self {
base: base.to_string(),
quote: quote.to_string(),
}
}
}
impl Display for Pair {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}_{}", self.base, self.quote)
}
}
#[derive(strum::AsRefStr, Clone, Copy)]
pub enum CandleInterval { pub enum CandleInterval {
#[strum(serialize = "MINUTE_1")] #[strum(serialize = "MINUTE_1")]
#[serde(rename = "MINUTE_1")]
M1, M1,
#[strum(serialize = "MINUTE_15")] #[strum(serialize = "MINUTE_15")]
#[serde(rename = "MINUTE_15")]
M15, M15,
#[strum(serialize = "HOUR_1")] #[strum(serialize = "HOUR_1")]
#[serde(rename = "HOUR_1")]
H1, H1,
#[strum(serialize = "DAY_1")] #[strum(serialize = "DAY_1")]
#[serde(rename = "DAY_1")]
D1, D1,
} }
#[derive(Debug, Deserialize_tuple)] #[derive(Debug, Deserialize_tuple)]
pub struct Candle { pub struct PoloniuxCandle {
pub low: String, #[serde(deserialize_with = "deser_str_to_int")]
pub high: String, pub low: f64,
pub open: String, #[serde(deserialize_with = "deser_str_to_int")]
pub close: String, pub high: f64,
pub amount: String, #[serde(deserialize_with = "deser_str_to_int")]
pub quantity: String, pub open: f64,
pub buy_taker_amount: String, #[serde(deserialize_with = "deser_str_to_int")]
pub buy_taker_quantity: String, pub close: f64,
#[serde(deserialize_with = "deser_str_to_int")]
pub amount: f64,
#[serde(deserialize_with = "deser_str_to_int")]
pub quantity: f64,
#[serde(deserialize_with = "deser_str_to_int")]
pub buy_taker_amount: f64,
#[serde(deserialize_with = "deser_str_to_int")]
pub buy_taker_quantity: f64,
pub trade_count: i64, pub trade_count: i64,
#[serde(deserialize_with = "deser_naive_dt")] #[serde(deserialize_with = "deser_naive_dt")]
pub ts: NaiveDateTime, pub ts: NaiveDateTime,
pub weighted_average: String, #[serde(deserialize_with = "deser_str_to_int")]
pub interval: String, pub weighted_average: f64,
pub interval: CandleInterval,
#[serde(deserialize_with = "deser_naive_dt")] #[serde(deserialize_with = "deser_naive_dt")]
pub start_time: NaiveDateTime, pub start_time: NaiveDateTime,
#[serde(deserialize_with = "deser_naive_dt")] #[serde(deserialize_with = "deser_naive_dt")]
pub close_time: NaiveDateTime, pub close_time: NaiveDateTime,
} }
pub struct Candle {
pub candle: PoloniuxCandle,
pub pair: String,
}
fn deser_str_to_int<'de, D: Deserializer<'de>>(deserialize: D) -> Result<f64, D::Error> {
let s = String::deserialize(deserialize)?;
s.parse()
.map_err(|e| serde::de::Error::custom(format!("{e}: {s}")))
}
fn deser_naive_dt<'de, D: Deserializer<'de>>(deserialize: D) -> Result<NaiveDateTime, D::Error> { fn deser_naive_dt<'de, D: Deserializer<'de>>(deserialize: D) -> Result<NaiveDateTime, D::Error> {
let ts = Deserialize::deserialize(deserialize)?; let ts = Deserialize::deserialize(deserialize)?;
DateTime::from_timestamp_millis(ts) DateTime::from_timestamp_millis(ts)
@ -68,17 +70,27 @@ fn deser_naive_dt<'de, D: Deserializer<'de>>(deserialize: D) -> Result<NaiveDate
#[serde(rename_all = "camelCase")] #[serde(rename_all = "camelCase")]
pub struct Trade { pub struct Trade {
pub symbol: String, pub symbol: String,
pub amount: String, #[serde(deserialize_with = "deser_str_to_int")]
pub taker_side: String, pub amount: f64,
pub quantity: String, pub taker_side: TradeDirection,
#[serde(deserialize_with = "deser_str_to_int")]
pub quantity: f64,
#[serde(deserialize_with = "deser_naive_dt")] #[serde(deserialize_with = "deser_naive_dt")]
pub create_time: NaiveDateTime, pub create_time: NaiveDateTime,
pub price: String, #[serde(deserialize_with = "deser_str_to_int")]
pub price: f64,
pub id: String, pub id: String,
#[serde(deserialize_with = "deser_naive_dt")] #[serde(deserialize_with = "deser_naive_dt")]
pub ts: NaiveDateTime, pub ts: NaiveDateTime,
} }
#[derive(Deserialize, Debug, strum::AsRefStr)]
#[serde(rename_all = "camelCase")]
pub enum TradeDirection {
Buy,
Sell,
}
#[derive(Serialize)] #[derive(Serialize)]
pub struct SubscriptionRequest { pub struct SubscriptionRequest {
pub event: String, pub event: String,

@ -1,15 +1,14 @@
use std::pin::Pin; use std::pin::Pin;
use chrono::{NaiveDate, NaiveDateTime}; use chrono::NaiveDateTime;
use futures_util::{SinkExt, Stream, TryStreamExt as _}; use futures_util::{SinkExt, Stream, TryStreamExt as _};
use reqwest::Url; use reqwest::Url;
use reqwest_websocket::Message; use reqwest_websocket::Message;
use serde::{Deserialize, Serialize};
use crate::{ use crate::{
error::{AppError, AppResult}, error::{AppError, AppResult},
models::{ models::{
Candle, CandleInterval, Pair, SubscriptionRequest, SubscriptionResponse, CandleInterval, PoloniuxCandle, SubscriptionRequest, SubscriptionResponse,
SubscriptionResponseData, Trade, SubscriptionResponseData, Trade,
}, },
}; };
@ -20,26 +19,23 @@ pub struct PoloniexClient {
} }
impl PoloniexClient { impl PoloniexClient {
pub fn new(rest_base_url: &str, ws_base_url: &str) -> AppResult<Self> { pub fn new(rest_base_url: &Url, ws_base_url: &Url) -> Self {
let rest_base_url = rest_base_url.parse()?; Self {
let ws_base_url = ws_base_url.parse()?; rest_base_url: rest_base_url.to_owned(),
ws_base_url: ws_base_url.to_owned(),
Ok(Self { }
rest_base_url,
ws_base_url,
})
} }
pub async fn get_historical_candles( pub async fn get_historical_candles(
&self, &self,
pair: &Pair, pair: &str,
interval: CandleInterval, interval: CandleInterval,
start_date: NaiveDateTime, start_date: NaiveDateTime,
end_date: NaiveDateTime, end_date: NaiveDateTime,
) -> AppResult<Vec<Candle>> { ) -> AppResult<Vec<PoloniuxCandle>> {
let mut req = self let mut req = self
.rest_base_url .rest_base_url
.join(&format!("/markets/{}/candles", pair.to_string()))?; .join(&format!("/markets/{}/candles", pair))?;
req.query_pairs_mut() req.query_pairs_mut()
.append_pair( .append_pair(
@ -64,7 +60,7 @@ impl PoloniexClient {
pub async fn recent_trades_stream( pub async fn recent_trades_stream(
&self, &self,
pair: &Pair, pair: &str,
) -> AppResult<Pin<Box<dyn Stream<Item = AppResult<Trade>>>>> { ) -> AppResult<Pin<Box<dyn Stream<Item = AppResult<Trade>>>>> {
let mut ws = reqwest_websocket::websocket(self.ws_base_url.clone()).await?; let mut ws = reqwest_websocket::websocket(self.ws_base_url.clone()).await?;
@ -77,7 +73,7 @@ impl PoloniexClient {
ws.send(Message::Text(serde_json::to_string(&req)?)).await?; ws.send(Message::Text(serde_json::to_string(&req)?)).await?;
Ok(Box::pin(async_stream::stream! { Ok(Box::pin(async_stream::stream! {
while let Some(message) = ws.try_next().await.unwrap() { while let Some(message) = ws.try_next().await? {
match message { match message {
Message::Text(text) => { Message::Text(text) => {
if let Ok(sub) = serde_json::from_str::<SubscriptionResponse>(&text) { if let Ok(sub) = serde_json::from_str::<SubscriptionResponse>(&text) {
@ -85,7 +81,6 @@ impl PoloniexClient {
continue continue
} }
dbg!(&text);
let trades = serde_json::from_str::<SubscriptionResponseData<Trade>>(&text)?; let trades = serde_json::from_str::<SubscriptionResponseData<Trade>>(&text)?;
for trade in trades.data { for trade in trades.data {
yield Ok(trade) yield Ok(trade)

@ -1,10 +1,11 @@
use std::{fs, path::Path}; use std::{fs, path::Path, str::FromStr};
use chrono::DateTime;
use rusqlite::{self, params, Connection}; use rusqlite::{self, params, Connection};
use crate::{ use crate::{
error::{AppError, AppResult}, error::{AppError, AppResult},
models::{Candle, Trade}, models::{Candle, CandleInterval, PoloniuxCandle, Trade},
}; };
pub struct Repo { pub struct Repo {
@ -20,49 +21,49 @@ impl Repo {
let conn = Connection::open(path)?; let conn = Connection::open(path)?;
conn.execute( conn.execute_batch(
" "BEGIN;
CREATE TABLE IF NOT EXISTS trades( CREATE TABLE IF NOT EXISTS trades(
symbol TEXT NOT NULL, symbol TEXT NOT NULL,
amount TEXT NOT NULL, amount REAL NOT NULL,
taker_side TEXT NOT NULL, taker_side TEXT NOT NULL,
quantity TEXT NOT NULL, quantity REAL NOT NULL,
create_time INT NOT NULL, create_time INT NOT NULL,
price TEXT NOT NULL, price REAL NOT NULL,
id TEXT NOT NULL, id TEXT NOT NULL,
ts INT NOT NULL ts INT NOT NULL
); );
",
[],
)?;
conn.execute(
"
CREATE TABLE IF NOT EXISTS candles( CREATE TABLE IF NOT EXISTS candles(
low TEXT NOT NULL, low REAL NOT NULL,
high TEXT NOT NULL, high REAL NOT NULL,
open TEXT NOT NULL, open REAL NOT NULL,
close TEXT NOT NULL, close REAL NOT NULL,
amount TEXT NOT NULL, amount REAL NOT NULL,
quantity TEXT NOT NULL, quantity REAL NOT NULL,
buy_taker_amount TEXT NOT NULL, buy_taker_amount REAL NOT NULL,
buy_taker_quantity TEXT NOT NULL, buy_taker_quantity REAL NOT NULL,
trade_count INT NOT NULL, trade_count INT NOT NULL,
ts INT NOT NULL, ts INT NOT NULL,
weighted_average TEXT NOT NULL, weighted_average REAL NOT NULL,
interval TEXT NOT NULL, interval TEXT NOT NULL,
start_time INT NOT NULL, start_time INT NOT NULL,
close_time INT NOT NULL close_time INT NOT NULL,
pair TEXT NOT NULL,
PRIMARY KEY(pair, interval, start_time)
); );
",
[], COMMIT;",
)?; )?;
Ok(Self { conn }) Ok(Self { conn })
} }
pub fn insert_candle(&self, candle: &Candle) -> AppResult<usize> { pub fn upsert_candle(&self, candle: &Candle) -> AppResult<usize> {
let q = " let q = "
INSERT INTO candles( REPLACE INTO candles(
low, low,
high, high,
open, open,
@ -76,7 +77,8 @@ impl Repo {
weighted_average, weighted_average,
interval, interval,
start_time, start_time,
close_time close_time,
pair
) VALUES ( ) VALUES (
?1, ?1,
?2, ?2,
@ -91,27 +93,29 @@ impl Repo {
?11, ?11,
?12, ?12,
?13, ?13,
?14 ?14,
?15
) )
"; ";
self.conn self.conn
.execute( .execute(
q, q,
params![ params![
&candle.low, &candle.candle.low,
&candle.high, &candle.candle.high,
&candle.open, &candle.candle.open,
&candle.close, &candle.candle.close,
&candle.amount, &candle.candle.amount,
&candle.quantity, &candle.candle.quantity,
&candle.buy_taker_amount, &candle.candle.buy_taker_amount,
&candle.buy_taker_quantity, &candle.candle.buy_taker_quantity,
&candle.trade_count, &candle.candle.trade_count,
&candle.ts.and_utc().timestamp_millis(), &candle.candle.ts.and_utc().timestamp_millis(),
&candle.weighted_average, &candle.candle.weighted_average,
&candle.interval, &candle.candle.interval.as_ref(),
&candle.start_time.and_utc().timestamp_millis(), &candle.candle.start_time.and_utc().timestamp_millis(),
&candle.close_time.and_utc().timestamp_millis(), &candle.candle.close_time.and_utc().timestamp_millis(),
&candle.pair
], ],
) )
.map_err(AppError::from) .map_err(AppError::from)
@ -146,7 +150,7 @@ impl Repo {
params![ params![
&trade.symbol, &trade.symbol,
&trade.amount, &trade.amount,
&trade.taker_side, &trade.taker_side.as_ref(),
&trade.quantity, &trade.quantity,
&trade.create_time.and_utc().timestamp_millis(), &trade.create_time.and_utc().timestamp_millis(),
&trade.price, &trade.price,
@ -156,4 +160,45 @@ impl Repo {
) )
.map_err(AppError::from) .map_err(AppError::from)
} }
pub fn get_latest_candle_from_interval(
&self,
pair: &str,
interval: CandleInterval,
) -> AppResult<Candle> {
let q = "
SELECT * FROM candles
WHERE pair = ?1 AND interval = ?2
";
self.conn
.query_row(&q, params![pair, interval.as_ref()], |row| {
Ok(Candle {
candle: PoloniuxCandle {
low: row.get(0)?,
high: row.get(1)?,
open: row.get(2)?,
close: row.get(3)?,
amount: row.get(4)?,
quantity: row.get(5)?,
buy_taker_amount: row.get(6)?,
buy_taker_quantity: row.get(7)?,
trade_count: row.get(8)?,
ts: DateTime::from_timestamp(row.get(9)?, 0)
.unwrap()
.naive_local(),
weighted_average: row.get(10)?,
interval: FromStr::from_str(&row.get::<_, String>(11)?).unwrap(),
start_time: DateTime::from_timestamp(row.get(12)?, 0)
.unwrap()
.naive_local(),
close_time: DateTime::from_timestamp(row.get(13)?, 0)
.unwrap()
.naive_local(),
},
pair: row.get(14)?,
})
})
.map_err(AppError::from)
}
} }

Loading…
Cancel
Save