[FLINK-35265] Address PR comments
mateczagany committed Jul 25, 2024
1 parent 3d9fc21 commit cd3fca7
Showing 12 changed files with 76 additions and 101 deletions.
@@ -74,6 +74,7 @@ public class FlinkConfigManager {

private volatile Configuration defaultConfig;
private volatile FlinkOperatorConfiguration defaultOperatorConfiguration;
// TODO: From 1.11 release, snapshot CRD should be mandatory and this can be removed.
private final boolean snapshotCrdInstalled;
private final AtomicLong defaultConfigVersion = new AtomicLong(0);
private final LoadingCache<Key, Configuration> cache;
@@ -57,16 +57,16 @@ public class FlinkStateSnapshotContext {
/**
* @return Operator configuration for this resource.
*/
public FlinkOperatorConfiguration operatorConfig() {
return configManager.getOperatorConfiguration(
getResource().getMetadata().getNamespace(), null);
private FlinkOperatorConfiguration operatorConfig() {
return getConfigManager()
.getOperatorConfiguration(getResource().getMetadata().getNamespace(), null);
}

public Configuration referencedJobObserveConfig() {
return configManager.getObserveConfig(getReferencedJobFlinkDeployment());
private Configuration referencedJobObserveConfig() {
return getConfigManager().getObserveConfig(getReferencedJobFlinkDeployment());
}

public FlinkDeployment referencedJobFlinkDeployment() {
private FlinkDeployment referencedJobFlinkDeployment() {
return getJosdkContext()
.getSecondaryResource(FlinkDeployment.class)
.orElseThrow(
@@ -186,7 +186,7 @@ private Optional<String> getInitialSnapshotPath(

if (spec.getJob().getFlinkStateSnapshotReference() != null) {
return Optional.of(
FlinkStateSnapshotUtils.getAndValidateFlinkStateSnapshotPath(
FlinkStateSnapshotUtils.getValidatedFlinkStateSnapshotPath(
ctx.getKubernetesClient(),
spec.getJob().getFlinkStateSnapshotReference()));
}
@@ -282,7 +282,7 @@ protected void restoreJob(
.map(
ref ->
FlinkStateSnapshotUtils
.getAndValidateFlinkStateSnapshotPath(
.getValidatedFlinkStateSnapshotPath(
ctx.getKubernetesClient(), ref));
}

@@ -465,7 +465,7 @@ private void redeployWithSavepoint(
if (snapshotRef != null) {
savepointPath =
Optional.of(
FlinkStateSnapshotUtils.getAndValidateFlinkStateSnapshotPath(
FlinkStateSnapshotUtils.getValidatedFlinkStateSnapshotPath(
ctx.getKubernetesClient(), snapshotRef));
status.getJobStatus().setUpgradeSnapshotReference(snapshotRef);
}
@@ -494,7 +494,6 @@ public String triggerSavepoint(
String savepointDirectory,
Configuration conf)
throws Exception {
LOG.info("Triggering new savepoint using new method");
try (var clusterClient = getClusterClient(conf)) {
var savepointTriggerHeaders = SavepointTriggerHeaders.getInstance();
var savepointTriggerMessageParameters =
@@ -520,7 +519,7 @@ public String triggerSavepoint(
@Override
public String triggerCheckpoint(
String jobId,
org.apache.flink.core.execution.CheckpointType checkpointFormatType,
org.apache.flink.core.execution.CheckpointType checkpointType,
Configuration conf)
throws Exception {
LOG.info("Triggering new checkpoint");
Expand All @@ -537,7 +536,7 @@ public String triggerCheckpoint(
.sendRequest(
checkpointTriggerHeaders,
checkpointTriggerMessageParameters,
new CheckpointTriggerRequestBody(checkpointFormatType, null))
new CheckpointTriggerRequestBody(checkpointType, null))
.get(timeout, TimeUnit.SECONDS);
LOG.info("Checkpoint successfully triggered: " + response.getTriggerId().toHexString());
return response.getTriggerId().toHexString();
@@ -95,7 +95,7 @@ String triggerSavepoint(

String triggerCheckpoint(
String jobId,
org.apache.flink.core.execution.CheckpointType checkpointFormatType,
org.apache.flink.core.execution.CheckpointType checkpointType,
Configuration conf)
throws Exception;

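The parameter rename above is cosmetic, but for orientation, here is a minimal caller sketch for the interface method. The names flinkService, jobId and observeConfig are hypothetical, CheckpointType.FULL is assumed to be a valid enum constant purely for illustration, and only the triggerCheckpoint signature and org.apache.flink.core.execution.CheckpointType come from this commit.

// Illustration only, not part of this commit: trigger a manual full checkpoint
// and keep the returned trigger id so completion can be polled later.
String triggerId =
        flinkService.triggerCheckpoint(
                jobId,
                org.apache.flink.core.execution.CheckpointType.FULL,
                observeConfig);
// triggerId identifies the pending checkpoint on the Flink REST API side.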
@@ -17,6 +17,7 @@

package org.apache.flink.kubernetes.operator.utils;

import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
import org.apache.flink.kubernetes.operator.api.FlinkSessionJob;
import org.apache.flink.kubernetes.operator.api.FlinkStateSnapshot;
@@ -31,6 +32,7 @@
import io.javaoperatorsdk.operator.processing.event.ResourceID;
import io.javaoperatorsdk.operator.processing.event.source.EventSource;
import io.javaoperatorsdk.operator.processing.event.source.PrimaryToSecondaryMapper;
import io.javaoperatorsdk.operator.processing.event.source.SecondaryToPrimaryMapper;
import io.javaoperatorsdk.operator.processing.event.source.informer.InformerEventSource;
import io.javaoperatorsdk.operator.processing.event.source.informer.Mappers;

@@ -168,22 +170,7 @@ public static EventSource[] getFlinkStateSnapshotInformerEventSources(

InformerConfiguration<FlinkSessionJob> configurationFlinkSessionJob =
InformerConfiguration.from(FlinkSessionJob.class, context)
.withSecondaryToPrimaryMapper(
flinkResource ->
context
.getPrimaryCache()
.byIndex(
FLINK_STATE_SNAPSHOT_IDX,
indexKey(
flinkResource
.getMetadata()
.getName(),
flinkResource
.getMetadata()
.getNamespace()))
.stream()
.map(ResourceID::fromResource)
.collect(Collectors.toSet()))
.withSecondaryToPrimaryMapper(getSnapshotPrimaryMapper(context))
.withPrimaryToSecondaryMapper(
(PrimaryToSecondaryMapper<FlinkStateSnapshot>)
snapshot -> {
@@ -208,22 +195,7 @@ public static EventSource[] getFlinkStateSnapshotInformerEventSources(

InformerConfiguration<FlinkDeployment> configurationFlinkDeployment =
InformerConfiguration.from(FlinkDeployment.class, context)
.withSecondaryToPrimaryMapper(
flinkResource ->
context
.getPrimaryCache()
.byIndex(
FLINK_STATE_SNAPSHOT_IDX,
indexKey(
flinkResource
.getMetadata()
.getName(),
flinkResource
.getMetadata()
.getNamespace()))
.stream()
.map(ResourceID::fromResource)
.collect(Collectors.toSet()))
.withSecondaryToPrimaryMapper(getSnapshotPrimaryMapper(context))
.withPrimaryToSecondaryMapper(
(PrimaryToSecondaryMapper<FlinkStateSnapshot>)
snapshot -> {
@@ -269,6 +241,22 @@ public static EventSource[] getFlinkStateSnapshotInformerEventSources(
return new EventSource[] {flinkSessionJobEventSource, flinkDeploymentEventSource};
}

private static <T extends AbstractFlinkResource<?, ?>>
SecondaryToPrimaryMapper<T> getSnapshotPrimaryMapper(
EventSourceContext<FlinkStateSnapshot> ctx) {
return flinkResource ->
ctx
.getPrimaryCache()
.byIndex(
FLINK_STATE_SNAPSHOT_IDX,
indexKey(
flinkResource.getMetadata().getName(),
flinkResource.getMetadata().getNamespace()))
.stream()
.map(ResourceID::fromResource)
.collect(Collectors.toSet());
}

private static String indexKey(String name, String namespace) {
return name + "#" + namespace;
}
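The two inline lambdas removed above now delegate to this shared helper. A rough sketch of what the mapper yields at runtime follows; the deployment variable and the example names are hypothetical, while SecondaryToPrimaryMapper, ResourceID and getSnapshotPrimaryMapper come from this commit.

// Illustration only, not part of this commit: for a FlinkDeployment named
// "my-job" in namespace "flink", the mapper returns the ResourceIDs of every
// FlinkStateSnapshot indexed under the key "my-job#flink", so those snapshots
// are reconciled whenever the deployment changes.
SecondaryToPrimaryMapper<FlinkDeployment> mapper = getSnapshotPrimaryMapper(context);
Set<ResourceID> snapshotPrimaries = mapper.toPrimaryResourceIDs(deployment);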
@@ -63,9 +63,9 @@ public class FlinkStateSnapshotUtils {
* @param snapshotRef snapshot reference
* @return found savepoint path
*/
public static String getAndValidateFlinkStateSnapshotPath(
public static String getValidatedFlinkStateSnapshotPath(
KubernetesClient kubernetesClient, FlinkStateSnapshotReference snapshotRef) {
if (!StringUtils.isBlank(snapshotRef.getPath())) {
if (StringUtils.isNotBlank(snapshotRef.getPath())) {
return snapshotRef.getPath();
}

@@ -74,33 +74,20 @@ public static String getAndValidateFlinkStateSnapshotPath(
String.format("Invalid snapshot name: %s", snapshotRef.getName()));
}

FlinkStateSnapshot result;
if (snapshotRef.getName() != null) {
var namespace = snapshotRef.getNamespace();
if (namespace == null) {
result =
kubernetesClient
var result =
snapshotRef.getNamespace() == null
? kubernetesClient
.resources(FlinkStateSnapshot.class)
.withName(snapshotRef.getName())
.get();
} else {
result =
kubernetesClient
.get()
: kubernetesClient
.resources(FlinkStateSnapshot.class)
.inNamespace(namespace)
.inNamespace(snapshotRef.getNamespace())
.withName(snapshotRef.getName())
.get();
}
} else {
result =
kubernetesClient
.resources(FlinkStateSnapshot.class)
.withName(snapshotRef.getName())
.get();
}

if (result == null) {
throw new IllegalArgumentException(
throw new IllegalStateException(
String.format(
"Cannot find snapshot %s in namespace %s.",
snapshotRef.getNamespace(), snapshotRef.getName()));
@@ -116,15 +103,15 @@ public static String getAndValidateFlinkStateSnapshotPath(
}

if (COMPLETED != result.getStatus().getState()) {
throw new IllegalArgumentException(
throw new IllegalStateException(
String.format(
"Snapshot %s/%s is not complete yet.",
snapshotRef.getNamespace(), snapshotRef.getName()));
}

var path = result.getStatus().getPath();
if (StringUtils.isBlank(path)) {
throw new IllegalArgumentException(
throw new IllegalStateException(
String.format(
"Snapshot %s/%s path is incorrect: %s.",
snapshotRef.getNamespace(), snapshotRef.getName(), path));
@@ -266,21 +253,21 @@ public static FlinkStateSnapshotReference createReferenceForUpgradeSavepoint(
AbstractFlinkResource<?, ?> flinkResource,
SavepointFormatType savepointFormatType,
String savepointPath) {
if (isSnapshotResourceEnabled(operatorConf, conf)) {
var snapshot =
createSavepointResource(
kubernetesClient,
flinkResource,
savepointPath,
SnapshotTriggerType.UPGRADE,
savepointFormatType,
conf.get(
KubernetesOperatorConfigOptions
.OPERATOR_JOB_SAVEPOINT_DISPOSE_ON_DELETE));
return FlinkStateSnapshotReference.fromResource(snapshot);
} else {
if (!isSnapshotResourceEnabled(operatorConf, conf)) {
return FlinkStateSnapshotReference.fromPath(savepointPath);
}

var disposeOnDelete =
conf.get(KubernetesOperatorConfigOptions.OPERATOR_JOB_SAVEPOINT_DISPOSE_ON_DELETE);
var snapshot =
createSavepointResource(
kubernetesClient,
flinkResource,
savepointPath,
SnapshotTriggerType.UPGRADE,
savepointFormatType,
disposeOnDelete);
return FlinkStateSnapshotReference.fromResource(snapshot);
}

/**
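For context, a short sketch of how a reconciler consumes the renamed helper when restoring a job; spec and kubernetesClient are placeholder names, and the behaviour described in the comments mirrors the implementation above.

// Illustration only, not part of this commit: resolve the restore path from a
// FlinkStateSnapshotReference. If ref.getPath() is set it is returned as-is;
// otherwise the referenced FlinkStateSnapshot is looked up and must be
// COMPLETED with a non-blank status path, or the helper throws.
var ref = spec.getJob().getFlinkStateSnapshotReference();
if (ref != null) {
    String savepointPath =
            FlinkStateSnapshotUtils.getValidatedFlinkStateSnapshotPath(kubernetesClient, ref);
    // savepointPath then becomes the job's initial savepoint / restore location
}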
@@ -333,7 +333,7 @@ public String triggerSavepoint(
@Override
public String triggerCheckpoint(
String jobId,
org.apache.flink.core.execution.CheckpointType checkpointFormatType,
org.apache.flink.core.execution.CheckpointType checkpointType,
Configuration conf) {
var triggerId = "checkpoint_trigger_" + checkpointTriggerCounter++;
checkpointTriggers.put(triggerId, false);
@@ -536,7 +536,7 @@ public void testReconcileJobNotFound() {

// observe phase triggers event for snapshot abandoned, then validation will also trigger an
// event.
assertThat(flinkStateSnapshotEventCollector.events).hasSize(2);
assertThat(flinkStateSnapshotEventCollector.events).hasSize(1);
assertThat(flinkStateSnapshotEventCollector.events.get(0))
.satisfies(
event -> {
@@ -70,15 +70,15 @@ public class FlinkStateSnapshotUtilsTest {
private static final String SAVEPOINT_PATH = "/tmp/savepoint-01";

@Test
public void testGetAndValidateFlinkStateSnapshotPathPathGiven() {
public void testGetValidatedFlinkStateSnapshotPathPathGiven() {
var snapshotRef = FlinkStateSnapshotReference.builder().path(SAVEPOINT_PATH).build();
var snapshotResult =
FlinkStateSnapshotUtils.getAndValidateFlinkStateSnapshotPath(client, snapshotRef);
FlinkStateSnapshotUtils.getValidatedFlinkStateSnapshotPath(client, snapshotRef);
assertEquals(SAVEPOINT_PATH, snapshotResult);
}

@Test
public void testGetAndValidateFlinkStateSnapshotPathFoundResource() {
public void testGetValidatedFlinkStateSnapshotPathFoundResource() {
var snapshot = initSavepoint(COMPLETED, null);
client.resource(snapshot).create();

@@ -88,23 +88,23 @@ public void testGetAndValidateFlinkStateSnapshotPathFoundResource() {
.name(SAVEPOINT_NAME)
.build();
var snapshotResult =
FlinkStateSnapshotUtils.getAndValidateFlinkStateSnapshotPath(client, snapshotRef);
FlinkStateSnapshotUtils.getValidatedFlinkStateSnapshotPath(client, snapshotRef);
assertEquals(SAVEPOINT_PATH, snapshotResult);
}

@Test
public void testGetAndValidateFlinkStateSnapshotPathInvalidName() {
public void testGetValidatedFlinkStateSnapshotPathInvalidName() {
var snapshotRef =
FlinkStateSnapshotReference.builder().namespace(NAMESPACE).name(" ").build();
assertThrows(
IllegalArgumentException.class,
IllegalStateException.class,
() ->
FlinkStateSnapshotUtils.getAndValidateFlinkStateSnapshotPath(
FlinkStateSnapshotUtils.getValidatedFlinkStateSnapshotPath(
client, snapshotRef));
}

@Test
public void testGetAndValidateFlinkStateSnapshotPathNotFound() {
public void testGetValidatedFlinkStateSnapshotPathNotFound() {
var snapshotRef =
FlinkStateSnapshotReference.builder()
.namespace("not-exists")
@@ -113,7 +113,7 @@ public void testGetAndValidateFlinkStateSnapshotPathNotFound() {
assertThrows(
IllegalArgumentException.class,
() ->
FlinkStateSnapshotUtils.getAndValidateFlinkStateSnapshotPath(
FlinkStateSnapshotUtils.getValidatedFlinkStateSnapshotPath(
client, snapshotRef));
}

@@ -129,12 +129,12 @@ public void testGetAndValidateFlinkStateSnapshotAlreadyExists() {
.name(SAVEPOINT_NAME)
.build();
var snapshotResult =
FlinkStateSnapshotUtils.getAndValidateFlinkStateSnapshotPath(client, snapshotRef);
FlinkStateSnapshotUtils.getValidatedFlinkStateSnapshotPath(client, snapshotRef);
assertEquals(SAVEPOINT_PATH, snapshotResult);
}

@Test
public void testGetAndValidateFlinkStateSnapshotPathNotCompleted() {
public void testGetValidatedFlinkStateSnapshotPathNotCompleted() {
var snapshot = initSavepoint(IN_PROGRESS, null);
client.resource(snapshot).create();

@@ -144,9 +144,9 @@ public void testGetAndValidateFlinkStateSnapshotPathNotCompleted() {
.name(SAVEPOINT_NAME)
.build();
assertThrows(
IllegalArgumentException.class,
IllegalStateException.class,
() ->
FlinkStateSnapshotUtils.getAndValidateFlinkStateSnapshotPath(
FlinkStateSnapshotUtils.getValidatedFlinkStateSnapshotPath(
client, snapshotRef));
}
