Skip to content

Commit

Permalink
improve: expose interrupt, not waitUntilEmpty on FiberSets
Browse files Browse the repository at this point in the history
  • Loading branch information
patroza committed Jul 10, 2024
1 parent 6b7a52d commit d12fc1a
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 16 deletions.
6 changes: 6 additions & 0 deletions .changeset/metal-doors-call.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
"@effect-app/infra-adapters": minor
"effect-app": minor
---

improve: expose interrupt, not waitUntilEmpty on FiberSets
24 changes: 12 additions & 12 deletions packages/infra-adapters/src/RequestFiberSet.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,25 +12,25 @@ const make = Effect.gen(function*($) {
Effect.andThen((count) => Effect.logInfo(`Joining ${count} current fibers on the RequestFiberSet`)),
Effect.andThen(FiberSet.join(set))
)
const waitUntilEmpty = Effect.gen(function*($) {
const currentSize = yield* $(FiberSet.size(set))
if (currentSize === 0) {
return
}
yield* $(Effect.logInfo("Waiting RequestFiberSet to be empty: " + currentSize))
while ((yield* $(FiberSet.size(set))) > 0) yield* $(Effect.sleep("250 millis"))
yield* $(Effect.logDebug("RequestFiberSet is empty"))
})
const run = FiberSet.run(set)

const register = <A, E, R>(self: Effect<A, E, R>) =>
self.pipe(Effect.fork, Effect.tap(add), Effect.andThen(Fiber.join))

yield* Effect.addFinalizer(() => Effect.uninterruptible(waitUntilEmpty))
// const waitUntilEmpty = Effect.gen(function*($) {
// const currentSize = yield* $(FiberSet.size(set))
// if (currentSize === 0) {
// return
// }
// yield* $(Effect.logInfo("Waiting RequestFiberSet to be empty: " + currentSize))
// while ((yield* $(FiberSet.size(set))) > 0) yield* $(Effect.sleep("250 millis"))
// yield* $(Effect.logDebug("RequestFiberSet is empty"))
// })
// TODO: loop and interrupt all fibers in the set continuously?
const interrupt = Fiber.interruptAll(set)

return {
interrupt,
join,
waitUntilEmpty,
run,
add,
addAll,
Expand Down
3 changes: 1 addition & 2 deletions packages/infra-adapters/src/ServiceBus.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,7 @@ function makeSender(queueName: string) {
return yield* $(
Effect.acquireRelease(
Effect.sync(() => serviceBusClient.createSender(queueName)),
(subscription) =>
RequestFiberSet.waitUntilEmpty.pipe(Effect.andThen(Effect.promise(() => subscription.close())))
(subscription) => Effect.promise(() => subscription.close())
)
)
})
Expand Down
17 changes: 15 additions & 2 deletions packages/prelude/src/services/MainFiberSet.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import type { Fiber } from "@effect-app/core"
import { Context, Effect, FiberSet, Layer } from "@effect-app/core"
import { Context, Effect, Fiber, FiberSet, Layer } from "@effect-app/core"

import type {} from "effect/Scope"
import type {} from "effect/Context"
Expand All @@ -16,7 +15,21 @@ const make = Effect.gen(function*($) {
)
const run = FiberSet.run(set)

// const waitUntilEmpty = Effect.gen(function*($) {
// const currentSize = yield* $(FiberSet.size(set))
// if (currentSize === 0) {
// return
// }
// yield* $(Effect.logInfo("Waiting MainFiberSet to be empty: " + currentSize))
// while ((yield* $(FiberSet.size(set))) > 0) yield* $(Effect.sleep("250 millis"))
// yield* $(Effect.logDebug("MainFiberSet is empty"))
// })

// TODO: loop and interrupt all fibers in the set continuously?
const interrupt = Fiber.interruptAll(set)

return {
interrupt,
join,
run,
add,
Expand Down

0 comments on commit d12fc1a

Please sign in to comment.