diff --git a/.changeset/stream-race.md b/.changeset/stream-race.md new file mode 100644 index 0000000000..6d0509de99 --- /dev/null +++ b/.changeset/stream-race.md @@ -0,0 +1,25 @@ +--- +"effect": minor +--- + + +feat(Stream): implement `race` operator, which accepts two upstreams and returns a stream that mirrors the first upstream to emit an item and interrupts the other upstream. + +```ts +import { Stream, Schedule, Console, Effect } from "effect" + +const stream = Stream.fromSchedule(Schedule.spaced('2 millis')).pipe( + Stream.race(Stream.fromSchedule(Schedule.spaced('1 millis'))), + Stream.take(6), + Stream.tap(n => Console.log(n)) +) + +Effect.runPromise(Stream.runDrain(stream)) +// Output each millisecond from the first stream, the rest streams are interrupted +// 0 +// 1 +// 2 +// 3 +// 4 +// 5 +``` \ No newline at end of file diff --git a/packages/effect/src/Stream.ts b/packages/effect/src/Stream.ts index 28598f96b3..8197a130bc 100644 --- a/packages/effect/src/Stream.ts +++ b/packages/effect/src/Stream.ts @@ -3472,6 +3472,42 @@ export const provideSomeLayer: { ): Stream> } = internal.provideSomeLayer +/** + * Returns a stream that mirrors the first upstream to emit an item. + * As soon as one of the upstream emits a first value, the other is interrupted. + * The resulting stream will forward all items from the "winning" source stream. + * Any upstream failures will cause the returned stream to fail. + * + * @example + * import { Stream, Schedule, Console, Effect } from "effect" + * + * const stream = Stream.fromSchedule(Schedule.spaced('2 millis')).pipe( + * Stream.race(Stream.fromSchedule(Schedule.spaced('1 millis'))), + * Stream.take(6), + * Stream.tap(Console.log) + * ) + * + * Effect.runPromise(Stream.runDrain(stream)) + * // Output each millisecond from the first stream, the rest streams are interrupted + * // 0 + * // 1 + * // 2 + * // 3 + * // 4 + * // 5 + * @since 3.7.0 + * @category racing + */ +export const race: { + ( + right: Stream + ): (left: Stream) => Stream + ( + left: Stream, + right: Stream + ): Stream +} = internal.race + /** * Returns a stream that mirrors the first upstream to emit an item. * As soon as one of the upstream emits a first value, all the others are interrupted. diff --git a/packages/effect/src/internal/stream.ts b/packages/effect/src/internal/stream.ts index 0f548bb835..e3c1a291d2 100644 --- a/packages/effect/src/internal/stream.ts +++ b/packages/effect/src/internal/stream.ts @@ -4873,6 +4873,24 @@ export const range = (min: number, max: number, chunkSize = DefaultChunkSize): S return new StreamImpl(go(min, max, chunkSize)) }) +/** @internal */ +export const race: { + ( + right: Stream.Stream + ): (left: Stream.Stream) => Stream.Stream + ( + left: Stream.Stream, + right: Stream.Stream + ): Stream.Stream +} = dual( + 2, + ( + left: Stream.Stream, + right: Stream.Stream + ): Stream.Stream => raceAll(left, right) +) + +/** @internal */ export const raceAll = >>( ...streams: S ): Stream.Stream<