Skip to content

Commit

Permalink
fix($Quartz): correct job execution to avoid NPE
Browse files Browse the repository at this point in the history
  • Loading branch information
johnnymillergh committed Apr 15, 2022
1 parent 59de800 commit f8932b0
Show file tree
Hide file tree
Showing 3 changed files with 101 additions and 93 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,16 @@ fun invokeMethod(quartzJobConfiguration: QuartzJobConfiguration) {
val beanName = getBeanName(invokeTarget)
val methodName = getMethodName(invokeTarget)
val methodParams = getMethodParams(invokeTarget!!)
val bean: Any = if (!isValidClassName(beanName)) {
if (!isValidClassName(beanName)) {
lazyDebug(log) { "Getting the bean from Spring IoC container by bean name: `$beanName`" }
SpringUtil.getBean(beanName)
} else {
lazyDebug(log) { "Initialize a new object by class name: `$beanName`" }
Class.forName(beanName).getDeclaredConstructor().newInstance()
}.let {
lazyDebug(log) { "Found the bean (`$beanName`) from Spring IoC container, $it" }
invokeMethod(it, methodName, methodParams)
}
invokeMethod(bean, methodName, methodParams)
}

/**
Expand Down Expand Up @@ -77,12 +79,14 @@ private fun invokeMethod(
* @param invokeTarget the invoke target
* @return the boolean
*/
fun isValidClassName(invokeTarget: String?): Boolean {
return StringUtils.countOccurrencesOf(invokeTarget!!, ".") > 1
fun isValidClassName(invokeTarget: String): Boolean {
return StringUtils.countOccurrencesOf(invokeTarget, ".") > 1
}

/**
* Gets bean name.
* Gets bean name. For example:
* * `greetingBean.hello()`
* * `com.jmsoftware.maf.springcloudstarter.quartz.GreetingBean.hello()`
*
* @param invokeTarget the invoke target
* @return the bean name
Expand Down
Original file line number Diff line number Diff line change
@@ -1,39 +1,40 @@
package com.jmsoftware.maf.springcloudstarter.rabbitmq;
package com.jmsoftware.maf.springcloudstarter.rabbitmq

import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j;
import lombok.val;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import com.jmsoftware.maf.common.util.logger
import org.springframework.amqp.core.*
import org.springframework.beans.factory.annotation.Qualifier
import org.springframework.context.annotation.Bean

/**
* <h1>DelayedMessageConfiguration</h1>
* <p>
* # DelayedMessageConfiguration
*
* Change description here.
*
* @author Johnny Miller (鍾俊), email: johnnysviva@outlook.com, 10/2/21 12:03 PM
* @see
* <a href='https://blog.rabbitmq.com/posts/2015/04/scheduling-messages-with-rabbitmq'>Scheduling Messages with RabbitMQ</a>
* @author Johnny Miller (锺俊), e-mail: johnnysviva@outlook.com, date: 4/15/22 8:12 PM
* @see <a href='https://blog.rabbitmq.com/posts/2015/04/scheduling-messages-with-rabbitmq'>Scheduling Messages with RabbitMQ</a>
* @see <a href='https://github.com/rabbitmq/rabbitmq-delayed-message-exchange'>RabbitMQ Delayed Message Plugin</a>
* @see <a href='https://www.rabbitmq.com/community-plugins.html'>Community Plugins</a>
**/
@Slf4j
public class DelayedMessageConfiguration {
public static final String DELAYED_MESSAGE_QUEUE_NAME = "delayed-message.queue";
public static final String DELAYED_MESSAGE_EXCHANGE_NAME = "delayed-message.exchange";
public static final String DELAYED_MESSAGE_ROUTING_KEY = "delayed-message.routing-key";
*/
class DelayedMessageConfiguration {
companion object {
const val DELAYED_MESSAGE_QUEUE_NAME = "delayed-message.queue"
const val DELAYED_MESSAGE_EXCHANGE_NAME = "delayed-message.exchange"
const val DELAYED_MESSAGE_ROUTING_KEY = "delayed-message.routing-key"
private val log = logger()
}

/**
* Delayed message queue, which is durable, non-exclusive and non auto-delete.
*
* @return the delayed message queue
*/
@Bean
public Queue delayedMessageQueue() {
val delayedMessageQueue = QueueBuilder.durable(DELAYED_MESSAGE_QUEUE_NAME).build();
log.warn("Built delayed message queue: {}", delayedMessageQueue);
return delayedMessageQueue;
fun delayedMessageQueue(): Queue {
return QueueBuilder
.durable(DELAYED_MESSAGE_QUEUE_NAME)
.build().apply {
log.warn("Built delayed message queue: $this")
}
}

/**
Expand All @@ -42,18 +43,16 @@ public class DelayedMessageConfiguration {
* @return the custom exchange
*/
@Bean
public Exchange delayedMessageExchange() {
val arguments = Maps.<String, Object>newHashMap();
// To use the Delayed Message Exchange you just need to declare an exchange providing
// the "x-delayed-message" exchange type as follows
arguments.put("x-delayed-type", "direct");
val delayedMessageExchange = ExchangeBuilder
.directExchange(DELAYED_MESSAGE_EXCHANGE_NAME)
.delayed()
.withArguments(arguments)
.build();
log.warn("Built delayed message exchange: {}", delayedMessageExchange);
return delayedMessageExchange;
fun delayedMessageExchange(): Exchange {
return ExchangeBuilder
.directExchange(DELAYED_MESSAGE_EXCHANGE_NAME)
.delayed()
// To use the Delayed Message Exchange you just need to declare an exchange providing
// the "x-delayed-message" exchange type as follows
.withArguments(mapOf("x-delayed-type" to "direct"))
.build<Exchange>().apply {
log.warn("Built delayed message exchange: $this")
}
}

/**
Expand All @@ -64,14 +63,16 @@ public class DelayedMessageConfiguration {
* @return the binding
*/
@Bean
public Binding delayedMessageBinding(@Qualifier("delayedMessageQueue") Queue delayedMessageQueue,
@Qualifier("delayedMessageExchange") Exchange delayedMessageExchange) {
val binding = BindingBuilder
.bind(delayedMessageQueue)
.to(delayedMessageExchange)
.with(DELAYED_MESSAGE_ROUTING_KEY)
.noargs();
log.warn("Built delayed message binding: {}", binding);
return binding;
fun delayedMessageBinding(
@Qualifier("delayedMessageQueue") delayedMessageQueue: Queue?,
@Qualifier("delayedMessageExchange") delayedMessageExchange: Exchange?
): Binding {
return BindingBuilder
.bind(delayedMessageQueue)
.to(delayedMessageExchange)
.with(DELAYED_MESSAGE_ROUTING_KEY)
.noargs().apply {
log.warn("Built delayed message binding: $this")
}
}
}
Original file line number Diff line number Diff line change
@@ -1,36 +1,42 @@
package com.jmsoftware.maf.springcloudstarter.rabbitmq;
package com.jmsoftware.maf.springcloudstarter.rabbitmq

import lombok.extern.slf4j.Slf4j;
import lombok.val;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.boot.autoconfigure.amqp.RabbitAutoConfiguration;
import org.springframework.boot.autoconfigure.amqp.RabbitProperties;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Import;
import org.springframework.context.annotation.Primary;
import com.jmsoftware.maf.common.util.logger
import com.jmsoftware.maf.springcloudstarter.rabbitmq.DelayedMessageConfiguration
import org.springframework.amqp.core.TopicExchange
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory
import org.springframework.amqp.rabbit.connection.ConnectionFactory
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter
import org.springframework.amqp.support.converter.MessageConverter
import org.springframework.boot.autoconfigure.amqp.RabbitAutoConfiguration
import org.springframework.boot.autoconfigure.amqp.RabbitProperties
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Import
import org.springframework.context.annotation.Primary

/**
* # RabbitmqConfiguration
*
* Description: RabbitmqConfiguration, change description here.
*
* @author Johnny Miller (锺俊), email: johnnysviva@outlook.com, date: 3/9/2021 11:31 AM
* @author Johnny Miller (锺俊), e-mail: johnnysviva@outlook.com, date: 4/15/22 8:14 PM
* @see RabbitAutoConfiguration
* @see <a href='https://spring.io/guides/gs/messaging-rabbitmq/'>Messaging with RabbitMQ</a>
* @see
* <a href='https://thepracticaldeveloper.com/produce-and-consume-json-messages-with-spring-boot-amqp/'>Sending and receiving JSON messages with Spring Boot AMQP and RabbitMQ</a>
**/
@Slf4j
@ConditionalOnClass({TopicExchange.class})
@Import({DelayedMessageConfiguration.class})
public class RabbitmqConfiguration {
* @see <a href='https://thepracticaldeveloper.com/produce-and-consume-json-messages-with-spring-boot-amqp/'>Sending and receiving JSON messages with Spring Boot AMQP and RabbitMQ</a>
*/
@ConditionalOnClass(TopicExchange::class)
@Import(
DelayedMessageConfiguration::class
)
class RabbitmqConfiguration {
companion object {
private val log = logger()
}

@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
fun messageConverter(): MessageConverter {
return Jackson2JsonMessageConverter()
}

/**
Expand All @@ -40,27 +46,24 @@ public class RabbitmqConfiguration {
* @param configurer the configurer
* @param connectionFactory the connection factory
* @return the simple rabbit listener container factory
* @see RabbitAnnotationDrivenConfiguration#directRabbitListenerContainerFactory(org.springframework.boot.autoconfigure.amqp.DirectRabbitListenerContainerFactoryConfigurer, org.springframework.amqp.rabbit.connection.ConnectionFactory)
* @see RabbitAnnotationDrivenConfiguration.directRabbitListenerContainerFactory
*/
@Primary
@Bean(name = "rabbitListenerContainerFactory")
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
RabbitProperties rabbitProperties,
SimpleRabbitListenerContainerFactoryConfigurer configurer,
ConnectionFactory connectionFactory
) {
val simpleContainer = rabbitProperties.getListener().getSimple();
simpleContainer.setConcurrency(2);
simpleContainer.setMaxConcurrency(4);
log.warn(
"Overriding `rabbitListenerContainerFactory` with custom configuration, concurrency: {}, "
+ "maxConcurrency: {}", simpleContainer.getConcurrency(), simpleContainer.getMaxConcurrency()
);
val factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setConcurrentConsumers(2);
factory.setMaxConcurrentConsumers(4);
configurer.configure(factory, connectionFactory);
return factory;
@Bean(name = ["rabbitListenerContainerFactory"])
fun rabbitListenerContainerFactory(
rabbitProperties: RabbitProperties,
configurer: SimpleRabbitListenerContainerFactoryConfigurer,
connectionFactory: ConnectionFactory
): SimpleRabbitListenerContainerFactory {
val simpleContainer = rabbitProperties.listener.simple
simpleContainer.concurrency = 2
simpleContainer.maxConcurrency = 4
log.warn("Overriding `rabbitListenerContainerFactory` with custom configuration, concurrency: ${simpleContainer.concurrency}, maxConcurrency: ${simpleContainer.maxConcurrency}")
val factory = SimpleRabbitListenerContainerFactory()
factory.setConnectionFactory(connectionFactory)
factory.setConcurrentConsumers(2)
factory.setMaxConcurrentConsumers(4)
configurer.configure(factory, connectionFactory)
return factory
}
}

0 comments on commit f8932b0

Please sign in to comment.