Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #632 ] Fix NPE caused by using @ ExtRocketMQTemplateConfiguration annotation extension to send messages in v5 #639

Merged
merged 1 commit into from
Mar 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.SmartLifecycle;
import org.springframework.core.OrderComparator;
import org.springframework.core.annotation.AnnotationUtils;

Expand All @@ -32,14 +33,16 @@
import java.util.function.BiFunction;
import java.util.stream.Collectors;

public class RocketMQMessageListenerBeanPostProcessor implements ApplicationContextAware, BeanPostProcessor, InitializingBean {
public class RocketMQMessageListenerBeanPostProcessor implements ApplicationContextAware, BeanPostProcessor, InitializingBean, SmartLifecycle {

private ApplicationContext applicationContext;

private AnnotationEnhancer enhancer;

private ListenerContainerConfiguration listenerContainerConfiguration;

private boolean running = false;

@Override
public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
return bean;
Expand All @@ -58,6 +61,34 @@ public Object postProcessAfterInitialization(Object bean, String beanName) throw
return bean;
}

@Override
public int getPhase() {
return Integer.MAX_VALUE - 2000;
}

@Override
public void start() {
if (!isRunning()) {
this.setRunning(true);
listenerContainerConfiguration.startContainer();
}
}

@Override
public void stop() {

}

public void setRunning(boolean running) {
this.running = running;
}


@Override
public boolean isRunning() {
return running;
}

@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
import org.springframework.util.Assert;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

@Configuration
Expand All @@ -48,6 +50,8 @@ public class ListenerContainerConfiguration implements ApplicationContextAware {

private RocketMQMessageConverter rocketMQMessageConverter;

private final List<DefaultListenerContainer> containers = new ArrayList<>();

public ListenerContainerConfiguration(RocketMQMessageConverter rocketMQMessageConverter,
ConfigurableEnvironment environment, RocketMQProperties rocketMQProperties) {
this.rocketMQMessageConverter = rocketMQMessageConverter;
Expand All @@ -68,15 +72,23 @@ public void registerContainer(String beanName, Object bean, RocketMQMessageListe
genericApplicationContext.registerBean(containerBeanName, DefaultListenerContainer.class, () -> createRocketMQListenerContainer(containerBeanName, bean, annotation));
DefaultListenerContainer container = genericApplicationContext.getBean(containerBeanName,
DefaultListenerContainer.class);
if (!container.isRunning()) {
try {
container.start();
} catch (Exception e) {
log.error("Started container failed. {}", container, e);
throw new RuntimeException(e);

containers.add(container);

log.info("Register the listener to container, listenerBeanName:{}, containerBeanName:{}", beanName, containerBeanName);
}

public void startContainer() {
for (DefaultListenerContainer container : containers) {
if (!container.isRunning()) {
try {
container.start();
} catch (Exception e) {
log.error("Started container failed. {}", container, e);
throw new RuntimeException(e);
}
}
}
log.info("Register the listener to container, listenerBeanName:{}, containerBeanName:{}", beanName, containerBeanName);
}

private DefaultListenerContainer createRocketMQListenerContainer(String name, Object bean, RocketMQMessageListener annotation) {
Expand Down
Loading