Skip to content

Commit

Permalink
[ISSUE #306] Support real LitePullMessage in RocketMQ-Spring (#307)
Browse files Browse the repository at this point in the history
* Support LitePullMessage in RocketMQ-Spring

* Add a property in litePullConnsumer named pullBatchSize.

* Add necessary comments.
  • Loading branch information
heihaozi authored Dec 4, 2020
1 parent d20b01c commit af9f20e
Show file tree
Hide file tree
Showing 14 changed files with 718 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,32 @@

package org.apache.rocketmq.samples.springboot;

import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

import javax.annotation.Resource;
import java.util.List;

/**
* ConsumerApplication
*/
@SpringBootApplication
public class ConsumerACLApplication {
public class ConsumerACLApplication implements CommandLineRunner {

@Resource
private RocketMQTemplate rocketMQTemplate;

public static void main(String[] args) {
SpringApplication.run(ConsumerACLApplication.class, args);
}

@Override
public void run(String... args) throws Exception {
////This is an example of pull consumer with access-key and secret-key.
List<String> messages = rocketMQTemplate.receive(String.class);
System.out.printf("receive from rocketMQTemplate, messages=%s %n", messages);
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
spring.application.name=rocketmq-consume-acl-demo

rocketmq.name-server=Endpoint_of_Aliware_MQ
rocketmq.consumer.group=my-group1
rocketmq.consumer.topic=test
rocketmq.topic=normal_topic_define_in_Aliware_MQ

# properties used in application code
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,39 @@

package org.apache.rocketmq.samples.springboot;

import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

import javax.annotation.Resource;
import java.util.List;

/**
* ConsumerApplication
*/
@SpringBootApplication
public class ConsumerApplication {
public class ConsumerApplication implements CommandLineRunner {

@Resource
private RocketMQTemplate rocketMQTemplate;

@Resource(name = "extRocketMQTemplate")
private RocketMQTemplate extRocketMQTemplate;

public static void main(String[] args) {
SpringApplication.run(ConsumerApplication.class, args);
}

@Override
public void run(String... args) throws Exception {
//This is an example of pull consumer using rocketMQTemplate.
List<String> messages = rocketMQTemplate.receive(String.class);
System.out.printf("receive from rocketMQTemplate, messages=%s %n", messages);

//This is an example of pull consumer using extRocketMQTemplate.
messages = extRocketMQTemplate.receive(String.class);
System.out.printf("receive from extRocketMQTemplate, messages=%s %n", messages);
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.samples.springboot;

import org.apache.rocketmq.spring.annotation.ExtRocketMQConsumerConfiguration;
import org.apache.rocketmq.spring.core.RocketMQTemplate;

@ExtRocketMQConsumerConfiguration(topic = "${demo.rocketmq.topic}", group = "string_consumer")
public class ExtRocketMQTemplate extends RocketMQTemplate {
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
spring.application.name=rocketmq-consume-demo

rocketmq.name-server=localhost:9876
rocketmq.consumer.group=my-group1
rocketmq.consumer.topic=test
# properties used in application code
demo.rocketmq.topic=string-topic
demo.rocketmq.bytesRequestTopic=bytesRequestTopic
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.rocketmq.spring.annotation;

import org.springframework.stereotype.Component;

import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Component
public @interface ExtRocketMQConsumerConfiguration {

String NAME_SERVER_PLACEHOLDER = "${rocketmq.name-server:}";
String GROUP_PLACEHOLDER = "${rocketmq.consumer.group:}";
String TOPIC_PLACEHOLDER = "${rocketmq.consumer.topic:}";
String ACCESS_CHANNEL_PLACEHOLDER = "${rocketmq.access-channel:}";
String ACCESS_KEY_PLACEHOLDER = "${rocketmq.consumer.access-key:}";
String SECRET_KEY_PLACEHOLDER = "${rocketmq.consumer.secret-key:}";

/**
* The component name of the Producer configuration.
*/
String value() default "";

/**
* The property of "name-server".
*/
String nameServer() default NAME_SERVER_PLACEHOLDER;

/**
* The property of "access-channel".
*/
String accessChannel() default ACCESS_CHANNEL_PLACEHOLDER;

/**
* Group name of consumer.
*/
String group() default GROUP_PLACEHOLDER;

/**
* Topic name of consumer.
*/
String topic() default TOPIC_PLACEHOLDER;

/**
* Control message mode, if you want all subscribers receive message all message, broadcasting is a good choice.
*/
MessageModel messageModel() default MessageModel.CLUSTERING;

/**
* Control how to selector message.
*
* @see SelectorType
*/
SelectorType selectorType() default SelectorType.TAG;

/**
* Control which message can be select. Grammar please see {@link SelectorType#TAG} and {@link SelectorType#SQL92}
*/
String selectorExpression() default "*";

/**
* The property of "access-key".
*/
String accessKey() default ACCESS_KEY_PLACEHOLDER;

/**
* The property of "secret-key".
*/
String secretKey() default SECRET_KEY_PLACEHOLDER;

/**
* Maximum number of messages pulled each time.
*/
int pullBatchSize() default 10;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.rocketmq.spring.autoconfigure;

import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.spring.annotation.ExtRocketMQConsumerConfiguration;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.SelectorType;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.RocketMQMessageConverter;
import org.apache.rocketmq.spring.support.RocketMQUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.aop.framework.AopProxyUtils;
import org.springframework.aop.scope.ScopedProxyUtils;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.SmartInitializingSingleton;
import org.springframework.beans.factory.support.BeanDefinitionValidationException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.support.GenericApplicationContext;
import org.springframework.core.env.StandardEnvironment;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;

import java.util.Map;
import java.util.stream.Collectors;

@Configuration
public class ExtConsumerResetConfiguration implements ApplicationContextAware, SmartInitializingSingleton {

private final static Logger log = LoggerFactory.getLogger(ExtConsumerResetConfiguration.class);

private ConfigurableApplicationContext applicationContext;

private StandardEnvironment environment;

private RocketMQProperties rocketMQProperties;

private RocketMQMessageConverter rocketMQMessageConverter;

public ExtConsumerResetConfiguration(RocketMQMessageConverter rocketMQMessageConverter,
StandardEnvironment environment, RocketMQProperties rocketMQProperties) {
this.rocketMQMessageConverter = rocketMQMessageConverter;
this.environment = environment;
this.rocketMQProperties = rocketMQProperties;
}

@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = (ConfigurableApplicationContext) applicationContext;
}

@Override
public void afterSingletonsInstantiated() {
Map<String, Object> beans = this.applicationContext
.getBeansWithAnnotation(ExtRocketMQConsumerConfiguration.class)
.entrySet().stream().filter(entry -> !ScopedProxyUtils.isScopedTarget(entry.getKey()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

beans.forEach(this::registerTemplate);
}

private void registerTemplate(String beanName, Object bean) {
Class<?> clazz = AopProxyUtils.ultimateTargetClass(bean);

if (!RocketMQTemplate.class.isAssignableFrom(bean.getClass())) {
throw new IllegalStateException(clazz + " is not instance of " + RocketMQTemplate.class.getName());
}

ExtRocketMQConsumerConfiguration annotation = clazz.getAnnotation(ExtRocketMQConsumerConfiguration.class);
GenericApplicationContext genericApplicationContext = (GenericApplicationContext) applicationContext;
validate(annotation, genericApplicationContext);

DefaultLitePullConsumer consumer = null;
try {
consumer = createConsumer(annotation);
// Set instanceName same as the beanName
consumer.setInstanceName(beanName);
consumer.start();
} catch (Exception e) {
log.error("Failed to startup PullConsumer for RocketMQTemplate {}", beanName, e);
}
RocketMQTemplate rocketMQTemplate = (RocketMQTemplate) bean;
rocketMQTemplate.setConsumer(consumer);
rocketMQTemplate.setMessageConverter(rocketMQMessageConverter.getMessageConverter());
log.info("Set real consumer to :{} {}", beanName, annotation.value());
}

private DefaultLitePullConsumer createConsumer(ExtRocketMQConsumerConfiguration annotation)
throws MQClientException {

RocketMQProperties.Consumer consumerConfig = rocketMQProperties.getConsumer();
if (consumerConfig == null) {
consumerConfig = new RocketMQProperties.Consumer();
}
String nameServer = resolvePlaceholders(annotation.nameServer(), rocketMQProperties.getNameServer());
String groupName = resolvePlaceholders(annotation.group(), consumerConfig.getGroup());
String topicName = resolvePlaceholders(annotation.topic(), consumerConfig.getTopic());
Assert.hasText(nameServer, "[nameServer] must not be null");
Assert.hasText(groupName, "[group] must not be null");
Assert.hasText(topicName, "[topic] must not be null");

String accessChannel = resolvePlaceholders(annotation.accessChannel(), rocketMQProperties.getAccessChannel());
MessageModel messageModel = annotation.messageModel();
SelectorType selectorType = annotation.selectorType();
String selectorExpression = annotation.selectorExpression();
String ak = resolvePlaceholders(annotation.accessKey(), consumerConfig.getAccessKey());
String sk = resolvePlaceholders(annotation.secretKey(), consumerConfig.getSecretKey());
int pullBatchSize = annotation.pullBatchSize();

DefaultLitePullConsumer litePullConsumer = RocketMQUtil.createDefaultLitePullConsumer(nameServer, accessChannel,
groupName, topicName, messageModel, selectorType, selectorExpression, ak, sk, pullBatchSize);
return litePullConsumer;
}

private String resolvePlaceholders(String text, String defaultValue) {
String value = environment.resolvePlaceholders(text);
return StringUtils.isEmpty(value) ? defaultValue : value;
}

private void validate(ExtRocketMQConsumerConfiguration annotation,
GenericApplicationContext genericApplicationContext) {
if (genericApplicationContext.isBeanNameInUse(annotation.value())) {
throw new BeanDefinitionValidationException(
String.format("Bean {} has been used in Spring Application Context, " +
"please check the @ExtRocketMQConsumerConfiguration",
annotation.value()));
}
}
}
Loading

0 comments on commit af9f20e

Please sign in to comment.