Skip to content

Commit

Permalink
potential fix for #2016
Browse files Browse the repository at this point in the history
if rows in a transaction are flowing to different partitions, data loss
was possible if one partition got stuck while the "commit" message made
progress.

This has the net affect of serializing maxwell's ability to make
progress in a binlog.  i think that's mostly a good thing.
  • Loading branch information
osheroff committed Jun 21, 2023
1 parent 76992cf commit c20370c
Showing 1 changed file with 10 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,20 +30,18 @@ public CallbackCompleter(InflightMessageList inflightMessages, Position position

public void markCompleted() {
inflightMessages.freeSlot(messageID);
if(isTXCommit) {
InflightMessageList.InflightMessage message = inflightMessages.completeMessage(position);
InflightMessageList.InflightMessage message = inflightMessages.completeMessage(position);

if (message != null) {
context.setPosition(message.position);
long currentTime = System.currentTimeMillis();
long endToEndLatency = currentTime - message.eventTimeMS;
if (message != null && this.isTXCommit) {
context.setPosition(message.position);
long currentTime = System.currentTimeMillis();
long endToEndLatency = currentTime - message.eventTimeMS;

messagePublishTimer.update(currentTime - message.sendTimeMS, TimeUnit.MILLISECONDS);
messageLatencyTimer.update(Math.max(0L, endToEndLatency - 500L), TimeUnit.MILLISECONDS);
messagePublishTimer.update(currentTime - message.sendTimeMS, TimeUnit.MILLISECONDS);
messageLatencyTimer.update(Math.max(0L, endToEndLatency - 500L), TimeUnit.MILLISECONDS);

if (endToEndLatency > metricsAgeSloMs) {
messageLatencySloViolationCount.inc();
}
if (endToEndLatency > metricsAgeSloMs) {
messageLatencySloViolationCount.inc();
}
}
}
Expand Down Expand Up @@ -84,9 +82,7 @@ public final void push(RowMap r) throws Exception {

long messageID = inflightMessages.waitForSlot();

if(r.isTXCommit()) {
inflightMessages.addMessage(position, r.getTimestampMillis(), messageID);
}
inflightMessages.addMessage(position, r.getTimestampMillis(), messageID);

CallbackCompleter cc = new CallbackCompleter(inflightMessages, position, r.isTXCommit(), context, messageID);

Expand Down

0 comments on commit c20370c

Please sign in to comment.