fix: do not re-use readstreams when retrying an upload of the test replay recording (#29745)

* custom async retry for any async fn

* "proxied" fetch for PUT, throwing http errors

* cleans up put_fetch return types & parsing

* move protocol upload logic to put_protocol_artifact, rm upload_stream

* changelog

* changelog

* manually fix wsp in snapshot

* reorganizes a little, properly tests retryable logic

* removes 408 as a non-retriable error in unit test

* make expectation to be resolved more clear

* rm unnecessary type coercion

* changelog

* Update cli/CHANGELOG.md

* asyncRetry only throws Aggregate errors if there is more than one attempt

* Update packages/server/test/unit/cloud/network/is_retryable_error_spec.ts

Co-authored-by: Matt Schile <mschile@cypress.io>

* Update packages/server/test/unit/cloud/network/is_retryable_error_spec.ts

---------

Co-authored-by: Matt Schile <mschile@cypress.io>
This commit is contained in:
Cacie Prins
2024-07-08 09:30:05 -04:00
committed by GitHub
parent c93177ba74
commit 5f2236e6cc
15 changed files with 534 additions and 449 deletions
+8
View File
@@ -1,4 +1,12 @@
<!-- See the ../guides/writing-the-cypress-changelog.md for details on writing the changelog. -->
## 13.13.1
_Released 7/16/2024 (PENDING)_
**Bugfixes:**
- Fixed an issue where the ReadStream used to upload a Test Replay recording could erroneously be re-used when retrying in cases of retryable upload failures. Fixes [#29227](https://github.com/cypress-io/cypress/issues/29227)
## 13.13.0
_Released 7/01/2024_
@@ -1,9 +1,10 @@
import fsAsync from 'fs/promises'
import fs from 'fs'
import Debug from 'debug'
import { uploadStream, geometricRetry } from '../upload/upload_stream'
import { StreamActivityMonitor } from '../upload/stream_activity_monitor'
import { asyncRetry, linearDelay } from '../../util/async_retry'
import { putFetch, ParseKinds } from '../network/put_fetch'
import { isRetryableError } from '../network/is_retryable_error'
const debug = Debug('cypress:server:cloud:api:protocol-artifact')
// the upload will get canceled if the source stream does not
@@ -13,23 +14,37 @@ const debug = Debug('cypress:server:cloud:api:protocol-artifact')
const MAX_START_DWELL_TIME = 5000
const MAX_ACTIVITY_DWELL_TIME = 5000
export const putProtocolArtifact = async (artifactPath: string, maxFileSize: number, destinationUrl: string) => {
debug(`Atttempting to upload Test Replay archive from ${artifactPath} to ${destinationUrl})`)
const { size } = await fsAsync.stat(artifactPath)
export const _delay = linearDelay(500)
if (size > maxFileSize) {
throw new Error(`Spec recording too large: artifact is ${size} bytes, limit is ${maxFileSize} bytes`)
}
export const putProtocolArtifact = asyncRetry(
async (artifactPath: string, maxFileSize: number, destinationUrl: string) => {
debug(`Atttempting to upload Test Replay archive from ${artifactPath} to ${destinationUrl})`)
const { size } = await fsAsync.stat(artifactPath)
const activityMonitor = new StreamActivityMonitor(MAX_START_DWELL_TIME, MAX_ACTIVITY_DWELL_TIME)
const fileStream = fs.createReadStream(artifactPath)
if (size > maxFileSize) {
throw new Error(`Spec recording too large: artifact is ${size} bytes, limit is ${maxFileSize} bytes`)
}
await uploadStream(
fileStream,
destinationUrl,
size, {
retryDelay: geometricRetry,
activityMonitor,
},
)
}
const activityMonitor = new StreamActivityMonitor(MAX_START_DWELL_TIME, MAX_ACTIVITY_DWELL_TIME)
const fileStream = fs.createReadStream(artifactPath)
const controller = activityMonitor.getController()
await putFetch(destinationUrl, {
parse: ParseKinds.TEXT,
headers: {
'content-length': String(size),
'content-type': 'application/x-tar',
'accept': 'application/json',
},
// ts thinks this is a web fetch, which only expects ReadableStreams.
// But, this is a node fetch, which supports ReadStreams.
// @ts-expect-error
body: activityMonitor.monitor(fileStream),
signal: controller.signal,
})
}, {
maxAttempts: 3,
retryDelay: _delay,
shouldRetry: isRetryableError,
},
)
@@ -11,8 +11,8 @@ import { IArtifact, ArtifactUploadResult, ArtifactKinds } from './artifact'
import { createScreenshotArtifactBatch } from './screenshot_artifact'
import { createVideoArtifact } from './video_artifact'
import { createProtocolArtifact, composeProtocolErrorReportFromOptions } from './protocol_artifact'
import { HttpError } from '../api/http_error'
import { NetworkError } from '../api/network_error'
import { HttpError } from '../network/http_error'
import { NetworkError } from '../network/network_error'
const debug = Debug('cypress:server:cloud:artifacts')
@@ -216,6 +216,7 @@ export const uploadArtifacts = async (options: UploadArtifactOptions) => {
if (postUploadProtocolFatalError && postUploadProtocolFatalError.captureMethod === 'uploadCaptureArtifact') {
const error = postUploadProtocolFatalError.error
debug('protocol error: %O', error)
if ((error as AggregateError).errors) {
// eslint-disable-next-line no-console
console.log('')
@@ -1,4 +1,4 @@
import { scrubUrl } from './scrub_url'
import { scrubUrl } from '../api/scrub_url'
export const HttpErrorKind = 'HttpError'
@@ -0,0 +1,9 @@
import { NetworkError } from './network_error'
import { HttpError } from './http_error'
// HTTP statuses that are safe to retry automatically: request timeout,
// throttling, and transient gateway/service failures.
const RETRYABLE_HTTP_STATUSES = [408, 429, 502, 503, 504]

/**
 * Determines whether a failed request is worth retrying: network-level
 * failures always are; HTTP errors only when the status is transient.
 */
export const isRetryableError = (error?: Error) => {
  if (!error) {
    return false
  }

  if (NetworkError.isNetworkError(error)) {
    return true
  }

  return HttpError.isHttpError(error) && RETRYABLE_HTTP_STATUSES.includes(error.status)
}
@@ -0,0 +1,11 @@
// Tag value used to recognize ParseError instances across module
// boundaries, where instanceof checks can be unreliable.
const ParseErrorKind = 'ParseErrorKind'

/**
 * Raised when a response body cannot be decoded in the requested format
 * (e.g. invalid JSON). The underlying error is kept for diagnostics.
 */
export class ParseError extends Error {
  public readonly kind = ParseErrorKind

  constructor (public readonly originalError: Error, message?: string) {
    super(message)
  }

  /** Duck-typed guard: matches on the `kind` tag rather than instanceof. */
  static isParseError (candidate: Error & { kind: string }): candidate is ParseError {
    return candidate.kind === ParseErrorKind
  }
}
@@ -0,0 +1,82 @@
import crossFetch from 'cross-fetch'
import { NetworkError } from './network_error'
import { HttpError } from './http_error'
import { ParseError } from './parse_error'
import { agent } from '@packages/network'
import Debug from 'debug'
const debug = Debug('cypress-verbose:server:cloud:api:put')
// RequestInit minus the fields putFetch controls itself: `method` is
// always PUT and `agent` is supplied internally.
type PutInit = Omit<RequestInit, 'agent' | 'method'>
// Supported response-body parse modes for putFetch.
export const ParseKinds = Object.freeze({
  JSON: 'json',
  TEXT: 'text',
})
// Union of the ParseKinds values: 'json' | 'text'.
type ParseKind = typeof ParseKinds[keyof typeof ParseKinds]
// Caller-facing options: standard fetch init plus the parse mode.
type PutOptions = PutInit & {
  parse?: ParseKind
}
/**
 * Issues an HTTP PUT via cross-fetch and normalizes all failures into
 * typed errors so callers (e.g. retry logic) can classify them:
 *  - HttpError for responses with status >= 400
 *  - ParseError when the body cannot be parsed as requested
 *  - NetworkError for anything else thrown by fetch
 *
 * @param input - string URL, URL instance, or Request to PUT to
 * @param options - fetch init (minus method/agent) plus `parse`, which
 *                  selects how the response body is decoded; defaults to
 *                  JSON when the options object is omitted entirely
 * @returns the parsed response body (or the raw body stream when a
 *          caller-supplied options object has no `parse` field)
 * @throws HttpError | ParseError | NetworkError
 */
export async function putFetch <
  TReturn extends any
> (input: RequestInfo | URL, options: PutOptions = { parse: 'json' }): Promise<TReturn> {
  const {
    parse,
    ...init
  } = options
  debug('Initiating PUT %s', input)
  try {
    const response = await crossFetch(input, {
      ...(init || {}),
      method: 'PUT',
      // cross-fetch thinks this is in the browser, so declares
      // types based on that rather than on node-fetch which it
      // actually uses under the hood. node-fetch supports `agent`.
      // @ts-expect-error
      agent,
    })
    // 4xx/5xx responses become HttpErrors; fromResponse is async so it
    // can incorporate the response body into the error.
    if (response.status >= 400) {
      const err = await HttpError.fromResponse(response)
      throw err
    }
    try {
      switch (parse) {
        case ParseKinds.JSON:
          return await response.json() as TReturn
        case ParseKinds.TEXT:
          return await response.text() as TReturn
        default:
          // no parse mode: hand back the raw body stream
          return response.body as any
      }
    } catch (e) {
      // wrap body-decoding failures, preserving the original stack
      const parseError = new ParseError(e, e.message)
      parseError.stack = e.stack
      throw parseError
    }
  } catch (e) {
    debug('Error: %O', e)
    // Re-throw errors we already classified above, unchanged.
    if (ParseError.isParseError(e)) {
      throw e
    } else if (HttpError.isHttpError(e)) {
      throw e
    }
    // if the error wasn't a parsing error, it's probably a Network error
    const url = typeof input === 'string' ? input :
      input instanceof URL ? input.href :
      input instanceof Request ? input.url : 'UNKNOWN_URL'
    const networkError = new NetworkError(e, url)
    networkError.stack = e.stack
    throw networkError
  }
}
@@ -1,150 +0,0 @@
import crossFetch from 'cross-fetch'
import fetchCreator from 'fetch-retry-ts'
import type { ReadStream } from 'fs'
import type { StreamActivityMonitor } from './stream_activity_monitor'
import Debug from 'debug'
import { HttpError } from '../api/http_error'
import { NetworkError } from '../api/network_error'
import { agent } from '@packages/network'
const debug = Debug('cypress:server:cloud:uploadStream')
const debugVerbose = Debug('cypress-verbose:server:cloud:uploadStream')
/**
* These are retryable status codes. Other status codes are not valid for automatic
* retries: they indicate some issue with the client making the request, or that
* the server can never fulfill the request. Some of these status codes should only
* be retried if the request is idempotent, but I think it's fine for S3 for now.
* - 408 Request Timeout
* - 429 Too Many Requests (S3 can return this)
* - 502 Bad Gateway
* - 503 Service Unavailable
* - 504 Gateway Timeout
* other http status codes are not valid for automatic retries.
*/
const RETRYABLE_STATUS_CODES = [408, 429, 502, 503, 504]
/**
 * Returns a linearly increasing delay in ms for the nth retry
 * (0-indexed): 500, 1000, 1500, ...
 *
 * Kept as a separately exported value (rather than baked into the args
 * passed to fetchCreator) so unit tests can substitute a no-op delay:
 * nock + sinon fake timers cannot advance past fetch-retry's internal
 * delay callback, and without a stubbed delay the uploadStream tests
 * would take too long to execute.
 */
export const geometricRetry = (n) => 500 * (n + 1)
const identity = <T>(arg: T) => arg
type UploadStreamOptions = {
retryDelay?: (count: number) => number
activityMonitor?: StreamActivityMonitor
}
/**
 * PUTs a ReadStream of `fileSize` bytes to `destinationUrl`, retrying up
 * to 2 times on network errors or retryable HTTP statuses. Errors seen
 * across attempts are collected and surfaced either singly or as an
 * AggregateError. An optional StreamActivityMonitor can abort the upload
 * when the stream stalls or never starts.
 */
export const uploadStream = async (fileStream: ReadStream, destinationUrl: string, fileSize: number, options?: UploadStreamOptions): Promise<void> => {
  debug(`Uploading file stream (${fileSize} bytes) to ${destinationUrl}`)
  const retryDelay = options?.retryDelay ?? identity
  const timeoutMonitor = options?.activityMonitor ?? undefined
  /**
   * To support more robust error messages from the server than statusText, we attempt to
   * retrieve response.json(). This is async, so a list of error promises is stored here
   * that must be resolved before throwing an aggregate error. This is necessary because
   * ts-fetch-retry's retryOn fn does not support returning a promise.
   */
  const errorPromises: Promise<Error>[] = []
  const abortController = timeoutMonitor?.getController()
  debug('PUT %s: %d byte file upload initiated', destinationUrl, fileSize)
  const retryableFetch = fetchCreator(crossFetch as typeof crossFetch, {
    retries: 2,
    retryDelay,
    // Synchronous predicate: decides whether to retry AND records the
    // error (as a promise) for later aggregation.
    retryOn: (attempt, retries, error, response) => {
      debugVerbose('PUT %s Response: %O', destinationUrl, response)
      debugVerbose('PUT %s Error: %O', destinationUrl, error)
      // Record all HTTP errors encountered
      const isHttpError = response?.status && response?.status >= 400
      // a fetch error triggered by the activity monitor's abort signal is
      // not treated as a network error here; the signal reason is
      // surfaced separately in the catch block below
      const isNetworkError = error && !timeoutMonitor?.getController().signal.reason
      if (isHttpError) {
        errorPromises.push(HttpError.fromResponse(response))
      } else if (isNetworkError) {
        errorPromises.push(Promise.resolve(new NetworkError(error, destinationUrl)))
      }
      const isUnderRetryLimit = attempt < retries
      const isRetryableHttpError = (!!response?.status && RETRYABLE_STATUS_CODES.includes(response.status))
      debug('checking if should retry: %s %O', destinationUrl, {
        attempt,
        retries,
        networkError: error,
        status: response?.status,
        statusText: response?.statusText,
      })
      return (
        isUnderRetryLimit && // retries param is ignored if retryOn is a fn, so have to impl
        (isNetworkError || isRetryableHttpError)
      )
    },
  })
  // NOTE(review): async executor inside the Promise constructor is an
  // antipattern (a throw before resolve/reject would be swallowed); here
  // the entire body is wrapped in try/catch so rejections are routed.
  return new Promise(async (resolve, reject) => {
    debug(`${destinationUrl}: PUT ${fileSize}`)
    try {
      const response = await retryableFetch(destinationUrl, {
        agent,
        method: 'PUT',
        headers: {
          'content-length': String(fileSize),
          'content-type': 'application/x-tar',
          'accept': 'application/json',
        },
        // ts thinks this is a web fetch, which only expects ReadableStreams.
        // But, this is a node fetch, which supports ReadStreams.
        // @ts-expect-error
        body: timeoutMonitor ? timeoutMonitor.monitor(fileStream) : fileStream,
        ...(abortController && { signal: abortController.signal }),
      })
      debug('PUT %s: HTTP %d %s', destinationUrl, response.status, response.statusText)
      if (response.status >= 400) {
        // all retries exhausted with an error status: flatten the
        // recorded errors into a single rejection
        const errors = await Promise.all(errorPromises)
        reject(
          errors.length > 1 ?
            new AggregateError(errors, `${errors.length} errors encountered during upload`) :
            errors[0],
        )
      } else {
        // S3 does not include a response body - if the request succeeds,
        // a simple 200 response is returned
        resolve()
      }
    } catch (e) {
      debug('error on upload:', e)
      // if the activity monitor aborted, its reason is appended to the
      // recorded errors (unless already present)
      const signalError = abortController?.signal.reason
      const errors = await Promise.all(errorPromises)
      debug('errors on upload:')
      errors.forEach((e) => debug(e))
      if (signalError && !errors.includes(signalError)) {
        errors.push(signalError)
      }
      if (errors.length > 1) {
        reject(new AggregateError(errors, `${errors.length} errors encountered during upload`))
      } else {
        reject(errors[0])
      }
    }
  })
}
+54
View File
@@ -0,0 +1,54 @@
type RetryOptions = {
  maxAttempts: number
  retryDelay?: (attempt: number) => number
  shouldRetry?: (err?: Error) => boolean
}

/**
 * Wraps an async function so that it is re-invoked on rejection, up to
 * `maxAttempts` times. When only one attempt failed, its error is
 * rethrown as-is; when several failed, they are bundled into an
 * AggregateError. `shouldRetry` (default: always) can stop retries for
 * non-retryable errors; `retryDelay` supplies a wait in ms between
 * attempts.
 */
export function asyncRetry <
  TArgs extends any[],
  TResult extends any,
> (fn: (...args: TArgs) => Promise<TResult>, options: RetryOptions) {
  return async (...args: TArgs): Promise<TResult> => {
    const failures: Error[] = []
    const isRetryable = options.shouldRetry ?? (() => true)

    // Throws the sole failure directly when only one attempt was made;
    // otherwise bundles every failure into an AggregateError.
    const raise = (): never => {
      if (failures.length === 1) {
        throw failures[0]
      }

      throw new AggregateError(failures)
    }

    for (let attemptsMade = 0; ; ) {
      try {
        return await fn(...args)
      } catch (err) {
        attemptsMade++
        failures.push(err as Error)

        if (!isRetryable(err)) {
          raise()
        }

        const waitMs = options.retryDelay?.(attemptsMade)

        if (waitMs !== undefined) {
          await new Promise((unblock) => setTimeout(unblock, waitMs))
        }

        if (attemptsMade >= options.maxAttempts) {
          raise()
        }
      }
    }
  }
}
/**
 * Builds a retry-delay function that grows linearly: the nth attempt
 * waits n * inc milliseconds.
 */
export const linearDelay = (inc: number) => (attempt: number): number => inc * attempt
@@ -7,8 +7,10 @@ import chaiAsPromised from 'chai-as-promised'
import { ReadStream } from 'fs'
import { StreamActivityMonitor } from '../../../../lib/cloud/upload/stream_activity_monitor'
import { uploadStream } from '../../../../lib/cloud/upload/upload_stream'
import { HttpError } from '../../../../lib/cloud/api/http_error'
import { HttpError } from '../../../../lib/cloud/network/http_error'
import { putFetch, ParseKinds } from '../../../../lib/cloud/network/put_fetch'
import { linearDelay, asyncRetry } from '../../../../lib/util/async_retry'
import { isRetryableError } from '../../../../lib/cloud/network/is_retryable_error'
chai.use(chaiAsPromised).use(sinonChai)
@@ -16,14 +18,12 @@ describe('putProtocolArtifact', () => {
let filePath: string
let maxFileSize: number
let fileSize: number
let mockStreamMonitor
let mockStreamMonitor: sinon.SinonStubbedInstance<StreamActivityMonitor>
let mockReadStream
let destinationUrl
let statStub: sinon.SinonStub
let uploadStreamStub: sinon.SinonStub<Parameters<typeof uploadStream>, ReturnType<typeof uploadStream>>
let geometricRetryStub: sinon.SinonStub
let asyncRetryStub: sinon.SinonStub<Parameters<typeof asyncRetry>, ReturnType<typeof asyncRetry>>
let putFetchStub: sinon.SinonStub<Parameters<typeof putFetch>, ReturnType<typeof putFetch>>
let putProtocolArtifact: (artifactPath: string, maxFileSize: number, destinationUrl: string) => Promise<void>
/**
@@ -69,17 +69,30 @@ describe('putProtocolArtifact', () => {
createReadStream: sinon.stub().returns(mockReadStream),
})
geometricRetryStub = sinon.stub()
putFetchStub = sinon.stub()
uploadStreamStub = sinon.stub<Parameters<typeof uploadStream>, ReturnType<typeof uploadStream>>()
// these paths need to be what `put_protocol_artifact` used to import them
mockery.registerMock('../upload/upload_stream', {
geometricRetry: geometricRetryStub,
uploadStream: uploadStreamStub,
mockery.registerMock('../network/put_fetch', {
putFetch: putFetchStub,
ParseKinds,
})
asyncRetryStub = sinon.stub()
// asyncRetry is already unit tested, no need to test retry behavior: identity stub
asyncRetryStub.callsFake((fn) => fn)
mockery.registerMock('../../util/async_retry', {
asyncRetry: asyncRetryStub,
linearDelay,
})
const mockAbortController = sinon.createStubInstance(AbortController)
const mockSignal = sinon.createStubInstance(AbortSignal)
sinon.stub(mockAbortController, 'signal').get(() => mockSignal)
mockStreamMonitor = sinon.createStubInstance(StreamActivityMonitor)
mockStreamMonitor.getController.returns(mockAbortController)
mockery.registerMock('../upload/stream_activity_monitor', {
StreamActivityMonitor: sinon.stub().callsFake(() => {
return mockStreamMonitor
@@ -92,7 +105,9 @@ describe('putProtocolArtifact', () => {
stat: statStub,
})
putProtocolArtifact = require('../../../../lib/cloud/api/put_protocol_artifact').putProtocolArtifact
const req = require('../../../../lib/cloud/api/put_protocol_artifact')
putProtocolArtifact = req.putProtocolArtifact
})
afterEach(() => {
@@ -100,6 +115,15 @@ describe('putProtocolArtifact', () => {
mockery.disable()
})
it('is wrapped with an asyncRetry', () => {
const options = asyncRetryStub.firstCall.args[1]
expect(options.maxAttempts).to.eq(3)
expect(options.retryDelay).to.be.a('function')
// because of mockery, the isRetryableError ref here is different than the one imported into put_protocol_artifact_spec
expect(options.shouldRetry.toString()).to.eq(isRetryableError.toString())
})
describe('when provided an artifact path that does not exist', () => {
let invalidPath: string
@@ -146,37 +170,29 @@ describe('putProtocolArtifact', () => {
})
describe('and fetch completes successfully', () => {
it('resolves', () => {
uploadStreamStub.withArgs(
mockReadStream,
destinationUrl,
fileSize, {
retryDelay: geometricRetryStub,
activityMonitor: mockStreamMonitor,
},
).resolves()
it('resolves', async () => {
putFetchStub.resolves()
return expect(putProtocolArtifact(filePath, maxFileSize, destinationUrl)).to.be.fulfilled
expect(putProtocolArtifact(filePath, maxFileSize, destinationUrl)).to.be.fulfilled
})
})
describe('and uploadStream rejects', () => {
describe('and putFetch rejects', () => {
let httpErr: HttpError
let res: Response
beforeEach(() => {
res = sinon.createStubInstance(Response)
httpErr = new HttpError(`403 Forbidden (${destinationUrl})`, res)
uploadStreamStub.withArgs(
mockReadStream,
httpErr = new HttpError(
`403 Forbidden (${destinationUrl})`,
destinationUrl,
fileSize, {
retryDelay: geometricRetryStub,
activityMonitor: mockStreamMonitor,
},
).rejects(httpErr)
403,
'Forbidden',
res,
)
putFetchStub.rejects(httpErr)
})
it('rethrows', async () => {
@@ -0,0 +1,37 @@
import { isRetryableError } from '../../../../lib/cloud/network/is_retryable_error'
import { NetworkError } from '../../../../lib/cloud/network/network_error'
import { HttpError } from '../../../../lib/cloud/network/http_error'
import sinon from 'sinon'
import chai, { expect } from 'chai'
import sinonChai from 'sinon-chai'
chai.use(sinonChai)
// Verifies the error-classification contract of isRetryableError:
// network errors always retry; HTTP errors only for transient statuses.
describe('isRetryableError', () => {
  const url = 'http://some/url'
  it('returns true with a NetworkError', () => {
    expect(isRetryableError(new NetworkError(new Error(), url))).to.be.true
  })
  it('returns true with retryable http errors', () => {
    // the exact status list isRetryableError treats as transient
    [408, 429, 502, 503, 504].forEach((status) => {
      const err = new HttpError('some error', url, status, 'status text', sinon.createStubInstance(Response))
      expect(isRetryableError(err)).to.be.true
    })
  })
  it('returns false with non-retryable http errors', () => {
    // every other assigned 4xx/5xx status must not be retried
    [400, 401, 402, 403, 404, 405, 406, 407, 409, 410, 411, 412, 413, 414, 416, 417, 418, 421, 422, 423, 424, 425, 426, 428, 431, 451, 500, 501, 505, 507, 508, 510, 511].forEach((status) => {
      const err = new HttpError('some error', url, status, 'status text', sinon.createStubInstance(Response))
      expect(isRetryableError(err)).to.be.false
    })
  })
  it('returns false for other errors', () => {
    // plain Errors (neither Network nor Http) are never retryable
    expect(isRetryableError(new Error())).to.be.false
  })
})
@@ -0,0 +1,128 @@
import sinon from 'sinon'
import { expect } from 'chai'
import { Response } from 'cross-fetch'
import proxyquire from 'proxyquire'
import type { putFetch } from '../../../../lib/cloud/network/put_fetch'
import { ParseError } from '../../../../lib/cloud/network/parse_error'
import { HttpError } from '../../../../lib/cloud/network/http_error'
import { NetworkError } from '../../../../lib/cloud/network/network_error'
describe('cloud/network/put_fetch', () => {
const url = 'https://some.test/url'
const jsonText = '{ "content": "json" }'
const jsonObj = JSON.parse(jsonText)
const nonJsonText = 'some text response'
const badJsonErr = 'Unexpected token < in JSON at position 0'
let resolveVal
let stubbedCrossFetch: sinon.SinonStub
let fetch: typeof putFetch
beforeEach(() => {
stubbedCrossFetch = sinon.stub()
const importPutFetch = proxyquire.noCallThru()('../../../../lib/cloud/network/put_fetch', {
'cross-fetch': stubbedCrossFetch,
})
fetch = importPutFetch.putFetch
})
describe('when fetch resolves', () => {
beforeEach(() => {
resolveVal = new Response()
sinon.stub(resolveVal, 'url').get(() => url)
stubbedCrossFetch.resolves(resolveVal)
})
describe('when fetch resolves with a json-parseable response', () => {
beforeEach(() => {
sinon.stub(resolveVal, 'json').resolves(jsonObj)
sinon.stub(resolveVal, 'text').resolves(jsonText)
})
describe('and parseJSON flag is true', () => {
it('resolves with the parsed object', async () => {
const res = await fetch<{'content': string}>(url)
expect(res).to.eq(jsonObj)
})
})
describe('and parseJSON flag is false', () => {
it('resolves with the response text as a string', async () => {
const res = await fetch(url, { parse: 'text' })
expect(res).to.eq(jsonText)
})
})
})
describe('when fetch resolves with a non-json-parseable response', () => {
beforeEach(() => {
sinon.stub(resolveVal, 'json').rejects(new Error(badJsonErr))
sinon.stub(resolveVal, 'text').resolves(nonJsonText)
})
describe('and default parse (json) is used', () => {
it('throws a parse error', async () => {
let err: any
try {
await fetch(url)
} catch (e) {
err = e
}
expect(err.message).to.eq(badJsonErr)
expect(ParseError.isParseError(err)).to.be.true
})
})
describe('and text parse is used', () => {
it('resolves with the response text as a string', async () => {
const res = await fetch(url, { parse: 'text' })
expect(res).to.eq(nonJsonText)
})
})
})
describe('when fetch resolves with a response indicative of an http error', () => {
beforeEach(() => {
sinon.stub(resolveVal, 'status').get(() => 400)
sinon.stub(resolveVal, 'statusText').get(() => 'Bad Request')
sinon.stub(resolveVal, 'text').resolves(`<error><ref>4125</ref><kind>BadRequest</kind></error>`)
sinon.stub(resolveVal, 'json').rejects(badJsonErr)
})
it('throws an HttpError', async () => {
let err
try {
await fetch(url, { parse: 'text' })
} catch (e) {
err = e
}
expect(err).not.to.be.undefined
expect(HttpError.isHttpError(err)).to.be.true
})
})
})
describe('when fetch rejects with a network error', () => {
const networkErrMsg = 'Error: ECONNRESET'
beforeEach(() => {
stubbedCrossFetch.rejects(new Error(networkErrMsg))
})
it('throws a NetworkError', async () => {
let err
try {
await fetch(url, { parse: 'text' })
} catch (e) {
err = e
}
expect(NetworkError.isNetworkError(err)).to.be.true
})
})
})
@@ -1,242 +0,0 @@
/// <reference lib="es2021" />
import fs, { ReadStream } from 'fs'
import { Readable } from 'stream'
import sinon from 'sinon'
import chai, { expect } from 'chai'
import sinonChai from 'sinon-chai'
import chaiAsPromised from 'chai-as-promised'
import { uploadStream, geometricRetry } from '../../../../lib/cloud/upload/upload_stream'
import { HttpError } from '../../../../lib/cloud/api/http_error'
import { StreamActivityMonitor, StreamStalledError, StreamStartTimedOutError } from '../../../../lib/cloud/upload/stream_activity_monitor'
chai.use(chaiAsPromised).use(sinonChai)
import nock from 'nock'
describe('geometricRetry', () => {
it('returns a geometrically increasing n', () => {
expect(geometricRetry(0)).to.eq(500)
expect(geometricRetry(1)).to.eq(1000)
expect(geometricRetry(2)).to.eq(1500)
})
})
describe('uploadStream', () => {
let destinationUrl: string
let destinationPath: string
let destinationDomain: string
let uploadPromise: Promise<any>
let scope: nock.Scope
let fileSize: number
let fsReadStream: ReadStream
let fileContents: string
function execSimpleStream () {
fsReadStream.push(fileContents)
fsReadStream.push(null)
}
function mockUpload () {
return scope.put(destinationPath, undefined, {
reqheaders: {
'Accept': 'application/json',
'Content-Type': 'application/x-tar',
'Content-Length': fileSize.toString(),
},
})
}
beforeEach(() => {
fileContents = 'lorem ipsum dolor set'
fileSize = fileContents.length
fsReadStream = new Readable() as ReadStream
sinon.stub(fs, 'createReadStream').callsFake(() => {
return fsReadStream
})
destinationDomain = 'http://somedomain.test'
destinationPath = '/upload'
destinationUrl = `${destinationDomain}${destinationPath}`
scope = nock(destinationDomain)
})
afterEach(() => {
(fs.createReadStream as sinon.SinonStub).restore()
nock.cleanAll()
})
describe('when fetch resolves with a 200 OK', () => {
beforeEach(() => {
mockUpload().reply(200, 'OK')
})
it(`resolves`, async () => {
uploadPromise = uploadStream(fsReadStream, destinationUrl, fileSize)
execSimpleStream()
await expect(uploadPromise).to.be.fulfilled
})
})
describe('when fetch resolves with a 4xx/5xx response', () => {
const status = 403
beforeEach(() => {
mockUpload().reply(status)
})
it('rejects with an appropriate HttpError', async () => {
const uploadPromise = uploadStream(fsReadStream, destinationUrl, fileSize)
execSimpleStream()
let err: Error | undefined
try {
await uploadPromise
} catch (e) {
err = e
}
expect(err).to.be.instanceOf(HttpError)
expect(err?.message).to.eq(`403 Forbidden (${destinationUrl})`)
})
})
describe('retry behavior', () => {
describe('when fetch resolves with a retryable http status code 3 times', () => {
const callCount = 3
let retryDelay
beforeEach(() => {
retryDelay = sinon.stub().returns((n: number) => {
return n
})
})
;[408, 429, 502, 503, 504].forEach((status) => {
it(`makes a total of ${callCount} calls for HTTP ${status} and eventually rejects`, async () => {
let count = 0
const inc = () => {
count++
}
mockUpload().times(4).reply(status, inc)
const uploadPromise = uploadStream(fsReadStream, destinationUrl, fileSize, {
retryDelay,
})
execSimpleStream()
await expect(uploadPromise).to.eventually.be.rejectedWith(AggregateError)
expect(retryDelay).to.have.been.called
expect(count).to.eq(callCount)
})
})
it('throws an aggregate error containing all of the errors encountered', async () => {
let uploadPromise
mockUpload().reply(503)
mockUpload().reply(408)
mockUpload().reply(502)
let error: AggregateError | undefined
try {
uploadPromise = uploadStream(fsReadStream, destinationUrl, fileSize)
execSimpleStream()
await uploadPromise
} catch (e) {
error = e
}
expect(error).not.be.undefined
expect(error?.message).to.eq('3 errors encountered during upload')
expect(error?.errors[0]?.message).to.eq(`503 Service Unavailable (${destinationUrl})`)
expect(error?.errors[1]?.message).to.eq(`408 Request Timeout (${destinationUrl})`)
expect(error?.errors[2]?.message).to.eq(`502 Bad Gateway (${destinationUrl})`)
})
})
describe('when fetch resolves with a retryable status code 2x, and then a 200', () => {
const callCount = 3
;[408, 429, 502, 503, 504].forEach((status) => {
it(`makes a total of ${callCount} requests after HTTP ${status} and eventually resolves`, async () => {
let count = 0
const inc = () => count++
mockUpload().reply(status, inc)
mockUpload().reply(status, inc)
mockUpload().reply(200, inc)
const uploadPromise = uploadStream(fsReadStream, destinationUrl, fileSize)
execSimpleStream()
await expect(uploadPromise).to.be.fulfilled
expect(count).to.eq(callCount)
})
})
})
})
describe('when passed a timeout controller', () => {
let activityMonitor: StreamActivityMonitor
const maxStartDwellTime = 1000
const maxActivityDwellTime = 1000
let abortController: AbortController
beforeEach(() => {
abortController = new AbortController()
activityMonitor = new StreamActivityMonitor(maxStartDwellTime, maxActivityDwellTime)
sinon.stub(activityMonitor, 'getController').callsFake(() => abortController)
sinon.stub(activityMonitor, 'monitor').callsFake((arg) => arg)
})
it('pipes the readstream through the timeout controller monitoring method', async () => {
mockUpload().reply(200)
const uploadPromise = uploadStream(fsReadStream, destinationUrl, fileSize, {
activityMonitor,
})
execSimpleStream()
await expect(uploadPromise).to.be.fulfilled
expect(activityMonitor.monitor).to.be.calledWith(fsReadStream)
})
describe('and the timeout monitor\'s signal aborts with a StreamStartTimedOut error', () => {
beforeEach(() => {
abortController.abort(new StreamStartTimedOutError(maxStartDwellTime))
})
it('rejects with a StreamStartFailed error', async () => {
const uploadPromise = uploadStream(fsReadStream, destinationUrl, fileSize, {
activityMonitor,
})
await expect(uploadPromise).to.be.rejectedWith(StreamStartTimedOutError)
})
})
describe('and the timeout monitor\'s signal aborts with a StreamStalled error', () => {
beforeEach(() => {
abortController.abort(new StreamStalledError(maxActivityDwellTime))
})
it('rejects with a StreamStalled error', async () => {
const uploadPromise = uploadStream(fsReadStream, destinationUrl, fileSize, {
activityMonitor,
})
await expect(uploadPromise).to.be.rejectedWith(StreamStalledError)
})
})
})
})
@@ -0,0 +1,116 @@
import { asyncRetry } from '../../../lib/util/async_retry'
import sinon from 'sinon'
import { expect } from 'chai'
describe('asyncRetry', () => {
let asyncFn
const resolution = { result: 'success' }
beforeEach(() => {
asyncFn = sinon.stub()
})
describe('base retry behavior', () => {
describe('when succeeds on the first try', () => {
beforeEach(() => {
asyncFn.onFirstCall().resolves(resolution)
})
it('resolves with the expected resolution, only having called the original fn once', async () => {
const res = await asyncRetry(asyncFn, {
maxAttempts: 3,
})()
expect(res).to.eq(resolution)
expect(asyncFn).to.have.been.calledOnce
})
})
describe('when succeeds on the second try', () => {
beforeEach(() => {
asyncFn.onFirstCall().rejects(new Error('first call rejection')).onSecondCall().resolves(resolution)
})
it('resolves with the expected resolution, only having called the original fn twice', async () => {
const res = await asyncRetry(asyncFn, {
maxAttempts: 2,
})()
expect(res).to.eq(resolution)
expect(asyncFn).to.have.been.calledTwice
})
})
describe('when succeeds on the third try, with max attempts as 2', () => {
beforeEach(() => {
asyncFn
.onFirstCall().rejects(new Error('first call rejection'))
.onSecondCall().rejects(new Error('second call rejection'))
.onThirdCall().resolves()
})
it('rejects with an aggregate error, having called original fn only twice', async () => {
let thrown: AggregateError | undefined = undefined
try {
await asyncRetry(asyncFn, { maxAttempts: 2 })()
} catch (e) {
thrown = e
}
expect(thrown).not.to.be.undefined
expect(thrown.errors.length).to.be.eq(2)
expect(thrown.errors[0].message).to.eq('first call rejection')
expect(thrown.errors[1].message).to.eq('second call rejection')
expect(asyncFn).to.have.been.calledTwice
})
})
describe('when fails on the first try, and a retry is not warranted', () => {
let err
beforeEach(() => {
err = new Error('some error')
asyncFn.rejects(err)
})
it('throws a non-aggregate error', async () => {
let thrown: Error & { errors?: any[] }
try {
await asyncRetry(asyncFn, { maxAttempts: 1 })()
} catch (e) {
thrown = e
}
expect(thrown.message).to.eq(err.message)
expect(thrown.errors).to.be.undefined
})
})
})
describe('retry delay', () => {
let clock: sinon.SinonFakeTimers
beforeEach(() => {
asyncFn.rejects(new Error('reject to test retry delay'))
clock = sinon.useFakeTimers()
})
afterEach(() => {
sinon.restore()
})
it('waits for a duration returned by retryDelay between each retry', async () => {
const delay = 500
const asyncP = asyncRetry(asyncFn, { maxAttempts: 2, retryDelay: () => delay })().catch((e) => {})
await clock.tickAsync(1)
expect(asyncFn).to.have.been.calledOnce
await clock.tickAsync(delay)
expect(asyncFn).to.have.been.calledTwice
await clock.tickAsync(delay)
await asyncP
expect(asyncFn).to.have.been.calledTwice
})
})
})