From bcff88e13b9fae50fc5cdb7b22b88325acb281a5 Mon Sep 17 00:00:00 2001 From: Rob Block Date: Thu, 25 Jul 2019 15:07:26 -0700 Subject: [PATCH 1/3] Add cache of recently seen blobs in CloudBlobStore to reduce unnecessary Azure requests. --- .../com.github.ambry/config/CloudConfig.java | 14 +++- .../CloudBackupManager.java | 10 +++ .../CloudBlobStore.java | 80 ++++++++++++++++--- .../com.github.ambry.cloud/VcrMetrics.java | 2 +- .../CloudBlobStoreTest.java | 32 ++++++-- .../MockMessageWriteSet.java | 9 +++ 6 files changed, 131 insertions(+), 16 deletions(-) diff --git a/ambry-api/src/main/java/com.github.ambry/config/CloudConfig.java b/ambry-api/src/main/java/com.github.ambry/config/CloudConfig.java index d8640e383e..7f35c7dad7 100644 --- a/ambry-api/src/main/java/com.github.ambry/config/CloudConfig.java +++ b/ambry-api/src/main/java/com.github.ambry/config/CloudConfig.java @@ -31,6 +31,7 @@ public class CloudConfig { public static final String CLOUD_DELETED_BLOB_RETENTION_DAYS = "cloud.deleted.blob.retention.days"; public static final String CLOUD_BLOB_COMPACTION_INTERVAL_HOURS = "cloud.blob.compaction.interval.hours"; public static final String CLOUD_BLOB_COMPACTION_QUERY_LIMIT = "cloud.blob.compaction.query.limit"; + public static final String CLOUD_RECENT_BLOB_CACHE_LIMIT = "cloud.recent.blob.cache.limit"; public static final String VCR_ASSIGNED_PARTITIONS = "vcr.assigned.partitions"; public static final String VCR_PROXY_HOST = "vcr.proxy.host"; public static final String VCR_PROXY_PORT = "vcr.proxy.port"; @@ -49,6 +50,7 @@ public class CloudConfig { public static final int DEFAULT_MIN_TTL_DAYS = 14; public static final int DEFAULT_RETENTION_DAYS = 7; public static final int DEFAULT_COMPACTION_QUERY_LIMIT = 100000; + public static final int DEFAULT_RECENT_BLOB_CACHE_LIMIT = 10000; public static final int DEFAULT_VCR_PROXY_PORT = 3128; /** @@ -141,6 +143,7 @@ public class CloudConfig { @Config(CLOUD_BLOB_COMPACTION_QUERY_LIMIT) @Default("100000") public final int cloudBlobCompactionQueryLimit; + /** * The dead blob compaction interval in hours */ @@ -149,7 +152,15 @@ public class CloudConfig { public final int cloudBlobCompactionIntervalHours; /** - * The comma-separated list of statically assigned partitions. Optional. + * The max size of recently-accessed blob cache in each cloud blob store. + */ + @Config(CLOUD_RECENT_BLOB_CACHE_LIMIT) + @Default("10000") + public final int recentBlobCacheLimit; + + /** + * The comma-separated list of statically assigned partitions. + * Used by static VCR cluster only. */ @Config(VCR_ASSIGNED_PARTITIONS) @Default("null") @@ -193,6 +204,7 @@ public CloudConfig(VerifiableProperties verifiableProperties) { cloudBlobCompactionIntervalHours = verifiableProperties.getInt(CLOUD_BLOB_COMPACTION_INTERVAL_HOURS, 24); cloudBlobCompactionQueryLimit = verifiableProperties.getInt(CLOUD_BLOB_COMPACTION_QUERY_LIMIT, DEFAULT_COMPACTION_QUERY_LIMIT); + recentBlobCacheLimit = verifiableProperties.getInt(CLOUD_RECENT_BLOB_CACHE_LIMIT, DEFAULT_RECENT_BLOB_CACHE_LIMIT); // Proxy settings vcrProxyHost = verifiableProperties.getString(VCR_PROXY_HOST, null); diff --git a/ambry-cloud/src/main/java/com.github.ambry.cloud/CloudBackupManager.java b/ambry-cloud/src/main/java/com.github.ambry.cloud/CloudBackupManager.java index 99d67332cb..bdb30d2cee 100644 --- a/ambry-cloud/src/main/java/com.github.ambry.cloud/CloudBackupManager.java +++ b/ambry-cloud/src/main/java/com.github.ambry.cloud/CloudBackupManager.java @@ -38,7 +38,9 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Random; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -56,6 +58,7 @@ public class CloudBackupManager extends ReplicationEngine { private final VcrMetrics vcrMetrics; private final VirtualReplicatorCluster virtualReplicatorCluster; private final CloudStorageCompactor cloudStorageCompactor; + private final Map partitionStoreMap = new HashMap<>(); public CloudBackupManager(VerifiableProperties properties, CloudConfig cloudConfig, ReplicationConfig replicationConfig, ClusterMapConfig clusterMapConfig, StoreConfig storeConfig, @@ -116,6 +119,12 @@ public void onPartitionRemoved(PartitionId partitionId) { vcrMetrics.removePartitionErrorCount.inc(); } } + Store cloudStore = partitionStoreMap.get(partitionId.toPathString()); + if (cloudStore != null) { + cloudStore.shutdown(); + } else { + logger.warn("Store not found for partition {}", partitionId); + } logger.info("Partition {} removed from {}", partitionId, dataNodeId); // We don't close cloudBlobStore because because replicate in ReplicaThread is using a copy of // remoteReplicaInfo which needs CloudBlobStore. @@ -190,6 +199,7 @@ void addPartition(PartitionId partitionId) throws ReplicationException { retList.add(partitionInfo); return retList; }); + partitionStoreMap.put(partitionId.toPathString(), cloudStore); } else { try { cloudStore.shutdown(); diff --git a/ambry-cloud/src/main/java/com.github.ambry.cloud/CloudBlobStore.java b/ambry-cloud/src/main/java/com.github.ambry.cloud/CloudBlobStore.java index 035e240139..5c1d1ebb16 100644 --- a/ambry-cloud/src/main/java/com.github.ambry.cloud/CloudBlobStore.java +++ b/ambry-cloud/src/main/java/com.github.ambry.cloud/CloudBlobStore.java @@ -42,7 +42,9 @@ import java.util.Collections; import java.util.EnumSet; import java.util.HashSet; +import java.util.LinkedHashMap; import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.Set; import java.util.concurrent.TimeUnit; @@ -65,6 +67,14 @@ class CloudBlobStore implements Store { private final CloudBlobCryptoAgentFactory cryptoAgentFactory; private final CloudBlobCryptoAgent cryptoAgent; private final VcrMetrics vcrMetrics; + + /** The lifecycle state of a recently seen blob. */ + enum BlobState { + CREATED, TTL_UPDATED, DELETED + } + + // Map blobId to state (created, ttlUpdated, deleted) + private final Map recentBlobCache; private final long minTtlMillis; private final boolean requireEncryption; private boolean started; @@ -88,6 +98,7 @@ class CloudBlobStore implements Store { this.vcrMetrics = Objects.requireNonNull(vcrMetrics, "vcrMetrics is required"); minTtlMillis = TimeUnit.DAYS.toMillis(cloudConfig.vcrMinTtlDays); requireEncryption = cloudConfig.vcrRequireEncryption; + recentBlobCache = Collections.synchronizedMap(new RecentBlobCache(1000, cloudConfig.recentBlobCacheLimit)); String cryptoAgentFactoryClass = cloudConfig.cloudBlobCryptoAgentFactoryClass; try { @@ -166,6 +177,7 @@ private void putBlob(MessageInfo messageInfo, ByteBuffer messageBuf, long size) // If buffer was encrypted, we no longer know its size long bufferlen = bufferChanged ? -1 : size; cloudDestination.uploadBlob(blobId, bufferlen, blobMetadata, new ByteBufferInputStream(messageBuf)); + addToCache(blobId.getID(), BlobState.CREATED); } else { logger.trace("Blob is skipped: {}", messageInfo); vcrMetrics.blobUploadSkippedCount.inc(); @@ -191,6 +203,9 @@ private boolean shouldUpload(MessageInfo messageInfo) { if (messageInfo.isDeleted()) { return false; } + if (recentBlobCache.containsKey(messageInfo.getStoreKey().getID())) { + return false; + } // expiration time above threshold. Expired blobs are blocked by ReplicaThread. return (messageInfo.getExpirationTimeInMs() == Utils.Infinite_Time || messageInfo.getExpirationTimeInMs() - messageInfo.getOperationTimeMs() >= minTtlMillis); @@ -204,7 +219,12 @@ public void delete(MessageWriteSet messageSetToDelete) throws StoreException { try { for (MessageInfo msgInfo : messageSetToDelete.getMessageSetInfo()) { BlobId blobId = (BlobId) msgInfo.getStoreKey(); - cloudDestination.deleteBlob(blobId, msgInfo.getOperationTimeMs()); + String blobKey = msgInfo.getStoreKey().getID(); + BlobState blobState = recentBlobCache.get(blobKey); + if (blobState != BlobState.DELETED) { + cloudDestination.deleteBlob(blobId, msgInfo.getOperationTimeMs()); + addToCache(blobKey, BlobState.DELETED); + } } } catch (CloudStorageException ex) { throw new StoreException(ex, StoreErrorCodes.IOError); @@ -228,7 +248,11 @@ public void updateTtl(MessageWriteSet messageSetToUpdate) throws StoreException // need to be modified. if (msgInfo.isTtlUpdated()) { BlobId blobId = (BlobId) msgInfo.getStoreKey(); - cloudDestination.updateBlobExpiration(blobId, Utils.Infinite_Time); + BlobState blobState = recentBlobCache.get(blobId.getID()); + if (blobState == null || blobState == BlobState.CREATED) { + cloudDestination.updateBlobExpiration(blobId, Utils.Infinite_Time); + addToCache(blobId.getID(), BlobState.TTL_UPDATED); + } } else { logger.error("updateTtl() is called but msgInfo.isTtlUpdated is not set. msgInfo: {}", msgInfo); vcrMetrics.updateTtlNotSetError.inc(); @@ -239,6 +263,16 @@ public void updateTtl(MessageWriteSet messageSetToUpdate) throws StoreException } } + /** + * Add a blob state mapping to the recent blob cache. + * @param blobKey the blob key to cache. + * @param blobState the state of the blob. + */ + // Visible for test + void addToCache(String blobKey, BlobState blobState) { + recentBlobCache.put(blobKey, blobState); + } + @Override public FindInfo findEntriesSince(FindToken token, long maxTotalSizeOfEntries) throws StoreException { CloudFindToken inputToken = (CloudFindToken) token; @@ -274,10 +308,21 @@ public FindInfo findEntriesSince(FindToken token, long maxTotalSizeOfEntries) th public Set findMissingKeys(List keys) throws StoreException { checkStarted(); // Check existence of keys in cloud metadata - List blobIdList = keys.stream().map(key -> (BlobId) key).collect(Collectors.toList()); + List blobIdQueryList = keys.stream() + .filter(key -> !recentBlobCache.containsKey(key.getID())) + .map(key -> (BlobId) key) + .collect(Collectors.toList()); + if (blobIdQueryList.isEmpty()) { + // Cool, the cache did its job and eliminated a Cosmos query! + return Collections.emptySet(); + } try { - Set foundSet = cloudDestination.getBlobMetadata(blobIdList).keySet(); - return keys.stream().filter(key -> !foundSet.contains(key.getID())).collect(Collectors.toSet()); + Set foundSet = cloudDestination.getBlobMetadata(blobIdQueryList).keySet(); + // return input keys - cached keys - keys returned by query + return keys.stream() + .filter(key -> !foundSet.contains(key.getID())) + .filter(key -> !recentBlobCache.containsKey(key.getID())) + .collect(Collectors.toSet()); } catch (CloudStorageException ex) { throw new StoreException(ex, StoreErrorCodes.IOError); } @@ -291,10 +336,8 @@ public StoreStats getStoreStats() { @Override public boolean isKeyDeleted(StoreKey key) throws StoreException { checkStarted(); - // Return false for now, because we don't track recently deleted keys - // This way, the replica thread will replay the delete resulting in a no-op - // TODO: Consider LRU cache of recently deleted keys - return false; + // Not definitive, but okay for some deletes to be replayed. + return (BlobState.DELETED == recentBlobCache.get(key.getID())); } @Override @@ -310,6 +353,7 @@ public boolean isEmpty() { @Override public void shutdown() { + recentBlobCache.clear(); started = false; } @@ -341,6 +385,7 @@ public String toString() { return "PartitionId: " + partitionId.toPathString() + " in the cloud"; } + /** A {@link Write} implementation used by this store to write data. */ private class CloudWriteChannel implements Write { private final CloudBlobStore cloudBlobStore; private final List messageInfoList; @@ -383,4 +428,21 @@ public void appendFrom(ReadableByteChannel channel, long size) throws StoreExcep } } } + + /** + * A local LRU cache of recent blobs processed by this store. + */ + private class RecentBlobCache extends LinkedHashMap { + private final int maxEntries; + + public RecentBlobCache(int initialCapacity, int maxEntries) { + super(initialCapacity); + this.maxEntries = maxEntries; + } + + @Override + protected boolean removeEldestEntry(Map.Entry eldest) { + return (this.size() > maxEntries); + } + } } diff --git a/ambry-cloud/src/main/java/com.github.ambry.cloud/VcrMetrics.java b/ambry-cloud/src/main/java/com.github.ambry.cloud/VcrMetrics.java index 8f37dcde8d..be4491613e 100644 --- a/ambry-cloud/src/main/java/com.github.ambry.cloud/VcrMetrics.java +++ b/ambry-cloud/src/main/java/com.github.ambry.cloud/VcrMetrics.java @@ -48,7 +48,7 @@ public VcrMetrics(MetricRegistry registry) { blobDecryptionTime = registry.timer(MetricRegistry.name(CloudBlobStore.class, "BlobDecryptionTime")); blobUploadSkippedCount = registry.counter(MetricRegistry.name(CloudBlobStore.class, "BlobUploadSkippedCount")); updateTtlNotSetError = registry.counter(MetricRegistry.name(CloudBlobStore.class, "UpdateTtlNotSetError")); - blobCompactionTime = registry.timer(MetricRegistry.name(CloudStorageCompactor.class, "BlobCompactionTime")); + blobCompactionTime = registry.timer(MetricRegistry.name(CloudBlobStore.class, "BlobCompactionTime")); addPartitionErrorCount = registry.counter(MetricRegistry.name(CloudBackupManager.class, "AddPartitionErrorCount")); removePartitionErrorCount = registry.counter(MetricRegistry.name(CloudBackupManager.class, "RemovePartitionErrorCount")); diff --git a/ambry-cloud/src/test/java/com.github.ambry.cloud/CloudBlobStoreTest.java b/ambry-cloud/src/test/java/com.github.ambry.cloud/CloudBlobStoreTest.java index b4a912c250..a971a0461d 100644 --- a/ambry-cloud/src/test/java/com.github.ambry.cloud/CloudBlobStoreTest.java +++ b/ambry-cloud/src/test/java/com.github.ambry.cloud/CloudBlobStoreTest.java @@ -43,7 +43,6 @@ import com.github.ambry.store.MessageInfo; import com.github.ambry.store.MockMessageWriteSet; import com.github.ambry.store.MockStoreKeyConverterFactory; -import com.github.ambry.store.Store; import com.github.ambry.store.StoreErrorCodes; import com.github.ambry.store.StoreException; import com.github.ambry.store.StoreKey; @@ -82,7 +81,7 @@ */ public class CloudBlobStoreTest { - private Store store; + private CloudBlobStore store; private CloudDestination dest; private PartitionId partitionId; private ClusterMap clusterMap; @@ -168,6 +167,13 @@ private void testStorePuts(boolean requireEncryption) throws Exception { assertEquals("Unexpected blobs count", expectedUploads, inMemoryDest.getBlobsUploaded()); assertEquals("Unexpected byte count", expectedBytesUploaded, inMemoryDest.getBytesUploaded()); assertEquals("Unexpected encryption count", expectedEncryptions, vcrMetrics.blobEncryptionCount.getCount()); + + // Try to put the same blobs again (e.g. from another replica), should already be cached. + messageWriteSet.resetBuffers(); + store.put(messageWriteSet); + assertEquals("Unexpected blobs count", expectedUploads, inMemoryDest.getBlobsUploaded()); + assertEquals("Unexpected byte count", expectedBytesUploaded, inMemoryDest.getBytesUploaded()); + assertEquals("Unexpected skipped count", expectedUploads, vcrMetrics.blobUploadSkippedCount.getCount()); } /** Test the CloudBlobStore delete method. */ @@ -182,6 +188,10 @@ public void testStoreDeletes() throws Exception { } store.delete(messageWriteSet); verify(dest, times(count)).deleteBlob(any(BlobId.class), eq(operationTime)); + + // Call second time, should all be cached causing deletions to be skipped. + store.delete(messageWriteSet); + verify(dest, times(count)).deleteBlob(any(BlobId.class), eq(operationTime)); } /** Test the CloudBlobStore updateTtl method. */ @@ -197,6 +207,10 @@ public void testStoreTtlUpdates() throws Exception { } store.updateTtl(messageWriteSet); verify(dest, times(count)).updateBlobExpiration(any(BlobId.class), anyLong()); + + // Call second time, should all be cached causing updates to be skipped. + store.updateTtl(messageWriteSet); + verify(dest, times(count)).updateBlobExpiration(any(BlobId.class), anyLong()); } /** Test the CloudBlobStore findMissingKeys method. */ @@ -221,6 +235,15 @@ public void testFindMissingKeys() throws Exception { Set missingKeys = store.findMissingKeys(keys); verify(dest).getBlobMetadata(anyList()); assertEquals("Wrong number of missing keys", count, missingKeys.size()); + + // Add keys to cache and rerun (should be cached) + for (StoreKey storeKey : keys) { + store.addToCache(storeKey.getID(), CloudBlobStore.BlobState.CREATED); + } + missingKeys = store.findMissingKeys(keys); + assertTrue("Expected no missing keys", missingKeys.isEmpty()); + // getBlobMetadata should not have been called a second time. + verify(dest).getBlobMetadata(anyList()); } /** Test the CloudBlobStore findEntriesSince method. */ @@ -264,9 +287,8 @@ private List generateMetadataList(long startTime, long blobSi List metadataList = new ArrayList<>(); for (int j = 0; j < count; j++) { BlobId blobId = getUniqueId(); - CloudBlobMetadata metadata = - new CloudBlobMetadata(blobId, startTime, Utils.Infinite_Time, blobSize, CloudBlobMetadata.EncryptionOrigin.NONE, - null, null); + CloudBlobMetadata metadata = new CloudBlobMetadata(blobId, startTime, Utils.Infinite_Time, blobSize, + CloudBlobMetadata.EncryptionOrigin.NONE, null, null); metadata.setUploadTime(startTime + j); metadataList.add(metadata); } diff --git a/ambry-store/src/test/java/com.github.ambry.store/MockMessageWriteSet.java b/ambry-store/src/test/java/com.github.ambry.store/MockMessageWriteSet.java index da4e47e76a..ff9c844a2b 100644 --- a/ambry-store/src/test/java/com.github.ambry.store/MockMessageWriteSet.java +++ b/ambry-store/src/test/java/com.github.ambry.store/MockMessageWriteSet.java @@ -69,6 +69,15 @@ public void add(MessageInfo info, ByteBuffer buffer) { buffers.add(buffer); } + /** + * Reset the buffers so they can be written again. + */ + public void resetBuffers() { + for (ByteBuffer buffer : buffers) { + buffer.flip(); + } + } + @Override public long writeTo(Write writeChannel) throws StoreException { if (exception != null) { From a41a8079fc64d766cdefc4a73a8c5e8fff10ac19 Mon Sep 17 00:00:00 2001 From: Rob Block Date: Mon, 5 Aug 2019 16:47:20 -0700 Subject: [PATCH 2/3] Address David's review comments. --- .../CloudBlobStore.java | 15 +-- .../CloudBlobStoreTest.java | 98 +++++++++++++++---- 2 files changed, 88 insertions(+), 25 deletions(-) diff --git a/ambry-cloud/src/main/java/com.github.ambry.cloud/CloudBlobStore.java b/ambry-cloud/src/main/java/com.github.ambry.cloud/CloudBlobStore.java index 5c1d1ebb16..451c67ba5b 100644 --- a/ambry-cloud/src/main/java/com.github.ambry.cloud/CloudBlobStore.java +++ b/ambry-cloud/src/main/java/com.github.ambry.cloud/CloudBlobStore.java @@ -61,6 +61,8 @@ class CloudBlobStore implements Store { private static final Logger logger = LoggerFactory.getLogger(CloudBlobStore.class); + private static final int cacheInitialCapacity = 1000; + private static final float cacheLoadFactor = 0.75f; private final PartitionId partitionId; private final CloudDestination cloudDestination; private final ClusterMap clusterMap; @@ -69,9 +71,7 @@ class CloudBlobStore implements Store { private final VcrMetrics vcrMetrics; /** The lifecycle state of a recently seen blob. */ - enum BlobState { - CREATED, TTL_UPDATED, DELETED - } + enum BlobState {CREATED, TTL_UPDATED, DELETED} // Map blobId to state (created, ttlUpdated, deleted) private final Map recentBlobCache; @@ -98,7 +98,7 @@ enum BlobState { this.vcrMetrics = Objects.requireNonNull(vcrMetrics, "vcrMetrics is required"); minTtlMillis = TimeUnit.DAYS.toMillis(cloudConfig.vcrMinTtlDays); requireEncryption = cloudConfig.vcrRequireEncryption; - recentBlobCache = Collections.synchronizedMap(new RecentBlobCache(1000, cloudConfig.recentBlobCacheLimit)); + recentBlobCache = Collections.synchronizedMap(new RecentBlobCache(cloudConfig.recentBlobCacheLimit)); String cryptoAgentFactoryClass = cloudConfig.cloudBlobCryptoAgentFactoryClass; try { @@ -430,13 +430,14 @@ public void appendFrom(ReadableByteChannel channel, long size) throws StoreExcep } /** - * A local LRU cache of recent blobs processed by this store. + * A local LRA cache of recent blobs processed by this store. */ private class RecentBlobCache extends LinkedHashMap { private final int maxEntries; - public RecentBlobCache(int initialCapacity, int maxEntries) { - super(initialCapacity); + public RecentBlobCache(int maxEntries) { + // Use access order for eviction + super(cacheInitialCapacity, cacheLoadFactor, true); this.maxEntries = maxEntries; } diff --git a/ambry-cloud/src/test/java/com.github.ambry.cloud/CloudBlobStoreTest.java b/ambry-cloud/src/test/java/com.github.ambry.cloud/CloudBlobStoreTest.java index a971a0461d..bd2489bbcb 100644 --- a/ambry-cloud/src/test/java/com.github.ambry.cloud/CloudBlobStoreTest.java +++ b/ambry-cloud/src/test/java/com.github.ambry.cloud/CloudBlobStoreTest.java @@ -90,6 +90,7 @@ public class CloudBlobStoreTest { private short refAccountId = 50; private short refContainerId = 100; private long operationTime = System.currentTimeMillis(); + private final int defaultCacheLimit = 1000; @Before public void setup() throws Exception { @@ -99,11 +100,13 @@ public void setup() throws Exception { /** * Setup the cloud blobstore. - * @param requireEncryption value of requireEncryption flag in CloudConfig. * @param inMemoryDestination whether to use in-memory cloud destination instead of mock + * @param requireEncryption value of requireEncryption flag in CloudConfig. + * @param cacheLimit size of the store's recent blob cache. * @param start whether to start the store. */ - private void setupCloudStore(boolean inMemoryDestination, boolean requireEncryption, boolean start) throws Exception { + private void setupCloudStore(boolean inMemoryDestination, boolean requireEncryption, int cacheLimit, boolean start) + throws Exception { Properties properties = new Properties(); // Required clustermap properties setBasicProperties(properties); @@ -111,6 +114,7 @@ private void setupCloudStore(boolean inMemoryDestination, boolean requireEncrypt properties.setProperty(CloudConfig.VCR_REQUIRE_ENCRYPTION, Boolean.toString(requireEncryption)); properties.setProperty(CloudConfig.CLOUD_BLOB_CRYPTO_AGENT_FACTORY_CLASS, TestCloudBlobCryptoAgentFactory.class.getName()); + properties.setProperty(CloudConfig.CLOUD_RECENT_BLOB_CACHE_LIMIT, String.valueOf(cacheLimit)); VerifiableProperties verifiableProperties = new VerifiableProperties(properties); dest = inMemoryDestination ? new LatchBasedInMemoryCloudDestination(Collections.emptyList()) : mock(CloudDestination.class); @@ -141,7 +145,7 @@ public void testStorePuts() throws Exception { } private void testStorePuts(boolean requireEncryption) throws Exception { - setupCloudStore(true, requireEncryption, true); + setupCloudStore(true, requireEncryption, defaultCacheLimit, true); // Put blobs with and without expiration and encryption MockMessageWriteSet messageWriteSet = new MockMessageWriteSet(); int count = 5; @@ -179,7 +183,7 @@ private void testStorePuts(boolean requireEncryption) throws Exception { /** Test the CloudBlobStore delete method. */ @Test public void testStoreDeletes() throws Exception { - setupCloudStore(false, true, true); + setupCloudStore(false, true, defaultCacheLimit, true); MockMessageWriteSet messageWriteSet = new MockMessageWriteSet(); int count = 10; for (int j = 0; j < count; j++) { @@ -197,7 +201,7 @@ public void testStoreDeletes() throws Exception { /** Test the CloudBlobStore updateTtl method. */ @Test public void testStoreTtlUpdates() throws Exception { - setupCloudStore(false, true, true); + setupCloudStore(false, true, defaultCacheLimit, true); MockMessageWriteSet messageWriteSet = new MockMessageWriteSet(); int count = 10; for (int j = 0; j < count; j++) { @@ -216,7 +220,7 @@ public void testStoreTtlUpdates() throws Exception { /** Test the CloudBlobStore findMissingKeys method. */ @Test public void testFindMissingKeys() throws Exception { - setupCloudStore(false, true, true); + setupCloudStore(false, true, defaultCacheLimit, true); int count = 10; List keys = new ArrayList<>(); Map metadataMap = new HashMap<>(); @@ -249,7 +253,7 @@ public void testFindMissingKeys() throws Exception { /** Test the CloudBlobStore findEntriesSince method. */ @Test public void testFindEntriesSince() throws Exception { - setupCloudStore(false, true, true); + setupCloudStore(false, true, defaultCacheLimit, true); long maxTotalSize = 1000000; // 1) start with empty token, call find, return some data long startTime = System.currentTimeMillis(); @@ -283,23 +287,48 @@ public void testFindEntriesSince() throws Exception { assertEquals(outputToken, finalToken); } - private List generateMetadataList(long startTime, long blobSize, int count) { - List metadataList = new ArrayList<>(); - for (int j = 0; j < count; j++) { - BlobId blobId = getUniqueId(); - CloudBlobMetadata metadata = new CloudBlobMetadata(blobId, startTime, Utils.Infinite_Time, blobSize, - CloudBlobMetadata.EncryptionOrigin.NONE, null, null); - metadata.setUploadTime(startTime + j); - metadataList.add(metadata); + /** Test CloudBlobStore cache eviction. */ + @Test + public void testCacheEvictionOrder() throws Exception { + // setup store with small cache size + int cacheSize = 10; + long blobSize = 10; + setupCloudStore(false, false, cacheSize, true); + // put blobs to fill up cache + List blobIdList = new ArrayList<>(); + for (int j = 0; j < cacheSize; j++) { + blobIdList.add(getUniqueId()); + store.addToCache(blobIdList.get(j).getID(), CloudBlobStore.BlobState.CREATED); } - return metadataList; + + // findMissingKeys should stay in cache + store.findMissingKeys(blobIdList); + verify(dest, never()).getBlobMetadata(anyList()); + // Perform access on first 5 blobs + MockMessageWriteSet messageWriteSet = new MockMessageWriteSet(); + for (int j = 0; j < 5; j++) { + addBlobToSet(messageWriteSet, (BlobId) blobIdList.get(j), blobSize, Utils.Infinite_Time); + } + store.updateTtl(messageWriteSet); + + // put 5 more blobs + for (int j = 10; j < 15; j++) { + blobIdList.add(getUniqueId()); + store.addToCache(blobIdList.get(j).getID(), CloudBlobStore.BlobState.CREATED); + } + // get same 1-5 which should be still cached. + store.findMissingKeys(blobIdList.subList(0, 5)); + verify(dest, never()).getBlobMetadata(anyList()); + // call findMissingKeys on 6-10 which should trigger getBlobMetadata + store.findMissingKeys(blobIdList.subList(5, 10)); + verify(dest).getBlobMetadata(anyList()); } /** Test verifying behavior when store not started. */ @Test public void testStoreNotStarted() throws Exception { // Create store and don't start it. - setupCloudStore(false, true, false); + setupCloudStore(false, true, defaultCacheLimit, false); List keys = Collections.singletonList(getUniqueId()); MockMessageWriteSet messageWriteSet = new MockMessageWriteSet(); addBlobToSet(messageWriteSet, 10, Utils.Infinite_Time, refAccountId, refContainerId, true); @@ -511,6 +540,25 @@ public void testPutWithTtl() throws Exception { } } + /** + * Utility method to generate a list of {@link CloudBlobMetadata} with a range of upload times. + * @param startTime the base time for the upload time range. + * @param blobSize the blob size. + * @param count the list size. + * @return the constructed list. + */ + private List generateMetadataList(long startTime, long blobSize, int count) { + List metadataList = new ArrayList<>(); + for (int j = 0; j < count; j++) { + BlobId blobId = getUniqueId(); + CloudBlobMetadata metadata = new CloudBlobMetadata(blobId, startTime, Utils.Infinite_Time, blobSize, + CloudBlobMetadata.EncryptionOrigin.NONE, null, null); + metadata.setUploadTime(startTime + j); + metadataList.add(metadata); + } + return metadataList; + } + /** * Utility method to generate a BlobId and byte buffer for a blob with specified properties and add them to the specified MessageWriteSet. * @param messageWriteSet the {@link MockMessageWriteSet} in which to store the data. @@ -520,7 +568,6 @@ public void testPutWithTtl() throws Exception { * @param containerId the container Id. * @param encrypted the encrypted bit. * @return the generated {@link BlobId}. - * @throws StoreException */ private BlobId addBlobToSet(MockMessageWriteSet messageWriteSet, long size, long expiresAtMs, short accountId, short containerId, boolean encrypted) { @@ -532,6 +579,21 @@ private BlobId addBlobToSet(MockMessageWriteSet messageWriteSet, long size, long return id; } + /** + * Utility method to add a BlobId and generated byte buffer to the specified MessageWriteSet. + * @param messageWriteSet the {@link MockMessageWriteSet} in which to store the data. + * @param blobId the blobId to add. + * @param size the size of the byte buffer. + * @param expiresAtMs the expiration time. + */ + private void addBlobToSet(MockMessageWriteSet messageWriteSet, BlobId blobId, long size, long expiresAtMs) { + long crc = random.nextLong(); + MessageInfo info = + new MessageInfo(blobId, size, false, true, expiresAtMs, crc, refAccountId, refContainerId, operationTime); + ByteBuffer buffer = ByteBuffer.wrap(TestUtils.getRandomBytes((int) size)); + messageWriteSet.add(info, buffer); + } + /** * Utility method to generate a {@link BlobId} for the reference account and container. * @return the generated {@link BlobId}. From 76d9eb41a90a0f5cdd809476bd9e5b762d632f8c Mon Sep 17 00:00:00 2001 From: Rob Block Date: Tue, 6 Aug 2019 14:26:52 -0700 Subject: [PATCH 3/3] Move enum to end of class. --- .../main/java/com.github.ambry.cloud/CloudBlobStore.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/ambry-cloud/src/main/java/com.github.ambry.cloud/CloudBlobStore.java b/ambry-cloud/src/main/java/com.github.ambry.cloud/CloudBlobStore.java index 451c67ba5b..59f8a7d72f 100644 --- a/ambry-cloud/src/main/java/com.github.ambry.cloud/CloudBlobStore.java +++ b/ambry-cloud/src/main/java/com.github.ambry.cloud/CloudBlobStore.java @@ -70,9 +70,6 @@ class CloudBlobStore implements Store { private final CloudBlobCryptoAgent cryptoAgent; private final VcrMetrics vcrMetrics; - /** The lifecycle state of a recently seen blob. */ - enum BlobState {CREATED, TTL_UPDATED, DELETED} - // Map blobId to state (created, ttlUpdated, deleted) private final Map recentBlobCache; private final long minTtlMillis; @@ -429,6 +426,9 @@ public void appendFrom(ReadableByteChannel channel, long size) throws StoreExcep } } + /** The lifecycle state of a recently seen blob. */ + enum BlobState {CREATED, TTL_UPDATED, DELETED} + /** * A local LRA cache of recent blobs processed by this store. */