Skip to content

Commit

Permalink
add propagateInterruption option to Fiber{Handle,Set,Map} (#3407)
Browse files Browse the repository at this point in the history
  • Loading branch information
tim-smart committed Aug 14, 2024
1 parent f8d95a6 commit 8da85e5
Show file tree
Hide file tree
Showing 7 changed files with 248 additions and 53 deletions.
7 changes: 7 additions & 0 deletions .changeset/short-garlics-vanish.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
"effect": minor
---

add `propagateInterruption` option to Fiber{Handle,Set,Map}

This option will send any external interrupts to the .join result.
56 changes: 46 additions & 10 deletions packages/effect/src/FiberHandle.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ import * as Exit from "./Exit.js"
import * as Fiber from "./Fiber.js"
import * as FiberId from "./FiberId.js"
import * as FiberRef from "./FiberRef.js"
import { dual } from "./Function.js"
import { constFalse, dual } from "./Function.js"
import * as HashSet from "./HashSet.js"
import * as Inspectable from "./Inspectable.js"
import type { FiberRuntime } from "./internal/fiberRuntime.js"
import * as Option from "./Option.js"
Expand Down Expand Up @@ -142,6 +143,17 @@ export const makeRuntime = <R, E = unknown, A = unknown>(): Effect.Effect<
(self) => runtime(self)<R>()
)

const internalFiberIdId = -1
const internalFiberId = FiberId.make(internalFiberIdId, 0)
const isInternalInterruption = Cause.reduceWithContext(undefined, {
emptyCase: constFalse,
failCase: constFalse,
dieCase: constFalse,
interruptCase: (_, fiberId) => HashSet.has(FiberId.ids(fiberId), internalFiberIdId),
sequentialCase: (_, left, right) => left || right,
parallelCase: (_, left, right) => left || right
})

