NBS: Fix large HTTP {get,has}Refs/ requests (#3629)
When we added GetMany and HasMany, we didn't realize that the resulting requests could exceed the allowable HTTP form size. This patch serializes the bodies of getRefs and hasRefs requests as binary instead, which removes that limit and also makes the request bodies more compact. Fixes #3589
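For a rough sense of scale, here is a back-of-the-envelope sketch (not part of the patch). It assumes 20-byte noms hashes (32 base32 characters in string form) and the roughly 10 MB cap that net/http's ParseForm places on url-encoded POST bodies; the hash count matches the one used in TestOverGetThreshold_Issue3589 below. The same batch is roughly half the size in the binary encoding:

package main

import "fmt"

func main() {
	// Hash count from TestOverGetThreshold_Issue3589: just over 10MB worth of raw hashes.
	const n = (10*(1<<20))/20 + 1 // 524,289 hashes, assuming 20-byte hashes

	// Old encoding: "ref=<32 base32 chars>" per hash, joined by '&'.
	formBody := n*(len("ref=")+32) + (n - 1)
	// New encoding: a 4-byte big-endian count followed by 20 raw bytes per hash.
	binaryBody := 4 + n*20

	fmt.Printf("form-encoded: ~%.1f MB\n", float64(formBody)/(1<<20))   // ~18.5 MB
	fmt.Printf("binary:       ~%.1f MB\n", float64(binaryBody)/(1<<20)) // ~10.0 MB
}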
@@ -10,7 +10,7 @@ import (
 	"os"
 )
 
-const NomsVersion = "7.13"
+const NomsVersion = "7.14"
 const NOMS_VERSION_NEXT_ENV_NAME = "NOMS_VERSION_NEXT"
 const NOMS_VERSION_NEXT_ENV_VALUE = "1"
 
@@ -28,13 +28,13 @@ import (
 )
 
 const (
-	httpChunkSinkConcurrency = 6
-	readBufferSize           = 1 << 12 // 4K
+	httpChunkStoreConcurrency = 6
+	readThreshold             = 1 << 12 // 4K
 )
 
 var customHTTPTransport = http.Transport{
-	// Since we limit ourselves to a maximum of httpChunkSinkConcurrency concurrent http requests, we think it's OK to up MaxIdleConnsPerHost so that one connection stays open for each concurrent request
-	MaxIdleConnsPerHost: httpChunkSinkConcurrency,
+	// Since we limit ourselves to a maximum of httpChunkStoreConcurrency concurrent http requests, we think it's OK to up MaxIdleConnsPerHost so that one connection stays open for each concurrent request
+	MaxIdleConnsPerHost: httpChunkStoreConcurrency,
 	// This sets, essentially, an idle-timeout. The timer starts counting AFTER the client has finished sending the entire request to the server. As soon as the client receives the server's response headers, the timeout is canceled.
 	ResponseHeaderTimeout: time.Duration(4) * time.Minute,
 }
@@ -75,7 +75,7 @@ func newHTTPChunkStoreWithClient(baseURL, auth string, client httpDoer) *httpChu
 		getQueue:      make(chan chunks.ReadRequest),
 		hasQueue:      make(chan chunks.ReadRequest),
 		finishedChan:  make(chan struct{}),
-		rateLimit:     make(chan struct{}, httpChunkSinkConcurrency),
+		rateLimit:     make(chan struct{}, httpChunkStoreConcurrency),
 		workerWg:      &sync.WaitGroup{},
 		cacheMu:       &sync.RWMutex{},
 		unwrittenPuts: nbs.NewCache(),
@@ -204,19 +204,19 @@ func (hcs *httpChunkStore) HasMany(hashes hash.HashSet) (absent hash.HashSet) {
 		return remaining
 	}
 
-	foundChunks := make(chan hash.Hash)
+	notFoundChunks := make(chan hash.Hash)
 	wg := &sync.WaitGroup{}
 	wg.Add(len(remaining))
 	select {
 	case <-hcs.finishedChan:
 		d.Panic("Tried to HasMany on closed ChunkStore")
-	case hcs.hasQueue <- chunks.NewAbsentManyRequest(remaining, wg, foundChunks):
+	case hcs.hasQueue <- chunks.NewAbsentManyRequest(remaining, wg, notFoundChunks):
 	}
-	go func() { defer close(foundChunks); wg.Wait() }()
+	go func() { defer close(notFoundChunks); wg.Wait() }()
 
 	absent = hash.HashSet{}
-	for found := range foundChunks {
-		absent.Insert(found)
+	for notFound := range notFoundChunks {
+		absent.Insert(notFound)
 	}
 	return absent
 }
@@ -245,16 +245,14 @@ func (hcs *httpChunkStore) batchReadRequests(queue <-chan chunks.ReadRequest, ge
 func (hcs *httpChunkStore) sendReadRequests(req chunks.ReadRequest, queue <-chan chunks.ReadRequest, getter batchGetter) {
 	batch := chunks.ReadBatch{}
 
 	count := 0
 	addReq := func(req chunks.ReadRequest) {
 		for h := range req.Hashes() {
 			batch[h] = append(batch[h], req.Outstanding())
 		}
 		count++
 	}
 
 	addReq(req)
-	for drained := false; !drained && len(batch) < readBufferSize; {
+	for drained := false; !drained && len(batch) < readThreshold; {
 		select {
 		case req := <-queue:
 			addReq(req)
@@ -266,9 +264,9 @@ func (hcs *httpChunkStore) sendReadRequests(req chunks.ReadRequest, queue <-chan
 	hcs.rateLimit <- struct{}{}
 	go func() {
 		defer batch.Close()
+		defer func() { <-hcs.rateLimit }()
 
 		getter(batch)
-		<-hcs.rateLimit
 	}()
 }
 
@@ -286,7 +284,7 @@ func (hcs *httpChunkStore) getRefs(batch chunks.ReadBatch) {
 
 	req := newRequest("POST", hcs.auth, u.String(), buildHashesRequest(batch), http.Header{
 		"Accept-Encoding": {"x-snappy-framed"},
-		"Content-Type":    {"application/x-www-form-urlencoded"},
+		"Content-Type":    {"application/octet-stream"},
 	})
 
 	res, err := hcs.httpClient.Do(req)
@@ -296,7 +294,8 @@ func (hcs *httpChunkStore) getRefs(batch chunks.ReadBatch) {
 	defer closeResponse(reader)
 
 	if http.StatusOK != res.StatusCode {
-		d.Panic("Unexpected response: %s", http.StatusText(res.StatusCode))
+		data, _ := ioutil.ReadAll(reader)
+		d.Panic("Unexpected response: %s\n%s", http.StatusText(res.StatusCode), string(data))
 	}
 
 	chunkChan := make(chan *chunks.Chunk, 16)
@@ -318,7 +317,7 @@ func (hcs *httpChunkStore) hasRefs(batch chunks.ReadBatch) {
 
 	req := newRequest("POST", hcs.auth, u.String(), buildHashesRequest(batch), http.Header{
 		"Accept-Encoding": {"x-snappy-framed"},
-		"Content-Type":    {"application/x-www-form-urlencoded"},
+		"Content-Type":    {"application/octet-stream"},
 	})
 
 	res, err := hcs.httpClient.Do(req)
@@ -375,7 +374,6 @@ func sendWriteRequest(u url.URL, auth, vers string, p *nbs.NomsBlockCache, cli h
 
 	body := buildWriteValueRequest(chunkChan)
 	req := newRequest("POST", auth, u.String(), body, http.Header{
 		"Accept-Encoding":  {"gzip"},
 		"Content-Encoding": {"x-snappy-framed"},
 		"Content-Type":     {"application/octet-stream"},
 	})
 
@@ -5,6 +5,7 @@
 package datas
 
 import (
+	"encoding/binary"
 	"fmt"
 	"io/ioutil"
 	"net/http"
@@ -253,6 +254,35 @@ func (suite *HTTPChunkStoreSuite) TestGetMany() {
 	suite.True(hashes.Has(notPresent))
 }
 
+func (suite *HTTPChunkStoreSuite) TestOverGetThreshold_Issue3589() {
+	if testing.Short() {
+		suite.T().Skip("Skipping test in short mode.")
+	}
+	// BUG 3589 happened because we requested enough hashes that the body was over 10MB. The new way of encoding getRefs request bodies means that 10MB will no longer be a limitation. This test will generate a request larger than 10MB.
+	count := ((10 * (1 << 20)) / hash.ByteLen) + 1
+	hashes := make(hash.HashSet, count)
+	for i := 0; i < count-1; i++ {
+		h := hash.Hash{}
+		binary.BigEndian.PutUint64(h[hash.ByteLen-8:], uint64(i))
+		hashes.Insert(h)
+	}
+
+	present := chunks.NewChunk([]byte("ghi"))
+	suite.serverCS.Put(present)
+	persistChunks(suite.serverCS)
+	hashes.Insert(present.Hash())
+
+	foundChunks := make(chan *chunks.Chunk)
+	go func() { suite.http.GetMany(hashes, foundChunks); close(foundChunks) }()
+
+	found := hash.HashSet{}
+	for c := range foundChunks {
+		found.Insert(c.Hash())
+	}
+	suite.Len(found, 1)
+	suite.True(found.Has(present.Hash()))
+}
+
 func (suite *HTTPChunkStoreSuite) TestGetManyAllCached() {
 	chnx := []chunks.Chunk{
 		chunks.NewChunk([]byte("abc")),
@@ -12,7 +12,6 @@ import (
 	"io/ioutil"
 	"log"
 	"net/http"
-	"net/url"
 	"runtime"
 	"strings"
 	"time"
@@ -194,22 +193,26 @@ func handleWriteValue(w http.ResponseWriter, req *http.Request, ps URLParams, cs
 	w.WriteHeader(http.StatusCreated)
 }
 
-// Contents of the returned io.Reader are snappy-compressed.
-func buildWriteValueRequest(chunkChan chan *chunks.Chunk) io.Reader {
+// Contents of the returned io.ReadCloser are snappy-compressed.
+func buildWriteValueRequest(chunkChan chan *chunks.Chunk) io.ReadCloser {
 	body, pw := io.Pipe()
 
 	go func() {
-		gw := snappy.NewBufferedWriter(pw)
+		sw := snappy.NewBufferedWriter(pw)
+		defer checkClose(pw)
+		defer checkClose(sw)
 		for c := range chunkChan {
-			chunks.Serialize(*c, gw)
+			chunks.Serialize(*c, sw)
 		}
-		d.Chk.NoError(gw.Close())
-		d.Chk.NoError(pw.Close())
 	}()
 
 	return body
 }
 
+func checkClose(c io.Closer) {
+	d.PanicIfError(c.Close())
+}
+
 func bodyReader(req *http.Request) (reader io.ReadCloser) {
 	reader = req.Body
 	if strings.Contains(req.Header.Get("Content-Encoding"), "gzip") {
@@ -309,26 +312,19 @@ func handleGetBlob(w http.ResponseWriter, req *http.Request, ps URLParams, cs ch
 }
 
 func extractHashes(req *http.Request) hash.HashSlice {
-	err := req.ParseForm()
-	d.PanicIfError(err)
-	hashStrs := req.PostForm["ref"]
-	if len(hashStrs) <= 0 {
-		d.Panic("PostForm is empty")
-	}
-
-	hashes := make(hash.HashSlice, len(hashStrs))
-	for idx, refStr := range hashStrs {
-		hashes[idx] = hash.Parse(refStr)
-	}
-	return hashes
+	reader := bodyReader(req)
+	defer reader.Close()
+	defer io.Copy(ioutil.Discard, reader) // Ensure all data on reader is consumed
+	return deserializeHashes(reader)
 }
 
-func buildHashesRequest(batch chunks.ReadBatch) io.Reader {
-	values := &url.Values{}
-	for h := range batch {
-		values.Add("ref", h.String())
-	}
-	return strings.NewReader(values.Encode())
+func buildHashesRequest(batch chunks.ReadBatch) io.ReadCloser {
+	body, pw := io.Pipe()
+	go func() {
+		defer checkClose(pw)
+		serializeHashes(pw, batch)
+	}()
+	return body
 }
 
 func handleHasRefs(w http.ResponseWriter, req *http.Request, ps URLParams, cs chunks.ChunkStore) {
@@ -139,18 +139,12 @@ func TestBuildHashesRequest(t *testing.T) {
 		hash.Parse("00000000000000000000000000000003"): nil,
 	}
 	r := buildHashesRequest(batch)
-	b, err := ioutil.ReadAll(r)
-	assert.NoError(err)
+	defer r.Close()
+	requested := deserializeHashes(r)
 
-	urlValues, err := url.ParseQuery(string(b))
-	assert.NoError(err)
-	assert.NotEmpty(urlValues)
-
-	queryRefs := urlValues["ref"]
-	assert.Len(queryRefs, len(batch))
-	for _, r := range queryRefs {
-		_, present := batch[hash.Parse(r)]
-		assert.True(present, "Query contains %s, which is not in initial refs", r)
+	for _, h := range requested {
+		_, present := batch[h]
+		assert.True(present, "Query contains %s, which is not in initial refs", h)
 	}
 }
 
@@ -168,13 +162,13 @@ func TestHandleGetRefs(t *testing.T) {
 	}
 	persistChunks(cs)
 
-	body := strings.NewReader(fmt.Sprintf("ref=%s&ref=%s", chnx[0].Hash(), chnx[1].Hash()))
+	body := buildHashesRequest(chunks.ReadBatch{chnx[0].Hash(): nil, chnx[1].Hash(): nil})
 
 	w := httptest.NewRecorder()
 	HandleGetRefs(
 		w,
 		newRequest("POST", "", "", body, http.Header{
-			"Content-Type": {"application/x-www-form-urlencoded"},
+			"Content-Type": {"application/octet-stream"},
 		}),
 		params{},
 		storage.NewView(),
@@ -274,13 +268,17 @@ func TestHandleHasRefs(t *testing.T) {
 	cs.Put(present)
 	persistChunks(cs)
 
-	body := strings.NewReader(fmt.Sprintf("ref=%s&ref=%s&ref=%s", chnx[0].Hash(), chnx[1].Hash(), present.Hash()))
+	body := buildHashesRequest(chunks.ReadBatch{
+		chnx[0].Hash(): nil,
+		chnx[1].Hash(): nil,
+		present.Hash(): nil,
+	})
 
 	w := httptest.NewRecorder()
 	HandleHasRefs(
 		w,
 		newRequest("POST", "", "", body, http.Header{
-			"Content-Type": {"application/x-www-form-urlencoded"},
+			"Content-Type": {"application/octet-stream"},
 		}),
 		params{},
 		storage.NewView(),
go/datas/serialize_hashes.go (new file, 47 lines)
@@ -0,0 +1,47 @@
// Copyright 2016 Attic Labs, Inc. All rights reserved.
// Licensed under the Apache License, version 2.0:
// http://www.apache.org/licenses/LICENSE-2.0

package datas

import (
	"encoding/binary"
	"io"

	"github.com/attic-labs/noms/go/chunks"
	"github.com/attic-labs/noms/go/d"
	"github.com/attic-labs/noms/go/hash"
)

func serializeHashes(w io.Writer, batch chunks.ReadBatch) {
	err := binary.Write(w, binary.BigEndian, uint32(len(batch))) // 4 billion hashes is probably absurd. Maybe this should be smaller?
	d.PanicIfError(err)
	for h := range batch {
		serializeHash(w, h)
	}
}

func serializeHash(w io.Writer, h hash.Hash) {
	_, err := w.Write(h[:])
	d.PanicIfError(err)
}

func deserializeHashes(reader io.Reader) hash.HashSlice {
	count := uint32(0)
	err := binary.Read(reader, binary.BigEndian, &count)
	d.PanicIfError(err)

	hashes := make(hash.HashSlice, count)
	for i := range hashes {
		hashes[i] = deserializeHash(reader)
	}
	return hashes
}

func deserializeHash(reader io.Reader) hash.Hash {
	h := hash.Hash{}
	n, err := io.ReadFull(reader, h[:])
	d.PanicIfError(err)
	d.PanicIfFalse(int(hash.ByteLen) == n)
	return h
}
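For readers skimming the diff, here is a self-contained sketch of the wire format these helpers produce: a big-endian uint32 hash count followed by each hash's raw bytes, back to back. This is an illustration, not part of the patch; it assumes hash.ByteLen is 20 bytes, and the package name and hashLen constant below are stand-ins.

package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
)

// hashLen stands in for hash.ByteLen, assumed here to be 20 bytes.
const hashLen = 20

func main() {
	hashes := [][hashLen]byte{{0x01}, {0x02}} // two dummy hashes

	// Same layout serializeHashes writes: a big-endian uint32 count, then each
	// hash's raw bytes back to back; fixed width means no delimiters are needed.
	buf := &bytes.Buffer{}
	_ = binary.Write(buf, binary.BigEndian, uint32(len(hashes)))
	for _, h := range hashes {
		buf.Write(h[:])
	}

	// 4 + 2*20 = 44 bytes, versus roughly 73 bytes for the old
	// "ref=<32 chars>&ref=<32 chars>" form encoding.
	fmt.Printf("body: %d bytes, first 8: %x\n", buf.Len(), buf.Bytes()[:8])
}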
go/datas/serialize_hashes_test.go (new file, 33 lines)
@@ -0,0 +1,33 @@
// Copyright 2016 Attic Labs, Inc. All rights reserved.
// Licensed under the Apache License, version 2.0:
// http://www.apache.org/licenses/LICENSE-2.0

package datas

import (
	"bytes"
	"testing"

	"github.com/attic-labs/noms/go/chunks"
	"github.com/attic-labs/noms/go/hash"
	"github.com/attic-labs/testify/assert"
)

func TestHashRoundTrip(t *testing.T) {
	b := &bytes.Buffer{}
	input := chunks.ReadBatch{
		hash.Parse("00000000000000000000000000000000"): nil,
		hash.Parse("00000000000000000000000000000001"): nil,
		hash.Parse("00000000000000000000000000000002"): nil,
		hash.Parse("00000000000000000000000000000003"): nil,
	}
	defer input.Close()

	serializeHashes(b, input)
	output := deserializeHashes(b)
	assert.Len(t, output, len(input), "Output has different number of elements than input: %v, %v", output, input)
	for _, h := range output {
		_, present := input[h]
		assert.True(t, present, "%s is in output but not in input", h)
	}
}
@@ -1 +1 @@
-4:7.13:8s92pdafhd4hkhav6r4748u1rjlosh1k:5b1e9knhol2orv0a8ej6tvelc46jp92l:bsvid54jt8pjto211lcdl14tbfd39jmn:2:998se5i5mf15fld7f318818i6ie0c8rr:2
+4:7.14:8s92pdafhd4hkhav6r4748u1rjlosh1k:5b1e9knhol2orv0a8ej6tvelc46jp92l:bsvid54jt8pjto211lcdl14tbfd39jmn:2:998se5i5mf15fld7f318818i6ie0c8rr:2