diff --git a/doc/api/stream.md b/doc/api/stream.md index d8c27ea1c4c816..934a3bf047edb9 100644 --- a/doc/api/stream.md +++ b/doc/api/stream.md @@ -47,10 +47,10 @@ There are four fundamental stream types within Node.js: ### Object Mode All streams created by Node.js APIs operate exclusively on strings and `Buffer` -objects. It is possible, however, for stream implementations to work with other -types of JavaScript values (with the exception of `null`, which serves a special -purpose within streams). Such streams are considered to operate in "object -mode". +(or `Uint8Array`) objects. It is possible, however, for stream implementations +to work with other types of JavaScript values (with the exception of `null`, +which serves a special purpose within streams). Such streams are considered to +operate in "object mode". Stream instances are switched into object mode using the `objectMode` option when the stream is created. Attempting to switch an existing stream into @@ -352,12 +352,17 @@ See also: [`writable.uncork()`][]. ##### writable.end([chunk][, encoding][, callback]) -* `chunk` {string|Buffer|any} Optional data to write. For streams not operating - in object mode, `chunk` must be a string or a `Buffer`. For object mode - streams, `chunk` may be any JavaScript value other than `null`. -* `encoding` {string} The encoding, if `chunk` is a String +* `chunk` {string|Buffer|Uint8Array|any} Optional data to write. For streams + not operating in object mode, `chunk` must be a string, `Buffer` or + `Uint8Array`. For object mode streams, `chunk` may be any JavaScript value + other than `null`. +* `encoding` {string} The encoding, if `chunk` is a string * `callback` {Function} Optional callback for when the stream is finished Calling the `writable.end()` method signals that no more data will be written @@ -434,14 +439,20 @@ See also: [`writable.cork()`][]. -* `chunk` {string|Buffer} The data to write -* `encoding` {string} The encoding, if `chunk` is a String +* `chunk` {string|Buffer|Uint8Array|any} Optional data to write. For streams + not operating in object mode, `chunk` must be a string, `Buffer` or + `Uint8Array`. For object mode streams, `chunk` may be any JavaScript value + other than `null`. +* `encoding` {string} The encoding, if `chunk` is a string * `callback` {Function} Callback for when this chunk of data is flushed * Returns: {boolean} `false` if the stream wishes for the calling code to wait for the `'drain'` event to be emitted before continuing to write @@ -985,9 +996,16 @@ setTimeout(() => { ##### readable.unshift(chunk) -* `chunk` {Buffer|string|any} Chunk of data to unshift onto the read queue +* `chunk` {Buffer|Uint8Array|string|any} Chunk of data to unshift onto the + read queue. For streams not operating in object mode, `chunk` must be a + string, `Buffer` or `Uint8Array`. For object mode streams, `chunk` may be + any JavaScript value other than `null`. The `readable.unshift()` method pushes a chunk of data back into the internal buffer. This is useful in certain situations where a stream is being consumed by @@ -1274,8 +1292,9 @@ constructor and implement the `writable._write()` method. The Defaults to `true` * `objectMode` {boolean} Whether or not the [`stream.write(anyObj)`][stream-write] is a valid operation. When set, - it becomes possible to write JavaScript values other than string or - `Buffer` if supported by the stream implementation. Defaults to `false` + it becomes possible to write JavaScript values other than string, + `Buffer` or `Uint8Array` if supported by the stream implementation. + Defaults to `false` * `write` {Function} Implementation for the [`stream._write()`][stream-_write] method. * `writev` {Function} Implementation for the @@ -1564,16 +1583,26 @@ internal to the class that defines it, and should never be called directly by user programs. #### readable.push(chunk[, encoding]) + -* `chunk` {Buffer|null|string|any} Chunk of data to push into the read queue -* `encoding` {string} Encoding of String chunks. Must be a valid +* `chunk` {Buffer|Uint8Array|string|null|any} Chunk of data to push into the + read queue. For streams not operating in object mode, `chunk` must be a + string, `Buffer` or `Uint8Array`. For object mode streams, `chunk` may be + any JavaScript value. +* `encoding` {string} Encoding of string chunks. Must be a valid Buffer encoding, such as `'utf8'` or `'ascii'` * Returns {boolean} `true` if additional chunks of data may continued to be pushed; `false` otherwise. -When `chunk` is not `null`, the `chunk` of data will be added to the -internal queue for users of the stream to consume. Passing `chunk` as `null` -signals the end of the stream (EOF), after which no more data can be written. +When `chunk` is a `Buffer`, `Uint8Array` or `string`, the `chunk` of data will +be added to the internal queue for users of the stream to consume. +Passing `chunk` as `null` signals the end of the stream (EOF), after which no +more data can be written. When the Readable is operating in paused mode, the data added with `readable.push()` can be read out by calling the @@ -2088,8 +2117,8 @@ Readable stream class internals. Use of `readable.push('')` is not recommended. -Pushing a zero-byte string or `Buffer` to a stream that is not in object mode -has an interesting side effect. Because it *is* a call to +Pushing a zero-byte string, `Buffer` or `Uint8Array` to a stream that is not in +object mode has an interesting side effect. Because it *is* a call to [`readable.push()`][stream-push], the call will end the reading process. However, because the argument is an empty string, no data is added to the readable buffer so there is nothing for a user to consume. diff --git a/lib/_stream_readable.js b/lib/_stream_readable.js index 9666c7d6fb2a18..4ba224cd5a90b0 100644 --- a/lib/_stream_readable.js +++ b/lib/_stream_readable.js @@ -212,6 +212,12 @@ function readableAddChunk(stream, chunk, encoding, addToFront, skipChunkCheck) { if (er) { stream.emit('error', er); } else if (state.objectMode || chunk && chunk.length > 0) { + if (typeof chunk !== 'string' && + Object.getPrototypeOf(chunk) !== Buffer.prototype && + !state.objectMode) { + chunk = Stream._uint8ArrayToBuffer(chunk); + } + if (addToFront) { if (state.endEmitted) stream.emit('error', new Error('stream.unshift() after end event')); @@ -259,7 +265,7 @@ function addChunk(stream, state, chunk, addToFront) { function chunkInvalid(state, chunk) { var er; - if (!(chunk instanceof Buffer) && + if (!Stream._isUint8Array(chunk) && typeof chunk !== 'string' && chunk !== undefined && !state.objectMode) { diff --git a/lib/_stream_writable.js b/lib/_stream_writable.js index 768d80a96e065c..615f5a37624f31 100644 --- a/lib/_stream_writable.js +++ b/lib/_stream_writable.js @@ -248,7 +248,11 @@ function validChunk(stream, state, chunk, cb) { Writable.prototype.write = function(chunk, encoding, cb) { var state = this._writableState; var ret = false; - var isBuf = (chunk instanceof Buffer); + var isBuf = Stream._isUint8Array(chunk) && !state.objectMode; + + if (isBuf && Object.getPrototypeOf(chunk) !== Buffer.prototype) { + chunk = Stream._uint8ArrayToBuffer(chunk); + } if (typeof encoding === 'function') { cb = encoding; diff --git a/lib/internal/streams/BufferList.js b/lib/internal/streams/BufferList.js index b9bdfa7d0ea0ea..6e724e9fb85695 100644 --- a/lib/internal/streams/BufferList.js +++ b/lib/internal/streams/BufferList.js @@ -2,6 +2,10 @@ const Buffer = require('buffer').Buffer; +function copyBuffer(src, target, offset) { + Buffer.prototype.copy.call(src, target, offset); +} + module.exports = class BufferList { constructor() { this.head = null; @@ -63,7 +67,7 @@ module.exports = class BufferList { var p = this.head; var i = 0; while (p) { - p.data.copy(ret, i); + copyBuffer(p.data, ret, i); i += p.data.length; p = p.next; } diff --git a/lib/stream.js b/lib/stream.js index dca4f50fc09eac..9c1ba986d01e5a 100644 --- a/lib/stream.js +++ b/lib/stream.js @@ -21,6 +21,8 @@ 'use strict'; +const Buffer = require('buffer').Buffer; + // Note: export Stream before Readable/Writable/Duplex/... // to avoid a cross-reference(require) issues const Stream = module.exports = require('internal/streams/legacy'); @@ -33,3 +35,38 @@ Stream.PassThrough = require('_stream_passthrough'); // Backwards-compat with node 0.4.x Stream.Stream = Stream; + +// Internal utilities +try { + Stream._isUint8Array = process.binding('util').isUint8Array; +} catch (e) { + // This throws for Node < 4.2.0 because there’s no util binding and + // returns undefined for Node < 7.4.0. +} + +if (!Stream._isUint8Array) { + Stream._isUint8Array = function _isUint8Array(obj) { + return Object.prototype.toString.call(obj) === '[object Uint8Array]'; + }; +} + +const version = process.version.substr(1).split('.'); +if (version[0] === 0 && version[1] < 12) { + Stream._uint8ArrayToBuffer = Buffer; +} else { + try { + const internalBuffer = require('internal/buffer'); + Stream._uint8ArrayToBuffer = function _uint8ArrayToBuffer(chunk) { + return new internalBuffer.FastBuffer(chunk.buffer, + chunk.byteOffset, + chunk.byteLength); + }; + } catch (e) { + } + + if (!Stream._uint8ArrayToBuffer) { + Stream._uint8ArrayToBuffer = function _uint8ArrayToBuffer(chunk) { + return Buffer.prototype.slice.call(chunk); + }; + } +} diff --git a/test/parallel/test-stream-uint8array.js b/test/parallel/test-stream-uint8array.js new file mode 100644 index 00000000000000..1b02c55d97fdc3 --- /dev/null +++ b/test/parallel/test-stream-uint8array.js @@ -0,0 +1,102 @@ +'use strict'; +const common = require('../common'); +const assert = require('assert'); +const Buffer = require('buffer').Buffer; + +const { Readable, Writable } = require('stream'); + +const ABC = new Uint8Array([0x41, 0x42, 0x43]); +const DEF = new Uint8Array([0x44, 0x45, 0x46]); +const GHI = new Uint8Array([0x47, 0x48, 0x49]); + +{ + // Simple Writable test. + + let n = 0; + const writable = new Writable({ + write: common.mustCall((chunk, encoding, cb) => { + assert(chunk instanceof Buffer); + if (n++ === 0) { + assert.strictEqual(String(chunk), 'ABC'); + } else { + assert.strictEqual(String(chunk), 'DEF'); + } + + cb(); + }, 2) + }); + + writable.write(ABC); + writable.end(DEF); +} + +{ + // Writable test, pass in Uint8Array in object mode. + + const writable = new Writable({ + objectMode: true, + write: common.mustCall((chunk, encoding, cb) => { + assert(!(chunk instanceof Buffer)); + assert(chunk instanceof Uint8Array); + assert.strictEqual(chunk, ABC); + assert.strictEqual(encoding, 'utf8'); + cb(); + }) + }); + + writable.end(ABC); +} + +{ + // Writable test, multiple writes carried out via writev. + let callback; + + const writable = new Writable({ + write: common.mustCall((chunk, encoding, cb) => { + assert(chunk instanceof Buffer); + assert.strictEqual(encoding, 'buffer'); + assert.strictEqual(String(chunk), 'ABC'); + callback = cb; + }), + writev: common.mustCall((chunks, cb) => { + assert.strictEqual(chunks.length, 2); + assert.strictEqual(chunks[0].encoding, 'buffer'); + assert.strictEqual(chunks[1].encoding, 'buffer'); + assert.strictEqual(chunks[0].chunk + chunks[1].chunk, 'DEFGHI'); + }) + }); + + writable.write(ABC); + writable.write(DEF); + writable.end(GHI); + callback(); +} + +{ + // Simple Readable test. + const readable = new Readable({ + read() {} + }); + + readable.push(DEF); + readable.unshift(ABC); + + const buf = readable.read(); + assert(buf instanceof Buffer); + assert.deepStrictEqual([...buf], [...ABC, ...DEF]); +} + +{ + // Readable test, setEncoding. + const readable = new Readable({ + read() {} + }); + + readable.setEncoding('utf8'); + + readable.push(DEF); + readable.unshift(ABC); + + const out = readable.read(); + assert.strictEqual(out, 'ABCDEF'); +}