Skip to content

Commit

Permalink
Fix Split can not be cancel
Browse files Browse the repository at this point in the history
  • Loading branch information
EricJoy2048 committed May 9, 2024
1 parent d826cf9 commit 959232d
Show file tree
Hide file tree
Showing 6 changed files with 16 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ default Object[] sampleDataFromColumn(
String columnName,
int samplingRate,
int fetchSize)
throws SQLException {
throws Exception {
String sampleQuery;
if (StringUtils.isNotBlank(table.getQuery())) {
sampleQuery =
Expand All @@ -333,6 +333,9 @@ default Object[] sampleDataFromColumn(
if (count % samplingRate == 0) {
results.add(rs.getObject(1));
}
if (Thread.currentThread().isInterrupted()) {
throw new InterruptedException("Thread interrupted");
}
}
Object[] resultsArray = results.toArray();
Arrays.sort(resultsArray);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ public Object[] sampleDataFromColumn(
String columnName,
int samplingRate,
int fetchSize)
throws SQLException {
throws Exception {
String sampleQuery;
if (StringUtils.isNotBlank(table.getQuery())) {
sampleQuery =
Expand All @@ -158,6 +158,9 @@ public Object[] sampleDataFromColumn(
if (count % samplingRate == 0) {
results.add(rs.getObject(1));
}
if (Thread.currentThread().isInterrupted()) {
throw new InterruptedException("Thread interrupted");
}
}
Object[] resultsArray = results.toArray();
Arrays.sort(resultsArray);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public synchronized void close() {
}
}

public Collection<JdbcSourceSplit> generateSplits(JdbcSourceTable table) throws SQLException {
public Collection<JdbcSourceSplit> generateSplits(JdbcSourceTable table) throws Exception {
log.info("Start splitting table {} into chunks...", table.getTablePath());
long start = System.currentTimeMillis();

Expand All @@ -111,7 +111,7 @@ public Collection<JdbcSourceSplit> generateSplits(JdbcSourceTable table) throws
}

protected abstract Collection<JdbcSourceSplit> createSplits(
JdbcSourceTable table, SeaTunnelRowType splitKeyType) throws SQLException;
JdbcSourceTable table, SeaTunnelRowType splitKeyType) throws SQLException, Exception;

public PreparedStatement generateSplitStatement(JdbcSourceSplit split) throws SQLException {
if (split.getSplitKeyName() == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public DynamicChunkSplitter(JdbcSourceConfig config) {

@Override
protected Collection<JdbcSourceSplit> createSplits(
JdbcSourceTable table, SeaTunnelRowType splitKey) throws SQLException {
JdbcSourceTable table, SeaTunnelRowType splitKey) throws Exception {
return createDynamicSplits(table, splitKey);
}

Expand All @@ -70,7 +70,7 @@ protected PreparedStatement createSplitStatement(JdbcSourceSplit split) throws S
}

private Collection<JdbcSourceSplit> createDynamicSplits(
JdbcSourceTable table, SeaTunnelRowType splitKey) throws SQLException {
JdbcSourceTable table, SeaTunnelRowType splitKey) throws Exception {
String splitKeyName = splitKey.getFieldNames()[0];
SeaTunnelDataType splitKeyType = splitKey.getFieldType(0);
List<ChunkRange> chunks = splitTableIntoChunks(table, splitKeyName, splitKeyType);
Expand Down Expand Up @@ -102,7 +102,7 @@ private PreparedStatement createDynamicSplitStatement(JdbcSourceSplit split)

private List<ChunkRange> splitTableIntoChunks(
JdbcSourceTable table, String splitColumnName, SeaTunnelDataType splitColumnType)
throws SQLException {
throws Exception {
Pair<Object, Object> minMax = queryMinMax(table, splitColumnName);
Object min = minMax.getLeft();
Object max = minMax.getRight();
Expand Down Expand Up @@ -133,7 +133,7 @@ private List<ChunkRange> splitTableIntoChunks(

private List<ChunkRange> evenlyColumnSplitChunks(
JdbcSourceTable table, String splitColumnName, Object min, Object max, int chunkSize)
throws SQLException {
throws Exception {
TablePath tablePath = table.getTablePath();
double distributionFactorUpper = config.getSplitEvenDistributionFactorUpperBound();
double distributionFactorLower = config.getSplitEvenDistributionFactorLowerBound();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@

import java.math.BigDecimal;
import java.sql.Date;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.time.LocalDate;
import java.time.LocalDateTime;
Expand Down Expand Up @@ -104,7 +103,7 @@ public class JdbcOracleIT extends AbstractJdbcIT {
};

@Test
public void testSampleDataFromColumnSuccess() throws SQLException {
public void testSampleDataFromColumnSuccess() throws Exception {
JdbcDialect dialect = new OracleDialect();
JdbcSourceTable table =
JdbcSourceTable.builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -468,7 +468,7 @@ public void testSplit() throws Exception {

private JdbcSourceSplit[] getCheckedSplitArray(
Map<String, Object> configMap, CatalogTable table, String splitKey, int splitNum)
throws SQLException {
throws Exception {
configMap.put("partition_column", splitKey);
DynamicChunkSplitter splitter = getDynamicChunkSplitter(configMap);

Expand Down

0 comments on commit 959232d

Please sign in to comment.