Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Improve][JDBC Source] Fix Split can not be cancel #6825

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ public Collection<SnapshotSplit> generateSplits(TableId tableId) {
}

private List<ChunkRange> splitTableIntoChunks(
JdbcConnection jdbc, TableId tableId, Column splitColumn) throws SQLException {
JdbcConnection jdbc, TableId tableId, Column splitColumn) throws Exception {
final String splitColumnName = splitColumn.name();
final Object[] minMax = queryMinMax(jdbc, tableId, splitColumn);
final Object min = minMax[0];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ default Object queryMin(
@Deprecated
Object[] sampleDataFromColumn(
JdbcConnection jdbc, TableId tableId, String columnName, int samplingRate)
throws SQLException;
throws Exception;

/**
* Performs a sampling operation on the specified column of a table in a JDBC-connected
Expand All @@ -97,7 +97,7 @@ Object[] sampleDataFromColumn(
*/
default Object[] sampleDataFromColumn(
JdbcConnection jdbc, TableId tableId, Column column, int samplingRate)
throws SQLException {
throws Exception {
return sampleDataFromColumn(jdbc, tableId, column.name(), samplingRate);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public Object queryMin(
@Override
public Object[] sampleDataFromColumn(
JdbcConnection jdbc, TableId tableId, String columnName, int samplingRate)
throws SQLException {
throws Exception {
return new Object[0];
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ public Object queryMin(
@Override
public Object[] sampleDataFromColumn(
JdbcConnection jdbc, TableId tableId, String columnName, int samplingRate)
throws SQLException {
throws Exception {
return new Object[0];
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public Object queryMin(
@Override
public Object[] sampleDataFromColumn(
JdbcConnection jdbc, TableId tableId, String columnName, int inverseSamplingRate)
throws SQLException {
throws Exception {
return MySqlUtils.skipReadAndSortSampleData(jdbc, tableId, columnName, inverseSamplingRate);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ public static Object[] sampleDataFromColumn(

public static Object[] skipReadAndSortSampleData(
JdbcConnection jdbc, TableId tableId, String columnName, int inverseSamplingRate)
throws SQLException {
throws Exception {
final String sampleQuery =
String.format("SELECT %s FROM %s", quote(columnName), quote(tableId));

Expand All @@ -172,6 +172,9 @@ public static Object[] skipReadAndSortSampleData(
if (count % inverseSamplingRate == 0) {
results.add(rs.getObject(1));
}
if (Thread.currentThread().isInterrupted()) {
throw new InterruptedException("Thread interrupted");
}
}
} finally {
if (rs != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public Object queryMin(
@Override
public Object[] sampleDataFromColumn(
JdbcConnection jdbc, TableId tableId, String columnName, int inverseSamplingRate)
throws SQLException {
throws Exception {
return OracleUtils.skipReadAndSortSampleData(
jdbc, tableId, columnName, inverseSamplingRate);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ public static Object[] sampleDataFromColumn(

public static Object[] skipReadAndSortSampleData(
JdbcConnection jdbc, TableId tableId, String columnName, int inverseSamplingRate)
throws SQLException {
throws Exception {
final String sampleQuery =
String.format("SELECT %s FROM %s", quote(columnName), quoteSchemaAndTable(tableId));

Expand All @@ -176,6 +176,9 @@ public static Object[] skipReadAndSortSampleData(
if (count % inverseSamplingRate == 0) {
results.add(rs.getObject(1));
}
if (Thread.currentThread().isInterrupted()) {
throw new InterruptedException("Thread interrupted");
}
}
} finally {
if (rs != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,15 +70,15 @@ public Object queryMin(
@Override
public Object[] sampleDataFromColumn(
JdbcConnection jdbc, TableId tableId, String columnName, int inverseSamplingRate)
throws SQLException {
throws Exception {
return PostgresUtils.skipReadAndSortSampleData(
jdbc, tableId, columnName, null, inverseSamplingRate);
}

@Override
public Object[] sampleDataFromColumn(
JdbcConnection jdbc, TableId tableId, Column column, int inverseSamplingRate)
throws SQLException {
throws Exception {
return PostgresUtils.skipReadAndSortSampleData(
jdbc, tableId, column.name(), column, inverseSamplingRate);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ public static Object[] skipReadAndSortSampleData(
String columnName,
Column column,
int inverseSamplingRate)
throws SQLException {
throws Exception {
columnName = quote(columnName);
if (column != null) {
columnName = JDBC_DIALECT.convertType(columnName, column.typeName());
Expand All @@ -187,6 +187,9 @@ public static Object[] skipReadAndSortSampleData(
if (count % 100000 == 0) {
log.info("Processing row index: {}", count);
}
if (Thread.currentThread().isInterrupted()) {
throw new InterruptedException("Thread interrupted");
}
if (count % inverseSamplingRate == 0) {
results.add(rs.getObject(1));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public Object queryMin(
@Override
public Object[] sampleDataFromColumn(
JdbcConnection jdbc, TableId tableId, String columnName, int inverseSamplingRate)
throws SQLException {
throws Exception {
return SqlServerUtils.skipReadAndSortSampleData(
jdbc, tableId, columnName, inverseSamplingRate);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ public static Object[] sampleDataFromColumn(

public static Object[] skipReadAndSortSampleData(
JdbcConnection jdbc, TableId tableId, String columnName, int inverseSamplingRate)
throws SQLException {
throws Exception {
final String sampleQuery =
String.format("SELECT %s FROM %s", quote(columnName), quote(tableId));

Expand All @@ -177,6 +177,9 @@ public static Object[] skipReadAndSortSampleData(
if (count % inverseSamplingRate == 0) {
results.add(rs.getObject(1));
}
if (Thread.currentThread().isInterrupted()) {
throw new InterruptedException("Thread interrupted");
}
}
} finally {
if (rs != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ default Object[] sampleDataFromColumn(
String columnName,
int samplingRate,
int fetchSize)
throws SQLException {
throws Exception {
String sampleQuery;
if (StringUtils.isNotBlank(table.getQuery())) {
sampleQuery =
Expand All @@ -337,6 +337,9 @@ default Object[] sampleDataFromColumn(
if (count % samplingRate == 0) {
results.add(rs.getObject(1));
}
if (Thread.currentThread().isInterrupted()) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done, added

throw new InterruptedException("Thread interrupted");
}
}
Object[] resultsArray = results.toArray();
Arrays.sort(resultsArray);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ public Object[] sampleDataFromColumn(
String columnName,
int samplingRate,
int fetchSize)
throws SQLException {
throws Exception {
String sampleQuery;
if (StringUtils.isNotBlank(table.getQuery())) {
sampleQuery =
Expand All @@ -158,6 +158,9 @@ public Object[] sampleDataFromColumn(
if (count % samplingRate == 0) {
results.add(rs.getObject(1));
}
if (Thread.currentThread().isInterrupted()) {
throw new InterruptedException("Thread interrupted");
}
}
Object[] resultsArray = results.toArray();
Arrays.sort(resultsArray);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public synchronized void close() {
}
}

public Collection<JdbcSourceSplit> generateSplits(JdbcSourceTable table) throws SQLException {
public Collection<JdbcSourceSplit> generateSplits(JdbcSourceTable table) throws Exception {
log.info("Start splitting table {} into chunks...", table.getTablePath());
long start = System.currentTimeMillis();

Expand All @@ -111,7 +111,7 @@ public Collection<JdbcSourceSplit> generateSplits(JdbcSourceTable table) throws
}

protected abstract Collection<JdbcSourceSplit> createSplits(
JdbcSourceTable table, SeaTunnelRowType splitKeyType) throws SQLException;
JdbcSourceTable table, SeaTunnelRowType splitKeyType) throws SQLException, Exception;

public PreparedStatement generateSplitStatement(JdbcSourceSplit split, TableSchema schema)
throws SQLException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public DynamicChunkSplitter(JdbcSourceConfig config) {

@Override
protected Collection<JdbcSourceSplit> createSplits(
JdbcSourceTable table, SeaTunnelRowType splitKey) throws SQLException {
JdbcSourceTable table, SeaTunnelRowType splitKey) throws Exception {
return createDynamicSplits(table, splitKey);
}

Expand All @@ -73,7 +73,7 @@ protected PreparedStatement createSplitStatement(JdbcSourceSplit split, TableSch
}

private Collection<JdbcSourceSplit> createDynamicSplits(
JdbcSourceTable table, SeaTunnelRowType splitKey) throws SQLException {
JdbcSourceTable table, SeaTunnelRowType splitKey) throws Exception {
String splitKeyName = splitKey.getFieldNames()[0];
SeaTunnelDataType splitKeyType = splitKey.getFieldType(0);
List<ChunkRange> chunks = splitTableIntoChunks(table, splitKeyName, splitKeyType);
Expand Down Expand Up @@ -105,7 +105,7 @@ private PreparedStatement createDynamicSplitStatement(JdbcSourceSplit split, Tab

private List<ChunkRange> splitTableIntoChunks(
JdbcSourceTable table, String splitColumnName, SeaTunnelDataType splitColumnType)
throws SQLException {
throws Exception {
Pair<Object, Object> minMax = queryMinMax(table, splitColumnName);
Object min = minMax.getLeft();
Object max = minMax.getRight();
Expand Down Expand Up @@ -136,7 +136,7 @@ private List<ChunkRange> splitTableIntoChunks(

private List<ChunkRange> evenlyColumnSplitChunks(
JdbcSourceTable table, String splitColumnName, Object min, Object max, int chunkSize)
throws SQLException {
throws Exception {
TablePath tablePath = table.getTablePath();
double distributionFactorUpper = config.getSplitEvenDistributionFactorUpperBound();
double distributionFactorLower = config.getSplitEvenDistributionFactorLowerBound();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@

import java.math.BigDecimal;
import java.sql.Date;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.time.LocalDate;
import java.time.LocalDateTime;
Expand Down Expand Up @@ -104,7 +103,7 @@ public class JdbcOracleIT extends AbstractJdbcIT {
};

@Test
public void testSampleDataFromColumnSuccess() throws SQLException {
public void testSampleDataFromColumnSuccess() throws Exception {
JdbcDialect dialect = new OracleDialect();
JdbcSourceTable table =
JdbcSourceTable.builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ public class JdbcIrisIT extends AbstractJdbcIT {
};

@Test
public void testSampleDataFromColumnSuccess() throws SQLException {
public void testSampleDataFromColumnSuccess() throws Exception {
JdbcDialect dialect = new IrisDialect();
JdbcSourceTable table =
JdbcSourceTable.builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -468,7 +468,7 @@ public void testSplit() throws Exception {

private JdbcSourceSplit[] getCheckedSplitArray(
Map<String, Object> configMap, CatalogTable table, String splitKey, int splitNum)
throws SQLException {
throws Exception {
configMap.put("partition_column", splitKey);
DynamicChunkSplitter splitter = getDynamicChunkSplitter(configMap);

Expand Down
Loading