Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: cloudflare durable objects support #54

Merged
merged 3 commits into from
Aug 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/1.guide/5.pubsub.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ icon: simple-icons:googlepubsub
CrossWS supports native pub-sub API integration. A [peer](/guide/peer) can be subscribed to a set of named channels using `peer.subscribe(<name>)`. Messages can be published to a channel using `peer.publish(<name>, <message>)`.

> [!IMPORTANT]
> Native pub/sub is currently only available for [Bun](/adapters/bun) and [Node.js](/adapters/node#uwebsockets).
> Native pub/sub is currently only available for [Bun](/adapters/bun), [Node.js (uWebSockets)](/adapters/node#uwebsockets) and [Cloudflare (durable objects)](/adapters/cloudflare).

```js
import { defineHooks } from "crossws";
Expand Down
67 changes: 65 additions & 2 deletions docs/2.adapters/cloudflare.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ To integrate CrossWS with your Cloudflare Workers, you need to check for the `up
```ts
import wsAdapter from "crossws/adapters/cloudflare";

const { handleUpgrade } = wsAdapter({
const ws = wsAdapter({
hooks: {
message: console.log,
},
Expand All @@ -20,7 +20,7 @@ const { handleUpgrade } = wsAdapter({
export default {
async fetch(request, env, context) {
if (request.headers.get("upgrade") === "websocket") {
return handleUpgrade(request, env, context);
return ws.handleUpgrade(request, env, context);
}
return new Response(
`<script>new WebSocket("ws://localhost:3000").addEventListener("open", (e) => e.target.send("Hello from client!"));</script>`,
Expand All @@ -40,3 +40,66 @@ export default {
::read-more
See [`playground/cloudflare.ts`](https://github.com/unjs/crossws/tree/main/playground/cloudflare.ts) for demo and [`src/adapters/cloudflare.ts`](https://github.com/unjs/crossws/tree/main/src/adapters/cloudflare.ts) for implementation.
::

## Durable objects support

To integrate CrossWS with Cloudflare [Durable Objects](https://developers.cloudflare.com/durable-objects/api/websockets/) (available on paid plans) with pub/sub and hibernation support, you need to check for the `upgrade` header and additionally export a Durable object with crossws adapter hooks integrated.

```js
import { DurableObject } from "cloudflare:workers";
import wsAdapter from "crossws/adapters/cloudflare-durable";

const ws = wsAdapter({
// bindingName: "$DurableObject",
// instanceName: "crossws",
hooks: {
message: console.log,
open(peer) {
peer.subscribe("chat");
peer.publish("chat", { user: "server", message: `${peer} joined!` });
},
},
});

export default {
async fetch(request, env, context) {
if (request.headers.get("upgrade") === "websocket") {
return ws.handleUpgrade(request, env, context);
}
return new Response(
`<script>new WebSocket("ws://localhost:3000").addEventListener("open", (e) => e.target.send("Hello from client!"));</script>`,
{ headers: { "content-type": "text/html" } },
);
},
};

export class $DurableObject extends DurableObject {
fetch(request) {
return ws.handleDurableUpgrade(this, request);
}

async webSocketMessage(client, message) {
return ws.handleDurableMessage(this, client, message);
}

async webSocketClose(client, code, reason, wasClean) {
return ws.handleDurableClose(this, client, code, reason, wasClean);
}
}
```

Update your `wrangler.toml` to specify Durable object:

```ini
[[durable_objects.bindings]]
name = "$DurableObject"
class_name = "$DurableObject"

[[migrations]]
tag = "v1"
new_classes = ["$DurableObject"]
```

::read-more
See [`playground/cloudflare-durable.ts`](https://github.com/unjs/crossws/tree/main/playground/cloudflare-durable.ts) for demo and [`src/adapters/cloudflare-durable.ts`](https://github.com/unjs/crossws/tree/main/src/adapters/cloudflare-durable.ts) for implementation.
::
8 changes: 0 additions & 8 deletions playground/_shared.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,3 @@ export function createDemo<T extends Adapter<any, any>>(
// resolve,
});
}

function stringify(val) {
const str = val.toString();
if (str === "[object Object]") {
return val.constructor?.name || "??";
}
return str;
}
42 changes: 42 additions & 0 deletions playground/cloudflare-durable.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
// You can run this demo using `npm run play:cf-durable` in repo
import type * as CF from "@cloudflare/workers-types";
import { DurableObject } from "cloudflare:workers";
import cloudflareAdapter from "../src/adapters/cloudflare-durable.ts";
import { createDemo, getIndexHTML } from "./_shared.ts";

const ws = createDemo(cloudflareAdapter);

export default {
async fetch(
request: Request,
env: Record<string, any>,
context: ExecutionContext,
): Promise<Response> {
if (request.headers.get("upgrade") === "websocket") {
return ws.handleUpgrade(request, env, context);
}

return new Response(await getIndexHTML(), {
headers: { "content-type": "text/html" },
});
},
};

export class $DurableObject extends DurableObject {
fetch(request: Request) {
return ws.handleDurableUpgrade(this, request);
}

async webSocketMessage(client: WebSocket, message: ArrayBuffer | string) {
return ws.handleDurableMessage(this, client, message);
}

async webSocketClose(
client: WebSocket,
code: number,
reason: string,
wasClean: boolean,
) {
return ws.handleDurableClose(this, client, code, reason, wasClean);
}
}
184 changes: 184 additions & 0 deletions src/adapters/cloudflare-durable.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
import type * as CF from "@cloudflare/workers-types";
import type { DurableObject } from "cloudflare:workers";
import { AdapterOptions, defineWebSocketAdapter } from "../types";
import { Peer } from "../peer";
import { Message } from "../message";
import { createCrossWS } from "../crossws";
import { toBufferLike } from "../_utils";

declare class DurableObjectPub extends DurableObject {
public ctx: DurableObject["ctx"];
public env: unknown;
}

type CrosswsState = {
topics?: Set<string>;
};

export interface CloudflareDurableAdapter {
handleUpgrade(
req: Request | CF.Request,
env: unknown,
context: CF.ExecutionContext,
): Promise<Response>;

handleDurableUpgrade(
obj: DurableObject,
req: Request | CF.Request,
): Promise<Response>;

handleDurableMessage(
obj: DurableObject,
ws: WebSocket | CF.WebSocket,
message: ArrayBuffer | string,
): Promise<void>;

handleDurableClose(
obj: DurableObject,
ws: WebSocket | CF.WebSocket,
code: number,
reason: string,
wasClean: boolean,
): Promise<void>;
}

export interface CloudflareOptions extends AdapterOptions {
bindingName?: string;
instanceName?: string;
}

export default defineWebSocketAdapter<
CloudflareDurableAdapter,
CloudflareOptions
>((opts) => {
const crossws = createCrossWS(opts);

const handleUpgrade: CloudflareDurableAdapter["handleUpgrade"] = async (
req,
env,
_context,
) => {
const bindingName = opts?.bindingName ?? "$DurableObject";
const instanceName = opts?.instanceName ?? "crossws";
const binding = (env as any)[bindingName] as CF.DurableObjectNamespace;
const id = binding.idFromName(instanceName);
const stub = binding.get(id);
return stub.fetch(req as CF.Request) as unknown as Response;
};

const handleDurableUpgrade: CloudflareDurableAdapter["handleDurableUpgrade"] =
async (obj, req) => {
const pair = new WebSocketPair();
const client = pair[0];
const server = pair[1];
const peer = peerFromDurableEvent(obj, server);

const { headers } = await crossws.upgrade(peer);

(obj as DurableObjectPub).ctx.acceptWebSocket(server);

crossws.$callHook("cloudflare:accept", peer);
crossws.callHook("open", peer);

// eslint-disable-next-line unicorn/no-null
return new Response(null, {
status: 101,
webSocket: client,
headers,
});
};

const handleDurableMessage: CloudflareDurableAdapter["handleDurableMessage"] =
async (obj, ws, message) => {
const peer = peerFromDurableEvent(obj, ws);
crossws.$callHook("cloudflare:message", peer, message);
crossws.callHook("message", peer, new Message(message));
};

const handleDurableClose: CloudflareDurableAdapter["handleDurableClose"] =
async (obj, ws, code, reason, wasClean) => {
const peer = peerFromDurableEvent(obj, ws);
const details = { code, reason, wasClean };
crossws.$callHook("cloudflare:close", peer, details);
crossws.callHook("close", peer, details);
ws.close(code, reason);
};

return {
handleUpgrade,
handleDurableUpgrade,
handleDurableClose,
handleDurableMessage,
};
});

function peerFromDurableEvent(
obj: DurableObject,
ws: WebSocket | CF.WebSocket,
) {
return new CloudflareDurablePeer({
cloudflare: {
ws: ws as CF.WebSocket,
env: (obj as DurableObjectPub).env,
context: (obj as DurableObjectPub).ctx,
},
});
}

class CloudflareDurablePeer extends Peer<{
cloudflare: {
ws: CF.WebSocket & { _crossws?: CrosswsState };
env: unknown;
context: DurableObject["ctx"];
};
}> {
get url() {
return this.ctx.cloudflare.ws.url || "";
}

get readyState() {
return this.ctx.cloudflare.ws.readyState as -1 | 0 | 1 | 2 | 3;
}

send(message: any) {
this.ctx.cloudflare.ws.send(toBufferLike(message));
return 0;
}

subscribe(topic: string): void {
super.subscribe(topic);
const state: CrosswsState = {
topics: this._subscriptions,
};
this.ctx.cloudflare.ws._crossws = state;
// Max limit: 2,048 bytes
this.ctx.cloudflare.ws.serializeAttachment(state);
}

publish(topic: string, message: any): void {
const clients =
this.ctx.cloudflare.context.getWebSockets() as unknown as (typeof this.ctx.cloudflare.ws)[];
if (clients.length === 0) {
return;
}
const data = toBufferLike(message);
for (const client of clients) {
let state = client._crossws;
if (!state) {
state = client._crossws =
client.deserializeAttachment() as CrosswsState;
}
if (state.topics?.has(topic)) {
client.send(data);
}
}
}

close(code?: number, reason?: string) {
this.ctx.cloudflare.ws.close(code, reason);
}

terminate(): void {
this.close();
}
}
2 changes: 1 addition & 1 deletion src/adapters/cloudflare.ts
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ class CloudflarePeer extends Peer<{
}

get headers() {
return this.ctx.cloudflare.req.headers as Headers;
return this.ctx.cloudflare.req.headers as unknown as Headers;
}

get readyState() {
Expand Down
1 change: 1 addition & 0 deletions src/adapters/deno.ts
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ class DenoPeer extends Peer<{
}

terminate(): void {
// @ts-ignore (terminate is Deno-only api)
this.ctx.deno.ws.terminate();
}
}
7 changes: 5 additions & 2 deletions tsconfig.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@
"moduleResolution": "Node",
"esModuleInterop": true,
"skipLibCheck": true,
"strict": true
"strict": true,
"allowImportingTsExtensions": true,
"noEmit": true,
"types": ["node", "@cloudflare/workers-types"]
},
"include": ["src"]
"include": ["src", "playground"]
}
11 changes: 10 additions & 1 deletion wrangler.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,13 @@
compatibility_date = "2024-01-01"
workers_dev = false

main = "playground/cloudflare.ts"
# main = "playground/cloudflare.ts"
main = "playground/cloudflare-durable.ts"

[[durable_objects.bindings]]
name = "$DurableObject"
class_name = "$DurableObject"

[[migrations]]
tag = "v1"
new_classes = ["$DurableObject"]
Loading