From 645442c29fc388f097eed36d57038bc5c14d1f1c Mon Sep 17 00:00:00 2001 From: John Spray Date: Fri, 4 Aug 2023 13:22:13 +0100 Subject: [PATCH 1/3] pageserver: avoid logging confusing "ERROR" success messages Example: ``` walreceiver connection handling ended: db error: ERROR: ending streaming to Some("pageserver") at 0/4031CA8 ``` The inner DbError has a severity of ERROR so DbError's Display implementation includes that ERROR, even though we are actually logging the error at INFO level. Introduce an explicit WalReceiverError type, and in its From<> for postgres errors, apply the logic from ExpectedError, for expected errors, and a new condition for successes. The new output looks like: ``` walreceiver connection handling ended with success: ending streaming to Some("pageserver") at 0/154E9C0, receiver is caughtup and there is no computes ``` --- .../walreceiver/connection_manager.rs | 25 ++-- .../walreceiver/walreceiver_connection.rs | 118 +++++++++--------- test_runner/fixtures/neon_fixtures.py | 1 - 3 files changed, 77 insertions(+), 67 deletions(-) diff --git a/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs b/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs index 2642746c791b..e96aa41da0b7 100644 --- a/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs +++ b/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs @@ -38,7 +38,10 @@ use utils::{ lsn::Lsn, }; -use super::{walreceiver_connection::WalConnectionStatus, TaskEvent, TaskHandle}; +use super::{ + walreceiver_connection::WalConnectionStatus, walreceiver_connection::WalReceiverError, + TaskEvent, TaskHandle, +}; /// Attempts to subscribe for timeline updates, pushed by safekeepers into the broker. /// Based on the updates, desides whether to start, keep or stop a WAL receiver task. @@ -419,13 +422,19 @@ impl ConnectionManagerState { match res { Ok(()) => Ok(()), Err(e) => { - use super::walreceiver_connection::ExpectedError; - if e.is_expected() { - info!("walreceiver connection handling ended: {e:#}"); - Ok(()) - } else { - // give out an error to have task_mgr give it a really verbose logging - Err(e).context("walreceiver connection handling failure") + match e { + WalReceiverError::SuccessfulCompletion(msg) => { + info!("walreceiver connection handling ended with success: {msg}"); + Ok(()) + } + WalReceiverError::ExpectedSafekeeperError(e) => { + info!("walreceiver connection handling ended: {e}"); + Ok(()) + } + WalReceiverError::Other(e) => { + // give out an error to have task_mgr give it a really verbose logging + Err(e).context("walreceiver connection handling failure") + } } } } diff --git a/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs b/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs index 817a30247ef5..7d1e9b4a39e3 100644 --- a/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs +++ b/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs @@ -8,14 +8,14 @@ use std::{ time::{Duration, SystemTime}, }; -use anyhow::{bail, ensure, Context}; +use anyhow::{anyhow, Context}; use bytes::BytesMut; use chrono::{NaiveDateTime, Utc}; use fail::fail_point; use futures::StreamExt; use postgres::{error::SqlState, SimpleQueryMessage, SimpleQueryRow}; -use postgres_ffi::v14::xlog_utils::normalize_lsn; use postgres_ffi::WAL_SEGMENT_SIZE; +use postgres_ffi::{v14::xlog_utils::normalize_lsn, waldecoder::WalDecodeError}; use postgres_protocol::message::backend::ReplicationMessage; use postgres_types::PgLsn; use tokio::{select, sync::watch, time}; @@ -60,6 +60,50 @@ pub(super) struct WalConnectionStatus { pub node: NodeId, } +pub(super) enum WalReceiverError { + /// An error of a type that does not indicate an issue, e.g. a connection closing + ExpectedSafekeeperError(postgres::Error), + /// An "error" message that carries a SUCCESSFUL_COMPLETION status code. Carries + /// the message part of the original postgres error + SuccessfulCompletion(String), + /// Generic error + Other(anyhow::Error), +} + +impl From for WalReceiverError { + fn from(err: tokio_postgres::Error) -> Self { + if let Some(dberror) = err.as_db_error().filter(|db_error| { + db_error.code() == &SqlState::SUCCESSFUL_COMPLETION + && db_error.message().contains("ending streaming") + }) { + // Strip the outer DbError, which carries a misleading "error" severity + Self::SuccessfulCompletion(dberror.message().to_string()) + } else if err.is_closed() + || err + .source() + .and_then(|source| source.downcast_ref::()) + .map(is_expected_io_error) + .unwrap_or(false) + { + Self::ExpectedSafekeeperError(err) + } else { + Self::Other(anyhow::Error::new(err)) + } + } +} + +impl From for WalReceiverError { + fn from(err: anyhow::Error) -> Self { + Self::Other(err) + } +} + +impl From for WalReceiverError { + fn from(err: WalDecodeError) -> Self { + Self::Other(anyhow::Error::new(err)) + } +} + /// Open a connection to the given safekeeper and receive WAL, sending back progress /// messages as we go. pub(super) async fn handle_walreceiver_connection( @@ -70,7 +114,7 @@ pub(super) async fn handle_walreceiver_connection( connect_timeout: Duration, ctx: RequestContext, node: NodeId, -) -> anyhow::Result<()> { +) -> Result<(), WalReceiverError> { debug_assert_current_span_has_tenant_and_timeline_id(); WALRECEIVER_STARTED_CONNECTIONS.inc(); @@ -130,11 +174,15 @@ pub(super) async fn handle_walreceiver_connection( connection_result = connection => match connection_result { Ok(()) => debug!("Walreceiver db connection closed"), Err(connection_error) => { - if connection_error.is_expected() { - // silence, because most likely we've already exited the outer call - // with a similar error. - } else { - warn!("Connection aborted: {connection_error:#}") + match WalReceiverError::from(connection_error) { + WalReceiverError::ExpectedSafekeeperError(_) => { + // silence, because most likely we've already exited the outer call + // with a similar error. + }, + WalReceiverError::SuccessfulCompletion(_) => {} + WalReceiverError::Other(err) => { + warn!("Connection aborted: {err:#}") + } } } }, @@ -180,7 +228,7 @@ pub(super) async fn handle_walreceiver_connection( let mut startpoint = last_rec_lsn; if startpoint == Lsn(0) { - bail!("No previous WAL position"); + return Err(WalReceiverError::Other(anyhow!("No previous WAL position"))); } // There might be some padding after the last full record, skip it. @@ -262,7 +310,9 @@ pub(super) async fn handle_walreceiver_connection( // It is important to deal with the aligned records as lsn in getPage@LSN is // aligned and can be several bytes bigger. Without this alignment we are // at risk of hitting a deadlock. - ensure!(lsn.is_aligned()); + if !lsn.is_aligned() { + return Err(WalReceiverError::Other(anyhow!("LSN not aligned"))); + } walingest .ingest_record(recdata, lsn, &mut modification, &mut decoded, &ctx) @@ -419,51 +469,3 @@ async fn identify_system(client: &mut Client) -> anyhow::Result Err(IdentifyError.into()) } } - -/// Trait for avoid reporting walreceiver specific expected or "normal" or "ok" errors. -pub(super) trait ExpectedError { - /// Test if this error is an ok error. - /// - /// We don't want to report connectivity problems as real errors towards connection manager because - /// 1. they happen frequently enough to make server logs hard to read and - /// 2. the connection manager can retry other safekeeper. - /// - /// If this function returns `true`, it's such an error. - /// The caller should log it at info level and then report to connection manager that we're done handling this connection. - /// Connection manager will then handle reconnections. - /// - /// If this function returns an `false` the error should be propagated and the connection manager - /// will log the error at ERROR level. - fn is_expected(&self) -> bool; -} - -impl ExpectedError for postgres::Error { - fn is_expected(&self) -> bool { - self.is_closed() - || self - .source() - .and_then(|source| source.downcast_ref::()) - .map(is_expected_io_error) - .unwrap_or(false) - || self - .as_db_error() - .filter(|db_error| { - db_error.code() == &SqlState::SUCCESSFUL_COMPLETION - && db_error.message().contains("ending streaming") - }) - .is_some() - } -} - -impl ExpectedError for anyhow::Error { - fn is_expected(&self) -> bool { - let head = self.downcast_ref::(); - - let tail = self - .chain() - .filter_map(|e| e.downcast_ref::()); - - // check if self or any of the chained/sourced errors are expected - head.into_iter().chain(tail).any(|e| e.is_expected()) - } -} diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index 392c4d4c0b50..049933a06247 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -1691,7 +1691,6 @@ def __init__(self, env: NeonEnv, port: PageserverPort, config_override: Optional # FIXME: replication patch for tokio_postgres regards any but CopyDone/CopyData message in CopyBoth stream as unexpected ".*Connection aborted: unexpected message from server*", ".*kill_and_wait_impl.*: wait successful.*", - ".*: db error:.*ending streaming to Some.*", ".*query handler for 'pagestream.*failed: Broken pipe.*", # pageserver notices compute shut down ".*query handler for 'pagestream.*failed: Connection reset by peer.*", # pageserver notices compute shut down # safekeeper connection can fail with this, in the window between timeline creation From b316b5f53a3c3f2c4e99a4cd8a4efa5c9cdc0aec Mon Sep 17 00:00:00 2001 From: John Spray Date: Tue, 8 Aug 2023 09:07:12 +0100 Subject: [PATCH 2/3] safekeeper: add comments to protect magic error string --- safekeeper/src/send_wal.rs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/safekeeper/src/send_wal.rs b/safekeeper/src/send_wal.rs index 92a7bb703af7..7dbfdece480a 100644 --- a/safekeeper/src/send_wal.rs +++ b/safekeeper/src/send_wal.rs @@ -498,6 +498,9 @@ impl WalSender<'_, IO> { if let Some(stop_pos) = self.stop_pos { if self.start_pos >= stop_pos { // recovery finished + // Note that "ending streaming" part of the string is used by + // pageserver to identify WalReceiverError::SuccessfulCompletion, + // do not change this string without updating pageserver. return Err(CopyStreamHandlerEnd::ServerInitiated(format!( "ending streaming to walproposer at {}, recovery finished", self.start_pos @@ -568,6 +571,9 @@ impl WalSender<'_, IO> { { if self.tli.should_walsender_stop(remote_consistent_lsn).await { // Terminate if there is nothing more to send. + // Note that "ending streaming" part of the string is used by + // pageserver to identify WalReceiverError::SuccessfulCompletion, + // do not change this string without updating pageserver. return Err(CopyStreamHandlerEnd::ServerInitiated(format!( "ending streaming to {:?} at {}, receiver is caughtup and there is no computes", self.appname, self.start_pos, From c1a024a1869195fd3b1dc938ca6bbdd082a3e250 Mon Sep 17 00:00:00 2001 From: John Spray Date: Tue, 8 Aug 2023 09:33:18 +0100 Subject: [PATCH 3/3] remove safekeeper comment from the walproposer path --- safekeeper/src/send_wal.rs | 3 --- 1 file changed, 3 deletions(-) diff --git a/safekeeper/src/send_wal.rs b/safekeeper/src/send_wal.rs index 7dbfdece480a..a0d4bb35d709 100644 --- a/safekeeper/src/send_wal.rs +++ b/safekeeper/src/send_wal.rs @@ -498,9 +498,6 @@ impl WalSender<'_, IO> { if let Some(stop_pos) = self.stop_pos { if self.start_pos >= stop_pos { // recovery finished - // Note that "ending streaming" part of the string is used by - // pageserver to identify WalReceiverError::SuccessfulCompletion, - // do not change this string without updating pageserver. return Err(CopyStreamHandlerEnd::ServerInitiated(format!( "ending streaming to walproposer at {}, recovery finished", self.start_pos