Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Improve compatibility of AsyncIterable helpers for polyfills (Babel/Hermes related) #165

Merged
merged 7 commits into from
Jul 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/yellow-hounds-heal.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'wonka': patch
---

Improve compatibility of `fromAsyncIterable` and `toAsyncIterable`. The `toAsyncIterable` will now output an object that's both an `AsyncIterator` and an `AsyncIterable`. Both helpers will now use a polyfill for `Symbol.asyncIterator` to improve compatibility with the Hermes engine and Babel transpilation.
17 changes: 10 additions & 7 deletions src/__tests__/sinks.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -244,19 +244,19 @@ describe('toAsyncIterable', () => {
};

const asyncIterator = sinks.toAsyncIterable(source)[Symbol.asyncIterator]();
const next$ = asyncIterator.next();

expect(pulls).toBe(1);
sink!(push(0));
expect(await asyncIterator.next()).toEqual({ value: 0, done: false });
expect(pulls).toBe(2);
expect(await next$).toEqual({ value: 0, done: false });
expect(pulls).toBe(1);

sink!(push(1));
expect(await asyncIterator.next()).toEqual({ value: 1, done: false });
expect(pulls).toBe(3);
expect(pulls).toBe(2);

sink!(SignalKind.End);
expect(await asyncIterator.next()).toEqual({ done: true });
expect(pulls).toBe(3);
expect(pulls).toBe(2);
});

