WIP: continue making progress building out request retry

- swap out passthrough’s for pumpify / duplexify / pump
- clean up error handling / messaging / retry logic
This commit is contained in:
Brian Mann
2019-04-29 01:56:18 -04:00
parent 24e52d27ab
commit 7be1051197

View File

@@ -7,6 +7,7 @@ debug = require("debug")("cypress:server:request")
moment = require("moment")
Promise = require("bluebird")
stream = require("stream")
pumpify = require("pumpify")
agent = require("@packages/network").agent
statusCode = require("./util/status_code")
Cookies = require("./automation/cookies")
@@ -177,80 +178,143 @@ pipeEvent = (source, destination, event) ->
source.on event, (args...) ->
destination.emit(event, args...)
createRetryingRequestStream = (opts) ->
retryingReqStream = stream.PassThrough()
createRetryingRequestStream = (opts = {}) ->
retryStream = pumpify()
retryStream.on "error", (err) ->
debugger
didAbort = false
pipeSrc = null
emitError = (err) ->
# retryStream.emit("error", err)
retryStream.destroy(err)
tryStartStream = (iteration = 0) ->
reqStream = r(opts)
## if our request has been aborted
## in the time that we were waiting to retry
## then immediately bail
if didAbort
return
# re-attach so proxy can control the response
retryingReqStream.abort = ->
reqStream.abort()
didReceiveResponse = false
retry = (err) ->
attempts = iteration + 1
retry = ->
delay = getDelayForRetry(iteration)
reqStream.abort()
retryingReqStream.emit("retry", { iteration, delay })
debug("received an error on request. retrying after '#{delay}ms' %o", {
opts
attempts
delay
err
})
retryStream.emit("retry", { attempts, delay })
debug("retry %o", { iteration, delay })
setTimeout ->
tryStartStream(iteration + 1)
tryStartStream(attempts)
, delay
onUselessError = (err) ->
# this reqStream is now garbage, but sometimes it still receives errors, so this can't crash the process
debug("received another error on stream %o", err)
reqStream = r(opts)
delayStream = stream.PassThrough()
onError = (err) ->
# on `error`, retry and emit `retry` on retryingReqStream
retryStream.setPipeline([reqStream, delayStream])
# retryStream.setWritable(reqStream)
# retryStream.setReadable(delayStream)
debug("received an error creating stream %o", err)
## if we're retrying and we previous piped
## into the reqStream, then reapply this now
if pipeSrc
pipeSrc.pipe(reqStream)
reqStream.on "error", onUselessError
## forward the abort call to the underlying request
retryStream.abort = ->
didAbort = true
reqStream.abort()
onPiped = (src) ->
## store this so we can reapply it
## if we need to retry
## TODO: this needs to write to the fs
## so we can re-read the request body
## later and then remove it after the
## response is complete
pipeSrc = src
## when this passthrough stream is being piped into
## then make sure we properly "forward" and connect
## forward it to the real reqStream which enables
## request to read off the IncomingMessage readable stream
retryStream.once("pipe", onPiped)
reqStream.on "error", (err) ->
if didReceiveResponse
## if we've already begun processing the requests
## response, then that means we failed during transit
## and its no longer safe to retry. all we can do now
## is propogate the error upwards
debug("received an error on request after response started %o", { opts, err })
return emitError(err)
## otherwise, see if we can retry another request under the hood...
if not isRetriableError(err, opts)
debug('this err aint retriable %o', err)
return retryingReqStream.emit("error", err)
debug("received a non-retryable request error %o", { opts, err })
return emitError(err)
if iteration >= MAX_REQUEST_RETRIES
debug("retried %dx and still network error, not retrying", MAX_REQUEST_RETRIES)
return retryingReqStream.emit("error", err)
debug("exhausted all attempts to retry request", {
attempts: iteration,
opts,
err,
})
retry()
return emitError(err)
reqStream.once "error", onError
return retry(err)
## TODO: need to forward the other request + http.ClientRequest events
## abort[ed], complete, request, etc...
reqStream.once "request", (req) ->
## remove the pipe listener since once the request has
## been made, we cannot pipe into the reqStream anymore
retryStream.removeListener("pipe", onPiped)
reqStream.once "response", (incomingRes) ->
reqStream.removeListener("error", onError)
didReceiveResponse = true
## ok, no net error, but what about a bad status code?
if hasRetriableStatusCodeFailure(incomingRes, opts) && iteration < MAX_REQUEST_RETRIES
debug("received failing status code on res, retrying", _.pick(incomingRes, "statusCode"))
reqStream.on "error", onUselessError
return retry()
# on `response`, begin piping everything and re-emit `response` on retryingReqStream
reqStream.pipe(retryingReqStream)
## otherwise, we've successfully received a valid response...
# also need to pipe all the non-data events
_.map(
[
# all `stream.Readable` events except "data"
"close", "end", "error", "pause", "readable", "resume"
],
_.partial(pipeEvent, reqStream, retryingReqStream)
)
## forward the response event upwards which should happen
## prior to the pipe event, same as what request does
## https://github.com/request/request/blob/master/request.js#L1059
retryStream.emit("response", incomingRes)
retryingReqStream.emit("response", incomingRes)
# retryStream.setReadable(delayStream)
return null
tryStartStream()
retryingReqStream
return retryStream
module.exports = (options = {}) ->
defaults = {