Skip to content

Commit

Permalink
fixes issue #4388 , support mariadb 10.x log_bin_compress
Browse files Browse the repository at this point in the history
  • Loading branch information
agapple committed Sep 1, 2023
1 parent 252ca02 commit f13cc4e
Show file tree
Hide file tree
Showing 23 changed files with 673 additions and 343 deletions.
332 changes: 230 additions & 102 deletions dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/LogBuffer.java

Large diffs are not rendered by default.

50 changes: 29 additions & 21 deletions dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/LogDecoder.java
Original file line number Diff line number Diff line change
Expand Up @@ -126,14 +126,15 @@ public List<LogEvent> processIterateDecode(LogEvent event, LogContext context) t
TransactionPayloadLogEvent compressEvent = ((TransactionPayloadLogEvent) event);
LogBuffer iterateBuffer = null;
if (compressEvent.isCompressByZstd()) {
ZstdCompressorInputStream in = new ZstdCompressorInputStream(
new ByteArrayInputStream(compressEvent.getPayload()));
byte[] decodeBytes = IOUtils.toByteArray(in);
iterateBuffer = new LogBuffer(decodeBytes, 0, decodeBytes.length);
try (ZstdCompressorInputStream in = new ZstdCompressorInputStream(
new ByteArrayInputStream(compressEvent.getPayload()))) {
byte[] decodeBytes = IOUtils.toByteArray(in);
iterateBuffer = new LogBuffer(decodeBytes, 0, decodeBytes.length);
}
} else if (compressEvent.isCompressByNone()) {
iterateBuffer = new LogBuffer(compressEvent.getPayload(), 0, compressEvent.getPayload().length);
} else {
throw new IllegalArgumentException("unkonow compresstype for " + event.getHeader().getLogFileName()
throw new IllegalArgumentException("unknow compress type for " + event.getHeader().getLogFileName()
+ ":" + event.getHeader().getLogPos());
}

Expand Down Expand Up @@ -446,6 +447,7 @@ public static LogEvent decode(LogBuffer buffer, LogHeader header, LogContext con
BinlogCheckPointLogEvent event = new BinlogCheckPointLogEvent(header, buffer, descriptionEvent);
/* updating position in context */
logPosition.position = header.getLogPos();
logPosition.fileName = event.getFilename();
return event;
}
case LogEvent.GTID_EVENT: {
Expand Down Expand Up @@ -487,31 +489,37 @@ public static LogEvent decode(LogBuffer buffer, LogHeader header, LogContext con
return event;
}
case LogEvent.QUERY_COMPRESSED_EVENT: {
if (logger.isWarnEnabled()) {
logger.warn("Skipping unsupported MaraiDB QUERY_COMPRESSED_EVENT from: " + context.getLogPosition());
}
break;
QueryCompressedLogEvent event = new QueryCompressedLogEvent(header, buffer, descriptionEvent);
/* updating position in context */
logPosition.position = header.getLogPos();
return event;
}
case LogEvent.WRITE_ROWS_COMPRESSED_EVENT_V1:
case LogEvent.WRITE_ROWS_COMPRESSED_EVENT: {
if (logger.isWarnEnabled()) {
logger.warn("Skipping unsupported MaraiDB WRITE_ROWS_COMPRESSED_EVENT from: " + context.getLogPosition());
}
break;
WriteRowsCompressLogEvent event = new WriteRowsCompressLogEvent(header, buffer, descriptionEvent);
/* updating position in context */
logPosition.position = header.getLogPos();
event.fillTable(context);
header.putGtid(context.getGtidSet(), gtidLogEvent);
return event;
}
case LogEvent.UPDATE_ROWS_COMPRESSED_EVENT_V1:
case LogEvent.UPDATE_ROWS_COMPRESSED_EVENT: {
if (logger.isWarnEnabled()) {
logger.warn("Skipping unsupported MaraiDB UPDATE_ROWS_COMPRESSED_EVENT from: " + context.getLogPosition());
}
break;
UpdateRowsCompressLogEvent event = new UpdateRowsCompressLogEvent(header, buffer, descriptionEvent);
/* updating position in context */
logPosition.position = header.getLogPos();
event.fillTable(context);
header.putGtid(context.getGtidSet(), gtidLogEvent);
return event;
}
case LogEvent.DELETE_ROWS_COMPRESSED_EVENT_V1:
case LogEvent.DELETE_ROWS_COMPRESSED_EVENT: {
if (logger.isWarnEnabled()) {
logger.warn("Skipping unsupported MaraiDB DELETE_ROWS_COMPRESSED_EVENT from: " + context.getLogPosition());
}
break;
DeleteRowsCompressLogEvent event = new DeleteRowsCompressLogEvent(header, buffer, descriptionEvent);
/* updating position in context */
logPosition.position = header.getLogPos();
event.fillTable(context);
header.putGtid(context.getGtidSet(), gtidLogEvent);
return event;
}
default:
/*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,14 @@
* @author <a href="mailto:changyuan.lh@taobao.com">Changyuan.lh</a>
* @version 1.0
*/
public final class DeleteRowsLogEvent extends RowsLogEvent {
public class DeleteRowsLogEvent extends RowsLogEvent {

public DeleteRowsLogEvent(LogHeader header, LogBuffer buffer, FormatDescriptionLogEvent descriptionEvent){
super(header, buffer, descriptionEvent);
super(header, buffer, descriptionEvent, false, false);
}

public DeleteRowsLogEvent(LogHeader header, LogBuffer buffer, FormatDescriptionLogEvent descriptionEvent,
boolean compress){
super(header, buffer, descriptionEvent, false, compress);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,15 @@ public FormatDescriptionLogEvent(final int binlogVersion){
postHeaderLen[GTID_EVENT - 1] = GTID_HEADER_LEN;
postHeaderLen[GTID_LIST_EVENT - 1] = GTID_LIST_HEADER_LEN;
postHeaderLen[START_ENCRYPTION_EVENT - 1] = START_ENCRYPTION_HEADER_LEN;

// mariadb compress
postHeaderLen[QUERY_COMPRESSED_EVENT - 1] = QUERY_COMPRESSED_EVENT;
postHeaderLen[WRITE_ROWS_COMPRESSED_EVENT - 1] = ROWS_HEADER_LEN_V2;
postHeaderLen[UPDATE_ROWS_COMPRESSED_EVENT - 1] = ROWS_HEADER_LEN_V2;
postHeaderLen[DELETE_ROWS_COMPRESSED_EVENT - 1] = ROWS_HEADER_LEN_V2;
postHeaderLen[WRITE_ROWS_COMPRESSED_EVENT_V1 - 1] = ROWS_HEADER_LEN_V1;
postHeaderLen[UPDATE_ROWS_COMPRESSED_EVENT_V1 - 1] = ROWS_HEADER_LEN_V1;
postHeaderLen[DELETE_ROWS_COMPRESSED_EVENT_V1 - 1] = ROWS_HEADER_LEN_V1;
break;

case 3: /* 4.0.x x>=2 */
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
package com.taobao.tddl.dbsync.binlog.event;

import java.util.HashMap;
import java.util.Map;

import com.alibaba.otter.canal.parse.driver.mysql.packets.GTIDSet;
import com.taobao.tddl.dbsync.binlog.LogBuffer;
import com.taobao.tddl.dbsync.binlog.LogEvent;

import java.util.HashMap;
import java.util.Map;

/**
* The Common-Header, documented in the table @ref Table_common_header "below",
* always has the same form and length within one version of MySQL. Each event
Expand Down Expand Up @@ -66,7 +66,7 @@
*/
public final class LogHeader {

protected final int type;
protected int type;

/**
* The offset in the log where this event originally appeared (it is
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -438,9 +438,13 @@ public class QueryLogEvent extends LogEvent {
private String timezone;

public QueryLogEvent(LogHeader header, LogBuffer buffer, FormatDescriptionLogEvent descriptionEvent)
throws IOException{
super(header);
throws IOException{
this(header, buffer, descriptionEvent, false);
}

public QueryLogEvent(LogHeader header, LogBuffer buffer, FormatDescriptionLogEvent descriptionEvent,
boolean compress) throws IOException{
super(header);
final int commonHeaderLen = descriptionEvent.commonHeaderLen;
final int postHeaderLen = descriptionEvent.postHeaderLen[header.type - 1];
/*
Expand Down Expand Up @@ -495,10 +499,15 @@ public QueryLogEvent(LogHeader header, LogBuffer buffer, FormatDescriptionLogEve
unpackVariables(buffer, end);
buffer.position(end);
buffer.limit(limit);

/* A 2nd variable part; this is common to all versions */
final int queryLen = dataLen - dbLen - 1;
dbname = buffer.getFixName(dbLen + 1);
int queryLen = dataLen - dbLen - 1;
if (compress) {
// mariadb compress log event
// see https://github.com/alibaba/canal/issues/4388
buffer = buffer.uncompressBuf();
queryLen = buffer.limit();
}
if (clientCharset >= 0) {
charset = CharsetConversion.getNioCharset(clientCharset);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,9 +88,10 @@ public RotateLogEvent(LogHeader header, LogBuffer buffer, FormatDescriptionLogEv

final int filenameOffset = headerSize + postHeaderLen;
int filenameLen = buffer.limit() - filenameOffset;
if (filenameLen > FN_REFLEN - 1) filenameLen = FN_REFLEN - 1;
if (filenameLen > FN_REFLEN - 1) {
filenameLen = FN_REFLEN - 1;
}
buffer.position(filenameOffset);

filename = buffer.getFixString(filenameLen);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@
import java.nio.charset.Charset;
import java.util.BitSet;

import com.taobao.tddl.dbsync.binlog.exception.TableIdNotFoundException;
import com.taobao.tddl.dbsync.binlog.LogBuffer;
import com.taobao.tddl.dbsync.binlog.LogContext;
import com.taobao.tddl.dbsync.binlog.LogEvent;
import com.taobao.tddl.dbsync.binlog.event.TableMapLogEvent.ColumnInfo;
import com.taobao.tddl.dbsync.binlog.exception.TableIdNotFoundException;

/**
* Common base class for all row-containing log events.
Expand Down Expand Up @@ -118,9 +118,14 @@ public RowsLogEvent(LogHeader header, LogBuffer buffer, FormatDescriptionLogEven
this(header, buffer, descriptionEvent, false);
}

public RowsLogEvent(LogHeader header, LogBuffer buffer, FormatDescriptionLogEvent descriptionEvent, boolean partial){
super(header);
public RowsLogEvent(LogHeader header, LogBuffer buffer, FormatDescriptionLogEvent descriptionEvent,
boolean partial){
this(header, buffer, descriptionEvent, false, false);
}

public RowsLogEvent(LogHeader header, LogBuffer buffer, FormatDescriptionLogEvent descriptionEvent, boolean partial,
boolean compress){
super(header);
final int commonHeaderLen = descriptionEvent.commonHeaderLen;
final int postHeaderLen = descriptionEvent.postHeaderLen[header.type - 1];
int headerLen = 0;
Expand Down Expand Up @@ -167,14 +172,27 @@ public RowsLogEvent(LogHeader header, LogBuffer buffer, FormatDescriptionLogEven
columns = buffer.getBitmap(columnLen);

if (header.type == UPDATE_ROWS_EVENT_V1 || header.type == UPDATE_ROWS_EVENT
|| header.type == PARTIAL_UPDATE_ROWS_EVENT) {
|| header.type == PARTIAL_UPDATE_ROWS_EVENT || header.type == UPDATE_ROWS_COMPRESSED_EVENT
|| header.type == UPDATE_ROWS_COMPRESSED_EVENT_V1) {
changeColumns = buffer.getBitmap(columnLen);
} else {
changeColumns = columns;
}

// XXX: Don't handle buffer in another thread.
int dataSize = buffer.limit() - buffer.position();
if (compress) {
// mariadb compress log event
// see https://github.com/alibaba/canal/issues/4388
buffer = buffer.uncompressBuf();
dataSize = buffer.limit();
// rewrite type
if (postHeaderLen == FormatDescriptionLogEvent.ROWS_HEADER_LEN_V2) {
header.type = header.type - WRITE_ROWS_COMPRESSED_EVENT + WRITE_ROWS_EVENT;
} else {
header.type = header.type - WRITE_ROWS_COMPRESSED_EVENT_V1 + WRITE_ROWS_EVENT_V1;
}
}
rowsBuf = buffer.duplicate(dataSize);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,19 @@
* @author <a href="mailto:changyuan.lh@taobao.com">Changyuan.lh</a>
* @version 1.0
*/
public final class UpdateRowsLogEvent extends RowsLogEvent {
public class UpdateRowsLogEvent extends RowsLogEvent {

public UpdateRowsLogEvent(LogHeader header, LogBuffer buffer, FormatDescriptionLogEvent descriptionEvent){
super(header, buffer, descriptionEvent, false);
super(header, buffer, descriptionEvent, false , false);
}

public UpdateRowsLogEvent(LogHeader header, LogBuffer buffer, FormatDescriptionLogEvent descriptionEvent,
boolean partial){
super(header, buffer, descriptionEvent, partial);
super(header, buffer, descriptionEvent, partial ,false);
}

public UpdateRowsLogEvent(LogHeader header, LogBuffer buffer, FormatDescriptionLogEvent descriptionEvent,
boolean partial , boolean compress){
super(header, buffer, descriptionEvent, partial , compress);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,14 @@
* @author <a href="mailto:changyuan.lh@taobao.com">Changyuan.lh</a>
* @version 1.0
*/
public final class WriteRowsLogEvent extends RowsLogEvent {
public class WriteRowsLogEvent extends RowsLogEvent {

public WriteRowsLogEvent(LogHeader header, LogBuffer buffer, FormatDescriptionLogEvent descriptionEvent){
super(header, buffer, descriptionEvent);
super(header, buffer, descriptionEvent, false, false);
}

public WriteRowsLogEvent(LogHeader header, LogBuffer buffer, FormatDescriptionLogEvent descriptionEvent,
boolean compress){
super(header, buffer, descriptionEvent, false, compress);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,20 @@
*/
public class BinlogCheckPointLogEvent extends IgnorableLogEvent {

private final String filename;

public BinlogCheckPointLogEvent(LogHeader header, LogBuffer buffer, FormatDescriptionLogEvent descriptionEvent){
super(header, buffer, descriptionEvent);
// do nothing , just mariadb binlog checkpoint
// mariadb binlog checkpoint
final int headerSize = descriptionEvent.getCommonHeaderLen();
final int postHeaderLen = descriptionEvent.getPostHeaderLen()[getHeader().getType() - 1];

buffer.position(headerSize);
long binlogFileLen = buffer.getUint32();
filename = buffer.getFixString((int) binlogFileLen);
}

public String getFilename() {
return filename;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
package com.taobao.tddl.dbsync.binlog.event.mariadb;import com.taobao.tddl.dbsync.binlog.LogBuffer;import com.taobao.tddl.dbsync.binlog.event.DeleteRowsLogEvent;import com.taobao.tddl.dbsync.binlog.event.FormatDescriptionLogEvent;import com.taobao.tddl.dbsync.binlog.event.LogHeader;/** * mariadb compress rows event * * @author jianghang * @since 1.1.7 */public class DeleteRowsCompressLogEvent extends DeleteRowsLogEvent { public DeleteRowsCompressLogEvent(LogHeader header, LogBuffer buffer, FormatDescriptionLogEvent descriptionEvent){ super(header, buffer, descriptionEvent, true); }}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
package com.taobao.tddl.dbsync.binlog.event.mariadb;import java.io.IOException;import com.taobao.tddl.dbsync.binlog.LogBuffer;import com.taobao.tddl.dbsync.binlog.event.FormatDescriptionLogEvent;import com.taobao.tddl.dbsync.binlog.event.LogHeader;import com.taobao.tddl.dbsync.binlog.event.QueryLogEvent;/** * mariadb compress query event * * @author jianghang * @since 1.1.7 */public class QueryCompressedLogEvent extends QueryLogEvent { public QueryCompressedLogEvent(LogHeader header, LogBuffer buffer, FormatDescriptionLogEvent descriptionEvent) throws IOException{ super(header, buffer, descriptionEvent, true); }}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
package com.taobao.tddl.dbsync.binlog.event.mariadb;import com.taobao.tddl.dbsync.binlog.LogBuffer;import com.taobao.tddl.dbsync.binlog.event.FormatDescriptionLogEvent;import com.taobao.tddl.dbsync.binlog.event.LogHeader;import com.taobao.tddl.dbsync.binlog.event.UpdateRowsLogEvent;/** * mariadb compress rows event * * @author jianghang * @since 1.1.7 */public class UpdateRowsCompressLogEvent extends UpdateRowsLogEvent { public UpdateRowsCompressLogEvent(LogHeader header, LogBuffer buffer, FormatDescriptionLogEvent descriptionEvent){ super(header, buffer, descriptionEvent, false, true); }}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
package com.taobao.tddl.dbsync.binlog.event.mariadb;import com.taobao.tddl.dbsync.binlog.LogBuffer;import com.taobao.tddl.dbsync.binlog.event.FormatDescriptionLogEvent;import com.taobao.tddl.dbsync.binlog.event.LogHeader;import com.taobao.tddl.dbsync.binlog.event.WriteRowsLogEvent;/** * mariadb compress rows event * * @author jianghang * @since 1.1.7 */public class WriteRowsCompressLogEvent extends WriteRowsLogEvent { public WriteRowsCompressLogEvent(LogHeader header, LogBuffer buffer, FormatDescriptionLogEvent descriptionEvent){ super(header, buffer, descriptionEvent, true); }}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import java.sql.Statement;
import java.util.List;

import com.taobao.tddl.dbsync.binlog.event.mariadb.BinlogCheckPointLogEvent;
import org.junit.Assert;
import org.junit.Test;

Expand All @@ -19,12 +20,12 @@ public void testSimple() {
DirectLogFetcher fecther = new DirectLogFetcher();
try {
Class.forName("com.mysql.jdbc.Driver");
Connection connection = DriverManager.getConnection("jdbc:mysql://127.0.0.1:3306", "root", "hello");
Connection connection = DriverManager.getConnection("jdbc:mysql://127.0.0.1:3306", "root", "123456");
Statement statement = connection.createStatement();
statement.execute("SET @master_binlog_checksum='@@global.binlog_checksum'");
statement.execute("SET @mariadb_slave_capability='" + LogEvent.MARIA_SLAVE_CAPABILITY_MINE + "'");

fecther.open(connection, "binlog.000002", 4L, 1);
fecther.open(connection, "mysql-bin.000002", 4L, 1);

LogDecoder decoder = new LogDecoder(LogEvent.UNKNOWN_EVENT, LogEvent.ENUM_END_EVENT);
LogContext context = new LogContext();
Expand All @@ -51,6 +52,9 @@ public void processEvent(LogEvent event, LogDecoder decoder, LogContext context)
case LogEvent.ROTATE_EVENT:
binlogFileName = ((RotateLogEvent) event).getFilename();
break;
case LogEvent.BINLOG_CHECKPOINT_EVENT:
binlogFileName = ((BinlogCheckPointLogEvent) event).getFilename();
break;
case LogEvent.WRITE_ROWS_EVENT_V1:
case LogEvent.WRITE_ROWS_EVENT:
parseRowsEvent((WriteRowsLogEvent) event);
Expand Down
Loading

0 comments on commit f13cc4e

Please sign in to comment.