implement task

master
plazmoid 1 month ago
commit ff280584d5
  1. 1
      .gitignore
  2. 2015
      Cargo.lock
  3. 19
      Cargo.toml
  4. 21
      src/error.rs
  5. 95
      src/main.rs
  6. 102
      src/models.rs
  7. 103
      src/poloniex.rs
  8. 159
      src/repo.rs

1
.gitignore vendored

@ -0,0 +1 @@
/target

2015
Cargo.lock generated

File diff suppressed because it is too large Load Diff

@ -0,0 +1,19 @@
[package]
name = "poloniex_dump"
version = "0.1.0"
edition = "2021"
[dependencies]
async-stream = "0.3.6"
chrono = { version = "0.4.39", features = ["serde"] }
futures-util = "0.3.31"
reqwest = { version = "0.12.12", features = ["json"] }
reqwest-websocket = "0.4.4"
rusqlite = "0.33.0"
serde = { version = "1.0.217", features = ["derive"] }
serde_json = "1.0.138"
serde_tuple = "1.1.0"
strum = { version = "0.26.3", features = ["derive"] }
thiserror = "2.0.11"
tokio = { version = "1.43.0", features = ["rt-multi-thread", "macros"] }
url = "2.5.4"

@ -0,0 +1,21 @@
use url::ParseError;
pub type AppResult<R> = Result<R, AppError>;
#[derive(thiserror::Error, Debug)]
pub enum AppError {
#[error(transparent)]
UrlParseError(#[from] ParseError),
#[error(transparent)]
ReqwestError(#[from] reqwest::Error),
#[error(transparent)]
ReqwestWsError(#[from] reqwest_websocket::Error),
#[error(transparent)]
SerdeError(#[from] serde_json::Error),
#[error(transparent)]
DbError(#[from] rusqlite::Error),
}

@ -0,0 +1,95 @@
use chrono::{NaiveDate, NaiveDateTime, TimeDelta, Utc};
use error::AppResult;
use futures_util::StreamExt;
use models::{Candle, CandleInterval, Pair};
use poloniex::PoloniexClient;
use repo::Repo;
mod error;
mod models;
mod poloniex;
mod repo;
async fn fetch_candles_until_now(
poloniex_client: &PoloniexClient,
pair: &Pair,
interval: CandleInterval,
mut start_time: NaiveDateTime,
) -> AppResult<Vec<Candle>> {
let mut result = vec![];
loop {
let candles = poloniex_client
.get_historical_candles(pair, interval, start_time, Utc::now().naive_utc())
.await?;
let Some(last_candle) = candles.last() else {
// больше нет свечей, скачали все возможные
break;
};
let last_candle_ts = last_candle.ts.and_utc().timestamp();
let now = Utc::now().timestamp();
dbg!(last_candle_ts, now);
if last_candle_ts < now {
// если какие-то свечки недополучили из-за лимитов,
// смещаем запрашиваемый временной отрезок вправо
start_time = last_candle.ts + TimeDelta::seconds(1);
if start_time.and_utc().timestamp() >= now {
break;
}
} else {
break;
}
result.extend(candles);
}
Ok(result)
}
async fn _main() -> AppResult<()> {
let poloniex_client = PoloniexClient::new(
"https://api.poloniex.com",
"wss://ws.poloniex.com/ws/public",
)?;
let repo = Repo::new_init("poloniex_data.db")?;
let start_time = NaiveDate::from_ymd_opt(2024, 12, 1)
.unwrap()
.and_hms_opt(0, 0, 0)
.unwrap();
let pair = Pair::new("BTC", "USDT");
let candles =
fetch_candles_until_now(&poloniex_client, &pair, CandleInterval::M1, start_time).await?;
println!("fetched {} candles", candles.len());
// нельзя так делать, нужно использовать транзакцию
// и батч-вставку для уменьшения количества обращений к бд,
// но в контексте тестового и так сойдёт
for candle in candles {
repo.insert_candle(&candle)?;
}
let mut trades = poloniex_client.recent_trades_stream(&pair).await?;
while let Some(t) = trades.next().await {
println!("{t:?}");
if let Ok(trade) = t {
repo.insert_trade(&trade)?;
}
}
Ok(())
}
#[tokio::main]
async fn main() {
if let Err(e) = _main().await {
eprintln!("{e}");
}
}

@ -0,0 +1,102 @@
use std::fmt::{self, Display};
use chrono::{DateTime, NaiveDateTime};
use serde::{Deserialize, Deserializer, Serialize};
use serde_tuple::Deserialize_tuple;
pub struct Pair {
pub base: String,
pub quote: String,
}
impl Pair {
pub fn new(base: &str, quote: &str) -> Self {
Self {
base: base.to_string(),
quote: quote.to_string(),
}
}
}
impl Display for Pair {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}_{}", self.base, self.quote)
}
}
#[derive(strum::AsRefStr, Clone, Copy)]
pub enum CandleInterval {
#[strum(serialize = "MINUTE_1")]
M1,
#[strum(serialize = "MINUTE_15")]
M15,
#[strum(serialize = "HOUR_1")]
H1,
#[strum(serialize = "DAY_1")]
D1,
}
#[derive(Debug, Deserialize_tuple)]
pub struct Candle {
pub low: String,
pub high: String,
pub open: String,
pub close: String,
pub amount: String,
pub quantity: String,
pub buy_taker_amount: String,
pub buy_taker_quantity: String,
pub trade_count: i64,
#[serde(deserialize_with = "deser_naive_dt")]
pub ts: NaiveDateTime,
pub weighted_average: String,
pub interval: String,
#[serde(deserialize_with = "deser_naive_dt")]
pub start_time: NaiveDateTime,
#[serde(deserialize_with = "deser_naive_dt")]
pub close_time: NaiveDateTime,
}
fn deser_naive_dt<'de, D: Deserializer<'de>>(deserialize: D) -> Result<NaiveDateTime, D::Error> {
let ts = Deserialize::deserialize(deserialize)?;
DateTime::from_timestamp_millis(ts)
.ok_or(serde::de::Error::custom(format!("wrong timestamp '{ts}'")))
.map(|dt| dt.naive_utc())
}
#[derive(Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct Trade {
pub symbol: String,
pub amount: String,
pub taker_side: String,
pub quantity: String,
#[serde(deserialize_with = "deser_naive_dt")]
pub create_time: NaiveDateTime,
pub price: String,
pub id: String,
#[serde(deserialize_with = "deser_naive_dt")]
pub ts: NaiveDateTime,
}
#[derive(Serialize)]
pub struct SubscriptionRequest {
pub event: String,
pub channel: Vec<String>,
pub symbols: Vec<String>,
}
#[derive(Deserialize, Debug)]
#[allow(dead_code)]
pub struct SubscriptionResponse {
pub event: String,
pub channel: String,
pub symbols: Vec<String>,
}
#[derive(Deserialize, Debug)]
#[allow(dead_code)]
pub struct SubscriptionResponseData<T> {
pub channel: String,
pub data: Vec<T>,
}

@ -0,0 +1,103 @@
use std::pin::Pin;
use chrono::{NaiveDate, NaiveDateTime};
use futures_util::{SinkExt, Stream, TryStreamExt as _};
use reqwest::Url;
use reqwest_websocket::Message;
use serde::{Deserialize, Serialize};
use crate::{
error::{AppError, AppResult},
models::{
Candle, CandleInterval, Pair, SubscriptionRequest, SubscriptionResponse,
SubscriptionResponseData, Trade,
},
};
pub struct PoloniexClient {
rest_base_url: Url,
ws_base_url: Url,
}
impl PoloniexClient {
pub fn new(rest_base_url: &str, ws_base_url: &str) -> AppResult<Self> {
let rest_base_url = rest_base_url.parse()?;
let ws_base_url = ws_base_url.parse()?;
Ok(Self {
rest_base_url,
ws_base_url,
})
}
pub async fn get_historical_candles(
&self,
pair: &Pair,
interval: CandleInterval,
start_date: NaiveDateTime,
end_date: NaiveDateTime,
) -> AppResult<Vec<Candle>> {
let mut req = self
.rest_base_url
.join(&format!("/markets/{}/candles", pair.to_string()))?;
req.query_pairs_mut()
.append_pair(
"startTime",
&start_date.and_utc().timestamp_millis().to_string(),
)
.append_pair(
"endTime",
&end_date.and_utc().timestamp_millis().to_string(),
)
.append_pair("interval", interval.as_ref())
.append_pair("limit", "500");
let result = reqwest::get(req).await?;
if let Err(e) = result.error_for_status_ref() {
println!("{:?}", result.text().await);
return Err(e.into());
};
result.json().await.map_err(AppError::from)
}
pub async fn recent_trades_stream(
&self,
pair: &Pair,
) -> AppResult<Pin<Box<dyn Stream<Item = AppResult<Trade>>>>> {
let mut ws = reqwest_websocket::websocket(self.ws_base_url.clone()).await?;
let req = SubscriptionRequest {
event: "subscribe".to_string(),
channel: vec!["trades".to_string()],
symbols: vec![pair.to_string()],
};
ws.send(Message::Text(serde_json::to_string(&req)?)).await?;
Ok(Box::pin(async_stream::stream! {
while let Some(message) = ws.try_next().await.unwrap() {
match message {
Message::Text(text) => {
if let Ok(sub) = serde_json::from_str::<SubscriptionResponse>(&text) {
println!("{sub:?}");
continue
}
dbg!(&text);
let trades = serde_json::from_str::<SubscriptionResponseData<Trade>>(&text)?;
for trade in trades.data {
yield Ok(trade)
}
},
Message::Close {..} => {
eprintln!("trades stream closed: {message:?}");
break
},
m => eprintln!("unknown message {m:?}")
}
}
}))
}
}

@ -0,0 +1,159 @@
use std::{fs, path::Path};
use rusqlite::{self, params, Connection};
use crate::{
error::{AppError, AppResult},
models::{Candle, Trade},
};
pub struct Repo {
conn: Connection,
}
impl Repo {
pub fn new_init(db_path: impl AsRef<Path>) -> AppResult<Self> {
let path = db_path.as_ref();
// постоянно создаём новую бд для упрощения тестирования
fs::remove_file(path).ok();
let conn = Connection::open(path)?;
conn.execute(
"
CREATE TABLE IF NOT EXISTS trades(
symbol TEXT NOT NULL,
amount TEXT NOT NULL,
taker_side TEXT NOT NULL,
quantity TEXT NOT NULL,
create_time INT NOT NULL,
price TEXT NOT NULL,
id TEXT NOT NULL,
ts INT NOT NULL
);
",
[],
)?;
conn.execute(
"
CREATE TABLE IF NOT EXISTS candles(
low TEXT NOT NULL,
high TEXT NOT NULL,
open TEXT NOT NULL,
close TEXT NOT NULL,
amount TEXT NOT NULL,
quantity TEXT NOT NULL,
buy_taker_amount TEXT NOT NULL,
buy_taker_quantity TEXT NOT NULL,
trade_count INT NOT NULL,
ts INT NOT NULL,
weighted_average TEXT NOT NULL,
interval TEXT NOT NULL,
start_time INT NOT NULL,
close_time INT NOT NULL
);
",
[],
)?;
Ok(Self { conn })
}
pub fn insert_candle(&self, candle: &Candle) -> AppResult<usize> {
let q = "
INSERT INTO candles(
low,
high,
open,
close,
amount,
quantity,
buy_taker_amount,
buy_taker_quantity,
trade_count,
ts,
weighted_average,
interval,
start_time,
close_time
) VALUES (
?1,
?2,
?3,
?4,
?5,
?6,
?7,
?8,
?9,
?10,
?11,
?12,
?13,
?14
)
";
self.conn
.execute(
q,
params![
&candle.low,
&candle.high,
&candle.open,
&candle.close,
&candle.amount,
&candle.quantity,
&candle.buy_taker_amount,
&candle.buy_taker_quantity,
&candle.trade_count,
&candle.ts.and_utc().timestamp_millis(),
&candle.weighted_average,
&candle.interval,
&candle.start_time.and_utc().timestamp_millis(),
&candle.close_time.and_utc().timestamp_millis(),
],
)
.map_err(AppError::from)
}
pub fn insert_trade(&self, trade: &Trade) -> AppResult<usize> {
let q = "
INSERT INTO trades(
symbol,
amount,
taker_side,
quantity,
create_time,
price,
id,
ts
) VALUES (
?1,
?2,
?3,
?4,
?5,
?6,
?7,
?8
);
";
self.conn
.execute(
&q,
params![
&trade.symbol,
&trade.amount,
&trade.taker_side,
&trade.quantity,
&trade.create_time.and_utc().timestamp_millis(),
&trade.price,
&trade.id,
&trade.ts.and_utc().timestamp_millis()
],
)
.map_err(AppError::from)
}
}
Loading…
Cancel
Save