Skip to content

Commit

Permalink
支持不同topic指定专属partitionNum (#2479)
Browse files Browse the repository at this point in the history
  • Loading branch information
GuangYaoLee92 authored Aug 22, 2020
1 parent b79af46 commit b81e9df
Show file tree
Hide file tree
Showing 11 changed files with 95 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ public class MQDestination {
private Integer partitionsNum;
private String partitionHash;
private String dynamicTopic;
private String dynamicTopicPartitionNum;

public String getCanalDestination() {
return canalDestination;
Expand Down Expand Up @@ -62,4 +63,12 @@ public String getDynamicTopic() {
public void setDynamicTopic(String dynamicTopic) {
this.dynamicTopic = dynamicTopic;
}

public String getDynamicTopicPartitionNum() {
return dynamicTopicPartitionNum;
}

public void setDynamicTopicPartitionNum(String dynamicTopicPartitionNum) {
this.dynamicTopicPartitionNum = dynamicTopicPartitionNum;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,38 @@ public List<DynamicTopicData> apply(String pkHashConfigs) {
}
});

private static Map<String, List<TopicPartitionData>> topicPartitionDatas = MigrateMap.makeComputingMap(CacheBuilder.newBuilder()
.softValues(),
new Function<String, List<TopicPartitionData>>() {

public List<TopicPartitionData> apply(String tPConfigs) {
List<TopicPartitionData> datas = Lists.newArrayList();
String[] tPArray = StringUtils.split(StringUtils.replace(tPConfigs,
",",
";"),
";");
for (String tPConfig : tPArray) {
TopicPartitionData data = new TopicPartitionData();
int i = tPConfig.lastIndexOf(":");
if (i > 0) {
String tStr = tPConfig.substring(0, i);
String pStr = tPConfig.substring(i + 1);
if (!isWildCard(tStr)) {
data.simpleName = tStr;
} else {
data.regexFilter = new AviaterRegexFilter(tStr);
}
if (!StringUtils.isEmpty(pStr) && StringUtils.isNumeric(pStr)) {
data.partitionNum = Integer.valueOf(pStr);
}
datas.add(data);
}
}

return datas;
}
});

/**
* 按 schema 或者 schema+table 将 message 分配到对应topic
*
Expand Down Expand Up @@ -619,6 +651,24 @@ public static boolean checkPkNamesHasContain(List<String> pkNames, String name)
return false;
}

public static Integer parseDynamicTopicPartition(String name, String tPConfigs) {
if (!StringUtils.isEmpty(tPConfigs)) {
List<TopicPartitionData> datas = topicPartitionDatas.get(tPConfigs);
for (TopicPartitionData data : datas) {
if (data.simpleName != null) {
if (data.simpleName.equalsIgnoreCase(name)) {
return data.partitionNum;
}
} else {
if (data.regexFilter.filter(name)) {
return data.partitionNum;
}
}
}
}
return null;
}

private static boolean isWildCard(String value) {
// not contaiins '.' ?
return StringUtils.containsAny(value, new char[] { '*', '?', '+', '|', '(', ')', '{', '}', '[', ']', '\\', '$',
Expand Down Expand Up @@ -656,6 +706,13 @@ public static class DynamicTopicData {
public AviaterRegexFilter tableRegexFilter;
}

public static class TopicPartitionData {

public String simpleName;
public AviaterRegexFilter regexFilter;
public Integer partitionNum;
}

public static class EntryRowData {

public Entry entry;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,14 +196,19 @@ public void send(MQDestination mqDestination, Message message, Callback callback

private List<Future> send(MQDestination mqDestination, String topicName, Message message, boolean flat) {
List<ProducerRecord<String, byte[]>> records = new ArrayList<>();
// 获取当前topic的分区数
Integer partitionNum = MQMessageUtils.parseDynamicTopicPartition(topicName, mqDestination.getDynamicTopicPartitionNum());
if (partitionNum == null) {
partitionNum = mqDestination.getPartitionsNum();
}
if (!flat) {
if (mqDestination.getPartitionHash() != null && !mqDestination.getPartitionHash().isEmpty()) {
// 并发构造
EntryRowData[] datas = MQMessageUtils.buildMessageData(message, buildExecutor);
// 串行分区
Message[] messages = MQMessageUtils.messagePartition(datas,
message.getId(),
mqDestination.getPartitionsNum(),
partitionNum,
mqDestination.getPartitionHash(),
this.mqProperties.isDatabaseHash());
int length = messages.length;
Expand Down Expand Up @@ -233,7 +238,7 @@ private List<Future> send(MQDestination mqDestination, String topicName, Message
for (FlatMessage flatMessage : flatMessages) {
if (mqDestination.getPartitionHash() != null && !mqDestination.getPartitionHash().isEmpty()) {
FlatMessage[] partitionFlatMessage = MQMessageUtils.messagePartition(flatMessage,
mqDestination.getPartitionsNum(),
partitionNum,
mqDestination.getPartitionHash(),
this.mqProperties.isDatabaseHash());
int length = partitionFlatMessage.length;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,14 +161,19 @@ public void send(MQDestination destination, com.alibaba.otter.canal.protocol.Mes
}

public void send(final MQDestination destination, String topicName, com.alibaba.otter.canal.protocol.Message message) {
// 获取当前topic的分区数
Integer partitionNum = MQMessageUtils.parseDynamicTopicPartition(topicName, destination.getDynamicTopicPartitionNum());
if (partitionNum == null) {
partitionNum = destination.getPartitionsNum();
}
if (!mqProperties.isFlatMessage()) {
if (destination.getPartitionHash() != null && !destination.getPartitionHash().isEmpty()) {
// 并发构造
MQMessageUtils.EntryRowData[] datas = MQMessageUtils.buildMessageData(message, buildExecutor);
// 串行分区
com.alibaba.otter.canal.protocol.Message[] messages = MQMessageUtils.messagePartition(datas,
message.getId(),
destination.getPartitionsNum(),
partitionNum,
destination.getPartitionHash(),
mqProperties.isDatabaseHash());
int length = messages.length;
Expand Down Expand Up @@ -207,7 +212,7 @@ public void send(final MQDestination destination, String topicName, com.alibaba.

for (FlatMessage flatMessage : flatMessages) {
FlatMessage[] partitionFlatMessage = MQMessageUtils.messagePartition(flatMessage,
destination.getPartitionsNum(),
partitionNum,
destination.getPartitionHash(),
mqProperties.isDatabaseHash());
int length = partitionFlatMessage.length;
Expand Down
1 change: 1 addition & 0 deletions deployer/src/main/resources/example/instance.properties
Original file line number Diff line number Diff line change
Expand Up @@ -54,4 +54,5 @@ canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#canal.mq.partitionHash=test.table:id^name,.*\\..*
#canal.mq.dynamicTopicPartitionNum=test.*:4,mycanal:6
#################################################
1 change: 1 addition & 0 deletions deployer/src/main/resources/spring/default-instance.xml
Original file line number Diff line number Diff line change
Expand Up @@ -200,5 +200,6 @@
<property name="partition" value="${canal.mq.partition}" />
<property name="partitionsNum" value="${canal.mq.partitionsNum}" />
<property name="partitionHash" value="${canal.mq.partitionHash}" />
<property name="dynamicTopicPartitionNum" value="${canal.mq.dynamicTopicPartitionNum}" />
</bean>
</beans>
1 change: 1 addition & 0 deletions deployer/src/main/resources/spring/file-instance.xml
Original file line number Diff line number Diff line change
Expand Up @@ -186,5 +186,6 @@
<property name="partition" value="${canal.mq.partition}" />
<property name="partitionsNum" value="${canal.mq.partitionsNum}" />
<property name="partitionHash" value="${canal.mq.partitionHash}" />
<property name="dynamicTopicPartitionNum" value="${canal.mq.dynamicTopicPartitionNum}" />
</bean>
</beans>
1 change: 1 addition & 0 deletions deployer/src/main/resources/spring/group-instance.xml
Original file line number Diff line number Diff line change
Expand Up @@ -276,5 +276,6 @@
<property name="partition" value="${canal.mq.partition}" />
<property name="partitionsNum" value="${canal.mq.partitionsNum}" />
<property name="partitionHash" value="${canal.mq.partitionHash}" />
<property name="dynamicTopicPartitionNum" value="${canal.mq.dynamicTopicPartitionNum}" />
</bean>
</beans>
1 change: 1 addition & 0 deletions deployer/src/main/resources/spring/memory-instance.xml
Original file line number Diff line number Diff line change
Expand Up @@ -174,5 +174,6 @@
<property name="partition" value="${canal.mq.partition}" />
<property name="partitionsNum" value="${canal.mq.partitionsNum}" />
<property name="partitionHash" value="${canal.mq.partitionHash}" />
<property name="dynamicTopicPartitionNum" value="${canal.mq.dynamicTopicPartitionNum}" />
</bean>
</beans>
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ public class CanalMQConfig {
private Integer partitionsNum;
private String partitionHash;
private String dynamicTopic;
private String dynamicTopicPartitionNum;

public String getTopic() {
return topic;
Expand Down Expand Up @@ -47,4 +48,12 @@ public String getDynamicTopic() {
public void setDynamicTopic(String dynamicTopic) {
this.dynamicTopic = dynamicTopic;
}

public String getDynamicTopicPartitionNum() {
return dynamicTopicPartitionNum;
}

public void setDynamicTopicPartitionNum(String dynamicTopicPartitionNum) {
this.dynamicTopicPartitionNum = dynamicTopicPartitionNum;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ private void worker(String destination, AtomicBoolean destinationRunning) {
canalDestination.setDynamicTopic(mqConfig.getDynamicTopic());
canalDestination.setPartitionsNum(mqConfig.getPartitionsNum());
canalDestination.setPartitionHash(mqConfig.getPartitionHash());
canalDestination.setDynamicTopicPartitionNum(mqConfig.getDynamicTopicPartitionNum());

canalServer.subscribe(clientIdentity);
logger.info("## the MQ producer: {} is running now ......", destination);
Expand Down

0 comments on commit b81e9df

Please sign in to comment.