Skip to content

Commit

Permalink
perf($OSS): fully support async stream and download resources
Browse files Browse the repository at this point in the history
fully support async stream and download resources

BREAKING CHANGE: fully support async stream and download resources
  • Loading branch information
johnnymillergh committed Aug 12, 2021
1 parent 64d7776 commit decb5a1
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import lombok.RequiredArgsConstructor;
import org.springframework.core.io.Resource;
import org.springframework.http.HttpHeaders;
import org.springframework.http.ResponseEntity;
import org.springframework.validation.annotation.Validated;
Expand All @@ -30,13 +29,6 @@ public class ReadResourceController {

@GetMapping("/stream/{bucket}/{object}")
@ApiOperation(value = "Stream single resource", notes = "Stream single resource")
public ResponseEntity<Resource> streamSingleResource(@PathVariable String bucket, @PathVariable String object,
@RequestHeader(name = HttpHeaders.RANGE, required = false) String range) {
return this.readResourceService.streamSingleResource(bucket, object, range);
}

@GetMapping("/async/stream/{bucket}/{object}")
@ApiOperation(value = "Stream single resource", notes = "Stream single resource")
public ResponseEntity<StreamingResponseBody> asyncStreamSingleResource(@PathVariable String bucket,
@PathVariable String object,
@RequestHeader(name = HttpHeaders.RANGE,
Expand All @@ -46,7 +38,8 @@ public ResponseEntity<StreamingResponseBody> asyncStreamSingleResource(@PathVari

@GetMapping("/download/{bucket}/{object}")
@ApiOperation(value = "Download single resource", notes = "Download single resource")
public ResponseEntity<Resource> downloadSingleResource(@PathVariable String bucket, @PathVariable String object) {
return this.readResourceService.downloadSingleResource(bucket, object);
public ResponseEntity<StreamingResponseBody> downloadSingleResource(@PathVariable String bucket,
@PathVariable String object) {
return this.readResourceService.asyncDownloadSingleResource(bucket, object);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,33 +24,46 @@ public interface ReadResourceService {
DataSize LARGE_CHUNK_SIZE = DataSize.ofMegabytes(8);

/**
* Stream single resource.
* Async stream single resource response entity.
*
* @param bucket the bucket
* @param object the object
* @param range the range
* @return the single resource
* @return the response entity
*/
ResponseEntity<Resource> streamSingleResource(@NotBlank String bucket, @NotBlank String object,
@Nullable String range);
ResponseEntity<StreamingResponseBody> asyncStreamSingleResource(@NotBlank String bucket, @NotBlank String object,
@Nullable String range);

/**
* Async stream single resource response entity.
* Async download single resource response entity.
*
* @param bucket the bucket
* @param object the object
* @param range the range
* @return the response entity
*/
ResponseEntity<StreamingResponseBody> asyncStreamSingleResource(@NotBlank String bucket, @NotBlank String object,
@Nullable String range);
ResponseEntity<StreamingResponseBody> asyncDownloadSingleResource(@NotBlank String bucket, @NotBlank String object);

/**
* Stream single resource.
*
* @param bucket the bucket
* @param object the object
* @param range the range
* @return the single resource
* @deprecated since it's not async method, will be deleted in the future.
*/
@Deprecated(forRemoval = true)
ResponseEntity<Resource> streamSingleResource(@NotBlank String bucket, @NotBlank String object,
@Nullable String range);

/**
* Download single resource response entity.
*
* @param bucket the bucket
* @param object the object
* @return the response entity
* @deprecated since it's not async method, will be deleted in the future.
*/
@Deprecated(forRemoval = true)
ResponseEntity<Resource> downloadSingleResource(@NotBlank String bucket, @NotBlank String object);
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,9 @@ public class ReadResourceServiceImpl implements ReadResourceService {
private final MinioHelper minioHelper;

@Override
public ResponseEntity<Resource> streamSingleResource(@NotBlank String bucket, @NotBlank String object,
@Nullable String range) {
public ResponseEntity<StreamingResponseBody> asyncStreamSingleResource(@NotBlank String bucket,
@NotBlank String object,
@Nullable String range) {
StatObjectResponse statObjectResponse;
try {
statObjectResponse = this.minioHelper.statObject(bucket, object);
Expand All @@ -48,15 +49,41 @@ public ResponseEntity<Resource> streamSingleResource(@NotBlank String bucket, @N
.header(HttpHeaders.ACCEPT_RANGES, "bytes")
.contentLength(statObjectResponse.size())
.contentType(MediaType.parseMediaType(statObjectResponse.contentType()))
.body(new InputStreamResource(getObjectResponse));
.body(outputStream -> {
IoUtil.copy(getObjectResponse, outputStream);
IoUtil.close(getObjectResponse);
});
}
return this.getResourceRegion(bucket, object, statObjectResponse, httpRanges);
return this.asyncGetResourceRegion(bucket, object, statObjectResponse, httpRanges);
}

@Override
public ResponseEntity<StreamingResponseBody> asyncStreamSingleResource(@NotBlank String bucket,
@NotBlank String object,
@Nullable String range) {
@SuppressWarnings("DuplicatedCode")
public ResponseEntity<StreamingResponseBody> asyncDownloadSingleResource(@NotBlank String bucket,
@NotBlank String object) {
StatObjectResponse statObjectResponse;
try {
statObjectResponse = this.minioHelper.statObject(bucket, object);
} catch (Exception e) {
log.error("Exception occurred when looking for object. Exception message: {}", e.getMessage());
return ResponseEntity.notFound().build();
}
val getObjectResponse = this.minioHelper.getObject(bucket, object);
return ResponseEntity.ok()
.header(HttpHeaders.ACCEPT_RANGES, "bytes")
.header(HttpHeaders.CONTENT_DISPOSITION,
ContentDisposition.builder("attachment").filename(object).build().toString())
.contentLength(statObjectResponse.size())
.contentType(MediaType.parseMediaType(statObjectResponse.contentType()))
.body((outputStream -> {
IoUtil.copy(getObjectResponse, outputStream);
IoUtil.close(getObjectResponse);
}));
}

@Override
public ResponseEntity<Resource> streamSingleResource(@NotBlank String bucket, @NotBlank String object,
@Nullable String range) {
StatObjectResponse statObjectResponse;
try {
statObjectResponse = this.minioHelper.statObject(bucket, object);
Expand All @@ -71,15 +98,13 @@ public ResponseEntity<StreamingResponseBody> asyncStreamSingleResource(@NotBlank
.header(HttpHeaders.ACCEPT_RANGES, "bytes")
.contentLength(statObjectResponse.size())
.contentType(MediaType.parseMediaType(statObjectResponse.contentType()))
.body(outputStream -> {
IoUtil.copy(getObjectResponse, outputStream);
IoUtil.close(getObjectResponse);
});
.body(new InputStreamResource(getObjectResponse));
}
return this.asyncGetResourceRegion(bucket, object, statObjectResponse, httpRanges);
return this.getResourceRegion(bucket, object, statObjectResponse, httpRanges);
}

@Override
@SuppressWarnings("DuplicatedCode")
public ResponseEntity<Resource> downloadSingleResource(String bucket, String object) {
StatObjectResponse statObjectResponse;
try {
Expand All @@ -99,9 +124,9 @@ public ResponseEntity<Resource> downloadSingleResource(String bucket, String obj
}

@SuppressWarnings("DuplicatedCode")
private ResponseEntity<Resource> getResourceRegion(String bucket, String object,
StatObjectResponse statObjectResponse,
List<HttpRange> httpRanges) {
private ResponseEntity<StreamingResponseBody> asyncGetResourceRegion(String bucket, String object,
StatObjectResponse statObjectResponse,
List<HttpRange> httpRanges) {
val getObjectResponse = this.minioHelper.getObject(bucket, object, httpRanges.get(0).getRangeStart(0),
MEDIUM_CHUNK_SIZE.toBytes());
val start = httpRanges.get(0).getRangeStart(0);
Expand All @@ -114,13 +139,27 @@ private ResponseEntity<Resource> getResourceRegion(String bucket, String object,
.header(HttpHeaders.CONTENT_RANGE, String.format("bytes %d-%d/%d", start, end, resourceLength))
.contentLength(rangeLength)
.contentType(MediaType.parseMediaType(statObjectResponse.contentType()))
.body(new InputStreamResource(getObjectResponse));
.body(outputStream -> {
IoUtil.copy(getObjectResponse, outputStream);
IoUtil.close(getObjectResponse);
});
}

/**
* Gets resource region.
*
* @param bucket the bucket
* @param object the object
* @param statObjectResponse the stat object response
* @param httpRanges the http ranges
* @return the resource region
* @deprecated since it's not async method, will be deleted in the future.
*/
@Deprecated(forRemoval = true)
@SuppressWarnings("DuplicatedCode")
private ResponseEntity<StreamingResponseBody> asyncGetResourceRegion(String bucket, String object,
StatObjectResponse statObjectResponse,
List<HttpRange> httpRanges) {
private ResponseEntity<Resource> getResourceRegion(String bucket, String object,
StatObjectResponse statObjectResponse,
List<HttpRange> httpRanges) {
val getObjectResponse = this.minioHelper.getObject(bucket, object, httpRanges.get(0).getRangeStart(0),
MEDIUM_CHUNK_SIZE.toBytes());
val start = httpRanges.get(0).getRangeStart(0);
Expand All @@ -133,9 +172,6 @@ private ResponseEntity<StreamingResponseBody> asyncGetResourceRegion(String buck
.header(HttpHeaders.CONTENT_RANGE, String.format("bytes %d-%d/%d", start, end, resourceLength))
.contentLength(rangeLength)
.contentType(MediaType.parseMediaType(statObjectResponse.contentType()))
.body(outputStream -> {
IoUtil.copy(getObjectResponse, outputStream);
IoUtil.close(getObjectResponse);
});
.body(new InputStreamResource(getObjectResponse));
}
}

0 comments on commit decb5a1

Please sign in to comment.