From 95b6c274d8d6b5f0cf2f7f8ea39ca403c2f8fcf9 Mon Sep 17 00:00:00 2001 From: Neal Beeken Date: Sat, 2 Dec 2023 00:24:28 -0500 Subject: [PATCH 1/3] events: remove abort listener from signal in `on` the `abortHandler` function is declared within the scope of the `events.on` function so cannot be removed by the caller which can lead to a memory leak adding the abort listener using the `addAbortListener` helper returns a disposable that can be used to clean up the listener when the iterator is exited Fixes: https://github.com/nodejs/node/issues/51010 --- lib/events.js | 2 +- .../parallel/test-events-on-async-iterator.js | 24 +++++++++++++++++++ 2 files changed, 25 insertions(+), 1 deletion(-) diff --git a/lib/events.js b/lib/events.js index 3ba2484ece938f..ef77af7ab3f4dc 100644 --- a/lib/events.js +++ b/lib/events.js @@ -1169,7 +1169,7 @@ function on(emitter, event, options = kEmptyObject) { } if (signal) { kResistStopPropagation ??= require('internal/event_target').kResistStopPropagation; - eventTargetAgnosticAddListener( + addEventListener( signal, 'abort', abortListener, diff --git a/test/parallel/test-events-on-async-iterator.js b/test/parallel/test-events-on-async-iterator.js index 057af8537f3275..fd31d1a33af6ab 100644 --- a/test/parallel/test-events-on-async-iterator.js +++ b/test/parallel/test-events-on-async-iterator.js @@ -6,6 +6,7 @@ const assert = require('assert'); const { on, EventEmitter } = require('events'); const { NodeEventTarget, + kEvents } = require('internal/event_target'); async function basic() { @@ -372,6 +373,28 @@ async function abortableOnAfterDone() { }); } +async function abortListenerRemovedAfterComplete() { + const ee = new EventEmitter(); + const ac = new AbortController(); + + const i = setInterval(() => ee.emit('foo', 'foo'), 1); + try { + // Return case + const endedIterator = on(ee, 'foo', { signal: ac.signal }); + assert.ok(ac.signal[kEvents].size > 0); + endedIterator.return(); + assert.strictEqual(ac.signal[kEvents].size, 0); + + // Throw case + const throwIterator = on(ee, 'foo', { signal: ac.signal }); + assert.ok(ac.signal[kEvents].size > 0); + throwIterator.throw(new Error()); + assert.strictEqual(ac.signal[kEvents].size, 0); + } finally { + clearInterval(i); + } +} + async function run() { const funcs = [ basic, @@ -391,6 +414,7 @@ async function run() { eventTargetAbortableOnAfter, eventTargetAbortableOnAfter2, abortableOnAfterDone, + abortListenerRemovedAfterComplete, ]; for (const fn of funcs) { From 5819b36629137d42793a098987930fb00f24b867 Mon Sep 17 00:00:00 2001 From: Neal Beeken Date: Wed, 13 Mar 2024 21:10:21 -0400 Subject: [PATCH 2/3] add abort listener using `addAbortListener` util and invoke disposable interface --- lib/events.js | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/lib/events.js b/lib/events.js index ef77af7ab3f4dc..996658e1d3f565 100644 --- a/lib/events.js +++ b/lib/events.js @@ -1167,14 +1167,8 @@ function on(emitter, event, options = kEmptyObject) { addEventListener(emitter, closeEvents[i], closeHandler); } } - if (signal) { - kResistStopPropagation ??= require('internal/event_target').kResistStopPropagation; - addEventListener( - signal, - 'abort', - abortListener, - { __proto__: null, once: true, [kResistStopPropagation]: true }); - } + + const abortListenerDisposable = signal ? addAbortListener(signal, abortListener) : null; return iterator; @@ -1201,6 +1195,7 @@ function on(emitter, event, options = kEmptyObject) { } function closeHandler() { + abortListenerDisposable?.[SymbolDispose](); removeAll(); finished = true; const doneResult = createIterResult(undefined, true); From 719f745c5f617869076a51691c12372a8ec73f42 Mon Sep 17 00:00:00 2001 From: Neal Beeken Date: Thu, 14 Mar 2024 10:00:41 -0400 Subject: [PATCH 3/3] test: add abort test case and check either entire event map or abort listener list --- test/parallel/test-events-on-async-iterator.js | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/test/parallel/test-events-on-async-iterator.js b/test/parallel/test-events-on-async-iterator.js index fd31d1a33af6ab..2a68bf987dde28 100644 --- a/test/parallel/test-events-on-async-iterator.js +++ b/test/parallel/test-events-on-async-iterator.js @@ -379,17 +379,25 @@ async function abortListenerRemovedAfterComplete() { const i = setInterval(() => ee.emit('foo', 'foo'), 1); try { + // Below: either the kEvents map is empty or the 'abort' listener list is empty + // Return case const endedIterator = on(ee, 'foo', { signal: ac.signal }); - assert.ok(ac.signal[kEvents].size > 0); + assert.ok(ac.signal[kEvents].get('abort').size > 0); endedIterator.return(); - assert.strictEqual(ac.signal[kEvents].size, 0); + assert.strictEqual(ac.signal[kEvents].get('abort')?.size ?? ac.signal[kEvents].size, 0); // Throw case const throwIterator = on(ee, 'foo', { signal: ac.signal }); - assert.ok(ac.signal[kEvents].size > 0); + assert.ok(ac.signal[kEvents].get('abort').size > 0); throwIterator.throw(new Error()); - assert.strictEqual(ac.signal[kEvents].size, 0); + assert.strictEqual(ac.signal[kEvents].get('abort')?.size ?? ac.signal[kEvents].size, 0); + + // Abort case + on(ee, 'foo', { signal: ac.signal }); + assert.ok(ac.signal[kEvents].get('abort').size > 0); + ac.abort(new Error()); + assert.strictEqual(ac.signal[kEvents].get('abort')?.size ?? ac.signal[kEvents].size, 0); } finally { clearInterval(i); }