Skip to content

Commit

Permalink
feat(sse): support bidirectional messaging (#66)
Browse files Browse the repository at this point in the history
  • Loading branch information
pi0 authored Aug 8, 2024
1 parent 2e49cc3 commit 8bd94c5
Show file tree
Hide file tree
Showing 6 changed files with 312 additions and 98 deletions.
83 changes: 66 additions & 17 deletions docs/2.adapters/sse.md
Original file line number Diff line number Diff line change
@@ -1,59 +1,108 @@
---
icon: oui:token-event
icon: clarity:two-way-arrows-line
---

# SSE

> Integrate crossws with [server-sent events](https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events).
> Integrate crossws with server-sent events and fetch-api.
If your deployment server is incapable of of handling WebSocket upgrades but support standard web API ([`Request`](https://developer.mozilla.org/en-US/docs/Web/API/Request) and [`Response`](https://developer.mozilla.org/en-US/docs/Web/API/Response)) you can integrate crossws to act as a one way (server to client) handler using [server-sent events](https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events).
If your deployment target does not supports handling WebSocket upgrades, crossws SSE adapter allows to add integration based on web platform standards ([`fetch`](https://developer.mozilla.org/en-US/docs/Web/API/Fetch_API) and [`EventSource`](https://developer.mozilla.org/en-US/docs/Web/API/EventSource))

> [!IMPORTANT]
> This is an experimental adapter and works only with a limited subset of crossws functionalities.
> This is an experimental adapter, requires server support and a different way of connection from clients.
> [!IMPORTANT]
> Instead of [`WebSocket`](https://developer.mozilla.org/en-US/docs/Web/API/WebSocket) client you need to use [`EventSource`](https://developer.mozilla.org/en-US/docs/Web/API/EventSource) as client to connect such server.
> [!NOTE]
> HTTP/2 server support is recommended in order to increase browser limitations about number of SSE connections (from 6 to 100) and also to allow bidirectional messaging with streaming.
## Usage

### Server side

Define adapter:

```ts
import sseAdapter from "crossws/adapters/sse";

const sse = sseAdapter({
const ws = sseAdapter({
bidir: true, // Enable bidirectional messaging support
hooks: {
upgrade(request) {
// Handle upgrade logic
// You can return a custom response to abort
// You can return { headers } to override default headers
// In case of bidirectional mode, extra auth is recommended based on request
// You can return a new Response() instead to abort
return {
headers: {},
};
},
open(peer) {
// Use this hook to send messages to peer
peer.send("hello!");
peer.send(`Welcome ${peer}`);
},
message(peer, message) {
// Accepting messages from peer (bidirectional mode)
console.log(`Message from ${peer}: ${message}`); // Message from <id>: ping
},
},
});
```

Inside your Web compatible server handler:
Inside your web server handler:

```js
async fetch(request) {
const url = new URL(request.url)

// Handle SSE
if (url.pathname === "/sse" && request.headers.get("accept") === "text/event-stream") {
return sse.fetch(request);
if (url.pathname === "/sse") {
return ws.fetch(request);
}

return new Response("server is up!")
return new Response("default page")
}
```

In order to connect to the server, you need to use [`EventSource`](https://developer.mozilla.org/en-US/docs/Web/API/EventSource) as client:
### Client side

In order to receive messages from server, you need to use an [`EventSource`](https://developer.mozilla.org/en-US/docs/Web/API/EventSource) client.

In order to send messages to the server make sure `bdir: true` option is enabled on the server, then you need to first wait for `crosswd-id` to get the peer id associated with connection and then use fetch calls to send messages to the server. You can optionally use a stream to send multiple messages to the server via single connection similar to WebSockets.

> [!NOTE]
> In theory, it is possible to bidirectional communication on a single HTTP/2 connection, however, due to a [current limitation in fetch standard](https://github.com/whatwg/fetch/issues/1254) we need 2 connections, one for receiving messages and one for sending.
```js
const ev = new EventSource("http://<server>/sse");

ev.addEventListener("message", (event) => {
console.log(event.data); // hello!
// Listen for messages from server
console.log("Message:", event.data); // Welcome <id>!
});

ev.addEventListener("crossws-id", (event) => {
// Using peer id we can send messages to the server
const peerId = event.id;

// Method 1: Send each message with a separated fetch call
fetch(url, {
method: "POST",
headers: { "x-crossws-id": peerId },
body: "ping", // message
});

// Method 2: Using body stream to send multiple messages (requires HTTP/2 + TLS)
fetch(url, {
method: "POST",
duplex: "half",
headers: {
"content-type": "application/octet-stream",
"x-crossws-id": peerId,
},
body: new ReadableStream({
start(controller) {
// You can send multiple messages to the server with single connection
controller.enqueue("ping");
},
}).pipeThrough(new TextEncoderStream()),
});
});
```

Expand Down
127 changes: 88 additions & 39 deletions src/adapters/sse.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import type { AdapterOptions, AdapterInstance } from "../adapter.ts";
import { toBufferLike } from "../utils.ts";
import { toString } from "../utils.ts";
import { defineWebSocketAdapter, adapterUtils } from "../adapter.ts";
import { AdapterHookable } from "../hooks.ts";
import { Message } from "../message.ts";
import { Peer } from "../peer.ts";

// --- types ---
Expand All @@ -10,43 +11,86 @@ export interface SSEAdapter extends AdapterInstance {
fetch(req: Request): Promise<Response>;
}

export interface SSEOptions extends AdapterOptions {}
export interface SSEOptions extends AdapterOptions {
bidir?: boolean;
}

// --- adapter ---

// https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events
export default defineWebSocketAdapter<SSEAdapter, SSEOptions>(
(options = {}) => {
const hooks = new AdapterHookable(options);
const peers = new Set<SSEPeer>();

return {
...adapterUtils(peers),
fetch: async (request: Request) => {
const _res = await hooks.callHook("upgrade", request);
if (_res instanceof Response) {
return _res;
}
export default defineWebSocketAdapter<SSEAdapter, SSEOptions>((opts = {}) => {
const hooks = new AdapterHookable(opts);
const peers = new Set<SSEPeer>();
const peersMap = opts.bidir ? new Map<string, SSEPeer>() : undefined;

return {
...adapterUtils(peers),
fetch: async (request: Request) => {
const _res = await hooks.callHook("upgrade", request);
if (_res instanceof Response) {
return _res;
}

const peer = new SSEPeer({ peers, sse: { request, hooks } });

let headers: HeadersInit = {
"Content-Type": "text/event-stream",
"Cache-Control": "no-cache",
Connection: "keep-alive",
};
if (_res?.headers) {
headers = new Headers(headers);
for (const [key, value] of new Headers(_res.headers)) {
headers.set(key, value);
let peer: SSEPeer;

if (opts.bidir && request.body && request.headers.has("x-crossws-id")) {
// Accept bidirectional streaming request
const id = request.headers.get("x-crossws-id")!;
peer = peersMap?.get(id) as SSEPeer;
if (!peer) {
return new Response("invalid peer id", { status: 400 });
}
const stream = request.body.pipeThrough(new TextDecoderStream());
try {
for await (const chunk of stream) {
hooks.callHook("message", peer, new Message(chunk));
}
} catch {
await stream.cancel().catch(() => {});
}
// eslint-disable-next-line unicorn/no-null
return new Response(null, {});
} else {
// Add a new peer
peer = new SSEPeer({
peers,
sse: {
request,
hooks,
onClose: () => {
peers.delete(peer);
if (opts.bidir) {
peersMap!.delete(peer.id);
}
},
},
});
peers.add(peer);
if (opts.bidir) {
peersMap!.set(peer.id, peer);
peer._sendEvent("crossws-id", peer.id);
}
}

return new Response(peer._sseStream, { ..._res, headers });
},
};
},
);
let headers: HeadersInit = {
"Content-Type": "text/event-stream",
"Cache-Control": "no-cache",
Connection: "keep-alive",
};
if (opts.bidir) {
headers["x-crossws-id"] = peer.id;
}
if (_res?.headers) {
headers = new Headers(headers);
for (const [key, value] of new Headers(_res.headers)) {
headers.set(key, value);
}
}

return new Response(peer._sseStream, { ..._res, headers });
},
};
});

// --- peer ---

Expand All @@ -55,10 +99,12 @@ class SSEPeer extends Peer<{
sse: {
request: Request;
hooks: AdapterHookable;
onClose: (peer: SSEPeer) => void;
};
}> {
_sseStream: ReadableStream;
_sseStreamController?: ReadableStreamDefaultController;

constructor(internal: SSEPeer["_internal"]) {
super(internal);
this._sseStream = new ReadableStream({
Expand All @@ -67,9 +113,10 @@ class SSEPeer extends Peer<{
this._internal.sse.hooks.callHook("open", this);
},
cancel: () => {
this._internal.sse.onClose(this);
this._internal.sse.hooks.callHook("close", this);
},
});
}).pipeThrough(new TextEncoderStream());
}

get url() {
Expand All @@ -80,21 +127,23 @@ class SSEPeer extends Peer<{
return this._internal.sse.request.headers;
}

_sendEvent(event: string, data: string) {
const lines = data.split("\n");
this._sseStreamController?.enqueue(
`event: ${event}\n${lines.map((l) => `data: ${l}`)}\n\n`,
);
}

send(message: any) {
let data = toBufferLike(message);
if (typeof data !== "string") {
// eslint-disable-next-line unicorn/prefer-code-point
data = btoa(String.fromCharCode(...new Uint8Array(data)));
}
this._sseStreamController?.enqueue(`event: message\ndata: ${data}\n\n`);
this._sendEvent("message", toString(message));
return 0;
}

publish(topic: string, message: any) {
const data = toBufferLike(message);
const data = toString(message);
for (const peer of this._internal.peers) {
if (peer !== this && peer._topics.has(topic)) {
peer._sseStreamController?.enqueue(data);
peer._sendEvent("message", data);
}
}
}
Expand Down
13 changes: 13 additions & 0 deletions src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,19 @@ export function toBufferLike(val: any): BufferLike {
return val;
}

export function toString(val: any): string {
if (typeof val === "string") {
return val;
}
const data = toBufferLike(val);
if (typeof data === "string") {
return data;
}
// eslint-disable-next-line unicorn/prefer-code-point
const base64 = btoa(String.fromCharCode(...new Uint8Array(data)));
return `data:application/octet-stream;base64,${base64}`;
}

// Forked from sindresorhus/is-plain-obj (MIT)
// Copyright (c) Sindre Sorhus <sindresorhus@gmail.com> (https://sindresorhus.com)
// From https://github.com/unjs/defu/blob/main/src/_utils.ts
Expand Down
30 changes: 17 additions & 13 deletions test/adapters/sse.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,22 @@ import { wsTestsExec } from "../_utils";
import EventSource from "eventsource";

describe("sse", () => {
wsTestsExec("bun run ./sse.ts", { adapter: "sse" }, (getURL, opts) => {
test("connects to the server", async () => {
const url = getURL().replace("ws", "http");
const ev = new EventSource(url);
const messages: string[] = [];
ev.addEventListener("message", (event) => {
messages.push(event.data);
wsTestsExec(
"deno run --unstable-byonm -A ./sse.ts",
{ adapter: "sse" },
(getURL) => {
test("connects to the server", async () => {
const url = getURL().replace("ws", "http") + "_sse";
const ev = new EventSource(url);
const messages: string[] = [];
ev.addEventListener("message", (event) => {
messages.push(event.data);
});
await new Promise((resolve) => ev.addEventListener("open", resolve));
ev.close();
expect(messages[0]).toMatch(/Welcome to the server \w+/);
expect(messages.length).toBe(1);
});
await new Promise((resolve) => ev.addEventListener("open", resolve));
ev.close();
expect(messages[0]).toMatch(/Welcome to the server \w+/);
expect(messages.length).toBe(1);
});
});
},
);
});
Loading

0 comments on commit 8bd94c5

Please sign in to comment.