Skip to content

Commit

Permalink
bugfix: after scaling down a Raft cluster, the metadata still contains the removed node (#6855)
Browse files Browse the repository at this point in the history
  • Loading branch information
funky-eyes authored Sep 16, 2024
1 parent 963f7f3 commit 22f9c47
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 30 deletions.
2 changes: 1 addition & 1 deletion changes/en-us/2.x.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ Add changes here for all PR submitted to the 2.x branch.
- [[#6840](https://github.com/apache/incubator-seata/pull/6840)] Fix the issue of unsafe deserialization in ProcessorYaml.java
- [[#6843](https://github.com/apache/incubator-seata/pull/6843)] Fix 403 error when sending a POST request from the console
- [[#6850](https://github.com/apache/incubator-seata/pull/6850)] raft mode is backward compatible with version 2.0

- [[#6855](https://github.com/apache/incubator-seata/pull/6855)] after scaling down a Raft cluster, the metadata still contains the removed node


### optimize:
Expand Down
3 changes: 2 additions & 1 deletion changes/zh-cn/2.x.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@
- [[#6845](https://github.com/apache/incubator-seata/pull/6845)] 修复rocksdb open相同文件多次的问题
- [[#6840](https://github.com/apache/incubator-seata/pull/6840)] 修复ProcessorYaml中不安全的反序列化
- [[#6843](https://github.com/apache/incubator-seata/pull/6843)] 修复从控制台发送POST请求时出现的403错误
- [[#6850](https://github.com/apache/incubator-seata/pull/6850)] raft模式向下兼容2.0版本
- [[#6855](https://github.com/apache/incubator-seata/pull/6855)] 修复raft缩容后元数据中残留该节点的问题(需先升级到2.2再进行缩容)


### optimize:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,6 @@ public boolean equals(Object o) {
return Objects.equals(control, node.control) && Objects.equals(transaction, node.transaction);
}


// convert to String
public String toJsonString(ObjectMapper objectMapper) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,9 @@ public void onLeaderStart(final long term) {
SeataClusterContext.unbindGroup();
}
});
Configuration conf = RouteTable.getInstance().getConfiguration(group);
// A member change might trigger a leader re-election. At this point, it’s necessary to filter out non-existent members and synchronize again.
changePeers(conf);
}
}

Expand All @@ -262,28 +265,40 @@ public void onStartFollowing(final LeaderChangeContext ctx) {
public void onConfigurationCommitted(Configuration conf) {
LOGGER.info("groupId: {}, onConfigurationCommitted: {}.", group, conf);
RouteTable.getInstance().updateConfiguration(group, conf);
// After a member change, the metadata needs to be synchronized again.
initSync.compareAndSet(true, false);
if (isLeader()) {
lock.lock();
try {
List<PeerId> newFollowers = conf.getPeers();
Set<PeerId> newLearners = conf.getLearners();
List<Node> currentFollowers = raftClusterMetadata.getFollowers();
if (CollectionUtils.isNotEmpty(newFollowers)) {
raftClusterMetadata.setFollowers(currentFollowers.stream()
.filter(node -> contains(node, newFollowers)).collect(Collectors.toList()));
}
if (CollectionUtils.isNotEmpty(newLearners)) {
raftClusterMetadata.setLearner(raftClusterMetadata.getLearner().stream()
.filter(node -> contains(node, newLearners)).collect(Collectors.toList()));
}
syncMetadata();
} finally {
lock.unlock();
changePeers(conf);
}
}

private void changePeers(Configuration conf) {
lock.lock();
try {
List<PeerId> newFollowers = conf.getPeers();
Set<PeerId> newLearners = conf.getLearners();
List<Node> currentFollowers = raftClusterMetadata.getFollowers();
if (CollectionUtils.isNotEmpty(newFollowers)) {
raftClusterMetadata.setFollowers(currentFollowers.stream().filter(node -> contains(node, newFollowers))
.collect(Collectors.toList()));
}
if (CollectionUtils.isNotEmpty(newLearners)) {
raftClusterMetadata.setLearner(raftClusterMetadata.getLearner().stream()
.filter(node -> contains(node, newLearners)).collect(Collectors.toList()));
} else {
raftClusterMetadata.setLearner(Collections.emptyList());
}
CompletableFuture.runAsync(this::syncMetadata, RESYNC_METADATA_POOL);
} finally {
lock.unlock();
}
}

private boolean contains(Node node, Collection<PeerId> list) {
// This indicates that the node is of a lower version.
// When scaling up or down on a higher version
// you need to ensure that the cluster is consistent first
// otherwise, the lower version nodes may be removed.
if (node.getInternal() == null) {
return true;
}
Expand Down Expand Up @@ -367,25 +382,24 @@ public void refreshClusterMetadata(RaftBaseMsg syncMsg) {
}

private void syncCurrentNodeInfo(String group) {
if (initSync.get()) {
return;
}
try {
RouteTable.getInstance().refreshLeader(RaftServerManager.getCliClientServiceInstance(), group, 1000);
PeerId peerId = RouteTable.getInstance().selectLeader(group);
if (peerId != null) {
syncCurrentNodeInfo(peerId);
if (initSync.compareAndSet(false, true)) {
try {
RouteTable.getInstance().refreshLeader(RaftServerManager.getCliClientServiceInstance(), group, 1000);
PeerId peerId = RouteTable.getInstance().selectLeader(group);
if (peerId != null) {
syncCurrentNodeInfo(peerId);
}
} catch (Exception e) {
LOGGER.error(e.getMessage(), e);
}
} catch (Exception e) {
LOGGER.error(e.getMessage(), e);
}
}

private void syncCurrentNodeInfo(PeerId leaderPeerId) {
try {
// Ensure that the current leader must be version 2.1 or later to synchronize the operation
Node leader = raftClusterMetadata.getLeader();
if (leader != null && StringUtils.isNotBlank(leader.getVersion()) && initSync.compareAndSet(false, true)) {
if (leader != null && StringUtils.isNotBlank(leader.getVersion())) {
RaftServer raftServer = RaftServerManager.getRaftServer(group);
PeerId cureentPeerId = raftServer.getServerId();
Node node = raftClusterMetadata.createNode(XID.getIpAddress(), XID.getPort(), cureentPeerId.getPort(),
Expand Down

0 comments on commit 22f9c47

Please sign in to comment.