test: do graceful shutdown by default #8655

Merged · 13 commits · Aug 12, 2024
10 changes: 10 additions & 0 deletions pageserver/src/tenant.rs
@@ -41,6 +41,7 @@ use tokio::sync::watch;
use tokio::task::JoinSet;
use tokio_util::sync::CancellationToken;
use tracing::*;
use upload_queue::NotInitialized;
use utils::backoff;
use utils::circuit_breaker::CircuitBreaker;
use utils::completion;
@@ -601,6 +602,15 @@ impl From<PageReconstructError> for GcError {
}
}

impl From<NotInitialized> for GcError {
fn from(value: NotInitialized) -> Self {
match value {
NotInitialized::Uninitialized => GcError::Remote(value.into()),
NotInitialized::Stopped | NotInitialized::ShuttingDown => GcError::TimelineCancelled,
}
}
}

impl From<timeline::layer_manager::Shutdown> for GcError {
fn from(_: timeline::layer_manager::Shutdown) -> Self {
GcError::TimelineCancelled
5 changes: 4 additions & 1 deletion pageserver/src/tenant/remote_timeline_client.rs
@@ -985,7 +985,10 @@ impl RemoteTimelineClient {
///
/// The files will be leaked in remote storage unless [`Self::schedule_deletion_of_unlinked`]
/// is invoked on them.
pub(crate) fn schedule_gc_update(self: &Arc<Self>, gc_layers: &[Layer]) -> anyhow::Result<()> {
pub(crate) fn schedule_gc_update(
self: &Arc<Self>,
gc_layers: &[Layer],
) -> Result<(), NotInitialized> {
let mut guard = self.upload_queue.lock().unwrap();
let upload_queue = guard.initialized_mut()?;

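The two hunks above are halves of one change: schedule_gc_update now returns the typed NotInitialized error instead of anyhow::Result, and the new From impl maps that error into GcError at the GC call site. Below is a minimal, self-contained sketch of the pattern with stand-in types; the real NotInitialized lives in the upload_queue module and implements std::error::Error.

    // Stand-in types, for illustration only.
    #[derive(Debug)]
    enum NotInitialized { Uninitialized, Stopped, ShuttingDown }

    #[derive(Debug)]
    enum GcError { Remote(String), TimelineCancelled }

    impl From<NotInitialized> for GcError {
        fn from(value: NotInitialized) -> Self {
            match value {
                // a queue that never initialized points at a real problem
                NotInitialized::Uninitialized => GcError::Remote(format!("{value:?}")),
                // a stopping queue just means the timeline is going away
                NotInitialized::Stopped | NotInitialized::ShuttingDown => {
                    GcError::TimelineCancelled
                }
            }
        }
    }

    fn schedule_gc_update(queue: Result<(), NotInitialized>) -> Result<(), GcError> {
        queue?; // `?` applies the From impl above
        Ok(())
    }

    fn main() {
        let r = schedule_gc_update(Err(NotInitialized::ShuttingDown));
        assert!(matches!(r, Err(GcError::TimelineCancelled)));
    }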
3 changes: 0 additions & 3 deletions pageserver/src/tenant/storage_layer/image_layer.rs
@@ -369,9 +369,6 @@ impl ImageLayerInner {
self.lsn
}

/// Returns nested result following Result<Result<_, OpErr>, Critical>:
/// - inner has the success or transient failure
/// - outer has the permanent failure
pub(super) async fn load(
path: &Utf8Path,
lsn: Lsn,
4 changes: 2 additions & 2 deletions pageserver/src/tenant/storage_layer/layer.rs
@@ -1848,8 +1848,8 @@ impl ResidentLayer {
/// Read all the keys in this layer which match the ShardIdentity, and write them all to
/// the provided writer. Return the number of keys written.
#[tracing::instrument(level = tracing::Level::DEBUG, skip_all, fields(layer=%self))]
pub(crate) async fn filter<'a>(
&'a self,
pub(crate) async fn filter(
&self,
shard_identity: &ShardIdentity,
writer: &mut ImageLayerWriter,
ctx: &RequestContext,
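The dropped 'a lifetime was noise: a named lifetime that appears only on the receiver adds nothing, and elision produces the identical signature. A tiny sketch of the equivalence (illustrative types only):

    struct Layer;

    impl Layer {
        // The two signatures below are identical after lifetime elision.
        fn filter_explicit<'a>(&'a self, keys: usize) -> usize { keys }
        fn filter_elided(&self, keys: usize) -> usize { keys }
    }

    fn main() {
        let layer = Layer;
        assert_eq!(layer.filter_explicit(3), layer.filter_elided(3));
    }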
13 changes: 7 additions & 6 deletions pageserver/src/tenant/tasks.rs
@@ -211,6 +211,11 @@ async fn compaction_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
} else {
// Run compaction
match tenant.compaction_iteration(&cancel, &ctx).await {
Ok(has_pending_task) => {
error_run_count = 0;
// schedule the next compaction immediately in case there is a pending compaction task
if has_pending_task { Duration::ZERO } else { period }
}
Err(e) => {
let wait_duration = backoff::exponential_backoff_duration_seconds(
error_run_count + 1,
@@ -227,11 +232,6 @@ async fn compaction_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
);
wait_duration
}
Ok(has_pending_task) => {
error_run_count = 0;
// schedule the next compaction immediately in case there is a pending compaction task
if has_pending_task { Duration::from_secs(0) } else { period }
}
}
};

@@ -265,7 +265,8 @@ async fn compaction_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
count_throttled,
sum_throttled_usecs,
allowed_rps=%format_args!("{allowed_rps:.0}"),
"shard was throttled in the last n_seconds")
"shard was throttled in the last n_seconds"
);
});

// Sleep
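Moving the Ok arm above the Err arm also makes the scheduling rule read top-down: on success, reset the error counter and rerun immediately only while compaction reports pending work; on failure, back off exponentially. A self-contained sketch of that rule, with the backoff helper modeled locally (the real one is utils::backoff::exponential_backoff_duration_seconds):

    use std::time::Duration;

    // Modeled stand-in for the exponential backoff helper (assumption).
    fn backoff_secs(n: u32, base: f64, max: f64) -> f64 {
        (base * 2f64.powi(n as i32)).min(max)
    }

    fn next_sleep(result: Result<bool, ()>, error_run_count: &mut u32, period: Duration) -> Duration {
        match result {
            Ok(has_pending_task) => {
                *error_run_count = 0;
                // rerun immediately while there is pending compaction work
                if has_pending_task { Duration::ZERO } else { period }
            }
            Err(()) => {
                *error_run_count += 1;
                Duration::from_secs_f64(backoff_secs(*error_run_count, 1.0, 60.0))
            }
        }
    }

    fn main() {
        let period = Duration::from_secs(20);
        let mut errors = 0;
        assert_eq!(next_sleep(Ok(true), &mut errors, period), Duration::ZERO);
        assert_eq!(next_sleep(Ok(false), &mut errors, period), period);
        assert!(next_sleep(Err(()), &mut errors, period) >= Duration::from_secs(2));
    }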
46 changes: 20 additions & 26 deletions pageserver/src/tenant/timeline.rs
@@ -4412,31 +4412,33 @@ impl From<CollectKeySpaceError> for CompactionError {
impl From<super::upload_queue::NotInitialized> for CompactionError {
fn from(value: super::upload_queue::NotInitialized) -> Self {
match value {
super::upload_queue::NotInitialized::Uninitialized
| super::upload_queue::NotInitialized::Stopped => {
super::upload_queue::NotInitialized::Uninitialized => {
CompactionError::Other(anyhow::anyhow!(value))
}
super::upload_queue::NotInitialized::ShuttingDown => CompactionError::ShuttingDown,
super::upload_queue::NotInitialized::ShuttingDown
| super::upload_queue::NotInitialized::Stopped => CompactionError::ShuttingDown,
}
}
}

impl CompactionError {
/// We cannot do compaction because we could not download a layer that is input to the compaction.
pub(crate) fn input_layer_download_failed(
e: super::storage_layer::layer::DownloadError,
) -> Self {
impl From<super::storage_layer::layer::DownloadError> for CompactionError {
fn from(e: super::storage_layer::layer::DownloadError) -> Self {
match e {
super::storage_layer::layer::DownloadError::TimelineShutdown |
/* TODO DownloadCancelled correct here? */
super::storage_layer::layer::DownloadError::DownloadCancelled => CompactionError::ShuttingDown,
super::storage_layer::layer::DownloadError::ContextAndConfigReallyDeniesDownloads |
super::storage_layer::layer::DownloadError::DownloadRequired |
super::storage_layer::layer::DownloadError::NotFile(_) |
super::storage_layer::layer::DownloadError::DownloadFailed |
super::storage_layer::layer::DownloadError::PreStatFailed(_)=>CompactionError::Other(anyhow::anyhow!(e)),
super::storage_layer::layer::DownloadError::TimelineShutdown
| super::storage_layer::layer::DownloadError::DownloadCancelled => {
CompactionError::ShuttingDown
}
super::storage_layer::layer::DownloadError::ContextAndConfigReallyDeniesDownloads
| super::storage_layer::layer::DownloadError::DownloadRequired
| super::storage_layer::layer::DownloadError::NotFile(_)
| super::storage_layer::layer::DownloadError::DownloadFailed
| super::storage_layer::layer::DownloadError::PreStatFailed(_) => {
CompactionError::Other(anyhow::anyhow!(e))
}
#[cfg(test)]
super::storage_layer::layer::DownloadError::Failpoint(_) => CompactionError::Other(anyhow::anyhow!(e)),
super::storage_layer::layer::DownloadError::Failpoint(_) => {
CompactionError::Other(anyhow::anyhow!(e))
}
}
}
}
@@ -4990,15 +4992,7 @@ impl Timeline {

result.layers_removed = gc_layers.len() as u64;

self.remote_client
.schedule_gc_update(&gc_layers)
.map_err(|e| {
if self.cancel.is_cancelled() {
GcError::TimelineCancelled
} else {
GcError::Remote(e)
}
})?;
self.remote_client.schedule_gc_update(&gc_layers)?;

guard.open_mut()?.finish_gc_timeline(&gc_layers);

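Replacing the input_layer_download_failed helper with a From impl is what lets every download call site in the compaction hunks below collapse to a bare `?`. A self-contained sketch with stand-in types (the real DownloadError has more variants, each matched explicitly):

    #[derive(Debug)]
    enum DownloadError { TimelineShutdown, DownloadCancelled, DownloadFailed }

    #[derive(Debug)]
    enum CompactionError { ShuttingDown, Other(String) }

    impl From<DownloadError> for CompactionError {
        fn from(e: DownloadError) -> Self {
            match e {
                // cancellation-flavored failures map to a clean shutdown
                DownloadError::TimelineShutdown | DownloadError::DownloadCancelled => {
                    CompactionError::ShuttingDown
                }
                DownloadError::DownloadFailed => CompactionError::Other(format!("{e:?}")),
            }
        }
    }

    fn download_and_keep_resident() -> Result<(), DownloadError> {
        Err(DownloadError::DownloadCancelled) // simulate a cancelled download
    }

    fn compact() -> Result<(), CompactionError> {
        // was: .map_err(CompactionError::input_layer_download_failed)?
        download_and_keep_resident()?;
        Ok(())
    }

    fn main() {
        assert!(matches!(compact(), Err(CompactionError::ShuttingDown)));
    }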
22 changes: 7 additions & 15 deletions pageserver/src/tenant/timeline/compaction.rs
@@ -489,10 +489,7 @@ impl Timeline {
// - We do not run concurrently with other kinds of compaction, so the only layer map writes we race with are:
// - GC, which at worst witnesses us "undelete" a layer that they just deleted.
// - ingestion, which only inserts layers, therefore cannot collide with us.
let resident = layer
.download_and_keep_resident()
.await
.map_err(CompactionError::input_layer_download_failed)?;
let resident = layer.download_and_keep_resident().await?;

let keys_written = resident
.filter(&self.shard_identity, &mut image_layer_writer, ctx)
@@ -693,23 +690,14 @@ impl Timeline {

let mut fully_compacted = true;

deltas_to_compact.push(
first_level0_delta
.download_and_keep_resident()
.await
.map_err(CompactionError::input_layer_download_failed)?,
);
deltas_to_compact.push(first_level0_delta.download_and_keep_resident().await?);
for l in level0_deltas_iter {
let lsn_range = &l.layer_desc().lsn_range;

if lsn_range.start != prev_lsn_end {
break;
}
deltas_to_compact.push(
l.download_and_keep_resident()
.await
.map_err(CompactionError::input_layer_download_failed)?,
);
deltas_to_compact.push(l.download_and_keep_resident().await?);
deltas_to_compact_bytes += l.metadata().file_size;
prev_lsn_end = lsn_range.end;

@@ -1137,6 +1125,10 @@

if !self.shard_identity.is_key_disposable(&key) {
if writer.is_none() {
if self.cancel.is_cancelled() {
// to be somewhat responsive to cancellation, check for each new layer
return Err(CompactionError::ShuttingDown);
}
// Create writer if not initialized yet
writer = Some(
DeltaLayerWriter::new(
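Checking the token once per new layer, rather than per key, keeps the hot loop cheap while still bounding how long compaction keeps running after cancellation. A sketch of the pattern (needs the tokio-util crate; the layer-writing body is elided):

    use tokio_util::sync::CancellationToken;

    #[derive(Debug)]
    enum CompactionError { ShuttingDown }

    fn write_layers(cancel: &CancellationToken, layers: usize) -> Result<usize, CompactionError> {
        let mut written = 0;
        for _ in 0..layers {
            // check at each coarse boundary, not on every key
            if cancel.is_cancelled() {
                return Err(CompactionError::ShuttingDown);
            }
            // ... create the writer and stream keys into it ...
            written += 1;
        }
        Ok(written)
    }

    fn main() {
        let cancel = CancellationToken::new();
        assert_eq!(write_layers(&cancel, 3).unwrap(), 3);
        cancel.cancel();
        assert!(matches!(write_layers(&cancel, 3), Err(CompactionError::ShuttingDown)));
    }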
@@ -335,6 +335,9 @@ pub(super) async fn handle_walreceiver_connection(
filtered_records += 1;
}

// FIXME: this cannot be made a pausable_failpoint without fixing the
// failpoint library; in tests, the extra debug output it would produce
// would make the tests time out.
fail_point!("walreceiver-after-ingest");

last_rec_lsn = lsn;
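For context, the failpoint here is the fail crate's fail_point! macro, which is inert unless a test configures an action under that name; the wiring below is a sketch under that assumption (the crate's failpoints feature must be enabled):

    use fail::fail_point;

    fn after_ingest() {
        fail_point!("walreceiver-after-ingest"); // no-op unless configured
    }

    fn main() {
        // a test can inject a delay at the named point...
        fail::cfg("walreceiver-after-ingest", "sleep(100)").unwrap();
        after_ingest(); // sleeps ~100 ms here
        // ...and should clear it afterwards so shutdown is not slowed down
        fail::remove("walreceiver-after-ingest");
        after_ingest(); // back to a no-op
    }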
2 changes: 1 addition & 1 deletion test_runner/fixtures/neon_fixtures.py
@@ -962,7 +962,7 @@ def __exit__(
if self.env:
log.info("Cleaning up all storage and compute nodes")
self.env.stop(
immediate=True,
immediate=False,
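# stop gracefully by default so every test exercises the shutdown path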
# if the test threw an exception, don't check for errors
# as a failing assertion would cause the cleanup below to fail
ps_assert_metric_no_errors=(exc_type is None),
6 changes: 5 additions & 1 deletion test_runner/regress/test_ancestor_branch.py
@@ -20,7 +20,9 @@ def test_ancestor_branch(neon_env_builder: NeonEnvBuilder):
}
)

pageserver_http.configure_failpoints(("flush-frozen-pausable", "sleep(10000)"))
failpoint = "flush-frozen-pausable"

pageserver_http.configure_failpoints((failpoint, "sleep(10000)"))

endpoint_branch0 = env.endpoints.create_start("main", tenant_id=tenant)
branch0_cur = endpoint_branch0.connect().cursor()
@@ -96,3 +98,5 @@ def test_ancestor_branch(neon_env_builder: NeonEnvBuilder):
assert query_scalar(branch1_cur, "SELECT count(*) FROM foo") == 200000

assert query_scalar(branch2_cur, "SELECT count(*) FROM foo") == 300000

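# turn the failpoint off so the now-default graceful shutdown is not stuck behind the sleep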
pageserver_http.configure_failpoints((failpoint, "off"))
7 changes: 7 additions & 0 deletions test_runner/regress/test_timeline_size.py
@@ -1137,3 +1137,10 @@ def lazy_tenant_is_active():
delete_lazy_activating(lazy_tenant, env.pageserver, expect_attaching=True)
else:
raise RuntimeError(activation_method)

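# clear the failpoints before teardown; left on, they would stall the graceful shutdown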
client.configure_failpoints(
[
("timeline-calculate-logical-size-pause", "off"),
("walreceiver-after-ingest", "off"),
]
)