Skip to content

Commit

Permalink
[FLINK-35265] Address PR comments
Browse files Browse the repository at this point in the history
  • Loading branch information
mateczagany committed Jul 5, 2024
1 parent 87b1c3d commit 5c78025
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
public class FlinkStateSnapshotStatus implements Diffable<FlinkStateSnapshotStatus> {

/** Current state of the snapshot. */
@PrinterColumn(name = "Snapshot Status")
@PrinterColumn(name = "Snapshot State")
private FlinkStateSnapshotState state = FlinkStateSnapshotState.TRIGGER_PENDING;

/** Trigger ID of the snapshot. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ public UpdateControl<FlinkStateSnapshot> reconcile(
observer.observe(ctx);

// validate
if (!validateSavepoint(ctx)) {
if (!validateSnapshot(ctx)) {
statusRecorder.patchAndCacheStatus(flinkStateSnapshot, ctx.getKubernetesClient());
UpdateControl<FlinkStateSnapshot> updateControl = UpdateControl.noUpdate();
return updateControl.rescheduleAfter(
Expand Down Expand Up @@ -186,7 +186,7 @@ public Map<String, EventSource> prepareEventSources(
EventSourceUtils.getFlinkStateSnapshotInformerEventSources(context));
}

private boolean validateSavepoint(FlinkStateSnapshotContext ctx) {
private boolean validateSnapshot(FlinkStateSnapshotContext ctx) {
var savepoint = ctx.getResource();
for (var validator : validators) {
var validationError =
Expand All @@ -199,7 +199,7 @@ private boolean validateSavepoint(FlinkStateSnapshotContext ctx) {
EventRecorder.Component.Operator,
validationError.get(),
ctx.getKubernetesClient());
return true;
return false;
}
}
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,19 +124,7 @@ public void reconcile(FlinkResourceContext<CR> ctx) throws Exception {
updateStatusBeforeFirstDeployment(
cr, spec, deployConfig, status, ctx.getKubernetesClient());

Optional<String> savepointPathOpt =
Optional.ofNullable(spec.getJob()).map(JobSpec::getInitialSavepointPath);
if (spec.getJob() != null && spec.getJob().getFlinkStateSnapshotReference() != null) {
var snapshotRef = spec.getJob().getFlinkStateSnapshotReference();
if (snapshotRef.getName() != null) {
savepointPathOpt =
Optional.of(
FlinkStateSnapshotUtils.getAndValidateFlinkStateSnapshotPath(
ctx.getKubernetesClient(), snapshotRef));
}
}

deploy(ctx, spec, deployConfig, savepointPathOpt, false);
deploy(ctx, spec, deployConfig, getInitialSnapshotPath(ctx, spec), false);

ReconciliationUtils.updateStatusForDeployedSpec(cr, deployConfig, clock);
return;
Expand Down Expand Up @@ -192,6 +180,22 @@ public void reconcile(FlinkResourceContext<CR> ctx) throws Exception {
}
}

private Optional<String> getInitialSnapshotPath(
FlinkResourceContext<CR> ctx, AbstractFlinkSpec spec) {
if (spec.getJob() == null || spec.getJob().getFlinkStateSnapshotReference() == null) {
return Optional.ofNullable(spec.getJob()).map(JobSpec::getInitialSavepointPath);
}

var snapshotRef = spec.getJob().getFlinkStateSnapshotReference();
if (snapshotRef.getName() != null) {
return Optional.of(
FlinkStateSnapshotUtils.getAndValidateFlinkStateSnapshotPath(
ctx.getKubernetesClient(), snapshotRef));
}

return Optional.empty();
}

private void applyAutoscaler(FlinkResourceContext<CR> ctx) throws Exception {
var autoScalerCtx = ctx.getJobAutoScalerContext();
boolean autoscalerEnabled =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,24 +125,19 @@ public void testReconcileSnapshotDeploymentDoesNotExist() {
var deployment = createDeployment();
context = TestUtils.createSnapshotContext(client, null);
var snapshot = createSavepoint(deployment);
controller.reconcile(snapshot, context);

var exception =
assertThrows(
ReconciliationException.class,
() -> controller.reconcile(snapshot, context));
assertThat(exception.getCause()).hasMessageContaining("not found");
assertThat(snapshot.getStatus().getState())
.isEqualTo(FlinkStateSnapshotState.TRIGGER_PENDING);

assertThat(flinkStateSnapshotEventCollector.events)
.anySatisfy(
event -> {
assertThat(event.getReason())
.isEqualTo(EventRecorder.Reason.SnapshotError.name());
assertThat(event.getType())
.isEqualTo(EventRecorder.Type.Warning.name());
assertThat(event.getMessage())
.isEqualTo(exception.getCause().getMessage());
.hasSize(1)
.allSatisfy(
e -> {
assertThat(e.getReason())
.isEqualTo(EventRecorder.Reason.ValidationError.name());
assertThat(e.getType()).isEqualTo(EventRecorder.Type.Warning.name());
assertThat(e.getMessage()).contains("was not found");
});
}

Expand Down

0 comments on commit 5c78025

Please sign in to comment.