Skip to content

Commit

Permalink
feat(Stream): race (#3305)
Browse files Browse the repository at this point in the history
Co-authored-by: Tim <hello@timsmart.co>
  • Loading branch information
dilame and tim-smart committed Aug 27, 2024
1 parent 601a657 commit 16e3b3c
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 0 deletions.
25 changes: 25 additions & 0 deletions .changeset/stream-race.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
---
"effect": minor
---


feat(Stream): implement `race` operator, which accepts two upstreams and returns a stream that mirrors the first upstream to emit an item and interrupts the other upstream.

```ts
import { Stream, Schedule, Console, Effect } from "effect"

const stream = Stream.fromSchedule(Schedule.spaced('2 millis')).pipe(
Stream.race(Stream.fromSchedule(Schedule.spaced('1 millis'))),
Stream.take(6),
Stream.tap(n => Console.log(n))
)

Effect.runPromise(Stream.runDrain(stream))
// Output each millisecond from the first stream, the rest streams are interrupted
// 0
// 1
// 2
// 3
// 4
// 5
```
36 changes: 36 additions & 0 deletions packages/effect/src/Stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3472,6 +3472,42 @@ export const provideSomeLayer: {
): Stream<A, E | E2, RIn | Exclude<R, ROut>>
} = internal.provideSomeLayer

/**
* Returns a stream that mirrors the first upstream to emit an item.
* As soon as one of the upstream emits a first value, the other is interrupted.
* The resulting stream will forward all items from the "winning" source stream.
* Any upstream failures will cause the returned stream to fail.
*
* @example
* import { Stream, Schedule, Console, Effect } from "effect"
*
* const stream = Stream.fromSchedule(Schedule.spaced('2 millis')).pipe(
* Stream.race(Stream.fromSchedule(Schedule.spaced('1 millis'))),
* Stream.take(6),
* Stream.tap(Console.log)
* )
*
* Effect.runPromise(Stream.runDrain(stream))
* // Output each millisecond from the first stream, the rest streams are interrupted
* // 0
* // 1
* // 2
* // 3
* // 4
* // 5
* @since 3.7.0
* @category racing
*/
export const race: {
<AR, ER, RR>(
right: Stream<AR, ER, RR>
): <AL, EL, RL>(left: Stream<AL, EL, RL>) => Stream<AL | AR, EL | ER, RL | RR>
<AL, EL, RL, AR, ER, RR>(
left: Stream<AL, EL, RL>,
right: Stream<AR, ER, RR>
): Stream<AL | AR, EL | ER, RL | RR>
} = internal.race

/**
* Returns a stream that mirrors the first upstream to emit an item.
* As soon as one of the upstream emits a first value, all the others are interrupted.
Expand Down
18 changes: 18 additions & 0 deletions packages/effect/src/internal/stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4873,6 +4873,24 @@ export const range = (min: number, max: number, chunkSize = DefaultChunkSize): S
return new StreamImpl(go(min, max, chunkSize))
})

/** @internal */
export const race: {
<AR, ER, RR>(
right: Stream.Stream<AR, ER, RR>
): <AL, EL, RL>(left: Stream.Stream<AL, EL, RL>) => Stream.Stream<AL | AR, EL | ER, RL | RR>
<AL, EL, RL, AR, ER, RR>(
left: Stream.Stream<AL, EL, RL>,
right: Stream.Stream<AR, ER, RR>
): Stream.Stream<AL | AR, EL | ER, RL | RR>
} = dual(
2,
<AL, EL, RL, AR, ER, RR>(
left: Stream.Stream<AL, EL, RL>,
right: Stream.Stream<AR, ER, RR>
): Stream.Stream<AL | AR, EL | ER, RL | RR> => raceAll(left, right)
)

/** @internal */
export const raceAll = <S extends ReadonlyArray<Stream.Stream<any, any, any>>>(
...streams: S
): Stream.Stream<
Expand Down

0 comments on commit 16e3b3c

Please sign in to comment.