resume timeline deletions on attach #5030

Merged 1 commit on Aug 18, 2023
31 changes: 28 additions & 3 deletions pageserver/src/tenant.rs
@@ -619,6 +619,9 @@ impl Tenant {
.instrument(info_span!("download_index_part", %timeline_id)),
);
}

let mut timelines_to_resume_deletions = vec![];

// Wait for all the download tasks to complete & collect results.
let mut remote_index_and_client = HashMap::new();
let mut timeline_ancestors = HashMap::new();
@@ -635,9 +638,12 @@
);
remote_index_and_client.insert(timeline_id, (index_part, client));
}
- MaybeDeletedIndexPart::Deleted(_) => {
- info!("timeline {} is deleted, skipping", timeline_id);
- continue;
+ MaybeDeletedIndexPart::Deleted(index_part) => {
+ info!(
+ "timeline {} is deleted, picking to resume deletion",
+ timeline_id
+ );
+ timelines_to_resume_deletions.push((timeline_id, index_part, client));
}
}
}
@@ -662,6 +668,25 @@
})?;
}

// Walk through the deleted timelines and resume their deletion
for (timeline_id, index_part, remote_timeline_client) in timelines_to_resume_deletions {
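// The upload queue is initialized directly in the Stopped state: the
// timeline is already slated for deletion, so presumably no new uploads
// should be accepted while the remaining remote files are removed.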
remote_timeline_client
.init_upload_queue_stopped_to_continue_deletion(&index_part)
.context("init queue stopped")
.map_err(LoadLocalTimelineError::ResumeDeletion)?;

DeleteTimelineFlow::resume_deletion(
Arc::clone(self),
timeline_id,
&index_part.parse_metadata().context("parse_metadata")?,
Some(remote_timeline_client),
None,
)
.await
.context("resume_deletion")
.map_err(LoadLocalTimelineError::ResumeDeletion)?;
}

std::fs::remove_file(&marker_file)
.with_context(|| format!("unlink attach marker file {}", marker_file.display()))?;
crashsafe::fsync(marker_file.parent().expect("marker file has parent dir"))
125 changes: 122 additions & 3 deletions test_runner/regress/test_timeline_delete.py
@@ -17,6 +17,7 @@
)
from fixtures.pageserver.http import PageserverApiException
from fixtures.pageserver.utils import (
MANY_SMALL_LAYERS_TENANT_CONFIG,
assert_prefix_empty,
assert_prefix_not_empty,
poll_for_remote_storage_iterations,
@@ -34,7 +35,7 @@
available_s3_storages,
)
from fixtures.types import Lsn, TenantId, TimelineId
- from fixtures.utils import query_scalar, wait_until
+ from fixtures.utils import query_scalar, run_pg_bench_small, wait_until


def test_timeline_delete(neon_simple_env: NeonEnv):
@@ -208,7 +209,7 @@ def test_delete_timeline_exercise_crash_safety_failpoints(
timeline_id = env.neon_cli.create_timeline("delete")
with env.endpoints.create_start("delete") as endpoint:
# generate enough layers
pg_bin.run(["pgbench", "-i", "-I dtGvp", "-s1", endpoint.connstr()])
run_pg_bench_small(pg_bin, endpoint.connstr())
if remote_storage_kind is RemoteStorageKind.NOOP:
wait_for_last_flush_lsn(env, endpoint, env.initial_tenant, timeline_id)
else:
@@ -812,7 +813,7 @@ def test_delete_orphaned_objects(
timeline_id = env.neon_cli.create_timeline("delete")
with env.endpoints.create_start("delete") as endpoint:
# generate enough layers
pg_bin.run(["pgbench", "-i", "-I dtGvp", "-s1", endpoint.connstr()])
run_pg_bench_small(pg_bin, endpoint.connstr())
last_flush_lsn_upload(env, endpoint, env.initial_tenant, timeline_id)

# write orphaned file that is missing from the index
@@ -848,3 +849,121 @@
)

assert env.remote_storage.index_path(env.initial_tenant, timeline_id).exists()


@pytest.mark.parametrize("remote_storage_kind", available_remote_storages())
def test_timeline_delete_resumed_on_attach(
neon_env_builder: NeonEnvBuilder,
remote_storage_kind: RemoteStorageKind,
pg_bin: PgBin,
):
neon_env_builder.enable_remote_storage(
remote_storage_kind=remote_storage_kind,
test_name="test_deleted_tenant_ignored_on_attach",
)

env = neon_env_builder.init_start(initial_tenant_conf=MANY_SMALL_LAYERS_TENANT_CONFIG)

tenant_id = env.initial_tenant

ps_http = env.pageserver.http_client()

timeline_id = env.neon_cli.create_timeline("delete")
with env.endpoints.create_start("delete") as endpoint:
# generate enough layers
run_pg_bench_small(pg_bin, endpoint.connstr())
last_flush_lsn_upload(env, endpoint, tenant_id, timeline_id)

if remote_storage_kind in available_s3_storages():
assert_prefix_not_empty(
neon_env_builder,
prefix="/".join(
(
"tenants",
str(tenant_id),
"timelines",
str(timeline_id),
)
),
)

# failpoint before we remove index_part from s3
failpoint = "timeline-delete-during-rm"
ps_http.configure_failpoints((failpoint, "return"))

env.pageserver.allowed_errors.extend(
(
# allow errors caused by failpoints
f".*failpoint: {failpoint}",
# Appears when the flush loop was stopped during a deletion attempt and the pageserver is then shut down
".*freeze_and_flush_on_shutdown.*failed to freeze and flush: cannot flush frozen layers when flush_loop is not running, state is Exited",
# error from http response is also logged
".*InternalServerError\\(Tenant is marked as deleted on remote storage.*",
# Polling after attach may fail with this
f".*InternalServerError\\(Tenant {tenant_id} is not active.*",
'.*shutdown_pageserver{exit_code=0}: stopping left-over name="remote upload".*',
)
)

iterations = poll_for_remote_storage_iterations(remote_storage_kind)
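# the iteration budget presumably depends on the backend: polling against
# real S3 needs more attempts than local filesystem storage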

ps_http.timeline_delete(tenant_id, timeline_id)

timeline_info = wait_until_timeline_state(
pageserver_http=ps_http,
tenant_id=tenant_id,
timeline_id=timeline_id,
expected_state="Broken",
iterations=iterations,
)

reason = timeline_info["state"]["Broken"]["reason"]
log.info(f"timeline broken: {reason}")

# failpoint may not be the only error in the stack
assert reason.endswith(f"failpoint: {failpoint}"), reason

if remote_storage_kind in available_s3_storages():
assert_prefix_not_empty(
neon_env_builder,
prefix="/".join(
(
"tenants",
str(tenant_id),
"timelines",
str(timeline_id),
)
),
)

# now we stop pageserver and remove local tenant state
env.endpoints.stop_all()
env.pageserver.stop()

dir_to_clear = Path(env.repo_dir) / "tenants"
shutil.rmtree(dir_to_clear)
os.mkdir(dir_to_clear)
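# with local state gone, the attach below must rebuild the tenant entirely
# from remote storage, including the deleted timeline's index_part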

env.pageserver.start()

# now we call attach
ps_http.tenant_attach(tenant_id=tenant_id)

# delete should be resumed
wait_timeline_detail_404(ps_http, tenant_id, timeline_id, iterations=iterations)

timeline_path = env.timeline_dir(tenant_id=tenant_id, timeline_id=timeline_id)
assert not timeline_path.exists()

if remote_storage_kind in available_s3_storages():
assert_prefix_empty(
neon_env_builder,
prefix="/".join(
(
"tenants",
str(tenant_id),
"timelines",
str(timeline_id),
)
),
)
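
For context, the new test relies on wait_timeline_detail_404 (presumably from fixtures.pageserver.utils, like the other helpers imported above) to detect that the resumed deletion has finished. A minimal sketch of what such a helper plausibly does, assuming the timeline detail endpoint returns 404 once the timeline is gone; ps_http.timeline_detail and PageserverApiException are taken from the fixtures used in this file, while the status_code attribute and the body below are assumptions for illustration, not the real implementation:

import time

from fixtures.pageserver.http import PageserverApiException


def wait_timeline_detail_404_sketch(ps_http, tenant_id, timeline_id, iterations):
    # Poll the timeline detail endpoint until it returns 404, i.e. until
    # the resumed deletion has removed the timeline from the pageserver.
    last_error = None
    for _ in range(iterations):
        try:
            ps_http.timeline_detail(tenant_id, timeline_id)
        except PageserverApiException as e:
            if e.status_code == 404:
                return
            last_error = e
        time.sleep(0.25)
    raise RuntimeError(f"timeline {timeline_id} was not deleted in time: {last_error}")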