Skip to content

Commit

Permalink
Preload spec in serving cache (#152)
Browse files Browse the repository at this point in the history
* Preload spec in serving cache

* Disable async refresh and only periodically refresh cache
  • Loading branch information
pradithya authored and feast-ci-bot committed Mar 10, 2019
1 parent 0a89bd6 commit 1c6b893
Show file tree
Hide file tree
Showing 5 changed files with 160 additions and 66 deletions.
7 changes: 7 additions & 0 deletions serving/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,13 @@
<version>2.23.0</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava-testlib</artifactId>
<version>26.0-jre</version>
<scope>test</scope>
</dependency>
</dependencies>


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
Expand All @@ -39,15 +41,12 @@
import org.springframework.http.converter.protobuf.ProtobufJsonFormatHttpMessageConverter;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;

/**
* Global bean configuration.
*/
/** Global bean configuration. */
@Slf4j
@Configuration
public class ServingApiConfiguration implements WebMvcConfigurer {

@Autowired
private ProtobufJsonFormatHttpMessageConverter protobufConverter;
@Autowired private ProtobufJsonFormatHttpMessageConverter protobufConverter;

@Bean
public AppConfig getAppConfig(
Expand All @@ -66,9 +65,24 @@ public AppConfig getAppConfig(
@Bean
public SpecStorage getCoreServiceSpecStorage(
@Value("${feast.core.host}") String coreServiceHost,
@Value("${feast.core.grpc.port}") String coreServicePort) {
return new CachedSpecStorage(
new CoreService(coreServiceHost, Integer.parseInt(coreServicePort)));
@Value("${feast.core.grpc.port}") String coreServicePort,
@Value("${feast.cacheDurationMinute}") int cacheDurationMinute) {
ScheduledExecutorService scheduledExecutorService =
Executors.newSingleThreadScheduledExecutor();
final CachedSpecStorage cachedSpecStorage =
new CachedSpecStorage(new CoreService(coreServiceHost, Integer.parseInt(coreServicePort)));

// reload all specs including new ones periodically
scheduledExecutorService.schedule(
() -> cachedSpecStorage.populateCache(), cacheDurationMinute, TimeUnit.MINUTES);

// load all specs during start up
try {
cachedSpecStorage.populateCache();
} catch (Exception e) {
log.error("Unable to preload feast's spec");
}
return cachedSpecStorage;
}

@Bean
Expand Down
82 changes: 24 additions & 58 deletions serving/src/main/java/feast/serving/service/CachedSpecStorage.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,19 @@
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import lombok.extern.slf4j.Slf4j;
import feast.serving.exception.SpecRetrievalException;
import feast.specs.EntitySpecProto.EntitySpec;
import feast.specs.FeatureSpecProto.FeatureSpec;
import feast.specs.StorageSpecProto.StorageSpec;

import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;

/** SpecStorage implementation with built-in in-memory cache. */
@Slf4j
public class CachedSpecStorage implements SpecStorage {
private static final int MAX_SPEC_COUNT = 10000;

private final CoreService coreService;
private final LoadingCache<String, EntitySpec> entitySpecCache;
private final CacheLoader<String, EntitySpec> entitySpecLoader;
Expand All @@ -41,70 +41,24 @@ public class CachedSpecStorage implements SpecStorage {
private final LoadingCache<String, StorageSpec> storageSpecCache;
private final CacheLoader<String, StorageSpec> storageSpecLoader;

private static final Duration CACHE_DURATION;
private static final int MAX_SPEC_COUNT = 1000;

static {
CACHE_DURATION = Duration.ofMinutes(30);
}

public CachedSpecStorage(CoreService coreService) {
this.coreService = coreService;
entitySpecLoader =
new CacheLoader<String, EntitySpec>() {
@Override
public EntitySpec load(String key) throws Exception {
return coreService.getEntitySpecs(Collections.singletonList(key)).get(key);
}

@Override
public Map<String, EntitySpec> loadAll(Iterable<? extends String> keys) throws Exception {
return coreService.getEntitySpecs((Iterable<String>) keys);
}
};
entitySpecCache =
CacheBuilder.newBuilder()
.maximumSize(MAX_SPEC_COUNT)
.expireAfterAccess(CACHE_DURATION)
.build(entitySpecLoader);
CacheLoader.from(
(String key) -> coreService.getEntitySpecs(Collections.singletonList(key)).get(key));
entitySpecCache = CacheBuilder.newBuilder().maximumSize(MAX_SPEC_COUNT).build(entitySpecLoader);

featureSpecLoader =
new CacheLoader<String, FeatureSpec>() {
@Override
public FeatureSpec load(String key) throws Exception {
return coreService.getFeatureSpecs(Collections.singletonList(key)).get(key);
}

@Override
public Map<String, FeatureSpec> loadAll(Iterable<? extends String> keys)
throws Exception {
return coreService.getFeatureSpecs((Iterable<String>) keys);
}
};
CacheLoader.from(
(String key) -> coreService.getFeatureSpecs(Collections.singletonList(key)).get(key));
featureSpecCache =
CacheBuilder.newBuilder()
.maximumSize(MAX_SPEC_COUNT)
.expireAfterAccess(CACHE_DURATION)
.build(featureSpecLoader);
CacheBuilder.newBuilder().maximumSize(MAX_SPEC_COUNT).build(featureSpecLoader);

storageSpecLoader =
new CacheLoader<String, StorageSpec>() {
@Override
public Map<String, StorageSpec> loadAll(Iterable<? extends String> keys)
throws Exception {
return coreService.getStorageSpecs((Iterable<String>) keys);
}

@Override
public StorageSpec load(String key) throws Exception {
return coreService.getStorageSpecs(Collections.singleton(key)).get(key);
}
};
CacheLoader.from(
(String key) -> coreService.getStorageSpecs(Collections.singletonList(key)).get(key));
storageSpecCache =
CacheBuilder.newBuilder()
.maximumSize(MAX_SPEC_COUNT)
.expireAfterAccess(CACHE_DURATION)
.build(storageSpecLoader);
CacheBuilder.newBuilder().maximumSize(MAX_SPEC_COUNT).build(storageSpecLoader);
}

@Override
Expand Down Expand Up @@ -177,4 +131,16 @@ public Map<String, StorageSpec> getAllStorageSpecs() {
public boolean isConnected() {
return coreService.isConnected();
}

/** Preload all spec into cache. */
public void populateCache() {
Map<String, FeatureSpec> featureSpecMap = coreService.getAllFeatureSpecs();
featureSpecCache.putAll(featureSpecMap);

Map<String, EntitySpec> entitySpecMap = coreService.getAllEntitySpecs();
entitySpecCache.putAll(entitySpecMap);

Map<String, StorageSpec> storageSpecMap = coreService.getAllStorageSpecs();
storageSpecCache.putAll(storageSpecMap);
}
}
1 change: 1 addition & 0 deletions serving/src/main/resources/application.properties
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ feast.maxentity=${FEAST_MAX_ENTITY_PER_BATCH:2000}
feast.timeout=${FEAST_RETRIEVAL_TIMEOUT:5}
feast.redispool.maxsize=${FEAST_REDIS_POOL_MAX_SIZE:128}
feast.redispool.maxidle=${FEAST_REDIS_POOL_MAX_IDLE:16}
feast.cacheDurationMinute=${FEAST_SPEC_CACHE_DURATION_MINUTE:5}

statsd.host= ${STATSD_HOST:localhost}
statsd.port= ${STATSD_PORT:8125}
Expand Down
106 changes: 106 additions & 0 deletions serving/src/test/java/feast/serving/service/CachedSpecStorageTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package feast.serving.service;

import static org.hamcrest.Matchers.equalTo;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import com.google.common.testing.FakeTicker;
import feast.specs.EntitySpecProto.EntitySpec;
import feast.specs.FeatureSpecProto.FeatureSpec;
import feast.specs.StorageSpecProto.StorageSpec;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.junit.Before;
import org.junit.Test;

public class CachedSpecStorageTest {

private CoreService coreService;
private CachedSpecStorage cachedSpecStorage;

@Before
public void setUp() throws Exception {
coreService = mock(CoreService.class);
cachedSpecStorage = new CachedSpecStorage(coreService);
}

@Test
public void testPopulateCache() {
Map<String, FeatureSpec> featureSpecMap = new HashMap<>();
featureSpecMap.put("feature_1", mock(FeatureSpec.class));

Map<String, StorageSpec> storageSpecMap = new HashMap<>();
storageSpecMap.put("storage_1", mock(StorageSpec.class));

Map<String, EntitySpec> entitySpecMap = new HashMap<>();
entitySpecMap.put("entity_1", mock(EntitySpec.class));

when(coreService.getAllFeatureSpecs()).thenReturn(featureSpecMap);
when(coreService.getAllEntitySpecs()).thenReturn(entitySpecMap);
when(coreService.getAllStorageSpecs()).thenReturn(storageSpecMap);

cachedSpecStorage.populateCache();
Map<String, FeatureSpec> result =
cachedSpecStorage.getFeatureSpecs(Collections.singletonList("feature_1"));
Map<String, StorageSpec> result1 =
cachedSpecStorage.getStorageSpecs(Collections.singletonList("storage_1"));
Map<String, EntitySpec> result2 =
cachedSpecStorage.getEntitySpecs(Collections.singletonList("entity_1"));

assertThat(result.size(), equalTo(1));
assertThat(result1.size(), equalTo(1));
assertThat(result2.size(), equalTo(1));

verify(coreService, times(0)).getFeatureSpecs(any(Iterable.class));
verify(coreService, times(0)).getStorageSpecs(any(Iterable.class));
verify(coreService, times(0)).getEntitySpecs(any(Iterable.class));
}

@Test
public void reloadFailureShouldReturnOldValue() {
Map<String, FeatureSpec> featureSpecMap = new HashMap<>();
featureSpecMap.put("feature_1", mock(FeatureSpec.class));

Map<String, StorageSpec> storageSpecMap = new HashMap<>();
storageSpecMap.put("storage_1", mock(StorageSpec.class));

Map<String, EntitySpec> entitySpecMap = new HashMap<>();
entitySpecMap.put("entity_1", mock(EntitySpec.class));

when(coreService.getAllFeatureSpecs()).thenReturn(featureSpecMap);
when(coreService.getFeatureSpecs(any(Iterable.class))).thenThrow(new RuntimeException("error"));
when(coreService.getAllEntitySpecs()).thenReturn(entitySpecMap);
when(coreService.getEntitySpecs(any(Iterable.class))).thenThrow(new RuntimeException("error"));
when(coreService.getAllStorageSpecs()).thenReturn(storageSpecMap);
when(coreService.getStorageSpecs(any(Iterable.class))).thenThrow(new RuntimeException("error"));

cachedSpecStorage.populateCache();
Map<String, FeatureSpec> result =
cachedSpecStorage.getFeatureSpecs(Collections.singletonList("feature_1"));
Map<String, StorageSpec> result1 =
cachedSpecStorage.getStorageSpecs(Collections.singletonList("storage_1"));
Map<String, EntitySpec> result2 =
cachedSpecStorage.getEntitySpecs(Collections.singletonList("entity_1"));

assertThat(result.size(), equalTo(1));
assertThat(result1.size(), equalTo(1));
assertThat(result2.size(), equalTo(1));
verify(coreService, times(0)).getFeatureSpecs(any(Iterable.class));
verify(coreService, times(0)).getStorageSpecs(any(Iterable.class));
verify(coreService, times(0)).getEntitySpecs(any(Iterable.class));

result = cachedSpecStorage.getFeatureSpecs(Collections.singletonList("feature_1"));
result1 = cachedSpecStorage.getStorageSpecs(Collections.singletonList("storage_1"));
result2 = cachedSpecStorage.getEntitySpecs(Collections.singletonList("entity_1"));
assertThat(result.size(), equalTo(1));
assertThat(result1.size(), equalTo(1));
assertThat(result2.size(), equalTo(1));
}
}

0 comments on commit 1c6b893

Please sign in to comment.