Skip to content

Commit

Permalink
feat: Asynchronous event iteration node polyfill (denoland/deno#4016)
Browse files Browse the repository at this point in the history
  • Loading branch information
cknight authored Feb 17, 2020
1 parent 922d56e commit 4e17917
Show file tree
Hide file tree
Showing 2 changed files with 299 additions and 1 deletion.
118 changes: 118 additions & 0 deletions node/events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -393,3 +393,121 @@ export function once(
}
});
}

// eslint-disable-next-line @typescript-eslint/no-explicit-any
function createIterResult(value: any, done: boolean): IteratorResult<any> {
return { value, done };
}

interface AsyncInterable {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
next(): Promise<IteratorResult<any, any>>;
// eslint-disable-next-line @typescript-eslint/no-explicit-any
return(): Promise<IteratorResult<any, any>>;
throw(err: Error): void;
// eslint-disable-next-line @typescript-eslint/no-explicit-any
[Symbol.asyncIterator](): any;
}

/**
* Returns an AsyncIterator that iterates eventName events. It will throw if
* the EventEmitter emits 'error'. It removes all listeners when exiting the
* loop. The value returned by each iteration is an array composed of the
* emitted event arguments.
*/
export function on(
emitter: EventEmitter,
event: string | symbol
): AsyncInterable {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
const unconsumedEventValues: any[] = [];
const unconsumedPromises = [];
let error = null;
let finished = false;

const iterator = {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
next(): Promise<IteratorResult<any>> {
// First, we consume all unread events
// eslint-disable-next-line @typescript-eslint/no-explicit-any
const value: any = unconsumedEventValues.shift();
if (value) {
return Promise.resolve(createIterResult(value, false));
}

// Then we error, if an error happened
// This happens one time if at all, because after 'error'
// we stop listening
if (error) {
const p: Promise<never> = Promise.reject(error);
// Only the first element errors
error = null;
return p;
}

// If the iterator is finished, resolve to done
if (finished) {
return Promise.resolve(createIterResult(undefined, true));
}

// Wait until an event happens
return new Promise(function(resolve, reject) {
unconsumedPromises.push({ resolve, reject });
});
},

// eslint-disable-next-line @typescript-eslint/no-explicit-any
return(): Promise<IteratorResult<any>> {
emitter.removeListener(event, eventHandler);
emitter.removeListener("error", errorHandler);
finished = true;

for (const promise of unconsumedPromises) {
promise.resolve(createIterResult(undefined, true));
}

return Promise.resolve(createIterResult(undefined, true));
},

throw(err: Error): void {
error = err;
emitter.removeListener(event, eventHandler);
emitter.removeListener("error", errorHandler);
},

// eslint-disable-next-line @typescript-eslint/no-explicit-any
[Symbol.asyncIterator](): AsyncIterable<any> {
return this;
}
};

emitter.on(event, eventHandler);
emitter.on("error", errorHandler);

return iterator;

// eslint-disable-next-line @typescript-eslint/no-explicit-any
function eventHandler(...args: any[]): void {
const promise = unconsumedPromises.shift();
if (promise) {
promise.resolve(createIterResult(args, false));
} else {
unconsumedEventValues.push(args);
}
}

// eslint-disable-next-line @typescript-eslint/no-explicit-any
function errorHandler(err: any): void {
finished = true;

const toError = unconsumedPromises.shift();
if (toError) {
toError.reject(err);
} else {
// The next time we call next()
error = err;
}

iterator.return();
}
}
182 changes: 181 additions & 1 deletion node/events_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import {
fail,
assertThrows
} from "../testing/asserts.ts";
import EventEmitter, { WrappedFunction, once } from "./events.ts";
import EventEmitter, { WrappedFunction, once, on } from "./events.ts";

const shouldNeverBeEmitted: Function = () => {
fail("Should never be called");
Expand Down Expand Up @@ -439,3 +439,183 @@ test({
assertEquals(events, ["errorMonitor event", "error"]);
}
});

test({
name: "asyncronous iteration of events are handled as expected",
async fn() {
const ee = new EventEmitter();
setTimeout(() => {
ee.emit("foo", "bar");
ee.emit("bar", 24);
ee.emit("foo", 42);
}, 0);

const iterable = on(ee, "foo");

const expected = [["bar"], [42]];

for await (const event of iterable) {
const current = expected.shift();

assertEquals(current, event);

if (expected.length === 0) {
break;
}
}
assertEquals(ee.listenerCount("foo"), 0);
assertEquals(ee.listenerCount("error"), 0);
}
});

test({
name: "asyncronous error handling of emitted events works as expected",
async fn() {
const ee = new EventEmitter();
const _err = new Error("kaboom");
setTimeout(() => {
ee.emit("error", _err);
}, 0);

const iterable = on(ee, "foo");
let thrown = false;

try {
// eslint-disable-next-line @typescript-eslint/no-unused-vars
for await (const event of iterable) {
fail("no events should be processed due to the error thrown");
}
} catch (err) {
thrown = true;
assertEquals(err, _err);
}
assertEquals(thrown, true);
}
});

test({
name: "error thrown during asyncronous processing of events is handled",
async fn() {
const ee = new EventEmitter();
const _err = new Error("kaboom");
setTimeout(() => {
ee.emit("foo", 42);
ee.emit("error", _err);
}, 0);

const iterable = on(ee, "foo");
const expected = [[42]];
let thrown = false;

try {
for await (const event of iterable) {
const current = expected.shift();
assertEquals(current, event);
}
} catch (err) {
thrown = true;
assertEquals(err, _err);
}
assertEquals(thrown, true);
assertEquals(ee.listenerCount("foo"), 0);
assertEquals(ee.listenerCount("error"), 0);
}
});

test({
name:
"error thrown in processing loop of asyncronous event prevents processing of additional events",
async fn() {
const ee = new EventEmitter();
const _err = new Error("kaboom");

setTimeout(() => {
ee.emit("foo", 42);
ee.emit("foo", 999);
}, 0);

try {
for await (const event of on(ee, "foo")) {
assertEquals(event, [42]);
throw _err;
}
} catch (err) {
assertEquals(err, _err);
}

assertEquals(ee.listenerCount("foo"), 0);
assertEquals(ee.listenerCount("error"), 0);
}
});

test({
name: "asyncronous iterator next() works as expected",
async fn() {
const ee = new EventEmitter();
const iterable = on(ee, "foo");

setTimeout(function() {
ee.emit("foo", "bar");
ee.emit("foo", 42);
iterable.return();
}, 0);

const results = await Promise.all([
iterable.next(),
iterable.next(),
iterable.next()
]);

assertEquals(results, [
{
value: ["bar"],
done: false
},
{
value: [42],
done: false
},
{
value: undefined,
done: true
}
]);

assertEquals(await iterable.next(), {
value: undefined,
done: true
});
}
});

test({
name: "async iterable throw handles various scenarios",
async fn() {
const ee = new EventEmitter();
const iterable = on(ee, "foo");

setTimeout(() => {
ee.emit("foo", "bar");
ee.emit("foo", 42); // lost in the queue
iterable.throw(_err);
}, 0);

const _err = new Error("kaboom");
let thrown = false;

const expected = [["bar"], [42]];

try {
for await (const event of iterable) {
assertEquals(event, expected.shift());
}
} catch (err) {
thrown = true;
assertEquals(err, _err);
}
assertEquals(thrown, true);
assertEquals(expected.length, 0);
assertEquals(ee.listenerCount("foo"), 0);
assertEquals(ee.listenerCount("error"), 0);
}
});

0 comments on commit 4e17917

Please sign in to comment.