Merge pull request #5485 from dolthub/aaron/types-value_store-remove-bufferedChunks

go/store/types: Remove bufferedChunks from ValueStore.
Aaron Son authored 2023-03-06 09:35:27 -08:00 (committed by GitHub)
4 changed files with 22 additions and 463 deletions

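Summary: ValueStore previously staged every written chunk in an in-memory buffer (bufferedChunks) and flushed it to the underlying ChunkStore in ref-height order to get cache locality. This PR deletes that buffer: WriteValue hands each chunk straight to the ChunkStore, ReadValue and ReadManyValues no longer consult a buffer, Commit no longer flushes, and the GC latch gets its own gcMu instead of sharing bufferMu. Reduced to the two lines that matter (both taken from the value_store.go diff below):

    // before: stage the chunk in the ValueStore's write buffer
    err = lvs.bufferChunk(ctx, v, c, height)
    // after: write through to the underlying ChunkStore immediately
    err = lvs.cs.Put(ctx, c, lvs.getAddrs)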
go/store/chunks/memory_store.go (View File)

@@ -218,15 +218,15 @@ func (ms *MemoryStoreView) Put(ctx context.Context, c Chunk, getAddrs GetAddrsCb
 		return err
 	}
+	ms.mu.Lock()
+	defer ms.mu.Unlock()
 	if ms.pendingRefs == nil {
 		ms.pendingRefs = addrs
 	} else {
 		ms.pendingRefs.InsertAll(addrs)
 	}
-	ms.mu.Lock()
-	defer ms.mu.Unlock()
 	if ms.pending == nil {
 		ms.pending = map[hash.Hash]Chunk{}
 	}

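This hunk widens the lock scope in MemoryStoreView.Put: ms.mu is now acquired before pendingRefs is updated, so pendingRefs and pending are mutated inside a single critical section instead of leaving the pendingRefs update unguarded. A minimal, self-contained sketch of the same pattern (toy types and names, not the dolt implementation):

    package main

    import "sync"

    // store is a toy stand-in for MemoryStoreView: two pieces of pending
    // state that must stay consistent under concurrent Put calls.
    type store struct {
        mu          sync.Mutex
        pendingRefs map[string]struct{}
        pending     map[string][]byte
    }

    func (s *store) Put(h string, data []byte, refs []string) {
        s.mu.Lock() // taken up front, as in the patched code
        defer s.mu.Unlock()
        if s.pendingRefs == nil {
            s.pendingRefs = map[string]struct{}{}
        }
        for _, r := range refs {
            s.pendingRefs[r] = struct{}{}
        }
        if s.pending == nil {
            s.pending = map[string][]byte{}
        }
        s.pending[h] = data
    }

    func main() {
        s := &store{}
        var wg sync.WaitGroup
        for i := 0; i < 8; i++ {
            wg.Add(1)
            go func(i int) {
                defer wg.Done()
                s.Put(string(rune('a'+i)), []byte("x"), []string{"shared-ref"})
            }(i)
        }
        wg.Wait()
    }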
go/store/types/incremental_test.go (View File)

@@ -74,7 +74,7 @@ func TestIncrementalLoadList(t *testing.T) {
 	actual := actualVar.(List)
 	expectedCount := cs.Reads()
-	assert.Equal(1, expectedCount)
+	assert.Equal(2, expectedCount)
 	// There will be one read per chunk.
 	chunkReads := make([]int, expected.Len())
 	for i := uint64(0); i < expected.Len(); i++ {

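The expected count rises from 1 to 2 because, with the write buffer gone, the value's chunks are read back from the ChunkStore itself rather than being served out of bufferedChunks. A toy read-counting store in the spirit of the test's cs.Reads() (illustrative names, not the dolt test helpers):

    package main

    import "fmt"

    // countingStore counts Gets the way the test's storage view does via Reads().
    type countingStore struct {
        data  map[string][]byte
        reads int
    }

    func (c *countingStore) Get(key string) []byte {
        c.reads++
        return c.data[key]
    }

    func main() {
        cs := &countingStore{data: map[string][]byte{"root": {0x1}, "leaf": {0x2}}}
        cs.Get("root") // with no buffer in front, this read hits the store...
        cs.Get("leaf") // ...and so does this one
        fmt.Println(cs.reads) // 2
    }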
go/store/types/value_store.go (View File)

@@ -73,16 +73,12 @@ type ValueReadWriter interface {
 // Currently, WriteValue validates the following properties of a Value v:
 // - v can be correctly serialized and its Ref taken
 type ValueStore struct {
-	cs                   chunks.ChunkStore
-	bufferMu             sync.RWMutex
-	bufferedChunks       map[hash.Hash]chunks.Chunk
-	bufferedChunksMax    uint64
-	bufferedChunkSize    uint64
-	withBufferedChildren map[hash.Hash]uint64 // chunk Hash -> ref height
-	validateContentAddr  bool
-	decodedChunks        *sizecache.SizeCache
-	nbf                  *NomsBinFormat
+	cs                  chunks.ChunkStore
+	validateContentAddr bool
+	decodedChunks       *sizecache.SizeCache
+	nbf                 *NomsBinFormat
+	gcMu    sync.RWMutex
 	gcCond  *sync.Cond
 	doingGC bool
@@ -145,16 +141,11 @@ func NewValueStore(cs chunks.ChunkStore) *ValueStore {
 func newValueStoreWithCacheAndPending(cs chunks.ChunkStore, cacheSize, pendingMax uint64) *ValueStore {
 	vs := &ValueStore{
-		cs:                   cs,
-		bufferMu:             sync.RWMutex{},
-		bufferedChunks:       map[hash.Hash]chunks.Chunk{},
-		bufferedChunksMax:    pendingMax,
-		withBufferedChildren: map[hash.Hash]uint64{},
-		decodedChunks:        sizecache.New(cacheSize),
-		versOnce:             sync.Once{},
+		cs:            cs,
+		decodedChunks: sizecache.New(cacheSize),
+		versOnce:      sync.Once{},
 	}
-	vs.gcCond = sync.NewCond(&vs.bufferMu)
+	vs.gcCond = sync.NewCond(&vs.gcMu)
 	return vs
 }
@@ -186,42 +177,20 @@ func (lvs *ValueStore) Format() *NomsBinFormat {
 func (lvs *ValueStore) ReadValue(ctx context.Context, h hash.Hash) (Value, error) {
 	lvs.versOnce.Do(lvs.expectVersion)
 	if v, ok := lvs.decodedChunks.Get(h); ok {
-		if v == nil {
-			return nil, errors.New("value present but empty")
-		}
+		d.PanicIfTrue(v == nil)
 		nv := v.(Value)
 		if lvs.validateContentAddr {
 			if err := validateContentAddress(lvs.nbf, h, nv); err != nil {
 				return nil, err
 			}
 		}
 		return nv, nil
 	}
-	chunk := func() chunks.Chunk {
-		lvs.bufferMu.RLock()
-		defer lvs.bufferMu.RUnlock()
-		if pending, ok := lvs.bufferedChunks[h]; ok {
-			return pending
-		}
-		return chunks.EmptyChunk
-	}()
-	if chunk.IsEmpty() {
-		var err error
-		chunk, err = lvs.cs.Get(ctx, h)
-		if err != nil {
-			return nil, err
-		}
-	}
+	chunk, err := lvs.cs.Get(ctx, h)
+	if err != nil {
+		return nil, err
+	}
 	if chunk.IsEmpty() {
 		return nil, nil
 	}
 	v, err := DecodeValue(chunk, lvs)
 	if err != nil {
 		return nil, err
 	}
@@ -273,36 +242,10 @@ func (lvs *ValueStore) ReadManyValues(ctx context.Context, hashes hash.HashSlice
 	for _, h := range hashes {
 		if v, ok := lvs.decodedChunks.Get(h); ok {
 			d.PanicIfTrue(v == nil)
 			nv := v.(Value)
 			if lvs.validateContentAddr {
 				if err := validateContentAddress(lvs.nbf, h, nv); err != nil {
 					return nil, err
 				}
 			}
 			foundValues[h] = nv
 			continue
 		}
-		chunk := func() chunks.Chunk {
-			lvs.bufferMu.RLock()
-			defer lvs.bufferMu.RUnlock()
-			if pending, ok := lvs.bufferedChunks[h]; ok {
-				return pending
-			}
-			return chunks.EmptyChunk
-		}()
-		if !chunk.IsEmpty() {
-			var err error
-			foundValues[h], err = decode(h, &chunk)
-			if err != nil {
-				return nil, err
-			}
-			continue
-		}
 		remaining.Insert(h)
 	}
@@ -369,7 +312,7 @@ func (lvs *ValueStore) WriteValue(ctx context.Context, v Value) (Ref, error) {
 		return Ref{}, err
 	}
-	err = lvs.bufferChunk(ctx, v, c, height)
+	err = lvs.cs.Put(ctx, c, lvs.getAddrs)
 	if err != nil {
 		return Ref{}, err
 	}
@@ -377,134 +320,6 @@ func (lvs *ValueStore) WriteValue(ctx context.Context, v Value) (Ref, error) {
 	return r, nil
 }
-
-// bufferChunk enqueues c (which is the serialization of v) within this
-// ValueStore. Buffered chunks are flushed progressively to the underlying
-// ChunkStore in a way which attempts to locate children and grandchildren
-// sequentially together. The following invariants are retained:
-//
-// 1. For any given chunk currently in the buffer, only direct children of the
-// chunk may also be presently buffered (any grandchildren will have been
-// flushed).
-// 2. The total data occupied by buffered chunks does not exceed
-// lvs.bufferedChunksMax
-func (lvs *ValueStore) bufferChunk(ctx context.Context, v Value, c chunks.Chunk, height uint64) error {
-	lvs.bufferMu.Lock()
-	defer lvs.bufferMu.Unlock()
-	lvs.waitForGC()
-	if lvs.Format().UsesFlatbuffers() {
-		// We do not do write buffering in the new format.
-		// Ref heights are not universally known, and the
-		// invariants mentioned above cannot be maintained
-		// in the general case.
-		//
-		// Buffering with full dependency tracking would be
-		// possible, and in __DOLT__, WalkAddrs may be
-		// cheap enough that it would be possible to get back
-		// cache-locality in our flushes without ref heights.
-		return lvs.cs.Put(ctx, c, lvs.getAddrs)
-	}
-	d.PanicIfTrue(height == 0)
-	h := c.Hash()
-	if _, present := lvs.bufferedChunks[h]; !present {
-		lvs.bufferedChunks[h] = c
-		lvs.bufferedChunkSize += uint64(len(c.Data()))
-	}
-
-	put := func(h hash.Hash, c chunks.Chunk) error {
-		err := lvs.cs.Put(ctx, c, lvs.getAddrs)
-		if err != nil {
-			return err
-		}
-		lvs.bufferedChunkSize -= uint64(len(c.Data()))
-		delete(lvs.bufferedChunks, h)
-		delete(lvs.withBufferedChildren, h)
-		return nil
-	}
-
-	putChildren := func(parent hash.Hash) error {
-		pending, isBuffered := lvs.bufferedChunks[parent]
-		if !isBuffered {
-			return nil
-		}
-		err := walkRefs(pending.Data(), lvs.nbf, func(grandchildRef Ref) error {
-			gch := grandchildRef.TargetHash()
-			if pending, present := lvs.bufferedChunks[gch]; present {
-				return put(gch, pending)
-			}
-			return nil
-		})
-		if err != nil {
-			return err
-		}
-		delete(lvs.withBufferedChildren, parent)
-		return nil
-	}
-
-	// Enforce invariant (1)
-	if height > 1 {
-		err := v.walkRefs(lvs.nbf, func(childRef Ref) error {
-			childHash := childRef.TargetHash()
-			if _, isBuffered := lvs.bufferedChunks[childHash]; isBuffered {
-				lvs.withBufferedChildren[h] = height
-			}
-			if _, hasBufferedChildren := lvs.withBufferedChildren[childHash]; hasBufferedChildren {
-				return putChildren(childHash)
-			}
-			return nil
-		})
-		if err != nil {
-			return err
-		}
-	}
-
-	// Enforce invariant (2)
-	for lvs.bufferedChunkSize > lvs.bufferedChunksMax {
-		var tallest hash.Hash
-		var height uint64 = 0
-		for parent, ht := range lvs.withBufferedChildren {
-			if ht > height {
-				tallest = parent
-				height = ht
-			}
-		}
-		if height == 0 { // This can happen if there are no pending parents
-			var chunk chunks.Chunk
-			for tallest, chunk = range lvs.bufferedChunks {
-				// Any pendingPut is as good as another in this case, so take the first one
-				break
-			}
-			err := put(tallest, chunk)
-			if err != nil {
-				return err
-			}
-			continue
-		}
-		err := putChildren(tallest)
-		if err != nil {
-			return err
-		}
-	}
-	return nil
-}
-
 func (lvs *ValueStore) Root(ctx context.Context) (hash.Hash, error) {
 	root, err := lvs.cs.Root(ctx)
@@ -519,13 +334,6 @@ func (lvs *ValueStore) Rebase(ctx context.Context) error {
 	return lvs.cs.Rebase(ctx)
 }
-
-func (lvs *ValueStore) Flush(ctx context.Context) error {
-	lvs.bufferMu.Lock()
-	defer lvs.bufferMu.Unlock()
-	lvs.waitForGC()
-	return lvs.flush(ctx, hash.Hash{})
-}
 
 // Call with lvs.bufferMu locked. Blocks until doingGC == false, releasing the
 // lock while we are blocked. Returns with the lock held, doingGC == false.
 func (lvs *ValueStore) waitForGC() {
@@ -542,8 +350,8 @@ func (lvs *ValueStore) waitForGC() {
 // When val == true, this routine will block until it has a unique opportunity
 // to toggle doingGC from false to true while holding the lock.
 func (lvs *ValueStore) toggleGC(val bool) {
-	lvs.bufferMu.Lock()
-	defer lvs.bufferMu.Unlock()
+	lvs.gcMu.Lock()
+	defer lvs.gcMu.Unlock()
 	if !val {
 		if !lvs.doingGC {
 			panic("tried to toggleGC to false while it was not true...")
@@ -557,57 +365,6 @@ func (lvs *ValueStore) toggleGC(val bool) {
 	return
 }
-
-func (lvs *ValueStore) flush(ctx context.Context, current hash.Hash) error {
-	put := func(h hash.Hash, chunk chunks.Chunk) error {
-		err := lvs.cs.Put(ctx, chunk, lvs.getAddrs)
-		if err != nil {
-			return err
-		}
-		delete(lvs.bufferedChunks, h)
-		delete(lvs.withBufferedChildren, h)
-		lvs.bufferedChunkSize -= uint64(len(chunk.Data()))
-		return nil
-	}
-	for parent := range lvs.withBufferedChildren {
-		if pending, present := lvs.bufferedChunks[parent]; present {
-			err := walkRefs(pending.Data(), lvs.nbf, func(reachable Ref) error {
-				if pending, present := lvs.bufferedChunks[reachable.TargetHash()]; present {
-					return put(reachable.TargetHash(), pending)
-				}
-				return nil
-			})
-			if err != nil {
-				return err
-			}
-			err = put(parent, pending)
-			if err != nil {
-				return err
-			}
-		}
-	}
-	for _, c := range lvs.bufferedChunks {
-		// Can't use put() because it's wrong to delete from a lvs.bufferedChunks while iterating it.
-		err := lvs.cs.Put(ctx, c, lvs.getAddrs)
-		if err != nil {
-			return err
-		}
-		lvs.bufferedChunkSize -= uint64(len(c.Data()))
-	}
-	d.PanicIfFalse(lvs.bufferedChunkSize == 0)
-	lvs.withBufferedChildren = map[hash.Hash]uint64{}
-	lvs.bufferedChunks = map[hash.Hash]chunks.Chunk{}
-	return nil
-}
-
 // Commit flushes all bufferedChunks into the ChunkStore, with best-effort
 // locality, and attempts to Commit, updating the root to |current| (or keeping
 // it the same as Root()). If the root has moved since this ValueStore was
@@ -615,15 +372,10 @@ func (lvs *ValueStore) flush(ctx context.Context, current hash.Hash) error {
 // rebased. Until Commit() succeeds, no work of the ValueStore will be visible
 // to other readers of the underlying ChunkStore.
 func (lvs *ValueStore) Commit(ctx context.Context, current, last hash.Hash) (bool, error) {
-	lvs.bufferMu.Lock()
-	defer lvs.bufferMu.Unlock()
+	lvs.gcMu.Lock()
+	defer lvs.gcMu.Unlock()
 	lvs.waitForGC()
-
-	err := lvs.flush(ctx, current)
-	if err != nil {
-		return false, err
-	}
-
 	success, err := lvs.cs.Commit(ctx, current, last)
 	if err != nil {
 		return false, err
 	}
@@ -662,21 +414,11 @@ func makeBatches(hss []hash.HashSet, count int) [][]hash.Hash {
 	return res
 }
-
-func (lvs *ValueStore) numBuffChunks() int {
-	lvs.bufferMu.RLock()
-	defer lvs.bufferMu.RUnlock()
-	return len(lvs.bufferedChunks)
-}
 
 // GC traverses the ValueStore from the root and removes unreferenced chunks from the ChunkStore
 func (lvs *ValueStore) GC(ctx context.Context, oldGenRefs, newGenRefs hash.HashSet) error {
 	lvs.toggleGC(true)
 	defer lvs.toggleGC(false)
-	if lvs.numBuffChunks() > 0 {
-		return errors.New("invalid GC state; bufferedChunks must be empty.")
-	}
-
 	lvs.versOnce.Do(lvs.expectVersion)
 
 	root, err := lvs.Root(ctx)
@@ -813,18 +555,7 @@ func (lvs *ValueStore) gcProcessRefs(ctx context.Context, visited hash.HashSet,
 		}
 	}
 
-	lvs.bufferMu.Lock()
-	defer lvs.bufferMu.Unlock()
-	if len(lvs.bufferedChunks) > 0 {
-		return errors.New("invalid GC state; bufferedChunks started empty and was not empty at end of run.")
-	}
-
 	// purge the cache
 	lvs.decodedChunks.Purge()
-	lvs.bufferedChunks = make(map[hash.Hash]chunks.Chunk, lvs.bufferedChunkSize)
-	lvs.bufferedChunkSize = 0
-	lvs.withBufferedChildren = map[hash.Hash]uint64{}
-
 	return nil
 }
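What the deletions leave behind is a small GC latch: doingGC guarded by gcMu, with gcCond (built over gcMu) used by waitForGC and toggleGC. Below is a self-contained, runnable sketch of that latch; only gcMu, gcCond, doingGC, waitForGC, and toggleGC correspond to names in the diff, the rest is scaffolding, and the real toggleGC also panics on a bad false-to-false transition:

    package main

    import (
        "fmt"
        "sync"
    )

    // gate models the ValueStore GC latch this commit leaves in place.
    type gate struct {
        gcMu    sync.RWMutex
        gcCond  *sync.Cond
        doingGC bool
    }

    func newGate() *gate {
        g := &gate{}
        g.gcCond = sync.NewCond(&g.gcMu) // mirrors vs.gcCond = sync.NewCond(&vs.gcMu)
        return g
    }

    // waitForGC blocks, with gcMu held, until doingGC == false (cf. ValueStore.waitForGC).
    func (g *gate) waitForGC() {
        for g.doingGC {
            g.gcCond.Wait()
        }
    }

    // toggleGC flips doingGC, blocking until it can take the false -> true edge
    // (cf. ValueStore.toggleGC).
    func (g *gate) toggleGC(val bool) {
        g.gcMu.Lock()
        defer g.gcMu.Unlock()
        if !val {
            g.doingGC = false
            g.gcCond.Broadcast()
            return
        }
        for g.doingGC {
            g.gcCond.Wait()
        }
        g.doingGC = true
    }

    func main() {
        g := newGate()
        g.toggleGC(true) // GC begins
        done := make(chan struct{})
        go func() {
            g.gcMu.Lock()
            g.waitForGC() // a Commit-like caller parks here until GC ends
            g.gcMu.Unlock()
            close(done)
        }()
        g.toggleGC(false) // GC ends, waiters wake up
        <-done
        fmt.Println("commit proceeded after GC finished")
    }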

go/store/types/value_store_test.go (View File)

@@ -122,178 +122,6 @@ func TestValueReadMany(t *testing.T) {
 	}
 }
-
-func TestValueWriteFlush(t *testing.T) {
-	assert := assert.New(t)
-
-	vals := ValueSlice{String("hello"), Bool(true), Float(42)}
-	vs := newTestValueStore()
-	if vs.Format().UsesFlatbuffers() {
-		t.Skip()
-	}
-	hashes := hash.HashSet{}
-	for _, v := range vals {
-		hashes.Insert(mustRef(vs.WriteValue(context.Background(), v)).TargetHash())
-	}
-	assert.NotZero(vs.bufferedChunkSize)
-
-	rt, err := vs.Root(context.Background())
-	require.NoError(t, err)
-	_, err = vs.Commit(context.Background(), rt, rt)
-	require.NoError(t, err)
-	assert.Zero(vs.bufferedChunkSize)
-}
-
-type checkingChunkStore struct {
-	chunks.ChunkStore
-	a             *assert.Assertions
-	expectedOrder hash.HashSlice
-}
-
-func (cbs *checkingChunkStore) expect(rs ...Ref) {
-	for _, r := range rs {
-		cbs.expectedOrder = append(cbs.expectedOrder, r.TargetHash())
-	}
-}
-
-func (cbs *checkingChunkStore) Put(ctx context.Context, c chunks.Chunk, getAddrs chunks.GetAddrsCb) error {
-	if cbs.a.NotZero(len(cbs.expectedOrder), "Unexpected Put of %s", c.Hash()) {
-		cbs.a.Equal(cbs.expectedOrder[0], c.Hash())
-		cbs.expectedOrder = cbs.expectedOrder[1:]
-	}
-	return cbs.ChunkStore.Put(context.Background(), c, getAddrs)
-}
-
-func (cbs *checkingChunkStore) Flush() {
-	cbs.a.Empty(cbs.expectedOrder)
-}
-
-func TestFlushOrder(t *testing.T) {
-	assert := assert.New(t)
-	storage := &chunks.TestStorage{}
-	ccs := &checkingChunkStore{storage.NewView(), assert, nil}
-	vs := NewValueStore(ccs)
-	// Graph, which should be flushed grandchildren-first, bottom-up
-	//         l
-	//        / \
-	//      ml1  ml2
-	//     /  \    \
-	//    b    ml   f
-	//        /  \
-	//       s    n
-	//
-	// Expected order: s, n, b, ml, f, ml1, ml2, l
-	s := String("oy")
-	n := Float(42)
-	sr, err := vs.WriteValue(context.Background(), s)
-	require.NoError(t, err)
-	nr, err := vs.WriteValue(context.Background(), n)
-	require.NoError(t, err)
-	ccs.expect(sr, nr)
-	ml, err := NewList(context.Background(), vs, sr, nr)
-	require.NoError(t, err)
-
-	b, err := NewEmptyBlob(vs)
-	require.NoError(t, err)
-	br, err := vs.WriteValue(context.Background(), b)
-	require.NoError(t, err)
-	mlr, err := vs.WriteValue(context.Background(), ml)
-	require.NoError(t, err)
-	ccs.expect(br, mlr)
-	ml1, err := NewList(context.Background(), vs, br, mlr)
-	require.NoError(t, err)
-
-	f := Bool(false)
-	fr, err := vs.WriteValue(context.Background(), f)
-	require.NoError(t, err)
-	ccs.expect(fr)
-	ml2, err := NewList(context.Background(), vs, fr)
-	require.NoError(t, err)
-
-	ml1r, err := vs.WriteValue(context.Background(), ml1)
-	require.NoError(t, err)
-	ml2r, err := vs.WriteValue(context.Background(), ml2)
-	require.NoError(t, err)
-	ccs.expect(ml1r, ml2r)
-	l, err := NewList(context.Background(), vs, ml1r, ml2r)
-	require.NoError(t, err)
-
-	r, err := vs.WriteValue(context.Background(), l)
-	require.NoError(t, err)
-	ccs.expect(r)
-	rt, err := vs.Root(context.Background())
-	require.NoError(t, err)
-	_, err = vs.Commit(context.Background(), rt, rt)
-	require.NoError(t, err)
-}
-
-func TestFlushOverSize(t *testing.T) {
-	assert := assert.New(t)
-	storage := &chunks.TestStorage{}
-	ccs := &checkingChunkStore{storage.NewView(), assert, nil}
-	vs := newValueStoreWithCacheAndPending(ccs, 0, 30)
-
-	s := String("oy")
-	sr, err := vs.WriteValue(context.Background(), s)
-	require.NoError(t, err)
-	ccs.expect(sr)
-	NewList(context.Background(), vs, sr) // will write the root chunk
-}
-
-func TestTolerateTopDown(t *testing.T) {
-	assert := assert.New(t)
-	storage := &chunks.TestStorage{}
-	ccs := &checkingChunkStore{storage.NewView(), assert, nil}
-	vs := NewValueStore(ccs)
-	// Once the L-ML-S portion of this graph is written once, it's legal to make a Struct ST that contains a ref directly to ML and write it. Then you can write S and ML and Flush ST, which contitutes top-down writing.
-	//   L  ST
-	//    \ /
-	//     ML
-	//    /
-	//   S
-	S := String("oy")
-	sr, err := vs.WriteValue(context.Background(), S)
-	require.NoError(t, err)
-	ccs.expect(sr)
-
-	ML, err := NewList(context.Background(), vs, sr)
-	require.NoError(t, err)
-	mlr, err := vs.WriteValue(context.Background(), ML)
-	require.NoError(t, err)
-	ccs.expect(mlr)
-
-	L, err := NewList(context.Background(), vs, mlr)
-	require.NoError(t, err)
-	lr, err := vs.WriteValue(context.Background(), L)
-	require.NoError(t, err)
-	ccs.expect(lr)
-
-	rt, err := vs.Root(context.Background())
-	require.NoError(t, err)
-	_, err = vs.Commit(context.Background(), rt, rt)
-	require.NoError(t, err)
-	assert.Zero(len(vs.bufferedChunks))
-
-	ST, err := NewStruct(vs.Format(), "", StructData{"r": mlr})
-	require.NoError(t, err)
-	str, err := vs.WriteValue(context.Background(), ST) // ST into bufferedChunks
-	require.NoError(t, err)
-	_, err = vs.WriteValue(context.Background(), S) // S into bufferedChunks
-	require.NoError(t, err)
-	_, err = vs.WriteValue(context.Background(), ML) // ML into bufferedChunks AND withBufferedChunks
-	require.NoError(t, err)
-	// At this point, ValueStore believes ST is a standalone chunk, and that ML -> S
-	// So, it'll look at ML, the one parent it knows about, first and write its child (S). Then, it'll write ML, and then it'll flush the remaining buffered chunks, which is just ST.
-	ccs.expect(sr, mlr, str)
-	rt, err = vs.Root(context.Background())
-	require.NoError(t, err)
-	_, err = vs.Commit(context.Background(), rt, rt)
-	require.NoError(t, err)
-}
-
 func TestPanicOnBadVersion(t *testing.T) {
 	storage := &chunks.MemoryStorage{}
 	t.Run("Read", func(t *testing.T) {