Skip to content

Commit

Permalink
add Fiber{Set,Map}.runtime api (#1941)
Browse files Browse the repository at this point in the history
  • Loading branch information
tim-smart committed Jan 22, 2024
1 parent 202befc commit 07c1921
Show file tree
Hide file tree
Showing 8 changed files with 259 additions and 41 deletions.
5 changes: 5 additions & 0 deletions .changeset/real-elephants-begin.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"effect": minor
---

make data-last FiberSet.run accept an Effect
5 changes: 5 additions & 0 deletions .changeset/selfish-elephants-juggle.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"effect": patch
---

add Fiber{Map,Set}.makeRuntime
5 changes: 5 additions & 0 deletions .changeset/serious-news-tease.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"effect": patch
---

add Fiber{Set,Map}.runtime api
5 changes: 5 additions & 0 deletions .changeset/seven-parents-run.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"effect": minor
---

make data-last FiberMap.run accept an Effect
113 changes: 97 additions & 16 deletions packages/effect/src/FiberMap.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,13 @@ import type { NoSuchElementException } from "./Cause.js"
import * as Fiber from "./Fiber.js"
import * as FiberId from "./FiberId.js"
import { dual } from "./Function.js"
import type { FiberMap } from "./index.js"
import * as Inspectable from "./Inspectable.js"
import * as MutableHashMap from "./MutableHashMap.js"
import * as Option from "./Option.js"
import { type Pipeable, pipeArguments } from "./Pipeable.js"
import * as Predicate from "./Predicate.js"
import * as Runtime from "./Runtime.js"

/**
* @since 2.0.0
Expand Down Expand Up @@ -98,6 +100,26 @@ const unsafeMake = <K, E = unknown, A = unknown>(): FiberMap<K, E, A> => {
export const make = <K, E = unknown, A = unknown>(): Effect.Effect<Scope.Scope, never, FiberMap<K, E, A>> =>
Effect.acquireRelease(Effect.sync(() => unsafeMake<K, E, A>()), clear)

/**
* Create an Effect run function that is backed by a FiberMap.
*
* @since 2.0.0
* @categories constructors
*/
export const makeRuntime = <R, K, E = unknown, A = unknown>(): Effect.Effect<
Scope.Scope | R,
never,
<XE extends E, XA extends A>(
key: K,
effect: Effect.Effect<R, XE, XA>,
options?: Runtime.RunForkOptions | undefined
) => Fiber.RuntimeFiber<XE, XA>
> =>
Effect.flatMap(
make<K, E, A>(),
(self) => runtime(self)<R>()
)

/**
* Add a fiber to the FiberMap. When the fiber completes, it will be removed from the FiberMap.
* If the key already exists in the FiberMap, the previous fiber will be interrupted.
Expand Down Expand Up @@ -263,30 +285,89 @@ export const clear = <K, E, A>(self: FiberMap<K, E, A>): Effect.Effect<never, ne
* @categories combinators
*/
export const run: {
<K, E, A, R, XE extends E, XA extends A>(
key: K,
effect: Effect.Effect<R, XE, XA>
): (self: FiberMap<K, E, A>) => Effect.Effect<R, never, Fiber.RuntimeFiber<XE, XA>>
<K, E, A, R, XE extends E, XA extends A>(
<K, E, A>(
self: FiberMap<K, E, A>,
key: K,
effect: Effect.Effect<R, XE, XA>
): Effect.Effect<R, never, Fiber.RuntimeFiber<XE, XA>>
} = dual<
<K, E, A, R, XE extends E, XA extends A>(
key: K,
key: K
): <R, XE extends E, XA extends A>(
effect: Effect.Effect<R, XE, XA>
) => (self: FiberMap<K, E, A>) => Effect.Effect<R, never, Fiber.RuntimeFiber<XE, XA>>,
) => Effect.Effect<R, never, Fiber.RuntimeFiber<XE, XA>>
<K, E, A, R, XE extends E, XA extends A>(
self: FiberMap<K, E, A>,
key: K,
effect: Effect.Effect<R, XE, XA>
) => Effect.Effect<R, never, Fiber.RuntimeFiber<XE, XA>>
>(3, (self, key, effect) =>
Effect.tap(
): Effect.Effect<R, never, Fiber.RuntimeFiber<XE, XA>>
} = function() {
if (arguments.length === 2) {
const self = arguments[0] as FiberMap<any>
const key = arguments[1]
return (effect: Effect.Effect<any, any, any>) =>
Effect.tap(
Effect.forkDaemon(effect),
(fiber) => set(self, key, fiber)
)
}
const self = arguments[0] as FiberMap<any>
const key = arguments[1]
const effect = arguments[2] as Effect.Effect<any, any, any>
return Effect.tap(
Effect.forkDaemon(effect),
(fiber) => set(self, key, fiber)
))
) as any
}

