Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(console): add task scheduled times histogram #409

Merged
merged 7 commits into from
Apr 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions console-api/proto/tasks.proto
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,12 @@ message TaskDetails {
// A histogram plus additional data.
DurationHistogram histogram = 4;
}

// A histogram of task scheduled durations.
//
// The scheduled duration is the time a task spends between being
// woken and when it is next polled.
DurationHistogram scheduled_times_histogram = 5;
}

// Data recorded when a new task is spawned.
Expand Down
6 changes: 6 additions & 0 deletions console-api/src/generated/rs.tokio.console.tasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,12 @@ pub struct TaskDetails {
/// The timestamp for when the update to the task took place.
#[prost(message, optional, tag="2")]
pub now: ::core::option::Option<::prost_types::Timestamp>,
/// A histogram of task scheduled durations.
///
/// The scheduled duration is the time a task spends between being
/// woken and when it is next polled.
#[prost(message, optional, tag="5")]
pub scheduled_times_histogram: ::core::option::Option<DurationHistogram>,
/// A histogram of task poll durations.
///
/// This is either:
Expand Down
48 changes: 48 additions & 0 deletions console-subscriber/examples/long_sleep.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
use std::time::Duration;

use console_subscriber::ConsoleLayer;
use tokio::task::{self, yield_now};
use tracing::info;

#[tokio::main(flavor = "multi_thread", worker_threads = 2)]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
ConsoleLayer::builder()
.with_default_env()
.publish_interval(Duration::from_millis(100))
.init();

let long_sleeps = task::Builder::new()
.name("long-sleeps")
.spawn(long_sleeps(5000))
.unwrap();

let sleep_forever = task::Builder::new()
.name("sleep-forever")
.spawn(sleep_forever(5000))
.unwrap();

match (long_sleeps.await, sleep_forever.await) {
(Ok(_), Ok(_)) => info!("Success"),
(_, _) => info!("Error awaiting tasks."),
}

tokio::time::sleep(Duration::from_millis(200)).await;

Ok(())
}

async fn long_sleeps(inc: u64) {
let millis = inc;
loop {
std::thread::sleep(Duration::from_millis(millis));

yield_now().await;
}
}

