Remove ChunkStore backpressure mechanism (#3278)
This mechanism evolved from the way Dynamo stores data, as a way to let clients make incremental write progress. We never actually made clients handle it properly, though, and so much has changed since it was written that it would only get in the way of building something better. Fixes #3234
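For context, the removed contract let PutMany return a BackpressureError naming the chunks the sink could not accept, and callers were expected to retry them; the replacement simply blocks until every chunk is written. Below is a minimal sketch of the retry pattern this commit removes, using a toy in-memory sink with string "chunks" rather than the real chunks.Chunk and hash.HashSlice types (toyStore and its per-call limit are illustrative, not code from this commit):

package main

import "fmt"

// BackpressureError mirrors the removed type: it lists whatever the sink
// could not accept on this attempt, and the caller may retry later.
type BackpressureError []string

func (b BackpressureError) Error() string {
	return fmt.Sprintf("Tried to Put %d too many Chunks", len(b))
}

// toyStore stands in for a ChunkSink; it accepts at most two chunks per call.
type toyStore struct {
	written []string
}

func (s *toyStore) PutMany(chunks []string) BackpressureError {
	const perCall = 2
	if len(chunks) <= perCall {
		s.written = append(s.written, chunks...)
		return nil
	}
	s.written = append(s.written, chunks[:perCall]...)
	return BackpressureError(chunks[perCall:])
}

func main() {
	s := &toyStore{}
	pending := []string{"c1", "c2", "c3", "c4", "c5"}
	// The retry loop every caller was supposed to carry under the old
	// contract; per the commit message, clients never actually did this.
	for len(pending) > 0 {
		if bpe := s.PutMany(pending); bpe != nil {
			pending = []string(bpe)
			continue
		}
		pending = nil
	}
	fmt.Println("wrote", len(s.written), "chunks") // wrote 5 chunks
}

After this commit, that whole loop collapses to a single blocking PutMany(pending) call.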
@@ -5,7 +5,6 @@
 package chunks
 
 import (
-	"fmt"
 	"io"
 
 	"github.com/attic-labs/noms/go/hash"
@@ -62,25 +61,11 @@ type ChunkSink interface {
 	// Put writes c into the ChunkSink, blocking until the operation is complete.
 	Put(c Chunk)
 
-	// PutMany tries to write chunks into the sink. It will block as it
-	// handles as many as possible, then return a BackpressureError containing
-	// the rest (if any).
-	PutMany(chunks []Chunk) BackpressureError
+	// PutMany writes chunks into the sink, blocking until the operation is complete.
+	PutMany(chunks []Chunk)
 
 	// On return, any previously Put chunks should be durable
 	Flush()
 
 	io.Closer
 }
-
-// BackpressureError is a slice of hash.Hash that indicates some chunks could
-// not be Put(). Caller is free to try to Put them again later.
-type BackpressureError hash.HashSlice
-
-func (b BackpressureError) Error() string {
-	return fmt.Sprintf("Tried to Put %d too many Chunks", len(b))
-}
-
-func (b BackpressureError) AsHashes() hash.HashSlice {
-	return hash.HashSlice(b)
-}
@@ -137,25 +137,10 @@ func (s *DynamoStore) Has(h hash.Hash) bool {
 	return <-ch
 }
 
-func (s *DynamoStore) PutMany(chunks []Chunk) (e BackpressureError) {
-	for i, c := range chunks {
-		if s.unwrittenPuts.Has(c) {
-			continue
-		}
-		select {
-		case s.writeQueue <- c:
-			s.requestWg.Add(1)
-			s.unwrittenPuts.Add(c)
-		default:
-			notPut := chunks[i:]
-			e = make(BackpressureError, len(notPut))
-			for j, np := range notPut {
-				e[j] = np.Hash()
-			}
-			return
-		}
+func (s *DynamoStore) PutMany(chunks []Chunk) {
+	for _, c := range chunks {
+		s.Put(c)
 	}
-	return
 }
 
 func (s *DynamoStore) Put(c Chunk) {
@@ -65,11 +65,10 @@ func (ms *MemoryStore) Put(c Chunk) {
 	ms.data[c.Hash()] = c
 }
 
-func (ms *MemoryStore) PutMany(chunks []Chunk) (e BackpressureError) {
+func (ms *MemoryStore) PutMany(chunks []Chunk) {
 	for _, c := range chunks {
 		ms.Put(c)
 	}
-	return
 }
 
 func (ms *MemoryStore) Len() int {
@@ -51,11 +51,10 @@ func (s *TestStore) Put(c Chunk) {
 	s.MemoryStore.Put(c)
 }
 
-func (s *TestStore) PutMany(chunks []Chunk) (e BackpressureError) {
+func (s *TestStore) PutMany(chunks []Chunk) {
 	for _, c := range chunks {
 		s.Put(c)
 	}
-	return
 }
 
 // TestStoreFactory is public, and exposes Stores to ensure that test code can directly query instances vended by this factory.
@@ -171,7 +171,7 @@ func (suite *HTTPBatchStoreSuite) TestPutChunkWithHints() {
 		types.EncodeValue(vals[0], nil),
 		types.EncodeValue(vals[1], nil),
 	}
-	suite.NoError(suite.cs.PutMany(chnx))
+	suite.cs.PutMany(chnx)
 	l := types.NewList(types.NewRef(vals[0]), types.NewRef(vals[1]))
 
 	suite.store.SchedulePut(types.EncodeValue(l, nil), 2, types.Hints{
@@ -183,52 +183,6 @@ func (suite *HTTPBatchStoreSuite) TestPutChunkWithHints() {
 	suite.Equal(3, suite.cs.Writes)
 }
 
-type backpressureCS struct {
-	chunks.ChunkStore
-	tries int
-}
-
-func (b *backpressureCS) PutMany(chnx []chunks.Chunk) chunks.BackpressureError {
-	if chnx == nil {
-		return nil
-	}
-	b.tries++
-
-	if len(chnx) <= b.tries {
-		return b.ChunkStore.PutMany(chnx)
-	}
-	if bpe := b.ChunkStore.PutMany(chnx[:b.tries]); bpe != nil {
-		return bpe
-	}
-
-	bpe := make(chunks.BackpressureError, len(chnx)-b.tries)
-	for i, c := range chnx[b.tries:] {
-		bpe[i] = c.Hash()
-	}
-	return bpe
-}
-
-func (suite *HTTPBatchStoreSuite) TestPutChunksBackpressure() {
-	bpcs := &backpressureCS{ChunkStore: suite.cs}
-	bs := NewHTTPBatchStoreForTest(bpcs)
-	defer bs.Close()
-	defer bpcs.Close()
-
-	vals := []types.Value{
-		types.String("abc"),
-		types.String("def"),
-	}
-	l := types.NewList()
-	for _, v := range vals {
-		bs.SchedulePut(types.EncodeValue(v, nil), 1, types.Hints{})
-		l = l.Append(types.NewRef(v))
-	}
-	bs.SchedulePut(types.EncodeValue(l, nil), 2, types.Hints{})
-	bs.Flush()
-
-	suite.Equal(6, suite.cs.Writes)
-}
-
 func (suite *HTTPBatchStoreSuite) TestRoot() {
 	c := types.EncodeValue(types.NewMap(), nil)
 	suite.cs.Put(c)
@@ -266,7 +220,7 @@ func (suite *HTTPBatchStoreSuite) TestGet() {
 		chunks.NewChunk([]byte("abc")),
 		chunks.NewChunk([]byte("def")),
 	}
-	suite.NoError(suite.cs.PutMany(chnx))
+	suite.cs.PutMany(chnx)
 	got := suite.store.Get(chnx[0].Hash())
 	suite.Equal(chnx[0].Hash(), got.Hash())
 	got = suite.store.Get(chnx[1].Hash())
@@ -279,7 +233,7 @@ func (suite *HTTPBatchStoreSuite) TestGetMany() {
 		chunks.NewChunk([]byte("def")),
 	}
 	notPresent := chunks.NewChunk([]byte("ghi")).Hash()
-	suite.NoError(suite.cs.PutMany(chnx))
+	suite.cs.PutMany(chnx)
 
 	hashes := hash.NewHashSet(chnx[0].Hash(), chnx[1].Hash(), notPresent)
 	foundChunks := make(chan *chunks.Chunk)
@@ -316,7 +270,7 @@ func (suite *HTTPBatchStoreSuite) TestGetManySomeCached() {
 		chunks.NewChunk([]byte("def")),
 	}
 	cached := chunks.NewChunk([]byte("ghi"))
-	suite.NoError(suite.cs.PutMany(chnx))
+	suite.cs.PutMany(chnx)
 	suite.store.SchedulePut(cached, 1, types.Hints{})
 
 	hashes := hash.NewHashSet(chnx[0].Hash(), chnx[1].Hash(), cached.Hash())
@@ -334,7 +288,7 @@ func (suite *HTTPBatchStoreSuite) TestGetSame() {
 		chunks.NewChunk([]byte("def")),
 		chunks.NewChunk([]byte("def")),
 	}
-	suite.NoError(suite.cs.PutMany(chnx))
+	suite.cs.PutMany(chnx)
 	got := suite.store.Get(chnx[0].Hash())
 	suite.Equal(chnx[0].Hash(), got.Hash())
 	got = suite.store.Get(chnx[1].Hash())
@@ -346,7 +300,7 @@ func (suite *HTTPBatchStoreSuite) TestHas() {
 		chunks.NewChunk([]byte("abc")),
 		chunks.NewChunk([]byte("def")),
 	}
-	suite.NoError(suite.cs.PutMany(chnx))
+	suite.cs.PutMany(chnx)
 	suite.True(suite.store.Has(chnx[0].Hash()))
 	suite.True(suite.store.Has(chnx[1].Hash()))
 }
@@ -99,27 +99,17 @@ func (lbs *localBatchStore) Flush() {
 	chunkChan := make(chan *chunks.Chunk, 128)
 	go func() {
 		err := lbs.unwrittenPuts.ExtractChunks(lbs.hashes, chunkChan)
-		d.Chk.NoError(err)
+		d.PanicIfError(err)
 		close(chunkChan)
 	}()
 
 	lbs.vbs.Prepare(lbs.hints)
-	var bpe chunks.BackpressureError
 	for c := range chunkChan {
-		if bpe == nil {
-			dc := lbs.vbs.DecodeUnqueued(c)
-			bpe = lbs.vbs.Enqueue(*dc.Chunk, *dc.Value)
-		} else {
-			bpe = append(bpe, c.Hash())
-		}
-	}
-	if bpe == nil {
-		bpe = lbs.vbs.Flush()
-	}
-	// Should probably do a thing with bpe. Will need to keep track of chunk hashes that are SechedulePut'd in order to do this :-/
-	if bpe != nil {
-		d.PanicIfError(bpe) // guarded because if bpe == nil, this still fires for some reason. Maybe something to do with custom error type??
+		dc := lbs.vbs.DecodeUnqueued(c)
+		lbs.vbs.Enqueue(*dc.Chunk, *dc.Value)
 	}
+	lbs.vbs.Flush()
 
 	lbs.unwrittenPuts.Clear(lbs.hashes)
 	lbs.hashes = hash.HashSet{}
 	lbs.hints = types.Hints{}
@@ -164,22 +164,15 @@ func handleWriteValue(w http.ResponseWriter, req *http.Request, ps URLParams, cs
 		}
 	}()
 
-	var bpe chunks.BackpressureError
 	for ch := range decoded {
 		dc := <-ch
 		if dc.Chunk != nil && dc.Value != nil {
-			if bpe == nil {
-				totalDataWritten += len(dc.Chunk.Data())
-				bpe = vbs.Enqueue(*dc.Chunk, *dc.Value)
-				chunkCount++
-				if chunkCount%100 == 0 {
-					verbose.Log("Enqueued %d chunks", chunkCount)
-				}
-			} else {
-				bpe = append(bpe, dc.Chunk.Hash())
+			totalDataWritten += len(dc.Chunk.Data())
+			vbs.Enqueue(*dc.Chunk, *dc.Value)
+			chunkCount++
+			if chunkCount%100 == 0 {
+				verbose.Log("Enqueued %d chunks", chunkCount)
 			}
-			// If a previous Enqueue() errored, we still need to drain chunkChan
-			// TODO: what about having Deserialize take a 'done' channel to stop it?
 		}
 	}
@@ -188,17 +181,7 @@ func handleWriteValue(w http.ResponseWriter, req *http.Request, ps URLParams, cs
 		panic(d.Wrap(fmt.Errorf("Deserialization failure: %v", err)))
 	}
 
-	if bpe == nil {
-		bpe = vbs.Flush()
-	}
-	if bpe != nil {
-		w.WriteHeader(http.StatusTooManyRequests)
-		w.Header().Add("Content-Type", "application/octet-stream")
-		writer := respWriter(req, w)
-		defer writer.Close()
-		serializeHashes(writer, bpe.AsHashes())
-		return
-	}
+	vbs.Flush()
 	w.WriteHeader(http.StatusCreated)
 }
@@ -99,40 +99,6 @@ func TestHandleWriteValueDupChunks(t *testing.T) {
 	}
 }
 
-func TestHandleWriteValueBackpressure(t *testing.T) {
-	assert := assert.New(t)
-	cs := &backpressureCS{ChunkStore: chunks.NewMemoryStore()}
-	db := NewDatabase(cs)
-
-	l := types.NewList(
-		db.WriteValue(types.Bool(true)),
-		db.WriteValue(types.Bool(false)),
-	)
-	r := db.WriteValue(l)
-	_, err := db.CommitValue(db.GetDataset("datasetID"), r)
-	assert.NoError(err)
-
-	hint := l.Hash()
-	newItem := types.NewEmptyBlob()
-	itemChunk := types.EncodeValue(newItem, nil)
-	l2 := l.Insert(1, types.NewRef(newItem))
-	listChunk := types.EncodeValue(l2, nil)
-
-	body := &bytes.Buffer{}
-	serializeHints(body, map[hash.Hash]struct{}{hint: {}})
-	chunks.Serialize(itemChunk, body)
-	chunks.Serialize(listChunk, body)
-
-	w := httptest.NewRecorder()
-	HandleWriteValue(w, newRequest("POST", "", "", body, nil), params{}, cs)
-
-	if assert.Equal(http.StatusTooManyRequests, w.Code, "Handler error:\n%s", string(w.Body.Bytes())) {
-		hashes := deserializeHashes(w.Body)
-		assert.Len(hashes, 1)
-		assert.Equal(l2.Hash(), hashes[0])
-	}
-}
-
 func TestBuildWriteValueRequest(t *testing.T) {
 	assert := assert.New(t)
 	input1, input2 := "abc", "def"
@@ -212,8 +178,7 @@ func TestHandleGetRefs(t *testing.T) {
 		chunks.NewChunk([]byte(input1)),
 		chunks.NewChunk([]byte(input2)),
 	}
-	err := cs.PutMany(chnx)
-	assert.NoError(err)
+	cs.PutMany(chnx)
 
 	body := strings.NewReader(fmt.Sprintf("ref=%s&ref=%s", chnx[0].Hash(), chnx[1].Hash()))
 
@@ -316,8 +281,7 @@ func TestHandleHasRefs(t *testing.T) {
 		chunks.NewChunk([]byte(input1)),
 		chunks.NewChunk([]byte(input2)),
 	}
-	err := cs.PutMany(chnx)
-	assert.NoError(err)
+	cs.PutMany(chnx)
 
 	absent := hash.Parse("00000000000000000000000000000002")
 	body := strings.NewReader(fmt.Sprintf("ref=%s&ref=%s&ref=%s", chnx[0].Hash(), chnx[1].Hash(), absent))
@@ -209,7 +209,7 @@ func (suite *BlockStoreSuite) TestCompactOnUpdateRoot() {
 	}
 
 	root := smallTableStore.Root()
-	suite.NoError(smallTableStore.PutMany(chunx[:testMaxTables]))
+	smallTableStore.PutMany(chunx[:testMaxTables])
 	suite.True(smallTableStore.UpdateRoot(chunx[0].Hash(), root)) // Commit write
 
 	exists, _, mRoot, specs := mm.ParseIfExists(nil)
@@ -218,7 +218,7 @@ func (suite *BlockStoreSuite) TestCompactOnUpdateRoot() {
 	suite.Len(specs, testMaxTables)
 
 	root = smallTableStore.Root()
-	suite.NoError(smallTableStore.PutMany(chunx[testMaxTables:]))
+	smallTableStore.PutMany(chunx[testMaxTables:])
 	suite.True(smallTableStore.UpdateRoot(chunx[testMaxTables].Hash(), root)) // Should compact
 
 	exists, _, mRoot, specs = mm.ParseIfExists(nil)
@@ -178,20 +178,10 @@ func (nbs *NomsBlockStore) SchedulePut(c chunks.Chunk, refHeight uint64, hints t
 	nbs.Put(c)
 }
 
-func (nbs *NomsBlockStore) PutMany(chunx []chunks.Chunk) (err chunks.BackpressureError) {
-	for ; len(chunx) > 0; chunx = chunx[1:] {
-		c := chunx[0]
-		a := addr(c.Hash())
-		if !nbs.addChunk(a, c.Data()) {
-			break
-		}
-		nbs.putCount++
-	}
+func (nbs *NomsBlockStore) PutMany(chunx []chunks.Chunk) {
 	for _, c := range chunx {
-		err = append(err, c.Hash())
+		nbs.Put(c)
 	}
-
-	return err
 }
 
 // TODO: figure out if there's a non-error reason for this to return false. If not, get rid of return value.
@@ -80,9 +80,8 @@ func (vbs *ValidatingBatchingSink) DecodeUnqueued(c *chunks.Chunk) DecodedChunk
 // ChunkStore. It is assumed that v is the Value decoded from c, and so v can
 // be used to validate the ref-completeness of c. The instance keeps an
 // internal buffer of Chunks, spilling to the ChunkStore when the buffer is
-// full. If an attempt to Put Chunks fails, this method returns the
-// BackpressureError from the underlying ChunkStore.
-func (vbs *ValidatingBatchingSink) Enqueue(c chunks.Chunk, v Value) (err chunks.BackpressureError) {
+// full.
+func (vbs *ValidatingBatchingSink) Enqueue(c chunks.Chunk, v Value) {
 	h := c.Hash()
 	vbs.vs.ensureChunksInCache(v)
 	vbs.vs.set(h, hintedChunk{v.Type(), h}, false)
@@ -91,25 +90,17 @@ func (vbs *ValidatingBatchingSink) Enqueue(c chunks.Chunk, v Value) (err chunks.
 	vbs.count++
 
 	if vbs.count == batchSize {
-		err = vbs.cs.PutMany(vbs.batch[:vbs.count])
+		vbs.cs.PutMany(vbs.batch[:vbs.count])
 		vbs.count = 0
 	}
-
-	return
 }
 
 // Flush Puts any Chunks buffered by Enqueue calls into the backing
-// ChunkStore. If the attempt to Put fails, this method returns the
-// BackpressureError returned by the underlying ChunkStore.
-func (vbs *ValidatingBatchingSink) Flush() (err chunks.BackpressureError) {
+// ChunkStore.
+func (vbs *ValidatingBatchingSink) Flush() {
 	if vbs.count > 0 {
-		err = vbs.cs.PutMany(vbs.batch[:vbs.count])
+		vbs.cs.PutMany(vbs.batch[:vbs.count])
 	}
-
-	if err == nil {
-		vbs.cs.Flush()
-	}
-
+	vbs.cs.Flush()
 	vbs.count = 0
-	return
 }
@@ -47,7 +47,7 @@ func TestValidatingBatchingSinkDecodeAlreadyEnqueued(t *testing.T) {
 	c := EncodeValue(v, nil)
 	vbs := NewValidatingBatchingSink(chunks.NewTestStore())
 
-	assert.NoError(t, vbs.Enqueue(c, v))
+	vbs.Enqueue(c, v)
 	dc := vbs.DecodeUnqueued(&c)
 	assert.Nil(t, dc.Chunk)
 	assert.Nil(t, dc.Value)
@@ -106,8 +106,8 @@ func TestValidatingBatchingSinkEnqueueAndFlush(t *testing.T) {
 	cs := chunks.NewTestStore()
 	vbs := NewValidatingBatchingSink(cs)
 
-	assert.NoError(t, vbs.Enqueue(c, v))
-	assert.NoError(t, vbs.Flush())
+	vbs.Enqueue(c, v)
+	vbs.Flush()
 	assert.Equal(t, 1, cs.Writes)
 }
@@ -117,9 +117,9 @@ func TestValidatingBatchingSinkEnqueueImplicitFlush(t *testing.T) {
 
 	for i := 0; i <= batchSize; i++ {
 		v := Number(i)
-		assert.NoError(t, vbs.Enqueue(EncodeValue(v, nil), v))
+		vbs.Enqueue(EncodeValue(v, nil), v)
 	}
 	assert.Equal(t, batchSize, cs.Writes)
-	assert.NoError(t, vbs.Flush())
+	vbs.Flush()
 	assert.Equal(t, 1, cs.Writes-batchSize)
 }