Skip to content

Commit

Permalink
ignore compression event
Browse files Browse the repository at this point in the history
  • Loading branch information
agapple committed Sep 1, 2022
1 parent 92758b8 commit ce916ee
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 12 deletions.
61 changes: 49 additions & 12 deletions dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/LogDecoder.java
Original file line number Diff line number Diff line change
Expand Up @@ -197,8 +197,9 @@ public static LogEvent decode(LogBuffer buffer, LogHeader header, LogContext con
}
case LogEvent.SLAVE_EVENT: /* can never happen (unused event) */
{
if (logger.isWarnEnabled()) logger.warn("Skipping unsupported SLAVE_EVENT from: "
+ context.getLogPosition());
if (logger.isWarnEnabled()) {
logger.warn("Skipping unsupported SLAVE_EVENT from: " + context.getLogPosition());
}
break;
}
case LogEvent.CREATE_FILE_EVENT: {
Expand Down Expand Up @@ -264,22 +265,25 @@ public static LogEvent decode(LogBuffer buffer, LogHeader header, LogContext con
return descriptionEvent;
}
case LogEvent.PRE_GA_WRITE_ROWS_EVENT: {
if (logger.isWarnEnabled()) logger.warn("Skipping unsupported PRE_GA_WRITE_ROWS_EVENT from: "
+ context.getLogPosition());
if (logger.isWarnEnabled()) {
logger.warn("Skipping unsupported PRE_GA_WRITE_ROWS_EVENT from: " + context.getLogPosition());
}
// ev = new Write_rows_log_event_old(buf, event_len,
// description_event);
break;
}
case LogEvent.PRE_GA_UPDATE_ROWS_EVENT: {
if (logger.isWarnEnabled()) logger.warn("Skipping unsupported PRE_GA_UPDATE_ROWS_EVENT from: "
+ context.getLogPosition());
if (logger.isWarnEnabled()) {
logger.warn("Skipping unsupported PRE_GA_UPDATE_ROWS_EVENT from: " + context.getLogPosition());
}
// ev = new Update_rows_log_event_old(buf, event_len,
// description_event);
break;
}
case LogEvent.PRE_GA_DELETE_ROWS_EVENT: {
if (logger.isWarnEnabled()) logger.warn("Skipping unsupported PRE_GA_DELETE_ROWS_EVENT from: "
+ context.getLogPosition());
if (logger.isWarnEnabled()) {
logger.warn("Skipping unsupported PRE_GA_DELETE_ROWS_EVENT from: " + context.getLogPosition());
}
// ev = new Delete_rows_log_event_old(buf, event_len,
// description_event);
break;
Expand Down Expand Up @@ -356,10 +360,16 @@ public static LogEvent decode(LogBuffer buffer, LogHeader header, LogContext con
return event;
}
case LogEvent.TRANSACTION_PAYLOAD_EVENT: {
TransactionPayloadLogEvent event = new TransactionPayloadLogEvent(header, buffer, descriptionEvent);
/* updating position in context */
logPosition.position = header.getLogPos();
return event;
if (logger.isWarnEnabled()) {
logger.warn("Skipping unsupported MySQL TRANSACTION_PAYLOAD_EVENT from: " + context.getLogPosition());
}
break;

// TransactionPayloadLogEvent event = new TransactionPayloadLogEvent(header,
// buffer, descriptionEvent);
// /* updating position in context */
// logPosition.position = header.getLogPos();
// return event;
}
case LogEvent.VIEW_CHANGE_EVENT: {
ViewChangeEvent event = new ViewChangeEvent(header, buffer, descriptionEvent);
Expand Down Expand Up @@ -424,6 +434,33 @@ public static LogEvent decode(LogBuffer buffer, LogHeader header, LogContext con
logPosition.position = header.getLogPos();
return event;
}
case LogEvent.QUERY_COMPRESSED_EVENT: {
if (logger.isWarnEnabled()) {
logger.warn("Skipping unsupported MaraiDB QUERY_COMPRESSED_EVENT from: " + context.getLogPosition());
}
break;
}
case LogEvent.WRITE_ROWS_COMPRESSED_EVENT_V1:
case LogEvent.WRITE_ROWS_COMPRESSED_EVENT: {
if (logger.isWarnEnabled()) {
logger.warn("Skipping unsupported MaraiDB WRITE_ROWS_COMPRESSED_EVENT from: " + context.getLogPosition());
}
break;
}
case LogEvent.UPDATE_ROWS_COMPRESSED_EVENT_V1:
case LogEvent.UPDATE_ROWS_COMPRESSED_EVENT: {
if (logger.isWarnEnabled()) {
logger.warn("Skipping unsupported MaraiDB UPDATE_ROWS_COMPRESSED_EVENT from: " + context.getLogPosition());
}
break;
}
case LogEvent.DELETE_ROWS_COMPRESSED_EVENT_V1:
case LogEvent.DELETE_ROWS_COMPRESSED_EVENT: {
if (logger.isWarnEnabled()) {
logger.warn("Skipping unsupported MaraiDB DELETE_ROWS_COMPRESSED_EVENT from: " + context.getLogPosition());
}
break;
}
default:
/*
* Create an object of Ignorable_log_event for unrecognized
Expand Down
15 changes: 15 additions & 0 deletions dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/LogEvent.java
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,21 @@ public abstract class LogEvent {

public static final int START_ENCRYPTION_EVENT = 164;

// mariadb 10.10.1
/*
* Compressed binlog event. Note that the order between WRITE/UPDATE/DELETE
* events is significant; this is so that we can convert from the compressed to
* the uncompressed event type with (type-WRITE_ROWS_COMPRESSED_EVENT +
* WRITE_ROWS_EVENT) and similar for _V1.
*/
public static final int QUERY_COMPRESSED_EVENT = 165;
public static final int WRITE_ROWS_COMPRESSED_EVENT_V1 = 166;
public static final int UPDATE_ROWS_COMPRESSED_EVENT_V1 = 167;
public static final int DELETE_ROWS_COMPRESSED_EVENT_V1 = 168;
public static final int WRITE_ROWS_COMPRESSED_EVENT = 169;
public static final int UPDATE_ROWS_COMPRESSED_EVENT = 170;
public static final int DELETE_ROWS_COMPRESSED_EVENT = 171;

/** end marker */
public static final int ENUM_END_EVENT = 165;

Expand Down

0 comments on commit ce916ee

Please sign in to comment.