Skip to content

Commit

Permalink
Merge pull request redpanda-data#18653 from Lazin/pr/ntp-archiver-util
Browse files Browse the repository at this point in the history
archival: Data upload path groundwork
  • Loading branch information
Lazin authored Jun 3, 2024
2 parents 154221f + d819b0a commit 0eb784c
Show file tree
Hide file tree
Showing 12 changed files with 605 additions and 11 deletions.
3 changes: 3 additions & 0 deletions src/v/archival/fwd.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,7 @@ class purger;
class scrubber;
class archiver_manager;

class archiver_operations_api;
class archiver_scheduler_api;

} // namespace archival
7 changes: 6 additions & 1 deletion src/v/archival/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ struct adjacent_segment_run {
std::ostream& operator<<(std::ostream& o, const adjacent_segment_run& run);

enum class error_outcome {
unexpected_failure,
unexpected_failure = 1,
timed_out,
out_of_range,
offset_not_found,
Expand Down Expand Up @@ -246,3 +246,8 @@ inline std::error_code make_error_code(error_outcome e) noexcept {
}

} // namespace archival

namespace std {
/// Marks archival::error_outcome as an error-code enumeration so that its
/// values convert implicitly to std::error_code (through make_error_code).
template<>
struct is_error_code_enum<archival::error_outcome> : std::true_type {};
} // namespace std
18 changes: 13 additions & 5 deletions src/v/cloud_storage/remote.cc
Original file line number Diff line number Diff line change
Expand Up @@ -507,7 +507,8 @@ ss::future<upload_result> remote::upload_controller_snapshot(
api_activity_type::controller_snapshot_upload,
[this] { _probe.controller_snapshot_failed_upload(); },
[this] { _probe.controller_snapshot_successful_upload(); },
[this] { _probe.controller_snapshot_upload_backoff(); });
[this] { _probe.controller_snapshot_upload_backoff(); },
std::nullopt);
}

