Fix sending forms with large request bodies (#4241)

* add e2e test for submitting forms over https on localhost

* add multipart/form-data test

* add multiparty to parse multipart/form-data streams

* test form submission with attachments

* add repro for #4253

* stream_buffer failing on large body size

* wip: stream buffer fixes

* add eslint + require spec helper

* always cleanup on error and on response

* WIP: continue refactoring stream buffer

- utilize ‘finish’ and ‘chunk’ events to know when to push into
internal readable buffer instead of using `readable.push(‘’)`
- recursively call readable.push(…) until it returns false
- add tests for recursive push calls
- add tests for ensuring readables don’t end until writeable buffer does

* fix remaining tests after stream buffer refactor

* use path.resolve not path.join for handling cy.readFile + cy.writeFile

* consolidate e2e form multipart tests with existing ones

- use env var for passing around path to large earth image
- dynamically fetch large earth.jpg img and gitignore it prior to
running tests
- finish tests + passing implementation

* reader() -> createReadStream()

* guard against reqBodyBuffer being null


Co-authored-by: Brian Mann <brian.mann86@gmail.com>
This commit is contained in:
Zach Bloomquist
2019-05-22 04:19:14 -04:00
committed by Brian Mann
parent b568e82545
commit 9147e4f988
15 changed files with 649 additions and 140 deletions

1
.gitignore vendored
View File

@@ -43,6 +43,7 @@ packages/driver/test/cypress/videos
packages/server/.cy
packages/server/.projects
packages/server/support
packages/server/test/support/fixtures/server/imgs
packages/server/test/support/fixtures/server/libs
# CLI tool

View File

@@ -1,4 +1,4 @@
exports['e2e form submissions passing 1'] = `
exports['e2e forms <form> submissions passes with http on localhost 1'] = `
====================================================================================================
@@ -7,37 +7,43 @@ exports['e2e form submissions passing 1'] = `
┌────────────────────────────────────────────────────────────────────────────────────────────────┐
│ Cypress: 1.2.3 │
│ Browser: FooBrowser 88 │
│ Specs: 1 found (form_submission_passing_spec.coffee)
│ Searched: cypress/integration/form_submission_passing_spec.coffee
│ Specs: 1 found (form_submission_multipart_spec.coffee) │
│ Searched: cypress/integration/form_submission_multipart_spec.coffee │
└────────────────────────────────────────────────────────────────────────────────────────────────┘
────────────────────────────────────────────────────────────────────────────────────────────────────
Running: form_submission_passing_spec.coffee... (1 of 1)
Running: form_submission_multipart_spec.coffee... (1 of 1)
form submissions
will find 'form success' message by default (after retrying)
needs an explicit should when an element is immediately found
<form> submissions
can submit a form correctly
can submit a multipart/form-data form correctly
can submit a multipart/form-data form with attachments
✓ image/png
✓ application/pdf
✓ image/jpeg
✓ large application/pdf
✓ large image/jpeg
2 passing
7 passing
(Results)
┌───────────────────────────────────────────────────┐
│ Tests: 2
│ Passing: 2
│ Failing: 0 │
│ Pending: 0 │
│ Skipped: 0 │
│ Screenshots: 0 │
│ Video: true │
│ Duration: X seconds │
│ Spec Ran: form_submission_passing_spec.coffee │
└───────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────
│ Tests: 7
│ Passing: 7
│ Failing: 0
│ Pending: 0
│ Skipped: 0
│ Screenshots: 0
│ Video: true
│ Duration: X seconds
│ Spec Ran: form_submission_multipart_spec.coffee │
└─────────────────────────────────────────────────────
(Video)
@@ -53,14 +59,82 @@ exports['e2e form submissions passing 1'] = `
Spec Tests Passing Failing Pending Skipped
┌────────────────────────────────────────────────────────────────────────────────────────────────┐
│ ✔ form_submission_passing_spec.coffee XX:XX 2 2 - - - │
│ ✔ form_submission_multipart_spec.coffee XX:XX 7 7 - - - │
└────────────────────────────────────────────────────────────────────────────────────────────────┘
All specs passed! XX:XX 2 2 - - -
All specs passed! XX:XX 7 7 - - -
`
exports['e2e form submissions failing 1'] = `
exports['e2e forms <form> submissions passes with https on localhost 1'] = `
====================================================================================================
(Run Starting)
┌────────────────────────────────────────────────────────────────────────────────────────────────┐
│ Cypress: 1.2.3 │
│ Browser: FooBrowser 88 │
│ Specs: 1 found (form_submission_multipart_spec.coffee) │
│ Searched: cypress/integration/form_submission_multipart_spec.coffee │
└────────────────────────────────────────────────────────────────────────────────────────────────┘
────────────────────────────────────────────────────────────────────────────────────────────────────
Running: form_submission_multipart_spec.coffee... (1 of 1)
<form> submissions
✓ can submit a form correctly
✓ can submit a multipart/form-data form correctly
can submit a multipart/form-data form with attachments
✓ image/png
✓ application/pdf
✓ image/jpeg
✓ large application/pdf
✓ large image/jpeg
7 passing
(Results)
┌─────────────────────────────────────────────────────┐
│ Tests: 7 │
│ Passing: 7 │
│ Failing: 0 │
│ Pending: 0 │
│ Skipped: 0 │
│ Screenshots: 0 │
│ Video: true │
│ Duration: X seconds │
│ Spec Ran: form_submission_multipart_spec.coffee │
└─────────────────────────────────────────────────────┘
(Video)
- Started processing: Compressing to 32 CRF
- Finished processing: /foo/bar/.projects/e2e/cypress/videos/abc123.mp4 (X seconds)
====================================================================================================
(Run Finished)
Spec Tests Passing Failing Pending Skipped
┌────────────────────────────────────────────────────────────────────────────────────────────────┐
│ ✔ form_submission_multipart_spec.coffee XX:XX 7 7 - - - │
└────────────────────────────────────────────────────────────────────────────────────────────────┘
All specs passed! XX:XX 7 7 - - -
`
exports['e2e forms submissions with jquery XHR POST failing 1'] = `
====================================================================================================
@@ -146,3 +220,65 @@ exports['e2e form submissions failing 1'] = `
`
exports['e2e forms submissions with jquery XHR POST passing 1'] = `
====================================================================================================
(Run Starting)
┌────────────────────────────────────────────────────────────────────────────────────────────────┐
│ Cypress: 1.2.3 │
│ Browser: FooBrowser 88 │
│ Specs: 1 found (form_submission_passing_spec.coffee) │
│ Searched: cypress/integration/form_submission_passing_spec.coffee │
└────────────────────────────────────────────────────────────────────────────────────────────────┘
────────────────────────────────────────────────────────────────────────────────────────────────────
Running: form_submission_passing_spec.coffee... (1 of 1)
form submissions
✓ will find 'form success' message by default (after retrying)
✓ needs an explicit should when an element is immediately found
2 passing
(Results)
┌───────────────────────────────────────────────────┐
│ Tests: 2 │
│ Passing: 2 │
│ Failing: 0 │
│ Pending: 0 │
│ Skipped: 0 │
│ Screenshots: 0 │
│ Video: true │
│ Duration: X seconds │
│ Spec Ran: form_submission_passing_spec.coffee │
└───────────────────────────────────────────────────┘
(Video)
- Started processing: Compressing to 32 CRF
- Finished processing: /foo/bar/.projects/e2e/cypress/videos/abc123.mp4 (X seconds)
====================================================================================================
(Run Finished)
Spec Tests Passing Failing Pending Skipped
┌────────────────────────────────────────────────────────────────────────────────────────────────┐
│ ✔ form_submission_passing_spec.coffee XX:XX 2 2 - - - │
└────────────────────────────────────────────────────────────────────────────────────────────────┘
All specs passed! XX:XX 2 2 - - -
`

View File

@@ -4,7 +4,7 @@ fs = require("./util/fs")
module.exports = {
readFile: (projectRoot, file, options = {}) ->
filePath = path.join(projectRoot, file)
filePath = path.resolve(projectRoot, file)
readFn = if path.extname(filePath) is ".json"
fs.readJsonAsync
else
@@ -21,7 +21,7 @@ module.exports = {
throw err
writeFile: (projectRoot, file, contents, options = {}) ->
filePath = path.join(projectRoot, file)
filePath = path.resolve(projectRoot, file)
writeOptions = {
encoding: options.encoding or "utf8"
flag: options.flag or "w"

View File

@@ -302,9 +302,17 @@ createRetryingRequestStream = (opts = {}) ->
reqBodyBuffer = streamBuffer()
retryStream = duplexify(reqBodyBuffer, delayStream)
cleanup = ->
if reqBodyBuffer
## null req body out to free memory
reqBodyBuffer.unpipeAll()
reqBodyBuffer = null
emitError = (err) ->
retryStream.emit("error", err)
cleanup()
tryStartStream = ->
## if our request has been aborted
## in the time that we were waiting to retry
@@ -324,7 +332,7 @@ createRetryingRequestStream = (opts = {}) ->
## into the reqStream, then reapply this now
if req
reqStream.emit('pipe', req)
reqBodyBuffer.reader().pipe(reqStream)
reqBodyBuffer.createReadStream().pipe(reqStream)
## forward the abort call to the underlying request
retryStream.abort = ->
@@ -390,9 +398,7 @@ createRetryingRequestStream = (opts = {}) ->
onElse: ->
debug("successful response received", { requestId })
## null req body out to free memory
reqBodyBuffer.unpipeAll()
reqBodyBuffer = null
cleanup()
## forward the response event upwards which should happen
## prior to the pipe event, same as what request does

View File

@@ -1,13 +1,15 @@
const _ = require('lodash')
const debug = require('debug')('cypress:server:stream_buffer')
const stream = require('stream')
const through2 = require('through2')
function streamBuffer (initialSize = 2048) {
let buffer = Buffer.allocUnsafe(initialSize)
let bytesWritten = 0
let finished = false
const onChunk = (chunk, enc, cb) => {
const readers = []
const onWrite = (chunk, enc, cb) => {
if (chunk.length + bytesWritten > buffer.length) {
let newBufferLength = buffer.length
@@ -30,46 +32,75 @@ function streamBuffer (initialSize = 2048) {
bytesWritten += chunk.copy(buffer, bytesWritten)
return cb(null, chunk)
// emit in case there are readers waiting
writeable.emit('chunk', chunk)
cb()
}
const onFlush = (cb) => {
const onFinal = (cb) => {
debug('stream buffer writeable final called')
finished = true
cb()
}
const bufferer = through2(onChunk, onFlush)
const readers = []
const writeable = new stream.Writable({
write: onWrite,
final: onFinal,
autoDestroy: true,
})
bufferer.reader = () => {
writeable.createReadStream = () => {
let bytesRead = 0
const readerId = _.uniqueId('reader')
const onRead = function (size) {
// if there are unread bytes in the buffer,
// send up to bytesWritten back
if (bytesRead < bytesWritten) {
const chunkLength = bytesWritten - bytesRead
const bytes = buffer.slice(bytesRead, bytesRead + chunkLength)
const bytesLength = bytes.length
debug('reading unread bytes from buffer %o', {
readerId, bytesRead, bytesWritten, chunkLength, readChunkLength: bytesLength,
})
bytesRead += bytesLength
// if we can still push more bytes into
// the buffer then do it
if (readable.push(bytes)) {
return onRead(size)
}
}
// if it's finished and there are no unread bytes, EOF
if (finished) {
// cleanup listeners that were added
writeable.removeListener('chunk', onRead)
writeable.removeListener('finish', onRead)
debug('buffered stream EOF %o', { readerId })
return readable.push(null)
}
// if we're not finished we may end up writing
// more data - or we may end
writeable.removeListener('chunk', onRead)
writeable.once('chunk', onRead)
// if the writeable stream buffer isn't finished
// yet - then read() will not be called again,
// so we restart reading when its finished
writeable.removeListener('finish', onRead)
writeable.once('finish', onRead)
}
const readable = new stream.Readable({
read (size = initialSize) {
// if there are unread bytes in the buffer, send up to bytesWritten back
if (bytesRead < bytesWritten) {
const chunkLength = Math.min(size, bytesWritten)
const bytes = buffer.slice(bytesRead, chunkLength)
debug('reading unread bytes from buffer %o', { bytesRead, bytesWritten, readChunkLength: bytes.length, chunkLength, size })
bytesRead += bytes.length
return this.push(bytes)
}
// if there are no unread bytes, but the bufferer
// is still writing in, send an empty string
if (!finished) {
debug('no unread bytes, sending empty string %o', { bytesRead, bytesWritten })
return this.push('')
}
// if it's finished and there are no unread bytes, EOF
debug('buffered stream EOF')
this.push(null)
},
read: onRead,
autoDestroy: true,
})
readers.push(readable)
@@ -77,21 +108,20 @@ function streamBuffer (initialSize = 2048) {
return readable
}
bufferer.unpipeAll = () => {
readers.forEach((reader) => {
reader.unpipe() // unpipes from all destinations
})
writeable.unpipeAll = () => {
buffer = null // aggressive GC
_.invokeMap(readers, 'unpipe')
}
bufferer._buffer = () => {
writeable._buffer = () => {
return buffer
}
bufferer._finished = () => {
writeable._finished = () => {
return finished
}
return bufferer
return writeable
}
module.exports = {

View File

@@ -77,6 +77,7 @@
"istanbul": "0.4.5",
"mocked-env": "1.2.4",
"mockery": "1.7.0",
"multiparty": "4.2.1",
"nock": "9.6.1",
"npm-install-version": "6.0.2",
"obfuscator": "0.5.4",

View File

@@ -0,0 +1,8 @@
{
"globals": {
"sinon": true
},
"extends": [
"plugin:cypress-dev/tests"
]
}

View File

@@ -1,18 +1,147 @@
rp = require("request-promise")
path = require("path")
Promise = require("bluebird")
bodyParser = require("body-parser")
multiparty = require("multiparty")
fs = require("../../lib/util/fs")
e2e = require("../support/helpers/e2e")
Fixtures = require("../support/helpers/fixtures")
describe "e2e form submissions", ->
e2e.setup()
HTTPS_PORT = 11443
HTTP_PORT = 11180
it "passing", ->
e2e.exec(@, {
spec: "form_submission_passing_spec.coffee"
snapshot: true
expectedExitCode: 0
e2ePath = Fixtures.projectPath("e2e")
pathToLargeImage = Fixtures.path("server/imgs/earth.jpg")
getFormHtml = (formAttrs, textValue = '') ->
"""
<html>
<body>
<form method="POST" #{formAttrs}>
<input name="foo" type="text" value="#{textValue}"/>
<input name="bar" type="file"/>
<input type="submit"/>
</form>
</body>
</html>
"""
onServer = (app) ->
app.post "/verify-attachment", (req, res) ->
form = new multiparty.Form()
form.parse req, (err, fields, files) ->
fixturePath = path.resolve(e2ePath, "cypress", "fixtures", fields["foo"][0])
filePath = files["bar"][0].path
Promise.props({
fixture: fs.readFileAsync(fixturePath),
upload: fs.readFileAsync(filePath)
})
.then ({ fixture, upload }) ->
ret = fixture.compare(upload)
if ret is 0
return res.send('files match')
res.send(
"""
file did not match. file at #{fixturePath} did not match #{filePath}.
<br/>
<hr/>
buffer compare yielded: #{ret}
"""
)
## all routes below this point will have bodies parsed
app.use(bodyParser.text({
type: '*/*' ## parse any content-type
}))
app.get "/", (req, res) ->
res
.type('html')
.send(getFormHtml('action="/dump-body"'))
app.get "/multipart-form-data", (req, res) ->
res
.type('html')
.send(getFormHtml('action="/dump-body" enctype="multipart/form-data"'))
app.get "/multipart-with-attachment", (req, res) ->
res
.type('html')
.send(getFormHtml('action="/verify-attachment" enctype="multipart/form-data"', req.query.fixturePath))
app.post "/dump-body", (req, res) ->
res
.type('html')
.send(req.body)
describe "e2e forms", ->
context "submissions with jquery XHR POST", ->
e2e.setup()
it "passing", ->
e2e.exec(@, {
spec: "form_submission_passing_spec.coffee"
snapshot: true
expectedExitCode: 0
})
it "failing", ->
e2e.exec(@, {
spec: "form_submission_failing_spec.coffee"
snapshot: true
expectedExitCode: 1
})
context "<form> submissions", ->
e2e.setup({
settings: {
env: {
PATH_TO_LARGE_IMAGE: pathToLargeImage
}
}
servers: [
{
port: HTTPS_PORT
https: true
onServer
},
{
port: HTTP_PORT
onServer
}
]
})
it "failing", ->
e2e.exec(@, {
spec: "form_submission_failing_spec.coffee"
snapshot: true
expectedExitCode: 1
})
before ->
## go out and fetch this image if we don't already have it
fs
.readFileAsync(pathToLargeImage)
.catch { code: "ENOENT"}, ->
## 16MB image, too big to include with git repo
rp("https://test-page-speed.cypress.io/files/huge-image.jpg")
.then (resp) ->
fs.outputFileAsync(pathToLargeImage, resp)
it "passes with https on localhost", ->
e2e.exec(@, {
config: {
baseUrl: "https://localhost:#{HTTPS_PORT}"
}
spec: "form_submission_multipart_spec.coffee"
snapshot: true
expectedExitCode: 0
})
it "passes with http on localhost", ->
e2e.exec(@, {
config: {
baseUrl: "http://localhost:#{HTTP_PORT}"
}
spec: "form_submission_multipart_spec.coffee"
snapshot: true
expectedExitCode: 0
})

View File

@@ -57,13 +57,25 @@ controllers = {
res.send('<img src="/immediate-reset?load-img"/>')
printBodyThirdTimeForm: (req, res) ->
res.send("<html><body><form method='POST' action='/print-body-third-time'><input type='text' name='foo'/><input type='submit'/></form></body></html>")
res.send(
"""
<html>
<body>
<form method='POST' action='/print-body-third-time'>
<input type='text' name='foo'/>
<input type='submit'/>
</form>
</body>
</html>
"""
)
printBodyThirdTime: (req, res) ->
console.log(req.body)
res.type('html')
if counts[req.url] == 3
if counts[req.url] is 3
return res.send(JSON.stringify(req.body))
req.socket.destroy()

Binary file not shown.

After

Width:  |  Height:  |  Size: 7.5 KiB

View File

@@ -0,0 +1,77 @@
{ Blob, _ } = Cypress
Cypress.Commands.add 'setFile', { prevSubject: "element" }, (element, filePath) ->
mimeTypes = {
jpeg: "image/jpeg"
jpg: "image/jpeg"
png: "image/png"
pdf: "application/pdf"
}
filePathSplitted = filePath.split('.').pop()
mimeType = if mimeTypes[filePathSplitted] != undefined then mimeTypes[filePathSplitted] else null
fixtureOrReadFile = (filePath) ->
if _.startsWith(filePath, "/")
return cy.readFile(filePath, "base64")
return cy.fixture(filePath, "base64")
return fixtureOrReadFile(filePath).then (image) ->
return Blob.base64StringToBlob(image).then (blob) ->
elementNode = element[0]
file = new File([ blob ], filePath, type: mimeType)
dataTransfer = new DataTransfer
dataTransfer.items.add(file)
elementNode.files = dataTransfer.files
elementNode.dispatchEvent new Event("change", { bubbles: true })
describe "<form> submissions", ->
it "can submit a form correctly", ->
cy
.visit("/")
.get("input[type=text]")
.type("hello world")
.get("input[type=submit]")
.click()
.document()
.contains('hello+world')
it "can submit a multipart/form-data form correctly", ->
cy
.visit("/multipart-form-data")
.get("input[type=text]")
.type("hello world")
.get("input[type=submit]")
.click()
.document()
.contains('hello world')
context "can submit a multipart/form-data form with attachments", ->
testUpload = (fixturePath, containsOpts= {}) ->
cy.visit("/multipart-with-attachment?fixturePath=#{fixturePath}")
.get("input[type=file]")
.setFile(fixturePath)
.get("input[type=submit]")
.click()
.document()
.contains('files match', containsOpts)
it "image/png", ->
testUpload("../../static/javascript-logo.png")
it "application/pdf", ->
testUpload("sample.pdf")
it "image/jpeg", ->
testUpload("sample.jpg")
## https://github.com/cypress-io/cypress/issues/4253
it "large application/pdf", ->
testUpload("bigger-sample.pdf")
## https://github.com/cypress-io/cypress/issues/4240
it "large image/jpeg", ->
testUpload(Cypress.env("PATH_TO_LARGE_IMAGE"), {
timeout: 120000
})

View File

@@ -28,4 +28,4 @@
<input name="name">
<button id="submit" type="submit">submit</button>
</form>
</body>
</body>

View File

@@ -1,116 +1,225 @@
const { describe, it } = require('mocha')
const { expect } = require('chai')
require('../spec_helper')
const _ = require('lodash')
const fs = require('fs')
const streamBuffer = require('../../lib/util/stream_buffer')
const stream = require('stream')
const Promise = require('bluebird')
const concatStream = require('concat-stream')
const { streamBuffer } = require('../../lib/util/stream_buffer')
function drain (stream) {
let buf = ''
let chunk
while ((chunk = stream.read())) {
buf += chunk
}
return buf
return new Promise((resolve) => {
return stream.pipe(concatStream((buf) => {
resolve(buf.toString())
}))
})
}
describe('lib/util/stream_buffer', function () {
it('reads out no matter when we write', function () {
describe('lib/util/stream_buffer', () => {
it('reads out no matter when we write', function (done) {
done = _.after(2, done)
const pt = stream.PassThrough()
const sb = streamBuffer.streamBuffer()
const sb = streamBuffer()
pt.pipe(sb)
pt.write('test')
pt.write('test 2')
pt.write('1')
pt.write(' 2')
const reader = sb.reader()
const tickWrite = (chunk) => {
process.nextTick(() => {
pt.write(chunk)
})
}
let buf = drain(reader)
const readable = sb.createReadStream()
expect(buf).to.eq('testtest 2')
readable.once('data', (data2) => {
expect(data2.toString()).to.eq('1 2')
pt.write('test 3')
tickWrite(' 3')
buf += drain(reader)
readable.once('data', (data3) => {
expect(data3.toString()).to.eq(' 3')
expect(buf).to.eq('testtest 2test 3')
tickWrite(' 4')
pt.write('test 4')
const readable2 = sb.createReadStream()
const reader2 = sb.reader()
readable.once('data', (data4) => {
expect(data4.toString()).to.eq(' 4')
})
let buf2 = drain(reader2)
readable2.once('data', (data) => {
expect(data.toString()).to.eq('1 2 3 4')
expect(buf2).to.eq('testtest 2test 3test 4')
tickWrite(' 5')
pt.write('test 5')
readable2.once('data', (data5) => {
expect(data5.toString()).to.eq(' 5')
buf += drain(reader)
expect(buf).to.eq('testtest 2test 3test 4test 5')
done()
})
buf2 += drain(reader2)
expect(buf2).to.eq('testtest 2test 3test 4test 5')
readable.once('data', (data5) => {
expect(data5.toString()).to.eq(' 5')
done()
})
})
})
})
})
it('on overflow, enlarges the internal buffer by the smallest power of 2 that can fit the chunk', function () {
const sb = streamBuffer.streamBuffer(64)
it('on overflow, enlarges the internal buffer by the smallest power of 2 that can fit the chunk', () => {
const sb = streamBuffer(64)
sb.write('A'.repeat(65))
expect(sb._buffer().length).to.eq(128)
sb.write('A'.repeat(1024))
sb.end('A'.repeat(1024))
expect(sb._buffer().length).to.eq(2048)
const reader = sb.reader()
const buf = drain(reader)
const readable = sb.createReadStream()
expect(buf).to.eq('A'.repeat(1089))
return drain(readable)
.then((buf) => {
expect(buf).to.eq('A'.repeat(1089))
})
})
it('finishes when buffer stream closes while still allowing data to be drained', function () {
const sb = streamBuffer.streamBuffer()
sb.pipe(sb)
it('finishes when buffer stream closes while still allowing data to be drained', (done) => {
const sb = streamBuffer()
sb.write('foo')
sb.write('bar')
expect(sb._finished()).to.be.false
sb.end()
expect(sb._finished()).to.be.true
sb.end(() => {
expect(sb._finished()).to.be.true
const reader = sb.reader()
const buf = drain(reader)
const readable = sb.createReadStream()
expect(buf).to.eq('foobar')
return drain(readable)
.then((buf) => {
expect(buf).to.eq('foobar')
const reader2 = sb.reader()
const buf2 = drain(reader2)
const readable2 = sb.createReadStream()
expect(buf2).to.eq('foobar')
return drain(readable2)
.then((buf2) => {
expect(buf2).to.eq('foobar')
done()
})
})
})
})
it('can be piped into and then read from', function (done) {
const expected = fs.readFileSync(__filename).toString()
const rs = fs.createReadStream(__filename)
const ws = fs.createWriteStream('/dev/null')
const sb = streamBuffer.streamBuffer()
const sb = streamBuffer()
rs.pipe(sb).pipe(ws)
rs.pipe(sb)
const reader = sb.reader()
const readable = sb.createReadStream()
rs.on('end', () => {
const buf = drain(reader)
return drain(readable)
.then((buf) => {
expect(buf).to.eq(expected)
expect(buf).to.eq(expected)
done()
done()
})
})
})
it('readable recursively pushes until it returns false', (done) => {
const sb = streamBuffer()
const readable = sb.createReadStream()
const writeable = stream.Writable({
final () {
expect(readable.push).to.be.calledTwice
expect(readable.push.firstCall).to.be.calledWith(buf)
expect(readable.push.secondCall).to.be.calledWith(null)
done()
},
write (chunk, enc, cb) {
cb()
},
})
sinon.spy(readable, 'push')
readable.pipe(writeable)
const size = 64 * 1024 // 64 kb
const buf = Buffer.alloc(size, '!')
sb.end(buf)
})
it('readable pipes do not end until the writeable ends', function (done) {
const sb = streamBuffer()
const readable = sb.createReadStream()
const writeable = stream.Writable({
final () {
expect(sb.writable).to.be.false
expect(sb._writableState).to.have.property('ended', true)
done()
},
write (chunk, enc, cb) {
process.nextTick(() => {
if (sb.writable) {
sb.end('asdf')
}
})
cb()
},
})
readable.pipe(writeable)
const size = 64 * 1024 // 64 kb
const buf = Buffer.alloc(size, '!')
sb.write(buf)
})
it('can handle a massive req body', function (done) {
const size = 16 * 1024 // 16 kb
const repeat = 3
const body = Buffer.alloc(size, '!')
const sb = streamBuffer()
const pt = new stream.PassThrough({
highWaterMark: Number.MAX_SAFE_INTEGER,
})
pt.pipe(sb, { end: true })
pt.write(Buffer.alloc(size, '!'))
pt.write(Buffer.alloc(size, '!'))
pt.write(Buffer.alloc(size, '!'))
pt.on('end', () => {
const readable = sb.createReadStream()
drain(readable)
.then((buf) => {
expect(buf.length).to.eq(body.length * repeat)
expect(buf).to.eq(body.toString().repeat(repeat))
done()
})
})
pt.end()
})
})