Skip to content

Commit

Permalink
Revert "pageserver: use a single tokio runtime (#6555)" (#7246)
Browse files Browse the repository at this point in the history
  • Loading branch information
problame authored Mar 26, 2024
1 parent 6c18109 commit ad072de
Show file tree
Hide file tree
Showing 20 changed files with 131 additions and 92 deletions.
82 changes: 45 additions & 37 deletions pageserver/src/bin/pageserver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ use metrics::launch_timestamp::{set_launch_timestamp_metric, LaunchTimestamp};
use pageserver::control_plane_client::ControlPlaneClient;
use pageserver::disk_usage_eviction_task::{self, launch_disk_usage_global_eviction_task};
use pageserver::metrics::{STARTUP_DURATION, STARTUP_IS_LOADING};
use pageserver::task_mgr::WALRECEIVER_RUNTIME;
use pageserver::tenant::{secondary, TenantSharedResources};
use remote_storage::GenericRemoteStorage;
use tokio::signal::unix::SignalKind;
use tokio::time::Instant;
use tracing::*;

Expand All @@ -28,7 +28,7 @@ use pageserver::{
deletion_queue::DeletionQueue,
http, page_cache, page_service, task_mgr,
task_mgr::TaskKind,
task_mgr::THE_RUNTIME,
task_mgr::{BACKGROUND_RUNTIME, COMPUTE_REQUEST_RUNTIME, MGMT_REQUEST_RUNTIME},
tenant::mgr,
virtual_file,
};
Expand Down Expand Up @@ -323,7 +323,7 @@ fn start_pageserver(

// Launch broker client
// The storage_broker::connect call needs to happen inside a tokio runtime thread.
let broker_client = THE_RUNTIME
let broker_client = WALRECEIVER_RUNTIME
.block_on(async {
// Note: we do not attempt connecting here (but validate endpoints sanity).
storage_broker::connect(conf.broker_endpoint.clone(), conf.broker_keepalive_interval)
Expand Down Expand Up @@ -391,7 +391,7 @@ fn start_pageserver(
conf,
);
if let Some(deletion_workers) = deletion_workers {
deletion_workers.spawn_with(THE_RUNTIME.handle());
deletion_workers.spawn_with(BACKGROUND_RUNTIME.handle());
}

// Up to this point no significant I/O has been done: this should have been fast. Record
Expand Down Expand Up @@ -423,7 +423,7 @@ fn start_pageserver(

// Scan the local 'tenants/' directory and start loading the tenants
let deletion_queue_client = deletion_queue.new_client();
let tenant_manager = THE_RUNTIME.block_on(mgr::init_tenant_mgr(
let tenant_manager = BACKGROUND_RUNTIME.block_on(mgr::init_tenant_mgr(
conf,
TenantSharedResources {
broker_client: broker_client.clone(),
Expand All @@ -435,7 +435,7 @@ fn start_pageserver(
))?;
let tenant_manager = Arc::new(tenant_manager);

THE_RUNTIME.spawn({
BACKGROUND_RUNTIME.spawn({
let shutdown_pageserver = shutdown_pageserver.clone();
let drive_init = async move {
// NOTE: unlike many futures in pageserver, this one is cancellation-safe
Expand Down Expand Up @@ -545,7 +545,7 @@ fn start_pageserver(
// Start up the service to handle HTTP mgmt API request. We created the
// listener earlier already.
{
let _rt_guard = THE_RUNTIME.enter();
let _rt_guard = MGMT_REQUEST_RUNTIME.enter();

let router_state = Arc::new(
http::routes::State::new(
Expand All @@ -569,6 +569,7 @@ fn start_pageserver(
.with_graceful_shutdown(task_mgr::shutdown_watcher());

task_mgr::spawn(
MGMT_REQUEST_RUNTIME.handle(),
TaskKind::HttpEndpointListener,
None,
None,
Expand All @@ -593,6 +594,7 @@ fn start_pageserver(
let local_disk_storage = conf.workdir.join("last_consumption_metrics.json");

task_mgr::spawn(
crate::BACKGROUND_RUNTIME.handle(),
TaskKind::MetricsCollection,
None,
None,
Expand Down Expand Up @@ -641,6 +643,7 @@ fn start_pageserver(
DownloadBehavior::Error,
);
task_mgr::spawn(
COMPUTE_REQUEST_RUNTIME.handle(),
TaskKind::LibpqEndpointListener,
None,
None,
Expand All @@ -664,37 +667,42 @@ fn start_pageserver(
let mut shutdown_pageserver = Some(shutdown_pageserver.drop_guard());

// All started up! Now just sit and wait for shutdown signal.

{
THE_RUNTIME.block_on(async move {
let mut sigint = tokio::signal::unix::signal(SignalKind::interrupt()).unwrap();
let mut sigterm = tokio::signal::unix::signal(SignalKind::terminate()).unwrap();
let mut sigquit = tokio::signal::unix::signal(SignalKind::quit()).unwrap();
let signal = tokio::select! {
_ = sigquit.recv() => {
info!("Got signal SIGQUIT. Terminating in immediate shutdown mode",);
std::process::exit(111);
}
_ = sigint.recv() => { "SIGINT" },
_ = sigterm.recv() => { "SIGTERM" },
};

info!("Got signal {signal}. Terminating gracefully in fast shutdown mode",);

// This cancels the `shutdown_pageserver` cancellation tree.
// Right now that tree doesn't reach very far, and `task_mgr` is used instead.
// The plan is to change that over time.
shutdown_pageserver.take();
let bg_remote_storage = remote_storage.clone();
let bg_deletion_queue = deletion_queue.clone();
pageserver::shutdown_pageserver(
&tenant_manager,
bg_remote_storage.map(|_| bg_deletion_queue),
0,
)
.await;
unreachable!()
})
use signal_hook::consts::*;
let signal_handler = BACKGROUND_RUNTIME.spawn_blocking(move || {
let mut signals =
signal_hook::iterator::Signals::new([SIGINT, SIGTERM, SIGQUIT]).unwrap();
return signals
.forever()
.next()
.expect("forever() never returns None unless explicitly closed");
});
let signal = BACKGROUND_RUNTIME
.block_on(signal_handler)
.expect("join error");
match signal {
SIGQUIT => {
info!("Got signal {signal}. Terminating in immediate shutdown mode",);
std::process::exit(111);
}
SIGINT | SIGTERM => {
info!("Got signal {signal}. Terminating gracefully in fast shutdown mode",);

// This cancels the `shutdown_pageserver` cancellation tree.
// Right now that tree doesn't reach very far, and `task_mgr` is used instead.
// The plan is to change that over time.
shutdown_pageserver.take();
let bg_remote_storage = remote_storage.clone();
let bg_deletion_queue = deletion_queue.clone();
BACKGROUND_RUNTIME.block_on(pageserver::shutdown_pageserver(
&tenant_manager,
bg_remote_storage.map(|_| bg_deletion_queue),
0,
));
unreachable!()
}
_ => unreachable!(),
}
}
}

Expand Down
3 changes: 2 additions & 1 deletion pageserver/src/consumption_metrics.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! Periodically collect consumption metrics for all active tenants
//! and push them to a HTTP endpoint.
use crate::context::{DownloadBehavior, RequestContext};
use crate::task_mgr::{self, TaskKind};
use crate::task_mgr::{self, TaskKind, BACKGROUND_RUNTIME};
use crate::tenant::tasks::BackgroundLoopKind;
use crate::tenant::{mgr, LogicalSizeCalculationCause, PageReconstructError, Tenant};
use camino::Utf8PathBuf;
Expand Down Expand Up @@ -61,6 +61,7 @@ pub async fn collect_metrics(
let worker_ctx =
ctx.detached_child(TaskKind::CalculateSyntheticSize, DownloadBehavior::Download);
task_mgr::spawn(
BACKGROUND_RUNTIME.handle(),
TaskKind::CalculateSyntheticSize,
None,
None,
Expand Down
4 changes: 3 additions & 1 deletion pageserver/src/control_plane_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,8 @@ impl ControlPlaneGenerationsApi for ControlPlaneClient {
register,
};

fail::fail_point!("control-plane-client-re-attach");

let response: ReAttachResponse = self.retry_http_forever(&re_attach_path, request).await?;
tracing::info!(
"Received re-attach response with {} tenants",
Expand Down Expand Up @@ -208,7 +210,7 @@ impl ControlPlaneGenerationsApi for ControlPlaneClient {
.collect(),
};

crate::tenant::pausable_failpoint!("control-plane-client-validate");
fail::fail_point!("control-plane-client-validate");

let response: ValidateResponse = self.retry_http_forever(&re_attach_path, request).await?;

Expand Down
3 changes: 2 additions & 1 deletion pageserver/src/disk_usage_eviction_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ use utils::{completion, id::TimelineId};
use crate::{
config::PageServerConf,
metrics::disk_usage_based_eviction::METRICS,
task_mgr::{self, TaskKind},
task_mgr::{self, TaskKind, BACKGROUND_RUNTIME},
tenant::{
self,
mgr::TenantManager,
Expand Down Expand Up @@ -202,6 +202,7 @@ pub fn launch_disk_usage_global_eviction_task(
info!("launching disk usage based eviction task");

task_mgr::spawn(
BACKGROUND_RUNTIME.handle(),
TaskKind::DiskUsageEviction,
None,
None,
Expand Down
1 change: 1 addition & 0 deletions pageserver/src/page_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ pub async fn libpq_listener_main(
// only deal with a particular timeline, but we don't know which one
// yet.
task_mgr::spawn(
&tokio::runtime::Handle::current(),
TaskKind::PageRequestHandler,
None,
None,
Expand Down
37 changes: 29 additions & 8 deletions pageserver/src/task_mgr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,22 +98,42 @@ use utils::id::TimelineId;
// other operations, if the upload tasks e.g. get blocked on locks. It shouldn't
// happen, but still.
//
pub static COMPUTE_REQUEST_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
tokio::runtime::Builder::new_multi_thread()
.thread_name("compute request worker")
.enable_all()
.build()
.expect("Failed to create compute request runtime")
});

pub static MGMT_REQUEST_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
tokio::runtime::Builder::new_multi_thread()
.thread_name("mgmt request worker")
.enable_all()
.build()
.expect("Failed to create mgmt request runtime")
});

pub static WALRECEIVER_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
tokio::runtime::Builder::new_multi_thread()
.thread_name("walreceiver worker")
.enable_all()
.build()
.expect("Failed to create walreceiver runtime")
});

/// The single tokio runtime used by all pageserver code.
/// In the past, we had multiple runtimes, and in the future we should weed out
/// remaining references to this global field and rely on ambient runtime instead,
/// i.e., use `tokio::spawn` instead of `THE_RUNTIME.spawn()`, etc.
pub static THE_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
pub static BACKGROUND_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
tokio::runtime::Builder::new_multi_thread()
.thread_name("background op worker")
// if you change the number of worker threads please change the constant below
.enable_all()
.build()
.expect("Failed to create background op runtime")
});

pub(crate) static THE_RUNTIME_WORKER_THREADS: Lazy<usize> = Lazy::new(|| {
pub(crate) static BACKGROUND_RUNTIME_WORKER_THREADS: Lazy<usize> = Lazy::new(|| {
// force init and thus panics
let _ = THE_RUNTIME.handle();
let _ = BACKGROUND_RUNTIME.handle();
// replicates tokio-1.28.1::loom::sys::num_cpus which is not available publicly
// tokio would had already panicked for parsing errors or NotUnicode
//
Expand Down Expand Up @@ -305,6 +325,7 @@ struct PageServerTask {
/// Note: if shutdown_process_on_error is set to true failure
/// of the task will lead to shutdown of entire process
pub fn spawn<F>(
runtime: &tokio::runtime::Handle,
kind: TaskKind,
tenant_shard_id: Option<TenantShardId>,
timeline_id: Option<TimelineId>,
Expand Down Expand Up @@ -333,7 +354,7 @@ where

let task_name = name.to_string();
let task_cloned = Arc::clone(&task);
let join_handle = THE_RUNTIME.spawn(task_wrapper(
let join_handle = runtime.spawn(task_wrapper(
task_name,
task_id,
task_cloned,
Expand Down
2 changes: 1 addition & 1 deletion pageserver/src/tenant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,6 @@ macro_rules! pausable_failpoint {
}
};
}
pub(crate) use pausable_failpoint;

pub mod blob_io;
pub mod block_io;
Expand Down Expand Up @@ -662,6 +661,7 @@ impl Tenant {
let tenant_clone = Arc::clone(&tenant);
let ctx = ctx.detached_child(TaskKind::Attach, DownloadBehavior::Warn);
task_mgr::spawn(
&tokio::runtime::Handle::current(),
TaskKind::Attach,
Some(tenant_shard_id),
None,
Expand Down
1 change: 1 addition & 0 deletions pageserver/src/tenant/delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -482,6 +482,7 @@ impl DeleteTenantFlow {
let tenant_shard_id = tenant.tenant_shard_id;

task_mgr::spawn(
task_mgr::BACKGROUND_RUNTIME.handle(),
TaskKind::TimelineDeletionWorker,
Some(tenant_shard_id),
None,
Expand Down
4 changes: 4 additions & 0 deletions pageserver/src/tenant/mgr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1850,6 +1850,7 @@ impl TenantManager {
let task_tenant_id = None;

task_mgr::spawn(
task_mgr::BACKGROUND_RUNTIME.handle(),
TaskKind::MgmtRequest,
task_tenant_id,
None,
Expand Down Expand Up @@ -2815,12 +2816,15 @@ pub(crate) fn immediate_gc(

// TODO: spawning is redundant now, need to hold the gate
task_mgr::spawn(
&tokio::runtime::Handle::current(),
TaskKind::GarbageCollector,
Some(tenant_shard_id),
Some(timeline_id),
&format!("timeline_gc_handler garbage collection run for tenant {tenant_shard_id} timeline {timeline_id}"),
false,
async move {
fail::fail_point!("immediate_gc_task_pre");

#[allow(unused_mut)]
let mut result = tenant
.gc_iteration(Some(timeline_id), gc_horizon, pitr, &cancel, &ctx)
Expand Down
11 changes: 11 additions & 0 deletions pageserver/src/tenant/remote_timeline_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,7 @@ use crate::{
config::PageServerConf,
task_mgr,
task_mgr::TaskKind,
task_mgr::BACKGROUND_RUNTIME,
tenant::metadata::TimelineMetadata,
tenant::upload_queue::{
UploadOp, UploadQueue, UploadQueueInitialized, UploadQueueStopped, UploadTask,
Expand Down Expand Up @@ -297,6 +298,8 @@ pub enum PersistIndexPartWithDeletedFlagError {
pub struct RemoteTimelineClient {
conf: &'static PageServerConf,

runtime: tokio::runtime::Handle,

tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
generation: Generation,
Expand Down Expand Up @@ -329,6 +332,12 @@ impl RemoteTimelineClient {
) -> RemoteTimelineClient {
RemoteTimelineClient {
conf,
runtime: if cfg!(test) {
// remote_timeline_client.rs tests rely on current-thread runtime
tokio::runtime::Handle::current()
} else {
BACKGROUND_RUNTIME.handle().clone()
},
tenant_shard_id,
timeline_id,
generation,
Expand Down Expand Up @@ -1264,6 +1273,7 @@ impl RemoteTimelineClient {
let tenant_shard_id = self.tenant_shard_id;
let timeline_id = self.timeline_id;
task_mgr::spawn(
&self.runtime,
TaskKind::RemoteUploadTask,
Some(self.tenant_shard_id),
Some(self.timeline_id),
Expand Down Expand Up @@ -1857,6 +1867,7 @@ mod tests {
fn build_client(&self, generation: Generation) -> Arc<RemoteTimelineClient> {
Arc::new(RemoteTimelineClient {
conf: self.harness.conf,
runtime: tokio::runtime::Handle::current(),
tenant_shard_id: self.harness.tenant_shard_id,
timeline_id: TIMELINE_ID,
generation,
Expand Down
Loading

1 comment on commit ad072de

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

2798 tests run: 2646 passed, 3 failed, 149 skipped (full report)


Failures on Postgres 14

  • test_compare_child_and_root_read_perf[github-actions-selfhosted]: release
  • test_bulk_insert[neon-github-actions-selfhosted]: release
  • test_random_writes[neon-github-actions-selfhosted]: release
# Run all failed tests locally:
scripts/pytest -vv -n $(nproc) -k "test_compare_child_and_root_read_perf[release-pg14-github-actions-selfhosted] or test_bulk_insert[neon-release-pg14-github-actions-selfhosted] or test_random_writes[neon-release-pg14-github-actions-selfhosted]"

Code coverage* (full report)

  • functions: 28.2% (6295 of 22343 functions)
  • lines: 47.0% (44208 of 94138 lines)

* collected from Rust tests only


The comment gets automatically updated with the latest test results
ad072de at 2024-03-26T15:26:15.312Z :recycle:

Please sign in to comment.