Skip to content

Commit

Permalink
[FLINK-35265] Savepoints with alreadyExists can be created without jo…
Browse files Browse the repository at this point in the history
…b reference
  • Loading branch information
mateczagany committed Jul 15, 2024
1 parent 71a4137 commit 17bd60b
Show file tree
Hide file tree
Showing 6 changed files with 39 additions and 16 deletions.
2 changes: 1 addition & 1 deletion docs/content/docs/custom-resource/reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ This serves as a full reference for FlinkDeployment and FlinkSessionJob custom r

| Parameter | Type | Docs |
| ----------| ---- | ---- |
| jobReference | org.apache.flink.kubernetes.operator.api.spec.JobReference | Source to take a snapshot of. |
| jobReference | org.apache.flink.kubernetes.operator.api.spec.JobReference | Source to take a snapshot of. Not required if it's a savepoint and alreadyExists is true. |
| savepoint | org.apache.flink.kubernetes.operator.api.spec.SavepointSpec | Spec in case of savepoint. |
| checkpoint | org.apache.flink.kubernetes.operator.api.spec.CheckpointSpec | Spec in case of checkpoint. |
| backoffLimit | int | Maximum number of retries before the snapshot is considered as failed. Set to -1 for unlimited or 0 for no retries. |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
@SuperBuilder
@JsonIgnoreProperties(ignoreUnknown = true)
public class FlinkStateSnapshotSpec implements Diffable<FlinkStateSnapshotSpec> {
/** Source to take a snapshot of. */
/** Source to take a snapshot of. Not required if it's a savepoint and alreadyExists is true. */
private JobReference jobReference;

/** Spec in case of savepoint. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,10 @@ public FlinkOperatorConfiguration getOperatorConfig() {

public Optional<AbstractFlinkResource<?, ?>> getSecondaryResource() {
var jobRef = getResource().getSpec().getJobReference();
if (jobRef == null) {
return Optional.empty();
}

if (JobKind.FLINK_DEPLOYMENT.equals(jobRef.getKind())) {
return getJosdkContext().getSecondaryResource(FlinkDeployment.class).map(r -> r);
} else if (JobKind.FLINK_SESSION_JOB.equals(jobRef.getKind())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -501,23 +501,30 @@ public Optional<String> validateSessionJob(
@Override
public Optional<String> validateStateSnapshot(
FlinkStateSnapshot savepoint, Optional<AbstractFlinkResource<?, ?>> target) {
var namespace = savepoint.getMetadata().getNamespace();
var spec = savepoint.getSpec();
var targetName = spec.getJobReference().toString();

if (target.isEmpty()) {
return Optional.of(
String.format(
"Target for snapshot (%s) in namespace %s was not found",
targetName, namespace));
}

if ((!spec.isSavepoint() && !spec.isCheckpoint())
|| (spec.isSavepoint() && spec.isCheckpoint())) {
return Optional.of(
"Exactly one of checkpoint or savepoint configurations has to be set.");
}

if (spec.isSavepoint() && spec.getSavepoint().getAlreadyExists()) {
return Optional.empty();
}

// The remaining checks are not required if savepoint already exists.
if (spec.getJobReference() == null) {
return Optional.of("Job reference must be supplied for this snapshot");
}

if (target.isEmpty()) {
return Optional.of(
String.format(
"Target for snapshot (%s) in namespace %s was not found",
spec.getJobReference(), savepoint.getMetadata().getNamespace()));
}

return Optional.empty();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,11 +121,12 @@ public void testReconcileBackoff(int backoffLimit) {
assertThat(snapshot.getStatus().getState()).isEqualTo(FlinkStateSnapshotState.FAILED);
}

@Test
public void testReconcileSavepointAlreadyExists() {
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testReconcileSavepointAlreadyExists(boolean jobReferenced) {
var deployment = createDeployment();
context = TestUtils.createSnapshotContext(client, deployment);
var snapshot = createSavepoint(deployment, true);
context = TestUtils.createSnapshotContext(client, jobReferenced ? deployment : null);
var snapshot = createSavepoint(jobReferenced ? deployment : null, true);

controller.reconcile(snapshot, context);

Expand Down Expand Up @@ -559,7 +560,7 @@ private FlinkStateSnapshot createSavepoint(
"test",
SAVEPOINT_PATH,
alreadyExists,
JobReference.fromFlinkResource(deployment));
deployment == null ? null : JobReference.fromFlinkResource(deployment));
snapshot.getSpec().setBackoffLimit(backoffLimit);
snapshot.getSpec().getSavepoint().setFormatType(SavepointFormatType.CANONICAL);
client.resource(snapshot).create();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1060,6 +1060,17 @@ public void testFlinkStateSnapshotValidator() {
},
"Exactly one of checkpoint or savepoint configurations has to be set.");

testStateSnapshotValidateWithModifier(
snapshot -> snapshot.getSpec().setJobReference(null),
"Job reference must be supplied for this snapshot");

testStateSnapshotValidateWithModifier(
snapshot -> {
snapshot.getSpec().setJobReference(null);
snapshot.getSpec().getSavepoint().setAlreadyExists(true);
},
null);

var refName = "does-not-exist";
var snapshot =
TestUtils.buildFlinkStateSnapshotSavepoint(
Expand Down

0 comments on commit 17bd60b

Please sign in to comment.