Skip to content

Commit

Permalink
[FLINK-35265] Add upgradeSnapshotReference to track latest snapshot o…
Browse files Browse the repository at this point in the history
…f Flink jobs
  • Loading branch information
mateczagany committed Jul 8, 2024
1 parent ec34cf1 commit 5876f6a
Show file tree
Hide file tree
Showing 23 changed files with 303 additions and 305 deletions.
3 changes: 2 additions & 1 deletion docs/content/docs/custom-resource/reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -403,6 +403,7 @@ This serves as a full reference for FlinkDeployment and FlinkSessionJob custom r
| state | java.lang.String | Last observed state of the job. |
| startTime | java.lang.String | Start time of the job. |
| updateTime | java.lang.String | Update time of the job. |
| upgradeSnapshotReference | org.apache.flink.kubernetes.operator.api.spec.FlinkStateSnapshotReference | Reference to the latest snapshot of the job, tracked by the operator for upgrades. |
| savepointInfo | org.apache.flink.kubernetes.operator.api.status.SavepointInfo | Information about pending and last savepoint for the job. |
| checkpointInfo | org.apache.flink.kubernetes.operator.api.status.CheckpointInfo | Information about pending and last checkpoint for the job. |

Expand Down Expand Up @@ -449,7 +450,7 @@ This serves as a full reference for FlinkDeployment and FlinkSessionJob custom r

| Parameter | Type | Docs |
| ----------| ---- | ---- |
| lastSavepoint | org.apache.flink.kubernetes.operator.api.status.Savepoint | Last completed savepoint by the operator. |
| lastSavepoint | org.apache.flink.kubernetes.operator.api.status.Savepoint | Last completed savepoint by the operator for manual and periodic snapshots. Only used if FlinkStateSnapshot resources are disabled. |
| triggerId | java.lang.String | Trigger id of a pending savepoint operation. |
| triggerTimestamp | java.lang.Long | Trigger timestamp of a pending savepoint operation. |
| triggerType | org.apache.flink.kubernetes.operator.api.status.SnapshotTriggerType | Savepoint trigger mechanism. |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.flink.kubernetes.operator.api.spec;

