Skip to content

Commit

Permalink
[FLINK-33803]remove metadata/generation fields from ReconciliationMet…
Browse files Browse the repository at this point in the history
…adata
  • Loading branch information
angelachenn committed Aug 22, 2024
1 parent 249a694 commit cb2af0a
Show file tree
Hide file tree
Showing 6 changed files with 32 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.apache.flink.kubernetes.operator.api.status.ReconciliationState;

import com.fasterxml.jackson.annotation.JsonInclude;
import io.fabric8.kubernetes.api.model.ObjectMeta;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
Expand All @@ -35,19 +34,15 @@ public class ReconciliationMetadata {

private String apiVersion;

private ObjectMeta metadata;

private boolean firstDeployment;

public static ReconciliationMetadata from(AbstractFlinkResource<?, ?> resource) {
ObjectMeta metadata = new ObjectMeta();
metadata.setGeneration(resource.getMetadata().getGeneration());

var firstDeploy =
resource.getStatus().getReconciliationStatus().isBeforeFirstDeployment()
|| isFirstDeployment(resource);

return new ReconciliationMetadata(resource.getApiVersion(), metadata, firstDeploy);
return new ReconciliationMetadata(resource.getApiVersion(), firstDeploy);
}

private static boolean isFirstDeployment(AbstractFlinkResource<?, ?> resource) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,12 @@ public static <T extends AbstractFlinkSpec> SpecWithMeta<T> deserializeSpecWithM
try {
ObjectNode wrapper = (ObjectNode) objectMapper.readTree(specWithMetaString);
ObjectNode internalMeta = (ObjectNode) wrapper.remove(INTERNAL_METADATA_JSON_KEY);

if (internalMeta == null) {
// migrating from old format
wrapper.remove("apiVersion");
return new SpecWithMeta<>(objectMapper.treeToValue(wrapper, specClass), null);
} else {
internalMeta.remove("metadata");
return new SpecWithMeta<>(
objectMapper.treeToValue(wrapper.get("spec"), specClass),
objectMapper.convertValue(internalMeta, ReconciliationMetadata.class));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,11 @@ public class SpecUtilsTest {
@Test
public void testSpecSerializationWithVersion() throws JsonProcessingException {
FlinkDeployment app = BaseTestUtils.buildApplicationCluster();
app.getMetadata().setGeneration(12L);
String serialized = SpecUtils.writeSpecWithMeta(app.getSpec(), app);
ObjectNode node = (ObjectNode) new ObjectMapper().readTree(serialized);

ObjectNode internalMeta = (ObjectNode) node.get(SpecUtils.INTERNAL_METADATA_JSON_KEY);
assertEquals("flink.apache.org/v1beta1", internalMeta.get("apiVersion").asText());
assertEquals(12L, internalMeta.get("metadata").get("generation").asLong());
assertEquals(
app.getSpec(),
SpecUtils.deserializeSpecWithMeta(serialized, FlinkDeploymentSpec.class).getSpec());
Expand All @@ -56,4 +54,26 @@ public void testSpecSerializationWithVersion() throws JsonProcessingException {
migrated.getSpec().getJob().getJarURI());
assertNull(migrated.getMeta());
}

@Test
public void testSpecSerializationWithoutGeneration() throws JsonProcessingException {
// with regards to ReconcialiationMetadata & SpecWithMeta
FlinkDeployment app = BaseTestUtils.buildApplicationCluster();
app.getMetadata().setGeneration(12L);
String serialized = SpecUtils.writeSpecWithMeta(app.getSpec(), app);
ObjectNode node = (ObjectNode) new ObjectMapper().readTree(serialized);

ObjectNode internalMeta = (ObjectNode) node.get(SpecUtils.INTERNAL_METADATA_JSON_KEY);
assertEquals("flink.apache.org/v1beta1", internalMeta.get("apiVersion").asText());
assertEquals(
app.getSpec(),
SpecUtils.deserializeSpecWithMeta(serialized, FlinkDeploymentSpec.class).getSpec());
assertNull(app.getStatus().getObservedGeneration());

// test backward compatibility
String oldSerialized =
"{\"apiVersion\":\"flink.apache.org/v1beta1\",\"metadata\":{\"generation\":5},\"firstDeployment\":false}";
var migrated = SpecUtils.deserializeSpecWithMeta(oldSerialized, FlinkDeploymentSpec.class);
assertNull(migrated.getMeta());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -842,7 +842,7 @@ public void observeAlreadyUpgraded() {
status.getJobManagerDeploymentStatus());

var specWithMeta = status.getReconciliationStatus().deserializeLastReconciledSpecWithMeta();
assertEquals(321L, specWithMeta.getMeta().getMetadata().getGeneration());
assertEquals(321L, status.getObservedGeneration());
assertEquals(JobState.RUNNING, specWithMeta.getSpec().getJob().getState());
assertEquals(5, specWithMeta.getSpec().getJob().getParallelism());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ public void observeAlreadyUpgraded() {
status.getJobManagerDeploymentStatus());

var specWithMeta = status.getReconciliationStatus().deserializeLastReconciledSpecWithMeta();
assertEquals(321L, specWithMeta.getMeta().getMetadata().getGeneration());
assertEquals(321L, status.getObservedGeneration());
assertEquals("1", specWithMeta.getSpec().getFlinkConfiguration().get("k"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1124,30 +1124,18 @@ public void testUpgradeReconciledGeneration() throws Exception {
reconciler.reconcile(deployment, context);
verifyAndSetRunningJobsToStatus(deployment, flinkService.listJobs());

assertEquals(
1L,
deployment
.getStatus()
.getReconciliationStatus()
.deserializeLastReconciledSpecWithMeta()
.getMeta()
.getMetadata()
.getGeneration());
assertEquals(1L, deployment.getStatus().getObservedGeneration());

// Submit no-op upgrade
deployment.getSpec().getFlinkConfiguration().put("kubernetes.operator.test", "value");
deployment.getMetadata().setGeneration(2L);
deployment
.getStatus()
.getReconciliationStatus()
.serializeAndSetLastReconciledSpec(deployment.getSpec(), deployment);

reconciler.reconcile(deployment, context);
assertEquals(
2L,
deployment
.getStatus()
.getReconciliationStatus()
.deserializeLastReconciledSpecWithMeta()
.getMeta()
.getMetadata()
.getGeneration());
assertEquals(2L, deployment.getStatus().getObservedGeneration());
}

@ParameterizedTest
Expand Down

0 comments on commit cb2af0a

Please sign in to comment.