Commit
page_service: don't count time spent flushing towards smgr latency metrics (#10042)

## Problem

In #9962 I changed the smgr metrics to include time spent on flush.

How long that flush takes isn't under our (= storage team's) control, because the
client can simply stop reading responses.

## Summary of changes

Stop the timer as soon as we've buffered up the response in the
`pgb_writer`.

Track flush time in a separate metric.
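
In rough sketch form (not the exact code; see the diff below), the handler flow becomes:

    // Sketch only: `handle_request` is a stand-in name. The smgr latency
    // histogram is observed as soon as the response is buffered; flush time
    // goes to a separate counter.
    let timer = metrics.start_smgr_op(op, started_at);
    let response = handle_request(/* ... */)?;
    pgb_writer.write_message_noflush(&response)?;
    // stops the latency timer and observes the histogram:
    let flushing_timer = timer.observe_smgr_op_completion_and_start_flushing();
    // flush time is tracked in pageserver_page_service_pagestream_flush_in_progress_micros:
    flushing_timer.measure(pgb_writer.flush()).await?;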

---------

Co-authored-by: Yuchen Liang <70461588+yliang412@users.noreply.github.com>
problame and yliang412 authored Dec 7, 2024
1 parent b1fd086 commit 4d7111f
Showing 3 changed files with 179 additions and 36 deletions.
138 changes: 125 additions & 13 deletions pageserver/src/metrics.rs
@@ -1223,31 +1223,60 @@ pub(crate) mod virtual_file_io_engine {
});
}

pub(crate) struct SmgrOpTimer {
pub(crate) struct SmgrOpTimer(Option<SmgrOpTimerInner>);
pub(crate) struct SmgrOpTimerInner {
global_latency_histo: Histogram,

// Optional because not all op types are tracked per-timeline
per_timeline_latency_histo: Option<Histogram>,

global_flush_in_progress_micros: IntCounter,
per_timeline_flush_in_progress_micros: IntCounter,

start: Instant,
throttled: Duration,
op: SmgrQueryType,
}

pub(crate) struct SmgrOpFlushInProgress {
base: Instant,
global_micros: IntCounter,
per_timeline_micros: IntCounter,
}

impl SmgrOpTimer {
pub(crate) fn deduct_throttle(&mut self, throttle: &Option<Duration>) {
let Some(throttle) = throttle else {
return;
};
self.throttled += *throttle;
let inner = self.0.as_mut().expect("other public methods consume self");
inner.throttled += *throttle;
}
}

impl Drop for SmgrOpTimer {
fn drop(&mut self) {
let elapsed = self.start.elapsed();
pub(crate) fn observe_smgr_op_completion_and_start_flushing(mut self) -> SmgrOpFlushInProgress {
let (flush_start, inner) = self
.smgr_op_end()
.expect("this method consume self, and the only other caller is drop handler");
let SmgrOpTimerInner {
global_flush_in_progress_micros,
per_timeline_flush_in_progress_micros,
..
} = inner;
SmgrOpFlushInProgress {
base: flush_start,
global_micros: global_flush_in_progress_micros,
per_timeline_micros: per_timeline_flush_in_progress_micros,
}
}

let elapsed = match elapsed.checked_sub(self.throttled) {
/// Returns `None` if this method has already been called, `Some` otherwise.
fn smgr_op_end(&mut self) -> Option<(Instant, SmgrOpTimerInner)> {
let inner = self.0.take()?;

let now = Instant::now();
let elapsed = now - inner.start;

let elapsed = match elapsed.checked_sub(inner.throttled) {
Some(elapsed) => elapsed,
None => {
use utils::rate_limit::RateLimit;
@@ -1258,20 +1287,64 @@ impl Drop for SmgrOpTimer {
})))
});
let mut guard = LOGGED.lock().unwrap();
let rate_limit = &mut guard[self.op];
let rate_limit = &mut guard[inner.op];
rate_limit.call(|| {
warn!(op=?self.op, ?elapsed, ?self.throttled, "implementation error: time spent throttled exceeds total request wall clock time");
warn!(op=?inner.op, ?elapsed, ?inner.throttled, "implementation error: time spent throttled exceeds total request wall clock time");
});
elapsed // un-throttled time, more info than just saturating to 0
}
};

let elapsed = elapsed.as_secs_f64();

self.global_latency_histo.observe(elapsed);
if let Some(per_timeline_getpage_histo) = &self.per_timeline_latency_histo {
inner.global_latency_histo.observe(elapsed);
if let Some(per_timeline_getpage_histo) = &inner.per_timeline_latency_histo {
per_timeline_getpage_histo.observe(elapsed);
}

Some((now, inner))
}
}

impl Drop for SmgrOpTimer {
fn drop(&mut self) {
self.smgr_op_end();
}
}
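
The `Option` wrapper is the usual Rust idiom for a guard that can be finalized either explicitly (by a consuming method) or implicitly on drop; a minimal standalone sketch of the pattern, with hypothetical names:

    struct Timer(Option<TimerInner>);
    struct TimerInner { /* metric handles, start time, ... */ }

    impl Timer {
        // Explicit path: consumes self, so Drop below sees None and is a no-op.
        fn finish(mut self) -> TimerInner {
            self.0.take().expect("finish() consumes self")
        }
    }

    impl Drop for Timer {
        fn drop(&mut self) {
            // Implicit path: runs only if finish() was never called.
            if let Some(_inner) = self.0.take() {
                // record the observation here
            }
        }
    }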

impl SmgrOpFlushInProgress {
pub(crate) async fn measure<Fut, O>(mut self, mut fut: Fut) -> O
where
Fut: std::future::Future<Output = O>,
{
let mut fut = std::pin::pin!(fut);

// Whenever observe_guard gets called, or dropped,
// it adds the time elapsed since its last call to the metrics.
// The last call is tracked in `self.base`.
let mut observe_guard = scopeguard::guard(
|| {
let now = Instant::now();
let elapsed = now - self.base;
self.global_micros
.inc_by(u64::try_from(elapsed.as_micros()).unwrap());
self.per_timeline_micros
.inc_by(u64::try_from(elapsed.as_micros()).unwrap());
self.base = now;
},
|mut observe| {
observe();
},
);

loop {
match tokio::time::timeout(Duration::from_secs(10), &mut fut).await {
Ok(v) => return v,
Err(_timeout) => {
(*observe_guard)();
}
}
}
}
}
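
A usage sketch, mirroring the call site in page_service.rs below: because `measure` re-polls the wrapped future with a 10-second timeout and adds the elapsed time to the counters on every timeout, a flush that stalls on TCP backpressure shows up in the metrics while it is still in progress, not only once it completes.

    // Sketch only; assumes `timer: SmgrOpTimer` and a fully buffered response.
    let flushing_timer = timer.observe_smgr_op_completion_and_start_flushing();
    flushing_timer.measure(pgb_writer.flush()).await?;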

@@ -1302,6 +1375,8 @@ pub(crate) struct SmgrQueryTimePerTimeline {
per_timeline_getpage_latency: Histogram,
global_batch_size: Histogram,
per_timeline_batch_size: Histogram,
global_flush_in_progress_micros: IntCounter,
per_timeline_flush_in_progress_micros: IntCounter,
}

static SMGR_QUERY_STARTED_GLOBAL: Lazy<IntCounterVec> = Lazy::new(|| {
@@ -1464,6 +1539,26 @@ fn set_page_service_config_max_batch_size(conf: &PageServicePipeliningConfig) {
.set(value.try_into().unwrap());
}

static PAGE_SERVICE_SMGR_FLUSH_INPROGRESS_MICROS: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"pageserver_page_service_pagestream_flush_in_progress_micros",
"Counter that sums up the microseconds that a pagestream response was being flushed into the TCP connection. \
If the flush is particularly slow, this counter will be updated periodically to make slow flushes \
easily discoverable in monitoring. \
Hence, this is NOT a completion latency histogram.",
&["tenant_id", "shard_id", "timeline_id"],
)
.expect("failed to define a metric")
});

static PAGE_SERVICE_SMGR_FLUSH_INPROGRESS_MICROS_GLOBAL: Lazy<IntCounter> = Lazy::new(|| {
register_int_counter!(
"pageserver_page_service_pagestream_flush_in_progress_micros_global",
"Like pageserver_page_service_pagestream_flush_in_progress_seconds, but instance-wide.",
)
.expect("failed to define a metric")
});
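
For monitoring, an illustrative query (assuming a standard Prometheus setup; not part of this PR): `rate(pageserver_page_service_pagestream_flush_in_progress_micros_global[1m]) / 1e6` approximates the seconds of flush in progress per wall-clock second, i.e. the average number of responses concurrently being flushed.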

impl SmgrQueryTimePerTimeline {
pub(crate) fn new(tenant_shard_id: &TenantShardId, timeline_id: &TimelineId) -> Self {
let tenant_id = tenant_shard_id.tenant_id.to_string();
@@ -1504,13 +1599,21 @@ impl SmgrQueryTimePerTimeline {
.get_metric_with_label_values(&[&tenant_id, &shard_slug, &timeline_id])
.unwrap();

let global_flush_in_progress_micros =
PAGE_SERVICE_SMGR_FLUSH_INPROGRESS_MICROS_GLOBAL.clone();
let per_timeline_flush_in_progress_micros = PAGE_SERVICE_SMGR_FLUSH_INPROGRESS_MICROS
.get_metric_with_label_values(&[&tenant_id, &shard_slug, &timeline_id])
.unwrap();

Self {
global_started,
global_latency,
per_timeline_getpage_latency,
per_timeline_getpage_started,
global_batch_size,
per_timeline_batch_size,
global_flush_in_progress_micros,
per_timeline_flush_in_progress_micros,
}
}
pub(crate) fn start_smgr_op(&self, op: SmgrQueryType, started_at: Instant) -> SmgrOpTimer {
@@ -1523,13 +1626,17 @@ impl SmgrQueryTimePerTimeline {
None
};

SmgrOpTimer {
SmgrOpTimer(Some(SmgrOpTimerInner {
global_latency_histo: self.global_latency[op as usize].clone(),
per_timeline_latency_histo,
start: started_at,
op,
throttled: Duration::ZERO,
}
global_flush_in_progress_micros: self.global_flush_in_progress_micros.clone(),
per_timeline_flush_in_progress_micros: self
.per_timeline_flush_in_progress_micros
.clone(),
}))
}

pub(crate) fn observe_getpage_batch_start(&self, batch_size: usize) {
@@ -2777,6 +2884,11 @@ impl TimelineMetrics {
shard_id,
timeline_id,
]);
let _ = PAGE_SERVICE_SMGR_FLUSH_INPROGRESS_MICROS.remove_label_values(&[
tenant_id,
shard_id,
timeline_id,
]);
}
}

76 changes: 53 additions & 23 deletions pageserver/src/page_service.rs
@@ -1017,10 +1017,8 @@ impl PageServerHandler {
// Map handler result to protocol behavior.
// Some handler errors cause exit from pagestream protocol.
// Other handler errors are sent back as an error message and we stay in pagestream protocol.
let mut timers: smallvec::SmallVec<[_; 1]> =
smallvec::SmallVec::with_capacity(handler_results.len());
for handler_result in handler_results {
let response_msg = match handler_result {
let (response_msg, timer) = match handler_result {
Err(e) => match &e {
PageStreamError::Shutdown => {
// If we fail to fulfil a request during shutdown, which may be _because_ of
@@ -1044,34 +1042,66 @@ impl PageServerHandler {
span.in_scope(|| {
error!("error reading relation or page version: {full:#}")
});
PagestreamBeMessage::Error(PagestreamErrorResponse {
message: e.to_string(),
})
(
PagestreamBeMessage::Error(PagestreamErrorResponse {
message: e.to_string(),
}),
None, // TODO: measure errors
)
}
},
Ok((response_msg, timer)) => {
// Extending the lifetime of the timers so observations on drop
// include the flush time.
timers.push(timer);
response_msg
}
Ok((response_msg, timer)) => (response_msg, Some(timer)),
};

//
// marshal & transmit response message
//

pgb_writer.write_message_noflush(&BeMessage::CopyData(&response_msg.serialize()))?;
}
tokio::select! {
biased;
_ = cancel.cancelled() => {
// We were requested to shut down.
info!("shutdown request received in page handler");
return Err(QueryError::Shutdown)
}
res = pgb_writer.flush() => {
res?;

// We purposefully don't count flush time into the timer.
//
// The reason is that current compute client will not perform protocol processing
// if the postgres backend process is doing things other than `->smgr_read()`.
// This is especially the case for prefetch.
//
// If the compute doesn't read from the connection, eventually TCP will backpressure
// all the way into our flush call below.
//
// The timer's underlying metric is used for a storage-internal latency SLO and
// we don't want to include latency in it that we can't control.
// And as pointed out above, in this case, we don't control the time that flush will take.
let flushing_timer =
timer.map(|timer| timer.observe_smgr_op_completion_and_start_flushing());

// what we want to do
let flush_fut = pgb_writer.flush();
// metric for how long flushing takes
let flush_fut = match flushing_timer {
Some(flushing_timer) => {
futures::future::Either::Left(flushing_timer.measure(flush_fut))
}
None => futures::future::Either::Right(flush_fut),
};
// do it while respecting cancellation
let _: () = async move {
tokio::select! {
biased;
_ = cancel.cancelled() => {
// We were requested to shut down.
info!("shutdown request received in page handler");
return Err(QueryError::Shutdown)
}
res = flush_fut => {
res?;
}
}
Ok(())
}
// and log the info! line inside the request span
.instrument(span.clone())
.await?;
}
drop(timers);
Ok(())
}

1 change: 1 addition & 0 deletions test_runner/fixtures/metrics.py
@@ -176,6 +176,7 @@ def counter(name: str) -> str:
counter("pageserver_tenant_throttling_wait_usecs_sum"),
counter("pageserver_tenant_throttling_count"),
counter("pageserver_timeline_wal_records_received"),
counter("pageserver_page_service_pagestream_flush_in_progress_micros"),
*histogram("pageserver_page_service_batch_size"),
*PAGESERVER_PER_TENANT_REMOTE_TIMELINE_CLIENT_METRICS,
# "pageserver_directory_entries_count", -- only used if above a certain threshold

1 comment on commit 4d7111f

@github-actions

7190 tests run: 6861 passed, 0 failed, 329 skipped (full report)


Flaky tests (2)

Postgres 17

  • test_physical_replication_config_mismatch_max_locks_per_transaction: release-arm64

Postgres 14

Code coverage* (full report)

  • functions: 31.4% (8329 of 26527 functions)
  • lines: 47.7% (65542 of 137349 lines)

* collected from Rust tests only


The comment gets automatically updated with the latest test results
4d7111f at 2024-12-07T11:05:41.022Z :recycle:
