Skip to content

Commit

Permalink
Merge pull request #10559 from andijcr/feat/cstore/partition_manifest…
Browse files Browse the repository at this point in the history
…_serde

Feat/cstore/partition manifest serde
  • Loading branch information
jcsp authored May 17, 2023
2 parents 256a955 + 39f83e3 commit 56ec997
Show file tree
Hide file tree
Showing 48 changed files with 1,721 additions and 614 deletions.
7 changes: 2 additions & 5 deletions src/v/archival/ntp_archiver_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -674,12 +674,9 @@ ntp_archiver::download_manifest() {
_conf->cloud_storage_initial_backoff,
&_rtcnode);
cloud_storage::partition_manifest tmp(_ntp, _rev);
auto path = tmp.get_manifest_path();
auto key = cloud_storage::remote_manifest_path(
std::filesystem::path(std::move(path)));
vlog(_rtclog.debug, "Downloading manifest");
auto result = co_await _remote.download_manifest(
get_bucket_name(), key, tmp, fib);
auto [result, _] = co_await _remote.try_download_partition_manifest(
get_bucket_name(), tmp, fib);

// It's OK if the manifest is not found for a newly created topic. The
// condition in if statement is not guaranteed to cover all cases for new
Expand Down
11 changes: 7 additions & 4 deletions src/v/archival/scrubber.cc
Original file line number Diff line number Diff line change
Expand Up @@ -78,15 +78,18 @@ ss::future<scrubber::purge_result> scrubber::purge_partition(
// could have a "drop out on slowdown" flag?

cloud_storage::partition_manifest manifest(ntp, remote_revision);
auto manifest_path = manifest.get_manifest_path();
auto manifest_get_result = co_await _api.maybe_download_manifest(
bucket, manifest_path, manifest, manifest_rtc);
auto [manifest_get_result, manifest_fmt]
= co_await _api.try_download_partition_manifest(
bucket, manifest, manifest_rtc, true);

// save path here since manifest will get moved out
auto manifest_path = manifest.get_manifest_path(manifest_fmt);

if (manifest_get_result == download_result::notfound) {
vlog(
archival_log.debug,
"Partition manifest get {} not found",
manifest_path);
manifest.get_legacy_manifest_format_and_path().second);
result.status = purge_status::permanent_failure;
co_return result;
} else if (manifest_get_result != download_result::success) {
Expand Down
4 changes: 2 additions & 2 deletions src/v/archival/tests/archival_metadata_stm_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -322,8 +322,8 @@ FIXTURE_TEST(test_snapshot_loading, archival_metadata_stm_base_fixture) {

{
std::stringstream s1, s2;
m.serialize(s1);
archival_stm.manifest().serialize(s2);
m.serialize_json(s1);
archival_stm.manifest().serialize_json(s2);
vlog(logger.info, "original manifest: {}", s1.str());
vlog(logger.info, "restored manifest: {}", s2.str());
}
Expand Down
9 changes: 6 additions & 3 deletions src/v/archival/tests/ntp_archiver_reupload_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ static const auto manifest_ntp = model::ntp(
manifest_namespace, manifest_topic, manifest_partition);
static const auto manifest_revision = model::initial_revision_id(0);
static const ss::sstring manifest_url = ssx::sformat(
"/10000000/meta/{}_{}/manifest.json",
"/10000000/meta/{}_{}/manifest.bin",
manifest_ntp.path(),
manifest_revision());

Expand Down Expand Up @@ -281,7 +281,10 @@ class archival_metadata_stm_accessor {
i.append(json.data(), json.size());
cloud_storage::partition_manifest m;

m.update(make_iobuf_input_stream(std::move(i))).get();
m.update(
cloud_storage::manifest_format::json,
make_iobuf_input_stream(std::move(i)))
.get();

stm._manifest = ss::make_shared<cloud_storage::partition_manifest>(
std::move(m));
Expand Down Expand Up @@ -447,7 +450,7 @@ FIXTURE_TEST(
BOOST_REQUIRE_EQUAL(get_requests().size(), 3);

std::stringstream st;
stm_manifest.serialize(st);
stm_manifest.serialize_json(st);
vlog(test_log.debug, "manifest: {}", st.str());
verify_segment_request("500-1-v1.log", stm_manifest);

Expand Down
2 changes: 1 addition & 1 deletion src/v/archival/tests/ntp_archiver_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ static const auto manifest_ntp = model::ntp( // NOLINT
manifest_partition);
static const auto manifest_revision = model::initial_revision_id(0); // NOLINT
static const ss::sstring manifest_url = ssx::sformat( // NOLINT
"/10000000/meta/{}_{}/manifest.json",
"/10000000/meta/{}_{}/manifest.bin",
manifest_ntp.path(),
manifest_revision());

Expand Down
71 changes: 54 additions & 17 deletions src/v/archival/tests/segment_reupload_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,9 @@ static constexpr ss::lowres_clock::duration segment_lock_timeout{60s};

SEASTAR_THREAD_TEST_CASE(test_segment_collection) {
cloud_storage::partition_manifest m;
m.update(make_manifest_stream(manifest)).get();
m.update(
cloud_storage::manifest_format::json, make_manifest_stream(manifest))
.get();

temporary_dir tmp_dir("concat_segment_read");
auto data_path = tmp_dir.get_path();
Expand Down Expand Up @@ -124,7 +126,9 @@ SEASTAR_THREAD_TEST_CASE(test_segment_collection) {

SEASTAR_THREAD_TEST_CASE(test_start_ahead_of_manifest) {
cloud_storage::partition_manifest m;
m.update(make_manifest_stream(manifest)).get();
m.update(
cloud_storage::manifest_format::json, make_manifest_stream(manifest))
.get();

temporary_dir tmp_dir("concat_segment_read");
auto data_path = tmp_dir.get_path();
Expand Down Expand Up @@ -183,7 +187,9 @@ SEASTAR_THREAD_TEST_CASE(test_empty_manifest) {

SEASTAR_THREAD_TEST_CASE(test_short_compacted_segment_inside_manifest_segment) {
cloud_storage::partition_manifest m;
m.update(make_manifest_stream(manifest)).get();
m.update(
cloud_storage::manifest_format::json, make_manifest_stream(manifest))
.get();

temporary_dir tmp_dir("concat_segment_read");
auto data_path = tmp_dir.get_path();
Expand Down Expand Up @@ -216,7 +222,9 @@ SEASTAR_THREAD_TEST_CASE(test_short_compacted_segment_inside_manifest_segment) {

SEASTAR_THREAD_TEST_CASE(test_compacted_segment_aligned_with_manifest_segment) {
cloud_storage::partition_manifest m;
m.update(make_manifest_stream(manifest)).get();
m.update(
cloud_storage::manifest_format::json, make_manifest_stream(manifest))
.get();

temporary_dir tmp_dir("concat_segment_read");
auto data_path = tmp_dir.get_path();
Expand Down Expand Up @@ -250,7 +258,9 @@ SEASTAR_THREAD_TEST_CASE(test_compacted_segment_aligned_with_manifest_segment) {
SEASTAR_THREAD_TEST_CASE(
test_short_compacted_segment_aligned_with_manifest_segment) {
cloud_storage::partition_manifest m;
m.update(make_manifest_stream(manifest)).get();
m.update(
cloud_storage::manifest_format::json, make_manifest_stream(manifest))
.get();

temporary_dir tmp_dir("concat_segment_read");
auto data_path = tmp_dir.get_path();
Expand Down Expand Up @@ -286,7 +296,9 @@ SEASTAR_THREAD_TEST_CASE(
SEASTAR_THREAD_TEST_CASE(
test_many_compacted_segments_make_up_to_manifest_segment) {
cloud_storage::partition_manifest m;
m.update(make_manifest_stream(manifest)).get();
m.update(
cloud_storage::manifest_format::json, make_manifest_stream(manifest))
.get();

temporary_dir tmp_dir("concat_segment_read");
auto data_path = tmp_dir.get_path();
Expand Down Expand Up @@ -319,7 +331,9 @@ SEASTAR_THREAD_TEST_CASE(

SEASTAR_THREAD_TEST_CASE(test_compacted_segment_larger_than_manifest_segment) {
cloud_storage::partition_manifest m;
m.update(make_manifest_stream(manifest)).get();
m.update(
cloud_storage::manifest_format::json, make_manifest_stream(manifest))
.get();

temporary_dir tmp_dir("concat_segment_read");
auto data_path = tmp_dir.get_path();
Expand Down Expand Up @@ -355,7 +369,9 @@ SEASTAR_THREAD_TEST_CASE(test_compacted_segment_larger_than_manifest_segment) {

SEASTAR_THREAD_TEST_CASE(test_collect_capped_by_size) {
cloud_storage::partition_manifest m;
m.update(make_manifest_stream(manifest)).get();
m.update(
cloud_storage::manifest_format::json, make_manifest_stream(manifest))
.get();

temporary_dir tmp_dir("concat_segment_read");
auto data_path = tmp_dir.get_path();
Expand Down Expand Up @@ -402,7 +418,9 @@ SEASTAR_THREAD_TEST_CASE(test_collect_capped_by_size) {

SEASTAR_THREAD_TEST_CASE(test_no_compacted_segments) {
cloud_storage::partition_manifest m;
m.update(make_manifest_stream(manifest)).get();
m.update(
cloud_storage::manifest_format::json, make_manifest_stream(manifest))
.get();

temporary_dir tmp_dir("concat_segment_read");
auto data_path = tmp_dir.get_path();
Expand Down Expand Up @@ -430,7 +448,9 @@ SEASTAR_THREAD_TEST_CASE(test_no_compacted_segments) {

SEASTAR_THREAD_TEST_CASE(test_segment_name_adjustment) {
cloud_storage::partition_manifest m;
m.update(make_manifest_stream(manifest)).get();
m.update(
cloud_storage::manifest_format::json, make_manifest_stream(manifest))
.get();

temporary_dir tmp_dir("concat_segment_read");
auto data_path = tmp_dir.get_path();
Expand All @@ -457,7 +477,9 @@ SEASTAR_THREAD_TEST_CASE(test_segment_name_adjustment) {

SEASTAR_THREAD_TEST_CASE(test_segment_name_no_adjustment) {
cloud_storage::partition_manifest m;
m.update(make_manifest_stream(manifest)).get();
m.update(
cloud_storage::manifest_format::json, make_manifest_stream(manifest))
.get();

temporary_dir tmp_dir("concat_segment_read");
auto data_path = tmp_dir.get_path();
Expand All @@ -484,7 +506,10 @@ SEASTAR_THREAD_TEST_CASE(test_segment_name_no_adjustment) {

SEASTAR_THREAD_TEST_CASE(test_collected_segments_completely_cover_gap) {
cloud_storage::partition_manifest m;
m.update(make_manifest_stream(manifest_with_gaps)).get();
m.update(
cloud_storage::manifest_format::json,
make_manifest_stream(manifest_with_gaps))
.get();

using namespace storage;

Expand Down Expand Up @@ -581,7 +606,10 @@ SEASTAR_THREAD_TEST_CASE(test_collected_segments_completely_cover_gap) {

SEASTAR_THREAD_TEST_CASE(test_collection_starts_in_gap) {
cloud_storage::partition_manifest m;
m.update(make_manifest_stream(manifest_with_gaps)).get();
m.update(
cloud_storage::manifest_format::json,
make_manifest_stream(manifest_with_gaps))
.get();

using namespace storage;

Expand Down Expand Up @@ -613,7 +641,10 @@ SEASTAR_THREAD_TEST_CASE(test_collection_starts_in_gap) {

SEASTAR_THREAD_TEST_CASE(test_collection_ends_in_gap) {
cloud_storage::partition_manifest m;
m.update(make_manifest_stream(manifest_with_gaps)).get();
m.update(
cloud_storage::manifest_format::json,
make_manifest_stream(manifest_with_gaps))
.get();

using namespace storage;

Expand Down Expand Up @@ -645,7 +676,9 @@ SEASTAR_THREAD_TEST_CASE(test_collection_ends_in_gap) {

SEASTAR_THREAD_TEST_CASE(test_compacted_segment_after_manifest_start) {
cloud_storage::partition_manifest m;
m.update(make_manifest_stream(manifest)).get();
m.update(
cloud_storage::manifest_format::json, make_manifest_stream(manifest))
.get();

using namespace storage;

Expand Down Expand Up @@ -677,7 +710,9 @@ SEASTAR_THREAD_TEST_CASE(test_compacted_segment_after_manifest_start) {

SEASTAR_THREAD_TEST_CASE(test_upload_candidate_generation) {
cloud_storage::partition_manifest m;
m.update(make_manifest_stream(manifest)).get();
m.update(
cloud_storage::manifest_format::json, make_manifest_stream(manifest))
.get();

temporary_dir tmp_dir("concat_segment_read");
auto data_path = tmp_dir.get_path();
Expand Down Expand Up @@ -749,7 +784,9 @@ SEASTAR_THREAD_TEST_CASE(test_upload_candidate_generation) {

SEASTAR_THREAD_TEST_CASE(test_upload_aligned_to_non_existent_offset) {
cloud_storage::partition_manifest m;
m.update(make_manifest_stream(manifest)).get();
m.update(
cloud_storage::manifest_format::json, make_manifest_stream(manifest))
.get();

temporary_dir tmp_dir("concat_segment_read");
auto data_path = tmp_dir.get_path();
Expand Down
18 changes: 4 additions & 14 deletions src/v/archival/tests/service_fixture.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include "archival/types.h"
#include "bytes/iobuf.h"
#include "bytes/iobuf_parser.h"
#include "cloud_storage/base_manifest.h"
#include "cloud_storage/remote_segment.h"
#include "cloud_storage_clients/configuration.h"
#include "cluster/archival_metadata_stm.h"
Expand Down Expand Up @@ -94,16 +95,6 @@ archiver_fixture::~archiver_fixture() {
pool.stop().get();
}

static cloud_storage::partition_manifest
load_manifest_from_str(std::string_view v) {
cloud_storage::partition_manifest m;
iobuf i;
i.append(v.data(), v.size());
auto s = make_iobuf_input_stream(std::move(i));
m.update(std::move(s)).get();
return m;
}

static void write_batches(
ss::lw_shared_ptr<storage::segment> seg,
ss::circular_buffer<model::record_batch> batches) { // NOLINT
Expand Down Expand Up @@ -480,19 +471,18 @@ void segment_matcher<Fixture>::verify_manifest(
template<class Fixture>
void segment_matcher<Fixture>::verify_manifest_content(
const ss::sstring& manifest_content) {
cloud_storage::partition_manifest m = load_manifest_from_str(
manifest_content);
cloud_storage::partition_manifest m = load_manifest(manifest_content);
verify_manifest(m);
}

template class segment_matcher<archiver_fixture>;

cloud_storage::partition_manifest load_manifest(std::string_view v) {
cloud_storage::partition_manifest m;
iobuf i;
i.append(v.data(), v.size());
auto s = make_iobuf_input_stream(std::move(i));
m.update(std::move(s)).get();
cloud_storage::partition_manifest m;
m.update(cloud_storage::manifest_format::serde, std::move(s)).get();
return m;
}

Expand Down
11 changes: 11 additions & 0 deletions src/v/cloud_storage/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,7 +1,17 @@
v_cc_library(
NAME segment_meta_cstore
SRCS
segment_meta_cstore.cc
DEPS
v::bytes
v::model
absl::btree
)

v_cc_library(
NAME cloud_storage
SRCS
base_manifest.cc
cache_service.cc
access_time_tracker.cc
cache_probe.cc
Expand Down Expand Up @@ -35,6 +45,7 @@ v_cc_library(
v::model
v::rphashing
v::cloud_roles
v::segment_meta_cstore
# NOTE: do not add v::cloud as a dependency
)
add_subdirectory(tests)
16 changes: 16 additions & 0 deletions src/v/cloud_storage/base_manifest.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
/*
* Copyright 2022 Redpanda Data, Inc.
*
* Licensed as a Redpanda Enterprise file under the Redpanda Community
* License (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md
*/

#include "cloud_storage/base_manifest.h"

namespace cloud_storage {

base_manifest::~base_manifest() = default;
} // namespace cloud_storage
Loading

0 comments on commit 56ec997

Please sign in to comment.