Skip to content

Commit

Permalink
[FLINK-35493][snapshot] Add historical cleanup for FlinkStateSnapshot…
Browse files Browse the repository at this point in the history
… CRs
  • Loading branch information
mateczagany authored Aug 20, 2024
1 parent d03b816 commit c8a6ca8
Show file tree
Hide file tree
Showing 17 changed files with 1,041 additions and 312 deletions.
39 changes: 27 additions & 12 deletions docs/content/docs/custom-resource/snapshots.md
Original file line number Diff line number Diff line change
Expand Up @@ -168,28 +168,43 @@ There is no guarantee on the timely execution of the periodic snapshots as they
The operator automatically keeps track of the snapshot history triggered by upgrade, manual and periodic snapshot operations.
This is necessary so cleanup can be performed by the operator for old snapshots.

Users can control the cleanup behaviour by specifying a maximum age and maximum count for the savepoint and checkpoint resources in the history.
{{< hint info >}}
Snapshot cleanup happens lazily and only when the Flink resource associated with the snapshot is running.
It is therefore very likely that savepoints live beyond the max age configuration.
{{< /hint >}}

#### Savepoints

Users can control the cleanup behaviour by specifying maximum age and maximum count for savepoints.
If a max age is specified, FlinkStateSnapshot resources of savepoint type will be cleaned up based on the `metadata.creationTimestamp` field.
Snapshots will be cleaned up regardless of their status, but the operator will always keep at least 1 completed FlinkStateSnapshot for every Flink job at all time.

Example configuration:
```
kubernetes.operator.savepoint.history.max.age: 24 h
kubernetes.operator.savepoint.history.max.count: 5

kubernetes.operator.checkpoint.history.max.age: 24 h
kubernetes.operator.checkpoint.history.max.count: 5
```

To also dispose of savepoint data on savepoint cleanup, set `kubernetes.operator.savepoint.dispose-on-delete: true`.
This config will set `spec.savepoint.disposeOnDelete` to true for FlinkStateSnapshot CRs created by upgrade, periodic and manual savepoints created using `savepointTriggerNonce`.

To disable automatic savepoint cleanup by the operator you can set `kubernetes.operator.savepoint.cleanup.enabled: false`.

#### Checkpoints

FlinkStateSnapshots of checkpoint type will always be cleaned up. It's not possible to set max age for them.
The maxmimum amount of checkpoint resources retained will be deteremined by the Flink configuration `state.checkpoints.num-retained`.

{{< hint warning >}}
Checkpoint history history cleanup is only supported if FlinkStateSnapshot resources are enabled.
Checkpoint cleanup is only supported if FlinkStateSnapshot resources are enabled.
This operation will only delete the FlinkStateSnapshot CR, and will never delete any checkpoint data on the filesystem.
{{< /hint >}}

{{< hint info >}}
Savepoint cleanup happens lazily and only when the Flink resource associated with the snapshot is running.
It is therefore very likely that savepoints live beyond the max age configuration.
{{< /hint >}}

To also dispose of savepoint data on savepoint cleanup, set `kubernetes.operator.savepoint.dispose-on-delete: true`.
This config will set `spec.savepoint.disposeOnDelete` to true for FlinkStateSnapshot CRs created by periodic savepoints and manual ones created using `savepointTriggerNonce`.
### Snapshot History For Legacy Savepoints

To disable savepoint/checkpoint cleanup by the operator you can set `kubernetes.operator.savepoint.cleanup.enabled: false` and `kubernetes.operator.checkpoint.cleanup.enabled: false`.
Legacy savepoints found in FlinkDeployment/FlinkSessionJob CRs under the deprecated `status.jobStatus.savepointInfo.savepointHistory` will be cleaned up:
- For max age, it will be cleaned up when its trigger timestamp exceeds max age
- For max count and FlinkStateSnapshot resources **disabled**, it will be cleaned up when `savepointHistory` exceeds max count
- For max count and FlinkStateSnapshot resources **enabled**, it will be cleaned up when `savepointHistory` + number of FlinkStateSnapshot CRs related to the job exceed max count