/**
* Set the fiber in a FiberHandle. When the fiber completes, it will be removed from the FiberHandle.
* If a fiber is already running, it will be interrupted unless `options.onlyIfMissing` is set.
Expand All @@ -155,6 +167,7 @@ export const unsafeSet: {
options?: {
readonly interruptAs?: FiberId.FiberId | undefined
readonly onlyIfMissing?: boolean | undefined
readonly propagateInterruption?: boolean | undefined
}
): (self: FiberHandle<A, E>) => void
<A, E, XE extends E, XA extends A>(
Expand All @@ -163,6 +176,7 @@ export const unsafeSet: {
options?: {
readonly interruptAs?: FiberId.FiberId | undefined
readonly onlyIfMissing?: boolean | undefined
readonly propagateInterruption?: boolean | undefined
}
): void
} = dual((args) => isFiberHandle(args[0]), <A, E, XE extends E, XA extends A>(
Expand All @@ -171,19 +185,20 @@ export const unsafeSet: {
options?: {
readonly interruptAs?: FiberId.FiberId | undefined
readonly onlyIfMissing?: boolean | undefined
readonly propagateInterruption?: boolean | undefined
}
): void => {
if (self.state._tag === "Closed") {
fiber.unsafeInterruptAsFork(options?.interruptAs ?? FiberId.none)
fiber.unsafeInterruptAsFork(FiberId.combine(options?.interruptAs ?? FiberId.none, internalFiberId))
return
} else if (self.state.fiber !== undefined) {
if (options?.onlyIfMissing === true) {
fiber.unsafeInterruptAsFork(options?.interruptAs ?? FiberId.none)
fiber.unsafeInterruptAsFork(FiberId.combine(options?.interruptAs ?? FiberId.none, internalFiberId))
return
} else if (self.state.fiber === fiber) {
return
}
self.state.fiber.unsafeInterruptAsFork(options?.interruptAs ?? FiberId.none)
self.state.fiber.unsafeInterruptAsFork(FiberId.combine(options?.interruptAs ?? FiberId.none, internalFiberId))
self.state.fiber = undefined
}

Expand All @@ -193,7 +208,14 @@ export const unsafeSet: {
if (self.state._tag === "Open" && fiber === self.state.fiber) {
self.state.fiber = undefined
}
if (Exit.isFailure(exit) && !Cause.isInterruptedOnly(exit.cause)) {
if (
Exit.isFailure(exit) &&
(
options?.propagateInterruption === true ?
!isInternalInterruption(exit.cause) :
!Cause.isInterruptedOnly(exit.cause)
)
) {
Deferred.unsafeDone(self.deferred, exit as any)
}
})
Expand All @@ -211,28 +233,32 @@ export const set: {
fiber: Fiber.RuntimeFiber<XA, XE>,
options?: {
readonly onlyIfMissing?: boolean
readonly propagateInterruption?: boolean | undefined
}
): (self: FiberHandle<A, E>) => Effect.Effect<void>
<A, E, XE extends E, XA extends A>(
self: FiberHandle<A, E>,
fiber: Fiber.RuntimeFiber<XA, XE>,
options?: {
readonly onlyIfMissing?: boolean
readonly propagateInterruption?: boolean | undefined
}
): Effect.Effect<void>
} = dual((args) => isFiberHandle(args[0]), <A, E, XE extends E, XA extends A>(
self: FiberHandle<A, E>,
fiber: Fiber.RuntimeFiber<XA, XE>,
options?: {
readonly onlyIfMissing?: boolean
readonly propagateInterruption?: boolean | undefined
}
): Effect.Effect<void> =>
Effect.fiberIdWith(
(fiberId) =>
Effect.sync(() =>
unsafeSet(self, fiber, {
interruptAs: fiberId,
onlyIfMissing: options?.onlyIfMissing
onlyIfMissing: options?.onlyIfMissing,
propagateInterruption: options?.propagateInterruption
})
)
))
Expand Down Expand Up @@ -261,12 +287,12 @@ export const get = <A, E>(self: FiberHandle<A, E>): Effect.Effect<Fiber.RuntimeF
*/
export const clear = <A, E>(self: FiberHandle<A, E>): Effect.Effect<void> =>
Effect.uninterruptibleMask((restore) =>
Effect.suspend(() => {
Effect.withFiberRuntime((fiber) => {
if (self.state._tag === "Closed" || self.state.fiber === undefined) {
return Effect.void
}
return Effect.zipRight(
restore(Fiber.interrupt(self.state.fiber)),
restore(Fiber.interruptAs(self.state.fiber, FiberId.combine(fiber.id(), internalFiberId))),
Effect.sync(() => {
if (self.state._tag === "Open") {
self.state.fiber = undefined
Expand Down Expand Up @@ -298,6 +324,7 @@ export const run: {
self: FiberHandle<A, E>,
options?: {
readonly onlyIfMissing?: boolean
readonly propagateInterruption?: boolean | undefined
}
): <R, XE extends E, XA extends A>(
effect: Effect.Effect<XA, XE, R>
Expand All @@ -307,13 +334,17 @@ export const run: {
effect: Effect.Effect<XA, XE, R>,
options?: {
readonly onlyIfMissing?: boolean
readonly propagateInterruption?: boolean | undefined
}
): Effect.Effect<Fiber.RuntimeFiber<XA, XE>, never, R>
} = function() {
const self = arguments[0] as FiberHandle
if (Effect.isEffect(arguments[1])) {
const effect = arguments[1]
const options = arguments[2] as { readonly onlyIfMissing?: boolean } | undefined
const options = arguments[2] as {
readonly onlyIfMissing?: boolean
readonly propagateInterruption?: boolean | undefined
} | undefined
return Effect.suspend(() => {
if (self.state._tag === "Closed") {
return Effect.interrupt
Expand All @@ -328,7 +359,10 @@ export const run: {
)
}) as any
}
const options = arguments[1] as { readonly onlyIfMissing?: boolean } | undefined
const options = arguments[1] as {
readonly onlyIfMissing?: boolean
readonly propagateInterruption?: boolean | undefined
} | undefined
return (effect: Effect.Effect<unknown, unknown, any>) =>
Effect.suspend(() => {
if (self.state._tag === "Closed") {
Expand Down Expand Up @@ -382,6 +416,7 @@ export const runtime: <A, E>(
options?:
| Runtime.RunForkOptions & {
readonly onlyIfMissing?: boolean | undefined
readonly propagateInterruption?: boolean | undefined
}
| undefined
) => Fiber.RuntimeFiber<XA, XE>,
Expand All @@ -397,6 +432,7 @@ export const runtime: <A, E>(
options?:
| Runtime.RunForkOptions & {
readonly onlyIfMissing?: boolean | undefined
readonly propagateInterruption?: boolean | undefined
}
| undefined
) => {
Expand Down
Loading

0 comments on commit 8da85e5

Please sign in to comment.