Skip to content

Commit

Permalink
[FLINK-35265] Add namespace field to JobReference
Browse files Browse the repository at this point in the history
  • Loading branch information
mateczagany committed Aug 7, 2024
1 parent 5d179ca commit 56a30ff
Show file tree
Hide file tree
Showing 9 changed files with 68 additions and 35 deletions.
1 change: 1 addition & 0 deletions docs/content/docs/custom-resource/reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ This serves as a full reference for FlinkDeployment and FlinkSessionJob custom r
| ----------| ---- | ---- |
| kind | org.apache.flink.kubernetes.operator.api.spec.JobKind | Kind of the Flink resource, FlinkDeployment or FlinkSessionJob. |
| name | java.lang.String | Name of the Flink resource. |
| namespace | java.lang.String | Namespace of the Flink resource. If empty, the operator will use the namespace of the snapshot. |

### JobSpec
**Class**: org.apache.flink.kubernetes.operator.api.spec.JobSpec
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,16 @@ public class JobReference {
/** Name of the Flink resource. */
private String name;

/**
* Namespace of the Flink resource. If empty, the operator will use the namespace of the
* snapshot.
*/
private String namespace;

public static JobReference fromFlinkResource(AbstractFlinkResource<?, ?> flinkResource) {
var result = new JobReference();
result.setName(flinkResource.getMetadata().getName());
result.setNamespace(flinkResource.getMetadata().getNamespace());

if (flinkResource instanceof FlinkDeployment) {
result.setKind(JobKind.FLINK_DEPLOYMENT);
Expand All @@ -64,6 +71,6 @@ public String toString() {
} else if (kind == JobKind.FLINK_SESSION_JOB) {
kindString = CrdConstants.KIND_SESSION_JOB;
}
return String.format("%s/%s", kindString, name);
return String.format("%s/%s (%s)", namespace, name, kindString);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -181,11 +181,9 @@ public static EventSource[] getFlinkStateSnapshotInformerEventSources(
return Set.of();
}
return Set.of(
new ResourceID(
snapshot.getSpec()
.getJobReference()
.getName(),
snapshot.getMetadata().getNamespace()));
FlinkStateSnapshotUtils
.getSnapshotJobReferenceResourceId(
snapshot));
})
.withNamespacesInheritedFromController(context)
.followNamespaceChanges(true)
Expand All @@ -206,12 +204,9 @@ public static EventSource[] getFlinkStateSnapshotInformerEventSources(

// If FlinkSessionJob, retrieve deployment
var resourceId =
new ResourceID(
snapshot.getSpec()
.getJobReference()
.getName(),
snapshot.getMetadata()
.getNamespace());
FlinkStateSnapshotUtils
.getSnapshotJobReferenceResourceId(
snapshot);
var flinkSessionJob =
flinkSessionJobEventSource
.get(resourceId)
Expand All @@ -226,11 +221,9 @@ public static EventSource[] getFlinkStateSnapshotInformerEventSources(
.getNamespace()));
}
return Set.of(
new ResourceID(
snapshot.getSpec()
.getJobReference()
.getName(),
snapshot.getMetadata().getNamespace()));
FlinkStateSnapshotUtils
.getSnapshotJobReferenceResourceId(
snapshot));
})
.withNamespacesInheritedFromController(context)
.followNamespaceChanges(true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,13 @@

import io.fabric8.kubernetes.api.model.ObjectMeta;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.javaoperatorsdk.operator.processing.event.ResourceID;
import org.apache.commons.lang3.StringUtils;

import javax.annotation.Nullable;

import java.time.Instant;
import java.util.Optional;

import static org.apache.flink.kubernetes.operator.api.status.FlinkStateSnapshotStatus.State.ABANDONED;
import static org.apache.flink.kubernetes.operator.api.status.FlinkStateSnapshotStatus.State.COMPLETED;
Expand Down Expand Up @@ -373,4 +375,19 @@ public static void snapshotInProgress(FlinkStateSnapshot snapshot, String trigge
public static void snapshotTriggerPending(FlinkStateSnapshot snapshot) {
snapshot.getStatus().setState(TRIGGER_PENDING);
}

/**
* Extracts the namespace of the job reference from a snapshot resource. This is either
* explicitly specified in the job reference, or it will fallback to the namespace of the
* snapshot.
*
* @param snapshot snapshot resource
* @return namespace with the job reference to be found in
*/
public static ResourceID getSnapshotJobReferenceResourceId(FlinkStateSnapshot snapshot) {
var namespace =
Optional.ofNullable(snapshot.getSpec().getJobReference().getNamespace())
.orElse(snapshot.getMetadata().getNamespace());
return new ResourceID(snapshot.getSpec().getJobReference().getName(), namespace);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
import org.apache.flink.kubernetes.operator.exception.ReconciliationException;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.kubernetes.operator.utils.FlinkStateSnapshotUtils;
import org.apache.flink.kubernetes.operator.utils.IngressUtils;
import org.apache.flink.kubernetes.utils.Constants;
import org.apache.flink.runtime.clusterframework.TaskExecutorProcessUtils;
Expand Down Expand Up @@ -501,8 +502,8 @@ public Optional<String> validateSessionJob(

@Override
public Optional<String> validateStateSnapshot(
FlinkStateSnapshot savepoint, Optional<AbstractFlinkResource<?, ?>> target) {
var spec = savepoint.getSpec();
FlinkStateSnapshot snapshot, Optional<AbstractFlinkResource<?, ?>> target) {
var spec = snapshot.getSpec();

if ((!spec.isSavepoint() && !spec.isCheckpoint())
|| (spec.isSavepoint() && spec.isCheckpoint())) {
Expand All @@ -522,13 +523,14 @@ public Optional<String> validateStateSnapshot(
// If the savepoint has already been processed by the operator, we don't need to check the
// job reference.
if (target.isEmpty()
&& (savepoint.getStatus() == null
&& (snapshot.getStatus() == null
|| FlinkStateSnapshotStatus.State.TRIGGER_PENDING.equals(
savepoint.getStatus().getState()))) {
snapshot.getStatus().getState()))) {
var resourceId = FlinkStateSnapshotUtils.getSnapshotJobReferenceResourceId(snapshot);
return Optional.of(
String.format(
"Target for snapshot (%s) in namespace %s was not found",
spec.getJobReference(), savepoint.getMetadata().getNamespace()));
"Target for snapshot %s/%s was not found",
resourceId.getNamespace().orElse(null), resourceId.getName()));
}

return Optional.empty();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -514,8 +514,10 @@ public void testReconcileJobNotFound() {
var snapshot = createSavepoint(deployment);
var errorMessage =
String.format(
"Secondary resource FlinkDeployment/%s for savepoint snapshot-test was not found",
deployment.getMetadata().getName());
"Secondary resource %s/%s (%s) for savepoint snapshot-test was not found",
deployment.getMetadata().getNamespace(),
deployment.getMetadata().getName(),
CrdConstants.KIND_FLINK_DEPLOYMENT);

// First reconcile will trigger the snapshot.
controller.reconcile(snapshot, TestUtils.createSnapshotContext(client, deployment));
Expand Down Expand Up @@ -556,8 +558,10 @@ public void testReconcileJobNotRunning() {
var snapshot = createSavepoint(deployment);
var errorMessage =
String.format(
"Secondary resource FlinkDeployment/%s for savepoint snapshot-test is not running",
deployment.getMetadata().getName());
"Secondary resource %s/%s (%s) for savepoint snapshot-test is not running",
deployment.getMetadata().getNamespace(),
deployment.getMetadata().getName(),
CrdConstants.KIND_FLINK_DEPLOYMENT);

controller.reconcile(snapshot, context);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1072,19 +1072,19 @@ public void testFlinkStateSnapshotValidator() {
null);

var refName = "does-not-exist";
var namespace = "default";
var snapshot =
TestUtils.buildFlinkStateSnapshotSavepoint(
false,
JobReference.builder()
.kind(JobKind.FLINK_DEPLOYMENT)
.name(refName)
.namespace(namespace)
.build());
testStateSnapshotValidate(
snapshot,
Optional.empty(),
String.format(
"Target for snapshot (FlinkDeployment/%s) in namespace test was not found",
refName));
String.format("Target for snapshot %s/%s was not found", namespace, refName));
}

private void testStateSnapshotValidateWithModifier(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.flink.kubernetes.operator.api.FlinkStateSnapshot;
import org.apache.flink.kubernetes.operator.api.spec.JobKind;
import org.apache.flink.kubernetes.operator.health.CanaryResourceManager;
import org.apache.flink.kubernetes.operator.utils.FlinkStateSnapshotUtils;
import org.apache.flink.kubernetes.operator.validation.FlinkResourceValidator;

import com.fasterxml.jackson.databind.ObjectMapper;
Expand Down Expand Up @@ -101,15 +102,21 @@ private void validateSessionJob(KubernetesResource resource) {
}

private void validateStateSnapshot(KubernetesResource resource) {
FlinkStateSnapshot savepoint =
objectMapper.convertValue(resource, FlinkStateSnapshot.class);
FlinkStateSnapshot snapshot = objectMapper.convertValue(resource, FlinkStateSnapshot.class);

var namespace = savepoint.getMetadata().getNamespace();
var jobRef = savepoint.getSpec().getJobReference();
var jobRef = snapshot.getSpec().getJobReference();

AbstractFlinkResource<?, ?> targetResource = null;
if (jobRef != null && jobRef.getName() != null && jobRef.getKind() != null) {
var namespace =
FlinkStateSnapshotUtils.getSnapshotJobReferenceResourceId(snapshot)
.getNamespace()
.orElseThrow(
() ->
new IllegalArgumentException(
"Cannot determine namespace for snapshot"));
var key = Cache.namespaceKeyFunc(namespace, jobRef.getName());

if (JobKind.FLINK_DEPLOYMENT.equals(jobRef.getKind())) {
targetResource =
informerManager.getFlinkDepInformer(namespace).getStore().getByKey(key);
Expand All @@ -124,7 +131,7 @@ private void validateStateSnapshot(KubernetesResource resource) {

for (FlinkResourceValidator validator : validators) {
Optional<String> validationError =
validator.validateStateSnapshot(savepoint, Optional.ofNullable(targetResource));
validator.validateStateSnapshot(snapshot, Optional.ofNullable(targetResource));
if (validationError.isPresent()) {
throw new NotAllowedException(validationError.get());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ spec:
type: string
name:
type: string
namespace:
type: string
type: object
savepoint:
properties:
Expand Down

0 comments on commit 56a30ff

Please sign in to comment.