Skip to content

Commit

Permalink
adding async operation support
Browse files Browse the repository at this point in the history
  • Loading branch information
martinheidegger committed Jan 11, 2022
1 parent f17a440 commit a954a44
Show file tree
Hide file tree
Showing 7 changed files with 259 additions and 22 deletions.
36 changes: 27 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@

An iteration of the Node.js core streams with a series of improvements.

```
npm install streamx
```sh
$ npm install streamx
```

[![Build Status](https://github.com/streamxorg/streamx/workflows/Build%20Status/badge.svg)](https://github.com/streamxorg/streamx/actions?query=workflow%3A%22Build+Status%22)
Expand Down Expand Up @@ -55,15 +55,15 @@ improvements above.

streamx has a much smaller footprint when compiled for the browser:

```
```sh
$ for x in stream{,x}; do echo $x: $(browserify -r $x | wc -c) bytes; done
stream: 173844 bytes
streamx: 46943 bytes
```

With optimizations turned on, the difference is even more stark:

```
```sh
$ for x in stream{,x}; do echo $x: $(browserify -r $x -p tinyify | wc -c) bytes; done
stream: 62649 bytes
streamx: 8460 bytes
Expand Down Expand Up @@ -105,7 +105,7 @@ Create a new readable stream.

Options include:

