Skip to content

Commit

Permalink
add stdint.h for uint_t type
Browse files Browse the repository at this point in the history
ref(https://godbolt.org/z/5q6zj5W9s)

Signed-off-by: clundro <859287553@qq.com>

add  cluster_slot_batch ref to #529

Signed-off-by: clundro <859287553@qq.com>

add tests for parsedSlotRange

Signed-off-by: clundro <859287553@qq.com>
  • Loading branch information
infdahai committed May 3, 2023
1 parent 30e9fd7 commit d245e9c
Show file tree
Hide file tree
Showing 6 changed files with 209 additions and 42 deletions.
123 changes: 94 additions & 29 deletions src/cluster/cluster.cc
Original file line number Diff line number Diff line change
Expand Up @@ -93,32 +93,26 @@ Status Cluster::SetNodeId(const std::string &node_id) {
return Status::OK();
}

// Set the slot to the node if new version is current version +1. It is useful
// when we scale cluster avoid too many big messages, since we only update one
// slot distribution and there are 16384 slot in our design.
//
// Set the slots to the node if new version is current version +1. It is useful
// when we scale cluster avoid too many big messages. By th way,there are 16384 slots
// in our design.
// The reason why the new version MUST be +1 of current version is that,
// the command changes topology based on specific topology (also means specific
// version), we must guarantee current topology is exactly expected, otherwise,
// this update may make topology corrupt, so base topology version is very important.
// This is different with CLUSTERX SETNODES commands because it uses new version
// topology to cover current version, it allows kvrocks nodes lost some topology
// updates since of network failure, it is state instead of operation.
Status Cluster::SetSlot(int slot, const std::string &node_id, int64_t new_version) {
// Parameters check
Status Cluster::SetSlot(const std::string &slots_str, const std::string &node_id, int64_t new_version) {
if (new_version <= 0 || new_version != version_ + 1) {
return {Status::NotOK, errInvalidClusterVersion};
}

if (!IsValidSlot(slot)) {
return {Status::NotOK, errInvalidSlotID};
}

if (node_id.size() != kClusterNodeIdLen) {
return {Status::NotOK, errInvalidNodeID};
}

// Get the node which we want to assign a slot into it
// Get the node which we want to assign slots into it
std::shared_ptr<ClusterNode> to_assign_node = nodes_[node_id];
if (to_assign_node == nullptr) {
return {Status::NotOK, "No this node in the cluster"};
Expand All @@ -128,30 +122,43 @@ Status Cluster::SetSlot(int slot, const std::string &node_id, int64_t new_versio
return {Status::NotOK, errNoMasterNode};
}

std::vector<std::pair<int, int>> slots;
Status s = parseSlotRanges(slots_str, slots);
if (!s.IsOK()) {
return s;
}

// Update version
version_ = new_version;

// Update topology
// 1. Remove the slot from old node if existing
// 2. Add the slot into to-assign node
// 3. Update the map of slots to nodes.
std::shared_ptr<ClusterNode> old_node = slots_nodes_[slot];
if (old_node != nullptr) {
old_node->slots[slot] = false;
}
to_assign_node->slots[slot] = true;
slots_nodes_[slot] = to_assign_node;

// Clear data of migrated slot or record of imported slot
if (old_node == myself_ && old_node != to_assign_node) {
// If slot is migrated from this node
if (migrated_slots_.count(slot) > 0) {
svr_->slot_migrator->ClearKeysOfSlot(kDefaultNamespace, slot);
migrated_slots_.erase(slot);
}
// If slot is imported into this node
if (imported_slots_.count(slot) > 0) {
imported_slots_.erase(slot);
// remember: The atomicity of the process is based on
// the transactionality of ClearKeysOfSlot().
for (const auto &slot_range : slots) {
int s_start = slot_range.first, s_end = slot_range.second;
for (int slot = s_start; slot <= s_end; slot++) {
std::shared_ptr<ClusterNode> old_node = slots_nodes_[slot];
if (old_node != nullptr) {
old_node->slots[slot] = false;
}
to_assign_node->slots[slot] = true;
slots_nodes_[slot] = to_assign_node;

// Clear data of migrated slot or record of imported slot
if (old_node == myself_ && old_node != to_assign_node) {
// If slot is migrated from this node
if (migrated_slots_.count(slot) > 0) {
svr_->slot_migrator->ClearKeysOfSlot(kDefaultNamespace, slot);
migrated_slots_.erase(slot);
}
// If slot is imported into this node
if (imported_slots_.count(slot) > 0) {
imported_slots_.erase(slot);
}
}
}
}

Expand Down Expand Up @@ -465,7 +472,7 @@ SlotInfo Cluster::genSlotNodeInfo(int start, int end, const std::shared_ptr<Clus
std::vector<SlotInfo::NodeInfo> vn;
vn.push_back({n->host, n->port, n->id}); // itself

for (const auto &id : n->replicas) { // replicas
for (const auto &id : n->replicas) { // replicas
if (nodes_.find(id) == nodes_.end()) continue;
vn.push_back({nodes_[id]->host, nodes_[id]->port, nodes_[id]->id});
}
Expand Down Expand Up @@ -641,6 +648,64 @@ Status Cluster::LoadClusterNodes(const std::string &file_path) {
return SetClusterNodes(nodes_info, version, false);
}

// TODO: maybe it needs to use a more precise error type to represent `NotOk`.
Status Cluster::parseSlotRange(const std::string &slots_str, std::vector<std::pair<int, int>> &slots) {
if (slots_str.empty()) {
return {Status::NotOK, "Don't use empty slots."};
}

std::vector<std::string> slot_ranges = util::Split(slots_str, " ");
if (slot_ranges.empty()) {
return {Status::NotOK,
fmt::format("Invalid slots: `{}`. No slots to parse. Please use spaces to separate slots.", slots_str)};
}

auto valid_range = NumericRange<int>{0, kClusterSlots - 1};
// Parse all slots (include slot ranges)
for (auto &slot_range : slot_ranges) {
if (slot_range.find('-') == std::string::npos) {
auto parse_result = ParseInt<int>(slot_range, valid_range, 10);
if (!parse_result) {
return {Status::NotOK, errInvalidSlotID};
}
}
return true;
#endif
};

// Parse all slots(include slot ranges)
for (const auto &slot_range : slot_ranges) {
if (is_number(slot_range)) {
int s_start = stoi(slot_range);
assert(IsValidSlot(s_start));
slots.emplace_back(std::make_pair(s_start, s_start));
continue;
}

// parse slot range: "int1-int2" (satisfy: int1 <= int2 )
if (slot_range.back() == '-') {
return {Status::NotOK,
fmt::format("Invalid slot range: {}. The character '-' can't appear in the last position.", slot_range)};
}
std::vector<std::string> fields = util::Split(slot_range, "-");
if (fields.size() != 2) {
return {Status::NotOK,
fmt::format("Invalid slot range: `{}`. The slot range should be of the form `int1-int2`.", slot_range)};
}
auto parse_start = ParseInt<int>(fields[0], valid_range, 10);
auto parse_end = ParseInt<int>(fields[1], valid_range, 10);
if (!parse_start || !parse_end || *parse_start > *parse_end) {
return {Status::NotOK,
fmt::format(
"Invalid slot range: {}. The slot range `int1-int2` needs to satisfy the condition (int1 <= int2).",
slot_range)};
}
slots.emplace_back(*parse_start, *parse_end);
}

return Status::OK();
}

Status Cluster::parseClusterNodes(const std::string &nodes_str, ClusterNodes *nodes,
std::unordered_map<int, std::string> *slots_nodes) {
std::vector<std::string> nodes_info = util::Split(nodes_str, "\n");
Expand Down
3 changes: 2 additions & 1 deletion src/cluster/cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ class Cluster {
Status SetClusterNodes(const std::string &nodes_str, int64_t version, bool force);
Status GetClusterNodes(std::string *nodes_str);
Status SetNodeId(const std::string &node_id);
Status SetSlot(int slot, const std::string &node_id, int64_t version);
Status SetSlot(const std::string& slots_str, const std::string &node_id, int64_t version);
Status SetSlotMigrated(int slot, const std::string &ip_port);
Status SetSlotImported(int slot);
Status GetSlotsInfo(std::vector<SlotInfo> *slot_infos);
Expand All @@ -104,6 +104,7 @@ class Cluster {
std::string genNodesInfo();
void updateSlotsInfo();
SlotInfo genSlotNodeInfo(int start, int end, const std::shared_ptr<ClusterNode> &n);
static Status parseSlotRange(const std::string&slots_str,std::vector<std::pair<int,int>> &slots);
static Status parseClusterNodes(const std::string &nodes_str, ClusterNodes *nodes,
std::unordered_map<int, std::string> *slots_nodes);
Server *svr_;
Expand Down
1 change: 1 addition & 0 deletions src/cluster/redis_slot.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
*/

#pragma once
#include <cstdint>
#include <string>

// crc16
Expand Down
15 changes: 3 additions & 12 deletions src/commands/cmd_cluster.cc
Original file line number Diff line number Diff line change
Expand Up @@ -156,16 +156,7 @@ class CommandClusterX : public Commander {

// CLUSTERX SETSLOT $SLOT_ID NODE $NODE_ID $VERSION
if (subcommand_ == "setslot" && args_.size() == 6) {
auto parse_id = ParseInt<int>(args[2], 10);
if (!parse_id) {
return {Status::RedisParseErr, errValueNotInteger};
}

slot_id_ = *parse_id;

if (!Cluster::IsValidSlot(slot_id_)) {
return {Status::RedisParseErr, "Invalid slot id"};
}
slots_str_ = args_[2];

if (strcasecmp(args_[3].c_str(), "node") != 0) {
return {Status::RedisParseErr, "Invalid setslot options"};
Expand Down Expand Up @@ -219,7 +210,7 @@ class CommandClusterX : public Commander {
*output = redis::Error(s.Msg());
}
} else if (subcommand_ == "setslot") {
Status s = svr->cluster->SetSlot(slot_id_, args_[4], set_version_);
Status s = svr->cluster->SetSlot(slots_str_, args_[4], set_version_);
if (s.IsOK()) {
need_persist_nodes_info = true;
*output = redis::SimpleString("OK");
Expand Down Expand Up @@ -251,7 +242,7 @@ class CommandClusterX : public Commander {
std::string dst_node_id_;
int64_t set_version_ = 0;
int64_t slot_ = -1;
int slot_id_ = -1;
std::string slots_str_;
bool force_ = false;
};

Expand Down
1 change: 1 addition & 0 deletions src/common/encoding.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include <rocksdb/slice.h>
#include <unistd.h>

#include <cstdint>
#include <string>

enum class Endian {
Expand Down
108 changes: 108 additions & 0 deletions tests/cppunit/cluster_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@

#include "cluster/cluster.h"

#include <fmt/core.h>
#include <gtest/gtest.h>

#include <algorithm>
#include <cstring>
#include <vector>

#include "server/server.h"

Expand Down Expand Up @@ -195,3 +197,109 @@ TEST(Cluster, TestDumpAndLoadClusterNodesInfo) {

unlink(nodes_filename.c_str());
}

TEST(Cluster, ClusterParseSlotRanges) {
Status s;
Cluster cluster(nullptr, {"127.0.0.1"}, 3002);
const std::string node_id = "67ed2db8d677e59ec4a4cefb06858cf2a1a89fa1";
int64_t version = 1;

const std::string right_nodes = node_id +
" 127.0.0.1 30002 "
"master - 0 123-456 789 831 8192-16381 16382 16383";
s = cluster.SetClusterNodes(right_nodes, version, false);
ASSERT_TRUE(s.IsOK());
ASSERT_TRUE(cluster.GetVersion() == version);
version++;

const std::string t_single_slot = "1234";
s = cluster.SetSlot(t_single_slot, node_id, version);
ASSERT_TRUE(s.IsOK());
version++;

const std::string t_single_ranges = "1234-1236";
s = cluster.SetSlot(t_single_ranges, node_id, version);
ASSERT_TRUE(s.IsOK());
version++;

const std::string t_mixed_slot = "10229 16301 4710 3557-8559 ";
s = cluster.SetSlot(t_mixed_slot, node_id, version);
ASSERT_TRUE(s.IsOK());
version++;

std::string empty_slots;
s = cluster.SetSlot(empty_slots, node_id, version);
ASSERT_FALSE(s.IsOK());
ASSERT_TRUE(s.Msg() == "No slots to parse.");

std::string space_slots = " ";
s = cluster.SetSlot(space_slots, node_id, version);
ASSERT_FALSE(s.IsOK());
ASSERT_TRUE(s.Msg() == fmt::format("Invalid slots: `{}`. No slots to parse. "
"Please use spaces to separate slots.",
space_slots));

std::vector<std::string> error_slots;
std::string invalid_single_slot = "830849ad";
std::string unbound_single_slot = "1683093429";
std::string front_slot_ranges = "-1234-3456";
std::string back_slot_ranges = "1234-3456-";
std::string f_single_slot = "-6351";
std::string overmuch_slot_ranges = "12-34-56";
std::string f_cond_slot_ranges = "3456-1234";

error_slots.emplace_back(invalid_single_slot);
error_slots.emplace_back(unbound_single_slot);
error_slots.emplace_back(front_slot_ranges);
error_slots.emplace_back(back_slot_ranges);
error_slots.emplace_back(f_single_slot);
error_slots.emplace_back(overmuch_slot_ranges);
error_slots.emplace_back(f_cond_slot_ranges);

for (int i = 0; i < 2; i++) {
if (i == 1) {
for (auto &slot_str : error_slots) {
slot_str = t_mixed_slot + slot_str;
}
}

s = cluster.SetSlot(error_slots[0], node_id, version);
ASSERT_FALSE(s.IsOK());
ASSERT_TRUE(s.Msg() == "Invalid slot id: encounter non-integer characters");

s = cluster.SetSlot(error_slots[1], node_id, version);
ASSERT_FALSE(s.IsOK());
ASSERT_TRUE(s.Msg() == "Invalid slot id: out of numeric range");

s = cluster.SetSlot(error_slots[2], node_id, version);
ASSERT_FALSE(s.IsOK());
ASSERT_TRUE(s.Msg() == fmt::format("Invalid slot range: `{}`. The character '-' can't appear "
"in the first or last position.",
front_slot_ranges));

s = cluster.SetSlot(error_slots[3], node_id, version);
ASSERT_FALSE(s.IsOK());
ASSERT_TRUE(s.Msg() ==
fmt::format("Invalid slot range: `{}`. The character '-' can't appear in the first or last position.",
back_slot_ranges));

s = cluster.SetSlot(error_slots[4], node_id, version);
ASSERT_FALSE(s.IsOK());
ASSERT_TRUE(s.Msg() ==
fmt::format("Invalid slot range: `{}`. The character '-' can't appear in the first or last position.",
f_single_slot));

s = cluster.SetSlot(error_slots[5], node_id, version);
ASSERT_FALSE(s.IsOK());
ASSERT_TRUE(s.Msg() == fmt::format("Invalid slot range: `{}`. The slot range should be of the form `int1-int2`.",
overmuch_slot_ranges));

s = cluster.SetSlot(error_slots[6], node_id, version);
ASSERT_FALSE(s.IsOK());
ASSERT_TRUE(
s.Msg() ==
fmt::format(
"Invalid slot range: `{}`. The slot range `int1-int2` needs to satisfy the condition (int1 <= int2).",
f_cond_slot_ranges));
}
}

0 comments on commit d245e9c

Please sign in to comment.