From 43c3b43ea3f0e535bdca0680ae246900aa5349da Mon Sep 17 00:00:00 2001 From: wwwzbwcom Date: Fri, 5 Mar 2021 19:55:00 +0800 Subject: [PATCH] stream: make Readable.from performance better PR-URL: https://github.com/nodejs/node/pull/37609 Reviewed-By: Robert Nagy Reviewed-By: Benjamin Gruenbaum Reviewed-By: Matteo Collina --- benchmark/streams/readable-from.js | 26 +++++++++++++++ lib/internal/streams/from.js | 51 +++++++++++++++++++----------- 2 files changed, 58 insertions(+), 19 deletions(-) create mode 100644 benchmark/streams/readable-from.js diff --git a/benchmark/streams/readable-from.js b/benchmark/streams/readable-from.js new file mode 100644 index 00000000000000..2dcf10ffef2732 --- /dev/null +++ b/benchmark/streams/readable-from.js @@ -0,0 +1,26 @@ +'use strict'; + +const common = require('../common'); +const Readable = require('stream').Readable; + +const bench = common.createBenchmark(main, { + n: [1e7], +}); + +async function main({ n }) { + const arr = []; + for (let i = 0; i < n; i++) { + arr.push(`${i}`); + } + + const s = new Readable.from(arr); + + bench.start(); + s.on('data', (data) => { + // eslint-disable-next-line no-unused-expressions + data; + }); + s.on('close', () => { + bench.end(n); + }); +} diff --git a/lib/internal/streams/from.js b/lib/internal/streams/from.js index 6798411fa4b038..906ddcf9af844b 100644 --- a/lib/internal/streams/from.js +++ b/lib/internal/streams/from.js @@ -3,7 +3,7 @@ const { PromisePrototypeThen, SymbolAsyncIterator, - SymbolIterator + SymbolIterator, } = primordials; const { Buffer } = require('buffer'); @@ -25,18 +25,22 @@ function from(Readable, iterable, opts) { }); } - if (iterable && iterable[SymbolAsyncIterator]) + let isAsync = false; + if (iterable && iterable[SymbolAsyncIterator]) { + isAsync = true; iterator = iterable[SymbolAsyncIterator](); - else if (iterable && iterable[SymbolIterator]) + } else if (iterable && iterable[SymbolIterator]) { + isAsync = false; iterator = iterable[SymbolIterator](); - else + } else { throw new ERR_INVALID_ARG_TYPE('iterable', ['Iterable'], iterable); + } const readable = new Readable({ objectMode: true, highWaterMark: 1, // TODO(ronag): What options should be allowed? - ...opts + ...opts, }); // Flag to protect against _read @@ -75,23 +79,32 @@ function from(Readable, iterable, opts) { } async function next() { - try { - const { value, done } = await iterator.next(); - if (done) { - readable.push(null); - } else { - const res = await value; - if (res === null) { - reading = false; - throw new ERR_STREAM_NULL_VALUES(); - } else if (readable.push(res)) { - next(); + for (;;) { + try { + const { value, done } = isAsync ? + await iterator.next() : + iterator.next(); + + if (done) { + readable.push(null); } else { - reading = false; + const res = (value && + typeof value.then === 'function') ? + await value : + value; + if (res === null) { + reading = false; + throw new ERR_STREAM_NULL_VALUES(); + } else if (readable.push(res)) { + continue; + } else { + reading = false; + } } + } catch (err) { + readable.destroy(err); } - } catch (err) { - readable.destroy(err); + break; } } return readable;