Skip to content

Commit

Permalink
[fix][connector-rocketmq]Fix a NPE problem when checkpoint.interval i…
Browse files Browse the repository at this point in the history
…s set too small(apache#6624)
  • Loading branch information
YalikWang committed Apr 1, 2024
1 parent 93c6cac commit 776cfb8
Showing 1 changed file with 15 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -242,17 +242,21 @@ public void notifyCheckpointComplete(long checkpointId) throws Exception {
Long offset = entry.getValue();
try {
if (messageQueue != null && offset != null) {
consumerThreads
.get(messageQueue)
.getTasks()
.put(
consumer -> {
if (this.metadata.isEnabledCommitCheckpoint()) {
consumer.getOffsetStore()
.updateOffset(messageQueue, offset, false);
consumer.getOffsetStore().persist(messageQueue);
}
});
RocketMqConsumerThread rocketMqConsumerThread =
consumerThreads.get(messageQueue);
if (rocketMqConsumerThread != null) {
rocketMqConsumerThread
.getTasks()
.put(
consumer -> {
if (this.metadata.isEnabledCommitCheckpoint()) {
consumer.getOffsetStore()
.updateOffset(
messageQueue, offset, false);
consumer.getOffsetStore().persist(messageQueue);
}
});
}
}
} catch (InterruptedException e) {
log.error("commit offset failed", e);
Expand Down

0 comments on commit 776cfb8

Please sign in to comment.