diff --git a/.changeset/cuddly-parrots-know.md b/.changeset/cuddly-parrots-know.md
new file mode 100644
index 0000000000..17ae1c45db
--- /dev/null
+++ b/.changeset/cuddly-parrots-know.md
@@ -0,0 +1,23 @@
+---
+"effect": minor
+---
+
+Consolidate `Effect.asyncOption`, `Effect.asyncEither`, `Stream.asyncOption`, `Stream.asyncEither`, and `Stream.asyncInterrupt`
+
+This PR removes `Effect.asyncOption` and `Effect.asyncEither` as their behavior can be entirely implemented with the new signature of `Effect.async`, which optionally returns a cleanup `Effect` from the registration callback.
+
+```ts
+declare const async: (
+ register: (callback: (_: Effect) => void, signal: AbortSignal) => void | Effect,
+ blockingOn?: FiberId
+) => Effect
+```
+
+Additionally, this PR removes `Stream.asyncOption`, `Stream.asyncEither`, and `Stream.asyncInterrupt` as their behavior can be entirely implemented with the new signature of `Stream.async`, which can optionally return a cleanup `Effect` from the registration callback.
+
+```ts
+declare const async: (
+ register: (emit: Emit) => Effect | void,
+ outputBuffer?: number
+) => Stream
+```
diff --git a/package.json b/package.json
index 39b1f2143f..ef860a9c3d 100644
--- a/package.json
+++ b/package.json
@@ -1,7 +1,7 @@
{
"private": true,
"type": "module",
- "packageManager": "pnpm@8.12.1",
+ "packageManager": "pnpm@8.15.1",
"workspaces": [
"packages/*"
],
diff --git a/packages/effect/src/Effect.ts b/packages/effect/src/Effect.ts
index 43369d6ffa..9bf2c68281 100644
--- a/packages/effect/src/Effect.ts
+++ b/packages/effect/src/Effect.ts
@@ -976,17 +976,20 @@ export const validateFirst: {
// -------------------------------------------------------------------------------------
/**
- * Imports an asynchronous side-effect into a pure `Effect` value.
- * The callback function `Effect => void` must be called at most once.
+ * Imports an asynchronous side-effect into a pure `Effect` value. The callback
+ * function `Effect => void` **MUST** be called at most once.
*
- * If an Effect is returned by the registration function, it will be executed
- * if the fiber executing the effect is interrupted.
+ * The registration function can optionally return an Effect, which will be
+ * executed if the `Fiber` executing this Effect is interrupted.
*
* The registration function can also receive an `AbortSignal` if required for
* interruption.
*
- * The `FiberId` of the fiber that may complete the async callback may be
- * provided to allow for better diagnostics.
+ * The `FiberId` of the fiber that may complete the async callback may also be
+ * specified. This is called the "blocking fiber" because it suspends the fiber
+ * executing the `async` Effect (i.e. semantically blocks the fiber from making
+ * progress). Specifying this fiber id in cases where it is known will improve
+ * diagnostics, but not affect the behavior of the returned effect.
*
* @since 2.0.0
* @category constructors
@@ -1005,51 +1008,9 @@ export const async: (
* @since 2.0.0
* @category constructors
*/
-export const asyncEffect: (
- register: (callback: (_: Effect) => void) => Effect
-) => Effect = _runtime.asyncEffect
-
-/**
- * Imports an asynchronous effect into a pure `Effect` value, possibly returning
- * the value synchronously.
- *
- * If the register function returns a value synchronously, then the callback
- * function `Effect => void` must not be called. Otherwise the callback
- * function must be called at most once.
- *
- * The `FiberId` of the fiber that may complete the async callback may be
- * provided to allow for better diagnostics.
- *
- * @since 2.0.0
- * @category constructors
- */
-export const asyncOption: (
- register: (callback: (_: Effect) => void) => Option.Option>,
- blockingOn?: FiberId.FiberId
-) => Effect = effect.asyncOption
-
-/**
- * Imports an asynchronous side-effect into an effect. It has the option of
- * returning the value synchronously, which is useful in cases where it cannot
- * be determined if the effect is synchronous or asynchronous until the register
- * is actually executed. It also has the option of returning a canceler,
- * which will be used by the runtime to cancel the asynchronous effect if the fiber
- * executing the effect is interrupted.
- *
- * If the register function returns a value synchronously, then the callback
- * function `Effect => void` must not be called. Otherwise the callback
- * function must be called at most once.
- *
- * The `FiberId` of the fiber that may complete the async callback may be
- * provided to allow for better diagnostics.
- *
- * @since 2.0.0
- * @category constructors
- */
-export const asyncEither: (
- register: (callback: (effect: Effect) => void) => Either.Either, Effect>,
- blockingOn?: FiberId.FiberId
-) => Effect = core.asyncEither
+export const asyncEffect: (
+ register: (callback: (_: Effect) => void) => Effect | void, E2, R2>
+) => Effect = _runtime.asyncEffect
/**
* @since 2.0.0
diff --git a/packages/effect/src/Stream.ts b/packages/effect/src/Stream.ts
index dd65f89881..8d7f6b577c 100644
--- a/packages/effect/src/Stream.ts
+++ b/packages/effect/src/Stream.ts
@@ -256,15 +256,18 @@ export const as: {
} = internal.as
const _async: (
- register: (emit: Emit.Emit) => void,
+ register: (emit: Emit.Emit) => Effect.Effect | void,
outputBuffer?: number
) => Stream = internal._async
export {
/**
* Creates a stream from an asynchronous callback that can be called multiple
- * times. The optionality of the error type `E` can be used to signal the end
- * of the stream, by setting it to `None`.
+ * times. The optionality of the error type `E` in `Emit` can be used to
+ * signal the end of the stream by setting it to `None`.
+ *
+ * The registration function can optionally return an `Effect`, which will be
+ * executed if the `Fiber` executing this Effect is interrupted.
*
* @since 2.0.0
* @category constructors
@@ -286,34 +289,6 @@ export const asyncEffect: (
outputBuffer?: number
) => Stream = internal.asyncEffect
-/**
- * Creates a stream from an asynchronous callback that can be called multiple
- * times. The registration of the callback returns either a canceler or
- * synchronously returns a stream. The optionality of the error type `E` can
- * be used to signal the end of the stream, by setting it to `None`.
- *
- * @since 2.0.0
- * @category constructors
- */
-export const asyncInterrupt: (
- register: (emit: Emit.Emit) => Either.Either, Stream>,
- outputBuffer?: number
-) => Stream = internal.asyncInterrupt
-
-/**
- * Creates a stream from an asynchronous callback that can be called multiple
- * times. The registration of the callback can possibly return the stream
- * synchronously. The optionality of the error type `E` can be used to signal
- * the end of the stream, by setting it to `None`.
- *
- * @since 2.0.0
- * @category constructors
- */
-export const asyncOption: (
- register: (emit: Emit.Emit) => Option.Option>,
- outputBuffer?: number
-) => Stream = internal.asyncOption
-
/**
* Creates a stream from an asynchronous callback that can be called multiple
* times. The registration of the callback itself returns an a scoped
diff --git a/packages/effect/src/internal/clock.ts b/packages/effect/src/internal/clock.ts
index cad21ceb3a..80cd809b1c 100644
--- a/packages/effect/src/internal/clock.ts
+++ b/packages/effect/src/internal/clock.ts
@@ -2,7 +2,6 @@ import type * as Clock from "../Clock.js"
import * as Context from "../Context.js"
import * as Duration from "../Duration.js"
import type * as Effect from "../Effect.js"
-import * as Either from "../Either.js"
import { constFalse } from "../Function.js"
import * as core from "./core.js"
@@ -85,9 +84,9 @@ class ClockImpl implements Clock.Clock {
}
sleep(duration: Duration.Duration): Effect.Effect {
- return core.asyncEither((cb) => {
- const canceler = globalClockScheduler.unsafeSchedule(() => cb(core.unit), duration)
- return Either.left(core.asUnit(core.sync(canceler)))
+ return core.async((resume) => {
+ const canceler = globalClockScheduler.unsafeSchedule(() => resume(core.unit), duration)
+ return core.asUnit(core.sync(canceler))
})
}
}
diff --git a/packages/effect/src/internal/core-effect.ts b/packages/effect/src/internal/core-effect.ts
index bef5208681..88fb7e1224 100644
--- a/packages/effect/src/internal/core-effect.ts
+++ b/packages/effect/src/internal/core-effect.ts
@@ -4,9 +4,8 @@ import * as Clock from "../Clock.js"
import * as Context from "../Context.js"
import * as Duration from "../Duration.js"
import type * as Effect from "../Effect.js"
-import * as Either from "../Either.js"
import type * as Fiber from "../Fiber.js"
-import * as FiberId from "../FiberId.js"
+import type * as FiberId from "../FiberId.js"
import type * as FiberRef from "../FiberRef.js"
import * as FiberRefs from "../FiberRefs.js"
import type * as FiberRefsPatch from "../FiberRefsPatch.js"
@@ -75,26 +74,6 @@ export const asSome = (self: Effect.Effect): Effect.Effect(self: Effect.Effect): Effect.Effect, R> =>
core.mapError(self, Option.some)
-/* @internal */
-export const asyncOption = (
- register: (callback: (_: Effect.Effect) => void) => Option.Option>,
- blockingOn: FiberId.FiberId = FiberId.none
-): Effect.Effect =>
- core.asyncEither(
- (cb) => {
- const option = register(cb)
- switch (option._tag) {
- case "None": {
- return Either.left(core.unit)
- }
- case "Some": {
- return Either.right(option.value)
- }
- }
- },
- blockingOn
- )
-
/* @internal */
export const try_: {
(options: {
diff --git a/packages/effect/src/internal/core.ts b/packages/effect/src/internal/core.ts
index bb65d74250..25322c3036 100644
--- a/packages/effect/src/internal/core.ts
+++ b/packages/effect/src/internal/core.ts
@@ -480,22 +480,6 @@ export const async = (
effect
})
-/* @internal */
-export const asyncEither = (
- register: (
- resume: (effect: Effect.Effect) => void
- ) => Either.Either, Effect.Effect>,
- blockingOn: FiberId.FiberId = FiberId.none
-): Effect.Effect =>
- async((resume) => {
- const result = register(resume)
- if (Either.isRight(result)) {
- resume(result.right)
- } else {
- return result.left
- }
- }, blockingOn)
-
/* @internal */
export const catchAllCause = dual<
(
@@ -1462,11 +1446,11 @@ export const zipWith: {
): Effect.Effect => flatMap(self, (a) => map(that, (b) => f(a, b))))
/* @internal */
-export const never: Effect.Effect = asyncEither(() => {
+export const never: Effect.Effect = async(() => {
const interval = setInterval(() => {
//
}, 2 ** 31 - 1)
- return Either.left(sync(() => clearInterval(interval)))
+ return sync(() => clearInterval(interval))
})
// -----------------------------------------------------------------------------
@@ -2844,18 +2828,18 @@ export const deferredMakeAs = (fiberId: FiberId.FiberId): Effect.E
/* @internal */
export const deferredAwait = (self: Deferred.Deferred): Effect.Effect =>
- asyncEither((k) => {
+ async((resume) => {
const state = MutableRef.get(self.state)
switch (state._tag) {
case DeferredOpCodes.OP_STATE_DONE: {
- return Either.right(state.effect)
+ return resume(state.effect)
}
case DeferredOpCodes.OP_STATE_PENDING: {
pipe(
self.state,
- MutableRef.set(deferred.pending([k, ...state.joiners]))
+ MutableRef.set(deferred.pending([resume, ...state.joiners]))
)
- return Either.left(deferredInterruptJoiner(self, k))
+ return deferredInterruptJoiner(self, resume)
}
}
}, self.blockingOn)
diff --git a/packages/effect/src/internal/effect/circular.ts b/packages/effect/src/internal/effect/circular.ts
index 0ae3473586..8c07bbd4a5 100644
--- a/packages/effect/src/internal/effect/circular.ts
+++ b/packages/effect/src/internal/effect/circular.ts
@@ -2,7 +2,6 @@ import type * as Cause from "../../Cause.js"
import type * as Deferred from "../../Deferred.js"
import * as Duration from "../../Duration.js"
import type * as Effect from "../../Effect.js"
-import * as Either from "../../Either.js"
import * as Equal from "../../Equal.js"
import type { Equivalence } from "../../Equivalence.js"
import * as Exit from "../../Exit.js"
@@ -45,7 +44,7 @@ class Semaphore {
}
readonly take = (n: number): Effect.Effect =>
- core.asyncEither((resume) => {
+ core.async((resume) => {
if (this.free < n) {
const observer = () => {
if (this.free < n) {
@@ -57,12 +56,12 @@ class Semaphore {
return true
}
this.waiters.add(observer)
- return Either.left(core.sync(() => {
+ return core.sync(() => {
this.waiters.delete(observer)
- }))
+ })
}
this.taken += n
- return Either.right(core.succeed(n))
+ return resume(core.succeed(n))
})
readonly updateTaken = (f: (n: number) => number): Effect.Effect =>
diff --git a/packages/effect/src/internal/runtime.ts b/packages/effect/src/internal/runtime.ts
index 5f3daac39f..b5a4634c76 100644
--- a/packages/effect/src/internal/runtime.ts
+++ b/packages/effect/src/internal/runtime.ts
@@ -22,6 +22,7 @@ import * as core from "./core.js"
import * as executionStrategy from "./executionStrategy.js"
import * as FiberRuntime from "./fiberRuntime.js"
import * as fiberScope from "./fiberScope.js"
+import { internalize } from "./internalize.js"
import * as OpCodes from "./opCodes/effect.js"
import * as runtimeFlags from "./runtimeFlags.js"
import * as _supervisor from "./supervisor.js"
@@ -409,22 +410,34 @@ export const unsafeRunSyncExitEffect = unsafeRunSyncExit(defaultRuntime)
// circular with Effect
/** @internal */
-export const asyncEffect = (
- register: (callback: (_: Effect.Effect) => void) => Effect.Effect
-): Effect.Effect =>
- core.flatMap(
- core.deferredMake(),
- (deferred) =>
- core.flatMap(runtime(), (runtime) =>
- core.uninterruptibleMask((restore) =>
- core.zipRight(
- FiberRuntime.fork(restore(
- core.catchAllCause(
- register((cb) => unsafeRunCallback(runtime)(core.intoDeferred(cb, deferred))),
- (cause) => core.deferredFailCause(deferred, cause)
- )
- )),
- restore(core.deferredAwait(deferred))
- )
- ))
- )
+export const asyncEffect = (
+ register: (
+ callback: (_: Effect.Effect) => void
+ ) => Effect.Effect | void, E2, R2>
+): Effect.Effect =>
+ core.suspend(() => {
+ internalize(register)
+ let cleanup: Effect.Effect | void = undefined
+ return core.flatMap(
+ core.deferredMake(),
+ (deferred) =>
+ core.flatMap(runtime(), (runtime) =>
+ core.uninterruptibleMask((restore) =>
+ core.zipRight(
+ FiberRuntime.fork(restore(
+ core.matchCauseEffect(
+ register((cb) => unsafeRunCallback(runtime)(core.intoDeferred(cb, deferred))),
+ {
+ onFailure: (cause) => core.deferredFailCause(deferred, cause),
+ onSuccess: (cleanup_) => {
+ cleanup = cleanup_
+ return core.unit
+ }
+ }
+ )
+ )),
+ restore(core.onInterrupt(core.deferredAwait(deferred), () => cleanup ?? core.unit))
+ )
+ ))
+ )
+ })
diff --git a/packages/effect/src/internal/stream.ts b/packages/effect/src/internal/stream.ts
index 824b2d1c63..85b88423e3 100644
--- a/packages/effect/src/internal/stream.ts
+++ b/packages/effect/src/internal/stream.ts
@@ -48,7 +48,7 @@ import * as pull from "./stream/pull.js"
import * as SinkEndReason from "./stream/sinkEndReason.js"
import * as ZipAllState from "./stream/zipAllState.js"
import * as ZipChunksState from "./stream/zipChunksState.js"
-import * as _take from "./take.js"
+import * as InternalTake from "./take.js"
/** @internal */
const StreamSymbolKey = "effect/Stream"
@@ -461,13 +461,57 @@ export const as = dual<
/** @internal */
export const _async = (
- register: (emit: Emit.Emit) => void,
+ register: (
+ emit: Emit.Emit
+ ) => Effect.Effect | void,
outputBuffer = 16
): Stream.Stream =>
- asyncOption((cb) => {
- register(cb)
- return Option.none()
- }, outputBuffer)
+ Effect.acquireRelease(
+ Queue.bounded>(outputBuffer),
+ (queue) => Queue.shutdown(queue)
+ ).pipe(
+ Effect.flatMap((output) =>
+ Effect.runtime().pipe(
+ Effect.flatMap((runtime) =>
+ Effect.sync(() => {
+ const runPromiseExit = Runtime.runPromiseExit(runtime)
+ const canceler = register(emit.make((resume) =>
+ InternalTake.fromPull(resume).pipe(
+ Effect.flatMap((take) => Queue.offer(output, take)),
+ Effect.asUnit,
+ runPromiseExit
+ ).then((exit) => {
+ if (Exit.isFailure(exit)) {
+ if (!Cause.isInterrupted(exit.cause)) {
+ throw Cause.squash(exit.cause)
+ }
+ }
+ })
+ ))
+ return canceler
+ })
+ ),
+ Effect.map((value) => {
+ const loop: Channel.Channel, unknown, E, unknown, void, unknown> = Queue.take(output).pipe(
+ Effect.flatMap((take) => InternalTake.done(take)),
+ Effect.match({
+ onFailure: (maybeError) =>
+ core.fromEffect(Queue.shutdown(output)).pipe(
+ channel.zipRight(Option.match(maybeError, {
+ onNone: () => core.unit,
+ onSome: (error) => core.fail(error)
+ }))
+ ),
+ onSuccess: (chunk) => core.write(chunk).pipe(core.flatMap(() => loop))
+ }),
+ channel.unwrap
+ )
+ return fromChannel(loop).pipe(ensuring(value ?? Effect.unit))
+ })
+ )
+ ),
+ unwrapScoped
+ )
/** @internal */
export const asyncEffect = (
@@ -487,7 +531,7 @@ export const asyncEffect = (
register(
emit.make((k) =>
pipe(
- _take.fromPull(k),
+ InternalTake.fromPull(k),
Effect.flatMap((take) => Queue.offer(output, take)),
Effect.asUnit,
Runtime.runPromiseExit(runtime)
@@ -503,7 +547,7 @@ export const asyncEffect = (
Effect.map(() => {
const loop: Channel.Channel, unknown, E, unknown, void, unknown> = pipe(
Queue.take(output),
- Effect.flatMap(_take.done),
+ Effect.flatMap(InternalTake.done),
Effect.match({
onFailure: (maybeError) =>
pipe(
@@ -524,84 +568,6 @@ export const asyncEffect = (
fromChannel
)
-/** @internal */
-export const asyncInterrupt = (
- register: (
- emit: Emit.Emit
- ) => Either.Either, Stream.Stream>,
- outputBuffer = 16
-): Stream.Stream =>
- pipe(
- Effect.acquireRelease(
- Queue.bounded>(outputBuffer),
- (queue) => Queue.shutdown(queue)
- ),
- Effect.flatMap((output) =>
- pipe(
- Effect.runtime(),
- Effect.flatMap((runtime) =>
- pipe(
- Effect.sync(() =>
- register(
- emit.make((k) =>
- pipe(
- _take.fromPull(k),
- Effect.flatMap((take) => Queue.offer(output, take)),
- Effect.asUnit,
- Runtime.runPromiseExit(runtime)
- ).then((exit) => {
- if (Exit.isFailure(exit)) {
- if (!Cause.isInterrupted(exit.cause)) {
- throw Cause.squash(exit.cause)
- }
- }
- })
- )
- )
- ),
- Effect.map(Either.match({
- onLeft: (canceler) => {
- const loop: Channel.Channel, unknown, E, unknown, void, unknown> = pipe(
- Queue.take(output),
- Effect.flatMap(_take.done),
- Effect.match({
- onFailure: (maybeError) =>
- channel.zipRight(
- core.fromEffect(Queue.shutdown(output)),
- Option.match(maybeError, {
- onNone: () => core.unit,
- onSome: core.fail
- })
- ),
- onSuccess: (chunk) => pipe(core.write(chunk), core.flatMap(() => loop))
- }),
- channel.unwrap
- )
- return pipe(fromChannel(loop), ensuring(canceler))
- },
- onRight: (stream) => unwrap(pipe(Queue.shutdown(output), Effect.as(stream)))
- }))
- )
- )
- )
- ),
- unwrapScoped
- )
-
-/** @internal */
-export const asyncOption = (
- register: (emit: Emit.Emit) => Option.Option>,
- outputBuffer = 16
-): Stream.Stream =>
- asyncInterrupt(
- (emit) =>
- Option.match(register(emit), {
- onNone: () => Either.left(Effect.unit),
- onSome: Either.right
- }),
- outputBuffer
- )
-
/** @internal */
export const asyncScoped = (
register: (emit: Emit.Emit) => Effect.Effect,
@@ -620,7 +586,7 @@ export const asyncScoped = (
register(
emit.make((k) =>
pipe(
- _take.fromPull(k),
+ InternalTake.fromPull(k),
Effect.flatMap((take) => Queue.offer(output, take)),
Effect.asUnit,
Runtime.runPromiseExit(runtime)
@@ -642,7 +608,7 @@ export const asyncScoped = (
pull.end() :
pipe(
Queue.take(output),
- Effect.flatMap(_take.done),
+ Effect.flatMap(InternalTake.done),
Effect.onError(() =>
pipe(
Ref.set(ref, true),
@@ -884,7 +850,7 @@ export const bufferChunks = dual<
Effect.map(queue, (queue) => {
const process: Channel.Channel, unknown, E, unknown, void, unknown> = pipe(
core.fromEffect(Queue.take(queue)),
- core.flatMap(_take.match({
+ core.flatMap(InternalTake.match({
onEnd: () => core.unit,
onFailure: core.failCause,
onSuccess: (value) => pipe(core.write(value), core.flatMap(() => process))
@@ -947,7 +913,7 @@ const bufferUnbounded = (self: Stream.Stream): Stream.Stream {
const process: Channel.Channel, unknown, E, unknown, void, unknown> = pipe(
core.fromEffect(Queue.take(queue)),
- core.flatMap(_take.match({
+ core.flatMap(InternalTake.match({
onEnd: () => core.unit,
onFailure: core.failCause,
onSuccess: (value) => core.flatMap(core.write(value), () => process)
@@ -989,7 +955,7 @@ const bufferSignal = (
Effect.flatMap(
(deferred) =>
pipe(
- Queue.offer(queue, [_take.chunk(input), deferred] as const),
+ Queue.offer(queue, [InternalTake.chunk(input), deferred] as const),
Effect.flatMap((added) => pipe(Ref.set(ref, deferred), Effect.when(() => added)))
)
),
@@ -997,8 +963,8 @@ const bufferSignal = (
core.fromEffect,
core.flatMap(() => producer(queue, ref))
),
- onFailure: (error) => terminate(_take.failCause(error)),
- onDone: () => terminate(_take.end)
+ onFailure: (error) => terminate(InternalTake.failCause(error)),
+ onDone: () => terminate(InternalTake.end)
})
}
const consumer = (
@@ -1009,7 +975,7 @@ const bufferSignal = (
core.flatMap(([take, deferred]) =>
channel.zipRight(
core.fromEffect(Deferred.succeed(deferred, void 0)),
- _take.match(take, {
+ InternalTake.match(take, {
onEnd: () => core.unit,
onFailure: core.failCause,
onSuccess: (value) => pipe(core.write(value), core.flatMap(() => process))
@@ -1469,7 +1435,7 @@ export const combineChunks = dual<
core.flatMap(
core.fromEffect(pipe(
handoff,
- Handoff.offer>(_take.chunk(input))
+ Handoff.offer>(InternalTake.chunk(input))
)),
() => producer(handoff, latch)
),
@@ -1477,11 +1443,11 @@ export const combineChunks = dual<
core.fromEffect(
Handoff.offer>(
handoff,
- _take.failCause(cause)
+ InternalTake.failCause(cause)
)
),
onDone: (): Channel.Channel, never, Err, unknown, unknown, R> =>
- core.fromEffect(Handoff.offer>(handoff, _take.end))
+ core.fromEffect(Handoff.offer>(handoff, InternalTake.end))
})
)
return new StreamImpl(
@@ -1515,7 +1481,7 @@ export const combineChunks = dual<
Effect.zipRight(
pipe(
Handoff.take(left),
- Effect.flatMap(_take.done)
+ Effect.flatMap(InternalTake.done)
)
)
)
@@ -1525,7 +1491,7 @@ export const combineChunks = dual<
Effect.zipRight(
pipe(
Handoff.take(right),
- Effect.flatMap(_take.done)
+ Effect.flatMap(InternalTake.done)
)
)
)
@@ -3388,7 +3354,7 @@ export const interleaveWith = dual<
onInput: (value: A | A2) =>
core.flatMap(
core.fromEffect(
- Handoff.offer>(handoff, _take.of(value))
+ Handoff.offer>(handoff, InternalTake.of(value))
),
() => producer(handoff)
),
@@ -3396,12 +3362,12 @@ export const interleaveWith = dual<
core.fromEffect(
Handoff.offer>(
handoff,
- _take.failCause(cause)
+ InternalTake.failCause(cause)
)
),
onDone: () =>
core.fromEffect(
- Handoff.offer>(handoff, _take.end)
+ Handoff.offer>(handoff, InternalTake.end)
)
})
return new StreamImpl(
@@ -3437,7 +3403,7 @@ export const interleaveWith = dual<
if (bool && !leftDone) {
return pipe(
core.fromEffect(Handoff.take(left)),
- core.flatMap(_take.match({
+ core.flatMap(InternalTake.match({
onEnd: () => rightDone ? core.unit : process(true, rightDone),
onFailure: core.failCause,
onSuccess: (chunk) => pipe(core.write(chunk), core.flatMap(() => process(leftDone, rightDone)))
@@ -3447,7 +3413,7 @@ export const interleaveWith = dual<
if (!bool && !rightDone) {
return pipe(
core.fromEffect(Handoff.take(right)),
- core.flatMap(_take.match({
+ core.flatMap(InternalTake.match({
onEnd: () => leftDone ? core.unit : process(leftDone, true),
onFailure: core.failCause,
onSuccess: (chunk) => pipe(core.write(chunk), core.flatMap(() => process(leftDone, rightDone)))
@@ -5450,9 +5416,9 @@ export const runIntoQueueScoped = dual<
): Effect.Effect => {
const writer: Channel.Channel, Chunk.Chunk, never, E, unknown, unknown, R> = core
.readWithCause({
- onInput: (input: Chunk.Chunk) => core.flatMap(core.write(_take.chunk(input)), () => writer),
- onFailure: (cause) => core.write(_take.failCause(cause)),
- onDone: () => core.write(_take.end)
+ onInput: (input: Chunk.Chunk) => core.flatMap(core.write(InternalTake.chunk(input)), () => writer),
+ onFailure: (cause) => core.write(InternalTake.failCause(cause)),
+ onDone: () => core.write(InternalTake.end)
})
return pipe(
core.pipeTo(toChannel(self), writer),
@@ -6229,7 +6195,7 @@ export const tapSink = dual<
.readWithCause({
onInput: (chunk: Chunk.Chunk) =>
pipe(
- core.fromEffect(Queue.offer(queue, _take.chunk(chunk))),
+ core.fromEffect(Queue.offer(queue, InternalTake.chunk(chunk))),
core.foldCauseChannel({
onFailure: () => core.flatMap(core.write(chunk), () => channel.identityChannel()),
onSuccess: () => core.flatMap(core.write(chunk), () => loop)
@@ -6237,7 +6203,7 @@ export const tapSink = dual<
) as Channel.Channel, Chunk.Chunk, E | E2, E, unknown, unknown, R2>,
onFailure: (cause: Cause.Cause) =>
pipe(
- core.fromEffect(Queue.offer(queue, _take.failCause(cause))),
+ core.fromEffect(Queue.offer(queue, InternalTake.failCause(cause))),
core.foldCauseChannel({
onFailure: () => core.failCause(cause),
onSuccess: () => core.failCause(cause)
@@ -6245,7 +6211,7 @@ export const tapSink = dual<
),
onDone: () =>
pipe(
- core.fromEffect(Queue.offer(queue, _take.end)),
+ core.fromEffect(Queue.offer(queue, InternalTake.end)),
core.foldCauseChannel({
onFailure: () => core.unit,
onSuccess: () => core.unit
@@ -6256,7 +6222,7 @@ export const tapSink = dual<
new StreamImpl(pipe(
core.pipeTo(toChannel(self), loop),
channel.ensuring(Effect.zipRight(
- Effect.forkDaemon(Queue.offer(queue, _take.end)),
+ Effect.forkDaemon(Queue.offer(queue, InternalTake.end)),
Deferred.await(deferred)
))
)),
diff --git a/packages/effect/test/Effect/async.test.ts b/packages/effect/test/Effect/async.test.ts
index 4e59bf0282..ae3ae46a69 100644
--- a/packages/effect/test/Effect/async.test.ts
+++ b/packages/effect/test/Effect/async.test.ts
@@ -22,8 +22,8 @@ describe("Effect", () => {
}))
it.effect("simple asyncEffect must return", () =>
Effect.gen(function*($) {
- const result = yield* $(Effect.asyncEffect((cb) => {
- return Effect.succeed(cb(Effect.succeed(42)))
+ const result = yield* $(Effect.asyncEffect((resume) => {
+ return Effect.succeed(resume(Effect.succeed(42)))
}))
assert.strictEqual(result, 42)
}))
@@ -95,18 +95,18 @@ describe("Effect", () => {
assert.deepStrictEqual(unexpected, Chunk.empty())
assert.deepStrictEqual(result, Option.none()) // the timeout should happen
}))
- it.live("asyncMaybe should not resume fiber twice after synchronous result", () =>
+ it.live("async should not resume fiber twice after synchronous result", () =>
Effect.gen(function*($) {
const step = yield* $(Deferred.make())
const unexpectedPlace = yield* $(Ref.make(Chunk.empty()))
const runtime = yield* $(Effect.runtime())
const fiber = yield* $(
- Effect.asyncOption((cb) => {
+ Effect.async((resume) => {
Runtime.runCallback(runtime)(pipe(
Deferred.await(step),
- Effect.zipRight(Effect.sync(() => cb(Ref.update(unexpectedPlace, Chunk.prepend(1)))))
+ Effect.zipRight(Effect.sync(() => resume(Ref.update(unexpectedPlace, Chunk.prepend(1)))))
))
- return Option.some(Effect.unit)
+ return Effect.unit
}),
Effect.flatMap(() =>
Effect.async(() => {
diff --git a/packages/effect/test/Effect/concurrency.test.ts b/packages/effect/test/Effect/concurrency.test.ts
index 482b3c7770..2597182c89 100644
--- a/packages/effect/test/Effect/concurrency.test.ts
+++ b/packages/effect/test/Effect/concurrency.test.ts
@@ -49,7 +49,7 @@ describe("Effect", () => {
const release = yield* $(Deferred.make())
const acquire = yield* $(Deferred.make())
const fiber = yield* $(
- Effect.asyncEffect((_) =>
+ Effect.asyncEffect((_) =>
// This will never complete because the callback is never invoked
Effect.acquireUseRelease(
Deferred.succeed(acquire, void 0),
diff --git a/packages/effect/test/Effect/interruption.test.ts b/packages/effect/test/Effect/interruption.test.ts
index 5cb4a77a5f..0b0172083f 100644
--- a/packages/effect/test/Effect/interruption.test.ts
+++ b/packages/effect/test/Effect/interruption.test.ts
@@ -546,11 +546,11 @@ describe("Effect", () => {
it.effect("async cancelation", () =>
Effect.gen(function*($) {
const ref = MutableRef.make(0)
- const effect = Effect.asyncEither(() => {
+ const effect = Effect.async(() => {
pipe(ref, MutableRef.set(MutableRef.get(ref) + 1))
- return Either.left(Effect.sync(() => {
+ return Effect.sync(() => {
pipe(ref, MutableRef.set(MutableRef.get(ref) - 1))
- }))
+ })
})
yield* $(Effect.unit, Effect.race(effect))
const result = MutableRef.get(ref)
diff --git a/packages/effect/test/Stream/async.test.ts b/packages/effect/test/Stream/async.test.ts
index c82766db89..5f2ebea181 100644
--- a/packages/effect/test/Stream/async.test.ts
+++ b/packages/effect/test/Stream/async.test.ts
@@ -3,7 +3,6 @@ import * as Cause from "effect/Cause"
import * as Chunk from "effect/Chunk"
import * as Deferred from "effect/Deferred"
import * as Effect from "effect/Effect"
-import * as Either from "effect/Either"
import * as Exit from "effect/Exit"
import * as Fiber from "effect/Fiber"
import { pipe } from "effect/Function"
@@ -29,109 +28,14 @@ describe("Stream", () => {
assert.deepStrictEqual(Array.from(result), array)
}))
- it.effect("asyncEffect - simple example", () =>
- Effect.gen(function*($) {
- const array = [1, 2, 3, 4, 5]
- const latch = yield* $(Deferred.make())
- const fiber = yield* $(
- Stream.asyncEffect((emit) => {
- array.forEach((n) => {
- emit(Effect.succeed(Chunk.of(n)))
- })
- return pipe(
- Deferred.succeed(latch, void 0),
- Effect.zipRight(Effect.unit)
- )
- }),
- Stream.take(array.length),
- Stream.runCollect,
- Effect.fork
- )
- yield* $(Deferred.await(latch))
- const result = yield* $(Fiber.join(fiber))
- assert.deepStrictEqual(Array.from(result), array)
- }))
-
- it.effect("asyncEffect - handles errors", () =>
- Effect.gen(function*($) {
- const error = new Cause.RuntimeException("boom")
- const result = yield* $(
- Stream.asyncEffect((emit) => {
- emit.fromEffect(Effect.fail(error))
- return Effect.unit
- }),
- Stream.runCollect,
- Effect.exit
- )
- assert.deepStrictEqual(result, Exit.fail(error))
- }))
-
- it.effect("asyncEffect - handles defects", () =>
- Effect.gen(function*($) {
- const error = new Cause.RuntimeException("boom")
- const result = yield* $(
- Stream.asyncEffect(() => {
- throw error
- }),
- Stream.runCollect,
- Effect.exit
- )
- assert.deepStrictEqual(result, Exit.die(error))
- }))
-
- it.effect("asyncEffect - signals the end of the stream", () =>
- Effect.gen(function*($) {
- const result = yield* $(
- Stream.asyncEffect((emit) => {
- emit(Effect.fail(Option.none()))
- return Effect.unit
- }),
- Stream.runCollect
- )
- assert.isTrue(Chunk.isEmpty(result))
- }))
-
- it.effect("asyncEffect - backpressure", () =>
- Effect.gen(function*($) {
- const refCount = yield* $(Ref.make(0))
- const refDone = yield* $(Ref.make(false))
- const stream = Stream.asyncEffect>((emit) => {
- Promise.all(
- // 1st consumed by sink, 2-6 – in queue, 7th – back pressured
- [1, 2, 3, 4, 5, 6, 7].map((n) =>
- emit.fromEffectChunk(
- pipe(
- Ref.set(refCount, n),
- Effect.zipRight(Effect.succeed(Chunk.of(1)))
- )
- )
- )
- ).then(() =>
- emit.fromEffect(
- pipe(
- Ref.set(refDone, true),
- Effect.zipRight(Effect.fail(Option.none()))
- )
- )
- )
- return Effect.unit
- }, 5)
- const sink = pipe(Sink.take(1), Sink.zipRight(Sink.never))
- const fiber = yield* $(stream, Stream.run(sink), Effect.fork)
- yield* $(Ref.get(refCount), Effect.repeat({ while: (n) => n !== 7 }))
- const result = yield* $(Ref.get(refDone))
- yield* $(Fiber.interrupt(fiber))
- assert.isFalse(result)
- }))
-
- it.effect("asyncInterrupt - left", () =>
+ it.effect("async - with cleanup", () =>
Effect.gen(function*($) {
const ref = yield* $(Ref.make(false))
const latch = yield* $(Deferred.make())
const fiber = yield* $(
- Stream.asyncInterrupt((emit) => {
+ Stream.async((emit) => {
emit.chunk(Chunk.of(void 0))
- return Either.left(Ref.set(ref, true))
+ return Ref.set(ref, true)
}),
Stream.tap(() => Deferred.succeed(latch, void 0)),
Stream.runDrain,
@@ -143,35 +47,25 @@ describe("Stream", () => {
assert.isTrue(result)
}))
- it.effect("asyncInterrupt - right", () =>
- Effect.gen(function*($) {
- const chunk = Chunk.range(1, 5)
- const result = yield* $(
- Stream.asyncInterrupt(() => Either.right(Stream.fromChunk(chunk))),
- Stream.runCollect
- )
- assert.deepStrictEqual(Array.from(result), Array.from(chunk))
- }))
-
- it.effect("asyncInterrupt - signals the end of the stream", () =>
+ it.effect("async - signals the end of the stream", () =>
Effect.gen(function*($) {
const result = yield* $(
- Stream.asyncInterrupt((emit) => {
+ Stream.async((emit) => {
emit.end()
- return Either.left(Effect.unit)
+ return Effect.unit
}),
Stream.runCollect
)
assert.isTrue(Chunk.isEmpty(result))
}))
- it.effect("asyncInterrupt - handles errors", () =>
+ it.effect("async - handles errors", () =>
Effect.gen(function*($) {
const error = new Cause.RuntimeException("boom")
const result = yield* $(
- Stream.asyncInterrupt((emit) => {
+ Stream.async((emit) => {
emit.fromEffect(Effect.fail(error))
- return Either.left(Effect.unit)
+ return Effect.unit
}),
Stream.runCollect,
Effect.exit
@@ -179,11 +73,11 @@ describe("Stream", () => {
assert.deepStrictEqual(result, Exit.fail(error))
}))
- it.effect("asyncInterrupt - handles defects", () =>
+ it.effect("async - handles defects", () =>
Effect.gen(function*($) {
const error = new Cause.RuntimeException("boom")
const result = yield* $(
- Stream.asyncInterrupt(() => {
+ Stream.async(() => {
throw error
}),
Stream.runCollect,
@@ -192,11 +86,11 @@ describe("Stream", () => {
assert.deepStrictEqual(result, Exit.die(error))
}))
- it.effect("asyncInterrupt - backpressure", () =>
+ it.effect("async - backpressure", () =>
Effect.gen(function*($) {
const refCount = yield* $(Ref.make(0))
const refDone = yield* $(Ref.make(false))
- const stream = Stream.asyncInterrupt>((emit) => {
+ const stream = Stream.async>((emit) => {
Promise.all(
// 1st consumed by sink, 2-6 – in queue, 7th – back pressured
[1, 2, 3, 4, 5, 6, 7].map((n) =>
@@ -215,7 +109,7 @@ describe("Stream", () => {
)
)
)
- return Either.left(Effect.unit)
+ return Effect.unit
}, 5)
const sink = pipe(Sink.take(1), Sink.zipRight(Sink.never))
const fiber = yield* $(stream, Stream.run(sink), Effect.fork)
@@ -225,51 +119,36 @@ describe("Stream", () => {
assert.isFalse(result)
}))
- it.effect("asyncOption - signals the end of the stream", () =>
- Effect.gen(function*($) {
- const result = yield* $(
- Stream.asyncOption((emit) => {
- emit(Effect.fail(Option.none()))
- return Option.none()
- }),
- Stream.runCollect
- )
- assert.isTrue(Chunk.isEmpty(result))
- }))
-
- it.effect("asyncOption - some", () =>
- Effect.gen(function*($) {
- const chunk = Chunk.range(1, 5)
- const result = yield* $(
- Stream.asyncOption(() => Option.some(Stream.fromChunk(chunk))),
- Stream.runCollect
- )
- assert.deepStrictEqual(Array.from(result), Array.from(chunk))
- }))
-
- it.effect("asyncOption - none", () =>
+ it.effect("asyncEffect - simple example", () =>
Effect.gen(function*($) {
const array = [1, 2, 3, 4, 5]
- const result = yield* $(
- Stream.asyncOption((emit) => {
+ const latch = yield* $(Deferred.make())
+ const fiber = yield* $(
+ Stream.asyncEffect((emit) => {
array.forEach((n) => {
emit(Effect.succeed(Chunk.of(n)))
})
- return Option.none()
+ return pipe(
+ Deferred.succeed(latch, void 0),
+ Effect.zipRight(Effect.unit)
+ )
}),
Stream.take(array.length),
- Stream.runCollect
+ Stream.runCollect,
+ Effect.fork
)
+ yield* $(Deferred.await(latch))
+ const result = yield* $(Fiber.join(fiber))
assert.deepStrictEqual(Array.from(result), array)
}))
- it.effect("asyncOption - handles errors", () =>
+ it.effect("asyncEffect - handles errors", () =>
Effect.gen(function*($) {
const error = new Cause.RuntimeException("boom")
const result = yield* $(
- Stream.asyncOption((emit) => {
+ Stream.asyncEffect((emit) => {
emit.fromEffect(Effect.fail(error))
- return Option.none()
+ return Effect.unit
}),
Stream.runCollect,
Effect.exit
@@ -277,11 +156,11 @@ describe("Stream", () => {
assert.deepStrictEqual(result, Exit.fail(error))
}))
- it.effect("asyncOption - handles defects", () =>
+ it.effect("asyncEffect - handles defects", () =>
Effect.gen(function*($) {
const error = new Cause.RuntimeException("boom")
const result = yield* $(
- Stream.asyncOption(() => {
+ Stream.asyncEffect(() => {
throw error
}),
Stream.runCollect,
@@ -290,11 +169,23 @@ describe("Stream", () => {
assert.deepStrictEqual(result, Exit.die(error))
}))
- it.effect("asyncOption - backpressure", () =>
+ it.effect("asyncEffect - signals the end of the stream", () =>
+ Effect.gen(function*($) {
+ const result = yield* $(
+ Stream.asyncEffect((emit) => {
+ emit(Effect.fail(Option.none()))
+ return Effect.unit
+ }),
+ Stream.runCollect
+ )
+ assert.isTrue(Chunk.isEmpty(result))
+ }))
+
+ it.effect("asyncEffect - backpressure", () =>
Effect.gen(function*($) {
const refCount = yield* $(Ref.make(0))
const refDone = yield* $(Ref.make(false))
- const stream = Stream.asyncOption>((emit) => {
+ const stream = Stream.asyncEffect>((emit) => {
Promise.all(
// 1st consumed by sink, 2-6 – in queue, 7th – back pressured
[1, 2, 3, 4, 5, 6, 7].map((n) =>
@@ -313,16 +204,114 @@ describe("Stream", () => {
)
)
)
- return Option.none()
+ return Effect.unit
}, 5)
const sink = pipe(Sink.take(1), Sink.zipRight(Sink.never))
const fiber = yield* $(stream, Stream.run(sink), Effect.fork)
yield* $(Ref.get(refCount), Effect.repeat({ while: (n) => n !== 7 }))
const result = yield* $(Ref.get(refDone))
- yield* $(Fiber.interrupt(fiber), Effect.exit)
+ yield* $(Fiber.interrupt(fiber))
assert.isFalse(result)
}))
+ // it.effect("asyncOption - signals the end of the stream", () =>
+ // Effect.gen(function*($) {
+ // const result = yield* $(
+ // Stream.asyncOption((emit) => {
+ // emit(Effect.fail(Option.none()))
+ // return Option.none()
+ // }),
+ // Stream.runCollect
+ // )
+ // assert.isTrue(Chunk.isEmpty(result))
+ // }))
+
+ // it.effect("asyncOption - some", () =>
+ // Effect.gen(function*($) {
+ // const chunk = Chunk.range(1, 5)
+ // const result = yield* $(
+ // Stream.asyncOption(() => Option.some(Stream.fromChunk(chunk))),
+ // Stream.runCollect
+ // )
+ // assert.deepStrictEqual(Array.from(result), Array.from(chunk))
+ // }))
+
+ // it.effect("asyncOption - none", () =>
+ // Effect.gen(function*($) {
+ // const array = [1, 2, 3, 4, 5]
+ // const result = yield* $(
+ // Stream.asyncOption((emit) => {
+ // array.forEach((n) => {
+ // emit(Effect.succeed(Chunk.of(n)))
+ // })
+ // return Option.none()
+ // }),
+ // Stream.take(array.length),
+ // Stream.runCollect
+ // )
+ // assert.deepStrictEqual(Array.from(result), array)
+ // }))
+
+ // it.effect("asyncOption - handles errors", () =>
+ // Effect.gen(function*($) {
+ // const error = new Cause.RuntimeException("boom")
+ // const result = yield* $(
+ // Stream.asyncOption((emit) => {
+ // emit.fromEffect(Effect.fail(error))
+ // return Option.none()
+ // }),
+ // Stream.runCollect,
+ // Effect.exit
+ // )
+ // assert.deepStrictEqual(result, Exit.fail(error))
+ // }))
+
+ // it.effect("asyncOption - handles defects", () =>
+ // Effect.gen(function*($) {
+ // const error = new Cause.RuntimeException("boom")
+ // const result = yield* $(
+ // Stream.asyncOption(() => {
+ // throw error
+ // }),
+ // Stream.runCollect,
+ // Effect.exit
+ // )
+ // assert.deepStrictEqual(result, Exit.die(error))
+ // }))
+
+ // it.effect("asyncOption - backpressure", () =>
+ // Effect.gen(function*($) {
+ // const refCount = yield* $(Ref.make(0))
+ // const refDone = yield* $(Ref.make(false))
+ // const stream = Stream.asyncOption>((emit) => {
+ // Promise.all(
+ // // 1st consumed by sink, 2-6 – in queue, 7th – back pressured
+ // [1, 2, 3, 4, 5, 6, 7].map((n) =>
+ // emit.fromEffectChunk(
+ // pipe(
+ // Ref.set(refCount, n),
+ // Effect.zipRight(Effect.succeed(Chunk.of(1)))
+ // )
+ // )
+ // )
+ // ).then(() =>
+ // emit.fromEffect(
+ // pipe(
+ // Ref.set(refDone, true),
+ // Effect.zipRight(Effect.fail(Option.none()))
+ // )
+ // )
+ // )
+ // return Option.none()
+ // }, 5)
+ // const sink = pipe(Sink.take(1), Sink.zipRight(Sink.never))
+ // const fiber = yield* $(stream, Stream.run(sink), Effect.fork)
+ // yield* $(Ref.get(refCount), Effect.repeat({ while: (n) => n !== 7 }))
+ // const result = yield* $(Ref.get(refDone))
+ // yield* $(Fiber.interrupt(fiber), Effect.exit)
+ // assert.isFalse(result)
+ // }))
+
it.effect("asyncScoped", () =>
Effect.gen(function*($) {
const array = [1, 2, 3, 4, 5]