Skip to content

Commit

Permalink
Prevent duplicate pull signals
Browse files Browse the repository at this point in the history
  • Loading branch information
kitten committed Jul 20, 2023
1 parent 2dd532b commit 4cdc6a0
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 10 deletions.
11 changes: 5 additions & 6 deletions src/__tests__/sinks.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -246,18 +246,17 @@ describe('toAsyncIterable', () => {
const asyncIterator = sinks.toAsyncIterable(source)[Symbol.asyncIterator]();
const next$ = asyncIterator.next();

expect(pulls).toBe(2);
sink!(push(0));
expect(await next$).toEqual({ value: 0, done: false });
expect(pulls).toBe(2);
expect(pulls).toBe(1);

sink!(push(1));
expect(await asyncIterator.next()).toEqual({ value: 1, done: false });
expect(pulls).toBe(3);
expect(pulls).toBe(2);

sink!(SignalKind.End);
expect(await asyncIterator.next()).toEqual({ done: true });
expect(pulls).toBe(3);
expect(pulls).toBe(2);
});

it('buffers actively pushed values', async () => {
Expand All @@ -280,7 +279,7 @@ describe('toAsyncIterable', () => {
sink!(push(1));
sink!(SignalKind.End);

expect(pulls).toBe(2);
expect(pulls).toBe(1);
expect(await next$).toEqual({ value: 0, done: false });
expect(await asyncIterator.next()).toEqual({ value: 1, done: false });
expect(await asyncIterator.next()).toEqual({ done: true });
Expand All @@ -301,7 +300,7 @@ describe('toAsyncIterable', () => {

const asyncIterator = sinks.toAsyncIterable(source)[Symbol.asyncIterator]();
asyncIterator.next();
expect(pulls).toBe(2);
expect(pulls).toBe(1);

let resolved = false;

Expand Down
14 changes: 10 additions & 4 deletions src/sinks.ts
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ export const toAsyncIterable = <T>(source: Source<T>): SourceIterable<T> => {

let ended = false;
let started = false;
let pulled = false;
let talkback = talkbackPlaceholder;
let next: ((value: IteratorResult<T>) => void) | void;

Expand All @@ -143,18 +144,23 @@ export const toAsyncIterable = <T>(source: Source<T>): SourceIterable<T> => {
if (next) next = next(doneResult);
ended = true;
} else if (signal.tag === SignalKind.Start) {
pulled = true;
(talkback = signal[0])(TalkbackKind.Pull);
} else if (next) {
next = next({ value: signal[0], done: false });
} else {
buffer.push(signal[0]);
pulled = false;
if (next) {
next = next({ value: signal[0], done: false });
} else {
buffer.push(signal[0]);
}
}
});
}

if (ended && !buffer.length) {
return doneResult;
} else if (!ended && buffer.length <= 1) {
} else if (!ended && !pulled && buffer.length <= 1) {
pulled = true;
talkback(TalkbackKind.Pull);
}

Expand Down

0 comments on commit 4cdc6a0

Please sign in to comment.