Skip to content

Commit

Permalink
fix(subscription): apollo_router_opened_subscriptions counter was ina…
Browse files Browse the repository at this point in the history
…ccurate (#5363)

Signed-off-by: Benjamin Coenen <5719034+bnjjj@users.noreply.github.com>
  • Loading branch information
bnjjj authored Jun 10, 2024
1 parent 74ab807 commit 9b3d216
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 9 deletions.
5 changes: 5 additions & 0 deletions .changesets/fix_bnjjj_fix_subscription_metrics.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
### `apollo_router_opened_subscriptions` counter was inaccurate ([PR #5363](https://github.com/apollographql/router/pull/5363))

The counter `apollo_router_opened_subscriptions` was only incrementing and never decrementing. Now it's handled properly.

By [@bnjjj](https://github.com/bnjjj) in https://github.com/apollographql/router/pull/5363
61 changes: 52 additions & 9 deletions apollo-router/src/notification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,27 @@ enum Notification<K, V> {
},
}

impl<K, V> Debug for Notification<K, V> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::CreateOrSubscribe { .. } => f.debug_struct("CreateOrSubscribe").finish(),
Self::Subscribe { .. } => f.debug_struct("Subscribe").finish(),
Self::SubscribeIfExist { .. } => f.debug_struct("SubscribeIfExist").finish(),
Self::Unsubscribe { .. } => f.debug_struct("Unsubscribe").finish(),
Self::ForceDelete { .. } => f.debug_struct("ForceDelete").finish(),
Self::Exist { .. } => f.debug_struct("Exist").finish(),
Self::InvalidIds { .. } => f.debug_struct("InvalidIds").finish(),
Self::UpdateHeartbeat { .. } => f.debug_struct("UpdateHeartbeat").finish(),
#[cfg(test)]
Self::TryDelete { .. } => f.debug_struct("TryDelete").finish(),
#[cfg(test)]
Self::Broadcast { .. } => f.debug_struct("Broadcast").finish(),
#[cfg(test)]
Self::Debug { .. } => f.debug_struct("Debug").finish(),
}
}
}

/// In memory pub/sub implementation
#[derive(Clone)]
pub struct Notify<K, V> {
Expand All @@ -154,7 +175,7 @@ where
queue_size: Option<usize>,
) -> Notify<K, V> {
let (sender, receiver) = mpsc::channel(NOTIFY_CHANNEL_SIZE);
let receiver_stream = ReceiverStream::new(receiver);
let receiver_stream: ReceiverStream<Notification<K, V>> = ReceiverStream::new(receiver);
tokio::task::spawn(task(receiver_stream, ttl, heartbeat_error_message));
Notify {
sender,
Expand Down Expand Up @@ -606,11 +627,6 @@ async fn task<K, V>(
_ = ttl_fut.next() => {
let heartbeat_error_message = heartbeat_error_message.clone();
pubsub.kill_dead_topics(heartbeat_error_message).await;
u64_counter!(
"apollo_router_opened_subscriptions",
"Number of opened subscriptions",
pubsub.subscriptions.len() as u64
);
}
message = receiver.next() => {
match message {
Expand Down Expand Up @@ -753,8 +769,18 @@ where
sender: broadcast::Sender<Option<V>>,
heartbeat_enabled: bool,
) {
self.subscriptions
.insert(topic, Subscription::new(sender, heartbeat_enabled));
let existed = self
.subscriptions
.insert(topic, Subscription::new(sender, heartbeat_enabled))
.is_some();
if !existed {
// TODO: deprecated name, should use our new convention apollo.router. for router next
i64_up_down_counter!(
"apollo_router_opened_subscriptions",
"Number of opened subscriptions",
1
);
}
}

fn subscribe(&mut self, topic: K, sender: ResponseSender<V>) {
Expand Down Expand Up @@ -802,8 +828,15 @@ where
}
None => tracing::trace!("Cannot find the subscription to unsubscribe"),
}
#[allow(clippy::collapsible_if)]
if topic_to_delete {
self.subscriptions.remove(&topic);
if self.subscriptions.remove(&topic).is_some() {
i64_up_down_counter!(
"apollo_router_opened_subscriptions",
"Number of opened subscriptions",
-1
);
}
};
}

Expand Down Expand Up @@ -870,6 +903,11 @@ where

// Send error message to all killed connections
for (_subscriber_id, subscription) in closed_subs {
i64_up_down_counter!(
"apollo_router_opened_subscriptions",
"Number of opened subscriptions",
-1
);
if let Some(heartbeat_error_message) = &heartbeat_error_message {
let _ = subscription
.msg_sender
Expand All @@ -895,6 +933,11 @@ where
tracing::trace!("deleting subscription");
let sub = self.subscriptions.remove(&topic);
if let Some(sub) = sub {
i64_up_down_counter!(
"apollo_router_opened_subscriptions",
"Number of opened subscriptions",
-1
);
let _ = sub.msg_sender.send(None);
}
}
Expand Down

0 comments on commit 9b3d216

Please sign in to comment.