From a4571a1bc7ad43be156ec071a5fa387e655c482b Mon Sep 17 00:00:00 2001 From: bodymindarts Date: Wed, 8 Feb 2023 11:00:01 +0100 Subject: [PATCH 1/9] chore: introduce user-trade-unit enum --- ...230208092911_user-trade-unit-enum.down.sql | 3 + ...20230208092911_user-trade-unit-enum.up.sql | 20 +++++++ user-trades/src/app/mod.rs | 5 +- .../src/job/poll_galoy_transactions.rs | 12 ++-- user-trades/src/lib.rs | 1 - user-trades/src/user_trade_unit/mod.rs | 58 ------------------- user-trades/src/user_trades/mod.rs | 26 ++++----- user-trades/src/user_trades/unit.rs | 6 ++ 8 files changed, 49 insertions(+), 82 deletions(-) create mode 100644 migrations/20230208092911_user-trade-unit-enum.down.sql create mode 100644 migrations/20230208092911_user-trade-unit-enum.up.sql delete mode 100644 user-trades/src/user_trade_unit/mod.rs create mode 100644 user-trades/src/user_trades/unit.rs diff --git a/migrations/20230208092911_user-trade-unit-enum.down.sql b/migrations/20230208092911_user-trade-unit-enum.down.sql new file mode 100644 index 000000000..b42c5e79a --- /dev/null +++ b/migrations/20230208092911_user-trade-unit-enum.down.sql @@ -0,0 +1,3 @@ +ALTER TABLE user_trades DROP COLUMN buy_unit; +ALTER TABLE user_trades DROP COLUMN sell_unit; +DROP TYPE UserTradeUnit; diff --git a/migrations/20230208092911_user-trade-unit-enum.up.sql b/migrations/20230208092911_user-trade-unit-enum.up.sql new file mode 100644 index 000000000..c3652593d --- /dev/null +++ b/migrations/20230208092911_user-trade-unit-enum.up.sql @@ -0,0 +1,20 @@ +CREATE TYPE UserTradeUnit AS ENUM ('usd_cent', 'satoshi'); + +ALTER TABLE user_trades ADD COLUMN buy_unit UserTradeUnit NOT NULL DEFAULT 'usd_cent'; +ALTER TABLE user_trades ADD COLUMN sell_unit UserTradeUnit NOT NULL DEFAULT 'usd_cent'; + +UPDATE user_trades SET buy_unit = 'usd_cent', sell_unit = 'satoshi' +WHERE id in ( + SELECT t.id FROM user_trades t JOIN user_trade_units u ON t.buy_unit_id = u.id WHERE u.name = 'synthetic_cent' +); + +UPDATE user_trades SET buy_unit = 'satoshi', sell_unit = 'usd_cent' +WHERE id in ( + SELECT t.id FROM user_trades t JOIN user_trade_units u ON t.sell_unit_id = u.id WHERE u.name = 'synthetic_cent' +); + +ALTER TABLE user_trades ALTER COLUMN buy_unit DROP DEFAULT; +ALTER TABLE user_trades ALTER COLUMN sell_unit DROP DEFAULT; +ALTER TABLE user_trades DROP COLUMN buy_unit_id; +ALTER TABLE user_trades DROP COLUMN sell_unit_id; +DROP TABLE user_trade_units; diff --git a/user-trades/src/app/mod.rs b/user-trades/src/app/mod.rs index 6d077e155..f80780860 100644 --- a/user-trades/src/app/mod.rs +++ b/user-trades/src/app/mod.rs @@ -4,7 +4,7 @@ use sqlxmq::OwnedHandle; use galoy_client::{GaloyClient, GaloyClientConfig}; -use crate::{error::*, job, user_trade_unit::*, user_trades::*}; +use crate::{error::*, job, user_trades::*}; pub use config::*; pub struct UserTradesApp { @@ -20,8 +20,7 @@ impl UserTradesApp { galoy_client_cfg: GaloyClientConfig, ) -> Result { let ledger = ledger::Ledger::init(&pool).await?; - let units = UserTradeUnits::load(&pool).await?; - let user_trades = UserTrades::new(pool.clone(), units); + let user_trades = UserTrades::new(pool.clone()); let job_runner = job::start_job_runner( pool.clone(), ledger, diff --git a/user-trades/src/job/poll_galoy_transactions.rs b/user-trades/src/job/poll_galoy_transactions.rs index 140f83ae1..eeb7cb4c9 100644 --- a/user-trades/src/job/poll_galoy_transactions.rs +++ b/user-trades/src/job/poll_galoy_transactions.rs @@ -5,9 +5,7 @@ use std::collections::BTreeMap; use galoy_client::{GaloyClient, SettlementCurrency, TxCursor}; -use crate::{ - error::UserTradesError, galoy_transactions::*, user_trade_unit::UserTradeUnit, user_trades::*, -}; +use crate::{error::UserTradesError, galoy_transactions::*, user_trades::*}; #[instrument( name = "user_trades.job.poll_galoy_transactions", @@ -83,7 +81,7 @@ async fn update_ledger( .. })) = user_trades.find_unaccounted_trade(&mut tx).await { - if buy_unit == UserTradeUnit::SynthCent { + if buy_unit == UserTradeUnit::UsdCent { ledger .user_buys_usd( tx, @@ -191,7 +189,7 @@ impl From for UserTradeUnit { fn from(currency: SettlementCurrency) -> Self { match currency { SettlementCurrency::BTC => Self::Satoshi, - SettlementCurrency::USD => Self::SynthCent, + SettlementCurrency::USD => Self::UsdCent, _ => unimplemented!(), } } @@ -254,7 +252,7 @@ mod tests { assert_eq!( trade1, &NewUserTrade { - buy_unit: UserTradeUnit::SynthCent, + buy_unit: UserTradeUnit::UsdCent, buy_amount: dec!(10), sell_unit: UserTradeUnit::Satoshi, sell_amount: dec!(1000), @@ -270,7 +268,7 @@ mod tests { &NewUserTrade { buy_unit: UserTradeUnit::Satoshi, buy_amount: dec!(1000), - sell_unit: UserTradeUnit::SynthCent, + sell_unit: UserTradeUnit::UsdCent, sell_amount: dec!(10), external_ref: ExternalRef { timestamp: created_earlier, diff --git a/user-trades/src/lib.rs b/user-trades/src/lib.rs index 3c56d65a9..6952e592d 100644 --- a/user-trades/src/lib.rs +++ b/user-trades/src/lib.rs @@ -5,7 +5,6 @@ mod app; mod error; mod galoy_transactions; pub mod job; -pub mod user_trade_unit; pub mod user_trades; use galoy_client::GaloyClientConfig; diff --git a/user-trades/src/user_trade_unit/mod.rs b/user-trades/src/user_trade_unit/mod.rs deleted file mode 100644 index 180168569..000000000 --- a/user-trades/src/user_trade_unit/mod.rs +++ /dev/null @@ -1,58 +0,0 @@ -use sqlx::PgPool; -use std::collections::HashMap; - -use crate::error::UserTradesError; - -#[derive(Debug, Clone, PartialEq, Eq, Hash, Copy)] -pub enum UserTradeUnit { - Satoshi, - SynthCent, -} - -impl TryFrom<&str> for UserTradeUnit { - type Error = (); - - fn try_from(s: &str) -> Result { - match s { - "satoshi" => Ok(UserTradeUnit::Satoshi), - "synthetic_cent" => Ok(UserTradeUnit::SynthCent), - _ => Err(()), - } - } -} - -#[derive(Clone)] -pub struct UserTradeUnits { - inner: HashMap, -} - -impl UserTradeUnits { - pub async fn load(pool: &PgPool) -> Result { - let res = sqlx::query!("SELECT id, name FROM user_trade_units") - .fetch_all(pool) - .await?; - - let mut inner = HashMap::new(); - for row in res { - if let Ok(unit) = UserTradeUnit::try_from(row.name.as_str()) { - inner.insert(unit, row.id); - } - } - - Ok(Self { inner }) - } - - pub fn get_id(&self, unit: UserTradeUnit) -> i32 { - *self - .inner - .get(&unit) - .expect("UserTradeUnit.get_id - not found") - } - - pub fn from_id(&self, id: i32) -> UserTradeUnit { - self.inner - .iter() - .find_map(|(k, v)| if *v == id { Some(*k) } else { None }) - .expect("UserTradeUnit.from_id - not found") - } -} diff --git a/user-trades/src/user_trades/mod.rs b/user-trades/src/user_trades/mod.rs index b305195c5..031d72f41 100644 --- a/user-trades/src/user_trades/mod.rs +++ b/user-trades/src/user_trades/mod.rs @@ -1,13 +1,15 @@ +mod unit; + use chrono::{DateTime, Utc}; use serde::{Deserialize, Serialize}; use sqlx::{PgPool, Postgres, QueryBuilder, Transaction}; use tracing::instrument; use uuid::Uuid; -use crate::{error::UserTradesError, user_trade_unit::*}; +use crate::error::UserTradesError; use rust_decimal::Decimal; -use crate::user_trade_unit::UserTradeUnit; +pub use unit::*; pub struct UnaccountedUserTrade { pub buy_unit: UserTradeUnit, @@ -36,13 +38,11 @@ pub struct NewUserTrade { } #[derive(Clone)] -pub struct UserTrades { - units: UserTradeUnits, -} +pub struct UserTrades {} impl UserTrades { - pub fn new(_pool: PgPool, units: UserTradeUnits) -> Self { - Self { units } + pub fn new(_pool: PgPool) -> Self { + Self {} } pub async fn persist_all<'a>( @@ -55,7 +55,7 @@ impl UserTrades { } let mut query_builder: QueryBuilder = QueryBuilder::new( - "INSERT INTO user_trades (buy_unit_id, buy_amount, sell_unit_id, sell_amount, external_ref)" + "INSERT INTO user_trades (buy_unit, buy_amount, sell_unit, sell_amount, external_ref)", ); query_builder.push_values( new_user_trades, @@ -67,9 +67,9 @@ impl UserTrades { sell_amount, external_ref, }| { - builder.push_bind(self.units.get_id(buy_unit)); + builder.push_bind(buy_unit); builder.push_bind(buy_amount); - builder.push_bind(self.units.get_id(sell_unit)); + builder.push_bind(sell_unit); builder.push_bind(sell_amount); builder.push_bind( serde_json::to_value(external_ref).expect("failed to serialize external_ref"), @@ -92,15 +92,15 @@ impl UserTrades { SET ledger_tx_id = $1 WHERE id = ( SELECT id FROM user_trades WHERE ledger_tx_id IS NULL ORDER BY id LIMIT 1 - ) RETURNING id, buy_amount, buy_unit_id, sell_amount, sell_unit_id, external_ref"#, + ) RETURNING id, buy_amount, buy_unit as "buy_unit: UserTradeUnit", sell_amount, sell_unit as "sell_unit: UserTradeUnit", external_ref"#, tx_id ) .fetch_optional(&mut *tx) .await?; Ok(trade.map(|trade| UnaccountedUserTrade { - buy_unit: self.units.from_id(trade.buy_unit_id), + buy_unit: trade.buy_unit, buy_amount: trade.buy_amount, - sell_unit: self.units.from_id(trade.sell_unit_id), + sell_unit: trade.sell_unit, sell_amount: trade.sell_amount, external_ref: serde_json::from_value(trade.external_ref) .expect("failed to deserialize external_ref"), diff --git a/user-trades/src/user_trades/unit.rs b/user-trades/src/user_trades/unit.rs new file mode 100644 index 000000000..63ce32d62 --- /dev/null +++ b/user-trades/src/user_trades/unit.rs @@ -0,0 +1,6 @@ +#[derive(Debug, Clone, Copy, PartialEq, Eq, sqlx::Type)] +#[sqlx(type_name = "UserTradeUnit", rename_all = "snake_case")] +pub enum UserTradeUnit { + UsdCent, + Satoshi, +} From 2259b1396648fb71f9c6677e347adcea63ac0167 Mon Sep 17 00:00:00 2001 From: bodymindarts Date: Wed, 8 Feb 2023 11:11:25 +0100 Subject: [PATCH 2/9] fix: switch user trade sides --- ...0208100022_switch-user-trade-sides.down.sql | 8 ++++++++ ...230208100022_switch-user-trade-sides.up.sql | 8 ++++++++ user-trades/src/job/poll_galoy_transactions.rs | 18 +++++++++--------- 3 files changed, 25 insertions(+), 9 deletions(-) create mode 100644 migrations/20230208100022_switch-user-trade-sides.down.sql create mode 100644 migrations/20230208100022_switch-user-trade-sides.up.sql diff --git a/migrations/20230208100022_switch-user-trade-sides.down.sql b/migrations/20230208100022_switch-user-trade-sides.down.sql new file mode 100644 index 000000000..75e7335e5 --- /dev/null +++ b/migrations/20230208100022_switch-user-trade-sides.down.sql @@ -0,0 +1,8 @@ +ALTER TABLE user_trades RENAME COLUMN buy_unit TO old_buy_unit; +ALTER TABLE user_trades RENAME COLUMN buy_amount TO old_buy_amount; + +ALTER TABLE user_trades RENAME COLUMN sell_unit TO buy_unit; +ALTER TABLE user_trades RENAME COLUMN sell_amount TO buy_amount; + +ALTER TABLE user_trades RENAME COLUMN old_buy_unit TO sell_unit; +ALTER TABLE user_trades RENAME COLUMN old_buy_amount TO sell_amount; diff --git a/migrations/20230208100022_switch-user-trade-sides.up.sql b/migrations/20230208100022_switch-user-trade-sides.up.sql new file mode 100644 index 000000000..75e7335e5 --- /dev/null +++ b/migrations/20230208100022_switch-user-trade-sides.up.sql @@ -0,0 +1,8 @@ +ALTER TABLE user_trades RENAME COLUMN buy_unit TO old_buy_unit; +ALTER TABLE user_trades RENAME COLUMN buy_amount TO old_buy_amount; + +ALTER TABLE user_trades RENAME COLUMN sell_unit TO buy_unit; +ALTER TABLE user_trades RENAME COLUMN sell_amount TO buy_amount; + +ALTER TABLE user_trades RENAME COLUMN old_buy_unit TO sell_unit; +ALTER TABLE user_trades RENAME COLUMN old_buy_amount TO sell_amount; diff --git a/user-trades/src/job/poll_galoy_transactions.rs b/user-trades/src/job/poll_galoy_transactions.rs index eeb7cb4c9..32138ec18 100644 --- a/user-trades/src/job/poll_galoy_transactions.rs +++ b/user-trades/src/job/poll_galoy_transactions.rs @@ -154,7 +154,7 @@ fn unify(unpaired_transactions: Vec) -> (Vec, }; paired_ids.push(external_ref.btc_tx_id.clone()); paired_ids.push(external_ref.usd_tx_id.clone()); - if tx.settlement_amount < Decimal::ZERO { + if tx.settlement_amount > Decimal::ZERO { user_trades.push(NewUserTrade { buy_unit: tx.settlement_currency.into(), buy_amount: tx.settlement_amount.abs(), @@ -252,10 +252,10 @@ mod tests { assert_eq!( trade1, &NewUserTrade { - buy_unit: UserTradeUnit::UsdCent, - buy_amount: dec!(10), - sell_unit: UserTradeUnit::Satoshi, - sell_amount: dec!(1000), + sell_unit: UserTradeUnit::UsdCent, + sell_amount: dec!(10), + buy_unit: UserTradeUnit::Satoshi, + buy_amount: dec!(1000), external_ref: ExternalRef { timestamp: created_at, btc_tx_id: "id1".to_string(), @@ -266,10 +266,10 @@ mod tests { assert_eq!( trade2, &NewUserTrade { - buy_unit: UserTradeUnit::Satoshi, - buy_amount: dec!(1000), - sell_unit: UserTradeUnit::UsdCent, - sell_amount: dec!(10), + sell_unit: UserTradeUnit::Satoshi, + sell_amount: dec!(1000), + buy_unit: UserTradeUnit::UsdCent, + buy_amount: dec!(10), external_ref: ExternalRef { timestamp: created_earlier, btc_tx_id: "id3".to_string(), From 0e00261b9be92f2fa00724d8ce383ac4100563df Mon Sep 17 00:00:00 2001 From: bodymindarts Date: Wed, 8 Feb 2023 11:19:43 +0100 Subject: [PATCH 3/9] fix: switch sqlx-ledger templates --- ledger/src/lib.rs | 4 ++-- ledger/tests/ledger.rs | 4 ++-- .../20230208101626_switch-sqlx-ledger-templates.down.sql | 1 + migrations/20230208101626_switch-sqlx-ledger-templates.up.sql | 4 ++++ 4 files changed, 9 insertions(+), 4 deletions(-) create mode 100644 migrations/20230208101626_switch-sqlx-ledger-templates.down.sql create mode 100644 migrations/20230208101626_switch-sqlx-ledger-templates.up.sql diff --git a/ledger/src/lib.rs b/ledger/src/lib.rs index 284465043..c6e6e320f 100644 --- a/ledger/src/lib.rs +++ b/ledger/src/lib.rs @@ -118,7 +118,6 @@ impl Ledger { .id(EXTERNAL_OMNIBUS_ID) .name(EXTERNAL_OMNIBUS_CODE) .description("Account for balancing btc coming into wallet".to_string()) - .normal_balance_type(DebitOrCredit::Debit) .build() .expect("Couldn't create external omnibus account"); match ledger.accounts().create(new_account).await { @@ -133,6 +132,7 @@ impl Ledger { .code(STABLESATS_BTC_WALLET) .id(STABLESATS_BTC_WALLET_ID) .name(STABLESATS_BTC_WALLET) + .normal_balance_type(DebitOrCredit::Debit) .description("Account that records the stablesats btc balance".to_string()) .build() .expect("Couldn't create stablesats btc wallet account"); @@ -148,6 +148,7 @@ impl Ledger { .code(STABLESATS_OMNIBUS) .id(STABLESATS_OMNIBUS_ID) .name(STABLESATS_OMNIBUS) + .normal_balance_type(DebitOrCredit::Debit) .description("Omnibus account for all stablesats hedging".to_string()) .build() .expect("Couldn't create stablesats omnibus account"); @@ -163,7 +164,6 @@ impl Ledger { .code(STABLESATS_LIABILITY) .id(STABLESATS_LIABILITY_ID) .name(STABLESATS_LIABILITY) - .normal_balance_type(DebitOrCredit::Debit) .description("Account for incoming stablesats liability".to_string()) .build() .expect("Couldn't create stablesats liability account"); diff --git a/ledger/tests/ledger.rs b/ledger/tests/ledger.rs index 0e6566810..5377f4c2f 100644 --- a/ledger/tests/ledger.rs +++ b/ledger/tests/ledger.rs @@ -56,8 +56,8 @@ async fn user_buys_and_sells_usd() -> anyhow::Result<()> { .await? .unwrap() .settled(); - assert_eq!(before_liability - after_liability, dec!(5)); - assert_eq!(before_btc - after_btc, dec!(0.01)); + assert_eq!(after_liability - before_liability, dec!(5)); + assert_eq!(after_btc - before_btc, dec!(0.01)); ledger .user_sells_usd( diff --git a/migrations/20230208101626_switch-sqlx-ledger-templates.down.sql b/migrations/20230208101626_switch-sqlx-ledger-templates.down.sql new file mode 100644 index 000000000..d2f607c5b --- /dev/null +++ b/migrations/20230208101626_switch-sqlx-ledger-templates.down.sql @@ -0,0 +1 @@ +-- Add down migration script here diff --git a/migrations/20230208101626_switch-sqlx-ledger-templates.up.sql b/migrations/20230208101626_switch-sqlx-ledger-templates.up.sql new file mode 100644 index 000000000..baac44550 --- /dev/null +++ b/migrations/20230208101626_switch-sqlx-ledger-templates.up.sql @@ -0,0 +1,4 @@ +UPDATE sqlx_ledger_accounts SET normal_balance_type = 'credit'; +UPDATE sqlx_ledger_accounts + SET normal_balance_type = 'debit' WHERE id = '10000000-0000-0000-0000-000000000001'; + SET normal_balance_type = 'debit' WHERE id = '20000000-0000-0000-0000-000000000001'; From ffd0b9a2a5111386348f4e5632954e220b1e21ac Mon Sep 17 00:00:00 2001 From: bodymindarts Date: Wed, 8 Feb 2023 20:54:16 +0100 Subject: [PATCH 4/9] fix: hedging stalls on startup --- hedging/src/app/mod.rs | 66 ++++++++++++++++++++++-------------------- hedging/src/job/mod.rs | 2 +- 2 files changed, 35 insertions(+), 33 deletions(-) diff --git a/hedging/src/app/mod.rs b/hedging/src/app/mod.rs index 17076682c..219686ebe 100644 --- a/hedging/src/app/mod.rs +++ b/hedging/src/app/mod.rs @@ -79,6 +79,7 @@ impl HedgingApp { price_receiver.resubscribe(), ) .await?; + Self::spawn_non_stop_polling(pool.clone(), okex_poll_delay).await?; Self::spawn_health_checker( health_check_trigger, health_cfg, @@ -86,7 +87,6 @@ impl HedgingApp { price_receiver, ) .await; - Self::spawn_non_stop_polling(pool.clone(), okex_poll_delay).await?; let app = HedgingApp { _runner: job_runner, }; @@ -160,30 +160,36 @@ impl HedgingApp { pool: sqlx::PgPool, delay: std::time::Duration, ) -> Result<(), HedgingError> { - loop { - let _ = job::spawn_poll_okex(&pool, std::time::Duration::from_secs(1)).await; - tokio::time::sleep(delay).await; - } + tokio::spawn(async move { + loop { + let _ = job::spawn_poll_okex(&pool, std::time::Duration::from_secs(1)).await; + tokio::time::sleep(delay).await; + } + }); + Ok(()) } async fn spawn_liability_balance_listener( pool: sqlx::PgPool, ledger: ledger::Ledger, ) -> Result<(), HedgingError> { - let mut events = ledger.usd_liability_balance_events().await; - loop { - match events.recv().await { - Ok(received) => { - if let ledger::LedgerEventData::BalanceUpdated(data) = received.data { - job::spawn_adjust_hedge(&pool, data.entry_id).await?; + tokio::spawn(async move { + let _ = job::spawn_adjust_hedge(&pool, uuid::Uuid::new_v4()).await; + let mut events = ledger.usd_liability_balance_events().await; + loop { + match events.recv().await { + Ok(received) => { + if let ledger::LedgerEventData::BalanceUpdated(data) = received.data { + let _ = job::spawn_adjust_hedge(&pool, data.entry_id).await; + } + } + Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => (), + _ => { + break; } - } - Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => (), - _ => { - break; } } - } + }); Ok(()) } @@ -229,23 +235,19 @@ impl HedgingApp { position_sub: Subscriber, price_sub: memory::Subscriber, ) { - tokio::spawn(async move { - while let Some(check) = health_check_trigger.next().await { - match ( - position_sub - .healthy(health_cfg.unhealthy_msg_interval_position) - .await, - price_sub - .healthy(health_cfg.unhealthy_msg_interval_price) - .await, - ) { - (Err(e), _) | (_, Err(e)) => { - check.send(Err(e)).expect("Couldn't send response") - } - _ => check.send(Ok(())).expect("Couldn't send response"), - } + while let Some(check) = health_check_trigger.next().await { + match ( + position_sub + .healthy(health_cfg.unhealthy_msg_interval_position) + .await, + price_sub + .healthy(health_cfg.unhealthy_msg_interval_price) + .await, + ) { + (Err(e), _) | (_, Err(e)) => check.send(Err(e)).expect("Couldn't send response"), + _ => check.send(Ok(())).expect("Couldn't send response"), } - }); + } } async fn handle_received_okex_position( diff --git a/hedging/src/job/mod.rs b/hedging/src/job/mod.rs index 4e9926c35..c07cc9ce5 100644 --- a/hedging/src/job/mod.rs +++ b/hedging/src/job/mod.rs @@ -88,7 +88,7 @@ struct AdjustHedgeData { tracing_data: HashMap, } -#[instrument(name = "hedging.job.spawn_adjust_hedge", skip_all, fields(error, error.message) err)] +#[instrument(name = "hedging.job.spawn_adjust_hedge", skip_all, fields(error, error.message), err)] pub async fn spawn_adjust_hedge<'a>( tx: impl Executor<'a, Database = Postgres>, trigger_id: impl Into, From c7c0c58b757861237f56987ee59f69f266134edc Mon Sep 17 00:00:00 2001 From: bodymindarts Date: Wed, 8 Feb 2023 20:57:55 +0100 Subject: [PATCH 5/9] chore: fix ledger templates + reset --- ledger/src/constants.rs | 8 ++++---- ledger/src/lib.rs | 2 +- ledger/src/templates/user_buys_usd.rs | 8 ++++---- ledger/src/templates/user_sells_usd.rs | 12 ++++++------ .../20230208100022_switch-user-trade-sides.down.sql | 8 -------- .../20230208100022_switch-user-trade-sides.up.sql | 8 -------- ...sql => 20230208101626_reset-sqlx-ledger.down.sql} | 0 migrations/20230208101626_reset-sqlx-ledger.up.sql | 9 +++++++++ ...0230208101626_switch-sqlx-ledger-templates.up.sql | 4 ---- 9 files changed, 24 insertions(+), 35 deletions(-) delete mode 100644 migrations/20230208100022_switch-user-trade-sides.down.sql delete mode 100644 migrations/20230208100022_switch-user-trade-sides.up.sql rename migrations/{20230208101626_switch-sqlx-ledger-templates.down.sql => 20230208101626_reset-sqlx-ledger.down.sql} (100%) create mode 100644 migrations/20230208101626_reset-sqlx-ledger.up.sql delete mode 100644 migrations/20230208101626_switch-sqlx-ledger-templates.up.sql diff --git a/ledger/src/constants.rs b/ledger/src/constants.rs index 14f66341b..cc15e7bb8 100644 --- a/ledger/src/constants.rs +++ b/ledger/src/constants.rs @@ -12,16 +12,16 @@ pub(super) const STABLESATS_JOURNAL_ID: Uuid = uuid!("00000000-0000-0000-0000-00 // Accounts pub(super) const EXTERNAL_OMNIBUS_CODE: &str = "EXTERNAL_OMNIBUS"; -pub(super) const EXTERNAL_OMNIBUS_ID: Uuid = uuid!("00000000-0000-0000-0000-000000000001"); +pub(super) const EXTERNAL_OMNIBUS_ID: Uuid = uuid!("10000000-1000-0000-0000-000000000000"); pub(super) const STABLESATS_BTC_WALLET: &str = "STABLESATS_BTC_WALLET"; -pub(super) const STABLESATS_BTC_WALLET_ID: Uuid = uuid!("10000000-0000-0000-0000-000000000001"); +pub(super) const STABLESATS_BTC_WALLET_ID: Uuid = uuid!("20000000-2000-0000-0000-000000000000"); pub(super) const STABLESATS_OMNIBUS: &str = "STABLESATS_OMNIBUS"; -pub(super) const STABLESATS_OMNIBUS_ID: Uuid = uuid!("20000000-0000-0000-0000-000000000001"); +pub(super) const STABLESATS_OMNIBUS_ID: Uuid = uuid!("20000000-1000-0000-0000-000000000000"); pub(super) const STABLESATS_LIABILITY: &str = "STABLESATS_LIABILITY"; -pub(super) const STABLESATS_LIABILITY_ID: Uuid = uuid!("20000000-0000-0000-0000-000000000002"); +pub(super) const STABLESATS_LIABILITY_ID: Uuid = uuid!("20000000-2100-0000-0000-000000000000"); pub const SATS_PER_BTC: Decimal = dec!(100_000_000); pub const CENTS_PER_USD: Decimal = dec!(100); diff --git a/ledger/src/lib.rs b/ledger/src/lib.rs index c6e6e320f..71f60e696 100644 --- a/ledger/src/lib.rs +++ b/ledger/src/lib.rs @@ -117,6 +117,7 @@ impl Ledger { .code(EXTERNAL_OMNIBUS_CODE) .id(EXTERNAL_OMNIBUS_ID) .name(EXTERNAL_OMNIBUS_CODE) + .normal_balance_type(DebitOrCredit::Debit) .description("Account for balancing btc coming into wallet".to_string()) .build() .expect("Couldn't create external omnibus account"); @@ -132,7 +133,6 @@ impl Ledger { .code(STABLESATS_BTC_WALLET) .id(STABLESATS_BTC_WALLET_ID) .name(STABLESATS_BTC_WALLET) - .normal_balance_type(DebitOrCredit::Debit) .description("Account that records the stablesats btc balance".to_string()) .build() .expect("Couldn't create stablesats btc wallet account"); diff --git a/ledger/src/templates/user_buys_usd.rs b/ledger/src/templates/user_buys_usd.rs index b8d59806f..a36af89d3 100644 --- a/ledger/src/templates/user_buys_usd.rs +++ b/ledger/src/templates/user_buys_usd.rs @@ -81,21 +81,21 @@ impl UserBuysUsd { EntryInput::builder() .entry_type("'USER_BUYS_USD_BTC_CR'") .currency("'BTC'") - .account_id(format!("uuid('{EXTERNAL_OMNIBUS_ID}')")) + .account_id(format!("uuid('{STABLESATS_BTC_WALLET_ID}')")) .direction("CREDIT") .layer("SETTLED") .units("params.btc_amount") .build() - .expect("Couldn't build PENDING_ONCHAIN_DEBIT entry"), + .expect("Couldn't build USER_BUYS_USD_BTC_CR entry"), EntryInput::builder() .entry_type("'USER_BUYS_USD_BTC_DR'") .currency("'BTC'") - .account_id(format!("uuid('{STABLESATS_BTC_WALLET_ID}')")) + .account_id(format!("uuid('{EXTERNAL_OMNIBUS_ID}')")) .direction("DEBIT") .layer("SETTLED") .units("params.btc_amount") .build() - .expect("Couldn't build BTC_WALLET_DR entry"), + .expect("Couldn't build USER_BUYS_USD_BTC_DR entry"), EntryInput::builder() .entry_type("'USER_BUYS_USD_USD_CR'") .currency("'USD'") diff --git a/ledger/src/templates/user_sells_usd.rs b/ledger/src/templates/user_sells_usd.rs index 6a413522c..8a3e11327 100644 --- a/ledger/src/templates/user_sells_usd.rs +++ b/ledger/src/templates/user_sells_usd.rs @@ -81,21 +81,21 @@ impl UserSellsUsd { EntryInput::builder() .entry_type("'USER_SELLS_USD_BTC_DR'") .currency("'BTC'") - .account_id(format!("uuid('{EXTERNAL_OMNIBUS_ID}')")) + .account_id(format!("uuid('{STABLESATS_BTC_WALLET_ID}')")) .direction("DEBIT") .layer("SETTLED") .units("params.btc_amount") .build() - .expect("Couldn't build PENDING_ONCHAIN_DEBIT entry"), + .expect("Couldn't build USER_SELLS_USD_BTC_DR entry"), EntryInput::builder() .entry_type("'USER_SELLS_USD_BTC_CR'") .currency("'BTC'") - .account_id(format!("uuid('{STABLESATS_BTC_WALLET_ID}')")) + .account_id(format!("uuid('{EXTERNAL_OMNIBUS_ID}')")) .direction("CREDIT") .layer("SETTLED") .units("params.btc_amount") .build() - .expect("Couldn't build BTC_WALLET_DR entry"), + .expect("Couldn't build USER_SELLS_USD_BTC_CR entry"), EntryInput::builder() .entry_type("'USER_SELLS_USD_USD_DR'") .currency("'USD'") @@ -104,7 +104,7 @@ impl UserSellsUsd { .layer("SETTLED") .units("params.usd_amount") .build() - .expect("Couldn't build USER_SELLS_USD_USD_CR entry"), + .expect("Couldn't build USER_SELLS_USD_USD_DR entry"), EntryInput::builder() .entry_type("'USER_SELLS_USD_USD_CR'") .currency("'USD'") @@ -113,7 +113,7 @@ impl UserSellsUsd { .layer("SETTLED") .units("params.usd_amount") .build() - .expect("Couldn't build USER_SELLS_USD_USD_DR entry"), + .expect("Couldn't build USER_SELLS_USD_USD_CR entry"), ]; let params = UserSellsUsdParams::defs(); diff --git a/migrations/20230208100022_switch-user-trade-sides.down.sql b/migrations/20230208100022_switch-user-trade-sides.down.sql deleted file mode 100644 index 75e7335e5..000000000 --- a/migrations/20230208100022_switch-user-trade-sides.down.sql +++ /dev/null @@ -1,8 +0,0 @@ -ALTER TABLE user_trades RENAME COLUMN buy_unit TO old_buy_unit; -ALTER TABLE user_trades RENAME COLUMN buy_amount TO old_buy_amount; - -ALTER TABLE user_trades RENAME COLUMN sell_unit TO buy_unit; -ALTER TABLE user_trades RENAME COLUMN sell_amount TO buy_amount; - -ALTER TABLE user_trades RENAME COLUMN old_buy_unit TO sell_unit; -ALTER TABLE user_trades RENAME COLUMN old_buy_amount TO sell_amount; diff --git a/migrations/20230208100022_switch-user-trade-sides.up.sql b/migrations/20230208100022_switch-user-trade-sides.up.sql deleted file mode 100644 index 75e7335e5..000000000 --- a/migrations/20230208100022_switch-user-trade-sides.up.sql +++ /dev/null @@ -1,8 +0,0 @@ -ALTER TABLE user_trades RENAME COLUMN buy_unit TO old_buy_unit; -ALTER TABLE user_trades RENAME COLUMN buy_amount TO old_buy_amount; - -ALTER TABLE user_trades RENAME COLUMN sell_unit TO buy_unit; -ALTER TABLE user_trades RENAME COLUMN sell_amount TO buy_amount; - -ALTER TABLE user_trades RENAME COLUMN old_buy_unit TO sell_unit; -ALTER TABLE user_trades RENAME COLUMN old_buy_amount TO sell_amount; diff --git a/migrations/20230208101626_switch-sqlx-ledger-templates.down.sql b/migrations/20230208101626_reset-sqlx-ledger.down.sql similarity index 100% rename from migrations/20230208101626_switch-sqlx-ledger-templates.down.sql rename to migrations/20230208101626_reset-sqlx-ledger.down.sql diff --git a/migrations/20230208101626_reset-sqlx-ledger.up.sql b/migrations/20230208101626_reset-sqlx-ledger.up.sql new file mode 100644 index 000000000..8451eb3b6 --- /dev/null +++ b/migrations/20230208101626_reset-sqlx-ledger.up.sql @@ -0,0 +1,9 @@ +DELETE FROM sqlx_ledger_accounts; +DELETE FROM sqlx_ledger_transactions; +DELETE FROM sqlx_ledger_entries; +DELETE FROM sqlx_ledger_events; +DELETE FROM sqlx_ledger_balances; +DELETE FROM sqlx_ledger_current_balances; +DELETE FROM sqlx_ledger_tx_templates; + +UPDATE user_trades SET ledger_tx_id = NULL; diff --git a/migrations/20230208101626_switch-sqlx-ledger-templates.up.sql b/migrations/20230208101626_switch-sqlx-ledger-templates.up.sql deleted file mode 100644 index baac44550..000000000 --- a/migrations/20230208101626_switch-sqlx-ledger-templates.up.sql +++ /dev/null @@ -1,4 +0,0 @@ -UPDATE sqlx_ledger_accounts SET normal_balance_type = 'credit'; -UPDATE sqlx_ledger_accounts - SET normal_balance_type = 'debit' WHERE id = '10000000-0000-0000-0000-000000000001'; - SET normal_balance_type = 'debit' WHERE id = '20000000-0000-0000-0000-000000000001'; From d80e2cebcdb5fa604529089650f4b667a6106038 Mon Sep 17 00:00:00 2001 From: bodymindarts Date: Wed, 8 Feb 2023 20:58:19 +0100 Subject: [PATCH 6/9] fix: user trades ordering --- user-trades/src/galoy_transactions/mod.rs | 6 +++--- user-trades/src/job/poll_galoy_transactions.rs | 18 +++++++++--------- user-trades/src/user_trades/mod.rs | 2 +- 3 files changed, 13 insertions(+), 13 deletions(-) diff --git a/user-trades/src/galoy_transactions/mod.rs b/user-trades/src/galoy_transactions/mod.rs index e11814ba7..9f647a4dd 100644 --- a/user-trades/src/galoy_transactions/mod.rs +++ b/user-trades/src/galoy_transactions/mod.rs @@ -73,10 +73,10 @@ impl GaloyTransactions { pub async fn get_latest_cursor(&self) -> Result, UserTradesError> { let res = sqlx::query!("SELECT cursor FROM galoy_transactions ORDER BY created_at DESC LIMIT 1") - .fetch_all(&self.pool) + .fetch_optional(&self.pool) .await?; - if let Some(res) = res.into_iter().next() { + if let Some(res) = res { Ok(Some(LatestCursor(res.cursor))) } else { Ok(None) @@ -91,7 +91,7 @@ impl GaloyTransactions { " SELECT id, settlement_amount, settlement_currency, amount_in_usd_cents, created_at FROM galoy_transactions - WHERE is_paired = false AND amount_in_usd_cents != 0 FOR UPDATE + WHERE is_paired = false AND amount_in_usd_cents != 0 ORDER BY created_at FOR UPDATE " ) .fetch_all(&mut tx) diff --git a/user-trades/src/job/poll_galoy_transactions.rs b/user-trades/src/job/poll_galoy_transactions.rs index 32138ec18..eeb7cb4c9 100644 --- a/user-trades/src/job/poll_galoy_transactions.rs +++ b/user-trades/src/job/poll_galoy_transactions.rs @@ -154,7 +154,7 @@ fn unify(unpaired_transactions: Vec) -> (Vec, }; paired_ids.push(external_ref.btc_tx_id.clone()); paired_ids.push(external_ref.usd_tx_id.clone()); - if tx.settlement_amount > Decimal::ZERO { + if tx.settlement_amount < Decimal::ZERO { user_trades.push(NewUserTrade { buy_unit: tx.settlement_currency.into(), buy_amount: tx.settlement_amount.abs(), @@ -252,10 +252,10 @@ mod tests { assert_eq!( trade1, &NewUserTrade { - sell_unit: UserTradeUnit::UsdCent, - sell_amount: dec!(10), - buy_unit: UserTradeUnit::Satoshi, - buy_amount: dec!(1000), + buy_unit: UserTradeUnit::UsdCent, + buy_amount: dec!(10), + sell_unit: UserTradeUnit::Satoshi, + sell_amount: dec!(1000), external_ref: ExternalRef { timestamp: created_at, btc_tx_id: "id1".to_string(), @@ -266,10 +266,10 @@ mod tests { assert_eq!( trade2, &NewUserTrade { - sell_unit: UserTradeUnit::Satoshi, - sell_amount: dec!(1000), - buy_unit: UserTradeUnit::UsdCent, - buy_amount: dec!(10), + buy_unit: UserTradeUnit::Satoshi, + buy_amount: dec!(1000), + sell_unit: UserTradeUnit::UsdCent, + sell_amount: dec!(10), external_ref: ExternalRef { timestamp: created_earlier, btc_tx_id: "id3".to_string(), diff --git a/user-trades/src/user_trades/mod.rs b/user-trades/src/user_trades/mod.rs index 031d72f41..021e7a4aa 100644 --- a/user-trades/src/user_trades/mod.rs +++ b/user-trades/src/user_trades/mod.rs @@ -91,7 +91,7 @@ impl UserTrades { r#"UPDATE user_trades SET ledger_tx_id = $1 WHERE id = ( - SELECT id FROM user_trades WHERE ledger_tx_id IS NULL ORDER BY id LIMIT 1 + SELECT id FROM user_trades WHERE ledger_tx_id IS NULL ORDER BY external_ref->>'timestamp' LIMIT 1 ) RETURNING id, buy_amount, buy_unit as "buy_unit: UserTradeUnit", sell_amount, sell_unit as "sell_unit: UserTradeUnit", external_ref"#, tx_id ) From 660fba0a4b6890d0ee251f93f2e1ac85946b5b19 Mon Sep 17 00:00:00 2001 From: bodymindarts Date: Wed, 8 Feb 2023 21:13:35 +0100 Subject: [PATCH 7/9] chore: tmp hack to early exit jobs due to lag --- hedging/src/hack_user_trades_lag.rs | 21 +++++++++++++++++++++ hedging/src/job/adjust_funding.rs | 7 ++++++- hedging/src/job/adjust_hedge.rs | 7 ++++++- hedging/src/job/mod.rs | 4 ++++ hedging/src/lib.rs | 1 + 5 files changed, 38 insertions(+), 2 deletions(-) create mode 100644 hedging/src/hack_user_trades_lag.rs diff --git a/hedging/src/hack_user_trades_lag.rs b/hedging/src/hack_user_trades_lag.rs new file mode 100644 index 000000000..bee64c5f5 --- /dev/null +++ b/hedging/src/hack_user_trades_lag.rs @@ -0,0 +1,21 @@ +use tracing::instrument; + +use crate::error::*; + +#[instrument( + name = "hedging.job.hack_user_trades_lag", + skip_all, + fields(lag_count), + err +)] +pub async fn lag_ok(pool: &sqlx::PgPool) -> Result { + let span = tracing::Span::current(); + let count = sqlx::query!("SELECT COUNT(*) FROM user_trades WHERE ledger_tx_id IS NULL") + .fetch_one(pool) + .await?; + span.record("lag_count", count.count); + if count.count.unwrap_or(2) >= 2 { + return Ok(false); + } + Ok(true) +} diff --git a/hedging/src/job/adjust_funding.rs b/hedging/src/job/adjust_funding.rs index af0164f3d..86c71cfbd 100644 --- a/hedging/src/job/adjust_funding.rs +++ b/hedging/src/job/adjust_funding.rs @@ -13,9 +13,10 @@ const SATS_PER_BTC: Decimal = dec!(100_000_000); #[instrument(name = "hedging.job.adjust_funding", skip_all, fields(correlation_id = %correlation_id, target_liability, current_position, last_price_in_usd_cents, funding_available_balance, trading_available_balance, onchain_fees, action, client_transfer_id, - transferred_funding) err)] + transferred_funding, lag_ok), err)] pub(super) async fn execute( correlation_id: CorrelationId, + pool: &sqlx::PgPool, ledger: ledger::Ledger, okex: OkexClient, okex_transfers: OkexTransfers, @@ -23,6 +24,10 @@ pub(super) async fn execute( funding_adjustment: FundingAdjustment, ) -> Result<(), HedgingError> { let span = tracing::Span::current(); + if !crate::hack_user_trades_lag::lag_ok(pool).await? { + span.record("lag_ok", &tracing::field::display(false)); + return Ok(()); + } let target_liability_in_cents = ledger.balances().target_liability_in_cents().await?; span.record( diff --git a/hedging/src/job/adjust_hedge.rs b/hedging/src/job/adjust_hedge.rs index 85d8a5464..41032d3d7 100644 --- a/hedging/src/job/adjust_hedge.rs +++ b/hedging/src/job/adjust_hedge.rs @@ -7,15 +7,20 @@ use shared::pubsub::CorrelationId; use crate::{adjustment_action::*, error::*, okex_orders::*}; #[instrument(name = "hedging.job.adjust_hedge", skip_all, fields(correlation_id = %correlation_id, - target_liability, current_position, action, placed_order, client_order_id) err)] + target_liability, current_position, action, placed_order, client_order_id, lag_ok), err)] pub(super) async fn execute( correlation_id: CorrelationId, + pool: &sqlx::PgPool, ledger: ledger::Ledger, okex: OkexClient, okex_orders: OkexOrders, hedging_adjustment: HedgingAdjustment, ) -> Result<(), HedgingError> { let span = tracing::Span::current(); + if !crate::hack_user_trades_lag::lag_ok(pool).await? { + span.record("lag_ok", &tracing::field::display(false)); + return Ok(()); + } let target_liability = ledger.balances().target_liability_in_cents().await?; span.record( "target_liability", diff --git a/hedging/src/job/mod.rs b/hedging/src/job/mod.rs index c07cc9ce5..9177870e3 100644 --- a/hedging/src/job/mod.rs +++ b/hedging/src/job/mod.rs @@ -144,6 +144,7 @@ async fn adjust_hedge( okex_orders: OkexOrders, hedging_adjustment: HedgingAdjustment, ) -> Result<(), HedgingError> { + let pool = current_job.pool().clone(); JobExecutor::builder(&mut current_job) .build() .expect("couldn't build JobExecutor") @@ -151,6 +152,7 @@ async fn adjust_hedge( let data: AdjustHedgeData = data.ok_or(HedgingError::NoJobDataPresent)?; adjust_hedge::execute( data.correlation_id, + &pool, ledger, okex, okex_orders, @@ -206,6 +208,7 @@ async fn adjust_funding( galoy: GaloyClient, funding_adjustment: FundingAdjustment, ) -> Result<(), HedgingError> { + let pool = current_job.pool().clone(); JobExecutor::builder(&mut current_job) .build() .expect("couldn't build JobExecutor") @@ -213,6 +216,7 @@ async fn adjust_funding( let data: AdjustFundingData = data.ok_or(HedgingError::NoJobDataPresent)?; adjust_funding::execute( data.correlation_id, + &pool, ledger, okex, okex_transfers, diff --git a/hedging/src/lib.rs b/hedging/src/lib.rs index e65a0bf84..1ff3f2dfc 100644 --- a/hedging/src/lib.rs +++ b/hedging/src/lib.rs @@ -4,6 +4,7 @@ mod adjustment_action; mod app; mod error; +pub(crate) mod hack_user_trades_lag; mod okex_orders; mod okex_transfers; mod rebalance_action; From cf3d35d528df85a2445a61a8068850b84c7f38e5 Mon Sep 17 00:00:00 2001 From: bodymindarts Date: Wed, 8 Feb 2023 21:18:48 +0100 Subject: [PATCH 8/9] chore: update sqlx-data.json --- hedging/sqlx-data.json | 33 ++----- user-trades/sqlx-data.json | 186 +++++++++---------------------------- 2 files changed, 50 insertions(+), 169 deletions(-) diff --git a/hedging/sqlx-data.json b/hedging/sqlx-data.json index 26acd9cb3..ef0c7f736 100644 --- a/hedging/sqlx-data.json +++ b/hedging/sqlx-data.json @@ -59,26 +59,23 @@ }, "query": "SELECT client_order_id FROM okex_orders WHERE complete = false" }, - "774f260ef3109c246642f50d09384471a5bb9a7c59203fb5c1d7d9eca1a7f5dc": { + "755920d9b23d1c31d6479964d3e1130409f4b74ad5c7519e838818f07ebefd47": { "describe": { "columns": [ { - "name": "amount", + "name": "count", "ordinal": 0, - "type_info": "Numeric" + "type_info": "Int8" } ], "nullable": [ - false + null ], "parameters": { - "Left": [ - "Numeric", - "Uuid" - ] + "Left": [] } }, - "query": "INSERT INTO synth_usd_liability (amount, correlation_id) \n SELECT $1, $2\n WHERE NOT EXISTS (\n SELECT FROM (\n SELECT amount FROM synth_usd_liability ORDER BY idx DESC LIMIT 1\n ) last_row WHERE amount = $1\n )\nRETURNING amount;\n" + "query": "SELECT COUNT(*) FROM user_trades WHERE ledger_tx_id IS NULL" }, "96f7ee6322ab9ae4a1a69d230e50dcbec45afacfbbf811d90078643fa67cd772": { "describe": { @@ -168,24 +165,6 @@ }, "query": "UPDATE okex_transfers SET state = 'deleted' WHERE lost = true AND state = 'pending' AND created_at < now() - interval '1 day'" }, - "d1a41ae07e59886a30a1750d7e8dd8f24c0652c267b25d8fd5cf3698dc8ca699": { - "describe": { - "columns": [ - { - "name": "amount", - "ordinal": 0, - "type_info": "Numeric" - } - ], - "nullable": [ - false - ], - "parameters": { - "Left": [] - } - }, - "query": "SELECT amount FROM synth_usd_liability ORDER BY idx DESC LIMIT 1" - }, "d393086f855e0eb9bae8575eabf08e0882c0dbaaa7235c8ef9f94954c1f4a9a4": { "describe": { "columns": [ diff --git a/user-trades/sqlx-data.json b/user-trades/sqlx-data.json index a85b7a83f..c30ff51ff 100644 --- a/user-trades/sqlx-data.json +++ b/user-trades/sqlx-data.json @@ -1,55 +1,5 @@ { "db": "PostgreSQL", - "059b953850061888b032cc6d58238ea8236ec2c0facf18471b71535531548e26": { - "describe": { - "columns": [ - { - "name": "id", - "ordinal": 0, - "type_info": "Int4" - }, - { - "name": "buy_amount", - "ordinal": 1, - "type_info": "Numeric" - }, - { - "name": "buy_unit_id", - "ordinal": 2, - "type_info": "Int4" - }, - { - "name": "sell_amount", - "ordinal": 3, - "type_info": "Numeric" - }, - { - "name": "sell_unit_id", - "ordinal": 4, - "type_info": "Int4" - }, - { - "name": "external_ref", - "ordinal": 5, - "type_info": "Jsonb" - } - ], - "nullable": [ - false, - false, - false, - false, - false, - false - ], - "parameters": { - "Left": [ - "Uuid" - ] - } - }, - "query": "UPDATE user_trades\n SET ledger_tx_id = $1\n WHERE id = (\n SELECT id FROM user_trades WHERE ledger_tx_id IS NULL ORDER BY id LIMIT 1\n ) RETURNING id, buy_amount, buy_unit_id, sell_amount, sell_unit_id, external_ref" - }, "2b868fd5a78978ec8bc3bcd79008f831a139e070f994b0b8bfe4e8a3dd3105f7": { "describe": { "columns": [ @@ -68,125 +18,77 @@ }, "query": "SELECT cursor FROM galoy_transactions ORDER BY created_at DESC LIMIT 1" }, - "2bd20eee766a5eb278d25c67fb87c998f20a5af7c02a0b05c175af3d6464f9f7": { - "describe": { - "columns": [ - { - "name": "last_trade_id", - "ordinal": 0, - "type_info": "Int4" - } - ], - "nullable": [ - true - ], - "parameters": { - "Left": [] - } - }, - "query": "SELECT last_trade_id FROM user_trade_balances FOR UPDATE" - }, - "38b9a4487c09ce6e43890cc32001931eb0dfdf65ef8fa04d0a8b1553d2c3a035": { - "describe": { - "columns": [], - "nullable": [], - "parameters": { - "Left": [ - "Numeric", - "Int4", - "Int4" - ] - } - }, - "query": "UPDATE user_trade_balances SET current_balance = $1, last_trade_id = $2, updated_at = now() WHERE unit_id = $3" - }, - "5c5325c7dd64793f5f50c172ce6039cf510981c3c42e9040994202feef706ff6": { + "ecda7feba70249cfa67e14e09d73daaac625dcbf4b307ae664f26b9f5d0eef2d": { "describe": { "columns": [ { - "name": "unit_id", + "name": "id", "ordinal": 0, "type_info": "Int4" }, { - "name": "new_balance", + "name": "buy_amount", "ordinal": 1, "type_info": "Numeric" }, { - "name": "new_latest_id", + "name": "buy_unit: UserTradeUnit", "ordinal": 2, - "type_info": "Int4" - } - ], - "nullable": [ - false, - null, - null - ], - "parameters": { - "Left": [ - "Int4" - ] - } - }, - "query": " WITH amounts AS (\n SELECT MAX(id) as latest_id, buy_unit_id, SUM(buy_amount) AS to_sub, sell_unit_id, SUM(sell_amount) AS to_add \n FROM user_trades\n WHERE id > $1\n GROUP BY GROUPING SETS ((buy_unit_id), (sell_unit_id))\n )\nSELECT\n unit_id,\n current_balance + COALESCE(sell_amounts.to_add, 0) - COALESCE(buy_amounts.to_sub, 0) AS new_balance,\n MAX(GREATEST(sell_amounts.latest_id, buy_amounts.latest_id)) OVER () AS new_latest_id\nFROM user_trade_balances\nLEFT JOIN amounts buy_amounts ON unit_id = buy_amounts.buy_unit_id\nLEFT JOIN amounts sell_amounts ON unit_id = sell_amounts.sell_unit_id;\n" - }, - "9afafd1fc5d1bc1130b130a1c94cad02eba9ead36564e2b5962cd56d0a9fce66": { - "describe": { - "columns": [ - { - "name": "id", - "ordinal": 0, - "type_info": "Int4" + "type_info": { + "Custom": { + "kind": { + "Enum": [ + "usd_cent", + "satoshi" + ] + }, + "name": "usertradeunit" + } + } }, { - "name": "name", - "ordinal": 1, - "type_info": "Varchar" - } - ], - "nullable": [ - false, - false - ], - "parameters": { - "Left": [] - } - }, - "query": "SELECT id, name FROM user_trade_units" - }, - "b64b1f2189c7b28ade7381b416bcd704cbb4a7df819db3d3f2e5e0cd3681736a": { - "describe": { - "columns": [ - { - "name": "unit_id", - "ordinal": 0, - "type_info": "Int4" + "name": "sell_amount", + "ordinal": 3, + "type_info": "Numeric" }, { - "name": "current_balance", - "ordinal": 1, - "type_info": "Numeric" + "name": "sell_unit: UserTradeUnit", + "ordinal": 4, + "type_info": { + "Custom": { + "kind": { + "Enum": [ + "usd_cent", + "satoshi" + ] + }, + "name": "usertradeunit" + } + } }, { - "name": "last_trade_id", - "ordinal": 2, - "type_info": "Int4" + "name": "external_ref", + "ordinal": 5, + "type_info": "Jsonb" } ], "nullable": [ false, false, - true + false, + false, + false, + false ], "parameters": { - "Left": [] + "Left": [ + "Uuid" + ] } }, - "query": "SELECT unit_id, current_balance, last_trade_id FROM user_trade_balances" + "query": "UPDATE user_trades\n SET ledger_tx_id = $1\n WHERE id = (\n SELECT id FROM user_trades WHERE ledger_tx_id IS NULL ORDER BY external_ref->>'timestamp' LIMIT 1\n ) RETURNING id, buy_amount, buy_unit as \"buy_unit: UserTradeUnit\", sell_amount, sell_unit as \"sell_unit: UserTradeUnit\", external_ref" }, - "f1353aa7ecf75375f32be2736a51f09fb13ed8492a0a3c51659743fa0a47b3ab": { + "fbeadad71170b3413e08ca82e3da340b4d6a0f5c6d7a963c08b590a568a714f5": { "describe": { "columns": [ { @@ -226,6 +128,6 @@ "Left": [] } }, - "query": "\n SELECT id, settlement_amount, settlement_currency, amount_in_usd_cents, created_at\n FROM galoy_transactions\n WHERE is_paired = false AND amount_in_usd_cents != 0 FOR UPDATE\n " + "query": "\n SELECT id, settlement_amount, settlement_currency, amount_in_usd_cents, created_at\n FROM galoy_transactions\n WHERE is_paired = false AND amount_in_usd_cents != 0 ORDER BY created_at FOR UPDATE\n " } } \ No newline at end of file From 5b3c142ec26650a91cf58e57099f7a94e0137393 Mon Sep 17 00:00:00 2001 From: bodymindarts Date: Wed, 8 Feb 2023 22:11:02 +0100 Subject: [PATCH 9/9] chore: bump sqlx-ledger --- Cargo.lock | 12 ++++++------ ledger/Cargo.toml | 2 +- ledger/src/constants.rs | 2 ++ ledger/src/templates/user_buys_usd.rs | 1 + ledger/src/templates/user_sells_usd.rs | 1 + 5 files changed, 11 insertions(+), 7 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f662ef417..b3e4d0462 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2954,9 +2954,9 @@ dependencies = [ [[package]] name = "sqlx-ledger" -version = "0.4.0" +version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "240fcaf9bfe280f2d5eb2fa012e316893e66107d9b730e7ccb147ad32a2290e6" +checksum = "5acb039f63cb50d5e627f4d14f93e16824d29f1c78dc0cf4f2ab56f431744b07" dependencies = [ "chrono", "derive_builder", @@ -2974,9 +2974,9 @@ dependencies = [ [[package]] name = "sqlx-ledger-cel-interpreter" -version = "0.4.0" +version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "603e33555f72842486ce0510c3ca07e97a8e806d9ac7cbb3caa5f8912380b01c" +checksum = "f56e76da7e38dcc3cb42cacf5121039f62737fc497d99104e46955a90c6c2bab" dependencies = [ "chrono", "rust_decimal", @@ -2989,9 +2989,9 @@ dependencies = [ [[package]] name = "sqlx-ledger-cel-parser" -version = "0.4.0" +version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a2efd3797c60d0639c192bcf3eec5cc6a23c99c6813620a4a070ef3a74b5d22a" +checksum = "c60459bf75711a7a056bb1a002037aaceb18554ac951eb8b42dac46f11e1b950" dependencies = [ "lalrpop", "lalrpop-util", diff --git a/ledger/Cargo.toml b/ledger/Cargo.toml index 53675f0b7..bbad64f7f 100644 --- a/ledger/Cargo.toml +++ b/ledger/Cargo.toml @@ -11,7 +11,7 @@ fail-on-warnings = [] [dependencies] shared = { path = "../shared", package = "stablesats-shared" } -sqlx-ledger = "0.4" +sqlx-ledger = "0.5" chrono = "0.4.23" rust_decimal = "1.28.1" diff --git a/ledger/src/constants.rs b/ledger/src/constants.rs index cc15e7bb8..57c5edc97 100644 --- a/ledger/src/constants.rs +++ b/ledger/src/constants.rs @@ -4,7 +4,9 @@ use uuid::{uuid, Uuid}; // Templates pub(super) const USER_BUYS_USD_CODE: &str = "USER_BUYS_USD"; +pub(super) const USER_BUYS_USD_ID: Uuid = uuid!("00000000-0000-0000-0000-000000000001"); pub(super) const USER_SELLS_USD_CODE: &str = "USER_SELLS_USD"; +pub(super) const USER_SELLS_USD_ID: Uuid = uuid!("00000000-0000-0000-0000-000000000002"); // Journal pub(super) const STABLESATS_JOURNAL_NAME: &str = "Stablesats"; diff --git a/ledger/src/templates/user_buys_usd.rs b/ledger/src/templates/user_buys_usd.rs index a36af89d3..1509e7902 100644 --- a/ledger/src/templates/user_buys_usd.rs +++ b/ledger/src/templates/user_buys_usd.rs @@ -118,6 +118,7 @@ impl UserBuysUsd { let params = UserBuysUsdParams::defs(); let template = NewTxTemplate::builder() + .id(USER_BUYS_USD_ID) .code(USER_BUYS_USD_CODE) .tx_input(tx_input) .entries(entries) diff --git a/ledger/src/templates/user_sells_usd.rs b/ledger/src/templates/user_sells_usd.rs index 8a3e11327..4149b37e6 100644 --- a/ledger/src/templates/user_sells_usd.rs +++ b/ledger/src/templates/user_sells_usd.rs @@ -118,6 +118,7 @@ impl UserSellsUsd { let params = UserSellsUsdParams::defs(); let template = NewTxTemplate::builder() + .id(USER_SELLS_USD_ID) .code(USER_SELLS_USD_CODE) .tx_input(tx_input) .entries(entries)