Skip to content

Commit

Permalink
chore: improve the state machine of RedisParser (#4085)
Browse files Browse the repository at this point in the history
1. Simplify conditions inside the main loop.
2. Improve the logic inside ConsumeBulk() function.

Signed-off-by: Roman Gershman <roman@dragonflydb.io>
  • Loading branch information
romange authored Nov 15, 2024
1 parent c46cb25 commit 2ff6bf3
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 34 deletions.
51 changes: 28 additions & 23 deletions src/facade/redis_parser.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,6 @@ auto RedisParser::Parse(Buffer str, uint32_t* consumed, RespExpr::Vec* res) -> R
ResultConsumed resultc{OK, 0};

do {
if (str.empty()) {
resultc.first = INPUT_PENDING;
break;
}

switch (state_) {
case MAP_LEN_S:
case ARRAY_LEN_S:
Expand All @@ -61,16 +56,21 @@ auto RedisParser::Parse(Buffer str, uint32_t* consumed, RespExpr::Vec* res) -> R
}

*consumed += resultc.second;
str.remove_prefix(exchange(resultc.second, 0));
} while (state_ != CMD_COMPLETE_S && resultc.first == OK && !str.empty());

if (resultc.first != OK) {
break;
if (state_ != CMD_COMPLETE_S) {
if (resultc.first == OK) {
resultc.first = INPUT_PENDING;
}
str.remove_prefix(exchange(resultc.second, 0));
} while (state_ != CMD_COMPLETE_S);

if (resultc.first == INPUT_PENDING) {
StashState(res);
} else if (resultc.first == OK) {
if (resultc.first == INPUT_PENDING) {
StashState(res);
}
return resultc.first;
}

if (resultc.first == OK) {
DCHECK(cached_expr_);
if (res != cached_expr_) {
DCHECK(!stash_.empty());
Expand All @@ -82,7 +82,7 @@ auto RedisParser::Parse(Buffer str, uint32_t* consumed, RespExpr::Vec* res) -> R
return resultc.first;
}

void RedisParser::InitStart(uint8_t prefix_b, RespExpr::Vec* res) {
void RedisParser::InitStart(char prefix_b, RespExpr::Vec* res) {
buf_stash_.clear();
stash_.clear();
cached_expr_ = res;
Expand Down Expand Up @@ -287,6 +287,7 @@ auto RedisParser::ConsumeArrayLen(Buffer str) -> ResultConsumed {
}

auto RedisParser::ParseArg(Buffer str) -> ResultConsumed {
DCHECK(!str.empty());
char c = str[0];

if (c == '$') {
Expand Down Expand Up @@ -389,25 +390,29 @@ auto RedisParser::ConsumeBulk(Buffer str) -> ResultConsumed {

uint32_t consumed = 0;

if (str.size() >= bulk_len_ + 2) {
if (str[bulk_len_] != '\r' || str[bulk_len_ + 1] != '\n') {
return {BAD_STRING, 0};
}

if (str.size() >= bulk_len_) {
consumed = bulk_len_;
if (bulk_len_) {
// is_broken_token_ can be false, if we just parsed the bulk length but have
// not parsed the token itself.
if (is_broken_token_) {
memcpy(bulk_str.end(), str.data(), bulk_len_);
bulk_str = Buffer{bulk_str.data(), bulk_str.size() + bulk_len_};
} else {
bulk_str = str.subspan(0, bulk_len_);
}
str.remove_prefix(exchange(bulk_len_, 0));
is_broken_token_ = false;
}
is_broken_token_ = false;
consumed = bulk_len_ + 2;
bulk_len_ = 0;
HandleFinishArg();

return {OK, consumed};
if (str.size() >= 2) {
if (str[0] != '\r' || str[1] != '\n') {
return {BAD_STRING, consumed};
}
HandleFinishArg();
return {OK, consumed + 2};
}
return {INPUT_PENDING, consumed};
}

if (str.size() >= 32) {
Expand Down
5 changes: 2 additions & 3 deletions src/facade/redis_parser.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,15 +74,14 @@ class RedisParser {
private:
using ResultConsumed = std::pair<Result, uint32_t>;

void InitStart(uint8_t prefix_b, RespVec* res);
void InitStart(char prefix_b, RespVec* res);
void StashState(RespVec* res);

// Skips the first character (*).
ResultConsumed ConsumeArrayLen(Buffer str);
ResultConsumed ParseArg(Buffer str);
ResultConsumed ConsumeBulk(Buffer str);
ResultConsumed ParseInline(Buffer str);

ResultConsumed ParseLen(Buffer str, int64_t* res);

void HandleFinishArg();
Expand All @@ -98,7 +97,7 @@ class RedisParser {
};

State state_ = CMD_COMPLETE_S;
bool is_broken_token_ = false; // whether the last inline string was broken in the middle.
bool is_broken_token_ = false; // true, if a token (inline or bulk) is broken during the parsing.
bool server_mode_ = true;

uint32_t bulk_len_ = 0;
Expand Down
17 changes: 9 additions & 8 deletions src/facade/redis_parser_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -110,16 +110,16 @@ TEST_F(RedisParserTest, Multi2) {
EXPECT_EQ(4, consumed_);

ASSERT_EQ(RedisParser::INPUT_PENDING, Parse("$4\r\nMSET"));
EXPECT_EQ(4, consumed_);
EXPECT_EQ(8, consumed_);

ASSERT_EQ(RedisParser::OK, Parse("MSET\r\n*2\r\n"));
EXPECT_EQ(6, consumed_);
ASSERT_EQ(RedisParser::OK, Parse("\r\n*2\r\n"));
EXPECT_EQ(2, consumed_);

ASSERT_EQ(RedisParser::INPUT_PENDING, Parse("*2\r\n$3\r\nKEY\r\n$3\r\nVAL"));
EXPECT_EQ(17, consumed_);
EXPECT_EQ(20, consumed_);

ASSERT_EQ(RedisParser::OK, Parse("VAL\r\n"));
EXPECT_EQ(5, consumed_);
ASSERT_EQ(RedisParser::OK, Parse("\r\n"));
EXPECT_EQ(2, consumed_);
EXPECT_THAT(args_, ElementsAre("KEY", "VAL"));
}

Expand All @@ -129,8 +129,9 @@ TEST_F(RedisParserTest, Multi3) {
ASSERT_EQ(RedisParser::INPUT_PENDING, Parse(kFirst));
ASSERT_EQ(strlen(kFirst) - 4, consumed_);
ASSERT_EQ(RedisParser::INPUT_PENDING, Parse(kSecond));
ASSERT_EQ(strlen(kSecond) - 3, consumed_);
ASSERT_EQ(RedisParser::OK, Parse("VXK\r\n*3\r\n$3\r\nSET"));
ASSERT_EQ(strlen(kSecond), consumed_);
ASSERT_EQ(RedisParser::OK, Parse("\r\n*3\r\n$3\r\nSET"));
ASSERT_EQ(2, consumed_);
EXPECT_THAT(args_, ElementsAre("SET", "key:000002273458", "VXK"));
}

Expand Down

0 comments on commit 2ff6bf3

Please sign in to comment.