import org.apache.flink.annotation.Experimental;
import org.apache.flink.kubernetes.operator.api.FlinkStateSnapshot;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import lombok.AllArgsConstructor;
Expand Down Expand Up @@ -45,4 +46,13 @@ public class FlinkStateSnapshotReference {
* savepoint path.
*/
private String path;

/**
 * Creates a reference that points directly at a snapshot location.
 *
 * @param path snapshot path to store in the reference
 * @return a new reference holding {@code path}, with the two leading constructor arguments
 *     left {@code null}
 */
public static FlinkStateSnapshotReference fromPath(String path) {
    // A path-only reference does not identify a resource, so both leading arguments stay unset.
    final String unset = null;
    return new FlinkStateSnapshotReference(unset, unset, path);
}

/**
 * Creates a reference that identifies the given {@link FlinkStateSnapshot} resource.
 *
 * @param resource snapshot resource whose metadata namespace and name are copied
 * @return a new reference carrying the resource's namespace and name, with a {@code null} path
 */
public static FlinkStateSnapshotReference fromResource(FlinkStateSnapshot resource) {
    var metadata = resource.getMetadata();
    var namespace = metadata.getNamespace();
    var name = metadata.getName();
    return new FlinkStateSnapshotReference(namespace, name, null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.flink.kubernetes.operator.api.status;

import org.apache.flink.annotation.Experimental;
import org.apache.flink.kubernetes.operator.api.spec.FlinkStateSnapshotReference;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import io.fabric8.crd.generator.annotation.PrinterColumn;
Expand Down Expand Up @@ -50,6 +51,8 @@ public class JobStatus {
/** Update time of the job. */
private String updateTime;

/** Reference to the latest snapshot of the job, tracked by the operator for upgrades. */
private FlinkStateSnapshotReference upgradeSnapshotReference;

/** Information about pending and last savepoint for the job. */
@Deprecated private SavepointInfo savepointInfo = new SavepointInfo();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,10 @@
@AllArgsConstructor
@Builder
public class SavepointInfo implements SnapshotInfo {
/** Last completed savepoint by the operator. */
/**
* Last completed savepoint by the operator for manual and periodic snapshots. Only used if
* FlinkStateSnapshot resources are disabled.
*/
private Savepoint lastSavepoint;

/** Trigger id of a pending savepoint operation. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
import org.apache.flink.kubernetes.operator.api.spec.FlinkStateSnapshotReference;
import org.apache.flink.kubernetes.operator.api.status.Checkpoint;
import org.apache.flink.kubernetes.operator.api.status.CheckpointInfo;
import org.apache.flink.kubernetes.operator.api.status.CommonStatus;
import org.apache.flink.kubernetes.operator.api.status.JobStatus;
import org.apache.flink.kubernetes.operator.api.status.Savepoint;
import org.apache.flink.kubernetes.operator.api.status.SavepointInfo;
import org.apache.flink.kubernetes.operator.api.status.SnapshotTriggerType;
Expand Down Expand Up @@ -63,7 +65,6 @@ public void observeSavepointStatus(FlinkResourceContext<CR> ctx) {

var resource = ctx.getResource();
var jobStatus = resource.getStatus().getJobStatus();
var savepointInfo = jobStatus.getSavepointInfo();
var jobId = jobStatus.getJobId();

// If any manual or periodic savepoint is in progress, observe it
Expand All @@ -73,11 +74,11 @@ public void observeSavepointStatus(FlinkResourceContext<CR> ctx) {

// If job is in globally terminal state, observe last savepoint
if (ReconciliationUtils.isJobInTerminalState(resource.getStatus())) {
observeLatestSavepoint(
ctx.getFlinkService(), savepointInfo, jobId, ctx.getObserveConfig());
observeLatestCheckpoint(
ctx.getFlinkService(), jobStatus, jobId, ctx.getObserveConfig());
}

cleanupSavepointHistory(ctx, savepointInfo);
cleanupSavepointHistory(ctx, jobStatus.getSavepointInfo());
}

public void observeCheckpointStatus(FlinkResourceContext<CR> ctx) {
Expand Down Expand Up @@ -122,7 +123,9 @@ private void observeTriggeredSavepoint(FlinkResourceContext<CR> ctx, String jobI
"Savepoint attempt failed after grace period. Won't be retried again: "
+ err);
ReconciliationUtils.updateLastReconciledSnapshotTriggerNonce(
savepointInfo, (AbstractFlinkResource) resource, SAVEPOINT);
savepointInfo.getTriggerType(),
(AbstractFlinkResource) resource,
SAVEPOINT);
} else {
LOG.warn("Savepoint failed within grace period, retrying: " + err);
}
Expand All @@ -149,7 +152,9 @@ private void observeTriggeredSavepoint(FlinkResourceContext<CR> ctx, String jobI
: null);

ReconciliationUtils.updateLastReconciledSnapshotTriggerNonce(
savepointInfo, resource, SAVEPOINT);
savepointInfo.getTriggerType(), resource, SAVEPOINT);

// In case of periodic and manual savepoint, we still use lastSavepoint
savepointInfo.updateLastSavepoint(savepoint);
}

Expand Down Expand Up @@ -181,7 +186,9 @@ private void observeTriggeredCheckpoint(FlinkResourceContext<CR> ctx, String job
"Checkpoint attempt failed after grace period. Won't be retried again: "
+ err);
ReconciliationUtils.updateLastReconciledSnapshotTriggerNonce(
checkpointInfo, (AbstractFlinkResource) resource, CHECKPOINT);
checkpointInfo.getTriggerType(),
(AbstractFlinkResource) resource,
CHECKPOINT);
} else {
LOG.warn("Checkpoint failed within grace period, retrying: " + err);
}
Expand All @@ -207,7 +214,7 @@ private void observeTriggeredCheckpoint(FlinkResourceContext<CR> ctx, String job
: null);

ReconciliationUtils.updateLastReconciledSnapshotTriggerNonce(
checkpointInfo, resource, CHECKPOINT);
checkpointInfo.getTriggerType(), resource, CHECKPOINT);
checkpointInfo.updateLastCheckpoint(checkpoint);
}

Expand Down Expand Up @@ -273,15 +280,19 @@ private void disposeSavepointQuietly(
}
}

private void observeLatestSavepoint(
private void observeLatestCheckpoint(
FlinkService flinkService,
SavepointInfo savepointInfo,
JobStatus jobStatus,
String jobID,
Configuration observeConfig) {
try {
flinkService
.getLastCheckpoint(JobID.fromHexString(jobID), observeConfig)
.ifPresent(savepointInfo::updateLastSavepoint);
.ifPresent(
snapshot ->
jobStatus.setUpgradeSnapshotReference(
FlinkStateSnapshotReference.fromPath(
snapshot.getLocation())));
} catch (Exception e) {
LOG.error("Could not observe latest savepoint information.", e);
throw new ReconciliationException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import org.apache.flink.kubernetes.operator.api.status.JobStatus;
import org.apache.flink.kubernetes.operator.api.status.ReconciliationState;
import org.apache.flink.kubernetes.operator.api.status.ReconciliationStatus;
import org.apache.flink.kubernetes.operator.api.status.SnapshotInfo;
import org.apache.flink.kubernetes.operator.api.status.SnapshotTriggerType;
import org.apache.flink.kubernetes.operator.api.status.TaskManagerInfo;
import org.apache.flink.kubernetes.operator.api.utils.SpecUtils;
Expand Down Expand Up @@ -186,12 +185,12 @@ public static <SPEC extends AbstractFlinkSpec> void updateStatusForSpecReconcili
}

public static <SPEC extends AbstractFlinkSpec> void updateLastReconciledSnapshotTriggerNonce(
SnapshotInfo snapshotInfo,
SnapshotTriggerType snapshotTriggerType,
AbstractFlinkResource<SPEC, ?> target,
SnapshotType snapshotType) {

// We only need to update for MANUAL triggers
if (snapshotInfo.getTriggerType() != SnapshotTriggerType.MANUAL) {
if (snapshotTriggerType != SnapshotTriggerType.MANUAL) {
return;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,12 @@
import org.apache.flink.kubernetes.operator.api.diff.DiffType;
import org.apache.flink.kubernetes.operator.api.spec.AbstractFlinkSpec;
import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec;
import org.apache.flink.kubernetes.operator.api.spec.JobSpec;
import org.apache.flink.kubernetes.operator.api.spec.FlinkStateSnapshotReference;
import org.apache.flink.kubernetes.operator.api.spec.JobState;
import org.apache.flink.kubernetes.operator.api.spec.UpgradeMode;
import org.apache.flink.kubernetes.operator.api.status.CommonStatus;
import org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus;
import org.apache.flink.kubernetes.operator.api.status.ReconciliationState;
import org.apache.flink.kubernetes.operator.api.status.Savepoint;
import org.apache.flink.kubernetes.operator.api.status.SnapshotTriggerType;
import org.apache.flink.kubernetes.operator.autoscaler.KubernetesJobAutoScalerContext;
import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext;
Expand Down Expand Up @@ -182,18 +180,18 @@ public void reconcile(FlinkResourceContext<CR> ctx) throws Exception {

private Optional<String> getInitialSnapshotPath(
FlinkResourceContext<CR> ctx, AbstractFlinkSpec spec) {
if (spec.getJob() == null || spec.getJob().getFlinkStateSnapshotReference() == null) {
return Optional.ofNullable(spec.getJob()).map(JobSpec::getInitialSavepointPath);
if (spec.getJob() == null) {
return Optional.empty();
}

var snapshotRef = spec.getJob().getFlinkStateSnapshotReference();
if (snapshotRef.getName() != null) {
if (spec.getJob().getFlinkStateSnapshotReference() != null) {
return Optional.of(
FlinkStateSnapshotUtils.getAndValidateFlinkStateSnapshotPath(
ctx.getKubernetesClient(), snapshotRef));
ctx.getKubernetesClient(),
spec.getJob().getFlinkStateSnapshotReference()));
}

return Optional.empty();
return Optional.ofNullable(spec.getJob().getInitialSavepointPath());
}

private void applyAutoscaler(FlinkResourceContext<CR> ctx) throws Exception {
Expand Down Expand Up @@ -239,11 +237,12 @@ private void updateStatusBeforeFirstDeployment(
var initialSp = spec.getJob().getInitialSavepointPath();

if (snapshotRef != null) {
status.getJobStatus().setUpgradeSnapshotReference(snapshotRef);
initialUpgradeMode = UpgradeMode.SAVEPOINT;
} else if (initialSp != null) {
status.getJobStatus()
.getSavepointInfo()
.setLastSavepoint(Savepoint.of(initialSp, SnapshotTriggerType.UNKNOWN));
.setUpgradeSnapshotReference(
FlinkStateSnapshotReference.fromPath(initialSp));
initialUpgradeMode = UpgradeMode.SAVEPOINT;
}

Expand Down
Loading

0 comments on commit 5876f6a

Please sign in to comment.