Skip to content

Commit

Permalink
[ISSUE alibaba#2044] Fix DefaultLitePullConsumerImpl NPE (alibaba#2059)
Browse files Browse the repository at this point in the history
* fix DefaultLitePullConsumerImpl NPE

* add ut
  • Loading branch information
lebron374 authored Jun 5, 2020
1 parent 49a722f commit 7953c4f
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -691,7 +691,7 @@ public void run() {

ProcessQueue processQueue = assignedMessageQueue.getProcessQueue(messageQueue);

if (processQueue == null && processQueue.isDropped()) {
if (null == processQueue || processQueue.isDropped()) {
log.info("The message queue not be able to poll, because it's dropped. group={}, messageQueue={}", defaultLitePullConsumer.getConsumerGroup(), this.messageQueue);
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ public class DefaultLitePullConsumerTest {
private MQClientAPIImpl mQClientAPIImpl;
@Mock
private MQAdminImpl mQAdminImpl;
@Mock
private AssignedMessageQueue assignedMQ;

private RebalanceImpl rebalanceImpl;
private OffsetStore offsetStore;
Expand Down Expand Up @@ -304,6 +306,53 @@ public void testPauseAndResume_Success() throws Exception {
}
}

@Test
public void testPullTaskImpl_ProcessQueueNull() throws Exception {
DefaultLitePullConsumer litePullConsumer = createNotStartLitePullConsumer();
try {
MessageQueue messageQueue = createMessageQueue();
litePullConsumer.assign(Collections.singletonList(messageQueue));
Field field = DefaultLitePullConsumer.class.getDeclaredField("defaultLitePullConsumerImpl");
field.setAccessible(true);
// set ProcessQueue dropped = true
DefaultLitePullConsumerImpl localLitePullConsumerImpl = (DefaultLitePullConsumerImpl) field.get(litePullConsumer);
field = DefaultLitePullConsumerImpl.class.getDeclaredField("assignedMessageQueue");
field.setAccessible(true);
when(assignedMQ.isPaused(any(MessageQueue.class))).thenReturn(false);
when(assignedMQ.getProcessQueue(any(MessageQueue.class))).thenReturn(null);
litePullConsumer.start();
field.set(localLitePullConsumerImpl, assignedMQ);

List<MessageExt> result = litePullConsumer.poll(100);
assertThat(result.isEmpty()).isTrue();
} finally {
litePullConsumer.shutdown();
}
}

@Test
public void testPullTaskImpl_ProcessQueueDropped() throws Exception {
DefaultLitePullConsumer litePullConsumer = createNotStartLitePullConsumer();
try {
MessageQueue messageQueue = createMessageQueue();
litePullConsumer.assign(Collections.singletonList(messageQueue));
Field field = DefaultLitePullConsumer.class.getDeclaredField("defaultLitePullConsumerImpl");
field.setAccessible(true);
// set ProcessQueue dropped = true
DefaultLitePullConsumerImpl localLitePullConsumerImpl = (DefaultLitePullConsumerImpl) field.get(litePullConsumer);
field = DefaultLitePullConsumerImpl.class.getDeclaredField("assignedMessageQueue");
field.setAccessible(true);
AssignedMessageQueue assignedMessageQueue = (AssignedMessageQueue) field.get(localLitePullConsumerImpl);
assignedMessageQueue.getProcessQueue(messageQueue).setDropped(true);
litePullConsumer.start();

List<MessageExt> result = litePullConsumer.poll(100);
assertThat(result.isEmpty()).isTrue();
} finally {
litePullConsumer.shutdown();
}
}

@Test
public void testRegisterTopicMessageQueueChangeListener_Success() throws Exception {
flag = false;
Expand Down

0 comments on commit 7953c4f

Please sign in to comment.