fix(controller): fix dataset link auth error (#1013)
* support empty auth

* fix protocol integration error

* support offset download
anda-ren authored Aug 26, 2022
1 parent e870ea7 commit 67bb17c
Showing 12 changed files with 80 additions and 33 deletions.
@@ -238,6 +238,10 @@ void pullLinkContent(
@RequestParam(name = "uri",required = true) String uri,
@Parameter(name = "authName", description = "auth name the link used")
@RequestParam(name = "authName",required = false) String authName,
@Parameter(name = "offset", description = "offset in the content")
@RequestParam(name = "offset",required = false) Long offset,
@Parameter(name = "size", description = "data size")
@RequestParam(name = "size",required = false) Long size,
HttpServletResponse httpResponse);


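Both new query parameters are declared with required = false, so existing callers that omit them keep the full-object behaviour. A minimal client-side sketch of a ranged pull, with a hypothetical host, route and token (only the handler signature is visible in this hunk):

import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;

public class RangedPullExample {
    public static void main(String[] args) throws Exception {
        // hypothetical route and token; uri/authName/offset/size match the handler parameters above
        String linkUri = URLEncoder.encode("s3://bucket/mnist/img0.bin", StandardCharsets.UTF_8);
        String url = "http://localhost:8082/api/v1/project/1/dataset/mnist/version/v1/link"
                + "?uri=" + linkUri + "&authName=MNIST&offset=0&size=4096";
        HttpRequest request = HttpRequest.newBuilder(URI.create(url))
                .header("Authorization", "Bearer <token>")
                .build();
        HttpResponse<byte[]> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofByteArray());
        System.out.println(response.body().length); // at most 4096 bytes when the range is honoured
    }
}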
@@ -189,15 +189,15 @@ public void pullDS(String projectUrl, String datasetUrl, String versionUrl,

@Override
public void pullLinkContent(String projectUrl, String datasetUrl, String versionUrl,
String uri,String authName, HttpServletResponse httpResponse) {
String uri,String authName,Long offset,Long size, HttpServletResponse httpResponse) {
if(!StringUtils.hasText(datasetUrl) || !StringUtils.hasText(versionUrl) ){
throw new StarWhaleApiException(new SWValidationException(ValidSubject.SWDS)
.tip("please provide name and version for the DS "), HttpStatus.BAD_REQUEST);
}
SWDatasetVersionEntity datasetVersionEntity = swDatasetService.query(projectUrl, datasetUrl, versionUrl);
try {
ServletOutputStream outputStream = httpResponse.getOutputStream();
outputStream.write(swDatasetService.dataOf(datasetVersionEntity.getId(),uri,authName));
outputStream.write(swDatasetService.dataOf(datasetVersionEntity.getId(),uri,authName,offset,size));
outputStream.flush();
} catch (IOException e) {
log.error("error write data to response",e);
@@ -106,6 +106,7 @@ public class SWDatasetService {
@Resource
private StorageProperties storageProperties;

@Resource
private DSFileGetter dsFileGetter;

private BundleManager bundleManager() {
@@ -290,7 +291,8 @@ public SWDatasetVersionEntity query(String projectUrl, String datasetUrl, String
return versionEntity;
}

public byte[] dataOf(Long datasetId,String uri,String authName){
return dsFileGetter.dataOf(datasetId,uri,authName);
public byte[] dataOf(Long datasetId, String uri, String authName, Long offset,
Long size){
return dsFileGetter.dataOf(datasetId,uri,authName,offset,size);
}
}
@@ -57,10 +57,13 @@ public class IndexItem {
String dataOrigin;

@JsonProperty("object_store_type")
String object_store_type;
String objectStoreType;

@JsonProperty("auth_name")
String auth_name;
String authName;

@JsonProperty("data_mime_type")
String dataMimeType;

@JsonProperty("label")
String label;
@@ -72,6 +75,9 @@ public class IndexItem {
, new ColumnSchemaDesc("data_offset", ColumnType.INT64.name())
, new ColumnSchemaDesc("data_size", ColumnType.INT64.name())
, new ColumnSchemaDesc("data_origin", ColumnType.STRING.name())
, new ColumnSchemaDesc("object_store_type", ColumnType.STRING.name())
, new ColumnSchemaDesc("auth_name", ColumnType.STRING.name())
, new ColumnSchemaDesc("data_mime_type", ColumnType.STRING.name())
, new ColumnSchemaDesc("label", ColumnType.STRING.name())
));

@@ -83,6 +89,9 @@ public RecordDesc toRecordDesc(){
ret.add(new RecordValueDesc("data_offset",ColumnType.INT64.encode(dataOffset)));
ret.add(new RecordValueDesc("data_size",ColumnType.INT64.encode(dataSize)));
ret.add(new RecordValueDesc("data_origin",dataOrigin));
ret.add(new RecordValueDesc("object_store_type",objectStoreType));
ret.add(new RecordValueDesc("auth_name",authName));
ret.add(new RecordValueDesc("data_mime_type",dataMimeType));
ret.add(new RecordValueDesc("label",label));
return new RecordDesc(ret);
}
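With the three extra columns, each index record (presumably one line of the dataset's _meta.jsonl, given the @JsonProperty names and the index file referenced in the SwdsUploader hunk below) now records which object store the link lives in, which auth to use and the data MIME type, next to the existing offset/size/origin fields. An illustrative record, limited to the fields visible in this hunk and with made-up values:

{"data_offset": 0, "data_size": 4096, "data_origin": "+", "object_store_type": "s3", "auth_name": "MNIST", "data_mime_type": "application/octet-stream", "label": "0"}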
@@ -34,10 +34,11 @@ public DSFileGetter(StorageAccessParser storageAccessParser) {
this.storageAccessParser = storageAccessParser;
}

public byte[] dataOf(Long datasetId, String uri, String authName) {
public byte[] dataOf(Long datasetId, String uri, String authName, Long offset,
Long size) {
StorageAccessService storageAccessService = storageAccessParser.getStorageAccessServiceFromAuth(
datasetId, uri, authName);
try (InputStream inputStream = storageAccessService.get(new StorageUri(uri).getPath())) {
try (InputStream inputStream = storageAccessService.get(new StorageUri(uri).getPath(),offset,size)) {
return inputStream.readAllBytes();
} catch (IOException ioException) {
log.error("error while accessing storage ", ioException);
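Callers can now fetch just a slice of a linked object. A minimal usage sketch against the new signature (the dataset id, link uri and sizes are illustrative, and a wired DSFileGetter instance is assumed):

// first 4 KiB of the linked object
byte[] firstChunk = dsFileGetter.dataOf(1L, "s3://bucket/mnist/img0.bin", "MNIST", 0L, 4096L);
// null offset/size keeps the old full-object behaviour (see the S3 implementation below)
byte[] whole = dsFileGetter.dataOf(1L, "s3://bucket/mnist/img0.bin", "MNIST", null, null);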
@@ -53,7 +53,9 @@ public StorageAccessParser(StorageAccessService defaultStorageAccessService,
*/
public StorageAccessService getStorageAccessServiceFromAuth(Long datasetId, String uri,
String authName) {

if(StringUtils.hasText(authName)){
authName=authName.toUpperCase();//env vars are uppercase always
}
StorageAccessService cachedStorageAccessService = storageAccessServicePool.get(
formatKey(datasetId, authName));
if (null != cachedStorageAccessService) {
@@ -110,10 +112,10 @@ S3Config env2S3Config(StorageUri storageUri, FileStorageEnv env, String authName
String endpoint = StringUtils.hasText(storageUri.getHost()) ? buildEndPoint(storageUri)
: envs.get(String.format(KEY_ENDPOINT, authName));
return new S3Config(bucket
, envs.get(accessKey)
, envs.get(accessSecret)
, accessKey
, accessSecret
, envs.get(String.format(KEY_REGION, authName))
, envs.get(endpoint));
, endpoint);
}

private String buildEndPoint(StorageUri storageUri) {
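The upper-casing matters because the credential env keys embed the auth name in upper case (see USER.S3.MNIST.SECRET in the test below), so a request carrying authName=mnist should resolve the same cached service and env entries as one carrying MNIST. A rough illustration of the intended key resolution, with the format strings assumed rather than taken from this hunk:

// hypothetical sketch: lower-case auth name from the request, upper-case env keys
String authName = "mnist".toUpperCase();                             // -> "MNIST"
String secretKey = String.format("USER.S3.%s.SECRET", authName);     // -> "USER.S3.MNIST.SECRET"
String endpointKey = String.format("USER.S3.%s.ENDPOINT", authName); // -> "USER.S3.MNIST.ENDPOINT"

The env2S3Config change above likewise passes accessKey, accessSecret and endpoint through as already-resolved values instead of looking them up in envs a second time.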
@@ -35,7 +35,7 @@ public class StorageAuths {

static final String NAME_DEFAULT="";

static final Pattern LINE_PATTERN=Pattern.compile("^(USER\\.(S3|HDFS|WEBHDFS|LOCALFS|NFS|FTP|SFTP|HTTP|HTTPS)\\.((\\w+)\\.)?(\\w+))=(\\w+)$");
static final Pattern LINE_PATTERN=Pattern.compile("^(USER\\.(S3|HDFS|WEBHDFS|LOCALFS|NFS|FTP|SFTP|HTTP|HTTPS)\\.((\\w+)\\.)?(\\w+))=(\\w*)$");
public StorageAuths(String authsText){
String[] lines = authsText.split("\n");
Stream.of(lines).forEach(line->{
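Relaxing the trailing value group from \w+ to \w* is presumably what the "support empty auth" bullet refers to: a line with an empty value, such as USER.S3.MNIST.SECRET= in the test below, now matches instead of being silently dropped. A small self-contained check of the new pattern:

import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class EmptyAuthValueCheck {
    static final Pattern LINE_PATTERN = Pattern.compile(
            "^(USER\\.(S3|HDFS|WEBHDFS|LOCALFS|NFS|FTP|SFTP|HTTP|HTTPS)\\.((\\w+)\\.)?(\\w+))=(\\w*)$");

    public static void main(String[] args) {
        Matcher m = LINE_PATTERN.matcher("USER.S3.MNIST.SECRET=");
        System.out.println(m.matches());            // true; the old (\w+) tail rejected this line
        System.out.println("[" + m.group(6) + "]"); // [] - the value group is the empty string
    }
}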
@@ -100,7 +100,7 @@ public class SwdsUploader {
final IndexWriter indexWriter;

static final String INDEX_FILE_NAME="_meta.jsonl";
static final String AUTH_FILE_NAME="auth_env";
static final String AUTH_FILE_NAME=".auth_env";

public SwdsUploader(HotSwdsHolder hotSwdsHolder, SWDatasetMapper swdsMapper,
SWDatasetVersionMapper swdsVersionMapper, StoragePathCoordinator storagePathCoordinator,
@@ -28,8 +28,8 @@
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

import javax.validation.constraints.NotNull;
import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.NotNull;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.core.annotation.Order;
import org.springframework.scheduling.annotation.Scheduled;
@@ -31,20 +31,26 @@ public void testS3(){
+ "USER.S3.ACCESS_KEY=access_key\n"
+ "USER.S3.myname.ENDPOINT=endpoint1\n"
+ "USER.S3.myname.SECRET=secret1\n"
+ "USER.S3.MNIST.SECRET=\n"
+ "USER.S3.myname.ACCESS_KEY=access_key1\n";
StorageAuths storageAuths = new StorageAuths(auths);
FileStorageEnv defaultEnv = storageAuths.getEnv("");
FileStorageEnv myEnv = storageAuths.getEnv("myname");
Assertions.assertEquals(FileSystemEnvType.S3, defaultEnv.getEnvType());
Assertions.assertEquals("region", defaultEnv.getEnvs().get("USER.S3.REGION"));
Assertions.assertEquals("endpoint", defaultEnv.getEnvs().get("USER.S3.ENDPOINT"));
Assertions.assertEquals("secret", defaultEnv.getEnvs().get("USER.S3.SECRET"));
Assertions.assertEquals("access_key", defaultEnv.getEnvs().get("USER.S3.ACCESS_KEY"));

FileStorageEnv myEnv = storageAuths.getEnv("myname");
Assertions.assertEquals(FileSystemEnvType.S3, myEnv.getEnvType());
Assertions.assertNull(myEnv.getEnvs().get("USER.S3.myname.REGION"));
Assertions.assertEquals("endpoint1", myEnv.getEnvs().get("USER.S3.myname.ENDPOINT"));
Assertions.assertEquals("secret1", myEnv.getEnvs().get("USER.S3.myname.SECRET"));
Assertions.assertEquals("access_key1", myEnv.getEnvs().get("USER.S3.myname.ACCESS_KEY"));

FileStorageEnv mnist = storageAuths.getEnv("MNIST");
Assertions.assertEquals(FileSystemEnvType.S3, mnist.getEnvType());
Assertions.assertEquals("", mnist.getEnvs().get("USER.S3.MNIST.SECRET"));
}

}
@@ -30,6 +30,7 @@ public interface StorageAccessService {
void put(String path,InputStream inputStream) throws IOException;
void put(String path,byte[] body) throws IOException;
InputStream get(String path) throws IOException;
InputStream get(String path,Long offset,Long size) throws IOException;
Stream<String> list(String path) throws IOException;
void delete(String path) throws IOException;
}
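Every StorageAccessService implementation now has to provide the ranged overload. As a rough illustration of the contract (not code from this commit), a local-filesystem variant might honour the offset/size pair like this, falling back to a full read when they are absent, mirroring the S3 implementation below:

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;

// hypothetical sketch only; the commit's real ranged read is in StorageAccessServiceS3 below
class LocalFsRangedGet {
    static InputStream get(Path root, String path, Long offset, Long size) throws IOException {
        try (InputStream in = Files.newInputStream(root.resolve(path))) {
            if (null == offset || null == size || offset < 0 || size <= 0) {
                return new ByteArrayInputStream(in.readAllBytes()); // full read, like get(path)
            }
            in.skipNBytes(offset);                                  // jump to the requested offset
            return new ByteArrayInputStream(in.readNBytes(Math.toIntExact(size))); // at most `size` bytes
        }
    }
}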
@@ -46,15 +46,15 @@ public class StorageAccessServiceS3 implements StorageAccessService {

final S3Client s3client;

public StorageAccessServiceS3(S3Config s3Config){
public StorageAccessServiceS3(S3Config s3Config) {
this.s3Config = s3Config;
AwsBasicCredentials awsCreds = AwsBasicCredentials.create(
s3Config.getAccessKey(),
s3Config.getSecretKey());
S3ClientBuilder s3ClientBuilder = S3Client.builder()
.credentialsProvider(StaticCredentialsProvider.create(awsCreds))
.region(Region.of(s3Config.getRegion()));
if(s3Config.overWriteEndPoint()){
if (s3Config.overWriteEndPoint()) {
s3ClientBuilder.endpointOverride(URI.create(s3Config.getEndpoint()));
}
this.s3client = s3ClientBuilder
@@ -63,22 +63,24 @@ public StorageAccessServiceS3(S3Config s3Config){

@Override
public StorageObjectInfo head(String path) throws IOException {
HeadObjectRequest build = HeadObjectRequest.builder().bucket(s3Config.getBucket()).key(path).build();
try{
HeadObjectRequest build = HeadObjectRequest.builder().bucket(s3Config.getBucket()).key(path)
.build();
try {
HeadObjectResponse headObjectResponse = s3client.headObject(build);
return new StorageObjectInfo(true,headObjectResponse.contentLength(),mapToString(headObjectResponse.metadata()));
}catch (NoSuchKeyException e){
return new StorageObjectInfo(false,0L,null);
return new StorageObjectInfo(true, headObjectResponse.contentLength(),
mapToString(headObjectResponse.metadata()));
} catch (NoSuchKeyException e) {
return new StorageObjectInfo(false, 0L, null);
}

}

private String mapToString(Map<String, String> metadata) {
if(metadata == null || metadata.isEmpty()){
if (metadata == null || metadata.isEmpty()) {
return null;
}
StringBuilder stringBuilder = new StringBuilder();
metadata.forEach((k,v)->{
metadata.forEach((k, v) -> {
stringBuilder.append(k);
stringBuilder.append(":");
stringBuilder.append(v);
@@ -88,23 +90,27 @@ private String mapToString(Map<String, String> metadata) {
}

/**
* when you are trying to upload a file that is larger than Integer.MAX_VALUE bytes which is about 2G,
* you should wrapp the inputStream with a LargeFileInputStream
* when you are trying to upload a file that is larger than Integer.MAX_VALUE bytes which is
* about 2G, you should wrapp the inputStream with a LargeFileInputStream
*/
@Override
public void put(String path,InputStream inputStream) throws IOException {
public void put(String path, InputStream inputStream) throws IOException {
long fileSize;
if(inputStream instanceof LargeFileInputStream){
fileSize = ((LargeFileInputStream)inputStream).size();
}else {
if (inputStream instanceof LargeFileInputStream) {
fileSize = ((LargeFileInputStream) inputStream).size();
} else {
fileSize = inputStream.available();
}
s3client.putObject(PutObjectRequest.builder().bucket(s3Config.getBucket()).key(path).build(),RequestBody.fromInputStream(inputStream, fileSize));
s3client.putObject(
PutObjectRequest.builder().bucket(s3Config.getBucket()).key(path).build(),
RequestBody.fromInputStream(inputStream, fileSize));
}

@Override
public void put(String path, byte[] body) {
s3client.putObject(PutObjectRequest.builder().bucket(s3Config.getBucket()).key(path).build(),RequestBody.fromBytes(body));
s3client.putObject(
PutObjectRequest.builder().bucket(s3Config.getBucket()).key(path).build(),
RequestBody.fromBytes(body));
}

@Override
@@ -113,6 +119,22 @@ public InputStream get(String path) {
.getObject(GetObjectRequest.builder().bucket(s3Config.getBucket()).key(path).build());
}


//bytes=0-10098
static final String RANGE_FORMAT = "bytes=%d-%d";

@Override
public InputStream get(String path, Long offset, Long size) throws IOException {
if (null == offset || null == size || offset < 0 || size <= 0) {
return get(path);
}

return s3client
.getObject(
GetObjectRequest.builder().range(String.format(RANGE_FORMAT, offset, offset + size - 1))
.bucket(s3Config.getBucket()).key(path).build());
}
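HTTP byte ranges are inclusive on both ends, which is why the upper bound is offset + size - 1: the bytes=0-10098 example in the comment above corresponds to offset 0 with size 10099. A quick check of the arithmetic:

public class RangeHeaderCheck {
    public static void main(String[] args) {
        long offset = 0L;
        long size = 10_099L;
        // inclusive range, hence the -1 on the upper bound
        System.out.println(String.format("bytes=%d-%d", offset, offset + size - 1)); // bytes=0-10098
    }
}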

@Override
public Stream<String> list(String path) {
final ListObjectsResponse listObjectsResponse = s3client.listObjects(
