Skip to content

Commit

Permalink
Generate assemblyId on client (#106)
Browse files Browse the repository at this point in the history
* Generate assemblyId on client

And allow user to retrieve it by calling promise.assemblyId
This allows the user to use or log the assemblyId even before it has been created for easier debugging

* Update README.md

Co-authored-by: Kevin van Zonneveld <kevin@vanzonneveld.net>
  • Loading branch information
mifi and kvz authored Apr 6, 2021
1 parent 384bb4e commit 2054ebf
Show file tree
Hide file tree
Showing 6 changed files with 125 additions and 81 deletions.
9 changes: 9 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,15 @@ function onAssemblyProgress(assembly) {
}
```

**Tip:** `createAssembly` returns a `Promise` with an extra property `assemblyId`. This can be used to retrieve the Assembly ID before the Assembly has even been created. Useful for debugging by logging this ID when the request starts, for example:

```js
const promise = transloadit.createAssembly(options)
console.log('Creating', promise.assemblyId)
const status = await promise
```


See also:
- [API documentation](https://transloadit.com/docs/api/#assemblies-post)
- Error codes and retry logic below
Expand Down
6 changes: 3 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@
"is-stream": "^2.0.0",
"lodash": "^4.17.20",
"p-map": "^4.0.0",
"tus-js-client": "^2.2.0"
"tus-js-client": "^2.2.0",
"uuid": "^8.3.2"
},
"devDependencies": {
"@babel/eslint-plugin": "^7.13.10",
Expand All @@ -42,8 +43,7 @@
"p-retry": "^4.2.0",
"request": "^2.88.2",
"temp": "^0.9.1",
"tsd": "^0.14.0",
"uuid": "^8.3.2"
"tsd": "^0.14.0"
},
"repository": {
"type": "git",
Expand Down
145 changes: 80 additions & 65 deletions src/Transloadit.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ const intoStream = require('into-stream')
const isStream = require('is-stream')
const assert = require('assert')
const pMap = require('p-map')
const uuid = require('uuid')

const PaginationStream = require('./PaginationStream')
const { version } = require('../package.json')
Expand Down Expand Up @@ -173,7 +174,7 @@ class TransloaditClient {
* @param {object} opts assembly options
* @returns {Promise}
*/
async createAssembly (opts = {}, arg2) {
createAssembly (opts = {}, arg2) {
// Warn users of old callback API
if (typeof arg2 === 'function') {
throw new TypeError('You are trying to send a function as the second argument. This is no longer valid in this version. Please see github README for usage.')
Expand All @@ -194,90 +195,104 @@ class TransloaditClient {
// Keep track of how long the request took
const startTimeMs = getHrTimeMs()

// Undocumented feature to allow specifying the assembly id from the client
// (not recommended for general use due to security)
const urlSuffix = assemblyId != null ? `/assemblies/${assemblyId}` : '/assemblies'
// Undocumented feature to allow specifying a custom assembly id from the client
// Not recommended for general use due to security. E.g if the user doesn't provide a cryptographically
// secure ID, then anyone could access the assembly.
let effectiveAssemblyId
if (assemblyId != null) {
effectiveAssemblyId = assemblyId
} else {
effectiveAssemblyId = uuid.v4().replace(/-/g, '')
}
const urlSuffix = `/assemblies/${effectiveAssemblyId}`

this._lastUsedAssemblyUrl = `${this._endpoint}${urlSuffix}`
// We want to be able to return the promise immediately with custom data
const promise = (async () => {
this._lastUsedAssemblyUrl = `${this._endpoint}${urlSuffix}`

// eslint-disable-next-line no-bitwise
await pMap(Object.entries(files), async ([, path]) => access(path, fs.F_OK | fs.R_OK), { concurrency: 5 })
// eslint-disable-next-line no-bitwise
await pMap(Object.entries(files), async ([, path]) => access(path, fs.F_OK | fs.R_OK), { concurrency: 5 })

// Convert uploads to streams
const streamsMap = fromPairs(Object.entries(uploads).map(([label, value]) => {
const isReadable = isStream.readable(value)
if (!isReadable && isStream(value)) {
// https://github.com/transloadit/node-sdk/issues/92
throw new Error(`Upload named "${label}" is not a Readable stream`)
}
// Convert uploads to streams
const streamsMap = fromPairs(Object.entries(uploads).map(([label, value]) => {
const isReadable = isStream.readable(value)
if (!isReadable && isStream(value)) {
// https://github.com/transloadit/node-sdk/issues/92
throw new Error(`Upload named "${label}" is not a Readable stream`)
}

return [
label,
isStream.readable(value) ? value : intoStream(value),
]
}))
return [
label,
isStream.readable(value) ? value : intoStream(value),
]
}))

// Wrap in object structure (so we can know if it's a pathless stream or not)
const allStreamsMap = fromPairs(Object.entries(streamsMap).map(([label, stream]) => [label, { stream }]))
// Wrap in object structure (so we can know if it's a pathless stream or not)
const allStreamsMap = fromPairs(Object.entries(streamsMap).map(([label, stream]) => [label, { stream }]))

// Create streams from files too
for (const [label, path] of Object.entries(files)) {
const stream = fs.createReadStream(path)
allStreamsMap[label] = { stream, path } // File streams have path
}
// Create streams from files too
for (const [label, path] of Object.entries(files)) {
const stream = fs.createReadStream(path)
allStreamsMap[label] = { stream, path } // File streams have path
}

const allStreams = Object.values(allStreamsMap)
const allStreams = Object.values(allStreamsMap)

// Pause all streams
allStreams.forEach(({ stream }) => stream.pause())
// Pause all streams
allStreams.forEach(({ stream }) => stream.pause())

// If any stream emits error, we want to handle this and exit with error
const streamErrorPromise = new Promise((resolve, reject) => {
allStreams.forEach(({ stream }) => stream.on('error', reject))
})
// If any stream emits error, we want to handle this and exit with error
const streamErrorPromise = new Promise((resolve, reject) => {
allStreams.forEach(({ stream }) => stream.on('error', reject))
})

const createAssemblyAndUpload = async () => {
const useTus = isResumable && allStreams.every(isFileBasedStream)
const createAssemblyAndUpload = async () => {
const useTus = isResumable && allStreams.every(isFileBasedStream)

const requestOpts = {
urlSuffix,
method: 'post',
timeout,
params,
}
const requestOpts = {
urlSuffix,
method: 'post',
timeout,
params,
}

if (useTus) {
requestOpts.fields = {
tus_num_expected_upload_files: allStreams.length,
if (useTus) {
requestOpts.fields = {
tus_num_expected_upload_files: allStreams.length,
}
} else if (isResumable) {
logWarn('Disabling resumability because the size of one or more streams cannot be determined')
}
} else if (isResumable) {
logWarn('Disabling resumability because the size of one or more streams cannot be determined')
}

// upload as form multipart or tus?
const formUploadStreamsMap = useTus ? {} : allStreamsMap
const tusStreamsMap = useTus ? allStreamsMap : {}
// upload as form multipart or tus?
const formUploadStreamsMap = useTus ? {} : allStreamsMap
const tusStreamsMap = useTus ? allStreamsMap : {}

const result = await this._remoteJson(requestOpts, formUploadStreamsMap, onUploadProgress)
checkResult(result)
const result = await this._remoteJson(requestOpts, formUploadStreamsMap, onUploadProgress)
checkResult(result)

if (useTus && Object.keys(tusStreamsMap).length > 0) {
await sendTusRequest({
streamsMap: tusStreamsMap,
assembly : result,
onProgress: onUploadProgress,
if (useTus && Object.keys(tusStreamsMap).length > 0) {
await sendTusRequest({
streamsMap: tusStreamsMap,
assembly : result,
onProgress: onUploadProgress,
})
}

if (!waitForCompletion) return result
const awaitResult = await this.awaitAssemblyCompletion(result.assembly_id, {
timeout, onAssemblyProgress, startTimeMs,
})
checkResult(awaitResult)
return awaitResult
}

if (!waitForCompletion) return result
const awaitResult = await this.awaitAssemblyCompletion(result.assembly_id, {
timeout, onAssemblyProgress, startTimeMs,
})
checkResult(awaitResult)
return awaitResult
}
return Promise.race([createAssemblyAndUpload(), streamErrorPromise])
})()

return Promise.race([createAssemblyAndUpload(), streamErrorPromise])
// This allows the user to use or log the assemblyId even before it has been created for easier debugging
promise.assemblyId = effectiveAssemblyId
return promise
}

async awaitAssemblyCompletion (assemblyId, {
Expand Down
26 changes: 22 additions & 4 deletions test/integration/__tests__/live-api.js
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,11 @@ function createClient (opts = {}) {
return new Transloadit({ authKey, authSecret, ...opts })
}

async function createAssembly (client, params) {
const assemblyId = uuid.v4().replace(/-/g, '')
console.log('createAssembly', assemblyId)
return client.createAssembly({ assemblyId, ...params })
function createAssembly (client, params) {
const promise = client.createAssembly(params)
const { assemblyId } = promise
console.log('createAssembly', assemblyId) // For easier debugging
return promise
}

const startServerAsync = async (handler) => new Promise((resolve, reject) => {
Expand Down Expand Up @@ -287,6 +288,23 @@ describe('API integration', () => {
expect(result.assembly_id).toEqual(assemblyId)
})

it('should allow getting the assemblyId on createAssembly even before it has been started', async () => {
const client = createClient()

const params = {
params: {
steps: {
dummy: dummyStep,
},
},
}

const promise = createAssembly(client, params)
expect(promise.assemblyId).toMatch(/^[\da-f]+$/)
const result = await promise
expect(result.assembly_id).toMatch(promise.assemblyId)
})

it('should throw a proper error for request stream', async () => {
const client = createClient()

Expand Down
18 changes: 10 additions & 8 deletions test/unit/__tests__/mock-http.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ jest.setTimeout(1000)

const getLocalClient = (opts) => new Transloadit({ authKey: '', authSecret: '', endpoint: 'http://localhost', ...opts })

const createAssemblyRegex = /\/assemblies\/[0-9a-f]{32}/

describe('Mocked API tests', () => {
afterEach(() => {
nock.cleanAll()
Expand All @@ -16,7 +18,7 @@ describe('Mocked API tests', () => {
const client = new Transloadit({ authKey: '', authSecret: '', endpoint: 'http://localhost' })

nock('http://localhost')
.post('/assemblies')
.post(createAssemblyRegex)
.delay(100)
.reply(200)

Expand Down Expand Up @@ -51,7 +53,7 @@ describe('Mocked API tests', () => {
const client = getLocalClient()

const scope = nock('http://localhost')
.post('/assemblies')
.post(createAssemblyRegex)
.reply(200, { ok: 'ASSEMBLY_UPLOADING' })
.get('/assemblies/1')
.query(() => true)
Expand Down Expand Up @@ -83,7 +85,7 @@ describe('Mocked API tests', () => {
const client = getLocalClient()

nock('http://localhost')
.post('/assemblies')
.post(createAssemblyRegex)
.reply(400, { error: 'INVALID_FILE_META_DATA' })

await expect(client.createAssembly()).rejects.toThrow(expect.objectContaining({ transloaditErrorCode: 'INVALID_FILE_META_DATA', message: 'INVALID_FILE_META_DATA' }))
Expand All @@ -93,7 +95,7 @@ describe('Mocked API tests', () => {
const client = getLocalClient()

nock('http://localhost')
.post('/assemblies')
.post(createAssemblyRegex)
.reply(400, { error: 'INVALID_FILE_META_DATA', assembly_id: '123', assembly_ssl_url: 'https://api2-oltu.transloadit.com/assemblies/foo' })

await expect(client.createAssembly()).rejects.toThrow(expect.objectContaining({
Expand All @@ -111,9 +113,9 @@ describe('Mocked API tests', () => {
// https://transloadit.com/blog/2012/04/introducing-rate-limiting/

const scope = nock('http://localhost')
.post('/assemblies')
.post(createAssemblyRegex)
.reply(413, { error: 'RATE_LIMIT_REACHED', info: { retryIn: 0.01 } })
.post('/assemblies')
.post(createAssemblyRegex)
.reply(200, { ok: 'ASSEMBLY_EXECUTING' })

await client.createAssembly()
Expand All @@ -124,7 +126,7 @@ describe('Mocked API tests', () => {
const client = getLocalClient({ maxRetries: 0 })

const scope = nock('http://localhost')
.post('/assemblies')
.post(createAssemblyRegex)
.reply(413, { error: 'RATE_LIMIT_REACHED', message: 'Request limit reached', info: { retryIn: 0.01 } })

await expect(client.createAssembly()).rejects.toThrow(expect.objectContaining({ transloaditErrorCode: 'RATE_LIMIT_REACHED', message: 'RATE_LIMIT_REACHED: Request limit reached' }))
Expand Down Expand Up @@ -190,7 +192,7 @@ describe('Mocked API tests', () => {
const client = getLocalClient()

const scope = nock('http://localhost')
.post('/assemblies')
.post(createAssemblyRegex)
.reply(200, { error: 'IMPORT_FILE_ERROR', assembly_id: '1' })

await expect(client.createAssembly()).rejects.toThrow(expect.objectContaining({
Expand Down
2 changes: 1 addition & 1 deletion test/unit/__tests__/test-transloadit-client.js
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ describe('Transloadit', () => {
it('should crash if attempt to use callback', async () => {
const client = new Transloadit({ authKey: 'foo_key', authSecret: 'foo_secret' })
const cb = () => {}
await expect(client.createAssembly({}, cb)).rejects.toThrow(TypeError)
expect(() => client.createAssembly({}, cb)).toThrow(TypeError)
})

describe('_calcSignature', () => {
Expand Down

0 comments on commit 2054ebf

Please sign in to comment.