diff --git a/src/v/cloud_storage/remote.cc b/src/v/cloud_storage/remote.cc index 7db1a491f86c3..63d71571b7238 100644 --- a/src/v/cloud_storage/remote.cc +++ b/src/v/cloud_storage/remote.cc @@ -673,7 +673,13 @@ ss::future remote::download_stream( auto lease = co_await [this, &fib] { auto m = _probe.client_acquisition(); - return _pool.local().acquire(fib.root_abort_source()); + return _pool.local() + .acquire(fib.root_abort_source()) + .then([](auto lease) { + return ss::make_lw_shared< + cloud_storage_clients::client_pool::client_lease>( + std::move(lease)); + }); }(); auto permit = fib.retry(); @@ -684,7 +690,7 @@ ss::future remote::download_stream( api_activity_notification::segment_download, parent); auto download_latency_measure = download_latency_measurement(); - auto resp = co_await lease.client->get_object( + auto resp = co_await lease->client->get_object( bucket, path, fib.get_timeout(), false, byte_range); if (resp) { @@ -693,8 +699,14 @@ ss::future remote::download_stream( resp.value()->get_headers().at( boost::beast::http::field::content_length)); try { - uint64_t content_length = co_await cons_str( - length, resp.value()->as_input_stream()); + uint64_t content_length{}; + if (std::holds_alternative(cons_str)) { + content_length = co_await std::get(cons_str)( + length, resp.value()->as_input_stream()); + } else if (std::holds_alternative(cons_str)) { + content_length = co_await std::get(cons_str)( + length, resp.value()->as_input_stream(), lease); + } _probe.successful_download(); _probe.register_download_size(content_length); co_return download_result::success; @@ -712,7 +724,7 @@ ss::future remote::download_stream( download_latency_measure.reset(); - lease.client->shutdown(); + lease->client->shutdown(); switch (resp.error()) { case cloud_storage_clients::error_outcome::retry: diff --git a/src/v/cloud_storage/remote.h b/src/v/cloud_storage/remote.h index 7be74642771fa..201ca8e41fd33 100644 --- a/src/v/cloud_storage/remote.h +++ b/src/v/cloud_storage/remote.h @@ -128,8 +128,13 @@ class remote : public ss::peering_sharded_service { /// The functor should be reenterable since it can be called many times. /// On success it should return content_length. On failure it should /// allow the exception from the input_stream to propagate. - using try_consume_stream = ss::noncopyable_function( + using a = ss::noncopyable_function( uint64_t, ss::input_stream)>; + using b = ss::noncopyable_function( + uint64_t, + ss::input_stream, + ss::lw_shared_ptr)>; + using try_consume_stream = std::variant; /// Functor that should be provided by user when list_objects api is called. /// It receives every key that matches the query as well as it's modifiation diff --git a/src/v/cloud_storage/remote_segment.cc b/src/v/cloud_storage/remote_segment.cc index ab8f75042f38b..ff47ded1bf1af 100644 --- a/src/v/cloud_storage/remote_segment.cc +++ b/src/v/cloud_storage/remote_segment.cc @@ -51,6 +51,7 @@ #include #include +#include namespace { class bounded_stream final : public ss::data_source_impl { @@ -273,12 +274,39 @@ remote_segment::data_stream(size_t pos, ss::io_priority_class io_priority) { co_return storage::segment_reader_handle(std::move(data_stream)); } +ss::future remote_segment::build_stream() { + retry_chain_node local_rtc( + cache_hydration_timeout, cache_hydration_backoff, &_rtc); + + stream_with_leased_client slc; + auto res = co_await _api.download_segment( + _bucket, + _path, + [&slc]( + uint64_t size_bytes, + ss::input_stream s, + stream_with_leased_client::lease_t lease) { + slc.stream = std::move(s); + slc.lease = std::move(lease); + return ss::make_ready_future(size_bytes); + }, + local_rtc); + + if (res != download_result::success) { + vlog(_ctxlog.debug, "Failed to create stream for {}", _path); + throw download_exception(res, _path); + } + + co_return slc; +} + ss::future remote_segment::offset_data_stream( kafka::offset start, kafka::offset end, std::optional first_timestamp, - ss::io_priority_class io_priority) { + ss::io_priority_class io_priority, + bool skip_cache) { vlog(_ctxlog.debug, "remote segment file input stream at offset {}", start); ss::gate::holder g(_gate); co_await hydrate(); @@ -286,6 +314,16 @@ remote_segment::offset_data_stream( std::optional indexed_pos; std::optional prefetch_override = std::nullopt; + if (skip_cache) { + vlog(_ctxlog.info, "using direct stream to serve request"); + auto stream = std::make_unique( + co_await build_stream()); + co_return input_stream_with_offsets{ + .stream = ss::input_stream{ss::data_source{std::move(stream)}}, + .rp_offset = _base_rp_offset, + .kafka_offset = _base_rp_offset - _base_offset_delta}; + } + // Perform index lookup by timestamp or offset. This reduces the number // of hydrated chunks required to serve the request. if (first_timestamp) { @@ -1345,7 +1383,8 @@ remote_segment_batch_reader::init_parser() { model::offset_cast(_config.start_offset), model::offset_cast(_config.max_offset), _config.first_timestamp, - priority_manager::local().shadow_indexing_priority()); + priority_manager::local().shadow_indexing_priority(), + true); auto parser = std::make_unique( std::make_unique( diff --git a/src/v/cloud_storage/remote_segment.h b/src/v/cloud_storage/remote_segment.h index 748ad41e4d8a8..fbb2ce439bac9 100644 --- a/src/v/cloud_storage/remote_segment.h +++ b/src/v/cloud_storage/remote_segment.h @@ -60,7 +60,23 @@ class remote_segment_exception : public std::runtime_error { : std::runtime_error(m) {} }; +struct stream_with_leased_client : public ss::data_source_impl { + using lease_t + = ss::lw_shared_ptr; + + ss::future> get() override { + co_return co_await stream.read(); + } + + ss::future<> close() override { co_return co_await stream.close(); } + + lease_t lease; + ss::input_stream stream; +}; + class remote_segment final { + ss::future build_stream(); + public: remote_segment( remote& r, @@ -114,7 +130,8 @@ class remote_segment final { kafka::offset start, kafka::offset end, std::optional, - ss::io_priority_class); + ss::io_priority_class, + bool skip_cache = false); /// Hydrate the segment for segment meta version v2 or lower. For v3 or /// higher, only hydrate the index. If the index hydration fails, fall back