Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Undelete blob store #1338

Closed
wants to merge 19 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,14 @@ public interface RequestAPI {
*/
void handleDeleteRequest(Request request) throws IOException, InterruptedException;

/**
* Undeletes the blob from the store.
* @param request The request that contains the partition and id of the blob that needs to be undeleted.
* @throws IOException if there are I/O errors carrying our the required operation.
* @throws InterruptedException if request processing is interrupted.
*/
void handleUndeleteRequest(Request request) throws IOException, InterruptedException;

/**
* Updates the TTL of a blob as required in {@code request}.
* @param request The request that contains the partition and id of the blob that needs to be updated.
Expand Down
7 changes: 7 additions & 0 deletions ambry-api/src/main/java/com.github.ambry/store/Store.java
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,13 @@ public interface Store {
*/
void updateTtl(MessageWriteSet messageSetToUpdate) throws StoreException;

/**
* Undeletes all the messages that are part of the message set
* @param messageSetToUndelete The list of messages that need to be deleted
* @throws StoreException
*/
void undelete(MessageWriteSet messageSetToUndelete) throws StoreException;

/**
* Finds all the entries from the store given a find token
* @param token The token that acts as a bookmark to make subsequent searches
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,5 +35,7 @@ public enum StoreErrorCodes {
Already_Updated,
Update_Not_Allowed,
File_Not_Found,
Channel_Closed
Channel_Closed,
Life_Version_Conflict,
ID_Not_Deleted
}
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,11 @@ public void updateTtl(MessageWriteSet messageSetToUpdate) throws StoreException
}
}

@Override
public void undelete(MessageWriteSet messageSetToUndelete) throws StoreException {
throw new UnsupportedOperationException("Undelete is unsupported in CloudBlobStore");
}

/**
* Add a blob state mapping to the recent blob cache.
* @param blobKey the blob key to cache.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,22 +31,34 @@
*
*/
public class DeleteMessageFormatInputStream extends MessageFormatInputStream {
public DeleteMessageFormatInputStream(StoreKey key, short accountId, short containerId, long deletionTimeMs)
throws MessageFormatException {
public DeleteMessageFormatInputStream(StoreKey key, short accountId, short containerId, long deletionTimeMs,
short lifeVersion) throws MessageFormatException {
int headerSize = MessageFormatRecord.getHeaderSizeForVersion(MessageFormatRecord.headerVersionToUse);
int deleteRecordSize = MessageFormatRecord.Update_Format_V3.getRecordSize(SubRecord.Type.DELETE);
buffer = ByteBuffer.allocate(headerSize + key.sizeInBytes() + deleteRecordSize);
if (MessageFormatRecord.headerVersionToUse == MessageFormatRecord.Message_Header_Version_V1) {
MessageFormatRecord.MessageHeader_Format_V1.serializeHeader(buffer, deleteRecordSize,
MessageFormatRecord.Message_Header_Invalid_Relative_Offset, headerSize + key.sizeInBytes(),
MessageFormatRecord.Message_Header_Invalid_Relative_Offset,
MessageFormatRecord.Message_Header_Invalid_Relative_Offset);
} else {
MessageFormatRecord.MessageHeader_Format_V2.serializeHeader(buffer, deleteRecordSize,
MessageFormatRecord.Message_Header_Invalid_Relative_Offset,
MessageFormatRecord.Message_Header_Invalid_Relative_Offset, headerSize + key.sizeInBytes(),
MessageFormatRecord.Message_Header_Invalid_Relative_Offset,
MessageFormatRecord.Message_Header_Invalid_Relative_Offset);
switch (MessageFormatRecord.headerVersionToUse) {
case MessageFormatRecord.Message_Header_Version_V1:
MessageFormatRecord.MessageHeader_Format_V1.serializeHeader(buffer, deleteRecordSize,
MessageFormatRecord.Message_Header_Invalid_Relative_Offset, headerSize + key.sizeInBytes(),
MessageFormatRecord.Message_Header_Invalid_Relative_Offset,
MessageFormatRecord.Message_Header_Invalid_Relative_Offset);
break;
case MessageFormatRecord.Message_Header_Version_V2:
MessageFormatRecord.MessageHeader_Format_V2.serializeHeader(buffer, deleteRecordSize,
MessageFormatRecord.Message_Header_Invalid_Relative_Offset,
MessageFormatRecord.Message_Header_Invalid_Relative_Offset, headerSize + key.sizeInBytes(),
MessageFormatRecord.Message_Header_Invalid_Relative_Offset,
MessageFormatRecord.Message_Header_Invalid_Relative_Offset);
break;
case MessageFormatRecord.Message_Header_Version_V3:
MessageFormatRecord.MessageHeader_Format_V3.serializeHeader(buffer, lifeVersion, deleteRecordSize,
MessageFormatRecord.Message_Header_Invalid_Relative_Offset,
MessageFormatRecord.Message_Header_Invalid_Relative_Offset, headerSize + key.sizeInBytes(),
MessageFormatRecord.Message_Header_Invalid_Relative_Offset,
MessageFormatRecord.Message_Header_Invalid_Relative_Offset);
break;
default:
throw new IllegalStateException("Unexpected Message Header: " + MessageFormatRecord.headerVersionToUse);
}
buffer.put(key.toBytes());
// set the message as deleted
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,29 @@
*/
public class TtlUpdateMessageFormatInputStream extends MessageFormatInputStream {
public TtlUpdateMessageFormatInputStream(StoreKey key, short accountId, short containerId, long expiresAtMs,
long updateTimeInMs) throws MessageFormatException {
long updateTimeInMs, short lifeVersion) throws MessageFormatException {
int headerSize = MessageFormatRecord.getHeaderSizeForVersion(MessageFormatRecord.headerVersionToUse);
int recordSize = MessageFormatRecord.Update_Format_V3.getRecordSize(SubRecord.Type.TTL_UPDATE);
buffer = ByteBuffer.allocate(headerSize + key.sizeInBytes() + recordSize);
MessageFormatRecord.MessageHeader_Format_V2.serializeHeader(buffer, recordSize,
MessageFormatRecord.Message_Header_Invalid_Relative_Offset,
MessageFormatRecord.Message_Header_Invalid_Relative_Offset, headerSize + key.sizeInBytes(),
MessageFormatRecord.Message_Header_Invalid_Relative_Offset,
MessageFormatRecord.Message_Header_Invalid_Relative_Offset);
switch (MessageFormatRecord.headerVersionToUse) {
case MessageFormatRecord.Message_Header_Version_V2:
MessageFormatRecord.MessageHeader_Format_V2.serializeHeader(buffer, recordSize,
MessageFormatRecord.Message_Header_Invalid_Relative_Offset,
MessageFormatRecord.Message_Header_Invalid_Relative_Offset, headerSize + key.sizeInBytes(),
MessageFormatRecord.Message_Header_Invalid_Relative_Offset,
MessageFormatRecord.Message_Header_Invalid_Relative_Offset);
break;
case MessageFormatRecord.Message_Header_Version_V3:
MessageFormatRecord.MessageHeader_Format_V3.serializeHeader(buffer, lifeVersion, recordSize,
MessageFormatRecord.Message_Header_Invalid_Relative_Offset,
MessageFormatRecord.Message_Header_Invalid_Relative_Offset, headerSize + key.sizeInBytes(),
MessageFormatRecord.Message_Header_Invalid_Relative_Offset,
MessageFormatRecord.Message_Header_Invalid_Relative_Offset);
break;
default:
throw new IllegalStateException("Unexpected Message Header: " + MessageFormatRecord.headerVersionToUse);
}

buffer.put(key.toBytes());
MessageFormatRecord.Update_Format_V3.serialize(buffer,
new UpdateRecord(accountId, containerId, updateTimeInMs, new TtlUpdateSubRecord(expiresAtMs)));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/**
* Copyright 2019 LinkedIn Corp. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package com.github.ambry.messageformat;

import com.github.ambry.store.StoreKey;
import java.nio.ByteBuffer;


/**
* Represents a Undelete message.
*
* - - - - - - - - - - - - -
* | Message Header |
* - - - - - - - - - - - - -
* | blob key |
* - - - - - - - - - - - - -
* | update Record |
* - - - - - - - - - - - - -
*
*/
public class UndeleteMessageFormatInputStream extends MessageFormatInputStream {
public UndeleteMessageFormatInputStream(StoreKey key, short accountId, short containerId,
long updateTimeInMs, short lifeVersion) throws MessageFormatException {
int headerSize = MessageFormatRecord.getHeaderSizeForVersion(MessageFormatRecord.headerVersionToUse);
int recordSize = MessageFormatRecord.Update_Format_V3.getRecordSize(SubRecord.Type.UNDELETE);
buffer = ByteBuffer.allocate(headerSize + key.sizeInBytes() + recordSize);
MessageFormatRecord.MessageHeader_Format_V3.serializeHeader(buffer, lifeVersion, recordSize,
MessageFormatRecord.Message_Header_Invalid_Relative_Offset,
MessageFormatRecord.Message_Header_Invalid_Relative_Offset, headerSize + key.sizeInBytes(),
MessageFormatRecord.Message_Header_Invalid_Relative_Offset,
MessageFormatRecord.Message_Header_Invalid_Relative_Offset);
buffer.put(key.toBytes());
MessageFormatRecord.Update_Format_V3.serialize(buffer,
new UpdateRecord(accountId, containerId, updateTimeInMs, new UndeleteSubRecord()));
messageLength = buffer.capacity();
buffer.flip();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -107,14 +107,15 @@ public ArrayList<Long> initialize(short[] blobVersions, BlobType[] blobTypes)
MessageFormatInputStream msg3t;
if (MessageFormatRecord.headerVersionToUse >= MessageFormatRecord.Message_Header_Version_V2) {
msg3t =
new TtlUpdateMessageFormatInputStream(keys[1], accountId, containerId, Utils.Infinite_Time, updateTimeMs);
new TtlUpdateMessageFormatInputStream(keys[1], accountId, containerId, Utils.Infinite_Time, updateTimeMs,
(short) 0);
} else {
msg3t = getPutMessage(keys[5], ByteBuffer.wrap(encryptionKey), blobProperties, usermetadata, blob, BLOB_SIZE,
blobVersions[0], blobTypes[0]);
}

DeleteMessageFormatInputStream msg4d =
new DeleteMessageFormatInputStream(keys[1], accountId, containerId, updateTimeMs);
new DeleteMessageFormatInputStream(keys[1], accountId, containerId, updateTimeMs, (short) 0);

MessageFormatInputStream msg5 =
getPutMessage(keys[3], ByteBuffer.wrap(encryptionKey), blobProperties, usermetadata, blob, BLOB_SIZE,
Expand All @@ -127,7 +128,8 @@ public ArrayList<Long> initialize(short[] blobVersions, BlobType[] blobTypes)
MessageFormatInputStream msg7t;
if (MessageFormatRecord.headerVersionToUse >= MessageFormatRecord.Message_Header_Version_V2) {
msg7t =
new TtlUpdateMessageFormatInputStream(keys[0], accountId, containerId, Utils.Infinite_Time, updateTimeMs);
new TtlUpdateMessageFormatInputStream(keys[0], accountId, containerId, Utils.Infinite_Time, updateTimeMs,
(short) 0);
} else {
msg7t = getPutMessage(keys[6], ByteBuffer.wrap(encryptionKey), blobProperties, usermetadata, blob, BLOB_SIZE,
blobVersions[0], blobTypes[0]);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ public void initialize() throws MessageFormatException, IOException {
MessageFormatInputStream msg4;
if (MessageFormatRecord.headerVersionToUse >= MessageFormatRecord.Message_Header_Version_V2) {
msg4 = new TtlUpdateMessageFormatInputStream(keys[1], keys[1].getAccountId(), keys[1].getContainerId(),
Utils.Infinite_Time, updateTimeInMs);
Utils.Infinite_Time, updateTimeInMs, (short) 0);
} else {
msg4 = new PutMessageFormatInputStream(keys[3], ByteBuffer.wrap(encryptionKey),
new BlobProperties(4000, "test", keys[3].getAccountId(), keys[3].getContainerId(), false),
Expand All @@ -116,14 +116,15 @@ public void initialize() throws MessageFormatException, IOException {

// 5th message
DeleteMessageFormatInputStream msg5 =
new DeleteMessageFormatInputStream(keys[1], keys[1].getAccountId(), keys[1].getContainerId(), updateTimeInMs);
new DeleteMessageFormatInputStream(keys[1], keys[1].getAccountId(), keys[1].getContainerId(), updateTimeInMs,
(short) 0);
sizes.add(msg5.getSize());

// 6th message
MessageFormatInputStream msg6;
if (MessageFormatRecord.headerVersionToUse >= MessageFormatRecord.Message_Header_Version_V2) {
msg6 = new TtlUpdateMessageFormatInputStream(keys[0], keys[0].getAccountId(), keys[0].getContainerId(),
Utils.Infinite_Time, updateTimeInMs);
Utils.Infinite_Time, updateTimeInMs, (short) 0);
} else {
msg6 = new PutMessageFormatInputStream(keys[4], ByteBuffer.wrap(encryptionKey),
new BlobProperties(4000, "test", keys[4].getAccountId(), keys[4].getContainerId(), false),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ public void messageFormatDeleteRecordTest() throws IOException, MessageFormatExc
deleteRecordSize = MessageFormatRecord.Update_Format_V2.getRecordSize();
useV2Header = MessageFormatRecord.headerVersionToUse == MessageFormatRecord.Message_Header_Version_V2;
} else {
messageFormatStream = new DeleteMessageFormatInputStream(key, accountId, containerId, deletionTimeMs);
messageFormatStream = new DeleteMessageFormatInputStream(key, accountId, containerId, deletionTimeMs, (short) 0);
deleteRecordSize = MessageFormatRecord.Update_Format_V3.getRecordSize(SubRecord.Type.DELETE);
useV2Header = MessageFormatRecord.headerVersionToUse == MessageFormatRecord.Message_Header_Version_V2;
}
Expand Down Expand Up @@ -359,7 +359,7 @@ public void messageFormatTtlUpdateRecordTest() throws IOException, MessageFormat
long ttlUpdateTimeMs = SystemTime.getInstance().milliseconds() + TestUtils.RANDOM.nextInt();
long updatedExpiryMs = ttlUpdateTimeMs + TestUtils.RANDOM.nextInt();
MessageFormatInputStream messageFormatStream =
new TtlUpdateMessageFormatInputStream(key, accountId, containerId, updatedExpiryMs, ttlUpdateTimeMs);
new TtlUpdateMessageFormatInputStream(key, accountId, containerId, updatedExpiryMs, ttlUpdateTimeMs, (short) 0);
long ttlUpdateRecordSize = MessageFormatRecord.Update_Format_V3.getRecordSize(SubRecord.Type.TTL_UPDATE);
int headerSize = MessageFormatRecord.getHeaderSizeForVersion(MessageFormatRecord.headerVersionToUse);
Assert.assertEquals(headerSize + ttlUpdateRecordSize + key.sizeInBytes(), messageFormatStream.getSize());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -681,7 +681,7 @@ private void testDeletedRecords(short blobVersion, BlobType blobType) throws Exc
containerId = Utils.getRandomShort(RANDOM);
long deletionTimeMs = SystemTime.getInstance().milliseconds() + RANDOM.nextInt();
MessageFormatInputStream messageFormatStream2 =
new DeleteMessageFormatInputStream(key2, accountId, containerId, deletionTimeMs);
new DeleteMessageFormatInputStream(key2, accountId, containerId, deletionTimeMs, (short) 0);

MessageInfo msgInfo2 =
new MessageInfo(key2, messageFormatStream2.getSize(), accountId, containerId, deletionTimeMs);
Expand Down
Loading