From cb90b10c93241f9994acdd11d59ce1ec25ccbc47 Mon Sep 17 00:00:00 2001
From: Gyula Fora
Date: Mon, 6 May 2024 11:51:33 +0200
Subject: [PATCH] [FLINK-35126] Rework default checkpoint progress check window

---
 .../shortcodes/generated/dynamic_section.html      |  6 +--
 ...ernetes_operator_config_configuration.html      |  6 +--
 .../KubernetesOperatorConfigOptions.java           |  6 +--
 .../observer/ClusterHealthEvaluator.java           | 50 ++++++++++++-------
 .../UnhealthyDeploymentRestartTest.java            |  2 +-
 .../observer/ClusterHealthEvaluatorTest.java       | 34 +++++++++----
 6 files changed, 66 insertions(+), 38 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/dynamic_section.html b/docs/layouts/shortcodes/generated/dynamic_section.html
index 9c1083fca6..be9638158e 100644
--- a/docs/layouts/shortcodes/generated/dynamic_section.html
+++ b/docs/layouts/shortcodes/generated/dynamic_section.html
@@ -22,15 +22,15 @@
         <tr>
             <td><h5>kubernetes.operator.cluster.health-check.checkpoint-progress.enabled</h5></td>
-            <td style="word-wrap: break-word;">false</td>
+            <td style="word-wrap: break-word;">true</td>
             <td>Boolean</td>
             <td>Whether to enable checkpoint progress health check for clusters.</td>
         </tr>
         <tr>
             <td><h5>kubernetes.operator.cluster.health-check.checkpoint-progress.window</h5></td>
-            <td style="word-wrap: break-word;">5 min</td>
+            <td style="word-wrap: break-word;">(none)</td>
             <td>Duration</td>
-            <td>If no checkpoints are completed within the defined time window, the job is considered unhealthy. This must be bigger than checkpointing interval.</td>
+            <td>If no checkpoints are completed within the defined time window, the job is considered unhealthy. The minimum window size is `max(checkpointingInterval, checkpointTimeout) * (tolerableCheckpointFailures + 2)`, which also serves as the default value when checkpointing is enabled. For example, with a checkpoint interval of 10 minutes and 0 tolerable failures, the default progress check window will be 20 minutes.</td>
         </tr>
         <tr>
             <td><h5>kubernetes.operator.cluster.health-check.enabled</h5></td>
diff --git a/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html b/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html
index 172029c879..218e2626c4 100644
--- a/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html
+++ b/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html
@@ -22,15 +22,15 @@
         <tr>
             <td><h5>kubernetes.operator.cluster.health-check.checkpoint-progress.enabled</h5></td>
-            <td style="word-wrap: break-word;">false</td>
+            <td style="word-wrap: break-word;">true</td>
             <td>Boolean</td>
             <td>Whether to enable checkpoint progress health check for clusters.</td>
         </tr>
         <tr>
             <td><h5>kubernetes.operator.cluster.health-check.checkpoint-progress.window</h5></td>
-            <td style="word-wrap: break-word;">5 min</td>
+            <td style="word-wrap: break-word;">(none)</td>
             <td>Duration</td>
-            <td>If no checkpoints are completed within the defined time window, the job is considered unhealthy. This must be bigger than checkpointing interval.</td>
+            <td>If no checkpoints are completed within the defined time window, the job is considered unhealthy. The minimum window size is `max(checkpointingInterval, checkpointTimeout) * (tolerableCheckpointFailures + 2)`, which also serves as the default value when checkpointing is enabled. For example, with a checkpoint interval of 10 minutes and 0 tolerable failures, the default progress check window will be 20 minutes.</td>
         </tr>
         <tr>
             <td><h5>kubernetes.operator.cluster.health-check.enabled</h5></td>
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java
index b065d22885..a9e6d32299 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java
@@ -502,7 +502,7 @@ public static String operatorConfigKey(String key) {
             OPERATOR_CLUSTER_HEALTH_CHECK_CHECKPOINT_PROGRESS_ENABLED =
                     operatorConfig("cluster.health-check.checkpoint-progress.enabled")
                             .booleanType()
-                            .defaultValue(false)
+                            .defaultValue(true)
                             .withDescription(
                                     "Whether to enable checkpoint progress health check for clusters.");
 
@@ -511,9 +511,9 @@ public static String operatorConfigKey(String key) {
             OPERATOR_CLUSTER_HEALTH_CHECK_CHECKPOINT_PROGRESS_WINDOW =
                     operatorConfig("cluster.health-check.checkpoint-progress.window")
                             .durationType()
-                            .defaultValue(Duration.ofMinutes(5))
+                            .noDefaultValue()
                             .withDescription(
-                                    "If no checkpoints are completed within the defined time window, the job is considered unhealthy. This must be bigger than checkpointing interval.");
+                                    "If no checkpoints are completed within the defined time window, the job is considered unhealthy. The minimum window size is `max(checkpointingInterval, checkpointTimeout) * (tolerableCheckpointFailures + 2)`, which also serves as the default value when checkpointing is enabled. For example, with a checkpoint interval of 10 minutes and 0 tolerable failures, the default progress check window will be 20 minutes.");
 
     @Documentation.Section(SECTION_DYNAMIC)
     public static final ConfigOption<Boolean> OPERATOR_JOB_RESTART_FAILED =
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/ClusterHealthEvaluator.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/ClusterHealthEvaluator.java
index c84f34138a..d38a8f0fd2 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/ClusterHealthEvaluator.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/ClusterHealthEvaluator.java
@@ -178,30 +178,46 @@ private boolean evaluateCheckpoints(
             return true;
         }
 
-        var completedCheckpointsCheckWindow =
-                configuration.get(OPERATOR_CLUSTER_HEALTH_CHECK_CHECKPOINT_PROGRESS_WINDOW);
+        var windowOpt =
+                configuration.getOptional(OPERATOR_CLUSTER_HEALTH_CHECK_CHECKPOINT_PROGRESS_WINDOW);
 
         CheckpointConfig checkpointConfig = new CheckpointConfig();
         checkpointConfig.configure(configuration);
         var checkpointingInterval = checkpointConfig.getCheckpointInterval();
         var checkpointingTimeout = checkpointConfig.getCheckpointTimeout();
-        var tolerationFailureNumber = checkpointConfig.getTolerableCheckpointFailureNumber() + 1;
-        var minCompletedCheckpointsCheckWindow =
-                Math.max(
-                        checkpointingInterval * tolerationFailureNumber,
-                        checkpointingTimeout * tolerationFailureNumber);
-        if (completedCheckpointsCheckWindow.toMillis() < minCompletedCheckpointsCheckWindow) {
-            LOG.warn(
-                    "{} is not long enough. Default to max({} * {}, {} * {}): {}ms",
-                    OPERATOR_CLUSTER_HEALTH_CHECK_CHECKPOINT_PROGRESS_WINDOW.key(),
-                    CHECKPOINTING_INTERVAL.key(),
-                    TOLERABLE_FAILURE_NUMBER.key(),
-                    CHECKPOINTING_TIMEOUT.key(),
-                    TOLERABLE_FAILURE_NUMBER.key(),
-                    minCompletedCheckpointsCheckWindow);
-            completedCheckpointsCheckWindow = Duration.ofMillis(minCompletedCheckpointsCheckWindow);
+        var tolerationFailureNumber = checkpointConfig.getTolerableCheckpointFailureNumber() + 2;
+        var minCheckWindow =
+                Duration.ofMillis(
+                        Math.max(
+                                checkpointingInterval * tolerationFailureNumber,
+                                checkpointingTimeout * tolerationFailureNumber));
+
+        if (windowOpt.isEmpty() && !checkpointConfig.isCheckpointingEnabled()) {
+            // If no explicit checkpoint check window is specified and checkpointing is disabled
+            // based on the config, there is nothing to check
+            return true;
         }
 
+        var completedCheckpointsCheckWindow =
+                windowOpt
+                        .filter(
+                                d -> {
+                                    if (d.compareTo(minCheckWindow) < 0) {
+                                        LOG.debug(
+                                                "{} is not long enough. Default to max({} * ({} + 2), {} * ({} + 2)): {}",
+                                                OPERATOR_CLUSTER_HEALTH_CHECK_CHECKPOINT_PROGRESS_WINDOW
+                                                        .key(),
+                                                CHECKPOINTING_INTERVAL.key(),
+                                                TOLERABLE_FAILURE_NUMBER.key(),
+                                                CHECKPOINTING_TIMEOUT.key(),
+                                                TOLERABLE_FAILURE_NUMBER.key(),
+                                                minCheckWindow);
+                                        return false;
+                                    }
+                                    return true;
+                                })
+                        .orElse(minCheckWindow);
+
         if (observedClusterHealthInfo.getNumCompletedCheckpoints()
                 < lastValidClusterHealthInfo.getNumCompletedCheckpoints()) {
             LOG.debug(
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/UnhealthyDeploymentRestartTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/UnhealthyDeploymentRestartTest.java
index 050db7d453..c7b26d1b90 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/UnhealthyDeploymentRestartTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/UnhealthyDeploymentRestartTest.java
@@ -138,7 +138,7 @@ public void verifyApplicationNoCompletedCheckpointsJmRecovery(
-        // Ensure the last savepoint has been taken more than 10 minutes ago (Default checkpoint
-        // interval)
+        // Ensure the last checkpoint has been taken more than 20 minutes ago (the default
+        // progress check window for the default 10 minute checkpoint interval)
         clusterHealthInfo.setNumCompletedCheckpointsIncreasedTimeStamp(
-                clusterHealthInfo.getNumCompletedCheckpointsIncreasedTimeStamp() - 600000);
+                clusterHealthInfo.getNumCompletedCheckpointsIncreasedTimeStamp() - 1200000);
         setLastValidClusterHealthInfo(appCluster.getStatus().getClusterInfo(), clusterHealthInfo);
         testController.getStatusRecorder().patchAndCacheStatus(appCluster, kubernetesClient);
         testController.reconcile(appCluster, context);
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/ClusterHealthEvaluatorTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/ClusterHealthEvaluatorTest.java
index 9d58d71bfe..83a0e7515f 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/ClusterHealthEvaluatorTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/ClusterHealthEvaluatorTest.java
@@ -159,6 +159,8 @@ public void evaluateShouldOverwriteRestartCountWhenTimestampIsOutOfWindow() {
     @Test
     public void evaluateShouldOverwriteCompletedCheckpointCountWhenLess() {
         configuration.set(OPERATOR_CLUSTER_HEALTH_CHECK_CHECKPOINT_PROGRESS_ENABLED, true);
+        configuration.set(
+                OPERATOR_CLUSTER_HEALTH_CHECK_CHECKPOINT_PROGRESS_WINDOW, Duration.ofMinutes(5));
 
         var observedClusterHealthInfo1 = createClusterHealthInfo(validInstant1, 0, 1);
         var observedClusterHealthInfo2 = createClusterHealthInfo(validInstant2, 0, 0);
@@ -278,25 +280,33 @@ private static Stream<Arguments> provideParametersEvaluateCheckpointing() {
         Instant tenSecInstant = ofEpochSecond(10);
         Instant twoMinInstant = ofEpochSecond(120);
         Instant fourMinInstant = twoMinInstant.plus(2, ChronoUnit.MINUTES);
+
+        Duration oneMin = Duration.ofMinutes(1);
 
         return Stream.of(
                 // ShouldMarkClusterUnhealthyWhenNoCompletedCheckpointsOutsideWindow
-                Arguments.of(twoMinInstant, fourMinInstant, 30L, 30L, null, false),
+                Arguments.of(twoMinInstant, fourMinInstant, oneMin, 30L, 30L, null, false),
+                // Verify checkpoint progress even if checkpointing is not configured
+                Arguments.of(twoMinInstant, fourMinInstant, oneMin, null, 30L, null, false),
+                // Verify the default window if none is explicitly configured
+                Arguments.of(twoMinInstant, fourMinInstant, null, 30L, 30L, null, false),
+                // Verify the check is off if neither the window nor checkpointing is configured
+                Arguments.of(twoMinInstant, fourMinInstant, null, null, 30L, null, true),
                 // ShouldMarkClusterHealthyWhenCompletedCheckpointsWithOutsideWindowFromCheckpointInterval
-                Arguments.of(twoMinInstant, fourMinInstant, 120L, 30L, null, true),
+                Arguments.of(twoMinInstant, fourMinInstant, oneMin, 60L, 30L, null, true),
                 // ShouldMarkClusterUnhealthyWhenNoCompletedCheckpointsWithOutsideWindowFromCheckpointInterval
-                Arguments.of(tenSecInstant, fourMinInstant, 120L, 30L, null, false),
+                Arguments.of(tenSecInstant, fourMinInstant, oneMin, 60L, 30L, null, false),
                 // ShouldMarkClusterHealthyWhenCompletedCheckpointsWithOutsideWindowFromCheckpointIntervalTimesNbTolerableFailure
-                Arguments.of(twoMinInstant, fourMinInstant, 30L, 10L, 3, true),
+                Arguments.of(twoMinInstant, fourMinInstant, oneMin, 30L, 10L, 3, true),
                 // ShouldMarkClusterHealthyWhenNoCompletedCheckpointsWithOutsideWindowFromCheckpointIntervalTimesNbTolerableFailure
-                Arguments.of(tenSecInstant, fourMinInstant, 30L, 10L, 3, false),
+                Arguments.of(tenSecInstant, fourMinInstant, oneMin, 30L, 10L, 3, false),
                 // ShouldMarkClusterHealthyWhenCompletedCheckpointsWithOutsideWindowFromCheckpointingTimeout
-                Arguments.of(twoMinInstant, fourMinInstant, 30L, 120L, null, true),
+                Arguments.of(twoMinInstant, fourMinInstant, oneMin, 30L, 60L, null, true),
                 // ShouldMarkClusterHealthyWhenNoCompletedCheckpointsWithOutsideWindowFromCheckpointingTimeout
-                Arguments.of(tenSecInstant, fourMinInstant, 30L, 120L, null, false),
+                Arguments.of(tenSecInstant, fourMinInstant, oneMin, 30L, 60L, null, false),
                 // ShouldMarkClusterHealthyWhenCompletedCheckpointsWithOutsideWindowFromCheckpointingTimeoutTimesNbTolerableFailure
-                Arguments.of(twoMinInstant, fourMinInstant, 10L, 30L, 3, true),
+                Arguments.of(twoMinInstant, fourMinInstant, oneMin, 10L, 30L, 3, true),
                 // ShouldMarkClusterHealthyWhenNoCompletedCheckpointsWithOutsideWindowFromCheckpointingTimeoutTimesNbTolerableFailure
-                Arguments.of(tenSecInstant, fourMinInstant, 10L, 30L, 3, false));
+                Arguments.of(tenSecInstant, fourMinInstant, oneMin, 10L, 30L, 3, false));
     }
 
     @ParameterizedTest
@@ -304,13 +314,15 @@ private static Stream<Arguments> provideParametersEvaluateCheckpointing()
     public void evaluateCheckpointing(
             Instant validInstant1,
             Instant validInstant2,
+            Duration window,
             Long checkpointingInterval,
             long checkpointingTimeout,
             Integer tolerationFailureNumber,
             boolean expectedIsHealthy) {
         configuration.set(OPERATOR_CLUSTER_HEALTH_CHECK_CHECKPOINT_PROGRESS_ENABLED, true);
-        configuration.set(
-                OPERATOR_CLUSTER_HEALTH_CHECK_CHECKPOINT_PROGRESS_WINDOW, Duration.ofMinutes(1));
+        if (window != null) {
+            configuration.set(OPERATOR_CLUSTER_HEALTH_CHECK_CHECKPOINT_PROGRESS_WINDOW, window);
+        }
         if (checkpointingInterval != null) {
            configuration.set(CHECKPOINTING_INTERVAL, Duration.ofSeconds(checkpointingInterval));
        }
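
The window rule this patch introduces is easiest to see in isolation. The following standalone Java sketch mirrors the computation ClusterHealthEvaluator performs after this change; the class and method names are illustrative stand-ins rather than operator API, and the values assumed below are the Flink defaults of a 10 minute checkpoint interval, a 10 minute checkpoint timeout and 0 tolerable checkpoint failures.

import java.time.Duration;

// Standalone sketch of the progress-window rule; names are illustrative, not operator API.
public class ProgressWindowExample {

    // Effective window: the configured value, unless it is unset or shorter than
    // max(checkpointInterval, checkpointTimeout) * (tolerableCheckpointFailures + 2),
    // in which case the minimum is used, just as the patched evaluator falls back.
    static Duration effectiveWindow(
            Duration configuredWindow, // null when the window option is unset
            Duration checkpointInterval,
            Duration checkpointTimeout,
            int tolerableCheckpointFailures) {
        long factor = tolerableCheckpointFailures + 2L;
        Duration minWindow =
                Duration.ofMillis(
                        Math.max(
                                checkpointInterval.toMillis() * factor,
                                checkpointTimeout.toMillis() * factor));
        return (configuredWindow == null || configuredWindow.compareTo(minWindow) < 0)
                ? minWindow
                : configuredWindow;
    }

    public static void main(String[] args) {
        Duration interval = Duration.ofMinutes(10);
        Duration timeout = Duration.ofMinutes(10);
        // Unset window: falls back to max(10 min, 10 min) * (0 + 2) = 20 minutes.
        System.out.println(effectiveWindow(null, interval, timeout, 0)); // PT20M
        // A configured 5 minute window is below the minimum and is raised to 20 minutes.
        System.out.println(effectiveWindow(Duration.ofMinutes(5), interval, timeout, 0)); // PT20M
    }
}

With those defaults both calls print PT20M: an unset window resolves to the minimum, and a too-short configured window is raised to it, matching the example in the option description.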