Skip to content

Commit

Permalink
Let aws-sdk retry upload especially when conflict with parallel uploads
Browse files Browse the repository at this point in the history
* fixes awslabs#129
  • Loading branch information
ikedam committed Jul 3, 2022
1 parent 3fbfde2 commit 78d1e55
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 2 deletions.
9 changes: 9 additions & 0 deletions src/main/java/LoggingHelper.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,13 @@ public static void log(final TaskListener listener, String message, String secon
listener.getLogger().println(completeMessage);
}
}

public static void logError(final TaskListener listener, String message, Throwable t) {
log(listener, message);
if(listener == null) {
t.printStackTrace(System.out);
} else {
t.printStackTrace(listener.getLogger());
}
}
}
71 changes: 69 additions & 2 deletions src/main/java/S3DataManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

import java.io.File;
import java.io.FileInputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.UUID;
Expand Down Expand Up @@ -85,12 +86,12 @@ public UploadToS3Output uploadSourceToS3(TaskListener listener, FilePath workspa
PutObjectRequest putObjectRequest;
PutObjectResult putObjectResult = new PutObjectResult();

try(InputStream zipFileInputStream = localFile.read()) {
try(InputStream zipFileInputStream = new ResetableFilePathInputStream(localFile)) {
putObjectRequest = new PutObjectRequest(s3InputBucket, s3InputKey, zipFileInputStream, objectMetadata);
LoggingHelper.log(listener, "Uploading to S3 at location " + putObjectRequest.getBucketName() + "/" + putObjectRequest.getKey() + ". MD5 checksum is " + zipFileMD5);
putObjectResult = s3Client.putObject(putObjectRequest);
} catch (SdkClientException e) {
LoggingHelper.log(listener, "Unexpected exception upon uploading source zip to S3: " + e.getMessage());
LoggingHelper.logError(listener, "Unexpected exception upon uploading source zip to S3: " + e.getMessage(), e);
}

try {
Expand All @@ -109,4 +110,70 @@ private String getTempFilePath(String filePath) {
public static String getZipMD5(File zipFile) throws IOException {
return new String(encodeBase64(DigestUtils.md5(new FileInputStream(zipFile))), Charsets.UTF_8);
}

/**
* ResetableFilePathInputStream wraps `FilePath.read()` and supports `mark()` and `reset()` by calling `FilePath.read()` again.
* Parallel uploads may result errors and let aws-sdk retry upload with this.
*/
private static class ResetableFilePathInputStream extends FilterInputStream {
private long pos = 0;
private long marked = 0;
private FilePath file;

ResetableFilePathInputStream(FilePath file) throws IOException, InterruptedException {
super(file.read());
this.file = file;
}

@Override
public boolean markSupported() {
return true;
}

@Override
public void mark(int readlimit) {
this.marked = this.pos;
}

@Override
public void reset() throws IOException {
if (this.in != null) {
this.in.close();
}
try {
this.in = this.file.read();
} catch (IOException|InterruptedException e) {
throw new IOException("Failed to reopen " + this.file.getName(), e);
}
this.skip(this.marked);
this.pos = this.marked;
}

@Override
public int read() throws IOException {
int r = super.read();
if (r != -1) {
++this.pos;
}
return r;
}

@Override
public int read(byte[] b) throws IOException {
int r = super.read(b);
if (r > 0) {
this.pos += r;
}
return r;
}

@Override
public int read(byte[] b, int off, int len) throws IOException {
int r = super.read(b, off, len);
if (r > 0) {
this.pos += r;
}
return r;
}
}
}

0 comments on commit 78d1e55

Please sign in to comment.