Skip to content

Commit

Permalink
feat(Stream): onEnd (#3303)
Browse files Browse the repository at this point in the history
  • Loading branch information
dilame authored Jul 19, 2024
1 parent 1fd83bd commit 80c8158
Show file tree
Hide file tree
Showing 4 changed files with 83 additions and 0 deletions.
22 changes: 22 additions & 0 deletions .changeset/stream-on-end.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
---
"effect": minor
---

Implement `Stream.onEnd` that adds an effect to be executed at the end of the stream.

```ts
import { Console, Effect, Stream } from "effect";

const stream = Stream.make(1, 2, 3).pipe(
Stream.map((n) => n * 2),
Stream.tap((n) => Console.log(`after mapping: ${n}`)),
Stream.onEnd(Console.log("Stream ended"))
)

Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// after mapping: 2
// after mapping: 4
// after mapping: 6
// Stream ended
// { _id: 'Chunk', values: [ 2, 4, 6 ] }
```
32 changes: 32 additions & 0 deletions packages/effect/src/Stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2927,6 +2927,38 @@ export const mkString: <E, R>(self: Stream<string, E, R>) => Effect.Effect<strin
*/
export const never: Stream<never> = internal.never

/**
* Adds an effect to be executed at the end of the stream.
*
* @example
* import { Console, Effect, Stream } from "effect"
*
* const stream = Stream.make(1, 2, 3).pipe(
* Stream.map((n) => n * 2),
* Stream.tap((n) => Console.log(`after mapping: ${n}`)),
* Stream.onEnd(Console.log("Stream ended"))
* )
*
* Effect.runPromise(Stream.runCollect(stream)).then(console.log)
* // after mapping: 2
* // after mapping: 4
* // after mapping: 6
* // Stream ended
* // { _id: 'Chunk', values: [ 2, 4, 6 ] }
*
* @since 3.6.0
* @category sequencing
*/
export const onEnd: {
<_, E2, R2>(
effect: Effect.Effect<_, E2, R2>
): <A, E, R>(self: Stream<A, E, R>) => Stream<A, E2 | E, R2 | R>
<A, E, R, _, E2, R2>(
self: Stream<A, E, R>,
effect: Effect.Effect<_, E2, R2>
): Stream<A, E | E2, R | R2>
} = internal.onEnd

/**
* Runs the specified effect if this stream fails, providing the error to the
* effect if it exists.
Expand Down
17 changes: 17 additions & 0 deletions packages/effect/src/internal/stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4162,6 +4162,23 @@ export const mkString = <E, R>(self: Stream.Stream<string, E, R>): Effect.Effect
/** @internal */
export const never: Stream.Stream<never> = fromEffect(Effect.never)

/** @internal */
export const onEnd: {
<_, E2, R2>(
effect: Effect.Effect<_, E2, R2>
): <A, E, R>(self: Stream.Stream<A, E, R>) => Stream.Stream<A, E2 | E, R2 | R>
<A, E, R, _, E2, R2>(
self: Stream.Stream<A, E, R>,
effect: Effect.Effect<_, E2, R2>
): Stream.Stream<A, E | E2, R | R2>
} = dual(
2,
<A, E, R, _, E2, R2>(
self: Stream.Stream<A, E, R>,
effect: Effect.Effect<_, E2, R2>
): Stream.Stream<A, E | E2, R | R2> => concat(self, drain(fromEffect(effect)))
)

/** @internal */
export const onError = dual<
<E, X, R2>(
Expand Down
12 changes: 12 additions & 0 deletions packages/effect/test/Stream/lifecycle.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,16 @@ describe("Stream", () => {
assert.strictEqual(counter, 1)
assert.deepStrictEqual(Array.from(result), [1, 1])
}))

it.effect("onEnd", () =>
Effect.gen(function*($) {
let counter = 0
const result = yield* $(
Stream.make(1, 2, 3),
Stream.onEnd(Effect.sync(() => counter++)),
Stream.runCollect
)
assert.strictEqual(counter, 1)
assert.deepStrictEqual(Array.from(result), [1, 2, 3])
}))
})

0 comments on commit 80c8158

Please sign in to comment.