diff --git a/seatunnel-common/src/main/java/org/apache/seatunnel/common/Constants.java b/seatunnel-common/src/main/java/org/apache/seatunnel/common/Constants.java index c5e04d5fab5..602143e100c 100644 --- a/seatunnel-common/src/main/java/org/apache/seatunnel/common/Constants.java +++ b/seatunnel-common/src/main/java/org/apache/seatunnel/common/Constants.java @@ -18,8 +18,6 @@ package org.apache.seatunnel.common; public final class Constants { - public static final String ROW_ROOT = "__root__"; - public static final String ROW_TMP = "__tmp__"; public static final String LOGO = "SeaTunnel"; @@ -39,8 +37,6 @@ public final class Constants { public static final String HDFS_USER = "hdfs.user"; - public static final String CHECKPOINT_INTERVAL = "checkpoint.interval"; - public static final String CHECKPOINT_ID = "checkpoint.id"; public static final String UUID = "uuid"; diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java index e180ebb74cc..7fb75064a4c 100644 --- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java +++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java @@ -19,6 +19,7 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config; +import org.apache.seatunnel.api.env.EnvCommonOptions; import org.apache.seatunnel.common.Constants; import org.apache.seatunnel.common.config.CheckResult; import org.apache.seatunnel.common.constants.JobMode; @@ -235,9 +236,16 @@ private void setTimeCharacteristic() { } private void setCheckpoint() { - if (config.hasPath(ConfigKeyName.CHECKPOINT_INTERVAL)) { + + long interval = 0; + if (config.hasPath(EnvCommonOptions.CHECKPOINT_INTERVAL.key())) { + interval = config.getLong(EnvCommonOptions.CHECKPOINT_INTERVAL.key()); + } else if (config.hasPath(ConfigKeyName.CHECKPOINT_INTERVAL)) { + interval = config.getLong(ConfigKeyName.CHECKPOINT_INTERVAL); + } + + if (interval > 0) { CheckpointConfig checkpointConfig = environment.getCheckpointConfig(); - long interval = config.getLong(ConfigKeyName.CHECKPOINT_INTERVAL); environment.enableCheckpointing(interval); if (config.hasPath(ConfigKeyName.CHECKPOINT_MODE)) { diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/utils/ConfigKeyName.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/utils/ConfigKeyName.java index 78d444a57dc..699d7e3798a 100644 --- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/utils/ConfigKeyName.java +++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/utils/ConfigKeyName.java @@ -27,7 +27,8 @@ private ConfigKeyName() { public static final String BUFFER_TIMEOUT_MILLIS = "execution.buffer.timeout"; public static final String PARALLELISM = "execution.parallelism"; public static final String MAX_PARALLELISM = "execution.max-parallelism"; - public static final String CHECKPOINT_INTERVAL = "execution.checkpoint.interval"; + + @Deprecated public static final String CHECKPOINT_INTERVAL = "execution.checkpoint.interval"; public static final String CHECKPOINT_MODE = "execution.checkpoint.mode"; public static final String CHECKPOINT_TIMEOUT = "execution.checkpoint.timeout"; public static final String CHECKPOINT_DATA_URI = "execution.checkpoint.data-uri"; diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java index 951312b7403..4b5bef07cb0 100644 --- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java +++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java @@ -19,6 +19,7 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config; +import org.apache.seatunnel.api.env.EnvCommonOptions; import org.apache.seatunnel.common.Constants; import org.apache.seatunnel.common.config.CheckResult; import org.apache.seatunnel.common.constants.JobMode; @@ -235,9 +236,16 @@ private void setTimeCharacteristic() { } private void setCheckpoint() { - if (config.hasPath(ConfigKeyName.CHECKPOINT_INTERVAL)) { + + long interval = 0; + if (config.hasPath(EnvCommonOptions.CHECKPOINT_INTERVAL.key())) { + interval = config.getLong(EnvCommonOptions.CHECKPOINT_INTERVAL.key()); + } else if (config.hasPath(ConfigKeyName.CHECKPOINT_INTERVAL)) { + interval = config.getLong(ConfigKeyName.CHECKPOINT_INTERVAL); + } + + if (interval > 0) { CheckpointConfig checkpointConfig = environment.getCheckpointConfig(); - long interval = config.getLong(ConfigKeyName.CHECKPOINT_INTERVAL); environment.enableCheckpointing(interval); if (config.hasPath(ConfigKeyName.CHECKPOINT_MODE)) { diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/java/org/apache/seatunnel/core/starter/flink/utils/ConfigKeyName.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/java/org/apache/seatunnel/core/starter/flink/utils/ConfigKeyName.java index 78d444a57dc..cc8229f26dc 100644 --- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/java/org/apache/seatunnel/core/starter/flink/utils/ConfigKeyName.java +++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/java/org/apache/seatunnel/core/starter/flink/utils/ConfigKeyName.java @@ -27,7 +27,7 @@ private ConfigKeyName() { public static final String BUFFER_TIMEOUT_MILLIS = "execution.buffer.timeout"; public static final String PARALLELISM = "execution.parallelism"; public static final String MAX_PARALLELISM = "execution.max-parallelism"; - public static final String CHECKPOINT_INTERVAL = "execution.checkpoint.interval"; + @Deprecated public static final String CHECKPOINT_INTERVAL = "execution.checkpoint.interval"; public static final String CHECKPOINT_MODE = "execution.checkpoint.mode"; public static final String CHECKPOINT_TIMEOUT = "execution.checkpoint.timeout"; public static final String CHECKPOINT_DATA_URI = "execution.checkpoint.data-uri"; diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/NodeExtension.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/NodeExtension.java index 2c62314e2b1..d4137955c8b 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/NodeExtension.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/NodeExtension.java @@ -85,6 +85,6 @@ public TextCommandService createTextCommandService() { @Override public void printNodeInfo() { - extCommon.printNodeInfo(systemLogger, ""); + extCommon.printNodeInfo(systemLogger); } } diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/NodeExtensionCommon.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/NodeExtensionCommon.java index 8cf0a34706a..8551f9381af 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/NodeExtensionCommon.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/NodeExtensionCommon.java @@ -58,15 +58,15 @@ void onClusterStateChange(ClusterState ignored) { // TODO This is where cluster state changes are handled } - void printNodeInfo(ILogger log, String addToProductName) { - log.info(imdgVersionMessage()); + void printNodeInfo(ILogger log) { + log.info(imgVersionMessage()); log.info(clusterNameMessage()); log.fine(serializationVersionMessage()); log.info('\n' + Constants.ST_LOGO); log.info(Constants.COPYRIGHT_LINE); } - private String imdgVersionMessage() { + private String imgVersionMessage() { String build = node.getBuildInfo().getBuild(); String revision = node.getBuildInfo().getRevision(); if (!revision.isEmpty()) { diff --git a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/source/SeaTunnelSourceSupport.java b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/source/SeaTunnelSourceSupport.java index 4e2c3e111b0..604ba203cae 100644 --- a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/source/SeaTunnelSourceSupport.java +++ b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/source/SeaTunnelSourceSupport.java @@ -18,6 +18,7 @@ package org.apache.seatunnel.translation.spark.source; import org.apache.seatunnel.api.common.CommonOptions; +import org.apache.seatunnel.api.env.EnvCommonOptions; import org.apache.seatunnel.api.source.SeaTunnelSource; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.common.Constants; @@ -74,7 +75,8 @@ public MicroBatchReader createMicroBatchReader( SeaTunnelSource seaTunnelSource = getSeaTunnelSource(options); Integer parallelism = options.getInt(CommonOptions.PARALLELISM.key(), 1); Integer checkpointInterval = - options.getInt(Constants.CHECKPOINT_INTERVAL, CHECKPOINT_INTERVAL_DEFAULT); + options.getInt( + EnvCommonOptions.CHECKPOINT_INTERVAL.key(), CHECKPOINT_INTERVAL_DEFAULT); String checkpointPath = StringUtils.replacePattern(checkpointLocation, "sources/\\d+", "sources-state"); Configuration configuration = diff --git a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/source/partition/micro/SeaTunnelMicroBatch.java b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/source/partition/micro/SeaTunnelMicroBatch.java index 4d9a9b17f19..c4d03d2bd76 100644 --- a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/source/partition/micro/SeaTunnelMicroBatch.java +++ b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/source/partition/micro/SeaTunnelMicroBatch.java @@ -17,6 +17,7 @@ package org.apache.seatunnel.translation.spark.source.partition.micro; +import org.apache.seatunnel.api.env.EnvCommonOptions; import org.apache.seatunnel.api.source.SeaTunnelSource; import org.apache.seatunnel.api.source.SupportCoordinate; import org.apache.seatunnel.api.table.type.SeaTunnelRow; @@ -74,7 +75,7 @@ public Offset latestOffset() { public InputPartition[] planInputPartitions(Offset start, Offset end) { int checkpointInterval = caseInsensitiveStringMap.getInt( - Constants.CHECKPOINT_INTERVAL, CHECKPOINT_INTERVAL_DEFAULT); + EnvCommonOptions.CHECKPOINT_INTERVAL.key(), CHECKPOINT_INTERVAL_DEFAULT); Configuration configuration = SparkSession.getActiveSession().get().sparkContext().hadoopConfiguration(); String hdfsRoot =