From 9eb29abdeb3e57323153ad7ef78b05be064d3e0f Mon Sep 17 00:00:00 2001 From: Phil Pluckthun Date: Thu, 20 Jul 2023 15:15:32 +0100 Subject: [PATCH 1/7] Add type for AsyncIterator/Iterable combo --- src/types.d.ts | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/types.d.ts b/src/types.d.ts index 164cafa..faa5093 100644 --- a/src/types.d.ts +++ b/src/types.d.ts @@ -200,3 +200,8 @@ export interface Subject extends Observer { /** The {@link Source} that issues the signals as the {@link Observer} methods are called. */ source: Source; } + +/** Async Iterable/Iterator after having converted a {@link Source}. + * @see {@link toAsyncIterable} for a helper that creates this structure. + */ +export interface SourceIterable extends AsyncIterator, AsyncIterable {} From b17a44333b28752f8b8295812d45d5c14a0ad7b3 Mon Sep 17 00:00:00 2001 From: Phil Pluckthun Date: Thu, 20 Jul 2023 15:16:12 +0100 Subject: [PATCH 2/7] Extract symbol helpers --- src/helpers.ts | 27 +++++++++++++++++++++++++++ src/observable.ts | 23 +---------------------- 2 files changed, 28 insertions(+), 22 deletions(-) diff --git a/src/helpers.ts b/src/helpers.ts index eb4b4a1..59dd229 100644 --- a/src/helpers.ts +++ b/src/helpers.ts @@ -1,5 +1,11 @@ import { TalkbackFn, TeardownFn, Start, Push, SignalKind } from './types'; +declare global { + interface SymbolConstructor { + readonly observable: symbol; + } +} + /** Placeholder {@link TeardownFn | teardown functions} that's a no-op. * @see {@link TeardownFn} for the definition and usage of teardowns. * @internal @@ -39,3 +45,24 @@ export function push(value: T): Push { 0: value, } as Push; } + +/** Returns the well-known symbol specifying the default AsyncIterator. + * @internal + */ +export const asyncIteratorSymbol = (): typeof Symbol.asyncIterator => + (typeof Symbol === 'function' && Symbol.asyncIterator) || ('@@asyncIterator' as any); + +/** Returns the well-known symbol specifying the default ES Observable. + * @privateRemarks + * This symbol is used to mark an object as a default ES Observable. By the specification, an object + * that abides by the default Observable implementation must carry a method set to this well-known + * symbol that returns the Observable implementation. It's common for this object to be an + * Observable itself and return itself on this method. + * + * @see {@link https://github.com/0no-co/wonka/issues/122} for notes on the intercompatibility + * between Observable implementations. + * + * @internal + */ +export const observableSymbol = (): typeof Symbol.observable => + Symbol.observable || ('@@observable' as any); diff --git a/src/observable.ts b/src/observable.ts index f226630..b80c079 100644 --- a/src/observable.ts +++ b/src/observable.ts @@ -1,11 +1,5 @@ import { Source, SignalKind, TalkbackKind } from './types'; -import { push, start, talkbackPlaceholder } from './helpers'; - -declare global { - interface SymbolConstructor { - readonly observable: symbol; - } -} +import { push, start, talkbackPlaceholder, observableSymbol } from './helpers'; /** A definition of the ES Observable Subscription type that is returned by * {@link Observable.subscribe} @@ -118,21 +112,6 @@ interface Observable { [Symbol.observable](): Observable; } -/** Returns the well-known symbol specifying the default ES Observable. - * @privateRemarks - * This symbol is used to mark an object as a default ES Observable. By the specification, an object - * that abides by the default Observable implementation must carry a method set to this well-known - * symbol that returns the Observable implementation. It's common for this object to be an - * Observable itself and return itself on this method. - * - * @see {@link https://github.com/0no-co/wonka/issues/122} for notes on the intercompatibility - * between Observable implementations. - * - * @internal - */ -const observableSymbol = (): typeof Symbol.observable => - Symbol.observable || ('@@observable' as any); - /** Converts an ES Observable to a {@link Source}. * @param input - The {@link ObservableLike} object that will be converted. * @returns A {@link Source} wrapping the passed Observable. From 49162491bfea1632d1ed432e4a086871e2136b85 Mon Sep 17 00:00:00 2001 From: Phil Pluckthun Date: Thu, 20 Jul 2023 15:16:25 +0100 Subject: [PATCH 3/7] Add more flexible AsyncIterator conversion helpers --- src/sinks.ts | 85 ++++++++++++++++++++++++++------------------------ src/sources.ts | 14 +++++++-- 2 files changed, 56 insertions(+), 43 deletions(-) diff --git a/src/sinks.ts b/src/sinks.ts index 739e346..e91eff1 100644 --- a/src/sinks.ts +++ b/src/sinks.ts @@ -1,5 +1,5 @@ -import { Source, Subscription, TalkbackKind, SignalKind } from './types'; -import { talkbackPlaceholder } from './helpers'; +import { Source, Subscription, TalkbackKind, SignalKind, SourceIterable } from './types'; +import { talkbackPlaceholder, asyncIteratorSymbol } from './helpers'; /** Creates a subscription to a given source and invokes a `subscriber` callback for each value. * @param subscriber - A callback function called for each issued value. @@ -124,49 +124,54 @@ const doneResult = { done: true } as IteratorReturnResult; * } * ``` */ -export const toAsyncIterable = (source: Source): AsyncIterable => ({ - [Symbol.asyncIterator](): AsyncIterator { - const buffer: T[] = []; +export const toAsyncIterable = (source: Source): SourceIterable => { + const buffer: T[] = []; - let ended = false; - let talkback = talkbackPlaceholder; - let next: ((value: IteratorResult) => void) | void; + let ended = false; + let started = false; + let talkback = talkbackPlaceholder; + let next: ((value: IteratorResult) => void) | void; - source(signal => { - if (ended) { - /*noop*/ - } else if (signal === SignalKind.End) { - if (next) next = next(doneResult); - ended = true; - } else if (signal.tag === SignalKind.Start) { - (talkback = signal[0])(TalkbackKind.Pull); - } else if (next) { - next = next({ value: signal[0], done: false }); - } else { - buffer.push(signal[0]); + return { + async next(): Promise> { + if (!started) { + started = true; + source(signal => { + if (ended) { + /*noop*/ + } else if (signal === SignalKind.End) { + if (next) next = next(doneResult); + ended = true; + } else if (signal.tag === SignalKind.Start) { + (talkback = signal[0])(TalkbackKind.Pull); + } else if (next) { + next = next({ value: signal[0], done: false }); + } else { + buffer.push(signal[0]); + } + }); } - }); - - return { - async next(): Promise> { - if (ended && !buffer.length) { - return doneResult; - } else if (!ended && buffer.length <= 1) { - talkback(TalkbackKind.Pull); - } - return buffer.length - ? { value: buffer.shift()!, done: false } - : new Promise(resolve => (next = resolve)); - }, - async return(): Promise> { - if (!ended) next = talkback(TalkbackKind.Close); - ended = true; + if (ended && !buffer.length) { return doneResult; - }, - }; - }, -}); + } else if (!ended && buffer.length <= 1) { + talkback(TalkbackKind.Pull); + } + + return buffer.length + ? { value: buffer.shift()!, done: false } + : new Promise(resolve => (next = resolve)); + }, + async return(): Promise> { + if (!ended) next = talkback(TalkbackKind.Close); + ended = true; + return doneResult; + }, + [asyncIteratorSymbol()](): SourceIterable { + return this; + }, + }; +}; /** Subscribes to a given source and collects all synchronous values into an array. * @param source - A {@link Source}. diff --git a/src/sources.ts b/src/sources.ts index abeb6c2..480f0de 100644 --- a/src/sources.ts +++ b/src/sources.ts @@ -1,5 +1,11 @@ import { Source, Sink, SignalKind, TalkbackKind, Observer, Subject, TeardownFn } from './types'; -import { push, start, talkbackPlaceholder, teardownPlaceholder } from './helpers'; +import { + push, + start, + talkbackPlaceholder, + teardownPlaceholder, + asyncIteratorSymbol, +} from './helpers'; import { share } from './operators'; /** Helper creating a Source from a factory function when it's subscribed to. @@ -45,9 +51,11 @@ export function lazy(produce: () => Source): Source { * @see {@link https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Iteration_protocols#the_async_iterator_and_async_iterable_protocols} * for the JS Iterable protocol. */ -export function fromAsyncIterable(iterable: AsyncIterable): Source { +export function fromAsyncIterable(iterable: AsyncIterable | AsyncIterator): Source { return sink => { - const iterator = iterable[Symbol.asyncIterator](); + const iterator: AsyncIterator = + (iterable[asyncIteratorSymbol()] && iterable[asyncIteratorSymbol()]()) || iterable; + let ended = false; let looping = false; let pulled = false; From 2dd532b3d9ba947730a653691ece69de28c1aa3b Mon Sep 17 00:00:00 2001 From: Phil Pluckthun Date: Thu, 20 Jul 2023 15:40:58 +0100 Subject: [PATCH 4/7] Update tests --- src/__tests__/sinks.test.ts | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/src/__tests__/sinks.test.ts b/src/__tests__/sinks.test.ts index e6f3fc7..99d53fe 100644 --- a/src/__tests__/sinks.test.ts +++ b/src/__tests__/sinks.test.ts @@ -244,10 +244,11 @@ describe('toAsyncIterable', () => { }; const asyncIterator = sinks.toAsyncIterable(source)[Symbol.asyncIterator](); + const next$ = asyncIterator.next(); - expect(pulls).toBe(1); + expect(pulls).toBe(2); sink!(push(0)); - expect(await asyncIterator.next()).toEqual({ value: 0, done: false }); + expect(await next$).toEqual({ value: 0, done: false }); expect(pulls).toBe(2); sink!(push(1)); @@ -273,13 +274,14 @@ describe('toAsyncIterable', () => { }; const asyncIterator = sinks.toAsyncIterable(source)[Symbol.asyncIterator](); + const next$ = asyncIterator.next(); sink!(push(0)); sink!(push(1)); sink!(SignalKind.End); - expect(pulls).toBe(1); - expect(await asyncIterator.next()).toEqual({ value: 0, done: false }); + expect(pulls).toBe(2); + expect(await next$).toEqual({ value: 0, done: false }); expect(await asyncIterator.next()).toEqual({ value: 1, done: false }); expect(await asyncIterator.next()).toEqual({ done: true }); }); @@ -298,7 +300,8 @@ describe('toAsyncIterable', () => { }; const asyncIterator = sinks.toAsyncIterable(source)[Symbol.asyncIterator](); - expect(pulls).toBe(1); + asyncIterator.next(); + expect(pulls).toBe(2); let resolved = false; @@ -330,9 +333,10 @@ describe('toAsyncIterable', () => { }; const asyncIterator = sinks.toAsyncIterable(source)[Symbol.asyncIterator](); + const next$ = asyncIterator.next(); sink!(push(0)); - expect(await asyncIterator.next()).toEqual({ value: 0, done: false }); + expect(await next$).toEqual({ value: 0, done: false }); expect(await asyncIterator.return!()).toEqual({ done: true }); sink!(push(1)); From 4cdc6a0aa81d7b0b2f98cb7c1a0d0fd2918dc03c Mon Sep 17 00:00:00 2001 From: Phil Pluckthun Date: Thu, 20 Jul 2023 16:01:28 +0100 Subject: [PATCH 5/7] Prevent duplicate pull signals --- src/__tests__/sinks.test.ts | 11 +++++------ src/sinks.ts | 14 ++++++++++---- 2 files changed, 15 insertions(+), 10 deletions(-) diff --git a/src/__tests__/sinks.test.ts b/src/__tests__/sinks.test.ts index 99d53fe..86a10ae 100644 --- a/src/__tests__/sinks.test.ts +++ b/src/__tests__/sinks.test.ts @@ -246,18 +246,17 @@ describe('toAsyncIterable', () => { const asyncIterator = sinks.toAsyncIterable(source)[Symbol.asyncIterator](); const next$ = asyncIterator.next(); - expect(pulls).toBe(2); sink!(push(0)); expect(await next$).toEqual({ value: 0, done: false }); - expect(pulls).toBe(2); + expect(pulls).toBe(1); sink!(push(1)); expect(await asyncIterator.next()).toEqual({ value: 1, done: false }); - expect(pulls).toBe(3); + expect(pulls).toBe(2); sink!(SignalKind.End); expect(await asyncIterator.next()).toEqual({ done: true }); - expect(pulls).toBe(3); + expect(pulls).toBe(2); }); it('buffers actively pushed values', async () => { @@ -280,7 +279,7 @@ describe('toAsyncIterable', () => { sink!(push(1)); sink!(SignalKind.End); - expect(pulls).toBe(2); + expect(pulls).toBe(1); expect(await next$).toEqual({ value: 0, done: false }); expect(await asyncIterator.next()).toEqual({ value: 1, done: false }); expect(await asyncIterator.next()).toEqual({ done: true }); @@ -301,7 +300,7 @@ describe('toAsyncIterable', () => { const asyncIterator = sinks.toAsyncIterable(source)[Symbol.asyncIterator](); asyncIterator.next(); - expect(pulls).toBe(2); + expect(pulls).toBe(1); let resolved = false; diff --git a/src/sinks.ts b/src/sinks.ts index e91eff1..69fccf4 100644 --- a/src/sinks.ts +++ b/src/sinks.ts @@ -129,6 +129,7 @@ export const toAsyncIterable = (source: Source): SourceIterable => { let ended = false; let started = false; + let pulled = false; let talkback = talkbackPlaceholder; let next: ((value: IteratorResult) => void) | void; @@ -143,18 +144,23 @@ export const toAsyncIterable = (source: Source): SourceIterable => { if (next) next = next(doneResult); ended = true; } else if (signal.tag === SignalKind.Start) { + pulled = true; (talkback = signal[0])(TalkbackKind.Pull); - } else if (next) { - next = next({ value: signal[0], done: false }); } else { - buffer.push(signal[0]); + pulled = false; + if (next) { + next = next({ value: signal[0], done: false }); + } else { + buffer.push(signal[0]); + } } }); } if (ended && !buffer.length) { return doneResult; - } else if (!ended && buffer.length <= 1) { + } else if (!ended && !pulled && buffer.length <= 1) { + pulled = true; talkback(TalkbackKind.Pull); } From b2dc6271e211a4562b58a629eadc070031a24d17 Mon Sep 17 00:00:00 2001 From: Phil Pluckthun Date: Thu, 20 Jul 2023 16:04:02 +0100 Subject: [PATCH 6/7] Add changeset --- .changeset/yellow-hounds-heal.md | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 .changeset/yellow-hounds-heal.md diff --git a/.changeset/yellow-hounds-heal.md b/.changeset/yellow-hounds-heal.md new file mode 100644 index 0000000..942b6c8 --- /dev/null +++ b/.changeset/yellow-hounds-heal.md @@ -0,0 +1,5 @@ +--- +'wonka': patch +--- + +Improve compatibility of `fromAsyncIterable` and `toAsyncIterable`. The `toAsyncIterable` will now output an object that's both an `AsyncIterator` and an `AsyncIterable`. Both helpers will now use a polyfill for `Symbol.asyncIterator` to improve compatibility with the Hermes engine and Babel transpilation. From 5388531451abab45c986fc5583b845bf7e3ae68b Mon Sep 17 00:00:00 2001 From: Phil Pluckthun Date: Thu, 20 Jul 2023 16:22:45 +0100 Subject: [PATCH 7/7] Align symbol helpers --- src/helpers.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/helpers.ts b/src/helpers.ts index 59dd229..c237eb7 100644 --- a/src/helpers.ts +++ b/src/helpers.ts @@ -65,4 +65,4 @@ export const asyncIteratorSymbol = (): typeof Symbol.asyncIterator => * @internal */ export const observableSymbol = (): typeof Symbol.observable => - Symbol.observable || ('@@observable' as any); + (typeof Symbol === 'function' && Symbol.observable) || ('@@observable' as any);