mirror of
https://github.com/cypress-io/cypress.git
synced 2026-01-30 19:18:45 -06:00
fix: Unhandled "WebSocket connection closed" when CDP connection is unstable (#29830)
* unit and integration tests that reproduce websocket disconnected unhandled exception * WIP: command queue * complete command queue and retry refactor * cri-client changes pass tests; modify certain tests for readability and accuracy * removes unnecessary logic from command queue, adds unit tests for command queue * rm unused cdp state - this should be reserved for future refactor * small edits to cri-client: better error handling, more comprehensive comments * comment re: queue property * rearrange cri client member methods for readability * further edits * Changelog * Update cli/CHANGELOG.md Co-authored-by: Mike McCready <66998419+MikeMcC399@users.noreply.github.com> * fix continuous retry on close * split heavier debugs to verbose --------- Co-authored-by: Mike McCready <66998419+MikeMcC399@users.noreply.github.com> Co-authored-by: Jennifer Shehane <jennifer@cypress.io>
This commit is contained in:
@@ -5,6 +5,7 @@ _Released 7/16/2024 (PENDING)_
|
||||
|
||||
**Bugfixes:**
|
||||
|
||||
- Fixed an issue where unhandled `WebSocket connection closed` exceptions would be thrown when CDP connections rapidly connect, disconnect, and connect again while there are pending commands. Fixes [#29572](https://github.com/cypress-io/cypress/issues/29572).
|
||||
- CLI output properly displays non-JSON response bodies when a Test Replay upload attempt returns a non-JSON response body for a non-200 status code. Addressed in [#29801](https://github.com/cypress-io/cypress/pull/29801).
|
||||
- Fixed an issue where the ReadStream used to upload a Test Replay recording could erroneously be re-used when retrying in cases of retryable upload failures. Fixes [#29227](https://github.com/cypress-io/cypress/issues/29227).
|
||||
- Fixed an issue where command snapshots were not being captured within the `cy.origin()` command within Test Replay. Addressed in [#29828](https://github.com/cypress-io/cypress/pull/29828).
|
||||
|
||||
86
packages/server/lib/browsers/cdp-command-queue.ts
Normal file
86
packages/server/lib/browsers/cdp-command-queue.ts
Normal file
@@ -0,0 +1,86 @@
|
||||
import type ProtocolMapping from 'devtools-protocol/types/protocol-mapping'
|
||||
import pDefer, { DeferredPromise } from 'p-defer'
|
||||
import type { CdpCommand } from './cdp_automation'
|
||||
import Debug from 'debug'
|
||||
|
||||
const debug = Debug('cypress:server:browsers:cdp-command-queue')
|
||||
const debugVerbose = Debug('cypress:server:browsers:cd-command-queue')
|
||||
|
||||
type CommandReturn<T extends CdpCommand> = ProtocolMapping.Commands[T]['returnType']
|
||||
|
||||
export type Command<T extends CdpCommand> = {
|
||||
command: T
|
||||
params?: object
|
||||
deferred: DeferredPromise<CommandReturn<T>>
|
||||
sessionId?: string
|
||||
}
|
||||
|
||||
export class CDPCommandQueue {
|
||||
private queue: Command<any>[] = []
|
||||
|
||||
public get entries () {
|
||||
return [...this.queue]
|
||||
}
|
||||
|
||||
public add <TCmd extends CdpCommand> (
|
||||
command: TCmd,
|
||||
params: ProtocolMapping.Commands[TCmd]['paramsType'][0],
|
||||
sessionId?: string,
|
||||
): Promise<CommandReturn<TCmd>> {
|
||||
debug('enqueing command %s', command)
|
||||
debugVerbose('enqueing command %s with params %o', command, params)
|
||||
|
||||
const deferred = pDefer<CommandReturn<TCmd>>()
|
||||
|
||||
const commandPackage: Command<TCmd> = {
|
||||
command,
|
||||
params,
|
||||
deferred,
|
||||
sessionId,
|
||||
}
|
||||
|
||||
this.queue.push(commandPackage)
|
||||
|
||||
debug('Command enqueued; new length: %d', this.queue.length)
|
||||
debugVerbose('Queue Contents: %O', this.queue)
|
||||
|
||||
return deferred.promise
|
||||
}
|
||||
|
||||
public clear () {
|
||||
debug('clearing command queue')
|
||||
this.queue = []
|
||||
}
|
||||
|
||||
public extract<T extends CdpCommand> (search: Partial<Command<T>>): Command<T> | undefined {
|
||||
// this should find, remove, and return if found a given command
|
||||
|
||||
const index = this.queue.findIndex((enqueued) => {
|
||||
for (let k of Object.keys(search)) {
|
||||
if (search[k] !== enqueued[k]) {
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
return true
|
||||
})
|
||||
|
||||
debug('extracting %o from commands at index %d', search, index)
|
||||
|
||||
if (index === -1) {
|
||||
return undefined
|
||||
}
|
||||
|
||||
const [extracted] = this.queue.splice(index, 1)
|
||||
|
||||
return extracted
|
||||
}
|
||||
|
||||
public shift () {
|
||||
return this.queue.shift()
|
||||
}
|
||||
|
||||
public unshift (value: Command<any>) {
|
||||
return this.queue.unshift(value)
|
||||
}
|
||||
}
|
||||
@@ -2,6 +2,8 @@ import CDP from 'chrome-remote-interface'
|
||||
import debugModule from 'debug'
|
||||
import _ from 'lodash'
|
||||
import * as errors from '../errors'
|
||||
import { CDPCommandQueue } from './cdp-command-queue'
|
||||
import { asyncRetry } from '../util/async_retry'
|
||||
import type ProtocolMapping from 'devtools-protocol/types/protocol-mapping'
|
||||
import type EventEmitter from 'events'
|
||||
import type WebSocket from 'ws'
|
||||
@@ -59,6 +61,15 @@ interface CDPClient extends CDP.Client {
|
||||
_ws: WebSocket
|
||||
}
|
||||
|
||||
const ConnectionClosedKind: 'CONNECTION_CLOSED' = 'CONNECTION_CLOSED'
|
||||
|
||||
class ConnectionClosedError extends Error {
|
||||
public readonly kind = ConnectionClosedKind
|
||||
static isConnectionClosedError (err: Error & { kind?: any }): err is ConnectionClosedError {
|
||||
return err.kind === ConnectionClosedKind
|
||||
}
|
||||
}
|
||||
|
||||
export const DEFAULT_NETWORK_ENABLE_OPTIONS = {
|
||||
maxTotalBufferSize: 0,
|
||||
maxResourceBufferSize: 0,
|
||||
@@ -178,6 +189,7 @@ type CreateParams = {
|
||||
fullyManageTabs?: boolean
|
||||
browserClient?: ICriClient
|
||||
onReconnectAttempt?: (retryIndex: number) => void
|
||||
onCriConnectionClosed?: () => void
|
||||
}
|
||||
|
||||
export class CriClient implements ICriClient {
|
||||
@@ -185,6 +197,8 @@ export class CriClient implements ICriClient {
|
||||
private enableCommands: EnableCommand[] = []
|
||||
private enqueuedCommands: EnqueuedCommand[] = []
|
||||
|
||||
private _commandQueue: CDPCommandQueue = new CDPCommandQueue()
|
||||
|
||||
private _closed = false
|
||||
private _connected = false
|
||||
private crashed = false
|
||||
@@ -202,28 +216,9 @@ export class CriClient implements ICriClient {
|
||||
private fullyManageTabs?: boolean,
|
||||
private browserClient?: ICriClient,
|
||||
private onReconnectAttempt?: (retryIndex: number) => void,
|
||||
private onCriConnectionClosed?: () => void,
|
||||
) {}
|
||||
|
||||
get ws () {
|
||||
return this.cri!._ws
|
||||
}
|
||||
|
||||
get queue () {
|
||||
return {
|
||||
enableCommands: this.enableCommands,
|
||||
enqueuedCommands: this.enqueuedCommands,
|
||||
subscriptions: this.subscriptions,
|
||||
}
|
||||
}
|
||||
|
||||
get closed () {
|
||||
return this._closed
|
||||
}
|
||||
|
||||
get connected () {
|
||||
return this._connected
|
||||
}
|
||||
|
||||
static async create ({
|
||||
target,
|
||||
onAsynchronousError,
|
||||
@@ -234,145 +229,40 @@ export class CriClient implements ICriClient {
|
||||
fullyManageTabs,
|
||||
browserClient,
|
||||
onReconnectAttempt,
|
||||
onCriConnectionClosed,
|
||||
}: CreateParams): Promise<CriClient> {
|
||||
const newClient = new CriClient(target, onAsynchronousError, host, port, onReconnect, protocolManager, fullyManageTabs, browserClient, onReconnectAttempt)
|
||||
const newClient = new CriClient(target, onAsynchronousError, host, port, onReconnect, protocolManager, fullyManageTabs, browserClient, onReconnectAttempt, onCriConnectionClosed)
|
||||
|
||||
await newClient.connect()
|
||||
|
||||
return newClient
|
||||
}
|
||||
|
||||
private async reconnect (retryIndex: number = 0) {
|
||||
this._connected = false
|
||||
|
||||
if (this.closed) {
|
||||
debug('disconnected, not reconnecting because client is closed %o', { closed: this.closed, target: this.targetId })
|
||||
this.enqueuedCommands = []
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
this.onReconnectAttempt?.(retryIndex)
|
||||
|
||||
debug('disconnected, attempting to reconnect... %o', { retryIndex, closed: this.closed, target: this.targetId })
|
||||
|
||||
await this.connect()
|
||||
|
||||
debug('restoring subscriptions + running *.enable and queued commands... %o', { subscriptions: this.subscriptions, enableCommands: this.enableCommands, enqueuedCommands: this.enqueuedCommands, target: this.targetId })
|
||||
|
||||
this.subscriptions.forEach((sub) => {
|
||||
this.cri?.on(sub.eventName, sub.cb as any)
|
||||
})
|
||||
|
||||
// '*.enable' commands need to be resent on reconnect or any events in
|
||||
// that namespace will no longer be received
|
||||
await Promise.all(this.enableCommands.map(async ({ command, params, sessionId }) => {
|
||||
// these commands may have been enqueued, so we need to resolve those promises and remove
|
||||
// them from the queue when we send here
|
||||
const isInFlightCommand = (candidate: EnqueuedCommand) => {
|
||||
return candidate.command === command && candidate.params === params && candidate.sessionId === sessionId
|
||||
}
|
||||
const enqueued = this.enqueuedCommands.find(isInFlightCommand)
|
||||
|
||||
try {
|
||||
const response = await this.cri?.send(command, params, sessionId)
|
||||
|
||||
enqueued?.p.resolve(response)
|
||||
} catch (e) {
|
||||
enqueued?.p.reject(e)
|
||||
} finally {
|
||||
this.enqueuedCommands = this.enqueuedCommands.filter((candidate) => {
|
||||
return !isInFlightCommand(candidate)
|
||||
})
|
||||
}
|
||||
}))
|
||||
|
||||
this.enqueuedCommands.forEach((cmd) => {
|
||||
this.cri!.send(cmd.command, cmd.params, cmd.sessionId).then(cmd.p.resolve as any, cmd.p.reject as any)
|
||||
})
|
||||
|
||||
this.enqueuedCommands = []
|
||||
|
||||
if (this.onReconnect) {
|
||||
this.onReconnect(this)
|
||||
}
|
||||
|
||||
// When CDP disconnects, it will automatically reconnect and re-apply various subscriptions
|
||||
// (e.g. DOM.enable, Network.enable, etc.). However, we need to restart tracking DOM mutations
|
||||
// from scratch. We do this by capturing a brand new full snapshot of the DOM.
|
||||
await this.protocolManager?.cdpReconnect()
|
||||
get ws () {
|
||||
return this.cri!._ws
|
||||
}
|
||||
|
||||
private retryReconnect = async () => {
|
||||
if (this.reconnection) {
|
||||
debug('reconnection in progress; not starting new process, returning promise for in-flight reconnection attempt')
|
||||
|
||||
return this.reconnection
|
||||
}
|
||||
|
||||
debug('disconnected, starting retries to reconnect... %o', { closed: this.closed, target: this.targetId })
|
||||
|
||||
const retry = async (retryIndex = 0) => {
|
||||
retryIndex++
|
||||
|
||||
try {
|
||||
const attempt = await this.reconnect(retryIndex)
|
||||
|
||||
this.reconnection = undefined
|
||||
|
||||
return attempt
|
||||
} catch (err) {
|
||||
if (this.closed) {
|
||||
debug('could not reconnect because client is closed %o', { closed: this.closed, target: this.targetId })
|
||||
|
||||
this.enqueuedCommands = []
|
||||
|
||||
return
|
||||
// this property is accessed in a couple different places, but should be refactored to be
|
||||
// private - queues are internal to this class, and should not be exposed
|
||||
get queue () {
|
||||
return {
|
||||
enableCommands: this.enableCommands,
|
||||
enqueuedCommands: this._commandQueue.entries.map((entry) => {
|
||||
return {
|
||||
...entry,
|
||||
p: entry.deferred,
|
||||
}
|
||||
|
||||
debug('could not reconnect, retrying... %o', { closed: this.closed, target: this.targetId, err })
|
||||
|
||||
if (retryIndex < 20) {
|
||||
await new Promise((resolve) => setTimeout(resolve, 100))
|
||||
|
||||
return retry(retryIndex)
|
||||
}
|
||||
|
||||
const cdpError = errors.get('CDP_COULD_NOT_RECONNECT', err)
|
||||
|
||||
// If we cannot reconnect to CDP, we will be unable to move to the next set of specs since we use CDP to clean up and close tabs. Marking this as fatal
|
||||
cdpError.isFatalApiErr = true
|
||||
this.reconnection = undefined
|
||||
this.onAsynchronousError(cdpError)
|
||||
}
|
||||
}),
|
||||
subscriptions: this.subscriptions,
|
||||
}
|
||||
|
||||
this.reconnection = retry()
|
||||
|
||||
return this.reconnection
|
||||
}
|
||||
|
||||
private enqueueCommand <TCmd extends CdpCommand> (
|
||||
command: TCmd,
|
||||
params: ProtocolMapping.Commands[TCmd]['paramsType'][0],
|
||||
sessionId?: string,
|
||||
): Promise<ProtocolMapping.Commands[TCmd]['returnType']> {
|
||||
return new Promise((resolve, reject) => {
|
||||
const obj: EnqueuedCommand = {
|
||||
command,
|
||||
p: { resolve, reject },
|
||||
}
|
||||
get closed () {
|
||||
return this._closed
|
||||
}
|
||||
|
||||
if (params) {
|
||||
obj.params = params
|
||||
}
|
||||
|
||||
if (sessionId) {
|
||||
obj.sessionId = sessionId
|
||||
}
|
||||
|
||||
this.enqueuedCommands.push(obj)
|
||||
})
|
||||
get connected () {
|
||||
return this._connected
|
||||
}
|
||||
|
||||
public connect = async () => {
|
||||
@@ -412,7 +302,7 @@ export class CriClient implements ICriClient {
|
||||
// that we don't want to reconnect on
|
||||
&& !process.env.CYPRESS_INTERNAL_E2E_TESTING_SELF
|
||||
) {
|
||||
this.cri.on('disconnect', this.retryReconnect)
|
||||
this.cri.on('disconnect', this._reconnect)
|
||||
}
|
||||
|
||||
// We're only interested in child target traffic. Browser cri traffic is
|
||||
@@ -498,9 +388,9 @@ export class CriClient implements ICriClient {
|
||||
|
||||
debug('error classified as WEBSOCKET_NOT_OPEN_RE; enqueuing and attempting to reconnect')
|
||||
|
||||
const p = this.enqueueCommand(command, params, sessionId)
|
||||
const p = this._enqueueCommand(command, params, sessionId)
|
||||
|
||||
await this.retryReconnect()
|
||||
await this._reconnect()
|
||||
|
||||
// if enqueued commands were wiped out from the reconnect and the socket is already closed, reject the command as it will never be run
|
||||
if (this.enqueuedCommands.length === 0 && this.closed) {
|
||||
@@ -513,7 +403,7 @@ export class CriClient implements ICriClient {
|
||||
}
|
||||
}
|
||||
|
||||
return this.enqueueCommand(command, params, sessionId)
|
||||
return this._enqueueCommand(command, params, sessionId)
|
||||
}
|
||||
|
||||
public on = <T extends keyof ProtocolMapping.Events> (eventName: T, cb: (data: ProtocolMapping.Events[T][0], sessionId?: string) => void) => {
|
||||
@@ -541,6 +431,7 @@ export class CriClient implements ICriClient {
|
||||
}
|
||||
|
||||
public close = async () => {
|
||||
debug('closing')
|
||||
if (this._closed) {
|
||||
debug('not closing, cri client is already closed %o', { closed: this._closed, target: this.targetId })
|
||||
|
||||
@@ -557,6 +448,176 @@ export class CriClient implements ICriClient {
|
||||
debug('error closing cri client targeting %s: %o', this.targetId, e)
|
||||
} finally {
|
||||
debug('closed cri client %o', { closed: this._closed, target: this.targetId })
|
||||
if (this.onCriConnectionClosed) {
|
||||
this.onCriConnectionClosed()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private _enqueueCommand <TCmd extends CdpCommand> (
|
||||
command: TCmd,
|
||||
params: ProtocolMapping.Commands[TCmd]['paramsType'][0],
|
||||
sessionId?: string,
|
||||
): Promise<ProtocolMapping.Commands[TCmd]['returnType']> {
|
||||
return this._commandQueue.add(command, params, sessionId)
|
||||
}
|
||||
|
||||
private _isConnectionError (error: Error) {
|
||||
return WEBSOCKET_NOT_OPEN_RE.test(error.message)
|
||||
}
|
||||
|
||||
private _reconnect = async () => {
|
||||
debug('preparing to reconnect')
|
||||
if (this.reconnection) {
|
||||
debug('not reconnecting as there is an active reconnection attempt')
|
||||
|
||||
return this.reconnection
|
||||
}
|
||||
|
||||
this._connected = false
|
||||
|
||||
if (this._closed) {
|
||||
debug('Target %s disconnected, not reconnecting because client is closed.', this.targetId)
|
||||
this._commandQueue.clear()
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
let attempt = 1
|
||||
|
||||
try {
|
||||
this.reconnection = asyncRetry(() => {
|
||||
if (this._closed) {
|
||||
throw new ConnectionClosedError('Reconnection halted due to a closed client.')
|
||||
}
|
||||
|
||||
this.onReconnectAttempt?.(attempt)
|
||||
attempt++
|
||||
|
||||
return this.connect()
|
||||
}, {
|
||||
maxAttempts: 20,
|
||||
retryDelay: () => 100,
|
||||
shouldRetry: (err) => {
|
||||
debug('error while reconnecting to Target %s: %o', this.targetId, err)
|
||||
if (err && ConnectionClosedError.isConnectionClosedError(err)) {
|
||||
return false
|
||||
}
|
||||
|
||||
debug('Retying reconnection attempt')
|
||||
|
||||
return true
|
||||
},
|
||||
})()
|
||||
|
||||
await this.reconnection
|
||||
this.reconnection = undefined
|
||||
debug('reconnected')
|
||||
} catch (err) {
|
||||
debug('error(s) on reconnecting: ', err)
|
||||
const significantError: Error = err.errors ? (err as AggregateError).errors[err.errors.length - 1] : err
|
||||
|
||||
const retryHaltedDueToClosed = ConnectionClosedError.isConnectionClosedError(err) ||
|
||||
(err as AggregateError)?.errors?.find((predicate) => ConnectionClosedError.isConnectionClosedError(predicate))
|
||||
|
||||
if (!retryHaltedDueToClosed) {
|
||||
const cdpError = errors.get('CDP_COULD_NOT_RECONNECT', significantError)
|
||||
|
||||
cdpError.isFatalApiErr = true
|
||||
this.reconnection = undefined
|
||||
this._commandQueue.clear()
|
||||
this.onAsynchronousError(cdpError)
|
||||
}
|
||||
|
||||
// do not re-throw; error handling is done via onAsynchronousError
|
||||
return
|
||||
}
|
||||
|
||||
try {
|
||||
await this._restoreState()
|
||||
await this._drainCommandQueue()
|
||||
|
||||
await this.protocolManager?.cdpReconnect()
|
||||
} catch (e) {
|
||||
if (this._isConnectionError(e)) {
|
||||
return this._reconnect()
|
||||
}
|
||||
|
||||
throw e
|
||||
}
|
||||
|
||||
// previous timing of this had it happening before subscriptions/enablements were restored,
|
||||
// and before any enqueued commands were sent. This made testing problematic. Changing the
|
||||
// timing may have implications for browsers that wish to update frame tree - that process
|
||||
// will now be kicked off after state restoration & pending commands, rather then before.
|
||||
// This warrants extra scrutiny in tests. (convert to PR comment)
|
||||
if (this.onReconnect) {
|
||||
this.onReconnect(this)
|
||||
}
|
||||
}
|
||||
|
||||
private async _restoreState () {
|
||||
debug('resubscribing to %d subscriptions', this.subscriptions.length)
|
||||
|
||||
this.subscriptions.forEach((sub) => {
|
||||
this.cri?.on(sub.eventName, sub.cb as any)
|
||||
})
|
||||
|
||||
// '*.enable' commands need to be resent on reconnect or any events in
|
||||
// that namespace will no longer be received
|
||||
debug('re-enabling %d enablements', this.enableCommands.length)
|
||||
await Promise.all(this.enableCommands.map(async ({ command, params, sessionId }) => {
|
||||
// these commands may have been enqueued, so we need to resolve those promises and remove
|
||||
// them from the queue when we send here
|
||||
const inFlightCommand = this._commandQueue.extract({ command, params, sessionId })
|
||||
|
||||
try {
|
||||
const response = await this.cri?.send(command, params, sessionId)
|
||||
|
||||
inFlightCommand?.deferred.resolve(response)
|
||||
} catch (err) {
|
||||
debug('error re-enabling %s: ', command, err)
|
||||
if (this._isConnectionError(err)) {
|
||||
// Connection errors are thrown here so that a reconnection attempt
|
||||
// can be made.
|
||||
throw err
|
||||
} else {
|
||||
// non-connection errors are appropriate for rejecting the original command promise
|
||||
inFlightCommand?.deferred.reject(err)
|
||||
}
|
||||
}
|
||||
}))
|
||||
}
|
||||
|
||||
private async _drainCommandQueue () {
|
||||
debug('sending %d enqueued commands', this._commandQueue.entries.length)
|
||||
while (this._commandQueue.entries.length) {
|
||||
const enqueued = this._commandQueue.shift()
|
||||
|
||||
if (!enqueued) {
|
||||
return
|
||||
}
|
||||
|
||||
try {
|
||||
debug('sending enqueued command %s', enqueued.command)
|
||||
const response = await this.cri!.send(enqueued.command, enqueued.params, enqueued.sessionId)
|
||||
|
||||
debug('sent command, received ', { response })
|
||||
enqueued.deferred.resolve(response)
|
||||
debug('resolved enqueued promise')
|
||||
} catch (e) {
|
||||
debug('enqueued command %s failed:', enqueued.command, e)
|
||||
if (this._isConnectionError(e)) {
|
||||
// similar to restoring state, connection errors are re-thrown so that
|
||||
// the connection can be restored. The command is queued for re-delivery
|
||||
// upon reconnect.
|
||||
debug('re-enqueuing command and re-throwing')
|
||||
this._commandQueue.unshift(enqueued)
|
||||
throw e
|
||||
} else {
|
||||
enqueued.deferred.reject(e)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -6,7 +6,7 @@ import WebSocket from 'ws'
|
||||
import { CdpCommand, CdpEvent } from '../../lib/browsers/cdp_automation'
|
||||
import { CriClient } from '../../lib/browsers/cri-client'
|
||||
import { expect, nock } from '../spec_helper'
|
||||
|
||||
import pDefer from 'p-defer'
|
||||
import sinon from 'sinon'
|
||||
|
||||
// import Bluebird from 'bluebird'
|
||||
@@ -20,11 +20,6 @@ type CDPCommands = {
|
||||
params?: object
|
||||
}
|
||||
|
||||
type CDPSubscriptions = {
|
||||
eventName: CdpEvent
|
||||
cb: () => void
|
||||
}
|
||||
|
||||
type OnWSConnection = (wsClient: WebSocket) => void
|
||||
|
||||
describe('CDP Clients', () => {
|
||||
@@ -34,6 +29,8 @@ describe('CDP Clients', () => {
|
||||
let criClient: CriClient
|
||||
let messages: object[]
|
||||
let onMessage: sinon.SinonStub
|
||||
let messageResponse: ReturnType<typeof pDefer>
|
||||
let neverAck: boolean
|
||||
|
||||
const startWsServer = async (onConnection?: OnWSConnection): Promise<WebSocket.Server> => {
|
||||
return new Promise((resolve, reject) => {
|
||||
@@ -48,14 +45,26 @@ describe('CDP Clients', () => {
|
||||
|
||||
// eslint-disable-next-line no-console
|
||||
ws.on('error', console.error)
|
||||
ws.on('message', (data) => {
|
||||
ws.on('message', async (data) => {
|
||||
const msg = JSON.parse(data.toString())
|
||||
|
||||
messages.push(msg)
|
||||
onMessage(msg)
|
||||
|
||||
if (neverAck) {
|
||||
return
|
||||
}
|
||||
|
||||
// ACK back if we have a msg.id
|
||||
if (msg.id) {
|
||||
if (messageResponse) {
|
||||
const message = await messageResponse.promise
|
||||
|
||||
ws.send(JSON.stringify({ id: msg.id, result: message }))
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
ws.send(JSON.stringify({
|
||||
id: msg.id,
|
||||
result: {},
|
||||
@@ -93,6 +102,7 @@ describe('CDP Clients', () => {
|
||||
}
|
||||
|
||||
beforeEach(async () => {
|
||||
messageResponse = undefined
|
||||
messages = []
|
||||
|
||||
onMessage = sinon.stub()
|
||||
@@ -103,6 +113,7 @@ describe('CDP Clients', () => {
|
||||
})
|
||||
|
||||
afterEach(async () => {
|
||||
debug('after each,', !!wsSrv)
|
||||
await criClient.close().catch(() => { })
|
||||
await closeWsServer()
|
||||
})
|
||||
@@ -165,129 +176,232 @@ describe('CDP Clients', () => {
|
||||
})
|
||||
})
|
||||
|
||||
it('restores sending enqueued commands, subscriptions, and enable commands on reconnect', () => {
|
||||
const enableCommands: CDPCommands[] = [
|
||||
{ command: 'Page.enable', params: {} },
|
||||
{ command: 'Network.enable', params: {} },
|
||||
{ command: 'Runtime.addBinding', params: { name: 'foo' } },
|
||||
{ command: 'Target.setDiscoverTargets', params: { discover: true } },
|
||||
]
|
||||
|
||||
const enqueuedCommands: CDPCommands[] = [
|
||||
{ command: 'Page.navigate', params: { url: 'about:blank' } },
|
||||
{ command: 'Performance.getMetrics', params: {} },
|
||||
]
|
||||
|
||||
const cb = sinon.stub()
|
||||
|
||||
const subscriptions: CDPSubscriptions[] = [
|
||||
{ eventName: 'Network.requestWillBeSent', cb },
|
||||
{ eventName: 'Network.responseReceived', cb },
|
||||
]
|
||||
|
||||
let wsClient
|
||||
|
||||
const stub = sinon.stub().onThirdCall().callsFake(async () => {
|
||||
wsSrv = await startWsServer((ws) => {
|
||||
wsClient = ws
|
||||
})
|
||||
})
|
||||
|
||||
const onReconnect = sinon.stub()
|
||||
|
||||
return new Promise(async (resolve, reject) => {
|
||||
const onAsynchronousError = reject
|
||||
|
||||
it('stops trying to reconnect if .close() is called, and does not trigger an async error', async () => {
|
||||
const stub = sinon.stub()
|
||||
const onCriConnectionClosed = sinon.stub()
|
||||
const haltedReconnection = new Promise<void>(async (resolve, reject) => {
|
||||
onCriConnectionClosed.callsFake(resolve)
|
||||
criClient = await CriClient.create({
|
||||
target: `ws://127.0.0.1:${wsServerPort}`,
|
||||
onAsynchronousError,
|
||||
onReconnect,
|
||||
onAsynchronousError: reject,
|
||||
onReconnect: reject,
|
||||
onReconnectAttempt: stub,
|
||||
onCriConnectionClosed,
|
||||
})
|
||||
|
||||
const send = (commands: CDPCommands[]) => {
|
||||
commands.forEach(({ command, params }) => {
|
||||
criClient.send(command, params)
|
||||
})
|
||||
}
|
||||
|
||||
const on = (subscriptions: CDPSubscriptions[]) => {
|
||||
subscriptions.forEach(({ eventName, cb }) => {
|
||||
criClient.on(eventName, cb)
|
||||
})
|
||||
}
|
||||
|
||||
// send these in before we disconnect
|
||||
send(enableCommands)
|
||||
|
||||
await Promise.all([
|
||||
clientDisconnected(),
|
||||
closeWsServer(),
|
||||
])
|
||||
|
||||
// expect 6 message calls
|
||||
onMessage = sinon.stub().onCall(5).callsFake(resolve)
|
||||
criClient.close()
|
||||
})
|
||||
|
||||
// now enqueue these commands
|
||||
send(enqueuedCommands)
|
||||
on(subscriptions)
|
||||
await haltedReconnection
|
||||
|
||||
const { queue } = criClient
|
||||
expect(onCriConnectionClosed).to.have.been.called
|
||||
})
|
||||
|
||||
// assert they're in the queue
|
||||
expect(queue.enqueuedCommands).to.containSubset(enqueuedCommands)
|
||||
expect(queue.enableCommands).to.containSubset(enableCommands)
|
||||
expect(queue.subscriptions).to.containSubset(subscriptions.map(({ eventName, cb }) => {
|
||||
return {
|
||||
eventName,
|
||||
cb: _.isFunction,
|
||||
}
|
||||
it('continuously re-sends commands that fail due to disconnect, until target is closed', async () => {
|
||||
/**
|
||||
* This test is specifically for the case when a CRIClient websocket trampolines, and
|
||||
* enqueued messages fail due to a disconnected websocket.
|
||||
*
|
||||
* That happens if a command fails due to an in-flight disconnect, and then fails again
|
||||
* after being enqueued due to an in-flight disconnect.
|
||||
*
|
||||
* The steps taken here to reproduce:
|
||||
* 1. Connect to the websocket
|
||||
* 2. Send the command, and wait for it to be received by the websocket (but not responded to)
|
||||
* 3. Disconnect the websocket
|
||||
* 4. Allow the websocket to be reconnected after 3 tries, and wait for successful reconnection
|
||||
* 5. Wait for the command to be re-sent and received by the websocket (but not responded to)
|
||||
* 6. Disconnect the websocket.
|
||||
* 7. Allow the websocket to be reconnected after 3 tries, and wait for successful reconnection
|
||||
* 8. Wait for the command to be resent, received, and responded to successfully.
|
||||
*/
|
||||
neverAck = true
|
||||
const command: CDPCommands = {
|
||||
command: 'DOM.getDocument',
|
||||
params: { depth: -1 },
|
||||
}
|
||||
let reconnectPromise = pDefer()
|
||||
let commandSent = pDefer()
|
||||
const reconnectOnThirdTry = sinon.stub().onThirdCall().callsFake(async () => {
|
||||
wsSrv = await startWsServer((ws) => {
|
||||
})
|
||||
})
|
||||
|
||||
const onReconnect = sinon.stub().callsFake(() => {
|
||||
reconnectPromise.resolve()
|
||||
})
|
||||
|
||||
criClient = await CriClient.create({
|
||||
target: `ws://127.0.0.1:${wsServerPort}`,
|
||||
onAsynchronousError: (e) => commandSent.reject(e),
|
||||
onReconnect,
|
||||
onReconnectAttempt: reconnectOnThirdTry,
|
||||
})
|
||||
|
||||
onMessage.callsFake(() => {
|
||||
commandSent.resolve()
|
||||
})
|
||||
|
||||
const cmdExecution = criClient.send(command.command, command.params)
|
||||
|
||||
await commandSent.promise
|
||||
await Promise.all([clientDisconnected(), closeWsServer()])
|
||||
|
||||
commandSent = pDefer()
|
||||
onMessage.resetHistory()
|
||||
|
||||
reconnectOnThirdTry.resetHistory()
|
||||
|
||||
await commandSent.promise
|
||||
|
||||
await Promise.all([clientDisconnected(), closeWsServer()])
|
||||
|
||||
reconnectOnThirdTry.resetHistory()
|
||||
|
||||
reconnectPromise = pDefer()
|
||||
|
||||
// set up response value
|
||||
messageResponse = pDefer()
|
||||
|
||||
neverAck = false
|
||||
|
||||
messageResponse.resolve({ response: true })
|
||||
|
||||
const res: any = await cmdExecution
|
||||
|
||||
expect(res.response).to.eq(true)
|
||||
|
||||
expect(reconnectPromise.promise).to.be.fulfilled
|
||||
})
|
||||
|
||||
it('reattaches subscriptions, reenables enablements, and sends pending commands on reconnect', async () => {
|
||||
let reconnectPromise = pDefer()
|
||||
let commandSent = pDefer()
|
||||
let wsClient
|
||||
const reconnectOnThirdTry = sinon.stub().onThirdCall().callsFake(async () => {
|
||||
wsSrv = await startWsServer((ws) => {
|
||||
wsClient = ws
|
||||
})
|
||||
})
|
||||
|
||||
const onReconnect = sinon.stub().callsFake(() => {
|
||||
reconnectPromise.resolve()
|
||||
})
|
||||
|
||||
criClient = await CriClient.create({
|
||||
target: `ws://127.0.0.1:${wsServerPort}`,
|
||||
onAsynchronousError: (e) => commandSent.reject(e),
|
||||
onReconnect,
|
||||
onReconnectAttempt: reconnectOnThirdTry,
|
||||
})
|
||||
|
||||
onMessage.callsFake(() => {
|
||||
commandSent.resolve()
|
||||
})
|
||||
|
||||
const enablements: CDPCommands[] = [
|
||||
{ command: 'Page.enable', params: {} },
|
||||
{ command: 'Network.enable', params: {} },
|
||||
{ command: 'Runtime.addBinding', params: { name: 'foo' } },
|
||||
{ command: 'Target.setDiscoverTargets', params: { discover: true } },
|
||||
]
|
||||
|
||||
const networkRequestSubscription = {
|
||||
eventName: 'Network.requestWillBeSent',
|
||||
cb: sinon.stub(),
|
||||
mockEvent: { foo: 'bar' },
|
||||
}
|
||||
const networkResponseSubscription = {
|
||||
eventName: 'Network.responseReceived',
|
||||
cb: sinon.stub(),
|
||||
mockEvent: { baz: 'quux' },
|
||||
}
|
||||
|
||||
const subscriptions = [
|
||||
networkRequestSubscription,
|
||||
networkResponseSubscription,
|
||||
]
|
||||
|
||||
// initialize state
|
||||
|
||||
for (const { command, params } of enablements) {
|
||||
await criClient.send(command, params)
|
||||
}
|
||||
for (const { eventName, cb } of subscriptions) {
|
||||
criClient.on(eventName as CdpEvent, cb)
|
||||
}
|
||||
|
||||
const commandsToEnqueue: (CDPCommands & { promise?: Promise<any> })[] = [
|
||||
{ command: 'Page.navigate', params: { url: 'about:blank' }, promise: undefined },
|
||||
{ command: 'Performance.getMetrics', params: {}, promise: undefined },
|
||||
]
|
||||
|
||||
// prevent commands from resolving, for now
|
||||
neverAck = true
|
||||
// send each command, and wait for them to be sent (but not responded to)
|
||||
for (let i = 0; i < commandsToEnqueue.length; i++) {
|
||||
commandSent = pDefer()
|
||||
const command = commandsToEnqueue[i]
|
||||
|
||||
commandsToEnqueue[i].promise = criClient.send(command.command, command.params)
|
||||
|
||||
await commandSent.promise
|
||||
}
|
||||
|
||||
onMessage.resetHistory()
|
||||
// disconnect the websocket, causing enqueued commands to be enqueued
|
||||
await Promise.all([clientDisconnected(), closeWsServer()])
|
||||
|
||||
// re-enable responses from underlying CDP
|
||||
neverAck = false
|
||||
|
||||
// CriClient should now retry to connect, and succeed on the third try. Wait for reconnect.
|
||||
// this promise is resolved when: CDP is reconnected, state is restored, and queue is drained
|
||||
await reconnectPromise.promise
|
||||
|
||||
// onMessage call history was reset prior to reconnection - these are assertions about
|
||||
// calls made after that reset
|
||||
for (const { command, params } of enablements) {
|
||||
/**
|
||||
* sinon/sinon-chai's expect(stub).to.have.been.calledWith({
|
||||
* partial: object
|
||||
* })
|
||||
* does not work as advertised, at least with our version of sinon/chai/sinon-chai.
|
||||
* because the message id has the potential to be brittle, we want to assert that
|
||||
* onmessage was called with a specific command and params, regardless of message id
|
||||
*/
|
||||
const sentArgs = onMessage.args.filter(([arg]) => {
|
||||
return arg.method === command && _.isEqual(arg.params, params)
|
||||
})
|
||||
|
||||
expect(sentArgs, `onMessage args for enqueued command ${command}`).to.have.lengthOf(1)
|
||||
}
|
||||
for (const { command, params } of commandsToEnqueue) {
|
||||
const sentArgs = onMessage.args.filter(([{ method, params: p }]) => {
|
||||
return method === command && _.isEqual(p, params)
|
||||
})
|
||||
|
||||
expect(sentArgs, `onMessage args for enqueued command ${command}`).to.have.lengthOf(1)
|
||||
}
|
||||
// for full integration, send events that should be subscribed to, and expect that subscription
|
||||
// callback to be called
|
||||
for (const { eventName, cb, mockEvent } of subscriptions) {
|
||||
const deferred = pDefer()
|
||||
|
||||
cb.onFirstCall().callsFake(deferred.resolve)
|
||||
wsClient.send(JSON.stringify({
|
||||
method: eventName,
|
||||
params: mockEvent,
|
||||
}))
|
||||
})
|
||||
.then(() => {
|
||||
const { queue } = criClient
|
||||
|
||||
expect(queue.enqueuedCommands).to.be.empty
|
||||
expect(queue.enableCommands).not.to.be.empty
|
||||
expect(queue.subscriptions).not.to.be.empty
|
||||
|
||||
const messageCalls = _
|
||||
.chain(onMessage.args)
|
||||
.flatten()
|
||||
.map(({ method, params }) => {
|
||||
return {
|
||||
command: method,
|
||||
params: params ?? {},
|
||||
}
|
||||
})
|
||||
.value()
|
||||
|
||||
expect(onMessage).to.have.callCount(6)
|
||||
expect(messageCalls).to.deep.eq(
|
||||
_.concat(
|
||||
enableCommands,
|
||||
enqueuedCommands,
|
||||
),
|
||||
)
|
||||
|
||||
return new Promise((resolve) => {
|
||||
cb.onSecondCall().callsFake(resolve)
|
||||
|
||||
wsClient.send(JSON.stringify({
|
||||
method: 'Network.requestWillBeSent',
|
||||
params: { foo: 'bar' },
|
||||
}))
|
||||
|
||||
wsClient.send(JSON.stringify({
|
||||
method: 'Network.responseReceived',
|
||||
params: { baz: 'quux' },
|
||||
}))
|
||||
})
|
||||
})
|
||||
.then(() => {
|
||||
expect(cb.firstCall).to.be.calledWith({ foo: 'bar' })
|
||||
expect(cb.secondCall).to.be.calledWith({ baz: 'quux' })
|
||||
})
|
||||
await deferred.promise
|
||||
expect(cb).to.have.been.calledWith(mockEvent)
|
||||
}
|
||||
})
|
||||
|
||||
it('stops reconnecting after close is called', () => {
|
||||
|
||||
163
packages/server/test/unit/browsers/cdp-command-queue_spec.ts
Normal file
163
packages/server/test/unit/browsers/cdp-command-queue_spec.ts
Normal file
@@ -0,0 +1,163 @@
|
||||
import { CDPCommandQueue, Command } from '../../../lib/browsers/cdp-command-queue'
|
||||
import type ProtocolMapping from 'devtools-protocol/types/protocol-mapping'
|
||||
import pDeferred from 'p-defer'
|
||||
import _ from 'lodash'
|
||||
|
||||
const { expect } = require('../../spec_helper')
|
||||
|
||||
function matchCommand (search: Partial<Command<any>>) {
|
||||
return (predicate: Partial<Command<any>>) => {
|
||||
return _.isEqual(search.command, predicate.command) && _.isEqual(search.params, predicate.params)
|
||||
}
|
||||
}
|
||||
|
||||
describe('CDPCommandQueue', () => {
|
||||
const enableAnimation: {
|
||||
command: 'Animation.enable'
|
||||
params: undefined
|
||||
} = { command: 'Animation.enable', params: undefined }
|
||||
const removeAttribute: {
|
||||
command: 'DOM.removeAttribute'
|
||||
params: ProtocolMapping.Commands['DOM.removeAttribute']['paramsType'][0]
|
||||
} = { command: 'DOM.removeAttribute', params: { name: 'attribute', nodeId: 123 } }
|
||||
|
||||
describe('.entries', () => {
|
||||
describe('when an entry is added', () => {
|
||||
let queue: CDPCommandQueue
|
||||
|
||||
beforeEach(() => {
|
||||
queue = new CDPCommandQueue()
|
||||
queue.add(enableAnimation.command, enableAnimation.params)
|
||||
})
|
||||
|
||||
it('reflects only the entry that was added', () => {
|
||||
expect(queue.entries.find(matchCommand(enableAnimation)), 'queue should contain enableAnimation').not.to.be.undefined
|
||||
expect(queue.entries.length).to.eq(1)
|
||||
})
|
||||
|
||||
describe('and another is added', () => {
|
||||
beforeEach(() => {
|
||||
queue.add(removeAttribute.command, removeAttribute.params)
|
||||
})
|
||||
|
||||
it('reflects only the entries that have been added', () => {
|
||||
expect(queue.entries.find(matchCommand(enableAnimation))).not.to.be.undefined
|
||||
expect(queue.entries.find(matchCommand(removeAttribute))).not.to.be.undefined
|
||||
expect(queue.entries).to.have.lengthOf(2)
|
||||
})
|
||||
})
|
||||
|
||||
describe('and the is cleared', () => {
|
||||
beforeEach(() => {
|
||||
queue.clear()
|
||||
})
|
||||
|
||||
it('has no entries', () => {
|
||||
expect(queue.entries.find(matchCommand(enableAnimation))).to.be.undefined
|
||||
expect(queue.entries).to.have.lengthOf(0)
|
||||
})
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
describe('.add', () => {
|
||||
it('adds a command to the queue and returns a promise that is resolved when the command is resolved', () => {
|
||||
const sessionId = '1234'
|
||||
const queue = new CDPCommandQueue()
|
||||
|
||||
const commandPromise = queue.add(enableAnimation.command, enableAnimation.params, sessionId)
|
||||
const enqueued = queue.entries[0]
|
||||
|
||||
expect(enqueued.command).to.eq(enableAnimation.command)
|
||||
expect(_.isEqual(enqueued.params, enableAnimation.params), 'params are preserved').to.be.true
|
||||
expect(enqueued.sessionId).to.eq(sessionId)
|
||||
expect(enqueued.deferred).not.to.be.undefined
|
||||
|
||||
const resolution = { value: true }
|
||||
|
||||
enqueued.deferred.resolve(resolution)
|
||||
expect(commandPromise).to.eventually.equal(resolution)
|
||||
})
|
||||
})
|
||||
|
||||
describe('.clear', () => {
|
||||
it('clears the queue', () => {
|
||||
const queue = new CDPCommandQueue()
|
||||
|
||||
queue.add(enableAnimation.command, enableAnimation.params)
|
||||
queue.add(removeAttribute.command, removeAttribute.params)
|
||||
expect(queue.entries).to.have.lengthOf(2)
|
||||
queue.clear()
|
||||
expect(queue.entries).to.have.lengthOf(0)
|
||||
})
|
||||
})
|
||||
|
||||
describe('.extract', () => {
|
||||
let queue: CDPCommandQueue
|
||||
let searchCommand: Partial<Command<any>>
|
||||
let addCommand: Partial<Command<any>>
|
||||
|
||||
beforeEach(() => {
|
||||
queue = new CDPCommandQueue()
|
||||
})
|
||||
|
||||
describe('when the given search predicate exists in the queue', () => {
|
||||
beforeEach(() => {
|
||||
searchCommand = enableAnimation
|
||||
addCommand = enableAnimation
|
||||
})
|
||||
|
||||
it('returns the matching enqueued command, and removes it from the queue', () => {
|
||||
queue.add(addCommand.command, addCommand.params)
|
||||
const found = queue.extract(searchCommand)
|
||||
|
||||
expect(found.command).to.eq(searchCommand.command)
|
||||
expect(found.params).to.eq(searchCommand.params)
|
||||
expect(queue.entries).to.have.lengthOf(0)
|
||||
})
|
||||
})
|
||||
|
||||
describe('when the given search predicate does not exist in the queue', () => {
|
||||
beforeEach(() => {
|
||||
addCommand = removeAttribute
|
||||
searchCommand = enableAnimation
|
||||
})
|
||||
|
||||
it('returns undefined, and does not modify the queue', () => {
|
||||
queue.add(addCommand.command, addCommand.params)
|
||||
expect(queue.entries).to.have.lengthOf(1)
|
||||
const found = queue.extract(searchCommand)
|
||||
|
||||
expect(found).to.be.undefined
|
||||
expect(queue.entries).to.have.lengthOf(1)
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
describe('.shift', () => {
|
||||
it('removes and returns the entry from the beginning of the queue', () => {
|
||||
const queue = new CDPCommandQueue()
|
||||
|
||||
queue.add(enableAnimation.command, enableAnimation.params)
|
||||
queue.add(removeAttribute.command, removeAttribute.params)
|
||||
const next = queue.shift()
|
||||
|
||||
expect(next.command).to.eq(enableAnimation.command)
|
||||
expect(queue.entries).to.have.lengthOf(1)
|
||||
})
|
||||
})
|
||||
|
||||
describe('.unshift', () => {
|
||||
it('adds an entry to the front of the queue', () => {
|
||||
const queue = new CDPCommandQueue()
|
||||
|
||||
queue.add(enableAnimation.command, enableAnimation.params)
|
||||
const deferred = pDeferred()
|
||||
|
||||
queue.unshift({
|
||||
command: enableAnimation.command,
|
||||
deferred,
|
||||
})
|
||||
})
|
||||
})
|
||||
})
|
||||
@@ -1,6 +1,7 @@
|
||||
import EventEmitter from 'events'
|
||||
import type { CriClient } from '../../../lib/browsers/cri-client'
|
||||
import { ProtocolManagerShape } from '@packages/types'
|
||||
import type { CriClient } from '../../../lib/browsers/cri-client'
|
||||
|
||||
const { expect, proxyquire, sinon } = require('../../spec_helper')
|
||||
|
||||
const DEBUGGER_URL = 'http://foo'
|
||||
@@ -98,8 +99,9 @@ describe('lib/browsers/cri-client', function () {
|
||||
'WebSocket is not open',
|
||||
// @see https://github.com/cypress-io/cypress/issues/7180
|
||||
'WebSocket is already in CLOSING or CLOSED state',
|
||||
'WebSocket connection closed',
|
||||
]).forEach((msg) => {
|
||||
it(`with '${msg}'`, async function () {
|
||||
it(`with one '${msg}' message it retries once`, async function () {
|
||||
const err = new Error(msg)
|
||||
|
||||
send.onFirstCall().rejects(err)
|
||||
@@ -111,6 +113,19 @@ describe('lib/browsers/cri-client', function () {
|
||||
|
||||
expect(send).to.be.calledTwice
|
||||
})
|
||||
|
||||
it(`with two '${msg}' message it retries twice`, async () => {
|
||||
const err = new Error(msg)
|
||||
|
||||
send.onFirstCall().rejects(err)
|
||||
send.onSecondCall().rejects(err)
|
||||
send.onThirdCall().resolves()
|
||||
|
||||
const client = await getClient()
|
||||
|
||||
await client.send('DOM.getDocument', { depth: -1 })
|
||||
expect(send).to.have.callCount(3)
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
|
||||
Reference in New Issue
Block a user