Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move remote file upload logic into companion-client #4573

Merged
merged 14 commits into from
Aug 24, 2023
227 changes: 27 additions & 200 deletions packages/@uppy/aws-s3-multipart/src/index.js
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
import UploaderPlugin from '@uppy/core/lib/UploaderPlugin.js'
import { Socket, Provider, RequestClient } from '@uppy/companion-client'
import BasePlugin from '@uppy/core/lib/BasePlugin.js'
import { Provider, RequestClient } from '@uppy/companion-client'
import EventManager from '@uppy/utils/lib/EventManager'
import emitSocketProgress from '@uppy/utils/lib/emitSocketProgress'
import getSocketHost from '@uppy/utils/lib/getSocketHost'
import { RateLimitedQueue } from '@uppy/utils/lib/RateLimitedQueue'
import { filterNonFailedFiles, filterFilesToEmitUploadStarted } from '@uppy/utils/lib/fileFilters'
import { createAbortError } from '@uppy/utils/lib/AbortController'
Expand Down Expand Up @@ -374,7 +372,7 @@ class HTTPCommunicationQueue {
}
}

export default class AwsS3Multipart extends UploaderPlugin {
export default class AwsS3Multipart extends BasePlugin {
static VERSION = packageJson.version

#companionCommunicationQueue
Expand Down Expand Up @@ -435,8 +433,6 @@ export default class AwsS3Multipart extends UploaderPlugin {
this.uploaders = Object.create(null)
this.uploaderEvents = Object.create(null)
this.uploaderSockets = Object.create(null)

this.setQueueRequestSocketToken(this.requests.wrapPromiseFunction(this.#requestSocketToken, { priority: -1 }))
}

[Symbol.for('uppy test: getClient')] () { return this.#client }
Expand Down Expand Up @@ -710,7 +706,7 @@ export default class AwsS3Multipart extends UploaderPlugin {
return this.uppy.getFile(file.id) || file
}

#uploadFile (file) {
#uploadLocalFile (file) {
return new Promise((resolve, reject) => {
const onProgress = (bytesUploaded, bytesTotal) => {
this.uppy.emit('upload-progress', file, {
Expand Down Expand Up @@ -770,191 +766,68 @@ export default class AwsS3Multipart extends UploaderPlugin {
})

this.uploaders[file.id] = upload
this.uploaderEvents[file.id] = new EventManager(this.uppy)
const eventManager = new EventManager(this.uppy)
this.uploaderEvents[file.id] = eventManager

this.onFileRemove(file.id, (removed) => {
eventManager.onFileRemove(file.id, (removed) => {
upload.abort()
this.resetUploaderReferences(file.id, { abort: true })
resolve(`upload ${removed.id} was removed`)
})

this.onCancelAll(file.id, ({ reason } = {}) => {
eventManager.onCancelAll(file.id, ({ reason } = {}) => {
if (reason === 'user') {
upload.abort()
this.resetUploaderReferences(file.id, { abort: true })
}
resolve(`upload ${file.id} was canceled`)
})

this.onFilePause(file.id, (isPaused) => {
eventManager.onFilePause(file.id, (isPaused) => {
if (isPaused) {
upload.pause()
} else {
upload.start()
}
})

this.onPauseAll(file.id, () => {
eventManager.onPauseAll(file.id, () => {
upload.pause()
})

this.onResumeAll(file.id, () => {
eventManager.onResumeAll(file.id, () => {
upload.start()
})

upload.start()
})
}

#requestSocketToken = async (file, options) => {
const Client = file.remote.providerOptions.provider ? Provider : RequestClient
const client = new Client(this.uppy, file.remote.providerOptions)
const opts = { ...this.opts }

if (file.tus) {
// Install file-specific upload overrides.
Object.assign(opts, file.tus)
}

if (file.remote.url == null) {
throw new Error('Cannot connect to an undefined URL')
}

const res = await client.post(file.remote.url, {
// eslint-disable-next-line class-methods-use-this
#getCompanionClientArgs (file) {
return {
...file.remote.body,
protocol: 's3-multipart',
size: file.data.size,
metadata: file.meta,
}, options)
return res.token
}

async connectToServerSocket (file) {
return new Promise((resolve, reject) => {
let queuedRequest

const token = file.serverToken
const host = getSocketHost(file.remote.companionUrl)
const socket = new Socket({ target: `${host}/api/${token}`, autoOpen: false })
this.uploaderSockets[file.id] = socket
this.uploaderEvents[file.id] = new EventManager(this.uppy)

this.onFileRemove(file.id, () => {
socket.send('cancel', {})
queuedRequest.abort()
this.resetUploaderReferences(file.id, { abort: true })
resolve(`upload ${file.id} was removed`)
})

this.onFilePause(file.id, (isPaused) => {
if (isPaused) {
// Remove this file from the queue so another file can start in its place.
socket.send('pause', {})
queuedRequest.abort()
} else {
// Resuming an upload should be queued, else you could pause and then
// resume a queued upload to make it skip the queue.
queuedRequest.abort()
queuedRequest = this.requests.run(() => {
socket.open()
socket.send('resume', {})
return () => {}
})
}
})

this.onPauseAll(file.id, () => {
// First send the message, then call .abort,
// just to make sure socket is not closed, which .abort used to do
socket.send('pause', {})
queuedRequest.abort()
})

this.onCancelAll(file.id, ({ reason } = {}) => {
if (reason === 'user') {
socket.send('cancel', {})
queuedRequest.abort()
this.resetUploaderReferences(file.id)
}
resolve(`upload ${file.id} was canceled`)
})

this.onResumeAll(file.id, () => {
queuedRequest.abort()
if (file.error) {
socket.send('pause', {})
}
queuedRequest = this.requests.run(() => {
socket.open()
socket.send('resume', {})

return () => {}
})
})

this.onRetry(file.id, () => {
// Only do the retry if the upload is actually in progress;
// else we could try to send these messages when the upload is still queued.
// We may need a better check for this since the socket may also be closed
// for other reasons, like network failures.
if (socket.isOpen) {
socket.send('pause', {})
socket.send('resume', {})
}
})

this.onRetryAll(file.id, () => {
if (socket.isOpen) {
socket.send('pause', {})
socket.send('resume', {})
}
})

socket.on('progress', (progressData) => emitSocketProgress(this, progressData, file))

socket.on('error', (errData) => {
this.uppy.emit('upload-error', file, new Error(errData.error))
this.resetUploaderReferences(file.id)
socket.close()
queuedRequest.done()
reject(new Error(errData.error))
})

socket.on('success', (data) => {
const uploadResp = {
uploadURL: data.url,
}

this.uppy.emit('upload-success', file, uploadResp)
this.resetUploaderReferences(file.id)
socket.close()
queuedRequest.done()
resolve()
})

queuedRequest = this.requests.run(() => {
if (file.isPaused) {
socket.send('pause', {})
} else {
socket.open()
}

return () => {}
})
})
}
}

#upload = async (fileIDs) => {
if (fileIDs.length === 0) return undefined

const files = this.uppy.getFilesByIds(fileIDs)

const filesFiltered = filterNonFailedFiles(files)
const filesToEmit = filterFilesToEmitUploadStarted(filesFiltered)

this.uppy.emit('upload-start', filesToEmit)

const promises = filesFiltered.map((file) => {
if (file.isRemote) {
// TODO: why do we need to do this? why not always one or the other?
const Client = file.remote.providerOptions.provider ? Provider : RequestClient
const getQueue = () => this.requests
const client = new Client(this.uppy, file.remote.providerOptions, getQueue)
this.#setResumableUploadsCapability(false)
const controller = new AbortController()

Expand All @@ -963,16 +836,20 @@ export default class AwsS3Multipart extends UploaderPlugin {
}
this.uppy.on('file-removed', removedHandler)

this.resetUploaderReferences(file.id)
const uploadPromise = this.uploadRemoteFile(file, { signal: controller.signal })
const uploadPromise = client.uploadRemoteFile(
file,
this.#getCompanionClientArgs(file),
{ signal: controller.signal },
)

this.requests.wrapSyncFunction(() => {
this.uppy.off('file-removed', removedHandler)
}, { priority: -1 })()

return uploadPromise
}
return this.#uploadFile(file)

return this.#uploadLocalFile(file)
})

const upload = await Promise.all(promises)
Expand All @@ -986,56 +863,6 @@ export default class AwsS3Multipart extends UploaderPlugin {
this.#client.setCompanionHeaders(this.opts.companionHeaders)
}

onFileRemove (fileID, cb) {
this.uploaderEvents[fileID].on('file-removed', (file) => {
if (fileID === file.id) cb(file.id)
})
}

onFilePause (fileID, cb) {
this.uploaderEvents[fileID].on('upload-pause', (targetFileID, isPaused) => {
if (fileID === targetFileID) {
cb(isPaused)
}
})
}

onRetry (fileID, cb) {
this.uploaderEvents[fileID].on('upload-retry', (targetFileID) => {
if (fileID === targetFileID) {
cb()
}
})
}

onRetryAll (fileID, cb) {
this.uploaderEvents[fileID].on('retry-all', () => {
if (!this.uppy.getFile(fileID)) return
cb()
})
}

onPauseAll (fileID, cb) {
this.uploaderEvents[fileID].on('pause-all', () => {
if (!this.uppy.getFile(fileID)) return
cb()
})
}

onCancelAll (fileID, eventHandler) {
this.uploaderEvents[fileID].on('cancel-all', (...args) => {
if (!this.uppy.getFile(fileID)) return
eventHandler(...args)
})
}

onResumeAll (fileID, cb) {
this.uploaderEvents[fileID].on('resume-all', () => {
if (!this.uppy.getFile(fileID)) return
cb()
})
}

#setResumableUploadsCapability = (boolean) => {
const { capabilities } = this.uppy.getState()
this.uppy.setState({
Expand Down
Loading