From abc083000cb6de51e37d5037283e97ed0e27249e Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Thu, 17 Feb 2022 10:39:37 -0800 Subject: [PATCH] fix(subscriber): use monotonic `Instant`s for all timestamps (#288) ## Motivation Currently, the `console-subscriber` crate records all timestamps as `SystemTime`s. This is because they are eventually sent over the wire as protobuf `Timestamp`s, which can be constructed from a `SystemTime`. They cannot be constructed from `Instant`s, because `Instant` is opaque, and does not expose access to the underlying OS time. However, using `SystemTime` is not really correct for our use case. We only use timestamps for calculating durations; we only have to serialize them because some durations are calculated in the console UI rather than in-process. We *don't* need timestamps that are globally consistent with a shared timebase, but we *do* need monotonicity --- using `SystemTime` leaves us vulnerable to clock skew if, for example, an NTP clock adjustment causes the system clock to run backwards far enough that a poll appears to end "before" it started (as in issue #286). If we were using monotonic `Instant`s, all polls would always have non-negative durations, but with `SystemTime`s, this isn't necessarily the case. Furthermore, `Instant::now()` may have less performance overhead than `SystemTime::now()`, at least on some platforms. ## Solution This branch changes `console-subscriber` to always take timestamps using `Instant::now()` rather than using `SystemTime::now()`, and store all timestamps as `Instant`s. In order to convert these `Instant`s into `SystemTime`s that can be sent over the wire, we construct a reference `TimeAnchor`, consisting of a paired `Instant` and `SystemTime` recorded at the same moment, when the `ConsoleLayer` is constructed. 
We can then construct "system times" that are monotonic, by calculating the duration between a given `Instant` and the anchor `Instant`, and adding that duration to the anchor `SystemTime`. These are not *real* system timestamps, as they will never run backwards if the system clock is adjusted; they are relative only to the base process start time as recorded by the anchor. However, they *are* monotonic, and all durations calculated from them will be reasonable. This is part of the change I proposed in #254. I'm not going to close that issue yet, though, as it also described potentially switching to use the `quanta` crate rather than `std::time::Instant` to reduce the overhead of recording monotonic timestamps. Fixes #286 --- Cargo.lock | 1 + console-subscriber/Cargo.toml | 1 + console-subscriber/src/aggregator/id_data.rs | 18 ++- console-subscriber/src/aggregator/mod.rs | 60 ++++---- console-subscriber/src/lib.rs | 54 ++++--- console-subscriber/src/stats.rs | 142 ++++++++++++------- 6 files changed, 169 insertions(+), 107 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 8a8010961..9b2bfdcd0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -227,6 +227,7 @@ dependencies = [ "hdrhistogram", "humantime", "parking_lot", + "prost-types", "serde", "serde_json", "thread_local", diff --git a/console-subscriber/Cargo.toml b/console-subscriber/Cargo.toml index ecbc81ab4..a84c20363 100644 --- a/console-subscriber/Cargo.toml +++ b/console-subscriber/Cargo.toml @@ -45,6 +45,7 @@ hdrhistogram = { version = "7.3.0", default-features = false, features = ["seria # feature to also enable `tracing-subscriber`'s parking_lot feature flag. 
parking_lot_crate = { package = "parking_lot", version = "0.11", optional = true } humantime = "2.1.0" +prost-types = "0.9.0" # Required for recording: serde = { version = "1", features = ["derive"] } diff --git a/console-subscriber/src/aggregator/id_data.rs b/console-subscriber/src/aggregator/id_data.rs index ebb783c8c..a44c3d8a2 100644 --- a/console-subscriber/src/aggregator/id_data.rs +++ b/console-subscriber/src/aggregator/id_data.rs @@ -1,7 +1,7 @@ use super::{shrink::ShrinkMap, Id, ToProto}; -use crate::stats::{DroppedAt, Unsent}; +use crate::stats::{DroppedAt, TimeAnchor, Unsent}; use std::collections::HashMap; -use std::time::{Duration, SystemTime}; +use std::time::{Duration, Instant}; pub(crate) struct IdData { data: ShrinkMap, @@ -45,18 +45,22 @@ impl IdData { self.data.get(id) } - pub(crate) fn as_proto(&mut self, include: Include) -> HashMap + pub(crate) fn as_proto( + &mut self, + include: Include, + base_time: &TimeAnchor, + ) -> HashMap where T: ToProto, { match include { Include::UpdatedOnly => self .since_last_update() - .map(|(id, d)| (id.into_u64(), d.to_proto())) + .map(|(id, d)| (id.into_u64(), d.to_proto(base_time))) .collect(), Include::All => self .all() - .map(|(id, d)| (id.into_u64(), d.to_proto())) + .map(|(id, d)| (id.into_u64(), d.to_proto(base_time))) .collect(), } } @@ -64,7 +68,7 @@ impl IdData { pub(crate) fn drop_closed( &mut self, stats: &mut IdData, - now: SystemTime, + now: Instant, retention: Duration, has_watchers: bool, ) { @@ -80,7 +84,7 @@ impl IdData { stats.data.retain_and_shrink(|id, stats| { if let Some(dropped_at) = stats.dropped_at() { - let dropped_for = now.duration_since(dropped_at).unwrap_or_default(); + let dropped_for = now.checked_duration_since(dropped_at).unwrap_or_default(); let dirty = stats.is_unsent(); let should_drop = // if there are any clients watching, retain all dirty tasks regardless of age diff --git a/console-subscriber/src/aggregator/mod.rs b/console-subscriber/src/aggregator/mod.rs index 
14d806f8e..f688877d9 100644 --- a/console-subscriber/src/aggregator/mod.rs +++ b/console-subscriber/src/aggregator/mod.rs @@ -13,7 +13,7 @@ use std::{ atomic::{AtomicBool, Ordering::*}, Arc, }, - time::{Duration, SystemTime}, + time::{Duration, Instant}, }; use tracing_core::{span::Id, Metadata}; @@ -86,6 +86,10 @@ pub(crate) struct Aggregator { /// The time "state" of the aggregator, such as paused or live. temporality: Temporality, + + /// Used to anchor monotonic timestamps to a base `SystemTime`, to produce a + /// timestamp that can be sent over the wire. + base_time: stats::TimeAnchor, } #[derive(Debug, Default)] @@ -135,6 +139,7 @@ impl Aggregator { rpcs: mpsc::Receiver, builder: &crate::Builder, shared: Arc, + base_time: stats::TimeAnchor, ) -> Self { Self { shared, @@ -155,6 +160,7 @@ impl Aggregator { all_poll_ops: Default::default(), new_poll_ops: Default::default(), temporality: Temporality::Live, + base_time, } } @@ -241,7 +247,7 @@ impl Aggregator { fn cleanup_closed(&mut self) { // drop all closed have that has completed *and* whose final data has already // been sent off. 
- let now = SystemTime::now(); + let now = Instant::now(); let has_watchers = !self.watchers.is_empty(); self.tasks .drop_closed(&mut self.task_stats, now, self.retention, has_watchers); @@ -254,25 +260,25 @@ impl Aggregator { /// Add the task subscription to the watchers after sending the first update fn add_instrument_subscription(&mut self, subscription: Watch) { tracing::debug!("new instrument subscription"); - let now = SystemTime::now(); + let now = Instant::now(); // Send the initial state --- if this fails, the subscription is already dead let update = &proto::instrument::Update { task_update: Some(proto::tasks::TaskUpdate { new_tasks: self .tasks .all() - .map(|(_, value)| value.to_proto()) + .map(|(_, value)| value.to_proto(&self.base_time)) .collect(), - stats_update: self.task_stats.as_proto(Include::All), + stats_update: self.task_stats.as_proto(Include::All, &self.base_time), dropped_events: self.shared.dropped_tasks.swap(0, AcqRel) as u64, }), resource_update: Some(proto::resources::ResourceUpdate { new_resources: self .resources .all() - .map(|(_, value)| value.to_proto()) + .map(|(_, value)| value.to_proto(&self.base_time)) .collect(), - stats_update: self.resource_stats.as_proto(Include::All), + stats_update: self.resource_stats.as_proto(Include::All, &self.base_time), new_poll_ops: (*self.all_poll_ops).clone(), dropped_events: self.shared.dropped_resources.swap(0, AcqRel) as u64, }), @@ -280,12 +286,12 @@ impl Aggregator { new_async_ops: self .async_ops .all() - .map(|(_, value)| value.to_proto()) + .map(|(_, value)| value.to_proto(&self.base_time)) .collect(), - stats_update: self.async_op_stats.as_proto(Include::All), + stats_update: self.async_op_stats.as_proto(Include::All, &self.base_time), dropped_events: self.shared.dropped_async_ops.swap(0, AcqRel) as u64, }), - now: Some(now.into()), + now: Some(self.base_time.to_timestamp(now)), new_metadata: Some(proto::RegisterMetadata { metadata: (*self.all_metadata).clone(), }), @@ -311,13 +317,13 
@@ impl Aggregator { if let Some(stats) = self.task_stats.get(&id) { let (tx, rx) = mpsc::channel(buffer); let subscription = Watch(tx); - let now = SystemTime::now(); + let now = Some(self.base_time.to_timestamp(Instant::now())); // Send back the stream receiver. // Then send the initial state --- if this fails, the subscription is already dead. if stream_sender.send(rx).is_ok() && subscription.update(&proto::tasks::TaskDetails { task_id: Some(id.clone().into()), - now: Some(now.into()), + now, poll_times_histogram: stats.serialize_histogram(), }) { @@ -345,17 +351,19 @@ impl Aggregator { let new_poll_ops = std::mem::take(&mut self.new_poll_ops); - let now = SystemTime::now(); + let now = self.base_time.to_timestamp(Instant::now()); let update = proto::instrument::Update { - now: Some(now.into()), + now: Some(now.clone()), new_metadata, task_update: Some(proto::tasks::TaskUpdate { new_tasks: self .tasks .since_last_update() - .map(|(_, value)| value.to_proto()) + .map(|(_, value)| value.to_proto(&self.base_time)) .collect(), - stats_update: self.task_stats.as_proto(Include::UpdatedOnly), + stats_update: self + .task_stats + .as_proto(Include::UpdatedOnly, &self.base_time), dropped_events: self.shared.dropped_tasks.swap(0, AcqRel) as u64, }), @@ -363,9 +371,11 @@ impl Aggregator { new_resources: self .resources .since_last_update() - .map(|(_, value)| value.to_proto()) + .map(|(_, value)| value.to_proto(&self.base_time)) .collect(), - stats_update: self.resource_stats.as_proto(Include::UpdatedOnly), + stats_update: self + .resource_stats + .as_proto(Include::UpdatedOnly, &self.base_time), new_poll_ops, dropped_events: self.shared.dropped_resources.swap(0, AcqRel) as u64, @@ -374,9 +384,11 @@ impl Aggregator { new_async_ops: self .async_ops .since_last_update() - .map(|(_, value)| value.to_proto()) + .map(|(_, value)| value.to_proto(&self.base_time)) .collect(), - stats_update: self.async_op_stats.as_proto(Include::UpdatedOnly), + stats_update: self + 
.async_op_stats + .as_proto(Include::UpdatedOnly, &self.base_time), dropped_events: self.shared.dropped_async_ops.swap(0, AcqRel) as u64, }), @@ -392,7 +404,7 @@ impl Aggregator { if let Some(task_stats) = stats.get(id) { let details = proto::tasks::TaskDetails { task_id: Some(id.clone().into()), - now: Some(now.into()), + now: Some(now.clone()), poll_times_histogram: task_stats.serialize_histogram(), }; watchers.retain(|watch| watch.update(&details)); @@ -545,7 +557,7 @@ impl Watch { impl ToProto for Task { type Output = proto::tasks::Task; - fn to_proto(&self) -> Self::Output { + fn to_proto(&self, _: &stats::TimeAnchor) -> Self::Output { proto::tasks::Task { id: Some(self.id.clone().into()), // TODO: more kinds of tasks... @@ -571,7 +583,7 @@ impl Unsent for Task { impl ToProto for Resource { type Output = proto::resources::Resource; - fn to_proto(&self) -> Self::Output { + fn to_proto(&self, _: &stats::TimeAnchor) -> Self::Output { proto::resources::Resource { id: Some(self.id.clone().into()), parent_resource_id: self.parent_id.clone().map(Into::into), @@ -597,7 +609,7 @@ impl Unsent for Resource { impl ToProto for AsyncOp { type Output = proto::async_ops::AsyncOp; - fn to_proto(&self) -> Self::Output { + fn to_proto(&self, _: &stats::TimeAnchor) -> Self::Output { proto::async_ops::AsyncOp { id: Some(self.id.clone().into()), metadata: Some(self.metadata.into()), diff --git a/console-subscriber/src/lib.rs b/console-subscriber/src/lib.rs index 1145352b1..62db8da64 100644 --- a/console-subscriber/src/lib.rs +++ b/console-subscriber/src/lib.rs @@ -10,7 +10,7 @@ use std::{ atomic::{AtomicUsize, Ordering}, Arc, }, - time::{Duration, SystemTime}, + time::{Duration, Instant}, }; use thread_local::ThreadLocal; use tokio::sync::{mpsc, oneshot}; @@ -110,6 +110,10 @@ pub struct ConsoleLayer { /// A sink to record all events to a file. 
recorder: Option, + + /// Used to anchor monotonic timestamps to a base `SystemTime`, to produce a + /// timestamp that can be sent over the wire or recorded to JSON. + base_time: stats::TimeAnchor, } /// A gRPC [`Server`] that implements the [`tokio-console` wire format][wire]. @@ -132,7 +136,7 @@ pub struct Server { pub(crate) trait ToProto { type Output; - fn to_proto(&self) -> Self::Output; + fn to_proto(&self, base_time: &stats::TimeAnchor) -> Self::Output; } /// State shared between the `ConsoleLayer` and the `Aggregator` task. @@ -249,6 +253,8 @@ impl ConsoleLayer { cfg!(tokio_unstable), "task tracing requires Tokio to be built with RUSTFLAGS=\"--cfg tokio_unstable\"!" ); + + let base_time = stats::TimeAnchor::new(); tracing::debug!( config.event_buffer_capacity, config.client_buffer_capacity, @@ -256,13 +262,14 @@ impl ConsoleLayer { ?config.retention, ?config.server_addr, ?config.recording_path, + ?base_time, "configured console subscriber" ); let (tx, events) = mpsc::channel(config.event_buffer_capacity); let (subscribe, rpcs) = mpsc::channel(256); let shared = Arc::new(Shared::default()); - let aggregator = Aggregator::new(events, rpcs, &config, shared.clone()); + let aggregator = Aggregator::new(events, rpcs, &config, shared.clone(), base_time.clone()); // Conservatively, start to trigger a flush when half the channel is full. // This tries to reduce the chance of losing events to a full channel. 
let flush_under_capacity = config.event_buffer_capacity / 2; @@ -290,6 +297,7 @@ impl ConsoleLayer { resource_state_update_callsites: Callsites::default(), async_op_state_update_callsites: Callsites::default(), recorder, + base_time, }; (layer, server) } @@ -529,13 +537,13 @@ where fn on_new_span(&self, attrs: &span::Attributes<'_>, id: &span::Id, ctx: Context<'_, S>) { let metadata = attrs.metadata(); if self.is_spawn(metadata) { - let at = SystemTime::now(); + let at = Instant::now(); let mut task_visitor = TaskVisitor::new(metadata.into()); attrs.record(&mut task_visitor); let (fields, location) = task_visitor.result(); self.record(|| record::Event::Spawn { id: id.into_u64(), - at, + at: self.base_time.to_system_time(at), fields: record::SerializeFields(fields.clone()), }); if let Some(stats) = self.send_stats(&self.shared.dropped_tasks, move || { @@ -555,7 +563,7 @@ where } if self.is_resource(metadata) { - let at = SystemTime::now(); + let at = Instant::now(); let mut resource_visitor = ResourceVisitor::default(); attrs.record(&mut resource_visitor); if let Some(result) = resource_visitor.result() { @@ -594,7 +602,7 @@ where } if self.is_async_op(metadata) { - let at = SystemTime::now(); + let at = Instant::now(); let mut async_op_visitor = AsyncOpVisitor::default(); attrs.record(&mut async_op_visitor); if let Some((source, inherit_child_attrs)) = async_op_visitor.result() { @@ -635,7 +643,7 @@ where fn on_event(&self, event: &tracing::Event<'_>, ctx: Context<'_, S>) { let metadata = event.metadata(); if self.waker_callsites.contains(metadata) { - let at = SystemTime::now(); + let at = Instant::now(); let mut visitor = WakerVisitor::default(); event.record(&mut visitor); // XXX (eliza): ew... 
@@ -658,7 +666,7 @@ where stats.record_wake_op(op, at); self.record(|| record::Event::Waker { id: id.into_u64(), - at, + at: self.base_time.to_system_time(at), op, }); } @@ -743,24 +751,24 @@ where fn on_enter(&self, id: &span::Id, cx: Context<'_, S>) { fn update LookupSpan<'a>>( span: &SpanRef, - at: Option, - ) -> Option { + at: Option, + ) -> Option { let exts = span.extensions(); // if the span we are entering is a task or async op, record the // poll stats. if let Some(stats) = exts.get::>() { - let at = at.unwrap_or_else(SystemTime::now); + let at = at.unwrap_or_else(Instant::now); stats.start_poll(at); Some(at) } else if let Some(stats) = exts.get::>() { - let at = at.unwrap_or_else(SystemTime::now); + let at = at.unwrap_or_else(Instant::now); stats.start_poll(at); Some(at) // otherwise, is the span a resource? in that case, we also want // to enter it, although we don't care about recording poll // stats. } else if exts.get::>().is_some() { - Some(at.unwrap_or_else(SystemTime::now)) + Some(at.unwrap_or_else(Instant::now)) } else { None } @@ -778,7 +786,7 @@ where self.record(|| record::Event::Enter { id: id.into_u64(), - at: now, + at: self.base_time.to_system_time(now), }); } } @@ -787,24 +795,24 @@ where fn on_exit(&self, id: &span::Id, cx: Context<'_, S>) { fn update LookupSpan<'a>>( span: &SpanRef, - at: Option, - ) -> Option { + at: Option, + ) -> Option { let exts = span.extensions(); // if the span we are entering is a task or async op, record the // poll stats. if let Some(stats) = exts.get::>() { - let at = at.unwrap_or_else(SystemTime::now); + let at = at.unwrap_or_else(Instant::now); stats.end_poll(at); Some(at) } else if let Some(stats) = exts.get::>() { - let at = at.unwrap_or_else(SystemTime::now); + let at = at.unwrap_or_else(Instant::now); stats.end_poll(at); Some(at) // otherwise, is the span a resource? in that case, we also want // to enter it, although we don't care about recording poll // stats. 
} else if exts.get::>().is_some() { - Some(at.unwrap_or_else(SystemTime::now)) + Some(at.unwrap_or_else(Instant::now)) } else { None } @@ -819,7 +827,7 @@ where self.record(|| record::Event::Exit { id: id.into_u64(), - at: now, + at: self.base_time.to_system_time(now), }); } } @@ -827,7 +835,7 @@ where fn on_close(&self, id: span::Id, cx: Context<'_, S>) { if let Some(span) = cx.span(&id) { - let now = SystemTime::now(); + let now = Instant::now(); let exts = span.extensions(); if let Some(stats) = exts.get::>() { stats.drop_task(now); @@ -838,7 +846,7 @@ where } self.record(|| record::Event::Close { id: id.into_u64(), - at: now, + at: self.base_time.to_system_time(now), }); } } diff --git a/console-subscriber/src/stats.rs b/console-subscriber/src/stats.rs index a18b33266..cb5f482f9 100644 --- a/console-subscriber/src/stats.rs +++ b/console-subscriber/src/stats.rs @@ -9,7 +9,7 @@ use std::sync::{ atomic::{AtomicBool, AtomicUsize, Ordering::*}, Arc, }; -use std::time::{Duration, SystemTime}; +use std::time::{Duration, Instant, SystemTime}; use tracing::span::Id; use console_api as proto; @@ -38,7 +38,15 @@ pub(crate) trait Unsent { // have been closed indicating that a task, async op or a // resource is not in use anymore pub(crate) trait DroppedAt { - fn dropped_at(&self) -> Option; + fn dropped_at(&self) -> Option; +} + +/// Anchors an `Instant` with a `SystemTime` timestamp to allow converting +/// monotonic `Instant`s into timestamps that can be sent over the wire. +#[derive(Debug, Clone)] +pub(crate) struct TimeAnchor { + mono: Instant, + sys: SystemTime, } /// Stats associated with a task. 
@@ -47,7 +55,7 @@ pub(crate) struct TaskStats { is_dirty: AtomicBool, is_dropped: AtomicBool, // task stats - pub(crate) created_at: SystemTime, + pub(crate) created_at: Instant, timestamps: Mutex, // waker stats @@ -85,8 +93,8 @@ pub(crate) struct AsyncOpStats { pub(crate) struct ResourceStats { is_dirty: AtomicBool, is_dropped: AtomicBool, - created_at: SystemTime, - dropped_at: Mutex>, + created_at: Instant, + dropped_at: Mutex>, attributes: Mutex, pub(crate) inherit_child_attributes: bool, pub(crate) parent_id: Option, @@ -94,8 +102,8 @@ pub(crate) struct ResourceStats { #[derive(Debug, Default)] struct TaskTimestamps { - dropped_at: Option, - last_wake: Option, + dropped_at: Option, + last_wake: Option, } #[derive(Debug, Default)] @@ -109,15 +117,35 @@ struct PollStats { #[derive(Debug, Default)] struct PollTimestamps { - first_poll: Option, - last_poll_started: Option, - last_poll_ended: Option, + first_poll: Option, + last_poll_started: Option, + last_poll_ended: Option, busy_time: Duration, histogram: Option>, } +impl TimeAnchor { + pub(crate) fn new() -> Self { + Self { + mono: Instant::now(), + sys: SystemTime::now(), + } + } + + pub(crate) fn to_system_time(&self, t: Instant) -> SystemTime { + let dur = t + .checked_duration_since(self.mono) + .unwrap_or_else(|| Duration::from_secs(0)); + self.sys + dur + } + + pub(crate) fn to_timestamp(&self, t: Instant) -> prost_types::Timestamp { + self.to_system_time(t).into() + } +} + impl TaskStats { - pub(crate) fn new(created_at: SystemTime) -> Self { + pub(crate) fn new(created_at: Instant) -> Self { // significant figures should be in the [0-5] range and memory usage // grows exponentially with higher a sigfig let poll_times_histogram = Histogram::::new(2).unwrap(); @@ -140,7 +168,7 @@ impl TaskStats { } } - pub(crate) fn record_wake_op(&self, op: crate::WakeOp, at: SystemTime) { + pub(crate) fn record_wake_op(&self, op: crate::WakeOp, at: Instant) { use crate::WakeOp; match op { WakeOp::Clone => { @@ -167,7 
+195,7 @@ impl TaskStats { self.make_dirty(); } - fn wake(&self, at: SystemTime, self_wake: bool) { + fn wake(&self, at: Instant, self_wake: bool) { let mut timestamps = self.timestamps.lock(); timestamps.last_wake = cmp::max(timestamps.last_wake, Some(at)); self.wakes.fetch_add(1, Release); @@ -177,17 +205,17 @@ impl TaskStats { } } - pub(crate) fn start_poll(&self, at: SystemTime) { + pub(crate) fn start_poll(&self, at: Instant) { self.poll_stats.start_poll(at); self.make_dirty(); } - pub(crate) fn end_poll(&self, at: SystemTime) { + pub(crate) fn end_poll(&self, at: Instant) { self.poll_stats.end_poll(at); self.make_dirty(); } - pub(crate) fn drop_task(&self, dropped_at: SystemTime) { + pub(crate) fn drop_task(&self, dropped_at: Instant) { if self.is_dropped.swap(true, AcqRel) { // The task was already dropped. // TODO(eliza): this could maybe panic in debug mode... @@ -218,18 +246,18 @@ impl TaskStats { impl ToProto for TaskStats { type Output = proto::tasks::Stats; - fn to_proto(&self) -> Self::Output { - let poll_stats = Some(self.poll_stats.to_proto()); + fn to_proto(&self, base_time: &TimeAnchor) -> Self::Output { + let poll_stats = Some(self.poll_stats.to_proto(base_time)); let timestamps = self.timestamps.lock(); proto::tasks::Stats { poll_stats, - created_at: Some(self.created_at.into()), - dropped_at: timestamps.dropped_at.map(Into::into), + created_at: Some(base_time.to_timestamp(self.created_at)), + dropped_at: timestamps.dropped_at.map(|at| base_time.to_timestamp(at)), wakes: self.wakes.load(Acquire) as u64, waker_clones: self.waker_clones.load(Acquire) as u64, self_wakes: self.self_wakes.load(Acquire) as u64, waker_drops: self.waker_drops.load(Acquire) as u64, - last_wake: timestamps.last_wake.map(Into::into), + last_wake: timestamps.last_wake.map(|at| base_time.to_timestamp(at)), } } } @@ -246,7 +274,7 @@ impl Unsent for TaskStats { } impl DroppedAt for TaskStats { - fn dropped_at(&self) -> Option { + fn dropped_at(&self) -> Option { // avoid 
acquiring the lock if we know we haven't tried to drop this // thing yet if self.is_dropped.load(Acquire) { @@ -261,7 +289,7 @@ impl DroppedAt for TaskStats { impl AsyncOpStats { pub(crate) fn new( - created_at: SystemTime, + created_at: Instant, inherit_child_attributes: bool, parent_id: Option, ) -> Self { @@ -286,16 +314,16 @@ impl AsyncOpStats { self.make_dirty(); } - pub(crate) fn drop_async_op(&self, dropped_at: SystemTime) { + pub(crate) fn drop_async_op(&self, dropped_at: Instant) { self.stats.drop_resource(dropped_at) } - pub(crate) fn start_poll(&self, at: SystemTime) { + pub(crate) fn start_poll(&self, at: Instant) { self.poll_stats.start_poll(at); self.make_dirty(); } - pub(crate) fn end_poll(&self, at: SystemTime) { + pub(crate) fn end_poll(&self, at: Instant) { self.poll_stats.end_poll(at); self.make_dirty(); } @@ -319,7 +347,7 @@ impl Unsent for AsyncOpStats { } impl DroppedAt for AsyncOpStats { - fn dropped_at(&self) -> Option { + fn dropped_at(&self) -> Option { self.stats.dropped_at() } } @@ -327,12 +355,16 @@ impl DroppedAt for AsyncOpStats { impl ToProto for AsyncOpStats { type Output = proto::async_ops::Stats; - fn to_proto(&self) -> Self::Output { + fn to_proto(&self, base_time: &TimeAnchor) -> Self::Output { let attributes = self.stats.attributes.lock().values().cloned().collect(); proto::async_ops::Stats { - poll_stats: Some(self.poll_stats.to_proto()), - created_at: Some(self.stats.created_at.into()), - dropped_at: self.stats.dropped_at.lock().map(Into::into), + poll_stats: Some(self.poll_stats.to_proto(base_time)), + created_at: Some(base_time.to_timestamp(self.stats.created_at)), + dropped_at: self + .stats + .dropped_at + .lock() + .map(|at| base_time.to_timestamp(at)), task_id: self.task_id().map(Into::into), attributes, } @@ -343,7 +375,7 @@ impl ToProto for AsyncOpStats { impl ResourceStats { pub(crate) fn new( - created_at: SystemTime, + created_at: Instant, inherit_child_attributes: bool, parent_id: Option, ) -> Self { @@ -364,7 
+396,7 @@ impl ResourceStats { } #[inline] - pub(crate) fn drop_resource(&self, dropped_at: SystemTime) { + pub(crate) fn drop_resource(&self, dropped_at: Instant) { if self.is_dropped.swap(true, AcqRel) { // The task was already dropped. // TODO(eliza): this could maybe panic in debug mode... @@ -398,7 +430,7 @@ impl Unsent for ResourceStats { } impl DroppedAt for ResourceStats { - fn dropped_at(&self) -> Option { + fn dropped_at(&self) -> Option { // avoid acquiring the lock if we know we haven't tried to drop this // thing yet if self.is_dropped.load(Acquire) { @@ -412,11 +444,11 @@ impl DroppedAt for ResourceStats { impl ToProto for ResourceStats { type Output = proto::resources::Stats; - fn to_proto(&self) -> Self::Output { + fn to_proto(&self, base_time: &TimeAnchor) -> Self::Output { let attributes = self.attributes.lock().values().cloned().collect(); proto::resources::Stats { - created_at: Some(self.created_at.into()), - dropped_at: self.dropped_at.lock().map(Into::into), + created_at: Some(base_time.to_timestamp(self.created_at)), + dropped_at: self.dropped_at.lock().map(|at| base_time.to_timestamp(at)), attributes, } } @@ -425,7 +457,7 @@ impl ToProto for ResourceStats { // === impl PollStats === impl PollStats { - fn start_poll(&self, at: SystemTime) { + fn start_poll(&self, at: Instant) { if self.current_polls.fetch_add(1, AcqRel) == 0 { // We are starting the first poll let mut timestamps = self.timestamps.lock(); @@ -439,7 +471,7 @@ impl PollStats { } } - fn end_poll(&self, at: SystemTime) { + fn end_poll(&self, at: Instant) { // Are we ending the last current poll? if self.current_polls.fetch_sub(1, AcqRel) > 1 { return; @@ -451,20 +483,20 @@ impl PollStats { None => { eprintln!( "a poll ended, but start timestamp was recorded. 
\ - this is probably a `console-subscriber` bug" + this is probably a `console-subscriber` bug" ); return; } }; timestamps.last_poll_ended = Some(at); - let elapsed = match started.duration_since(at) { - Ok(elapsed) => elapsed, - Err(error) => { + let elapsed = match at.checked_duration_since(started) { + Some(elapsed) => elapsed, + None => { eprintln!( - "possible clock skew detected: a poll's end timestamp \ - was {:?} before its start timestamp", - error.duration(), + "possible Instant clock skew detected: a poll's end timestamp \ + was before its start timestamp\nstart = {:?}\n end = {:?}", + started, at ); return; } @@ -485,13 +517,17 @@ impl PollStats { impl ToProto for PollStats { type Output = proto::PollStats; - fn to_proto(&self) -> Self::Output { + fn to_proto(&self, base_time: &TimeAnchor) -> Self::Output { let timestamps = self.timestamps.lock(); proto::PollStats { polls: self.polls.load(Acquire) as u64, - first_poll: timestamps.first_poll.map(Into::into), - last_poll_started: timestamps.last_poll_started.map(Into::into), - last_poll_ended: timestamps.last_poll_ended.map(Into::into), + first_poll: timestamps.first_poll.map(|at| base_time.to_timestamp(at)), + last_poll_started: timestamps + .last_poll_started + .map(|at| base_time.to_timestamp(at)), + last_poll_ended: timestamps + .last_poll_ended + .map(|at| base_time.to_timestamp(at)), busy_time: Some(timestamps.busy_time.into()), } } @@ -500,7 +536,7 @@ impl ToProto for PollStats { // === impl Arc === impl DroppedAt for Arc { - fn dropped_at(&self) -> Option { + fn dropped_at(&self) -> Option { T::dropped_at(self) } } @@ -517,7 +553,7 @@ impl Unsent for Arc { impl ToProto for Arc { type Output = T::Output; - fn to_proto(&self) -> T::Output { - T::to_proto(self) + fn to_proto(&self, base_time: &TimeAnchor) -> T::Output { + T::to_proto(self, base_time) } }