Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

lib: performance improvement on readline async iterator #41276

Merged
merged 25 commits into from
Oct 24, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
c467ffe
lib: performance improvement on readline async iterator
Dec 20, 2021
05db8ff
lib: undoing accidental changes
Farenheith Dec 22, 2021
3ecee3c
lib: changing once to on in waitNext
Farenheith Dec 23, 2021
83919d2
lib: adding benchmarks for readline iterable
Farenheith Dec 29, 2021
03ac26d
lib: adding different lines multiplier
Farenheith Dec 29, 2021
870b5cf
lib: fixing lint on benchmark
Farenheith Dec 29, 2021
ae3325c
lib: adding test to validate slow stream
Farenheith Dec 29, 2021
bb6f6ab
lib: adding return and throw implementation to eventsToAsyncIteratorF…
Farenheith Dec 30, 2021
221e239
lib: performance improvement on readline async iterator
Dec 20, 2021
b536144
lib: undoing accidental changes
Farenheith Dec 22, 2021
31efb2d
lib: changing once to on in waitNext
Farenheith Dec 23, 2021
0e04a18
lib: fixing lint on benchmark
Farenheith Dec 29, 2021
7a8c3fa
lib: adding test to validate slow stream
Farenheith Dec 29, 2021
156837b
lib: adding return and throw implementation to eventsToAsyncIteratorF…
Farenheith Dec 30, 2021
87da1a2
lib: keep ASCII order on primordials import
Farenheith Feb 2, 2022
e7f358a
lib: changing standard for internal constants names
Farenheith Feb 2, 2022
d5917c9
lib: unifying on and eventsToAsyncIteratorFactory functions
Farenheith Feb 4, 2022
0143cdf
lib: making firstEventParam interna only
Farenheith Feb 4, 2022
917c51b
lib: refactoring function on
Farenheith Feb 5, 2022
4e8be80
lint: fixing linting problems after merge
Farenheith Sep 14, 2022
610a275
refactor: Apply suggestions from code review
Farenheith Sep 14, 2022
e1dd9d3
refactor: readding wrongly removed SymAsyncIterator definition
Farenheith Sep 14, 2022
26d066b
refactor: using ArrayPrototypePush instead of push
Farenheith Sep 14, 2022
835bb5f
lib: fixing FixedQueue.shift calls mistakenly using ArrayPrototypeShift
Farenheith Sep 15, 2022
e83baea
lib: fixing faulting PromiseResolve call
Farenheith Sep 16, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 48 additions & 0 deletions benchmark/readline/readline-iterable.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
'use strict';
const common = require('../common.js');
const readline = require('readline');
const { Readable } = require('stream');

const bench = common.createBenchmark(main, {
n: [1e1, 1e2, 1e3, 1e4, 1e5, 1e6],
});

const loremIpsum = `Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed
do eiusmod tempor incididunt ut labore et dolore magna aliqua.
Dui accumsan sit amet nulla facilisi morbi tempus iaculis urna.
Eget dolor morbi non arcu risus quis varius quam quisque.
Lacus viverra vitae congue eu consequat ac felis donec.
Amet porttitor eget dolor morbi non arcu.
Velit ut tortor pretium viverra suspendisse.
Mauris nunc congue nisi vitae suscipit tellus.
Amet nisl suscipit adipiscing bibendum est ultricies integer.
Sit amet dictum sit amet justo donec enim diam.
Condimentum mattis pellentesque id nibh tortor id aliquet lectus proin.
Diam in arcu cursus euismod quis viverra nibh.
Rest of line`;

function getLoremIpsumStream(repetitions) {
const readable = Readable({
objectMode: true,
});
let i = 0;
readable._read = () => readable.push(
i++ >= repetitions ? null : loremIpsum
);
return readable;
}

async function main({ n }) {
bench.start();
let lineCount = 0;

const iterable = readline.createInterface({
input: getLoremIpsumStream(n),
});

// eslint-disable-next-line no-unused-vars
for await (const _ of iterable) {
lineCount++;
}
bench.end(lineCount);
}
166 changes: 113 additions & 53 deletions lib/events.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@

