Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Consolidate Effect and Stream async methods #2087

Merged
merged 6 commits into from
Feb 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions .changeset/cuddly-parrots-know.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
---
"effect": minor
---

Consolidate `Effect.asyncOption`, `Effect.asyncEither`, `Stream.asyncOption`, `Stream.asyncEither`, and `Stream.asyncInterrupt`

This PR removes `Effect.asyncOption` and `Effect.asyncEither` as their behavior can be entirely implemented with the new signature of `Effect.async`, which optionally returns a cleanup `Effect` from the registration callback.

```ts
declare const async: <A, E = never, R = never>(
register: (callback: (_: Effect<A, E, R>) => void, signal: AbortSignal) => void | Effect<void, never, R>,
blockingOn?: FiberId
) => Effect<A, E, R>
```

Additionally, this PR removes `Stream.asyncOption`, `Stream.asyncEither`, and `Stream.asyncInterrupt` as their behavior can be entirely implemented with the new signature of `Stream.async`, which can optionally return a cleanup `Effect` from the registration callback.

```ts
declare const async: <A, E = never, R = never>(
register: (emit: Emit<R, E, A, void>) => Effect<void, never, R> | void,
outputBuffer?: number
) => Stream<A, E, R>
```
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"private": true,
"type": "module",
"packageManager": "pnpm@8.12.1",
"packageManager": "pnpm@8.15.1",
"workspaces": [
"packages/*"
],
Expand Down
63 changes: 12 additions & 51 deletions packages/effect/src/Effect.ts
Original file line number Diff line number Diff line change
Expand Up @@ -976,17 +976,20 @@ export const validateFirst: {
// -------------------------------------------------------------------------------------

/**
* Imports an asynchronous side-effect into a pure `Effect` value.
* The callback function `Effect<A, E, R> => void` must be called at most once.
* Imports an asynchronous side-effect into a pure `Effect` value. The callback
* function `Effect<A, E, R> => void` **MUST** be called at most once.
*
* If an Effect is returned by the registration function, it will be executed
* if the fiber executing the effect is interrupted.
* The registration function can optionally return an Effect, which will be
* executed if the `Fiber` executing this Effect is interrupted.
*
* The registration function can also receive an `AbortSignal` if required for
* interruption.
*
* The `FiberId` of the fiber that may complete the async callback may be
* provided to allow for better diagnostics.
* The `FiberId` of the fiber that may complete the async callback may also be
* specified. This is called the "blocking fiber" because it suspends the fiber
* executing the `async` Effect (i.e. semantically blocks the fiber from making
* progress). Specifying this fiber id in cases where it is known will improve
* diagnostics, but not affect the behavior of the returned effect.
*
* @since 2.0.0
* @category constructors
Expand All @@ -1005,51 +1008,9 @@ export const async: <A, E = never, R = never>(
* @since 2.0.0
* @category constructors
*/
export const asyncEffect: <A, E, R, X, E2, R2>(
register: (callback: (_: Effect<A, E, R>) => void) => Effect<X, E2, R2>
) => Effect<A, E | E2, R | R2> = _runtime.asyncEffect

/**
* Imports an asynchronous effect into a pure `Effect` value, possibly returning
* the value synchronously.
*
* If the register function returns a value synchronously, then the callback
* function `Effect<A, E, R> => void` must not be called. Otherwise the callback
* function must be called at most once.
*
* The `FiberId` of the fiber that may complete the async callback may be
* provided to allow for better diagnostics.
*
* @since 2.0.0
* @category constructors
*/
export const asyncOption: <A, E = never, R = never>(
register: (callback: (_: Effect<A, E, R>) => void) => Option.Option<Effect<A, E, R>>,
blockingOn?: FiberId.FiberId
) => Effect<A, E, R> = effect.asyncOption

/**
* Imports an asynchronous side-effect into an effect. It has the option of
* returning the value synchronously, which is useful in cases where it cannot
* be determined if the effect is synchronous or asynchronous until the register
* is actually executed. It also has the option of returning a canceler,
* which will be used by the runtime to cancel the asynchronous effect if the fiber
* executing the effect is interrupted.
*
* If the register function returns a value synchronously, then the callback
* function `Effect<A, E, R> => void` must not be called. Otherwise the callback
* function must be called at most once.
*
* The `FiberId` of the fiber that may complete the async callback may be
* provided to allow for better diagnostics.
*
* @since 2.0.0
* @category constructors
*/
export const asyncEither: <A, E = never, R = never>(
register: (callback: (effect: Effect<A, E, R>) => void) => Either.Either<Effect<void, never, R>, Effect<A, E, R>>,
blockingOn?: FiberId.FiberId
) => Effect<A, E, R> = core.asyncEither
export const asyncEffect: <A, E, R, R3, E2, R2>(
register: (callback: (_: Effect<A, E, R>) => void) => Effect<Effect<void, never, R3> | void, E2, R2>
) => Effect<A, E | E2, R | R2 | R3> = _runtime.asyncEffect

/**
* @since 2.0.0
Expand Down
37 changes: 6 additions & 31 deletions packages/effect/src/Stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -256,15 +256,18 @@ export const as: {
} = internal.as

const _async: <A, E = never, R = never>(
register: (emit: Emit.Emit<R, E, A, void>) => void,
register: (emit: Emit.Emit<R, E, A, void>) => Effect.Effect<void, never, R> | void,
outputBuffer?: number
) => Stream<A, E, R> = internal._async

export {
/**
* Creates a stream from an asynchronous callback that can be called multiple
* times. The optionality of the error type `E` can be used to signal the end
* of the stream, by setting it to `None`.
* times. The optionality of the error type `E` in `Emit` can be used to
* signal the end of the stream by setting it to `None`.
*
* The registration function can optionally return an `Effect`, which will be
* executed if the `Fiber` executing this Effect is interrupted.
*
* @since 2.0.0
* @category constructors
Expand All @@ -286,34 +289,6 @@ export const asyncEffect: <A, E = never, R = never>(
outputBuffer?: number
) => Stream<A, E, R> = internal.asyncEffect

/**
* Creates a stream from an asynchronous callback that can be called multiple
* times. The registration of the callback returns either a canceler or
* synchronously returns a stream. The optionality of the error type `E` can
* be used to signal the end of the stream, by setting it to `None`.
*
* @since 2.0.0
* @category constructors
*/
export const asyncInterrupt: <A, E = never, R = never>(
register: (emit: Emit.Emit<R, E, A, void>) => Either.Either<Effect.Effect<unknown, never, R>, Stream<A, E, R>>,
outputBuffer?: number
) => Stream<A, E, R> = internal.asyncInterrupt

/**
* Creates a stream from an asynchronous callback that can be called multiple
* times. The registration of the callback can possibly return the stream
* synchronously. The optionality of the error type `E` can be used to signal
* the end of the stream, by setting it to `None`.
*
* @since 2.0.0
* @category constructors
*/
export const asyncOption: <A, E = never, R = never>(
register: (emit: Emit.Emit<R, E, A, void>) => Option.Option<Stream<A, E, R>>,
outputBuffer?: number
) => Stream<A, E, R> = internal.asyncOption

/**
* Creates a stream from an asynchronous callback that can be called multiple
* times. The registration of the callback itself returns an a scoped
Expand Down
7 changes: 3 additions & 4 deletions packages/effect/src/internal/clock.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ import type * as Clock from "../Clock.js"
import * as Context from "../Context.js"
import * as Duration from "../Duration.js"
import type * as Effect from "../Effect.js"
import * as Either from "../Either.js"
import { constFalse } from "../Function.js"
import * as core from "./core.js"
import * as timeout from "./timeout.js"
Expand Down Expand Up @@ -86,9 +85,9 @@ class ClockImpl implements Clock.Clock {
}

sleep(duration: Duration.Duration): Effect.Effect<void> {
return core.asyncEither<void>((cb) => {
const canceler = globalClockScheduler.unsafeSchedule(() => cb(core.unit), duration)
return Either.left(core.asUnit(core.sync(canceler)))
return core.async<void>((resume) => {
const canceler = globalClockScheduler.unsafeSchedule(() => resume(core.unit), duration)
return core.asUnit(core.sync(canceler))
})
}
}
Expand Down
23 changes: 1 addition & 22 deletions packages/effect/src/internal/core-effect.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,8 @@ import * as Clock from "../Clock.js"
import * as Context from "../Context.js"
import * as Duration from "../Duration.js"
import type * as Effect from "../Effect.js"
import * as Either from "../Either.js"
import type * as Fiber from "../Fiber.js"
import * as FiberId from "../FiberId.js"
import type * as FiberId from "../FiberId.js"
import type * as FiberRef from "../FiberRef.js"
import * as FiberRefs from "../FiberRefs.js"
import type * as FiberRefsPatch from "../FiberRefsPatch.js"
Expand Down Expand Up @@ -75,26 +74,6 @@ export const asSome = <A, E, R>(self: Effect.Effect<A, E, R>): Effect.Effect<Opt
export const asSomeError = <A, E, R>(self: Effect.Effect<A, E, R>): Effect.Effect<A, Option.Option<E>, R> =>
core.mapError(self, Option.some)

/* @internal */
export const asyncOption = <A, E = never, R = never>(
register: (callback: (_: Effect.Effect<A, E, R>) => void) => Option.Option<Effect.Effect<A, E, R>>,
blockingOn: FiberId.FiberId = FiberId.none
): Effect.Effect<A, E, R> =>
core.asyncEither(
(cb) => {
const option = register(cb)
switch (option._tag) {
case "None": {
return Either.left(core.unit)
}
case "Some": {
return Either.right(option.value)
}
}
},
blockingOn
)

/* @internal */
export const try_: {
<A, E>(options: {
Expand Down
28 changes: 6 additions & 22 deletions packages/effect/src/internal/core.ts
Original file line number Diff line number Diff line change
Expand Up @@ -480,22 +480,6 @@ export const async = <A, E = never, R = never>(
effect
})

/* @internal */
export const asyncEither = <A, E = never, R = never>(
register: (
resume: (effect: Effect.Effect<A, E, R>) => void
) => Either.Either<Effect.Effect<void, never, R>, Effect.Effect<A, E, R>>,
blockingOn: FiberId.FiberId = FiberId.none
): Effect.Effect<A, E, R> =>
async<A, E, R>((resume) => {
const result = register(resume)
if (Either.isRight(result)) {
resume(result.right)
} else {
return result.left
}
}, blockingOn)

/* @internal */
export const catchAllCause = dual<
<E, A2, E2, R2>(
Expand Down Expand Up @@ -1462,11 +1446,11 @@ export const zipWith: {
): Effect.Effect<B, E | E2, R | R2> => flatMap(self, (a) => map(that, (b) => f(a, b))))

/* @internal */
export const never: Effect.Effect<never> = asyncEither<never>(() => {
export const never: Effect.Effect<never> = async<never>(() => {
const interval = setInterval(() => {
//
}, 2 ** 31 - 1)
return Either.left(sync(() => clearInterval(interval)))
return sync(() => clearInterval(interval))
})

// -----------------------------------------------------------------------------
Expand Down Expand Up @@ -2844,18 +2828,18 @@ export const deferredMakeAs = <A, E = never>(fiberId: FiberId.FiberId): Effect.E

/* @internal */
export const deferredAwait = <A, E>(self: Deferred.Deferred<A, E>): Effect.Effect<A, E> =>
asyncEither<A, E>((k) => {
async<A, E>((resume) => {
const state = MutableRef.get(self.state)
switch (state._tag) {
case DeferredOpCodes.OP_STATE_DONE: {
return Either.right(state.effect)
return resume(state.effect)
}
case DeferredOpCodes.OP_STATE_PENDING: {
pipe(
self.state,
MutableRef.set(deferred.pending([k, ...state.joiners]))
MutableRef.set(deferred.pending([resume, ...state.joiners]))
)
return Either.left(deferredInterruptJoiner(self, k))
return deferredInterruptJoiner(self, resume)
}
}
}, self.blockingOn)
Expand Down
9 changes: 4 additions & 5 deletions packages/effect/src/internal/effect/circular.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ import type * as Cause from "../../Cause.js"
import type * as Deferred from "../../Deferred.js"
import * as Duration from "../../Duration.js"
import type * as Effect from "../../Effect.js"
import * as Either from "../../Either.js"
import * as Equal from "../../Equal.js"
import type { Equivalence } from "../../Equivalence.js"
import * as Exit from "../../Exit.js"
Expand Down Expand Up @@ -45,7 +44,7 @@ class Semaphore {
}

readonly take = (n: number): Effect.Effect<number> =>
core.asyncEither<number>((resume) => {
core.async<number>((resume) => {
if (this.free < n) {
const observer = () => {
if (this.free < n) {
Expand All @@ -57,12 +56,12 @@ class Semaphore {
return true
}
this.waiters.add(observer)
return Either.left(core.sync(() => {
return core.sync(() => {
this.waiters.delete(observer)
}))
})
}
this.taken += n
return Either.right(core.succeed(n))
return resume(core.succeed(n))
})

readonly updateTaken = (f: (n: number) => number): Effect.Effect<void> =>
Expand Down
51 changes: 32 additions & 19 deletions packages/effect/src/internal/runtime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import * as core from "./core.js"
import * as executionStrategy from "./executionStrategy.js"
import * as FiberRuntime from "./fiberRuntime.js"
import * as fiberScope from "./fiberScope.js"
import { internalize } from "./internalize.js"
import * as OpCodes from "./opCodes/effect.js"
import * as runtimeFlags from "./runtimeFlags.js"
import * as _supervisor from "./supervisor.js"
Expand Down Expand Up @@ -409,22 +410,34 @@ export const unsafeRunSyncExitEffect = unsafeRunSyncExit(defaultRuntime)
// circular with Effect

/** @internal */
export const asyncEffect = <A, E, R, X, E2, R2>(
register: (callback: (_: Effect.Effect<A, E, R>) => void) => Effect.Effect<X, E2, R2>
): Effect.Effect<A, E | E2, R | R2> =>
core.flatMap(
core.deferredMake<A, E | E2>(),
(deferred) =>
core.flatMap(runtime<R | R2>(), (runtime) =>
core.uninterruptibleMask((restore) =>
core.zipRight(
FiberRuntime.fork(restore(
core.catchAllCause(
register((cb) => unsafeRunCallback(runtime)(core.intoDeferred(cb, deferred))),
(cause) => core.deferredFailCause(deferred, cause)
)
)),
restore(core.deferredAwait(deferred))
)
))
)
export const asyncEffect = <A, E, R, R3, E2, R2>(
register: (
callback: (_: Effect.Effect<A, E, R>) => void
) => Effect.Effect<Effect.Effect<void, never, R3> | void, E2, R2>
): Effect.Effect<A, E | E2, R | R2 | R3> =>
core.suspend(() => {
internalize(register)
let cleanup: Effect.Effect<void, never, R3> | void = undefined
return core.flatMap(
core.deferredMake<A, E | E2>(),
(deferred) =>
core.flatMap(runtime<R | R2 | R3>(), (runtime) =>
core.uninterruptibleMask((restore) =>
core.zipRight(
FiberRuntime.fork(restore(
core.matchCauseEffect(
register((cb) => unsafeRunCallback(runtime)(core.intoDeferred(cb, deferred))),
{
onFailure: (cause) => core.deferredFailCause(deferred, cause),
onSuccess: (cleanup_) => {
cleanup = cleanup_
return core.unit
}
}
)
)),
restore(core.onInterrupt(core.deferredAwait(deferred), () => cleanup ?? core.unit))
)
))
)
})
Loading
Loading