diff --git a/.changeset/new-dancers-ring.md b/.changeset/new-dancers-ring.md new file mode 100644 index 0000000000..58aff999f3 --- /dev/null +++ b/.changeset/new-dancers-ring.md @@ -0,0 +1,6 @@ +--- +"@effect/platform": patch +"@effect/rpc": patch +--- + +use Mailbox for Workers, Socket & Rpc diff --git a/.changeset/thick-dingos-melt.md b/.changeset/thick-dingos-melt.md new file mode 100644 index 0000000000..f9d00d4b7e --- /dev/null +++ b/.changeset/thick-dingos-melt.md @@ -0,0 +1,33 @@ +--- +"effect": minor +--- + +add Mailbox module, a queue which can have done or failure signals + +```ts +import { Effect, Mailbox } from "effect" +import * as assert from "node:assert" + +Effect.gen(function* () { + const mailbox = yield* Mailbox.make() + + // add messages to the mailbox + yield* mailbox.offer(1) + yield* mailbox.offer(2) + yield* mailbox.offerAll([3, 4, 5]) + + // take messages from the mailbox + const [messages, done] = yield* mailbox.takeAll + assert.deepStrictEqual(messages, [1, 2, 3, 4, 5]) + assert.strictEqual(done, false) + + // signal that the mailbox is done + yield* mailbox.end + const [messages2, done2] = yield* mailbox.takeAll + assert.deepStrictEqual(messages2, []) + assert.strictEqual(done2, true) + + // signal that the mailbox is failed + yield* mailbox.fail("boom") +}) +``` diff --git a/packages/effect/src/Mailbox.ts b/packages/effect/src/Mailbox.ts new file mode 100644 index 0000000000..6dfe4be184 --- /dev/null +++ b/packages/effect/src/Mailbox.ts @@ -0,0 +1,236 @@ +/** + * @since 3.8.0 + * @experimental + */ +import type { Cause, NoSuchElementException } from "./Cause.js" +import type { Channel } from "./Channel.js" +import type { Chunk } from "./Chunk.js" +import type { Effect } from "./Effect.js" +import type { Exit } from "./Exit.js" +import type { Inspectable } from "./Inspectable.js" +import * as internal from "./internal/mailbox.js" +import type { Option } from "./Option.js" +import { hasProperty } from "./Predicate.js" +import type { Stream } from "./Stream.js" + +/** + * @since 3.8.0 + * @experimental + * @category type ids + */ +export const TypeId: unique symbol = internal.TypeId + +/** + * @since 3.8.0 + * @experimental + * @category type ids + */ +export type TypeId = typeof TypeId + +/** + * @since 3.8.0 + * @experimental + * @category type ids + */ +export const ReadonlyTypeId: unique symbol = internal.ReadonlyTypeId + +/** + * @since 3.8.0 + * @experimental + * @category type ids + */ +export type ReadonlyTypeId = typeof ReadonlyTypeId + +/** + * @since 3.8.0 + * @experimental + * @category guards + */ +export const isMailbox = (u: unknown): u is Mailbox => hasProperty(u, TypeId) + +/** + * @since 3.8.0 + * @experimental + * @category guards + */ +export const isReadonlyMailbox = (u: unknown): u is ReadonlyMailbox => + hasProperty(u, ReadonlyTypeId) + +/** + * A `Mailbox` is a queue that can be signaled to be done or failed. + * + * @since 3.8.0 + * @experimental + * @category models + */ +export interface Mailbox extends ReadonlyMailbox { + readonly [TypeId]: TypeId + /** + * Add a message to the mailbox. Returns `false` if the mailbox is done. + */ + readonly offer: (message: A) => Effect + /** + * Add a message to the mailbox. Returns `false` if the mailbox is done. + */ + readonly unsafeOffer: (message: A) => boolean + /** + * Add multiple messages to the mailbox. Returns the remaining messages that + * were not added. + */ + readonly offerAll: (messages: Iterable) => Effect> + /** + * Add multiple messages to the mailbox. Returns the remaining messages that + * were not added. + */ + readonly unsafeOfferAll: (messages: Iterable) => Chunk + /** + * Fail the mailbox with an error. If the mailbox is already done, `false` is + * returned. + */ + readonly fail: (error: E) => Effect + /** + * Fail the mailbox with a cause. If the mailbox is already done, `false` is + * returned. + */ + readonly failCause: (cause: Cause) => Effect + /** + * Signal that the mailbox is complete. If the mailbox is already done, `false` is + * returned. + */ + readonly end: Effect + /** + * Signal that the mailbox is done. If the mailbox is already done, `false` is + * returned. + */ + readonly done: (exit: Exit) => Effect + /** + * Signal that the mailbox is done. If the mailbox is already done, `false` is + * returned. + */ + readonly unsafeDone: (exit: Exit) => boolean + /** + * Shutdown the mailbox, canceling any pending operations. + * If the mailbox is already done, `false` is returned. + */ + readonly shutdown: Effect +} + +/** + * A `ReadonlyMailbox` represents a mailbox that can only be read from. + * + * @since 3.8.0 + * @experimental + * @category models + */ +export interface ReadonlyMailbox + extends Effect, done: boolean], E>, Inspectable +{ + readonly [ReadonlyTypeId]: ReadonlyTypeId + /** + * Take all messages from the mailbox, returning an empty Chunk if the mailbox + * is empty or done. + */ + readonly clear: Effect, E> + /** + * Take all messages from the mailbox, or wait for messages to be available. + * + * If the mailbox is done, the `done` flag will be `true`. If the mailbox + * fails, the Effect will fail with the error. + */ + readonly takeAll: Effect, done: boolean], E> + /** + * Take a specified number of messages from the mailbox. It will only take + * up to the capacity of the mailbox. + * + * If the mailbox is done, the `done` flag will be `true`. If the mailbox + * fails, the Effect will fail with the error. + */ + readonly takeN: (n: number) => Effect, done: boolean], E> + /** + * Take a single message from the mailbox, or wait for a message to be + * available. + * + * If the mailbox is done, it will fail with `NoSuchElementException`. If the + * mailbox fails, the Effect will fail with the error. + */ + readonly take: Effect + /** Wait for the mailbox to be done. */ + readonly await: Effect + /** + * Check the size of the mailbox. + * + * If the mailbox is complete, it will return `None`. + */ + readonly size: Effect> + /** + * Check the size of the mailbox. + * + * If the mailbox is complete, it will return `None`. + */ + readonly unsafeSize: () => Option +} + +/** + * A `Mailbox` is a queue that can be signaled to be done or failed. + * + * @since 3.8.0 + * @experimental + * @category constructors + * @example + * import { Effect, Mailbox } from "effect" + * + * Effect.gen(function*() { + * const mailbox = yield* Mailbox.make() + * + * // add messages to the mailbox + * yield* mailbox.offer(1) + * yield* mailbox.offer(2) + * yield* mailbox.offerAll([3, 4, 5]) + * + * // take messages from the mailbox + * const [messages, done] = yield* mailbox.takeAll + * assert.deepStrictEqual(messages, [1, 2, 3, 4, 5]) + * assert.strictEqual(done, false) + * + * // signal that the mailbox is done + * yield* mailbox.end + * const [messages2, done2] = yield* mailbox.takeAll + * assert.deepStrictEqual(messages2, []) + * assert.strictEqual(done2, true) + * + * // signal that the mailbox has failed + * yield* mailbox.fail("boom") + * }) + */ +export const make: (capacity?: number | undefined) => Effect> = internal.make + +/** + * Run an `Effect` into a `Mailbox`, where success ends the mailbox and failure + * fails the mailbox. + * + * @since 3.8.0 + * @experimental + * @category combinators + */ +export const into: { + (self: Mailbox): (effect: Effect) => Effect + (effect: Effect, self: Mailbox): Effect +} = internal.into + +/** + * Create a `Channel` from a `Mailbox`. + * + * @since 3.8.0 + * @experimental + * @category conversions + */ +export const toChannel: (self: ReadonlyMailbox) => Channel, unknown, E> = internal.toChannel + +/** + * Create a `Stream` from a `Mailbox`. + * + * @since 3.8.0 + * @experimental + * @category conversions + */ +export const toStream: (self: ReadonlyMailbox) => Stream = internal.toStream diff --git a/packages/effect/src/index.ts b/packages/effect/src/index.ts index 121a8db705..cb90a4ac72 100644 --- a/packages/effect/src/index.ts +++ b/packages/effect/src/index.ts @@ -409,6 +409,12 @@ export * as LogSpan from "./LogSpan.js" */ export * as Logger from "./Logger.js" +/** + * @since 3.8.0 + * @experimental + */ +export * as Mailbox from "./Mailbox.js" + /** * @since 2.0.0 */ diff --git a/packages/effect/src/internal/core.ts b/packages/effect/src/internal/core.ts index 4b3350fba3..dcd6dcaad2 100644 --- a/packages/effect/src/internal/core.ts +++ b/packages/effect/src/internal/core.ts @@ -516,9 +516,7 @@ export const unsafeAsync = ( cancelerRef = register(resume) } effect.effect_instruction_i1 = blockingOn - return cancelerRef !== undefined ? - onInterrupt(effect, (_) => cancelerRef!) : - effect + return onInterrupt(effect, (_) => isEffect(cancelerRef) ? cancelerRef : void_) } /* @internal */ diff --git a/packages/effect/src/internal/mailbox.ts b/packages/effect/src/internal/mailbox.ts new file mode 100644 index 0000000000..9468a4e3a8 --- /dev/null +++ b/packages/effect/src/internal/mailbox.ts @@ -0,0 +1,463 @@ +import * as Arr from "../Array.js" +import type { Cause } from "../Cause.js" +import { NoSuchElementException } from "../Cause.js" +import type { Channel } from "../Channel.js" +import * as Chunk from "../Chunk.js" +import type { Effect } from "../Effect.js" +import * as Effectable from "../Effectable.js" +import type { Exit } from "../Exit.js" +import { dual } from "../Function.js" +import * as Inspectable from "../Inspectable.js" +import * as Iterable from "../Iterable.js" +import type * as Api from "../Mailbox.js" +import * as Option from "../Option.js" +import { pipeArguments } from "../Pipeable.js" +import { hasProperty } from "../Predicate.js" +import type { Scheduler } from "../Scheduler.js" +import type { Stream } from "../Stream.js" +import * as channel from "./channel.js" +import * as coreChannel from "./core-stream.js" +import * as core from "./core.js" +import * as stream from "./stream.js" + +/** @internal */ +export const TypeId: Api.TypeId = Symbol.for("effect/Mailbox") as Api.TypeId + +/** @internal */ +export const ReadonlyTypeId: Api.ReadonlyTypeId = Symbol.for("effect/Mailbox/ReadonlyMailbox") as Api.ReadonlyTypeId + +/** @internal */ +export const isMailbox = (u: unknown): u is Api.Mailbox => hasProperty(u, TypeId) + +/** @internal */ +export const isReadonlyMailbox = (u: unknown): u is Api.ReadonlyMailbox => + hasProperty(u, ReadonlyTypeId) + +type MailboxState = { + readonly _tag: "Open" + readonly takers: Set<(_: Effect) => void> + readonly offers: Set> + readonly awaiters: Set<(_: Effect) => void> +} | { + readonly _tag: "Closing" + readonly takers: Set<(_: Effect) => void> + readonly offers: Set> + readonly awaiters: Set<(_: Effect) => void> + readonly exit: Exit +} | { + readonly _tag: "Done" + readonly exit: Exit +} + +type OfferEntry = { + readonly _tag: "Array" + readonly remaining: Array + offset: number + readonly resume: (_: Effect>) => void +} | { + readonly _tag: "Single" + readonly message: A + readonly resume: (_: Effect) => void +} + +const empty = Chunk.empty() +const exitEmpty = core.exitSucceed(empty) +const exitFalse = core.exitSucceed(false) +const exitTrue = core.exitSucceed(true) +const constDone = [empty, true] as const + +class MailboxImpl extends Effectable.Class, done: boolean], E> + implements Api.Mailbox +{ + readonly [TypeId]: Api.TypeId = TypeId + readonly [ReadonlyTypeId]: Api.ReadonlyTypeId = ReadonlyTypeId + private state: MailboxState = { + _tag: "Open", + takers: new Set(), + offers: new Set(), + awaiters: new Set() + } + private messages: Array = [] + private messagesChunk = Chunk.empty() + constructor( + readonly scheduler: Scheduler, + readonly capacity: number + ) { + super() + } + + offer(message: A): Effect { + return core.suspend(() => { + if (this.state._tag !== "Open") { + return exitFalse + } else if (this.messages.length + this.messagesChunk.length >= this.capacity) { + return this.offerRemainingSingle(message) + } + this.messages.push(message) + this.scheduleReleaseTaker() + return exitTrue + }) + } + unsafeOffer(message: A): boolean { + if (this.state._tag !== "Open") { + return false + } else if (this.messages.length + this.messagesChunk.length >= this.capacity) { + return false + } + this.messages.push(message) + this.scheduleReleaseTaker() + return true + } + offerAll(messages: Iterable): Effect> { + return core.suspend(() => { + if (this.state._tag !== "Open") { + return core.succeed(Chunk.fromIterable(messages)) + } + const remaining = this.unsafeOfferAllArray(messages) + if (remaining.length === 0) { + return exitEmpty + } + return this.offerRemainingArray(remaining) + }) + } + unsafeOfferAll(messages: Iterable): Chunk.Chunk { + return Chunk.unsafeFromArray(this.unsafeOfferAllArray(messages)) + } + unsafeOfferAllArray(messages: Iterable): Array { + if (this.state._tag !== "Open") { + return Arr.fromIterable(messages) + } else if (this.capacity === Number.POSITIVE_INFINITY) { + if (this.messages.length > 0) { + this.messagesChunk = Chunk.appendAll(this.messagesChunk, Chunk.unsafeFromArray(this.messages)) + } + if (Chunk.isChunk(messages)) { + this.messagesChunk = Chunk.appendAll(this.messagesChunk, messages) + } else { + this.messages = Arr.fromIterable(messages) + } + this.scheduleReleaseTaker() + return [] + } + const free = this.capacity - this.messages.length - this.messagesChunk.length + if (free === 0) { + return Arr.fromIterable(messages) + } + const remaining: Array = [] + let i = 0 + for (const message of messages) { + if (i < free) { + this.messages.push(message) + } else { + remaining.push(message) + } + i++ + } + this.scheduleReleaseTaker() + return remaining + } + fail(error: E) { + return this.done(core.exitFail(error)) + } + failCause(cause: Cause) { + return this.done(core.exitFailCause(cause)) + } + unsafeDone(exit: Exit): boolean { + if (this.state._tag !== "Open") { + return false + } else if (this.state.offers.size === 0) { + this.finalize(exit) + return true + } + this.state = { ...this.state, _tag: "Closing", exit } + return true + } + shutdown: Effect = core.sync(() => { + if (this.state._tag === "Done") { + return true + } + const offers = this.state.offers + this.finalize(this.state._tag === "Open" ? core.exitVoid : this.state.exit) + if (offers.size > 0) { + for (const entry of offers) { + if (entry._tag === "Single") { + entry.resume(exitFalse) + } else { + entry.resume(core.exitSucceed(Chunk.unsafeFromArray(entry.remaining.slice(entry.offset)))) + } + } + offers.clear() + } + return true + }) + done(exit: Exit) { + return core.sync(() => this.unsafeDone(exit)) + } + end = this.done(core.exitVoid) + clear: Effect, E> = core.suspend(() => { + const messages = this.unsafeTakeAll() + if (messages.length === 0 && this.state._tag === "Done") { + return core.exitAs(this.state.exit, empty) + } + this.releaseCapacity() + return core.succeed(messages) + }) + takeAll: Effect, done: boolean], E> = core.suspend(() => { + const messages = this.unsafeTakeAll() + if (messages.length === 0) { + if (this.state._tag === "Done") { + return core.exitAs(this.state.exit, constDone) + } + return core.zipRight(this.awaitTake, this.takeAll) + } else if (this.state._tag === "Done") { + return core.succeed([messages, this.state.exit._tag === "Success"]) + } + this.releaseCapacity() + return core.succeed([messages, false]) + }) + takeN(n: number): Effect, done: boolean], E> { + return core.suspend(() => { + if (n <= 0) { + return this.state._tag === "Done" + ? core.exitAs(this.state.exit, constDone) + : core.succeed([empty, false]) + } + n = Math.min(n, this.capacity) + let messages: Chunk.Chunk + if (n <= this.messagesChunk.length) { + messages = Chunk.take(this.messagesChunk, n) + this.messagesChunk = Chunk.drop(this.messagesChunk, n) + } else if (n <= this.messages.length + this.messagesChunk.length) { + this.messagesChunk = Chunk.appendAll(this.messagesChunk, Chunk.unsafeFromArray(this.messages)) + this.messages = [] + messages = Chunk.take(this.messagesChunk, n) + this.messagesChunk = Chunk.drop(this.messagesChunk, n) + } else if (this.state._tag === "Done") { + return core.exitAs(this.state.exit, constDone) + } else { + return core.zipRight(this.awaitTake, this.takeN(n)) + } + if (this.state._tag === "Done") { + return core.succeed([messages, this.state.exit._tag === "Success"]) + } + this.releaseCapacity() + return core.succeed([messages, false]) + }) + } + take: Effect = core.suspend(() => { + let message: A + if (this.messagesChunk.length > 0) { + message = Chunk.unsafeHead(this.messagesChunk) + this.messagesChunk = Chunk.drop(this.messagesChunk, 1) + } else if (this.messages.length > 0) { + message = this.messages[0] + this.messagesChunk = Chunk.drop(Chunk.unsafeFromArray(this.messages), 1) + this.messages = [] + } else if (this.state._tag === "Done") { + return core.exitZipRight(this.state.exit, core.exitFail(new NoSuchElementException())) + } else { + return core.zipRight(this.awaitTake, this.take) + } + this.releaseCapacity() + return core.succeed(message) + }) + await: Effect = core.unsafeAsync((resume) => { + if (this.state._tag === "Done") { + return resume(this.state.exit) + } + this.state.awaiters.add(resume) + return core.sync(() => { + if (this.state._tag !== "Done") { + this.state.awaiters.delete(resume) + } + }) + }) + unsafeSize(): Option.Option { + const size = this.messages.length + this.messagesChunk.length + return this.state._tag === "Done" && size === 0 ? Option.none() : Option.some(size) + } + size = core.sync(() => this.unsafeSize()) + + commit() { + return this.takeAll + } + pipe() { + return pipeArguments(this, arguments) + } + toJSON() { + return { + _id: "effect/Mailbox", + state: this.state._tag, + size: this.unsafeSize().toJSON() + } + } + toString(): string { + return Inspectable.format(this) + } + [Inspectable.NodeInspectSymbol]() { + return Inspectable.format(this) + } + + private offerRemainingSingle(message: A) { + return core.unsafeAsync((resume) => { + if (this.state._tag !== "Open") { + return resume(exitFalse) + } + const entry: OfferEntry = { _tag: "Single", message, resume } + this.state.offers.add(entry) + return core.sync(() => { + if (this.state._tag === "Open") { + this.state.offers.delete(entry) + } + }) + }) + } + private offerRemainingArray(remaining: Array) { + return core.unsafeAsync>((resume) => { + if (this.state._tag !== "Open") { + return resume(core.exitSucceed(Chunk.unsafeFromArray(remaining))) + } + const entry: OfferEntry = { _tag: "Array", remaining, offset: 0, resume } + this.state.offers.add(entry) + return core.sync(() => { + if (this.state._tag === "Open") { + this.state.offers.delete(entry) + } + }) + }) + } + private releaseCapacity() { + if (this.state._tag === "Done" || this.state.offers.size === 0) { + return + } + let n = this.capacity - this.messages.length - this.messagesChunk.length + for (const entry of this.state.offers) { + if (n === 0) return + else if (entry._tag === "Single") { + this.messages.push(entry.message) + n-- + entry.resume(exitTrue) + this.state.offers.delete(entry) + } else { + for (; entry.offset < entry.remaining.length; entry.offset++) { + if (n === 0) return + this.messages.push(entry.remaining[entry.offset]) + n-- + } + entry.resume(exitEmpty) + this.state.offers.delete(entry) + } + } + if (this.state._tag === "Closing") { + this.finalize(this.state.exit) + } + } + private awaitTake = core.unsafeAsync((resume) => { + if (this.state._tag === "Done") { + return resume(this.state.exit) + } + this.state.takers.add(resume) + return core.sync(() => { + if (this.state._tag !== "Done") { + this.state.takers.delete(resume) + } + }) + }) + + private scheduleRunning = false + private scheduleReleaseTaker() { + if (this.scheduleRunning) { + return + } + this.scheduleRunning = true + this.scheduler.scheduleTask(this.releaseTaker, 0) + } + private releaseTaker = () => { + this.scheduleRunning = false + if (this.state._tag === "Done") { + return + } else if (this.state.takers.size === 0) { + return + } + const taker = Iterable.unsafeHead(this.state.takers) + this.state.takers.delete(taker) + taker(core.exitVoid) + } + + private unsafeTakeAll() { + if (this.messagesChunk.length > 0) { + const messages = this.messages.length > 0 ? + Chunk.appendAll(this.messagesChunk, Chunk.unsafeFromArray(this.messages)) : + this.messagesChunk + this.messagesChunk = empty + this.messages = [] + return messages + } else if (this.messages.length > 0) { + const messages = Chunk.unsafeFromArray(this.messages) + this.messages = [] + return messages + } + return empty + } + + private finalize(exit: Exit) { + if (this.state._tag === "Done") { + return + } + const openState = this.state + this.state = { _tag: "Done", exit } + for (const taker of openState.takers) { + taker(exit) + } + openState.takers.clear() + for (const awaiter of openState.awaiters) { + awaiter(exit) + } + openState.awaiters.clear() + } +} + +/** @internal */ +export const make = (capacity?: number | undefined): Effect> => + core.withFiberRuntime((fiber) => + core.succeed( + new MailboxImpl( + fiber.currentScheduler, + capacity ?? Number.POSITIVE_INFINITY + ) + ) + ) + +/** @internal */ +export const into: { + ( + self: Api.Mailbox + ): (effect: Effect) => Effect + ( + effect: Effect, + self: Api.Mailbox + ): Effect +} = dual( + 2, + ( + effect: Effect, + self: Api.Mailbox + ): Effect => + core.uninterruptibleMask((restore) => + core.matchCauseEffect(restore(effect), { + onFailure: (cause) => self.failCause(cause), + onSuccess: (_) => self.end + }) + ) +) + +/** @internal */ +export const toChannel = (self: Api.ReadonlyMailbox): Channel, unknown, E> => { + const loop: Channel, unknown, E> = coreChannel.flatMap(self.takeAll, ([messages, done]) => + done + ? messages.length === 0 ? coreChannel.void : coreChannel.write(messages) + : channel.zipRight(coreChannel.write(messages), loop)) + return loop +} + +/** @internal */ +export const toStream = (self: Api.ReadonlyMailbox): Stream => stream.fromChannel(toChannel(self)) diff --git a/packages/effect/test/Mailbox.test.ts b/packages/effect/test/Mailbox.test.ts new file mode 100644 index 0000000000..504e558329 --- /dev/null +++ b/packages/effect/test/Mailbox.test.ts @@ -0,0 +1,134 @@ +import { Chunk, Effect, Exit, Fiber, Mailbox, Option, Stream } from "effect" +import { assert, describe, it } from "effect/test/utils/extend" + +describe("Mailbox", () => { + it.effect("offerAll with capacity", () => + Effect.gen(function*() { + const mailbox = yield* Mailbox.make(2) + const fiber = yield* mailbox.offerAll([1, 2, 3, 4]).pipe( + Effect.fork + ) + yield* Effect.yieldNow({ priority: 1 }) + assert.isNull(fiber.unsafePoll()) + + let result = yield* mailbox + assert.deepStrictEqual(Chunk.toReadonlyArray(result[0]), [1, 2]) + assert.isFalse(result[1]) + + yield* Effect.yieldNow({ priority: 1 }) + assert.isNotNull(fiber.unsafePoll()) + + result = yield* mailbox.takeAll + assert.deepStrictEqual(Chunk.toReadonlyArray(result[0]), [3, 4]) + assert.isFalse(result[1]) + + yield* Effect.yieldNow({ priority: 1 }) + assert.deepStrictEqual(fiber.unsafePoll(), Exit.succeed(Chunk.empty())) + })) + + it.effect("offerAll can be interrupted", () => + Effect.gen(function*() { + const mailbox = yield* Mailbox.make(2) + const fiber = yield* mailbox.offerAll([1, 2, 3, 4]).pipe( + Effect.fork + ) + + yield* Effect.yieldNow({ priority: 1 }) + yield* Fiber.interrupt(fiber) + yield* Effect.yieldNow({ priority: 1 }) + + let result = yield* mailbox.takeAll + assert.deepStrictEqual(Chunk.toReadonlyArray(result[0]), [1, 2]) + assert.isFalse(result[1]) + + yield* mailbox.offer(5) + yield* Effect.yieldNow({ priority: 1 }) + + result = yield* mailbox.takeAll + assert.deepStrictEqual(Chunk.toReadonlyArray(result[0]), [5]) + assert.isFalse(result[1]) + })) + + it.effect("done completes takes", () => + Effect.gen(function*() { + const mailbox = yield* Mailbox.make(2) + const fiber = yield* mailbox.takeAll.pipe( + Effect.fork + ) + yield* Effect.yieldNow() + yield* mailbox.done(Exit.void) + assert.deepStrictEqual(yield* fiber.await, Exit.succeed([Chunk.empty(), true] as const)) + })) + + it.effect("end", () => + Effect.gen(function*() { + const mailbox = yield* Mailbox.make(2) + yield* Effect.fork(mailbox.offerAll([1, 2, 3, 4])) + yield* Effect.fork(mailbox.offerAll([5, 6, 7, 8])) + yield* Effect.fork(mailbox.offer(9)) + yield* Effect.fork(mailbox.end) + const items = yield* Stream.runCollect(Mailbox.toStream(mailbox)) + assert.deepStrictEqual(Chunk.toReadonlyArray(items), [1, 2, 3, 4, 5, 6, 7, 8, 9]) + assert.strictEqual(yield* mailbox.await, void 0) + assert.strictEqual(yield* mailbox.offer(10), false) + })) + + it.effect("end with take", () => + Effect.gen(function*() { + const mailbox = yield* Mailbox.make(2) + yield* Effect.fork(mailbox.offerAll([1, 2])) + yield* Effect.fork(mailbox.offer(3)) + yield* Effect.fork(mailbox.end) + assert.strictEqual(yield* mailbox.take, 1) + assert.strictEqual(yield* mailbox.take, 2) + assert.strictEqual(yield* mailbox.take, 3) + assert.strictEqual(yield* mailbox.take.pipe(Effect.optionFromOptional), Option.none()) + assert.strictEqual(yield* mailbox.await, void 0) + assert.strictEqual(yield* mailbox.offer(10), false) + })) + + it.effect("fail", () => + Effect.gen(function*() { + const mailbox = yield* Mailbox.make(2) + yield* Effect.fork(mailbox.offerAll([1, 2, 3, 4])) + yield* Effect.fork(mailbox.offer(5)) + yield* Effect.fork(mailbox.fail("boom")) + const takeArr = Effect.map(mailbox.takeAll, ([_]) => Chunk.toReadonlyArray(_)) + assert.deepStrictEqual(yield* takeArr, [1, 2]) + assert.deepStrictEqual(yield* takeArr, [3, 4]) + const [items, done] = yield* mailbox.takeAll + assert.deepStrictEqual(Chunk.toReadonlyArray(items), [5]) + assert.strictEqual(done, false) + const error = yield* mailbox.takeAll.pipe(Effect.flip) + assert.deepStrictEqual(error, "boom") + assert.strictEqual(yield* mailbox.await.pipe(Effect.flip), "boom") + assert.strictEqual(yield* mailbox.offer(6), false) + })) + + it.effect("shutdown", () => + Effect.gen(function*() { + const mailbox = yield* Mailbox.make(2) + yield* Effect.fork(mailbox.offerAll([1, 2, 3, 4])) + yield* Effect.fork(mailbox.offerAll([5, 6, 7, 8])) + yield* Effect.fork(mailbox.shutdown) + const items = yield* Stream.runCollect(Mailbox.toStream(mailbox)) + assert.deepStrictEqual(Chunk.toReadonlyArray(items), [1, 2]) + assert.strictEqual(yield* mailbox.await, void 0) + assert.strictEqual(yield* mailbox.offer(10), false) + })) + + it.effect("fail doesnt drop items", () => + Effect.gen(function*() { + const mailbox = yield* Mailbox.make(2) + yield* Effect.fork(mailbox.offerAll([1, 2, 3, 4])) + yield* Effect.fork(mailbox.offer(5)) + yield* Effect.fork(mailbox.fail("boom")) + const items: Array = [] + const error = yield* Mailbox.toStream(mailbox).pipe( + Stream.runForEach((item) => Effect.sync(() => items.push(item))), + Effect.flip + ) + assert.deepStrictEqual(items, [1, 2, 3, 4, 5]) + assert.strictEqual(error, "boom") + })) +}) diff --git a/packages/platform-bun/src/internal/httpServer.ts b/packages/platform-bun/src/internal/httpServer.ts index f4169b4bfb..eb10b0b9d7 100644 --- a/packages/platform-bun/src/internal/httpServer.ts +++ b/packages/platform-bun/src/internal/httpServer.ts @@ -412,16 +412,20 @@ class ServerRequestImpl extends Inspectable.Class implements ServerRequest.HttpS }) const writer = Effect.succeed(write) const runRaw = ( - handler: (_: Uint8Array | string) => Effect.Effect<_, E, R> + handler: (_: Uint8Array | string) => Effect.Effect<_, E, R> | void ): Effect.Effect => FiberSet.make().pipe( Effect.flatMap((set) => FiberSet.runtime(set)().pipe( Effect.flatMap((run) => { - ws.data.run = function(data: Uint8Array | string) { - run(handler(data)) + function runRaw(data: Uint8Array | string) { + const result = handler(data) + if (Effect.isEffect(result)) { + run(result) + } } - ws.data.buffer.forEach((data) => run(handler(data))) + ws.data.run = runRaw + ws.data.buffer.forEach(runRaw) ws.data.buffer.length = 0 return FiberSet.join(set) }) @@ -434,7 +438,7 @@ class ServerRequestImpl extends Inspectable.Class implements ServerRequest.HttpS ) const encoder = new TextEncoder() - const run = (handler: (_: Uint8Array) => Effect.Effect<_, E, R>) => + const run = (handler: (_: Uint8Array) => Effect.Effect<_, E, R> | void) => runRaw((data) => typeof data === "string" ? handler(encoder.encode(data)) : handler(data)) return Socket.Socket.of({ diff --git a/packages/platform-node-shared/src/NodeSocket.ts b/packages/platform-node-shared/src/NodeSocket.ts index da86239093..c5c0e06233 100644 --- a/packages/platform-node-shared/src/NodeSocket.ts +++ b/packages/platform-node-shared/src/NodeSocket.ts @@ -84,7 +84,7 @@ export const fromDuplex = ( Effect.bindTo("sendQueue"), Effect.bind("openContext", () => Effect.context>()), Effect.map(({ openContext, sendQueue }) => { - const run = (handler: (_: Uint8Array) => Effect.Effect<_, E, R>) => + const run = (handler: (_: Uint8Array) => Effect.Effect<_, E, R> | void) => Effect.scope.pipe( Effect.bindTo("scope"), Effect.bind("conn", ({ scope }) => @@ -121,7 +121,10 @@ export const fromDuplex = ( ), Effect.tap(({ conn, fiberSet, run }) => { conn.on("data", (chunk) => { - run(handler(chunk)) + const result = handler(chunk) + if (Effect.isEffect(result)) { + run(result) + } }) return Effect.async((resume) => { diff --git a/packages/platform/src/Socket.ts b/packages/platform/src/Socket.ts index c0a374829f..b880c21ff6 100644 --- a/packages/platform/src/Socket.ts +++ b/packages/platform/src/Socket.ts @@ -2,7 +2,7 @@ * @since 1.0.0 */ import * as Channel from "effect/Channel" -import * as Chunk from "effect/Chunk" +import type * as Chunk from "effect/Chunk" import * as Context from "effect/Context" import * as Deferred from "effect/Deferred" import type { DurationInput } from "effect/Duration" @@ -14,6 +14,7 @@ import * as FiberSet from "effect/FiberSet" import { dual } from "effect/Function" import { globalValue } from "effect/GlobalValue" import * as Layer from "effect/Layer" +import * as Mailbox from "effect/Mailbox" import * as Predicate from "effect/Predicate" import * as Queue from "effect/Queue" import * as Scope from "effect/Scope" @@ -52,11 +53,11 @@ export const Socket: Context.Tag = Context.GenericTag( */ export interface Socket { readonly [TypeId]: TypeId - readonly run: <_, E, R>( - handler: (_: Uint8Array) => Effect.Effect<_, E, R> + readonly run: <_, E = never, R = never>( + handler: (_: Uint8Array) => Effect.Effect<_, E, R> | void ) => Effect.Effect - readonly runRaw: <_, E, R>( - handler: (_: string | Uint8Array) => Effect.Effect<_, E, R> + readonly runRaw: <_, E = never, R = never>( + handler: (_: string | Uint8Array) => Effect.Effect<_, E, R> | void ) => Effect.Effect readonly writer: Effect.Effect< (chunk: Uint8Array | string | CloseEvent) => Effect.Effect, @@ -190,27 +191,25 @@ export const toChannelMap = ( > => Effect.scope.pipe( Effect.bindTo("scope"), - Effect.let("state", () => ({ finished: false, buffer: [] as Array })), - Effect.bind("semaphore", () => Effect.makeSemaphore(0)), + Effect.bind("mailbox", () => Mailbox.make()), Effect.bind("writeScope", ({ scope }) => Scope.fork(scope, ExecutionStrategy.sequential)), Effect.bind("write", ({ writeScope }) => Scope.extend(self.writer, writeScope)), - Effect.bind("deferred", () => Deferred.make()), Effect.let( "input", ( - { deferred, write, writeScope } + { mailbox, write, writeScope } ): AsyncProducer.AsyncInputProducer, unknown> => ({ awaitRead: () => Effect.void, emit(chunk) { return Effect.catchAllCause( Effect.forEach(chunk, write, { discard: true }), - (cause) => Deferred.failCause(deferred, cause) + (cause) => mailbox.failCause(cause) ) }, error(error) { return Effect.zipRight( Scope.close(writeScope, Exit.void), - Deferred.failCause(deferred, error) + mailbox.failCause(error) ) }, done() { @@ -218,35 +217,16 @@ export const toChannelMap = ( } }) ), - Effect.tap(({ deferred, scope, semaphore, state }) => + Effect.tap(({ mailbox, scope }) => self.runRaw((data) => { - state.buffer.push(f(data)) - return semaphore.release(1) + mailbox.unsafeOffer(f(data)) }).pipe( - Effect.intoDeferred(deferred), - Effect.raceFirst(Deferred.await(deferred)), - Effect.ensuring(Effect.suspend(() => { - state.finished = true - return semaphore.release(1) - })), + Mailbox.into(mailbox), Effect.forkIn(scope), Effect.interruptible ) ), - Effect.map(({ deferred, input, semaphore, state }) => { - const loop: Channel.Channel, unknown, SocketError | IE, unknown, void, unknown> = Channel.flatMap( - semaphore.take(1), - (_) => { - if (state.buffer.length === 0) { - return state.finished ? Deferred.await(deferred) : loop - } - const chunk = Chunk.unsafeFromArray(state.buffer) - state.buffer = [] - return Channel.zipRight(Channel.write(chunk), state.finished ? Deferred.await(deferred) : loop) - } - ) - return Channel.embedInput(loop, input) - }), + Effect.map(({ input, mailbox }) => Channel.embedInput(Mailbox.toChannel(mailbox), input)), Channel.unwrapScoped ) @@ -395,14 +375,7 @@ export const makeWebSocket = (url: string | Effect.Effect, options?: { (typeof url === "string" ? Effect.succeed(url) : url).pipe( Effect.flatMap((url) => Effect.map(WebSocketConstructor, (f) => f(url))) ), - (ws) => - Effect.sync(() => { - ws.onclose = null - ws.onerror = null - ws.onmessage = null - ws.onopen = null - return ws.close() - }) + (ws) => Effect.sync(() => ws.close()) ), options ) @@ -424,7 +397,7 @@ export const fromWebSocket = ( (sendQueue) => { const acquireContext = fiber.getFiberRef(FiberRef.currentContext) as Context.Context const closeCodeIsError = options?.closeCodeIsError ?? defaultCloseCodeIsError - const runRaw = <_, E, R>(handler: (_: string | Uint8Array) => Effect.Effect<_, E, R>) => + const runRaw = <_, E, R>(handler: (_: string | Uint8Array) => Effect.Effect<_, E, R> | void) => acquire.pipe( Effect.bindTo("ws"), Effect.bind("fiberSet", () => FiberSet.make()), @@ -433,16 +406,29 @@ export const fromWebSocket = ( Effect.tap(({ fiberSet, run, ws }) => { let open = false - ws.onmessage = (event) => { - run(handler( + function onMessage(event: MessageEvent) { + const result = handler( typeof event.data === "string" ? event.data : event.data instanceof Uint8Array ? event.data : new Uint8Array(event.data) - )) + ) + if (Effect.isEffect(result)) { + run(result) + } } - ws.onclose = (event) => { + function onError(cause: Event) { + ws.removeEventListener("message", onMessage) + ws.removeEventListener("close", onClose) + Deferred.unsafeDone( + fiberSet.deferred, + Effect.fail(new SocketGenericError({ reason: open ? "Read" : "Open", cause })) + ) + } + function onClose(event: globalThis.CloseEvent) { + ws.removeEventListener("message", onMessage) + ws.removeEventListener("error", onError) Deferred.unsafeDone( fiberSet.deferred, Effect.fail( @@ -454,19 +440,17 @@ export const fromWebSocket = ( ) ) } - ws.onerror = (cause) => { - Deferred.unsafeDone( - fiberSet.deferred, - Effect.fail(new SocketGenericError({ reason: open ? "Read" : "Open", cause })) - ) - } + + ws.addEventListener("close", onClose, { once: true }) + ws.addEventListener("error", onError, { once: true }) + ws.addEventListener("message", onMessage) if (ws.readyState !== 1) { const openDeferred = Deferred.unsafeMake(fiber.id()) - ws.onopen = () => { + ws.addEventListener("open", () => { open = true Deferred.unsafeDone(openDeferred, Effect.void) - } + }, { once: true }) return Deferred.await(openDeferred).pipe( Effect.timeoutFail({ duration: options?.openTimeout ?? 10000, @@ -514,7 +498,7 @@ export const fromWebSocket = ( ) const encoder = new TextEncoder() - const run = <_, E, R>(handler: (_: Uint8Array) => Effect.Effect<_, E, R>) => + const run = <_, E, R>(handler: (_: Uint8Array) => Effect.Effect<_, E, R> | void) => runRaw((data) => typeof data === "string" ? handler(encoder.encode(data)) @@ -600,7 +584,7 @@ export const fromTransformStream = (acquire: Effect.Effect { const acquireContext = fiber.getFiberRef(FiberRef.currentContext) as Context.Context const closeCodeIsError = options?.closeCodeIsError ?? defaultCloseCodeIsError - const runRaw = <_, E, R>(handler: (_: string | Uint8Array) => Effect.Effect<_, E, R>) => + const runRaw = <_, E, R>(handler: (_: string | Uint8Array) => Effect.Effect<_, E, R> | void) => acquire.pipe( Effect.bindTo("stream"), Effect.bind("reader", ({ stream }) => @@ -681,7 +665,7 @@ export const fromTransformStream = (acquire: Effect.Effect(handler: (_: Uint8Array) => Effect.Effect<_, E, R>) => + const run = <_, E, R>(handler: (_: Uint8Array) => Effect.Effect<_, E, R> | void) => runRaw((data) => typeof data === "string" ? handler(encoder.encode(data)) diff --git a/packages/platform/src/internal/worker.ts b/packages/platform/src/internal/worker.ts index 18b29adcf2..2e89e2d80c 100644 --- a/packages/platform/src/internal/worker.ts +++ b/packages/platform/src/internal/worker.ts @@ -1,8 +1,6 @@ import * as Schema from "@effect/schema/Schema" import * as Serializable from "@effect/schema/Serializable" -import * as Cause from "effect/Cause" import * as Channel from "effect/Channel" -import * as Chunk from "effect/Chunk" import * as Context from "effect/Context" import * as Deferred from "effect/Deferred" import * as Effect from "effect/Effect" @@ -11,9 +9,9 @@ import * as FiberRef from "effect/FiberRef" import * as FiberSet from "effect/FiberSet" import { identity, pipe } from "effect/Function" import * as Layer from "effect/Layer" +import * as Mailbox from "effect/Mailbox" import * as Option from "effect/Option" import * as Pool from "effect/Pool" -import * as Queue from "effect/Queue" import * as Schedule from "effect/Schedule" import * as Scope from "effect/Scope" import * as Stream from "effect/Stream" @@ -62,7 +60,7 @@ export const makeManager = Effect.gen(function*() { let requestIdCounter = 0 const requestMap = new Map< number, - Queue.Queue, E | WorkerError>> | Deferred.Deferred + Mailbox.Mailbox | Deferred.Deferred >() const collector = Transferable.unsafeMakeCollector() @@ -84,10 +82,10 @@ export const makeManager = Effect.gen(function*() { return handleMessage(message[1]) }).pipe( Effect.onError((cause) => - Effect.forEach(requestMap.values(), (queue) => - Deferred.DeferredTypeId in queue - ? Deferred.failCause(queue, cause) - : Queue.offer(queue, Exit.failCause(cause))) + Effect.forEach(requestMap.values(), (mailbox) => + Deferred.DeferredTypeId in mailbox + ? Deferred.failCause(mailbox, cause) + : mailbox.failCause(cause)) ), Effect.retry(Schedule.spaced(1000)), Effect.annotateLogs({ @@ -100,10 +98,10 @@ export const makeManager = Effect.gen(function*() { yield* Effect.addFinalizer(() => Effect.zipRight( - Effect.forEach(requestMap.values(), (queue) => - Deferred.DeferredTypeId in queue - ? Deferred.interrupt(queue) - : Queue.offer(queue, Exit.failCause(Cause.empty)), { + Effect.forEach(requestMap.values(), (mailbox) => + Deferred.DeferredTypeId in mailbox + ? Deferred.interrupt(mailbox) + : mailbox.end, { discard: true }), Effect.sync(() => requestMap.clear()) @@ -112,61 +110,58 @@ export const makeManager = Effect.gen(function*() { const handleMessage = (response: Worker.Worker.Response) => Effect.suspend(() => { - const queue = requestMap.get(response[0]) - if (!queue) return Effect.void + const mailbox = requestMap.get(response[0]) + if (!mailbox) return Effect.void switch (response[1]) { // data case 0: { - return Deferred.DeferredTypeId in queue - ? Deferred.succeed(queue, response[2][0]) - : Queue.offer(queue, Exit.succeed(response[2])) + return Deferred.DeferredTypeId in mailbox + ? Deferred.succeed(mailbox, response[2][0]) + : mailbox.offerAll(response[2]) } // end case 1: { if (response.length === 2) { - return Deferred.DeferredTypeId in queue - ? Deferred.interrupt(queue) - : Queue.offer(queue, Exit.failCause(Cause.empty)) + return Deferred.DeferredTypeId in mailbox + ? Deferred.interrupt(mailbox) + : mailbox.end } - return Deferred.DeferredTypeId in queue - ? Deferred.succeed(queue, response[2][0]) - : Effect.zipRight( - Queue.offer(queue, Exit.succeed(response[2])), - Queue.offer(queue, Exit.failCause(Cause.empty)) - ) + return Deferred.DeferredTypeId in mailbox + ? Deferred.succeed(mailbox, response[2][0]) + : Effect.zipRight(mailbox.offerAll(response[2]), mailbox.end) } // error / defect case 2: case 3: { if (response[1] === 2) { - return Deferred.DeferredTypeId in queue - ? Deferred.fail(queue, response[2]) - : Queue.offer(queue, Exit.fail(response[2])) + return Deferred.DeferredTypeId in mailbox + ? Deferred.fail(mailbox, response[2]) + : mailbox.fail(response[2]) } const cause = WorkerError.decodeCause(response[2]) - return Deferred.DeferredTypeId in queue - ? Deferred.failCause(queue, cause) - : Queue.offer(queue, Exit.failCause(cause)) + return Deferred.DeferredTypeId in mailbox + ? Deferred.failCause(mailbox, cause) + : mailbox.failCause(cause) } } }) const executeAcquire = < - Q extends Queue.Queue, E | WorkerError>> | Deferred.Deferred - >(request: I, makeQueue: Effect.Effect) => + Q extends Mailbox.Mailbox | Deferred.Deferred + >(request: I, makeMailbox: Effect.Effect) => Effect.withFiberRuntime<{ readonly id: number - readonly queue: Q + readonly mailbox: Q }>((fiber) => { const context = fiber.getFiberRef(FiberRef.currentContext) const span = Context.getOption(context, Tracer.ParentSpan).pipe( Option.filter((span): span is Tracer.Span => span._tag === "Span") ) const id = requestIdCounter++ - return makeQueue.pipe( - Effect.tap((queue) => { - requestMap.set(id, queue) + return makeMailbox.pipe( + Effect.tap((mailbox) => { + requestMap.set(id, mailbox) return wrappedEncode(request).pipe( Effect.tap((payload) => backing.send([ @@ -177,13 +172,13 @@ export const makeManager = Effect.gen(function*() { ], collector.unsafeRead()) ), Effect.catchAllCause((cause) => - Deferred.DeferredTypeId in queue ? - Deferred.failCause(queue, cause) : - Queue.offer(queue, Exit.failCause(cause)) + Mailbox.isMailbox(mailbox) + ? mailbox.failCause(cause) + : Deferred.failCause(mailbox, cause) ) ) }), - Effect.map((queue) => ({ id, queue })) + Effect.map((mailbox) => ({ id, mailbox })) ) }) @@ -197,18 +192,8 @@ export const makeManager = Effect.gen(function*() { const execute = (request: I) => Stream.fromChannel( Channel.acquireUseRelease( - executeAcquire(request, Queue.unbounded, E | WorkerError>>()), - ({ queue }) => { - const loop: Channel.Channel, unknown, E | WorkerError, unknown, void, unknown> = Channel - .flatMap( - Queue.take(queue), - Exit.match({ - onFailure: (cause) => Cause.isEmpty(cause) ? Channel.void : Channel.failCause(cause), - onSuccess: (value) => Channel.flatMap(Channel.write(Chunk.unsafeFromArray(value)), () => loop) - }) - ) - return loop - }, + executeAcquire(request, Mailbox.make()), + ({ mailbox }) => Mailbox.toChannel(mailbox), executeRelease ) ) @@ -216,7 +201,7 @@ export const makeManager = Effect.gen(function*() { const executeEffect = (request: I) => Effect.acquireUseRelease( executeAcquire(request, Deferred.make()), - ({ queue }) => Deferred.await(queue), + ({ mailbox }) => Deferred.await(mailbox), executeRelease ) diff --git a/packages/rpc/src/RpcRouter.ts b/packages/rpc/src/RpcRouter.ts index c3bf8aae33..a1113e25e5 100644 --- a/packages/rpc/src/RpcRouter.ts +++ b/packages/rpc/src/RpcRouter.ts @@ -11,9 +11,9 @@ import * as Context from "effect/Context" import * as Effect from "effect/Effect" import * as Exit from "effect/Exit" import { dual, pipe } from "effect/Function" +import * as Mailbox from "effect/Mailbox" import { type Pipeable, pipeArguments } from "effect/Pipeable" import * as Predicate from "effect/Predicate" -import * as Queue from "effect/Queue" import * as Stream from "effect/Stream" import { StreamRequestTypeId, withRequestTag } from "./internal/rpc.js" import * as Rpc from "./Rpc.js" @@ -172,21 +172,6 @@ export const provideService: { service: S ): RpcRouter> => fromSet(new Set([...self.rpcs].map(Rpc.provideService(tag, service))))) -const EOF = Symbol.for("@effect/rpc/Router/EOF") - -const channelFromQueue = (queue: Queue.Queue) => { - const loop: Channel.Channel> = Channel.flatMap( - Queue.takeBetween(queue, 1, Number.MAX_SAFE_INTEGER), - (chunk) => { - if (Chunk.unsafeLast(chunk) === EOF) { - return Channel.write(Chunk.dropRight(chunk as Chunk.Chunk, 1)) - } - return Channel.zipRight(Channel.write(chunk as Chunk.Chunk), loop) - } - ) - return loop -} - const emptyExit = Schema.encodeSync(Schema.Exit({ failure: Schema.Never, success: Schema.Never, @@ -219,8 +204,8 @@ export const toHandler = >(router: R, options?: { return (u: unknown): Stream.Stream> => pipe( decode(u), - Effect.zip(Queue.bounded(4)), - Effect.tap(([requests, queue]) => + Effect.zip(Mailbox.make(4)), + Effect.tap(([requests, mailbox]) => pipe( Effect.forEach(requests, (req, index) => { const [request, rpc] = req.request @@ -231,11 +216,11 @@ export const toHandler = >(router: R, options?: { Effect.flatMap(encode), Effect.orDie, Effect.matchCauseEffect({ - onSuccess: (response) => Queue.offer(queue, [index, response]), + onSuccess: (response) => mailbox.offer([index, response]), onFailure: (cause) => Effect.flatMap( encode(Exit.failCause(cause)), - (response) => Queue.offer(queue, [index, response]) + (response) => mailbox.offer([index, response]) ) }), Effect.locally(Rpc.currentHeaders, req.headers as any), @@ -259,16 +244,16 @@ export const toHandler = >(router: R, options?: { Channel.mapOutEffect((chunk) => Effect.flatMap( encode(Chunk.map(chunk, Exit.succeed)), - (response) => Queue.offer(queue, [index, response]) + (response) => mailbox.offer([index, response]) ) ), Channel.runDrain, Effect.matchCauseEffect({ - onSuccess: () => Queue.offer(queue, [index, [emptyExit]]), + onSuccess: () => mailbox.offer([index, [emptyExit]]), onFailure: (cause) => Effect.flatMap( encode(Chunk.of(Exit.failCause(cause))), - (response) => Queue.offer(queue, [index, response]) + (response) => mailbox.offer([index, response]) ) }), Effect.locally(Rpc.currentHeaders, req.headers as any), @@ -285,11 +270,11 @@ export const toHandler = >(router: R, options?: { }) ) }, { concurrency: "unbounded", discard: true }), - Effect.ensuring(Queue.offer(queue, EOF)), + Effect.ensuring(mailbox.end), Effect.forkScoped ) ), - Effect.map(([_, queue]) => Stream.fromChannel(channelFromQueue(queue))), + Effect.map(([_, mailbox]) => Mailbox.toStream(mailbox)), Stream.unwrapScoped ) }