Skip to content

Commit

Permalink
Revert "potential fix for #2016"
Browse files Browse the repository at this point in the history
Right idea, but dumb implementation.
This reverts commit c20370c.
  • Loading branch information
osheroff committed Sep 9, 2023
1 parent 21b104e commit 1849ee4
Showing 1 changed file with 14 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,18 +30,20 @@ public CallbackCompleter(InflightMessageList inflightMessages, Position position

public void markCompleted() {
inflightMessages.freeSlot(messageID);
InflightMessageList.InflightMessage message = inflightMessages.completeMessage(position);
if(isTXCommit) {
InflightMessageList.InflightMessage message = inflightMessages.completeMessage(position);

if (message != null && this.isTXCommit) {
context.setPosition(message.position);
long currentTime = System.currentTimeMillis();
long endToEndLatency = currentTime - message.eventTimeMS;
if (message != null) {
context.setPosition(message.position);
long currentTime = System.currentTimeMillis();
long endToEndLatency = currentTime - message.eventTimeMS;

messagePublishTimer.update(currentTime - message.sendTimeMS, TimeUnit.MILLISECONDS);
messageLatencyTimer.update(Math.max(0L, endToEndLatency - 500L), TimeUnit.MILLISECONDS);
messagePublishTimer.update(currentTime - message.sendTimeMS, TimeUnit.MILLISECONDS);
messageLatencyTimer.update(Math.max(0L, endToEndLatency - 500L), TimeUnit.MILLISECONDS);

if (endToEndLatency > metricsAgeSloMs) {
messageLatencySloViolationCount.inc();
if (endToEndLatency > metricsAgeSloMs) {
messageLatencySloViolationCount.inc();
}
}
}
}
Expand Down Expand Up @@ -81,7 +83,9 @@ public final void push(RowMap r) throws Exception {

long messageID = inflightMessages.waitForSlot();

inflightMessages.addMessage(position, r.getTimestampMillis(), messageID);
if(r.isTXCommit()) {
inflightMessages.addMessage(position, r.getTimestampMillis(), messageID);
}

CallbackCompleter cc = new CallbackCompleter(inflightMessages, position, r.isTXCommit(), context, messageID);

Expand Down

0 comments on commit 1849ee4

Please sign in to comment.