diff --git a/.changesets/fix_bnjjj_fix_subscription_metrics.md b/.changesets/fix_bnjjj_fix_subscription_metrics.md new file mode 100644 index 0000000000..d1c78311a9 --- /dev/null +++ b/.changesets/fix_bnjjj_fix_subscription_metrics.md @@ -0,0 +1,5 @@ +### `apollo_router_opened_subscriptions` counter was inaccurate ([PR #5363](https://github.com/apollographql/router/pull/5363)) + +The counter `apollo_router_opened_subscriptions` was only incrementing and never decrementing. Now it's handled properly. + +By [@bnjjj](https://github.com/bnjjj) in https://github.com/apollographql/router/pull/5363 \ No newline at end of file diff --git a/apollo-router/src/notification.rs b/apollo-router/src/notification.rs index e836b72d30..3dc3c01d48 100644 --- a/apollo-router/src/notification.rs +++ b/apollo-router/src/notification.rs @@ -132,6 +132,27 @@ enum Notification { }, } +impl Debug for Notification { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::CreateOrSubscribe { .. } => f.debug_struct("CreateOrSubscribe").finish(), + Self::Subscribe { .. } => f.debug_struct("Subscribe").finish(), + Self::SubscribeIfExist { .. } => f.debug_struct("SubscribeIfExist").finish(), + Self::Unsubscribe { .. } => f.debug_struct("Unsubscribe").finish(), + Self::ForceDelete { .. } => f.debug_struct("ForceDelete").finish(), + Self::Exist { .. } => f.debug_struct("Exist").finish(), + Self::InvalidIds { .. } => f.debug_struct("InvalidIds").finish(), + Self::UpdateHeartbeat { .. } => f.debug_struct("UpdateHeartbeat").finish(), + #[cfg(test)] + Self::TryDelete { .. } => f.debug_struct("TryDelete").finish(), + #[cfg(test)] + Self::Broadcast { .. } => f.debug_struct("Broadcast").finish(), + #[cfg(test)] + Self::Debug { .. } => f.debug_struct("Debug").finish(), + } + } +} + /// In memory pub/sub implementation #[derive(Clone)] pub struct Notify { @@ -154,7 +175,7 @@ where queue_size: Option, ) -> Notify { let (sender, receiver) = mpsc::channel(NOTIFY_CHANNEL_SIZE); - let receiver_stream = ReceiverStream::new(receiver); + let receiver_stream: ReceiverStream> = ReceiverStream::new(receiver); tokio::task::spawn(task(receiver_stream, ttl, heartbeat_error_message)); Notify { sender, @@ -606,11 +627,6 @@ async fn task( _ = ttl_fut.next() => { let heartbeat_error_message = heartbeat_error_message.clone(); pubsub.kill_dead_topics(heartbeat_error_message).await; - u64_counter!( - "apollo_router_opened_subscriptions", - "Number of opened subscriptions", - pubsub.subscriptions.len() as u64 - ); } message = receiver.next() => { match message { @@ -753,8 +769,18 @@ where sender: broadcast::Sender>, heartbeat_enabled: bool, ) { - self.subscriptions - .insert(topic, Subscription::new(sender, heartbeat_enabled)); + let existed = self + .subscriptions + .insert(topic, Subscription::new(sender, heartbeat_enabled)) + .is_some(); + if !existed { + // TODO: deprecated name, should use our new convention apollo.router. for router next + i64_up_down_counter!( + "apollo_router_opened_subscriptions", + "Number of opened subscriptions", + 1 + ); + } } fn subscribe(&mut self, topic: K, sender: ResponseSender) { @@ -802,8 +828,15 @@ where } None => tracing::trace!("Cannot find the subscription to unsubscribe"), } + #[allow(clippy::collapsible_if)] if topic_to_delete { - self.subscriptions.remove(&topic); + if self.subscriptions.remove(&topic).is_some() { + i64_up_down_counter!( + "apollo_router_opened_subscriptions", + "Number of opened subscriptions", + -1 + ); + } }; } @@ -870,6 +903,11 @@ where // Send error message to all killed connections for (_subscriber_id, subscription) in closed_subs { + i64_up_down_counter!( + "apollo_router_opened_subscriptions", + "Number of opened subscriptions", + -1 + ); if let Some(heartbeat_error_message) = &heartbeat_error_message { let _ = subscription .msg_sender @@ -895,6 +933,11 @@ where tracing::trace!("deleting subscription"); let sub = self.subscriptions.remove(&topic); if let Some(sub) = sub { + i64_up_down_counter!( + "apollo_router_opened_subscriptions", + "Number of opened subscriptions", + -1 + ); let _ = sub.msg_sender.send(None); } }