Skip to content

Commit

Permalink
Merge pull request #958 from rewerma/master
Browse files Browse the repository at this point in the history
kafka生产端增加按pk hash到对应partition功能
  • Loading branch information
agapple authored Sep 21, 2018
2 parents 09df602 + 19b7f73 commit 05718a9
Show file tree
Hide file tree
Showing 5 changed files with 84 additions and 73 deletions.
10 changes: 7 additions & 3 deletions deployer/src/main/resources/kafka.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,12 @@ canalGetTimeout: 100
flatMessage: true

canalDestinations:
- canalDestination: example
topic: example
partition:
- canalDestination: example
topic: exp3
# #number of partitions of the corresponding topic
# partitionsNum: 3
# partitionHash:
# #database.table: unique primary key
# mytest.person: id


Original file line number Diff line number Diff line change
@@ -1,14 +1,8 @@
package com.alibaba.otter.canal.protocol;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

import com.google.common.collect.Table;
import java.util.*;

import com.google.protobuf.ByteString;

/**
Expand All @@ -31,7 +25,7 @@ public class FlatMessage implements Serializable {
private List<Map<String, String>> data;
private List<Map<String, String>> old;

public FlatMessage() {
public FlatMessage(){
}

public FlatMessage(long id){
Expand Down Expand Up @@ -126,6 +120,12 @@ public void setOld(List<Map<String, String>> old) {
this.old = old;
}

/**
* 将Message转换为FlatMessage
*
* @param message 原生message
* @return FlatMessage列表
*/
public static List<FlatMessage> messageConverter(Message message) {
try {
if (message == null) {
Expand Down Expand Up @@ -231,11 +231,22 @@ public static List<FlatMessage> messageConverter(Message message) {
}
}

/**
* 将FlatMessage按指定的字段值hash拆分
*
* @param flatMessage flatMessage
* @param partitionsNum 分区数量
* @param pkHashConfig hash映射
* @return 拆分后的flatMessage数组
*/
public static FlatMessage[] messagePartition(FlatMessage flatMessage, Integer partitionsNum,
Table<String, String, String> pkHashConfig) {
Map<String, String> pkHashConfig) {
if (partitionsNum == null) {
partitionsNum = 1;
}
FlatMessage[] partitionMessages = new FlatMessage[partitionsNum];

String pk = pkHashConfig.get(flatMessage.getDatabase(), flatMessage.getTable());
String pk = pkHashConfig.get(flatMessage.getDatabase() + "." + flatMessage.getTable());
if (pk == null || flatMessage.getIsDdl()) {
partitionMessages[0] = flatMessage;
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,15 +67,18 @@ public void stop() {
}
}

public void send(KafkaProperties.Topic topic, Message message, Callback callback) {
public void send(KafkaProperties.CanalDestination canalDestination, Message message, Callback callback) {
try {
// producer.beginTransaction();
if (!kafkaProperties.getFlatMessage()) {
ProducerRecord<String, Message> record;
if (topic.getPartition() != null) {
record = new ProducerRecord<String, Message>(topic.getTopic(), topic.getPartition(), null, message);
if (canalDestination.getPartition() != null) {
record = new ProducerRecord<String, Message>(canalDestination.getTopic(),
canalDestination.getPartition(),
null,
message);
} else {
record = new ProducerRecord<String, Message>(topic.getTopic(), message);
record = new ProducerRecord<String, Message>(canalDestination.getTopic(), 0, null, message);
}

producer.send(record);
Expand All @@ -84,17 +87,46 @@ record = new ProducerRecord<String, Message>(topic.getTopic(), message);
List<FlatMessage> flatMessages = FlatMessage.messageConverter(message);
if (flatMessages != null) {
for (FlatMessage flatMessage : flatMessages) {
ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic.getTopic(),
JSON.toJSONString(flatMessage));
producer2.send(record);
if (canalDestination.getPartition() != null) {
ProducerRecord<String, String> record = new ProducerRecord<String, String>(canalDestination
.getTopic(), canalDestination.getPartition(), null, JSON.toJSONString(flatMessage));
producer2.send(record);
} else {
if (canalDestination.getPartitionHash() != null
&& !canalDestination.getPartitionHash().isEmpty()) {
FlatMessage[] partitionFlatMessage = FlatMessage.messagePartition(flatMessage,
canalDestination.getPartitionsNum(),
canalDestination.getPartitionHash());
int length = partitionFlatMessage.length;
for (int i = 0; i < length; i++) {
FlatMessage flatMessagePart = partitionFlatMessage[i];
if (flatMessagePart != null) {
ProducerRecord<String, String> record = new ProducerRecord<String, String>(
canalDestination.getTopic(),
i,
null,
JSON.toJSONString(flatMessagePart));
producer2.send(record);
}
}
} else {
ProducerRecord<String, String> record = new ProducerRecord<String, String>(
canalDestination.getTopic(),
0,
null,
JSON.toJSONString(flatMessage));
producer2.send(record);
}
}

}
}
}

// producer.commitTransaction();
callback.commit();
if (logger.isDebugEnabled()) {
logger.debug("send message to kafka topic: {}", topic.getTopic());
logger.debug("send message to kafka topic: {}", canalDestination.getTopic());
}
} catch (Exception e) {
logger.error(e.getMessage(), e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
import org.yaml.snakeyaml.Yaml;

import com.alibaba.otter.canal.kafka.KafkaProperties.CanalDestination;
import com.alibaba.otter.canal.kafka.KafkaProperties.Topic;
import com.alibaba.otter.canal.protocol.ClientIdentity;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.server.CanalServerStarter;
Expand Down Expand Up @@ -132,10 +131,7 @@ private void worker(CanalDestination destination) {
try {
int size = message.isRaw() ? message.getRawEntries().size() : message.getEntries().size();
if (batchId != -1 && size != 0) {
Topic topic = new Topic();
topic.setTopic(destination.getTopic());
topic.setPartition(destination.getPartition());
canalKafkaProducer.send(topic, message, new CanalKafkaProducer.Callback() {
canalKafkaProducer.send(destination, message, new CanalKafkaProducer.Callback() {

@Override
public void commit() {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
package com.alibaba.otter.canal.kafka;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.Map;

/**
* kafka 配置项
Expand All @@ -27,10 +26,11 @@ public class KafkaProperties {

public static class CanalDestination {

private String canalDestination;
private String topic;
private Integer partition;
private Set<Topic> topics = new HashSet<Topic>();
private String canalDestination;
private String topic;
private Integer partition;
private Integer partitionsNum;
private Map<String, String> partitionHash;

public String getCanalDestination() {
return canalDestination;
Expand All @@ -56,52 +56,20 @@ public void setPartition(Integer partition) {
this.partition = partition;
}

public Set<Topic> getTopics() {
return topics;
/**
 * Returns the number of topic partitions used for pk-hash routing; may be
 * null when partition hashing is not configured (caller defaults it to 1).
 */
public Integer getPartitionsNum() {
return partitionsNum;
}

public void setTopics(Set<Topic> topics) {
this.topics = topics;
/**
 * Sets the number of topic partitions available for pk-hash routing
 * (populated from the {@code partitionsNum} key in kafka.yml).
 */
public void setPartitionsNum(Integer partitionsNum) {
this.partitionsNum = partitionsNum;
}
}

public static class Topic {

private String topic;
private Integer partition;

public String getTopic() {
return topic;
}

public void setTopic(String topic) {
this.topic = topic;
}

public Integer getPartition() {
return partition;
}

public void setPartition(Integer partition) {
this.partition = partition;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;

Topic topic1 = (Topic) o;

if (topic != null ? !topic.equals(topic1.topic) : topic1.topic != null) return false;
return partition != null ? partition.equals(topic1.partition) : topic1.partition == null;
/**
 * Returns the "database.table" -> primary-key-column mapping used to hash
 * rows onto partitions; may be null/empty when hashing is not configured.
 */
public Map<String, String> getPartitionHash() {
return partitionHash;
}

@Override
public int hashCode() {
int result = topic != null ? topic.hashCode() : 0;
result = 31 * result + (partition != null ? partition.hashCode() : 0);
return result;
/**
 * Sets the "database.table" -> primary-key-column mapping
 * (populated from the {@code partitionHash} section in kafka.yml).
 */
public void setPartitionHash(Map<String, String> partitionHash) {
this.partitionHash = partitionHash;
}
}

Expand Down

0 comments on commit 05718a9

Please sign in to comment.