diff --git a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/AckDeadlineRenewer.java b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/AckDeadlineRenewer.java index ec400a7cc116..797b61b1c3c8 100644 --- a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/AckDeadlineRenewer.java +++ b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/AckDeadlineRenewer.java @@ -172,8 +172,19 @@ private void unsetAndScheduleNextRenewal() { private void scheduleNextRenewal() { // Schedules next renewal if there are still messages to process and no renewals scheduled that // could handle them, otherwise does nothing + Message nextMessage; + synchronized (lock) { + Message peek = messageQueue.peek(); + // We remove from the queue messages that were removed from the ack deadline renewer (and + // possibly re-added) + while (peek != null && (!messageDeadlines.containsKey(peek.messageId()) + || messageDeadlines.get(peek.messageId()) > peek.expectedDeadline())) { + messageQueue.poll(); + peek = messageQueue.peek(); + } + nextMessage = peek; + } synchronized (futureLock) { - Message nextMessage = messageQueue.peek(); if (renewerFuture == null && nextMessage != null) { long delay = (nextMessage.expectedDeadline() - clock.millis()) - NEXT_RENEWAL_THRESHOLD_MILLIS;