From d9916ba19a6a9dac90293b44fd6c5fb97553716d Mon Sep 17 00:00:00 2001 From: hailin0 Date: Wed, 17 May 2023 21:42:04 +0800 Subject: [PATCH] [Hotfix][CDC] Fix chunk start/end parameter type error Incorrect wrapping as Array types, but only Array type required --- .../cdc/base/source/split/CompletedSnapshotSplitInfo.java | 8 ++++---- .../connectors/cdc/base/source/split/SnapshotSplit.java | 8 ++++---- .../cdc/mysql/source/eumerator/MySqlChunkSplitter.java | 4 +++- .../reader/fetch/scan/MySqlSnapshotSplitReadTask.java | 4 ++-- .../source/source/eumerator/SqlServerChunkSplitter.java | 4 +++- .../reader/fetch/scan/SqlServerSnapshotSplitReadTask.java | 4 ++-- 6 files changed, 18 insertions(+), 14 deletions(-) diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/split/CompletedSnapshotSplitInfo.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/split/CompletedSnapshotSplitInfo.java index 60e511ec2b5..39b78d03c56 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/split/CompletedSnapshotSplitInfo.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/split/CompletedSnapshotSplitInfo.java @@ -30,16 +30,16 @@ public class CompletedSnapshotSplitInfo implements Serializable { private final String splitId; private final TableId tableId; private final SeaTunnelRowType splitKeyType; - private final Object splitStart; - private final Object splitEnd; + private final Object[] splitStart; + private final Object[] splitEnd; private final Offset watermark; public CompletedSnapshotSplitInfo( String splitId, TableId tableId, SeaTunnelRowType splitKeyType, - Object splitStart, - Object splitEnd, + Object[] splitStart, + Object[] splitEnd, Offset watermark) { this.splitId = splitId; this.tableId = tableId; diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/split/SnapshotSplit.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/split/SnapshotSplit.java index 733f8832f8b..776ad68918f 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/split/SnapshotSplit.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/split/SnapshotSplit.java @@ -28,8 +28,8 @@ public class SnapshotSplit extends SourceSplitBase { private static final long serialVersionUID = 1L; private final TableId tableId; private final SeaTunnelRowType splitKeyType; - private final Object splitStart; - private final Object splitEnd; + private final Object[] splitStart; + private final Object[] splitEnd; private final Offset highWatermark; @@ -37,8 +37,8 @@ public SnapshotSplit( String splitId, TableId tableId, SeaTunnelRowType splitKeyType, - Object splitStart, - Object splitEnd, + Object[] splitStart, + Object[] splitEnd, Offset highWatermark) { super(splitId); this.tableId = tableId; diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/eumerator/MySqlChunkSplitter.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/eumerator/MySqlChunkSplitter.java index a0f2f9c2622..c248edd69d7 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/eumerator/MySqlChunkSplitter.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/eumerator/MySqlChunkSplitter.java @@ -295,8 +295,10 @@ private SnapshotSplit createSnapshotSplit( Object chunkStart, Object chunkEnd) { // currently, we only support single split column + Object[] splitStart = chunkStart == null ? null : new Object[] {chunkStart}; + Object[] splitEnd = chunkEnd == null ? null : new Object[] {chunkEnd}; return new SnapshotSplit( - splitId(tableId, chunkId), tableId, splitKeyType, chunkStart, chunkEnd, null); + splitId(tableId, chunkId), tableId, splitKeyType, splitStart, splitEnd, null); } // ------------------------------------------------------------------------------------------ diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/scan/MySqlSnapshotSplitReadTask.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/scan/MySqlSnapshotSplitReadTask.java index 2aafa3789e4..82104aeb11f 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/scan/MySqlSnapshotSplitReadTask.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/scan/MySqlSnapshotSplitReadTask.java @@ -205,8 +205,8 @@ private void createDataEventsForTable( selectSql, snapshotSplit.getSplitStart() == null, snapshotSplit.getSplitEnd() == null, - new Object[] {snapshotSplit.getSplitStart()}, - new Object[] {snapshotSplit.getSplitEnd()}, + snapshotSplit.getSplitStart(), + snapshotSplit.getSplitEnd(), snapshotSplit.getSplitKeyType().getTotalFields(), connectorConfig.getSnapshotFetchSize()); ResultSet rs = selectStatement.executeQuery()) { diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/eumerator/SqlServerChunkSplitter.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/eumerator/SqlServerChunkSplitter.java index dc751ca2e03..d25269ee5db 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/eumerator/SqlServerChunkSplitter.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/eumerator/SqlServerChunkSplitter.java @@ -292,8 +292,10 @@ private SnapshotSplit createSnapshotSplit( Object chunkStart, Object chunkEnd) { // currently, we only support single split column + Object[] splitStart = chunkStart == null ? null : new Object[] {chunkStart}; + Object[] splitEnd = chunkEnd == null ? null : new Object[] {chunkEnd}; return new SnapshotSplit( - splitId(tableId, chunkId), tableId, splitKeyType, chunkStart, chunkEnd, null); + splitId(tableId, chunkId), tableId, splitKeyType, splitStart, splitEnd, null); } // ------------------------------------------------------------------------------------------ diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/reader/fetch/scan/SqlServerSnapshotSplitReadTask.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/reader/fetch/scan/SqlServerSnapshotSplitReadTask.java index 8995ef4f5a4..d25934ff6a2 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/reader/fetch/scan/SqlServerSnapshotSplitReadTask.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/reader/fetch/scan/SqlServerSnapshotSplitReadTask.java @@ -195,8 +195,8 @@ private void createDataEventsForTable( selectSql, snapshotSplit.getSplitStart() == null, snapshotSplit.getSplitEnd() == null, - new Object[] {snapshotSplit.getSplitStart()}, - new Object[] {snapshotSplit.getSplitEnd()}, + snapshotSplit.getSplitStart(), + snapshotSplit.getSplitEnd(), snapshotSplit.getSplitKeyType().getTotalFields(), connectorConfig.getSnapshotFetchSize()); ResultSet rs = selectStatement.executeQuery()) {