Skip to content

Commit

Permalink
Refresh file before calling user-defined functions in AWS S3 Multipart
Browse files Browse the repository at this point in the history
  • Loading branch information
Madeline Lumetta committed Jul 20, 2023
1 parent 5146907 commit 8f3ef8e
Show file tree
Hide file tree
Showing 2 changed files with 272 additions and 13 deletions.
50 changes: 37 additions & 13 deletions packages/@uppy/aws-s3-multipart/src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,12 @@ class HTTPCommunicationQueue {

#uploadPartBytes

constructor (requests, options, setS3MultipartState) {
#getFile

constructor (requests, options, setS3MultipartState, getFile) {
this.#requests = requests
this.#setS3MultipartState = setS3MultipartState
this.#getFile = getFile
this.setOptions(options)
}

Expand Down Expand Up @@ -197,7 +200,7 @@ class HTTPCommunicationQueue {
}
}

const promise = this.#createMultipartUpload(file, signal)
const promise = this.#createMultipartUpload(this.#getFile(file), signal)

const abortPromise = () => {
promise.abort(signal.reason)
Expand Down Expand Up @@ -235,7 +238,7 @@ class HTTPCommunicationQueue {
// If the cached result rejects, there's nothing to abort.
return
}
await this.#abortMultipartUpload(file, awaitedResult)
await this.#abortMultipartUpload(this.#getFile(file), awaitedResult)
}

async #nonMultipartUpload (file, chunk, signal) {
Expand All @@ -244,7 +247,7 @@ class HTTPCommunicationQueue {
url,
fields,
headers,
} = await this.#getUploadParameters(file, { signal }).abortOn(signal)
} = await this.#getUploadParameters(this.#getFile(file), { signal }).abortOn(signal)

let body
const data = chunk.getData()
Expand Down Expand Up @@ -285,7 +288,10 @@ class HTTPCommunicationQueue {
try {
const parts = await Promise.all(chunks.map((chunk, i) => this.uploadChunk(file, i + 1, chunk, signal)))
throwIfAborted(signal)
return await this.#sendCompletionRequest(file, { key, uploadId, parts, signal }).abortOn(signal)
return await this.#sendCompletionRequest(
this.#getFile(file),
{ key, uploadId, parts, signal },
).abortOn(signal)
} catch (err) {
if (err?.cause !== pausingUploadReason && err?.name !== 'AbortError') {
// We purposefully don't wait for the promise and ignore its status,
Expand All @@ -308,7 +314,10 @@ class HTTPCommunicationQueue {
}
const { uploadId, key } = await this.getUploadId(file, signal)
throwIfAborted(signal)
const alreadyUploadedParts = await this.#listParts(file, { uploadId, key, signal }).abortOn(signal)
const alreadyUploadedParts = await this.#listParts(
this.#getFile(file),
{ uploadId, key, signal },
).abortOn(signal)
throwIfAborted(signal)
const parts = await Promise.all(
chunks
Expand All @@ -324,7 +333,10 @@ class HTTPCommunicationQueue {
}),
)
throwIfAborted(signal)
return this.#sendCompletionRequest(file, { key, uploadId, parts, signal }).abortOn(signal)
return this.#sendCompletionRequest(
this.#getFile(file),
{ key, uploadId, parts, signal },
).abortOn(signal)
}

/**
Expand All @@ -343,7 +355,7 @@ class HTTPCommunicationQueue {
const chunkData = chunk.getData()
const { onProgress, onComplete } = chunk

const signature = await this.#fetchSignature(file, {
const signature = await this.#fetchSignature(this.#getFile(file), {
uploadId, key, partNumber, body: chunkData, signal,
}).abortOn(signal)

Expand Down Expand Up @@ -413,7 +425,12 @@ export default class AwsS3Multipart extends UploaderPlugin {
* @type {RateLimitedQueue}
*/
this.requests = this.opts.rateLimitedQueue ?? new RateLimitedQueue(this.opts.limit)
this.#companionCommunicationQueue = new HTTPCommunicationQueue(this.requests, this.opts, this.#setS3MultipartState)
this.#companionCommunicationQueue = new HTTPCommunicationQueue(
this.requests,
this.opts,
this.#setS3MultipartState,
this.#getFile,
)

