Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cloud_storage: remote labels #20778

Merged
merged 34 commits into from
Jul 8, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
44978ef
offline_log_viewer: support remote_labels in topic properties
andrwng Jun 20, 2024
7d38dba
cluster: add remote label as topic property
andrwng Jun 12, 2024
fab93f1
archival_stm: add remote path provider to archival stm
andrwng Jun 29, 2024
cda86a5
ntp_archiver: use path provider for naming
andrwng Jun 20, 2024
160a40e
cloud_storage: plumb path provider into async manifest view
andrwng Jun 20, 2024
291ed3f
cluster: use topic_manifest_downloader in topic recovery
andrwng Jun 17, 2024
b6d1e16
cluster: use topic_manifest_downloader in list-based recovery
andrwng Jun 25, 2024
41df563
archival_stm: use partition_manifest_downloader for snapshot recovery
andrwng Jun 18, 2024
e16c247
ntp_archiver: use partition_manifest_downlaoder for read replicas
andrwng Jun 17, 2024
13a1567
cluster: use partition_manifest_downloader in partition recovery
andrwng Jun 17, 2024
fe640c9
cluster: use partition_manifest_downloader in topic recovery validation
andrwng Jun 20, 2024
9d97ff6
cluster: use partition_manifest_downloader for unsafe reset from cloud
andrwng Jun 17, 2024
8cb5db2
cloud_storage: use partition_manifest_downloader in scrubber/anomaly …
andrwng Jun 20, 2024
ebf2fa1
remote_partition: use partition_manifest_downloader for manifest fina…
andrwng Jun 17, 2024
653c9ec
cloud_storage: remove try_download_partition_manifest
andrwng Jun 17, 2024
eaa7b95
cloud_storage: remove remote::partition_manifest_exists()
andrwng Jun 20, 2024
849aef7
cloud_storage: use path provider for spillovers in manifest view
andrwng Jun 19, 2024
5248547
remote_partition: use path provider for segment paths
andrwng Jun 18, 2024
3937454
cloud_storage: add utils for lifecycle marker paths
andrwng Jun 29, 2024
d10a60d
archival: use path provider to generate lifecycle marker paths
andrwng Jun 29, 2024
5d30320
archival: use the path provider throughout the purger
andrwng Jun 18, 2024
a0be06a
archival/segment_merger: use path provider for remote runs
andrwng Jun 18, 2024
d2681ba
admin: use path provider for anomalies report
andrwng Jun 19, 2024
1066344
features: add flag for remote labels
andrwng Jul 4, 2024
2d7e19a
config: property for disabling remote labels for tests
andrwng Jun 20, 2024
e7669a9
cloud_storage: clean up manifest path generation from manifests
andrwng Jun 20, 2024
46ed453
cloud_storage: clean up remote segment name generation
andrwng Jun 29, 2024
459106c
topic_manifest: remove feature table
andrwng Jun 22, 2024
902f522
rptest/services: make get_cluster_uuid() node optional
andrwng Jun 24, 2024
245b4f2
rptest: support remote labels in BucketView
andrwng Jun 24, 2024
df36562
rptest: option to change si_settings bucket
andrwng Jul 1, 2024
45c62a4
cluster: plug cluster uuid into new topics
andrwng Jun 29, 2024
b6ae520
config: enable remote labels by default
andrwng Jul 3, 2024
263d4b3
rptest: add remote_label_test
andrwng Jul 1, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 0 additions & 27 deletions src/v/cloud_storage/remote.cc
Original file line number Diff line number Diff line change
Expand Up @@ -231,33 +231,6 @@ ss::future<download_result> remote::maybe_download_manifest(
co_return co_await do_download_manifest(bucket, fk, manifest, parent, true);
}

