Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: sqlx ledger initialization #282

Merged
merged 9 commits into from
Feb 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

33 changes: 6 additions & 27 deletions hedging/sqlx-data.json
Original file line number Diff line number Diff line change
Expand Up @@ -59,26 +59,23 @@
},
"query": "SELECT client_order_id FROM okex_orders WHERE complete = false"
},
"774f260ef3109c246642f50d09384471a5bb9a7c59203fb5c1d7d9eca1a7f5dc": {
"755920d9b23d1c31d6479964d3e1130409f4b74ad5c7519e838818f07ebefd47": {
"describe": {
"columns": [
{
"name": "amount",
"name": "count",
"ordinal": 0,
"type_info": "Numeric"
"type_info": "Int8"
}
],
"nullable": [
false
null
],
"parameters": {
"Left": [
"Numeric",
"Uuid"
]
"Left": []
}
},
"query": "INSERT INTO synth_usd_liability (amount, correlation_id) \n SELECT $1, $2\n WHERE NOT EXISTS (\n SELECT FROM (\n SELECT amount FROM synth_usd_liability ORDER BY idx DESC LIMIT 1\n ) last_row WHERE amount = $1\n )\nRETURNING amount;\n"
"query": "SELECT COUNT(*) FROM user_trades WHERE ledger_tx_id IS NULL"
},
"96f7ee6322ab9ae4a1a69d230e50dcbec45afacfbbf811d90078643fa67cd772": {
"describe": {
Expand Down Expand Up @@ -168,24 +165,6 @@
},
"query": "UPDATE okex_transfers SET state = 'deleted' WHERE lost = true AND state = 'pending' AND created_at < now() - interval '1 day'"
},
"d1a41ae07e59886a30a1750d7e8dd8f24c0652c267b25d8fd5cf3698dc8ca699": {
"describe": {
"columns": [
{
"name": "amount",
"ordinal": 0,
"type_info": "Numeric"
}
],
"nullable": [
false
],
"parameters": {
"Left": []
}
},
"query": "SELECT amount FROM synth_usd_liability ORDER BY idx DESC LIMIT 1"
},
"d393086f855e0eb9bae8575eabf08e0882c0dbaaa7235c8ef9f94954c1f4a9a4": {
"describe": {
"columns": [
Expand Down
66 changes: 34 additions & 32 deletions hedging/src/app/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,14 +79,14 @@ impl HedgingApp {
price_receiver.resubscribe(),
)
.await?;
Self::spawn_non_stop_polling(pool.clone(), okex_poll_delay).await?;
Self::spawn_health_checker(
health_check_trigger,
health_cfg,
position_sub,
price_receiver,
)
.await;
Self::spawn_non_stop_polling(pool.clone(), okex_poll_delay).await?;
let app = HedgingApp {
_runner: job_runner,
};
Expand Down Expand Up @@ -160,30 +160,36 @@ impl HedgingApp {
pool: sqlx::PgPool,
delay: std::time::Duration,
) -> Result<(), HedgingError> {
loop {
let _ = job::spawn_poll_okex(&pool, std::time::Duration::from_secs(1)).await;
tokio::time::sleep(delay).await;
}
tokio::spawn(async move {
loop {
let _ = job::spawn_poll_okex(&pool, std::time::Duration::from_secs(1)).await;
tokio::time::sleep(delay).await;
}
});
Ok(())
}

async fn spawn_liability_balance_listener(
pool: sqlx::PgPool,
ledger: ledger::Ledger,
) -> Result<(), HedgingError> {
let mut events = ledger.usd_liability_balance_events().await;
loop {
match events.recv().await {
Ok(received) => {
if let ledger::LedgerEventData::BalanceUpdated(data) = received.data {
job::spawn_adjust_hedge(&pool, data.entry_id).await?;
tokio::spawn(async move {
let _ = job::spawn_adjust_hedge(&pool, uuid::Uuid::new_v4()).await;
let mut events = ledger.usd_liability_balance_events().await;
loop {
match events.recv().await {
Ok(received) => {
if let ledger::LedgerEventData::BalanceUpdated(data) = received.data {
let _ = job::spawn_adjust_hedge(&pool, data.entry_id).await;
}
}
Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => (),
_ => {
break;
}
}
Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => (),
_ => {
break;
}
}
}
});
Ok(())
}

Expand Down Expand Up @@ -229,23 +235,19 @@ impl HedgingApp {
position_sub: Subscriber,
price_sub: memory::Subscriber<PriceStreamPayload>,
) {
tokio::spawn(async move {
while let Some(check) = health_check_trigger.next().await {
match (
position_sub
.healthy(health_cfg.unhealthy_msg_interval_position)
.await,
price_sub
.healthy(health_cfg.unhealthy_msg_interval_price)
.await,
) {
(Err(e), _) | (_, Err(e)) => {
check.send(Err(e)).expect("Couldn't send response")
}
_ => check.send(Ok(())).expect("Couldn't send response"),
}
while let Some(check) = health_check_trigger.next().await {
match (
position_sub
.healthy(health_cfg.unhealthy_msg_interval_position)
.await,
price_sub
.healthy(health_cfg.unhealthy_msg_interval_price)
.await,
) {
(Err(e), _) | (_, Err(e)) => check.send(Err(e)).expect("Couldn't send response"),
_ => check.send(Ok(())).expect("Couldn't send response"),
}
});
}
}

async fn handle_received_okex_position(
Expand Down
21 changes: 21 additions & 0 deletions hedging/src/hack_user_trades_lag.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
use tracing::instrument;

use crate::error::*;

#[instrument(
name = "hedging.job.hack_user_trades_lag",
skip_all,
fields(lag_count),
err
)]
pub async fn lag_ok(pool: &sqlx::PgPool) -> Result<bool, HedgingError> {
let span = tracing::Span::current();
let count = sqlx::query!("SELECT COUNT(*) FROM user_trades WHERE ledger_tx_id IS NULL")
.fetch_one(pool)
.await?;
span.record("lag_count", count.count);
if count.count.unwrap_or(2) >= 2 {
return Ok(false);
}
Ok(true)
}
7 changes: 6 additions & 1 deletion hedging/src/job/adjust_funding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,21 @@ const SATS_PER_BTC: Decimal = dec!(100_000_000);
#[instrument(name = "hedging.job.adjust_funding", skip_all, fields(correlation_id = %correlation_id,
target_liability, current_position, last_price_in_usd_cents, funding_available_balance,
trading_available_balance, onchain_fees, action, client_transfer_id,
transferred_funding) err)]
transferred_funding, lag_ok), err)]
pub(super) async fn execute(
correlation_id: CorrelationId,
pool: &sqlx::PgPool,
ledger: ledger::Ledger,
okex: OkexClient,
okex_transfers: OkexTransfers,
galoy: GaloyClient,
funding_adjustment: FundingAdjustment,
) -> Result<(), HedgingError> {
let span = tracing::Span::current();
if !crate::hack_user_trades_lag::lag_ok(pool).await? {
span.record("lag_ok", &tracing::field::display(false));
return Ok(());
}

let target_liability_in_cents = ledger.balances().target_liability_in_cents().await?;
span.record(
Expand Down
7 changes: 6 additions & 1 deletion hedging/src/job/adjust_hedge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,20 @@ use shared::pubsub::CorrelationId;
use crate::{adjustment_action::*, error::*, okex_orders::*};

#[instrument(name = "hedging.job.adjust_hedge", skip_all, fields(correlation_id = %correlation_id,
target_liability, current_position, action, placed_order, client_order_id) err)]
target_liability, current_position, action, placed_order, client_order_id, lag_ok), err)]
pub(super) async fn execute(
correlation_id: CorrelationId,
pool: &sqlx::PgPool,
ledger: ledger::Ledger,
okex: OkexClient,
okex_orders: OkexOrders,
hedging_adjustment: HedgingAdjustment,
) -> Result<(), HedgingError> {
let span = tracing::Span::current();
if !crate::hack_user_trades_lag::lag_ok(pool).await? {
span.record("lag_ok", &tracing::field::display(false));
return Ok(());
}
let target_liability = ledger.balances().target_liability_in_cents().await?;
span.record(
"target_liability",
Expand Down
6 changes: 5 additions & 1 deletion hedging/src/job/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ struct AdjustHedgeData {
tracing_data: HashMap<String, String>,
}

#[instrument(name = "hedging.job.spawn_adjust_hedge", skip_all, fields(error, error.message) err)]
#[instrument(name = "hedging.job.spawn_adjust_hedge", skip_all, fields(error, error.message), err)]
pub async fn spawn_adjust_hedge<'a>(
tx: impl Executor<'a, Database = Postgres>,
trigger_id: impl Into<Uuid>,
Expand Down Expand Up @@ -144,13 +144,15 @@ async fn adjust_hedge(
okex_orders: OkexOrders,
hedging_adjustment: HedgingAdjustment,
) -> Result<(), HedgingError> {
let pool = current_job.pool().clone();
JobExecutor::builder(&mut current_job)
.build()
.expect("couldn't build JobExecutor")
.execute(|data| async move {
let data: AdjustHedgeData = data.ok_or(HedgingError::NoJobDataPresent)?;
adjust_hedge::execute(
data.correlation_id,
&pool,
ledger,
okex,
okex_orders,
Expand Down Expand Up @@ -206,13 +208,15 @@ async fn adjust_funding(
galoy: GaloyClient,
funding_adjustment: FundingAdjustment,
) -> Result<(), HedgingError> {
let pool = current_job.pool().clone();
JobExecutor::builder(&mut current_job)
.build()
.expect("couldn't build JobExecutor")
.execute(|data| async move {
let data: AdjustFundingData = data.ok_or(HedgingError::NoJobDataPresent)?;
adjust_funding::execute(
data.correlation_id,
&pool,
ledger,
okex,
okex_transfers,
Expand Down
1 change: 1 addition & 0 deletions hedging/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
mod adjustment_action;
mod app;
mod error;
pub(crate) mod hack_user_trades_lag;
mod okex_orders;
mod okex_transfers;
mod rebalance_action;
Expand Down
2 changes: 1 addition & 1 deletion ledger/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ fail-on-warnings = []
[dependencies]
shared = { path = "../shared", package = "stablesats-shared" }

sqlx-ledger = "0.4"
sqlx-ledger = "0.5"

chrono = "0.4.23"
rust_decimal = "1.28.1"
Expand Down
10 changes: 6 additions & 4 deletions ledger/src/constants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,24 +4,26 @@ use uuid::{uuid, Uuid};

// Templates
pub(super) const USER_BUYS_USD_CODE: &str = "USER_BUYS_USD";
pub(super) const USER_BUYS_USD_ID: Uuid = uuid!("00000000-0000-0000-0000-000000000001");
pub(super) const USER_SELLS_USD_CODE: &str = "USER_SELLS_USD";
pub(super) const USER_SELLS_USD_ID: Uuid = uuid!("00000000-0000-0000-0000-000000000002");

// Journal
pub(super) const STABLESATS_JOURNAL_NAME: &str = "Stablesats";
pub(super) const STABLESATS_JOURNAL_ID: Uuid = uuid!("00000000-0000-0000-0000-000000000001");

// Accounts
pub(super) const EXTERNAL_OMNIBUS_CODE: &str = "EXTERNAL_OMNIBUS";
pub(super) const EXTERNAL_OMNIBUS_ID: Uuid = uuid!("00000000-0000-0000-0000-000000000001");
pub(super) const EXTERNAL_OMNIBUS_ID: Uuid = uuid!("10000000-1000-0000-0000-000000000000");

pub(super) const STABLESATS_BTC_WALLET: &str = "STABLESATS_BTC_WALLET";
pub(super) const STABLESATS_BTC_WALLET_ID: Uuid = uuid!("10000000-0000-0000-0000-000000000001");
pub(super) const STABLESATS_BTC_WALLET_ID: Uuid = uuid!("20000000-2000-0000-0000-000000000000");

pub(super) const STABLESATS_OMNIBUS: &str = "STABLESATS_OMNIBUS";
pub(super) const STABLESATS_OMNIBUS_ID: Uuid = uuid!("20000000-0000-0000-0000-000000000001");
pub(super) const STABLESATS_OMNIBUS_ID: Uuid = uuid!("20000000-1000-0000-0000-000000000000");

pub(super) const STABLESATS_LIABILITY: &str = "STABLESATS_LIABILITY";
pub(super) const STABLESATS_LIABILITY_ID: Uuid = uuid!("20000000-0000-0000-0000-000000000002");
pub(super) const STABLESATS_LIABILITY_ID: Uuid = uuid!("20000000-2100-0000-0000-000000000000");

pub const SATS_PER_BTC: Decimal = dec!(100_000_000);
pub const CENTS_PER_USD: Decimal = dec!(100);
4 changes: 2 additions & 2 deletions ledger/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,8 @@ impl Ledger {
.code(EXTERNAL_OMNIBUS_CODE)
.id(EXTERNAL_OMNIBUS_ID)
.name(EXTERNAL_OMNIBUS_CODE)
.description("Account for balancing btc coming into wallet".to_string())
.normal_balance_type(DebitOrCredit::Debit)
.description("Account for balancing btc coming into wallet".to_string())
.build()
.expect("Couldn't create external omnibus account");
match ledger.accounts().create(new_account).await {
Expand Down Expand Up @@ -148,6 +148,7 @@ impl Ledger {
.code(STABLESATS_OMNIBUS)
.id(STABLESATS_OMNIBUS_ID)
.name(STABLESATS_OMNIBUS)
.normal_balance_type(DebitOrCredit::Debit)
.description("Omnibus account for all stablesats hedging".to_string())
.build()
.expect("Couldn't create stablesats omnibus account");
Expand All @@ -163,7 +164,6 @@ impl Ledger {
.code(STABLESATS_LIABILITY)
.id(STABLESATS_LIABILITY_ID)
.name(STABLESATS_LIABILITY)
.normal_balance_type(DebitOrCredit::Debit)
.description("Account for incoming stablesats liability".to_string())
.build()
.expect("Couldn't create stablesats liability account");
Expand Down
Loading