Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tokio-epoll-uring: retry on launch failures due to locked memory #7141

Merged
merged 5 commits into from
Mar 15, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions clippy.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ disallowed-methods = [
"tokio::task::block_in_place",
# Allow this for now, to deny it later once we stop using Handle::block_on completely
# "tokio::runtime::Handle::block_on",
# use tokio_epoll_uring_ext instead
"tokio_epoll_uring::thread_local_system",
]

disallowed-macros = [
Expand Down
1 change: 1 addition & 0 deletions pageserver/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ postgres.workspace = true
postgres_backend.workspace = true
postgres-protocol.workspace = true
postgres-types.workspace = true
procfs.workspace = true
rand.workspace = true
regex.workspace = true
scopeguard.workspace = true
Expand Down
27 changes: 23 additions & 4 deletions pageserver/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2465,23 +2465,22 @@ impl<F: Future<Output = Result<O, E>>, O, E> Future for MeasuredRemoteOp<F> {
}

pub mod tokio_epoll_uring {
use metrics::UIntGauge;
use metrics::{register_int_counter, UIntGauge};
use once_cell::sync::Lazy;

pub struct Collector {
descs: Vec<metrics::core::Desc>,
systems_created: UIntGauge,
systems_destroyed: UIntGauge,
}

const NMETRICS: usize = 2;

impl metrics::core::Collector for Collector {
fn desc(&self) -> Vec<&metrics::core::Desc> {
self.descs.iter().collect()
}

fn collect(&self) -> Vec<metrics::proto::MetricFamily> {
let mut mfs = Vec::with_capacity(NMETRICS);
let mut mfs = Vec::with_capacity(Self::NMETRICS);
let tokio_epoll_uring::metrics::Metrics {
systems_created,
systems_destroyed,
Expand All @@ -2495,6 +2494,8 @@ pub mod tokio_epoll_uring {
}

impl Collector {
const NMETRICS: usize = 2;

#[allow(clippy::new_without_default)]
pub fn new() -> Self {
let mut descs = Vec::new();
Expand Down Expand Up @@ -2528,6 +2529,22 @@ pub mod tokio_epoll_uring {
}
}
}

pub(crate) static THREAD_LOCAL_LAUNCH_SUCCESSES: Lazy<metrics::IntCounter> = Lazy::new(|| {
register_int_counter!(
"pageserver_tokio_epoll_uring_pageserver_thread_local_launch_success_count",
"Number of times where thread_local_system creation spanned multiple executor threads",
)
.unwrap()
});

pub(crate) static THREAD_LOCAL_LAUNCH_FAILURES: Lazy<metrics::IntCounter> = Lazy::new(|| {
register_int_counter!(
"pageserver_tokio_epoll_uring_pageserver_thread_local_launch_failures_count",
"Number of times thread_local_system creation failed and was retried after back-off.",
)
.unwrap()
});
}

pub(crate) mod tenant_throttling {
Expand Down Expand Up @@ -2656,6 +2673,8 @@ pub fn preinitialize_metrics() {
&WALRECEIVER_BROKER_UPDATES,
&WALRECEIVER_CANDIDATES_ADDED,
&WALRECEIVER_CANDIDATES_REMOVED,
&tokio_epoll_uring::THREAD_LOCAL_LAUNCH_FAILURES,
&tokio_epoll_uring::THREAD_LOCAL_LAUNCH_SUCCESSES,
]
.into_iter()
.for_each(|c| {
Expand Down
14 changes: 9 additions & 5 deletions pageserver/src/virtual_file/io_engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@
//! Initialize using [`init`].
//!
//! Then use [`get`] and [`super::OpenOptions`].
//!
//!

pub(super) mod tokio_epoll_uring_ext;

use tokio_epoll_uring::{IoBuf, Slice};
use tracing::Instrument;
Expand Down Expand Up @@ -145,7 +149,7 @@ impl IoEngine {
}
#[cfg(target_os = "linux")]
IoEngine::TokioEpollUring => {
let system = tokio_epoll_uring::thread_local_system().await;
let system = tokio_epoll_uring_ext::thread_local_system().await;
let (resources, res) = system.read(file_guard, offset, buf).await;
(resources, res.map_err(epoll_uring_error_to_std))
}
Expand All @@ -160,7 +164,7 @@ impl IoEngine {
}
#[cfg(target_os = "linux")]
IoEngine::TokioEpollUring => {
let system = tokio_epoll_uring::thread_local_system().await;
let system = tokio_epoll_uring_ext::thread_local_system().await;
let (resources, res) = system.fsync(file_guard).await;
(resources, res.map_err(epoll_uring_error_to_std))
}
Expand All @@ -178,7 +182,7 @@ impl IoEngine {
}
#[cfg(target_os = "linux")]
IoEngine::TokioEpollUring => {
let system = tokio_epoll_uring::thread_local_system().await;
let system = tokio_epoll_uring_ext::thread_local_system().await;
let (resources, res) = system.fdatasync(file_guard).await;
(resources, res.map_err(epoll_uring_error_to_std))
}
Expand All @@ -197,7 +201,7 @@ impl IoEngine {
}
#[cfg(target_os = "linux")]
IoEngine::TokioEpollUring => {
let system = tokio_epoll_uring::thread_local_system().await;
let system = tokio_epoll_uring_ext::thread_local_system().await;
let (resources, res) = system.statx(file_guard).await;
(
resources,
Expand All @@ -220,7 +224,7 @@ impl IoEngine {
}
#[cfg(target_os = "linux")]
IoEngine::TokioEpollUring => {
let system = tokio_epoll_uring::thread_local_system().await;
let system = tokio_epoll_uring_ext::thread_local_system().await;
let (resources, res) = system.write(file_guard, offset, buf).await;
(resources, res.map_err(epoll_uring_error_to_std))
}
Expand Down
195 changes: 195 additions & 0 deletions pageserver/src/virtual_file/io_engine/tokio_epoll_uring_ext.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
//! Like [`::tokio_epoll_uring::thread_local_system()`], but with pageserver-specific
//! handling in case the instance can't launched.
//!
//! This is primarily necessary due to ENOMEM aka OutOfMemory errors during io_uring creation
//! on older kernels, such as some (but not all) older kernels in the Linux 5.10 series.
//! See https://github.com/neondatabase/neon/issues/6373#issuecomment-1905814391 for more details.

use std::sync::atomic::AtomicU32;
use std::sync::Arc;

use tokio_util::sync::CancellationToken;
use tracing::{error, info, info_span, warn, Instrument};
use utils::backoff::{DEFAULT_BASE_BACKOFF_SECONDS, DEFAULT_MAX_BACKOFF_SECONDS};

use tokio_epoll_uring::{System, SystemHandle};

use crate::virtual_file::on_fatal_io_error;

use crate::metrics::tokio_epoll_uring as metrics;

#[derive(Clone)]
struct ThreadLocalState(Arc<ThreadLocalStateInner>);

struct ThreadLocalStateInner {
cell: tokio::sync::OnceCell<SystemHandle>,
launch_attempts: AtomicU32,
}

impl ThreadLocalState {
pub fn new() -> Self {
Self(Arc::new(ThreadLocalStateInner {
cell: tokio::sync::OnceCell::default(),
launch_attempts: AtomicU32::new(0),
}))
}
pub fn make_id_string(&self) -> String {
format!("0x{:p}", Arc::as_ptr(&self.0))
}
}

impl Drop for ThreadLocalState {
fn drop(&mut self) {
info!(parent: None, id=%self.make_id_string(), "tokio-epoll-uring_ext: ThreadLocalState is being dropped and id might be re-used in the future");
}
}

thread_local! {
static THREAD_LOCAL: ThreadLocalState = ThreadLocalState::new();
}

/// Panics if we cannot [`System::launch`].
pub async fn thread_local_system() -> Handle {
let fake_cancel = CancellationToken::new();
loop {
let thread_local_state = THREAD_LOCAL.with(|arc| arc.clone());
let inner = &thread_local_state.0;
// NB: thread_id becomes stale after first await
problame marked this conversation as resolved.
Show resolved Hide resolved
let get_or_init_res = inner
.cell
.get_or_try_init(|| async {
let attempt_no = inner
.launch_attempts
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
let span = info_span!("tokio_epoll_uring_ext::thread_local_system", thread_local=%thread_local_state.make_id_string(), %attempt_no);
async {
// Rate-limit retries per thread-local.
// NB: doesn't yield to executor at attempt_no=0.
problame marked this conversation as resolved.
Show resolved Hide resolved
utils::backoff::exponential_backoff(
attempt_no,
DEFAULT_BASE_BACKOFF_SECONDS,
DEFAULT_MAX_BACKOFF_SECONDS,
&fake_cancel,
)
.await;
let res = System::launch()
// this might move us to another executor thread => loop outside the get_or_try_init, not inside it
.await;
match res {
Ok(system) => {
info!("successfully launched system");
metrics::THREAD_LOCAL_LAUNCH_SUCCESSES.inc();
Ok(system)
}
Err(tokio_epoll_uring::LaunchResult::IoUringBuild(e)) if e.kind() == std::io::ErrorKind::OutOfMemory => {
warn!("not enough locked memory to tokio-epoll-uring, will retry");
info_span!("stats").in_scope(|| {
emit_launch_failure_process_stats();
});
metrics::THREAD_LOCAL_LAUNCH_FAILURES.inc();
Err(())
}
// abort the process instead of panicking because pageserver usually becomes half-broken if we panic somewhere.
// This is equivalent to a fatal IO error.
Err(ref e @ tokio_epoll_uring::LaunchResult::IoUringBuild(ref inner)) => {
error!(error=%e, "failed to launch thread-local tokio-epoll-uring, this should not happen, aborting process");
info_span!("stats").in_scope(|| {
emit_launch_failure_process_stats();
});
on_fatal_io_error(inner, "launch thread-local tokio-epoll-uring");
},
}
}
.instrument(span)
.await
})
.await;
if get_or_init_res.is_ok() {
return Handle(thread_local_state);
}
}
}

fn emit_launch_failure_process_stats() {
// tokio-epoll-uring stats
// vmlck + rlimit
// number of threads
// rss / system memory usage generally

let tokio_epoll_uring::metrics::Metrics {
systems_created,
systems_destroyed,
} = tokio_epoll_uring::metrics::global();
info!(systems_created, systems_destroyed, "tokio-epoll-uring");

match procfs::process::Process::myself() {
Ok(myself) => {
match myself.limits() {
Ok(limits) => {
info!(?limits.max_locked_memory, "/proc/self/limits");
}
Err(error) => {
info!(%error, "no limit stats due to error");
}
}

match myself.status() {
Ok(status) => {
let procfs::process::Status {
vmsize,
vmlck,
vmpin,
vmrss,
rssanon,
rssfile,
rssshmem,
vmdata,
vmstk,
vmexe,
vmlib,
vmpte,
threads,
..
} = status;
info!(
vmsize,
vmlck,
vmpin,
vmrss,
rssanon,
rssfile,
rssshmem,
vmdata,
vmstk,
vmexe,
vmlib,
vmpte,
threads,
"/proc/self/status"
);
}
Err(error) => {
info!(%error, "no status status due to error");
}
}
}
Err(error) => {
info!(%error, "no process stats due to error");
}
};
}

#[derive(Clone)]
pub struct Handle(ThreadLocalState);

impl std::ops::Deref for Handle {
type Target = SystemHandle;

fn deref(&self) -> &Self::Target {
self.0
.0
.cell
.get()
.expect("must be already initialized when using this")
}
}
2 changes: 1 addition & 1 deletion pageserver/src/virtual_file/open_options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ impl OpenOptions {
OpenOptions::StdFs(x) => x.open(path).map(|file| file.into()),
#[cfg(target_os = "linux")]
OpenOptions::TokioEpollUring(x) => {
let system = tokio_epoll_uring::thread_local_system().await;
let system = super::io_engine::tokio_epoll_uring_ext::thread_local_system().await;
system.open(path, x).await.map_err(|e| match e {
tokio_epoll_uring::Error::Op(e) => e,
tokio_epoll_uring::Error::System(system) => {
Expand Down
Loading