
Commit 5437b5f

Implement VCR findEntriesSince method and add batching to missing key query (#1216)

Implement findEntriesSince for CloudBlobStore and AzureCloudDestination
Add batching to missing keys query to prevent query length overflow
lightningrob authored and zzmao committed Jul 19, 2019
1 parent db9a801 commit 5437b5f
Showing 12 changed files with 503 additions and 30 deletions.
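
The missing-keys batching itself lands in files that are not rendered below. As a rough sketch of the idea (the helper and batch size here are hypothetical, not taken from this commit): the key list is split into fixed-size chunks and one metadata query is issued per chunk, so no single query string can exceed the backend's length limit.

import java.util.ArrayList;
import java.util.List;

class MissingKeysBatchingSketch {
  // Hypothetical batch size; the value used by the commit is not visible here.
  private static final int QUERY_BATCH_SIZE = 500;

  // Split the key list into chunks of at most QUERY_BATCH_SIZE keys each;
  // the caller then issues one query per chunk and merges the results.
  static <T> List<List<T>> partitionKeys(List<T> keys) {
    List<List<T>> batches = new ArrayList<>();
    for (int i = 0; i < keys.size(); i += QUERY_BATCH_SIZE) {
      batches.add(keys.subList(i, Math.min(i + QUERY_BATCH_SIZE, keys.size())));
    }
    return batches;
  }
}
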
13 changes: 12 additions & 1 deletion ambry-api/src/main/java/com.github.ambry/config/CloudConfig.java

@@ -30,6 +30,7 @@ public class CloudConfig {
   public static final String VCR_MIN_TTL_DAYS = "vcr.min.ttl.days";
   public static final String CLOUD_DELETED_BLOB_RETENTION_DAYS = "cloud.deleted.blob.retention.days";
   public static final String CLOUD_BLOB_COMPACTION_INTERVAL_HOURS = "cloud.blob.compaction.interval.hours";
+  public static final String CLOUD_BLOB_COMPACTION_QUERY_LIMIT = "cloud.blob.compaction.query.limit";
   public static final String VCR_ASSIGNED_PARTITIONS = "vcr.assigned.partitions";
   public static final String VCR_PROXY_HOST = "vcr.proxy.host";
   public static final String VCR_PROXY_PORT = "vcr.proxy.port";

@@ -47,6 +48,7 @@ public class CloudConfig {
   public static final String DEFAULT_VCR_CLUSTER_NAME = "VCRCluster";
   public static final int DEFAULT_MIN_TTL_DAYS = 14;
   public static final int DEFAULT_RETENTION_DAYS = 7;
+  public static final int DEFAULT_COMPACTION_QUERY_LIMIT = 100000;
   public static final int DEFAULT_VCR_PROXY_PORT = 3128;

   /**

@@ -133,6 +135,12 @@ public class CloudConfig {
   @Default("7")
   public final int cloudDeletedBlobRetentionDays;

+  /**
+   * The result set limit to set on the dead blobs query used in compaction.
+   */
+  @Config(CLOUD_BLOB_COMPACTION_QUERY_LIMIT)
+  @Default("100000")
+  public final int cloudBlobCompactionQueryLimit;
   /**
    * The dead blob compaction interval in hours
    */

@@ -180,8 +188,11 @@ public CloudConfig(VerifiableProperties verifiableProperties) {
     cloudBlobCryptoAgentFactoryClass = verifiableProperties.getString(CLOUD_BLOB_CRYPTO_AGENT_FACTORY_CLASS,
         DEFAULT_CLOUD_BLOB_CRYPTO_AGENT_FACTORY_CLASS);
     vcrMinTtlDays = verifiableProperties.getInt(VCR_MIN_TTL_DAYS, DEFAULT_MIN_TTL_DAYS);
-    cloudDeletedBlobRetentionDays = verifiableProperties.getInt(CLOUD_DELETED_BLOB_RETENTION_DAYS, DEFAULT_RETENTION_DAYS);
+    cloudDeletedBlobRetentionDays =
+        verifiableProperties.getInt(CLOUD_DELETED_BLOB_RETENTION_DAYS, DEFAULT_RETENTION_DAYS);
     cloudBlobCompactionIntervalHours = verifiableProperties.getInt(CLOUD_BLOB_COMPACTION_INTERVAL_HOURS, 24);
+    cloudBlobCompactionQueryLimit =
+        verifiableProperties.getInt(CLOUD_BLOB_COMPACTION_QUERY_LIMIT, DEFAULT_COMPACTION_QUERY_LIMIT);

     // Proxy settings
     vcrProxyHost = verifiableProperties.getString(VCR_PROXY_HOST, null);
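
Not part of the diff: a minimal sketch of how the new limit flows through the config plumbing above, assuming a hypothetical override of 50000.

import com.github.ambry.config.CloudConfig;
import com.github.ambry.config.VerifiableProperties;
import java.util.Properties;

class CompactionQueryLimitSketch {
  public static void main(String[] args) {
    Properties props = new Properties();
    // Override the default of 100000 results per dead-blobs compaction query.
    props.setProperty(CloudConfig.CLOUD_BLOB_COMPACTION_QUERY_LIMIT, "50000");
    CloudConfig cloudConfig = new CloudConfig(new VerifiableProperties(props));
    System.out.println(cloudConfig.cloudBlobCompactionQueryLimit); // prints 50000
  }
}
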

(file header not rendered in this view)

@@ -159,7 +159,7 @@ void addPartition(PartitionId partitionId) throws ReplicationException {
     }
     ReplicaId cloudReplica =
         new CloudReplica(cloudConfig, partitionId, virtualReplicatorCluster.getCurrentDataNodeId());
-    Store cloudStore = new CloudBlobStore(properties, partitionId, cloudDestination, vcrMetrics);
+    Store cloudStore = new CloudBlobStore(properties, partitionId, cloudDestination, clusterMap, vcrMetrics);
     try {
       cloudStore.start();
     } catch (StoreException e) {

(file header not rendered in this view; the hunks below modify CloudBlobStore)

@@ -14,6 +14,7 @@
 package com.github.ambry.cloud;

 import com.codahale.metrics.Timer;
+import com.github.ambry.clustermap.ClusterMap;
 import com.github.ambry.clustermap.PartitionId;
 import com.github.ambry.commons.BlobId;
 import com.github.ambry.config.CloudConfig;

@@ -37,6 +38,8 @@
 import java.nio.ByteBuffer;
 import java.nio.channels.ReadableByteChannel;
 import java.security.GeneralSecurityException;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashSet;
 import java.util.List;

@@ -58,6 +61,7 @@ class CloudBlobStore implements Store {
   private static final Logger logger = LoggerFactory.getLogger(CloudBlobStore.class);
   private final PartitionId partitionId;
   private final CloudDestination cloudDestination;
+  private final ClusterMap clusterMap;
   private final CloudBlobCryptoAgentFactory cryptoAgentFactory;
   private final CloudBlobCryptoAgent cryptoAgent;
   private final VcrMetrics vcrMetrics;

@@ -70,14 +74,15 @@ class CloudBlobStore implements Store {
    * @param properties the {@link VerifiableProperties} to use.
    * @param partitionId partition associated with BlobStore.
    * @param cloudDestination the {@link CloudDestination} to use.
+   * @param clusterMap the {@link ClusterMap} to use.
    * @param vcrMetrics the {@link VcrMetrics} to use.
    * @throws IllegalStateException if construction failed.
    */
   CloudBlobStore(VerifiableProperties properties, PartitionId partitionId, CloudDestination cloudDestination,
-      VcrMetrics vcrMetrics) throws IllegalStateException {
-
+      ClusterMap clusterMap, VcrMetrics vcrMetrics) throws IllegalStateException {
     CloudConfig cloudConfig = new CloudConfig(properties);
     ClusterMapConfig clusterMapConfig = new ClusterMapConfig(properties);
+    this.clusterMap = clusterMap;
     this.cloudDestination = Objects.requireNonNull(cloudDestination, "cloudDestination is required");
     this.partitionId = Objects.requireNonNull(partitionId, "partitionId is required");
     this.vcrMetrics = Objects.requireNonNull(vcrMetrics, "vcrMetrics is required");

@@ -236,7 +241,33 @@ public void updateTtl(MessageWriteSet messageSetToUpdate) throws StoreException

   @Override
   public FindInfo findEntriesSince(FindToken token, long maxTotalSizeOfEntries) throws StoreException {
-    throw new UnsupportedOperationException("Method not supported");
+    CloudFindToken inputToken = (CloudFindToken) token;
+    try {
+      List<CloudBlobMetadata> results =
+          cloudDestination.findEntriesSince(partitionId.toPathString(), inputToken, maxTotalSizeOfEntries);
+      if (results.isEmpty()) {
+        return new FindInfo(Collections.emptyList(), inputToken);
+      } else {
+        List<MessageInfo> messageEntries = new ArrayList<>();
+        for (CloudBlobMetadata metadata : results) {
+          BlobId blobId = new BlobId(metadata.getId(), clusterMap);
+          long operationTime = (metadata.getDeletionTime() > 0) ? metadata.getDeletionTime()
+              : (metadata.getCreationTime() > 0) ? metadata.getCreationTime() : metadata.getUploadTime();
+          boolean isDeleted = metadata.getDeletionTime() > 0;
+          boolean isTtlUpdated = false; // No way to know
+          MessageInfo messageInfo =
+              new MessageInfo(blobId, metadata.getSize(), isDeleted, isTtlUpdated, metadata.getExpirationTime(),
+                  (short) metadata.getAccountId(), (short) metadata.getContainerId(), operationTime);
+          messageEntries.add(messageInfo);
+        }
+
+        // Build the new find token from the original one and the query results
+        CloudFindToken outputToken = CloudFindToken.getUpdatedToken(inputToken, results);
+        return new FindInfo(messageEntries, outputToken);
+      }
+    } catch (CloudStorageException | IOException ex) {
+      throw new StoreException(ex, StoreErrorCodes.IOError);
+    }
   }

   @Override
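
Not part of the diff: a sketch of how a caller might page through the new method, assuming store is a started CloudBlobStore; the 4 MB page size and the drain loop are illustrative only.

import com.github.ambry.store.FindInfo;
import com.github.ambry.store.FindToken;
import com.github.ambry.store.MessageInfo;
import com.github.ambry.store.Store;
import com.github.ambry.store.StoreException;

class FindEntriesSinceSketch {
  // Walk all entries currently visible in the store, one size-capped page at a time.
  static void drain(Store store) throws StoreException {
    FindToken token = new CloudFindToken(); // start token: beginning of the partition
    while (true) {
      FindInfo info = store.findEntriesSince(token, 4 * 1024 * 1024);
      if (info.getMessageEntries().isEmpty()) {
        break; // no more entries; the returned token equals the input token
      }
      for (MessageInfo entry : info.getMessageEntries()) {
        // inspect entry.getStoreKey(), entry.getSize(), entry.isDeleted(), ...
      }
      token = info.getFindToken();
    }
  }
}
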

(file header not rendered in this view; the hunk below extends the CloudDestination interface)

@@ -72,6 +72,18 @@ boolean uploadBlob(BlobId blobId, long inputLength, CloudBlobMetadata cloudBlobM
    */
   List<CloudBlobMetadata> getDeadBlobs(String partitionPath) throws CloudStorageException;

+  /**
+   * Returns a sequenced list of blobs in the specified partition, ordered by upload time starting from the
+   * specified time.
+   * @param partitionPath the partition to query.
+   * @param findToken the {@link CloudFindToken} specifying the boundary for the query.
+   * @param maxTotalSizeOfEntries the cumulative size limit for the list of blobs returned.
+   * @return a List of {@link CloudBlobMetadata} referencing the blobs returned by the query.
+   * @throws CloudStorageException
+   */
+  List<CloudBlobMetadata> findEntriesSince(String partitionPath, CloudFindToken findToken, long maxTotalSizeOfEntries)
+      throws CloudStorageException;
+
   /**
    * Permanently delete the specified blob in the cloud destination.
    * @param blobMetadata the {@link CloudBlobMetadata} referencing the blob to purge.
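
A destination implementation must enforce maxTotalSizeOfEntries itself. A minimal sketch of that accumulation (not taken from AzureCloudDestination, whose diff is not rendered here): truncate the upload-time-ordered result list once the cumulative size reaches the cap, keeping at least one entry so the token can always advance.

import java.util.ArrayList;
import java.util.List;

class SizeCapSketch {
  static List<CloudBlobMetadata> capResults(List<CloudBlobMetadata> ordered, long maxTotalSizeOfEntries) {
    List<CloudBlobMetadata> capped = new ArrayList<>();
    long totalSize = 0;
    for (CloudBlobMetadata metadata : ordered) {
      // Stop before exceeding the cap, but never return an empty list for a non-empty input.
      if (!capped.isEmpty() && totalSize + metadata.getSize() > maxTotalSizeOfEntries) {
        break;
      }
      capped.add(metadata);
      totalSize += metadata.getSize();
    }
    return capped;
  }
}
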
130 changes: 130 additions & 0 deletions ambry-cloud/src/main/java/com.github.ambry.cloud/CloudFindToken.java
@@ -0,0 +1,130 @@
+/**
+ * Copyright 2019 LinkedIn Corp. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ */
+package com.github.ambry.cloud;
+
+import com.github.ambry.store.FindToken;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Objects;
+
+
+/**
+ * FindToken implementation used by the {@link CloudBlobStore}.
+ */
+public class CloudFindToken implements FindToken {
+
+  static final short VERSION_0 = 0;
+  static final short CURRENT_VERSION = VERSION_0;
+  private final short version;
+  private final long latestUploadTime;
+  private final String latestBlobId;
+  private final long bytesRead;
+
+  /** Constructor for start token */
+  public CloudFindToken() {
+    this(0, null, 0);
+  }
+
+  /** Constructor for in-progress token */
+  public CloudFindToken(long latestUploadTime, String latestBlobId, long bytesRead) {
+    this.version = CURRENT_VERSION;
+    this.latestUploadTime = latestUploadTime;
+    this.latestBlobId = latestBlobId;
+    this.bytesRead = bytesRead;
+  }
+
+  /**
+   * Utility to construct a new CloudFindToken from a previous instance and the results of a findEntriesSince query.
+   * @param prevToken the previous CloudFindToken.
+   * @param queryResults the results of a findEntriesSince query.
+   * @return the updated token.
+   */
+  public static CloudFindToken getUpdatedToken(CloudFindToken prevToken, List<CloudBlobMetadata> queryResults) {
+    if (queryResults.isEmpty()) {
+      return prevToken;
+    } else {
+      CloudBlobMetadata lastResult = queryResults.get(queryResults.size() - 1);
+      long bytesReadThisQuery = queryResults.stream().mapToLong(CloudBlobMetadata::getSize).sum();
+      return new CloudFindToken(lastResult.getUploadTime(), lastResult.getId(), prevToken.getBytesRead() + bytesReadThisQuery);
+    }
+  }
+
+  @Override
+  public byte[] toBytes() {
+    byte[] buf = null;
+    switch (version) {
+      case VERSION_0:
+        int size = Short.BYTES + 2 * Long.BYTES + Short.BYTES + (latestBlobId == null ? 0 : latestBlobId.length());
+        buf = new byte[size];
+        ByteBuffer bufWrap = ByteBuffer.wrap(buf);
+        // add version
+        bufWrap.putShort(version);
+        // add latestUploadTime
+        bufWrap.putLong(latestUploadTime);
+        // add bytesRead
+        bufWrap.putLong(bytesRead);
+        if (latestBlobId != null) {
+          bufWrap.putShort((short) latestBlobId.length());
+          bufWrap.put(latestBlobId.getBytes());
+        } else {
+          bufWrap.putShort((short) 0);
+        }
+        break;
+      default:
+        throw new IllegalStateException("Unknown version: " + version);
+    }
+    return buf;
+  }
+
+  @Override
+  public long getBytesRead() {
+    return bytesRead;
+  }
+
+  public String getLatestBlobId() {
+    return latestBlobId;
+  }
+
+  public long getLatestUploadTime() {
+    return latestUploadTime;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    CloudFindToken that = (CloudFindToken) o;
+    return version == that.version && latestUploadTime == that.latestUploadTime && bytesRead == that.bytesRead
+        && Objects.equals(latestBlobId, that.latestBlobId);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(version, latestUploadTime, latestBlobId, bytesRead);
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append("version: ").append(version);
+    sb.append(" latestUploadTime: ").append(latestUploadTime);
+    sb.append(" latestBlobId: ").append(latestBlobId);
+    sb.append(" bytesRead: ").append(bytesRead);
+    return sb.toString();
+  }
+}
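
No deserializer is visible in this view; a sketch of what one could look like for the VERSION_0 layout written by toBytes() above (the name fromBytes and its placement are assumed, not confirmed by this commit):

  static CloudFindToken fromBytes(byte[] bytes) {
    ByteBuffer bufWrap = ByteBuffer.wrap(bytes);
    short version = bufWrap.getShort();
    if (version != VERSION_0) {
      throw new IllegalStateException("Unknown version: " + version);
    }
    long latestUploadTime = bufWrap.getLong();
    long bytesRead = bufWrap.getLong();
    short blobIdLength = bufWrap.getShort();
    String latestBlobId = null;
    if (blobIdLength > 0) {
      byte[] blobIdBytes = new byte[blobIdLength];
      bufWrap.get(blobIdBytes);
      latestBlobId = new String(blobIdBytes);
    }
    return new CloudFindToken(latestUploadTime, latestBlobId, bytesRead);
  }
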
(Diffs for the remaining changed files are not rendered in this view.)
