Mirror of https://github.com/dolthub/dolt.git (synced 2026-02-09 03:09:12 -06:00)
Make server use GetMany to load hinted chunks (#3026)
Now that we have GetMany, the server can use it directly and let the chunk-fetching layer figure out the best way to batch up requests. A small refactor allows ValidatingBatchingSink to update the hint cache directly instead of relying on logic inside ReadValue to do it. I made that change because ReadValue also does a bunch of other work around caching read values and checking a cache of 'pending Puts' that will never have anything in it when used from the server.

Toward issue #3019
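For readers skimming the diff below, here is a minimal, self-contained sketch of the contract this commit leans on: a store-side GetMany that batches lookups however the backend likes and streams hits into a channel the caller drains. The Hash, Chunk, getMany, and backend names are simplified stand-ins for illustration, not the noms API (the real signature in this commit is GetMany(hashes hash.HashSet, foundChunks chan *Chunk)).

package main

import "fmt"

// Toy stand-ins for noms' hash.Hash and chunks.Chunk.
type Hash string
type Chunk struct {
    Hash Hash
    Data []byte
}

// getMany groups lookups so a remote backend (e.g. a hypothetical batch-get
// API) can be hit once per group of hashes instead of once per hash. The
// batching policy lives inside the store; callers just drain the channel.
func getMany(backend map[Hash][]byte, hashes []Hash, batch int, found chan<- Chunk) {
    for len(hashes) > 0 {
        n := batch
        if n > len(hashes) {
            n = len(hashes)
        }
        group := hashes[:n]
        hashes = hashes[n:]
        // One hypothetical round-trip per group.
        for _, h := range group {
            if data, ok := backend[h]; ok {
                found <- Chunk{Hash: h, Data: data}
            }
        }
    }
}

func main() {
    backend := map[Hash][]byte{"a": []byte("1"), "b": []byte("2"), "c": []byte("3")}
    found := make(chan Chunk, 3)
    getMany(backend, []Hash{"a", "b", "c", "missing"}, 2, found)
    close(found)
    for c := range found {
        fmt.Printf("%s -> %s\n", c.Hash, c.Data)
    }
}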
@@ -122,7 +122,9 @@ func (s *DynamoStore) Get(h hash.Hash) Chunk {
 func (s *DynamoStore) GetMany(hashes hash.HashSet, foundChunks chan *Chunk) {
     for h, _ := range hashes {
         c := s.Get(h)
-        foundChunks <- &c
+        if !c.IsEmpty() {
+            foundChunks <- &c
+        }
     }
     return
 }

@@ -103,7 +103,9 @@ func (l *LevelDBStore) Get(ref hash.Hash) Chunk {
 func (l *LevelDBStore) GetMany(hashes hash.HashSet, foundChunks chan *Chunk) {
     for h, _ := range hashes {
         c := l.Get(h)
-        foundChunks <- &c
+        if !c.IsEmpty() {
+            foundChunks <- &c
+        }
     }
     return
 }

@@ -35,7 +35,9 @@ func (ms *MemoryStore) Get(h hash.Hash) Chunk {
 func (ms *MemoryStore) GetMany(hashes hash.HashSet, foundChunks chan *Chunk) {
     for h, _ := range hashes {
         c := ms.Get(h)
-        foundChunks <- &c
+        if !c.IsEmpty() {
+            foundChunks <- &c
+        }
     }
     return
 }

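A note on the guard added in all three stores above: Get returns the zero-value (empty) Chunk when a hash is absent, so unconditionally sending &c would hand consumers a phantom chunk per miss. A tiny sketch of that behavior, with illustrative names:

package main

import "fmt"

// Chunk mimics noms' chunks.Chunk just enough to show why the IsEmpty
// guard matters.
type Chunk struct{ data []byte }

func (c Chunk) IsEmpty() bool { return len(c.data) == 0 }

// get mimics a store lookup that returns the empty Chunk on a miss.
func get(store map[string][]byte, h string) Chunk {
    return Chunk{data: store[h]} // nil data when h is absent
}

func main() {
    store := map[string][]byte{"present": []byte("payload")}
    for _, h := range []string{"present", "absent"} {
        c := get(store, h)
        // Without the guard, consumers of foundChunks would receive a
        // phantom empty chunk for every miss and try to decode it.
        if !c.IsEmpty() {
            fmt.Printf("found %s (%d bytes)\n", h, len(c.data))
        }
    }
}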
@@ -26,13 +26,15 @@ import (
 func TestHandleWriteValue(t *testing.T) {
     assert := assert.New(t)
     cs := chunks.NewTestStore()
-    ds := NewDatabase(cs)
+    db := NewDatabase(cs)

     l := types.NewList(
-        ds.WriteValue(types.Bool(true)),
-        ds.WriteValue(types.Bool(false)),
+        db.WriteValue(types.Bool(true)),
+        db.WriteValue(types.Bool(false)),
     )
-    ds.WriteValue(l)
+    r := db.WriteValue(l)
+    _, err := db.CommitValue(db.GetDataset("datasetID"), r)
+    assert.NoError(err)

     hint := l.Hash()
     newItem := types.NewEmptyBlob()

@@ -49,8 +51,8 @@ func TestHandleWriteValue(t *testing.T) {
     HandleWriteValue(w, newRequest("POST", "", "", body, nil), params{}, cs)

     if assert.Equal(http.StatusCreated, w.Code, "Handler error:\n%s", string(w.Body.Bytes())) {
-        ds2 := NewDatabase(cs)
-        v := ds2.ReadValue(l2.Hash())
+        db2 := NewDatabase(cs)
+        v := db2.ReadValue(l2.Hash())
         if assert.NotNil(v) {
             assert.True(v.Equals(l2), "%+v != %+v", v, l2)
         }

@@ -89,8 +91,8 @@ func TestHandleWriteValueDupChunks(t *testing.T) {
     HandleWriteValue(w, newRequest("POST", "", "", body, nil), params{}, cs)

     if assert.Equal(http.StatusCreated, w.Code, "Handler error:\n%s", string(w.Body.Bytes())) {
-        ds2 := NewDatabase(cs)
-        v := ds2.ReadValue(newItem.Hash())
+        db := NewDatabase(cs)
+        v := db.ReadValue(newItem.Hash())
         if assert.NotNil(v) {
             assert.True(v.Equals(newItem), "%+v != %+v", v, newItem)
         }

@@ -100,13 +102,15 @@ func TestHandleWriteValueDupChunks(t *testing.T) {
 func TestHandleWriteValueBackpressure(t *testing.T) {
     assert := assert.New(t)
     cs := &backpressureCS{ChunkStore: chunks.NewMemoryStore()}
-    ds := NewDatabase(cs)
+    db := NewDatabase(cs)

     l := types.NewList(
-        ds.WriteValue(types.Bool(true)),
-        ds.WriteValue(types.Bool(false)),
+        db.WriteValue(types.Bool(true)),
+        db.WriteValue(types.Bool(false)),
     )
-    ds.WriteValue(l)
+    r := db.WriteValue(l)
+    _, err := db.CommitValue(db.GetDataset("datasetID"), r)
+    assert.NoError(err)

     hint := l.Hash()
     newItem := types.NewEmptyBlob()

@@ -44,7 +44,7 @@ type BatchStore interface {
 }

 // Hints are a set of hashes that should be used to speed up the validation of one or more Chunks.
-type Hints map[hash.Hash]struct{}
+type Hints hash.HashSet

 // BatchStoreAdaptor provides a naive implementation of BatchStore should only be used with ChunkStores that can Put relatively quickly. It provides no actual batching or validation. Its intended use is for adapting a ChunkStore for use in something that requires a BatchStore.
 type BatchStoreAdaptor struct {

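Since hash.HashSet is (in this codebase) a map keyed by hash.Hash, redefining Hints on top of it makes the hash.HashSet(hints) conversion used in Prepare below a free type conversion rather than a copy into a new map. A reduced sketch, with Hash and HashSet as stand-ins for the real types:

package main

import "fmt"

// Simplified stand-ins: in noms, hash.HashSet is map[hash.Hash]struct{}.
type Hash string
type HashSet map[Hash]struct{}

// Hints now shares HashSet's underlying type, so converting between the
// two is a zero-cost type conversion, not a rebuild of the map.
type Hints HashSet

func main() {
    hints := Hints{"abc": {}, "def": {}}
    asSet := HashSet(hints) // what Prepare does with hash.HashSet(hints)
    fmt.Println(len(asSet)) // 2
}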
@@ -32,19 +32,23 @@ func NewValidatingBatchingSink(cs chunks.ChunkStore) *ValidatingBatchingSink {

 // Prepare primes the type info cache used to validate Enqueued Chunks by reading the Chunks referenced by the provided hints.
 func (vbs *ValidatingBatchingSink) Prepare(hints Hints) {
-    rl := make(chan struct{}, batchSize)
+    foundChunks := make(chan *chunks.Chunk, batchSize)
     wg := sync.WaitGroup{}
-    for hint := range hints {
+    for i := 0; i < batchSize; i++ {
         wg.Add(1)
-        rl <- struct{}{}
-        go func(hint hash.Hash) {
-            vbs.vs.ReadValue(hint)
-            <-rl
-            wg.Done()
-        }(hint)
+        go func() {
+            defer wg.Done()
+            tc := vbs.pool.Get()
+            defer vbs.pool.Put(tc)
+            for c := range foundChunks {
+                v := DecodeFromBytes(c.Data(), vbs.vs, tc.(*TypeCache))
+                vbs.vs.setHintsForReadValue(v, c.Hash())
+            }
+        }()
     }
+    vbs.cs.GetMany(hash.HashSet(hints), foundChunks)
+    close(foundChunks)
     wg.Wait()
-    close(rl)
 }

 // DecodedChunk holds a pointer to a Chunk and the Value that results from

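The shape of the reworked Prepare, reduced to its concurrency skeleton: spawn batchSize consumers, produce synchronously via GetMany, close the channel, then wait. The ordering is what makes it safe: GetMany returns only after every send has landed, so close cannot race a send, and wg.Wait ensures no decoder is still running when Prepare returns. A runnable model, with string chunks standing in for *chunks.Chunk and a print standing in for decode-and-hint:

package main

import (
    "fmt"
    "sync"
)

func main() {
    const batchSize = 4
    foundChunks := make(chan string, batchSize)
    wg := sync.WaitGroup{}

    // A fixed pool of decoder goroutines drains foundChunks, each holding
    // its own scratch state for the duration (as Prepare does with a
    // TypeCache taken from vbs.pool).
    for i := 0; i < batchSize; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for c := range foundChunks {
                // Stands in for DecodeFromBytes + setHintsForReadValue.
                fmt.Println("decoded", c)
            }
        }()
    }

    // Produce synchronously, as Prepare does with vbs.cs.GetMany(...).
    for _, c := range []string{"c1", "c2", "c3", "c4", "c5"} {
        foundChunks <- c
    }
    // Close only after the producer returns, then wait for the workers
    // to finish draining before returning.
    close(foundChunks)
    wg.Wait()
}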
@@ -46,7 +46,7 @@ type ValueReadWriter interface {
 type ValueStore struct {
     bs           BatchStore
     cacheMu      sync.RWMutex
-    cache        map[hash.Hash]chunkCacheEntry
+    hintCache    map[hash.Hash]chunkCacheEntry
     pendingHints map[hash.Hash]chunkCacheEntry
     pendingMu    sync.RWMutex
     pendingPuts  map[hash.Hash]pendingChunk

@@ -90,7 +90,7 @@ func NewValueStoreWithCache(bs BatchStore, cacheSize uint64) *ValueStore {
     return &ValueStore{
         bs:           bs,
         cacheMu:      sync.RWMutex{},
-        cache:        map[hash.Hash]chunkCacheEntry{},
+        hintCache:    map[hash.Hash]chunkCacheEntry{},
         pendingHints: map[hash.Hash]chunkCacheEntry{},
         pendingMu:    sync.RWMutex{},
         pendingPuts:  map[hash.Hash]pendingChunk{},

@@ -132,18 +132,21 @@ func (lvs *ValueStore) ReadValue(h hash.Hash) Value {

     v := DecodeValue(chunk, lvs)
     lvs.valueCache.Add(h, uint64(len(chunk.Data())), v)
+    lvs.setHintsForReadValue(v, h)
+    return v
+}

+func (lvs *ValueStore) setHintsForReadValue(v Value, h hash.Hash) {
     var entry chunkCacheEntry = absentChunk{}
     if v != nil {
-        lvs.cacheChunks(v, h, false)
-        // h is trivially a hint for v, so consider putting that in the cache. If we got to v by reading some higher-level chunk, this entry gets dropped on the floor because h already has a hint in the cache. If we later read some other chunk that references v, cacheChunks will overwrite this with a hint pointing to that chunk.
+        lvs.setHintsForReachable(v, h, false)
+        // h is trivially a hint for v, so consider putting that in the hintCache. If we got to v by reading some higher-level chunk, this entry gets dropped on the floor because h already has a hint in the hintCache. If we later read some other chunk that references v, setHintsForReachable will overwrite this with a hint pointing to that chunk.
         // If we don't do this, top-level Values that get read but not written -- such as the existing Head of a Database upon a Commit -- can be erroneously left out during a pull.
         entry = hintedChunk{v.Type(), h}
     }
     if cur := lvs.check(h); cur == nil || cur.Hint().IsEmpty() {
         lvs.set(h, entry, false)
     }
-    return v
 }

 // WriteValue takes a Value, schedules it to be written it to lvs, and returns

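The motivation for carving setHintsForReadValue out of ReadValue, in miniature: the hint-cache update becomes callable on its own, so ValidatingBatchingSink.Prepare can record hints for chunks it decoded itself without paying for ReadValue's value cache and pending-Put checks. All names below are simplified stand-ins, not the real ValueStore:

package main

import "fmt"

type valueStore struct {
    hintCache map[string]string
}

// setHintsForReadValue records h as a hint unless a hint is already
// cached, mirroring the check-then-set at the end of the real method.
func (lvs *valueStore) setHintsForReadValue(v, h string) {
    if cur, ok := lvs.hintCache[h]; !ok || cur == "" {
        lvs.hintCache[h] = h // h is trivially a hint for v
    }
}

// readValue decodes, then delegates the hint bookkeeping to the shared
// helper, exactly the split the diff above introduces.
func (lvs *valueStore) readValue(h string) string {
    v := "decoded:" + h // stands in for DecodeValue
    lvs.setHintsForReadValue(v, h)
    return v
}

func main() {
    lvs := &valueStore{hintCache: map[string]string{}}
    fmt.Println(lvs.readValue("abc"))
    fmt.Println(lvs.hintCache)
}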
@@ -176,7 +179,7 @@ func (lvs *ValueStore) WriteValue(v Value) Ref {
         })
     }()

-    lvs.cacheChunks(v, v.Hash(), true)
+    lvs.setHintsForReachable(v, v.Hash(), true)
     lvs.set(h, (*presentChunk)(v.Type()), false)
     lvs.valueCache.Drop(h)
     return r

@@ -205,8 +208,8 @@ func (lvs *ValueStore) Close() error {
     return lvs.bs.Close()
 }

-// cacheChunks looks at the Chunks reachable from v and, for each one checks if there's a hint in the cache. If there isn't, or if the hint is a self-reference, the chunk gets r set as its new hint.
-func (lvs *ValueStore) cacheChunks(v Value, r hash.Hash, toPending bool) {
+// setHintsForReachable looks at the Chunks reachable from v and, for each one checks if there's a hint in the hintCache. If there isn't, or if the hint is a self-reference, the chunk gets r set as its new hint.
+func (lvs *ValueStore) setHintsForReachable(v Value, r hash.Hash, toPending bool) {
     v.WalkRefs(func(reachable Ref) {
         hash := reachable.TargetHash()
         if cur := lvs.check(hash); cur == nil || cur.Hint().IsEmpty() || cur.Hint() == hash {

@@ -225,7 +228,7 @@ func (lvs *ValueStore) isPresent(r hash.Hash) (present bool) {
 func (lvs *ValueStore) check(r hash.Hash) chunkCacheEntry {
     lvs.cacheMu.RLock()
     defer lvs.cacheMu.RUnlock()
-    return lvs.cache[r]
+    return lvs.hintCache[r]
 }

 func (lvs *ValueStore) set(r hash.Hash, entry chunkCacheEntry, toPending bool) {

@@ -234,7 +237,7 @@ func (lvs *ValueStore) set(r hash.Hash, entry chunkCacheEntry, toPending bool) {
     if toPending {
         lvs.pendingHints[r] = entry
     } else {
-        lvs.cache[r] = entry
+        lvs.hintCache[r] = entry
     }
 }

@@ -243,7 +246,7 @@ func (lvs *ValueStore) mergePendingHints() {
     defer lvs.cacheMu.Unlock()

     for h, entry := range lvs.pendingHints {
-        lvs.cache[h] = entry
+        lvs.hintCache[h] = entry
     }
     lvs.pendingHints = map[hash.Hash]chunkCacheEntry{}
 }

@@ -266,11 +269,11 @@ func (lvs *ValueStore) opCache() opCache {
 func (lvs *ValueStore) checkChunksInCache(v Value, readValues bool) Hints {
     hints := map[hash.Hash]struct{}{}
     collectHints := func(reachable Ref) {
-        // First, check the type cache to see if reachable is already known to be valid.
+        // First, check the hintCache to see if reachable is already known to be valid.
         targetHash := reachable.TargetHash()
         entry := lvs.check(targetHash)

-        // If it's not already in the cache, attempt to read the value directly, which will put it and its chunks into the cache.
+        // If it's not already in the hintCache, attempt to read the value directly, which will put it and its chunks into the hintCache.
         if entry == nil || !entry.Present() {
             var reachableV Value
             if readValues {