Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-35265] Implement FlinkStateSnapshot custom resource #821

Merged
merged 4 commits into from
Aug 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 110 additions & 2 deletions docs/content/docs/custom-resource/reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,15 @@ This serves as a full reference for FlinkDeployment and FlinkSessionJob custom r

## Spec

### CheckpointSpec
**Class**: org.apache.flink.kubernetes.operator.api.spec.CheckpointSpec

**Description**: Spec for checkpoint state snapshots. This is an empty class, used to instruct the operator to
trigger a checkpoint.

| Parameter | Type | Docs |
| ----------| ---- | ---- |

### FlinkDeploymentSpec
**Class**: org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec

Expand Down Expand Up @@ -81,6 +90,29 @@ This serves as a full reference for FlinkDeployment and FlinkSessionJob custom r
| flinkConfiguration | java.util.Map<java.lang.String,java.lang.String> | Flink configuration overrides for the Flink deployment or Flink session job. |
| deploymentName | java.lang.String | The name of the target session cluster deployment. |

### FlinkStateSnapshotReference
**Class**: org.apache.flink.kubernetes.operator.api.spec.FlinkStateSnapshotReference

**Description**: Reference for a FlinkStateSnapshot.

| Parameter | Type | Docs |
| ----------| ---- | ---- |
| namespace | java.lang.String | Namespace of the snapshot resource. |
| name | java.lang.String | Name of the snapshot resource. |
| path | java.lang.String | If a path is given, all other fields will be ignored, and this will be used as the initial savepoint path. |

### FlinkStateSnapshotSpec
**Class**: org.apache.flink.kubernetes.operator.api.spec.FlinkStateSnapshotSpec

**Description**: Spec that describes a FlinkStateSnapshot.

| Parameter | Type | Docs |
| ----------| ---- | ---- |
| jobReference | org.apache.flink.kubernetes.operator.api.spec.JobReference | Source to take a snapshot of. Not required if it's a savepoint and alreadyExists is true. |
| savepoint | org.apache.flink.kubernetes.operator.api.spec.SavepointSpec | Spec in case of savepoint. |
| checkpoint | org.apache.flink.kubernetes.operator.api.spec.CheckpointSpec | Spec in case of checkpoint. |
| backoffLimit | int | Maximum number of retries before the snapshot is considered as failed. Set to -1 for unlimited or 0 for no retries. |

### FlinkVersion
**Class**: org.apache.flink.kubernetes.operator.api.spec.FlinkVersion

Expand Down Expand Up @@ -109,6 +141,16 @@ This serves as a full reference for FlinkDeployment and FlinkSessionJob custom r
| labels | java.util.Map<java.lang.String,java.lang.String> | Ingress labels. |
| tls | java.util.List<io.fabric8.kubernetes.api.model.networking.v1.IngressTLS> | Ingress tls. |

### JobKind
**Class**: org.apache.flink.kubernetes.operator.api.spec.JobKind

**Description**: Describes the Kubernetes kind of job reference.

| Value | Docs |
| ----- | ---- |
| FlinkDeployment | FlinkDeployment CR kind. |
| FlinkSessionJob | FlinkSessionJob CR kind. |

### JobManagerSpec
**Class**: org.apache.flink.kubernetes.operator.api.spec.JobManagerSpec

Expand All @@ -120,6 +162,17 @@ This serves as a full reference for FlinkDeployment and FlinkSessionJob custom r
| replicas | int | Number of JobManager replicas. Must be 1 for non-HA deployments. |
| podTemplate | io.fabric8.kubernetes.api.model.PodTemplateSpec | JobManager pod template. It will be merged with FlinkDeploymentSpec.podTemplate. |

### JobReference
**Class**: org.apache.flink.kubernetes.operator.api.spec.JobReference

**Description**: Flink resource reference that can be a FlinkDeployment or FlinkSessionJob.

| Parameter | Type | Docs |
| ----------| ---- | ---- |
| kind | org.apache.flink.kubernetes.operator.api.spec.JobKind | Kind of the Flink resource, FlinkDeployment or FlinkSessionJob. |
| name | java.lang.String | Name of the Flink resource. |
| namespace | java.lang.String | Namespace of the Flink resource. If empty, the operator will use the namespace of the snapshot. |

### JobSpec
**Class**: org.apache.flink.kubernetes.operator.api.spec.JobSpec

