Skip to content

Commit

Permalink
fix(subscriber): record timestamps for updates last
Browse files Browse the repository at this point in the history
## Motivation

Currently, when constructing an update message to send over the wire, a
timestamp is taken first, and then the protobuf data is constructed.
This can lead to issues where the "now" timestamp is actually _before_
timestamps present in the stats sent in that update, since the stats for
a particular task/resource/async op might be updated on another thread
after taking the update's "now" timestamp. This results in issues like
#266.

## Solution

There's no actual reason to take those timestamps *before* we assemble
the update. This branch changes the aggregator to build all the various
data updates in an update message, and *then* record the update's "now"
timestamp. Any timestamps for tasks/resources/async ops that are
recorded after the update's "now" timestamp will now be included in the
*next* update.

Fixes #266
Depends on #288
  • Loading branch information
hawkw committed Feb 17, 2022
1 parent 591f4bb commit 4d52973
Showing 1 changed file with 4 additions and 4 deletions.
8 changes: 4 additions & 4 deletions console-subscriber/src/aggregator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -260,11 +260,12 @@ impl Aggregator {
/// Add the task subscription to the watchers after sending the first update
fn add_instrument_subscription(&mut self, subscription: Watch<proto::instrument::Update>) {
tracing::debug!("new instrument subscription");
let now = Instant::now();

let task_update = Some(self.task_update(Include::All));
let resource_update = Some(self.resource_update(Include::All));
let async_op_update = Some(self.async_op_update(Include::All));
let now = Instant::now();

let update = &proto::instrument::Update {
task_update,
resource_update,
Expand Down Expand Up @@ -356,13 +357,12 @@ impl Aggregator {
} else {
None
};
let now = Instant::now();
let task_update = Some(self.task_update(Include::UpdatedOnly));
let resource_update = Some(self.resource_update(Include::UpdatedOnly));
let async_op_update = Some(self.async_op_update(Include::UpdatedOnly));

let update = proto::instrument::Update {
now: Some(self.base_time.to_timestamp(now)),
now: Some(self.base_time.to_timestamp(Instant::now())),
new_metadata,
task_update,
resource_update,
Expand All @@ -379,7 +379,7 @@ impl Aggregator {
if let Some(task_stats) = stats.get(id) {
let details = proto::tasks::TaskDetails {
task_id: Some(id.clone().into()),
now: Some(self.base_time.to_timestamp(now)),
now: Some(self.base_time.to_timestamp(Instant::now())),
poll_times_histogram: task_stats.serialize_histogram(),
};
watchers.retain(|watch| watch.update(&details));
Expand Down

0 comments on commit 4d52973

Please sign in to comment.