Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: universal access to all peers #60

Merged
merged 4 commits into from
Aug 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 27 additions & 14 deletions src/adapters/bun.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,23 @@
import type { WebSocketHandler, ServerWebSocket, Server } from "bun";
import { Message } from "../message";
import { Peer } from "../peer";
import { AdapterOptions, defineWebSocketAdapter } from "../types";
import {
AdapterOptions,
AdapterInstance,
defineWebSocketAdapter,
} from "../types";
import { AdapterHookable } from "../hooks";
import { toBufferLike } from "../_utils";

export interface BunAdapter {
export interface BunAdapter extends AdapterInstance {
websocket: WebSocketHandler<ContextData>;
handleUpgrade(req: Request, server: Server): Promise<Response | undefined>;
}

export interface BunOptions extends AdapterOptions {}

type ContextData = {
_peer?: Peer;
_peer?: BunPeer;
request?: Request;
requestUrl?: string;
server?: Server;
Expand All @@ -24,7 +28,9 @@ type ContextData = {
export default defineWebSocketAdapter<BunAdapter, BunOptions>(
(options = {}) => {
const hooks = new AdapterHookable(options);
const peers = new Set<BunPeer>();
return {
peers,
async handleUpgrade(request, server) {
const res = await hooks.callHook("upgrade", request);
if (res instanceof Response) {
Expand All @@ -38,46 +44,52 @@ export default defineWebSocketAdapter<BunAdapter, BunOptions>(
} satisfies ContextData,
headers: res?.headers,
});
return upgradeOK
? undefined
: new Response("Upgrade failed", { status: 500 });
if (!upgradeOK) {
return new Response("Upgrade failed", { status: 500 });
}
},
websocket: {
message: (ws, message) => {
const peer = getPeer(ws);
const peer = getPeer(ws, peers);
hooks.callHook("message", peer, new Message(message));
},
open: (ws) => {
const peer = getPeer(ws);
const peer = getPeer(ws, peers);
peers.add(peer);
hooks.callAdapterHook("bun:open", peer, ws);
hooks.callHook("open", peer);
},
close: (ws) => {
const peer = getPeer(ws);
const peer = getPeer(ws, peers);
peers.delete(peer);
hooks.callAdapterHook("bun:close", peer, ws);
hooks.callHook("close", peer, {});
},
drain: (ws) => {
const peer = getPeer(ws);
const peer = getPeer(ws, peers);
hooks.callAdapterHook("bun:drain", peer);
},
ping(ws, data) {
const peer = getPeer(ws);
const peer = getPeer(ws, peers);
hooks.callAdapterHook("bun:ping", peer, ws, data);
},
pong(ws, data) {
const peer = getPeer(ws);
const peer = getPeer(ws, peers);
hooks.callAdapterHook("bun:pong", peer, ws, data);
},
},
};
},
);

function getPeer(ws: ServerWebSocket<ContextData>) {
function getPeer(
ws: ServerWebSocket<ContextData>,
peers: Set<BunPeer>,
): BunPeer {
if (ws.data?._peer) {
return ws.data._peer;
}
const peer = new BunPeer({ bun: { ws } });
const peer = new BunPeer({ peers, bun: { ws } });
ws.data = {
...ws.data,
_peer: peer,
Expand All @@ -86,6 +98,7 @@ function getPeer(ws: ServerWebSocket<ContextData>) {
}

class BunPeer extends Peer<{
peers: Set<BunPeer>;
bun: { ws: ServerWebSocket<ContextData> };
}> {
get addr() {
Expand Down
35 changes: 32 additions & 3 deletions src/adapters/cloudflare-durable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,11 @@

import type * as CF from "@cloudflare/workers-types";
import type { DurableObject } from "cloudflare:workers";
import { AdapterOptions, defineWebSocketAdapter } from "../types";
import {
AdapterOptions,
AdapterInstance,
defineWebSocketAdapter,
} from "../types";
import { Peer } from "../peer";
import { Message } from "../message";
import { AdapterHookable } from "../hooks";
Expand All @@ -22,7 +26,7 @@ type CrosswsState = {
topics?: Set<string>;
};

export interface CloudflareDurableAdapter {
export interface CloudflareDurableAdapter extends AdapterInstance {
handleUpgrade(
req: Request | CF.Request,
env: unknown,
Expand Down Expand Up @@ -59,7 +63,9 @@ export default defineWebSocketAdapter<
CloudflareOptions
>((opts) => {
const hooks = new AdapterHookable(opts);
const peers = new Set<CloudflareDurablePeer>();
return {
peers,
handleUpgrade: async (req, env, _context) => {
const bindingName = opts?.bindingName ?? "$DurableObject";
const instanceName = opts?.instanceName ?? "crossws";
Expand All @@ -81,6 +87,7 @@ export default defineWebSocketAdapter<
server as unknown as CF.WebSocket,
request,
);
peers.add(peer);
(obj as DurableObjectPub).ctx.acceptWebSocket(server);
hooks.callAdapterHook("cloudflare:accept", peer);
hooks.callHook("open", peer);
Expand All @@ -98,10 +105,10 @@ export default defineWebSocketAdapter<
},
handleDurableClose: async (obj, ws, code, reason, wasClean) => {
const peer = peerFromDurableEvent(obj, ws as CF.WebSocket);
peers.delete(peer);
const details = { code, reason, wasClean };
hooks.callAdapterHook("cloudflare:close", peer, details);
hooks.callHook("close", peer, details);
ws.close(code, reason);
},
};
});
Expand All @@ -127,6 +134,7 @@ function peerFromDurableEvent(
}

class CloudflareDurablePeer extends Peer<{
peers?: never;
cloudflare: {
ws: AugmentedWebSocket;
request?: Request | CF.Request;
Expand Down Expand Up @@ -166,6 +174,27 @@ class CloudflareDurablePeer extends Peer<{
this._internal.cloudflare.ws.serializeAttachment(state);
}

get peers() {
const clients =
this._internal.cloudflare.context.getWebSockets() as unknown as (typeof this._internal.cloudflare.ws)[];
return new Set(
clients.map((client) => {
let peer = client._crosswsPeer;
if (!peer) {
peer = client._crosswsPeer = new CloudflareDurablePeer({
cloudflare: {
ws: client,
request: undefined,
env: this._internal.cloudflare.env,
context: this._internal.cloudflare.context,
},
});
}
return peer;
}),
);
}

publish(topic: string, message: any): void {
const clients = (
this._internal.cloudflare.context.getWebSockets() as unknown as (typeof this._internal.cloudflare.ws)[]
Expand Down
25 changes: 23 additions & 2 deletions src/adapters/cloudflare.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,11 @@
import type * as _cf from "@cloudflare/workers-types";

import { Peer } from "../peer";
import { AdapterOptions, defineWebSocketAdapter } from "../types.js";
import {
AdapterOptions,
AdapterInstance,
defineWebSocketAdapter,
} from "../types.js";
import { Message } from "../message";
import { WSError } from "../error";
import { AdapterHookable } from "../hooks.js";
Expand All @@ -12,7 +16,7 @@ import { toBufferLike } from "../_utils";
declare const WebSocketPair: typeof _cf.WebSocketPair;
declare const Response: typeof _cf.Response;

export interface CloudflareAdapter {
export interface CloudflareAdapter extends AdapterInstance {
handleUpgrade(
req: _cf.Request,
env: unknown,
Expand All @@ -25,7 +29,9 @@ export interface CloudflareOptions extends AdapterOptions {}
export default defineWebSocketAdapter<CloudflareAdapter, CloudflareOptions>(
(options = {}) => {
const hooks = new AdapterHookable(options);
const peers = new Set<CloudflarePeer>();
return {
peers,
handleUpgrade: async (request, env, context) => {
const res = await hooks.callHook(
"upgrade",
Expand All @@ -38,8 +44,10 @@ export default defineWebSocketAdapter<CloudflareAdapter, CloudflareOptions>(
const client = pair[0];
const server = pair[1];
const peer = new CloudflarePeer({
peers,
cloudflare: { client, server, request, env, context },
});
peers.add(peer);
server.accept();
hooks.callAdapterHook("cloudflare:accept", peer);
hooks.callHook("open", peer);
Expand All @@ -48,10 +56,12 @@ export default defineWebSocketAdapter<CloudflareAdapter, CloudflareOptions>(
hooks.callHook("message", peer, new Message(event.data));
});
server.addEventListener("error", (event) => {
peers.delete(peer);
hooks.callAdapterHook("cloudflare:error", peer, event);
hooks.callHook("error", peer, new WSError(event.error));
});
server.addEventListener("close", (event) => {
peers.delete(peer);
hooks.callAdapterHook("cloudflare:close", peer, event);
hooks.callHook("close", peer, event);
});
Expand All @@ -67,6 +77,7 @@ export default defineWebSocketAdapter<CloudflareAdapter, CloudflareOptions>(
);

class CloudflarePeer extends Peer<{
peers: Set<CloudflarePeer>;
cloudflare: {
client: _cf.WebSocket;
server: _cf.WebSocket;
Expand Down Expand Up @@ -96,6 +107,16 @@ class CloudflarePeer extends Peer<{
return 0;
}

publish(_topic: string, _message: any): void {
// Not supported
// Throws: A hanging Promise was canceled
// for (const peer of this._internal.peers) {
// if (peer !== this && peer._topics.has(_topic)) {
// peer.publish(_topic, _message);
// }
// }
}

close(code?: number, reason?: string) {
this._internal.cloudflare.client.close(code, reason);
}
Expand Down
32 changes: 17 additions & 15 deletions src/adapters/deno.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,15 @@
import { Message } from "../message.ts";
import { WSError } from "../error.ts";
import { Peer } from "../peer.ts";
import { AdapterOptions, defineWebSocketAdapter } from "../types.ts";
import {
AdapterOptions,
AdapterInstance,
defineWebSocketAdapter,
} from "../types.ts";
import { AdapterHookable } from "../hooks.ts";
import { toBufferLike } from "../_utils.ts";

export interface DenoAdapter {
export interface DenoAdapter extends AdapterInstance {
handleUpgrade(req: Request, info: ServeHandlerInfo): Promise<Response>;
}

Expand All @@ -22,15 +26,12 @@ declare global {
type WebSocketUpgrade = import("@deno/types").Deno.WebSocketUpgrade;
type ServeHandlerInfo = unknown; // TODO

type DenoWSSharedState = {
peers: Set<DenoPeer>;
};

export default defineWebSocketAdapter<DenoAdapter, DenoOptions>(
(options = {}) => {
const hooks = new AdapterHookable(options);
const sharedState: DenoWSSharedState = { peers: new Set() };
const peers = new Set<DenoPeer>();
return {
peers,
handleUpgrade: async (request, info) => {
const res = await hooks.callHook("upgrade", request);
if (res instanceof Response) {
Expand All @@ -41,9 +42,10 @@ export default defineWebSocketAdapter<DenoAdapter, DenoOptions>(
headers: res?.headers,
});
const peer = new DenoPeer({
deno: { ws: upgrade.socket, request, info, sharedState },
peers,
deno: { ws: upgrade.socket, request, info },
});
sharedState.peers.add(peer);
peers.add(peer);
upgrade.socket.addEventListener("open", () => {
hooks.callAdapterHook("deno:open", peer);
hooks.callHook("open", peer);
Expand All @@ -53,12 +55,12 @@ export default defineWebSocketAdapter<DenoAdapter, DenoOptions>(
hooks.callHook("message", peer, new Message(event.data));
});
upgrade.socket.addEventListener("close", () => {
sharedState.peers.delete(peer);
peers.delete(peer);
hooks.callAdapterHook("deno:close", peer);
hooks.callHook("close", peer, {});
});
upgrade.socket.addEventListener("error", (error) => {
sharedState.peers.delete(peer);
peers.delete(peer);
hooks.callAdapterHook("deno:error", peer, error);
hooks.callHook("error", peer, new WSError(error));
});
Expand All @@ -69,11 +71,11 @@ export default defineWebSocketAdapter<DenoAdapter, DenoOptions>(
);

class DenoPeer extends Peer<{
peers: Set<DenoPeer>;
deno: {
ws: WebSocketUpgrade["socket"];
request: Request;
info: ServeHandlerInfo;
sharedState: DenoWSSharedState;
};
}> {
get addr() {
Expand All @@ -98,11 +100,11 @@ class DenoPeer extends Peer<{
return 0;
}

publish(topic: string, message: any): void {
publish(topic: string, message: any) {
const data = toBufferLike(message);
for (const peer of this._internal.deno.sharedState.peers) {
for (const peer of this._internal.peers) {
if (peer !== this && peer._topics.has(topic)) {
peer.send(data);
peer._internal.deno.ws.send(data);
}
}
}
Expand Down
Loading