Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(sse): support bidirectional messaging #66

Merged
merged 2 commits into from
Aug 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 66 additions & 17 deletions docs/2.adapters/sse.md
Original file line number Diff line number Diff line change
@@ -1,59 +1,108 @@
---
icon: oui:token-event
icon: clarity:two-way-arrows-line
---

# SSE

> Integrate crossws with [server-sent events](https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events).
> Integrate crossws with server-sent events and fetch-api.

If your deployment server is incapable of of handling WebSocket upgrades but support standard web API ([`Request`](https://developer.mozilla.org/en-US/docs/Web/API/Request) and [`Response`](https://developer.mozilla.org/en-US/docs/Web/API/Response)) you can integrate crossws to act as a one way (server to client) handler using [server-sent events](https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events).
If your deployment target does not supports handling WebSocket upgrades, crossws SSE adapter allows to add integration based on web platform standards ([`fetch`](https://developer.mozilla.org/en-US/docs/Web/API/Fetch_API) and [`EventSource`](https://developer.mozilla.org/en-US/docs/Web/API/EventSource))

> [!IMPORTANT]
> This is an experimental adapter and works only with a limited subset of crossws functionalities.
> This is an experimental adapter, requires server support and a different way of connection from clients.

> [!IMPORTANT]
> Instead of [`WebSocket`](https://developer.mozilla.org/en-US/docs/Web/API/WebSocket) client you need to use [`EventSource`](https://developer.mozilla.org/en-US/docs/Web/API/EventSource) as client to connect such server.
> [!NOTE]
> HTTP/2 server support is recommended in order to increase browser limitations about number of SSE connections (from 6 to 100) and also to allow bidirectional messaging with streaming.

## Usage

### Server side

Define adapter:

```ts
import sseAdapter from "crossws/adapters/sse";

const sse = sseAdapter({
const ws = sseAdapter({
bidir: true, // Enable bidirectional messaging support
hooks: {
upgrade(request) {
// Handle upgrade logic
// You can return a custom response to abort
// You can return { headers } to override default headers
// In case of bidirectional mode, extra auth is recommended based on request
// You can return a new Response() instead to abort
return {
headers: {},
};
},
open(peer) {
// Use this hook to send messages to peer
peer.send("hello!");
peer.send(`Welcome ${peer}`);
},
message(peer, message) {
// Accepting messages from peer (bidirectional mode)
console.log(`Message from ${peer}: ${message}`); // Message from <id>: ping
},
},
});
```

Inside your Web compatible server handler:
Inside your web server handler:

```js
async fetch(request) {
const url = new URL(request.url)

// Handle SSE
if (url.pathname === "/sse" && request.headers.get("accept") === "text/event-stream") {
return sse.fetch(request);
if (url.pathname === "/sse") {
return ws.fetch(request);
}

return new Response("server is up!")
return new Response("default page")
}
```

In order to connect to the server, you need to use [`EventSource`](https://developer.mozilla.org/en-US/docs/Web/API/EventSource) as client:
### Client side

In order to receive messages from server, you need to use an [`EventSource`](https://developer.mozilla.org/en-US/docs/Web/API/EventSource) client.

In order to send messages to the server make sure `bdir: true` option is enabled on the server, then you need to first wait for `crosswd-id` to get the peer id associated with connection and then use fetch calls to send messages to the server. You can optionally use a stream to send multiple messages to the server via single connection similar to WebSockets.

> [!NOTE]
> In theory, it is possible to bidirectional communication on a single HTTP/2 connection, however, due to a [current limitation in fetch standard](https://github.com/whatwg/fetch/issues/1254) we need 2 connections, one for receiving messages and one for sending.

```js
const ev = new EventSource("http://<server>/sse");

ev.addEventListener("message", (event) => {
console.log(event.data); // hello!
// Listen for messages from server
console.log("Message:", event.data); // Welcome <id>!
});

ev.addEventListener("crossws-id", (event) => {
// Using peer id we can send messages to the server
const peerId = event.id;

// Method 1: Send each message with a separated fetch call
fetch(url, {
method: "POST",
headers: { "x-crossws-id": peerId },
body: "ping", // message
});

// Method 2: Using body stream to send multiple messages (requires HTTP/2 + TLS)
fetch(url, {
method: "POST",
duplex: "half",
headers: {
"content-type": "application/octet-stream",
"x-crossws-id": peerId,
},
body: new ReadableStream({
start(controller) {
// You can send multiple messages to the server with single connection
controller.enqueue("ping");
},
}).pipeThrough(new TextEncoderStream()),
});
});
```

Expand Down
127 changes: 88 additions & 39 deletions src/adapters/sse.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import type { AdapterOptions, AdapterInstance } from "../adapter.ts";
import { toBufferLike } from "../utils.ts";
import { toString } from "../utils.ts";
import { defineWebSocketAdapter, adapterUtils } from "../adapter.ts";
import { AdapterHookable } from "../hooks.ts";
import { Message } from "../message.ts";
import { Peer } from "../peer.ts";

// --- types ---
Expand All @@ -10,43 +11,86 @@ export interface SSEAdapter extends AdapterInstance {
fetch(req: Request): Promise<Response>;
}

export interface SSEOptions extends AdapterOptions {}
export interface SSEOptions extends AdapterOptions {
bidir?: boolean;
}

// --- adapter ---

// https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events
export default defineWebSocketAdapter<SSEAdapter, SSEOptions>(
(options = {}) => {
const hooks = new AdapterHookable(options);
const peers = new Set<SSEPeer>();

return {
...adapterUtils(peers),
fetch: async (request: Request) => {
const _res = await hooks.callHook("upgrade", request);
if (_res instanceof Response) {
return _res;
}
export default defineWebSocketAdapter<SSEAdapter, SSEOptions>((opts = {}) => {
const hooks = new AdapterHookable(opts);
const peers = new Set<SSEPeer>();
const peersMap = opts.bidir ? new Map<string, SSEPeer>() : undefined;

return {
...adapterUtils(peers),
fetch: async (request: Request) => {
const _res = await hooks.callHook("upgrade", request);
if (_res instanceof Response) {
return _res;
}

const peer = new SSEPeer({ peers, sse: { request, hooks } });

let headers: HeadersInit = {
"Content-Type": "text/event-stream",
"Cache-Control": "no-cache",
Connection: "keep-alive",
};
if (_res?.headers) {
headers = new Headers(headers);
for (const [key, value] of new Headers(_res.headers)) {
headers.set(key, value);
let peer: SSEPeer;

if (opts.bidir && request.body && request.headers.has("x-crossws-id")) {
// Accept bidirectional streaming request
const id = request.headers.get("x-crossws-id")!;
peer = peersMap?.get(id) as SSEPeer;
if (!peer) {
return new Response("invalid peer id", { status: 400 });
}
const stream = request.body.pipeThrough(new TextDecoderStream());
try {
for await (const chunk of stream) {
hooks.callHook("message", peer, new Message(chunk));
}
} catch {
await stream.cancel().catch(() => {});
}
// eslint-disable-next-line unicorn/no-null
return new Response(null, {});
} else {
// Add a new peer
peer = new SSEPeer({
peers,
sse: {
request,
hooks,
onClose: () => {
peers.delete(peer);
if (opts.bidir) {
peersMap!.delete(peer.id);
}
},
},
});
peers.add(peer);
if (opts.bidir) {
peersMap!.set(peer.id, peer);
peer._sendEvent("crossws-id", peer.id);
}
}

return new Response(peer._sseStream, { ..._res, headers });
},
};
},
);
let headers: HeadersInit = {
"Content-Type": "text/event-stream",
"Cache-Control": "no-cache",
Connection: "keep-alive",
};
if (opts.bidir) {
headers["x-crossws-id"] = peer.id;
}
if (_res?.headers) {
headers = new Headers(headers);
for (const [key, value] of new Headers(_res.headers)) {
headers.set(key, value);
}
}

return new Response(peer._sseStream, { ..._res, headers });
},
};
});

// --- peer ---

Expand All @@ -55,10 +99,12 @@ class SSEPeer extends Peer<{
sse: {
request: Request;
hooks: AdapterHookable;
onClose: (peer: SSEPeer) => void;
};
}> {
_sseStream: ReadableStream;
_sseStreamController?: ReadableStreamDefaultController;

constructor(internal: SSEPeer["_internal"]) {
super(internal);
this._sseStream = new ReadableStream({
Expand All @@ -67,9 +113,10 @@ class SSEPeer extends Peer<{
this._internal.sse.hooks.callHook("open", this);
},
cancel: () => {
this._internal.sse.onClose(this);
this._internal.sse.hooks.callHook("close", this);
},
});
}).pipeThrough(new TextEncoderStream());
}

get url() {
Expand All @@ -80,21 +127,23 @@ class SSEPeer extends Peer<{
return this._internal.sse.request.headers;
}

_sendEvent(event: string, data: string) {
const lines = data.split("\n");
this._sseStreamController?.enqueue(
`event: ${event}\n${lines.map((l) => `data: ${l}`)}\n\n`,
);
}

send(message: any) {
let data = toBufferLike(message);
if (typeof data !== "string") {
// eslint-disable-next-line unicorn/prefer-code-point
data = btoa(String.fromCharCode(...new Uint8Array(data)));
}
this._sseStreamController?.enqueue(`event: message\ndata: ${data}\n\n`);
this._sendEvent("message", toString(message));
return 0;
}

publish(topic: string, message: any) {
const data = toBufferLike(message);
const data = toString(message);
for (const peer of this._internal.peers) {
if (peer !== this && peer._topics.has(topic)) {
peer._sseStreamController?.enqueue(data);
peer._sendEvent("message", data);
}
}
}
Expand Down
13 changes: 13 additions & 0 deletions src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,19 @@ export function toBufferLike(val: any): BufferLike {
return val;
}

export function toString(val: any): string {
if (typeof val === "string") {
return val;
}
const data = toBufferLike(val);
if (typeof data === "string") {
return data;
}
// eslint-disable-next-line unicorn/prefer-code-point
const base64 = btoa(String.fromCharCode(...new Uint8Array(data)));
return `data:application/octet-stream;base64,${base64}`;
}

// Forked from sindresorhus/is-plain-obj (MIT)
// Copyright (c) Sindre Sorhus <sindresorhus@gmail.com> (https://sindresorhus.com)
// From https://github.com/unjs/defu/blob/main/src/_utils.ts
Expand Down
30 changes: 17 additions & 13 deletions test/adapters/sse.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,22 @@ import { wsTestsExec } from "../_utils";
import EventSource from "eventsource";

describe("sse", () => {
wsTestsExec("bun run ./sse.ts", { adapter: "sse" }, (getURL, opts) => {
test("connects to the server", async () => {
const url = getURL().replace("ws", "http");
const ev = new EventSource(url);
const messages: string[] = [];
ev.addEventListener("message", (event) => {
messages.push(event.data);
wsTestsExec(
"deno run --unstable-byonm -A ./sse.ts",
{ adapter: "sse" },
(getURL) => {
test("connects to the server", async () => {
const url = getURL().replace("ws", "http") + "_sse";
const ev = new EventSource(url);
const messages: string[] = [];
ev.addEventListener("message", (event) => {
messages.push(event.data);
});
await new Promise((resolve) => ev.addEventListener("open", resolve));
ev.close();
expect(messages[0]).toMatch(/Welcome to the server \w+/);
expect(messages.length).toBe(1);
});
await new Promise((resolve) => ev.addEventListener("open", resolve));
ev.close();
expect(messages[0]).toMatch(/Welcome to the server \w+/);
expect(messages.length).toBe(1);
});
});
},
);
});
Loading
Loading