Skip to content

Commit

Permalink
Add undelete method to BlobStore (#1373)
Browse files Browse the repository at this point in the history
Add undelete method to BlobStore.

Co-authored-by: David Harju <david.a.harju@gmail.com>
  • Loading branch information
justinlin-linkedin and dharju authored Feb 10, 2020
1 parent bc91abd commit efc5d89
Show file tree
Hide file tree
Showing 14 changed files with 558 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,16 @@ void onBlobCreated(String blobId, BlobProperties blobProperties, Account account
*/
void onBlobDeleted(String blobId, String serviceId, Account account, Container container);

/**
* Notifies the underlying system when the blob is undeleted.
* @param blobId The id of the blob whose undeleted state has been replicated
* @param serviceId The service ID of the service undeleting the blob. This can be null if unknown.
* @param account The {@link Account} for the blob
* @param container The {@link Container} for the blob
*/
default void onBlobUndeleted(String blobId, String serviceId, Account account, Container container) {
}

/**
* Notifies the underlying system when a blob is replicated to a node
* @param sourceHost The source host from where the notification is being invoked
Expand Down Expand Up @@ -86,4 +96,14 @@ void onBlobCreated(String blobId, BlobProperties blobProperties, Account account
*/
void onBlobReplicaUpdated(String sourceHost, int port, String blobId, BlobReplicaSourceType sourceType,
UpdateType updateType, MessageInfo info);

/**
* Notifies the underlying system when a undeleted state of a blob is replicated to a node.
* @param sourceHost The source host from where the notification is being invoked
* @param port The port of the source host from where the notification is being invoked.
* @param blobId The id of the blob whose undeleted state has been replicated
* @param sourceType The source that undeleted the blob replica
*/
default void onBlobReplicaUndeleted(String sourceHost, int port, String blobId, BlobReplicaSourceType sourceType) {
}
}
7 changes: 7 additions & 0 deletions ambry-api/src/main/java/com.github.ambry/store/Store.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,13 @@ public interface Store {
*/
void delete(MessageWriteSet messageSetToDelete) throws StoreException;

/**
* Undelete the blob identified by {@code id}.
* @param info The {@link MessageInfo} that carries some basic information about this operation.
* @return the lifeVersion of the undeleted blob.
*/
short undelete(MessageInfo info) throws StoreException;

/**
* Updates the TTL of all the messages that are part of the message set
* @param messageSetToUpdate The list of messages that need to be updated
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,11 @@ public void delete(MessageWriteSet messageSetToDelete) throws StoreException {
}
}

@Override
public short undelete(MessageInfo info) throws StoreException {
throw new UnsupportedOperationException("Undelete not supported in cloud store");
}

/**
* {@inheritDoc}
* Currently, the only supported operation is to set the TTL to infinite (i.e. no arbitrary increase or decrease)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,22 +64,36 @@ public void onBlobDeleted(String blobId, String serviceId, Account account, Cont
: container.getName()) + ", containerId " + (container == null ? null : container.getId()));
}

@Override
public void onBlobUndeleted(String blobId, String serviceId, Account account, Container container) {
logger.debug("onBlobUndeleted " + blobId,
", " + serviceId + ", accountName " + (account == null ? null : account.getName()) + ", accountId" + (
account == null ? null : account.getId()) + ", containerName " + (container == null ? null
: container.getName()) + ", containerId " + (container == null ? null : container.getId()));

}

@Override
public void onBlobReplicaCreated(String sourceHost, int port, String blobId, BlobReplicaSourceType sourceType) {
logger.debug("onBlobReplicaCreated " + sourceHost + ", " + port + ", " + blobId + ", " + sourceType);
}

@Override
public void onBlobReplicaDeleted(String sourceHost, int port, String blobId, BlobReplicaSourceType sourceType) {
logger.debug("onBlobReplicaCreated " + sourceHost + ", " + port + ", " + blobId + ", " + sourceType);
logger.debug("onBlobReplicaDeleted " + sourceHost + ", " + port + ", " + blobId + ", " + sourceType);
}

@Override
public void onBlobReplicaUpdated(String sourceHost, int port, String blobId, BlobReplicaSourceType sourceType,
UpdateType updateType, MessageInfo info) {
logger.debug(
"onBlobReplicaCreated " + sourceHost + ", " + port + ", " + blobId + ", " + sourceType + ", " + updateType
"onBlobReplicaUpdated " + sourceHost + ", " + port + ", " + blobId + ", " + sourceType + ", " + updateType
+ ", " + info);
}

@Override
public void onBlobReplicaUndeleted(String sourceHost, int port, String blobId, BlobReplicaSourceType sourceType) {
logger.debug("onBlobReplicaUndeleted " + sourceHost + ", " + port + ", " + blobId + ", " + sourceType);
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,11 @@ public void updateTtl(MessageWriteSet messageSetToUpdate) throws StoreException
}
}

@Override
public short undelete(MessageInfo info) throws StoreException {
throw new UnsupportedOperationException("Undelete unsupported for now");
}

@Override
public FindInfo findEntriesSince(FindToken token, long maxSizeOfEntries) throws StoreException {
// unused function
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -460,6 +460,11 @@ public void onBlobDeleted(String blobId, String serviceId, Account account, Cont
throw new IllegalStateException("Not implemented");
}

@Override
public void onBlobUndeleted(String blobId, String serviceId, Account account, Container container) {
throw new IllegalStateException("Not implemented");
}

@Override
public void onBlobReplicaCreated(String sourceHost, int port, String blobId, BlobReplicaSourceType sourceType) {
throw new IllegalStateException("Not implemented");
Expand All @@ -476,6 +481,11 @@ public void onBlobReplicaUpdated(String sourceHost, int port, String blobId, Blo
throw new IllegalStateException("Not implemented");
}

@Override
public void onBlobReplicaUndeleted(String sourceHost, int port, String blobId, BlobReplicaSourceType sourceType) {
throw new IllegalStateException("Not implemented");
}

@Override
public void close() {
// no op.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,7 @@ class EventTracker {
private final int numberOfReplicas;
private final Helper creationHelper;
private final Helper deletionHelper;
private final Helper undeleteHelper;
private final ConcurrentMap<UpdateType, Helper> updateHelpers = new ConcurrentHashMap<>();

/**
Expand Down Expand Up @@ -440,6 +441,7 @@ private String getKey(String host, int port) {
numberOfReplicas = expectedNumberOfReplicas;
creationHelper = new Helper();
deletionHelper = new Helper();
undeleteHelper = new Helper();
}

/**
Expand All @@ -460,6 +462,15 @@ void trackDeletion(String host, int port) {
deletionHelper.track(host, port);
}

/**
* Tracks the undelete event that arrived on {@code host}:{@code port}.
* @param host the host that received the undelete
* @param port the port of the host that describes the instance along with {@code host}.
*/
void trackUndelete(String host, int port) {
undeleteHelper.track(host, port);
}

/**
* Tracks the update event of type {@code updateType} that arrived on {@code host}:{@code port}.
* @param host the host that received the update
Expand Down Expand Up @@ -564,6 +575,11 @@ public void onBlobDeleted(String blobId, String serviceId, Account account, Cont
// ignore
}

@Override
public void onBlobUndeleted(String blobId, String serviceId, Account account, Container container) {
// ignore
}

@Override
public synchronized void onBlobReplicaCreated(String sourceHost, int port, String blobId,
BlobReplicaSourceType sourceType) {
Expand All @@ -585,6 +601,12 @@ public synchronized void onBlobReplicaUpdated(String sourceHost, int port, Strin
.trackUpdate(sourceHost, port, updateType);
}

@Override
public void onBlobReplicaUndeleted(String sourceHost, int port, String blobId, BlobReplicaSourceType sourceType) {
objectTracker.computeIfAbsent(blobId, k -> new EventTracker(getNumReplicas(blobId)))
.trackUndelete(sourceHost, port);
}

@Override
public void close() {
// ignore
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@
import com.github.ambry.config.DiskManagerConfig;
import com.github.ambry.config.StoreConfig;
import com.github.ambry.config.VerifiableProperties;
import com.github.ambry.messageformat.MessageFormatInputStream;
import com.github.ambry.messageformat.MessageFormatWriteSet;
import com.github.ambry.messageformat.UndeleteMessageFormatInputStream;
import com.github.ambry.protocol.RequestOrResponseType;
import com.github.ambry.replication.FindToken;
import com.github.ambry.replication.FindTokenHelper;
Expand Down Expand Up @@ -134,6 +137,31 @@ public void updateTtl(MessageWriteSet messageSetToUpdate) throws StoreException
messageSetToUpdate.getMessageSetInfo().stream().map(MessageInfo::getStoreKey).collect(Collectors.toList()));
}

@Override
public short undelete(MessageInfo info) throws StoreException {
operationReceived = RequestOrResponseType.UndeleteRequest;
try {
MessageFormatInputStream stream =
new UndeleteMessageFormatInputStream(info.getStoreKey(), info.getAccountId(), info.getContainerId(),
info.getOperationTimeMs(), (short) returnValueOfUndelete);
// Update info to add stream size;
info = new MessageInfo(info.getStoreKey(), stream.getSize(), info.getAccountId(), info.getContainerId(),
info.getOperationTimeMs());
ArrayList<MessageInfo> infoList = new ArrayList<>();
infoList.add(info);
messageWriteSetReceived = new MessageFormatWriteSet(stream, infoList, false);
} catch (Exception e) {
throw new StoreException("Unknown error while trying to undelete blobs from store", e,
StoreErrorCodes.Unknown_Error);
}
throwExceptionIfRequired();
checkValidityOfIds(messageWriteSetReceived.getMessageSetInfo()
.stream()
.map(MessageInfo::getStoreKey)
.collect(Collectors.toList()));
return returnValueOfUndelete;
}

@Override
public FindInfo findEntriesSince(FindToken token, long maxTotalSizeOfEntries) throws StoreException {
operationReceived = RequestOrResponseType.ReplicaMetadataRequest;
Expand Down Expand Up @@ -324,6 +352,11 @@ private void checkValidityOfIds(Collection<? extends StoreKey> ids) throws Store
* The return value for a call to {@link #removeBlobStore(PartitionId)}.
*/
boolean returnValueOfRemoveBlobStore = true;

/**
* The return value for a call to {@link TestStore#undelete(MessageInfo)}.
*/
short returnValueOfUndelete = 1;
/**
* The {@link PartitionId} that was provided in the call to {@link #scheduleNextForCompaction(PartitionId)}
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import com.github.ambry.replication.FindToken;
import com.github.ambry.replication.MockReplicationManager;
import com.github.ambry.store.FindInfo;
import com.github.ambry.store.MessageInfo;
import com.github.ambry.store.MessageWriteSet;
import com.github.ambry.store.MockStoreKeyConverterFactory;
import com.github.ambry.store.StorageManager;
Expand Down Expand Up @@ -707,6 +708,11 @@ public void delete(MessageWriteSet messageSetToDelete) throws StoreException {
throw new IllegalStateException("Not implemented");
}

@Override
public short undelete(MessageInfo info) throws StoreException {
throw new IllegalStateException("Not implemented");
}

@Override
public void updateTtl(MessageWriteSet messageSetToUpdate) throws StoreException {
throw new IllegalStateException("Not implemented");
Expand Down
68 changes: 68 additions & 0 deletions ambry-store/src/main/java/com.github.ambry.store/BlobStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
import com.github.ambry.clustermap.ReplicaState;
import com.github.ambry.clustermap.ReplicaStatusDelegate;
import com.github.ambry.config.StoreConfig;
import com.github.ambry.messageformat.MessageFormatInputStream;
import com.github.ambry.messageformat.MessageFormatWriteSet;
import com.github.ambry.messageformat.UndeleteMessageFormatInputStream;
import com.github.ambry.replication.FindToken;
import com.github.ambry.utils.FileLock;
import com.github.ambry.utils.Time;
Expand Down Expand Up @@ -612,6 +615,71 @@ public void updateTtl(MessageWriteSet messageSetToUpdate) throws StoreException
}
}

@Override
public short undelete(MessageInfo info) throws StoreException {
checkStarted();
final Timer.Context context = metrics.undeleteResponse.time();
try {
StoreKey id = info.getStoreKey();
Offset indexEndOffsetBeforeCheck = index.getCurrentEndOffset();
List<IndexValue> values = index.findAllIndexValuesForKey(id, null);
index.validateSanityForUndelete(id, values, IndexValue.LIFE_VERSION_FROM_FRONTEND);
IndexValue latestValue = values.get(0);
short lifeVersion = (short) (latestValue.getLifeVersion() + 1);
MessageFormatInputStream stream =
new UndeleteMessageFormatInputStream(id, info.getAccountId(), info.getContainerId(),
info.getOperationTimeMs(), lifeVersion);
// Update info to add stream size;
info =
new MessageInfo(id, stream.getSize(), info.getAccountId(), info.getContainerId(), info.getOperationTimeMs());
ArrayList<MessageInfo> infoList = new ArrayList<>();
infoList.add(info);
MessageFormatWriteSet writeSet = new MessageFormatWriteSet(stream, infoList, false);
if (!info.getStoreKey().isAccountContainerMatch(latestValue.getAccountId(), latestValue.getContainerId())) {
if (config.storeValidateAuthorization) {
throw new StoreException(
"UNDELETE authorization failure. Key: " + info.getStoreKey() + " Actually accountId: "
+ latestValue.getAccountId() + "Actually containerId: " + latestValue.getContainerId(),
StoreErrorCodes.Authorization_Failure);
} else {
logger.warn("UNDELETE authorization failure. Key: {} Actually accountId: {} Actually containerId: {}",
info.getStoreKey(), latestValue.getAccountId(), latestValue.getContainerId());
metrics.undeleteAuthorizationFailureCount.inc();
}
}
synchronized (storeWriteLock) {
Offset currentIndexEndOffset = index.getCurrentEndOffset();
if (!currentIndexEndOffset.equals(indexEndOffsetBeforeCheck)) {
FileSpan fileSpan = new FileSpan(indexEndOffsetBeforeCheck, currentIndexEndOffset);
IndexValue value =
index.findKey(info.getStoreKey(), fileSpan, EnumSet.allOf(PersistentIndex.IndexEntryType.class));
if (value != null) {
throw new StoreException("Cannot undelete id " + info.getStoreKey() + " since concurrent operation occurs",
StoreErrorCodes.Life_Version_Conflict);
}
}
Offset endOffsetOfLastMessage = log.getEndOffset();
writeSet.writeTo(log);
logger.trace("Store : {} undelete mark written to log", dataDir);
FileSpan fileSpan = log.getFileSpanForMessage(endOffsetOfLastMessage, info.getSize());
index.markAsUndeleted(info.getStoreKey(), fileSpan, info.getOperationTimeMs());
// TODO: update blobstore stats for undelete (2020-02-10)
}
onSuccess();
return lifeVersion;
} catch (StoreException e) {
if (e.getErrorCode() == StoreErrorCodes.IOError) {
onError();
}
throw e;
} catch (Exception e) {
throw new StoreException("Unknown error while trying to undelete blobs from store " + dataDir, e,
StoreErrorCodes.Unknown_Error);
} finally {
context.stop();
}
}

@Override
public FindInfo findEntriesSince(FindToken token, long maxTotalSizeOfEntries) throws StoreException {
checkStarted();
Expand Down
Loading

0 comments on commit efc5d89

Please sign in to comment.