cst/inv: report parser follow ups #21497

Merged
23 changes: 15 additions & 8 deletions src/v/cloud_storage/inventory/inv_consumer.cc
@@ -21,9 +21,11 @@
#include <seastar/core/file.hh>
#include <seastar/core/fstream.hh>
#include <seastar/core/seastar.hh>
#include <seastar/coroutine/as_future.hh>

#include <re2/re2.h>

#include <exception>
#include <ranges>

namespace ranges = std::ranges;
@@ -32,9 +34,18 @@ namespace views = std::views;
namespace {
// hash-string/ns/tp/partition_rev/.*
const RE2 path_expr{"^[[:xdigit:]]+/(.*?)/(.*?)/(\\d+)_\\d+/.*?"};

// Holds hashes for a given NTP in memory before they will be flushed to disk.
// One of these structures is held per NTP in a map keyed by the NTP itself.
struct flush_entry {
model::ntp ntp;
// The vector of hashes which will be written to disk in the file_name on
// next flush.
fragmented_vector<uint64_t> hashes;
// The file to which hashes will be written. The file name is incremented on
// each flush, so each file name contains some hashes, and we may end up
// with multiple files per NTP. The hash loader will read all the files and
// collect the hashes together.
uint64_t file_name;
};
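
A minimal sketch of how a flush might use this state, purely for illustration: the _flush_states map, the flush_ntp helper and a write_hashes_to_file(path, hashes) overload are assumptions here, not the actual consumer code (the real write path appears in the next hunk).

// Sketch only. Assumes something like
//   absl::node_hash_map<model::ntp, flush_entry> _flush_states;
// holding one entry per NTP, plus a write_hashes_to_file(path, hashes)
// overload; neither is necessarily the real consumer interface.
// Needs <filesystem>, <utility> and fmt/format.h beyond the includes above.
ss::future<> flush_ntp(flush_entry& e, const std::filesystem::path& dir) {
    // Drain the staged hashes and bump the file name, so successive flushes
    // of the same NTP produce files 0, 1, 2, ... which the hash loader later
    // reads back and merges.
    auto hashes = std::exchange(e.hashes, {});
    auto path = dir / fmt::format("{}", e.file_name++);
    co_await write_hashes_to_file(path, std::move(hashes));
}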

@@ -213,15 +224,11 @@ ss::future<> inventory_consumer::write_hashes_to_file(
vlog(cst_log.trace, "Writing {} hashe(s) to disk", hashes.size());
std::exception_ptr ep;
auto stream = co_await ss::make_file_output_stream(f);
try {
co_await write_hashes_to_file(stream, std::move(hashes));
} catch (...) {
ep = std::current_exception();
}

auto res = co_await ss::coroutine::as_future(
write_hashes_to_file(stream, std::move(hashes)));
co_await stream.close();
if (ep) {
std::rethrow_exception(ep);
if (res.failed()) {
std::rethrow_exception(res.get_exception());
}
}

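The switch above from try/catch to ss::coroutine::as_future keeps the write's outcome (value or exception) in a future, so the stream is always closed before any failure is rethrown. A generic sketch of the same pattern, with illustrative names:

#include <seastar/core/coroutine.hh>
#include <seastar/core/iostream.hh>
#include <seastar/core/sstring.hh>
#include <seastar/coroutine/as_future.hh>

#include <exception>

namespace ss = seastar;

// Write a payload and always close the stream, surfacing any write error
// only after close() has completed.
ss::future<> write_then_close(ss::output_stream<char> out, ss::sstring payload) {
    // as_future() captures the exception instead of throwing it here.
    auto res = co_await ss::coroutine::as_future(
      out.write(payload.data(), payload.size()));
    // Runs unconditionally; the old code needed an exception_ptr stash for this.
    co_await out.close();
    if (res.failed()) {
        std::rethrow_exception(res.get_exception());
    }
}
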
5 changes: 3 additions & 2 deletions src/v/cloud_storage/inventory/inv_consumer.h
@@ -68,7 +68,8 @@ class inventory_consumer {
ss::future<> process_paths(fragmented_vector<ss::sstring> paths);

// Checks if the path belongs to one of the NTPs whose leadership belongs to
// this node. If so, the path is hashed and added to the `_path_hashes` map.
// this node. If so, the path is hashed and added to current NTP flush
// states, and will be written to disk on the next flush operation.
void process_path(ss::sstring path);

// Writes the largest hash vectors to disk. The vectors are written in files
@@ -111,7 +112,7 @@ class inventory_consumer {
// performed to free up memory.
size_t _max_hash_size_in_memory;

// Total size of path hashes held in memory
// Total size in bytes of path hashes held in memory
size_t _total_size{};

size_t _num_flushes{};
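
Per the updated comment, a rough sketch of the process_path flow; this is not the real implementation, and is_leader(), add_hash() and make_ntp() are hypothetical placeholders (path_expr is the regex from inv_consumer.cc, and xxhash_64 is assumed as the path hash, as in hashing/xx.h):

// Sketch only; helper names are placeholders.
void process_path_sketch(const std::string& path) {
    std::string ns, tp;
    uint32_t partition{0};
    // path_expr: ^[[:xdigit:]]+/(.*?)/(.*?)/(\d+)_\d+/.*?
    if (!RE2::FullMatch(path, path_expr, &ns, &tp, &partition)) {
        return; // not a partition data path, nothing to track
    }
    const auto ntp = make_ntp(ns, tp, partition); // hypothetical helper
    if (!is_leader(ntp)) {                        // hypothetical leadership check
        return; // only paths for NTPs led by this node are hashed
    }
    // Hash the full path and stage it in the NTP's flush state; the added
    // bytes count towards _total_size, and crossing _max_hash_size_in_memory
    // triggers a flush of the largest hash vectors to disk.
    add_hash(ntp, xxhash_64(path.data(), path.size()));
}
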
6 changes: 3 additions & 3 deletions src/v/cloud_storage/inventory/ntp_hashes.cc
@@ -11,6 +11,7 @@

#include "cloud_storage/inventory/ntp_hashes.h"

#include "base/likely.h"
#include "cloud_storage/logger.h"
#include "container/fragmented_vector.h"
#include "hashing/xx.h"
@@ -94,10 +95,9 @@ ss::future<> ntp_path_hashes::load_hashes(ss::input_stream<char>& stream) {
vlog(_ctxlog.trace, "read {} path hashes from disk", hashes.size());

for (auto hash : hashes) {
if (unlikely(_path_hashes.contains(hash))) {
auto [_, inserted] = _path_hashes.insert(hash);
if (unlikely(!inserted)) {
_possible_collisions.insert(hash);
} else {
_path_hashes.insert(hash);
}
}
}
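
The loader change above replaces a contains() check followed by insert() with a single insert(), whose returned bool says whether the hash was already present; repeats are recorded as possible collisions. The same pattern in isolation, as plain standard C++:

#include <cstdint>
#include <unordered_set>
#include <vector>

// Returns the hash values seen more than once (possible collisions).
std::unordered_set<uint64_t>
find_possible_collisions(const std::vector<uint64_t>& hashes) {
    std::unordered_set<uint64_t> seen;
    std::unordered_set<uint64_t> collisions;
    for (auto h : hashes) {
        // insert() does a single lookup and reports through the second member
        // of the returned pair whether the value was newly inserted.
        const bool inserted = seen.insert(h).second;
        if (!inserted) {
            collisions.insert(h);
        }
    }
    return collisions;
}
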
12 changes: 6 additions & 6 deletions src/v/cloud_storage/inventory/tests/inv_consumer_tests.cc
@@ -103,7 +103,7 @@ constexpr auto compressed

} // namespace

TEST(ParseNTPFromPath, Consumer) {
TEST(Consumer, ParseNTPFromPath) {
using p = std::pair<std::string_view, std::optional<model::ntp>>;
std::vector<p> test_data{
{"a0a6eeb8/kafka/topic-x/999_24/178-188-1574137-1-v1.log.1",
@@ -130,7 +130,7 @@ TEST(ParseNTPFromPath, Consumer) {
}
}

TEST(ParseCSV, Consumer) {
TEST(Consumer, ParseCSV) {
inventory_consumer c{"", {}, 0};
using p = std::pair<const char*, std::vector<ss::sstring>>;
std::vector<p> test_data{
@@ -152,7 +152,7 @@ TEST(ParseCSV, Consumer) {
}
}

TEST(LargeNumberOfPaths, Consumer) {
TEST(Consumer, LargeNumberOfPaths) {
temporary_dir t{"test_inv_consumer"};
const auto p0 = make_ntp("kafka", "topic-A", 110);

@@ -183,7 +183,7 @@ TEST(LargeNumberOfPaths, Consumer) {
}
}

TEST(WriteThenReadHashes, Consumer) {
TEST(Consumer, WriteThenReadHashes) {
temporary_dir t{"test_inv_consumer"};

const auto p0 = make_ntp("kafka", "partagas", 0);
@@ -224,7 +224,7 @@ TEST(WriteThenReadHashes, Consumer) {
}
}

TEST(CollisionDetection, Consumer) {
TEST(Consumer, CollisionDetection) {
temporary_dir t{"test_inv_consumer"};
const auto p0 = make_ntp("kafka", "topic-A", 110);

@@ -247,7 +247,7 @@ TEST(CollisionDetection, Consumer) {
h.stop().get();
}

TEST(ConsumeMultipleStreams, Consumer) {
TEST(Consumer, ConsumeMultipleStreams) {
temporary_dir t{"test_inv_consumer"};
const auto p0 = make_ntp("kafka", "partagas", 0);

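The renames above fix the argument order of the TEST macro: GoogleTest takes the suite name first and the test name second, so these cases now group under one Consumer suite and can be selected together, e.g. with --gtest_filter='Consumer.*'. In general:

#include <gtest/gtest.h>

// TEST(TestSuiteName, TestName): suite first, then the individual case name.
TEST(Consumer, SuiteNamingExample) {
    EXPECT_EQ(2 + 2, 4);
}
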
12 changes: 6 additions & 6 deletions src/v/cloud_storage/inventory/tests/report_parser_tests.cc
@@ -50,7 +50,7 @@ report_parser make_parser(
}
} // namespace

TEST(ParseReport, Parser) {
TEST(Parser, ParseReport) {
auto input = std::vector<ss::sstring>{"foo", "bar", "x"};
for (const auto compression :
{is_gzip_compressed::no, is_gzip_compressed::yes}) {
@@ -59,7 +59,7 @@ TEST(ParseReport, Parser) {
}
}

TEST(ParseEmptyString, Parser) {
TEST(Parser, ParseEmptyString) {
std::string input{};
for (const auto compression :
{is_gzip_compressed::no, is_gzip_compressed::yes}) {
@@ -68,7 +68,7 @@ TEST(ParseEmptyString, Parser) {
}
}

TEST(ParseTrailingNewLine, Parser) {
TEST(Parser, ParseTrailingNewLine) {
std::string input{"foobar\n"};
std::vector<ss::sstring> expected{"foobar"};
for (const auto compression :
@@ -78,7 +78,7 @@ TEST(ParseTrailingNewLine, Parser) {
}
}

TEST(ParseLargePayloadWithSmallChunkSize, Parser) {
TEST(Parser, ParseLargePayloadWithSmallChunkSize) {
std::vector<ss::sstring> input;
input.reserve(1024);
for (auto i = 0; i < 1024; ++i) {
@@ -91,7 +91,7 @@ TEST(ParseLargePayloadWithSmallChunkSize, Parser) {
}
}

TEST(ParseGzippedFile, Parser) {
TEST(Parser, ParseGzippedFile) {
// This is the base64 encoding of a gzipped file containing the strings 1, 2
// and 3, one per line.
constexpr auto gzipped_data
@@ -102,7 +102,7 @@ TEST(ParseGzippedFile, Parser) {
EXPECT_EQ(collect(p), (std::vector<ss::sstring>{"1", "2", "3"}));
}

TEST(BatchSizeControlledByChunkSize, Parser) {
TEST(Parser, BatchSizeControlledByChunkSize) {
size_t chunk_size{4096};

std::vector<ss::sstring> input;
16 changes: 8 additions & 8 deletions src/v/compression/tests/gzip_stream_decompression_tests.cc
@@ -13,6 +13,7 @@
#include "bytes/iostream.h"
#include "compression/gzip_stream_decompression.h"
#include "compression/internal/gzip_compressor.h"
#include "test_utils/randoms.h"
#include "utils/base64.h"

#include <seastar/core/iostream.hh>
@@ -24,9 +25,8 @@ using namespace compression;
namespace {

ss::input_stream<char> make_compressed_stream(ss::sstring s) {
iobuf b;
b.append(s.data(), s.size());
return make_iobuf_input_stream(internal::gzip_compressor::compress(b));
return make_iobuf_input_stream(
internal::gzip_compressor::compress(iobuf::from(s)));
}

ss::sstring consume(gzip_stream_decompressor& gsd) {
@@ -45,8 +45,8 @@

} // namespace

TEST(SmallChunkSize, GzStrm) {
ss::sstring test_string(100000, 'a');
TEST(GzStrm, SmallChunkSize) {
auto test_string = tests::random_named_string<ss::sstring>(100000);
gzip_stream_decompressor gsd{make_compressed_stream(test_string), 5};
gsd.reset();
auto output = consume(gsd);
@@ -55,7 +55,7 @@ TEST(SmallChunkSize, GzStrm) {
gsd.stop().get();
}

TEST(SmallPayloadLargeChunkSize, GzStrm) {
TEST(GzStrm, SmallPayloadLargeChunkSize) {
ss::sstring test_string("xyz");
gzip_stream_decompressor gsd{make_compressed_stream(test_string), 4096};
gsd.reset();
@@ -65,7 +65,7 @@ TEST(SmallPayloadLargeChunkSize, GzStrm) {
gsd.stop().get();
}

TEST(EmptyString, GzStrm) {
TEST(GzStrm, EmptyString) {
ss::sstring test_string;
gzip_stream_decompressor gsd{make_compressed_stream(test_string), 4096};
gsd.reset();
@@ -75,7 +75,7 @@ TEST(EmptyString, GzStrm) {
gsd.stop().get();
}

TEST(NonStandardCompressionMethod, GzStrm) {
TEST(GzStrm, NonStandardCompressionMethod) {
// Tests payload compressed with gzip -9
const auto payload
= "H4sICG5pgmYCA21vYnktZGlja2FoAFVXwY7kthG9N9D/"
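
For context, the shape of the round trip these tests exercise, condensed into a single case that reuses this file's make_compressed_stream() and consume() helpers (chunk size and payload length are arbitrary choices):

TEST(GzStrm, RoundTripSketch) {
    // Random payload, gzip-compressed into an in-memory input stream.
    auto test_string = tests::random_named_string<ss::sstring>(1000);
    // A small chunk size forces decompression to proceed in many small steps.
    gzip_stream_decompressor gsd{make_compressed_stream(test_string), 16};
    gsd.reset();
    EXPECT_EQ(consume(gsd), test_string);
    gsd.stop().get();
}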