diff --git a/.changeset/metal-doors-call.md b/.changeset/metal-doors-call.md new file mode 100644 index 000000000..b093c6731 --- /dev/null +++ b/.changeset/metal-doors-call.md @@ -0,0 +1,6 @@ +--- +"@effect-app/infra-adapters": minor +"effect-app": minor +--- + +improve: expose interrupt, not waitUntilEmpty on FiberSets diff --git a/packages/infra-adapters/src/RequestFiberSet.ts b/packages/infra-adapters/src/RequestFiberSet.ts index 12724873a..d5d0f890a 100644 --- a/packages/infra-adapters/src/RequestFiberSet.ts +++ b/packages/infra-adapters/src/RequestFiberSet.ts @@ -12,25 +12,25 @@ const make = Effect.gen(function*($) { Effect.andThen((count) => Effect.logInfo(`Joining ${count} current fibers on the RequestFiberSet`)), Effect.andThen(FiberSet.join(set)) ) - const waitUntilEmpty = Effect.gen(function*($) { - const currentSize = yield* $(FiberSet.size(set)) - if (currentSize === 0) { - return - } - yield* $(Effect.logInfo("Waiting RequestFiberSet to be empty: " + currentSize)) - while ((yield* $(FiberSet.size(set))) > 0) yield* $(Effect.sleep("250 millis")) - yield* $(Effect.logDebug("RequestFiberSet is empty")) - }) const run = FiberSet.run(set) - const register = (self: Effect) => self.pipe(Effect.fork, Effect.tap(add), Effect.andThen(Fiber.join)) - yield* Effect.addFinalizer(() => Effect.uninterruptible(waitUntilEmpty)) + // const waitUntilEmpty = Effect.gen(function*($) { + // const currentSize = yield* $(FiberSet.size(set)) + // if (currentSize === 0) { + // return + // } + // yield* $(Effect.logInfo("Waiting RequestFiberSet to be empty: " + currentSize)) + // while ((yield* $(FiberSet.size(set))) > 0) yield* $(Effect.sleep("250 millis")) + // yield* $(Effect.logDebug("RequestFiberSet is empty")) + // }) + // TODO: loop and interrupt all fibers in the set continuously? + const interrupt = Fiber.interruptAll(set) return { + interrupt, join, - waitUntilEmpty, run, add, addAll, diff --git a/packages/infra-adapters/src/ServiceBus.ts b/packages/infra-adapters/src/ServiceBus.ts index 7f360d0c0..9453943c2 100644 --- a/packages/infra-adapters/src/ServiceBus.ts +++ b/packages/infra-adapters/src/ServiceBus.ts @@ -29,8 +29,7 @@ function makeSender(queueName: string) { return yield* $( Effect.acquireRelease( Effect.sync(() => serviceBusClient.createSender(queueName)), - (subscription) => - RequestFiberSet.waitUntilEmpty.pipe(Effect.andThen(Effect.promise(() => subscription.close()))) + (subscription) => Effect.promise(() => subscription.close()) ) ) }) diff --git a/packages/prelude/src/services/MainFiberSet.ts b/packages/prelude/src/services/MainFiberSet.ts index 68c63859a..5dfd61c2c 100644 --- a/packages/prelude/src/services/MainFiberSet.ts +++ b/packages/prelude/src/services/MainFiberSet.ts @@ -1,5 +1,4 @@ -import type { Fiber } from "@effect-app/core" -import { Context, Effect, FiberSet, Layer } from "@effect-app/core" +import { Context, Effect, Fiber, FiberSet, Layer } from "@effect-app/core" import type {} from "effect/Scope" import type {} from "effect/Context" @@ -16,7 +15,21 @@ const make = Effect.gen(function*($) { ) const run = FiberSet.run(set) + // const waitUntilEmpty = Effect.gen(function*($) { + // const currentSize = yield* $(FiberSet.size(set)) + // if (currentSize === 0) { + // return + // } + // yield* $(Effect.logInfo("Waiting MainFiberSet to be empty: " + currentSize)) + // while ((yield* $(FiberSet.size(set))) > 0) yield* $(Effect.sleep("250 millis")) + // yield* $(Effect.logDebug("MainFiberSet is empty")) + // }) + + // TODO: loop and interrupt all fibers in the set continuously? + const interrupt = Fiber.interruptAll(set) + return { + interrupt, join, run, add,