Expand All @@ -134,10 +187,11 @@ This serves as a full reference for FlinkDeployment and FlinkSessionJob custom r
| state | org.apache.flink.kubernetes.operator.api.spec.JobState | Desired state for the job. |
| savepointTriggerNonce | java.lang.Long | Nonce used to manually trigger savepoint for the running job. In order to trigger a savepoint, change the number to a different non-null value. |
| initialSavepointPath | java.lang.String | Savepoint path used by the job the first time it is deployed or during savepoint redeployments (triggered by changing the savepointRedeployNonce). |
| flinkStateSnapshotReference | org.apache.flink.kubernetes.operator.api.spec.FlinkStateSnapshotReference | Snapshot reference used by the job the first time it is deployed or during savepoint redeployments (triggered by changing the savepointRedeployNonce). |
| checkpointTriggerNonce | java.lang.Long | Nonce used to manually trigger checkpoint for the running job. In order to trigger a checkpoint, change the number to a different non-null value. |
| upgradeMode | org.apache.flink.kubernetes.operator.api.spec.UpgradeMode | Upgrade mode of the Flink job. |
| allowNonRestoredState | java.lang.Boolean | Allow checkpoint state that cannot be mapped to any job vertex in tasks. |
| savepointRedeployNonce | java.lang.Long | Nonce used to trigger a full redeployment of the job from the savepoint path specified in initialSavepointPath. In order to trigger redeployment, change the number to a different non-null value. Rollback is not possible after redeployment. |
| savepointRedeployNonce | java.lang.Long | Nonce used to trigger a full redeployment of the job from the savepoint path specified in initialSavepointPath or the path/FlinkStateSnapshot reference in flinkStateSnapshotReference. In order to trigger redeployment, change the number to a different non-null value. Rollback is not possible after redeployment. |

### JobState
**Class**: org.apache.flink.kubernetes.operator.api.spec.JobState
Expand Down Expand Up @@ -170,6 +224,18 @@ This serves as a full reference for FlinkDeployment and FlinkSessionJob custom r
| memory | java.lang.String | Amount of memory allocated to the pod. Example: 1024m, 1g |
| ephemeralStorage | java.lang.String | Amount of ephemeral storage allocated to the pod. Example: 1024m, 2G |

### SavepointSpec
**Class**: org.apache.flink.kubernetes.operator.api.spec.SavepointSpec

**Description**: Spec for savepoint state snapshots.

| Parameter | Type | Docs |
| ----------| ---- | ---- |
| path | java.lang.String | Optional path for the savepoint. |
| formatType | org.apache.flink.kubernetes.operator.api.status.SavepointFormatType | Savepoint format to use. |
| disposeOnDelete | java.lang.Boolean | Dispose the savepoints upon CR deletion. |
| alreadyExists | java.lang.Boolean | Indicates that the savepoint already exists on the given path. The Operator will not trigger any new savepoints, just update the status of the resource as a completed snapshot. |

### TaskManagerSpec
**Class**: org.apache.flink.kubernetes.operator.api.spec.TaskManagerSpec

Expand Down Expand Up @@ -285,6 +351,47 @@ This serves as a full reference for FlinkDeployment and FlinkSessionJob custom r
| lifecycleState | org.apache.flink.kubernetes.operator.api.lifecycle.ResourceLifecycleState | Lifecycle state of the Flink resource (including being rolled back, failed etc.). |
| reconciliationStatus | org.apache.flink.kubernetes.operator.api.status.FlinkSessionJobReconciliationStatus | Status of the last reconcile operation. |

### FlinkStateSnapshotStatus
**Class**: org.apache.flink.kubernetes.operator.api.status.FlinkStateSnapshotStatus

**Description**: Last observed status of the Flink state snapshot.

| Parameter | Type | Docs |
| ----------| ---- | ---- |

### State
**Class**: org.apache.flink.kubernetes.operator.api.status.FlinkStateSnapshotStatus.State

**Description**: Describes state of a snapshot.

| Value | Docs |
| ----- | ---- |
| COMPLETED | Snapshot was successful and available. |
| FAILED | Error during snapshot. |
| IN_PROGRESS | Snapshot in progress. |
| TRIGGER_PENDING | Not yet processed by the operator. |
| ABANDONED | Snapshot abandoned due to job failure/upgrade. |
| state | org.apache.flink.kubernetes.operator.api.status.FlinkStateSnapshotStatus.State | Current state of the snapshot. |
| triggerId | java.lang.String | Trigger ID of the snapshot. |
| triggerTimestamp | java.lang.String | Trigger timestamp of a pending snapshot operation. |
| resultTimestamp | java.lang.String | Timestamp when the snapshot was last created/failed. |
| path | java.lang.String | Final path of the snapshot. |
| error | java.lang.String | Optional error information about the FlinkStateSnapshot. |
| failures | int | Number of failures, used for tracking max retries. |

