From 315cdd207466f36e3aa26a5f98c5cd9fd88fab74 Mon Sep 17 00:00:00 2001 From: Pradithya Aria Date: Wed, 6 Mar 2019 15:07:49 +0800 Subject: [PATCH 1/2] Preload spec in serving cache --- serving/pom.xml | 7 ++ .../config/ServingApiConfiguration.java | 42 ++++++- .../serving/service/CachedSpecStorage.java | 88 ++++++++++---- .../src/main/resources/application.properties | 1 + .../service/CachedSpecStorageTest.java | 115 ++++++++++++++++++ 5 files changed, 228 insertions(+), 25 deletions(-) create mode 100644 serving/src/test/java/feast/serving/service/CachedSpecStorageTest.java diff --git a/serving/pom.xml b/serving/pom.xml index 34cfcd777e..c6db97ec91 100644 --- a/serving/pom.xml +++ b/serving/pom.xml @@ -232,6 +232,13 @@ 2.23.0 test + + + com.google.guava + guava-testlib + 26.0-jre + test + diff --git a/serving/src/main/java/feast/serving/config/ServingApiConfiguration.java b/serving/src/main/java/feast/serving/config/ServingApiConfiguration.java index c8e5225a3c..cf251abe4e 100644 --- a/serving/src/main/java/feast/serving/config/ServingApiConfiguration.java +++ b/serving/src/main/java/feast/serving/config/ServingApiConfiguration.java @@ -17,8 +17,10 @@ package feast.serving.config; +import com.google.common.base.Ticker; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import feast.serving.service.CachedSpecStorage; import feast.serving.service.CoreService; import feast.serving.service.FeatureStorageRegistry; @@ -26,10 +28,14 @@ import feast.specs.StorageSpecProto.StorageSpec; import io.opentracing.Tracer; import io.opentracing.contrib.concurrent.TracedExecutorService; +import java.time.Duration; import java.util.List; import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; @@ -66,9 +72,39 @@ public AppConfig getAppConfig( @Bean public SpecStorage getCoreServiceSpecStorage( @Value("${feast.core.host}") String coreServiceHost, - @Value("${feast.core.grpc.port}") String coreServicePort) { - return new CachedSpecStorage( - new CoreService(coreServiceHost, Integer.parseInt(coreServicePort))); + @Value("${feast.core.grpc.port}") String coreServicePort, + @Value("${feast.cacheDurationMinute}") int cacheDurationMinute) { + Duration cacheDuration = Duration.ofMinutes(cacheDurationMinute); + + ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("cache-refresh-thread") + .setDaemon(true) + .build(); + ExecutorService parentExecutor = Executors.newSingleThreadExecutor(threadFactory); + final ListeningExecutorService executorService = MoreExecutors.listeningDecorator(parentExecutor); + + ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); + final CachedSpecStorage cachedSpecStorage = + new CachedSpecStorage( + new CoreService(coreServiceHost, Integer.parseInt(coreServicePort)), + executorService, + cacheDuration, + Ticker.systemTicker()); + + // reload all specs including new ones periodically + scheduledExecutorService.schedule(new Runnable() { + @Override + public void run() { + cachedSpecStorage.populateCache(); + } + }, cacheDurationMinute, TimeUnit.MINUTES); + + 
    // load all specs during start up
+    try {
+      cachedSpecStorage.populateCache();
+    } catch (Exception e) {
+      log.error("Unable to preload feast's spec");
+    }
+    return cachedSpecStorage;
   }
 
   @Bean
diff --git a/serving/src/main/java/feast/serving/service/CachedSpecStorage.java b/serving/src/main/java/feast/serving/service/CachedSpecStorage.java
index dcbb74f6c8..88404eccf9 100644
--- a/serving/src/main/java/feast/serving/service/CachedSpecStorage.java
+++ b/serving/src/main/java/feast/serving/service/CachedSpecStorage.java
@@ -17,22 +17,26 @@ package feast.serving.service;
 
+import com.google.common.base.Ticker;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
-import lombok.extern.slf4j.Slf4j;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
 import feast.serving.exception.SpecRetrievalException;
 import feast.specs.EntitySpecProto.EntitySpec;
 import feast.specs.FeatureSpecProto.FeatureSpec;
 import feast.specs.StorageSpecProto.StorageSpec;
