diff --git a/.changeset/quiet-peas-carry.md b/.changeset/quiet-peas-carry.md new file mode 100644 index 0000000000..d0e90939cb --- /dev/null +++ b/.changeset/quiet-peas-carry.md @@ -0,0 +1,5 @@ +--- +"effect": minor +--- + +change `Take` type parameters order from `Take` to `Take` diff --git a/packages/effect/mod.sh b/packages/effect/mod.sh index cf0a298205..f2857cf163 100755 --- a/packages/effect/mod.sh +++ b/packages/effect/mod.sh @@ -1,5 +1,5 @@ #!/bin/bash -dirs=(../effect/src) +dirs=(../effect/test) for dir in ${dirs[@]}; do echo Refactoring $dir diff --git a/packages/effect/mod.ts b/packages/effect/mod.ts index 5b77d37b6f..42f5b8d802 100644 --- a/packages/effect/mod.ts +++ b/packages/effect/mod.ts @@ -7,10 +7,12 @@ const enabled = { swapSTMGenParams: false, swapDeferredParams: false, swapTDeferredParams: false, + swapTakeParams: false, cleanupSTM: false, cleanupEffect: false, cleanupStream: false, - cleanupExit: false + cleanupExit: false, + cleanupTake: true } const cleanup = (name: string) => (ast: cs.ASTPath) => { @@ -31,6 +33,7 @@ const cleanupEffect = cleanup("Effect") const cleanupStream = cleanup("Stream") const cleanupExit = cleanup("Exit") const cleanupSTM = cleanup("STM") +const cleanupTake = cleanup("Take") const filter = (ast: cs.ASTPath, nodeName: string) => { const name = ast.value.typeName @@ -71,8 +74,8 @@ const swapParamsREA = (nodeName: string) => (ast: cs.ASTPath } } -const swapDeferredParams = (ast: cs.ASTPath) => { - const is = filter(ast, "Deferred") +const swapParamsEA = (nodeName: string) => (ast: cs.ASTPath) => { + const is = filter(ast, nodeName) if ( is(ast.value.typeName) && ast.value.typeParameters && @@ -80,22 +83,14 @@ const swapDeferredParams = (ast: cs.ASTPath) => { ) { const params = ast.value.typeParameters.params const newParams = [params[1], params[0]] + popNever(newParams) ast.value.typeParameters.params = newParams } } -const swapTDeferredParams = (ast: cs.ASTPath) => { - const is = filter(ast, "TDeferred") - if ( - is(ast.value.typeName) && - ast.value.typeParameters && - ast.value.typeParameters.params.length === 2 - ) { - const params = ast.value.typeParameters.params - const newParams = [params[1], params[0]] - ast.value.typeParameters.params = newParams - } -} +const swapDeferredParams = swapParamsEA("Deferred") +const swapTDeferredParams = swapParamsEA("TDeferred") +const swapTakeParams = swapParamsEA("Take") const swapSTMParams = swapParamsREA("STM") const swapSTMGenParams = swapParamsREA("STMGen") @@ -141,6 +136,9 @@ export default function transformer(file: cs.FileInfo, api: cs.API) { if (enabled.swapTDeferredParams) { swapTDeferredParams(ast) } + if (enabled.swapTakeParams) { + swapTakeParams(ast) + } if (enabled.cleanupEffect) { cleanupEffect(ast) } @@ -153,6 +151,9 @@ export default function transformer(file: cs.FileInfo, api: cs.API) { if (enabled.cleanupSTM) { cleanupSTM(ast) } + if (enabled.cleanupTake) { + cleanupTake(ast) + } }) return root.toSource() diff --git a/packages/effect/src/GroupBy.ts b/packages/effect/src/GroupBy.ts index 092a62c55b..520b4bf748 100644 --- a/packages/effect/src/GroupBy.ts +++ b/packages/effect/src/GroupBy.ts @@ -30,7 +30,7 @@ export type GroupByTypeId = typeof GroupByTypeId * @category models */ export interface GroupBy extends GroupBy.Variance, Pipeable { - readonly grouped: Stream.Stream>], E, R> + readonly grouped: Stream.Stream>], E, R> } /** @@ -99,5 +99,5 @@ export const first: { * @category constructors */ export const make: ( - grouped: Stream.Stream>], E, R> + grouped: Stream.Stream>], E, R> ) => GroupBy = internal.make diff --git a/packages/effect/src/Stream.ts b/packages/effect/src/Stream.ts index d0ee74be06..74cc1b254f 100644 --- a/packages/effect/src/Stream.ts +++ b/packages/effect/src/Stream.ts @@ -399,12 +399,12 @@ export const broadcastedQueues: { maximumLag: number ): ( self: Stream - ) => Effect.Effect>, N>, never, R | Scope.Scope> + ) => Effect.Effect>, N>, never, R | Scope.Scope> ( self: Stream, n: N, maximumLag: number - ): Effect.Effect>, N>, never, Scope.Scope | R> + ): Effect.Effect>, N>, never, Scope.Scope | R> } = internal.broadcastedQueues /** @@ -422,11 +422,11 @@ export const broadcastedQueuesDynamic: { maximumLag: number ): ( self: Stream - ) => Effect.Effect>, never, Scope.Scope>, never, R | Scope.Scope> + ) => Effect.Effect>, never, Scope.Scope>, never, R | Scope.Scope> ( self: Stream, maximumLag: number - ): Effect.Effect>, never, Scope.Scope>, never, Scope.Scope | R> + ): Effect.Effect>, never, Scope.Scope>, never, Scope.Scope | R> } = internal.broadcastedQueuesDynamic /** @@ -1397,7 +1397,7 @@ export const flattenIterables: (self: Stream, E, R>) => Str * @since 2.0.0 * @category sequencing */ -export const flattenTake: (self: Stream, E, R>) => Stream = +export const flattenTake: (self: Stream, E, R>) => Stream = internal.flattenTake /** @@ -3180,8 +3180,8 @@ export const runHead: (self: Stream) => Effect.Effect(pubsub: PubSub.PubSub>): (self: Stream) => Effect.Effect - (self: Stream, pubsub: PubSub.PubSub>): Effect.Effect + (pubsub: PubSub.PubSub>): (self: Stream) => Effect.Effect + (self: Stream, pubsub: PubSub.PubSub>): Effect.Effect } = internal.runIntoPubSub /** @@ -3193,9 +3193,9 @@ export const runIntoPubSub: { */ export const runIntoPubSubScoped: { ( - pubsub: PubSub.PubSub> + pubsub: PubSub.PubSub> ): (self: Stream) => Effect.Effect - (self: Stream, pubsub: PubSub.PubSub>): Effect.Effect + (self: Stream, pubsub: PubSub.PubSub>): Effect.Effect } = internal.runIntoPubSubScoped /** @@ -3206,8 +3206,8 @@ export const runIntoPubSubScoped: { * @category destructors */ export const runIntoQueue: { - (queue: Queue.Enqueue>): (self: Stream) => Effect.Effect - (self: Stream, queue: Queue.Enqueue>): Effect.Effect + (queue: Queue.Enqueue>): (self: Stream) => Effect.Effect + (self: Stream, queue: Queue.Enqueue>): Effect.Effect } = internal.runIntoQueue /** @@ -3236,9 +3236,9 @@ export const runIntoQueueElementsScoped: { */ export const runIntoQueueScoped: { ( - queue: Queue.Enqueue> + queue: Queue.Enqueue> ): (self: Stream) => Effect.Effect - (self: Stream, queue: Queue.Enqueue>): Effect.Effect + (self: Stream, queue: Queue.Enqueue>): Effect.Effect } = internal.runIntoQueueScoped /** @@ -3799,11 +3799,11 @@ export const timeoutTo: { export const toPubSub: { ( capacity: number - ): (self: Stream) => Effect.Effect>, never, Scope.Scope | R> + ): (self: Stream) => Effect.Effect>, never, Scope.Scope | R> ( self: Stream, capacity: number - ): Effect.Effect>, never, Scope.Scope | R> + ): Effect.Effect>, never, Scope.Scope | R> } = internal.toPubSub /** @@ -3834,14 +3834,14 @@ export const toQueue: { | { readonly strategy?: "dropping" | "sliding" | "suspend" | undefined; readonly capacity?: number | undefined } | { readonly strategy: "unbounded" } | undefined - ): (self: Stream) => Effect.Effect>, never, Scope.Scope | R> + ): (self: Stream) => Effect.Effect>, never, Scope.Scope | R> ( self: Stream, options?: | { readonly strategy?: "dropping" | "sliding" | "suspend" | undefined; readonly capacity?: number | undefined } | { readonly strategy: "unbounded" } | undefined - ): Effect.Effect>, never, Scope.Scope | R> + ): Effect.Effect>, never, Scope.Scope | R> } = internal.toQueue /** diff --git a/packages/effect/src/Take.ts b/packages/effect/src/Take.ts index 86008a3a3b..b41a2b6ea0 100644 --- a/packages/effect/src/Take.ts +++ b/packages/effect/src/Take.ts @@ -23,14 +23,14 @@ export const TakeTypeId: unique symbol = internal.TakeTypeId export type TakeTypeId = typeof TakeTypeId /** - * A `Take` represents a single `take` from a queue modeling a stream of + * A `Take` represents a single `take` from a queue modeling a stream of * values. A `Take` may be a failure cause `Cause`, a chunk value `Chunk`, * or an end-of-stream marker. * * @since 2.0.0 * @category models */ -export interface Take extends Take.Variance, Pipeable { +export interface Take extends Take.Variance, Pipeable { /** @internal */ readonly exit: Exit.Exit, Option.Option> } @@ -43,10 +43,10 @@ export declare namespace Take { * @since 2.0.0 * @category models */ - export interface Variance { + export interface Variance { readonly [TakeTypeId]: { - readonly _E: Types.Covariant readonly _A: Types.Covariant + readonly _E: Types.Covariant } } } @@ -57,7 +57,7 @@ export declare namespace Take { * @since 2.0.0 * @category constructors */ -export const chunk: (chunk: Chunk.Chunk) => Take = internal.chunk +export const chunk: (chunk: Chunk.Chunk) => Take = internal.chunk /** * Creates a failing `Take` with the specified defect. @@ -65,7 +65,7 @@ export const chunk: (chunk: Chunk.Chunk) => Take = internal.chun * @since 2.0.0 * @category constructors */ -export const die: (defect: unknown) => Take = internal.die +export const die: (defect: unknown) => Take = internal.die /** * Creates a failing `Take` with the specified error message. @@ -73,15 +73,15 @@ export const die: (defect: unknown) => Take = internal.die * @since 2.0.0 * @category constructors */ -export const dieMessage: (message: string) => Take = internal.dieMessage +export const dieMessage: (message: string) => Take = internal.dieMessage /** - * Transforms a `Take` to an `Effect`. + * Transforms a `Take` to an `Effect`. * * @since 2.0.0 * @category destructors */ -export const done: (self: Take) => Effect.Effect, Option.Option> = internal.done +export const done: (self: Take) => Effect.Effect, Option.Option> = internal.done /** * Represents the end-of-stream marker. @@ -89,7 +89,7 @@ export const done: (self: Take) => Effect.Effect, Opt * @since 2.0.0 * @category constructors */ -export const end: Take = internal.end +export const end: Take = internal.end /** * Creates a failing `Take` with the specified error. @@ -97,7 +97,7 @@ export const end: Take = internal.end * @since 2.0.0 * @category constructors */ -export const fail: (error: E) => Take = internal.fail +export const fail: (error: E) => Take = internal.fail /** * Creates a failing `Take` with the specified cause. @@ -105,17 +105,17 @@ export const fail: (error: E) => Take = internal.fail * @since 2.0.0 * @category constructors */ -export const failCause: (cause: Cause.Cause) => Take = internal.failCause +export const failCause: (cause: Cause.Cause) => Take = internal.failCause /** * Creates an effect from `Effect` that does not fail, but succeeds with - * the `Take`. Error from stream when pulling is converted to + * the `Take`. Error from stream when pulling is converted to * `Take.failCause`. Creates a single value chunk. * * @since 2.0.0 * @category constructors */ -export const fromEffect: (effect: Effect.Effect) => Effect.Effect, never, R> = +export const fromEffect: (effect: Effect.Effect) => Effect.Effect, never, R> = internal.fromEffect /** @@ -124,11 +124,11 @@ export const fromEffect: (effect: Effect.Effect) => Effect.Eff * @since 2.0.0 * @category constructors */ -export const fromExit: (exit: Exit.Exit) => Take = internal.fromExit +export const fromExit: (exit: Exit.Exit) => Take = internal.fromExit /** * Creates effect from `Effect, Option, R>` that does not fail, but - * succeeds with the `Take`. Errors from stream when pulling are converted + * succeeds with the `Take`. Errors from stream when pulling are converted * to `Take.failCause`, and the end-of-stream is converted to `Take.end`. * * @since 2.0.0 @@ -136,7 +136,7 @@ export const fromExit: (exit: Exit.Exit) => Take = internal.fr */ export const fromPull: ( pull: Effect.Effect, Option.Option, R> -) => Effect.Effect, never, R> = internal.fromPull +) => Effect.Effect, never, R> = internal.fromPull /** * Checks if this `take` is done (`Take.end`). @@ -144,7 +144,7 @@ export const fromPull: ( * @since 2.0.0 * @category getters */ -export const isDone: (self: Take) => boolean = internal.isDone +export const isDone: (self: Take) => boolean = internal.isDone /** * Checks if this `take` is a failure. @@ -152,7 +152,7 @@ export const isDone: (self: Take) => boolean = internal.isDone * @since 2.0.0 * @category getters */ -export const isFailure: (self: Take) => boolean = internal.isFailure +export const isFailure: (self: Take) => boolean = internal.isFailure /** * Checks if this `take` is a success. @@ -160,7 +160,7 @@ export const isFailure: (self: Take) => boolean = internal.isFailure * @since 2.0.0 * @category getters */ -export const isSuccess: (self: Take) => boolean = internal.isSuccess +export const isSuccess: (self: Take) => boolean = internal.isSuccess /** * Constructs a `Take`. @@ -168,17 +168,17 @@ export const isSuccess: (self: Take) => boolean = internal.isSuccess * @since 2.0.0 * @category constructors */ -export const make: (exit: Exit.Exit, Option.Option>) => Take = internal.make +export const make: (exit: Exit.Exit, Option.Option>) => Take = internal.make /** - * Transforms `Take` to `Take` by applying function `f`. + * Transforms `Take` to `Take` by applying function `f`. * * @since 2.0.0 * @category mapping */ export const map: { - (f: (a: A) => B): (self: Take) => Take - (self: Take, f: (a: A) => B): Take + (f: (a: A) => B): (self: Take) => Take + (self: Take, f: (a: A) => B): Take } = internal.map /** @@ -195,9 +195,9 @@ export const match: { readonly onFailure: (cause: Cause.Cause) => Z2 readonly onSuccess: (chunk: Chunk.Chunk) => Z3 } - ): (self: Take) => Z | Z2 | Z3 - ( - self: Take, + ): (self: Take) => Z | Z2 | Z3 + ( + self: Take, options: { readonly onEnd: () => Z readonly onFailure: (cause: Cause.Cause) => Z2 @@ -216,21 +216,21 @@ export const match: { * @category destructors */ export const matchEffect: { - ( + ( options: { readonly onEnd: Effect.Effect readonly onFailure: (cause: Cause.Cause) => Effect.Effect readonly onSuccess: (chunk: Chunk.Chunk) => Effect.Effect } - ): (self: Take) => Effect.Effect - ( - self: Take, + ): (self: Take) => Effect.Effect + ( + self: Take, options: { readonly onEnd: Effect.Effect readonly onFailure: (cause: Cause.Cause) => Effect.Effect readonly onSuccess: (chunk: Chunk.Chunk) => Effect.Effect } - ): Effect.Effect + ): Effect.Effect } = internal.matchEffect /** @@ -239,7 +239,7 @@ export const matchEffect: { * @since 2.0.0 * @category constructors */ -export const of: (value: A) => Take = internal.of +export const of: (value: A) => Take = internal.of /** * Returns an effect that effectfully "peeks" at the success of this take. @@ -248,11 +248,11 @@ export const of: (value: A) => Take = internal.of * @category sequencing */ export const tap: { - ( + ( f: (chunk: Chunk.Chunk) => Effect.Effect<_, E2, R> - ): (self: Take) => Effect.Effect - ( - self: Take, + ): (self: Take) => Effect.Effect + ( + self: Take, f: (chunk: Chunk.Chunk) => Effect.Effect<_, E2, R> ): Effect.Effect } = internal.tap diff --git a/packages/effect/src/internal/configProvider.ts b/packages/effect/src/internal/configProvider.ts index 88d28595c9..e00c2fd0df 100644 --- a/packages/effect/src/internal/configProvider.ts +++ b/packages/effect/src/internal/configProvider.ts @@ -626,9 +626,7 @@ const transpose = (array: ReadonlyArray>): ReadonlyArray array.map((row) => row[column as any])) } -const escapeRegex = (string: string): string => { - return string.replace(/[/\-\\^$*+?.()|[\]{}]/g, "\\$&") -} +const escapeRegex = (string: string): string => string.replace(/[/\-\\^$*+?.()|[\]{}]/g, "\\$&") const indicesFrom = (quotedIndices: HashSet.HashSet): Effect.Effect> => pipe( diff --git a/packages/effect/src/internal/groupBy.ts b/packages/effect/src/internal/groupBy.ts index 3b0b3a872b..a45b99e18d 100644 --- a/packages/effect/src/internal/groupBy.ts +++ b/packages/effect/src/internal/groupBy.ts @@ -113,7 +113,7 @@ export const first = dual< /** @internal */ export const make = ( - grouped: Stream.Stream>], E, R> + grouped: Stream.Stream>], E, R> ): GroupBy.GroupBy => ({ [GroupByTypeId]: groupByVariance, pipe() { @@ -155,7 +155,7 @@ export const groupBy = dual< Deferred.make<(key: K, value: V) => Effect.Effect>>() ) const output = yield* $(Effect.acquireRelease( - Queue.bounded>], Option.Option>>( + Queue.bounded>], Option.Option>>( options?.bufferSize ?? 16 ), (queue) => Queue.shutdown(queue) @@ -417,8 +417,8 @@ export const groupByKey = dual< } ): GroupBy.GroupBy => { const loop = ( - map: Map>>, - outerQueue: Queue.Queue>]>> + map: Map>>, + outerQueue: Queue.Queue>], E>> ): Channel.Channel, unknown, E, never, unknown> => core.readWithCause({ onInput: (input: Chunk.Chunk) => @@ -428,7 +428,7 @@ export const groupByKey = dual< const innerQueue = map.get(key) if (innerQueue === undefined) { return pipe( - Queue.bounded>(options?.bufferSize ?? 16), + Queue.bounded>(options?.bufferSize ?? 16), Effect.flatMap((innerQueue) => pipe( Effect.sync(() => { @@ -483,11 +483,11 @@ export const groupByKey = dual< }) return make(stream.unwrapScoped( pipe( - Effect.sync(() => new Map>>()), + Effect.sync(() => new Map>>()), Effect.flatMap((map) => pipe( Effect.acquireRelease( - Queue.unbounded>]>>(), + Queue.unbounded>], E>>(), (queue) => Queue.shutdown(queue) ), Effect.flatMap((queue) => diff --git a/packages/effect/src/internal/logSpan.ts b/packages/effect/src/internal/logSpan.ts index 86b60a663e..4b9a4c5af9 100644 --- a/packages/effect/src/internal/logSpan.ts +++ b/packages/effect/src/internal/logSpan.ts @@ -7,9 +7,7 @@ export const make = (label: string, startTime: number): LogSpan.LogSpan => ({ }) /** @internal */ -export const render = (now: number) => { - return (self: LogSpan.LogSpan): string => { - const label = self.label.replace(/[\s="]/g, "_") - return `${label}=${now - self.startTime}ms` - } +export const render = (now: number) => (self: LogSpan.LogSpan): string => { + const label = self.label.replace(/[\s="]/g, "_") + return `${label}=${now - self.startTime}ms` } diff --git a/packages/effect/src/internal/stm/tDeferred.ts b/packages/effect/src/internal/stm/tDeferred.ts index 0169492776..ca3ad8f466 100644 --- a/packages/effect/src/internal/stm/tDeferred.ts +++ b/packages/effect/src/internal/stm/tDeferred.ts @@ -18,7 +18,9 @@ export const TDeferredTypeId: TDeferred.TDeferredTypeId = Symbol.for( /** @internal */ const tDeferredVariance = { + /* c8 ignore next */ _A: (_: any) => _, + /* c8 ignore next */ _E: (_: any) => _ } diff --git a/packages/effect/src/internal/stream.ts b/packages/effect/src/internal/stream.ts index dea6393530..dc99e357a2 100644 --- a/packages/effect/src/internal/stream.ts +++ b/packages/effect/src/internal/stream.ts @@ -477,7 +477,7 @@ export const asyncEffect = ( ): Stream.Stream => pipe( Effect.acquireRelease( - Queue.bounded>(outputBuffer), + Queue.bounded>(outputBuffer), (queue) => Queue.shutdown(queue) ), Effect.flatMap((output) => @@ -534,7 +534,7 @@ export const asyncInterrupt = ( ): Stream.Stream => pipe( Effect.acquireRelease( - Queue.bounded>(outputBuffer), + Queue.bounded>(outputBuffer), (queue) => Queue.shutdown(queue) ), Effect.flatMap((output) => @@ -610,7 +610,7 @@ export const asyncScoped = ( ): Stream.Stream> => pipe( Effect.acquireRelease( - Queue.bounded>(outputBuffer), + Queue.bounded>(outputBuffer), (queue) => Queue.shutdown(queue) ), Effect.flatMap((output) => @@ -766,21 +766,21 @@ export const broadcastedQueues = dual< maximumLag: number ) => ( self: Stream.Stream - ) => Effect.Effect>, N>, never, Scope.Scope | R>, + ) => Effect.Effect>, N>, never, Scope.Scope | R>, ( self: Stream.Stream, n: N, maximumLag: number - ) => Effect.Effect>, N>, never, Scope.Scope | R> + ) => Effect.Effect>, N>, never, Scope.Scope | R> >(3, ( self: Stream.Stream, n: N, maximumLag: number -): Effect.Effect>, N>, never, Scope.Scope | R> => - Effect.flatMap(PubSub.bounded>(maximumLag), (pubsub) => +): Effect.Effect>, N>, never, Scope.Scope | R> => + Effect.flatMap(PubSub.bounded>(maximumLag), (pubsub) => pipe( Effect.all(Array.from({ length: n }, () => PubSub.subscribe(pubsub))) as Effect.Effect< - Stream.Stream.DynamicTuple>, N>, + Stream.Stream.DynamicTuple>, N>, never, R >, @@ -793,15 +793,15 @@ export const broadcastedQueuesDynamic = dual< maximumLag: number ) => ( self: Stream.Stream - ) => Effect.Effect>, never, Scope.Scope>, never, Scope.Scope | R>, + ) => Effect.Effect>, never, Scope.Scope>, never, Scope.Scope | R>, ( self: Stream.Stream, maximumLag: number - ) => Effect.Effect>, never, Scope.Scope>, never, Scope.Scope | R> + ) => Effect.Effect>, never, Scope.Scope>, never, Scope.Scope | R> >(2, ( self: Stream.Stream, maximumLag: number -): Effect.Effect>, never, Scope.Scope>, never, Scope.Scope | R> => +): Effect.Effect>, never, Scope.Scope>, never, Scope.Scope | R> => Effect.map(toPubSub(self, maximumLag), PubSub.subscribe)) /** @internal */ @@ -902,7 +902,7 @@ const bufferChunksDropping = dual< (self: Stream.Stream, capacity: number) => Stream.Stream >(2, (self: Stream.Stream, capacity: number): Stream.Stream => { const queue = Effect.acquireRelease( - Queue.dropping, Deferred.Deferred]>(capacity), + Queue.dropping, Deferred.Deferred]>(capacity), (queue) => Queue.shutdown(queue) ) return new StreamImpl(bufferSignal(queue, toChannel(self))) @@ -913,7 +913,7 @@ const bufferChunksSliding = dual< (self: Stream.Stream, capacity: number) => Stream.Stream >(2, (self: Stream.Stream, capacity: number): Stream.Stream => { const queue = Effect.acquireRelease( - Queue.sliding, Deferred.Deferred]>(capacity), + Queue.sliding, Deferred.Deferred]>(capacity), (queue) => Queue.shutdown(queue) ) return new StreamImpl(bufferSignal(queue, toChannel(self))) @@ -924,7 +924,7 @@ const bufferDropping = dual< (self: Stream.Stream, capacity: number) => Stream.Stream >(2, (self: Stream.Stream, capacity: number): Stream.Stream => { const queue = Effect.acquireRelease( - Queue.dropping, Deferred.Deferred]>(capacity), + Queue.dropping, Deferred.Deferred]>(capacity), (queue) => Queue.shutdown(queue) ) return new StreamImpl(bufferSignal(queue, toChannel(rechunk(1)(self)))) @@ -935,7 +935,7 @@ const bufferSliding = dual< (self: Stream.Stream, capacity: number) => Stream.Stream >(2, (self: Stream.Stream, capacity: number): Stream.Stream => { const queue = Effect.acquireRelease( - Queue.sliding, Deferred.Deferred]>(capacity), + Queue.sliding, Deferred.Deferred]>(capacity), (queue) => Queue.shutdown(queue) ) return new StreamImpl(bufferSignal(queue, toChannel(pipe(self, rechunk(1))))) @@ -961,14 +961,14 @@ const bufferUnbounded = (self: Stream.Stream): Stream.Stream( - scoped: Effect.Effect, Deferred.Deferred]>, never, Scope.Scope>, + scoped: Effect.Effect, Deferred.Deferred]>, never, Scope.Scope>, bufferChannel: Channel.Channel, void> ): Channel.Channel, void> => { const producer = ( - queue: Queue.Queue, Deferred.Deferred]>, + queue: Queue.Queue, Deferred.Deferred]>, ref: Ref.Ref> ): Channel.Channel, unknown, never, never, unknown> => { - const terminate = (take: Take.Take): Channel.Channel, unknown, never, never, unknown> => + const terminate = (take: Take.Take): Channel.Channel, unknown, never, never, unknown> => pipe( Ref.get(ref), Effect.tap(Deferred.await), @@ -1003,7 +1003,7 @@ const bufferSignal = ( }) } const consumer = ( - queue: Queue.Queue, Deferred.Deferred]> + queue: Queue.Queue, Deferred.Deferred]> ): Channel.Channel, void> => { const process: Channel.Channel, void> = pipe( core.fromEffect(Queue.take(queue)), @@ -1460,7 +1460,7 @@ export const combineChunks = dual< ) => Effect.Effect, S], Option.Option>, never, R5> ): Stream.Stream => { const producer = ( - handoff: Handoff.Handoff>, + handoff: Handoff.Handoff>, latch: Handoff.Handoff ): Channel.Channel, unknown, never, never, unknown> => channel.zipRight( @@ -1470,26 +1470,26 @@ export const combineChunks = dual< core.flatMap( core.fromEffect(pipe( handoff, - Handoff.offer>(_take.chunk(input)) + Handoff.offer>(_take.chunk(input)) )), () => producer(handoff, latch) ), onFailure: (cause) => core.fromEffect( - Handoff.offer>( + Handoff.offer>( handoff, _take.failCause(cause) ) ), onDone: (): Channel.Channel, unknown, never, never, unknown> => - core.fromEffect(Handoff.offer>(handoff, _take.end)) + core.fromEffect(Handoff.offer>(handoff, _take.end)) }) ) return new StreamImpl( pipe( Effect.all([ - Handoff.make>(), - Handoff.make>(), + Handoff.make>(), + Handoff.make>(), Handoff.make(), Handoff.make() ]), @@ -2818,7 +2818,7 @@ export const flattenIterables = (self: Stream.Stream, E, R> pipe(self, map(Chunk.fromIterable), flattenChunks) /** @internal */ -export const flattenTake = (self: Stream.Stream, E, R>): Stream.Stream => +export const flattenTake = (self: Stream.Stream, E, R>): Stream.Stream => flattenChunks(flattenExitOption(pipe(self, map((take) => take.exit)))) /** @internal */ @@ -3384,33 +3384,33 @@ export const interleaveWith = dual< decider: Stream.Stream ): Stream.Stream => { const producer = ( - handoff: Handoff.Handoff> + handoff: Handoff.Handoff> ): Channel.Channel => core.readWithCause({ onInput: (value: A | A2) => core.flatMap( core.fromEffect( - Handoff.offer>(handoff, _take.of(value)) + Handoff.offer>(handoff, _take.of(value)) ), () => producer(handoff) ), onFailure: (cause) => core.fromEffect( - Handoff.offer>( + Handoff.offer>( handoff, _take.failCause(cause) ) ), onDone: () => core.fromEffect( - Handoff.offer>(handoff, _take.end) + Handoff.offer>(handoff, _take.end) ) }) return new StreamImpl( channel.unwrapScoped( pipe( - Handoff.make>(), - Effect.zip(Handoff.make>()), + Handoff.make>(), + Effect.zip(Handoff.make>()), Effect.tap(([left]) => pipe( toChannel(self), @@ -5383,35 +5383,35 @@ export const runHead = (self: Stream.Stream): Effect.Effect(pubsub: PubSub.PubSub>) => (self: Stream.Stream) => Effect.Effect, - (self: Stream.Stream, pubsub: PubSub.PubSub>) => Effect.Effect + (pubsub: PubSub.PubSub>) => (self: Stream.Stream) => Effect.Effect, + (self: Stream.Stream, pubsub: PubSub.PubSub>) => Effect.Effect >( 2, - (self: Stream.Stream, pubsub: PubSub.PubSub>): Effect.Effect => + (self: Stream.Stream, pubsub: PubSub.PubSub>): Effect.Effect => pipe(self, runIntoQueue(pubsub)) ) /** @internal */ export const runIntoPubSubScoped = dual< ( - pubsub: PubSub.PubSub> + pubsub: PubSub.PubSub> ) => (self: Stream.Stream) => Effect.Effect, ( self: Stream.Stream, - pubsub: PubSub.PubSub> + pubsub: PubSub.PubSub> ) => Effect.Effect >(2, ( self: Stream.Stream, - pubsub: PubSub.PubSub> + pubsub: PubSub.PubSub> ): Effect.Effect => pipe(self, runIntoQueueScoped(pubsub))) /** @internal */ export const runIntoQueue = dual< - (queue: Queue.Enqueue>) => (self: Stream.Stream) => Effect.Effect, - (self: Stream.Stream, queue: Queue.Enqueue>) => Effect.Effect + (queue: Queue.Enqueue>) => (self: Stream.Stream) => Effect.Effect, + (self: Stream.Stream, queue: Queue.Enqueue>) => Effect.Effect >( 2, - (self: Stream.Stream, queue: Queue.Enqueue>): Effect.Effect => + (self: Stream.Stream, queue: Queue.Enqueue>): Effect.Effect => pipe(self, runIntoQueueScoped(queue), Effect.scoped) ) @@ -5449,17 +5449,17 @@ export const runIntoQueueElementsScoped = dual< /** @internal */ export const runIntoQueueScoped = dual< ( - queue: Queue.Enqueue> + queue: Queue.Enqueue> ) => (self: Stream.Stream) => Effect.Effect, ( self: Stream.Stream, - queue: Queue.Enqueue> + queue: Queue.Enqueue> ) => Effect.Effect >(2, ( self: Stream.Stream, - queue: Queue.Enqueue> + queue: Queue.Enqueue> ): Effect.Effect => { - const writer: Channel.Channel, unknown, never, Take.Take, unknown> = core + const writer: Channel.Channel, unknown, never, Take.Take, unknown> = core .readWithCause({ onInput: (input: Chunk.Chunk) => core.flatMap(core.write(_take.chunk(input)), () => writer), onFailure: (cause) => core.write(_take.failCause(cause)), @@ -6233,7 +6233,7 @@ export const tapSink = dual< sink: Sink.Sink ): Stream.Stream => pipe( - fromEffect(Effect.all([Queue.bounded>(1), Deferred.make()])), + fromEffect(Effect.all([Queue.bounded>(1), Deferred.make()])), flatMap(([queue, deferred]) => { const right = flattenTake(fromQueue(queue, { maxChunkSize: 1 })) const loop: Channel.Channel, unknown, E | E2, Chunk.Chunk, unknown> = core @@ -6563,18 +6563,18 @@ export const timeoutTo = dual< export const toPubSub = dual< ( capacity: number - ) => (self: Stream.Stream) => Effect.Effect>, never, Scope.Scope | R>, + ) => (self: Stream.Stream) => Effect.Effect>, never, Scope.Scope | R>, ( self: Stream.Stream, capacity: number - ) => Effect.Effect>, never, Scope.Scope | R> + ) => Effect.Effect>, never, Scope.Scope | R> >(2, ( self: Stream.Stream, capacity: number -): Effect.Effect>, never, Scope.Scope | R> => +): Effect.Effect>, never, Scope.Scope | R> => pipe( Effect.acquireRelease( - PubSub.bounded>(capacity), + PubSub.bounded>(capacity), (pubsub) => PubSub.shutdown(pubsub) ), Effect.tap((pubsub) => pipe(self, runIntoPubSubScoped(pubsub), Effect.forkScoped)) @@ -6605,7 +6605,7 @@ export const toQueue = dual< } ) => ( self: Stream.Stream - ) => Effect.Effect>, never, R | Scope.Scope>, + ) => Effect.Effect>, never, R | Scope.Scope>, ( self: Stream.Stream, options?: { @@ -6614,7 +6614,7 @@ export const toQueue = dual< } | { readonly strategy: "unbounded" } - ) => Effect.Effect>, never, R | Scope.Scope> + ) => Effect.Effect>, never, R | Scope.Scope> >((args) => isStream(args[0]), ( self: Stream.Stream, options?: { @@ -6627,12 +6627,12 @@ export const toQueue = dual< Effect.tap( Effect.acquireRelease( options?.strategy === "unbounded" ? - Queue.unbounded>() : + Queue.unbounded>() : options?.strategy === "dropping" ? - Queue.dropping>(options.capacity ?? 2) : + Queue.dropping>(options.capacity ?? 2) : options?.strategy === "sliding" ? - Queue.sliding>(options.capacity ?? 2) : - Queue.bounded>(options?.capacity ?? 2), + Queue.sliding>(options.capacity ?? 2) : + Queue.bounded>(options?.capacity ?? 2), (queue) => Queue.shutdown(queue) ), (queue) => Effect.forkScoped(runIntoQueueScoped(self, queue)) diff --git a/packages/effect/src/internal/stream/pull.ts b/packages/effect/src/internal/stream/pull.ts index 7cd5cfcdff..2f8108085e 100644 --- a/packages/effect/src/internal/stream/pull.ts +++ b/packages/effect/src/internal/stream/pull.ts @@ -30,5 +30,5 @@ export const failCause = (cause: Cause.Cause): Effect.Effect( - dequeue: Queue.Dequeue> + dequeue: Queue.Dequeue> ): Effect.Effect, Option.Option> => Effect.flatMap(Queue.take(dequeue), take.done) diff --git a/packages/effect/src/internal/take.ts b/packages/effect/src/internal/take.ts index 8d39a16ac1..51b090682e 100644 --- a/packages/effect/src/internal/take.ts +++ b/packages/effect/src/internal/take.ts @@ -17,13 +17,13 @@ export const TakeTypeId: Take.TakeTypeId = Symbol.for( const takeVariance = { /* c8 ignore next */ - _E: (_: never) => _, + _A: (_: never) => _, /* c8 ignore next */ - _A: (_: never) => _ + _E: (_: never) => _ } /** @internal */ -export class TakeImpl implements Take.Take { +export class TakeImpl implements Take.Take { readonly [TakeTypeId] = takeVariance constructor(readonly exit: Exit.Exit, Option.Option>) { } @@ -33,41 +33,41 @@ export class TakeImpl implements Take.Take { } /** @internal */ -export const chunk = (chunk: Chunk.Chunk): Take.Take => new TakeImpl(Exit.succeed(chunk)) +export const chunk = (chunk: Chunk.Chunk): Take.Take => new TakeImpl(Exit.succeed(chunk)) /** @internal */ -export const die = (defect: unknown): Take.Take => new TakeImpl(Exit.die(defect)) +export const die = (defect: unknown): Take.Take => new TakeImpl(Exit.die(defect)) /** @internal */ -export const dieMessage = (message: string): Take.Take => +export const dieMessage = (message: string): Take.Take => new TakeImpl(Exit.die(new Cause.RuntimeException(message))) /** @internal */ -export const done = (self: Take.Take): Effect.Effect, Option.Option> => +export const done = (self: Take.Take): Effect.Effect, Option.Option> => Effect.suspend(() => self.exit) /** @internal */ -export const end: Take.Take = new TakeImpl(Exit.fail(Option.none())) +export const end: Take.Take = new TakeImpl(Exit.fail(Option.none())) /** @internal */ -export const fail = (error: E): Take.Take => new TakeImpl(Exit.fail(Option.some(error))) +export const fail = (error: E): Take.Take => new TakeImpl(Exit.fail(Option.some(error))) /** @internal */ -export const failCause = (cause: Cause.Cause): Take.Take => +export const failCause = (cause: Cause.Cause): Take.Take => new TakeImpl(Exit.failCause(pipe(cause, Cause.map(Option.some)))) /** @internal */ -export const fromEffect = (effect: Effect.Effect): Effect.Effect, never, R> => +export const fromEffect = (effect: Effect.Effect): Effect.Effect, never, R> => Effect.matchCause(effect, { onFailure: failCause, onSuccess: of }) /** @internal */ -export const fromExit = (exit: Exit.Exit): Take.Take => +export const fromExit = (exit: Exit.Exit): Take.Take => new TakeImpl(pipe(exit, Exit.mapBoth({ onFailure: Option.some, onSuccess: Chunk.of }))) /** @internal */ export const fromPull = ( pull: Effect.Effect, Option.Option, R> -): Effect.Effect, never, R> => +): Effect.Effect, never, R> => Effect.matchCause(pull, { onFailure: (cause) => Option.match(Cause.flipCauseOption(cause), { @@ -78,21 +78,21 @@ export const fromPull = ( }) /** @internal */ -export const isDone = (self: Take.Take): boolean => +export const isDone = (self: Take.Take): boolean => Exit.match(self.exit, { onFailure: (cause) => Option.isNone(Cause.flipCauseOption(cause)), onSuccess: constFalse }) /** @internal */ -export const isFailure = (self: Take.Take): boolean => +export const isFailure = (self: Take.Take): boolean => Exit.match(self.exit, { onFailure: (cause) => Option.isSome(Cause.flipCauseOption(cause)), onSuccess: constFalse }) /** @internal */ -export const isSuccess = (self: Take.Take): boolean => +export const isSuccess = (self: Take.Take): boolean => Exit.match(self.exit, { onFailure: constFalse, onSuccess: constTrue @@ -101,7 +101,7 @@ export const isSuccess = (self: Take.Take): boolean => /** @internal */ export const make = ( exit: Exit.Exit, Option.Option> -): Take.Take => new TakeImpl(exit) +): Take.Take => new TakeImpl(exit) /** @internal */ export const match = dual< @@ -111,17 +111,17 @@ export const match = dual< readonly onFailure: (cause: Cause.Cause) => Z2 readonly onSuccess: (chunk: Chunk.Chunk) => Z3 } - ) => (self: Take.Take) => Z | Z2 | Z3, - ( - self: Take.Take, + ) => (self: Take.Take) => Z | Z2 | Z3, + ( + self: Take.Take, options: { readonly onEnd: () => Z readonly onFailure: (cause: Cause.Cause) => Z2 readonly onSuccess: (chunk: Chunk.Chunk) => Z3 } ) => Z | Z2 | Z3 ->(2, ( - self: Take.Take, +>(2, ( + self: Take.Take, { onEnd, onFailure, onSuccess }: { readonly onEnd: () => Z readonly onFailure: (cause: Cause.Cause) => Z2 @@ -139,23 +139,23 @@ export const match = dual< /** @internal */ export const matchEffect = dual< - ( + ( options: { readonly onEnd: Effect.Effect readonly onFailure: (cause: Cause.Cause) => Effect.Effect readonly onSuccess: (chunk: Chunk.Chunk) => Effect.Effect } - ) => (self: Take.Take) => Effect.Effect, - ( - self: Take.Take, + ) => (self: Take.Take) => Effect.Effect, + ( + self: Take.Take, options: { readonly onEnd: Effect.Effect readonly onFailure: (cause: Cause.Cause) => Effect.Effect readonly onSuccess: (chunk: Chunk.Chunk) => Effect.Effect } ) => Effect.Effect ->(2, ( - self: Take.Take, +>(2, ( + self: Take.Take, { onEnd, onFailure, onSuccess }: { readonly onEnd: Effect.Effect readonly onFailure: (cause: Cause.Cause) => Effect.Effect @@ -173,27 +173,27 @@ export const matchEffect = dual< /** @internal */ export const map = dual< - (f: (a: A) => B) => (self: Take.Take) => Take.Take, - (self: Take.Take, f: (a: A) => B) => Take.Take + (f: (a: A) => B) => (self: Take.Take) => Take.Take, + (self: Take.Take, f: (a: A) => B) => Take.Take >( 2, - (self: Take.Take, f: (a: A) => B): Take.Take => + (self: Take.Take, f: (a: A) => B): Take.Take => new TakeImpl(pipe(self.exit, Exit.map(Chunk.map(f)))) ) /** @internal */ -export const of = (value: A): Take.Take => new TakeImpl(Exit.succeed(Chunk.of(value))) +export const of = (value: A): Take.Take => new TakeImpl(Exit.succeed(Chunk.of(value))) /** @internal */ export const tap = dual< - ( + ( f: (chunk: Chunk.Chunk) => Effect.Effect<_, E2, R> - ) => (self: Take.Take) => Effect.Effect, - ( - self: Take.Take, + ) => (self: Take.Take) => Effect.Effect, + ( + self: Take.Take, f: (chunk: Chunk.Chunk) => Effect.Effect<_, E2, R> ) => Effect.Effect ->(2, ( - self: Take.Take, +>(2, ( + self: Take.Take, f: (chunk: Chunk.Chunk) => Effect.Effect<_, E2, R> ): Effect.Effect => pipe(self.exit, Exit.forEachEffect(f), Effect.asUnit)) diff --git a/packages/effect/test/Stream/aggregation.test.ts b/packages/effect/test/Stream/aggregation.test.ts index b3a7c07735..2e2225c241 100644 --- a/packages/effect/test/Stream/aggregation.test.ts +++ b/packages/effect/test/Stream/aggregation.test.ts @@ -141,7 +141,7 @@ describe("Stream", () => { // Explicitly uses live Clock it.effect("issue from zio-kafka", () => Effect.gen(function*($) { - const queue = yield* $(Queue.unbounded>()) + const queue = yield* $(Queue.unbounded>()) const fiber = yield* $( Stream.fromQueue(queue), Stream.flattenTake, diff --git a/packages/effect/test/Stream/sequencing.test.ts b/packages/effect/test/Stream/sequencing.test.ts index 6bb36a76b7..76bc8f6018 100644 --- a/packages/effect/test/Stream/sequencing.test.ts +++ b/packages/effect/test/Stream/sequencing.test.ts @@ -786,7 +786,7 @@ describe("Stream", () => { it.effect("flattenTake - works with empty streams", () => Effect.gen(function*($) { const result = yield* $( - Stream.fromIterable>([]), + Stream.fromIterable>([]), Stream.flattenTake, Stream.runCollect ) diff --git a/packages/effect/test/Stream/zipping.test.ts b/packages/effect/test/Stream/zipping.test.ts index 92133211f4..699ce57c8a 100644 --- a/packages/effect/test/Stream/zipping.test.ts +++ b/packages/effect/test/Stream/zipping.test.ts @@ -205,7 +205,7 @@ describe("Stream", () => { Effect.gen(function*($) { const left = yield* $(Queue.unbounded>()) const right = yield* $(Queue.unbounded>()) - const output = yield* $(Queue.bounded>(1)) + const output = yield* $(Queue.bounded>(1)) yield* $( Stream.fromChunkQueue(left), Stream.zipLatest(Stream.fromChunkQueue(right)),