Skip to content

Commit

Permalink
[FLINK-35265] Add check for Flink1.17 and refactor snapshot state lif…
Browse files Browse the repository at this point in the history
…ecycle management
  • Loading branch information
mateczagany committed Jul 10, 2024
1 parent 63c0ec3 commit 0d2260a
Show file tree
Hide file tree
Showing 7 changed files with 208 additions and 122 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,13 @@
import org.apache.flink.kubernetes.operator.api.FlinkStateSnapshot;
import org.apache.flink.kubernetes.operator.api.status.FlinkStateSnapshotState;
import org.apache.flink.kubernetes.operator.api.status.FlinkStateSnapshotStatus;
import org.apache.flink.kubernetes.operator.exception.ReconciliationException;
import org.apache.flink.kubernetes.operator.observer.snapshot.StateSnapshotObserver;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.kubernetes.operator.reconciler.snapshot.StateSnapshotReconciler;
import org.apache.flink.kubernetes.operator.service.FlinkResourceContextFactory;
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
import org.apache.flink.kubernetes.operator.utils.EventSourceUtils;
import org.apache.flink.kubernetes.operator.utils.FlinkStateSnapshotUtils;
import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
import org.apache.flink.kubernetes.operator.validation.FlinkResourceValidator;

Expand Down Expand Up @@ -103,14 +103,9 @@ public UpdateControl<FlinkStateSnapshot> reconcile(
statusRecorder.patchAndCacheStatus(flinkStateSnapshot, ctx.getKubernetesClient());
reconciler.reconcile(ctx);
} catch (Exception e) {
eventRecorder.triggerSnapshotEvent(
flinkStateSnapshot,
EventRecorder.Type.Warning,
EventRecorder.Reason.SnapshotError,
EventRecorder.Component.Snapshot,
e.getMessage(),
josdkContext.getClient());
throw new ReconciliationException(e);
FlinkStateSnapshotUtils.snapshotFailed(
josdkContext.getClient(), eventRecorder, flinkStateSnapshot, e.getMessage());
LOG.error("Failed to reconcile {}", flinkStateSnapshot, e);
}

var updateControl = getUpdateControl(ctx);
Expand Down Expand Up @@ -161,7 +156,7 @@ private UpdateControl<FlinkStateSnapshot> getUpdateControl(FlinkStateSnapshotCon
"Snapshot {} failed and will be retried in {} seconds...",
resource.getMetadata().getName(),
retrySeconds);
resource.getStatus().setState(FlinkStateSnapshotState.TRIGGER_PENDING);
FlinkStateSnapshotUtils.snapshotTriggerPending(resource);
return UpdateControl.<FlinkStateSnapshot>noUpdate()
.rescheduleAfter(Duration.ofSeconds(retrySeconds));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,7 @@

package org.apache.flink.kubernetes.operator.observer.snapshot;

import org.apache.flink.autoscaler.utils.DateTimeUtils;
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
import org.apache.flink.kubernetes.operator.api.FlinkStateSnapshot;
import org.apache.flink.kubernetes.operator.api.status.FlinkStateSnapshotState;
import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext;
import org.apache.flink.kubernetes.operator.controller.FlinkStateSnapshotContext;
Expand All @@ -29,14 +27,11 @@
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
import org.apache.flink.kubernetes.operator.utils.FlinkStateSnapshotUtils;

import io.fabric8.kubernetes.client.KubernetesClient;
import lombok.RequiredArgsConstructor;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Instant;

/** The observer of {@link org.apache.flink.kubernetes.operator.api.FlinkStateSnapshot}. */
@RequiredArgsConstructor
public class StateSnapshotObserver {
Expand Down Expand Up @@ -106,10 +101,12 @@ private void handleSavepoint(
resourceName,
resource.getStatus().getTriggerId());
} else if (savepointInfo.getError() != null) {
snapshotFailed(ctx.getKubernetesClient(), resource, savepointInfo.getError());
FlinkStateSnapshotUtils.snapshotFailed(
ctx.getKubernetesClient(), eventRecorder, resource, savepointInfo.getError());
} else {
LOG.info("Savepoint {} successful: {}", resourceName, savepointInfo.getLocation());
snapshotSuccessful(resource, savepointInfo.getLocation());
FlinkStateSnapshotUtils.snapshotSuccessful(
resource, savepointInfo.getLocation(), false);
}
}

Expand All @@ -130,7 +127,8 @@ private void handleCheckpoint(
}

