Skip to content

Commit

Permalink
Add the support of the namespace replication (#1776)
Browse files Browse the repository at this point in the history
Currently, Kvrocks won't propagate the namespace and token between master and slaves, and it may cause trouble if users add them in master but missing in slaves. So it would make this process easier if we could propagate the namespace/token as well.


## Design

To implement this feature, we need to consider the following situations:

- How to replicate namespaces between the master and slaves
- How to be compatible with old versions

**For the replication:**

We now have the propagate column family to propagate commands between the master and slaves, so we can use it to replicate namespaces as well. The steps are like below:

1. Modify namespace/token in master(forbid changing in slaves)
2. Propagate the namespace event to notify slaves like the Lua script
3. Slaves replay namespace/token changes

This works well for new instances, but what if there are different namespaces between the master and slaves in old instances?  There are three possible conditions:

1. Namespace/token exists in master but NOT in slave
2. Namespace exists in slave but NOT in master
3. Namespace exists both in master and slave, but the token is different

For condition 1, we can add the namespace/token to the slave as well. For condition 2, it makes sense to keep the namespace NOT existing in the master since data are from it, so we can remove the namespace/token. The most troublesome condition is 3 since users may have already used the token to access the data, changing the token would cause an invalid password error. **To make this simple, I prefer also overwriting the token.** And offer the configuration to allow users to enable or disable this feature, they MUST understand the risks before enabling.

To be noticed, we don't expect to propagate the default namespace since it may affect the replication part, so we won't persist the default namespace.

**For the compatible issue:**

Currently, namespaces are kept in the configuration file, so we need to read from the rocksdb first(store in propagate column like Lua script) when starting:

- If the key exists in rocksdb, load namespace/token from rocksdb and check if any namespace/token is inside the configuration file. If yes, refuse to start the server and ask users to remove them and add them via the `namespace add` command.
- If the key does NOT exist in rocksdb, read namespace/token from the configuration file and write them into rocksdb. To be noticed, we need to add an empty value for the key even though no namespace/token.

Users can only modify the namespace via command if this feature is enabled.
  • Loading branch information
git-hulk authored Oct 9, 2023
1 parent 42623c1 commit f972819
Show file tree
Hide file tree
Showing 14 changed files with 527 additions and 282 deletions.
9 changes: 9 additions & 0 deletions kvrocks.conf
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,15 @@ daemonize no

cluster-enabled no

# By default, namespaces are stored in the configuration file and won't be replicated
# to replicas. This option allows to change this behavior, so that namespaces are also
# propagated to slaves. Note that:
# 1) it won't replicate the 'masterauth' to prevent breaking master/replica replication
# 2) it will overwrite replica's namespace with master's namespace, so be careful of in-using namespaces
# 3) cannot switch off the namespace replication once it's enabled
#
# Default: no
repl-namespace-enabled no

# Persist the cluster nodes topology in local file($dir/nodes.conf). This configuration
# takes effect only if the cluster mode was enabled.
Expand Down
5 changes: 5 additions & 0 deletions src/cluster/replication.cc
Original file line number Diff line number Diff line change
Expand Up @@ -975,6 +975,11 @@ Status ReplicationThread::parseWriteBatch(const std::string &batch_string) {
return s.Prefixed("failed to execute propagate command");
}
}
} else if (write_batch_handler.Key() == kNamespaceDBKey) {
auto s = srv_->GetNamespace()->LoadAndRewrite();
if (!s.IsOK()) {
return s.Prefixed("failed to load namespaces");
}
}
break;
case kBatchTypeStream: {
Expand Down
35 changes: 18 additions & 17 deletions src/commands/cmd_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,15 @@ enum class AuthResult {
NO_REQUIRE_PASS,
};

AuthResult AuthenticateUser(Connection *conn, Config *config, const std::string &user_password) {
auto iter = config->tokens.find(user_password);
if (iter != config->tokens.end()) {
conn->SetNamespace(iter->second);
AuthResult AuthenticateUser(Server *srv, Connection *conn, const std::string &user_password) {
auto ns = srv->GetNamespace()->GetByToken(user_password);
if (ns.IsOK()) {
conn->SetNamespace(ns.GetValue());
conn->BecomeUser();
return AuthResult::OK;
}

const auto &requirepass = config->requirepass;
const auto &requirepass = srv->GetConfig()->requirepass;
if (!requirepass.empty() && user_password != requirepass) {
return AuthResult::INVALID_PASSWORD;
}
Expand All @@ -64,9 +64,8 @@ AuthResult AuthenticateUser(Connection *conn, Config *config, const std::string
class CommandAuth : public Commander {
public:
Status Execute(Server *svr, Connection *conn, std::string *output) override {
Config *config = svr->GetConfig();
auto &user_password = args_[1];
AuthResult result = AuthenticateUser(conn, config, user_password);
AuthResult result = AuthenticateUser(svr, conn, user_password);
switch (result) {
case AuthResult::OK:
*output = redis::SimpleString("OK");
Expand All @@ -89,10 +88,13 @@ class CommandNamespace : public Commander {

Config *config = svr->GetConfig();
std::string sub_command = util::ToLower(args_[1]);
if (config->repl_namespace_enabled && config->IsSlave() && sub_command != "get") {
return {Status::RedisExecErr, "namespace is read-only for slave"};
}
if (args_.size() == 3 && sub_command == "get") {
if (args_[2] == "*") {
std::vector<std::string> namespaces;
auto tokens = config->tokens;
auto tokens = svr->GetNamespace()->List();
for (auto &token : tokens) {
namespaces.emplace_back(token.second); // namespace
namespaces.emplace_back(token.first); // token
Expand All @@ -101,26 +103,25 @@ class CommandNamespace : public Commander {
namespaces.emplace_back(config->requirepass);
*output = redis::MultiBulkString(namespaces, false);
} else {
std::string token;
auto s = config->GetNamespace(args_[2], &token);
if (s.Is<Status::NotFound>()) {
auto token = svr->GetNamespace()->Get(args_[2]);
if (token.Is<Status::NotFound>()) {
*output = redis::NilString();
} else {
*output = redis::BulkString(token);
*output = redis::BulkString(token.GetValue());
}
}
} else if (args_.size() == 4 && sub_command == "set") {
Status s = config->SetNamespace(args_[2], args_[3]);
Status s = svr->GetNamespace()->Set(args_[2], args_[3]);
*output = s.IsOK() ? redis::SimpleString("OK") : redis::Error("ERR " + s.Msg());
LOG(WARNING) << "Updated namespace: " << args_[2] << " with token: " << args_[3] << ", addr: " << conn->GetAddr()
<< ", result: " << s.Msg();
} else if (args_.size() == 4 && sub_command == "add") {
Status s = config->AddNamespace(args_[2], args_[3]);
Status s = svr->GetNamespace()->Add(args_[2], args_[3]);
*output = s.IsOK() ? redis::SimpleString("OK") : redis::Error("ERR " + s.Msg());
LOG(WARNING) << "New namespace: " << args_[2] << " with token: " << args_[3] << ", addr: " << conn->GetAddr()
<< ", result: " << s.Msg();
} else if (args_.size() == 3 && sub_command == "del") {
Status s = config->DelNamespace(args_[2]);
Status s = svr->GetNamespace()->Del(args_[2]);
*output = s.IsOK() ? redis::SimpleString("OK") : redis::Error("ERR " + s.Msg());
LOG(WARNING) << "Deleted namespace: " << args_[2] << ", addr: " << conn->GetAddr() << ", result: " << s.Msg();
} else {
Expand Down Expand Up @@ -239,7 +240,7 @@ class CommandConfig : public Commander {
}

if (args_.size() == 2 && sub_command == "rewrite") {
Status s = config->Rewrite();
Status s = config->Rewrite(svr->GetNamespace()->List());
if (!s.IsOK()) return {Status::RedisExecErr, s.Msg()};

*output = redis::SimpleString("OK");
Expand Down Expand Up @@ -709,7 +710,7 @@ class CommandHello final : public Commander {
next_arg++;
}
const auto &user_password = args_[next_arg + 1];
auto auth_result = AuthenticateUser(conn, svr->GetConfig(), user_password);
auto auth_result = AuthenticateUser(svr, conn, user_password);
switch (auth_result) {
case AuthResult::INVALID_PASSWORD:
return {Status::NotOK, "invalid password"};
Expand Down
128 changes: 18 additions & 110 deletions src/config/config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ Config::Config() {
{"log-retention-days", false, new IntField(&log_retention_days, -1, -1, INT_MAX)},
{"persist-cluster-nodes-enabled", false, new YesNoField(&persist_cluster_nodes_enabled, true)},
{"redis-cursor-compatible", false, new YesNoField(&redis_cursor_compatible, false)},
{"repl-namespace-enabled", false, new YesNoField(&repl_namespace_enabled, false)},

/* rocksdb options */
{"rocksdb.compression", false,
Expand Down Expand Up @@ -233,17 +234,17 @@ void Config::initFieldValidator() {
std::map<std::string, ValidateFn> validators = {
{"requirepass",
[this](const std::string &k, const std::string &v) -> Status {
if (v.empty() && !tokens.empty()) {
if (v.empty() && !load_tokens.empty()) {
return {Status::NotOK, "requirepass empty not allowed while the namespace exists"};
}
if (tokens.find(v) != tokens.end()) {
if (load_tokens.find(v) != load_tokens.end()) {
return {Status::NotOK, "requirepass is duplicated with namespace tokens"};
}
return Status::OK();
}},
{"masterauth",
[this](const std::string &k, const std::string &v) -> Status {
if (tokens.find(v) != tokens.end()) {
if (load_tokens.find(v) != load_tokens.end()) {
return {Status::NotOK, "masterauth is duplicated with namespace tokens"};
}
return Status::OK();
Expand Down Expand Up @@ -515,6 +516,12 @@ void Config::initFieldCallback() {
remove(nodes_file_path.data());
return Status::OK();
}},
{"repl-namespace-enabled",
[](Server *srv, const std::string &k, const std::string &v) -> Status {
if (!srv) return Status::OK();
return srv->GetNamespace()->LoadAndRewrite();
}},

{"rocksdb.target_file_size_base",
[this](Server *srv, const std::string &k, const std::string &v) -> Status {
if (!srv) return Status::OK();
Expand Down Expand Up @@ -682,7 +689,7 @@ Status Config::parseConfigFromPair(const std::pair<std::string, std::string> &in
if (strncasecmp(input.first.data(), ns_str, ns_str_size) == 0) {
// namespace should keep key case-sensitive
field_key = input.first;
tokens[input.second] = input.first.substr(ns_str_size);
load_tokens[input.second] = input.first.substr(ns_str_size);
return Status::OK();
}

Expand Down Expand Up @@ -711,10 +718,10 @@ Status Config::parseConfigFromString(const std::string &input, int line_number)
}

Status Config::finish() {
if (requirepass.empty() && !tokens.empty()) {
if (requirepass.empty() && !load_tokens.empty()) {
return {Status::NotOK, "requirepass empty wasn't allowed while the namespace exists"};
}
if ((cluster_enabled) && !tokens.empty()) {
if ((cluster_enabled) && !load_tokens.empty()) {
return {Status::NotOK, "enabled cluster mode wasn't allowed while the namespace exists"};
}
if (unixsocket.empty() && binds.size() == 0) {
Expand Down Expand Up @@ -836,7 +843,7 @@ Status Config::Set(Server *svr, std::string key, const std::string &value) {
return Status::OK();
}

Status Config::Rewrite() {
Status Config::Rewrite(const std::map<std::string, std::string> &tokens) {
if (path_.empty()) {
return {Status::NotOK, "the server is running without a config file"};
}
Expand All @@ -853,8 +860,10 @@ Status Config::Rewrite() {
}

std::string namespace_prefix = "namespace.";
for (const auto &iter : tokens) {
new_config[namespace_prefix + iter.second] = iter.first;
if (!repl_namespace_enabled) { // need to rewrite to the configuration if we don't replicate namespaces
for (const auto &iter : tokens) {
new_config[namespace_prefix + iter.second] = iter.first;
}
}

std::ifstream file(path_);
Expand Down Expand Up @@ -900,104 +909,3 @@ Status Config::Rewrite() {
}
return Status::OK();
}

Status Config::GetNamespace(const std::string &ns, std::string *token) const {
token->clear();
for (const auto &iter : tokens) {
if (iter.second == ns) {
*token = iter.first;
return Status::OK();
}
}
return {Status::NotFound};
}

Status Config::SetNamespace(const std::string &ns, const std::string &token) {
if (ns == kDefaultNamespace) {
return {Status::NotOK, "forbidden to update the default namespace"};
}
if (tokens.find(token) != tokens.end()) {
return {Status::NotOK, "the token has already exists"};
}

if (token == requirepass || token == masterauth) {
return {Status::NotOK, "the token is duplicated with requirepass or masterauth"};
}

for (const auto &iter : tokens) {
if (iter.second == ns) {
tokens.erase(iter.first);
tokens[token] = ns;
auto s = Rewrite();
if (!s.IsOK()) {
// Need to roll back the old token if fails to rewrite the config
tokens.erase(token);
tokens[iter.first] = ns;
}
return s;
}
}
return {Status::NotOK, "the namespace was not found"};
}

Status Config::AddNamespace(const std::string &ns, const std::string &token) {
if (requirepass.empty()) {
return {Status::NotOK, "forbidden to add namespace when requirepass was empty"};
}
if (cluster_enabled) {
return {Status::NotOK, "forbidden to add namespace when cluster mode was enabled"};
}
if (ns == kDefaultNamespace) {
return {Status::NotOK, "forbidden to add the default namespace"};
}
auto s = isNamespaceLegal(ns);
if (!s.IsOK()) return s;
if (tokens.find(token) != tokens.end()) {
return {Status::NotOK, "the token has already exists"};
}

if (token == requirepass || token == masterauth) {
return {Status::NotOK, "the token is duplicated with requirepass or masterauth"};
}

for (const auto &iter : tokens) {
if (iter.second == ns) {
return {Status::NotOK, "the namespace has already exists"};
}
}
tokens[token] = ns;

s = Rewrite();
if (!s.IsOK()) {
tokens.erase(token);
}
return s;
}

Status Config::DelNamespace(const std::string &ns) {
if (ns == kDefaultNamespace) {
return {Status::NotOK, "forbidden to delete the default namespace"};
}
for (const auto &iter : tokens) {
if (iter.second == ns) {
tokens.erase(iter.first);
auto s = Rewrite();
if (!s.IsOK()) {
tokens[iter.first] = ns;
}
return s;
}
}
return {Status::NotOK, "the namespace was not found"};
}

Status Config::isNamespaceLegal(const std::string &ns) {
if (ns.size() > UINT8_MAX) {
return {Status::NotOK, fmt::format("size exceed limit {}", UINT8_MAX)};
}
char last_char = ns.back();
if (last_char == std::numeric_limits<char>::max()) {
return {Status::NotOK, "namespace contain illegal letter"};
}
return Status::OK();
}
15 changes: 8 additions & 7 deletions src/config/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ struct Config {
CompactionCheckerRange compaction_checker_range{-1, -1};
int64_t force_compact_file_age;
int force_compact_file_min_deleted_percentage;
std::map<std::string, std::string> tokens;
bool repl_namespace_enabled = false;
std::string replica_announce_ip;
uint32_t replica_announce_port = 0;

Expand All @@ -149,6 +149,11 @@ struct Config {

bool redis_cursor_compatible = false;
int log_retention_days;

// load_tokens is used to buffer the tokens when loading,
// don't use it to authenticate or rewrite the configuration file.
std::map<std::string, std::string> load_tokens;

// profiling
int profiling_sample_ratio = 0;
int profiling_sample_record_threshold_ms = 0;
Expand Down Expand Up @@ -210,16 +215,13 @@ struct Config {
mutable std::mutex backup_mu;

std::string NodesFilePath() const;
Status Rewrite();
Status Rewrite(const std::map<std::string, std::string> &tokens);
Status Load(const CLIOptions &path);
void Get(const std::string &key, std::vector<std::string> *values) const;
Status Set(Server *svr, std::string key, const std::string &value);
void SetMaster(const std::string &host, uint32_t port);
void ClearMaster();
Status GetNamespace(const std::string &ns, std::string *token) const;
Status AddNamespace(const std::string &ns, const std::string &token);
Status SetNamespace(const std::string &ns, const std::string &token);
Status DelNamespace(const std::string &ns);
bool IsSlave() const { return !master_host.empty(); }

private:
std::string path_;
Expand All @@ -237,5 +239,4 @@ struct Config {
Status parseConfigFromPair(const std::pair<std::string, std::string> &input, int line_number);
Status parseConfigFromString(const std::string &input, int line_number);
Status finish();
static Status isNamespaceLegal(const std::string &ns);
};
Loading

0 comments on commit f972819

Please sign in to comment.