storage: use B-Tree for compaction reducer
Swap out the node hash map for a B-Tree. The issue with the node hash
map is its growth pattern: it doubles in size whenever it nears its
capacity, which results in large allocations. The B-Tree is much better
behaved in this regard.
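For illustration, a small stand-alone demo (not part of this commit; it
assumes Abseil's bucket_count() accessor on its swiss-table maps) that
makes the doubling visible:

// Illustrative only: watch the bucket array of an absl::node_hash_map
// roughly double on each rehash. Each jump is one large allocation,
// which is the growth pattern described above.
#include <absl/container/node_hash_map.h>

#include <cstddef>
#include <cstdio>

int main() {
    absl::node_hash_map<int, int> m;
    size_t buckets = m.bucket_count();
    for (int i = 0; i < 1000000; ++i) {
        m.emplace(i, i);
        if (m.bucket_count() != buckets) {
            buckets = m.bucket_count();
            std::printf("size=%zu buckets=%zu\n", m.size(), buckets);
        }
    }
}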

Note that the B-Tree is keyed by the hash of the entry's key. This
avoids a lot of expensive string compares and preserves the randomness
we rely on when evicting from the index. Since hashes can collide, I've
used the multimap variant of the tree.
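A minimal sketch of the collision-aware lookup (not part of this
commit; std::string and std::hash stand in for Redpanda's bytes and the
xxhash-based bytes_hasher used in the actual code):

// Illustrative sketch: entries sharing a hash live side by side in the
// multimap; the stored key bytes disambiguate genuine collisions.
#include <absl/container/btree_map.h>

#include <cstdint>
#include <iostream>
#include <string>

struct value_type {
    std::string key; // stored so hash collisions can be disambiguated
    long offset;
    uint32_t natural_index;
};

using index_t = absl::btree_multimap<uint64_t, value_type>;

// Look up `key`: walk all entries sharing its hash and compare the
// stored key bytes to resolve collisions.
value_type* find_entry(index_t& indices, const std::string& key) {
    const uint64_t h = std::hash<std::string>{}(key);
    auto [begin, end] = indices.equal_range(h);
    for (auto it = begin; it != end; ++it) {
        if (it->second.key == key) {
            return &it->second;
        }
    }
    return nullptr;
}

int main() {
    index_t indices;
    const std::string k = "some-record-key";
    indices.emplace(std::hash<std::string>{}(k), value_type{k, 42, 0});

    if (auto* v = find_entry(indices, k)) {
        std::cout << "offset=" << v->offset << '\n';
    }
}

Because the tree is ordered by hash, begin() yields the entry with the
smallest hash, an effectively random key, which is what the eviction
path in the .cc diff relies on.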

(cherry picked from commit b2bab8b)
Vlad Lazar authored and vbotbuildovich committed Aug 31, 2023
1 parent 0f28eb9 commit 5453caa
Showing 2 changed files with 30 additions and 20 deletions.
15 changes: 10 additions & 5 deletions src/v/storage/compaction_reducers.cc
@@ -24,7 +24,6 @@
 #include <seastar/core/future.hh>
 
 #include <absl/algorithm/container.h>
-#include <absl/container/flat_hash_map.h>
 #include <boost/range/irange.hpp>
 
 #include <algorithm>
@@ -37,8 +36,12 @@ compaction_key_reducer::operator()(compacted_index::entry&& e) {
     using stop_t = ss::stop_iteration;
     const model::offset o = e.offset + model::offset(e.delta);
 
-    auto it = _indices.find(e.key);
-    if (it != _indices.end()) {
+    auto [begin, end] = _indices.equal_range(_hasher(e.key));
+    auto it = std::find_if(begin, end, [&e](const auto& entry) {
+        return entry.second.key == e.key;
+    });
+
+    if (it != end) {
         if (o > it->second.offset) {
             // cannot be std::max() because _natural_index must be preserved
             it->second.offset = o;
@@ -59,15 +62,17 @@ compaction_key_reducer::operator()(compacted_index::entry&& e) {
              * pseudo random element
              */
             auto mit = _indices.begin();
-            _keys_mem_usage -= mit->first.size();
+            _keys_mem_usage -= mit->second.key.size();
+
             // write the entry again - we ran out of scratch space
             _inverted.add(mit->second.natural_index);
             _indices.erase(mit);
         }
         // TODO: account for short string optimisation here
         _keys_mem_usage += e.key.size();
         // 2. do the insertion
-        _indices.emplace(std::move(e.key), value_type(o, _natural_index));
+        _indices.emplace(
+          _hasher(e.key), value_type(std::move(e.key), o, _natural_index));
     }
 
     ++_natural_index; // MOST important
35 changes: 20 additions & 15 deletions src/v/storage/compaction_reducers.h
@@ -22,9 +22,9 @@
 #include "storage/segment_appender.h"
 #include "units.h"
 #include "utils/fragmented_vector.h"
+#include "utils/tracking_allocator.h"
 
 #include <absl/container/btree_map.h>
-#include <absl/container/node_hash_map.h>
 #include <fmt/core.h>
 #include <roaring/roaring.hh>
 
@@ -36,35 +36,40 @@ class compaction_key_reducer : public compaction_reducer {
 public:
     static constexpr const size_t default_max_memory_usage = 5_MiB;
     struct value_type {
-        value_type(model::offset o, uint32_t i)
-          : offset(o)
+        value_type(bytes k, model::offset o, uint32_t i)
+          : key(std::move(k))
+          , offset(o)
           , natural_index(i) {}
+        bytes key;
         model::offset offset;
         uint32_t natural_index;
     };
-    using underlying_t = absl::node_hash_map<
-      bytes,
+    using underlying_t = absl::btree_multimap<
+      uint64_t,
       value_type,
-      bytes_hasher<uint64_t, xxhash_64>,
-      bytes_type_eq>;
+      std::less<>,
+      util::tracking_allocator<value_type>>;
 
     explicit compaction_key_reducer(size_t max_mem = default_max_memory_usage)
-      : _max_mem(max_mem) {}
+      : _max_mem(max_mem)
+      , _memory_tracker(
+          ss::make_shared<util::mem_tracker>("compaction_key_reducer_index"))
+      , _indices{util::tracking_allocator<value_type>{_memory_tracker}} {}
 
     ss::future<ss::stop_iteration> operator()(compacted_index::entry&&);
     roaring::Roaring end_of_stream();
 
 private:
-    size_t idx_mem_usage() {
-        using debug = absl::container_internal::hashtable_debug_internal::
-          HashtableDebugAccess<underlying_t>;
-        return debug::AllocatedByteSize(_indices);
-    }
-    roaring::Roaring _inverted;
-    underlying_t _indices;
+    size_t idx_mem_usage() { return _memory_tracker->consumption(); }
     size_t _keys_mem_usage{0};
     size_t _max_mem{0};
    uint32_t _natural_index{0};
+
+    roaring::Roaring _inverted;
+
+    ss::shared_ptr<util::mem_tracker> _memory_tracker;
+    underlying_t _indices;
+    bytes_hasher<uint64_t, xxhash_64> _hasher;
 };
 
 /// This class copies the input reader into the writer consulting the bitmap of
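The header diff above also swaps the HashtableDebugAccess probe for
allocator-based accounting. A minimal counting-allocator sketch in the
spirit of util::tracking_allocator and util::mem_tracker (simplified
stand-ins, not the Redpanda implementations; the real types take an
ss::shared_ptr and live in utils/tracking_allocator.h):

// Illustrative sketch: a counting allocator that reports live
// consumption, so idx_mem_usage() becomes a single read instead of a
// probe into container internals.
#include <absl/container/btree_map.h>

#include <cstddef>
#include <cstdint>
#include <cstdio>
#include <memory>
#include <utility>

struct mem_tracker {
    size_t consumed = 0;
    size_t consumption() const { return consumed; }
};

template <typename T>
struct tracking_allocator {
    using value_type = T;
    mem_tracker* tracker;

    explicit tracking_allocator(mem_tracker* t) : tracker(t) {}
    template <typename U>
    tracking_allocator(const tracking_allocator<U>& o) : tracker(o.tracker) {}

    T* allocate(size_t n) {
        tracker->consumed += n * sizeof(T); // account before handing out
        return std::allocator<T>{}.allocate(n);
    }
    void deallocate(T* p, size_t n) {
        tracker->consumed -= n * sizeof(T);
        std::allocator<T>{}.deallocate(p, n);
    }
    template <typename U>
    bool operator==(const tracking_allocator<U>& o) const {
        return tracker == o.tracker;
    }
    template <typename U>
    bool operator!=(const tracking_allocator<U>& o) const {
        return tracker != o.tracker;
    }
};

int main() {
    using alloc_t = tracking_allocator<std::pair<const uint64_t, int>>;
    mem_tracker t;
    absl::btree_multimap<uint64_t, int, std::less<>, alloc_t> m{alloc_t{&t}};
    for (uint64_t i = 0; i < 1000; ++i) {
        m.emplace(i, int(i));
    }
    std::printf("tree memory: %zu bytes\n", t.consumption());
}

Accounting at the allocator level captures the B-Tree's node overhead
as well, which the old code had to dig out of hashtable internals.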
