Skip to content

Commit

Permalink
[Feature][Connector-V2]Jdbc chunk split add “snapshot.split.column” p…
Browse files Browse the repository at this point in the history
…arams apache#7794
  • Loading branch information
XenosK committed Oct 15, 2024
1 parent 69bf03c commit 1165447
Show file tree
Hide file tree
Showing 14 changed files with 70 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ public abstract class BaseSourceConfig implements SourceConfig {
@Getter protected final StopConfig stopConfig;

@Getter protected final int splitSize;
@Getter protected final String splitColumn;

@Getter protected final double distributionFactorUpper;
@Getter protected final double distributionFactorLower;
Expand All @@ -50,6 +51,7 @@ public BaseSourceConfig(
StartupConfig startupConfig,
StopConfig stopConfig,
int splitSize,
String splitColumn,
double distributionFactorUpper,
double distributionFactorLower,
int sampleShardingThreshold,
Expand All @@ -59,6 +61,7 @@ public BaseSourceConfig(
this.startupConfig = startupConfig;
this.stopConfig = stopConfig;
this.splitSize = splitSize;
this.splitColumn = splitColumn;
this.distributionFactorUpper = distributionFactorUpper;
this.distributionFactorLower = distributionFactorLower;
this.sampleShardingThreshold = sampleShardingThreshold;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ public JdbcSourceConfig(
List<String> databaseList,
List<String> tableList,
int splitSize,
String splitColumn,
double distributionFactorUpper,
double distributionFactorLower,
int sampleShardingThreshold,
Expand All @@ -70,6 +71,7 @@ public JdbcSourceConfig(
startupConfig,
stopConfig,
splitSize,
splitColumn,
distributionFactorUpper,
distributionFactorLower,
sampleShardingThreshold,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ public abstract class JdbcSourceConfigFactory implements SourceConfig.Factory<Jd
JdbcSourceOptions.SAMPLE_SHARDING_THRESHOLD.defaultValue();
protected int inverseSamplingRate = JdbcSourceOptions.INVERSE_SAMPLING_RATE.defaultValue();
protected int splitSize = SourceOptions.SNAPSHOT_SPLIT_SIZE.defaultValue();
// Optional user-chosen split column; SNAPSHOT_SPLIT_COLUMN declares no default value.
protected String splitColumn = SourceOptions.SNAPSHOT_SPLIT_COLUMN.defaultValue();
protected int fetchSize = SourceOptions.SNAPSHOT_FETCH_SIZE.defaultValue();
protected String serverTimeZone = JdbcSourceOptions.SERVER_TIME_ZONE.defaultValue();
protected long connectTimeoutMillis = JdbcSourceOptions.CONNECT_TIMEOUT_MS.defaultValue();
Expand All @@ -65,6 +66,11 @@ public JdbcSourceConfigFactory hostname(String hostname) {
return this;
}

/** Name of the column used to split a table snapshot ({@code snapshot.split.column}). */
public JdbcSourceConfigFactory splitColumn(String splitColumn) {
this.splitColumn = splitColumn;
return this;
}

/** Integer port number of the database server. */
public JdbcSourceConfigFactory port(int port) {
this.port = port;
Expand Down Expand Up @@ -239,6 +245,7 @@ public JdbcSourceConfigFactory fromReadonlyConfig(ReadonlyConfig config) {
this.sampleShardingThreshold = config.get(JdbcSourceOptions.SAMPLE_SHARDING_THRESHOLD);
this.inverseSamplingRate = config.get(JdbcSourceOptions.INVERSE_SAMPLING_RATE);
this.splitSize = config.get(SourceOptions.SNAPSHOT_SPLIT_SIZE);
this.splitColumn = config.get(SourceOptions.SNAPSHOT_SPLIT_COLUMN);
this.fetchSize = config.get(SourceOptions.SNAPSHOT_FETCH_SIZE);
this.serverTimeZone = config.get(JdbcSourceOptions.SERVER_TIME_ZONE);
this.connectTimeoutMillis = config.get(JdbcSourceOptions.CONNECT_TIMEOUT_MS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,26 @@ default List<ConstraintKey> getUniqueKeys(JdbcConnection jdbcConnection, TableId
.collect(Collectors.toList());
}

/**
 * Checks whether {@code columnName} participates in at least one unique index of the table.
 *
 * <p>The lookup uses {@code DatabaseMetaData.getIndexInfo} and compares column names
 * case-insensitively. A column that is only one member of a composite unique index also
 * matches, so a {@code true} result does not prove that the column is unique on its own.
 *
 * @param jdbcConnection connection used to read the database metadata
 * @param tableId table whose indexes are inspected
 * @param columnName column to look for; {@code null} yields {@code false}
 * @return {@code true} if a unique index containing the column was found, otherwise
 *     {@code false}
 * @throws SQLException if the index metadata cannot be read
 */
default Boolean isUniqueKey(JdbcConnection jdbcConnection, TableId tableId, String columnName)
        throws SQLException {
    if (columnName == null) {
        return false;
    }

    DatabaseMetaData metaData = jdbcConnection.connection().getMetaData();
    // try-with-resources releases the metadata cursor on early return and on failure;
    // previously the ResultSet was never closed.
    try (ResultSet resultSet =
            metaData.getIndexInfo(
                    tableId.catalog(), tableId.schema(), tableId.table(), false, false)) {
        while (resultSet.next()) {
            if (columnName.equalsIgnoreCase(resultSet.getString("COLUMN_NAME"))
                    && !resultSet.getBoolean("NON_UNIQUE")) {
                return true;
            }
        }
    }
    return false;
}

default List<ConstraintKey> getConstraintKeys(JdbcConnection jdbcConnection, TableId tableId)
throws SQLException {
DatabaseMetaData metaData = jdbcConnection.connection().getMetaData();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,13 @@ public class SourceOptions {
.withDescription(
"The split size (number of rows) of table snapshot, captured tables are split into multiple splits when read the snapshot of table.");

/**
 * Optional name of the column used to split a table into chunks while reading its snapshot.
 * No default value is declared for this option.
 */
public static final Option<String> SNAPSHOT_SPLIT_COLUMN =
Options.key("snapshot.split.column")
.stringType()
.noDefaultValue()
.withDescription(
"The split column of table snapshot, captured tables are split into multiple splits when read the snapshot of table.");

public static final Option<Integer> SNAPSHOT_FETCH_SIZE =
Options.key("snapshot.fetch.size")
.intType()
Expand Down Expand Up @@ -111,7 +118,7 @@ public class SourceOptions {
/**
 * Creates the base option rule builder, registering the format, snapshot, incremental
 * parallelism and Debezium property options as optional.
 *
 * @return a builder that callers can extend with further rules
 */
public static OptionRule.Builder getBaseRule() {
return OptionRule.builder()
.optional(FORMAT)
.optional(SNAPSHOT_SPLIT_SIZE, SNAPSHOT_FETCH_SIZE)
// NOTE(review): the line above and the line below look like the removed/added sides of
// one diff hunk; presumably only the line below exists in the real file - confirm.
.optional(SNAPSHOT_SPLIT_SIZE, SNAPSHOT_FETCH_SIZE, SNAPSHOT_SPLIT_COLUMN)
.optional(INCREMENTAL_PARALLELISM)
.optional(DEBEZIUM_PROPERTIES);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -379,12 +379,28 @@ protected SnapshotSplit createSnapshotSplit(
protected Column getSplitColumn(
JdbcConnection jdbc, JdbcDataSourceDialect dialect, TableId tableId)
throws SQLException {
Optional<PrimaryKey> primaryKey = dialect.getPrimaryKey(jdbc, tableId);
Column splitColumn = null;
Table table = dialect.queryTableSchema(jdbc, tableId).getTable();

// First, check whether the user-defined split column is part of a primary key or unique key.
String sc = null;
try {
sc = sourceConfig.getSplitColumn();
} catch (Exception e) {
log.error("Config splitColumn get exception in {}:{}", tableId, e);
}
Boolean isUniqueKey = dialect.isUniqueKey(jdbc, tableId, sc);
if (isUniqueKey) {
Column column = table.columnWithName(sc);
return column;
} else {
log.warn("Config splitColumn not exists or nor unique key for table {}", tableId);
}

Optional<PrimaryKey> primaryKey = dialect.getPrimaryKey(jdbc, tableId);
if (primaryKey.isPresent()) {
List<String> pkColumns = primaryKey.get().getColumnNames();

Table table = dialect.queryTableSchema(jdbc, tableId).getTable();
for (String pkColumn : pkColumns) {
Column column = table.columnWithName(pkColumn);
if (isEvenlySplitColumn(column)) {
Expand All @@ -400,7 +416,6 @@ protected Column getSplitColumn(

List<ConstraintKey> uniqueKeys = dialect.getUniqueKeys(jdbc, tableId);
if (!uniqueKeys.isEmpty()) {
Table table = dialect.queryTableSchema(jdbc, tableId).getTable();
for (ConstraintKey uniqueKey : uniqueKeys) {
List<ConstraintKey.ConstraintKeyColumn> uniqueKeyColumns =
uniqueKey.getColumnNames();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public MySqlSourceConfig(
List<String> databaseList,
List<String> tableList,
int splitSize,
String splitColumn,
double distributionFactorUpper,
double distributionFactorLower,
int sampleShardingThreshold,
Expand All @@ -64,6 +65,7 @@ public MySqlSourceConfig(
databaseList,
tableList,
splitSize,
splitColumn,
distributionFactorUpper,
distributionFactorLower,
sampleShardingThreshold,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ public MySqlSourceConfig create(int subtaskId) {
databaseList,
tableList,
splitSize,
splitColumn,
distributionFactorUpper,
distributionFactorLower,
sampleShardingThreshold,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ public OracleSourceConfig(
List<String> databaseList,
List<String> tableList,
int splitSize,
String splitColumn,
double distributionFactorUpper,
double distributionFactorLower,
int sampleShardingThreshold,
Expand All @@ -72,6 +73,7 @@ public OracleSourceConfig(
databaseList,
tableList,
splitSize,
splitColumn,
distributionFactorUpper,
distributionFactorLower,
sampleShardingThreshold,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ public OracleSourceConfig create(int subtask) {
databaseList,
tableList,
splitSize,
splitColumn,
distributionFactorUpper,
distributionFactorLower,
sampleShardingThreshold,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ public PostgresSourceConfig(
List<String> databaseList,
List<String> tableList,
int splitSize,
String splitColumn,
double distributionFactorUpper,
double distributionFactorLower,
int sampleShardingThreshold,
Expand All @@ -59,6 +60,7 @@ public PostgresSourceConfig(
databaseList,
tableList,
splitSize,
splitColumn,
distributionFactorUpper,
distributionFactorLower,
sampleShardingThreshold,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ public PostgresSourceConfig create(int subtask) {
databaseList,
tableList,
splitSize,
splitColumn,
distributionFactorUpper,
distributionFactorLower,
sampleShardingThreshold,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public SqlServerSourceConfig(
List<String> databaseList,
List<String> tableList,
int splitSize,
String splitColumn,
double distributionFactorUpper,
double distributionFactorLower,
int sampleShardingThreshold,
Expand All @@ -64,6 +65,7 @@ public SqlServerSourceConfig(
databaseList,
tableList,
splitSize,
splitColumn,
distributionFactorUpper,
distributionFactorLower,
sampleShardingThreshold,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ public SqlServerSourceConfig create(int subtask) {
databaseList,
tableList,
splitSize,
splitColumn,
distributionFactorUpper,
distributionFactorLower,
sampleShardingThreshold,
Expand Down

0 comments on commit 1165447

Please sign in to comment.