ss::future<std::pair<download_result, manifest_format>>
remote::try_download_partition_manifest(
WillemKauf marked this conversation as resolved.
Show resolved Hide resolved
const cloud_storage_clients::bucket_name& bucket,
partition_manifest& manifest,
retry_chain_node& parent,
bool expect_missing) {
vassert(
manifest.get_ntp() != model::ntp{}
&& manifest.get_revision_id() != model::initial_revision_id{},
"partition manifest must have ntp");

// first try to download the serde format
auto format_path = manifest.get_manifest_format_and_path();
auto serde_result = co_await do_download_manifest(
bucket, format_path, manifest, parent, expect_missing);
if (serde_result != download_result::notfound) {
// propagate success, timedout and failed to caller
co_return std::pair{serde_result, manifest_format::serde};
}
// fallback to json format
format_path = manifest.get_legacy_manifest_format_and_path();
co_return std::pair{
co_await do_download_manifest(
bucket, format_path, manifest, parent, expect_missing),
manifest_format::json};
}

ss::future<download_result> remote::do_download_manifest(
const cloud_storage_clients::bucket_name& bucket,
const std::pair<manifest_format, remote_manifest_path>& format_key,
Expand Down
12 changes: 0 additions & 12 deletions src/v/cloud_storage/remote.h
Original file line number Diff line number Diff line change
Expand Up @@ -268,18 +268,6 @@ class remote
base_manifest& manifest,
retry_chain_node& parent);

/// \brief Try downloading partition_manifest. the function tries first the
/// manifest_format::serde path, and then manifest_format::json path. it's
/// expected that manifest is constructed with the approprieate npt and
/// revision_id, as it will be used to generate the paths return type is
/// download_result and index of path that generated the result
ss::future<std::pair<download_result, manifest_format>>
try_download_partition_manifest(
const cloud_storage_clients::bucket_name& bucket,
partition_manifest& manifest,
retry_chain_node& parent,
bool expect_missing = false);

