From bb8b66d9183e1ae582d3f3c37bb8d8c26ce67808 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ren=C3=A9e=20Kooi?= Date: Wed, 25 Sep 2019 15:21:17 +0200 Subject: [PATCH] Use more cancellation-friendly strategy for `limit: N` uploads (#1736) * xhr-upload: redo limit option using different strategy * xhr-upload: fix marking requests as done * Move duplicate createEventTracker definitions to @uppy/utils * tus: new cancellation for local uploads * tus: fix cancelling queued uploads * tus: fix wrong name * aws-s3-multipart: smol refactor * aws-s3-multipart: new cancellation style * aws-s3: new cancellation * utils: port limitPromises test to RateLimitedQueue * timing fix? * tus: new cancellation on websocket * xhr-upload: implement cancellation for remote uploads * aws-s3-multipart: port to new cancellation * utils: remove limitPromises * xhr-upload: remove pause/resume code * xhr-upload: clean up event listeners * xhr-upload: extract progress timer to separate class * Move ProgressTimeout class to utils * utils: update typings * Fix lint * tus: make pause/resume respect the rate limiting queue * tus: try to explain some of the tricky things at play in the Tus#upload method * aws-s3-multipart: add missing { abort: true } * aws-s3-multipart: make pause/resume respect the rate limiting queue * eslintrc.json - fixed eslint warnings for jsdoc * Revert "eslintrc.json - fixed eslint warnings for jsdoc" This reverts commit 50b24773ceaaafba6414437a3ebc0335681d65a4. * eslintrc.json - allow indentation in jsdoc comments * xhr-upload: fix promise return value * tus: fix promise return value * tus: add missing `return () => {}` * utils: typing export type fixes * tus: add more jsdoc * companion-client: add missing SocketOptions type * utils: fix more export typings * core,companion-client,tus: more typings tweaking * companion-client: test fix :weary: * companion-client: add type for Socket#open * tus: fix emit() call * add local cancellation fuzz....ish? test --- .eslintrc.json | 6 +- bin/endtoend-build-tests | 2 +- package-lock.json | 69 +++- package.json | 3 + packages/@uppy/aws-s3-multipart/src/index.js | 320 ++++++++++------- packages/@uppy/aws-s3/src/index.js | 55 +-- packages/@uppy/companion-client/src/Socket.js | 40 ++- .../@uppy/companion-client/src/Socket.test.js | 10 +- .../@uppy/companion-client/types/index.d.ts | 4 + packages/@uppy/core/types/index.d.ts | 4 + packages/@uppy/tus/src/index.js | 325 +++++++++++++----- packages/@uppy/utils/src/EventTracker.js | 21 ++ packages/@uppy/utils/src/ProgressTimeout.js | 37 ++ packages/@uppy/utils/src/RateLimitedQueue.js | 115 +++++++ .../@uppy/utils/src/RateLimitedQueue.test.js | 47 +++ packages/@uppy/utils/src/limitPromises.js | 36 -- .../@uppy/utils/src/limitPromises.test.js | 47 --- packages/@uppy/utils/types/index.d.ts | 148 +++++--- packages/@uppy/xhr-upload/src/index.js | 183 ++++++---- test/endtoend/chaos-monkey/index.html | 37 ++ test/endtoend/chaos-monkey/main.js | 39 +++ test/endtoend/chaos-monkey/test.js | 103 ++++++ test/endtoend/utils.js | 24 ++ test/endtoend/wdio.base.conf.js | 7 +- test/endtoend/wdio.remote.conf.js | 1 + tsconfig.json | 2 + 26 files changed, 1210 insertions(+), 475 deletions(-) create mode 100644 packages/@uppy/utils/src/EventTracker.js create mode 100644 packages/@uppy/utils/src/ProgressTimeout.js create mode 100644 packages/@uppy/utils/src/RateLimitedQueue.js create mode 100644 packages/@uppy/utils/src/RateLimitedQueue.test.js delete mode 100644 packages/@uppy/utils/src/limitPromises.js delete mode 100644 packages/@uppy/utils/src/limitPromises.test.js create mode 100644 test/endtoend/chaos-monkey/index.html create mode 100644 test/endtoend/chaos-monkey/main.js create mode 100644 test/endtoend/chaos-monkey/test.js diff --git a/.eslintrc.json b/.eslintrc.json index 8f4793c6c5..cec745d15a 100644 --- a/.eslintrc.json +++ b/.eslintrc.json @@ -15,17 +15,15 @@ "jsx-quotes": ["error", "prefer-double"], "compat/compat": ["error"], - // "no-unused-vars": ["warn", { "vars": "all", "args": "after-used", "ignoreRestSiblings": false }], - "jsdoc/check-alignment": ["warn"], "jsdoc/check-examples": ["warn"], - "jsdoc/check-indentation": ["warn"], "jsdoc/check-param-names": ["warn"], "jsdoc/check-syntax": ["warn"], "jsdoc/check-tag-names": ["warn"], "jsdoc/check-types": ["warn"], "jsdoc/newline-after-description": ["warn"], - "jsdoc/valid-types": ["warn"] + "jsdoc/valid-types": ["warn"], + "jsdoc/check-indentation": ["off"] }, "settings": { "react": { diff --git a/bin/endtoend-build-tests b/bin/endtoend-build-tests index ade56a23e8..2572431015 100755 --- a/bin/endtoend-build-tests +++ b/bin/endtoend-build-tests @@ -12,7 +12,7 @@ __base="$(basename ${__file} .sh)" __root="$(cd "$(dirname "${__dir}")" && pwd)" # Tests using a simple build setup. -tests="tus-drag-drop tus-dashboard i18n-drag-drop xhr-limit providers thumbnails transloadit url-plugin" +tests="chaos-monkey i18n-drag-drop providers thumbnails transloadit tus-drag-drop url-plugin xhr-limit" for t in $tests; do mkdir -p "${__root}/test/endtoend/$t/dist" diff --git a/package-lock.json b/package-lock.json index b636ec28e1..59d165f088 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1304,6 +1304,15 @@ "resolved": "https://registry.npmjs.org/@iarna/toml/-/toml-2.2.3.tgz", "integrity": "sha512-FmuxfCuolpLl0AnQ2NHSzoUKWEJDFl63qXjzdoWBVyFCXzMGm1spBzk7LeHNoVCiWCF7mRVms9e6jEV9+MoPbg==" }, + "@jamen/lorem": { + "version": "0.2.0", + "resolved": "https://registry.npmjs.org/@jamen/lorem/-/lorem-0.2.0.tgz", + "integrity": "sha512-qdJTjlAdAicZUcpJFFEl/tkoQ9t9q/FqhDndIwtRKRNoWG+IwU5zqxV+96RlBWtmzudjw7SYOod76h1FP7PSzg==", + "dev": true, + "requires": { + "bytes": "^3.0.0" + } + }, "@jest/console": { "version": "24.7.1", "resolved": "https://registry.npmjs.org/@jest/console/-/console-24.7.1.tgz", @@ -9325,6 +9334,43 @@ } } }, + "brake": { + "version": "1.0.1", + "resolved": "https://registry.npmjs.org/brake/-/brake-1.0.1.tgz", + "integrity": "sha1-V8SFTboeChjJnRX08qqEx3E0c9s=", + "dev": true, + "requires": { + "inherits": "~2.0.1", + "minimist": "~0.1.0", + "readable-stream": "~1.0.27-1" + }, + "dependencies": { + "minimist": { + "version": "0.1.0", + "resolved": "https://registry.npmjs.org/minimist/-/minimist-0.1.0.tgz", + "integrity": "sha1-md9lelJXTCHJBXSX33QnkLK0wN4=", + "dev": true + }, + "readable-stream": { + "version": "1.0.34", + "resolved": "https://registry.npmjs.org/readable-stream/-/readable-stream-1.0.34.tgz", + "integrity": "sha1-Elgg40vIQtLyqq+v5MKRbuMsFXw=", + "dev": true, + "requires": { + "core-util-is": "~1.0.0", + "inherits": "~2.0.1", + "isarray": "0.0.1", + "string_decoder": "~0.10.x" + } + }, + "string_decoder": { + "version": "0.10.31", + "resolved": "https://registry.npmjs.org/string_decoder/-/string_decoder-0.10.31.tgz", + "integrity": "sha1-YuIDvEF2bGwoyfyEMB2rHFMQ+pQ=", + "dev": true + } + } + }, "brfs": { "version": "1.6.1", "resolved": "https://registry.npmjs.org/brfs/-/brfs-1.6.1.tgz", @@ -15797,11 +15843,11 @@ "dev": true }, "follow-redirects": { - "version": "1.7.0", - "resolved": "https://registry.npmjs.org/follow-redirects/-/follow-redirects-1.7.0.tgz", - "integrity": "sha512-m/pZQy4Gj287eNy94nivy5wchN3Kp+Q5WgUPNy5lJSZ3sgkVKSYV/ZChMAQVIgx1SqfZ2zBZtPA2YlXIWxxJOQ==", + "version": "1.9.0", + "resolved": "https://registry.npmjs.org/follow-redirects/-/follow-redirects-1.9.0.tgz", + "integrity": "sha512-CRcPzsSIbXyVDl0QI01muNDu69S8trU4jArW9LpOt2WtC6LyUJetcIrmfHsRBx7/Jb6GHJUiuqyYxPooFfNt6A==", "requires": { - "debug": "^3.2.6" + "debug": "^3.0.0" }, "dependencies": { "debug": { @@ -18347,13 +18393,20 @@ } }, "http-proxy": { - "version": "1.17.0", - "resolved": "https://registry.npmjs.org/http-proxy/-/http-proxy-1.17.0.tgz", - "integrity": "sha512-Taqn+3nNvYRfJ3bGvKfBSRwy1v6eePlm3oc/aWVxZp57DQr5Eq3xhKJi7Z4hZpS8PC3H4qI+Yly5EmFacGuA/g==", + "version": "1.18.0", + "resolved": "https://registry.npmjs.org/http-proxy/-/http-proxy-1.18.0.tgz", + "integrity": "sha512-84I2iJM/n1d4Hdgc6y2+qY5mDaz2PUVjlg9znE9byl+q0uC3DeByqBGReQu5tpLK0TAqTIXScRUV+dg7+bUPpQ==", "requires": { - "eventemitter3": "^3.0.0", + "eventemitter3": "^4.0.0", "follow-redirects": "^1.0.0", "requires-port": "^1.0.0" + }, + "dependencies": { + "eventemitter3": { + "version": "4.0.0", + "resolved": "https://registry.npmjs.org/eventemitter3/-/eventemitter3-4.0.0.tgz", + "integrity": "sha512-qerSRB0p+UDEssxTtm6EDKcE7W4OaoisfIMl4CngyEhjpYglocpNg6UEqCvemdGhosAsg4sO2dXJOdyBifPGCg==" + } } }, "http-proxy-agent": { diff --git a/package.json b/package.json index 4fd57fe101..1a6f6bd859 100644 --- a/package.json +++ b/package.json @@ -78,6 +78,7 @@ "@babel/polyfill": "^7.4.4", "@babel/preset-env": "^7.4.5", "@babel/register": "^7.4.4", + "@jamen/lorem": "^0.2.0", "@octokit/rest": "^16.25.0", "@size-limit/preset-big-lib": "^2.1.1", "@types/aws-serverless-express": "^3.0.1", @@ -107,6 +108,7 @@ "babel-jest": "^24.8.0", "babel-plugin-inline-package-json": "^2.0.0", "babelify": "^10.0.0", + "brake": "^1.0.1", "browser-resolve": "^1.11.3", "browser-sync": "^2.26.5", "browserify": "^16.2.3", @@ -138,6 +140,7 @@ "glob": "^7.1.3", "globby": "^9.2.0", "gzip-size": "^5.0.0", + "http-proxy": "^1.18.0", "isomorphic-fetch": "2.2.1", "jest": "24.8.0", "json3": "^3.3.2", diff --git a/packages/@uppy/aws-s3-multipart/src/index.js b/packages/@uppy/aws-s3-multipart/src/index.js index 6232f15d33..5590cac7f1 100644 --- a/packages/@uppy/aws-s3-multipart/src/index.js +++ b/packages/@uppy/aws-s3-multipart/src/index.js @@ -1,29 +1,11 @@ const { Plugin } = require('@uppy/core') const { Socket, Provider, RequestClient } = require('@uppy/companion-client') +const EventTracker = require('@uppy/utils/lib/EventTracker') const emitSocketProgress = require('@uppy/utils/lib/emitSocketProgress') const getSocketHost = require('@uppy/utils/lib/getSocketHost') -const limitPromises = require('@uppy/utils/lib/limitPromises') +const RateLimitedQueue = require('@uppy/utils/lib/RateLimitedQueue') const Uploader = require('./MultipartUploader') -/** - * Create a wrapper around an event emitter with a `remove` method to remove - * all events that were added using the wrapped emitter. - */ -function createEventTracker (emitter) { - const events = [] - return { - on (event, fn) { - events.push([event, fn]) - return emitter.on(event, fn) - }, - remove () { - events.forEach(([event, fn]) => { - emitter.off(event, fn) - }) - } - } -} - function assertServerError (res) { if (res && res.error) { const error = new Error(res.message) @@ -53,15 +35,11 @@ module.exports = class AwsS3Multipart extends Plugin { completeMultipartUpload: this.completeMultipartUpload.bind(this) } - this.opts = Object.assign({}, defaultOptions, opts) + this.opts = { ...defaultOptions, ...opts } this.upload = this.upload.bind(this) - if (typeof this.opts.limit === 'number' && this.opts.limit !== 0) { - this.limitRequests = limitPromises(this.opts.limit) - } else { - this.limitRequests = (fn) => fn - } + this.requests = new RateLimitedQueue(this.opts.limit) this.uploaders = Object.create(null) this.uploaderEvents = Object.create(null) @@ -71,6 +49,9 @@ module.exports = class AwsS3Multipart extends Plugin { /** * Clean up all references for a file's upload: the MultipartUploader instance, * any events related to the file, and the Companion WebSocket connection. + * + * Set `opts.abort` to tell S3 that the multipart upload is cancelled and must be removed. + * This should be done when the user cancels the upload, not when the upload is completed or errored. */ resetUploaderReferences (fileID, opts = {}) { if (this.uploaders[fileID]) { @@ -147,102 +128,149 @@ module.exports = class AwsS3Multipart extends Plugin { uploadFile (file) { return new Promise((resolve, reject) => { - const upload = new Uploader(file.data, Object.assign({ - // .bind to pass the file object to each handler. - createMultipartUpload: this.limitRequests(this.opts.createMultipartUpload.bind(this, file)), - listParts: this.limitRequests(this.opts.listParts.bind(this, file)), - prepareUploadPart: this.opts.prepareUploadPart.bind(this, file), - completeMultipartUpload: this.limitRequests(this.opts.completeMultipartUpload.bind(this, file)), - abortMultipartUpload: this.limitRequests(this.opts.abortMultipartUpload.bind(this, file)), - - limit: this.opts.limit || 5, - onStart: (data) => { - const cFile = this.uppy.getFile(file.id) - this.uppy.setFileState(file.id, { - s3Multipart: Object.assign({}, cFile.s3Multipart, { - key: data.key, - uploadId: data.uploadId, - parts: [] - }) - }) - }, - onProgress: (bytesUploaded, bytesTotal) => { - this.uppy.emit('upload-progress', file, { - uploader: this, - bytesUploaded: bytesUploaded, - bytesTotal: bytesTotal - }) - }, - onError: (err) => { - this.uppy.log(err) - this.uppy.emit('upload-error', file, err) - err.message = `Failed because: ${err.message}` - - this.resetUploaderReferences(file.id) - reject(err) - }, - onSuccess: (result) => { - const uploadResp = { - uploadURL: result.location + const onStart = (data) => { + const cFile = this.uppy.getFile(file.id) + this.uppy.setFileState(file.id, { + s3Multipart: { + ...cFile.s3Multipart, + key: data.key, + uploadId: data.uploadId, + parts: [] } + }) + } - this.resetUploaderReferences(file.id) + const onProgress = (bytesUploaded, bytesTotal) => { + this.uppy.emit('upload-progress', file, { + uploader: this, + bytesUploaded: bytesUploaded, + bytesTotal: bytesTotal + }) + } - this.uppy.emit('upload-success', file, uploadResp) + const onError = (err) => { + this.uppy.log(err) + this.uppy.emit('upload-error', file, err) + err.message = `Failed because: ${err.message}` - if (result.location) { - this.uppy.log('Download ' + upload.file.name + ' from ' + result.location) - } + this.resetUploaderReferences(file.id) + reject(err) + } - resolve(upload) - }, - onPartComplete: (part) => { - // Store completed parts in state. - const cFile = this.uppy.getFile(file.id) - if (!cFile) { - return - } - this.uppy.setFileState(file.id, { - s3Multipart: Object.assign({}, cFile.s3Multipart, { - parts: [ - ...cFile.s3Multipart.parts, - part - ] - }) - }) + const onSuccess = (result) => { + const uploadResp = { + uploadURL: result.location + } + + this.resetUploaderReferences(file.id) + + this.uppy.emit('upload-success', file, uploadResp) + + if (result.location) { + this.uppy.log('Download ' + upload.file.name + ' from ' + result.location) + } + + resolve(upload) + } - this.uppy.emit('s3-multipart:part-uploaded', cFile, part) + const onPartComplete = (part) => { + // Store completed parts in state. + const cFile = this.uppy.getFile(file.id) + if (!cFile) { + return } - }, file.s3Multipart)) + this.uppy.setFileState(file.id, { + s3Multipart: { + ...cFile.s3Multipart, + parts: [ + ...cFile.s3Multipart.parts, + part + ] + } + }) + + this.uppy.emit('s3-multipart:part-uploaded', cFile, part) + } + + const upload = new Uploader(file.data, { + // .bind to pass the file object to each handler. + createMultipartUpload: this.requests.wrapPromiseFunction( + this.opts.createMultipartUpload.bind(this, file)), + listParts: this.requests.wrapPromiseFunction( + this.opts.listParts.bind(this, file)), + prepareUploadPart: this.opts.prepareUploadPart.bind(this, file), + completeMultipartUpload: this.requests.wrapPromiseFunction( + this.opts.completeMultipartUpload.bind(this, file)), + abortMultipartUpload: this.requests.wrapPromiseFunction( + this.opts.abortMultipartUpload.bind(this, file)), + + onStart, + onProgress, + onError, + onSuccess, + onPartComplete, + + limit: this.opts.limit || 5, + ...file.s3Multipart + }) this.uploaders[file.id] = upload - this.uploaderEvents[file.id] = createEventTracker(this.uppy) + this.uploaderEvents[file.id] = new EventTracker(this.uppy) + + let queuedRequest = this.requests.run(() => { + if (!file.isPaused) { + upload.start() + } + // Don't do anything here, the caller will take care of cancelling the upload itself + // using resetUploaderReferences(). This is because resetUploaderReferences() has to be + // called when this request is still in the queue, and has not been started yet, too. At + // that point this cancellation function is not going to be called. + return () => {} + }) this.onFileRemove(file.id, (removed) => { + queuedRequest.abort() this.resetUploaderReferences(file.id, { abort: true }) resolve(`upload ${removed.id} was removed`) }) + this.onCancelAll(file.id, () => { + queuedRequest.abort() + this.resetUploaderReferences(file.id, { abort: true }) + resolve(`upload ${file.id} was canceled`) + }) + this.onFilePause(file.id, (isPaused) => { if (isPaused) { + // Remove this file from the queue so another file can start in its place. + queuedRequest.abort() upload.pause() } else { - upload.start() + // Resuming an upload should be queued, else you could pause and then resume a queued upload to make it skip the queue. + queuedRequest.abort() + queuedRequest = this.requests.run(() => { + upload.start() + return () => {} + }) } }) this.onPauseAll(file.id, () => { + queuedRequest.abort() upload.pause() }) this.onResumeAll(file.id, () => { - upload.start() + queuedRequest.abort() + if (file.error) { + upload.abort() + } + queuedRequest = this.requests.run(() => { + upload.start() + return () => {} + }) }) - if (!file.isPaused) { - upload.start() - } - if (!file.isRestored) { this.uppy.emit('upload-started', file, upload) } @@ -252,24 +280,22 @@ module.exports = class AwsS3Multipart extends Plugin { uploadRemote (file) { this.resetUploaderReferences(file.id) - return new Promise((resolve, reject) => { - if (file.serverToken) { - return this.connectToServerSocket(file) - .then(() => resolve()) - .catch(reject) - } - - this.uppy.emit('upload-started', file) + this.uppy.emit('upload-started', file) + if (file.serverToken) { + return this.connectToServerSocket(file) + } + return new Promise((resolve, reject) => { const Client = file.remote.providerOptions.provider ? Provider : RequestClient const client = new Client(this.uppy, file.remote.providerOptions) client.post( file.remote.url, - Object.assign({}, file.remote.body, { + { + ...file.remote.body, protocol: 's3-multipart', size: file.data.size, metadata: file.meta - }) + } ).then((res) => { this.uppy.setFileState(file.id, { serverToken: res.token }) file = this.uppy.getFile(file.id) @@ -288,46 +314,78 @@ module.exports = class AwsS3Multipart extends Plugin { return new Promise((resolve, reject) => { const token = file.serverToken const host = getSocketHost(file.remote.companionUrl) - const socket = new Socket({ target: `${host}/api/${token}` }) + const socket = new Socket({ target: `${host}/api/${token}`, autoOpen: false }) this.uploaderSockets[socket] = socket - this.uploaderEvents[file.id] = createEventTracker(this.uppy) + this.uploaderEvents[file.id] = new EventTracker(this.uppy) this.onFileRemove(file.id, (removed) => { + queuedRequest.abort() + socket.send('pause', {}) this.resetUploaderReferences(file.id, { abort: true }) resolve(`upload ${file.id} was removed`) }) this.onFilePause(file.id, (isPaused) => { - socket.send(isPaused ? 'pause' : 'resume', {}) + if (isPaused) { + // Remove this file from the queue so another file can start in its place. + queuedRequest.abort() + socket.send('pause', {}) + } else { + // Resuming an upload should be queued, else you could pause and then resume a queued upload to make it skip the queue. + queuedRequest.abort() + queuedRequest = this.requests.run(() => { + socket.send('resume', {}) + return () => {} + }) + } }) - this.onPauseAll(file.id, () => socket.send('pause', {})) + this.onPauseAll(file.id, () => { + queuedRequest.abort() + socket.send('pause', {}) + }) + + this.onCancelAll(file.id, () => { + queuedRequest.abort() + socket.send('pause', {}) + this.resetUploaderReferences(file.id) + resolve(`upload ${file.id} was canceled`) + }) this.onResumeAll(file.id, () => { + queuedRequest.abort() if (file.error) { socket.send('pause', {}) } - socket.send('resume', {}) + queuedRequest = this.requests.run(() => { + socket.send('resume', {}) + }) }) this.onRetry(file.id, () => { - socket.send('pause', {}) - socket.send('resume', {}) + // Only do the retry if the upload is actually in progress; + // else we could try to send these messages when the upload is still queued. + // We may need a better check for this since the socket may also be closed + // for other reasons, like network failures. + if (socket.isOpen) { + socket.send('pause', {}) + socket.send('resume', {}) + } }) this.onRetryAll(file.id, () => { - socket.send('pause', {}) - socket.send('resume', {}) + if (socket.isOpen) { + socket.send('pause', {}) + socket.send('resume', {}) + } }) - if (file.isPaused) { - socket.send('pause', {}) - } - socket.on('progress', (progressData) => emitSocketProgress(this, progressData, file)) socket.on('error', (errData) => { this.uppy.emit('upload-error', file, new Error(errData.error)) + this.resetUploaderReferences(file.id) + queuedRequest.done() reject(new Error(errData.error)) }) @@ -337,8 +395,19 @@ module.exports = class AwsS3Multipart extends Plugin { } this.uppy.emit('upload-success', file, uploadResp) + this.resetUploaderReferences(file.id) + queuedRequest.done() resolve() }) + + let queuedRequest = this.requests.run(() => { + socket.open() + if (file.isPaused) { + socket.send('pause', {}) + } + + return () => {} + }) }) } @@ -393,6 +462,13 @@ module.exports = class AwsS3Multipart extends Plugin { }) } + onCancelAll (fileID, cb) { + this.uploaderEvents[fileID].on('cancel-all', () => { + if (!this.uppy.getFile(fileID)) return + cb() + }) + } + onResumeAll (fileID, cb) { this.uploaderEvents[fileID].on('resume-all', () => { if (!this.uppy.getFile(fileID)) return @@ -409,19 +485,15 @@ module.exports = class AwsS3Multipart extends Plugin { } }) this.uppy.addUploader(this.upload) - - this.uppy.on('cancel-all', () => { - this.uppy.getFiles().forEach((file) => { - this.resetUploaderReferences(file.id, { abort: true }) - }) - }) } uninstall () { + const { capabilities } = this.uppy.getState() this.uppy.setState({ - capabilities: Object.assign({}, this.uppy.getState().capabilities, { + capabilities: { + ...capabilities, resumableUploads: false - }) + } }) this.uppy.removeUploader(this.upload) } diff --git a/packages/@uppy/aws-s3/src/index.js b/packages/@uppy/aws-s3/src/index.js index 7a498ed798..f95c3654b7 100644 --- a/packages/@uppy/aws-s3/src/index.js +++ b/packages/@uppy/aws-s3/src/index.js @@ -1,7 +1,7 @@ const resolveUrl = require('resolve-url') const { Plugin } = require('@uppy/core') const Translator = require('@uppy/utils/lib/Translator') -const limitPromises = require('@uppy/utils/lib/limitPromises') +const RateLimitedQueue = require('@uppy/utils/lib/RateLimitedQueue') const { RequestClient } = require('@uppy/companion-client') const XHRUpload = require('@uppy/xhr-upload') @@ -59,11 +59,7 @@ module.exports = class AwsS3 extends Plugin { this.prepareUpload = this.prepareUpload.bind(this) - if (typeof this.opts.limit === 'number' && this.opts.limit !== 0) { - this.limitRequests = limitPromises(this.opts.limit) - } else { - this.limitRequests = (fn) => fn - } + this.requests = new RateLimitedQueue(this.opts.limit) } getUploadParameters (file) { @@ -102,25 +98,29 @@ module.exports = class AwsS3 extends Plugin { }) }) - const getUploadParameters = this.limitRequests(this.opts.getUploadParameters) + // Wrapping rate-limited opts.getUploadParameters in a Promise takes some boilerplate! + const getUploadParameters = this.requests.wrapPromiseFunction((file) => { + return this.opts.getUploadParameters(file) + }) return Promise.all( fileIDs.map((id) => { const file = this.uppy.getFile(id) - const paramsPromise = Promise.resolve() - .then(() => getUploadParameters(file)) - return paramsPromise.then((params) => { - return this.validateParameters(file, params) - }).then((params) => { - this.uppy.emit('preprocess-progress', file, { - mode: 'determinate', - message: this.i18n('preparingUpload'), - value: 1 + return getUploadParameters(file) + .then((params) => { + return this.validateParameters(file, params) + }) + .then((params) => { + this.uppy.emit('preprocess-progress', file, { + mode: 'determinate', + message: this.i18n('preparingUpload'), + value: 1 + }) + return params + }) + .catch((error) => { + this.uppy.emit('upload-error', file, error) }) - return params - }).catch((error) => { - this.uppy.emit('upload-error', file, error) - }) }) ).then((responses) => { const updatedFiles = {} @@ -147,16 +147,21 @@ module.exports = class AwsS3 extends Plugin { xhrOpts.headers = headers } - const updatedFile = Object.assign({}, file, { - meta: Object.assign({}, file.meta, fields), + const updatedFile = { + ...file, + meta: { ...file.meta, ...fields }, xhrUpload: xhrOpts - }) + } updatedFiles[id] = updatedFile }) + const { files } = this.uppy.getState() this.uppy.setState({ - files: Object.assign({}, this.uppy.getState().files, updatedFiles) + files: { + ...files, + ...updatedFiles + } }) fileIDs.forEach((id) => { @@ -175,7 +180,7 @@ module.exports = class AwsS3 extends Plugin { fieldName: 'file', responseUrlFieldName: 'location', timeout: this.opts.timeout, - limit: this.opts.limit, + __queue: this.requests, responseType: 'text', // Get the response data from a successful XMLHttpRequest instance. // `content` is the S3 response as a string. diff --git a/packages/@uppy/companion-client/src/Socket.js b/packages/@uppy/companion-client/src/Socket.js index 2d35f15b1d..435af084b4 100644 --- a/packages/@uppy/companion-client/src/Socket.js +++ b/packages/@uppy/companion-client/src/Socket.js @@ -2,18 +2,34 @@ const ee = require('namespace-emitter') module.exports = class UppySocket { constructor (opts) { - this.queued = [] + this.opts = opts + this._queued = [] this.isOpen = false - this.socket = new WebSocket(opts.target) this.emitter = ee() + this._handleMessage = this._handleMessage.bind(this) + + this.close = this.close.bind(this) + this.emit = this.emit.bind(this) + this.on = this.on.bind(this) + this.once = this.once.bind(this) + this.send = this.send.bind(this) + + if (!opts || opts.autoOpen !== false) { + this.open() + } + } + + open () { + this.socket = new WebSocket(this.opts.target) + this.socket.onopen = (e) => { this.isOpen = true - while (this.queued.length > 0 && this.isOpen) { - const first = this.queued[0] + while (this._queued.length > 0 && this.isOpen) { + const first = this._queued[0] this.send(first.action, first.payload) - this.queued = this.queued.slice(1) + this._queued = this._queued.slice(1) } } @@ -21,26 +37,20 @@ module.exports = class UppySocket { this.isOpen = false } - this._handleMessage = this._handleMessage.bind(this) - this.socket.onmessage = this._handleMessage - - this.close = this.close.bind(this) - this.emit = this.emit.bind(this) - this.on = this.on.bind(this) - this.once = this.once.bind(this) - this.send = this.send.bind(this) } close () { - return this.socket.close() + if (this.socket) { + this.socket.close() + } } send (action, payload) { // attach uuid if (!this.isOpen) { - this.queued.push({ action, payload }) + this._queued.push({ action, payload }) return } diff --git a/packages/@uppy/companion-client/src/Socket.test.js b/packages/@uppy/companion-client/src/Socket.test.js index ead60c888a..8fbda486de 100644 --- a/packages/@uppy/companion-client/src/Socket.test.js +++ b/packages/@uppy/companion-client/src/Socket.test.js @@ -66,7 +66,7 @@ describe('Socket', () => { const uppySocket = new UppySocket({ target: 'foo' }) uppySocket.send('bar', 'boo') - expect(uppySocket.queued).toEqual([{ action: 'bar', payload: 'boo' }]) + expect(uppySocket._queued).toEqual([{ action: 'bar', payload: 'boo' }]) expect(webSocketSendSpy.mock.calls.length).toEqual(0) }) @@ -76,7 +76,7 @@ describe('Socket', () => { uppySocket.send('bar', 'boo') uppySocket.send('moo', 'baa') - expect(uppySocket.queued).toEqual([ + expect(uppySocket._queued).toEqual([ { action: 'bar', payload: 'boo' }, { action: 'moo', payload: 'baa' } ]) @@ -84,7 +84,7 @@ describe('Socket', () => { webSocketInstance.triggerOpen() - expect(uppySocket.queued).toEqual([]) + expect(uppySocket._queued).toEqual([]) expect(webSocketSendSpy.mock.calls.length).toEqual(2) expect(webSocketSendSpy.mock.calls[0]).toEqual([ JSON.stringify({ action: 'bar', payload: 'boo' }) @@ -99,11 +99,11 @@ describe('Socket', () => { const webSocketInstance = uppySocket.socket webSocketInstance.triggerOpen() uppySocket.send('bar', 'boo') - expect(uppySocket.queued).toEqual([]) + expect(uppySocket._queued).toEqual([]) webSocketInstance.triggerClose() uppySocket.send('bar', 'boo') - expect(uppySocket.queued).toEqual([{ action: 'bar', payload: 'boo' }]) + expect(uppySocket._queued).toEqual([{ action: 'bar', payload: 'boo' }]) }) it('should close the websocket when it is force closed', () => { diff --git a/packages/@uppy/companion-client/types/index.d.ts b/packages/@uppy/companion-client/types/index.d.ts index c24f718a77..11059996ee 100644 --- a/packages/@uppy/companion-client/types/index.d.ts +++ b/packages/@uppy/companion-client/types/index.d.ts @@ -28,10 +28,14 @@ export class Provider extends RequestClient { export interface SocketOptions { target: string; + autoOpen?: boolean; } export class Socket { + isOpen: boolean; + constructor (opts: SocketOptions); + open (): void; close (): void; send (action: string, payload: any): void; on (action: string, handler: (param: any) => void): void; diff --git a/packages/@uppy/core/types/index.d.ts b/packages/@uppy/core/types/index.d.ts index 943b1201ad..207ea7425d 100644 --- a/packages/@uppy/core/types/index.d.ts +++ b/packages/@uppy/core/types/index.d.ts @@ -105,6 +105,10 @@ declare module Uppy { on(event: 'complete', callback: (result: UploadResult) => void): Uppy; on(event: string, callback: (...args: any[]) => void): Uppy; off(event: string, callback: any): Uppy; + /** + * For use by plugins only! + */ + emit(event: string, ...args: any[]): void; updateAll(state: object): void; setState(patch: object): void; getState = {}>(): State; diff --git a/packages/@uppy/tus/src/index.js b/packages/@uppy/tus/src/index.js index f2eb42e5fe..d197a70608 100644 --- a/packages/@uppy/tus/src/index.js +++ b/packages/@uppy/tus/src/index.js @@ -4,11 +4,21 @@ const { Provider, RequestClient, Socket } = require('@uppy/companion-client') const emitSocketProgress = require('@uppy/utils/lib/emitSocketProgress') const getSocketHost = require('@uppy/utils/lib/getSocketHost') const settle = require('@uppy/utils/lib/settle') -const limitPromises = require('@uppy/utils/lib/limitPromises') +const EventTracker = require('@uppy/utils/lib/EventTracker') +const RateLimitedQueue = require('@uppy/utils/lib/RateLimitedQueue') const getFingerprint = require('./getFingerprint') -// Extracted from https://github.com/tus/tus-js-client/blob/master/lib/upload.js#L13 -// excepted we removed 'fingerprint' key to avoid adding more dependencies +/** @typedef {import('..').TusOptions} TusOptions */ +/** @typedef {import('@uppy/core').Uppy} Uppy */ +/** @typedef {import('@uppy/core').UppyFile} UppyFile */ +/** @typedef {import('@uppy/core').FailedUppyFile<{}>} FailedUppyFile */ + +/** + * Extracted from https://github.com/tus/tus-js-client/blob/master/lib/upload.js#L13 + * excepted we removed 'fingerprint' key to avoid adding more dependencies + * + * @type {TusOptions} + */ const tusDefaultOptions = { endpoint: '', resume: true, @@ -25,32 +35,16 @@ const tusDefaultOptions = { retryDelays: null } -/** - * Create a wrapper around an event emitter with a `remove` method to remove - * all events that were added using the wrapped emitter. - */ -function createEventTracker (emitter) { - const events = [] - return { - on (event, fn) { - events.push([event, fn]) - return emitter.on(event, fn) - }, - remove () { - events.forEach(([event, fn]) => { - emitter.off(event, fn) - }) - } - } -} - /** * Tus resumable file uploader - * */ module.exports = class Tus extends Plugin { static VERSION = require('../package.json').version + /** + * @param {Uppy} uppy + * @param {TusOptions} opts + */ constructor (uppy, opts) { super(uppy, opts) this.type = 'uploader' @@ -67,14 +61,14 @@ module.exports = class Tus extends Plugin { } // merge default options with the ones set by user + /** @type {import("..").TusOptions} */ this.opts = Object.assign({}, defaultOptions, opts) - // Simultaneous upload limiting is shared across all uploads with this plugin. - if (typeof this.opts.limit === 'number' && this.opts.limit !== 0) { - this.limitUploads = limitPromises(this.opts.limit) - } else { - this.limitUploads = (fn) => fn - } + /** + * Simultaneous upload limiting is shared across all uploads with this plugin. + * @type {RateLimitedQueue} + */ + this.requests = new RateLimitedQueue(this.opts.limit) this.uploaders = Object.create(null) this.uploaderEvents = Object.create(null) @@ -101,6 +95,8 @@ module.exports = class Tus extends Plugin { /** * Clean up all references for a file's upload: the tus.Upload instance, * any events related to the file, and the Companion WebSocket connection. + * + * @param {string} fileID */ resetUploaderReferences (fileID) { if (this.uploaders[fileID]) { @@ -118,18 +114,43 @@ module.exports = class Tus extends Plugin { } /** - * Create a new Tus upload + * Create a new Tus upload. * - * @param {object} file for use with upload - * @param {integer} current file in a queue - * @param {integer} total number of files in a queue - * @returns {Promise} + * A lot can happen during an upload, so this is quite hard to follow! + * - First, the upload is started. If the file was already paused by the time the upload starts, nothing should happen. + * If the `limit` option is used, the upload must be queued onto the `this.requests` queue. + * When an upload starts, we store the tus.Upload instance, and an EventTracker instance that manages the event listeners + * for pausing, cancellation, removal, etc. + * - While the upload is in progress, it may be paused or cancelled. + * Pausing aborts the underlying tus.Upload, and removes the upload from the `this.requests` queue. All other state is + * maintained. + * Cancelling removes the upload from the `this.requests` queue, and completely aborts the upload--the tus.Upload instance + * is aborted and discarded, the EventTracker instance is destroyed (removing all listeners). + * Resuming the upload uses the `this.requests` queue as well, to prevent selectively pausing and resuming uploads from + * bypassing the limit. + * - After completing an upload, the tus.Upload and EventTracker instances are cleaned up, and the upload is marked as done + * in the `this.requests` queue. + * - When an upload completed with an error, the same happens as on successful completion, but the `upload()` promise is rejected. + * + * When working on this function, keep in mind: + * - When an upload is completed or cancelled for any reason, the tus.Upload and EventTracker instances need to be cleaned up using this.resetUploaderReferences(). + * - When an upload is cancelled or paused, for any reason, it needs to be removed from the `this.requests` queue using `queuedRequest.abort()`. + * - When an upload is completed for any reason, including errors, it needs to be marked as such using `queuedRequest.done()`. + * - When an upload is started or resumed, it needs to go through the `this.requests` queue. The `queuedRequest` variable must be updated so the other uses of it are valid. + * - Before replacing the `queuedRequest` variable, the previous `queuedRequest` must be aborted, else it will keep taking up a spot in the queue. + * + * @param {UppyFile} file for use with upload + * @param {number} current file in a queue + * @param {number} total number of files in a queue + * @returns {Promise} */ upload (file, current, total) { this.resetUploaderReferences(file.id) // Create a new tus upload return new Promise((resolve, reject) => { + this.uppy.emit('upload-started', file) + const optsTus = Object.assign( {}, tusDefaultOptions, @@ -150,6 +171,7 @@ module.exports = class Tus extends Plugin { err.message = `Failed because: ${err.message}` this.resetUploaderReferences(file.id) + queuedRequest.done() reject(err) } @@ -174,6 +196,7 @@ module.exports = class Tus extends Plugin { } this.resetUploaderReferences(file.id) + queuedRequest.done() resolve(upload) } @@ -203,78 +226,106 @@ module.exports = class Tus extends Plugin { const upload = new tus.Upload(file.data, optsTus) this.uploaders[file.id] = upload - this.uploaderEvents[file.id] = createEventTracker(this.uppy) + this.uploaderEvents[file.id] = new EventTracker(this.uppy) + + let queuedRequest = this.requests.run(() => { + if (!file.isPaused) { + upload.start() + } + // Don't do anything here, the caller will take care of cancelling the upload itself + // using resetUploaderReferences(). This is because resetUploaderReferences() has to be + // called when this request is still in the queue, and has not been started yet, too. At + // that point this cancellation function is not going to be called. + // Also, we need to remove the request from the queue _without_ destroying everything + // related to this upload to handle pauses. + return () => {} + }) this.onFileRemove(file.id, (targetFileID) => { + queuedRequest.abort() this.resetUploaderReferences(file.id) resolve(`upload ${targetFileID} was removed`) }) this.onPause(file.id, (isPaused) => { if (isPaused) { + // Remove this file from the queue so another file can start in its place. + queuedRequest.abort() upload.abort() } else { - upload.start() + // Resuming an upload should be queued, else you could pause and then resume a queued upload to make it skip the queue. + queuedRequest.abort() + queuedRequest = this.requests.run(() => { + upload.start() + return () => {} + }) } }) this.onPauseAll(file.id, () => { + queuedRequest.abort() upload.abort() }) this.onCancelAll(file.id, () => { + queuedRequest.abort() this.resetUploaderReferences(file.id) resolve(`upload ${file.id} was canceled`) }) this.onResumeAll(file.id, () => { + queuedRequest.abort() if (file.error) { upload.abort() } - upload.start() + queuedRequest = this.requests.run(() => { + upload.start() + return () => {} + }) }) - - if (!file.isPaused) { - upload.start() - } + }).catch((err) => { + this.uppy.emit('upload-error', file, err) + throw err }) } + /** + * @param {UppyFile} file for use with upload + * @param {number} current file in a queue + * @param {number} total number of files in a queue + * @return {Promise} + */ uploadRemote (file, current, total) { this.resetUploaderReferences(file.id) - const opts = Object.assign( - {}, - this.opts, + const opts = { ...this.opts } + if (file.tus) { // Install file-specific upload overrides. - file.tus || {} - ) + Object.assign(opts, file.tus) + } - return new Promise((resolve, reject) => { - this.uppy.log(file.remote.url) - if (file.serverToken) { - return this.connectToServerSocket(file) - .then(() => resolve()) - .catch(reject) - } + this.uppy.emit('upload-started', file) + this.uppy.log(file.remote.url) - this.uppy.emit('upload-started', file) + if (file.serverToken) { + return this.connectToServerSocket(file) + } + + return new Promise((resolve, reject) => { const Client = file.remote.providerOptions.provider ? Provider : RequestClient const client = new Client(this.uppy, file.remote.providerOptions) - client.post( - file.remote.url, - Object.assign({}, file.remote.body, { - endpoint: opts.endpoint, - uploadUrl: opts.uploadUrl, - protocol: 'tus', - size: file.data.size, - metadata: file.meta - }) - ).then((res) => { + + // !! cancellation is NOT supported at this stage yet + client.post(file.remote.url, { + ...file.remote.body, + endpoint: opts.endpoint, + uploadUrl: opts.uploadUrl, + protocol: 'tus', + size: file.data.size, + metadata: file.meta + }).then((res) => { this.uppy.setFileState(file.id, { serverToken: res.token }) file = this.uppy.getFile(file.id) - return file - }).then((file) => { return this.connectToServerSocket(file) }).then(() => { resolve() @@ -284,48 +335,85 @@ module.exports = class Tus extends Plugin { }) } + /** + * See the comment on the upload() method. + * + * Additionally, when an upload is removed, completed, or cancelled, we need to close the WebSocket connection. This is handled by the resetUploaderReferences() function, so the same guidelines apply as in upload(). + * + * @param {UppyFile} file + */ connectToServerSocket (file) { return new Promise((resolve, reject) => { const token = file.serverToken const host = getSocketHost(file.remote.companionUrl) - const socket = new Socket({ target: `${host}/api/${token}` }) + const socket = new Socket({ target: `${host}/api/${token}`, autoOpen: false }) this.uploaderSockets[file.id] = socket - this.uploaderEvents[file.id] = createEventTracker(this.uppy) + this.uploaderEvents[file.id] = new EventTracker(this.uppy) this.onFileRemove(file.id, () => { + queuedRequest.abort() socket.send('pause', {}) + this.resetUploaderReferences(file.id) resolve(`upload ${file.id} was removed`) }) this.onPause(file.id, (isPaused) => { - isPaused ? socket.send('pause', {}) : socket.send('resume', {}) + if (isPaused) { + // Remove this file from the queue so another file can start in its place. + queuedRequest.abort() + socket.send('pause', {}) + } else { + // Resuming an upload should be queued, else you could pause and then resume a queued upload to make it skip the queue. + queuedRequest.abort() + queuedRequest = this.requests.run(() => { + socket.send('resume', {}) + return () => {} + }) + } }) - this.onPauseAll(file.id, () => socket.send('pause', {})) + this.onPauseAll(file.id, () => { + queuedRequest.abort() + socket.send('pause', {}) + }) - this.onCancelAll(file.id, () => socket.send('pause', {})) + this.onCancelAll(file.id, () => { + queuedRequest.abort() + socket.send('pause', {}) + this.resetUploaderReferences(file.id) + resolve(`upload ${file.id} was canceled`) + }) this.onResumeAll(file.id, () => { + queuedRequest.abort() if (file.error) { socket.send('pause', {}) } - socket.send('resume', {}) + queuedRequest = this.requests.run(() => { + socket.send('resume', {}) + return () => {} + }) }) this.onRetry(file.id, () => { - socket.send('pause', {}) - socket.send('resume', {}) + // Only do the retry if the upload is actually in progress; + // else we could try to send these messages when the upload is still queued. + // We may need a better check for this since the socket may also be closed + // for other reasons, like network failures. + if (socket.isOpen) { + socket.send('pause', {}) + socket.send('resume', {}) + } }) this.onRetryAll(file.id, () => { - socket.send('pause', {}) - socket.send('resume', {}) + // See the comment in the onRetry() call + if (socket.isOpen) { + socket.send('pause', {}) + socket.send('resume', {}) + } }) - if (file.isPaused) { - socket.send('pause', {}) - } - socket.on('progress', (progressData) => emitSocketProgress(this, progressData, file)) socket.on('error', (errData) => { @@ -340,9 +428,12 @@ module.exports = class Tus extends Plugin { this.uppy.setFileState(file.id, { serverToken: null }) + } else { + socket.close() } this.uppy.emit('upload-error', file, error) + queuedRequest.done() reject(error) }) @@ -353,14 +444,33 @@ module.exports = class Tus extends Plugin { this.uppy.emit('upload-success', file, uploadResp) this.resetUploaderReferences(file.id) + queuedRequest.done() resolve() }) + + let queuedRequest = this.requests.run(() => { + socket.open() + if (file.isPaused) { + socket.send('pause', {}) + } + + // Don't do anything here, the caller will take care of cancelling the upload itself + // using resetUploaderReferences(). This is because resetUploaderReferences() has to be + // called when this request is still in the queue, and has not been started yet, too. At + // that point this cancellation function is not going to be called. + // Also, we need to remove the request from the queue _without_ destroying everything + // related to this upload to handle pauses. + return () => {} + }) }) } /** * Store the uploadUrl on the file options, so that when Golden Retriever * restores state, we will continue uploading to the correct URL. + * + * @param {UppyFile} file + * @param {string} uploadURL */ onReceiveUploadUrl (file, uploadURL) { const currentFile = this.uppy.getFile(file.id) @@ -376,12 +486,20 @@ module.exports = class Tus extends Plugin { } } + /** + * @param {string} fileID + * @param {function(string): void} cb + */ onFileRemove (fileID, cb) { this.uploaderEvents[fileID].on('file-removed', (file) => { if (fileID === file.id) cb(file.id) }) } + /** + * @param {string} fileID + * @param {function(boolean): void} cb + */ onPause (fileID, cb) { this.uploaderEvents[fileID].on('upload-pause', (targetFileID, isPaused) => { if (fileID === targetFileID) { @@ -391,6 +509,10 @@ module.exports = class Tus extends Plugin { }) } + /** + * @param {string} fileID + * @param {function(): void} cb + */ onRetry (fileID, cb) { this.uploaderEvents[fileID].on('upload-retry', (targetFileID) => { if (fileID === targetFileID) { @@ -399,6 +521,10 @@ module.exports = class Tus extends Plugin { }) } + /** + * @param {string} fileID + * @param {function(): void} cb + */ onRetryAll (fileID, cb) { this.uploaderEvents[fileID].on('retry-all', (filesToRetry) => { if (!this.uppy.getFile(fileID)) return @@ -406,6 +532,10 @@ module.exports = class Tus extends Plugin { }) } + /** + * @param {string} fileID + * @param {function(): void} cb + */ onPauseAll (fileID, cb) { this.uploaderEvents[fileID].on('pause-all', () => { if (!this.uppy.getFile(fileID)) return @@ -413,6 +543,10 @@ module.exports = class Tus extends Plugin { }) } + /** + * @param {string} fileID + * @param {function(): void} cb + */ onCancelAll (fileID, cb) { this.uploaderEvents[fileID].on('cancel-all', () => { if (!this.uppy.getFile(fileID)) return @@ -420,6 +554,10 @@ module.exports = class Tus extends Plugin { }) } + /** + * @param {string} fileID + * @param {function(): void} cb + */ onResumeAll (fileID, cb) { this.uploaderEvents[fileID].on('resume-all', () => { if (!this.uppy.getFile(fileID)) return @@ -427,32 +565,29 @@ module.exports = class Tus extends Plugin { }) } + /** + * @param {(UppyFile | FailedUppyFile)[]} files + */ uploadFiles (files) { - const actions = files.map((file, i) => { - const current = parseInt(i, 10) + 1 + const promises = files.map((file, i) => { + const current = i + 1 const total = files.length - if (file.error) { - return () => Promise.reject(new Error(file.error)) + if ('error' in file && file.error) { + return Promise.reject(new Error(file.error)) } else if (file.isRemote) { - // We emit upload-started here, so that it's also emitted for files - // that have to wait due to the `limit` option. - this.uppy.emit('upload-started', file) - return this.uploadRemote.bind(this, file, current, total) + return this.uploadRemote(file, current, total) } else { - this.uppy.emit('upload-started', file) - return this.upload.bind(this, file, current, total) + return this.upload(file, current, total) } }) - const promises = actions.map((action) => { - const limitedAction = this.limitUploads(action) - return limitedAction() - }) - return settle(promises) } + /** + * @param {string[]} fileIDs + */ handleUpload (fileIDs) { if (fileIDs.length === 0) { this.uppy.log('[Tus] No files to upload') diff --git a/packages/@uppy/utils/src/EventTracker.js b/packages/@uppy/utils/src/EventTracker.js new file mode 100644 index 0000000000..e8a48c448d --- /dev/null +++ b/packages/@uppy/utils/src/EventTracker.js @@ -0,0 +1,21 @@ +/** + * Create a wrapper around an event emitter with a `remove` method to remove + * all events that were added using the wrapped emitter. + */ +module.exports = class EventTracker { + constructor (emitter) { + this._events = [] + this._emitter = emitter + } + + on (event, fn) { + this._events.push([event, fn]) + return this._emitter.on(event, fn) + } + + remove () { + this._events.forEach(([event, fn]) => { + this._emitter.off(event, fn) + }) + } +} diff --git a/packages/@uppy/utils/src/ProgressTimeout.js b/packages/@uppy/utils/src/ProgressTimeout.js new file mode 100644 index 0000000000..e1d670f9e3 --- /dev/null +++ b/packages/@uppy/utils/src/ProgressTimeout.js @@ -0,0 +1,37 @@ +/** + * Helper to abort upload requests if there has not been any progress for `timeout` ms. + * Create an instance using `timer = new ProgressTimeout(10000, onTimeout)` + * Call `timer.progress()` to signal that there has been progress of any kind. + * Call `timer.done()` when the upload has completed. + */ +class ProgressTimeout { + constructor (timeout, timeoutHandler) { + this._timeout = timeout + this._onTimedOut = timeoutHandler + this._isDone = false + this._aliveTimer = null + this._onTimedOut = this._onTimedOut.bind(this) + } + + progress () { + // Some browsers fire another progress event when the upload is + // cancelled, so we have to ignore progress after the timer was + // told to stop. + if (this._isDone) return + + if (this._timeout > 0) { + if (this._aliveTimer) clearTimeout(this._aliveTimer) + this._aliveTimer = setTimeout(this._onTimedOut, this._timeout) + } + } + + done () { + if (this._aliveTimer) { + clearTimeout(this._aliveTimer) + this._aliveTimer = null + } + this._isDone = true + } +} + +module.exports = ProgressTimeout diff --git a/packages/@uppy/utils/src/RateLimitedQueue.js b/packages/@uppy/utils/src/RateLimitedQueue.js new file mode 100644 index 0000000000..395f02bf71 --- /dev/null +++ b/packages/@uppy/utils/src/RateLimitedQueue.js @@ -0,0 +1,115 @@ +module.exports = class RateLimitedQueue { + constructor (limit) { + if (typeof limit !== 'number' || limit === 0) { + this.limit = Infinity + } else { + this.limit = limit + } + + this.activeRequests = 0 + this.queuedHandlers = [] + } + + _call (fn) { + this.activeRequests += 1 + + let done = false + + let cancelActive + try { + cancelActive = fn() + } catch (err) { + this.activeRequests -= 1 + throw err + } + + return { + abort: () => { + if (done) return + done = true + this.activeRequests -= 1 + cancelActive() + this._next() + }, + + done: () => { + if (done) return + done = true + this.activeRequests -= 1 + this._next() + } + } + } + + _next () { + if (this.activeRequests >= this.limit) { + return + } + if (this.queuedHandlers.length === 0) { + return + } + + // Dispatch the next request, and update the abort/done handlers + // so that cancelling it does the Right Thing (and doesn't just try + // to dequeue an already-running request). + const next = this.queuedHandlers.shift() + const handler = this._call(next.fn) + next.abort = handler.abort + next.done = handler.done + } + + _queue (fn) { + const handler = { + fn, + abort: () => { + this._dequeue(handler) + }, + done: () => { + throw new Error('Cannot mark a queued request as done: this indicates a bug') + } + } + this.queuedHandlers.push(handler) + return handler + } + + _dequeue (handler) { + const index = this.queuedHandlers.indexOf(handler) + if (index !== -1) { + this.queuedHandlers.splice(index, 1) + } + } + + run (fn) { + if (this.activeRequests < this.limit) { + return this._call(fn) + } + return this._queue(fn) + } + + wrapPromiseFunction (fn) { + return (...args) => new Promise((resolve, reject) => { + const queuedRequest = this.run(() => { + let cancelError + fn(...args).then((result) => { + if (cancelError) { + reject(cancelError) + } else { + queuedRequest.done() + resolve(result) + } + }, (err) => { + if (cancelError) { + reject(cancelError) + } else { + queuedRequest.done() + reject(err) + } + }) + + return () => { + cancelError = new Error('Cancelled') + } + }) + }) + } +} diff --git a/packages/@uppy/utils/src/RateLimitedQueue.test.js b/packages/@uppy/utils/src/RateLimitedQueue.test.js new file mode 100644 index 0000000000..a32e8cb124 --- /dev/null +++ b/packages/@uppy/utils/src/RateLimitedQueue.test.js @@ -0,0 +1,47 @@ +const RateLimitedQueue = require('./RateLimitedQueue') + +const delay = ms => new Promise(resolve => setTimeout(resolve, ms)) + +describe('RateLimitedQueue', () => { + let pending = 0 + function fn () { + pending++ + return delay(15).then(() => pending--) + } + + it('should run at most N promises at the same time', async () => { + const queue = new RateLimitedQueue(4) + const fn2 = queue.wrapPromiseFunction(fn) + + const result = Promise.all([ + fn2(), fn2(), fn2(), fn2(), + fn2(), fn2(), fn2(), fn2(), + fn2(), fn2() + ]) + + expect(pending).toBe(4) + + await delay(10) + expect(pending).toBe(4) + + await result + expect(pending).toBe(0) + }) + + it('should accept Infinity as limit', () => { + const queue = new RateLimitedQueue(Infinity) + const fn2 = queue.wrapPromiseFunction(fn) + + const result = Promise.all([ + fn2(), fn2(), fn2(), fn2(), + fn2(), fn2(), fn2(), fn2(), + fn2(), fn2() + ]) + + expect(pending).toBe(10) + + return result.then(() => { + expect(pending).toBe(0) + }) + }) +}) diff --git a/packages/@uppy/utils/src/limitPromises.js b/packages/@uppy/utils/src/limitPromises.js deleted file mode 100644 index 316111285f..0000000000 --- a/packages/@uppy/utils/src/limitPromises.js +++ /dev/null @@ -1,36 +0,0 @@ -/** - * Limit the amount of simultaneously pending Promises. - * Returns a function that, when passed a function `fn`, - * will make sure that at most `limit` calls to `fn` are pending. - * - * @param {number} limit - * @returns {function()} - */ -module.exports = function limitPromises (limit) { - let pending = 0 - const queue = [] - return (fn) => { - return (...args) => { - const call = () => { - pending++ - const promise = fn(...args) - promise.then(onfinish, onfinish) - return promise - } - - if (pending >= limit) { - return new Promise((resolve, reject) => { - queue.push(() => { - call().then(resolve, reject) - }) - }) - } - return call() - } - } - function onfinish () { - pending-- - const next = queue.shift() - if (next) next() - } -} diff --git a/packages/@uppy/utils/src/limitPromises.test.js b/packages/@uppy/utils/src/limitPromises.test.js deleted file mode 100644 index 72e94f55d0..0000000000 --- a/packages/@uppy/utils/src/limitPromises.test.js +++ /dev/null @@ -1,47 +0,0 @@ -const limitPromises = require('./limitPromises') - -describe('limitPromises', () => { - let pending = 0 - function fn () { - pending++ - return new Promise((resolve) => setTimeout(resolve, 10)) - .then(() => pending--) - } - - it('should run at most N promises at the same time', () => { - const limit = limitPromises(4) - const fn2 = limit(fn) - - const result = Promise.all([ - fn2(), fn2(), fn2(), fn2(), - fn2(), fn2(), fn2(), fn2(), - fn2(), fn2() - ]) - - expect(pending).toBe(4) - setTimeout(() => { - expect(pending).toBe(4) - }, 10) - - return result.then(() => { - expect(pending).toBe(0) - }) - }) - - it('should accept Infinity as limit', () => { - const limit = limitPromises(Infinity) - const fn2 = limit(fn) - - const result = Promise.all([ - fn2(), fn2(), fn2(), fn2(), - fn2(), fn2(), fn2(), fn2(), - fn2(), fn2() - ]) - - expect(pending).toBe(10) - - return result.then(() => { - expect(pending).toBe(0) - }) - }) -}) diff --git a/packages/@uppy/utils/types/index.d.ts b/packages/@uppy/utils/types/index.d.ts index 7f270ae56d..55564e41fe 100644 --- a/packages/@uppy/utils/types/index.d.ts +++ b/packages/@uppy/utils/types/index.d.ts @@ -1,28 +1,81 @@ declare module '@uppy/utils/lib/Translator' { - export interface TranslatorOptions { - locale: { - strings: { - [key: string]: string | { [plural: number]: string }; + namespace Translator { + export interface TranslatorOptions { + locale: { + strings: { + [key: string]: string | { [plural: number]: string }; + }; + pluralize: (n: number) => number; }; - pluralize: (n: number) => number; + } + } + + class Translator { + constructor(opts: Translator.TranslatorOptions); + } + + export = Translator +} + +declare module '@uppy/utils/lib/EventTracker' { + namespace EventTracker { + export type EventHandler = (...args: any[]) => void; + export interface Emitter { + on: (event: string, handler: EventHandler) => void; + off: (event: string, handler: EventHandler) => void; + } + } + + class EventTracker { + constructor(emitter: EventTracker.Emitter); + on(event: string, handler: EventTracker.EventHandler): void; + remove(): void; + } + + export = EventTracker +} + +declare module '@uppy/utils/lib/ProgressTimeout' { + class ProgressTimeout { + constructor(timeout: number, timeoutHandler: () => void); + progress(): void; + done(): void; + } + export = ProgressTimeout +} + +declare module '@uppy/utils/lib/RateLimitedQueue' { + namespace RateLimitedQueue { + export type AbortFunction = () => void; + export type PromiseFunction = (...args: any[]) => Promise; + export type QueueEntry = { + abort: () => void, + done: () => void, }; } - export default class Translator { - constructor(opts: TranslatorOptions); + class RateLimitedQueue { + constructor(limit: number); + run(fn: () => RateLimitedQueue.AbortFunction): RateLimitedQueue.QueueEntry; + wrapPromiseFunction(fn: () => RateLimitedQueue.PromiseFunction): RateLimitedQueue.PromiseFunction; } + + export = RateLimitedQueue } declare module '@uppy/utils/lib/canvasToBlob' { - export default function canvasToBlob(canvas: HTMLCanvasElement, type: string, quality?: number): Promise; + function canvasToBlob(canvas: HTMLCanvasElement, type: string, quality?: number): Promise; + export = canvasToBlob } declare module '@uppy/utils/lib/dataURItoBlob' { - export default function dataURItoBlob(dataURI: string, opts: { mimeType?: string, name?: string }): Blob; + function dataURItoBlob(dataURI: string, opts: { mimeType?: string, name?: string }): Blob; + export = dataURItoBlob } declare module '@uppy/utils/lib/dataURItoFile' { - export default function dataURItoFile(dataURI: string, opts: { mimeType?: string, name?: string }): File; + function dataURItoFile(dataURI: string, opts: { mimeType?: string, name?: string }): File; + export = dataURItoFile } declare module '@uppy/utils/lib/emitSocketProgress' { @@ -34,104 +87,117 @@ declare module '@uppy/utils/lib/emitSocketProgress' { bytesTotal: number; } - export default function emitSocketProgress(uploader: object, progressData: ProgressData, file: UppyUtils.UppyFile): void; + function emitSocketProgress(uploader: object, progressData: ProgressData, file: UppyUtils.UppyFile): void; + export = emitSocketProgress } declare module '@uppy/utils/lib/findAllDOMElements' { - export default function findAllDOMElements(element: string | HTMLElement): HTMLElement[]; + function findAllDOMElements(element: string | HTMLElement): HTMLElement[]; + export = findAllDOMElements } declare module '@uppy/utils/lib/findDOMElement' { - export default function findDOMElement(element: string | HTMLElement): HTMLElement | null; + function findDOMElement(element: string | HTMLElement): HTMLElement | null; + export = findDOMElement } declare module '@uppy/utils/lib/generateFileID' { import UppyUtils = require('@uppy/utils'); - export default function generateFileID(file: UppyUtils.UppyFile): string; + function generateFileID(file: UppyUtils.UppyFile): string; + export = generateFileID } declare module '@uppy/utils/lib/getBytesRemaining' { - export default function getBytesRemaining(progress: { bytesTotal: number, bytesUploaded: number }): number; + function getBytesRemaining(progress: { bytesTotal: number, bytesUploaded: number }): number; + export = getBytesRemaining } declare module '@uppy/utils/lib/getETA' { - export default function getETA(progress: object): number; + function getETA(progress: object): number; + export = getETA } declare module '@uppy/utils/lib/getFileNameAndExtension' { - export default function getFileNameAndExtension(filename: string): { name: string, extension: string }; + function getFileNameAndExtension(filename: string): { name: string, extension: string }; + export = getFileNameAndExtension } declare module '@uppy/utils/lib/getFileType' { import UppyUtils = require('@uppy/utils'); - export default function getFileType(file: UppyUtils.UppyFile): string | null; + function getFileType(file: UppyUtils.UppyFile): string | null; + export = getFileType } declare module '@uppy/utils/lib/getFileTypeExtension' { - export default function getFileTypeExtension(mime: string): string; + function getFileTypeExtension(mime: string): string; + export = getFileTypeExtension } declare module '@uppy/utils/lib/getSocketHost' { - export default function getSocketHost(url: string): string; + function getSocketHost(url: string): string; + export = getSocketHost } declare module '@uppy/utils/lib/getSpeed' { - export default function getSpeed(progress: { bytesTotal: number, bytesUploaded: number }): number; + function getSpeed(progress: { bytesTotal: number, bytesUploaded: number }): number; + export = getSpeed } declare module '@uppy/utils/lib/getTimeStamp' { - export default function getTimeStamp(): string; + function getTimeStamp(): string; + export = getTimeStamp } declare module '@uppy/utils/lib/isDOMElement' { - export default function isDOMElement(element: any): boolean; + function isDOMElement(element: any): boolean; + export = isDOMElement } declare module '@uppy/utils/lib/isObjectURL' { - export default function isObjectURL(url: string): boolean; + function isObjectURL(url: string): boolean; + export = isObjectURL } declare module '@uppy/utils/lib/isDragDropSupported' { - export default function isDragDropSupported(): boolean; + function isDragDropSupported(): boolean; + export = isDragDropSupported } declare module '@uppy/utils/lib/isPreviewSupported' { - export default function isPreviewSupported(mime: string): boolean; + function isPreviewSupported(mime: string): boolean; + export = isPreviewSupported } declare module '@uppy/utils/lib/isTouchDevice' { - export default function isTouchDevice(): boolean; -} - -declare module '@uppy/utils/lib/limitPromises' { - // TODO guess this could be generic but it's probably fine this way - // because it's mostly for internal use - type LimitedFunction = (...args: any[]) => Promise; - type LimitedFunctionFactory = (fn: (...args: any[]) => Promise) => LimitedFunction; - - export default function limitPromises(limit: number): LimitedFunctionFactory; + function isTouchDevice(): boolean; + export = isTouchDevice } declare module '@uppy/utils/lib/prettyETA' { - export default function prettyETA(seconds: number): string; + function prettyETA(seconds: number): string; + export = prettyETA } declare module '@uppy/utils/lib/secondsToTime' { - export default function secondsToTime(seconds: number): string; + function secondsToTime(seconds: number): string; + export = secondsToTime } declare module '@uppy/utils/lib/settle' { - export default function settle(promises: Promise[]): Promise<{ successful: T[], failed: any[] }>; + function settle(promises: Promise[]): Promise<{ successful: T[], failed: any[] }>; + export = settle } declare module '@uppy/utils/lib/toArray' { - export default function toArray(list: any): any[]; + function toArray(list: any): any[]; + export = toArray } declare module '@uppy/utils/lib/getDroppedFiles' { - export default function getDroppedFiles(dataTransfer: DataTransfer, options?: object): Promise; + function getDroppedFiles(dataTransfer: DataTransfer, options?: object): Promise; + export = getDroppedFiles } declare module '@uppy/utils' { diff --git a/packages/@uppy/xhr-upload/src/index.js b/packages/@uppy/xhr-upload/src/index.js index 69b31a83e7..4e70ebcd7e 100644 --- a/packages/@uppy/xhr-upload/src/index.js +++ b/packages/@uppy/xhr-upload/src/index.js @@ -5,7 +5,9 @@ const { Provider, RequestClient, Socket } = require('@uppy/companion-client') const emitSocketProgress = require('@uppy/utils/lib/emitSocketProgress') const getSocketHost = require('@uppy/utils/lib/getSocketHost') const settle = require('@uppy/utils/lib/settle') -const limitPromises = require('@uppy/utils/lib/limitPromises') +const EventTracker = require('@uppy/utils/lib/EventTracker') +const ProgressTimeout = require('@uppy/utils/lib/ProgressTimeout') +const RateLimitedQueue = require('@uppy/utils/lib/RateLimitedQueue') function buildResponseError (xhr, error) { // No error message @@ -111,15 +113,18 @@ module.exports = class XHRUpload extends Plugin { this.handleUpload = this.handleUpload.bind(this) // Simultaneous upload limiting is shared across all uploads with this plugin. - if (typeof this.opts.limit === 'number' && this.opts.limit !== 0) { - this.limitUploads = limitPromises(this.opts.limit) + // __queue is for internal Uppy use only! + if (this.opts.__queue instanceof RateLimitedQueue) { + this.requests = this.opts.__queue } else { - this.limitUploads = (fn) => fn + this.requests = new RateLimitedQueue(this.opts.limit) } if (this.opts.bundle && !this.opts.formData) { throw new Error('`opts.formData` must be true when `opts.bundle` is enabled.') } + + this.uploaderEvents = Object.create(null) } getOptions (file) { @@ -141,49 +146,6 @@ module.exports = class XHRUpload extends Plugin { return opts } - // Helper to abort upload requests if there has not been any progress for `timeout` ms. - // Create an instance using `timer = createProgressTimeout(10000, onTimeout)` - // Call `timer.progress()` to signal that there has been progress of any kind. - // Call `timer.done()` when the upload has completed. - createProgressTimeout (timeout, timeoutHandler) { - const uppy = this.uppy - const self = this - let isDone = false - - function onTimedOut () { - uppy.log(`[XHRUpload] timed out`) - const error = new Error(self.i18n('timedOut', { seconds: Math.ceil(timeout / 1000) })) - timeoutHandler(error) - } - - let aliveTimer = null - function progress () { - // Some browsers fire another progress event when the upload is - // cancelled, so we have to ignore progress after the timer was - // told to stop. - if (isDone) return - - if (timeout > 0) { - if (aliveTimer) clearTimeout(aliveTimer) - aliveTimer = setTimeout(onTimedOut, timeout) - } - } - - function done () { - uppy.log(`[XHRUpload] timer done`) - if (aliveTimer) { - clearTimeout(aliveTimer) - aliveTimer = null - } - isDone = true - } - - return { - progress, - done - } - } - addMetadata (formData, meta, opts) { const metaFields = Array.isArray(opts.metaFields) ? opts.metaFields @@ -240,17 +202,21 @@ module.exports = class XHRUpload extends Plugin { this.uppy.log(`uploading ${current} of ${total}`) return new Promise((resolve, reject) => { + this.uppy.emit('upload-started', file) + const data = opts.formData ? this.createFormDataUpload(file, opts) : this.createBareUpload(file, opts) - const timer = this.createProgressTimeout(opts.timeout, (error) => { + const timer = new ProgressTimeout(opts.timeout, () => { xhr.abort() + const error = new Error(this.i18n('timedOut', { seconds: Math.ceil(opts.timeout / 1000) })) this.uppy.emit('upload-error', file, error) reject(error) }) const xhr = new XMLHttpRequest() + this.uploaderEvents[file.id] = new EventTracker(this.uppy) const id = cuid() @@ -276,6 +242,11 @@ module.exports = class XHRUpload extends Plugin { xhr.addEventListener('load', (ev) => { this.uppy.log(`[XHRUpload] ${id} finished`) timer.done() + queuedRequest.done() + if (this.uploaderEvents[file.id]) { + this.uploaderEvents[file.id].remove() + this.uploaderEvents[file.id] = null + } if (opts.validateStatus(ev.target.status, xhr.responseText, xhr)) { const body = opts.getResponseData(xhr.responseText, xhr) @@ -311,6 +282,11 @@ module.exports = class XHRUpload extends Plugin { xhr.addEventListener('error', (ev) => { this.uppy.log(`[XHRUpload] ${id} errored`) timer.done() + queuedRequest.done() + if (this.uploaderEvents[file.id]) { + this.uploaderEvents[file.id].remove() + this.uploaderEvents[file.id] = null + } const error = buildResponseError(xhr, opts.getResponseError(xhr.responseText, xhr)) this.uppy.emit('upload-error', file, error) @@ -329,19 +305,21 @@ module.exports = class XHRUpload extends Plugin { xhr.setRequestHeader(header, opts.headers[header]) }) - xhr.send(data) - - this.uppy.on('file-removed', (removedFile) => { - if (removedFile.id === file.id) { + const queuedRequest = this.requests.run(() => { + xhr.send(data) + return () => { timer.done() xhr.abort() - reject(new Error('File removed')) } }) - this.uppy.on('cancel-all', () => { - timer.done() - xhr.abort() + this.onFileRemove(file.id, () => { + queuedRequest.abort() + reject(new Error('File removed')) + }) + + this.onCancelAll(file.id, () => { + queuedRequest.abort() reject(new Error('Upload cancelled')) }) }) @@ -350,6 +328,8 @@ module.exports = class XHRUpload extends Plugin { uploadRemote (file, current, total) { const opts = this.getOptions(file) return new Promise((resolve, reject) => { + this.uppy.emit('upload-started', file) + const fields = {} const metaFields = Array.isArray(opts.metaFields) ? opts.metaFields @@ -372,7 +352,30 @@ module.exports = class XHRUpload extends Plugin { }).then((res) => { const token = res.token const host = getSocketHost(file.remote.companionUrl) - const socket = new Socket({ target: `${host}/api/${token}` }) + const socket = new Socket({ target: `${host}/api/${token}`, autoOpen: false }) + this.uploaderEvents[file.id] = new EventTracker(this.uppy) + + this.onFileRemove(file.id, () => { + socket.send('pause', {}) + queuedRequest.abort() + resolve(`upload ${file.id} was removed`) + }) + + this.onCancelAll(file.id, () => { + socket.send('pause', {}) + queuedRequest.abort() + resolve(`upload ${file.id} was canceled`) + }) + + this.onRetry(file.id, () => { + socket.send('pause', {}) + socket.send('resume', {}) + }) + + this.onRetryAll(file.id, () => { + socket.send('pause', {}) + socket.send('resume', {}) + }) socket.on('progress', (progressData) => emitSocketProgress(this, progressData, file)) @@ -387,7 +390,11 @@ module.exports = class XHRUpload extends Plugin { } this.uppy.emit('upload-success', file, uploadResp) - socket.close() + queuedRequest.done() + if (this.uploaderEvents[file.id]) { + this.uploaderEvents[file.id].remove() + this.uploaderEvents[file.id] = null + } return resolve() }) @@ -397,8 +404,22 @@ module.exports = class XHRUpload extends Plugin { ? opts.getResponseError(resp.responseText, resp) : Object.assign(new Error(errData.error.message), { cause: errData.error }) this.uppy.emit('upload-error', file, error) + queuedRequest.done() + if (this.uploaderEvents[file.id]) { + this.uploaderEvents[file.id].remove() + this.uploaderEvents[file.id] = null + } reject(error) }) + + const queuedRequest = this.requests.run(() => { + socket.open() + if (file.isPaused) { + socket.send('pause', {}) + } + + return () => socket.close() + }) }) }) } @@ -416,8 +437,9 @@ module.exports = class XHRUpload extends Plugin { const xhr = new XMLHttpRequest() - const timer = this.createProgressTimeout(this.opts.timeout, (error) => { + const timer = new ProgressTimeout(this.opts.timeout, () => { xhr.abort() + const error = new Error(this.i18n('timedOut', { seconds: Math.ceil(this.opts.timeout / 1000) })) emitError(error) reject(error) }) @@ -502,29 +524,48 @@ module.exports = class XHRUpload extends Plugin { } uploadFiles (files) { - const actions = files.map((file, i) => { + const promises = files.map((file, i) => { const current = parseInt(i, 10) + 1 const total = files.length if (file.error) { - return () => Promise.reject(new Error(file.error)) + return Promise.reject(new Error(file.error)) } else if (file.isRemote) { - // We emit upload-started here, so that it's also emitted for files - // that have to wait due to the `limit` option. - this.uppy.emit('upload-started', file) - return this.uploadRemote.bind(this, file, current, total) + return this.uploadRemote(file, current, total) } else { - this.uppy.emit('upload-started', file) - return this.upload.bind(this, file, current, total) + return this.upload(file, current, total) } }) - const promises = actions.map((action) => { - const limitedAction = this.limitUploads(action) - return limitedAction() + return settle(promises) + } + + onFileRemove (fileID, cb) { + this.uploaderEvents[fileID].on('file-removed', (file) => { + if (fileID === file.id) cb(file.id) }) + } - return settle(promises) + onRetry (fileID, cb) { + this.uploaderEvents[fileID].on('upload-retry', (targetFileID) => { + if (fileID === targetFileID) { + cb() + } + }) + } + + onRetryAll (fileID, cb) { + this.uploaderEvents[fileID].on('retry-all', (filesToRetry) => { + if (!this.uppy.getFile(fileID)) return + cb() + }) + } + + onCancelAll (fileID, cb) { + this.uploaderEvents[fileID].on('cancel-all', () => { + if (!this.uppy.getFile(fileID)) return + cb() + }) } handleUpload (fileIDs) { diff --git a/test/endtoend/chaos-monkey/index.html b/test/endtoend/chaos-monkey/index.html new file mode 100644 index 0000000000..32d52566e2 --- /dev/null +++ b/test/endtoend/chaos-monkey/index.html @@ -0,0 +1,37 @@ + + + + + + Uppy test page + + + +
+

Chaos monkey

+
+
+
+
    +
    +
    +
    + + + + + + diff --git a/test/endtoend/chaos-monkey/main.js b/test/endtoend/chaos-monkey/main.js new file mode 100644 index 0000000000..a8e65a2152 --- /dev/null +++ b/test/endtoend/chaos-monkey/main.js @@ -0,0 +1,39 @@ +require('es6-promise/auto') +require('whatwg-fetch') +const Uppy = require('@uppy/core') +const Dashboard = require('@uppy/dashboard') +const Tus = require('@uppy/tus') + +const isOnTravis = !!(process.env.TRAVIS && process.env.CI) +const endpoint = isOnTravis ? 'http://companion.test:1081' : 'http://localhost:1081' + +let id = 0 +window.setup = function (options) { + id += 1 + + // Initialise Uppy with Drag & Drop + const uppy = Uppy({ id: `uppy${id}`, debug: true }) + + uppy.use(Dashboard, { inline: true, target: '#dash' }) + uppy.use(Tus, { + endpoint: `${endpoint}/files/`, + limit: options.limit + }) + uppy.on('file-added', (file) => { + randomColorImage(function (blob) { + uppy.setFileState(file.id, { preview: URL.createObjectURL(blob) }) + }) + }) + + return uppy +} + +function randomColorImage (callback) { + const canvas = document.createElement('canvas') + canvas.width = 140 + canvas.height = 140 + const context = canvas.getContext('2d') + context.fillStyle = '#xxxxxx'.replace(/x/g, () => '0123456789ABCDEF'[Math.floor(Math.random() * 16)]) + context.fillRect(0, 0, 140, 140) + canvas.toBlob(callback) +} diff --git a/test/endtoend/chaos-monkey/test.js b/test/endtoend/chaos-monkey/test.js new file mode 100644 index 0000000000..b8773b051e --- /dev/null +++ b/test/endtoend/chaos-monkey/test.js @@ -0,0 +1,103 @@ +/* global browser, expect */ +const crypto = require('crypto') +const lorem = require('@jamen/lorem') +const { selectFakeFile } = require('../utils') + +const testURL = 'http://localhost:4567/chaos-monkey' + +describe('Chaos monkey', function () { + this.timeout(5 * 60 * 1000) // 5 minutes + + beforeEach(async () => { + await browser.url(testURL) + }) + + it('Add and cancel a bunch', async () => { + await browser.execute(function () { + window.currentUppy = window.setup({ limit: 3 }) + window.onerror = function (message) { + window.anyError = message + } + }) + + const types = ['application/octet-stream', 'text/plain'] + const generate = { + 'application/octet-stream' () { + const len = Math.round(Math.random() * 5000000) + return crypto.randomBytes(len) + }, + 'text/plain' () { + const len = Math.round(Math.random() * 5000000) + return Buffer.from(lorem(len)) + } + } + + async function addFile () { + await browser.execute(function () { + window.addLogMessage('Adding a file') + }) + const type = types[Math.floor(Math.random() * types.length)] + const data = generate[type]().toString('base64') + + const name = `${Math.random().toString(32).slice(2)}-file` + await browser.execute(selectFakeFile, 'currentUppy', name, type, data) + } + + function cancelFile () { + return browser.execute(function () { + window.addLogMessage('Cancelling a file') + // prefer deleting a file that is uploading right now + var selector = Math.random() <= 0.7 + ? '.is-inprogress .uppy-DashboardItem-action--remove' + : '.uppy-DashboardItem-action--remove' + var buttons = document.querySelectorAll(selector) + var del = buttons[Math.floor(Math.random() * buttons.length)] + if (del) del.click() + }) + } + + function startUploadIfAnyWaitingFiles () { + return browser.execute(function () { + window.addLogMessage('Starting upload') + var start = document.querySelector('.uppy-StatusBar-actionBtn--upload') + if (start) start.click() + }) + } + + function cancelAll () { + return browser.execute(function () { + window.addLogMessage('Cancelling everything') + var button = document.querySelector('.uppy-DashboardContent-back') + if (button) button.click() + }) + } + + await addFile() + await addFile() + await addFile() + + for (let i = 0; i < 300; i++) { + await browser.pause(50 + Math.floor(Math.random() * 300)) + const v = Math.floor(Math.random() * 100) + if (v < 45) { + await addFile() + } else if (v < 55) { + await cancelFile() + } else if (v === 55) { + await cancelAll() + } else if (v < 75) { + await startUploadIfAnyWaitingFiles() + } else { + // wait + } + } + + await cancelAll() + + const errorMessage = await browser.execute(function () { + return window.anyError + }) + // yikes chai, why can this not be a function call + expect(errorMessage).to.not.exist // eslint-disable-line no-unused-expressions + }) +}) diff --git a/test/endtoend/utils.js b/test/endtoend/utils.js index 2b60c10ee4..ddb9883f07 100644 --- a/test/endtoend/utils.js +++ b/test/endtoend/utils.js @@ -140,6 +140,9 @@ const tus = require('tus-node-server') const os = require('os') const rimraf = promisify(require('rimraf')) const { randomBytes } = require('crypto') +const http = require('http') +const httpProxy = require('http-proxy') +const brake = require('brake') class TusService { constructor ({ tusServerPort = 1080 }) { this.port = tusServerPort @@ -153,16 +156,37 @@ class TusService { directory: this.path }) + const proxy = httpProxy.createProxyServer() + this.slowServer = http.createServer((req, res) => { + proxy.web(req, res, { + target: `http://localhost:1080`, + // 200 kbps max upload, checking the rate limit every 20ms + buffer: req.pipe(brake({ + period: 20, + rate: 200 * 1024 / 50 + })) + }, (err) => { // eslint-disable-line handle-callback-err + // ignore, typically a cancelled request + }) + }) + const listen = promisify(this.tusServer.listen.bind(this.tusServer)) this.server = await listen({ host: '0.0.0.0', port: this.port }) + const listen2 = promisify(this.slowServer.listen.bind(this.slowServer)) + await listen2(this.port + 1) } async onComplete () { + if (this.slowServer) { + const close = promisify(this.slowServer.close.bind(this.slowServer)) + await close() + } if (this.server) { const close = promisify(this.server.close.bind(this.server)) await close() } await rimraf(this.path) + this.slowServer = null this.tusServer = null } } diff --git a/test/endtoend/wdio.base.conf.js b/test/endtoend/wdio.base.conf.js index 408b357e33..d284fcf6ec 100644 --- a/test/endtoend/wdio.base.conf.js +++ b/test/endtoend/wdio.base.conf.js @@ -89,15 +89,16 @@ exports.config = { [CompanionService], [StaticServerService, { folders: [ + { mount: '/chaos-monkey', path: './test/endtoend/chaos-monkey/dist' }, + { mount: '/create-react-app', path: './test/endtoend/create-react-app/build' }, { mount: '/i18n-drag-drop', path: './test/endtoend/i18n-drag-drop/dist' }, - { mount: '/tus-drag-drop', path: './test/endtoend/tus-drag-drop/dist' }, - { mount: '/xhr-limit', path: './test/endtoend/xhr-limit/dist' }, { mount: '/providers', path: './test/endtoend/providers/dist' }, { mount: '/thumbnails', path: './test/endtoend/thumbnails/dist' }, { mount: '/transloadit', path: './test/endtoend/transloadit/dist' }, + { mount: '/tus-drag-drop', path: './test/endtoend/tus-drag-drop/dist' }, { mount: '/typescript', path: './test/endtoend/typescript/dist' }, { mount: '/url-plugin', path: './test/endtoend/url-plugin/dist' }, - { mount: '/create-react-app', path: './test/endtoend/create-react-app/build' } + { mount: '/xhr-limit', path: './test/endtoend/xhr-limit/dist' } ] }], [TusService] diff --git a/test/endtoend/wdio.remote.conf.js b/test/endtoend/wdio.remote.conf.js index f09ee45ca4..ce5eb447b5 100644 --- a/test/endtoend/wdio.remote.conf.js +++ b/test/endtoend/wdio.remote.conf.js @@ -31,6 +31,7 @@ exports.config = { // Patterns to exclude. exclude: [ + 'test/endtoend/chaos-monkey/*', 'test/endtoend/url-plugin/*', 'test/endtoend/transloadit/*' ], diff --git a/tsconfig.json b/tsconfig.json index e38bb81a2d..3951af9322 100644 --- a/tsconfig.json +++ b/tsconfig.json @@ -6,6 +6,8 @@ "dom", "esnext" ], + "resolveJsonModule": true, + "allowJs": true, "noImplicitAny": true, "noImplicitThis": true, "strictNullChecks": true,