From 0ee5b7764cb2c6894191f2973ae5877cf7ba8d3f Mon Sep 17 00:00:00 2001 From: Maxim Khramtsov Date: Thu, 12 Sep 2024 01:48:01 +0200 Subject: [PATCH] make Fiber subtype of Effect (#3590) Co-authored-by: maksim.khramtsov --- .changeset/seven-lamps-nail.md | 16 ++++ packages/effect/dtslint/Unify.ts | 17 +++- packages/effect/src/Fiber.ts | 45 ++++++++- .../effect/src/internal/effect/circular.ts | 4 + packages/effect/src/internal/fiber.ts | 85 +++++++++++------ packages/effect/src/internal/fiberRuntime.ts | 95 ++++++++++--------- packages/effect/test/Fiber.test.ts | 6 ++ 7 files changed, 192 insertions(+), 76 deletions(-) create mode 100644 .changeset/seven-lamps-nail.md diff --git a/.changeset/seven-lamps-nail.md b/.changeset/seven-lamps-nail.md new file mode 100644 index 0000000000..7691b1b0f9 --- /dev/null +++ b/.changeset/seven-lamps-nail.md @@ -0,0 +1,16 @@ +--- +"effect": minor +--- + +The `Fiber` is now a subtype of `Effect`. This change removes the need for explicit call `Fiber.join`. + +```typescript +import { Effect, Fiber } from "effect" + +Effect.gen(function*() { + const fiber = yield* Effect.fork(Effect.succeed(1)) + + const oldWay = yield* Fiber.join(fiber) + const now = yield* fiber +})) +``` diff --git a/packages/effect/dtslint/Unify.ts b/packages/effect/dtslint/Unify.ts index 175fe08137..835d43df22 100644 --- a/packages/effect/dtslint/Unify.ts +++ b/packages/effect/dtslint/Unify.ts @@ -2,6 +2,7 @@ import type * as Deferred from "effect/Deferred" import type * as Effect from "effect/Effect" import * as Either from "effect/Either" import type * as Exit from "effect/Exit" +import type * as Fiber from "effect/Fiber" import type * as FiberRef from "effect/FiberRef" import type * as Micro from "effect/Micro" import type * as Option from "effect/Option" @@ -78,8 +79,18 @@ export type FiberRefUnify = Unify.Unify< | FiberRef.FiberRef<1> | FiberRef.FiberRef<"a"> > +// $ExpectType Fiber<"a" | 1, "b" | 2> +export type FiberUnify = Unify.Unify< + | Fiber.Fiber<1, 2> + | Fiber.Fiber<"a", "b"> +> +// $ExpectType RuntimeFiber<"a" | 1, "b" | 2> +export type RuntimeFiberUnify = Unify.Unify< + | Fiber.RuntimeFiber<1, 2> + | Fiber.RuntimeFiber<"a", "b"> +> -// $ExpectType 0 | Option | Ref<1> | SynchronizedRef<1> | SubscriptionRef<1> | Deferred<1, 2> | Deferred<"a", "b"> | Ref<"A"> | SynchronizedRef<"A"> | SubscriptionRef<"A"> | FiberRef<12> | FiberRef<"a2"> | Either<1 | "A", 0 | "E"> | Effect<1 | "A", 0 | "E", "R" | "R1"> | RcRef<1 | "A", 0 | "E"> +// $ExpectType 0 | Option | Ref<1> | SynchronizedRef<1> | SubscriptionRef<1> | Deferred<1, 2> | Deferred<"a", "b"> | Fiber<"a" | 1, "b" | 2> | RuntimeFiber<"a" | 1, "b" | 2> | Ref<"A"> | SynchronizedRef<"A"> | SubscriptionRef<"A"> | FiberRef<12> | FiberRef<"a2"> | Either<1 | "A", 0 | "E"> | Effect<1 | "A", 0 | "E", "R" | "R1"> | RcRef<1 | "A", 0 | "E"> export type AllUnify = Unify.Unify< | Either.Either<1, 0> | Either.Either<"A", "E"> @@ -99,5 +110,9 @@ export type AllUnify = Unify.Unify< | Deferred.Deferred<"a", "b"> | FiberRef.FiberRef<12> | FiberRef.FiberRef<"a2"> + | Fiber.Fiber<1, 2> + | Fiber.Fiber<"a", "b"> + | Fiber.RuntimeFiber<1, 2> + | Fiber.RuntimeFiber<"a", "b"> | 0 > diff --git a/packages/effect/src/Fiber.ts b/packages/effect/src/Fiber.ts index 3f18046372..3c3e76065b 100644 --- a/packages/effect/src/Fiber.ts +++ b/packages/effect/src/Fiber.ts @@ -18,13 +18,13 @@ import * as internal from "./internal/fiber.js" import * as fiberRuntime from "./internal/fiberRuntime.js" import type * as Option from "./Option.js" import type * as order from "./Order.js" -import type { Pipeable } from "./Pipeable.js" import type * as RuntimeFlags from "./RuntimeFlags.js" import type { Scheduler } from "./Scheduler.js" import type * as Scope from "./Scope.js" import type { Supervisor } from "./Supervisor.js" import type { AnySpan, Tracer } from "./Tracer.js" import type * as Types from "./Types.js" +import type * as Unify from "./Unify.js" /** * @since 2.0.0 @@ -62,7 +62,7 @@ export type RuntimeFiberTypeId = typeof RuntimeFiberTypeId * @since 2.0.0 * @category models */ -export interface Fiber extends Fiber.Variance, Pipeable { +export interface Fiber extends Effect.Effect, Fiber.Variance { /** * The identity of the fiber. */ @@ -97,6 +97,26 @@ export interface Fiber extends Fiber.Variance, Pipea * resume immediately. Otherwise, the effect will resume when the fiber exits. */ interruptAsFork(fiberId: FiberId.FiberId): Effect.Effect + + readonly [Unify.typeSymbol]?: unknown + readonly [Unify.unifySymbol]?: FiberUnify + readonly [Unify.ignoreSymbol]?: FiberUnifyIgnore +} + +/** + * @category models + * @since 3.8.0 + */ +export interface FiberUnify extends Effect.EffectUnify { + Fiber?: () => A[Unify.typeSymbol] extends Fiber | infer _ ? Fiber : never +} + +/** + * @category models + * @since 3.8.0 + */ +export interface FiberUnifyIgnore extends Effect.EffectUnifyIgnore { + Effect?: true } /** @@ -190,6 +210,27 @@ export interface RuntimeFiber extends Fiber, Fiber.R * Gets the current supervisor */ get currentSupervisor(): Supervisor + + readonly [Unify.typeSymbol]?: unknown + readonly [Unify.unifySymbol]?: RuntimeFiberUnify + readonly [Unify.ignoreSymbol]?: RuntimeFiberUnifyIgnore +} + +/** + * @category models + * @since 3.8.0 + */ +export interface RuntimeFiberUnify extends FiberUnify { + RuntimeFiber?: () => A[Unify.typeSymbol] extends RuntimeFiber | infer _ ? RuntimeFiber + : never +} + +/** + * @category models + * @since 3.8.0 + */ +export interface RuntimeFiberUnifyIgnore extends FiberUnifyIgnore { + Fiber?: true } /** diff --git a/packages/effect/src/internal/effect/circular.ts b/packages/effect/src/internal/effect/circular.ts index da384e5457..cff08b7670 100644 --- a/packages/effect/src/internal/effect/circular.ts +++ b/packages/effect/src/internal/effect/circular.ts @@ -665,6 +665,10 @@ export const zipWithFiber = dual< f: (a: A, b: B) => C ) => Fiber.Fiber >(3, (self, that, f) => ({ + ...Effectable.CommitPrototype, + commit() { + return internalFiber.join(this) + }, [internalFiber.FiberTypeId]: internalFiber.fiberVariance, id: () => pipe(self.id(), FiberId.getOrElse(that.id())), await: pipe( diff --git a/packages/effect/src/internal/fiber.ts b/packages/effect/src/internal/fiber.ts index 5f4e1a88a2..dae7a6f1ac 100644 --- a/packages/effect/src/internal/fiber.ts +++ b/packages/effect/src/internal/fiber.ts @@ -14,6 +14,7 @@ import * as order from "../Order.js" import { pipeArguments } from "../Pipeable.js" import { hasProperty } from "../Predicate.js" import * as core from "./core.js" +import * as effectable from "./effectable.js" import * as fiberScope from "./fiberScope.js" import * as runtimeFlags from "./runtimeFlags.js" @@ -76,15 +77,23 @@ export const children = ( ): Effect.Effect>> => self.children /** @internal */ -export const done = (exit: Exit.Exit): Fiber.Fiber => ({ - ...fiberProto, - id: () => FiberId.none, - await: core.succeed(exit), - children: core.succeed([]), - inheritAll: core.void, - poll: core.succeed(Option.some(exit)), - interruptAsFork: () => core.void -}) +export const done = (exit: Exit.Exit): Fiber.Fiber => { + const _fiber = { + ...effectable.CommitPrototype, + commit() { + return join(this) + }, + ...fiberProto, + id: () => FiberId.none, + await: core.succeed(exit), + children: core.succeed([]), + inheritAll: core.void, + poll: core.succeed(Option.some(exit)), + interruptAsFork: () => core.void + } + + return _fiber +} /** @internal */ export const dump = (self: Fiber.RuntimeFiber): Effect.Effect => @@ -148,25 +157,32 @@ export const map = dual< export const mapEffect = dual< (f: (a: A) => Effect.Effect) => (self: Fiber.Fiber) => Fiber.Fiber, (self: Fiber.Fiber, f: (a: A) => Effect.Effect) => Fiber.Fiber ->(2, (self, f) => ({ - ...fiberProto, - id: () => self.id(), - await: core.flatMap(self.await, Exit.forEachEffect(f)), - children: self.children, - inheritAll: self.inheritAll, - poll: core.flatMap(self.poll, (result) => { - switch (result._tag) { - case "None": - return core.succeed(Option.none()) - case "Some": - return pipe( - Exit.forEachEffect(result.value, f), - core.map(Option.some) - ) - } - }), - interruptAsFork: (id) => self.interruptAsFork(id) -})) +>(2, (self, f) => { + const _fiber = { + ...effectable.CommitPrototype, + commit() { + return join(this) + }, + ...fiberProto, + id: () => self.id(), + await: core.flatMap(self.await, Exit.forEachEffect(f)), + children: self.children, + inheritAll: self.inheritAll, + poll: core.flatMap(self.poll, (result) => { + switch (result._tag) { + case "None": + return core.succeed(Option.none()) + case "Some": + return pipe( + Exit.forEachEffect(result.value, f), + core.map(Option.some) + ) + } + }), + interruptAsFork: (id: FiberId.FiberId) => self.interruptAsFork(id) + } + return _fiber +}) /** @internal */ export const mapFiber = dual< @@ -212,7 +228,11 @@ export const match = dual< }) /** @internal */ -export const never: Fiber.Fiber = { +const _never = { + ...effectable.CommitPrototype, + commit() { + return join(this) + }, ...fiberProto, id: () => FiberId.none, await: core.never, @@ -222,11 +242,18 @@ export const never: Fiber.Fiber = { interruptAsFork: () => core.never } +/** @internal */ +export const never: Fiber.Fiber = _never + /** @internal */ export const orElse = dual< (that: Fiber.Fiber) => (self: Fiber.Fiber) => Fiber.Fiber, (self: Fiber.Fiber, that: Fiber.Fiber) => Fiber.Fiber >(2, (self, that) => ({ + ...effectable.CommitPrototype, + commit() { + return join(this) + }, ...fiberProto, id: () => FiberId.getOrElse(self.id(), that.id()), await: core.zipWith( diff --git a/packages/effect/src/internal/fiberRuntime.ts b/packages/effect/src/internal/fiberRuntime.ts index e2cac6688d..f75f7d3e80 100644 --- a/packages/effect/src/internal/fiberRuntime.ts +++ b/packages/effect/src/internal/fiberRuntime.ts @@ -10,7 +10,7 @@ import type { DefaultServices } from "../DefaultServices.js" import * as Deferred from "../Deferred.js" import type * as Duration from "../Duration.js" import type * as Effect from "../Effect.js" -import { EffectTypeId } from "../Effectable.js" +import * as Effectable from "../Effectable.js" import type * as Either from "../Either.js" import * as ExecutionStrategy from "../ExecutionStrategy.js" import type * as Exit from "../Exit.js" @@ -269,14 +269,11 @@ export interface Snapshot { } /** @internal */ -export class FiberRuntime implements Fiber.RuntimeFiber { +export class FiberRuntime extends Effectable.Class + implements Fiber.RuntimeFiber +{ readonly [internalFiber.FiberTypeId] = internalFiber.fiberVariance readonly [internalFiber.RuntimeFiberTypeId] = runtimeFiberVariance - - pipe() { - return pipeArguments(this, arguments) - } - private _fiberRefs: FiberRefs.FiberRefs private _fiberId: FiberId.Runtime private _queue = new Array() @@ -304,6 +301,7 @@ export class FiberRuntime implements Fiber.RuntimeFi fiberRefs0: FiberRefs.FiberRefs, runtimeFlags0: RuntimeFlags.RuntimeFlags ) { + super() this.currentRuntimeFlags = runtimeFlags0 this._fiberId = fiberId this._fiberRefs = fiberRefs0 @@ -315,6 +313,10 @@ export class FiberRuntime implements Fiber.RuntimeFi this.refreshRefCache() } + commit(): Effect.Effect { + return internalFiber.join(this) + } + /** * The identity of the fiber. */ @@ -1334,10 +1336,10 @@ export class FiberRuntime implements Fiber.RuntimeFi // @ts-expect-error cur = this.currentTracer.context( () => { - if (version.getCurrentVersion() !== (cur as core.Primitive)[EffectTypeId]._V) { + if (version.getCurrentVersion() !== (cur as core.Primitive)[core.EffectTypeId]._V) { return core.dieMessage( `Cannot execute an Effect versioned ${ - (cur as core.Primitive)[EffectTypeId]._V + (cur as core.Primitive)[core.EffectTypeId]._V } with a Runtime of version ${version.getCurrentVersion()}` ) } @@ -3365,46 +3367,51 @@ export const fiberAwaitAll = >>( > => forEach(fibers, internalFiber._await) as any /** @internal */ -export const fiberAll = (fibers: Iterable>): Fiber.Fiber, E> => ({ - [internalFiber.FiberTypeId]: internalFiber.fiberVariance, - id: () => - RA.fromIterable(fibers).reduce((id, fiber) => FiberId.combine(id, fiber.id()), FiberId.none as FiberId.FiberId), - await: core.exit(forEachParUnbounded(fibers, (fiber) => core.flatten(fiber.await), false)), - children: core.map(forEachParUnbounded(fibers, (fiber) => fiber.children, false), RA.flatten), - inheritAll: core.forEachSequentialDiscard(fibers, (fiber) => fiber.inheritAll), - poll: core.map( - core.forEachSequential(fibers, (fiber) => fiber.poll), - RA.reduceRight( - Option.some, E>>(core.exitSucceed(new Array())), - (optionB, optionA) => { - switch (optionA._tag) { - case "None": { - return Option.none() - } - case "Some": { - switch (optionB._tag) { - case "None": { - return Option.none() - } - case "Some": { - return Option.some( - core.exitZipWith(optionA.value, optionB.value, { - onSuccess: (a, chunk) => [a, ...chunk], - onFailure: internalCause.parallel - }) - ) +export const fiberAll = (fibers: Iterable>): Fiber.Fiber, E> => { + const _fiberAll = { + ...Effectable.CommitPrototype, + commit() { + return internalFiber.join(this) + }, + [internalFiber.FiberTypeId]: internalFiber.fiberVariance, + id: () => + RA.fromIterable(fibers).reduce((id, fiber) => FiberId.combine(id, fiber.id()), FiberId.none as FiberId.FiberId), + await: core.exit(forEachParUnbounded(fibers, (fiber) => core.flatten(fiber.await), false)), + children: core.map(forEachParUnbounded(fibers, (fiber) => fiber.children, false), RA.flatten), + inheritAll: core.forEachSequentialDiscard(fibers, (fiber) => fiber.inheritAll), + poll: core.map( + core.forEachSequential(fibers, (fiber) => fiber.poll), + RA.reduceRight( + Option.some, E>>(core.exitSucceed(new Array())), + (optionB, optionA) => { + switch (optionA._tag) { + case "None": { + return Option.none() + } + case "Some": { + switch (optionB._tag) { + case "None": { + return Option.none() + } + case "Some": { + return Option.some( + core.exitZipWith(optionA.value, optionB.value, { + onSuccess: (a, chunk) => [a, ...chunk], + onFailure: internalCause.parallel + }) + ) + } } } } } - } - ) - ), - interruptAsFork: (fiberId) => core.forEachSequentialDiscard(fibers, (fiber) => fiber.interruptAsFork(fiberId)), - pipe() { - return pipeArguments(this, arguments) + ) + ), + interruptAsFork: (fiberId: FiberId.FiberId) => + core.forEachSequentialDiscard(fibers, (fiber) => fiber.interruptAsFork(fiberId)) } -}) + return _fiberAll +} /* @internal */ export const fiberInterruptFork = (self: Fiber.Fiber): Effect.Effect => diff --git a/packages/effect/test/Fiber.test.ts b/packages/effect/test/Fiber.test.ts index 1bd813ccae..0442f7b44f 100644 --- a/packages/effect/test/Fiber.test.ts +++ b/packages/effect/test/Fiber.test.ts @@ -226,4 +226,10 @@ describe("Fiber", () => { const result = yield* $(Fiber.join(Fiber.all(fibers)), Effect.asVoid) assert.isUndefined(result) }), 10000) + it.effect("is subtype of Effect", () => + Effect.gen(function*() { + const fiber = yield* Effect.fork(Effect.succeed(1)) + const fiberResult = yield* fiber + assert(1 === fiberResult) + })) })