Skip to content

Commit

Permalink
feat!: use ledger to trigger adjustment jobs
Browse files Browse the repository at this point in the history
  • Loading branch information
bodymindarts committed Feb 6, 2023
1 parent 70336b8 commit a26082a
Show file tree
Hide file tree
Showing 16 changed files with 109 additions and 162 deletions.
14 changes: 8 additions & 6 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions hedging/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ edition = "2021"
fail-on-warnings = []

[dependencies]
ledger = { path = "../ledger", package = "stablesats-ledger" }
shared = { path = "../shared", package = "stablesats-shared" }
okex-client = { path = "../okex-client" }
bitfinex-client = { path = "../bitfinex-client" }
Expand Down
107 changes: 33 additions & 74 deletions hedging/src/app/mod.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,20 @@
mod config;

use futures::stream::StreamExt;
use opentelemetry::{propagation::TextMapPropagator, sdk::propagation::TraceContextPropagator};
use sqlxmq::OwnedHandle;
use tracing::{info_span, Instrument};
use tracing_opentelemetry::OpenTelemetrySpanExt;

use galoy_client::*;
use okex_client::*;
use shared::{
exchanges_config::OkexConfig,
health::HealthCheckTrigger,
payload::{OkexBtcUsdSwapPositionPayload, PriceStreamPayload, SynthUsdLiabilityPayload},
payload::{OkexBtcUsdSwapPositionPayload, PriceStreamPayload},
pubsub::{memory, CorrelationId, PubSubConfig, Publisher, Subscriber},
};

use crate::{
adjustment_action::*, error::*, job, okex_orders::*, okex_transfers::*, rebalance_action::*,
synth_usd_liability::*,
};

