Skip to content

Commit

Permalink
stream: refactor to use more primordials
Browse files Browse the repository at this point in the history
PR-URL: nodejs#36346
Reviewed-By: Zijian Liu <lxxyxzj@gmail.com>
Reviewed-By: Darshan Sen <raisinten@gmail.com>
  • Loading branch information
aduh95 committed Feb 1, 2021
1 parent e6fbe48 commit 419686c
Show file tree
Hide file tree
Showing 12 changed files with 112 additions and 80 deletions.
16 changes: 10 additions & 6 deletions lib/internal/streams/buffer_list.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
'use strict';

const {
StringPrototypeSlice,
SymbolIterator,
TypedArrayPrototypeSet,
Uint8Array,
} = primordials;

Expand Down Expand Up @@ -67,7 +69,7 @@ module.exports = class BufferList {
let p = this.head;
let i = 0;
while (p) {
ret.set(p.data, i);
TypedArrayPrototypeSet(ret, p.data, i);
i += p.data.length;
p = p.next;
}
Expand Down Expand Up @@ -120,9 +122,9 @@ module.exports = class BufferList {
else
this.head = this.tail = null;
} else {
ret += str.slice(0, n);
ret += StringPrototypeSlice(str, 0, n);
this.head = p;
p.data = str.slice(n);
p.data = StringPrototypeSlice(str, n);
}
break;
}
Expand All @@ -141,18 +143,20 @@ module.exports = class BufferList {
do {
const buf = p.data;
if (n > buf.length) {
ret.set(buf, retLen - n);
TypedArrayPrototypeSet(ret, buf, retLen - n);
n -= buf.length;
} else {
if (n === buf.length) {
ret.set(buf, retLen - n);
TypedArrayPrototypeSet(ret, buf, retLen - n);
++c;
if (p.next)
this.head = p.next;
else
this.head = this.tail = null;
} else {
ret.set(new Uint8Array(buf.buffer, buf.byteOffset, n), retLen - n);
TypedArrayPrototypeSet(ret,
new Uint8Array(buf.buffer, buf.byteOffset, n),
retLen - n);
this.head = p;
p.data = buf.slice(n);
}
Expand Down
11 changes: 8 additions & 3 deletions lib/internal/streams/destroy.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@
const {
ERR_MULTIPLE_CALLBACK
} = require('internal/errors').codes;
const { Symbol } = primordials;
const {
FunctionPrototypeCall,
Symbol,
} = primordials;

const kDestroy = Symbol('kDestroy');
const kConstruct = Symbol('kConstruct');
Expand Down Expand Up @@ -93,7 +96,8 @@ function _destroy(self, err, cb) {
try {
const then = result.then;
if (typeof then === 'function') {
then.call(
FunctionPrototypeCall(
then,
result,
function() {
if (called)
Expand Down Expand Up @@ -311,7 +315,8 @@ function constructNT(stream) {
try {
const then = result.then;
if (typeof then === 'function') {
then.call(
FunctionPrototypeCall(
then,
result,
function() {
// If the callback was invoked, do nothing further.
Expand Down
20 changes: 13 additions & 7 deletions lib/internal/streams/end-of-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@

'use strict';

const {
FunctionPrototype,
FunctionPrototypeCall,
} = primordials;
const {
ERR_STREAM_PREMATURE_CLOSE
} = require('internal/errors').codes;
Expand Down Expand Up @@ -53,7 +57,7 @@ function isWritableFinished(stream) {
return wState.finished || (wState.ended && wState.length === 0);
}

function nop() {}
const nop = FunctionPrototype;

function isReadableEnded(stream) {
if (stream.readableEnded) return true;
Expand Down Expand Up @@ -110,7 +114,7 @@ function eos(stream, options, callback) {
if (stream.destroyed) willEmitClose = false;

if (willEmitClose && (!stream.readable || readable)) return;
if (!readable || readableEnded) callback.call(stream);
if (!readable || readableEnded) FunctionPrototypeCall(callback, stream);
};

let readableEnded = stream.readableEnded ||
Expand All @@ -123,23 +127,25 @@ function eos(stream, options, callback) {
if (stream.destroyed) willEmitClose = false;

if (willEmitClose && (!stream.writable || writable)) return;
if (!writable || writableFinished) callback.call(stream);
if (!writable || writableFinished) FunctionPrototypeCall(callback, stream);
};

const onerror = (err) => {
callback.call(stream, err);
FunctionPrototypeCall(callback, stream, err);
};

const onclose = () => {
if (readable && !readableEnded) {
if (!isReadableEnded(stream))
return callback.call(stream, new ERR_STREAM_PREMATURE_CLOSE());
return FunctionPrototypeCall(callback, stream,
new ERR_STREAM_PREMATURE_CLOSE());
}
if (writable && !writableFinished) {
if (!isWritableFinished(stream))
return callback.call(stream, new ERR_STREAM_PREMATURE_CLOSE());
return FunctionPrototypeCall(callback, stream,
new ERR_STREAM_PREMATURE_CLOSE());
}
callback.call(stream);
FunctionPrototypeCall(callback, stream);
};

const onrequest = () => {
Expand Down
4 changes: 3 additions & 1 deletion lib/internal/streams/from.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
'use strict';

const {
PromisePrototypeThen,
SymbolAsyncIterator,
SymbolIterator
} = primordials;
Expand Down Expand Up @@ -55,7 +56,8 @@ function from(Readable, iterable, opts) {
readable._destroy = function(error, cb) {
if (needToClose) {
needToClose = false;
close().then(
PromisePrototypeThen(
close(),
() => process.nextTick(cb, error),
(e) => process.nextTick(cb, error || e),
);
Expand Down
3 changes: 2 additions & 1 deletion lib/internal/streams/lazy_transform.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
'use strict';

const {
FunctionPrototypeCall,
ObjectDefineProperties,
ObjectDefineProperty,
ObjectSetPrototypeOf,
Expand All @@ -25,7 +26,7 @@ ObjectSetPrototypeOf(LazyTransform, stream.Transform);

function makeGetter(name) {
return function() {
stream.Transform.call(this, this._options);
FunctionPrototypeCall(stream.Transform, this, this._options);
this._writableState.decodeStrings = false;

if (!this._options || !this._options.defaultEncoding) {
Expand Down
6 changes: 4 additions & 2 deletions lib/internal/streams/legacy.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@

const {
ArrayIsArray,
ArrayPrototypeUnshift,
FunctionPrototypeCall,
ObjectSetPrototypeOf,
} = primordials;

const EE = require('events');

function Stream(opts) {
EE.call(this, opts);
FunctionPrototypeCall(EE, this, opts);
}
ObjectSetPrototypeOf(Stream.prototype, EE.prototype);
ObjectSetPrototypeOf(Stream, EE);
Expand Down Expand Up @@ -106,7 +108,7 @@ function prependListener(emitter, event, fn) {
if (!emitter._events || !emitter._events[event])
emitter.on(event, fn);
else if (ArrayIsArray(emitter._events[event]))
emitter._events[event].unshift(fn);
ArrayPrototypeUnshift(emitter._events[event], fn);
else
emitter._events[event] = [fn, emitter._events[event]];
}
Expand Down
3 changes: 2 additions & 1 deletion lib/internal/streams/passthrough.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
'use strict';

const {
FunctionPrototypeCall,
ObjectSetPrototypeOf,
} = primordials;

Expand All @@ -39,7 +40,7 @@ function PassThrough(options) {
if (!(this instanceof PassThrough))
return new PassThrough(options);

Transform.call(this, options);
FunctionPrototypeCall(Transform, this, options);
}

PassThrough.prototype._transform = function(chunk, encoding, cb) {
Expand Down
14 changes: 9 additions & 5 deletions lib/internal/streams/pipeline.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@

const {
ArrayIsArray,
ArrayPrototypePop,
ArrayPrototypePush,
ArrayPrototypeShift,
FunctionPrototypeCall,
ReflectApply,
SymbolAsyncIterator,
SymbolIterator,
Expand Down Expand Up @@ -75,7 +79,7 @@ function popCallback(streams) {
// a single stream. Therefore optimize for the average case instead of
// checking for length === 0 as well.
validateCallback(streams[streams.length - 1]);
return streams.pop();
return ArrayPrototypePop(streams);
}

function isReadable(obj) {
Expand Down Expand Up @@ -114,7 +118,7 @@ async function* fromReadable(val) {
Readable = require('internal/streams/readable');
}

yield* Readable.prototype[SymbolAsyncIterator].call(val);
yield* FunctionPrototypeCall(Readable.prototype[SymbolAsyncIterator], val);
}

async function pump(iterable, writable, finish) {
Expand Down Expand Up @@ -171,7 +175,7 @@ function pipeline(...streams) {
}

while (destroys.length) {
destroys.shift()(error);
ArrayPrototypeShift(destroys)(error);
}

if (final) {
Expand All @@ -187,7 +191,7 @@ function pipeline(...streams) {

if (isStream(stream)) {
finishCount++;
destroys.push(destroyer(stream, reading, writing, finish));
ArrayPrototypePush(destroys, destroyer(stream, reading, writing, finish));
}

if (i === 0) {
Expand Down Expand Up @@ -250,7 +254,7 @@ function pipeline(...streams) {
ret = pt;

finishCount++;
destroys.push(destroyer(ret, false, true, finish));
ArrayPrototypePush(destroys, destroyer(ret, false, true, finish));
}
} else if (isStream(stream)) {
if (isReadable(ret)) {
Expand Down
Loading

0 comments on commit 419686c

Please sign in to comment.