From 2b82a9343140515646f823d85146e3e850bcb764 Mon Sep 17 00:00:00 2001 From: Gyula Fora Date: Tue, 9 Jul 2024 11:32:32 +0200 Subject: [PATCH] [FLINK-35831] Rotate jobId for both savepoint and stateless deploys --- .../deployment/ApplicationReconciler.java | 12 ++++----- .../deployment/ApplicationReconcilerTest.java | 25 ++++++++++++++++--- 2 files changed, 28 insertions(+), 9 deletions(-) diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java index 5f5aedba27..8621c50b9c 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java @@ -175,7 +175,8 @@ public void deploy( statusRecorder.patchAndCacheStatus(relatedResource, ctx.getKubernetesClient()); } - setJobIdIfNecessary(spec, relatedResource, deployConfig, ctx.getKubernetesClient()); + setJobIdIfNecessary( + relatedResource, deployConfig, ctx.getKubernetesClient(), requireHaMetadata); eventRecorder.triggerEvent( relatedResource, @@ -193,10 +194,10 @@ public void deploy( } private void setJobIdIfNecessary( - FlinkDeploymentSpec spec, FlinkDeployment resource, Configuration deployConfig, - KubernetesClient client) { + KubernetesClient client, + boolean lastStateDeploy) { // The jobId assigned by Flink would be constant, // overwrite to avoid checkpoint path conflicts. // https://issues.apache.org/jira/browse/FLINK-19358 @@ -208,9 +209,8 @@ private void setJobIdIfNecessary( } var status = resource.getStatus(); - // generate jobId initially or rotate on every deployment when mode is stateless - if (status.getJobStatus().getJobId() == null - || spec.getJob().getUpgradeMode() == UpgradeMode.STATELESS) { + // Rotate job id when not last-state deployment + if (status.getJobStatus().getJobId() == null || !lastStateDeploy) { String jobId = JobID.generate().toHexString(); // record before first deployment to ensure we use it on any retry status.getJobStatus().setJobId(jobId); diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java index ec323babe6..6f9e17fd8d 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java @@ -230,7 +230,15 @@ public void testUpgrade(FlinkVersion flinkVersion) throws Exception { .isFirstDeployment()); JobID jobId = runningJobs.get(0).f1.getJobId(); - verifyJobId(deployment, runningJobs.get(0).f1, runningJobs.get(0).f2, jobId); + + // Last state upgrade + FlinkDeployment lastStateUpgrade = ReconciliationUtils.clone(deployment); + getJobSpec(lastStateUpgrade).setUpgradeMode(UpgradeMode.LAST_STATE); + lastStateUpgrade.getSpec().setRestartNonce(1234L); + reconciler.reconcile(deployment, context); + reconciler.reconcile(deployment, context); + // Make sure jobId is rotated on last-state startup + verifyJobId(lastStateUpgrade, runningJobs.get(0).f1, runningJobs.get(0).f2, jobId); // Test stateless upgrade FlinkDeployment statelessUpgrade = ReconciliationUtils.clone(deployment); @@ -284,7 +292,10 @@ public void testUpgrade(FlinkVersion flinkVersion) throws Exception { SnapshotTriggerType.UPGRADE, getSavepointInfo(statefulUpgrade).getLastSavepoint().getTriggerType()); assertEquals(SnapshotStatus.SUCCEEDED, getLastSnapshotStatus(statefulUpgrade, SAVEPOINT)); - verifyJobId(deployment, runningJobs.get(0).f1, runningJobs.get(0).f2, jobId); + + // Make sure jobId rotated on savepoint + verifyNewJobId(runningJobs.get(0).f1, runningJobs.get(0).f2, jobId); + jobId = runningJobs.get(0).f1.getJobId(); getJobSpec(deployment).setUpgradeMode(UpgradeMode.LAST_STATE); deployment.getSpec().setRestartNonce(100L); @@ -325,7 +336,8 @@ public void testUpgrade(FlinkVersion flinkVersion) throws Exception { assertEquals(1, flinkService.getRunningCount()); assertEquals("finished_sp", runningJobs.get(0).f0); - verifyJobId(deployment, runningJobs.get(0).f1, runningJobs.get(0).f2, jobId); + // Make sure jobId rotated on savepoint + verifyNewJobId(runningJobs.get(0).f1, runningJobs.get(0).f2, jobId); } private void verifyJobId( @@ -335,6 +347,13 @@ private void verifyJobId( assertEquals(conf.get(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID), jobId.toHexString()); } + private void verifyNewJobId(JobStatusMessage status, Configuration conf, JobID jobId) { + assertNotEquals(jobId.toHexString(), status.getJobId()); + assertEquals( + conf.get(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID), + status.getJobId().toHexString()); + } + @NotNull private static Savepoint savepointFromSavepointInfo( SavepointInfo savepointInfo, Long savepointTriggerNonce) {