Skip to content

Commit

Permalink
Merge pull request redpanda-data#13067 from VladLazar/fix-compaction-…
Browse files Browse the repository at this point in the history
…reducer-over-alloc

storage: use B-Tree for compaction reducer
  • Loading branch information
Vlad Lazar authored Aug 31, 2023
2 parents f5f6d08 + 696d34b commit ee1157b
Show file tree
Hide file tree
Showing 4 changed files with 92 additions and 20 deletions.
15 changes: 10 additions & 5 deletions src/v/storage/compaction_reducers.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
#include <seastar/core/future.hh>

#include <absl/algorithm/container.h>
#include <absl/container/flat_hash_map.h>
#include <boost/range/irange.hpp>

#include <algorithm>
Expand All @@ -37,8 +36,12 @@ compaction_key_reducer::operator()(compacted_index::entry&& e) {
using stop_t = ss::stop_iteration;
const model::offset o = e.offset + model::offset(e.delta);

auto it = _indices.find(e.key);
if (it != _indices.end()) {
auto [begin, end] = _indices.equal_range(_hasher(e.key));
auto it = std::find_if(begin, end, [&e](const auto& entry) {
return entry.second.key == e.key;
});

if (it != end) {
if (o > it->second.offset) {
// cannot be std::max() because _natural_index must be preserved
it->second.offset = o;
Expand All @@ -59,15 +62,17 @@ compaction_key_reducer::operator()(compacted_index::entry&& e) {
* pseudo random elemnent
*/
auto mit = _indices.begin();
_keys_mem_usage -= mit->first.size();
_keys_mem_usage -= mit->second.key.size();

// write the entry again - we ran out of scratch space
_inverted.add(mit->second.natural_index);
_indices.erase(mit);
}
// TODO: account for short string optimisation here
_keys_mem_usage += e.key.size();
// 2. do the insertion
_indices.emplace(std::move(e.key), value_type(o, _natural_index));
_indices.emplace(
_hasher(e.key), value_type(std::move(e.key), o, _natural_index));
}

++_natural_index; // MOST important
Expand Down
35 changes: 20 additions & 15 deletions src/v/storage/compaction_reducers.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@
#include "storage/logger.h"
#include "units.h"
#include "utils/fragmented_vector.h"
#include "utils/tracking_allocator.h"

#include <absl/container/btree_map.h>
#include <absl/container/node_hash_map.h>
#include <fmt/core.h>
#include <roaring/roaring.hh>

Expand All @@ -36,35 +36,40 @@ class compaction_key_reducer : public compaction_reducer {
public:
static constexpr const size_t default_max_memory_usage = 5_MiB;
struct value_type {
value_type(model::offset o, uint32_t i)
: offset(o)
value_type(bytes k, model::offset o, uint32_t i)
: key(std::move(k))
, offset(o)
, natural_index(i) {}
bytes key;
model::offset offset;
uint32_t natural_index;
};
using underlying_t = absl::node_hash_map<
bytes,
using underlying_t = absl::btree_multimap<
uint64_t,
value_type,
bytes_hasher<uint64_t, xxhash_64>,
bytes_type_eq>;
std::less<>,
util::tracking_allocator<value_type>>;

explicit compaction_key_reducer(size_t max_mem = default_max_memory_usage)
: _max_mem(max_mem) {}
: _max_mem(max_mem)
, _memory_tracker(
ss::make_shared<util::mem_tracker>("compaction_key_reducer_index"))
, _indices{util::tracking_allocator<value_type>{_memory_tracker}} {}

ss::future<ss::stop_iteration> operator()(compacted_index::entry&&);
roaring::Roaring end_of_stream();
size_t idx_mem_usage() { return _memory_tracker->consumption(); }

private:
size_t idx_mem_usage() {
using debug = absl::container_internal::hashtable_debug_internal::
HashtableDebugAccess<underlying_t>;
return debug::AllocatedByteSize(_indices);
}
roaring::Roaring _inverted;
underlying_t _indices;
size_t _keys_mem_usage{0};
size_t _max_mem{0};
uint32_t _natural_index{0};

roaring::Roaring _inverted;

ss::shared_ptr<util::mem_tracker> _memory_tracker;
underlying_t _indices;
bytes_hasher<uint64_t, xxhash_64> _hasher;
};

/// This class copies the input reader into the writer consulting the bitmap of
Expand Down
1 change: 1 addition & 0 deletions src/v/storage/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ rp_test(
offset_to_filepos_test.cc
offset_translator_state_test.cc
file_sanitizer_test.cc
compaction_reducer_test.cc
LIBRARIES v::seastar_testing_main v::storage_test_utils v::model_test_utils
LABELS storage
ARGS "-- -c 1"
Expand Down
61 changes: 61 additions & 0 deletions src/v/storage/tests/compaction_reducer_test.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
#include "random/generators.h"
#include "storage/compacted_index.h"
#include "storage/compaction_reducers.h"

#include <seastar/testing/thread_test_case.hh>

SEASTAR_THREAD_TEST_CASE(compaction_reducer_key_clash_test) {
// Insert three elements with the same key in the reducer
// and validate that the one with the largest offset wins.

storage::internal::compaction_key_reducer reducer{16_KiB};

auto key = random_generators::get_bytes(20);

// natural offset 0, rp offset 0
storage::compacted_index::entry entry_at_0(
storage::compacted_index::entry_type::key,
storage::compaction_key(key),
model::offset(0),
0);

// natural index 1, rp offset 5 (should win)
storage::compacted_index::entry entry_at_5(
storage::compacted_index::entry_type::key,
storage::compaction_key(key),
model::offset(5),
0);

// natural index 2, rp offset 1
storage::compacted_index::entry entry_at_1(
storage::compacted_index::entry_type::key,
storage::compaction_key(key),
model::offset(1),
0);

reducer(std::move(entry_at_0)).get();
reducer(std::move(entry_at_5)).get();
reducer(std::move(entry_at_1)).get();

auto bitmap = reducer.end_of_stream();
BOOST_REQUIRE_EQUAL(bitmap.minimum(), 1);
BOOST_REQUIRE_EQUAL(bitmap.maximum(), 1);
}

SEASTAR_THREAD_TEST_CASE(compaction_reducer_max_mem_usage_test) {
storage::internal::compaction_key_reducer reducer{16_KiB};

// Empirically, 200 of the entries below use 16KiB of memory.
// Test that the index stays within the memory usage bounds.
for (size_t i = 0; i < 1000; ++i) {
auto key = random_generators::get_bytes(20);
storage::compacted_index::entry entry(
storage::compacted_index::entry_type::key,
storage::compaction_key(std::move(key)),
model::offset(i),
0);

reducer(std::move(entry)).get();
BOOST_REQUIRE_LE(reducer.idx_mem_usage(), 16_KiB);
}
}

0 comments on commit ee1157b

Please sign in to comment.