Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #147]An enhancement about the convert in RocketMQTemplate #152

Merged
merged 6 commits into from
Nov 11, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion rocketmq-spring-boot-samples/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
</modules>

<properties>
<rocketmq-spring-boot-starter-version>2.0.3</rocketmq-spring-boot-starter-version>
<rocketmq-spring-boot-starter-version>2.0.4-SNAPSHOT</rocketmq-spring-boot-starter-version>
</properties>

<dependencies>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
* MessageExtConsumer, consume listener impl class.
*/
@Service
@RocketMQMessageListener(topic = "message-ext-topic", selectorExpression = "tag1", consumerGroup = "${spring.application.name}-message-ext-consumer")
@RocketMQMessageListener(topic = "${demo.rocketmq.msgExtTopic}", selectorExpression = "tag0||tag1", consumerGroup = "${spring.application.name}-message-ext-consumer")
public class MessageExtConsumer implements RocketMQListener<MessageExt>, RocketMQPushConsumerLifecycleListener {
@Override
public void onMessage(MessageExt message) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,6 @@ public class OrderPaidEventConsumer implements RocketMQListener<OrderPaidEvent>

@Override
public void onMessage(OrderPaidEvent orderPaidEvent) {
System.out.printf("------- OrderPaidEventConsumer received: %s \n", orderPaidEvent);
System.out.printf("------- OrderPaidEventConsumer received: %s [orderId : %s]\n", orderPaidEvent,orderPaidEvent.getOrderId());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.rocketmq.samples.springboot.consumer;

import org.apache.rocketmq.samples.springboot.domain.User;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;

/**
* RocketMQMessageListener
*/
@Service
@RocketMQMessageListener(nameServer = "${demo.rocketmq.myNameServer}", topic = "${demo.rocketmq.topic.user}", consumerGroup = "user_consumer")
public class UserConsumer implements RocketMQListener<User> {
@Override
public void onMessage(User message) {

System.out.printf("######## user_consumer received: %s ; age: %s ; name: %s \n", message, message.getUserAge(), message.getUserName());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.rocketmq.samples.springboot.domain;

public class User {
private String userName;
private Byte userAge;

public String getUserName() {
return userName;
}

public User setUserName(String userName) {
this.userName = userName;
return this;
}

public Byte getUserAge() {
return userAge;
}

public User setUserAge(Byte userAge) {
this.userAge = userAge;
return this;
}

@Override
public String toString() {
return "User{" +
"userName='" + userName + '\'' +
", userAge=" + userAge +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ demo.rocketmq.topic=string-topic
demo.rocketmq.orderTopic=order-paid-topic
demo.rocketmq.msgExtTopic=message-ext-topic
demo.rocketmq.transTopic=spring-transaction-topic
demo.rocketmq.topic.user=user-topic

# another nameserver different global
demo.rocketmq.myNameServer=127.0.0.1:9876
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.samples.springboot.domain.OrderPaidEvent;
import org.apache.rocketmq.samples.springboot.domain.User;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
Expand All @@ -30,8 +31,10 @@
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.MimeTypeUtils;

import javax.annotation.Resource;
import java.math.BigDecimal;
Expand All @@ -52,6 +55,9 @@ public class ProducerApplication implements CommandLineRunner {
private String springTransTopic;
@Value("${demo.rocketmq.topic}")
private String springTopic;
@Value("${demo.rocketmq.topic.user}")
private String userTopic;

@Value("${demo.rocketmq.orderTopic}")
private String orderPaidTopic;
@Value("${demo.rocketmq.msgExtTopic}")
Expand All @@ -69,8 +75,15 @@ public void run(String... args) throws Exception {
SendResult sendResult = rocketMQTemplate.syncSend(springTopic, "Hello, World!");
System.out.printf("syncSend1 to topic %s sendResult=%s %n", springTopic, sendResult);

sendResult = rocketMQTemplate.syncSend(userTopic, new User().setUserAge((byte)18).setUserName("Kitty"));
System.out.printf("syncSend1 to topic %s sendResult=%s %n", userTopic, sendResult);

sendResult = rocketMQTemplate.syncSend(userTopic, MessageBuilder.withPayload(
new User().setUserAge((byte)21).setUserName("Lester")).setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON_VALUE).build());
System.out.printf("syncSend1 to topic %s sendResult=%s %n", userTopic, sendResult);

// Use the extRocketMQTemplate
sendResult = extRocketMQTemplate.syncSend(springTopic, "Hello, World!");
sendResult = extRocketMQTemplate.syncSend(springTopic, MessageBuilder.withPayload("Hello, World!2222".getBytes()).build());
System.out.printf("extRocketMQTemplate.syncSend1 to topic %s sendResult=%s %n", springTopic, sendResult);

// Send string with spring Message
Expand All @@ -79,10 +92,12 @@ public void run(String... args) throws Exception {

// Send user-defined object
rocketMQTemplate.asyncSend(orderPaidTopic, new OrderPaidEvent("T_001", new BigDecimal("88.00")), new SendCallback() {
@Override
public void onSuccess(SendResult var1) {
System.out.printf("async onSucess SendResult=%s %n", var1);
}

@Override
public void onException(Throwable var1) {
System.out.printf("async onException Throwable=%s %n", var1);
}
Expand All @@ -95,7 +110,6 @@ public void onException(Throwable var1) {
rocketMQTemplate.convertAndSend(msgExtTopic + ":tag1", "I'm from tag1");
System.out.printf("syncSend topic %s tag %s %n", msgExtTopic, "tag1");


// Send a batch of strings
testBatchMessages();

Expand All @@ -107,17 +121,16 @@ private void testBatchMessages() {
List<Message> msgs = new ArrayList<Message>();
for (int i = 0; i < 10; i++) {
msgs.add(MessageBuilder.withPayload("Hello RocketMQ Batch Msg#" + i).
setHeader(RocketMQHeaders.KEYS, "KEY_" + i).build());
setHeader(RocketMQHeaders.KEYS, "KEY_" + i).build());
}

SendResult sr = rocketMQTemplate.syncSend(springTopic, msgs, 60000);

System.out.printf("--- Batch messages send result :" + sr);
}


private void testTransaction() throws MessagingException {
String[] tags = new String[]{"TagA", "TagB", "TagC", "TagD", "TagE"};
String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
for (int i = 0; i < 10; i++) {
try {

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.rocketmq.samples.springboot.domain;

public class User {
private String userName;
private Byte userAge;

public String getUserName() {
return userName;
}

public User setUserName(String userName) {
this.userName = userName;
return this;
}

public Byte getUserAge() {
return userAge;
}

public User setUserAge(Byte userAge) {
this.userAge = userAge;
return this;
}

@Override
public String toString() {
return "User{" +
"userName='" + userName + '\'' +
", userAge=" + userAge +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,6 @@ demo.rocketmq.topic=string-topic
demo.rocketmq.orderTopic=order-paid-topic
demo.rocketmq.msgExtTopic=message-ext-topic
demo.rocketmq.transTopic=spring-transaction-topic
demo.rocketmq.topic.user=user-topic

demo.rocketmq.extNameServer=127.0.0.1:9876
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,16 @@

package org.apache.rocketmq.spring.autoconfigure;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.Map;
import java.util.Objects;

import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.spring.annotation.ExtRocketMQTemplateConfiguration;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.RocketMQMessageConverter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.aop.framework.AopProxyUtils;
Expand All @@ -38,9 +41,7 @@
import org.springframework.core.env.StandardEnvironment;
import org.springframework.util.StringUtils;

import java.util.Map;
import java.util.Objects;

import com.fasterxml.jackson.databind.ObjectMapper;

@Configuration
public class ExtProducerResetConfiguration implements ApplicationContextAware, SmartInitializingSingleton {
Expand All @@ -53,18 +54,20 @@ public class ExtProducerResetConfiguration implements ApplicationContextAware, S
private RocketMQProperties rocketMQProperties;

private ObjectMapper objectMapper;
private RocketMQMessageConverter rocketMQMessageConverter;

public ExtProducerResetConfiguration(ObjectMapper rocketMQMessageObjectMapper,
StandardEnvironment environment,
RocketMQProperties rocketMQProperties) {
RocketMQMessageConverter rocketMQMessageConverter,
StandardEnvironment environment, RocketMQProperties rocketMQProperties) {
this.rocketMQMessageConverter = rocketMQMessageConverter;
this.objectMapper = rocketMQMessageObjectMapper;
this.environment = environment;
this.rocketMQProperties = rocketMQProperties;
}

@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = (ConfigurableApplicationContext) applicationContext;
this.applicationContext = (ConfigurableApplicationContext)applicationContext;
}

@Override
Expand All @@ -84,7 +87,7 @@ private void registerTemplate(String beanName, Object bean) {
}

ExtRocketMQTemplateConfiguration annotation = clazz.getAnnotation(ExtRocketMQTemplateConfiguration.class);
GenericApplicationContext genericApplicationContext = (GenericApplicationContext) applicationContext;
GenericApplicationContext genericApplicationContext = (GenericApplicationContext)applicationContext;
validate(annotation, genericApplicationContext);

DefaultMQProducer mqProducer = createProducer(annotation);
Expand All @@ -94,13 +97,12 @@ private void registerTemplate(String beanName, Object bean) {
mqProducer.start();
} catch (MQClientException e) {
throw new BeanDefinitionValidationException(String.format("Failed to startup MQProducer for RocketMQTemplate {}",
beanName), e);
beanName), e);
}
RocketMQTemplate rocketMQTemplate = (RocketMQTemplate) bean;
RocketMQTemplate rocketMQTemplate = (RocketMQTemplate)bean;
rocketMQTemplate.setProducer(mqProducer);
rocketMQTemplate.setMessageConverter(rocketMQMessageConverter.getMessageConverter());
rocketMQTemplate.setObjectMapper(objectMapper);


log.info("Set real producer to :{} {}", beanName, annotation.value());
}

Expand All @@ -124,7 +126,7 @@ private DefaultMQProducer createProducer(ExtRocketMQTemplateConfiguration annota

if (!StringUtils.isEmpty(ak) && !StringUtils.isEmpty(sk)) {
producer = new DefaultMQProducer(groupName, new AclClientRPCHook(new SessionCredentials(ak, sk)),
annotation.enableMsgTrace(), customizedTraceTopic);
annotation.enableMsgTrace(), customizedTraceTopic);
producer.setVipChannelEnabled(false);
} else {
producer = new DefaultMQProducer(groupName, annotation.enableMsgTrace(), customizedTraceTopic);
Expand All @@ -141,18 +143,19 @@ private DefaultMQProducer createProducer(ExtRocketMQTemplateConfiguration annota
return producer;
}

private void validate(ExtRocketMQTemplateConfiguration annotation, GenericApplicationContext genericApplicationContext) {
private void validate(ExtRocketMQTemplateConfiguration annotation,
GenericApplicationContext genericApplicationContext) {
if (genericApplicationContext.isBeanNameInUse(annotation.value())) {
throw new BeanDefinitionValidationException(String.format("Bean {} has been used in Spring Application Context, " +
"please check the @ExtRocketMQTemplateConfiguration",
annotation.value()));
"please check the @ExtRocketMQTemplateConfiguration",
annotation.value()));
}

if (rocketMQProperties.getNameServer() == null ||
rocketMQProperties.getNameServer().equals(environment.resolvePlaceholders(annotation.nameServer()))) {
rocketMQProperties.getNameServer().equals(environment.resolvePlaceholders(annotation.nameServer()))) {
throw new BeanDefinitionValidationException(
"Bad annotation definition in @ExtRocketMQTemplateConfiguration, nameServer property is same with " +
"global property, please use the default RocketMQTemplate!");
"Bad annotation definition in @ExtRocketMQTemplateConfiguration, nameServer property is same with " +
"global property, please use the default RocketMQTemplate!");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

@Configuration
@ConditionalOnMissingBean(ObjectMapper.class)
@Deprecated
class JacksonFallbackConfiguration {

@Bean
Expand Down
Loading