```
```js
{
highWaterMark: 16384, // max buffer size in bytes
map: (data) => data, // optional function to map input data
Expand All @@ -118,6 +118,12 @@ Options include:
In addition you can pass the `open`, `read`, and `destroy` functions as shorthands in
the constructor instead of overwrite the methods below.

`open`, `read` and `destroy` have the same signature as the `._open`, `._read` and `._destroy`
methods but also support async variants. For example: You can pass in `async read() {}` instead
of `read(cb) {}`

All passed-in functions will be executed with the readable stream as `this`.

The default byteLength function returns the byte length of buffers and `1024`
for any other object. This means the buffer will contain around 16 non buffers
or buffers worth 16kb when full if the defaults are used.
Expand Down Expand Up @@ -263,7 +269,7 @@ Create a new writable stream.

Options include:

```
```js
{
highWaterMark: 16384, // max buffer size in bytes
map: (data) => data, // optional function to map input data
Expand All @@ -272,8 +278,14 @@ Options include:
}
```

In addition you can pass the `open`, `write`, `final`, and `destroy` functions as shorthands in
the constructor instead of overwrite the methods below.
In addition you can pass the `open`, `write`, `writev`, `final`, and `destroy` functions as
shorthands in the constructor instead of overwrite the methods below.

`open`, `write`, `writev`, `final` and `destroy` have the same signature as the `._open`,
`._write`, `._writev`. `._final` and `._destroy` methods but also support async variants.
For example: You can pass in `async write(data) {}` instead of `write(data, cb) {}`.

All passed-in functions will be executed with the writable stream as `this`.

The default byteLength function returns the byte length of buffers and `1024`
for any other object. This means the buffer will contain around 16 non buffers
Expand Down Expand Up @@ -400,7 +412,13 @@ in `read` or `write`/`writev` or to override the corresponding `._read`, `._writ

A transform stream is a duplex stream that maps the data written to it and emits that as readable data.

Has the same options as a duplex stream except you can provide a `transform` function also.
Has the same options as a duplex stream except you can provide a `transform` and `flush` function also.

The `transform` and `flush` operations have the same signature as the `._transform` and `._flush` but
`._transform` also support an async variant. For example:
You can pass in `async transform (data) {}` instead of `transform (data, cb) {}`.

All passed-in functions will be executed with the duplex stream as `this`.

#### `ts._transform(data, callback)`

Expand Down
50 changes: 39 additions & 11 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -509,6 +509,34 @@ function afterTransform (err, data) {
this._writableState.afterWrite(err)
}

function mapAsync (name, asyncOrNot) {
if (asyncOrNot.length === 0) {
return function (cb) {
const p = asyncOrNot.call(this)
if (!p) return cb(new Error(`Async template .${name} is expected to return a Promise.`))
p.then(
data => cb(null, data),
cb
)
}
}
return asyncOrNot
}

function mapAsync1 (name, asyncOrNot) {
if (asyncOrNot.length === 1) {
return function (arg, cb) {
const p = asyncOrNot.call(this, arg)
if (!p) return cb(new Error(`Async template .${name} is expected to return a Promise.`))
p.then(
data => cb(null, data),
cb
)
}
}
return asyncOrNot
}

class Stream extends EventEmitter {
constructor (opts) {
super()
Expand All @@ -518,8 +546,8 @@ class Stream extends EventEmitter {
this._writableState = null

if (opts) {
if (opts.open) this._open = opts.open
if (opts.destroy) this._destroy = opts.destroy
if (opts.open) this._open = mapAsync('open', opts.open)
if (opts.destroy) this._destroy = mapAsync('destroy', opts.destroy)
if (opts.predestroy) this._predestroy = opts.predestroy
if (opts.signal) {
opts.signal.addEventListener('abort', abort.bind(this))
Expand Down Expand Up @@ -602,7 +630,7 @@ class Readable extends Stream {
this._readableState = new ReadableState(this, opts)

if (opts) {
if (opts.read) this._read = opts.read
if (opts.read) this._read = mapAsync('read', opts.read)
if (opts.eagerOpen) this.resume().pause()
}
}
Expand Down Expand Up @@ -759,9 +787,9 @@ class Writable extends Stream {
this._writableState = new WritableState(this, opts)

if (opts) {
if (opts.writev) this._writev = opts.writev
if (opts.write) this._write = opts.write
if (opts.final) this._final = opts.final
if (opts.writev) this._writev = mapAsync1('writev', opts.writev)
if (opts.write) this._write = mapAsync1('write', opts.write)
if (opts.final) this._final = mapAsync('final', opts.final)
}
}

Expand Down Expand Up @@ -801,9 +829,9 @@ class Duplex extends Readable { // and Writable
this._writableState = new WritableState(this, opts)

if (opts) {
if (opts.writev) this._writev = opts.writev
if (opts.write) this._write = opts.write
if (opts.final) this._final = opts.final
if (opts.writev) this._writev = mapAsync1('writev', opts.writev)
if (opts.write) this._write = mapAsync1('write', opts.write)
if (opts.final) this._final = mapAsync('final', opts.final)
}
}

Expand Down Expand Up @@ -837,8 +865,8 @@ class Transform extends Duplex {
this._transformState = new TransformState(this)

if (opts) {
if (opts.transform) this._transform = opts.transform
if (opts.flush) this._flush = opts.flush
if (opts.transform) this._transform = mapAsync1('transform', opts.transform)
if (opts.flush) this._flush = mapAsync('flush', opts.flush)
}
}

Expand Down
2 changes: 1 addition & 1 deletion test/duplex.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ tape('if open does not end, it should stall', function (t) {
t.plan(1)

const d = new Duplex({
open () {
open (_cb) {
t.pass('open called')
},
read () {
Expand Down
29 changes: 29 additions & 0 deletions test/passthrough.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,32 @@ tape('passthrough', t => {
r.push('bar')
r.push(null)
})

tape('async transform option', async function (t) {
const r = Readable.from([1, 2, 3]).pipe(new PassThrough({
async transform (a) {
return a.toString()
}
}))

const result = []
for await (const entry of r) {
result.push(entry)
}
t.same(result, ['1', '2', '3'])
t.end()
})

tape('async final option', async function (t) {
const r = Readable.from([1, 2, 3]).pipe(new PassThrough({
flush () {
return new Promise(resolve => setTimeout(resolve, 30))
}
}))
const start = Date.now()
r.on('close', () => {
t.ok((Date.now() - start) > 25)
t.end()
})
r.resume()
})
2 changes: 1 addition & 1 deletion test/pipe.js
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ tape('simple pipe', function (t) {
cb(null)
},

final () {
async final () {
t.pass('final called')
t.same(buffered, ['hello', 'world'])
t.end()
Expand Down
57 changes: 57 additions & 0 deletions test/readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -231,3 +231,60 @@ tape('use mapReadable to map data', async function (t) {
t.deepEquals(obj, { foo: 1 })
}
})

tape('async read option', async function (t) {
let index = 0
const data = ['a', 'b', 'c', null]
const r = new Readable({
async read () {
this.push(data[index++])
}
})
const res = []
for await (const entry of r) {
res.push(entry)
}
t.same(res, ['a', 'b', 'c'])
t.end()
})

tape('async open option', function (t) {
const r = new Readable({
open () {
return new Promise(resolve => setTimeout(resolve, 30))
}
})
const start = Date.now()
r.on('data', () => {
t.ok((Date.now() - start) > 25)
t.end()
})
r.push(1)
r.push(null)
})

tape('async destroy option', function (t) {
const r = new Readable({
destroy () {
return new Promise(resolve => setTimeout(resolve, 30))
}
})
r.push(null)
const start = Date.now()
r.on('close', () => {
t.ok((Date.now() - start) > 25)
t.end()
})
})

tape('error when no promise is returned by async .read template', function (t) {
t.plan(1)
const r = new Readable({
read () {}
})
r.on('error', error => {
t.ok(error instanceof Error)
t.end()
})
r.read()
})
Loading

0 comments on commit a954a44

Please sign in to comment.