Add cache of recently seen blobs in CloudBlobStore #1225

Merged · 4 commits · Aug 6, 2019
14 changes: 13 additions & 1 deletion ambry-api/src/main/java/com.github.ambry/config/CloudConfig.java
@@ -31,6 +31,7 @@ public class CloudConfig {
public static final String CLOUD_DELETED_BLOB_RETENTION_DAYS = "cloud.deleted.blob.retention.days";
public static final String CLOUD_BLOB_COMPACTION_INTERVAL_HOURS = "cloud.blob.compaction.interval.hours";
public static final String CLOUD_BLOB_COMPACTION_QUERY_LIMIT = "cloud.blob.compaction.query.limit";
public static final String CLOUD_RECENT_BLOB_CACHE_LIMIT = "cloud.recent.blob.cache.limit";
public static final String VCR_ASSIGNED_PARTITIONS = "vcr.assigned.partitions";
public static final String VCR_PROXY_HOST = "vcr.proxy.host";
public static final String VCR_PROXY_PORT = "vcr.proxy.port";
@@ -49,6 +50,7 @@ public class CloudConfig {
public static final int DEFAULT_MIN_TTL_DAYS = 14;
public static final int DEFAULT_RETENTION_DAYS = 7;
public static final int DEFAULT_COMPACTION_QUERY_LIMIT = 100000;
public static final int DEFAULT_RECENT_BLOB_CACHE_LIMIT = 10000;
public static final int DEFAULT_VCR_PROXY_PORT = 3128;

/**
@@ -141,6 +143,7 @@ public class CloudConfig {
@Config(CLOUD_BLOB_COMPACTION_QUERY_LIMIT)
@Default("100000")
public final int cloudBlobCompactionQueryLimit;

/**
* The dead blob compaction interval in hours
*/
@@ -149,7 +152,15 @@
public final int cloudBlobCompactionIntervalHours;

/**
* The comma-separated list of statically assigned partitions. Optional.
* The max size of the recently-accessed blob cache in each cloud blob store.
*/
@Config(CLOUD_RECENT_BLOB_CACHE_LIMIT)
@Default("10000")
public final int recentBlobCacheLimit;

/**
* The comma-separated list of statically assigned partitions.
* Used by static VCR cluster only.
*/
@Config(VCR_ASSIGNED_PARTITIONS)
@Default("null")
@@ -193,6 +204,7 @@ public CloudConfig(VerifiableProperties verifiableProperties) {
cloudBlobCompactionIntervalHours = verifiableProperties.getInt(CLOUD_BLOB_COMPACTION_INTERVAL_HOURS, 24);
cloudBlobCompactionQueryLimit =
verifiableProperties.getInt(CLOUD_BLOB_COMPACTION_QUERY_LIMIT, DEFAULT_COMPACTION_QUERY_LIMIT);
recentBlobCacheLimit = verifiableProperties.getInt(CLOUD_RECENT_BLOB_CACHE_LIMIT, DEFAULT_RECENT_BLOB_CACHE_LIMIT);

// Proxy settings
vcrProxyHost = verifiableProperties.getString(VCR_PROXY_HOST, null);
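For reference, a minimal sketch of how the new limit can be overridden at deployment time. The property name and the 10000 default come from the diff above; building the config from a `Properties` object fed into `VerifiableProperties` is the usual Ambry pattern and is assumed here rather than shown in this PR.

```java
import java.util.Properties;

// Sketch only (not PR code): override cloud.recent.blob.cache.limit, read it back.
Properties props = new Properties();
props.setProperty(CloudConfig.CLOUD_RECENT_BLOB_CACHE_LIMIT, "50000");
CloudConfig cloudConfig = new CloudConfig(new VerifiableProperties(props));
// cloudConfig.recentBlobCacheLimit == 50000; left unset, it falls back to
// DEFAULT_RECENT_BLOB_CACHE_LIMIT (10000).
```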
ambry-cloud/src/main/java/com.github.ambry.cloud/CloudBackupManager.java
@@ -38,7 +38,9 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@@ -56,6 +58,7 @@ public class CloudBackupManager extends ReplicationEngine {
private final VcrMetrics vcrMetrics;
private final VirtualReplicatorCluster virtualReplicatorCluster;
private final CloudStorageCompactor cloudStorageCompactor;
private final Map<String, Store> partitionStoreMap = new HashMap<>();

public CloudBackupManager(VerifiableProperties properties, CloudConfig cloudConfig,
ReplicationConfig replicationConfig, ClusterMapConfig clusterMapConfig, StoreConfig storeConfig,
@@ -116,6 +119,12 @@ public void onPartitionRemoved(PartitionId partitionId) {
vcrMetrics.removePartitionErrorCount.inc();
}
}
Store cloudStore = partitionStoreMap.get(partitionId.toPathString());
if (cloudStore != null) {
cloudStore.shutdown();
} else {
logger.warn("Store not found for partition {}", partitionId);
}
logger.info("Partition {} removed from {}", partitionId, dataNodeId);
// We don't close cloudBlobStore because replicate in ReplicaThread is using a copy of
// remoteReplicaInfo which needs CloudBlobStore.
@@ -190,6 +199,7 @@ void addPartition(PartitionId partitionId) throws ReplicationException {
retList.add(partitionInfo);
return retList;
});
partitionStoreMap.put(partitionId.toPathString(), cloudStore);
} else {
try {
cloudStore.shutdown();
ambry-cloud/src/main/java/com.github.ambry.cloud/CloudBlobStore.java
@@ -42,7 +42,9 @@
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeUnit;
@@ -59,12 +61,17 @@
class CloudBlobStore implements Store {

private static final Logger logger = LoggerFactory.getLogger(CloudBlobStore.class);
private static final int cacheInitialCapacity = 1000;
private static final float cacheLoadFactor = 0.75f;
private final PartitionId partitionId;
private final CloudDestination cloudDestination;
private final ClusterMap clusterMap;
private final CloudBlobCryptoAgentFactory cryptoAgentFactory;
private final CloudBlobCryptoAgent cryptoAgent;
private final VcrMetrics vcrMetrics;

// Map blobId to state (created, ttlUpdated, deleted)
private final Map<String, BlobState> recentBlobCache;
private final long minTtlMillis;
private final boolean requireEncryption;
private boolean started;
@@ -88,6 +95,7 @@ class CloudBlobStore implements Store {
this.vcrMetrics = Objects.requireNonNull(vcrMetrics, "vcrMetrics is required");
minTtlMillis = TimeUnit.DAYS.toMillis(cloudConfig.vcrMinTtlDays);
requireEncryption = cloudConfig.vcrRequireEncryption;
recentBlobCache = Collections.synchronizedMap(new RecentBlobCache(cloudConfig.recentBlobCacheLimit));

String cryptoAgentFactoryClass = cloudConfig.cloudBlobCryptoAgentFactoryClass;
try {
@@ -166,6 +174,7 @@ private void putBlob(MessageInfo messageInfo, ByteBuffer messageBuf, long size)
// If buffer was encrypted, we no longer know its size
long bufferlen = bufferChanged ? -1 : size;
cloudDestination.uploadBlob(blobId, bufferlen, blobMetadata, new ByteBufferInputStream(messageBuf));
addToCache(blobId.getID(), BlobState.CREATED);
} else {
logger.trace("Blob is skipped: {}", messageInfo);
vcrMetrics.blobUploadSkippedCount.inc();
@@ -191,6 +200,9 @@ private boolean shouldUpload(MessageInfo messageInfo) {
if (messageInfo.isDeleted()) {
return false;
}
if (recentBlobCache.containsKey(messageInfo.getStoreKey().getID())) {
return false;
}
// expiration time above threshold. Expired blobs are blocked by ReplicaThread.
return (messageInfo.getExpirationTimeInMs() == Utils.Infinite_Time
|| messageInfo.getExpirationTimeInMs() - messageInfo.getOperationTimeMs() >= minTtlMillis);
@@ -204,7 +216,12 @@ public void delete(MessageWriteSet messageSetToDelete) throws StoreException {
try {
for (MessageInfo msgInfo : messageSetToDelete.getMessageSetInfo()) {
BlobId blobId = (BlobId) msgInfo.getStoreKey();
cloudDestination.deleteBlob(blobId, msgInfo.getOperationTimeMs());
String blobKey = msgInfo.getStoreKey().getID();
BlobState blobState = recentBlobCache.get(blobKey);
if (blobState != BlobState.DELETED) {
cloudDestination.deleteBlob(blobId, msgInfo.getOperationTimeMs());
addToCache(blobKey, BlobState.DELETED);
}
}
} catch (CloudStorageException ex) {
throw new StoreException(ex, StoreErrorCodes.IOError);
@@ -228,7 +245,11 @@ public void updateTtl(MessageWriteSet messageSetToUpdate) throws StoreException {
// need to be modified.
if (msgInfo.isTtlUpdated()) {
BlobId blobId = (BlobId) msgInfo.getStoreKey();
cloudDestination.updateBlobExpiration(blobId, Utils.Infinite_Time);
BlobState blobState = recentBlobCache.get(blobId.getID());
if (blobState == null || blobState == BlobState.CREATED) {
cloudDestination.updateBlobExpiration(blobId, Utils.Infinite_Time);
addToCache(blobId.getID(), BlobState.TTL_UPDATED);
}
} else {
logger.error("updateTtl() is called but msgInfo.isTtlUpdated is not set. msgInfo: {}", msgInfo);
vcrMetrics.updateTtlNotSetError.inc();
@@ -239,6 +260,16 @@
}
}

/**
* Add a blob state mapping to the recent blob cache.
* @param blobKey the blob key to cache.
* @param blobState the state of the blob.
*/
// Visible for test
void addToCache(String blobKey, BlobState blobState) {
recentBlobCache.put(blobKey, blobState);
}

@Override
public FindInfo findEntriesSince(FindToken token, long maxTotalSizeOfEntries) throws StoreException {
CloudFindToken inputToken = (CloudFindToken) token;
@@ -274,10 +305,21 @@ public FindInfo findEntriesSince(FindToken token, long maxTotalSizeOfEntries) throws StoreException {
public Set<StoreKey> findMissingKeys(List<StoreKey> keys) throws StoreException {
checkStarted();
// Check existence of keys in cloud metadata
List<BlobId> blobIdList = keys.stream().map(key -> (BlobId) key).collect(Collectors.toList());
List<BlobId> blobIdQueryList = keys.stream()
.filter(key -> !recentBlobCache.containsKey(key.getID()))
.map(key -> (BlobId) key)
.collect(Collectors.toList());
if (blobIdQueryList.isEmpty()) {
// Cool, the cache did its job and eliminated a Cosmos query!
return Collections.emptySet();
}
try {
Set<String> foundSet = cloudDestination.getBlobMetadata(blobIdList).keySet();
return keys.stream().filter(key -> !foundSet.contains(key.getID())).collect(Collectors.toSet());
Set<String> foundSet = cloudDestination.getBlobMetadata(blobIdQueryList).keySet();
// return input keys - cached keys - keys returned by query
return keys.stream()
.filter(key -> !foundSet.contains(key.getID()))
.filter(key -> !recentBlobCache.containsKey(key.getID()))
.collect(Collectors.toSet());
} catch (CloudStorageException ex) {
throw new StoreException(ex, StoreErrorCodes.IOError);
}
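To make the new filtering concrete, here is a self-contained walk-through of the "input keys minus cached keys minus found keys" arithmetic, using hypothetical blob IDs and plain sets standing in for the cache and the metadata query:

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.stream.Collectors;

public class FindMissingKeysSketch {
  public static void main(String[] args) {
    // Hypothetical IDs standing in for the keys passed to findMissingKeys.
    Set<String> keys = new HashSet<>(Arrays.asList("A", "B", "C", "D"));
    Set<String> cached = new HashSet<>(Arrays.asList("A"));   // recentBlobCache hit: skipped by the query, not missing
    Set<String> foundSet = new HashSet<>(Arrays.asList("B")); // IDs the metadata query reports as present
    Set<String> missing = keys.stream()
        .filter(key -> !foundSet.contains(key))
        .filter(key -> !cached.contains(key))
        .collect(Collectors.toSet());
    System.out.println(missing); // [C, D]
  }
}
```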
@@ -291,10 +333,8 @@ public StoreStats getStoreStats() {
@Override
public boolean isKeyDeleted(StoreKey key) throws StoreException {
checkStarted();
// Return false for now, because we don't track recently deleted keys
// This way, the replica thread will replay the delete resulting in a no-op
// TODO: Consider LRU cache of recently deleted keys
return false;
// Not definitive, but okay for some deletes to be replayed.
return (BlobState.DELETED == recentBlobCache.get(key.getID()));
}

@Override
@@ -310,6 +350,7 @@ public boolean isEmpty() {

@Override
public void shutdown() {
recentBlobCache.clear();
started = false;
}

@@ -341,6 +382,7 @@ public String toString() {
return "PartitionId: " + partitionId.toPathString() + " in the cloud";
}

/** A {@link Write} implementation used by this store to write data. */
private class CloudWriteChannel implements Write {
private final CloudBlobStore cloudBlobStore;
private final List<MessageInfo> messageInfoList;
@@ -383,4 +425,25 @@ public void appendFrom(ReadableByteChannel channel, long size) throws StoreException {
}
}
}

/** The lifecycle state of a recently seen blob. */
enum BlobState {CREATED, TTL_UPDATED, DELETED}

/**
* A local LRA (least-recently-accessed) cache of recent blobs processed by this store.
*/
private class RecentBlobCache extends LinkedHashMap<String, BlobState> {
private final int maxEntries;

public RecentBlobCache(int maxEntries) {
// Use access order for eviction
super(cacheInitialCapacity, cacheLoadFactor, true);
this.maxEntries = maxEntries;
}

@Override
protected boolean removeEldestEntry(Map.Entry<String, BlobState> eldest) {
return (this.size() > maxEntries);
}
}
}
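RecentBlobCache is the classic access-ordered LinkedHashMap idiom: passing accessOrder = true to the three-argument constructor makes get() refresh an entry's recency, and overriding removeEldestEntry caps the map at maxEntries, so the least-recently-accessed blob is evicted first. LinkedHashMap itself is not thread-safe, which is why the store constructor wraps the cache in Collections.synchronizedMap. A standalone sketch of the same idiom, with generic names rather than the PR's class:

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

// Standalone illustration of a bounded, access-ordered (LRA/LRU) cache.
class BoundedLraCache<K, V> extends LinkedHashMap<K, V> {
  private final int maxEntries;

  BoundedLraCache(int maxEntries) {
    super(16, 0.75f, true); // accessOrder = true: get() moves an entry to the tail
    this.maxEntries = maxEntries;
  }

  @Override
  protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
    return size() > maxEntries; // drop the least-recently-accessed entry
  }

  public static void main(String[] args) {
    Map<String, String> cache = Collections.synchronizedMap(new BoundedLraCache<>(2));
    cache.put("a", "CREATED");
    cache.put("b", "CREATED");
    cache.get("a");            // touch "a": "b" is now the eldest
    cache.put("c", "DELETED"); // exceeds maxEntries, evicting "b"
    System.out.println(cache.keySet()); // [a, c]
  }
}
```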
ambry-cloud/src/main/java/com.github.ambry.cloud/VcrMetrics.java
@@ -48,7 +48,7 @@ public VcrMetrics(MetricRegistry registry) {
blobDecryptionTime = registry.timer(MetricRegistry.name(CloudBlobStore.class, "BlobDecryptionTime"));
blobUploadSkippedCount = registry.counter(MetricRegistry.name(CloudBlobStore.class, "BlobUploadSkippedCount"));
updateTtlNotSetError = registry.counter(MetricRegistry.name(CloudBlobStore.class, "UpdateTtlNotSetError"));
blobCompactionTime = registry.timer(MetricRegistry.name(CloudStorageCompactor.class, "BlobCompactionTime"));
blobCompactionTime = registry.timer(MetricRegistry.name(CloudBlobStore.class, "BlobCompactionTime"));
addPartitionErrorCount = registry.counter(MetricRegistry.name(CloudBackupManager.class, "AddPartitionErrorCount"));
removePartitionErrorCount =
registry.counter(MetricRegistry.name(CloudBackupManager.class, "RemovePartitionErrorCount"));