diff --git a/Cargo.lock b/Cargo.lock index 29725d67..9188185f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1045,6 +1045,16 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "fslock" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "04412b8935272e3a9bae6f48c7bfff74c2911f60525404edfdd28e49884c3bfb" +dependencies = [ + "libc", + "winapi", +] + [[package]] name = "funty" version = "2.0.0" @@ -2573,6 +2583,7 @@ dependencies = [ "serde", "serde_json", "serde_with", + "serial_test", "sqlx", "stablesats-ledger", "stablesats-shared", @@ -3074,6 +3085,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0e56dd856803e253c8f298af3f4d7eb0ae5e23a737252cd90bb4f3b435033b2d" dependencies = [ "dashmap", + "fslock", "futures", "lazy_static", "log", @@ -3555,6 +3567,7 @@ dependencies = [ "rust_decimal_macros", "serde", "serde_json", + "serial_test", "sqlx", "sqlx-ledger", "stablesats-shared", diff --git a/ledger/Cargo.toml b/ledger/Cargo.toml index b70f46ce..08d5bb72 100644 --- a/ledger/Cargo.toml +++ b/ledger/Cargo.toml @@ -26,3 +26,4 @@ uuid = { workspace = true } [dev-dependencies] anyhow = "1.0.70" +serial_test = { version = "*" , features = ["file_locks"] } diff --git a/ledger/tests/ledger.rs b/ledger/tests/ledger.rs index c79f85b2..496c1125 100644 --- a/ledger/tests/ledger.rs +++ b/ledger/tests/ledger.rs @@ -1,5 +1,6 @@ use rust_decimal::Decimal; use rust_decimal_macros::dec; +use serial_test::file_serial; use stablesats_ledger::*; @@ -134,6 +135,7 @@ async fn adjust_exchange_position() -> anyhow::Result<()> { } #[tokio::test] +#[file_serial] async fn buy_and_sell_quotes() -> anyhow::Result<()> { let pool = init_pool().await?; diff --git a/quotes-server/Cargo.toml b/quotes-server/Cargo.toml index 0ee44912..b1c5e33b 100644 --- a/quotes-server/Cargo.toml +++ b/quotes-server/Cargo.toml @@ -41,3 +41,4 @@ tonic-build = { version = "0.8", features = ["prost"] } anyhow = "1.0.70" serde = "1.0.158" serde_json = "1.0.93" +serial_test = { version = "*" , features = [ "file_locks"] } diff --git a/quotes-server/tests/quote_app.rs b/quotes-server/tests/quote_app.rs new file mode 100644 index 00000000..29889bc5 --- /dev/null +++ b/quotes-server/tests/quote_app.rs @@ -0,0 +1,113 @@ +use chrono::Duration; +use rust_decimal_macros::dec; +use serial_test::file_serial; + +use quotes_server::error::QuotesAppError; +use quotes_server::{ + app::*, cache::OrderBookCacheError, ExchangePriceCacheError, QuotesExchangePriceCacheConfig, + QuotesFeeCalculatorConfig, +}; + +use shared::{payload::*, pubsub::*, time::*}; +fn load_fixture() -> OrderBookPayload { + OrderBookPayload { + bids: [( + PriceRaw::from(dec!(0.001)), + VolumeInCentsRaw::from(dec!(100_000_000)), + )] + .into_iter() + .collect(), + asks: [( + PriceRaw::from(dec!(0.01)), + VolumeInCentsRaw::from(dec!(100_000_000)), + )] + .into_iter() + .collect(), + timestamp: TimeStamp::from(10000000), + exchange: "okex".into(), + } +} + +#[tokio::test] +#[file_serial] +async fn quotes_app() -> anyhow::Result<()> { + let (tick_send, tick_recv) = + memory::channel(chrono::Duration::from_std(std::time::Duration::from_secs(2)).unwrap()); + let publisher = tick_send.clone(); + let mut subscriber = tick_recv.resubscribe(); + + let (_, recv) = futures::channel::mpsc::unbounded(); + let ex_cfgs = ExchangeWeights { + okex: Some(dec!(1.0)), + bitfinex: None, + }; + + let base_fee_rate = dec!(0.001); + let immediate_fee_rate = dec!(0.01); + let delayed_fee_rate = dec!(0.1); + + let pg_host = std::env::var("PG_HOST").unwrap_or_else(|_| "localhost".into()); + let pg_con = format!("postgres://user:password@{}:5432/pg", pg_host); + let pool = sqlx::PgPool::connect(&pg_con).await?; + + let app = QuotesApp::run( + pool, + recv, + QuotesServerHealthCheckConfig::default(), + QuotesFeeCalculatorConfig { + base_fee_rate, + immediate_fee_rate, + delayed_fee_rate, + }, + tick_recv, + QuotesExchangePriceCacheConfig::default(), + ex_cfgs, + QuotesConfig { + expiration_interval: Duration::seconds(2), + }, + ) + .await?; + + let err = app + .quote_cents_from_sats_for_buy(dec!(100_000_000), true) + .await; + if let Err(QuotesAppError::ExchangePriceCacheError(ExchangePriceCacheError::OrderBookCache( + OrderBookCacheError::NoSnapshotAvailable, + ))) = err + { + assert!(true) + } else { + assert!(false) + } + + let mut payload = load_fixture(); + tick_send + .publish(PriceStreamPayload::OkexBtcUsdSwapOrderBookPayload( + payload.clone(), + )) + .await?; + subscriber.next().await; + tokio::time::sleep(std::time::Duration::from_millis(100)).await; + + payload.timestamp = TimeStamp::now(); + publisher + .publish(PriceStreamPayload::OkexBtcUsdSwapOrderBookPayload(payload)) + .await?; + subscriber.next().await; + tokio::time::sleep(std::time::Duration::from_millis(100)).await; + + let quote = app + .quote_cents_from_sats_for_buy(dec!(100_000_000), false) + .await; + assert!(quote.is_ok()); + let accepted = app.accept_quote(quote.unwrap().id).await; + assert!(accepted.is_ok()); + + let quote = app + .quote_cents_from_sats_for_buy(dec!(100_000_000), true) + .await; + assert!(quote.is_ok()); + assert!(quote.unwrap().is_accepted()); + + Ok(()) +}