From 8a48ee7cdf92a1659524876cd25457fd51fbfeb3 Mon Sep 17 00:00:00 2001 From: Cacie Prins Date: Tue, 16 Jul 2024 09:46:40 -0400 Subject: [PATCH] fix: Unhandled "WebSocket connection closed" when CDP connection is unstable (#29830) * unit and integration tests that reproduce websocket disconnected unhandled exception * WIP: command queue * complete command queue and retry refactor * cri-client changes pass tests; modify certain tests for readability and accuracy * removes unnecessary logic from command queue, adds unit tests for command queue * rm unused cdp state - this should be reserved for future refactor * small edits to cri-client: better error handling, more comprehensive comments * comment re: queue property * rearrange cri client member methods for readability * further edits * Changelog * Update cli/CHANGELOG.md Co-authored-by: Mike McCready <66998419+MikeMcC399@users.noreply.github.com> * fix continuous retry on close * split heavier debugs to verbose --------- Co-authored-by: Mike McCready <66998419+MikeMcC399@users.noreply.github.com> Co-authored-by: Jennifer Shehane --- cli/CHANGELOG.md | 1 + .../server/lib/browsers/cdp-command-queue.ts | 86 +++++ packages/server/lib/browsers/cri-client.ts | 359 ++++++++++-------- packages/server/test/integration/cdp_spec.ts | 330 ++++++++++------ .../unit/browsers/cdp-command-queue_spec.ts | 163 ++++++++ .../test/unit/browsers/cri-client_spec.ts | 19 +- 6 files changed, 699 insertions(+), 259 deletions(-) create mode 100644 packages/server/lib/browsers/cdp-command-queue.ts create mode 100644 packages/server/test/unit/browsers/cdp-command-queue_spec.ts diff --git a/cli/CHANGELOG.md b/cli/CHANGELOG.md index fd900cc81e48..ab3ed1a07e9c 100644 --- a/cli/CHANGELOG.md +++ b/cli/CHANGELOG.md @@ -5,6 +5,7 @@ _Released 7/16/2024 (PENDING)_ **Bugfixes:** +- Fixed an issue where unhandled `WebSocket connection closed` exceptions would be thrown when CDP connections rapidly connect, disconnect, and connect again while there are pending commands. Fixes [#29572](https://github.com/cypress-io/cypress/issues/29572). - CLI output properly displays non-JSON response bodies when a Test Replay upload attempt returns a non-JSON response body for a non-200 status code. Addressed in [#29801](https://github.com/cypress-io/cypress/pull/29801). - Fixed an issue where the ReadStream used to upload a Test Replay recording could erroneously be re-used when retrying in cases of retryable upload failures. Fixes [#29227](https://github.com/cypress-io/cypress/issues/29227). - Fixed an issue where command snapshots were not being captured within the `cy.origin()` command within Test Replay. Addressed in [#29828](https://github.com/cypress-io/cypress/pull/29828). diff --git a/packages/server/lib/browsers/cdp-command-queue.ts b/packages/server/lib/browsers/cdp-command-queue.ts new file mode 100644 index 000000000000..e79904e4960f --- /dev/null +++ b/packages/server/lib/browsers/cdp-command-queue.ts @@ -0,0 +1,86 @@ +import type ProtocolMapping from 'devtools-protocol/types/protocol-mapping' +import pDefer, { DeferredPromise } from 'p-defer' +import type { CdpCommand } from './cdp_automation' +import Debug from 'debug' + +const debug = Debug('cypress:server:browsers:cdp-command-queue') +const debugVerbose = Debug('cypress:server:browsers:cd-command-queue') + +type CommandReturn = ProtocolMapping.Commands[T]['returnType'] + +export type Command = { + command: T + params?: object + deferred: DeferredPromise> + sessionId?: string +} + +export class CDPCommandQueue { + private queue: Command[] = [] + + public get entries () { + return [...this.queue] + } + + public add ( + command: TCmd, + params: ProtocolMapping.Commands[TCmd]['paramsType'][0], + sessionId?: string, + ): Promise> { + debug('enqueing command %s', command) + debugVerbose('enqueing command %s with params %o', command, params) + + const deferred = pDefer>() + + const commandPackage: Command = { + command, + params, + deferred, + sessionId, + } + + this.queue.push(commandPackage) + + debug('Command enqueued; new length: %d', this.queue.length) + debugVerbose('Queue Contents: %O', this.queue) + + return deferred.promise + } + + public clear () { + debug('clearing command queue') + this.queue = [] + } + + public extract (search: Partial>): Command | undefined { + // this should find, remove, and return if found a given command + + const index = this.queue.findIndex((enqueued) => { + for (let k of Object.keys(search)) { + if (search[k] !== enqueued[k]) { + return false + } + } + + return true + }) + + debug('extracting %o from commands at index %d', search, index) + + if (index === -1) { + return undefined + } + + const [extracted] = this.queue.splice(index, 1) + + return extracted + } + + public shift () { + return this.queue.shift() + } + + public unshift (value: Command) { + return this.queue.unshift(value) + } +} diff --git a/packages/server/lib/browsers/cri-client.ts b/packages/server/lib/browsers/cri-client.ts index 21c22bac2a04..595fbeb09b08 100644 --- a/packages/server/lib/browsers/cri-client.ts +++ b/packages/server/lib/browsers/cri-client.ts @@ -2,6 +2,8 @@ import CDP from 'chrome-remote-interface' import debugModule from 'debug' import _ from 'lodash' import * as errors from '../errors' +import { CDPCommandQueue } from './cdp-command-queue' +import { asyncRetry } from '../util/async_retry' import type ProtocolMapping from 'devtools-protocol/types/protocol-mapping' import type EventEmitter from 'events' import type WebSocket from 'ws' @@ -59,6 +61,15 @@ interface CDPClient extends CDP.Client { _ws: WebSocket } +const ConnectionClosedKind: 'CONNECTION_CLOSED' = 'CONNECTION_CLOSED' + +class ConnectionClosedError extends Error { + public readonly kind = ConnectionClosedKind + static isConnectionClosedError (err: Error & { kind?: any }): err is ConnectionClosedError { + return err.kind === ConnectionClosedKind + } +} + export const DEFAULT_NETWORK_ENABLE_OPTIONS = { maxTotalBufferSize: 0, maxResourceBufferSize: 0, @@ -178,6 +189,7 @@ type CreateParams = { fullyManageTabs?: boolean browserClient?: ICriClient onReconnectAttempt?: (retryIndex: number) => void + onCriConnectionClosed?: () => void } export class CriClient implements ICriClient { @@ -185,6 +197,8 @@ export class CriClient implements ICriClient { private enableCommands: EnableCommand[] = [] private enqueuedCommands: EnqueuedCommand[] = [] + private _commandQueue: CDPCommandQueue = new CDPCommandQueue() + private _closed = false private _connected = false private crashed = false @@ -202,28 +216,9 @@ export class CriClient implements ICriClient { private fullyManageTabs?: boolean, private browserClient?: ICriClient, private onReconnectAttempt?: (retryIndex: number) => void, + private onCriConnectionClosed?: () => void, ) {} - get ws () { - return this.cri!._ws - } - - get queue () { - return { - enableCommands: this.enableCommands, - enqueuedCommands: this.enqueuedCommands, - subscriptions: this.subscriptions, - } - } - - get closed () { - return this._closed - } - - get connected () { - return this._connected - } - static async create ({ target, onAsynchronousError, @@ -234,145 +229,40 @@ export class CriClient implements ICriClient { fullyManageTabs, browserClient, onReconnectAttempt, + onCriConnectionClosed, }: CreateParams): Promise { - const newClient = new CriClient(target, onAsynchronousError, host, port, onReconnect, protocolManager, fullyManageTabs, browserClient, onReconnectAttempt) + const newClient = new CriClient(target, onAsynchronousError, host, port, onReconnect, protocolManager, fullyManageTabs, browserClient, onReconnectAttempt, onCriConnectionClosed) await newClient.connect() return newClient } - private async reconnect (retryIndex: number = 0) { - this._connected = false - - if (this.closed) { - debug('disconnected, not reconnecting because client is closed %o', { closed: this.closed, target: this.targetId }) - this.enqueuedCommands = [] - - return - } - - this.onReconnectAttempt?.(retryIndex) - - debug('disconnected, attempting to reconnect... %o', { retryIndex, closed: this.closed, target: this.targetId }) - - await this.connect() - - debug('restoring subscriptions + running *.enable and queued commands... %o', { subscriptions: this.subscriptions, enableCommands: this.enableCommands, enqueuedCommands: this.enqueuedCommands, target: this.targetId }) - - this.subscriptions.forEach((sub) => { - this.cri?.on(sub.eventName, sub.cb as any) - }) - - // '*.enable' commands need to be resent on reconnect or any events in - // that namespace will no longer be received - await Promise.all(this.enableCommands.map(async ({ command, params, sessionId }) => { - // these commands may have been enqueued, so we need to resolve those promises and remove - // them from the queue when we send here - const isInFlightCommand = (candidate: EnqueuedCommand) => { - return candidate.command === command && candidate.params === params && candidate.sessionId === sessionId - } - const enqueued = this.enqueuedCommands.find(isInFlightCommand) - - try { - const response = await this.cri?.send(command, params, sessionId) - - enqueued?.p.resolve(response) - } catch (e) { - enqueued?.p.reject(e) - } finally { - this.enqueuedCommands = this.enqueuedCommands.filter((candidate) => { - return !isInFlightCommand(candidate) - }) - } - })) - - this.enqueuedCommands.forEach((cmd) => { - this.cri!.send(cmd.command, cmd.params, cmd.sessionId).then(cmd.p.resolve as any, cmd.p.reject as any) - }) - - this.enqueuedCommands = [] - - if (this.onReconnect) { - this.onReconnect(this) - } - - // When CDP disconnects, it will automatically reconnect and re-apply various subscriptions - // (e.g. DOM.enable, Network.enable, etc.). However, we need to restart tracking DOM mutations - // from scratch. We do this by capturing a brand new full snapshot of the DOM. - await this.protocolManager?.cdpReconnect() + get ws () { + return this.cri!._ws } - private retryReconnect = async () => { - if (this.reconnection) { - debug('reconnection in progress; not starting new process, returning promise for in-flight reconnection attempt') - - return this.reconnection - } - - debug('disconnected, starting retries to reconnect... %o', { closed: this.closed, target: this.targetId }) - - const retry = async (retryIndex = 0) => { - retryIndex++ - - try { - const attempt = await this.reconnect(retryIndex) - - this.reconnection = undefined - - return attempt - } catch (err) { - if (this.closed) { - debug('could not reconnect because client is closed %o', { closed: this.closed, target: this.targetId }) - - this.enqueuedCommands = [] - - return - } - - debug('could not reconnect, retrying... %o', { closed: this.closed, target: this.targetId, err }) - - if (retryIndex < 20) { - await new Promise((resolve) => setTimeout(resolve, 100)) - - return retry(retryIndex) + // this property is accessed in a couple different places, but should be refactored to be + // private - queues are internal to this class, and should not be exposed + get queue () { + return { + enableCommands: this.enableCommands, + enqueuedCommands: this._commandQueue.entries.map((entry) => { + return { + ...entry, + p: entry.deferred, } - - const cdpError = errors.get('CDP_COULD_NOT_RECONNECT', err) - - // If we cannot reconnect to CDP, we will be unable to move to the next set of specs since we use CDP to clean up and close tabs. Marking this as fatal - cdpError.isFatalApiErr = true - this.reconnection = undefined - this.onAsynchronousError(cdpError) - } + }), + subscriptions: this.subscriptions, } - - this.reconnection = retry() - - return this.reconnection } - private enqueueCommand ( - command: TCmd, - params: ProtocolMapping.Commands[TCmd]['paramsType'][0], - sessionId?: string, - ): Promise { - return new Promise((resolve, reject) => { - const obj: EnqueuedCommand = { - command, - p: { resolve, reject }, - } - - if (params) { - obj.params = params - } - - if (sessionId) { - obj.sessionId = sessionId - } + get closed () { + return this._closed + } - this.enqueuedCommands.push(obj) - }) + get connected () { + return this._connected } public connect = async () => { @@ -412,7 +302,7 @@ export class CriClient implements ICriClient { // that we don't want to reconnect on && !process.env.CYPRESS_INTERNAL_E2E_TESTING_SELF ) { - this.cri.on('disconnect', this.retryReconnect) + this.cri.on('disconnect', this._reconnect) } // We're only interested in child target traffic. Browser cri traffic is @@ -498,9 +388,9 @@ export class CriClient implements ICriClient { debug('error classified as WEBSOCKET_NOT_OPEN_RE; enqueuing and attempting to reconnect') - const p = this.enqueueCommand(command, params, sessionId) + const p = this._enqueueCommand(command, params, sessionId) - await this.retryReconnect() + await this._reconnect() // if enqueued commands were wiped out from the reconnect and the socket is already closed, reject the command as it will never be run if (this.enqueuedCommands.length === 0 && this.closed) { @@ -513,7 +403,7 @@ export class CriClient implements ICriClient { } } - return this.enqueueCommand(command, params, sessionId) + return this._enqueueCommand(command, params, sessionId) } public on = (eventName: T, cb: (data: ProtocolMapping.Events[T][0], sessionId?: string) => void) => { @@ -541,6 +431,7 @@ export class CriClient implements ICriClient { } public close = async () => { + debug('closing') if (this._closed) { debug('not closing, cri client is already closed %o', { closed: this._closed, target: this.targetId }) @@ -557,6 +448,176 @@ export class CriClient implements ICriClient { debug('error closing cri client targeting %s: %o', this.targetId, e) } finally { debug('closed cri client %o', { closed: this._closed, target: this.targetId }) + if (this.onCriConnectionClosed) { + this.onCriConnectionClosed() + } + } + } + + private _enqueueCommand ( + command: TCmd, + params: ProtocolMapping.Commands[TCmd]['paramsType'][0], + sessionId?: string, + ): Promise { + return this._commandQueue.add(command, params, sessionId) + } + + private _isConnectionError (error: Error) { + return WEBSOCKET_NOT_OPEN_RE.test(error.message) + } + + private _reconnect = async () => { + debug('preparing to reconnect') + if (this.reconnection) { + debug('not reconnecting as there is an active reconnection attempt') + + return this.reconnection + } + + this._connected = false + + if (this._closed) { + debug('Target %s disconnected, not reconnecting because client is closed.', this.targetId) + this._commandQueue.clear() + + return + } + + let attempt = 1 + + try { + this.reconnection = asyncRetry(() => { + if (this._closed) { + throw new ConnectionClosedError('Reconnection halted due to a closed client.') + } + + this.onReconnectAttempt?.(attempt) + attempt++ + + return this.connect() + }, { + maxAttempts: 20, + retryDelay: () => 100, + shouldRetry: (err) => { + debug('error while reconnecting to Target %s: %o', this.targetId, err) + if (err && ConnectionClosedError.isConnectionClosedError(err)) { + return false + } + + debug('Retying reconnection attempt') + + return true + }, + })() + + await this.reconnection + this.reconnection = undefined + debug('reconnected') + } catch (err) { + debug('error(s) on reconnecting: ', err) + const significantError: Error = err.errors ? (err as AggregateError).errors[err.errors.length - 1] : err + + const retryHaltedDueToClosed = ConnectionClosedError.isConnectionClosedError(err) || + (err as AggregateError)?.errors?.find((predicate) => ConnectionClosedError.isConnectionClosedError(predicate)) + + if (!retryHaltedDueToClosed) { + const cdpError = errors.get('CDP_COULD_NOT_RECONNECT', significantError) + + cdpError.isFatalApiErr = true + this.reconnection = undefined + this._commandQueue.clear() + this.onAsynchronousError(cdpError) + } + + // do not re-throw; error handling is done via onAsynchronousError + return + } + + try { + await this._restoreState() + await this._drainCommandQueue() + + await this.protocolManager?.cdpReconnect() + } catch (e) { + if (this._isConnectionError(e)) { + return this._reconnect() + } + + throw e + } + + // previous timing of this had it happening before subscriptions/enablements were restored, + // and before any enqueued commands were sent. This made testing problematic. Changing the + // timing may have implications for browsers that wish to update frame tree - that process + // will now be kicked off after state restoration & pending commands, rather then before. + // This warrants extra scrutiny in tests. (convert to PR comment) + if (this.onReconnect) { + this.onReconnect(this) + } + } + + private async _restoreState () { + debug('resubscribing to %d subscriptions', this.subscriptions.length) + + this.subscriptions.forEach((sub) => { + this.cri?.on(sub.eventName, sub.cb as any) + }) + + // '*.enable' commands need to be resent on reconnect or any events in + // that namespace will no longer be received + debug('re-enabling %d enablements', this.enableCommands.length) + await Promise.all(this.enableCommands.map(async ({ command, params, sessionId }) => { + // these commands may have been enqueued, so we need to resolve those promises and remove + // them from the queue when we send here + const inFlightCommand = this._commandQueue.extract({ command, params, sessionId }) + + try { + const response = await this.cri?.send(command, params, sessionId) + + inFlightCommand?.deferred.resolve(response) + } catch (err) { + debug('error re-enabling %s: ', command, err) + if (this._isConnectionError(err)) { + // Connection errors are thrown here so that a reconnection attempt + // can be made. + throw err + } else { + // non-connection errors are appropriate for rejecting the original command promise + inFlightCommand?.deferred.reject(err) + } + } + })) + } + + private async _drainCommandQueue () { + debug('sending %d enqueued commands', this._commandQueue.entries.length) + while (this._commandQueue.entries.length) { + const enqueued = this._commandQueue.shift() + + if (!enqueued) { + return + } + + try { + debug('sending enqueued command %s', enqueued.command) + const response = await this.cri!.send(enqueued.command, enqueued.params, enqueued.sessionId) + + debug('sent command, received ', { response }) + enqueued.deferred.resolve(response) + debug('resolved enqueued promise') + } catch (e) { + debug('enqueued command %s failed:', enqueued.command, e) + if (this._isConnectionError(e)) { + // similar to restoring state, connection errors are re-thrown so that + // the connection can be restored. The command is queued for re-delivery + // upon reconnect. + debug('re-enqueuing command and re-throwing') + this._commandQueue.unshift(enqueued) + throw e + } else { + enqueued.deferred.reject(e) + } + } } } } diff --git a/packages/server/test/integration/cdp_spec.ts b/packages/server/test/integration/cdp_spec.ts index 2210da30cad7..2bd2b5bd4422 100644 --- a/packages/server/test/integration/cdp_spec.ts +++ b/packages/server/test/integration/cdp_spec.ts @@ -6,7 +6,7 @@ import WebSocket from 'ws' import { CdpCommand, CdpEvent } from '../../lib/browsers/cdp_automation' import { CriClient } from '../../lib/browsers/cri-client' import { expect, nock } from '../spec_helper' - +import pDefer from 'p-defer' import sinon from 'sinon' // import Bluebird from 'bluebird' @@ -20,11 +20,6 @@ type CDPCommands = { params?: object } -type CDPSubscriptions = { - eventName: CdpEvent - cb: () => void -} - type OnWSConnection = (wsClient: WebSocket) => void describe('CDP Clients', () => { @@ -34,6 +29,8 @@ describe('CDP Clients', () => { let criClient: CriClient let messages: object[] let onMessage: sinon.SinonStub + let messageResponse: ReturnType + let neverAck: boolean const startWsServer = async (onConnection?: OnWSConnection): Promise => { return new Promise((resolve, reject) => { @@ -48,14 +45,26 @@ describe('CDP Clients', () => { // eslint-disable-next-line no-console ws.on('error', console.error) - ws.on('message', (data) => { + ws.on('message', async (data) => { const msg = JSON.parse(data.toString()) messages.push(msg) onMessage(msg) + if (neverAck) { + return + } + // ACK back if we have a msg.id if (msg.id) { + if (messageResponse) { + const message = await messageResponse.promise + + ws.send(JSON.stringify({ id: msg.id, result: message })) + + return + } + ws.send(JSON.stringify({ id: msg.id, result: {}, @@ -93,6 +102,7 @@ describe('CDP Clients', () => { } beforeEach(async () => { + messageResponse = undefined messages = [] onMessage = sinon.stub() @@ -103,6 +113,7 @@ describe('CDP Clients', () => { }) afterEach(async () => { + debug('after each,', !!wsSrv) await criClient.close().catch(() => { }) await closeWsServer() }) @@ -165,129 +176,232 @@ describe('CDP Clients', () => { }) }) - it('restores sending enqueued commands, subscriptions, and enable commands on reconnect', () => { - const enableCommands: CDPCommands[] = [ - { command: 'Page.enable', params: {} }, - { command: 'Network.enable', params: {} }, - { command: 'Runtime.addBinding', params: { name: 'foo' } }, - { command: 'Target.setDiscoverTargets', params: { discover: true } }, - ] + it('stops trying to reconnect if .close() is called, and does not trigger an async error', async () => { + const stub = sinon.stub() + const onCriConnectionClosed = sinon.stub() + const haltedReconnection = new Promise(async (resolve, reject) => { + onCriConnectionClosed.callsFake(resolve) + criClient = await CriClient.create({ + target: `ws://127.0.0.1:${wsServerPort}`, + onAsynchronousError: reject, + onReconnect: reject, + onReconnectAttempt: stub, + onCriConnectionClosed, + }) - const enqueuedCommands: CDPCommands[] = [ - { command: 'Page.navigate', params: { url: 'about:blank' } }, - { command: 'Performance.getMetrics', params: {} }, - ] + await Promise.all([ + clientDisconnected(), + closeWsServer(), + ]) - const cb = sinon.stub() + criClient.close() + }) - const subscriptions: CDPSubscriptions[] = [ - { eventName: 'Network.requestWillBeSent', cb }, - { eventName: 'Network.responseReceived', cb }, - ] + await haltedReconnection - let wsClient + expect(onCriConnectionClosed).to.have.been.called + }) - const stub = sinon.stub().onThirdCall().callsFake(async () => { + it('continuously re-sends commands that fail due to disconnect, until target is closed', async () => { + /** + * This test is specifically for the case when a CRIClient websocket trampolines, and + * enqueued messages fail due to a disconnected websocket. + * + * That happens if a command fails due to an in-flight disconnect, and then fails again + * after being enqueued due to an in-flight disconnect. + * + * The steps taken here to reproduce: + * 1. Connect to the websocket + * 2. Send the command, and wait for it to be received by the websocket (but not responded to) + * 3. Disconnect the websocket + * 4. Allow the websocket to be reconnected after 3 tries, and wait for successful reconnection + * 5. Wait for the command to be re-sent and received by the websocket (but not responded to) + * 6. Disconnect the websocket. + * 7. Allow the websocket to be reconnected after 3 tries, and wait for successful reconnection + * 8. Wait for the command to be resent, received, and responded to successfully. + */ + neverAck = true + const command: CDPCommands = { + command: 'DOM.getDocument', + params: { depth: -1 }, + } + let reconnectPromise = pDefer() + let commandSent = pDefer() + const reconnectOnThirdTry = sinon.stub().onThirdCall().callsFake(async () => { wsSrv = await startWsServer((ws) => { - wsClient = ws }) }) - const onReconnect = sinon.stub() + const onReconnect = sinon.stub().callsFake(() => { + reconnectPromise.resolve() + }) - return new Promise(async (resolve, reject) => { - const onAsynchronousError = reject + criClient = await CriClient.create({ + target: `ws://127.0.0.1:${wsServerPort}`, + onAsynchronousError: (e) => commandSent.reject(e), + onReconnect, + onReconnectAttempt: reconnectOnThirdTry, + }) - criClient = await CriClient.create({ - target: `ws://127.0.0.1:${wsServerPort}`, - onAsynchronousError, - onReconnect, - onReconnectAttempt: stub, - }) + onMessage.callsFake(() => { + commandSent.resolve() + }) - const send = (commands: CDPCommands[]) => { - commands.forEach(({ command, params }) => { - criClient.send(command, params) - }) - } + const cmdExecution = criClient.send(command.command, command.params) - const on = (subscriptions: CDPSubscriptions[]) => { - subscriptions.forEach(({ eventName, cb }) => { - criClient.on(eventName, cb) - }) - } + await commandSent.promise + await Promise.all([clientDisconnected(), closeWsServer()]) - // send these in before we disconnect - send(enableCommands) + commandSent = pDefer() + onMessage.resetHistory() - await Promise.all([ - clientDisconnected(), - closeWsServer(), - ]) + reconnectOnThirdTry.resetHistory() - // expect 6 message calls - onMessage = sinon.stub().onCall(5).callsFake(resolve) + await commandSent.promise - // now enqueue these commands - send(enqueuedCommands) - on(subscriptions) + await Promise.all([clientDisconnected(), closeWsServer()]) - const { queue } = criClient + reconnectOnThirdTry.resetHistory() - // assert they're in the queue - expect(queue.enqueuedCommands).to.containSubset(enqueuedCommands) - expect(queue.enableCommands).to.containSubset(enableCommands) - expect(queue.subscriptions).to.containSubset(subscriptions.map(({ eventName, cb }) => { - return { - eventName, - cb: _.isFunction, - } - })) - }) - .then(() => { - const { queue } = criClient - - expect(queue.enqueuedCommands).to.be.empty - expect(queue.enableCommands).not.to.be.empty - expect(queue.subscriptions).not.to.be.empty - - const messageCalls = _ - .chain(onMessage.args) - .flatten() - .map(({ method, params }) => { - return { - command: method, - params: params ?? {}, - } - }) - .value() - - expect(onMessage).to.have.callCount(6) - expect(messageCalls).to.deep.eq( - _.concat( - enableCommands, - enqueuedCommands, - ), - ) - - return new Promise((resolve) => { - cb.onSecondCall().callsFake(resolve) - - wsClient.send(JSON.stringify({ - method: 'Network.requestWillBeSent', - params: { foo: 'bar' }, - })) - - wsClient.send(JSON.stringify({ - method: 'Network.responseReceived', - params: { baz: 'quux' }, - })) + reconnectPromise = pDefer() + + // set up response value + messageResponse = pDefer() + + neverAck = false + + messageResponse.resolve({ response: true }) + + const res: any = await cmdExecution + + expect(res.response).to.eq(true) + + expect(reconnectPromise.promise).to.be.fulfilled + }) + + it('reattaches subscriptions, reenables enablements, and sends pending commands on reconnect', async () => { + let reconnectPromise = pDefer() + let commandSent = pDefer() + let wsClient + const reconnectOnThirdTry = sinon.stub().onThirdCall().callsFake(async () => { + wsSrv = await startWsServer((ws) => { + wsClient = ws }) }) - .then(() => { - expect(cb.firstCall).to.be.calledWith({ foo: 'bar' }) - expect(cb.secondCall).to.be.calledWith({ baz: 'quux' }) + + const onReconnect = sinon.stub().callsFake(() => { + reconnectPromise.resolve() + }) + + criClient = await CriClient.create({ + target: `ws://127.0.0.1:${wsServerPort}`, + onAsynchronousError: (e) => commandSent.reject(e), + onReconnect, + onReconnectAttempt: reconnectOnThirdTry, }) + + onMessage.callsFake(() => { + commandSent.resolve() + }) + + const enablements: CDPCommands[] = [ + { command: 'Page.enable', params: {} }, + { command: 'Network.enable', params: {} }, + { command: 'Runtime.addBinding', params: { name: 'foo' } }, + { command: 'Target.setDiscoverTargets', params: { discover: true } }, + ] + + const networkRequestSubscription = { + eventName: 'Network.requestWillBeSent', + cb: sinon.stub(), + mockEvent: { foo: 'bar' }, + } + const networkResponseSubscription = { + eventName: 'Network.responseReceived', + cb: sinon.stub(), + mockEvent: { baz: 'quux' }, + } + + const subscriptions = [ + networkRequestSubscription, + networkResponseSubscription, + ] + + // initialize state + + for (const { command, params } of enablements) { + await criClient.send(command, params) + } + for (const { eventName, cb } of subscriptions) { + criClient.on(eventName as CdpEvent, cb) + } + + const commandsToEnqueue: (CDPCommands & { promise?: Promise })[] = [ + { command: 'Page.navigate', params: { url: 'about:blank' }, promise: undefined }, + { command: 'Performance.getMetrics', params: {}, promise: undefined }, + ] + + // prevent commands from resolving, for now + neverAck = true + // send each command, and wait for them to be sent (but not responded to) + for (let i = 0; i < commandsToEnqueue.length; i++) { + commandSent = pDefer() + const command = commandsToEnqueue[i] + + commandsToEnqueue[i].promise = criClient.send(command.command, command.params) + + await commandSent.promise + } + + onMessage.resetHistory() + // disconnect the websocket, causing enqueued commands to be enqueued + await Promise.all([clientDisconnected(), closeWsServer()]) + + // re-enable responses from underlying CDP + neverAck = false + + // CriClient should now retry to connect, and succeed on the third try. Wait for reconnect. + // this promise is resolved when: CDP is reconnected, state is restored, and queue is drained + await reconnectPromise.promise + + // onMessage call history was reset prior to reconnection - these are assertions about + // calls made after that reset + for (const { command, params } of enablements) { + /** + * sinon/sinon-chai's expect(stub).to.have.been.calledWith({ + * partial: object + * }) + * does not work as advertised, at least with our version of sinon/chai/sinon-chai. + * because the message id has the potential to be brittle, we want to assert that + * onmessage was called with a specific command and params, regardless of message id + */ + const sentArgs = onMessage.args.filter(([arg]) => { + return arg.method === command && _.isEqual(arg.params, params) + }) + + expect(sentArgs, `onMessage args for enqueued command ${command}`).to.have.lengthOf(1) + } + for (const { command, params } of commandsToEnqueue) { + const sentArgs = onMessage.args.filter(([{ method, params: p }]) => { + return method === command && _.isEqual(p, params) + }) + + expect(sentArgs, `onMessage args for enqueued command ${command}`).to.have.lengthOf(1) + } + // for full integration, send events that should be subscribed to, and expect that subscription + // callback to be called + for (const { eventName, cb, mockEvent } of subscriptions) { + const deferred = pDefer() + + cb.onFirstCall().callsFake(deferred.resolve) + wsClient.send(JSON.stringify({ + method: eventName, + params: mockEvent, + })) + + await deferred.promise + expect(cb).to.have.been.calledWith(mockEvent) + } }) it('stops reconnecting after close is called', () => { diff --git a/packages/server/test/unit/browsers/cdp-command-queue_spec.ts b/packages/server/test/unit/browsers/cdp-command-queue_spec.ts new file mode 100644 index 000000000000..dbb3f4db74cc --- /dev/null +++ b/packages/server/test/unit/browsers/cdp-command-queue_spec.ts @@ -0,0 +1,163 @@ +import { CDPCommandQueue, Command } from '../../../lib/browsers/cdp-command-queue' +import type ProtocolMapping from 'devtools-protocol/types/protocol-mapping' +import pDeferred from 'p-defer' +import _ from 'lodash' + +const { expect } = require('../../spec_helper') + +function matchCommand (search: Partial>) { + return (predicate: Partial>) => { + return _.isEqual(search.command, predicate.command) && _.isEqual(search.params, predicate.params) + } +} + +describe('CDPCommandQueue', () => { + const enableAnimation: { + command: 'Animation.enable' + params: undefined + } = { command: 'Animation.enable', params: undefined } + const removeAttribute: { + command: 'DOM.removeAttribute' + params: ProtocolMapping.Commands['DOM.removeAttribute']['paramsType'][0] + } = { command: 'DOM.removeAttribute', params: { name: 'attribute', nodeId: 123 } } + + describe('.entries', () => { + describe('when an entry is added', () => { + let queue: CDPCommandQueue + + beforeEach(() => { + queue = new CDPCommandQueue() + queue.add(enableAnimation.command, enableAnimation.params) + }) + + it('reflects only the entry that was added', () => { + expect(queue.entries.find(matchCommand(enableAnimation)), 'queue should contain enableAnimation').not.to.be.undefined + expect(queue.entries.length).to.eq(1) + }) + + describe('and another is added', () => { + beforeEach(() => { + queue.add(removeAttribute.command, removeAttribute.params) + }) + + it('reflects only the entries that have been added', () => { + expect(queue.entries.find(matchCommand(enableAnimation))).not.to.be.undefined + expect(queue.entries.find(matchCommand(removeAttribute))).not.to.be.undefined + expect(queue.entries).to.have.lengthOf(2) + }) + }) + + describe('and the is cleared', () => { + beforeEach(() => { + queue.clear() + }) + + it('has no entries', () => { + expect(queue.entries.find(matchCommand(enableAnimation))).to.be.undefined + expect(queue.entries).to.have.lengthOf(0) + }) + }) + }) + }) + + describe('.add', () => { + it('adds a command to the queue and returns a promise that is resolved when the command is resolved', () => { + const sessionId = '1234' + const queue = new CDPCommandQueue() + + const commandPromise = queue.add(enableAnimation.command, enableAnimation.params, sessionId) + const enqueued = queue.entries[0] + + expect(enqueued.command).to.eq(enableAnimation.command) + expect(_.isEqual(enqueued.params, enableAnimation.params), 'params are preserved').to.be.true + expect(enqueued.sessionId).to.eq(sessionId) + expect(enqueued.deferred).not.to.be.undefined + + const resolution = { value: true } + + enqueued.deferred.resolve(resolution) + expect(commandPromise).to.eventually.equal(resolution) + }) + }) + + describe('.clear', () => { + it('clears the queue', () => { + const queue = new CDPCommandQueue() + + queue.add(enableAnimation.command, enableAnimation.params) + queue.add(removeAttribute.command, removeAttribute.params) + expect(queue.entries).to.have.lengthOf(2) + queue.clear() + expect(queue.entries).to.have.lengthOf(0) + }) + }) + + describe('.extract', () => { + let queue: CDPCommandQueue + let searchCommand: Partial> + let addCommand: Partial> + + beforeEach(() => { + queue = new CDPCommandQueue() + }) + + describe('when the given search predicate exists in the queue', () => { + beforeEach(() => { + searchCommand = enableAnimation + addCommand = enableAnimation + }) + + it('returns the matching enqueued command, and removes it from the queue', () => { + queue.add(addCommand.command, addCommand.params) + const found = queue.extract(searchCommand) + + expect(found.command).to.eq(searchCommand.command) + expect(found.params).to.eq(searchCommand.params) + expect(queue.entries).to.have.lengthOf(0) + }) + }) + + describe('when the given search predicate does not exist in the queue', () => { + beforeEach(() => { + addCommand = removeAttribute + searchCommand = enableAnimation + }) + + it('returns undefined, and does not modify the queue', () => { + queue.add(addCommand.command, addCommand.params) + expect(queue.entries).to.have.lengthOf(1) + const found = queue.extract(searchCommand) + + expect(found).to.be.undefined + expect(queue.entries).to.have.lengthOf(1) + }) + }) + }) + + describe('.shift', () => { + it('removes and returns the entry from the beginning of the queue', () => { + const queue = new CDPCommandQueue() + + queue.add(enableAnimation.command, enableAnimation.params) + queue.add(removeAttribute.command, removeAttribute.params) + const next = queue.shift() + + expect(next.command).to.eq(enableAnimation.command) + expect(queue.entries).to.have.lengthOf(1) + }) + }) + + describe('.unshift', () => { + it('adds an entry to the front of the queue', () => { + const queue = new CDPCommandQueue() + + queue.add(enableAnimation.command, enableAnimation.params) + const deferred = pDeferred() + + queue.unshift({ + command: enableAnimation.command, + deferred, + }) + }) + }) +}) diff --git a/packages/server/test/unit/browsers/cri-client_spec.ts b/packages/server/test/unit/browsers/cri-client_spec.ts index 861a3dd351a2..f57291a80606 100644 --- a/packages/server/test/unit/browsers/cri-client_spec.ts +++ b/packages/server/test/unit/browsers/cri-client_spec.ts @@ -1,6 +1,7 @@ import EventEmitter from 'events' -import type { CriClient } from '../../../lib/browsers/cri-client' import { ProtocolManagerShape } from '@packages/types' +import type { CriClient } from '../../../lib/browsers/cri-client' + const { expect, proxyquire, sinon } = require('../../spec_helper') const DEBUGGER_URL = 'http://foo' @@ -98,8 +99,9 @@ describe('lib/browsers/cri-client', function () { 'WebSocket is not open', // @see https://github.com/cypress-io/cypress/issues/7180 'WebSocket is already in CLOSING or CLOSED state', + 'WebSocket connection closed', ]).forEach((msg) => { - it(`with '${msg}'`, async function () { + it(`with one '${msg}' message it retries once`, async function () { const err = new Error(msg) send.onFirstCall().rejects(err) @@ -111,6 +113,19 @@ describe('lib/browsers/cri-client', function () { expect(send).to.be.calledTwice }) + + it(`with two '${msg}' message it retries twice`, async () => { + const err = new Error(msg) + + send.onFirstCall().rejects(err) + send.onSecondCall().rejects(err) + send.onThirdCall().resolves() + + const client = await getClient() + + await client.send('DOM.getDocument', { depth: -1 }) + expect(send).to.have.callCount(3) + }) }) })