Skip to content

Commit

Permalink
fixed issue #3494 , polardb-x 2.0 cdc support implicit_id/varaibles
Browse files Browse the repository at this point in the history
  • Loading branch information
agapple committed Apr 28, 2021
1 parent 5cc85ff commit 474063a
Show file tree
Hide file tree
Showing 5 changed files with 36 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import com.alibaba.otter.canal.parse.driver.mysql.MysqlUpdateExecutor;
import com.alibaba.otter.canal.parse.driver.mysql.packets.GTIDSet;
import com.alibaba.otter.canal.parse.driver.mysql.packets.HeaderPacket;
import com.alibaba.otter.canal.parse.driver.mysql.packets.MysqlGTIDSet;
import com.alibaba.otter.canal.parse.driver.mysql.packets.client.BinlogDumpCommandPacket;
import com.alibaba.otter.canal.parse.driver.mysql.packets.client.BinlogDumpGTIDCommandPacket;
import com.alibaba.otter.canal.parse.driver.mysql.packets.client.RegisterSlaveCommandPacket;
Expand Down Expand Up @@ -444,7 +443,8 @@ private void updateSettings() throws IOException {
// mysql5.6需要设置slave_uuid避免被server kill链接
update("set @slave_uuid=uuid()");
} catch (Exception e) {
if (!StringUtils.contains(e.getMessage(), "Unknown system variable")) {
if (!StringUtils.contains(e.getMessage(), "Unknown system variable")
&& !StringUtils.contains(e.getMessage(), "slave_uuid can't be set")) {
logger.warn("update slave_uuid failed", e);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ public class MysqlEventParser extends AbstractMysqlEventParser implements CanalE
private int dumpErrorCount = 0; // binlogDump失败异常计数
private int dumpErrorCountThreshold = 2; // binlogDump失败异常计数阀值
private boolean rdsOssMode = false;
private boolean autoResetLatestPosMode = false; // true: binlog被删除之后,自动按最新的数据订阅
private boolean autoResetLatestPosMode = false; // true:
// binlog被删除之后,自动按最新的数据订阅

protected ErosaConnection buildErosaConnection() {
return buildMysqlConnection(this.runningInfo);
Expand Down Expand Up @@ -347,7 +348,7 @@ protected EntryPosition findStartPosition(ErosaConnection connection) throws IOE
if (StringUtils.isNotEmpty(logPosition.getPostion().getGtid())) {
return logPosition.getPostion();
}
}else {
} else {
if (masterPosition != null && StringUtils.isNotEmpty(masterPosition.getGtid())) {
return masterPosition;
}
Expand Down Expand Up @@ -401,7 +402,7 @@ protected EntryPosition findPositionWithMasterIdAndTimestamp(MysqlConnection con
fixedPosition.getJournalName(),
true);
if (entryPosition == null) {
throw new CanalParseException("[fixed timestamp] can't found begin/commit position before with fixed position"
throw new CanalParseException("[fixed timestamp] can't found begin/commit position before with fixed position "
+ fixedPosition.getJournalName() + ":" + fixedPosition.getPosition());
}
return entryPosition;
Expand Down Expand Up @@ -486,7 +487,8 @@ protected EntryPosition findStartPositionInternal(ErosaConnection connection) {
return findPosition;
}
// 处理 binlog 位点被删除的情况,提供自动重置到当前位点的功能
// 应用场景: 测试环境不稳定,位点经常被删。强烈不建议在正式环境中开启此控制参数,因为binlog 丢失调到最新位点也即意味着数据丢失
// 应用场景: 测试环境不稳定,位点经常被删。强烈不建议在正式环境中开启此控制参数,因为binlog
// 丢失调到最新位点也即意味着数据丢失
if (isAutoResetLatestPosMode()) {
dumpErrorCount = 0;
return findEndPosition(mysqlConnection);
Expand All @@ -497,9 +499,9 @@ protected EntryPosition findStartPositionInternal(ErosaConnection connection) {
return null;
}
} else if (StringUtils.isBlank(logPosition.getPostion().getJournalName())
&& logPosition.getPostion().getPosition() <= 0
&& logPosition.getPostion().getTimestamp() > 0) {
return fallbackFindByStartTimestamp(logPosition,mysqlConnection);
&& logPosition.getPostion().getPosition() <= 0
&& logPosition.getPostion().getTimestamp() > 0) {
return fallbackFindByStartTimestamp(logPosition, mysqlConnection);
}
// 其余情况
logger.warn("prepare to find start position just last position\n {}",
Expand All @@ -522,7 +524,7 @@ protected EntryPosition findStartPositionInternal(ErosaConnection connection) {
* @param mysqlConnection
* @return
*/
protected EntryPosition fallbackFindByStartTimestamp(LogPosition logPosition,MysqlConnection mysqlConnection){
protected EntryPosition fallbackFindByStartTimestamp(LogPosition logPosition, MysqlConnection mysqlConnection) {
long timestamp = logPosition.getPostion().getTimestamp();
long newStartTimestamp = timestamp - fallbackIntervalInSeconds * 1000;
logger.warn("prepare to find start position by last position {}:{}:{}", new Object[] { "", "",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -627,7 +627,7 @@ private boolean parseOneRow(RowData.Builder rowDataBuilder, RowsLogEvent event,
}

if (tableMeta != null && columnInfo.length > tableMeta.getFields().size()) {
if (tableMetaCache.isOnRDS()) {
if (tableMetaCache.isOnRDS() || tableMetaCache.isOnPolarX()) {
// 特殊处理下RDS的场景
List<FieldMeta> primaryKeys = tableMeta.getPrimaryFields();
if (primaryKeys == null || primaryKeys.isEmpty()) {
Expand Down Expand Up @@ -680,6 +680,9 @@ private boolean parseOneRow(RowData.Builder rowDataBuilder, RowsLogEvent event,
if (existRDSNoPrimaryKey && i == columnCnt - 1 && info.type == LogEvent.MYSQL_TYPE_LONGLONG) {
// 不解析最后一列
String rdsRowIdColumnName = "__#alibaba_rds_row_id#__";
if (tableMetaCache.isOnPolarX()) {
rdsRowIdColumnName = "_drds_implicit_id_";
}
buffer.nextValue(rdsRowIdColumnName, i, info.type, info.meta, false);
Column.Builder columnBuilder = Column.newBuilder();
columnBuilder.setName(rdsRowIdColumnName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public class TableMetaCache {
public static final String EXTRA = "EXTRA";
private MysqlConnection connection;
private boolean isOnRDS = false;
private boolean isOnPolarX = false;
private boolean isOnTSDB = false;

private TableMetaTSDB tableMetaTSDB;
Expand Down Expand Up @@ -79,6 +80,14 @@ public TableMeta load(String name) throws Exception {
}
} catch (IOException e) {
}

try {
ResultSetPacket packet = connection.query("show global variables like 'polarx\\_%'");
if (packet.getFieldValues().size() > 0) {
isOnPolarX = true;
}
} catch (IOException e) {
}
}

private synchronized TableMeta getTableMetaByDB(String fullname) throws IOException {
Expand Down Expand Up @@ -254,7 +263,6 @@ private String getFullName(String schema, String table) {
.toString();
}


public boolean isOnTSDB() {
return isOnTSDB;
}
Expand All @@ -271,4 +279,12 @@ public void setOnRDS(boolean isOnRDS) {
this.isOnRDS = isOnRDS;
}

public boolean isOnPolarX() {
return isOnPolarX;
}

public void setOnPolarX(boolean isOnPolarX) {
this.isOnPolarX = isOnPolarX;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@

import org.apache.commons.lang.StringUtils;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -46,7 +45,6 @@
import com.taobao.tddl.dbsync.binlog.event.XidLogEvent;
import com.taobao.tddl.dbsync.binlog.event.mariadb.AnnotateRowsEvent;

@Ignore
public class DirectLogFetcherTest {

protected final Logger logger = LoggerFactory.getLogger(this.getClass());
Expand All @@ -58,7 +56,7 @@ public class DirectLogFetcherTest {
public void testSimple() {
DirectLogFetcher fetcher = new DirectLogFetcher();
try {
MysqlConnector connector = new MysqlConnector(new InetSocketAddress("127.0.0.1", 3306), "root", "hello");
MysqlConnector connector = new MysqlConnector(new InetSocketAddress("127.0.0.1", 3306), "canal", "canal");
connector.connect();
updateSettings(connector);
loadBinlogChecksum(connector);
Expand Down Expand Up @@ -210,7 +208,8 @@ private void updateSettings(MysqlConnector connector) throws IOException {
// mysql5.6需要设置slave_uuid避免被server kill链接
update("set @slave_uuid=uuid()", connector);
} catch (Exception e) {
if (!StringUtils.contains(e.getMessage(), "Unknown system variable")) {
if (!StringUtils.contains(e.getMessage(), "Unknown system variable")
&& !StringUtils.contains(e.getMessage(), "slave_uuid can't be set")) {
logger.warn("update slave_uuid failed", e);
}
}
Expand Down

0 comments on commit 474063a

Please sign in to comment.