Skip to content

Commit

Permalink
fixed issue alibaba#1636
Browse files Browse the repository at this point in the history
非flat模式,当MQ主题为多分区时,数据拆分只取了for循环里面的最后一份数据,
导致消费端只能消费到一个分区的数据
  • Loading branch information
wangsaner committed Mar 27, 2019
1 parent 0ec4699 commit 739963a
Showing 1 changed file with 13 additions and 14 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.alibaba.otter.canal.kafka;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
Expand Down Expand Up @@ -138,7 +139,7 @@ public void send(MQProperties.CanalDestination canalDestination, Message message
private void send(MQProperties.CanalDestination canalDestination, String topicName, Message message)
throws Exception {
if (!kafkaProperties.getFlatMessage()) {
ProducerRecord<String, Message> record = null;
List<ProducerRecord<String, Message>> records = new ArrayList<>();
if (canalDestination.getPartitionHash() != null && !canalDestination.getPartitionHash().isEmpty()) {
Message[] messages = MQMessageUtils.messagePartition(message,
canalDestination.getPartitionsNum(),
Expand All @@ -147,25 +148,23 @@ private void send(MQProperties.CanalDestination canalDestination, String topicNa
for (int i = 0; i < length; i++) {
Message messagePartition = messages[i];
if (messagePartition != null) {
record = new ProducerRecord<>(topicName, i, null, messagePartition);
records.add(new ProducerRecord<>(topicName, i, null, messagePartition));
}
}
} else {
final int partition = canalDestination.getPartition() != null ? canalDestination.getPartition() : 0;
record = new ProducerRecord<>(topicName, partition, null, message);
records.add(new ProducerRecord<>(topicName, partition, null, message));
}

if (record != null) {
if (kafkaProperties.getTransaction()) {
producer.send(record).get();
} else {
producer.send(record).get();
}

if (logger.isDebugEnabled()) {
logger.debug("Send message to kafka topic: [{}], packet: {}", topicName, message.toString());
}
}
if (!records.isEmpty()) {
for (ProducerRecord<String, Message> record : records) {
producer.send(record).get();
}

if (logger.isDebugEnabled()) {
logger.debug("Send message to kafka topic: [{}], packet: {}", topicName, message.toString());
}
}
} else {
// 发送扁平数据json
List<FlatMessage> flatMessages = MQMessageUtils.messageConverter(message);
Expand Down

0 comments on commit 739963a

Please sign in to comment.