cloud_storage: enable prefetching chunks #10950
Conversation
Force-pushed 227bde3 to 81cb604
/ci-repeat 10
/ci-repeat 10
Force-pushed a474c52 to 8ea1ca4
/ci-repeat 5
Looks good. Needs a rebase though.
@@ -1573,6 +1573,12 @@ configuration::configuration()
      {model::cloud_storage_chunk_eviction_strategy::eager,
       model::cloud_storage_chunk_eviction_strategy::capped,
       model::cloud_storage_chunk_eviction_strategy::predictive})
    , cloud_storage_chunk_prefetch(
nit: perhaps we could bound this property between [0, segment_size / chunk_size]? The lower limit (0) can be enforced via bounded_property, but the upper limit is dynamic and has to be done somewhere else.
I'll try to address this in a future PR
+1 to the lower bound, but I'm curious what good the upper bound would do -- in the worst case, a poorly configured cluster would just download the entire segment in segment_size / chunk_size pieces anyway, right?
Also, it may be challenging to reconcile this with the fact that each topic may have a different segment size.
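For readers following the thread, the suggested bound could look something like this minimal sketch (illustrative names, not code from this PR): clamp the configured prefetch count to the number of chunks a segment can actually contain.

```cpp
#include <algorithm>
#include <cstddef>

// Hypothetical helper: a segment of segment_size bytes holds at most
// ceil(segment_size / chunk_size) chunks, so any larger prefetch value
// is equivalent to "download the whole segment".
static size_t clamp_prefetch(
  size_t configured, size_t segment_size, size_t chunk_size) {
    const size_t max_chunks = (segment_size + chunk_size - 1) / chunk_size;
    return std::min(configured, max_chunks);
}
```

As the comment below notes, the upper bound is dynamic because segment size is a per-topic property, so a static clamp like this would have to be applied at read time rather than at configuration time.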
// makes an HTTP GET call and E is also prefetched. So a total of two calls
// are made for the five chunks (ignoring any cache evictions during the
// process).
if (const auto status = co_await _cache.is_cached(path_to_start);
Each is_cached call maps to an access sys call. We should be able to answer these queries by using the access time tracker in cache_service. I'll try to include this in #10855.
Force-pushed f34de26 to 8245b13
Looks good, but raced with #11287 which added cache lookups for chunks (which this PR also adds).
ss::future<ss::temporary_buffer<char>> get() override {
    return _stream.read_up_to(_upto);
}
Can read_up_to return with a shorter read than requested? (Looking at the impl, I don't think so.) If it can, _upto should probably be updated in here.
It actually looks like it can return a shorter buffer, nice catch:
} else if (_buf.size() <= n) {
// easy case: steal buffer, return to caller
return make_ready_future<tmp_buf>(std::move(_buf));
}
Fixed
I wonder if read_exactly is a better fit here.
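For context on the read_up_to vs read_exactly distinction: exact-read semantics can be emulated over an API that may return short reads by looping until the requested count is reached or EOF. A sketch using std::istream as a stand-in for the seastar stream (not the actual seastar API):

```cpp
#include <cstddef>
#include <istream>
#include <sstream>
#include <string>

// Sketch: gather `upto` bytes even if the underlying stream returns short
// reads, stopping early only at EOF. A subsequent call on a drained stream
// returns an empty string, which is how EOF is signalled.
static std::string read_fully(std::istream& in, size_t upto) {
    std::string out(upto, '\0');
    size_t got = 0;
    while (got < upto) {
        in.read(out.data() + got, static_cast<std::streamsize>(upto - got));
        const auto n = static_cast<size_t>(in.gcount());
        if (n == 0) {
            break; // EOF: return whatever was gathered
        }
        got += n;
    }
    out.resize(got);
    return out;
}
```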
Force-pushed 85668de to 68ef97e
auto it = chunks.find(start);
auto n_it = std::next(it);

for (size_t i = 0; i < prefetch + 1 && it != chunks.end(); ++i) {
    auto start = it->first;
    std::optional<chunk_start_offset_t> end = std::nullopt;
    if (n_it != chunks.end()) {
        end = n_it->first - 1;
    }
    _chunks[start] = end;
    if (n_it == chunks.end()) {
        break;
    }
    it++;
    n_it++;
}
nit: this seems a bit non-trivial. Could you comment what this is doing, either here or in the header? Also, could you mention what it means for an end offset to be nullopt?
Added
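The loop under discussion can be sketched in isolation like so (illustrative names; a std::map stands in for the chunk index). It collects up to prefetch + 1 [start, end] pairs; a nullopt end marks the final chunk of the segment, whose end offset is only known from the segment size.

```cpp
#include <cstdint>
#include <map>
#include <optional>
#include <utility>
#include <vector>

using offset_t = uint64_t;

// Sketch of the range-building logic: walk the chunk-start map from `start`,
// recording each chunk's inclusive end as one byte before the next chunk's
// start. The last chunk in the map gets nullopt (end unknown from the map).
static std::vector<std::pair<offset_t, std::optional<offset_t>>> build_range(
  const std::map<offset_t, int>& chunks, offset_t start, size_t prefetch) {
    std::vector<std::pair<offset_t, std::optional<offset_t>>> out;
    auto it = chunks.find(start);
    for (size_t i = 0; i < prefetch + 1 && it != chunks.end(); ++i, ++it) {
        auto next = std::next(it);
        std::optional<offset_t> end;
        if (next != chunks.end()) {
            end = next->first - 1; // ends just before the next chunk starts
        }
        out.emplace_back(it->first, end);
    }
    return out;
}
```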
class segment_chunk_range {
public:
I might be missing something, but given the public API of this, could this be a deque of pairs?
Do you mean for storing the data inside this class (instead of a map), or for replacing this class entirely?
If the former: probably yes, it can be a deque of pairs, and since we only ever iterate over it and never look up keys, it should be marginally faster. But I don't see that as enough of a speedup to change it (traversal through a tree should still be pretty fast).
If the latter: this class provides some convenience methods for calculating the bounds of the range, to decide how much space to reserve in cache etc. It could maybe be done by free functions accepting the deque of pairs, but I prefer a class.
ss::future<ss::temporary_buffer<char>> get() override {
    const auto buf = co_await _stream.read_up_to(_upto);
    if (buf.size() < _upto) {
        _upto -= buf.size();
    }
    co_return buf;
}
What prevents _upto from underflowing? Is it possible for us to read more than the stream size in the first call? If so, would it make sense for subsequent calls to check _upto == 0 and return empty? Or is this only expected to be called once?
I have switched to read_exactly; I think it is more appropriate here. It will only return less than _upto bytes in case of EOF, in which case the next call to our get will return an empty buffer, signalling EOF correctly.
src/v/cloud_storage/remote_segment.h
@@ -299,6 +299,19 @@ class remote_segment final {

    std::optional<segment_chunks> _chunks_api;
    std::optional<offset_index::coarse_index_t> _coarse_index;

    class consume_stream {
nit: can we make the name a bit more descriptive? Maybe chunk_caching_stream_consumer or something?
changed
chunk_start_offset_t start, std::optional<chunk_start_offset_t> end) {
remote_segment::consume_stream::consume_stream(
  remote_segment& remote_segment, segment_chunk_range range)
  : _segment{remote_segment}
Seems out of place?
Force-pushed 988b7fa to c210f2b
Looks good to me, but the current version doesn't build.
While reading through this again I realised there's a tradeoff between prefetch and the cache space reservation. If the cache is nearly full, then prefetching is actually counter-productive to the overall system. Maybe we could check how full the cache is before creating the chunk_data_source_impl and override the prefetch to 0 if it's over 90% utilised or so.
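The utilisation gate suggested here might be sketched like this (assumed names and a 90% example threshold, not the actual implementation):

```cpp
#include <cstddef>
#include <cstdint>

// Hypothetical sketch: disable prefetch when the cache is nearly full,
// since prefetched chunks would immediately add eviction pressure.
static size_t effective_prefetch(
  size_t configured, uint64_t cache_used, uint64_t cache_capacity) {
    const double utilisation = cache_capacity == 0
                                 ? 1.0
                                 : static_cast<double>(cache_used)
                                     / static_cast<double>(cache_capacity);
    // 0.9 is the example threshold floated in the review, not a real default.
    return utilisation > 0.9 ? 0 : configured;
}
```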
Force-pushed 2b5bafb to a322dc0
/ci-repeat
self._set_params_and_start_redpanda(
-    cloud_storage_cache_chunk_size=1048576 * 8)
+    cloud_storage_cache_chunk_size=1048576 * 8,
+    cloud_storage_chunk_prefetch=prefetch)
Would be nice to check that prefetching is actually happening.
While trying to add a test for this, I ran into a bug in the coarse index generation code. Creating a separate PR to fix that, once that is merged I will add the test to this PR.
draft PR #11705 for fixing the bug
added the test tests.cloud_storage_chunk_read_path_test.CloudStorageChunkReadTest.test_prefetch_chunks now; it will require a few ci-repeats to see if its parameters are correct.
LGTM, one small issue
  , _range{std::move(range)} {}

ss::future<uint64_t>
operator()(uint64_t size, ss::input_stream<char> stream) {
the stream is not closed
fixed
Force-pushed 5691a15 to 150d4d0
/ci-repeat
Controls the number of chunks prefetched when a chunk is downloaded. A count of 0 means no extra chunk will be downloaded. A count of 1 means for every downloaded chunk, the next chunk is also prefetched.
A chunk range covers a series of contiguous chunks, which can be used to prefetch. The idea when prefetching is that we make a single HTTP call for a byte range which contains more than one chunk, but write the data to disk as separate chunk files. The range utility introduced here takes a start offset, a prefetch count, and a map of chunk starts, and enables iteration over the chunks until the prefetch is satisfied, also allowing access to the last offset, which is used to download the byte range.
A bounded stream wraps an input stream and only consumes up to a certain offset from the underlying input stream. It is intended to be used with chunk prefetch, where a single input stream is used to read the response from an HTTP GET call, and bounded streams are created on it to read up to chunk boundaries to be written to disk.
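The bounded-stream idea can be sketched with a plain std::istream stand-in (illustrative, not the seastar API): each reader consumes at most its bound, so several readers in sequence can split one response body at chunk boundaries.

```cpp
#include <algorithm>
#include <cstddef>
#include <istream>
#include <sstream>
#include <string>

// Sketch: wrap a shared stream and never consume past `remaining` bytes.
// An empty return means this reader's bound is exhausted (or EOF).
struct bounded_reader {
    std::istream& in;
    size_t remaining;

    std::string next(size_t want) {
        const size_t n = std::min(want, remaining);
        std::string buf(n, '\0');
        in.read(buf.data(), static_cast<std::streamsize>(n));
        buf.resize(static_cast<size_t>(in.gcount()));
        remaining -= buf.size();
        return buf;
    }
};
```

One underlying stream can then be handed to a sequence of such readers, one per chunk, leaving the stream positioned at the next chunk boundary after each reader drains.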
The remote object expects a consumer which will take an input stream and do something with it (usually write the response to disk). The consumer is supposed to be re-entrant since it can be called multiple times by remote. This change adds a stream consumer which accepts an http response stream and a chunk range, and creates chunk files to put in cache from the stream containing potentially multiple contiguous chunks. It effectively breaks up the stream into chunks before putting the files in cache.
Allow overriding prefetch size per reader. The main purpose of this is to allow disabling prefetch (by setting it to zero) for timequeries.
When a segment is hydrated, we are guaranteed to be in legacy mode, which means that we were not able to download an index from cloud storage. The change cleans up the code path in segment hydration, so that if a segment is downloaded, an index is always created on the fly from the segment data.
Chunk prefetching allows fetching more than one segment chunk at once for a performance improvement. The HTTP call to fetch data includes a single byte range covering the original chunk plus the prefetch, and the response is written to disk as individual chunk files.
Note on potential inefficiency/wasted effort: with these changes, consider a set of contiguous chunks A,B,C,D,E,F. With a prefetch of 4, when A is downloaded, B,C,D,E are also downloaded. These are, however, not hydrated or materialized; the chunk files are kept on disk. When/if a request for hydrating B is issued and it is still on disk, it is directly materialized. If not, it is re-downloaded and then materialized. In the event that B is deleted for some reason (cache eviction) while C,D,E are still on disk, when B is re-hydrated, C,D,E will also be downloaded again, and the chunk files will be overwritten.
With some extra code this could be avoided by only selectively downloading missing chunks from the prefetch list; this implementation takes the simpler approach and just downloads all chunks from B onwards again, assuming that the next few chunks are also absent.
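The selective-download alternative described above could be sketched as follows (illustrative names; on_disk stands in for a cache lookup, not a real API in this PR):

```cpp
#include <cstdint>
#include <functional>
#include <vector>

// Sketch: filter a prefetch range down to only the chunk start offsets
// that are not already present on disk, so cached chunks are not refetched.
static std::vector<uint64_t> missing_chunks(
  const std::vector<uint64_t>& range,
  const std::function<bool(uint64_t)>& on_disk) {
    std::vector<uint64_t> need;
    for (auto off : range) {
        if (!on_disk(off)) {
            need.push_back(off); // only these would be downloaded
        }
    }
    return need;
}
```

The complication, as noted, is that the surviving chunks may not be contiguous, so the single byte-range GET would have to be split into several requests.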
Fixes #11028
Backports Required
Release Notes