diff --git a/packages/effect/src/internal/mailbox.ts b/packages/effect/src/internal/mailbox.ts index 4b768213d9..875de46fbc 100644 --- a/packages/effect/src/internal/mailbox.ts +++ b/packages/effect/src/internal/mailbox.ts @@ -39,6 +39,16 @@ type MailboxState = { readonly resume: (_: Effect>) => void }> readonly awaiters: Set<(_: Effect) => void> +} | { + readonly _tag: "Closing" + readonly takers: Set<(_: Effect) => void> + readonly offers: Set<{ + readonly remaining: Array + offset: number + readonly resume: (_: Effect>) => void + }> + readonly awaiters: Set<(_: Effect) => void> + readonly exit: Exit } | { readonly _tag: "Done" readonly exit: Exit @@ -76,7 +86,7 @@ class MailboxImpl implements Api.Mailbox { }) } private releaseCapacity(n: number) { - if (this.state._tag !== "Open") { + if (this.state._tag === "Done") { return } else if (this.state.offers.size === 0) { return @@ -95,14 +105,17 @@ class MailboxImpl implements Api.Mailbox { this.state.offers.delete(entry) entry.resume(core.exitSucceed([])) } + if (this.state._tag === "Closing") { + this.finalize(this.state.exit) + } } private awaitTake = core.unsafeAsync((resume) => { - if (this.state._tag !== "Open") { + if (this.state._tag === "Done") { return resume(this.state.exit) } this.state.takers.add(resume) return core.sync(() => { - if (this.state._tag === "Open") { + if (this.state._tag !== "Done") { this.state.takers.delete(resume) } }) @@ -118,7 +131,7 @@ class MailboxImpl implements Api.Mailbox { } private releaseTaker = () => { this.scheduleRunning = false - if (this.state._tag !== "Open") { + if (this.state._tag === "Done") { return } else if (this.state.takers.size === 0) { return @@ -194,13 +207,19 @@ class MailboxImpl implements Api.Mailbox { unsafeDone(exit: Exit): boolean { if (this.state._tag !== "Open") { return false + } else if (this.state.offers.size === 0) { + this.finalize(exit) + return true + } + this.state = { ...this.state, _tag: "Closing", exit } + return true + } + private finalize(exit: Exit) { + if (this.state._tag === "Done") { + return } const openState = this.state this.state = { _tag: "Done", exit } - for (const entry of openState.offers) { - entry.resume(core.exitSucceed(entry.remaining.slice(entry.offset))) - } - openState.offers.clear() for (const taker of openState.takers) { taker(exit) } @@ -209,7 +228,6 @@ class MailboxImpl implements Api.Mailbox { awaiter(exit) } openState.awaiters.clear() - return true } done(exit: Exit) { return core.sync(() => this.unsafeDone(exit)) @@ -261,18 +279,18 @@ class MailboxImpl implements Api.Mailbox { return core.zipRight(this.awaitTake, this.take) }) await: Effect = core.unsafeAsync((resume) => { - if (this.state._tag !== "Open") { + if (this.state._tag === "Done") { return resume(this.state.exit) } this.state.awaiters.add(resume) return core.sync(() => { - if (this.state._tag === "Open") { + if (this.state._tag !== "Done") { this.state.awaiters.delete(resume) } }) }) unsafeSize(): Option.Option { - return this.state._tag === "Open" ? Option.some(MutableList.length(this.messages)) : Option.none() + return this.state._tag !== "Done" ? Option.some(MutableList.length(this.messages)) : Option.none() } size = core.sync(() => this.unsafeSize()) } diff --git a/packages/effect/test/Mailbox.test.ts b/packages/effect/test/Mailbox.test.ts index 4be31dc170..999848e59c 100644 --- a/packages/effect/test/Mailbox.test.ts +++ b/packages/effect/test/Mailbox.test.ts @@ -49,17 +49,6 @@ describe("Mailbox", () => { assert.isFalse(result[1]) })) - it.effect("done completes offers", () => - Effect.gen(function*() { - const mailbox = yield* Mailbox.make(2) - const fiber = yield* mailbox.offerAll([1, 2, 3, 4]).pipe( - Effect.fork - ) - yield* Effect.yieldNow() - yield* mailbox.done(Exit.void) - assert.deepStrictEqual(yield* fiber.await, Exit.succeed([3, 4])) - })) - it.effect("done completes takes", () => Effect.gen(function*() { const mailbox = yield* Mailbox.make(2)