Skip to content

Commit

Permalink
cloud_storage: Make stream consumer and use better name
Browse files Browse the repository at this point in the history
  • Loading branch information
abhijat committed Jun 14, 2023
1 parent 8677178 commit 988b7fa
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 39 deletions.
54 changes: 29 additions & 25 deletions src/v/cloud_storage/remote_segment.cc
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,31 @@ class bounded_stream final : public ss::data_source_impl {

namespace cloud_storage {

class split_segment_into_chunk_range_consumer {
public:
split_segment_into_chunk_range_consumer(
cloud_storage::remote_segment& remote_segment,
cloud_storage::segment_chunk_range range)
: _segment{remote_segment}
, _range{std::move(range)} {}

ss::future<uint64_t>
operator()(uint64_t size, ss::input_stream<char> stream) {
for (const auto [start, end] : _range) {
auto dsi = std::make_unique<bounded_stream>(
stream, end.value_or(_segment._size));
auto stream_upto = ss::input_stream<char>{
ss::data_source{std::move(dsi)}};
co_await _segment.put_chunk_in_cache(std::move(stream_upto), start);
}
co_return size;
}

private:
cloud_storage::remote_segment& _segment;
cloud_storage::segment_chunk_range _range;
};

std::filesystem::path
generate_index_path(const cloud_storage::remote_segment_path& p) {
return fmt::format("{}.index", p().native());
Expand Down Expand Up @@ -417,10 +442,8 @@ ss::future<uint64_t> remote_segment::put_segment_in_cache(
co_return size_bytes;
}

ss::future<uint64_t> remote_segment::put_chunk_in_cache(
uint64_t size,
ss::input_stream<char> stream,
chunk_start_offset_t chunk_start) {
ss::future<> remote_segment::put_chunk_in_cache(
ss::input_stream<char> stream, chunk_start_offset_t chunk_start) {
try {
co_await _cache.put(get_path_to_chunk(chunk_start), stream)
.finally([&stream] { return stream.close(); });
Expand All @@ -433,8 +456,6 @@ ss::future<uint64_t> remote_segment::put_chunk_in_cache(
put_exception);
std::rethrow_exception(put_exception);
}

co_return size;
}

ss::future<> remote_segment::do_hydrate_segment() {
Expand Down Expand Up @@ -888,24 +909,6 @@ ss::future<> remote_segment::hydrate() {
.discard_result();
}

remote_segment::consume_stream::consume_stream(
remote_segment& remote_segment, segment_chunk_range range)
: _segment{remote_segment}
, _range{std::move(range)} {}

ss::future<uint64_t> remote_segment::consume_stream::operator()(
uint64_t size, ss::input_stream<char> stream) {
for (const auto [start, end] : _range) {
auto dsi = std::make_unique<bounded_stream>(
stream, end.value_or(_segment._size));
auto stream_upto = ss::input_stream<char>{
ss::data_source{std::move(dsi)}};
co_await _segment.put_chunk_in_cache(
size, std::move(stream_upto), start);
}
co_return size;
}

ss::future<> remote_segment::hydrate_chunk(segment_chunk_range range) {
const auto start = range.first_offset();
const auto path_to_start = get_path_to_chunk(start);
Expand Down Expand Up @@ -936,7 +939,8 @@ ss::future<> remote_segment::hydrate_chunk(segment_chunk_range range) {
const auto space_required = end - start + 1;
const auto reserved = co_await _cache.reserve_space(space_required);

auto consumer = consume_stream{*this, std::move(range)};
auto consumer = split_segment_into_chunk_range_consumer{
*this, std::move(range)};
auto res = co_await _api.download_segment(
_bucket, _path, std::move(consumer), rtc, std::make_pair(start, end));
if (res != download_result::success) {
Expand Down
17 changes: 3 additions & 14 deletions src/v/cloud_storage/remote_segment.h
Original file line number Diff line number Diff line change
Expand Up @@ -212,8 +212,8 @@ class remote_segment final {

/// Stores a segment chunk in cache. The chunk is stored in a path derived
/// from the segment path: <segment_path>_chunks/chunk_start_file_offset.
ss::future<uint64_t> put_chunk_in_cache(
uint64_t, ss::input_stream<char>, chunk_start_offset_t chunk_start);
ss::future<> put_chunk_in_cache(
ss::input_stream<char>, chunk_start_offset_t chunk_start);

/// Hydrate tx manifest. Method downloads the manifest file to the cache
/// dir.
Expand Down Expand Up @@ -299,18 +299,7 @@ class remote_segment final {
std::optional<segment_chunks> _chunks_api;
std::optional<offset_index::coarse_index_t> _coarse_index;

class consume_stream {
public:
consume_stream(
remote_segment& remote_segment, segment_chunk_range range);

ss::future<uint64_t>
operator()(uint64_t, ss::input_stream<char> stream);

private:
remote_segment& _segment;
segment_chunk_range _range;
};
friend class split_segment_into_chunk_range_consumer;
};

class remote_segment_batch_consumer;
Expand Down

0 comments on commit 988b7fa

Please sign in to comment.