Skip to content

Commit

Permalink
refactor!: overhaul internal impl (#55)
Browse files Browse the repository at this point in the history
  • Loading branch information
pi0 authored Aug 6, 2024
1 parent a46265c commit 48a9d6d
Show file tree
Hide file tree
Showing 19 changed files with 631 additions and 456 deletions.
71 changes: 40 additions & 31 deletions src/adapters/bun.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,80 +4,89 @@ import type { WebSocketHandler, ServerWebSocket, Server } from "bun";
import { Message } from "../message";
import { Peer } from "../peer";
import { AdapterOptions, defineWebSocketAdapter } from "../types";
import { createCrossWS } from "../crossws";
import { CrossWS } from "../crossws";
import { toBufferLike } from "../_utils";

export interface BunAdapter {
websocket: WebSocketHandler<ContextData>;
handleUpgrade(req: Request, server: Server): Promise<boolean>;
handleUpgrade(req: Request, server: Server): Promise<Response | undefined>;
}

export interface BunOptions extends AdapterOptions {}

type ContextData = {
_peer?: Peer;
req?: Request;
request?: Request;
requestUrl?: string;
server?: Server;
};

export default defineWebSocketAdapter<BunAdapter, BunOptions>(
(options = {}) => {
const crossws = createCrossWS(options);

const getPeer = (ws: ServerWebSocket<ContextData>) => {
if (ws.data?._peer) {
return ws.data._peer;
}
const peer = new BunPeer({ bun: { ws } });
ws.data = ws.data || {};
ws.data._peer = peer;
return peer;
};

const crossws = new CrossWS(options);
return {
async handleUpgrade(req: Request, server: Server) {
const { headers } = await crossws.upgrade({
url: req.url,
headers: req.headers,
});
return server.upgrade(req, {
data: { req, server },
headers,
async handleUpgrade(request, server) {
const res = await crossws.callHook("upgrade", request);
if (res instanceof Response) {
return res;
}
const upgradeOK = server.upgrade(request, {
data: {
server,
request,
requestUrl: request.url,
} satisfies ContextData,
headers: res?.headers,
});
return upgradeOK
? undefined
: new Response("Upgrade failed", { status: 500 });
},
websocket: {
message: (ws, message) => {
const peer = getPeer(ws);
crossws.$callHook("bun:message", peer, ws, message);
crossws.callAdapterHook("bun:message", peer, ws, message);
crossws.callHook("message", peer, new Message(message));
},
open: (ws) => {
const peer = getPeer(ws);
crossws.$callHook("bun:open", peer, ws);
crossws.callAdapterHook("bun:open", peer, ws);
crossws.callHook("open", peer);
},
close: (ws) => {
const peer = getPeer(ws);
crossws.$callHook("bun:close", peer, ws);
crossws.callAdapterHook("bun:close", peer, ws);
crossws.callHook("close", peer, {});
},
drain: (ws) => {
const peer = getPeer(ws);
crossws.$callHook("bun:drain", peer);
crossws.callAdapterHook("bun:drain", peer);
},
ping(ws, data) {
const peer = getPeer(ws);
crossws.$callHook("bun:ping", peer, ws, data);
crossws.callAdapterHook("bun:ping", peer, ws, data);
},
pong(ws, data) {
const peer = getPeer(ws);
crossws.$callHook("bun:pong", peer, ws, data);
crossws.callAdapterHook("bun:pong", peer, ws, data);
},
},
};
},
);

function getPeer(ws: ServerWebSocket<ContextData>) {
if (ws.data?._peer) {
return ws.data._peer;
}
const peer = new BunPeer({ bun: { ws } });
ws.data = {
...ws.data,
_peer: peer,
};
return peer;
}

class BunPeer extends Peer<{
bun: { ws: ServerWebSocket<ContextData> };
}> {
Expand All @@ -94,11 +103,11 @@ class BunPeer extends Peer<{
}

get url() {
return this.ctx.bun.ws.data.req?.url || "/";
return this.ctx.bun.ws.data.requestUrl || "/";
}

get headers() {
return this.ctx.bun.ws.data.req?.headers || new Headers();
return this.ctx.bun.ws.data.request?.headers;
}

send(message: any, options?: { compress?: boolean }) {
Expand Down
118 changes: 61 additions & 57 deletions src/adapters/cloudflare-durable.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,23 @@
// https://developers.cloudflare.com/durable-objects/examples/websocket-hibernation-server/

import type * as CF from "@cloudflare/workers-types";
import type { DurableObject } from "cloudflare:workers";
import { AdapterOptions, defineWebSocketAdapter } from "../types";
import { Peer } from "../peer";
import { Message } from "../message";
import { createCrossWS } from "../crossws";
import { CrossWS } from "../crossws";
import { toBufferLike } from "../_utils";

declare class DurableObjectPub extends DurableObject {
public ctx: DurableObject["ctx"];
public env: unknown;
}

type AugmentedWebSocket = CF.WebSocket & {
_crosswsState?: CrosswsState;
_crosswsPeer?: CloudflareDurablePeer;
};

type CrosswsState = {
topics?: Set<string>;
};
Expand Down Expand Up @@ -51,92 +58,88 @@ export default defineWebSocketAdapter<
CloudflareDurableAdapter,
CloudflareOptions
>((opts) => {
const crossws = createCrossWS(opts);

const handleUpgrade: CloudflareDurableAdapter["handleUpgrade"] = async (
req,
env,
_context,
) => {
const bindingName = opts?.bindingName ?? "$DurableObject";
const instanceName = opts?.instanceName ?? "crossws";
const binding = (env as any)[bindingName] as CF.DurableObjectNamespace;
const id = binding.idFromName(instanceName);
const stub = binding.get(id);
return stub.fetch(req as CF.Request) as unknown as Response;
};

const handleDurableUpgrade: CloudflareDurableAdapter["handleDurableUpgrade"] =
async (obj, req) => {
const crossws = new CrossWS(opts);
return {
handleUpgrade: async (req, env, _context) => {
const bindingName = opts?.bindingName ?? "$DurableObject";
const instanceName = opts?.instanceName ?? "crossws";
const binding = (env as any)[bindingName] as CF.DurableObjectNamespace;
const id = binding.idFromName(instanceName);
const stub = binding.get(id);
return stub.fetch(req as CF.Request) as unknown as Response;
},
handleDurableUpgrade: async (obj, request) => {
const res = await crossws.callHook("upgrade", request as Request);
if (res instanceof Response) {
return res;
}
const pair = new WebSocketPair();
const client = pair[0];
const server = pair[1];
const peer = peerFromDurableEvent(obj, server);

const { headers } = await crossws.upgrade(peer);

const peer = peerFromDurableEvent(
obj,
server as unknown as CF.WebSocket,
request,
);
(obj as DurableObjectPub).ctx.acceptWebSocket(server);

crossws.$callHook("cloudflare:accept", peer);
crossws.callAdapterHook("cloudflare:accept", peer);
crossws.callHook("open", peer);

// eslint-disable-next-line unicorn/no-null
return new Response(null, {
status: 101,
webSocket: client,
headers,
headers: res?.headers,
});
};

const handleDurableMessage: CloudflareDurableAdapter["handleDurableMessage"] =
async (obj, ws, message) => {
const peer = peerFromDurableEvent(obj, ws);
crossws.$callHook("cloudflare:message", peer, message);
},
handleDurableMessage: async (obj, ws, message) => {
const peer = peerFromDurableEvent(obj, ws as CF.WebSocket);
crossws.callAdapterHook("cloudflare:message", peer, message);
crossws.callHook("message", peer, new Message(message));
};

const handleDurableClose: CloudflareDurableAdapter["handleDurableClose"] =
async (obj, ws, code, reason, wasClean) => {
const peer = peerFromDurableEvent(obj, ws);
},
handleDurableClose: async (obj, ws, code, reason, wasClean) => {
const peer = peerFromDurableEvent(obj, ws as CF.WebSocket);
const details = { code, reason, wasClean };
crossws.$callHook("cloudflare:close", peer, details);
crossws.callAdapterHook("cloudflare:close", peer, details);
crossws.callHook("close", peer, details);
ws.close(code, reason);
};

return {
handleUpgrade,
handleDurableUpgrade,
handleDurableClose,
handleDurableMessage,
},
};
});

function peerFromDurableEvent(
obj: DurableObject,
ws: WebSocket | CF.WebSocket,
ws: AugmentedWebSocket,
request?: Request | CF.Request,
): CloudflareDurablePeer {
if ((ws as any)._crosswsPeer) {
return (ws as any)._crosswsPeer;
let peer = ws._crosswsPeer;
if (peer) {
return peer;
}
return ((ws as any)._crosswsPeer = new CloudflareDurablePeer({
peer = ws._crosswsPeer = new CloudflareDurablePeer({
cloudflare: {
ws: ws as CF.WebSocket,
request,
env: (obj as DurableObjectPub).env,
context: (obj as DurableObjectPub).ctx,
},
}));
});
return peer;
}

class CloudflareDurablePeer extends Peer<{
cloudflare: {
ws: CF.WebSocket & { _crossws?: CrosswsState };
ws: AugmentedWebSocket;
request?: Request | CF.Request;
env: unknown;
context: DurableObject["ctx"];
};
}> {
get url() {
return this.ctx.cloudflare.ws.url || "";
return this.ctx.cloudflare.request?.url || this.ctx.cloudflare.ws.url || "";
}

get headers() {
return this.ctx.cloudflare.request?.headers as Headers;
}

get readyState() {
Expand All @@ -151,10 +154,11 @@ class CloudflareDurablePeer extends Peer<{
subscribe(topic: string): void {
super.subscribe(topic);
const state: CrosswsState = {
topics: this._subscriptions,
// Max limit: 2,048 bytes
...(this.ctx.cloudflare.ws.deserializeAttachment() as CrosswsState),
topics: this._topics,
};
this.ctx.cloudflare.ws._crossws = state;
// Max limit: 2,048 bytes
this.ctx.cloudflare.ws._crosswsState = state;
this.ctx.cloudflare.ws.serializeAttachment(state);
}

Expand All @@ -167,9 +171,9 @@ class CloudflareDurablePeer extends Peer<{
}
const data = toBufferLike(message);
for (const client of clients) {
let state = client._crossws;
let state = client._crosswsState;
if (!state) {
state = client._crossws =
state = client._crosswsState =
client.deserializeAttachment() as CrosswsState;
}
if (state.topics?.has(topic)) {
Expand Down
Loading

0 comments on commit 48a9d6d

Please sign in to comment.