async fn sleep_forever(inc: u64) {
let millis = inc;
loop {
std::thread::sleep(Duration::from_millis(millis));
}
}
2 changes: 2 additions & 0 deletions console-subscriber/src/aggregator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,7 @@ impl Aggregator {
task_id: Some(id.clone().into()),
now,
poll_times_histogram: Some(stats.poll_duration_histogram()),
scheduled_times_histogram: Some(stats.scheduled_duration_histogram()),
})
{
self.details_watchers
Expand Down Expand Up @@ -374,6 +375,7 @@ impl Aggregator {
task_id: Some(id.clone().into()),
now: Some(self.base_time.to_timestamp(Instant::now())),
poll_times_histogram: Some(task_stats.poll_duration_histogram()),
scheduled_times_histogram: Some(task_stats.scheduled_duration_histogram()),
};
watchers.retain(|watch| watch.update(&details));
!watchers.is_empty()
Expand Down
24 changes: 24 additions & 0 deletions console-subscriber/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,12 @@ pub struct Builder {
/// Any polls exceeding this duration will be clamped to this value. Higher
/// values will result in more memory usage.
pub(super) poll_duration_max: Duration,

/// The maximum value for the task scheduled duration histogram.
///
/// Any scheduled times exceeding this duration will be clamped to this
/// value. Higher values will result in more memory usage.
pub(super) scheduled_duration_max: Duration,
}

impl Default for Builder {
Expand All @@ -60,6 +66,7 @@ impl Default for Builder {
publish_interval: ConsoleLayer::DEFAULT_PUBLISH_INTERVAL,
retention: ConsoleLayer::DEFAULT_RETENTION,
poll_duration_max: ConsoleLayer::DEFAULT_POLL_DURATION_MAX,
scheduled_duration_max: ConsoleLayer::DEFAULT_SCHEDULED_DURATION_MAX,
server_addr: ServerAddr::Tcp(SocketAddr::new(Server::DEFAULT_IP, Server::DEFAULT_PORT)),
recording_path: None,
filter_env_var: "RUST_LOG".to_string(),
Expand Down Expand Up @@ -235,6 +242,23 @@ impl Builder {
}
}

/// Sets the maximum value for task scheduled duration histograms.
///
/// Any scheduled duration (the time from a task being woken until it is next
/// polled) exceeding this value will be clamped down to this duration
/// and recorded as an outlier.
///
/// By default, this is [one second]. Higher values will increase per-task
/// memory usage.
///
/// [one second]: ConsoleLayer::DEFAULT_SCHEDULED_DURATION_MAX
pub fn scheduled_duration_histogram_max(self, max: Duration) -> Self {
Self {
scheduled_duration_max: max,
..self
}
}

/// Sets whether tasks, resources, and async ops from the console
/// subscriber thread are recorded.
///
Expand Down
22 changes: 21 additions & 1 deletion console-subscriber/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,11 @@ pub struct ConsoleLayer {
///
/// By default, this is one second.
max_poll_duration_nanos: u64,

/// Maximum value for the scheduled time histogram.
///
/// By default, this is one second.
max_scheduled_duration_nanos: u64,
}

/// A gRPC [`Server`] that implements the [`tokio-console` wire format][wire].
Expand Down Expand Up @@ -273,6 +278,7 @@ impl ConsoleLayer {
?config.recording_path,
?config.filter_env_var,
?config.poll_duration_max,
?config.scheduled_duration_max,
?base_time,
"configured console subscriber"
);
Expand Down Expand Up @@ -310,6 +316,7 @@ impl ConsoleLayer {
recorder,
base_time,
max_poll_duration_nanos: config.poll_duration_max.as_nanos() as u64,
max_scheduled_duration_nanos: config.scheduled_duration_max.as_nanos() as u64,
};
(layer, server)
}
Expand Down Expand Up @@ -365,6 +372,15 @@ impl ConsoleLayer {
/// See also [`Builder::poll_duration_histogram_max`].
pub const DEFAULT_POLL_DURATION_MAX: Duration = Duration::from_secs(1);

/// The default maximum value for the task scheduled duration histogram.
///
/// Any scheduled duration (the time from a task being woken until it is next
/// polled) exceeding this will be clamped to this value. By default, the
/// maximum scheduled duration is one second.
///
/// See also [`Builder::scheduled_duration_histogram_max`].
pub const DEFAULT_SCHEDULED_DURATION_MAX: Duration = Duration::from_secs(1);

fn is_spawn(&self, meta: &'static Metadata<'static>) -> bool {
self.spawn_callsites.contains(meta)
}
Expand Down Expand Up @@ -567,7 +583,11 @@ where
fields: record::SerializeFields(fields.clone()),
});
if let Some(stats) = self.send_stats(&self.shared.dropped_tasks, move || {
let stats = Arc::new(stats::TaskStats::new(self.max_poll_duration_nanos, at));
let stats = Arc::new(stats::TaskStats::new(
self.max_poll_duration_nanos,
self.max_scheduled_duration_nanos,
at,
));
let event = Event::Spawn {
id: id.clone(),
stats: stats.clone(),
Expand Down
42 changes: 30 additions & 12 deletions console-subscriber/src/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,8 @@ struct PollTimestamps<H> {
last_poll_ended: Option<Instant>,
busy_time: Duration,
scheduled_time: Duration,
histogram: H,
poll_histogram: H,
scheduled_histogram: H,
}

#[derive(Debug)]
Expand All @@ -128,8 +129,8 @@ struct Histogram {
max_outlier: Option<u64>,
}

trait RecordPoll {
fn record_poll_duration(&mut self, duration: Duration);
trait RecordDuration {
fn record_duration(&mut self, duration: Duration);
}

impl TimeAnchor {
Expand All @@ -153,15 +154,20 @@ impl TimeAnchor {
}

impl TaskStats {
pub(crate) fn new(poll_duration_max: u64, created_at: Instant) -> Self {
pub(crate) fn new(
poll_duration_max: u64,
scheduled_duration_max: u64,
created_at: Instant,
) -> Self {
Self {
is_dirty: AtomicBool::new(true),
is_dropped: AtomicBool::new(false),
created_at,
dropped_at: Mutex::new(None),
poll_stats: PollStats {
timestamps: Mutex::new(PollTimestamps {
histogram: Histogram::new(poll_duration_max),
poll_histogram: Histogram::new(poll_duration_max),
scheduled_histogram: Histogram::new(scheduled_duration_max),
first_poll: None,
last_wake: None,
last_poll_started: None,
Expand Down Expand Up @@ -240,10 +246,18 @@ impl TaskStats {
}

pub(crate) fn poll_duration_histogram(&self) -> proto::tasks::task_details::PollTimesHistogram {
let hist = self.poll_stats.timestamps.lock().histogram.to_proto();
let hist = self.poll_stats.timestamps.lock().poll_histogram.to_proto();
proto::tasks::task_details::PollTimesHistogram::Histogram(hist)
}

pub(crate) fn scheduled_duration_histogram(&self) -> proto::tasks::DurationHistogram {
self.poll_stats
.timestamps
.lock()
.scheduled_histogram
.to_proto()
}

#[inline]
fn make_dirty(&self) {
self.is_dirty.swap(true, AcqRel);
Expand Down Expand Up @@ -475,7 +489,7 @@ impl ToProto for ResourceStats {

// === impl PollStats ===

impl<H: RecordPoll> PollStats<H> {
impl<H: RecordDuration> PollStats<H> {
fn wake(&self, at: Instant) {
let mut timestamps = self.timestamps.lock();
timestamps.last_wake = cmp::max(timestamps.last_wake, Some(at));
Expand Down Expand Up @@ -515,6 +529,10 @@ impl<H: RecordPoll> PollStats<H> {
return;
}
};

// if we have a scheduled time histogram, add the timestamp
timestamps.scheduled_histogram.record_duration(elapsed);

timestamps.scheduled_time += elapsed;
}

Expand Down Expand Up @@ -550,7 +568,7 @@ impl<H: RecordPoll> PollStats<H> {
};

// if we have a poll time histogram, add the timestamp
timestamps.histogram.record_poll_duration(elapsed);
timestamps.poll_histogram.record_duration(elapsed);

timestamps.busy_time += elapsed;
}
Expand Down Expand Up @@ -636,8 +654,8 @@ impl Histogram {
}
}

impl RecordPoll for Histogram {
fn record_poll_duration(&mut self, duration: Duration) {
impl RecordDuration for Histogram {
fn record_duration(&mut self, duration: Duration) {
let mut duration_ns = duration.as_nanos() as u64;

// clamp the duration to the histogram's max value
Expand All @@ -653,8 +671,8 @@ impl RecordPoll for Histogram {
}
}

impl RecordPoll for () {
fn record_poll_duration(&mut self, _: Duration) {
impl RecordDuration for () {
fn record_duration(&mut self, _: Duration) {
// do nothing
}
}
2 changes: 1 addition & 1 deletion tokio-console/src/state/histogram.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ impl DurationHistogram {
})
}

fn from_proto(proto: &proto::DurationHistogram) -> Option<Self> {
pub(crate) fn from_proto(proto: &proto::DurationHistogram) -> Option<Self> {
let histogram = deserialize_histogram(&proto.raw_histogram[..])?;
Some(Self {
histogram,
Expand Down
4 changes: 4 additions & 0 deletions tokio-console/src/state/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,10 @@ impl State {
.poll_times_histogram
.as_ref()
.and_then(histogram::DurationHistogram::from_poll_durations),
scheduled_times_histogram: update
.scheduled_times_histogram
.as_ref()
.and_then(histogram::DurationHistogram::from_proto),
};

*self.current_task_details.borrow_mut() = Some(details);
Expand Down
5 changes: 5 additions & 0 deletions tokio-console/src/state/tasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ pub(crate) struct TasksState {
pub(crate) struct Details {
pub(crate) span_id: SpanId,
pub(crate) poll_times_histogram: Option<DurationHistogram>,
pub(crate) scheduled_times_histogram: Option<DurationHistogram>,
}

#[derive(Debug, Copy, Clone)]
Expand Down Expand Up @@ -264,6 +265,10 @@ impl Details {
pub(crate) fn poll_times_histogram(&self) -> Option<&DurationHistogram> {
self.poll_times_histogram.as_ref()
}

pub(crate) fn scheduled_times_histogram(&self) -> Option<&DurationHistogram> {
self.scheduled_times_histogram.as_ref()
}
}

impl Task {
Expand Down
16 changes: 15 additions & 1 deletion tokio-console/src/view/durations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,22 @@ pub(crate) struct Durations<'a> {
percentiles_title: &'a str,
/// Title for histogram sparkline block
histogram_title: &'a str,
/// Fixed width for percentiles block
percentiles_width: u16,
}

impl<'a> Widget for Durations<'a> {
fn render(self, area: tui::layout::Rect, buf: &mut tui::buffer::Buffer) {
// Only split the durations area in half if we're also drawing a
// sparkline. We require UTF-8 to draw the sparkline and also enough width.
let (percentiles_area, histogram_area) = if self.styles.utf8 {
let percentiles_width = cmp::max(self.percentiles_title.len() as u16, 13_u16) + 2;
let percentiles_width = match self.percentiles_width {
// Fixed width
width if width > 0 => width,
// Long enough for the title or for a single line
// like "p99: 544.77µs" (13) (and borders on the sides).
_ => cmp::max(self.percentiles_title.len() as u16, 13_u16) + 2,
};
Comment on lines +48 to +54
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where is the Durations widget currently constructed with a width of 0? Is that used for the resource details view, where we only have one durations widget?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually (and I had to go and check this to be sure), the resource details view doesn't have a Durations widget currently (although it will be much easier to add after the refactoring). The width=0 value here is set as a sensible default if a fixed width isn't set.

The fixed width was introduced in this PR so that the percentiles and sparklines widgets are the same width for both the Durations widgets irrespective of the title length (so that it looks nicer).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems like we could change the constructor to require that a non-zero width is always provided, and remove this behavior, if we intend to always construct the widget with a fixed width? This might be a bit simpler. Not a blocker though...


// If there isn't enough width left after drawing the percentiles
// then we won't draw the sparkline at all.
Expand Down Expand Up @@ -88,6 +96,7 @@ impl<'a> Durations<'a> {
histogram: None,
percentiles_title: "Percentiles",
histogram_title: "Histogram",
percentiles_width: 0,
}
}

Expand All @@ -105,4 +114,9 @@ impl<'a> Durations<'a> {
self.histogram_title = title;
self
}

pub(crate) fn percentiles_width(mut self, width: u16) -> Self {
self.percentiles_width = width;
self
}
}
Loading