Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: improve the state machine of RedisParser #4085

Merged
merged 1 commit into from
Nov 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 28 additions & 23 deletions src/facade/redis_parser.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,6 @@ auto RedisParser::Parse(Buffer str, uint32_t* consumed, RespExpr::Vec* res) -> R
ResultConsumed resultc{OK, 0};

do {
if (str.empty()) {
resultc.first = INPUT_PENDING;
break;
}

switch (state_) {
case MAP_LEN_S:
case ARRAY_LEN_S:
Expand All @@ -61,16 +56,21 @@ auto RedisParser::Parse(Buffer str, uint32_t* consumed, RespExpr::Vec* res) -> R
}

*consumed += resultc.second;
str.remove_prefix(exchange(resultc.second, 0));
} while (state_ != CMD_COMPLETE_S && resultc.first == OK && !str.empty());

if (resultc.first != OK) {
break;
if (state_ != CMD_COMPLETE_S) {
if (resultc.first == OK) {
resultc.first = INPUT_PENDING;
}
str.remove_prefix(exchange(resultc.second, 0));
} while (state_ != CMD_COMPLETE_S);

if (resultc.first == INPUT_PENDING) {
StashState(res);
} else if (resultc.first == OK) {
if (resultc.first == INPUT_PENDING) {
StashState(res);
}
return resultc.first;
}

if (resultc.first == OK) {
DCHECK(cached_expr_);
if (res != cached_expr_) {
DCHECK(!stash_.empty());
Expand All @@ -82,7 +82,7 @@ auto RedisParser::Parse(Buffer str, uint32_t* consumed, RespExpr::Vec* res) -> R
return resultc.first;
}

void RedisParser::InitStart(uint8_t prefix_b, RespExpr::Vec* res) {
void RedisParser::InitStart(char prefix_b, RespExpr::Vec* res) {
buf_stash_.clear();
stash_.clear();
cached_expr_ = res;
Expand Down Expand Up @@ -287,6 +287,7 @@ auto RedisParser::ConsumeArrayLen(Buffer str) -> ResultConsumed {
}

auto RedisParser::ParseArg(Buffer str) -> ResultConsumed {
DCHECK(!str.empty());
char c = str[0];

if (c == '$') {
Expand Down Expand Up @@ -389,25 +390,29 @@ auto RedisParser::ConsumeBulk(Buffer str) -> ResultConsumed {

uint32_t consumed = 0;

if (str.size() >= bulk_len_ + 2) {
if (str[bulk_len_] != '\r' || str[bulk_len_ + 1] != '\n') {
return {BAD_STRING, 0};
}

if (str.size() >= bulk_len_) {
consumed = bulk_len_;
if (bulk_len_) {
// is_broken_token_ can be false, if we just parsed the bulk length but have
// not parsed the token itself.
if (is_broken_token_) {
memcpy(bulk_str.end(), str.data(), bulk_len_);
bulk_str = Buffer{bulk_str.data(), bulk_str.size() + bulk_len_};
} else {
bulk_str = str.subspan(0, bulk_len_);
}
str.remove_prefix(exchange(bulk_len_, 0));
is_broken_token_ = false;
}
is_broken_token_ = false;
consumed = bulk_len_ + 2;
bulk_len_ = 0;
HandleFinishArg();

return {OK, consumed};
if (str.size() >= 2) {
if (str[0] != '\r' || str[1] != '\n') {
return {BAD_STRING, consumed};
}
HandleFinishArg();
return {OK, consumed + 2};
}
return {INPUT_PENDING, consumed};
}

if (str.size() >= 32) {
Expand Down
5 changes: 2 additions & 3 deletions src/facade/redis_parser.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,15 +74,14 @@ class RedisParser {
private:
using ResultConsumed = std::pair<Result, uint32_t>;

void InitStart(uint8_t prefix_b, RespVec* res);
void InitStart(char prefix_b, RespVec* res);
void StashState(RespVec* res);

// Skips the first character (*).
ResultConsumed ConsumeArrayLen(Buffer str);
ResultConsumed ParseArg(Buffer str);
ResultConsumed ConsumeBulk(Buffer str);
ResultConsumed ParseInline(Buffer str);

ResultConsumed ParseLen(Buffer str, int64_t* res);

void HandleFinishArg();
Expand All @@ -98,7 +97,7 @@ class RedisParser {
};

State state_ = CMD_COMPLETE_S;
bool is_broken_token_ = false; // whether the last inline string was broken in the middle.
bool is_broken_token_ = false; // true, if a token (inline or bulk) is broken during the parsing.
bool server_mode_ = true;

uint32_t bulk_len_ = 0;
Expand Down
17 changes: 9 additions & 8 deletions src/facade/redis_parser_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -110,16 +110,16 @@ TEST_F(RedisParserTest, Multi2) {
EXPECT_EQ(4, consumed_);

ASSERT_EQ(RedisParser::INPUT_PENDING, Parse("$4\r\nMSET"));
EXPECT_EQ(4, consumed_);
EXPECT_EQ(8, consumed_);
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

now we consume the bulk string token even if we have not parsed '\r\n' fully.


ASSERT_EQ(RedisParser::OK, Parse("MSET\r\n*2\r\n"));
EXPECT_EQ(6, consumed_);
ASSERT_EQ(RedisParser::OK, Parse("\r\n*2\r\n"));
EXPECT_EQ(2, consumed_);

ASSERT_EQ(RedisParser::INPUT_PENDING, Parse("*2\r\n$3\r\nKEY\r\n$3\r\nVAL"));
EXPECT_EQ(17, consumed_);
EXPECT_EQ(20, consumed_);

ASSERT_EQ(RedisParser::OK, Parse("VAL\r\n"));
EXPECT_EQ(5, consumed_);
ASSERT_EQ(RedisParser::OK, Parse("\r\n"));
EXPECT_EQ(2, consumed_);
EXPECT_THAT(args_, ElementsAre("KEY", "VAL"));
}

Expand All @@ -129,8 +129,9 @@ TEST_F(RedisParserTest, Multi3) {
ASSERT_EQ(RedisParser::INPUT_PENDING, Parse(kFirst));
ASSERT_EQ(strlen(kFirst) - 4, consumed_);
ASSERT_EQ(RedisParser::INPUT_PENDING, Parse(kSecond));
ASSERT_EQ(strlen(kSecond) - 3, consumed_);
ASSERT_EQ(RedisParser::OK, Parse("VXK\r\n*3\r\n$3\r\nSET"));
ASSERT_EQ(strlen(kSecond), consumed_);
ASSERT_EQ(RedisParser::OK, Parse("\r\n*3\r\n$3\r\nSET"));
ASSERT_EQ(2, consumed_);
EXPECT_THAT(args_, ElementsAre("SET", "key:000002273458", "VXK"));
}

Expand Down
Loading