From cd95635ea482a2f0f785990f35f2f8cc47c96365 Mon Sep 17 00:00:00 2001 From: Hisoka Date: Mon, 6 Mar 2023 18:42:04 +0800 Subject: [PATCH 1/5] [Doc] [Connector-V2] Improve MySQL-CDC Doc For connect.timeout.ms field --- docs/en/connector-v2/source/MySQL-CDC.md | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/docs/en/connector-v2/source/MySQL-CDC.md b/docs/en/connector-v2/source/MySQL-CDC.md index ee2a2d4101c..c764b92637b 100644 --- a/docs/en/connector-v2/source/MySQL-CDC.md +++ b/docs/en/connector-v2/source/MySQL-CDC.md @@ -5,7 +5,7 @@ ## Description The MySQL CDC connector allows for reading snapshot data and incremental data from MySQL database. This document -describes how to setup the MySQL CDC connector to run SQL queries against MySQL databases. +describes how to set up the MySQL CDC connector to run SQL queries against MySQL databases. ## Key features @@ -18,7 +18,7 @@ describes how to setup the MySQL CDC connector to run SQL queries against MySQL ## Options -| name | type | required | default value | +| name | type | required | default value | |------------------------------------------------|----------|----------|---------------| | username | String | Yes | - | | password | String | Yes | - | @@ -38,7 +38,7 @@ describes how to setup the MySQL CDC connector to run SQL queries against MySQL | snapshot.fetch.size | Integer | No | 1024 | | server-id | String | No | - | | server-time-zone | String | No | UTC | -| connect.timeout | Duration | No | 30s | +| connect.timeout.ms | Duration | No | 30000 | | connect.max-retries | Integer | No | 3 | | connection.pool.size | Integer | No | 20 | | chunk-key.even-distribution.factor.upper-bound | Double | No | 1000 | @@ -137,7 +137,7 @@ By default, a random number is generated between 5400 and 6400, though we recomm The session time zone in database server. -### connect.timeout [Duration] +### connect.timeout.ms [long] The maximum time that the connector should wait after trying to connect to the database server before timing out. @@ -191,7 +191,6 @@ source { ## Changelog -### next version - - Add MySQL CDC Source Connector +### next version From 0e662c75d6b7b37349529995ea29d56235c2d7bb Mon Sep 17 00:00:00 2001 From: Hisoka Date: Tue, 7 Mar 2023 11:29:02 +0800 Subject: [PATCH 2/5] [Core] [Improve] Unify the checkpoint setting key of Flink --- .../java/org/apache/seatunnel/common/Constants.java | 4 ---- .../flink/execution/FlinkRuntimeEnvironment.java | 12 ++++++++++-- .../core/starter/flink/utils/ConfigKeyName.java | 3 ++- .../seatunnel/engine/server/NodeExtension.java | 2 +- .../seatunnel/engine/server/NodeExtensionCommon.java | 6 +++--- .../spark/source/SeaTunnelSourceSupport.java | 4 +++- .../source/partition/micro/SeaTunnelMicroBatch.java | 3 ++- 7 files changed, 21 insertions(+), 13 deletions(-) diff --git a/seatunnel-common/src/main/java/org/apache/seatunnel/common/Constants.java b/seatunnel-common/src/main/java/org/apache/seatunnel/common/Constants.java index 030cc4dbd35..52d2f76731a 100644 --- a/seatunnel-common/src/main/java/org/apache/seatunnel/common/Constants.java +++ b/seatunnel-common/src/main/java/org/apache/seatunnel/common/Constants.java @@ -18,8 +18,6 @@ package org.apache.seatunnel.common; public final class Constants { - public static final String ROW_ROOT = "__root__"; - public static final String ROW_TMP = "__tmp__"; public static final String LOGO = "SeaTunnel"; @@ -37,8 +35,6 @@ public final class Constants { public static final String HDFS_USER = "hdfs.user"; - public static final String CHECKPOINT_INTERVAL = "checkpoint.interval"; - public static final String CHECKPOINT_ID = "checkpoint.id"; public static final String UUID = "uuid"; diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java index e180ebb74cc..7fb75064a4c 100644 --- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java +++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java @@ -19,6 +19,7 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config; +import org.apache.seatunnel.api.env.EnvCommonOptions; import org.apache.seatunnel.common.Constants; import org.apache.seatunnel.common.config.CheckResult; import org.apache.seatunnel.common.constants.JobMode; @@ -235,9 +236,16 @@ private void setTimeCharacteristic() { } private void setCheckpoint() { - if (config.hasPath(ConfigKeyName.CHECKPOINT_INTERVAL)) { + + long interval = 0; + if (config.hasPath(EnvCommonOptions.CHECKPOINT_INTERVAL.key())) { + interval = config.getLong(EnvCommonOptions.CHECKPOINT_INTERVAL.key()); + } else if (config.hasPath(ConfigKeyName.CHECKPOINT_INTERVAL)) { + interval = config.getLong(ConfigKeyName.CHECKPOINT_INTERVAL); + } + + if (interval > 0) { CheckpointConfig checkpointConfig = environment.getCheckpointConfig(); - long interval = config.getLong(ConfigKeyName.CHECKPOINT_INTERVAL); environment.enableCheckpointing(interval); if (config.hasPath(ConfigKeyName.CHECKPOINT_MODE)) { diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/utils/ConfigKeyName.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/utils/ConfigKeyName.java index 78d444a57dc..699d7e3798a 100644 --- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/utils/ConfigKeyName.java +++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/utils/ConfigKeyName.java @@ -27,7 +27,8 @@ private ConfigKeyName() { public static final String BUFFER_TIMEOUT_MILLIS = "execution.buffer.timeout"; public static final String PARALLELISM = "execution.parallelism"; public static final String MAX_PARALLELISM = "execution.max-parallelism"; - public static final String CHECKPOINT_INTERVAL = "execution.checkpoint.interval"; + + @Deprecated public static final String CHECKPOINT_INTERVAL = "execution.checkpoint.interval"; public static final String CHECKPOINT_MODE = "execution.checkpoint.mode"; public static final String CHECKPOINT_TIMEOUT = "execution.checkpoint.timeout"; public static final String CHECKPOINT_DATA_URI = "execution.checkpoint.data-uri"; diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/NodeExtension.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/NodeExtension.java index 2c62314e2b1..d4137955c8b 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/NodeExtension.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/NodeExtension.java @@ -85,6 +85,6 @@ public TextCommandService createTextCommandService() { @Override public void printNodeInfo() { - extCommon.printNodeInfo(systemLogger, ""); + extCommon.printNodeInfo(systemLogger); } } diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/NodeExtensionCommon.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/NodeExtensionCommon.java index 8cf0a34706a..8551f9381af 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/NodeExtensionCommon.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/NodeExtensionCommon.java @@ -58,15 +58,15 @@ void onClusterStateChange(ClusterState ignored) { // TODO This is where cluster state changes are handled } - void printNodeInfo(ILogger log, String addToProductName) { - log.info(imdgVersionMessage()); + void printNodeInfo(ILogger log) { + log.info(imgVersionMessage()); log.info(clusterNameMessage()); log.fine(serializationVersionMessage()); log.info('\n' + Constants.ST_LOGO); log.info(Constants.COPYRIGHT_LINE); } - private String imdgVersionMessage() { + private String imgVersionMessage() { String build = node.getBuildInfo().getBuild(); String revision = node.getBuildInfo().getRevision(); if (!revision.isEmpty()) { diff --git a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/source/SeaTunnelSourceSupport.java b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/source/SeaTunnelSourceSupport.java index 4e2c3e111b0..604ba203cae 100644 --- a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/source/SeaTunnelSourceSupport.java +++ b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/source/SeaTunnelSourceSupport.java @@ -18,6 +18,7 @@ package org.apache.seatunnel.translation.spark.source; import org.apache.seatunnel.api.common.CommonOptions; +import org.apache.seatunnel.api.env.EnvCommonOptions; import org.apache.seatunnel.api.source.SeaTunnelSource; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.common.Constants; @@ -74,7 +75,8 @@ public MicroBatchReader createMicroBatchReader( SeaTunnelSource seaTunnelSource = getSeaTunnelSource(options); Integer parallelism = options.getInt(CommonOptions.PARALLELISM.key(), 1); Integer checkpointInterval = - options.getInt(Constants.CHECKPOINT_INTERVAL, CHECKPOINT_INTERVAL_DEFAULT); + options.getInt( + EnvCommonOptions.CHECKPOINT_INTERVAL.key(), CHECKPOINT_INTERVAL_DEFAULT); String checkpointPath = StringUtils.replacePattern(checkpointLocation, "sources/\\d+", "sources-state"); Configuration configuration = diff --git a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/source/partition/micro/SeaTunnelMicroBatch.java b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/source/partition/micro/SeaTunnelMicroBatch.java index 4d9a9b17f19..c4d03d2bd76 100644 --- a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/source/partition/micro/SeaTunnelMicroBatch.java +++ b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/source/partition/micro/SeaTunnelMicroBatch.java @@ -17,6 +17,7 @@ package org.apache.seatunnel.translation.spark.source.partition.micro; +import org.apache.seatunnel.api.env.EnvCommonOptions; import org.apache.seatunnel.api.source.SeaTunnelSource; import org.apache.seatunnel.api.source.SupportCoordinate; import org.apache.seatunnel.api.table.type.SeaTunnelRow; @@ -74,7 +75,7 @@ public Offset latestOffset() { public InputPartition[] planInputPartitions(Offset start, Offset end) { int checkpointInterval = caseInsensitiveStringMap.getInt( - Constants.CHECKPOINT_INTERVAL, CHECKPOINT_INTERVAL_DEFAULT); + EnvCommonOptions.CHECKPOINT_INTERVAL.key(), CHECKPOINT_INTERVAL_DEFAULT); Configuration configuration = SparkSession.getActiveSession().get().sparkContext().hadoopConfiguration(); String hdfsRoot = From 4de69c90ea32e23a10271473d1ae25242d084920 Mon Sep 17 00:00:00 2001 From: Hisoka Date: Tue, 7 Mar 2023 11:31:34 +0800 Subject: [PATCH 3/5] Revert "[Doc] [Connector-V2] Improve MySQL-CDC Doc For connect.timeout.ms field" This reverts commit cd95635ea482a2f0f785990f35f2f8cc47c96365. --- docs/en/connector-v2/source/MySQL-CDC.md | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/docs/en/connector-v2/source/MySQL-CDC.md b/docs/en/connector-v2/source/MySQL-CDC.md index c764b92637b..ee2a2d4101c 100644 --- a/docs/en/connector-v2/source/MySQL-CDC.md +++ b/docs/en/connector-v2/source/MySQL-CDC.md @@ -5,7 +5,7 @@ ## Description The MySQL CDC connector allows for reading snapshot data and incremental data from MySQL database. This document -describes how to set up the MySQL CDC connector to run SQL queries against MySQL databases. +describes how to setup the MySQL CDC connector to run SQL queries against MySQL databases. ## Key features @@ -18,7 +18,7 @@ describes how to set up the MySQL CDC connector to run SQL queries against MySQL ## Options -| name | type | required | default value | +| name | type | required | default value | |------------------------------------------------|----------|----------|---------------| | username | String | Yes | - | | password | String | Yes | - | @@ -38,7 +38,7 @@ describes how to set up the MySQL CDC connector to run SQL queries against MySQL | snapshot.fetch.size | Integer | No | 1024 | | server-id | String | No | - | | server-time-zone | String | No | UTC | -| connect.timeout.ms | Duration | No | 30000 | +| connect.timeout | Duration | No | 30s | | connect.max-retries | Integer | No | 3 | | connection.pool.size | Integer | No | 20 | | chunk-key.even-distribution.factor.upper-bound | Double | No | 1000 | @@ -137,7 +137,7 @@ By default, a random number is generated between 5400 and 6400, though we recomm The session time zone in database server. -### connect.timeout.ms [long] +### connect.timeout [Duration] The maximum time that the connector should wait after trying to connect to the database server before timing out. @@ -191,6 +191,7 @@ source { ## Changelog +### next version + - Add MySQL CDC Source Connector -### next version From d7535af21aa0ece1fd00f22b9543e0e5ea9f0bb5 Mon Sep 17 00:00:00 2001 From: Hisoka Date: Tue, 7 Mar 2023 11:29:02 +0800 Subject: [PATCH 4/5] [Core] [Improve] Unify the checkpoint setting key of Flink --- .../java/org/apache/seatunnel/common/Constants.java | 4 ---- .../flink/execution/FlinkRuntimeEnvironment.java | 12 ++++++++++-- .../core/starter/flink/utils/ConfigKeyName.java | 3 ++- .../seatunnel/engine/server/NodeExtension.java | 2 +- .../seatunnel/engine/server/NodeExtensionCommon.java | 6 +++--- .../spark/source/SeaTunnelSourceSupport.java | 4 +++- .../source/partition/micro/SeaTunnelMicroBatch.java | 3 ++- 7 files changed, 21 insertions(+), 13 deletions(-) diff --git a/seatunnel-common/src/main/java/org/apache/seatunnel/common/Constants.java b/seatunnel-common/src/main/java/org/apache/seatunnel/common/Constants.java index 030cc4dbd35..52d2f76731a 100644 --- a/seatunnel-common/src/main/java/org/apache/seatunnel/common/Constants.java +++ b/seatunnel-common/src/main/java/org/apache/seatunnel/common/Constants.java @@ -18,8 +18,6 @@ package org.apache.seatunnel.common; public final class Constants { - public static final String ROW_ROOT = "__root__"; - public static final String ROW_TMP = "__tmp__"; public static final String LOGO = "SeaTunnel"; @@ -37,8 +35,6 @@ public final class Constants { public static final String HDFS_USER = "hdfs.user"; - public static final String CHECKPOINT_INTERVAL = "checkpoint.interval"; - public static final String CHECKPOINT_ID = "checkpoint.id"; public static final String UUID = "uuid"; diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java index e180ebb74cc..7fb75064a4c 100644 --- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java +++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java @@ -19,6 +19,7 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config; +import org.apache.seatunnel.api.env.EnvCommonOptions; import org.apache.seatunnel.common.Constants; import org.apache.seatunnel.common.config.CheckResult; import org.apache.seatunnel.common.constants.JobMode; @@ -235,9 +236,16 @@ private void setTimeCharacteristic() { } private void setCheckpoint() { - if (config.hasPath(ConfigKeyName.CHECKPOINT_INTERVAL)) { + + long interval = 0; + if (config.hasPath(EnvCommonOptions.CHECKPOINT_INTERVAL.key())) { + interval = config.getLong(EnvCommonOptions.CHECKPOINT_INTERVAL.key()); + } else if (config.hasPath(ConfigKeyName.CHECKPOINT_INTERVAL)) { + interval = config.getLong(ConfigKeyName.CHECKPOINT_INTERVAL); + } + + if (interval > 0) { CheckpointConfig checkpointConfig = environment.getCheckpointConfig(); - long interval = config.getLong(ConfigKeyName.CHECKPOINT_INTERVAL); environment.enableCheckpointing(interval); if (config.hasPath(ConfigKeyName.CHECKPOINT_MODE)) { diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/utils/ConfigKeyName.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/utils/ConfigKeyName.java index 78d444a57dc..699d7e3798a 100644 --- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/utils/ConfigKeyName.java +++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/utils/ConfigKeyName.java @@ -27,7 +27,8 @@ private ConfigKeyName() { public static final String BUFFER_TIMEOUT_MILLIS = "execution.buffer.timeout"; public static final String PARALLELISM = "execution.parallelism"; public static final String MAX_PARALLELISM = "execution.max-parallelism"; - public static final String CHECKPOINT_INTERVAL = "execution.checkpoint.interval"; + + @Deprecated public static final String CHECKPOINT_INTERVAL = "execution.checkpoint.interval"; public static final String CHECKPOINT_MODE = "execution.checkpoint.mode"; public static final String CHECKPOINT_TIMEOUT = "execution.checkpoint.timeout"; public static final String CHECKPOINT_DATA_URI = "execution.checkpoint.data-uri"; diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/NodeExtension.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/NodeExtension.java index 2c62314e2b1..d4137955c8b 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/NodeExtension.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/NodeExtension.java @@ -85,6 +85,6 @@ public TextCommandService createTextCommandService() { @Override public void printNodeInfo() { - extCommon.printNodeInfo(systemLogger, ""); + extCommon.printNodeInfo(systemLogger); } } diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/NodeExtensionCommon.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/NodeExtensionCommon.java index 8cf0a34706a..8551f9381af 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/NodeExtensionCommon.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/NodeExtensionCommon.java @@ -58,15 +58,15 @@ void onClusterStateChange(ClusterState ignored) { // TODO This is where cluster state changes are handled } - void printNodeInfo(ILogger log, String addToProductName) { - log.info(imdgVersionMessage()); + void printNodeInfo(ILogger log) { + log.info(imgVersionMessage()); log.info(clusterNameMessage()); log.fine(serializationVersionMessage()); log.info('\n' + Constants.ST_LOGO); log.info(Constants.COPYRIGHT_LINE); } - private String imdgVersionMessage() { + private String imgVersionMessage() { String build = node.getBuildInfo().getBuild(); String revision = node.getBuildInfo().getRevision(); if (!revision.isEmpty()) { diff --git a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/source/SeaTunnelSourceSupport.java b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/source/SeaTunnelSourceSupport.java index 4e2c3e111b0..604ba203cae 100644 --- a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/source/SeaTunnelSourceSupport.java +++ b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/source/SeaTunnelSourceSupport.java @@ -18,6 +18,7 @@ package org.apache.seatunnel.translation.spark.source; import org.apache.seatunnel.api.common.CommonOptions; +import org.apache.seatunnel.api.env.EnvCommonOptions; import org.apache.seatunnel.api.source.SeaTunnelSource; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.common.Constants; @@ -74,7 +75,8 @@ public MicroBatchReader createMicroBatchReader( SeaTunnelSource seaTunnelSource = getSeaTunnelSource(options); Integer parallelism = options.getInt(CommonOptions.PARALLELISM.key(), 1); Integer checkpointInterval = - options.getInt(Constants.CHECKPOINT_INTERVAL, CHECKPOINT_INTERVAL_DEFAULT); + options.getInt( + EnvCommonOptions.CHECKPOINT_INTERVAL.key(), CHECKPOINT_INTERVAL_DEFAULT); String checkpointPath = StringUtils.replacePattern(checkpointLocation, "sources/\\d+", "sources-state"); Configuration configuration = diff --git a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/source/partition/micro/SeaTunnelMicroBatch.java b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/source/partition/micro/SeaTunnelMicroBatch.java index 4d9a9b17f19..c4d03d2bd76 100644 --- a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/source/partition/micro/SeaTunnelMicroBatch.java +++ b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/source/partition/micro/SeaTunnelMicroBatch.java @@ -17,6 +17,7 @@ package org.apache.seatunnel.translation.spark.source.partition.micro; +import org.apache.seatunnel.api.env.EnvCommonOptions; import org.apache.seatunnel.api.source.SeaTunnelSource; import org.apache.seatunnel.api.source.SupportCoordinate; import org.apache.seatunnel.api.table.type.SeaTunnelRow; @@ -74,7 +75,7 @@ public Offset latestOffset() { public InputPartition[] planInputPartitions(Offset start, Offset end) { int checkpointInterval = caseInsensitiveStringMap.getInt( - Constants.CHECKPOINT_INTERVAL, CHECKPOINT_INTERVAL_DEFAULT); + EnvCommonOptions.CHECKPOINT_INTERVAL.key(), CHECKPOINT_INTERVAL_DEFAULT); Configuration configuration = SparkSession.getActiveSession().get().sparkContext().hadoopConfiguration(); String hdfsRoot = From 33a3bfd3e55c8a0cf6c1dbd3cf51bc9b5fa4e30d Mon Sep 17 00:00:00 2001 From: Hisoka Date: Tue, 7 Mar 2023 11:42:11 +0800 Subject: [PATCH 5/5] [Core] [Improve] Unify the checkpoint setting key of Flink --- .../flink/execution/FlinkRuntimeEnvironment.java | 12 ++++++++++-- .../core/starter/flink/utils/ConfigKeyName.java | 2 +- 2 files changed, 11 insertions(+), 3 deletions(-) diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java index 951312b7403..4b5bef07cb0 100644 --- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java +++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java @@ -19,6 +19,7 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config; +import org.apache.seatunnel.api.env.EnvCommonOptions; import org.apache.seatunnel.common.Constants; import org.apache.seatunnel.common.config.CheckResult; import org.apache.seatunnel.common.constants.JobMode; @@ -235,9 +236,16 @@ private void setTimeCharacteristic() { } private void setCheckpoint() { - if (config.hasPath(ConfigKeyName.CHECKPOINT_INTERVAL)) { + + long interval = 0; + if (config.hasPath(EnvCommonOptions.CHECKPOINT_INTERVAL.key())) { + interval = config.getLong(EnvCommonOptions.CHECKPOINT_INTERVAL.key()); + } else if (config.hasPath(ConfigKeyName.CHECKPOINT_INTERVAL)) { + interval = config.getLong(ConfigKeyName.CHECKPOINT_INTERVAL); + } + + if (interval > 0) { CheckpointConfig checkpointConfig = environment.getCheckpointConfig(); - long interval = config.getLong(ConfigKeyName.CHECKPOINT_INTERVAL); environment.enableCheckpointing(interval); if (config.hasPath(ConfigKeyName.CHECKPOINT_MODE)) { diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/java/org/apache/seatunnel/core/starter/flink/utils/ConfigKeyName.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/java/org/apache/seatunnel/core/starter/flink/utils/ConfigKeyName.java index 78d444a57dc..cc8229f26dc 100644 --- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/java/org/apache/seatunnel/core/starter/flink/utils/ConfigKeyName.java +++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/java/org/apache/seatunnel/core/starter/flink/utils/ConfigKeyName.java @@ -27,7 +27,7 @@ private ConfigKeyName() { public static final String BUFFER_TIMEOUT_MILLIS = "execution.buffer.timeout"; public static final String PARALLELISM = "execution.parallelism"; public static final String MAX_PARALLELISM = "execution.max-parallelism"; - public static final String CHECKPOINT_INTERVAL = "execution.checkpoint.interval"; + @Deprecated public static final String CHECKPOINT_INTERVAL = "execution.checkpoint.interval"; public static final String CHECKPOINT_MODE = "execution.checkpoint.mode"; public static final String CHECKPOINT_TIMEOUT = "execution.checkpoint.timeout"; public static final String CHECKPOINT_DATA_URI = "execution.checkpoint.data-uri";