diff --git a/ambry-api/src/main/java/com.github.ambry/rest/RestRequest.java b/ambry-api/src/main/java/com.github.ambry/rest/RestRequest.java index c0f0680697..2393a35e1b 100644 --- a/ambry-api/src/main/java/com.github.ambry/rest/RestRequest.java +++ b/ambry-api/src/main/java/com.github.ambry/rest/RestRequest.java @@ -141,6 +141,16 @@ public interface RestRequest extends ReadableStreamChannel { */ long getBytesReceived(); + /** + * Gets the number of bytes read as part of the blob data at this point in time. After the request has been fully read, + * this can be used to determine the full blob size in bytes. The result of this method is only valid for requests + * that have blob data in them. + * @return the current number of blob bytes read from the request body. + */ + default long getBlobBytesReceived() { + return 0; + } + /** * @return {@code true} if SSL was used for this request (i.e. the request has an associated {@link SSLSession}) */ diff --git a/ambry-api/src/test/java/com.github.ambry/rest/MockRestRequest.java b/ambry-api/src/test/java/com.github.ambry/rest/MockRestRequest.java index 843a893987..d410ecdad0 100644 --- a/ambry-api/src/test/java/com.github.ambry/rest/MockRestRequest.java +++ b/ambry-api/src/test/java/com.github.ambry/rest/MockRestRequest.java @@ -255,6 +255,11 @@ public long getBytesReceived() { return bytesReceived.get(); } + @Override + public long getBlobBytesReceived() { + return bytesReceived.get(); + } + @Override public boolean isOpen() { onEventComplete(Event.IsOpen); diff --git a/ambry-frontend/src/main/java/com.github.ambry.frontend/PostBlobHandler.java b/ambry-frontend/src/main/java/com.github.ambry.frontend/PostBlobHandler.java index 4fe51ed735..edcd559486 100644 --- a/ambry-frontend/src/main/java/com.github.ambry.frontend/PostBlobHandler.java +++ b/ambry-frontend/src/main/java/com.github.ambry.frontend/PostBlobHandler.java @@ -212,7 +212,7 @@ private Callback routerStitchBlobCallback(BlobInfo blobInfo) { */ private Callback routerPutBlobCallback(BlobInfo blobInfo) { return buildCallback(frontendMetrics.postRouterPutBlobMetrics, blobId -> { - setSignedIdMetadataIfRequired(blobInfo.getBlobProperties()); + setSignedIdMetadataAndBlobSize(blobInfo.getBlobProperties()); idConverter.convert(restRequest, blobId, idConverterCallback(blobInfo)); }, uri, LOGGER, finalCallback); } @@ -301,16 +301,18 @@ private PutBlobOptions getPutBlobOptionsFromRequest() throws RestServiceExceptio * @param blobProperties the {@link BlobProperties} from the request. * @throws RestServiceException */ - private void setSignedIdMetadataIfRequired(BlobProperties blobProperties) throws RestServiceException { + private void setSignedIdMetadataAndBlobSize(BlobProperties blobProperties) throws RestServiceException { if (RestUtils.isChunkUpload(restRequest.getArgs())) { Map metadata = new HashMap<>(2); - metadata.put(RestUtils.Headers.BLOB_SIZE, Long.toString(restRequest.getBytesReceived())); + metadata.put(RestUtils.Headers.BLOB_SIZE, Long.toString(restRequest.getBlobBytesReceived())); metadata.put(RestUtils.Headers.SESSION, RestUtils.getHeader(restRequest.getArgs(), RestUtils.Headers.SESSION, true)); metadata.put(EXPIRATION_TIME_MS_KEY, Long.toString(Utils.addSecondsToEpochTime(time.milliseconds(), blobProperties.getTimeToLiveInSeconds()))); restRequest.setArg(RestUtils.InternalKeys.SIGNED_ID_METADATA_KEY, metadata); } + //the actual blob size is the number of bytes read + restResponseChannel.setHeader(RestUtils.Headers.BLOB_SIZE, restRequest.getBlobBytesReceived()); } /** @@ -348,6 +350,7 @@ List getChunksToStitch(BlobProperties stitchedBlobProperties, JSONObj } List chunksToStitch = new ArrayList<>(signedChunkIds.size()); String expectedSession = null; + long totalStitchedBlobSize = 0; for (String signedChunkId : signedChunkIds) { if (!idSigningService.isIdSigned(signedChunkId)) { throw new RestServiceException("All chunks IDs must be signed: " + signedChunkId, @@ -360,6 +363,8 @@ List getChunksToStitch(BlobProperties stitchedBlobProperties, JSONObj expectedSession = verifyChunkUploadSession(metadata, expectedSession); @SuppressWarnings("ConstantConditions") long chunkSizeBytes = RestUtils.getLongHeader(metadata, RestUtils.Headers.BLOB_SIZE, true); + + totalStitchedBlobSize += chunkSizeBytes; // Expiration time is sent to the router, but not verified in this handler. The router is responsible for making // checks related to internal ambry requirements, like making sure that the chunks do not expire before the // metadata blob. @@ -369,6 +374,8 @@ List getChunksToStitch(BlobProperties stitchedBlobProperties, JSONObj chunksToStitch.add(new ChunkInfo(blobId, chunkSizeBytes, expirationTimeMs)); } + //the actual blob size for stitched blob is the sum of all the chunk sizes + restResponseChannel.setHeader(RestUtils.Headers.BLOB_SIZE, totalStitchedBlobSize); return chunksToStitch; } diff --git a/ambry-frontend/src/test/java/com.github.ambry.frontend/FrontendIntegrationTest.java b/ambry-frontend/src/test/java/com.github.ambry.frontend/FrontendIntegrationTest.java index 44b3a5a09b..60108233ef 100644 --- a/ambry-frontend/src/test/java/com.github.ambry.frontend/FrontendIntegrationTest.java +++ b/ambry-frontend/src/test/java/com.github.ambry.frontend/FrontendIntegrationTest.java @@ -379,7 +379,7 @@ public void getAndUseSignedUrlTest() throws Exception { URI uri = new URI(signedPostUrl); httpRequest = buildRequest(HttpMethod.POST, uri.getPath() + "?" + uri.getQuery(), null, content); responseParts = nettyClient.sendRequest(httpRequest, null, null).get(); - String blobId = verifyPostAndReturnBlobId(responseParts); + String blobId = verifyPostAndReturnBlobId(responseParts, content.capacity()); // verify POST headers.add(RestUtils.Headers.BLOB_SIZE, content.capacity()); @@ -422,9 +422,9 @@ public void stitchedUploadTest() throws Exception { Account account = ACCOUNT_SERVICE.createAndAddRandomAccount(); Container container = account.getContainerById(Container.DEFAULT_PRIVATE_CONTAINER_ID); Pair, byte[]> idsAndContent = uploadDataChunksAndVerify(account, container, 50, 50, 50, 50, 17); - stitchBlobAndVerify(account, container, idsAndContent.getFirst(), idsAndContent.getSecond()); + stitchBlobAndVerify(account, container, idsAndContent.getFirst(), idsAndContent.getSecond(), 217); idsAndContent = uploadDataChunksAndVerify(account, container, 167); - stitchBlobAndVerify(account, container, idsAndContent.getFirst(), idsAndContent.getSecond()); + stitchBlobAndVerify(account, container, idsAndContent.getFirst(), idsAndContent.getSecond(), 167); } /** @@ -600,7 +600,7 @@ private void doPostGetHeadUpdateDeleteTest(int contentSize, Account toPostAccoun } else { headers.add(RestUtils.Headers.USER_META_DATA_HEADER_PREFIX + "key1", "value1"); headers.add(RestUtils.Headers.USER_META_DATA_HEADER_PREFIX + "key2", "value2"); - blobId = postBlobAndVerify(headers, content); + blobId = postBlobAndVerify(headers, content, contentSize); } headers.add(RestUtils.Headers.BLOB_SIZE, content.capacity()); getBlobAndVerify(blobId, null, null, headers, isPrivate, content, expectedAccountName, expectedContainerName); @@ -680,11 +680,11 @@ private void setAmbryHeadersForPut(HttpHeaders httpHeaders, long ttlInSecs, bool * @throws ExecutionException * @throws InterruptedException */ - private String postBlobAndVerify(HttpHeaders headers, ByteBuffer content) + private String postBlobAndVerify(HttpHeaders headers, ByteBuffer content, long contentSize) throws ExecutionException, InterruptedException { FullHttpRequest httpRequest = buildRequest(HttpMethod.POST, "/", headers, content); ResponseParts responseParts = nettyClient.sendRequest(httpRequest, null, null).get(); - return verifyPostAndReturnBlobId(responseParts); + return verifyPostAndReturnBlobId(responseParts, contentSize); } /** @@ -692,7 +692,7 @@ private String postBlobAndVerify(HttpHeaders headers, ByteBuffer content) * @param responseParts the response received from the server. * @returnn the blob ID of the blob. */ - private String verifyPostAndReturnBlobId(ResponseParts responseParts) { + private String verifyPostAndReturnBlobId(ResponseParts responseParts, long contentSize) { HttpResponse response = getHttpResponse(responseParts); assertEquals("Unexpected response status", HttpResponseStatus.CREATED, response.status()); assertTrue("No Date header", response.headers().getTimeMillis(HttpHeaderNames.DATE, -1) != -1); @@ -703,6 +703,7 @@ private String verifyPostAndReturnBlobId(ResponseParts responseParts) { assertNotNull("Blob ID from POST should not be null", blobId); assertNoContent(responseParts.queue, 1); assertTrue("Channel should be active", HttpUtil.isKeepAlive(response)); + assertEquals("Correct blob size should be returned in response", Long.toString(contentSize), response.headers().get(RestUtils.Headers.BLOB_SIZE)); verifyTrackingHeaders(response); return blobId; } @@ -1162,7 +1163,7 @@ private String multipartPostBlobAndVerify(HttpHeaders headers, ByteBuffer conten HttpRequest httpRequest = RestTestUtils.createRequest(HttpMethod.POST, "/", headers); HttpPostRequestEncoder encoder = createEncoder(httpRequest, content, usermetadata); ResponseParts responseParts = nettyClient.sendRequest(encoder.finalizeRequest(), encoder, null).get(); - return verifyPostAndReturnBlobId(responseParts); + return verifyPostAndReturnBlobId(responseParts, content.capacity()); } /** @@ -1255,7 +1256,7 @@ private Pair, byte[]> uploadDataChunksAndVerify(Account account, Co // Use signed URL to POST httpRequest = buildRequest(HttpMethod.POST, uri.getPath() + "?" + uri.getQuery(), null, content); responseParts = nettyClient.sendRequest(httpRequest, null, null).get(); - String signedId = verifyPostAndReturnBlobId(responseParts); + String signedId = verifyPostAndReturnBlobId(responseParts, chunkSize); assertTrue("Blob ID for chunk upload must be signed", idSigningService.isIdSigned(signedId.substring(1))); Pair> idAndMetadata = idSigningService.parseSignedId(signedId.substring(1)); // Inspect metadata fields @@ -1290,7 +1291,7 @@ private Pair, byte[]> uploadDataChunksAndVerify(Account account, Co * @throws Exception */ private void stitchBlobAndVerify(Account account, Container container, List signedChunkIds, - byte[] fullContentArray) throws Exception { + byte[] fullContentArray, long stitchedBlobSize) throws Exception { // stitchBlob HttpHeaders stitchHeaders = new DefaultHttpHeaders(); setAmbryHeadersForPut(stitchHeaders, TTL_SECS, !container.isCacheable(), "stitcher", "video/mp4", @@ -1298,7 +1299,7 @@ private void stitchBlobAndVerify(Account account, Container container, List future.get(TIMEOUT_SECS, TimeUnit.SECONDS), errorChecker); @@ -468,6 +470,19 @@ private byte[] getStitchRequestBody(List signedChunkIds) { return StitchRequestSerDe.toJson(signedChunkIds).toString().getBytes(StandardCharsets.UTF_8); } + /** + * Caculate the blob size for a stitched blob from the individual chunks. + * @param stitchedChunks list of chunks stitched to form the blob. + * @return size of the stitched blob + */ + private long getStitchedBlobSize(List stitchedChunks) { + long blobSize = 0; + for(ChunkInfo chunkInfo : stitchedChunks) { + blobSize += chunkInfo.getChunkSizeInBytes(); + } + return blobSize; + } + /** * Make a stitch blob call using {@link PostBlobHandler} and verify the result of the operation. * @param requestBody the body of the stitch request to supply. @@ -496,6 +511,9 @@ private void stitchBlobAndVerify(byte[] requestBody, List expectedSti .map(chunkInfo -> router.getActiveBlobs().get(chunkInfo.getBlobId()).getBlob().array()) .forEach(buf -> expectedContent.write(buf, 0, buf.length)); assertEquals("Unexpected blob content stored", ByteBuffer.wrap(expectedContent.toByteArray()), blob.getBlob()); + //check actual size of stitched blob + assertEquals("Unexpected blob size", Long.toString(getStitchedBlobSize(expectedStitchedChunks)), + restResponseChannel.getHeader(RestUtils.Headers.BLOB_SIZE)); } else { TestUtils.assertException(ExecutionException.class, () -> future.get(TIMEOUT_SECS, TimeUnit.SECONDS), errorChecker); diff --git a/ambry-rest/src/main/java/com.github.ambry.rest/NettyMultipartRequest.java b/ambry-rest/src/main/java/com.github.ambry.rest/NettyMultipartRequest.java index b18e9491ac..de2b558f6b 100644 --- a/ambry-rest/src/main/java/com.github.ambry.rest/NettyMultipartRequest.java +++ b/ambry-rest/src/main/java/com.github.ambry.rest/NettyMultipartRequest.java @@ -50,9 +50,9 @@ class NettyMultipartRequest extends NettyRequest { private final Queue rawRequestContents = new LinkedBlockingQueue<>(); private final Logger logger = LoggerFactory.getLogger(getClass()); - private final AtomicLong bytesReceived = new AtomicLong(0); private final long maxSizeAllowedInBytes; - + //For Multipart request, bytes received in http content can be different from blob bytes received. + private final AtomicLong blobBytesReceived = new AtomicLong(0); private boolean readyForRead = false; private boolean hasBlob = false; @@ -221,6 +221,7 @@ private void processPart(InterfaceHttpData part) throws RestServiceException { try { if (isOpen()) { requestContents.add(new DefaultHttpContent(ReferenceCountUtil.retain(fileUpload.content()))); + blobBytesReceived.set(fileUpload.content().capacity()); } else { nettyMetrics.multipartRequestAlreadyClosedError.inc(); throw new RestServiceException("Request is closed", RestServiceErrorCode.RequestChannelClosed); @@ -252,4 +253,9 @@ private void processPart(InterfaceHttpData part) throws RestServiceException { throw new RestServiceException("Unexpected HTTP data", RestServiceErrorCode.BadRequest); } } + + @Override + public long getBlobBytesReceived() { + return blobBytesReceived.get(); + } } diff --git a/ambry-rest/src/main/java/com.github.ambry.rest/NettyRequest.java b/ambry-rest/src/main/java/com.github.ambry.rest/NettyRequest.java index 74663d34c4..c9c3e663f9 100644 --- a/ambry-rest/src/main/java/com.github.ambry.rest/NettyRequest.java +++ b/ambry-rest/src/main/java/com.github.ambry.rest/NettyRequest.java @@ -80,7 +80,7 @@ class NettyRequest implements RestRequest { private final RestMethod restMethod; private final RestRequestMetricsTracker restRequestMetricsTracker = new RestRequestMetricsTracker(); private final AtomicBoolean channelOpen = new AtomicBoolean(true); - private final AtomicLong bytesReceived = new AtomicLong(0); + protected final AtomicLong bytesReceived = new AtomicLong(0); private final AtomicLong bytesBuffered = new AtomicLong(0); private final Logger logger = LoggerFactory.getLogger(getClass()); private final RecvByteBufAllocator recvByteBufAllocator = new DefaultMaxBytesRecvByteBufAllocator(); @@ -391,6 +391,11 @@ public long getBytesReceived() { return bytesReceived.get(); } + @Override + public long getBlobBytesReceived() { + return bytesReceived.get(); + } + /** * Adds some content in the form of {@link HttpContent} to this RestRequest. This content will be available to read * through the read operations.