-
 import java.time.Duration;
 import java.util.Collections;
 import java.util.Map;
+import lombok.extern.slf4j.Slf4j;
 
 /** SpecStorage implementation with built-in in-memory cache. */
 @Slf4j
 public class CachedSpecStorage implements SpecStorage {
+  private static final int MAX_SPEC_COUNT = 1000;
+
   private final CoreService coreService;
   private final LoadingCache<String, EntitySpec> entitySpecCache;
   private final CacheLoader<String, EntitySpec> entitySpecLoader;
@@ -41,14 +45,11 @@ public class CachedSpecStorage implements SpecStorage {
   private final LoadingCache<String, StorageSpec> storageSpecCache;
   private final CacheLoader<String, StorageSpec> storageSpecLoader;
 
-  private static final Duration CACHE_DURATION;
-  private static final int MAX_SPEC_COUNT = 1000;
-
-  static {
-    CACHE_DURATION = Duration.ofMinutes(30);
-  }
-
-  public CachedSpecStorage(CoreService coreService) {
+  public CachedSpecStorage(
+      CoreService coreService,
+      ListeningExecutorService executorService,
+      Duration cacheDuration,
+      Ticker ticker) {
     this.coreService = coreService;
     entitySpecLoader =
         new CacheLoader<String, EntitySpec>() {
@@ -58,14 +59,25 @@ public EntitySpec load(String key) throws Exception {
           }
 
           @Override
-          public Map<String, EntitySpec> loadAll(Iterable<String> keys) throws Exception {
-            return coreService.getEntitySpecs((Iterable<String>) keys);
+          public ListenableFuture<EntitySpec> reload(String key, EntitySpec oldValue)
+              throws Exception {
+            return executorService.submit(
+                () -> {
+                  EntitySpec result = oldValue;
+                  try {
+                    result = coreService.getEntitySpecs(Collections.singleton(key)).get(key);
+                  } catch (Exception e) {
+                    log.error("Error reloading entity spec");
+                  }
+                  return result;
+                });
           }
         };
     entitySpecCache =
         CacheBuilder.newBuilder()
             .maximumSize(MAX_SPEC_COUNT)
-            .expireAfterAccess(CACHE_DURATION)
+            .refreshAfterWrite(cacheDuration)
+            .ticker(ticker)
             .build(entitySpecLoader);
 
     featureSpecLoader =
@@ -76,34 +88,54 @@ public FeatureSpec load(String key) throws Exception {
           }
 
           @Override
-          public Map<String, FeatureSpec> loadAll(Iterable<String> keys)
+          public ListenableFuture<FeatureSpec> reload(String key, FeatureSpec oldValue)
              throws Exception {
-            return coreService.getFeatureSpecs((Iterable<String>) keys);
+            return executorService.submit(
+                () -> {
+                  FeatureSpec result = oldValue;
+                  try {
+                    result = coreService.getFeatureSpecs(Collections.singleton(key)).get(key);
+                  } catch (Exception e) {
+                    log.error("Error reloading feature spec");
+                  }
+                  return result;
+                });
           }
         };
 
     featureSpecCache =
        CacheBuilder.newBuilder()
            .maximumSize(MAX_SPEC_COUNT)
-            .expireAfterAccess(CACHE_DURATION)
+            .refreshAfterWrite(cacheDuration)
+            .ticker(ticker)
            .build(featureSpecLoader);
 
     storageSpecLoader =
        new CacheLoader<String, StorageSpec>() {
          @Override
-          public Map<String, StorageSpec> loadAll(Iterable<String> keys)
-              throws Exception {
-            return coreService.getStorageSpecs((Iterable<String>) keys);
+          public StorageSpec load(String key) throws Exception {
+            return coreService.getStorageSpecs(Collections.singleton(key)).get(key);
           }
 
          @Override
-          public StorageSpec load(String key) throws Exception {
-            return coreService.getStorageSpecs(Collections.singleton(key)).get(key);
+          public ListenableFuture<StorageSpec> reload(String key, StorageSpec oldValue)
+              throws Exception {
+            return executorService.submit(
+                () -> {
+                  StorageSpec result = oldValue;
+                  try {
+                    result = coreService.getStorageSpecs(Collections.singleton(key)).get(key);
+                  } catch (Exception e) {
+                    log.error("Error reloading storage spec");
+                  }
+                  return result;
+                });
          }
        };
     storageSpecCache =
        CacheBuilder.newBuilder()
            .maximumSize(MAX_SPEC_COUNT)
-            .expireAfterAccess(CACHE_DURATION)
+            .refreshAfterWrite(cacheDuration)
+            .ticker(ticker)
            .build(storageSpecLoader);
   }
@@ -177,4 +209,16 @@ public Map<String, StorageSpec> getAllStorageSpecs() {
   public boolean isConnected() {
     return coreService.isConnected();
   }
+
+  /** Preload all spec into cache. */
+  public void populateCache() {
+    Map<String, FeatureSpec> featureSpecMap = coreService.getAllFeatureSpecs();
+    featureSpecCache.putAll(featureSpecMap);
+
+    Map<String, EntitySpec> entitySpecMap = coreService.getAllEntitySpecs();
+    entitySpecCache.putAll(entitySpecMap);
+
+    Map<String, StorageSpec> storageSpecMap = coreService.getAllStorageSpecs();
+    storageSpecCache.putAll(storageSpecMap);
+  }
 }
diff --git a/serving/src/main/resources/application.properties b/serving/src/main/resources/application.properties
index ff14ded742..0327f18769 100644
--- a/serving/src/main/resources/application.properties
+++ b/serving/src/main/resources/application.properties
@@ -24,6 +24,7 @@ feast.maxentity=${FEAST_MAX_ENTITY_PER_BATCH:2000}
 feast.timeout=${FEAST_RETRIEVAL_TIMEOUT:5}
 feast.redispool.maxsize=${FEAST_REDIS_POOL_MAX_SIZE:128}
 feast.redispool.maxidle=${FEAST_REDIS_POOL_MAX_IDLE:16}
+feast.cacheDurationMinute=${FEAST_SPEC_CACHE_DURATION_MINUTE:5}
 
 statsd.host= ${STATSD_HOST:localhost}
 statsd.port= ${STATSD_PORT:8125}
diff --git a/serving/src/test/java/feast/serving/service/CachedSpecStorageTest.java b/serving/src/test/java/feast/serving/service/CachedSpecStorageTest.java
new file mode 100644
index 0000000000..c9d19042ca
--- /dev/null
+++ b/serving/src/test/java/feast/serving/service/CachedSpecStorageTest.java
@@ -0,0 +1,115 @@
+package feast.serving.service;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.*;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.google.common.testing.FakeTicker;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import feast.specs.EntitySpecProto.EntitySpec;
+import feast.specs.FeatureSpecProto.FeatureSpec;
+import feast.specs.StorageSpecProto.StorageSpec;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import org.junit.Before;
+import org.junit.Test;
+
+public class CachedSpecStorageTest {
+
+  private FakeTicker fakeTicker;
+  private CoreService coreService;
+  private CachedSpecStorage cachedSpecStorage;
+
+  @Before
+  public void setUp() throws Exception {
+    fakeTicker = new FakeTicker();
+    coreService = mock(CoreService.class);
+    ListeningExecutorService executorService = MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor());
+    cachedSpecStorage = new CachedSpecStorage(coreService, executorService,
+        Duration.ofSeconds(5), fakeTicker);
+  }
+
+  @Test
+  public void shouldNotBeNull() {
+    assertNotNull(cachedSpecStorage);
+  }
+
+  @Test
+  public void testPopulateCache() {
+    Map<String, FeatureSpec> featureSpecMap = new HashMap<>();
+    featureSpecMap.put("feature_1", mock(FeatureSpec.class));
+
+    Map<String, StorageSpec> storageSpecMap = new HashMap<>();
+    storageSpecMap.put("storage_1", mock(StorageSpec.class));
+
+    Map<String, EntitySpec> entitySpecMap = new HashMap<>();
+    entitySpecMap.put("entity_1", mock(EntitySpec.class));
+
+    when(coreService.getAllFeatureSpecs()).thenReturn(featureSpecMap);
+    when(coreService.getAllEntitySpecs()).thenReturn(entitySpecMap);
+    when(coreService.getAllStorageSpecs()).thenReturn(storageSpecMap);
+
+    cachedSpecStorage.populateCache();
+    Map<String, FeatureSpec> result = cachedSpecStorage.getFeatureSpecs(Collections.singletonList("feature_1"));
+    Map<String, StorageSpec> result1 = cachedSpecStorage.getStorageSpecs(Collections.singletonList("storage_1"));
+    Map<String, EntitySpec> result2 = cachedSpecStorage.getEntitySpecs(Collections.singletonList("entity_1"));
+
+    assertThat(result.size(), equalTo(1));
+    assertThat(result1.size(), equalTo(1));
+    assertThat(result2.size(), equalTo(1));
+
+    verify(coreService, times(0)).getFeatureSpecs(any(Iterable.class));
+    verify(coreService, times(0)).getStorageSpecs(any(Iterable.class));
+    verify(coreService, times(0)).getEntitySpecs(any(Iterable.class));
+  }
+
+  @Test
+  public void reloadFailureShouldReturnOldValue() {
+    Map<String, FeatureSpec> featureSpecMap = new HashMap<>();
+    featureSpecMap.put("feature_1", mock(FeatureSpec.class));
+
+    Map<String, StorageSpec> storageSpecMap = new HashMap<>();
+    storageSpecMap.put("storage_1", mock(StorageSpec.class));
+
+    Map<String, EntitySpec> entitySpecMap = new HashMap<>();
+    entitySpecMap.put("entity_1", mock(EntitySpec.class));
+
+    when(coreService.getAllFeatureSpecs()).thenReturn(featureSpecMap);
+    when(coreService.getFeatureSpecs(any(Iterable.class))).thenThrow(new RuntimeException("error"));
+    when(coreService.getAllEntitySpecs()).thenReturn(entitySpecMap);
+    when(coreService.getEntitySpecs(any(Iterable.class))).thenThrow(new RuntimeException("error"));
+    when(coreService.getAllStorageSpecs()).thenReturn(storageSpecMap);
+    when(coreService.getStorageSpecs(any(Iterable.class))).thenThrow(new RuntimeException("error"));
+
+
+    cachedSpecStorage.populateCache();
+    Map<String, FeatureSpec> result = cachedSpecStorage.getFeatureSpecs(Collections.singletonList("feature_1"));
+    Map<String, StorageSpec> result1 = cachedSpecStorage.getStorageSpecs(Collections.singletonList("storage_1"));
+    Map<String, EntitySpec> result2 = cachedSpecStorage.getEntitySpecs(Collections.singletonList("entity_1"));
+
+    assertThat(result.size(), equalTo(1));
+    assertThat(result1.size(), equalTo(1));
+    assertThat(result2.size(), equalTo(1));
+    verify(coreService, times(0)).getFeatureSpecs(any(Iterable.class));
+    verify(coreService, times(0)).getStorageSpecs(any(Iterable.class));
+    verify(coreService, times(0)).getEntitySpecs(any(Iterable.class));
+
+    fakeTicker.advance(6, TimeUnit.SECONDS);
+
+    result = cachedSpecStorage.getFeatureSpecs(Collections.singletonList("feature_1"));
+    result1 = cachedSpecStorage.getStorageSpecs(Collections.singletonList("storage_1"));
+    result2 = cachedSpecStorage.getEntitySpecs(Collections.singletonList("entity_1"));
+    assertThat(result.size(), equalTo(1));
+    assertThat(result1.size(), equalTo(1));
assertThat(result2.size(), equalTo(1)); + } +} \ No newline at end of file From 06226fe60bef5fcb3ced1462c696edd8ef3df4f2 Mon Sep 17 00:00:00 2001 From: Pradithya Aria Date: Thu, 7 Mar 2019 18:56:31 +0800 Subject: [PATCH 2/2] Disable async refresh and only periodically refresh cache --- .../config/ServingApiConfiguration.java | 36 ++----- .../serving/service/CachedSpecStorage.java | 100 ++---------------- .../service/CachedSpecStorageTest.java | 39 +++---- 3 files changed, 33 insertions(+), 142 deletions(-) diff --git a/serving/src/main/java/feast/serving/config/ServingApiConfiguration.java b/serving/src/main/java/feast/serving/config/ServingApiConfiguration.java index cf251abe4e..dcee2f73c6 100644 --- a/serving/src/main/java/feast/serving/config/ServingApiConfiguration.java +++ b/serving/src/main/java/feast/serving/config/ServingApiConfiguration.java @@ -17,10 +17,8 @@ package feast.serving.config; -import com.google.common.base.Ticker; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; -import com.google.common.util.concurrent.ThreadFactoryBuilder; import feast.serving.service.CachedSpecStorage; import feast.serving.service.CoreService; import feast.serving.service.FeatureStorageRegistry; @@ -28,13 +26,11 @@ import feast.specs.StorageSpecProto.StorageSpec; import io.opentracing.Tracer; import io.opentracing.contrib.concurrent.TracedExecutorService; -import java.time.Duration; import java.util.List; import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; @@ -45,15 +41,12 @@ import org.springframework.http.converter.protobuf.ProtobufJsonFormatHttpMessageConverter; import org.springframework.web.servlet.config.annotation.WebMvcConfigurer; -/** - * Global bean configuration. - */ +/** Global bean configuration. 
*/ @Slf4j @Configuration public class ServingApiConfiguration implements WebMvcConfigurer { - @Autowired - private ProtobufJsonFormatHttpMessageConverter protobufConverter; + @Autowired private ProtobufJsonFormatHttpMessageConverter protobufConverter; @Bean public AppConfig getAppConfig( @@ -74,29 +67,14 @@ public SpecStorage getCoreServiceSpecStorage( @Value("${feast.core.host}") String coreServiceHost, @Value("${feast.core.grpc.port}") String coreServicePort, @Value("${feast.cacheDurationMinute}") int cacheDurationMinute) { - Duration cacheDuration = Duration.ofMinutes(cacheDurationMinute); - - ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("cache-refresh-thread") - .setDaemon(true) - .build(); - ExecutorService parentExecutor = Executors.newSingleThreadExecutor(threadFactory); - final ListeningExecutorService executorService = MoreExecutors.listeningDecorator(parentExecutor); - - ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); + ScheduledExecutorService scheduledExecutorService = + Executors.newSingleThreadScheduledExecutor(); final CachedSpecStorage cachedSpecStorage = - new CachedSpecStorage( - new CoreService(coreServiceHost, Integer.parseInt(coreServicePort)), - executorService, - cacheDuration, - Ticker.systemTicker()); + new CachedSpecStorage(new CoreService(coreServiceHost, Integer.parseInt(coreServicePort))); // reload all specs including new ones periodically - scheduledExecutorService.schedule(new Runnable() { - @Override - public void run() { - cachedSpecStorage.populateCache(); - } - }, cacheDurationMinute, TimeUnit.MINUTES); + scheduledExecutorService.schedule( + () -> cachedSpecStorage.populateCache(), cacheDurationMinute, TimeUnit.MINUTES); // load all specs during start up try { diff --git a/serving/src/main/java/feast/serving/service/CachedSpecStorage.java b/serving/src/main/java/feast/serving/service/CachedSpecStorage.java index 88404eccf9..bddcdbc23f 100644 --- a/serving/src/main/java/feast/serving/service/CachedSpecStorage.java +++ b/serving/src/main/java/feast/serving/service/CachedSpecStorage.java @@ -17,17 +17,13 @@ package feast.serving.service; -import com.google.common.base.Ticker; import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; -import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.ListeningExecutorService; import feast.serving.exception.SpecRetrievalException; import feast.specs.EntitySpecProto.EntitySpec; import feast.specs.FeatureSpecProto.FeatureSpec; import feast.specs.StorageSpecProto.StorageSpec; -import java.time.Duration; import java.util.Collections; import java.util.Map; import lombok.extern.slf4j.Slf4j; @@ -35,7 +31,7 @@ /** SpecStorage implementation with built-in in-memory cache. 
*/ @Slf4j public class CachedSpecStorage implements SpecStorage { - private static final int MAX_SPEC_COUNT = 1000; + private static final int MAX_SPEC_COUNT = 10000; private final CoreService coreService; private final LoadingCache entitySpecCache; @@ -45,98 +41,24 @@ public class CachedSpecStorage implements SpecStorage { private final LoadingCache storageSpecCache; private final CacheLoader storageSpecLoader; - public CachedSpecStorage( - CoreService coreService, - ListeningExecutorService executorService, - Duration cacheDuration, - Ticker ticker) { + public CachedSpecStorage(CoreService coreService) { this.coreService = coreService; entitySpecLoader = - new CacheLoader() { - @Override - public EntitySpec load(String key) throws Exception { - return coreService.getEntitySpecs(Collections.singletonList(key)).get(key); - } - - @Override - public ListenableFuture reload(String key, EntitySpec oldValue) - throws Exception { - return executorService.submit( - () -> { - EntitySpec result = oldValue; - try { - result = coreService.getEntitySpecs(Collections.singleton(key)).get(key); - } catch (Exception e) { - log.error("Error reloading entity spec"); - } - return result; - }); - } - }; - entitySpecCache = - CacheBuilder.newBuilder() - .maximumSize(MAX_SPEC_COUNT) - .refreshAfterWrite(cacheDuration) - .ticker(ticker) - .build(entitySpecLoader); + CacheLoader.from( + (String key) -> coreService.getEntitySpecs(Collections.singletonList(key)).get(key)); + entitySpecCache = CacheBuilder.newBuilder().maximumSize(MAX_SPEC_COUNT).build(entitySpecLoader); featureSpecLoader = - new CacheLoader() { - @Override - public FeatureSpec load(String key) throws Exception { - return coreService.getFeatureSpecs(Collections.singletonList(key)).get(key); - } - - @Override - public ListenableFuture reload(String key, FeatureSpec oldValue) - throws Exception { - return executorService.submit( - () -> { - FeatureSpec result = oldValue; - try { - result = coreService.getFeatureSpecs(Collections.singleton(key)).get(key); - } catch (Exception e) { - log.error("Error reloading feature spec"); - } - return result; - }); - } - }; + CacheLoader.from( + (String key) -> coreService.getFeatureSpecs(Collections.singletonList(key)).get(key)); featureSpecCache = - CacheBuilder.newBuilder() - .maximumSize(MAX_SPEC_COUNT) - .refreshAfterWrite(cacheDuration) - .ticker(ticker) - .build(featureSpecLoader); + CacheBuilder.newBuilder().maximumSize(MAX_SPEC_COUNT).build(featureSpecLoader); storageSpecLoader = - new CacheLoader() { - @Override - public StorageSpec load(String key) throws Exception { - return coreService.getStorageSpecs(Collections.singleton(key)).get(key); - } - - @Override - public ListenableFuture reload(String key, StorageSpec oldValue) - throws Exception { - return executorService.submit( - () -> { - StorageSpec result = oldValue; - try { - result = coreService.getStorageSpecs(Collections.singleton(key)).get(key); - } catch (Exception e) { - log.error("Error reloading storage spec"); - } - return result; - }); - } - }; + CacheLoader.from( + (String key) -> coreService.getStorageSpecs(Collections.singletonList(key)).get(key)); storageSpecCache = - CacheBuilder.newBuilder() - .maximumSize(MAX_SPEC_COUNT) - .refreshAfterWrite(cacheDuration) - .ticker(ticker) - .build(storageSpecLoader); + CacheBuilder.newBuilder().maximumSize(MAX_SPEC_COUNT).build(storageSpecLoader); } @Override diff --git a/serving/src/test/java/feast/serving/service/CachedSpecStorageTest.java 
b/serving/src/test/java/feast/serving/service/CachedSpecStorageTest.java
index c9d19042ca..76a7b78061 100644
--- a/serving/src/test/java/feast/serving/service/CachedSpecStorageTest.java
+++ b/serving/src/test/java/feast/serving/service/CachedSpecStorageTest.java
@@ -1,7 +1,8 @@
 package feast.serving.service;
 
 import static org.hamcrest.Matchers.equalTo;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
@@ -9,38 +10,25 @@
 import static org.mockito.Mockito.when;
 
 import com.google.common.testing.FakeTicker;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
 import feast.specs.EntitySpecProto.EntitySpec;
 import feast.specs.FeatureSpecProto.FeatureSpec;
 import feast.specs.StorageSpecProto.StorageSpec;
-import java.time.Duration;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import org.junit.Before;
 import org.junit.Test;
 
 public class CachedSpecStorageTest {
 
-  private FakeTicker fakeTicker;
   private CoreService coreService;
   private CachedSpecStorage cachedSpecStorage;
 
   @Before
   public void setUp() throws Exception {
-    fakeTicker = new FakeTicker();
     coreService = mock(CoreService.class);
-    ListeningExecutorService executorService = MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor());
-    cachedSpecStorage = new CachedSpecStorage(coreService, executorService,
-        Duration.ofSeconds(5), fakeTicker);
-  }
-
-  @Test
-  public void shouldNotBeNull() {
-    assertNotNull(cachedSpecStorage);
+    cachedSpecStorage = new CachedSpecStorage(coreService);
   }
 
   @Test
@@ -59,9 +47,12 @@ public void testPopulateCache() {
     when(coreService.getAllStorageSpecs()).thenReturn(storageSpecMap);
 
     cachedSpecStorage.populateCache();
-    Map<String, FeatureSpec> result = cachedSpecStorage.getFeatureSpecs(Collections.singletonList("feature_1"));
-    Map<String, StorageSpec> result1 = cachedSpecStorage.getStorageSpecs(Collections.singletonList("storage_1"));
-    Map<String, EntitySpec> result2 = cachedSpecStorage.getEntitySpecs(Collections.singletonList("entity_1"));
+    Map<String, FeatureSpec> result =
+        cachedSpecStorage.getFeatureSpecs(Collections.singletonList("feature_1"));
+    Map<String, StorageSpec> result1 =
+        cachedSpecStorage.getStorageSpecs(Collections.singletonList("storage_1"));
+    Map<String, EntitySpec> result2 =
+        cachedSpecStorage.getEntitySpecs(Collections.singletonList("entity_1"));
 
     assertThat(result.size(), equalTo(1));
     assertThat(result1.size(), equalTo(1));
@@ -90,11 +81,13 @@ public void reloadFailureShouldReturnOldValue() {
     when(coreService.getAllStorageSpecs()).thenReturn(storageSpecMap);
     when(coreService.getStorageSpecs(any(Iterable.class))).thenThrow(new RuntimeException("error"));
 
-
     cachedSpecStorage.populateCache();
-    Map<String, FeatureSpec> result = cachedSpecStorage.getFeatureSpecs(Collections.singletonList("feature_1"));
-    Map<String, StorageSpec> result1 = cachedSpecStorage.getStorageSpecs(Collections.singletonList("storage_1"));
-    Map<String, EntitySpec> result2 = cachedSpecStorage.getEntitySpecs(Collections.singletonList("entity_1"));
+    Map<String, FeatureSpec> result =
+        cachedSpecStorage.getFeatureSpecs(Collections.singletonList("feature_1"));
+    Map<String, StorageSpec> result1 =
+        cachedSpecStorage.getStorageSpecs(Collections.singletonList("storage_1"));
+    Map<String, EntitySpec> result2 =
+        cachedSpecStorage.getEntitySpecs(Collections.singletonList("entity_1"));
 
     assertThat(result.size(), equalTo(1));
     assertThat(result1.size(), equalTo(1));
@@ -103,8 +96,6 @@ public void reloadFailureShouldReturnOldValue() {
     verify(coreService, times(0)).getStorageSpecs(any(Iterable.class));
     verify(coreService, times(0)).getEntitySpecs(any(Iterable.class));
 
-    fakeTicker.advance(6, TimeUnit.SECONDS);
-
     result = cachedSpecStorage.getFeatureSpecs(Collections.singletonList("feature_1"));
     result1 = cachedSpecStorage.getStorageSpecs(Collections.singletonList("storage_1"));
     result2 = cachedSpecStorage.getEntitySpecs(Collections.singletonList("entity_1"));