Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: implement Erase for a range #4106

Merged
merged 1 commit into from
Nov 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 75 additions & 1 deletion src/core/qlist.cc
Original file line number Diff line number Diff line change
Expand Up @@ -758,7 +758,7 @@ void QList::Compress(quicklistNode* node) {
reverse = reverse->prev;
}

if (!in_depth)
if (!in_depth && node)
CompressNodeIfNeeded(node);

/* At this point, forward and reverse are one node beyond depth */
Expand Down Expand Up @@ -1022,6 +1022,80 @@ auto QList::Erase(Iterator it) -> Iterator {
return it;
}

bool QList::Erase(const long start, unsigned count) {
if (count == 0)
return false;

unsigned extent = count; /* range is inclusive of start position */

if (start >= 0 && extent > (count_ - start)) {
/* if requesting delete more elements than exist, limit to list size. */
extent = count_ - start;
} else if (start < 0 && extent > (unsigned long)(-start)) {
/* else, if at negative offset, limit max size to rest of list. */
extent = -start; /* c.f. LREM -29 29; just delete until end. */
}

Iterator it = GetIterator(start);
quicklistNode* node = it.current_;
long offset = it.offset_;

/* iterate over next nodes until everything is deleted. */
while (extent) {
quicklistNode* next = node->next;

unsigned long del;
int delete_entire_node = 0;
if (offset == 0 && extent >= node->count) {
/* If we are deleting more than the count of this node, we
* can just delete the entire node without listpack math. */
delete_entire_node = 1;
del = node->count;
} else if (offset >= 0 && extent + offset >= node->count) {
/* If deleting more nodes after this one, calculate delete based
* on size of current node. */
del = node->count - offset;
} else if (offset < 0) {
/* If offset is negative, we are in the first run of this loop
* and we are deleting the entire range
* from this start offset to end of list. Since the Negative
* offset is the number of elements until the tail of the list,
* just use it directly as the deletion count. */
del = -offset;

/* If the positive offset is greater than the remaining extent,
* we only delete the remaining extent, not the entire offset.
*/
if (del > extent)
del = extent;
} else {
/* else, we are deleting less than the extent of this node, so
* use extent directly. */
del = extent;
}

if (delete_entire_node || QL_NODE_IS_PLAIN(node)) {
DelNode(node);
} else {
DecompressNodeIfNeeded(true, node);
node->entry = lpDeleteRange(node->entry, offset, del);
NodeUpdateSz(node);
node->count -= del;
count_ -= del;
if (node->count == 0) {
DelNode(node);
} else {
RecompressOnly(node);
}
}

extent -= del;
node = next;
offset = 0;
}
return true;
}

