Skip to content

Commit

Permalink
Blobs with TTL update should be uploaded to Cloud. (#1165)
Browse files Browse the repository at this point in the history
  • Loading branch information
zzmao authored and lightningrob committed May 6, 2019
1 parent aaf8d92 commit 7f82832
Show file tree
Hide file tree
Showing 15 changed files with 1,169 additions and 878 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -185,10 +185,10 @@ private boolean shouldUpload(MessageInfo messageInfo) {
if (messageInfo.isDeleted()) {
return false;
}

// expiration time above threshold
// expiration time above threshold or ttlUpdate is set. Expired blobs are blocked by ReplicaThread.
return (messageInfo.getExpirationTimeInMs() == Utils.Infinite_Time
|| messageInfo.getExpirationTimeInMs() - messageInfo.getOperationTimeMs() >= minTtlMillis);
|| messageInfo.getExpirationTimeInMs() - messageInfo.getOperationTimeMs() >= minTtlMillis
|| messageInfo.isTtlUpdated());
}

@Override
Expand Down Expand Up @@ -218,8 +218,16 @@ public void updateTtl(MessageWriteSet messageSetToUpdate) throws StoreException
// Note: we skipped uploading the blob on PUT record if the TTL was below threshold.
try {
for (MessageInfo msgInfo : messageSetToUpdate.getMessageSetInfo()) {
BlobId blobId = (BlobId) msgInfo.getStoreKey();
cloudDestination.updateBlobExpiration(blobId, msgInfo.getExpirationTimeInMs());
// MessageInfo.expirationTimeInMs is not reliable if ttlUpdate is set. See {@code PersistentIndex#findKey()}
// and {@code PersistentIndex#markAsPermanent()}. If we change updateTtl to be more flexible, code here will
// need to be modified.
if (msgInfo.isTtlUpdated()) {
BlobId blobId = (BlobId) msgInfo.getStoreKey();
cloudDestination.updateBlobExpiration(blobId, Utils.Infinite_Time);
} else {
logger.error("updateTtl() is called but msgInfo.isTtlUpdated is not set. msgInfo: {}", msgInfo);
vcrMetrics.updateTtlNotSetError.inc();
}
}
} catch (CloudStorageException ex) {
throw new StoreException(ex, StoreErrorCodes.IOError);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,6 @@ public HelixVcrClusterMetrics(MetricRegistry registry) {
partitionIdNotInClusterMapOnRemove =
registry.counter(MetricRegistry.name(HelixVcrCluster.class, "PartitionIdNotInClusterMapOnRemove"));
partitionIdNotInClusterMapOnAdd =
registry.counter(MetricRegistry.name(VcrServer.class, "PartitionIdNotInClusterMapOnAdd"));
registry.counter(MetricRegistry.name(HelixVcrCluster.class, "PartitionIdNotInClusterMapOnAdd"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
package com.github.ambry.cloud;

import com.codahale.metrics.Counter;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;

Expand All @@ -31,6 +30,7 @@ public class VcrMetrics {
public final Timer blobEncryptionTime;
public final Timer blobDecryptionTime;
public final Counter blobUploadSkippedCount;
public final Counter updateTtlNotSetError;

public VcrMetrics(MetricRegistry registry) {
this.registry = registry;
Expand All @@ -41,6 +41,7 @@ public VcrMetrics(MetricRegistry registry) {
blobEncryptionTime = registry.timer(MetricRegistry.name(CloudBlobStore.class, "BlobEncryptionTime"));
blobDecryptionTime = registry.timer(MetricRegistry.name(CloudBlobStore.class, "BlobDecryptionTime"));
blobUploadSkippedCount = registry.counter(MetricRegistry.name(CloudBlobStore.class, "BlobUploadSkippedCount"));
updateTtlNotSetError = registry.counter(MetricRegistry.name(CloudBlobStore.class, "UpdateTtlNotSetError"));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,40 @@
package com.github.ambry.cloud;

import com.codahale.metrics.MetricRegistry;
import com.github.ambry.clustermap.ClusterMapUtils;
import com.github.ambry.clustermap.DataNodeId;
import com.github.ambry.clustermap.MockClusterMap;
import com.github.ambry.clustermap.MockPartitionId;
import com.github.ambry.clustermap.PartitionId;
import com.github.ambry.clustermap.ReplicaId;
import com.github.ambry.commons.BlobId;
import com.github.ambry.commons.BlobIdFactory;
import com.github.ambry.commons.CommonTestUtils;
import com.github.ambry.commons.ResponseHandler;
import com.github.ambry.config.CloudConfig;
import com.github.ambry.config.ClusterMapConfig;
import com.github.ambry.config.ReplicationConfig;
import com.github.ambry.config.VerifiableProperties;
import com.github.ambry.network.Port;
import com.github.ambry.network.PortType;
import com.github.ambry.replication.BlobIdTransformer;
import com.github.ambry.replication.MockConnectionPool;
import com.github.ambry.replication.MockFindToken;
import com.github.ambry.replication.MockHost;
import com.github.ambry.replication.RemoteReplicaInfo;
import com.github.ambry.replication.ReplicaThread;
import com.github.ambry.replication.ReplicationMetrics;
import com.github.ambry.store.MessageInfo;
import com.github.ambry.store.MockMessageWriteSet;
import com.github.ambry.store.MockStoreKeyConverterFactory;
import com.github.ambry.store.Store;
import com.github.ambry.store.StoreErrorCodes;
import com.github.ambry.store.StoreException;
import com.github.ambry.store.StoreKey;
import com.github.ambry.store.StoreKeyFactory;
import com.github.ambry.store.Transformer;
import com.github.ambry.utils.MockTime;
import com.github.ambry.utils.SystemTime;
import com.github.ambry.utils.TestUtils;
import com.github.ambry.utils.Utils;
import java.io.InputStream;
Expand All @@ -37,10 +60,13 @@
import java.util.Properties;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Before;
import org.junit.Test;

import static com.github.ambry.commons.BlobId.*;
import static com.github.ambry.replication.ReplicationTest.*;
import static org.junit.Assert.*;
import static org.mockito.ArgumentMatchers.*;
import static org.mockito.Mockito.any;
Expand Down Expand Up @@ -112,7 +138,6 @@ private void testStorePuts(boolean requireEncryption) throws Exception {
// Put blobs with and without expiration and encryption
MockMessageWriteSet messageWriteSet = new MockMessageWriteSet();
int count = 5;
long expireTime = System.currentTimeMillis() + 10000;
int expectedUploads = 0;
int expectedEncryptions = 0;
for (int j = 0; j < count; j++) {
Expand All @@ -126,8 +151,6 @@ private void testStorePuts(boolean requireEncryption) throws Exception {
if (requireEncryption) {
expectedEncryptions++;
}
// Short TTL and encrypted, should not be uploaded nor encrypted
addBlobToSet(messageWriteSet, size, expireTime, refAccountId, refContainerId, true);
}
store.put(messageWriteSet);
verify(dest, times(expectedUploads)).uploadBlob(any(BlobId.class), anyLong(), any(CloudBlobMetadata.class),
Expand Down Expand Up @@ -255,6 +278,152 @@ public void testExceptionalDest() throws Exception {
}
}

/**
 * Tests replication of PUT (with TTL) and TtlUpdate records into the cloud store.
 * Replication may run after the PUT and again after the TtlUpdate, or only once after the TtlUpdate.
 * The PUT may be already expired, may have a TTL below the upload threshold, or may have a TTL at or above the
 * upload threshold. In every case the blob must exist in the cloud destination with infinite expiration time once
 * the TtlUpdate has been replicated.
 * @throws Exception if setting up the hosts, the store or the replica thread fails.
 */
@Test
public void testPutWithTtl() throws Exception {
  // Set up remote host
  MockClusterMap clusterMap = new MockClusterMap();
  MockHost remoteHost = getLocalAndRemoteHosts(clusterMap).getSecond();
  List<PartitionId> partitionIds = clusterMap.getWritablePartitionIds(null);
  PartitionId partitionId = partitionIds.get(0);
  StoreKeyFactory storeKeyFactory = new BlobIdFactory(clusterMap);
  MockStoreKeyConverterFactory storeKeyConverterFactory = new MockStoreKeyConverterFactory(null, null);
  storeKeyConverterFactory.setConversionMap(new HashMap<>());
  storeKeyConverterFactory.setReturnInputIfAbsent(true);
  MockStoreKeyConverterFactory.MockStoreKeyConverter storeKeyConverter =
      storeKeyConverterFactory.getStoreKeyConverter();
  Transformer transformer = new BlobIdTransformer(storeKeyFactory, storeKeyConverter);
  Map<DataNodeId, MockHost> hosts = new HashMap<>();
  hosts.put(remoteHost.dataNodeId, remoteHost);
  MockConnectionPool connectionPool = new MockConnectionPool(hosts, clusterMap, 4);

  // Generate BlobIds for following PUT. One blob id per test case below.
  short blobIdVersion = CommonTestUtils.getCurrentBlobIdVersion();
  short accountId = Utils.getRandomShort(TestUtils.RANDOM);
  short containerId = Utils.getRandomShort(TestUtils.RANDOM);
  boolean toEncrypt = TestUtils.RANDOM.nextBoolean();
  List<BlobId> blobIdList = new ArrayList<>();
  for (int i = 0; i < 6; i++) {
    blobIdList.add(
        new BlobId(blobIdVersion, BlobId.BlobIdType.NATIVE, ClusterMapUtils.UNKNOWN_DATACENTER_ID, accountId,
            containerId, partitionId, toEncrypt, BlobId.BlobDataType.DATACHUNK));
  }

  // Set up VCR
  Properties props = new Properties();
  setBasicProperties(props);
  props.setProperty("clustermap.port", "12300");
  props.setProperty("vcr.ssl.port", "12345");

  ReplicationConfig replicationConfig = new ReplicationConfig(new VerifiableProperties(props));
  ClusterMapConfig clusterMapConfig = new ClusterMapConfig(new VerifiableProperties(props));
  CloudConfig cloudConfig = new CloudConfig(new VerifiableProperties(props));
  CloudDataNode cloudDataNode = new CloudDataNode(cloudConfig, clusterMapConfig);

  LatchBasedInMemoryCloudDestination latchBasedInMemoryCloudDestination =
      new LatchBasedInMemoryCloudDestination(blobIdList);
  CloudReplica cloudReplica = new CloudReplica(cloudConfig, partitionId, cloudDataNode);
  CloudBlobStore cloudBlobStore =
      new CloudBlobStore(new VerifiableProperties(props), partitionId, latchBasedInMemoryCloudDestination,
          new VcrMetrics(new MetricRegistry()));
  cloudBlobStore.start();

  // Prepare RemoteReplicaInfo for ReplicaThread: pick the replica of the partition that lives on the remote host.
  Map<DataNodeId, List<RemoteReplicaInfo>> replicasToReplicate = null;
  for (ReplicaId replica : partitionId.getReplicaIds()) {
    if (replica.getDataNodeId().equals(remoteHost.dataNodeId)) {
      RemoteReplicaInfo remoteReplicaInfo =
          new RemoteReplicaInfo(replica, cloudReplica, cloudBlobStore, new MockFindToken(0, 0), Long.MAX_VALUE,
              SystemTime.getInstance(), new Port(remoteHost.dataNodeId.getPort(), PortType.PLAINTEXT));
      replicasToReplicate =
          Collections.singletonMap(remoteHost.dataNodeId, Collections.singletonList(remoteReplicaInfo));
      break;
    }
  }
  // Fail with a clear message instead of a NullPointerException further down if no replica matched.
  assertNotNull("No replica of the partition found on the remote host.", replicasToReplicate);
  ReplicationMetrics replicationMetrics = new ReplicationMetrics(new MetricRegistry(), Collections.emptyList());
  replicationMetrics.populatePerColoMetrics(Collections.singleton(remoteHost.dataNodeId.getDatacenterName()));

  ReplicaThread replicaThread =
      new ReplicaThread("threadtest", replicasToReplicate, new MockFindToken.MockFindTokenFactory(), clusterMap,
          new AtomicInteger(0), cloudDataNode, connectionPool, replicationConfig, replicationMetrics, null,
          storeKeyConverter, transformer, clusterMap.getMetricRegistry(), false, cloudDataNode.getDatacenterName(),
          new ResponseHandler(clusterMap), new MockTime());
  List<List<RemoteReplicaInfo>> replicasToReplicateList = new ArrayList<>(replicasToReplicate.values());

  long referenceTime = System.currentTimeMillis();
  // Case 1: Put already expired. Replication happens after Put and after TtlUpdate.
  // Upload to Cloud only after replicating ttlUpdate.
  BlobId id = blobIdList.get(0);
  addPutMessagesToReplicasOfPartition(id, accountId, containerId, partitionId, Collections.singletonList(remoteHost),
      referenceTime - 2000, referenceTime - 1000);
  replicaThread.replicate(replicasToReplicateList);
  assertFalse("Blob should not exist.", latchBasedInMemoryCloudDestination.doesBlobExist(id));
  addTtlUpdateMessagesToReplicasOfPartition(partitionId, id, Collections.singletonList(remoteHost));
  replicaThread.replicate(replicasToReplicateList);
  assertTrue("Blob should exist.", latchBasedInMemoryCloudDestination.doesBlobExist(id));

  // Case 2: Put already expired. Replication happens after TtlUpdate.
  // Upload to Cloud only after replicating ttlUpdate.
  id = blobIdList.get(1);
  addPutMessagesToReplicasOfPartition(id, accountId, containerId, partitionId, Collections.singletonList(remoteHost),
      referenceTime - 2000, referenceTime - 1000);
  addTtlUpdateMessagesToReplicasOfPartition(partitionId, id, Collections.singletonList(remoteHost));
  replicaThread.replicate(replicasToReplicateList);
  assertTrue("Blob should exist.", latchBasedInMemoryCloudDestination.doesBlobExist(id));

  // Case 3: Put TTL less than cloudConfig.vcrMinTtlDays. Replication happens after Put and after TtlUpdate.
  // Upload to Cloud only after replicating ttlUpdate.
  id = blobIdList.get(2);
  addPutMessagesToReplicasOfPartition(id, accountId, containerId, partitionId, Collections.singletonList(remoteHost),
      referenceTime, referenceTime + TimeUnit.DAYS.toMillis(cloudConfig.vcrMinTtlDays) - 1);
  replicaThread.replicate(replicasToReplicateList);
  assertFalse("Blob should not exist.", latchBasedInMemoryCloudDestination.doesBlobExist(id));
  addTtlUpdateMessagesToReplicasOfPartition(partitionId, id, Collections.singletonList(remoteHost));
  replicaThread.replicate(replicasToReplicateList);
  assertTrue("Blob should exist.", latchBasedInMemoryCloudDestination.doesBlobExist(id));

  // Case 4: Put TTL less than cloudConfig.vcrMinTtlDays. Replication happens after TtlUpdate.
  // Upload to Cloud only after replicating ttlUpdate.
  id = blobIdList.get(3);
  addPutMessagesToReplicasOfPartition(id, accountId, containerId, partitionId, Collections.singletonList(remoteHost),
      referenceTime, referenceTime + TimeUnit.DAYS.toMillis(cloudConfig.vcrMinTtlDays) - 1);
  addTtlUpdateMessagesToReplicasOfPartition(partitionId, id, Collections.singletonList(remoteHost));
  replicaThread.replicate(replicasToReplicateList);
  assertTrue("Blob should exist.", latchBasedInMemoryCloudDestination.doesBlobExist(id));

  // Case 5: Put TTL greater than or equal to cloudConfig.vcrMinTtlDays. Replication happens after Put and after
  // TtlUpdate. Upload to Cloud after Put and update ttl after TtlUpdate.
  id = blobIdList.get(4);
  addPutMessagesToReplicasOfPartition(id, accountId, containerId, partitionId, Collections.singletonList(remoteHost),
      referenceTime, referenceTime + TimeUnit.DAYS.toMillis(cloudConfig.vcrMinTtlDays));
  replicaThread.replicate(replicasToReplicateList);
  assertTrue("Blob should exist.", latchBasedInMemoryCloudDestination.doesBlobExist(id));
  addTtlUpdateMessagesToReplicasOfPartition(partitionId, id, Collections.singletonList(remoteHost));
  replicaThread.replicate(replicasToReplicateList);
  assertTrue("Blob should exist.", latchBasedInMemoryCloudDestination.doesBlobExist(id));

  // Case 6: Put TTL greater than or equal to cloudConfig.vcrMinTtlDays. Replication happens after TtlUpdate.
  // Upload to Cloud after TtlUpdate.
  id = blobIdList.get(5);
  addPutMessagesToReplicasOfPartition(id, accountId, containerId, partitionId, Collections.singletonList(remoteHost),
      referenceTime, referenceTime + TimeUnit.DAYS.toMillis(cloudConfig.vcrMinTtlDays));
  addTtlUpdateMessagesToReplicasOfPartition(partitionId, id, Collections.singletonList(remoteHost));
  replicaThread.replicate(replicasToReplicateList);
  assertTrue("Blob should exist.", latchBasedInMemoryCloudDestination.doesBlobExist(id));

  // Verify expiration time of all blobs.
  Map<String, CloudBlobMetadata> map = latchBasedInMemoryCloudDestination.getBlobMetadata(blobIdList);
  for (BlobId blobId : blobIdList) {
    CloudBlobMetadata metadata = map.get(blobId.toString());
    // Guard the lookup so a missing entry is reported as an assertion failure naming the blob.
    assertNotNull("Metadata should exist for blob " + blobId, metadata);
    assertEquals("Blob ttl should be infinite now.", Utils.Infinite_Time, metadata.getExpirationTime());
  }
}

/**
* Utility method to generate a BlobId and byte buffer for a blob with specified properties and add them to the specified MessageWriteSet.
* @param messageWriteSet the {@link MockMessageWriteSet} in which to store the data.
Expand All @@ -270,7 +439,7 @@ private BlobId addBlobToSet(MockMessageWriteSet messageWriteSet, long size, long
short containerId, boolean encrypted) {
BlobId id = getUniqueId(accountId, containerId, encrypted);
long crc = random.nextLong();
MessageInfo info = new MessageInfo(id, size, false, false, expiresAtMs, crc, accountId, containerId, operationTime);
MessageInfo info = new MessageInfo(id, size, false, true, expiresAtMs, crc, accountId, containerId, operationTime);
ByteBuffer buffer = ByteBuffer.wrap(TestUtils.getRandomBytes((int) size));
messageWriteSet.add(info, buffer);
return id;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,12 @@ public boolean deleteBlob(BlobId blobId, long deletionTime) throws CloudStorageE

@Override
public boolean updateBlobExpiration(BlobId blobId, long expirationTime) throws CloudStorageException {
return true;
if (map.containsKey(blobId)) {
map.get(blobId).getFirst().setExpirationTime(expirationTime);
return true;
} else {
return false;
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@
/**
* A replica thread is responsible for handling replication for a set of partitions assigned to it
*/
class ReplicaThread implements Runnable {
public class ReplicaThread implements Runnable {

private final Map<DataNodeId, List<RemoteReplicaInfo>> replicasToReplicateGroupedByNode;
private final Set<PartitionId> replicationDisabledPartitions = new HashSet<>();
Expand Down Expand Up @@ -114,7 +114,7 @@ class ReplicaThread implements Runnable {

private volatile boolean allDisabled = false;

ReplicaThread(String threadName, Map<DataNodeId, List<RemoteReplicaInfo>> replicasToReplicateGroupedByNode,
public ReplicaThread(String threadName, Map<DataNodeId, List<RemoteReplicaInfo>> replicasToReplicateGroupedByNode,
FindTokenFactory findTokenFactory, ClusterMap clusterMap, AtomicInteger correlationIdGenerator,
DataNodeId dataNodeId, ConnectionPool connectionPool, ReplicationConfig replicationConfig,
ReplicationMetrics replicationMetrics, NotificationSystem notification, StoreKeyConverter storeKeyConverter,
Expand Down Expand Up @@ -232,7 +232,7 @@ public void run() {
* Replicates from the given replicas
* @param replicasToReplicate list of {@link RemoteReplicaInfo} by data node
*/
void replicate(List<List<RemoteReplicaInfo>> replicasToReplicate) {
public void replicate(List<List<RemoteReplicaInfo>> replicasToReplicate) {
// shuffle the nodes
Collections.shuffle(replicasToReplicate);
boolean allCaughtUp = true;
Expand Down Expand Up @@ -388,10 +388,10 @@ List<ExchangeMetadataResponse> exchangeMetadata(ConnectedChannel connectedChanne
replicaMetadataResponseInfo.getRemoteReplicaLagInBytes());
exchangeMetadataResponseList.add(exchangeMetadataResponse);
} catch (Exception e) {
replicationMetrics.updateLocalStoreError(remoteReplicaInfo.getReplicaId());
logger.error(
"Remote node: " + remoteNode + " Thread name: " + threadName + " Remote replica: " + remoteReplicaInfo
.getReplicaId(), e);
replicationMetrics.updateLocalStoreError(remoteReplicaInfo.getReplicaId());
responseHandler.onEvent(remoteReplicaInfo.getReplicaId(), e);
ExchangeMetadataResponse exchangeMetadataResponse =
new ExchangeMetadataResponse(ServerErrorCode.Unknown_Error);
Expand Down
Loading

0 comments on commit 7f82832

Please sign in to comment.