/// \brief Upload manifest to the pre-defined S3 location
///
/// \param bucket is a bucket name
Expand Down
82 changes: 13 additions & 69 deletions src/v/cloud_storage/tests/remote_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include "cloud_storage/materialized_resources.h"
#include "cloud_storage/offset_translation_layer.h"
#include "cloud_storage/partition_manifest.h"
#include "cloud_storage/partition_manifest_downloader.h"
#include "cloud_storage/remote.h"
#include "cloud_storage/remote_segment.h"
#include "cloud_storage/tests/common_def.h"
Expand Down Expand Up @@ -88,15 +89,6 @@ static cloud_storage::lazy_abort_source always_continue{
static constexpr model::cloud_credentials_source config_file{
model::cloud_credentials_source::config_file};

static partition_manifest load_manifest_from_str(std::string_view v) {
partition_manifest m;
iobuf i;
i.append(v.data(), v.size());
auto s = make_iobuf_input_stream(std::move(i));
m.update(manifest_format::json, std::move(s)).get();
return m;
}

static remote::event_filter allow_all;

static iobuf make_iobuf_from_string(std::string_view s) {
Expand Down Expand Up @@ -154,24 +146,6 @@ using remote_fixture = remote_fixture_base<noop_mixin_t>;
using gcs_remote_fixture = remote_fixture_base<
backend_override_mixin_t<model::cloud_storage_backend::google_s3_compat>>;

static auto run_manifest_download_and_check(
auto& remote,
const cloud_storage_clients::bucket_name& bucket_name,
partition_manifest expected_manifest,
manifest_format expected_download_format,
std::string_view test_context) {
partition_manifest actual(manifest_ntp, manifest_revision);
retry_chain_node fib(never_abort, 100ms, 20ms);
auto [res, fmt] = remote.local()
.try_download_partition_manifest(
bucket_name, actual, fib)
.get();

EXPECT_TRUE(res == download_result::success);
EXPECT_TRUE(fmt == expected_download_format);
EXPECT_TRUE(expected_manifest == actual);
}

class all_types_remote_fixture
: public remote_fixture
, public testing::TestWithParam<remote_test_parameters> {
Expand All @@ -188,44 +162,6 @@ class all_types_gcs_remote_fixture
: gcs_remote_fixture(GetParam().url_style) {}
};

TEST_P(all_types_remote_fixture, test_download_manifest_json) {
set_expectations_and_listen({expectation{
.url = manifest_url, .body = ss::sstring(manifest_payload)}});
auto subscription = remote.local().subscribe(allow_all);
run_manifest_download_and_check(
remote,
bucket_name,
load_manifest_from_str(manifest_payload),
manifest_format::json,
"manifest load from json");
EXPECT_TRUE(subscription.available());
EXPECT_TRUE(
subscription.get().type == api_activity_type::manifest_download);
}

TEST_P(all_types_remote_fixture, test_download_manifest_serde) {
auto translator = load_manifest_from_str(manifest_payload);
auto serialized = translator.serialize().get();
auto manifest_binary
= serialized.stream.read_exactly(serialized.size_bytes).get();

set_expectations_and_listen({expectation{
.url = manifest_serde_url,
.body = ss::sstring{manifest_binary.begin(), manifest_binary.end()}}});

auto subscription = remote.local().subscribe(allow_all);
run_manifest_download_and_check(
remote,
bucket_name,
std::move(translator),
manifest_format::serde,
"manifest load from serde");

EXPECT_TRUE(subscription.available());
EXPECT_TRUE(
subscription.get().type == api_activity_type::manifest_download);
}

TEST_P(all_types_remote_fixture, test_download_manifest_timeout) { // NOLINT
partition_manifest actual(manifest_ntp, manifest_revision);
auto subscription = remote.local().subscribe(allow_all);
Expand Down Expand Up @@ -1378,8 +1314,15 @@ TEST_P(all_types_remote_fixture, test_notification_retry_meta) {
partition_manifest actual(manifest_ntp, manifest_revision);
auto filter = remote::event_filter{};

auto fut = remote.local().try_download_partition_manifest(
bucket_name, actual, fib);
remote_path_provider path_provider(std::nullopt);
partition_manifest_downloader dl(
bucket_name,
path_provider,
manifest_ntp,
manifest_revision,
remote.local());

auto fut = dl.download_manifest(fib, &actual);

RPTEST_REQUIRE_EVENTUALLY(2s, [&] {
auto sub = remote.local().subscribe(filter);
Expand All @@ -1388,8 +1331,9 @@ TEST_P(all_types_remote_fixture, test_notification_retry_meta) {
});
});

auto [res, fmt] = fut.get();
EXPECT_TRUE(res == download_result::timedout);
auto res = fut.get();
EXPECT_TRUE(res.has_error());
EXPECT_TRUE(res.error() == error_outcome::manifest_download_error);
}

TEST_P(all_types_remote_fixture, test_get_object) {
Expand Down
11 changes: 8 additions & 3 deletions src/v/cloud_storage/tests/util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
*/
#include "cloud_storage/tests/util.h"

#include "cloud_storage/partition_manifest_downloader.h"
#include "model/record.h"
#include "model/record_batch_types.h"

Expand Down Expand Up @@ -621,11 +622,15 @@ partition_manifest hydrate_manifest(
remote& api, const cloud_storage_clients::bucket_name& bucket) {
static ss::abort_source never_abort;

remote_path_provider path_provider(std::nullopt);
partition_manifest_downloader dl(
bucket, path_provider, manifest_ntp, manifest_revision, api);
partition_manifest m(manifest_ntp, manifest_revision);
ss::lowres_clock::update();
retry_chain_node rtc(never_abort, 300s, 200ms);
auto [res, _] = api.try_download_partition_manifest(bucket, m, rtc).get();
BOOST_REQUIRE(res == cloud_storage::download_result::success);
ss::lowres_clock::update();
auto res = dl.download_manifest(rtc, &m).get();
BOOST_REQUIRE(res.has_value());
BOOST_REQUIRE(res.value() == find_partition_manifest_outcome::success);
return m;
}

Expand Down