it('buffers actively pushed values', async () => {
Expand All @@ -273,13 +273,14 @@ describe('toAsyncIterable', () => {
};

const asyncIterator = sinks.toAsyncIterable(source)[Symbol.asyncIterator]();
const next$ = asyncIterator.next();

sink!(push(0));
sink!(push(1));
sink!(SignalKind.End);

expect(pulls).toBe(1);
expect(await asyncIterator.next()).toEqual({ value: 0, done: false });
expect(await next$).toEqual({ value: 0, done: false });
expect(await asyncIterator.next()).toEqual({ value: 1, done: false });
expect(await asyncIterator.next()).toEqual({ done: true });
});
Expand All @@ -298,6 +299,7 @@ describe('toAsyncIterable', () => {
};

const asyncIterator = sinks.toAsyncIterable(source)[Symbol.asyncIterator]();
asyncIterator.next();
expect(pulls).toBe(1);

let resolved = false;
Expand Down Expand Up @@ -330,9 +332,10 @@ describe('toAsyncIterable', () => {
};

const asyncIterator = sinks.toAsyncIterable(source)[Symbol.asyncIterator]();
const next$ = asyncIterator.next();

sink!(push(0));
expect(await asyncIterator.next()).toEqual({ value: 0, done: false });
expect(await next$).toEqual({ value: 0, done: false });
expect(await asyncIterator.return!()).toEqual({ done: true });

sink!(push(1));
Expand Down
27 changes: 27 additions & 0 deletions src/helpers.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
import { TalkbackFn, TeardownFn, Start, Push, SignalKind } from './types';

declare global {
interface SymbolConstructor {
readonly observable: symbol;
}
}

/** Placeholder {@link TeardownFn | teardown functions} that's a no-op.
* @see {@link TeardownFn} for the definition and usage of teardowns.
* @internal
Expand Down Expand Up @@ -39,3 +45,24 @@ export function push<T>(value: T): Push<T> {
0: value,
} as Push<T>;
}

/** Returns the well-known symbol specifying the default AsyncIterator.
* @internal
*/
export const asyncIteratorSymbol = (): typeof Symbol.asyncIterator =>
(typeof Symbol === 'function' && Symbol.asyncIterator) || ('@@asyncIterator' as any);

/** Returns the well-known symbol specifying the default ES Observable.
* @privateRemarks
* This symbol is used to mark an object as a default ES Observable. By the specification, an object
* that abides by the default Observable implementation must carry a method set to this well-known
* symbol that returns the Observable implementation. It's common for this object to be an
* Observable itself and return itself on this method.
*
* @see {@link https://github.com/0no-co/wonka/issues/122} for notes on the intercompatibility
* between Observable implementations.
*
* @internal
*/
export const observableSymbol = (): typeof Symbol.observable =>
(typeof Symbol === 'function' && Symbol.observable) || ('@@observable' as any);
23 changes: 1 addition & 22 deletions src/observable.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,5 @@
import { Source, SignalKind, TalkbackKind } from './types';
import { push, start, talkbackPlaceholder } from './helpers';

declare global {
interface SymbolConstructor {
readonly observable: symbol;
}
}
import { push, start, talkbackPlaceholder, observableSymbol } from './helpers';

/** A definition of the ES Observable Subscription type that is returned by
* {@link Observable.subscribe}
Expand Down Expand Up @@ -118,21 +112,6 @@ interface Observable<T> {
[Symbol.observable](): Observable<T>;
}

/** Returns the well-known symbol specifying the default ES Observable.
* @privateRemarks
* This symbol is used to mark an object as a default ES Observable. By the specification, an object
* that abides by the default Observable implementation must carry a method set to this well-known
* symbol that returns the Observable implementation. It's common for this object to be an
* Observable itself and return itself on this method.
*
* @see {@link https://github.com/0no-co/wonka/issues/122} for notes on the intercompatibility
* between Observable implementations.
*
* @internal
*/
const observableSymbol = (): typeof Symbol.observable =>
Symbol.observable || ('@@observable' as any);

/** Converts an ES Observable to a {@link Source}.
* @param input - The {@link ObservableLike} object that will be converted.
* @returns A {@link Source} wrapping the passed Observable.
Expand Down
91 changes: 51 additions & 40 deletions src/sinks.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { Source, Subscription, TalkbackKind, SignalKind } from './types';
import { talkbackPlaceholder } from './helpers';
import { Source, Subscription, TalkbackKind, SignalKind, SourceIterable } from './types';
import { talkbackPlaceholder, asyncIteratorSymbol } from './helpers';

/** Creates a subscription to a given source and invokes a `subscriber` callback for each value.
* @param subscriber - A callback function called for each issued value.
Expand Down Expand Up @@ -124,49 +124,60 @@ const doneResult = { done: true } as IteratorReturnResult<void>;
* }
* ```
*/
export const toAsyncIterable = <T>(source: Source<T>): AsyncIterable<T> => ({
[Symbol.asyncIterator](): AsyncIterator<T> {
const buffer: T[] = [];
export const toAsyncIterable = <T>(source: Source<T>): SourceIterable<T> => {
const buffer: T[] = [];

let ended = false;
let talkback = talkbackPlaceholder;
let next: ((value: IteratorResult<T>) => void) | void;
let ended = false;
let started = false;
let pulled = false;
let talkback = talkbackPlaceholder;
let next: ((value: IteratorResult<T>) => void) | void;

source(signal => {
if (ended) {
/*noop*/
} else if (signal === SignalKind.End) {
if (next) next = next(doneResult);
ended = true;
} else if (signal.tag === SignalKind.Start) {
(talkback = signal[0])(TalkbackKind.Pull);
} else if (next) {
next = next({ value: signal[0], done: false });
} else {
buffer.push(signal[0]);
return {
async next(): Promise<IteratorResult<T>> {
if (!started) {
started = true;
source(signal => {
if (ended) {
/*noop*/
} else if (signal === SignalKind.End) {
if (next) next = next(doneResult);
ended = true;
} else if (signal.tag === SignalKind.Start) {
pulled = true;
(talkback = signal[0])(TalkbackKind.Pull);
} else {
pulled = false;
if (next) {
next = next({ value: signal[0], done: false });
} else {
buffer.push(signal[0]);
}
}
});
}
});

return {
async next(): Promise<IteratorResult<T>> {
if (ended && !buffer.length) {
return doneResult;
} else if (!ended && buffer.length <= 1) {
talkback(TalkbackKind.Pull);
}

return buffer.length
? { value: buffer.shift()!, done: false }
: new Promise(resolve => (next = resolve));
},
async return(): Promise<IteratorReturnResult<void>> {
if (!ended) next = talkback(TalkbackKind.Close);
ended = true;
if (ended && !buffer.length) {
return doneResult;
},
};
},
});
} else if (!ended && !pulled && buffer.length <= 1) {
pulled = true;
talkback(TalkbackKind.Pull);
}

return buffer.length
? { value: buffer.shift()!, done: false }
: new Promise(resolve => (next = resolve));
},
async return(): Promise<IteratorReturnResult<void>> {
if (!ended) next = talkback(TalkbackKind.Close);
ended = true;
return doneResult;
},
[asyncIteratorSymbol()](): SourceIterable<T> {
return this;
},
};
};

/** Subscribes to a given source and collects all synchronous values into an array.
* @param source - A {@link Source}.
Expand Down
14 changes: 11 additions & 3 deletions src/sources.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
import { Source, Sink, SignalKind, TalkbackKind, Observer, Subject, TeardownFn } from './types';
import { push, start, talkbackPlaceholder, teardownPlaceholder } from './helpers';
import {
push,
start,
talkbackPlaceholder,
teardownPlaceholder,
asyncIteratorSymbol,
} from './helpers';
import { share } from './operators';

/** Helper creating a Source from a factory function when it's subscribed to.
Expand Down Expand Up @@ -45,9 +51,11 @@ export function lazy<T>(produce: () => Source<T>): Source<T> {
* @see {@link https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Iteration_protocols#the_async_iterator_and_async_iterable_protocols}
* for the JS Iterable protocol.
*/
export function fromAsyncIterable<T>(iterable: AsyncIterable<T>): Source<T> {
export function fromAsyncIterable<T>(iterable: AsyncIterable<T> | AsyncIterator<T>): Source<T> {
return sink => {
const iterator = iterable[Symbol.asyncIterator]();
const iterator: AsyncIterator<T> =
(iterable[asyncIteratorSymbol()] && iterable[asyncIteratorSymbol()]()) || iterable;

let ended = false;
let looping = false;
let pulled = false;
Expand Down
5 changes: 5 additions & 0 deletions src/types.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -200,3 +200,8 @@ export interface Subject<T> extends Observer<T> {
/** The {@link Source} that issues the signals as the {@link Observer} methods are called. */
source: Source<T>;
}

/** Async Iterable/Iterator after having converted a {@link Source}.
* @see {@link toAsyncIterable} for a helper that creates this structure.
*/
export interface SourceIterable<T> extends AsyncIterator<T>, AsyncIterable<T> {}
Loading