const {
ArrayPrototypeJoin,
ArrayPrototypeShift,
ArrayPrototypePop,
ArrayPrototypePush,
ArrayPrototypeSlice,
ArrayPrototypeSplice,
ArrayPrototypeUnshift,
Expand All @@ -33,6 +34,7 @@ const {
FunctionPrototypeBind,
FunctionPrototypeCall,
NumberIsNaN,
NumberMAX_SAFE_INTEGER,
ObjectCreate,
ObjectDefineProperty,
ObjectDefineProperties,
Expand All @@ -59,6 +61,8 @@ const {
} = require('internal/util/inspect');

let spliceOne;
let FixedQueue;
let kFirstEventParam;

const {
AbortError,
Expand All @@ -73,6 +77,7 @@ const {
} = require('internal/errors');

const {
validateInteger,
validateAbortSignal,
validateBoolean,
validateFunction,
Expand All @@ -84,6 +89,7 @@ const kErrorMonitor = Symbol('events.errorMonitor');
const kMaxEventTargetListeners = Symbol('events.maxEventTargetListeners');
const kMaxEventTargetListenersWarned =
Symbol('events.maxEventTargetListenersWarned');
const kWatermarkData = SymbolFor('nodejs.watermarkData');

let EventEmitterAsyncResource;
// The EventEmitterAsyncResource has to be initialized lazily because event.js
Expand Down Expand Up @@ -999,25 +1005,44 @@ function eventTargetAgnosticAddListener(emitter, name, listener, flags) {
* Returns an `AsyncIterator` that iterates `event` events.
* @param {EventEmitter} emitter
* @param {string | symbol} event
* @param {{ signal: AbortSignal; }} [options]
* @param {{
* signal: AbortSignal;
* close?: string[];
* highWatermark?: number,
* lowWatermark?: number
* }} [options]
* @returns {AsyncIterator}
*/
function on(emitter, event, options) {
const signal = options?.signal;
function on(emitter, event, options = {}) {
// Parameters validation
const signal = options.signal;
validateAbortSignal(signal, 'options.signal');
if (signal?.aborted)
throw new AbortError(undefined, { cause: signal?.reason });

const unconsumedEvents = [];
const unconsumedPromises = [];
const highWatermark = options.highWatermark ?? NumberMAX_SAFE_INTEGER;
validateInteger(highWatermark, 'options.highWatermark', 1);
const lowWatermark = options.lowWatermark ?? 1;
validateInteger(lowWatermark, 'options.lowWatermark', 1);

// Preparing controlling queues and variables
FixedQueue ??= require('internal/fixed_queue');
const unconsumedEvents = new FixedQueue();
const unconsumedPromises = new FixedQueue();
let paused = false;
let error = null;
let finished = false;
let size = 0;

const iterator = ObjectSetPrototypeOf({
next() {
// First, we consume all unread events
const value = unconsumedEvents.shift();
if (value) {
if (size) {
const value = unconsumedEvents.shift();
size--;
if (paused && size < lowWatermark) {
emitter.resume();
paused = false;
}
return PromiseResolve(createIterResult(value, false));
}

Expand All @@ -1032,9 +1057,7 @@ function on(emitter, event, options) {
}

// If the iterator is finished, resolve to done
if (finished) {
return PromiseResolve(createIterResult(undefined, true));
}
if (finished) return closeHandler();

// Wait until an event happens
return new Promise(function(resolve, reject) {
Expand All @@ -1043,46 +1066,62 @@ function on(emitter, event, options) {
},

return() {
eventTargetAgnosticRemoveListener(emitter, event, eventHandler);
eventTargetAgnosticRemoveListener(emitter, 'error', errorHandler);

if (signal) {
eventTargetAgnosticRemoveListener(
signal,
'abort',
abortListener,
{ once: true });
}

finished = true;

for (const promise of unconsumedPromises) {
promise.resolve(createIterResult(undefined, true));
}

return PromiseResolve(createIterResult(undefined, true));
return closeHandler();
},

throw(err) {
if (!err || !(err instanceof Error)) {
throw new ERR_INVALID_ARG_TYPE('EventEmitter.AsyncIterator',
'Error', err);
}
error = err;
eventTargetAgnosticRemoveListener(emitter, event, eventHandler);
eventTargetAgnosticRemoveListener(emitter, 'error', errorHandler);
errorHandler(err);
},

[SymbolAsyncIterator]() {
return this;
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This part wasn't needed because this method is inherited from the parent prototype

},
[kWatermarkData]: {
/**
* The current queue size
*/
get size() {
return size;
},
/**
* The low watermark. The emitter is resumed every time size is lower than it
*/
get low() {
return lowWatermark;
},
/**
* The high watermark. The emitter is paused every time size is higher than it
*/
get high() {
return highWatermark;
},
/**
* It checks wether the emitter is paused by the watermark controller or not
*/
get isPaused() {
return paused;
}
},
}, AsyncIteratorPrototype);

eventTargetAgnosticAddListener(emitter, event, eventHandler);
// Adding event handlers
const { addEventListener, removeAll } = listenersController();
kFirstEventParam ??= require('internal/events/symbols').kFirstEventParam;
addEventListener(emitter, event, options[kFirstEventParam] ? eventHandler : function(...args) {
return eventHandler(args);
});
if (event !== 'error' && typeof emitter.on === 'function') {
emitter.on('error', errorHandler);
addEventListener(emitter, 'error', errorHandler);
}
const closeEvents = options?.close;
if (closeEvents?.length) {
for (let i = 0; i < closeEvents.length; i++) {
addEventListener(emitter, closeEvents[i], closeHandler);
}
}

if (signal) {
eventTargetAgnosticAddListener(
signal,
Expand All @@ -1097,27 +1136,48 @@ function on(emitter, event, options) {
errorHandler(new AbortError(undefined, { cause: signal?.reason }));
}

function eventHandler(...args) {
const promise = ArrayPrototypeShift(unconsumedPromises);
if (promise) {
promise.resolve(createIterResult(args, false));
} else {
unconsumedEvents.push(args);
}
function eventHandler(value) {
if (unconsumedPromises.isEmpty()) {
size++;
if (!paused && size > highWatermark) {
paused = true;
emitter.pause();
}
unconsumedEvents.push(value);
} else unconsumedPromises.shift().resolve(createIterResult(value, false));
}

function errorHandler(err) {
finished = true;
if (unconsumedPromises.isEmpty()) error = err;
else unconsumedPromises.shift().reject(err);
Farenheith marked this conversation as resolved.
Show resolved Hide resolved

const toError = ArrayPrototypeShift(unconsumedPromises);
closeHandler();
}

if (toError) {
toError.reject(err);
} else {
// The next time we call next()
error = err;
function closeHandler() {
removeAll();
finished = true;
const doneResult = createIterResult(undefined, true);
while (!unconsumedPromises.isEmpty()) {
unconsumedPromises.shift().resolve(doneResult);
Farenheith marked this conversation as resolved.
Show resolved Hide resolved
}

iterator.return();
return PromiseResolve(doneResult);
}
}

function listenersController() {
const listeners = [];

return {
addEventListener(emitter, event, handler, flags) {
eventTargetAgnosticAddListener(emitter, event, handler, flags);
ArrayPrototypePush(listeners, [emitter, event, handler, flags]);
},
removeAll() {
while (listeners.length > 0) {
ReflectApply(eventTargetAgnosticRemoveListener, undefined, ArrayPrototypePop(listeners));
}
}
};
}
11 changes: 11 additions & 0 deletions lib/internal/events/symbols.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
'use strict';

const {
Symbol,
} = primordials;

const kFirstEventParam = Symbol('nodejs.kFirstEventParam');

module.exports = {
kFirstEventParam,
};
45 changes: 9 additions & 36 deletions lib/internal/readline/interface.js
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ const {
kSubstringSearch,
} = require('internal/readline/utils');
let emitKeypressEvents;
let kFirstEventParam;
const {
clearScreenDown,
cursorTo,
Expand All @@ -70,9 +71,6 @@ const {

const { StringDecoder } = require('string_decoder');

// Lazy load Readable for startup performance.
let Readable;

const kHistorySize = 30;
const kMaxUndoRedoStackSize = 2048;
const kMincrlfDelay = 100;
Expand Down Expand Up @@ -1346,40 +1344,15 @@ class Interface extends InterfaceConstructor {
*/
[SymbolAsyncIterator]() {
if (this[kLineObjectStream] === undefined) {
if (Readable === undefined) {
Readable = require('stream').Readable;
}
const readable = new Readable({
objectMode: true,
read: () => {
this.resume();
},
destroy: (err, cb) => {
this.off('line', lineListener);
this.off('close', closeListener);
this.close();
cb(err);
},
});
const lineListener = (input) => {
if (!readable.push(input)) {
// TODO(rexagod): drain to resume flow
this.pause();
}
};
const closeListener = () => {
readable.push(null);
};
const errorListener = (err) => {
readable.destroy(err);
};
this.on('error', errorListener);
this.on('line', lineListener);
this.on('close', closeListener);
this[kLineObjectStream] = readable;
kFirstEventParam ??= require('internal/events/symbols').kFirstEventParam;
this[kLineObjectStream] = EventEmitter.on(
this, 'line', {
close: ['close'],
highWatermark: 1024,
[kFirstEventParam]: true,
});
}

return this[kLineObjectStream][SymbolAsyncIterator]();
return this[kLineObjectStream];
}
}

Expand Down
7 changes: 7 additions & 0 deletions test/benchmark/test-bechmark-readline.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
'use strict';

require('../common');

const runBenchmark = require('../common/benchmark');

runBenchmark('readline', { NODEJS_BENCHMARK_ZERO_ALLOWED: 1 });
Loading