Skip to content

Commit

Permalink
Merge pull request apache#77 from walking98/master
Browse files Browse the repository at this point in the history
[Issue apache#78] Add new accessChannel property for both producer/consumer @SInCE RMQ 4.5.1
  • Loading branch information
duhenglucky authored May 30, 2019
2 parents eca080d + 2afd4f1 commit e0c5499
Show file tree
Hide file tree
Showing 6 changed files with 57 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
String ACCESS_KEY_PLACEHOLDER = "${rocketmq.consumer.access-key:}";
String SECRET_KEY_PLACEHOLDER = "${rocketmq.consumer.secret-key:}";
String TRACE_TOPIC_PLACEHOLDER = "${rocketmq.consumer.customized-trace-topic:}";
String ACCESS_CHANNEL_PLACEHOLDER = "${rocketmq.access-channel:}";

/**
* Consumers of the same role is required to have exactly same subscriptions and consumerGroup to correctly achieve
Expand Down Expand Up @@ -103,4 +104,9 @@
* The property of "name-server".
*/
String nameServer() default NAME_SERVER_PLACEHOLDER;

/**
* The property of "access-channel".
*/
String accessChannel() default ACCESS_CHANNEL_PLACEHOLDER;
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.rocketmq.spring.autoconfigure;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
Expand Down Expand Up @@ -113,7 +114,11 @@ private DefaultRocketMQListenerContainer createRocketMQListenerContainer(String

String nameServer = environment.resolvePlaceholders(annotation.nameServer());
nameServer = StringUtils.isEmpty(nameServer) ? rocketMQProperties.getNameServer() : nameServer;
String accessChannel = environment.resolvePlaceholders(annotation.accessChannel());
container.setNameServer(nameServer);
if (!StringUtils.isEmpty(accessChannel)) {
container.setAccessChannel(AccessChannel.valueOf(accessChannel));
}
container.setTopic(environment.resolvePlaceholders(annotation.topic()));
container.setConsumerGroup(environment.resolvePlaceholders(annotation.consumerGroup()));
container.setRocketMQMessageListener(annotation);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.MQAdmin;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.spring.config.RocketMQConfigUtils;
Expand Down Expand Up @@ -80,6 +81,8 @@ public DefaultMQProducer defaultMQProducer(RocketMQProperties rocketMQProperties
Assert.hasText(nameServer, "[rocketmq.name-server] must not be null");
Assert.hasText(groupName, "[rocketmq.producer.group] must not be null");

String accessChannel = rocketMQProperties.getAccessChannel();

DefaultMQProducer producer;
String ak = rocketMQProperties.getProducer().getAccessKey();
String sk = rocketMQProperties.getProducer().getSecretKey();
Expand All @@ -94,6 +97,9 @@ public DefaultMQProducer defaultMQProducer(RocketMQProperties rocketMQProperties
}

producer.setNamesrvAddr(nameServer);
if (!StringUtils.isEmpty(accessChannel)) {
producer.setAccessChannel(AccessChannel.valueOf(accessChannel));
}
producer.setSendMsgTimeout(producerConfig.getSendMessageTimeout());
producer.setRetryTimesWhenSendFailed(producerConfig.getRetryTimesWhenSendFailed());
producer.setRetryTimesWhenSendAsyncFailed(producerConfig.getRetryTimesWhenSendAsyncFailed());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@ public class RocketMQProperties {
*/
private String nameServer;

/**
* Enum type for accesChannel, values: LOCAL, CLOUD
*/
private String accessChannel;

private Producer producer;

public String getNameServer() {
Expand All @@ -39,6 +44,14 @@ public void setNameServer(String nameServer) {
this.nameServer = nameServer;
}

public String getAccessChannel() {
return accessChannel;
}

public void setAccessChannel(String accessChannel) {
this.accessChannel = accessChannel;
}

public RocketMQProperties.Producer getProducer() {
return producer;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.rocketmq.spring.support;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
Expand Down Expand Up @@ -74,6 +75,8 @@ public class DefaultRocketMQListenerContainer implements InitializingBean,

private String nameServer;

private AccessChannel accessChannel = AccessChannel.LOCAL;

private String consumerGroup;

private String topic;
Expand Down Expand Up @@ -125,6 +128,14 @@ public void setNameServer(String nameServer) {
this.nameServer = nameServer;
}

public AccessChannel getAccessChannel() {
return accessChannel;
}

public void setAccessChannel(AccessChannel accessChannel) {
this.accessChannel = accessChannel;
}

public String getConsumerGroup() {
return consumerGroup;
}
Expand Down Expand Up @@ -431,6 +442,9 @@ private void initRocketMQPushConsumer() throws MQClientException {
} else {
consumer.setNamesrvAddr(nameServer);
}
if (accessChannel != null) {
consumer.setAccessChannel(accessChannel);
}
consumer.setConsumeThreadMax(consumeThreadMax);
if (consumeThreadMax < consumer.getConsumeThreadMin()) {
consumer.setConsumeThreadMin(consumeThreadMax);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,14 +52,26 @@ public void testDefaultMQProducerNotCreatedByDefault() {
@Test
public void testDefaultMQProducerWithRelaxPropertyName() {
runner.withPropertyValues("rocketmq.nameServer=127.0.0.1:9876",
"rocketmq.producer.group=spring_rocketmq").
"rocketmq.producer.group=spring_rocketmq",
"rocketmq.accessChannel=LOCAL").
run((context) -> {
assertThat(context).hasSingleBean(DefaultMQProducer.class);
assertThat(context).hasSingleBean(RocketMQProperties.class);
});

}

@Test
public void testBadAccessChannelProperty() {
runner.withPropertyValues("rocketmq.nameServer=127.0.0.1:9876",
"rocketmq.producer.group=spring_rocketmq",
"rocketmq.accessChannel=LOCAL123").
run((context) -> {
//Should throw exception for bad accessChannel property
assertThat(context).getFailure();
});
}

@Test
public void testDefaultMQProducer() {
runner.withPropertyValues("rocketmq.name-server=127.0.0.1:9876",
Expand Down

0 comments on commit e0c5499

Please sign in to comment.