From cc4f63d67afa0636eeb344b648a17a5ecd5d1c49 Mon Sep 17 00:00:00 2001 From: Twice Date: Sat, 16 Nov 2024 14:22:40 +0800 Subject: [PATCH 1/5] feat(cmd): all blocking commands should be no-script (#2666) --- .gitignore | 1 + src/commands/cmd_list.cc | 7 +++---- src/commands/cmd_server.cc | 2 +- src/commands/commander.h | 8 ++++++-- 4 files changed, 11 insertions(+), 7 deletions(-) diff --git a/.gitignore b/.gitignore index c9a9027a069..cd2fd010b28 100644 --- a/.gitignore +++ b/.gitignore @@ -46,3 +46,4 @@ testdb build cmake-build-* +build-* diff --git a/src/commands/cmd_list.cc b/src/commands/cmd_list.cc index 1a9f5d03dfc..6b97eb1557a 100644 --- a/src/commands/cmd_list.cc +++ b/src/commands/cmd_list.cc @@ -894,10 +894,9 @@ class CommandLPos : public Commander { PosSpec spec_; }; -REDIS_REGISTER_COMMANDS(List, MakeCmdAttr("blpop", -3, "write no-script blocking", 1, -2, 1), - MakeCmdAttr("brpop", -3, "write no-script blocking", 1, -2, 1), - MakeCmdAttr("blmpop", -5, "write no-script blocking", - CommandBLMPop::keyRangeGen), +REDIS_REGISTER_COMMANDS(List, MakeCmdAttr("blpop", -3, "write blocking", 1, -2, 1), + MakeCmdAttr("brpop", -3, "write blocking", 1, -2, 1), + MakeCmdAttr("blmpop", -5, "write blocking", CommandBLMPop::keyRangeGen), MakeCmdAttr("lindex", 3, "read-only", 1, 1, 1), MakeCmdAttr("linsert", 5, "write slow", 1, 1, 1), MakeCmdAttr("llen", 2, "read-only", 1, 1, 1), diff --git a/src/commands/cmd_server.cc b/src/commands/cmd_server.cc index e2938267710..ef424a14c81 100644 --- a/src/commands/cmd_server.cc +++ b/src/commands/cmd_server.cc @@ -1343,7 +1343,7 @@ REDIS_REGISTER_COMMANDS(Server, MakeCmdAttr("auth", 2, "read-only o MakeCmdAttr("slowlog", -2, "read-only", NO_KEY), MakeCmdAttr("perflog", -2, "read-only", NO_KEY), MakeCmdAttr("client", -2, "read-only", NO_KEY), - MakeCmdAttr("monitor", 1, "read-only no-multi", NO_KEY), + MakeCmdAttr("monitor", 1, "read-only no-multi no-script", NO_KEY), MakeCmdAttr("shutdown", 1, "read-only no-multi no-script", NO_KEY), MakeCmdAttr("quit", 1, "read-only", NO_KEY), MakeCmdAttr("scan", -2, "read-only", NO_KEY), diff --git a/src/commands/commander.h b/src/commands/commander.h index ac3d3aa9939..bf172a79408 100644 --- a/src/commands/commander.h +++ b/src/commands/commander.h @@ -328,9 +328,13 @@ inline uint64_t ParseCommandFlags(const std::string &description, const std::str flags |= kCmdNoDBSizeCheck; else if (flag == "slow") flags |= kCmdSlow; - else if (flag == "blocking") + else if (flag == "blocking") { flags |= kCmdBlocking; - else { + + // blocking commands should always be no-script + // TODO: we can relax this restriction if scripting becomes non-exclusive + flags |= kCmdNoScript; + } else { std::cout << fmt::format("Encountered non-existent flag '{}' in command {} in command attribute parsing", flag, cmd_name) << std::endl; From eb4de5c63129cda2f9a0365eee5118094889d013 Mon Sep 17 00:00:00 2001 From: Twice Date: Sat, 16 Nov 2024 17:14:37 +0800 Subject: [PATCH 2/5] feat(cmd): add the auth flag for authentication checking (#2669) --- src/commands/cmd_server.cc | 4 ++-- src/commands/commander.h | 4 ++++ src/server/redis_connection.cc | 2 +- 3 files changed, 7 insertions(+), 3 deletions(-) diff --git a/src/commands/cmd_server.cc b/src/commands/cmd_server.cc index ef424a14c81..2cc56919647 100644 --- a/src/commands/cmd_server.cc +++ b/src/commands/cmd_server.cc @@ -1329,7 +1329,7 @@ class CommandPollUpdates : public Commander { Format format_ = Format::Raw; }; -REDIS_REGISTER_COMMANDS(Server, MakeCmdAttr("auth", 2, "read-only ok-loading", NO_KEY), +REDIS_REGISTER_COMMANDS(Server, MakeCmdAttr("auth", 2, "read-only ok-loading auth", NO_KEY), MakeCmdAttr("ping", -1, "read-only", NO_KEY), MakeCmdAttr("select", 2, "read-only", NO_KEY), MakeCmdAttr("info", -1, "read-only ok-loading", NO_KEY), @@ -1354,7 +1354,7 @@ REDIS_REGISTER_COMMANDS(Server, MakeCmdAttr("auth", 2, "read-only o MakeCmdAttr("time", 1, "read-only ok-loading", NO_KEY), MakeCmdAttr("disk", 3, "read-only", 2, 2, 1), MakeCmdAttr("memory", 3, "read-only", 2, 2, 1), - MakeCmdAttr("hello", -1, "read-only ok-loading", NO_KEY), + MakeCmdAttr("hello", -1, "read-only ok-loading auth", NO_KEY), MakeCmdAttr("restore", -4, "write", 1, 1, 1), MakeCmdAttr("compact", 1, "read-only no-script", NO_KEY), diff --git a/src/commands/commander.h b/src/commands/commander.h index bf172a79408..b2a6ae3d330 100644 --- a/src/commands/commander.h +++ b/src/commands/commander.h @@ -80,6 +80,8 @@ enum CommandFlags : uint64_t { // "blocking" flag, for commands that don't perform db ops immediately, // but block and wait for some event to happen before performing db ops kCmdBlocking = 1ULL << 14, + // "auth" flag, for commands used for authentication + kCmdAuth = 1ULL << 15, }; enum class CommandCategory : uint8_t { @@ -328,6 +330,8 @@ inline uint64_t ParseCommandFlags(const std::string &description, const std::str flags |= kCmdNoDBSizeCheck; else if (flag == "slow") flags |= kCmdSlow; + else if (flag == "auth") + flags |= kCmdAuth; else if (flag == "blocking") { flags |= kCmdBlocking; diff --git a/src/server/redis_connection.cc b/src/server/redis_connection.cc index e6bef06d59f..ca00de1684b 100644 --- a/src/server/redis_connection.cc +++ b/src/server/redis_connection.cc @@ -397,7 +397,7 @@ void Connection::ExecuteCommands(std::deque *to_process_cmds) { if (GetNamespace().empty()) { if (!password.empty()) { - if (cmd_name != "auth" && cmd_name != "hello") { + if (!(cmd_flags & kCmdAuth)) { Reply(redis::Error({Status::RedisNoAuth, "Authentication required."})); continue; } From 5e9db7995192be8cfc0edc3ac9775081580580c2 Mon Sep 17 00:00:00 2001 From: sryan yuan Date: Sat, 16 Nov 2024 23:22:56 +0800 Subject: [PATCH 3/5] fix(replication): slave blocks until keepalive timer is reached when master is gone without fin/rst notification (#2662) Co-authored-by: yxj25245 Co-authored-by: hulk Co-authored-by: Twice Co-authored-by: Twice --- kvrocks.conf | 14 ++++++++++++++ src/cluster/replication.cc | 20 +++++++++++++++++--- src/common/io_util.cc | 12 ++++++++++-- src/common/status.h | 4 ++++ src/config/config.cc | 2 ++ src/config/config.h | 2 ++ 6 files changed, 49 insertions(+), 5 deletions(-) diff --git a/kvrocks.conf b/kvrocks.conf index 0ff0ce50508..13b1fb6c7f4 100644 --- a/kvrocks.conf +++ b/kvrocks.conf @@ -174,6 +174,20 @@ slave-read-only yes # By default the priority is 100. slave-priority 100 +# Change the default timeout in milliseconds for socket connect during replication. +# The default value is 3100, and 0 means no timeout. +# +# If the master is unreachable before connecting, not having a timeout may block future +# 'clusterx setnodes' commands because the replication thread is blocked on connect. +replication-connect-timeout-ms 3100 + +# Change the default timeout in milliseconds for socket recv during fullsync. +# The default value is 3200, and 0 means no timeout. +# +# If the master is unreachable when fetching SST files, not having a timeout may block +# future 'clusterx setnodes' commands because the replication thread is blocked on recv. +replication-recv-timeout-ms 3200 + # TCP listen() backlog. # # In high requests-per-second environments you need an high backlog in order diff --git a/src/cluster/replication.cc b/src/cluster/replication.cc index dff2d3d7956..cd7fe197a76 100644 --- a/src/cluster/replication.cc +++ b/src/cluster/replication.cc @@ -252,7 +252,6 @@ void ReplicationThread::CallbacksStateMachine::Start() { } uint64_t last_connect_timestamp = 0; - int connect_timeout_ms = 3100; while (!repl_->stop_flag_ && bev == nullptr) { if (util::GetTimeStampMS() - last_connect_timestamp < 1000) { @@ -260,7 +259,7 @@ void ReplicationThread::CallbacksStateMachine::Start() { sleep(1); } last_connect_timestamp = util::GetTimeStampMS(); - auto cfd = util::SockConnect(repl_->host_, repl_->port_, connect_timeout_ms); + auto cfd = util::SockConnect(repl_->host_, repl_->port_, repl_->srv_->GetConfig()->replication_connect_timeout_ms); if (!cfd) { LOG(ERROR) << "[replication] Failed to connect the master, err: " << cfd.Msg(); continue; @@ -777,7 +776,10 @@ Status ReplicationThread::parallelFetchFile(const std::string &dir, } auto exit = MakeScopeExit([ssl] { SSL_free(ssl); }); #endif - int sock_fd = GET_OR_RET(util::SockConnect(this->host_, this->port_, ssl).Prefixed("connect the server err")); + int sock_fd = GET_OR_RET(util::SockConnect(this->host_, this->port_, ssl, + this->srv_->GetConfig()->replication_connect_timeout_ms, + this->srv_->GetConfig()->replication_recv_timeout_ms) + .Prefixed("connect the server err")); #ifdef ENABLE_OPENSSL exit.Disable(); #endif @@ -874,6 +876,12 @@ Status ReplicationThread::fetchFile(int sock_fd, evbuffer *evbuf, const std::str UniqueEvbufReadln line(evbuf, EVBUFFER_EOL_CRLF_STRICT); if (!line) { if (auto s = util::EvbufferRead(evbuf, sock_fd, -1, ssl); !s) { + if (s.Is()) { + if (stop_flag_) { + return {Status::NotOK, "replication thread was stopped"}; + } + continue; + } return std::move(s).Prefixed("read size"); } continue; @@ -907,6 +915,12 @@ Status ReplicationThread::fetchFile(int sock_fd, evbuffer *evbuf, const std::str remain -= data_len; } else { if (auto s = util::EvbufferRead(evbuf, sock_fd, -1, ssl); !s) { + if (s.Is()) { + if (stop_flag_) { + return {Status::NotOK, "replication thread was stopped"}; + } + continue; + } return std::move(s).Prefixed("read sst file"); } } diff --git a/src/common/io_util.cc b/src/common/io_util.cc index 35fa80d9472..23cccc69fb7 100644 --- a/src/common/io_util.cc +++ b/src/common/io_util.cc @@ -502,7 +502,12 @@ StatusOr EvbufferRead(evbuffer *buf, evutil_socket_t fd, int howmuch, [[may howmuch = BUFFER_SIZE; } if (howmuch = SSL_read(ssl, tmp, howmuch); howmuch <= 0) { - return {Status::NotOK, fmt::format("failed to read from SSL connection: {}", fmt::streamed(SSLError(howmuch)))}; + int err = SSL_get_error(ssl, howmuch); + if (err == SSL_ERROR_ZERO_RETURN) { + return {Status::EndOfFile, "EOF encountered while reading from SSL connection"}; + } + return {(err == SSL_ERROR_WANT_READ) ? Status::TryAgain : Status::NotOK, + fmt::format("failed to read from SSL connection: {}", fmt::streamed(SSLError(howmuch)))}; } if (int ret = evbuffer_add(buf, tmp, howmuch); ret == -1) { @@ -514,8 +519,11 @@ StatusOr EvbufferRead(evbuffer *buf, evutil_socket_t fd, int howmuch, [[may #endif if (int ret = evbuffer_read(buf, fd, howmuch); ret > 0) { return ret; + } else if (ret == 0) { + return {Status::EndOfFile, "EOF encountered while reading from socket"}; } else { - return {Status::NotOK, fmt::format("failed to read from socket: {}", strerror(errno))}; + return {(errno == EWOULDBLOCK || errno == EAGAIN) ? Status::TryAgain : Status::NotOK, + fmt::format("failed to read from socket: {}", strerror(errno))}; } } diff --git a/src/common/status.h b/src/common/status.h index b4b228a05ef..823e5681c9b 100644 --- a/src/common/status.h +++ b/src/common/status.h @@ -75,6 +75,10 @@ class [[nodiscard]] Status { // Search NoPrefixMatched, TypeMismatched, + + // IO + TryAgain, + EndOfFile, }; Status() : impl_{nullptr} {} diff --git a/src/config/config.cc b/src/config/config.cc index 165e352f25d..c2491844253 100644 --- a/src/config/config.cc +++ b/src/config/config.cc @@ -203,6 +203,8 @@ Config::Config() { {"slave-empty-db-before-fullsync", false, new YesNoField(&slave_empty_db_before_fullsync, false)}, {"slave-priority", false, new IntField(&slave_priority, 100, 0, INT_MAX)}, {"slave-read-only", false, new YesNoField(&slave_readonly, true)}, + {"replication-connect-timeout-ms", false, new IntField(&replication_connect_timeout_ms, 3100, 0, INT_MAX)}, + {"replication-recv-timeout-ms", false, new IntField(&replication_recv_timeout_ms, 3200, 0, INT_MAX)}, {"use-rsid-psync", true, new YesNoField(&use_rsid_psync, false)}, {"profiling-sample-ratio", false, new IntField(&profiling_sample_ratio, 0, 0, 100)}, {"profiling-sample-record-max-len", false, new IntField(&profiling_sample_record_max_len, 256, 0, INT_MAX)}, diff --git a/src/config/config.h b/src/config/config.h index 3b9d99de2e9..3dcc8d87002 100644 --- a/src/config/config.h +++ b/src/config/config.h @@ -105,6 +105,8 @@ struct Config { bool slave_serve_stale_data = true; bool slave_empty_db_before_fullsync = false; int slave_priority = 100; + int replication_connect_timeout_ms = 3100; + int replication_recv_timeout_ms = 3200; int max_db_size = 0; int max_replication_mb = 0; int max_io_mb = 0; From c6d9ad2916eb0ede75fff5427fe7dd2966093d42 Mon Sep 17 00:00:00 2001 From: Twice Date: Sun, 17 Nov 2024 12:22:00 +0800 Subject: [PATCH 4/5] feat(conn): avoid to check the command name directly (#2668) --- src/commands/cmd_server.cc | 2 +- src/commands/cmd_txn.cc | 14 +++++--------- src/commands/commander.h | 9 +++++---- src/common/string_util.cc | 5 +++++ src/common/string_util.h | 1 + src/server/redis_connection.cc | 6 +++--- tests/gocase/unit/multi/multi_test.go | 2 +- 7 files changed, 21 insertions(+), 18 deletions(-) diff --git a/src/commands/cmd_server.cc b/src/commands/cmd_server.cc index 2cc56919647..c7f742656d0 100644 --- a/src/commands/cmd_server.cc +++ b/src/commands/cmd_server.cc @@ -1364,7 +1364,7 @@ REDIS_REGISTER_COMMANDS(Server, MakeCmdAttr("auth", 2, "read-only o MakeCmdAttr("slaveof", 3, "read-only exclusive no-script", NO_KEY), MakeCmdAttr("stats", 1, "read-only", NO_KEY), MakeCmdAttr("rdb", -3, "write exclusive", NO_KEY), - MakeCmdAttr("reset", 1, "ok-loading multi no-script", NO_KEY), + MakeCmdAttr("reset", 1, "ok-loading bypass-multi no-script", NO_KEY), MakeCmdAttr("applybatch", -2, "write no-multi", NO_KEY), MakeCmdAttr("dump", 2, "read-only", 1, 1, 1), MakeCmdAttr("pollupdates", -2, "read-only", NO_KEY), ) diff --git a/src/commands/cmd_txn.cc b/src/commands/cmd_txn.cc index 3d88d9a2ec4..5f922ddf5ef 100644 --- a/src/commands/cmd_txn.cc +++ b/src/commands/cmd_txn.cc @@ -98,10 +98,6 @@ class CommandExec : public Commander { class CommandWatch : public Commander { public: Status Execute([[maybe_unused]] engine::Context &ctx, Server *srv, Connection *conn, std::string *output) override { - if (conn->IsFlagEnabled(Connection::kMultiExec)) { - return {Status::RedisExecErr, "WATCH inside MULTI is not allowed"}; - } - // If a conn is already marked as watched_keys_modified, we can skip the watch. if (srv->IsWatchedKeysModified(conn)) { *output = redis::RESP_OK; @@ -123,10 +119,10 @@ class CommandUnwatch : public Commander { } }; -REDIS_REGISTER_COMMANDS(Txn, MakeCmdAttr("multi", 1, "multi", NO_KEY), - MakeCmdAttr("discard", 1, "multi", NO_KEY), - MakeCmdAttr("exec", 1, "exclusive multi slow", NO_KEY), - MakeCmdAttr("watch", -2, "multi", 1, -1, 1), - MakeCmdAttr("unwatch", 1, "multi", NO_KEY), ) +REDIS_REGISTER_COMMANDS(Txn, MakeCmdAttr("multi", 1, "bypass-multi", NO_KEY), + MakeCmdAttr("discard", 1, "bypass-multi", NO_KEY), + MakeCmdAttr("exec", 1, "exclusive bypass-multi slow", NO_KEY), + MakeCmdAttr("watch", -2, "no-multi", 1, -1, 1), + MakeCmdAttr("unwatch", 1, "no-multi", NO_KEY), ) } // namespace redis diff --git a/src/commands/commander.h b/src/commands/commander.h index b2a6ae3d330..f0589c92c42 100644 --- a/src/commands/commander.h +++ b/src/commands/commander.h @@ -64,8 +64,9 @@ enum CommandFlags : uint64_t { // "ok-loading" flag, for any command that can be executed while // the db is in loading phase kCmdLoading = 1ULL << 5, - // "multi" flag, for commands that can end a MULTI scope - kCmdEndMulti = 1ULL << 6, + // "bypass-multi" flag, for commands that can be executed in a MULTI scope, + // but these commands will NOT be queued and will be executed immediately + kCmdBypassMulti = 1ULL << 6, // "exclusive" flag, for commands that should be executed execlusive globally kCmdExclusive = 1ULL << 7, // "no-multi" flag, for commands that cannot be executed in MULTI scope @@ -320,8 +321,8 @@ inline uint64_t ParseCommandFlags(const std::string &description, const std::str flags |= kCmdLoading; else if (flag == "exclusive") flags |= kCmdExclusive; - else if (flag == "multi") - flags |= kCmdEndMulti; + else if (flag == "bypass-multi") + flags |= kCmdBypassMulti; else if (flag == "no-multi") flags |= kCmdNoMulti; else if (flag == "no-script") diff --git a/src/common/string_util.cc b/src/common/string_util.cc index cce6440227a..c1e6f7e3ffe 100644 --- a/src/common/string_util.cc +++ b/src/common/string_util.cc @@ -42,6 +42,11 @@ std::string ToLower(std::string in) { return in; } +std::string ToUpper(std::string in) { + std::transform(in.begin(), in.end(), in.begin(), [](char c) -> char { return static_cast(std::toupper(c)); }); + return in; +} + bool EqualICase(std::string_view lhs, std::string_view rhs) { return lhs.size() == rhs.size() && std::equal(lhs.begin(), lhs.end(), rhs.begin(), [](char l, char r) { return std::tolower(l) == std::tolower(r); }); diff --git a/src/common/string_util.h b/src/common/string_util.h index f86590ad046..619d95d5d09 100644 --- a/src/common/string_util.h +++ b/src/common/string_util.h @@ -32,6 +32,7 @@ namespace util { std::string Float2String(double d); std::string ToLower(std::string in); +std::string ToUpper(std::string in); bool EqualICase(std::string_view lhs, std::string_view rhs); std::string BytesToHuman(uint64_t n); std::string Trim(std::string in, std::string_view chars); diff --git a/src/server/redis_connection.cc b/src/server/redis_connection.cc index ca00de1684b..2e4ed68c552 100644 --- a/src/server/redis_connection.cc +++ b/src/server/redis_connection.cc @@ -413,7 +413,7 @@ void Connection::ExecuteCommands(std::deque *to_process_cmds) { // that can guarantee other threads can't come into critical zone, such as DEBUG, // CLUSTER subcommand, CONFIG SET, MULTI, LUA (in the immediate future). // Otherwise, we just use 'ConcurrencyGuard' to allow all workers to execute commands at the same time. - if (is_multi_exec && cmd_name != "exec") { + if (is_multi_exec && !(cmd_flags & kCmdBypassMulti)) { // No lock guard, because 'exec' command has acquired 'WorkExclusivityGuard' } else if (cmd_flags & kCmdExclusive) { exclusivity = srv_->WorkExclusivityGuard(); @@ -443,7 +443,7 @@ void Connection::ExecuteCommands(std::deque *to_process_cmds) { } if (is_multi_exec && (cmd_flags & kCmdNoMulti)) { - Reply(redis::Error({Status::NotOK, "Can't execute " + cmd_name + " in MULTI"})); + Reply(redis::Error({Status::NotOK, fmt::format("{} inside MULTI is not allowed", util::ToUpper(cmd_name))})); multi_error_ = true; continue; } @@ -463,7 +463,7 @@ void Connection::ExecuteCommands(std::deque *to_process_cmds) { } // We don't execute commands, but queue them, and then execute in EXEC command - if (is_multi_exec && !in_exec_ && !(cmd_flags & kCmdEndMulti)) { + if (is_multi_exec && !in_exec_ && !(cmd_flags & kCmdBypassMulti)) { multi_cmds_.emplace_back(std::move(cmd_tokens)); Reply(redis::SimpleString("QUEUED")); continue; diff --git a/tests/gocase/unit/multi/multi_test.go b/tests/gocase/unit/multi/multi_test.go index c2a96917b41..084eb805ba7 100644 --- a/tests/gocase/unit/multi/multi_test.go +++ b/tests/gocase/unit/multi/multi_test.go @@ -174,7 +174,7 @@ func TestMulti(t *testing.T) { t.Run("WATCH inside MULTI is not allowed", func(t *testing.T) { require.NoError(t, rdb.Do(ctx, "MULTI").Err()) require.EqualError(t, rdb.Do(ctx, "WATCH", "x").Err(), "ERR WATCH inside MULTI is not allowed") - require.NoError(t, rdb.Do(ctx, "EXEC").Err()) + require.NoError(t, rdb.Do(ctx, "DISCARD").Err()) }) t.Run("EXEC without MULTI is not allowed", func(t *testing.T) { From 1dbd7d3cb3c7c9b86b031b2104358c3ee93fb618 Mon Sep 17 00:00:00 2001 From: Aleks Lozovyuk Date: Sun, 17 Nov 2024 16:56:14 +0200 Subject: [PATCH 5/5] chore: bump cpptrace to v0.7.3 (#2670) Co-authored-by: Twice --- cmake/cpptrace.cmake | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cmake/cpptrace.cmake b/cmake/cpptrace.cmake index a8d76b84b2e..e32e7790082 100644 --- a/cmake/cpptrace.cmake +++ b/cmake/cpptrace.cmake @@ -20,8 +20,8 @@ include_guard() include(cmake/utils.cmake) FetchContent_DeclareGitHubWithMirror(cpptrace - jeremy-rifkin/cpptrace v0.7.2 - MD5=4d992a22ddb80300fa2ddac097a5ce51 + jeremy-rifkin/cpptrace v0.7.3 + MD5=032eb39d17eb138871a760b1c2f52a74 ) if (SYMBOLIZE_BACKEND STREQUAL "libbacktrace")