From 7be1051197e5c911f73f6b18a91bbd33a99dcd4f Mon Sep 17 00:00:00 2001 From: Brian Mann Date: Mon, 29 Apr 2019 01:56:18 -0400 Subject: [PATCH] WIP: continue making progress building out request retry MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - swap out passthrough’s for pumpify / duplexify / pump - clean up error handling / messaging / retry logic --- packages/server/lib/request.coffee | 138 +++++++++++++++++++++-------- 1 file changed, 101 insertions(+), 37 deletions(-) diff --git a/packages/server/lib/request.coffee b/packages/server/lib/request.coffee index 1c1e27b4fb..87e752fbfc 100644 --- a/packages/server/lib/request.coffee +++ b/packages/server/lib/request.coffee @@ -7,6 +7,7 @@ debug = require("debug")("cypress:server:request") moment = require("moment") Promise = require("bluebird") stream = require("stream") +pumpify = require("pumpify") agent = require("@packages/network").agent statusCode = require("./util/status_code") Cookies = require("./automation/cookies") @@ -177,80 +178,143 @@ pipeEvent = (source, destination, event) -> source.on event, (args...) -> destination.emit(event, args...) -createRetryingRequestStream = (opts) -> - retryingReqStream = stream.PassThrough() +createRetryingRequestStream = (opts = {}) -> + retryStream = pumpify() + + retryStream.on "error", (err) -> + debugger + + didAbort = false + pipeSrc = null + + emitError = (err) -> + # retryStream.emit("error", err) + retryStream.destroy(err) tryStartStream = (iteration = 0) -> - reqStream = r(opts) + ## if our request has been aborted + ## in the time that we were waiting to retry + ## then immediately bail + if didAbort + return - # re-attach so proxy can control the response - retryingReqStream.abort = -> - reqStream.abort() + didReceiveResponse = false + + retry = (err) -> + attempts = iteration + 1 - retry = -> delay = getDelayForRetry(iteration) reqStream.abort() - retryingReqStream.emit("retry", { iteration, delay }) + debug("received an error on request. retrying after '#{delay}ms' %o", { + opts + attempts + delay + err + }) + + retryStream.emit("retry", { attempts, delay }) debug("retry %o", { iteration, delay }) setTimeout -> - tryStartStream(iteration + 1) + tryStartStream(attempts) , delay - onUselessError = (err) -> - # this reqStream is now garbage, but sometimes it still receives errors, so this can't crash the process - debug("received another error on stream %o", err) + reqStream = r(opts) + delayStream = stream.PassThrough() - onError = (err) -> - # on `error`, retry and emit `retry` on retryingReqStream + retryStream.setPipeline([reqStream, delayStream]) + # retryStream.setWritable(reqStream) + # retryStream.setReadable(delayStream) - debug("received an error creating stream %o", err) + ## if we're retrying and we previous piped + ## into the reqStream, then reapply this now + if pipeSrc + pipeSrc.pipe(reqStream) - reqStream.on "error", onUselessError + ## forward the abort call to the underlying request + retryStream.abort = -> + didAbort = true + + reqStream.abort() + + onPiped = (src) -> + ## store this so we can reapply it + ## if we need to retry + ## TODO: this needs to write to the fs + ## so we can re-read the request body + ## later and then remove it after the + ## response is complete + pipeSrc = src + + ## when this passthrough stream is being piped into + ## then make sure we properly "forward" and connect + ## forward it to the real reqStream which enables + ## request to read off the IncomingMessage readable stream + retryStream.once("pipe", onPiped) + + reqStream.on "error", (err) -> + if didReceiveResponse + ## if we've already begun processing the requests + ## response, then that means we failed during transit + ## and its no longer safe to retry. all we can do now + ## is propogate the error upwards + debug("received an error on request after response started %o", { opts, err }) + + return emitError(err) + + ## otherwise, see if we can retry another request under the hood... if not isRetriableError(err, opts) - debug('this err aint retriable %o', err) - return retryingReqStream.emit("error", err) + debug("received a non-retryable request error %o", { opts, err }) + + return emitError(err) if iteration >= MAX_REQUEST_RETRIES - debug("retried %dx and still network error, not retrying", MAX_REQUEST_RETRIES) - return retryingReqStream.emit("error", err) + debug("exhausted all attempts to retry request", { + attempts: iteration, + opts, + err, + }) - retry() + return emitError(err) - reqStream.once "error", onError + return retry(err) + + ## TODO: need to forward the other request + http.ClientRequest events + ## abort[ed], complete, request, etc... + + reqStream.once "request", (req) -> + ## remove the pipe listener since once the request has + ## been made, we cannot pipe into the reqStream anymore + retryStream.removeListener("pipe", onPiped) reqStream.once "response", (incomingRes) -> - reqStream.removeListener("error", onError) + didReceiveResponse = true ## ok, no net error, but what about a bad status code? if hasRetriableStatusCodeFailure(incomingRes, opts) && iteration < MAX_REQUEST_RETRIES debug("received failing status code on res, retrying", _.pick(incomingRes, "statusCode")) - reqStream.on "error", onUselessError - return retry() - # on `response`, begin piping everything and re-emit `response` on retryingReqStream - reqStream.pipe(retryingReqStream) + ## otherwise, we've successfully received a valid response... - # also need to pipe all the non-data events - _.map( - [ - # all `stream.Readable` events except "data" - "close", "end", "error", "pause", "readable", "resume" - ], - _.partial(pipeEvent, reqStream, retryingReqStream) - ) + ## forward the response event upwards which should happen + ## prior to the pipe event, same as what request does + ## https://github.com/request/request/blob/master/request.js#L1059 + retryStream.emit("response", incomingRes) - retryingReqStream.emit("response", incomingRes) + # retryStream.setReadable(delayStream) + + + return null tryStartStream() - retryingReqStream + return retryStream module.exports = (options = {}) -> defaults = {