Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stream: support Uint8Array input to methods #11608

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 49 additions & 20 deletions doc/api/stream.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,10 @@ There are four fundamental stream types within Node.js:
### Object Mode

All streams created by Node.js APIs operate exclusively on strings and `Buffer`
objects. It is possible, however, for stream implementations to work with other
types of JavaScript values (with the exception of `null`, which serves a special
purpose within streams). Such streams are considered to operate in "object
mode".
(or `Uint8Array`) objects. It is possible, however, for stream implementations
to work with other types of JavaScript values (with the exception of `null`,
which serves a special purpose within streams). Such streams are considered to
operate in "object mode".

Stream instances are switched into object mode using the `objectMode` option
when the stream is created. Attempting to switch an existing stream into
Expand Down Expand Up @@ -352,12 +352,17 @@ See also: [`writable.uncork()`][].
##### writable.end([chunk][, encoding][, callback])
<!-- YAML
added: v0.9.4
changes:
- version: REPLACEME
pr-url: https://github.com/nodejs/node/pull/11608
description: The `chunk` argument can now be a `Uint8Array` instance.
-->

* `chunk` {string|Buffer|any} Optional data to write. For streams not operating
in object mode, `chunk` must be a string or a `Buffer`. For object mode
streams, `chunk` may be any JavaScript value other than `null`.
* `encoding` {string} The encoding, if `chunk` is a String
* `chunk` {string|Buffer|Uint8Array|any} Optional data to write. For streams
not operating in object mode, `chunk` must be a string, `Buffer` or
`Uint8Array`. For object mode streams, `chunk` may be any JavaScript value
other than `null`.
* `encoding` {string} The encoding, if `chunk` is a string
* `callback` {Function} Optional callback for when the stream is finished

Calling the `writable.end()` method signals that no more data will be written
Expand Down Expand Up @@ -434,14 +439,20 @@ See also: [`writable.cork()`][].
<!-- YAML
added: v0.9.4
changes:
- version: REPLACEME
pr-url: https://github.com/nodejs/node/pull/11608
description: The `chunk` argument can now be a `Uint8Array` instance.
- version: v6.0.0
pr-url: https://github.com/nodejs/node/pull/6170
description: Passing `null` as the `chunk` parameter will always be
considered invalid now, even in object mode.
-->

