Skip to content

Commit

Permalink
Add support of the slot batch for CLUSTERX SETSLOT (#1414)
Browse files Browse the repository at this point in the history
  • Loading branch information
infdahai authored May 14, 2023
1 parent 06c793b commit d2d10f7
Show file tree
Hide file tree
Showing 10 changed files with 273 additions and 64 deletions.
65 changes: 27 additions & 38 deletions src/cluster/cluster.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include <fstream>
#include <memory>

#include "cluster/cluster_defs.h"
#include "commands/commander.h"
#include "common/io_util.h"
#include "fmt/format.h"
Expand All @@ -35,16 +36,6 @@
#include "string_util.h"
#include "time_util.h"

const char *errInvalidNodeID = "Invalid cluster node id";
const char *errInvalidSlotID = "Invalid slot id";
const char *errSlotOutOfRange = "Slot is out of range";
const char *errInvalidClusterVersion = "Invalid cluster version";
const char *errSlotOverlapped = "Slot distribution is overlapped";
const char *errNoMasterNode = "The node isn't a master";
const char *errClusterNoInitialized = "CLUSTERDOWN The cluster is not initialized";
const char *errInvalidClusterNodeInfo = "Invalid cluster nodes info";
const char *errInvalidImportState = "Invalid import state";

ClusterNode::ClusterNode(std::string id, std::string host, int port, int role, std::string master_id,
std::bitset<kClusterSlots> slots)
: id(std::move(id)), host(std::move(host)), port(port), role(role), master_id(std::move(master_id)), slots(slots) {}
Expand Down Expand Up @@ -93,32 +84,24 @@ Status Cluster::SetNodeId(const std::string &node_id) {
return Status::OK();
}

// Set the slot to the node if new version is current version +1. It is useful
// when we scale cluster avoid too many big messages, since we only update one
// slot distribution and there are 16384 slot in our design.
//
// The reason why the new version MUST be +1 of current version is that,
// the command changes topology based on specific topology (also means specific
// version), we must guarantee current topology is exactly expected, otherwise,
// this update may make topology corrupt, so base topology version is very important.
// This is different with CLUSTERX SETNODES commands because it uses new version
// topology to cover current version, it allows kvrocks nodes lost some topology
// updates since of network failure, it is state instead of operation.
Status Cluster::SetSlot(int slot, const std::string &node_id, int64_t new_version) {
// Parameters check
Status Cluster::SetSlotRanges(const std::vector<SlotRange> &slot_ranges, const std::string &node_id,
int64_t new_version) {
if (new_version <= 0 || new_version != version_ + 1) {
return {Status::NotOK, errInvalidClusterVersion};
}

if (!IsValidSlot(slot)) {
return {Status::NotOK, errInvalidSlotID};
}

if (node_id.size() != kClusterNodeIdLen) {
return {Status::NotOK, errInvalidNodeID};
}

// Get the node which we want to assign a slot into it
// Get the node which we want to assign slots into it
std::shared_ptr<ClusterNode> to_assign_node = nodes_[node_id];
if (to_assign_node == nullptr) {
return {Status::NotOK, "No this node in the cluster"};
Expand All @@ -135,23 +118,29 @@ Status Cluster::SetSlot(int slot, const std::string &node_id, int64_t new_versio
// 1. Remove the slot from old node if existing
// 2. Add the slot into to-assign node
// 3. Update the map of slots to nodes.
std::shared_ptr<ClusterNode> old_node = slots_nodes_[slot];
if (old_node != nullptr) {
old_node->slots[slot] = false;
}
to_assign_node->slots[slot] = true;
slots_nodes_[slot] = to_assign_node;

// Clear data of migrated slot or record of imported slot
if (old_node == myself_ && old_node != to_assign_node) {
// If slot is migrated from this node
if (migrated_slots_.count(slot) > 0) {
svr_->slot_migrator->ClearKeysOfSlot(kDefaultNamespace, slot);
migrated_slots_.erase(slot);
}
// If slot is imported into this node
if (imported_slots_.count(slot) > 0) {
imported_slots_.erase(slot);
// remember: The atomicity of the process is based on
// the transactionality of ClearKeysOfSlot().
for (auto [s_start, s_end] : slot_ranges) {
for (int slot = s_start; slot <= s_end; slot++) {
std::shared_ptr<ClusterNode> old_node = slots_nodes_[slot];
if (old_node != nullptr) {
old_node->slots[slot] = false;
}
to_assign_node->slots[slot] = true;
slots_nodes_[slot] = to_assign_node;

// Clear data of migrated slot or record of imported slot
if (old_node == myself_ && old_node != to_assign_node) {
// If slot is migrated from this node
if (migrated_slots_.count(slot) > 0) {
svr_->slot_migrator->ClearKeysOfSlot(kDefaultNamespace, slot);
migrated_slots_.erase(slot);
}
// If slot is imported into this node
if (imported_slots_.count(slot) > 0) {
imported_slots_.erase(slot);
}
}
}
}

Expand Down
11 changes: 2 additions & 9 deletions src/cluster/cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,20 +29,13 @@
#include <unordered_map>
#include <vector>

#include "cluster/cluster_defs.h"
#include "commands/commander.h"
#include "common/io_util.h"
#include "redis_slot.h"
#include "server/redis_connection.h"
#include "status.h"

enum {
kClusterMaster = 1,
kClusterSlave = 2,
kClusterNodeIdLen = 40,
kClusterPortIncr = 10000,
kClusterSlots = HASH_SLOTS_SIZE,
};

class ClusterNode {
public:
explicit ClusterNode(std::string id, std::string host, int port, int role, std::string master_id,
Expand Down Expand Up @@ -79,7 +72,7 @@ class Cluster {
Status SetClusterNodes(const std::string &nodes_str, int64_t version, bool force);
Status GetClusterNodes(std::string *nodes_str);
Status SetNodeId(const std::string &node_id);
Status SetSlot(int slot, const std::string &node_id, int64_t version);
Status SetSlotRanges(const std::vector<SlotRange> &slot_ranges, const std::string &node_id, int64_t version);
Status SetSlotMigrated(int slot, const std::string &ip_port);
Status SetSlotImported(int slot);
Status GetSlotsInfo(std::vector<SlotInfo> *slot_infos);
Expand Down
43 changes: 43 additions & 0 deletions src/cluster/cluster_defs.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/

#pragma once

#include "cluster/redis_slot.h"

enum {
kClusterMaster = 1,
kClusterSlave = 2,
kClusterNodeIdLen = 40,
kClusterPortIncr = 10000,
kClusterSlots = HASH_SLOTS_SIZE,
};

inline constexpr const char *errInvalidNodeID = "Invalid cluster node id";
inline constexpr const char *errInvalidSlotID = "Invalid slot id";
inline constexpr const char *errSlotOutOfRange = "Slot is out of range";
inline constexpr const char *errInvalidClusterVersion = "Invalid cluster version";
inline constexpr const char *errSlotOverlapped = "Slot distribution is overlapped";
inline constexpr const char *errNoMasterNode = "The node isn't a master";
inline constexpr const char *errClusterNoInitialized = "CLUSTERDOWN The cluster is not initialized";
inline constexpr const char *errInvalidClusterNodeInfo = "Invalid cluster nodes info";
inline constexpr const char *errInvalidImportState = "Invalid import state";

using SlotRange = std::pair<int, int>;
1 change: 1 addition & 0 deletions src/cluster/redis_slot.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
*/

#pragma once
#include <cstdint>
#include <string>

// crc16
Expand Down
17 changes: 6 additions & 11 deletions src/commands/cmd_cluster.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*
*/

#include "cluster/cluster_defs.h"
#include "cluster/slot_import.h"
#include "commander.h"
#include "error_constants.h"
Expand Down Expand Up @@ -156,15 +157,9 @@ class CommandClusterX : public Commander {

// CLUSTERX SETSLOT $SLOT_ID NODE $NODE_ID $VERSION
if (subcommand_ == "setslot" && args_.size() == 6) {
auto parse_id = ParseInt<int>(args[2], 10);
if (!parse_id) {
return {Status::RedisParseErr, errValueNotInteger};
}

slot_id_ = *parse_id;

if (!Cluster::IsValidSlot(slot_id_)) {
return {Status::RedisParseErr, "Invalid slot id"};
Status s = CommanderHelper::ParseSlotRanges(args_[2], slot_ranges_);
if (!s.IsOK()) {
return s;
}

if (strcasecmp(args_[3].c_str(), "node") != 0) {
Expand Down Expand Up @@ -219,7 +214,7 @@ class CommandClusterX : public Commander {
*output = redis::Error(s.Msg());
}
} else if (subcommand_ == "setslot") {
Status s = svr->cluster->SetSlot(slot_id_, args_[4], set_version_);
Status s = svr->cluster->SetSlotRanges(slot_ranges_, args_[4], set_version_);
if (s.IsOK()) {
need_persist_nodes_info = true;
*output = redis::SimpleString("OK");
Expand Down Expand Up @@ -251,7 +246,7 @@ class CommandClusterX : public Commander {
std::string dst_node_id_;
int64_t set_version_ = 0;
int64_t slot_ = -1;
int slot_id_ = -1;
std::vector<SlotRange> slot_ranges_;
bool force_ = false;
};

Expand Down
50 changes: 50 additions & 0 deletions src/commands/commander.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

#include "commander.h"

#include "cluster/cluster_defs.h"

namespace redis {

RegisterToCommandTable::RegisterToCommandTable(std::initializer_list<CommandAttributes> list) {
Expand Down Expand Up @@ -108,4 +110,52 @@ bool IsCommandExists(const std::string &name) {
return command_details::original_commands.find(util::ToLower(name)) != command_details::original_commands.end();
}

Status CommanderHelper::ParseSlotRanges(const std::string &slots_str, std::vector<SlotRange> &slots) {
if (slots_str.empty()) {
return {Status::NotOK, "No slots to parse."};
}

std::vector<std::string> slot_ranges = util::Split(slots_str, " ");
if (slot_ranges.empty()) {
return {Status::NotOK,
fmt::format("Invalid slots: `{}`. No slots to parse. Please use spaces to separate slots.", slots_str)};
}

auto valid_range = NumericRange<int>{0, kClusterSlots - 1};
// Parse all slots (include slot ranges)
for (auto &slot_range : slot_ranges) {
if (slot_range.find('-') == std::string::npos) {
auto parse_result = ParseInt<int>(slot_range, valid_range, 10);
if (!parse_result) {
return std::move(parse_result).Prefixed(errInvalidSlotID);
}
slots.emplace_back(*parse_result, *parse_result);
continue;
}

// parse slot range: "int1-int2" (satisfy: int1 <= int2 )
if (slot_range.front() == '-' || slot_range.back() == '-') {
return {Status::NotOK,
fmt::format("Invalid slot range: `{}`. The character '-' can't appear in the first or last position.",
slot_range)};
}
std::vector<std::string> fields = util::Split(slot_range, "-");
if (fields.size() != 2) {
return {Status::NotOK,
fmt::format("Invalid slot range: `{}`. The slot range should be of the form `int1-int2`.", slot_range)};
}
auto parse_start = ParseInt<int>(fields[0], valid_range, 10);
auto parse_end = ParseInt<int>(fields[1], valid_range, 10);
if (!parse_start || !parse_end || *parse_start > *parse_end) {
return {Status::NotOK,
fmt::format(
"Invalid slot range: `{}`. The slot range `int1-int2` needs to satisfy the condition (int1 <= int2).",
slot_range)};
}
slots.emplace_back(*parse_start, *parse_end);
}

return Status::OK();
}

} // namespace redis
7 changes: 7 additions & 0 deletions src/commands/commander.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
#include <utility>
#include <vector>

#include "cluster/cluster_defs.h"
#include "parse_util.h"
#include "server/redis_reply.h"
#include "status.h"
#include "string_util.h"
Expand Down Expand Up @@ -87,6 +89,11 @@ class CommanderWithParseMove : Commander {
virtual Status ParseMove(std::vector<std::string> &&args) { return Status::OK(); }
};

class CommanderHelper {
public:
static Status ParseSlotRanges(const std::string &slots_str, std::vector<SlotRange> &slots);
};

using CommanderFactory = std::function<std::unique_ptr<Commander>()>;

struct CommandKeyRange {
Expand Down
1 change: 1 addition & 0 deletions src/common/encoding.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include <rocksdb/slice.h>
#include <unistd.h>

#include <cstdint>
#include <string>

enum class Endian {
Expand Down
Loading

0 comments on commit d2d10f7

Please sign in to comment.