Skip to content

Commit

Permalink
refactor: better internal organization
Browse files Browse the repository at this point in the history
  • Loading branch information
pi0 committed Aug 6, 2024
1 parent a96dca3 commit 2744f21
Show file tree
Hide file tree
Showing 14 changed files with 333 additions and 305 deletions.
40 changes: 40 additions & 0 deletions src/adapter.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
import type { AdapterHooks, Hooks, ResolveHooks } from "./hooks.ts";
import type { Peer } from "./peer.ts";

export function adapterUtils(peers: Set<Peer>) {
return {
peers,
publish(topic: string, message: any, options) {
const firstPeer = peers.values().next().value as Peer;
if (firstPeer) {
firstPeer.send(message, options);
firstPeer.publish(topic, message, options);
}
},
} satisfies AdapterInstance;
}

// --- types ---

export interface AdapterInstance {
readonly peers: Set<Peer>;
readonly publish: Peer["publish"];
}

export interface AdapterOptions {
resolve?: ResolveHooks;
hooks?: Hooks;
adapterHooks?: AdapterHooks;
}

export type Adapter<
AdapterT extends AdapterInstance = AdapterInstance,
Options extends AdapterOptions = AdapterOptions,
> = (options?: Options) => AdapterT;

export function defineWebSocketAdapter<
AdapterT extends AdapterInstance = AdapterInstance,
Options extends AdapterOptions = AdapterOptions,
>(factory: Adapter<AdapterT, Options>) {
return factory;
}
25 changes: 14 additions & 11 deletions src/adapters/bun.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,13 @@
// https://bun.sh/docs/api/websockets

import type { WebSocketHandler, ServerWebSocket, Server } from "bun";
import { Message } from "../message";
import { Peer } from "../peer";
import {
AdapterOptions,
AdapterInstance,
defineWebSocketAdapter,
} from "../types";
import { AdapterHookable } from "../hooks";
import { adapterUtils, toBufferLike } from "../_utils";

import type { AdapterOptions, AdapterInstance } from "../adapter.ts";
import { toBufferLike } from "../utils.ts";
import { defineWebSocketAdapter, adapterUtils } from "../adapter.ts";
import { AdapterHookable } from "../hooks.ts";
import { Message } from "../message.ts";
import { Peer } from "../peer.ts";

// --- types ---

