Skip to content

Commit

Permalink
feat: add support for WebTransport
Browse files Browse the repository at this point in the history
  • Loading branch information
darrachequesne committed Jun 12, 2023
1 parent db3de84 commit 7195c0f
Show file tree
Hide file tree
Showing 10 changed files with 2,644 additions and 443 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ jobs:

strategy:
matrix:
node-version: [14, 16]
node-version: [16]

steps:
- name: Checkout repository
Expand Down
34 changes: 34 additions & 0 deletions lib/transport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { Emitter } from "@socket.io/component-emitter";
import { installTimerFunctions } from "./util.js";
import debugModule from "debug"; // debug()
import { SocketOptions } from "./socket.js";
import { encode } from "./contrib/parseqs.js";

const debug = debugModule("engine.io-client:transport"); // debug()

Expand Down Expand Up @@ -171,6 +172,39 @@ export abstract class Transport extends Emitter<
*/
public pause(onPause: () => void) {}

protected uri(schema: string, query: Record<string, unknown> = {}) {
return (
schema +
"://" +
this._hostname() +
this._port() +
this.opts.path +
this._query(query)
);
}

private _hostname() {
const hostname = this.opts.hostname;
return hostname.indexOf(":") === -1 ? hostname : "[" + hostname + "]";
}

private _port() {
if (
this.opts.port &&
((this.opts.secure && this.opts.port !== "443") ||
(!this.opts.secure && this.opts.port !== "80"))
) {
return ":" + this.opts.port;
} else {
return "";
}
}

private _query(query: Record<string, unknown>) {
const encodedQuery = encode(query);
return encodedQuery.length ? "?" + encodedQuery : "";
}

protected abstract doOpen();
protected abstract doClose();
protected abstract write(packets: Packet[]);
Expand Down
2 changes: 2 additions & 0 deletions lib/transports/index.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import { Polling } from "./polling.js";
import { WS } from "./websocket.js";
import { WT } from "./webtransport.js";

export const transports = {
websocket: WS,
webtransport: WT,
polling: Polling,
};
110 changes: 110 additions & 0 deletions lib/transports/webtransport.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
import { Transport } from "../transport.js";
import { nextTick } from "./websocket-constructor.js";
import {
encodePacketToBinary,
decodePacketFromBinary,
Packet,
} from "engine.io-parser";
import debugModule from "debug"; // debug()

const debug = debugModule("engine.io-client:webtransport"); // debug()

function shouldIncludeBinaryHeader(packet, encoded) {
// 48 === "0".charCodeAt(0) (OPEN packet type)
// 54 === "6".charCodeAt(0) (NOOP packet type)
return (
packet.type === "message" &&
typeof packet.data !== "string" &&
encoded[0] >= 48 &&
encoded[0] <= 54
);
}

export class WT extends Transport {
private transport: any;
private writer: any;

get name() {
return "webtransport";
}

protected doOpen() {
// @ts-ignore
if (typeof WebTransport !== "function") {
return;
}
// @ts-ignore
this.transport = new WebTransport(
this.uri("https"),
this.opts.transportOptions[this.name]
);

this.transport.closed.then(() => this.onClose());

// note: we could have used async/await, but that would require some additional polyfills
this.transport.ready.then(() => {
this.transport.createBidirectionalStream().then((stream) => {
const reader = stream.readable.getReader();
this.writer = stream.writable.getWriter();

let binaryFlag;

const read = () => {
reader.read().then(({ done, value }) => {
if (done) {
debug("session is closed");
return;
}
debug("received chunk: %o", value);
if (!binaryFlag && value.byteLength === 1 && value[0] === 54) {
binaryFlag = true;
} else {
// TODO expose binarytype
this.onPacket(
decodePacketFromBinary(value, binaryFlag, "arraybuffer")
);
binaryFlag = false;
}
read();
});
};

read();

const handshake = this.query.sid ? `0{"sid":"${this.query.sid}"}` : "0";
this.writer
.write(new TextEncoder().encode(handshake))
.then(() => this.onOpen());
});
});
}

protected write(packets: Packet[]) {
this.writable = false;

for (let i = 0; i < packets.length; i++) {
const packet = packets[i];
const lastPacket = i === packets.length - 1;

encodePacketToBinary(packet, (data) => {
if (shouldIncludeBinaryHeader(packet, data)) {
debug("writing binary header");
this.writer.write(Uint8Array.of(54));
}
debug("writing chunk: %o", data);
this.writer.write(data).then(() => {
if (lastPacket) {
nextTick(() => {
this.writable = true;
this.emitReserved("drain");
}, this.setTimeoutFn);
}
});
});
}
}

protected doClose() {
this.transport?.close();
}
}
Loading

0 comments on commit 7195c0f

Please sign in to comment.