Skip to content

Commit

Permalink
fix(subscriber): Don't save poll_ops if no-one is receiving them
Browse files Browse the repository at this point in the history
Do not record poll_ops if there are no current connected clients
(watchers). Without this `Aggregator::poll_ops` would grow forever.

Follow up to tokio-rs#311 and
fix for these two:
- tokio-rs#184
- tokio-rs#500
  • Loading branch information
Graham King committed Dec 14, 2023
1 parent 96c65bd commit 4063632
Showing 1 changed file with 17 additions and 14 deletions.
31 changes: 17 additions & 14 deletions console-subscriber/src/aggregator/mod.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,22 @@
use super::{Command, Event, Shared, Watch};
use crate::{
stats::{self, Unsent},
ToProto, WatchRequest,
};
use console_api as proto;
use proto::resources::resource;
use tokio::sync::{mpsc, Notify};

use std::{
sync::{
atomic::{AtomicBool, Ordering::*},
Arc,
},
time::{Duration, Instant},
};

use console_api as proto;
use proto::resources::resource;
use tokio::sync::{mpsc, Notify};
use tracing_core::{span::Id, Metadata};

use super::{Command, Event, Shared, Watch};
use crate::{
stats::{self, Unsent},
ToProto, WatchRequest,
};

mod id_data;
mod shrink;
use self::id_data::{IdData, Include};
Expand Down Expand Up @@ -269,6 +270,9 @@ impl Aggregator {
.drop_closed(&mut self.resource_stats, now, self.retention, has_watchers);
self.async_ops
.drop_closed(&mut self.async_op_stats, now, self.retention, has_watchers);
if !has_watchers && !self.poll_ops.is_empty() {
self.poll_ops.clear();
}
}

/// Add the task subscription to the watchers after sending the first update
Expand Down Expand Up @@ -305,14 +309,10 @@ impl Aggregator {
}

fn resource_update(&mut self, include: Include) -> proto::resources::ResourceUpdate {
let new_poll_ops = match include {
Include::All => self.poll_ops.clone(),
Include::UpdatedOnly => std::mem::take(&mut self.poll_ops),
};
proto::resources::ResourceUpdate {
new_resources: self.resources.as_proto_list(include, &self.base_time),
stats_update: self.resource_stats.as_proto(include, &self.base_time),
new_poll_ops,
new_poll_ops: std::mem::take(&mut self.poll_ops),
dropped_events: self.shared.dropped_resources.swap(0, AcqRel) as u64,
}
}
Expand Down Expand Up @@ -472,6 +472,9 @@ impl Aggregator {
task_id,
is_ready,
} => {
if self.watchers.is_empty() {
return;
}
let poll_op = proto::resources::PollOp {
metadata: Some(metadata.into()),
resource_id: Some(resource_id.into()),
Expand Down

0 comments on commit 4063632

Please sign in to comment.