Skip to content

Commit

Permalink
properly handle multiple ports in SharedWorker (#2468)
Browse files Browse the repository at this point in the history
  • Loading branch information
tim-smart authored and mikearnaldi committed Apr 16, 2024
1 parent 6c6087a commit 6460414
Show file tree
Hide file tree
Showing 6 changed files with 100 additions and 48 deletions.
8 changes: 8 additions & 0 deletions .changeset/eight-birds-allow.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
---
"@effect/platform-browser": patch
"@effect/platform-node": patch
"@effect/platform-bun": patch
"@effect/platform": patch
---

properly handle multiple ports in SharedWorker
97 changes: 68 additions & 29 deletions packages/platform-browser/src/internal/workerRunner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,30 +2,47 @@ import { WorkerError } from "@effect/platform/WorkerError"
import * as Runner from "@effect/platform/WorkerRunner"
import * as Cause from "effect/Cause"
import * as Effect from "effect/Effect"
import * as FiberSet from "effect/FiberSet"
import { globalValue } from "effect/GlobalValue"
import * as Layer from "effect/Layer"
import * as Queue from "effect/Queue"
import * as Schedule from "effect/Schedule"

const cachedPorts = globalValue("@effect/platform-browser/Worker/cachedPorts", () => new Set<MessagePort>())
function globalHandleConnect(event: MessageEvent) {
cachedPorts.add((event as MessageEvent).ports[0])
}
if ("onconnect" in self) {
self.onconnect = globalHandleConnect
}

const platformRunnerImpl = Runner.PlatformRunner.of({
[Runner.PlatformRunnerTypeId]: Runner.PlatformRunnerTypeId,
start<I, O>(shutdown: Effect.Effect<void>) {
return Effect.gen(function*(_) {
const port = "postMessage" in self ?
self :
(yield* _(Effect.async<MessagePort, never, never>((resume, signal) => {
self.addEventListener("connect", function(event) {
const port = (event as MessageEvent).ports[0]
port.start()
resume(Effect.succeed(port))
}, { once: true, signal })
})))
const queue = yield* _(Queue.unbounded<I>())
yield* _(
let currentPortId = 0

const queue = yield* _(Queue.unbounded<readonly [portId: number, message: I]>())
const runFork = yield* _(FiberSet.makeRuntime<never>())
const ports = new Map<number, MessagePort>()
const send = (portId: number, message: O, transfer?: ReadonlyArray<unknown>) =>
Effect.sync(() => {
ports.get(portId)?.postMessage([1, message], {
transfer: transfer as any
})
})

function handlePort(port: MessagePort, sharedWorker: boolean) {
const portId = currentPortId++
ports.set(portId, port)

Effect.async<never, WorkerError, never>((resume) => {
function onMessage(event: MessageEvent) {
const message = (event as MessageEvent).data as Runner.BackingRunner.Message<I>
if (message[0] === 0) {
queue.unsafeOffer(message[1])
queue.unsafeOffer([portId, message[1]])
} else if (sharedWorker) {
resume(Effect.interrupt)
} else {
Effect.runFork(shutdown)
}
Expand All @@ -39,29 +56,51 @@ const platformRunnerImpl = Runner.PlatformRunner.of({
port.addEventListener("message", onMessage as any)
port.addEventListener("messageerror", onMessageError as any)
port.addEventListener("error", onError as any)

// ready
if ("start" in port) {
port.start()
}
port.postMessage([0])

return Effect.sync(() => {
port.removeEventListener("message", onMessage as any)
port.removeEventListener("messageerror", onMessageError as any)
port.removeEventListener("error", onError as any)
})
}),
Effect.tapErrorCause((cause) => Cause.isInterruptedOnly(cause) ? Effect.unit : Effect.logDebug(cause)),
Effect.retry(Schedule.forever),
Effect.annotateLogs({
package: "@effect/platform-browser",
module: "WorkerRunner"
}),
Effect.interruptible,
Effect.forkScoped
)
const send = (message: O, transfer?: ReadonlyArray<unknown>) =>
Effect.sync(() =>
port.postMessage([1, message], {
transfer: transfer as any
})
}).pipe(
Effect.tapErrorCause((cause) => Cause.isInterruptedOnly(cause) ? Effect.unit : Effect.logDebug(cause)),
Effect.retry(Schedule.forever),
Effect.annotateLogs({
package: "@effect/platform-browser",
module: "WorkerRunner"
}),
Effect.ensuring(Effect.sync(() => {
ports.delete(portId)
})),
Effect.interruptible,
runFork
)
// ready
port.postMessage([0])
}

if ("onconnect" in self) {
self.onconnect = function(event: MessageEvent) {
const port = (event as MessageEvent).ports[0]
handlePort(port, true)
}
yield* _(Effect.addFinalizer(() =>
Effect.sync(() => {
;(self as any).onconnect = globalHandleConnect
})
))
for (const port of cachedPorts) {
handlePort(port, true)
}
cachedPorts.clear()
} else {
handlePort(self as any, false)
}

return { queue, send }
})
}
Expand Down
6 changes: 3 additions & 3 deletions packages/platform-bun/src/internal/workerRunner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@ const platformRunnerImpl = Runner.PlatformRunner.of({
return yield* _(Effect.die("not in a worker"))
}
const port = self
const queue = yield* _(Queue.unbounded<I>())
const queue = yield* _(Queue.unbounded<readonly [portId: number, message: I]>())
yield* _(
Effect.async<never, WorkerError>((resume) => {
function onMessage(event: MessageEvent) {
const message = (event as MessageEvent).data as Runner.BackingRunner.Message<I>
if (message[0] === 0) {
queue.unsafeOffer(message[1])
queue.unsafeOffer([0, message[1]])
} else {
Effect.runFork(shutdown)
}
Expand All @@ -46,7 +46,7 @@ const platformRunnerImpl = Runner.PlatformRunner.of({
Effect.interruptible,
Effect.forkScoped
)
const send = (message: O, transfer?: ReadonlyArray<unknown>) =>
const send = (_portId: number, message: O, transfer?: ReadonlyArray<unknown>) =>
Effect.sync(() =>
port.postMessage([1, message], {
transfer: transfer as any
Expand Down
6 changes: 3 additions & 3 deletions packages/platform-node/src/internal/workerRunner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@ const platformRunnerImpl = Runner.PlatformRunner.of({
return yield* _(new WorkerError({ reason: "spawn", error: new Error("not in worker") }))
}
const port = WorkerThreads.parentPort
const queue = yield* _(Queue.unbounded<I>())
const queue = yield* _(Queue.unbounded<readonly [portId: number, message: I]>())
yield* _(
Effect.async<never, WorkerError>((resume) => {
port.on("message", (message: Runner.BackingRunner.Message<I>) => {
if (message[0] === 0) {
queue.unsafeOffer(message[1])
queue.unsafeOffer([0, message[1]])
} else {
Effect.runFork(shutdown)
}
Expand All @@ -41,7 +41,7 @@ const platformRunnerImpl = Runner.PlatformRunner.of({
Effect.interruptible,
Effect.forkScoped
)
const send = (message: O, transfers?: ReadonlyArray<unknown>) =>
const send = (_portId: number, message: O, transfers?: ReadonlyArray<unknown>) =>
Effect.sync(() => port.postMessage([1, message], transfers as any))
// ready
port.postMessage([0])
Expand Down
3 changes: 2 additions & 1 deletion packages/platform/src/WorkerRunner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@ import type { WorkerError } from "./WorkerError.js"
* @category models
*/
export interface BackingRunner<I, O> {
readonly queue: Queue.Dequeue<I>
readonly queue: Queue.Dequeue<readonly [portId: number, message: I]>
readonly send: (
portId: number,
message: O,
transfers?: ReadonlyArray<unknown>
) => Effect.Effect<void>
Expand Down
28 changes: 16 additions & 12 deletions packages/platform/src/internal/workerRunner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,15 +51,16 @@ export const make = <I, R, E, O>(
yield* _(
Queue.take(backing.queue),
options?.decode ?
Effect.flatMap((req): Effect.Effect<Worker.Worker.Request<I>, WorkerError> => {
Effect.flatMap((msg): Effect.Effect<readonly [portId: number, Worker.Worker.Request<I>], WorkerError> => {
const req = msg[1]
if (req[1] === 1) {
return Effect.succeed(req)
return Effect.succeed(msg)
}

return Effect.map(options.decode!(req[2]), (data) => [req[0], req[1], data, req[3]])
return Effect.map(options.decode!(req[2]), (data) => [msg[0], [req[0], req[1], data, req[3]]])
}) :
identity,
Effect.tap((req) => {
Effect.tap(([portId, req]) => {
const id = req[0]
if (req[1] === 1) {
const fiber = fiberMap.get(id)
Expand All @@ -79,7 +80,7 @@ export const make = <I, R, E, O>(
? Effect.provideService(options.encodeOutput(req[2], data), Transferable.Collector, collector)
: Effect.succeed(data),
Effect.flatMap((payload) =>
backing.send([id, 0, [payload]], [
backing.send(portId, [id, 0, [payload]], [
...transfers,
...collector.unsafeRead()
])
Expand All @@ -93,7 +94,7 @@ export const make = <I, R, E, O>(
if (options?.encodeOutput === undefined) {
const payload = Chunk.toReadonlyArray(data)
const transfers = options?.transfers ? payload.flatMap(options.transfers) : undefined
return backing.send([id, 0, payload], transfers)
return backing.send(portId, [id, 0, payload], transfers)
}

const transfers: Array<unknown> = []
Expand All @@ -110,12 +111,12 @@ export const make = <I, R, E, O>(
Effect.provideService(Transferable.Collector, collector),
Effect.flatMap((payload) => {
collector.unsafeRead().forEach((transfer) => transfers.push(transfer))
return backing.send([id, 0, payload], transfers)
return backing.send(portId, [id, 0, payload], transfers)
})
)
}),
Stream.runDrain,
Effect.andThen(backing.send([id, 1]))
Effect.andThen(backing.send(portId, [id, 1]))
)

if (req[3]) {
Expand All @@ -131,7 +132,8 @@ export const make = <I, R, E, O>(

return effect
}),
Effect.catchIf(isWorkerError, (error) => backing.send([id, 3, WorkerError.encodeCause(Cause.fail(error))])),
Effect.catchIf(isWorkerError, (error) =>
backing.send(portId, [id, 3, WorkerError.encodeCause(Cause.fail(error))])),
Effect.catchAllCause((cause) =>
Either.match(Cause.failureOrCause(cause), {
onLeft: (error) => {
Expand All @@ -146,15 +148,17 @@ export const make = <I, R, E, O>(
)
: Effect.succeed(error),
Effect.flatMap((payload) =>
backing.send([id, 2, payload as any], [
backing.send(portId, [id, 2, payload as any], [
...transfers,
...collector.unsafeRead()
])
),
Effect.catchAllCause((cause) => backing.send([id, 3, WorkerError.encodeCause(cause)]))
Effect.catchAllCause((cause) =>
backing.send(portId, [id, 3, WorkerError.encodeCause(cause)])
)
)
},
onRight: (cause) => backing.send([id, 3, WorkerError.encodeCause(cause)])
onRight: (cause) => backing.send(portId, [id, 3, WorkerError.encodeCause(cause)])
})
),
Effect.ensuring(Effect.sync(() => fiberMap.delete(id))),
Expand Down

0 comments on commit 6460414

Please sign in to comment.