diff --git a/.changeset/stream-tap-start.md b/.changeset/stream-tap-start.md new file mode 100644 index 0000000000..e452ac879c --- /dev/null +++ b/.changeset/stream-tap-start.md @@ -0,0 +1,22 @@ +--- +"effect": minor +--- + +Implement `Stream.tapStart` that adds an effect to be executed at the start of the stream. + +```ts +import { Console, Effect, Stream } from "effect"; + +const stream = Stream.make(1, 2, 3).pipe( + Stream.tapStart(Console.log("Stream started")), + Stream.map((n) => n * 2), + Stream.tap((n) => Console.log(`after mapping: ${n}`)) +) + +Effect. runPromise(Stream. runCollect(stream)).then(console. log) +// Stream started +// after mapping: 2 +// after mapping: 4 +// after mapping: 6 +// { _id: 'Chunk', values: [ 2, 4, 6 ] } +``` diff --git a/packages/effect/src/Stream.ts b/packages/effect/src/Stream.ts index 338e2ca1ae..cdc328ece3 100644 --- a/packages/effect/src/Stream.ts +++ b/packages/effect/src/Stream.ts @@ -4536,6 +4536,38 @@ export const tapSink: { (self: Stream, sink: Sink.Sink): Stream } = internal.tapSink +/** + * Adds an effect to be executed at the start of the stream. + * + * @example + * import { Console, Effect, Stream } from "effect" + * + * const stream = Stream.make(1, 2, 3).pipe( + * Stream.tapStart(Console.log("Stream started")), + * Stream.map((n) => n * 2), + * Stream.tap((n) => Console.log(`after mapping: ${n}`)) + * ) + * + * // Effect.runPromise(Stream.runCollect(stream)).then(console.log) + * // Stream started + * // after mapping: 2 + * // after mapping: 4 + * // after mapping: 6 + * // { _id: 'Chunk', values: [ 2, 4, 6 ] } + * + * @since 3.6.0 + * @category sequencing + */ +export const tapStart: { + <_, E2, R2>( + effect: Effect.Effect<_, E2, R2> + ): (self: Stream.Stream) => Stream.Stream + ( + self: Stream.Stream, + effect: Effect.Effect<_, E2, R2> + ): Stream.Stream +} = internal.tapStart + /** * Delays the chunks of this stream according to the given bandwidth * parameters using the token bucket algorithm. Allows for burst in the diff --git a/packages/effect/src/internal/stream.ts b/packages/effect/src/internal/stream.ts index 402408abae..d58aea59ac 100644 --- a/packages/effect/src/internal/stream.ts +++ b/packages/effect/src/internal/stream.ts @@ -6194,6 +6194,23 @@ export const tap: { ): Stream.Stream => mapEffectSequential(self, (a) => Effect.as(f(a), a)) ) +/** @internal */ +export const tapStart: { + <_, E2, R2>( + effect: Effect.Effect<_, E2, R2> + ): (self: Stream.Stream) => Stream.Stream + ( + self: Stream.Stream, + effect: Effect.Effect<_, E2, R2> + ): Stream.Stream +} = dual( + 2, + ( + self: Stream.Stream, + effect: Effect.Effect<_, E2, R2> + ): Stream.Stream => concat(drain(fromEffect(effect)), self) +) + /** @internal */ export const tapBoth: { ( diff --git a/packages/effect/test/Stream/tapping.test.ts b/packages/effect/test/Stream/tapping.test.ts index 2d47b72940..9588ce545a 100644 --- a/packages/effect/test/Stream/tapping.test.ts +++ b/packages/effect/test/Stream/tapping.test.ts @@ -97,7 +97,11 @@ describe("Stream", () => { const result = yield* $( Stream.make(1, 2, 3), Stream.tapBoth({ - onSuccess: (n) => pipe(Effect.fail("error"), Effect.when(() => n === 3)), + onSuccess: (n) => + pipe( + Effect.fail("error"), + Effect.when(() => n === 3) + ), onFailure: () => Effect.void }), Stream.either, @@ -184,4 +188,16 @@ describe("Stream", () => { const result = yield* $(Ref.get(ref)) assert.strictEqual(result, 6) })) + + it.effect("tapStart", () => + Effect.gen(function*($) { + let counter = 0 + const result = yield* $( + Stream.make(1, 1), + Stream.tapStart(Effect.sync(() => counter++)), + Stream.runCollect + ) + assert.strictEqual(counter, 1) + assert.deepStrictEqual(Array.from(result), [1, 1]) + })) })