Skip to content

Commit

Permalink
[ISSUE #6316] Nameserver should choose a master with a larger epoch w…
Browse files Browse the repository at this point in the history
…hen there are two masters in controller mode (#6317)
  • Loading branch information
RongtongJin authored Mar 13, 2023
1 parent d0df051 commit 20dc5c9
Show file tree
Hide file tree
Showing 7 changed files with 28 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1548,8 +1548,6 @@ protected void startBasicService() throws Exception {
this.brokerPreOnlineService.start();
}

//Init state version after messageStore initialized.
this.topicConfigManager.initStateVersion();
}

public void start() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,8 @@ public void changeToMaster(final int newMasterEpoch, final int syncStateSetEpoch

schedulingCheckSyncStateSet();

this.brokerController.getTopicConfigManager().getDataVersion().nextVersion(newMasterEpoch);

this.executorService.submit(() -> {
// Register broker to name-srv
try {
Expand Down Expand Up @@ -243,6 +245,8 @@ public void changeToSlave(final String newMasterAddress, final int newMasterEpoc
// Notify ha service, change to slave
this.haService.changeToSlave(newMasterAddress, newMasterEpoch, this.brokerConfig.getBrokerId());

this.brokerController.getTopicConfigManager().getDataVersion().nextVersion(newMasterEpoch);

this.executorService.submit(() -> {
// Register broker to name-srv
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -777,7 +777,8 @@ private synchronized RemotingCommand updateBrokerConfig(ChannelHandlerContext ct
LOGGER.info("updateBrokerConfig, new config: [{}] client: {} ", properties, ctx.channel().remoteAddress());
this.brokerController.getConfiguration().update(properties);
if (properties.containsKey("brokerPermission")) {
this.brokerController.getTopicConfigManager().getDataVersion().nextVersion();
long stateMachineVersion = brokerController.getMessageStore() != null ? brokerController.getMessageStore().getStateMachineVersion() : 0;
this.brokerController.getTopicConfigManager().getDataVersion().nextVersion(stateMachineVersion);
this.brokerController.registerBrokerAll(false, false, true);
}
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,8 @@ private void updateForbiddenValue(String group, String topic, Integer forbidden)
log.info("set group forbidden, {}@{} old: {} new: {}", group, topic, 0, forbidden);
}

this.dataVersion.nextVersion();
long stateMachineVersion = brokerController.getMessageStore() != null ? brokerController.getMessageStore().getStateMachineVersion() : 0;
dataVersion.nextVersion(stateMachineVersion);

this.persist();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,8 @@ public TopicConfig createTopicIfAbsent(TopicConfig topicConfig, boolean register
}
log.info("Create new topic [{}] config:[{}]", topicConfig.getTopicName(), topicConfig);
this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
this.dataVersion.nextVersion();
long stateMachineVersion = brokerController.getMessageStore() != null ? brokerController.getMessageStore().getStateMachineVersion() : 0;
dataVersion.nextVersion(stateMachineVersion);
createNew = true;
this.persist();
} finally {
Expand Down Expand Up @@ -394,7 +395,8 @@ public TopicConfig createTopicOfTranCheckMaxTime(final int clientDefaultTopicQue
log.info("create new topic {}", topicConfig);
this.topicConfigTable.put(TopicValidator.RMQ_SYS_TRANS_CHECK_MAX_TIME_TOPIC, topicConfig);
createNew = true;
this.dataVersion.nextVersion();
long stateMachineVersion = brokerController.getMessageStore() != null ? brokerController.getMessageStore().getStateMachineVersion() : 0;
dataVersion.nextVersion(stateMachineVersion);
this.persist();
} finally {
this.topicConfigTableLock.unlock();
Expand Down Expand Up @@ -540,7 +542,8 @@ public void deleteTopicConfig(final String topic) {
TopicConfig old = this.topicConfigTable.remove(topic);
if (old != null) {
log.info("delete topic config OK, topic: {}", old);
this.dataVersion.nextVersion();
long stateMachineVersion = brokerController.getMessageStore() != null ? brokerController.getMessageStore().getStateMachineVersion() : 0;
dataVersion.nextVersion(stateMachineVersion);
this.persist();
} else {
log.warn("delete topic config failed, topic: {} not exists", topic);
Expand All @@ -556,12 +559,6 @@ public TopicConfigSerializeWrapper buildTopicConfigSerializeWrapper() {
return topicConfigSerializeWrapper;
}

public void initStateVersion() {
long stateMachineVersion = brokerController.getMessageStore() != null ? brokerController.getMessageStore().getStateMachineVersion() : 0;
dataVersion.nextVersion(stateMachineVersion);
this.persist();
}

@Override
public String encode() {
return encode(false);
Expand Down Expand Up @@ -734,4 +731,6 @@ private String realKey(String key) {
public boolean containsTopic(String topic) {
return topicConfigTable.containsKey(topic);
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,8 @@ public class DefaultMessageStore implements MessageStore {

private final DispatchRequestOrderlyQueue dispatchRequestOrderlyQueue = new DispatchRequestOrderlyQueue(dispatchRequestOrderlyQueueSize);

private long stateMachineVersion = 0L;

public DefaultMessageStore(final MessageStoreConfig messageStoreConfig, final BrokerStatsManager brokerStatsManager,
final MessageArrivingListener messageArrivingListener, final BrokerConfig brokerConfig) throws IOException {
this.messageArrivingListener = messageArrivingListener;
Expand Down Expand Up @@ -1909,7 +1911,11 @@ public DispatchRequest checkMessageAndReturnSize(final ByteBuffer byteBuffer, fi

@Override
public long getStateMachineVersion() {
return 0L;
return stateMachineVersion;
}

public void setStateMachineVersion(long stateMachineVersion) {
this.stateMachineVersion = stateMachineVersion;
}

public BrokerStatsManager getBrokerStatsManager() {
Expand Down Expand Up @@ -3215,4 +3221,6 @@ public boolean isTransientStorePoolEnable() {
return this.messageStoreConfig.isTransientStorePoolEnable() &&
(this.brokerConfig.isEnableControllerMode() || this.messageStoreConfig.getBrokerRole() != BrokerRole.SLAVE);
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -147,8 +147,8 @@ public boolean changeToMaster(int masterEpoch) {
}

LOGGER.info("TruncateOffset is {}, confirmOffset is {}, maxPhyOffset is {}", truncateOffset, getConfirmOffset(), this.defaultMessageStore.getMaxPhyOffset());

this.defaultMessageStore.recoverTopicQueueTable();
this.defaultMessageStore.setStateMachineVersion(masterEpoch);
LOGGER.info("Change ha to master success, newMasterEpoch:{}, startOffset:{}", masterEpoch, newEpochEntry.getStartOffset());
return true;
}
Expand Down Expand Up @@ -178,6 +178,8 @@ public boolean changeToSlave(String newMasterAddr, int newMasterEpoch, Long slav
defaultMessageStore.getTransientStorePool().setRealCommit(false);
}

this.defaultMessageStore.setStateMachineVersion(newMasterEpoch);

LOGGER.info("Change ha to slave success, newMasterAddress:{}, newMasterEpoch:{}", newMasterAddr, newMasterEpoch);
return true;
} catch (final Exception e) {
Expand Down

0 comments on commit 20dc5c9

Please sign in to comment.