diff --git a/ambry-api/src/main/java/com.github.ambry/config/CloudConfig.java b/ambry-api/src/main/java/com.github.ambry/config/CloudConfig.java index 8d9661d634..d8640e383e 100644 --- a/ambry-api/src/main/java/com.github.ambry/config/CloudConfig.java +++ b/ambry-api/src/main/java/com.github.ambry/config/CloudConfig.java @@ -30,6 +30,7 @@ public class CloudConfig { public static final String VCR_MIN_TTL_DAYS = "vcr.min.ttl.days"; public static final String CLOUD_DELETED_BLOB_RETENTION_DAYS = "cloud.deleted.blob.retention.days"; public static final String CLOUD_BLOB_COMPACTION_INTERVAL_HOURS = "cloud.blob.compaction.interval.hours"; + public static final String CLOUD_BLOB_COMPACTION_QUERY_LIMIT = "cloud.blob.compaction.query.limit"; public static final String VCR_ASSIGNED_PARTITIONS = "vcr.assigned.partitions"; public static final String VCR_PROXY_HOST = "vcr.proxy.host"; public static final String VCR_PROXY_PORT = "vcr.proxy.port"; @@ -47,6 +48,7 @@ public class CloudConfig { public static final String DEFAULT_VCR_CLUSTER_NAME = "VCRCluster"; public static final int DEFAULT_MIN_TTL_DAYS = 14; public static final int DEFAULT_RETENTION_DAYS = 7; + public static final int DEFAULT_COMPACTION_QUERY_LIMIT = 100000; public static final int DEFAULT_VCR_PROXY_PORT = 3128; /** @@ -133,6 +135,12 @@ public class CloudConfig { @Default("7") public final int cloudDeletedBlobRetentionDays; + /** + * The result set limit to set on the dead blobs query used in compaction. + */ + @Config(CLOUD_BLOB_COMPACTION_QUERY_LIMIT) + @Default("100000") + public final int cloudBlobCompactionQueryLimit; /** * The dead blob compaction interval in hours */ @@ -180,8 +188,11 @@ public CloudConfig(VerifiableProperties verifiableProperties) { cloudBlobCryptoAgentFactoryClass = verifiableProperties.getString(CLOUD_BLOB_CRYPTO_AGENT_FACTORY_CLASS, DEFAULT_CLOUD_BLOB_CRYPTO_AGENT_FACTORY_CLASS); vcrMinTtlDays = verifiableProperties.getInt(VCR_MIN_TTL_DAYS, DEFAULT_MIN_TTL_DAYS); - cloudDeletedBlobRetentionDays = verifiableProperties.getInt(CLOUD_DELETED_BLOB_RETENTION_DAYS, DEFAULT_RETENTION_DAYS); + cloudDeletedBlobRetentionDays = + verifiableProperties.getInt(CLOUD_DELETED_BLOB_RETENTION_DAYS, DEFAULT_RETENTION_DAYS); cloudBlobCompactionIntervalHours = verifiableProperties.getInt(CLOUD_BLOB_COMPACTION_INTERVAL_HOURS, 24); + cloudBlobCompactionQueryLimit = + verifiableProperties.getInt(CLOUD_BLOB_COMPACTION_QUERY_LIMIT, DEFAULT_COMPACTION_QUERY_LIMIT); // Proxy settings vcrProxyHost = verifiableProperties.getString(VCR_PROXY_HOST, null); diff --git a/ambry-cloud/src/main/java/com.github.ambry.cloud/CloudBackupManager.java b/ambry-cloud/src/main/java/com.github.ambry.cloud/CloudBackupManager.java index ff0c5b1bd7..99d67332cb 100644 --- a/ambry-cloud/src/main/java/com.github.ambry.cloud/CloudBackupManager.java +++ b/ambry-cloud/src/main/java/com.github.ambry.cloud/CloudBackupManager.java @@ -159,7 +159,7 @@ void addPartition(PartitionId partitionId) throws ReplicationException { } ReplicaId cloudReplica = new CloudReplica(cloudConfig, partitionId, virtualReplicatorCluster.getCurrentDataNodeId()); - Store cloudStore = new CloudBlobStore(properties, partitionId, cloudDestination, vcrMetrics); + Store cloudStore = new CloudBlobStore(properties, partitionId, cloudDestination, clusterMap, vcrMetrics); try { cloudStore.start(); } catch (StoreException e) { diff --git a/ambry-cloud/src/main/java/com.github.ambry.cloud/CloudBlobStore.java b/ambry-cloud/src/main/java/com.github.ambry.cloud/CloudBlobStore.java index 
cf3598b082..035e240139 100644 --- a/ambry-cloud/src/main/java/com.github.ambry.cloud/CloudBlobStore.java +++ b/ambry-cloud/src/main/java/com.github.ambry.cloud/CloudBlobStore.java @@ -14,6 +14,7 @@ package com.github.ambry.cloud; import com.codahale.metrics.Timer; +import com.github.ambry.clustermap.ClusterMap; import com.github.ambry.clustermap.PartitionId; import com.github.ambry.commons.BlobId; import com.github.ambry.config.CloudConfig; @@ -37,6 +38,8 @@ import java.nio.ByteBuffer; import java.nio.channels.ReadableByteChannel; import java.security.GeneralSecurityException; +import java.util.ArrayList; +import java.util.Collections; import java.util.EnumSet; import java.util.HashSet; import java.util.List; @@ -58,6 +61,7 @@ class CloudBlobStore implements Store { private static final Logger logger = LoggerFactory.getLogger(CloudBlobStore.class); private final PartitionId partitionId; private final CloudDestination cloudDestination; + private final ClusterMap clusterMap; private final CloudBlobCryptoAgentFactory cryptoAgentFactory; private final CloudBlobCryptoAgent cryptoAgent; private final VcrMetrics vcrMetrics; @@ -70,14 +74,15 @@ class CloudBlobStore implements Store { * @param properties the {@link VerifiableProperties} to use. * @param partitionId partition associated with BlobStore. * @param cloudDestination the {@link CloudDestination} to use. + * @param clusterMap the {@link ClusterMap} to use. * @param vcrMetrics the {@link VcrMetrics} to use. * @throws IllegalStateException if construction failed. */ CloudBlobStore(VerifiableProperties properties, PartitionId partitionId, CloudDestination cloudDestination, - VcrMetrics vcrMetrics) throws IllegalStateException { - + ClusterMap clusterMap, VcrMetrics vcrMetrics) throws IllegalStateException { CloudConfig cloudConfig = new CloudConfig(properties); ClusterMapConfig clusterMapConfig = new ClusterMapConfig(properties); + this.clusterMap = clusterMap; this.cloudDestination = Objects.requireNonNull(cloudDestination, "cloudDestination is required"); this.partitionId = Objects.requireNonNull(partitionId, "partitionId is required"); this.vcrMetrics = Objects.requireNonNull(vcrMetrics, "vcrMetrics is required"); @@ -236,7 +241,33 @@ public void updateTtl(MessageWriteSet messageSetToUpdate) throws StoreException @Override public FindInfo findEntriesSince(FindToken token, long maxTotalSizeOfEntries) throws StoreException { - throw new UnsupportedOperationException("Method not supported"); + CloudFindToken inputToken = (CloudFindToken) token; + try { + List results = + cloudDestination.findEntriesSince(partitionId.toPathString(), inputToken, maxTotalSizeOfEntries); + if (results.isEmpty()) { + return new FindInfo(Collections.emptyList(), inputToken); + } else { + List messageEntries = new ArrayList<>(); + for (CloudBlobMetadata metadata : results) { + BlobId blobId = new BlobId(metadata.getId(), clusterMap); + long operationTime = (metadata.getDeletionTime() > 0) ? metadata.getDeletionTime() + : (metadata.getCreationTime() > 0) ? 
metadata.getCreationTime() : metadata.getUploadTime(); + boolean isDeleted = metadata.getDeletionTime() > 0; + boolean isTtlUpdated = false; // No way to know + MessageInfo messageInfo = + new MessageInfo(blobId, metadata.getSize(), isDeleted, isTtlUpdated, metadata.getExpirationTime(), + (short) metadata.getAccountId(), (short) metadata.getContainerId(), operationTime); + messageEntries.add(messageInfo); + } + + // Build the new find token from the original one and the query results + CloudFindToken outputToken = CloudFindToken.getUpdatedToken(inputToken, results); + return new FindInfo(messageEntries, outputToken); + } + } catch (CloudStorageException | IOException ex) { + throw new StoreException(ex, StoreErrorCodes.IOError); + } } @Override diff --git a/ambry-cloud/src/main/java/com.github.ambry.cloud/CloudDestination.java b/ambry-cloud/src/main/java/com.github.ambry.cloud/CloudDestination.java index 41b4292dc2..61db2aaa0c 100644 --- a/ambry-cloud/src/main/java/com.github.ambry.cloud/CloudDestination.java +++ b/ambry-cloud/src/main/java/com.github.ambry.cloud/CloudDestination.java @@ -72,6 +72,18 @@ boolean uploadBlob(BlobId blobId, long inputLength, CloudBlobMetadata cloudBlobM */ List getDeadBlobs(String partitionPath) throws CloudStorageException; + /** + * Returns a sequenced list of blobs in the specified partition, ordered by upload time starting from the + * specified time. + * @param partitionPath the partition to query. + * @param findToken the {@link CloudFindToken} specifying the boundary for the query. + * @param maxTotalSizeOfEntries the cumulative size limit for the list of blobs returned. + * @return a List of {@link CloudBlobMetadata} referencing the blobs returned by the query. + * @throws CloudStorageException + */ + List findEntriesSince(String partitionPath, CloudFindToken findToken, long maxTotalSizeOfEntries) + throws CloudStorageException; + /** * Permanently delete the specified blob in the cloud destination. * @param blobMetadata the {@link CloudBlobMetadata} referencing the blob to purge. diff --git a/ambry-cloud/src/main/java/com.github.ambry.cloud/CloudFindToken.java b/ambry-cloud/src/main/java/com.github.ambry.cloud/CloudFindToken.java new file mode 100644 index 0000000000..4180687af6 --- /dev/null +++ b/ambry-cloud/src/main/java/com.github.ambry.cloud/CloudFindToken.java @@ -0,0 +1,130 @@ +/** + * Copyright 2019 LinkedIn Corp. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + */ +package com.github.ambry.cloud; + +import com.github.ambry.store.FindToken; +import java.nio.ByteBuffer; +import java.util.List; +import java.util.Objects; + + +/** + * FindToken implementation used by the {@link CloudBlobStore}. 
+ */
+public class CloudFindToken implements FindToken {
+
+  static final short VERSION_0 = 0;
+  static final short CURRENT_VERSION = VERSION_0;
+  private final short version;
+  private final long latestUploadTime;
+  private final String latestBlobId;
+  private final long bytesRead;
+
+  /** Constructor for start token */
+  public CloudFindToken() {
+    this(0, null, 0);
+  }
+
+  /** Constructor for in-progress token */
+  public CloudFindToken(long latestUploadTime, String latestBlobId, long bytesRead) {
+    this.version = CURRENT_VERSION;
+    this.latestUploadTime = latestUploadTime;
+    this.latestBlobId = latestBlobId;
+    this.bytesRead = bytesRead;
+  }
+
+  /**
+   * Utility to construct a new CloudFindToken from a previous instance and the results of a findEntriesSince query.
+   * @param prevToken the previous CloudFindToken.
+   * @param queryResults the results of a findEntriesSince query.
+   * @return the updated token.
+   */
+  public static CloudFindToken getUpdatedToken(CloudFindToken prevToken, List<CloudBlobMetadata> queryResults) {
+    if (queryResults.isEmpty()) {
+      return prevToken;
+    } else {
+      CloudBlobMetadata lastResult = queryResults.get(queryResults.size() - 1);
+      long bytesReadThisQuery = queryResults.stream().mapToLong(CloudBlobMetadata::getSize).sum();
+      return new CloudFindToken(lastResult.getUploadTime(), lastResult.getId(), prevToken.getBytesRead() + bytesReadThisQuery);
+    }
+  }
+
+  @Override
+  public byte[] toBytes() {
+    byte[] buf = null;
+    switch (version) {
+      case VERSION_0:
+        int size = Short.BYTES + 2 * Long.BYTES + Short.BYTES + (latestBlobId == null ? 0 : latestBlobId.length());
+        buf = new byte[size];
+        ByteBuffer bufWrap = ByteBuffer.wrap(buf);
+        // add version
+        bufWrap.putShort(version);
+        // add latestUploadTime
+        bufWrap.putLong(latestUploadTime);
+        // add bytesRead
+        bufWrap.putLong(bytesRead);
+        if (latestBlobId != null) {
+          bufWrap.putShort((short) latestBlobId.length());
+          bufWrap.put(latestBlobId.getBytes());
+        } else {
+          bufWrap.putShort((short) 0);
+        }
+        break;
+      default:
+        throw new IllegalStateException("Unknown version: " + version);
+    }
+    return buf;
+  }
+
+  @Override
+  public long getBytesRead() {
+    return bytesRead;
+  }
+
+  public String getLatestBlobId() {
+    return latestBlobId;
+  }
+
+  public long getLatestUploadTime() {
+    return latestUploadTime;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    CloudFindToken that = (CloudFindToken) o;
+    return version == that.version && latestUploadTime == that.latestUploadTime && bytesRead == that.bytesRead
+        && Objects.equals(latestBlobId, that.latestBlobId);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(version, latestUploadTime, latestBlobId, bytesRead);
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append("version: ").append(version);
+    sb.append(" latestUploadTime: ").append(latestUploadTime);
+    sb.append(" latestBlobId: ").append(latestBlobId);
+    sb.append(" bytesRead: ").append(bytesRead);
+    return sb.toString();
+  }
+}
diff --git a/ambry-cloud/src/main/java/com.github.ambry.cloud/azure/AzureCloudDestination.java b/ambry-cloud/src/main/java/com.github.ambry.cloud/azure/AzureCloudDestination.java
index 2fee54ec83..af17541956 100644
--- a/ambry-cloud/src/main/java/com.github.ambry.cloud/azure/AzureCloudDestination.java
+++ b/ambry-cloud/src/main/java/com.github.ambry.cloud/azure/AzureCloudDestination.java
@@ -16,6 +16,7 @@
 import com.codahale.metrics.Timer;
 import com.github.ambry.cloud.CloudBlobMetadata;
 import com.github.ambry.cloud.CloudDestination;
+import com.github.ambry.cloud.CloudFindToken;
 import com.github.ambry.cloud.CloudStorageException;
 import com.github.ambry.commons.BlobId;
 import com.github.ambry.config.CloudConfig;
@@ -46,6 +47,7 @@
 import java.net.Proxy;
 import java.net.URISyntaxException;
 import java.security.InvalidKeyException;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -68,11 +70,23 @@
 class AzureCloudDestination implements CloudDestination {
   private static final Logger logger = LoggerFactory.getLogger(AzureCloudDestination.class);
   private static final String THRESHOLD_PARAM = "@threshold";
+  private static final String LIMIT_PARAM = "@limit";
+  private static final String TIME_SINCE_PARAM = "@timesince";
   private static final String BATCH_ID_QUERY_TEMPLATE = "SELECT * FROM c WHERE c.id IN (%s)";
+  static final int ID_QUERY_BATCH_SIZE = 1000;
   static final String DEAD_BLOBS_QUERY_TEMPLATE =
-      "SELECT * FROM c WHERE (c." + CloudBlobMetadata.FIELD_DELETION_TIME + " BETWEEN 1 AND " + THRESHOLD_PARAM + ")"
-          + " OR (c." + CloudBlobMetadata.FIELD_EXPIRATION_TIME + " BETWEEN 1 AND " + THRESHOLD_PARAM + ")";
+      "SELECT TOP " + LIMIT_PARAM + " * FROM c WHERE (c." + CloudBlobMetadata.FIELD_DELETION_TIME + " BETWEEN 1 AND "
+          + THRESHOLD_PARAM + ")" + " OR (c." + CloudBlobMetadata.FIELD_EXPIRATION_TIME + " BETWEEN 1 AND "
+          + THRESHOLD_PARAM + ")" + " ORDER BY c." + CloudBlobMetadata.FIELD_UPLOAD_TIME + " ASC";
+  // Note: ideally would like to order by uploadTime and id, but Cosmos doesn't allow that without a composite index.
+  // It is unlikely (but not impossible) for two blobs in the same partition to have the same uploadTime (would have
+  // to be multiple VCRs uploading the same partition). We track the lastBlobId in the CloudFindToken and skip it if
+  // it is returned in successive queries.
+  static final String ENTRIES_SINCE_QUERY_TEMPLATE =
+      "SELECT TOP " + LIMIT_PARAM + " * FROM c WHERE c." + CloudBlobMetadata.FIELD_UPLOAD_TIME + " >= "
+          + TIME_SINCE_PARAM + " ORDER BY c."
+ CloudBlobMetadata.FIELD_UPLOAD_TIME + " ASC"; private static final String SEPARATOR = "-"; + private static final int findSinceQueryLimit = 1000; private final CloudStorageAccount azureAccount; private final CloudBlobClient azureBlobClient; private final DocumentClient documentClient; @@ -81,6 +95,7 @@ class AzureCloudDestination implements CloudDestination { private final AzureMetrics azureMetrics; private final String clusterName; private final long retentionPeriodMs; + private final int deadBlobsQueryLimit; // Containers known to exist in the storage account private final Set knownContainers = ConcurrentHashMap.newKeySet(); @@ -115,6 +130,7 @@ class AzureCloudDestination implements CloudDestination { ConsistencyLevel.Session); cosmosDataAccessor = new CosmosDataAccessor(documentClient, azureCloudConfig, azureMetrics); this.retentionPeriodMs = TimeUnit.DAYS.toMillis(cloudConfig.cloudDeletedBlobRetentionDays); + this.deadBlobsQueryLimit = cloudConfig.cloudBlobCompactionQueryLimit; logger.info("Created Azure destination"); } @@ -134,6 +150,7 @@ class AzureCloudDestination implements CloudDestination { this.azureMetrics = azureMetrics; this.clusterName = clusterName; this.retentionPeriodMs = TimeUnit.DAYS.toMillis(CloudConfig.DEFAULT_RETENTION_DAYS); + this.deadBlobsQueryLimit = CloudConfig.DEFAULT_COMPACTION_QUERY_LIMIT; // Create a blob client to interact with Blob storage azureBlobClient = azureAccount.createCloudBlobClient(); @@ -251,14 +268,39 @@ public Map getBlobMetadata(List blobIds) thro if (blobIds.isEmpty()) { return Collections.emptyMap(); } + + // CosmosDB has query size limit of 256k chars. + // Break list into chunks if necessary to avoid overflow. + List metadataList; + if (blobIds.size() > ID_QUERY_BATCH_SIZE) { + metadataList = new ArrayList<>(); + for (int j = 0; j < blobIds.size() / ID_QUERY_BATCH_SIZE + 1; j++) { + int start = j * ID_QUERY_BATCH_SIZE; + if (start >= blobIds.size()) { + break; + } + int end = Math.min((j + 1) * ID_QUERY_BATCH_SIZE, blobIds.size()); + List someBlobIds = blobIds.subList(start, end); + metadataList.addAll(getBlobMetadataChunked(someBlobIds)); + } + } else { + metadataList = getBlobMetadataChunked(blobIds); + } + + return metadataList.stream().collect(Collectors.toMap(m -> m.getId(), Function.identity())); + } + + private List getBlobMetadataChunked(List blobIds) throws CloudStorageException { + if (blobIds.isEmpty() || blobIds.size() > ID_QUERY_BATCH_SIZE) { + throw new IllegalArgumentException("Invalid input list size: " + blobIds.size()); + } String quotedBlobIds = String.join(",", blobIds.stream().map(s -> '"' + s.getID() + '"').collect(Collectors.toList())); String query = String.format(BATCH_ID_QUERY_TEMPLATE, quotedBlobIds); String partitionPath = blobIds.get(0).getPartition().toPathString(); try { - List metadataList = - cosmosDataAccessor.queryMetadata(partitionPath, new SqlQuerySpec(query), azureMetrics.missingKeysQueryTime); - return metadataList.stream().collect(Collectors.toMap(m -> m.getId(), Function.identity())); + return cosmosDataAccessor.queryMetadata(partitionPath, new SqlQuerySpec(query), + azureMetrics.missingKeysQueryTime); } catch (DocumentClientException dex) { throw new CloudStorageException("Failed to query blob metadata for partition " + partitionPath, dex); } @@ -269,7 +311,8 @@ public List getDeadBlobs(String partitionPath) throws CloudSt long now = System.currentTimeMillis(); long retentionThreshold = now - retentionPeriodMs; SqlQuerySpec deadBlobsQuery = new SqlQuerySpec(DEAD_BLOBS_QUERY_TEMPLATE, - new 
SqlParameterCollection(new SqlParameter(THRESHOLD_PARAM, retentionThreshold))); + new SqlParameterCollection(new SqlParameter(LIMIT_PARAM, deadBlobsQueryLimit), + new SqlParameter(THRESHOLD_PARAM, retentionThreshold))); try { return cosmosDataAccessor.queryMetadata(partitionPath, deadBlobsQuery, azureMetrics.deadBlobsQueryTime); } catch (DocumentClientException dex) { @@ -277,6 +320,43 @@ public List getDeadBlobs(String partitionPath) throws CloudSt } } + @Override + public List findEntriesSince(String partitionPath, CloudFindToken findToken, + long maxTotalSizeOfEntries) throws CloudStorageException { + SqlQuerySpec entriesSinceQuery = new SqlQuerySpec(ENTRIES_SINCE_QUERY_TEMPLATE, + new SqlParameterCollection(new SqlParameter(LIMIT_PARAM, findSinceQueryLimit), + new SqlParameter(TIME_SINCE_PARAM, findToken.getLatestUploadTime()))); + try { + List results = + cosmosDataAccessor.queryMetadata(partitionPath, entriesSinceQuery, azureMetrics.findSinceQueryTime); + if (results.isEmpty()) { + return results; + } + long totalSize = 0; + List cappedResults = new ArrayList<>(); + for (CloudBlobMetadata metadata : results) { + // Skip the last blobId in the token + if (metadata.getId().equals(findToken.getLatestBlobId())) { + continue; + } + + // Cap results at max size + if (totalSize + metadata.getSize() > maxTotalSizeOfEntries) { + // If we have no results yet, must add at least one regardless of size + if (cappedResults.isEmpty()) { + cappedResults.add(metadata); + } + break; + } + cappedResults.add(metadata); + totalSize += metadata.getSize(); + } + return cappedResults; + } catch (DocumentClientException dex) { + throw new CloudStorageException("Failed to query blobs for partition " + partitionPath, dex); + } + } + /** * Update the metadata for the specified blob. * @param blobId The {@link BlobId} to update. 
diff --git a/ambry-cloud/src/main/java/com.github.ambry.cloud/azure/AzureMetrics.java b/ambry-cloud/src/main/java/com.github.ambry.cloud/azure/AzureMetrics.java index 29cf0f7596..5496f62051 100644 --- a/ambry-cloud/src/main/java/com.github.ambry.cloud/azure/AzureMetrics.java +++ b/ambry-cloud/src/main/java/com.github.ambry.cloud/azure/AzureMetrics.java @@ -35,6 +35,7 @@ public class AzureMetrics { public static final String DOCUMENT_QUERY_COUNT = "DocumentQueryCount"; public static final String MISSING_KEYS_QUERY_TIME = "MissingKeysQueryTime"; public static final String DEAD_BLOBS_QUERY_TIME = "DeadBlobsQueryTime"; + public static final String FIND_SINCE_QUERY_TIME = "FindSinceQueryTime"; public static final String BLOB_UPDATE_ERROR_COUNT = "BlobUpdateErrorCount"; public static final String STORAGE_ERROR_COUNT = "StorageErrorCount"; public static final String DOCUMENT_ERROR_COUNT = "DocumentErrorCount"; @@ -63,6 +64,7 @@ public class AzureMetrics { public final Timer missingKeysQueryTime; public final Counter documentQueryCount; public final Timer deadBlobsQueryTime; + public final Timer findSinceQueryTime; public final Counter blobUpdateErrorCount; public final Counter storageErrorCount; public final Counter documentErrorCount; @@ -94,6 +96,7 @@ public AzureMetrics(MetricRegistry registry) { documentQueryCount = registry.counter(MetricRegistry.name(AzureCloudDestination.class, DOCUMENT_QUERY_COUNT)); missingKeysQueryTime = registry.timer(MetricRegistry.name(AzureCloudDestination.class, MISSING_KEYS_QUERY_TIME)); deadBlobsQueryTime = registry.timer(MetricRegistry.name(AzureCloudDestination.class, DEAD_BLOBS_QUERY_TIME)); + findSinceQueryTime = registry.timer(MetricRegistry.name(AzureCloudDestination.class, FIND_SINCE_QUERY_TIME)); blobUpdateErrorCount = registry.counter(MetricRegistry.name(AzureCloudDestination.class, BLOB_UPDATE_ERROR_COUNT)); storageErrorCount = registry.counter(MetricRegistry.name(AzureCloudDestination.class, STORAGE_ERROR_COUNT)); documentErrorCount = registry.counter(MetricRegistry.name(AzureCloudDestination.class, DOCUMENT_ERROR_COUNT)); diff --git a/ambry-cloud/src/test/java/com.github.ambry.cloud/CloudBlobStoreTest.java b/ambry-cloud/src/test/java/com.github.ambry.cloud/CloudBlobStoreTest.java index 1895f5554d..b4a912c250 100644 --- a/ambry-cloud/src/test/java/com.github.ambry.cloud/CloudBlobStoreTest.java +++ b/ambry-cloud/src/test/java/com.github.ambry.cloud/CloudBlobStoreTest.java @@ -14,6 +14,7 @@ package com.github.ambry.cloud; import com.codahale.metrics.MetricRegistry; +import com.github.ambry.clustermap.ClusterMap; import com.github.ambry.clustermap.ClusterMapUtils; import com.github.ambry.clustermap.DataNodeId; import com.github.ambry.clustermap.MockClusterMap; @@ -37,6 +38,8 @@ import com.github.ambry.replication.RemoteReplicaInfo; import com.github.ambry.replication.ReplicaThread; import com.github.ambry.replication.ReplicationMetrics; +import com.github.ambry.store.FindInfo; +import com.github.ambry.store.FindToken; import com.github.ambry.store.MessageInfo; import com.github.ambry.store.MockMessageWriteSet; import com.github.ambry.store.MockStoreKeyConverterFactory; @@ -82,6 +85,7 @@ public class CloudBlobStoreTest { private Store store; private CloudDestination dest; private PartitionId partitionId; + private ClusterMap clusterMap; private VcrMetrics vcrMetrics; private Random random = new Random(); private short refAccountId = 50; @@ -91,6 +95,7 @@ public class CloudBlobStoreTest { @Before public void setup() throws Exception { partitionId = new 
MockPartitionId(); + clusterMap = new MockClusterMap(); } /** @@ -111,7 +116,7 @@ private void setupCloudStore(boolean inMemoryDestination, boolean requireEncrypt dest = inMemoryDestination ? new LatchBasedInMemoryCloudDestination(Collections.emptyList()) : mock(CloudDestination.class); vcrMetrics = new VcrMetrics(new MetricRegistry()); - store = new CloudBlobStore(verifiableProperties, partitionId, dest, vcrMetrics); + store = new CloudBlobStore(verifiableProperties, partitionId, dest, clusterMap, vcrMetrics); if (start) { store.start(); } @@ -218,6 +223,56 @@ public void testFindMissingKeys() throws Exception { assertEquals("Wrong number of missing keys", count, missingKeys.size()); } + /** Test the CloudBlobStore findEntriesSince method. */ + @Test + public void testFindEntriesSince() throws Exception { + setupCloudStore(false, true, true); + long maxTotalSize = 1000000; + // 1) start with empty token, call find, return some data + long startTime = System.currentTimeMillis(); + long blobSize = 200000; + int numBlobsFound = 5; + List metadataList = generateMetadataList(startTime, blobSize, numBlobsFound); + when(dest.findEntriesSince(anyString(), any(CloudFindToken.class), anyLong())).thenReturn(metadataList); + CloudFindToken startToken = new CloudFindToken(); + FindInfo findInfo = store.findEntriesSince(startToken, maxTotalSize); + assertEquals(numBlobsFound, findInfo.getMessageEntries().size()); + CloudFindToken outputToken = (CloudFindToken) findInfo.getFindToken(); + assertEquals(startTime + numBlobsFound - 1, outputToken.getLatestUploadTime()); + assertEquals(blobSize * numBlobsFound, outputToken.getBytesRead()); + assertEquals(metadataList.get(numBlobsFound - 1).getId(), outputToken.getLatestBlobId()); + + // 2) call find with new token, return more data including lastBlob, verify token updated + startTime += 1000; + metadataList = generateMetadataList(startTime, blobSize, numBlobsFound); + when(dest.findEntriesSince(anyString(), any(CloudFindToken.class), anyLong())).thenReturn(metadataList); + findInfo = store.findEntriesSince(outputToken, maxTotalSize); + outputToken = (CloudFindToken) findInfo.getFindToken(); + assertEquals(startTime + numBlobsFound - 1, outputToken.getLatestUploadTime()); + assertEquals(blobSize * 2 * numBlobsFound, outputToken.getBytesRead()); + assertEquals(metadataList.get(numBlobsFound - 1).getId(), outputToken.getLatestBlobId()); + + // 3) call find with new token, no more data, verify token unchanged + when(dest.findEntriesSince(anyString(), any(CloudFindToken.class), anyLong())).thenReturn(Collections.emptyList()); + findInfo = store.findEntriesSince(outputToken, maxTotalSize); + assertTrue(findInfo.getMessageEntries().isEmpty()); + FindToken finalToken = (CloudFindToken) findInfo.getFindToken(); + assertEquals(outputToken, finalToken); + } + + private List generateMetadataList(long startTime, long blobSize, int count) { + List metadataList = new ArrayList<>(); + for (int j = 0; j < count; j++) { + BlobId blobId = getUniqueId(); + CloudBlobMetadata metadata = + new CloudBlobMetadata(blobId, startTime, Utils.Infinite_Time, blobSize, CloudBlobMetadata.EncryptionOrigin.NONE, + null, null); + metadata.setUploadTime(startTime + j); + metadataList.add(metadata); + } + return metadataList; + } + /** Test verifying behavior when store not started. 
*/ @Test public void testStoreNotStarted() throws Exception { @@ -260,7 +315,8 @@ public void testExceptionalDest() throws Exception { props.setProperty(CloudConfig.CLOUD_BLOB_CRYPTO_AGENT_FACTORY_CLASS, TestCloudBlobCryptoAgentFactory.class.getName()); vcrMetrics = new VcrMetrics(new MetricRegistry()); - CloudBlobStore exStore = new CloudBlobStore(new VerifiableProperties(props), partitionId, exDest, vcrMetrics); + CloudBlobStore exStore = + new CloudBlobStore(new VerifiableProperties(props), partitionId, exDest, clusterMap, vcrMetrics); exStore.start(); List keys = Collections.singletonList(getUniqueId()); MockMessageWriteSet messageWriteSet = new MockMessageWriteSet(); @@ -336,7 +392,7 @@ public void testPutWithTtl() throws Exception { new LatchBasedInMemoryCloudDestination(blobIdList); CloudReplica cloudReplica = new CloudReplica(cloudConfig, partitionId, cloudDataNode); CloudBlobStore cloudBlobStore = - new CloudBlobStore(new VerifiableProperties(props), partitionId, latchBasedInMemoryCloudDestination, + new CloudBlobStore(new VerifiableProperties(props), partitionId, latchBasedInMemoryCloudDestination, clusterMap, new VcrMetrics(new MetricRegistry())); cloudBlobStore.start(); diff --git a/ambry-cloud/src/test/java/com.github.ambry.cloud/LatchBasedInMemoryCloudDestination.java b/ambry-cloud/src/test/java/com.github.ambry.cloud/LatchBasedInMemoryCloudDestination.java index 35f61fe256..a4986f6076 100644 --- a/ambry-cloud/src/test/java/com.github.ambry.cloud/LatchBasedInMemoryCloudDestination.java +++ b/ambry-cloud/src/test/java/com.github.ambry.cloud/LatchBasedInMemoryCloudDestination.java @@ -106,6 +106,11 @@ public List getDeadBlobs(String partitionPath) { return Collections.emptyList(); } + @Override + public List findEntriesSince(String partitionPath, CloudFindToken findToken, long maxTotalSizeOfEntries) { + return Collections.emptyList(); + } + @Override public boolean purgeBlob(CloudBlobMetadata blobMetadata) { return true; diff --git a/ambry-cloud/src/test/java/com.github.ambry.cloud/azure/AzureCloudDestinationTest.java b/ambry-cloud/src/test/java/com.github.ambry.cloud/azure/AzureCloudDestinationTest.java index 206f77253a..fd339f811b 100644 --- a/ambry-cloud/src/test/java/com.github.ambry.cloud/azure/AzureCloudDestinationTest.java +++ b/ambry-cloud/src/test/java/com.github.ambry.cloud/azure/AzureCloudDestinationTest.java @@ -17,7 +17,9 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.github.ambry.cloud.CloudBlobMetadata; import com.github.ambry.cloud.CloudDestinationFactory; +import com.github.ambry.cloud.CloudFindToken; import com.github.ambry.cloud.CloudStorageException; +import com.github.ambry.clustermap.MockClusterMap; import com.github.ambry.clustermap.MockPartitionId; import com.github.ambry.clustermap.PartitionId; import com.github.ambry.commons.BlobId; @@ -43,11 +45,13 @@ import java.io.ByteArrayOutputStream; import java.io.InputStream; import java.net.HttpURLConnection; +import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; +import java.util.concurrent.TimeUnit; import org.apache.commons.codec.binary.Base64; import org.apache.http.HttpHost; import org.junit.Before; @@ -80,6 +84,10 @@ public class AzureCloudDestinationTest { private DocumentClient mockumentClient; private AzureMetrics azureMetrics; private int blobSize = 1024; + byte dataCenterId = 66; + short accountId = 101; + short containerId = 5; + long partition = 666; private BlobId blobId; 
private long creationTime = System.currentTimeMillis(); private long deletionTime = creationTime + 10000; @@ -106,10 +114,7 @@ public void setup() throws Exception { when(mockResponse.getResource()).thenReturn(metadataDoc); when(mockumentClient.readDocument(anyString(), any(RequestOptions.class))).thenReturn(mockResponse); - byte dataCenterId = 66; - short accountId = 101; - short containerId = 5; - PartitionId partitionId = new MockPartitionId(); + PartitionId partitionId = new MockPartitionId(partition, MockClusterMap.DEFAULT_PARTITION_CLASS); blobId = new BlobId(BLOB_ID_V6, BlobIdType.NATIVE, dataCenterId, accountId, containerId, partitionId, false, BlobDataType.DATACHUNK); @@ -230,29 +235,109 @@ public void testUpdateNotExists() throws Exception { /** Test querying metadata. */ @Test public void testQueryMetadata() throws Exception { + int batchSize = AzureCloudDestination.ID_QUERY_BATCH_SIZE; + testQueryMetadata(0, 0); + testQueryMetadata(batchSize, 1); + testQueryMetadata(batchSize + 1, 2); + testQueryMetadata(batchSize * 2 - 1, 2); + testQueryMetadata(batchSize * 2, 2); + } + + /** + * Test metadata query for different input count. + * @param numBlobs the number of blobs to query. + * @param expectedQueries the number of internal queries made after batching. + * @throws Exception + */ + private void testQueryMetadata(int numBlobs, int expectedQueries) throws Exception { + // Reset metrics + azureMetrics = new AzureMetrics(new MetricRegistry()); + azureDest = new AzureCloudDestination(mockAzureAccount, mockumentClient, "foo", clusterName, azureMetrics); QueryIterable mockIterable = mock(QueryIterable.class); - CloudBlobMetadata inputMetadata = new CloudBlobMetadata(blobId, creationTime, Utils.Infinite_Time, blobSize, - CloudBlobMetadata.EncryptionOrigin.NONE, null, null); - List docList = Collections.singletonList(new Document(objectMapper.writeValueAsString(inputMetadata))); + List blobIdList = new ArrayList<>(); + List docList = new ArrayList<>(); + for (int j = 0; j < numBlobs; j++) { + BlobId blobId = generateBlobId(); + blobIdList.add(blobId); + CloudBlobMetadata inputMetadata = new CloudBlobMetadata(blobId, creationTime, Utils.Infinite_Time, blobSize, + CloudBlobMetadata.EncryptionOrigin.NONE, null, null); + docList.add(new Document(objectMapper.writeValueAsString(inputMetadata))); + } when(mockIterable.iterator()).thenReturn(docList.iterator()); FeedResponse feedResponse = mock(FeedResponse.class); when(feedResponse.getQueryIterable()).thenReturn(mockIterable); when(mockumentClient.queryDocuments(anyString(), any(SqlQuerySpec.class), any(FeedOptions.class))).thenReturn( feedResponse); - Map metadataMap = azureDest.getBlobMetadata(Collections.singletonList(blobId)); - assertEquals("Expected single entry", 1, metadataMap.size()); - CloudBlobMetadata outputMetadata = metadataMap.get(blobId.getID()); - assertEquals("Returned metadata does not match original", inputMetadata, outputMetadata); - assertEquals(1, azureMetrics.documentQueryCount.getCount()); - assertEquals(1, azureMetrics.missingKeysQueryTime.getCount()); + Map metadataMap = azureDest.getBlobMetadata(blobIdList); + assertEquals("Wrong map size", blobIdList.size(), metadataMap.size()); + for (BlobId blobId : blobIdList) { + assertEquals("Unexpected id in metadata", blobId.getID(), metadataMap.get(blobId.getID()).getId()); + } + assertEquals(expectedQueries, azureMetrics.documentQueryCount.getCount()); + assertEquals(expectedQueries, azureMetrics.missingKeysQueryTime.getCount()); + } - // Test getDeadBlobs + /** Test 
getDeadBlobs */ + @Test + public void testGetDeadBlobs() throws Exception { + QueryIterable mockIterable = mock(QueryIterable.class); + when(mockIterable.iterator()).thenReturn(Collections.emptyList().iterator()); + FeedResponse feedResponse = mock(FeedResponse.class); + when(feedResponse.getQueryIterable()).thenReturn(mockIterable); + when(mockumentClient.queryDocuments(anyString(), any(SqlQuerySpec.class), any(FeedOptions.class))).thenReturn( + feedResponse); List metadataList = azureDest.getDeadBlobs(blobId.getPartition().toPathString()); assertEquals("Expected no dead blobs", 0, metadataList.size()); - assertEquals(2, azureMetrics.documentQueryCount.getCount()); + assertEquals(1, azureMetrics.documentQueryCount.getCount()); assertEquals(1, azureMetrics.deadBlobsQueryTime.getCount()); } + /** Test findEntriesSince. */ + @Test + public void testFindEntriesSince() throws Exception { + + long chunkSize = 110000; + long maxTotalSize = 1000000; // between 9 and 10 chunks + long startTime = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(1); + + // create metadata list where total size > maxTotalSize + List docList = new ArrayList<>(); + List blobIdList = new ArrayList<>(); + for (int j = 0; j < 20; j++) { + BlobId blobId = generateBlobId(); + blobIdList.add(blobId.getID()); + CloudBlobMetadata inputMetadata = new CloudBlobMetadata(blobId, creationTime, Utils.Infinite_Time, chunkSize, + CloudBlobMetadata.EncryptionOrigin.NONE, null, null); + inputMetadata.setUploadTime(startTime + j); + docList.add(new Document(objectMapper.writeValueAsString(inputMetadata))); + } + QueryIterable mockIterable = mock(QueryIterable.class); + when(mockIterable.iterator()).thenReturn(docList.iterator()); + FeedResponse feedResponse = mock(FeedResponse.class); + when(feedResponse.getQueryIterable()).thenReturn(mockIterable); + when(mockumentClient.queryDocuments(anyString(), any(SqlQuerySpec.class), any(FeedOptions.class))).thenReturn( + feedResponse); + CloudFindToken findToken = new CloudFindToken(); + // Run the query + List firstResult = + azureDest.findEntriesSince(blobId.getPartition().toPathString(), findToken, maxTotalSize); + assertEquals("Did not get expected doc count", maxTotalSize / chunkSize, firstResult.size()); + + docList = docList.subList(firstResult.size(), docList.size()); + when(mockIterable.iterator()).thenReturn(docList.iterator()); + String lastBlobId = firstResult.get(firstResult.size() - 1).getId(); + findToken = new CloudFindToken(startTime, lastBlobId, maxTotalSize); + List secondResult = + azureDest.findEntriesSince(blobId.getPartition().toPathString(), findToken, maxTotalSize); + assertEquals("Unexpected doc count", maxTotalSize / chunkSize, secondResult.size()); + assertEquals("Unexpected first blobId", blobIdList.get(firstResult.size()), secondResult.get(0).getId()); + + // Rerun with max size below blob size, and make sure it returns one result + when(mockIterable.iterator()).thenReturn(docList.iterator()); + assertEquals("Expected one result", 1, + azureDest.findEntriesSince(blobId.getPartition().toPathString(), findToken, chunkSize / 2).size()); + } + /** Test blob existence check. */ @Test public void testExistenceCheck() throws Exception { @@ -442,6 +527,15 @@ private void expectCloudStorageException(TestUtils.ThrowingRunnable runnable, Cl } } + /** + * Utility method to generate a BlobId. + * @return a BlobId for the default attributes. 
+ */ + private BlobId generateBlobId() { + return new BlobId(BLOB_ID_V6, BlobIdType.NATIVE, dataCenterId, accountId, containerId, blobId.getPartition(), false, + BlobDataType.DATACHUNK); + } + /** * Utility method to get blob input stream. * @param blobSize size of blob to consider. diff --git a/ambry-cloud/src/test/java/com.github.ambry.cloud/azure/AzureIntegrationTest.java b/ambry-cloud/src/test/java/com.github.ambry.cloud/azure/AzureIntegrationTest.java index 8efda35699..ead4b4f834 100644 --- a/ambry-cloud/src/test/java/com.github.ambry.cloud/azure/AzureIntegrationTest.java +++ b/ambry-cloud/src/test/java/com.github.ambry.cloud/azure/AzureIntegrationTest.java @@ -16,6 +16,7 @@ import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.Timer; import com.github.ambry.cloud.CloudBlobMetadata; +import com.github.ambry.cloud.CloudFindToken; import com.github.ambry.clustermap.MockClusterMap; import com.github.ambry.clustermap.MockPartitionId; import com.github.ambry.clustermap.PartitionId; @@ -249,6 +250,56 @@ public void testPurgeDeadBlobs() throws Exception { cleanup(); } + /** + * Test findEntriesSince. + * @throws Exception on error + */ + @Test + public void testFindEntriesSince() throws Exception { + cleanup(); + + PartitionId partitionId = new MockPartitionId(testPartition, MockClusterMap.DEFAULT_PARTITION_CLASS); + String partitionPath = String.valueOf(testPartition); + + // Upload some blobs with different upload times + int blobCount = 90; + int chunkSize = 1000; + int maxTotalSize = 20000; + int expectedNumQueries = (blobCount * chunkSize) / maxTotalSize + 2; + + long now = System.currentTimeMillis(); + long startTime = now - TimeUnit.DAYS.toMillis(7); + for (int j = 0; j < blobCount; j++) { + BlobId blobId = + new BlobId(BLOB_ID_V6, BlobIdType.NATIVE, dataCenterId, accountId, containerId, partitionId, false, + BlobDataType.DATACHUNK); + InputStream inputStream = getBlobInputStream(chunkSize); + CloudBlobMetadata cloudBlobMetadata = new CloudBlobMetadata(blobId, startTime, Utils.Infinite_Time, chunkSize, + CloudBlobMetadata.EncryptionOrigin.VCR, vcrKmsContext, cryptoAgentFactory); + cloudBlobMetadata.setUploadTime(startTime + j * 1000); + assertTrue("Expected upload to return true", + azureDest.uploadBlob(blobId, blobSize, cloudBlobMetadata, inputStream)); + } + + CloudFindToken findToken = new CloudFindToken(); + // Call findEntriesSince in a loop until no new entries are returned + List results = Collections.emptyList(); + int numQueries = 0; + int totalBlobsReturned = 0; + do { + results = azureDest.findEntriesSince(partitionPath, findToken, maxTotalSize); + numQueries++; + totalBlobsReturned += results.size(); + findToken = CloudFindToken.getUpdatedToken(findToken, results); + } while (!results.isEmpty()); + + assertEquals("Wrong number of queries", expectedNumQueries, numQueries); + assertEquals("Wrong number of blobs", blobCount, totalBlobsReturned); + assertEquals("Wrong byte count", blobCount * chunkSize, findToken.getBytesRead()); + + cleanup(); + } + private void cleanup() throws Exception { String partitionPath = String.valueOf(testPartition); Timer dummyTimer = new Timer(); diff --git a/ambry-replication/src/main/java/com.github.ambry.replication/ReplicationEngine.java b/ambry-replication/src/main/java/com.github.ambry.replication/ReplicationEngine.java index ac00348220..bf5eb69e8c 100644 --- a/ambry-replication/src/main/java/com.github.ambry.replication/ReplicationEngine.java +++ 
b/ambry-replication/src/main/java/com.github.ambry.replication/ReplicationEngine.java @@ -54,7 +54,7 @@ public abstract class ReplicationEngine { protected final ReplicationConfig replicationConfig; - private final ClusterMap clusterMap; + protected final ClusterMap clusterMap; protected final ScheduledExecutorService scheduler; private final AtomicInteger correlationIdGenerator; private final ConnectionPool connectionPool;
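Note for reviewers (not part of the patch): the new limit is an ordinary VerifiableProperties-backed setting, so it can be exercised in isolation roughly as below. This is a minimal sketch that assumes the remaining CloudConfig properties all fall back to their defaults, as the constructor in the first hunk does; the value 50000 is only an example.

```java
import com.github.ambry.config.CloudConfig;
import com.github.ambry.config.VerifiableProperties;
import java.util.Properties;

public class CompactionQueryLimitExample {
  public static void main(String[] args) {
    Properties props = new Properties();
    // Override the default of 100000 results per dead-blobs compaction query.
    props.setProperty(CloudConfig.CLOUD_BLOB_COMPACTION_QUERY_LIMIT, "50000");
    CloudConfig cloudConfig = new CloudConfig(new VerifiableProperties(props));
    // Prints 50000; without the override it would print DEFAULT_COMPACTION_QUERY_LIMIT (100000).
    System.out.println("compaction query limit = " + cloudConfig.cloudBlobCompactionQueryLimit);
  }
}
```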
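The consumption pattern for the new findEntriesSince/CloudFindToken pair is easiest to see as straight-line code. A rough sketch, modeled on the loop in AzureIntegrationTest.testFindEntriesSince; the CloudDestination instance, partition path, and per-batch size cap are assumed to come from the caller:

```java
import com.github.ambry.cloud.CloudBlobMetadata;
import com.github.ambry.cloud.CloudDestination;
import com.github.ambry.cloud.CloudFindToken;
import java.util.List;

class FindEntriesSincePager {
  /** Page through everything currently in the partition, one size-capped batch at a time. */
  static CloudFindToken drain(CloudDestination dest, String partitionPath, long maxBatchBytes) throws Exception {
    CloudFindToken token = new CloudFindToken(); // start token: uploadTime 0, no latestBlobId
    List<CloudBlobMetadata> batch;
    do {
      batch = dest.findEntriesSince(partitionPath, token, maxBatchBytes);
      // ... hand the batch to the caller (e.g. build MessageInfos, as CloudBlobStore.findEntriesSince does) ...
      // The updated token carries the last uploadTime/blobId seen plus the cumulative bytes read,
      // so the next query resumes where this one stopped and skips the boundary blob.
      token = CloudFindToken.getUpdatedToken(token, batch);
    } while (!batch.isEmpty());
    return token;
  }
}
```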
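The patch adds toBytes() but no deserializer, so the VERSION_0 wire layout is only implied: version (short), latestUploadTime (long), bytesRead (long), then a length-prefixed blob id. A hypothetical fromBytes is sketched below purely to document that layout; the class name and placement are assumptions, and anything like it would have to stay in sync with toBytes() if added later.

```java
import com.github.ambry.cloud.CloudFindToken;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

final class CloudFindTokenFormat {
  /** Hypothetical inverse of CloudFindToken.toBytes() for the VERSION_0 layout. */
  static CloudFindToken fromBytes(byte[] bytes) {
    ByteBuffer buf = ByteBuffer.wrap(bytes);
    short version = buf.getShort();
    if (version != 0) { // CloudFindToken.VERSION_0
      throw new IllegalArgumentException("Unknown CloudFindToken version: " + version);
    }
    long latestUploadTime = buf.getLong();
    long bytesRead = buf.getLong();
    short idLength = buf.getShort();
    String latestBlobId = null;
    if (idLength > 0) {
      byte[] idBytes = new byte[idLength];
      buf.get(idBytes);
      latestBlobId = new String(idBytes, StandardCharsets.UTF_8);
    }
    return new CloudFindToken(latestUploadTime, latestBlobId, bytesRead);
  }
}
```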