Skip to content

Commit

Permalink
perf($OSS): support async stream
Browse files Browse the repository at this point in the history
support async stream

BREAKING CHANGE: support async stream
  • Loading branch information
johnnymillergh committed Aug 11, 2021
1 parent be943ce commit f413484
Show file tree
Hide file tree
Showing 4 changed files with 80 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestHeader;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.StreamingResponseBody;

/**
* <h1>ReadResourceController</h1>
Expand All @@ -34,6 +35,15 @@ public ResponseEntity<Resource> streamSingleResource(@PathVariable String bucket
return this.readResourceService.streamSingleResource(bucket, object, range);
}

@GetMapping("/async/stream/{bucket}/{object}")
@ApiOperation(value = "Stream single resource", notes = "Stream single resource")
public ResponseEntity<StreamingResponseBody> asyncStreamSingleResource(@PathVariable String bucket,
@PathVariable String object,
@RequestHeader(name = HttpHeaders.RANGE,
required = false) String range) {
return this.readResourceService.asyncStreamSingleResource(bucket, object, range);
}

@GetMapping("/download/{bucket}/{object}")
@ApiOperation(value = "Download single resource", notes = "Download single resource")
public ResponseEntity<Resource> downloadSingleResource(@PathVariable String bucket, @PathVariable String object) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import org.springframework.lang.Nullable;
import org.springframework.util.unit.DataSize;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.servlet.mvc.method.annotation.StreamingResponseBody;

import javax.validation.constraints.NotBlank;

Expand Down Expand Up @@ -33,6 +34,17 @@ public interface ReadResourceService {
ResponseEntity<Resource> streamSingleResource(@NotBlank String bucket, @NotBlank String object,
@Nullable String range);

/**
* Async stream single resource response entity.
*
* @param bucket the bucket
* @param object the object
* @param range the range
* @return the response entity
*/
ResponseEntity<StreamingResponseBody> asyncStreamSingleResource(@NotBlank String bucket, @NotBlank String object,
@Nullable String range);

/**
* Download single resource response entity.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.jmsoftware.maf.osscenter.read.service.impl;

import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.io.IoUtil;
import com.jmsoftware.maf.osscenter.read.service.ReadResourceService;
import com.jmsoftware.maf.springcloudstarter.helper.MinioHelper;
import io.minio.StatObjectResponse;
Expand Down Expand Up @@ -30,15 +31,6 @@
public class ReadResourceServiceImpl implements ReadResourceService {
private final MinioHelper minioHelper;

/**
* {@inheritDoc}
* <p>
* TODO: consider to refactor this code by StreamingResponseBody
*
* @see StreamingResponseBody
* @see
* <a href='https://docs.spring.io/spring-framework/docs/current/reference/html/web.html#mvc-ann-async-http-streaming'>HTTP Streaming</a>
*/
@Override
public ResponseEntity<Resource> streamSingleResource(@NotBlank String bucket, @NotBlank String object,
@Nullable String range) {
Expand All @@ -61,6 +53,32 @@ public ResponseEntity<Resource> streamSingleResource(@NotBlank String bucket, @N
return this.getResourceRegion(bucket, object, statObjectResponse, httpRanges);
}

@Override
public ResponseEntity<StreamingResponseBody> asyncStreamSingleResource(@NotBlank String bucket,
@NotBlank String object,
@Nullable String range) {
StatObjectResponse statObjectResponse;
try {
statObjectResponse = this.minioHelper.statObject(bucket, object);
} catch (Exception e) {
log.error("Exception occurred when looking for object. Exception message: {}", e.getMessage());
return ResponseEntity.notFound().build();
}
val httpRanges = HttpRange.parseRanges(range);
if (CollUtil.isEmpty(httpRanges)) {
val getObjectResponse = this.minioHelper.getObject(bucket, object, 0, MEDIUM_CHUNK_SIZE.toBytes());
return ResponseEntity.ok()
.header(HttpHeaders.ACCEPT_RANGES, "bytes")
.contentLength(statObjectResponse.size())
.contentType(MediaType.parseMediaType(statObjectResponse.contentType()))
.body(outputStream -> {
IoUtil.copy(getObjectResponse, outputStream);
IoUtil.close(getObjectResponse);
});
}
return this.asyncGetResourceRegion(bucket, object, statObjectResponse, httpRanges);
}

@Override
public ResponseEntity<Resource> downloadSingleResource(String bucket, String object) {
StatObjectResponse statObjectResponse;
Expand All @@ -80,6 +98,7 @@ public ResponseEntity<Resource> downloadSingleResource(String bucket, String obj
.body(new InputStreamResource(getObjectResponse));
}

@SuppressWarnings("DuplicatedCode")
private ResponseEntity<Resource> getResourceRegion(String bucket, String object,
StatObjectResponse statObjectResponse,
List<HttpRange> httpRanges) {
Expand All @@ -97,4 +116,26 @@ private ResponseEntity<Resource> getResourceRegion(String bucket, String object,
.contentType(MediaType.parseMediaType(statObjectResponse.contentType()))
.body(new InputStreamResource(getObjectResponse));
}

@SuppressWarnings("DuplicatedCode")
private ResponseEntity<StreamingResponseBody> asyncGetResourceRegion(String bucket, String object,
StatObjectResponse statObjectResponse,
List<HttpRange> httpRanges) {
val getObjectResponse = this.minioHelper.getObject(bucket, object, httpRanges.get(0).getRangeStart(0),
MEDIUM_CHUNK_SIZE.toBytes());
val start = httpRanges.get(0).getRangeStart(0);
var end = start + MEDIUM_CHUNK_SIZE.toBytes() - 1;
val resourceLength = statObjectResponse.size();
end = Math.min(end, resourceLength - 1);
val rangeLength = end - start + 1;
return ResponseEntity.status(HttpStatus.PARTIAL_CONTENT)
.header(HttpHeaders.ACCEPT_RANGES, "bytes")
.header(HttpHeaders.CONTENT_RANGE, String.format("bytes %d-%d/%d", start, end, resourceLength))
.contentLength(rangeLength)
.contentType(MediaType.parseMediaType(statObjectResponse.contentType()))
.body(outputStream -> {
IoUtil.copy(getObjectResponse, outputStream);
IoUtil.close(getObjectResponse);
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import lombok.val;
import org.springframework.context.annotation.Bean;
import org.springframework.core.task.AsyncTaskExecutor;
import org.springframework.scheduling.annotation.EnableAsync;
Expand All @@ -20,11 +21,13 @@ public class AsyncConfiguration {

@Bean
public AsyncTaskExecutor asyncTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(Runtime.getRuntime().availableProcessors() * 2);
executor.setMaxPoolSize(100);
executor.setBeanName("spring-boot-starter-thread-pool-task-executor");
executor.setThreadNamePrefix(this.mafProjectProperty.getProjectArtifactId());
val executor = new ThreadPoolTaskExecutor();
val corePoolSize = Runtime.getRuntime().availableProcessors() * 2;
log.info("corePoolSize = {}, for AsyncTaskExecutor", corePoolSize);
executor.setCorePoolSize(corePoolSize);
executor.setMaxPoolSize(corePoolSize * 3);
executor.setBeanName("async-task-executor");
executor.setThreadNamePrefix(String.format("%s-", this.mafProjectProperty.getProjectArtifactId()));
// Specify the RejectedExecutionHandler to use for the ExecutorService.
// Default is the ExecutorService's default abort policy.
executor.setRejectedExecutionHandler((runnable, executor1) -> {
Expand Down

0 comments on commit f413484

Please sign in to comment.