From efc5d8960ac737ea05d76664231c492a2b7f3825 Mon Sep 17 00:00:00 2001
From: Justin Lin
Date: Mon, 10 Feb 2020 11:04:09 -0800
Subject: [PATCH] Add undelete method to BlobStore (#1373)
Add undelete method to BlobStore.
Co-authored-by: David Harju
---
.../notification/NotificationSystem.java | 20 +
.../java/com.github.ambry/store/Store.java | 7 +
.../CloudBlobStore.java | 5 +
.../LoggingNotificationSystem.java | 18 +-
.../InMemoryStore.java | 5 +
.../NettyMessageProcessorTest.java | 10 +
.../com.github.ambry.server/MockCluster.java | 22 ++
.../MockStorageManager.java | 33 ++
.../StatsManagerTest.java | 6 +
.../com.github.ambry.store/BlobStore.java | 68 ++++
.../PersistentIndex.java | 44 ++-
.../com.github.ambry.store/StoreMetrics.java | 5 +
.../com.github.ambry.store/BlobStoreTest.java | 353 ++++++++++++++++--
build.gradle | 4 +-
14 files changed, 558 insertions(+), 42 deletions(-)
diff --git a/ambry-api/src/main/java/com.github.ambry/notification/NotificationSystem.java b/ambry-api/src/main/java/com.github.ambry/notification/NotificationSystem.java
index 3253287264..af42042bfd 100644
--- a/ambry-api/src/main/java/com.github.ambry/notification/NotificationSystem.java
+++ b/ambry-api/src/main/java/com.github.ambry/notification/NotificationSystem.java
@@ -57,6 +57,16 @@ void onBlobCreated(String blobId, BlobProperties blobProperties, Account account
*/
void onBlobDeleted(String blobId, String serviceId, Account account, Container container);
+ /**
+ * Notifies the underlying system when the blob is undeleted.
+ * @param blobId The id of the blob whose undeleted state has been replicated
+ * @param serviceId The service ID of the service undeleting the blob. This can be null if unknown.
+ * @param account The {@link Account} for the blob
+ * @param container The {@link Container} for the blob
+ */
+ default void onBlobUndeleted(String blobId, String serviceId, Account account, Container container) {
+ }
+
/**
* Notifies the underlying system when a blob is replicated to a node
* @param sourceHost The source host from where the notification is being invoked
@@ -86,4 +96,14 @@ void onBlobCreated(String blobId, BlobProperties blobProperties, Account account
*/
void onBlobReplicaUpdated(String sourceHost, int port, String blobId, BlobReplicaSourceType sourceType,
UpdateType updateType, MessageInfo info);
+
+ /**
+ * Notifies the underlying system when a undeleted state of a blob is replicated to a node.
+ * @param sourceHost The source host from where the notification is being invoked
+ * @param port The port of the source host from where the notification is being invoked.
+ * @param blobId The id of the blob whose undeleted state has been replicated
+ * @param sourceType The source that undeleted the blob replica
+ */
+ default void onBlobReplicaUndeleted(String sourceHost, int port, String blobId, BlobReplicaSourceType sourceType) {
+ }
}
diff --git a/ambry-api/src/main/java/com.github.ambry/store/Store.java b/ambry-api/src/main/java/com.github.ambry/store/Store.java
index a1075cb32a..170a444139 100644
--- a/ambry-api/src/main/java/com.github.ambry/store/Store.java
+++ b/ambry-api/src/main/java/com.github.ambry/store/Store.java
@@ -54,6 +54,13 @@ public interface Store {
*/
void delete(MessageWriteSet messageSetToDelete) throws StoreException;
+ /**
+ * Undelete the blob identified by {@code id}.
+ * @param info The {@link MessageInfo} that carries some basic information about this operation.
+ * @return the lifeVersion of the undeleted blob.
+ */
+ short undelete(MessageInfo info) throws StoreException;
+
/**
* Updates the TTL of all the messages that are part of the message set
* @param messageSetToUpdate The list of messages that need to be updated
diff --git a/ambry-cloud/src/main/java/com.github.ambry.cloud/CloudBlobStore.java b/ambry-cloud/src/main/java/com.github.ambry.cloud/CloudBlobStore.java
index ef305b0b82..6e87a06ddc 100644
--- a/ambry-cloud/src/main/java/com.github.ambry.cloud/CloudBlobStore.java
+++ b/ambry-cloud/src/main/java/com.github.ambry.cloud/CloudBlobStore.java
@@ -364,6 +364,11 @@ public void delete(MessageWriteSet messageSetToDelete) throws StoreException {
}
}
+ @Override
+ public short undelete(MessageInfo info) throws StoreException {
+ throw new UnsupportedOperationException("Undelete not supported in cloud store");
+ }
+
/**
* {@inheritDoc}
* Currently, the only supported operation is to set the TTL to infinite (i.e. no arbitrary increase or decrease)
diff --git a/ambry-commons/src/main/java/com.github.ambry.commons/LoggingNotificationSystem.java b/ambry-commons/src/main/java/com.github.ambry.commons/LoggingNotificationSystem.java
index a2632ec762..6ebcec3fa3 100644
--- a/ambry-commons/src/main/java/com.github.ambry.commons/LoggingNotificationSystem.java
+++ b/ambry-commons/src/main/java/com.github.ambry.commons/LoggingNotificationSystem.java
@@ -64,6 +64,15 @@ public void onBlobDeleted(String blobId, String serviceId, Account account, Cont
: container.getName()) + ", containerId " + (container == null ? null : container.getId()));
}
+ @Override
+ public void onBlobUndeleted(String blobId, String serviceId, Account account, Container container) {
+ logger.debug("onBlobUndeleted " + blobId,
+ ", " + serviceId + ", accountName " + (account == null ? null : account.getName()) + ", accountId" + (
+ account == null ? null : account.getId()) + ", containerName " + (container == null ? null
+ : container.getName()) + ", containerId " + (container == null ? null : container.getId()));
+
+ }
+
@Override
public void onBlobReplicaCreated(String sourceHost, int port, String blobId, BlobReplicaSourceType sourceType) {
logger.debug("onBlobReplicaCreated " + sourceHost + ", " + port + ", " + blobId + ", " + sourceType);
@@ -71,15 +80,20 @@ public void onBlobReplicaCreated(String sourceHost, int port, String blobId, Blo
@Override
public void onBlobReplicaDeleted(String sourceHost, int port, String blobId, BlobReplicaSourceType sourceType) {
- logger.debug("onBlobReplicaCreated " + sourceHost + ", " + port + ", " + blobId + ", " + sourceType);
+ logger.debug("onBlobReplicaDeleted " + sourceHost + ", " + port + ", " + blobId + ", " + sourceType);
}
@Override
public void onBlobReplicaUpdated(String sourceHost, int port, String blobId, BlobReplicaSourceType sourceType,
UpdateType updateType, MessageInfo info) {
logger.debug(
- "onBlobReplicaCreated " + sourceHost + ", " + port + ", " + blobId + ", " + sourceType + ", " + updateType
+ "onBlobReplicaUpdated " + sourceHost + ", " + port + ", " + blobId + ", " + sourceType + ", " + updateType
+ ", " + info);
}
+
+ @Override
+ public void onBlobReplicaUndeleted(String sourceHost, int port, String blobId, BlobReplicaSourceType sourceType) {
+ logger.debug("onBlobReplicaUndeleted " + sourceHost + ", " + port + ", " + blobId + ", " + sourceType);
+ }
}
diff --git a/ambry-replication/src/test/java/com.github.ambry.replication/InMemoryStore.java b/ambry-replication/src/test/java/com.github.ambry.replication/InMemoryStore.java
index e2bcd27044..e0f4a7fad0 100644
--- a/ambry-replication/src/test/java/com.github.ambry.replication/InMemoryStore.java
+++ b/ambry-replication/src/test/java/com.github.ambry.replication/InMemoryStore.java
@@ -245,6 +245,11 @@ public void updateTtl(MessageWriteSet messageSetToUpdate) throws StoreException
}
}
+ @Override
+ public short undelete(MessageInfo info) throws StoreException {
+ throw new UnsupportedOperationException("Undelete unsupported for now");
+ }
+
@Override
public FindInfo findEntriesSince(FindToken token, long maxSizeOfEntries) throws StoreException {
// unused function
diff --git a/ambry-rest/src/test/java/com.github.ambry.rest/NettyMessageProcessorTest.java b/ambry-rest/src/test/java/com.github.ambry.rest/NettyMessageProcessorTest.java
index f23662be85..02606dc847 100644
--- a/ambry-rest/src/test/java/com.github.ambry.rest/NettyMessageProcessorTest.java
+++ b/ambry-rest/src/test/java/com.github.ambry.rest/NettyMessageProcessorTest.java
@@ -460,6 +460,11 @@ public void onBlobDeleted(String blobId, String serviceId, Account account, Cont
throw new IllegalStateException("Not implemented");
}
+ @Override
+ public void onBlobUndeleted(String blobId, String serviceId, Account account, Container container) {
+ throw new IllegalStateException("Not implemented");
+ }
+
@Override
public void onBlobReplicaCreated(String sourceHost, int port, String blobId, BlobReplicaSourceType sourceType) {
throw new IllegalStateException("Not implemented");
@@ -476,6 +481,11 @@ public void onBlobReplicaUpdated(String sourceHost, int port, String blobId, Blo
throw new IllegalStateException("Not implemented");
}
+ @Override
+ public void onBlobReplicaUndeleted(String sourceHost, int port, String blobId, BlobReplicaSourceType sourceType) {
+ throw new IllegalStateException("Not implemented");
+ }
+
@Override
public void close() {
// no op.
diff --git a/ambry-server/src/integration-test/java/com.github.ambry.server/MockCluster.java b/ambry-server/src/integration-test/java/com.github.ambry.server/MockCluster.java
index eea298e21a..74cdac1b6f 100644
--- a/ambry-server/src/integration-test/java/com.github.ambry.server/MockCluster.java
+++ b/ambry-server/src/integration-test/java/com.github.ambry.server/MockCluster.java
@@ -373,6 +373,7 @@ class EventTracker {
private final int numberOfReplicas;
private final Helper creationHelper;
private final Helper deletionHelper;
+ private final Helper undeleteHelper;
private final ConcurrentMap updateHelpers = new ConcurrentHashMap<>();
/**
@@ -440,6 +441,7 @@ private String getKey(String host, int port) {
numberOfReplicas = expectedNumberOfReplicas;
creationHelper = new Helper();
deletionHelper = new Helper();
+ undeleteHelper = new Helper();
}
/**
@@ -460,6 +462,15 @@ void trackDeletion(String host, int port) {
deletionHelper.track(host, port);
}
+ /**
+ * Tracks the undelete event that arrived on {@code host}:{@code port}.
+ * @param host the host that received the undelete
+ * @param port the port of the host that describes the instance along with {@code host}.
+ */
+ void trackUndelete(String host, int port) {
+ undeleteHelper.track(host, port);
+ }
+
/**
* Tracks the update event of type {@code updateType} that arrived on {@code host}:{@code port}.
* @param host the host that received the update
@@ -564,6 +575,11 @@ public void onBlobDeleted(String blobId, String serviceId, Account account, Cont
// ignore
}
+ @Override
+ public void onBlobUndeleted(String blobId, String serviceId, Account account, Container container) {
+ // ignore
+ }
+
@Override
public synchronized void onBlobReplicaCreated(String sourceHost, int port, String blobId,
BlobReplicaSourceType sourceType) {
@@ -585,6 +601,12 @@ public synchronized void onBlobReplicaUpdated(String sourceHost, int port, Strin
.trackUpdate(sourceHost, port, updateType);
}
+ @Override
+ public void onBlobReplicaUndeleted(String sourceHost, int port, String blobId, BlobReplicaSourceType sourceType) {
+ objectTracker.computeIfAbsent(blobId, k -> new EventTracker(getNumReplicas(blobId)))
+ .trackUndelete(sourceHost, port);
+ }
+
@Override
public void close() {
// ignore
diff --git a/ambry-server/src/test/java/com.github.ambry.server/MockStorageManager.java b/ambry-server/src/test/java/com.github.ambry.server/MockStorageManager.java
index ea8bd54c6f..694a2298fe 100644
--- a/ambry-server/src/test/java/com.github.ambry.server/MockStorageManager.java
+++ b/ambry-server/src/test/java/com.github.ambry.server/MockStorageManager.java
@@ -24,6 +24,9 @@
import com.github.ambry.config.DiskManagerConfig;
import com.github.ambry.config.StoreConfig;
import com.github.ambry.config.VerifiableProperties;
+import com.github.ambry.messageformat.MessageFormatInputStream;
+import com.github.ambry.messageformat.MessageFormatWriteSet;
+import com.github.ambry.messageformat.UndeleteMessageFormatInputStream;
import com.github.ambry.protocol.RequestOrResponseType;
import com.github.ambry.replication.FindToken;
import com.github.ambry.replication.FindTokenHelper;
@@ -134,6 +137,31 @@ public void updateTtl(MessageWriteSet messageSetToUpdate) throws StoreException
messageSetToUpdate.getMessageSetInfo().stream().map(MessageInfo::getStoreKey).collect(Collectors.toList()));
}
+ @Override
+ public short undelete(MessageInfo info) throws StoreException {
+ operationReceived = RequestOrResponseType.UndeleteRequest;
+ try {
+ MessageFormatInputStream stream =
+ new UndeleteMessageFormatInputStream(info.getStoreKey(), info.getAccountId(), info.getContainerId(),
+ info.getOperationTimeMs(), (short) returnValueOfUndelete);
+ // Update info to add stream size;
+ info = new MessageInfo(info.getStoreKey(), stream.getSize(), info.getAccountId(), info.getContainerId(),
+ info.getOperationTimeMs());
+ ArrayList infoList = new ArrayList<>();
+ infoList.add(info);
+ messageWriteSetReceived = new MessageFormatWriteSet(stream, infoList, false);
+ } catch (Exception e) {
+ throw new StoreException("Unknown error while trying to undelete blobs from store", e,
+ StoreErrorCodes.Unknown_Error);
+ }
+ throwExceptionIfRequired();
+ checkValidityOfIds(messageWriteSetReceived.getMessageSetInfo()
+ .stream()
+ .map(MessageInfo::getStoreKey)
+ .collect(Collectors.toList()));
+ return returnValueOfUndelete;
+ }
+
@Override
public FindInfo findEntriesSince(FindToken token, long maxTotalSizeOfEntries) throws StoreException {
operationReceived = RequestOrResponseType.ReplicaMetadataRequest;
@@ -324,6 +352,11 @@ private void checkValidityOfIds(Collection extends StoreKey> ids) throws Store
* The return value for a call to {@link #removeBlobStore(PartitionId)}.
*/
boolean returnValueOfRemoveBlobStore = true;
+
+ /**
+ * The return value for a call to {@link TestStore#undelete(MessageInfo)}.
+ */
+ short returnValueOfUndelete = 1;
/**
* The {@link PartitionId} that was provided in the call to {@link #scheduleNextForCompaction(PartitionId)}
*/
diff --git a/ambry-server/src/test/java/com.github.ambry.server/StatsManagerTest.java b/ambry-server/src/test/java/com.github.ambry.server/StatsManagerTest.java
index 111b74c3e9..24facc66c6 100644
--- a/ambry-server/src/test/java/com.github.ambry.server/StatsManagerTest.java
+++ b/ambry-server/src/test/java/com.github.ambry.server/StatsManagerTest.java
@@ -36,6 +36,7 @@
import com.github.ambry.replication.FindToken;
import com.github.ambry.replication.MockReplicationManager;
import com.github.ambry.store.FindInfo;
+import com.github.ambry.store.MessageInfo;
import com.github.ambry.store.MessageWriteSet;
import com.github.ambry.store.MockStoreKeyConverterFactory;
import com.github.ambry.store.StorageManager;
@@ -707,6 +708,11 @@ public void delete(MessageWriteSet messageSetToDelete) throws StoreException {
throw new IllegalStateException("Not implemented");
}
+ @Override
+ public short undelete(MessageInfo info) throws StoreException {
+ throw new IllegalStateException("Not implemented");
+ }
+
@Override
public void updateTtl(MessageWriteSet messageSetToUpdate) throws StoreException {
throw new IllegalStateException("Not implemented");
diff --git a/ambry-store/src/main/java/com.github.ambry.store/BlobStore.java b/ambry-store/src/main/java/com.github.ambry.store/BlobStore.java
index 0d816600ba..0746e1ecc4 100644
--- a/ambry-store/src/main/java/com.github.ambry.store/BlobStore.java
+++ b/ambry-store/src/main/java/com.github.ambry.store/BlobStore.java
@@ -18,6 +18,9 @@
import com.github.ambry.clustermap.ReplicaState;
import com.github.ambry.clustermap.ReplicaStatusDelegate;
import com.github.ambry.config.StoreConfig;
+import com.github.ambry.messageformat.MessageFormatInputStream;
+import com.github.ambry.messageformat.MessageFormatWriteSet;
+import com.github.ambry.messageformat.UndeleteMessageFormatInputStream;
import com.github.ambry.replication.FindToken;
import com.github.ambry.utils.FileLock;
import com.github.ambry.utils.Time;
@@ -612,6 +615,71 @@ public void updateTtl(MessageWriteSet messageSetToUpdate) throws StoreException
}
}
+ @Override
+ public short undelete(MessageInfo info) throws StoreException {
+ checkStarted();
+ final Timer.Context context = metrics.undeleteResponse.time();
+ try {
+ StoreKey id = info.getStoreKey();
+ Offset indexEndOffsetBeforeCheck = index.getCurrentEndOffset();
+ List values = index.findAllIndexValuesForKey(id, null);
+ index.validateSanityForUndelete(id, values, IndexValue.LIFE_VERSION_FROM_FRONTEND);
+ IndexValue latestValue = values.get(0);
+ short lifeVersion = (short) (latestValue.getLifeVersion() + 1);
+ MessageFormatInputStream stream =
+ new UndeleteMessageFormatInputStream(id, info.getAccountId(), info.getContainerId(),
+ info.getOperationTimeMs(), lifeVersion);
+ // Update info to add stream size;
+ info =
+ new MessageInfo(id, stream.getSize(), info.getAccountId(), info.getContainerId(), info.getOperationTimeMs());
+ ArrayList infoList = new ArrayList<>();
+ infoList.add(info);
+ MessageFormatWriteSet writeSet = new MessageFormatWriteSet(stream, infoList, false);
+ if (!info.getStoreKey().isAccountContainerMatch(latestValue.getAccountId(), latestValue.getContainerId())) {
+ if (config.storeValidateAuthorization) {
+ throw new StoreException(
+ "UNDELETE authorization failure. Key: " + info.getStoreKey() + " Actually accountId: "
+ + latestValue.getAccountId() + "Actually containerId: " + latestValue.getContainerId(),
+ StoreErrorCodes.Authorization_Failure);
+ } else {
+ logger.warn("UNDELETE authorization failure. Key: {} Actually accountId: {} Actually containerId: {}",
+ info.getStoreKey(), latestValue.getAccountId(), latestValue.getContainerId());
+ metrics.undeleteAuthorizationFailureCount.inc();
+ }
+ }
+ synchronized (storeWriteLock) {
+ Offset currentIndexEndOffset = index.getCurrentEndOffset();
+ if (!currentIndexEndOffset.equals(indexEndOffsetBeforeCheck)) {
+ FileSpan fileSpan = new FileSpan(indexEndOffsetBeforeCheck, currentIndexEndOffset);
+ IndexValue value =
+ index.findKey(info.getStoreKey(), fileSpan, EnumSet.allOf(PersistentIndex.IndexEntryType.class));
+ if (value != null) {
+ throw new StoreException("Cannot undelete id " + info.getStoreKey() + " since concurrent operation occurs",
+ StoreErrorCodes.Life_Version_Conflict);
+ }
+ }
+ Offset endOffsetOfLastMessage = log.getEndOffset();
+ writeSet.writeTo(log);
+ logger.trace("Store : {} undelete mark written to log", dataDir);
+ FileSpan fileSpan = log.getFileSpanForMessage(endOffsetOfLastMessage, info.getSize());
+ index.markAsUndeleted(info.getStoreKey(), fileSpan, info.getOperationTimeMs());
+ // TODO: update blobstore stats for undelete (2020-02-10)
+ }
+ onSuccess();
+ return lifeVersion;
+ } catch (StoreException e) {
+ if (e.getErrorCode() == StoreErrorCodes.IOError) {
+ onError();
+ }
+ throw e;
+ } catch (Exception e) {
+ throw new StoreException("Unknown error while trying to undelete blobs from store " + dataDir, e,
+ StoreErrorCodes.Unknown_Error);
+ } finally {
+ context.stop();
+ }
+ }
+
@Override
public FindInfo findEntriesSince(FindToken token, long maxTotalSizeOfEntries) throws StoreException {
checkStarted();
diff --git a/ambry-store/src/main/java/com.github.ambry.store/PersistentIndex.java b/ambry-store/src/main/java/com.github.ambry.store/PersistentIndex.java
index 4ed1045175..0b978423fe 100644
--- a/ambry-store/src/main/java/com.github.ambry.store/PersistentIndex.java
+++ b/ambry-store/src/main/java/com.github.ambry.store/PersistentIndex.java
@@ -31,7 +31,6 @@
import java.util.EnumSet;
import java.util.HashSet;
import java.util.Iterator;
-import java.util.LinkedList;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
@@ -646,6 +645,19 @@ private IndexValue findKey(StoreKey key, FileSpan fileSpan, EnumSet findAllIndexValuesForKey(StoreKey key, FileSpan fileSpan) throws StoreException {
+ return findAllIndexValuesForKey(key, fileSpan, EnumSet.allOf(IndexEntryType.class), validIndexSegments);
+ }
+
/**
* Finds all the {@link IndexValue}s associated with the given {@code key} that matches any of the provided {@code types}
* if present in the index with the given {@code fileSpan} and return them in reversed chronological order. If there is
@@ -720,19 +732,19 @@ void validateSanityForUndelete(StoreKey key, List values, short life
// This is from recovery or replication, make sure the last value is a put and the first value's lifeVersion is strictly
// less than the given lifeVersion. We don't care about the first value's type, it can be a put, ttl_update or delete, it
// can even be an undelete.
- IndexValue firstValue = values.get(0);
- IndexValue lastValue = values.get(values.size() - 1);
- if (!lastValue.isPut()) {
+ IndexValue latestValue = values.get(0);
+ IndexValue oldestValue = values.get(values.size() - 1);
+ if (!oldestValue.isPut()) {
throw new StoreException("Id " + key + " requires first value to be a put in index " + dataDir,
StoreErrorCodes.ID_Deleted_Permanently);
}
- if (firstValue.getLifeVersion() >= lifeVersion) {
+ if (latestValue.getLifeVersion() >= lifeVersion) {
throw new StoreException(
- "LifeVersion conflict in index. Id " + key + " LifeVersion: " + firstValue.getLifeVersion()
+ "LifeVersion conflict in index. Id " + key + " LifeVersion: " + latestValue.getLifeVersion()
+ " Undelete LifeVersion: " + lifeVersion, StoreErrorCodes.Life_Version_Conflict);
}
- maybeChangeExpirationDate(lastValue, values);
- if (isExpired(lastValue)) {
+ maybeChangeExpirationDate(oldestValue, values);
+ if (isExpired(oldestValue)) {
throw new StoreException("Id " + key + " already expired in index " + dataDir, StoreErrorCodes.TTL_Expired);
}
}
@@ -763,25 +775,25 @@ void validateSanityForUndeleteWithoutLifeVersion(StoreKey key, List
StoreErrorCodes.ID_Undeleted);
}
}
- // First item has to be put and last item has to be a delete.
+ // Latest value has to be put and oldest value has to be a delete.
// PutRecord can't expire and delete record can't be older than the delete retention time.
- IndexValue firstValue = values.get(0);
- IndexValue lastValue = values.get(values.size() - 1);
- if (firstValue.isUndelete()) {
+ IndexValue latestValue = values.get(0);
+ IndexValue oldestValue = values.get(values.size() - 1);
+ if (latestValue.isUndelete()) {
throw new StoreException("Id " + key + " is already undeleted in index" + dataDir, StoreErrorCodes.ID_Undeleted);
}
- if (!lastValue.isPut() || !firstValue.isDelete()) {
+ if (!oldestValue.isPut() || !latestValue.isDelete()) {
throw new StoreException(
"Id " + key + " requires first value to be a put and last value to be a delete in index " + dataDir,
StoreErrorCodes.ID_Not_Deleted);
}
- if (firstValue.getOperationTimeInMs() + TimeUnit.DAYS.toMillis(config.storeDeletedMessageRetentionDays)
+ if (latestValue.getOperationTimeInMs() + TimeUnit.DAYS.toMillis(config.storeDeletedMessageRetentionDays)
< time.milliseconds()) {
throw new StoreException("Id " + key + " already permanently deleted in index " + dataDir,
StoreErrorCodes.ID_Deleted_Permanently);
}
- maybeChangeExpirationDate(lastValue, values);
- if (isExpired(lastValue)) {
+ maybeChangeExpirationDate(oldestValue, values);
+ if (isExpired(oldestValue)) {
throw new StoreException("Id " + key + " already expired in index " + dataDir, StoreErrorCodes.TTL_Expired);
}
}
diff --git a/ambry-store/src/main/java/com.github.ambry.store/StoreMetrics.java b/ambry-store/src/main/java/com.github.ambry.store/StoreMetrics.java
index 04c95f7878..8ba4bba6eb 100644
--- a/ambry-store/src/main/java/com.github.ambry.store/StoreMetrics.java
+++ b/ambry-store/src/main/java/com.github.ambry.store/StoreMetrics.java
@@ -33,6 +33,7 @@ public class StoreMetrics {
public final Timer putResponse;
public final Timer deleteResponse;
public final Timer ttlUpdateResponse;
+ public final Timer undeleteResponse;
public final Timer findEntriesSinceResponse;
public final Timer findMissingKeysResponse;
public final Timer isKeyDeletedResponse;
@@ -71,6 +72,7 @@ public class StoreMetrics {
public final Counter getAuthorizationFailureCount;
public final Counter deleteAuthorizationFailureCount;
public final Counter ttlUpdateAuthorizationFailureCount;
+ public final Counter undeleteAuthorizationFailureCount;
public final Counter keyInFindEntriesAbsent;
public final Counter duplicateKeysInBatch;
public final Counter storeIoErrorTriggeredShutdownCount;
@@ -107,6 +109,7 @@ public StoreMetrics(String prefix, MetricRegistry registry) {
putResponse = registry.timer(MetricRegistry.name(BlobStore.class, name + "StorePutResponse"));
deleteResponse = registry.timer(MetricRegistry.name(BlobStore.class, name + "StoreDeleteResponse"));
ttlUpdateResponse = registry.timer(MetricRegistry.name(BlobStore.class, name + "StoreTtlUpdateResponse"));
+ undeleteResponse = registry.timer(MetricRegistry.name(BlobStore.class, name + "StoreUndeleteResponse"));
findEntriesSinceResponse =
registry.timer(MetricRegistry.name(BlobStore.class, name + "StoreFindEntriesSinceResponse"));
findMissingKeysResponse =
@@ -163,6 +166,8 @@ public StoreMetrics(String prefix, MetricRegistry registry) {
registry.counter(MetricRegistry.name(BlobStore.class, name + "DeleteAuthorizationFailureCount"));
ttlUpdateAuthorizationFailureCount =
registry.counter(MetricRegistry.name(BlobStore.class, name + "TtlUpdateAuthorizationFailureCount"));
+ undeleteAuthorizationFailureCount =
+ registry.counter(MetricRegistry.name(BlobStore.class, name + "UndeleteAuthorizationFailureCount"));
keyInFindEntriesAbsent = registry.counter(MetricRegistry.name(BlobStore.class, name + "KeyInFindEntriesAbsent"));
duplicateKeysInBatch = registry.counter(MetricRegistry.name(BlobStore.class, name + "DuplicateKeysInBatch"));
storeIoErrorTriggeredShutdownCount =
diff --git a/ambry-store/src/test/java/com.github.ambry.store/BlobStoreTest.java b/ambry-store/src/test/java/com.github.ambry.store/BlobStoreTest.java
index 94e6f41bd5..dd21c50b4b 100644
--- a/ambry-store/src/test/java/com.github.ambry.store/BlobStoreTest.java
+++ b/ambry-store/src/test/java/com.github.ambry.store/BlobStoreTest.java
@@ -19,6 +19,7 @@
import com.github.ambry.clustermap.ReplicaStatusDelegate;
import com.github.ambry.config.StoreConfig;
import com.github.ambry.config.VerifiableProperties;
+import com.github.ambry.messageformat.UndeleteMessageFormatInputStream;
import com.github.ambry.replication.FindToken;
import com.github.ambry.utils.ByteBufferOutputStream;
import com.github.ambry.utils.MockTime;
@@ -59,6 +60,7 @@
import org.mockito.Mockito;
import static org.junit.Assert.*;
+import static org.junit.Assume.*;
import static org.mockito.Mockito.*;
@@ -77,15 +79,32 @@ public class BlobStoreTest {
}
}
+ private static final int MOCK_ID_STRING_LENGTH = 10;
+ private static final MockId randomMockId =
+ new MockId(UtilsTest.getRandomString(MOCK_ID_STRING_LENGTH), (short) 0, (short) 0);
// setupTestState() is coupled to these numbers. Changing them *will* cause setting test state or tests to fail.
- private static final long LOG_CAPACITY = 30000;
- private static final long SEGMENT_CAPACITY = 3000;
+ private static final long LOG_CAPACITY = 50000;
+ private static final long SEGMENT_CAPACITY = 5000;
private static final int MAX_IN_MEM_ELEMENTS = 5;
// deliberately do not divide the capacities perfectly.
private static final int PUT_RECORD_SIZE = 53;
private static final int DELETE_RECORD_SIZE = 29;
private static final int TTL_UPDATE_RECORD_SIZE = 37;
+ private static int UNDELETE_RECORD_SIZE;
+ static {
+ // Since undelete record is constructed in BlobStore, we can't set an arbitrary number as its record size.
+ // This static block constructs a UndeleteMessageFormatInputStream and returned its size. We have to make sure
+ // that the mock id's size is predefined and can't be changed while testing.
+ try {
+ UNDELETE_RECORD_SIZE =
+ (int) (new UndeleteMessageFormatInputStream(randomMockId, (short) 0, (short) 0, 0, (short) 0).getSize());
+ } catch (Exception e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
+ static final int deleteRetentionDay = 1;
private static final byte[] DELETE_BUF = TestUtils.getRandomBytes(DELETE_RECORD_SIZE);
private static final byte[] TTL_UPDATE_BUF = TestUtils.getRandomBytes(TTL_UPDATE_RECORD_SIZE);
@@ -123,14 +142,18 @@ private static class CallableResult {
// Will be non-null only for GET.
final StoreInfo storeInfo;
- CallableResult(MockId id, StoreInfo storeInfo) {
+ // Will be non-null only for Undelete
+ final Short lifeVersion;
+
+ CallableResult(MockId id, StoreInfo storeInfo, Short lifeVersion) {
this.id = id;
this.storeInfo = storeInfo;
+ this.lifeVersion = lifeVersion;
}
}
// a static instance to return for Deleter::call() and TtlUpdater::call().
- private static final CallableResult EMPTY_RESULT = new CallableResult(null, null);
+ private static final CallableResult EMPTY_RESULT = new CallableResult(null, null, null);
/**
* Puts a blob and returns the {@link MockId} associated with it.
@@ -139,7 +162,7 @@ private class Putter implements Callable {
@Override
public CallableResult call() throws Exception {
- return new CallableResult(put(1, PUT_RECORD_SIZE, Utils.Infinite_Time).get(0), null);
+ return new CallableResult(put(1, PUT_RECORD_SIZE, Utils.Infinite_Time).get(0), null, null);
}
}
@@ -161,7 +184,7 @@ private class Getter implements Callable {
@Override
public CallableResult call() throws Exception {
- return new CallableResult(null, store.get(Collections.singletonList(id), storeGetOptions));
+ return new CallableResult(null, store.get(Collections.singletonList(id), storeGetOptions), null);
}
}
@@ -207,6 +230,25 @@ public CallableResult call() throws Exception {
}
}
+ /**
+ * Undelete a blob
+ */
+ private class Undeleter implements Callable {
+ final MockId id;
+
+ /**
+ * @param id the {@link MockId} to undelete.
+ */
+ Undeleter(MockId id) {
+ this.id = id;
+ }
+
+ @Override
+ public CallableResult call() throws Exception {
+ return new CallableResult(null, null, undelete(id));
+ }
+ }
+
// used by getUniqueId() to make sure keys are never regenerated in a single test run.
private final Set generatedKeys = Collections.newSetFromMap(new ConcurrentHashMap());
// A map of all the keys. The key is the MockId and the value is a Pair that contains the metadata and data of the
@@ -222,6 +264,11 @@ public CallableResult call() throws Exception {
private final Set liveKeys = Collections.newSetFromMap(new ConcurrentHashMap());
// Set of all keys that have had their TTLs updated
private final Set ttlUpdatedKeys = Collections.newSetFromMap(new ConcurrentHashMap());
+ // Set of all keys that have are deleted and should be compacted
+ private final Set deletedAndShouldBeCompactedKeys =
+ Collections.newSetFromMap(new ConcurrentHashMap());
+ // Set of all undeleted keys
+ private final Set undeletedKeys = Collections.newSetFromMap(new ConcurrentHashMap());
// Indicates whether the log is segmented
private final boolean isLogSegmented;
@@ -263,13 +310,14 @@ public static List
* Also tests GET with different combinations of {@link StoreGetOptions}.
* @throws InterruptedException
@@ -450,7 +502,7 @@ public void basicTest() throws InterruptedException, IOException, StoreException
expiredKeys.add(addedId);
// GET of all the keys implicitly tests the PUT, UPDATE and DELETE.
- // live keys
+ // live keys, included undeleted keys
StoreInfo storeInfo = store.get(new ArrayList<>(liveKeys), EnumSet.noneOf(StoreGetOptions.class));
checkStoreInfo(storeInfo, liveKeys);
@@ -557,6 +609,24 @@ public void concurrentTtlUpdateTest() throws Exception {
verifyTtlUpdateFutures(ttlUpdaters, futures);
}
+ /**
+ * Tests the case where there are many concurrent undelete.
+ */
+ @Test
+ public void concurrentUndeleteTest() throws Exception {
+ assumeTrue(isLogSegmented);
+ int extraBlobCount = 2000 / UNDELETE_RECORD_SIZE + 1;
+ List ids = put(extraBlobCount, PUT_RECORD_SIZE, Utils.Infinite_Time);
+ List undeleters = new ArrayList<>();
+ for (MockId id : ids) {
+ delete(id);
+ undeleters.add(new Undeleter(id));
+ }
+ ExecutorService executorService = Executors.newFixedThreadPool(undeleters.size());
+ List> futures = executorService.invokeAll(undeleters);
+ verifyUndeleteFutures(undeleters, futures);
+ }
+
/**
* Tests the case where there are concurrent PUTs, GETs and DELETEs.
* @throws Exception
@@ -568,11 +638,13 @@ public void concurrentAllTest() throws Exception {
for (int i = 0; i < putBlobCount; i++) {
putters.add(new Putter());
}
+ List> callables = new ArrayList>(putters);
List getters = new ArrayList<>(liveKeys.size());
for (MockId id : liveKeys) {
getters.add(new Getter(id, EnumSet.allOf(StoreGetOptions.class)));
}
+ callables.addAll(getters);
int deleteBlobCount = 1000 / PUT_RECORD_SIZE;
List idsToDelete = put(deleteBlobCount, PUT_RECORD_SIZE, Utils.Infinite_Time);
@@ -580,6 +652,7 @@ public void concurrentAllTest() throws Exception {
for (MockId id : idsToDelete) {
deleters.add(new Deleter(id));
}
+ callables.addAll(deleters);
int updateTtlBlobCount = 1000 / PUT_RECORD_SIZE;
List idsToUpdateTtl = put(updateTtlBlobCount, PUT_RECORD_SIZE, expiresAtMs);
@@ -587,12 +660,19 @@ public void concurrentAllTest() throws Exception {
for (MockId id : idsToUpdateTtl) {
ttlUpdaters.add(new TtlUpdater(id));
}
-
- List> callables = new ArrayList>(putters);
- callables.addAll(getters);
- callables.addAll(deleters);
callables.addAll(ttlUpdaters);
+ List undeleters = new ArrayList<>();
+ if (isLogSegmented) {
+ int undeleteBlobCount = 1000 / UNDELETE_RECORD_SIZE;
+ List ids = put(undeleteBlobCount, PUT_RECORD_SIZE, Utils.Infinite_Time);
+ for (MockId id : ids) {
+ delete(id);
+ undeleters.add(new Undeleter(id));
+ }
+ callables.addAll(undeleters);
+ }
+
ExecutorService executorService = Executors.newFixedThreadPool(callables.size());
List> futures = executorService.invokeAll(callables);
verifyPutFutures(putters, futures.subList(0, putters.size()));
@@ -601,6 +681,10 @@ public void concurrentAllTest() throws Exception {
futures.subList(putters.size() + getters.size(), putters.size() + getters.size() + deleters.size()));
verifyTtlUpdateFutures(ttlUpdaters,
futures.subList(putters.size() + getters.size() + deleters.size(), callables.size()));
+ if (isLogSegmented) {
+ verifyUndeleteFutures(undeleters,
+ futures.subList(putters.size() + getters.size() + deleters.size() + ttlUpdaters.size(), callables.size()));
+ }
}
/**
@@ -634,6 +718,10 @@ public void putErrorCasesTest() throws StoreException {
verifyPutFailure(expiredKeys.iterator().next(), StoreErrorCodes.Already_Exist);
// deleted
verifyPutFailure(deletedKeys.iterator().next(), StoreErrorCodes.Already_Exist);
+ // undeleted
+ if (isLogSegmented) {
+ verifyPutFailure(undeletedKeys.iterator().next(), StoreErrorCodes.Already_Exist);
+ }
// duplicates
MockId id = getUniqueId();
MessageInfo info =
@@ -671,6 +759,38 @@ public void deleteErrorCasesTest() throws StoreException {
}
}
+ /**
+ * Tests error cases for {@link BlobStore#undelete(MessageInfo)}.
+ * @throws StoreException
+ */
+ @Test
+ public void undeleteErrorCasesTest() throws Exception {
+ assumeTrue(isLogSegmented);
+ // Pick a live key that is not undeleted
+ MockId id = null;
+ for (MockId liveId : liveKeys) {
+ if (!undeletedKeys.contains(liveId)) {
+ id = liveId;
+ break;
+ }
+ }
+ assertNotNull("Should get a live id that are not undeleted", id);
+ verifyUndeleteFailure(id, StoreErrorCodes.ID_Not_Deleted);
+
+ // id already undeleted
+ verifyUndeleteFailure(undeletedKeys.iterator().next(), StoreErrorCodes.ID_Undeleted);
+
+ // id already deleted permanently
+ verifyUndeleteFailure(deletedAndShouldBeCompactedKeys.iterator().next(), StoreErrorCodes.ID_Deleted_Permanently);
+
+ // id already expired
+ id = put(1, PUT_RECORD_SIZE, time.seconds()).get(0);
+ verifyUndeleteFailure(id, StoreErrorCodes.ID_Not_Deleted);
+ delete(id);
+ time.sleep(2 * Time.MsPerSec);
+ verifyUndeleteFailure(id, StoreErrorCodes.TTL_Expired);
+ }
+
/**
* Test DELETE with valid accountId and containerId.
*/
@@ -790,12 +910,48 @@ public void ttlUpdateAuthorizationSuccessTest() throws Exception {
}
}
+ /**
+ * Test UNDELETE with valid accountId and containerId
+ * @throws Exception
+ */
+ @Test
+ public void undeleteAuthorizationSuccessTest() throws Exception {
+ assumeTrue(isLogSegmented);
+ short[] accountIds = {-1, Utils.getRandomShort(TestUtils.RANDOM), -1};
+ short[] containerIds = {-1, -1, Utils.getRandomShort(TestUtils.RANDOM)};
+ for (int i = 0; i < accountIds.length; i++) {
+ MockId mockId = put(1, PUT_RECORD_SIZE, Utils.Infinite_Time, accountIds[i], containerIds[i]).get(0);
+ delete(new MockId(mockId.getID(), mockId.getAccountId(), mockId.getContainerId()));
+ undelete(new MockId(mockId.getID(), mockId.getAccountId(), mockId.getContainerId()));
+ verifyUndeleteFailure(mockId, StoreErrorCodes.ID_Undeleted);
+ }
+ }
+
+ /**
+ * Test UNDELETE with invalid accountId/containerId. Failure is expected.
+ * @throws Exception
+ */
+ @Test
+ public void undeleteAuthorizationFailureTest() throws Exception {
+ assumeTrue(isLogSegmented);
+ MockId mockId = put(1, PUT_RECORD_SIZE, Utils.Infinite_Time).get(0);
+ delete(mockId);
+ short[] accountIds =
+ {-1, Utils.getRandomShort(TestUtils.RANDOM), -1, mockId.getAccountId(), Utils.getRandomShort(TestUtils.RANDOM)};
+ short[] containerIds = {-1, -1, Utils.getRandomShort(TestUtils.RANDOM), Utils.getRandomShort(TestUtils.RANDOM),
+ mockId.getContainerId()};
+ for (int i = 0; i < accountIds.length; i++) {
+ verifyUndeleteFailure(new MockId(mockId.getID(), accountIds[i], containerIds[i]),
+ StoreErrorCodes.Authorization_Failure);
+ }
+ }
+
/**
* Test various duplicate and collision cases for {@link BlobStore#put(MessageWriteSet)}
* @throws Exception
*/
@Test
- public void idCollisionTest() throws Exception {
+ public void putIdCollisionTest() throws Exception {
// Populate global lists of keys and crcs.
List allMockIdList = new ArrayList<>();
List allCrcList = new ArrayList<>();
@@ -928,6 +1084,9 @@ public void isKeyDeletedTest() throws StoreException {
for (MockId id : allKeys.keySet()) {
assertEquals("Returned state is not as expected", deletedKeys.contains(id), store.isKeyDeleted(id));
}
+ for (MockId id : deletedAndShouldBeCompactedKeys) {
+ assertTrue("Returned state is not as expected", store.isKeyDeleted(id));
+ }
// non existent id
try {
store.isKeyDeleted(getUniqueId());
@@ -997,7 +1156,7 @@ public void storeIoErrorCountTest() throws StoreException, IOException {
MessageInfo corruptedInfo = new MessageInfo(getUniqueId(), PUT_RECORD_SIZE, Utils.getRandomShort(TestUtils.RANDOM),
Utils.getRandomShort(TestUtils.RANDOM), Utils.Infinite_Time);
MessageInfo info1 =
- new MessageInfo(id1, PUT_RECORD_SIZE, 2 * 24 * 60 * 60 * 1000, id1.getAccountId(), id1.getContainerId(),
+ new MessageInfo(id1, PUT_RECORD_SIZE, 3 * 24 * 60 * 60 * 1000, id1.getAccountId(), id1.getContainerId(),
Utils.Infinite_Time);
MessageInfo info2 =
new MessageInfo(id2, PUT_RECORD_SIZE, id2.getAccountId(), id2.getContainerId(), Utils.Infinite_Time);
@@ -1359,7 +1518,7 @@ private MockId getUniqueId() {
private MockId getUniqueId(short accountId, short containerId) {
MockId id;
do {
- id = new MockId(UtilsTest.getRandomString(10), accountId, containerId);
+ id = new MockId(UtilsTest.getRandomString(MOCK_ID_STRING_LENGTH), accountId, containerId);
} while (generatedKeys.contains(id));
generatedKeys.add(id);
return id;
@@ -1424,17 +1583,41 @@ private List put(int count, long size, long expiresAtMs, short accountId
* @throws StoreException
*/
private MessageInfo delete(MockId idToDelete) throws StoreException {
+ return delete(idToDelete, time.milliseconds());
+ }
+
+ /**
+ * Deletes a blob with a given operationTimeMs
+ * @param idToDelete the {@link MockId} of the blob to DELETE.
+ * @param operationTimeMs the operationTimeMs in {@link MessageInfo}.
+ * @return the {@link MessageInfo} associated with the DELETE.
+ * @throws StoreException
+ */
+ private MessageInfo delete(MockId idToDelete, long operationTimeMs) throws StoreException {
MessageInfo putMsgInfo = allKeys.get(idToDelete).getFirst();
MessageInfo info =
new MessageInfo(idToDelete, DELETE_RECORD_SIZE, putMsgInfo.getAccountId(), putMsgInfo.getContainerId(),
- time.milliseconds());
+ operationTimeMs);
ByteBuffer buffer = ByteBuffer.wrap(DELETE_BUF);
store.delete(new MockMessageWriteSet(Collections.singletonList(info), Collections.singletonList(buffer)));
deletedKeys.add(idToDelete);
+ undeletedKeys.remove(idToDelete);
liveKeys.remove(idToDelete);
return info;
}
+ private short undelete(MockId idToUndelete) throws StoreException {
+ MessageInfo putMsgInfo = allKeys.get(idToUndelete).getFirst();
+ MessageInfo info =
+ new MessageInfo(idToUndelete, UNDELETE_RECORD_SIZE, putMsgInfo.getAccountId(), putMsgInfo.getContainerId(),
+ time.milliseconds());
+ short lifeVersion = store.undelete(info);
+ deletedKeys.remove(idToUndelete);
+ liveKeys.add(idToUndelete);
+ undeletedKeys.add(idToUndelete);
+ return lifeVersion;
+ }
+
/**
* Updates the TTL of a blob
* @param idToUpdate the {@link MockId} of the blob to update.
@@ -1473,6 +1656,8 @@ private void checkStoreInfo(StoreInfo storeInfo, Set expectedKeys) throw
assertEquals("ContainerId mismatch", expectedInfo.getContainerId(), messageInfo.getContainerId());
assertEquals("OperationTime mismatch", expectedInfo.getOperationTimeMs(), messageInfo.getOperationTimeMs());
assertEquals("isTTLUpdated not as expected", ttlUpdatedKeys.contains(id), messageInfo.isTtlUpdated());
+ assertEquals("isDeleted not as expected", deletedKeys.contains(id), messageInfo.isDeleted());
+ assertEquals("isUndeleted not as expected", undeletedKeys.contains(id), messageInfo.isUndeleted());
long expiresAtMs = ttlUpdatedKeys.contains(id) ? Utils.Infinite_Time : expectedInfo.getExpirationTimeInMs();
expiresAtMs = Utils.getTimeInMsToTheNearestSec(expiresAtMs);
assertEquals("Unexpected expiresAtMs in MessageInfo", expiresAtMs, messageInfo.getExpirationTimeInMs());
@@ -1510,14 +1695,16 @@ private void verifyGetFailure(MockId id, StoreErrorCodes expectedErrorCode) {
* individually. For understanding the created store, please read the source code which is annotated with comments.
* @param addTtlUpdates if {@code true}, adds ttl update entries (temporary until all components can handle TTL
* updates)
+ * @param addUndelete if {@code true}, adds undelete entries (temporary until all components can handle UNDELETE)
* @throws InterruptedException
* @throws StoreException
*/
- private void setupTestState(boolean addTtlUpdates) throws InterruptedException, StoreException {
+ private void setupTestState(boolean addTtlUpdates, boolean addUndelete) throws InterruptedException, StoreException {
long segmentCapacity = isLogSegmented ? SEGMENT_CAPACITY : LOG_CAPACITY;
properties.put("store.index.max.number.of.inmem.elements", Integer.toString(MAX_IN_MEM_ELEMENTS));
properties.put("store.segment.size.in.bytes", Long.toString(segmentCapacity));
properties.put("store.validate.authorization", "true");
+ properties.put("store.deleted.message.retention.days", Integer.toString(CuratedLogIndexState.deleteRetentionDay));
store = createBlobStore(getMockReplicaId(tempDirStr));
store.start();
// advance time by a second in order to be able to add expired keys and to avoid keys that are expired from
@@ -1606,10 +1793,15 @@ private void setupTestState(boolean addTtlUpdates) throws InterruptedException,
delete(idToDelete);
deletes++;
}
+ long sizeAdded = 0;
+ if (addUndelete) {
+ sizeAdded += addDeleteAndShouldCompactEntry();
+ sizeAdded += addCuratedUndeleteToLogSegment();
+ }
// 1 PUT entry that spans the rest of the data in the segment (upto a third of the segment size)
long size = sizeToWrite - (LogSegment.HEADER_SIZE + puts * PUT_RECORD_SIZE + deletes * DELETE_RECORD_SIZE
- + ttlUpdates * TTL_UPDATE_RECORD_SIZE);
+ + ttlUpdates * TTL_UPDATE_RECORD_SIZE) - sizeAdded;
addedId = put(1, size, Utils.Infinite_Time).get(0);
idsByLogSegment.get(2).add(addedId);
// the store counts the wasted space at the end of the second segment as "used capacity".
@@ -1622,6 +1814,76 @@ private void setupTestState(boolean addTtlUpdates) throws InterruptedException,
assertEquals("Store size not as expected", expectedStoreSize, store.getSizeInBytes());
}
+ /**
+ * Add a blob that will be deleted right away, the the deletes' operation time will be set to 0 so the delete would
+ * be fallen out of retention time.
+ * @throws StoreException
+ */
+ private long addDeleteAndShouldCompactEntry() throws StoreException {
+ // 1 Put entry and 1 delete that should be compacted
+ MockId addedId = put(1, PUT_RECORD_SIZE, Utils.Infinite_Time).get(0);
+ idsByLogSegment.get(2).add(addedId);
+ delete(addedId, (long) 0);
+ deletedAndShouldBeCompactedKeys.add(addedId);
+ return PUT_RECORD_SIZE + DELETE_RECORD_SIZE;
+ }
+
+ /**
+ * Add several undeleted blobs to cover some possible undeleted scenarios.
+ * @throws StoreException
+ */
+ private long addCuratedUndeleteToLogSegment() throws StoreException {
+ // Make sure we have these records
+ // 1. P, D -> U
+ // 2. P, T, D -> U
+ // 3. P, D, U, D -> U
+ // 4. P, D, U, T, D -> U
+
+ int puts = 0, ttls = 0, deletes = 0, undeletes = 0;
+ List ids = put(4, CuratedLogIndexState.PUT_RECORD_SIZE, Utils.Infinite_Time);
+ puts += 4;
+ for (MockId id : ids) {
+ idsByLogSegment.get(2).add(id);
+ }
+ MockId pd = ids.get(0);
+ MockId ptd = ids.get(1);
+ MockId pdud = ids.get(2);
+ MockId pdutd = ids.get(3);
+
+ // finish P, D
+ delete(pd);
+ deletes++;
+ // finish P, T, D
+ updateTtl(ptd);
+ ttls++;
+ delete(ptd);
+ deletes++;
+ // finish P, D, U, D
+ delete(pdud);
+ deletes++;
+ undelete(pdud);
+ undeletes++;
+ delete(pdud);
+ deletes++;
+ // finish P, D, U, T, D
+ delete(pdutd);
+ deletes++;
+ undelete(pdutd);
+ undeletes++;
+ updateTtl(pdutd);
+ ttls++;
+ delete(pdutd);
+ deletes++;
+
+ // add undelete to all of them
+ for (MockId id : ids) {
+ undelete(id);
+ }
+ undeletes += 4;
+ return puts * PUT_RECORD_SIZE + ttls * TTL_UPDATE_RECORD_SIZE + deletes * DELETE_RECORD_SIZE
+ + undeletes * UNDELETE_RECORD_SIZE;
+ }
+
/**
* Adds some curated data into the store in order to ensure a good mix for testing. For understanding the created
* store, please read the source code which is annotated with comments.
@@ -1980,6 +2242,24 @@ private void verifyTtlUpdateFutures(List ttlUpdaters, List undeleters, List> futures)
+ throws Exception {
+ for (int i = 0; i < undeleters.size(); i++) {
+ MockId id = undeleters.get(i).id;
+ Future future = futures.get(i);
+ future.get(1, TimeUnit.SECONDS);
+ verifyUndelete(id);
+ }
+ }
+
/**
* Verifies that {@code id} has been TTL updated
* @param id
@@ -1993,6 +2273,18 @@ private void verifyTtlUpdate(MockId id) throws Exception {
checkStoreInfo(storeInfo, Collections.singleton(id));
}
+ /**
+ * Verifies that {@code id} has undelete flag set to be true
+ * @param id
+ * @throws Exception
+ */
+ private void verifyUndelete(MockId id) throws Exception {
+ StoreInfo storeInfo = store.get(Collections.singletonList(id), EnumSet.noneOf(StoreGetOptions.class));
+ assertEquals("ID not as expected", id, storeInfo.getMessageReadSetInfo().get(0).getStoreKey());
+ assertTrue("Undelete flag not expected", storeInfo.getMessageReadSetInfo().get(0).isUndeleted());
+ checkStoreInfo(storeInfo, Collections.singleton(id));
+ }
+
// putErrorCasesTest() helpers
/**
@@ -2056,6 +2348,23 @@ private void verifyTtlUpdateFailure(MockId idToUpdate, long newExpiryTimeMs, Sto
}
}
+ /**
+ * Verifies that UNDELETE fails.
+ * @param idToUndelete the {@link MockId} to UNDELETE.
+ * @param expectedErrorCode the expected {@link StoreErrorCodes} for the failure.
+ */
+ private void verifyUndeleteFailure(MockId idToUndelete, StoreErrorCodes expectedErrorCode) {
+ MessageInfo info =
+ new MessageInfo(idToUndelete, UNDELETE_RECORD_SIZE, idToUndelete.getAccountId(), idToUndelete.getContainerId(),
+ time.milliseconds());
+ try {
+ store.undelete(info);
+ fail("Store UNDELETE should have failed for key " + idToUndelete);
+ } catch (StoreException e) {
+ assertEquals("Unexpected StoreErrorCode", expectedErrorCode, e.getErrorCode());
+ }
+ }
+
// shutdownTest() helpers
/**
diff --git a/build.gradle b/build.gradle
index 6be39c4646..0e3303edd5 100644
--- a/build.gradle
+++ b/build.gradle
@@ -243,8 +243,8 @@ project(':ambry-server') {
project(':ambry-store') {
dependencies {
compile project(':ambry-api'),
- project(':ambry-utils')
- compile "io.dropwizard.metrics:metrics-core:$metricsVersion"
+ project(':ambry-utils'),
+ project(':ambry-messageformat')
compile "net.smacke:jaydio:$jaydioVersion"
testCompile project(':ambry-clustermap')
testCompile project(':ambry-clustermap').sourceSets.test.output