Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Asynchronous event iteration node polyfill #4016

Merged
merged 2 commits into from
Feb 17, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 118 additions & 0 deletions std/node/events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -393,3 +393,121 @@ export function once(
}
});
}

// eslint-disable-next-line @typescript-eslint/no-explicit-any
function createIterResult(value: any, done: boolean): IteratorResult<any> {
return { value, done };
}

interface AsyncInterable {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
next(): Promise<IteratorResult<any, any>>;
// eslint-disable-next-line @typescript-eslint/no-explicit-any
return(): Promise<IteratorResult<any, any>>;
throw(err: Error): void;
// eslint-disable-next-line @typescript-eslint/no-explicit-any
[Symbol.asyncIterator](): any;
}

/**
* Returns an AsyncIterator that iterates eventName events. It will throw if
* the EventEmitter emits 'error'. It removes all listeners when exiting the
* loop. The value returned by each iteration is an array composed of the
* emitted event arguments.
*/
export function on(
emitter: EventEmitter,
event: string | symbol
): AsyncInterable {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
const unconsumedEventValues: any[] = [];
const unconsumedPromises = [];
let error = null;
let finished = false;

const iterator = {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
next(): Promise<IteratorResult<any>> {
// First, we consume all unread events
// eslint-disable-next-line @typescript-eslint/no-explicit-any
const value: any = unconsumedEventValues.shift();
if (value) {
return Promise.resolve(createIterResult(value, false));
}

// Then we error, if an error happened
// This happens one time if at all, because after 'error'
// we stop listening
if (error) {
const p: Promise<never> = Promise.reject(error);
// Only the first element errors
error = null;
return p;
}

// If the iterator is finished, resolve to done
if (finished) {
return Promise.resolve(createIterResult(undefined, true));
}

// Wait until an event happens
return new Promise(function(resolve, reject) {
unconsumedPromises.push({ resolve, reject });
});
},

// eslint-disable-next-line @typescript-eslint/no-explicit-any
return(): Promise<IteratorResult<any>> {
emitter.removeListener(event, eventHandler);
emitter.removeListener("error", errorHandler);
finished = true;

for (const promise of unconsumedPromises) {
promise.resolve(createIterResult(undefined, true));
}

return Promise.resolve(createIterResult(undefined, true));
},

throw(err: Error): void {
error = err;
emitter.removeListener(event, eventHandler);
emitter.removeListener("error", errorHandler);
},

// eslint-disable-next-line @typescript-eslint/no-explicit-any
[Symbol.asyncIterator](): AsyncIterable<any> {
return this;
}
};

emitter.on(event, eventHandler);
emitter.on("error", errorHandler);

return iterator;

// eslint-disable-next-line @typescript-eslint/no-explicit-any
function eventHandler(...args: any[]): void {
const promise = unconsumedPromises.shift();
if (promise) {
promise.resolve(createIterResult(args, false));
} else {
unconsumedEventValues.push(args);
}
}

// eslint-disable-next-line @typescript-eslint/no-explicit-any
function errorHandler(err: any): void {
finished = true;

const toError = unconsumedPromises.shift();
if (toError) {
toError.reject(err);
} else {
// The next time we call next()
error = err;
}

iterator.return();
}
}
182 changes: 181 additions & 1 deletion std/node/events_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import {
fail,
assertThrows
} from "../testing/asserts.ts";
import EventEmitter, { WrappedFunction, once } from "./events.ts";
import EventEmitter, { WrappedFunction, once, on } from "./events.ts";

const shouldNeverBeEmitted: Function = () => {
fail("Should never be called");
Expand Down Expand Up @@ -439,3 +439,183 @@ test({
assertEquals(events, ["errorMonitor event", "error"]);
}
});

test({
name: "asyncronous iteration of events are handled as expected",
async fn() {
const ee = new EventEmitter();
setTimeout(() => {
ee.emit("foo", "bar");
ee.emit("bar", 24);
ee.emit("foo", 42);
}, 0);

const iterable = on(ee, "foo");

const expected = [["bar"], [42]];

for await (const event of iterable) {
const current = expected.shift();

assertEquals(current, event);

if (expected.length === 0) {
break;
}
}
assertEquals(ee.listenerCount("foo"), 0);
assertEquals(ee.listenerCount("error"), 0);
}
});

test({
name: "asyncronous error handling of emitted events works as expected",
async fn() {
const ee = new EventEmitter();
const _err = new Error("kaboom");
setTimeout(() => {
ee.emit("error", _err);
}, 0);

const iterable = on(ee, "foo");
let thrown = false;

try {
// eslint-disable-next-line @typescript-eslint/no-unused-vars
for await (const event of iterable) {
fail("no events should be processed due to the error thrown");
}
} catch (err) {
thrown = true;
assertEquals(err, _err);
}
assertEquals(thrown, true);
}
});

test({
name: "error thrown during asyncronous processing of events is handled",
async fn() {
const ee = new EventEmitter();
const _err = new Error("kaboom");
setTimeout(() => {
ee.emit("foo", 42);
ee.emit("error", _err);
}, 0);

const iterable = on(ee, "foo");
const expected = [[42]];
let thrown = false;

try {
for await (const event of iterable) {
const current = expected.shift();
assertEquals(current, event);
}
} catch (err) {
thrown = true;
assertEquals(err, _err);
}
assertEquals(thrown, true);
assertEquals(ee.listenerCount("foo"), 0);
assertEquals(ee.listenerCount("error"), 0);
}
});

test({
name:
"error thrown in processing loop of asyncronous event prevents processing of additional events",
async fn() {
const ee = new EventEmitter();
const _err = new Error("kaboom");

setTimeout(() => {
ee.emit("foo", 42);
ee.emit("foo", 999);
}, 0);

try {
for await (const event of on(ee, "foo")) {
assertEquals(event, [42]);
throw _err;
}
} catch (err) {
assertEquals(err, _err);
}

assertEquals(ee.listenerCount("foo"), 0);
assertEquals(ee.listenerCount("error"), 0);
}
});

test({
name: "asyncronous iterator next() works as expected",
async fn() {
const ee = new EventEmitter();
const iterable = on(ee, "foo");

setTimeout(function() {
ee.emit("foo", "bar");
ee.emit("foo", 42);
iterable.return();
}, 0);

const results = await Promise.all([
iterable.next(),
iterable.next(),
iterable.next()
]);

assertEquals(results, [
{
value: ["bar"],
done: false
},
{
value: [42],
done: false
},
{
value: undefined,
done: true
}
]);

assertEquals(await iterable.next(), {
value: undefined,
done: true
});
}
});

test({
name: "async iterable throw handles various scenarios",
async fn() {
const ee = new EventEmitter();
const iterable = on(ee, "foo");

setTimeout(() => {
ee.emit("foo", "bar");
ee.emit("foo", 42); // lost in the queue
iterable.throw(_err);
}, 0);

const _err = new Error("kaboom");
let thrown = false;

const expected = [["bar"], [42]];

try {
for await (const event of iterable) {
assertEquals(event, expected.shift());
}
} catch (err) {
thrown = true;
assertEquals(err, _err);
}
assertEquals(thrown, true);
assertEquals(expected.length, 0);
assertEquals(ee.listenerCount("foo"), 0);
assertEquals(ee.listenerCount("error"), 0);
}
});