Skip to content

Commit

Permalink
chore: improve the state machine of RedisParser
Browse files Browse the repository at this point in the history
1. Simplify conditions inside the main loop.
2. Improve the logic inside ConsumeBulk() function.

Signed-off-by: Roman Gershman <roman@dragonflydb.io>
  • Loading branch information
romange committed Nov 10, 2024
1 parent 1eef773 commit 4f1df14
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 34 deletions.
51 changes: 28 additions & 23 deletions src/facade/redis_parser.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,6 @@ auto RedisParser::Parse(Buffer str, uint32_t* consumed, RespExpr::Vec* res) -> R
ResultConsumed resultc{OK, 0};

do {
if (str.empty()) {
resultc.first = INPUT_PENDING;
break;
}

switch (state_) {
case MAP_LEN_S:
case ARRAY_LEN_S:
Expand All @@ -61,16 +56,21 @@ auto RedisParser::Parse(Buffer str, uint32_t* consumed, RespExpr::Vec* res) -> R
}

*consumed += resultc.second;
str.remove_prefix(exchange(resultc.second, 0));
} while (state_ != CMD_COMPLETE_S && resultc.first == OK && !str.empty());

if (resultc.first != OK) {
break;
if (state_ != CMD_COMPLETE_S) {
if (resultc.first == OK) {
resultc.first = INPUT_PENDING;
}
str.remove_prefix(exchange(resultc.second, 0));
} while (state_ != CMD_COMPLETE_S);

if (resultc.first == INPUT_PENDING) {
StashState(res);
} else if (resultc.first == OK) {
if (resultc.first == INPUT_PENDING) {
StashState(res);
}
return resultc.first;
}

if (resultc.first == OK) {
DCHECK(cached_expr_);
if (res != cached_expr_) {
DCHECK(!stash_.empty());
Expand All @@ -82,7 +82,7 @@ auto RedisParser::Parse(Buffer str, uint32_t* consumed, RespExpr::Vec* res) -> R
return resultc.first;
}

void RedisParser::InitStart(uint8_t prefix_b, RespExpr::Vec* res) {
void RedisParser::InitStart(char prefix_b, RespExpr::Vec* res) {
buf_stash_.clear();
stash_.clear();
cached_expr_ = res;
Expand Down Expand Up @@ -287,6 +287,7 @@ auto RedisParser::ConsumeArrayLen(Buffer str) -> ResultConsumed {
}

auto RedisParser::ParseArg(Buffer str) -> ResultConsumed {
DCHECK(!str.empty());
char c = str[0];

if (c == '$') {
Expand Down Expand Up @@ -389,25 +390,29 @@ auto RedisParser::ConsumeBulk(Buffer str) -> ResultConsumed {

uint32_t consumed = 0;

if (str.size() >= bulk_len_ + 2) {
if (str[bulk_len_] != '\r' || str[bulk_len_ + 1] != '\n') {
return {BAD_STRING, 0};
}

if (str.size() >= bulk_len_) {
consumed = bulk_len_;
if (bulk_len_) {
// is_broken_token_ can be false, if we just parsed the bulk length but have
// not parsed the token itself.
if (is_broken_token_) {
memcpy(bulk_str.end(), str.data(), bulk_len_);
bulk_str = Buffer{bulk_str.data(), bulk_str.size() + bulk_len_};
} else {
bulk_str = str.subspan(0, bulk_len_);
}
str.remove_prefix(exchange(bulk_len_, 0));
is_broken_token_ = false;
}
is_broken_token_ = false;
consumed = bulk_len_ + 2;
bulk_len_ = 0;
HandleFinishArg();

return {OK, consumed};
if (str.size() >= 2) {
if (str[0] != '\r' || str[1] != '\n') {
return {BAD_STRING, consumed};
}
HandleFinishArg();
return {OK, consumed + 2};
}
return {INPUT_PENDING, consumed};
}

if (str.size() >= 32) {
Expand Down
5 changes: 2 additions & 3 deletions src/facade/redis_parser.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,15 +74,14 @@ class RedisParser {
private:
using ResultConsumed = std::pair<Result, uint32_t>;

void InitStart(uint8_t prefix_b, RespVec* res);
void InitStart(char prefix_b, RespVec* res);
void StashState(RespVec* res);

// Skips the first character (*).
ResultConsumed ConsumeArrayLen(Buffer str);
ResultConsumed ParseArg(Buffer str);
ResultConsumed ConsumeBulk(Buffer str);
ResultConsumed ParseInline(Buffer str);

ResultConsumed ParseLen(Buffer str, int64_t* res);

void HandleFinishArg();
Expand All @@ -98,7 +97,7 @@ class RedisParser {
};

State state_ = CMD_COMPLETE_S;
bool is_broken_token_ = false; // whether the last inline string was broken in the middle.
bool is_broken_token_ = false; // true, if a token (inline or bulk) is broken during the parsing.
bool server_mode_ = true;

uint32_t bulk_len_ = 0;
Expand Down
17 changes: 9 additions & 8 deletions src/facade/redis_parser_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -110,16 +110,16 @@ TEST_F(RedisParserTest, Multi2) {
EXPECT_EQ(4, consumed_);

ASSERT_EQ(RedisParser::INPUT_PENDING, Parse("$4\r\nMSET"));
EXPECT_EQ(4, consumed_);
EXPECT_EQ(8, consumed_);

ASSERT_EQ(RedisParser::OK, Parse("MSET\r\n*2\r\n"));
EXPECT_EQ(6, consumed_);
ASSERT_EQ(RedisParser::OK, Parse("\r\n*2\r\n"));
EXPECT_EQ(2, consumed_);

ASSERT_EQ(RedisParser::INPUT_PENDING, Parse("*2\r\n$3\r\nKEY\r\n$3\r\nVAL"));
EXPECT_EQ(17, consumed_);
EXPECT_EQ(20, consumed_);

ASSERT_EQ(RedisParser::OK, Parse("VAL\r\n"));
EXPECT_EQ(5, consumed_);
ASSERT_EQ(RedisParser::OK, Parse("\r\n"));
EXPECT_EQ(2, consumed_);
EXPECT_THAT(args_, ElementsAre("KEY", "VAL"));
}

Expand All @@ -129,8 +129,9 @@ TEST_F(RedisParserTest, Multi3) {
ASSERT_EQ(RedisParser::INPUT_PENDING, Parse(kFirst));
ASSERT_EQ(strlen(kFirst) - 4, consumed_);
ASSERT_EQ(RedisParser::INPUT_PENDING, Parse(kSecond));
ASSERT_EQ(strlen(kSecond) - 3, consumed_);
ASSERT_EQ(RedisParser::OK, Parse("VXK\r\n*3\r\n$3\r\nSET"));
ASSERT_EQ(strlen(kSecond), consumed_);
ASSERT_EQ(RedisParser::OK, Parse("\r\n*3\r\n$3\r\nSET"));
ASSERT_EQ(2, consumed_);
EXPECT_THAT(args_, ElementsAre("SET", "key:000002273458", "VXK"));
}

Expand Down

0 comments on commit 4f1df14

Please sign in to comment.