Skip to content

Commit

Permalink
experiment with bypassing cache
Browse files Browse the repository at this point in the history
  • Loading branch information
abhijat committed Sep 13, 2023
1 parent 2400a6d commit f7df767
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 9 deletions.
22 changes: 17 additions & 5 deletions src/v/cloud_storage/remote.cc
Original file line number Diff line number Diff line change
Expand Up @@ -673,7 +673,13 @@ ss::future<download_result> remote::download_stream(

auto lease = co_await [this, &fib] {
auto m = _probe.client_acquisition();
return _pool.local().acquire(fib.root_abort_source());
return _pool.local()
.acquire(fib.root_abort_source())
.then([](auto lease) {
return ss::make_lw_shared<
cloud_storage_clients::client_pool::client_lease>(
std::move(lease));
});
}();

auto permit = fib.retry();
Expand All @@ -684,7 +690,7 @@ ss::future<download_result> remote::download_stream(
api_activity_notification::segment_download, parent);

auto download_latency_measure = download_latency_measurement();
auto resp = co_await lease.client->get_object(
auto resp = co_await lease->client->get_object(
bucket, path, fib.get_timeout(), false, byte_range);

if (resp) {
Expand All @@ -693,8 +699,14 @@ ss::future<download_result> remote::download_stream(
resp.value()->get_headers().at(
boost::beast::http::field::content_length));
try {
uint64_t content_length = co_await cons_str(
length, resp.value()->as_input_stream());
uint64_t content_length{};
if (std::holds_alternative<a>(cons_str)) {
content_length = co_await std::get<a>(cons_str)(
length, resp.value()->as_input_stream());
} else if (std::holds_alternative<b>(cons_str)) {
content_length = co_await std::get<b>(cons_str)(
length, resp.value()->as_input_stream(), lease);
}
_probe.successful_download();
_probe.register_download_size(content_length);
co_return download_result::success;
Expand All @@ -712,7 +724,7 @@ ss::future<download_result> remote::download_stream(

download_latency_measure.reset();

lease.client->shutdown();
lease->client->shutdown();

switch (resp.error()) {
case cloud_storage_clients::error_outcome::retry:
Expand Down
7 changes: 6 additions & 1 deletion src/v/cloud_storage/remote.h
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,13 @@ class remote : public ss::peering_sharded_service<remote> {
/// The functor should be reenterable since it can be called many times.
/// On success it should return content_length. On failure it should
/// allow the exception from the input_stream to propagate.
using try_consume_stream = ss::noncopyable_function<ss::future<uint64_t>(
using a = ss::noncopyable_function<ss::future<uint64_t>(
uint64_t, ss::input_stream<char>)>;
using b = ss::noncopyable_function<ss::future<uint64_t>(
uint64_t,
ss::input_stream<char>,
ss::lw_shared_ptr<cloud_storage_clients::client_pool::client_lease>)>;
using try_consume_stream = std::variant<a, b>;

/// Functor that should be provided by user when list_objects api is called.
/// It receives every key that matches the query as well as it's modifiation
Expand Down
43 changes: 41 additions & 2 deletions src/v/cloud_storage/remote_segment.cc
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
#include <fmt/core.h>

#include <exception>
#include <utility>

namespace {
class bounded_stream final : public ss::data_source_impl {
Expand Down Expand Up @@ -273,19 +274,56 @@ remote_segment::data_stream(size_t pos, ss::io_priority_class io_priority) {
co_return storage::segment_reader_handle(std::move(data_stream));
}

ss::future<stream_with_leased_client> remote_segment::build_stream() {
retry_chain_node local_rtc(
cache_hydration_timeout, cache_hydration_backoff, &_rtc);

stream_with_leased_client slc;
auto res = co_await _api.download_segment(
_bucket,
_path,
[&slc](
uint64_t size_bytes,
ss::input_stream<char> s,
stream_with_leased_client::lease_t lease) {
slc.stream = std::move(s);
slc.lease = std::move(lease);
return ss::make_ready_future<uint64_t>(size_bytes);
},
local_rtc);

if (res != download_result::success) {
vlog(_ctxlog.debug, "Failed to create stream for {}", _path);
throw download_exception(res, _path);
}

co_return slc;
}

ss::future<remote_segment::input_stream_with_offsets>
remote_segment::offset_data_stream(
kafka::offset start,
kafka::offset end,
std::optional<model::timestamp> first_timestamp,
ss::io_priority_class io_priority) {
ss::io_priority_class io_priority,
bool skip_cache) {
vlog(_ctxlog.debug, "remote segment file input stream at offset {}", start);
ss::gate::holder g(_gate);
co_await hydrate();

std::optional<offset_index::find_result> indexed_pos;
std::optional<uint16_t> prefetch_override = std::nullopt;

if (skip_cache) {
vlog(_ctxlog.info, "using direct stream to serve request");
auto stream = std::make_unique<stream_with_leased_client>(
co_await build_stream());
co_return input_stream_with_offsets{
.stream = ss::input_stream<char>{ss::data_source{std::move(stream)}},
.rp_offset = _base_rp_offset,
.kafka_offset = _base_rp_offset - _base_offset_delta};
}

// Perform index lookup by timestamp or offset. This reduces the number
// of hydrated chunks required to serve the request.
if (first_timestamp) {
Expand Down Expand Up @@ -1345,7 +1383,8 @@ remote_segment_batch_reader::init_parser() {
model::offset_cast(_config.start_offset),
model::offset_cast(_config.max_offset),
_config.first_timestamp,
priority_manager::local().shadow_indexing_priority());
priority_manager::local().shadow_indexing_priority(),
true);

auto parser = std::make_unique<storage::continuous_batch_parser>(
std::make_unique<remote_segment_batch_consumer>(
Expand Down
19 changes: 18 additions & 1 deletion src/v/cloud_storage/remote_segment.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,23 @@ class remote_segment_exception : public std::runtime_error {
: std::runtime_error(m) {}
};

struct stream_with_leased_client : public ss::data_source_impl {
using lease_t
= ss::lw_shared_ptr<cloud_storage_clients::client_pool::client_lease>;

ss::future<ss::temporary_buffer<char>> get() override {
co_return co_await stream.read();
}

ss::future<> close() override { co_return co_await stream.close(); }

lease_t lease;
ss::input_stream<char> stream;
};

class remote_segment final {
ss::future<stream_with_leased_client> build_stream();

public:
remote_segment(
remote& r,
Expand Down Expand Up @@ -114,7 +130,8 @@ class remote_segment final {
kafka::offset start,
kafka::offset end,
std::optional<model::timestamp>,
ss::io_priority_class);
ss::io_priority_class,
bool skip_cache = false);

/// Hydrate the segment for segment meta version v2 or lower. For v3 or
/// higher, only hydrate the index. If the index hydration fails, fall back
Expand Down

0 comments on commit f7df767

Please sign in to comment.