* `chunk` {string|Buffer} The data to write
* `encoding` {string} The encoding, if `chunk` is a String
* `chunk` {string|Buffer|Uint8Array|any} Optional data to write. For streams
not operating in object mode, `chunk` must be a string, `Buffer` or
`Uint8Array`. For object mode streams, `chunk` may be any JavaScript value
other than `null`.
* `encoding` {string} The encoding, if `chunk` is a string
* `callback` {Function} Callback for when this chunk of data is flushed
* Returns: {boolean} `false` if the stream wishes for the calling code to
wait for the `'drain'` event to be emitted before continuing to write
Expand Down Expand Up @@ -985,9 +996,16 @@ setTimeout(() => {
##### readable.unshift(chunk)
<!-- YAML
added: v0.9.11
changes:
- version: REPLACEME
pr-url: https://github.com/nodejs/node/pull/11608
description: The `chunk` argument can now be a `Uint8Array` instance.
-->

* `chunk` {Buffer|string|any} Chunk of data to unshift onto the read queue
* `chunk` {Buffer|Uint8Array|string|any} Chunk of data to unshift onto the
read queue. For streams not operating in object mode, `chunk` must be a
string, `Buffer` or `Uint8Array`. For object mode streams, `chunk` may be
any JavaScript value other than `null`.

The `readable.unshift()` method pushes a chunk of data back into the internal
buffer. This is useful in certain situations where a stream is being consumed by
Expand Down Expand Up @@ -1274,8 +1292,9 @@ constructor and implement the `writable._write()` method. The
Defaults to `true`
* `objectMode` {boolean} Whether or not the
[`stream.write(anyObj)`][stream-write] is a valid operation. When set,
it becomes possible to write JavaScript values other than string or
`Buffer` if supported by the stream implementation. Defaults to `false`
it becomes possible to write JavaScript values other than string,
`Buffer` or `Uint8Array` if supported by the stream implementation.
Defaults to `false`
* `write` {Function} Implementation for the
[`stream._write()`][stream-_write] method.
* `writev` {Function} Implementation for the
Expand Down Expand Up @@ -1564,16 +1583,26 @@ internal to the class that defines it, and should never be called directly by
user programs.

#### readable.push(chunk[, encoding])
<!-- YAML
changes:
- version: REPLACEME
pr-url: https://github.com/nodejs/node/pull/11608
description: The `chunk` argument can now be a `Uint8Array` instance.
-->

* `chunk` {Buffer|null|string|any} Chunk of data to push into the read queue
* `encoding` {string} Encoding of String chunks. Must be a valid
* `chunk` {Buffer|Uint8Array|string|null|any} Chunk of data to push into the
read queue. For streams not operating in object mode, `chunk` must be a
string, `Buffer` or `Uint8Array`. For object mode streams, `chunk` may be
any JavaScript value.
* `encoding` {string} Encoding of string chunks. Must be a valid
Buffer encoding, such as `'utf8'` or `'ascii'`
* Returns {boolean} `true` if additional chunks of data may continued to be
pushed; `false` otherwise.

When `chunk` is not `null`, the `chunk` of data will be added to the
internal queue for users of the stream to consume. Passing `chunk` as `null`
signals the end of the stream (EOF), after which no more data can be written.
When `chunk` is a `Buffer`, `Uint8Array` or `string`, the `chunk` of data will
be added to the internal queue for users of the stream to consume.
Passing `chunk` as `null` signals the end of the stream (EOF), after which no
more data can be written.

When the Readable is operating in paused mode, the data added with
`readable.push()` can be read out by calling the
Expand Down Expand Up @@ -2088,8 +2117,8 @@ Readable stream class internals.

Use of `readable.push('')` is not recommended.

Pushing a zero-byte string or `Buffer` to a stream that is not in object mode
has an interesting side effect. Because it *is* a call to
Pushing a zero-byte string, `Buffer` or `Uint8Array` to a stream that is not in
object mode has an interesting side effect. Because it *is* a call to
[`readable.push()`][stream-push], the call will end the reading process.
However, because the argument is an empty string, no data is added to the
readable buffer so there is nothing for a user to consume.
Expand Down
8 changes: 7 additions & 1 deletion lib/_stream_readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,12 @@ function readableAddChunk(stream, chunk, encoding, addToFront, skipChunkCheck) {
if (er) {
stream.emit('error', er);
} else if (state.objectMode || chunk && chunk.length > 0) {
if (typeof chunk !== 'string' &&
Object.getPrototypeOf(chunk) !== Buffer.prototype &&
!state.objectMode) {
chunk = Stream._uint8ArrayToBuffer(chunk);
}

if (addToFront) {
if (state.endEmitted)
stream.emit('error', new Error('stream.unshift() after end event'));
Expand Down Expand Up @@ -259,7 +265,7 @@ function addChunk(stream, state, chunk, addToFront) {

function chunkInvalid(state, chunk) {
var er;
if (!(chunk instanceof Buffer) &&
if (!Stream._isUint8Array(chunk) &&
typeof chunk !== 'string' &&
chunk !== undefined &&
!state.objectMode) {
Expand Down
6 changes: 5 additions & 1 deletion lib/_stream_writable.js
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,11 @@ function validChunk(stream, state, chunk, cb) {
Writable.prototype.write = function(chunk, encoding, cb) {
var state = this._writableState;
var ret = false;
var isBuf = (chunk instanceof Buffer);
var isBuf = Stream._isUint8Array(chunk) && !state.objectMode;

if (isBuf && Object.getPrototypeOf(chunk) !== Buffer.prototype) {
chunk = Stream._uint8ArrayToBuffer(chunk);
}

if (typeof encoding === 'function') {
cb = encoding;
Expand Down
6 changes: 5 additions & 1 deletion lib/internal/streams/BufferList.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

const Buffer = require('buffer').Buffer;

function copyBuffer(src, target, offset) {
Buffer.prototype.copy.call(src, target, offset);
}

module.exports = class BufferList {
constructor() {
this.head = null;
Expand Down Expand Up @@ -63,7 +67,7 @@ module.exports = class BufferList {
var p = this.head;
var i = 0;
while (p) {
p.data.copy(ret, i);
copyBuffer(p.data, ret, i);
i += p.data.length;
p = p.next;
}
Expand Down
37 changes: 37 additions & 0 deletions lib/stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@

'use strict';

const Buffer = require('buffer').Buffer;

// Note: export Stream before Readable/Writable/Duplex/...
// to avoid a cross-reference(require) issues
const Stream = module.exports = require('internal/streams/legacy');
Expand All @@ -33,3 +35,38 @@ Stream.PassThrough = require('_stream_passthrough');

// Backwards-compat with node 0.4.x
Stream.Stream = Stream;

// Internal utilities
try {
Stream._isUint8Array = process.binding('util').isUint8Array;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe move this into internal/util?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@joyeecheung The thing is that the streams modules can’t really unconditionally rely on internal modules because they are also exported as https://github.com/nodejs/readable-stream. @mscdex expressed a preference for having this code polyfilled on the readable-stream side and I agree, but ultimately that’s @nodejs/streams’ call …

(Please let me know if I’m misunderstanding you!)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh yes, forgot about the vendoring out part :) Nevermind then!

} catch (e) {
// This throws for Node < 4.2.0 because there’s no util binding and
// returns undefined for Node < 7.4.0.
}

if (!Stream._isUint8Array) {
Stream._isUint8Array = function _isUint8Array(obj) {
return Object.prototype.toString.call(obj) === '[object Uint8Array]';
};
}

const version = process.version.substr(1).split('.');
if (version[0] === 0 && version[1] < 12) {
Stream._uint8ArrayToBuffer = Buffer;
} else {
try {
const internalBuffer = require('internal/buffer');
Stream._uint8ArrayToBuffer = function _uint8ArrayToBuffer(chunk) {
return new internalBuffer.FastBuffer(chunk.buffer,
chunk.byteOffset,
chunk.byteLength);
};
} catch (e) {
}

if (!Stream._uint8ArrayToBuffer) {
Stream._uint8ArrayToBuffer = function _uint8ArrayToBuffer(chunk) {
return Buffer.prototype.slice.call(chunk);
};
}
}
102 changes: 102 additions & 0 deletions test/parallel/test-stream-uint8array.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
'use strict';
const common = require('../common');
const assert = require('assert');
const Buffer = require('buffer').Buffer;

const { Readable, Writable } = require('stream');

const ABC = new Uint8Array([0x41, 0x42, 0x43]);
const DEF = new Uint8Array([0x44, 0x45, 0x46]);
const GHI = new Uint8Array([0x47, 0x48, 0x49]);

{
// Simple Writable test.

let n = 0;
const writable = new Writable({
write: common.mustCall((chunk, encoding, cb) => {
assert(chunk instanceof Buffer);
if (n++ === 0) {
assert.strictEqual(String(chunk), 'ABC');
} else {
assert.strictEqual(String(chunk), 'DEF');
}

cb();
}, 2)
});

writable.write(ABC);
writable.end(DEF);
}

{
// Writable test, pass in Uint8Array in object mode.

const writable = new Writable({
objectMode: true,
write: common.mustCall((chunk, encoding, cb) => {
assert(!(chunk instanceof Buffer));
assert(chunk instanceof Uint8Array);
assert.strictEqual(chunk, ABC);
assert.strictEqual(encoding, 'utf8');
cb();
})
});

writable.end(ABC);
}

{
// Writable test, multiple writes carried out via writev.
let callback;

const writable = new Writable({
write: common.mustCall((chunk, encoding, cb) => {
assert(chunk instanceof Buffer);
assert.strictEqual(encoding, 'buffer');
assert.strictEqual(String(chunk), 'ABC');
callback = cb;
}),
writev: common.mustCall((chunks, cb) => {
assert.strictEqual(chunks.length, 2);
assert.strictEqual(chunks[0].encoding, 'buffer');
assert.strictEqual(chunks[1].encoding, 'buffer');
assert.strictEqual(chunks[0].chunk + chunks[1].chunk, 'DEFGHI');
})
});

writable.write(ABC);
writable.write(DEF);
writable.end(GHI);
callback();
}

{
// Simple Readable test.
const readable = new Readable({
read() {}
});

readable.push(DEF);
readable.unshift(ABC);

const buf = readable.read();
assert(buf instanceof Buffer);
assert.deepStrictEqual([...buf], [...ABC, ...DEF]);
}

{
// Readable test, setEncoding.
const readable = new Readable({
read() {}
});

readable.setEncoding('utf8');

readable.push(DEF);
readable.unshift(ABC);

const out = readable.read();
assert.strictEqual(out, 'ABCDEF');
}