Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bug(dense set): fix scan function to return only home bucket data #474

Merged
merged 1 commit into from
Nov 10, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 65 additions & 27 deletions src/core/dense_set.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,7 @@ constexpr size_t kMinSize = 1 << kMinSizeShift;
constexpr bool kAllowDisplacements = true;

DenseSet::IteratorBase::IteratorBase(const DenseSet* owner, bool is_end)
: owner_(const_cast<DenseSet&>(*owner)),
curr_entry_(nullptr) {
: owner_(const_cast<DenseSet&>(*owner)), curr_entry_(nullptr) {
curr_list_ = is_end ? owner_.entries_.end() : owner_.entries_.begin();
if (curr_list_ != owner->entries_.end()) {
curr_entry_ = &(*curr_list_);
Expand Down Expand Up @@ -173,6 +172,32 @@ bool DenseSet::Equal(DensePtr dptr, const void* ptr, uint32_t cookie) const {
return ObjEqual(dptr.GetObject(), ptr, cookie);
}

bool DenseSet::NoItemBelongsBucket(uint32_t bid) const {
auto& entries = const_cast<DenseSet*>(this)->entries_;
DensePtr* curr = &entries[bid];
ExpireIfNeeded(nullptr, curr);
if (!curr->IsEmpty() && !curr->IsDisplaced()) {
return false;
}

if (bid + 1 < entries_.size()) {
DensePtr* right_bucket = &entries[bid + 1];
ExpireIfNeeded(nullptr, right_bucket);
if (!right_bucket->IsEmpty() && right_bucket->IsDisplaced() &&
right_bucket->GetDisplacedDirection() == 1)
return false;
}

if (bid > 0) {
DensePtr* left_bucket = &entries[bid - 1];
ExpireIfNeeded(nullptr, left_bucket);
if (!left_bucket->IsEmpty() && left_bucket->IsDisplaced() &&
left_bucket->GetDisplacedDirection() == -1)
return false;
}
return true;
}

auto DenseSet::FindEmptyAround(uint32_t bid) -> ChainVectorIterator {
ExpireIfNeeded(nullptr, &entries_[bid]);

Expand Down Expand Up @@ -494,33 +519,45 @@ uint32_t DenseSet::Scan(uint32_t cursor, const ItemCb& cb) const {

auto& entries = const_cast<DenseSet*>(this)->entries_;

// skip empty entries
do {
while (entries_idx < entries_.size() && entries_[entries_idx].IsEmpty()) {
++entries_idx;
}

if (entries_idx == entries_.size()) {
return 0;
}
// First find the bucket to scan, skip empty buckets.
// A bucket is empty if the current index is empty and the data is not displaced
// to the right or to the left.
while (entries_idx < entries_.size() && NoItemBelongsBucket(entries_idx)) {
++entries_idx;
}

ExpireIfNeeded(nullptr, &entries[entries_idx]);
} while (entries_[entries_idx].IsEmpty());
if (entries_idx == entries_.size()) {
return 0;
}

DensePtr* curr = &entries[entries_idx];

// when scanning add all entries in a given chain
while (true) {
cb(curr->GetObject());
if (!curr->IsLink())
break;
// Check home bucket
if (!curr->IsEmpty() && !curr->IsDisplaced()) {
// scanning add all entries in a given chain
while (true) {
cb(curr->GetObject());
if (!curr->IsLink())
break;

DensePtr* mcurr = const_cast<DensePtr*>(curr);
DensePtr* mcurr = const_cast<DensePtr*>(curr);

if (ExpireIfNeeded(mcurr, &mcurr->AsLink()->next) && !mcurr->IsLink()) {
break;
if (ExpireIfNeeded(mcurr, &mcurr->AsLink()->next) && !mcurr->IsLink()) {
break;
}
curr = &curr->AsLink()->next;
}
}

// Check if the bucket on the left belongs to the home bucket.
if (entries_idx > 0) {
DensePtr* left_bucket = &entries[entries_idx - 1];
ExpireIfNeeded(nullptr, left_bucket);

if (left_bucket->IsDisplaced() &&
left_bucket->GetDisplacedDirection() == -1) { // left of the home bucket
cb(left_bucket->GetObject());
}
curr = &curr->AsLink()->next;
}

// move to the next index for the next scan and check if we are done
Expand All @@ -529,12 +566,13 @@ uint32_t DenseSet::Scan(uint32_t cursor, const ItemCb& cb) const {
return 0;
}

// In case of displacement, we want to fully cover the bucket we traversed, therefore
// we check if the bucket on the right belongs to the home bucket.
ExpireIfNeeded(nullptr, &entries[entries_idx]);
// Check if the bucket on the right belongs to the home bucket.
DensePtr* right_bucket = &entries[entries_idx];
ExpireIfNeeded(nullptr, right_bucket);

if (entries[entries_idx].GetDisplacedDirection() == 1) { // right of the home bucket
cb(entries[entries_idx].GetObject());
if (right_bucket->IsDisplaced() &&
right_bucket->GetDisplacedDirection() == 1) { // right of the home bucket
cb(right_bucket->GetObject());
}

return entries_idx << (32 - capacity_log_);
Expand Down
3 changes: 3 additions & 0 deletions src/core/dense_set.h
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,9 @@ class DenseSet {

// return a ChainVectorIterator (a.k.a iterator) or end if there is an empty chain found
ChainVectorIterator FindEmptyAround(uint32_t bid);
// return if bucket has no item which is not displaced and right/left bucket has no displaced item
// belong to given bid
bool NoItemBelongsBucket(uint32_t bid) const;
void Grow();

// ============ Pseudo Linked List Functions for interacting with Chains ==================
Expand Down
5 changes: 5 additions & 0 deletions src/server/set_family_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -192,9 +192,14 @@ TEST_F(SetFamilyTest, SScan) {
vec = StrArray(resp.GetVec()[1]);
EXPECT_THAT(vec.size(), 5);

resp = Run({"sscan", "mystrset", "0", "match", "str-1*"});
vec = StrArray(resp.GetVec()[1]);
EXPECT_THAT(vec, UnorderedElementsAre("str-1", "str-10", "str-11", "str-12", "str-13", "str-14"));

resp = Run({"sscan", "mystrset", "0", "match", "str-1*", "count", "3"});
vec = StrArray(resp.GetVec()[1]);
EXPECT_THAT(vec, IsSubsetOf({"str-1", "str-10", "str-11", "str-12", "str-13", "str-14"}));
EXPECT_EQ(vec.size(), 3);

// nothing should match this
resp = Run({"sscan", "mystrset", "0", "match", "1*"});
Expand Down