From 4cdc6a0aa81d7b0b2f98cb7c1a0d0fd2918dc03c Mon Sep 17 00:00:00 2001 From: Phil Pluckthun Date: Thu, 20 Jul 2023 16:01:28 +0100 Subject: [PATCH] Prevent duplicate pull signals --- src/__tests__/sinks.test.ts | 11 +++++------ src/sinks.ts | 14 ++++++++++---- 2 files changed, 15 insertions(+), 10 deletions(-) diff --git a/src/__tests__/sinks.test.ts b/src/__tests__/sinks.test.ts index 99d53fe..86a10ae 100644 --- a/src/__tests__/sinks.test.ts +++ b/src/__tests__/sinks.test.ts @@ -246,18 +246,17 @@ describe('toAsyncIterable', () => { const asyncIterator = sinks.toAsyncIterable(source)[Symbol.asyncIterator](); const next$ = asyncIterator.next(); - expect(pulls).toBe(2); sink!(push(0)); expect(await next$).toEqual({ value: 0, done: false }); - expect(pulls).toBe(2); + expect(pulls).toBe(1); sink!(push(1)); expect(await asyncIterator.next()).toEqual({ value: 1, done: false }); - expect(pulls).toBe(3); + expect(pulls).toBe(2); sink!(SignalKind.End); expect(await asyncIterator.next()).toEqual({ done: true }); - expect(pulls).toBe(3); + expect(pulls).toBe(2); }); it('buffers actively pushed values', async () => { @@ -280,7 +279,7 @@ describe('toAsyncIterable', () => { sink!(push(1)); sink!(SignalKind.End); - expect(pulls).toBe(2); + expect(pulls).toBe(1); expect(await next$).toEqual({ value: 0, done: false }); expect(await asyncIterator.next()).toEqual({ value: 1, done: false }); expect(await asyncIterator.next()).toEqual({ done: true }); @@ -301,7 +300,7 @@ describe('toAsyncIterable', () => { const asyncIterator = sinks.toAsyncIterable(source)[Symbol.asyncIterator](); asyncIterator.next(); - expect(pulls).toBe(2); + expect(pulls).toBe(1); let resolved = false; diff --git a/src/sinks.ts b/src/sinks.ts index e91eff1..69fccf4 100644 --- a/src/sinks.ts +++ b/src/sinks.ts @@ -129,6 +129,7 @@ export const toAsyncIterable = (source: Source): SourceIterable => { let ended = false; let started = false; + let pulled = false; let talkback = talkbackPlaceholder; let next: ((value: IteratorResult) => void) | void; @@ -143,18 +144,23 @@ export const toAsyncIterable = (source: Source): SourceIterable => { if (next) next = next(doneResult); ended = true; } else if (signal.tag === SignalKind.Start) { + pulled = true; (talkback = signal[0])(TalkbackKind.Pull); - } else if (next) { - next = next({ value: signal[0], done: false }); } else { - buffer.push(signal[0]); + pulled = false; + if (next) { + next = next({ value: signal[0], done: false }); + } else { + buffer.push(signal[0]); + } } }); } if (ended && !buffer.length) { return doneResult; - } else if (!ended && buffer.length <= 1) { + } else if (!ended && !pulled && buffer.length <= 1) { + pulled = true; talkback(TalkbackKind.Pull); }