diff --git a/CMakeLists.txt b/CMakeLists.txt index bd4b65c2c37..05ed9e10fac 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -262,6 +262,8 @@ target_sources(kvrocks2redis PRIVATE src/scripting.h src/rand.cc src/rand.h + src/sha1.cc + src/sha1.h tools/kvrocks2redis/config.cc tools/kvrocks2redis/config.h tools/kvrocks2redis/main.cc diff --git a/kvrocks.conf b/kvrocks.conf index 4c7f31979cf..6876adb4ff3 100644 --- a/kvrocks.conf +++ b/kvrocks.conf @@ -20,10 +20,6 @@ timeout 0 # The number of worker's threads, increase or decrease it would effect the performance. workers 8 -# The number of replication worker's threads, increase or decrease it would effect the replication performance. -# Default: 1 -repl-workers 1 - # By default kvrocks does not run as a daemon. Use 'yes' if you need it. # Note that kvrocks will write a pid file in /var/run/kvrocks.pid when daemonized. daemonize no diff --git a/src/config.cc b/src/config.cc index a9892f2d4d8..6b5677e0866 100644 --- a/src/config.cc +++ b/src/config.cc @@ -65,7 +65,6 @@ Config::Config() { {"bind", true, new StringField(&binds_, "127.0.0.1")}, {"port", true, new IntField(&port, 6666, 1, 65535)}, {"workers", true, new IntField(&workers, 8, 1, 256)}, - {"repl-workers", true, new IntField(&repl_workers, 4, 1, 256)}, {"timeout", false, new IntField(&timeout, 0, 0, INT_MAX)}, {"tcp-backlog", true, new IntField(&backlog, 511, 0, INT_MAX)}, {"maxclients", false, new IntField(&maxclients, 10240, 0, INT_MAX)}, @@ -78,7 +77,7 @@ Config::Config() { {"compact-cron", false, new StringField(&compact_cron_, "")}, {"bgsave-cron", false, new StringField(&bgsave_cron_, "")}, {"compaction-checker-range", false, new StringField(&compaction_checker_range_, "")}, - {"db-name", true, new StringField(&db_name, "changeme.name")}, + {"db-name", true, new StringField(&db_name, "change.me.db")}, {"dir", true, new StringField(&dir, "/tmp/kvrocks")}, {"backup-dir", true, new StringField(&backup_dir, "")}, {"log-dir", true, new StringField(&log_dir, "")}, diff --git a/src/config.h b/src/config.h index 89d885c6db5..39c7b465a9d 100644 --- a/src/config.h +++ b/src/config.h @@ -45,7 +45,6 @@ struct Config{ ~Config(); int port = 6666; int workers = 0; - int repl_workers = 1; int timeout = 0; int loglevel = 0; int backlog = 511; diff --git a/src/redis_cmd.cc b/src/redis_cmd.cc index b5e936f5c75..1a4d9743d8c 100644 --- a/src/redis_cmd.cc +++ b/src/redis_cmd.cc @@ -48,16 +48,14 @@ class CommandAuth : public Commander { Status Execute(Server *svr, Connection *conn, std::string *output) override { Config *config = svr->GetConfig(); auto user_password = args_[1]; - if (!conn->IsRepl()) { - auto iter = config->tokens.find(user_password); - if (iter != config->tokens.end()) { - conn->SetNamespace(iter->second); - conn->BecomeUser(); - *output = Redis::SimpleString("OK"); - return Status::OK(); - } + auto iter = config->tokens.find(user_password); + if (iter != config->tokens.end()) { + conn->SetNamespace(iter->second); + conn->BecomeUser(); + *output = Redis::SimpleString("OK"); + return Status::OK(); } - const auto requirepass = conn->IsRepl() ? config->masterauth : config->requirepass; + const auto requirepass = config->requirepass; if (!requirepass.empty() && user_password != requirepass) { *output = Redis::Error("ERR invaild password"); return Status::OK(); diff --git a/src/redis_connection.cc b/src/redis_connection.cc index 3b99f00d64a..db03251f993 100644 --- a/src/redis_connection.cc +++ b/src/redis_connection.cc @@ -149,7 +149,6 @@ uint64_t Connection::GetClientType() { std::string Connection::GetFlags() { std::string flags; - if (owner_->IsRepl()) flags.append("R"); if (IsFlagEnabled(kSlave)) flags.append("S"); if (IsFlagEnabled(kCloseAfterReply)) flags.append("c"); if (IsFlagEnabled(kMonitor)) flags.append("M"); @@ -170,10 +169,6 @@ bool Connection::IsFlagEnabled(Flag flag) { return (flags_ & flag) > 0; } -bool Connection::IsRepl() { - return owner_->IsRepl(); -} - void Connection::SubscribeChannel(const std::string &channel) { for (const auto &chan : subscribe_channels_) { if (channel == chan) return; @@ -295,8 +290,7 @@ void Connection::ExecuteCommands(const std::vector &to_pro if (to_process_cmds.empty()) return; Config *config = svr_->GetConfig(); - std::string reply, password; - password = IsRepl() ? config->masterauth : config->requirepass; + std::string reply, password = config->requirepass; svr_->SetCurrentConnection(this); for (auto &cmd_tokens : to_process_cmds) { diff --git a/src/redis_connection.h b/src/redis_connection.h index 975679a6430..a4ccca353da 100644 --- a/src/redis_connection.h +++ b/src/redis_connection.h @@ -52,7 +52,6 @@ class Connection { void EnableFlag(Flag flag); void DisableFlag(Flag flag); bool IsFlagEnabled(Flag flag); - bool IsRepl(); uint64_t GetID() { return id_; } void SetID(uint64_t id) { id_ = id; } diff --git a/src/server.cc b/src/server.cc index a309076fa03..f3e46bb02e4 100644 --- a/src/server.cc +++ b/src/server.cc @@ -1190,18 +1190,6 @@ void Server::KillClient(int64_t *killed, std::string addr, uint64_t id, } } -void Server::SetReplicationRateLimit(uint64_t max_replication_mb) { - uint64_t max_rate_per_repl_worker = 0; - if (max_replication_mb > 0) { - max_rate_per_repl_worker = (max_replication_mb*MiB)/config_->repl_workers; - } - for (const auto &t : worker_threads_) { - if (t->GetWorker()->IsRepl()) { - t->GetWorker()->SetReplicationRateLimit(max_rate_per_repl_worker); - } - } -} - ReplState Server::GetReplicationState() { if (IsSlave() && replication_thread_) { return replication_thread_->State(); diff --git a/src/server.h b/src/server.h index 9cbc2cb75b0..c310a37074e 100644 --- a/src/server.h +++ b/src/server.h @@ -124,7 +124,6 @@ class Server { std::atomic *GetClientID(); void KillClient(int64_t *killed, std::string addr, uint64_t id, uint64_t type, bool skipme, Redis::Connection *conn); - void SetReplicationRateLimit(uint64_t max_replication_mb); lua_State *Lua() { return lua_; } Status ScriptExists(const std::string &sha); diff --git a/src/worker.cc b/src/worker.cc index b0ed7ac0593..a1a54be6682 100644 --- a/src/worker.cc +++ b/src/worker.cc @@ -13,7 +13,7 @@ #include "server.h" #include "util.h" -Worker::Worker(Server *svr, Config *config, bool repl) : svr_(svr), repl_(repl) { +Worker::Worker(Server *svr, Config *config, bool repl) : svr_(svr) { base_ = event_base_new(); if (!base_) throw std::exception(); @@ -241,36 +241,6 @@ Status Worker::Reply(int fd, const std::string &reply) { return Status(Status::NotOK, "connection doesn't exist"); } -int Worker::SetReplicationRateLimit(uint64_t max_replication_bytes) { - auto write_limit = EV_RATE_LIMIT_MAX; - if (max_replication_bytes > 0) { - write_limit = max_replication_bytes; - } - struct timeval cfg_tick = {1, 0}; - auto old_cfg = rate_limit_group_cfg_; - rate_limit_group_cfg_ = ev_token_bucket_cfg_new( - EV_RATE_LIMIT_MAX, EV_RATE_LIMIT_MAX, - write_limit, write_limit, - &cfg_tick); - if (rate_limit_group_cfg_ == nullptr) { - LOG(ERROR) << "[server] ev_token_bucket_cfg_new error"; - rate_limit_group_cfg_ = old_cfg; - return -1; - } - - if (rate_limit_group_ != nullptr) { - bufferevent_rate_limit_group_set_cfg(rate_limit_group_, rate_limit_group_cfg_); - } else { - rate_limit_group_ = bufferevent_rate_limit_group_new(base_, rate_limit_group_cfg_); - } - - if (old_cfg != nullptr) { - ev_token_bucket_cfg_free(old_cfg); - } - - return 0; -} - void Worker::BecomeMonitorConn(Redis::Connection *conn) { conns_mu_.lock(); conns_.erase(conn->GetFD()); @@ -358,11 +328,7 @@ void Worker::KickoutIdleClients(int timeout) { void WorkerThread::Start() { try { t_ = std::thread([this]() { - if (this->worker_->IsRepl()) { - Util::ThreadSetName("repl-worker"); - } else { - Util::ThreadSetName("worker"); - } + Util::ThreadSetName("worker"); this->worker_->Run(std::this_thread::get_id()); }); } catch (const std::system_error &e) { diff --git a/src/worker.h b/src/worker.h index c7412928aba..e48595055cd 100644 --- a/src/worker.h +++ b/src/worker.h @@ -32,8 +32,6 @@ class Worker { Status AddConnection(Redis::Connection *c); Status EnableWriteEvent(int fd); Status Reply(int fd, const std::string &reply); - bool IsRepl() { return repl_; } - int SetReplicationRateLimit(uint64_t max_replication_bytes); void BecomeMonitorConn(Redis::Connection *conn); void FeedMonitorConns(Redis::Connection *conn, const std::vector &tokens); @@ -61,7 +59,6 @@ class Worker { std::map monitor_conns_; int last_iter_conn_fd = 0; // fd of last processed connection in previous cron - bool repl_; struct bufferevent_rate_limit_group *rate_limit_group_ = nullptr; struct ev_token_bucket_cfg *rate_limit_group_cfg_ = nullptr; };