diff --git a/Cargo.lock b/Cargo.lock index f92da5ec512d..da8cefb219ff 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3642,6 +3642,7 @@ dependencies = [ "serde_json", "serde_path_to_error", "serde_with", + "smallvec", "storage_broker", "strum", "strum_macros", diff --git a/libs/pageserver_api/src/config.rs b/libs/pageserver_api/src/config.rs index f48c1febb53c..ee20613d6db3 100644 --- a/libs/pageserver_api/src/config.rs +++ b/libs/pageserver_api/src/config.rs @@ -109,6 +109,8 @@ pub struct ConfigToml { pub virtual_file_io_mode: Option, #[serde(skip_serializing_if = "Option::is_none")] pub no_sync: Option, + #[serde(with = "humantime_serde")] + pub server_side_batch_timeout: Option, } #[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] @@ -317,6 +319,8 @@ pub mod defaults { pub const DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB: usize = 0; pub const DEFAULT_IO_BUFFER_ALIGNMENT: usize = 512; + + pub const DEFAULT_SERVER_SIDE_BATCH_TIMEOUT: Option<&str> = None; } impl Default for ConfigToml { @@ -397,6 +401,8 @@ impl Default for ConfigToml { ephemeral_bytes_per_memory_kb: (DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB), l0_flush: None, virtual_file_io_mode: None, + server_side_batch_timeout: DEFAULT_SERVER_SIDE_BATCH_TIMEOUT + .map(|duration| humantime::parse_duration(duration).unwrap()), tenant_config: TenantConfigToml::default(), no_sync: None, } diff --git a/libs/postgres_backend/src/lib.rs b/libs/postgres_backend/src/lib.rs index 7419798a60a3..9075a019b4e6 100644 --- a/libs/postgres_backend/src/lib.rs +++ b/libs/postgres_backend/src/lib.rs @@ -716,6 +716,9 @@ impl PostgresBackend { Ok(()) } + // Proto looks like this: + // FeMessage::Query("pagestream_v2{FeMessage::CopyData(PagestreamFeMessage::GetPage(..))}") + async fn process_message( &mut self, handler: &mut impl Handler, diff --git a/pageserver/Cargo.toml b/pageserver/Cargo.toml index ecb8fa74912c..143d8236dff5 100644 --- a/pageserver/Cargo.toml +++ b/pageserver/Cargo.toml @@ -84,6 +84,7 @@ enumset = { workspace = true, features = ["serde"]} strum.workspace = true strum_macros.workspace = true wal_decoder.workspace = true +smallvec.workspace = true [target.'cfg(target_os = "linux")'.dependencies] procfs.workspace = true diff --git a/pageserver/src/config.rs b/pageserver/src/config.rs index b694a435999f..f7be6ecaabd4 100644 --- a/pageserver/src/config.rs +++ b/pageserver/src/config.rs @@ -182,6 +182,10 @@ pub struct PageServerConf { /// Optionally disable disk syncs (unsafe!) pub no_sync: bool, + + /// Maximum amount of time for which a get page request + /// might be held up for request merging. 
+ pub server_side_batch_timeout: Option, } /// Token for authentication to safekeepers @@ -336,6 +340,7 @@ impl PageServerConf { concurrent_tenant_warmup, concurrent_tenant_size_logical_size_queries, virtual_file_io_engine, + server_side_batch_timeout, tenant_config, no_sync, } = config_toml; @@ -377,6 +382,7 @@ impl PageServerConf { image_compression, timeline_offloading, ephemeral_bytes_per_memory_kb, + server_side_batch_timeout, // ------------------------------------------------------------ // fields that require additional validation or custom handling diff --git a/pageserver/src/metrics.rs b/pageserver/src/metrics.rs index 147372918654..3cdc2a761e33 100644 --- a/pageserver/src/metrics.rs +++ b/pageserver/src/metrics.rs @@ -1187,6 +1187,7 @@ struct GlobalAndPerTimelineHistogramTimer<'a, 'c> { ctx: &'c RequestContext, start: std::time::Instant, op: SmgrQueryType, + count: usize, } impl Drop for GlobalAndPerTimelineHistogramTimer<'_, '_> { @@ -1214,10 +1215,13 @@ impl Drop for GlobalAndPerTimelineHistogramTimer<'_, '_> { elapsed } }; - self.global_latency_histo - .observe(ex_throttled.as_secs_f64()); - if let Some(per_timeline_getpage_histo) = self.per_timeline_latency_histo { - per_timeline_getpage_histo.observe(ex_throttled.as_secs_f64()); + + for _ in 0..self.count { + self.global_latency_histo + .observe(ex_throttled.as_secs_f64()); + if let Some(per_timeline_getpage_histo) = self.per_timeline_latency_histo { + per_timeline_getpage_histo.observe(ex_throttled.as_secs_f64()); + } } } } @@ -1385,6 +1389,14 @@ impl SmgrQueryTimePerTimeline { &'a self, op: SmgrQueryType, ctx: &'c RequestContext, + ) -> Option { + self.start_timer_many(op, 1, ctx) + } + pub(crate) fn start_timer_many<'c: 'a, 'a>( + &'a self, + op: SmgrQueryType, + count: usize, + ctx: &'c RequestContext, ) -> Option { let start = Instant::now(); @@ -1422,6 +1434,7 @@ impl SmgrQueryTimePerTimeline { ctx, start, op, + count, }) } } diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index f07474df6a7a..a429dff1fd46 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -7,13 +7,13 @@ use bytes::Buf; use futures::FutureExt; use itertools::Itertools; use once_cell::sync::OnceCell; -use pageserver_api::models::TenantState; +use pageserver_api::models::{self, TenantState}; use pageserver_api::models::{ PagestreamBeMessage, PagestreamDbSizeRequest, PagestreamDbSizeResponse, PagestreamErrorResponse, PagestreamExistsRequest, PagestreamExistsResponse, - PagestreamFeMessage, PagestreamGetPageRequest, PagestreamGetPageResponse, - PagestreamGetSlruSegmentRequest, PagestreamGetSlruSegmentResponse, PagestreamNblocksRequest, - PagestreamNblocksResponse, PagestreamProtocolVersion, + PagestreamFeMessage, PagestreamGetPageRequest, PagestreamGetSlruSegmentRequest, + PagestreamGetSlruSegmentResponse, PagestreamNblocksRequest, PagestreamNblocksResponse, + PagestreamProtocolVersion, }; use pageserver_api::shard::TenantShardId; use postgres_backend::{is_expected_io_error, AuthType, PostgresBackend, QueryError}; @@ -44,7 +44,7 @@ use crate::basebackup; use crate::basebackup::BasebackupError; use crate::config::PageServerConf; use crate::context::{DownloadBehavior, RequestContext}; -use crate::metrics; +use crate::metrics::{self}; use crate::metrics::{ComputeCommandKind, COMPUTE_COMMANDS_COUNTERS, LIVE_CONNECTIONS}; use crate::pgdatadir_mapping::Version; use crate::span::debug_assert_current_span_has_tenant_and_timeline_id; @@ -59,7 +59,7 @@ use crate::tenant::GetTimelineError; use 
crate::tenant::PageReconstructError; use crate::tenant::Timeline; use pageserver_api::key::rel_block_to_key; -use pageserver_api::reltag::SlruKind; +use pageserver_api::reltag::{BlockNumber, RelTag, SlruKind}; use postgres_ffi::pg_constants::DEFAULTTABLESPACE_OID; use postgres_ffi::BLCKSZ; @@ -105,6 +105,7 @@ pub fn spawn( pg_auth, tcp_listener, conf.pg_auth_type, + conf.server_side_batch_timeout, libpq_ctx, cancel.clone(), ) @@ -153,6 +154,7 @@ pub async fn libpq_listener_main( auth: Option>, listener: tokio::net::TcpListener, auth_type: AuthType, + server_side_batch_timeout: Option, listener_ctx: RequestContext, listener_cancel: CancellationToken, ) -> Connections { @@ -183,6 +185,7 @@ pub async fn libpq_listener_main( local_auth, socket, auth_type, + server_side_batch_timeout, connection_ctx, connections_cancel.child_token(), )); @@ -210,6 +213,7 @@ async fn page_service_conn_main( auth: Option>, socket: tokio::net::TcpStream, auth_type: AuthType, + server_side_batch_timeout: Option, connection_ctx: RequestContext, cancel: CancellationToken, ) -> ConnectionHandlerResult { @@ -260,8 +264,13 @@ async fn page_service_conn_main( // and create a child per-query context when it invokes process_query. // But it's in a shared crate, so, we store connection_ctx inside PageServerHandler // and create the per-query context in process_query ourselves. - let mut conn_handler = - PageServerHandler::new(tenant_manager, auth, connection_ctx, cancel.clone()); + let mut conn_handler = PageServerHandler::new( + tenant_manager, + auth, + server_side_batch_timeout, + connection_ctx, + cancel.clone(), + ); let pgbackend = PostgresBackend::new_from_io(socket, peer_addr, auth_type, None)?; match pgbackend.run(&mut conn_handler, &cancel).await { @@ -304,6 +313,12 @@ struct PageServerHandler { cancel: CancellationToken, timeline_handles: TimelineHandles, + + /// Messages queued up for the next processing batch + next_batch: Option, + + /// See [`PageServerConf::server_side_batch_timeout`] + server_side_batch_timeout: Option, } struct TimelineHandles { @@ -517,10 +532,47 @@ impl From for QueryError { } } +enum BatchedFeMessage { + Exists { + span: Span, + req: models::PagestreamExistsRequest, + }, + Nblocks { + span: Span, + req: models::PagestreamNblocksRequest, + }, + GetPage { + span: Span, + shard: timeline::handle::Handle, + effective_request_lsn: Lsn, + pages: smallvec::SmallVec<[(RelTag, BlockNumber); 1]>, + }, + DbSize { + span: Span, + req: models::PagestreamDbSizeRequest, + }, + GetSlruSegment { + span: Span, + req: models::PagestreamGetSlruSegmentRequest, + }, + RespondError { + span: Span, + error: PageStreamError, + }, +} + +enum BatchOrEof { + /// In the common case, this has one entry. + /// At most, it has two entries: the first is the leftover batch, the second is an error. 
+ Batch(smallvec::SmallVec<[BatchedFeMessage; 1]>), + Eof, +} + impl PageServerHandler { pub fn new( tenant_manager: Arc, auth: Option>, + server_side_batch_timeout: Option, connection_ctx: RequestContext, cancel: CancellationToken, ) -> Self { @@ -530,6 +582,8 @@ impl PageServerHandler { connection_ctx, timeline_handles: TimelineHandles::new(tenant_manager), cancel, + next_batch: None, + server_side_batch_timeout, } } @@ -557,6 +611,221 @@ impl PageServerHandler { ) } + async fn read_batch_from_connection( + &mut self, + pgb: &mut PostgresBackend, + tenant_id: &TenantId, + timeline_id: &TimelineId, + ctx: &RequestContext, + ) -> Result, QueryError> + where + IO: AsyncRead + AsyncWrite + Send + Sync + Unpin, + { + let mut batch = self.next_batch.take(); + let mut batch_started_at: Option = None; + + let next_batch: Option = loop { + let sleep_fut = match (self.server_side_batch_timeout, batch_started_at) { + (Some(batch_timeout), Some(started_at)) => futures::future::Either::Left( + tokio::time::sleep_until((started_at + batch_timeout).into()), + ), + _ => futures::future::Either::Right(futures::future::pending()), + }; + + let msg = tokio::select! { + biased; + _ = self.cancel.cancelled() => { + return Err(QueryError::Shutdown) + } + msg = pgb.read_message() => { + msg + } + _ = sleep_fut => { + assert!(batch.is_some()); + break None; + } + }; + let copy_data_bytes = match msg? { + Some(FeMessage::CopyData(bytes)) => bytes, + Some(FeMessage::Terminate) => { + return Ok(Some(BatchOrEof::Eof)); + } + Some(m) => { + return Err(QueryError::Other(anyhow::anyhow!( + "unexpected message: {m:?} during COPY" + ))); + } + None => { + return Ok(Some(BatchOrEof::Eof)); + } // client disconnected + }; + trace!("query: {copy_data_bytes:?}"); + fail::fail_point!("ps::handle-pagerequest-message"); + + // parse request + let neon_fe_msg = PagestreamFeMessage::parse(&mut copy_data_bytes.reader())?; + + let this_msg = match neon_fe_msg { + PagestreamFeMessage::Exists(req) => BatchedFeMessage::Exists { + span: tracing::info_span!("handle_get_rel_exists_request", rel = %req.rel, req_lsn = %req.request_lsn), + req, + }, + PagestreamFeMessage::Nblocks(req) => BatchedFeMessage::Nblocks { + span: tracing::info_span!("handle_get_nblocks_request", rel = %req.rel, req_lsn = %req.request_lsn), + req, + }, + PagestreamFeMessage::DbSize(req) => BatchedFeMessage::DbSize { + span: tracing::info_span!("handle_db_size_request", dbnode = %req.dbnode, req_lsn = %req.request_lsn), + req, + }, + PagestreamFeMessage::GetSlruSegment(req) => BatchedFeMessage::GetSlruSegment { + span: tracing::info_span!("handle_get_slru_segment_request", kind = %req.kind, segno = %req.segno, req_lsn = %req.request_lsn), + req, + }, + PagestreamFeMessage::GetPage(PagestreamGetPageRequest { + request_lsn, + not_modified_since, + rel, + blkno, + }) => { + // shard_id is filled in by the handler + let span = tracing::info_span!( + "handle_get_page_at_lsn_request_batched", + %tenant_id, %timeline_id, shard_id = tracing::field::Empty, req_lsn = %request_lsn, + batch_size = tracing::field::Empty, batch_id = tracing::field::Empty + ); + + macro_rules! 
current_batch_and_error { + ($error:expr) => {{ + let error = BatchedFeMessage::RespondError { + span, + error: $error, + }; + let batch_and_error = match batch { + Some(b) => smallvec::smallvec![b, error], + None => smallvec::smallvec![error], + }; + Ok(Some(BatchOrEof::Batch(batch_and_error))) + }}; + } + + let key = rel_block_to_key(rel, blkno); + let shard = match self + .timeline_handles + .get(*tenant_id, *timeline_id, ShardSelector::Page(key)) + .instrument(span.clone()) + .await + { + Ok(tl) => tl, + Err(GetActiveTimelineError::Tenant(GetActiveTenantError::NotFound(_))) => { + // We already know this tenant exists in general, because we resolved it at + // start of connection. Getting a NotFound here indicates that the shard containing + // the requested page is not present on this node: the client's knowledge of shard->pageserver + // mapping is out of date. + // + // Closing the connection by returning ``::Reconnect` has the side effect of rate-limiting above message, via + // client's reconnect backoff, as well as hopefully prompting the client to load its updated configuration + // and talk to a different pageserver. + return current_batch_and_error!(PageStreamError::Reconnect( + "getpage@lsn request routed to wrong shard".into() + )); + } + Err(e) => { + return current_batch_and_error!(e.into()); + } + }; + let effective_request_lsn = match Self::wait_or_get_last_lsn( + &shard, + request_lsn, + not_modified_since, + &shard.get_latest_gc_cutoff_lsn(), + ctx, + ) + // TODO: if we actually need to wait for lsn here, it delays the entire batch which doesn't need to wait + .await + { + Ok(lsn) => lsn, + Err(e) => { + return current_batch_and_error!(e); + } + }; + BatchedFeMessage::GetPage { + span, + shard, + effective_request_lsn, + pages: smallvec::smallvec![(rel, blkno)], + } + } + }; + + let batch_timeout = match self.server_side_batch_timeout { + Some(value) => value, + None => { + // Batching is not enabled - stop on the first message. + return Ok(Some(BatchOrEof::Batch(smallvec::smallvec![this_msg]))); + } + }; + + // check if we can batch + match (&mut batch, this_msg) { + (None, this_msg) => { + batch = Some(this_msg); + } + ( + Some(BatchedFeMessage::GetPage { + span: _, + shard: accum_shard, + pages: accum_pages, + effective_request_lsn: accum_lsn, + }), + BatchedFeMessage::GetPage { + span: _, + shard: this_shard, + pages: this_pages, + effective_request_lsn: this_lsn, + }, + ) if async { + assert_eq!(this_pages.len(), 1); + if accum_pages.len() >= Timeline::MAX_GET_VECTORED_KEYS as usize { + assert_eq!(accum_pages.len(), Timeline::MAX_GET_VECTORED_KEYS as usize); + return false; + } + if (accum_shard.tenant_shard_id, accum_shard.timeline_id) + != (this_shard.tenant_shard_id, this_shard.timeline_id) + { + // TODO: we _could_ batch & execute each shard seperately (and in parallel). + // But the current logic for keeping responses in order does not support that. 
+ return false; + } + // the vectored get currently only supports a single LSN, so, bounce as soon + // as the effective request_lsn changes + if *accum_lsn != this_lsn { + return false; + } + true + } + .await => + { + // ok to batch + accum_pages.extend(this_pages); + } + (Some(_), this_msg) => { + // by default, don't continue batching + break Some(this_msg); + } + } + + // batching impl piece + let started_at = batch_started_at.get_or_insert_with(Instant::now); + if started_at.elapsed() > batch_timeout { + break None; + } + }; + + self.next_batch = next_batch; + Ok(batch.map(|b| BatchOrEof::Batch(smallvec::smallvec![b]))) + } + /// Pagestream sub-protocol handler. /// /// It is a simple request-response protocol inside a COPYBOTH session. @@ -592,133 +861,165 @@ impl PageServerHandler { } } + // If [`PageServerHandler`] is reused for multiple pagestreams, + // then make sure to not process requests from the previous ones. + self.next_batch = None; + loop { - // read request bytes (it's exactly 1 PagestreamFeMessage per CopyData) - let msg = tokio::select! { - biased; - _ = self.cancel.cancelled() => { - return Err(QueryError::Shutdown) + let maybe_batched = self + .read_batch_from_connection(pgb, &tenant_id, &timeline_id, &ctx) + .await?; + let batched = match maybe_batched { + Some(BatchOrEof::Batch(b)) => b, + Some(BatchOrEof::Eof) => { + break; } - msg = pgb.read_message() => { msg } - }; - let copy_data_bytes = match msg? { - Some(FeMessage::CopyData(bytes)) => bytes, - Some(FeMessage::Terminate) => break, - Some(m) => { - return Err(QueryError::Other(anyhow::anyhow!( - "unexpected message: {m:?} during COPY" - ))); + None => { + continue; } - None => break, // client disconnected }; - trace!("query: {copy_data_bytes:?}"); - fail::fail_point!("ps::handle-pagerequest-message"); - - // parse request - let neon_fe_msg = PagestreamFeMessage::parse(&mut copy_data_bytes.reader())?; - - // invoke handler function - let (handler_result, span) = match neon_fe_msg { - PagestreamFeMessage::Exists(req) => { - fail::fail_point!("ps::handle-pagerequest-message::exists"); - let span = tracing::info_span!("handle_get_rel_exists_request", rel = %req.rel, req_lsn = %req.request_lsn); - ( - self.handle_get_rel_exists_request(tenant_id, timeline_id, &req, &ctx) - .instrument(span.clone()) - .await, - span, - ) - } - PagestreamFeMessage::Nblocks(req) => { - fail::fail_point!("ps::handle-pagerequest-message::nblocks"); - let span = tracing::info_span!("handle_get_nblocks_request", rel = %req.rel, req_lsn = %req.request_lsn); - ( - self.handle_get_nblocks_request(tenant_id, timeline_id, &req, &ctx) - .instrument(span.clone()) - .await, - span, - ) - } - PagestreamFeMessage::GetPage(req) => { - fail::fail_point!("ps::handle-pagerequest-message::getpage"); - // shard_id is filled in by the handler - let span = tracing::info_span!("handle_get_page_at_lsn_request", rel = %req.rel, blkno = %req.blkno, req_lsn = %req.request_lsn); - ( - self.handle_get_page_at_lsn_request(tenant_id, timeline_id, &req, &ctx) - .instrument(span.clone()) - .await, - span, - ) - } - PagestreamFeMessage::DbSize(req) => { - fail::fail_point!("ps::handle-pagerequest-message::dbsize"); - let span = tracing::info_span!("handle_db_size_request", dbnode = %req.dbnode, req_lsn = %req.request_lsn); - ( - self.handle_db_size_request(tenant_id, timeline_id, &req, &ctx) - .instrument(span.clone()) - .await, - span, - ) - } - PagestreamFeMessage::GetSlruSegment(req) => { - fail::fail_point!("ps::handle-pagerequest-message::slrusegment"); - let 
span = tracing::info_span!("handle_get_slru_segment_request", kind = %req.kind, segno = %req.segno, req_lsn = %req.request_lsn); - ( - self.handle_get_slru_segment_request(tenant_id, timeline_id, &req, &ctx) - .instrument(span.clone()) - .await, + for batch in batched { + // invoke handler function + let (handler_results, span): ( + Vec>, + _, + ) = match batch { + BatchedFeMessage::Exists { span, req } => { + fail::fail_point!("ps::handle-pagerequest-message::exists"); + ( + vec![ + self.handle_get_rel_exists_request( + tenant_id, + timeline_id, + &req, + &ctx, + ) + .instrument(span.clone()) + .await, + ], + span, + ) + } + BatchedFeMessage::Nblocks { span, req } => { + fail::fail_point!("ps::handle-pagerequest-message::nblocks"); + ( + vec![ + self.handle_get_nblocks_request(tenant_id, timeline_id, &req, &ctx) + .instrument(span.clone()) + .await, + ], + span, + ) + } + BatchedFeMessage::GetPage { span, - ) - } - }; - - // Map handler result to protocol behavior. - // Some handler errors cause exit from pagestream protocol. - // Other handler errors are sent back as an error message and we stay in pagestream protocol. - let response_msg = match handler_result { - Err(e) => match &e { - PageStreamError::Shutdown => { - // If we fail to fulfil a request during shutdown, which may be _because_ of - // shutdown, then do not send the error to the client. Instead just drop the - // connection. - span.in_scope(|| info!("dropping connection due to shutdown")); - return Err(QueryError::Shutdown); + shard, + effective_request_lsn, + pages, + } => { + fail::fail_point!("ps::handle-pagerequest-message::getpage"); + ( + { + let npages = pages.len(); + let res = self + .handle_get_page_at_lsn_request_batched( + &shard, + effective_request_lsn, + pages, + &ctx, + ) + .instrument(span.clone()) + .await; + assert_eq!(res.len(), npages); + res + }, + span, + ) } - PageStreamError::Reconnect(reason) => { - span.in_scope(|| info!("handler requested reconnect: {reason}")); - return Err(QueryError::Reconnect); + BatchedFeMessage::DbSize { span, req } => { + fail::fail_point!("ps::handle-pagerequest-message::dbsize"); + ( + vec![ + self.handle_db_size_request(tenant_id, timeline_id, &req, &ctx) + .instrument(span.clone()) + .await, + ], + span, + ) } - PageStreamError::Read(_) - | PageStreamError::LsnTimeout(_) - | PageStreamError::NotFound(_) - | PageStreamError::BadRequest(_) => { - // print the all details to the log with {:#}, but for the client the - // error message is enough. Do not log if shutting down, as the anyhow::Error - // here includes cancellation which is not an error. - let full = utils::error::report_compact_sources(&e); - span.in_scope(|| { - error!("error reading relation or page version: {full:#}") - }); - PagestreamBeMessage::Error(PagestreamErrorResponse { - message: e.to_string(), - }) + BatchedFeMessage::GetSlruSegment { span, req } => { + fail::fail_point!("ps::handle-pagerequest-message::slrusegment"); + ( + vec![ + self.handle_get_slru_segment_request( + tenant_id, + timeline_id, + &req, + &ctx, + ) + .instrument(span.clone()) + .await, + ], + span, + ) } - }, - Ok(response_msg) => response_msg, - }; + BatchedFeMessage::RespondError { span, error } => { + // We've already decided to respond with an error, so we don't need to + // call the handler. + (vec![Err(error)], span) + } + }; - // marshal & transmit response message - pgb.write_message_noflush(&BeMessage::CopyData(&response_msg.serialize()))?; - tokio::select! 
{ - biased; - _ = self.cancel.cancelled() => { - // We were requested to shut down. - info!("shutdown request received in page handler"); - return Err(QueryError::Shutdown) + // Map handler result to protocol behavior. + // Some handler errors cause exit from pagestream protocol. + // Other handler errors are sent back as an error message and we stay in pagestream protocol. + for handler_result in handler_results { + let response_msg = match handler_result { + Err(e) => match &e { + PageStreamError::Shutdown => { + // If we fail to fulfil a request during shutdown, which may be _because_ of + // shutdown, then do not send the error to the client. Instead just drop the + // connection. + span.in_scope(|| info!("dropping connection due to shutdown")); + return Err(QueryError::Shutdown); + } + PageStreamError::Reconnect(reason) => { + span.in_scope(|| info!("handler requested reconnect: {reason}")); + return Err(QueryError::Reconnect); + } + PageStreamError::Read(_) + | PageStreamError::LsnTimeout(_) + | PageStreamError::NotFound(_) + | PageStreamError::BadRequest(_) => { + // print the all details to the log with {:#}, but for the client the + // error message is enough. Do not log if shutting down, as the anyhow::Error + // here includes cancellation which is not an error. + let full = utils::error::report_compact_sources(&e); + span.in_scope(|| { + error!("error reading relation or page version: {full:#}") + }); + PagestreamBeMessage::Error(PagestreamErrorResponse { + message: e.to_string(), + }) + } + }, + Ok(response_msg) => response_msg, + }; + + // marshal & transmit response message + pgb.write_message_noflush(&BeMessage::CopyData(&response_msg.serialize()))?; } - res = pgb.flush() => { - res?; + tokio::select! { + biased; + _ = self.cancel.cancelled() => { + // We were requested to shut down. + info!("shutdown request received in page handler"); + return Err(QueryError::Shutdown) + } + res = pgb.flush() => { + res?; + } } } } @@ -964,60 +1265,30 @@ impl PageServerHandler { })) } - #[instrument(skip_all, fields(shard_id))] - async fn handle_get_page_at_lsn_request( + #[instrument(skip_all)] + async fn handle_get_page_at_lsn_request_batched( &mut self, - tenant_id: TenantId, - timeline_id: TimelineId, - req: &PagestreamGetPageRequest, + timeline: &Timeline, + effective_lsn: Lsn, + pages: smallvec::SmallVec<[(RelTag, BlockNumber); 1]>, ctx: &RequestContext, - ) -> Result { - let timeline = match self - .timeline_handles - .get( - tenant_id, - timeline_id, - ShardSelector::Page(rel_block_to_key(req.rel, req.blkno)), - ) - .await - { - Ok(tl) => tl, - Err(GetActiveTimelineError::Tenant(GetActiveTenantError::NotFound(_))) => { - // We already know this tenant exists in general, because we resolved it at - // start of connection. Getting a NotFound here indicates that the shard containing - // the requested page is not present on this node: the client's knowledge of shard->pageserver - // mapping is out of date. - // - // Closing the connection by returning ``::Reconnect` has the side effect of rate-limiting above message, via - // client's reconnect backoff, as well as hopefully prompting the client to load its updated configuration - // and talk to a different pageserver. 
- return Err(PageStreamError::Reconnect( - "getpage@lsn request routed to wrong shard".into(), - )); - } - Err(e) => return Err(e.into()), - }; - - let _timer = timeline - .query_metrics - .start_timer(metrics::SmgrQueryType::GetPageAtLsn, ctx); - - let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn(); - let lsn = Self::wait_or_get_last_lsn( - &timeline, - req.request_lsn, - req.not_modified_since, - &latest_gc_cutoff_lsn, + ) -> Vec> { + debug_assert_current_span_has_tenant_and_timeline_id(); + let _timer = timeline.query_metrics.start_timer_many( + metrics::SmgrQueryType::GetPageAtLsn, + pages.len(), ctx, - ) - .await?; + ); - let page = timeline - .get_rel_page_at_lsn(req.rel, req.blkno, Version::Lsn(lsn), ctx) - .await?; + let pages = timeline + .get_rel_page_at_lsn_batched(pages, effective_lsn, ctx) + .await; - Ok(PagestreamBeMessage::GetPage(PagestreamGetPageResponse { - page, + Vec::from_iter(pages.into_iter().map(|page| { + page.map(|page| { + PagestreamBeMessage::GetPage(models::PagestreamGetPageResponse { page }) + }) + .map_err(PageStreamError::from) })) } @@ -1674,6 +1945,13 @@ fn set_tracing_field_shard_id(timeline: &Timeline) { debug_assert_current_span_has_tenant_and_timeline_id(); } +struct WaitedForLsn(Lsn); +impl From for Lsn { + fn from(WaitedForLsn(lsn): WaitedForLsn) -> Self { + lsn + } +} + #[cfg(test)] mod tests { use utils::shard::ShardCount; diff --git a/pageserver/src/pgdatadir_mapping.rs b/pageserver/src/pgdatadir_mapping.rs index 7c1abbf3e2d1..5995d1cc5726 100644 --- a/pageserver/src/pgdatadir_mapping.rs +++ b/pageserver/src/pgdatadir_mapping.rs @@ -10,10 +10,15 @@ use super::tenant::{PageReconstructError, Timeline}; use crate::aux_file; use crate::context::RequestContext; use crate::keyspace::{KeySpace, KeySpaceAccum}; -use crate::span::debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id; +use crate::span::{ + debug_assert_current_span_has_tenant_and_timeline_id, + debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id, +}; +use crate::tenant::timeline::GetVectoredError; use anyhow::{ensure, Context}; use bytes::{Buf, Bytes, BytesMut}; use enum_map::Enum; +use itertools::Itertools; use pageserver_api::key::Key; use pageserver_api::key::{ dbdir_key_range, rel_block_to_key, rel_dir_to_key, rel_key_range, rel_size_to_key, @@ -30,7 +35,7 @@ use postgres_ffi::relfile_utils::{FSM_FORKNUM, VISIBILITYMAP_FORKNUM}; use postgres_ffi::BLCKSZ; use postgres_ffi::{Oid, RepOriginId, TimestampTz, TransactionId}; use serde::{Deserialize, Serialize}; -use std::collections::{hash_map, HashMap, HashSet}; +use std::collections::{hash_map, BTreeMap, HashMap, HashSet}; use std::ops::ControlFlow; use std::ops::Range; use strum::IntoEnumIterator; @@ -193,26 +198,195 @@ impl Timeline { version: Version<'_>, ctx: &RequestContext, ) -> Result { - if tag.relnode == 0 { - return Err(PageReconstructError::Other( - RelationError::InvalidRelnode.into(), - )); + match version { + Version::Lsn(effective_lsn) => { + let pages = smallvec::smallvec![(tag, blknum)]; + let res = self + .get_rel_page_at_lsn_batched(pages, effective_lsn, ctx) + .await; + assert_eq!(res.len(), 1); + res.into_iter().next().unwrap() + } + Version::Modified(modification) => { + if tag.relnode == 0 { + return Err(PageReconstructError::Other( + RelationError::InvalidRelnode.into(), + )); + } + + let nblocks = self.get_rel_size(tag, version, ctx).await?; + if blknum >= nblocks { + debug!( + "read beyond EOF at {} blk {} at {}, size is {}: returning all-zeros page", + tag, + blknum, + 
version.get_lsn(), + nblocks + ); + return Ok(ZERO_PAGE.clone()); + } + + let key = rel_block_to_key(tag, blknum); + modification.get(key, ctx).await + } } + } - let nblocks = self.get_rel_size(tag, version, ctx).await?; - if blknum >= nblocks { - debug!( - "read beyond EOF at {} blk {} at {}, size is {}: returning all-zeros page", - tag, - blknum, - version.get_lsn(), - nblocks - ); - return Ok(ZERO_PAGE.clone()); + /// Like [`Self::get_rel_page_at_lsn`], but returns a batch of pages. + /// + /// The ordering of the returned vec corresponds to the ordering of `pages`. + pub(crate) async fn get_rel_page_at_lsn_batched( + &self, + pages: smallvec::SmallVec<[(RelTag, BlockNumber); 1]>, + effective_lsn: Lsn, + ctx: &RequestContext, + ) -> Vec> { + debug_assert_current_span_has_tenant_and_timeline_id(); + + let mut slots_filled = 0; + let page_count = pages.len(); + + // Would be nice to use smallvec here but it doesn't provide the spare_capacity_mut() API. + let mut result = Vec::with_capacity(pages.len()); + let result_slots = result.spare_capacity_mut(); + + let mut keys_slots: BTreeMap> = BTreeMap::default(); + for (response_slot_idx, (tag, blknum)) in pages.into_iter().enumerate() { + if tag.relnode == 0 { + result_slots[response_slot_idx].write(Err(PageReconstructError::Other( + RelationError::InvalidRelnode.into(), + ))); + + slots_filled += 1; + continue; + } + + let nblocks = match self + .get_rel_size(tag, Version::Lsn(effective_lsn), ctx) + .await + { + Ok(nblocks) => nblocks, + Err(err) => { + result_slots[response_slot_idx].write(Err(err)); + slots_filled += 1; + continue; + } + }; + + if blknum >= nblocks { + debug!( + "read beyond EOF at {} blk {} at {}, size is {}: returning all-zeros page", + tag, blknum, effective_lsn, nblocks + ); + result_slots[response_slot_idx].write(Ok(ZERO_PAGE.clone())); + slots_filled += 1; + continue; + } + + let key = rel_block_to_key(tag, blknum); + + let key_slots = keys_slots.entry(key).or_default(); + key_slots.push(response_slot_idx); + } + + let keyspace = { + // add_key requires monotonicity + let mut acc = KeySpaceAccum::new(); + for key in keys_slots + .keys() + // in fact it requires strong monotonicity + .dedup() + { + acc.add_key(*key); + } + acc.to_keyspace() + }; + + match self.get_vectored(keyspace, effective_lsn, ctx).await { + Ok(results) => { + for (key, res) in results { + let mut key_slots = keys_slots.remove(&key).unwrap().into_iter(); + let first_slot = key_slots.next().unwrap(); + + for slot in key_slots { + let clone = match &res { + Ok(buf) => Ok(buf.clone()), + Err(err) => Err(match err { + PageReconstructError::Cancelled => { + PageReconstructError::Cancelled + } + + x @ PageReconstructError::Other(_) | + x @ PageReconstructError::AncestorLsnTimeout(_) | + x @ PageReconstructError::WalRedo(_) | + x @ PageReconstructError::MissingKey(_) => { + PageReconstructError::Other(anyhow::anyhow!("there was more than one request for this key in the batch, error logged once: {x:?}")) + }, + }), + }; + + result_slots[slot].write(clone); + slots_filled += 1; + } + + result_slots[first_slot].write(res); + slots_filled += 1; + } + } + Err(err) => { + // this cannot really happen because get_vectored only errors globally on invalid LSN or too large batch size + // (We enforce the max batch size outside of this function, in the code that constructs the batch request.) 
+ for slot in keys_slots.values().flatten() { + // this whole `match` is a lot like `From for PageReconstructError` + // but without taking ownership of the GetVectoredError + let err = match &err { + GetVectoredError::Cancelled => { + Err(PageReconstructError::Cancelled) + } + // TODO: restructure get_vectored API to make this error per-key + GetVectoredError::MissingKey(err) => { + Err(PageReconstructError::Other(anyhow::anyhow!("whole vectored get request failed because one or more of the requested keys were missing: {err:?}"))) + } + // TODO: restructure get_vectored API to make this error per-key + GetVectoredError::GetReadyAncestorError(err) => { + Err(PageReconstructError::Other(anyhow::anyhow!("whole vectored get request failed because one or more keys required an ancestor that wasn't ready: {err:?}"))) + } + // TODO: restructure get_vectored API to make this error per-key + GetVectoredError::Other(err) => { + Err(PageReconstructError::Other( + anyhow::anyhow!("whole vectored get request failed: {err:?}"), + )) + } + // TODO: we can prevent this error class by moving this check into the type system + GetVectoredError::InvalidLsn(e) => { + Err(anyhow::anyhow!("invalid LSN: {e:?}").into()) + } + // NB: this should never happen in practice because we limit MAX_GET_VECTORED_KEYS + // TODO: we can prevent this error class by moving this check into the type system + GetVectoredError::Oversized(err) => { + Err(anyhow::anyhow!( + "batching oversized: {err:?}" + ) + .into()) + } + }; + + result_slots[*slot].write(err); + } + + slots_filled += keys_slots.values().map(|slots| slots.len()).sum::(); + } + }; + + assert_eq!(slots_filled, page_count); + // SAFETY: + // 1. `result` and any of its uninit members are not read from until this point + // 2. The length below is tracked at run-time and matches the number of requested pages. + unsafe { + result.set_len(page_count); } - let key = rel_block_to_key(tag, blknum); - version.get(self, key, ctx).await + result } // Get size of a database in blocks diff --git a/pageserver/src/walingest.rs b/pageserver/src/walingest.rs index 38d69760f276..ad6ccbc85466 100644 --- a/pageserver/src/walingest.rs +++ b/pageserver/src/walingest.rs @@ -1528,6 +1528,11 @@ mod tests { assert_current_logical_size(&tline, Lsn(0x50)); + let test_span = tracing::info_span!(parent: None, "test", + tenant_id=%tline.tenant_shard_id.tenant_id, + shard_id=%tline.tenant_shard_id.shard_slug(), + timeline_id=%tline.timeline_id); + + // The relation was created at LSN 2, not visible at LSN 1 yet. 
assert_eq!( tline @@ -1562,6 +1567,7 @@ mod tests { assert_eq!( tline .get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x20)), &ctx) + .instrument(test_span.clone()) .await?, test_img("foo blk 0 at 2") ); @@ -1569,6 +1575,7 @@ mod tests { assert_eq!( tline .get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x30)), &ctx) + .instrument(test_span.clone()) .await?, test_img("foo blk 0 at 3") ); @@ -1576,12 +1583,14 @@ mod tests { assert_eq!( tline .get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x40)), &ctx) + .instrument(test_span.clone()) .await?, test_img("foo blk 0 at 3") ); assert_eq!( tline .get_rel_page_at_lsn(TESTREL_A, 1, Version::Lsn(Lsn(0x40)), &ctx) + .instrument(test_span.clone()) .await?, test_img("foo blk 1 at 4") ); @@ -1589,18 +1598,21 @@ mod tests { assert_eq!( tline .get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x50)), &ctx) + .instrument(test_span.clone()) .await?, test_img("foo blk 0 at 3") ); assert_eq!( tline .get_rel_page_at_lsn(TESTREL_A, 1, Version::Lsn(Lsn(0x50)), &ctx) + .instrument(test_span.clone()) .await?, test_img("foo blk 1 at 4") ); assert_eq!( tline .get_rel_page_at_lsn(TESTREL_A, 2, Version::Lsn(Lsn(0x50)), &ctx) + .instrument(test_span.clone()) .await?, test_img("foo blk 2 at 5") ); @@ -1623,12 +1635,14 @@ mod tests { assert_eq!( tline .get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x60)), &ctx) + .instrument(test_span.clone()) .await?, test_img("foo blk 0 at 3") ); assert_eq!( tline .get_rel_page_at_lsn(TESTREL_A, 1, Version::Lsn(Lsn(0x60)), &ctx) + .instrument(test_span.clone()) .await?, test_img("foo blk 1 at 4") ); @@ -1643,6 +1657,7 @@ mod tests { assert_eq!( tline .get_rel_page_at_lsn(TESTREL_A, 2, Version::Lsn(Lsn(0x50)), &ctx) + .instrument(test_span.clone()) .await?, test_img("foo blk 2 at 5") ); @@ -1675,12 +1690,14 @@ mod tests { assert_eq!( tline .get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x70)), &ctx) + .instrument(test_span.clone()) .await?, ZERO_PAGE ); assert_eq!( tline .get_rel_page_at_lsn(TESTREL_A, 1, Version::Lsn(Lsn(0x70)), &ctx) + .instrument(test_span.clone()) .await?, test_img("foo blk 1") ); @@ -1701,6 +1718,7 @@ mod tests { assert_eq!( tline .get_rel_page_at_lsn(TESTREL_A, blk, Version::Lsn(Lsn(0x80)), &ctx) + .instrument(test_span.clone()) .await?, ZERO_PAGE ); @@ -1708,6 +1726,7 @@ mod tests { assert_eq!( tline .get_rel_page_at_lsn(TESTREL_A, 1500, Version::Lsn(Lsn(0x80)), &ctx) + .instrument(test_span.clone()) .await?, test_img("foo blk 1500") ); @@ -1815,6 +1834,11 @@ mod tests { } m.commit(&ctx).await?; + let test_span = tracing::info_span!(parent: None, "test", + tenant_id=%tline.tenant_shard_id.tenant_id, + shard_id=%tline.tenant_shard_id.shard_slug(), + timeline_id=%tline.timeline_id); + // The relation was created at LSN 20, not visible at LSN 1 yet. assert_eq!( tline @@ -1847,6 +1871,7 @@ mod tests { assert_eq!( tline .get_rel_page_at_lsn(TESTREL_A, blkno, Version::Lsn(lsn), &ctx) + .instrument(test_span.clone()) .await?, test_img(&data) ); @@ -1874,6 +1899,7 @@ mod tests { assert_eq!( tline .get_rel_page_at_lsn(TESTREL_A, blkno, Version::Lsn(Lsn(0x60)), &ctx) + .instrument(test_span.clone()) .await?, test_img(&data) ); @@ -1892,6 +1918,7 @@ mod tests { assert_eq!( tline .get_rel_page_at_lsn(TESTREL_A, blkno, Version::Lsn(Lsn(0x50)), &ctx) + .instrument(test_span.clone()) .await?, test_img(&data) ); @@ -1928,6 +1955,7 @@ mod tests { assert_eq!( tline .get_rel_page_at_lsn(TESTREL_A, blkno, Version::Lsn(Lsn(0x80)), &ctx) + .instrument(test_span.clone()) .await?, test_img(&data) );
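For context on the new configuration knob introduced above: `server_side_batch_timeout` is deserialized with `humantime_serde` and defaults to `None`, i.e. server-side batching of GetPage requests stays disabled unless an operator opts in. A minimal sketch of enabling it in the pageserver TOML config follows; the key name comes from the `ConfigToml` field added in this diff, while the `10ms` value and the rest of the snippet are illustrative assumptions only:

    # pageserver.toml -- illustrative sketch, not part of this diff
    # Hold a GetPage request for up to this long so that subsequent requests
    # arriving on the same connection can be merged into one vectored read.
    # Omit the key to keep the default (no batching).
    server_side_batch_timeout = "10ms"

Durations use humantime syntax ("500us", "10ms", "1s", ...); a larger value trades added per-request latency for bigger batches.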