Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Some tweaks for pulsarmq-connector #4060

Merged
merged 8 commits into from
May 13, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,11 @@
<artifactId>pulsar-client</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-client-admin</artifactId>
<scope>provided</scope>
</dependency>

<!-- junit -->
<dependency>
Expand Down
4 changes: 4 additions & 0 deletions connector/pulsarmq-connector/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-client</artifactId>
</dependency>
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-client-admin</artifactId>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ public class PulsarMQConstants {
* 最大重试次数
*/
public static final String PULSARMQ_MAX_REDELIVERY_COUNT = ROOT + "." + "maxRedeliveryCount";
/**
* Pulsar admin服务器地址
*/
public static final String PULSARMQ_ADMIN_SERVER_URL = ROOT + "." + "adminServerUrl";


}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ public class PulsarMQProducerConfig extends MQProperties {
* 生产者角色权限,请确保该角色有canal使用的所有topic生产者权限(最低要求)
*/
private String roleToken;
/**
* admin服务器地址
*/
private String adminServerUrl;

public String getServerUrl() {
return serverUrl;
Expand All @@ -54,4 +58,12 @@ public String getTopicTenantPrefix() {
public void setTopicTenantPrefix(String topicTenantPrefix) {
this.topicTenantPrefix = topicTenantPrefix;
}

public String getAdminServerUrl() {
return adminServerUrl;
}

public void setAdminServerUrl(String adminServerUrl) {
this.adminServerUrl = adminServerUrl;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,14 @@
import com.alibaba.otter.canal.connector.core.consumer.CommonMessage;
import com.alibaba.otter.canal.connector.core.spi.CanalMsgConsumer;
import com.alibaba.otter.canal.connector.core.spi.SPI;
import com.alibaba.otter.canal.connector.core.util.CanalMessageSerializerUtil;
import com.alibaba.otter.canal.connector.core.util.MessageUtil;
import com.alibaba.otter.canal.connector.pulsarmq.config.PulsarMQConstants;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.protocol.exception.CanalClientException;
import com.google.common.collect.Lists;
import org.apache.commons.lang.StringUtils;
import org.apache.pulsar.client.api.*;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.List;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

/**
Expand Down Expand Up @@ -112,7 +106,9 @@ public void init(Properties properties, String topic, String groupId) {
}
this.serviceUrl = properties.getProperty(PulsarMQConstants.PULSARMQ_SERVER_URL);
this.roleToken = properties.getProperty(PulsarMQConstants.PULSARMQ_ROLE_TOKEN);
this.subscriptName = properties.getProperty(PulsarMQConstants.PULSARMQ_SUBSCRIPT_NAME);
//this.subscriptName = properties.getProperty(PulsarMQConstants.PULSARMQ_SUBSCRIPT_NAME);
// 采用groupId作为subscriptName,避免所有的都是同一个订阅者名称
this.subscriptName = groupId;
if (StringUtils.isEmpty(this.subscriptName)) {
throw new RuntimeException("Pulsar Consumer subscriptName required");
}
Expand Down Expand Up @@ -157,10 +153,12 @@ public void connect() {
}
// 连接创建客户端
try {
pulsarClient = PulsarClient.builder()
.serviceUrl(serviceUrl)
.authentication(AuthenticationFactory.token(roleToken))
.build();
//AuthenticationDataProvider
ClientBuilder builder = PulsarClient.builder().serviceUrl(serviceUrl);
if(StringUtils.isNotEmpty(roleToken)) {
builder.authentication(AuthenticationFactory.token(roleToken));
}
pulsarClient = builder.build();
} catch (PulsarClientException e) {
throw new RuntimeException(e);
}
Expand Down Expand Up @@ -220,22 +218,24 @@ public List<CommonMessage> getMessage(Long timeout, TimeUnit unit) {
List<CommonMessage> messageList = Lists.newArrayList();
try {
Messages<byte[]> messages = pulsarMQConsumer.batchReceive();

if (null == messages || messages.size() == 0) {
return messageList;
}
// 保存当前消费记录,用于ack和rollback
this.lastGetBatchMessage = messages;
for (org.apache.pulsar.client.api.Message<byte[]> msg : messages) {
byte[] data = msg.getData();
if (!this.flatMessage) {
/*if (!this.flatMessage) {
Message message = CanalMessageSerializerUtil.deserializer(data);
List<CommonMessage> list = MessageUtil.convert(message);
messageList.addAll(list);
} else {
CommonMessage commonMessage = JSON.parseObject(data, CommonMessage.class);
messageList.add(commonMessage);
}
}*/
// CanalMessageSerializerUtil.deserializer(data) 会转换失败
CommonMessage commonMessage = JSON.parseObject(data, CommonMessage.class);
messageList.add(commonMessage);
}
} catch (PulsarClientException e) {
throw new CanalClientException("Receive pulsar batch message error", e);
Expand Down Expand Up @@ -278,7 +278,8 @@ public void disconnect() {
return;
}
try {
this.pulsarMQConsumer.unsubscribe();
// 会导致暂停期间数据丢失
// this.pulsarMQConsumer.unsubscribe();
this.pulsarClient.close();
} catch (PulsarClientException e) {
throw new CanalClientException("Disconnect pulsar consumer error", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.FlatMessage;
import org.apache.commons.lang.StringUtils;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.*;
import org.apache.pulsar.shade.com.google.gson.JsonParser;
import org.slf4j.Logger;
Expand Down Expand Up @@ -50,6 +52,10 @@ public class CanalPulsarMQProducer extends AbstractMQProducer implements CanalMQ
* pulsar客户端,管理连接
*/
protected PulsarClient client;
/**
* Pulsar admin 客户端
*/
protected PulsarAdmin pulsarAdmin;

@Override
public void init(Properties properties) {
Expand All @@ -61,17 +67,28 @@ public void init(Properties properties) {

// 初始化连接客户端
try {
client = PulsarClient.builder()
ClientBuilder builder = PulsarClient.builder()
// 填写pulsar的连接地址
.serviceUrl(pulsarMQProducerConfig.getServerUrl())
// 角色权限认证的token
.authentication(AuthenticationFactory.token(pulsarMQProducerConfig.getRoleToken()))
.build();
.serviceUrl(pulsarMQProducerConfig.getServerUrl());
if(StringUtils.isNotEmpty(pulsarMQProducerConfig.getRoleToken())) {
// 角色权限认证的token
builder.authentication(AuthenticationFactory.token(pulsarMQProducerConfig.getRoleToken()));
}
client = builder.build();
} catch (PulsarClientException e) {
throw new RuntimeException(e);
}
// 加载所有生产者 --> topic可能有正则或表名,无法确认所有topic,在使用时再加载

// 初始化Pulsar admin
if(StringUtils.isNotEmpty(pulsarMQProducerConfig.getAdminServerUrl())) {
try {
pulsarAdmin = PulsarAdmin.builder().serviceHttpUrl(pulsarMQProducerConfig.getAdminServerUrl()).build();
} catch (PulsarClientException e) {
throw new RuntimeException(e);
}
}

// 加载所有生产者 --> topic可能有正则或表名,无法确认所有topic,在使用时再加载
int parallelPartitionSendThreadSize = mqProperties.getParallelSendThreadSize();
sendPartitionExecutor = new ThreadPoolExecutor(parallelPartitionSendThreadSize,
parallelPartitionSendThreadSize,
Expand Down Expand Up @@ -106,6 +123,10 @@ private void loadPulsarMQProperties(Properties properties) {
if (!StringUtils.isEmpty(topicTenantPrefix)) {
tmpProperties.setTopicTenantPrefix(topicTenantPrefix);
}
String adminServerUrl = PropertiesUtils.getProperty(properties, PulsarMQConstants.PULSARMQ_ADMIN_SERVER_URL);
if(!StringUtils.isEmpty(adminServerUrl)) {
tmpProperties.setAdminServerUrl(adminServerUrl);
}
if (logger.isDebugEnabled()) {
logger.debug("Load pulsar properties ==> {}", JSON.toJSON(this.mqProperties));
}
Expand Down Expand Up @@ -182,6 +203,11 @@ public void send(final MQDestination destination, String topicName, com.alibaba.
if (partitionNum == null) {
partitionNum = destination.getPartitionsNum();
}
// 创建多分区topic
if(pulsarAdmin!=null && partitionNum!=null && partitionNum>0 && PRODUCERS.get(topicName)==null) {
createMultipleTopic(topicName, partitionNum);
}

ExecutorTemplate template = new ExecutorTemplate(sendPartitionExecutor);
// 并发构造
MQMessageUtils.EntryRowData[] datas = MQMessageUtils.buildMessageData(message, buildExecutor);
Expand Down Expand Up @@ -319,22 +345,41 @@ private void sendMessage(String topic, int partition, List<FlatMessage> flatMess
}

/**
* 获取指定topic的生产者,并且使用缓存
*
* 创建多分区topic
* @param topic
* @return org.apache.pulsar.client.api.Producer<byte [ ]>
* @date 2021/9/10 11:21
* @author chad
* @since 1 by chad at 2021/9/10 新增
* @param partitionNum
*/
private void createMultipleTopic(String topic,Integer partitionNum) {
// 拼接topic前缀
PulsarMQProducerConfig pulsarMQProperties = (PulsarMQProducerConfig) this.mqProperties;
String prefix = pulsarMQProperties.getTopicTenantPrefix();
String fullTopic = topic;
if (!StringUtils.isEmpty(prefix)) {
if (!prefix.endsWith("/")) {
fullTopic = "/" + fullTopic;
}
fullTopic = pulsarMQProperties.getTopicTenantPrefix() + fullTopic;
}

// 创建分区topic
try {
pulsarAdmin.topics().createPartitionedTopic(fullTopic, partitionNum);
} catch (PulsarAdminException e) {
// TODO 无论是否报错,都继续后续的操作,此处不进行阻塞
}
}
/**
* 获取topic
* @param topic
* @return
*/
private Producer<byte[]> getProducer(String topic) {
Producer producer = PRODUCERS.get(topic);

if (null == producer) {
if (null == producer || !producer.isConnected()) {
try {
synchronized (PRODUCERS) {
producer = PRODUCERS.get(topic);
if (null != producer) {
if (null != producer && producer.isConnected()) {
return producer;
}

Expand Down Expand Up @@ -405,7 +450,7 @@ public int choosePartition(Message<?> msg, TopicMetadata metadata) {

@Override
public void stop() {
logger.info("## Stop RocketMQ producer##");
logger.info("## Stop PulsarMQ producer##");

for (Producer p : PRODUCERS.values()) {
try {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
pulsarmq=com.alibaba.otter.canal.connector.pulsarmq.consumer.CanalPulsarMQConsumer
7 changes: 6 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid</artifactId>
<version>1.2.6</version>
<version>1.2.8</version>
</dependency>
<dependency>
<groupId>com.lmax</groupId>
Expand Down Expand Up @@ -346,6 +346,11 @@
<artifactId>pulsar-client</artifactId>
<version>2.8.1</version>
</dependency>
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-client-admin</artifactId>
<version>2.8.1</version>
</dependency>
</dependencies>
</dependencyManagement>

Expand Down