/**
* Capture a Runtime and use it to fork Effect's, adding the forked fibers to the FiberMap.
*
* @example
* import { Context, Effect, FiberMap } from "effect"
*
* interface Users {
* readonly _: unique symbol
* }
* const Users = Context.Tag<Users, {
* getAll: Effect.Effect<never, never, Array<unknown>>
* }>()
*
* Effect.gen(function*(_) {
* const map = yield* _(FiberMap.make<string>())
* const run = yield* _(FiberMap.runtime(map)<Users>())
*
* // run some effects and add the fibers to the map
* run("effect-a", Effect.andThen(Users, _ => _.getAll))
* run("effect-b", Effect.andThen(Users, _ => _.getAll))
* }).pipe(
* Effect.scoped // The fibers will be interrupted when the scope is closed
* )
*
* @since 2.0.0
* @categories combinators
*/
export const runtime: <K, E, A>(
self: FiberMap<K, E, A>
) => <R>() => Effect.Effect<
R,
never,
<XE extends E, XA extends A>(
key: K,
effect: Effect.Effect<R, XE, XA>,
options?: Runtime.RunForkOptions | undefined
) => Fiber.RuntimeFiber<XE, XA>
> = <K, E, A>(self: FiberMap<K, E, A>) => <R>() =>
Effect.map(
Effect.runtime<R>(),
(runtime) => {
const runFork = Runtime.runFork(runtime)
return <XE extends E, XA extends A>(
key: K,
effect: Effect.Effect<R, XE, XA>,
options?: Runtime.RunForkOptions | undefined
) => {
const fiber = runFork(effect, options)
unsafeSet(self, key, fiber)
return fiber
}
}
)

/**
* @since 2.0.0
Expand Down
99 changes: 86 additions & 13 deletions packages/effect/src/FiberSet.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import { dual } from "./Function.js"
import * as Inspectable from "./Inspectable.js"
import { type Pipeable, pipeArguments } from "./Pipeable.js"
import * as Predicate from "./Predicate.js"
import * as Runtime from "./Runtime.js"

/**
* @since 2.0.0
Expand Down Expand Up @@ -94,6 +95,25 @@ const unsafeMake = <E = unknown, A = unknown>(): FiberSet<E, A> => {
export const make = <E = unknown, A = unknown>(): Effect.Effect<Scope.Scope, never, FiberSet<E, A>> =>
Effect.acquireRelease(Effect.sync(() => unsafeMake<E, A>()), clear)

/**
* Create an Effect run function that is backed by a FiberSet.
*
* @since 2.0.0
* @categories constructors
*/
export const makeRuntime = <R, E = unknown, A = unknown>(): Effect.Effect<
Scope.Scope | R,
never,
<XE extends E, XA extends A>(
effect: Effect.Effect<R, XE, XA>,
options?: Runtime.RunForkOptions | undefined
) => Fiber.RuntimeFiber<XE, XA>
> =>
Effect.flatMap(
make<E, A>(),
(self) => runtime(self)<R>()
)

/**
* Add a fiber to the FiberSet. When the fiber completes, it will be removed.
*
Expand Down Expand Up @@ -165,26 +185,79 @@ export const clear = <E, A>(self: FiberSet<E, A>): Effect.Effect<never, never, v
* @categories combinators
*/
export const run: {
<E, A, R, XE extends E, XA extends A>(
<E, A>(self: FiberSet<E, A>): <R, XE extends E, XA extends A>(
effect: Effect.Effect<R, XE, XA>
): (self: FiberSet<E, A>) => Effect.Effect<R, never, Fiber.RuntimeFiber<XE, XA>>
) => Effect.Effect<R, never, Fiber.RuntimeFiber<XE, XA>>
<E, A, R, XE extends E, XA extends A>(
self: FiberSet<E, A>,
effect: Effect.Effect<R, XE, XA>
): Effect.Effect<R, never, Fiber.RuntimeFiber<XE, XA>>
} = dual<
<E, A, R, XE extends E, XA extends A>(
effect: Effect.Effect<R, XE, XA>
) => (self: FiberSet<E, A>) => Effect.Effect<R, never, Fiber.RuntimeFiber<XE, XA>>,
<E, A, R, XE extends E, XA extends A>(
self: FiberSet<E, A>,
effect: Effect.Effect<R, XE, XA>
) => Effect.Effect<R, never, Fiber.RuntimeFiber<XE, XA>>
>(2, (self, effect) =>
Effect.tap(
} = function() {
const self = arguments[0] as FiberSet<any>
if (arguments.length === 1) {
return (effect: Effect.Effect<any, any, any>) =>
Effect.tap(
Effect.forkDaemon(effect),
(fiber) => add(self, fiber)
)
}
const effect = arguments[1] as Effect.Effect<any, any, any>
return Effect.tap(
Effect.forkDaemon(effect),
(fiber) => add(self, fiber)
))
) as any
}

