Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test: remove redis from hedging test #407

Merged
merged 9 commits into from
Aug 21, 2023
2 changes: 1 addition & 1 deletion .config/nextest.toml
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
[profile.default]
slow-timeout = { period = "45s", terminate-after = 2 }
slow-timeout = { period = "90s", terminate-after = 2 }
13 changes: 0 additions & 13 deletions hedging/tests/fixtures/hedging.json

This file was deleted.

347 changes: 179 additions & 168 deletions hedging/tests/hedging.rs
Original file line number Diff line number Diff line change
@@ -1,170 +1,181 @@
#![allow(clippy::or_fun_call)]

// use futures::{stream::StreamExt, Stream};
// use galoy_client::GaloyClientConfig;
// use rust_decimal::Decimal;
// use rust_decimal_macros::dec;
// use serial_test::serial;

// use std::{env, fs};

// use okex_client::*;
// use shared::{payload::*, pubsub::*};

// use hedging::*;

// #[derive(serde::Deserialize)]
// struct Fixture {
// payloads: Vec<SynthUsdLiabilityPayload>,
// }

// fn load_fixture(path: &str) -> anyhow::Result<Fixture> {
// let contents = fs::read_to_string(path).expect("Couldn't load fixtures");
// Ok(serde_json::from_str(&contents)?)
// }

// fn okex_config() -> OkexConfig {
// let api_key = env::var("OKEX_API_KEY").expect("OKEX_API_KEY not set");
// let passphrase = env::var("OKEX_PASSPHRASE").expect("OKEX_PASS_PHRASE not set");
// let secret_key = env::var("OKEX_SECRET_KEY").expect("OKEX_SECRET_KEY not set");
// OkexConfig {
// client: OkexClientConfig {
// api_key,
// passphrase,
// secret_key,
// simulated: true,
// },
// ..Default::default()
// }
// }

// fn galoy_client_config() -> GaloyClientConfig {
// let api = env::var("GALOY_GRAPHQL_URI").expect("GALOY_GRAPHQL_URI not set");
// let phone_number = env::var("PHONE_NUMBER").expect("PHONE_NUMBER not set");
// let code = env::var("AUTH_CODE").expect("AUTH_CODE not set");

// GaloyClientConfig {
// api,
// phone_number,
// auth_code: code,
// }
// }

// async fn expect_exposure_between(
// mut stream: impl Stream<Item = Envelope<OkexBtcUsdSwapPositionPayload>> + Unpin,
// lower: Decimal,
// upper: Decimal,
// ) {
// let mut passed = false;
// for _ in 0..=20 {
// let pos = stream.next().await.unwrap().payload.signed_usd_exposure;
// passed = pos < upper && pos > lower;
// if passed {
// break;
// }
// }
// assert!(passed);
// }

// async fn expect_exposure_below(
// mut stream: impl Stream<Item = Envelope<OkexBtcUsdSwapPositionPayload>> + Unpin,
// expected: Decimal,
// ) {
// let mut passed = false;
// for _ in 0..=10 {
// let pos = stream.next().await.unwrap().payload.signed_usd_exposure;
// passed = pos < expected;
// if passed {
// break;
// }
// tokio::time::sleep(std::time::Duration::from_millis(200)).await;
// }
// assert!(passed);
// }

// async fn expect_exposure_equal(
// mut stream: impl Stream<Item = Envelope<OkexBtcUsdSwapPositionPayload>> + Unpin,
// expected: Decimal,
// ) {
// let mut passed = false;
// for _ in 0..=20 {
// let pos = stream.next().await.unwrap().payload.signed_usd_exposure;
// passed = pos == expected;
// if passed {
// break;
// }
// }
// assert!(passed);
// }

// #[tokio::test]
// #[serial]
// #[ignore = "okex is very unstable"]
// async fn hedging() -> anyhow::Result<()> {
// let (_, tick_recv) = memory::channel(chrono::Duration::from_std(
// std::time::Duration::from_secs(1),
// )?);
// let redis_host = std::env::var("REDIS_HOST").unwrap_or("localhost".to_string());
// let pubsub_config = PubSubConfig {
// host: Some(redis_host),
// ..PubSubConfig::default()
// };
// let pg_host = std::env::var("PG_HOST").unwrap_or("localhost".to_string());
// let pg_con = format!("postgres://user:password@{pg_host}:5432/pg",);
// let pool = sqlx::PgPool::connect(&pg_con).await?;

// let publisher = Publisher::new(pubsub_config.clone()).await?;
// let mut subscriber = Subscriber::new(pubsub_config.clone()).await?;
// let mut stream = subscriber
// .subscribe::<OkexBtcUsdSwapPositionPayload>()
// .await?;

// tokio::spawn(async move {
// let (_, recv) = futures::channel::mpsc::unbounded();
// HedgingApp::run(
// pool.clone(),
// recv,
// HedgingAppConfig {
// ..Default::default()
// },
// okex_config(),
// galoy_client_config(),
// pubsub_config.clone(),
// tick_recv.resubscribe(),
// )
// .await
// .expect("HedgingApp failed");
// });
// tokio::time::sleep(std::time::Duration::from_secs(5)).await;

// let mut payloads = load_fixture("./tests/fixtures/hedging.json")
// .expect("Couldn't load fixtures")
// .payloads
// .into_iter();
// publisher.publish(payloads.next().unwrap()).await?;

// let okex = OkexClient::new(okex_config().client).await?;
// expect_exposure_equal(&mut stream, dec!(0)).await;

// publisher.publish(payloads.next().unwrap()).await?;

