Skip to content

Commit

Permalink
stream: optimize creation
Browse files Browse the repository at this point in the history
  • Loading branch information
ronag committed Oct 23, 2023
1 parent 25576b5 commit 9426333
Show file tree
Hide file tree
Showing 4 changed files with 78 additions and 55 deletions.
42 changes: 40 additions & 2 deletions lib/internal/streams/duplex.js
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,17 @@ const {

module.exports = Duplex;

const Stream = require('internal/streams/legacy').Stream;
const Readable = require('internal/streams/readable');
const Writable = require('internal/streams/writable');

const {
addAbortSignal,
} = require('internal/streams/add-abort-signal');

const destroyImpl = require('internal/streams/destroy');
const { kOnConstructed } = require('internal/streams/utils');

ObjectSetPrototypeOf(Duplex.prototype, Readable.prototype);
ObjectSetPrototypeOf(Duplex, Readable);

Expand All @@ -55,8 +63,8 @@ function Duplex(options) {
if (!(this instanceof Duplex))
return new Duplex(options);

Readable.call(this, options);
Writable.call(this, options);
this._readableState = new Readable.ReadableState(options, this, true);
this._writableState = new Writable.WritableState(options, this, true);

if (options) {
this.allowHalfOpen = options.allowHalfOpen !== false;
Expand All @@ -73,9 +81,39 @@ function Duplex(options) {
this._writableState.ended = true;
this._writableState.finished = true;
}

if (typeof options.read === 'function')
this._read = options.read;

if (typeof options.write === 'function')
this._write = options.write;

if (typeof options.writev === 'function')
this._writev = options.writev;

if (typeof options.destroy === 'function')
this._destroy = options.destroy;

if (typeof options.final === 'function')
this._final = options.final;

if (typeof options.construct === 'function')
this._construct = options.construct;

if (options.signal)
addAbortSignal(options.signal, this);
} else {
this.allowHalfOpen = true;
}

Stream.call(this, options);

if (this._construct != null) {
destroyImpl.construct(this, () => {
this._readableState[kOnConstructed](this);
this._writableState[kOnConstructed](this);
});
}
}

ObjectDefineProperties(Duplex.prototype, {
Expand Down
32 changes: 13 additions & 19 deletions lib/internal/streams/readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ const {
AbortError,
} = require('internal/errors');
const { validateObject } = require('internal/validators');
const { kOnConstructed } = require('internal/streams/utils');

const kState = Symbol('kState');

Expand Down Expand Up @@ -251,14 +252,6 @@ ObjectDefineProperties(ReadableState.prototype, {


function ReadableState(options, stream, isDuplex) {
// Duplex streams are both readable and writable, but share
// the same options object.
// However, some cases require setting options to different
// values for the readable and the writable sides of the duplex stream.
// These options can be provided separately as readableXXX and writableXXX.
if (typeof isDuplex !== 'boolean')
isDuplex = stream instanceof Stream.Duplex;

// Bit map field to store ReadableState more effciently with 1 bit per field
// instead of a V8 slot per field.
this[kState] = kEmitClose | kAutoDestroy | kConstructed | kSync;
Expand Down Expand Up @@ -310,16 +303,17 @@ function ReadableState(options, stream, isDuplex) {
}
}

ReadableState.prototype[kOnConstructed] = function onConstructed (stream) {
if ((this[kState] & kNeedReadable) !== 0) {
maybeReadMore(stream, this);
}
}

function Readable(options) {
if (!(this instanceof Readable))
return new Readable(options);

// Checking for a Stream.Duplex instance is faster here instead of inside
// the ReadableState constructor, at least with V8 6.5.
const isDuplex = this instanceof Stream.Duplex;

this._readableState = new ReadableState(options, this, isDuplex);
this._readableState = new ReadableState(options, this, false);

if (options) {
if (typeof options.read === 'function')
Expand All @@ -331,17 +325,17 @@ function Readable(options) {
if (typeof options.construct === 'function')
this._construct = options.construct;

if (options.signal && !isDuplex)
if (options.signal)
addAbortSignal(options.signal, this);
}

Stream.call(this, options);

destroyImpl.construct(this, () => {
if (this._readableState.needReadable) {
maybeReadMore(this, this._readableState);
}
});
if (this._construct != null) {
destroyImpl.construct(this, () => {
this._readableState[kOnConstructed](this);
});
}
}

Readable.prototype.destroy = destroyImpl.destroy;
Expand Down
3 changes: 3 additions & 0 deletions lib/internal/streams/utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ const kIsReadable = SymbolFor('nodejs.stream.readable');
const kIsWritable = SymbolFor('nodejs.stream.writable');
const kIsDisturbed = SymbolFor('nodejs.stream.disturbed');

const kOnConstructed = Symbol('kOnConstructed')

const kIsClosedPromise = SymbolFor('nodejs.webstream.isClosedPromise');
const kControllerErrorFunction = SymbolFor('nodejs.webstream.controllerErrorFunction');

Expand Down Expand Up @@ -303,6 +305,7 @@ function isErrored(stream) {
}

module.exports = {
kOnConstructed,
isDestroyed,
kIsDestroyed,
isDisturbed,
Expand Down
56 changes: 22 additions & 34 deletions lib/internal/streams/writable.js
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ const EE = require('events');
const Stream = require('internal/streams/legacy').Stream;
const { Buffer } = require('buffer');
const destroyImpl = require('internal/streams/destroy');
const { kOnConstructed } = require('internal/streams/utils');

const {
addAbortSignal,
Expand Down Expand Up @@ -290,20 +291,15 @@ ObjectDefineProperties(WritableState.prototype, {
});

function WritableState(options, stream, isDuplex) {
// Duplex streams are both readable and writable, but share
// the same options object.
// However, some cases require setting options to different
// values for the readable and the writable sides of the duplex stream,
// e.g. options.readableObjectMode vs. options.writableObjectMode, etc.
if (typeof isDuplex !== 'boolean')
isDuplex = stream instanceof Stream.Duplex;

// Bit map field to store WritableState more effciently with 1 bit per field
// instead of a V8 slot per field.
this[kState] = kSync | kConstructed | kEmitClose | kAutoDestroy;

if (options && options.objectMode) this[kState] |= kObjectMode;
if (isDuplex && options && options.writableObjectMode) this[kState] |= kObjectMode;
if (options && options.objectMode)
this[kState] |= kObjectMode;

if (isDuplex && options && options.writableObjectMode)
this[kState] |= kObjectMode;

// The point at which write() starts returning false
// Note: 0 is a valid value, means that we always return false if
Expand Down Expand Up @@ -372,23 +368,21 @@ ObjectDefineProperty(WritableState.prototype, 'bufferedRequestCount', {
},
});

function Writable(options) {
// Writable ctor is applied to Duplexes, too.
// `realHasInstance` is necessary because using plain `instanceof`
// would return false, as no `_writableState` property is attached.

// Trying to use the custom `instanceof` for Writable here will also break the
// Node.js LazyTransform implementation, which has a non-trivial getter for
// `_writableState` that would lead to infinite recursion.
WritableState.prototype[kOnConstructed] = function onConstructed(stream) {
if ((this[kState] & kWriting) === 0) {
clearBuffer(stream, this);
}

// Checking for a Stream.Duplex instance is faster here instead of inside
// the WritableState constructor, at least with V8 6.5.
const isDuplex = (this instanceof Stream.Duplex);
if ((this[kState] & kEnding) !== 0) {
finishMaybe(stream, this);
}
}

if (!isDuplex && !FunctionPrototypeSymbolHasInstance(Writable, this))
function Writable(options) {
if (!(this instanceof Writable))
return new Writable(options);

this._writableState = new WritableState(options, this, isDuplex);
this._writableState = new WritableState(options, this, false);

if (options) {
if (typeof options.write === 'function')
Expand All @@ -412,17 +406,11 @@ function Writable(options) {

Stream.call(this, options);

destroyImpl.construct(this, () => {
const state = this._writableState;

if ((state[kState] & kWriting) === 0) {
clearBuffer(this, state);
}

if ((state[kState] & kEnding) !== 0) {
finishMaybe(this, state);
}
});
if (this._construct != null) {
destroyImpl.construct(this, () => {
this._writableState[kOnConstructed](this);
});
}
}

ObjectDefineProperty(Writable, SymbolHasInstance, {
Expand Down

0 comments on commit 9426333

Please sign in to comment.