Skip to content

Commit

Permalink
cloud_storage: Make wait for hydration abortable
Browse files Browse the repository at this point in the history
When a consumer is waiting for hydration, the underlying kafka
connection may disconnect. The read path will still wait for the
hydration to finish.

This change adds a subscription to the abort source wired in with the
kafka connection - if that abort source is triggered, which happens
when the consumer disconnects, the read path fiber waiting for the
hydration is notified by setting the promise to aborted exception.

The download will still progress, so if there are multiple waiters for a
segment, and one of them disconnects, the others will still get access
to the file.
  • Loading branch information
abhijat committed Dec 7, 2023
1 parent f9565f4 commit 2b0e83a
Show file tree
Hide file tree
Showing 4 changed files with 87 additions and 13 deletions.
33 changes: 24 additions & 9 deletions src/v/cloud_storage/remote_segment.cc
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ using namespace std::chrono_literals;

static constexpr size_t max_consume_size = 128_KiB;

inline void expiry_handler_impl(ss::promise<ss::file>& pr) {
inline void expiry_handler_impl(abortable_promise& pr) {
pr.set_exception(ss::timed_out_error());
}

Expand Down Expand Up @@ -278,10 +278,11 @@ remote_segment::offset_data_stream(
kafka::offset start,
kafka::offset end,
std::optional<model::timestamp> first_timestamp,
ss::io_priority_class io_priority) {
ss::io_priority_class io_priority,
storage::opt_abort_source_t as) {
vlog(_ctxlog.debug, "remote segment file input stream at offset {}", start);
ss::gate::holder g(_gate);
co_await hydrate();
co_await hydrate(as);

std::optional<offset_index::find_result> indexed_pos;
std::optional<uint16_t> prefetch_override = std::nullopt;
Expand Down Expand Up @@ -872,7 +873,7 @@ ss::future<> remote_segment::run_hydrate_bg() {
_hydration_loop_running = false;
}

ss::future<> remote_segment::hydrate() {
ss::future<> remote_segment::hydrate(storage::opt_abort_source_t as) {
if (!_hydration_loop_running) {
vlog(
_ctxlog.error,
Expand All @@ -887,11 +888,25 @@ ss::future<> remote_segment::hydrate() {
auto g = _gate.hold();
vlog(_ctxlog.debug, "segment {} hydration requested", _path);
ss::promise<ss::file> p;
auto fut = p.get_future();
_wait_list.push_back(std::move(p), ss::lowres_clock::time_point::max());

ss::optimized_optional<ss::abort_source::subscription> subscription;
auto fut = ss::shared_future<ss::file>{p.get_future()};
if (as.has_value()) {
subscription = as->get().subscribe(
[&p, f = fut.get_future()]() noexcept {
if (!f.available()) {
p.set_exception(ss::abort_requested_exception{});
}
});
}

_wait_list.push_back(
abortable_promise{
std::move(p), fut.get_future(), std::move(subscription)},
ss::lowres_clock::time_point::max());
_bg_cvar.signal();
return fut
.handle_exception_type([this](const download_exception& ex) {
return fut.get_future()
.handle_exception_type([this, &as](const download_exception& ex) {
// If we are working with an index-only format, and index download
// failed, we may not be able to progress. So we fallback to old
// format where the full segment was downloaded, and try to hydrate
Expand All @@ -903,7 +918,7 @@ ss::future<> remote_segment::hydrate() {
"fallback mode and retrying hydration.",
ex);
_fallback_mode = fallback_mode::yes;
return hydrate().then([] {
return hydrate(as).then([] {
// This is an empty file to match the type returned by `fut`.
// The result is discarded immediately so it is unused.
return ss::file{};
Expand Down
9 changes: 5 additions & 4 deletions src/v/cloud_storage/remote_segment.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,14 +114,15 @@ class remote_segment final {
kafka::offset start,
kafka::offset end,
std::optional<model::timestamp>,
ss::io_priority_class);
ss::io_priority_class,
storage::opt_abort_source_t as = std::nullopt);

/// Hydrate the segment for segment meta version v2 or lower. For v3 or
/// higher, only hydrate the index. If the index hydration fails, fall back
/// to old mode where the full segment is hydrated. For v3 or higher
/// versions, the actual segment data is hydrated by the data source
/// implementation, but the index is still required to be present first.
ss::future<> hydrate();
ss::future<> hydrate(storage::opt_abort_source_t as = std::nullopt);

/// Hydrate a part of a segment, identified by the given range. The range
/// can contain data for multiple contiguous chunks, in which case multiple
Expand Down Expand Up @@ -273,10 +274,10 @@ class remote_segment final {
/// Notifies the background hydration fiber
ss::condition_variable _bg_cvar;

using expiry_handler = std::function<void(ss::promise<ss::file>&)>;
using expiry_handler = std::function<void(abortable_promise&)>;

/// List of fibers that wait for the segment to be hydrated
ss::expiring_fifo<ss::promise<ss::file>, expiry_handler> _wait_list;
ss::expiring_fifo<abortable_promise, expiry_handler> _wait_list;

ss::file _data_file;
std::optional<offset_index> _index;
Expand Down
33 changes: 33 additions & 0 deletions src/v/cloud_storage/types.cc
Original file line number Diff line number Diff line change
Expand Up @@ -456,4 +456,37 @@ configuration::get_bucket_config() {
}
}

abortable_promise::abortable_promise(
ss::promise<ss::file>&& promise,
ss::future<ss::file>&& query,
ss::optimized_optional<ss::abort_source::subscription>&& subscription)
: promise{std::move(promise)}
, query{std::move(query)}
, subscription{std::move(subscription)} {}

abortable_promise::abortable_promise(abortable_promise&& other) noexcept
: promise{std::move(other.promise)}
, query{std::move(other.query)}
, subscription{std::move(other.subscription)} {}

bool abortable_promise::available() const { return query.available(); }

void abortable_promise::set_exception(const std::exception_ptr& ex) {
if (!available()) {
promise.set_exception(ex);
}
}

void abortable_promise::set_exception(std::exception&& ex) {
if (!available()) {
promise.set_exception(std::move(ex));
}
}

void abortable_promise::set_value(ss::file file) {
if (!available()) {
promise.set_value(std::move(file));
}
}

} // namespace cloud_storage
25 changes: 25 additions & 0 deletions src/v/cloud_storage/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include "seastarx.h"
#include "utils/named_type.h"

#include <seastar/core/file.hh>
#include <seastar/core/future.hh>
#include <seastar/core/sstring.hh>
#include <seastar/util/bool_class.hh>
Expand Down Expand Up @@ -411,6 +412,30 @@ struct anomalies

std::ostream& operator<<(std::ostream& o, const anomalies& a);

struct abortable_promise {
ss::promise<ss::file> promise;
// The query future is used to check the status of the operation without
// extracting the future from the promise.
ss::future<ss::file> query;
ss::optimized_optional<ss::abort_source::subscription> subscription;

abortable_promise(
ss::promise<ss::file>&& promise,
ss::future<ss::file>&& query,
ss::optimized_optional<ss::abort_source::subscription>&& subscription);

abortable_promise(abortable_promise&&) noexcept;
abortable_promise& operator=(abortable_promise&&) = delete;

abortable_promise(const abortable_promise&) = delete;
abortable_promise& operator=(const abortable_promise&) = delete;

bool available() const;
void set_exception(const std::exception_ptr&);
void set_exception(std::exception&&);
void set_value(ss::file);
};

} // namespace cloud_storage

namespace std {
Expand Down

0 comments on commit 2b0e83a

Please sign in to comment.