diff --git a/.changeset/cuddly-parrots-know.md b/.changeset/cuddly-parrots-know.md new file mode 100644 index 0000000000..17ae1c45db --- /dev/null +++ b/.changeset/cuddly-parrots-know.md @@ -0,0 +1,23 @@ +--- +"effect": minor +--- + +Consolidate `Effect.asyncOption`, `Effect.asyncEither`, `Stream.asyncOption`, `Stream.asyncEither`, and `Stream.asyncInterrupt` + +This PR removes `Effect.asyncOption` and `Effect.asyncEither` as their behavior can be entirely implemented with the new signature of `Effect.async`, which optionally returns a cleanup `Effect` from the registration callback. + +```ts +declare const async: ( + register: (callback: (_: Effect) => void, signal: AbortSignal) => void | Effect, + blockingOn?: FiberId +) => Effect +``` + +Additionally, this PR removes `Stream.asyncOption`, `Stream.asyncEither`, and `Stream.asyncInterrupt` as their behavior can be entirely implemented with the new signature of `Stream.async`, which can optionally return a cleanup `Effect` from the registration callback. + +```ts +declare const async: ( + register: (emit: Emit) => Effect | void, + outputBuffer?: number +) => Stream +``` diff --git a/package.json b/package.json index 3b82acfa59..81a26693f2 100644 --- a/package.json +++ b/package.json @@ -1,7 +1,7 @@ { "private": true, "type": "module", - "packageManager": "pnpm@8.12.1", + "packageManager": "pnpm@8.15.1", "workspaces": [ "packages/*" ], diff --git a/packages/effect/src/Effect.ts b/packages/effect/src/Effect.ts index f40df93ff5..1986eb5a9a 100644 --- a/packages/effect/src/Effect.ts +++ b/packages/effect/src/Effect.ts @@ -976,17 +976,20 @@ export const validateFirst: { // ------------------------------------------------------------------------------------- /** - * Imports an asynchronous side-effect into a pure `Effect` value. - * The callback function `Effect => void` must be called at most once. + * Imports an asynchronous side-effect into a pure `Effect` value. The callback + * function `Effect => void` **MUST** be called at most once. * - * If an Effect is returned by the registration function, it will be executed - * if the fiber executing the effect is interrupted. + * The registration function can optionally return an Effect, which will be + * executed if the `Fiber` executing this Effect is interrupted. * * The registration function can also receive an `AbortSignal` if required for * interruption. * - * The `FiberId` of the fiber that may complete the async callback may be - * provided to allow for better diagnostics. + * The `FiberId` of the fiber that may complete the async callback may also be + * specified. This is called the "blocking fiber" because it suspends the fiber + * executing the `async` Effect (i.e. semantically blocks the fiber from making + * progress). Specifying this fiber id in cases where it is known will improve + * diagnostics, but not affect the behavior of the returned effect. * * @since 2.0.0 * @category constructors @@ -1005,51 +1008,9 @@ export const async: ( * @since 2.0.0 * @category constructors */ -export const asyncEffect: ( - register: (callback: (_: Effect) => void) => Effect -) => Effect = _runtime.asyncEffect - -/** - * Imports an asynchronous effect into a pure `Effect` value, possibly returning - * the value synchronously. - * - * If the register function returns a value synchronously, then the callback - * function `Effect => void` must not be called. Otherwise the callback - * function must be called at most once. - * - * The `FiberId` of the fiber that may complete the async callback may be - * provided to allow for better diagnostics. - * - * @since 2.0.0 - * @category constructors - */ -export const asyncOption: ( - register: (callback: (_: Effect) => void) => Option.Option>, - blockingOn?: FiberId.FiberId -) => Effect = effect.asyncOption - -/** - * Imports an asynchronous side-effect into an effect. It has the option of - * returning the value synchronously, which is useful in cases where it cannot - * be determined if the effect is synchronous or asynchronous until the register - * is actually executed. It also has the option of returning a canceler, - * which will be used by the runtime to cancel the asynchronous effect if the fiber - * executing the effect is interrupted. - * - * If the register function returns a value synchronously, then the callback - * function `Effect => void` must not be called. Otherwise the callback - * function must be called at most once. - * - * The `FiberId` of the fiber that may complete the async callback may be - * provided to allow for better diagnostics. - * - * @since 2.0.0 - * @category constructors - */ -export const asyncEither: ( - register: (callback: (effect: Effect) => void) => Either.Either, Effect>, - blockingOn?: FiberId.FiberId -) => Effect = core.asyncEither +export const asyncEffect: ( + register: (callback: (_: Effect) => void) => Effect | void, E2, R2> +) => Effect = _runtime.asyncEffect /** * @since 2.0.0 diff --git a/packages/effect/src/Stream.ts b/packages/effect/src/Stream.ts index dd65f89881..8d7f6b577c 100644 --- a/packages/effect/src/Stream.ts +++ b/packages/effect/src/Stream.ts @@ -256,15 +256,18 @@ export const as: { } = internal.as const _async: ( - register: (emit: Emit.Emit) => void, + register: (emit: Emit.Emit) => Effect.Effect | void, outputBuffer?: number ) => Stream = internal._async export { /** * Creates a stream from an asynchronous callback that can be called multiple - * times. The optionality of the error type `E` can be used to signal the end - * of the stream, by setting it to `None`. + * times. The optionality of the error type `E` in `Emit` can be used to + * signal the end of the stream by setting it to `None`. + * + * The registration function can optionally return an `Effect`, which will be + * executed if the `Fiber` executing this Effect is interrupted. * * @since 2.0.0 * @category constructors @@ -286,34 +289,6 @@ export const asyncEffect: ( outputBuffer?: number ) => Stream = internal.asyncEffect -/** - * Creates a stream from an asynchronous callback that can be called multiple - * times. The registration of the callback returns either a canceler or - * synchronously returns a stream. The optionality of the error type `E` can - * be used to signal the end of the stream, by setting it to `None`. - * - * @since 2.0.0 - * @category constructors - */ -export const asyncInterrupt: ( - register: (emit: Emit.Emit) => Either.Either, Stream>, - outputBuffer?: number -) => Stream = internal.asyncInterrupt - -/** - * Creates a stream from an asynchronous callback that can be called multiple - * times. The registration of the callback can possibly return the stream - * synchronously. The optionality of the error type `E` can be used to signal - * the end of the stream, by setting it to `None`. - * - * @since 2.0.0 - * @category constructors - */ -export const asyncOption: ( - register: (emit: Emit.Emit) => Option.Option>, - outputBuffer?: number -) => Stream = internal.asyncOption - /** * Creates a stream from an asynchronous callback that can be called multiple * times. The registration of the callback itself returns an a scoped diff --git a/packages/effect/src/internal/clock.ts b/packages/effect/src/internal/clock.ts index cad21ceb3a..80cd809b1c 100644 --- a/packages/effect/src/internal/clock.ts +++ b/packages/effect/src/internal/clock.ts @@ -2,7 +2,6 @@ import type * as Clock from "../Clock.js" import * as Context from "../Context.js" import * as Duration from "../Duration.js" import type * as Effect from "../Effect.js" -import * as Either from "../Either.js" import { constFalse } from "../Function.js" import * as core from "./core.js" @@ -85,9 +84,9 @@ class ClockImpl implements Clock.Clock { } sleep(duration: Duration.Duration): Effect.Effect { - return core.asyncEither((cb) => { - const canceler = globalClockScheduler.unsafeSchedule(() => cb(core.unit), duration) - return Either.left(core.asUnit(core.sync(canceler))) + return core.async((resume) => { + const canceler = globalClockScheduler.unsafeSchedule(() => resume(core.unit), duration) + return core.asUnit(core.sync(canceler)) }) } } diff --git a/packages/effect/src/internal/core-effect.ts b/packages/effect/src/internal/core-effect.ts index bef5208681..88fb7e1224 100644 --- a/packages/effect/src/internal/core-effect.ts +++ b/packages/effect/src/internal/core-effect.ts @@ -4,9 +4,8 @@ import * as Clock from "../Clock.js" import * as Context from "../Context.js" import * as Duration from "../Duration.js" import type * as Effect from "../Effect.js" -import * as Either from "../Either.js" import type * as Fiber from "../Fiber.js" -import * as FiberId from "../FiberId.js" +import type * as FiberId from "../FiberId.js" import type * as FiberRef from "../FiberRef.js" import * as FiberRefs from "../FiberRefs.js" import type * as FiberRefsPatch from "../FiberRefsPatch.js" @@ -75,26 +74,6 @@ export const asSome = (self: Effect.Effect): Effect.Effect(self: Effect.Effect): Effect.Effect, R> => core.mapError(self, Option.some) -/* @internal */ -export const asyncOption = ( - register: (callback: (_: Effect.Effect) => void) => Option.Option>, - blockingOn: FiberId.FiberId = FiberId.none -): Effect.Effect => - core.asyncEither( - (cb) => { - const option = register(cb) - switch (option._tag) { - case "None": { - return Either.left(core.unit) - } - case "Some": { - return Either.right(option.value) - } - } - }, - blockingOn - ) - /* @internal */ export const try_: { (options: { diff --git a/packages/effect/src/internal/core.ts b/packages/effect/src/internal/core.ts index c844b27c0f..56361c0ff4 100644 --- a/packages/effect/src/internal/core.ts +++ b/packages/effect/src/internal/core.ts @@ -480,22 +480,6 @@ export const async = ( effect }) -/* @internal */ -export const asyncEither = ( - register: ( - resume: (effect: Effect.Effect) => void - ) => Either.Either, Effect.Effect>, - blockingOn: FiberId.FiberId = FiberId.none -): Effect.Effect => - async((resume) => { - const result = register(resume) - if (Either.isRight(result)) { - resume(result.right) - } else { - return result.left - } - }, blockingOn) - /* @internal */ export const catchAllCause = dual< ( @@ -1462,11 +1446,11 @@ export const zipWith: { ): Effect.Effect => flatMap(self, (a) => map(that, (b) => f(a, b)))) /* @internal */ -export const never: Effect.Effect = asyncEither(() => { +export const never: Effect.Effect = async(() => { const interval = setInterval(() => { // }, 2 ** 31 - 1) - return Either.left(sync(() => clearInterval(interval))) + return sync(() => clearInterval(interval)) }) // ----------------------------------------------------------------------------- @@ -2844,18 +2828,18 @@ export const deferredMakeAs = (fiberId: FiberId.FiberId): Effect.E /* @internal */ export const deferredAwait = (self: Deferred.Deferred): Effect.Effect => - asyncEither((k) => { + async((resume) => { const state = MutableRef.get(self.state) switch (state._tag) { case DeferredOpCodes.OP_STATE_DONE: { - return Either.right(state.effect) + return resume(state.effect) } case DeferredOpCodes.OP_STATE_PENDING: { pipe( self.state, - MutableRef.set(deferred.pending([k, ...state.joiners])) + MutableRef.set(deferred.pending([resume, ...state.joiners])) ) - return Either.left(deferredInterruptJoiner(self, k)) + return deferredInterruptJoiner(self, resume) } } }, self.blockingOn) diff --git a/packages/effect/src/internal/effect/circular.ts b/packages/effect/src/internal/effect/circular.ts index 22048f76c3..9ccf2dd315 100644 --- a/packages/effect/src/internal/effect/circular.ts +++ b/packages/effect/src/internal/effect/circular.ts @@ -2,7 +2,6 @@ import type * as Cause from "../../Cause.js" import type * as Deferred from "../../Deferred.js" import * as Duration from "../../Duration.js" import type * as Effect from "../../Effect.js" -import * as Either from "../../Either.js" import * as Equal from "../../Equal.js" import type { Equivalence } from "../../Equivalence.js" import * as Exit from "../../Exit.js" @@ -45,7 +44,7 @@ class Semaphore { } readonly take = (n: number): Effect.Effect => - core.asyncEither((resume) => { + core.async((resume) => { if (this.free < n) { const observer = () => { if (this.free < n) { @@ -56,12 +55,12 @@ class Semaphore { resume(core.succeed(n)) } this.waiters.add(observer) - return Either.left(core.sync(() => { + return core.sync(() => { this.waiters.delete(observer) - })) + }) } this.taken += n - return Either.right(core.succeed(n)) + return resume(core.succeed(n)) }) readonly updateTaken = (f: (n: number) => number): Effect.Effect => diff --git a/packages/effect/src/internal/runtime.ts b/packages/effect/src/internal/runtime.ts index 08674e7455..49751dc33e 100644 --- a/packages/effect/src/internal/runtime.ts +++ b/packages/effect/src/internal/runtime.ts @@ -22,6 +22,7 @@ import * as core from "./core.js" import * as executionStrategy from "./executionStrategy.js" import * as FiberRuntime from "./fiberRuntime.js" import * as fiberScope from "./fiberScope.js" +import { internalize } from "./internalize.js" import * as OpCodes from "./opCodes/effect.js" import * as runtimeFlags from "./runtimeFlags.js" import * as _supervisor from "./supervisor.js" @@ -450,22 +451,34 @@ export const unsafeRunSyncExitEffect = unsafeRunSyncExit(defaultRuntime) // circular with Effect /** @internal */ -export const asyncEffect = ( - register: (callback: (_: Effect.Effect) => void) => Effect.Effect -): Effect.Effect => - core.flatMap( - core.deferredMake(), - (deferred) => - core.flatMap(runtime(), (runtime) => - core.uninterruptibleMask((restore) => - core.zipRight( - FiberRuntime.fork(restore( - core.catchAllCause( - register((cb) => unsafeRunCallback(runtime)(core.intoDeferred(cb, deferred))), - (cause) => core.deferredFailCause(deferred, cause) - ) - )), - restore(core.deferredAwait(deferred)) - ) - )) - ) +export const asyncEffect = ( + register: ( + callback: (_: Effect.Effect) => void + ) => Effect.Effect | void, E2, R2> +): Effect.Effect => + core.suspend(() => { + internalize(register) + let cleanup: Effect.Effect | void = undefined + return core.flatMap( + core.deferredMake(), + (deferred) => + core.flatMap(runtime(), (runtime) => + core.uninterruptibleMask((restore) => + core.zipRight( + FiberRuntime.fork(restore( + core.matchCauseEffect( + register((cb) => unsafeRunCallback(runtime)(core.intoDeferred(cb, deferred))), + { + onFailure: (cause) => core.deferredFailCause(deferred, cause), + onSuccess: (cleanup_) => { + cleanup = cleanup_ + return core.unit + } + } + ) + )), + restore(core.onInterrupt(core.deferredAwait(deferred), () => cleanup ?? core.unit)) + ) + )) + ) + }) diff --git a/packages/effect/src/internal/stream.ts b/packages/effect/src/internal/stream.ts index 824b2d1c63..85b88423e3 100644 --- a/packages/effect/src/internal/stream.ts +++ b/packages/effect/src/internal/stream.ts @@ -48,7 +48,7 @@ import * as pull from "./stream/pull.js" import * as SinkEndReason from "./stream/sinkEndReason.js" import * as ZipAllState from "./stream/zipAllState.js" import * as ZipChunksState from "./stream/zipChunksState.js" -import * as _take from "./take.js" +import * as InternalTake from "./take.js" /** @internal */ const StreamSymbolKey = "effect/Stream" @@ -461,13 +461,57 @@ export const as = dual< /** @internal */ export const _async = ( - register: (emit: Emit.Emit) => void, + register: ( + emit: Emit.Emit + ) => Effect.Effect | void, outputBuffer = 16 ): Stream.Stream => - asyncOption((cb) => { - register(cb) - return Option.none() - }, outputBuffer) + Effect.acquireRelease( + Queue.bounded>(outputBuffer), + (queue) => Queue.shutdown(queue) + ).pipe( + Effect.flatMap((output) => + Effect.runtime().pipe( + Effect.flatMap((runtime) => + Effect.sync(() => { + const runPromiseExit = Runtime.runPromiseExit(runtime) + const canceler = register(emit.make((resume) => + InternalTake.fromPull(resume).pipe( + Effect.flatMap((take) => Queue.offer(output, take)), + Effect.asUnit, + runPromiseExit + ).then((exit) => { + if (Exit.isFailure(exit)) { + if (!Cause.isInterrupted(exit.cause)) { + throw Cause.squash(exit.cause) + } + } + }) + )) + return canceler + }) + ), + Effect.map((value) => { + const loop: Channel.Channel, unknown, E, unknown, void, unknown> = Queue.take(output).pipe( + Effect.flatMap((take) => InternalTake.done(take)), + Effect.match({ + onFailure: (maybeError) => + core.fromEffect(Queue.shutdown(output)).pipe( + channel.zipRight(Option.match(maybeError, { + onNone: () => core.unit, + onSome: (error) => core.fail(error) + })) + ), + onSuccess: (chunk) => core.write(chunk).pipe(core.flatMap(() => loop)) + }), + channel.unwrap + ) + return fromChannel(loop).pipe(ensuring(value ?? Effect.unit)) + }) + ) + ), + unwrapScoped + ) /** @internal */ export const asyncEffect = ( @@ -487,7 +531,7 @@ export const asyncEffect = ( register( emit.make((k) => pipe( - _take.fromPull(k), + InternalTake.fromPull(k), Effect.flatMap((take) => Queue.offer(output, take)), Effect.asUnit, Runtime.runPromiseExit(runtime) @@ -503,7 +547,7 @@ export const asyncEffect = ( Effect.map(() => { const loop: Channel.Channel, unknown, E, unknown, void, unknown> = pipe( Queue.take(output), - Effect.flatMap(_take.done), + Effect.flatMap(InternalTake.done), Effect.match({ onFailure: (maybeError) => pipe( @@ -524,84 +568,6 @@ export const asyncEffect = ( fromChannel ) -/** @internal */ -export const asyncInterrupt = ( - register: ( - emit: Emit.Emit - ) => Either.Either, Stream.Stream>, - outputBuffer = 16 -): Stream.Stream => - pipe( - Effect.acquireRelease( - Queue.bounded>(outputBuffer), - (queue) => Queue.shutdown(queue) - ), - Effect.flatMap((output) => - pipe( - Effect.runtime(), - Effect.flatMap((runtime) => - pipe( - Effect.sync(() => - register( - emit.make((k) => - pipe( - _take.fromPull(k), - Effect.flatMap((take) => Queue.offer(output, take)), - Effect.asUnit, - Runtime.runPromiseExit(runtime) - ).then((exit) => { - if (Exit.isFailure(exit)) { - if (!Cause.isInterrupted(exit.cause)) { - throw Cause.squash(exit.cause) - } - } - }) - ) - ) - ), - Effect.map(Either.match({ - onLeft: (canceler) => { - const loop: Channel.Channel, unknown, E, unknown, void, unknown> = pipe( - Queue.take(output), - Effect.flatMap(_take.done), - Effect.match({ - onFailure: (maybeError) => - channel.zipRight( - core.fromEffect(Queue.shutdown(output)), - Option.match(maybeError, { - onNone: () => core.unit, - onSome: core.fail - }) - ), - onSuccess: (chunk) => pipe(core.write(chunk), core.flatMap(() => loop)) - }), - channel.unwrap - ) - return pipe(fromChannel(loop), ensuring(canceler)) - }, - onRight: (stream) => unwrap(pipe(Queue.shutdown(output), Effect.as(stream))) - })) - ) - ) - ) - ), - unwrapScoped - ) - -/** @internal */ -export const asyncOption = ( - register: (emit: Emit.Emit) => Option.Option>, - outputBuffer = 16 -): Stream.Stream => - asyncInterrupt( - (emit) => - Option.match(register(emit), { - onNone: () => Either.left(Effect.unit), - onSome: Either.right - }), - outputBuffer - ) - /** @internal */ export const asyncScoped = ( register: (emit: Emit.Emit) => Effect.Effect, @@ -620,7 +586,7 @@ export const asyncScoped = ( register( emit.make((k) => pipe( - _take.fromPull(k), + InternalTake.fromPull(k), Effect.flatMap((take) => Queue.offer(output, take)), Effect.asUnit, Runtime.runPromiseExit(runtime) @@ -642,7 +608,7 @@ export const asyncScoped = ( pull.end() : pipe( Queue.take(output), - Effect.flatMap(_take.done), + Effect.flatMap(InternalTake.done), Effect.onError(() => pipe( Ref.set(ref, true), @@ -884,7 +850,7 @@ export const bufferChunks = dual< Effect.map(queue, (queue) => { const process: Channel.Channel, unknown, E, unknown, void, unknown> = pipe( core.fromEffect(Queue.take(queue)), - core.flatMap(_take.match({ + core.flatMap(InternalTake.match({ onEnd: () => core.unit, onFailure: core.failCause, onSuccess: (value) => pipe(core.write(value), core.flatMap(() => process)) @@ -947,7 +913,7 @@ const bufferUnbounded = (self: Stream.Stream): Stream.Stream { const process: Channel.Channel, unknown, E, unknown, void, unknown> = pipe( core.fromEffect(Queue.take(queue)), - core.flatMap(_take.match({ + core.flatMap(InternalTake.match({ onEnd: () => core.unit, onFailure: core.failCause, onSuccess: (value) => core.flatMap(core.write(value), () => process) @@ -989,7 +955,7 @@ const bufferSignal = ( Effect.flatMap( (deferred) => pipe( - Queue.offer(queue, [_take.chunk(input), deferred] as const), + Queue.offer(queue, [InternalTake.chunk(input), deferred] as const), Effect.flatMap((added) => pipe(Ref.set(ref, deferred), Effect.when(() => added))) ) ), @@ -997,8 +963,8 @@ const bufferSignal = ( core.fromEffect, core.flatMap(() => producer(queue, ref)) ), - onFailure: (error) => terminate(_take.failCause(error)), - onDone: () => terminate(_take.end) + onFailure: (error) => terminate(InternalTake.failCause(error)), + onDone: () => terminate(InternalTake.end) }) } const consumer = ( @@ -1009,7 +975,7 @@ const bufferSignal = ( core.flatMap(([take, deferred]) => channel.zipRight( core.fromEffect(Deferred.succeed(deferred, void 0)), - _take.match(take, { + InternalTake.match(take, { onEnd: () => core.unit, onFailure: core.failCause, onSuccess: (value) => pipe(core.write(value), core.flatMap(() => process)) @@ -1469,7 +1435,7 @@ export const combineChunks = dual< core.flatMap( core.fromEffect(pipe( handoff, - Handoff.offer>(_take.chunk(input)) + Handoff.offer>(InternalTake.chunk(input)) )), () => producer(handoff, latch) ), @@ -1477,11 +1443,11 @@ export const combineChunks = dual< core.fromEffect( Handoff.offer>( handoff, - _take.failCause(cause) + InternalTake.failCause(cause) ) ), onDone: (): Channel.Channel, never, Err, unknown, unknown, R> => - core.fromEffect(Handoff.offer>(handoff, _take.end)) + core.fromEffect(Handoff.offer>(handoff, InternalTake.end)) }) ) return new StreamImpl( @@ -1515,7 +1481,7 @@ export const combineChunks = dual< Effect.zipRight( pipe( Handoff.take(left), - Effect.flatMap(_take.done) + Effect.flatMap(InternalTake.done) ) ) ) @@ -1525,7 +1491,7 @@ export const combineChunks = dual< Effect.zipRight( pipe( Handoff.take(right), - Effect.flatMap(_take.done) + Effect.flatMap(InternalTake.done) ) ) ) @@ -3388,7 +3354,7 @@ export const interleaveWith = dual< onInput: (value: A | A2) => core.flatMap( core.fromEffect( - Handoff.offer>(handoff, _take.of(value)) + Handoff.offer>(handoff, InternalTake.of(value)) ), () => producer(handoff) ), @@ -3396,12 +3362,12 @@ export const interleaveWith = dual< core.fromEffect( Handoff.offer>( handoff, - _take.failCause(cause) + InternalTake.failCause(cause) ) ), onDone: () => core.fromEffect( - Handoff.offer>(handoff, _take.end) + Handoff.offer>(handoff, InternalTake.end) ) }) return new StreamImpl( @@ -3437,7 +3403,7 @@ export const interleaveWith = dual< if (bool && !leftDone) { return pipe( core.fromEffect(Handoff.take(left)), - core.flatMap(_take.match({ + core.flatMap(InternalTake.match({ onEnd: () => rightDone ? core.unit : process(true, rightDone), onFailure: core.failCause, onSuccess: (chunk) => pipe(core.write(chunk), core.flatMap(() => process(leftDone, rightDone))) @@ -3447,7 +3413,7 @@ export const interleaveWith = dual< if (!bool && !rightDone) { return pipe( core.fromEffect(Handoff.take(right)), - core.flatMap(_take.match({ + core.flatMap(InternalTake.match({ onEnd: () => leftDone ? core.unit : process(leftDone, true), onFailure: core.failCause, onSuccess: (chunk) => pipe(core.write(chunk), core.flatMap(() => process(leftDone, rightDone))) @@ -5450,9 +5416,9 @@ export const runIntoQueueScoped = dual< ): Effect.Effect => { const writer: Channel.Channel, Chunk.Chunk, never, E, unknown, unknown, R> = core .readWithCause({ - onInput: (input: Chunk.Chunk) => core.flatMap(core.write(_take.chunk(input)), () => writer), - onFailure: (cause) => core.write(_take.failCause(cause)), - onDone: () => core.write(_take.end) + onInput: (input: Chunk.Chunk) => core.flatMap(core.write(InternalTake.chunk(input)), () => writer), + onFailure: (cause) => core.write(InternalTake.failCause(cause)), + onDone: () => core.write(InternalTake.end) }) return pipe( core.pipeTo(toChannel(self), writer), @@ -6229,7 +6195,7 @@ export const tapSink = dual< .readWithCause({ onInput: (chunk: Chunk.Chunk) => pipe( - core.fromEffect(Queue.offer(queue, _take.chunk(chunk))), + core.fromEffect(Queue.offer(queue, InternalTake.chunk(chunk))), core.foldCauseChannel({ onFailure: () => core.flatMap(core.write(chunk), () => channel.identityChannel()), onSuccess: () => core.flatMap(core.write(chunk), () => loop) @@ -6237,7 +6203,7 @@ export const tapSink = dual< ) as Channel.Channel, Chunk.Chunk, E | E2, E, unknown, unknown, R2>, onFailure: (cause: Cause.Cause) => pipe( - core.fromEffect(Queue.offer(queue, _take.failCause(cause))), + core.fromEffect(Queue.offer(queue, InternalTake.failCause(cause))), core.foldCauseChannel({ onFailure: () => core.failCause(cause), onSuccess: () => core.failCause(cause) @@ -6245,7 +6211,7 @@ export const tapSink = dual< ), onDone: () => pipe( - core.fromEffect(Queue.offer(queue, _take.end)), + core.fromEffect(Queue.offer(queue, InternalTake.end)), core.foldCauseChannel({ onFailure: () => core.unit, onSuccess: () => core.unit @@ -6256,7 +6222,7 @@ export const tapSink = dual< new StreamImpl(pipe( core.pipeTo(toChannel(self), loop), channel.ensuring(Effect.zipRight( - Effect.forkDaemon(Queue.offer(queue, _take.end)), + Effect.forkDaemon(Queue.offer(queue, InternalTake.end)), Deferred.await(deferred) )) )), diff --git a/packages/effect/test/Effect/async.test.ts b/packages/effect/test/Effect/async.test.ts index 4e59bf0282..ae3ae46a69 100644 --- a/packages/effect/test/Effect/async.test.ts +++ b/packages/effect/test/Effect/async.test.ts @@ -22,8 +22,8 @@ describe("Effect", () => { })) it.effect("simple asyncEffect must return", () => Effect.gen(function*($) { - const result = yield* $(Effect.asyncEffect((cb) => { - return Effect.succeed(cb(Effect.succeed(42))) + const result = yield* $(Effect.asyncEffect((resume) => { + return Effect.succeed(resume(Effect.succeed(42))) })) assert.strictEqual(result, 42) })) @@ -95,18 +95,18 @@ describe("Effect", () => { assert.deepStrictEqual(unexpected, Chunk.empty()) assert.deepStrictEqual(result, Option.none()) // the timeout should happen })) - it.live("asyncMaybe should not resume fiber twice after synchronous result", () => + it.live("async should not resume fiber twice after synchronous result", () => Effect.gen(function*($) { const step = yield* $(Deferred.make()) const unexpectedPlace = yield* $(Ref.make(Chunk.empty())) const runtime = yield* $(Effect.runtime()) const fiber = yield* $( - Effect.asyncOption((cb) => { + Effect.async((resume) => { Runtime.runCallback(runtime)(pipe( Deferred.await(step), - Effect.zipRight(Effect.sync(() => cb(Ref.update(unexpectedPlace, Chunk.prepend(1))))) + Effect.zipRight(Effect.sync(() => resume(Ref.update(unexpectedPlace, Chunk.prepend(1))))) )) - return Option.some(Effect.unit) + return Effect.unit }), Effect.flatMap(() => Effect.async(() => { diff --git a/packages/effect/test/Effect/concurrency.test.ts b/packages/effect/test/Effect/concurrency.test.ts index 482b3c7770..2597182c89 100644 --- a/packages/effect/test/Effect/concurrency.test.ts +++ b/packages/effect/test/Effect/concurrency.test.ts @@ -49,7 +49,7 @@ describe("Effect", () => { const release = yield* $(Deferred.make()) const acquire = yield* $(Deferred.make()) const fiber = yield* $( - Effect.asyncEffect((_) => + Effect.asyncEffect((_) => // This will never complete because the callback is never invoked Effect.acquireUseRelease( Deferred.succeed(acquire, void 0), diff --git a/packages/effect/test/Effect/interruption.test.ts b/packages/effect/test/Effect/interruption.test.ts index 5cb4a77a5f..0b0172083f 100644 --- a/packages/effect/test/Effect/interruption.test.ts +++ b/packages/effect/test/Effect/interruption.test.ts @@ -546,11 +546,11 @@ describe("Effect", () => { it.effect("async cancelation", () => Effect.gen(function*($) { const ref = MutableRef.make(0) - const effect = Effect.asyncEither(() => { + const effect = Effect.async(() => { pipe(ref, MutableRef.set(MutableRef.get(ref) + 1)) - return Either.left(Effect.sync(() => { + return Effect.sync(() => { pipe(ref, MutableRef.set(MutableRef.get(ref) - 1)) - })) + }) }) yield* $(Effect.unit, Effect.race(effect)) const result = MutableRef.get(ref) diff --git a/packages/effect/test/Stream/async.test.ts b/packages/effect/test/Stream/async.test.ts index c82766db89..5f2ebea181 100644 --- a/packages/effect/test/Stream/async.test.ts +++ b/packages/effect/test/Stream/async.test.ts @@ -3,7 +3,6 @@ import * as Cause from "effect/Cause" import * as Chunk from "effect/Chunk" import * as Deferred from "effect/Deferred" import * as Effect from "effect/Effect" -import * as Either from "effect/Either" import * as Exit from "effect/Exit" import * as Fiber from "effect/Fiber" import { pipe } from "effect/Function" @@ -29,109 +28,14 @@ describe("Stream", () => { assert.deepStrictEqual(Array.from(result), array) })) - it.effect("asyncEffect - simple example", () => - Effect.gen(function*($) { - const array = [1, 2, 3, 4, 5] - const latch = yield* $(Deferred.make()) - const fiber = yield* $( - Stream.asyncEffect((emit) => { - array.forEach((n) => { - emit(Effect.succeed(Chunk.of(n))) - }) - return pipe( - Deferred.succeed(latch, void 0), - Effect.zipRight(Effect.unit) - ) - }), - Stream.take(array.length), - Stream.runCollect, - Effect.fork - ) - yield* $(Deferred.await(latch)) - const result = yield* $(Fiber.join(fiber)) - assert.deepStrictEqual(Array.from(result), array) - })) - - it.effect("asyncEffect - handles errors", () => - Effect.gen(function*($) { - const error = new Cause.RuntimeException("boom") - const result = yield* $( - Stream.asyncEffect((emit) => { - emit.fromEffect(Effect.fail(error)) - return Effect.unit - }), - Stream.runCollect, - Effect.exit - ) - assert.deepStrictEqual(result, Exit.fail(error)) - })) - - it.effect("asyncEffect - handles defects", () => - Effect.gen(function*($) { - const error = new Cause.RuntimeException("boom") - const result = yield* $( - Stream.asyncEffect(() => { - throw error - }), - Stream.runCollect, - Effect.exit - ) - assert.deepStrictEqual(result, Exit.die(error)) - })) - - it.effect("asyncEffect - signals the end of the stream", () => - Effect.gen(function*($) { - const result = yield* $( - Stream.asyncEffect((emit) => { - emit(Effect.fail(Option.none())) - return Effect.unit - }), - Stream.runCollect - ) - assert.isTrue(Chunk.isEmpty(result)) - })) - - it.effect("asyncEffect - backpressure", () => - Effect.gen(function*($) { - const refCount = yield* $(Ref.make(0)) - const refDone = yield* $(Ref.make(false)) - const stream = Stream.asyncEffect>((emit) => { - Promise.all( - // 1st consumed by sink, 2-6 – in queue, 7th – back pressured - [1, 2, 3, 4, 5, 6, 7].map((n) => - emit.fromEffectChunk( - pipe( - Ref.set(refCount, n), - Effect.zipRight(Effect.succeed(Chunk.of(1))) - ) - ) - ) - ).then(() => - emit.fromEffect( - pipe( - Ref.set(refDone, true), - Effect.zipRight(Effect.fail(Option.none())) - ) - ) - ) - return Effect.unit - }, 5) - const sink = pipe(Sink.take(1), Sink.zipRight(Sink.never)) - const fiber = yield* $(stream, Stream.run(sink), Effect.fork) - yield* $(Ref.get(refCount), Effect.repeat({ while: (n) => n !== 7 })) - const result = yield* $(Ref.get(refDone)) - yield* $(Fiber.interrupt(fiber)) - assert.isFalse(result) - })) - - it.effect("asyncInterrupt - left", () => + it.effect("async - with cleanup", () => Effect.gen(function*($) { const ref = yield* $(Ref.make(false)) const latch = yield* $(Deferred.make()) const fiber = yield* $( - Stream.asyncInterrupt((emit) => { + Stream.async((emit) => { emit.chunk(Chunk.of(void 0)) - return Either.left(Ref.set(ref, true)) + return Ref.set(ref, true) }), Stream.tap(() => Deferred.succeed(latch, void 0)), Stream.runDrain, @@ -143,35 +47,25 @@ describe("Stream", () => { assert.isTrue(result) })) - it.effect("asyncInterrupt - right", () => - Effect.gen(function*($) { - const chunk = Chunk.range(1, 5) - const result = yield* $( - Stream.asyncInterrupt(() => Either.right(Stream.fromChunk(chunk))), - Stream.runCollect - ) - assert.deepStrictEqual(Array.from(result), Array.from(chunk)) - })) - - it.effect("asyncInterrupt - signals the end of the stream", () => + it.effect("async - signals the end of the stream", () => Effect.gen(function*($) { const result = yield* $( - Stream.asyncInterrupt((emit) => { + Stream.async((emit) => { emit.end() - return Either.left(Effect.unit) + return Effect.unit }), Stream.runCollect ) assert.isTrue(Chunk.isEmpty(result)) })) - it.effect("asyncInterrupt - handles errors", () => + it.effect("async - handles errors", () => Effect.gen(function*($) { const error = new Cause.RuntimeException("boom") const result = yield* $( - Stream.asyncInterrupt((emit) => { + Stream.async((emit) => { emit.fromEffect(Effect.fail(error)) - return Either.left(Effect.unit) + return Effect.unit }), Stream.runCollect, Effect.exit @@ -179,11 +73,11 @@ describe("Stream", () => { assert.deepStrictEqual(result, Exit.fail(error)) })) - it.effect("asyncInterrupt - handles defects", () => + it.effect("async - handles defects", () => Effect.gen(function*($) { const error = new Cause.RuntimeException("boom") const result = yield* $( - Stream.asyncInterrupt(() => { + Stream.async(() => { throw error }), Stream.runCollect, @@ -192,11 +86,11 @@ describe("Stream", () => { assert.deepStrictEqual(result, Exit.die(error)) })) - it.effect("asyncInterrupt - backpressure", () => + it.effect("async - backpressure", () => Effect.gen(function*($) { const refCount = yield* $(Ref.make(0)) const refDone = yield* $(Ref.make(false)) - const stream = Stream.asyncInterrupt>((emit) => { + const stream = Stream.async>((emit) => { Promise.all( // 1st consumed by sink, 2-6 – in queue, 7th – back pressured [1, 2, 3, 4, 5, 6, 7].map((n) => @@ -215,7 +109,7 @@ describe("Stream", () => { ) ) ) - return Either.left(Effect.unit) + return Effect.unit }, 5) const sink = pipe(Sink.take(1), Sink.zipRight(Sink.never)) const fiber = yield* $(stream, Stream.run(sink), Effect.fork) @@ -225,51 +119,36 @@ describe("Stream", () => { assert.isFalse(result) })) - it.effect("asyncOption - signals the end of the stream", () => - Effect.gen(function*($) { - const result = yield* $( - Stream.asyncOption((emit) => { - emit(Effect.fail(Option.none())) - return Option.none() - }), - Stream.runCollect - ) - assert.isTrue(Chunk.isEmpty(result)) - })) - - it.effect("asyncOption - some", () => - Effect.gen(function*($) { - const chunk = Chunk.range(1, 5) - const result = yield* $( - Stream.asyncOption(() => Option.some(Stream.fromChunk(chunk))), - Stream.runCollect - ) - assert.deepStrictEqual(Array.from(result), Array.from(chunk)) - })) - - it.effect("asyncOption - none", () => + it.effect("asyncEffect - simple example", () => Effect.gen(function*($) { const array = [1, 2, 3, 4, 5] - const result = yield* $( - Stream.asyncOption((emit) => { + const latch = yield* $(Deferred.make()) + const fiber = yield* $( + Stream.asyncEffect((emit) => { array.forEach((n) => { emit(Effect.succeed(Chunk.of(n))) }) - return Option.none() + return pipe( + Deferred.succeed(latch, void 0), + Effect.zipRight(Effect.unit) + ) }), Stream.take(array.length), - Stream.runCollect + Stream.runCollect, + Effect.fork ) + yield* $(Deferred.await(latch)) + const result = yield* $(Fiber.join(fiber)) assert.deepStrictEqual(Array.from(result), array) })) - it.effect("asyncOption - handles errors", () => + it.effect("asyncEffect - handles errors", () => Effect.gen(function*($) { const error = new Cause.RuntimeException("boom") const result = yield* $( - Stream.asyncOption((emit) => { + Stream.asyncEffect((emit) => { emit.fromEffect(Effect.fail(error)) - return Option.none() + return Effect.unit }), Stream.runCollect, Effect.exit @@ -277,11 +156,11 @@ describe("Stream", () => { assert.deepStrictEqual(result, Exit.fail(error)) })) - it.effect("asyncOption - handles defects", () => + it.effect("asyncEffect - handles defects", () => Effect.gen(function*($) { const error = new Cause.RuntimeException("boom") const result = yield* $( - Stream.asyncOption(() => { + Stream.asyncEffect(() => { throw error }), Stream.runCollect, @@ -290,11 +169,23 @@ describe("Stream", () => { assert.deepStrictEqual(result, Exit.die(error)) })) - it.effect("asyncOption - backpressure", () => + it.effect("asyncEffect - signals the end of the stream", () => + Effect.gen(function*($) { + const result = yield* $( + Stream.asyncEffect((emit) => { + emit(Effect.fail(Option.none())) + return Effect.unit + }), + Stream.runCollect + ) + assert.isTrue(Chunk.isEmpty(result)) + })) + + it.effect("asyncEffect - backpressure", () => Effect.gen(function*($) { const refCount = yield* $(Ref.make(0)) const refDone = yield* $(Ref.make(false)) - const stream = Stream.asyncOption>((emit) => { + const stream = Stream.asyncEffect>((emit) => { Promise.all( // 1st consumed by sink, 2-6 – in queue, 7th – back pressured [1, 2, 3, 4, 5, 6, 7].map((n) => @@ -313,16 +204,114 @@ describe("Stream", () => { ) ) ) - return Option.none() + return Effect.unit }, 5) const sink = pipe(Sink.take(1), Sink.zipRight(Sink.never)) const fiber = yield* $(stream, Stream.run(sink), Effect.fork) yield* $(Ref.get(refCount), Effect.repeat({ while: (n) => n !== 7 })) const result = yield* $(Ref.get(refDone)) - yield* $(Fiber.interrupt(fiber), Effect.exit) + yield* $(Fiber.interrupt(fiber)) assert.isFalse(result) })) + // it.effect("asyncOption - signals the end of the stream", () => + // Effect.gen(function*($) { + // const result = yield* $( + // Stream.asyncOption((emit) => { + // emit(Effect.fail(Option.none())) + // return Option.none() + // }), + // Stream.runCollect + // ) + // assert.isTrue(Chunk.isEmpty(result)) + // })) + + // it.effect("asyncOption - some", () => + // Effect.gen(function*($) { + // const chunk = Chunk.range(1, 5) + // const result = yield* $( + // Stream.asyncOption(() => Option.some(Stream.fromChunk(chunk))), + // Stream.runCollect + // ) + // assert.deepStrictEqual(Array.from(result), Array.from(chunk)) + // })) + + // it.effect("asyncOption - none", () => + // Effect.gen(function*($) { + // const array = [1, 2, 3, 4, 5] + // const result = yield* $( + // Stream.asyncOption((emit) => { + // array.forEach((n) => { + // emit(Effect.succeed(Chunk.of(n))) + // }) + // return Option.none() + // }), + // Stream.take(array.length), + // Stream.runCollect + // ) + // assert.deepStrictEqual(Array.from(result), array) + // })) + + // it.effect("asyncOption - handles errors", () => + // Effect.gen(function*($) { + // const error = new Cause.RuntimeException("boom") + // const result = yield* $( + // Stream.asyncOption((emit) => { + // emit.fromEffect(Effect.fail(error)) + // return Option.none() + // }), + // Stream.runCollect, + // Effect.exit + // ) + // assert.deepStrictEqual(result, Exit.fail(error)) + // })) + + // it.effect("asyncOption - handles defects", () => + // Effect.gen(function*($) { + // const error = new Cause.RuntimeException("boom") + // const result = yield* $( + // Stream.asyncOption(() => { + // throw error + // }), + // Stream.runCollect, + // Effect.exit + // ) + // assert.deepStrictEqual(result, Exit.die(error)) + // })) + + // it.effect("asyncOption - backpressure", () => + // Effect.gen(function*($) { + // const refCount = yield* $(Ref.make(0)) + // const refDone = yield* $(Ref.make(false)) + // const stream = Stream.asyncOption>((emit) => { + // Promise.all( + // // 1st consumed by sink, 2-6 – in queue, 7th – back pressured + // [1, 2, 3, 4, 5, 6, 7].map((n) => + // emit.fromEffectChunk( + // pipe( + // Ref.set(refCount, n), + // Effect.zipRight(Effect.succeed(Chunk.of(1))) + // ) + // ) + // ) + // ).then(() => + // emit.fromEffect( + // pipe( + // Ref.set(refDone, true), + // Effect.zipRight(Effect.fail(Option.none())) + // ) + // ) + // ) + // return Option.none() + // }, 5) + // const sink = pipe(Sink.take(1), Sink.zipRight(Sink.never)) + // const fiber = yield* $(stream, Stream.run(sink), Effect.fork) + // yield* $(Ref.get(refCount), Effect.repeat({ while: (n) => n !== 7 })) + // const result = yield* $(Ref.get(refDone)) + // yield* $(Fiber.interrupt(fiber), Effect.exit) + // assert.isFalse(result) + // })) + it.effect("asyncScoped", () => Effect.gen(function*($) { const array = [1, 2, 3, 4, 5]