Skip to content

Commit

Permalink
[Improve][JDBC Source] Fix Split cannot be cancelled (apache#6825)
Browse files Browse the repository at this point in the history
  • Loading branch information
EricJoy2048 authored and chaorongzhi committed Aug 21, 2024
1 parent f2864b1 commit 9091545
Show file tree
Hide file tree
Showing 19 changed files with 43 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ public Collection<SnapshotSplit> generateSplits(TableId tableId) {
}

private List<ChunkRange> splitTableIntoChunks(
JdbcConnection jdbc, TableId tableId, Column splitColumn) throws SQLException {
JdbcConnection jdbc, TableId tableId, Column splitColumn) throws Exception {
final String splitColumnName = splitColumn.name();
final Object[] minMax = queryMinMax(jdbc, tableId, splitColumn);
final Object min = minMax[0];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ default Object queryMin(
@Deprecated
Object[] sampleDataFromColumn(
JdbcConnection jdbc, TableId tableId, String columnName, int samplingRate)
throws SQLException;
throws Exception;

/**
* Performs a sampling operation on the specified column of a table in a JDBC-connected
Expand All @@ -97,7 +97,7 @@ Object[] sampleDataFromColumn(
*/
default Object[] sampleDataFromColumn(
JdbcConnection jdbc, TableId tableId, Column column, int samplingRate)
throws SQLException {
throws Exception {
return sampleDataFromColumn(jdbc, tableId, column.name(), samplingRate);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public Object queryMin(
@Override
public Object[] sampleDataFromColumn(
JdbcConnection jdbc, TableId tableId, String columnName, int samplingRate)
throws SQLException {
throws Exception {
return new Object[0];
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ public Object queryMin(
@Override
public Object[] sampleDataFromColumn(
JdbcConnection jdbc, TableId tableId, String columnName, int samplingRate)
throws SQLException {
throws Exception {
return new Object[0];
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public Object queryMin(
@Override
public Object[] sampleDataFromColumn(
JdbcConnection jdbc, TableId tableId, String columnName, int inverseSamplingRate)
throws SQLException {
throws Exception {
return MySqlUtils.skipReadAndSortSampleData(jdbc, tableId, columnName, inverseSamplingRate);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ public static Object[] sampleDataFromColumn(

public static Object[] skipReadAndSortSampleData(
JdbcConnection jdbc, TableId tableId, String columnName, int inverseSamplingRate)
throws SQLException {
throws Exception {
final String sampleQuery =
String.format("SELECT %s FROM %s", quote(columnName), quote(tableId));

Expand All @@ -172,6 +172,9 @@ public static Object[] skipReadAndSortSampleData(
if (count % inverseSamplingRate == 0) {
results.add(rs.getObject(1));
}
if (Thread.currentThread().isInterrupted()) {
throw new InterruptedException("Thread interrupted");
}
}
} finally {
if (rs != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public Object queryMin(
@Override
public Object[] sampleDataFromColumn(
JdbcConnection jdbc, TableId tableId, String columnName, int inverseSamplingRate)
throws SQLException {
throws Exception {
return OracleUtils.skipReadAndSortSampleData(
jdbc, tableId, columnName, inverseSamplingRate);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ public static Object[] sampleDataFromColumn(

public static Object[] skipReadAndSortSampleData(
JdbcConnection jdbc, TableId tableId, String columnName, int inverseSamplingRate)
throws SQLException {
throws Exception {
final String sampleQuery =
String.format("SELECT %s FROM %s", quote(columnName), quoteSchemaAndTable(tableId));

Expand All @@ -176,6 +176,9 @@ public static Object[] skipReadAndSortSampleData(
if (count % inverseSamplingRate == 0) {
results.add(rs.getObject(1));
}
if (Thread.currentThread().isInterrupted()) {
throw new InterruptedException("Thread interrupted");
}
}
} finally {
if (rs != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,15 +70,15 @@ public Object queryMin(
@Override
public Object[] sampleDataFromColumn(
JdbcConnection jdbc, TableId tableId, String columnName, int inverseSamplingRate)
throws SQLException {
throws Exception {
return PostgresUtils.skipReadAndSortSampleData(
jdbc, tableId, columnName, null, inverseSamplingRate);
}

@Override
public Object[] sampleDataFromColumn(
JdbcConnection jdbc, TableId tableId, Column column, int inverseSamplingRate)
throws SQLException {
throws Exception {
return PostgresUtils.skipReadAndSortSampleData(
jdbc, tableId, column.name(), column, inverseSamplingRate);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ public static Object[] skipReadAndSortSampleData(
String columnName,
Column column,
int inverseSamplingRate)
throws SQLException {
throws Exception {
columnName = quote(columnName);
if (column != null) {
columnName = JDBC_DIALECT.convertType(columnName, column.typeName());
Expand All @@ -187,6 +187,9 @@ public static Object[] skipReadAndSortSampleData(
if (count % 100000 == 0) {
log.info("Processing row index: {}", count);
}
if (Thread.currentThread().isInterrupted()) {
throw new InterruptedException("Thread interrupted");
}
if (count % inverseSamplingRate == 0) {
results.add(rs.getObject(1));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public Object queryMin(
@Override
public Object[] sampleDataFromColumn(
JdbcConnection jdbc, TableId tableId, String columnName, int inverseSamplingRate)
throws SQLException {
throws Exception {
return SqlServerUtils.skipReadAndSortSampleData(
jdbc, tableId, columnName, inverseSamplingRate);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ public static Object[] sampleDataFromColumn(

public static Object[] skipReadAndSortSampleData(
JdbcConnection jdbc, TableId tableId, String columnName, int inverseSamplingRate)
throws SQLException {
throws Exception {
final String sampleQuery =
String.format("SELECT %s FROM %s", quote(columnName), quote(tableId));

Expand All @@ -177,6 +177,9 @@ public static Object[] skipReadAndSortSampleData(
if (count % inverseSamplingRate == 0) {
results.add(rs.getObject(1));
}
if (Thread.currentThread().isInterrupted()) {
throw new InterruptedException("Thread interrupted");
}
}
} finally {
if (rs != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ default Object[] sampleDataFromColumn(
String columnName,
int samplingRate,
int fetchSize)
throws SQLException {
throws Exception {
String sampleQuery;
if (StringUtils.isNotBlank(table.getQuery())) {
sampleQuery =
Expand All @@ -337,6 +337,9 @@ default Object[] sampleDataFromColumn(
if (count % samplingRate == 0) {
results.add(rs.getObject(1));
}
if (Thread.currentThread().isInterrupted()) {
throw new InterruptedException("Thread interrupted");
}
}
Object[] resultsArray = results.toArray();
Arrays.sort(resultsArray);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ public Object[] sampleDataFromColumn(
String columnName,
int samplingRate,
int fetchSize)
throws SQLException {
throws Exception {
String sampleQuery;
if (StringUtils.isNotBlank(table.getQuery())) {
sampleQuery =
Expand All @@ -158,6 +158,9 @@ public Object[] sampleDataFromColumn(
if (count % samplingRate == 0) {
results.add(rs.getObject(1));
}
if (Thread.currentThread().isInterrupted()) {
throw new InterruptedException("Thread interrupted");
}
}
Object[] resultsArray = results.toArray();
Arrays.sort(resultsArray);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public synchronized void close() {
}
}

public Collection<JdbcSourceSplit> generateSplits(JdbcSourceTable table) throws SQLException {
public Collection<JdbcSourceSplit> generateSplits(JdbcSourceTable table) throws Exception {
log.info("Start splitting table {} into chunks...", table.getTablePath());
long start = System.currentTimeMillis();

Expand All @@ -111,7 +111,7 @@ public Collection<JdbcSourceSplit> generateSplits(JdbcSourceTable table) throws
}

protected abstract Collection<JdbcSourceSplit> createSplits(
JdbcSourceTable table, SeaTunnelRowType splitKeyType) throws SQLException;
JdbcSourceTable table, SeaTunnelRowType splitKeyType) throws SQLException, Exception;

public PreparedStatement generateSplitStatement(JdbcSourceSplit split, TableSchema schema)
throws SQLException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public DynamicChunkSplitter(JdbcSourceConfig config) {

@Override
protected Collection<JdbcSourceSplit> createSplits(
JdbcSourceTable table, SeaTunnelRowType splitKey) throws SQLException {
JdbcSourceTable table, SeaTunnelRowType splitKey) throws Exception {
return createDynamicSplits(table, splitKey);
}

Expand All @@ -73,7 +73,7 @@ protected PreparedStatement createSplitStatement(JdbcSourceSplit split, TableSch
}

private Collection<JdbcSourceSplit> createDynamicSplits(
JdbcSourceTable table, SeaTunnelRowType splitKey) throws SQLException {
JdbcSourceTable table, SeaTunnelRowType splitKey) throws Exception {
String splitKeyName = splitKey.getFieldNames()[0];
SeaTunnelDataType splitKeyType = splitKey.getFieldType(0);
List<ChunkRange> chunks = splitTableIntoChunks(table, splitKeyName, splitKeyType);
Expand Down Expand Up @@ -105,7 +105,7 @@ private PreparedStatement createDynamicSplitStatement(JdbcSourceSplit split, Tab

private List<ChunkRange> splitTableIntoChunks(
JdbcSourceTable table, String splitColumnName, SeaTunnelDataType splitColumnType)
throws SQLException {
throws Exception {
Pair<Object, Object> minMax = queryMinMax(table, splitColumnName);
Object min = minMax.getLeft();
Object max = minMax.getRight();
Expand Down Expand Up @@ -136,7 +136,7 @@ private List<ChunkRange> splitTableIntoChunks(

private List<ChunkRange> evenlyColumnSplitChunks(
JdbcSourceTable table, String splitColumnName, Object min, Object max, int chunkSize)
throws SQLException {
throws Exception {
TablePath tablePath = table.getTablePath();
double distributionFactorUpper = config.getSplitEvenDistributionFactorUpperBound();
double distributionFactorLower = config.getSplitEvenDistributionFactorLowerBound();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@

import java.math.BigDecimal;
import java.sql.Date;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.time.LocalDate;
import java.time.LocalDateTime;
Expand Down Expand Up @@ -104,7 +103,7 @@ public class JdbcOracleIT extends AbstractJdbcIT {
};

@Test
public void testSampleDataFromColumnSuccess() throws SQLException {
public void testSampleDataFromColumnSuccess() throws Exception {
JdbcDialect dialect = new OracleDialect();
JdbcSourceTable table =
JdbcSourceTable.builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ public class JdbcIrisIT extends AbstractJdbcIT {
};

@Test
public void testSampleDataFromColumnSuccess() throws SQLException {
public void testSampleDataFromColumnSuccess() throws Exception {
JdbcDialect dialect = new IrisDialect();
JdbcSourceTable table =
JdbcSourceTable.builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -468,7 +468,7 @@ public void testSplit() throws Exception {

private JdbcSourceSplit[] getCheckedSplitArray(
Map<String, Object> configMap, CatalogTable table, String splitKey, int splitNum)
throws SQLException {
throws Exception {
configMap.put("partition_column", splitKey);
DynamicChunkSplitter splitter = getDynamicChunkSplitter(configMap);

Expand Down

0 comments on commit 9091545

Please sign in to comment.