diff --git a/lib/lib-storage/README.md b/lib/lib-storage/README.md index 56bf7c9621f2..db70ec3dc903 100644 --- a/lib/lib-storage/README.md +++ b/lib/lib-storage/README.md @@ -16,12 +16,23 @@ try { client: new S3({}) || new S3Client({}), params: { Bucket, Key, Body }, + // optional tags tags: [ /*...*/ - ], // optional tags - queueSize: 4, // optional concurrency configuration - partSize: 1024 * 1024 * 5, // optional size of each part, in bytes, at least 5MB - leavePartsOnError: false, // optional manually handle dropped parts + ], + + // additional optional fields show default values below: + + // (optional) concurrency configuration + queueSize: 4, + + // (optional) size of each part, in bytes, at least 5MB + partSize: 1024 * 1024 * 5, + + // (optional) when true, do not automatically call AbortMultipartUpload when + // a multipart upload fails to complete. You should then manually handle + // the leftover parts. + leavePartsOnError: false, }); parallelUploads3.on("httpUploadProgress", (progress) => { diff --git a/lib/lib-storage/src/Upload.spec.ts b/lib/lib-storage/src/Upload.spec.ts index 65a2e5c5f77e..fe1edff2ff12 100644 --- a/lib/lib-storage/src/Upload.spec.ts +++ b/lib/lib-storage/src/Upload.spec.ts @@ -711,4 +711,16 @@ describe(Upload.name, () => { expect(error).toBeDefined(); } }); + + it("should reject calling .done() more than once on an instance", async () => { + const upload = new Upload({ + params, + client: new S3({}), + }); + + await upload.done(); + expect(() => upload.done()).rejects.toEqual( + new Error("@aws-sdk/lib-storage: this instance of Upload has already executed .done(). Create a new instance.") + ); + }); }); diff --git a/lib/lib-storage/src/Upload.ts b/lib/lib-storage/src/Upload.ts index 5105451e0a06..6288a89df4bb 100644 --- a/lib/lib-storage/src/Upload.ts +++ b/lib/lib-storage/src/Upload.ts @@ -1,4 +1,5 @@ import { + AbortMultipartUploadCommand, CompletedPart, CompleteMultipartUploadCommand, CompleteMultipartUploadCommandOutput, @@ -36,18 +37,18 @@ const MIN_PART_SIZE = 1024 * 1024 * 5; export class Upload extends EventEmitter { /** - * S3 multipart upload does not allow more than 10000 parts. + * S3 multipart upload does not allow more than 10,000 parts. */ - private MAX_PARTS = 10000; + private MAX_PARTS = 10_000; // Defaults. - private queueSize = 4; - private partSize = MIN_PART_SIZE; - private leavePartsOnError = false; - private tags: Tag[] = []; + private readonly queueSize: number = 4; + private readonly partSize = MIN_PART_SIZE; + private readonly leavePartsOnError: boolean = false; + private readonly tags: Tag[] = []; - private client: S3Client; - private params: PutObjectCommandInput; + private readonly client: S3Client; + private readonly params: PutObjectCommandInput; // used for reporting progress. private totalBytes?: number; @@ -57,13 +58,19 @@ export class Upload extends EventEmitter { private abortController: IAbortController; private concurrentUploaders: Promise[] = []; private createMultiPartPromise?: Promise; + private abortMultipartUploadCommand: AbortMultipartUploadCommand | null = null; private uploadedParts: CompletedPart[] = []; - private uploadId?: string; - uploadEvent?: string; + private uploadEnqueuedPartsCount = 0; + /** + * Last UploadId if the upload was done with MultipartUpload and not PutObject. + */ + public uploadId?: string; + public uploadEvent?: string; private isMultiPart = true; private singleUploadResult?: CompleteMultipartUploadCommandOutput; + private sent = false; constructor(options: Options) { super(); @@ -94,6 +101,12 @@ export class Upload extends EventEmitter { } public async done(): Promise { + if (this.sent) { + throw new Error( + "@aws-sdk/lib-storage: this instance of Upload has already executed .done(). Create a new instance." + ); + } + this.sent = true; return await Promise.race([this.__doMultipartUpload(), this.__abortTimeout(this.abortController.signal)]); } @@ -184,104 +197,64 @@ export class Upload extends EventEmitter { private async __createMultipartUpload(): Promise { if (!this.createMultiPartPromise) { const createCommandParams = { ...this.params, Body: undefined }; - this.createMultiPartPromise = this.client.send(new CreateMultipartUploadCommand(createCommandParams)); + this.createMultiPartPromise = this.client + .send(new CreateMultipartUploadCommand(createCommandParams)) + .then((createMpuResponse) => { + // We use the parameter Bucket/Key rather than the information from + // createMultipartUpload response in case the Bucket is an access point arn. + this.abortMultipartUploadCommand = new AbortMultipartUploadCommand({ + Bucket: this.params.Bucket, + Key: this.params.Key, + UploadId: createMpuResponse.UploadId, + }); + return createMpuResponse; + }); } return this.createMultiPartPromise; } private async __doConcurrentUpload(dataFeeder: AsyncGenerator): Promise { for await (const dataPart of dataFeeder) { - if (this.uploadedParts.length > this.MAX_PARTS) { + if (this.uploadEnqueuedPartsCount > this.MAX_PARTS) { throw new Error( - `Exceeded ${this.MAX_PARTS} as part of the upload to ${this.params.Key} and ${this.params.Bucket}.` + `Exceeded ${this.MAX_PARTS} parts in multipart upload to Bucket: ${this.params.Bucket} Key: ${this.params.Key}.` ); } - try { - if (this.abortController.signal.aborted) { - return; - } + if (this.abortController.signal.aborted) { + return; + } - // Use put instead of multi-part for one chunk uploads. - if (dataPart.partNumber === 1 && dataPart.lastPart) { - return await this.__uploadUsingPut(dataPart); - } + // Use put instead of multipart for one chunk uploads. + if (dataPart.partNumber === 1 && dataPart.lastPart) { + return await this.__uploadUsingPut(dataPart); + } - if (!this.uploadId) { - const { UploadId } = await this.__createMultipartUpload(); - this.uploadId = UploadId; - if (this.abortController.signal.aborted) { - return; - } + if (!this.uploadId) { + const { UploadId } = await this.__createMultipartUpload(); + this.uploadId = UploadId; + if (this.abortController.signal.aborted) { + return; } + } - const partSize: number = byteLength(dataPart.data) || 0; - - const requestHandler = this.client.config.requestHandler; - const eventEmitter: EventEmitter | null = requestHandler instanceof EventEmitter ? requestHandler : null; - - let lastSeenBytes = 0; - const uploadEventListener = (event: ProgressEvent, request: HttpRequest) => { - const requestPartSize = Number(request.query["partNumber"]) || -1; - - if (requestPartSize !== dataPart.partNumber) { - // ignored, because the emitted event is not for this part. - return; - } - - if (event.total && partSize) { - this.bytesUploadedSoFar += event.loaded - lastSeenBytes; - lastSeenBytes = event.loaded; - } - - this.__notifyProgress({ - loaded: this.bytesUploadedSoFar, - total: this.totalBytes, - part: dataPart.partNumber, - Key: this.params.Key, - Bucket: this.params.Bucket, - }); - }; + const partSize: number = byteLength(dataPart.data) || 0; - if (eventEmitter !== null) { - // The requestHandler is the xhr-http-handler. - eventEmitter.on("xhr.upload.progress", uploadEventListener); - } + const requestHandler = this.client.config.requestHandler; + const eventEmitter: EventEmitter | null = requestHandler instanceof EventEmitter ? requestHandler : null; - const partResult = await this.client.send( - new UploadPartCommand({ - ...this.params, - UploadId: this.uploadId, - Body: dataPart.data, - PartNumber: dataPart.partNumber, - }) - ); + let lastSeenBytes = 0; + const uploadEventListener = (event: ProgressEvent, request: HttpRequest) => { + const requestPartSize = Number(request.query["partNumber"]) || -1; - if (eventEmitter !== null) { - eventEmitter.off("xhr.upload.progress", uploadEventListener); - } - - if (this.abortController.signal.aborted) { + if (requestPartSize !== dataPart.partNumber) { + // ignored, because the emitted event is not for this part. return; } - if (!partResult.ETag) { - throw new Error( - `Part ${dataPart.partNumber} is missing ETag in UploadPart response. Missing Bucket CORS configuration for ETag header?` - ); - } - - this.uploadedParts.push({ - PartNumber: dataPart.partNumber, - ETag: partResult.ETag, - ...(partResult.ChecksumCRC32 && { ChecksumCRC32: partResult.ChecksumCRC32 }), - ...(partResult.ChecksumCRC32C && { ChecksumCRC32C: partResult.ChecksumCRC32C }), - ...(partResult.ChecksumSHA1 && { ChecksumSHA1: partResult.ChecksumSHA1 }), - ...(partResult.ChecksumSHA256 && { ChecksumSHA256: partResult.ChecksumSHA256 }), - }); - - if (eventEmitter === null) { - this.bytesUploadedSoFar += partSize; + if (event.total && partSize) { + this.bytesUploadedSoFar += event.loaded - lastSeenBytes; + lastSeenBytes = event.loaded; } this.__notifyProgress({ @@ -291,33 +264,89 @@ export class Upload extends EventEmitter { Key: this.params.Key, Bucket: this.params.Bucket, }); - } catch (e) { - // Failed to create multi-part or put - if (!this.uploadId) { - throw e; - } - // on leavePartsOnError throw an error so users can deal with it themselves, - // otherwise swallow the error. - if (this.leavePartsOnError) { - throw e; - } + }; + + if (eventEmitter !== null) { + // The requestHandler is the xhr-http-handler. + eventEmitter.on("xhr.upload.progress", uploadEventListener); + } + + this.uploadEnqueuedPartsCount += 1; + + const partResult = await this.client.send( + new UploadPartCommand({ + ...this.params, + UploadId: this.uploadId, + Body: dataPart.data, + PartNumber: dataPart.partNumber, + }) + ); + + if (eventEmitter !== null) { + eventEmitter.off("xhr.upload.progress", uploadEventListener); } + + if (this.abortController.signal.aborted) { + return; + } + + if (!partResult.ETag) { + throw new Error( + `Part ${dataPart.partNumber} is missing ETag in UploadPart response. Missing Bucket CORS configuration for ETag header?` + ); + } + + this.uploadedParts.push({ + PartNumber: dataPart.partNumber, + ETag: partResult.ETag, + ...(partResult.ChecksumCRC32 && { ChecksumCRC32: partResult.ChecksumCRC32 }), + ...(partResult.ChecksumCRC32C && { ChecksumCRC32C: partResult.ChecksumCRC32C }), + ...(partResult.ChecksumSHA1 && { ChecksumSHA1: partResult.ChecksumSHA1 }), + ...(partResult.ChecksumSHA256 && { ChecksumSHA256: partResult.ChecksumSHA256 }), + }); + + if (eventEmitter === null) { + this.bytesUploadedSoFar += partSize; + } + + this.__notifyProgress({ + loaded: this.bytesUploadedSoFar, + total: this.totalBytes, + part: dataPart.partNumber, + Key: this.params.Key, + Bucket: this.params.Bucket, + }); } } private async __doMultipartUpload(): Promise { - // Set up data input chunks. const dataFeeder = getChunk(this.params.Body, this.partSize); + const concurrentUploaderFailures: Error[] = []; - // Create and start concurrent uploads. for (let index = 0; index < this.queueSize; index++) { - const currentUpload = this.__doConcurrentUpload(dataFeeder); + const currentUpload = this.__doConcurrentUpload(dataFeeder).catch((err) => { + concurrentUploaderFailures.push(err); + }); this.concurrentUploaders.push(currentUpload); } - // Create and start concurrent uploads. await Promise.all(this.concurrentUploaders); + if (concurrentUploaderFailures.length >= 1) { + await this.markUploadAsAborted(); + /** + * Previously, each promise in concurrentUploaders could potentially throw + * and immediately return control to user code. However, we want to wait for + * all uploaders to finish before calling AbortMultipartUpload to avoid + * stranding uploaded parts. + * + * We throw only the first error to be consistent with prior behavior, + * but may consider combining the errors into a report in the future. + */ + throw concurrentUploaderFailures[0]; + } + if (this.abortController.signal.aborted) { + await this.markUploadAsAborted(); throw Object.assign(new Error("Upload aborted."), { name: "AbortError" }); } @@ -341,6 +370,8 @@ export class Upload extends EventEmitter { result = this.singleUploadResult!; } + this.abortMultipartUploadCommand = null; + // Add tags to the object after it's completed the upload. if (this.tags.length) { await this.client.send( @@ -356,6 +387,18 @@ export class Upload extends EventEmitter { return result; } + /** + * Abort the last multipart upload in progress + * if we know the upload id, the user did not specify to leave the parts, and + * we have a prepared AbortMultipartUpload command. + */ + private async markUploadAsAborted(): Promise { + if (this.uploadId && !this.leavePartsOnError && null !== this.abortMultipartUploadCommand) { + await this.client.send(this.abortMultipartUploadCommand); + this.abortMultipartUploadCommand = null; + } + } + private __notifyProgress(progress: Progress): void { if (this.uploadEvent) { this.emit(this.uploadEvent, progress); diff --git a/lib/lib-storage/src/lib-storage.e2e.spec.ts b/lib/lib-storage/src/lib-storage.e2e.spec.ts index c0909ab1e3a8..61842fd57cbf 100644 --- a/lib/lib-storage/src/lib-storage.e2e.spec.ts +++ b/lib/lib-storage/src/lib-storage.e2e.spec.ts @@ -48,5 +48,65 @@ describe("@aws-sdk/lib-storage", () => { expect(await object.Body?.transformToString()).toEqual(dataString); }); } + + it("should call AbortMultipartUpload if unable to complete a multipart upload.", async () => { + class MockFailureS3 extends S3 { + public counter = 0; + async send(command: any, ...rest: any[]) { + if (command?.constructor?.name === "UploadPartCommand" && this.counter++ % 3 === 0) { + throw new Error("simulated upload part error"); + } + return super.send(command, ...rest); + } + } + + const client = new MockFailureS3({ + region, + credentials, + }); + + const requestLog = [] as string[]; + + client.middlewareStack.add( + (next, context) => async (args) => { + const result = await next(args); + requestLog.push([context.clientName, context.commandName, result.output.$metadata.httpStatusCode].join(" ")); + return result; + }, + { + name: "E2eRequestLog", + step: "build", + override: true, + } + ); + + const s3Upload = new Upload({ + client, + params: { + Bucket, + Key, + Body: data, + }, + }); + // eslint-disable-next-line @typescript-eslint/no-unused-vars + await s3Upload.done().catch((ignored) => {}); + + const uploadStatus = await client + .listParts({ + Bucket, + Key, + UploadId: s3Upload.uploadId, + }) + .then((listParts) => listParts.$metadata.httpStatusCode) + .catch((err) => err.toString()); + + expect(uploadStatus).toMatch(/NoSuchUpload:(.*?)aborted or completed\./); + expect(requestLog).toEqual([ + "S3Client CreateMultipartUploadCommand 200", + "S3Client UploadPartCommand 200", + "S3Client UploadPartCommand 200", + "S3Client AbortMultipartUploadCommand 204", + ]); + }); }); });