Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Restart Ingestion Job on code version update #949

Merged
merged 7 commits into from
Aug 14, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ public ConsolidatedJobStrategy(JobRepository jobRepository) {
}

@Override
public Job getOrCreateJob(SourceProto.Source source, Set<StoreProto.Store> stores) {
public Job getOrCreateJob(
SourceProto.Source source, Set<StoreProto.Store> stores, Map<String, String> labels) {
return jobRepository
.findFirstBySourceAndStoreNameAndStatusNotInOrderByLastUpdatedDesc(
source, null, JobStatus.getTerminalStates())
Expand All @@ -55,6 +56,7 @@ public Job getOrCreateJob(SourceProto.Source source, Set<StoreProto.Store> store
.setStores(
stores.stream()
.collect(Collectors.toMap(StoreProto.Store::getName, s -> s)))
.setLabels(labels)
.build());
}

Expand Down
4 changes: 3 additions & 1 deletion core/src/main/java/feast/core/job/JobGroupingStrategy.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import feast.core.model.Job;
import feast.proto.core.SourceProto;
import feast.proto.core.StoreProto;
import java.util.Map;
import java.util.Set;
import java.util.stream.Stream;
import org.apache.commons.lang3.tuple.Pair;
Expand All @@ -29,7 +30,8 @@
*/
public interface JobGroupingStrategy {
/** Get the non terminated ingestion job ingesting for given source and stores. */
Job getOrCreateJob(SourceProto.Source source, Set<StoreProto.Store> stores);
Job getOrCreateJob(
SourceProto.Source source, Set<StoreProto.Store> stores, Map<String, String> labels);
/** Create unique JobId that would be used as key in communications with JobRunner */
String createJobId(Job job);
/* Distribute given sources and stores across jobs. One yielded Pair - one created Job **/
Expand Down
5 changes: 4 additions & 1 deletion core/src/main/java/feast/core/job/JobPerStoreStrategy.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import feast.proto.core.StoreProto;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
Expand All @@ -42,7 +43,8 @@ public JobPerStoreStrategy(JobRepository jobRepository) {
}

@Override
public Job getOrCreateJob(SourceProto.Source source, Set<StoreProto.Store> stores) {
public Job getOrCreateJob(
SourceProto.Source source, Set<StoreProto.Store> stores, Map<String, String> labels) {
ArrayList<StoreProto.Store> storesList = Lists.newArrayList(stores);
if (storesList.size() != 1) {
throw new RuntimeException("Only one store is acceptable in JobPerStore Strategy");
Expand All @@ -60,6 +62,7 @@ public Job getOrCreateJob(SourceProto.Source source, Set<StoreProto.Store> store
.setStores(
stores.stream()
.collect(Collectors.toMap(StoreProto.Store::getName, s -> s)))
.setLabels(labels)
.build());
}

Expand Down
32 changes: 22 additions & 10 deletions core/src/main/java/feast/core/job/dataflow/DataflowJobManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -94,12 +94,6 @@ public static DataflowJobManager of(
Map<String, String> jobSelector,
Dataflow dataflow) {

// Retrieve labels to extend them with jobSelector
Map<String, String> jobLabels = new HashMap<>(runnerConfigOptions.getLabelsMap());
// Merge Job Selector Labels into runner options
jobSelector.forEach(jobLabels::put);
runnerConfigOptions = runnerConfigOptions.toBuilder().putAllLabels(jobLabels).build();

defaultOptions = new DataflowRunnerConfig(runnerConfigOptions);
this.dataflow = dataflow;
this.metrics = metricsProperties;
Expand Down Expand Up @@ -130,7 +124,11 @@ public Job startJob(Job job) {
try {
String extId =
submitDataflowJob(
job.getId(), job.getSource(), new HashSet<>(job.getStores().values()), false);
job.getId(),
job.getSource(),
new HashSet<>(job.getStores().values()),
job.getLabels(),
false);
job.setExtId(extId);
return job;

Expand Down Expand Up @@ -315,9 +313,13 @@ public List<Job> listRunningJobs() {
}

private String submitDataflowJob(
String jobName, SourceProto.Source source, Set<StoreProto.Store> sinks, boolean update) {
String jobName,
SourceProto.Source source,
Set<StoreProto.Store> sinks,
Map<String, String> labels,
boolean update) {
try {
ImportOptions pipelineOptions = getPipelineOptions(jobName, source, sinks, update);
ImportOptions pipelineOptions = getPipelineOptions(jobName, source, sinks, labels, update);
DataflowPipelineJob pipelineResult = runPipeline(pipelineOptions);
String jobId = waitForJobToRun(pipelineResult);
return jobId;
Expand All @@ -328,7 +330,11 @@ private String submitDataflowJob(
}

private ImportOptions getPipelineOptions(
String jobName, SourceProto.Source source, Set<StoreProto.Store> sinks, boolean update)
String jobName,
SourceProto.Source source,
Set<StoreProto.Store> sinks,
Map<String, String> labels,
boolean update)
throws IOException, IllegalAccessException {
ImportOptions pipelineOptions =
PipelineOptionsFactory.fromArgs(defaultOptions.toArgs()).as(ImportOptions.class);
Expand All @@ -347,6 +353,12 @@ private ImportOptions getPipelineOptions(
pipelineOptions.setJobName(jobName);
pipelineOptions.setFilesToStage(
detectClassPathResourcesToStage(DataflowRunner.class.getClassLoader()));

// Merge common labels with job's labels
Map<String, String> mergedLabels = new HashMap<>(defaultOptions.getLabels());
labels.forEach(mergedLabels::put);
pipelineOptions.setLabels(mergedLabels);

if (metrics.isEnabled()) {
pipelineOptions.setMetricsExporterType(metrics.getType());
if (metrics.getType().equals("statsd")) {
Expand Down
11 changes: 9 additions & 2 deletions core/src/main/java/feast/core/model/Job.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,14 @@ public JobStatus getStatus() {
public abstract Map<FeatureSetReference, FeatureSetDeliveryStatus>
getFeatureSetDeliveryStatuses();

// Job's labels
public abstract Map<String, String> getLabels();

public static Builder builder() {
return new AutoValue_Job.Builder()
.setFeatureSetDeliveryStatuses(new HashMap<>())
.setStores(new HashMap<>());
.setStores(new HashMap<>())
.setLabels(new HashMap<>());
}

@AutoValue.Builder
Expand All @@ -70,6 +74,8 @@ public interface Builder {
Builder setFeatureSetDeliveryStatuses(
Map<FeatureSetReference, FeatureSetDeliveryStatus> statuses);

Builder setLabels(Map<String, String> labels);

Job build();
}

Expand Down Expand Up @@ -164,12 +170,13 @@ public IngestionJobProto.IngestionJob toProto() {
return ingestJob;
}

public Job cloneWithId(String newJobId) {
/**
 * Creates a copy of this job under a new id with the given labels.
 *
 * <p>Source is carried over as-is; the delivery-status and store maps are defensively copied into
 * fresh {@code HashMap}s so the clone does not share mutable state with this job. Fields not set
 * here (e.g. the external id) are left to the builder's defaults, so the clone starts undeployed.
 *
 * @param newJobId id to assign to the cloned job
 * @param labels labels to attach to the clone (replaces this job's labels entirely)
 * @return a new {@link Job} equivalent to this one except for id and labels
 */
public Job cloneWithIdAndLabels(String newJobId, Map<String, String> labels) {
return Job.builder()
.setSource(this.getSource())
.setFeatureSetDeliveryStatuses(new HashMap<>(this.getFeatureSetDeliveryStatuses()))
.setStores(new HashMap<>(this.getStores()))
.setId(newJobId)
.setLabels(labels)
.build();
}

Expand Down
23 changes: 18 additions & 5 deletions core/src/main/java/feast/core/service/JobCoordinatorService.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
public class JobCoordinatorService {

private final int SPEC_PUBLISHING_TIMEOUT_SECONDS = 5;
public static final String VERSION_LABEL = "feast_version";

private final JobRepository jobRepository;
private final SpecService specService;
Expand All @@ -68,6 +69,8 @@ public class JobCoordinatorService {
private final KafkaTemplate<String, FeatureSetSpec> specPublisher;
private final List<Store.Subscription> featureSetSubscriptions;
private final List<String> whitelistedStores;
private final Map<String, String> jobLabels;
private final String currentVersion;

@Autowired
public JobCoordinatorService(
Expand All @@ -88,6 +91,9 @@ public JobCoordinatorService(
.map(JobProperties.CoordinatorProperties.FeatureSetSelector::toSubscription)
.collect(Collectors.toList());
this.whitelistedStores = feastProperties.getJobs().getCoordinator().getWhitelistedStores();
this.currentVersion = feastProperties.getVersion();
this.jobLabels = new HashMap<>(feastProperties.getJobs().getCoordinator().getJobSelector());
this.jobLabels.put(VERSION_LABEL, this.currentVersion);
}

/**
Expand Down Expand Up @@ -161,7 +167,7 @@ List<JobTask> makeJobUpdateTasks(Iterable<Pair<Source, Set<Store>>> sourceToStor
Source source = mapping.getKey();
Set<Store> stores = mapping.getValue();

Job job = groupingStrategy.getOrCreateJob(source, stores);
Job job = groupingStrategy.getOrCreateJob(source, stores, this.jobLabels);

if (job.isDeployed()) {
if (!job.isRunning()) {
Expand All @@ -177,7 +183,7 @@ List<JobTask> makeJobUpdateTasks(Iterable<Pair<Source, Set<Store>>> sourceToStor
// it would make sense to spawn clone of current job
// and terminate old version on the next Poll.
// Both jobs should be in the same consumer group and not conflict with each other
job = job.cloneWithId(groupingStrategy.createJobId(job));
job = job.cloneWithIdAndLabels(groupingStrategy.createJobId(job), this.jobLabels);
job.addAllStores(stores);

isSafeToStopJobs = false;
Expand Down Expand Up @@ -214,8 +220,9 @@ List<JobTask> makeJobUpdateTasks(Iterable<Pair<Source, Set<Store>>> sourceToStor
/**
* Decides whether we need to upgrade (restart) given job. Since we send updated FeatureSets to
* IngestionJob via Kafka, and there's only one source per job (if it changes - a new job would be
* created) the only things that can cause upgrade here are stores: new stores can be added, or
* existing stores will change subscriptions.
* created) the main trigger that can cause an upgrade here is the set of stores: new stores can be
* added, or existing stores may change their subscriptions. Another trigger is the release of a new
* version: the current version is compared with the job's version stored in its labels.
*
* @param job {@link Job} to check
* @param stores Set of {@link Store} new version of stores (vs current version job.getStores())
Expand All @@ -227,6 +234,10 @@ private boolean jobRequiresUpgrade(Job job, Set<Store> stores) {
return true;
}

if (!this.currentVersion.equals(job.getLabels().get(VERSION_LABEL))) {
return true;
}

return false;
}

Expand Down Expand Up @@ -257,7 +268,9 @@ FeatureSet allocateFeatureSetToJobs(FeatureSet featureSet) {

// Add featureSet to allocated job if not allocated before
for (Pair<Source, Set<Store>> jobArgs : groupingStrategy.collectSingleJobInput(jobArgsStream)) {
Job job = groupingStrategy.getOrCreateJob(jobArgs.getLeft(), jobArgs.getRight());
Job job =
groupingStrategy.getOrCreateJob(
jobArgs.getLeft(), jobArgs.getRight(), Collections.emptyMap());
if (!job.isRunning()) {
continue;
}
Expand Down
30 changes: 30 additions & 0 deletions core/src/test/java/feast/core/service/JobCoordinatorIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static org.hamcrest.beans.HasPropertyWithValue.hasProperty;
import static org.hamcrest.collection.IsCollectionWithSize.hasSize;
import static org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInAnyOrder;
import static org.hamcrest.collection.IsMapContaining.hasEntry;
import static org.hamcrest.collection.IsMapWithSize.aMapWithSize;
import static org.hamcrest.core.AllOf.allOf;

Expand Down Expand Up @@ -60,6 +61,7 @@
"feast.jobs.coordinator.feature-set-selector[0].project=default",
"feast.jobs.coordinator.whitelisted-stores[0]=test-store",
"feast.jobs.coordinator.whitelisted-stores[1]=new-store",
"feast.version=1.0.0"
})
public class JobCoordinatorIT extends BaseIT {
@Autowired private FakeJobManager jobManager;
Expand Down Expand Up @@ -188,6 +190,33 @@ public void shouldNotCreateJobForUnwantedFeatureSet() {
assertThat(jobManager.getAllJobs(), hasSize(0));
}

// Verifies the version-upgrade path: a running job labeled with an older feast version ("0.9.9")
// must be aborted by the coordinator and replaced by a clone labeled with the current version
// ("1.0.0", configured via the test properties above).
@Test
@SneakyThrows
public void shouldRestartJobWithOldVersion() {
apiClient.simpleApplyFeatureSet(
DataGenerator.createFeatureSet(DataGenerator.getDefaultSource(), "default", "test"));

// Seed a job that is already running but carries a stale version label.
Job job =
Job.builder()
.setSource(DataGenerator.getDefaultSource())
.setStores(
ImmutableMap.of(
DataGenerator.getDefaultStore().getName(), DataGenerator.getDefaultStore()))
.setId("some-running-id")
.setLabels(ImmutableMap.of(JobCoordinatorService.VERSION_LABEL, "0.9.9"))
.build();

jobManager.startJob(job);
jobRepository.add(job);

// The coordinator's poll should detect the version mismatch and abort the stale job.
await().until(() -> jobManager.getJobStatus(job), equalTo(JobStatus.ABORTED));

// The replacement keeps the same source and stores but is stamped with the current version.
Job replacement = jobRepository.findByStatus(JobStatus.RUNNING).get(0);
assertThat(replacement.getSource(), equalTo(job.getSource()));
assertThat(replacement.getStores(), equalTo(job.getStores()));
assertThat(replacement.getLabels(), hasEntry(JobCoordinatorService.VERSION_LABEL, "1.0.0"));
}

@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
@Nested
class SpecNotificationFlow extends SequentialFlow {
Expand All @@ -212,6 +241,7 @@ public void shouldSendNewSpec() {
ImmutableMap.of(
DataGenerator.getDefaultStore().getName(), DataGenerator.getDefaultStore()))
.setId("some-running-id")
.setLabels(ImmutableMap.of(JobCoordinatorService.VERSION_LABEL, "1.0.0"))
.build();

jobManager.startJob(job);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,11 @@ public void setUp() {
coordinatorProperties.setFeatureSetSelector(ImmutableList.of(selector));
coordinatorProperties.setWhitelistedStores(
ImmutableList.of("test-store", "test", "test-1", "test-2", "normal-store"));
coordinatorProperties.setJobSelector(ImmutableMap.of("application", "feast"));

jobProperties.setCoordinator(coordinatorProperties);
feastProperties.setJobs(jobProperties);
feastProperties.setVersion("1.0.0");

TestUtil.setupAuditLogger();

Expand Down