6 changes: 3 additions & 3 deletions docs/layouts/shortcodes/generated/dynamic_section.html
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@
<td><h5>kubernetes.operator.savepoint.cleanup.enabled</h5></td>
<td style="word-wrap: break-word;">true</td>
<td>Boolean</td>
<td>Whether to enable clean up of savepoint history.</td>
<td>Whether to enable clean up of savepoint FlinkStateSnapshot resources. Savepoint state will be disposed of as well if the snapshot CR spec is configured as such. For automatic savepoints this can be configured via the kubernetes.operator.savepoint.dispose-on-delete config option.</td>
</tr>
<tr>
<td><h5>kubernetes.operator.savepoint.dispose-on-delete</h5></td>
Expand All @@ -174,13 +174,13 @@
<td><h5>kubernetes.operator.savepoint.history.max.age</h5></td>
<td style="word-wrap: break-word;">1 d</td>
<td>Duration</td>
<td>Maximum age for savepoint history entries to retain. Due to lazy clean-up, the most recent savepoint may live longer than the max age.</td>
<td>Maximum age for savepoint FlinkStateSnapshot resources to retain. Due to lazy clean-up, the most recent savepoint may live longer than the max age.</td>
</tr>
<tr>
<td><h5>kubernetes.operator.savepoint.history.max.count</h5></td>
<td style="word-wrap: break-word;">10</td>
<td>Integer</td>
<td>Maximum number of savepoint history entries to retain.</td>
<td>Maximum number of savepoint FlinkStateSnapshot resources entries to retain.</td>
</tr>
<tr>
<td><h5>kubernetes.operator.savepoint.trigger.grace-period</h5></td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,7 @@
<td><h5>kubernetes.operator.savepoint.cleanup.enabled</h5></td>
<td style="word-wrap: break-word;">true</td>
<td>Boolean</td>
<td>Whether to enable clean up of savepoint history.</td>
<td>Whether to enable clean up of savepoint FlinkStateSnapshot resources. Savepoint state will be disposed of as well if the snapshot CR spec is configured as such. For automatic savepoints this can be configured via the kubernetes.operator.savepoint.dispose-on-delete config option.</td>
</tr>
<tr>
<td><h5>kubernetes.operator.savepoint.dispose-on-delete</h5></td>
Expand All @@ -372,25 +372,25 @@
<td><h5>kubernetes.operator.savepoint.history.max.age</h5></td>
<td style="word-wrap: break-word;">1 d</td>
<td>Duration</td>
<td>Maximum age for savepoint history entries to retain. Due to lazy clean-up, the most recent savepoint may live longer than the max age.</td>
<td>Maximum age for savepoint FlinkStateSnapshot resources to retain. Due to lazy clean-up, the most recent savepoint may live longer than the max age.</td>
</tr>
<tr>
<td><h5>kubernetes.operator.savepoint.history.max.age.threshold</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>Duration</td>
<td>Maximum age threshold for savepoint history entries to retain.</td>
<td>Maximum age threshold for FlinkStateSnapshot resources to retain.</td>
</tr>
<tr>
<td><h5>kubernetes.operator.savepoint.history.max.count</h5></td>
<td style="word-wrap: break-word;">10</td>
<td>Integer</td>
<td>Maximum number of savepoint history entries to retain.</td>
<td>Maximum number of savepoint FlinkStateSnapshot resources entries to retain.</td>
</tr>
<tr>
<td><h5>kubernetes.operator.savepoint.history.max.count.threshold</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>Integer</td>
<td>Maximum number threshold of savepoint history entries to retain.</td>
<td>Maximum number threshold of savepoint FlinkStateSnapshot resources to retain.</td>
</tr>
<tr>
<td><h5>kubernetes.operator.savepoint.trigger.grace-period</h5></td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,13 +84,13 @@
<td><h5>kubernetes.operator.savepoint.history.max.age.threshold</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>Duration</td>
<td>Maximum age threshold for savepoint history entries to retain.</td>
<td>Maximum age threshold for FlinkStateSnapshot resources to retain.</td>
</tr>
<tr>
<td><h5>kubernetes.operator.savepoint.history.max.count.threshold</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>Integer</td>
<td>Maximum number threshold of savepoint history entries to retain.</td>
<td>Maximum number threshold of savepoint FlinkStateSnapshot resources to retain.</td>
</tr>
<tr>
<td><h5>kubernetes.operator.startup.stop-on-informer-error</h5></td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,4 +60,14 @@ public static String kubernetes(Instant instant) {
ZonedDateTime dateTime = instant.atZone(ZoneId.systemDefault());
return dateTime.format(DateTimeFormatter.ISO_INSTANT);
}

