Skip to content

Commit

Permalink
fix: increase cluster migration default timeout (#4293)
Browse files Browse the repository at this point in the history
  • Loading branch information
BorysTheDev authored Dec 11, 2024
1 parent 76f79f0 commit f892d9b
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 10 deletions.
2 changes: 1 addition & 1 deletion src/server/cluster/incoming_slot_migration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ bool IncomingSlotMigration::Join(long attempt) {
const absl::Duration passed = now - start;
VLOG(1) << "Checking whether to continue with join " << passed << " vs " << timeout;
if (passed >= timeout) {
LOG(WARNING) << "Can't join migration in time";
LOG(WARNING) << "Can't join migration in time for " << source_id_;
ReportError(GenericError("Can't join migration in time"));
return false;
}
Expand Down
22 changes: 14 additions & 8 deletions src/server/cluster/outgoing_slot_migration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
#include "server/server_family.h"
#include "util/fibers/synchronization.h"

ABSL_FLAG(int, slot_migration_connection_timeout_ms, 2000, "Timeout for network operations");
ABSL_FLAG(int, slot_migration_connection_timeout_ms, 5000, "Timeout for network operations");

using namespace std;
using namespace facade;
Expand Down Expand Up @@ -288,10 +288,12 @@ bool OutgoingMigration::FinalizeMigration(long attempt) {
if (cntx_.GetError()) {
return true;
}
VLOG(1) << "Reconnecting to source";
VLOG(1) << "Reconnecting " << cf_->MyID() << " : " << migration_info_.node_info.id
<< " attempt " << attempt;
auto timeout = absl::GetFlag(FLAGS_slot_migration_connection_timeout_ms) * 1ms;
if (auto ec = ConnectAndAuth(timeout, &cntx_); ec) {
LOG(WARNING) << "Couldn't connect to source.";
LOG(WARNING) << "Couldn't connect " << cf_->MyID() << " : " << migration_info_.node_info.id
<< " attempt " << attempt;
return false;
}
}
Expand All @@ -306,7 +308,8 @@ bool OutgoingMigration::FinalizeMigration(long attempt) {
nullptr, ClientPause::WRITE, is_pause_in_progress);

if (!pause_fb_opt) {
LOG(WARNING) << "Cluster migration finalization time out";
LOG(WARNING) << "Migration finalization time out " << cf_->MyID() << " : "
<< migration_info_.node_info.id << " attempt " << attempt;
}

absl::Cleanup cleanup([&is_block_active, &pause_fb_opt]() {
Expand Down Expand Up @@ -335,7 +338,8 @@ bool OutgoingMigration::FinalizeMigration(long attempt) {
const absl::Time now = absl::Now();
const absl::Duration passed = now - start;
if (passed >= timeout) {
LOG(WARNING) << "Timeout fot ACK " << attempt;
LOG(WARNING) << "Timeout fot ACK " << cf_->MyID() << " : " << migration_info_.node_info.id
<< " attempt " << attempt;
return false;
}

Expand All @@ -345,15 +349,17 @@ bool OutgoingMigration::FinalizeMigration(long attempt) {
}

if (!CheckRespFirstTypes({RespExpr::INT64})) {
LOG(WARNING) << "Incorrect response type: "
<< facade::ToSV(LastResponseArgs().front().GetBuf());
LOG(WARNING) << "Incorrect response type for " << cf_->MyID() << " : "
<< migration_info_.node_info.id << " attempt " << attempt
<< " msg: " << facade::ToSV(LastResponseArgs().front().GetBuf());
return false;
}

if (const auto res = get<int64_t>(LastResponseArgs().front().u); res == attempt) {
break;
} else {
LOG(WARNING) << "Incorrect attempt payload, sent " << attempt << " received " << res;
LOG(WARNING) << "Incorrect attempt payload " << cf_->MyID() << " : "
<< migration_info_.node_info.id << ", sent " << attempt << " received " << res;
}
}

Expand Down
2 changes: 1 addition & 1 deletion tests/dragonfly/cluster_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -1614,7 +1614,7 @@ async def all_finished():
res = False
return res

@assert_eventually(times=500)
@assert_eventually(times=600)
async def test_all_finished():
assert await all_finished()

Expand Down

0 comments on commit f892d9b

Please sign in to comment.