/**
* Capture a Runtime and use it to fork Effect's, adding the forked fibers to the FiberSet.
*
* @example
* import { Context, Effect, FiberSet } from "effect"
*
* interface Users {
* readonly _: unique symbol
* }
* const Users = Context.Tag<Users, {
* getAll: Effect.Effect<never, never, Array<unknown>>
* }>()
*
* Effect.gen(function*(_) {
* const set = yield* _(FiberSet.make())
* const run = yield* _(FiberSet.runtime(set)<Users>())
*
* // run some effects and add the fibers to the set
* run(Effect.andThen(Users, _ => _.getAll))
* }).pipe(
* Effect.scoped // The fibers will be interrupted when the scope is closed
* )
*
* @since 2.0.0
* @categories combinators
*/
export const runtime: <E, A>(
self: FiberSet<E, A>
) => <R>() => Effect.Effect<
R,
never,
<XE extends E, XA extends A>(
effect: Effect.Effect<R, XE, XA>,
options?: Runtime.RunForkOptions | undefined
) => Fiber.RuntimeFiber<XE, XA>
> = <E, A>(self: FiberSet<E, A>) => <R>() =>
Effect.map(
Effect.runtime<R>(),
(runtime) => {
const runFork = Runtime.runFork(runtime)
return <XE extends E, XA extends A>(
effect: Effect.Effect<R, XE, XA>,
options?: Runtime.RunForkOptions | undefined
) => {
const fiber = runFork(effect, options)
unsafeAdd(self, fiber)
return fiber
}
}
)

/**
* @since 2.0.0
Expand Down
36 changes: 29 additions & 7 deletions packages/effect/test/FiberMap.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,42 @@ describe("FiberMap", () => {
const map = yield* _(FiberMap.make<number>())
yield* _(
Effect.forEach(ReadonlyArray.range(1, 10), (i) =>
FiberMap.run(
map,
i,
Effect.onInterrupt(
Effect.never,
() => Ref.update(ref, (n) => n + 1)
)
Effect.onInterrupt(
Effect.never,
() => Ref.update(ref, (n) => n + 1)
).pipe(
FiberMap.run(map, i)
))
)
yield* _(Effect.yieldNow())
}),
Effect.scoped
)

assert.strictEqual(yield* _(Ref.get(ref)), 10)
}))

it.effect("runtime", () =>
Effect.gen(function*(_) {
const ref = yield* _(Ref.make(0))
yield* _(
Effect.gen(function*(_) {
const map = yield* _(FiberMap.make<number>())
const run = yield* _(FiberMap.runtime(map)<never>())
ReadonlyArray.range(1, 10).forEach((i) =>
run(
i,
Effect.onInterrupt(
Effect.never,
() => Ref.update(ref, (n) => n + 1)
)
)
)
yield* _(Effect.yieldNow())
}),
Effect.scoped
)

assert.strictEqual(yield* _(Ref.get(ref)), 10)
}))
})
32 changes: 27 additions & 5 deletions packages/effect/test/FiberSet.test.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Effect, Ref } from "effect"
import { Effect, ReadonlyArray, Ref } from "effect"
import * as it from "effect-test/utils/extend"
import * as FiberSet from "effect/FiberSet"
import { assert, describe } from "vitest"
Expand All @@ -11,14 +11,36 @@ describe("FiberSet", () => {
Effect.gen(function*(_) {
const set = yield* _(FiberSet.make())
yield* _(
FiberSet.run(
set,
Effect.onInterrupt(
Effect.never,
() => Ref.update(ref, (n) => n + 1)
).pipe(
FiberSet.run(set)
),
Effect.replicateEffect(10)
)
yield* _(Effect.yieldNow())
}),
Effect.scoped
)

assert.strictEqual(yield* _(Ref.get(ref)), 10)
}))

it.effect("runtime", () =>
Effect.gen(function*(_) {
const ref = yield* _(Ref.make(0))
yield* _(
Effect.gen(function*(_) {
const set = yield* _(FiberSet.make())
const run = yield* _(FiberSet.runtime(set)<never>())
ReadonlyArray.range(1, 10).forEach(() =>
run(
Effect.onInterrupt(
Effect.never,
() => Ref.update(ref, (n) => n + 1)
)
),
Effect.replicateEffect(10)
)
)
yield* _(Effect.yieldNow())
}),
Expand Down

0 comments on commit 07c1921

Please sign in to comment.