Skip to content

Commit

Permalink
feat: accept browser readable streams as input (ipfs#21)
Browse files Browse the repository at this point in the history
N.b you can't read from a browser reaable stream repeatedly as the
first call to `stream.getReader` locks the reader to the first
invocation so I had to change all the test data to be functions
that reutrn a new instance of the data.
  • Loading branch information
achingbrain authored and hugomrdias committed Jan 23, 2020
1 parent fd3bfc5 commit 0902067
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 22 deletions.
20 changes: 17 additions & 3 deletions src/files/normalise-input.js
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,15 @@ module.exports = function normaliseInput (input) {
})()
}

// window.ReadableStream
if (typeof input.getReader === 'function') {
return (async function * () {
for await (const obj of browserStreamToIt(input)) {
yield toFileObject(obj)
}
})()
}

// AsyncIterable<?>
if (input[Symbol.asyncIterator]) {
return (async function * () {
Expand Down Expand Up @@ -171,6 +180,11 @@ function toAsyncIterable (input) {
return blobToAsyncGenerator(input)
}

// Browser stream
if (typeof input.getReader === 'function') {
return browserStreamToIt(input)
}

// Iterator<?>
if (input[Symbol.iterator]) {
return (async function * () { // eslint-disable-line require-await
Expand Down Expand Up @@ -232,14 +246,14 @@ function isFileObject (obj) {
function blobToAsyncGenerator (blob) {
if (typeof blob.stream === 'function') {
// firefox < 69 does not support blob.stream()
return streamBlob(blob)
return browserStreamToIt(blob.stream())
}

return readBlob(blob)
}

async function * streamBlob (blob) {
const reader = blob.stream().getReader()
async function * browserStreamToIt (stream) {
const reader = stream.getReader()

while (true) {
const result = await reader.read()
Expand Down
54 changes: 35 additions & 19 deletions test/files/normalise-input.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,24 @@ const globalThis = require('../../src/globalthis')
chai.use(dirtyChai)
const expect = chai.expect

const STRING = 'hello world'
const BUFFER = Buffer.from(STRING)
const ARRAY = Array.from(BUFFER)
const TYPEDARRAY = Uint8Array.from(ARRAY)
const STRING = () => 'hello world'
const BUFFER = () => Buffer.from(STRING())
const ARRAY = () => Array.from(BUFFER())
const TYPEDARRAY = () => Uint8Array.from(ARRAY())
let BLOB
let WINDOW_READABLE_STREAM

if (supportsFileReader) {
BLOB = new globalThis.Blob([
STRING
BLOB = () => new globalThis.Blob([
STRING()
])

WINDOW_READABLE_STREAM = () => new globalThis.ReadableStream({
start (controller) {
controller.enqueue(BUFFER())
controller.close()
}
})
}

async function verifyNormalisation (input) {
Expand All @@ -31,7 +39,7 @@ async function verifyNormalisation (input) {
chai.assert.fail('Content should have been an iterable or an async iterable')
}

expect(await all(input[0].content)).to.deep.equal([BUFFER])
expect(await all(input[0].content)).to.deep.equal([BUFFER()])
expect(input[0].path).to.equal('')
}

Expand All @@ -54,56 +62,56 @@ function asyncIterableOf (thing) {
describe('normalise-input', function () {
function testInputType (content, name, isBytes) {
it(name, async function () {
await testContent(content)
await testContent(content())
})

if (isBytes) {
it(`Iterable<${name}>`, async function () {
await testContent(iterableOf(content))
await testContent(iterableOf(content()))
})

it(`AsyncIterable<${name}>`, async function () {
await testContent(asyncIterableOf(content))
await testContent(asyncIterableOf(content()))
})
}

it(`{ path: '', content: ${name} }`, async function () {
await testContent({ path: '', content })
await testContent({ path: '', content: content() })
})

if (isBytes) {
it(`{ path: '', content: Iterable<${name}> }`, async function () {
await testContent({ path: '', content: iterableOf(content) })
await testContent({ path: '', content: iterableOf(content()) })
})

it(`{ path: '', content: AsyncIterable<${name}> }`, async function () {
await testContent({ path: '', content: asyncIterableOf(content) })
await testContent({ path: '', content: asyncIterableOf(content()) })
})
}

it(`Iterable<{ path: '', content: ${name} }`, async function () {
await testContent(iterableOf({ path: '', content }))
await testContent(iterableOf({ path: '', content: content() }))
})

it(`AsyncIterable<{ path: '', content: ${name} }`, async function () {
await testContent(asyncIterableOf({ path: '', content }))
await testContent(asyncIterableOf({ path: '', content: content() }))
})

if (isBytes) {
it(`Iterable<{ path: '', content: Iterable<${name}> }>`, async function () {
await testContent(iterableOf({ path: '', content: iterableOf(content) }))
await testContent(iterableOf({ path: '', content: iterableOf(content()) }))
})

it(`Iterable<{ path: '', content: AsyncIterable<${name}> }>`, async function () {
await testContent(iterableOf({ path: '', content: asyncIterableOf(content) }))
await testContent(iterableOf({ path: '', content: asyncIterableOf(content()) }))
})

it(`AsyncIterable<{ path: '', content: Iterable<${name}> }>`, async function () {
await testContent(asyncIterableOf({ path: '', content: iterableOf(content) }))
await testContent(asyncIterableOf({ path: '', content: iterableOf(content()) }))
})

it(`AsyncIterable<{ path: '', content: AsyncIterable<${name}> }>`, async function () {
await testContent(asyncIterableOf({ path: '', content: asyncIterableOf(content) }))
await testContent(asyncIterableOf({ path: '', content: asyncIterableOf(content()) }))
})
}
}
Expand All @@ -124,6 +132,14 @@ describe('normalise-input', function () {
testInputType(BLOB, 'Blob', false)
})

describe('window.ReadableStream', () => {
if (!supportsFileReader) {
return
}

testInputType(WINDOW_READABLE_STREAM, 'window.ReadableStream', false)
})

describe('Iterable<Number>', () => {
testInputType(ARRAY, 'Iterable<Number>', false)
})
Expand Down

0 comments on commit 0902067

Please sign in to comment.