Skip to content

Commit

Permalink
Remove unused configuration item repl-workers (apache#377)
Browse files Browse the repository at this point in the history
  • Loading branch information
git-hulk authored and ShooterIT committed Nov 2, 2021
1 parent d75b95c commit 1cb29ad
Show file tree
Hide file tree
Showing 11 changed files with 13 additions and 76 deletions.
2 changes: 2 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,8 @@ target_sources(kvrocks2redis PRIVATE
src/scripting.h
src/rand.cc
src/rand.h
src/sha1.cc
src/sha1.h
tools/kvrocks2redis/config.cc
tools/kvrocks2redis/config.h
tools/kvrocks2redis/main.cc
Expand Down
4 changes: 0 additions & 4 deletions kvrocks.conf
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,6 @@ timeout 0
# The number of worker's threads, increase or decrease it would effect the performance.
workers 8

# The number of replication worker's threads, increase or decrease it would effect the replication performance.
# Default: 1
repl-workers 1

# By default kvrocks does not run as a daemon. Use 'yes' if you need it.
# Note that kvrocks will write a pid file in /var/run/kvrocks.pid when daemonized.
daemonize no
Expand Down
3 changes: 1 addition & 2 deletions src/config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ Config::Config() {
{"bind", true, new StringField(&binds_, "127.0.0.1")},
{"port", true, new IntField(&port, 6666, 1, 65535)},
{"workers", true, new IntField(&workers, 8, 1, 256)},
{"repl-workers", true, new IntField(&repl_workers, 4, 1, 256)},
{"timeout", false, new IntField(&timeout, 0, 0, INT_MAX)},
{"tcp-backlog", true, new IntField(&backlog, 511, 0, INT_MAX)},
{"maxclients", false, new IntField(&maxclients, 10240, 0, INT_MAX)},
Expand All @@ -78,7 +77,7 @@ Config::Config() {
{"compact-cron", false, new StringField(&compact_cron_, "")},
{"bgsave-cron", false, new StringField(&bgsave_cron_, "")},
{"compaction-checker-range", false, new StringField(&compaction_checker_range_, "")},
{"db-name", true, new StringField(&db_name, "changeme.name")},
{"db-name", true, new StringField(&db_name, "change.me.db")},
{"dir", true, new StringField(&dir, "/tmp/kvrocks")},
{"backup-dir", true, new StringField(&backup_dir, "")},
{"log-dir", true, new StringField(&log_dir, "")},
Expand Down
1 change: 0 additions & 1 deletion src/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ struct Config{
~Config();
int port = 6666;
int workers = 0;
int repl_workers = 1;
int timeout = 0;
int loglevel = 0;
int backlog = 511;
Expand Down
16 changes: 7 additions & 9 deletions src/redis_cmd.cc
Original file line number Diff line number Diff line change
Expand Up @@ -48,16 +48,14 @@ class CommandAuth : public Commander {
Status Execute(Server *svr, Connection *conn, std::string *output) override {
Config *config = svr->GetConfig();
auto user_password = args_[1];
if (!conn->IsRepl()) {
auto iter = config->tokens.find(user_password);
if (iter != config->tokens.end()) {
conn->SetNamespace(iter->second);
conn->BecomeUser();
*output = Redis::SimpleString("OK");
return Status::OK();
}
auto iter = config->tokens.find(user_password);
if (iter != config->tokens.end()) {
conn->SetNamespace(iter->second);
conn->BecomeUser();
*output = Redis::SimpleString("OK");
return Status::OK();
}
const auto requirepass = conn->IsRepl() ? config->masterauth : config->requirepass;
const auto requirepass = config->requirepass;
if (!requirepass.empty() && user_password != requirepass) {
*output = Redis::Error("ERR invaild password");
return Status::OK();
Expand Down
8 changes: 1 addition & 7 deletions src/redis_connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,6 @@ uint64_t Connection::GetClientType() {

std::string Connection::GetFlags() {
std::string flags;
if (owner_->IsRepl()) flags.append("R");
if (IsFlagEnabled(kSlave)) flags.append("S");
if (IsFlagEnabled(kCloseAfterReply)) flags.append("c");
if (IsFlagEnabled(kMonitor)) flags.append("M");
Expand All @@ -170,10 +169,6 @@ bool Connection::IsFlagEnabled(Flag flag) {
return (flags_ & flag) > 0;
}

bool Connection::IsRepl() {
return owner_->IsRepl();
}

void Connection::SubscribeChannel(const std::string &channel) {
for (const auto &chan : subscribe_channels_) {
if (channel == chan) return;
Expand Down Expand Up @@ -295,8 +290,7 @@ void Connection::ExecuteCommands(const std::vector<Redis::CommandTokens> &to_pro
if (to_process_cmds.empty()) return;

Config *config = svr_->GetConfig();
std::string reply, password;
password = IsRepl() ? config->masterauth : config->requirepass;
std::string reply, password = config->requirepass;

svr_->SetCurrentConnection(this);
for (auto &cmd_tokens : to_process_cmds) {
Expand Down
1 change: 0 additions & 1 deletion src/redis_connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ class Connection {
void EnableFlag(Flag flag);
void DisableFlag(Flag flag);
bool IsFlagEnabled(Flag flag);
bool IsRepl();

uint64_t GetID() { return id_; }
void SetID(uint64_t id) { id_ = id; }
Expand Down
12 changes: 0 additions & 12 deletions src/server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1190,18 +1190,6 @@ void Server::KillClient(int64_t *killed, std::string addr, uint64_t id,
}
}

void Server::SetReplicationRateLimit(uint64_t max_replication_mb) {
uint64_t max_rate_per_repl_worker = 0;
if (max_replication_mb > 0) {
max_rate_per_repl_worker = (max_replication_mb*MiB)/config_->repl_workers;
}
for (const auto &t : worker_threads_) {
if (t->GetWorker()->IsRepl()) {
t->GetWorker()->SetReplicationRateLimit(max_rate_per_repl_worker);
}
}
}

ReplState Server::GetReplicationState() {
if (IsSlave() && replication_thread_) {
return replication_thread_->State();
Expand Down
1 change: 0 additions & 1 deletion src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,6 @@ class Server {
std::atomic<uint64_t> *GetClientID();
void KillClient(int64_t *killed, std::string addr, uint64_t id, uint64_t type,
bool skipme, Redis::Connection *conn);
void SetReplicationRateLimit(uint64_t max_replication_mb);

lua_State *Lua() { return lua_; }
Status ScriptExists(const std::string &sha);
Expand Down
38 changes: 2 additions & 36 deletions src/worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
#include "server.h"
#include "util.h"

Worker::Worker(Server *svr, Config *config, bool repl) : svr_(svr), repl_(repl) {
Worker::Worker(Server *svr, Config *config, bool repl) : svr_(svr) {
base_ = event_base_new();
if (!base_) throw std::exception();

Expand Down Expand Up @@ -241,36 +241,6 @@ Status Worker::Reply(int fd, const std::string &reply) {
return Status(Status::NotOK, "connection doesn't exist");
}

int Worker::SetReplicationRateLimit(uint64_t max_replication_bytes) {
auto write_limit = EV_RATE_LIMIT_MAX;
if (max_replication_bytes > 0) {
write_limit = max_replication_bytes;
}
struct timeval cfg_tick = {1, 0};
auto old_cfg = rate_limit_group_cfg_;
rate_limit_group_cfg_ = ev_token_bucket_cfg_new(
EV_RATE_LIMIT_MAX, EV_RATE_LIMIT_MAX,
write_limit, write_limit,
&cfg_tick);
if (rate_limit_group_cfg_ == nullptr) {
LOG(ERROR) << "[server] ev_token_bucket_cfg_new error";
rate_limit_group_cfg_ = old_cfg;
return -1;
}

if (rate_limit_group_ != nullptr) {
bufferevent_rate_limit_group_set_cfg(rate_limit_group_, rate_limit_group_cfg_);
} else {
rate_limit_group_ = bufferevent_rate_limit_group_new(base_, rate_limit_group_cfg_);
}

if (old_cfg != nullptr) {
ev_token_bucket_cfg_free(old_cfg);
}

return 0;
}

void Worker::BecomeMonitorConn(Redis::Connection *conn) {
conns_mu_.lock();
conns_.erase(conn->GetFD());
Expand Down Expand Up @@ -358,11 +328,7 @@ void Worker::KickoutIdleClients(int timeout) {
void WorkerThread::Start() {
try {
t_ = std::thread([this]() {
if (this->worker_->IsRepl()) {
Util::ThreadSetName("repl-worker");
} else {
Util::ThreadSetName("worker");
}
Util::ThreadSetName("worker");
this->worker_->Run(std::this_thread::get_id());
});
} catch (const std::system_error &e) {
Expand Down
3 changes: 0 additions & 3 deletions src/worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@ class Worker {
Status AddConnection(Redis::Connection *c);
Status EnableWriteEvent(int fd);
Status Reply(int fd, const std::string &reply);
bool IsRepl() { return repl_; }
int SetReplicationRateLimit(uint64_t max_replication_bytes);
void BecomeMonitorConn(Redis::Connection *conn);
void FeedMonitorConns(Redis::Connection *conn, const std::vector<std::string> &tokens);

Expand Down Expand Up @@ -61,7 +59,6 @@ class Worker {
std::map<int, Redis::Connection*> monitor_conns_;
int last_iter_conn_fd = 0; // fd of last processed connection in previous cron

bool repl_;
struct bufferevent_rate_limit_group *rate_limit_group_ = nullptr;
struct ev_token_bucket_cfg *rate_limit_group_cfg_ = nullptr;
};
Expand Down

0 comments on commit 1cb29ad

Please sign in to comment.