Skip to content

Commit

Permalink
[FLINK-35265] Fix cleanup logic for abandoned snapshots
Browse files Browse the repository at this point in the history
  • Loading branch information
mateczagany committed Jul 24, 2024
1 parent 0f03c0a commit 58ed871
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -118,19 +118,17 @@ public DeleteControl cleanup(FlinkStateSnapshotContext ctx) throws Exception {
resourceName);
return DeleteControl.noFinalizerRemoval()
.rescheduleAfter(ctx.getOperatorConfig().getReconcileInterval().toMillis());
case COMPLETED:
var flinkDeployment = getFlinkDeployment(ctx);
return handleSnapshotCleanup(resource, flinkDeployment, ctx);
case FAILED:
LOG.info(
"Savepoint was not successful, cleaning up resource {} without disposal...",
resourceName);
return DeleteControl.defaultDelete();
case TRIGGER_PENDING:
case ABANDONED:
LOG.info(
"Savepoint has not started yet, cleaning up resource {} without disposal...",
"Savepoint state is {}, cleaning up resource {} without disposal...",
state.name(),
resourceName);
return DeleteControl.defaultDelete();
case COMPLETED:
var flinkDeployment = getFlinkDeployment(ctx);
return handleSnapshotCleanup(resource, flinkDeployment, ctx);
default:
LOG.info("Unknown savepoint state for {}: {}", resourceName, state);
return DeleteControl.defaultDelete();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.flink.kubernetes.operator.api.spec.Resource;
import org.apache.flink.kubernetes.operator.api.spec.TaskManagerSpec;
import org.apache.flink.kubernetes.operator.api.spec.UpgradeMode;
import org.apache.flink.kubernetes.operator.api.status.FlinkStateSnapshotStatus;
import org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus;
import org.apache.flink.kubernetes.operator.config.FlinkConfigBuilder;
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
Expand Down Expand Up @@ -518,7 +519,11 @@ public Optional<String> validateStateSnapshot(
return Optional.of("Job reference must be supplied for this snapshot");
}

if (target.isEmpty()) {
// If the savepoint has already been processed by the operator, we don't need to check the job reference.
if (target.isEmpty()
&& (savepoint.getStatus() == null
|| FlinkStateSnapshotStatus.State.TRIGGER_PENDING.equals(
savepoint.getStatus().getState()))) {
return Optional.of(
String.format(
"Target for snapshot (%s) in namespace %s was not found",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,11 @@ public void testReconcileSavepointCleanup() {
assertDeleteControl(controller.cleanup(snapshot, context), true, null);
assertThat(flinkService.getDisposedSavepoints()).isEmpty();

snapshot.getSpec().getSavepoint().setDisposeOnDelete(true);
snapshot.getStatus().setState(ABANDONED);
assertDeleteControl(controller.cleanup(snapshot, context), true, null);
assertThat(flinkService.getDisposedSavepoints()).isEmpty();

snapshot.getSpec().getSavepoint().setDisposeOnDelete(true);
snapshot.getStatus().setState(IN_PROGRESS);
assertDeleteControl(
Expand Down

0 comments on commit 58ed871

Please sign in to comment.