diff --git a/src/v/cloud_storage/inventory/inv_consumer.cc b/src/v/cloud_storage/inventory/inv_consumer.cc
index b827cd692c91..046812afa96c 100644
--- a/src/v/cloud_storage/inventory/inv_consumer.cc
+++ b/src/v/cloud_storage/inventory/inv_consumer.cc
@@ -21,9 +21,11 @@
 #include
 #include
 #include
+#include
 #include
+#include
 #include
 
 namespace ranges = std::ranges;
@@ -32,9 +34,18 @@ namespace views = std::views;
 namespace {
 // hash-string/ns/tp/partition_rev/.*
 const RE2 path_expr{"^[[:xdigit:]]+/(.*?)/(.*?)/(\\d+)_\\d+/.*?"};
+
+// Holds hashes for a given NTP in memory until they are flushed to disk. One
+// of these structures is held per NTP, in a map keyed by the NTP itself.
 struct flush_entry {
     model::ntp ntp;
+    // The vector of hashes which will be written to disk, under file_name, on
+    // the next flush.
     fragmented_vector hashes;
+    // The file to which hashes will be written. The file name is incremented
+    // on each flush, so each file holds a subset of the hashes and we may end
+    // up with multiple files per NTP. The hash loader reads all the files and
+    // collects the hashes together.
     uint64_t file_name;
 };
 
@@ -213,15 +224,11 @@ ss::future<> inventory_consumer::write_hashes_to_file(
     vlog(cst_log.trace, "Writing {} hashe(s) to disk", hashes.size());
     std::exception_ptr ep;
     auto stream = co_await ss::make_file_output_stream(f);
-    try {
-        co_await write_hashes_to_file(stream, std::move(hashes));
-    } catch (...) {
-        ep = std::current_exception();
-    }
-
+    auto res = co_await ss::coroutine::as_future(
+      write_hashes_to_file(stream, std::move(hashes)));
     co_await stream.close();
-    if (ep) {
-        std::rethrow_exception(ep);
+    if (res.failed()) {
+        std::rethrow_exception(res.get_exception());
     }
 }
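The write_hashes_to_file change above swaps a try/catch around the co_await for ss::coroutine::as_future, so the file stream is always closed before any failure from the write is rethrown. A minimal sketch of the same pattern outside this codebase (write_then_close and the payload write are hypothetical; ss::coroutine::as_future, future::failed() and future::get_exception() are the Seastar APIs used above):

```cpp
#include <seastar/core/coroutine.hh>
#include <seastar/core/iostream.hh>
#include <seastar/core/sstring.hh>
#include <seastar/coroutine/as_future.hh>

#include <exception>

namespace ss = seastar;

// Hypothetical helper showing the pattern: run the write, capture its outcome
// as an already-resolved future, close the stream unconditionally, then
// surface the original error (if any).
ss::future<> write_then_close(ss::output_stream<char>& out, ss::sstring payload) {
    // as_future() does not throw here; a failure travels inside `res`.
    auto res = co_await ss::coroutine::as_future(
      out.write(payload.data(), payload.size()));
    // The close runs whether or not the write failed.
    co_await out.close();
    if (res.failed()) {
        std::rethrow_exception(res.get_exception());
    }
}
```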
diff --git a/src/v/cloud_storage/inventory/inv_consumer.h b/src/v/cloud_storage/inventory/inv_consumer.h
index ecc91ead709c..86780a33b473 100644
--- a/src/v/cloud_storage/inventory/inv_consumer.h
+++ b/src/v/cloud_storage/inventory/inv_consumer.h
@@ -68,7 +68,8 @@ class inventory_consumer {
     ss::future<> process_paths(fragmented_vector paths);
 
     // Checks if the path belongs to one of the NTPs whose leadership belongs to
-    // this node. If so, the path is hashed and added to the `_path_hashes` map.
+    // this node. If so, the path is hashed and added to the NTP's flush
+    // state, to be written to disk on the next flush operation.
     void process_path(ss::sstring path);
 
     // Writes the largest hash vectors to disk. The vectors are written in files
@@ -111,7 +112,7 @@ class inventory_consumer {
     // performed to free up memory.
     size_t _max_hash_size_in_memory;
 
-    // Total size of path hashes held in memory
+    // Total size in bytes of path hashes held in memory
     size_t _total_size{};
 
     size_t _num_flushes{};
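The process_path comment above describes the consumer's flow: decompose a report path, check whether the NTP is one this node leads, hash the path, and buffer the hash until the next flush. A rough sketch of the first step using the path_expr regular expression from inv_consumer.cc; parsed_path and parse_report_path are illustrative names, not functions in the tree:

```cpp
#include <re2/re2.h>

#include <cstdint>
#include <optional>
#include <string>

// Illustrative only: pull ns/topic/partition out of an inventory report path
// of the form hash-string/ns/tp/partition_rev/... (see path_expr above).
struct parsed_path {
    std::string ns;
    std::string topic;
    uint32_t partition{};
};

std::optional<parsed_path> parse_report_path(const std::string& path) {
    static const RE2 expr{"^[[:xdigit:]]+/(.*?)/(.*?)/(\\d+)_\\d+/.*?"};
    parsed_path out;
    if (!RE2::FullMatch(path, expr, &out.ns, &out.topic, &out.partition)) {
        return std::nullopt;
    }
    return out;
}

// parse_report_path("a0a6eeb8/kafka/topic-x/999_24/178-188-1574137-1-v1.log.1")
// yields ns "kafka", topic "topic-x", partition 999.
```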
diff --git a/src/v/cloud_storage/inventory/ntp_hashes.cc b/src/v/cloud_storage/inventory/ntp_hashes.cc
index 42203e5d5b3a..454060bac9e9 100644
--- a/src/v/cloud_storage/inventory/ntp_hashes.cc
+++ b/src/v/cloud_storage/inventory/ntp_hashes.cc
@@ -11,6 +11,7 @@
 
 #include "cloud_storage/inventory/ntp_hashes.h"
 
+#include "base/likely.h"
 #include "cloud_storage/logger.h"
 #include "container/fragmented_vector.h"
 #include "hashing/xx.h"
@@ -94,10 +95,9 @@ ss::future<> ntp_path_hashes::load_hashes(ss::input_stream& stream) {
         vlog(_ctxlog.trace, "read {} path hashes from disk", hashes.size());
         for (auto hash : hashes) {
-            if (unlikely(_path_hashes.contains(hash))) {
+            auto [_, inserted] = _path_hashes.insert(hash);
+            if (unlikely(!inserted)) {
                 _possible_collisions.insert(hash);
-            } else {
-                _path_hashes.insert(hash);
             }
         }
     }
 }
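The load_hashes change above folds a contains() check and a subsequent insert() into a single insert(), using the returned bool to flag hashes that were already present (possible collisions) with one hash-table lookup instead of two. A standalone sketch of the same pattern with std::unordered_set (the real code uses whatever container backs _path_hashes):

```cpp
#include <cstdint>
#include <unordered_set>
#include <vector>

// Illustrative only: split incoming hashes into first-seen values and values
// already present, doing a single hash-table lookup per element.
struct dedup_result {
    std::unordered_set<uint64_t> seen;
    std::unordered_set<uint64_t> possible_collisions;
};

dedup_result classify(const std::vector<uint64_t>& hashes) {
    dedup_result r;
    for (auto h : hashes) {
        // insert() returns {iterator, bool}; the bool is false when the value
        // was already in the set, so no separate contains() call is needed.
        if (!r.seen.insert(h).second) {
            r.possible_collisions.insert(h);
        }
    }
    return r;
}
```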
diff --git a/src/v/cloud_storage/inventory/tests/inv_consumer_tests.cc b/src/v/cloud_storage/inventory/tests/inv_consumer_tests.cc
index 80db3c85973e..4be053191e0e 100644
--- a/src/v/cloud_storage/inventory/tests/inv_consumer_tests.cc
+++ b/src/v/cloud_storage/inventory/tests/inv_consumer_tests.cc
@@ -103,7 +103,7 @@ constexpr auto compressed
 } // namespace
 
-TEST(ParseNTPFromPath, Consumer) {
+TEST(Consumer, ParseNTPFromPath) {
     using p = std::pair>;
     std::vector
       test_data{
       {"a0a6eeb8/kafka/topic-x/999_24/178-188-1574137-1-v1.log.1",
@@ -130,7 +130,7 @@ TEST(ParseNTPFromPath, Consumer) {
     }
 }
 
-TEST(ParseCSV, Consumer) {
+TEST(Consumer, ParseCSV) {
     inventory_consumer c{"", {}, 0};
     using p = std::pair>;
     std::vector
       test_data{
@@ -152,7 +152,7 @@ TEST(ParseCSV, Consumer) {
     }
 }
 
-TEST(LargeNumberOfPaths, Consumer) {
+TEST(Consumer, LargeNumberOfPaths) {
     temporary_dir t{"test_inv_consumer"};
 
     const auto p0 = make_ntp("kafka", "topic-A", 110);
@@ -183,7 +183,7 @@ TEST(LargeNumberOfPaths, Consumer) {
     }
 }
 
-TEST(WriteThenReadHashes, Consumer) {
+TEST(Consumer, WriteThenReadHashes) {
     temporary_dir t{"test_inv_consumer"};
 
     const auto p0 = make_ntp("kafka", "partagas", 0);
@@ -224,7 +224,7 @@ TEST(WriteThenReadHashes, Consumer) {
     }
 }
 
-TEST(CollisionDetection, Consumer) {
+TEST(Consumer, CollisionDetection) {
     temporary_dir t{"test_inv_consumer"};
 
     const auto p0 = make_ntp("kafka", "topic-A", 110);
@@ -247,7 +247,7 @@ TEST(CollisionDetection, Consumer) {
     h.stop().get();
 }
 
-TEST(ConsumeMultipleStreams, Consumer) {
+TEST(Consumer, ConsumeMultipleStreams) {
     temporary_dir t{"test_inv_consumer"};
 
     const auto p0 = make_ntp("kafka", "partagas", 0);
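The renames above (and in the two test files that follow) swap the TEST macro arguments: GoogleTest treats the first argument as the suite name and the second as the test name, so Consumer, Parser and GzStrm become the suites and the individual cases group under them. A toy example with placeholder bodies:

```cpp
#include <gtest/gtest.h>

// Reported as Consumer.ParseNTPFromPath and Consumer.ParseCSV, and both can
// be selected together with --gtest_filter='Consumer.*'.
TEST(Consumer, ParseNTPFromPath) { EXPECT_TRUE(true); }
TEST(Consumer, ParseCSV) { EXPECT_TRUE(true); }
```

With the old argument order these would have reported as ParseNTPFromPath.Consumer and ParseCSV.Consumer, one suite per test.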
diff --git a/src/v/cloud_storage/inventory/tests/report_parser_tests.cc b/src/v/cloud_storage/inventory/tests/report_parser_tests.cc
index 82de3f52956c..a1f4aa2cd0e5 100644
--- a/src/v/cloud_storage/inventory/tests/report_parser_tests.cc
+++ b/src/v/cloud_storage/inventory/tests/report_parser_tests.cc
@@ -50,7 +50,7 @@ report_parser make_parser(
 }
 } // namespace
 
-TEST(ParseReport, Parser) {
+TEST(Parser, ParseReport) {
     auto input = std::vector{"foo", "bar", "x"};
     for (const auto compression :
          {is_gzip_compressed::no, is_gzip_compressed::yes}) {
@@ -59,7 +59,7 @@ TEST(ParseReport, Parser) {
     }
 }
 
-TEST(ParseEmptyString, Parser) {
+TEST(Parser, ParseEmptyString) {
     std::string input{};
     for (const auto compression :
          {is_gzip_compressed::no, is_gzip_compressed::yes}) {
@@ -68,7 +68,7 @@ TEST(ParseEmptyString, Parser) {
     }
 }
 
-TEST(ParseTrailingNewLine, Parser) {
+TEST(Parser, ParseTrailingNewLine) {
     std::string input{"foobar\n"};
     std::vector expected{"foobar"};
     for (const auto compression :
@@ -78,7 +78,7 @@ TEST(ParseTrailingNewLine, Parser) {
     }
 }
 
-TEST(ParseLargePayloadWithSmallChunkSize, Parser) {
+TEST(Parser, ParseLargePayloadWithSmallChunkSize) {
     std::vector input;
     input.reserve(1024);
     for (auto i = 0; i < 1024; ++i) {
@@ -91,7 +91,7 @@ TEST(ParseLargePayloadWithSmallChunkSize, Parser) {
     }
 }
 
-TEST(ParseGzippedFile, Parser) {
+TEST(Parser, ParseGzippedFile) {
     // This is the base64 encoding of a gzipped file containing the strings 1, 2
     // and 3, one per line.
     constexpr auto gzipped_data
@@ -102,7 +102,7 @@ TEST(ParseGzippedFile, Parser) {
     EXPECT_EQ(collect(p), (std::vector{"1", "2", "3"}));
 }
 
-TEST(BatchSizeControlledByChunkSize, Parser) {
+TEST(Parser, BatchSizeControlledByChunkSize) {
     size_t chunk_size{4096};
 
     std::vector input;
diff --git a/src/v/compression/tests/gzip_stream_decompression_tests.cc b/src/v/compression/tests/gzip_stream_decompression_tests.cc
index 704bf013fcf3..630f1ae7eb93 100644
--- a/src/v/compression/tests/gzip_stream_decompression_tests.cc
+++ b/src/v/compression/tests/gzip_stream_decompression_tests.cc
@@ -13,6 +13,7 @@
 #include "bytes/iostream.h"
 #include "compression/gzip_stream_decompression.h"
 #include "compression/internal/gzip_compressor.h"
+#include "test_utils/randoms.h"
 #include "utils/base64.h"
 
 #include
 
@@ -24,9 +25,8 @@ using namespace compression;
 
 namespace {
 ss::input_stream make_compressed_stream(ss::sstring s) {
-    iobuf b;
-    b.append(s.data(), s.size());
-    return make_iobuf_input_stream(internal::gzip_compressor::compress(b));
+    return make_iobuf_input_stream(
+      internal::gzip_compressor::compress(iobuf::from(s)));
 }
 
 ss::sstring consume(gzip_stream_decompressor& gsd) {
@@ -45,8 +45,8 @@ ss::sstring consume(gzip_stream_decompressor& gsd) {
 }
 } // namespace
 
-TEST(SmallChunkSize, GzStrm) {
-    ss::sstring test_string(100000, 'a');
+TEST(GzStrm, SmallChunkSize) {
+    auto test_string = tests::random_named_string(100000);
     gzip_stream_decompressor gsd{make_compressed_stream(test_string), 5};
     gsd.reset();
     auto output = consume(gsd);
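Two small changes above: make_compressed_stream builds its buffer with iobuf::from(s) in one call instead of constructing and appending, and the small-chunk test now compresses random data. The second matters because 100000 repeated 'a' bytes deflate to a tiny payload, so decompressing with a 5-byte chunk size barely iterates; random text keeps the compressed stream large. A rough way to see that with the APIs already used above (iobuf::size_bytes() and the size figures in the comments are assumptions, not taken from this PR):

```cpp
#include "bytes/iobuf.h"
#include "compression/internal/gzip_compressor.h"

#include <seastar/core/sstring.hh>

namespace ss = seastar;

// Illustrative only: compare how large the compressed payload is for
// repetitive vs. random input of the same length.
size_t compressed_size(const ss::sstring& s) {
    return compression::internal::gzip_compressor::compress(iobuf::from(s))
      .size_bytes();
}

// compressed_size(ss::sstring(100000, 'a'))            -> a few hundred bytes
// compressed_size(tests::random_named_string(100000))  -> tens of kilobytes or more
```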
@@ -55,7 +55,7 @@
     gsd.stop().get();
 }
 
-TEST(SmallPayloadLargeChunkSize, GzStrm) {
+TEST(GzStrm, SmallPayloadLargeChunkSize) {
     ss::sstring test_string("xyz");
     gzip_stream_decompressor gsd{make_compressed_stream(test_string), 4096};
     gsd.reset();
@@ -65,7 +65,7 @@
-TEST(EmptyString, GzStrm) {
+TEST(GzStrm, EmptyString) {
     ss::sstring test_string;
     gzip_stream_decompressor gsd{make_compressed_stream(test_string), 4096};
     gsd.reset();
@@ -75,7 +75,7 @@
-TEST(NonStandardCompressionMethod, GzStrm) {
+TEST(GzStrm, NonStandardCompressionMethod) {
     // Tests payload compressed with gzip -9
     const auto payload = "H4sICG5pgmYCA21vYnktZGlja2FoAFVXwY7kthG9N9D/"