diff --git a/lib/server.ts b/lib/server.ts index 355e14d5..5cec3d61 100644 --- a/lib/server.ts +++ b/lib/server.ts @@ -16,12 +16,11 @@ import type { CookieSerializeOptions } from "cookie"; import type { CorsOptions, CorsOptionsDelegate } from "cors"; import type { Duplex } from "stream"; import { WebTransport } from "./transports/webtransport"; -import { TextDecoder } from "util"; +import { createPacketDecoderStream } from "engine.io-parser"; const debug = debugModule("engine"); const kResponseHeaders = Symbol("responseHeaders"); -const TEXT_DECODER = new TextDecoder(); type Transport = "polling" | "websocket"; @@ -149,15 +148,13 @@ type Middleware = ( next: (err?: any) => void ) => void; -function parseSessionId(handshake: string) { - if (handshake.startsWith("0{")) { - try { - const parsed = JSON.parse(handshake.substring(1)); - if (typeof parsed.sid === "string") { - return parsed.sid; - } - } catch (e) {} - } +function parseSessionId(data: string) { + try { + const parsed = JSON.parse(data); + if (typeof parsed.sid === "string") { + return parsed.sid; + } + } catch (e) {} } export abstract class BaseServer extends EventEmitter { @@ -536,7 +533,11 @@ export abstract class BaseServer extends EventEmitter { } const stream = result.value; - const reader = stream.readable.getReader(); + const transformStream = createPacketDecoderStream( + this.opts.maxHttpBufferSize, + "nodebuffer" + ); + const reader = stream.readable.pipeThrough(transformStream).getReader(); // reading the first packet of the stream const { value, done } = await reader.read(); @@ -546,12 +547,13 @@ export abstract class BaseServer extends EventEmitter { } clearTimeout(timeout); - const handshake = TEXT_DECODER.decode(value); - // handshake is either - // "0" => new session - // '0{"sid":"xxxx"}' => upgrade - if (handshake === "0") { + if (value.type !== "open") { + debug("invalid WebTransport handshake"); + return session.close(); + } + + if (value.data === undefined) { const transport = new WebTransport(session, stream, reader); // note: we cannot use "this.generateId()", because there is no "req" argument @@ -572,7 +574,7 @@ export abstract class BaseServer extends EventEmitter { return; } - const sid = parseSessionId(handshake); + const sid = parseSessionId(value.data); if (!sid) { debug("invalid WebTransport handshake"); diff --git a/lib/transports/webtransport.ts b/lib/transports/webtransport.ts index b79b8164..4f6f6877 100644 --- a/lib/transports/webtransport.ts +++ b/lib/transports/webtransport.ts @@ -1,21 +1,9 @@ import { Transport } from "../transport"; import debugModule from "debug"; +import { createPacketEncoderStream } from "engine.io-parser"; const debug = debugModule("engine:webtransport"); -const BINARY_HEADER = Buffer.of(54); - -function shouldIncludeBinaryHeader(packet, encoded) { - // 48 === "0".charCodeAt(0) (OPEN packet type) - // 54 === "6".charCodeAt(0) (NOOP packet type) - return ( - packet.type === "message" && - typeof packet.data !== "string" && - encoded[0] >= 48 && - encoded[0] <= 54 - ); -} - /** * Reference: https://developer.mozilla.org/en-US/docs/Web/API/WebTransport_API */ @@ -24,24 +12,24 @@ export class WebTransport extends Transport { constructor(private readonly session, stream, reader) { super({ _query: { EIO: "4" } }); - this.writer = stream.writable.getWriter(); + + const transformStream = createPacketEncoderStream(); + transformStream.readable.pipeTo(stream.writable); + this.writer = transformStream.writable.getWriter(); + (async () => { - let binaryFlag = false; - while (true) { - const { value, done } = await reader.read(); - if (done) { - debug("session is closed"); - break; - } - debug("received chunk: %o", value); - if (!binaryFlag && value.byteLength === 1 && value[0] === 54) { - binaryFlag = true; - continue; + try { + while (true) { + const { value, done } = await reader.read(); + if (done) { + debug("session is closed"); + break; + } + debug("received chunk: %o", value); + this.onPacket(value); } - this.onPacket( - this.parser.decodePacketFromBinary(value, binaryFlag, "nodebuffer") - ); - binaryFlag = false; + } catch (e) { + debug("error while reading: %s", e.message); } })(); @@ -58,26 +46,20 @@ export class WebTransport extends Transport { return true; } - send(packets) { + async send(packets) { this.writable = false; - for (let i = 0; i < packets.length; i++) { - const packet = packets[i]; - const isLast = i + 1 === packets.length; - - this.parser.encodePacketToBinary(packet, (data) => { - if (shouldIncludeBinaryHeader(packet, data)) { - debug("writing binary header"); - this.writer.write(BINARY_HEADER); - } - debug("writing chunk: %o", data); - this.writer.write(data); - if (isLast) { - this.writable = true; - this.emit("drain"); - } - }); + try { + for (let i = 0; i < packets.length; i++) { + const packet = packets[i]; + await this.writer.write(packet); + } + } catch (e) { + debug("error while writing: %s", e.message); } + + this.writable = true; + this.emit("drain"); } doClose(fn) { diff --git a/package-lock.json b/package-lock.json index 95421dce..7508dec7 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "engine.io", - "version": "6.4.2", + "version": "6.5.1", "lockfileVersion": 2, "requires": true, "packages": { "": { "name": "engine.io", - "version": "6.4.2", + "version": "6.5.1", "license": "MIT", "dependencies": { "@types/cookie": "^0.4.1", @@ -17,7 +17,7 @@ "cookie": "~0.4.1", "cors": "~2.8.5", "debug": "~4.3.1", - "engine.io-parser": "~5.1.0", + "engine.io-parser": "~5.2.1", "ws": "~8.11.0" }, "devDependencies": { @@ -38,7 +38,7 @@ "uWebSockets.js": "github:uNetworking/uWebSockets.js#v20.30.0" }, "engines": { - "node": ">=10.0.0" + "node": ">=10.2.0" } }, "node_modules/@babel/code-frame": { @@ -819,10 +819,19 @@ "node": ">=0.4.0" } }, - "node_modules/engine.io-parser": { + "node_modules/engine.io-client/node_modules/engine.io-parser": { "version": "5.1.0", "resolved": "https://registry.npmjs.org/engine.io-parser/-/engine.io-parser-5.1.0.tgz", "integrity": "sha512-enySgNiK5tyZFynt3z7iqBR+Bto9EVVVvDFuTT0ioHCGbzirZVGDGiQjZzEp8hWl6hd5FSVytJGuScX1C1C35w==", + "dev": true, + "engines": { + "node": ">=10.0.0" + } + }, + "node_modules/engine.io-parser": { + "version": "5.2.1", + "resolved": "https://registry.npmjs.org/engine.io-parser/-/engine.io-parser-5.2.1.tgz", + "integrity": "sha512-9JktcM3u18nU9N2Lz3bWeBgxVgOKpw7yhRaoxQA3FUDZzzw+9WlA6p4G4u0RixNkg14fH7EfEc/RhpurtiROTQ==", "engines": { "node": ">=10.0.0" } @@ -3115,6 +3124,14 @@ "engine.io-parser": "~5.1.0", "ws": "~8.11.0", "xmlhttprequest-ssl": "~2.0.0" + }, + "dependencies": { + "engine.io-parser": { + "version": "5.1.0", + "resolved": "https://registry.npmjs.org/engine.io-parser/-/engine.io-parser-5.1.0.tgz", + "integrity": "sha512-enySgNiK5tyZFynt3z7iqBR+Bto9EVVVvDFuTT0ioHCGbzirZVGDGiQjZzEp8hWl6hd5FSVytJGuScX1C1C35w==", + "dev": true + } } }, "engine.io-client-v3": { @@ -3180,9 +3197,9 @@ } }, "engine.io-parser": { - "version": "5.1.0", - "resolved": "https://registry.npmjs.org/engine.io-parser/-/engine.io-parser-5.1.0.tgz", - "integrity": "sha512-enySgNiK5tyZFynt3z7iqBR+Bto9EVVVvDFuTT0ioHCGbzirZVGDGiQjZzEp8hWl6hd5FSVytJGuScX1C1C35w==" + "version": "5.2.1", + "resolved": "https://registry.npmjs.org/engine.io-parser/-/engine.io-parser-5.2.1.tgz", + "integrity": "sha512-9JktcM3u18nU9N2Lz3bWeBgxVgOKpw7yhRaoxQA3FUDZzzw+9WlA6p4G4u0RixNkg14fH7EfEc/RhpurtiROTQ==" }, "escalade": { "version": "3.1.1", diff --git a/package.json b/package.json index 04cd9b2f..e76a4976 100644 --- a/package.json +++ b/package.json @@ -39,7 +39,7 @@ "cookie": "~0.4.1", "cors": "~2.8.5", "debug": "~4.3.1", - "engine.io-parser": "~5.1.0", + "engine.io-parser": "~5.2.1", "ws": "~8.11.0" }, "devDependencies": { diff --git a/test/webtransport.mjs b/test/webtransport.mjs index d26f9dfd..3d54e554 100644 --- a/test/webtransport.mjs +++ b/test/webtransport.mjs @@ -85,12 +85,15 @@ function setup(opts, cb) { const reader = stream.readable.getReader(); const writer = stream.writable.getWriter(); - engine.on("connection", (socket) => { + engine.on("connection", async (socket) => { + await reader.read(); // header + await reader.read(); // payload (handshake) + cb({ engine, h3Server, socket, client, stream, reader, writer }); }); + await writer.write(Uint8Array.of(1)); await writer.write(TEXT_ENCODER.encode("0")); - await reader.read(); // handshake }); } @@ -130,11 +133,11 @@ describe("WebTransport", () => { const writer = stream.writable.getWriter(); (async function read() { - const { done, value } = await reader.read(); + const header = await reader.read(); - if (done) { - return; - } + expect(header.value).to.eql(Uint8Array.of(107)); + + const { value } = await reader.read(); const handshake = TEXT_DECODER.decode(value); expect(handshake.startsWith("0{")).to.be(true); @@ -142,7 +145,8 @@ describe("WebTransport", () => { partialDone(); })(); - await writer.write(TEXT_ENCODER.encode("0")); + writer.write(Uint8Array.of(1)); + writer.write(TEXT_ENCODER.encode("0")); }); }); @@ -194,6 +198,10 @@ describe("WebTransport", () => { const writer = stream.writable.getWriter(); (async function read() { + const header = await reader.read(); + + expect(header.value).to.eql(Uint8Array.of(6)); + const { done, value } = await reader.read(); if (done) { @@ -206,10 +214,13 @@ describe("WebTransport", () => { partialDone(); })(); + await writer.write(Uint8Array.of(31)); await writer.write( TEXT_ENCODER.encode(`0{"sid":"${payload.sid}"}`) ); + await writer.write(Uint8Array.of(6)); await writer.write(TEXT_ENCODER.encode(`2probe`)); + await writer.write(Uint8Array.of(1)); await writer.write(TEXT_ENCODER.encode(`5`)); }); } @@ -281,10 +292,14 @@ describe("WebTransport", () => { }, async ({ engine, h3Server, reader, writer }) => { for (let i = 0; i < 5; i++) { + const header = await reader.read(); + expect(header.value).to.eql(Uint8Array.of(1)); + const packet = await reader.read(); const value = TEXT_DECODER.decode(packet.value); expect(value).to.eql("2"); + writer.write(Uint8Array.of(1)); writer.write(TEXT_ENCODER.encode("3")); } @@ -338,6 +353,7 @@ describe("WebTransport", () => { success(engine, h3Server, done); }); + writer.write(Uint8Array.of(6)); writer.write(TEXT_ENCODER.encode("4hello")); }); }); @@ -346,6 +362,9 @@ describe("WebTransport", () => { setup({}, async ({ engine, h3Server, socket, reader }) => { socket.send("hello"); + const header = await reader.read(); + expect(header.value).to.eql(Uint8Array.of(6)); + const { value } = await reader.read(); const decoded = TEXT_DECODER.decode(value); expect(decoded).to.eql("4hello"); @@ -363,6 +382,7 @@ describe("WebTransport", () => { success(engine, h3Server, done); }); + writer.write(Uint8Array.of(131)); writer.write(Uint8Array.of(1, 2, 3)); }); }); @@ -371,64 +391,49 @@ describe("WebTransport", () => { setup({}, async ({ engine, h3Server, socket, reader }) => { socket.send(Buffer.of(1, 2, 3)); - const { value } = await reader.read(); - expect(value).to.eql(Uint8Array.of(1, 2, 3)); - - success(engine, h3Server, done); - }); - }); - - it("should send some binary data (client to server) (with binary flag)", (done) => { - setup({}, async ({ engine, h3Server, socket, writer }) => { - socket.on("data", (data) => { - expect(Buffer.isBuffer(data)).to.be(true); - expect(data).to.eql(Buffer.of(48, 1, 2, 3)); - - success(engine, h3Server, done); - }); - - writer.write(Uint8Array.of(54)); - writer.write(Uint8Array.of(48, 1, 2, 3)); - }); - }); - - it("should send some binary data (server to client) (with binary flag)", (done) => { - setup({}, async ({ engine, h3Server, socket, reader }) => { - socket.send(Buffer.of(48, 1, 2, 3)); - const header = await reader.read(); - expect(header.value).to.eql(Uint8Array.of(54)); + expect(header.value).to.eql(Uint8Array.of(131)); const { value } = await reader.read(); - expect(value).to.eql(Uint8Array.of(48, 1, 2, 3)); + expect(value).to.eql(Uint8Array.of(1, 2, 3)); success(engine, h3Server, done); }); }); - it("should send some binary data (client to server) (binary flag)", (done) => { + it("should send some big binary data (client to server)", (done) => { setup({}, async ({ engine, h3Server, socket, writer }) => { + const payload = Buffer.allocUnsafe(1e6); + socket.on("data", (data) => { expect(Buffer.isBuffer(data)).to.be(true); - expect(data).to.eql(Buffer.of(54)); + expect(data).to.eql(payload); success(engine, h3Server, done); }); - writer.write(Uint8Array.of(54)); - writer.write(Uint8Array.of(54)); + writer.write(Uint8Array.of(255, 0, 0, 0, 0, 0, 15, 66, 64)); + writer.write(payload); }); }); - it("should send some binary data (server to client) (binary flag)", (done) => { + it("should send some big binary data (server to client)", (done) => { setup({}, async ({ engine, h3Server, socket, reader }) => { - socket.send(Buffer.of(54)); + const payload = Buffer.allocUnsafe(1e6); + + socket.send(payload); const header = await reader.read(); - expect(header.value).to.eql(Uint8Array.of(54)); + expect(header.value).to.eql( + Uint8Array.of(255, 0, 0, 0, 0, 0, 15, 66, 64) + ); - const { value } = await reader.read(); - expect(value).to.eql(Uint8Array.of(54)); + const chunk1 = await reader.read(); + // the size of the chunk is implementation-specific (maxDatagramSize) + expect(chunk1.value).to.eql(payload.slice(0, 1228)); + + const chunk2 = await reader.read(); + expect(chunk2.value).to.eql(payload.slice(1228, 2456)); success(engine, h3Server, done); });