From c20370c98aae16994046f579e99e3610c7b4050d Mon Sep 17 00:00:00 2001 From: Ben Osheroff Date: Tue, 20 Jun 2023 21:26:24 -0700 Subject: [PATCH] potential fix for #2016 if rows in a transaction are flowing to different partitions, data loss was possible if one partition got stuck while the "commit" message made progress. This has the net affect of serializing maxwell's ability to make progress in a binlog. i think that's mostly a good thing. --- .../producer/AbstractAsyncProducer.java | 24 ++++++++----------- 1 file changed, 10 insertions(+), 14 deletions(-) diff --git a/src/main/java/com/zendesk/maxwell/producer/AbstractAsyncProducer.java b/src/main/java/com/zendesk/maxwell/producer/AbstractAsyncProducer.java index 22fc194d1..e25ea37e4 100644 --- a/src/main/java/com/zendesk/maxwell/producer/AbstractAsyncProducer.java +++ b/src/main/java/com/zendesk/maxwell/producer/AbstractAsyncProducer.java @@ -30,20 +30,18 @@ public CallbackCompleter(InflightMessageList inflightMessages, Position position public void markCompleted() { inflightMessages.freeSlot(messageID); - if(isTXCommit) { - InflightMessageList.InflightMessage message = inflightMessages.completeMessage(position); + InflightMessageList.InflightMessage message = inflightMessages.completeMessage(position); - if (message != null) { - context.setPosition(message.position); - long currentTime = System.currentTimeMillis(); - long endToEndLatency = currentTime - message.eventTimeMS; + if (message != null && this.isTXCommit) { + context.setPosition(message.position); + long currentTime = System.currentTimeMillis(); + long endToEndLatency = currentTime - message.eventTimeMS; - messagePublishTimer.update(currentTime - message.sendTimeMS, TimeUnit.MILLISECONDS); - messageLatencyTimer.update(Math.max(0L, endToEndLatency - 500L), TimeUnit.MILLISECONDS); + messagePublishTimer.update(currentTime - message.sendTimeMS, TimeUnit.MILLISECONDS); + messageLatencyTimer.update(Math.max(0L, endToEndLatency - 500L), TimeUnit.MILLISECONDS); - if (endToEndLatency > metricsAgeSloMs) { - messageLatencySloViolationCount.inc(); - } + if (endToEndLatency > metricsAgeSloMs) { + messageLatencySloViolationCount.inc(); } } } @@ -84,9 +82,7 @@ public final void push(RowMap r) throws Exception { long messageID = inflightMessages.waitForSlot(); - if(r.isTXCommit()) { - inflightMessages.addMessage(position, r.getTimestampMillis(), messageID); - } + inflightMessages.addMessage(position, r.getTimestampMillis(), messageID); CallbackCompleter cc = new CallbackCompleter(inflightMessages, position, r.isTXCommit(), context, messageID);