Skip to content

Commit

Permalink
refactor: improve types
Browse files Browse the repository at this point in the history
  • Loading branch information
darrachequesne committed Jun 21, 2024
1 parent 362bc78 commit c310b7b
Show file tree
Hide file tree
Showing 11 changed files with 198 additions and 190 deletions.
2 changes: 0 additions & 2 deletions lib/engine.io.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ export const protocol = parser.protocol;
* @param {Function} callback
* @param {Object} options
* @return {Server} websocket.io server
* @api public
*/

function listen(port, options: AttachOptions & ServerOptions, fn) {
Expand Down Expand Up @@ -46,7 +45,6 @@ function listen(port, options: AttachOptions & ServerOptions, fn) {
* @param {http.Server} server
* @param {Object} options
* @return {Server} engine server
* @api public
*/

function attach(server, options: AttachOptions & ServerOptions) {
Expand Down
75 changes: 36 additions & 39 deletions lib/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,13 @@ import type { CorsOptions, CorsOptionsDelegate } from "cors";
import type { Duplex } from "stream";
import { WebTransport } from "./transports/webtransport";
import { createPacketDecoderStream } from "engine.io-parser";
import type { EngineRequest } from "./transport";

const debug = debugModule("engine");

const kResponseHeaders = Symbol("responseHeaders");

type Transport = "polling" | "websocket";
type Transport = "polling" | "websocket" | "webtransport";

export interface AttachOptions {
/**
Expand Down Expand Up @@ -169,7 +170,6 @@ export abstract class BaseServer extends EventEmitter {
* Server constructor.
*
* @param {Object} opts - options
* @api public
*/
constructor(opts: ServerOptions = {}) {
super();
Expand Down Expand Up @@ -246,21 +246,25 @@ export abstract class BaseServer extends EventEmitter {
* Returns a list of available transports for upgrade given a certain transport.
*
* @return {Array}
* @api public
*/
public upgrades(transport) {
public upgrades(transport: string) {
if (!this.opts.allowUpgrades) return [];
return transports[transport].upgradesTo || [];
}

/**
* Verifies a request.
*
* @param {http.IncomingMessage}
* @return {Boolean} whether the request is valid
* @api private
* @param {EngineRequest} req
* @param upgrade - whether it's an upgrade request
* @param fn
* @protected
*/
protected verify(req, upgrade, fn) {
protected verify(
req: any,
upgrade: boolean,
fn: (errorCode?: number, errorContext?: any) => void
) {
// transport check
const transport = req._query.transport;
// WebTransport does not go through the verify() method, see the onWebTransportSession() method
Expand Down Expand Up @@ -384,8 +388,6 @@ export abstract class BaseServer extends EventEmitter {

/**
* Closes all clients.
*
* @api public
*/
public close() {
debug("closing all open clients");
Expand All @@ -404,23 +406,26 @@ export abstract class BaseServer extends EventEmitter {
* generate a socket id.
* Overwrite this method to generate your custom socket id
*
* @param {Object} request object
* @api public
* @param {IncomingMessage} req - the request object
*/
public generateId(req) {
public generateId(req: IncomingMessage) {
return base64id.generateId();
}

/**
* Handshakes a new client.
*
* @param {String} transport name
* @param {Object} request object
* @param {String} transportName
* @param {Object} req - the request object
* @param {Function} closeConnection
*
* @api protected
* @protected
*/
protected async handshake(transportName, req, closeConnection) {
protected async handshake(
transportName: string,
req: any,
closeConnection: (errorCode?: number, errorContext?: any) => void
) {
const protocol = req._query.EIO === "4" ? 4 : 3; // 3rd revision by default
if (protocol === 3 && !this.opts.allowEIO3) {
debug("unsupported protocol version");
Expand Down Expand Up @@ -661,7 +666,7 @@ export class Server extends BaseServer {
/**
* Initialize websocket server
*
* @api protected
* @protected
*/
protected init() {
if (!~this.opts.transports.indexOf("websocket")) return;
Expand Down Expand Up @@ -708,30 +713,30 @@ export class Server extends BaseServer {
/**
* Prepares a request by processing the query string.
*
* @api private
* @private
*/
private prepare(req) {
private prepare(req: EngineRequest) {
// try to leverage pre-existing `req._query` (e.g: from connect)
if (!req._query) {
req._query = ~req.url.indexOf("?") ? qs.parse(parse(req.url).query) : {};
req._query = (
~req.url.indexOf("?") ? qs.parse(parse(req.url).query) : {}
) as Record<string, string>;
}
}

protected createTransport(transportName, req) {
protected createTransport(transportName: string, req: IncomingMessage) {
return new transports[transportName](req);
}

/**
* Handles an Engine.IO HTTP request.
*
* @param {IncomingMessage} req
* @param {EngineRequest} req
* @param {ServerResponse} res
* @api public
*/
public handleRequest(req: IncomingMessage, res: ServerResponse) {
public handleRequest(req: EngineRequest, res: ServerResponse) {
debug('handling "%s" http request "%s"', req.method, req.url);
this.prepare(req);
// @ts-ignore
req.res = res;

const callback = (errorCode, errorContext) => {
Expand All @@ -746,15 +751,12 @@ export class Server extends BaseServer {
return;
}

// @ts-ignore
if (req._query.sid) {
debug("setting new request for existing client");
// @ts-ignore
this.clients[req._query.sid].transport.onRequest(req);
} else {
const closeConnection = (errorCode, errorContext) =>
abortRequest(res, errorCode, errorContext);
// @ts-ignore
this.handshake(req._query.transport, req, closeConnection);
}
};
Expand All @@ -770,11 +772,9 @@ export class Server extends BaseServer {

/**
* Handles an Engine.IO HTTP Upgrade.
*
* @api public
*/
public handleUpgrade(
req: IncomingMessage,
req: EngineRequest,
socket: Duplex,
upgradeHead: Buffer
) {
Expand Down Expand Up @@ -819,7 +819,7 @@ export class Server extends BaseServer {
* Called upon a ws.io connection.
*
* @param {ws.Socket} websocket
* @api private
* @private
*/
private onWebSocket(req, socket, websocket) {
websocket.on("error", onUpgradeError);
Expand Down Expand Up @@ -877,7 +877,6 @@ export class Server extends BaseServer {
*
* @param {http.Server} server
* @param {Object} options
* @api public
*/
public attach(server: HttpServer, options: AttachOptions = {}) {
const path = this._computePath(options);
Expand All @@ -898,7 +897,7 @@ export class Server extends BaseServer {
server.on("request", (req, res) => {
if (check(req)) {
debug('intercepting request for path "%s"', path);
this.handleRequest(req, res);
this.handleRequest(req as EngineRequest, res);
} else {
let i = 0;
const l = listeners.length;
Expand All @@ -911,7 +910,7 @@ export class Server extends BaseServer {
if (~this.opts.transports.indexOf("websocket")) {
server.on("upgrade", (req, socket, head) => {
if (check(req)) {
this.handleUpgrade(req, socket, head);
this.handleUpgrade(req as EngineRequest, socket, head);
} else if (false !== options.destroyUpgrade) {
// default node behavior is to disconnect when no handlers
// but by adding a handler, we prevent that
Expand Down Expand Up @@ -939,7 +938,7 @@ export class Server extends BaseServer {
* @param errorCode - the error code
* @param errorContext - additional error context
*
* @api private
* @private
*/

function abortRequest(res, errorCode, errorContext) {
Expand All @@ -964,8 +963,6 @@ function abortRequest(res, errorCode, errorContext) {
* @param {net.Socket} socket
* @param {string} errorCode - the error code
* @param {object} errorContext - additional error context
*
* @api private
*/

function abortUpgrade(
Expand Down
Loading

0 comments on commit c310b7b

Please sign in to comment.