Skip to content

Commit

Permalink
JobCoordinator can be configured to use only subset of FeatureSets or…
Browse files Browse the repository at this point in the history
… Stores (#947)
  • Loading branch information
pyalex committed Aug 14, 2020
1 parent afccfbf commit 52c17a3
Show file tree
Hide file tree
Showing 6 changed files with 162 additions and 12 deletions.
25 changes: 25 additions & 0 deletions core/src/main/java/feast/core/config/FeastProperties.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import feast.common.logging.config.LoggingProperties;
import feast.common.validators.OneOfStrings;
import feast.core.config.FeastProperties.StreamProperties.FeatureStreamOptions;
import feast.proto.core.StoreProto;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
Expand Down Expand Up @@ -114,6 +115,30 @@ public static class CoordinatorProperties {

/* Labels to identify jobs managed by this job coordinator */
private Map<String, String> jobSelector = new HashMap<>();

/* Selectors to define featureSets that are responsibility of current JobManager */
private List<FeatureSetSelector> featureSetSelector = new ArrayList<>();

/* Specify names of stores that must be used by current JobManager */
private List<String> whitelistedStores = new ArrayList<>();

/**
* Similarly to Store's subscription this selector defines set of FeatureSets. All FeatureSets
* that match both project and name will be selected. Project and name may use *
*/
@Getter
@Setter
public static class FeatureSetSelector {
private String project;
private String name;

public StoreProto.Store.Subscription toSubscription() {
return StoreProto.Store.Subscription.newBuilder()
.setName(this.name)
.setProject(this.project)
.build();
}
}
}

/** List of configured job runners. */
Expand Down
22 changes: 19 additions & 3 deletions core/src/main/java/feast/core/service/JobCoordinatorService.java
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ public class JobCoordinatorService {
private final JobProperties jobProperties;
private final JobGroupingStrategy groupingStrategy;
private final KafkaTemplate<String, FeatureSetSpec> specPublisher;
private final List<Store.Subscription> featureSetSubscriptions;
private final List<String> whitelistedStores;

@Autowired
public JobCoordinatorService(
Expand All @@ -81,6 +83,11 @@ public JobCoordinatorService(
this.jobProperties = feastProperties.getJobs();
this.specPublisher = specPublisher;
this.groupingStrategy = groupingStrategy;
this.featureSetSubscriptions =
feastProperties.getJobs().getCoordinator().getFeatureSetSelector().stream()
.map(JobProperties.CoordinatorProperties.FeatureSetSelector::toSubscription)
.collect(Collectors.toList());
this.whitelistedStores = feastProperties.getJobs().getCoordinator().getWhitelistedStores();
}

/**
Expand Down Expand Up @@ -290,7 +297,9 @@ private Collection<Job> getExtraJobs(List<Job> keepJobs) {

private List<Store> getAllStores() {
ListStoresResponse listStoresResponse = specService.listStores(Filter.newBuilder().build());
return listStoresResponse.getStoreList();
return listStoresResponse.getStoreList().stream()
.filter(s -> this.whitelistedStores.contains(s.getName()))
.collect(Collectors.toList());
}

/**
Expand Down Expand Up @@ -319,7 +328,7 @@ Iterable<Pair<Source, Set<Store>>> getSourceToStoreMappings() {
* @param store to get subscribed FeatureSets for
* @return list of FeatureSets that the store subscribes to.
*/
private List<FeatureSet> getFeatureSetsForStore(Store store) {
List<FeatureSet> getFeatureSetsForStore(Store store) {
return store.getSubscriptionsList().stream()
.flatMap(
subscription -> {
Expand All @@ -330,7 +339,14 @@ private List<FeatureSet> getFeatureSetsForStore(Store store) {
.setProject(subscription.getProject())
.setFeatureSetName(subscription.getName())
.build())
.getFeatureSetsList().stream();
.getFeatureSetsList().stream()
.filter(
f ->
this.featureSetSubscriptions.isEmpty()
|| isSubscribedToFeatureSet(
this.featureSetSubscriptions,
f.getSpec().getProject(),
f.getSpec().getName()));
} catch (InvalidProtocolBufferException e) {
throw new RuntimeException(
String.format(
Expand Down
10 changes: 10 additions & 0 deletions core/src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,16 @@ feast:
jobSelector:
application: feast

# Specify feature sets that should be handled by current instance of JobManager
featureSetSelector:
- project: "*"
name: "*"
# Stores names that are enabled on current instance of JobManager
whitelisted-stores:
- online
- online_cluster
- historical

stream:
# Feature stream type. Only kafka is supported.
type: kafka
Expand Down
27 changes: 24 additions & 3 deletions core/src/test/java/feast/core/service/JobCoordinatorIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,11 @@
"feast.jobs.enabled=true",
"feast.jobs.polling_interval_milliseconds=1000",
"feast.stream.specsOptions.notifyIntervalMilliseconds=100",
"feast.jobs.coordinator.consolidate-jobs-per-source=true"
"feast.jobs.coordinator.consolidate-jobs-per-source=true",
"feast.jobs.coordinator.feature-set-selector[0].name=test",
"feast.jobs.coordinator.feature-set-selector[0].project=default",
"feast.jobs.coordinator.whitelisted-stores[0]=test-store",
"feast.jobs.coordinator.whitelisted-stores[1]=new-store",
})
public class JobCoordinatorIT extends BaseIT {
@Autowired private FakeJobManager jobManager;
Expand Down Expand Up @@ -127,7 +131,7 @@ public void shouldCreateJobForNewSource() {
@Test
public void shouldUpgradeJobWhenStoreChanged() {
apiClient.simpleApplyFeatureSet(
DataGenerator.createFeatureSet(DataGenerator.getDefaultSource(), "project", "test"));
DataGenerator.createFeatureSet(DataGenerator.getDefaultSource(), "default", "test"));

await().until(jobManager::getAllJobs, hasSize(1));

Expand All @@ -151,7 +155,7 @@ public void shouldUpgradeJobWhenStoreChanged() {
@Test
public void shouldRestoreJobThatStopped() {
apiClient.simpleApplyFeatureSet(
DataGenerator.createFeatureSet(DataGenerator.getDefaultSource(), "project", "test"));
DataGenerator.createFeatureSet(DataGenerator.getDefaultSource(), "default", "test"));

await().until(jobManager::getAllJobs, hasSize(1));
Job job = jobRepository.findByStatus(JobStatus.RUNNING).get(0);
Expand All @@ -173,11 +177,28 @@ public void shouldRestoreJobThatStopped() {
hasProperty("id", not(ingestionJobs.get(0).getId())))));
}

@Test
@SneakyThrows
public void shouldNotCreateJobForUnwantedFeatureSet() {
apiClient.simpleApplyFeatureSet(
DataGenerator.createFeatureSet(DataGenerator.getDefaultSource(), "default", "other"));

Thread.sleep(2000);

assertThat(jobManager.getAllJobs(), hasSize(0));
}

@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
@Nested
class SpecNotificationFlow extends SequentialFlow {
Job job;

@AfterAll
public void tearDown() {
jobManager.cleanAll();
jobRepository.deleteAll();
}

@Test
@Order(1)
public void shouldSendNewSpec() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package feast.core.service;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.beans.HasPropertyWithValue.hasProperty;
import static org.hamcrest.core.Is.isA;
Expand Down Expand Up @@ -73,7 +74,21 @@ public void setUp() {
feastProperties = new FeastProperties();
JobProperties jobProperties = new JobProperties();
jobProperties.setJobUpdateTimeoutSeconds(5);

JobProperties.CoordinatorProperties.FeatureSetSelector selector =
new JobProperties.CoordinatorProperties.FeatureSetSelector();
selector.setName("fs*");
selector.setProject("*");

JobProperties.CoordinatorProperties coordinatorProperties =
new JobProperties.CoordinatorProperties();
coordinatorProperties.setFeatureSetSelector(ImmutableList.of(selector));
coordinatorProperties.setWhitelistedStores(
ImmutableList.of("test-store", "test", "test-1", "test-2", "normal-store"));

jobProperties.setCoordinator(coordinatorProperties);
feastProperties.setJobs(jobProperties);

TestUtil.setupAuditLogger();

JobManager jobManager = mock(JobManager.class);
Expand Down Expand Up @@ -136,8 +151,8 @@ public void shouldGroupJobsBySource() {
Source source1 = DataGenerator.createSource("servers:9092", "topic");
Source source2 = DataGenerator.createSource("others.servers:9092", "topic");

FeatureSet featureSet1 = DataGenerator.createFeatureSet(source1, "project1", "features1");
FeatureSet featureSet2 = DataGenerator.createFeatureSet(source2, "project1", "features2");
FeatureSet featureSet1 = DataGenerator.createFeatureSet(source1, "project1", "fs1");
FeatureSet featureSet2 = DataGenerator.createFeatureSet(source2, "project1", "fs2");

when(specService.listFeatureSets(
Filter.newBuilder().setFeatureSetName("*").setProject("project1").build()))
Expand Down Expand Up @@ -170,8 +185,8 @@ public void shouldUseStoreSubscriptionToMapStore() {
Source source1 = DataGenerator.createSource("servers:9092", "topic");
Source source2 = DataGenerator.createSource("other.servers:9092", "topic");

FeatureSet featureSet1 = DataGenerator.createFeatureSet(source1, "default", "feature1");
FeatureSet featureSet2 = DataGenerator.createFeatureSet(source2, "default", "feature2");
FeatureSet featureSet1 = DataGenerator.createFeatureSet(source1, "default", "fs1");
FeatureSet featureSet2 = DataGenerator.createFeatureSet(source2, "default", "fs2");

when(specService.listFeatureSets(
Filter.newBuilder().setFeatureSetName("features1").setProject("*").build()))
Expand Down Expand Up @@ -268,7 +283,7 @@ public void shouldCreateJobPerStore() throws InvalidProtocolBufferException {

Source source = DataGenerator.createSource("servers:9092", "topic");

FeatureSet featureSet = DataGenerator.createFeatureSet(source, "default", "features1");
FeatureSet featureSet = DataGenerator.createFeatureSet(source, "default", "fs1");

when(specService.listFeatureSets(
Filter.newBuilder().setFeatureSetName("*").setProject("*").build()))
Expand Down Expand Up @@ -310,7 +325,6 @@ public void shouldCloneRunningJobOnUpgrade() throws InvalidProtocolBufferExcepti
Store store2 =
DataGenerator.createStore(
"test-2", Store.StoreType.REDIS, ImmutableList.of(Triple.of("*", "*", false)));
;

Source source = DataGenerator.createSource("servers:9092", "topic");

Expand All @@ -333,4 +347,62 @@ public void shouldCloneRunningJobOnUpgrade() throws InvalidProtocolBufferExcepti
assertThat(jobTasks, hasSize(1));
assertThat(jobTasks, hasItem(isA(CreateJobTask.class)));
}

@Test
@SneakyThrows
public void shouldSelectOnlyFeatureSetsThatJobManagerSubscribedTo() {
Store store = DataGenerator.getDefaultStore();
Source source = DataGenerator.getDefaultSource();

FeatureSet featureSet1 = DataGenerator.createFeatureSet(source, "default", "fs1");
FeatureSet featureSet2 = DataGenerator.createFeatureSet(source, "project", "fs3");
FeatureSet featureSet3 = DataGenerator.createFeatureSet(source, "default", "not-fs");

when(specService.listFeatureSets(
Filter.newBuilder().setFeatureSetName("*").setProject("*").build()))
.thenReturn(
ListFeatureSetsResponse.newBuilder()
.addAllFeatureSets(Lists.newArrayList(featureSet1, featureSet2, featureSet3))
.build());

List<FeatureSet> featureSetsForStore = jcsWithConsolidation.getFeatureSetsForStore(store);
assertThat(featureSetsForStore, containsInAnyOrder(featureSet1, featureSet2));
}

@Test
@SneakyThrows
public void shouldSelectOnlyStoresThatNotBlacklisted() {
Store store1 =
DataGenerator.createStore(
"normal-store",
Store.StoreType.REDIS,
ImmutableList.of(Triple.of("project1", "*", false)));
Store store2 =
DataGenerator.createStore(
"blacklisted-store",
Store.StoreType.REDIS,
ImmutableList.of(Triple.of("project2", "*", false)));

Source source1 = DataGenerator.createSource("source-1", "topic");
Source source2 = DataGenerator.createSource("source-2", "topic");

FeatureSet featureSet1 = DataGenerator.createFeatureSet(source1, "default", "fs1");
FeatureSet featureSet2 = DataGenerator.createFeatureSet(source2, "project", "fs3");

when(specService.listStores(any()))
.thenReturn(ListStoresResponse.newBuilder().addStore(store1).addStore(store2).build());

when(specService.listFeatureSets(
Filter.newBuilder().setProject("project1").setFeatureSetName("*").build()))
.thenReturn(ListFeatureSetsResponse.newBuilder().addFeatureSets(featureSet1).build());

when(specService.listFeatureSets(
Filter.newBuilder().setProject("project2").setFeatureSetName("*").build()))
.thenReturn(ListFeatureSetsResponse.newBuilder().addFeatureSets(featureSet2).build());

ArrayList<Pair<Source, Set<Store>>> pairs =
Lists.newArrayList(jcsWithConsolidation.getSourceToStoreMappings());

assertThat(pairs, containsInAnyOrder(Pair.of(source1, ImmutableSet.of(store1))));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,12 @@ feast-core:
jobSelector:
application: feast
tag: $IMAGE_TAG
featureSetSelector:
- project: "*"
name: "*"
whitelisted-stores:
- online
- historical
runners:
- name: dataflow
type: DataflowRunner
Expand Down

0 comments on commit 52c17a3

Please sign in to comment.