diff --git a/src/completion-listener.spec.ts b/src/completion-listener.spec.ts index c4931798..4b9a19c7 100644 --- a/src/completion-listener.spec.ts +++ b/src/completion-listener.spec.ts @@ -1,3 +1,4 @@ +import { getEventListeners, getMaxListeners } from 'events'; import { TestScheduler } from 'rxjs/testing'; import { CloseEvent } from './command'; @@ -56,6 +57,16 @@ describe('listen', () => { await expect(result).resolves.toHaveLength(0); }); + it('does not leak memory when listening for abort signals', () => { + const abortCtrl = new AbortController(); + const maxListeners = getMaxListeners(abortCtrl.signal); + createController().listen( + Array.from({ length: maxListeners + 1 }, () => new FakeCommand()), + abortCtrl.signal, + ); + expect(getEventListeners(abortCtrl.signal, 'abort')).toHaveLength(1); + }); + it('check for success once all commands have emitted at least a single close event', async () => { const finallyCallback = jest.fn(); const result = createController().listen(commands).finally(finallyCallback); diff --git a/src/completion-listener.ts b/src/completion-listener.ts index 913594d1..eea8cfe6 100644 --- a/src/completion-listener.ts +++ b/src/completion-listener.ts @@ -1,5 +1,5 @@ import * as Rx from 'rxjs'; -import { delay, filter, map, switchMap, take } from 'rxjs/operators'; +import { delay, filter, map, share, switchMap, take } from 'rxjs/operators'; import { CloseEvent, Command } from './command'; @@ -101,6 +101,9 @@ export class CompletionListener { // without an immediate delay delay(0, this.scheduler), map(() => undefined), + // #502 - node might warn of too many active listeners on this object if it isn't shared, + // as each command subscribes to abort event over and over + share(), ); const closeStreams = commands.map((command) =>