Merge pull request #2217 from dolthub/aaron/remotestorage-update-size-limited

go/libraries/doltcore/remotestorage: Interface for cache returns true if cache is full / truncated in size.
Aaron Son
2021-10-05 16:12:58 -07:00
committed by GitHub
31 changed files with 136 additions and 103 deletions
@@ -22,7 +22,7 @@ import (
// ChunkCache is an interface used for caching chunks
type ChunkCache interface {
// Put puts a slice of chunks into the cache.
Put(c []nbs.CompressedChunk)
Put(c []nbs.CompressedChunk) bool
// Get gets a map of hash to chunk for a set of hashes. In the event that a chunk is not in the cache, chunks.Empty
// is put in its place.
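This signature change is the heart of the PR: Put (and PutChunk below) now report cache exhaustion to the caller. A minimal sketch of a size-limited implementation honoring the new contract — the type, field names, and byte-budget policy here are illustrative only, assuming CompressedChunk's FullCompressedChunk payload field:

type sizeLimitedCache struct {
	mu        sync.Mutex
	chunks    map[hash.Hash]nbs.CompressedChunk
	curBytes  uint64
	maxBytes  uint64
	truncated bool
}

// Put stores chunks until the byte budget is exhausted; per the new
// interface contract it returns true once the cache is full / truncated.
func (c *sizeLimitedCache) Put(cs []nbs.CompressedChunk) bool {
	c.mu.Lock()
	defer c.mu.Unlock()
	for _, cc := range cs {
		sz := uint64(len(cc.FullCompressedChunk))
		if c.curBytes+sz > c.maxBytes {
			c.truncated = true
			break
		}
		c.chunks[cc.Hash()] = cc
		c.curBytes += sz
	}
	return c.truncated
}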
@@ -250,7 +250,7 @@ type CacheStats interface {
func (dcs *DoltChunkStore) Get(ctx context.Context, h hash.Hash) (chunks.Chunk, error) {
hashes := hash.HashSet{h: struct{}{}}
var found *chunks.Chunk
err := dcs.GetMany(ctx, hashes, func(c *chunks.Chunk) { found = c })
err := dcs.GetMany(ctx, hashes, func(_ context.Context, c *chunks.Chunk) { found = c })
if err != nil {
return chunks.EmptyChunk, err
}
@@ -261,10 +261,10 @@ func (dcs *DoltChunkStore) Get(ctx context.Context, h hash.Hash) (chunks.Chunk,
}
}
func (dcs *DoltChunkStore) GetMany(ctx context.Context, hashes hash.HashSet, found func(*chunks.Chunk)) error {
func (dcs *DoltChunkStore) GetMany(ctx context.Context, hashes hash.HashSet, found func(context.Context, *chunks.Chunk)) error {
ae := atomicerr.New()
decompressedSize := uint64(0)
err := dcs.GetManyCompressed(ctx, hashes, func(cc nbs.CompressedChunk) {
err := dcs.GetManyCompressed(ctx, hashes, func(ctx context.Context, cc nbs.CompressedChunk) {
if ae.IsSet() {
return
}
@@ -273,7 +273,7 @@ func (dcs *DoltChunkStore) GetMany(ctx context.Context, hashes hash.HashSet, fou
return
}
atomic.AddUint64(&decompressedSize, uint64(len(c.Data())))
found(&c)
found(ctx, &c)
})
if span := opentracing.SpanFromContext(ctx); span != nil {
span.LogKV("decompressed_bytes", decompressedSize)
@@ -289,7 +289,7 @@ func (dcs *DoltChunkStore) GetMany(ctx context.Context, hashes hash.HashSet, fou
// GetManyCompressed gets the compressed Chunks with |hashes| from the store. On return, |found| will have been
// called with every chunk that was found. Any non-present chunks are silently ignored.
func (dcs *DoltChunkStore) GetManyCompressed(ctx context.Context, hashes hash.HashSet, found func(nbs.CompressedChunk)) error {
func (dcs *DoltChunkStore) GetManyCompressed(ctx context.Context, hashes hash.HashSet, found func(context.Context, nbs.CompressedChunk)) error {
span, ctx := tracing.StartSpan(ctx, "remotestorage.GetManyCompressed")
defer span.Finish()
@@ -305,7 +305,7 @@ func (dcs *DoltChunkStore) GetManyCompressed(ctx context.Context, hashes hash.Ha
if c.IsEmpty() {
notCached = append(notCached, h)
} else {
found(c)
found(ctx, c)
}
}
@@ -364,6 +364,9 @@ func (gr *GetRange) SplitAtGaps(maxGapBytes uint64) []*GetRange {
if gr.GapBetween(j-1, j) > maxGapBytes {
break
}
if gr.GapBetween(i, j) > MaxFetchSize {
break
}
j++
}
res = append(res, &GetRange{Url: gr.Url, Ranges: gr.Ranges[i:j]})
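The added guard caps how much data a single coalesced download range may cover: even when every gap is below maxGapBytes, the range is cut once its extent would exceed MaxFetchSize (128 MiB, defined below). A standalone sketch of the splitting decision, with hypothetical names and sorted, non-overlapping (offset, length) pairs standing in for gr.Ranges — note the real loop bounds GapBetween(i, j), for which the total extent used here is a close analogue:

// splitRanges groups sorted records into ranges, breaking at any gap
// larger than maxGap and whenever a range's total extent would pass
// maxFetch — the same two conditions as the loop above.
func splitRanges(offs, lens []uint64, maxGap, maxFetch uint64) (out [][2]int) {
	i := 0
	for i < len(offs) {
		j := i + 1
		for j < len(offs) {
			gap := offs[j] - (offs[j-1] + lens[j-1])
			extent := offs[j] + lens[j] - offs[i]
			if gap > maxGap || extent > maxFetch {
				break
			}
			j++
		}
		out = append(out, [2]int{i, j}) // half-open index range [i, j)
		i = j
	}
	return out
}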
@@ -606,15 +609,13 @@ func (dcs *DoltChunkStore) getDLLocs(ctx context.Context, hashes []hash.Hash) (d
return res, nil
}
func (dcs *DoltChunkStore) readChunksAndCache(ctx context.Context, hashes hash.HashSet, notCached []hash.Hash, found func(nbs.CompressedChunk)) error {
func (dcs *DoltChunkStore) readChunksAndCache(ctx context.Context, hashes hash.HashSet, notCached []hash.Hash, found func(context.Context, nbs.CompressedChunk)) error {
// get the locations where the chunks can be downloaded from
dlLocs, err := dcs.getDLLocs(ctx, notCached)
if err != nil {
return err
}
var wg sync.WaitGroup
// channel to receive chunks on
chunkChan := make(chan nbs.CompressedChunk, 128)
@@ -623,35 +624,37 @@ func (dcs *DoltChunkStore) readChunksAndCache(ctx context.Context, hashes hash.H
toSend[h] = struct{}{}
}
eg, egCtx := errgroup.WithContext(ctx)
// start a goroutine to receive the downloaded chunks
wg.Add(1)
go func() {
defer wg.Done()
for chunk := range chunkChan {
dcs.cache.PutChunk(chunk)
eg.Go(func() error {
for {
select {
case chunk, ok := <-chunkChan:
if !ok {
return nil
}
if dcs.cache.PutChunk(chunk) {
return errors.New("too much data...")
}
h := chunk.Hash()
h := chunk.Hash()
if _, send := toSend[h]; send {
found(chunk)
if _, send := toSend[h]; send {
found(egCtx, chunk)
}
case <-egCtx.Done():
return nil
}
}
}()
})
// download the chunks and close the channel after
func() {
eg.Go(func() error {
defer close(chunkChan)
err = dcs.downloadChunks(ctx, dlLocs, chunkChan)
}()
return dcs.downloadChunks(egCtx, dlLocs, chunkChan)
})
// wait for all the results to finish processing
wg.Wait()
if err != nil {
return err
}
return nil
return eg.Wait()
}
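The rewrite replaces the WaitGroup-plus-captured-err pattern with errgroup: the consumer goroutine drains chunkChan, bailing out when the cache reports it is full or the group context is cancelled; the producer downloads into the channel and closes it; and eg.Wait() surfaces the first error while cancelling the other goroutine. A self-contained sketch of this consume/produce shape, with placeholder types standing in for chunks:

package main

import (
	"context"
	"errors"
	"fmt"

	"golang.org/x/sync/errgroup"
)

// pipeline mirrors readChunksAndCache: one errgroup goroutine consumes
// from a channel until it is closed (or the group context is cancelled,
// or caching fails), while another produces into it and closes it.
func pipeline(ctx context.Context, produce func(context.Context, chan<- int) error, cacheFull func(int) bool) error {
	eg, egCtx := errgroup.WithContext(ctx)
	ch := make(chan int, 128)

	eg.Go(func() error {
		for {
			select {
			case v, ok := <-ch:
				if !ok {
					return nil // producer closed the channel: done
				}
				if cacheFull(v) {
					return errors.New("too much data") // cancels egCtx
				}
			case <-egCtx.Done():
				return nil
			}
		}
	})

	eg.Go(func() error {
		defer close(ch) // always unblock the consumer
		return produce(egCtx, ch)
	})

	return eg.Wait() // first non-nil error from either goroutine
}

func main() {
	err := pipeline(context.Background(),
		func(ctx context.Context, ch chan<- int) error {
			for i := 0; i < 4; i++ {
				select {
				case ch <- i:
				case <-ctx.Done():
					return ctx.Err()
				}
			}
			return nil
		},
		func(int) bool { return false }, // cache never fills in this demo
	)
	fmt.Println("err:", err)
}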
// Returns true iff the value at the address |h| is contained in the
@@ -736,7 +739,9 @@ func (dcs *DoltChunkStore) HasMany(ctx context.Context, hashes hash.HashSet) (ha
}
if len(found) > 0 {
dcs.cache.Put(found)
if dcs.cache.Put(found) {
return hash.HashSet{}, errors.New("too much data")
}
}
return absent, nil
@@ -748,7 +753,9 @@ func (dcs *DoltChunkStore) HasMany(ctx context.Context, hashes hash.HashSet) (ha
// Get(), GetMany(), Has() and HasMany().
func (dcs *DoltChunkStore) Put(ctx context.Context, c chunks.Chunk) error {
cc := nbs.ChunkToCompressedChunk(c)
dcs.cache.Put([]nbs.CompressedChunk{cc})
if dcs.cache.Put([]nbs.CompressedChunk{cc}) {
return errors.New("too much data")
}
return nil
}
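Both call sites above turn a truncated cache into a hard error, so code that previously treated caching as fire-and-forget must now check the result. A hypothetical caller:

// Write a chunk through the remote store; with this change the call can
// fail when the chunk cache reports it is full.
if err := dcs.Put(ctx, chk); err != nil {
	return fmt.Errorf("remotestorage: caching write failed: %w", err)
}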
@@ -999,6 +1006,8 @@ const (
chunkAggDistance = 8 * 1024
)
const MaxFetchSize = 128 * 1024 * 1024
var defaultConcurrency ConcurrencyParams = ConcurrencyParams{
ConcurrentSmallFetches: 64,
ConcurrentLargeFetches: 2,
@@ -37,7 +37,7 @@ func newMapChunkCache() *mapChunkCache {
}
// Put puts a slice of chunks into the cache.
func (mcc *mapChunkCache) Put(chnks []nbs.CompressedChunk) {
func (mcc *mapChunkCache) Put(chnks []nbs.CompressedChunk) bool {
mcc.mu.Lock()
defer mcc.mu.Unlock()
@@ -57,6 +57,8 @@ func (mcc *mapChunkCache) Put(chnks []nbs.CompressedChunk) {
mcc.toFlush[h] = c
}
}
return false
}
// Get gets a map of hash to chunk for a set of hashes. In the event that a chunk is not in the cache, chunks.Empty is put in its place.
@@ -94,8 +96,6 @@ func (mcc *mapChunkCache) Has(hashes hash.HashSet) (absent hash.HashSet) {
return absent
}
// PutChunk puts a single chunk in the cache. true is returned in the event that the chunk was cached successfully
// and false is returned if that chunk is already in the cache.
func (mcc *mapChunkCache) PutChunk(ch nbs.CompressedChunk) bool {
mcc.mu.Lock()
defer mcc.mu.Unlock()
@@ -104,7 +104,6 @@ func (mcc *mapChunkCache) PutChunk(ch nbs.CompressedChunk) bool {
if existing, ok := mcc.hashToChunk[h]; !ok || existing.IsEmpty() {
mcc.hashToChunk[h] = ch
mcc.toFlush[h] = ch
return true
}
return false
@@ -83,9 +83,8 @@ func TestMapChunkCache(t *testing.T) {
assert.True(t, reflect.DeepEqual(absent, moreHashes), "unexpected absent hashset (seed %d)", seed)
assert.False(t, mapChunkCache.PutChunk(chks[0]), "existing chunk should return false (seed %d)", seed)
assert.True(t, mapChunkCache.PutChunk(moreChks[0]), "new chunk should return true (seed %d)", seed)
mapChunkCache.PutChunk(chks[0])
mapChunkCache.PutChunk(moreChks[0])
toFlush = mapChunkCache.GetAndClearChunksToFlush()
@@ -29,7 +29,8 @@ var noopChunkCache = &noopChunkCacheImpl{}
type noopChunkCacheImpl struct {
}
func (*noopChunkCacheImpl) Put(chnks []nbs.CompressedChunk) {
func (*noopChunkCacheImpl) Put(chnks []nbs.CompressedChunk) bool {
return false
}
func (*noopChunkCacheImpl) Get(hashes hash.HashSet) map[hash.Hash]nbs.CompressedChunk {
@@ -41,7 +42,7 @@ func (*noopChunkCacheImpl) Has(hashes hash.HashSet) (absent hash.HashSet) {
}
func (*noopChunkCacheImpl) PutChunk(ch nbs.CompressedChunk) bool {
return true
return false
}
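Note the inversion in PutChunk's boolean: before this PR it meant "newly cached" (hence the old return true here and the now-deleted assertions in the map-cache test above), whereas it now means "cache is full / was truncated". That is why the noop cache flips to return false and why the unbounded mapChunkCache always returns false — neither ever refuses data.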
func (*noopChunkCacheImpl) GetAndClearChunksToFlush() map[hash.Hash]nbs.CompressedChunk {
+1 -1
@@ -41,7 +41,7 @@ type ChunkStore interface {
// GetMany gets the Chunks with |hashes| from the store. On return,
// |found| will have been called with every chunk that was found. Any
// non-present chunks are silently ignored.
GetMany(ctx context.Context, hashes hash.HashSet, found func(*Chunk)) error
GetMany(ctx context.Context, hashes hash.HashSet, found func(context.Context, *Chunk)) error
// Returns true iff the value at the address |h| is contained in the
// store
+1 -1
@@ -77,7 +77,7 @@ func (csMW *CSMetricWrapper) Get(ctx context.Context, h hash.Hash) (Chunk, error
// GetMany gets the Chunks with |hashes| from the store. On return,
// |found| will have been called with every chunk that was found. Any
// non-present chunks are silently ignored.
func (csMW *CSMetricWrapper) GetMany(ctx context.Context, hashes hash.HashSet, found func(*Chunk)) error {
func (csMW *CSMetricWrapper) GetMany(ctx context.Context, hashes hash.HashSet, found func(context.Context, *Chunk)) error {
atomic.AddInt32(&csMW.TotalChunkGets, int32(len(hashes)))
return csMW.cs.GetMany(ctx, hashes, found)
}
+2 -2
@@ -139,7 +139,7 @@ func (ms *MemoryStoreView) Get(ctx context.Context, h hash.Hash) (Chunk, error)
return ms.storage.Get(ctx, h)
}
func (ms *MemoryStoreView) GetMany(ctx context.Context, hashes hash.HashSet, found func(*Chunk)) error {
func (ms *MemoryStoreView) GetMany(ctx context.Context, hashes hash.HashSet, found func(context.Context, *Chunk)) error {
for h := range hashes {
c, err := ms.Get(ctx, h)
@@ -148,7 +148,7 @@ func (ms *MemoryStoreView) GetMany(ctx context.Context, hashes hash.HashSet, fou
}
if !c.IsEmpty() {
found(&c)
found(ctx, &c)
}
}
+1 -1
@@ -66,7 +66,7 @@ func (s *TestStoreView) Get(ctx context.Context, h hash.Hash) (Chunk, error) {
return s.ChunkStore.Get(ctx, h)
}
func (s *TestStoreView) GetMany(ctx context.Context, hashes hash.HashSet, found func(*Chunk)) error {
func (s *TestStoreView) GetMany(ctx context.Context, hashes hash.HashSet, found func(context.Context, *Chunk)) error {
atomic.AddInt32(&s.reads, int32(len(hashes)))
return s.ChunkStore.GetMany(ctx, hashes, found)
}
+1 -1
@@ -370,7 +370,7 @@ func PullWithoutBatching(ctx context.Context, srcDB, sinkDB Database, sourceRef
func getChunks(ctx context.Context, srcDB Database, batch hash.HashSlice, sampleSize uint64, sampleCount uint64, updateProgress func(moreDone uint64, moreKnown uint64, moreApproxBytesWritten uint64)) (map[hash.Hash]*chunks.Chunk, error) {
mu := &sync.Mutex{}
neededChunks := map[hash.Hash]*chunks.Chunk{}
err := srcDB.chunkStore().GetMany(ctx, batch.HashSet(), func(c *chunks.Chunk) {
err := srcDB.chunkStore().GetMany(ctx, batch.HashSet(), func(ctx context.Context, c *chunks.Chunk) {
mu.Lock()
defer mu.Unlock()
neededChunks[c.Hash()] = c
+6 -1
@@ -377,7 +377,12 @@ func (p *Puller) getCmp(ctx context.Context, twDetails *TreeWalkEventDetails, le
ae := atomicerr.New()
go func() {
defer close(found)
err := p.srcChunkStore.GetManyCompressed(ctx, batch, func(c nbs.CompressedChunk) { found <- c })
err := p.srcChunkStore.GetManyCompressed(ctx, batch, func(ctx context.Context, c nbs.CompressedChunk) {
select {
case found <- c:
case <-ctx.Done():
}
})
ae.SetIfError(err)
}()
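Wrapping the channel send in a select against ctx.Done() is the standard guard against leaking this goroutine: if the receiver abandons the channel after cancellation, the send unblocks via the context instead of blocking forever. The benchmark and test callers below adopt the same shape; distilled into a hypothetical helper:

// sendOrBail forwards c to ch unless ctx is cancelled first, so a
// GetManyCompressed callback can never block on an abandoned channel.
func sendOrBail(ctx context.Context, ch chan<- nbs.CompressedChunk, c nbs.CompressedChunk) {
	select {
	case ch <- c:
	case <-ctx.Done():
	}
}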
@@ -137,7 +137,12 @@ func benchmarkReadMany(openStore storeOpenFn, hashes hashSlice, src *dataSource,
wg.Add(1)
go func(hashes hash.HashSlice) {
chunkChan := make(chan *chunks.Chunk, len(hashes))
err := store.GetMany(context.Background(), hashes.HashSet(), func(c *chunks.Chunk) { chunkChan <- c })
err := store.GetMany(context.Background(), hashes.HashSet(), func(ctx context.Context, c *chunks.Chunk) {
select {
case chunkChan <- c:
case <-ctx.Done():
}
})
d.PanicIfError(err)
@@ -153,7 +158,12 @@ func benchmarkReadMany(openStore storeOpenFn, hashes hashSlice, src *dataSource,
if len(batch) > 0 {
chunkChan := make(chan *chunks.Chunk, len(batch))
err := store.GetMany(context.Background(), batch.HashSet(), func(c *chunks.Chunk) { chunkChan <- c })
err := store.GetMany(context.Background(), batch.HashSet(), func(ctx context.Context, c *chunks.Chunk) {
select {
case chunkChan <- c:
case <-ctx.Done():
}
})
assert.NoError(t, err)
close(chunkChan)
+1 -1
@@ -46,7 +46,7 @@ func (fb fileBlockStore) Get(ctx context.Context, h hash.Hash) (chunks.Chunk, er
panic("not impl")
}
func (fb fileBlockStore) GetMany(ctx context.Context, hashes hash.HashSet, found func(*chunks.Chunk)) error {
func (fb fileBlockStore) GetMany(ctx context.Context, hashes hash.HashSet, found func(context.Context, *chunks.Chunk)) error {
panic("not impl")
}
+1 -1
@@ -39,7 +39,7 @@ func (nb nullBlockStore) Get(ctx context.Context, h hash.Hash) (chunks.Chunk, er
panic("not impl")
}
func (nb nullBlockStore) GetMany(ctx context.Context, hashes hash.HashSet, found func(*chunks.Chunk)) error {
func (nb nullBlockStore) GetMany(ctx context.Context, hashes hash.HashSet, found func(context.Context, *chunks.Chunk)) error {
panic("not impl")
}
+6 -1
@@ -223,7 +223,12 @@ func (suite *BlockStoreSuite) TestChunkStoreGetMany() {
}
chunkChan := make(chan *chunks.Chunk, len(hashes))
err = suite.store.GetMany(context.Background(), hashes.HashSet(), func(c *chunks.Chunk) { chunkChan <- c })
err = suite.store.GetMany(context.Background(), hashes.HashSet(), func(ctx context.Context, c *chunks.Chunk) {
select {
case chunkChan <- c:
case <-ctx.Done():
}
})
suite.NoError(err)
close(chunkChan)
+1 -1
@@ -89,7 +89,7 @@ func (nbc *NomsBlockCache) Get(ctx context.Context, hash hash.Hash) (chunks.Chun
// GetMany gets the Chunks with |hashes| from the store. On return,
// |found| will have been called with every chunk that was found. Any
// non-present chunks are silently ignored.
func (nbc *NomsBlockCache) GetMany(ctx context.Context, hashes hash.HashSet, found func(*chunks.Chunk)) error {
func (nbc *NomsBlockCache) GetMany(ctx context.Context, hashes hash.HashSet, found func(context.Context, *chunks.Chunk)) error {
return nbc.chunks.GetMany(ctx, hashes, found)
}
+2 -2
@@ -48,7 +48,7 @@ func TestCmpChunkTableWriter(t *testing.T) {
found := make([]CompressedChunk, 0)
eg, egCtx := errgroup.WithContext(ctx)
_, err = tr.getManyCompressed(egCtx, eg, reqs, func(c CompressedChunk) { found = append(found, c) }, &Stats{})
_, err = tr.getManyCompressed(egCtx, eg, reqs, func(ctx context.Context, c CompressedChunk) { found = append(found, c) }, &Stats{})
require.NoError(t, err)
require.NoError(t, eg.Wait())
@@ -93,7 +93,7 @@ func readAllChunks(ctx context.Context, hashes hash.HashSet, reader tableReader)
reqs := toGetRecords(hashes)
found := make([]*chunks.Chunk, 0)
eg, ctx := errgroup.WithContext(ctx)
_, err := reader.getMany(ctx, eg, reqs, func(c *chunks.Chunk) { found = append(found, c) }, &Stats{})
_, err := reader.getMany(ctx, eg, reqs, func(ctx context.Context, c *chunks.Chunk) { found = append(found, c) }, &Stats{})
if err != nil {
return nil, err
}
+7 -7
@@ -68,17 +68,17 @@ func (gcs *GenerationalNBS) Get(ctx context.Context, h hash.Hash) (chunks.Chunk,
// GetMany gets the Chunks with |hashes| from the store. On return, |found| will have been called with every chunk
// that was found. Any non-present chunks are silently ignored.
func (gcs *GenerationalNBS) GetMany(ctx context.Context, hashes hash.HashSet, found func(*chunks.Chunk)) error {
func (gcs *GenerationalNBS) GetMany(ctx context.Context, hashes hash.HashSet, found func(context.Context, *chunks.Chunk)) error {
mu := &sync.Mutex{}
notInOldGen := hashes.Copy()
err := gcs.oldGen.GetMany(ctx, hashes, func(chunk *chunks.Chunk) {
err := gcs.oldGen.GetMany(ctx, hashes, func(ctx context.Context, chunk *chunks.Chunk) {
func() {
mu.Lock()
defer mu.Unlock()
delete(notInOldGen, chunk.Hash())
}()
found(chunk)
found(ctx, chunk)
})
if err != nil {
@@ -92,17 +92,17 @@ func (gcs *GenerationalNBS) GetMany(ctx context.Context, hashes hash.HashSet, fo
return gcs.newGen.GetMany(ctx, notInOldGen, found)
}
func (gcs *GenerationalNBS) GetManyCompressed(ctx context.Context, hashes hash.HashSet, found func(CompressedChunk)) error {
func (gcs *GenerationalNBS) GetManyCompressed(ctx context.Context, hashes hash.HashSet, found func(context.Context, CompressedChunk)) error {
mu := &sync.Mutex{}
notInOldGen := hashes.Copy()
err := gcs.oldGen.GetManyCompressed(ctx, hashes, func(chunk CompressedChunk) {
err := gcs.oldGen.GetManyCompressed(ctx, hashes, func(ctx context.Context, chunk CompressedChunk) {
func() {
mu.Lock()
defer mu.Unlock()
delete(notInOldGen, chunk.Hash())
}()
found(chunk)
found(ctx, chunk)
})
if err != nil {
@@ -223,7 +223,7 @@ func (gcs *GenerationalNBS) copyToOldGen(ctx context.Context, hashes hash.HashSe
}
var putErr error
err = gcs.newGen.GetMany(ctx, notInOldGen, func(chunk *chunks.Chunk) {
err = gcs.newGen.GetMany(ctx, notInOldGen, func(ctx context.Context, chunk *chunks.Chunk) {
if putErr == nil {
putErr = gcs.oldGen.Put(ctx, *chunk)
}
@@ -63,7 +63,7 @@ func hashesForChunks(chunks []chunks.Chunk, indexes map[int]bool) hash.HashSet {
type foundHashes hash.HashSet
func (fh foundHashes) found(chk *chunks.Chunk) {
func (fh foundHashes) found(ctx context.Context, chk *chunks.Chunk) {
fh[chk.Hash()] = struct{}{}
}
+4 -4
@@ -138,13 +138,13 @@ func (mt *memTable) get(ctx context.Context, h addr, stats *Stats) ([]byte, erro
return mt.chunks[h], nil
}
func (mt *memTable) getMany(ctx context.Context, eg *errgroup.Group, reqs []getRecord, found func(*chunks.Chunk), stats *Stats) (bool, error) {
func (mt *memTable) getMany(ctx context.Context, eg *errgroup.Group, reqs []getRecord, found func(context.Context, *chunks.Chunk), stats *Stats) (bool, error) {
var remaining bool
for _, r := range reqs {
data := mt.chunks[*r.a]
if data != nil {
c := chunks.NewChunkWithHash(hash.Hash(*r.a), data)
found(&c)
found(ctx, &c)
} else {
remaining = true
}
@@ -152,13 +152,13 @@ func (mt *memTable) getMany(ctx context.Context, eg *errgroup.Group, reqs []getR
return remaining, nil
}
func (mt *memTable) getManyCompressed(ctx context.Context, eg *errgroup.Group, reqs []getRecord, found func(CompressedChunk), stats *Stats) (bool, error) {
func (mt *memTable) getManyCompressed(ctx context.Context, eg *errgroup.Group, reqs []getRecord, found func(context.Context, CompressedChunk), stats *Stats) (bool, error) {
var remaining bool
for _, r := range reqs {
data := mt.chunks[*r.a]
if data != nil {
c := chunks.NewChunkWithHash(hash.Hash(*r.a), data)
found(ChunkToCompressedChunk(c))
found(ctx, ChunkToCompressedChunk(c))
} else {
remaining = true
}
+2 -2
@@ -264,7 +264,7 @@ func (crg chunkReaderGroup) hasMany(addrs []hasRecord) (bool, error) {
return true, nil
}
func (crg chunkReaderGroup) getMany(ctx context.Context, eg *errgroup.Group, reqs []getRecord, found func(*chunks.Chunk), stats *Stats) (bool, error) {
func (crg chunkReaderGroup) getMany(ctx context.Context, eg *errgroup.Group, reqs []getRecord, found func(context.Context, *chunks.Chunk), stats *Stats) (bool, error) {
for _, haver := range crg {
remaining, err := haver.getMany(ctx, eg, reqs, found, stats)
if err != nil {
@@ -277,7 +277,7 @@ func (crg chunkReaderGroup) getMany(ctx context.Context, eg *errgroup.Group, req
return true, nil
}
func (crg chunkReaderGroup) getManyCompressed(ctx context.Context, eg *errgroup.Group, reqs []getRecord, found func(CompressedChunk), stats *Stats) (bool, error) {
func (crg chunkReaderGroup) getManyCompressed(ctx context.Context, eg *errgroup.Group, reqs []getRecord, found func(context.Context, CompressedChunk), stats *Stats) (bool, error) {
for _, haver := range crg {
remaining, err := haver.getManyCompressed(ctx, eg, reqs, found, stats)
if err != nil {
+1 -1
@@ -83,7 +83,7 @@ func (nbsMW *NBSMetricWrapper) PruneTableFiles(ctx context.Context) error {
// GetManyCompressed gets the compressed Chunks with |hashes| from the store. On return,
// |found| will have been called with every chunk that was found. Any
// non-present chunks are silently ignored.
func (nbsMW *NBSMetricWrapper) GetManyCompressed(ctx context.Context, hashes hash.HashSet, found func(CompressedChunk)) error {
func (nbsMW *NBSMetricWrapper) GetManyCompressed(ctx context.Context, hashes hash.HashSet, found func(context.Context, CompressedChunk)) error {
atomic.AddInt32(&nbsMW.TotalChunkGets, int32(len(hashes)))
return nbsMW.nbs.GetManyCompressed(ctx, hashes, found)
}
+4 -4
@@ -134,7 +134,7 @@ func (ccs *persistingChunkSource) get(ctx context.Context, h addr, stats *Stats)
return cr.get(ctx, h, stats)
}
func (ccs *persistingChunkSource) getMany(ctx context.Context, eg *errgroup.Group, reqs []getRecord, found func(*chunks.Chunk), stats *Stats) (bool, error) {
func (ccs *persistingChunkSource) getMany(ctx context.Context, eg *errgroup.Group, reqs []getRecord, found func(context.Context, *chunks.Chunk), stats *Stats) (bool, error) {
cr := ccs.getReader()
if cr == nil {
return false, ErrNoReader
@@ -142,7 +142,7 @@ func (ccs *persistingChunkSource) getMany(ctx context.Context, eg *errgroup.Grou
return cr.getMany(ctx, eg, reqs, found, stats)
}
func (ccs *persistingChunkSource) getManyCompressed(ctx context.Context, eg *errgroup.Group, reqs []getRecord, found func(CompressedChunk), stats *Stats) (bool, error) {
func (ccs *persistingChunkSource) getManyCompressed(ctx context.Context, eg *errgroup.Group, reqs []getRecord, found func(context.Context, CompressedChunk), stats *Stats) (bool, error) {
cr := ccs.getReader()
if cr == nil {
return false, ErrNoReader
@@ -268,11 +268,11 @@ func (ecs emptyChunkSource) get(ctx context.Context, h addr, stats *Stats) ([]by
return nil, nil
}
func (ecs emptyChunkSource) getMany(ctx context.Context, eg *errgroup.Group, reqs []getRecord, found func(*chunks.Chunk), stats *Stats) (bool, error) {
func (ecs emptyChunkSource) getMany(ctx context.Context, eg *errgroup.Group, reqs []getRecord, found func(context.Context, *chunks.Chunk), stats *Stats) (bool, error) {
return true, nil
}
func (ecs emptyChunkSource) getManyCompressed(ctx context.Context, eg *errgroup.Group, reqs []getRecord, found func(CompressedChunk), stats *Stats) (bool, error) {
func (ecs emptyChunkSource) getManyCompressed(ctx context.Context, eg *errgroup.Group, reqs []getRecord, found func(context.Context, CompressedChunk), stats *Stats) (bool, error) {
return true, nil
}
+6 -1
@@ -119,7 +119,12 @@ func TestStats(t *testing.T) {
hashes[i] = c.Hash()
}
chunkChan := make(chan *chunks.Chunk, 3)
err = store.GetMany(context.Background(), hashes.HashSet(), func(c *chunks.Chunk) { chunkChan <- c })
err = store.GetMany(context.Background(), hashes.HashSet(), func(ctx context.Context, c *chunks.Chunk) {
select {
case chunkChan <- c:
case <-ctx.Done():
}
})
require.NoError(t, err)
assert.Equal(uint64(4), stats(store).FileReadLatency.Samples())
assert.Equal(uint64(54), stats(store).FileBytesPerRead.Sum())
+4 -4
@@ -82,7 +82,7 @@ func makeGlobalCaches() {
type NBSCompressedChunkStore interface {
chunks.ChunkStore
GetManyCompressed(context.Context, hash.HashSet, func(CompressedChunk)) error
GetManyCompressed(context.Context, hash.HashSet, func(context.Context, CompressedChunk)) error
}
type NomsBlockStore struct {
@@ -625,7 +625,7 @@ func (nbs *NomsBlockStore) Get(ctx context.Context, h hash.Hash) (chunks.Chunk,
return chunks.EmptyChunk, nil
}
func (nbs *NomsBlockStore) GetMany(ctx context.Context, hashes hash.HashSet, found func(*chunks.Chunk)) error {
func (nbs *NomsBlockStore) GetMany(ctx context.Context, hashes hash.HashSet, found func(context.Context, *chunks.Chunk)) error {
span, ctx := tracing.StartSpan(ctx, "nbs.GetMany")
span.LogKV("num_hashes", len(hashes))
defer func() {
@@ -636,7 +636,7 @@ func (nbs *NomsBlockStore) GetMany(ctx context.Context, hashes hash.HashSet, fou
})
}
func (nbs *NomsBlockStore) GetManyCompressed(ctx context.Context, hashes hash.HashSet, found func(CompressedChunk)) error {
func (nbs *NomsBlockStore) GetManyCompressed(ctx context.Context, hashes hash.HashSet, found func(context.Context, CompressedChunk)) error {
span, ctx := tracing.StartSpan(ctx, "nbs.GetManyCompressed")
span.LogKV("num_hashes", len(hashes))
defer func() {
@@ -1486,7 +1486,7 @@ LOOP:
var addErr error
mu := new(sync.Mutex)
hashset := hash.NewHashSet(hs...)
err := nbs.GetManyCompressed(ctx, hashset, func(c CompressedChunk) {
err := nbs.GetManyCompressed(ctx, hashset, func(ctx context.Context, c CompressedChunk) {
mu.Lock()
defer mu.Unlock()
if addErr != nil {
+4 -4
@@ -227,8 +227,8 @@ type chunkReader interface {
has(h addr) (bool, error)
hasMany(addrs []hasRecord) (bool, error)
get(ctx context.Context, h addr, stats *Stats) ([]byte, error)
getMany(ctx context.Context, eg *errgroup.Group, reqs []getRecord, found func(*chunks.Chunk), stats *Stats) (bool, error)
getManyCompressed(ctx context.Context, eg *errgroup.Group, reqs []getRecord, found func(CompressedChunk), stats *Stats) (bool, error)
getMany(ctx context.Context, eg *errgroup.Group, reqs []getRecord, found func(context.Context, *chunks.Chunk), stats *Stats) (bool, error)
getManyCompressed(ctx context.Context, eg *errgroup.Group, reqs []getRecord, found func(context.Context, CompressedChunk), stats *Stats) (bool, error)
extract(ctx context.Context, chunks chan<- extractRecord) error
count() (uint32, error)
uncompressedLen() (uint64, error)
@@ -243,14 +243,14 @@ type chunkReadPlanner interface {
ctx context.Context,
eg *errgroup.Group,
offsetRecords offsetRecSlice,
found func(*chunks.Chunk),
found func(context.Context, *chunks.Chunk),
stats *Stats,
) error
getManyCompressedAtOffsets(
ctx context.Context,
eg *errgroup.Group,
offsetRecords offsetRecSlice,
found func(CompressedChunk),
found func(context.Context, CompressedChunk),
stats *Stats,
) error
}
+12 -12
@@ -675,11 +675,11 @@ var _ chunkReader = tableReader{}
func (tr tableReader) readCompressedAtOffsets(
ctx context.Context,
rb readBatch,
found func(CompressedChunk),
found func(context.Context, CompressedChunk),
stats *Stats,
) error {
return tr.readAtOffsetsWithCB(ctx, rb, stats, func(cmp CompressedChunk) error {
found(cmp)
return tr.readAtOffsetsWithCB(ctx, rb, stats, func(ctx context.Context, cmp CompressedChunk) error {
found(ctx, cmp)
return nil
})
}
@@ -687,17 +687,17 @@ func (tr tableReader) readCompressedAtOffsets(
func (tr tableReader) readAtOffsets(
ctx context.Context,
rb readBatch,
found func(*chunks.Chunk),
found func(context.Context, *chunks.Chunk),
stats *Stats,
) error {
return tr.readAtOffsetsWithCB(ctx, rb, stats, func(cmp CompressedChunk) error {
return tr.readAtOffsetsWithCB(ctx, rb, stats, func(ctx context.Context, cmp CompressedChunk) error {
chk, err := cmp.ToChunk()
if err != nil {
return err
}
found(&chk)
found(ctx, &chk)
return nil
})
}
@@ -706,7 +706,7 @@ func (tr tableReader) readAtOffsetsWithCB(
ctx context.Context,
rb readBatch,
stats *Stats,
cb func(cmp CompressedChunk) error,
cb func(ctx context.Context, cmp CompressedChunk) error,
) error {
readLength := rb.End() - rb.Start()
buff := make([]byte, readLength)
@@ -726,7 +726,7 @@ func (tr tableReader) readAtOffsetsWithCB(
return err
}
err = cb(cmp)
err = cb(ctx, cmp)
if err != nil {
return err
}
@@ -741,7 +741,7 @@ func (tr tableReader) getMany(
ctx context.Context,
eg *errgroup.Group,
reqs []getRecord,
found func(*chunks.Chunk),
found func(context.Context, *chunks.Chunk),
stats *Stats) (bool, error) {
// Pass #1: Iterate over |reqs| and |tr.prefixes| (both sorted by address) and build the set
@@ -750,7 +750,7 @@ func (tr tableReader) getMany(
err := tr.getManyAtOffsets(ctx, eg, offsetRecords, found, stats)
return remaining, err
}
func (tr tableReader) getManyCompressed(ctx context.Context, eg *errgroup.Group, reqs []getRecord, found func(CompressedChunk), stats *Stats) (bool, error) {
func (tr tableReader) getManyCompressed(ctx context.Context, eg *errgroup.Group, reqs []getRecord, found func(context.Context, CompressedChunk), stats *Stats) (bool, error) {
// Pass #1: Iterate over |reqs| and |tr.prefixes| (both sorted by address) and build the set
// of table locations which must be read in order to satisfy the getMany operation.
offsetRecords, remaining := tr.findOffsets(reqs)
@@ -758,7 +758,7 @@ func (tr tableReader) getManyCompressed(ctx context.Context, eg *errgroup.Group,
return remaining, err
}
func (tr tableReader) getManyCompressedAtOffsets(ctx context.Context, eg *errgroup.Group, offsetRecords offsetRecSlice, found func(CompressedChunk), stats *Stats) error {
func (tr tableReader) getManyCompressedAtOffsets(ctx context.Context, eg *errgroup.Group, offsetRecords offsetRecSlice, found func(context.Context, CompressedChunk), stats *Stats) error {
return tr.getManyAtOffsetsWithReadFunc(ctx, eg, offsetRecords, stats, func(
ctx context.Context,
rb readBatch,
@@ -771,7 +771,7 @@ func (tr tableReader) getManyAtOffsets(
ctx context.Context,
eg *errgroup.Group,
offsetRecords offsetRecSlice,
found func(*chunks.Chunk),
found func(context.Context, *chunks.Chunk),
stats *Stats,
) error {
return tr.getManyAtOffsetsWithReadFunc(ctx, eg, offsetRecords, stats, func(
+2 -2
@@ -133,7 +133,7 @@ func (ts tableSet) get(ctx context.Context, h addr, stats *Stats) ([]byte, error
return f(ts.upstream)
}
func (ts tableSet) getMany(ctx context.Context, eg *errgroup.Group, reqs []getRecord, found func(*chunks.Chunk), stats *Stats) (remaining bool, err error) {
func (ts tableSet) getMany(ctx context.Context, eg *errgroup.Group, reqs []getRecord, found func(context.Context, *chunks.Chunk), stats *Stats) (remaining bool, err error) {
f := func(css chunkSources) bool {
for _, haver := range css {
if rp, ok := haver.(chunkReadPlanner); ok {
@@ -161,7 +161,7 @@ func (ts tableSet) getMany(ctx context.Context, eg *errgroup.Group, reqs []getRe
return f(ts.novel) && err == nil && f(ts.upstream), err
}
func (ts tableSet) getManyCompressed(ctx context.Context, eg *errgroup.Group, reqs []getRecord, found func(CompressedChunk), stats *Stats) (remaining bool, err error) {
func (ts tableSet) getManyCompressed(ctx context.Context, eg *errgroup.Group, reqs []getRecord, found func(context.Context, CompressedChunk), stats *Stats) (remaining bool, err error) {
f := func(css chunkSources) bool {
for _, haver := range css {
if rp, ok := haver.(chunkReadPlanner); ok {
+2 -2
@@ -216,7 +216,7 @@ func TestGetMany(t *testing.T) {
eg, ctx := errgroup.WithContext(context.Background())
got := make([]*chunks.Chunk, 0)
_, err = tr.getMany(ctx, eg, getBatch, func(c *chunks.Chunk) { got = append(got, c) }, &Stats{})
_, err = tr.getMany(ctx, eg, getBatch, func(ctx context.Context, c *chunks.Chunk) { got = append(got, c) }, &Stats{})
require.NoError(t, err)
require.NoError(t, eg.Wait())
@@ -375,7 +375,7 @@ func doTestNGetMany(t *testing.T, count int) {
eg, ctx := errgroup.WithContext(context.Background())
got := make([]*chunks.Chunk, 0)
_, err = tr.getMany(ctx, eg, getBatch, func(c *chunks.Chunk) { got = append(got, c) }, &Stats{})
_, err = tr.getMany(ctx, eg, getBatch, func(ctx context.Context, c *chunks.Chunk) { got = append(got, c) }, &Stats{})
require.NoError(t, err)
require.NoError(t, eg.Wait())
+1 -1
@@ -267,7 +267,7 @@ func (lvs *ValueStore) ReadManyValues(ctx context.Context, hashes hash.HashSlice
if len(remaining) != 0 {
mu := new(sync.Mutex)
var decodeErr error
err := lvs.cs.GetMany(ctx, remaining, func(c *chunks.Chunk) {
err := lvs.cs.GetMany(ctx, remaining, func(ctx context.Context, c *chunks.Chunk) {
mu.Lock()
defer mu.Unlock()
if decodeErr != nil {
+2 -2
@@ -126,7 +126,7 @@ func (f *FileValueStore) Get(ctx context.Context, h hash.Hash) (chunks.Chunk, er
}
// GetMany gets chunks by their hashes. Chunks that are found are passed to the found callback.
func (f *FileValueStore) GetMany(ctx context.Context, hashes hash.HashSet, found func(*chunks.Chunk)) error {
func (f *FileValueStore) GetMany(ctx context.Context, hashes hash.HashSet, found func(context.Context, *chunks.Chunk)) error {
f.chunkLock.Lock()
defer f.chunkLock.Unlock()
@@ -135,7 +135,7 @@ func (f *FileValueStore) GetMany(ctx context.Context, hashes hash.HashSet, found
if ok {
ch := chunks.NewChunkWithHash(h, data)
found(&ch)
found(ctx, &ch)
}
}
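Taken together, migrating a GetMany caller to the new signature just means accepting the callback's context argument (and honoring it where the callback may outlive a cancelled request). A hedged end-to-end example, assuming cs is any chunks.ChunkStore and hashes a hash.HashSet:

// Collect every found chunk; callbacks may run concurrently, so guard
// the shared map with a mutex, as the ValueStore caller above does.
var mu sync.Mutex
got := make(map[hash.Hash]*chunks.Chunk)
err := cs.GetMany(ctx, hashes, func(ctx context.Context, c *chunks.Chunk) {
	mu.Lock()
	defer mu.Unlock()
	got[c.Hash()] = c
})
if err != nil {
	return err
}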