Skip to content

Commit

Permalink
Merge pull request #13916 from nvartolomei/nv/delete-objects-cleanup
Browse files (browse the repository at this point in the history)
archival: rely on the batch formation logic in remote
  • Loading branch information
nvartolomei authored Oct 9, 2023
2 parents f8233aa + 90df5be commit 4c8aefb
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 80 deletions.
113 changes: 35 additions & 78 deletions src/v/archival/ntp_archiver_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2185,8 +2185,8 @@ ss::future<> ntp_archiver::garbage_collect_archive() {
throw std::system_error(backlog.error());
}

std::deque<std::filesystem::path> objects_to_remove;
std::deque<std::filesystem::path> manifests_to_remove;
std::deque<cloud_storage_clients::object_key> objects_to_remove;
std::deque<cloud_storage_clients::object_key> manifests_to_remove;

const auto clean_offset = manifest().get_archive_clean_offset();
const auto start_offset = manifest().get_archive_start_offset();
Expand Down Expand Up @@ -2237,8 +2237,13 @@ ss::future<> ntp_archiver::garbage_collect_archive() {
continue;
}
if (meta.committed_offset < start_offset) {
auto path = manifest.generate_segment_path(meta);
objects_to_remove.push_back(path());
const auto path = manifest.generate_segment_path(meta);
vlog(
_rtclog.info,
"Enqueuing spillover segment delete from cloud "
"storage: {}",
path);
objects_to_remove.emplace_back(path);
new_clean_offset = model::next_offset(
meta.committed_offset);
bytes_to_remove += meta.size_bytes;
Expand All @@ -2248,10 +2253,10 @@ ss::future<> ntp_archiver::garbage_collect_archive() {
meta.sname_format
== cloud_storage::segment_name_format::v3
&& meta.metadata_size_hint != 0) {
objects_to_remove.push_back(
cloud_storage::generate_remote_tx_path(path)());
objects_to_remove.emplace_back(
cloud_storage::generate_remote_tx_path(path));
}
objects_to_remove.push_back(
objects_to_remove.emplace_back(
cloud_storage::generate_index_path(path));
} else {
// This indicates that we need to remove only some of the
Expand All @@ -2267,8 +2272,14 @@ ss::future<> ntp_archiver::garbage_collect_archive() {
if (stop) {
break;
}
auto path = cursor->manifest()->get_manifest_path();
manifests_to_remove.push_back(path());
const auto path = cursor->manifest()->get_manifest_path();
vlog(
_rtclog.info,
"Enqueuing spillover manifest delete from cloud "
"storage: {}",
path);
manifests_to_remove.emplace_back(path);

auto res = co_await cursor->next();
if (res.has_failure()) {
if (res.error() == cloud_storage::error_outcome::shutting_down) {
Expand Down Expand Up @@ -2306,43 +2317,16 @@ ss::future<> ntp_archiver::garbage_collect_archive() {
}
}

size_t successful_deletes{0};
size_t successful_segment_deletes{0};
size_t segments_in_batch{0};
const size_t batch_size = 1000;
std::vector<cloud_storage_clients::object_key> rem_batch;
for (const auto& path : objects_to_remove) {
std::string_view path_view{path.c_str()};
if (!path_view.ends_with("index") && !path_view.ends_with("tx")) {
vlog(
_rtclog.info,
"Deleting spillover segment from cloud storage: {}",
path);
++segments_in_batch;
}
rem_batch.emplace_back(path);
if (rem_batch.size() >= batch_size) {
auto sz = rem_batch.size();
std::vector<cloud_storage_clients::object_key> tmp;
std::swap(tmp, rem_batch);
if (co_await batch_delete(std::move(tmp))) {
successful_deletes += sz;
successful_segment_deletes += segments_in_batch;
}

segments_in_batch = 0;
}
}
if (co_await batch_delete(rem_batch)) {
successful_deletes += rem_batch.size();
successful_segment_deletes += segments_in_batch;
}
rem_batch.clear();

retry_chain_node fib(
_conf->garbage_collect_timeout,
_conf->cloud_storage_initial_backoff,
&_rtcnode);
const auto delete_result = co_await _remote.delete_objects(
get_bucket_name(), objects_to_remove, fib);
const auto backlog_size_exceeded = segments_to_remove_count
> _max_segments_pending_deletion();
const auto all_deletes_succeeded = successful_deletes
== objects_to_remove.size();
const auto all_deletes_succeeded = delete_result
== cloud_storage::upload_result::success;

if (!all_deletes_succeeded && backlog_size_exceeded) {
vlog(
Expand Down Expand Up @@ -2374,44 +2358,17 @@ ss::future<> ntp_archiver::garbage_collect_archive() {
"Failed to clean up metadata after garbage collection: {}",
error);
} else {
// Remove manifests only if metadata no longer references them
for (const auto& path : manifests_to_remove) {
vlog(
_rtclog.info,
"Deleting spillover manifest from cloud storage: {}",
path);
rem_batch.emplace_back(path);
if (rem_batch.size() >= batch_size) {
std::vector<cloud_storage_clients::object_key> tmp;
std::swap(tmp, rem_batch);
co_await batch_delete(std::move(tmp));
}
}
co_await batch_delete(rem_batch);
std::ignore = co_await _remote.delete_objects(
get_bucket_name(), manifests_to_remove, fib);
}
}
_probe->segments_deleted(static_cast<int64_t>(successful_segment_deletes));

_probe->segments_deleted(static_cast<int64_t>(
all_deletes_succeeded ? segments_to_remove_count : 0));
vlog(
_rtclog.info,
_rtclog.debug,
"Deleted {} spillover segments from the cloud",
successful_deletes);
}

ss::future<bool> ntp_archiver::batch_delete(
std::vector<cloud_storage_clients::object_key> keys) {
// Do batch delete, the batch size should be below the limit
auto timeout = config::shard_local_cfg()
.cloud_storage_segment_upload_timeout_ms.value();
auto backoff
= config::shard_local_cfg().cloud_storage_initial_backoff_ms.value();
retry_chain_node fib(timeout, backoff, &_rtcnode);
auto res = co_await _remote.delete_objects(
get_bucket_name(), std::move(keys), fib);
if (res != cloud_storage::upload_result::success) {
vlog(_rtclog.error, "Failed to delete objects", res);
co_return false;
}
co_return true;
all_deletes_succeeded ? segments_to_remove_count : 0);
}

ss::future<> ntp_archiver::apply_spillover() {
Expand Down
4 changes: 2 additions & 2 deletions tests/rptest/tests/tiered_storage_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -1294,7 +1294,7 @@ def make_validators(self):
return [
LogBasedValidator(
"TS_Spillover_DeleteByGC_log",
"ntp_archiver_service.*Deleting spillover segment from cloud storage",
"ntp_archiver_service.*Enqueuing spillover segment delete from cloud storage",
execution_stage=TestRunStage.Intermediate),
]

Expand Down Expand Up @@ -1346,7 +1346,7 @@ def make_validators(self):
return [
LogBasedValidator(
"TS_Spillover_ManifestDeleted_log",
"ntp_archiver_service.*Deleting spillover manifest from cloud storage",
"ntp_archiver_service.*Enqueuing spillover manifest delete from cloud",
confidence_threshold=LOW_THRESHOLD,
execution_stage=TestRunStage.Intermediate),
]
Expand Down

0 comments on commit 4c8aefb

Please sign in to comment.