Skip to content

Commit

Permalink
chore: Implement AddMany method (#3866)
Browse files Browse the repository at this point in the history
* chore: Implement AddMany method

1. Fix a performance bug in Find2 that made redundant comparisons
2. Provide a method to StringSet that adds several items in a batch
3. Use AddMany inside set_family

Before:
```
BM_Add        4253939 ns      4253713 ns          991
```

After:
```
BM_Add        3482177 ns      3482050 ns         1206
BM_AddMany    3101622 ns      3101507 ns         1360
```

Signed-off-by: Roman Gershman <roman@dragonflydb.io>

* chore: fixes

---------

Signed-off-by: Roman Gershman <roman@dragonflydb.io>
  • Loading branch information
romange authored and kostasrim committed Oct 7, 2024
1 parent 0abdc73 commit 6970b66
Show file tree
Hide file tree
Showing 6 changed files with 106 additions and 9 deletions.
21 changes: 15 additions & 6 deletions src/core/dense_set.cc
Original file line number Diff line number Diff line change
Expand Up @@ -549,6 +549,11 @@ void DenseSet::AddUnique(void* obj, bool has_ttl, uint64_t hashcode) {
++size_;
}

void DenseSet::Prefetch(uint64_t hash) {
uint32_t bid = BucketId(hash);
PREFETCH_READ(&entries_[bid]);
}

auto DenseSet::Find2(const void* ptr, uint32_t bid, uint32_t cookie)
-> tuple<size_t, DensePtr*, DensePtr*> {
DCHECK_LT(bid, entries_.size());
Expand All @@ -563,19 +568,23 @@ auto DenseSet::Find2(const void* ptr, uint32_t bid, uint32_t cookie)
// first look for displaced nodes since this is quicker than iterating a potential long chain
if (bid > 0) {
curr = &entries_[bid - 1];
ExpireIfNeeded(nullptr, curr);
if (curr->IsDisplaced() && curr->GetDisplacedDirection() == -1) {
ExpireIfNeeded(nullptr, curr);

if (Equal(*curr, ptr, cookie)) {
return {bid - 1, nullptr, curr};
if (Equal(*curr, ptr, cookie)) {
return {bid - 1, nullptr, curr};
}
}
}

if (bid + 1 < entries_.size()) {
curr = &entries_[bid + 1];
ExpireIfNeeded(nullptr, curr);
if (curr->IsDisplaced() && curr->GetDisplacedDirection() == 1) {
ExpireIfNeeded(nullptr, curr);

if (Equal(*curr, ptr, cookie)) {
return {bid + 1, nullptr, curr};
if (Equal(*curr, ptr, cookie)) {
return {bid + 1, nullptr, curr};
}
}
}

Expand Down
3 changes: 3 additions & 0 deletions src/core/dense_set.h
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@ class DenseSet {

public:
using MemoryResource = PMR_NS::memory_resource;
// Number of elements handled per batch by batched operations (e.g. StringSet::AddMany
// hashes and prefetches this many items before inserting them).
static constexpr uint32_t kMaxBatchLen = 32;

explicit DenseSet(MemoryResource* mr = PMR_NS::get_default_resource());
virtual ~DenseSet();
Expand Down Expand Up @@ -317,6 +318,8 @@ class DenseSet {
// Assumes that the object does not exist in the set.
void AddUnique(void* obj, bool has_ttl, uint64_t hashcode);

// Prefetches the bucket entry that `hash` maps to. Does not modify the set.
void Prefetch(uint64_t hash);

private:
DenseSet(const DenseSet&) = delete;
DenseSet& operator=(DenseSet&) = delete;
Expand Down
36 changes: 36 additions & 0 deletions src/core/string_set.cc
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,42 @@ bool StringSet::Add(string_view src, uint32_t ttl_sec) {
return true;
}

// Adds every element of `span` to the set and returns how many of them were
// actually inserted (elements already present are not counted).
//
// `ttl_sec` is applied to each inserted element; UINT32_MAX means "no TTL".
//
// Full groups of kMaxBatchLen elements are processed in two passes: first all
// hashes are computed and their buckets prefetched, then each element is looked
// up and inserted if missing. The remaining tail (fewer than kMaxBatchLen
// elements) goes through the regular Add() path.
unsigned StringSet::AddMany(absl::Span<std::string_view> span, uint32_t ttl_sec) {
  uint64_t hash[kMaxBatchLen];
  string_view* data = span.data();
  const bool has_ttl = ttl_sec != UINT32_MAX;
  size_t count = span.size();
  unsigned res = 0;

  // Make sure there is room for the whole input up front.
  if (BucketCount() < count) {
    Reserve(count);
  }

  while (count >= kMaxBatchLen) {
    // Pass 1: hash everything and warm up the target buckets.
    for (unsigned i = 0; i < kMaxBatchLen; ++i) {
      hash[i] = CompactObj::HashCode(data[i]);
      Prefetch(hash[i]);
    }

    // Pass 2: insert only the elements that are not in the set yet. The lookup
    // happens right before each insertion, so duplicates inside the same batch
    // are detected as well.
    for (unsigned i = 0; i < kMaxBatchLen; ++i) {
      void* prev = FindInternal(data + i, hash[i], 1);
      if (prev == nullptr) {
        ++res;
        sds field = MakeSetSds(data[i], ttl_sec);
        AddUnique(field, has_ttl, hash[i]);
      }
    }

    // Note: `res` was already advanced once per real insertion above; it must
    // not be bumped by the batch size here, otherwise the result over-counts.
    count -= kMaxBatchLen;
    data += kMaxBatchLen;
  }

  // Tail: fewer than kMaxBatchLen elements left.
  for (size_t i = 0; i < count; ++i) {
    res += Add(data[i], ttl_sec);
  }
  return res;
}

std::optional<std::string> StringSet::Pop() {
sds str = (sds)PopInternal();

Expand Down
5 changes: 4 additions & 1 deletion src/core/string_set.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@

#pragma once

#include <absl/types/span.h>

#include <cstdint>
#include <functional>
#include <optional>
#include <string>
#include <string_view>

#include "core/dense_set.h"
Expand All @@ -28,6 +29,8 @@ class StringSet : public DenseSet {
// Returns true if elem was added.
bool Add(std::string_view s1, uint32_t ttl_sec = UINT32_MAX);

// Adds all elements of span in batches. ttl_sec == UINT32_MAX means no TTL.
// Returns the number of elements that were added.
unsigned AddMany(absl::Span<std::string_view> span, uint32_t ttl_sec);

bool Erase(std::string_view str) {
return EraseInternal(&str, 1);
}
Expand Down
30 changes: 30 additions & 0 deletions src/core/string_set_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -552,4 +552,34 @@ void BM_Add(benchmark::State& state) {
}
BENCHMARK(BM_Add);

// Benchmarks bulk insertion through StringSet::AddMany: 100000 strings produced by
// random_string(generator, 16) are fed to the set in chunks of up to 32 views.
// Clearing and re-reserving the set between iterations is excluded from timing.
void BM_AddMany(benchmark::State& state) {
  constexpr unsigned kNumElems = 100000;

  // Fixed seed keeps the generated input identical across runs.
  mt19937 rng(0);
  vector<string> keys;
  for (size_t i = 0; i < kNumElems; ++i) {
    keys.push_back(random_string(rng, 16));
  }

  StringSet ss;
  ss.Reserve(kNumElems);
  array<string_view, 32> batch;

  while (state.KeepRunning()) {
    for (unsigned pos = 0; pos < kNumElems;) {
      const unsigned batch_len = min(kNumElems - pos, 32u);
      for (size_t j = 0; j < batch_len; ++j) {
        batch[j] = keys[pos + j];
      }
      pos += batch_len;
      ss.AddMany({batch.data(), batch_len}, UINT32_MAX);
    }

    // Reset the set for the next iteration without measuring the cleanup.
    state.PauseTiming();
    ss.Clear();
    ss.Reserve(kNumElems);
    state.ResumeTiming();
  }
}
BENCHMARK(BM_AddMany);

} // namespace dfly
20 changes: 18 additions & 2 deletions src/server/set_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,24 @@ struct StringSetWrapper {

unsigned Add(const NewEntries& entries, uint32_t ttl_sec) const {
unsigned res = 0;
for (string_view member : EntriesRange(entries))
res += ss->Add(member, ttl_sec);
string_view members[StringSet::kMaxBatchLen];
size_t entries_len = std::visit([](const auto& e) { return e.size(); }, entries);
unsigned len = 0;
if (ss->BucketCount() < entries_len) {
ss->Reserve(entries_len);
}
for (string_view member : EntriesRange(entries)) {
members[len++] = member;
if (len == StringSet::kMaxBatchLen) {
res += ss->AddMany({members, StringSet::kMaxBatchLen}, ttl_sec);
len = 0;
}
}

if (len) {
res += ss->AddMany({members, len}, ttl_sec);
}

return res;
}

Expand Down

0 comments on commit 6970b66

Please sign in to comment.