### State
**Class**: org.apache.flink.kubernetes.operator.api.status.FlinkStateSnapshotStatus.State

**Description**: Describes state of a snapshot.

| Value | Docs |
| ----- | ---- |
| COMPLETED | Snapshot was successful and available. |
| FAILED | Error during snapshot. |
| IN_PROGRESS | Snapshot in progress. |
| TRIGGER_PENDING | Not yet processed by the operator. |
| ABANDONED | Snapshot abandoned due to job failure/upgrade. |

### JobManagerDeploymentStatus
**Class**: org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus

Expand All @@ -310,6 +417,7 @@ This serves as a full reference for FlinkDeployment and FlinkSessionJob custom r
| state | java.lang.String | Last observed state of the job. |
| startTime | java.lang.String | Start time of the job. |
| updateTime | java.lang.String | Update time of the job. |
| upgradeSnapshotReference | org.apache.flink.kubernetes.operator.api.spec.FlinkStateSnapshotReference | |
| savepointInfo | org.apache.flink.kubernetes.operator.api.status.SavepointInfo | Information about pending and last savepoint for the job. |
| checkpointInfo | org.apache.flink.kubernetes.operator.api.status.CheckpointInfo | Information about pending and last checkpoint for the job. |

Expand Down Expand Up @@ -356,7 +464,7 @@ This serves as a full reference for FlinkDeployment and FlinkSessionJob custom r

| Parameter | Type | Docs |
| ----------| ---- | ---- |
| lastSavepoint | org.apache.flink.kubernetes.operator.api.status.Savepoint | Last completed savepoint by the operator. |
| lastSavepoint | org.apache.flink.kubernetes.operator.api.status.Savepoint | Last completed savepoint by the operator for manual and periodic snapshots. Only used if FlinkStateSnapshot resources are disabled. |
| triggerId | java.lang.String | Trigger id of a pending savepoint operation. |
| triggerTimestamp | java.lang.Long | Trigger timestamp of a pending savepoint operation. |
| triggerType | org.apache.flink.kubernetes.operator.api.status.SnapshotTriggerType | Savepoint trigger mechanism. |
Expand Down
12 changes: 12 additions & 0 deletions docs/layouts/shortcodes/generated/dynamic_section.html
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,12 @@
<td>Boolean</td>
<td>Whether to enable clean up of savepoint history.</td>
</tr>
<tr>
<td><h5>kubernetes.operator.savepoint.dispose-on-delete</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Savepoint data for FlinkStateSnapshot resources created by the operator during upgrades and periodic savepoints will be disposed of automatically when the generated Kubernetes resource is deleted.</td>
</tr>
<tr>
<td><h5>kubernetes.operator.savepoint.format.type</h5></td>
<td style="word-wrap: break-word;">CANONICAL</td>
Expand All @@ -182,6 +188,12 @@
<td>Duration</td>
<td>The interval before a savepoint trigger attempt is marked as unsuccessful.</td>
</tr>
<tr>
<td><h5>kubernetes.operator.snapshot.resource.enabled</h5></td>
<td style="word-wrap: break-word;">true</td>
<td>Boolean</td>
<td>Create new FlinkStateSnapshot resources for storing snapshots. Disable if you wish to use the deprecated mode and save snapshot results to FlinkDeployment/FlinkSessionJob status fields. The Operator will fallback to legacy mode during runtime if the CRD is not found, even if this value is true.</td>
</tr>
<tr>
<td><h5>kubernetes.operator.user.artifacts.http.header</h5></td>
<td style="word-wrap: break-word;">(none)</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,12 @@
<td>Boolean</td>
<td>Whether to enable clean up of savepoint history.</td>
</tr>
<tr>
<td><h5>kubernetes.operator.savepoint.dispose-on-delete</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Savepoint data for FlinkStateSnapshot resources created by the operator during upgrades and periodic savepoints will be disposed of automatically when the generated Kubernetes resource is deleted.</td>
</tr>
<tr>
<td><h5>kubernetes.operator.savepoint.format.type</h5></td>
<td style="word-wrap: break-word;">CANONICAL</td>
Expand Down Expand Up @@ -392,6 +398,12 @@
<td>Duration</td>
<td>The interval before a savepoint trigger attempt is marked as unsuccessful.</td>
</tr>
<tr>
<td><h5>kubernetes.operator.snapshot.resource.enabled</h5></td>
<td style="word-wrap: break-word;">true</td>
<td>Boolean</td>
<td>Create new FlinkStateSnapshot resources for storing snapshots. Disable if you wish to use the deprecated mode and save snapshot results to FlinkDeployment/FlinkSessionJob status fields. The Operator will fallback to legacy mode during runtime if the CRD is not found, even if this value is true.</td>
</tr>
<tr>
<td><h5>kubernetes.operator.startup.stop-on-informer-error</h5></td>
<td style="word-wrap: break-word;">true</td>
Expand Down
1 change: 1 addition & 0 deletions e2e-tests/data/flinkdep-cr.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ spec:
high-availability.storageDir: file:///opt/flink/volume/flink-ha
state.checkpoints.dir: file:///opt/flink/volume/flink-cp
state.savepoints.dir: file:///opt/flink/volume/flink-sp
kubernetes.operator.snapshot.resource.enabled: "false"
serviceAccount: flink
podTemplate:
spec:
Expand Down
1 change: 1 addition & 0 deletions e2e-tests/data/sessionjob-cr.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ spec:
high-availability.storageDir: file:///opt/flink/volume/flink-ha
state.checkpoints.dir: file:///opt/flink/volume/flink-cp
state.savepoints.dir: file:///opt/flink/volume/flink-sp
kubernetes.operator.snapshot.resource.enabled: "false"
serviceAccount: flink
podTemplate:
spec:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,4 +49,15 @@ public static String readable(Instant instant, ZoneId zoneId) {
ZonedDateTime dateTime = instant.atZone(zoneId);
return dateTime.format(DEFAULT_FORMATTER);
}

