Skip to content

Commit

Permalink
test: remove redis from hedging test (#407)
Browse files Browse the repository at this point in the history
* test: remove redis from hedging test

* fix: spawning of threads

* fix: can now open and close position on exchange

* fix: add timeout after buy/sell usd

* fix: add try loop to check if position is updated

* chore: increase runtime for nextest

* fix: include buy contracts via okex_client

* chore: increase timeout for test

* chore: delete fixtures

---------

Co-authored-by: Seb Ver <sebver@pop-os.localdomain>
  • Loading branch information
thevaibhav-dixit and Seb Ver authored Aug 21, 2023
1 parent c620104 commit 69ee0d4
Show file tree
Hide file tree
Showing 3 changed files with 180 additions and 182 deletions.
2 changes: 1 addition & 1 deletion .config/nextest.toml
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
[profile.default]
slow-timeout = { period = "45s", terminate-after = 2 }
slow-timeout = { period = "90s", terminate-after = 2 }
13 changes: 0 additions & 13 deletions hedging/tests/fixtures/hedging.json

This file was deleted.

347 changes: 179 additions & 168 deletions hedging/tests/hedging.rs
Original file line number Diff line number Diff line change
@@ -1,170 +1,181 @@
#![allow(clippy::or_fun_call)]

// use futures::{stream::StreamExt, Stream};
// use galoy_client::GaloyClientConfig;
// use rust_decimal::Decimal;
// use rust_decimal_macros::dec;
// use serial_test::serial;

// use std::{env, fs};

// use okex_client::*;
// use shared::{payload::*, pubsub::*};

// use hedging::*;

// #[derive(serde::Deserialize)]
// struct Fixture {
// payloads: Vec<SynthUsdLiabilityPayload>,
// }

// fn load_fixture(path: &str) -> anyhow::Result<Fixture> {
// let contents = fs::read_to_string(path).expect("Couldn't load fixtures");
// Ok(serde_json::from_str(&contents)?)
// }

// fn okex_config() -> OkexConfig {
// let api_key = env::var("OKEX_API_KEY").expect("OKEX_API_KEY not set");
// let passphrase = env::var("OKEX_PASSPHRASE").expect("OKEX_PASS_PHRASE not set");
// let secret_key = env::var("OKEX_SECRET_KEY").expect("OKEX_SECRET_KEY not set");
// OkexConfig {
// client: OkexClientConfig {
// api_key,
// passphrase,
// secret_key,
// simulated: true,
// },
// ..Default::default()
// }
// }

// fn galoy_client_config() -> GaloyClientConfig {
// let api = env::var("GALOY_GRAPHQL_URI").expect("GALOY_GRAPHQL_URI not set");
// let phone_number = env::var("PHONE_NUMBER").expect("PHONE_NUMBER not set");
// let code = env::var("AUTH_CODE").expect("AUTH_CODE not set");

// GaloyClientConfig {
// api,
// phone_number,
// auth_code: code,
// }
// }

// async fn expect_exposure_between(
// mut stream: impl Stream<Item = Envelope<OkexBtcUsdSwapPositionPayload>> + Unpin,
// lower: Decimal,
// upper: Decimal,
// ) {
// let mut passed = false;
// for _ in 0..=20 {
// let pos = stream.next().await.unwrap().payload.signed_usd_exposure;
// passed = pos < upper && pos > lower;
// if passed {
// break;
// }
// }
// assert!(passed);
// }

// async fn expect_exposure_below(
// mut stream: impl Stream<Item = Envelope<OkexBtcUsdSwapPositionPayload>> + Unpin,
// expected: Decimal,
// ) {
// let mut passed = false;
// for _ in 0..=10 {
// let pos = stream.next().await.unwrap().payload.signed_usd_exposure;
// passed = pos < expected;
// if passed {
// break;
// }
// tokio::time::sleep(std::time::Duration::from_millis(200)).await;
// }
// assert!(passed);
// }

// async fn expect_exposure_equal(
// mut stream: impl Stream<Item = Envelope<OkexBtcUsdSwapPositionPayload>> + Unpin,
// expected: Decimal,
// ) {
// let mut passed = false;
// for _ in 0..=20 {
// let pos = stream.next().await.unwrap().payload.signed_usd_exposure;
// passed = pos == expected;
// if passed {
// break;
// }
// }
// assert!(passed);
// }

// #[tokio::test]
// #[serial]
// #[ignore = "okex is very unstable"]
// async fn hedging() -> anyhow::Result<()> {
// let (_, tick_recv) = memory::channel(chrono::Duration::from_std(
// std::time::Duration::from_secs(1),
// )?);
// let redis_host = std::env::var("REDIS_HOST").unwrap_or("localhost".to_string());
// let pubsub_config = PubSubConfig {
// host: Some(redis_host),
// ..PubSubConfig::default()
// };
// let pg_host = std::env::var("PG_HOST").unwrap_or("localhost".to_string());
// let pg_con = format!("postgres://user:password@{pg_host}:5432/pg",);
// let pool = sqlx::PgPool::connect(&pg_con).await?;

// let publisher = Publisher::new(pubsub_config.clone()).await?;
// let mut subscriber = Subscriber::new(pubsub_config.clone()).await?;
// let mut stream = subscriber
// .subscribe::<OkexBtcUsdSwapPositionPayload>()
// .await?;

// tokio::spawn(async move {
// let (_, recv) = futures::channel::mpsc::unbounded();
// HedgingApp::run(
// pool.clone(),
// recv,
// HedgingAppConfig {
// ..Default::default()
// },
// okex_config(),
// galoy_client_config(),
// pubsub_config.clone(),
// tick_recv.resubscribe(),
// )
// .await
// .expect("HedgingApp failed");
// });
// tokio::time::sleep(std::time::Duration::from_secs(5)).await;

// let mut payloads = load_fixture("./tests/fixtures/hedging.json")
// .expect("Couldn't load fixtures")
// .payloads
// .into_iter();
// publisher.publish(payloads.next().unwrap()).await?;

// let okex = OkexClient::new(okex_config().client).await?;
// expect_exposure_equal(&mut stream, dec!(0)).await;

// publisher.publish(payloads.next().unwrap()).await?;

// for idx in 0..=1 {
// expect_exposure_between(&mut stream, dec!(-21000), dec!(-19000)).await;

// if idx == 0 {
// okex.place_order(
// ClientOrderId::new(),
// OkexOrderSide::Sell,
// &BtcUsdSwapContracts::from(5),
// )
// .await?;
// expect_exposure_below(&mut stream, dec!(-50000)).await;
// }
// }

// publisher.publish(payloads.next().unwrap()).await?;

// expect_exposure_equal(&mut stream, dec!(0)).await;

// Ok(())
// }
use galoy_client::GaloyClientConfig;
use rust_decimal_macros::dec;
use serial_test::serial;

use std::env;

use ledger::*;
use okex_client::*;
use shared::pubsub::*;

use hedging::*;

fn okex_config() -> OkexConfig {
let api_key = env::var("OKEX_API_KEY").expect("OKEX_API_KEY not set");
let passphrase = env::var("OKEX_PASSPHRASE").expect("OKEX_PASS_PHRASE not set");
let secret_key = env::var("OKEX_SECRET_KEY").expect("OKEX_SECRET_KEY not set");
OkexConfig {
client: OkexClientConfig {
api_key,
passphrase,
secret_key,
simulated: true,
},
..Default::default()
}
}

fn galoy_client_config() -> GaloyClientConfig {
let api = env::var("GALOY_GRAPHQL_URI").expect("GALOY_GRAPHQL_URI not set");
let phone_number = env::var("PHONE_NUMBER").expect("PHONE_NUMBER not set");
let code = env::var("AUTH_CODE").expect("AUTH_CODE not set");

GaloyClientConfig {
api,
phone_number,
auth_code: code,
}
}

#[tokio::test]
#[serial]
async fn hedging() -> anyhow::Result<()> {
let pg_host = std::env::var("PG_HOST").unwrap_or_else(|_| "localhost".into());
let pg_con = format!("postgres://user:password@{}:5432/pg", pg_host);
let pool = sqlx::PgPool::connect(&pg_con).await?;
let ledger = ledger::Ledger::init(&pool).await?;

let (send, mut receive) = tokio::sync::mpsc::channel(1);
let (_, tick_recv) = memory::channel(chrono::Duration::from_std(
std::time::Duration::from_secs(1),
)?);

tokio::spawn(async move {
let (_, recv) = futures::channel::mpsc::unbounded();
let _ = send.try_send(
HedgingApp::run(
pool,
recv,
HedgingAppConfig {
..Default::default()
},
okex_config(),
galoy_client_config(),
tick_recv.resubscribe(),
)
.await
.expect("HedgingApp failed"),
);
});
let _reason = receive.recv().await.expect("Didn't receive msg");
tokio::time::sleep(std::time::Duration::from_secs(10)).await;

let pool = sqlx::PgPool::connect(&pg_con).await?;
ledger
.user_buys_usd(
pool.clone().begin().await?,
LedgerTxId::new(),
UserBuysUsdParams {
satoshi_amount: dec!(1000000),
usd_cents_amount: dec!(50000),
meta: UserBuysUsdMeta {
timestamp: chrono::Utc::now(),
btc_tx_id: "btc_tx_id".into(),
usd_tx_id: "usd_tx_id".into(),
},
},
)
.await?;
let mut event = ledger.usd_okex_position_balance_events().await?;
let mut passed = false;
for _ in 0..=20 {
let user_buy_event = event.recv().await?;
// checks if a position of $-500 gets opened on the exchange.
if let ledger::LedgerEventData::BalanceUpdated(data) = user_buy_event.data {
if (data.settled_cr_balance - data.settled_dr_balance) == dec!(-500) {
passed = true;
break;
}
} else {
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
}
}
if !passed {
panic!("Could not open a position on the exchange!");
}

let okex = OkexClient::new(okex_config().client).await?;
okex.place_order(
ClientOrderId::new(),
OkexOrderSide::Buy,
&BtcUsdSwapContracts::from(5),
)
.await?;
passed = false;
for _ in 0..20 {
let PositionSize { usd_cents, .. } = okex.get_position_in_signed_usd_cents().await?;
// checks if the position gets closed via OkexClient
if usd_cents / dec!(100) == dec!(0) {
passed = true;
break;
} else {
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
}
}
if !passed {
panic!("Could not close the position via OkexClient!");
}

passed = false;
for _ in 0..=20 {
let user_buy_event = event.recv().await?;
// checks if a position of $-500 gets opened on the exchange.
if let ledger::LedgerEventData::BalanceUpdated(data) = user_buy_event.data {
if (data.settled_cr_balance - data.settled_dr_balance) == dec!(-500) {
passed = true;
break;
}
} else {
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
}
}
if !passed {
panic!("Could not open a position on the exchange after closing it via OkexClient!");
}

ledger
.user_sells_usd(
pool.begin().await?,
LedgerTxId::new(),
UserSellsUsdParams {
satoshi_amount: dec!(1000000),
usd_cents_amount: dec!(50000),
meta: UserSellsUsdMeta {
timestamp: chrono::Utc::now(),
btc_tx_id: "btc_tx_id".into(),
usd_tx_id: "usd_tx_id".into(),
},
},
)
.await?;
passed = false;
for _ in 0..=20 {
let user_sell_event = event.recv().await?;
// checks if the position gets closed on the exchange.
if let ledger::LedgerEventData::BalanceUpdated(data) = user_sell_event.data {
if (data.settled_cr_balance - data.settled_dr_balance) == dec!(0) {
passed = true;
break;
}
} else {
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
}
}
if !passed {
panic!("Could not close the position on the exchange");
}

Ok(())
}

0 comments on commit 69ee0d4

Please sign in to comment.