diff --git a/.changeset/little-moons-dress.md b/.changeset/little-moons-dress.md
new file mode 100644
index 0000000000..3d272ed80b
--- /dev/null
+++ b/.changeset/little-moons-dress.md
@@ -0,0 +1,5 @@
+---
+"effect": patch
+---
+
+Updated the JSDocs for the `Stream` module by adding examples to key functions.
diff --git a/packages/effect/src/Stream.ts b/packages/effect/src/Stream.ts
index be3c6d9537..de85861b94 100644
--- a/packages/effect/src/Stream.ts
+++ b/packages/effect/src/Stream.ts
@@ -188,6 +188,29 @@ export const accumulateChunks: (self: Stream) => Stream
+ * Effect.gen(function*() {
+ * yield* Console.log(`Opening ${filename}`)
+ * return {
+ * getLines: Effect.succeed(["Line 1", "Line 2", "Line 3"]),
+ * close: Console.log(`Closing ${filename}`)
+ * }
+ * })
+ *
+ * const stream = Stream.acquireRelease(
+ * open("file.txt"),
+ * (file) => file.close
+ * ).pipe(Stream.flatMap((file) => file.getLines))
+ *
+ * // Effect.runPromise(Stream.runCollect(stream)).then(console.log)
+ * // Opening file.txt
+ * // Closing file.txt
+ * // { _id: 'Chunk', values: [ [ 'Line 1', 'Line 2', 'Line 3' ] ] }
+ *
* @since 2.0.0
* @category constructors
*/
@@ -269,6 +292,14 @@ export const aggregateWithinEither: {
/**
* Maps the success values of this stream to the specified constant value.
*
+ * @example
+ * import { Effect, Stream } from "effect"
+ *
+ * const stream = Stream.range(1, 5).pipe(Stream.as(null))
+ *
+ * // Effect.runPromise(Stream.runCollect(stream)).then(console.log)
+ * // { _id: 'Chunk', values: [ null, null, null, null, null ] }
+ *
* @since 2.0.0
* @category mapping
*/
@@ -291,6 +322,29 @@ export {
* The registration function can optionally return an `Effect`, which will be
* executed if the `Fiber` executing this Effect is interrupted.
*
+ * @example
+ * import type { StreamEmit } from "effect"
+ * import { Chunk, Effect, Option, Stream } from "effect"
+ *
+ * const events = [1, 2, 3, 4]
+ *
+ * const stream = Stream.async(
+ * (emit: StreamEmit.Emit<never, never, number, void>) => {
+ * events.forEach((n) => {
+ * setTimeout(() => {
+ * if (n === 3) {
+ * emit(Effect.fail(Option.none())) // Terminate the stream
+ * } else {
+ * emit(Effect.succeed(Chunk.of(n))) // Add the current item to the stream
+ * }
+ * }, 100 * n)
+ * })
+ * }
+ * )
+ *
+ * // Effect.runPromise(Stream.runCollect(stream)).then(console.log)
+ * // { _id: 'Chunk', values: [ 1, 2 ] }
+ *
* @since 2.0.0
* @category constructors
*/
@@ -350,6 +404,77 @@ export const branchAfter: {
* as this stream. The driver stream will only ever advance the `maximumLag`
* chunks before the slowest downstream stream.
*
+ * @example
+ * import { Console, Effect, Fiber, Schedule, Stream } from "effect"
+ *
+ * const numbers = Effect.scoped(
+ * Stream.range(1, 20).pipe(
+ * Stream.tap((n) => Console.log(`Emit ${n} element before broadcasting`)),
+ * Stream.broadcast(2, 5),
+ * Stream.flatMap(([first, second]) =>
+ * Effect.gen(function*() {
+ * const fiber1 = yield* Stream.runFold(first, 0, (acc, e) => Math.max(acc, e)).pipe(
+ * Effect.andThen((max) => Console.log(`Maximum: ${max}`)),
+ * Effect.fork
+ * )
+ * const fiber2 = yield* second.pipe(
+ * Stream.schedule(Schedule.spaced("1 second")),
+ * Stream.runForEach((n) => Console.log(`Logging to the Console: ${n}`)),
+ * Effect.fork
+ * )
+ * yield* Fiber.join(fiber1).pipe(
+ * Effect.zip(Fiber.join(fiber2), { concurrent: true })
+ * )
+ * })
+ * ),
+ * Stream.runCollect
+ * )
+ * )
+ *
+ * // Effect.runPromise(numbers).then(console.log)
+ * // Emit 1 element before broadcasting
+ * // Emit 2 element before broadcasting
+ * // Emit 3 element before broadcasting
+ * // Emit 4 element before broadcasting
+ * // Emit 5 element before broadcasting
+ * // Emit 6 element before broadcasting
+ * // Emit 7 element before broadcasting
+ * // Emit 8 element before broadcasting
+ * // Emit 9 element before broadcasting
+ * // Emit 10 element before broadcasting
+ * // Emit 11 element before broadcasting
+ * // Logging to the Console: 1
+ * // Logging to the Console: 2
+ * // Logging to the Console: 3
+ * // Logging to the Console: 4
+ * // Logging to the Console: 5
+ * // Emit 12 element before broadcasting
+ * // Emit 13 element before broadcasting
+ * // Emit 14 element before broadcasting
+ * // Emit 15 element before broadcasting
+ * // Emit 16 element before broadcasting
+ * // Logging to the Console: 6
+ * // Logging to the Console: 7
+ * // Logging to the Console: 8
+ * // Logging to the Console: 9
+ * // Logging to the Console: 10
+ * // Emit 17 element before broadcasting
+ * // Emit 18 element before broadcasting
+ * // Emit 19 element before broadcasting
+ * // Emit 20 element before broadcasting
+ * // Logging to the Console: 11
+ * // Logging to the Console: 12
+ * // Logging to the Console: 13
+ * // Logging to the Console: 14
+ * // Logging to the Console: 15
+ * // Maximum: 20
+ * // Logging to the Console: 16
+ * // Logging to the Console: 17
+ * // Logging to the Console: 18
+ * // Logging to the Console: 19
+ * // Logging to the Console: 20
+ * // { _id: 'Chunk', values: [ undefined ] }
+ *
* @since 2.0.0
* @category utils
*/
@@ -430,9 +555,38 @@ export const broadcastedQueuesDynamic: {
* Allows a faster producer to progress independently of a slower consumer by
* buffering up to `capacity` elements in a queue.
*
- * @note This combinator destroys the chunking structure. It's recommended to
+ * Note: This combinator destroys the chunking structure. It's recommended to
* use rechunk afterwards. Additionally, prefer capacities that are powers
* of 2 for better performance.
+ *
+ * @example
+ * import { Console, Effect, Schedule, Stream } from "effect"
+ *
+ * const stream = Stream.range(1, 10).pipe(
+ * Stream.tap((n) => Console.log(`before buffering: ${n}`)),
+ * Stream.buffer({ capacity: 4 }),
+ * Stream.tap((n) => Console.log(`after buffering: ${n}`)),
+ * Stream.schedule(Schedule.spaced("5 seconds"))
+ * )
+ *
+ * // Effect.runPromise(Stream.runCollect(stream)).then(console.log)
+ * // before buffering: 1
+ * // before buffering: 2
+ * // before buffering: 3
+ * // before buffering: 4
+ * // before buffering: 5
+ * // before buffering: 6
+ * // after buffering: 1
+ * // after buffering: 2
+ * // before buffering: 7
+ * // after buffering: 3
+ * // before buffering: 8
+ * // after buffering: 4
+ * // before buffering: 9
+ * // after buffering: 5
+ * // before buffering: 10
+ * // ...
+ *
* @since 2.0.0
* @category utils
*/
@@ -618,6 +772,14 @@ export const catchSomeCause: {
* previous element emitted, using natural equality to determine whether two
* elements are equal.
*
+ * @example
+ * import { Effect, Stream } from "effect"
+ *
+ * const stream = Stream.make(1, 1, 1, 2, 2, 3, 4).pipe(Stream.changes)
+ *
+ * // Effect.runPromise(Stream.runCollect(stream)).then(console.log)
+ * // { _id: 'Chunk', values: [ 1, 2, 3, 4 ] }
+ *
* @since 2.0.0
* @category utils
*/
@@ -749,6 +911,17 @@ export const combineChunks: {
* that emits the elements from this stream and then the elements from the
* specified stream.
*
+ * @example
+ * import { Effect, Stream } from "effect"
+ *
+ * const s1 = Stream.make(1, 2, 3)
+ * const s2 = Stream.make(4, 5)
+ *
+ * const stream = Stream.concat(s1, s2)
+ *
+ * // Effect.runPromise(Stream.runCollect(stream)).then(console.log)
+ * // { _id: 'Chunk', values: [ 1, 2, 3, 4, 5 ] }
+ *
* @since 2.0.0
* @category utils
*/
@@ -760,6 +933,24 @@ export const concat: {
/**
* Concatenates all of the streams in the chunk to one stream.
*
+ * @example
+ * import { Chunk, Effect, Stream } from "effect"
+ *
+ * const s1 = Stream.make(1, 2, 3)
+ * const s2 = Stream.make(4, 5)
+ * const s3 = Stream.make(6, 7, 8)
+ *
+ * const stream = Stream.concatAll(Chunk.make(s1, s2, s3))
+ *
+ * // Effect.runPromise(Stream.runCollect(stream)).then(console.log)
+ * // {
+ * // _id: 'Chunk',
+ * // values: [
+ * // 1, 2, 3, 4,
+ * // 5, 6, 7, 8
+ * // ]
+ * // }
+ *
* @since 2.0.0
* @category constructors
*/
@@ -772,6 +963,22 @@ export const concatAll: (streams: Chunk.Chunk>) => Stre
*
* See also `Stream.zip` for the more common point-wise variant.
*
+ * @example
+ * import { Effect, Stream } from "effect"
+ *
+ * const s1 = Stream.make(1, 2, 3)
+ * const s2 = Stream.make("a", "b")
+ *
+ * const product = Stream.cross(s1, s2)
+ *
+ * // Effect.runPromise(Stream.runCollect(product)).then(console.log)
+ * // {
+ * // _id: "Chunk",
+ * // values: [
+ * // [ 1, "a" ], [ 1, "b" ], [ 2, "a" ], [ 2, "b" ], [ 3, "a" ], [ 3, "b" ]
+ * // ]
+ * // }
+ *
* @since 2.0.0
* @category utils
*/
@@ -844,6 +1051,45 @@ export const crossWith: {
* example, a search engine may only want to initiate a search after a user
* has paused typing so as to not prematurely recommend results.
*
+ * @example
+ * import { Effect, Stream } from "effect"
+ *
+ * let last = Date.now()
+ * const log = (message: string) =>
+ * Effect.sync(() => {
+ * const end = Date.now()
+ * console.log(`${message} after ${end - last}ms`)
+ * last = end
+ * })
+ *
+ * const stream = Stream.make(1, 2, 3).pipe(
+ * Stream.concat(
+ * Stream.fromEffect(Effect.sleep("200 millis").pipe(Effect.as(4))) // Emit 4 after 200 ms
+ * ),
+ * Stream.concat(Stream.make(5, 6)), // Continue with more rapid values
+ * Stream.concat(
+ * Stream.fromEffect(Effect.sleep("150 millis").pipe(Effect.as(7))) // Emit 7 after 150 ms
+ * ),
+ * Stream.concat(Stream.make(8)),
+ * Stream.tap((n) => log(`Received ${n}`)),
+ * Stream.debounce("100 millis"), // Only emit values after a pause of at least 100 milliseconds
+ * Stream.tap((n) => log(`> Emitted ${n}`))
+ * )
+ *
+ * // Effect.runPromise(Stream.runCollect(stream)).then(console.log)
+ * // Received 1 after 5ms
+ * // Received 2 after 2ms
+ * // Received 3 after 0ms
+ * // > Emitted 3 after 104ms
+ * // Received 4 after 99ms
+ * // Received 5 after 1ms
+ * // Received 6 after 0ms
+ * // > Emitted 6 after 101ms
+ * // Received 7 after 50ms
+ * // Received 8 after 1ms
+ * // > Emitted 8 after 101ms
+ * // { _id: 'Chunk', values: [ 3, 6, 8 ] }
+ *
* @since 2.0.0
* @category utils
*/
@@ -939,6 +1185,15 @@ export const distributedWithDynamic: {
* Converts this stream to a stream that executes its effects but emits no
* elements. Useful for sequencing effects using streams:
*
+ * @example
+ * import { Effect, Stream } from "effect"
+ *
+ * // We create a stream and immediately drain it.
+ * const stream = Stream.range(1, 6).pipe(Stream.drain)
+ *
+ * // Effect.runPromise(Stream.runCollect(stream)).then(console.log)
+ * // { _id: 'Chunk', values: [] }
+ *
* @since 2.0.0
* @category utils
*/
@@ -1054,6 +1309,14 @@ export const either: (self: Stream) => Stream = internal.empty
/**
* Executes the provided finalizer after this stream's finalizers run.
*
+ * @example
+ * import { Console, Effect, Stream } from "effect"
+ *
+ * const program = Stream.fromEffect(Console.log("Application Logic.")).pipe(
+ * Stream.concat(Stream.finalizer(Console.log("Finalizing the stream"))),
+ * Stream.ensuring(
+ * Console.log("Doing some other works after stream's finalization")
+ * )
+ * )
+ *
+ * // Effect.runPromise(Stream.runCollect(program)).then(console.log)
+ * // Application Logic.
+ * // Finalizing the stream
+ * // Doing some other works after stream's finalization
+ * // { _id: 'Chunk', values: [ undefined, undefined ] }
+ *
* @since 2.0.0
* @category utils
*/
@@ -1133,6 +1412,18 @@ export const execute: (effect: Effect.Effect) => Stream(evaluate: LazyArg>) => Stream n % 2 === 0))
+ *
+ * // Effect.runPromise(Stream.runCollect(stream)).then(console.log)
+ * // { _id: 'Chunk', values: [ 2, 4, 6, 8, 10 ] }
+ *
* @since 2.0.0
* @category filtering
*/
@@ -1248,6 +1547,29 @@ export const filterMapWhileEffect: {
* Creates a one-element stream that never fails and executes the finalizer
* when it ends.
*
+ * @example
+ * import { Console, Effect, Stream } from "effect"
+ *
+ * const application = Stream.fromEffect(Console.log("Application Logic."))
+ *
+ * const deleteDir = (dir: string) => Console.log(`Deleting dir: ${dir}`)
+ *
+ * const program = application.pipe(
+ * Stream.concat(
+ * Stream.finalizer(
+ * deleteDir("tmp").pipe(
+ * Effect.andThen(Console.log("Temporary directory was deleted."))
+ * )
+ * )
+ * )
+ * )
+ *
+ * // Effect.runPromise(Stream.runCollect(program)).then(console.log)
+ * // Application Logic.
+ * // Deleting dir: tmp
+ * // Temporary directory was deleted.
+ * // { _id: 'Chunk', values: [ undefined, undefined ] }
+ *
* @since 2.0.0
* @category constructors
*/
@@ -1408,6 +1730,22 @@ export const forever: (self: Stream) => Stream = inte
/**
* Creates a stream from an `AsyncIterable`.
*
+ * @example
+ * import { Effect, Stream } from "effect"
+ *
+ * const myAsyncIterable = async function*() {
+ * yield 1
+ * yield 2
+ * }
+ *
+ * const stream = Stream.fromAsyncIterable(
+ * myAsyncIterable(),
+ * (e) => new Error(String(e)) // Error Handling
+ * )
+ *
+ * // Effect.runPromise(Stream.runCollect(stream)).then(console.log)
+ * // { _id: 'Chunk', values: [ 1, 2 ] }
+ *
* @since 2.0.0
* @category constructors
*/
@@ -1437,6 +1775,15 @@ export const toChannel: (
/**
* Creates a stream from a `Chunk` of values.
*
+ * @example
+ * import { Chunk, Effect, Stream } from "effect"
+ *
+ * // Creating a stream with values from a single Chunk
+ * const stream = Stream.fromChunk(Chunk.make(1, 2, 3))
+ *
+ * // Effect.runPromise(Stream.runCollect(stream)).then(console.log)
+ * // { _id: 'Chunk', values: [ 1, 2, 3 ] }
+ *
* @since 2.0.0
* @category constructors
*/
@@ -1477,6 +1824,15 @@ export const fromChunkQueue: (
/**
* Creates a stream from an arbitrary number of chunks.
*
+ * @example
+ * import { Chunk, Effect, Stream } from "effect"
+ *
+ * // Creating a stream with values from multiple Chunks
+ * const stream = Stream.fromChunks(Chunk.make(1, 2, 3), Chunk.make(4, 5, 6))
+ *
+ * // Effect.runPromise(Stream.runCollect(stream)).then(console.log)
+ * // { _id: 'Chunk', values: [ 1, 2, 3, 4, 5, 6 ] }
+ *
* @since 2.0.0
* @category constructors
*/
@@ -1486,6 +1842,14 @@ export const fromChunks: (...chunks: Array>) => Stream = in
* Either emits the success value of this effect or terminates the stream
* with the failure value of this effect.
*
+ * @example
+ * import { Effect, Random, Stream } from "effect"
+ *
+ * const stream = Stream.fromEffect(Random.nextInt)
+ *
+ * // Effect.runPromise(Stream.runCollect(stream)).then(console.log)
+ * // Example Output: { _id: 'Chunk', values: [ 922694024 ] }
+ *
* @since 2.0.0
* @category constructors
*/
@@ -1530,6 +1894,16 @@ export const fromPubSub: {
/**
* Creates a new `Stream` from an iterable collection of values.
*
+ * @example
+ * import { Effect, Stream } from "effect"
+ *
+ * const numbers = [1, 2, 3]
+ *
+ * const stream = Stream.fromIterable(numbers)
+ *
+ * // Effect.runPromise(Stream.runCollect(stream)).then(console.log)
+ * // { _id: 'Chunk', values: [ 1, 2, 3 ] }
+ *
* @since 2.0.0
* @category constructors
*/
@@ -1538,6 +1912,23 @@ export const fromIterable: (iterable: Iterable) => Stream = internal.fr
/**
* Creates a stream from an effect producing a value of type `Iterable`.
*
+ * @example
+ * import { Context, Effect, Stream } from "effect"
+ *
+ * class Database extends Context.Tag("Database")<
+ * Database,
+ * { readonly getUsers: Effect.Effect<Array<string>> }
+ * >() {}
+ *
+ * const getUsers = Database.pipe(Effect.andThen((_) => _.getUsers))
+ *
+ * const stream = Stream.fromIterableEffect(getUsers)
+ *
+ * // Effect.runPromise(
+ * // Stream.runCollect(stream.pipe(Stream.provideService(Database, { getUsers: Effect.succeed(["user1", "user2"]) })))
+ * // ).then(console.log)
+ * // { _id: 'Chunk', values: [ 'user1', 'user2' ] }
+ *
* @since 2.0.0
* @category constructors
*/
@@ -1614,6 +2005,19 @@ export const fromReadableStreamByob: (
* input. The stream will emit an element for each value output from the
* schedule, continuing for as long as the schedule continues.
*
+ * @example
+ * import { Effect, Schedule, Stream } from "effect"
+ *
+ * // Emits values every 1 second for a total of 5 emissions
+ * const schedule = Schedule.spaced("1 second").pipe(
+ * Schedule.compose(Schedule.recurs(5))
+ * )
+ *
+ * const stream = Stream.fromSchedule(schedule)
+ *
+ * // Effect.runPromise(Stream.runCollect(stream)).then(console.log)
+ * // { _id: 'Chunk', values: [ 0, 1, 2, 3, 4 ] }
+ *
* @since 2.0.0
* @category constructors
*/
@@ -1635,6 +2039,35 @@ export const groupAdjacentBy: {
/**
* More powerful version of `Stream.groupByKey`.
*
+ * @example
+ * import { Chunk, Effect, GroupBy, Stream } from "effect"
+ *
+ * const groupByKeyResult = Stream.fromIterable([
+ * "Mary",
+ * "James",
+ * "Robert",
+ * "Patricia",
+ * "John",
+ * "Jennifer",
+ * "Rebecca",
+ * "Peter"
+ * ]).pipe(
+ * Stream.groupBy((name) => Effect.succeed([name.substring(0, 1), name]))
+ * )
+ *
+ * const stream = GroupBy.evaluate(groupByKeyResult, (key, stream) =>
+ * Stream.fromEffect(
+ * Stream.runCollect(stream).pipe(
+ * Effect.andThen((chunk) => [key, Chunk.size(chunk)] as const)
+ * )
+ * ))
+ *
+ * // Effect.runPromise(Stream.runCollect(stream)).then(console.log)
+ * // {
+ * // _id: 'Chunk',
+ * // values: [ [ 'M', 1 ], [ 'J', 3 ], [ 'R', 2 ], [ 'P', 2 ] ]
+ * // }
+ *
* @since 2.0.0
* @category grouping
*/
@@ -1706,6 +2139,22 @@ export const groupByKey: {
/**
* Partitions the stream with specified `chunkSize`.
*
+ * @example
+ * import { Effect, Stream } from "effect"
+ *
+ * const stream = Stream.range(0, 8).pipe(Stream.grouped(3))
+ *
+ * // Effect.runPromise(Stream.runCollect(stream)).then((chunks) => console.log("%o", chunks))
+ * // {
+ * // _id: 'Chunk',
+ * // values: [
+ * // { _id: 'Chunk', values: [ 0, 1, 2, [length]: 3 ] },
+ * // { _id: 'Chunk', values: [ 3, 4, 5, [length]: 3 ] },
+ * // { _id: 'Chunk', values: [ 6, 7, 8, [length]: 3 ] },
+ * // [length]: 3
+ * // ]
+ * // }
+ *
* @since 2.0.0
* @category utils
*/
@@ -1718,6 +2167,43 @@ export const grouped: {
* Partitions the stream with the specified `chunkSize` or until the specified
* `duration` has passed, whichever is satisfied first.
*
+ * @example
+ * import { Chunk, Effect, Schedule, Stream } from "effect"
+ *
+ * const stream = Stream.range(0, 9).pipe(
+ * Stream.repeat(Schedule.spaced("1 second")),
+ * Stream.groupedWithin(18, "1.5 seconds"),
+ * Stream.take(3)
+ * )
+ *
+ * // Effect.runPromise(Stream.runCollect(stream)).then((chunks) => console.log(Chunk.toArray(chunks)))
+ * // [
+ * // {
+ * // _id: 'Chunk',
+ * // values: [
+ * // 0, 1, 2, 3, 4, 5, 6,
+ * // 7, 8, 9, 0, 1, 2, 3,
+ * // 4, 5, 6, 7
+ * // ]
+ * // },
+ * // {
+ * // _id: 'Chunk',
+ * // values: [
+ * // 8, 9, 0, 1, 2,
+ * // 3, 4, 5, 6, 7,
+ * // 8, 9
+ * // ]
+ * // },
+ * // {
+ * // _id: 'Chunk',
+ * // values: [
+ * // 0, 1, 2, 3, 4, 5, 6,
+ * // 7, 8, 9, 0, 1, 2, 3,
+ * // 4, 5, 6, 7
+ * // ]
+ * // }
+ * // ]
+ *
* @since 2.0.0
* @category utils
*/
@@ -1789,6 +2275,16 @@ export const identity: () => Stream = internal
* one stream is exhausted all remaining values in the other stream will be
* pulled.
*
+ * @example
+ * import { Effect, Stream } from "effect"
+ *
+ * const s1 = Stream.make(1, 2, 3)
+ * const s2 = Stream.make(4, 5, 6)
+ *
+ * const stream = Stream.interleave(s1, s2)
+ *
+ * // Effect.runPromise(Stream.runCollect(stream)).then(console.log)
+ * // { _id: 'Chunk', values: [ 1, 4, 2, 5, 3, 6 ] }
* @since 2.0.0
* @category utils
*/
@@ -1806,6 +2302,25 @@ export const interleave: {
* stream are exhausted further requests for values from that stream will be
* ignored.
*
+ * @example
+ * import { Effect, Stream } from "effect"
+ *
+ * const s1 = Stream.make(1, 3, 5, 7, 9)
+ * const s2 = Stream.make(2, 4, 6, 8, 10)
+ *
+ * const booleanStream = Stream.make(true, false, false).pipe(Stream.forever)
+ *
+ * const stream = Stream.interleaveWith(s1, s2, booleanStream)
+ *
+ * // Effect.runPromise(Stream.runCollect(stream)).then(console.log)
+ * // {
+ * // _id: 'Chunk',
+ * // values: [
+ * // 1, 2, 4, 3, 6,
+ * // 8, 5, 10, 7, 9
+ * // ]
+ * // }
+ *
* @since 2.0.0
* @category utils
*/
@@ -1824,6 +2339,20 @@ export const interleaveWith: {
/**
* Intersperse stream with provided `element`.
*
+ * @example
+ * import { Effect, Stream } from "effect"
+ *
+ * const stream = Stream.make(1, 2, 3, 4, 5).pipe(Stream.intersperse(0))
+ *
+ * // Effect.runPromise(Stream.runCollect(stream)).then(console.log)
+ * // {
+ * // _id: 'Chunk',
+ * // values: [
+ * // 1, 0, 2, 0, 3,
+ * // 0, 4, 0, 5
+ * // ]
+ * // }
+ *
* @since 2.0.0
* @category utils
*/
@@ -1835,6 +2364,27 @@ export const intersperse: {
/**
* Intersperse the specified element, also adding a prefix and a suffix.
*
+ * @example
+ * import { Effect, Stream } from "effect"
+ *
+ * const stream = Stream.make(1, 2, 3, 4, 5).pipe(
+ * Stream.intersperseAffixes({
+ * start: "[",
+ * middle: "-",
+ * end: "]"
+ * })
+ * )
+ *
+ * // Effect.runPromise(Stream.runCollect(stream)).then(console.log)
+ * // {
+ * // _id: 'Chunk',
+ * // values: [
+ * // '[', 1, '-', 2, '-',
+ * // 3, '-', 4, '-', 5,
+ * // ']'
+ * // ]
+ * // }
+ *
* @since 2.0.0
* @category utils
*/
@@ -1896,6 +2446,15 @@ export const interruptWhenDeferred: {
* The infinite stream of iterative function application: a, f(a), f(f(a)),
* f(f(f(a))), ...
*
+ * @example
+ * import { Effect, Stream } from "effect"
+ *
+ * // An infinite Stream of numbers starting from 1 and incrementing
+ * const stream = Stream.iterate(1, (n) => n + 1)
+ *
+ * // Effect.runPromise(Stream.runCollect(stream.pipe(Stream.take(10)))).then(console.log)
+ * // { _id: 'Chunk', values: [ 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 ] }
+ *
* @since 2.0.0
* @category constructors
*/
@@ -1904,6 +2463,14 @@ export const iterate: (value: A, next: (value: A) => A) => Stream = intern
/**
* Creates a stream from an sequence of values.
*
+ * @example
+ * import { Effect, Stream } from "effect"
+ *
+ * const stream = Stream.make(1, 2, 3)
+ *
+ * // Effect.runPromise(Stream.runCollect(stream)).then(console.log)
+ * // { _id: 'Chunk', values: [ 1, 2, 3 ] }
+ *
* @since 2.0.0
* @category constructors
*/
@@ -1912,6 +2479,14 @@ export const make: >(...as: As) => Stream = in
/**
* Transforms the elements of this stream using the supplied function.
*
+ * @example
+ * import { Effect, Stream } from "effect"
+ *
+ * const stream = Stream.make(1, 2, 3).pipe(Stream.map((n) => n + 1))
+ *
+ * // Effect.runPromise(Stream.runCollect(stream)).then(console.log)
+ * // { _id: 'Chunk', values: [ 2, 3, 4 ] }
+ *
* @since 2.0.0
* @category mapping
*/
@@ -1923,6 +2498,18 @@ export const map: {
/**
* Statefully maps over the elements of this stream to produce new elements.
*
+ * @example
+ * import { Effect, Stream } from "effect"
+ *
+ * const runningTotal = (stream: Stream.Stream<number>): Stream.Stream<number> =>
+ * stream.pipe(Stream.mapAccum(0, (s, a) => [s + a, s + a]))
+ *
+ * // input: 0, 1, 2, 3, 4, 5, 6
+ * // Effect.runPromise(Stream.runCollect(runningTotal(Stream.range(0, 6)))).then(
+ * // console.log
+ * // )
+ * // { _id: "Chunk", values: [ 0, 1, 3, 6, 10, 15, 21 ] }
+ *
* @since 2.0.0
* @category mapping
*/
@@ -1998,6 +2585,17 @@ export const mapChunksEffect: {
* Maps each element to an iterable, and flattens the iterables into the
* output of this stream.
*
+ * @example
+ * import { Effect, Stream } from "effect"
+ *
+ * const numbers = Stream.make("1-2-3", "4-5", "6").pipe(
+ * Stream.mapConcat((s) => s.split("-")),
+ * Stream.map((s) => parseInt(s))
+ * )
+ *
+ * // Effect.runPromise(Stream.runCollect(numbers)).then(console.log)
+ * // { _id: 'Chunk', values: [ 1, 2, 3, 4, 5, 6 ] }
+ *
* @since 2.0.0
* @category mapping
*/
@@ -2055,6 +2653,16 @@ export const mapConcatEffect: {
/**
* Maps over elements of the stream with the specified effectful function.
*
+ * @example
+ * import { Effect, Random, Stream } from "effect"
+ *
+ * const stream = Stream.make(10, 20, 30).pipe(
+ * Stream.mapEffect((n) => Random.nextIntBetween(0, n))
+ * )
+ *
+ * // Effect.runPromise(Stream.runCollect(stream)).then(console.log)
+ * // Example Output: { _id: 'Chunk', values: [ 7, 19, 8 ] }
+ *
* @since 2.0.0
* @category mapping
*/
@@ -2111,6 +2719,21 @@ export const mapErrorCause: {
* New produced stream will terminate when both specified stream terminate if
* no termination strategy is specified.
*
+ * @example
+ * import { Effect, Schedule, Stream } from "effect"
+ *
+ * const s1 = Stream.make(1, 2, 3).pipe(
+ * Stream.schedule(Schedule.spaced("100 millis"))
+ * )
+ * const s2 = Stream.make(4, 5, 6).pipe(
+ * Stream.schedule(Schedule.spaced("200 millis"))
+ * )
+ *
+ * const stream = Stream.merge(s1, s2)
+ *
+ * // Effect.runPromise(Stream.runCollect(stream)).then(console.log)
+ * // { _id: 'Chunk', values: [ 1, 4, 2, 3, 5, 6 ] }
+ *
* @since 2.0.0
* @category utils
*/
@@ -2157,6 +2780,24 @@ export const mergeAll: {
* New produced stream will terminate when both specified stream terminate if
* no termination strategy is specified.
*
+ * @example
+ * import { Effect, Schedule, Stream } from "effect"
+ *
+ * const s1 = Stream.make("1", "2", "3").pipe(
+ * Stream.schedule(Schedule.spaced("100 millis"))
+ * )
+ * const s2 = Stream.make(4.1, 5.3, 6.2).pipe(
+ * Stream.schedule(Schedule.spaced("200 millis"))
+ * )
+ *
+ * const stream = Stream.mergeWith(s1, s2, {
+ * onSelf: (s) => parseInt(s),
+ * onOther: (n) => Math.floor(n)
+ * })
+ *
+ * // Effect.runPromise(Stream.runCollect(stream)).then(console.log)
+ * // { _id: 'Chunk', values: [ 1, 4, 2, 3, 5, 6 ] }
+ *
* @since 2.0.0
* @category utils
*/
@@ -2380,6 +3021,17 @@ export const orElseSucceed: {
* than the unfolding of the state. This is useful for embedding paginated
* APIs, hence the name.
*
+ * @example
+ * import { Effect, Option, Stream } from "effect"
+ *
+ * const stream = Stream.paginate(0, (n) => [
+ * n,
+ * n < 3 ? Option.some(n + 1) : Option.none()
+ * ])
+ *
+ * // Effect.runPromise(Stream.runCollect(stream)).then(console.log)
+ * // { _id: 'Chunk', values: [ 0, 1, 2, 3 ] }
+ *
* @since 2.0.0
* @category constructors
*/
@@ -2430,6 +3082,25 @@ export const paginateEffect: (
* evaluated to false. The faster stream may advance by up to buffer elements
* further than the slower one.
*
+ * @example
+ * import { Effect, Stream } from "effect"
+ *
+ * const partition = Stream.range(1, 10).pipe(
+ * Stream.partition((n) => n % 2 === 0, { bufferSize: 5 })
+ * )
+ *
+ * const program = Effect.scoped(
+ * Effect.gen(function*() {
+ * const [evens, odds] = yield* partition
+ * console.log(yield* Stream.runCollect(evens))
+ * console.log(yield* Stream.runCollect(odds))
+ * })
+ * )
+ *
+ * // Effect.runPromise(program)
+ * // { _id: 'Chunk', values: [ 2, 4, 6, 8, 10 ] }
+ * // { _id: 'Chunk', values: [ 1, 3, 5, 7, 9 ] }
+ *
* @since 2.0.0
* @category utils
*/
@@ -2462,6 +3133,28 @@ export const partition: {
* Split a stream by an effectful predicate. The faster stream may advance by
* up to buffer elements further than the slower one.
*
+ * @example
+ * import { Effect, Either, Stream } from "effect"
+ *
+ * const partition = Stream.range(1, 9).pipe(
+ * Stream.partitionEither(
+ * (n) => Effect.succeed(n % 2 === 0 ? Either.left(n) : Either.right(n)),
+ * { bufferSize: 5 }
+ * )
+ * )
+ *
+ * const program = Effect.scoped(
+ * Effect.gen(function*() {
+ * const [evens, odds] = yield* partition
+ * console.log(yield* Stream.runCollect(evens))
+ * console.log(yield* Stream.runCollect(odds))
+ * })
+ * )
+ *
+ * // Effect.runPromise(program)
+ * // { _id: 'Chunk', values: [ 2, 4, 6, 8 ] }
+ * // { _id: 'Chunk', values: [ 1, 3, 5, 7, 9 ] }
+ *
* @since 2.0.0
* @category utils
*/
@@ -2667,6 +3360,15 @@ export const provideSomeLayer: {
/**
* Constructs a stream from a range of integers, including both endpoints.
*
+ * @example
+ * import { Effect, Stream } from "effect"
+ *
+ * // A Stream with a range of numbers from 1 to 5
+ * const stream = Stream.range(1, 5)
+ *
+ * // Effect.runPromise(Stream.runCollect(stream)).then(console.log)
+ * // { _id: 'Chunk', values: [ 1, 2, 3, 4, 5 ] }
+ *
* @since 2.0.0
* @category constructors
*/
@@ -2714,6 +3416,14 @@ export const refineOrDieWith: {
* Repeats the entire stream using the specified schedule. The stream will
* execute normally, and then repeat again according to the provided schedule.
*
+ * @example
+ * import { Effect, Schedule, Stream } from "effect"
+ *
+ * const stream = Stream.repeat(Stream.succeed(1), Schedule.forever)
+ *
+ * // Effect.runPromise(Stream.runCollect(stream.pipe(Stream.take(5)))).then(console.log)
+ * // { _id: 'Chunk', values: [ 1, 1, 1, 1, 1 ] }
+ *
* @since 2.0.0
* @category utils
*/
@@ -2726,6 +3436,14 @@ export const repeat: {
* Creates a stream from an effect producing a value of type `A` which repeats
* forever.
*
+ * @example
+ * import { Effect, Random, Stream } from "effect"
+ *
+ * const stream = Stream.repeatEffect(Random.nextInt)
+ *
+ * // Effect.runPromise(Stream.runCollect(stream.pipe(Stream.take(5)))).then(console.log)
+ * // Example Output: { _id: 'Chunk', values: [ 3891571149, 4239494205, 2352981603, 2339111046, 1488052210 ] }
+ *
* @since 2.0.0
* @category constructors
*/
@@ -2756,6 +3474,22 @@ export const repeatEffectChunkOption: (
* Creates a stream from an effect producing values of type `A` until it fails
* with `None`.
*
+ * @example
+ * // In this example, we're draining an Iterator to create a stream from it
+ * import { Stream, Effect, Option } from "effect"
+ *
+ * const drainIterator = <A>(it: Iterator<A>): Stream.Stream<A> =>
+ * Stream.repeatEffectOption(
+ * Effect.sync(() => it.next()).pipe(
+ * Effect.andThen((res) => {
+ * if (res.done) {
+ * return Effect.fail(Option.none())
+ * }
+ * return Effect.succeed(res.value)
+ * })
+ * )
+ * )
+ *
* @since 2.0.0
* @category constructors
*/
@@ -2837,6 +3571,14 @@ export const repeatElementsWith: {
/**
* Repeats the provided value infinitely.
*
+ * @example
+ * import { Effect, Stream } from "effect"
+ *
+ * const stream = Stream.repeatValue(0)
+ *
+ * // Effect.runPromise(Stream.runCollect(stream.pipe(Stream.take(5)))).then(console.log)
+ * // { _id: 'Chunk', values: [ 0, 0, 0, 0, 0 ] }
+ *
* @since 2.0.0
* @category constructors
*/
@@ -3276,6 +4018,14 @@ export const runSum: (self: Stream) => Effect.Effect a + b))
+ *
+ * // Effect.runPromise(Stream.runCollect(stream)).then(console.log)
+ * // { _id: 'Chunk', values: [ 0, 1, 3, 6, 10, 15, 21 ] }
+ *
* @since 2.0.0
* @category utils
*/
@@ -3372,6 +4122,24 @@ export const scheduleWith: {
/**
* Creates a single-valued stream from a scoped resource.
*
+ * @example
+ * import { Console, Effect, Stream } from "effect"
+ *
+ * // Creating a single-valued stream from a scoped resource
+ * const stream = Stream.scoped(
+ * Effect.acquireUseRelease(
+ * Console.log("acquire"),
+ * () => Console.log("use"),
+ * () => Console.log("release")
+ * )
+ * )
+ *
+ * // Effect.runPromise(Stream.runCollect(stream)).then(console.log)
+ * // acquire
+ * // use
+ * // release
+ * // { _id: 'Chunk', values: [ undefined ] }
+ *
* @since 2.0.0
* @category constructors
*/
@@ -3488,6 +4256,15 @@ export const splitLines: (self: Stream) => Stream(stream: LazyArg>) => Stream n + 1), 5)
+ *
+ * // Effect.runPromise(Stream.runCollect(stream)).then(console.log)
+ * // { _id: 'Chunk', values: [ 0, 1, 2, 3, 4 ] }
+ *
* @since 2.0.0
* @category utils
*/
@@ -3523,6 +4308,14 @@ export const take: {
/**
* Takes the last specified number of elements from this stream.
*
+ * @example
+ * import { Effect, Stream } from "effect"
+ *
+ * const stream = Stream.takeRight(Stream.make(1, 2, 3, 4, 5, 6), 3)
+ *
+ * // Effect.runPromise(Stream.runCollect(stream)).then(console.log)
+ * // { _id: 'Chunk', values: [ 4, 5, 6 ] }
+ *
* @since 2.0.0
* @category utils
*/
@@ -3535,6 +4328,14 @@ export const takeRight: {
* Takes all elements of the stream until the specified predicate evaluates to
* `true`.
*
+ * @example
+ * import { Effect, Stream } from "effect"
+ *
+ * const stream = Stream.takeUntil(Stream.iterate(0, (n) => n + 1), (n) => n === 4)
+ *
+ * // Effect.runPromise(Stream.runCollect(stream)).then(console.log)
+ * // { _id: 'Chunk', values: [ 0, 1, 2, 3, 4 ] }
+ *
* @since 2.0.0
* @category utils
*/
@@ -3564,6 +4365,14 @@ export const takeUntilEffect: {
* Takes all elements of the stream for as long as the specified predicate
* evaluates to `true`.
*
+ * @example
+ * import { Effect, Stream } from "effect"
+ *
+ * const stream = Stream.takeWhile(Stream.iterate(0, (n) => n + 1), (n) => n < 5)
+ *
+ * // Effect.runPromise(Stream.runCollect(stream)).then(console.log)
+ * // { _id: 'Chunk', values: [ 0, 1, 2, 3, 4 ] }
+ *
* @since 2.0.0
* @category utils
*/
@@ -3577,6 +4386,24 @@ export const takeWhile: {
/**
* Adds an effect to consumption of every element of the stream.
*
+ * @example
+ * import { Console, Effect, Stream } from "effect"
+ *
+ * const stream = Stream.make(1, 2, 3).pipe(
+ * Stream.tap((n) => Console.log(`before mapping: ${n}`)),
+ * Stream.map((n) => n * 2),
+ * Stream.tap((n) => Console.log(`after mapping: ${n}`))
+ * )
+ *
+ * // Effect.runPromise(Stream.runCollect(stream)).then(console.log)
+ * // before mapping: 1
+ * // after mapping: 2
+ * // before mapping: 2
+ * // after mapping: 4
+ * // before mapping: 3
+ * // after mapping: 6
+ * // { _id: 'Chunk', values: [ 2, 4, 6 ] }
+ *
* @since 2.0.0
* @category sequencing
*/
@@ -3657,7 +4484,7 @@ export const tapSink: {
* parameters using the token bucket algorithm. Allows for burst in the
* processing of elements by allowing the token bucket to accumulate tokens up
* to a `units + burst` threshold. The weight of each chunk is determined by
- * the `costFn` function.
+ * the `cost` function.
*
* If using the "enforce" strategy, chunks that do not meet the bandwidth
* constraints are dropped. If using the "shape" strategy, chunks are delayed
@@ -3665,6 +4492,43 @@ export const tapSink: {
*
* Defaults to the "shape" strategy.
*
+ * @example
+ * import { Chunk, Effect, Schedule, Stream } from "effect"
+ *
+ * let last = Date.now()
+ * const log = (message: string) =>
+ * Effect.sync(() => {
+ * const end = Date.now()
+ * console.log(`${message} after ${end - last}ms`)
+ * last = end
+ * })
+ *
+ * const stream = Stream.fromSchedule(Schedule.spaced("50 millis")).pipe(
+ * Stream.take(6),
+ * Stream.tap((n) => log(`Received ${n}`)),
+ * Stream.throttle({
+ * cost: Chunk.size,
+ * duration: "100 millis",
+ * units: 1
+ * }),
+ * Stream.tap((n) => log(`> Emitted ${n}`))
+ * )
+ *
+ * // Effect.runPromise(Stream.runCollect(stream)).then(console.log)
+ * // Received 0 after 56ms
+ * // > Emitted 0 after 0ms
+ * // Received 1 after 52ms
+ * // > Emitted 1 after 48ms
+ * // Received 2 after 52ms
+ * // > Emitted 2 after 49ms
+ * // Received 3 after 52ms
+ * // > Emitted 3 after 48ms
+ * // Received 4 after 52ms
+ * // > Emitted 4 after 47ms
+ * // Received 5 after 52ms
+ * // > Emitted 5 after 49ms
+ * // { _id: 'Chunk', values: [ 0, 1, 2, 3, 4, 5 ] }
+ *
* @since 2.0.0
* @category utils
*/
@@ -3731,6 +4595,27 @@ export const throttleEffect: {
/**
* A stream that emits void values spaced by the specified duration.
*
+ * @example
+ * import { Effect, Stream } from "effect"
+ *
+ * let last = Date.now()
+ * const log = (message: string) =>
+ * Effect.sync(() => {
+ * const end = Date.now()
+ * console.log(`${message} after ${end - last}ms`)
+ * last = end
+ * })
+ *
+ * const stream = Stream.tick("1 seconds").pipe(Stream.tap(() => log("tick")))
+ *
+ * // Effect.runPromise(Stream.runCollect(stream.pipe(Stream.take(5)))).then(console.log)
+ * // tick after 4ms
+ * // tick after 1003ms
+ * // tick after 1001ms
+ * // tick after 1002ms
+ * // tick after 1002ms
+ * // { _id: 'Chunk', values: [ undefined, undefined, undefined, undefined, undefined ] }
+ *
* @since 2.0.0
* @category constructors
*/
@@ -3820,6 +4705,32 @@ export const toPubSub: {
* finished, or with Some error if it fails, otherwise it returns a chunk of
* the stream's output.
*
+ * @example
+ * import { Effect, Stream } from "effect"
+ *
+ * // Simulate a chunked stream
+ * const stream = Stream.fromIterable([1, 2, 3, 4, 5]).pipe(Stream.rechunk(2))
+ *
+ * const program = Effect.gen(function*() {
+ * // Create an effect to get data chunks from the stream
+ * const getChunk = yield* Stream.toPull(stream)
+ *
+ * // Continuously fetch and process chunks
+ * while (true) {
+ * const chunk = yield* getChunk
+ * console.log(chunk)
+ * }
+ * })
+ *
+ * // Effect.runPromise(Effect.scoped(program)).then(console.log, console.error)
+ * // { _id: 'Chunk', values: [ 1, 2 ] }
+ * // { _id: 'Chunk', values: [ 3, 4 ] }
+ * // { _id: 'Chunk', values: [ 5 ] }
+ * // (FiberFailure) Error: {
+ * // "_id": "Option",
+ * // "_tag": "None"
+ * // }
+ *
* @since 2.0.0
* @category destructors
*/
@@ -3947,6 +4858,14 @@ export const transduce: {
/**
* Creates a stream by peeling off the "layers" of a value of type `S`.
*
+ * @example
+ * import { Effect, Option, Stream } from "effect"
+ *
+ * const stream = Stream.unfold(1, (n) => Option.some([n, n + 1]))
+ *
+ * // Effect.runPromise(Stream.runCollect(stream.pipe(Stream.take(5)))).then(console.log)
+ * // { _id: 'Chunk', values: [ 1, 2, 3, 4, 5 ] }
+ *
* @since 2.0.0
* @category constructors
*/
@@ -3979,6 +4898,17 @@ export const unfoldChunkEffect: (
* Creates a stream by effectfully peeling off the "layers" of a value of type
* `S`.
*
+ * @example
+ * import { Effect, Option, Random, Stream } from "effect"
+ *
+ * const stream = Stream.unfoldEffect(1, (n) =>
+ * Random.nextBoolean.pipe(
+ * Effect.map((b) => (b ? Option.some([n, -n]) : Option.some([n, n])))
+ * ))
+ *
+ * // Effect.runPromise(Stream.runCollect(stream.pipe(Stream.take(5)))).then(console.log)
+ * // { _id: 'Chunk', values: [ 1, -1, -1, -1, -1 ] }
+ *
* @since 2.0.0
* @category constructors
*/
@@ -3992,6 +4922,14 @@ export {
/**
* A stream that contains a single `void` value.
*
+ * @example
+ * import { Effect, Stream } from "effect"
+ *
+ * const stream = Stream.void
+ *
+ * // Effect.runPromise(Stream.runCollect(stream)).then(console.log)
+ * // { _id: 'Chunk', values: [ undefined ] }
+ *
* @since 2.0.0
* @category constructors
*/
@@ -4112,6 +5050,18 @@ export const withSpan: {
*
* The new stream will end when one of the sides ends.
*
+ * @example
+ * import { Effect, Stream } from "effect"
+ *
+ * // We create two streams and zip them together.
+ * const stream = Stream.zip(
+ * Stream.make(1, 2, 3, 4, 5, 6),
+ * Stream.make("a", "b", "c")
+ * )
+ *
+ * // Effect.runPromise(Stream.runCollect(stream)).then(console.log)
+ * // { _id: 'Chunk', values: [ [ 1, 'a' ], [ 2, 'b' ], [ 3, 'c' ] ] }
+ *
* @since 2.0.0
* @category zipping
*/
@@ -4146,6 +5096,18 @@ export const zipFlatten: {
* The defaults `defaultLeft` and `defaultRight` will be used if the streams
* have different lengths and one of the streams has ended before the other.
*
+ * @example
+ * import { Effect, Stream } from "effect"
+ *
+ * const stream = Stream.zipAll(Stream.make(1, 2, 3, 4, 5, 6), {
+ * other: Stream.make("a", "b", "c"),
+ * defaultSelf: 0,
+ * defaultOther: "x"
+ * })
+ *
+ * // Effect.runPromise(Stream.runCollect(stream)).then(console.log)
+ * // { _id: 'Chunk', values: [ [ 1, 'a' ], [ 2, 'b' ], [ 3, 'c' ], [ 4, 'x' ], [ 5, 'x' ], [ 6, 'x' ] ] }
+ *
* @since 2.0.0
* @category zipping
*/
@@ -4331,6 +5293,19 @@ export const zipAllSortedByKeyWith: {
* The functions `left` and `right` will be used if the streams have different
* lengths and one of the streams has ended before the other.
*
+ * @example
+ * import { Effect, Stream } from "effect"
+ *
+ * const stream = Stream.zipAllWith(Stream.make(1, 2, 3, 4, 5, 6), {
+ * other: Stream.make("a", "b", "c"),
+ * onSelf: (n) => [n, "x"],
+ * onOther: (s) => [0, s],
+ * onBoth: (n, s) => [n - s.length, s]
+ * })
+ *
+ * // Effect.runPromise(Stream.runCollect(stream)).then(console.log)
+ * // { _id: 'Chunk', values: [ [ 0, 'a' ], [ 1, 'b' ], [ 2, 'c' ], [ 4, 'x' ], [ 5, 'x' ], [ 6, 'x' ] ] }
+ *
* @since 2.0.0
* @category zipping
*/
@@ -4363,6 +5338,22 @@ export const zipAllWith: {
* that emitted elements that are not the last value in chunks will never be
* used for zipping.
*
+ * @example
+ * import { Effect, Schedule, Stream } from "effect"
+ *
+ * const s1 = Stream.make(1, 2, 3).pipe(
+ * Stream.schedule(Schedule.spaced("1 second"))
+ * )
+ *
+ * const s2 = Stream.make("a", "b", "c", "d").pipe(
+ * Stream.schedule(Schedule.spaced("500 millis"))
+ * )
+ *
+ * const stream = Stream.zipLatest(s1, s2)
+ *
+ * // Effect.runPromise(Stream.runCollect(stream)).then(console.log)
+ * // { _id: 'Chunk', values: [ [ 1, 'a' ], [ 1, 'b' ], [ 2, 'b' ], [ 2, 'c' ], [ 2, 'd' ], [ 3, 'd' ] ] }
+ *
* @since 2.0.0
* @category zipping
*/
@@ -4388,7 +5379,7 @@ export const zipLatest: {
* Stream.fromSchedule(Schedule.spaced('4 millis')),
* ).pipe(Stream.take(6), Stream.tap(Console.log))
*
- * Effect.runPromise(Stream.runDrain(stream))
+ * // Effect.runPromise(Stream.runDrain(stream))
* // Output:
* // [ 0, 0, 0 ]
* // [ 1, 0, 0 ]
@@ -4468,6 +5459,19 @@ export const zipRight: {
*
* The new stream will end when one of the sides ends.
*
+ * @example
+ * import { Effect, Stream } from "effect"
+ *
+ * // We create two streams and zip them with custom logic.
+ * const stream = Stream.zipWith(
+ * Stream.make(1, 2, 3, 4, 5, 6),
+ * Stream.make("a", "b", "c"),
+ * (n, s) => [n - s.length, s]
+ * )
+ *
+ * // Effect.runPromise(Stream.runCollect(stream)).then(console.log)
+ * // { _id: 'Chunk', values: [ [ 0, 'a' ], [ 1, 'b' ], [ 2, 'c' ] ] }
+ *
* @since 2.0.0
* @category zipping
*/
@@ -4513,6 +5517,19 @@ export const zipWithChunks: {
/**
* Zips each element with the next element if present.
*
+ * @example
+ * import { Chunk, Effect, Stream } from "effect"
+ *
+ * const stream = Stream.zipWithNext(Stream.make(1, 2, 3, 4))
+ *
+ * // Effect.runPromise(Stream.runCollect(stream)).then((chunk) => console.log(Chunk.toArray(chunk)))
+ * // [
+ * // [ 1, { _id: 'Option', _tag: 'Some', value: 2 } ],
+ * // [ 2, { _id: 'Option', _tag: 'Some', value: 3 } ],
+ * // [ 3, { _id: 'Option', _tag: 'Some', value: 4 } ],
+ * // [ 4, { _id: 'Option', _tag: 'None' } ]
+ * // ]
+ *
* @since 2.0.0
* @category zipping
*/
@@ -4522,6 +5539,19 @@ export const zipWithNext: (self: Stream) => Stream<[A, Option.
* Zips each element with the previous element. Initially accompanied by
* `None`.
*
+ * @example
+ * import { Chunk, Effect, Stream } from "effect"
+ *
+ * const stream = Stream.zipWithPrevious(Stream.make(1, 2, 3, 4))
+ *
+ * // Effect.runPromise(Stream.runCollect(stream)).then((chunk) => console.log(Chunk.toArray(chunk)))
+ * // [
+ * // [ { _id: 'Option', _tag: 'None' }, 1 ],
+ * // [ { _id: 'Option', _tag: 'Some', value: 1 }, 2 ],
+ * // [ { _id: 'Option', _tag: 'Some', value: 2 }, 3 ],
+ * // [ { _id: 'Option', _tag: 'Some', value: 3 }, 4 ]
+ * // ]
+ *
* @since 2.0.0
* @category zipping
*/
@@ -4531,6 +5561,35 @@ export const zipWithPrevious: (self: Stream) => Stream<[Option
/**
* Zips each element with both the previous and next element.
*
+ * @example
+ * import { Chunk, Effect, Stream } from "effect"
+ *
+ * const stream = Stream.zipWithPreviousAndNext(Stream.make(1, 2, 3, 4))
+ *
+ * // Effect.runPromise(Stream.runCollect(stream)).then((chunk) => console.log(Chunk.toArray(chunk)))
+ * // [
+ * // [
+ * // { _id: 'Option', _tag: 'None' },
+ * // 1,
+ * // { _id: 'Option', _tag: 'Some', value: 2 }
+ * // ],
+ * // [
+ * // { _id: 'Option', _tag: 'Some', value: 1 },
+ * // 2,
+ * // { _id: 'Option', _tag: 'Some', value: 3 }
+ * // ],
+ * // [
+ * // { _id: 'Option', _tag: 'Some', value: 2 },
+ * // 3,
+ * // { _id: 'Option', _tag: 'Some', value: 4 }
+ * // ],
+ * // [
+ * // { _id: 'Option', _tag: 'Some', value: 3 },
+ * // 4,
+ * // { _id: 'Option', _tag: 'None' }
+ * // ]
+ * // ]
+ *
* @since 2.0.0
* @category zipping
*/
@@ -4541,6 +5600,19 @@ export const zipWithPreviousAndNext: (
/**
* Zips this stream together with the index of elements.
*
+ * @example
+ * import { Effect, Stream } from "effect"
+ *
+ * const stream = Stream.make("Mary", "James", "Robert", "Patricia")
+ *
+ * const indexedStream = Stream.zipWithIndex(stream)
+ *
+ * // Effect.runPromise(Stream.runCollect(indexedStream)).then(console.log)
+ * // {
+ * // _id: 'Chunk',
+ * // values: [ [ 'Mary', 0 ], [ 'James', 1 ], [ 'Robert', 2 ], [ 'Patricia', 3 ] ]
+ * // }
+ *
* @since 2.0.0
* @category zipping
*/