diff --git a/docs/1.guide/5.pubsub.md b/docs/1.guide/5.pubsub.md index 54c19bb..b59c1a5 100644 --- a/docs/1.guide/5.pubsub.md +++ b/docs/1.guide/5.pubsub.md @@ -6,9 +6,6 @@ icon: simple-icons:googlepubsub CrossWS supports native pub-sub API integration. A [peer](/guide/peer) can be subscribed to a set of named channels using `peer.subscribe()`. Messages can be published to a channel using `peer.publish(, )`. -> [!IMPORTANT] -> Native pub/sub is currently only available for [Bun](/adapters/bun), [Node.js (uWebSockets)](/adapters/node#uwebsockets) and [Cloudflare (durable objects)](/adapters/cloudflare). - ```js import { defineHooks } from "crossws"; diff --git a/docs/2.adapters/cloudflare.md b/docs/2.adapters/cloudflare.md index ef08541..b23059a 100644 --- a/docs/2.adapters/cloudflare.md +++ b/docs/2.adapters/cloudflare.md @@ -8,6 +8,9 @@ icon: devicon-plain:cloudflareworkers To integrate CrossWS with your Cloudflare Workers, you need to check for the `upgrade` header. +> [!IMPORTANT] +> For [pub/sub](/guide/pubsub) support, you need to use [Durable objects](#durable-objects-support). + ```ts import wsAdapter from "crossws/adapters/cloudflare"; diff --git a/src/adapters/deno.ts b/src/adapters/deno.ts index af88f58..1c6b8c3 100644 --- a/src/adapters/deno.ts +++ b/src/adapters/deno.ts @@ -22,9 +22,14 @@ declare global { type WebSocketUpgrade = import("@deno/types").Deno.WebSocketUpgrade; type ServeHandlerInfo = unknown; // TODO +type DenoWSSharedState = { + peers: Set; +}; + export default defineWebSocketAdapter( (options = {}) => { const crossws = new CrossWS(options); + const sharedState: DenoWSSharedState = { peers: new Set() }; return { handleUpgrade: async (request, info) => { const res = await crossws.callHook("upgrade", request); @@ -36,8 +41,9 @@ export default defineWebSocketAdapter( headers: res?.headers, }); const peer = new DenoPeer({ - deno: { ws: upgrade.socket, request, info }, + deno: { ws: upgrade.socket, request, info, sharedState }, }); + sharedState.peers.add(peer); upgrade.socket.addEventListener("open", () => { crossws.callAdapterHook("deno:open", peer); crossws.callHook("open", peer); @@ -47,10 +53,12 @@ export default defineWebSocketAdapter( crossws.callHook("message", peer, new Message(event.data)); }); upgrade.socket.addEventListener("close", () => { + sharedState.peers.delete(peer); crossws.callAdapterHook("deno:close", peer); crossws.callHook("close", peer, {}); }); upgrade.socket.addEventListener("error", (error) => { + sharedState.peers.delete(peer); crossws.callAdapterHook("deno:error", peer, error); crossws.callHook("error", peer, new WSError(error)); }); @@ -65,6 +73,7 @@ class DenoPeer extends Peer<{ ws: WebSocketUpgrade["socket"]; request: Request; info: ServeHandlerInfo; + sharedState: DenoWSSharedState; }; }> { get addr() { @@ -89,6 +98,15 @@ class DenoPeer extends Peer<{ return 0; } + publish(topic: string, message: any): void { + const data = toBufferLike(message); + for (const peer of this.ctx.deno.sharedState.peers) { + if (peer !== this && peer._topics.has(topic)) { + peer.send(data); + } + } + } + close(code?: number, reason?: string) { this.ctx.deno.ws.close(code, reason); } diff --git a/test/adapters/deno.test.ts b/test/adapters/deno.test.ts index cc2d0a4..0c8b19e 100644 --- a/test/adapters/deno.test.ts +++ b/test/adapters/deno.test.ts @@ -2,5 +2,5 @@ import { describe } from "vitest"; import { wsTestsExec } from "../_utils"; describe("deno", () => { - wsTestsExec("deno run -A ./deno.ts", { pubsub: false, resHeaders: false }); + wsTestsExec("deno run -A ./deno.ts", { resHeaders: false }); });