mirror of
https://github.com/dolthub/dolt.git
synced 2026-05-06 11:20:30 -05:00
[store] use hasCache to minimize pendingRef pool (#7672)
* [store] use hasCache to minimize pendingRef pool * better interfaces * fmt * vet
This commit is contained in:
committed by
GitHub
parent
fc6019e983
commit
864f9621e0
@@ -810,7 +810,7 @@ func (dcs *DoltChunkStore) errorIfDangling(ctx context.Context, addrs hash.HashS
|
||||
// Get(), GetMany(), Has() and HasMany().
|
||||
func (dcs *DoltChunkStore) Put(ctx context.Context, c chunks.Chunk, getAddrs chunks.GetAddrsCurry) error {
|
||||
addrs := hash.NewHashSet()
|
||||
err := getAddrs(c)(ctx, addrs)
|
||||
err := getAddrs(c)(ctx, addrs, func(h hash.Hash) bool { return false })
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -61,7 +61,11 @@ var ErrNothingToCollect = errors.New("no changes since last gc")
|
||||
type GetAddrsCurry func(c Chunk) GetAddrsCb
|
||||
|
||||
// GetAddrsCb adds the refs for a pre-specified chunk to |addrs|
|
||||
type GetAddrsCb func(ctx context.Context, addrs hash.HashSet) error
|
||||
type GetAddrsCb func(ctx context.Context, addrs hash.HashSet, exists PendingRefExists) error
|
||||
|
||||
type PendingRefExists func(hash.Hash) bool
|
||||
|
||||
func NoopPendingRefExists(_ hash.Hash) bool { return false }
|
||||
|
||||
// ChunkStore is the core storage abstraction in noms. We can put data
|
||||
// anyplace we have a ChunkStore implementation for.
|
||||
|
||||
@@ -37,7 +37,7 @@ type ChunkStoreTestSuite struct {
|
||||
}
|
||||
|
||||
func noopGetAddrs(c Chunk) GetAddrsCb {
|
||||
return func(ctx context.Context, addrs hash.HashSet) error {
|
||||
return func(ctx context.Context, addrs hash.HashSet, _ PendingRefExists) error {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
@@ -58,7 +58,7 @@ func (suite *ChunkStoreTestSuite) TestChunkStorePut() {
|
||||
data := []byte("bcd")
|
||||
nc := NewChunk(data)
|
||||
err = store.Put(ctx, nc, func(c Chunk) GetAddrsCb {
|
||||
return func(ctx context.Context, addrs hash.HashSet) error {
|
||||
return func(ctx context.Context, addrs hash.HashSet, _ PendingRefExists) error {
|
||||
addrs.Insert(hash.Of([]byte("nonsense")))
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -231,7 +231,7 @@ func (ms *MemoryStoreView) Put(ctx context.Context, c Chunk, getAddrs GetAddrsCu
|
||||
}
|
||||
|
||||
addrs := hash.NewHashSet()
|
||||
err := getAddrs(c)(ctx, addrs)
|
||||
err := getAddrs(c)(ctx, addrs, NoopPendingRefExists)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -56,6 +56,10 @@ func (s *TestStoreView) GetMany(ctx context.Context, hashes hash.HashSet, found
|
||||
return s.ChunkStore.GetMany(ctx, hashes, found)
|
||||
}
|
||||
|
||||
func (s *TestStoreView) CacheHas(_ hash.Hash) bool {
|
||||
return false
|
||||
}
|
||||
|
||||
func (s *TestStoreView) Has(ctx context.Context, h hash.Hash) (bool, error) {
|
||||
atomic.AddInt32(&s.hases, 1)
|
||||
return s.ChunkStore.Has(ctx, h)
|
||||
|
||||
@@ -44,7 +44,7 @@ func benchmarkNovelWrite(refreshStore storeOpenFn, src *dataSource, t assert.Tes
|
||||
}
|
||||
|
||||
func noopGetAddrs(c chunks.Chunk) chunks.GetAddrsCb {
|
||||
return func(_ context.Context, _ hash.HashSet) error { return nil }
|
||||
return func(_ context.Context, _ hash.HashSet, _ chunks.PendingRefExists) error { return nil }
|
||||
}
|
||||
|
||||
func writeToEmptyStore(store chunks.ChunkStore, src *dataSource, t assert.TestingT) {
|
||||
|
||||
@@ -58,7 +58,7 @@ func (fb fileBlockStore) HasMany(ctx context.Context, hashes hash.HashSet) (pres
|
||||
panic("not impl")
|
||||
}
|
||||
|
||||
func (fb fileBlockStore) Put(ctx context.Context, c chunks.Chunk, getAddrs chunks.GetAddrsCurry) error {
|
||||
func (fb fileBlockStore) Put(ctx context.Context, c chunks.Chunk, _ chunks.GetAddrsCurry) error {
|
||||
_, err := io.Copy(fb.bw, bytes.NewReader(c.Data()))
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -117,7 +117,7 @@ func (suite *BlockStoreSuite) TestChunkStoreNotDir() {
|
||||
}
|
||||
|
||||
func noopGetAddrs(c chunks.Chunk) chunks.GetAddrsCb {
|
||||
return func(ctx context.Context, addrs hash.HashSet) error {
|
||||
return func(ctx context.Context, addrs hash.HashSet, _ chunks.PendingRefExists) error {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
@@ -162,7 +162,7 @@ func (suite *BlockStoreSuite) TestChunkStorePut() {
|
||||
// Put chunk with dangling ref should error on Commit
|
||||
nc := chunks.NewChunk([]byte("bcd"))
|
||||
err = suite.store.Put(context.Background(), nc, func(c chunks.Chunk) chunks.GetAddrsCb {
|
||||
return func(ctx context.Context, addrs hash.HashSet) error {
|
||||
return func(ctx context.Context, addrs hash.HashSet, _ chunks.PendingRefExists) error {
|
||||
addrs.Insert(hash.Of([]byte("lorem ipsum")))
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -307,7 +307,7 @@ func (gcs *GenerationalNBS) copyToOldGen(ctx context.Context, hashes hash.HashSe
|
||||
err = gcs.newGen.GetMany(ctx, notInOldGen, func(ctx context.Context, chunk *chunks.Chunk) {
|
||||
if putErr == nil {
|
||||
putErr = gcs.oldGen.Put(ctx, *chunk, func(c chunks.Chunk) chunks.GetAddrsCb {
|
||||
return func(ctx context.Context, addrs hash.HashSet) error { return nil }
|
||||
return func(ctx context.Context, addrs hash.HashSet, _ chunks.PendingRefExists) error { return nil }
|
||||
})
|
||||
}
|
||||
})
|
||||
|
||||
@@ -206,7 +206,7 @@ func TestNBSPruneTableFiles(t *testing.T) {
|
||||
c := chunks.NewChunk([]byte("it's a boy!"))
|
||||
addrs := hash.NewHashSet()
|
||||
ok, err := st.addChunk(ctx, c, func(c chunks.Chunk) chunks.GetAddrsCb {
|
||||
return func(ctx context.Context, _ hash.HashSet) error {
|
||||
return func(ctx context.Context, _ hash.HashSet, _ chunks.PendingRefExists) error {
|
||||
addrs.Insert(c.Hash())
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -291,7 +291,7 @@ func (ts tableSet) Size() int {
|
||||
func (ts tableSet) append(ctx context.Context, mt *memTable, checker refCheck, hasCache *lru.TwoQueueCache[hash.Hash, struct{}], stats *Stats) (tableSet, error) {
|
||||
addrs := hash.NewHashSet()
|
||||
for _, getAddrs := range mt.getChildAddrs {
|
||||
getAddrs(ctx, addrs)
|
||||
getAddrs(ctx, addrs, func(h hash.Hash) bool { return hasCache.Contains(h) })
|
||||
}
|
||||
mt.addChildRefs(addrs)
|
||||
|
||||
|
||||
@@ -150,9 +150,11 @@ func (ns nodeStore) Write(ctx context.Context, nd Node) (hash.Hash, error) {
|
||||
assertTrue(c.Size() > 0, "cannot write empty chunk to ChunkStore")
|
||||
|
||||
getAddrs := func(ch chunks.Chunk) chunks.GetAddrsCb {
|
||||
return func(ctx context.Context, addrs hash.HashSet) (err error) {
|
||||
return func(ctx context.Context, addrs hash.HashSet, exists chunks.PendingRefExists) (err error) {
|
||||
err = message.WalkAddresses(ctx, ch.Data(), func(ctx context.Context, a hash.Hash) error {
|
||||
addrs.Insert(a)
|
||||
if !exists(a) {
|
||||
addrs.Insert(a)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
return
|
||||
|
||||
@@ -476,7 +476,7 @@ func (t *testProtocol) NewDatabase(sp Spec) (datas.Database, error) {
|
||||
}
|
||||
|
||||
func noopGetAddrs(c chunks.Chunk) chunks.GetAddrsCb {
|
||||
return func(ctx context.Context, addrs hash.HashSet) error {
|
||||
return func(ctx context.Context, addrs hash.HashSet, _ chunks.PendingRefExists) error {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
@@ -112,7 +112,7 @@ func AddrsFromNomsValue(c chunks.Chunk, nbf *NomsBinFormat, addrs hash.HashSet)
|
||||
}
|
||||
|
||||
func (lvs *ValueStore) getAddrs(c chunks.Chunk) chunks.GetAddrsCb {
|
||||
return func(ctx context.Context, addrs hash.HashSet) error {
|
||||
return func(ctx context.Context, addrs hash.HashSet, _ chunks.PendingRefExists) error {
|
||||
return AddrsFromNomsValue(c, lvs.nbf, addrs)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -103,7 +103,7 @@ func (f *FileValueStore) WriteValue(ctx context.Context, v types.Value) (types.R
|
||||
}
|
||||
|
||||
err = f.Put(ctx, c, func(c chunks.Chunk) chunks.GetAddrsCb {
|
||||
return func(ctx context.Context, addrs hash.HashSet) error {
|
||||
return func(ctx context.Context, addrs hash.HashSet, _ chunks.PendingRefExists) error {
|
||||
return types.AddrsFromNomsValue(c, f.nbf, addrs)
|
||||
}
|
||||
})
|
||||
@@ -156,6 +156,11 @@ func (f *FileValueStore) Has(ctx context.Context, h hash.Hash) (bool, error) {
|
||||
return ok, nil
|
||||
}
|
||||
|
||||
func (f *FileValueStore) CacheHas(h hash.Hash) bool {
|
||||
_, ok := f.chunks[h]
|
||||
return ok
|
||||
}
|
||||
|
||||
// HasMany returns the set of hashes that are absent from the store
|
||||
func (f *FileValueStore) HasMany(ctx context.Context, hashes hash.HashSet) (absent hash.HashSet, err error) {
|
||||
f.chunkLock.Lock()
|
||||
@@ -188,7 +193,7 @@ func (f *FileValueStore) errorIfDangling(ctx context.Context, addrs hash.HashSet
|
||||
// Put puts a chunk into the store
|
||||
func (f *FileValueStore) Put(ctx context.Context, c chunks.Chunk, getAddrs chunks.GetAddrsCurry) error {
|
||||
addrs := hash.NewHashSet()
|
||||
err := getAddrs(c)(ctx, addrs)
|
||||
err := getAddrs(c)(ctx, addrs, f.CacheHas)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -304,7 +304,7 @@ func read(ctx context.Context, rd io.Reader) (hash.Hash, *FileValueStore, error)
|
||||
}
|
||||
|
||||
err = store.Put(ctx, ch, func(c chunks.Chunk) chunks.GetAddrsCb {
|
||||
return func(_ context.Context, _ hash.HashSet) error { return nil }
|
||||
return func(_ context.Context, _ hash.HashSet, _ chunks.PendingRefExists) error { return nil }
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
|
||||
Reference in New Issue
Block a user