From b116cf14c3b50fd6103abdf6f42af47220aca28a Mon Sep 17 00:00:00 2001
From: RYE <1294566108@qq.com>
Date: Sun, 23 Apr 2023 17:40:18 +0800
Subject: [PATCH 01/13] feat:Integrate RocketMQ 5.0 client with Spring
---
pom.xml | 3 +
.../springboot/ClientConsumeApplication.java | 120 +++++++
.../springboot/ExtRocketMQTemplate.java | 12 +
.../springboot/consumer/MyConsumer.java | 23 ++
.../springboot/ClientProducerApplication.java | 171 +++++++++
.../springboot/domain/UserMessage.java | 37 ++
.../ExtConsumerResetConfiguration.java | 74 ++++
.../ExtProducerResetConfiguration.java | 61 ++++
.../annotation/RocketMQMessageListener.java | 69 ++++
...ketMQMessageListenerBeanPostProcessor.java | 91 +++++
.../RocketMQTransactionListener.java | 16 +
.../ExtConsumerResetConfiguration.java | 137 +++++++
.../ExtTemplateResetConfiguration.java | 118 ++++++
.../ListenerContainerConfiguration.java | 95 +++++
.../MessageConverterConfiguration.java | 21 ++
.../RocketMQAutoConfiguration.java | 144 ++++++++
.../RocketMQListenerConfiguration.java | 24 ++
.../autoconfigure/RocketMQProperties.java | 287 +++++++++++++++
.../RocketMQTransactionConfiguration.java | 60 ++++
.../rocketmq/client/client/common/Pair.java | 30 ++
.../client/core/RocketMQClientTemplate.java | 337 ++++++++++++++++++
.../client/client/core/RocketMQListener.java | 12 +
.../core/RocketMQTransactionChecker.java | 12 +
.../support/DefaultListenerContainer.java | 331 +++++++++++++++++
.../client/support/RocketMQHeaders.java | 20 ++
.../support/RocketMQListenerContainer.java | 9 +
.../support/RocketMQMessageConverter.java | 67 ++++
.../client/client/support/RocketMQUtil.java | 152 ++++++++
.../main/resources/META-INF/spring.factories | 2 +
29 files changed, 2535 insertions(+)
create mode 100644 rocketmq-client-spring-boot-samples/rocketmq-client-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/ClientConsumeApplication.java
create mode 100644 rocketmq-client-spring-boot-samples/rocketmq-client-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/ExtRocketMQTemplate.java
create mode 100644 rocketmq-client-spring-boot-samples/rocketmq-client-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/consumer/MyConsumer.java
create mode 100644 rocketmq-client-spring-boot-samples/rocketmq-client-producer-demo/src/main/java/org/apache/rocketmq/samples/springboot/ClientProducerApplication.java
create mode 100644 rocketmq-client-spring-boot-samples/rocketmq-client-producer-demo/src/main/java/org/apache/rocketmq/samples/springboot/domain/UserMessage.java
create mode 100644 rocketmq-client-spring-boot/src/main/java/org/apache/rocketmq/client/client/annotation/ExtConsumerResetConfiguration.java
create mode 100644 rocketmq-client-spring-boot/src/main/java/org/apache/rocketmq/client/client/annotation/ExtProducerResetConfiguration.java
create mode 100644 rocketmq-client-spring-boot/src/main/java/org/apache/rocketmq/client/client/annotation/RocketMQMessageListener.java
create mode 100644 rocketmq-client-spring-boot/src/main/java/org/apache/rocketmq/client/client/annotation/RocketMQMessageListenerBeanPostProcessor.java
create mode 100644 rocketmq-client-spring-boot/src/main/java/org/apache/rocketmq/client/client/annotation/RocketMQTransactionListener.java
create mode 100644 rocketmq-client-spring-boot/src/main/java/org/apache/rocketmq/client/client/autoconfigure/ExtConsumerResetConfiguration.java
create mode 100644 rocketmq-client-spring-boot/src/main/java/org/apache/rocketmq/client/client/autoconfigure/ExtTemplateResetConfiguration.java
create mode 100644 rocketmq-client-spring-boot/src/main/java/org/apache/rocketmq/client/client/autoconfigure/ListenerContainerConfiguration.java
create mode 100644 rocketmq-client-spring-boot/src/main/java/org/apache/rocketmq/client/client/autoconfigure/MessageConverterConfiguration.java
create mode 100644 rocketmq-client-spring-boot/src/main/java/org/apache/rocketmq/client/client/autoconfigure/RocketMQAutoConfiguration.java
create mode 100644 rocketmq-client-spring-boot/src/main/java/org/apache/rocketmq/client/client/autoconfigure/RocketMQListenerConfiguration.java
create mode 100644 rocketmq-client-spring-boot/src/main/java/org/apache/rocketmq/client/client/autoconfigure/RocketMQProperties.java
create mode 100644 rocketmq-client-spring-boot/src/main/java/org/apache/rocketmq/client/client/autoconfigure/RocketMQTransactionConfiguration.java
create mode 100644 rocketmq-client-spring-boot/src/main/java/org/apache/rocketmq/client/client/common/Pair.java
create mode 100644 rocketmq-client-spring-boot/src/main/java/org/apache/rocketmq/client/client/core/RocketMQClientTemplate.java
create mode 100644 rocketmq-client-spring-boot/src/main/java/org/apache/rocketmq/client/client/core/RocketMQListener.java
create mode 100644 rocketmq-client-spring-boot/src/main/java/org/apache/rocketmq/client/client/core/RocketMQTransactionChecker.java
create mode 100644 rocketmq-client-spring-boot/src/main/java/org/apache/rocketmq/client/client/support/DefaultListenerContainer.java
create mode 100644 rocketmq-client-spring-boot/src/main/java/org/apache/rocketmq/client/client/support/RocketMQHeaders.java
create mode 100644 rocketmq-client-spring-boot/src/main/java/org/apache/rocketmq/client/client/support/RocketMQListenerContainer.java
create mode 100644 rocketmq-client-spring-boot/src/main/java/org/apache/rocketmq/client/client/support/RocketMQMessageConverter.java
create mode 100644 rocketmq-client-spring-boot/src/main/java/org/apache/rocketmq/client/client/support/RocketMQUtil.java
create mode 100644 rocketmq-client-spring-boot/src/main/resources/META-INF/spring.factories
diff --git a/pom.xml b/pom.xml
index f4659e2c..099d9052 100644
--- a/pom.xml
+++ b/pom.xml
@@ -215,6 +215,9 @@
rocketmq-spring-boot-parent
rocketmq-spring-boot
rocketmq-spring-boot-starter
+ rocketmq-client-spring-boot
+ rocketmq-client-spring-boot-parent
+ rocketmq-client-spring-boot-starter
diff --git a/rocketmq-client-spring-boot-samples/rocketmq-client-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/ClientConsumeApplication.java b/rocketmq-client-spring-boot-samples/rocketmq-client-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/ClientConsumeApplication.java
new file mode 100644
index 00000000..60ce31d0
--- /dev/null
+++ b/rocketmq-client-spring-boot-samples/rocketmq-client-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/ClientConsumeApplication.java
@@ -0,0 +1,120 @@
+package org.apache.rocketmq.samples.springboot;
+
+import org.apache.rocketmq.client.apis.ClientException;
+import org.apache.rocketmq.client.apis.message.MessageId;
+import org.apache.rocketmq.client.apis.message.MessageView;
+import org.apache.rocketmq.client.client.core.RocketMQClientTemplate;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.boot.CommandLineRunner;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+
+import javax.annotation.Resource;
+import java.io.IOException;
+import java.time.Duration;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.stream.Collectors;
+
+@SpringBootApplication
+public class ClientConsumeApplication implements CommandLineRunner {
+ private static final Logger log = LoggerFactory.getLogger(ClientConsumeApplication.class);
+
+ @Resource
+ RocketMQClientTemplate rocketMQClientTemplate;
+
+ @Resource(name = "extRocketMQTemplate")
+ RocketMQClientTemplate extRocketMQTemplate;
+
+ public static void main(String[] args) {
+ SpringApplication.run(ClientConsumeApplication.class, args);
+ }
+
+ @Override
+ public void run(String... args) throws Exception {
+ receiveSimpleConsumerMessage();
+ receiveExtSimpleConsumerMessage();
+ }
+
+ public void receiveSimpleConsumerMessage() throws ClientException {
+ do {
+ final List messages = rocketMQClientTemplate.receive(16, Duration.ofSeconds(15));
+ log.info("Received {} message(s)", messages.size());
+ for (MessageView message : messages) {
+ log.info("receive message, topic:" + message.getTopic() + " messageId:" + message.getMessageId());
+ final MessageId messageId = message.getMessageId();
+ try {
+ rocketMQClientTemplate.ack(message);
+ log.info("Message is acknowledged successfully, messageId={}", messageId);
+ } catch (Throwable t) {
+ log.error("Message is failed to be acknowledged, messageId={}", messageId, t);
+ }
+ }
+ } while (true);
+ }
+
+ public void receiveExtSimpleConsumerMessage() throws ClientException {
+ do {
+ final List messages = extRocketMQTemplate.receive(16, Duration.ofSeconds(15));
+ log.info("Received {} message(s)", messages.size());
+ for (MessageView message : messages) {
+ log.info("receive message, topic:" + message.getTopic() + " messageId:" + message.getMessageId());
+ final MessageId messageId = message.getMessageId();
+ try {
+ rocketMQClientTemplate.ack(message);
+ log.info("Message is acknowledged successfully, messageId={}", messageId);
+ } catch (Throwable t) {
+ log.error("Message is failed to be acknowledged, messageId={}", messageId, t);
+ }
+ }
+ } while (true);
+ }
+
+
+ public void receiveSimpleConsumerMessageAsynchronously() {
+ do {
+ int maxMessageNum = 16;
+ // Set message invisible duration after it is received.
+ Duration invisibleDuration = Duration.ofSeconds(15);
+ // Set individual thread pool for receive callback.
+ ExecutorService receiveCallbackExecutor = Executors.newCachedThreadPool();
+ // Set individual thread pool for ack callback.
+ ExecutorService ackCallbackExecutor = Executors.newCachedThreadPool();
+ CompletableFuture> future0;
+ try {
+ future0 = rocketMQClientTemplate.receiveAsync(maxMessageNum, invisibleDuration);
+ } catch (ClientException | IOException e) {
+ throw new RuntimeException(e);
+ }
+ future0.whenCompleteAsync(((messages, throwable) -> {
+ if (null != throwable) {
+ log.error("Failed to receive message from remote", throwable);
+ // Return early.
+ return;
+ }
+ log.info("Received {} message(s)", messages.size());
+ // Using messageView as key rather than message id because message id may be duplicated.
+ final Map> map =
+ messages.stream().collect(Collectors.toMap(message -> message, rocketMQClientTemplate::ackAsync));
+ for (Map.Entry> entry : map.entrySet()) {
+ final MessageId messageId = entry.getKey().getMessageId();
+ final CompletableFuture future = entry.getValue();
+ future.whenCompleteAsync((v, t) -> {
+ if (null != t) {
+ log.error("Message is failed to be acknowledged, messageId={}", messageId, t);
+ // Return early.
+ return;
+ }
+ log.info("Message is acknowledged successfully, messageId={}", messageId);
+ }, ackCallbackExecutor);
+ }
+
+ }), receiveCallbackExecutor);
+ } while (true);
+ }
+
+}
diff --git a/rocketmq-client-spring-boot-samples/rocketmq-client-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/ExtRocketMQTemplate.java b/rocketmq-client-spring-boot-samples/rocketmq-client-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/ExtRocketMQTemplate.java
new file mode 100644
index 00000000..478fdcf5
--- /dev/null
+++ b/rocketmq-client-spring-boot-samples/rocketmq-client-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/ExtRocketMQTemplate.java
@@ -0,0 +1,12 @@
+package org.apache.rocketmq.samples.springboot;
+
+
+import org.apache.rocketmq.client.client.annotation.ExtConsumerResetConfiguration;
+import org.apache.rocketmq.client.client.core.RocketMQClientTemplate;
+
+/**
+ * @author Akai
+ */
+@ExtConsumerResetConfiguration(topic = "${ext.rocketmq.topic:}")
+public class ExtRocketMQTemplate extends RocketMQClientTemplate {
+}
diff --git a/rocketmq-client-spring-boot-samples/rocketmq-client-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/consumer/MyConsumer.java b/rocketmq-client-spring-boot-samples/rocketmq-client-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/consumer/MyConsumer.java
new file mode 100644
index 00000000..54d7b294
--- /dev/null
+++ b/rocketmq-client-spring-boot-samples/rocketmq-client-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/consumer/MyConsumer.java
@@ -0,0 +1,23 @@
+package org.apache.rocketmq.samples.springboot.consumer;
+
+
+import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
+import org.apache.rocketmq.client.apis.message.MessageView;
+import org.apache.rocketmq.client.client.annotation.RocketMQMessageListener;
+import org.apache.rocketmq.client.client.core.RocketMQListener;
+import org.springframework.stereotype.Service;
+
+/**
+ * @author Akai
+ */
+@Service
+@RocketMQMessageListener(endpoints = "${demo.rocketmq.endpoints:}", topic = "${demo.rocketmq.topic:}",
+ consumerGroup = "${demo.rocketmq.consumer-group:}", tag = "${demo.rocketmq.tag:}")
+public class MyConsumer implements RocketMQListener {
+
+ @Override
+ public ConsumeResult consume(MessageView messageView) {
+ System.out.println("handle my message:" + messageView);
+ return ConsumeResult.SUCCESS;
+ }
+}
diff --git a/rocketmq-client-spring-boot-samples/rocketmq-client-producer-demo/src/main/java/org/apache/rocketmq/samples/springboot/ClientProducerApplication.java b/rocketmq-client-spring-boot-samples/rocketmq-client-producer-demo/src/main/java/org/apache/rocketmq/samples/springboot/ClientProducerApplication.java
new file mode 100644
index 00000000..c9f34a84
--- /dev/null
+++ b/rocketmq-client-spring-boot-samples/rocketmq-client-producer-demo/src/main/java/org/apache/rocketmq/samples/springboot/ClientProducerApplication.java
@@ -0,0 +1,171 @@
+package org.apache.rocketmq.samples.springboot;
+
+import org.apache.rocketmq.client.apis.ClientException;
+import org.apache.rocketmq.client.apis.message.MessageView;
+import org.apache.rocketmq.client.apis.producer.SendReceipt;
+import org.apache.rocketmq.client.apis.producer.Transaction;
+import org.apache.rocketmq.client.apis.producer.TransactionResolution;
+
+import org.apache.rocketmq.client.client.annotation.RocketMQTransactionListener;
+import org.apache.rocketmq.client.client.common.Pair;
+import org.apache.rocketmq.client.client.core.RocketMQClientTemplate;
+import org.apache.rocketmq.client.client.core.RocketMQTransactionChecker;
+import org.apache.rocketmq.samples.springboot.domain.UserMessage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.CommandLineRunner;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.messaging.support.MessageBuilder;
+
+import javax.annotation.Resource;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+
+@SpringBootApplication
+public class ClientProducerApplication implements CommandLineRunner {
+
+ private static final Logger log = LoggerFactory.getLogger(ClientProducerApplication.class);
+
+ @Resource
+ private RocketMQClientTemplate rocketMQClientTemplate;
+
+ @Value("${demo.rocketmq.fifo-topic}")
+ private String fifoTopic;
+
+ @Value("${demo.rocketmq.normal-topic}")
+ private String normalTopic;
+
+ @Value("${demo.rocketmq.delay-topic}")
+ private String delayTopic;
+
+ @Value("${demo.rocketmq.trans-topic}")
+ private String transTopic;
+
+ @Value("${demo.rocketmq.message-group}")
+ private String messageGroup;
+
+
+ public static void main(String[] args) {
+ SpringApplication.run(ClientProducerApplication.class, args);
+ }
+
+ @Override
+ public void run(String... args) throws ClientException {
+ testSendDelayMessage();
+ testSendFIFOMessage();
+ testSendNormalMessage();
+ testSendTransactionMessage();
+ }
+
+ void testASycSendMessage() {
+ CompletableFuture future = rocketMQClientTemplate.asyncSend(normalTopic, new UserMessage()
+ .setId(1).setUserName("name").setUserAge((byte) 3), null);
+ System.out.printf("normalSend to topic %s sendReceipt=%s %n", normalTopic, future);
+
+ CompletableFuture future1 = rocketMQClientTemplate.asyncSend(normalTopic, "normal message", null);
+ System.out.printf("normalSend to topic %s sendReceipt=%s %n", normalTopic, future1);
+
+ CompletableFuture future2 = rocketMQClientTemplate.asyncSend(normalTopic, "byte message".getBytes(StandardCharsets.UTF_8), null);
+ System.out.printf("normalSend to topic %s sendReceipt=%s %n", normalTopic, future2);
+
+ CompletableFuture future3 = rocketMQClientTemplate.asyncSend(normalTopic, MessageBuilder.
+ withPayload("test message".getBytes()).build(), null);
+ System.out.printf("normalSend to topic %s sendReceipt=%s %n", normalTopic, future3);
+ }
+
+ void testSendDelayMessage() {
+ SendReceipt sendReceipt = rocketMQClientTemplate.syncSendDelayMessage(delayTopic, new UserMessage()
+ .setId(1).setUserName("name").setUserAge((byte) 3), Duration.ofSeconds(10));
+ System.out.printf("delaySend to topic %s sendReceipt=%s %n", delayTopic, sendReceipt);
+
+ sendReceipt = rocketMQClientTemplate.syncSendDelayMessage(delayTopic, MessageBuilder.
+ withPayload("test message".getBytes()).build(), Duration.ofSeconds(20));
+ System.out.printf("delaySend to topic %s sendReceipt=%s %n", delayTopic, sendReceipt);
+
+ sendReceipt = rocketMQClientTemplate.syncSendDelayMessage(delayTopic, "this is my message",
+ Duration.ofSeconds(30));
+ System.out.printf("delaySend to topic %s sendReceipt=%s %n", delayTopic, sendReceipt);
+
+ sendReceipt = rocketMQClientTemplate.syncSendDelayMessage(delayTopic, "byte messages".getBytes(StandardCharsets.UTF_8),
+ Duration.ofSeconds(40));
+ System.out.printf("delaySend to topic %s sendReceipt=%s %n", delayTopic, sendReceipt);
+ }
+
+ void testSendFIFOMessage() {
+ SendReceipt sendReceipt = rocketMQClientTemplate.syncSendFifoMessage(fifoTopic, new UserMessage()
+ .setId(1).setUserName("name").setUserAge((byte) 3), messageGroup);
+ System.out.printf("fifoSend to topic %s sendReceipt=%s %n", fifoTopic, sendReceipt);
+
+ sendReceipt = rocketMQClientTemplate.syncSendFifoMessage(fifoTopic, MessageBuilder.
+ withPayload("test message".getBytes()).build(), messageGroup);
+ System.out.printf("fifoSend to topic %s sendReceipt=%s %n", fifoTopic, sendReceipt);
+
+ sendReceipt = rocketMQClientTemplate.syncSendFifoMessage(fifoTopic, "fifo message", messageGroup);
+ System.out.printf("fifoSend to topic %s sendReceipt=%s %n", fifoTopic, sendReceipt);
+
+ sendReceipt = rocketMQClientTemplate.syncSendFifoMessage(fifoTopic, "byte message".getBytes(StandardCharsets.UTF_8), messageGroup);
+ System.out.printf("fifoSend to topic %s sendReceipt=%s %n", fifoTopic, sendReceipt);
+ }
+
+ void testSendNormalMessage() {
+ SendReceipt sendReceipt = rocketMQClientTemplate.syncSendNormalMessage(normalTopic, new UserMessage()
+ .setId(1).setUserName("name").setUserAge((byte) 3));
+ System.out.printf("normalSend to topic %s sendReceipt=%s %n", normalTopic, sendReceipt);
+
+ sendReceipt = rocketMQClientTemplate.syncSendNormalMessage(normalTopic, "normal message");
+ System.out.printf("normalSend to topic %s sendReceipt=%s %n", normalTopic, sendReceipt);
+
+ sendReceipt = rocketMQClientTemplate.syncSendNormalMessage(normalTopic, "byte message".getBytes(StandardCharsets.UTF_8));
+ System.out.printf("normalSend to topic %s sendReceipt=%s %n", normalTopic, sendReceipt);
+
+ sendReceipt = rocketMQClientTemplate.syncSendNormalMessage(normalTopic, MessageBuilder.
+ withPayload("test message".getBytes()).build());
+ System.out.printf("normalSend to topic %s sendReceipt=%s %n", normalTopic, sendReceipt);
+ }
+
+ void testSendTransactionMessage() throws ClientException {
+ Pair pair;
+ SendReceipt sendReceipt;
+ try {
+ pair = rocketMQClientTemplate.sendGRpcMessageInTransaction(transTopic, new UserMessage()
+ .setId(1).setUserName("name").setUserAge((byte) 3), null);
+ } catch (ClientException e) {
+ throw new RuntimeException(e);
+ }
+ sendReceipt = pair.getLeft();
+ System.out.printf("transactionSend to topic %s sendReceipt=%s %n", transTopic, sendReceipt);
+ Transaction transaction = pair.getRight();
+ // executed local transaction
+ if (doLocalTransaction(1)) {
+ transaction.commit();
+ } else {
+ transaction.rollback();
+ }
+ }
+
+ @RocketMQTransactionListener
+ class TransactionListenerImpl implements RocketMQTransactionChecker {
+ @Override
+ public TransactionResolution check(MessageView messageView) {
+ if (Objects.nonNull(messageView.getProperties().get("KEY"))) {
+ log.info("commit transaction");
+ return TransactionResolution.COMMIT;
+ }
+ log.info("rollback transaction");
+ return TransactionResolution.ROLLBACK;
+ }
+ }
+
+ boolean doLocalTransaction(int number) {
+ log.info("execute local transaction");
+ if (number > 0) {
+ return true;
+ }
+ return false;
+ }
+
+}
diff --git a/rocketmq-client-spring-boot-samples/rocketmq-client-producer-demo/src/main/java/org/apache/rocketmq/samples/springboot/domain/UserMessage.java b/rocketmq-client-spring-boot-samples/rocketmq-client-producer-demo/src/main/java/org/apache/rocketmq/samples/springboot/domain/UserMessage.java
new file mode 100644
index 00000000..88919ff1
--- /dev/null
+++ b/rocketmq-client-spring-boot-samples/rocketmq-client-producer-demo/src/main/java/org/apache/rocketmq/samples/springboot/domain/UserMessage.java
@@ -0,0 +1,37 @@
+package org.apache.rocketmq.samples.springboot.domain;
+
+/**
+ * @author Akai
+ */
+public class UserMessage {
+ int id;
+ private String userName;
+ private Byte userAge;
+
+ public int getId() {
+ return id;
+ }
+
+ public UserMessage setId(int id) {
+ this.id = id;
+ return this;
+ }
+
+ public String getUserName() {
+ return userName;
+ }
+
+ public UserMessage setUserName(String userName) {
+ this.userName = userName;
+ return this;
+ }
+
+ public Byte getUserAge() {
+ return userAge;
+ }
+
+ public UserMessage setUserAge(Byte userAge) {
+ this.userAge = userAge;
+ return this;
+ }
+}
diff --git a/rocketmq-client-spring-boot/src/main/java/org/apache/rocketmq/client/client/annotation/ExtConsumerResetConfiguration.java b/rocketmq-client-spring-boot/src/main/java/org/apache/rocketmq/client/client/annotation/ExtConsumerResetConfiguration.java
new file mode 100644
index 00000000..41238813
--- /dev/null
+++ b/rocketmq-client-spring-boot/src/main/java/org/apache/rocketmq/client/client/annotation/ExtConsumerResetConfiguration.java
@@ -0,0 +1,74 @@
+package org.apache.rocketmq.client.client.annotation;
+
+import org.springframework.stereotype.Component;
+
+import java.lang.annotation.*;
+
+/**
+ * @author Akai
+ */
+@Target(ElementType.TYPE)
+@Retention(RetentionPolicy.RUNTIME)
+@Documented
+@Component
+public @interface ExtConsumerResetConfiguration {
+
+ String ACCESS_KEY_PLACEHOLDER = "${rocketmq.simple-consumer.accessKey:}";
+ String SECRET_KEY_PLACEHOLDER = "${rocketmq.simple-consumer.secretKey:}";
+ String TAG_PLACEHOLDER = "${rocketmq.simple-consumer.tag:}";
+ String TOPIC_PLACEHOLDER = "${rocketmq.simple-consumer.topic:}";
+ String ENDPOINTS_PLACEHOLDER = "${rocketmq.simple-consumer.endpoints:}";
+ String CONSUMER_GROUP_PLACEHOLDER = "${rocketmq.simple-consumer.consumerGroup:}";
+ String FILTER_EXPRESSION_TYPE_PLACEHOLDER = "${rocketmq.simple-consumer.filterExpressionType:}";
+
+ /**
+ * The component name of the Consumer configuration.
+ */
+ String value() default "";
+
+ /**
+ * The property of "access-key".
+ */
+ String accessKey() default ACCESS_KEY_PLACEHOLDER;
+
+ /**
+ * The property of "secret-key".
+ */
+ String secretKey() default SECRET_KEY_PLACEHOLDER;
+
+ /**
+ * Tag of consumer.
+ */
+ String tag() default TAG_PLACEHOLDER;
+
+ /**
+ * Topic name of consumer.
+ */
+ String topic() default TOPIC_PLACEHOLDER;
+
+ /**
+ * The access point that the SDK should communicate with.
+ */
+ String endpoints() default ENDPOINTS_PLACEHOLDER;
+
+ /**
+ * The load balancing group for the simple consumer.
+ */
+ String consumerGroup() default CONSUMER_GROUP_PLACEHOLDER;
+
+ /**
+ * The type of filter expression
+ */
+ String filterExpressionType() default FILTER_EXPRESSION_TYPE_PLACEHOLDER;
+
+ /**
+ * The requestTimeout of client,it is 3s by default.
+ */
+ int requestTimeout() default 3;
+
+ /**
+ * The max await time when receive messages from the server.
+ */
+ int awaitDuration() default 0;
+
+}
diff --git a/rocketmq-client-spring-boot/src/main/java/org/apache/rocketmq/client/client/annotation/ExtProducerResetConfiguration.java b/rocketmq-client-spring-boot/src/main/java/org/apache/rocketmq/client/client/annotation/ExtProducerResetConfiguration.java
new file mode 100644
index 00000000..82380d9c
--- /dev/null
+++ b/rocketmq-client-spring-boot/src/main/java/org/apache/rocketmq/client/client/annotation/ExtProducerResetConfiguration.java
@@ -0,0 +1,61 @@
+package org.apache.rocketmq.client.client.annotation;
+
+import org.springframework.stereotype.Component;
+
+import java.lang.annotation.*;
+
+/**
+ * @author Akai
+ */
+@Target(ElementType.TYPE)
+@Retention(RetentionPolicy.RUNTIME)
+@Documented
+@Component
+public @interface ExtProducerResetConfiguration {
+
+ String ACCESS_KEY_PLACEHOLDER = "${rocketmq.producer.accessKey:}";
+ String SECRET_KEY_PLACEHOLDER = "${rocketmq.producer.secretKey:}";
+ String TOPIC_PLACEHOLDER = "${rocketmq.producer.topic:}";
+ String ENDPOINTS_PLACEHOLDER = "${rocketmq.producer.endpoints:}";
+
+ /**
+ * The component name of the Producer configuration.
+ */
+ String value() default "";
+
+ /**
+ * The property of "access-key".
+ */
+ String accessKey() default ACCESS_KEY_PLACEHOLDER;
+
+ /**
+ * The property of "secret-key".
+ */
+ String secretKey() default SECRET_KEY_PLACEHOLDER;
+
+ /**
+ * The access point that the SDK should communicate with.
+ */
+ String endpoints() default ENDPOINTS_PLACEHOLDER;
+
+ /**
+ * Topic name of consumer.
+ */
+ String topic() default TOPIC_PLACEHOLDER;
+
+ /**
+ * Request timeout is 3s by default.
+ */
+ int requestTimeout() default 3;
+
+ /**
+ * Enable or disable the use of Secure Sockets Layer (SSL) for network transport.
+ */
+ boolean sslEnabled() default true;
+
+ /**
+ * Max attempts for max internal retries of message publishing.
+ */
+ int maxAttempts() default 3;
+
+}
diff --git a/rocketmq-client-spring-boot/src/main/java/org/apache/rocketmq/client/client/annotation/RocketMQMessageListener.java b/rocketmq-client-spring-boot/src/main/java/org/apache/rocketmq/client/client/annotation/RocketMQMessageListener.java
new file mode 100644
index 00000000..21796fb6
--- /dev/null
+++ b/rocketmq-client-spring-boot/src/main/java/org/apache/rocketmq/client/client/annotation/RocketMQMessageListener.java
@@ -0,0 +1,69 @@
+package org.apache.rocketmq.client.client.annotation;
+
+import java.lang.annotation.*;
+
+/**
+ * @author Akai
+ */
+@Target(ElementType.TYPE)
+@Retention(RetentionPolicy.RUNTIME)
+@Documented
+public @interface RocketMQMessageListener {
+
+ String ACCESS_KEY_PLACEHOLDER = "${rocketmq.push-consumer.access-key:}";
+ String SECRET_KEY_PLACEHOLDER = "${rocketmq.push-consumer.secret-key:}";
+ String ENDPOINTS_PLACEHOLDER = "${rocketmq.push-consumer.endpoints:}";
+ String TOPIC_PLACEHOLDER = "${rocketmq.push-consumer.endpoints:}";
+ String TAG_PLACEHOLDER = "${rocketmq.push-consumer.tag:}";
+
+ /**
+ * The property of "access-key".
+ */
+ String accessKey() default ACCESS_KEY_PLACEHOLDER;
+
+ /**
+ * The property of "secret-key".
+ */
+ String secretKey() default SECRET_KEY_PLACEHOLDER;
+
+ /**
+ * The access point that the SDK should communicate with.
+ */
+ String endpoints() default ENDPOINTS_PLACEHOLDER;
+
+ /**
+ * Topic name of consumer.
+ */
+ String topic() default TOPIC_PLACEHOLDER;
+
+ /**
+ * Tag of consumer.
+ */
+ String tag() default TAG_PLACEHOLDER;
+
+ /**
+ * The type of filter expression
+ */
+ String filterExpressionType() default "tag";
+
+ /**
+ * The load balancing group for the simple consumer.
+ */
+ String consumerGroup();
+
+ /**
+ * The requestTimeout of client,it is 3s by default.
+ */
+ int requestTimeout() default 3;
+
+
+ int maxCachedMessageCount() default 1024;
+
+
+ int maxCacheMessageSizeInBytes() default 67108864;
+
+
+ int consumptionThreadCount() default 20;
+
+
+}
diff --git a/rocketmq-client-spring-boot/src/main/java/org/apache/rocketmq/client/client/annotation/RocketMQMessageListenerBeanPostProcessor.java b/rocketmq-client-spring-boot/src/main/java/org/apache/rocketmq/client/client/annotation/RocketMQMessageListenerBeanPostProcessor.java
new file mode 100644
index 00000000..c80642f8
--- /dev/null
+++ b/rocketmq-client-spring-boot/src/main/java/org/apache/rocketmq/client/client/annotation/RocketMQMessageListenerBeanPostProcessor.java
@@ -0,0 +1,91 @@
+package org.apache.rocketmq.client.client.annotation;
+
+import org.apache.rocketmq.client.client.autoconfigure.ListenerContainerConfiguration;
+import org.springframework.aop.support.AopUtils;
+import org.springframework.beans.BeansException;
+import org.springframework.beans.factory.InitializingBean;
+import org.springframework.beans.factory.config.BeanPostProcessor;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.ApplicationContextAware;
+import org.springframework.core.OrderComparator;
+import org.springframework.core.annotation.AnnotationUtils;
+
+import java.lang.reflect.AnnotatedElement;
+import java.util.List;
+import java.util.Map;
+import java.util.function.BiFunction;
+import java.util.stream.Collectors;
+
+/**
+ * @author Akai
+ */
+public class RocketMQMessageListenerBeanPostProcessor implements ApplicationContextAware, BeanPostProcessor, InitializingBean {
+
+ private ApplicationContext applicationContext;
+
+ private AnnotationEnhancer enhancer;
+
+ private ListenerContainerConfiguration listenerContainerConfiguration;
+
+ @Override
+ public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
+ return bean;
+ }
+
+ @Override
+ public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
+ Class> targetClass = AopUtils.getTargetClass(bean);
+ RocketMQMessageListener ann = targetClass.getAnnotation(RocketMQMessageListener.class);
+ if (ann != null) {
+ RocketMQMessageListener enhance = enhance(targetClass, ann);
+ if (listenerContainerConfiguration != null) {
+ listenerContainerConfiguration.registerContainer(beanName, bean, enhance);
+ }
+ }
+ return bean;
+ }
+
+ @Override
+ public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
+ this.applicationContext = applicationContext;
+ }
+
+ @Override
+ public void afterPropertiesSet() throws Exception {
+ buildEnhancer();
+ this.listenerContainerConfiguration = this.applicationContext.getBean(ListenerContainerConfiguration.class);
+ }
+
+ private void buildEnhancer() {
+ if (this.applicationContext != null) {
+ Map enhancersMap =
+ this.applicationContext.getBeansOfType(AnnotationEnhancer.class, false, false);
+ if (enhancersMap.size() > 0) {
+ List enhancers = enhancersMap.values()
+ .stream()
+ .sorted(new OrderComparator())
+ .collect(Collectors.toList());
+ this.enhancer = (attrs, element) -> {
+ Map newAttrs = attrs;
+ //遍历所有注解增强器,将前一次注解增强得到的结果作为下一个注解增强器的参数,最后返回多个注解增强器处理后的结果
+ for (AnnotationEnhancer enh : enhancers) {
+ newAttrs = enh.apply(newAttrs, element);
+ }
+ return attrs;
+ };
+ }
+ }
+ }
+
+ private RocketMQMessageListener enhance(AnnotatedElement element, RocketMQMessageListener ann) {
+ if (this.enhancer == null) {
+ return ann;
+ } else {
+ return AnnotationUtils.synthesizeAnnotation(
+ this.enhancer.apply(AnnotationUtils.getAnnotationAttributes(ann), element), RocketMQMessageListener.class, null);
+ }
+ }
+
+ public interface AnnotationEnhancer extends BiFunction