diff --git a/Cargo.lock b/Cargo.lock index 1309c3b7b..d7d221f33 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1236,6 +1236,7 @@ dependencies = [ "serial_test", "sqlx", "sqlxmq", + "stablesats-ledger", "stablesats-shared", "thiserror", "tokio", @@ -2962,9 +2963,9 @@ dependencies = [ [[package]] name = "sqlx-ledger" -version = "0.3.0" +version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9742b50fc9b7fac2bbd2d03425e93a71713d16b888a42aba895ab4582511ad71" +checksum = "240fcaf9bfe280f2d5eb2fa012e316893e66107d9b730e7ccb147ad32a2290e6" dependencies = [ "chrono", "derive_builder", @@ -2982,9 +2983,9 @@ dependencies = [ [[package]] name = "sqlx-ledger-cel-interpreter" -version = "0.3.0" +version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bc118d8a91cc87bb0608dd5fb9d454891b0fddf2860e6e38073dbdf5fcb91b79" +checksum = "603e33555f72842486ce0510c3ca07e97a8e806d9ac7cbb3caa5f8912380b01c" dependencies = [ "chrono", "rust_decimal", @@ -2997,9 +2998,9 @@ dependencies = [ [[package]] name = "sqlx-ledger-cel-parser" -version = "0.3.0" +version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "70291f8e3fb865769e8aa645b9741291282fc50ac86bc2a816bbaea179e9eb04" +checksum = "a2efd3797c60d0639c192bcf3eec5cc6a23c99c6813620a4a070ef3a74b5d22a" dependencies = [ "lalrpop", "lalrpop-util", @@ -3112,6 +3113,7 @@ dependencies = [ "serde_json", "sqlx", "sqlx-ledger", + "stablesats-shared", "thiserror", "tokio", "tracing", diff --git a/hedging/Cargo.toml b/hedging/Cargo.toml index aa0624246..158073145 100644 --- a/hedging/Cargo.toml +++ b/hedging/Cargo.toml @@ -8,6 +8,7 @@ edition = "2021" fail-on-warnings = [] [dependencies] +ledger = { path = "../ledger", package = "stablesats-ledger" } shared = { path = "../shared", package = "stablesats-shared" } okex-client = { path = "../okex-client" } bitfinex-client = { path = "../bitfinex-client" } diff --git a/hedging/src/app/mod.rs b/hedging/src/app/mod.rs index d29885d19..16fd7616a 100644 --- a/hedging/src/app/mod.rs +++ b/hedging/src/app/mod.rs @@ -1,23 +1,20 @@ mod config; use futures::stream::StreamExt; -use opentelemetry::{propagation::TextMapPropagator, sdk::propagation::TraceContextPropagator}; use sqlxmq::OwnedHandle; use tracing::{info_span, Instrument}; -use tracing_opentelemetry::OpenTelemetrySpanExt; use galoy_client::*; use okex_client::*; use shared::{ exchanges_config::OkexConfig, health::HealthCheckTrigger, - payload::{OkexBtcUsdSwapPositionPayload, PriceStreamPayload, SynthUsdLiabilityPayload}, + payload::{OkexBtcUsdSwapPositionPayload, PriceStreamPayload}, pubsub::{memory, CorrelationId, PubSubConfig, Publisher, Subscriber}, }; use crate::{ adjustment_action::*, error::*, job, okex_orders::*, okex_transfers::*, rebalance_action::*, - synth_usd_liability::*, }; pub use config::*; @@ -43,7 +40,6 @@ impl HedgingApp { pubsub_config: PubSubConfig, price_receiver: memory::Subscriber, ) -> Result { - let synth_usd_liability = SynthUsdLiability::new(pool.clone()); let okex_orders = OkexOrders::new(pool.clone()).await?; let okex_transfers = OkexTransfers::new(pool.clone()).await?; let okex = OkexClient::new(okex_client_config).await?; @@ -52,9 +48,10 @@ impl HedgingApp { let funding_adjustment = FundingAdjustment::new(funding_config.clone(), hedging_config.clone()); let hedging_adjustment = HedgingAdjustment::new(hedging_config); + let ledger = ledger::Ledger::init(&pool).await?; let job_runner = job::start_job_runner( pool.clone(), - synth_usd_liability.clone(), + ledger.clone(), okex.clone(), okex_orders, okex_transfers.clone(), @@ -66,19 +63,17 @@ impl HedgingApp { funding_config.clone(), ) .await?; - let liability_sub = - Self::spawn_synth_usd_listener(pubsub_config.clone(), synth_usd_liability.clone()) - .await?; + Self::spawn_liability_balance_listener(pool.clone(), ledger.clone()).await?; let position_sub = Self::spawn_okex_position_listener( pubsub_config.clone(), pool.clone(), - synth_usd_liability.clone(), + ledger.clone(), hedging_adjustment, ) .await?; Self::spawn_okex_price_listener( pool.clone(), - synth_usd_liability, + ledger, okex, funding_adjustment, price_receiver.resubscribe(), @@ -87,7 +82,6 @@ impl HedgingApp { Self::spawn_health_checker( health_check_trigger, health_cfg, - liability_sub, position_sub, price_receiver, ) @@ -101,7 +95,7 @@ impl HedgingApp { async fn spawn_okex_price_listener( pool: sqlx::PgPool, - synth_usd_liability: SynthUsdLiability, + ledger: ledger::Ledger, okex: OkexClient, funding_adjustment: FundingAdjustment, mut tick_recv: memory::Subscriber, @@ -122,7 +116,7 @@ impl HedgingApp { let _ = Self::handle_received_okex_price( correlation_id, &pool, - &synth_usd_liability, + &ledger, &okex, funding_adjustment.clone(), ) @@ -137,11 +131,11 @@ impl HedgingApp { async fn handle_received_okex_price( correlation_id: CorrelationId, pool: &sqlx::PgPool, - synth_usd_liability: &SynthUsdLiability, + ledger: &ledger::Ledger, okex: &OkexClient, funding_adjustment: FundingAdjustment, ) -> Result<(), HedgingError> { - let target_liability_in_cents = synth_usd_liability.get_latest_liability().await?; + let target_liability_in_cents = ledger.balances().target_liability_in_cents().await?; let current_position_in_cents = okex.get_position_in_signed_usd_cents().await?.usd_cents; let last_price_in_usd_cents = okex.get_last_price_in_usd_cents().await?.usd_cents; let trading_available_balance = okex.trading_account_balance().await?; @@ -172,40 +166,31 @@ impl HedgingApp { } } - async fn spawn_synth_usd_listener( - config: PubSubConfig, - synth_usd_liability: SynthUsdLiability, - ) -> Result { - let mut subscriber = Subscriber::new(config).await?; - let mut stream = subscriber.subscribe::().await?; - tokio::spawn(async move { - let propagator = TraceContextPropagator::new(); - - while let Some(msg) = stream.next().await { - let correlation_id = msg.meta.correlation_id; - let span = info_span!( - "synth_usd_liability_received", - message_type = %msg.payload_type, - correlation_id = %correlation_id - ); - let context = propagator.extract(&msg.meta.tracing_data); - span.set_parent(context); - let _ = Self::handle_received_synth_usd_liability( - msg.payload, - correlation_id, - &synth_usd_liability, - ) - .instrument(span) - .await; + async fn spawn_liability_balance_listener( + pool: sqlx::PgPool, + ledger: ledger::Ledger, + ) -> Result<(), HedgingError> { + let mut events = ledger.usd_liability_balance_events().await; + loop { + match events.recv().await { + Ok(received) => { + if let ledger::LedgerEventData::BalanceUpdated(data) = received.data { + job::spawn_adjust_hedge(&pool, data.entry_id).await?; + } + } + Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => (), + _ => { + break; + } } - }); - Ok(subscriber) + } + Ok(()) } async fn spawn_okex_position_listener( config: PubSubConfig, pool: sqlx::PgPool, - synth_usd_liability: SynthUsdLiability, + ledger: ledger::Ledger, hedging_adjustment: HedgingAdjustment, ) -> Result { let mut subscriber = Subscriber::new(config).await?; @@ -228,7 +213,7 @@ impl HedgingApp { msg.payload, correlation_id, &pool, - &synth_usd_liability, + &ledger, hedging_adjustment.clone(), ) .instrument(span) @@ -241,16 +226,12 @@ impl HedgingApp { async fn spawn_health_checker( mut health_check_trigger: HealthCheckTrigger, health_cfg: HedgingAppHealthConfig, - liability_sub: Subscriber, position_sub: Subscriber, price_sub: memory::Subscriber, ) { tokio::spawn(async move { while let Some(check) = health_check_trigger.next().await { match ( - liability_sub - .healthy(health_cfg.unhealthy_msg_interval_liability) - .await, position_sub .healthy(health_cfg.unhealthy_msg_interval_position) .await, @@ -258,7 +239,7 @@ impl HedgingApp { .healthy(health_cfg.unhealthy_msg_interval_price) .await, ) { - (Err(e), _, _) | (_, Err(e), _) | (_, _, Err(e)) => { + (Err(e), _) | (_, Err(e)) => { check.send(Err(e)).expect("Couldn't send response") } _ => check.send(Ok(())).expect("Couldn't send response"), @@ -267,36 +248,14 @@ impl HedgingApp { }); } - async fn handle_received_synth_usd_liability( - payload: SynthUsdLiabilityPayload, - correlation_id: CorrelationId, - synth_usd_liability: &SynthUsdLiability, - ) -> Result<(), HedgingError> { - match synth_usd_liability - .insert_if_new(correlation_id, payload.liability) - .await - { - Ok(Some(mut tx)) => { - job::spawn_adjust_hedge(&mut tx, correlation_id).await?; - tx.commit().await?; - Ok(()) - } - Ok(None) => Ok(()), - Err(e) => { - shared::tracing::insert_error_fields(tracing::Level::ERROR, &e); - Err(e) - } - } - } - async fn handle_received_okex_position( payload: OkexBtcUsdSwapPositionPayload, correlation_id: CorrelationId, pool: &sqlx::PgPool, - synth_usd_liability: &SynthUsdLiability, + ledger: &ledger::Ledger, hedging_adjustment: HedgingAdjustment, ) -> Result<(), HedgingError> { - let amount = synth_usd_liability.get_latest_liability().await?; + let amount = ledger.balances().target_liability_in_cents().await?; if hedging_adjustment .determine_action(amount, payload.signed_usd_exposure) .action_required() diff --git a/hedging/src/error.rs b/hedging/src/error.rs index 580fbf515..2f1ba246b 100644 --- a/hedging/src/error.rs +++ b/hedging/src/error.rs @@ -26,6 +26,8 @@ pub enum HedgingError { BitfinextClient(#[from] bitfinex_client::BitfinexClientError), #[error("HedgingError - NoJobDataPresent")] NoJobDataPresent, + #[error("UserTradesError - Leger: {0}")] + Ledger(#[from] ledger::LedgerError), } impl JobExecutionError for HedgingError {} diff --git a/hedging/src/job/adjust_funding.rs b/hedging/src/job/adjust_funding.rs index 01fbf2e53..e36d78619 100644 --- a/hedging/src/job/adjust_funding.rs +++ b/hedging/src/job/adjust_funding.rs @@ -6,7 +6,7 @@ use galoy_client::*; use okex_client::*; use shared::pubsub::CorrelationId; -use crate::{error::*, okex_transfers::*, rebalance_action::*, synth_usd_liability::*}; +use crate::{error::*, okex_transfers::*, rebalance_action::*}; const SATS_PER_BTC: Decimal = dec!(100_000_000); @@ -16,7 +16,7 @@ const SATS_PER_BTC: Decimal = dec!(100_000_000); transferred_funding) err)] pub(super) async fn execute( correlation_id: CorrelationId, - synth_usd_liability: SynthUsdLiability, + ledger: ledger::Ledger, okex: OkexClient, okex_transfers: OkexTransfers, galoy: GaloyClient, @@ -24,7 +24,7 @@ pub(super) async fn execute( ) -> Result<(), HedgingError> { let span = tracing::Span::current(); - let target_liability_in_cents = synth_usd_liability.get_latest_liability().await?; + let target_liability_in_cents = ledger.balances().target_liability_in_cents().await?; span.record( "target_liability", &tracing::field::display(target_liability_in_cents), diff --git a/hedging/src/job/adjust_hedge.rs b/hedging/src/job/adjust_hedge.rs index 7c10cd2aa..3cac7a2e5 100644 --- a/hedging/src/job/adjust_hedge.rs +++ b/hedging/src/job/adjust_hedge.rs @@ -4,19 +4,19 @@ use tracing::instrument; use okex_client::*; use shared::pubsub::CorrelationId; -use crate::{adjustment_action::*, error::*, okex_orders::*, synth_usd_liability::*}; +use crate::{adjustment_action::*, error::*, okex_orders::*}; #[instrument(name = "adjust_hedge", skip_all, fields(correlation_id = %correlation_id, target_liability, current_position, action, placed_order, client_order_id) err)] pub(super) async fn execute( correlation_id: CorrelationId, - synth_usd_liability: SynthUsdLiability, + ledger: ledger::Ledger, okex: OkexClient, okex_orders: OkexOrders, hedging_adjustment: HedgingAdjustment, ) -> Result<(), HedgingError> { let span = tracing::Span::current(); - let target_liability = synth_usd_liability.get_latest_liability().await?; + let target_liability = ledger.balances().target_liability_in_cents().await?; span.record( "target_liability", &tracing::field::display(target_liability), diff --git a/hedging/src/job/mod.rs b/hedging/src/job/mod.rs index b88aad300..8c6f581bd 100644 --- a/hedging/src/job/mod.rs +++ b/hedging/src/job/mod.rs @@ -19,7 +19,7 @@ use shared::{ use crate::{ adjustment_action::*, app::FundingSectionConfig, error::*, okex_orders::OkexOrders, - okex_transfers::OkexTransfers, rebalance_action::*, synth_usd_liability::*, + okex_transfers::OkexTransfers, rebalance_action::*, }; pub const POLL_OKEX_ID: Uuid = uuid!("10000000-0000-0000-0000-000000000001"); @@ -30,7 +30,7 @@ struct OkexPollDelay(std::time::Duration); #[allow(clippy::too_many_arguments)] pub async fn start_job_runner( pool: sqlx::PgPool, - synth_usd_liability: SynthUsdLiability, + ledger: ledger::Ledger, okex: OkexClient, okex_orders: OkexOrders, okex_transfers: OkexTransfers, @@ -42,7 +42,7 @@ pub async fn start_job_runner( funding_config: FundingSectionConfig, ) -> Result { let mut registry = JobRegistry::new(&[adjust_hedge, poll_okex, adjust_funding]); - registry.set_context(synth_usd_liability); + registry.set_context(ledger); registry.set_context(okex); registry.set_context(okex_orders); registry.set_context(okex_transfers); @@ -91,15 +91,16 @@ struct AdjustHedgeData { #[instrument(skip_all, fields(error, error.message) err)] pub async fn spawn_adjust_hedge<'a>( tx: impl Executor<'a, Database = Postgres>, - correlation_id: CorrelationId, + trigger_id: impl Into, ) -> Result<(), HedgingError> { + let correlation_id = trigger_id.into(); match JobBuilder::new_with_id(Uuid::from(correlation_id), "adjust_hedge") .set_ordered(true) .set_channel_name("hedging") .set_channel_args("adjust_hedge") .set_json(&AdjustHedgeData { tracing_data: shared::tracing::extract_tracing_data(), - correlation_id, + correlation_id: CorrelationId::from(correlation_id), }) .expect("Couldn't set json") .spawn(tx) @@ -138,7 +139,7 @@ async fn poll_okex( #[job(name = "adjust_hedge")] async fn adjust_hedge( mut current_job: CurrentJob, - synth_usd_liability: SynthUsdLiability, + ledger: ledger::Ledger, okex: OkexClient, okex_orders: OkexOrders, hedging_adjustment: HedgingAdjustment, @@ -150,7 +151,7 @@ async fn adjust_hedge( let data: AdjustHedgeData = data.ok_or(HedgingError::NoJobDataPresent)?; adjust_hedge::execute( data.correlation_id, - synth_usd_liability, + ledger, okex, okex_orders, hedging_adjustment, @@ -172,15 +173,16 @@ struct AdjustFundingData { #[instrument(skip_all, fields(error, error.message) err)] pub async fn spawn_adjust_funding<'a>( tx: impl Executor<'a, Database = Postgres>, - correlation_id: CorrelationId, + trigger_id: impl Into, ) -> Result<(), HedgingError> { - match JobBuilder::new_with_id(Uuid::from(correlation_id), "adjust_funding") + let correlation_id = trigger_id.into(); + match JobBuilder::new_with_id(correlation_id, "adjust_funding") .set_ordered(true) .set_channel_name("hedging") .set_channel_args("adjust_funding") .set_json(&AdjustFundingData { tracing_data: shared::tracing::extract_tracing_data(), - correlation_id, + correlation_id: CorrelationId::from(correlation_id), }) .expect("Couldn't set json") .spawn(tx) @@ -198,7 +200,7 @@ pub async fn spawn_adjust_funding<'a>( #[job(name = "adjust_funding")] async fn adjust_funding( mut current_job: CurrentJob, - synth_usd_liability: SynthUsdLiability, + ledger: ledger::Ledger, okex: OkexClient, okex_transfers: OkexTransfers, galoy: GaloyClient, @@ -211,7 +213,7 @@ async fn adjust_funding( let data: AdjustFundingData = data.ok_or(HedgingError::NoJobDataPresent)?; adjust_funding::execute( data.correlation_id, - synth_usd_liability, + ledger, okex, okex_transfers, galoy, diff --git a/hedging/src/lib.rs b/hedging/src/lib.rs index 2bdc05155..e65a0bf84 100644 --- a/hedging/src/lib.rs +++ b/hedging/src/lib.rs @@ -7,7 +7,6 @@ mod error; mod okex_orders; mod okex_transfers; mod rebalance_action; -mod synth_usd_liability; pub mod job; diff --git a/hedging/src/synth_usd_liability/mod.rs b/hedging/src/synth_usd_liability/mod.rs deleted file mode 100644 index c6ab69d4a..000000000 --- a/hedging/src/synth_usd_liability/mod.rs +++ /dev/null @@ -1,51 +0,0 @@ -use rust_decimal::Decimal; -use sqlx::{Executor, PgPool, Postgres, Transaction}; -use uuid::Uuid; - -use shared::{payload::SyntheticCentLiability, pubsub::CorrelationId}; - -use crate::error::HedgingError; - -#[derive(Clone)] -pub struct SynthUsdLiability { - pool: PgPool, -} - -impl SynthUsdLiability { - pub fn new(pool: PgPool) -> Self { - Self { pool } - } - - pub async fn insert_if_new<'a>( - &self, - correlation_id: CorrelationId, - amount: SyntheticCentLiability, - ) -> Result>, HedgingError> { - let mut tx = self.pool.begin().await?; - tx.execute("SET TRANSACTION ISOLATION LEVEL SERIALIZABLE;") - .await?; - - let result = sqlx::query_file!( - "src/synth_usd_liability/sql/insert-if-new.sql", - Decimal::from(amount), - Uuid::from(correlation_id) - ) - .fetch_all(&mut tx) - .await?; - - if result.is_empty() { - Ok(None) - } else { - Ok(Some(tx)) - } - } - - pub async fn get_latest_liability(&self) -> Result { - let result = - sqlx::query!("SELECT amount FROM synth_usd_liability ORDER BY idx DESC LIMIT 1") - .fetch_one(&self.pool) - .await?; - - Ok(SyntheticCentLiability::try_from(result.amount).expect("invalid liability")) - } -} diff --git a/hedging/src/synth_usd_liability/sql/insert-if-new.sql b/hedging/src/synth_usd_liability/sql/insert-if-new.sql deleted file mode 100644 index 890b9efc2..000000000 --- a/hedging/src/synth_usd_liability/sql/insert-if-new.sql +++ /dev/null @@ -1,8 +0,0 @@ -INSERT INTO synth_usd_liability (amount, correlation_id) - SELECT $1, $2 - WHERE NOT EXISTS ( - SELECT FROM ( - SELECT amount FROM synth_usd_liability ORDER BY idx DESC LIMIT 1 - ) last_row WHERE amount = $1 - ) -RETURNING amount; diff --git a/ledger/Cargo.toml b/ledger/Cargo.toml index f8f1af8b6..83b1e7479 100644 --- a/ledger/Cargo.toml +++ b/ledger/Cargo.toml @@ -9,7 +9,9 @@ edition = "2021" fail-on-warnings = [] [dependencies] -sqlx-ledger = "0.3" +shared = { path = "../shared", package = "stablesats-shared" } + +sqlx-ledger = "0.4" chrono = "0.4.23" rust_decimal = "1.28.0" @@ -18,9 +20,9 @@ serde = "1.0.152" serde_json = "1.0.91" sqlx = { version = "0.6", features = [ "offline", "runtime-tokio-rustls", "postgres", "decimal", "uuid", "chrono"] } thiserror = "1.0.38" +tokio = "1.25.0" tracing = "0.1.37" uuid = "1.3.0" [dev-dependencies] anyhow = "1.0.68" -tokio = "1.25.0" diff --git a/ledger/src/balances.rs b/ledger/src/balances.rs index 0aaec7a68..0445685a2 100644 --- a/ledger/src/balances.rs +++ b/ledger/src/balances.rs @@ -1,7 +1,9 @@ +use rust_decimal::Decimal; use sqlx_ledger::{balance::AccountBalance, AccountId as LedgerAccountId, Currency, SqlxLedger}; use tracing::instrument; use crate::{constants::*, LedgerError}; +use shared::payload::SyntheticCentLiability; pub struct Balances<'a> { pub(super) inner: &'a SqlxLedger, @@ -15,6 +17,14 @@ impl<'a> Balances<'a> { .await } + pub async fn target_liability_in_cents(&self) -> Result { + let liability = self.stablesats_liability().await?; + Ok(SyntheticCentLiability::try_from( + liability.map(|l| l.settled()).unwrap_or(Decimal::ZERO) * CENTS_PER_USD, + ) + .expect("usd liability has wrong sign")) + } + pub async fn stablesats_btc_wallet(&self) -> Result, LedgerError> { self.get_ledger_account_balance(STABLESATS_BTC_WALLET_ID, self.btc) .await diff --git a/ledger/src/lib.rs b/ledger/src/lib.rs index 5e8544f97..284465043 100644 --- a/ledger/src/lib.rs +++ b/ledger/src/lib.rs @@ -2,6 +2,7 @@ #![cfg_attr(feature = "fail-on-warnings", deny(clippy::all))] use sqlx::{PgPool, Postgres, Transaction}; +use tokio::sync::broadcast; use tracing::instrument; mod balances; @@ -13,14 +14,23 @@ use constants::*; pub use error::*; pub use templates::*; -pub use sqlx_ledger::TransactionId as LedgerTxId; use sqlx_ledger::{ - account::NewAccount, journal::*, Currency, DebitOrCredit, SqlxLedger, SqlxLedgerError, + account::NewAccount, + event::{EventSubscriber, SqlxLedgerEvent}, + journal::*, + Currency, DebitOrCredit, SqlxLedger, SqlxLedgerError, }; +pub use sqlx_ledger::{ + event::{SqlxLedgerEvent as LedgerEvent, SqlxLedgerEventData as LedgerEventData}, + TransactionId as LedgerTxId, +}; + +const DEFAULT_BUFFER_SIZE: usize = 100; #[derive(Debug, Clone)] pub struct Ledger { inner: SqlxLedger, + events: EventSubscriber, usd: Currency, btc: Currency, } @@ -40,6 +50,7 @@ impl Ledger { templates::UserSellsUsd::init(&inner).await?; Ok(Self { + events: inner.events(DEFAULT_BUFFER_SIZE).await?, inner, usd: "USD".parse().unwrap(), btc: "BTC".parse().unwrap(), @@ -80,6 +91,12 @@ impl Ledger { Ok(()) } + pub async fn usd_liability_balance_events(&self) -> broadcast::Receiver { + self.events + .account_balance(STABLESATS_JOURNAL_ID.into(), STABLESATS_LIABILITY_ID.into()) + .await + } + #[instrument(name = "ledger.create_stablesats_journal", skip(ledger))] async fn create_stablesats_journal(ledger: &SqlxLedger) -> Result<(), LedgerError> { let new_journal = NewJournal::builder() diff --git a/migrations/20230206130127_remove-synth-usd-liability.down.sql b/migrations/20230206130127_remove-synth-usd-liability.down.sql new file mode 100644 index 000000000..575245543 --- /dev/null +++ b/migrations/20230206130127_remove-synth-usd-liability.down.sql @@ -0,0 +1,6 @@ +CREATE TABLE synth_usd_liability ( + idx SERIAL PRIMARY KEY, + correlation_id UUID UNIQUE NOT NULL, + amount NUMERIC NOT NULL, + recorded_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW() +); diff --git a/migrations/20230206130127_remove-synth-usd-liability.up.sql b/migrations/20230206130127_remove-synth-usd-liability.up.sql new file mode 100644 index 000000000..6e238680a --- /dev/null +++ b/migrations/20230206130127_remove-synth-usd-liability.up.sql @@ -0,0 +1 @@ +DROP TABLE synth_usd_liability; diff --git a/shared/src/pubsub/message.rs b/shared/src/pubsub/message.rs index b6578d594..f3c42ab0c 100644 --- a/shared/src/pubsub/message.rs +++ b/shared/src/pubsub/message.rs @@ -23,6 +23,11 @@ impl From for Uuid { id.0 } } +impl From for CorrelationId { + fn from(id: Uuid) -> Self { + Self(id) + } +} impl std::fmt::Display for CorrelationId { fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { write!(f, "{}", self.0)