/**
* Parses a Kubernetes-compatible datetime.
*
* @param datetime datetime in Kubernetes format
* @return time parsed
*/
public static Instant parseKubernetes(String datetime) {
return Instant.parse(datetime);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ public static FlinkOperatorConfiguration fromConfiguration(Configuration operato
operatorConfig.get(
KubernetesOperatorConfigOptions
.OPERATOR_SAVEPOINT_HISTORY_MAX_AGE_THRESHOLD);

Boolean exceptionStackTraceEnabled =
operatorConfig.get(
KubernetesOperatorConfigOptions.OPERATOR_EXCEPTION_STACK_TRACE_ENABLED);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,35 +210,70 @@ public static String operatorConfigKey(String key) {
.withDescription(
"Whether to enable recovery of missing/deleted jobmanager deployments.");

@Documentation.Section(SECTION_DYNAMIC)
public static final ConfigOption<Boolean> OPERATOR_JOB_SAVEPOINT_DISPOSE_ON_DELETE =
operatorConfig("savepoint.dispose-on-delete")
.booleanType()
.defaultValue(false)
.withDescription(
"Savepoint data for FlinkStateSnapshot resources created by the operator during upgrades and periodic savepoints will be disposed of automatically when the generated Kubernetes resource is deleted.");

@Documentation.Section(SECTION_DYNAMIC)
public static final ConfigOption<SavepointFormatType> OPERATOR_SAVEPOINT_FORMAT_TYPE =
operatorConfig("savepoint.format.type")
.enumType(SavepointFormatType.class)
.defaultValue(SavepointFormatType.DEFAULT)
.withDescription(
"Type of the binary format in which a savepoint should be taken.");

@Documentation.Section(SECTION_DYNAMIC)
public static final ConfigOption<CheckpointType> OPERATOR_CHECKPOINT_TYPE =
operatorConfig("checkpoint.type")
.enumType(CheckpointType.class)
.defaultValue(CheckpointType.FULL)
.withDescription("Type of checkpoint.");

@Documentation.Section(SECTION_DYNAMIC)
public static final ConfigOption<Boolean> OPERATOR_SAVEPOINT_CLEANUP_ENABLED =
operatorConfig("savepoint.cleanup.enabled")
.booleanType()
.defaultValue(true)
.withDescription("Whether to enable clean up of savepoint history.");
.withDescription(
String.format(
"Whether to enable clean up of savepoint FlinkStateSnapshot resources. Savepoint state will be disposed of as well if the snapshot CR spec is configured as such. For automatic savepoints this can be configured via the %s config option.",
OPERATOR_JOB_SAVEPOINT_DISPOSE_ON_DELETE.key()));

@Documentation.Section(SECTION_DYNAMIC)
public static final ConfigOption<Integer> OPERATOR_SAVEPOINT_HISTORY_MAX_COUNT =
operatorConfig("savepoint.history.max.count")
.intType()
.defaultValue(10)
.withDescription("Maximum number of savepoint history entries to retain.");
.withDescription(
"Maximum number of savepoint FlinkStateSnapshot resources entries to retain.");

@Documentation.Section(SECTION_ADVANCED)
public static final ConfigOption<Integer> OPERATOR_SAVEPOINT_HISTORY_MAX_COUNT_THRESHOLD =
ConfigOptions.key(OPERATOR_SAVEPOINT_HISTORY_MAX_COUNT.key() + ".threshold")
.intType()
.noDefaultValue()
.withDescription(
"Maximum number threshold of savepoint history entries to retain.");
"Maximum number threshold of savepoint FlinkStateSnapshot resources to retain.");

@Documentation.Section(SECTION_DYNAMIC)
public static final ConfigOption<Duration> OPERATOR_SAVEPOINT_HISTORY_MAX_AGE =
operatorConfig("savepoint.history.max.age")
.durationType()
.defaultValue(Duration.ofHours(24))
.withDescription(
"Maximum age for savepoint history entries to retain. Due to lazy clean-up, the most recent savepoint may live longer than the max age.");
"Maximum age for savepoint FlinkStateSnapshot resources to retain. Due to lazy clean-up, the most recent savepoint may live longer than the max age.");

@Documentation.Section(SECTION_ADVANCED)
public static final ConfigOption<Duration> OPERATOR_SAVEPOINT_HISTORY_MAX_AGE_THRESHOLD =
ConfigOptions.key(OPERATOR_SAVEPOINT_HISTORY_MAX_AGE.key() + ".threshold")
.durationType()
.noDefaultValue()
.withDescription(
"Maximum age threshold for FlinkStateSnapshot resources to retain.");

@Documentation.Section(SECTION_SYSTEM)
public static final ConfigOption<Boolean> OPERATOR_EXCEPTION_STACK_TRACE_ENABLED =
Expand Down Expand Up @@ -280,14 +315,6 @@ public static String operatorConfigKey(String key) {
.withDescription(
"Key-Value pair where key is the REGEX to filter through the exception messages and value is the string to be included in CR status error label field if the REGEX matches. Expected format: headerKey1:headerValue1,headerKey2:headerValue2.");

@Documentation.Section(SECTION_ADVANCED)
public static final ConfigOption<Duration> OPERATOR_SAVEPOINT_HISTORY_MAX_AGE_THRESHOLD =
ConfigOptions.key(OPERATOR_SAVEPOINT_HISTORY_MAX_AGE.key() + ".threshold")
.durationType()
.noDefaultValue()
.withDescription(
"Maximum age threshold for savepoint history entries to retain.");

@Documentation.Section(SECTION_DYNAMIC)
public static final ConfigOption<Map<String, String>> JAR_ARTIFACT_HTTP_HEADER =
operatorConfig("user.artifacts.http.header")
Expand Down Expand Up @@ -438,29 +465,6 @@ public static String operatorConfigKey(String key) {
.withDescription(
"Max allowed checkpoint age for initiating last-state upgrades on running jobs. If a checkpoint is not available within the desired age (and nothing in progress) a savepoint will be triggered.");

@Documentation.Section(SECTION_DYNAMIC)
public static final ConfigOption<Boolean> OPERATOR_JOB_SAVEPOINT_DISPOSE_ON_DELETE =
operatorConfig("savepoint.dispose-on-delete")
.booleanType()
.defaultValue(false)
.withDescription(
"Savepoint data for FlinkStateSnapshot resources created by the operator during upgrades and periodic savepoints will be disposed of automatically when the generated Kubernetes resource is deleted.");

@Documentation.Section(SECTION_DYNAMIC)
public static final ConfigOption<SavepointFormatType> OPERATOR_SAVEPOINT_FORMAT_TYPE =
operatorConfig("savepoint.format.type")
.enumType(SavepointFormatType.class)
.defaultValue(SavepointFormatType.DEFAULT)
.withDescription(
"Type of the binary format in which a savepoint should be taken.");

@Documentation.Section(SECTION_DYNAMIC)
public static final ConfigOption<CheckpointType> OPERATOR_CHECKPOINT_TYPE =
operatorConfig("checkpoint.type")
.enumType(CheckpointType.class)
.defaultValue(CheckpointType.FULL)
.withDescription("Type of checkpoint.");

@Documentation.Section(SECTION_ADVANCED)
public static final ConfigOption<Boolean> OPERATOR_HEALTH_PROBE_ENABLED =
operatorConfig("health.probe.enabled")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.flink.api.common.JobStatus;
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
import org.apache.flink.kubernetes.operator.api.FlinkStateSnapshot;
import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus;
import org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus;
import org.apache.flink.kubernetes.operator.exception.DeploymentFailedException;
Expand All @@ -31,6 +32,7 @@
import org.apache.flink.kubernetes.operator.service.FlinkResourceContextFactory;
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
import org.apache.flink.kubernetes.operator.utils.EventSourceUtils;
import org.apache.flink.kubernetes.operator.utils.KubernetesClientUtils;
import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
import org.apache.flink.kubernetes.operator.utils.ValidatorUtils;
import org.apache.flink.kubernetes.operator.validation.FlinkResourceValidator;
Expand All @@ -49,6 +51,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
Expand Down Expand Up @@ -197,9 +201,19 @@ private void handleRecoveryFailed(
@Override
public Map<String, EventSource> prepareEventSources(
EventSourceContext<FlinkDeployment> context) {
return EventSourceInitializer.nameEventSources(
EventSourceUtils.getSessionJobInformerEventSource(context),
EventSourceUtils.getDeploymentInformerEventSource(context));
List<EventSource> eventSources = new ArrayList<>();
eventSources.add(EventSourceUtils.getSessionJobInformerEventSource(context));
eventSources.add(EventSourceUtils.getDeploymentInformerEventSource(context));

if (KubernetesClientUtils.isCrdInstalled(FlinkStateSnapshot.class)) {
eventSources.add(
EventSourceUtils.getStateSnapshotForFlinkResourceInformerEventSource(context));
} else {
LOG.warn(
"Could not initialize informer for snapshots as the CRD has not been installed!");
}

return EventSourceInitializer.nameEventSources(eventSources.toArray(EventSource[]::new));
}

@Override
Expand Down
Loading

0 comments on commit c8a6ca8

Please sign in to comment.