Skip to content

Commit

Permalink
ensure offers aren't dropped
Browse files Browse the repository at this point in the history
  • Loading branch information
tim-smart committed Sep 10, 2024
1 parent ff3d3ac commit 7543277
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 23 deletions.
42 changes: 30 additions & 12 deletions packages/effect/src/internal/mailbox.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,16 @@ type MailboxState<A, E> = {
readonly resume: (_: Effect<Array<A>>) => void
}>
readonly awaiters: Set<(_: Effect<void, E>) => void>
} | {
readonly _tag: "Closing"
readonly takers: Set<(_: Effect<void, E>) => void>
readonly offers: Set<{
readonly remaining: Array<A>
offset: number
readonly resume: (_: Effect<Array<A>>) => void
}>
readonly awaiters: Set<(_: Effect<void, E>) => void>
readonly exit: Exit<void, E>
} | {
readonly _tag: "Done"
readonly exit: Exit<void, E>
Expand Down Expand Up @@ -76,7 +86,7 @@ class MailboxImpl<A, E> implements Api.Mailbox<A, E> {
})
}
private releaseCapacity(n: number) {
if (this.state._tag !== "Open") {
if (this.state._tag === "Done") {
return
} else if (this.state.offers.size === 0) {
return
Expand All @@ -95,14 +105,17 @@ class MailboxImpl<A, E> implements Api.Mailbox<A, E> {
this.state.offers.delete(entry)
entry.resume(core.exitSucceed([]))
}
if (this.state._tag === "Closing") {
this.finalize(this.state.exit)
}
}
private awaitTake = core.unsafeAsync<void, E>((resume) => {
if (this.state._tag !== "Open") {
if (this.state._tag === "Done") {
return resume(this.state.exit)
}
this.state.takers.add(resume)
return core.sync(() => {
if (this.state._tag === "Open") {
if (this.state._tag !== "Done") {
this.state.takers.delete(resume)
}
})
Expand All @@ -118,7 +131,7 @@ class MailboxImpl<A, E> implements Api.Mailbox<A, E> {
}
private releaseTaker = () => {
this.scheduleRunning = false
if (this.state._tag !== "Open") {
if (this.state._tag === "Done") {
return
} else if (this.state.takers.size === 0) {
return
Expand Down Expand Up @@ -194,13 +207,19 @@ class MailboxImpl<A, E> implements Api.Mailbox<A, E> {
unsafeDone(exit: Exit<void, E>): boolean {
if (this.state._tag !== "Open") {
return false
} else if (this.state.offers.size === 0) {
this.finalize(exit)
return true
}
this.state = { ...this.state, _tag: "Closing", exit }
return true
}
private finalize(exit: Exit<void, E>) {
if (this.state._tag === "Done") {
return
}
const openState = this.state
this.state = { _tag: "Done", exit }
for (const entry of openState.offers) {
entry.resume(core.exitSucceed(entry.remaining.slice(entry.offset)))
}
openState.offers.clear()
for (const taker of openState.takers) {
taker(exit)
}
Expand All @@ -209,7 +228,6 @@ class MailboxImpl<A, E> implements Api.Mailbox<A, E> {
awaiter(exit)
}
openState.awaiters.clear()
return true
}
done(exit: Exit<void, E>) {
return core.sync(() => this.unsafeDone(exit))
Expand Down Expand Up @@ -261,18 +279,18 @@ class MailboxImpl<A, E> implements Api.Mailbox<A, E> {
return core.zipRight(this.awaitTake, this.take)
})
await: Effect<void, E> = core.unsafeAsync<void, E>((resume) => {
if (this.state._tag !== "Open") {
if (this.state._tag === "Done") {
return resume(this.state.exit)
}
this.state.awaiters.add(resume)
return core.sync(() => {
if (this.state._tag === "Open") {
if (this.state._tag !== "Done") {
this.state.awaiters.delete(resume)
}
})
})
unsafeSize(): Option.Option<number> {
return this.state._tag === "Open" ? Option.some(MutableList.length(this.messages)) : Option.none()
return this.state._tag !== "Done" ? Option.some(MutableList.length(this.messages)) : Option.none()
}
size = core.sync(() => this.unsafeSize())
}
Expand Down
11 changes: 0 additions & 11 deletions packages/effect/test/Mailbox.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,17 +49,6 @@ describe("Mailbox", () => {
assert.isFalse(result[1])
}))

it.effect("done completes offers", () =>
Effect.gen(function*() {
const mailbox = yield* Mailbox.make<number>(2)
const fiber = yield* mailbox.offerAll([1, 2, 3, 4]).pipe(
Effect.fork
)
yield* Effect.yieldNow()
yield* mailbox.done(Exit.void)
assert.deepStrictEqual(yield* fiber.await, Exit.succeed([3, 4]))
}))

it.effect("done completes takes", () =>
Effect.gen(function*() {
const mailbox = yield* Mailbox.make<number>(2)
Expand Down

0 comments on commit 7543277

Please sign in to comment.