diff --git a/backend-rust/src/graphql_api.rs b/backend-rust/src/graphql_api.rs index 0dc34da4..814b4424 100644 --- a/backend-rust/src/graphql_api.rs +++ b/backend-rust/src/graphql_api.rs @@ -37,6 +37,7 @@ use prometheus_client::registry::Registry; use sqlx::{postgres::types::PgInterval, PgPool}; use std::{error::Error, str::FromStr, sync::Arc}; use tokio::{net::TcpListener, sync::broadcast}; +use tokio_stream::wrappers::errors::BroadcastStreamRecvError; use tokio_util::sync::CancellationToken; use transaction_metrics::TransactionMetricsQuery; @@ -997,42 +998,58 @@ LIMIT 30", // WHERE slot_time > (LOCALTIMESTAMP - $1::interval) } pub struct Subscription { - pub block_added: broadcast::Receiver>, + pub block_added: broadcast::Receiver, + pub accounts_updated: broadcast::Receiver, } + impl Subscription { pub fn new() -> (Self, SubscriptionContext) { let (block_added_sender, block_added) = broadcast::channel(100); + let (accounts_updated_sender, accounts_updated) = broadcast::channel(100); ( Subscription { block_added, + accounts_updated, }, SubscriptionContext { block_added_sender, + accounts_updated_sender, }, ) } } + #[Subscription] impl Subscription { - async fn block_added( - &self, - ) -> impl Stream, tokio_stream::wrappers::errors::BroadcastStreamRecvError>> - { + async fn block_added(&self) -> impl Stream> { tokio_stream::wrappers::BroadcastStream::new(self.block_added.resubscribe()) } + + async fn accounts_updated( + &self, + // TODO: What to do with this? + account_address: String, + ) -> impl Stream> { + tokio_stream::wrappers::BroadcastStream::new(self.accounts_updated.resubscribe()) + } } + pub struct SubscriptionContext { - block_added_sender: broadcast::Sender>, + block_added_sender: broadcast::Sender, + accounts_updated_sender: broadcast::Sender, } + impl SubscriptionContext { + const ACCOUNTS_UPDATED_CHANNEL: &'static str = "accounts_updated"; const BLOCK_ADDED_CHANNEL: &'static str = "block_added"; pub async fn listen(self, pool: PgPool, stop_signal: CancellationToken) -> anyhow::Result<()> { let mut listener = sqlx::postgres::PgListener::connect_with(&pool) .await .context("Failed to create a postgreSQL listener")?; + listener - .listen_all([Self::BLOCK_ADDED_CHANNEL]) + .listen_all([Self::BLOCK_ADDED_CHANNEL, Self::ACCOUNTS_UPDATED_CHANNEL]) .await .context("Failed to listen to postgreSQL notifications")?; @@ -1045,22 +1062,36 @@ impl SubscriptionContext { let block_height = BlockHeight::from_str(notification.payload()) .context("Failed to parse payload of block added")?; let block = Block::query_by_height(&pool, block_height).await?; - self.block_added_sender.send(Arc::new(block))?; + self.block_added_sender.send(block)?; + } + + Self::ACCOUNTS_UPDATED_CHANNEL => { + self.accounts_updated_sender.send(AccountsUpdatedSubscriptionItem { + address: notification.payload().to_string(), + })?; } + unknown => { - anyhow::bail!("Unknown channel {}", unknown); + anyhow::bail!("Received notification on unknown channel: {unknown}"); } } } }) .await; + if let Some(result) = exit { result.context("Failed listening")?; } + Ok(()) } } +#[derive(Clone, Debug, SimpleObject)] +pub struct AccountsUpdatedSubscriptionItem { + address: String, +} + /// The UnsignedLong scalar type represents a unsigned 64-bit numeric /// non-fractional value greater than or equal to 0. #[derive( @@ -1214,7 +1245,7 @@ struct Versions { backend_versions: String, } -#[derive(Debug, sqlx::FromRow)] +#[derive(Debug, Clone, sqlx::FromRow)] pub struct Block { hash: BlockHash, height: BlockHeight,