Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Core] [Improve] Unify the checkpoint setting key of Flink #4296

Merged
merged 7 commits into from
Mar 14, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@
package org.apache.seatunnel.common;

public final class Constants {
public static final String ROW_ROOT = "__root__";
public static final String ROW_TMP = "__tmp__";

public static final String LOGO = "SeaTunnel";

Expand All @@ -39,8 +37,6 @@ public final class Constants {

public static final String HDFS_USER = "hdfs.user";

public static final String CHECKPOINT_INTERVAL = "checkpoint.interval";

public static final String CHECKPOINT_ID = "checkpoint.id";

public static final String UUID = "uuid";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.seatunnel.shade.com.typesafe.config.Config;

import org.apache.seatunnel.api.env.EnvCommonOptions;
import org.apache.seatunnel.common.Constants;
import org.apache.seatunnel.common.config.CheckResult;
import org.apache.seatunnel.common.constants.JobMode;
Expand Down Expand Up @@ -235,9 +236,16 @@ private void setTimeCharacteristic() {
}

private void setCheckpoint() {
if (config.hasPath(ConfigKeyName.CHECKPOINT_INTERVAL)) {

long interval = 0;
if (config.hasPath(EnvCommonOptions.CHECKPOINT_INTERVAL.key())) {
interval = config.getLong(EnvCommonOptions.CHECKPOINT_INTERVAL.key());
} else if (config.hasPath(ConfigKeyName.CHECKPOINT_INTERVAL)) {
interval = config.getLong(ConfigKeyName.CHECKPOINT_INTERVAL);
}

if (interval > 0) {
CheckpointConfig checkpointConfig = environment.getCheckpointConfig();
long interval = config.getLong(ConfigKeyName.CHECKPOINT_INTERVAL);
environment.enableCheckpointing(interval);

if (config.hasPath(ConfigKeyName.CHECKPOINT_MODE)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ private ConfigKeyName() {
public static final String BUFFER_TIMEOUT_MILLIS = "execution.buffer.timeout";
public static final String PARALLELISM = "execution.parallelism";
public static final String MAX_PARALLELISM = "execution.max-parallelism";
public static final String CHECKPOINT_INTERVAL = "execution.checkpoint.interval";

@Deprecated public static final String CHECKPOINT_INTERVAL = "execution.checkpoint.interval";
public static final String CHECKPOINT_MODE = "execution.checkpoint.mode";
public static final String CHECKPOINT_TIMEOUT = "execution.checkpoint.timeout";
public static final String CHECKPOINT_DATA_URI = "execution.checkpoint.data-uri";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.seatunnel.shade.com.typesafe.config.Config;

import org.apache.seatunnel.api.env.EnvCommonOptions;
import org.apache.seatunnel.common.Constants;
import org.apache.seatunnel.common.config.CheckResult;
import org.apache.seatunnel.common.constants.JobMode;
Expand Down Expand Up @@ -235,9 +236,16 @@ private void setTimeCharacteristic() {
}

private void setCheckpoint() {
if (config.hasPath(ConfigKeyName.CHECKPOINT_INTERVAL)) {

long interval = 0;
if (config.hasPath(EnvCommonOptions.CHECKPOINT_INTERVAL.key())) {
interval = config.getLong(EnvCommonOptions.CHECKPOINT_INTERVAL.key());
} else if (config.hasPath(ConfigKeyName.CHECKPOINT_INTERVAL)) {
interval = config.getLong(ConfigKeyName.CHECKPOINT_INTERVAL);
}

if (interval > 0) {
CheckpointConfig checkpointConfig = environment.getCheckpointConfig();
long interval = config.getLong(ConfigKeyName.CHECKPOINT_INTERVAL);
environment.enableCheckpointing(interval);

if (config.hasPath(ConfigKeyName.CHECKPOINT_MODE)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ private ConfigKeyName() {
public static final String BUFFER_TIMEOUT_MILLIS = "execution.buffer.timeout";
public static final String PARALLELISM = "execution.parallelism";
public static final String MAX_PARALLELISM = "execution.max-parallelism";
public static final String CHECKPOINT_INTERVAL = "execution.checkpoint.interval";
@Deprecated public static final String CHECKPOINT_INTERVAL = "execution.checkpoint.interval";
public static final String CHECKPOINT_MODE = "execution.checkpoint.mode";
public static final String CHECKPOINT_TIMEOUT = "execution.checkpoint.timeout";
public static final String CHECKPOINT_DATA_URI = "execution.checkpoint.data-uri";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,6 @@ public TextCommandService createTextCommandService() {

@Override
public void printNodeInfo() {
extCommon.printNodeInfo(systemLogger, "");
extCommon.printNodeInfo(systemLogger);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,15 +58,15 @@ void onClusterStateChange(ClusterState ignored) {
// TODO This is where cluster state changes are handled
}

void printNodeInfo(ILogger log, String addToProductName) {
log.info(imdgVersionMessage());
void printNodeInfo(ILogger log) {
log.info(imgVersionMessage());
log.info(clusterNameMessage());
log.fine(serializationVersionMessage());
log.info('\n' + Constants.ST_LOGO);
log.info(Constants.COPYRIGHT_LINE);
}

private String imdgVersionMessage() {
private String imgVersionMessage() {
String build = node.getBuildInfo().getBuild();
String revision = node.getBuildInfo().getRevision();
if (!revision.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.seatunnel.translation.spark.source;

import org.apache.seatunnel.api.common.CommonOptions;
import org.apache.seatunnel.api.env.EnvCommonOptions;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.common.Constants;
Expand Down Expand Up @@ -74,7 +75,8 @@ public MicroBatchReader createMicroBatchReader(
SeaTunnelSource<SeaTunnelRow, ?, ?> seaTunnelSource = getSeaTunnelSource(options);
Integer parallelism = options.getInt(CommonOptions.PARALLELISM.key(), 1);
Integer checkpointInterval =
options.getInt(Constants.CHECKPOINT_INTERVAL, CHECKPOINT_INTERVAL_DEFAULT);
options.getInt(
EnvCommonOptions.CHECKPOINT_INTERVAL.key(), CHECKPOINT_INTERVAL_DEFAULT);
String checkpointPath =
StringUtils.replacePattern(checkpointLocation, "sources/\\d+", "sources-state");
Configuration configuration =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.seatunnel.translation.spark.source.partition.micro;

import org.apache.seatunnel.api.env.EnvCommonOptions;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SupportCoordinate;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
Expand Down Expand Up @@ -74,7 +75,7 @@ public Offset latestOffset() {
public InputPartition[] planInputPartitions(Offset start, Offset end) {
int checkpointInterval =
caseInsensitiveStringMap.getInt(
Constants.CHECKPOINT_INTERVAL, CHECKPOINT_INTERVAL_DEFAULT);
EnvCommonOptions.CHECKPOINT_INTERVAL.key(), CHECKPOINT_INTERVAL_DEFAULT);
Configuration configuration =
SparkSession.getActiveSession().get().sparkContext().hadoopConfiguration();
String hdfsRoot =
Expand Down