export interface BunAdapter extends AdapterInstance {
websocket: WebSocketHandler<ContextData>;
Expand All @@ -25,6 +23,9 @@ type ContextData = {
server?: Server;
};

// --- adapter ---

// https://bun.sh/docs/api/websockets
export default defineWebSocketAdapter<BunAdapter, BunOptions>(
(options = {}) => {
const hooks = new AdapterHookable(options);
Expand Down Expand Up @@ -82,6 +83,8 @@ export default defineWebSocketAdapter<BunAdapter, BunOptions>(
},
);

// --- peer ---

function getPeer(
ws: ServerWebSocket<ContextData>,
peers: Set<BunPeer>,
Expand Down
23 changes: 13 additions & 10 deletions src/adapters/cloudflare-durable.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,14 @@
// https://developers.cloudflare.com/durable-objects/examples/websocket-hibernation-server/
import type { AdapterOptions, AdapterInstance } from "../adapter.ts";
import { toBufferLike } from "../utils.ts";
import { defineWebSocketAdapter, adapterUtils } from "../adapter.ts";
import { AdapterHookable } from "../hooks.ts";
import { Message } from "../message.ts";
import { Peer } from "../peer.ts";

import type * as CF from "@cloudflare/workers-types";
import type { DurableObject } from "cloudflare:workers";
import {
AdapterOptions,
AdapterInstance,
defineWebSocketAdapter,
} from "../types";
import { Peer } from "../peer";
import { Message } from "../message";
import { AdapterHookable } from "../hooks";
import { adapterUtils, toBufferLike } from "../_utils";

// --- types

declare class DurableObjectPub extends DurableObject {
public ctx: DurableObject["ctx"];
Expand Down Expand Up @@ -58,6 +56,9 @@ export interface CloudflareOptions extends AdapterOptions {
instanceName?: string;
}

// --- adapter ---

// https://developers.cloudflare.com/durable-objects/examples/websocket-hibernation-server/
export default defineWebSocketAdapter<
CloudflareDurableAdapter,
CloudflareOptions
Expand Down Expand Up @@ -133,6 +134,8 @@ function peerFromDurableEvent(
return peer;
}

// --- peer ---

class CloudflareDurablePeer extends Peer<{
peers?: never;
cloudflare: {
Expand Down
24 changes: 13 additions & 11 deletions src/adapters/cloudflare.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,14 @@
// https://developers.cloudflare.com/workers/examples/websockets/
import type { AdapterOptions, AdapterInstance } from "../adapter.ts";
import { toBufferLike } from "../utils.ts";
import { defineWebSocketAdapter, adapterUtils } from "../adapter.ts";
import { AdapterHookable } from "../hooks.ts";
import { Message } from "../message.ts";
import { WSError } from "../error.ts";
import { Peer } from "../peer.ts";

import type * as _cf from "@cloudflare/workers-types";

import { Peer } from "../peer";
import {
AdapterOptions,
AdapterInstance,
defineWebSocketAdapter,
} from "../types.js";
import { Message } from "../message";
import { WSError } from "../error";
import { AdapterHookable } from "../hooks.js";
import { adapterUtils, toBufferLike } from "../_utils";
// --- types ---

declare const WebSocketPair: typeof _cf.WebSocketPair;
declare const Response: typeof _cf.Response;
Expand All @@ -26,6 +23,9 @@ export interface CloudflareAdapter extends AdapterInstance {

export interface CloudflareOptions extends AdapterOptions {}

// --- adapter ---

// https://developers.cloudflare.com/workers/examples/websockets/
export default defineWebSocketAdapter<CloudflareAdapter, CloudflareOptions>(
(options = {}) => {
const hooks = new AdapterHookable(options);
Expand Down Expand Up @@ -76,6 +76,8 @@ export default defineWebSocketAdapter<CloudflareAdapter, CloudflareOptions>(
},
);

// --- peer ---

class CloudflarePeer extends Peer<{
peers: Set<CloudflarePeer>;
cloudflare: {
Expand Down
24 changes: 13 additions & 11 deletions src/adapters/deno.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,12 @@
// https://deno.land/api?s=WebSocket
// https://deno.land/api?s=Deno.upgradeWebSocket
// https://examples.deno.land/http-server-websocket

import type { AdapterOptions, AdapterInstance } from "../adapter.ts";
import { toBufferLike } from "../utils.ts";
import { defineWebSocketAdapter, adapterUtils } from "../adapter.ts";
import { AdapterHookable } from "../hooks.ts";
import { Message } from "../message.ts";
import { WSError } from "../error.ts";
import { Peer } from "../peer.ts";
import {
AdapterOptions,
AdapterInstance,
defineWebSocketAdapter,
} from "../types.ts";
import { AdapterHookable } from "../hooks.ts";
import { adapterUtils, toBufferLike } from "../_utils.ts";

// --- types ---

export interface DenoAdapter extends AdapterInstance {
handleUpgrade(req: Request, info: ServeHandlerInfo): Promise<Response>;
Expand All @@ -26,6 +21,11 @@ declare global {
type WebSocketUpgrade = import("@deno/types").Deno.WebSocketUpgrade;
type ServeHandlerInfo = unknown; // TODO

// --- adapter ---

// https://deno.land/api?s=WebSocket
// https://deno.land/api?s=Deno.upgradeWebSocket
// https://examples.deno.land/http-server-websocket
export default defineWebSocketAdapter<DenoAdapter, DenoOptions>(
(options = {}) => {
const hooks = new AdapterHookable(options);
Expand Down Expand Up @@ -70,6 +70,8 @@ export default defineWebSocketAdapter<DenoAdapter, DenoOptions>(
},
);

// --- peer ---

class DenoPeer extends Peer<{
peers: Set<DenoPeer>;
deno: {
Expand Down
123 changes: 64 additions & 59 deletions src/adapters/node.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
// https://github.com/websockets/ws
// https://github.com/websockets/ws/blob/master/doc/ws.md
import type { AdapterOptions, AdapterInstance } from "../adapter.ts";
import { toBufferLike } from "../utils.ts";
import { defineWebSocketAdapter, adapterUtils } from "../adapter.ts";
import { AdapterHookable } from "../hooks.ts";
import { Message } from "../message.ts";
import { WSError } from "../error.ts";
import { Peer } from "../peer.ts";

import type { ClientRequest, IncomingMessage } from "node:http";
import type { Duplex } from "node:stream";
Expand All @@ -10,16 +15,8 @@ import type {
WebSocketServer,
WebSocket as WebSocketT,
} from "../../types/ws";
import { Peer } from "../peer";
import { Message } from "../message";
import { WSError } from "../error";
import {
AdapterOptions,
AdapterInstance,
defineWebSocketAdapter,
} from "../types";
import { AdapterHookable } from "../hooks";
import { adapterUtils, toBufferLike } from "../_utils";

// --- types ---

type AugmentedReq = IncomingMessage & { _upgradeHeaders?: HeadersInit };

Expand All @@ -33,6 +30,10 @@ export interface NodeOptions extends AdapterOptions {
serverOptions?: ServerOptions;
}

// --- adapter ---

// https://github.com/websockets/ws
// https://github.com/websockets/ws/blob/master/doc/ws.md
export default defineWebSocketAdapter<NodeAdapter, NodeOptions>(
(options = {}) => {
const hooks = new AdapterHookable(options);
Expand Down Expand Up @@ -123,53 +124,7 @@ export default defineWebSocketAdapter<NodeAdapter, NodeOptions>(
},
);

class NodeReqProxy {
_req: IncomingMessage;
_headers?: Headers;
_url?: string;

constructor(req: IncomingMessage) {
this._req = req;
}

get url(): string {
if (!this._url) {
const req = this._req;
const host = req.headers["host"] || "localhost";
const isSecure =
(req.socket as any)?.encrypted ??
req.headers["x-forwarded-proto"] === "https";
this._url = `${isSecure ? "https" : "http"}://${host}${req.url}`;
}
return this._url;
}

get headers(): Headers {
if (!this._headers) {
this._headers = new Headers(this._req.headers as HeadersInit);
}
return this._headers;
}
}

async function sendResponse(socket: Duplex, res: Response) {
const head = [
`HTTP/1.1 ${res.status || 200} ${res.statusText || ""}`,
...[...res.headers.entries()].map(
([key, value]) =>
`${encodeURIComponent(key)}: ${encodeURIComponent(value)}`,
),
];
socket.write(head.join("\r\n") + "\r\n\r\n");
if (res.body) {
for await (const chunk of res.body) {
socket.write(chunk);
}
}
return new Promise<void>((resolve) => {
socket.end(resolve);
});
}
// --- peer ---

class NodePeer extends Peer<{
peers: Set<NodePeer>;
Expand Down Expand Up @@ -246,3 +201,53 @@ class NodePeer extends Peer<{
this._internal.node.ws.terminate();
}
}

// --- web compat ---

class NodeReqProxy {
_req: IncomingMessage;
_headers?: Headers;
_url?: string;

constructor(req: IncomingMessage) {
this._req = req;
}

get url(): string {
if (!this._url) {
const req = this._req;
const host = req.headers["host"] || "localhost";
const isSecure =
(req.socket as any)?.encrypted ??
req.headers["x-forwarded-proto"] === "https";
this._url = `${isSecure ? "https" : "http"}://${host}${req.url}`;
}
return this._url;
}

get headers(): Headers {
if (!this._headers) {
this._headers = new Headers(this._req.headers as HeadersInit);
}
return this._headers;
}
}

async function sendResponse(socket: Duplex, res: Response) {
const head = [
`HTTP/1.1 ${res.status || 200} ${res.statusText || ""}`,
...[...res.headers.entries()].map(
([key, value]) =>
`${encodeURIComponent(key)}: ${encodeURIComponent(value)}`,
),
];
socket.write(head.join("\r\n") + "\r\n\r\n");
if (res.body) {
for await (const chunk of res.body) {
socket.write(chunk);
}
}
return new Promise<void>((resolve) => {
socket.end(resolve);
});
}
Loading

0 comments on commit 2744f21

Please sign in to comment.