/**
* Convert an Instant to a format that is used in Kubernetes.
*
* @param instant The Instant to convert.
* @return The Kubernetes format in the system default zone.
*/
public static String kubernetes(Instant instant) {
ZonedDateTime dateTime = instant.atZone(ZoneId.systemDefault());
return dateTime.format(DateTimeFormatter.ISO_INSTANT);
}
}
1 change: 1 addition & 0 deletions flink-kubernetes-operator-api/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ under the License.
<includes>
<include>flinkdeployments.flink.apache.org-v1.yml</include>
<include>flinksessionjobs.flink.apache.org-v1.yml</include>
<include>flinkstatesnapshots.flink.apache.org-v1.yml</include>
</includes>
<filtering>false</filtering>
</resource>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,11 @@ public class CrdConstants {
public static final String API_VERSION = "v1beta1";
public static final String KIND_SESSION_JOB = "FlinkSessionJob";
public static final String KIND_FLINK_DEPLOYMENT = "FlinkDeployment";
public static final String KIND_FLINK_STATE_SNAPSHOT = "FlinkStateSnapshot";

public static final String LABEL_TARGET_SESSION = "target.session";

public static final String EPHEMERAL_STORAGE = "ephemeral-storage";

public static final String LABEL_SNAPSHOT_TYPE = "snapshot.type";
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.kubernetes.operator.api;

import org.apache.flink.annotation.Experimental;
import org.apache.flink.kubernetes.operator.api.spec.FlinkStateSnapshotSpec;
import org.apache.flink.kubernetes.operator.api.status.FlinkStateSnapshotStatus;

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import io.fabric8.kubernetes.api.model.Namespaced;
import io.fabric8.kubernetes.client.CustomResource;
import io.fabric8.kubernetes.model.annotation.Group;
import io.fabric8.kubernetes.model.annotation.ShortNames;
import io.fabric8.kubernetes.model.annotation.Version;

/** Custom resource definition for taking manual savepoints of Flink jobs. */
@Experimental
@JsonInclude(JsonInclude.Include.NON_NULL)
@JsonDeserialize()
@Group(CrdConstants.API_GROUP)
@Version(CrdConstants.API_VERSION)
@ShortNames({"flinksnp"})
public class FlinkStateSnapshot
extends CustomResource<FlinkStateSnapshotSpec, FlinkStateSnapshotStatus>
implements Namespaced {

@Override
public FlinkStateSnapshotSpec initSpec() {
return new FlinkStateSnapshotSpec();
}
}
Loading
Loading