Skip to content

Commit

Permalink
cloud_storage: Make stream consumer into friend class, use better name
Browse files Browse the repository at this point in the history
  • Loading branch information
abhijat committed Jun 29, 2023
1 parent c39d558 commit fc95469
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 41 deletions.
65 changes: 38 additions & 27 deletions src/v/cloud_storage/remote_segment.cc
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,41 @@ class bounded_stream final : public ss::data_source_impl {

namespace cloud_storage {

class split_segment_into_chunk_range_consumer {
public:
split_segment_into_chunk_range_consumer(
cloud_storage::remote_segment& remote_segment,
cloud_storage::segment_chunk_range range)
: _segment{remote_segment}
, _range{std::move(range)} {}

ss::future<uint64_t>
operator()(uint64_t size, ss::input_stream<char> stream) {
for (const auto [start, end] : _range) {
const auto bytes_to_read = end.value_or(_segment._size - 1) - start
+ 1;
auto reservation = co_await _segment._cache.reserve_space(
bytes_to_read, 1);
vlog(
cst_log.trace,
"making stream from byte offset {} for {} bytes",
start,
bytes_to_read);
auto dsi = std::make_unique<bounded_stream>(stream, bytes_to_read);
auto stream_upto = ss::input_stream<char>{
ss::data_source{std::move(dsi)}};
co_await _segment.put_chunk_in_cache(
reservation, std::move(stream_upto), start);
}
co_await stream.close();
co_return size;
}

private:
cloud_storage::remote_segment& _segment;
cloud_storage::segment_chunk_range _range;
};

std::filesystem::path
generate_index_path(const cloud_storage::remote_segment_path& p) {
return fmt::format("{}.index", p().native());
Expand Down Expand Up @@ -430,8 +465,7 @@ ss::future<uint64_t> remote_segment::put_segment_in_cache(
co_return size_bytes;
}

ss::future<uint64_t> remote_segment::put_chunk_in_cache(
uint64_t size,
ss::future<> remote_segment::put_chunk_in_cache(
space_reservation_guard& reservation,
ss::input_stream<char> stream,
chunk_start_offset_t chunk_start) {
Expand All @@ -447,8 +481,6 @@ ss::future<uint64_t> remote_segment::put_chunk_in_cache(
put_exception);
std::rethrow_exception(put_exception);
}

co_return size;
}

ss::future<> remote_segment::do_hydrate_segment() {
Expand Down Expand Up @@ -905,28 +937,6 @@ ss::future<> remote_segment::hydrate() {
.discard_result();
}

remote_segment::consume_stream::consume_stream(
remote_segment& remote_segment, segment_chunk_range range)
: _segment{remote_segment}
, _range{std::move(range)} {}

ss::future<uint64_t> remote_segment::consume_stream::operator()(
uint64_t size, ss::input_stream<char> stream) {
for (const auto [start, end] : _range) {
const auto space_required = end.value_or(_segment._size - 1) - start
+ 1;
auto reservation = co_await _segment._cache.reserve_space(
space_required, 1);
auto dsi = std::make_unique<bounded_stream>(
stream, end.value_or(_segment._size));
auto stream_upto = ss::input_stream<char>{
ss::data_source{std::move(dsi)}};
co_await _segment.put_chunk_in_cache(
size, reservation, std::move(stream_upto), start);
}
co_return size;
}

ss::future<> remote_segment::hydrate_chunk(segment_chunk_range range) {
const auto start = range.first_offset();
const auto path_to_start = get_path_to_chunk(start);
Expand Down Expand Up @@ -954,7 +964,8 @@ ss::future<> remote_segment::hydrate_chunk(segment_chunk_range range) {
cache_hydration_timeout, cache_hydration_backoff, &_rtc};

const auto end = range.last_offset().value_or(_size - 1);
auto consumer = consume_stream{*this, std::move(range)};
auto consumer = split_segment_into_chunk_range_consumer{
*this, std::move(range)};
auto res = co_await _api.download_segment(
_bucket, _path, std::move(consumer), rtc, std::make_pair(start, end));
if (res != download_result::success) {
Expand Down
16 changes: 2 additions & 14 deletions src/v/cloud_storage/remote_segment.h
Original file line number Diff line number Diff line change
Expand Up @@ -213,8 +213,7 @@ class remote_segment final {

/// Stores a segment chunk in cache. The chunk is stored in a path derived
/// from the segment path: <segment_path>_chunks/chunk_start_file_offset.
ss::future<uint64_t> put_chunk_in_cache(
uint64_t,
ss::future<> put_chunk_in_cache(
space_reservation_guard&,
ss::input_stream<char>,
chunk_start_offset_t chunk_start);
Expand Down Expand Up @@ -303,18 +302,7 @@ class remote_segment final {
std::optional<segment_chunks> _chunks_api;
std::optional<offset_index::coarse_index_t> _coarse_index;

class consume_stream {
public:
consume_stream(
remote_segment& remote_segment, segment_chunk_range range);

ss::future<uint64_t>
operator()(uint64_t, ss::input_stream<char> stream);

private:
remote_segment& _segment;
segment_chunk_range _range;
};
friend class split_segment_into_chunk_range_consumer;
};

class remote_segment_batch_consumer;
Expand Down

0 comments on commit fc95469

Please sign in to comment.