Skip to content

Commit

Permalink
Fix logic bug discovered by @fcollonval
Browse files Browse the repository at this point in the history
  • Loading branch information
afshin committed Nov 9, 2022
1 parent 91af63a commit d6f4ff8
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 40 deletions.
42 changes: 19 additions & 23 deletions packages/signaling/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -86,11 +86,9 @@ export interface ISignal<T, U> {
}

/**
* An object that is both a signal and an async iterable iterator.
* An object that is both a signal and an async iterable.
*/
export interface IStream<T, U>
extends ISignal<T, U>,
AsyncIterableIterator<U> {}
export interface IStream<T, U> extends ISignal<T, U>, AsyncIterable<U> {}

/**
* A concrete implementation of `ISignal`.
Expand Down Expand Up @@ -350,15 +348,18 @@ export namespace Signal {
}

/**
* A stream with the characteristics of a signal and an async iterator.
* A stream with the characteristics of a signal and an async iterable.
*/
export class Stream<T, U> extends Signal<T, U> implements IStream<T, U> {
/**
* Return an async iterator that yields every emission.
*/
async *[Symbol.asyncIterator](): AsyncIterableIterator<U> {
while (this.pending) {
yield this.pending.promise;
let pending = this.pending;
while (true) {
const resolved = await pending.promise;
pending = resolved.next;
yield resolved.args;
}
}

Expand All @@ -368,34 +369,29 @@ export class Stream<T, U> extends Signal<T, U> implements IStream<T, U> {
* @param args - The args to pass to the connected slots.
*/
emit(args: U): void {
if (this.blocked) {
return;
if (!this.blocked) {
const { pending } = this;
this.pending = new PromiseDelegate();
pending.resolve({ args, next: this.pending });
super.emit(args);
}
const { pending } = this;
super.emit(args);
this.pending = new PromiseDelegate();
pending.resolve(args);
}

/**
* Await the next value of the stream.
*
* @returns the next async iterator value in the stream.
*/
async next(): Promise<IteratorResult<U>> {
return { value: await this.pending.promise };
}

/**
* A promise that resolves the currently pending iteration.
*/
protected pending = new PromiseDelegate<U>();
protected pending: Private.Pending<U> = new PromiseDelegate();
}

/**
* The namespace for the module implementation details.
*/
namespace Private {
/**
* A pending promise in a promise chain underlying a stream.
*/
export type Pending<U> = PromiseDelegate<{ args: U; next: Pending<U> }>;

/**
* The signal exception handler function.
*/
Expand Down
37 changes: 20 additions & 17 deletions packages/signaling/tests/src/index.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -587,31 +587,34 @@ describe('@lumino/signaling', () => {
describe('#[Symbol.asyncIterator]()', () => {
it('should yield emissions and respected blocking', async () => {
const stream = new Stream<unknown, string>({});
const expected = 'async';
const input = 'async';
const expected = 'aINTERRUPTEDsync';
let emitted = '';
let once = true;
stream.connect((_, emitted) => {
if (once) {
once = false;
stream.emit('I');
stream.emit('N');
stream.emit('T');
stream.emit('E');
stream.emit('R');
stream.emit('R');
stream.emit('U');
stream.emit('P');
stream.emit('T');
stream.emit('E');
stream.emit('D');
}
});
setTimeout(() => stream.block(() => stream.emit('BLOCKED EMISSION 1')));
expected.split('').forEach(x => setTimeout(() => stream.emit(x)));
input.split('').forEach(x => setTimeout(() => stream.emit(x)));
setTimeout(() => stream.block(() => stream.emit('BLOCKED EMISSION 2')));
for await (const letter of stream) {
emitted = emitted.concat(letter);
if (emitted === expected) break;
}
});
});

describe('#next()', () => {
it('should resolve an iterator result and respect blocking', async () => {
const stream = new Stream<unknown, string>({});
const expected = 'next';
let emitted = '';
setTimeout(() => stream.block(() => stream.emit('BLOCKED EMISSION 1')));
expected.split('').forEach(x => setTimeout(() => stream.emit(x)));
setTimeout(() => stream.block(() => stream.emit('BLOCKED EMISSION 2')));
for (let it = await stream.next(); !it.done; it = await stream.next()) {
emitted = emitted.concat(it.value);
if (emitted === expected) break;
}
});
});
});
});

0 comments on commit d6f4ff8

Please sign in to comment.