diff --git a/core/src/main/java/feast/core/model/Job.java b/core/src/main/java/feast/core/model/Job.java
index 3df1d49c23..1b6b28fe4b 100644
--- a/core/src/main/java/feast/core/model/Job.java
+++ b/core/src/main/java/feast/core/model/Job.java
@@ -158,6 +158,28 @@ public IngestionJobProto.IngestionJob toProto() throws InvalidProtocolBufferExce
     return ingestJob;
   }
 
+  /**
+   * Builds a new job from this job's stores, store name, source type/config and runner.
+   *
+   * <p>Deliberately not a field-for-field copy: neither id nor external id is set here,
+   * the feature set job statuses start out empty and the status is reset to
+   * {@link JobStatus#UNKNOWN}. The stores collection is passed by reference, not copied.
+   *
+   * @return the newly built {@link Job}
+   */
+  @Override
+  public Job clone() {
+    return Job.builder()
+        .setStores(getStores())
+        .setStoreName(getStoreName())
+        .setSourceConfig(getSourceConfig())
+        .setSourceType(getSourceType())
+        .setFeatureSetJobStatuses(new HashSet<>())
+        .setRunner(getRunner())
+        .setStatus(JobStatus.UNKNOWN)
+        .build();
+  }
+
   @Override
   public int hashCode() {
     return Objects.hash(getSource(), this.stores, this.runner);
diff --git a/core/src/main/java/feast/core/service/JobCoordinatorService.java b/core/src/main/java/feast/core/service/JobCoordinatorService.java
index 8451dc9e51..e134396c3f 100644
--- a/core/src/main/java/feast/core/service/JobCoordinatorService.java
+++ b/core/src/main/java/feast/core/service/JobCoordinatorService.java
@@ -166,10 +166,18 @@ List makeJobUpdateTasks(Iterable>> sourceToStor
         continue;
       }
 
-      if (jobRequiresUpgrade(job, stores)) {
+      if (jobRequiresUpgrade(job, stores) && job.isRunning()) {
+        // To upgrade the job without downtime, spawn a clone of the current job
+        // and terminate the old version on the next Poll.
+        // Both jobs should be in the same consumer group
+        // and should not conflict with each other.
+        job = job.clone();
+        job.setId(groupingStrategy.createJobId(job));
         job.setStores(stores);
 
-        jobTasks.add(UpgradeJobTask.builder().setJob(job).setJobManager(jobManager).build());
+        isSafeToStopJobs = false;
+
+        jobTasks.add(CreateJobTask.builder().setJob(job).setJobManager(jobManager).build());
       } else {
         jobTasks.add(UpdateJobStatusTask.builder().setJob(job).setJobManager(jobManager).build());
       }
diff --git a/core/src/test/java/feast/core/service/JobCoordinatorServiceTest.java b/core/src/test/java/feast/core/service/JobCoordinatorServiceTest.java
index 83d3482a9a..2a4fc24693 100644
--- a/core/src/test/java/feast/core/service/JobCoordinatorServiceTest.java
+++ b/core/src/test/java/feast/core/service/JobCoordinatorServiceTest.java
@@ -18,14 +18,17 @@
 
 import static feast.proto.core.FeatureSetProto.FeatureSetJobDeliveryStatus.STATUS_DELIVERED;
 import static feast.proto.core.FeatureSetProto.FeatureSetJobDeliveryStatus.STATUS_IN_PROGRESS;
+import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.hasItem;
 import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.not;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.beans.HasPropertyWithValue.hasProperty;
 import static org.hamcrest.core.StringContains.containsString;
 import static org.junit.Assert.assertTrue;
-import static org.mockito.ArgumentMatchers.*;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.*;
 import static org.mockito.MockitoAnnotations.initMocks;
 
@@ -723,6 +726,7 @@ public void shouldUpgradeJobWhenNeeded() {
         Job.builder()
             .setStatus(JobStatus.RUNNING)
             .setFeatureSetJobStatuses(new HashSet<>())
+            .setSource(source)
             .setStores(new HashSet<>())
             .setExtId("extId")
             .build();
@@ -736,7 +740,7 @@ public void shouldUpgradeJobWhenNeeded() {
         jcsWithConsolidation.makeJobUpdateTasks(
             ImmutableList.of(Pair.of(source, ImmutableSet.of(store))));
 
-    assertThat("UpgradeTask is expected", tasks.get(0) instanceof UpgradeJobTask);
+    assertThat("CreateTask is expected", tasks.get(0) instanceof CreateJobTask);
   }
 
   @Test
@@ -826,6 +830,61 @@ public void shouldCreateJobPerStore() throws InvalidProtocolBufferException {
                 String.format("kafka-%d-to-test-2", Objects.hashCode(source.getConfig()))))));
   }
 
+  @Test
+  public void shouldCloneRunningJobOnUpgrade() throws InvalidProtocolBufferException {
+    Store store1 =
+        TestUtil.createStore(
+            "test-1", List.of(Subscription.newBuilder().setName("*").setProject("*").build()));
+    Store store2 =
+        TestUtil.createStore(
+            "test-2", List.of(Subscription.newBuilder().setName("*").setProject("*").build()));
+
+    Source source = TestUtil.createKafkaSource("kafka", "topic", false);
+
+    when(specService.listStores(any()))
+        .thenReturn(
+            ListStoresResponse.newBuilder()
+                .addStore(store1.toProto())
+                .addStore(store2.toProto())
+                .build());
+
+    when(featureSetRepository.findAllByNameLikeAndProject_NameLikeOrderByNameAsc("%", "%"))
+        .thenReturn(ImmutableList.of(TestUtil.createEmptyFeatureSet("fs", source)));
+
+    Job existingJob =
+        Job.builder()
+            .setStores(ImmutableSet.of(store1))
+            .setSource(source)
+            .setExtId("extId")
+            .setId("some-id")
+            .setStatus(JobStatus.RUNNING)
+            .build();
+
+    when(jobRepository
+            .findFirstBySourceTypeAndSourceConfigAndStoreNameAndStatusNotInOrderByLastUpdatedDesc(
+                eq(source.getType()),
+                eq(source.getConfig()),
+                any(),
+                eq(JobStatus.getTerminalStates())))
+        .thenReturn(Optional.of(existingJob));
+
+    when(jobManager.getRunnerType()).thenReturn(Runner.DATAFLOW);
+
+    jcsWithConsolidation.Poll();
+
+    ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class);
+
+    // the existing job must not be aborted during the poll that starts its replacement
+    verify(jobManager, never()).abortJob(any());
+
+    verify(jobManager, times(1)).startJob(jobCaptor.capture());
+
+    Job actual = jobCaptor.getValue();
+    assertThat(actual.getId(), not("some-id"));
+    assertThat(actual.getSource(), equalTo(existingJob.getSource()));
+    assertThat(actual.getStores(), containsInAnyOrder(store1, store2));
+  }
+
   private ConsumerRecord newAckMessage(
       String key, int version, String jobName) {
     return new ConsumerRecord<>(