diff --git a/src/testing/internal/ObservableStream.ts b/src/testing/internal/ObservableStream.ts index b19be0d469b..e7d8bd3e757 100644 --- a/src/testing/internal/ObservableStream.ts +++ b/src/testing/internal/ObservableStream.ts @@ -1,4 +1,5 @@ import type { Observable } from "../../utilities/index.js"; +import { ReadableStream } from "node:stream/web"; interface TakeOptions { timeout?: number; @@ -8,40 +9,23 @@ type ObservableEvent = | { type: "error"; error: any } | { type: "complete" }; -async function* observableToAsyncEventIterator(observable: Observable) { - let resolveNext: (value: ObservableEvent) => void; - const promises: Promise>[] = []; - queuePromise(); - - function queuePromise() { - promises.push( - new Promise>((resolve) => { - resolveNext = (event: ObservableEvent) => { - resolve(event); - queuePromise(); - }; - }) - ); - } - - observable.subscribe( - (value) => resolveNext({ type: "next", value }), - (error) => resolveNext({ type: "error", error }), - () => resolveNext({ type: "complete" }) - ); - yield "initialization value" as unknown as Promise>; - - while (true) { - yield promises.shift()!; +export class ObservableStream { + private reader: ReadableStreamDefaultReader>; + constructor(observable: Observable) { + this.reader = new ReadableStream>({ + start(controller) { + observable.subscribe( + (value) => controller.enqueue({ type: "next", value }), + (error) => controller.enqueue({ type: "error", error }), + () => controller.enqueue({ type: "complete" }) + ); + }, + }).getReader(); } -} - -class IteratorStream { - constructor(private iterator: AsyncGenerator) {} - async take({ timeout = 100 }: TakeOptions = {}): Promise { + take({ timeout = 100 }: TakeOptions = {}) { return Promise.race([ - this.iterator.next().then((result) => result.value!), + this.reader.read().then((result) => result.value!), new Promise((_, reject) => { setTimeout( reject, @@ -51,16 +35,6 @@ class IteratorStream { }), ]); } -} - -export class ObservableStream extends IteratorStream> { - constructor(observable: Observable) { - const iterator = observableToAsyncEventIterator(observable); - // we need to call next() once to start the generator so we immediately subscribe. - // the first value is always "initialization value" which we don't care about - iterator.next(); - super(iterator); - } async takeNext(options?: TakeOptions): Promise { const event = await this.take(options);