diff --git a/src/error.rs b/src/error.rs index 297cf89..1ba2c5a 100644 --- a/src/error.rs +++ b/src/error.rs @@ -19,6 +19,9 @@ pub enum AppError { #[error(transparent)] DbError(#[from] sqlx::Error), + #[error(transparent)] + DbMigrateError(#[from] sqlx::migrate::MigrateError), + #[error(transparent)] StrumError(#[from] strum::ParseError), diff --git a/src/main.rs b/src/main.rs index 719ae7d..db60241 100644 --- a/src/main.rs +++ b/src/main.rs @@ -165,7 +165,7 @@ async fn _main() -> AppResult<()> { &config.poloniex_rest_url, &config.poloniex_ws_url, )); - let repo = Arc::new(SqliteRepo::new(&config.database_url).await?); + let repo = Arc::new(SqliteRepo::new_migrate(&config.database_url).await?); let base_start_time = NaiveDate::from_ymd_opt(2024, 12, 1) .unwrap() .and_hms_opt(0, 0, 0) @@ -221,9 +221,14 @@ async fn _main() -> AppResult<()> { let mut trades = poloniex_client.recent_trades_stream(&config.pairs).await?; while let Some(t) = trades.next().await { - println!("{t:?}"); - - let Ok(trade) = t else { break }; + let trade = match t { + Ok(t) => t, + Err(e) => { + println!("trade stream error: {e:?}"); + break; + } + }; + println!("inserting {trade:?}"); for interval in &config.intervals { tokio::spawn({ diff --git a/src/repos/sqlite.rs b/src/repos/sqlite.rs index 641015d..a8d7f97 100644 --- a/src/repos/sqlite.rs +++ b/src/repos/sqlite.rs @@ -1,4 +1,6 @@ -use sqlx::SqlitePool; +use std::str::FromStr; + +use sqlx::{sqlite::SqliteConnectOptions, SqlitePool}; use crate::{ error::{AppError, AppResult}, @@ -12,8 +14,14 @@ pub struct SqliteRepo { } impl SqliteRepo { - pub async fn new(db_url: &str) -> AppResult { - let pool = SqlitePool::connect(db_url).await?; + pub async fn new_migrate(db_url: &str) -> AppResult { + let conn_options = SqliteConnectOptions::from_str(db_url)?.create_if_missing(true); + + println!("trying to open db {db_url}"); + let pool = SqlitePool::connect_with(conn_options).await?; + + println!("applying migrations"); + sqlx::migrate!("./migrations").run(&pool).await?; Ok(Self { pool }) }