From 7953c4f5f25a6896e6fe900ecb806f4e52d2ec9b Mon Sep 17 00:00:00 2001 From: lebron374 <304250917@qq.com> Date: Fri, 5 Jun 2020 17:45:34 +0800 Subject: [PATCH] [ISSUE #2044] Fix DefaultLitePullConsumerImpl NPE (#2059) * fix DefaultLitePullConsumerImpl NPE * add ut --- .../consumer/DefaultLitePullConsumerImpl.java | 2 +- .../consumer/DefaultLitePullConsumerTest.java | 49 +++++++++++++++++++ 2 files changed, 50 insertions(+), 1 deletion(-) diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java index f54078fcfd..e3d60ffadd 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java @@ -691,7 +691,7 @@ public void run() { ProcessQueue processQueue = assignedMessageQueue.getProcessQueue(messageQueue); - if (processQueue == null && processQueue.isDropped()) { + if (null == processQueue || processQueue.isDropped()) { log.info("The message queue not be able to poll, because it's dropped. group={}, messageQueue={}", defaultLitePullConsumer.getConsumerGroup(), this.messageQueue); return; } diff --git a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java index cc8d5e2bf7..de2f608382 100644 --- a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java @@ -81,6 +81,8 @@ public class DefaultLitePullConsumerTest { private MQClientAPIImpl mQClientAPIImpl; @Mock private MQAdminImpl mQAdminImpl; + @Mock + private AssignedMessageQueue assignedMQ; private RebalanceImpl rebalanceImpl; private OffsetStore offsetStore; @@ -304,6 +306,53 @@ public void testPauseAndResume_Success() throws Exception { } } + @Test + public void testPullTaskImpl_ProcessQueueNull() throws Exception { + DefaultLitePullConsumer litePullConsumer = createNotStartLitePullConsumer(); + try { + MessageQueue messageQueue = createMessageQueue(); + litePullConsumer.assign(Collections.singletonList(messageQueue)); + Field field = DefaultLitePullConsumer.class.getDeclaredField("defaultLitePullConsumerImpl"); + field.setAccessible(true); + // set ProcessQueue dropped = true + DefaultLitePullConsumerImpl localLitePullConsumerImpl = (DefaultLitePullConsumerImpl) field.get(litePullConsumer); + field = DefaultLitePullConsumerImpl.class.getDeclaredField("assignedMessageQueue"); + field.setAccessible(true); + when(assignedMQ.isPaused(any(MessageQueue.class))).thenReturn(false); + when(assignedMQ.getProcessQueue(any(MessageQueue.class))).thenReturn(null); + litePullConsumer.start(); + field.set(localLitePullConsumerImpl, assignedMQ); + + List result = litePullConsumer.poll(100); + assertThat(result.isEmpty()).isTrue(); + } finally { + litePullConsumer.shutdown(); + } + } + + @Test + public void testPullTaskImpl_ProcessQueueDropped() throws Exception { + DefaultLitePullConsumer litePullConsumer = createNotStartLitePullConsumer(); + try { + MessageQueue messageQueue = createMessageQueue(); + litePullConsumer.assign(Collections.singletonList(messageQueue)); + Field field = DefaultLitePullConsumer.class.getDeclaredField("defaultLitePullConsumerImpl"); + field.setAccessible(true); + // set ProcessQueue dropped = true + DefaultLitePullConsumerImpl localLitePullConsumerImpl = (DefaultLitePullConsumerImpl) field.get(litePullConsumer); + field = DefaultLitePullConsumerImpl.class.getDeclaredField("assignedMessageQueue"); + field.setAccessible(true); + AssignedMessageQueue assignedMessageQueue = (AssignedMessageQueue) field.get(localLitePullConsumerImpl); + assignedMessageQueue.getProcessQueue(messageQueue).setDropped(true); + litePullConsumer.start(); + + List result = litePullConsumer.poll(100); + assertThat(result.isEmpty()).isTrue(); + } finally { + litePullConsumer.shutdown(); + } + } + @Test public void testRegisterTopicMessageQueueChangeListener_Success() throws Exception { flag = false;