Skip to content

Commit

Permalink
feat(Stream): tapStart
Browse files Browse the repository at this point in the history
  • Loading branch information
dilame committed Jul 18, 2024
1 parent 62ce405 commit 7a95371
Show file tree
Hide file tree
Showing 4 changed files with 88 additions and 1 deletion.
22 changes: 22 additions & 0 deletions .changeset/stream-tap-start.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
---
"effect": minor
---

Implement `Stream.tapStart` that adds an effect to be executed at the start of the stream.

```ts
import { Console, Effect, Stream } from "effect";

const stream = Stream.make(1, 2, 3).pipe(
Stream.tapStart(Console.log("Stream started")),
Stream.map((n) => n * 2),
Stream.tap((n) => Console.log(`after mapping: ${n}`))
)

Effect. runPromise(Stream. runCollect(stream)).then(console. log)
// Stream started
// after mapping: 2
// after mapping: 4
// after mapping: 6
// { _id: 'Chunk', values: [ 2, 4, 6 ] }
```
32 changes: 32 additions & 0 deletions packages/effect/src/Stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4536,6 +4536,38 @@ export const tapSink: {
<A, E, R, E2, R2>(self: Stream<A, E, R>, sink: Sink.Sink<unknown, A, unknown, E2, R2>): Stream<A, E | E2, R | R2>
} = internal.tapSink

/**
* Adds an effect to be executed at the start of the stream.
*
* @example
* import { Console, Effect, Stream } from "effect"
*
* const stream = Stream.make(1, 2, 3).pipe(
* Stream.tapStart(Console.log("Stream started")),
* Stream.map((n) => n * 2),
* Stream.tap((n) => Console.log(`after mapping: ${n}`))
* )
*
* // Effect.runPromise(Stream.runCollect(stream)).then(console.log)
* // Stream started
* // after mapping: 2
* // after mapping: 4
* // after mapping: 6
* // { _id: 'Chunk', values: [ 2, 4, 6 ] }
*
* @since 3.6.0
* @category sequencing
*/
export const tapStart: {
<_, E2, R2>(
effect: Effect.Effect<_, E2, R2>
): <A, E, R>(self: Stream.Stream<A, E, R>) => Stream.Stream<A, E2 | E, R2 | R>
<A, E, R, _, E2, R2>(
self: Stream.Stream<A, E, R>,
effect: Effect.Effect<_, E2, R2>
): Stream.Stream<A, E | E2, R | R2>
} = internal.tapStart

/**
* Delays the chunks of this stream according to the given bandwidth
* parameters using the token bucket algorithm. Allows for burst in the
Expand Down
17 changes: 17 additions & 0 deletions packages/effect/src/internal/stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6194,6 +6194,23 @@ export const tap: {
): Stream.Stream<A, E | E2, R | R2> => mapEffectSequential(self, (a) => Effect.as(f(a), a))
)

/** @internal */
export const tapStart: {
<_, E2, R2>(
effect: Effect.Effect<_, E2, R2>
): <A, E, R>(self: Stream.Stream<A, E, R>) => Stream.Stream<A, E2 | E, R2 | R>
<A, E, R, _, E2, R2>(
self: Stream.Stream<A, E, R>,
effect: Effect.Effect<_, E2, R2>
): Stream.Stream<A, E | E2, R | R2>
} = dual(
2,
<A, E, R, _, E2, R2>(
self: Stream.Stream<A, E, R>,
effect: Effect.Effect<_, E2, R2>
): Stream.Stream<A, E | E2, R | R2> => concat(drain(fromEffect(effect)), self)
)

/** @internal */
export const tapBoth: {
<E, X1, E2, R2, A, X2, E3, R3>(
Expand Down
18 changes: 17 additions & 1 deletion packages/effect/test/Stream/tapping.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,11 @@ describe("Stream", () => {
const result = yield* $(
Stream.make(1, 2, 3),
Stream.tapBoth({
onSuccess: (n) => pipe(Effect.fail("error"), Effect.when(() => n === 3)),
onSuccess: (n) =>
pipe(
Effect.fail("error"),
Effect.when(() => n === 3)
),
onFailure: () => Effect.void
}),
Stream.either,
Expand Down Expand Up @@ -184,4 +188,16 @@ describe("Stream", () => {
const result = yield* $(Ref.get(ref))
assert.strictEqual(result, 6)
}))

it.effect("tapStart", () =>
Effect.gen(function*($) {
let counter = 0
const result = yield* $(
Stream.make(1, 1),
Stream.tapStart(Effect.sync(() => counter++)),
Stream.runCollect
)
assert.strictEqual(counter, 1)
assert.deepStrictEqual(Array.from(result), [1, 1])
}))
})

0 comments on commit 7a95371

Please sign in to comment.