From 8a78728167fb3f0b8fe51c21303095bf68410758 Mon Sep 17 00:00:00 2001 From: zhangjukai Date: Thu, 10 Feb 2022 16:41:18 +0800 Subject: [PATCH 1/7] 1. Add the com.alibaba.otter.canal.connector.core.spi.CanalMsgConsumer file to the META-INF/canal directory under the pulsarmq-connector project 2. Introduce pulsar-client-admin for Canal to automatically create multi-partition topics 3. Add the judgment that roleToken is null 4. The disconnect method in CanalPulsarMQConsumer removes this.pulsarMQConsumer.unsubscribe();, this code will cause data loss during stop 5. When getting Pulsar messages, they are all processed as flat messages, because CanalMessageSerializerUtil.deserializer(data) will deserialize exceptions 6. Use groupId as subscriptName, without the pulsarmq.subscriptName parameter, the entire adapter will be the same subscriber name using pulsarmq.subscriptName --- client/pom.xml | 5 + connector/pulsarmq-connector/pom.xml | 4 + .../pulsarmq/config/PulsarMQConstants.java | 4 + .../config/PulsarMQProducerConfig.java | 12 ++ .../consumer/CanalPulsarMQConsumer.java | 31 ++--- .../producer/CanalPulsarMQProducer.java | 110 +++++++++++++++--- ....canal.connector.core.spi.CanalMsgConsumer | 1 + pom.xml | 7 +- 8 files changed, 139 insertions(+), 35 deletions(-) create mode 100644 connector/pulsarmq-connector/src/main/resources/META-INF/canal/com.alibaba.otter.canal.connector.core.spi.CanalMsgConsumer diff --git a/client/pom.xml b/client/pom.xml index 70a6107613..5e06afd820 100644 --- a/client/pom.xml +++ b/client/pom.xml @@ -138,6 +138,11 @@ pulsar-client provided + + org.apache.pulsar + pulsar-client-admin + provided + diff --git a/connector/pulsarmq-connector/pom.xml b/connector/pulsarmq-connector/pom.xml index d1193944bd..5fc5e63aeb 100644 --- a/connector/pulsarmq-connector/pom.xml +++ b/connector/pulsarmq-connector/pom.xml @@ -38,6 +38,10 @@ org.apache.pulsar pulsar-client + + org.apache.pulsar + pulsar-client-admin + diff --git a/connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/config/PulsarMQConstants.java b/connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/config/PulsarMQConstants.java index 19df91fe0c..55c8d80b32 100644 --- a/connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/config/PulsarMQConstants.java +++ b/connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/config/PulsarMQConstants.java @@ -57,6 +57,10 @@ public class PulsarMQConstants { * 最大重试次数 */ public static final String PULSARMQ_MAX_REDELIVERY_COUNT = ROOT + "." + "maxRedeliveryCount"; + /** + * Pulsar admin服务器地址 + */ + public static final String PULSARMQ_ADMIN_SERVER_URL = ROOT + "." + "adminServerUrl"; } diff --git a/connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/config/PulsarMQProducerConfig.java b/connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/config/PulsarMQProducerConfig.java index c7fc9c9aea..a984e79e47 100644 --- a/connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/config/PulsarMQProducerConfig.java +++ b/connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/config/PulsarMQProducerConfig.java @@ -30,6 +30,10 @@ public class PulsarMQProducerConfig extends MQProperties { * 生产者角色权限,请确保该角色有canal使用的所有topic生产者权限(最低要求) */ private String roleToken; + /** + * admin服务器地址 + */ + private String adminServerUrl; public String getServerUrl() { return serverUrl; @@ -54,4 +58,12 @@ public String getTopicTenantPrefix() { public void setTopicTenantPrefix(String topicTenantPrefix) { this.topicTenantPrefix = topicTenantPrefix; } + + public String getAdminServerUrl() { + return adminServerUrl; + } + + public void setAdminServerUrl(String adminServerUrl) { + this.adminServerUrl = adminServerUrl; + } } diff --git a/connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/consumer/CanalPulsarMQConsumer.java b/connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/consumer/CanalPulsarMQConsumer.java index fc9369a109..f418f818d4 100644 --- a/connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/consumer/CanalPulsarMQConsumer.java +++ b/connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/consumer/CanalPulsarMQConsumer.java @@ -6,20 +6,14 @@ import com.alibaba.otter.canal.connector.core.consumer.CommonMessage; import com.alibaba.otter.canal.connector.core.spi.CanalMsgConsumer; import com.alibaba.otter.canal.connector.core.spi.SPI; -import com.alibaba.otter.canal.connector.core.util.CanalMessageSerializerUtil; -import com.alibaba.otter.canal.connector.core.util.MessageUtil; import com.alibaba.otter.canal.connector.pulsarmq.config.PulsarMQConstants; -import com.alibaba.otter.canal.protocol.Message; import com.alibaba.otter.canal.protocol.exception.CanalClientException; import com.google.common.collect.Lists; import org.apache.commons.lang.StringUtils; import org.apache.pulsar.client.api.*; -import java.time.LocalDateTime; -import java.time.format.DateTimeFormatter; import java.util.List; import java.util.Properties; -import java.util.UUID; import java.util.concurrent.TimeUnit; /** @@ -112,7 +106,9 @@ public void init(Properties properties, String topic, String groupId) { } this.serviceUrl = properties.getProperty(PulsarMQConstants.PULSARMQ_SERVER_URL); this.roleToken = properties.getProperty(PulsarMQConstants.PULSARMQ_ROLE_TOKEN); - this.subscriptName = properties.getProperty(PulsarMQConstants.PULSARMQ_SUBSCRIPT_NAME); + //this.subscriptName = properties.getProperty(PulsarMQConstants.PULSARMQ_SUBSCRIPT_NAME); + // 采用groupId作为subscriptName,避免所有的都是同一个订阅者名称 + this.subscriptName = groupId; if (StringUtils.isEmpty(this.subscriptName)) { throw new RuntimeException("Pulsar Consumer subscriptName required"); } @@ -157,10 +153,12 @@ public void connect() { } // 连接创建客户端 try { - pulsarClient = PulsarClient.builder() - .serviceUrl(serviceUrl) - .authentication(AuthenticationFactory.token(roleToken)) - .build(); + //AuthenticationDataProvider + ClientBuilder builder = PulsarClient.builder().serviceUrl(serviceUrl); + if(StringUtils.isNotEmpty(roleToken)) { + builder.authentication(AuthenticationFactory.token(roleToken)); + } + pulsarClient = builder.build(); } catch (PulsarClientException e) { throw new RuntimeException(e); } @@ -220,7 +218,6 @@ public List getMessage(Long timeout, TimeUnit unit) { List messageList = Lists.newArrayList(); try { Messages messages = pulsarMQConsumer.batchReceive(); - if (null == messages || messages.size() == 0) { return messageList; } @@ -228,14 +225,17 @@ public List getMessage(Long timeout, TimeUnit unit) { this.lastGetBatchMessage = messages; for (org.apache.pulsar.client.api.Message msg : messages) { byte[] data = msg.getData(); - if (!this.flatMessage) { + /*if (!this.flatMessage) { Message message = CanalMessageSerializerUtil.deserializer(data); List list = MessageUtil.convert(message); messageList.addAll(list); } else { CommonMessage commonMessage = JSON.parseObject(data, CommonMessage.class); messageList.add(commonMessage); - } + }*/ + // CanalMessageSerializerUtil.deserializer(data) 会转换失败 + CommonMessage commonMessage = JSON.parseObject(data, CommonMessage.class); + messageList.add(commonMessage); } } catch (PulsarClientException e) { throw new CanalClientException("Receive pulsar batch message error", e); @@ -278,7 +278,8 @@ public void disconnect() { return; } try { - this.pulsarMQConsumer.unsubscribe(); + // 会导致暂停期间数据丢失 + // this.pulsarMQConsumer.unsubscribe(); this.pulsarClient.close(); } catch (PulsarClientException e) { throw new CanalClientException("Disconnect pulsar consumer error", e); diff --git a/connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/producer/CanalPulsarMQProducer.java b/connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/producer/CanalPulsarMQProducer.java index 4c2c8f38b5..636a863c98 100644 --- a/connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/producer/CanalPulsarMQProducer.java +++ b/connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/producer/CanalPulsarMQProducer.java @@ -17,6 +17,8 @@ import com.alibaba.otter.canal.protocol.CanalEntry; import com.alibaba.otter.canal.protocol.FlatMessage; import org.apache.commons.lang.StringUtils; +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.*; import org.apache.pulsar.shade.com.google.gson.JsonParser; import org.slf4j.Logger; @@ -50,6 +52,10 @@ public class CanalPulsarMQProducer extends AbstractMQProducer implements CanalMQ * pulsar客户端,管理连接 */ protected PulsarClient client; + /** + * Pulsar admin 客户端 + */ + protected PulsarAdmin pulsarAdmin; @Override public void init(Properties properties) { @@ -61,17 +67,28 @@ public void init(Properties properties) { // 初始化连接客户端 try { - client = PulsarClient.builder() + ClientBuilder builder = PulsarClient.builder() // 填写pulsar的连接地址 - .serviceUrl(pulsarMQProducerConfig.getServerUrl()) - // 角色权限认证的token - .authentication(AuthenticationFactory.token(pulsarMQProducerConfig.getRoleToken())) - .build(); + .serviceUrl(pulsarMQProducerConfig.getServerUrl()); + if(StringUtils.isNotEmpty(pulsarMQProducerConfig.getRoleToken())) { + // 角色权限认证的token + builder.authentication(AuthenticationFactory.token(pulsarMQProducerConfig.getRoleToken())); + } + client = builder.build(); } catch (PulsarClientException e) { throw new RuntimeException(e); } - // 加载所有生产者 --> topic可能有正则或表名,无法确认所有topic,在使用时再加载 + // 初始化Pulsar admin + if(StringUtils.isNotEmpty(pulsarMQProducerConfig.getAdminServerUrl())) { + try { + pulsarAdmin = PulsarAdmin.builder().serviceHttpUrl(pulsarMQProducerConfig.getAdminServerUrl()).build(); + } catch (PulsarClientException e) { + throw new RuntimeException(e); + } + } + + // 加载所有生产者 --> topic可能有正则或表名,无法确认所有topic,在使用时再加载 int parallelPartitionSendThreadSize = mqProperties.getParallelSendThreadSize(); sendPartitionExecutor = new ThreadPoolExecutor(parallelPartitionSendThreadSize, parallelPartitionSendThreadSize, @@ -106,6 +123,10 @@ private void loadPulsarMQProperties(Properties properties) { if (!StringUtils.isEmpty(topicTenantPrefix)) { tmpProperties.setTopicTenantPrefix(topicTenantPrefix); } + String adminServerUrl = PropertiesUtils.getProperty(properties, PulsarMQConstants.PULSARMQ_ADMIN_SERVER_URL); + if(!StringUtils.isEmpty(adminServerUrl)) { + tmpProperties.setAdminServerUrl(adminServerUrl); + } if (logger.isDebugEnabled()) { logger.debug("Load pulsar properties ==> {}", JSON.toJSON(this.mqProperties)); } @@ -204,15 +225,16 @@ public void send(final MQDestination destination, String topicName, com.alibaba. for (int i = 0; i < len; i++) { final int partition = i; com.alibaba.otter.canal.protocol.Message m = messages[i]; + Integer topicPartitionNum = partitionNum; template.submit(() -> { - sendMessage(topicName, partition, m); + sendMessage(topicName, topicPartitionNum, partition, m); }); } } } else { // 默认分区 final int partition = destination.getPartition() != null ? destination.getPartition() : 0; - sendMessage(topicName, partition, message); + sendMessage(topicName, partitionNum, partition, message); } } else { // 串行分区 @@ -244,9 +266,10 @@ public void send(final MQDestination destination, String topicName, com.alibaba. final List flatMessagePart = partitionFlatMessages.get(i); if (flatMessagePart != null && flatMessagePart.size() > 0) { final int partition = i; + Integer topicPartitionNum = partitionNum; template.submit(() -> { // 批量发送 - sendMessage(topicName, partition, flatMessagePart); + sendMessage(topicName, topicPartitionNum, partition, flatMessagePart); }); } } @@ -256,7 +279,7 @@ public void send(final MQDestination destination, String topicName, com.alibaba. } else { // 默认分区 final int partition = destination.getPartition() != null ? destination.getPartition() : 0; - sendMessage(topicName, partition, flatMessages); + sendMessage(topicName, partitionNum, partition, flatMessages); } } } @@ -265,19 +288,20 @@ public void send(final MQDestination destination, String topicName, com.alibaba. * 发送原始消息,需要做分区处理 * * @param topic topic - * @param partitionNum 目标分区 + * @param topicPartitionNum topic分区数据 + * @param targetPartitionNum 目标分区 * @param msg 原始消息内容 * @return void * @date 2021/9/10 17:55 * @author chad * @since 1 by chad at 2021/9/10 新增 */ - private void sendMessage(String topic, int partitionNum, com.alibaba.otter.canal.protocol.Message msg) { - Producer producer = getProducer(topic); + private void sendMessage(String topic,Integer topicPartitionNum, int targetPartitionNum, com.alibaba.otter.canal.protocol.Message msg) { + Producer producer = getProducer(topic,topicPartitionNum); byte[] msgBytes = CanalMessageSerializerUtil.serializer(msg, mqProperties.isFilterTransactionEntry()); try { MessageId msgResultId = producer.newMessage() - .property(MSG_PROPERTY_PARTITION_NAME, String.valueOf(partitionNum)) + .property(MSG_PROPERTY_PARTITION_NAME, String.valueOf(targetPartitionNum)) .value(msgBytes).send(); // todo 判断发送结果 if (logger.isDebugEnabled()) { @@ -298,13 +322,13 @@ private void sendMessage(String topic, int partitionNum, com.alibaba.otter.canal * @author chad * @since 1 by chad at 2021/9/10 新增 */ - private void sendMessage(String topic, int partition, List flatMessages) { - Producer producer = getProducer(topic); + private void sendMessage(String topic, Integer topicPartitionNum, int targetPartitionNum, List flatMessages) { + Producer producer = getProducer(topic,topicPartitionNum); for (FlatMessage f : flatMessages) { try { MessageId msgResultId = producer .newMessage() - .property(MSG_PROPERTY_PARTITION_NAME, String.valueOf(partition)) + .property(MSG_PROPERTY_PARTITION_NAME, String.valueOf(targetPartitionNum)) .value(JSON.toJSONBytes(f, SerializerFeature.WriteMapNullValue)) .send() // @@ -318,6 +342,45 @@ private void sendMessage(String topic, int partition, List flatMess } } + private Producer getProducer(String topic) { + Producer producer = PRODUCERS.get(topic); + + if (null == producer) { + try { + synchronized (PRODUCERS) { + producer = PRODUCERS.get(topic); + if (null != producer) { + return producer; + } + + // 拼接topic前缀 + PulsarMQProducerConfig pulsarMQProperties = (PulsarMQProducerConfig) this.mqProperties; + String prefix = pulsarMQProperties.getTopicTenantPrefix(); + String fullTopic = topic; + if (!StringUtils.isEmpty(prefix)) { + if (!prefix.endsWith("/")) { + fullTopic = "/" + fullTopic; + } + fullTopic = pulsarMQProperties.getTopicTenantPrefix() + fullTopic; + } + + // 创建指定topic的生产者 + producer = client.newProducer() + .topic(fullTopic) + // 指定路由器 + .messageRouter(new MessageRouterImpl(topic)) + .create(); + // 放入缓存 + PRODUCERS.put(topic, producer); + } + } catch (PulsarClientException e) { + logger.error("create producer failed for topic: " + topic, e); + throw new RuntimeException(e); + } + } + + return producer; + } /** * 获取指定topic的生产者,并且使用缓存 * @@ -327,7 +390,7 @@ private void sendMessage(String topic, int partition, List flatMess * @author chad * @since 1 by chad at 2021/9/10 新增 */ - private Producer getProducer(String topic) { + private Producer getProducer(String topic,Integer partitionNum) { Producer producer = PRODUCERS.get(topic); if (null == producer) { @@ -349,6 +412,15 @@ private Producer getProducer(String topic) { fullTopic = pulsarMQProperties.getTopicTenantPrefix() + fullTopic; } + // 创建分区topic + if(pulsarAdmin!=null && partitionNum!=null && partitionNum>0) { + try { + pulsarAdmin.topics().createPartitionedTopic(fullTopic, partitionNum); + } catch (PulsarAdminException e) { + // TODO 无论是否报错,都继续后续的操作,此处不进行阻塞 + } + } + // 创建指定topic的生产者 producer = client.newProducer() .topic(fullTopic) @@ -405,7 +477,7 @@ public int choosePartition(Message msg, TopicMetadata metadata) { @Override public void stop() { - logger.info("## Stop RocketMQ producer##"); + logger.info("## Stop PulsarMQ producer##"); for (Producer p : PRODUCERS.values()) { try { diff --git a/connector/pulsarmq-connector/src/main/resources/META-INF/canal/com.alibaba.otter.canal.connector.core.spi.CanalMsgConsumer b/connector/pulsarmq-connector/src/main/resources/META-INF/canal/com.alibaba.otter.canal.connector.core.spi.CanalMsgConsumer new file mode 100644 index 0000000000..54cdb8aab6 --- /dev/null +++ b/connector/pulsarmq-connector/src/main/resources/META-INF/canal/com.alibaba.otter.canal.connector.core.spi.CanalMsgConsumer @@ -0,0 +1 @@ +pulsarmq=com.alibaba.otter.canal.connector.pulsarmq.consumer.CanalPulsarMQConsumer \ No newline at end of file diff --git a/pom.xml b/pom.xml index 65605933a0..f84a81ef0c 100644 --- a/pom.xml +++ b/pom.xml @@ -260,7 +260,7 @@ com.alibaba druid - 1.2.6 + 1.2.8 com.lmax @@ -346,6 +346,11 @@ pulsar-client 2.8.1 + + org.apache.pulsar + pulsar-client-admin + 2.8.1 + From 49bf3527e988b27a8e879b131cc1a3c5c504e188 Mon Sep 17 00:00:00 2001 From: zhangjukai Date: Thu, 10 Feb 2022 18:17:28 +0800 Subject: [PATCH 2/7] =?UTF-8?q?=E9=80=BB=E8=BE=91=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../producer/CanalPulsarMQProducer.java | 65 ++++--------------- 1 file changed, 13 insertions(+), 52 deletions(-) diff --git a/connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/producer/CanalPulsarMQProducer.java b/connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/producer/CanalPulsarMQProducer.java index 636a863c98..874e9d8b39 100644 --- a/connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/producer/CanalPulsarMQProducer.java +++ b/connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/producer/CanalPulsarMQProducer.java @@ -203,6 +203,9 @@ public void send(final MQDestination destination, String topicName, com.alibaba. if (partitionNum == null) { partitionNum = destination.getPartitionsNum(); } + // 提前获取Producer对象,同时支持多分区topic的创建 + Producer producer = getProducer(topicName, partitionNum); + ExecutorTemplate template = new ExecutorTemplate(sendPartitionExecutor); // 并发构造 MQMessageUtils.EntryRowData[] datas = MQMessageUtils.buildMessageData(message, buildExecutor); @@ -225,16 +228,15 @@ public void send(final MQDestination destination, String topicName, com.alibaba. for (int i = 0; i < len; i++) { final int partition = i; com.alibaba.otter.canal.protocol.Message m = messages[i]; - Integer topicPartitionNum = partitionNum; template.submit(() -> { - sendMessage(topicName, topicPartitionNum, partition, m); + sendMessage(topicName, partition, m); }); } } } else { // 默认分区 final int partition = destination.getPartition() != null ? destination.getPartition() : 0; - sendMessage(topicName, partitionNum, partition, message); + sendMessage(topicName, partition, message); } } else { // 串行分区 @@ -266,10 +268,9 @@ public void send(final MQDestination destination, String topicName, com.alibaba. final List flatMessagePart = partitionFlatMessages.get(i); if (flatMessagePart != null && flatMessagePart.size() > 0) { final int partition = i; - Integer topicPartitionNum = partitionNum; template.submit(() -> { // 批量发送 - sendMessage(topicName, topicPartitionNum, partition, flatMessagePart); + sendMessage(topicName, partition, flatMessagePart); }); } } @@ -279,7 +280,7 @@ public void send(final MQDestination destination, String topicName, com.alibaba. } else { // 默认分区 final int partition = destination.getPartition() != null ? destination.getPartition() : 0; - sendMessage(topicName, partitionNum, partition, flatMessages); + sendMessage(topicName, partition, flatMessages); } } } @@ -288,20 +289,19 @@ public void send(final MQDestination destination, String topicName, com.alibaba. * 发送原始消息,需要做分区处理 * * @param topic topic - * @param topicPartitionNum topic分区数据 - * @param targetPartitionNum 目标分区 + * @param partitionNum 目标分区 * @param msg 原始消息内容 * @return void * @date 2021/9/10 17:55 * @author chad * @since 1 by chad at 2021/9/10 新增 */ - private void sendMessage(String topic,Integer topicPartitionNum, int targetPartitionNum, com.alibaba.otter.canal.protocol.Message msg) { - Producer producer = getProducer(topic,topicPartitionNum); + private void sendMessage(String topic, int partitionNum, com.alibaba.otter.canal.protocol.Message msg) { + Producer producer = getProducer(topic,null); byte[] msgBytes = CanalMessageSerializerUtil.serializer(msg, mqProperties.isFilterTransactionEntry()); try { MessageId msgResultId = producer.newMessage() - .property(MSG_PROPERTY_PARTITION_NAME, String.valueOf(targetPartitionNum)) + .property(MSG_PROPERTY_PARTITION_NAME, String.valueOf(partitionNum)) .value(msgBytes).send(); // todo 判断发送结果 if (logger.isDebugEnabled()) { @@ -322,8 +322,8 @@ private void sendMessage(String topic,Integer topicPartitionNum, int targetParti * @author chad * @since 1 by chad at 2021/9/10 新增 */ - private void sendMessage(String topic, Integer topicPartitionNum, int targetPartitionNum, List flatMessages) { - Producer producer = getProducer(topic,topicPartitionNum); + private void sendMessage(String topic, int targetPartitionNum, List flatMessages) { + Producer producer = getProducer(topic,null); for (FlatMessage f : flatMessages) { try { MessageId msgResultId = producer @@ -342,45 +342,6 @@ private void sendMessage(String topic, Integer topicPartitionNum, int targetPart } } - private Producer getProducer(String topic) { - Producer producer = PRODUCERS.get(topic); - - if (null == producer) { - try { - synchronized (PRODUCERS) { - producer = PRODUCERS.get(topic); - if (null != producer) { - return producer; - } - - // 拼接topic前缀 - PulsarMQProducerConfig pulsarMQProperties = (PulsarMQProducerConfig) this.mqProperties; - String prefix = pulsarMQProperties.getTopicTenantPrefix(); - String fullTopic = topic; - if (!StringUtils.isEmpty(prefix)) { - if (!prefix.endsWith("/")) { - fullTopic = "/" + fullTopic; - } - fullTopic = pulsarMQProperties.getTopicTenantPrefix() + fullTopic; - } - - // 创建指定topic的生产者 - producer = client.newProducer() - .topic(fullTopic) - // 指定路由器 - .messageRouter(new MessageRouterImpl(topic)) - .create(); - // 放入缓存 - PRODUCERS.put(topic, producer); - } - } catch (PulsarClientException e) { - logger.error("create producer failed for topic: " + topic, e); - throw new RuntimeException(e); - } - } - - return producer; - } /** * 获取指定topic的生产者,并且使用缓存 * From 81de668986a9749a95ad847b8b1bcb0564794ad1 Mon Sep 17 00:00:00 2001 From: zhangjukai Date: Thu, 10 Feb 2022 18:29:40 +0800 Subject: [PATCH 3/7] =?UTF-8?q?=E4=BB=A3=E7=A0=81=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../pulsarmq/producer/CanalPulsarMQProducer.java | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/producer/CanalPulsarMQProducer.java b/connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/producer/CanalPulsarMQProducer.java index 874e9d8b39..15edef5d16 100644 --- a/connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/producer/CanalPulsarMQProducer.java +++ b/connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/producer/CanalPulsarMQProducer.java @@ -289,19 +289,19 @@ public void send(final MQDestination destination, String topicName, com.alibaba. * 发送原始消息,需要做分区处理 * * @param topic topic - * @param partitionNum 目标分区 + * @param partition 目标分区 * @param msg 原始消息内容 * @return void * @date 2021/9/10 17:55 * @author chad * @since 1 by chad at 2021/9/10 新增 */ - private void sendMessage(String topic, int partitionNum, com.alibaba.otter.canal.protocol.Message msg) { - Producer producer = getProducer(topic,null); + private void sendMessage(String topic, int partition, com.alibaba.otter.canal.protocol.Message msg) { + Producer producer = PRODUCERS.get(topic); byte[] msgBytes = CanalMessageSerializerUtil.serializer(msg, mqProperties.isFilterTransactionEntry()); try { MessageId msgResultId = producer.newMessage() - .property(MSG_PROPERTY_PARTITION_NAME, String.valueOf(partitionNum)) + .property(MSG_PROPERTY_PARTITION_NAME, String.valueOf(partition)) .value(msgBytes).send(); // todo 判断发送结果 if (logger.isDebugEnabled()) { @@ -322,13 +322,13 @@ private void sendMessage(String topic, int partitionNum, com.alibaba.otter.canal * @author chad * @since 1 by chad at 2021/9/10 新增 */ - private void sendMessage(String topic, int targetPartitionNum, List flatMessages) { - Producer producer = getProducer(topic,null); + private void sendMessage(String topic, int partition, List flatMessages) { + Producer producer = PRODUCERS.get(topic); for (FlatMessage f : flatMessages) { try { MessageId msgResultId = producer .newMessage() - .property(MSG_PROPERTY_PARTITION_NAME, String.valueOf(targetPartitionNum)) + .property(MSG_PROPERTY_PARTITION_NAME, String.valueOf(partition)) .value(JSON.toJSONBytes(f, SerializerFeature.WriteMapNullValue)) .send() // From 4cf1765fac32885d462893705f5de9436154f2b0 Mon Sep 17 00:00:00 2001 From: zhangjukai Date: Fri, 11 Feb 2022 10:38:38 +0800 Subject: [PATCH 4/7] =?UTF-8?q?=E9=92=88=E5=AF=B9pulsar=E7=9A=84=E8=B0=83?= =?UTF-8?q?=E6=95=B4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- client/pom.xml | 5 ++ connector/pulsarmq-connector/pom.xml | 4 ++ .../pulsarmq/config/PulsarMQConstants.java | 4 ++ .../config/PulsarMQProducerConfig.java | 12 ++++ .../consumer/CanalPulsarMQConsumer.java | 31 +++++----- .../producer/CanalPulsarMQProducer.java | 59 +++++++++++++++---- ....canal.connector.core.spi.CanalMsgConsumer | 1 + pom.xml | 7 ++- 8 files changed, 94 insertions(+), 29 deletions(-) create mode 100644 connector/pulsarmq-connector/src/main/resources/META-INF/canal/com.alibaba.otter.canal.connector.core.spi.CanalMsgConsumer diff --git a/client/pom.xml b/client/pom.xml index 70a6107613..5e06afd820 100644 --- a/client/pom.xml +++ b/client/pom.xml @@ -138,6 +138,11 @@ pulsar-client provided + + org.apache.pulsar + pulsar-client-admin + provided + diff --git a/connector/pulsarmq-connector/pom.xml b/connector/pulsarmq-connector/pom.xml index d1193944bd..5fc5e63aeb 100644 --- a/connector/pulsarmq-connector/pom.xml +++ b/connector/pulsarmq-connector/pom.xml @@ -38,6 +38,10 @@ org.apache.pulsar pulsar-client + + org.apache.pulsar + pulsar-client-admin + diff --git a/connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/config/PulsarMQConstants.java b/connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/config/PulsarMQConstants.java index 19df91fe0c..55c8d80b32 100644 --- a/connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/config/PulsarMQConstants.java +++ b/connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/config/PulsarMQConstants.java @@ -57,6 +57,10 @@ public class PulsarMQConstants { * 最大重试次数 */ public static final String PULSARMQ_MAX_REDELIVERY_COUNT = ROOT + "." + "maxRedeliveryCount"; + /** + * Pulsar admin服务器地址 + */ + public static final String PULSARMQ_ADMIN_SERVER_URL = ROOT + "." + "adminServerUrl"; } diff --git a/connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/config/PulsarMQProducerConfig.java b/connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/config/PulsarMQProducerConfig.java index c7fc9c9aea..a984e79e47 100644 --- a/connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/config/PulsarMQProducerConfig.java +++ b/connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/config/PulsarMQProducerConfig.java @@ -30,6 +30,10 @@ public class PulsarMQProducerConfig extends MQProperties { * 生产者角色权限,请确保该角色有canal使用的所有topic生产者权限(最低要求) */ private String roleToken; + /** + * admin服务器地址 + */ + private String adminServerUrl; public String getServerUrl() { return serverUrl; @@ -54,4 +58,12 @@ public String getTopicTenantPrefix() { public void setTopicTenantPrefix(String topicTenantPrefix) { this.topicTenantPrefix = topicTenantPrefix; } + + public String getAdminServerUrl() { + return adminServerUrl; + } + + public void setAdminServerUrl(String adminServerUrl) { + this.adminServerUrl = adminServerUrl; + } } diff --git a/connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/consumer/CanalPulsarMQConsumer.java b/connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/consumer/CanalPulsarMQConsumer.java index fc9369a109..f418f818d4 100644 --- a/connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/consumer/CanalPulsarMQConsumer.java +++ b/connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/consumer/CanalPulsarMQConsumer.java @@ -6,20 +6,14 @@ import com.alibaba.otter.canal.connector.core.consumer.CommonMessage; import com.alibaba.otter.canal.connector.core.spi.CanalMsgConsumer; import com.alibaba.otter.canal.connector.core.spi.SPI; -import com.alibaba.otter.canal.connector.core.util.CanalMessageSerializerUtil; -import com.alibaba.otter.canal.connector.core.util.MessageUtil; import com.alibaba.otter.canal.connector.pulsarmq.config.PulsarMQConstants; -import com.alibaba.otter.canal.protocol.Message; import com.alibaba.otter.canal.protocol.exception.CanalClientException; import com.google.common.collect.Lists; import org.apache.commons.lang.StringUtils; import org.apache.pulsar.client.api.*; -import java.time.LocalDateTime; -import java.time.format.DateTimeFormatter; import java.util.List; import java.util.Properties; -import java.util.UUID; import java.util.concurrent.TimeUnit; /** @@ -112,7 +106,9 @@ public void init(Properties properties, String topic, String groupId) { } this.serviceUrl = properties.getProperty(PulsarMQConstants.PULSARMQ_SERVER_URL); this.roleToken = properties.getProperty(PulsarMQConstants.PULSARMQ_ROLE_TOKEN); - this.subscriptName = properties.getProperty(PulsarMQConstants.PULSARMQ_SUBSCRIPT_NAME); + //this.subscriptName = properties.getProperty(PulsarMQConstants.PULSARMQ_SUBSCRIPT_NAME); + // 采用groupId作为subscriptName,避免所有的都是同一个订阅者名称 + this.subscriptName = groupId; if (StringUtils.isEmpty(this.subscriptName)) { throw new RuntimeException("Pulsar Consumer subscriptName required"); } @@ -157,10 +153,12 @@ public void connect() { } // 连接创建客户端 try { - pulsarClient = PulsarClient.builder() - .serviceUrl(serviceUrl) - .authentication(AuthenticationFactory.token(roleToken)) - .build(); + //AuthenticationDataProvider + ClientBuilder builder = PulsarClient.builder().serviceUrl(serviceUrl); + if(StringUtils.isNotEmpty(roleToken)) { + builder.authentication(AuthenticationFactory.token(roleToken)); + } + pulsarClient = builder.build(); } catch (PulsarClientException e) { throw new RuntimeException(e); } @@ -220,7 +218,6 @@ public List getMessage(Long timeout, TimeUnit unit) { List messageList = Lists.newArrayList(); try { Messages messages = pulsarMQConsumer.batchReceive(); - if (null == messages || messages.size() == 0) { return messageList; } @@ -228,14 +225,17 @@ public List getMessage(Long timeout, TimeUnit unit) { this.lastGetBatchMessage = messages; for (org.apache.pulsar.client.api.Message msg : messages) { byte[] data = msg.getData(); - if (!this.flatMessage) { + /*if (!this.flatMessage) { Message message = CanalMessageSerializerUtil.deserializer(data); List list = MessageUtil.convert(message); messageList.addAll(list); } else { CommonMessage commonMessage = JSON.parseObject(data, CommonMessage.class); messageList.add(commonMessage); - } + }*/ + // CanalMessageSerializerUtil.deserializer(data) 会转换失败 + CommonMessage commonMessage = JSON.parseObject(data, CommonMessage.class); + messageList.add(commonMessage); } } catch (PulsarClientException e) { throw new CanalClientException("Receive pulsar batch message error", e); @@ -278,7 +278,8 @@ public void disconnect() { return; } try { - this.pulsarMQConsumer.unsubscribe(); + // 会导致暂停期间数据丢失 + // this.pulsarMQConsumer.unsubscribe(); this.pulsarClient.close(); } catch (PulsarClientException e) { throw new CanalClientException("Disconnect pulsar consumer error", e); diff --git a/connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/producer/CanalPulsarMQProducer.java b/connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/producer/CanalPulsarMQProducer.java index 4c2c8f38b5..15edef5d16 100644 --- a/connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/producer/CanalPulsarMQProducer.java +++ b/connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/producer/CanalPulsarMQProducer.java @@ -17,6 +17,8 @@ import com.alibaba.otter.canal.protocol.CanalEntry; import com.alibaba.otter.canal.protocol.FlatMessage; import org.apache.commons.lang.StringUtils; +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.*; import org.apache.pulsar.shade.com.google.gson.JsonParser; import org.slf4j.Logger; @@ -50,6 +52,10 @@ public class CanalPulsarMQProducer extends AbstractMQProducer implements CanalMQ * pulsar客户端,管理连接 */ protected PulsarClient client; + /** + * Pulsar admin 客户端 + */ + protected PulsarAdmin pulsarAdmin; @Override public void init(Properties properties) { @@ -61,17 +67,28 @@ public void init(Properties properties) { // 初始化连接客户端 try { - client = PulsarClient.builder() + ClientBuilder builder = PulsarClient.builder() // 填写pulsar的连接地址 - .serviceUrl(pulsarMQProducerConfig.getServerUrl()) - // 角色权限认证的token - .authentication(AuthenticationFactory.token(pulsarMQProducerConfig.getRoleToken())) - .build(); + .serviceUrl(pulsarMQProducerConfig.getServerUrl()); + if(StringUtils.isNotEmpty(pulsarMQProducerConfig.getRoleToken())) { + // 角色权限认证的token + builder.authentication(AuthenticationFactory.token(pulsarMQProducerConfig.getRoleToken())); + } + client = builder.build(); } catch (PulsarClientException e) { throw new RuntimeException(e); } - // 加载所有生产者 --> topic可能有正则或表名,无法确认所有topic,在使用时再加载 + // 初始化Pulsar admin + if(StringUtils.isNotEmpty(pulsarMQProducerConfig.getAdminServerUrl())) { + try { + pulsarAdmin = PulsarAdmin.builder().serviceHttpUrl(pulsarMQProducerConfig.getAdminServerUrl()).build(); + } catch (PulsarClientException e) { + throw new RuntimeException(e); + } + } + + // 加载所有生产者 --> topic可能有正则或表名,无法确认所有topic,在使用时再加载 int parallelPartitionSendThreadSize = mqProperties.getParallelSendThreadSize(); sendPartitionExecutor = new ThreadPoolExecutor(parallelPartitionSendThreadSize, parallelPartitionSendThreadSize, @@ -106,6 +123,10 @@ private void loadPulsarMQProperties(Properties properties) { if (!StringUtils.isEmpty(topicTenantPrefix)) { tmpProperties.setTopicTenantPrefix(topicTenantPrefix); } + String adminServerUrl = PropertiesUtils.getProperty(properties, PulsarMQConstants.PULSARMQ_ADMIN_SERVER_URL); + if(!StringUtils.isEmpty(adminServerUrl)) { + tmpProperties.setAdminServerUrl(adminServerUrl); + } if (logger.isDebugEnabled()) { logger.debug("Load pulsar properties ==> {}", JSON.toJSON(this.mqProperties)); } @@ -182,6 +203,9 @@ public void send(final MQDestination destination, String topicName, com.alibaba. if (partitionNum == null) { partitionNum = destination.getPartitionsNum(); } + // 提前获取Producer对象,同时支持多分区topic的创建 + Producer producer = getProducer(topicName, partitionNum); + ExecutorTemplate template = new ExecutorTemplate(sendPartitionExecutor); // 并发构造 MQMessageUtils.EntryRowData[] datas = MQMessageUtils.buildMessageData(message, buildExecutor); @@ -265,19 +289,19 @@ public void send(final MQDestination destination, String topicName, com.alibaba. * 发送原始消息,需要做分区处理 * * @param topic topic - * @param partitionNum 目标分区 + * @param partition 目标分区 * @param msg 原始消息内容 * @return void * @date 2021/9/10 17:55 * @author chad * @since 1 by chad at 2021/9/10 新增 */ - private void sendMessage(String topic, int partitionNum, com.alibaba.otter.canal.protocol.Message msg) { - Producer producer = getProducer(topic); + private void sendMessage(String topic, int partition, com.alibaba.otter.canal.protocol.Message msg) { + Producer producer = PRODUCERS.get(topic); byte[] msgBytes = CanalMessageSerializerUtil.serializer(msg, mqProperties.isFilterTransactionEntry()); try { MessageId msgResultId = producer.newMessage() - .property(MSG_PROPERTY_PARTITION_NAME, String.valueOf(partitionNum)) + .property(MSG_PROPERTY_PARTITION_NAME, String.valueOf(partition)) .value(msgBytes).send(); // todo 判断发送结果 if (logger.isDebugEnabled()) { @@ -299,7 +323,7 @@ private void sendMessage(String topic, int partitionNum, com.alibaba.otter.canal * @since 1 by chad at 2021/9/10 新增 */ private void sendMessage(String topic, int partition, List flatMessages) { - Producer producer = getProducer(topic); + Producer producer = PRODUCERS.get(topic); for (FlatMessage f : flatMessages) { try { MessageId msgResultId = producer @@ -327,7 +351,7 @@ private void sendMessage(String topic, int partition, List flatMess * @author chad * @since 1 by chad at 2021/9/10 新增 */ - private Producer getProducer(String topic) { + private Producer getProducer(String topic,Integer partitionNum) { Producer producer = PRODUCERS.get(topic); if (null == producer) { @@ -349,6 +373,15 @@ private Producer getProducer(String topic) { fullTopic = pulsarMQProperties.getTopicTenantPrefix() + fullTopic; } + // 创建分区topic + if(pulsarAdmin!=null && partitionNum!=null && partitionNum>0) { + try { + pulsarAdmin.topics().createPartitionedTopic(fullTopic, partitionNum); + } catch (PulsarAdminException e) { + // TODO 无论是否报错,都继续后续的操作,此处不进行阻塞 + } + } + // 创建指定topic的生产者 producer = client.newProducer() .topic(fullTopic) @@ -405,7 +438,7 @@ public int choosePartition(Message msg, TopicMetadata metadata) { @Override public void stop() { - logger.info("## Stop RocketMQ producer##"); + logger.info("## Stop PulsarMQ producer##"); for (Producer p : PRODUCERS.values()) { try { diff --git a/connector/pulsarmq-connector/src/main/resources/META-INF/canal/com.alibaba.otter.canal.connector.core.spi.CanalMsgConsumer b/connector/pulsarmq-connector/src/main/resources/META-INF/canal/com.alibaba.otter.canal.connector.core.spi.CanalMsgConsumer new file mode 100644 index 0000000000..54cdb8aab6 --- /dev/null +++ b/connector/pulsarmq-connector/src/main/resources/META-INF/canal/com.alibaba.otter.canal.connector.core.spi.CanalMsgConsumer @@ -0,0 +1 @@ +pulsarmq=com.alibaba.otter.canal.connector.pulsarmq.consumer.CanalPulsarMQConsumer \ No newline at end of file diff --git a/pom.xml b/pom.xml index 65605933a0..f84a81ef0c 100644 --- a/pom.xml +++ b/pom.xml @@ -260,7 +260,7 @@ com.alibaba druid - 1.2.6 + 1.2.8 com.lmax @@ -346,6 +346,11 @@ pulsar-client 2.8.1 + + org.apache.pulsar + pulsar-client-admin + 2.8.1 + From b18a98bd4a2ca895c6f10fe831955075abaaef2d Mon Sep 17 00:00:00 2001 From: zhangjukai Date: Fri, 11 Feb 2022 13:44:49 +0800 Subject: [PATCH 5/7] =?UTF-8?q?=E6=81=A2=E5=A4=8D=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../connector/pulsarmq/producer/CanalPulsarMQProducer.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/producer/CanalPulsarMQProducer.java b/connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/producer/CanalPulsarMQProducer.java index 15edef5d16..7b51f21bb0 100644 --- a/connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/producer/CanalPulsarMQProducer.java +++ b/connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/producer/CanalPulsarMQProducer.java @@ -289,19 +289,19 @@ public void send(final MQDestination destination, String topicName, com.alibaba. * 发送原始消息,需要做分区处理 * * @param topic topic - * @param partition 目标分区 + * @param partitionNum 目标分区 * @param msg 原始消息内容 * @return void * @date 2021/9/10 17:55 * @author chad * @since 1 by chad at 2021/9/10 新增 */ - private void sendMessage(String topic, int partition, com.alibaba.otter.canal.protocol.Message msg) { + private void sendMessage(String topic, int partitionNum, com.alibaba.otter.canal.protocol.Message msg) { Producer producer = PRODUCERS.get(topic); byte[] msgBytes = CanalMessageSerializerUtil.serializer(msg, mqProperties.isFilterTransactionEntry()); try { MessageId msgResultId = producer.newMessage() - .property(MSG_PROPERTY_PARTITION_NAME, String.valueOf(partition)) + .property(MSG_PROPERTY_PARTITION_NAME, String.valueOf(partitionNum)) .value(msgBytes).send(); // todo 判断发送结果 if (logger.isDebugEnabled()) { From c229f2ed490cc25b4e5a4e561cd39d2377e3630d Mon Sep 17 00:00:00 2001 From: zhangjukai Date: Fri, 11 Feb 2022 13:50:00 +0800 Subject: [PATCH 6/7] =?UTF-8?q?=E6=81=A2=E5=A4=8D=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../connector/pulsarmq/producer/CanalPulsarMQProducer.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/producer/CanalPulsarMQProducer.java b/connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/producer/CanalPulsarMQProducer.java index 15edef5d16..7b51f21bb0 100644 --- a/connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/producer/CanalPulsarMQProducer.java +++ b/connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/producer/CanalPulsarMQProducer.java @@ -289,19 +289,19 @@ public void send(final MQDestination destination, String topicName, com.alibaba. * 发送原始消息,需要做分区处理 * * @param topic topic - * @param partition 目标分区 + * @param partitionNum 目标分区 * @param msg 原始消息内容 * @return void * @date 2021/9/10 17:55 * @author chad * @since 1 by chad at 2021/9/10 新增 */ - private void sendMessage(String topic, int partition, com.alibaba.otter.canal.protocol.Message msg) { + private void sendMessage(String topic, int partitionNum, com.alibaba.otter.canal.protocol.Message msg) { Producer producer = PRODUCERS.get(topic); byte[] msgBytes = CanalMessageSerializerUtil.serializer(msg, mqProperties.isFilterTransactionEntry()); try { MessageId msgResultId = producer.newMessage() - .property(MSG_PROPERTY_PARTITION_NAME, String.valueOf(partition)) + .property(MSG_PROPERTY_PARTITION_NAME, String.valueOf(partitionNum)) .value(msgBytes).send(); // todo 判断发送结果 if (logger.isDebugEnabled()) { From 7bc024f074b63f8f2ec9cd0af1f4d66d3918d296 Mon Sep 17 00:00:00 2001 From: zhangjukai Date: Mon, 28 Feb 2022 10:02:25 +0800 Subject: [PATCH 7/7] =?UTF-8?q?=E5=A4=84=E7=90=86Producer=E5=A4=B1?= =?UTF-8?q?=E6=95=88=E7=9A=84=E6=83=85=E5=86=B5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../producer/CanalPulsarMQProducer.java | 58 +++++++++++-------- 1 file changed, 35 insertions(+), 23 deletions(-) diff --git a/connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/producer/CanalPulsarMQProducer.java b/connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/producer/CanalPulsarMQProducer.java index 7b51f21bb0..53c48b3c69 100644 --- a/connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/producer/CanalPulsarMQProducer.java +++ b/connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/producer/CanalPulsarMQProducer.java @@ -203,8 +203,10 @@ public void send(final MQDestination destination, String topicName, com.alibaba. if (partitionNum == null) { partitionNum = destination.getPartitionsNum(); } - // 提前获取Producer对象,同时支持多分区topic的创建 - Producer producer = getProducer(topicName, partitionNum); + // 创建多分区topic + if(pulsarAdmin!=null && partitionNum!=null && partitionNum>0 && PRODUCERS.get(topicName)==null) { + createMultipleTopic(topicName, partitionNum); + } ExecutorTemplate template = new ExecutorTemplate(sendPartitionExecutor); // 并发构造 @@ -297,7 +299,7 @@ public void send(final MQDestination destination, String topicName, com.alibaba. * @since 1 by chad at 2021/9/10 新增 */ private void sendMessage(String topic, int partitionNum, com.alibaba.otter.canal.protocol.Message msg) { - Producer producer = PRODUCERS.get(topic); + Producer producer = getProducer(topic); byte[] msgBytes = CanalMessageSerializerUtil.serializer(msg, mqProperties.isFilterTransactionEntry()); try { MessageId msgResultId = producer.newMessage() @@ -323,7 +325,7 @@ private void sendMessage(String topic, int partitionNum, com.alibaba.otter.canal * @since 1 by chad at 2021/9/10 新增 */ private void sendMessage(String topic, int partition, List flatMessages) { - Producer producer = PRODUCERS.get(topic); + Producer producer = getProducer(topic); for (FlatMessage f : flatMessages) { try { MessageId msgResultId = producer @@ -343,22 +345,41 @@ private void sendMessage(String topic, int partition, List flatMess } /** - * 获取指定topic的生产者,并且使用缓存 - * + * 创建多分区topic * @param topic - * @return org.apache.pulsar.client.api.Producer - * @date 2021/9/10 11:21 - * @author chad - * @since 1 by chad at 2021/9/10 新增 + * @param partitionNum */ - private Producer getProducer(String topic,Integer partitionNum) { - Producer producer = PRODUCERS.get(topic); + private void createMultipleTopic(String topic,Integer partitionNum) { + // 拼接topic前缀 + PulsarMQProducerConfig pulsarMQProperties = (PulsarMQProducerConfig) this.mqProperties; + String prefix = pulsarMQProperties.getTopicTenantPrefix(); + String fullTopic = topic; + if (!StringUtils.isEmpty(prefix)) { + if (!prefix.endsWith("/")) { + fullTopic = "/" + fullTopic; + } + fullTopic = pulsarMQProperties.getTopicTenantPrefix() + fullTopic; + } - if (null == producer) { + // 创建分区topic + try { + pulsarAdmin.topics().createPartitionedTopic(fullTopic, partitionNum); + } catch (PulsarAdminException e) { + // TODO 无论是否报错,都继续后续的操作,此处不进行阻塞 + } + } + /** + * 获取topic + * @param topic + * @return + */ + private Producer getProducer(String topic) { + Producer producer = PRODUCERS.get(topic); + if (null == producer || !producer.isConnected()) { try { synchronized (PRODUCERS) { producer = PRODUCERS.get(topic); - if (null != producer) { + if (null != producer && producer.isConnected()) { return producer; } @@ -373,15 +394,6 @@ private Producer getProducer(String topic,Integer partitionNum) { fullTopic = pulsarMQProperties.getTopicTenantPrefix() + fullTopic; } - // 创建分区topic - if(pulsarAdmin!=null && partitionNum!=null && partitionNum>0) { - try { - pulsarAdmin.topics().createPartitionedTopic(fullTopic, partitionNum); - } catch (PulsarAdminException e) { - // TODO 无论是否报错,都继续后续的操作,此处不进行阻塞 - } - } - // 创建指定topic的生产者 producer = client.newProducer() .topic(fullTopic)