Skip to content

Commit

Permalink
refactor: remove start-slot-migration cmd #2727
Browse files Browse the repository at this point in the history
  • Loading branch information
BorysTheDev committed Mar 14, 2024
1 parent 95538db commit 6326b1c
Show file tree
Hide file tree
Showing 7 changed files with 148 additions and 40 deletions.
51 changes: 40 additions & 11 deletions src/server/cluster/cluster_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -522,7 +522,7 @@ void ClusterFamily::DflyClusterConfig(CmdArgList args, ConnectionContext* cntx)
DCHECK(tl_cluster_config != nullptr);

// TODO rewrite with outgoing migrations
if (!StartSlotMigrations(new_config->GetIncomingMigrations(), cntx)) {
if (!StartSlotMigrations(new_config->GetOutgoingMigrations(), cntx)) {
return cntx->SendError("Can't start the migration");
}
RemoveFinishedMigrations();
Expand Down Expand Up @@ -615,7 +615,7 @@ void ClusterFamily::DflyClusterStartSlotMigration(CmdArgList args, ConnectionCon
if (auto err = parser.Error(); err)
return cntx->SendError(err->MakeReply());

auto* node = AddMigration(std::string(host_ip), port, std::move(slots));
auto* node = CreateIncomingMigration(std::string(host_ip), port, std::move(slots));
if (!node) {
return cntx->SendError("Can't start the migration, another one is in progress");
}
Expand All @@ -628,7 +628,8 @@ bool ClusterFamily::StartSlotMigrations(const std::vector<ClusterConfig::Migrati
ConnectionContext* cntx) {
// Add validating and error processing
for (auto m : migrations) {
auto* node = AddMigration(m.ip, m.port, m.slot_ranges);
auto sync_id = CreateOutgoingMigration(m.ip, m.port, m.slot_ranges);
auto node = GetOutgoingMigration(sync_id);
if (!node) {
return false;
}
Expand Down Expand Up @@ -702,6 +703,8 @@ void ClusterFamily::DflyMigrate(CmdArgList args, ConnectionContext* cntx) {
args.remove_prefix(1);
if (sub_cmd == "CONF") {
MigrationConf(args, cntx);
} else if (sub_cmd == "INIT") {
InitMigration(args, cntx);
} else if (sub_cmd == "FLOW") {
DflyMigrateFlow(args, cntx);
} else if (sub_cmd == "ACK") {
Expand All @@ -711,8 +714,8 @@ void ClusterFamily::DflyMigrate(CmdArgList args, ConnectionContext* cntx) {
}
}

ClusterSlotMigration* ClusterFamily::AddMigration(std::string host_ip, uint16_t port,
SlotRanges slots) {
ClusterSlotMigration* ClusterFamily::CreateIncomingMigration(std::string host_ip, uint16_t port,
SlotRanges slots) {
lock_guard lk(migration_mu_);
for (const auto& mj : incoming_migrations_jobs_) {
if (auto info = mj->GetInfo(); info.host == host_ip && info.port == port) {
Expand Down Expand Up @@ -769,7 +772,8 @@ void ClusterFamily::MigrationConf(CmdArgList args, ConnectionContext* cntx) {
}
}

auto sync_id = CreateOutgoingMigration(cntx, port, std::move(slots));
auto sync_id =
CreateOutgoingMigration(cntx->conn()->RemoteEndpointAddress(), port, std::move(slots));

cntx->conn()->SetName("slot_migration_ctrl");
auto* rb = static_cast<RedisReplyBuilder*>(cntx->reply_builder());
Expand All @@ -779,8 +783,33 @@ void ClusterFamily::MigrationConf(CmdArgList args, ConnectionContext* cntx) {
return;
}

uint32_t ClusterFamily::CreateOutgoingMigration(ConnectionContext* cntx, uint16_t port,
SlotRanges slots) {
// Handles `DFLYMIGRATE INIT <sync_id> <port> <flows_num> <slot_start> <slot_end> ...`.
// Parses the arguments, registers an incoming migration for the connecting peer
// (its remote endpoint address plus the supplied port) and initializes it.
// Replies OK on success, otherwise an error describing what went wrong.
void ClusterFamily::InitMigration(CmdArgList args, ConnectionContext* cntx) {
  VLOG(1) << "Create slot migration config " << args;
  CmdArgParser parser{args};
  auto [sync_id, port, flows_num] = parser.Next<uint32_t, uint32_t, uint32_t>();

  // At least one slot range is expected; a missing one is reported via parser.Error().
  SlotRanges slots;
  do {
    auto [slot_start, slot_end] = parser.Next<SlotId, SlotId>();
    slots.emplace_back(SlotRange{slot_start, slot_end});
  } while (parser.HasNext());

  if (auto err = parser.Error(); err)
    return cntx->SendError(err->MakeReply());

  auto* node =
      CreateIncomingMigration(cntx->conn()->RemoteEndpointAddress(), port, std::move(slots));
  // CreateIncomingMigration returns nullptr when a migration from the same peer already
  // exists; dereferencing it unconditionally would crash the server.
  if (!node) {
    return cntx->SendError("Can't start the migration, another one is in progress");
  }

  VLOG(1) << "Init migration " << cntx->conn()->RemoteEndpointAddress() << ":" << port;
  // Propagate connection/initialization failures instead of unconditionally replying OK.
  // NOTE(review): the failed job presumably stays registered in incoming_migrations_jobs_;
  // confirm whether it should be removed here.
  if (auto ec = node->Init(sync_id, flows_num); ec) {
    return cntx->SendError(ec.message());
  }

  return cntx->SendOk();
}

uint32_t ClusterFamily::CreateOutgoingMigration(std::string host, uint16_t port, SlotRanges slots) {
std::lock_guard lk(migration_mu_);
auto sync_id = next_sync_id_++;
auto err_handler = [](const GenericError& err) {
Expand All @@ -789,8 +818,8 @@ uint32_t ClusterFamily::CreateOutgoingMigration(ConnectionContext* cntx, uint16_
// Todo add error processing, stop migration process
// fb2::Fiber("stop_Migration", &ClusterFamily::StopMigration, this, sync_id).Detach();
};
auto migration = make_shared<OutgoingMigration>(cntx->conn()->RemoteEndpointAddress(), port,
std::move(slots), err_handler, server_family_);
auto migration = make_shared<OutgoingMigration>(host, port, std::move(slots), sync_id,
err_handler, server_family_);
auto [it, inserted] = outgoing_migration_jobs_.emplace(sync_id, std::move(migration));
CHECK(inserted);
return sync_id;
Expand Down Expand Up @@ -820,7 +849,7 @@ void ClusterFamily::DflyMigrateFlow(CmdArgList args, ConnectionContext* cntx) {
EngineShard* shard = EngineShard::tlocal();
DCHECK(shard->shard_id() == shard_id);

migration->StartFlow(sync_id, server_family_->journal(), cntx->conn()->socket());
migration->StartFlow(server_family_->journal(), cntx->conn()->socket());
}

void ClusterFamily::FinalizeIncomingMigration(uint32_t local_sync_id) {
Expand Down
7 changes: 5 additions & 2 deletions src/server/cluster/cluster_family.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ class ClusterFamily {
// DFLYMIGRATE is an internal command that defines several steps in the slot migration process
void DflyMigrate(CmdArgList args, ConnectionContext* cntx);

void InitMigration(CmdArgList args, ConnectionContext* cntx);

// DFLYMIGRATE CONF initiates the first step in the slot migration procedure.
// MigrationConf processes this request and saves the slot ranges and
// target node port in outgoing_migration_jobs_.
Expand All @@ -73,14 +75,15 @@ class ClusterFamily {
void DflyMigrateAck(CmdArgList args, ConnectionContext* cntx);

// create a ClusterSlotMigration entity which will execute migration
ClusterSlotMigration* AddMigration(std::string host_ip, uint16_t port, SlotRanges slots);
ClusterSlotMigration* CreateIncomingMigration(std::string host_ip, uint16_t port,
SlotRanges slots);

bool StartSlotMigrations(const std::vector<ClusterConfig::MigrationInfo>& migrations,
ConnectionContext* cntx);
void RemoveFinishedMigrations();

// store info about migration and create unique session id
uint32_t CreateOutgoingMigration(ConnectionContext* cntx, uint16_t port, SlotRanges slots);
uint32_t CreateOutgoingMigration(std::string host_ip, uint16_t port, SlotRanges slots);

std::shared_ptr<OutgoingMigration> GetOutgoingMigration(uint32_t sync_id);

Expand Down
25 changes: 25 additions & 0 deletions src/server/cluster/cluster_slot_migration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,31 @@ error_code ClusterSlotMigration::Start(ConnectionContext* cntx) {
return {};
}

// Initializes the migration: stores the given sync id and shard count, calls
// ResolveHostDns() and ConnectAndAuth(), resets the parser and starts the
// "main_migration" fiber running MainMigrationFb.
// Returns the first error encountered, or an empty error_code on success.
error_code ClusterSlotMigration::Init(uint32_t sync_id, uint32_t shards_num) {
  VLOG(1) << "Init slot migration";

  state_ = MigrationState::C_CONNECTING;
  sync_id_ = sync_id;
  source_shards_num_ = shards_num;

  VLOG(1) << "Resolving host DNS";
  if (error_code dns_ec = ResolveHostDns(); dns_ec)
    return dns_ec;

  VLOG(1) << "Connecting to source";
  const auto connect_timeout = absl::GetFlag(FLAGS_source_connect_timeout_ms) * 1ms;
  if (error_code connect_ec = ConnectAndAuth(connect_timeout, &cntx_); connect_ec)
    return connect_ec;

  ResetParser(false);
  sync_fb_ = fb2::Fiber("main_migration", &ClusterSlotMigration::MainMigrationFb, this);

  return {};
}

error_code ClusterSlotMigration::Greet() {
ResetParser(false);
VLOG(1) << "greeting message handling";
Expand Down
2 changes: 2 additions & 0 deletions src/server/cluster/cluster_slot_migration.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ class ClusterSlotMigration : private ProtocolClient {
SlotRanges slots);
~ClusterSlotMigration();

std::error_code Init(uint32_t sync_id, uint32_t shards_num);

// Initiate connection with source node and create migration fiber
std::error_code Start(ConnectionContext* cntx);
Info GetInfo() const;
Expand Down
51 changes: 47 additions & 4 deletions src/server/cluster/outgoing_slot_migration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,22 @@

#include "server/cluster/outgoing_slot_migration.h"

#include <absl/flags/flag.h>

#include <atomic>

#include "absl/cleanup/cleanup.h"
#include "base/logging.h"
#include "server/db_slice.h"
#include "server/engine_shard_set.h"
#include "server/error.h"
#include "server/journal/streamer.h"
#include "server/server_family.h"

ABSL_DECLARE_FLAG(int, source_connect_timeout_ms);

ABSL_DECLARE_FLAG(uint16_t, admin_port);

using namespace std;
namespace dfly {

Expand Down Expand Up @@ -50,9 +57,12 @@ class OutgoingMigration::SliceSlotMigration {
};

OutgoingMigration::OutgoingMigration(std::string ip, uint16_t port, SlotRanges slots,
Context::ErrHandler err_handler, ServerFamily* sf)
: host_ip_(ip),
uint32_t sync_id, Context::ErrHandler err_handler,
ServerFamily* sf)
: ProtocolClient(ip, port),
host_ip_(ip),
port_(port),
sync_id_(sync_id),
slots_(slots),
cntx_(err_handler),
slot_migrations_(shard_set->size()),
Expand All @@ -63,7 +73,7 @@ OutgoingMigration::~OutgoingMigration() {
main_sync_fb_.JoinIfNeeded();
}

void OutgoingMigration::StartFlow(uint32_t sync_id, journal::Journal* journal, io::Sink* dest) {
void OutgoingMigration::StartFlow(journal::Journal* journal, io::Sink* dest) {
EngineShard* shard = EngineShard::tlocal();
DbSlice* slice = &shard->db_slice();

Expand All @@ -73,7 +83,7 @@ void OutgoingMigration::StartFlow(uint32_t sync_id, journal::Journal* journal, i
{
std::lock_guard lck(flows_mu_);
slot_migrations_[shard_id] =
std::make_unique<SliceSlotMigration>(slice, slots_, sync_id, journal, &cntx_, dest);
std::make_unique<SliceSlotMigration>(slice, slots_, sync_id_, journal, &cntx_, dest);
state = GetStateImpl();
}

Expand Down Expand Up @@ -141,4 +151,37 @@ void OutgoingMigration::SyncFb() {
// TODO add ACK here and config update
}

// Starts an outgoing migration: resolves and connects to the peer node, then sends
// `DFLYMIGRATE INIT <sync_id> <admin_port> <flows_num> <slot_start> <slot_end> ...`.
// DNS/connection failures are also reported to `cntx` via SendError.
// Returns an empty error_code on success, or the error that stopped the handshake.
std::error_code OutgoingMigration::Start(ConnectionContext* cntx) {
  VLOG(1) << "Starting outgoing migration";

  // Reports a connection-phase error to the client; ": " separates the prefix from the
  // system message (previously the two strings were glued together).
  auto check_connection_error = [cntx](error_code ec, const char* msg) -> error_code {
    if (ec) {
      cntx->SendError(absl::StrCat(msg, ": ", ec.message()));
    }
    return ec;
  };

  VLOG(1) << "Resolving host DNS";
  error_code ec = ResolveHostDns();
  RETURN_ON_ERR(check_connection_error(ec, "could not resolve host dns"));

  // The peer of an outgoing migration is the node that receives the slots (the target).
  VLOG(1) << "Connecting to target";
  ec = ConnectAndAuth(absl::GetFlag(FLAGS_source_connect_timeout_ms) * 1ms, &cntx_);
  RETURN_ON_ERR(check_connection_error(ec, "couldn't connect to target"));

  VLOG(1) << "Migration initiating";
  ResetParser(false);
  auto port = absl::GetFlag(FLAGS_admin_port);
  auto cmd = absl::StrCat("DFLYMIGRATE INIT ", sync_id_, " ", port, " ", slot_migrations_.size());
  for (const auto& s : slots_) {
    absl::StrAppend(&cmd, " ", s.start, " ", s.end);
  }
  RETURN_ON_ERR(SendCommandAndReadResponse(cmd));

  // A non-OK reply means the peer rejected the migration; surface it to the caller
  // instead of only logging and reporting success.
  if (!CheckRespIsSimpleReply("OK")) {
    const auto& resp = LastResponseArgs();
    LOG(ERROR) << "DFLYMIGRATE INIT was rejected: "
               << (resp.empty() ? std::string_view{"<empty response>"}
                                : facade::ToSV(resp.front().GetBuf()));
    return make_error_code(errc::bad_message);
  }

  // TODO: start the main migration fiber here once it is implemented for outgoing migrations.

  return {};
}

} // namespace dfly
12 changes: 8 additions & 4 deletions src/server/cluster/outgoing_slot_migration.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include "io/io.h"
#include "server/cluster/cluster_config.h"
#include "server/common.h"
#include "server/protocol_client.h"

namespace dfly {

Expand All @@ -17,15 +18,17 @@ class DbSlice;
class ServerFamily;

// Whole outgoing slots migration manager
class OutgoingMigration {
class OutgoingMigration : private ProtocolClient {
public:
OutgoingMigration() = default;
~OutgoingMigration();
OutgoingMigration(std::string ip, uint16_t port, SlotRanges slots, Context::ErrHandler,
ServerFamily* sf);
OutgoingMigration(std::string ip, uint16_t port, SlotRanges slots, uint32_t sync_id,
Context::ErrHandler, ServerFamily* sf);

std::error_code Start(ConnectionContext* cntx);

// should be run for all shards
void StartFlow(uint32_t sync_id, journal::Journal* journal, io::Sink* dest);
void StartFlow(journal::Journal* journal, io::Sink* dest);

void Finalize(uint32_t shard_id);
void Cancel(uint32_t shard_id);
Expand Down Expand Up @@ -54,6 +57,7 @@ class OutgoingMigration {
private:
std::string host_ip_;
uint16_t port_;
uint32_t sync_id_;
SlotRanges slots_;
Context cntx_;
mutable Mutex flows_mu_;
Expand Down
40 changes: 21 additions & 19 deletions tests/dragonfly/cluster_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -826,7 +826,7 @@ async def test_cluster_slot_migration(df_local_factory: DflyInstanceFactory):
"master": {{ "id": "{node_ids[0]}", "ip": "localhost", "port": {nodes[0].port} }},
"replicas": [],
"migrations": [{{ "slot_ranges": [ {{ "start": 5200, "end": 5259 }} ]
, "ip": "127.0.0.1", "port" : {nodes[0].admin_port}, "target_id": "{node_ids[1]}" }}]
, "ip": "127.0.0.1", "port" : {nodes[1].admin_port}, "target_id": "{node_ids[1]}" }}]
}},
{{
"slot_ranges": [ {{ "start": NEXT_SLOT_CUTOFF, "end": 16383 }} ],
Expand All @@ -841,24 +841,26 @@ async def test_cluster_slot_migration(df_local_factory: DflyInstanceFactory):
c_nodes_admin,
)

await asyncio.sleep(0.5)
try:
await c_nodes_admin[1].execute_command(
"DFLYCLUSTER",
"START-SLOT-MIGRATION",
"127.0.0.1",
str(nodes[0].admin_port),
"5000",
"5200",
)
assert False, "Should not be able to start slot migration"
except redis.exceptions.ResponseError as e:
assert e.args[0] == "Can't start the migration, another one is in progress"

await push_config(
config.replace("LAST_SLOT_CUTOFF", "5199").replace("NEXT_SLOT_CUTOFF", "5200"),
c_nodes_admin,
)
assert False, "Should not be able to start slot migration"

# await asyncio.sleep(0.5)
# try:
# await c_nodes_admin[1].execute_command(
# "DFLYCLUSTER",
# "START-SLOT-MIGRATION",
# "127.0.0.1",
# str(nodes[0].admin_port),
# "5000",
# "5200",
# )
# assert False, "Should not be able to start slot migration"
# except redis.exceptions.ResponseError as e:
# assert e.args[0] == "Can't start the migration, another one is in progress"

# await push_config(
# config.replace("LAST_SLOT_CUTOFF", "5199").replace("NEXT_SLOT_CUTOFF", "5200"),
# c_nodes_admin,
# )

await close_clients(*c_nodes, *c_nodes_admin)

Expand Down

0 comments on commit 6326b1c

Please sign in to comment.