Skip to content

Commit

Permalink
Update Consumer.java
Browse files Browse the repository at this point in the history
  • Loading branch information
180254 authored May 3, 2024
1 parent 2d378d3 commit 6dac4bf
Showing 1 changed file with 4 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -516,17 +516,17 @@ private CompletableFuture<Long> individualAckNormal(CommandAck ack, Map<String,
.syncBatchPositionBitSetForPendingAck(position);
}
}
addAndGetUnAckedMsgs(ackOwnerConsumer, -(int) ackedCount);
} else {
position = PositionImpl.get(msgId.getLedgerId(), msgId.getEntryId());
ackedCount = getAckedCountForMsgIdNoAckSets(batchSize, position, ackOwnerConsumer);
if (checkCanRemovePendingAcksAndHandle(position, msgId)) {
addAndGetUnAckedMsgs(ackOwnerConsumer, -(int) ackedCount);
}
}

addAndGetUnAckedMsgs(ackOwnerConsumer, -(int) ackedCount);

positionsAcked.add(position);

checkCanRemovePendingAcksAndHandle(position, msgId);

checkAckValidationError(ack, position);

totalAckCount += ackedCount;
Expand Down

0 comments on commit 6dac4bf

Please sign in to comment.