if (checkpointInfo.getError() != null) {
snapshotFailed(ctx.getKubernetesClient(), resource, checkpointInfo.getError());
FlinkStateSnapshotUtils.snapshotFailed(
ctx.getKubernetesClient(), eventRecorder, resource, checkpointInfo.getError());
} else {
LOG.debug(
"Checkpoint {} was successful, querying final checkpoint path...",
Expand All @@ -146,39 +144,16 @@ private void handleCheckpoint(
if (checkpointStatsResult.isPending()) {
return;
} else if (checkpointStatsResult.getError() != null) {
snapshotFailed(
ctx.getKubernetesClient(), resource, checkpointStatsResult.getError());
FlinkStateSnapshotUtils.snapshotFailed(
ctx.getKubernetesClient(),
eventRecorder,
resource,
checkpointStatsResult.getError());
}

LOG.info("Checkpoint {} successful: {}", resourceName, checkpointStatsResult.getPath());
snapshotSuccessful(resource, checkpointStatsResult.getPath());
FlinkStateSnapshotUtils.snapshotSuccessful(
resource, checkpointStatsResult.getPath(), false);
}
}

/**
 * Emits a warning event for a failed snapshot and records the failure on its status.
 *
 * <p>The event reason is {@code SavepointError} when the snapshot spec is a savepoint and
 * {@code CheckpointError} otherwise. The status is then moved to {@code FAILED}, the error is
 * stored, the failure counter is incremented and the result timestamp is set to the current
 * time.
 *
 * @param kubernetesClient client handed to the event recorder
 * @param snapshot snapshot resource whose status is updated in place
 * @param error error text embedded in the event message and stored on the status
 */
private void snapshotFailed(
        KubernetesClient kubernetesClient, FlinkStateSnapshot snapshot, String error) {
    final EventRecorder.Reason eventReason;
    if (snapshot.getSpec().isSavepoint()) {
        eventReason = EventRecorder.Reason.SavepointError;
    } else {
        eventReason = EventRecorder.Reason.CheckpointError;
    }

    var eventMessage = String.format("Snapshot failed with error '%s'", error);
    eventRecorder.triggerSnapshotEvent(
            snapshot,
            EventRecorder.Type.Warning,
            eventReason,
            EventRecorder.Component.Snapshot,
            eventMessage,
            kubernetesClient);

    var status = snapshot.getStatus();
    status.setState(FlinkStateSnapshotState.FAILED);
    status.setError(error);
    status.setFailures(status.getFailures() + 1);
    status.setResultTimestamp(DateTimeUtils.kubernetes(Instant.now()));
}

/**
 * Marks the snapshot as {@code COMPLETED}: stores the result location, clears any previous
 * error and sets the result timestamp to the current time.
 *
 * @param snapshot snapshot resource whose status is updated in place
 * @param location path of the finished snapshot
 */
private void snapshotSuccessful(FlinkStateSnapshot snapshot, String location) {
    var status = snapshot.getStatus();
    status.setState(FlinkStateSnapshotState.COMPLETED);
    status.setPath(location);
    status.setError(null);
    status.setResultTimestamp(DateTimeUtils.kubernetes(Instant.now()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,17 @@

package org.apache.flink.kubernetes.operator.reconciler.snapshot;

import org.apache.flink.autoscaler.utils.DateTimeUtils;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.kubernetes.operator.api.CrdConstants;
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
import org.apache.flink.kubernetes.operator.api.FlinkStateSnapshot;
import org.apache.flink.kubernetes.operator.api.spec.FlinkStateSnapshotSpec;
import org.apache.flink.kubernetes.operator.api.status.FlinkStateSnapshotState;
import org.apache.flink.kubernetes.operator.api.status.SnapshotTriggerType;
import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext;
import org.apache.flink.kubernetes.operator.controller.FlinkStateSnapshotContext;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.kubernetes.operator.service.FlinkResourceContextFactory;
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
import org.apache.flink.kubernetes.operator.utils.FlinkStateSnapshotUtils;
import org.apache.flink.kubernetes.operator.utils.SnapshotUtils;
import org.apache.flink.util.Preconditions;

import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
Expand All @@ -39,7 +36,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Instant;
import java.util.Optional;

/** The reconciler for the {@link org.apache.flink.kubernetes.operator.api.FlinkStateSnapshot}. */
Expand All @@ -64,11 +60,9 @@ public void reconcile(FlinkStateSnapshotContext ctx) throws Exception {
LOG.info(
"Snapshot {} is marked as completed in spec, skipping triggering savepoint.",
resource.getMetadata().getName());
resource.getStatus().setState(FlinkStateSnapshotState.COMPLETED);
resource.getStatus().setPath(resource.getSpec().getSavepoint().getPath());
var time = DateTimeUtils.kubernetes(Instant.now());
resource.getStatus().setTriggerTimestamp(time);
resource.getStatus().setResultTimestamp(time);

FlinkStateSnapshotUtils.snapshotSuccessful(
resource, resource.getSpec().getSavepoint().getPath(), true);
return;
}

Expand All @@ -81,23 +75,14 @@ public void reconcile(FlinkStateSnapshotContext ctx) throws Exception {
}

var jobId = ctx.getSecondaryResource().orElseThrow().getStatus().getJobStatus().getJobId();
var ctxFlinkDeployment =
ctxFactory.getResourceContext(
ctx.getReferencedJobFlinkDeployment(), ctx.getJosdkContext());
var triggerIdOpt =
triggerCheckpointOrSavepoint(resource.getSpec(), ctxFlinkDeployment, jobId);
var triggerIdOpt = triggerCheckpointOrSavepoint(resource.getSpec(), ctx, jobId);

if (triggerIdOpt.isEmpty()) {
LOG.warn("Failed to trigger snapshot {}", resource.getMetadata().getName());
return;
}

resource.getMetadata()
.getLabels()
.putIfAbsent(CrdConstants.LABEL_SNAPSHOT_TYPE, SnapshotTriggerType.MANUAL.name());
resource.getStatus().setState(FlinkStateSnapshotState.IN_PROGRESS);
resource.getStatus().setTriggerId(triggerIdOpt.get());
resource.getStatus().setTriggerTimestamp(DateTimeUtils.kubernetes(Instant.now()));
FlinkStateSnapshotUtils.snapshotInProgress(resource, triggerIdOpt.get());
}

public DeleteControl cleanup(FlinkStateSnapshotContext ctx) throws Exception {
Expand Down Expand Up @@ -186,11 +171,12 @@ private DeleteControl handleSnapshotCleanup(
}
}

private static Optional<String> triggerCheckpointOrSavepoint(
FlinkStateSnapshotSpec spec,
FlinkResourceContext<FlinkDeployment> flinkDeploymentContext,
String jobId)
private Optional<String> triggerCheckpointOrSavepoint(
FlinkStateSnapshotSpec spec, FlinkStateSnapshotContext ctx, String jobId)
throws Exception {
var flinkDeploymentContext =
ctxFactory.getResourceContext(
ctx.getReferencedJobFlinkDeployment(), ctx.getJosdkContext());
var flinkService = flinkDeploymentContext.getFlinkService();
var conf =
Preconditions.checkNotNull(
Expand All @@ -217,13 +203,24 @@ private static Optional<String> triggerCheckpointOrSavepoint(
spec.getSavepoint().getFormatType().name()),
path,
conf));
} else {
} else if (spec.isCheckpoint()) {
if (!SnapshotUtils.isSnapshotTriggeringSupported(conf)) {
FlinkStateSnapshotUtils.snapshotFailed(
ctx.getKubernetesClient(),
eventRecorder,
ctx.getResource(),
"Manual checkpoint triggering is not supported for this Flink job (requires Flink 1.17+)");
return Optional.empty();
}
return Optional.of(
flinkService.triggerCheckpoint(
jobId,
org.apache.flink.core.execution.CheckpointType.valueOf(
spec.getCheckpoint().getCheckpointType().name()),
conf));
} else {
throw new IllegalArgumentException(
"Snapshot must specify either savepoint or checkpoint spec");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ public static boolean abandonSnapshotIfJobNotRunning(
String.format(
"Secondary resource %s for savepoint %s was not found",
snapshot.getSpec().getJobReference(), snapshot.getMetadata().getName());
abandonSnapshot(client, snapshot, eventRecorder, message);
snapshotAbandoned(client, snapshot, eventRecorder, message);
return true;
}

Expand All @@ -299,37 +299,117 @@ public static boolean abandonSnapshotIfJobNotRunning(
String.format(
"Secondary resource %s for savepoint %s is not running",
snapshot.getSpec().getJobReference(), snapshot.getMetadata().getName());
abandonSnapshot(client, snapshot, eventRecorder, message);
snapshotAbandoned(client, snapshot, eventRecorder, message);
return true;
}

return false;
}

/**
* Sets the status fields of the snapshot to an abandoned state and triggers a Kubernetes event.
* Sets a snapshot's state to {@link
* org.apache.flink.kubernetes.operator.api.status.FlinkStateSnapshotState#ABANDONED}.
*
* @param client Kubernetes client
* @param snapshot snapshot
* @param eventRecorder event recorder
* @param message message of the generated event
* @param kubernetesClient kubernetes client
* @param eventRecorder event recorder to add event
* @param snapshot snapshot resource
* @param error message for the event and to add to the resource status
*/
private static void abandonSnapshot(
KubernetesClient client,
private static void snapshotAbandoned(
KubernetesClient kubernetesClient,
FlinkStateSnapshot snapshot,
EventRecorder eventRecorder,
String message) {
String error) {
eventRecorder.triggerSnapshotEvent(
snapshot,
EventRecorder.Type.Warning,
EventRecorder.Reason.SnapshotAbandoned,
EventRecorder.Component.Snapshot,
message,
client);
error,
kubernetesClient);

snapshot.getStatus().setState(FlinkStateSnapshotState.ABANDONED);
snapshot.getStatus().setPath(null);
snapshot.getStatus().setError(null);
snapshot.getStatus().setError(error);
snapshot.getStatus().setResultTimestamp(DateTimeUtils.kubernetes(Instant.now()));
}

/**
 * Moves a snapshot into the {@link
 * org.apache.flink.kubernetes.operator.api.status.FlinkStateSnapshotState#FAILED} state.
 *
 * <p>A warning event carrying {@code error} is triggered first; its reason is {@code
 * SavepointError} when the spec is a savepoint and {@code CheckpointError} otherwise. The
 * status then receives the error text, an incremented failure count and a result timestamp of
 * the current time.
 *
 * @param kubernetesClient kubernetes client passed to the event recorder
 * @param eventRecorder event recorder used to trigger the event
 * @param snapshot snapshot resource, updated in place
 * @param error error message used for the event and stored in the resource status
 */
public static void snapshotFailed(
        KubernetesClient kubernetesClient,
        EventRecorder eventRecorder,
        FlinkStateSnapshot snapshot,
        String error) {
    final EventRecorder.Reason eventReason;
    if (snapshot.getSpec().isSavepoint()) {
        eventReason = EventRecorder.Reason.SavepointError;
    } else {
        eventReason = EventRecorder.Reason.CheckpointError;
    }

    eventRecorder.triggerSnapshotEvent(
            snapshot,
            EventRecorder.Type.Warning,
            eventReason,
            EventRecorder.Component.Snapshot,
            error,
            kubernetesClient);

    var status = snapshot.getStatus();
    status.setState(FlinkStateSnapshotState.FAILED);
    status.setError(error);
    status.setFailures(status.getFailures() + 1);
    status.setResultTimestamp(DateTimeUtils.kubernetes(Instant.now()));
}

/**
 * Moves a snapshot into the {@link
 * org.apache.flink.kubernetes.operator.api.status.FlinkStateSnapshotState#COMPLETED} state.
 *
 * <p>The result path is stored, any previous error is cleared and the result timestamp is set
 * to the current time. The same instant is reused for the trigger timestamp when requested.
 *
 * @param snapshot snapshot resource, updated in place
 * @param location result location
 * @param setTriggerTimestamp if true, also set the trigger timestamp to the current time
 */
public static void snapshotSuccessful(
        FlinkStateSnapshot snapshot, String location, boolean setTriggerTimestamp) {
    // Taken once so that result and trigger timestamps are identical when both are written.
    var now = DateTimeUtils.kubernetes(Instant.now());
    var status = snapshot.getStatus();

    status.setState(FlinkStateSnapshotState.COMPLETED);
    status.setPath(location);
    status.setError(null);
    status.setResultTimestamp(now);

    if (!setTriggerTimestamp) {
        return;
    }
    status.setTriggerTimestamp(now);
}

/**
 * Moves a snapshot into the {@link
 * org.apache.flink.kubernetes.operator.api.status.FlinkStateSnapshotState#IN_PROGRESS} state.
 *
 * <p>If the resource has no snapshot-type label yet, it is labelled as {@code MANUAL}. The
 * status then receives the trigger ID and a trigger timestamp of the current time.
 *
 * @param snapshot snapshot resource, updated in place
 * @param triggerId trigger ID
 */
public static void snapshotInProgress(FlinkStateSnapshot snapshot, String triggerId) {
    // NOTE(review): relies on the metadata labels map being non-null — confirm against callers.
    var labels = snapshot.getMetadata().getLabels();
    labels.putIfAbsent(CrdConstants.LABEL_SNAPSHOT_TYPE, SnapshotTriggerType.MANUAL.name());

    var status = snapshot.getStatus();
    status.setState(FlinkStateSnapshotState.IN_PROGRESS);
    status.setTriggerId(triggerId);
    status.setTriggerTimestamp(DateTimeUtils.kubernetes(Instant.now()));
}

/**
 * Moves a snapshot into the {@link
 * org.apache.flink.kubernetes.operator.api.status.FlinkStateSnapshotState#TRIGGER_PENDING}
 * state. Only the state field of the status is modified.
 *
 * @param snapshot snapshot resource, updated in place
 */
public static void snapshotTriggerPending(FlinkStateSnapshot snapshot) {
    var status = snapshot.getStatus();
    status.setState(FlinkStateSnapshotState.TRIGGER_PENDING);
}
}
Loading

0 comments on commit 0d2260a

Please sign in to comment.