
Add post blob size (#1188)
This change adds the "x-ambry-blob-size" response header for POST requests. For stitched requests, the value of "x-ambry-blob-size" is the sum of all the chunk sizes.
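To illustrate the behavior described above (this sketch is not part of the commit), a client that POSTs a blob can read the new header straight off the response. The frontend address and the exact set of required x-ambry-* request headers below are assumptions for the example:

```java
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

public class PostBlobSizeExample {
  public static void main(String[] args) throws Exception {
    byte[] blob = "hello ambry".getBytes(StandardCharsets.UTF_8);
    // Frontend host/port and the required request headers are assumptions for this sketch.
    HttpURLConnection conn = (HttpURLConnection) new URL("http://localhost:1174/").openConnection();
    conn.setRequestMethod("POST");
    conn.setDoOutput(true);
    conn.setRequestProperty("x-ambry-service-id", "blob-size-example");
    conn.setRequestProperty("x-ambry-content-type", "application/octet-stream");
    try (OutputStream out = conn.getOutputStream()) {
      out.write(blob);
    }
    // After this change, the frontend reports the number of blob bytes it read.
    System.out.println("status = " + conn.getResponseCode());
    System.out.println("x-ambry-blob-size = " + conn.getHeaderField("x-ambry-blob-size"));
    System.out.println("blob id (Location) = " + conn.getHeaderField("Location"));
    conn.disconnect();
  }
}
```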
ankagrawal authored and cgtz committed Jun 18, 2019
1 parent af857f7 commit 8e2c5a1
Showing 7 changed files with 69 additions and 17 deletions.
10 changes: 10 additions & 0 deletions ambry-api/src/main/java/com.github.ambry/rest/RestRequest.java
@@ -141,6 +141,16 @@ public interface RestRequest extends ReadableStreamChannel {
*/
long getBytesReceived();

/**
* Gets the number of bytes read as part of the blob data at this point in time. After the request has been fully read,
* this can be used to determine the full blob size in bytes. The result of this method is only valid for requests
* that have blob data in them.
* @return the current number of blob bytes read from the request body.
*/
default long getBlobBytesReceived() {
return 0;
}

/**
* @return {@code true} if SSL was used for this request (i.e. the request has an associated {@link SSLSession})
*/
@@ -255,6 +255,11 @@ public long getBytesReceived() {
return bytesReceived.get();
}

@Override
public long getBlobBytesReceived() {
return bytesReceived.get();
}

@Override
public boolean isOpen() {
onEventComplete(Event.IsOpen);
@@ -212,7 +212,7 @@ private Callback<String> routerStitchBlobCallback(BlobInfo blobInfo) {
*/
private Callback<String> routerPutBlobCallback(BlobInfo blobInfo) {
return buildCallback(frontendMetrics.postRouterPutBlobMetrics, blobId -> {
setSignedIdMetadataIfRequired(blobInfo.getBlobProperties());
setSignedIdMetadataAndBlobSize(blobInfo.getBlobProperties());
idConverter.convert(restRequest, blobId, idConverterCallback(blobInfo));
}, uri, LOGGER, finalCallback);
}
@@ -301,16 +301,18 @@ private PutBlobOptions getPutBlobOptionsFromRequest() throws RestServiceExceptio
* @param blobProperties the {@link BlobProperties} from the request.
* @throws RestServiceException
*/
private void setSignedIdMetadataIfRequired(BlobProperties blobProperties) throws RestServiceException {
private void setSignedIdMetadataAndBlobSize(BlobProperties blobProperties) throws RestServiceException {
if (RestUtils.isChunkUpload(restRequest.getArgs())) {
Map<String, String> metadata = new HashMap<>(2);
metadata.put(RestUtils.Headers.BLOB_SIZE, Long.toString(restRequest.getBytesReceived()));
metadata.put(RestUtils.Headers.BLOB_SIZE, Long.toString(restRequest.getBlobBytesReceived()));
metadata.put(RestUtils.Headers.SESSION,
RestUtils.getHeader(restRequest.getArgs(), RestUtils.Headers.SESSION, true));
metadata.put(EXPIRATION_TIME_MS_KEY,
Long.toString(Utils.addSecondsToEpochTime(time.milliseconds(), blobProperties.getTimeToLiveInSeconds())));
restRequest.setArg(RestUtils.InternalKeys.SIGNED_ID_METADATA_KEY, metadata);
}
// The actual blob size is the number of blob bytes read from the request.
restResponseChannel.setHeader(RestUtils.Headers.BLOB_SIZE, restRequest.getBlobBytesReceived());
}

/**
@@ -348,6 +350,7 @@ List<ChunkInfo> getChunksToStitch(BlobProperties stitchedBlobProperties, JSONObj
}
List<ChunkInfo> chunksToStitch = new ArrayList<>(signedChunkIds.size());
String expectedSession = null;
long totalStitchedBlobSize = 0;
for (String signedChunkId : signedChunkIds) {
if (!idSigningService.isIdSigned(signedChunkId)) {
throw new RestServiceException("All chunks IDs must be signed: " + signedChunkId,
@@ -360,6 +363,8 @@ List<ChunkInfo> getChunksToStitch(BlobProperties stitchedBlobProperties, JSONObj
expectedSession = verifyChunkUploadSession(metadata, expectedSession);
@SuppressWarnings("ConstantConditions")
long chunkSizeBytes = RestUtils.getLongHeader(metadata, RestUtils.Headers.BLOB_SIZE, true);

totalStitchedBlobSize += chunkSizeBytes;
// Expiration time is sent to the router, but not verified in this handler. The router is responsible for making
// checks related to internal ambry requirements, like making sure that the chunks do not expire before the
// metadata blob.
@@ -369,6 +374,8 @@ List<ChunkInfo> getChunksToStitch(BlobProperties stitchedBlobProperties, JSONObj

chunksToStitch.add(new ChunkInfo(blobId, chunkSizeBytes, expirationTimeMs));
}
// The actual blob size for a stitched blob is the sum of all the chunk sizes.
restResponseChannel.setHeader(RestUtils.Headers.BLOB_SIZE, totalStitchedBlobSize);
return chunksToStitch;
}
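For example, stitching five data chunks of 50, 50, 50, 50, and 17 bytes yields a reported "x-ambry-blob-size" of 217; this is exactly the case exercised by the stitchedUploadTest changes below.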

@@ -379,7 +379,7 @@ public void getAndUseSignedUrlTest() throws Exception {
URI uri = new URI(signedPostUrl);
httpRequest = buildRequest(HttpMethod.POST, uri.getPath() + "?" + uri.getQuery(), null, content);
responseParts = nettyClient.sendRequest(httpRequest, null, null).get();
String blobId = verifyPostAndReturnBlobId(responseParts);
String blobId = verifyPostAndReturnBlobId(responseParts, content.capacity());

// verify POST
headers.add(RestUtils.Headers.BLOB_SIZE, content.capacity());
@@ -422,9 +422,9 @@ public void stitchedUploadTest() throws Exception {
Account account = ACCOUNT_SERVICE.createAndAddRandomAccount();
Container container = account.getContainerById(Container.DEFAULT_PRIVATE_CONTAINER_ID);
Pair<List<String>, byte[]> idsAndContent = uploadDataChunksAndVerify(account, container, 50, 50, 50, 50, 17);
stitchBlobAndVerify(account, container, idsAndContent.getFirst(), idsAndContent.getSecond());
stitchBlobAndVerify(account, container, idsAndContent.getFirst(), idsAndContent.getSecond(), 217);
idsAndContent = uploadDataChunksAndVerify(account, container, 167);
stitchBlobAndVerify(account, container, idsAndContent.getFirst(), idsAndContent.getSecond());
stitchBlobAndVerify(account, container, idsAndContent.getFirst(), idsAndContent.getSecond(), 167);
}

/**
@@ -600,7 +600,7 @@ private void doPostGetHeadUpdateDeleteTest(int contentSize, Account toPostAccoun
} else {
headers.add(RestUtils.Headers.USER_META_DATA_HEADER_PREFIX + "key1", "value1");
headers.add(RestUtils.Headers.USER_META_DATA_HEADER_PREFIX + "key2", "value2");
blobId = postBlobAndVerify(headers, content);
blobId = postBlobAndVerify(headers, content, contentSize);
}
headers.add(RestUtils.Headers.BLOB_SIZE, content.capacity());
getBlobAndVerify(blobId, null, null, headers, isPrivate, content, expectedAccountName, expectedContainerName);
@@ -680,19 +680,19 @@ private void setAmbryHeadersForPut(HttpHeaders httpHeaders, long ttlInSecs, bool
* @throws ExecutionException
* @throws InterruptedException
*/
private String postBlobAndVerify(HttpHeaders headers, ByteBuffer content)
private String postBlobAndVerify(HttpHeaders headers, ByteBuffer content, long contentSize)
throws ExecutionException, InterruptedException {
FullHttpRequest httpRequest = buildRequest(HttpMethod.POST, "/", headers, content);
ResponseParts responseParts = nettyClient.sendRequest(httpRequest, null, null).get();
return verifyPostAndReturnBlobId(responseParts);
return verifyPostAndReturnBlobId(responseParts, contentSize);
}

/**
* Verifies a POST and returns the blob ID.
* @param responseParts the response received from the server.
* @param contentSize the expected size of the blob in bytes.
* @return the blob ID of the blob.
*/
private String verifyPostAndReturnBlobId(ResponseParts responseParts) {
private String verifyPostAndReturnBlobId(ResponseParts responseParts, long contentSize) {
HttpResponse response = getHttpResponse(responseParts);
assertEquals("Unexpected response status", HttpResponseStatus.CREATED, response.status());
assertTrue("No Date header", response.headers().getTimeMillis(HttpHeaderNames.DATE, -1) != -1);
@@ -703,6 +703,7 @@ private String verifyPostAndReturnBlobId(ResponseParts responseParts) {
assertNotNull("Blob ID from POST should not be null", blobId);
assertNoContent(responseParts.queue, 1);
assertTrue("Channel should be active", HttpUtil.isKeepAlive(response));
assertEquals("Correct blob size should be returned in response", Long.toString(contentSize), response.headers().get(RestUtils.Headers.BLOB_SIZE));
verifyTrackingHeaders(response);
return blobId;
}
@@ -1162,7 +1163,7 @@ private String multipartPostBlobAndVerify(HttpHeaders headers, ByteBuffer conten
HttpRequest httpRequest = RestTestUtils.createRequest(HttpMethod.POST, "/", headers);
HttpPostRequestEncoder encoder = createEncoder(httpRequest, content, usermetadata);
ResponseParts responseParts = nettyClient.sendRequest(encoder.finalizeRequest(), encoder, null).get();
return verifyPostAndReturnBlobId(responseParts);
return verifyPostAndReturnBlobId(responseParts, content.capacity());
}

/**
@@ -1255,7 +1256,7 @@ private Pair<List<String>, byte[]> uploadDataChunksAndVerify(Account account, Co
// Use signed URL to POST
httpRequest = buildRequest(HttpMethod.POST, uri.getPath() + "?" + uri.getQuery(), null, content);
responseParts = nettyClient.sendRequest(httpRequest, null, null).get();
String signedId = verifyPostAndReturnBlobId(responseParts);
String signedId = verifyPostAndReturnBlobId(responseParts, chunkSize);
assertTrue("Blob ID for chunk upload must be signed", idSigningService.isIdSigned(signedId.substring(1)));
Pair<String, Map<String, String>> idAndMetadata = idSigningService.parseSignedId(signedId.substring(1));
// Inspect metadata fields
@@ -1290,15 +1291,15 @@ private Pair<List<String>, byte[]> uploadDataChunksAndVerify(Account account, Co
* @throws Exception
*/
private void stitchBlobAndVerify(Account account, Container container, List<String> signedChunkIds,
byte[] fullContentArray) throws Exception {
byte[] fullContentArray, long stitchedBlobSize) throws Exception {
// stitchBlob
HttpHeaders stitchHeaders = new DefaultHttpHeaders();
setAmbryHeadersForPut(stitchHeaders, TTL_SECS, !container.isCacheable(), "stitcher", "video/mp4",
"stitchedUploadTest", account.getName(), container.getName());
HttpRequest httpRequest = buildRequest(HttpMethod.POST, Operations.STITCH, stitchHeaders,
ByteBuffer.wrap(StitchRequestSerDe.toJson(signedChunkIds).toString().getBytes(StandardCharsets.UTF_8)));
ResponseParts responseParts = nettyClient.sendRequest(httpRequest, null, null).get();
String stitchedBlobId = verifyPostAndReturnBlobId(responseParts);
String stitchedBlobId = verifyPostAndReturnBlobId(responseParts, stitchedBlobSize);
HttpHeaders expectedGetHeaders = new DefaultHttpHeaders().add(stitchHeaders);
// Test different request types on stitched blob ID
// (getBlobInfo, getBlob, getBlob w/ range, head, updateBlobTtl, deleteBlob)
@@ -389,6 +389,8 @@ private void doChunkUploadTest(int contentLength, boolean chunkUpload, String up
InMemoryRouter.InMemoryBlob blob = router.getActiveBlobs().get(idConverterFactory.lastInput);
assertEquals("Unexpected blob content stored", ByteBuffer.wrap(content), blob.getBlob());
assertEquals("Unexpected ttl stored", blobTtlSecs, blob.getBlobProperties().getTimeToLiveInSeconds());
// Check that the blob size header matches the actual upload size.
assertEquals("Invalid blob size", Integer.toString(contentLength), restResponseChannel.getHeader(RestUtils.Headers.BLOB_SIZE));
} else {
TestUtils.assertException(ExecutionException.class, () -> future.get(TIMEOUT_SECS, TimeUnit.SECONDS),
errorChecker);
@@ -468,6 +470,19 @@ private byte[] getStitchRequestBody(List<String> signedChunkIds) {
return StitchRequestSerDe.toJson(signedChunkIds).toString().getBytes(StandardCharsets.UTF_8);
}

/**
* Calculate the blob size for a stitched blob from the individual chunks.
* @param stitchedChunks the list of chunks stitched together to form the blob.
* @return the size of the stitched blob in bytes.
*/
private long getStitchedBlobSize(List<ChunkInfo> stitchedChunks) {
long blobSize = 0;
for (ChunkInfo chunkInfo : stitchedChunks) {
blobSize += chunkInfo.getChunkSizeInBytes();
}
return blobSize;
}

/**
* Make a stitch blob call using {@link PostBlobHandler} and verify the result of the operation.
* @param requestBody the body of the stitch request to supply.
@@ -496,6 +511,9 @@ private void stitchBlobAndVerify(byte[] requestBody, List<ChunkInfo> expectedSti
.map(chunkInfo -> router.getActiveBlobs().get(chunkInfo.getBlobId()).getBlob().array())
.forEach(buf -> expectedContent.write(buf, 0, buf.length));
assertEquals("Unexpected blob content stored", ByteBuffer.wrap(expectedContent.toByteArray()), blob.getBlob());
// Check the actual size of the stitched blob.
assertEquals("Unexpected blob size", Long.toString(getStitchedBlobSize(expectedStitchedChunks)),
restResponseChannel.getHeader(RestUtils.Headers.BLOB_SIZE));
} else {
TestUtils.assertException(ExecutionException.class, () -> future.get(TIMEOUT_SECS, TimeUnit.SECONDS),
errorChecker);
@@ -50,9 +50,9 @@
class NettyMultipartRequest extends NettyRequest {
private final Queue<HttpContent> rawRequestContents = new LinkedBlockingQueue<>();
private final Logger logger = LoggerFactory.getLogger(getClass());
private final AtomicLong bytesReceived = new AtomicLong(0);
private final long maxSizeAllowedInBytes;

// For a multipart request, the total bytes received as HTTP content (which include part boundaries and non-blob parts) can differ from the blob bytes received.
private final AtomicLong blobBytesReceived = new AtomicLong(0);
private boolean readyForRead = false;
private boolean hasBlob = false;

@@ -221,6 +221,7 @@ private void processPart(InterfaceHttpData part) throws RestServiceException {
try {
if (isOpen()) {
requestContents.add(new DefaultHttpContent(ReferenceCountUtil.retain(fileUpload.content())));
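// Record the size of the blob part only; multipart boundaries and non-blob parts do not count toward the blob size.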
blobBytesReceived.set(fileUpload.content().capacity());
} else {
nettyMetrics.multipartRequestAlreadyClosedError.inc();
throw new RestServiceException("Request is closed", RestServiceErrorCode.RequestChannelClosed);
@@ -252,4 +253,9 @@ private void processPart(InterfaceHttpData part) throws RestServiceException {
throw new RestServiceException("Unexpected HTTP data", RestServiceErrorCode.BadRequest);
}
}

@Override
public long getBlobBytesReceived() {
return blobBytesReceived.get();
}
}
@@ -80,7 +80,7 @@ class NettyRequest implements RestRequest {
private final RestMethod restMethod;
private final RestRequestMetricsTracker restRequestMetricsTracker = new RestRequestMetricsTracker();
private final AtomicBoolean channelOpen = new AtomicBoolean(true);
private final AtomicLong bytesReceived = new AtomicLong(0);
protected final AtomicLong bytesReceived = new AtomicLong(0);
private final AtomicLong bytesBuffered = new AtomicLong(0);
private final Logger logger = LoggerFactory.getLogger(getClass());
private final RecvByteBufAllocator recvByteBufAllocator = new DefaultMaxBytesRecvByteBufAllocator();
@@ -391,6 +391,11 @@ public long getBytesReceived() {
return bytesReceived.get();
}

@Override
public long getBlobBytesReceived() {
return bytesReceived.get();
}

/**
* Adds some content in the form of {@link HttpContent} to this RestRequest. This content will be available to read
* through the read operations.
