From 8bd94c5e093d8a71cfa78243ae5a18d6385f31c7 Mon Sep 17 00:00:00 2001 From: Pooya Parsa Date: Thu, 8 Aug 2024 10:59:14 +0200 Subject: [PATCH] feat(sse): support bidirectional messaging (#66) --- docs/2.adapters/sse.md | 83 ++++++++++++++++++----- src/adapters/sse.ts | 127 +++++++++++++++++++++++++----------- src/utils.ts | 13 ++++ test/adapters/sse.test.ts | 30 +++++---- test/fixture/_index.html.ts | 44 +++++++++++-- test/fixture/sse.ts | 113 +++++++++++++++++++++++++------- 6 files changed, 312 insertions(+), 98 deletions(-) diff --git a/docs/2.adapters/sse.md b/docs/2.adapters/sse.md index 09aff81..8116607 100644 --- a/docs/2.adapters/sse.md +++ b/docs/2.adapters/sse.md @@ -1,59 +1,108 @@ --- -icon: oui:token-event +icon: clarity:two-way-arrows-line --- # SSE -> Integrate crossws with [server-sent events](https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events). +> Integrate crossws with server-sent events and fetch-api. -If your deployment server is incapable of of handling WebSocket upgrades but support standard web API ([`Request`](https://developer.mozilla.org/en-US/docs/Web/API/Request) and [`Response`](https://developer.mozilla.org/en-US/docs/Web/API/Response)) you can integrate crossws to act as a one way (server to client) handler using [server-sent events](https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events). +If your deployment target does not supports handling WebSocket upgrades, crossws SSE adapter allows to add integration based on web platform standards ([`fetch`](https://developer.mozilla.org/en-US/docs/Web/API/Fetch_API) and [`EventSource`](https://developer.mozilla.org/en-US/docs/Web/API/EventSource)) > [!IMPORTANT] -> This is an experimental adapter and works only with a limited subset of crossws functionalities. +> This is an experimental adapter, requires server support and a different way of connection from clients. -> [!IMPORTANT] -> Instead of [`WebSocket`](https://developer.mozilla.org/en-US/docs/Web/API/WebSocket) client you need to use [`EventSource`](https://developer.mozilla.org/en-US/docs/Web/API/EventSource) as client to connect such server. +> [!NOTE] +> HTTP/2 server support is recommended in order to increase browser limitations about number of SSE connections (from 6 to 100) and also to allow bidirectional messaging with streaming. + +## Usage + +### Server side + +Define adapter: ```ts import sseAdapter from "crossws/adapters/sse"; -const sse = sseAdapter({ +const ws = sseAdapter({ + bidir: true, // Enable bidirectional messaging support hooks: { upgrade(request) { - // Handle upgrade logic - // You can return a custom response to abort - // You can return { headers } to override default headers + // In case of bidirectional mode, extra auth is recommended based on request + // You can return a new Response() instead to abort + return { + headers: {}, + }; }, open(peer) { // Use this hook to send messages to peer - peer.send("hello!"); + peer.send(`Welcome ${peer}`); + }, + message(peer, message) { + // Accepting messages from peer (bidirectional mode) + console.log(`Message from ${peer}: ${message}`); // Message from : ping }, }, }); ``` -Inside your Web compatible server handler: +Inside your web server handler: ```js async fetch(request) { const url = new URL(request.url) // Handle SSE - if (url.pathname === "/sse" && request.headers.get("accept") === "text/event-stream") { - return sse.fetch(request); + if (url.pathname === "/sse") { + return ws.fetch(request); } - return new Response("server is up!") + return new Response("default page") } ``` -In order to connect to the server, you need to use [`EventSource`](https://developer.mozilla.org/en-US/docs/Web/API/EventSource) as client: +### Client side + +In order to receive messages from server, you need to use an [`EventSource`](https://developer.mozilla.org/en-US/docs/Web/API/EventSource) client. + +In order to send messages to the server make sure `bdir: true` option is enabled on the server, then you need to first wait for `crosswd-id` to get the peer id associated with connection and then use fetch calls to send messages to the server. You can optionally use a stream to send multiple messages to the server via single connection similar to WebSockets. + +> [!NOTE] +> In theory, it is possible to bidirectional communication on a single HTTP/2 connection, however, due to a [current limitation in fetch standard](https://github.com/whatwg/fetch/issues/1254) we need 2 connections, one for receiving messages and one for sending. ```js const ev = new EventSource("http:///sse"); ev.addEventListener("message", (event) => { - console.log(event.data); // hello! + // Listen for messages from server + console.log("Message:", event.data); // Welcome ! +}); + +ev.addEventListener("crossws-id", (event) => { + // Using peer id we can send messages to the server + const peerId = event.id; + + // Method 1: Send each message with a separated fetch call + fetch(url, { + method: "POST", + headers: { "x-crossws-id": peerId }, + body: "ping", // message + }); + + // Method 2: Using body stream to send multiple messages (requires HTTP/2 + TLS) + fetch(url, { + method: "POST", + duplex: "half", + headers: { + "content-type": "application/octet-stream", + "x-crossws-id": peerId, + }, + body: new ReadableStream({ + start(controller) { + // You can send multiple messages to the server with single connection + controller.enqueue("ping"); + }, + }).pipeThrough(new TextEncoderStream()), + }); }); ``` diff --git a/src/adapters/sse.ts b/src/adapters/sse.ts index 27a955e..70fcae9 100644 --- a/src/adapters/sse.ts +++ b/src/adapters/sse.ts @@ -1,7 +1,8 @@ import type { AdapterOptions, AdapterInstance } from "../adapter.ts"; -import { toBufferLike } from "../utils.ts"; +import { toString } from "../utils.ts"; import { defineWebSocketAdapter, adapterUtils } from "../adapter.ts"; import { AdapterHookable } from "../hooks.ts"; +import { Message } from "../message.ts"; import { Peer } from "../peer.ts"; // --- types --- @@ -10,43 +11,86 @@ export interface SSEAdapter extends AdapterInstance { fetch(req: Request): Promise; } -export interface SSEOptions extends AdapterOptions {} +export interface SSEOptions extends AdapterOptions { + bidir?: boolean; +} // --- adapter --- // https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events -export default defineWebSocketAdapter( - (options = {}) => { - const hooks = new AdapterHookable(options); - const peers = new Set(); - - return { - ...adapterUtils(peers), - fetch: async (request: Request) => { - const _res = await hooks.callHook("upgrade", request); - if (_res instanceof Response) { - return _res; - } +export default defineWebSocketAdapter((opts = {}) => { + const hooks = new AdapterHookable(opts); + const peers = new Set(); + const peersMap = opts.bidir ? new Map() : undefined; + + return { + ...adapterUtils(peers), + fetch: async (request: Request) => { + const _res = await hooks.callHook("upgrade", request); + if (_res instanceof Response) { + return _res; + } - const peer = new SSEPeer({ peers, sse: { request, hooks } }); - - let headers: HeadersInit = { - "Content-Type": "text/event-stream", - "Cache-Control": "no-cache", - Connection: "keep-alive", - }; - if (_res?.headers) { - headers = new Headers(headers); - for (const [key, value] of new Headers(_res.headers)) { - headers.set(key, value); + let peer: SSEPeer; + + if (opts.bidir && request.body && request.headers.has("x-crossws-id")) { + // Accept bidirectional streaming request + const id = request.headers.get("x-crossws-id")!; + peer = peersMap?.get(id) as SSEPeer; + if (!peer) { + return new Response("invalid peer id", { status: 400 }); + } + const stream = request.body.pipeThrough(new TextDecoderStream()); + try { + for await (const chunk of stream) { + hooks.callHook("message", peer, new Message(chunk)); } + } catch { + await stream.cancel().catch(() => {}); } + // eslint-disable-next-line unicorn/no-null + return new Response(null, {}); + } else { + // Add a new peer + peer = new SSEPeer({ + peers, + sse: { + request, + hooks, + onClose: () => { + peers.delete(peer); + if (opts.bidir) { + peersMap!.delete(peer.id); + } + }, + }, + }); + peers.add(peer); + if (opts.bidir) { + peersMap!.set(peer.id, peer); + peer._sendEvent("crossws-id", peer.id); + } + } - return new Response(peer._sseStream, { ..._res, headers }); - }, - }; - }, -); + let headers: HeadersInit = { + "Content-Type": "text/event-stream", + "Cache-Control": "no-cache", + Connection: "keep-alive", + }; + if (opts.bidir) { + headers["x-crossws-id"] = peer.id; + } + if (_res?.headers) { + headers = new Headers(headers); + for (const [key, value] of new Headers(_res.headers)) { + headers.set(key, value); + } + } + + return new Response(peer._sseStream, { ..._res, headers }); + }, + }; +}); // --- peer --- @@ -55,10 +99,12 @@ class SSEPeer extends Peer<{ sse: { request: Request; hooks: AdapterHookable; + onClose: (peer: SSEPeer) => void; }; }> { _sseStream: ReadableStream; _sseStreamController?: ReadableStreamDefaultController; + constructor(internal: SSEPeer["_internal"]) { super(internal); this._sseStream = new ReadableStream({ @@ -67,9 +113,10 @@ class SSEPeer extends Peer<{ this._internal.sse.hooks.callHook("open", this); }, cancel: () => { + this._internal.sse.onClose(this); this._internal.sse.hooks.callHook("close", this); }, - }); + }).pipeThrough(new TextEncoderStream()); } get url() { @@ -80,21 +127,23 @@ class SSEPeer extends Peer<{ return this._internal.sse.request.headers; } + _sendEvent(event: string, data: string) { + const lines = data.split("\n"); + this._sseStreamController?.enqueue( + `event: ${event}\n${lines.map((l) => `data: ${l}`)}\n\n`, + ); + } + send(message: any) { - let data = toBufferLike(message); - if (typeof data !== "string") { - // eslint-disable-next-line unicorn/prefer-code-point - data = btoa(String.fromCharCode(...new Uint8Array(data))); - } - this._sseStreamController?.enqueue(`event: message\ndata: ${data}\n\n`); + this._sendEvent("message", toString(message)); return 0; } publish(topic: string, message: any) { - const data = toBufferLike(message); + const data = toString(message); for (const peer of this._internal.peers) { if (peer !== this && peer._topics.has(topic)) { - peer._sseStreamController?.enqueue(data); + peer._sendEvent("message", data); } } } diff --git a/src/utils.ts b/src/utils.ts index d7f0b44..2b192f8 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -30,6 +30,19 @@ export function toBufferLike(val: any): BufferLike { return val; } +export function toString(val: any): string { + if (typeof val === "string") { + return val; + } + const data = toBufferLike(val); + if (typeof data === "string") { + return data; + } + // eslint-disable-next-line unicorn/prefer-code-point + const base64 = btoa(String.fromCharCode(...new Uint8Array(data))); + return `data:application/octet-stream;base64,${base64}`; +} + // Forked from sindresorhus/is-plain-obj (MIT) // Copyright (c) Sindre Sorhus (https://sindresorhus.com) // From https://github.com/unjs/defu/blob/main/src/_utils.ts diff --git a/test/adapters/sse.test.ts b/test/adapters/sse.test.ts index 3b5fabf..4a649cf 100644 --- a/test/adapters/sse.test.ts +++ b/test/adapters/sse.test.ts @@ -3,18 +3,22 @@ import { wsTestsExec } from "../_utils"; import EventSource from "eventsource"; describe("sse", () => { - wsTestsExec("bun run ./sse.ts", { adapter: "sse" }, (getURL, opts) => { - test("connects to the server", async () => { - const url = getURL().replace("ws", "http"); - const ev = new EventSource(url); - const messages: string[] = []; - ev.addEventListener("message", (event) => { - messages.push(event.data); + wsTestsExec( + "deno run --unstable-byonm -A ./sse.ts", + { adapter: "sse" }, + (getURL) => { + test("connects to the server", async () => { + const url = getURL().replace("ws", "http") + "_sse"; + const ev = new EventSource(url); + const messages: string[] = []; + ev.addEventListener("message", (event) => { + messages.push(event.data); + }); + await new Promise((resolve) => ev.addEventListener("open", resolve)); + ev.close(); + expect(messages[0]).toMatch(/Welcome to the server \w+/); + expect(messages.length).toBe(1); }); - await new Promise((resolve) => ev.addEventListener("open", resolve)); - ev.close(); - expect(messages[0]).toMatch(/Welcome to the server \w+/); - expect(messages.length).toBe(1); - }); - }); + }, + ); }); diff --git a/test/fixture/_index.html.ts b/test/fixture/_index.html.ts index 1651196..8f87bbe 100644 --- a/test/fixture/_index.html.ts +++ b/test/fixture/_index.html.ts @@ -60,6 +60,8 @@ export default function indexTemplate(opts: { sse?: boolean } = {}) { format(); }; + let _send = () => {}; + let ws; const connectWS = async () => { const isSecure = location.protocol === "https:"; @@ -67,6 +69,7 @@ export default function indexTemplate(opts: { sse?: boolean } = {}) { if (ws) { log("ws", "Closing previous connection before reconnecting..."); ws.close(); + _send = () => {}; clear(); } @@ -85,24 +88,55 @@ export default function indexTemplate(opts: { sse?: boolean } = {}) { }); await new Promise((resolve) => ws.addEventListener("open", resolve)); + _send = (message) => ws.send(message); log("ws", "Connected!"); }; let sse; const connectSSE = async () => { - const url = "/sse"; + const url = "/_sse"; if (sse) { log("sse", "Closing previous connection before reconnecting..."); sse.close(); clear(); + send = () => {}; } log("sse", "Connecting to", url, "..."); sse = new EventSource(url); + sse.addEventListener("crossws-id", (event) => { + const peerId = event.data; + + const sendWithFetch = _send = (message) => fetch(url, { + method: 'POST', + headers: { 'x-crossws-id': peerId }, + body: message, + }).catch((error) => { + log("sse", "Cannot send message:", error); + }); + + fetch(url, { + method: 'POST', + duplex: 'half', + headers: { + 'content-type': 'application/octet-stream', + 'x-crossws-id': event.data + }, + body: new ReadableStream({ + start(controller) { + _send = (message) => { + controller.enqueue(message); + } + }, + }).pipeThrough(new TextEncoderStream()), + }).catch((error) => { + _send = sendWithFetch; + }); + }); + sse.addEventListener("message", async (event) => { - console.log(event) - const data = typeof event.data === "string" ? event.data : await event.data.text(); + const data = event.data; const { user = "system", message = "" } = data.startsWith("{") ? JSON.parse(data) : { message: data }; @@ -125,14 +159,14 @@ export default function indexTemplate(opts: { sse?: boolean } = {}) { const send = () => { console.log("sending message..."); if (store.message) { - ws.send(store.message); + _send(store.message); } store.message = ""; }; const ping = () => { log("ws", "Sending ping"); - ws.send("ping"); + _send("ping"); }; createApp({ diff --git a/test/fixture/sse.ts b/test/fixture/sse.ts index 52f2e96..03f6798 100644 --- a/test/fixture/sse.ts +++ b/test/fixture/sse.ts @@ -1,26 +1,91 @@ // You can run this demo using `npm run play:sse` in repo -import sseAdapter from "../../src/adapters/sse"; -import { createDemo, getIndexHTML, handleDemoRoutes } from "./_shared"; - -const ws = createDemo(sseAdapter); - -Bun.serve({ - port: process.env.PORT || 3001, - hostname: "localhost", - async fetch(request) { - const response = handleDemoRoutes(ws, request); - if (response) { - return response; - } - - // Handle SSE - if (request.headers.get("accept") === "text/event-stream") { - return ws.fetch(request); - } - - return new Response(await getIndexHTML({ sse: true }), { - headers: { "Content-Type": "text/html" }, - }); - }, -}); +import sseAdapter from "../../src/adapters/sse.ts"; +import { createDemo, getIndexHTML, handleDemoRoutes } from "./_shared.ts"; + +const ws = createDemo(sseAdapter, { bidir: true }); + +const port = Number.parseInt(Deno.env.get("PORT") || "") || 3001; + +const handler = async (request: Request) => { + const response = handleDemoRoutes(ws, request); + if (response) { + return response; + } + + // Handle SSE + const url = new URL(request.url); + if (url.pathname === "/_sse") { + return ws.fetch(request); + } + + return new Response(await getIndexHTML({ sse: true }), { + headers: { "Content-Type": "text/html" }, + }); +}; + +// Start HTTP server +Deno.serve({ hostname: "localhost", port }, handler); + +// Start TLS server +Deno.serve({ hostname: "localhost", port: port + 1, ...getCert() }, handler); + +function getCert() { + return { + cert: `-----BEGIN CERTIFICATE----- +MIIEZzCCAs+gAwIBAgIQVyX6P9rDCswFmeOSj8BmPTANBgkqhkiG9w0BAQsFADCB +izEeMBwGA1UEChMVbWtjZXJ0IGRldmVsb3BtZW50IENBMTAwLgYDVQQLDCdwb295 +YUBQb295YXMtTGFwdG9wLmxvY2FsIChQb295YSBQYXJzYSkxNzA1BgNVBAMMLm1r +Y2VydCBwb295YUBQb295YXMtTGFwdG9wLmxvY2FsIChQb295YSBQYXJzYSkwHhcN +MjQwODA3MTM1NjU2WhcNMjYxMTA3MTQ1NjU2WjBbMScwJQYDVQQKEx5ta2NlcnQg +ZGV2ZWxvcG1lbnQgY2VydGlmaWNhdGUxMDAuBgNVBAsMJ3Bvb3lhQFBvb3lhcy1M +YXB0b3AubG9jYWwgKFBvb3lhIFBhcnNhKTCCASIwDQYJKoZIhvcNAQEBBQADggEP +ADCCAQoCggEBAMJmEyDqC8/JqJK95+rmVL+eHxcg0B7btm4j6T4Xw2ls0Wop+YOn +eJIPvSsmgo6JIWeTQ4c5oNt+KBSR8uJhkxg07qgZsgpmz4nRVk0/ctwF/eDlw0TE +0hAm/ZM7tossm2WbWJMJM9pVc3g8DYm8Y8N3aE7E12Kcc59oMMc7mpUkifUlsGT1 +lcvBevdbGcsBsN3sZnj1mCpG2x6GbhbY7knkv5uqwUb3PtGXUVQztZacxcKjUiCg +GausF0VJ2xnVp0kI62CZStF1vJFNL9KsIn89/ZwtoQErfqqHoWz/Xx3xVHft6FDB +T9y9XBRd2oI4GUAZwKmHltycWaXzY0ewnl0CAwEAAaN2MHQwDgYDVR0PAQH/BAQD +AgWgMBMGA1UdJQQMMAoGCCsGAQUFBwMBMB8GA1UdIwQYMBaAFClgBKwJPM5UroN+ +mpjlJakNc+mPMCwGA1UdEQQlMCOCCWxvY2FsaG9zdIcQAAAAAAAAAAAAAAAAAAAA +AYcEfwAAATANBgkqhkiG9w0BAQsFAAOCAYEAET312O3GMspkjF/P0SlXdpMFzZXk +CgxQ5LIvXfaNnheXNYUrdl/XMbZwB1ejp0TmMgv2mOucPyTq7gLsPDEULXelzNQg +GStbST/4bh7TVyfh1oLUkPFz2cKMEkzt2xdHNgqAcUn8ioYVAkU2Hf+1mppdh125 +bZOr5Ya3FDowhWU7FOiZlC2WVvNJ+rPcZS4xAWeT9XeCgSuYqGcG63f0cGAP7Ann +XR/Z1KVaa6Zh4PUynZmuBwoIVjzTUzRdfwrDmxr7SpadcTvug2qnmJe7SHPlH9QP +6Fk4Y1quGUbTpb4KmcZcq4fLP4tZOULzB9CBiiTlwBJwg1D661XZj4jjuJ+8qYCj +oyM4gguV/P9rqzdMgBOtzeheP7so//x5//oPA5NwXB+2tPLPIihffVAiPI6yULE4 +baiAvBhaCAxCuAhAFlILCiJXDbuH4FW8cisH/e7sTLiW9AVlmjcjeIHvWaupobI0 +6ucGp7VL+LZTK2FkIXShtkL5/tzm6jLNpuPp +-----END CERTIFICATE----- +`, + key: `-----BEGIN PRIVATE KEY----- +MIIEvAIBADANBgkqhkiG9w0BAQEFAASCBKYwggSiAgEAAoIBAQDCZhMg6gvPyaiS +vefq5lS/nh8XINAe27ZuI+k+F8NpbNFqKfmDp3iSD70rJoKOiSFnk0OHOaDbfigU +kfLiYZMYNO6oGbIKZs+J0VZNP3LcBf3g5cNExNIQJv2TO7aLLJtlm1iTCTPaVXN4 +PA2JvGPDd2hOxNdinHOfaDDHO5qVJIn1JbBk9ZXLwXr3WxnLAbDd7GZ49ZgqRtse +hm4W2O5J5L+bqsFG9z7Rl1FUM7WWnMXCo1IgoBmrrBdFSdsZ1adJCOtgmUrRdbyR +TS/SrCJ/Pf2cLaEBK36qh6Fs/18d8VR37ehQwU/cvVwUXdqCOBlAGcCph5bcnFml +82NHsJ5dAgMBAAECggEANZ4p0H49W8ZnNHIksWluHpviP2LRhHFdU+ubvYCYaU+W +Qw3owCNE4iRtLKWmhOHV0NeRXI7Miz20mFfZAg+fnqGa4cqUjMHmpECU6SGC8KTG +bW1x+lm/Bq16a02g/6oCAnhiacuz/ZhsDNGjekX3zkX1AyTPs2crjOAT9B/Adznt +Xf3pGRp8FgBFSTEbFEW+Pqy0Z345LTAZ6UpvXJl3BPb5bJovcyyZg/Mpoe2tQMex +o/uuYE52u3aY1sX7dq0qGVT0aDGrSFVjcdra3tb+nmlF43FPfC3EQtmoyhmiUXmn +lodSZ2IijVWI532nvsXRVJaFN5WWHJykEc05cTAKkQKBgQDwJvM2a/xvH/+MWpTx +6gPW9MoN+UcALMvY43NYubYmSyzYnzbPjDaN+djrN/eR6WLhycn8MPMcc33ydu1L +p7UEOWdfWrhzZXnA8CXsKSs/KpNdfjfyjCYD5dGEUwMuzUW6pB8+fOoi4rdBDaSC +KZ9UjgedNJz0e23u81uK9sJJ/wKBgQDPOi6GHAuaHbtQbPwGmyGBP3uyAKLblOMW +Q9mwJPv7SQO475lVEuYgYNOQPgIHQqeWMlRm7mrkfi4yNnGaMy5HT1GW8KlyDBEd +zMw3zF+OrRklEFGGnH0PlBQoQ9CegutClZCP5nh8k7aYJqNgNt44K5Yp1AxfnXlp +Ta6D/Lp/owKBgGwcLLsgK9je380QbiLlhWr8cgWOZa8ne3EdG60ilWRxzTOLoUIX +zetmQYfKfzH5jeE1VS+p3Ze+SkGf1j2Ltwq6yNV9YrHYSdJYicnh0q0x/ntFOex/ +uRFiIUrfj/w+vphCECqyUzj3NSYc/ST3ldmbwsO7jrjk492BQoGxik+DAoGAXY6S ++pEm28mYi9LoZcMb+VJD8jU/UYuCisbPPSs1aFmqiJAD1djWdL/CRFj6aXS6XKEU +YfQ55jbhfGIAH/IDbsZsu4yjs42nHKEdggOPEMctlwIrDG8SNzpPb25OfYH13PXR +cmZG91dpFIA9Om8LHKjw/qlxfKmH6vbbV1N+j6kCgYBW5bWZJ/VTlWTF6safPEkz +/NaBIetAk5WDsLt9fz/+ZXWw4AoomMFv2rA4zHDde9tS7NsQMeTNTZHlwazY6/Ek +hksf1vkN0QSwyL+eQoFbjB+ZkHqizXNyIbV/4i+JU9gnSyVVWzlVRmlk98RjBQCe +NxWZH4tpKI9i42Uv2aNVPg== +-----END PRIVATE KEY-----`, + }; +}