Skip to content

Commit

Permalink
fixes issue alibaba#4388 , support mysql 8.0.20+ TransactionPayloadLo…
Browse files Browse the repository at this point in the history
…gEvent parse
  • Loading branch information
agapple authored and zoemak committed Jan 30, 2024
1 parent c77173e commit a85cb6d
Show file tree
Hide file tree
Showing 9 changed files with 251 additions and 77 deletions.
8 changes: 8 additions & 0 deletions dbsync/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,14 @@
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-compress</artifactId>
</dependency>
<dependency>
<groupId>com.github.luben</groupId>
<artifactId>zstd-jni</artifactId>
</dependency>
<!-- test dependency -->
<dependency>
<groupId>junit</groupId>
Expand Down
24 changes: 16 additions & 8 deletions dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/LogBuffer.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import java.io.IOException;
import java.io.OutputStream;
import java.io.UnsupportedEncodingException;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.charset.Charset;
Expand Down Expand Up @@ -30,7 +29,9 @@ protected LogBuffer(){
}

public LogBuffer(byte[] buffer, final int origin, final int limit){
if (origin + limit > buffer.length) throw new IllegalArgumentException("capacity excceed: " + (origin + limit));
if (origin + limit > buffer.length) {
throw new IllegalArgumentException("capacity excceed: " + (origin + limit));
}

this.buffer = buffer;
this.origin = origin;
Expand All @@ -42,7 +43,9 @@ public LogBuffer(byte[] buffer, final int origin, final int limit){
* Return n bytes in this buffer.
*/
public final LogBuffer duplicate(final int pos, final int len) {
if (pos + len > limit) throw new IllegalArgumentException("limit excceed: " + (pos + len));
if (pos + len > limit) {
throw new IllegalArgumentException("limit excceed: " + (pos + len));
}

// XXX: Do momery copy avoid buffer modified.
final int off = origin + pos;
Expand All @@ -54,8 +57,9 @@ public final LogBuffer duplicate(final int pos, final int len) {
* Return next n bytes in this buffer.
*/
public final LogBuffer duplicate(final int len) {
if (position + len > origin + limit) throw new IllegalArgumentException("limit excceed: "
+ (position + len - origin));
if (position + len > origin + limit) {
throw new IllegalArgumentException("limit excceed: " + (position + len - origin));
}

// XXX: Do momery copy avoid buffer modified.
final int end = position + len;
Expand Down Expand Up @@ -103,7 +107,9 @@ public final int position() {
* <tt>newPosition</tt> do not hold
*/
public final LogBuffer position(final int newPosition) {
if (newPosition > limit || newPosition < 0) throw new IllegalArgumentException("limit excceed: " + newPosition);
if (newPosition > limit || newPosition < 0) {
throw new IllegalArgumentException("limit excceed: " + newPosition);
}

this.position = origin + newPosition;
return this;
Expand All @@ -116,8 +122,9 @@ public final LogBuffer position(final int newPosition) {
* @return This buffer
*/
public final LogBuffer forward(final int len) {
if (position + len > origin + limit) throw new IllegalArgumentException("limit excceed: "
+ (position + len - origin));
if (position + len > origin + limit) {
throw new IllegalArgumentException("limit excceed: " + (position + len - origin));
}

this.position += len;
return this;
Expand Down Expand Up @@ -1675,4 +1682,5 @@ public final String hexdump(final int pos, final int len) {
}
return "";
}

}
10 changes: 10 additions & 0 deletions dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/LogContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ public final class LogContext {

private LogEvent gtidLogEvent; // save current gtid log event

private boolean iterateDecode = false;

public LogContext(){
this.formatDescription = FormatDescriptionLogEvent.FORMAT_DESCRIPTION_EVENT_5_x;
}
Expand Down Expand Up @@ -82,4 +84,12 @@ public LogEvent getGtidLogEvent() {
public void setGtidLogEvent(LogEvent gtidLogEvent) {
this.gtidLogEvent = gtidLogEvent;
}

public boolean isIterateDecode() {
return iterateDecode;
}

public void setIterateDecode(boolean iterateDecode) {
this.iterateDecode = iterateDecode;
}
}
90 changes: 71 additions & 19 deletions dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/LogDecoder.java
Original file line number Diff line number Diff line change
@@ -1,18 +1,19 @@
package com.taobao.tddl.dbsync.binlog;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.util.BitSet;
import java.util.List;

import com.taobao.tddl.dbsync.binlog.event.*;
import org.apache.commons.compress.compressors.zstandard.ZstdCompressorInputStream;
import org.apache.commons.compress.utils.Lists;
import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import com.alibaba.otter.canal.parse.driver.mysql.packets.GTIDSet;
import com.taobao.tddl.dbsync.binlog.event.mariadb.AnnotateRowsEvent;
import com.taobao.tddl.dbsync.binlog.event.mariadb.BinlogCheckPointLogEvent;
import com.taobao.tddl.dbsync.binlog.event.mariadb.MariaGtidListLogEvent;
import com.taobao.tddl.dbsync.binlog.event.mariadb.MariaGtidLogEvent;
import com.taobao.tddl.dbsync.binlog.event.mariadb.StartEncryptionLogEvent;
import com.taobao.tddl.dbsync.binlog.event.*;
import com.taobao.tddl.dbsync.binlog.event.mariadb.*;

/**
* Implements a binary-log decoder.
Expand Down Expand Up @@ -56,6 +57,8 @@ public final void handle(final int flagIndex) {
handleSet.set(flagIndex);
}

private LogBuffer compressIterateBuffer;

/**
* Decoding an event from binary-log buffer.
*
Expand All @@ -64,7 +67,6 @@ public final void handle(final int flagIndex) {
*/
public LogEvent decode(LogBuffer buffer, LogContext context) throws IOException {
final int limit = buffer.limit();

if (limit >= FormatDescriptionLogEvent.LOG_EVENT_HEADER_LEN) {
LogHeader header = new LogHeader(buffer, context.getFormatDescription());

Expand Down Expand Up @@ -109,6 +111,58 @@ public LogEvent decode(LogBuffer buffer, LogContext context) throws IOException
return null;
}

/**
* * process compress binlog payload
*
* @param event
* @param context
* @return
* @throws IOException
*/
public List<LogEvent> processIterateDecode(LogEvent event, LogContext context) throws IOException {
List<LogEvent> events = Lists.newArrayList();
if (event.getHeader().getType() == LogEvent.TRANSACTION_PAYLOAD_EVENT) {
// iterate for compresss payload
TransactionPayloadLogEvent compressEvent = ((TransactionPayloadLogEvent) event);
LogBuffer iterateBuffer = null;
if (compressEvent.isCompressByZstd()) {
ZstdCompressorInputStream in = new ZstdCompressorInputStream(
new ByteArrayInputStream(compressEvent.getPayload()));
byte[] decodeBytes = IOUtils.toByteArray(in);
iterateBuffer = new LogBuffer(decodeBytes, 0, decodeBytes.length);
} else if (compressEvent.isCompressByNone()) {
iterateBuffer = new LogBuffer(compressEvent.getPayload(), 0, compressEvent.getPayload().length);
} else {
throw new IllegalArgumentException("unkonow compresstype for " + event.getHeader().getLogFileName()
+ ":" + event.getHeader().getLogPos());
}

try {
context.setIterateDecode(true);
while (iterateBuffer.hasRemaining()) {// iterate
LogEvent deEvent = decode(iterateBuffer, context);
if (deEvent == null) {
break;
}

// compress event logPos = 0
deEvent.getHeader().setLogFileName(event.getHeader().getLogFileName());
deEvent.getHeader().setLogPos(event.getHeader().getLogPos());
// 需要重置payload每个event的eventLen , ack位点更新依赖logPos - eventLen,
// 原因:每个payload都是uncompress的eventLen,无法对应物理binlog的eventLen
// 隐患:memory计算空间大小时会出现放大的情况,影响getBatch的数量
deEvent.getHeader().setEventLen(event.getHeader().getEventLen());
events.add(deEvent);
}
} finally {
context.setIterateDecode(false);
}
} else {
// TODO support mariadb compress binlog
}
return events;
}

/**
* Deserialize an event from buffer.
*
Expand All @@ -127,8 +181,12 @@ public static LogEvent decode(LogBuffer buffer, LogHeader header, LogContext con
}

if (checksumAlg != LogEvent.BINLOG_CHECKSUM_ALG_OFF && checksumAlg != LogEvent.BINLOG_CHECKSUM_ALG_UNDEF) {
// remove checksum bytes
buffer.limit(header.getEventLen() - LogEvent.BINLOG_CHECKSUM_LEN);
if (context.isIterateDecode()) {
// transaction compress payload在主事件已经处理了checksum,遍历解析event忽略checksum处理
} else {
// remove checksum bytes
buffer.limit(header.getEventLen() - LogEvent.BINLOG_CHECKSUM_LEN);
}
}
GTIDSet gtidSet = context.getGtidSet();
LogEvent gtidLogEvent = context.getGtidLogEvent();
Expand Down Expand Up @@ -360,16 +418,10 @@ public static LogEvent decode(LogBuffer buffer, LogHeader header, LogContext con
return event;
}
case LogEvent.TRANSACTION_PAYLOAD_EVENT: {
if (logger.isWarnEnabled()) {
logger.warn("Skipping unsupported MySQL TRANSACTION_PAYLOAD_EVENT from: " + context.getLogPosition());
}
break;

// TransactionPayloadLogEvent event = new TransactionPayloadLogEvent(header,
// buffer, descriptionEvent);
// /* updating position in context */
// logPosition.position = header.getLogPos();
// return event;
TransactionPayloadLogEvent event = new TransactionPayloadLogEvent(header, buffer, descriptionEvent);
/* updating position in context */
logPosition.position = header.getLogPos();
return event;
}
case LogEvent.VIEW_CHANGE_EVENT: {
ViewChangeEvent event = new ViewChangeEvent(header, buffer, descriptionEvent);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ public final class FormatDescriptionLogEvent extends StartLogEventV3 {
public static final int TRANSACTION_CONTEXT_HEADER_LEN = 18;
public static final int VIEW_CHANGE_HEADER_LEN = 52;
public static final int XA_PREPARE_HEADER_LEN = 0;
public static final int TRANSACTION_PAYLOAD_HEADER_LEN = 0;

public static final int ANNOTATE_ROWS_HEADER_LEN = 0;
public static final int BINLOG_CHECKPOINT_HEADER_LEN = 4;
Expand Down Expand Up @@ -113,14 +114,14 @@ public FormatDescriptionLogEvent(LogHeader header, LogBuffer buffer, FormatDescr
long calc = getVersionProduct();
if (calc >= checksumVersionProduct) {
/*
* the last bytes are the checksum alg desc and value (or value's
* room)
* the last bytes are the checksum alg desc and value (or value's room)
*/
numberOfEventTypes -= BINLOG_CHECKSUM_ALG_DESC_LEN;
}

if (logger.isInfoEnabled()) logger.info("common_header_len= " + commonHeaderLen + ", number_of_event_types= "
+ numberOfEventTypes);
if (logger.isInfoEnabled()) {
logger.info("common_header_len= " + commonHeaderLen + ", number_of_event_types= " + numberOfEventTypes);
}
}

/** MySQL 5.0 format descriptions. */
Expand Down Expand Up @@ -212,6 +213,7 @@ public FormatDescriptionLogEvent(final int binlogVersion){
postHeaderLen[VIEW_CHANGE_EVENT - 1] = VIEW_CHANGE_HEADER_LEN;
postHeaderLen[XA_PREPARE_LOG_EVENT - 1] = XA_PREPARE_HEADER_LEN;
postHeaderLen[PARTIAL_UPDATE_ROWS_EVENT - 1] = ROWS_HEADER_LEN_V2;
postHeaderLen[TRANSACTION_PAYLOAD_EVENT - 1] = TRANSACTION_PAYLOAD_HEADER_LEN;

// mariadb 10
postHeaderLen[ANNOTATE_ROWS_EVENT - 1] = ANNOTATE_ROWS_HEADER_LEN;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,12 @@ public String getLogFileName() {
public void setLogFileName(String logFileName) {
this.logFileName = logFileName;
}
public void setLogPos(long logPos) {
this.logPos = logPos;
}
public void setEventLen(int eventLen) {
this.eventLen = eventLen;
}

private void processCheckSum(LogBuffer buffer) {
if (checksumAlg != LogEvent.BINLOG_CHECKSUM_ALG_OFF && checksumAlg != LogEvent.BINLOG_CHECKSUM_ALG_UNDEF) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,91 @@

/**
* @author agapple 2022年5月23日 下午7:05:39
* @version 1.1.6
* @version 1.1.7
* @since mysql 8.0.20
*/
public class TransactionPayloadLogEvent extends LogEvent {

public static final short COMPRESSION_TYPE_MIN_LENGTH = 1;
public static final short COMPRESSION_TYPE_MAX_LENGTH = 9;
public static final short PAYLOAD_SIZE_MIN_LENGTH = 0;
public static final short PAYLOAD_SIZE_MAX_LENGTH = 9;
public static final short UNCOMPRESSED_SIZE_MIN_LENGTH = 0;
public static final short UNCOMPRESSED_SIZE_MAX_LENGTH = 9;
public static final int MAX_DATA_LENGTH = COMPRESSION_TYPE_MAX_LENGTH
+ PAYLOAD_SIZE_MAX_LENGTH
+ UNCOMPRESSED_SIZE_MAX_LENGTH;

/** Marks the end of the payload header. */
public static final int OTW_PAYLOAD_HEADER_END_MARK = 0;

/** The payload field */
public static final int OTW_PAYLOAD_SIZE_FIELD = 1;

/** The compression type field */
public static final int OTW_PAYLOAD_COMPRESSION_TYPE_FIELD = 2;

/** The uncompressed size field */
public static final int OTW_PAYLOAD_UNCOMPRESSED_SIZE_FIELD = 3;

/* ZSTD compression. */
public final static int COMPRESS_TYPE_ZSTD = 0;
/* No compression. */
public final static int COMPRESS_TYPE_NONE = 255;

private long m_compression_type = COMPRESS_TYPE_NONE;
private long m_payload_size;
private long m_uncompressed_size;
private byte[] m_payload;

public TransactionPayloadLogEvent(LogHeader header, LogBuffer buffer, FormatDescriptionLogEvent descriptionEvent){
super(header);

final int commonHeaderLen = descriptionEvent.getCommonHeaderLen();
final int postHeaderLen = descriptionEvent.getPostHeaderLen()[header.getType() - 1];

int offset = commonHeaderLen;
buffer.position(offset);
long type = 0, length = 0;
while (buffer.hasRemaining()) {
type = buffer.getPackedLong(); // type
if (type == OTW_PAYLOAD_HEADER_END_MARK) {
break;
}

length = buffer.getPackedLong(); // length
switch ((int) type) {
case OTW_PAYLOAD_SIZE_FIELD:
m_payload_size = buffer.getPackedLong(); // value
break;
case OTW_PAYLOAD_COMPRESSION_TYPE_FIELD:
m_compression_type = buffer.getPackedLong(); // value
break;
case OTW_PAYLOAD_UNCOMPRESSED_SIZE_FIELD:
m_uncompressed_size = buffer.getPackedLong(); // value
break;
default:
buffer.forward((int) length);
break;
}

}

if (m_uncompressed_size == 0) {
m_uncompressed_size = m_payload_size;
}
m_payload = buffer.getData((int) m_payload_size);
}

public boolean isCompressByZstd() {
return m_compression_type == COMPRESS_TYPE_ZSTD;
}

public boolean isCompressByNone() {
return m_compression_type == COMPRESS_TYPE_NONE;
}

public byte[] getPayload() {
return m_payload;
}
}
Loading

0 comments on commit a85cb6d

Please sign in to comment.