diff --git a/.licenserc.yaml b/.licenserc.yaml
index ba91d1fa..5eec96d9 100644
--- a/.licenserc.yaml
+++ b/.licenserc.yaml
@@ -33,6 +33,7 @@ header:
- 'src/test/**/*.log'
- '*/src/test/resources/META-INF/service/*'
- '*/src/main/resources/META-INF/service/*'
+ - '*/src/main/resources/META-INF/spring/*'
- '**/*/spring.factories'
- '**/target/**'
- '**/*.iml'
diff --git a/pom.xml b/pom.xml
index f4659e2c..43e278fc 100644
--- a/pom.xml
+++ b/pom.xml
@@ -215,6 +215,9 @@
rocketmq-spring-boot-parentrocketmq-spring-bootrocketmq-spring-boot-starter
+ rocketmq-v5-client-spring-boot
+ rocketmq-v5-client-spring-boot-parent
+ rocketmq-v5-client-spring-boot-starter
diff --git a/rocketmq-spring-boot-parent/pom.xml b/rocketmq-spring-boot-parent/pom.xml
index 102238f2..a01a8142 100644
--- a/rocketmq-spring-boot-parent/pom.xml
+++ b/rocketmq-spring-boot-parent/pom.xml
@@ -27,6 +27,7 @@
rocketmq-spring-boot-parent
+ 2.2.4-SNAPSHOTpomRocketMQ Spring Boot Parent
diff --git a/rocketmq-spring-boot-samples/pom.xml b/rocketmq-spring-boot-samples/pom.xml
index 793eb5d1..7e7d7bbb 100644
--- a/rocketmq-spring-boot-samples/pom.xml
+++ b/rocketmq-spring-boot-samples/pom.xml
@@ -40,7 +40,7 @@
1.81.8
- 2.2.3-SNAPSHOT
+ 2.2.4-SNAPSHOT
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfiguration.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfiguration.java
index 9f10438e..a8c7379c 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfiguration.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfiguration.java
@@ -14,7 +14,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.rocketmq.spring.autoconfigure;
import org.apache.rocketmq.client.AccessChannel;
diff --git a/rocketmq-v5-client-spring-boot-parent/pom.xml b/rocketmq-v5-client-spring-boot-parent/pom.xml
new file mode 100644
index 00000000..a0a10fd5
--- /dev/null
+++ b/rocketmq-v5-client-spring-boot-parent/pom.xml
@@ -0,0 +1,187 @@
+
+
+
+
+ 4.0.0
+
+ org.apache.rocketmq
+ rocketmq-spring-all
+ 2.2.4-SNAPSHOT
+ ../pom.xml
+
+
+ rocketmq-v5-client-spring-boot-parent
+ pom
+ 2.2.4-SNAPSHOT
+
+ rocketmq-v5-client-spring-boot-parent
+ rocketmq-v5-client-spring-boot-parent
+
+
+ ${project.basedir}/..
+ 2.5.9
+ 5.3.20
+
+ 2.2.4-SNAPSHOT
+ 5.1.0
+ 1.7.25
+ 2.11.1
+ 1.2.83
+ 4.13.2
+ 5.0.5
+ 1.8
+ @
+
+ UTF-8
+ UTF-8
+ ${java.version}
+ ${java.version}
+ -Xdoclint:none
+ jacoco
+
+ ${project.basedir}/../test/target/jacoco-it.exec
+ file:**/generated-sources/**,**/test/**
+
+
+
+
+
+ org.springframework.boot
+ spring-boot
+ ${spring.boot.version}
+
+
+
+ org.springframework.boot
+ spring-boot-autoconfigure
+ ${spring.boot.version}
+
+
+
+ org.springframework.boot
+ spring-boot-autoconfigure-processor
+ ${spring.boot.version}
+
+
+
+ org.springframework.boot
+ spring-boot-configuration-processor
+ ${spring.boot.version}
+
+
+
+ org.springframework.boot
+ spring-boot-starter-test
+ ${spring.boot.version}
+ test
+
+
+
+ org.springframework.boot
+ spring-boot-starter
+ ${spring.boot.version}
+
+
+
+ org.springframework.boot
+ spring-boot-starter-validation
+ ${spring.boot.version}
+
+
+
+ org.apache.rocketmq
+ rocketmq-v5-client-spring-boot
+ ${rocketmq.client.spring.boot.version}
+
+
+
+ org.apache.rocketmq
+ rocketmq-client-java
+ ${rocketmq.spring.client.version}
+
+
+ org.slf4j
+ slf4j-api
+
+
+
+
+
+ org.apache.rocketmq
+ rocketmq-acl
+ ${rocketmq.version}
+
+
+
+ org.springframework
+ spring-messaging
+ ${spring.version}
+
+
+
+ org.springframework
+ spring-core
+ ${spring.version}
+
+
+
+ org.springframework
+ spring-context
+ ${spring.version}
+
+
+
+ org.springframework
+ spring-aop
+ ${spring.version}
+
+
+
+ org.slf4j
+ slf4j-api
+ ${slf4j.version}
+
+
+
+ com.fasterxml.jackson.core
+ jackson-databind
+ ${jackson.version}
+
+
+
+ com.fasterxml.jackson.datatype
+ jackson-datatype-jsr310
+ ${jackson.version}
+
+
+
+ com.alibaba
+ fastjson
+ ${fastjson.version}
+
+
+
+ junit
+ junit
+ ${junit.version}
+
+
+
+
+
diff --git a/rocketmq-v5-client-spring-boot-samples/README-CN.md b/rocketmq-v5-client-spring-boot-samples/README-CN.md
new file mode 100644
index 00000000..59698fcf
--- /dev/null
+++ b/rocketmq-v5-client-spring-boot-samples/README-CN.md
@@ -0,0 +1,668 @@
+
+
+# Normal消息发送
+
+
+
+### 修改application.properties
+
+**rocketmq.producer.topic:** 用于给生产者设置topic名称(可选,但建议使用),生产者可以在消息发布之前**预取**topic路由。 **demo.rocketmq.normal-topic:** 用户自定义消息发送的topic
+
+```properties
+rocketmq.producer.endpoints=127.0.0.1:8081
+rocketmq.producer.topic=normalTopic
+demo.rocketmq.normal-topic=normalTopic
+```
+
+> 注意:
+> 请将上述示例配置中的127.0.0.1:8081替换成真实RocketMQ的endpoints地址与端口
+
+
+
+### 编写代码
+
+通过@Value注解引入配置文件参数,指定自定义topic 通过@Resource注解引入RocketMQClientTemplate容器 通过调用**RocketMQClientTemplate#syncSendNormalMessage**方法进行normal消息的发送(消息的参数类型可选:Object、String、byte[]、Message)
+
+```java
+@SpringBootApplication
+public class ClientProducerApplication implements CommandLineRunner {
+
+ private static final Logger log = LoggerFactory.getLogger(ClientProducerApplication.class);
+
+ @Value("${demo.rocketmq.normal-topic}")
+ private String normalTopic;
+
+ @Resource
+ private RocketMQClientTemplate rocketMQClientTemplate;
+
+ public static void main(String[] args) {
+ SpringApplication.run(ClientProducerApplication.class, args);
+ }
+
+ @Override
+ public void run(String... args) throws ClientException {
+ testSendNormalMessage();
+ }
+
+ //Test sending normal message
+ void testSendNormalMessage() {
+ SendReceipt sendReceipt = rocketMQClientTemplate.syncSendNormalMessage(normalTopic, new UserMessage()
+ .setId(1).setUserName("name").setUserAge((byte) 3));
+ System.out.printf("normalSend to topic %s sendReceipt=%s %n", normalTopic, sendReceipt);
+
+ sendReceipt = rocketMQClientTemplate.syncSendNormalMessage(normalTopic, "normal message");
+ System.out.printf("normalSend to topic %s sendReceipt=%s %n", normalTopic, sendReceipt);
+
+ sendReceipt = rocketMQClientTemplate.syncSendNormalMessage(normalTopic, "byte message".getBytes(StandardCharsets.UTF_8));
+ System.out.printf("normalSend to topic %s sendReceipt=%s %n", normalTopic, sendReceipt);
+
+ sendReceipt = rocketMQClientTemplate.syncSendNormalMessage(normalTopic, MessageBuilder.
+ withPayload("test message".getBytes()).build());
+ System.out.printf("normalSend to topic %s sendReceipt=%s %n", normalTopic, sendReceipt);
+ }
+
+ @Data
+ @AllArgsConstructor
+ public class UserMeaasge implements Serializable {
+ private int id;
+ private String userName;
+ private Byte userAge;
+ }
+
+}
+```
+
+
+
+# FIFO消息发送
+
+
+
+### 修改application.properties
+
+**rocketmq.producer.topic:** 用于给生产者设置topic名称(可选,但建议使用),生产者可以在消息发布之前**预取**topic路由。 **demo.rocketmq.fifo-topic:** 用户自定义消息发送的topic **demo.rocketmq.message-group=group1:** 顺序消息的顺序关系通过消息组(MessageGroup)判定和识别,发送顺序消息时需要为每条消息设置归属的消息组,相同消息组的多条消息之间遵循先进先出的顺序关系,不同消息组、无消息组的消息之间不涉及顺序性。
+
+```properties
+rocketmq.producer.endpoints=127.0.0.1:8081
+rocketmq.producer.topic=fifoTopic
+demo.rocketmq.fifo-topic=fifoTopic
+demo.rocketmq.message-group=group1
+```
+
+> 注意:
+> 请将上述示例配置中的127.0.0.1:8081替换成真实RocketMQ的endpoints地址与端口
+
+
+
+### 编写代码
+
+通过@Value注解引入配置文件参数,指定自定义topic 通过@Resource注解引入RocketMQClientTemplate容器 通过调用**RocketMQClientTemplate#syncSendNormalMessage**方法进行fifo消息的发送(参数类型可选:Object、String、byte[]、Message) 发送fifo消息时需要设置参数:消费者组(MessageGroup)
+
+```java
+@SpringBootApplication
+public class ClientProducerApplication implements CommandLineRunner {
+
+ private static final Logger log = LoggerFactory.getLogger(ClientProducerApplication.class);
+
+ @Value("${demo.rocketmq.fifo-topic}")
+ private String fifoTopic;
+
+ @Value("${demo.rocketmq.message-group}")
+ private String messageGroup;
+
+ @Resource
+ private RocketMQClientTemplate rocketMQClientTemplate;
+
+ public static void main(String[] args) {
+ SpringApplication.run(ClientProducerApplication.class, args);
+ }
+
+ @Override
+ public void run(String... args) throws ClientException {
+ testSendFIFOMessage();
+ }
+
+ //Test sending fifo message
+ void testSendFIFOMessage() {
+ SendReceipt sendReceipt = rocketMQClientTemplate.syncSendFifoMessage(fifoTopic, new UserMessage()
+ .setId(1).setUserName("name").setUserAge((byte) 3), messageGroup);
+ System.out.printf("fifoSend to topic %s sendReceipt=%s %n", fifoTopic, sendReceipt);
+
+ sendReceipt = rocketMQClientTemplate.syncSendFifoMessage(fifoTopic, MessageBuilder.
+ withPayload("test message".getBytes()).build(), messageGroup);
+ System.out.printf("fifoSend to topic %s sendReceipt=%s %n", fifoTopic, sendReceipt);
+
+ sendReceipt = rocketMQClientTemplate.syncSendFifoMessage(fifoTopic, "fifo message", messageGroup);
+ System.out.printf("fifoSend to topic %s sendReceipt=%s %n", fifoTopic, sendReceipt);
+
+ sendReceipt = rocketMQClientTemplate.syncSendFifoMessage(fifoTopic, "byte message".getBytes(StandardCharsets.UTF_8), messageGroup);
+ System.out.printf("fifoSend to topic %s sendReceipt=%s %n", fifoTopic, sendReceipt);
+ }
+
+ @Data
+ @AllArgsConstructor
+ public class UserMeaasge implements Serializable {
+ private int id;
+ private String userName;
+ private Byte userAge;
+ }
+
+}
+```
+
+
+
+# Delay消息发送
+
+
+
+### 修改application.properties
+
+**rocketmq.producer.topic:** 用于给生产者设置topic名称(可选,但建议使用),生产者可以在消息发布之前**预取**topic路由。 **demo.rocketmq.delay-topic:** 用户自定义消息发送的topic
+
+```class
+rocketmq.producer.endpoints=127.0.0.1:8081
+rocketmq.producer.topic=delayTopic
+demo.rocketmq.fifo-topic=delayTopic
+```
+
+> 注意:
+> 请将上述示例配置中的127.0.0.1:8081替换成真实RocketMQ的endpoints地址与端口
+
+
+
+### 编写代码
+
+通过@Value注解引入配置文件参数,指定自定义topic 通过@Resource注解引入RocketMQClientTemplate容器 通过调用**RocketMQClientTemplate#syncSendNormalMessage**方法进行delay消息的发送(消息的参数类型可选:Object、String、byte[]、Message) 发送delay消息时需要指定延迟时间:DeliveryTimestamp
+
+```java
+@SpringBootApplication
+public class ClientProducerApplication implements CommandLineRunner {
+
+ private static final Logger log = LoggerFactory.getLogger(ClientProducerApplication.class);
+
+ @Value("${demo.rocketmq.delay-topic}")
+ private String delayTopic;
+
+ @Resource
+ private RocketMQClientTemplate rocketMQClientTemplate;
+ public static void main(String[] args) {
+ SpringApplication.run(ClientProducerApplication.class, args);
+ }
+
+ @Override
+ public void run(String... args) throws ClientException {
+ testSendDelayMessage();
+ }
+
+ //Test sending delay message
+ void testSendDelayMessage() {
+
+ SendReceipt sendReceipt = rocketMQClientTemplate.syncSendDelayMessage(delayTopic, new UserMessage()
+ .setId(1).setUserName("name").setUserAge((byte) 3), Duration.ofSeconds(10));
+ System.out.printf("delaySend to topic %s sendReceipt=%s %n", delayTopic, sendReceipt);
+
+ sendReceipt = rocketMQClientTemplate.syncSendDelayMessage(delayTopic, MessageBuilder.
+ withPayload("test message".getBytes()).build(), Duration.ofSeconds(30));
+ System.out.printf("delaySend to topic %s sendReceipt=%s %n", delayTopic, sendReceipt);
+
+ sendReceipt = rocketMQClientTemplate.syncSendDelayMessage(delayTopic, "this is my message",
+ Duration.ofSeconds(60));
+ System.out.printf("delaySend to topic %s sendReceipt=%s %n", delayTopic, sendReceipt);
+
+ sendReceipt = rocketMQClientTemplate.syncSendDelayMessage(delayTopic, "byte messages".getBytes(StandardCharsets.UTF_8),
+ Duration.ofSeconds(90));
+ System.out.printf("delaySend to topic %s sendReceipt=%s %n", delayTopic, sendReceipt);
+ }
+
+ @Data
+ @AllArgsConstructor
+ public class UserMeaasge implements Serializable {
+ int id;
+ private String userName;
+ private Byte userAge;
+ }
+
+}
+```
+
+
+
+# 事务消息发送
+
+
+
+### 修改application.properties
+
+**rocketmq.producer.topic:** 用于给生产者设置topic名称(可选,但建议使用),生产者可以在消息发布之前**预取**topic路由。 **demo.rocketmq.delay-topic:** 用户自定义消息发送的topic
+
+```class
+rocketmq.producer.endpoints=127.0.0.1:8081
+rocketmq.producer.topic=transTopic
+demo.rocketmq.trans-topic=transTopic
+```
+
+> 注意:
+> 请将上述示例配置中的127.0.0.1:8081替换成真实RocketMQ的endpoints地址与端口
+
+
+
+### 编写代码
+
+通过@Value注解引入配置文件参数,指定自定义topic 通过@Resource注解引入RocketMQClientTemplate容器 通过调用**RocketMQClientTemplate#sendMessageInTransaction**方法进行事务消息的发送(消息的参数类型可选:Object、String、byte[]、Message)。 发送成功后会收到Pair类型的返回值,其左值代表返回值SendReceipt;右值代表Transaction,可以让用户根据本地事务处理结果的业务逻辑来决定commit还是rollback。 使用注解@RocketMQTransactionListener标记一个自定义类,该类必须实现RocketMQTransactionChecker接口,并重写TransactionResolution check(MessageView messageView)方法。
+
+```class
+ void testSendTransactionMessage() throws ClientException {
+ Pair pair;
+ SendReceipt sendReceipt;
+ try {
+ pair = rocketMQClientTemplate.sendMessageInTransaction(transTopic, MessageBuilder.
+ withPayload(new UserMessage()
+ .setId(1).setUserName("name").setUserAge((byte) 3)).setHeader("OrderId", 1).build());
+ } catch (ClientException e) {
+ throw new RuntimeException(e);
+ }
+ sendReceipt = pair.getSendReceipt();
+ System.out.printf("transactionSend to topic %s sendReceipt=%s %n", transTopic, sendReceipt);
+ Transaction transaction = pair.getTransaction();
+ // executed local transaction
+ if (doLocalTransaction(1)) {
+ transaction.commit();
+ } else {
+ transaction.rollback();
+ }
+ }
+
+ @RocketMQTransactionListener
+ static class TransactionListenerImpl implements RocketMQTransactionChecker {
+ @Override
+ public TransactionResolution check(MessageView messageView) {
+ if (Objects.nonNull(messageView.getProperties().get("OrderId"))) {
+ log.info("Receive transactional message check, message={}", messageView);
+ return TransactionResolution.COMMIT;
+ }
+ log.info("rollback transaction");
+ return TransactionResolution.ROLLBACK;
+ }
+ }
+
+ boolean doLocalTransaction(int number) {
+ log.info("execute local transaction");
+ return number > 0;
+ }
+```
+
+
+
+# 异步消息发送
+
+
+
+### 修改application.properties
+
+**rocketmq.producer.topic:** 用于给生产者设置topic名称(可选,但建议使用),生产者可以在消息发布之前**预取**topic路由。 **demo.rocketmq.delay-topic:** 用户自定义消息发送的topic
+
+```class
+rocketmq.producer.endpoints=127.0.0.1:8081
+demo.rocketmq.fifo-topic=fifoTopic
+demo.rocketmq.delay-topic=delayTopic
+demo.rocketmq.normal-topic=normalTopic
+demo.rocketmq.message-group=group1
+```
+
+> 注意:
+> 请将上述示例配置中的127.0.0.1:8081替换成真实RocketMQ的endpoints地址与端口
+
+
+
+### 编写代码
+
+```class
+ void testASycSendMessage() {
+
+ CompletableFuture future0 = new CompletableFuture<>();
+ CompletableFuture future1 = new CompletableFuture<>();
+ CompletableFuture future2 = new CompletableFuture<>();
+ ExecutorService sendCallbackExecutor = Executors.newCachedThreadPool();
+
+ future0.whenCompleteAsync((sendReceipt, throwable) -> {
+ if (null != throwable) {
+ log.error("Failed to send message", throwable);
+ return;
+ }
+ log.info("Send message successfully, messageId={}", sendReceipt.getMessageId());
+ }, sendCallbackExecutor);
+
+ future1.whenCompleteAsync((sendReceipt, throwable) -> {
+ if (null != throwable) {
+ log.error("Failed to send message", throwable);
+ return;
+ }
+ log.info("Send message successfully, messageId={}", sendReceipt.getMessageId());
+ }, sendCallbackExecutor);
+
+ future2.whenCompleteAsync((sendReceipt, throwable) -> {
+ if (null != throwable) {
+ log.error("Failed to send message", throwable);
+ return;
+ }
+ log.info("Send message successfully, messageId={}", sendReceipt.getMessageId());
+ }, sendCallbackExecutor);
+
+ CompletableFuture completableFuture0 = rocketMQClientTemplate.asyncSendNormalMessage(normalTopic, new UserMessage()
+ .setId(1).setUserName("name").setUserAge((byte) 3), future0);
+ System.out.printf("normalSend to topic %s sendReceipt=%s %n", normalTopic, completableFuture0);
+
+ CompletableFuture completableFuture1 = rocketMQClientTemplate.asyncSendFifoMessage(fifoTopic, "fifo message",
+ messageGroup, future1);
+ System.out.printf("fifoSend to topic %s sendReceipt=%s %n", fifoTopic, completableFuture1);
+
+ CompletableFuture completableFuture2 = rocketMQClientTemplate.asyncSendDelayMessage(delayTopic,
+ "delay message".getBytes(StandardCharsets.UTF_8), Duration.ofSeconds(10), future2);
+ System.out.printf("delaySend to topic %s sendReceipt=%s %n", delayTopic, completableFuture2);
+ }
+```
+
+
+
+# 接收消息
+
+
+
+### Push 模式
+
+
+
+#### 修改application.properties
+
+```class
+demo.fifo.rocketmq.endpoints=localhost:8081
+demo.fifo.rocketmq.topic=fifoTopic
+demo.fifo.rocketmq.consumer-group=fifoGroup
+demo.fifo.rocketmq.tag=*
+```
+
+> 注意:
+> 请将上述示例配置中的127.0.0.1:8081替换成真实RocketMQ的endpoints地址与端口
+
+
+
+#### 编写代码
+
+```java
+@Service
+@RocketMQMessageListener(endpoints = "${demo.fifo.rocketmq.endpoints:}", topic = "${demo.fifo.rocketmq.topic:}",
+ consumerGroup = "${demo.fifo.rocketmq.consumer-group:}", tag = "${demo.fifo.rocketmq.tag:}")
+public class FifoConsumer implements RocketMQListener {
+
+ @Override
+ public ConsumeResult consume(MessageView messageView) {
+ System.out.println("handle my fifo message:" + messageView);
+ return ConsumeResult.SUCCESS;
+ }
+}
+```
+
+
+
+### Simple 模式
+
+
+
+#### 同步订阅
+
+
+
+##### 修改application.properties
+
+```class
+rocketmq.simple-consumer.endpoints=localhost:8081
+rocketmq.simple-consumer.consumer-group=normalGroup
+rocketmq.simple-consumer.topic=normalTopic
+rocketmq.simple-consumer.tag=*
+rocketmq.simple-consumer.filter-expression-type=tag
+ext.rocketmq.topic=delayTopic
+```
+
+> 注意:
+> 请将上述示例配置中的127.0.0.1:8081替换成真实RocketMQ的endpoints地址与端口
+
+
+
+##### 编写代码
+
+此时测验原始的RocketMQClientTemplate和我们拓展的ExtRocketMQTemplate是否有效:
+
+1. 首先定义拓展ExtRocketMQTemplate,需要加上@ExtConsumerResetConfiguration,并指定topic等关键字段。
+
+```java
+@ExtConsumerResetConfiguration(topic = "${ext.rocketmq.topic:}")
+public class ExtRocketMQTemplate extends RocketMQClientTemplate {
+}
+```
+
+2. receiveSimpleConsumerMessage方法消费topic=normalTopic的消息,receiveExtSimpleConsumerMessage方法消费topic=delayTopic的消息。
+
+```java
+@SpringBootApplication
+public class ClientConsumeApplication implements CommandLineRunner {
+ private static final Logger log = LoggerFactory.getLogger(ClientConsumeApplication.class);
+
+ @Resource
+ RocketMQClientTemplate rocketMQClientTemplate;
+
+ @Resource(name = "extRocketMQTemplate")
+ RocketMQClientTemplate extRocketMQTemplate;
+
+ public static void main(String[] args) {
+ SpringApplication.run(ClientConsumeApplication.class, args);
+ }
+
+ @Override
+ public void run(String... args) throws Exception {
+ receiveSimpleConsumerMessage();
+ receiveExtSimpleConsumerMessage();
+ }
+
+ public void receiveSimpleConsumerMessage() throws ClientException {
+ do {
+ final List messages = rocketMQClientTemplate.receive(16, Duration.ofSeconds(15));
+ log.info("Received {} message(s)", messages.size());
+ for (MessageView message : messages) {
+ log.info("receive message, topic:" + message.getTopic() + " messageId:" + message.getMessageId());
+ final MessageId messageId = message.getMessageId();
+ try {
+ rocketMQClientTemplate.ack(message);
+ log.info("Message is acknowledged successfully, messageId={}", messageId);
+ } catch (Throwable t) {
+ log.error("Message is failed to be acknowledged, messageId={}", messageId, t);
+ }
+ }
+ } while (true);
+ }
+
+ public void receiveExtSimpleConsumerMessage() throws ClientException {
+ do {
+ final List messages = extRocketMQTemplate.receive(16, Duration.ofSeconds(15));
+ log.info("Received {} message(s)", messages.size());
+ for (MessageView message : messages) {
+ log.info("receive message, topic:" + message.getTopic() + " messageId:" + message.getMessageId());
+ final MessageId messageId = message.getMessageId();
+ try {
+ rocketMQClientTemplate.ack(message);
+ log.info("Message is acknowledged successfully, messageId={}", messageId);
+ } catch (Throwable t) {
+ log.error("Message is failed to be acknowledged, messageId={}", messageId, t);
+ }
+ }
+ } while (true);
+ }
+
+}
+```
+
+
+
+#### 异步订阅
+
+
+
+##### 修改application.properties
+
+```class
+rocketmq.simple-consumer.endpoints=localhost:8081
+rocketmq.simple-consumer.consumer-group=normalGroup
+rocketmq.simple-consumer.topic=normalTopic
+rocketmq.simple-consumer.tag=*
+rocketmq.simple-consumer.filter-expression-type=tag
+```
+
+
+
+##### 编写代码
+
+```class
+ public void receiveSimpleConsumerMessageAsynchronously() {
+ do {
+ int maxMessageNum = 16;
+ // Set message invisible duration after it is received.
+ Duration invisibleDuration = Duration.ofSeconds(15);
+ // Set individual thread pool for receive callback.
+ ExecutorService receiveCallbackExecutor = Executors.newCachedThreadPool();
+ // Set individual thread pool for ack callback.
+ ExecutorService ackCallbackExecutor = Executors.newCachedThreadPool();
+ CompletableFuture> future0;
+ try {
+ future0 = rocketMQClientTemplate.receiveAsync(maxMessageNum, invisibleDuration);
+ } catch (ClientException | IOException e) {
+ throw new RuntimeException(e);
+ }
+ future0.whenCompleteAsync(((messages, throwable) -> {
+ if (null != throwable) {
+ log.error("Failed to receive message from remote", throwable);
+ // Return early.
+ return;
+ }
+ log.info("Received {} message(s)", messages.size());
+ // Using messageView as key rather than message id because message id may be duplicated.
+ final Map> map =
+ messages.stream().collect(Collectors.toMap(message -> message, rocketMQClientTemplate::ackAsync));
+ for (Map.Entry> entry : map.entrySet()) {
+ final MessageId messageId = entry.getKey().getMessageId();
+ final CompletableFuture future = entry.getValue();
+ future.whenCompleteAsync((v, t) -> {
+ if (null != t) {
+ log.error("Message is failed to be acknowledged, messageId={}", messageId, t);
+ // Return early.
+ return;
+ }
+ log.info("Message is acknowledged successfully, messageId={}", messageId);
+ }, ackCallbackExecutor);
+ }
+
+ }), receiveCallbackExecutor);
+ } while (true);
+ }
+```
+
+
+
+# ACL功能
+
+
+
+## Producer端
+
+
+
+### 修改application.properties
+
+```class
+rocketmq.producer.endpoints=localhost:8081
+rocketmq.producer.topic=normalTopic
+rocketmq.producer.access-key=yourAccessKey
+rocketmq.producer.secret-key=yourSecretKey
+```
+
+> 注意:
+> 请将上述示例配置中的127.0.0.1:8081替换成真实RocketMQ的endpoints地址与端口,并修改AccessKey与SecretKey为真实数据
+
+
+
+### 编写代码
+
+```java
+@SpringBootApplication
+public class ClientProducerACLApplication implements CommandLineRunner {
+
+ @Resource
+ private RocketMQClientTemplate rocketMQClientTemplate;
+
+ @Value("${demo.acl.rocketmq.normal-topic}")
+ private String normalTopic;
+
+
+ public static void main(String[] args) {
+ SpringApplication.run(ClientProducerACLApplication.class, args);
+ }
+
+ @Override
+ public void run(String... args) throws ClientException {
+ testSendNormalMessage();
+ }
+
+ void testSendNormalMessage() {
+ SendReceipt sendReceipt = rocketMQClientTemplate.syncSendNormalMessage(normalTopic, new UserMessage()
+ .setId(1).setUserName("name").setUserAge((byte) 3));
+ System.out.printf("normalSend to topic %s sendReceipt=%s %n", normalTopic, sendReceipt);
+
+ sendReceipt = rocketMQClientTemplate.syncSendNormalMessage(normalTopic, "normal message");
+ System.out.printf("normalSend to topic %s sendReceipt=%s %n", normalTopic, sendReceipt);
+
+ sendReceipt = rocketMQClientTemplate.syncSendNormalMessage(normalTopic, "byte message".getBytes(StandardCharsets.UTF_8));
+ System.out.printf("normalSend to topic %s sendReceipt=%s %n", normalTopic, sendReceipt);
+
+ sendReceipt = rocketMQClientTemplate.syncSendNormalMessage(normalTopic, MessageBuilder.
+ withPayload("test message".getBytes()).build());
+ System.out.printf("normalSend to topic %s sendReceipt=%s %n", normalTopic, sendReceipt);
+ }
+}
+
+```
+
+
+
+## Consumer端
+
+
+
+### 修改application.properties
+
+```class
+demo.acl.rocketmq.endpoints=localhost:8081
+demo.acl.rocketmq.topic=normalTopic
+demo.acl.rocketmq.consumer-group=normalGroup
+demo.acl.rocketmq.tag=*
+demo.acl.rocketmq.access-key=yourAccessKey
+demo.acl.rocketmq.secret-key=yourSecretKey
+```
+
+> 注意:
+> 请将上述示例配置中的127.0.0.1:8081替换成真实RocketMQ的endpoints地址与端口,并修改AccessKey与SecretKey为真实数据
+
+
+
+### 编写代码
+
+```java
+@Service
+@RocketMQMessageListener(accessKey = "${demo.acl.rocketmq.access-key:}", secretKey = "${demo.acl.rocketmq.secret-key:}", endpoints = "${demo.acl.rocketmq.endpoints:}", topic = "${demo.acl.rocketmq.topic:}",
+ consumerGroup = "${demo.acl.rocketmq.consumer-group:}", tag = "${demo.acl.rocketmq.tag:}")
+public class ACLConsumer implements RocketMQListener {
+ @Override
+ public ConsumeResult consume(MessageView messageView) {
+ System.out.println("handle my acl message:" + messageView);
+ return ConsumeResult.SUCCESS;
+ }
+}
+```
\ No newline at end of file
diff --git a/rocketmq-v5-client-spring-boot-samples/pom.xml b/rocketmq-v5-client-spring-boot-samples/pom.xml
new file mode 100644
index 00000000..02b00837
--- /dev/null
+++ b/rocketmq-v5-client-spring-boot-samples/pom.xml
@@ -0,0 +1,90 @@
+
+
+
+ 4.0.0
+
+ org.apache.rocketmq
+ rocketmq-v5-client-spring-boot-samples
+ pom
+ 2.2.4-SNAPSHOT
+
+ rocketmq-v5-client-spring-boot-samples
+ rocketmq-v5-client-spring-boot-samples
+
+
+ rocketmq-v5-client-producer-demo
+ rocketmq-v5-client-consume-demo
+ rocketmq-v5-client-consume-acl-demo
+ rocketmq-v5-client-producer-acl-demo
+
+
+
+ 1.8
+ 1.8
+ 2.2.4-SNAPSHOT
+
+
+
+
+
+ org.apache.rocketmq
+ rocketmq-v5-client-spring-boot-starter
+ ${rocketmq-v5-client-spring-boot-starter-version}
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-checkstyle-plugin
+ 2.17
+
+
+ validate
+ validate
+
+ src/main/resources
+ style/rmq_checkstyle.xml
+ UTF-8
+ true
+ true
+
+
+ check
+
+
+
+
+
+ org.springframework.boot
+ spring-boot-maven-plugin
+ 2.1.0.RELEASE
+
+
+
+ repackage
+
+
+
+
+
+
+
+
diff --git a/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-acl-demo/pom.xml b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-acl-demo/pom.xml
new file mode 100644
index 00000000..082fd24e
--- /dev/null
+++ b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-acl-demo/pom.xml
@@ -0,0 +1,29 @@
+
+
+
+ 4.0.0
+
+ org.apache.rocketmq
+ rocketmq-v5-client-spring-boot-samples
+ 2.2.4-SNAPSHOT
+
+ rocketmq-v5-client-consume-acl-demo
+
+
+
diff --git a/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-acl-demo/src/main/java/org/apache/rocketmq/samples/springboot/ClientConsumerACLApplication.java b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-acl-demo/src/main/java/org/apache/rocketmq/samples/springboot/ClientConsumerACLApplication.java
new file mode 100644
index 00000000..544d5903
--- /dev/null
+++ b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-acl-demo/src/main/java/org/apache/rocketmq/samples/springboot/ClientConsumerACLApplication.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.samples.springboot;
+
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+
+@SpringBootApplication
+public class ClientConsumerACLApplication {
+
+ public static void main(String[] args) {
+ SpringApplication.run(ClientConsumerACLApplication.class, args);
+ }
+
+}
diff --git a/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-acl-demo/src/main/java/org/apache/rocketmq/samples/springboot/consumer/ACLConsumer.java b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-acl-demo/src/main/java/org/apache/rocketmq/samples/springboot/consumer/ACLConsumer.java
new file mode 100644
index 00000000..2e583b7b
--- /dev/null
+++ b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-acl-demo/src/main/java/org/apache/rocketmq/samples/springboot/consumer/ACLConsumer.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.samples.springboot.consumer;
+
+
+import org.apache.rocketmq.client.annotation.RocketMQMessageListener;
+import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
+import org.apache.rocketmq.client.apis.message.MessageView;
+import org.apache.rocketmq.client.core.RocketMQListener;
+import org.springframework.stereotype.Service;
+
+@Service
+@RocketMQMessageListener(accessKey = "${demo.acl.rocketmq.access-key:}", secretKey = "${demo.acl.rocketmq.secret-key:}",
+ tag = "${demo.acl.rocketmq.tag:}", topic = "${demo.acl.rocketmq.topic:}",
+ endpoints = "${demo.acl.rocketmq.endpoints:}", consumerGroup = "${demo.acl.rocketmq.consumer-group:}")
+public class ACLConsumer implements RocketMQListener {
+ @Override
+ public ConsumeResult consume(MessageView messageView) {
+ System.out.println("handle my acl message:" + messageView);
+ return ConsumeResult.SUCCESS;
+ }
+}
+
diff --git a/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-acl-demo/src/main/resources/application.properties b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-acl-demo/src/main/resources/application.properties
new file mode 100644
index 00000000..6aea301a
--- /dev/null
+++ b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-acl-demo/src/main/resources/application.properties
@@ -0,0 +1,7 @@
+demo.acl.rocketmq.endpoints=localhost:8081
+demo.acl.rocketmq.topic=normalTopic
+demo.acl.rocketmq.consumer-group=normalGroup
+demo.acl.rocketmq.access-key=RocketMQ
+demo.acl.rocketmq.secret-key=12345678
+demo.acl.rocketmq.tag=*
+
diff --git a/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-demo/pom.xml b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-demo/pom.xml
new file mode 100644
index 00000000..2e549a6d
--- /dev/null
+++ b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-demo/pom.xml
@@ -0,0 +1,29 @@
+
+
+
+ 4.0.0
+
+ org.apache.rocketmq
+ rocketmq-v5-client-spring-boot-samples
+ 2.2.4-SNAPSHOT
+
+
+ rocketmq-v5-client-consume-demo
+
+
diff --git a/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/ClientConsumeApplication.java b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/ClientConsumeApplication.java
new file mode 100644
index 00000000..8b2bd661
--- /dev/null
+++ b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/ClientConsumeApplication.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.samples.springboot;
+
+import org.apache.rocketmq.client.apis.ClientException;
+import org.apache.rocketmq.client.apis.message.MessageId;
+import org.apache.rocketmq.client.apis.message.MessageView;
+import org.apache.rocketmq.client.core.RocketMQClientTemplate;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.boot.CommandLineRunner;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+
+import javax.annotation.Resource;
+import java.io.IOException;
+import java.time.Duration;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.stream.Collectors;
+
+@SpringBootApplication
+public class ClientConsumeApplication implements CommandLineRunner {
+ private static final Logger log = LoggerFactory.getLogger(ClientConsumeApplication.class);
+
+ @Resource
+ RocketMQClientTemplate rocketMQClientTemplate;
+
+ @Resource(name = "extRocketMQTemplate")
+ RocketMQClientTemplate extRocketMQTemplate;
+
+ public static void main(String[] args) {
+ SpringApplication.run(ClientConsumeApplication.class, args);
+ }
+
+ @Override
+ public void run(String... args) throws Exception {
+ receiveSimpleConsumerMessage();
+ receiveExtSimpleConsumerMessage();
+ //receiveSimpleConsumerMessageAsynchronously();
+ }
+
+ public void receiveSimpleConsumerMessage() throws ClientException {
+ do {
+ final List messages = rocketMQClientTemplate.receive(16, Duration.ofSeconds(15));
+ log.info("Received {} message(s)", messages.size());
+ for (MessageView message : messages) {
+ log.info("receive message, topic:" + message.getTopic() + " messageId:" + message.getMessageId());
+ final MessageId messageId = message.getMessageId();
+ try {
+ rocketMQClientTemplate.ack(message);
+ log.info("Message is acknowledged successfully, messageId={}", messageId);
+ } catch (Throwable t) {
+ log.error("Message is failed to be acknowledged, messageId={}", messageId, t);
+ }
+ }
+ } while (true);
+ }
+
+ public void receiveExtSimpleConsumerMessage() throws ClientException {
+ do {
+ final List messages = extRocketMQTemplate.receive(16, Duration.ofSeconds(15));
+ log.info("Received {} message(s)", messages.size());
+ for (MessageView message : messages) {
+ log.info("receive message, topic:" + message.getTopic() + " messageId:" + message.getMessageId());
+ final MessageId messageId = message.getMessageId();
+ try {
+ rocketMQClientTemplate.ack(message);
+ log.info("Message is acknowledged successfully, messageId={}", messageId);
+ } catch (Throwable t) {
+ log.error("Message is failed to be acknowledged, messageId={}", messageId, t);
+ }
+ }
+ } while (true);
+ }
+
+
+ public void receiveSimpleConsumerMessageAsynchronously() {
+ do {
+ int maxMessageNum = 16;
+ // Set message invisible duration after it is received.
+ Duration invisibleDuration = Duration.ofSeconds(15);
+ // Set individual thread pool for receive callback.
+ ExecutorService receiveCallbackExecutor = Executors.newCachedThreadPool();
+ // Set individual thread pool for ack callback.
+ ExecutorService ackCallbackExecutor = Executors.newCachedThreadPool();
+ CompletableFuture> future0;
+ try {
+ future0 = rocketMQClientTemplate.receiveAsync(maxMessageNum, invisibleDuration);
+ } catch (ClientException | IOException e) {
+ throw new RuntimeException(e);
+ }
+ future0.whenCompleteAsync(((messages, throwable) -> {
+ if (null != throwable) {
+ log.error("Failed to receive message from remote", throwable);
+ // Return early.
+ return;
+ }
+ log.info("Received {} message(s)", messages.size());
+ // Using messageView as key rather than message id because message id may be duplicated.
+ final Map> map =
+ messages.stream().collect(Collectors.toMap(message -> message, rocketMQClientTemplate::ackAsync));
+ for (Map.Entry> entry : map.entrySet()) {
+ final MessageId messageId = entry.getKey().getMessageId();
+ final CompletableFuture future = entry.getValue();
+ future.whenCompleteAsync((v, t) -> {
+ if (null != t) {
+ log.error("Message is failed to be acknowledged, messageId={}", messageId, t);
+ // Return early.
+ return;
+ }
+ log.info("Message is acknowledged successfully, messageId={}", messageId);
+ }, ackCallbackExecutor);
+ }
+
+ }), receiveCallbackExecutor);
+ } while (true);
+ }
+
+}
diff --git a/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/ExtRocketMQTemplate.java b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/ExtRocketMQTemplate.java
new file mode 100644
index 00000000..25b66689
--- /dev/null
+++ b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/ExtRocketMQTemplate.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.samples.springboot;
+
+import org.apache.rocketmq.client.annotation.ExtConsumerResetConfiguration;
+import org.apache.rocketmq.client.core.RocketMQClientTemplate;
+
+@ExtConsumerResetConfiguration(topic = "${ext.rocketmq.topic:}")
+public class ExtRocketMQTemplate extends RocketMQClientTemplate {
+}
diff --git a/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/consumer/FifoConsumer.java b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/consumer/FifoConsumer.java
new file mode 100644
index 00000000..ac33a726
--- /dev/null
+++ b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/consumer/FifoConsumer.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.samples.springboot.consumer;
+
+import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
+import org.apache.rocketmq.client.apis.message.MessageView;
+import org.apache.rocketmq.client.annotation.RocketMQMessageListener;
+import org.apache.rocketmq.client.core.RocketMQListener;
+import org.springframework.stereotype.Service;
+
+@Service
+@RocketMQMessageListener(endpoints = "${demo.fifo.rocketmq.endpoints:}", topic = "${demo.fifo.rocketmq.topic:}",
+ consumerGroup = "${demo.fifo.rocketmq.consumer-group:}", tag = "${demo.fifo.rocketmq.tag:}")
+public class FifoConsumer implements RocketMQListener {
+
+ @Override
+ public ConsumeResult consume(MessageView messageView) {
+ System.out.println("handle my fifo message:" + messageView);
+ return ConsumeResult.SUCCESS;
+ }
+}
diff --git a/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/consumer/TransConsumer.java b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/consumer/TransConsumer.java
new file mode 100644
index 00000000..1ef973af
--- /dev/null
+++ b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/consumer/TransConsumer.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.samples.springboot.consumer;
+
+
+import org.apache.rocketmq.client.annotation.RocketMQMessageListener;
+import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
+import org.apache.rocketmq.client.apis.message.MessageView;
+import org.apache.rocketmq.client.core.RocketMQListener;
+import org.springframework.stereotype.Service;
+
+@Service
+@RocketMQMessageListener(endpoints = "${demo.trans.rocketmq.endpoints:}", topic = "${demo.trans.rocketmq.topic:}",
+ consumerGroup = "${demo.trans.rocketmq.consumer-group:}", tag = "${demo.trans.rocketmq.tag:}")
+public class TransConsumer implements RocketMQListener {
+ public ConsumeResult consume(MessageView messageView) {
+ System.out.println("handle my transaction message:" + messageView);
+ return ConsumeResult.SUCCESS;
+ }
+}
diff --git a/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-demo/src/main/resources/application.properties b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-demo/src/main/resources/application.properties
new file mode 100644
index 00000000..39c6a0b1
--- /dev/null
+++ b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-demo/src/main/resources/application.properties
@@ -0,0 +1,14 @@
+rocketmq.simple-consumer.endpoints=localhost:8081
+rocketmq.simple-consumer.consumer-group=normalGroup
+rocketmq.simple-consumer.topic=normalTopic
+rocketmq.simple-consumer.tag=*
+rocketmq.simple-consumer.filter-expression-type=tag
+demo.fifo.rocketmq.endpoints=localhost:8081
+demo.fifo.rocketmq.topic=fifoTopic
+demo.fifo.rocketmq.consumer-group=fifoGroup
+demo.fifo.rocketmq.tag=*
+demo.trans.rocketmq.endpoints=localhost:8081
+demo.trans.rocketmq.topic=transTopic
+demo.trans.rocketmq.consumer-group=transGroup
+demo.trans.rocketmq.tag=*
+ext.rocketmq.topic=delayTopic
diff --git a/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-producer-acl-demo/pom.xml b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-producer-acl-demo/pom.xml
new file mode 100644
index 00000000..c7e2cd1d
--- /dev/null
+++ b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-producer-acl-demo/pom.xml
@@ -0,0 +1,28 @@
+
+
+
+ 4.0.0
+
+ org.apache.rocketmq
+ rocketmq-v5-client-spring-boot-samples
+ 2.2.4-SNAPSHOT
+
+
+ rocketmq-v5-client-producer-acl-demo
+
diff --git a/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-producer-acl-demo/src/main/java/org/apache/rocketmq/samples/springboot/ClientProducerACLApplication.java b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-producer-acl-demo/src/main/java/org/apache/rocketmq/samples/springboot/ClientProducerACLApplication.java
new file mode 100644
index 00000000..66b4f2ca
--- /dev/null
+++ b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-producer-acl-demo/src/main/java/org/apache/rocketmq/samples/springboot/ClientProducerACLApplication.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.samples.springboot;
+
+import org.apache.rocketmq.client.apis.ClientException;
+import org.apache.rocketmq.client.apis.producer.SendReceipt;
+import org.apache.rocketmq.client.core.RocketMQClientTemplate;
+import org.apache.rocketmq.samples.springboot.domain.UserMessage;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.CommandLineRunner;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.messaging.support.MessageBuilder;
+
+import javax.annotation.Resource;
+import java.nio.charset.StandardCharsets;
+
+@SpringBootApplication
+public class ClientProducerACLApplication implements CommandLineRunner {
+
+ @Resource
+ private RocketMQClientTemplate rocketMQClientTemplate;
+
+ @Value("${rocketmq.producer.topic}")
+ private String normalTopic;
+
+
+ public static void main(String[] args) {
+ SpringApplication.run(ClientProducerACLApplication.class, args);
+ }
+
+ @Override
+ public void run(String... args) throws ClientException {
+ testSendNormalMessage();
+ }
+
+ void testSendNormalMessage() {
+ SendReceipt sendReceipt = rocketMQClientTemplate.syncSendNormalMessage(normalTopic, new UserMessage()
+ .setId(1).setUserName("name").setUserAge((byte) 3));
+ System.out.printf("normalSend to topic %s sendReceipt=%s %n", normalTopic, sendReceipt);
+
+ sendReceipt = rocketMQClientTemplate.syncSendNormalMessage(normalTopic, "normal message");
+ System.out.printf("normalSend to topic %s sendReceipt=%s %n", normalTopic, sendReceipt);
+
+ sendReceipt = rocketMQClientTemplate.syncSendNormalMessage(normalTopic, "byte message".getBytes(StandardCharsets.UTF_8));
+ System.out.printf("normalSend to topic %s sendReceipt=%s %n", normalTopic, sendReceipt);
+
+ sendReceipt = rocketMQClientTemplate.syncSendNormalMessage(normalTopic, MessageBuilder.
+ withPayload("test message".getBytes()).build());
+ System.out.printf("normalSend to topic %s sendReceipt=%s %n", normalTopic, sendReceipt);
+ }
+}
diff --git a/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-producer-acl-demo/src/main/java/org/apache/rocketmq/samples/springboot/domain/UserMessage.java b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-producer-acl-demo/src/main/java/org/apache/rocketmq/samples/springboot/domain/UserMessage.java
new file mode 100644
index 00000000..b31e48d0
--- /dev/null
+++ b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-producer-acl-demo/src/main/java/org/apache/rocketmq/samples/springboot/domain/UserMessage.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.samples.springboot.domain;
+
+public class UserMessage {
+ int id;
+ private String userName;
+ private Byte userAge;
+
+ public int getId() {
+ return id;
+ }
+
+ public UserMessage setId(int id) {
+ this.id = id;
+ return this;
+ }
+
+ public String getUserName() {
+ return userName;
+ }
+
+ public UserMessage setUserName(String userName) {
+ this.userName = userName;
+ return this;
+ }
+
+ public Byte getUserAge() {
+ return userAge;
+ }
+
+ public UserMessage setUserAge(Byte userAge) {
+ this.userAge = userAge;
+ return this;
+ }
+}
diff --git a/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-producer-acl-demo/src/main/resources/application.properties b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-producer-acl-demo/src/main/resources/application.properties
new file mode 100644
index 00000000..962bc26e
--- /dev/null
+++ b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-producer-acl-demo/src/main/resources/application.properties
@@ -0,0 +1,5 @@
+rocketmq.producer.endpoints=localhost:8081
+rocketmq.producer.topic=normalTopic
+rocketmq.producer.access-key=RocketMQ
+rocketmq.producer.secret-key=12345678
+
diff --git a/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-producer-demo/pom.xml b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-producer-demo/pom.xml
new file mode 100644
index 00000000..d6a68592
--- /dev/null
+++ b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-producer-demo/pom.xml
@@ -0,0 +1,29 @@
+
+
+
+ 4.0.0
+
+ org.apache.rocketmq
+ rocketmq-v5-client-spring-boot-samples
+ 2.2.4-SNAPSHOT
+
+
+ rocketmq-v5-client-producer-demo
+
+
\ No newline at end of file
diff --git a/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-producer-demo/src/main/java/org/apache/rocketmq/samples/springboot/ClientProducerApplication.java b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-producer-demo/src/main/java/org/apache/rocketmq/samples/springboot/ClientProducerApplication.java
new file mode 100644
index 00000000..f03df016
--- /dev/null
+++ b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-producer-demo/src/main/java/org/apache/rocketmq/samples/springboot/ClientProducerApplication.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.samples.springboot;
+
+import org.apache.rocketmq.client.apis.ClientException;
+import org.apache.rocketmq.client.apis.message.MessageView;
+import org.apache.rocketmq.client.apis.producer.SendReceipt;
+import org.apache.rocketmq.client.apis.producer.Transaction;
+import org.apache.rocketmq.client.apis.producer.TransactionResolution;
+
+import org.apache.rocketmq.client.annotation.RocketMQTransactionListener;
+import org.apache.rocketmq.client.common.Pair;
+import org.apache.rocketmq.client.core.RocketMQClientTemplate;
+import org.apache.rocketmq.client.core.RocketMQTransactionChecker;
+import org.apache.rocketmq.samples.springboot.domain.UserMessage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.CommandLineRunner;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.messaging.support.MessageBuilder;
+
+import javax.annotation.Resource;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+@SpringBootApplication
+public class ClientProducerApplication implements CommandLineRunner {
+
+ private static final Logger log = LoggerFactory.getLogger(ClientProducerApplication.class);
+
+ @Resource
+ private RocketMQClientTemplate rocketMQClientTemplate;
+
+ @Value("${demo.rocketmq.fifo-topic}")
+ private String fifoTopic;
+
+ @Value("${demo.rocketmq.normal-topic}")
+ private String normalTopic;
+
+ @Value("${demo.rocketmq.delay-topic}")
+ private String delayTopic;
+
+ @Value("${demo.rocketmq.trans-topic}")
+ private String transTopic;
+
+ @Value("${demo.rocketmq.message-group}")
+ private String messageGroup;
+
+
+ public static void main(String[] args) {
+ SpringApplication.run(ClientProducerApplication.class, args);
+ }
+
+ @Override
+ public void run(String... args) throws ClientException {
+ testASycSendMessage();
+ testSendDelayMessage();
+ testSendFIFOMessage();
+ testSendNormalMessage();
+ testSendTransactionMessage();
+ }
+
+ void testASycSendMessage() {
+
+ CompletableFuture future0 = new CompletableFuture<>();
+ CompletableFuture future1 = new CompletableFuture<>();
+ CompletableFuture future2 = new CompletableFuture<>();
+ ExecutorService sendCallbackExecutor = Executors.newCachedThreadPool();
+
+ future0.whenCompleteAsync((sendReceipt, throwable) -> {
+ if (null != throwable) {
+ log.error("Failed to send message", throwable);
+ return;
+ }
+ log.info("Send message successfully, messageId={}", sendReceipt.getMessageId());
+ }, sendCallbackExecutor);
+
+ future1.whenCompleteAsync((sendReceipt, throwable) -> {
+ if (null != throwable) {
+ log.error("Failed to send message", throwable);
+ return;
+ }
+ log.info("Send message successfully, messageId={}", sendReceipt.getMessageId());
+ }, sendCallbackExecutor);
+
+ future2.whenCompleteAsync((sendReceipt, throwable) -> {
+ if (null != throwable) {
+ log.error("Failed to send message", throwable);
+ return;
+ }
+ log.info("Send message successfully, messageId={}", sendReceipt.getMessageId());
+ }, sendCallbackExecutor);
+
+ CompletableFuture completableFuture0 = rocketMQClientTemplate.asyncSendNormalMessage(normalTopic, new UserMessage()
+ .setId(1).setUserName("name").setUserAge((byte) 3), future0);
+ System.out.printf("normalSend to topic %s sendReceipt=%s %n", normalTopic, completableFuture0);
+
+ CompletableFuture completableFuture1 = rocketMQClientTemplate.asyncSendFifoMessage(fifoTopic, "fifo message",
+ messageGroup, future1);
+ System.out.printf("fifoSend to topic %s sendReceipt=%s %n", fifoTopic, completableFuture1);
+
+ CompletableFuture completableFuture2 = rocketMQClientTemplate.asyncSendDelayMessage(delayTopic,
+ "delay message".getBytes(StandardCharsets.UTF_8), Duration.ofSeconds(10), future2);
+ System.out.printf("delaySend to topic %s sendReceipt=%s %n", delayTopic, completableFuture2);
+ }
+
+ void testSendDelayMessage() {
+ SendReceipt sendReceipt = rocketMQClientTemplate.syncSendDelayMessage(delayTopic, new UserMessage()
+ .setId(1).setUserName("name").setUserAge((byte) 3), Duration.ofSeconds(10));
+ System.out.printf("delaySend to topic %s sendReceipt=%s %n", delayTopic, sendReceipt);
+
+ sendReceipt = rocketMQClientTemplate.syncSendDelayMessage(delayTopic, MessageBuilder.
+ withPayload("test message".getBytes()).build(), Duration.ofSeconds(30));
+ System.out.printf("delaySend to topic %s sendReceipt=%s %n", delayTopic, sendReceipt);
+
+ sendReceipt = rocketMQClientTemplate.syncSendDelayMessage(delayTopic, "this is my message",
+ Duration.ofSeconds(60));
+ System.out.printf("delaySend to topic %s sendReceipt=%s %n", delayTopic, sendReceipt);
+
+ sendReceipt = rocketMQClientTemplate.syncSendDelayMessage(delayTopic, "byte messages".getBytes(StandardCharsets.UTF_8),
+ Duration.ofSeconds(90));
+ System.out.printf("delaySend to topic %s sendReceipt=%s %n", delayTopic, sendReceipt);
+ }
+
+ void testSendFIFOMessage() {
+ SendReceipt sendReceipt = rocketMQClientTemplate.syncSendFifoMessage(fifoTopic, new UserMessage()
+ .setId(1).setUserName("name").setUserAge((byte) 3), messageGroup);
+ System.out.printf("fifoSend to topic %s sendReceipt=%s %n", fifoTopic, sendReceipt);
+
+ sendReceipt = rocketMQClientTemplate.syncSendFifoMessage(fifoTopic, MessageBuilder.
+ withPayload("test message".getBytes()).build(), messageGroup);
+ System.out.printf("fifoSend to topic %s sendReceipt=%s %n", fifoTopic, sendReceipt);
+
+ sendReceipt = rocketMQClientTemplate.syncSendFifoMessage(fifoTopic, "fifo message", messageGroup);
+ System.out.printf("fifoSend to topic %s sendReceipt=%s %n", fifoTopic, sendReceipt);
+
+ sendReceipt = rocketMQClientTemplate.syncSendFifoMessage(fifoTopic, "byte message".getBytes(StandardCharsets.UTF_8), messageGroup);
+ System.out.printf("fifoSend to topic %s sendReceipt=%s %n", fifoTopic, sendReceipt);
+ }
+
+ void testSendNormalMessage() {
+ SendReceipt sendReceipt = rocketMQClientTemplate.syncSendNormalMessage(normalTopic, new UserMessage()
+ .setId(1).setUserName("name").setUserAge((byte) 3));
+ System.out.printf("normalSend to topic %s sendReceipt=%s %n", normalTopic, sendReceipt);
+
+ sendReceipt = rocketMQClientTemplate.syncSendNormalMessage(normalTopic, "normal message");
+ System.out.printf("normalSend to topic %s sendReceipt=%s %n", normalTopic, sendReceipt);
+
+ sendReceipt = rocketMQClientTemplate.syncSendNormalMessage(normalTopic, "byte message".getBytes(StandardCharsets.UTF_8));
+ System.out.printf("normalSend to topic %s sendReceipt=%s %n", normalTopic, sendReceipt);
+
+ sendReceipt = rocketMQClientTemplate.syncSendNormalMessage(normalTopic, MessageBuilder.
+ withPayload("test message".getBytes()).build());
+ System.out.printf("normalSend to topic %s sendReceipt=%s %n", normalTopic, sendReceipt);
+ }
+
+ void testSendTransactionMessage() throws ClientException {
+ Pair pair;
+ SendReceipt sendReceipt;
+ try {
+ pair = rocketMQClientTemplate.sendMessageInTransaction(transTopic, MessageBuilder.
+ withPayload(new UserMessage()
+ .setId(1).setUserName("name").setUserAge((byte) 3)).setHeader("OrderId", 1).build());
+ } catch (ClientException e) {
+ throw new RuntimeException(e);
+ }
+ sendReceipt = pair.getSendReceipt();
+ System.out.printf("transactionSend to topic %s sendReceipt=%s %n", transTopic, sendReceipt);
+ Transaction transaction = pair.getTransaction();
+ // executed local transaction
+ if (doLocalTransaction(1)) {
+ transaction.commit();
+ } else {
+ transaction.rollback();
+ }
+ }
+
+ @RocketMQTransactionListener
+ static class TransactionListenerImpl implements RocketMQTransactionChecker {
+ @Override
+ public TransactionResolution check(MessageView messageView) {
+ if (Objects.nonNull(messageView.getProperties().get("OrderId"))) {
+ log.info("Receive transactional message check, message={}", messageView);
+ return TransactionResolution.COMMIT;
+ }
+ log.info("rollback transaction");
+ return TransactionResolution.ROLLBACK;
+ }
+ }
+
+ boolean doLocalTransaction(int number) {
+ log.info("execute local transaction");
+ return number > 0;
+ }
+
+}
diff --git a/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-producer-demo/src/main/java/org/apache/rocketmq/samples/springboot/domain/UserMessage.java b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-producer-demo/src/main/java/org/apache/rocketmq/samples/springboot/domain/UserMessage.java
new file mode 100644
index 00000000..b31e48d0
--- /dev/null
+++ b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-producer-demo/src/main/java/org/apache/rocketmq/samples/springboot/domain/UserMessage.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.samples.springboot.domain;
+
+public class UserMessage {
+ int id;
+ private String userName;
+ private Byte userAge;
+
+ public int getId() {
+ return id;
+ }
+
+ public UserMessage setId(int id) {
+ this.id = id;
+ return this;
+ }
+
+ public String getUserName() {
+ return userName;
+ }
+
+ public UserMessage setUserName(String userName) {
+ this.userName = userName;
+ return this;
+ }
+
+ public Byte getUserAge() {
+ return userAge;
+ }
+
+ public UserMessage setUserAge(Byte userAge) {
+ this.userAge = userAge;
+ return this;
+ }
+}
diff --git a/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-producer-demo/src/main/resources/application.properties b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-producer-demo/src/main/resources/application.properties
new file mode 100644
index 00000000..36fc218f
--- /dev/null
+++ b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-producer-demo/src/main/resources/application.properties
@@ -0,0 +1,7 @@
+rocketmq.producer.endpoints=localhost:8081
+rocketmq.producer.topic=normalTopic
+demo.rocketmq.fifo-topic=fifoTopic
+demo.rocketmq.delay-topic=delayTopic
+demo.rocketmq.trans-topic=transTopic
+demo.rocketmq.normal-topic=normalTopic
+demo.rocketmq.message-group=group1
\ No newline at end of file
diff --git a/rocketmq-v5-client-spring-boot-samples/style/copyright/Apache.xml b/rocketmq-v5-client-spring-boot-samples/style/copyright/Apache.xml
new file mode 100644
index 00000000..e3e3dec3
--- /dev/null
+++ b/rocketmq-v5-client-spring-boot-samples/style/copyright/Apache.xml
@@ -0,0 +1,23 @@
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/rocketmq-v5-client-spring-boot-samples/style/copyright/profiles_settings.xml b/rocketmq-v5-client-spring-boot-samples/style/copyright/profiles_settings.xml
new file mode 100644
index 00000000..747c7e2b
--- /dev/null
+++ b/rocketmq-v5-client-spring-boot-samples/style/copyright/profiles_settings.xml
@@ -0,0 +1,64 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/rocketmq-v5-client-spring-boot-samples/style/rmq_checkstyle.xml b/rocketmq-v5-client-spring-boot-samples/style/rmq_checkstyle.xml
new file mode 100644
index 00000000..2e9658f4
--- /dev/null
+++ b/rocketmq-v5-client-spring-boot-samples/style/rmq_checkstyle.xml
@@ -0,0 +1,135 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/rocketmq-v5-client-spring-boot-samples/style/rmq_codeStyle.xml b/rocketmq-v5-client-spring-boot-samples/style/rmq_codeStyle.xml
new file mode 100644
index 00000000..9db075e3
--- /dev/null
+++ b/rocketmq-v5-client-spring-boot-samples/style/rmq_codeStyle.xml
@@ -0,0 +1,157 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/rocketmq-v5-client-spring-boot-starter/pom.xml b/rocketmq-v5-client-spring-boot-starter/pom.xml
new file mode 100644
index 00000000..a8d638bf
--- /dev/null
+++ b/rocketmq-v5-client-spring-boot-starter/pom.xml
@@ -0,0 +1,55 @@
+
+
+
+ 4.0.0
+
+ org.apache.rocketmq
+ rocketmq-v5-client-spring-boot-parent
+ 2.2.4-SNAPSHOT
+ ../rocketmq-v5-client-spring-boot-parent/pom.xml
+
+
+ rocketmq-v5-client-spring-boot-starter
+ jar
+ 2.2.4-SNAPSHOT
+
+ rocketmq-v5-client-spring-boot-starter
+ rocketmq-v5-client-spring-boot-starter
+
+
+ 8
+
+
+
+
+ org.springframework.boot
+ spring-boot-starter
+
+
+ org.springframework.boot
+ spring-boot-starter-validation
+
+
+ org.apache.rocketmq
+ rocketmq-v5-client-spring-boot
+
+
+
+
+
diff --git a/rocketmq-v5-client-spring-boot/pom.xml b/rocketmq-v5-client-spring-boot/pom.xml
new file mode 100644
index 00000000..960189a6
--- /dev/null
+++ b/rocketmq-v5-client-spring-boot/pom.xml
@@ -0,0 +1,100 @@
+
+
+
+ 4.0.0
+
+ org.apache.rocketmq
+ rocketmq-v5-client-spring-boot-parent
+ 2.2.4-SNAPSHOT
+ ../rocketmq-v5-client-spring-boot-parent/pom.xml
+
+
+ rocketmq-v5-client-spring-boot
+ jar
+
+ rocketmq-v5-client-spring-boot
+ rocketmq-v5-client-spring-boot
+
+
+
+ org.slf4j
+ slf4j-api
+
+
+ org.apache.rocketmq
+ rocketmq-client-java
+
+
+ org.springframework.boot
+ spring-boot
+ true
+
+
+ org.springframework.boot
+ spring-boot-autoconfigure
+ true
+
+
+ org.springframework.boot
+ spring-boot-autoconfigure-processor
+ true
+
+
+ org.springframework.boot
+ spring-boot-configuration-processor
+ true
+
+
+ org.springframework.boot
+ spring-boot-starter-test
+ test
+
+
+ org.springframework
+ spring-messaging
+
+
+ org.springframework
+ spring-core
+
+
+ org.springframework
+ spring-context
+
+
+ org.springframework
+ spring-aop
+
+
+ com.fasterxml.jackson.core
+ jackson-databind
+
+
+ com.fasterxml.jackson.datatype
+ jackson-datatype-jsr310
+
+
+ junit
+ junit
+ test
+
+
+
+
+
diff --git a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/annotation/ExtConsumerResetConfiguration.java b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/annotation/ExtConsumerResetConfiguration.java
new file mode 100644
index 00000000..8615e2b6
--- /dev/null
+++ b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/annotation/ExtConsumerResetConfiguration.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.annotation;
+
+import org.springframework.stereotype.Component;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+import java.lang.annotation.Documented;
+
+@Target(ElementType.TYPE)
+@Retention(RetentionPolicy.RUNTIME)
+@Documented
+@Component
+public @interface ExtConsumerResetConfiguration {
+
+ String ACCESS_KEY_PLACEHOLDER = "${rocketmq.simple-consumer.accessKey:}";
+ String SECRET_KEY_PLACEHOLDER = "${rocketmq.simple-consumer.secretKey:}";
+ String TAG_PLACEHOLDER = "${rocketmq.simple-consumer.tag:}";
+ String TOPIC_PLACEHOLDER = "${rocketmq.simple-consumer.topic:}";
+ String ENDPOINTS_PLACEHOLDER = "${rocketmq.simple-consumer.endpoints:}";
+ String CONSUMER_GROUP_PLACEHOLDER = "${rocketmq.simple-consumer.consumerGroup:}";
+ String FILTER_EXPRESSION_TYPE_PLACEHOLDER = "${rocketmq.simple-consumer.filterExpressionType:}";
+
+ /**
+ * The component name of the Consumer configuration.
+ */
+ String value() default "";
+
+ /**
+ * The property of "access-key".
+ */
+ String accessKey() default ACCESS_KEY_PLACEHOLDER;
+
+ /**
+ * The property of "secret-key".
+ */
+ String secretKey() default SECRET_KEY_PLACEHOLDER;
+
+ /**
+ * Tag of consumer.
+ */
+ String tag() default TAG_PLACEHOLDER;
+
+ /**
+ * Topic name of consumer.
+ */
+ String topic() default TOPIC_PLACEHOLDER;
+
+ /**
+ * The access point that the SDK should communicate with.
+ */
+ String endpoints() default ENDPOINTS_PLACEHOLDER;
+
+ /**
+ * The load balancing group for the simple consumer.
+ */
+ String consumerGroup() default CONSUMER_GROUP_PLACEHOLDER;
+
+ /**
+ * The type of filter expression
+ */
+ String filterExpressionType() default FILTER_EXPRESSION_TYPE_PLACEHOLDER;
+
+ /**
+ * The requestTimeout of client,it is 3s by default.
+ */
+ int requestTimeout() default 3;
+
+ /**
+ * The max await time when receive messages from the server.
+ */
+ int awaitDuration() default 0;
+
+}
diff --git a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/annotation/ExtProducerResetConfiguration.java b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/annotation/ExtProducerResetConfiguration.java
new file mode 100644
index 00000000..8849d0fc
--- /dev/null
+++ b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/annotation/ExtProducerResetConfiguration.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.annotation;
+
+import org.springframework.stereotype.Component;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+import java.lang.annotation.Documented;
+
+@Target(ElementType.TYPE)
+@Retention(RetentionPolicy.RUNTIME)
+@Documented
+@Component
+public @interface ExtProducerResetConfiguration {
+
+ String ACCESS_KEY_PLACEHOLDER = "${rocketmq.producer.accessKey:}";
+ String SECRET_KEY_PLACEHOLDER = "${rocketmq.producer.secretKey:}";
+ String TOPIC_PLACEHOLDER = "${rocketmq.producer.topic:}";
+ String ENDPOINTS_PLACEHOLDER = "${rocketmq.producer.endpoints:}";
+
+ /**
+ * The component name of the Producer configuration.
+ */
+ String value() default "";
+
+ /**
+ * The property of "access-key".
+ */
+ String accessKey() default ACCESS_KEY_PLACEHOLDER;
+
+ /**
+ * The property of "secret-key".
+ */
+ String secretKey() default SECRET_KEY_PLACEHOLDER;
+
+ /**
+ * The access point that the SDK should communicate with.
+ */
+ String endpoints() default ENDPOINTS_PLACEHOLDER;
+
+ /**
+ * Topic name of consumer.
+ */
+ String topic() default TOPIC_PLACEHOLDER;
+
+ /**
+ * Request timeout is 3s by default.
+ */
+ int requestTimeout() default 3;
+
+ /**
+ * Enable or disable the use of Secure Sockets Layer (SSL) for network transport.
+ */
+ boolean sslEnabled() default true;
+
+ /**
+ * Max attempts for max internal retries of message publishing.
+ */
+ int maxAttempts() default 3;
+
+}
diff --git a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/annotation/RocketMQMessageListener.java b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/annotation/RocketMQMessageListener.java
new file mode 100644
index 00000000..89107aa1
--- /dev/null
+++ b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/annotation/RocketMQMessageListener.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.annotation;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+import java.lang.annotation.Documented;
+
+@Target(ElementType.TYPE)
+@Retention(RetentionPolicy.RUNTIME)
+@Documented
+public @interface RocketMQMessageListener {
+
+ String ACCESS_KEY_PLACEHOLDER = "${rocketmq.push-consumer.access-key:}";
+ String SECRET_KEY_PLACEHOLDER = "${rocketmq.push-consumer.secret-key:}";
+ String ENDPOINTS_PLACEHOLDER = "${rocketmq.push-consumer.endpoints:}";
+ String TOPIC_PLACEHOLDER = "${rocketmq.push-consumer.endpoints:}";
+ String TAG_PLACEHOLDER = "${rocketmq.push-consumer.tag:}";
+
+ /**
+ * The property of "access-key".
+ */
+ String accessKey() default ACCESS_KEY_PLACEHOLDER;
+
+ /**
+ * The property of "secret-key".
+ */
+ String secretKey() default SECRET_KEY_PLACEHOLDER;
+
+ /**
+ * The access point that the SDK should communicate with.
+ */
+ String endpoints() default ENDPOINTS_PLACEHOLDER;
+
+ /**
+ * Topic name of consumer.
+ */
+ String topic() default TOPIC_PLACEHOLDER;
+
+ /**
+ * Tag of consumer.
+ */
+ String tag() default TAG_PLACEHOLDER;
+
+ /**
+ * The type of filter expression
+ */
+ String filterExpressionType() default "tag";
+
+ /**
+ * The load balancing group for the simple consumer.
+ */
+ String consumerGroup() default "";
+
+ /**
+ * The requestTimeout of client,it is 3s by default.
+ */
+ int requestTimeout() default 3;
+
+
+ int maxCachedMessageCount() default 1024;
+
+
+ int maxCacheMessageSizeInBytes() default 67108864;
+
+
+ int consumptionThreadCount() default 20;
+
+
+}
diff --git a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/annotation/RocketMQMessageListenerBeanPostProcessor.java b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/annotation/RocketMQMessageListenerBeanPostProcessor.java
new file mode 100644
index 00000000..61f3e1d2
--- /dev/null
+++ b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/annotation/RocketMQMessageListenerBeanPostProcessor.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.annotation;
+
+import org.apache.rocketmq.client.autoconfigure.ListenerContainerConfiguration;
+import org.springframework.aop.support.AopUtils;
+import org.springframework.beans.BeansException;
+import org.springframework.beans.factory.InitializingBean;
+import org.springframework.beans.factory.config.BeanPostProcessor;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.ApplicationContextAware;
+import org.springframework.core.OrderComparator;
+import org.springframework.core.annotation.AnnotationUtils;
+
+import java.lang.reflect.AnnotatedElement;
+import java.util.List;
+import java.util.Map;
+import java.util.function.BiFunction;
+import java.util.stream.Collectors;
+
+public class RocketMQMessageListenerBeanPostProcessor implements ApplicationContextAware, BeanPostProcessor, InitializingBean {
+
+ private ApplicationContext applicationContext;
+
+ private AnnotationEnhancer enhancer;
+
+ private ListenerContainerConfiguration listenerContainerConfiguration;
+
+ @Override
+ public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
+ return bean;
+ }
+
+ @Override
+ public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
+ Class> targetClass = AopUtils.getTargetClass(bean);
+ RocketMQMessageListener ann = targetClass.getAnnotation(RocketMQMessageListener.class);
+ if (ann != null) {
+ RocketMQMessageListener enhance = enhance(targetClass, ann);
+ if (listenerContainerConfiguration != null) {
+ listenerContainerConfiguration.registerContainer(beanName, bean, enhance);
+ }
+ }
+ return bean;
+ }
+
+ @Override
+ public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
+ this.applicationContext = applicationContext;
+ }
+
+ @Override
+ public void afterPropertiesSet() throws Exception {
+ buildEnhancer();
+ this.listenerContainerConfiguration = this.applicationContext.getBean(ListenerContainerConfiguration.class);
+ }
+
+ private void buildEnhancer() {
+ if (this.applicationContext != null) {
+ Map enhancersMap =
+ this.applicationContext.getBeansOfType(AnnotationEnhancer.class, false, false);
+ if (enhancersMap.size() > 0) {
+ List enhancers = enhancersMap.values()
+ .stream()
+ .sorted(new OrderComparator())
+ .collect(Collectors.toList());
+ this.enhancer = (attrs, element) -> {
+ Map newAttrs = attrs;
+ for (AnnotationEnhancer enh : enhancers) {
+ newAttrs = enh.apply(newAttrs, element);
+ }
+ return attrs;
+ };
+ }
+ }
+ }
+
+ private RocketMQMessageListener enhance(AnnotatedElement element, RocketMQMessageListener ann) {
+ if (this.enhancer == null) {
+ return ann;
+ } else {
+ return AnnotationUtils.synthesizeAnnotation(
+ this.enhancer.apply(AnnotationUtils.getAnnotationAttributes(ann), element), RocketMQMessageListener.class, null);
+ }
+ }
+
+ public interface AnnotationEnhancer extends BiFunction