pub use config::*;
Expand All @@ -43,7 +40,6 @@ impl HedgingApp {
pubsub_config: PubSubConfig,
price_receiver: memory::Subscriber<PriceStreamPayload>,
) -> Result<Self, HedgingError> {
let synth_usd_liability = SynthUsdLiability::new(pool.clone());
let okex_orders = OkexOrders::new(pool.clone()).await?;
let okex_transfers = OkexTransfers::new(pool.clone()).await?;
let okex = OkexClient::new(okex_client_config).await?;
Expand All @@ -52,9 +48,10 @@ impl HedgingApp {
let funding_adjustment =
FundingAdjustment::new(funding_config.clone(), hedging_config.clone());
let hedging_adjustment = HedgingAdjustment::new(hedging_config);
let ledger = ledger::Ledger::init(&pool).await?;
let job_runner = job::start_job_runner(
pool.clone(),
synth_usd_liability.clone(),
ledger.clone(),
okex.clone(),
okex_orders,
okex_transfers.clone(),
Expand All @@ -66,19 +63,17 @@ impl HedgingApp {
funding_config.clone(),
)
.await?;
let liability_sub =
Self::spawn_synth_usd_listener(pubsub_config.clone(), synth_usd_liability.clone())
.await?;
Self::spawn_liability_balance_listener(pool.clone(), ledger.clone()).await?;
let position_sub = Self::spawn_okex_position_listener(
pubsub_config.clone(),
pool.clone(),
synth_usd_liability.clone(),
ledger.clone(),
hedging_adjustment,
)
.await?;
Self::spawn_okex_price_listener(
pool.clone(),
synth_usd_liability,
ledger,
okex,
funding_adjustment,
price_receiver.resubscribe(),
Expand All @@ -87,7 +82,6 @@ impl HedgingApp {
Self::spawn_health_checker(
health_check_trigger,
health_cfg,
liability_sub,
position_sub,
price_receiver,
)
Expand All @@ -101,7 +95,7 @@ impl HedgingApp {

async fn spawn_okex_price_listener(
pool: sqlx::PgPool,
synth_usd_liability: SynthUsdLiability,
ledger: ledger::Ledger,
okex: OkexClient,
funding_adjustment: FundingAdjustment,
mut tick_recv: memory::Subscriber<PriceStreamPayload>,
Expand All @@ -122,7 +116,7 @@ impl HedgingApp {
let _ = Self::handle_received_okex_price(
correlation_id,
&pool,
&synth_usd_liability,
&ledger,
&okex,
funding_adjustment.clone(),
)
Expand All @@ -137,11 +131,11 @@ impl HedgingApp {
async fn handle_received_okex_price(
correlation_id: CorrelationId,
pool: &sqlx::PgPool,
synth_usd_liability: &SynthUsdLiability,
ledger: &ledger::Ledger,
okex: &OkexClient,
funding_adjustment: FundingAdjustment,
) -> Result<(), HedgingError> {
let target_liability_in_cents = synth_usd_liability.get_latest_liability().await?;
let target_liability_in_cents = ledger.balances().target_liability_in_cents().await?;
let current_position_in_cents = okex.get_position_in_signed_usd_cents().await?.usd_cents;
let last_price_in_usd_cents = okex.get_last_price_in_usd_cents().await?.usd_cents;
let trading_available_balance = okex.trading_account_balance().await?;
Expand Down Expand Up @@ -172,40 +166,31 @@ impl HedgingApp {
}
}

async fn spawn_synth_usd_listener(
config: PubSubConfig,
synth_usd_liability: SynthUsdLiability,
) -> Result<Subscriber, HedgingError> {
let mut subscriber = Subscriber::new(config).await?;
let mut stream = subscriber.subscribe::<SynthUsdLiabilityPayload>().await?;
tokio::spawn(async move {
let propagator = TraceContextPropagator::new();

while let Some(msg) = stream.next().await {
let correlation_id = msg.meta.correlation_id;
let span = info_span!(
"synth_usd_liability_received",
message_type = %msg.payload_type,
correlation_id = %correlation_id
);
let context = propagator.extract(&msg.meta.tracing_data);
span.set_parent(context);
let _ = Self::handle_received_synth_usd_liability(
msg.payload,
correlation_id,
&synth_usd_liability,
)
.instrument(span)
.await;
async fn spawn_liability_balance_listener(
pool: sqlx::PgPool,
ledger: ledger::Ledger,
) -> Result<(), HedgingError> {
let mut events = ledger.usd_liability_balance_events().await;
loop {
match events.recv().await {
Ok(received) => {
if let ledger::LedgerEventData::BalanceUpdated(data) = received.data {
job::spawn_adjust_hedge(&pool, data.entry_id).await?;
}
}
Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => (),
_ => {
break;
}
}
});
Ok(subscriber)
}
Ok(())
}

async fn spawn_okex_position_listener(
config: PubSubConfig,
pool: sqlx::PgPool,
synth_usd_liability: SynthUsdLiability,
ledger: ledger::Ledger,
hedging_adjustment: HedgingAdjustment,
) -> Result<Subscriber, HedgingError> {
let mut subscriber = Subscriber::new(config).await?;
Expand All @@ -228,7 +213,7 @@ impl HedgingApp {
msg.payload,
correlation_id,
&pool,
&synth_usd_liability,
&ledger,
hedging_adjustment.clone(),
)
.instrument(span)
Expand All @@ -241,24 +226,20 @@ impl HedgingApp {
async fn spawn_health_checker(
mut health_check_trigger: HealthCheckTrigger,
health_cfg: HedgingAppHealthConfig,
liability_sub: Subscriber,
position_sub: Subscriber,
price_sub: memory::Subscriber<PriceStreamPayload>,
) {
tokio::spawn(async move {
while let Some(check) = health_check_trigger.next().await {
match (
liability_sub
.healthy(health_cfg.unhealthy_msg_interval_liability)
.await,
position_sub
.healthy(health_cfg.unhealthy_msg_interval_position)
.await,
price_sub
.healthy(health_cfg.unhealthy_msg_interval_price)
.await,
) {
(Err(e), _, _) | (_, Err(e), _) | (_, _, Err(e)) => {
(Err(e), _) | (_, Err(e)) => {
check.send(Err(e)).expect("Couldn't send response")
}
_ => check.send(Ok(())).expect("Couldn't send response"),
Expand All @@ -267,36 +248,14 @@ impl HedgingApp {
});
}

async fn handle_received_synth_usd_liability(
payload: SynthUsdLiabilityPayload,
correlation_id: CorrelationId,
synth_usd_liability: &SynthUsdLiability,
) -> Result<(), HedgingError> {
match synth_usd_liability
.insert_if_new(correlation_id, payload.liability)
.await
{
Ok(Some(mut tx)) => {
job::spawn_adjust_hedge(&mut tx, correlation_id).await?;
tx.commit().await?;
Ok(())
}
Ok(None) => Ok(()),
Err(e) => {
shared::tracing::insert_error_fields(tracing::Level::ERROR, &e);
Err(e)
}
}
}

async fn handle_received_okex_position(
payload: OkexBtcUsdSwapPositionPayload,
correlation_id: CorrelationId,
pool: &sqlx::PgPool,
synth_usd_liability: &SynthUsdLiability,
ledger: &ledger::Ledger,
hedging_adjustment: HedgingAdjustment,
) -> Result<(), HedgingError> {
let amount = synth_usd_liability.get_latest_liability().await?;
let amount = ledger.balances().target_liability_in_cents().await?;
if hedging_adjustment
.determine_action(amount, payload.signed_usd_exposure)
.action_required()
Expand Down
2 changes: 2 additions & 0 deletions hedging/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ pub enum HedgingError {
BitfinextClient(#[from] bitfinex_client::BitfinexClientError),
#[error("HedgingError - NoJobDataPresent")]
NoJobDataPresent,
#[error("UserTradesError - Leger: {0}")]
Ledger(#[from] ledger::LedgerError),
}

impl JobExecutionError for HedgingError {}
6 changes: 3 additions & 3 deletions hedging/src/job/adjust_funding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use galoy_client::*;
use okex_client::*;
use shared::pubsub::CorrelationId;

use crate::{error::*, okex_transfers::*, rebalance_action::*, synth_usd_liability::*};
use crate::{error::*, okex_transfers::*, rebalance_action::*};

const SATS_PER_BTC: Decimal = dec!(100_000_000);

Expand All @@ -16,15 +16,15 @@ const SATS_PER_BTC: Decimal = dec!(100_000_000);
transferred_funding) err)]
pub(super) async fn execute(
correlation_id: CorrelationId,
synth_usd_liability: SynthUsdLiability,
ledger: ledger::Ledger,
okex: OkexClient,
okex_transfers: OkexTransfers,
galoy: GaloyClient,
funding_adjustment: FundingAdjustment,
) -> Result<(), HedgingError> {
let span = tracing::Span::current();

let target_liability_in_cents = synth_usd_liability.get_latest_liability().await?;
let target_liability_in_cents = ledger.balances().target_liability_in_cents().await?;
span.record(
"target_liability",
&tracing::field::display(target_liability_in_cents),
Expand Down
6 changes: 3 additions & 3 deletions hedging/src/job/adjust_hedge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,19 @@ use tracing::instrument;
use okex_client::*;
use shared::pubsub::CorrelationId;

use crate::{adjustment_action::*, error::*, okex_orders::*, synth_usd_liability::*};
use crate::{adjustment_action::*, error::*, okex_orders::*};

#[instrument(name = "adjust_hedge", skip_all, fields(correlation_id = %correlation_id,
target_liability, current_position, action, placed_order, client_order_id) err)]
pub(super) async fn execute(
correlation_id: CorrelationId,
synth_usd_liability: SynthUsdLiability,
ledger: ledger::Ledger,
okex: OkexClient,
okex_orders: OkexOrders,
hedging_adjustment: HedgingAdjustment,
) -> Result<(), HedgingError> {
let span = tracing::Span::current();
let target_liability = synth_usd_liability.get_latest_liability().await?;
let target_liability = ledger.balances().target_liability_in_cents().await?;
span.record(
"target_liability",
&tracing::field::display(target_liability),
Expand Down
Loading

0 comments on commit a26082a

Please sign in to comment.