
v1.1.3-alpha-3: incomplete data consumed when the Kafka topic has multiple partitions #1636

Closed · wangsaner opened this issue Mar 26, 2019 · 3 comments

wangsaner (Contributor) commented Mar 26, 2019

Environment

  • canal version v1.1.3-alpha-3
  • mysql version 5.6

Issue Description

When the Kafka topic has multiple partitions, the data consumed on the consumer side is incomplete.

The main canal.properties configuration:
######### binlog filter config
canal.instance.filter.druid.ddl = true
canal.instance.filter.query.dcl = true
canal.instance.filter.query.dml = false
canal.instance.filter.query.ddl = true
canal.instance.filter.table.error = true
canal.instance.filter.rows = false
canal.instance.filter.transaction.entry = false
##################################################
######### MQ #############
##################################################
canal.mq.servers = 192.168.1.15:9092,192.169.1.16:9092,192.168.1.15:9092
canal.mq.retries = 3
canal.mq.batchSize = 16384
canal.mq.maxRequestSize = 1048576
canal.mq.lingerMs = 1
canal.mq.bufferMemory = 33554432
canal.mq.canalBatchSize = 50
canal.mq.canalGetTimeout = 100
canal.mq.flatMessage = false
canal.mq.compressionType = none
canal.mq.acks = all
######### use transaction for kafka flatMessage batch produce
canal.mq.transaction = false

The main instance.properties configuration:
######### table regex
canal.instance.filter.regex=schema.table
######### table black regex
canal.instance.filter.black.regex=
######### mq config
canal.mq.topic=topic
canal.mq.partition=0
######### hash partition config
canal.mq.partitionsNum=8
canal.mq.partitionHash=.*\\..*
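
For context on the expected fan-out: with canal.mq.partitionHash=.*\..* and canal.mq.partitionsNum=8, canal hashes each row and spreads a single message across up to 8 partitions. Below is a minimal sketch of that bucketing idea, assuming the hash key is schema.table plus primary key; the exact key canal derives is not shown in this issue, so treat the helper as illustrative, not as MQMessageUtils' real logic:

public class PartitionRoutingSketch {

    // partitionsNum mirrors canal.mq.partitionsNum (8 in this issue)
    static int routeToPartition(String schema, String table, String pk, int partitionsNum) {
        // canal.mq.partitionHash=.*\\..* matches every schema.table, so every
        // row is eligible for hashing; the key composition here is an assumption
        int hash = (schema + "." + table + "#" + pk).hashCode();
        // floorMod avoids negative bucket indexes for negative hash codes
        return Math.floorMod(hash, partitionsNum);
    }

    public static void main(String[] args) {
        // rows of the same table land on different partitions, so the
        // producer must emit one record per non-empty bucket
        for (int pk = 1; pk <= 5; pk++) {
            System.out.println("pk=" + pk + " -> partition "
                + routeToPartition("schema", "table", String.valueOf(pk), 8));
        }
    }
}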

######### Kafka consumer ########
@KafkaListener(topics = "${spring.kafka.consumer.topic}",
        containerFactory = "kafkaListenerContainerFactory")
public void consumerListener(KafkaMessage message, Acknowledgment ack) {
    try {
        boolean success = true;
        Message canalMessage = message.getMessage();
        if (canalMessage != null) {
            if (canalMessage.getId() != -1 && canalMessage.getEntries().size() > 0) {
                success = printEntry(canalMessage.getEntries());
            }
        }
        if (success) {
            ack.acknowledge();
        }
    } catch (Exception e) {
        logger.error(e.getMessage(), e);
    }
}
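
A side note on the listener above: it takes an Acknowledgment parameter, which Spring Kafka only injects when the container runs in manual ack mode. A minimal sketch of the assumed kafkaListenerContainerFactory wiring (Spring Kafka 2.x; the ConsumerFactory and KafkaMessage deserialization details are assumptions, not taken from this issue):

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;

@Configuration
public class KafkaConsumerConfig {

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, KafkaMessage> kafkaListenerContainerFactory(
            ConsumerFactory<String, KafkaMessage> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, KafkaMessage> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        // Acknowledgment is only injected in MANUAL / MANUAL_IMMEDIATE ack mode
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
        // with an 8-partition topic, up to 8 consumer threads can each own one partition
        factory.setConcurrency(8);
        return factory;
    }
}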


wangsaner reopened this Mar 27, 2019
wangsaner changed the title from "v1.1.3-alpha-3: consumer loses data when the Kafka topic has multiple partitions" to "v1.1.3-alpha-3: incomplete data consumed when the Kafka topic has multiple partitions" Mar 27, 2019
agapple (Member) commented Mar 27, 2019

How did you test and verify this? When I previously tested non-flat mode, the data was split across multiple partitions and the total count matched.

wangsaner (Contributor, Author) commented Mar 27, 2019

> How did you test and verify this? When I previously tested non-flat mode, the data was split across multiple partitions and the total count matched.

I simply updated every record in a table. When the topic had a single partition, the consumer received all of the change records; but after switching the topic to 8 partitions and updating the table again, it only received the data from one partition.
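
A quick way to verify this kind of symptom, independent of the listener above, is to tally consumed records per partition with a plain Kafka consumer and compare the total against the number of rows updated. A sketch reusing the broker and topic names from this issue; the group id and poll count are arbitrary:

import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class PartitionTally {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.15:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "partition-tally"); // arbitrary group id
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        Map<Integer, Integer> perPartition = new TreeMap<>();
        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("topic")); // topic name from instance.properties
            for (int i = 0; i < 10; i++) { // poll a few times, then report
                for (ConsumerRecord<byte[], byte[]> record : consumer.poll(Duration.ofSeconds(1))) {
                    perPartition.merge(record.partition(), 1, Integer::sum);
                }
            }
        }
        // with the bug, only one partition shows a count; after the fix, all 8 should
        perPartition.forEach((p, n) -> System.out.println("partition " + p + ": " + n + " records"));
    }
}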

wangsaner (Contributor, Author) commented Mar 27, 2019

Found the problem: when data is split across multiple partitions, only the last split from the for loop is actually sent, because the single record reference is overwritten on each iteration (com.alibaba.otter.canal.kafka.CanalKafkaProducer):

Before:

    private void send(MQProperties.CanalDestination canalDestination, String topicName, Message message)
                                                                                                    throws Exception {
        if (!kafkaProperties.getFlatMessage()) {
            ProducerRecord<String, Message> record = null;
            if (canalDestination.getPartitionHash() != null && !canalDestination.getPartitionHash().isEmpty()) {
                Message[] messages = MQMessageUtils.messagePartition(message,
                    canalDestination.getPartitionsNum(),
                    canalDestination.getPartitionHash());
                int length = messages.length;
                for (int i = 0; i < length; i++) {
                    Message messagePartition = messages[i];
                    if (messagePartition != null) {
                        // BUG (reporter's note): the record built here should be added to a
                        // collection; as written, each iteration overwrites the single
                        // reference, so only the last split survives to the send below
                        record = new ProducerRecord<>(topicName, i, null, messagePartition);
                    }
                }
            } else {
                final int partition = canalDestination.getPartition() != null ? canalDestination.getPartition() : 0;
                record = new ProducerRecord<>(topicName, partition, null, message);
            }
            if (record != null) {
                if (kafkaProperties.getTransaction()) {
                    producer.send(record).get();
                } else {
                    producer.send(record).get();
                }
                if (logger.isDebugEnabled()) {
                    logger.debug("Send message to kafka topic: [{}], packet: {}", topicName, message.toString());
                }
            }
        } else {
            // send flat JSON messages
            List<FlatMessage> flatMessages = MQMessageUtils.messageConverter(message);
            if (flatMessages != null) {
                for (FlatMessage flatMessage : flatMessages) {
                    if (canalDestination.getPartitionHash() != null && !canalDestination.getPartitionHash().isEmpty()) {
                        FlatMessage[] partitionFlatMessage = MQMessageUtils.messagePartition(flatMessage,
                            canalDestination.getPartitionsNum(),
                            canalDestination.getPartitionHash());
                        int length = partitionFlatMessage.length;
                        for (int i = 0; i < length; i++) {
                            FlatMessage flatMessagePart = partitionFlatMessage[i];
                            if (flatMessagePart != null) {
                                produce(topicName, i, flatMessagePart);
                            }
                        }
                    } else {
                        final int partition = canalDestination.getPartition() != null ? canalDestination.getPartition() : 0;
                        produce(topicName, partition, flatMessage);
                    }

                    if (logger.isDebugEnabled()) {
                        logger.debug("Send flat message to kafka topic: [{}], packet: {}",
                            topicName,
                            JSON.toJSONString(flatMessage, SerializerFeature.WriteMapNullValue));
                    }
                }
            }
        }
    }

After:

    private void send(MQProperties.CanalDestination canalDestination, String topicName, Message message)
                                                                                                    throws Exception {
        if (!kafkaProperties.getFlatMessage()) {
            // update: collect every partitioned record instead of overwriting one reference
            List<ProducerRecord<String, Message>> records = new ArrayList<>();
            if (canalDestination.getPartitionHash() != null && !canalDestination.getPartitionHash().isEmpty()) {
                Message[] messages = MQMessageUtils.messagePartition(message,
                    canalDestination.getPartitionsNum(),
                    canalDestination.getPartitionHash());
                int length = messages.length;
                for (int i = 0; i < length; i++) {
                    Message messagePartition = messages[i];
                    if (messagePartition != null) {
                        // update
                        records.add(new ProducerRecord<>(topicName, i, null, messagePartition));
                    }
                }
            } else {
                final int partition = canalDestination.getPartition() != null ? canalDestination.getPartition() : 0;
                // update
                records.add(new ProducerRecord<>(topicName, partition, null, message));
            }
            // update: send every collected record, one per target partition
            if (!records.isEmpty()) {
                for (ProducerRecord<String, Message> record : records) {
                    producer.send(record).get();
                }
                if (logger.isDebugEnabled()) {
                    logger.debug("Send message to kafka topic: [{}], packet: {}", topicName, message.toString());
                }
            }
        } else {
            // send flat JSON messages
            List<FlatMessage> flatMessages = MQMessageUtils.messageConverter(message);
            if (flatMessages != null) {
                for (FlatMessage flatMessage : flatMessages) {
                    if (canalDestination.getPartitionHash() != null && !canalDestination.getPartitionHash().isEmpty()) {
                        FlatMessage[] partitionFlatMessage = MQMessageUtils.messagePartition(flatMessage,
                            canalDestination.getPartitionsNum(),
                            canalDestination.getPartitionHash());
                        int length = partitionFlatMessage.length;
                        for (int i = 0; i < length; i++) {
                            FlatMessage flatMessagePart = partitionFlatMessage[i];
                            if (flatMessagePart != null) {
                                produce(topicName, i, flatMessagePart);
                            }
                        }
                    } else {
                        final int partition = canalDestination.getPartition() != null ? canalDestination.getPartition() : 0;
                        produce(topicName, partition, flatMessage);
                    }
                    if (logger.isDebugEnabled()) {
                        logger.debug("Send flat message to kafka topic: [{}], packet: {}",
                            topicName,
                            JSON.toJSONString(flatMessage, SerializerFeature.WriteMapNullValue));
                    }
                }
            }
        }
    }
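
With this change the non-flat path behaves like the flat-message branch, which already calls produce() for every non-null split inside its loop: every ProducerRecord in records is sent, so each of the 8 partitions receives its share of the partitioned message.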

@agapple

wangsaner added a commit to wangsaner/canal that referenced this issue Mar 27, 2019
In non-flat mode, when the MQ topic has multiple partitions, the split only kept the last piece of data from the for loop, so the consumer could only consume data from one partition
agapple added a commit that referenced this issue Mar 28, 2019
agapple self-assigned this Mar 28, 2019
agapple added the bug label Mar 28, 2019
agapple added this to the v1.1.3 milestone Mar 28, 2019
agapple closed this as completed Mar 28, 2019