Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(server): Support for LIMIT and REV in ZRANGE #422 #456

Merged
merged 5 commits into from
Nov 3, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
228 changes: 121 additions & 107 deletions src/server/zset_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ extern "C" {
#include "facade/error.h"
#include "server/command_registry.h"
#include "server/conn_context.h"
#include "server/container_utils.h"
#include "server/engine_shard_set.h"
#include "server/transaction.h"
#include "server/container_utils.h"

namespace dfly {

Expand Down Expand Up @@ -122,11 +122,7 @@ OpResult<PrimeIterator> FindZEntry(const ZParams& zparams, const OpArgs& op_args
return it;
}

enum class Action {
RANGE = 0,
REMOVE = 1,
POP = 2
};
enum class Action { RANGE = 0, REMOVE = 1, POP = 2 };

class IntervalVisitor {
public:
Expand Down Expand Up @@ -269,10 +265,13 @@ void IntervalVisitor::operator()(ZSetFamily::TopNScored sc) {
}

void IntervalVisitor::ActionRange(unsigned start, unsigned end) {
container_utils::IterateSortedSet(zobj_, [this](container_utils::ContainerEntry ce, double score){
result_.emplace_back(ce.ToString(), score);
return true;
}, start, end, params_.reverse, params_.with_scores);
container_utils::IterateSortedSet(
zobj_,
[this](container_utils::ContainerEntry ce, double score) {
result_.emplace_back(ce.ToString(), score);
return true;
},
start, end, params_.reverse, params_.with_scores);
}

void IntervalVisitor::ActionRange(const zrangespec& range) {
Expand Down Expand Up @@ -513,9 +512,9 @@ void IntervalVisitor::PopListPack(ZSetFamily::TopNScored sc) {
long long vlong = 0;

if (params_.reverse) {
eptr = lpSeek(zl,-2);
eptr = lpSeek(zl, -2);
} else {
eptr = lpSeek(zl,0);
eptr = lpSeek(zl, 0);
}

/* Get score pointer for the first element. */
Expand All @@ -537,11 +536,11 @@ void IntervalVisitor::PopListPack(ZSetFamily::TopNScored sc) {
if (params_.reverse) {
/* If the number of elements to delete is greater than the listpack length,
* we set the start to 0 because lpseek fails to search beyond length in reverse */
start = (2*sc > lpLength(zl)) ? 0 : -2*sc;
start = (2 * sc > lpLength(zl)) ? 0 : -2 * sc;
}

/* We can finally delete the elements */
zobj_->ptr = lpDeleteRange(zl, start, 2*sc);
zobj_->ptr = lpDeleteRange(zl, start, 2 * sc);
}

void IntervalVisitor::PopSkipList(ZSetFamily::TopNScored sc) {
Expand Down Expand Up @@ -1204,44 +1203,91 @@ void ZSetFamily::ZLexCount(CmdArgList args, ConnectionContext* cntx) {
}

void ZSetFamily::ZRange(CmdArgList args, ConnectionContext* cntx) {
ZRangeGeneric(std::move(args), false, cntx);
RangeParams::INTERVALTYPE interval_type = RangeParams::INTERVALTYPE::RANK;
RangeParams range_params;
dranikpg marked this conversation as resolved.
Show resolved Hide resolved

for (size_t i = 4; i < args.size(); ++i) {
ToUpper(&args[i]);

string_view cur_arg = ArgS(args, i);
if (cur_arg == "BYSCORE") {
if (interval_type == RangeParams::INTERVALTYPE::LEX) {
return (*cntx)->SendError("BYSCORE and BYLEX options are not compatible");
}
interval_type = RangeParams::INTERVALTYPE::SCORE;
} else if (cur_arg == "BYLEX") {
if (interval_type == RangeParams::INTERVALTYPE::SCORE) {
return (*cntx)->SendError("BYSCORE and BYLEX options are not compatible");
}
interval_type = RangeParams::INTERVALTYPE::LEX;
} else if (cur_arg == "REV") {
range_params.reverse = true;
} else if (cur_arg == "WITHSCORES") {
range_params.with_scores = true;
} else if (cur_arg == "LIMIT") {
if (i + 3 != args.size()) {
return (*cntx)->SendError(kSyntaxErr);
}
string_view os = ArgS(args, i + 1);
string_view cs = ArgS(args, i + 2);
if (!SimpleAtoi(os, &range_params.offset) || !SimpleAtoi(cs, &range_params.limit)) {
return (*cntx)->SendError(kInvalidIntErr);
}
i += 3;
} else {
return cntx->reply_builder()->SendError(absl::StrCat("unsupported option ", cur_arg));
}
}

range_params.interval_type = interval_type;
ZRangeGeneric(std::move(args), range_params, cntx);
}

void ZSetFamily::ZRank(CmdArgList args, ConnectionContext* cntx) {
ZRankGeneric(std::move(args), false, cntx);
}

void ZSetFamily::ZRevRange(CmdArgList args, ConnectionContext* cntx) {
ZRangeGeneric(std::move(args), true, cntx);
}

void ZSetFamily::ZRevRangeByScore(CmdArgList args, ConnectionContext* cntx) {
string_view key = ArgS(args, 1);
string_view min_s = ArgS(args, 2);
string_view max_s = ArgS(args, 3);

RangeParams range_params;
range_params.reverse = true;
args.remove_prefix(4);

if (!ParseRangeByScoreParams(args, &range_params)) {
return (*cntx)->SendError(kSyntaxErr);
for (size_t i = 4; i < args.size(); ++i) {
ToUpper(&args[i]);

string_view cur_arg = ArgS(args, i);
if (cur_arg == "WITHSCORES") {
range_params.with_scores = true;
} else {
return cntx->reply_builder()->SendError(absl::StrCat("unsupported option ", cur_arg));
}
}

ZRangeByScoreInternal(key, min_s, max_s, range_params, cntx);
ZRangeGeneric(std::move(args), range_params, cntx);
}

void ZSetFamily::ZRevRangeByScore(CmdArgList args, ConnectionContext* cntx) {
ZRangeByScoreInternal(std::move(args), true, cntx);
}

void ZSetFamily::ZRevRank(CmdArgList args, ConnectionContext* cntx) {
ZRankGeneric(std::move(args), true, cntx);
}

void ZSetFamily::ZRangeByLex(CmdArgList args, ConnectionContext* cntx) {
string_view key = ArgS(args, 1);
string_view min_s = ArgS(args, 2);
string_view max_s = ArgS(args, 3);
ZRangeByLexInternal(std::move(args), false, cntx);
}
void ZSetFamily::ZRevRangeByLex(CmdArgList args, ConnectionContext* cntx) {
ZRangeByLexInternal(std::move(args), true, cntx);
}

void ZSetFamily::ZRangeByLexInternal(CmdArgList args, bool reverse, ConnectionContext* cntx) {
uint32_t offset = 0;
uint32_t count = kuint32max;

RangeParams range_params;
range_params.interval_type = RangeParams::INTERVALTYPE::LEX;
range_params.reverse = reverse;

if (args.size() > 4) {
if (args.size() != 7)
return (*cntx)->SendError(kSyntaxErr);
Expand All @@ -1251,42 +1297,18 @@ void ZSetFamily::ZRangeByLex(CmdArgList args, ConnectionContext* cntx) {
return (*cntx)->SendError(kSyntaxErr);
string_view os = ArgS(args, 5);
string_view cs = ArgS(args, 6);
if (!SimpleAtoi(os, &count) || !SimpleAtoi(cs, &count)) {
if (!SimpleAtoi(os, &offset) || !SimpleAtoi(cs, &count)) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks!

return (*cntx)->SendError(kInvalidIntErr);
}
}
range_params.offset = offset;
range_params.limit = count;

LexInterval li;
if (!ParseLexBound(min_s, &li.first) || !ParseLexBound(max_s, &li.second)) {
return (*cntx)->SendError(kLexRangeErr);
}

ZRangeSpec range_spec;
range_spec.params.offset = offset;
range_spec.params.limit = count;
range_spec.interval = li;

auto cb = [&](Transaction* t, EngineShard* shard) {
return OpRange(range_spec, t->GetOpArgs(shard), key);
};

OpResult<ScoredArray> result = cntx->transaction->ScheduleSingleHopT(std::move(cb));
OutputScoredArrayResult(result, range_spec.params, cntx);
ZRangeGeneric(args, range_params, cntx);
}

void ZSetFamily::ZRangeByScore(CmdArgList args, ConnectionContext* cntx) {
string_view key = ArgS(args, 1);
string_view min_s = ArgS(args, 2);
string_view max_s = ArgS(args, 3);

RangeParams range_params;
args.remove_prefix(4);

if (!ParseRangeByScoreParams(args, &range_params)) {
return (*cntx)->SendError(kSyntaxErr);
}

ZRangeByScoreInternal(key, min_s, max_s, range_params, cntx);
ZRangeByScoreInternal(std::move(args), false, cntx);
}

void ZSetFamily::ZRemRangeByRank(CmdArgList args, ConnectionContext* cntx) {
Expand Down Expand Up @@ -1492,23 +1514,14 @@ void ZSetFamily::ZUnionStore(CmdArgList args, ConnectionContext* cntx) {
(*cntx)->SendLong(smvec.size());
}

void ZSetFamily::ZRangeByScoreInternal(string_view key, string_view min_s, string_view max_s,
const RangeParams& params, ConnectionContext* cntx) {
ZRangeSpec range_spec;
range_spec.params = params;

ScoreInterval si;
if (!ParseBound(min_s, &si.first) || !ParseBound(max_s, &si.second)) {
return (*cntx)->SendError(kFloatRangeErr);
void ZSetFamily::ZRangeByScoreInternal(CmdArgList args, bool reverse, ConnectionContext* cntx) {
RangeParams range_params;
range_params.interval_type = RangeParams::INTERVALTYPE::SCORE;
range_params.reverse = reverse;
if (!ParseRangeByScoreParams(args.subspan(4), &range_params)) {
return (*cntx)->SendError(kSyntaxErr);
}
range_spec.interval = si;

auto cb = [&](Transaction* t, EngineShard* shard) {
return OpRange(range_spec, t->GetOpArgs(shard), key);
};

OpResult<ScoredArray> result = cntx->transaction->ScheduleSingleHopT(std::move(cb));
OutputScoredArrayResult(result, params, cntx);
ZRangeGeneric(args, range_params, cntx);
}

void ZSetFamily::OutputScoredArrayResult(const OpResult<ScoredArray>& result,
Expand Down Expand Up @@ -1545,44 +1558,42 @@ void ZSetFamily::ZRemRangeGeneric(string_view key, const ZRangeSpec& range_spec,
}
}

void ZSetFamily::ZRangeGeneric(CmdArgList args, bool reverse, ConnectionContext* cntx) {
void ZSetFamily::ZRangeGeneric(CmdArgList args, RangeParams range_params, ConnectionContext* cntx) {
string_view key = ArgS(args, 1);
string_view min_s = ArgS(args, 2);
string_view max_s = ArgS(args, 3);

bool parse_score = false;
RangeParams range_params;
range_params.reverse = reverse;

for (size_t i = 4; i < args.size(); ++i) {
ToUpper(&args[i]);
ZRangeSpec range_spec;
range_spec.params = range_params;

string_view cur_arg = ArgS(args, i);
if (!reverse && cur_arg == "BYSCORE") {
parse_score = true;
} else if (cur_arg == "WITHSCORES") {
range_params.with_scores = true;
} else {
return cntx->reply_builder()->SendError(absl::StrCat("unsupported option ", cur_arg));
switch (range_params.interval_type) {
case RangeParams::INTERVALTYPE::SCORE: {
ScoreInterval si;
if (!ParseBound(min_s, &si.first) || !ParseBound(max_s, &si.second)) {
return (*cntx)->SendError(kFloatRangeErr);
}
range_spec.interval = si;
break;
}
case RangeParams::INTERVALTYPE::LEX: {
LexInterval li;
if (!ParseLexBound(min_s, &li.first) || !ParseLexBound(max_s, &li.second)) {
return (*cntx)->SendError(kLexRangeErr);
}
range_spec.interval = li;
break;
}
case RangeParams::INTERVALTYPE::RANK: {
IndexInterval ii;
if (!SimpleAtoi(min_s, &ii.first) || !SimpleAtoi(max_s, &ii.second)) {
(*cntx)->SendError(kInvalidIntErr);
return;
}
range_spec.interval = ii;
break;
}
}

if (parse_score) {
ZRangeByScoreInternal(key, min_s, max_s, range_params, cntx);
return;
}

IndexInterval ii;

if (!SimpleAtoi(min_s, &ii.first) || !SimpleAtoi(max_s, &ii.second)) {
(*cntx)->SendError(kInvalidIntErr);
return;
}

ZRangeSpec range_spec;
range_spec.params = range_params;
range_spec.interval = ii;

auto cb = [&](Transaction* t, EngineShard* shard) {
return OpRange(range_spec, t->GetOpArgs(shard), key);
};
Expand Down Expand Up @@ -1758,7 +1769,8 @@ OpResult<double> ZSetFamily::OpScore(const OpArgs& op_args, string_view key, str
return score;
}

OpResult<ZSetFamily::MScoreResponse> ZSetFamily::OpMScore(const OpArgs& op_args, string_view key, ArgSlice members) {
OpResult<ZSetFamily::MScoreResponse> ZSetFamily::OpMScore(const OpArgs& op_args, string_view key,
ArgSlice members) {
OpResult<PrimeIterator> res_it = op_args.shard->db_slice().Find(op_args.db_cntx, key, OBJ_ZSET);
if (!res_it)
return res_it.status();
Expand All @@ -1784,7 +1796,8 @@ OpResult<ZSetFamily::MScoreResponse> ZSetFamily::OpMScore(const OpArgs& op_args,
return scores;
}

auto ZSetFamily::OpPopCount(const ZRangeSpec& range_spec, const OpArgs& op_args, string_view key) -> OpResult<ScoredArray> {
auto ZSetFamily::OpPopCount(const ZRangeSpec& range_spec, const OpArgs& op_args, string_view key)
-> OpResult<ScoredArray> {
auto& db_slice = op_args.shard->db_slice();
OpResult<PrimeIterator> res_it = db_slice.Find(op_args.db_cntx, key, OBJ_ZSET);
if (!res_it)
Expand Down Expand Up @@ -2020,6 +2033,7 @@ void ZSetFamily::Register(CommandRegistry* registry) {
<< CI{"ZREMRANGEBYSCORE", CO::WRITE, 4, 1, 1, 1}.HFUNC(ZRemRangeByScore)
<< CI{"ZREMRANGEBYLEX", CO::WRITE, 4, 1, 1, 1}.HFUNC(ZRemRangeByLex)
<< CI{"ZREVRANGE", CO::READONLY, -4, 1, 1, 1}.HFUNC(ZRevRange)
<< CI{"ZREVRANGEBYLEX", CO::READONLY, -4, 1, 1, 1}.HFUNC(ZRevRangeByLex)
<< CI{"ZREVRANGEBYSCORE", CO::READONLY, -4, 1, 1, 1}.HFUNC(ZRevRangeByScore)
<< CI{"ZREVRANK", CO::READONLY | CO::FAST, 3, 1, 1, 1}.HFUNC(ZRevRank)
<< CI{"ZSCAN", CO::READONLY, -3, 1, 1, 1}.HFUNC(ZScan)
Expand Down
Loading