Merge pull request #21497 from abhijat/feat/inv-scrub/report-parser-follow-ups

cst/inv: report parser follow ups
abhijat authored Jul 19, 2024
2 parents f8bc2f8 + 69c623e commit da183a4
Showing 6 changed files with 41 additions and 33 deletions.
23 changes: 15 additions & 8 deletions src/v/cloud_storage/inventory/inv_consumer.cc
@@ -21,9 +21,11 @@
#include <seastar/core/file.hh>
#include <seastar/core/fstream.hh>
#include <seastar/core/seastar.hh>
#include <seastar/coroutine/as_future.hh>

#include <re2/re2.h>

#include <exception>
#include <ranges>

namespace ranges = std::ranges;
@@ -32,9 +34,18 @@ namespace views = std::views;
namespace {
// hash-string/ns/tp/partition_rev/.*
const RE2 path_expr{"^[[:xdigit:]]+/(.*?)/(.*?)/(\\d+)_\\d+/.*?"};

// Holds hashes for a given NTP in memory before they are flushed to disk.
// One of these structures is held per NTP in a map keyed by the NTP itself.
struct flush_entry {
model::ntp ntp;
// The vector of hashes which will be written to disk under file_name on the
// next flush.
fragmented_vector<uint64_t> hashes;
// The file to which hashes will be written. The file name is incremented on
// each flush, so each file contains some of the hashes, and we may end up
// with multiple files per NTP. The hash loader will read all the files and
// collect the hashes together.
uint64_t file_name;
};
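The new comments describe per-NTP staging: hashes accumulate in memory and are periodically flushed to numbered files, with one set of files per NTP. A rough sketch of that bookkeeping using standard containers (the map name and the std:: types are stand-ins, not the actual Redpanda members):

#include <cstdint>
#include <map>
#include <string>
#include <vector>

// Stand-in for flush_entry above: hashes staged since the last flush plus the
// numeric name of the next file to write for this NTP.
struct flush_entry_sketch {
    std::vector<uint64_t> hashes;
    uint64_t file_name{0};
};

// One entry per NTP, keyed by the NTP (a string here for simplicity).
std::map<std::string, flush_entry_sketch> flush_states;

void stage_hash(const std::string& ntp, uint64_t hash) {
    flush_states[ntp].hashes.push_back(hash);
}

void on_flushed(const std::string& ntp) {
    auto& e = flush_states[ntp];
    e.hashes.clear();
    // Incrementing the file name means repeated flushes produce multiple
    // files per NTP, which a later loader reads back and merges.
    ++e.file_name;
}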

@@ -213,15 +224,11 @@ ss::future<> inventory_consumer::write_hashes_to_file(
vlog(cst_log.trace, "Writing {} hashe(s) to disk", hashes.size());
std::exception_ptr ep;
auto stream = co_await ss::make_file_output_stream(f);
try {
co_await write_hashes_to_file(stream, std::move(hashes));
} catch (...) {
ep = std::current_exception();
}

auto res = co_await ss::coroutine::as_future(
write_hashes_to_file(stream, std::move(hashes)));
co_await stream.close();
if (ep) {
std::rethrow_exception(ep);
if (res.failed()) {
std::rethrow_exception(res.get_exception());
}
}
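This hunk replaces the manual try/catch and std::exception_ptr bookkeeping with ss::coroutine::as_future, so the output stream is always closed before any failure from the write is rethrown. A minimal sketch of the pattern, assuming Seastar; write_all and write_then_close are illustrative names, not the Redpanda functions:

#include <exception>

#include <seastar/core/coroutine.hh>
#include <seastar/core/iostream.hh>
#include <seastar/coroutine/as_future.hh>

namespace ss = seastar;

ss::future<> write_all(ss::output_stream<char>& out) {
    // Stand-in for the real write step; may throw.
    co_await out.write("payload", 7);
}

ss::future<> write_then_close(ss::output_stream<char> stream) {
    // Capture the outcome in a future instead of letting an exception escape
    // before the stream is closed.
    auto res = co_await ss::coroutine::as_future(write_all(stream));
    // Cleanup runs on both the success and failure paths.
    co_await stream.close();
    if (res.failed()) {
        // Surface the original error only after the stream is closed.
        std::rethrow_exception(res.get_exception());
    }
}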

5 changes: 3 additions & 2 deletions src/v/cloud_storage/inventory/inv_consumer.h
@@ -68,7 +68,8 @@ class inventory_consumer {
ss::future<> process_paths(fragmented_vector<ss::sstring> paths);

// Checks if the path belongs to one of the NTPs whose leadership belongs to
// this node. If so, the path is hashed and added to the `_path_hashes` map.
// this node. If so, the path is hashed and added to the current NTP flush
// state, and will be written to disk on the next flush operation.
void process_path(ss::sstring path);

// Writes the largest hash vectors to disk. The vectors are written in files
@@ -111,7 +112,7 @@ class inventory_consumer {
// performed to free up memory.
size_t _max_hash_size_in_memory;

// Total size of path hashes held in memory
// Total size in bytes of path hashes held in memory
size_t _total_size{};

size_t _num_flushes{};
6 changes: 3 additions & 3 deletions src/v/cloud_storage/inventory/ntp_hashes.cc
@@ -11,6 +11,7 @@

#include "cloud_storage/inventory/ntp_hashes.h"

#include "base/likely.h"
#include "cloud_storage/logger.h"
#include "container/fragmented_vector.h"
#include "hashing/xx.h"
@@ -94,10 +95,9 @@ ss::future<> ntp_path_hashes::load_hashes(ss::input_stream<char>& stream) {
vlog(_ctxlog.trace, "read {} path hashes from disk", hashes.size());

for (auto hash : hashes) {
if (unlikely(_path_hashes.contains(hash))) {
auto [_, inserted] = _path_hashes.insert(hash);
if (unlikely(!inserted)) {
_possible_collisions.insert(hash);
} else {
_path_hashes.insert(hash);
}
}
}
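The loader change replaces a contains() check followed by an insert() with a single insert() whose return value says whether the hash was already present, avoiding a second hash-table lookup and making the collision branch explicit. A standalone sketch of the pattern with std::unordered_set (the real container behind _path_hashes may differ):

#include <cstdint>
#include <iostream>
#include <unordered_set>
#include <vector>

int main() {
    std::unordered_set<uint64_t> path_hashes;
    std::unordered_set<uint64_t> possible_collisions;

    const std::vector<uint64_t> hashes{11, 42, 42, 7};
    for (auto hash : hashes) {
        // insert() returns {iterator, bool}; the bool is false when the value
        // was already present, i.e. a possible collision.
        auto [it, inserted] = path_hashes.insert(hash);
        if (!inserted) {
            possible_collisions.insert(hash);
        }
    }
    std::cout << possible_collisions.size() << "\n"; // prints 1 (the repeated 42)
}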
12 changes: 6 additions & 6 deletions src/v/cloud_storage/inventory/tests/inv_consumer_tests.cc
@@ -103,7 +103,7 @@ constexpr auto compressed

} // namespace

TEST(ParseNTPFromPath, Consumer) {
TEST(Consumer, ParseNTPFromPath) {
using p = std::pair<std::string_view, std::optional<model::ntp>>;
std::vector<p> test_data{
{"a0a6eeb8/kafka/topic-x/999_24/178-188-1574137-1-v1.log.1",
@@ -130,7 +130,7 @@ TEST(ParseNTPFromPath, Consumer) {
}
}

TEST(ParseCSV, Consumer) {
TEST(Consumer, ParseCSV) {
inventory_consumer c{"", {}, 0};
using p = std::pair<const char*, std::vector<ss::sstring>>;
std::vector<p> test_data{
@@ -152,7 +152,7 @@ TEST(ParseCSV, Consumer) {
}
}

TEST(LargeNumberOfPaths, Consumer) {
TEST(Consumer, LargeNumberOfPaths) {
temporary_dir t{"test_inv_consumer"};
const auto p0 = make_ntp("kafka", "topic-A", 110);

@@ -183,7 +183,7 @@ TEST(LargeNumberOfPaths, Consumer) {
}
}

TEST(WriteThenReadHashes, Consumer) {
TEST(Consumer, WriteThenReadHashes) {
temporary_dir t{"test_inv_consumer"};

const auto p0 = make_ntp("kafka", "partagas", 0);
@@ -224,7 +224,7 @@ TEST(WriteThenReadHashes, Consumer) {
}
}

TEST(CollisionDetection, Consumer) {
TEST(Consumer, CollisionDetection) {
temporary_dir t{"test_inv_consumer"};
const auto p0 = make_ntp("kafka", "topic-A", 110);

@@ -247,7 +247,7 @@ TEST(CollisionDetection, Consumer) {
h.stop().get();
}

TEST(ConsumeMultipleStreams, Consumer) {
TEST(Consumer, ConsumeMultipleStreams) {
temporary_dir t{"test_inv_consumer"};
const auto p0 = make_ntp("kafka", "partagas", 0);

12 changes: 6 additions & 6 deletions src/v/cloud_storage/inventory/tests/report_parser_tests.cc
@@ -50,7 +50,7 @@ report_parser make_parser(
}
} // namespace

TEST(ParseReport, Parser) {
TEST(Parser, ParseReport) {
auto input = std::vector<ss::sstring>{"foo", "bar", "x"};
for (const auto compression :
{is_gzip_compressed::no, is_gzip_compressed::yes}) {
@@ -59,7 +59,7 @@ TEST(ParseReport, Parser) {
}
}

TEST(ParseEmptyString, Parser) {
TEST(Parser, ParseEmptyString) {
std::string input{};
for (const auto compression :
{is_gzip_compressed::no, is_gzip_compressed::yes}) {
@@ -68,7 +68,7 @@ TEST(ParseEmptyString, Parser) {
}
}

TEST(ParseTrailingNewLine, Parser) {
TEST(Parser, ParseTrailingNewLine) {
std::string input{"foobar\n"};
std::vector<ss::sstring> expected{"foobar"};
for (const auto compression :
@@ -78,7 +78,7 @@ TEST(ParseTrailingNewLine, Parser) {
}
}

TEST(ParseLargePayloadWithSmallChunkSize, Parser) {
TEST(Parser, ParseLargePayloadWithSmallChunkSize) {
std::vector<ss::sstring> input;
input.reserve(1024);
for (auto i = 0; i < 1024; ++i) {
@@ -91,7 +91,7 @@ TEST(ParseLargePayloadWithSmallChunkSize, Parser) {
}
}

TEST(ParseGzippedFile, Parser) {
TEST(Parser, ParseGzippedFile) {
// This is the base64 encoding of a gzipped file containing the strings 1, 2
// and 3, one per line.
constexpr auto gzipped_data
@@ -102,7 +102,7 @@ TEST(ParseGzippedFile, Parser) {
EXPECT_EQ(collect(p), (std::vector<ss::sstring>{"1", "2", "3"}));
}

TEST(BatchSizeControlledByChunkSize, Parser) {
TEST(Parser, BatchSizeControlledByChunkSize) {
size_t chunk_size{4096};

std::vector<ss::sstring> input;
16 changes: 8 additions & 8 deletions src/v/compression/tests/gzip_stream_decompression_tests.cc
@@ -13,6 +13,7 @@
#include "bytes/iostream.h"
#include "compression/gzip_stream_decompression.h"
#include "compression/internal/gzip_compressor.h"
#include "test_utils/randoms.h"
#include "utils/base64.h"

#include <seastar/core/iostream.hh>
@@ -24,9 +25,8 @@ using namespace compression;
namespace {

ss::input_stream<char> make_compressed_stream(ss::sstring s) {
iobuf b;
b.append(s.data(), s.size());
return make_iobuf_input_stream(internal::gzip_compressor::compress(b));
return make_iobuf_input_stream(
internal::gzip_compressor::compress(iobuf::from(s)));
}

ss::sstring consume(gzip_stream_decompressor& gsd) {
@@ -45,8 +45,8 @@

} // namespace

TEST(SmallChunkSize, GzStrm) {
ss::sstring test_string(100000, 'a');
TEST(GzStrm, SmallChunkSize) {
auto test_string = tests::random_named_string<ss::sstring>(100000);
gzip_stream_decompressor gsd{make_compressed_stream(test_string), 5};
gsd.reset();
auto output = consume(gsd);
@@ -55,7 +55,7 @@ TEST(SmallChunkSize, GzStrm) {
gsd.stop().get();
}

TEST(SmallPayloadLargeChunkSize, GzStrm) {
TEST(GzStrm, SmallPayloadLargeChunkSize) {
ss::sstring test_string("xyz");
gzip_stream_decompressor gsd{make_compressed_stream(test_string), 4096};
gsd.reset();
@@ -65,7 +65,7 @@ TEST(SmallPayloadLargeChunkSize, GzStrm) {
gsd.stop().get();
}

TEST(EmptyString, GzStrm) {
TEST(GzStrm, EmptyString) {
ss::sstring test_string;
gzip_stream_decompressor gsd{make_compressed_stream(test_string), 4096};
gsd.reset();
@@ -75,7 +75,7 @@ TEST(EmptyString, GzStrm) {
gsd.stop().get();
}

TEST(NonStandardCompressionMethod, GzStrm) {
TEST(GzStrm, NonStandardCompressionMethod) {
// Tests payload compressed with gzip -9
const auto payload
= "H4sICG5pgmYCA21vYnktZGlja2FoAFVXwY7kthG9N9D/"
