Skip to content

Commit

Permalink
cloud_storage: Pass abort source when waiting for units
Browse files Browse the repository at this point in the history
When the code path to make a remote partition/segment reader waits for
units, this change passes the connection abort source to the wait, so
that if the connection is killed while waiting, we can end the wait
early.

The calls during aborted_transactions do not use the abort source, but
they will also possibly need to accept the abort source, this may
require changes higher up the call chain and can be done in a later PR.
  • Loading branch information
abhijat committed Nov 3, 2023
1 parent f4c7426 commit 450b36b
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 19 deletions.
27 changes: 21 additions & 6 deletions src/v/cloud_storage/materialized_resources.cc
Original file line number Diff line number Diff line change
Expand Up @@ -419,8 +419,19 @@ void materialized_resources::register_segment(materialized_segment_state& s) {
_materialized.push_back(s);
}

namespace {

ss::future<ssx::semaphore_units> get_units_abortable(
adjustable_semaphore& sem, ssize_t units, storage::opt_abort_source_t as) {
return as.has_value() ? sem.get_units(units, as.value())
: sem.get_units(units);
}

} // namespace

ss::future<segment_reader_units>
materialized_resources::get_segment_reader_units() {
materialized_resources::get_segment_reader_units(
storage::opt_abort_source_t as) {
// Estimate segment reader memory requirements
auto size_bytes = projected_remote_segment_reader_memory_usage();
if (_mem_units.available_units() <= size_bytes) {
Expand All @@ -431,19 +442,22 @@ materialized_resources::get_segment_reader_units() {
trim_segment_readers(max_memory_utilization() / 2);
}

auto semaphore_units = co_await _mem_units.get_units(size_bytes);
auto semaphore_units = co_await get_units_abortable(
_mem_units, size_bytes, as);
co_return segment_reader_units{std::move(semaphore_units)};
}

ss::future<ssx::semaphore_units>
materialized_resources::get_partition_reader_units() {
materialized_resources::get_partition_reader_units(
storage::opt_abort_source_t as) {
auto sz = projected_remote_partition_reader_memory_usage();
if (_mem_units.available_units() <= sz) {
// Update metrics counter if we are trying to acquire units while
// saturated
_partition_readers_delayed += 1;
}
return _mem_units.get_units(sz);

return get_units_abortable(_mem_units, sz, as);
}

ss::future<ssx::semaphore_units>
Expand All @@ -452,7 +466,8 @@ materialized_resources::get_hydration_units(size_t n) {
co_return std::move(u);
}

ss::future<segment_units> materialized_resources::get_segment_units() {
ss::future<segment_units>
materialized_resources::get_segment_units(storage::opt_abort_source_t as) {
auto sz = projected_remote_segment_memory_usage();
if (_mem_units.available_units() <= sz) {
// Update metrics counter if we are trying to acquire units while
Expand All @@ -461,7 +476,7 @@ ss::future<segment_units> materialized_resources::get_segment_units() {

trim_segments(max_memory_utilization() / 2);
}
auto semaphore_units = co_await _mem_units.get_units(sz);
auto semaphore_units = co_await get_units_abortable(_mem_units, sz, as);
co_return segment_units{std::move(semaphore_units)};
}

Expand Down
8 changes: 5 additions & 3 deletions src/v/cloud_storage/materialized_resources.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,13 @@ class materialized_resources {

void register_segment(materialized_segment_state& s);

ss::future<segment_reader_units> get_segment_reader_units();
ss::future<segment_reader_units>
get_segment_reader_units(storage::opt_abort_source_t as);

ss::future<ssx::semaphore_units> get_partition_reader_units();
ss::future<ssx::semaphore_units>
get_partition_reader_units(storage::opt_abort_source_t as);

ss::future<segment_units> get_segment_units();
ss::future<segment_units> get_segment_units(storage::opt_abort_source_t as);

materialized_manifest_cache& get_materialized_manifest_cache();

Expand Down
24 changes: 14 additions & 10 deletions src/v/cloud_storage/remote_partition.cc
Original file line number Diff line number Diff line change
Expand Up @@ -483,10 +483,11 @@ class partition_record_batch_reader_impl final
}

ss::future<> init_cursor(storage::log_reader_config config) {
auto segment_unit
= co_await _partition->materialized().get_segment_units();
auto segment_unit = co_await _partition->materialized()
.get_segment_units(config.abort_source);
auto segment_reader_unit
= co_await _partition->materialized().get_segment_reader_units();
= co_await _partition->materialized().get_segment_reader_units(
config.abort_source);

async_view_search_query_t query;
if (config.first_timestamp.has_value()) {
Expand Down Expand Up @@ -661,11 +662,11 @@ class partition_record_batch_reader_impl final
_next_segment_base_offset,
_view_cursor->get_status());

auto segment_unit
= co_await _partition->materialized().get_segment_units();
auto segment_unit = co_await _partition->materialized()
.get_segment_units(config.abort_source);
auto segment_reader_unit
= co_await _partition->materialized().get_segment_reader_units();

= co_await _partition->materialized().get_segment_reader_units(
config.abort_source);
auto maybe_manifest = _view_cursor->manifest();
if (
maybe_manifest.has_value()
Expand Down Expand Up @@ -933,7 +934,8 @@ remote_partition::aborted_transactions(offset_range offsets) {
// in a failure to materialise. This should be transient however.
// One solution for this is to grab all the required segment units
// up front at the start of the function.
auto segment_unit = co_await materialized().get_segment_units();
auto segment_unit = co_await materialized().get_segment_units(
std::nullopt);
auto path = stm_manifest.generate_segment_path(*it);
auto m = get_or_materialize_segment(
path, *it, std::move(segment_unit));
Expand Down Expand Up @@ -981,7 +983,8 @@ remote_partition::aborted_transactions(offset_range offsets) {
});

for (const auto& [meta, path] : meta_to_materialize) {
auto segment_unit = co_await materialized().get_segment_units();
auto segment_unit = co_await materialized().get_segment_units(
std::nullopt);
auto m = get_or_materialize_segment(
path, meta, std::move(segment_unit));
auto tx = co_await m->second->segment->aborted_transactions(
Expand Down Expand Up @@ -1087,7 +1090,8 @@ ss::future<storage::translating_reader> remote_partition::make_reader(
config,
_segments.size());

auto units = co_await _api.materialized().get_partition_reader_units();
auto units = co_await _api.materialized().get_partition_reader_units(
config.abort_source);
auto ot_state = ss::make_lw_shared<storage::offset_translator_state>(
get_ntp());
auto impl = std::make_unique<partition_record_batch_reader_impl>(
Expand Down

0 comments on commit 450b36b

Please sign in to comment.