mirror of
https://github.com/cypress-io/cypress.git
synced 2025-12-30 19:19:53 -06:00
WIP: continue making progress building out request retry
- swap out passthrough’s for pumpify / duplexify / pump - clean up error handling / messaging / retry logic
This commit is contained in:
@@ -7,6 +7,7 @@ debug = require("debug")("cypress:server:request")
|
||||
moment = require("moment")
|
||||
Promise = require("bluebird")
|
||||
stream = require("stream")
|
||||
pumpify = require("pumpify")
|
||||
agent = require("@packages/network").agent
|
||||
statusCode = require("./util/status_code")
|
||||
Cookies = require("./automation/cookies")
|
||||
@@ -177,80 +178,143 @@ pipeEvent = (source, destination, event) ->
|
||||
source.on event, (args...) ->
|
||||
destination.emit(event, args...)
|
||||
|
||||
createRetryingRequestStream = (opts) ->
|
||||
retryingReqStream = stream.PassThrough()
|
||||
createRetryingRequestStream = (opts = {}) ->
|
||||
retryStream = pumpify()
|
||||
|
||||
retryStream.on "error", (err) ->
|
||||
debugger
|
||||
|
||||
didAbort = false
|
||||
pipeSrc = null
|
||||
|
||||
emitError = (err) ->
|
||||
# retryStream.emit("error", err)
|
||||
retryStream.destroy(err)
|
||||
|
||||
tryStartStream = (iteration = 0) ->
|
||||
reqStream = r(opts)
|
||||
## if our request has been aborted
|
||||
## in the time that we were waiting to retry
|
||||
## then immediately bail
|
||||
if didAbort
|
||||
return
|
||||
|
||||
# re-attach so proxy can control the response
|
||||
retryingReqStream.abort = ->
|
||||
reqStream.abort()
|
||||
didReceiveResponse = false
|
||||
|
||||
retry = (err) ->
|
||||
attempts = iteration + 1
|
||||
|
||||
retry = ->
|
||||
delay = getDelayForRetry(iteration)
|
||||
|
||||
reqStream.abort()
|
||||
|
||||
retryingReqStream.emit("retry", { iteration, delay })
|
||||
debug("received an error on request. retrying after '#{delay}ms' %o", {
|
||||
opts
|
||||
attempts
|
||||
delay
|
||||
err
|
||||
})
|
||||
|
||||
retryStream.emit("retry", { attempts, delay })
|
||||
|
||||
debug("retry %o", { iteration, delay })
|
||||
|
||||
setTimeout ->
|
||||
tryStartStream(iteration + 1)
|
||||
tryStartStream(attempts)
|
||||
, delay
|
||||
|
||||
onUselessError = (err) ->
|
||||
# this reqStream is now garbage, but sometimes it still receives errors, so this can't crash the process
|
||||
debug("received another error on stream %o", err)
|
||||
reqStream = r(opts)
|
||||
delayStream = stream.PassThrough()
|
||||
|
||||
onError = (err) ->
|
||||
# on `error`, retry and emit `retry` on retryingReqStream
|
||||
retryStream.setPipeline([reqStream, delayStream])
|
||||
# retryStream.setWritable(reqStream)
|
||||
# retryStream.setReadable(delayStream)
|
||||
|
||||
debug("received an error creating stream %o", err)
|
||||
## if we're retrying and we previous piped
|
||||
## into the reqStream, then reapply this now
|
||||
if pipeSrc
|
||||
pipeSrc.pipe(reqStream)
|
||||
|
||||
reqStream.on "error", onUselessError
|
||||
## forward the abort call to the underlying request
|
||||
retryStream.abort = ->
|
||||
didAbort = true
|
||||
|
||||
reqStream.abort()
|
||||
|
||||
onPiped = (src) ->
|
||||
## store this so we can reapply it
|
||||
## if we need to retry
|
||||
## TODO: this needs to write to the fs
|
||||
## so we can re-read the request body
|
||||
## later and then remove it after the
|
||||
## response is complete
|
||||
pipeSrc = src
|
||||
|
||||
## when this passthrough stream is being piped into
|
||||
## then make sure we properly "forward" and connect
|
||||
## forward it to the real reqStream which enables
|
||||
## request to read off the IncomingMessage readable stream
|
||||
retryStream.once("pipe", onPiped)
|
||||
|
||||
reqStream.on "error", (err) ->
|
||||
if didReceiveResponse
|
||||
## if we've already begun processing the requests
|
||||
## response, then that means we failed during transit
|
||||
## and its no longer safe to retry. all we can do now
|
||||
## is propogate the error upwards
|
||||
debug("received an error on request after response started %o", { opts, err })
|
||||
|
||||
return emitError(err)
|
||||
|
||||
## otherwise, see if we can retry another request under the hood...
|
||||
|
||||
if not isRetriableError(err, opts)
|
||||
debug('this err aint retriable %o', err)
|
||||
return retryingReqStream.emit("error", err)
|
||||
debug("received a non-retryable request error %o", { opts, err })
|
||||
|
||||
return emitError(err)
|
||||
|
||||
if iteration >= MAX_REQUEST_RETRIES
|
||||
debug("retried %dx and still network error, not retrying", MAX_REQUEST_RETRIES)
|
||||
return retryingReqStream.emit("error", err)
|
||||
debug("exhausted all attempts to retry request", {
|
||||
attempts: iteration,
|
||||
opts,
|
||||
err,
|
||||
})
|
||||
|
||||
retry()
|
||||
return emitError(err)
|
||||
|
||||
reqStream.once "error", onError
|
||||
return retry(err)
|
||||
|
||||
## TODO: need to forward the other request + http.ClientRequest events
|
||||
## abort[ed], complete, request, etc...
|
||||
|
||||
reqStream.once "request", (req) ->
|
||||
## remove the pipe listener since once the request has
|
||||
## been made, we cannot pipe into the reqStream anymore
|
||||
retryStream.removeListener("pipe", onPiped)
|
||||
|
||||
reqStream.once "response", (incomingRes) ->
|
||||
reqStream.removeListener("error", onError)
|
||||
didReceiveResponse = true
|
||||
|
||||
## ok, no net error, but what about a bad status code?
|
||||
if hasRetriableStatusCodeFailure(incomingRes, opts) && iteration < MAX_REQUEST_RETRIES
|
||||
debug("received failing status code on res, retrying", _.pick(incomingRes, "statusCode"))
|
||||
|
||||
reqStream.on "error", onUselessError
|
||||
|
||||
return retry()
|
||||
|
||||
# on `response`, begin piping everything and re-emit `response` on retryingReqStream
|
||||
reqStream.pipe(retryingReqStream)
|
||||
## otherwise, we've successfully received a valid response...
|
||||
|
||||
# also need to pipe all the non-data events
|
||||
_.map(
|
||||
[
|
||||
# all `stream.Readable` events except "data"
|
||||
"close", "end", "error", "pause", "readable", "resume"
|
||||
],
|
||||
_.partial(pipeEvent, reqStream, retryingReqStream)
|
||||
)
|
||||
## forward the response event upwards which should happen
|
||||
## prior to the pipe event, same as what request does
|
||||
## https://github.com/request/request/blob/master/request.js#L1059
|
||||
retryStream.emit("response", incomingRes)
|
||||
|
||||
retryingReqStream.emit("response", incomingRes)
|
||||
# retryStream.setReadable(delayStream)
|
||||
|
||||
|
||||
return null
|
||||
|
||||
tryStartStream()
|
||||
|
||||
retryingReqStream
|
||||
return retryStream
|
||||
|
||||
module.exports = (options = {}) ->
|
||||
defaults = {
|
||||
|
||||
Reference in New Issue
Block a user