From dc6263810f8b507eb8ba1b6f949d39103a26a196 Mon Sep 17 00:00:00 2001 From: Dmitry Date: Mon, 8 Jul 2024 06:01:33 +0300 Subject: [PATCH] fix(Stream|Channel): exclude `Scope` from the resulting effect env of all the non-scoped `.run*` methods (#3190) Co-authored-by: Tim --- .changeset/stream-channel-run-env-fix.md | 9 ++ packages/effect/src/Channel.ts | 6 +- packages/effect/src/Stream.ts | 36 ++++-- packages/effect/src/internal/channel.ts | 7 +- .../src/internal/channel/channelExecutor.ts | 2 +- packages/effect/src/internal/stream.ts | 114 +++++++++++------- 6 files changed, 113 insertions(+), 61 deletions(-) create mode 100644 .changeset/stream-channel-run-env-fix.md diff --git a/.changeset/stream-channel-run-env-fix.md b/.changeset/stream-channel-run-env-fix.md new file mode 100644 index 0000000000..d4995b0a0c --- /dev/null +++ b/.changeset/stream-channel-run-env-fix.md @@ -0,0 +1,9 @@ +--- +"effect": minor +--- + +Ensure `Scope` is excluded from `R` in the `Channel` / `Stream` `run*` functions. + +This fix ensures that `Scope` is now properly excluded from the resulting effect environment. +The affected functions include `run`, `runCollect`, `runCount`, `runDrain` and other non-scoped `run*` in both `Stream` and `Channel` modules. +This fix brings the type declaration in line with the runtime implementation. diff --git a/packages/effect/src/Channel.ts b/packages/effect/src/Channel.ts index d8e369eafc..723d004101 100644 --- a/packages/effect/src/Channel.ts +++ b/packages/effect/src/Channel.ts @@ -1916,7 +1916,7 @@ export const repeated: ( */ export const run: ( self: Channel -) => Effect.Effect = channel.run +) => Effect.Effect> = channel.run /** * Run the channel until it finishes with a done value or fails with an error @@ -1929,7 +1929,7 @@ export const run: ( */ export const runCollect: ( self: Channel -) => Effect.Effect<[Chunk.Chunk, OutDone], OutErr, Env> = channel.runCollect +) => Effect.Effect<[Chunk.Chunk, OutDone], OutErr, Exclude> = channel.runCollect /** * Runs a channel until the end is received. @@ -1939,7 +1939,7 @@ export const runCollect: ( */ export const runDrain: ( self: Channel -) => Effect.Effect = channel.runDrain +) => Effect.Effect> = channel.runDrain /** * Use a scoped effect to emit an output element. diff --git a/packages/effect/src/Stream.ts b/packages/effect/src/Stream.ts index c8da81cfcf..94bfb59f84 100644 --- a/packages/effect/src/Stream.ts +++ b/packages/effect/src/Stream.ts @@ -2962,7 +2962,7 @@ export const run: { ( self: Stream, sink: Sink.Sink - ): Effect.Effect + ): Effect.Effect> } = internal.run /** @@ -2971,7 +2971,8 @@ export const run: { * @since 2.0.0 * @category destructors */ -export const runCollect: (self: Stream) => Effect.Effect, E, R> = internal.runCollect +export const runCollect: (self: Stream) => Effect.Effect, E, Exclude> = + internal.runCollect /** * Runs the stream and emits the number of elements processed @@ -2979,7 +2980,8 @@ export const runCollect: (self: Stream) => Effect.Effect(self: Stream) => Effect.Effect = internal.runCount +export const runCount: (self: Stream) => Effect.Effect> = + internal.runCount /** * Runs the stream only for its effects. The emitted elements are discarded. @@ -2987,7 +2989,8 @@ export const runCount: (self: Stream) => Effect.Effect(self: Stream) => Effect.Effect = internal.runDrain +export const runDrain: (self: Stream) => Effect.Effect> = + internal.runDrain /** * Executes a pure fold over the stream of values - reduces all elements in @@ -2997,8 +3000,8 @@ export const runDrain: (self: Stream) => Effect.Effect(s: S, f: (s: S, a: A) => S): (self: Stream) => Effect.Effect - (self: Stream, s: S, f: (s: S, a: A) => S): Effect.Effect + (s: S, f: (s: S, a: A) => S): (self: Stream) => Effect.Effect> + (self: Stream, s: S, f: (s: S, a: A) => S): Effect.Effect> } = internal.runFold /** @@ -3011,12 +3014,12 @@ export const runFoldEffect: { ( s: S, f: (s: S, a: A) => Effect.Effect - ): (self: Stream) => Effect.Effect + ): (self: Stream) => Effect.Effect> ( self: Stream, s: S, f: (s: S, a: A) => Effect.Effect - ): Effect.Effect + ): Effect.Effect> } = internal.runFoldEffect /** @@ -3058,8 +3061,17 @@ export const runFoldScopedEffect: { * @category destructors */ export const runFoldWhile: { - (s: S, cont: Predicate, f: (s: S, a: A) => S): (self: Stream) => Effect.Effect - (self: Stream, s: S, cont: Predicate, f: (s: S, a: A) => S): Effect.Effect + ( + s: S, + cont: Predicate, + f: (s: S, a: A) => S + ): (self: Stream) => Effect.Effect> + ( + self: Stream, + s: S, + cont: Predicate, + f: (s: S, a: A) => S + ): Effect.Effect> } = internal.runFoldWhile /** @@ -3074,13 +3086,13 @@ export const runFoldWhileEffect: { s: S, cont: Predicate, f: (s: S, a: A) => Effect.Effect - ): (self: Stream) => Effect.Effect + ): (self: Stream) => Effect.Effect> ( self: Stream, s: S, cont: Predicate, f: (s: S, a: A) => Effect.Effect - ): Effect.Effect + ): Effect.Effect> } = internal.runFoldWhileEffect /** diff --git a/packages/effect/src/internal/channel.ts b/packages/effect/src/internal/channel.ts index f5039f4c2a..ecaf38072c 100644 --- a/packages/effect/src/internal/channel.ts +++ b/packages/effect/src/internal/channel.ts @@ -2052,17 +2052,18 @@ export const repeated = ( /** @internal */ export const run = ( self: Channel.Channel -): Effect.Effect => Effect.scoped(executor.runScoped(self)) +): Effect.Effect> => Effect.scoped(executor.runScoped(self)) /** @internal */ export const runCollect = ( self: Channel.Channel -): Effect.Effect<[Chunk.Chunk, OutDone], OutErr, Env> => executor.run(core.collectElements(self)) +): Effect.Effect<[Chunk.Chunk, OutDone], OutErr, Exclude> => + executor.run(core.collectElements(self)) /** @internal */ export const runDrain = ( self: Channel.Channel -): Effect.Effect => executor.run(drain(self)) +): Effect.Effect> => executor.run(drain(self)) /** @internal */ export const scoped = ( diff --git a/packages/effect/src/internal/channel/channelExecutor.ts b/packages/effect/src/internal/channel/channelExecutor.ts index 64b5bfb83c..fa1cc76dd7 100644 --- a/packages/effect/src/internal/channel/channelExecutor.ts +++ b/packages/effect/src/internal/channel/channelExecutor.ts @@ -1095,7 +1095,7 @@ export const readUpstream = ( /** @internal */ export const run = ( self: Channel.Channel -): Effect.Effect => pipe(runScoped(self), Effect.scoped) +): Effect.Effect> => pipe(runScoped(self), Effect.scoped) /** @internal */ export const runScoped = ( diff --git a/packages/effect/src/internal/stream.ts b/packages/effect/src/internal/stream.ts index 15c757e185..1d32a02a16 100644 --- a/packages/effect/src/internal/stream.ts +++ b/packages/effect/src/internal/stream.ts @@ -5128,37 +5128,44 @@ export const retry = dual< export const run = dual< ( sink: Sink.Sink - ) => (self: Stream.Stream) => Effect.Effect, + ) => (self: Stream.Stream) => Effect.Effect>, ( self: Stream.Stream, sink: Sink.Sink - ) => Effect.Effect + ) => Effect.Effect> >(2, ( self: Stream.Stream, sink: Sink.Sink -): Effect.Effect => +): Effect.Effect> => pipe(toChannel(self), channel.pipeToOrFail(_sink.toChannel(sink)), channel.runDrain)) /** @internal */ -export const runCollect = (self: Stream.Stream): Effect.Effect, E, R> => - pipe(self, run(_sink.collectAll())) +export const runCollect = ( + self: Stream.Stream +): Effect.Effect, E, Exclude> => pipe(self, run(_sink.collectAll())) /** @internal */ -export const runCount = (self: Stream.Stream): Effect.Effect => +export const runCount = (self: Stream.Stream): Effect.Effect> => pipe(self, run(_sink.count)) /** @internal */ -export const runDrain = (self: Stream.Stream): Effect.Effect => +export const runDrain = (self: Stream.Stream): Effect.Effect> => pipe(self, run(_sink.drain)) /** @internal */ export const runFold = dual< - (s: S, f: (s: S, a: A) => S) => (self: Stream.Stream) => Effect.Effect, - (self: Stream.Stream, s: S, f: (s: S, a: A) => S) => Effect.Effect + ( + s: S, + f: (s: S, a: A) => S + ) => (self: Stream.Stream) => Effect.Effect>, + (self: Stream.Stream, s: S, f: (s: S, a: A) => S) => Effect.Effect> >( 3, - (self: Stream.Stream, s: S, f: (s: S, a: A) => S): Effect.Effect => - pipe(self, runFoldWhileScoped(s, constTrue, f), Effect.scoped) + ( + self: Stream.Stream, + s: S, + f: (s: S, a: A) => S + ): Effect.Effect> => pipe(self, runFoldWhileScoped(s, constTrue, f), Effect.scoped) ) /** @internal */ @@ -5166,17 +5173,18 @@ export const runFoldEffect = dual< ( s: S, f: (s: S, a: A) => Effect.Effect - ) => (self: Stream.Stream) => Effect.Effect, + ) => (self: Stream.Stream) => Effect.Effect>, ( self: Stream.Stream, s: S, f: (s: S, a: A) => Effect.Effect - ) => Effect.Effect + ) => Effect.Effect> >(3, ( self: Stream.Stream, s: S, f: (s: S, a: A) => Effect.Effect -): Effect.Effect => pipe(self, runFoldWhileScopedEffect(s, constTrue, f), Effect.scoped)) +): Effect.Effect> => + pipe(self, runFoldWhileScopedEffect(s, constTrue, f), Effect.scoped)) /** @internal */ export const runFoldScoped = dual< @@ -5211,14 +5219,19 @@ export const runFoldWhile = dual< s: S, cont: Predicate, f: (s: S, a: A) => S - ) => (self: Stream.Stream) => Effect.Effect, - (self: Stream.Stream, s: S, cont: Predicate, f: (s: S, a: A) => S) => Effect.Effect + ) => (self: Stream.Stream) => Effect.Effect>, + ( + self: Stream.Stream, + s: S, + cont: Predicate, + f: (s: S, a: A) => S + ) => Effect.Effect> >(4, ( self: Stream.Stream, s: S, cont: Predicate, f: (s: S, a: A) => S -): Effect.Effect => pipe(self, runFoldWhileScoped(s, cont, f), Effect.scoped)) +): Effect.Effect> => pipe(self, runFoldWhileScoped(s, cont, f), Effect.scoped)) /** @internal */ export const runFoldWhileEffect = dual< @@ -5226,19 +5239,20 @@ export const runFoldWhileEffect = dual< s: S, cont: Predicate, f: (s: S, a: A) => Effect.Effect - ) => (self: Stream.Stream) => Effect.Effect, + ) => (self: Stream.Stream) => Effect.Effect>, ( self: Stream.Stream, s: S, cont: Predicate, f: (s: S, a: A) => Effect.Effect - ) => Effect.Effect + ) => Effect.Effect> >(4, ( self: Stream.Stream, s: S, cont: Predicate, f: (s: S, a: A) => Effect.Effect -): Effect.Effect => pipe(self, runFoldWhileScopedEffect(s, cont, f), Effect.scoped)) +): Effect.Effect> => + pipe(self, runFoldWhileScopedEffect(s, cont, f), Effect.scoped)) /** @internal */ export const runFoldWhileScoped = dual< @@ -5284,29 +5298,29 @@ export const runFoldWhileScopedEffect = dual< export const runForEach = dual< ( f: (a: A) => Effect.Effect - ) => (self: Stream.Stream) => Effect.Effect, + ) => (self: Stream.Stream) => Effect.Effect>, ( self: Stream.Stream, f: (a: A) => Effect.Effect - ) => Effect.Effect + ) => Effect.Effect> >(2, ( self: Stream.Stream, f: (a: A) => Effect.Effect -): Effect.Effect => pipe(self, run(_sink.forEach(f)))) +): Effect.Effect> => pipe(self, run(_sink.forEach(f)))) /** @internal */ export const runForEachChunk = dual< ( f: (a: Chunk.Chunk) => Effect.Effect - ) => (self: Stream.Stream) => Effect.Effect, + ) => (self: Stream.Stream) => Effect.Effect>, ( self: Stream.Stream, f: (a: Chunk.Chunk) => Effect.Effect - ) => Effect.Effect + ) => Effect.Effect> >(2, ( self: Stream.Stream, f: (a: Chunk.Chunk) => Effect.Effect -): Effect.Effect => pipe(self, run(_sink.forEachChunk(f)))) +): Effect.Effect> => pipe(self, run(_sink.forEachChunk(f)))) /** @internal */ export const runForEachChunkScoped = dual< @@ -5340,15 +5354,15 @@ export const runForEachScoped = dual< export const runForEachWhile = dual< ( f: (a: A) => Effect.Effect - ) => (self: Stream.Stream) => Effect.Effect, + ) => (self: Stream.Stream) => Effect.Effect>, ( self: Stream.Stream, f: (a: A) => Effect.Effect - ) => Effect.Effect + ) => Effect.Effect> >(2, ( self: Stream.Stream, f: (a: A) => Effect.Effect -): Effect.Effect => pipe(self, run(_sink.forEachWhile(f)))) +): Effect.Effect> => pipe(self, run(_sink.forEachWhile(f)))) /** @internal */ export const runForEachWhileScoped = dual< @@ -5365,17 +5379,25 @@ export const runForEachWhileScoped = dual< ): Effect.Effect => pipe(self, runScoped(_sink.forEachWhile(f)))) /** @internal */ -export const runHead = (self: Stream.Stream): Effect.Effect, E, R> => - pipe(self, run(_sink.head())) +export const runHead = ( + self: Stream.Stream +): Effect.Effect, E, Exclude> => pipe(self, run(_sink.head())) /** @internal */ export const runIntoPubSub = dual< - (pubsub: PubSub.PubSub>) => (self: Stream.Stream) => Effect.Effect, - (self: Stream.Stream, pubsub: PubSub.PubSub>) => Effect.Effect + ( + pubsub: PubSub.PubSub> + ) => (self: Stream.Stream) => Effect.Effect>, + ( + self: Stream.Stream, + pubsub: PubSub.PubSub> + ) => Effect.Effect> >( 2, - (self: Stream.Stream, pubsub: PubSub.PubSub>): Effect.Effect => - pipe(self, runIntoQueue(pubsub)) + ( + self: Stream.Stream, + pubsub: PubSub.PubSub> + ): Effect.Effect> => pipe(self, runIntoQueue(pubsub)) ) /** @internal */ @@ -5394,12 +5416,19 @@ export const runIntoPubSubScoped = dual< /** @internal */ export const runIntoQueue = dual< - (queue: Queue.Enqueue>) => (self: Stream.Stream) => Effect.Effect, - (self: Stream.Stream, queue: Queue.Enqueue>) => Effect.Effect + ( + queue: Queue.Enqueue> + ) => (self: Stream.Stream) => Effect.Effect>, + ( + self: Stream.Stream, + queue: Queue.Enqueue> + ) => Effect.Effect> >( 2, - (self: Stream.Stream, queue: Queue.Enqueue>): Effect.Effect => - pipe(self, runIntoQueueScoped(queue), Effect.scoped) + ( + self: Stream.Stream, + queue: Queue.Enqueue> + ): Effect.Effect> => pipe(self, runIntoQueueScoped(queue), Effect.scoped) ) /** @internal */ @@ -5462,8 +5491,9 @@ export const runIntoQueueScoped = dual< }) /** @internal */ -export const runLast = (self: Stream.Stream): Effect.Effect, E, R> => - pipe(self, run(_sink.last())) +export const runLast = ( + self: Stream.Stream +): Effect.Effect, E, Exclude> => pipe(self, run(_sink.last())) /** @internal */ export const runScoped = dual< @@ -5486,7 +5516,7 @@ export const runScoped = dual< )) /** @internal */ -export const runSum = (self: Stream.Stream): Effect.Effect => +export const runSum = (self: Stream.Stream): Effect.Effect> => pipe(self, run(_sink.sum)) /** @internal */