[FLINK-35265] Implement FlinkStateSnapshot custom resource
Mate Czagany authored and mateczagany committed May 30, 2024
1 parent 454ac6e commit b7c7644
Showing 97 changed files with 4,567 additions and 606 deletions.
95 changes: 94 additions & 1 deletion docs/content/docs/custom-resource/reference.md
@@ -38,6 +38,15 @@ This page serves as a full reference for FlinkDeployment custom resource definit

## Spec

### CheckpointSpec
**Class**: org.apache.flink.kubernetes.operator.api.spec.CheckpointSpec

**Description**: Spec for checkpoint state snapshots.

| Parameter | Type | Docs |
| ----------| ---- | ---- |
| checkpointType | org.apache.flink.kubernetes.operator.api.status.CheckpointType | Type of checkpoint to take. |

### FlinkDeploymentSpec
**Class**: org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec

@@ -71,6 +80,29 @@ This page serves as a full reference for FlinkDeployment custom resource definit
| flinkConfiguration | java.util.Map<java.lang.String,java.lang.String> | Flink configuration overrides for the Flink deployment or Flink session job. |
| deploymentName | java.lang.String | The name of the target session cluster deployment. |

### FlinkStateSnapshotReference
**Class**: org.apache.flink.kubernetes.operator.api.spec.FlinkStateSnapshotReference

**Description**: Reference for a FlinkStateSnapshot.

| Parameter | Type | Docs |
| ----------| ---- | ---- |
| namespace | java.lang.String | Namespace of the snapshot resource. |
| name | java.lang.String | Name of the snapshot resource. |
| path | java.lang.String | If a path is given, all other fields will be ignored, and this will be used as the initial savepoint path. |
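
The precedence described in the `path` row can be sketched in plain Java. This is an illustration only, not the operator's code: `SnapshotReferenceSketch` and `directPath` are hypothetical names, and treating a blank path the same as a missing one is an assumption.

```java
import java.util.Optional;

/** Hypothetical illustration of the documented precedence; not operator code. */
final class SnapshotReferenceSketch {

    /**
     * Returns the savepoint path to restore from when one is given directly.
     * A non-blank path wins and namespace/name are ignored; an empty result
     * means the FlinkStateSnapshot resource would have to be looked up instead.
     */
    static Optional<String> directPath(String namespace, String name, String path) {
        // Assumption: a blank path is treated like "not given".
        if (path != null && !path.isBlank()) {
            return Optional.of(path);
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        System.out.println(directPath("default", "snap-1", "s3://bucket/sp-1"));
        System.out.println(directPath("default", "snap-1", null));
    }
}
```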

### FlinkStateSnapshotSpec
**Class**: org.apache.flink.kubernetes.operator.api.spec.FlinkStateSnapshotSpec

**Description**: Spec that describes a FlinkStateSnapshot.

| Parameter | Type | Docs |
| ----------| ---- | ---- |
| jobReference | org.apache.flink.kubernetes.operator.api.spec.JobReference | Source to take a snapshot of. |
| savepoint | org.apache.flink.kubernetes.operator.api.spec.SavepointSpec | Spec in case of savepoint. |
| checkpoint | org.apache.flink.kubernetes.operator.api.spec.CheckpointSpec | Spec in case of checkpoint. |
| backoffLimit | int | Maximum number of retries before the snapshot is considered failed. Set to -1 for unlimited or 0 for no retries. |
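
One way to read the `backoffLimit` values is the check below. It is a sketch under an assumption, namely that a limit of N retries allows N + 1 attempts in total; the operator's actual comparison may differ, and `BackoffLimitSketch` and `mayRetry` are hypothetical names.

```java
/** Hypothetical sketch of the documented backoffLimit semantics; not operator code. */
final class BackoffLimitSketch {

    /**
     * Decides whether another attempt is allowed after {@code failures} failed
     * attempts. -1 means unlimited retries, 0 means the first failure is final.
     */
    static boolean mayRetry(int failures, int backoffLimit) {
        if (backoffLimit == -1) {
            return true; // unlimited retries
        }
        // Assumption: N retries allow N + 1 attempts in total.
        return failures <= backoffLimit;
    }

    public static void main(String[] args) {
        System.out.println(mayRetry(1, 0));   // first failure, no retries configured
        System.out.println(mayRetry(2, 2));   // second failure, two retries configured
        System.out.println(mayRetry(50, -1)); // unlimited retries
    }
}
```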

### FlinkVersion
**Class**: org.apache.flink.kubernetes.operator.api.spec.FlinkVersion

@@ -99,6 +131,16 @@ This page serves as a full reference for FlinkDeployment custom resource definit
| labels | java.util.Map<java.lang.String,java.lang.String> | Ingress labels. |
| tls | java.util.List<io.fabric8.kubernetes.api.model.networking.v1.IngressTLS> | Ingress tls. |

### JobKind
**Class**: org.apache.flink.kubernetes.operator.api.spec.JobKind

**Description**: Describes the Kubernetes kind of job reference.

| Value | Docs |
| ----- | ---- |
| FlinkDeployment | FlinkDeployment CR kind. |
| FlinkSessionJob | FlinkSessionJob CR kind. |

### JobManagerSpec
**Class**: org.apache.flink.kubernetes.operator.api.spec.JobManagerSpec

@@ -110,6 +152,16 @@ This page serves as a full reference for FlinkDeployment custom resource definit
| replicas | int | Number of JobManager replicas. Must be 1 for non-HA deployments. |
| podTemplate | io.fabric8.kubernetes.api.model.PodTemplateSpec | JobManager pod template. It will be merged with FlinkDeploymentSpec.podTemplate. |

### JobReference
**Class**: org.apache.flink.kubernetes.operator.api.spec.JobReference

**Description**: Flink resource reference that can be a FlinkDeployment or FlinkSessionJob.

| Parameter | Type | Docs |
| ----------| ---- | ---- |
| kind | org.apache.flink.kubernetes.operator.api.spec.JobKind | Kind of the Flink resource, FlinkDeployment or FlinkSessionJob. |
| name | java.lang.String | Name of the Flink resource. |

### JobSpec
**Class**: org.apache.flink.kubernetes.operator.api.spec.JobSpec

@@ -124,10 +176,11 @@ This page serves as a full reference for FlinkDeployment custom resource definit
| state | org.apache.flink.kubernetes.operator.api.spec.JobState | Desired state for the job. |
| savepointTriggerNonce | java.lang.Long | Nonce used to manually trigger savepoint for the running job. In order to trigger a savepoint, change the number to a different non-null value. |
| initialSavepointPath | java.lang.String | Savepoint path used by the job the first time it is deployed or during savepoint redeployments (triggered by changing the savepointRedeployNonce). |
| flinkStateSnapshotReference | org.apache.flink.kubernetes.operator.api.spec.FlinkStateSnapshotReference | Snapshot reference used by the job the first time it is deployed or during savepoint redeployments (triggered by changing the savepointRedeployNonce). |
| checkpointTriggerNonce | java.lang.Long | Nonce used to manually trigger checkpoint for the running job. In order to trigger a checkpoint, change the number to a different non-null value. |
| upgradeMode | org.apache.flink.kubernetes.operator.api.spec.UpgradeMode | Upgrade mode of the Flink job. |
| allowNonRestoredState | java.lang.Boolean | Allow checkpoint state that cannot be mapped to any job vertex in tasks. |
| savepointRedeployNonce | java.lang.Long | Nonce used to trigger a full redeployment of the job from the savepoint path specified in initialSavepointPath. In order to trigger redeployment, change the number to a different non-null value. Rollback is not possible after redeployment. |
| savepointRedeployNonce | java.lang.Long | Nonce used to trigger a full redeployment of the job from the savepoint specified in initialSavepointPath or flinkStateSnapshotReference. In order to trigger redeployment, change the number to a different non-null value. Rollback is not possible after redeployment. |
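
The nonce fields in the table above share one rule: an action fires when the value changes to a different non-null number. A minimal sketch of that comparison follows. `TriggerNonceSketch`, `shouldTrigger`, and the idea of a stored "last handled" nonce are assumptions made for illustration, not the operator's implementation.

```java
import java.util.Objects;

/** Hypothetical sketch of the "different non-null value" rule; not operator code. */
final class TriggerNonceSketch {

    /**
     * Returns true when the nonce in the spec is set and differs from the nonce
     * that was last acted on (which may be null if nothing was triggered yet).
     */
    static boolean shouldTrigger(Long specNonce, Long lastHandledNonce) {
        return specNonce != null && !Objects.equals(specNonce, lastHandledNonce);
    }

    public static void main(String[] args) {
        System.out.println(shouldTrigger(2L, 1L));   // changed value
        System.out.println(shouldTrigger(1L, 1L));   // unchanged value
        System.out.println(shouldTrigger(null, 1L)); // nonce removed from the spec
    }
}
```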

### JobState
**Class**: org.apache.flink.kubernetes.operator.api.spec.JobState
@@ -160,6 +213,18 @@ This page serves as a full reference for FlinkDeployment custom resource definit
| memory | java.lang.String | Amount of memory allocated to the pod. Example: 1024m, 1g |
| ephemeralStorage | java.lang.String | Amount of ephemeral storage allocated to the pod. Example: 1024m, 2G |

### SavepointSpec
**Class**: org.apache.flink.kubernetes.operator.api.spec.SavepointSpec

**Description**: Spec for savepoint state snapshots.

| Parameter | Type | Docs |
| ----------| ---- | ---- |
| path | java.lang.String | Optional path for the savepoint. |
| formatType | org.apache.flink.kubernetes.operator.api.status.SavepointFormatType | Savepoint format to use. |
| disposeOnDelete | java.lang.Boolean | Dispose the savepoints upon CR deletion. |
| alreadyExists | java.lang.Boolean | Indicates that the savepoint already exists on the given path. The Operator will not trigger any new savepoints, just update the status of the resource as a completed snapshot. |
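
The effect of `alreadyExists` can be pictured as a choice of starting state. The sketch below is hypothetical: the state names mirror the FlinkStateSnapshotState values in this reference, but `SavepointSpecSketch`, `initialState`, and the rejection of a missing path are assumptions rather than confirmed operator behavior.

```java
/** Hypothetical sketch of how alreadyExists could map to a state; not operator code. */
final class SavepointSpecSketch {

    enum InitialState { TRIGGER_PENDING, COMPLETED }

    /**
     * With alreadyExists set, no savepoint is triggered and the resource is
     * simply recorded as completed at the given path.
     */
    static InitialState initialState(String path, boolean alreadyExists) {
        if (!alreadyExists) {
            return InitialState.TRIGGER_PENDING; // a new savepoint has to be taken
        }
        // Assumption: an existing savepoint needs a path to point at.
        if (path == null || path.isBlank()) {
            throw new IllegalArgumentException("alreadyExists requires a path");
        }
        return InitialState.COMPLETED;
    }

    public static void main(String[] args) {
        System.out.println(initialState("s3://bucket/sp-1", true));
        System.out.println(initialState(null, false));
    }
}
```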

### TaskManagerSpec
**Class**: org.apache.flink.kubernetes.operator.api.spec.TaskManagerSpec

@@ -275,6 +340,34 @@ This page serves as a full reference for FlinkDeployment custom resource definit
| lifecycleState | org.apache.flink.kubernetes.operator.api.lifecycle.ResourceLifecycleState | Lifecycle state of the Flink resource (including being rolled back, failed etc.). |
| reconciliationStatus | org.apache.flink.kubernetes.operator.api.status.FlinkSessionJobReconciliationStatus | Status of the last reconcile operation. |

### FlinkStateSnapshotState
**Class**: org.apache.flink.kubernetes.operator.api.status.FlinkStateSnapshotState

**Description**: Describes current snapshot state.

| Value | Docs |
| ----- | ---- |
| COMPLETED | Snapshot was successful and available. |
| FAILED | Error during snapshot. |
| IN_PROGRESS | Snapshot in progress. |
| TRIGGER_PENDING | Not yet processed by the operator. |
| ABANDONED | Snapshot abandoned due to job failure/upgrade. |

### FlinkStateSnapshotStatus
**Class**: org.apache.flink.kubernetes.operator.api.status.FlinkStateSnapshotStatus

**Description**: Last observed status of the Flink state snapshot.

| Parameter | Type | Docs |
| ----------| ---- | ---- |
| state | org.apache.flink.kubernetes.operator.api.status.FlinkStateSnapshotState | Current state of the snapshot. |
| triggerId | java.lang.String | Trigger ID of the snapshot. |
| triggerTimestamp | java.lang.String | Trigger timestamp of a pending snapshot operation. |
| resultTimestamp | java.lang.String | Timestamp when the snapshot was last created/failed. |
| path | java.lang.String | Final path of the snapshot. |
| error | java.lang.String | Optional error information about the FlinkStateSnapshot. |
| failures | int | Number of failures, used for tracking max retries. |

### JobManagerDeploymentStatus
**Class**: org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus

18 changes: 18 additions & 0 deletions docs/layouts/shortcodes/generated/dynamic_section.html
@@ -128,12 +128,24 @@
<td>Duration</td>
<td>Max allowed checkpoint age for initiating last-state upgrades on running jobs. If a checkpoint is not available within the desired age (and nothing in progress) a savepoint will be triggered.</td>
</tr>
<tr>
<td><h5>kubernetes.operator.job.upgrade.savepoint.dispose-on-delete</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>If enabled, savepoint data of upgrade savepoints created as FlinkStateSnapshot resources is disposed of automatically when the Kubernetes resource is deleted.</td>
</tr>
<tr>
<td><h5>kubernetes.operator.periodic.checkpoint.interval</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Option to enable automatic checkpoint triggering. Can be specified either as a Duration type (e.g. '10m') or as a cron expression in Quartz format (6 or 7 positions, see http://www.quartz-scheduler.org/documentation/quartz-2.3.0/tutorials/crontrigger.html). The triggering schedule is not guaranteed; checkpoints are triggered as part of the regular reconcile loop. NOTE: checkpoints are generally managed by Flink. This setting isn't meant to replace Flink's checkpoint settings, but to complement them in special cases. For instance, a full checkpoint might need to be occasionally triggered to break the chain of incremental checkpoints and consolidate the partial incremental files. WARNING: not intended to be used together with the cron-based periodic checkpoint triggering.</td>
</tr>
<tr>
<td><h5>kubernetes.operator.periodic.savepoint.dispose-on-delete</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>If enabled, savepoint data of periodic savepoints created as FlinkStateSnapshot resources is disposed of automatically when the Kubernetes resource is deleted.</td>
</tr>
<tr>
<td><h5>kubernetes.operator.periodic.savepoint.interval</h5></td>
<td style="word-wrap: break-word;">(none)</td>
@@ -176,6 +188,12 @@
<td>Duration</td>
<td>The interval before a savepoint trigger attempt is marked as unsuccessful.</td>
</tr>
<tr>
<td><h5>kubernetes.operator.snapshot.resource.enabled</h5></td>
<td style="word-wrap: break-word;">true</td>
<td>Boolean</td>
<td>Create new FlinkStateSnapshot resources for storing snapshots. Disable if you wish to use the deprecated mode and save snapshot results to FlinkDeployment/FlinkSessionJob status fields. The Operator will fall back to the legacy mode during runtime if the CRD is not found, even if this value is true.</td>
</tr>
<tr>
<td><h5>kubernetes.operator.user.artifacts.http.header</h5></td>
<td style="word-wrap: break-word;">(none)</td>
Original file line number Diff line number Diff line change
@@ -218,6 +218,12 @@
<td>Duration</td>
<td>Max allowed checkpoint age for initiating last-state upgrades on running jobs. If a checkpoint is not available within the desired age (and nothing in progress) a savepoint will be triggered.</td>
</tr>
<tr>
<td><h5>kubernetes.operator.job.upgrade.savepoint.dispose-on-delete</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>If enabled, savepoint data of upgrade savepoints created as FlinkStateSnapshot resources is disposed of automatically when the Kubernetes resource is deleted.</td>
</tr>
<tr>
<td><h5>kubernetes.operator.label.selector</h5></td>
<td style="word-wrap: break-word;">(none)</td>
@@ -272,6 +278,12 @@
<td>String</td>
<td>Option to enable automatic checkpoint triggering. Can be specified either as a Duration type (e.g. '10m') or as a cron expression in Quartz format (6 or 7 positions, see http://www.quartz-scheduler.org/documentation/quartz-2.3.0/tutorials/crontrigger.html). The triggering schedule is not guaranteed; checkpoints are triggered as part of the regular reconcile loop. NOTE: checkpoints are generally managed by Flink. This setting isn't meant to replace Flink's checkpoint settings, but to complement them in special cases. For instance, a full checkpoint might need to be occasionally triggered to break the chain of incremental checkpoints and consolidate the partial incremental files. WARNING: not intended to be used together with the cron-based periodic checkpoint triggering.</td>
</tr>
<tr>
<td><h5>kubernetes.operator.periodic.savepoint.dispose-on-delete</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>If enabled, savepoint data of periodic savepoints created as FlinkStateSnapshot resources is disposed of automatically when the Kubernetes resource is deleted.</td>
</tr>
<tr>
<td><h5>kubernetes.operator.periodic.savepoint.interval</h5></td>
<td style="word-wrap: break-word;">(none)</td>
@@ -386,6 +398,12 @@
<td>Duration</td>
<td>The interval before a savepoint trigger attempt is marked as unsuccessful.</td>
</tr>
<tr>
<td><h5>kubernetes.operator.snapshot.resource.enabled</h5></td>
<td style="word-wrap: break-word;">true</td>
<td>Boolean</td>
<td>Create new FlinkStateSnapshot resources for storing snapshots. Disable if you wish to use the deprecated mode and save snapshot results to FlinkDeployment/FlinkSessionJob status fields. The Operator will fall back to the legacy mode during runtime if the CRD is not found, even if this value is true.</td>
</tr>
<tr>
<td><h5>kubernetes.operator.startup.stop-on-informer-error</h5></td>
<td style="word-wrap: break-word;">true</td>
1 change: 1 addition & 0 deletions e2e-tests/data/flinkdep-cr.yaml
@@ -35,6 +35,7 @@ spec:
high-availability.storageDir: file:///opt/flink/volume/flink-ha
state.checkpoints.dir: file:///opt/flink/volume/flink-cp
state.savepoints.dir: file:///opt/flink/volume/flink-sp
kubernetes.operator.snapshot.resource.enabled: "false"
serviceAccount: flink
podTemplate:
spec:
1 change: 1 addition & 0 deletions e2e-tests/data/sessionjob-cr.yaml
@@ -35,6 +35,7 @@ spec:
high-availability.storageDir: file:///opt/flink/volume/flink-ha
state.checkpoints.dir: file:///opt/flink/volume/flink-cp
state.savepoints.dir: file:///opt/flink/volume/flink-sp
kubernetes.operator.snapshot.resource.enabled: "false"
serviceAccount: flink
podTemplate:
spec:
Original file line number Diff line number Diff line change
@@ -49,4 +49,15 @@ public static String readable(Instant instant, ZoneId zoneId) {
ZonedDateTime dateTime = instant.atZone(zoneId);
return dateTime.format(DEFAULT_FORMATTER);
}

/**
* Convert an Instant to a format that is used in Kubernetes.
*
* @param instant The Instant to convert.
* @return The ISO-8601 instant string, which ISO_INSTANT always renders in UTC.
*/
public static String kubernetes(Instant instant) {
ZonedDateTime dateTime = instant.atZone(ZoneId.systemDefault());
return dateTime.format(DateTimeFormatter.ISO_INSTANT);
}
}
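
A small check of the formatter behavior relied on in `kubernetes(Instant)`: `DateTimeFormatter.ISO_INSTANT` prints the instant in UTC with a trailing `Z`, so the zone passed to `atZone` does not affect the result. The class and method names below (`IsoInstantSketch`, `viaZone`, `direct`) are hypothetical and exist only for this illustration.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

/** Hypothetical demo of ISO_INSTANT output; not part of the commit. */
final class IsoInstantSketch {

    /** Mirrors the shape of the method above, with an explicit zone. */
    static String viaZone(Instant instant, ZoneId zone) {
        return instant.atZone(zone).format(DateTimeFormatter.ISO_INSTANT);
    }

    /** Formats the instant directly, without a zone conversion. */
    static String direct(Instant instant) {
        return DateTimeFormatter.ISO_INSTANT.format(instant);
    }

    public static void main(String[] args) {
        Instant epoch = Instant.ofEpochSecond(0);
        System.out.println(viaZone(epoch, ZoneId.of("Asia/Tokyo"))); // 1970-01-01T00:00:00Z
        System.out.println(direct(epoch));                           // 1970-01-01T00:00:00Z
    }
}
```

Both calls produce the same string, which suggests the zone conversion could be dropped without changing the output.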
1 change: 1 addition & 0 deletions flink-kubernetes-operator-api/pom.xml
@@ -202,6 +202,7 @@ under the License.
<includes>
<include>flinkdeployments.flink.apache.org-v1.yml</include>
<include>flinksessionjobs.flink.apache.org-v1.yml</include>
<include>flinkstatesnapshots.flink.apache.org-v1.yml</include>
</includes>
<filtering>false</filtering>
</resource>
Original file line number Diff line number Diff line change
@@ -23,8 +23,11 @@ public class CrdConstants {
public static final String API_VERSION = "v1beta1";
public static final String KIND_SESSION_JOB = "FlinkSessionJob";
public static final String KIND_FLINK_DEPLOYMENT = "FlinkDeployment";
public static final String KIND_FLINK_STATE_SNAPSHOT = "FlinkStateSnapshot";

public static final String LABEL_TARGET_SESSION = "target.session";

public static final String EPHEMERAL_STORAGE = "ephemeral-storage";

public static final String LABEL_SNAPSHOT_TYPE = "snapshot.type";
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.kubernetes.operator.api;

import org.apache.flink.annotation.Experimental;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.kubernetes.operator.api.spec.FlinkStateSnapshotSpec;
import org.apache.flink.kubernetes.operator.api.status.FlinkStateSnapshotStatus;

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import io.fabric8.kubernetes.api.model.Namespaced;
import io.fabric8.kubernetes.client.CustomResource;
import io.fabric8.kubernetes.model.annotation.Group;
import io.fabric8.kubernetes.model.annotation.ShortNames;
import io.fabric8.kubernetes.model.annotation.Version;

/** Custom resource definition for taking snapshots (savepoints or checkpoints) of Flink jobs. */
@Experimental
@JsonInclude(JsonInclude.Include.NON_NULL)
@JsonDeserialize()
@Group(CrdConstants.API_GROUP)
@Version(CrdConstants.API_VERSION)
@ShortNames({"flinksnp"})
public class FlinkStateSnapshot
extends CustomResource<FlinkStateSnapshotSpec, FlinkStateSnapshotStatus>
implements Namespaced {

@VisibleForTesting
@Override
public FlinkStateSnapshotStatus initStatus() {
return new FlinkStateSnapshotStatus();
}

@Override
public FlinkStateSnapshotSpec initSpec() {
return new FlinkStateSnapshotSpec();
}
}