[FLINK-35126] Rework default checkpoint progress check window
gyfora committed Jul 15, 2024
1 parent 2b82a93 commit cb90b10
Showing 6 changed files with 66 additions and 38 deletions.
6 changes: 3 additions & 3 deletions docs/layouts/shortcodes/generated/dynamic_section.html
@@ -22,15 +22,15 @@
</tr>
<tr>
<td><h5>kubernetes.operator.cluster.health-check.checkpoint-progress.enabled</h5></td>
<td style="word-wrap: break-word;">false</td>
<td style="word-wrap: break-word;">true</td>
<td>Boolean</td>
<td>Whether to enable checkpoint progress health check for clusters.</td>
</tr>
<tr>
<td><h5>kubernetes.operator.cluster.health-check.checkpoint-progress.window</h5></td>
<td style="word-wrap: break-word;">5 min</td>
<td style="word-wrap: break-word;">(none)</td>
<td>Duration</td>
<td>If no checkpoints are completed within the defined time window, the job is considered unhealthy. This must be bigger than checkpointing interval.</td>
<td>If no checkpoints are completed within the defined time window, the job is considered unhealthy. The minimum window size is `max(checkpointingInterval, checkpointTimeout) * (tolerableCheckpointFailures + 2)`, which also serves as the default value when checkpointing is enabled. For example with checkpoint interval 10 minutes and 0 tolerable failures, the default progress check window will be 20 minutes.</td>
</tr>
<tr>
<td><h5>kubernetes.operator.cluster.health-check.enabled</h5></td>
@@ -22,15 +22,15 @@
</tr>
<tr>
<td><h5>kubernetes.operator.cluster.health-check.checkpoint-progress.enabled</h5></td>
<td style="word-wrap: break-word;">false</td>
<td style="word-wrap: break-word;">true</td>
<td>Boolean</td>
<td>Whether to enable checkpoint progress health check for clusters.</td>
</tr>
<tr>
<td><h5>kubernetes.operator.cluster.health-check.checkpoint-progress.window</h5></td>
<td style="word-wrap: break-word;">5 min</td>
<td style="word-wrap: break-word;">(none)</td>
<td>Duration</td>
<td>If no checkpoints are completed within the defined time window, the job is considered unhealthy. This must be bigger than checkpointing interval.</td>
<td>If no checkpoints are completed within the defined time window, the job is considered unhealthy. The minimum window size is `max(checkpointingInterval, checkpointTimeout) * (tolerableCheckpointFailures + 2)`, which also serves as the default value when checkpointing is enabled. For example with checkpoint interval 10 minutes and 0 tolerable failures, the default progress check window will be 20 minutes.</td>
</tr>
<tr>
<td><h5>kubernetes.operator.cluster.health-check.enabled</h5></td>
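The new default described in both generated docs above boils down to a single formula. A minimal, self-contained sketch of that arithmetic (the class and helper names and the 5 minute timeout are illustrative, not taken from the operator code):

```java
import java.time.Duration;

public class DefaultWindowExample {

    // max(checkpointingInterval, checkpointTimeout) * (tolerableCheckpointFailures + 2),
    // as described in the option's new documentation.
    static Duration defaultCheckWindow(
            Duration checkpointingInterval, Duration checkpointTimeout, int tolerableFailures) {
        long factor = tolerableFailures + 2L;
        return Duration.ofMillis(
                Math.max(
                        checkpointingInterval.toMillis() * factor,
                        checkpointTimeout.toMillis() * factor));
    }

    public static void main(String[] args) {
        // 10 minute interval, 5 minute timeout, 0 tolerable failures => PT20M,
        // matching the "20 minutes" example in the description.
        System.out.println(defaultCheckWindow(Duration.ofMinutes(10), Duration.ofMinutes(5), 0));
    }
}
```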
@@ -502,7 +502,7 @@ public static String operatorConfigKey(String key) {
OPERATOR_CLUSTER_HEALTH_CHECK_CHECKPOINT_PROGRESS_ENABLED =
operatorConfig("cluster.health-check.checkpoint-progress.enabled")
.booleanType()
.defaultValue(false)
.defaultValue(true)
.withDescription(
"Whether to enable checkpoint progress health check for clusters.");

@@ -511,9 +511,9 @@ public static String operatorConfigKey(String key) {
OPERATOR_CLUSTER_HEALTH_CHECK_CHECKPOINT_PROGRESS_WINDOW =
operatorConfig("cluster.health-check.checkpoint-progress.window")
.durationType()
.defaultValue(Duration.ofMinutes(5))
.noDefaultValue()
.withDescription(
"If no checkpoints are completed within the defined time window, the job is considered unhealthy. This must be bigger than checkpointing interval.");
"If no checkpoints are completed within the defined time window, the job is considered unhealthy. The minimum window size is `max(checkpointingInterval, checkpointTimeout) * (tolerableCheckpointFailures + 2)`, which also serves as the default value when checkpointing is enabled. For example with checkpoint interval 10 minutes and 0 tolerable failures, the default progress check window will be 20 minutes.");

@Documentation.Section(SECTION_DYNAMIC)
public static final ConfigOption<Boolean> OPERATOR_JOB_RESTART_FAILED =
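For operators picking up these changed defaults, a hedged sketch of overriding the two options programmatically; the import path for KubernetesOperatorConfigOptions is assumed from the constant names in the diff, and the 30 minute value is only an example:

```java
import java.time.Duration;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;

public class HealthCheckConfigExample {
    public static void main(String[] args) {
        Configuration conf = new Configuration();

        // The progress check is now enabled by default; set this only to opt out.
        conf.set(
                KubernetesOperatorConfigOptions
                        .OPERATOR_CLUSTER_HEALTH_CHECK_CHECKPOINT_PROGRESS_ENABLED,
                true);

        // Optional explicit window. If left unset, the operator derives one from the
        // checkpointing settings; values below the derived minimum are bumped up to it.
        conf.set(
                KubernetesOperatorConfigOptions
                        .OPERATOR_CLUSTER_HEALTH_CHECK_CHECKPOINT_PROGRESS_WINDOW,
                Duration.ofMinutes(30));

        System.out.println(conf);
    }
}
```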
@@ -178,30 +178,46 @@ private boolean evaluateCheckpoints(
return true;
}

var completedCheckpointsCheckWindow =
configuration.get(OPERATOR_CLUSTER_HEALTH_CHECK_CHECKPOINT_PROGRESS_WINDOW);
var windowOpt =
configuration.getOptional(OPERATOR_CLUSTER_HEALTH_CHECK_CHECKPOINT_PROGRESS_WINDOW);

CheckpointConfig checkpointConfig = new CheckpointConfig();
checkpointConfig.configure(configuration);
var checkpointingInterval = checkpointConfig.getCheckpointInterval();
var checkpointingTimeout = checkpointConfig.getCheckpointTimeout();
var tolerationFailureNumber = checkpointConfig.getTolerableCheckpointFailureNumber() + 1;
var minCompletedCheckpointsCheckWindow =
Math.max(
checkpointingInterval * tolerationFailureNumber,
checkpointingTimeout * tolerationFailureNumber);
if (completedCheckpointsCheckWindow.toMillis() < minCompletedCheckpointsCheckWindow) {
LOG.warn(
"{} is not long enough. Default to max({} * {}, {} * {}): {}ms",
OPERATOR_CLUSTER_HEALTH_CHECK_CHECKPOINT_PROGRESS_WINDOW.key(),
CHECKPOINTING_INTERVAL.key(),
TOLERABLE_FAILURE_NUMBER.key(),
CHECKPOINTING_TIMEOUT.key(),
TOLERABLE_FAILURE_NUMBER.key(),
minCompletedCheckpointsCheckWindow);
completedCheckpointsCheckWindow = Duration.ofMillis(minCompletedCheckpointsCheckWindow);
var tolerationFailureNumber = checkpointConfig.getTolerableCheckpointFailureNumber() + 2;
var minCheckWindow =
Duration.ofMillis(
Math.max(
checkpointingInterval * tolerationFailureNumber,
checkpointingTimeout * tolerationFailureNumber));

if (windowOpt.isEmpty() && !checkpointConfig.isCheckpointingEnabled()) {
// If no explicit checkpoint check window is specified and checkpointing is disabled
// based on the config, we don't do anything
return true;
}

var completedCheckpointsCheckWindow =
windowOpt
.filter(
d -> {
if (d.compareTo(minCheckWindow) < 0) {
LOG.debug(
"{} is not long enough. Default to max({} * {}, {} * {}): {}",
OPERATOR_CLUSTER_HEALTH_CHECK_CHECKPOINT_PROGRESS_WINDOW
.key(),
CHECKPOINTING_INTERVAL.key(),
TOLERABLE_FAILURE_NUMBER.key(),
CHECKPOINTING_TIMEOUT.key(),
TOLERABLE_FAILURE_NUMBER.key(),
minCheckWindow);
return false;
}
return true;
})
.orElse(minCheckWindow);

if (observedClusterHealthInfo.getNumCompletedCheckpoints()
< lastValidClusterHealthInfo.getNumCompletedCheckpoints()) {
LOG.debug(
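The rewritten evaluator above resolves the effective window in three steps: derive the minimum from the checkpointing config, bail out when neither a window nor checkpointing is configured, and otherwise clamp any explicit window to that minimum. A simplified, standalone sketch of that resolution (method and parameter names are illustrative, not the evaluator's actual signature):

```java
import java.time.Duration;
import java.util.Optional;

public class WindowResolutionSketch {

    /** Empty result means the progress check is skipped entirely. */
    static Optional<Duration> resolveCheckWindow(
            Optional<Duration> configuredWindow,
            boolean checkpointingEnabled,
            Duration checkpointingInterval,
            Duration checkpointTimeout,
            int tolerableCheckpointFailures) {

        long factor = tolerableCheckpointFailures + 2L;
        Duration minWindow =
                Duration.ofMillis(
                        Math.max(
                                checkpointingInterval.toMillis() * factor,
                                checkpointTimeout.toMillis() * factor));

        if (configuredWindow.isEmpty() && !checkpointingEnabled) {
            // No explicit window and no checkpointing configured: nothing to check.
            return Optional.empty();
        }

        // An explicit window shorter than the derived minimum falls back to the minimum.
        return Optional.of(
                configuredWindow.filter(w -> w.compareTo(minWindow) >= 0).orElse(minWindow));
    }

    public static void main(String[] args) {
        // Explicit 1 minute window, 10 minute interval, 5 minute timeout, 0 tolerable
        // failures: the window is bumped up to the derived 20 minute minimum.
        System.out.println(
                resolveCheckWindow(
                        Optional.of(Duration.ofMinutes(1)),
                        true,
                        Duration.ofMinutes(10),
                        Duration.ofMinutes(5),
                        0));
    }
}
```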
@@ -138,7 +138,7 @@ public void verifyApplicationNoCompletedCheckpointsJmRecovery(
// Ensure the last savepoint has been taken more than 10 minutes ago (Default checkpoint
// interval)
clusterHealthInfo.setNumCompletedCheckpointsIncreasedTimeStamp(
clusterHealthInfo.getNumCompletedCheckpointsIncreasedTimeStamp() - 600000);
clusterHealthInfo.getNumCompletedCheckpointsIncreasedTimeStamp() - 1200000);
setLastValidClusterHealthInfo(appCluster.getStatus().getClusterInfo(), clusterHealthInfo);
testController.getStatusRecorder().patchAndCacheStatus(appCluster, kubernetesClient);
testController.reconcile(appCluster, context);
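The updated offset in the test above is just the reworked default window in milliseconds; a quick check of the arithmetic, assuming the 10 minute checkpoint interval that the test's comment refers to and 0 tolerable failures:

```java
import java.time.Duration;

public class OffsetArithmetic {
    public static void main(String[] args) {
        // 10 min * (0 tolerable failures + 2) = 20 min = 1,200,000 ms,
        // so the timestamp must be pushed back by at least that much.
        long offsetMs = Duration.ofMinutes(10).toMillis() * (0 + 2);
        System.out.println(offsetMs); // 1200000
    }
}
```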
@@ -159,6 +159,8 @@ public void evaluateShouldOverwriteRestartCountWhenTimestampIsOutOfWindow() {
@Test
public void evaluateShouldOverwriteCompletedCheckpointCountWhenLess() {
configuration.set(OPERATOR_CLUSTER_HEALTH_CHECK_CHECKPOINT_PROGRESS_ENABLED, true);
configuration.set(
OPERATOR_CLUSTER_HEALTH_CHECK_CHECKPOINT_PROGRESS_WINDOW, Duration.ofMinutes(5));
var observedClusterHealthInfo1 = createClusterHealthInfo(validInstant1, 0, 1);
var observedClusterHealthInfo2 = createClusterHealthInfo(validInstant2, 0, 0);

@@ -278,39 +280,49 @@ private static Stream<Arguments> provideParametersEvaluateCheckpointing() {
Instant tenSecInstant = ofEpochSecond(10);
Instant twoMinInstant = ofEpochSecond(120);
Instant fourMinInstant = twoMinInstant.plus(2, ChronoUnit.MINUTES);

Duration oneMin = Duration.ofMinutes(1);
return Stream.of(
// ShouldMarkClusterUnhealthyWhenNoCompletedCheckpointsOutsideWindow
Arguments.of(twoMinInstant, fourMinInstant, 30L, 30L, null, false),
Arguments.of(twoMinInstant, fourMinInstant, oneMin, 30L, 30L, null, false),
// Verify checkpoint progress even if checkpointing not configured
Arguments.of(twoMinInstant, fourMinInstant, oneMin, null, 30L, null, false),
// Verify default window if not explicitly configured
Arguments.of(twoMinInstant, fourMinInstant, null, 30L, 30L, null, false),
// Verify check is off if both window and checkpointing is not configured
Arguments.of(twoMinInstant, fourMinInstant, null, null, 30L, null, true),
// ShouldMarkClusterHealthyWhenCompletedCheckpointsWithOutsideWindowFromCheckpointInterval
Arguments.of(twoMinInstant, fourMinInstant, 120L, 30L, null, true),
Arguments.of(twoMinInstant, fourMinInstant, oneMin, 60L, 30L, null, true),
// ShouldMarkClusterUnhealthyWhenNoCompletedCheckpointsWithOutsideWindowFromCheckpointInterval
Arguments.of(tenSecInstant, fourMinInstant, 120L, 30L, null, false),
Arguments.of(tenSecInstant, fourMinInstant, oneMin, 60L, 30L, null, false),
// ShouldMarkClusterHealthyWhenCompletedCheckpointsWithOutsideWindowFromCheckpointIntervalTimesNbTolerableFailure
Arguments.of(twoMinInstant, fourMinInstant, 30L, 10L, 3, true),
Arguments.of(twoMinInstant, fourMinInstant, oneMin, 30L, 10L, 3, true),
// ShouldMarkClusterHealthyWhenNoCompletedCheckpointsWithOutsideWindowFromCheckpointIntervalTimesNbTolerableFailure
Arguments.of(tenSecInstant, fourMinInstant, 30L, 10L, 3, false),
Arguments.of(tenSecInstant, fourMinInstant, oneMin, 30L, 10L, 3, false),
// ShouldMarkClusterHealthyWhenCompletedCheckpointsWithOutsideWindowFromCheckpointingTimeout
Arguments.of(twoMinInstant, fourMinInstant, 30L, 120L, null, true),
Arguments.of(twoMinInstant, fourMinInstant, oneMin, 30L, 60L, null, true),
// ShouldMarkClusterHealthyWhenNoCompletedCheckpointsWithOutsideWindowFromCheckpointingTimeout
Arguments.of(tenSecInstant, fourMinInstant, 30L, 120L, null, false),
Arguments.of(tenSecInstant, fourMinInstant, oneMin, 30L, 60L, null, false),
// ShouldMarkClusterHealthyWhenCompletedCheckpointsWithOutsideWindowFromCheckpointingTimeoutTimesNbTolerableFailure
Arguments.of(twoMinInstant, fourMinInstant, 10L, 30L, 3, true),
Arguments.of(twoMinInstant, fourMinInstant, oneMin, 10L, 30L, 3, true),
// ShouldMarkClusterHealthyWhenNoCompletedCheckpointsWithOutsideWindowFromCheckpointingTimeoutTimesNbTolerableFailure
Arguments.of(tenSecInstant, fourMinInstant, 10L, 30L, 3, false));
Arguments.of(tenSecInstant, fourMinInstant, oneMin, 10L, 30L, 3, false));
}

@ParameterizedTest
@MethodSource("provideParametersEvaluateCheckpointing")
public void evaluateCheckpointing(
Instant validInstant1,
Instant validInstant2,
Duration window,
Long checkpointingInterval,
long checkpointingTimeout,
Integer tolerationFailureNumber,
boolean expectedIsHealthy) {
configuration.set(OPERATOR_CLUSTER_HEALTH_CHECK_CHECKPOINT_PROGRESS_ENABLED, true);
configuration.set(
OPERATOR_CLUSTER_HEALTH_CHECK_CHECKPOINT_PROGRESS_WINDOW, Duration.ofMinutes(1));
if (window != null) {
configuration.set(OPERATOR_CLUSTER_HEALTH_CHECK_CHECKPOINT_PROGRESS_WINDOW, window);
}
if (checkpointingInterval != null) {
configuration.set(CHECKPOINTING_INTERVAL, Duration.ofSeconds(checkpointingInterval));
}
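A rough reading of one of the new rows above, `Arguments.of(twoMinInstant, fourMinInstant, null, 30L, 30L, null, false)` (no explicit window, 30 second interval and timeout, default tolerable failures): the derived window should be 60 seconds, shorter than the 2 minute gap between the two instants, so the expected result is unhealthy. Treating the instants as the last-progress and observation timestamps is an assumption based on the surrounding test code:

```java
import java.time.Duration;
import java.time.Instant;

public class RowWalkthrough {
    public static void main(String[] args) {
        // Derived window: max(30 s interval, 30 s timeout) * (0 tolerable failures + 2) = 60 s.
        Duration derivedWindow = Duration.ofSeconds(Math.max(30, 30) * (0 + 2));

        // Gap between twoMinInstant (t=120 s) and fourMinInstant (t=240 s) is 2 minutes.
        Duration gap = Duration.between(Instant.ofEpochSecond(120), Instant.ofEpochSecond(240));

        // The gap exceeds the window, consistent with the row's expected "false" (unhealthy).
        System.out.println(gap.compareTo(derivedWindow) > 0); // true
    }
}
```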
