Skip to content

Commit

Permalink
address rob comments
Browse files Browse the repository at this point in the history
  • Loading branch information
zzmao committed May 6, 2019
1 parent 6927479 commit dd38123
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ private boolean shouldUpload(MessageInfo messageInfo) {
if (messageInfo.isDeleted()) {
return false;
}
// expiration time above threshold. Expired blobs are blocked by ReplicaThread.
// expiration time above threshold or ttlUpdate is set. Expired blobs are blocked by ReplicaThread.
return (messageInfo.getExpirationTimeInMs() == Utils.Infinite_Time
|| messageInfo.getExpirationTimeInMs() - messageInfo.getOperationTimeMs() >= minTtlMillis
|| messageInfo.isTtlUpdated());
Expand Down Expand Up @@ -208,12 +208,14 @@ public void updateTtl(MessageWriteSet messageSetToUpdate) throws StoreException
try {
for (MessageInfo msgInfo : messageSetToUpdate.getMessageSetInfo()) {
// MessageInfo.expirationTimeInMs is not reliable if ttlUpdate is set. See {@code PersistentIndex#findKey()}
// and {@code PersistentIndex#markAsPermanent()}.
// and {@code PersistentIndex#markAsPermanent()}. If we change updateTtl to be more flexible, code here will
// need to be modified.
if (msgInfo.isTtlUpdated()) {
BlobId blobId = (BlobId) msgInfo.getStoreKey();
cloudDestination.updateBlobExpiration(blobId, Utils.Infinite_Time);
} else {
logger.error("updateTtl() is called but msgInfo.isTtlUpdated is not set. msgInfo: {}", msgInfo);
vcrMetrics.updateTtlNotSetError.inc();
}
}
} catch (CloudStorageException ex) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
package com.github.ambry.cloud;

import com.codahale.metrics.Counter;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;

Expand All @@ -28,13 +27,15 @@ public class VcrMetrics {
public final Counter blobDecryptionCount;
public final Timer blobEncryptionTime;
public final Timer blobDecryptionTime;
public final Counter updateTtlNotSetError;

public VcrMetrics(MetricRegistry registry) {
this.registry = registry;
blobEncryptionCount = registry.counter(MetricRegistry.name(CloudBlobStore.class, "BlobEncryptionCount"));
blobDecryptionCount = registry.counter(MetricRegistry.name(CloudBlobStore.class, "BlobDecryptionCount"));
blobEncryptionTime = registry.timer(MetricRegistry.name(CloudBlobStore.class, "BlobEncryptionTime"));
blobDecryptionTime = registry.timer(MetricRegistry.name(CloudBlobStore.class, "BlobDecryptionTime"));
updateTtlNotSetError = registry.counter(MetricRegistry.name(CloudBlobStore.class, "UpdateTtlNotSetError"));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ public void testPutWithTtl() throws Exception {
hosts.put(remoteHost.dataNodeId, remoteHost);
MockConnectionPool connectionPool = new MockConnectionPool(hosts, clusterMap, 4);

// Generate BlobId for coming PUT.
// Generate BlobIds for following PUT.
short blobIdVersion = CommonTestUtils.getCurrentBlobIdVersion();
short accountId = Utils.getRandomShort(TestUtils.RANDOM);
short containerId = Utils.getRandomShort(TestUtils.RANDOM);
Expand All @@ -316,13 +316,8 @@ public void testPutWithTtl() throws Exception {

// Set up VCR
Properties props = new Properties();
props.setProperty("clustermap.host.name", "localhost");
props.setProperty("clustermap.resolve.hostnames", "false");
props.setProperty("clustermap.cluster.name", "clusterName");
props.setProperty("clustermap.datacenter.name", "DC1");
props.setProperty("clustermap.ssl.enabled.datacenters", "DC0,DC1");
setBasicProperties(props);
props.setProperty("clustermap.port", "12300");
props.setProperty("kms.default.container.key", TestUtils.getRandomKey(64));
props.setProperty("vcr.ssl.port", "12345");

ReplicationConfig replicationConfig = new ReplicationConfig(new VerifiableProperties(props));
Expand Down Expand Up @@ -358,17 +353,18 @@ public void testPutWithTtl() throws Exception {
new AtomicInteger(0), cloudDataNode, connectionPool, replicationConfig, replicationMetrics, null,
storeKeyConverter, transformer, clusterMap.getMetricRegistry(), false, cloudDataNode.getDatacenterName(),
new ResponseHandler(clusterMap), new MockTime());
List<List<RemoteReplicaInfo>> replicasToReplicateList = new ArrayList<>(replicasToReplicate.values());

long referenceTime = System.currentTimeMillis();
// Case 1: Put already expired. Replication happens after Put and after TtlUpdate.
// Upload to Cloud only after replicating ttlUpdate.
BlobId id = blobIdList.get(0);
addPutMessagesToReplicasOfPartition(id, accountId, containerId, partitionId, Collections.singletonList(remoteHost),
referenceTime - 2000, referenceTime - 1000);
replicaThread.replicate(new ArrayList<>(replicasToReplicate.values()));
replicaThread.replicate(replicasToReplicateList);
assertFalse("Blob should not exist.", latchBasedInMemoryCloudDestination.doesBlobExist(id));
addTtlUpdateMessagesToReplicasOfPartition(partitionId, id, Collections.singletonList(remoteHost));
replicaThread.replicate(new ArrayList<>(replicasToReplicate.values()));
replicaThread.replicate(replicasToReplicateList);
assertTrue("Blob should exist.", latchBasedInMemoryCloudDestination.doesBlobExist(id));

// Case 2: Put already expired. Replication happens after TtlUpdate.
Expand All @@ -377,18 +373,18 @@ public void testPutWithTtl() throws Exception {
addPutMessagesToReplicasOfPartition(id, accountId, containerId, partitionId, Collections.singletonList(remoteHost),
referenceTime - 2000, referenceTime - 1000);
addTtlUpdateMessagesToReplicasOfPartition(partitionId, id, Collections.singletonList(remoteHost));
replicaThread.replicate(new ArrayList<>(replicasToReplicate.values()));
replicaThread.replicate(replicasToReplicateList);
assertTrue("Blob should exist.", latchBasedInMemoryCloudDestination.doesBlobExist(id));

// Case 3: Put TTL less than cloudConfig.vcrMinTtlDays. Replication happens after Put and after TtlUpdate.
// Upload to Cloud only after replicating ttlUpdate.
id = blobIdList.get(2);
addPutMessagesToReplicasOfPartition(id, accountId, containerId, partitionId, Collections.singletonList(remoteHost),
referenceTime, referenceTime + TimeUnit.DAYS.toMillis(cloudConfig.vcrMinTtlDays) - 1);
replicaThread.replicate(new ArrayList<>(replicasToReplicate.values()));
replicaThread.replicate(replicasToReplicateList);
assertFalse("Blob should not exist.", latchBasedInMemoryCloudDestination.doesBlobExist(id));
addTtlUpdateMessagesToReplicasOfPartition(partitionId, id, Collections.singletonList(remoteHost));
replicaThread.replicate(new ArrayList<>(replicasToReplicate.values()));
replicaThread.replicate(replicasToReplicateList);
assertTrue("Blob should exist.", latchBasedInMemoryCloudDestination.doesBlobExist(id));

// Case 4: Put TTL less than cloudConfig.vcrMinTtlDays. Replication happens after TtlUpdate.
Expand All @@ -397,18 +393,18 @@ public void testPutWithTtl() throws Exception {
addPutMessagesToReplicasOfPartition(id, accountId, containerId, partitionId, Collections.singletonList(remoteHost),
referenceTime, referenceTime + TimeUnit.DAYS.toMillis(cloudConfig.vcrMinTtlDays) - 1);
addTtlUpdateMessagesToReplicasOfPartition(partitionId, id, Collections.singletonList(remoteHost));
replicaThread.replicate(new ArrayList<>(replicasToReplicate.values()));
replicaThread.replicate(replicasToReplicateList);
assertTrue("Blob should exist.", latchBasedInMemoryCloudDestination.doesBlobExist(id));

// Case 5: Put TTL greater than or equals to cloudConfig.vcrMinTtlDays. Replication happens after Put and after TtlUpdate.
// Upload to Cloud after Put and update ttl after TtlUpdate.
id = blobIdList.get(4);
addPutMessagesToReplicasOfPartition(id, accountId, containerId, partitionId, Collections.singletonList(remoteHost),
referenceTime, referenceTime + TimeUnit.DAYS.toMillis(cloudConfig.vcrMinTtlDays));
replicaThread.replicate(new ArrayList<>(replicasToReplicate.values()));
replicaThread.replicate(replicasToReplicateList);
assertTrue(latchBasedInMemoryCloudDestination.doesBlobExist(id));
addTtlUpdateMessagesToReplicasOfPartition(partitionId, id, Collections.singletonList(remoteHost));
replicaThread.replicate(new ArrayList<>(replicasToReplicate.values()));
replicaThread.replicate(replicasToReplicateList);
assertTrue("Blob should exist.", latchBasedInMemoryCloudDestination.doesBlobExist(id));

// Case 6: Put TTL greater than or equals to cloudConfig.vcrMinTtlDays. Replication happens after TtlUpdate.
Expand All @@ -417,7 +413,7 @@ public void testPutWithTtl() throws Exception {
addPutMessagesToReplicasOfPartition(id, accountId, containerId, partitionId, Collections.singletonList(remoteHost),
referenceTime, referenceTime + TimeUnit.DAYS.toMillis(cloudConfig.vcrMinTtlDays));
addTtlUpdateMessagesToReplicasOfPartition(partitionId, id, Collections.singletonList(remoteHost));
replicaThread.replicate(new ArrayList<>(replicasToReplicate.values()));
replicaThread.replicate(replicasToReplicateList);
assertTrue("Blob should exist.", latchBasedInMemoryCloudDestination.doesBlobExist(id));

// Verify expiration time of all blobs.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -311,4 +311,4 @@ public boolean isEmpty() {
@Override
public void shutdown() throws StoreException {
}
}
}

0 comments on commit dd38123

Please sign in to comment.