Skip to content

Commit

Permalink
[Improve][Core] Unified the checkpoint setting key of Flink (apache#4296
Browse files Browse the repository at this point in the history
)

* [Doc] [Connector-V2] Improve MySQL-CDC Doc For connect.timeout.ms field

* [Core] [Improve] Unify the checkpoint setting key of Flink

* Revert "[Doc] [Connector-V2] Improve MySQL-CDC Doc For connect.timeout.ms field"

This reverts commit cd95635.

* [Core] [Improve] Unify the checkpoint setting key of Flink

* [Core] [Improve] Unify the checkpoint setting key of Flink
  • Loading branch information
Hisoka-X authored and hailin0 committed Mar 23, 2023
1 parent c11c171 commit 8104326
Show file tree
Hide file tree
Showing 9 changed files with 32 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@
package org.apache.seatunnel.common;

public final class Constants {
public static final String ROW_ROOT = "__root__";
public static final String ROW_TMP = "__tmp__";

public static final String LOGO = "SeaTunnel";

Expand All @@ -39,8 +37,6 @@ public final class Constants {

public static final String HDFS_USER = "hdfs.user";

public static final String CHECKPOINT_INTERVAL = "checkpoint.interval";

public static final String CHECKPOINT_ID = "checkpoint.id";

public static final String UUID = "uuid";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.seatunnel.shade.com.typesafe.config.Config;

import org.apache.seatunnel.api.env.EnvCommonOptions;
import org.apache.seatunnel.common.Constants;
import org.apache.seatunnel.common.config.CheckResult;
import org.apache.seatunnel.common.constants.JobMode;
Expand Down Expand Up @@ -235,9 +236,16 @@ private void setTimeCharacteristic() {
}

private void setCheckpoint() {
if (config.hasPath(ConfigKeyName.CHECKPOINT_INTERVAL)) {

long interval = 0;
if (config.hasPath(EnvCommonOptions.CHECKPOINT_INTERVAL.key())) {
interval = config.getLong(EnvCommonOptions.CHECKPOINT_INTERVAL.key());
} else if (config.hasPath(ConfigKeyName.CHECKPOINT_INTERVAL)) {
interval = config.getLong(ConfigKeyName.CHECKPOINT_INTERVAL);
}

if (interval > 0) {
CheckpointConfig checkpointConfig = environment.getCheckpointConfig();
long interval = config.getLong(ConfigKeyName.CHECKPOINT_INTERVAL);
environment.enableCheckpointing(interval);

if (config.hasPath(ConfigKeyName.CHECKPOINT_MODE)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ private ConfigKeyName() {
public static final String BUFFER_TIMEOUT_MILLIS = "execution.buffer.timeout";
public static final String PARALLELISM = "execution.parallelism";
public static final String MAX_PARALLELISM = "execution.max-parallelism";
public static final String CHECKPOINT_INTERVAL = "execution.checkpoint.interval";

@Deprecated public static final String CHECKPOINT_INTERVAL = "execution.checkpoint.interval";
public static final String CHECKPOINT_MODE = "execution.checkpoint.mode";
public static final String CHECKPOINT_TIMEOUT = "execution.checkpoint.timeout";
public static final String CHECKPOINT_DATA_URI = "execution.checkpoint.data-uri";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.seatunnel.shade.com.typesafe.config.Config;

import org.apache.seatunnel.api.env.EnvCommonOptions;
import org.apache.seatunnel.common.Constants;
import org.apache.seatunnel.common.config.CheckResult;
import org.apache.seatunnel.common.constants.JobMode;
Expand Down Expand Up @@ -235,9 +236,16 @@ private void setTimeCharacteristic() {
}

private void setCheckpoint() {
if (config.hasPath(ConfigKeyName.CHECKPOINT_INTERVAL)) {

long interval = 0;
if (config.hasPath(EnvCommonOptions.CHECKPOINT_INTERVAL.key())) {
interval = config.getLong(EnvCommonOptions.CHECKPOINT_INTERVAL.key());
} else if (config.hasPath(ConfigKeyName.CHECKPOINT_INTERVAL)) {
interval = config.getLong(ConfigKeyName.CHECKPOINT_INTERVAL);
}

if (interval > 0) {
CheckpointConfig checkpointConfig = environment.getCheckpointConfig();
long interval = config.getLong(ConfigKeyName.CHECKPOINT_INTERVAL);
environment.enableCheckpointing(interval);

if (config.hasPath(ConfigKeyName.CHECKPOINT_MODE)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ private ConfigKeyName() {
public static final String BUFFER_TIMEOUT_MILLIS = "execution.buffer.timeout";
public static final String PARALLELISM = "execution.parallelism";
public static final String MAX_PARALLELISM = "execution.max-parallelism";
public static final String CHECKPOINT_INTERVAL = "execution.checkpoint.interval";
@Deprecated public static final String CHECKPOINT_INTERVAL = "execution.checkpoint.interval";
public static final String CHECKPOINT_MODE = "execution.checkpoint.mode";
public static final String CHECKPOINT_TIMEOUT = "execution.checkpoint.timeout";
public static final String CHECKPOINT_DATA_URI = "execution.checkpoint.data-uri";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,6 @@ public TextCommandService createTextCommandService() {

@Override
public void printNodeInfo() {
extCommon.printNodeInfo(systemLogger, "");
extCommon.printNodeInfo(systemLogger);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,15 +58,15 @@ void onClusterStateChange(ClusterState ignored) {
// TODO This is where cluster state changes are handled
}

void printNodeInfo(ILogger log, String addToProductName) {
log.info(imdgVersionMessage());
void printNodeInfo(ILogger log) {
log.info(imgVersionMessage());
log.info(clusterNameMessage());
log.fine(serializationVersionMessage());
log.info('\n' + Constants.ST_LOGO);
log.info(Constants.COPYRIGHT_LINE);
}

private String imdgVersionMessage() {
private String imgVersionMessage() {
String build = node.getBuildInfo().getBuild();
String revision = node.getBuildInfo().getRevision();
if (!revision.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.seatunnel.translation.spark.source;

import org.apache.seatunnel.api.common.CommonOptions;
import org.apache.seatunnel.api.env.EnvCommonOptions;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.common.Constants;
Expand Down Expand Up @@ -74,7 +75,8 @@ public MicroBatchReader createMicroBatchReader(
SeaTunnelSource<SeaTunnelRow, ?, ?> seaTunnelSource = getSeaTunnelSource(options);
Integer parallelism = options.getInt(CommonOptions.PARALLELISM.key(), 1);
Integer checkpointInterval =
options.getInt(Constants.CHECKPOINT_INTERVAL, CHECKPOINT_INTERVAL_DEFAULT);
options.getInt(
EnvCommonOptions.CHECKPOINT_INTERVAL.key(), CHECKPOINT_INTERVAL_DEFAULT);
String checkpointPath =
StringUtils.replacePattern(checkpointLocation, "sources/\\d+", "sources-state");
Configuration configuration =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.seatunnel.translation.spark.source.partition.micro;

import org.apache.seatunnel.api.env.EnvCommonOptions;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SupportCoordinate;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
Expand Down Expand Up @@ -74,7 +75,7 @@ public Offset latestOffset() {
public InputPartition[] planInputPartitions(Offset start, Offset end) {
int checkpointInterval =
caseInsensitiveStringMap.getInt(
Constants.CHECKPOINT_INTERVAL, CHECKPOINT_INTERVAL_DEFAULT);
EnvCommonOptions.CHECKPOINT_INTERVAL.key(), CHECKPOINT_INTERVAL_DEFAULT);
Configuration configuration =
SparkSession.getActiveSession().get().sparkContext().hadoopConfiguration();
String hdfsRoot =
Expand Down

0 comments on commit 8104326

Please sign in to comment.