this.uploaders = Object.create(null)
this.uploaderEvents = Object.create(null)
Expand Down Expand Up @@ -675,6 +692,11 @@ export default class AwsS3Multipart extends UploaderPlugin {

#setS3MultipartState = (file, { key, uploadId }) => {
const cFile = this.uppy.getFile(file.id)
if (cFile == null) {
// file was removed from store
return
}

this.uppy.setFileState(file.id, {
s3Multipart: {
...cFile.s3Multipart,
Expand All @@ -684,10 +706,12 @@ export default class AwsS3Multipart extends UploaderPlugin {
})
}

#getFile = (file) => {
return this.uppy.getFile(file.id) || file
}

#uploadFile (file) {
return new Promise((resolve, reject) => {
const getFile = () => this.uppy.getFile(file.id) || file

const onProgress = (bytesUploaded, bytesTotal) => {
this.uppy.emit('upload-progress', file, {
uploader: this,
Expand All @@ -714,7 +738,7 @@ export default class AwsS3Multipart extends UploaderPlugin {

this.resetUploaderReferences(file.id)

this.uppy.emit('upload-success', getFile(), uploadResp)
this.uppy.emit('upload-success', this.#getFile(file), uploadResp)

if (result.location) {
this.uppy.log(`Download ${file.name} from ${result.location}`)
Expand All @@ -724,7 +748,7 @@ export default class AwsS3Multipart extends UploaderPlugin {
}

const onPartComplete = (part) => {
this.uppy.emit('s3-multipart:part-uploaded', getFile(), part)
this.uppy.emit('s3-multipart:part-uploaded', this.#getFile(file), part)
}

const upload = new MultipartUploader(file.data, {
Expand Down
235 changes: 235 additions & 0 deletions packages/@uppy/aws-s3-multipart/src/index.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -370,4 +370,239 @@ describe('AwsS3Multipart', () => {
expect(client[Symbol.for('uppy test: getCompanionHeaders')]().authorization).toEqual(newToken)
})
})

describe('file metadata across custom main functions', () => {
let core
const createMultipartUpload = jest.fn(file => {
core.setFileMeta(file.id, {
...file.meta,
createMultipartUpload: true,
})

return {
uploadId: 'upload1234',
key: file.name,
}
})

const signPart = jest.fn((file, partData) => {
expect(file.meta.createMultipartUpload).toBe(true)

core.setFileMeta(file.id, {
...file.meta,
signPart: true,
[`part${partData.partNumber}`]: partData.partNumber,
})

return {
url: `https://bucket.s3.us-east-2.amazonaws.com/test/upload/multitest.dat?partNumber=${partData.partNumber}&uploadId=6aeb1980f3fc7ce0b5454d25b71992&X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=AKIATEST%2F20210729%2Fus-east-2%2Fs3%2Faws4_request&X-Amz-Date=20210729T014044Z&X-Amz-Expires=600&X-Amz-SignedHeaders=host&X-Amz-Signature=test`,
}
})

const listParts = jest.fn((file) => {
expect(file.meta.createMultipartUpload).toBe(true)
core.setFileMeta(file.id, {
...file.meta,
listParts: true,
})

const partKeys = Object.keys(file.meta).filter(metaKey => metaKey.startsWith('part'))
return partKeys.map(metaKey => ({
PartNumber: file.meta[metaKey],
ETag: metaKey,
Size: 5 * MB,
}))
})

const completeMultipartUpload = jest.fn((file) => {
expect(file.meta.createMultipartUpload).toBe(true)
expect(file.meta.signPart).toBe(true)
for (let i = 1; i <= 10; i++) {
expect(file.meta[`part${i}`]).toBe(i)
}
return {}
})

const abortMultipartUpload = jest.fn((file) => {
expect(file.meta.createMultipartUpload).toBe(true)
expect(file.meta.signPart).toBe(true)
expect(file.meta.abortingPart).toBe(5)
return {}
})

beforeEach(() => {
createMultipartUpload.mockClear()
signPart.mockClear()
listParts.mockClear()
abortMultipartUpload.mockClear()
completeMultipartUpload.mockClear()
})

it('preserves file metadata if upload is completed', async () => {
core = new Core()
.use(AwsS3Multipart, {
createMultipartUpload,
signPart,
listParts,
completeMultipartUpload,
abortMultipartUpload,
})

nock('https://bucket.s3.us-east-2.amazonaws.com')
.defaultReplyHeaders({
'access-control-allow-headers': '*',
'access-control-allow-method': 'PUT',
'access-control-allow-origin': '*',
'access-control-expose-headers': 'ETag',
})
.put((uri) => uri.includes('test/upload/multitest.dat'))
.reply(200, '', { ETag: 'test' })
.persist()

const fileSize = 50 * MB
core.addFile({
source: 'jest',
name: 'multitest.dat',
type: 'application/octet-stream',
data: new File([new Uint8Array(fileSize)], {
type: 'application/octet-stream',
}),
})

await core.upload()
expect(createMultipartUpload).toHaveBeenCalled()
expect(signPart).toHaveBeenCalledTimes(10)
expect(completeMultipartUpload).toHaveBeenCalled()
})

it('preserves file metadata if upload is aborted', async () => {
const signPartWithAbort = jest.fn((file, partData) => {
expect(file.meta.createMultipartUpload).toBe(true)
if (partData.partNumber === 5) {
core.setFileMeta(file.id, {
...file.meta,
abortingPart: partData.partNumber,
})
core.removeFile(file.id)
return {}
}

core.setFileMeta(file.id, {
...file.meta,
signPart: true,
[`part${partData.partNumber}`]: partData.partNumber,
})

return {
url: `https://bucket.s3.us-east-2.amazonaws.com/test/upload/multitest.dat?partNumber=${partData.partNumber}&uploadId=6aeb1980f3fc7ce0b5454d25b71992&X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=AKIATEST%2F20210729%2Fus-east-2%2Fs3%2Faws4_request&X-Amz-Date=20210729T014044Z&X-Amz-Expires=600&X-Amz-SignedHeaders=host&X-Amz-Signature=test`,
}
})

core = new Core()
.use(AwsS3Multipart, {
createMultipartUpload,
signPart: signPartWithAbort,
listParts,
completeMultipartUpload,
abortMultipartUpload,
})

nock('https://bucket.s3.us-east-2.amazonaws.com')
.defaultReplyHeaders({
'access-control-allow-headers': '*',
'access-control-allow-method': 'PUT',
'access-control-allow-origin': '*',
'access-control-expose-headers': 'ETag',
})
.put((uri) => uri.includes('test/upload/multitest.dat'))
.reply(200, '', { ETag: 'test' })
.persist()

const fileSize = 50 * MB
core.addFile({
source: 'jest',
name: 'multitest.dat',
type: 'application/octet-stream',
data: new File([new Uint8Array(fileSize)], {
type: 'application/octet-stream',
}),
})

await core.upload()
expect(createMultipartUpload).toHaveBeenCalled()
expect(signPartWithAbort).toHaveBeenCalled()
expect(abortMultipartUpload).toHaveBeenCalled()
})

it('preserves file metadata if upload is paused and resumed', async () => {
const completeMultipartUploadAfterPause = jest.fn((file) => {
expect(file.meta.createMultipartUpload).toBe(true)
expect(file.meta.signPart).toBe(true)
for (let i = 1; i <= 10; i++) {
expect(file.meta[`part${i}`]).toBe(i)
}

expect(file.meta.listParts).toBe(true)
return {}
})

const signPartWithPause = jest.fn((file, partData) => {
expect(file.meta.createMultipartUpload).toBe(true)
if (partData.partNumber === 3) {
core.setFileMeta(file.id, {
...file.meta,
abortingPart: partData.partNumber,
})
core.pauseResume(file.id)
setTimeout(() => core.pauseResume(file.id), 500)
}

core.setFileMeta(file.id, {
...file.meta,
signPart: true,
[`part${partData.partNumber}`]: partData.partNumber,
})

return {
url: `https://bucket.s3.us-east-2.amazonaws.com/test/upload/multitest.dat?partNumber=${partData.partNumber}&uploadId=6aeb1980f3fc7ce0b5454d25b71992&X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=AKIATEST%2F20210729%2Fus-east-2%2Fs3%2Faws4_request&X-Amz-Date=20210729T014044Z&X-Amz-Expires=600&X-Amz-SignedHeaders=host&X-Amz-Signature=test`,
}
})

core = new Core()
.use(AwsS3Multipart, {
createMultipartUpload,
signPart: signPartWithPause,
listParts,
completeMultipartUpload: completeMultipartUploadAfterPause,
abortMultipartUpload,
})

nock('https://bucket.s3.us-east-2.amazonaws.com')
.defaultReplyHeaders({
'access-control-allow-headers': '*',
'access-control-allow-method': 'PUT',
'access-control-allow-origin': '*',
'access-control-expose-headers': 'ETag',
})
.put((uri) => uri.includes('test/upload/multitest.dat'))
.reply(200, '', { ETag: 'test' })
.persist()

const fileSize = 50 * MB
core.addFile({
source: 'jest',
name: 'multitest.dat',
type: 'application/octet-stream',
data: new File([new Uint8Array(fileSize)], {
type: 'application/octet-stream',
}),
})

await core.upload()
expect(createMultipartUpload).toHaveBeenCalled()
expect(signPartWithPause).toHaveBeenCalled()
expect(listParts).toHaveBeenCalled()
expect(completeMultipartUploadAfterPause).toHaveBeenCalled()
})
})
})

0 comments on commit 8f3ef8e

Please sign in to comment.