template<
Expand All @@ -525,7 +526,8 @@ ss::future<upload_result> remote::upload_stream(
api_activity_type event_type,
FailedUploadMetricFn failed_upload_metric,
SuccessfulUploadMetricFn successful_upload_metric,
UploadBackoffMetricFn upload_backoff_metric) {
UploadBackoffMetricFn upload_backoff_metric,
std::optional<size_t> max_retries) {
auto guard = _gate.hold();
retry_chain_node fib(&parent);
retry_chain_logger ctxlog(cst_log, fib);
Expand All @@ -537,7 +539,11 @@ ss::future<upload_result> remote::upload_stream(
segment_path,
content_length);
std::optional<upload_result> result;
while (!_gate.is_closed() && permit.is_allowed && !result) {
while (!_gate.is_closed() && permit.is_allowed && !result
&& max_retries.value_or(1) > 0) {
if (max_retries.has_value()) {
max_retries = max_retries.value() - 1;
}
auto lease = co_await _pool.local().acquire(fib.root_abort_source());
notify_external_subscribers(
api_activity_notification{
Expand Down Expand Up @@ -633,7 +639,8 @@ ss::future<upload_result> remote::upload_segment(
uint64_t content_length,
const reset_input_stream& reset_str,
retry_chain_node& parent,
lazy_abort_source& lazy_abort_source) {
lazy_abort_source& lazy_abort_source,
std::optional<size_t> max_retries) {
return upload_stream(
bucket,
segment_path,
Expand All @@ -645,7 +652,8 @@ ss::future<upload_result> remote::upload_segment(
api_activity_type::segment_upload,
[this] { _probe.failed_upload(); },
[this] { _probe.successful_upload(); },
[this] { _probe.upload_backoff(); });
[this] { _probe.upload_backoff(); },
max_retries);
}

ss::future<download_result> remote::download_stream(
Expand Down
7 changes: 5 additions & 2 deletions src/v/cloud_storage/remote.h
Original file line number Diff line number Diff line change
Expand Up @@ -299,13 +299,15 @@ class remote
/// segment's data
/// \param exposed_name is a segment's name in S3
/// \param manifest is a manifest that should have the segment metadata
/// \param max_retries is a maximal number of allowed retries
ss::future<upload_result> upload_segment(
const cloud_storage_clients::bucket_name& bucket,
const remote_segment_path& segment_path,
uint64_t content_length,
const reset_input_stream& reset_str,
retry_chain_node& parent,
lazy_abort_source& lazy_abort_source);
lazy_abort_source& lazy_abort_source,
std::optional<size_t> max_retries = std::nullopt);

/// \brief Download segment from S3
///
Expand Down Expand Up @@ -565,7 +567,8 @@ class remote
api_activity_type event_type,
FailedUploadMetricFn failed_upload_metric,
SuccessfulUploadMetricFn successful_upload_metric,
UploadBackoffMetricFn upload_backoff_metric);
UploadBackoffMetricFn upload_backoff_metric,
std::optional<size_t> max_retries);

template<
typename DownloadLatencyMeasurementFn,
Expand Down
2 changes: 2 additions & 0 deletions src/v/cloud_storage/remote_segment_index.h
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,8 @@ struct segment_record_stats {
model::timestamp base_timestamp;
// Last timestamp
model::timestamp last_timestamp;

auto operator<=>(const segment_record_stats&) const noexcept = default;
};

class remote_segment_index_builder : public storage::batch_consumer {
Expand Down
2 changes: 1 addition & 1 deletion src/v/config/configuration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1813,7 +1813,7 @@ configuration::configuration()
"cloud_storage_segment_max_upload_interval_sec",
"Time that segment can be kept locally without uploading it to the "
"remote storage (sec)",
{.visibility = visibility::tunable},
{.needs_restart = needs_restart::no, .visibility = visibility::tunable},
1h)
, cloud_storage_manifest_max_upload_interval_sec(
*this,
Expand Down
4 changes: 2 additions & 2 deletions src/v/container/include/container/fragmented_vector.h
Original file line number Diff line number Diff line change
Expand Up @@ -509,11 +509,11 @@ class fragmented_vector {
// items.
constexpr size_t initial_cap = std::max(1UL, 32UL / sizeof(T));
_capacity = initial_cap;
_frags.emplace_back().reserve(_capacity);
_frags.emplace_back(_frags.get_allocator()).reserve(_capacity);
return;
}
}
_frags.emplace_back().reserve(elems_per_frag);
_frags.emplace_back(_frags.get_allocator()).reserve(elems_per_frag);
_capacity += elems_per_frag;
}

Expand Down
2 changes: 2 additions & 0 deletions src/v/model/fundamental.h
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,8 @@ struct topic_partition {
|| (topic == other.topic && partition < other.partition);
}

auto operator<=>(const topic_partition& other) const noexcept = default;

// Implicit conversion to topic_partition_view, constructed from this
// object's `topic` and `partition` members.
operator topic_partition_view() {
return topic_partition_view(topic, partition);
}
Expand Down
4 changes: 4 additions & 0 deletions src/v/model/ktp.h
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,10 @@ class ktp {
return tp_equals(other.tp, tp) && kafka_ns_view == other.ns();
}

/// Three-way comparison of two ktp values. Only the `tp` members take part
/// in the comparison.
auto operator<=>(const ktp& other) const noexcept {
    const auto& lhs = tp;
    const auto& rhs = other.tp;
    return lhs <=> rhs;
}

friend std::ostream& operator<<(std::ostream& os, const ktp& t) {
os << t.to_ntp();
return os;
Expand Down
162 changes: 162 additions & 0 deletions src/v/storage/batch_consumer_utils.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
/*
* Copyright 2024 Redpanda Data, Inc.
*
* Use of this software is governed by the Business Source License
* included in the file licenses/BSL.md
*
* As of the Change Date specified in that file, in accordance with
* the Business Source License, use of this software will be governed
* by the Apache License, Version 2.0
*/

#pragma once

#include "storage/parser.h"

#include <seastar/core/future.hh>

#include <exception>

namespace storage {

/// Chain multiple batch_consumer instances so that they all consume
/// from the same input_stream. The consumers are allowed to skip
/// batches or stop early.
///
/// Every consumer in the chain observes the batches as if it were the
/// only one in the chain. If one of the consumers skips a batch or
/// stops early it does not affect the others.
///
/// The chain keeps, per consumer, the last decision that consumer
/// returned (accept / skip / stop). A consumer that asked to stop is
/// never invoked again. The chain itself reports:
///  - stop_parser  when every consumer has stopped;
///  - skip_batch   when every consumer wants to skip the batch;
///  - accept_batch in every other case.
///
/// \tparam N number of consumers in the chain
template<int N>
class chained_batch_consumer : public storage::batch_consumer {
public:
    /// Build the chain out of exactly N consumers.
    ///
    /// \param args values used to initialize the N
    ///        std::unique_ptr<storage::batch_consumer> slots, in order.
    template<typename... Args>
    explicit chained_batch_consumer(Args&&... args)
      : _cons{std::forward<Args>(args)...} {
        // Fewer than N arguments would leave null slots in '_cons' which
        // are dereferenced unconditionally on the first batch.
        static_assert(
          sizeof...(Args) == static_cast<size_t>(N),
          "chained_batch_consumer<N> requires exactly N consumers");
        _prev.fill(consume_result::accept_batch);
    }

    /// Ask every consumer that hasn't stopped yet what to do with the
    /// batch, remember the answers and fold them into a single decision.
    ///
    /// \param h header of the batch that is about to be consumed
    /// \return stop_parser if all consumers are stopped, skip_batch if all
    ///         of them skip the batch, accept_batch otherwise
    consume_result
    accept_batch_start(const model::record_batch_header& h) const override {
        size_t n_stop = 0;
        size_t n_skip = 0;
        for (size_t ix = 0; ix < _cons.size(); ++ix) {
            auto& state = _prev[ix];
            // A stopped consumer stays stopped and is not consulted again.
            if (state != consume_result::stop_parser) {
                state = _cons[ix]->accept_batch_start(h);
            }
            switch (state) {
            case consume_result::accept_batch:
                break;
            case consume_result::skip_batch:
                n_skip++;
                break;
            case consume_result::stop_parser:
                n_stop++;
                break;
            }
        }
        if (n_stop == _prev.size()) {
            return consume_result::stop_parser;
        }
        if (n_skip == _prev.size()) {
            return consume_result::skip_batch;
        }
        return consume_result::accept_batch;
    }

    /// Forward the start of an accepted batch. Consumers that accepted
    /// the batch get 'consume_batch_start', consumers that decided to
    /// skip it get 'skip_batch_start' and stopped consumers get nothing.
    void consume_batch_start(
      model::record_batch_header hdr,
      size_t offset, // NOLINT
      size_t size) override {
        for (size_t ix = 0; ix < _cons.size(); ++ix) {
            if (_prev[ix] == consume_result::accept_batch) {
                _cons[ix]->consume_batch_start(hdr, offset, size);
            } else if (_prev[ix] == consume_result::skip_batch) {
                _cons[ix]->skip_batch_start(hdr, offset, size);
            }
        }
    }

    /// Forward the start of a skipped batch to every consumer whose last
    /// decision was skip_batch.
    void skip_batch_start(
      model::record_batch_header hdr,
      size_t offset, // NOLINT
      size_t size) override {
        for (size_t ix = 0; ix < _cons.size(); ++ix) {
            if (_prev[ix] == consume_result::skip_batch) {
                _cons[ix]->skip_batch_start(hdr, offset, size);
            }
        }
    }

    /// Hand the records of the current batch to every consumer that
    /// accepted it. Each of them receives its own iobuf obtained with
    /// 'share' over the full length of 'b'.
    void consume_records(iobuf&& b) override {
        for (size_t ix = 0; ix < _cons.size(); ++ix) {
            if (_prev[ix] == consume_result::accept_batch) {
                _cons[ix]->consume_records(b.share(0, b.size_bytes()));
            }
        }
    }

    /// Finish the current batch for every consumer that accepted it and
    /// record which consumers asked to stop.
    ///
    /// \return stop_parser::no if at least one consumer wants to continue,
    ///         stop_parser::yes otherwise
    /// \throws rethrows the exception of a failed consumer (the last one
    ///         if several of them failed); failed consumers are marked as
    ///         stopped before the exception is rethrown
    ss::future<stop_parser> consume_batch_end() override {
        std::vector<ss::future<stop_parser>> fut;
        fut.reserve(_cons.size());
        for (size_t ix = 0; ix < _cons.size(); ++ix) {
            if (_prev[ix] == consume_result::accept_batch) {
                fut.emplace_back(_cons[ix]->consume_batch_end());
            } else {
                // Consumers that didn't take part in this batch contribute
                // a ready result derived from their recorded state.
                fut.emplace_back(ss::make_ready_future<stop_parser>(
                  _prev[ix] == consume_result::stop_parser ? stop_parser::yes
                                                           : stop_parser::no));
            }
        }
        auto ready = co_await ss::when_all(fut.begin(), fut.end());
        std::exception_ptr err;
        stop_parser stop = stop_parser::yes;
        size_t ix = 0;
        for (ss::future<stop_parser>& f : ready) {
            if (f.failed()) {
                // We need to extract all exceptions to avoid the
                // warning but we can only rethrow the last one.
                err = f.get_exception();
                // Can't continue with the consumer after exception.
                // Assume it being stopped.
                _prev[ix] = consume_result::stop_parser;
            } else {
                auto s = f.get();
                if (s == stop_parser::yes) {
                    _prev[ix] = consume_result::stop_parser;
                } else {
                    // Resetting the state is not strictly required:
                    // accept_batch_start overwrites the state of every
                    // consumer that hasn't stopped before reading it.
                    _prev[ix] = consume_result::accept_batch;
                    stop = stop_parser::no;
                }
            }
            ix++;
        }
        if (err) {
            std::rethrow_exception(err);
        }
        co_return stop;
    }

    /// Print the name of the consumer.
    void print(std::ostream& o) const override {
        o << "chained_batch_consumer";
    }

private:
    /// Owned consumers, in the order they were passed to the constructor.
    std::array<std::unique_ptr<storage::batch_consumer>, N> _cons;
    /// Last decision of every consumer, indexed like '_cons'. Mutable
    /// because it is updated from the const 'accept_batch_start'.
    mutable std::array<consume_result, N> _prev;
};

} // namespace storage
1 change: 1 addition & 0 deletions src/v/storage/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ rp_test(
offset_translator_state_test.cc
file_sanitizer_test.cc
compaction_reducer_test.cc
batch_consumer_utils_test.cc
LIBRARIES v::seastar_testing_main v::storage_test_utils v::model_test_utils
LABELS storage
ARGS "-- -c 1"
Expand Down
Loading

0 comments on commit 0eb784c

Please sign in to comment.