diff --git a/packages/engine.io-client/lib/socket.ts b/packages/engine.io-client/lib/socket.ts index 761ab71e7a..a4968a9fb6 100644 --- a/packages/engine.io-client/lib/socket.ts +++ b/packages/engine.io-client/lib/socket.ts @@ -10,6 +10,7 @@ import { CookieJar, createCookieJar, defaultBinaryType, + nextTick, } from "./globals.node.js"; import debugModule from "debug"; // debug() @@ -318,6 +319,8 @@ export class SocketWithoutUpgrade extends Emitter< private _pingTimeout: number = -1; private _maxPayload?: number = -1; private _pingTimeoutTimer: NodeJS.Timer; + private _pingTimeoutTime: number | null = null; + private _scheduledDisconnectFromIsResponsive = false; private clearTimeoutFn: typeof clearTimeout; private readonly _beforeunloadEventListener: () => void; private readonly _offlineEventListener: () => void; @@ -628,9 +631,13 @@ export class SocketWithoutUpgrade extends Emitter< */ private _resetPingTimeout() { this.clearTimeoutFn(this._pingTimeoutTimer); + if (this._pingInterval <= 0 || this._pingTimeout <= 0) return; + + const delay = this._pingInterval + this._pingTimeout; + this._pingTimeoutTime = Date.now() + delay; this._pingTimeoutTimer = this.setTimeoutFn(() => { this._onClose("ping timeout"); - }, this._pingInterval + this._pingTimeout); + }, delay); if (this.opts.autoUnref) { this._pingTimeoutTimer.unref(); } @@ -708,6 +715,32 @@ export class SocketWithoutUpgrade extends Emitter< return this.writeBuffer; } + /** + * Returns `true` if the connection is responding to heartbeats. + * + * If heartbeats are disabled this will always return `true`. + * + * @return {boolean} + */ + public isResponsive() { + if (this.readyState === "closed") return false; + if (this._pingTimeoutTime === null) return true; + + const responsive = Date.now() < this._pingTimeoutTime; + if (!responsive && !this._scheduledDisconnectFromIsResponsive) { + debug( + "`isResponsive` detected missed ping so scheduling connection close" + ); + this._scheduledDisconnectFromIsResponsive = true; + + nextTick(() => { + this._onClose("ping timeout"); + }, this.setTimeoutFn); + } + + return responsive; + } + /** * Sends a message. * @@ -717,6 +750,9 @@ export class SocketWithoutUpgrade extends Emitter< * @return {Socket} for chaining. */ public write(msg: RawData, options?: WriteOptions, fn?: () => void) { + // this will schedule a disconnection on next tick if heartbeat missed + this.isResponsive(); + this._sendPacket("message", msg, options, fn); return this; } @@ -730,8 +766,7 @@ export class SocketWithoutUpgrade extends Emitter< * @return {Socket} for chaining. */ public send(msg: RawData, options?: WriteOptions, fn?: () => void) { - this._sendPacket("message", msg, options, fn); - return this; + return this.write(msg, options, fn); } /** @@ -858,6 +893,7 @@ export class SocketWithoutUpgrade extends Emitter< // clear timers this.clearTimeoutFn(this._pingTimeoutTimer); + this._pingTimeoutTime = null; // stop event from firing again for transport this.transport.removeAllListeners("close"); diff --git a/packages/socket.io-client/lib/manager.ts b/packages/socket.io-client/lib/manager.ts index 6f7c61c094..77fc55f1da 100644 --- a/packages/socket.io-client/lib/manager.ts +++ b/packages/socket.io-client/lib/manager.ts @@ -492,6 +492,17 @@ export class Manager< this._close(); } + /** + * Returns `true` if the connection is responding to heartbeats. + * + * If heartbeats are disabled this will always return `true`. + * + * @return {boolean} + */ + isResponsive() { + return this.engine.isResponsive ? this.engine.isResponsive() : true; + } + /** * Writes a packet. * diff --git a/packages/socket.io-client/lib/socket.ts b/packages/socket.io-client/lib/socket.ts index e65ab4dd83..b8cedfbc2a 100644 --- a/packages/socket.io-client/lib/socket.ts +++ b/packages/socket.io-client/lib/socket.ts @@ -449,7 +449,10 @@ export class Socket< this.flags.volatile && (!isTransportWritable || !this.connected); if (discardPacket) { debug("discard packet as the transport is not currently writable"); - } else if (this.connected) { + } else if ( + this.connected && + (this.flags.volatile || this.io.isResponsive()) + ) { this.notifyOutgoingListeners(packet); this.packet(packet); } else {