diff --git a/docs/layouts/shortcodes/generated/dynamic_section.html b/docs/layouts/shortcodes/generated/dynamic_section.html
index 9c1083fca6..be9638158e 100644
--- a/docs/layouts/shortcodes/generated/dynamic_section.html
+++ b/docs/layouts/shortcodes/generated/dynamic_section.html
@@ -22,15 +22,15 @@
kubernetes.operator.cluster.health-check.checkpoint-progress.enabled |
- false |
+ true |
Boolean |
Whether to enable checkpoint progress health check for clusters. |
kubernetes.operator.cluster.health-check.checkpoint-progress.window |
- 5 min |
+ (none) |
Duration |
- If no checkpoints are completed within the defined time window, the job is considered unhealthy. This must be bigger than checkpointing interval. |
+ If no checkpoints are completed within the defined time window, the job is considered unhealthy. The minimum window size is `max(checkpointingInterval, checkpointTimeout) * (tolerableCheckpointFailures + 2)`, which also serves as the default value when checkpointing is enabled. For example with checkpoint interval 10 minutes and 0 tolerable failures, the default progress check window will be 20 minutes. |
kubernetes.operator.cluster.health-check.enabled |
diff --git a/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html b/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html
index 172029c879..218e2626c4 100644
--- a/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html
+++ b/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html
@@ -22,15 +22,15 @@
kubernetes.operator.cluster.health-check.checkpoint-progress.enabled |
- false |
+ true |
Boolean |
Whether to enable checkpoint progress health check for clusters. |
kubernetes.operator.cluster.health-check.checkpoint-progress.window |
- 5 min |
+ (none) |
Duration |
- If no checkpoints are completed within the defined time window, the job is considered unhealthy. This must be bigger than checkpointing interval. |
+ If no checkpoints are completed within the defined time window, the job is considered unhealthy. The minimum window size is `max(checkpointingInterval, checkpointTimeout) * (tolerableCheckpointFailures + 2)`, which also serves as the default value when checkpointing is enabled. For example with checkpoint interval 10 minutes and 0 tolerable failures, the default progress check window will be 20 minutes. |
kubernetes.operator.cluster.health-check.enabled |
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java
index b065d22885..a9e6d32299 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java
@@ -502,7 +502,7 @@ public static String operatorConfigKey(String key) {
OPERATOR_CLUSTER_HEALTH_CHECK_CHECKPOINT_PROGRESS_ENABLED =
operatorConfig("cluster.health-check.checkpoint-progress.enabled")
.booleanType()
- .defaultValue(false)
+ .defaultValue(true)
.withDescription(
"Whether to enable checkpoint progress health check for clusters.");
@@ -511,9 +511,9 @@ public static String operatorConfigKey(String key) {
OPERATOR_CLUSTER_HEALTH_CHECK_CHECKPOINT_PROGRESS_WINDOW =
operatorConfig("cluster.health-check.checkpoint-progress.window")
.durationType()
- .defaultValue(Duration.ofMinutes(5))
+ .noDefaultValue()
.withDescription(
- "If no checkpoints are completed within the defined time window, the job is considered unhealthy. This must be bigger than checkpointing interval.");
+ "If no checkpoints are completed within the defined time window, the job is considered unhealthy. The minimum window size is `max(checkpointingInterval, checkpointTimeout) * (tolerableCheckpointFailures + 2)`, which also serves as the default value when checkpointing is enabled. For example with checkpoint interval 10 minutes and 0 tolerable failures, the default progress check window will be 20 minutes.");
@Documentation.Section(SECTION_DYNAMIC)
public static final ConfigOption OPERATOR_JOB_RESTART_FAILED =
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/ClusterHealthEvaluator.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/ClusterHealthEvaluator.java
index c84f34138a..d38a8f0fd2 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/ClusterHealthEvaluator.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/ClusterHealthEvaluator.java
@@ -178,30 +178,46 @@ private boolean evaluateCheckpoints(
return true;
}
- var completedCheckpointsCheckWindow =
- configuration.get(OPERATOR_CLUSTER_HEALTH_CHECK_CHECKPOINT_PROGRESS_WINDOW);
+ var windowOpt =
+ configuration.getOptional(OPERATOR_CLUSTER_HEALTH_CHECK_CHECKPOINT_PROGRESS_WINDOW);
CheckpointConfig checkpointConfig = new CheckpointConfig();
checkpointConfig.configure(configuration);
var checkpointingInterval = checkpointConfig.getCheckpointInterval();
var checkpointingTimeout = checkpointConfig.getCheckpointTimeout();
- var tolerationFailureNumber = checkpointConfig.getTolerableCheckpointFailureNumber() + 1;
- var minCompletedCheckpointsCheckWindow =
- Math.max(
- checkpointingInterval * tolerationFailureNumber,
- checkpointingTimeout * tolerationFailureNumber);
- if (completedCheckpointsCheckWindow.toMillis() < minCompletedCheckpointsCheckWindow) {
- LOG.warn(
- "{} is not long enough. Default to max({} * {}, {} * {}): {}ms",
- OPERATOR_CLUSTER_HEALTH_CHECK_CHECKPOINT_PROGRESS_WINDOW.key(),
- CHECKPOINTING_INTERVAL.key(),
- TOLERABLE_FAILURE_NUMBER.key(),
- CHECKPOINTING_TIMEOUT.key(),
- TOLERABLE_FAILURE_NUMBER.key(),
- minCompletedCheckpointsCheckWindow);
- completedCheckpointsCheckWindow = Duration.ofMillis(minCompletedCheckpointsCheckWindow);
+ var tolerationFailureNumber = checkpointConfig.getTolerableCheckpointFailureNumber() + 2;
+ var minCheckWindow =
+ Duration.ofMillis(
+ Math.max(
+ checkpointingInterval * tolerationFailureNumber,
+ checkpointingTimeout * tolerationFailureNumber));
+
+ if (windowOpt.isEmpty() && !checkpointConfig.isCheckpointingEnabled()) {
+            // No explicit check window is configured and checkpointing is disabled in the
+            // config, so skip the checkpoint progress check and report the job as healthy
+ return true;
}
+ var completedCheckpointsCheckWindow =
+ windowOpt
+ .filter(
+ d -> {
+ if (d.compareTo(minCheckWindow) < 0) {
+                                LOG.warn(
+                                        "{} is not long enough. Default to max({} * ({} + 2), {} * ({} + 2)): {}",
+ OPERATOR_CLUSTER_HEALTH_CHECK_CHECKPOINT_PROGRESS_WINDOW
+ .key(),
+ CHECKPOINTING_INTERVAL.key(),
+ TOLERABLE_FAILURE_NUMBER.key(),
+ CHECKPOINTING_TIMEOUT.key(),
+ TOLERABLE_FAILURE_NUMBER.key(),
+ minCheckWindow);
+ return false;
+ }
+ return true;
+ })
+ .orElse(minCheckWindow);
+
if (observedClusterHealthInfo.getNumCompletedCheckpoints()
< lastValidClusterHealthInfo.getNumCompletedCheckpoints()) {
LOG.debug(
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/UnhealthyDeploymentRestartTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/UnhealthyDeploymentRestartTest.java
index 050db7d453..c7b26d1b90 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/UnhealthyDeploymentRestartTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/UnhealthyDeploymentRestartTest.java
@@ -138,7 +138,7 @@ public void verifyApplicationNoCompletedCheckpointsJmRecovery(
// Ensure the last savepoint has been taken more than 10 minutes ago (Default checkpoint
// interval)
clusterHealthInfo.setNumCompletedCheckpointsIncreasedTimeStamp(
- clusterHealthInfo.getNumCompletedCheckpointsIncreasedTimeStamp() - 600000);
+ clusterHealthInfo.getNumCompletedCheckpointsIncreasedTimeStamp() - 1200000);
setLastValidClusterHealthInfo(appCluster.getStatus().getClusterInfo(), clusterHealthInfo);
testController.getStatusRecorder().patchAndCacheStatus(appCluster, kubernetesClient);
testController.reconcile(appCluster, context);
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/ClusterHealthEvaluatorTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/ClusterHealthEvaluatorTest.java
index 9d58d71bfe..83a0e7515f 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/ClusterHealthEvaluatorTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/ClusterHealthEvaluatorTest.java
@@ -159,6 +159,8 @@ public void evaluateShouldOverwriteRestartCountWhenTimestampIsOutOfWindow() {
@Test
public void evaluateShouldOverwriteCompletedCheckpointCountWhenLess() {
configuration.set(OPERATOR_CLUSTER_HEALTH_CHECK_CHECKPOINT_PROGRESS_ENABLED, true);
+ configuration.set(
+ OPERATOR_CLUSTER_HEALTH_CHECK_CHECKPOINT_PROGRESS_WINDOW, Duration.ofMinutes(5));
var observedClusterHealthInfo1 = createClusterHealthInfo(validInstant1, 0, 1);
var observedClusterHealthInfo2 = createClusterHealthInfo(validInstant2, 0, 0);
@@ -278,25 +280,33 @@ private static Stream provideParametersEvaluateCheckpointing() {
Instant tenSecInstant = ofEpochSecond(10);
Instant twoMinInstant = ofEpochSecond(120);
Instant fourMinInstant = twoMinInstant.plus(2, ChronoUnit.MINUTES);
+
+ Duration oneMin = Duration.ofMinutes(1);
return Stream.of(
// ShouldMarkClusterUnhealthyWhenNoCompletedCheckpointsOutsideWindow
- Arguments.of(twoMinInstant, fourMinInstant, 30L, 30L, null, false),
+ Arguments.of(twoMinInstant, fourMinInstant, oneMin, 30L, 30L, null, false),
+ // Verify checkpoint progress even if checkpointing not configured
+ Arguments.of(twoMinInstant, fourMinInstant, oneMin, null, 30L, null, false),
+ // Verify default window if not explicitly configured
+ Arguments.of(twoMinInstant, fourMinInstant, null, 30L, 30L, null, false),
+                // Verify check is off if both window and checkpointing are not configured
+ Arguments.of(twoMinInstant, fourMinInstant, null, null, 30L, null, true),
// ShouldMarkClusterHealthyWhenCompletedCheckpointsWithOutsideWindowFromCheckpointInterval
- Arguments.of(twoMinInstant, fourMinInstant, 120L, 30L, null, true),
+ Arguments.of(twoMinInstant, fourMinInstant, oneMin, 60L, 30L, null, true),
// ShouldMarkClusterUnhealthyWhenNoCompletedCheckpointsWithOutsideWindowFromCheckpointInterval
- Arguments.of(tenSecInstant, fourMinInstant, 120L, 30L, null, false),
+ Arguments.of(tenSecInstant, fourMinInstant, oneMin, 60L, 30L, null, false),
// ShouldMarkClusterHealthyWhenCompletedCheckpointsWithOutsideWindowFromCheckpointIntervalTimesNbTolerableFailure
- Arguments.of(twoMinInstant, fourMinInstant, 30L, 10L, 3, true),
+ Arguments.of(twoMinInstant, fourMinInstant, oneMin, 30L, 10L, 3, true),
// ShouldMarkClusterHealthyWhenNoCompletedCheckpointsWithOutsideWindowFromCheckpointIntervalTimesNbTolerableFailure
- Arguments.of(tenSecInstant, fourMinInstant, 30L, 10L, 3, false),
+ Arguments.of(tenSecInstant, fourMinInstant, oneMin, 30L, 10L, 3, false),
// ShouldMarkClusterHealthyWhenCompletedCheckpointsWithOutsideWindowFromCheckpointingTimeout
- Arguments.of(twoMinInstant, fourMinInstant, 30L, 120L, null, true),
+ Arguments.of(twoMinInstant, fourMinInstant, oneMin, 30L, 60L, null, true),
// ShouldMarkClusterHealthyWhenNoCompletedCheckpointsWithOutsideWindowFromCheckpointingTimeout
- Arguments.of(tenSecInstant, fourMinInstant, 30L, 120L, null, false),
+ Arguments.of(tenSecInstant, fourMinInstant, oneMin, 30L, 60L, null, false),
// ShouldMarkClusterHealthyWhenCompletedCheckpointsWithOutsideWindowFromCheckpointingTimeoutTimesNbTolerableFailure
- Arguments.of(twoMinInstant, fourMinInstant, 10L, 30L, 3, true),
+ Arguments.of(twoMinInstant, fourMinInstant, oneMin, 10L, 30L, 3, true),
// ShouldMarkClusterHealthyWhenNoCompletedCheckpointsWithOutsideWindowFromCheckpointingTimeoutTimesNbTolerableFailure
- Arguments.of(tenSecInstant, fourMinInstant, 10L, 30L, 3, false));
+ Arguments.of(tenSecInstant, fourMinInstant, oneMin, 10L, 30L, 3, false));
}
@ParameterizedTest
@@ -304,13 +314,15 @@ private static Stream provideParametersEvaluateCheckpointing() {
public void evaluateCheckpointing(
Instant validInstant1,
Instant validInstant2,
+ Duration window,
Long checkpointingInterval,
long checkpointingTimeout,
Integer tolerationFailureNumber,
boolean expectedIsHealthy) {
configuration.set(OPERATOR_CLUSTER_HEALTH_CHECK_CHECKPOINT_PROGRESS_ENABLED, true);
- configuration.set(
- OPERATOR_CLUSTER_HEALTH_CHECK_CHECKPOINT_PROGRESS_WINDOW, Duration.ofMinutes(1));
+ if (window != null) {
+ configuration.set(OPERATOR_CLUSTER_HEALTH_CHECK_CHECKPOINT_PROGRESS_WINDOW, window);
+ }
if (checkpointingInterval != null) {
configuration.set(CHECKPOINTING_INTERVAL, Duration.ofSeconds(checkpointingInterval));
}