diff --git a/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/VenicePushJob.java b/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/VenicePushJob.java index bd7da010ea..d670aa3972 100755 --- a/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/VenicePushJob.java +++ b/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/VenicePushJob.java @@ -295,6 +295,12 @@ public VenicePushJob(String jobId, Properties vanillaProps) { Lazy.of(() -> ByteBuffer.wrap(ZstdWithDictCompressor.buildDictionaryOnSyntheticAvroData())); sharedTmpDir = new Path(pushJobSetting.sharedTmpDir); jobTmpDir = new Path(pushJobSetting.jobTmpDir); + String pushId = + pushJobSetting.jobStartTimeMs + "_" + props.getString(JOB_EXEC_URL, "failed_to_obtain_execution_url"); + if (pushJobSetting.isSourceKafka) { + pushId = pushJobSetting.repushTTLEnabled ? Version.generateTTLRePushId(pushId) : Version.generateRePushId(pushId); + } + pushJobDetails.pushId = pushId; } // This is a part of the public API. There is value in exposing this to users of VenicePushJob for reporting purposes @@ -521,7 +527,6 @@ private PushJobSetting getPushJobSetting(VeniceProperties props) { Validate.isAssignableFrom(DataWriterComputeJob.class, objectClass); pushJobSettingToReturn.dataWriterComputeJobClass = objectClass; } - return pushJobSettingToReturn; } @@ -754,10 +759,7 @@ public void run() { } Optional optionalCompressionDictionary = getCompressionDictionary(); - String pushId = - pushJobSetting.jobStartTimeMs + "_" + props.getString(JOB_EXEC_URL, "failed_to_obtain_execution_url"); if (pushJobSetting.isSourceKafka) { - pushId = Version.generateRePushId(pushId); if (pushJobSetting.sourceKafkaInputVersionInfo.getHybridStoreConfig() != null && pushJobSetting.rewindTimeInSecondsOverride == NOT_SET) { pushJobSetting.rewindTimeInSecondsOverride = DEFAULT_RE_PUSH_REWIND_IN_SECONDS_OVERRIDE; @@ -784,12 +786,11 @@ public void run() { pushJobSetting, inputDataInfo.getInputFileDataSizeInBytes(), controllerClient, - pushId, + pushJobDetails.pushId.toString(), props, optionalCompressionDictionary); updatePushJobDetailsWithCheckpoint(PushJobCheckpoints.NEW_VERSION_CREATED); // Update and send push job details with new info to the controller - pushJobDetails.pushId = pushId; pushJobDetails.partitionCount = pushJobSetting.partitionCount; pushJobDetails.valueCompressionStrategy = pushJobSetting.topicCompressionStrategy != null ? pushJobSetting.topicCompressionStrategy.getValue() @@ -1594,7 +1595,6 @@ private void initPushJobDetails() { pushJobDetails.clusterName = pushJobSetting.clusterName; pushJobDetails.overallStatus = new ArrayList<>(); pushJobDetails.overallStatus.add(getPushJobDetailsStatusTuple(PushJobDetailsStatus.STARTED.getValue())); - pushJobDetails.pushId = ""; pushJobDetails.partitionCount = -1; pushJobDetails.valueCompressionStrategy = CompressionStrategy.NO_OP.getValue(); pushJobDetails.chunkingEnabled = false; diff --git a/clients/venice-push-job/src/test/java/com/linkedin/venice/hadoop/VenicePushJobTest.java b/clients/venice-push-job/src/test/java/com/linkedin/venice/hadoop/VenicePushJobTest.java index 9391b16972..2314956959 100644 --- a/clients/venice-push-job/src/test/java/com/linkedin/venice/hadoop/VenicePushJobTest.java +++ b/clients/venice-push-job/src/test/java/com/linkedin/venice/hadoop/VenicePushJobTest.java @@ -174,6 +174,7 @@ public void testRepushTTLJobConfig() { PushJobSetting pushJobSetting = pushJob.getPushJobSetting(); Assert.assertTrue(pushJobSetting.repushTTLEnabled); Assert.assertEquals(pushJobSetting.repushTTLStartTimeMs, -1); + Assert.assertTrue(Version.isPushIdTTLRePush(pushJob.getPushJobDetails().getPushId().toString())); } // Test with explicit TTL start timestamp @@ -184,6 +185,7 @@ public void testRepushTTLJobConfig() { PushJobSetting pushJobSetting = pushJob.getPushJobSetting(); Assert.assertTrue(pushJobSetting.repushTTLEnabled); Assert.assertEquals(pushJobSetting.repushTTLStartTimeMs, 100); + Assert.assertTrue(Version.isPushIdTTLRePush(pushJob.getPushJobDetails().getPushId().toString())); } // Test with explicit TTL age @@ -217,6 +219,7 @@ public void testRepushTTLJobConfig() { PushJobSetting pushJobSetting = pushJob.getPushJobSetting(); Assert.assertFalse(pushJobSetting.repushTTLEnabled); Assert.assertEquals(pushJobSetting.repushTTLStartTimeMs, -1); + Assert.assertTrue(Version.isPushIdRePush(pushJob.getPushJobDetails().getPushId().toString())); } } diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/meta/Version.java b/internal/venice-common/src/main/java/com/linkedin/venice/meta/Version.java index 406422e8ed..c509201314 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/meta/Version.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/meta/Version.java @@ -36,6 +36,8 @@ public interface Version extends Comparable, DataModelBackedStructure addVersion( version.setViewConfigs(store.getViewConfigs()); - if (repushSourceVersion > NON_EXISTING_VERSION) { + if (isRepush && repushSourceVersion > NON_EXISTING_VERSION) { version.setRepushSourceVersion(repushSourceVersion); }