diff --git a/src/v/cloud_storage/remote_segment.cc b/src/v/cloud_storage/remote_segment.cc index eea84ab45d437..72df6c12c03fe 100644 --- a/src/v/cloud_storage/remote_segment.cc +++ b/src/v/cloud_storage/remote_segment.cc @@ -119,7 +119,7 @@ using namespace std::chrono_literals; static constexpr size_t max_consume_size = 128_KiB; -inline void expiry_handler_impl(ss::promise& pr) { +inline void expiry_handler_impl(abortable_promise& pr) { pr.set_exception(ss::timed_out_error()); } @@ -278,10 +278,11 @@ remote_segment::offset_data_stream( kafka::offset start, kafka::offset end, std::optional first_timestamp, - ss::io_priority_class io_priority) { + ss::io_priority_class io_priority, + storage::opt_abort_source_t as) { vlog(_ctxlog.debug, "remote segment file input stream at offset {}", start); ss::gate::holder g(_gate); - co_await hydrate(); + co_await hydrate(as); std::optional indexed_pos; std::optional prefetch_override = std::nullopt; @@ -872,7 +873,7 @@ ss::future<> remote_segment::run_hydrate_bg() { _hydration_loop_running = false; } -ss::future<> remote_segment::hydrate() { +ss::future<> remote_segment::hydrate(storage::opt_abort_source_t as) { if (!_hydration_loop_running) { vlog( _ctxlog.error, @@ -887,11 +888,25 @@ ss::future<> remote_segment::hydrate() { auto g = _gate.hold(); vlog(_ctxlog.debug, "segment {} hydration requested", _path); ss::promise p; - auto fut = p.get_future(); - _wait_list.push_back(std::move(p), ss::lowres_clock::time_point::max()); + + ss::optimized_optional subscription; + auto fut = ss::shared_future{p.get_future()}; + if (as.has_value()) { + subscription = as->get().subscribe( + [&p, f = fut.get_future()]() noexcept { + if (!f.available()) { + p.set_exception(ss::abort_requested_exception{}); + } + }); + } + + _wait_list.push_back( + abortable_promise{ + std::move(p), fut.get_future(), std::move(subscription)}, + ss::lowres_clock::time_point::max()); _bg_cvar.signal(); - return fut - .handle_exception_type([this](const download_exception& ex) { + return fut.get_future() + .handle_exception_type([this, &as](const download_exception& ex) { // If we are working with an index-only format, and index download // failed, we may not be able to progress. So we fallback to old // format where the full segment was downloaded, and try to hydrate @@ -903,7 +918,7 @@ ss::future<> remote_segment::hydrate() { "fallback mode and retrying hydration.", ex); _fallback_mode = fallback_mode::yes; - return hydrate().then([] { + return hydrate(as).then([] { // This is an empty file to match the type returned by `fut`. // The result is discarded immediately so it is unused. return ss::file{}; diff --git a/src/v/cloud_storage/remote_segment.h b/src/v/cloud_storage/remote_segment.h index 9dc8bfc0aa62f..42430a700c053 100644 --- a/src/v/cloud_storage/remote_segment.h +++ b/src/v/cloud_storage/remote_segment.h @@ -114,14 +114,15 @@ class remote_segment final { kafka::offset start, kafka::offset end, std::optional, - ss::io_priority_class); + ss::io_priority_class, + storage::opt_abort_source_t as = std::nullopt); /// Hydrate the segment for segment meta version v2 or lower. For v3 or /// higher, only hydrate the index. If the index hydration fails, fall back /// to old mode where the full segment is hydrated. For v3 or higher /// versions, the actual segment data is hydrated by the data source /// implementation, but the index is still required to be present first. - ss::future<> hydrate(); + ss::future<> hydrate(storage::opt_abort_source_t as = std::nullopt); /// Hydrate a part of a segment, identified by the given range. The range /// can contain data for multiple contiguous chunks, in which case multiple @@ -273,10 +274,10 @@ class remote_segment final { /// Notifies the background hydration fiber ss::condition_variable _bg_cvar; - using expiry_handler = std::function&)>; + using expiry_handler = std::function; /// List of fibers that wait for the segment to be hydrated - ss::expiring_fifo, expiry_handler> _wait_list; + ss::expiring_fifo _wait_list; ss::file _data_file; std::optional _index; diff --git a/src/v/cloud_storage/types.cc b/src/v/cloud_storage/types.cc index 2a82dcad657f6..1b4f3f47d69d4 100644 --- a/src/v/cloud_storage/types.cc +++ b/src/v/cloud_storage/types.cc @@ -456,4 +456,37 @@ configuration::get_bucket_config() { } } +abortable_promise::abortable_promise( + ss::promise&& promise, + ss::future&& query, + ss::optimized_optional&& subscription) + : promise{std::move(promise)} + , query{std::move(query)} + , subscription{std::move(subscription)} {} + +abortable_promise::abortable_promise(abortable_promise&& other) noexcept + : promise{std::move(other.promise)} + , query{std::move(other.query)} + , subscription{std::move(other.subscription)} {} + +bool abortable_promise::available() const { return query.available(); } + +void abortable_promise::set_exception(const std::exception_ptr& ex) { + if (!available()) { + promise.set_exception(ex); + } +} + +void abortable_promise::set_exception(std::exception&& ex) { + if (!available()) { + promise.set_exception(std::move(ex)); + } +} + +void abortable_promise::set_value(ss::file file) { + if (!available()) { + promise.set_value(std::move(file)); + } +} + } // namespace cloud_storage diff --git a/src/v/cloud_storage/types.h b/src/v/cloud_storage/types.h index f37d0c33f0ddd..aa1bf4dea32ab 100644 --- a/src/v/cloud_storage/types.h +++ b/src/v/cloud_storage/types.h @@ -19,6 +19,7 @@ #include "seastarx.h" #include "utils/named_type.h" +#include #include #include #include @@ -411,6 +412,30 @@ struct anomalies std::ostream& operator<<(std::ostream& o, const anomalies& a); +struct abortable_promise { + ss::promise promise; + // The query future is used to check the status of the operation without + // extracting the future from the promise. + ss::future query; + ss::optimized_optional subscription; + + abortable_promise( + ss::promise&& promise, + ss::future&& query, + ss::optimized_optional&& subscription); + + abortable_promise(abortable_promise&&) noexcept; + abortable_promise& operator=(abortable_promise&&) = delete; + + abortable_promise(const abortable_promise&) = delete; + abortable_promise& operator=(const abortable_promise&) = delete; + + bool available() const; + void set_exception(const std::exception_ptr&); + void set_exception(std::exception&&); + void set_value(ss::file); +}; + } // namespace cloud_storage namespace std {