Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement the ZADD options #1022

Merged
merged 35 commits into from
Oct 28, 2022
Merged
Show file tree
Hide file tree
Changes from 28 commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
ffa097f
Add zadd options in headers
manchurio Oct 20, 2022
f4f3ebb
Parse options for zadd.
manchurio Oct 20, 2022
8ce1093
Validate flags in zadd.
manchurio Oct 20, 2022
09bf9e7
Refactor parseOptions.
manchurio Oct 21, 2022
0baa9b7
Handle zadd options
manchurio Oct 21, 2022
0e51582
Handel zadd response with options
manchurio Oct 21, 2022
3af1e0d
Zadd options unit test.
manchurio Oct 21, 2022
9ab913c
Fix problems about clang-format.
manchurio Oct 21, 2022
0d27361
Fix problems of old code about clang-format
manchurio Oct 21, 2022
0574b88
Enable c++ 17 for cpp check
manchurio Oct 21, 2022
283634c
Add mailing list to README (#1023)
PragmaTwice Oct 21, 2022
aae8228
Merge branch 'unstable' into feat/zadd_option
manchurio Oct 21, 2022
d985bba
change the code for bad cppcheck
manchurio Oct 21, 2022
af27eac
format go test case
manchurio Oct 22, 2022
5f589dc
Add duplicate nx/xx to zadd
manchurio Oct 22, 2022
822afdb
Merge branch 'unstable' into feat/zadd_option
tisonkun Oct 23, 2022
79f8607
Fix problems of code style.
manchurio Oct 23, 2022
6ac21ac
Fix NX problem.
manchurio Oct 23, 2022
93186a1
Add NX with multiple pairs in zadd unit tests
manchurio Oct 23, 2022
199eb21
change permission of x.py to 755
manchurio Oct 23, 2022
0143e04
Merge branch 'unstable' into feat/zadd_option
PragmaTwice Oct 23, 2022
ba43865
Change parseOptions to parseFlags
manchurio Oct 24, 2022
9daea18
Refactor flags from uint8_t to struct ZAddFlags
manchurio Oct 24, 2022
a429eaa
Merge multiple conditions into one line.
manchurio Oct 24, 2022
4688daa
Using EqualValues instead of Equals to avoid type cast.
manchurio Oct 24, 2022
b58ae86
Fix clang-format.
manchurio Oct 24, 2022
093e995
Merge branch 'unstable' into feat/zadd_option
manchurio Oct 24, 2022
6eb57dd
Merge branch 'unstable' into feat/zadd_option
manchurio Oct 24, 2022
994d3cb
Merge branch 'unstable' into feat/zadd_option
manchurio Oct 27, 2022
12053a8
Merge branch 'unstable' into feat/zadd_option
PragmaTwice Oct 23, 2022
7c7b604
No need for additional local variables
manchurio Oct 27, 2022
f704d6f
clang format
manchurio Oct 27, 2022
8c178b7
use class instead of typedef struct
manchurio Oct 27, 2022
16829bf
Fix no pair bugs in zadd
manchurio Oct 27, 2022
b7cd0b4
Merge branch 'unstable' into feat/zadd_option
manchurio Oct 28, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 61 additions & 6 deletions src/commands/redis_cmd.cc
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ const char *errUnbalancedStreamList =
"Unbalanced XREAD list of streams: for each stream key an ID or '$' must be specified.";
const char *errTimeoutIsNegative = "timeout is negative";
const char *errLimitOptionNotAllowed = "syntax error, LIMIT cannot be used without the special ~ option";
const char *errZSetLTGTNX = "GT, LT, and/or NX options at the same time are not compatible";

enum class AuthResult {
OK,
Expand Down Expand Up @@ -2380,12 +2381,20 @@ class CommandSInterStore : public Commander {
class CommandZAdd : public Commander {
public:
Status Parse(const std::vector<std::string> &args) override {
if (args.size() % 2 != 0) {
return Status(Status::RedisParseErr, errInvalidSyntax);
unsigned index = 2;
parseFlags(args, index);
if (auto s = validateFlags(); !s.IsOK()) {
return s;
}
if (auto left = (args.size() - index); left > 0) {
manchurio marked this conversation as resolved.
Show resolved Hide resolved
if (flags_.HasIncr() && left != 2) {
return Status(Status::RedisParseErr, "INCR option supports a single increment-element pair");
} else if (left % 2 != 0) {
return Status(Status::RedisParseErr, errInvalidSyntax);
}
}

try {
for (unsigned i = 2; i < args.size(); i += 2) {
for (unsigned i = index; i < args.size(); i += 2) {
double score = std::stod(args[i]);
if (std::isnan(score)) {
return Status(Status::RedisParseErr, "ERR score is not a valid float");
Expand All @@ -2400,19 +2409,65 @@ class CommandZAdd : public Commander {

Status Execute(Server *svr, Connection *conn, std::string *output) override {
int ret;
double old_score = member_scores_[0].score;
Redis::ZSet zset_db(svr->storage_, conn->GetNamespace());
rocksdb::Status s = zset_db.Add(args_[1], 0, &member_scores_, &ret);
rocksdb::Status s = zset_db.Add(args_[1], flags_, &member_scores_, &ret);
if (!s.ok()) {
return Status(Status::RedisExecErr, s.ToString());
}
*output = Redis::Integer(ret);
bool incr = flags_.HasIncr();
if (incr) {
auto new_score = member_scores_[0].score;
bool nx = flags_.HasNX(), xx = flags_.HasXX(), lt = flags_.HasLT(), gt = flags_.HasGT();
if ((nx || xx || lt || gt) && old_score == new_score &&
manchurio marked this conversation as resolved.
Show resolved Hide resolved
ret == 0) { // not the first time using incr && score not changed
*output = Redis::NilString();
return Status::OK();
}
*output = Redis::BulkString(Util::Float2String(new_score));
} else {
*output = Redis::Integer(ret);
}
return Status::OK();
}

private:
std::vector<MemberScore> member_scores_;
ZAddFlags flags_{0};

void parseFlags(const std::vector<std::string> &args, unsigned &index);
Status validateFlags() const;
};

void CommandZAdd::parseFlags(const std::vector<std::string> &args, unsigned &index) {
std::unordered_map<std::string, ZSetFlags> options = {{"xx", kZSetXX}, {"nx", kZSetNX}, {"ch", kZSetCH},
{"lt", kZSetLT}, {"gt", kZSetGT}, {"incr", kZSetIncr}};
for (unsigned i = 2; i < args.size(); i++) {
auto option = Util::ToLower(args[i]);
auto it = options.find(option);
if (it != options.end()) {
flags_.SetFlag(it->second);
index++;
} else {
break;
}
}
}

Status CommandZAdd::validateFlags() const {
if (!flags_.HasFlag()) {
return Status::OK();
}
bool nx = flags_.HasNX(), xx = flags_.HasXX(), lt = flags_.HasLT(), gt = flags_.HasGT();
if (nx && xx) {
return Status(Status::RedisParseErr, "XX and NX options at the same time are not compatible");
}
if ((lt && gt) || (lt && nx) || (gt && nx)) {
return Status(Status::RedisParseErr, errZSetLTGTNX);
}
manchurio marked this conversation as resolved.
Show resolved Hide resolved
return Status::OK();
}

class CommandZCount : public Commander {
public:
Status Parse(const std::vector<std::string> &args) override {
Expand Down
4 changes: 2 additions & 2 deletions src/types/redis_geo.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ rocksdb::Status Geo::Add(const Slice &user_key, std::vector<GeoPoint> *geo_point
GeoHashFix52Bits bits = GeoHashHelper::Align52Bits(hash);
member_scores.emplace_back(MemberScore{geo_point.member, static_cast<double>(bits)});
}
return ZSet::Add(user_key, 0, &member_scores, ret);
return ZSet::Add(user_key, ZAddFlags::Default(), &member_scores, ret);
}

rocksdb::Status Geo::Dist(const Slice &user_key, const Slice &member_1, const Slice &member_2, double *dist) {
Expand Down Expand Up @@ -120,7 +120,7 @@ rocksdb::Status Geo::Radius(const Slice &user_key, double longitude, double lati
member_scores.emplace_back(MemberScore{geo_point.member, score});
}
int ret;
ZSet::Add(store_key, 0, &member_scores, &ret);
ZSet::Add(store_key, ZAddFlags::Default(), &member_scores, &ret);
}
}

Expand Down
26 changes: 23 additions & 3 deletions src/types/redis_zset.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ rocksdb::Status ZSet::GetMetadata(const Slice &ns_key, ZSetMetadata *metadata) {
return Database::GetMetadata(kRedisZSet, ns_key, metadata);
}

rocksdb::Status ZSet::Add(const Slice &user_key, uint8_t flags, std::vector<MemberScore> *mscores, int *ret) {
rocksdb::Status ZSet::Add(const Slice &user_key, ZAddFlags flags, std::vector<MemberScore> *mscores, int *ret) {
*ret = 0;

std::string ns_key;
Expand All @@ -49,6 +49,7 @@ rocksdb::Status ZSet::Add(const Slice &user_key, uint8_t flags, std::vector<Memb
if (!s.ok() && !s.IsNotFound()) return s;

int added = 0;
int changed = 0;
rocksdb::WriteBatch batch;
WriteBatchLogData log_data(kRedisZSet);
batch.PutLogData(log_data.Encode());
Expand All @@ -71,19 +72,31 @@ rocksdb::Status ZSet::Add(const Slice &user_key, uint8_t flags, std::vector<Memb
}
added_member_keys.insert(member_key);

bool lt = flags.HasLT();
bool gt = flags.HasGT();
manchurio marked this conversation as resolved.
Show resolved Hide resolved

if (metadata.size > 0) {
std::string old_score_bytes;
s = db_->Get(rocksdb::ReadOptions(), member_key, &old_score_bytes);
if (!s.ok() && !s.IsNotFound()) return s;
if (s.ok()) {
if (!s.IsNotFound() && flags.HasNX()) {
continue;
git-hulk marked this conversation as resolved.
Show resolved Hide resolved
}
double old_score = DecodeDouble(old_score_bytes.data());
if (flags == kZSetIncr) {
if (flags.HasIncr()) {
if ((lt && (*mscores)[i].score >= 0) || (gt && (*mscores)[i].score <= 0)) {
continue;
}
(*mscores)[i].score += old_score;
if (std::isnan((*mscores)[i].score)) {
return rocksdb::Status::InvalidArgument("resulting score is not a number (NaN)");
}
}
if ((*mscores)[i].score != old_score) {
if ((lt && (*mscores)[i].score >= old_score) || (gt && (*mscores)[i].score <= old_score)) {
continue;
}
old_score_bytes.append((*mscores)[i].member);
std::string old_score_key;
InternalKey(ns_key, old_score_bytes, metadata.version, storage_->IsSlotIdEncoded()).Encode(&old_score_key);
Expand All @@ -94,10 +107,14 @@ rocksdb::Status ZSet::Add(const Slice &user_key, uint8_t flags, std::vector<Memb
new_score_bytes.append((*mscores)[i].member);
InternalKey(ns_key, new_score_bytes, metadata.version, storage_->IsSlotIdEncoded()).Encode(&new_score_key);
batch.Put(score_cf_handle_, new_score_key, Slice());
changed++;
}
continue;
}
}
if (flags.HasXX()) {
continue;
}
std::string score_bytes, score_key;
PutDouble(&score_bytes, (*mscores)[i].score);
batch.Put(member_key, score_bytes);
Expand All @@ -113,6 +130,9 @@ rocksdb::Status ZSet::Add(const Slice &user_key, uint8_t flags, std::vector<Memb
metadata.Encode(&bytes);
batch.Put(metadata_cf_handle_, ns_key, bytes);
}
if (flags.HasCH()) {
*ret += changed;
}
return storage_->Write(storage_->DefaultWriteOptions(), &batch);
}

Expand All @@ -138,7 +158,7 @@ rocksdb::Status ZSet::IncrBy(const Slice &user_key, const Slice &member, double
int ret;
std::vector<MemberScore> mscores;
mscores.emplace_back(MemberScore{member.ToString(), increment});
rocksdb::Status s = Add(user_key, kZSetIncr, &mscores, &ret);
rocksdb::Status s = Add(user_key, ZAddFlags::Incr(), &mscores, &ret);
if (!s.ok()) return s;
*score = mscores[0].score;
return rocksdb::Status::OK();
Expand Down
26 changes: 25 additions & 1 deletion src/types/redis_zset.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,15 +82,39 @@ enum ZSetFlags {
kZSetXX = 1 << 2,
kZSetReversed = 1 << 3,
kZSetRemoved = 1 << 4,
kZSetGT = 1 << 5,
kZSetLT = 1 << 6,
kZSetCH = 1 << 7,
};

typedef struct ZAddFlags {
manchurio marked this conversation as resolved.
Show resolved Hide resolved
explicit ZAddFlags(uint8_t flags = 0) : flags(flags) {}

bool HasNX() const { return (flags & kZSetNX) != 0; }
bool HasXX() const { return (flags & kZSetXX) != 0; }
bool HasLT() const { return (flags & kZSetLT) != 0; }
bool HasGT() const { return (flags & kZSetGT) != 0; }
bool HasCH() const { return (flags & kZSetCH) != 0; }
bool HasIncr() const { return (flags & kZSetIncr) != 0; }
bool HasFlag() const { return flags != 0; }
manchurio marked this conversation as resolved.
Show resolved Hide resolved

void SetFlag(ZSetFlags setFlags) { flags |= setFlags; }

static const ZAddFlags Incr() { return ZAddFlags{kZSetIncr}; }

static const ZAddFlags Default() { return ZAddFlags{0}; }

private:
uint8_t flags = 0;
} ZAddFlags;
manchurio marked this conversation as resolved.
Show resolved Hide resolved

namespace Redis {

class ZSet : public SubKeyScanner {
public:
explicit ZSet(Engine::Storage *storage, const std::string &ns)
: SubKeyScanner(storage, ns), score_cf_handle_(storage->GetCFHandle("zset_score")) {}
rocksdb::Status Add(const Slice &user_key, uint8_t flags, std::vector<MemberScore> *mscores, int *ret);
rocksdb::Status Add(const Slice &user_key, ZAddFlags flags, std::vector<MemberScore> *mscores, int *ret);
rocksdb::Status Card(const Slice &user_key, int *ret);
rocksdb::Status Count(const Slice &user_key, const ZRangeSpec &spec, int *ret);
rocksdb::Status IncrBy(const Slice &user_key, const Slice &member, double increment, double *score);
Expand Down
2 changes: 1 addition & 1 deletion tests/cppunit/compact_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ TEST(Compact, Filter) {
auto zset = std::make_unique<Redis::ZSet>(storage_.get(), ns);
std::string expired_zset_key = "expire_zset_key";
std::vector<MemberScore> member_scores = {MemberScore{"z1", 1.1}, MemberScore{"z2", 0.4}};
zset->Add(expired_zset_key, 0, &member_scores, &ret);
zset->Add(expired_zset_key, ZAddFlags::Default(), &member_scores, &ret);
zset->Expire(expired_zset_key, 1); // expired
usleep(10000);

Expand Down
2 changes: 1 addition & 1 deletion tests/cppunit/disk_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ TEST_F(RedisDiskTest, ZsetDisk) {
mscores[i].score = 1.0 * value_size[int(values_.size()) - i - 1];
approximate_size += (key_.size() + 8 + mscores[i].member.size() + 8) * 2;
}
rocksdb::Status s = zset->Add(key_, 0, &mscores, &ret);
rocksdb::Status s = zset->Add(key_, ZAddFlags::Default(), &mscores, &ret);
EXPECT_TRUE(s.ok() && ret == 5);
uint64_t key_size = 0;
EXPECT_TRUE(disk->GetKeySize(key_, kRedisZSet, &key_size).ok());
Expand Down
30 changes: 15 additions & 15 deletions tests/cppunit/t_zset_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,14 @@ TEST_F(RedisZSetTest, Add) {
for (size_t i = 0; i < fields_.size(); i++) {
mscores.emplace_back(MemberScore{fields_[i].ToString(), scores_[i]});
}
zset->Add(key_, 0, &mscores, &ret);
zset->Add(key_, ZAddFlags::Default(), &mscores, &ret);
EXPECT_EQ(static_cast<int>(fields_.size()), ret);
for (size_t i = 0; i < fields_.size(); i++) {
double got;
rocksdb::Status s = zset->Score(key_, fields_[i], &got);
EXPECT_EQ(scores_[i], got);
}
zset->Add(key_, 0, &mscores, &ret);
zset->Add(key_, ZAddFlags::Default(), &mscores, &ret);
EXPECT_EQ(ret, 0);
zset->Del(key_);
}
Expand All @@ -65,7 +65,7 @@ TEST_F(RedisZSetTest, IncrBy) {
for (size_t i = 0; i < fields_.size(); i++) {
mscores.emplace_back(MemberScore{fields_[i].ToString(), scores_[i]});
}
zset->Add(key_, 0, &mscores, &ret);
zset->Add(key_, ZAddFlags::Default(), &mscores, &ret);
EXPECT_EQ(fields_.size(), ret);
for (size_t i = 0; i < fields_.size(); i++) {
double increment = 12.3, score;
Expand All @@ -81,7 +81,7 @@ TEST_F(RedisZSetTest, Remove) {
for (size_t i = 0; i < fields_.size(); i++) {
mscores.emplace_back(MemberScore{fields_[i].ToString(), scores_[i]});
}
zset->Add(key_, 0, &mscores, &ret);
zset->Add(key_, ZAddFlags::Default(), &mscores, &ret);
EXPECT_EQ(fields_.size(), ret);
zset->Remove(key_, fields_, &ret);
EXPECT_EQ(fields_.size(), ret);
Expand All @@ -100,7 +100,7 @@ TEST_F(RedisZSetTest, Range) {
mscores.emplace_back(MemberScore{fields_[i].ToString(), scores_[i]});
}
int count = mscores.size() - 1;
zset->Add(key_, 0, &mscores, &ret);
zset->Add(key_, ZAddFlags::Default(), &mscores, &ret);
EXPECT_EQ(fields_.size(), ret);
zset->Range(key_, 0, -2, 0, &mscores);
EXPECT_EQ(mscores.size(), count);
Expand All @@ -118,7 +118,7 @@ TEST_F(RedisZSetTest, RevRange) {
mscores.emplace_back(MemberScore{fields_[i].ToString(), scores_[i]});
}
int count = mscores.size() - 1;
zset->Add(key_, 0, &mscores, &ret);
zset->Add(key_, ZAddFlags::Default(), &mscores, &ret);
EXPECT_EQ(static_cast<int>(fields_.size()), ret);
zset->Range(key_, 0, -2, kZSetReversed, &mscores);
EXPECT_EQ(mscores.size(), count);
Expand All @@ -135,7 +135,7 @@ TEST_F(RedisZSetTest, PopMin) {
for (size_t i = 0; i < fields_.size(); i++) {
mscores.emplace_back(MemberScore{fields_[i].ToString(), scores_[i]});
}
zset->Add(key_, 0, &mscores, &ret);
zset->Add(key_, ZAddFlags::Default(), &mscores, &ret);
EXPECT_EQ(static_cast<int>(fields_.size()), ret);
zset->Pop(key_, mscores.size() - 1, true, &mscores);
for (size_t i = 0; i < mscores.size(); i++) {
Expand All @@ -154,7 +154,7 @@ TEST_F(RedisZSetTest, PopMax) {
for (size_t i = 0; i < fields_.size(); i++) {
mscores.emplace_back(MemberScore{fields_[i].ToString(), scores_[i]});
}
zset->Add(key_, 0, &mscores, &ret);
zset->Add(key_, ZAddFlags::Default(), &mscores, &ret);
EXPECT_EQ(static_cast<int>(fields_.size()), ret);
zset->Pop(key_, mscores.size() - 1, false, &mscores);
for (size_t i = 0; i < mscores.size(); i++) {
Expand All @@ -171,7 +171,7 @@ TEST_F(RedisZSetTest, RangeByLex) {
for (size_t i = 0; i < fields_.size(); i++) {
mscores.emplace_back(MemberScore{fields_[i].ToString(), scores_[i]});
}
zset->Add(key_, 0, &mscores, &ret);
zset->Add(key_, ZAddFlags::Default(), &mscores, &ret);
EXPECT_EQ(fields_.size(), ret);

ZRangeLexSpec spec;
Expand Down Expand Up @@ -227,7 +227,7 @@ TEST_F(RedisZSetTest, RangeByScore) {
for (size_t i = 0; i < fields_.size(); i++) {
mscores.emplace_back(MemberScore{fields_[i].ToString(), scores_[i]});
}
zset->Add(key_, 0, &mscores, &ret);
zset->Add(key_, ZAddFlags::Default(), &mscores, &ret);
EXPECT_EQ(fields_.size(), ret);

// test case: inclusive the min and max score
Expand Down Expand Up @@ -275,7 +275,7 @@ TEST_F(RedisZSetTest, RangeByScoreWithLimit) {
for (size_t i = 0; i < fields_.size(); i++) {
mscores.emplace_back(MemberScore{fields_[i].ToString(), scores_[i]});
}
zset->Add(key_, 0, &mscores, &ret);
zset->Add(key_, ZAddFlags::Default(), &mscores, &ret);
EXPECT_EQ(fields_.size(), ret);

ZRangeSpec spec;
Expand All @@ -296,7 +296,7 @@ TEST_F(RedisZSetTest, RemRangeByScore) {
for (size_t i = 0; i < fields_.size(); i++) {
mscores.emplace_back(MemberScore{fields_[i].ToString(), scores_[i]});
}
zset->Add(key_, 0, &mscores, &ret);
zset->Add(key_, ZAddFlags::Default(), &mscores, &ret);
EXPECT_EQ(fields_.size(), ret);
ZRangeSpec spec;
spec.min = scores_[0];
Expand All @@ -315,7 +315,7 @@ TEST_F(RedisZSetTest, RemoveRangeByRank) {
for (size_t i = 0; i < fields_.size(); i++) {
mscores.emplace_back(MemberScore{fields_[i].ToString(), scores_[i]});
}
zset->Add(key_, 0, &mscores, &ret);
zset->Add(key_, ZAddFlags::Default(), &mscores, &ret);
EXPECT_EQ(fields_.size(), ret);
zset->RemoveRangeByRank(key_, 0, fields_.size() - 2, &ret);
EXPECT_EQ(fields_.size() - 1, ret);
Expand All @@ -329,7 +329,7 @@ TEST_F(RedisZSetTest, RemoveRevRangeByRank) {
for (size_t i = 0; i < fields_.size(); i++) {
mscores.emplace_back(MemberScore{fields_[i].ToString(), scores_[i]});
}
zset->Add(key_, 0, &mscores, &ret);
zset->Add(key_, ZAddFlags::Default(), &mscores, &ret);
EXPECT_EQ(fields_.size(), ret);
zset->RemoveRangeByRank(key_, 0, fields_.size() - 2, &ret);
EXPECT_EQ(static_cast<int>(fields_.size() - 1), ret);
Expand All @@ -343,7 +343,7 @@ TEST_F(RedisZSetTest, Rank) {
for (size_t i = 0; i < fields_.size(); i++) {
mscores.emplace_back(MemberScore{fields_[i].ToString(), scores_[i]});
}
zset->Add(key_, 0, &mscores, &ret);
zset->Add(key_, ZAddFlags::Default(), &mscores, &ret);
EXPECT_EQ(static_cast<int>(fields_.size()), ret);

for (size_t i = 0; i < fields_.size(); i++) {
Expand Down
Loading