diff --git a/node/events.ts b/node/events.ts index 92fe7a7042f2..f4b3e0bc590c 100644 --- a/node/events.ts +++ b/node/events.ts @@ -393,3 +393,121 @@ export function once( } }); } + +// eslint-disable-next-line @typescript-eslint/no-explicit-any +function createIterResult(value: any, done: boolean): IteratorResult { + return { value, done }; +} + +interface AsyncInterable { + // eslint-disable-next-line @typescript-eslint/no-explicit-any + next(): Promise>; + // eslint-disable-next-line @typescript-eslint/no-explicit-any + return(): Promise>; + throw(err: Error): void; + // eslint-disable-next-line @typescript-eslint/no-explicit-any + [Symbol.asyncIterator](): any; +} + +/** + * Returns an AsyncIterator that iterates eventName events. It will throw if + * the EventEmitter emits 'error'. It removes all listeners when exiting the + * loop. The value returned by each iteration is an array composed of the + * emitted event arguments. + */ +export function on( + emitter: EventEmitter, + event: string | symbol +): AsyncInterable { + // eslint-disable-next-line @typescript-eslint/no-explicit-any + const unconsumedEventValues: any[] = []; + const unconsumedPromises = []; + let error = null; + let finished = false; + + const iterator = { + // eslint-disable-next-line @typescript-eslint/no-explicit-any + next(): Promise> { + // First, we consume all unread events + // eslint-disable-next-line @typescript-eslint/no-explicit-any + const value: any = unconsumedEventValues.shift(); + if (value) { + return Promise.resolve(createIterResult(value, false)); + } + + // Then we error, if an error happened + // This happens one time if at all, because after 'error' + // we stop listening + if (error) { + const p: Promise = Promise.reject(error); + // Only the first element errors + error = null; + return p; + } + + // If the iterator is finished, resolve to done + if (finished) { + return Promise.resolve(createIterResult(undefined, true)); + } + + // Wait until an event happens + return new Promise(function(resolve, reject) { + unconsumedPromises.push({ resolve, reject }); + }); + }, + + // eslint-disable-next-line @typescript-eslint/no-explicit-any + return(): Promise> { + emitter.removeListener(event, eventHandler); + emitter.removeListener("error", errorHandler); + finished = true; + + for (const promise of unconsumedPromises) { + promise.resolve(createIterResult(undefined, true)); + } + + return Promise.resolve(createIterResult(undefined, true)); + }, + + throw(err: Error): void { + error = err; + emitter.removeListener(event, eventHandler); + emitter.removeListener("error", errorHandler); + }, + + // eslint-disable-next-line @typescript-eslint/no-explicit-any + [Symbol.asyncIterator](): AsyncIterable { + return this; + } + }; + + emitter.on(event, eventHandler); + emitter.on("error", errorHandler); + + return iterator; + + // eslint-disable-next-line @typescript-eslint/no-explicit-any + function eventHandler(...args: any[]): void { + const promise = unconsumedPromises.shift(); + if (promise) { + promise.resolve(createIterResult(args, false)); + } else { + unconsumedEventValues.push(args); + } + } + + // eslint-disable-next-line @typescript-eslint/no-explicit-any + function errorHandler(err: any): void { + finished = true; + + const toError = unconsumedPromises.shift(); + if (toError) { + toError.reject(err); + } else { + // The next time we call next() + error = err; + } + + iterator.return(); + } +} diff --git a/node/events_test.ts b/node/events_test.ts index f86265d724d9..c89df298aa9e 100644 --- a/node/events_test.ts +++ b/node/events_test.ts @@ -5,7 +5,7 @@ import { fail, assertThrows } from "../testing/asserts.ts"; -import EventEmitter, { WrappedFunction, once } from "./events.ts"; +import EventEmitter, { WrappedFunction, once, on } from "./events.ts"; const shouldNeverBeEmitted: Function = () => { fail("Should never be called"); @@ -439,3 +439,183 @@ test({ assertEquals(events, ["errorMonitor event", "error"]); } }); + +test({ + name: "asyncronous iteration of events are handled as expected", + async fn() { + const ee = new EventEmitter(); + setTimeout(() => { + ee.emit("foo", "bar"); + ee.emit("bar", 24); + ee.emit("foo", 42); + }, 0); + + const iterable = on(ee, "foo"); + + const expected = [["bar"], [42]]; + + for await (const event of iterable) { + const current = expected.shift(); + + assertEquals(current, event); + + if (expected.length === 0) { + break; + } + } + assertEquals(ee.listenerCount("foo"), 0); + assertEquals(ee.listenerCount("error"), 0); + } +}); + +test({ + name: "asyncronous error handling of emitted events works as expected", + async fn() { + const ee = new EventEmitter(); + const _err = new Error("kaboom"); + setTimeout(() => { + ee.emit("error", _err); + }, 0); + + const iterable = on(ee, "foo"); + let thrown = false; + + try { + // eslint-disable-next-line @typescript-eslint/no-unused-vars + for await (const event of iterable) { + fail("no events should be processed due to the error thrown"); + } + } catch (err) { + thrown = true; + assertEquals(err, _err); + } + assertEquals(thrown, true); + } +}); + +test({ + name: "error thrown during asyncronous processing of events is handled", + async fn() { + const ee = new EventEmitter(); + const _err = new Error("kaboom"); + setTimeout(() => { + ee.emit("foo", 42); + ee.emit("error", _err); + }, 0); + + const iterable = on(ee, "foo"); + const expected = [[42]]; + let thrown = false; + + try { + for await (const event of iterable) { + const current = expected.shift(); + assertEquals(current, event); + } + } catch (err) { + thrown = true; + assertEquals(err, _err); + } + assertEquals(thrown, true); + assertEquals(ee.listenerCount("foo"), 0); + assertEquals(ee.listenerCount("error"), 0); + } +}); + +test({ + name: + "error thrown in processing loop of asyncronous event prevents processing of additional events", + async fn() { + const ee = new EventEmitter(); + const _err = new Error("kaboom"); + + setTimeout(() => { + ee.emit("foo", 42); + ee.emit("foo", 999); + }, 0); + + try { + for await (const event of on(ee, "foo")) { + assertEquals(event, [42]); + throw _err; + } + } catch (err) { + assertEquals(err, _err); + } + + assertEquals(ee.listenerCount("foo"), 0); + assertEquals(ee.listenerCount("error"), 0); + } +}); + +test({ + name: "asyncronous iterator next() works as expected", + async fn() { + const ee = new EventEmitter(); + const iterable = on(ee, "foo"); + + setTimeout(function() { + ee.emit("foo", "bar"); + ee.emit("foo", 42); + iterable.return(); + }, 0); + + const results = await Promise.all([ + iterable.next(), + iterable.next(), + iterable.next() + ]); + + assertEquals(results, [ + { + value: ["bar"], + done: false + }, + { + value: [42], + done: false + }, + { + value: undefined, + done: true + } + ]); + + assertEquals(await iterable.next(), { + value: undefined, + done: true + }); + } +}); + +test({ + name: "async iterable throw handles various scenarios", + async fn() { + const ee = new EventEmitter(); + const iterable = on(ee, "foo"); + + setTimeout(() => { + ee.emit("foo", "bar"); + ee.emit("foo", 42); // lost in the queue + iterable.throw(_err); + }, 0); + + const _err = new Error("kaboom"); + let thrown = false; + + const expected = [["bar"], [42]]; + + try { + for await (const event of iterable) { + assertEquals(event, expected.shift()); + } + } catch (err) { + thrown = true; + assertEquals(err, _err); + } + assertEquals(thrown, true); + assertEquals(expected.length, 0); + assertEquals(ee.listenerCount("foo"), 0); + assertEquals(ee.listenerCount("error"), 0); + } +});