Skip to content

Commit

Permalink
Consolidate Effect and Stream async methods (#2087)
Browse files Browse the repository at this point in the history
  • Loading branch information
IMax153 committed Feb 13, 2024
1 parent b881365 commit 2b6f15e
Show file tree
Hide file tree
Showing 14 changed files with 320 additions and 432 deletions.
23 changes: 23 additions & 0 deletions .changeset/cuddly-parrots-know.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
---
"effect": minor
---

Consolidate `Effect.asyncOption`, `Effect.asyncEither`, `Stream.asyncOption`, `Stream.asyncEither`, and `Stream.asyncInterrupt`

This PR removes `Effect.asyncOption` and `Effect.asyncEither` as their behavior can be entirely implemented with the new signature of `Effect.async`, which optionally returns a cleanup `Effect` from the registration callback.

```ts
declare const async: <A, E = never, R = never>(
register: (callback: (_: Effect<A, E, R>) => void, signal: AbortSignal) => void | Effect<void, never, R>,
blockingOn?: FiberId
) => Effect<A, E, R>
```
Additionally, this PR removes `Stream.asyncOption`, `Stream.asyncEither`, and `Stream.asyncInterrupt` as their behavior can be entirely implemented with the new signature of `Stream.async`, which can optionally return a cleanup `Effect` from the registration callback.
```ts
declare const async: <A, E = never, R = never>(
register: (emit: Emit<R, E, A, void>) => Effect<void, never, R> | void,
outputBuffer?: number
) => Stream<A, E, R>
```
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"private": true,
"type": "module",
"packageManager": "pnpm@8.12.1",
"packageManager": "pnpm@8.15.1",
"workspaces": [
"packages/*"
],
Expand Down
63 changes: 12 additions & 51 deletions packages/effect/src/Effect.ts
Original file line number Diff line number Diff line change
Expand Up @@ -976,17 +976,20 @@ export const validateFirst: {
// -------------------------------------------------------------------------------------

/**
* Imports an asynchronous side-effect into a pure `Effect` value.
* The callback function `Effect<A, E, R> => void` must be called at most once.
* Imports an asynchronous side-effect into a pure `Effect` value. The callback
* function `Effect<A, E, R> => void` **MUST** be called at most once.
*
* If an Effect is returned by the registration function, it will be executed
* if the fiber executing the effect is interrupted.
* The registration function can optionally return an Effect, which will be
* executed if the `Fiber` executing this Effect is interrupted.
*
* The registration function can also receive an `AbortSignal` if required for
* interruption.
*
* The `FiberId` of the fiber that may complete the async callback may be
* provided to allow for better diagnostics.
* The `FiberId` of the fiber that may complete the async callback may also be
* specified. This is called the "blocking fiber" because it suspends the fiber
* executing the `async` Effect (i.e. semantically blocks the fiber from making
* progress). Specifying this fiber id in cases where it is known will improve
* diagnostics, but not affect the behavior of the returned effect.
*
* @since 2.0.0
* @category constructors
Expand All @@ -1005,51 +1008,9 @@ export const async: <A, E = never, R = never>(
* @since 2.0.0
* @category constructors
*/
export const asyncEffect: <A, E, R, X, E2, R2>(
register: (callback: (_: Effect<A, E, R>) => void) => Effect<X, E2, R2>
) => Effect<A, E | E2, R | R2> = _runtime.asyncEffect

/**
* Imports an asynchronous effect into a pure `Effect` value, possibly returning
* the value synchronously.
*
* If the register function returns a value synchronously, then the callback
* function `Effect<A, E, R> => void` must not be called. Otherwise the callback
* function must be called at most once.
*
* The `FiberId` of the fiber that may complete the async callback may be
* provided to allow for better diagnostics.
*
* @since 2.0.0
* @category constructors
*/
export const asyncOption: <A, E = never, R = never>(
register: (callback: (_: Effect<A, E, R>) => void) => Option.Option<Effect<A, E, R>>,
blockingOn?: FiberId.FiberId
) => Effect<A, E, R> = effect.asyncOption

/**
* Imports an asynchronous side-effect into an effect. It has the option of
* returning the value synchronously, which is useful in cases where it cannot
* be determined if the effect is synchronous or asynchronous until the register
* is actually executed. It also has the option of returning a canceler,
* which will be used by the runtime to cancel the asynchronous effect if the fiber
* executing the effect is interrupted.
*
* If the register function returns a value synchronously, then the callback
* function `Effect<A, E, R> => void` must not be called. Otherwise the callback
* function must be called at most once.
*
* The `FiberId` of the fiber that may complete the async callback may be
* provided to allow for better diagnostics.
*
* @since 2.0.0
* @category constructors
*/
export const asyncEither: <A, E = never, R = never>(
register: (callback: (effect: Effect<A, E, R>) => void) => Either.Either<Effect<void, never, R>, Effect<A, E, R>>,
blockingOn?: FiberId.FiberId
) => Effect<A, E, R> = core.asyncEither
export const asyncEffect: <A, E, R, R3, E2, R2>(
register: (callback: (_: Effect<A, E, R>) => void) => Effect<Effect<void, never, R3> | void, E2, R2>
) => Effect<A, E | E2, R | R2 | R3> = _runtime.asyncEffect

/**
* @since 2.0.0
Expand Down
37 changes: 6 additions & 31 deletions packages/effect/src/Stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -256,15 +256,18 @@ export const as: {
} = internal.as

const _async: <A, E = never, R = never>(
register: (emit: Emit.Emit<R, E, A, void>) => void,
register: (emit: Emit.Emit<R, E, A, void>) => Effect.Effect<void, never, R> | void,
outputBuffer?: number
) => Stream<A, E, R> = internal._async

export {
/**
* Creates a stream from an asynchronous callback that can be called multiple
* times. The optionality of the error type `E` can be used to signal the end
* of the stream, by setting it to `None`.
* times. The optionality of the error type `E` in `Emit` can be used to
* signal the end of the stream by setting it to `None`.
*
* The registration function can optionally return an `Effect`, which will be
* executed if the `Fiber` executing this Effect is interrupted.
*
* @since 2.0.0
* @category constructors
Expand All @@ -286,34 +289,6 @@ export const asyncEffect: <A, E = never, R = never>(
outputBuffer?: number
) => Stream<A, E, R> = internal.asyncEffect

/**
* Creates a stream from an asynchronous callback that can be called multiple
* times. The registration of the callback returns either a canceler or
* synchronously returns a stream. The optionality of the error type `E` can
* be used to signal the end of the stream, by setting it to `None`.
*
* @since 2.0.0
* @category constructors
*/
export const asyncInterrupt: <A, E = never, R = never>(
register: (emit: Emit.Emit<R, E, A, void>) => Either.Either<Effect.Effect<unknown, never, R>, Stream<A, E, R>>,
outputBuffer?: number
) => Stream<A, E, R> = internal.asyncInterrupt

/**
* Creates a stream from an asynchronous callback that can be called multiple
* times. The registration of the callback can possibly return the stream
* synchronously. The optionality of the error type `E` can be used to signal
* the end of the stream, by setting it to `None`.
*
* @since 2.0.0
* @category constructors
*/
export const asyncOption: <A, E = never, R = never>(
register: (emit: Emit.Emit<R, E, A, void>) => Option.Option<Stream<A, E, R>>,
outputBuffer?: number
) => Stream<A, E, R> = internal.asyncOption

/**
* Creates a stream from an asynchronous callback that can be called multiple
* times. The registration of the callback itself returns an a scoped
Expand Down
7 changes: 3 additions & 4 deletions packages/effect/src/internal/clock.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ import type * as Clock from "../Clock.js"
import * as Context from "../Context.js"
import * as Duration from "../Duration.js"
import type * as Effect from "../Effect.js"
import * as Either from "../Either.js"
import { constFalse } from "../Function.js"
import * as core from "./core.js"

Expand Down Expand Up @@ -85,9 +84,9 @@ class ClockImpl implements Clock.Clock {
}

sleep(duration: Duration.Duration): Effect.Effect<void> {
return core.asyncEither<void>((cb) => {
const canceler = globalClockScheduler.unsafeSchedule(() => cb(core.unit), duration)
return Either.left(core.asUnit(core.sync(canceler)))
return core.async<void>((resume) => {
const canceler = globalClockScheduler.unsafeSchedule(() => resume(core.unit), duration)
return core.asUnit(core.sync(canceler))
})
}
}
Expand Down
23 changes: 1 addition & 22 deletions packages/effect/src/internal/core-effect.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,8 @@ import * as Clock from "../Clock.js"
import * as Context from "../Context.js"
import * as Duration from "../Duration.js"
import type * as Effect from "../Effect.js"
import * as Either from "../Either.js"
import type * as Fiber from "../Fiber.js"
import * as FiberId from "../FiberId.js"
import type * as FiberId from "../FiberId.js"
import type * as FiberRef from "../FiberRef.js"
import * as FiberRefs from "../FiberRefs.js"
import type * as FiberRefsPatch from "../FiberRefsPatch.js"
Expand Down Expand Up @@ -75,26 +74,6 @@ export const asSome = <A, E, R>(self: Effect.Effect<A, E, R>): Effect.Effect<Opt
export const asSomeError = <A, E, R>(self: Effect.Effect<A, E, R>): Effect.Effect<A, Option.Option<E>, R> =>
core.mapError(self, Option.some)

/* @internal */
export const asyncOption = <A, E = never, R = never>(
register: (callback: (_: Effect.Effect<A, E, R>) => void) => Option.Option<Effect.Effect<A, E, R>>,
blockingOn: FiberId.FiberId = FiberId.none
): Effect.Effect<A, E, R> =>
core.asyncEither(
(cb) => {
const option = register(cb)
switch (option._tag) {
case "None": {
return Either.left(core.unit)
}
case "Some": {
return Either.right(option.value)
}
}
},
blockingOn
)

/* @internal */
export const try_: {
<A, E>(options: {
Expand Down
28 changes: 6 additions & 22 deletions packages/effect/src/internal/core.ts
Original file line number Diff line number Diff line change
Expand Up @@ -480,22 +480,6 @@ export const async = <A, E = never, R = never>(
effect
})

/* @internal */
export const asyncEither = <A, E = never, R = never>(
register: (
resume: (effect: Effect.Effect<A, E, R>) => void
) => Either.Either<Effect.Effect<void, never, R>, Effect.Effect<A, E, R>>,
blockingOn: FiberId.FiberId = FiberId.none
): Effect.Effect<A, E, R> =>
async<A, E, R>((resume) => {
const result = register(resume)
if (Either.isRight(result)) {
resume(result.right)
} else {
return result.left
}
}, blockingOn)

/* @internal */
export const catchAllCause = dual<
<E, A2, E2, R2>(
Expand Down Expand Up @@ -1462,11 +1446,11 @@ export const zipWith: {
): Effect.Effect<B, E | E2, R | R2> => flatMap(self, (a) => map(that, (b) => f(a, b))))

/* @internal */
export const never: Effect.Effect<never> = asyncEither<never>(() => {
export const never: Effect.Effect<never> = async<never>(() => {
const interval = setInterval(() => {
//
}, 2 ** 31 - 1)
return Either.left(sync(() => clearInterval(interval)))
return sync(() => clearInterval(interval))
})

// -----------------------------------------------------------------------------
Expand Down Expand Up @@ -2844,18 +2828,18 @@ export const deferredMakeAs = <A, E = never>(fiberId: FiberId.FiberId): Effect.E

/* @internal */
export const deferredAwait = <A, E>(self: Deferred.Deferred<A, E>): Effect.Effect<A, E> =>
asyncEither<A, E>((k) => {
async<A, E>((resume) => {
const state = MutableRef.get(self.state)
switch (state._tag) {
case DeferredOpCodes.OP_STATE_DONE: {
return Either.right(state.effect)
return resume(state.effect)
}
case DeferredOpCodes.OP_STATE_PENDING: {
pipe(
self.state,
MutableRef.set(deferred.pending([k, ...state.joiners]))
MutableRef.set(deferred.pending([resume, ...state.joiners]))
)
return Either.left(deferredInterruptJoiner(self, k))
return deferredInterruptJoiner(self, resume)
}
}
}, self.blockingOn)
Expand Down
9 changes: 4 additions & 5 deletions packages/effect/src/internal/effect/circular.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ import type * as Cause from "../../Cause.js"
import type * as Deferred from "../../Deferred.js"
import * as Duration from "../../Duration.js"
import type * as Effect from "../../Effect.js"
import * as Either from "../../Either.js"
import * as Equal from "../../Equal.js"
import type { Equivalence } from "../../Equivalence.js"
import * as Exit from "../../Exit.js"
Expand Down Expand Up @@ -45,7 +44,7 @@ class Semaphore {
}

readonly take = (n: number): Effect.Effect<number> =>
core.asyncEither<number>((resume) => {
core.async<number>((resume) => {
if (this.free < n) {
const observer = () => {
if (this.free < n) {
Expand All @@ -56,12 +55,12 @@ class Semaphore {
resume(core.succeed(n))
}
this.waiters.add(observer)
return Either.left(core.sync(() => {
return core.sync(() => {
this.waiters.delete(observer)
}))
})
}
this.taken += n
return Either.right(core.succeed(n))
return resume(core.succeed(n))
})

readonly updateTaken = (f: (n: number) => number): Effect.Effect<number> =>
Expand Down
51 changes: 32 additions & 19 deletions packages/effect/src/internal/runtime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import * as core from "./core.js"
import * as executionStrategy from "./executionStrategy.js"
import * as FiberRuntime from "./fiberRuntime.js"
import * as fiberScope from "./fiberScope.js"
import { internalize } from "./internalize.js"
import * as OpCodes from "./opCodes/effect.js"
import * as runtimeFlags from "./runtimeFlags.js"
import * as _supervisor from "./supervisor.js"
Expand Down Expand Up @@ -409,22 +410,34 @@ export const unsafeRunSyncExitEffect = unsafeRunSyncExit(defaultRuntime)
// circular with Effect

/** @internal */
export const asyncEffect = <A, E, R, X, E2, R2>(
register: (callback: (_: Effect.Effect<A, E, R>) => void) => Effect.Effect<X, E2, R2>
): Effect.Effect<A, E | E2, R | R2> =>
core.flatMap(
core.deferredMake<A, E | E2>(),
(deferred) =>
core.flatMap(runtime<R | R2>(), (runtime) =>
core.uninterruptibleMask((restore) =>
core.zipRight(
FiberRuntime.fork(restore(
core.catchAllCause(
register((cb) => unsafeRunCallback(runtime)(core.intoDeferred(cb, deferred))),
(cause) => core.deferredFailCause(deferred, cause)
)
)),
restore(core.deferredAwait(deferred))
)
))
)
export const asyncEffect = <A, E, R, R3, E2, R2>(
register: (
callback: (_: Effect.Effect<A, E, R>) => void
) => Effect.Effect<Effect.Effect<void, never, R3> | void, E2, R2>
): Effect.Effect<A, E | E2, R | R2 | R3> =>
core.suspend(() => {
internalize(register)
let cleanup: Effect.Effect<void, never, R3> | void = undefined
return core.flatMap(
core.deferredMake<A, E | E2>(),
(deferred) =>
core.flatMap(runtime<R | R2 | R3>(), (runtime) =>
core.uninterruptibleMask((restore) =>
core.zipRight(
FiberRuntime.fork(restore(
core.matchCauseEffect(
register((cb) => unsafeRunCallback(runtime)(core.intoDeferred(cb, deferred))),
{
onFailure: (cause) => core.deferredFailCause(deferred, cause),
onSuccess: (cleanup_) => {
cleanup = cleanup_
return core.unit
}
}
)
)),
restore(core.onInterrupt(core.deferredAwait(deferred), () => cleanup ?? core.unit))
)
))
)
})
Loading

0 comments on commit 2b6f15e

Please sign in to comment.