Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add post blob size #1188

Merged
merged 6 commits into from
Jun 18, 2019
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ private Callback<String> routerStitchBlobCallback(BlobInfo blobInfo) {
*/
private Callback<String> routerPutBlobCallback(BlobInfo blobInfo) {
return buildCallback(frontendMetrics.postRouterPutBlobMetrics, blobId -> {
setSignedIdMetadataIfRequired(blobInfo.getBlobProperties());
setSignedIdMetadataAndBlobSize(blobInfo.getBlobProperties());
idConverter.convert(restRequest, blobId, idConverterCallback(blobInfo));
}, uri, LOGGER, finalCallback);
}
Expand Down Expand Up @@ -301,7 +301,7 @@ private PutBlobOptions getPutBlobOptionsFromRequest() throws RestServiceExceptio
* @param blobProperties the {@link BlobProperties} from the request.
* @throws RestServiceException
*/
private void setSignedIdMetadataIfRequired(BlobProperties blobProperties) throws RestServiceException {
private void setSignedIdMetadataAndBlobSize(BlobProperties blobProperties) throws RestServiceException {
if (RestUtils.isChunkUpload(restRequest.getArgs())) {
Map<String, String> metadata = new HashMap<>(2);
metadata.put(RestUtils.Headers.BLOB_SIZE, Long.toString(restRequest.getBytesReceived()));
Expand All @@ -311,6 +311,8 @@ private void setSignedIdMetadataIfRequired(BlobProperties blobProperties) throws
Long.toString(Utils.addSecondsToEpochTime(time.milliseconds(), blobProperties.getTimeToLiveInSeconds())));
restRequest.setArg(RestUtils.InternalKeys.SIGNED_ID_METADATA_KEY, metadata);
}
//the actual blob size is the number of bytes read
restResponseChannel.setHeader(RestUtils.Headers.BLOB_SIZE, restRequest.getBytesReceived());
}

/**
Expand Down Expand Up @@ -348,6 +350,7 @@ List<ChunkInfo> getChunksToStitch(BlobProperties stitchedBlobProperties, JSONObj
}
List<ChunkInfo> chunksToStitch = new ArrayList<>(signedChunkIds.size());
String expectedSession = null;
long totalStitchedBlobSize = 0;
for (String signedChunkId : signedChunkIds) {
if (!idSigningService.isIdSigned(signedChunkId)) {
throw new RestServiceException("All chunks IDs must be signed: " + signedChunkId,
Expand All @@ -360,6 +363,8 @@ List<ChunkInfo> getChunksToStitch(BlobProperties stitchedBlobProperties, JSONObj
expectedSession = verifyChunkUploadSession(metadata, expectedSession);
@SuppressWarnings("ConstantConditions")
long chunkSizeBytes = RestUtils.getLongHeader(metadata, RestUtils.Headers.BLOB_SIZE, true);

totalStitchedBlobSize += chunkSizeBytes;
// Expiration time is sent to the router, but not verified in this handler. The router is responsible for making
// checks related to internal ambry requirements, like making sure that the chunks do not expire before the
// metadata blob.
Expand All @@ -369,6 +374,8 @@ List<ChunkInfo> getChunksToStitch(BlobProperties stitchedBlobProperties, JSONObj

chunksToStitch.add(new ChunkInfo(blobId, chunkSizeBytes, expirationTimeMs));
}
//the actual blob size for stitched blob is the sum of all the chunk sizes
restResponseChannel.setHeader(RestUtils.Headers.BLOB_SIZE, totalStitchedBlobSize);
return chunksToStitch;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,8 @@ private void doChunkUploadTest(int contentLength, boolean chunkUpload, String up
InMemoryRouter.InMemoryBlob blob = router.getActiveBlobs().get(idConverterFactory.lastInput);
assertEquals("Unexpected blob content stored", ByteBuffer.wrap(content), blob.getBlob());
assertEquals("Unexpected ttl stored", blobTtlSecs, blob.getBlobProperties().getTimeToLiveInSeconds());
//check that blob size matches the actual upload size
assertEquals("Invalid blob size", Integer.toString(contentLength), restResponseChannel.getHeader(RestUtils.Headers.BLOB_SIZE));
} else {
TestUtils.assertException(ExecutionException.class, () -> future.get(TIMEOUT_SECS, TimeUnit.SECONDS),
errorChecker);
Expand Down Expand Up @@ -468,6 +470,19 @@ private byte[] getStitchRequestBody(List<String> signedChunkIds) {
return StitchRequestSerDe.toJson(signedChunkIds).toString().getBytes(StandardCharsets.UTF_8);
}

/**
* Caculate the blob size for a stitched blob from the individual chunks.
* @param stitchedChunks list of chunks stitched to form the blob.
* @return size of the stitched blob
*/
private long getStitchedBlobSize(List<ChunkInfo> stitchedChunks) {
long blobSize = 0;
for(ChunkInfo chunkInfo : stitchedChunks) {
blobSize += chunkInfo.getChunkSizeInBytes();
}
return blobSize;
}

/**
* Make a stitch blob call using {@link PostBlobHandler} and verify the result of the operation.
* @param requestBody the body of the stitch request to supply.
Expand Down Expand Up @@ -496,6 +511,9 @@ private void stitchBlobAndVerify(byte[] requestBody, List<ChunkInfo> expectedSti
.map(chunkInfo -> router.getActiveBlobs().get(chunkInfo.getBlobId()).getBlob().array())
.forEach(buf -> expectedContent.write(buf, 0, buf.length));
assertEquals("Unexpected blob content stored", ByteBuffer.wrap(expectedContent.toByteArray()), blob.getBlob());
//check actual size of stitched blob
assertEquals("Unexpected blob size", Long.toString(getStitchedBlobSize(expectedStitchedChunks)),
restResponseChannel.getHeader(RestUtils.Headers.BLOB_SIZE));
} else {
TestUtils.assertException(ExecutionException.class, () -> future.get(TIMEOUT_SECS, TimeUnit.SECONDS),
errorChecker);
Expand Down