bool QList::Entry::operator==(std::string_view sv) const {
if (std::holds_alternative<int64_t>(value_)) {
char buf[absl::numbers_internal::kFastToBufferSize];
Expand Down
13 changes: 13 additions & 0 deletions src/core/qlist.h
Original file line number Diff line number Diff line change
Expand Up @@ -122,12 +122,25 @@ class QList {
return len_;
}

unsigned compress_param() const {
return compress_;
}

Iterator Erase(Iterator it);

// Returns true if elements were deleted, false if list has not changed.
// Negative start index is allowed.
bool Erase(const long start, unsigned count);

// Needed by tests and the rdb code.
const quicklistNode* Head() const {
return head_;
}

const quicklistNode* Tail() const {
return tail_;
}

private:
bool AllowCompression() const {
return compress_ != 0;
Expand Down
193 changes: 192 additions & 1 deletion src/core/qlist_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include "core/qlist.h"

#include <absl/strings/str_cat.h>
#include <absl/strings/str_format.h>
#include <gmock/gmock.h>

#include "base/gtest.h"
Expand All @@ -21,6 +22,100 @@ namespace dfly {
using namespace std;
using namespace testing;

static int _ql_verify_compress(const QList& ql) {
int errors = 0;
unsigned compress_param = ql.compress_param();
if (compress_param > 0) {
const quicklistNode* node = ql.Head();
unsigned int low_raw = compress_param;
unsigned int high_raw = ql.node_count() - compress_param;

for (unsigned int at = 0; at < ql.node_count(); at++, node = node->next) {
if (node && (at < low_raw || at >= high_raw)) {
if (node->encoding != QUICKLIST_NODE_ENCODING_RAW) {
LOG(ERROR) << "Incorrect compression: node " << at << " is compressed at depth "
<< compress_param << " ((" << low_raw << "," << high_raw
<< " total nodes: " << ql.node_count() << "; size: " << node->sz
<< "; recompress: " << node->recompress;
errors++;
}
} else {
if (node->encoding != QUICKLIST_NODE_ENCODING_LZF && !node->attempted_compress) {
LOG(ERROR) << absl::StrFormat(
"Incorrect non-compression: node %d is NOT "
"compressed at depth %d ((%u, %u); total "
"nodes: %lu; size: %zu; recompress: %d; attempted: %d)",
at, compress_param, low_raw, high_raw, ql.node_count(), node->sz, node->recompress,
node->attempted_compress);
errors++;
}
}
}
}
return errors;
}

/* Verify list metadata matches physical list contents. */
static int ql_verify(const QList& ql, uint32_t nc, uint32_t count, uint32_t head_count,
uint32_t tail_count) {
int errors = 0;

if (nc != ql.node_count()) {
LOG(ERROR) << "quicklist length wrong: expected " << nc << " got " << ql.node_count();
errors++;
}

if (count != ql.Size()) {
LOG(ERROR) << "quicklist count wrong: expected " << count << " got " << ql.Size();
errors++;
}

auto* node = ql.Head();
size_t node_size = 0;
while (node) {
node_size += node->count;
node = node->next;
}

if (node_size != ql.Size()) {
LOG(ERROR) << "quicklist cached count not match actual count: expected " << ql.Size() << " got "
<< node_size;
errors++;
}

node = ql.Tail();
node_size = 0;
while (node) {
node_size += node->count;
node = node->prev;
}
if (node_size != ql.Size()) {
LOG(ERROR) << "has different forward count than reverse count! "
"Forward count is "
<< ql.Size() << ", reverse count is " << node_size;
errors++;
}

if (ql.node_count() == 0 && errors == 0) {
return 0;
}

if (ql.Head() && head_count != ql.Head()->count && head_count != lpLength(ql.Head()->entry)) {
LOG(ERROR) << absl::StrFormat("head count wrong: expected %u got cached %u vs. actual %lu",
head_count, ql.Head()->count, lpLength(ql.Head()->entry));
errors++;
}

if (ql.Tail() && tail_count != ql.Tail()->count && tail_count != lpLength(ql.Tail()->entry)) {
LOG(ERROR) << "tail count wrong: expected " << tail_count << "got cached " << ql.Tail()->count
<< " vs. actual " << lpLength(ql.Tail()->entry);
errors++;
}

errors += _ql_verify_compress(ql);
return errors;
}

class QListTest : public ::testing::Test {
protected:
QListTest() : mr_(mi_heap_get_backing()) {
Expand Down Expand Up @@ -183,4 +278,100 @@ TEST_P(OptionsTest, Numbers) {
EXPECT_EQ("xxxxxxxxxxxxxxxxxxxx", it.Get().view());
}

}; // namespace dfly
TEST_P(OptionsTest, DelRangeA) {
auto [fill, compress] = GetParam();
ql_ = QList(fill, compress);
long long nums[5000];
for (int i = 0; i < 33; i++) {
nums[i] = -5157318210846258176 + i;
ql_.Push(absl::StrCat(nums[i]), QList::TAIL);
}
if (fill == 32)
ql_verify(ql_, 2, 33, 32, 1);

/* ltrim 3 3 (keep [3,3] inclusive = 1 remaining) */
ql_.Erase(0, 3);
ql_.Erase(-29, 4000); /* make sure not loop forever */
if (fill == 32)
ql_verify(ql_, 1, 1, 1, 1);

auto it = ql_.GetIterator(0);
ASSERT_TRUE(it.Next());
EXPECT_EQ(-5157318210846258173, it.Get().ival());
}

TEST_P(OptionsTest, DelRangeB) {
auto [fill, _] = GetParam();
ql_ = QList(fill, QUICKLIST_NOCOMPRESS); // ignore compress parameter

long long nums[5000];
for (int i = 0; i < 33; i++) {
nums[i] = i;
ql_.Push(absl::StrCat(nums[i]), QList::TAIL);
}
if (fill == 32)
ql_verify(ql_, 2, 33, 32, 1);

/* ltrim 5 16 (keep [5,16] inclusive = 12 remaining) */
ql_.Erase(0, 5);
ql_.Erase(-16, 16);
if (fill == 32)
ql_verify(ql_, 1, 12, 12, 12);

auto it = ql_.GetIterator(0);
ASSERT_TRUE(it.Next());
EXPECT_EQ(5, it.Get().ival());

it = ql_.GetIterator(-1);
ASSERT_TRUE(it.Next());
EXPECT_EQ(16, it.Get().ival());

ql_.Push("bobobob", QList::TAIL);
it = ql_.GetIterator(-1);
ASSERT_TRUE(it.Next());
EXPECT_EQ("bobobob", it.Get().view());

for (int i = 0; i < 12; i++) {
it = ql_.GetIterator(i);
ASSERT_TRUE(it.Next());
EXPECT_EQ(i + 5, it.Get().ival());
}
}

TEST_P(OptionsTest, DelRangeC) {
auto [fill, compress] = GetParam();
ql_ = QList(fill, compress);
long long nums[5000];
for (int i = 0; i < 33; i++) {
nums[i] = -5157318210846258176 + i;
ql_.Push(absl::StrCat(nums[i]), QList::TAIL);
}
if (fill == 32)
ql_verify(ql_, 2, 33, 32, 1);

/* ltrim 3 3 (keep [3,3] inclusive = 1 remaining) */
ql_.Erase(0, 3);
ql_.Erase(-29, 4000); /* make sure not loop forever */
if (fill == 32)
ql_verify(ql_, 1, 1, 1, 1);
auto it = ql_.GetIterator(0);
ASSERT_TRUE(it.Next());
ASSERT_EQ(-5157318210846258173, it.Get().ival());
}

TEST_P(OptionsTest, DelRangeD) {
auto [fill, compress] = GetParam();
ql_ = QList(fill, compress);
long long nums[5000];
for (int i = 0; i < 33; i++) {
nums[i] = -5157318210846258176 + i;
ql_.Push(absl::StrCat(nums[i]), QList::TAIL);
}
if (fill == 32)
ql_verify(ql_, 2, 33, 32, 1);
ql_.Erase(-12, 3);

ASSERT_EQ(30, ql_.Size());
}

} // namespace dfly
18 changes: 12 additions & 6 deletions src/server/list_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -755,8 +755,8 @@ OpStatus OpTrim(const OpArgs& op_args, string_view key, long start, long end) {
return it_res.status();

auto it = it_res->it;
quicklist* ql = GetQL(it->second);
long llen = quicklistCount(ql);

long llen = it->second.Size();

/* convert negative indexes */
if (start < 0)
Expand All @@ -781,12 +781,18 @@ OpStatus OpTrim(const OpArgs& op_args, string_view key, long start, long end) {
rtrim = llen - end - 1;
}

quicklistDelRange(ql, 0, ltrim);
quicklistDelRange(ql, -rtrim, rtrim);

if (it->second.Encoding() == kEncodingQL2) {
QList* ql = GetQLV2(it->second);
ql->Erase(0, ltrim);
ql->Erase(-rtrim, rtrim);
} else {
quicklist* ql = GetQL(it->second);
quicklistDelRange(ql, 0, ltrim);
quicklistDelRange(ql, -rtrim, rtrim);
}
it_res->post_updater.Run();

if (quicklistCount(ql) == 0) {
if (it->second.Size() == 0) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Empty will be better

CHECK(db_slice.Del(op_args.db_cntx, it));
}
return OpStatus::OK;
Expand Down
Loading