Canal integrated with Kafka: configuring multiple destinations causes Kafka transaction exceptions #1826

Closed
2 tasks done
nieyuan1991 opened this issue May 21, 2019 · 4 comments

@nieyuan1991
Contributor

  • I have searched the issues of this repository and believe that this is not a duplicate.
  • I have checked the FAQ of this repository and believe that this is not a duplicate.

Environment

  • canal version 1.1.3
  • mysql version 5.7.22
  • kafka version 1.1.1

Issue Description

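When canal is integrated with Kafka and multiple destinations are configured, Kafka transaction exceptions occur.
Each destination starts its own thread that subscribes to canal. If thread 1 has finished beginTransaction() and thread 2 then calls beginTransaction(), a Kafka transaction exception is thrown, as follows: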

exception trace:

ERROR com.alibaba.otter.canal.server.CanalMQStarter - TransactionalId canal-transactional-id: Invalid transition attempted from state IN_TRANSACTION to state IN_TRANSACTION
org.apache.kafka.common.KafkaException: TransactionalId canal-transactional-id: Invalid transition attempted from state IN_TRANSACTION to state IN_TRANSACTION
        at org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:758) ~[kafka-clients-1.1.1.jar:na]
        at org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:751) ~[kafka-clients-1.1.1.jar:na]
        at org.apache.kafka.clients.producer.internals.TransactionManager.beginTransaction(TransactionManager.java:216) ~[kafka-clients-1.1.1.jar:na]
        at org.apache.kafka.clients.producer.KafkaProducer.beginTransaction(KafkaProducer.java:587) ~[kafka-clients-1.1.1.jar:na]
        at com.alibaba.otter.canal.kafka.CanalKafkaProducer.send(CanalKafkaProducer.java:106) ~[canal.server-1.1.3.jar:na]
        at com.alibaba.otter.canal.server.CanalMQStarter.worker(CanalMQStarter.java:182) [canal.server-1.1.3.jar:na]
        at com.alibaba.otter.canal.server.CanalMQStarter.access$500(CanalMQStarter.java:22) [canal.server-1.1.3.jar:na]
        at com.alibaba.otter.canal.server.CanalMQStarter$CanalMQRunnable.run(CanalMQStarter.java:224) [canal.server-1.1.3.jar:na]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_171]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_171]
        at java.lang.Thread.run(Thread.java:748) [na:1.8.0_171]
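The failure does not need a broker to reproduce. Below is a minimal sketch, assuming a toy state machine: `ToyTransactionalProducer` and `SharedProducerRace` are hypothetical names, not canal or kafka-clients classes, and the toy throws `IllegalStateException` where kafka-clients throws `KafkaException`. It replays the interleaving described above on one shared producer. Making the calls `synchronized` does not help: a producer can hold only one open transaction at a time, so two destinations sharing it collide whenever their transactions overlap.

```java
// Hypothetical, simplified model of a transactional producer. This is NOT
// kafka-clients' TransactionManager; it only mimics the rule that matters here:
// beginTransaction() is legal only when no transaction is open.
class ToyTransactionalProducer {
    enum State { READY, IN_TRANSACTION }

    private final String transactionalId;
    private State state = State.READY;

    ToyTransactionalProducer(String transactionalId) {
        this.transactionalId = transactionalId;
    }

    // synchronized makes each call thread-safe, yet a single instance still
    // cannot serve two callers: it holds at most one open transaction.
    synchronized void beginTransaction() {
        if (state != State.READY) {
            throw new IllegalStateException("TransactionalId " + transactionalId
                    + ": Invalid transition attempted from state " + state
                    + " to state " + State.IN_TRANSACTION);
        }
        state = State.IN_TRANSACTION;
    }

    synchronized void commitTransaction() {
        if (state != State.IN_TRANSACTION) {
            throw new IllegalStateException("TransactionalId " + transactionalId + ": no open transaction");
        }
        state = State.READY;
    }
}

class SharedProducerRace {
    // Replays the interleaving from the issue deterministically: the worker for
    // destination 1 has begun a transaction (not yet committed) when the worker
    // for destination 2 calls beginTransaction() on the same instance.
    static String replay() {
        ToyTransactionalProducer shared = new ToyTransactionalProducer("canal-transactional-id");
        shared.beginTransaction();          // worker thread for destination 1
        try {
            shared.beginTransaction();      // worker thread for destination 2
            return "no error";
        } catch (IllegalStateException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(replay());
    }
}
```

Running `SharedProducerRace.main` prints the same message as the exception above: `TransactionalId canal-transactional-id: Invalid transition attempted from state IN_TRANSACTION to state IN_TRANSACTION`.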
@agapple
Member

agapple commented Jun 19, 2019

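The original design assumed that writing to the MQ would mainly follow a singleton pattern, so sending transactions to multiple queues at the same time currently has problems.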

Workarounds:

  1. Configure only one output queue per canal server
  2. Disable Kafka's transactional message mode
  3. Use a separate producer for each queue
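Workaround 3 can be sketched as a per-destination producer registry. This is a hypothetical toy (`ToyTxProducer`, `PerDestinationProducers`, and the `canal-transactional-id-<destination>` naming are assumptions, not canal code). With real kafka-clients, each producer instance would also need its own `transactional.id`, because a second producer that initializes transactions with the same id fences the first one.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical toy producer: at most one open transaction per instance.
class ToyTxProducer {
    private final String transactionalId;
    private boolean inTransaction;

    ToyTxProducer(String transactionalId) { this.transactionalId = transactionalId; }

    String transactionalId() { return transactionalId; }

    synchronized void beginTransaction() {
        if (inTransaction) throw new IllegalStateException(transactionalId + ": transaction already open");
        inTransaction = true;
    }

    synchronized void commitTransaction() {
        if (!inTransaction) throw new IllegalStateException(transactionalId + ": no open transaction");
        inTransaction = false;
    }
}

// One producer per destination (queue), each with its own transactional id.
// The "canal-transactional-id-" prefix is an assumption for illustration.
class PerDestinationProducers {
    private final Map<String, ToyTxProducer> producers = new ConcurrentHashMap<>();

    ToyTxProducer forDestination(String destination) {
        return producers.computeIfAbsent(destination,
                d -> new ToyTxProducer("canal-transactional-id-" + d));
    }

    // Runs one worker per destination concurrently; each worker opens and commits
    // transactions only on its own producer. Returns the number of commits.
    int runConcurrently(List<String> destinations, int batchesPerDestination) {
        AtomicInteger committed = new AtomicInteger();
        ExecutorService workers = Executors.newFixedThreadPool(destinations.size());
        for (String destination : destinations) {
            workers.execute(() -> {
                ToyTxProducer producer = forDestination(destination);
                for (int i = 0; i < batchesPerDestination; i++) {
                    producer.beginTransaction();
                    // ... send this destination's messages here ...
                    producer.commitTransaction();
                    committed.incrementAndGet();
                }
            });
        }
        workers.shutdown();
        try {
            workers.awaitTermination(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return committed.get();
    }
}
```

Workaround 1 is the degenerate case of the same idea: with a single destination, only one worker ever touches the shared producer.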

@agapple agapple added the bug label Jun 19, 2019
@agapple agapple added this to the v1.1.4 milestone Jun 19, 2019
@hookover

How do you use a separate producer for each queue?
@agapple

@MrYueQ

MrYueQ commented Jun 24, 2019

How do you use a separate producer for each queue?
@agapple

One queue, one producer. You can think of it simply as one Kafka topic corresponding to one canal instance.

@agapple
Member

agapple commented Jun 25, 2019

Later I will rework the Kafka concurrency model and remove the transaction model.

The rough idea:

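  1. Compute the MQ partitions for a batch of data up front
  2. Submit data for multiple partition queues in parallel; within a single partition, use Kafka's batch send mode with asynchronous sends plus callbacks, and consider the submission complete only after the callback for the last message in each partition has fired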
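The plan above can be sketched roughly as follows. This is an illustration under stated assumptions, not canal's actual implementation: `AsyncSender`, `PartitionedBatchSender`, and `LaneSender` are hypothetical, and `Math.floorMod(key.hashCode(), numPartitions)` stands in for canal's real partition hashing. The key assumption is that callbacks fire in send order within a partition, which the kafka-clients `KafkaProducer.send` documentation guarantees for records sent to the same partition; under that assumption, waiting for the last callback of each partition is enough to know the whole partition has been acknowledged.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

// Hypothetical async client: sends one record, later invokes the callback with
// null on success or the exception on failure. Assumed to invoke callbacks in
// send order within a partition.
interface AsyncSender {
    void send(int partition, String record, Consumer<Exception> callback);
}

class PartitionedBatchSender {
    // Step 1: compute each message's partition up front, preserving order.
    static Map<Integer, List<String>> partition(List<String> keys, int numPartitions) {
        Map<Integer, List<String>> byPartition = new TreeMap<>();
        for (String key : keys) {
            int p = Math.floorMod(key.hashCode(), numPartitions); // stand-in partitioner
            byPartition.computeIfAbsent(p, x -> new ArrayList<>()).add(key);
        }
        return byPartition;
    }

    // Step 2: fire all sends without blocking, so partitions progress in parallel.
    // The batch counts as done only once the LAST record of every partition has
    // been called back and no callback reported an error.
    static boolean sendBatch(Map<Integer, List<String>> byPartition, AsyncSender sender, long timeoutMs) {
        CountDownLatch lastCallbacks = new CountDownLatch(byPartition.size());
        AtomicReference<Exception> firstError = new AtomicReference<>();
        for (Map.Entry<Integer, List<String>> entry : byPartition.entrySet()) {
            List<String> records = entry.getValue();
            for (int i = 0; i < records.size(); i++) {
                boolean last = i == records.size() - 1;
                sender.send(entry.getKey(), records.get(i), ex -> {
                    if (ex != null) firstError.compareAndSet(null, ex);
                    if (last) lastCallbacks.countDown();
                });
            }
        }
        try {
            // In-order callbacks mean "last callback fired" implies all earlier ones did.
            return lastCallbacks.await(timeoutMs, TimeUnit.MILLISECONDS) && firstError.get() == null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}

// Test double: one single-threaded "lane" per partition, so each partition's
// records and callbacks complete in order while lanes run concurrently.
class LaneSender implements AsyncSender, AutoCloseable {
    private final Map<Integer, ExecutorService> lanes = new ConcurrentHashMap<>();
    final Map<Integer, List<String>> delivered = new ConcurrentHashMap<>();

    @Override
    public void send(int partition, String record, Consumer<Exception> callback) {
        lanes.computeIfAbsent(partition, p -> Executors.newSingleThreadExecutor()).execute(() -> {
            delivered.computeIfAbsent(partition, p -> Collections.synchronizedList(new ArrayList<>())).add(record);
            callback.accept(null);
        });
    }

    @Override
    public void close() {
        lanes.values().forEach(ExecutorService::shutdown);
    }
}
```

Without a transaction, a failed batch is reported (here `sendBatch` returns `false`) rather than rolled back, so retrying it may deliver some records twice; consumers would presumably need to tolerate duplicates.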
