From 69ee0d45f2a42d14bf7f7521bdf1ae54585782e9 Mon Sep 17 00:00:00 2001 From: Vaibhav Date: Mon, 21 Aug 2023 20:00:32 +0530 Subject: [PATCH] test: remove redis from hedging test (#407) * test: remove redis from hedging test * fix: spawning of threads * fix: can now open and close position on exchange * fix: add timeout after buy/sell usd * fix: add try loop to check if position is updated * chore: increase runtime for nextest * fix: include buy contracts via okex_client * chore: increase timeout for test * chore: delete fixtures --------- Co-authored-by: Seb Ver --- .config/nextest.toml | 2 +- hedging/tests/fixtures/hedging.json | 13 -- hedging/tests/hedging.rs | 347 ++++++++++++++-------------- 3 files changed, 180 insertions(+), 182 deletions(-) delete mode 100644 hedging/tests/fixtures/hedging.json diff --git a/.config/nextest.toml b/.config/nextest.toml index 69ec1b16b..de661f04b 100644 --- a/.config/nextest.toml +++ b/.config/nextest.toml @@ -1,2 +1,2 @@ [profile.default] -slow-timeout = { period = "45s", terminate-after = 2 } +slow-timeout = { period = "90s", terminate-after = 2 } diff --git a/hedging/tests/fixtures/hedging.json b/hedging/tests/fixtures/hedging.json deleted file mode 100644 index cd165b3bb..000000000 --- a/hedging/tests/fixtures/hedging.json +++ /dev/null @@ -1,13 +0,0 @@ -{ - "payloads": [ - { - "liability": "0" - }, - { - "liability": "20000" - }, - { - "liability": "0" - } - ] -} diff --git a/hedging/tests/hedging.rs b/hedging/tests/hedging.rs index 1ece5de3c..780a69771 100644 --- a/hedging/tests/hedging.rs +++ b/hedging/tests/hedging.rs @@ -1,170 +1,181 @@ #![allow(clippy::or_fun_call)] -// use futures::{stream::StreamExt, Stream}; -// use galoy_client::GaloyClientConfig; -// use rust_decimal::Decimal; -// use rust_decimal_macros::dec; -// use serial_test::serial; - -// use std::{env, fs}; - -// use okex_client::*; -// use shared::{payload::*, pubsub::*}; - -// use hedging::*; - -// #[derive(serde::Deserialize)] -// struct Fixture { -// payloads: Vec, -// } - -// fn load_fixture(path: &str) -> anyhow::Result { -// let contents = fs::read_to_string(path).expect("Couldn't load fixtures"); -// Ok(serde_json::from_str(&contents)?) -// } - -// fn okex_config() -> OkexConfig { -// let api_key = env::var("OKEX_API_KEY").expect("OKEX_API_KEY not set"); -// let passphrase = env::var("OKEX_PASSPHRASE").expect("OKEX_PASS_PHRASE not set"); -// let secret_key = env::var("OKEX_SECRET_KEY").expect("OKEX_SECRET_KEY not set"); -// OkexConfig { -// client: OkexClientConfig { -// api_key, -// passphrase, -// secret_key, -// simulated: true, -// }, -// ..Default::default() -// } -// } - -// fn galoy_client_config() -> GaloyClientConfig { -// let api = env::var("GALOY_GRAPHQL_URI").expect("GALOY_GRAPHQL_URI not set"); -// let phone_number = env::var("PHONE_NUMBER").expect("PHONE_NUMBER not set"); -// let code = env::var("AUTH_CODE").expect("AUTH_CODE not set"); - -// GaloyClientConfig { -// api, -// phone_number, -// auth_code: code, -// } -// } - -// async fn expect_exposure_between( -// mut stream: impl Stream> + Unpin, -// lower: Decimal, -// upper: Decimal, -// ) { -// let mut passed = false; -// for _ in 0..=20 { -// let pos = stream.next().await.unwrap().payload.signed_usd_exposure; -// passed = pos < upper && pos > lower; -// if passed { -// break; -// } -// } -// assert!(passed); -// } - -// async fn expect_exposure_below( -// mut stream: impl Stream> + Unpin, -// expected: Decimal, -// ) { -// let mut passed = false; -// for _ in 0..=10 { -// let pos = stream.next().await.unwrap().payload.signed_usd_exposure; -// passed = pos < expected; -// if passed { -// break; -// } -// tokio::time::sleep(std::time::Duration::from_millis(200)).await; -// } -// assert!(passed); -// } - -// async fn expect_exposure_equal( -// mut stream: impl Stream> + Unpin, -// expected: Decimal, -// ) { -// let mut passed = false; -// for _ in 0..=20 { -// let pos = stream.next().await.unwrap().payload.signed_usd_exposure; -// passed = pos == expected; -// if passed { -// break; -// } -// } -// assert!(passed); -// } - -// #[tokio::test] -// #[serial] -// #[ignore = "okex is very unstable"] -// async fn hedging() -> anyhow::Result<()> { -// let (_, tick_recv) = memory::channel(chrono::Duration::from_std( -// std::time::Duration::from_secs(1), -// )?); -// let redis_host = std::env::var("REDIS_HOST").unwrap_or("localhost".to_string()); -// let pubsub_config = PubSubConfig { -// host: Some(redis_host), -// ..PubSubConfig::default() -// }; -// let pg_host = std::env::var("PG_HOST").unwrap_or("localhost".to_string()); -// let pg_con = format!("postgres://user:password@{pg_host}:5432/pg",); -// let pool = sqlx::PgPool::connect(&pg_con).await?; - -// let publisher = Publisher::new(pubsub_config.clone()).await?; -// let mut subscriber = Subscriber::new(pubsub_config.clone()).await?; -// let mut stream = subscriber -// .subscribe::() -// .await?; - -// tokio::spawn(async move { -// let (_, recv) = futures::channel::mpsc::unbounded(); -// HedgingApp::run( -// pool.clone(), -// recv, -// HedgingAppConfig { -// ..Default::default() -// }, -// okex_config(), -// galoy_client_config(), -// pubsub_config.clone(), -// tick_recv.resubscribe(), -// ) -// .await -// .expect("HedgingApp failed"); -// }); -// tokio::time::sleep(std::time::Duration::from_secs(5)).await; - -// let mut payloads = load_fixture("./tests/fixtures/hedging.json") -// .expect("Couldn't load fixtures") -// .payloads -// .into_iter(); -// publisher.publish(payloads.next().unwrap()).await?; - -// let okex = OkexClient::new(okex_config().client).await?; -// expect_exposure_equal(&mut stream, dec!(0)).await; - -// publisher.publish(payloads.next().unwrap()).await?; - -// for idx in 0..=1 { -// expect_exposure_between(&mut stream, dec!(-21000), dec!(-19000)).await; - -// if idx == 0 { -// okex.place_order( -// ClientOrderId::new(), -// OkexOrderSide::Sell, -// &BtcUsdSwapContracts::from(5), -// ) -// .await?; -// expect_exposure_below(&mut stream, dec!(-50000)).await; -// } -// } - -// publisher.publish(payloads.next().unwrap()).await?; - -// expect_exposure_equal(&mut stream, dec!(0)).await; - -// Ok(()) -// } +use galoy_client::GaloyClientConfig; +use rust_decimal_macros::dec; +use serial_test::serial; + +use std::env; + +use ledger::*; +use okex_client::*; +use shared::pubsub::*; + +use hedging::*; + +fn okex_config() -> OkexConfig { + let api_key = env::var("OKEX_API_KEY").expect("OKEX_API_KEY not set"); + let passphrase = env::var("OKEX_PASSPHRASE").expect("OKEX_PASS_PHRASE not set"); + let secret_key = env::var("OKEX_SECRET_KEY").expect("OKEX_SECRET_KEY not set"); + OkexConfig { + client: OkexClientConfig { + api_key, + passphrase, + secret_key, + simulated: true, + }, + ..Default::default() + } +} + +fn galoy_client_config() -> GaloyClientConfig { + let api = env::var("GALOY_GRAPHQL_URI").expect("GALOY_GRAPHQL_URI not set"); + let phone_number = env::var("PHONE_NUMBER").expect("PHONE_NUMBER not set"); + let code = env::var("AUTH_CODE").expect("AUTH_CODE not set"); + + GaloyClientConfig { + api, + phone_number, + auth_code: code, + } +} + +#[tokio::test] +#[serial] +async fn hedging() -> anyhow::Result<()> { + let pg_host = std::env::var("PG_HOST").unwrap_or_else(|_| "localhost".into()); + let pg_con = format!("postgres://user:password@{}:5432/pg", pg_host); + let pool = sqlx::PgPool::connect(&pg_con).await?; + let ledger = ledger::Ledger::init(&pool).await?; + + let (send, mut receive) = tokio::sync::mpsc::channel(1); + let (_, tick_recv) = memory::channel(chrono::Duration::from_std( + std::time::Duration::from_secs(1), + )?); + + tokio::spawn(async move { + let (_, recv) = futures::channel::mpsc::unbounded(); + let _ = send.try_send( + HedgingApp::run( + pool, + recv, + HedgingAppConfig { + ..Default::default() + }, + okex_config(), + galoy_client_config(), + tick_recv.resubscribe(), + ) + .await + .expect("HedgingApp failed"), + ); + }); + let _reason = receive.recv().await.expect("Didn't receive msg"); + tokio::time::sleep(std::time::Duration::from_secs(10)).await; + + let pool = sqlx::PgPool::connect(&pg_con).await?; + ledger + .user_buys_usd( + pool.clone().begin().await?, + LedgerTxId::new(), + UserBuysUsdParams { + satoshi_amount: dec!(1000000), + usd_cents_amount: dec!(50000), + meta: UserBuysUsdMeta { + timestamp: chrono::Utc::now(), + btc_tx_id: "btc_tx_id".into(), + usd_tx_id: "usd_tx_id".into(), + }, + }, + ) + .await?; + let mut event = ledger.usd_okex_position_balance_events().await?; + let mut passed = false; + for _ in 0..=20 { + let user_buy_event = event.recv().await?; + // checks if a position of $-500 gets opened on the exchange. + if let ledger::LedgerEventData::BalanceUpdated(data) = user_buy_event.data { + if (data.settled_cr_balance - data.settled_dr_balance) == dec!(-500) { + passed = true; + break; + } + } else { + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + } + } + if !passed { + panic!("Could not open a position on the exchange!"); + } + + let okex = OkexClient::new(okex_config().client).await?; + okex.place_order( + ClientOrderId::new(), + OkexOrderSide::Buy, + &BtcUsdSwapContracts::from(5), + ) + .await?; + passed = false; + for _ in 0..20 { + let PositionSize { usd_cents, .. } = okex.get_position_in_signed_usd_cents().await?; + // checks if the position gets closed via OkexClient + if usd_cents / dec!(100) == dec!(0) { + passed = true; + break; + } else { + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + } + } + if !passed { + panic!("Could not close the position via OkexClient!"); + } + + passed = false; + for _ in 0..=20 { + let user_buy_event = event.recv().await?; + // checks if a position of $-500 gets opened on the exchange. + if let ledger::LedgerEventData::BalanceUpdated(data) = user_buy_event.data { + if (data.settled_cr_balance - data.settled_dr_balance) == dec!(-500) { + passed = true; + break; + } + } else { + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + } + } + if !passed { + panic!("Could not open a position on the exchange after closing it via OkexClient!"); + } + + ledger + .user_sells_usd( + pool.begin().await?, + LedgerTxId::new(), + UserSellsUsdParams { + satoshi_amount: dec!(1000000), + usd_cents_amount: dec!(50000), + meta: UserSellsUsdMeta { + timestamp: chrono::Utc::now(), + btc_tx_id: "btc_tx_id".into(), + usd_tx_id: "usd_tx_id".into(), + }, + }, + ) + .await?; + passed = false; + for _ in 0..=20 { + let user_sell_event = event.recv().await?; + // checks if the position gets closed on the exchange. + if let ledger::LedgerEventData::BalanceUpdated(data) = user_sell_event.data { + if (data.settled_cr_balance - data.settled_dr_balance) == dec!(0) { + passed = true; + break; + } + } else { + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + } + } + if !passed { + panic!("Could not close the position on the exchange"); + } + + Ok(()) +}