-
-
Notifications
You must be signed in to change notification settings - Fork 231
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat(Stream): raceAll implementation (#3131)
Co-authored-by: Tim <hello@timsmart.co>
- Loading branch information
Showing
4 changed files
with
159 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,24 @@ | ||
--- | ||
"effect": minor | ||
--- | ||
|
||
feat(Stream): implement "raceAll" operator, which returns a stream that mirrors the first source stream to emit an item. | ||
|
||
```ts | ||
import { Stream, Schedule, Console, Effect } from "effect"; | ||
|
||
const stream = Stream.raceAll( | ||
Stream.fromSchedule(Schedule.spaced("1 millis")), | ||
Stream.fromSchedule(Schedule.spaced("2 millis")), | ||
Stream.fromSchedule(Schedule.spaced("4 millis")), | ||
).pipe(Stream.take(6), Stream.tap(Console.log)); | ||
|
||
Effect.runPromise(Stream.runDrain(stream)); | ||
// Output only from the first stream, the rest streams are interrupted | ||
// 0 | ||
// 1 | ||
// 2 | ||
// 3 | ||
// 4 | ||
// 5 | ||
``` |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,67 @@ | ||
import * as Chunk from "effect/Chunk" | ||
import * as Effect from "effect/Effect" | ||
import * as Fiber from "effect/Fiber" | ||
import * as Schedule from "effect/Schedule" | ||
import * as Stream from "effect/Stream" | ||
import * as it from "effect/test/utils/extend" | ||
import * as TestClock from "effect/TestClock" | ||
import { assert, describe } from "vitest" | ||
|
||
describe("Stream", () => { | ||
it.effect("raceAll sync", () => | ||
Effect.gen(function*($) { | ||
const result = yield* $( | ||
Stream.raceAll( | ||
Stream.make(0, 1, 2, 3), | ||
Stream.make(4, 5, 6, 7), | ||
Stream.make(7, 8, 9, 10) | ||
), | ||
Stream.runCollect, | ||
Effect.map(Chunk.toReadonlyArray) | ||
) | ||
assert.deepStrictEqual(result, [0, 1, 2, 3]) | ||
})) | ||
|
||
it.effect("raceAll async", () => | ||
Effect.gen(function*($) { | ||
const fiber = yield* $( | ||
Stream.raceAll( | ||
Stream.fromSchedule(Schedule.spaced("1 second")), | ||
Stream.fromSchedule(Schedule.spaced("2 second")) | ||
), | ||
Stream.take(5), | ||
Stream.runCollect, | ||
Effect.map(Chunk.toReadonlyArray), | ||
Effect.fork | ||
) | ||
yield* TestClock.adjust("5 second") | ||
const result = yield* Fiber.join(fiber) | ||
assert.deepStrictEqual(result, [0, 1, 2, 3, 4]) | ||
})) | ||
|
||
it.effect("raceAll combined async + sync", () => | ||
Effect.gen(function*($) { | ||
const result = yield* $( | ||
Stream.raceAll( | ||
Stream.fromSchedule(Schedule.spaced("1 second")), | ||
Stream.make(0, 1, 2, 3) | ||
), | ||
Stream.runCollect, | ||
Effect.map(Chunk.toReadonlyArray) | ||
) | ||
assert.deepStrictEqual(result, [0, 1, 2, 3]) | ||
})) | ||
|
||
it.effect("raceAll combined sync + async", () => | ||
Effect.gen(function*($) { | ||
const result = yield* $( | ||
Stream.raceAll( | ||
Stream.make(0, 1, 2, 3), | ||
Stream.fromSchedule(Schedule.spaced("1 second")) | ||
), | ||
Stream.runCollect, | ||
Effect.map(Chunk.toReadonlyArray) | ||
) | ||
assert.deepStrictEqual(result, [0, 1, 2, 3]) | ||
})) | ||
}) |