Skip to content

Commit

Permalink
refac(subscriber): factor out update construction (#290)
Browse files Browse the repository at this point in the history
This branch refactors how the aggregator task builds update messages. In
particular, it factors out the shared code between building the initial
update for a subscription and building the subsequent updates in
`publish` into methods that are called with an `Include` enum to
determine whether the update should include all tasks/resources/ops, or
just the ones since the last update. This makes the code in
`add_instrument_subscription` and in `publish` much easier to read.

This is just the refactor part from #289 split out into a separate
branch; this should make no functional change. If we decide that #289 is
the correct way to fix the bug, we can rebase that branch onto this, so
that it _just_ includes the bugfix. This way, we can land the refactor
anyway, because I think it's nice.
  • Loading branch information
hawkw authored Feb 17, 2022
1 parent abc0830 commit f2a25ed
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 71 deletions.
18 changes: 18 additions & 0 deletions console-subscriber/src/aggregator/id_data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ pub(crate) struct IdData<T> {
data: ShrinkMap<Id, T>,
}

#[derive(Copy, Clone, Eq, PartialEq)]
pub(crate) enum Include {
All,
UpdatedOnly,
Expand Down Expand Up @@ -45,6 +46,23 @@ impl<T: Unsent> IdData<T> {
self.data.get(id)
}

pub(crate) fn as_proto_list(
&mut self,
include: Include,
base_time: &TimeAnchor,
) -> Vec<T::Output>
where
T: ToProto,
{
match include {
Include::UpdatedOnly => self
.since_last_update()
.map(|(_, d)| d.to_proto(base_time))
.collect(),
Include::All => self.all().map(|(_, d)| d.to_proto(base_time)).collect(),
}
}

pub(crate) fn as_proto(
&mut self,
include: Include,
Expand Down
117 changes: 46 additions & 71 deletions console-subscriber/src/aggregator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -261,47 +261,55 @@ impl Aggregator {
fn add_instrument_subscription(&mut self, subscription: Watch<proto::instrument::Update>) {
tracing::debug!("new instrument subscription");
let now = Instant::now();
// Send the initial state --- if this fails, the subscription is already dead

let task_update = Some(self.task_update(Include::All));
let resource_update = Some(self.resource_update(Include::All));
let async_op_update = Some(self.async_op_update(Include::All));
let update = &proto::instrument::Update {
task_update: Some(proto::tasks::TaskUpdate {
new_tasks: self
.tasks
.all()
.map(|(_, value)| value.to_proto(&self.base_time))
.collect(),
stats_update: self.task_stats.as_proto(Include::All, &self.base_time),
dropped_events: self.shared.dropped_tasks.swap(0, AcqRel) as u64,
}),
resource_update: Some(proto::resources::ResourceUpdate {
new_resources: self
.resources
.all()
.map(|(_, value)| value.to_proto(&self.base_time))
.collect(),
stats_update: self.resource_stats.as_proto(Include::All, &self.base_time),
new_poll_ops: (*self.all_poll_ops).clone(),
dropped_events: self.shared.dropped_resources.swap(0, AcqRel) as u64,
}),
async_op_update: Some(proto::async_ops::AsyncOpUpdate {
new_async_ops: self
.async_ops
.all()
.map(|(_, value)| value.to_proto(&self.base_time))
.collect(),
stats_update: self.async_op_stats.as_proto(Include::All, &self.base_time),
dropped_events: self.shared.dropped_async_ops.swap(0, AcqRel) as u64,
}),
task_update,
resource_update,
async_op_update,
now: Some(self.base_time.to_timestamp(now)),
new_metadata: Some(proto::RegisterMetadata {
metadata: (*self.all_metadata).clone(),
}),
};

// Send the initial state --- if this fails, the subscription is already dead
if subscription.update(update) {
self.watchers.push(subscription)
}
}

fn task_update(&mut self, include: Include) -> proto::tasks::TaskUpdate {
proto::tasks::TaskUpdate {
new_tasks: self.tasks.as_proto_list(include, &self.base_time),
stats_update: self.task_stats.as_proto(include, &self.base_time),
dropped_events: self.shared.dropped_tasks.swap(0, AcqRel) as u64,
}
}

fn resource_update(&mut self, include: Include) -> proto::resources::ResourceUpdate {
let new_poll_ops = match include {
Include::All => (*self.all_poll_ops).clone(),
Include::UpdatedOnly => std::mem::take(&mut self.new_poll_ops),
};
proto::resources::ResourceUpdate {
new_resources: self.resources.as_proto_list(include, &self.base_time),
stats_update: self.resource_stats.as_proto(include, &self.base_time),
new_poll_ops,
dropped_events: self.shared.dropped_resources.swap(0, AcqRel) as u64,
}
}

fn async_op_update(&mut self, include: Include) -> proto::async_ops::AsyncOpUpdate {
proto::async_ops::AsyncOpUpdate {
new_async_ops: self.async_ops.as_proto_list(include, &self.base_time),
stats_update: self.async_op_stats.as_proto(include, &self.base_time),
dropped_events: self.shared.dropped_async_ops.swap(0, AcqRel) as u64,
}
}

/// Add the task details subscription to the watchers after sending the first update,
/// if the task is found.
fn add_task_detail_subscription(
Expand Down Expand Up @@ -348,50 +356,17 @@ impl Aggregator {
} else {
None
};
let now = Instant::now();
let task_update = Some(self.task_update(Include::UpdatedOnly));
let resource_update = Some(self.resource_update(Include::UpdatedOnly));
let async_op_update = Some(self.async_op_update(Include::UpdatedOnly));

let new_poll_ops = std::mem::take(&mut self.new_poll_ops);

let now = self.base_time.to_timestamp(Instant::now());
let update = proto::instrument::Update {
now: Some(now.clone()),
now: Some(self.base_time.to_timestamp(now)),
new_metadata,
task_update: Some(proto::tasks::TaskUpdate {
new_tasks: self
.tasks
.since_last_update()
.map(|(_, value)| value.to_proto(&self.base_time))
.collect(),
stats_update: self
.task_stats
.as_proto(Include::UpdatedOnly, &self.base_time),

dropped_events: self.shared.dropped_tasks.swap(0, AcqRel) as u64,
}),
resource_update: Some(proto::resources::ResourceUpdate {
new_resources: self
.resources
.since_last_update()
.map(|(_, value)| value.to_proto(&self.base_time))
.collect(),
stats_update: self
.resource_stats
.as_proto(Include::UpdatedOnly, &self.base_time),
new_poll_ops,

dropped_events: self.shared.dropped_resources.swap(0, AcqRel) as u64,
}),
async_op_update: Some(proto::async_ops::AsyncOpUpdate {
new_async_ops: self
.async_ops
.since_last_update()
.map(|(_, value)| value.to_proto(&self.base_time))
.collect(),
stats_update: self
.async_op_stats
.as_proto(Include::UpdatedOnly, &self.base_time),

dropped_events: self.shared.dropped_async_ops.swap(0, AcqRel) as u64,
}),
task_update,
resource_update,
async_op_update,
};

self.watchers
Expand All @@ -404,7 +379,7 @@ impl Aggregator {
if let Some(task_stats) = stats.get(id) {
let details = proto::tasks::TaskDetails {
task_id: Some(id.clone().into()),
now: Some(now.clone()),
now: Some(self.base_time.to_timestamp(now)),
poll_times_histogram: task_stats.serialize_histogram(),
};
watchers.retain(|watch| watch.update(&details));
Expand Down

0 comments on commit f2a25ed

Please sign in to comment.