Skip to content

Commit

Permalink
change Runtime.RunCallbackOptions type parameters order from `RunCa… (
Browse files Browse the repository at this point in the history
  • Loading branch information
gcanti authored and tim-smart committed Feb 7, 2024
1 parent 56762cf commit 2c52048
Show file tree
Hide file tree
Showing 16 changed files with 80 additions and 66 deletions.
5 changes: 5 additions & 0 deletions .changeset/afraid-pumpkins-attack.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"effect": minor
---

change `Runtime.AsyncFiberException` type parameters order from `AsyncFiberException<E, A>` to `AsyncFiberException<A, E = never>`
5 changes: 5 additions & 0 deletions .changeset/brave-days-invite.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"effect": minor
---

change `Runtime.Cancel` type parameters order from `Cancel<E, A>` to `Cancel<A, E = never>`
5 changes: 5 additions & 0 deletions .changeset/fuzzy-lamps-live.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"effect": minor
---

change `Runtime.RunCallbackOptions` type parameters order from `RunCallbackOptions<E, A>` to `RunCallbackOptions<A, E = never>`
4 changes: 2 additions & 2 deletions packages/effect/src/Effect.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4714,8 +4714,8 @@ export const runFork: <A, E>(
*/
export const runCallback: <A, E>(
effect: Effect<A, E>,
options?: Runtime.RunCallbackOptions<E, A> | undefined
) => Runtime.Cancel<E, A> = _runtime.unsafeRunEffect
options?: Runtime.RunCallbackOptions<A, E> | undefined
) => Runtime.Cancel<A, E> = _runtime.unsafeRunEffect

/**
* Runs an `Effect` workflow, returning a `Promise` which resolves with the
Expand Down
14 changes: 7 additions & 7 deletions packages/effect/src/Runtime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import type { Scope } from "./Scope.js"
* @since 2.0.0
* @category models
*/
export interface AsyncFiberException<out E, out A> {
export interface AsyncFiberException<out A, out E = never> {
readonly _tag: "AsyncFiberException"
readonly fiber: Fiber.RuntimeFiber<A, E>
}
Expand All @@ -28,8 +28,8 @@ export interface AsyncFiberException<out E, out A> {
* @since 2.0.0
* @category models
*/
export interface Cancel<out E, out A> {
(fiberId?: FiberId.FiberId, options?: RunCallbackOptions<E, A> | undefined): void
export interface Cancel<out A, out E = never> {
(fiberId?: FiberId.FiberId, options?: RunCallbackOptions<A, E> | undefined): void
}

/**
Expand Down Expand Up @@ -100,7 +100,7 @@ export const runSync: <R>(runtime: Runtime<R>) => <A, E>(effect: Effect.Effect<A
* @since 2.0.0
* @category models
*/
export interface RunCallbackOptions<E, A> extends RunForkOptions {
export interface RunCallbackOptions<in A, in E = never> extends RunForkOptions {
readonly onExit?: ((exit: Exit.Exit<A, E>) => void) | undefined
}

Expand All @@ -116,10 +116,10 @@ export interface RunCallbackOptions<E, A> extends RunForkOptions {
*/
export const runCallback: <R>(
runtime: Runtime<R>
) => <E, A>(
) => <A, E>(
effect: Effect.Effect<A, E, R>,
options?: RunCallbackOptions<E, A> | undefined
) => (fiberId?: FiberId.FiberId | undefined, options?: RunCallbackOptions<E, A> | undefined) => void =
options?: RunCallbackOptions<A, E> | undefined
) => (fiberId?: FiberId.FiberId | undefined, options?: RunCallbackOptions<A, E> | undefined) => void =
internal.unsafeRunCallback

/**
Expand Down
2 changes: 1 addition & 1 deletion packages/effect/src/Stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3873,7 +3873,7 @@ export const toQueueOfElements: {
* @since 2.0.0
* @category destructors
*/
export const toReadableStream: <E, A>(source: Stream<A, E>) => ReadableStream<A> = internal.toReadableStream
export const toReadableStream: <A, E>(source: Stream<A, E>) => ReadableStream<A> = internal.toReadableStream

/**
* Applies the transducer to the stream and emits its outputs.
Expand Down
12 changes: 6 additions & 6 deletions packages/effect/src/internal/keyedPool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ class KeyedPoolImpl<in K, in out A, out E = never> implements KeyedPool.KeyedPoo
}
}

type MapValue<E, A> = Complete<A, E> | Pending<A, E>
type MapValue<A, E> = Complete<A, E> | Pending<A, E>

class Complete<in out A, out E> implements Equal.Equal {
readonly _tag = "Complete"
Expand Down Expand Up @@ -101,22 +101,22 @@ const makeImpl = <K, A, E, R>(
fiberRuntime.all([
core.context<R>(),
core.fiberId,
core.sync(() => MutableRef.make(HashMap.empty<K, MapValue<E, A>>())),
core.sync(() => MutableRef.make(HashMap.empty<K, MapValue<A, E>>())),
fiberRuntime.scopeMake()
]),
core.map(([context, fiberId, map, scope]) => {
const getOrCreatePool = (key: K): Effect.Effect<Pool.Pool<A, E>> =>
core.suspend(() => {
let value: MapValue<E, A> | undefined = Option.getOrUndefined(HashMap.get(MutableRef.get(map), key))
let value: MapValue<A, E> | undefined = Option.getOrUndefined(HashMap.get(MutableRef.get(map), key))
if (value === undefined) {
return core.uninterruptibleMask((restore) => {
const deferred = core.deferredUnsafeMake<Pool.Pool<A, E>>(fiberId)
value = new Pending(deferred)
let previous: MapValue<E, A> | undefined = undefined
let previous: MapValue<A, E> | undefined = undefined
if (HashMap.has(MutableRef.get(map), key)) {
previous = Option.getOrUndefined(HashMap.get(MutableRef.get(map), key))
} else {
MutableRef.update(map, HashMap.set(key, value as MapValue<E, A>))
MutableRef.update(map, HashMap.set(key, value as MapValue<A, E>))
}
if (previous === undefined) {
return pipe(
Expand All @@ -143,7 +143,7 @@ const makeImpl = <K, A, E, R>(
)
},
onSuccess: (pool) => {
MutableRef.update(map, HashMap.set(key, new Complete(pool) as MapValue<E, A>))
MutableRef.update(map, HashMap.set(key, new Complete(pool) as MapValue<A, E>))
return core.as(
core.deferredSucceed(deferred, pool),
pool
Expand Down
26 changes: 13 additions & 13 deletions packages/effect/src/internal/pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ interface PoolState {
readonly free: number
}

interface Attempted<E, A> {
interface Attempted<A, E> {
readonly result: Exit.Exit<A, E>
readonly finalizer: Effect.Effect<unknown>
}
Expand Down Expand Up @@ -141,7 +141,7 @@ class PoolImpl<in out A, in out E> implements Pool.Pool<A, E> {
readonly max: number,
readonly isShuttingDown: Ref.Ref<boolean>,
readonly state: Ref.Ref<PoolState>,
readonly items: Queue.Queue<Attempted<E, A>>,
readonly items: Queue.Queue<Attempted<A, E>>,
readonly invalidated: Ref.Ref<HashSet.HashSet<A>>,
readonly track: (exit: Exit.Exit<A, E>) => Effect.Effect<unknown>
) {}
Expand Down Expand Up @@ -178,7 +178,7 @@ class PoolImpl<in out A, in out E> implements Pool.Pool<A, E> {
get get(): Effect.Effect<A, E, Scope.Scope> {
const acquire = (
restore: <AX, EX, RX>(effect: Effect.Effect<AX, EX, RX>) => Effect.Effect<AX, EX, RX>
): Effect.Effect<Attempted<E, A>> =>
): Effect.Effect<Attempted<A, E>> =>
core.flatMap(ref.get(this.isShuttingDown), (down) =>
down
? core.interrupt
Expand Down Expand Up @@ -214,7 +214,7 @@ class PoolImpl<in out A, in out E> implements Pool.Pool<A, E> {
return [core.interrupt, state] as const
})))

const release = (attempted: Attempted<E, A>): Effect.Effect<unknown> =>
const release = (attempted: Attempted<A, E>): Effect.Effect<unknown> =>
core.exitMatch(attempted.result, {
onFailure: () =>
core.flatten(ref.modify(this.state, (state) => {
Expand Down Expand Up @@ -266,7 +266,7 @@ const allocate = <A, E>(
core.exit(restore(fiberRuntime.scopeExtend(self.creator, scope))),
(exit) =>
core.flatMap(
core.succeed<Attempted<E, A>>({
core.succeed<Attempted<A, E>>({
result: exit as Exit.Exit<A, E>,
finalizer: core.scopeClose(scope, core.exitSucceed(void 0))
}),
Expand All @@ -292,7 +292,7 @@ const excess = <A, E>(self: PoolImpl<A, E>): Effect.Effect<number> =>

const finalizeInvalid = <A, E>(
self: PoolImpl<A, E>,
attempted: Attempted<E, A>
attempted: Attempted<A, E>
): Effect.Effect<unknown> =>
pipe(
forEach(attempted, (a) => ref.update(self.invalidated, HashSet.remove(a))),
Expand Down Expand Up @@ -337,7 +337,7 @@ const getAndShutdown = <A, E>(self: PoolImpl<A, E>): Effect.Effect<void> =>
/**
* Begins pre-allocating pool entries based on minimum pool size.
*/
const initialize = <E, A>(self: PoolImpl<A, E>): Effect.Effect<void> =>
const initialize = <A, E>(self: PoolImpl<A, E>): Effect.Effect<void> =>
fiberRuntime.replicateEffect(
core.uninterruptibleMask((restore) =>
core.flatten(ref.modify(self.state, (state) => {
Expand All @@ -357,7 +357,7 @@ const initialize = <E, A>(self: PoolImpl<A, E>): Effect.Effect<void> =>
/**
* Shrinks the pool down, but never to less than the minimum size.
*/
const shrink = <E, A>(self: PoolImpl<A, E>): Effect.Effect<void> =>
const shrink = <A, E>(self: PoolImpl<A, E>): Effect.Effect<void> =>
core.uninterruptible(
core.flatten(ref.modify(self.state, (state) => {
if (state.size > self.min && state.free > 0) {
Expand All @@ -379,24 +379,24 @@ const shrink = <E, A>(self: PoolImpl<A, E>): Effect.Effect<void> =>
}))
)

const shutdown = <E, A>(self: PoolImpl<A, E>): Effect.Effect<void> =>
const shutdown = <A, E>(self: PoolImpl<A, E>): Effect.Effect<void> =>
core.flatten(ref.modify(self.isShuttingDown, (down) =>
down
? [queue.awaitShutdown(self.items), true] as const
: [core.zipRight(getAndShutdown(self), queue.awaitShutdown(self.items)), true]))

const isFailure = <E, A>(self: Attempted<E, A>): boolean => core.exitIsFailure(self.result)
const isFailure = <A, E>(self: Attempted<A, E>): boolean => core.exitIsFailure(self.result)

const forEach = <E, A, E2, R>(
self: Attempted<E, A>,
self: Attempted<A, E>,
f: (a: A) => Effect.Effect<unknown, E2, R>
): Effect.Effect<unknown, E2, R> =>
core.exitMatch(self.result, {
onFailure: () => core.unit,
onSuccess: f
})

const toEffect = <E, A>(self: Attempted<E, A>): Effect.Effect<A, E> => self.result
const toEffect = <A, E>(self: Attempted<A, E>): Effect.Effect<A, E> => self.result

/**
* A more powerful variant of `make` that allows specifying a `Strategy` that
Expand All @@ -417,7 +417,7 @@ const makeWith = <A, E, R, S, R2>(
core.context<R>(),
ref.make(false),
ref.make<PoolState>({ size: 0, free: 0 }),
queue.bounded<Attempted<E, A>>(options.max),
queue.bounded<Attempted<A, E>>(options.max),
ref.make(HashSet.empty<A>()),
options.strategy.initial()
]),
Expand Down
8 changes: 4 additions & 4 deletions packages/effect/src/internal/runtime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -97,10 +97,10 @@ export const unsafeFork = <R>(runtime: Runtime.Runtime<R>) =>

/** @internal */
export const unsafeRunCallback = <R>(runtime: Runtime.Runtime<R>) =>
<E, A>(
<A, E>(
effect: Effect.Effect<A, E, R>,
options: Runtime.RunCallbackOptions<E, A> = {}
): (fiberId?: FiberId.FiberId, options?: Runtime.RunCallbackOptions<E, A> | undefined) => void => {
options: Runtime.RunCallbackOptions<A, E> = {}
): (fiberId?: FiberId.FiberId, options?: Runtime.RunCallbackOptions<A, E> | undefined) => void => {
const fiberRuntime = unsafeFork(runtime)(effect, options)

if (options.onExit) {
Expand Down Expand Up @@ -131,7 +131,7 @@ export const unsafeRunSync = <R>(runtime: Runtime.Runtime<R>) => <A, E>(effect:
}
}

const asyncFiberException = <A, E>(fiber: Fiber.RuntimeFiber<A, E>): Runtime.AsyncFiberException<E, A> => {
const asyncFiberException = <A, E>(fiber: Fiber.RuntimeFiber<A, E>): Runtime.AsyncFiberException<A, E> => {
const limit = Error.stackTraceLimit
Error.stackTraceLimit = 0
const error = (new Error()) as any
Expand Down
10 changes: 5 additions & 5 deletions packages/effect/src/internal/sink.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1825,7 +1825,7 @@ export const splitWhere = dual<
core.fromEffect(Ref.make(Chunk.empty<In>())),
core.flatMap((ref) =>
pipe(
splitWhereSplitter<E, In>(false, ref, f),
splitWhereSplitter<In, E>(false, ref, f),
channel.pipeToOrFail(toChannel(self)),
core.collectElements,
core.flatMap(([leftovers, z]) =>
Expand All @@ -1846,22 +1846,22 @@ export const splitWhere = dual<
})

/** @internal */
const splitWhereSplitter = <E, A>(
const splitWhereSplitter = <A, E>(
written: boolean,
leftovers: Ref.Ref<Chunk.Chunk<A>>,
f: Predicate<A>
): Channel.Channel<Chunk.Chunk<A>, Chunk.Chunk<A>, E, never, unknown, unknown> =>
core.readWithCause({
onInput: (input) => {
if (Chunk.isEmpty(input)) {
return splitWhereSplitter<E, A>(written, leftovers, f)
return splitWhereSplitter(written, leftovers, f)
}
if (written) {
const index = indexWhere(input, f)
if (index === -1) {
return channel.zipRight(
core.write(input),
splitWhereSplitter<E, A>(true, leftovers, f)
splitWhereSplitter<A, E>(true, leftovers, f)
)
}
const [left, right] = Chunk.splitAt(input, index)
Expand All @@ -1874,7 +1874,7 @@ const splitWhereSplitter = <E, A>(
if (index === -1) {
return channel.zipRight(
core.write(input),
splitWhereSplitter<E, A>(true, leftovers, f)
splitWhereSplitter<A, E>(true, leftovers, f)
)
}
const [left, right] = pipe(input, Chunk.splitAt(Math.max(index, 1)))
Expand Down
12 changes: 6 additions & 6 deletions packages/effect/src/internal/stm/core.ts
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ export const unsafeAtomically = <A, E, R>(
}
case TryCommitOpCodes.OP_SUSPEND: {
const txnId = TxnId.make()
const state: { value: STMState.STMState<E, A> } = { value: STMState.running }
const state: { value: STMState.STMState<A, E> } = { value: STMState.running }
const effect = Effect.async(
(k: (effect: Effect.Effect<A, E, R>) => unknown): void =>
tryCommitAsync(fiberId, self, txnId, state, env, scheduler, priority, k)
Expand Down Expand Up @@ -235,11 +235,11 @@ export const unsafeAtomically = <A, E, R>(
const tryCommit = <A, E, R>(
fiberId: FiberId.FiberId,
stm: STM.STM<A, E, R>,
state: { value: STMState.STMState<E, A> },
state: { value: STMState.STMState<A, E> },
env: Context.Context<R>,
scheduler: Scheduler.Scheduler,
priority: number
): TryCommit.TryCommit<E, A> => {
): TryCommit.TryCommit<A, E> => {
const journal: Journal.Journal = new Map()
const tExit = new STMDriver(stm, journal, fiberId, env).run()
const analysis = Journal.analyzeJournal(journal)
Expand Down Expand Up @@ -300,7 +300,7 @@ const tryCommitSync = <A, E, R>(
env: Context.Context<R>,
scheduler: Scheduler.Scheduler,
priority: number
): TryCommit.TryCommit<E, A> => {
): TryCommit.TryCommit<A, E> => {
const journal: Journal.Journal = new Map()
const tExit = new STMDriver(stm, journal, fiberId, env).run()
const analysis = Journal.analyzeJournal(journal)
Expand Down Expand Up @@ -355,7 +355,7 @@ const tryCommitAsync = <A, E, R>(
fiberId: FiberId.FiberId,
self: STM.STM<A, E, R>,
txnId: TxnId.TxnId,
state: { value: STMState.STMState<E, A> },
state: { value: STMState.STMState<A, E> },
context: Context.Context<R>,
scheduler: Scheduler.Scheduler,
priority: number,
Expand Down Expand Up @@ -386,7 +386,7 @@ const completeTodos = <A, E>(
journal: Journal.Journal,
scheduler: Scheduler.Scheduler,
priority: number
): TryCommit.TryCommit<E, A> => {
): TryCommit.TryCommit<A, E> => {
const todos = Journal.collectTodos(journal)
if (todos.size > 0) {
scheduler.scheduleTask(() => Journal.execTodos(todos), priority)
Expand Down
2 changes: 1 addition & 1 deletion packages/effect/src/internal/stm/stm.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ export const acquireUseRelease = dual<
release: (resource: A) => STM.STM<A3, E3, R3>
): Effect.Effect<A2, E | E2 | E3, R | R2 | R3> =>
Effect.uninterruptibleMask((restore) => {
let state: STMState.STMState<E, A> = STMState.running
let state: STMState.STMState<A, E> = STMState.running
return pipe(
restore(
core.unsafeAtomically(
Expand Down
Loading

0 comments on commit 2c52048

Please sign in to comment.