From 78d1e55ca877e77815727be3335821856a7f1a80 Mon Sep 17 00:00:00 2001 From: ikedam Date: Sun, 3 Jul 2022 15:34:24 +0900 Subject: [PATCH] Let aws-sdk retry upload especially when conflict with parallel uploads * fixes #129 --- src/main/java/LoggingHelper.java | 9 ++++ src/main/java/S3DataManager.java | 71 +++++++++++++++++++++++++++++++- 2 files changed, 78 insertions(+), 2 deletions(-) diff --git a/src/main/java/LoggingHelper.java b/src/main/java/LoggingHelper.java index a781c5e..2960592 100644 --- a/src/main/java/LoggingHelper.java +++ b/src/main/java/LoggingHelper.java @@ -36,4 +36,13 @@ public static void log(final TaskListener listener, String message, String secon listener.getLogger().println(completeMessage); } } + + public static void logError(final TaskListener listener, String message, Throwable t) { + log(listener, message); + if(listener == null) { + t.printStackTrace(System.out); + } else { + t.printStackTrace(listener.getLogger()); + } + } } diff --git a/src/main/java/S3DataManager.java b/src/main/java/S3DataManager.java index 00ba3f6..cc668ad 100644 --- a/src/main/java/S3DataManager.java +++ b/src/main/java/S3DataManager.java @@ -27,6 +27,7 @@ import java.io.File; import java.io.FileInputStream; +import java.io.FilterInputStream; import java.io.IOException; import java.io.InputStream; import java.util.UUID; @@ -85,12 +86,12 @@ public UploadToS3Output uploadSourceToS3(TaskListener listener, FilePath workspa PutObjectRequest putObjectRequest; PutObjectResult putObjectResult = new PutObjectResult(); - try(InputStream zipFileInputStream = localFile.read()) { + try(InputStream zipFileInputStream = new ResetableFilePathInputStream(localFile)) { putObjectRequest = new PutObjectRequest(s3InputBucket, s3InputKey, zipFileInputStream, objectMetadata); LoggingHelper.log(listener, "Uploading to S3 at location " + putObjectRequest.getBucketName() + "/" + putObjectRequest.getKey() + ". MD5 checksum is " + zipFileMD5); putObjectResult = s3Client.putObject(putObjectRequest); } catch (SdkClientException e) { - LoggingHelper.log(listener, "Unexpected exception upon uploading source zip to S3: " + e.getMessage()); + LoggingHelper.logError(listener, "Unexpected exception upon uploading source zip to S3: " + e.getMessage(), e); } try { @@ -109,4 +110,70 @@ private String getTempFilePath(String filePath) { public static String getZipMD5(File zipFile) throws IOException { return new String(encodeBase64(DigestUtils.md5(new FileInputStream(zipFile))), Charsets.UTF_8); } + + /** + * ResetableFilePathInputStream wraps `FilePath.read()` and supports `mark()` and `reset()` by calling `FilePath.read()` again. + * Parallel uploads may result errors and let aws-sdk retry upload with this. + */ + private static class ResetableFilePathInputStream extends FilterInputStream { + private long pos = 0; + private long marked = 0; + private FilePath file; + + ResetableFilePathInputStream(FilePath file) throws IOException, InterruptedException { + super(file.read()); + this.file = file; + } + + @Override + public boolean markSupported() { + return true; + } + + @Override + public void mark(int readlimit) { + this.marked = this.pos; + } + + @Override + public void reset() throws IOException { + if (this.in != null) { + this.in.close(); + } + try { + this.in = this.file.read(); + } catch (IOException|InterruptedException e) { + throw new IOException("Failed to reopen " + this.file.getName(), e); + } + this.skip(this.marked); + this.pos = this.marked; + } + + @Override + public int read() throws IOException { + int r = super.read(); + if (r != -1) { + ++this.pos; + } + return r; + } + + @Override + public int read(byte[] b) throws IOException { + int r = super.read(b); + if (r > 0) { + this.pos += r; + } + return r; + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + int r = super.read(b, off, len); + if (r > 0) { + this.pos += r; + } + return r; + } + } }