diff --git a/lib/internal/bootstrap/node.js b/lib/internal/bootstrap/node.js index 4de951ca549c73..81ec1fc12b80f4 100644 --- a/lib/internal/bootstrap/node.js +++ b/lib/internal/bootstrap/node.js @@ -140,9 +140,13 @@ function startup() { } if (isMainThread) { - mainThreadSetup.setupStdio(); + const { getStdout, getStdin, getStderr } = + NativeModule.require('internal/process/stdio').getMainThreadStdio(); + setupProcessStdio(getStdout, getStdin, getStderr); } else { - workerThreadSetup.setupStdio(); + const { getStdout, getStdin, getStderr } = + workerThreadSetup.initializeWorkerStdio(); + setupProcessStdio(getStdout, getStdin, getStderr); } if (global.__coverage__) @@ -312,8 +316,14 @@ function startup() { function startExecution() { // This means we are in a Worker context, and any script execution // will be directed by the worker module. - if (internalBinding('worker').getEnvMessagePort() !== undefined) { - NativeModule.require('internal/worker').setupChild(); + if (!isMainThread) { + const workerThreadSetup = NativeModule.require( + 'internal/process/worker_thread_only' + ); + // Set up the message port and start listening + const { workerFatalExeception } = workerThreadSetup.setup(); + // Overwrite fatalException + process._fatalException = workerFatalExeception; return; } @@ -505,6 +515,31 @@ function setupProcessObject() { EventEmitter.call(process); } +function setupProcessStdio(getStdout, getStdin, getStderr) { + Object.defineProperty(process, 'stdout', { + configurable: true, + enumerable: true, + get: getStdout + }); + + Object.defineProperty(process, 'stderr', { + configurable: true, + enumerable: true, + get: getStderr + }); + + Object.defineProperty(process, 'stdin', { + configurable: true, + enumerable: true, + get: getStdin + }); + + process.openStdin = function() { + process.stdin.resume(); + return process.stdin; + }; +} + function setupGlobalVariables() { Object.defineProperty(global, Symbol.toStringTag, { value: 'global', diff --git a/lib/internal/process/main_thread_only.js b/lib/internal/process/main_thread_only.js index 862194ae46e27e..42579e9da8acd1 100644 --- a/lib/internal/process/main_thread_only.js +++ b/lib/internal/process/main_thread_only.js @@ -16,15 +16,6 @@ const { validateString } = require('internal/validators'); -const { - setupProcessStdio, - getMainThreadStdio -} = require('internal/process/stdio'); - -function setupStdio() { - setupProcessStdio(getMainThreadStdio()); -} - // The execution of this function itself should not cause any side effects. function wrapProcessMethods(binding) { function chdir(directory) { @@ -174,7 +165,6 @@ function setupChildProcessIpcChannel() { } module.exports = { - setupStdio, wrapProcessMethods, setupSignalHandlers, setupChildProcessIpcChannel, diff --git a/lib/internal/process/stdio.js b/lib/internal/process/stdio.js index 5e9ff6b26097a6..bf5f6df15f123c 100644 --- a/lib/internal/process/stdio.js +++ b/lib/internal/process/stdio.js @@ -1,6 +1,5 @@ 'use strict'; -exports.setupProcessStdio = setupProcessStdio; exports.getMainThreadStdio = getMainThreadStdio; function dummyDestroy(err, cb) { cb(err); } @@ -134,31 +133,6 @@ function getMainThreadStdio() { }; } -function setupProcessStdio({ getStdout, getStdin, getStderr }) { - Object.defineProperty(process, 'stdout', { - configurable: true, - enumerable: true, - get: getStdout - }); - - Object.defineProperty(process, 'stderr', { - configurable: true, - enumerable: true, - get: getStderr - }); - - Object.defineProperty(process, 'stdin', { - configurable: true, - enumerable: true, - get: getStdin - }); - - process.openStdin = function() { - process.stdin.resume(); - return process.stdin; - }; -} - function createWritableStdioStream(fd) { var stream; const tty_wrap = internalBinding('tty_wrap'); diff --git a/lib/internal/process/worker_thread_only.js b/lib/internal/process/worker_thread_only.js index 834ba6078fca44..a9332fb4277363 100644 --- a/lib/internal/process/worker_thread_only.js +++ b/lib/internal/process/worker_thread_only.js @@ -2,23 +2,54 @@ // This file contains process bootstrappers that can only be // run in the worker thread. +const { + getEnvMessagePort, + threadId +} = internalBinding('worker'); + +const debug = require('util').debuglog('worker'); const { - setupProcessStdio -} = require('internal/process/stdio'); + kWaitingStreams, + ReadableWorkerStdio, + WritableWorkerStdio +} = require('internal/worker/io'); const { - workerStdio + createMessageHandler, + createWorkerFatalExeception } = require('internal/worker'); -function setupStdio() { - setupProcessStdio({ - getStdout: () => workerStdio.stdout, - getStderr: () => workerStdio.stderr, - getStdin: () => workerStdio.stdin - }); +const workerStdio = {}; + +function initializeWorkerStdio() { + const port = getEnvMessagePort(); + port[kWaitingStreams] = 0; + workerStdio.stdin = new ReadableWorkerStdio(port, 'stdin'); + workerStdio.stdout = new WritableWorkerStdio(port, 'stdout'); + workerStdio.stderr = new WritableWorkerStdio(port, 'stderr'); + + return { + getStdout() { return workerStdio.stdout; }, + getStderr() { return workerStdio.stderr; }, + getStdin() { return workerStdio.stdin; } + }; +} + +function setup() { + debug(`[${threadId}] is setting up worker child environment`); + + const port = getEnvMessagePort(); + const publicWorker = require('worker_threads'); + port.on('message', createMessageHandler(publicWorker, port, workerStdio)); + port.start(); + + return { + workerFatalExeception: createWorkerFatalExeception(port) + }; } module.exports = { - setupStdio + initializeWorkerStdio, + setup }; diff --git a/lib/internal/worker.js b/lib/internal/worker.js index c079afbeb1928c..c4393d0459cf7e 100644 --- a/lib/internal/worker.js +++ b/lib/internal/worker.js @@ -4,35 +4,37 @@ const EventEmitter = require('events'); const assert = require('assert'); const path = require('path'); const util = require('util'); -const { Readable, Writable } = require('stream'); const { ERR_WORKER_PATH, ERR_WORKER_UNSERIALIZABLE_ERROR, ERR_WORKER_UNSUPPORTED_EXTENSION, } = require('internal/errors').codes; const { validateString } = require('internal/validators'); +const { clearAsyncIdStack } = require('internal/async_hooks'); -const { MessagePort, MessageChannel } = internalBinding('messaging'); const { - handle_onclose: handleOnCloseSymbol, - oninit: onInitSymbol -} = internalBinding('symbols'); -const { clearAsyncIdStack } = require('internal/async_hooks'); + drainMessagePort, + MessageChannel, + messageTypes, + kPort, + kIncrementsPortRef, + kWaitingStreams, + kStdioWantsMoreDataCallback, + setupPortReferencing, + ReadableWorkerStdio, + WritableWorkerStdio, +} = require('internal/worker/io'); const { serializeError, deserializeError } = require('internal/error-serdes'); const { pathToFileURL } = require('url'); const { Worker: WorkerImpl, - getEnvMessagePort, threadId } = internalBinding('worker'); const isMainThread = threadId === 0; -const kOnMessageListener = Symbol('kOnMessageListener'); const kHandle = Symbol('kHandle'); -const kName = Symbol('kName'); -const kPort = Symbol('kPort'); const kPublicPort = Symbol('kPublicPort'); const kDispose = Symbol('kDispose'); const kOnExit = Symbol('kOnExit'); @@ -40,213 +42,9 @@ const kOnMessage = Symbol('kOnMessage'); const kOnCouldNotSerializeErr = Symbol('kOnCouldNotSerializeErr'); const kOnErrorMessage = Symbol('kOnErrorMessage'); const kParentSideStdio = Symbol('kParentSideStdio'); -const kWritableCallbacks = Symbol('kWritableCallbacks'); -const kStdioWantsMoreDataCallback = Symbol('kStdioWantsMoreDataCallback'); -const kStartedReading = Symbol('kStartedReading'); -const kWaitingStreams = Symbol('kWaitingStreams'); -const kIncrementsPortRef = Symbol('kIncrementsPortRef'); const debug = util.debuglog('worker'); -const messageTypes = { - UP_AND_RUNNING: 'upAndRunning', - COULD_NOT_SERIALIZE_ERROR: 'couldNotSerializeError', - ERROR_MESSAGE: 'errorMessage', - STDIO_PAYLOAD: 'stdioPayload', - STDIO_WANTS_MORE_DATA: 'stdioWantsMoreData', - LOAD_SCRIPT: 'loadScript' -}; - -// We have to mess with the MessagePort prototype a bit, so that a) we can make -// it inherit from EventEmitter, even though it is a C++ class, and b) we do -// not provide methods that are not present in the Browser and not documented -// on our side (e.g. hasRef). -// Save a copy of the original set of methods as a shallow clone. -const MessagePortPrototype = Object.create( - Object.getPrototypeOf(MessagePort.prototype), - Object.getOwnPropertyDescriptors(MessagePort.prototype)); -// Set up the new inheritance chain. -Object.setPrototypeOf(MessagePort, EventEmitter); -Object.setPrototypeOf(MessagePort.prototype, EventEmitter.prototype); -// Finally, purge methods we don't want to be public. -delete MessagePort.prototype.stop; -delete MessagePort.prototype.drain; -MessagePort.prototype.ref = MessagePortPrototype.ref; -MessagePort.prototype.unref = MessagePortPrototype.unref; - -// A communication channel consisting of a handle (that wraps around an -// uv_async_t) which can receive information from other threads and emits -// .onmessage events, and a function used for sending data to a MessagePort -// in some other thread. -MessagePort.prototype[kOnMessageListener] = function onmessage(payload) { - debug(`[${threadId}] received message`, payload); - // Emit the deserialized object to userland. - this.emit('message', payload); -}; - -// This is for compatibility with the Web's MessagePort API. It makes sense to -// provide it as an `EventEmitter` in Node.js, but if somebody overrides -// `onmessage`, we'll switch over to the Web API model. -Object.defineProperty(MessagePort.prototype, 'onmessage', { - enumerable: true, - configurable: true, - get() { - return this[kOnMessageListener]; - }, - set(value) { - this[kOnMessageListener] = value; - if (typeof value === 'function') { - this.ref(); - MessagePortPrototype.start.call(this); - } else { - this.unref(); - MessagePortPrototype.stop.call(this); - } - } -}); - -// This is called from inside the `MessagePort` constructor. -function oninit() { - setupPortReferencing(this, this, 'message'); -} - -Object.defineProperty(MessagePort.prototype, onInitSymbol, { - enumerable: true, - writable: false, - value: oninit -}); - -// This is called after the underlying `uv_async_t` has been closed. -function onclose() { - if (typeof this.onclose === 'function') { - // Not part of the Web standard yet, but there aren't many reasonable - // alternatives in a non-EventEmitter usage setting. - // Refs: https://github.com/whatwg/html/issues/1766 - this.onclose(); - } - this.emit('close'); -} - -Object.defineProperty(MessagePort.prototype, handleOnCloseSymbol, { - enumerable: false, - writable: false, - value: onclose -}); - -MessagePort.prototype.close = function(cb) { - if (typeof cb === 'function') - this.once('close', cb); - MessagePortPrototype.close.call(this); -}; - -Object.defineProperty(MessagePort.prototype, util.inspect.custom, { - enumerable: false, - writable: false, - value: function inspect() { // eslint-disable-line func-name-matching - let ref; - try { - // This may throw when `this` does not refer to a native object, - // e.g. when accessing the prototype directly. - ref = MessagePortPrototype.hasRef.call(this); - } catch { return this; } - return Object.assign(Object.create(MessagePort.prototype), - ref === undefined ? { - active: false, - } : { - active: true, - refed: ref - }, - this); - } -}); - -function setupPortReferencing(port, eventEmitter, eventName) { - // Keep track of whether there are any workerMessage listeners: - // If there are some, ref() the channel so it keeps the event loop alive. - // If there are none or all are removed, unref() the channel so the worker - // can shutdown gracefully. - port.unref(); - eventEmitter.on('newListener', (name) => { - if (name === eventName && eventEmitter.listenerCount(eventName) === 0) { - port.ref(); - MessagePortPrototype.start.call(port); - } - }); - eventEmitter.on('removeListener', (name) => { - if (name === eventName && eventEmitter.listenerCount(eventName) === 0) { - MessagePortPrototype.stop.call(port); - port.unref(); - } - }); -} - - -class ReadableWorkerStdio extends Readable { - constructor(port, name) { - super(); - this[kPort] = port; - this[kName] = name; - this[kIncrementsPortRef] = true; - this[kStartedReading] = false; - this.on('end', () => { - if (this[kIncrementsPortRef] && --this[kPort][kWaitingStreams] === 0) - this[kPort].unref(); - }); - } - - _read() { - if (!this[kStartedReading] && this[kIncrementsPortRef]) { - this[kStartedReading] = true; - if (this[kPort][kWaitingStreams]++ === 0) - this[kPort].ref(); - } - - this[kPort].postMessage({ - type: messageTypes.STDIO_WANTS_MORE_DATA, - stream: this[kName] - }); - } -} - -class WritableWorkerStdio extends Writable { - constructor(port, name) { - super({ decodeStrings: false }); - this[kPort] = port; - this[kName] = name; - this[kWritableCallbacks] = []; - } - - _write(chunk, encoding, cb) { - this[kPort].postMessage({ - type: messageTypes.STDIO_PAYLOAD, - stream: this[kName], - chunk, - encoding - }); - this[kWritableCallbacks].push(cb); - if (this[kPort][kWaitingStreams]++ === 0) - this[kPort].ref(); - } - - _final(cb) { - this[kPort].postMessage({ - type: messageTypes.STDIO_PAYLOAD, - stream: this[kName], - chunk: null - }); - cb(); - } - - [kStdioWantsMoreDataCallback]() { - const cbs = this[kWritableCallbacks]; - this[kWritableCallbacks] = []; - for (const cb of cbs) - cb(); - if ((this[kPort][kWaitingStreams] -= cbs.length) === 0) - this[kPort].unref(); - } -} - class Worker extends EventEmitter { constructor(filename, options = {}) { super(); @@ -314,8 +112,8 @@ class Worker extends EventEmitter { [kOnExit](code) { debug(`[${threadId}] hears end event for Worker ${this.threadId}`); - MessagePortPrototype.drain.call(this[kPublicPort]); - MessagePortPrototype.drain.call(this[kPort]); + drainMessagePort(this[kPublicPort]); + drainMessagePort(this[kPort]); this[kDispose](); this.emit('exit', code); this.removeAllListeners(); @@ -421,25 +219,8 @@ class Worker extends EventEmitter { } } -const workerStdio = {}; -if (!isMainThread) { - const port = getEnvMessagePort(); - port[kWaitingStreams] = 0; - workerStdio.stdin = new ReadableWorkerStdio(port, 'stdin'); - workerStdio.stdout = new WritableWorkerStdio(port, 'stdout'); - workerStdio.stderr = new WritableWorkerStdio(port, 'stderr'); -} - -let originalFatalException; - -function setupChild() { - // Called during bootstrap to set up worker script execution. - debug(`[${threadId}] is setting up worker child environment`); - const port = getEnvMessagePort(); - - const publicWorker = require('worker_threads'); - - port.on('message', (message) => { +function createMessageHandler(publicWorker, port, workerStdio) { + return function(message) { if (message.type === messageTypes.LOAD_SCRIPT) { const { filename, doEval, workerData, publicPort, hasStdin } = message; publicWorker.parentPort = publicPort; @@ -471,14 +252,15 @@ function setupChild() { } assert.fail(`Unknown worker message type ${message.type}`); - }); - - port.start(); + }; +} - originalFatalException = process._fatalException; - process._fatalException = fatalException; +function createWorkerFatalExeception(port) { + const { + fatalException: originalFatalException + } = require('internal/process/execution'); - function fatalException(error) { + return function(error) { debug(`[${threadId}] gets fatal exception`); let caught = false; try { @@ -505,7 +287,7 @@ function setupChild() { process.exit(); } - } + }; } function pipeWithoutWarning(source, dest) { @@ -521,11 +303,9 @@ function pipeWithoutWarning(source, dest) { } module.exports = { - MessagePort, - MessageChannel, + createMessageHandler, + createWorkerFatalExeception, threadId, Worker, - setupChild, - isMainThread, - workerStdio + isMainThread }; diff --git a/lib/internal/worker/io.js b/lib/internal/worker/io.js new file mode 100644 index 00000000000000..d249ba8508b7af --- /dev/null +++ b/lib/internal/worker/io.js @@ -0,0 +1,245 @@ +'use strict'; + +const { + handle_onclose: handleOnCloseSymbol, + oninit: onInitSymbol +} = internalBinding('symbols'); +const { + MessagePort, + MessageChannel +} = internalBinding('messaging'); +const { threadId } = internalBinding('worker'); + +const { Readable, Writable } = require('stream'); +const EventEmitter = require('events'); +const util = require('util'); +const debug = util.debuglog('worker'); + +const kIncrementsPortRef = Symbol('kIncrementsPortRef'); +const kName = Symbol('kName'); +const kOnMessageListener = Symbol('kOnMessageListener'); +const kPort = Symbol('kPort'); +const kWaitingStreams = Symbol('kWaitingStreams'); +const kWritableCallbacks = Symbol('kWritableCallbacks'); +const kStartedReading = Symbol('kStartedReading'); +const kStdioWantsMoreDataCallback = Symbol('kStdioWantsMoreDataCallback'); + +const messageTypes = { + UP_AND_RUNNING: 'upAndRunning', + COULD_NOT_SERIALIZE_ERROR: 'couldNotSerializeError', + ERROR_MESSAGE: 'errorMessage', + STDIO_PAYLOAD: 'stdioPayload', + STDIO_WANTS_MORE_DATA: 'stdioWantsMoreData', + LOAD_SCRIPT: 'loadScript' +}; + +// Original drain from C++ +const originalDrain = MessagePort.prototype.drain; + +function drainMessagePort(port) { + return originalDrain.call(port); +} + +// We have to mess with the MessagePort prototype a bit, so that a) we can make +// it inherit from EventEmitter, even though it is a C++ class, and b) we do +// not provide methods that are not present in the Browser and not documented +// on our side (e.g. hasRef). +// Save a copy of the original set of methods as a shallow clone. +const MessagePortPrototype = Object.create( + Object.getPrototypeOf(MessagePort.prototype), + Object.getOwnPropertyDescriptors(MessagePort.prototype)); +// Set up the new inheritance chain. +Object.setPrototypeOf(MessagePort, EventEmitter); +Object.setPrototypeOf(MessagePort.prototype, EventEmitter.prototype); +// Finally, purge methods we don't want to be public. +delete MessagePort.prototype.stop; +delete MessagePort.prototype.drain; +MessagePort.prototype.ref = MessagePortPrototype.ref; +MessagePort.prototype.unref = MessagePortPrototype.unref; + +// A communication channel consisting of a handle (that wraps around an +// uv_async_t) which can receive information from other threads and emits +// .onmessage events, and a function used for sending data to a MessagePort +// in some other thread. +MessagePort.prototype[kOnMessageListener] = function onmessage(payload) { + debug(`[${threadId}] received message`, payload); + // Emit the deserialized object to userland. + this.emit('message', payload); +}; + +// This is for compatibility with the Web's MessagePort API. It makes sense to +// provide it as an `EventEmitter` in Node.js, but if somebody overrides +// `onmessage`, we'll switch over to the Web API model. +Object.defineProperty(MessagePort.prototype, 'onmessage', { + enumerable: true, + configurable: true, + get() { + return this[kOnMessageListener]; + }, + set(value) { + this[kOnMessageListener] = value; + if (typeof value === 'function') { + this.ref(); + MessagePortPrototype.start.call(this); + } else { + this.unref(); + MessagePortPrototype.stop.call(this); + } + } +}); + +// This is called from inside the `MessagePort` constructor. +function oninit() { + setupPortReferencing(this, this, 'message'); +} + +Object.defineProperty(MessagePort.prototype, onInitSymbol, { + enumerable: true, + writable: false, + value: oninit +}); + +// This is called after the underlying `uv_async_t` has been closed. +function onclose() { + if (typeof this.onclose === 'function') { + // Not part of the Web standard yet, but there aren't many reasonable + // alternatives in a non-EventEmitter usage setting. + // Refs: https://github.com/whatwg/html/issues/1766 + this.onclose(); + } + this.emit('close'); +} + +Object.defineProperty(MessagePort.prototype, handleOnCloseSymbol, { + enumerable: false, + writable: false, + value: onclose +}); + +MessagePort.prototype.close = function(cb) { + if (typeof cb === 'function') + this.once('close', cb); + MessagePortPrototype.close.call(this); +}; + +Object.defineProperty(MessagePort.prototype, util.inspect.custom, { + enumerable: false, + writable: false, + value: function inspect() { // eslint-disable-line func-name-matching + let ref; + try { + // This may throw when `this` does not refer to a native object, + // e.g. when accessing the prototype directly. + ref = MessagePortPrototype.hasRef.call(this); + } catch { return this; } + return Object.assign(Object.create(MessagePort.prototype), + ref === undefined ? { + active: false, + } : { + active: true, + refed: ref + }, + this); + } +}); + +function setupPortReferencing(port, eventEmitter, eventName) { + // Keep track of whether there are any workerMessage listeners: + // If there are some, ref() the channel so it keeps the event loop alive. + // If there are none or all are removed, unref() the channel so the worker + // can shutdown gracefully. + port.unref(); + eventEmitter.on('newListener', (name) => { + if (name === eventName && eventEmitter.listenerCount(eventName) === 0) { + port.ref(); + MessagePortPrototype.start.call(port); + } + }); + eventEmitter.on('removeListener', (name) => { + if (name === eventName && eventEmitter.listenerCount(eventName) === 0) { + MessagePortPrototype.stop.call(port); + port.unref(); + } + }); +} + + +class ReadableWorkerStdio extends Readable { + constructor(port, name) { + super(); + this[kPort] = port; + this[kName] = name; + this[kIncrementsPortRef] = true; + this[kStartedReading] = false; + this.on('end', () => { + if (this[kIncrementsPortRef] && --this[kPort][kWaitingStreams] === 0) + this[kPort].unref(); + }); + } + + _read() { + if (!this[kStartedReading] && this[kIncrementsPortRef]) { + this[kStartedReading] = true; + if (this[kPort][kWaitingStreams]++ === 0) + this[kPort].ref(); + } + + this[kPort].postMessage({ + type: messageTypes.STDIO_WANTS_MORE_DATA, + stream: this[kName] + }); + } +} + +class WritableWorkerStdio extends Writable { + constructor(port, name) { + super({ decodeStrings: false }); + this[kPort] = port; + this[kName] = name; + this[kWritableCallbacks] = []; + } + + _write(chunk, encoding, cb) { + this[kPort].postMessage({ + type: messageTypes.STDIO_PAYLOAD, + stream: this[kName], + chunk, + encoding + }); + this[kWritableCallbacks].push(cb); + if (this[kPort][kWaitingStreams]++ === 0) + this[kPort].ref(); + } + + _final(cb) { + this[kPort].postMessage({ + type: messageTypes.STDIO_PAYLOAD, + stream: this[kName], + chunk: null + }); + cb(); + } + + [kStdioWantsMoreDataCallback]() { + const cbs = this[kWritableCallbacks]; + this[kWritableCallbacks] = []; + for (const cb of cbs) + cb(); + if ((this[kPort][kWaitingStreams] -= cbs.length) === 0) + this[kPort].unref(); + } +} + +module.exports = { + drainMessagePort, + messageTypes, + kPort, + kIncrementsPortRef, + kWaitingStreams, + kStdioWantsMoreDataCallback, + MessagePort, + MessageChannel, + setupPortReferencing, + ReadableWorkerStdio, + WritableWorkerStdio +}; diff --git a/lib/worker_threads.js b/lib/worker_threads.js index 0609650cd5731d..828edb6bffce7b 100644 --- a/lib/worker_threads.js +++ b/lib/worker_threads.js @@ -2,12 +2,15 @@ const { isMainThread, - MessagePort, - MessageChannel, threadId, Worker } = require('internal/worker'); +const { + MessagePort, + MessageChannel +} = require('internal/worker/io'); + module.exports = { isMainThread, MessagePort, diff --git a/node.gyp b/node.gyp index 8db51f68572fd6..d2c3560131b7e5 100644 --- a/node.gyp +++ b/node.gyp @@ -181,6 +181,7 @@ 'lib/internal/stream_base_commons.js', 'lib/internal/vm/source_text_module.js', 'lib/internal/worker.js', + 'lib/internal/worker/io.js', 'lib/internal/streams/lazy_transform.js', 'lib/internal/streams/async_iterator.js', 'lib/internal/streams/buffer_list.js',