Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

archival: rely on the batch formation logic in remote #13916

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
113 changes: 35 additions & 78 deletions src/v/archival/ntp_archiver_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2164,8 +2164,8 @@ ss::future<> ntp_archiver::garbage_collect_archive() {
throw std::system_error(backlog.error());
}

std::deque<std::filesystem::path> objects_to_remove;
std::deque<std::filesystem::path> manifests_to_remove;
std::deque<cloud_storage_clients::object_key> objects_to_remove;
std::deque<cloud_storage_clients::object_key> manifests_to_remove;

const auto clean_offset = manifest().get_archive_clean_offset();
const auto start_offset = manifest().get_archive_start_offset();
Expand Down Expand Up @@ -2216,8 +2216,13 @@ ss::future<> ntp_archiver::garbage_collect_archive() {
continue;
}
if (meta.committed_offset < start_offset) {
auto path = manifest.generate_segment_path(meta);
objects_to_remove.push_back(path());
const auto path = manifest.generate_segment_path(meta);
vlog(
_rtclog.info,
"Enqueuing spillover segment delete from cloud "
"storage: {}",
path);
objects_to_remove.emplace_back(path);
new_clean_offset = model::next_offset(
meta.committed_offset);
bytes_to_remove += meta.size_bytes;
Expand All @@ -2227,10 +2232,10 @@ ss::future<> ntp_archiver::garbage_collect_archive() {
meta.sname_format
== cloud_storage::segment_name_format::v3
&& meta.metadata_size_hint != 0) {
objects_to_remove.push_back(
cloud_storage::generate_remote_tx_path(path)());
objects_to_remove.emplace_back(
cloud_storage::generate_remote_tx_path(path));
}
objects_to_remove.push_back(
objects_to_remove.emplace_back(
cloud_storage::generate_index_path(path));
} else {
// This indicates that we need to remove only some of the
Expand All @@ -2246,8 +2251,14 @@ ss::future<> ntp_archiver::garbage_collect_archive() {
if (stop) {
break;
}
auto path = cursor->manifest()->get_manifest_path();
manifests_to_remove.push_back(path());
const auto path = cursor->manifest()->get_manifest_path();
vlog(
_rtclog.info,
"Enqueuing spillover manifest delete from cloud "
"storage: {}",
path);
manifests_to_remove.emplace_back(path);

auto res = co_await cursor->next();
if (res.has_failure()) {
vlog(
Expand Down Expand Up @@ -2278,43 +2289,16 @@ ss::future<> ntp_archiver::garbage_collect_archive() {
}
}

size_t successful_deletes{0};
size_t successful_segment_deletes{0};
size_t segments_in_batch{0};
const size_t batch_size = 1000;
std::vector<cloud_storage_clients::object_key> rem_batch;
for (const auto& path : objects_to_remove) {
std::string_view path_view{path.c_str()};
if (!path_view.ends_with("index") && !path_view.ends_with("tx")) {
vlog(
_rtclog.info,
"Deleting spillover segment from cloud storage: {}",
path);
++segments_in_batch;
}
rem_batch.emplace_back(path);
if (rem_batch.size() >= batch_size) {
auto sz = rem_batch.size();
std::vector<cloud_storage_clients::object_key> tmp;
std::swap(tmp, rem_batch);
if (co_await batch_delete(std::move(tmp))) {
successful_deletes += sz;
successful_segment_deletes += segments_in_batch;
}

segments_in_batch = 0;
}
}
if (co_await batch_delete(rem_batch)) {
successful_deletes += rem_batch.size();
successful_segment_deletes += segments_in_batch;
}
rem_batch.clear();

retry_chain_node fib(
_conf->garbage_collect_timeout,
_conf->cloud_storage_initial_backoff,
&_rtcnode);
const auto delete_result = co_await _remote.delete_objects(
get_bucket_name(), objects_to_remove, fib);
const auto backlog_size_exceeded = segments_to_remove_count
> _max_segments_pending_deletion();
const auto all_deletes_succeeded = successful_deletes
== objects_to_remove.size();
const auto all_deletes_succeeded = delete_result
== cloud_storage::upload_result::success;

if (!all_deletes_succeeded && backlog_size_exceeded) {
vlog(
Expand Down Expand Up @@ -2346,44 +2330,17 @@ ss::future<> ntp_archiver::garbage_collect_archive() {
"Failed to clean up metadata after garbage collection: {}",
error);
} else {
// Remove manifests only if metadata no longer references them
for (const auto& path : manifests_to_remove) {
vlog(
_rtclog.info,
"Deleting spillover manifest from cloud storage: {}",
path);
rem_batch.emplace_back(path);
if (rem_batch.size() >= batch_size) {
std::vector<cloud_storage_clients::object_key> tmp;
std::swap(tmp, rem_batch);
co_await batch_delete(std::move(tmp));
}
}
co_await batch_delete(rem_batch);
std::ignore = co_await _remote.delete_objects(
get_bucket_name(), manifests_to_remove, fib);
}
}
_probe->segments_deleted(static_cast<int64_t>(successful_segment_deletes));

_probe->segments_deleted(static_cast<int64_t>(
all_deletes_succeeded ? segments_to_remove_count : 0));
vlog(
_rtclog.info,
_rtclog.debug,
"Deleted {} spillover segments from the cloud",
successful_deletes);
}

ss::future<bool> ntp_archiver::batch_delete(
std::vector<cloud_storage_clients::object_key> keys) {
// Do batch delete, the batch size should be below the limit
auto timeout = config::shard_local_cfg()
.cloud_storage_segment_upload_timeout_ms.value();
auto backoff
= config::shard_local_cfg().cloud_storage_initial_backoff_ms.value();
retry_chain_node fib(timeout, backoff, &_rtcnode);
auto res = co_await _remote.delete_objects(
get_bucket_name(), std::move(keys), fib);
if (res != cloud_storage::upload_result::success) {
vlog(_rtclog.error, "Failed to delete objects", res);
co_return false;
}
co_return true;
all_deletes_succeeded ? segments_to_remove_count : 0);
}

ss::future<> ntp_archiver::apply_spillover() {
Expand Down
4 changes: 2 additions & 2 deletions tests/rptest/tests/tiered_storage_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -1292,7 +1292,7 @@ def make_validators(self):
return [
LogBasedValidator(
"TS_Spillover_DeleteByGC_log",
"ntp_archiver_service.*Deleting spillover segment from cloud storage",
"ntp_archiver_service.*Enqueuing spillover segment delete from cloud storage",
execution_stage=TestRunStage.Intermediate),
]

Expand Down Expand Up @@ -1344,7 +1344,7 @@ def make_validators(self):
return [
LogBasedValidator(
"TS_Spillover_ManifestDeleted_log",
"ntp_archiver_service.*Deleting spillover manifest from cloud storage",
"ntp_archiver_service.*Enqueuing spillover manifest delete from cloud",
confidence_threshold=LOW_THRESHOLD,
execution_stage=TestRunStage.Intermediate),
]
Expand Down