From 07e760969b4c90be2e04d4699cbf546fe8e78bc2 Mon Sep 17 00:00:00 2001 From: vaibhav Date: Wed, 15 Nov 2023 19:38:41 +0530 Subject: [PATCH] feat: call quotes run fn from cli (#475) --- Cargo.lock | 1 + cli/Cargo.toml | 1 + cli/src/app.rs | 43 ++++++++++++++++++++++++++++++++++++ cli/src/config.rs | 35 +++++++++++++++++++++++++++++ quotes-server/src/app/mod.rs | 2 +- quotes-server/src/lib.rs | 4 ++-- 6 files changed, 83 insertions(+), 3 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0718301cc..e83c83403 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3530,6 +3530,7 @@ dependencies = [ "opentelemetry", "opentelemetry-otlp", "price-server", + "quotes-server", "rust_decimal", "serde", "serde_yaml", diff --git a/cli/Cargo.toml b/cli/Cargo.toml index f84a99bcf..9841a5662 100644 --- a/cli/Cargo.toml +++ b/cli/Cargo.toml @@ -10,6 +10,7 @@ description = "The stablesats cli binary" [dependencies] shared = { path = "../shared", package = "stablesats-shared" } price-server = { path = "../price-server" } +quotes-server = { path = "../quotes-server" } user-trades = { path = "../user-trades" } galoy-client = { path = "../galoy-client" } okex-client = { path = "../okex-client" } diff --git a/cli/src/app.rs b/cli/src/app.rs index 148741834..3232ff0b6 100644 --- a/cli/src/app.rs +++ b/cli/src/app.rs @@ -120,6 +120,7 @@ async fn run_cmd( hedging, exchanges, bria, + quotes_server, }: Config, ) -> anyhow::Result<()> { println!("Stablesats - v{}", env!("CARGO_PKG_VERSION")); @@ -223,6 +224,38 @@ async fn run_cmd( } } + if quotes_server.enabled { + println!("Starting quotes_server"); + + let quotes_send = send.clone(); + let pool = if let Some(pool) = pool.as_ref() { + pool.clone() + } else { + crate::db::init_pool(&db).await? + }; + let (snd, recv) = futures::channel::mpsc::unbounded(); + checkers.insert("quotes", snd); + let price = price_recv.resubscribe(); + let weights = extract_weights_for_quotes_server(&exchanges); + handles.push(tokio::spawn(async move { + let _ = quotes_send.try_send( + quotes_server::run( + pool, + recv, + quotes_server.health, + quotes_server.server, + quotes_server.fees, + price, + quotes_server.price_cache, + weights, + quotes_server.config, + ) + .await + .context("Quote Server error"), + ); + })); + } + if user_trades.enabled { println!("Starting user trades process"); @@ -271,3 +304,13 @@ fn extract_weights(config: &hedging::ExchangesConfig) -> price_server::ExchangeW bitfinex: None, } } + +fn extract_weights_for_quotes_server( + config: &hedging::ExchangesConfig, +) -> quotes_server::ExchangeWeights { + quotes_server::ExchangeWeights { + okex: config.okex.as_ref().map(|c| c.weight), + + bitfinex: None, + } +} diff --git a/cli/src/config.rs b/cli/src/config.rs index 14f94f273..e93623e57 100644 --- a/cli/src/config.rs +++ b/cli/src/config.rs @@ -8,6 +8,10 @@ use hedging::{ExchangesConfig, HedgingAppConfig}; use price_server::{ ExchangePriceCacheConfig, FeeCalculatorConfig, PriceServerConfig, PriceServerHealthCheckConfig, }; +use quotes_server::{ + QuotesConfig, QuotesExchangePriceCacheConfig, QuotesFeeCalculatorConfig, QuotesServerConfig, + QuotesServerHealthCheckConfig, +}; use user_trades::UserTradesConfig; use super::{db::DbConfig, tracing::TracingConfig}; @@ -32,6 +36,8 @@ pub struct Config { pub exchanges: ExchangesConfig, #[serde(default)] pub bria: BriaClientConfig, + #[serde(default)] + pub quotes_server: QuotesServerWrapper, } pub struct EnvOverride { @@ -101,6 +107,35 @@ impl Default for PriceServerWrapper { } } +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct QuotesServerWrapper { + #[serde(default = "bool_true")] + pub enabled: bool, + #[serde(default)] + pub health: QuotesServerHealthCheckConfig, + #[serde(default)] + pub server: QuotesServerConfig, + #[serde(default)] + pub fees: QuotesFeeCalculatorConfig, + #[serde(default)] + pub price_cache: QuotesExchangePriceCacheConfig, + #[serde(default)] + pub config: QuotesConfig, +} + +impl Default for QuotesServerWrapper { + fn default() -> Self { + Self { + enabled: true, + server: QuotesServerConfig::default(), + health: QuotesServerHealthCheckConfig::default(), + fees: QuotesFeeCalculatorConfig::default(), + price_cache: QuotesExchangePriceCacheConfig::default(), + config: QuotesConfig::default(), + } + } +} + #[derive(Debug, Clone, Serialize, Deserialize, Default)] pub struct BitfinexPriceFeedConfigWrapper { #[serde(default)] diff --git a/quotes-server/src/app/mod.rs b/quotes-server/src/app/mod.rs index fdc63d98f..cc7a95b85 100644 --- a/quotes-server/src/app/mod.rs +++ b/quotes-server/src/app/mod.rs @@ -28,6 +28,7 @@ pub struct QuotesApp { #[allow(clippy::too_many_arguments)] impl QuotesApp { pub async fn run( + pool: sqlx::PgPool, mut health_check_trigger: HealthCheckTrigger, health_check_cfg: QuotesServerHealthCheckConfig, fee_calc_cfg: QuotesFeeCalculatorConfig, @@ -35,7 +36,6 @@ impl QuotesApp { price_cache_config: QuotesExchangePriceCacheConfig, exchange_weights: ExchangeWeights, config: QuotesConfig, - pool: sqlx::PgPool, ) -> Result { let health_subscriber = subscriber.resubscribe(); tokio::spawn(async move { diff --git a/quotes-server/src/lib.rs b/quotes-server/src/lib.rs index c4dccc154..5ff6ac09d 100644 --- a/quotes-server/src/lib.rs +++ b/quotes-server/src/lib.rs @@ -20,6 +20,7 @@ pub use server::*; #[allow(clippy::too_many_arguments)] pub async fn run( + pool: sqlx::PgPool, health_check_trigger: HealthCheckTrigger, health_check_cfg: QuotesServerHealthCheckConfig, server_config: QuotesServerConfig, @@ -27,10 +28,10 @@ pub async fn run( subscriber: memory::Subscriber, price_cache_config: QuotesExchangePriceCacheConfig, exchange_weights: ExchangeWeights, - pool: sqlx::PgPool, quotes_config: QuotesConfig, ) -> Result<(), QuotesServerError> { let app = QuotesApp::run( + pool, health_check_trigger, health_check_cfg, fee_calc_cfg, @@ -38,7 +39,6 @@ pub async fn run( price_cache_config, exchange_weights, quotes_config, - pool, ) .await?;