diff --git a/client/pom.xml b/client/pom.xml
index 70a610761..5e06afd82 100644
--- a/client/pom.xml
+++ b/client/pom.xml
@@ -138,6 +138,11 @@
pulsar-client
provided
+
+ org.apache.pulsar
+ pulsar-client-admin
+ provided
+
diff --git a/connector/pulsarmq-connector/pom.xml b/connector/pulsarmq-connector/pom.xml
index d1193944b..5fc5e63ae 100644
--- a/connector/pulsarmq-connector/pom.xml
+++ b/connector/pulsarmq-connector/pom.xml
@@ -38,6 +38,10 @@
org.apache.pulsar
pulsar-client
+
+ org.apache.pulsar
+ pulsar-client-admin
+
diff --git a/connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/config/PulsarMQConstants.java b/connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/config/PulsarMQConstants.java
index 19df91fe0..55c8d80b3 100644
--- a/connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/config/PulsarMQConstants.java
+++ b/connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/config/PulsarMQConstants.java
@@ -57,6 +57,10 @@ public class PulsarMQConstants {
* 最大重试次数
*/
public static final String PULSARMQ_MAX_REDELIVERY_COUNT = ROOT + "." + "maxRedeliveryCount";
+ /**
+ * Pulsar admin服务器地址
+ */
+ public static final String PULSARMQ_ADMIN_SERVER_URL = ROOT + "." + "adminServerUrl";
}
diff --git a/connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/config/PulsarMQProducerConfig.java b/connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/config/PulsarMQProducerConfig.java
index c7fc9c9ae..a984e79e4 100644
--- a/connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/config/PulsarMQProducerConfig.java
+++ b/connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/config/PulsarMQProducerConfig.java
@@ -30,6 +30,10 @@ public class PulsarMQProducerConfig extends MQProperties {
* 生产者角色权限,请确保该角色有canal使用的所有topic生产者权限(最低要求)
*/
private String roleToken;
+ /**
+ * admin服务器地址
+ */
+ private String adminServerUrl;
public String getServerUrl() {
return serverUrl;
@@ -54,4 +58,12 @@ public String getTopicTenantPrefix() {
public void setTopicTenantPrefix(String topicTenantPrefix) {
this.topicTenantPrefix = topicTenantPrefix;
}
+
+ public String getAdminServerUrl() {
+ return adminServerUrl;
+ }
+
+ public void setAdminServerUrl(String adminServerUrl) {
+ this.adminServerUrl = adminServerUrl;
+ }
}
diff --git a/connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/consumer/CanalPulsarMQConsumer.java b/connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/consumer/CanalPulsarMQConsumer.java
index fc9369a10..f418f818d 100644
--- a/connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/consumer/CanalPulsarMQConsumer.java
+++ b/connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/consumer/CanalPulsarMQConsumer.java
@@ -6,20 +6,14 @@
import com.alibaba.otter.canal.connector.core.consumer.CommonMessage;
import com.alibaba.otter.canal.connector.core.spi.CanalMsgConsumer;
import com.alibaba.otter.canal.connector.core.spi.SPI;
-import com.alibaba.otter.canal.connector.core.util.CanalMessageSerializerUtil;
-import com.alibaba.otter.canal.connector.core.util.MessageUtil;
import com.alibaba.otter.canal.connector.pulsarmq.config.PulsarMQConstants;
-import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.protocol.exception.CanalClientException;
import com.google.common.collect.Lists;
import org.apache.commons.lang.StringUtils;
import org.apache.pulsar.client.api.*;
-import java.time.LocalDateTime;
-import java.time.format.DateTimeFormatter;
import java.util.List;
import java.util.Properties;
-import java.util.UUID;
import java.util.concurrent.TimeUnit;
/**
@@ -112,7 +106,9 @@ public void init(Properties properties, String topic, String groupId) {
}
this.serviceUrl = properties.getProperty(PulsarMQConstants.PULSARMQ_SERVER_URL);
this.roleToken = properties.getProperty(PulsarMQConstants.PULSARMQ_ROLE_TOKEN);
- this.subscriptName = properties.getProperty(PulsarMQConstants.PULSARMQ_SUBSCRIPT_NAME);
+ //this.subscriptName = properties.getProperty(PulsarMQConstants.PULSARMQ_SUBSCRIPT_NAME);
+ // 采用groupId作为subscriptName,避免所有的都是同一个订阅者名称
+ this.subscriptName = groupId;
if (StringUtils.isEmpty(this.subscriptName)) {
throw new RuntimeException("Pulsar Consumer subscriptName required");
}
@@ -157,10 +153,12 @@ public void connect() {
}
// 连接创建客户端
try {
- pulsarClient = PulsarClient.builder()
- .serviceUrl(serviceUrl)
- .authentication(AuthenticationFactory.token(roleToken))
- .build();
+ //AuthenticationDataProvider
+ ClientBuilder builder = PulsarClient.builder().serviceUrl(serviceUrl);
+ if(StringUtils.isNotEmpty(roleToken)) {
+ builder.authentication(AuthenticationFactory.token(roleToken));
+ }
+ pulsarClient = builder.build();
} catch (PulsarClientException e) {
throw new RuntimeException(e);
}
@@ -220,7 +218,6 @@ public List getMessage(Long timeout, TimeUnit unit) {
List messageList = Lists.newArrayList();
try {
Messages messages = pulsarMQConsumer.batchReceive();
-
if (null == messages || messages.size() == 0) {
return messageList;
}
@@ -228,14 +225,17 @@ public List getMessage(Long timeout, TimeUnit unit) {
this.lastGetBatchMessage = messages;
for (org.apache.pulsar.client.api.Message msg : messages) {
byte[] data = msg.getData();
- if (!this.flatMessage) {
+ /*if (!this.flatMessage) {
Message message = CanalMessageSerializerUtil.deserializer(data);
List list = MessageUtil.convert(message);
messageList.addAll(list);
} else {
CommonMessage commonMessage = JSON.parseObject(data, CommonMessage.class);
messageList.add(commonMessage);
- }
+ }*/
+ // CanalMessageSerializerUtil.deserializer(data) 会转换失败
+ CommonMessage commonMessage = JSON.parseObject(data, CommonMessage.class);
+ messageList.add(commonMessage);
}
} catch (PulsarClientException e) {
throw new CanalClientException("Receive pulsar batch message error", e);
@@ -278,7 +278,8 @@ public void disconnect() {
return;
}
try {
- this.pulsarMQConsumer.unsubscribe();
+ // 会导致暂停期间数据丢失
+ // this.pulsarMQConsumer.unsubscribe();
this.pulsarClient.close();
} catch (PulsarClientException e) {
throw new CanalClientException("Disconnect pulsar consumer error", e);
diff --git a/connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/producer/CanalPulsarMQProducer.java b/connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/producer/CanalPulsarMQProducer.java
index 4c2c8f38b..53c48b3c6 100644
--- a/connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/producer/CanalPulsarMQProducer.java
+++ b/connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/producer/CanalPulsarMQProducer.java
@@ -17,6 +17,8 @@
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.FlatMessage;
import org.apache.commons.lang.StringUtils;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.*;
import org.apache.pulsar.shade.com.google.gson.JsonParser;
import org.slf4j.Logger;
@@ -50,6 +52,10 @@ public class CanalPulsarMQProducer extends AbstractMQProducer implements CanalMQ
* pulsar客户端,管理连接
*/
protected PulsarClient client;
+ /**
+ * Pulsar admin 客户端
+ */
+ protected PulsarAdmin pulsarAdmin;
@Override
public void init(Properties properties) {
@@ -61,17 +67,28 @@ public void init(Properties properties) {
// 初始化连接客户端
try {
- client = PulsarClient.builder()
+ ClientBuilder builder = PulsarClient.builder()
// 填写pulsar的连接地址
- .serviceUrl(pulsarMQProducerConfig.getServerUrl())
- // 角色权限认证的token
- .authentication(AuthenticationFactory.token(pulsarMQProducerConfig.getRoleToken()))
- .build();
+ .serviceUrl(pulsarMQProducerConfig.getServerUrl());
+ if(StringUtils.isNotEmpty(pulsarMQProducerConfig.getRoleToken())) {
+ // 角色权限认证的token
+ builder.authentication(AuthenticationFactory.token(pulsarMQProducerConfig.getRoleToken()));
+ }
+ client = builder.build();
} catch (PulsarClientException e) {
throw new RuntimeException(e);
}
- // 加载所有生产者 --> topic可能有正则或表名,无法确认所有topic,在使用时再加载
+ // 初始化Pulsar admin
+ if(StringUtils.isNotEmpty(pulsarMQProducerConfig.getAdminServerUrl())) {
+ try {
+ pulsarAdmin = PulsarAdmin.builder().serviceHttpUrl(pulsarMQProducerConfig.getAdminServerUrl()).build();
+ } catch (PulsarClientException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ // 加载所有生产者 --> topic可能有正则或表名,无法确认所有topic,在使用时再加载
int parallelPartitionSendThreadSize = mqProperties.getParallelSendThreadSize();
sendPartitionExecutor = new ThreadPoolExecutor(parallelPartitionSendThreadSize,
parallelPartitionSendThreadSize,
@@ -106,6 +123,10 @@ private void loadPulsarMQProperties(Properties properties) {
if (!StringUtils.isEmpty(topicTenantPrefix)) {
tmpProperties.setTopicTenantPrefix(topicTenantPrefix);
}
+ String adminServerUrl = PropertiesUtils.getProperty(properties, PulsarMQConstants.PULSARMQ_ADMIN_SERVER_URL);
+ if(!StringUtils.isEmpty(adminServerUrl)) {
+ tmpProperties.setAdminServerUrl(adminServerUrl);
+ }
if (logger.isDebugEnabled()) {
logger.debug("Load pulsar properties ==> {}", JSON.toJSON(this.mqProperties));
}
@@ -182,6 +203,11 @@ public void send(final MQDestination destination, String topicName, com.alibaba.
if (partitionNum == null) {
partitionNum = destination.getPartitionsNum();
}
+ // 创建多分区topic
+ if(pulsarAdmin!=null && partitionNum!=null && partitionNum>0 && PRODUCERS.get(topicName)==null) {
+ createMultipleTopic(topicName, partitionNum);
+ }
+
ExecutorTemplate template = new ExecutorTemplate(sendPartitionExecutor);
// 并发构造
MQMessageUtils.EntryRowData[] datas = MQMessageUtils.buildMessageData(message, buildExecutor);
@@ -319,22 +345,41 @@ private void sendMessage(String topic, int partition, List flatMess
}
/**
- * 获取指定topic的生产者,并且使用缓存
- *
+ * 创建多分区topic
* @param topic
- * @return org.apache.pulsar.client.api.Producer
- * @date 2021/9/10 11:21
- * @author chad
- * @since 1 by chad at 2021/9/10 新增
+ * @param partitionNum
+ */
+ private void createMultipleTopic(String topic,Integer partitionNum) {
+ // 拼接topic前缀
+ PulsarMQProducerConfig pulsarMQProperties = (PulsarMQProducerConfig) this.mqProperties;
+ String prefix = pulsarMQProperties.getTopicTenantPrefix();
+ String fullTopic = topic;
+ if (!StringUtils.isEmpty(prefix)) {
+ if (!prefix.endsWith("/")) {
+ fullTopic = "/" + fullTopic;
+ }
+ fullTopic = pulsarMQProperties.getTopicTenantPrefix() + fullTopic;
+ }
+
+ // 创建分区topic
+ try {
+ pulsarAdmin.topics().createPartitionedTopic(fullTopic, partitionNum);
+ } catch (PulsarAdminException e) {
+ // TODO 无论是否报错,都继续后续的操作,此处不进行阻塞
+ }
+ }
+ /**
+ * 获取topic
+ * @param topic
+ * @return
*/
private Producer getProducer(String topic) {
Producer producer = PRODUCERS.get(topic);
-
- if (null == producer) {
+ if (null == producer || !producer.isConnected()) {
try {
synchronized (PRODUCERS) {
producer = PRODUCERS.get(topic);
- if (null != producer) {
+ if (null != producer && producer.isConnected()) {
return producer;
}
@@ -405,7 +450,7 @@ public int choosePartition(Message> msg, TopicMetadata metadata) {
@Override
public void stop() {
- logger.info("## Stop RocketMQ producer##");
+ logger.info("## Stop PulsarMQ producer##");
for (Producer p : PRODUCERS.values()) {
try {
diff --git a/connector/pulsarmq-connector/src/main/resources/META-INF/canal/com.alibaba.otter.canal.connector.core.spi.CanalMsgConsumer b/connector/pulsarmq-connector/src/main/resources/META-INF/canal/com.alibaba.otter.canal.connector.core.spi.CanalMsgConsumer
new file mode 100644
index 000000000..54cdb8aab
--- /dev/null
+++ b/connector/pulsarmq-connector/src/main/resources/META-INF/canal/com.alibaba.otter.canal.connector.core.spi.CanalMsgConsumer
@@ -0,0 +1 @@
+pulsarmq=com.alibaba.otter.canal.connector.pulsarmq.consumer.CanalPulsarMQConsumer
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index 65605933a..f84a81ef0 100644
--- a/pom.xml
+++ b/pom.xml
@@ -260,7 +260,7 @@
com.alibaba
druid
- 1.2.6
+ 1.2.8
com.lmax
@@ -346,6 +346,11 @@
pulsar-client
2.8.1
+
+ org.apache.pulsar
+ pulsar-client-admin
+ 2.8.1
+