From 5e99b276e5983acc142a7606fd62445b4914b41a Mon Sep 17 00:00:00 2001 From: Twice Date: Wed, 29 May 2024 21:17:50 +0900 Subject: [PATCH] Improve RESP handling code in replication (#2334) --- src/cluster/replication.cc | 33 +++++++++++++++------------ src/cluster/replication.h | 6 ++--- src/commands/cmd_replication.cc | 5 ++-- src/commands/cmd_server.cc | 4 +++- src/commands/error_constants.h | 2 ++ src/common/event_util.h | 3 +++ src/server/redis_connection.cc | 8 ++++--- src/server/redis_reply.cc | 4 ++-- src/server/redis_reply.h | 4 +++- tests/gocase/unit/hello/hello_test.go | 4 ++-- 10 files changed, 44 insertions(+), 29 deletions(-) diff --git a/src/cluster/replication.cc b/src/cluster/replication.cc index ea9c5587376..a3567284091 100644 --- a/src/cluster/replication.cc +++ b/src/cluster/replication.cc @@ -33,6 +33,7 @@ #include #include +#include "commands/error_constants.h" #include "event_util.h" #include "fmt/format.h" #include "io_util.h" @@ -402,13 +403,13 @@ ReplicationThread::CBState ReplicationThread::authWriteCB(bufferevent *bev) { return CBState::NEXT; } -inline bool ResponseLineIsOK(const char *line) { return strncmp(line, "+OK", 3) == 0; } +inline bool ResponseLineIsOK(std::string_view line) { return line == RESP_PREFIX_SIMPLE_STRING "OK"; } ReplicationThread::CBState ReplicationThread::authReadCB(bufferevent *bev) { // NOLINT auto input = bufferevent_get_input(bev); UniqueEvbufReadln line(input, EVBUFFER_EOL_CRLF_STRICT); if (!line) return CBState::AGAIN; - if (!ResponseLineIsOK(line.get())) { + if (!ResponseLineIsOK(line.View())) { // Auth failed LOG(ERROR) << "[replication] Auth failed: " << line.get(); return CBState::RESTART; @@ -430,7 +431,7 @@ ReplicationThread::CBState ReplicationThread::checkDBNameReadCB(bufferevent *bev if (!line) return CBState::AGAIN; if (line[0] == '-') { - if (isRestoringError(line.get())) { + if (isRestoringError(line.View())) { LOG(WARNING) << "The master was restoring the db, retry later"; } else { LOG(ERROR) << "Failed to get the db name, " << line.get(); @@ -468,18 +469,18 @@ ReplicationThread::CBState ReplicationThread::replConfReadCB(bufferevent *bev) { if (!line) return CBState::AGAIN; // on unknown option: first try without announce ip, if it fails again - do nothing (to prevent infinite loop) - if (isUnknownOption(line.get()) && !next_try_without_announce_ip_address_) { + if (isUnknownOption(line.View()) && !next_try_without_announce_ip_address_) { next_try_without_announce_ip_address_ = true; LOG(WARNING) << "The old version master, can't handle ip-address, " << "try without it again"; // Retry previous state, i.e. send replconf again return CBState::PREV; } - if (line[0] == '-' && isRestoringError(line.get())) { + if (line[0] == '-' && isRestoringError(line.View())) { LOG(WARNING) << "The master was restoring the db, retry later"; return CBState::RESTART; } - if (!ResponseLineIsOK(line.get())) { + if (!ResponseLineIsOK(line.View())) { LOG(WARNING) << "[replication] Failed to replconf: " << line.get() + 1; // backward compatible with old version that doesn't support replconf cmd return CBState::NEXT; @@ -530,12 +531,12 @@ ReplicationThread::CBState ReplicationThread::tryPSyncReadCB(bufferevent *bev) { UniqueEvbufReadln line(input, EVBUFFER_EOL_CRLF_STRICT); if (!line) return CBState::AGAIN; - if (line[0] == '-' && isRestoringError(line.get())) { + if (line[0] == '-' && isRestoringError(line.View())) { LOG(WARNING) << "The master was restoring the db, retry later"; return CBState::RESTART; } - if (line[0] == '-' && isWrongPsyncNum(line.get())) { + if (line[0] == '-' && isWrongPsyncNum(line.View())) { next_try_old_psync_ = true; LOG(WARNING) << "The old version master, can't handle new PSYNC, " << "try old PSYNC again"; @@ -543,7 +544,7 @@ ReplicationThread::CBState ReplicationThread::tryPSyncReadCB(bufferevent *bev) { return CBState::PREV; } - if (!ResponseLineIsOK(line.get())) { + if (!ResponseLineIsOK(line.View())) { // PSYNC isn't OK, we should use FullSync // Switch to fullsync state machine fullsync_steps_.Start(); @@ -844,7 +845,7 @@ Status ReplicationThread::sendAuth(int sock_fd, ssl_st *ssl) { } UniqueEvbufReadln line(evbuf.get(), EVBUFFER_EOL_CRLF_STRICT); if (!line) continue; - if (!ResponseLineIsOK(line.get())) { + if (!ResponseLineIsOK(line.View())) { return {Status::NotOK, "auth got invalid response"}; } break; @@ -998,15 +999,17 @@ Status ReplicationThread::parseWriteBatch(const std::string &batch_string) { return Status::OK(); } -bool ReplicationThread::isRestoringError(const char *err) { - return std::string(err) == "-ERR restoring the db from backup"; +bool ReplicationThread::isRestoringError(std::string_view err) { + return err == std::string(RESP_PREFIX_ERROR) + redis::errRestoringBackup; } -bool ReplicationThread::isWrongPsyncNum(const char *err) { - return std::string(err) == "-ERR wrong number of arguments"; +bool ReplicationThread::isWrongPsyncNum(std::string_view err) { + return err == std::string(RESP_PREFIX_ERROR) + redis::errWrongNumArguments; } -bool ReplicationThread::isUnknownOption(const char *err) { return std::string(err) == "-ERR unknown option"; } +bool ReplicationThread::isUnknownOption(std::string_view err) { + return err == fmt::format("{}ERR {}", RESP_PREFIX_ERROR, redis::errUnknownOption); +} rocksdb::Status WriteBatchHandler::PutCF(uint32_t column_family_id, const rocksdb::Slice &key, const rocksdb::Slice &value) { diff --git a/src/cluster/replication.h b/src/cluster/replication.h index b223bd6a0e5..8da25713920 100644 --- a/src/cluster/replication.h +++ b/src/cluster/replication.h @@ -204,9 +204,9 @@ class ReplicationThread : private EventCallbackBase { Status fetchFiles(int sock_fd, const std::string &dir, const std::vector &files, const std::vector &crcs, const FetchFileCallback &fn, ssl_st *ssl); Status parallelFetchFile(const std::string &dir, const std::vector> &files); - static bool isRestoringError(const char *err); - static bool isWrongPsyncNum(const char *err); - static bool isUnknownOption(const char *err); + static bool isRestoringError(std::string_view err); + static bool isWrongPsyncNum(std::string_view err); + static bool isUnknownOption(std::string_view err); Status parseWriteBatch(const std::string &batch_string); }; diff --git a/src/commands/cmd_replication.cc b/src/commands/cmd_replication.cc index 5c46e23d278..5ec6faa03b6 100644 --- a/src/commands/cmd_replication.cc +++ b/src/commands/cmd_replication.cc @@ -22,6 +22,7 @@ #include "error_constants.h" #include "io_util.h" #include "scope_exit.h" +#include "server/redis_reply.h" #include "server/server.h" #include "thread_util.h" #include "time_util.h" @@ -101,7 +102,7 @@ class CommandPSync : public Commander { srv->stats.IncrPSyncOKCount(); s = srv->AddSlave(conn, next_repl_seq_); if (!s.IsOK()) { - std::string err = "-ERR " + s.Msg() + "\r\n"; + std::string err = redis::Error(s.Msg()); s = util::SockSend(conn->GetFD(), err, conn->GetBufferEvent()); if (!s.IsOK()) { LOG(WARNING) << "failed to send error message to the replica: " << s.Msg(); @@ -229,7 +230,7 @@ class CommandFetchMeta : public Commander { std::string files; auto s = engine::Storage::ReplDataManager::GetFullReplDataInfo(srv->storage, &files); if (!s.IsOK()) { - s = util::SockSend(repl_fd, "-ERR can't create db checkpoint", bev); + s = util::SockSend(repl_fd, redis::Error("can't create db checkpoint"), bev); if (!s.IsOK()) { LOG(WARNING) << "[replication] Failed to send error response: " << s.Msg(); } diff --git a/src/commands/cmd_server.cc b/src/commands/cmd_server.cc index 28c31603c80..7362e298fc7 100644 --- a/src/commands/cmd_server.cc +++ b/src/commands/cmd_server.cc @@ -29,6 +29,7 @@ #include "config/config.h" #include "error_constants.h" #include "server/redis_connection.h" +#include "server/redis_reply.h" #include "server/server.h" #include "stats/disk_stats.h" #include "storage/rdb.h" @@ -740,7 +741,8 @@ class CommandHello final : public Commander { // kvrocks only supports REPL2 by now, but for supporting some // `hello 3`, it will not report error when using 3. if (protocol < 2 || protocol > 3) { - return {Status::NotOK, "-NOPROTO unsupported protocol version"}; + conn->Reply(redis::Error("NOPROTO unsupported protocol version")); + return Status::OK(); } } diff --git a/src/commands/error_constants.h b/src/commands/error_constants.h index 43c7440da09..2074a705531 100644 --- a/src/commands/error_constants.h +++ b/src/commands/error_constants.h @@ -43,5 +43,7 @@ inline constexpr const char *errValueIsNotFloat = "value is not a valid float"; inline constexpr const char *errNoMatchingScript = "NOSCRIPT No matching script. Please use EVAL"; inline constexpr const char *errUnknownOption = "unknown option"; inline constexpr const char *errUnknownSubcommandOrWrongArguments = "Unknown subcommand or wrong number of arguments"; +inline constexpr const char *errWrongNumArguments = "ERR wrong number of arguments"; +inline constexpr const char *errRestoringBackup = "LOADING kvrocks is restoring the db from backup"; } // namespace redis diff --git a/src/common/event_util.h b/src/common/event_util.h index fccdfd5336b..f3c83012430 100644 --- a/src/common/event_util.h +++ b/src/common/event_util.h @@ -22,6 +22,7 @@ #include #include +#include #include #include "event2/buffer.h" @@ -44,6 +45,8 @@ struct UniqueEvbufReadln : UniqueFreePtr { : UniqueFreePtr(evbuffer_readln(buffer, &length, eol_style)) {} size_t length; + + std::string_view View() { return {get(), length}; } }; using StaticEvbufFree = StaticFunction; diff --git a/src/server/redis_connection.cc b/src/server/redis_connection.cc index 4a84f0a7dc7..536c4d2067b 100644 --- a/src/server/redis_connection.cc +++ b/src/server/redis_connection.cc @@ -26,7 +26,9 @@ #include #include "commands/commander.h" +#include "commands/error_constants.h" #include "fmt/format.h" +#include "server/redis_reply.h" #include "string_util.h" #ifdef ENABLE_OPENSSL #include @@ -138,7 +140,7 @@ std::string Connection::Bool(bool b) const { } std::string Connection::MultiBulkString(const std::vector &values) const { - std::string result = "*" + std::to_string(values.size()) + CRLF; + std::string result = MultiLen(values.size()); for (const auto &value : values) { if (value.empty()) { result += NilString(); @@ -151,7 +153,7 @@ std::string Connection::MultiBulkString(const std::vector &values) std::string Connection::MultiBulkString(const std::vector &values, const std::vector &statuses) const { - std::string result = "*" + std::to_string(values.size()) + CRLF; + std::string result = MultiLen(values.size()); for (size_t i = 0; i < values.size(); i++) { if (i < statuses.size() && !statuses[i].ok()) { result += NilString(); @@ -470,7 +472,7 @@ void Connection::ExecuteCommands(std::deque *to_process_cmds) { } if (srv_->IsLoading() && !(cmd_flags & kCmdLoading)) { - Reply(redis::Error("LOADING kvrocks is restoring the db from backup")); + Reply(redis::Error(errRestoringBackup)); if (is_multi_exec) multi_error_ = true; continue; } diff --git a/src/server/redis_reply.cc b/src/server/redis_reply.cc index 95bbf9fde03..d2143b6a787 100644 --- a/src/server/redis_reply.cc +++ b/src/server/redis_reply.cc @@ -34,7 +34,7 @@ std::string BulkString(const std::string &data) { return "$" + std::to_string(da std::string Array(const std::vector &list) { size_t n = std::accumulate(list.begin(), list.end(), 0, [](size_t n, const std::string &s) { return n + s.size(); }); - std::string result = "*" + std::to_string(list.size()) + CRLF; + std::string result = MultiLen(list.size()); std::string::size_type final_size = result.size() + n; result.reserve(final_size); for (const auto &i : list) result += i; @@ -42,7 +42,7 @@ std::string Array(const std::vector &list) { } std::string ArrayOfBulkStrings(const std::vector &elems) { - std::string result = "*" + std::to_string(elems.size()) + CRLF; + std::string result = MultiLen(elems.size()); for (const auto &elem : elems) { result += BulkString(elem); } diff --git a/src/server/redis_reply.h b/src/server/redis_reply.h index 213a8bc0849..c7fd3b2a8e2 100644 --- a/src/server/redis_reply.h +++ b/src/server/redis_reply.h @@ -25,7 +25,9 @@ #include #include -#define CRLF "\r\n" // NOLINT +#define CRLF "\r\n" // NOLINT +#define RESP_PREFIX_ERROR "-" // NOLINT +#define RESP_PREFIX_SIMPLE_STRING "+" // NOLINT namespace redis { diff --git a/tests/gocase/unit/hello/hello_test.go b/tests/gocase/unit/hello/hello_test.go index 24f8baf36e6..5b6eea35d01 100644 --- a/tests/gocase/unit/hello/hello_test.go +++ b/tests/gocase/unit/hello/hello_test.go @@ -38,7 +38,7 @@ func TestHello(t *testing.T) { t.Run("hello with wrong protocol", func(t *testing.T) { r := rdb.Do(ctx, "HELLO", "1") - require.ErrorContains(t, r.Err(), "-NOPROTO unsupported protocol version") + require.ErrorContains(t, r.Err(), "NOPROTO unsupported protocol version") }) t.Run("hello with protocol 2", func(t *testing.T) { @@ -61,7 +61,7 @@ func TestHello(t *testing.T) { t.Run("hello with wrong protocol", func(t *testing.T) { r := rdb.Do(ctx, "HELLO", "5") - require.ErrorContains(t, r.Err(), "-NOPROTO unsupported protocol version") + require.ErrorContains(t, r.Err(), "NOPROTO unsupported protocol version") }) t.Run("hello with non protocol", func(t *testing.T) {