Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: increase cluster migration default timeout #4293

Merged
merged 1 commit into from
Dec 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/server/cluster/incoming_slot_migration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ bool IncomingSlotMigration::Join(long attempt) {
const absl::Duration passed = now - start;
VLOG(1) << "Checking whether to continue with join " << passed << " vs " << timeout;
if (passed >= timeout) {
LOG(WARNING) << "Can't join migration in time";
LOG(WARNING) << "Can't join migration in time for " << source_id_;
ReportError(GenericError("Can't join migration in time"));
return false;
}
Expand Down
22 changes: 14 additions & 8 deletions src/server/cluster/outgoing_slot_migration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
#include "server/server_family.h"
#include "util/fibers/synchronization.h"

ABSL_FLAG(int, slot_migration_connection_timeout_ms, 2000, "Timeout for network operations");
ABSL_FLAG(int, slot_migration_connection_timeout_ms, 5000, "Timeout for network operations");

using namespace std;
using namespace facade;
Expand Down Expand Up @@ -288,10 +288,12 @@ bool OutgoingMigration::FinalizeMigration(long attempt) {
if (cntx_.GetError()) {
return true;
}
VLOG(1) << "Reconnecting to source";
VLOG(1) << "Reconnecting " << cf_->MyID() << " : " << migration_info_.node_info.id
<< " attempt " << attempt;
auto timeout = absl::GetFlag(FLAGS_slot_migration_connection_timeout_ms) * 1ms;
if (auto ec = ConnectAndAuth(timeout, &cntx_); ec) {
LOG(WARNING) << "Couldn't connect to source.";
LOG(WARNING) << "Couldn't connect " << cf_->MyID() << " : " << migration_info_.node_info.id
<< " attempt " << attempt;
return false;
}
}
Expand All @@ -306,7 +308,8 @@ bool OutgoingMigration::FinalizeMigration(long attempt) {
nullptr, ClientPause::WRITE, is_pause_in_progress);

if (!pause_fb_opt) {
LOG(WARNING) << "Cluster migration finalization time out";
LOG(WARNING) << "Migration finalization time out " << cf_->MyID() << " : "
<< migration_info_.node_info.id << " attempt " << attempt;
}

absl::Cleanup cleanup([&is_block_active, &pause_fb_opt]() {
Expand Down Expand Up @@ -335,7 +338,8 @@ bool OutgoingMigration::FinalizeMigration(long attempt) {
const absl::Time now = absl::Now();
const absl::Duration passed = now - start;
if (passed >= timeout) {
LOG(WARNING) << "Timeout fot ACK " << attempt;
LOG(WARNING) << "Timeout fot ACK " << cf_->MyID() << " : " << migration_info_.node_info.id
<< " attempt " << attempt;
return false;
}

Expand All @@ -345,15 +349,17 @@ bool OutgoingMigration::FinalizeMigration(long attempt) {
}

if (!CheckRespFirstTypes({RespExpr::INT64})) {
LOG(WARNING) << "Incorrect response type: "
<< facade::ToSV(LastResponseArgs().front().GetBuf());
LOG(WARNING) << "Incorrect response type for " << cf_->MyID() << " : "
<< migration_info_.node_info.id << " attempt " << attempt
<< " msg: " << facade::ToSV(LastResponseArgs().front().GetBuf());
return false;
}

if (const auto res = get<int64_t>(LastResponseArgs().front().u); res == attempt) {
break;
} else {
LOG(WARNING) << "Incorrect attempt payload, sent " << attempt << " received " << res;
LOG(WARNING) << "Incorrect attempt payload " << cf_->MyID() << " : "
<< migration_info_.node_info.id << ", sent " << attempt << " received " << res;
}
}

Expand Down
2 changes: 1 addition & 1 deletion tests/dragonfly/cluster_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -1614,7 +1614,7 @@ async def all_finished():
res = False
return res

@assert_eventually(times=500)
@assert_eventually(times=600)
async def test_all_finished():
assert await all_finished()

Expand Down
Loading