Skip to content

Commit

Permalink
spawn job on upgrade (#828)
Browse files Browse the repository at this point in the history
  • Loading branch information
Oleksii Moskalenko committed Jun 25, 2020
1 parent ff95d68 commit a894044
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 6 deletions.
12 changes: 12 additions & 0 deletions core/src/main/java/feast/core/model/Job.java
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,18 @@ public IngestionJobProto.IngestionJob toProto() throws InvalidProtocolBufferExce
return ingestJob;
}

/**
 * Creates a copy of this job suitable for spawning an upgraded replacement without downtime.
 *
 * <p>The copy carries over this job's store/source configuration and runner, but is returned in a
 * not-yet-started state: its feature-set delivery statuses are reset to an empty set and its
 * status is {@link JobStatus#UNKNOWN}. The id and extId are intentionally left unset — the caller
 * is expected to assign a fresh id before submitting the clone (e.g. via the grouping strategy).
 *
 * @return a new, not-yet-started {@code Job} with the same configuration as this one
 */
@Override
public Job clone() {
  return Job.builder()
      .setStores(getStores())
      .setStoreName(getStoreName())
      .setSourceConfig(getSourceConfig())
      .setSourceType(getSourceType())
      .setFeatureSetJobStatuses(new HashSet<>())
      .setRunner(getRunner())
      .setStatus(JobStatus.UNKNOWN)
      .build();
}

@Override
public int hashCode() {
return Objects.hash(getSource(), this.stores, this.runner);
Expand Down
12 changes: 10 additions & 2 deletions core/src/main/java/feast/core/service/JobCoordinatorService.java
Original file line number Diff line number Diff line change
Expand Up @@ -166,10 +166,18 @@ List<JobTask> makeJobUpdateTasks(Iterable<Pair<Source, Set<Store>>> sourceToStor
continue;
}

if (jobRequiresUpgrade(job, stores)) {
if (jobRequiresUpgrade(job, stores) && job.isRunning()) {
// To upgrade the job without downtime, we spawn a clone of the current job
// and terminate the old version on the next Poll.
// Both jobs should be in the same consumer group, so they do not conflict with each other.
job = job.clone();
job.setId(groupingStrategy.createJobId(job));
job.setStores(stores);

jobTasks.add(UpgradeJobTask.builder().setJob(job).setJobManager(jobManager).build());
isSafeToStopJobs = false;

jobTasks.add(CreateJobTask.builder().setJob(job).setJobManager(jobManager).build());
} else {
jobTasks.add(UpdateJobStatusTask.builder().setJob(job).setJobManager(jobManager).build());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,14 @@

import static feast.proto.core.FeatureSetProto.FeatureSetJobDeliveryStatus.STATUS_DELIVERED;
import static feast.proto.core.FeatureSetProto.FeatureSetJobDeliveryStatus.STATUS_IN_PROGRESS;
import static org.hamcrest.CoreMatchers.hasItem;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.*;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.beans.HasPropertyWithValue.hasProperty;
import static org.hamcrest.core.StringContains.containsString;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.*;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.*;
import static org.mockito.MockitoAnnotations.initMocks;

Expand Down Expand Up @@ -723,6 +723,7 @@ public void shouldUpgradeJobWhenNeeded() {
Job.builder()
.setStatus(JobStatus.RUNNING)
.setFeatureSetJobStatuses(new HashSet<>())
.setSource(source)
.setStores(new HashSet<>())
.setExtId("extId")
.build();
Expand All @@ -736,7 +737,7 @@ public void shouldUpgradeJobWhenNeeded() {
jcsWithConsolidation.makeJobUpdateTasks(
ImmutableList.of(Pair.of(source, ImmutableSet.of(store))));

assertThat("UpgradeTask is expected", tasks.get(0) instanceof UpgradeJobTask);
assertThat("CreateTask is expected", tasks.get(0) instanceof CreateJobTask);
}

@Test
Expand Down Expand Up @@ -826,6 +827,61 @@ public void shouldCreateJobPerStore() throws InvalidProtocolBufferException {
String.format("kafka-%d-to-test-2", Objects.hashCode(source.getConfig()))))));
}

@Test
public void shouldCloneRunningJobOnUpgrade() throws InvalidProtocolBufferException {
  // Two stores subscribed to everything; the existing job serves only the first,
  // so the coordinator must upgrade it to cover both.
  Store firstStore =
      TestUtil.createStore(
          "test-1", List.of(Subscription.newBuilder().setName("*").setProject("*").build()));
  Store secondStore =
      TestUtil.createStore(
          "test-2", List.of(Subscription.newBuilder().setName("*").setProject("*").build()));

  Source kafkaSource = TestUtil.createKafkaSource("kafka", "topic", false);

  when(specService.listStores(any()))
      .thenReturn(
          ListStoresResponse.newBuilder()
              .addStore(firstStore.toProto())
              .addStore(secondStore.toProto())
              .build());

  when(featureSetRepository.findAllByNameLikeAndProject_NameLikeOrderByNameAsc("%", "%"))
      .thenReturn(ImmutableList.of(TestUtil.createEmptyFeatureSet("fs", kafkaSource)));

  Job runningJob =
      Job.builder()
          .setStores(ImmutableSet.of(firstStore))
          .setSource(kafkaSource)
          .setExtId("extId")
          .setId("some-id")
          .setStatus(JobStatus.RUNNING)
          .build();

  when(jobRepository
          .findFirstBySourceTypeAndSourceConfigAndStoreNameAndStatusNotInOrderByLastUpdatedDesc(
              eq(kafkaSource.getType()),
              eq(kafkaSource.getConfig()),
              any(),
              eq(JobStatus.getTerminalStates())))
      .thenReturn(Optional.of(runningJob));

  when(jobManager.getRunnerType()).thenReturn(Runner.DATAFLOW);

  jcsWithConsolidation.Poll();

  ArgumentCaptor<Job> startedJob = ArgumentCaptor.forClass(Job.class);

  // The old job keeps running until the clone is up — no abort on this poll.
  verify(jobManager, never()).abortJob(any());

  // Exactly one new job is spawned: the clone of the running one.
  verify(jobManager, times(1)).startJob(startedJob.capture());

  Job clone = startedJob.getValue();
  assertThat(clone.getId(), not("some-id"));
  assertThat(clone.getSource(), equalTo(runningJob.getSource()));
  assertThat(clone.getStores(), containsInAnyOrder(firstStore, secondStore));
}

private ConsumerRecord<String, IngestionJobProto.FeatureSetSpecAck> newAckMessage(
String key, int version, String jobName) {
return new ConsumerRecord<>(
Expand Down

0 comments on commit a894044

Please sign in to comment.