Stitched blobs support different sized component blobs #1175

Merged Jun 10, 2019 · 15 commits · changes from 13 commits shown
RouterConfig.java
@@ -193,6 +193,13 @@ public class RouterConfig {
@Default("5")
public final short routerBlobidCurrentVersion;

/**
* The version to use for new metadata blobs.
*/
@Config("router.metadata.content.version")
Contributor: Is it better to call this router.metadata.blob.content.version?

Contributor (Author): I like router.metadata.content.version because it matches the terminology used in the Metadata_Content_Format_* classes.

@Default("2")
public final short routerMetadataContentVersion;

/**
* The KeyManagementServiceFactory that will be used to fetch {@link com.github.ambry.router.KeyManagementService}
*/
@@ -292,6 +299,8 @@ public RouterConfig(VerifiableProperties verifiableProperties) {
routerBlobidCurrentVersion =
verifiableProperties.getShortFromAllowedValues("router.blobid.current.version", (short) 6,
new Short[]{1, 2, 3, 4, 5, 6});
routerMetadataContentVersion =
verifiableProperties.getShortFromAllowedValues("router.metadata.content.version", (short) 2, new Short[]{2, 3});
routerKeyManagementServiceFactory =
verifiableProperties.getString("router.key.management.service.factory", DEFAULT_KMS_FACTORY);
routerCryptoServiceFactory =
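A minimal sketch of enabling the new format. The property name and allowed values come from the diff above; the package locations and the extra required router properties are assumptions:

```java
import java.util.Properties;
import com.github.ambry.config.RouterConfig;          // package location assumed
import com.github.ambry.config.VerifiableProperties;  // package location assumed

public class MetadataVersionConfigSketch {
  public static void main(String[] args) {
    Properties props = new Properties();
    // These two properties have no defaults in RouterConfig (assumed), so set them first.
    props.setProperty("router.hostname", "localhost");
    props.setProperty("router.datacenter.name", "DC1");
    // Opt into V3 metadata content, needed for stitched blobs with different
    // sized component blobs; V2 stays the default and only 2 and 3 are allowed.
    props.setProperty("router.metadata.content.version", "3");
    RouterConfig config = new RouterConfig(new VerifiableProperties(props));
    System.out.println(config.routerMetadataContentVersion); // prints 3
  }
}
```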
CompositeBlobInfo.java
@@ -15,17 +15,25 @@
package com.github.ambry.messageformat;

import com.github.ambry.store.StoreKey;
import com.github.ambry.utils.Pair;
import java.util.AbstractList;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;


/**
* This class holds information about a composite blob parsed from the metadata blob. It contains the chunk size,
* total composite blob size, and a list of keys for the blob's data chunks.
*/
public class CompositeBlobInfo {

private final int chunkSize;
Contributor: Are chunkSize/keys still needed after introducing StoreKeyAndSizeAndOffset?

Contributor (Author): Chunk size would still be used for deserializing V2 metadata content, and keys is used so that the getKeys() call doesn't have to parse the StoreKeyAndSizeAndOffset object every time it is called.

private final long totalSize;
private final List<StoreKey> keys;
private final List<Long> offsets = new ArrayList<>();
private final List<ChunkMetadata> chunkMetadataList = new ArrayList<>();
private final short metadataContentVersion;

/**
* Construct a {@link CompositeBlobInfo} object.
@@ -35,8 +43,81 @@ public class CompositeBlobInfo {
*/
public CompositeBlobInfo(int chunkSize, long totalSize, List<StoreKey> keys) {
this.chunkSize = chunkSize;
if (chunkSize < 1L) {
throw new IllegalArgumentException("chunkSize cannot be 0 or less");
}
if (keys == null || keys.isEmpty()) {
throw new IllegalArgumentException("keys should not be null or empty");
}
long rightAmountOfKeys = totalSize / chunkSize + (totalSize % chunkSize == 0 ? 0 : 1);
if (rightAmountOfKeys != keys.size()) {
throw new IllegalArgumentException("keys should contain " + rightAmountOfKeys + " keys, not " + keys.size());
}
this.totalSize = totalSize;
this.keys = keys;
metadataContentVersion = MessageFormatRecord.Metadata_Content_Version_V2;
long last = 0;
for (StoreKey key : keys) {
offsets.add(last);
if (totalSize - last < 1L) {
throw new IllegalArgumentException("CompositeBlobInfo can't be composed of blobs with size 0 or less");
}
chunkMetadataList.add(new ChunkMetadata(key, last, Math.min(chunkSize, totalSize - last)));
last += chunkSize;
}
}
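For example, under this V2-style constructor, totalSize = 130 with chunkSize = 50 requires exactly 130 / 50 + 1 = 3 keys, and the chunks cover bytes [0, 50), [50, 100), and [100, 130), so only the final chunk may be smaller than chunkSize.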

/**
* Construct a {@link CompositeBlobInfo} object.
* @param keysAndContentSizes list of store keys and the size of the data content they reference
*/
public CompositeBlobInfo(List<Pair<StoreKey, Long>> keysAndContentSizes) {
if (keysAndContentSizes == null || keysAndContentSizes.isEmpty()) {
throw new IllegalArgumentException("keysAndContentSizes should not be null or empty");
}
this.chunkSize = -1;
long last = 0;
metadataContentVersion = MessageFormatRecord.Metadata_Content_Version_V3;
for (Pair<StoreKey, Long> keyAndContentSize : keysAndContentSizes) {
if (keyAndContentSize.getSecond() < 1L) {
throw new IllegalArgumentException("CompositeBlobInfo can't be composed of blobs with size 0 or less");
}
StoreKey key = keyAndContentSize.getFirst();
offsets.add(last);
chunkMetadataList.add(new ChunkMetadata(key, last, keyAndContentSize.getSecond()));
last += keyAndContentSize.getSecond();
}
this.totalSize = last;
}
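For illustration, using the sizes from the review discussion further down: content sizes 25, 30, 70, and 5 produce offsets 0, 25, 55, and 125, and a computed totalSize of 130; unlike the V2 constructor, no uniform chunk size is assumed.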

/**
* Returns the keys (along with their data content size and offset relative to the total
* composite blob) of the chunks that overlap the given byte range of the entire composite
* blob.
* @param start inclusive starting byte index
* @param end inclusive ending byte index
* @return the list of {@link ChunkMetadata} that lie within the inclusive range
* between 'start' and 'end'
*/
public List<ChunkMetadata> getStoreKeysInByteRange(long start, long end) {
Contributor: We might want to add unit tests for CompositeBlobInfo covering the edge cases of this method, e.g.:

  • 0 byte chunks in the middle of the blob (potentially a possibility with segmented blobs)
  • start/end = last byte of blob

Contributor (Author): Agreed, actually writing that now. CompositeBlobInfo didn't have a unit test before because it was a POJO class, but with this method it should be tested.

Contributor (Author): Can you explain "0 byte chunks in the middle of the blob"?

Contributor (@cgtz, Jun 5, 2019): Sure. So I was thinking that since we allow 0 byte simple blob uploads, they are valid candidates for stitching. So someone could potentially create a blob with chunk sizes like: 25, 30, 0, 0, 70, 0, 5. If we call this method with a start offset of 30, would the range start from the second or third chunk?

Contributor (Author): As discussed in person, going to change the code so that stitched blobs will not allow blobs of 0 length to be stitched (it will throw an exception if that happens).

if (end < start || start < 0L || end >= totalSize) {
throw new IllegalArgumentException(
"Bad input parameters, start=" + start + " end=" + end + " totalSize=" + totalSize);
}
int startIdx = Collections.binarySearch(offsets, start);
//binarySearch returns -(insertion point) - 1 if not an exact match, which points to the index of
//the first element greater than 'start', or list.size() if all elements in the list are
// less than 'start'.
if (startIdx < 0) {
startIdx = -startIdx - 2; //points to element with offset closest to but less than 'start'
}

int endIdx = Collections.binarySearch(offsets, end);
if (endIdx < 0) {
endIdx = -endIdx - 2; //points to element with offset closest to but less than 'end'
}
endIdx++; //List.subList expects an exclusive index

return chunkMetadataList.subList(startIdx, endIdx);
}
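A standalone sketch of the lookup above, using plain Long offsets instead of Ambry types. The chunk sizes 25, 30, 70, 5 and the start offset of 30 come from the review thread; everything else is illustrative:

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class ByteRangeLookupSketch {
  public static void main(String[] args) {
    // Running-sum offsets for chunk content sizes 25, 30, 70, 5 (totalSize = 130).
    List<Long> offsets = Arrays.asList(0L, 25L, 55L, 125L);
    long start = 30L, end = 60L; // inclusive byte range

    int startIdx = Collections.binarySearch(offsets, start);
    if (startIdx < 0) {
      startIdx = -startIdx - 2; // chunk whose offset is closest to but less than 'start'
    }
    int endIdx = Collections.binarySearch(offsets, end);
    if (endIdx < 0) {
      endIdx = -endIdx - 2;
    }
    endIdx++; // subList-style exclusive upper bound

    // Bytes 30..60 touch chunk 1 (bytes 25-54) and chunk 2 (bytes 55-124).
    System.out.println("chunk indices [" + startIdx + ", " + endIdx + ")"); // prints [1, 3)
  }
}
```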

/**
@@ -60,6 +141,75 @@ public int getChunkSize() {
* @return A list of {@link StoreKey}s.
*/
public List<StoreKey> getKeys() {
Contributor: An alternative to materializing a list of StoreKeys for every CompositeBlobInfo is to return a "view" of chunkMetadataList:

  public List<StoreKey> getKeys() {
    return new AbstractList<StoreKey>() {
      @Override
      public StoreKey get(int index) {
        return chunkMetadataList.get(index).getStoreKey();
      }

      @Override
      public int size() {
        return chunkMetadataList.size();
      }
    };
  }

It seems like this would suffice for the two use cases that need the getKeys() method (the getChunkIdsOnly option in the router and BlobIdTransformer), as both of them are just interested in iterating through all of the keys and not mutating the list.

Contributor (Author): Cool idea, I'll use it.

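A side effect worth noting about this pattern: AbstractList's default add and set implementations throw UnsupportedOperationException, so the returned view is effectively read-only, which fits both call sites named above.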
return keys;
return new AbstractList<StoreKey>() {
@Override
public StoreKey get(int index) {
return chunkMetadataList.get(index).getStoreKey();
}

@Override
public int size() {
return chunkMetadataList.size();
}
};
}

/**
* Get the list of keys for the composite blob's data chunks, along with the
* key's data content size and offset relative to the total composite blob
* @return A list of {@link ChunkMetadata}
*/
public List<ChunkMetadata> getChunkMetadataList() {
return chunkMetadataList;
}

/**
* Return the version of the metadata content record that this object was deserialized from
* @return the version of the metadata content record that this object was deserialized from
*/
public short getMetadataContentVersion() {
return metadataContentVersion;
}

/**
* POJO class for holding a store key, the size of the data content it refers to,
* and the offset of the data in the larger composite blob
*/
public static class ChunkMetadata {
private final StoreKey storeKey;
private final long offset;
private final long size;

public ChunkMetadata(StoreKey storeKey, long offset, long size) {
this.storeKey = storeKey;
this.offset = offset;
this.size = size;
}

public StoreKey getStoreKey() {
return storeKey;
}

public long getOffset() {
return offset;
}

public long getSize() {
return size;
}

@Override
public boolean equals(Object that) {
if (that instanceof ChunkMetadata) {
ChunkMetadata thatCM = (ChunkMetadata) that;
return storeKey.equals(thatCM.storeKey) && offset == thatCM.offset && size == thatCM.size;
}
return false;
}

@Override
public int hashCode() {
return Objects.hash(storeKey, offset, size);
}
}
}
MessageFormatRecord.java
@@ -18,6 +18,7 @@
import com.github.ambry.utils.ByteBufferInputStream;
import com.github.ambry.utils.Crc32;
import com.github.ambry.utils.CrcInputStream;
import com.github.ambry.utils.Pair;
import com.github.ambry.utils.Utils;
import java.io.DataInputStream;
import java.io.IOException;
@@ -54,6 +55,7 @@ public class MessageFormatRecord {
public static final short Blob_Version_V1 = 1;
public static final short Blob_Version_V2 = 2;
public static final short Metadata_Content_Version_V2 = 2;
public static final short Metadata_Content_Version_V3 = 3;
public static final int Message_Header_Invalid_Relative_Offset = -1;

static short headerVersionToUse = Message_Header_Version_V2;
@@ -1457,7 +1459,7 @@ public static void serializeMetadataContentRecord(ByteBuffer outputBuffer, int c
* @throws MessageFormatException
*/
public static CompositeBlobInfo deserializeMetadataContentRecord(DataInputStream stream,
StoreKeyFactory storeKeyFactory) throws IOException, MessageFormatException {
StoreKeyFactory storeKeyFactory) throws IOException {
List<StoreKey> keys = new ArrayList<StoreKey>();
int chunkSize = stream.readInt();
long totalSize = stream.readLong();
@@ -1469,6 +1471,97 @@ public static CompositeBlobInfo deserializeMetadataContentRecord(DataInputStream
return new CompositeBlobInfo(chunkSize, totalSize, keys);
}
}

/**
* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
* |         |            |           |           |      |           |      |        |
* | version | total size | # of keys | size of   | key1 | size of   | key2 | ...... |
* |(2 bytes)| (8 bytes)  | (4 bytes) | key1 blob |      | key2 blob |      | ...... |
* |         |            |           | (8 bytes) |      | (8 bytes) |      |        |
* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -

Contributor: Is the totalSize field meant for sanity checking that sum(chunk sizes) == totalSize, or are there other places where it is used?

Contributor (Author): Yeah, it's only used for checking.
* version - The version of the metadata content record
*
* total size - total size of the object this metadata describes.
*
* # of keys - number of keys in the metadata blob
*
* size of key1 blob - size of the data referenced by key1
*
* key1 - first key to be part of metadata blob
*
* size of key2 blob - size of the data referenced by key2
*
* key2 - second key to be part of metadata blob
*
*/
public static class Metadata_Content_Format_V3 {
private static final int NUM_OF_KEYS_FIELD_SIZE_IN_BYTES = 4;
private static final int SIZE_OF_BLOB_FIELD_SIZE_IN_BYTES = 8;
private static final int TOTAL_SIZE_FIELD_SIZE_IN_BYTES = 8;

/**
* Get the total size of the metadata content record.
* @param keySize The size of each key in bytes.
* @param numberOfKeys The total number of keys.
* @return The total size in bytes.
*/
public static int getMetadataContentSize(int keySize, int numberOfKeys) {
return Version_Field_Size_In_Bytes + TOTAL_SIZE_FIELD_SIZE_IN_BYTES + NUM_OF_KEYS_FIELD_SIZE_IN_BYTES
+ numberOfKeys * (keySize + SIZE_OF_BLOB_FIELD_SIZE_IN_BYTES);
}
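As a worked example (key size chosen for illustration): three 50-byte keys serialize to 2 + 8 + 4 + 3 × (8 + 50) = 188 bytes.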

/**
* Serialize a metadata content record.
* @param outputBuffer output buffer of the serialized metadata content
* @param totalSize total size of the blob data content
* @param keysAndContentSizes list of data blob keys referenced by the metadata
* blob along with the data content sizes of each data blob
*/
public static void serializeMetadataContentRecord(ByteBuffer outputBuffer, long totalSize,
List<Pair<StoreKey, Long>> keysAndContentSizes) {
int keySize = keysAndContentSizes.get(0).getFirst().sizeInBytes();
outputBuffer.putShort(Metadata_Content_Version_V3);
outputBuffer.putLong(totalSize);
outputBuffer.putInt(keysAndContentSizes.size());
long sum = 0;
for (Pair<StoreKey, Long> keyAndContentSize : keysAndContentSizes) {
if (keyAndContentSize.getFirst().sizeInBytes() != keySize) {
throw new IllegalArgumentException("Keys are not of same size");
}
outputBuffer.putLong(keyAndContentSize.getSecond());
outputBuffer.put(keyAndContentSize.getFirst().toBytes());
sum += keyAndContentSize.getSecond();
}
if (sum != totalSize) {
throw new IllegalArgumentException("Key content sizes do not equal total size");
}
}

/**
* Deserialize a metadata content record from a stream.
* @param stream The stream to read the serialized record from.
* @param storeKeyFactory The factory to use for parsing keys in the serialized metadata content record.
* @return A {@link CompositeBlobInfo} object with the total size and the list of keys and content sizes from the record.
* @throws IOException
*/
public static CompositeBlobInfo deserializeMetadataContentRecord(DataInputStream stream,
StoreKeyFactory storeKeyFactory) throws IOException {
List<Pair<StoreKey, Long>> keysAndContentSizes = new ArrayList<>();
long totalSize = stream.readLong();
long sum = 0;
int numberOfKeys = stream.readInt();
for (int i = 0; i < numberOfKeys; i++) {
long contentSize = stream.readLong();
StoreKey storeKey = storeKeyFactory.getStoreKey(stream);
keysAndContentSizes.add(new Pair<>(storeKey, contentSize));
sum += contentSize;
}
if (sum != totalSize) {
throw new IllegalArgumentException("Key content sizes do not equal total size");
}
return new CompositeBlobInfo(keysAndContentSizes);
}
}
}

class DeserializedBlobProperties {
MetadataContentSerDe.java
@@ -16,24 +16,27 @@
import com.github.ambry.store.StoreKey;
import com.github.ambry.store.StoreKeyFactory;
import com.github.ambry.utils.ByteBufferInputStream;
import com.github.ambry.utils.Pair;
import java.io.DataInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.stream.Collectors;


/**
* A class to serialize and deserialize MetadataContent which forms the content of a Metadata Blob.
*/
public class MetadataContentSerDe {

/**
* Serialize the input list of keys that form the metadata content.
* @param chunkSize the size of the intermediate data chunks for the object this metadata describes.
* @param totalSize the total size of the object this metadata describes.
* @param keys the input list of keys that form the metadata content.
* @return a ByteBuffer containing the serialized output.
*/
public static ByteBuffer serializeMetadataContent(int chunkSize, long totalSize, List<StoreKey> keys) {
public static ByteBuffer serializeMetadataContentV2(int chunkSize, long totalSize, List<StoreKey> keys) {
int bufSize =
MessageFormatRecord.Metadata_Content_Format_V2.getMetadataContentSize(keys.get(0).sizeInBytes(), keys.size());
ByteBuffer outputBuf = ByteBuffer.allocate(bufSize);
@@ -42,6 +45,21 @@ public static ByteBuffer serializeMetadataContent(int chunkSize, long totalSize,
return outputBuf;
}

/**
* Serialize the input list of keys with data content sizes that form the metadata content.
* @param totalSize the total size of the object this metadata describes.
* @param keysAndContentSizes the input list of keys and related data content sizes that form the metadata content.
* @return a ByteBuffer containing the serialized output.
*/
public static ByteBuffer serializeMetadataContentV3(long totalSize, List<Pair<StoreKey, Long>> keysAndContentSizes) {
int bufSize = MessageFormatRecord.Metadata_Content_Format_V3.getMetadataContentSize(
keysAndContentSizes.get(0).getFirst().sizeInBytes(), keysAndContentSizes.size());
ByteBuffer outputBuf = ByteBuffer.allocate(bufSize);
MessageFormatRecord.Metadata_Content_Format_V3.serializeMetadataContentRecord(outputBuf, totalSize,
keysAndContentSizes);
return outputBuf;
}

/**
* Deserialize the serialized metadata content in the input ByteBuffer using the given {@link StoreKeyFactory} as a
* reference.
@@ -58,6 +76,9 @@ public static CompositeBlobInfo deserializeMetadataContentRecord(ByteBuffer buf,
case MessageFormatRecord.Metadata_Content_Version_V2:
return MessageFormatRecord.Metadata_Content_Format_V2.deserializeMetadataContentRecord(
new DataInputStream(new ByteBufferInputStream(buf)), storeKeyFactory);
case MessageFormatRecord.Metadata_Content_Version_V3:
return MessageFormatRecord.Metadata_Content_Format_V3.deserializeMetadataContentRecord(
new DataInputStream(new ByteBufferInputStream(buf)), storeKeyFactory);
default:
throw new MessageFormatException("Unknown version encountered for MetadataContent: " + version,
MessageFormatErrorCodes.Unknown_Format_Version);
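To tie the pieces together, a hedged round-trip sketch: serialize a V3 record with the new helper and parse it back. Concrete StoreKey instances and a matching StoreKeyFactory are left to the caller (in Ambry these would be something like BlobIds with a BlobIdFactory), since constructing them is out of scope here:

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
import com.github.ambry.messageformat.CompositeBlobInfo;
import com.github.ambry.messageformat.MessageFormatException;
import com.github.ambry.messageformat.MetadataContentSerDe;
import com.github.ambry.store.StoreKey;
import com.github.ambry.store.StoreKeyFactory;
import com.github.ambry.utils.Pair;

public class MetadataRoundTripSketch {
  static CompositeBlobInfo roundTripV3(List<Pair<StoreKey, Long>> keysAndContentSizes,
      StoreKeyFactory storeKeyFactory) throws IOException, MessageFormatException {
    // totalSize must equal the sum of the content sizes or serialization will throw.
    long totalSize = keysAndContentSizes.stream().mapToLong(Pair::getSecond).sum();
    ByteBuffer buf = MetadataContentSerDe.serializeMetadataContentV3(totalSize, keysAndContentSizes);
    buf.flip(); // rewind the buffer before handing it to the deserializer
    return MetadataContentSerDe.deserializeMetadataContentRecord(buf, storeKeyFactory);
  }
}
```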