Skip to content

Commit

Permalink
feat($RabbitMQ): support RabbitMQ delayed messaging
Browse files Browse the repository at this point in the history
  • Loading branch information
johnnymillergh committed Oct 2, 2021
1 parent 8a35ceb commit 51b5f2b
Show file tree
Hide file tree
Showing 4 changed files with 157 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package com.jmsoftware.maf.authcenter.configuration;

import com.google.common.collect.Maps;
import com.jmsoftware.maf.springcloudstarter.property.MafProjectProperties;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import lombok.val;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import java.time.LocalDateTime;
import java.util.concurrent.TimeUnit;

import static cn.hutool.core.text.CharSequenceUtil.format;
import static com.jmsoftware.maf.springcloudstarter.rabbitmq.DelayedMessageConfiguration.*;

/**
* <h1>DelayedMessageSender</h1>
* <p>
* Change description here.
*
* @author Johnny Miller (鍾俊), email: johnnysviva@outlook.com, 10/2/21 1:27 PM
**/
@Slf4j
@Component
@RequiredArgsConstructor
public class DelayedMessageSender {
private final RabbitTemplate rabbitTemplate;
private final MafProjectProperties mafProjectProperties;

@Scheduled(fixedDelay = 2L, timeUnit = TimeUnit.MINUTES)
public void sendMessage() {
val message = format("{} - {}", this.mafProjectProperties.getProjectArtifactId(), LocalDateTime.now());
val messageMap = Maps.<String, String>newHashMap();
messageMap.put("message", message);
this.rabbitTemplate.convertAndSend(
DELAYED_MESSAGE_EXCHANGE_NAME,
DELAYED_MESSAGE_ROUTING_KEY,
messageMap,
messagePostProcessor -> {
// Make it be consumed after 30 seconds
messagePostProcessor.getMessageProperties().setDelay(30 * 1000);
return messagePostProcessor;
}
);
log.info("Sent a delayed message into queue: {}, messageMap: {}", DELAYED_MESSAGE_QUEUE_NAME, messageMap);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package com.jmsoftware.maf.mafmis.configuration;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import static com.jmsoftware.maf.springcloudstarter.rabbitmq.DelayedMessageConfiguration.DELAYED_MESSAGE_QUEUE_NAME;

/**
* <h1>DelayedMessageListener</h1>
* <p>
* Change description here.
*
* @author Johnny Miller (鍾俊), email: johnnysviva@outlook.com, 10/2/21 1:38 PM
**/
@Slf4j
@Component
@RequiredArgsConstructor
public class DelayedMessageListener {
@RabbitListener(queues = DELAYED_MESSAGE_QUEUE_NAME)
public void receiveMessage(final Message message) {
log.info("Received message as a generic AMQP 'Message' wrapper: {}", message);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package com.jmsoftware.maf.springcloudstarter.rabbitmq;

import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j;
import lombok.val;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;

/**
* <h1>DelayedMessageConfiguration</h1>
* <p>
* Change description here.
*
* @author Johnny Miller (鍾俊), email: johnnysviva@outlook.com, 10/2/21 12:03 PM
* @see
* <a href='https://blog.rabbitmq.com/posts/2015/04/scheduling-messages-with-rabbitmq'>Scheduling Messages with RabbitMQ</a>
* @see <a href='https://github.com/rabbitmq/rabbitmq-delayed-message-exchange'>RabbitMQ Delayed Message Plugin</a>
* @see <a href='https://www.rabbitmq.com/community-plugins.html'>Community Plugins</a>
**/
@Slf4j
public class DelayedMessageConfiguration {
public static final String DELAYED_MESSAGE_QUEUE_NAME = "delayed-message.queue";
public static final String DELAYED_MESSAGE_EXCHANGE_NAME = "delayed-message.exchange";
public static final String DELAYED_MESSAGE_ROUTING_KEY = "delayed-message.routing-key";

/**
* Delayed message queue, which is durable, non-exclusive and non auto-delete.
*
* @return the delayed message queue
*/
@Bean
public Queue delayedMessageQueue() {
val delayedMessageQueue = QueueBuilder.durable(DELAYED_MESSAGE_QUEUE_NAME).build();
log.warn("Built delayed message queue: {}", delayedMessageQueue);
return delayedMessageQueue;
}

/**
* Delayed message exchange custom exchange.
*
* @return the custom exchange
*/
@Bean
public Exchange delayedMessageExchange() {
val arguments = Maps.<String, Object>newHashMap();
// To use the Delayed Message Exchange you just need to declare an exchange providing
// the "x-delayed-message" exchange type as follows
arguments.put("x-delayed-type", "direct");
val delayedMessageExchange = ExchangeBuilder
.directExchange(DELAYED_MESSAGE_EXCHANGE_NAME)
.delayed()
.withArguments(arguments)
.build();
log.warn("Built delayed message exchange: {}", delayedMessageExchange);
return delayedMessageExchange;
}

/**
* Delayed message binding.
*
* @param delayedMessageQueue the delayed message queue
* @param delayedMessageExchange the delayed message exchange
* @return the binding
*/
@Bean
public Binding delayedMessageBinding(@Qualifier("delayedMessageQueue") Queue delayedMessageQueue,
@Qualifier("delayedMessageExchange") Exchange delayedMessageExchange) {
val binding = BindingBuilder
.bind(delayedMessageQueue)
.to(delayedMessageExchange)
.with(DELAYED_MESSAGE_ROUTING_KEY)
.noargs();
log.warn("Built delayed message binding: {}", binding);
return binding;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,19 @@
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Import;

/**
* Description: RabbitmqConfiguration, change description here.
*
* @author Johnny Miller (锺俊), email: johnnysviva@outlook.com, date: 3/9/2021 11:31 AM
* @see <a href='https://spring.io/guides/gs/messaging-rabbitmq/'>Messaging with RabbitMQ</a>
* @see
* <a href='https://thepracticaldeveloper.com/produce-and-consume-json-messages-with-spring-boot-amqp/'>Sending and receiving JSON messages with Spring Boot AMQP and RabbitMQ</a>
**/
@Slf4j
@ConditionalOnClass({TopicExchange.class})
@Import({DelayedMessageConfiguration.class})
public class RabbitmqConfiguration {
public final String topicExchangeName;

Expand Down

0 comments on commit 51b5f2b

Please sign in to comment.