// for idx in 0..=1 {
// expect_exposure_between(&mut stream, dec!(-21000), dec!(-19000)).await;

// if idx == 0 {
// okex.place_order(
// ClientOrderId::new(),
// OkexOrderSide::Sell,
// &BtcUsdSwapContracts::from(5),
// )
// .await?;
// expect_exposure_below(&mut stream, dec!(-50000)).await;
// }
// }

// publisher.publish(payloads.next().unwrap()).await?;

// expect_exposure_equal(&mut stream, dec!(0)).await;

// Ok(())
// }
use galoy_client::GaloyClientConfig;
use rust_decimal_macros::dec;
use serial_test::serial;

use std::env;

use ledger::*;
use okex_client::*;
use shared::pubsub::*;

use hedging::*;

fn okex_config() -> OkexConfig {
let api_key = env::var("OKEX_API_KEY").expect("OKEX_API_KEY not set");
let passphrase = env::var("OKEX_PASSPHRASE").expect("OKEX_PASS_PHRASE not set");
let secret_key = env::var("OKEX_SECRET_KEY").expect("OKEX_SECRET_KEY not set");
OkexConfig {
client: OkexClientConfig {
api_key,
passphrase,
secret_key,
simulated: true,
},
..Default::default()
}
}

fn galoy_client_config() -> GaloyClientConfig {
let api = env::var("GALOY_GRAPHQL_URI").expect("GALOY_GRAPHQL_URI not set");
let phone_number = env::var("PHONE_NUMBER").expect("PHONE_NUMBER not set");
let code = env::var("AUTH_CODE").expect("AUTH_CODE not set");

GaloyClientConfig {
api,
phone_number,
auth_code: code,
}
}

#[tokio::test]
#[serial]
async fn hedging() -> anyhow::Result<()> {
let pg_host = std::env::var("PG_HOST").unwrap_or_else(|_| "localhost".into());
let pg_con = format!("postgres://user:password@{}:5432/pg", pg_host);
let pool = sqlx::PgPool::connect(&pg_con).await?;
let ledger = ledger::Ledger::init(&pool).await?;

let (send, mut receive) = tokio::sync::mpsc::channel(1);
let (_, tick_recv) = memory::channel(chrono::Duration::from_std(
std::time::Duration::from_secs(1),
)?);

tokio::spawn(async move {
let (_, recv) = futures::channel::mpsc::unbounded();
let _ = send.try_send(
HedgingApp::run(
pool,
recv,
HedgingAppConfig {
..Default::default()
},
okex_config(),
galoy_client_config(),
tick_recv.resubscribe(),
)
.await
.expect("HedgingApp failed"),
);
});
let _reason = receive.recv().await.expect("Didn't receive msg");
tokio::time::sleep(std::time::Duration::from_secs(10)).await;

let pool = sqlx::PgPool::connect(&pg_con).await?;
ledger
.user_buys_usd(
pool.clone().begin().await?,
LedgerTxId::new(),
UserBuysUsdParams {
satoshi_amount: dec!(1000000),
usd_cents_amount: dec!(50000),
meta: UserBuysUsdMeta {
timestamp: chrono::Utc::now(),
btc_tx_id: "btc_tx_id".into(),
usd_tx_id: "usd_tx_id".into(),
},
},
)
.await?;
let mut event = ledger.usd_okex_position_balance_events().await?;
let mut passed = false;
for _ in 0..=20 {
let user_buy_event = event.recv().await?;
// checks if a position of $-500 gets opened on the exchange.
if let ledger::LedgerEventData::BalanceUpdated(data) = user_buy_event.data {
if (data.settled_cr_balance - data.settled_dr_balance) == dec!(-500) {
passed = true;
break;
}
} else {
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
}
}
if !passed {
panic!("Could not open a position on the exchange!");
}

let okex = OkexClient::new(okex_config().client).await?;
okex.place_order(
ClientOrderId::new(),
OkexOrderSide::Buy,
&BtcUsdSwapContracts::from(5),
)
.await?;
passed = false;
for _ in 0..20 {
let PositionSize { usd_cents, .. } = okex.get_position_in_signed_usd_cents().await?;
// checks if the position gets closed via OkexClient
if usd_cents / dec!(100) == dec!(0) {
passed = true;
break;
} else {
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
}
}
if !passed {
panic!("Could not close the position via OkexClient!");
}

passed = false;
for _ in 0..=20 {
let user_buy_event = event.recv().await?;
// checks if a position of $-500 gets opened on the exchange.
if let ledger::LedgerEventData::BalanceUpdated(data) = user_buy_event.data {
if (data.settled_cr_balance - data.settled_dr_balance) == dec!(-500) {
passed = true;
break;
}
} else {
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
}
}
if !passed {
panic!("Could not open a position on the exchange after closing it via OkexClient!");
}

ledger
.user_sells_usd(
pool.begin().await?,
LedgerTxId::new(),
UserSellsUsdParams {
satoshi_amount: dec!(1000000),
usd_cents_amount: dec!(50000),
meta: UserSellsUsdMeta {
timestamp: chrono::Utc::now(),
btc_tx_id: "btc_tx_id".into(),
usd_tx_id: "usd_tx_id".into(),
},
},
)
.await?;
passed = false;
for _ in 0..=20 {
let user_sell_event = event.recv().await?;
// checks if the position gets closed on the exchange.
if let ledger::LedgerEventData::BalanceUpdated(data) = user_sell_event.data {
if (data.settled_cr_balance - data.settled_dr_balance) == dec!(0) {
passed = true;
break;
}
} else {
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
}
}
if !passed {
panic!("Could not close the position on the exchange");
}

Ok(())
}