mirror of
https://github.com/dolthub/dolt.git
synced 2026-03-18 17:50:48 -05:00
store/{nbs,chunks}: Make ChunkStore#GetMany{,Compressed} take send-only channels.
This commit is contained in:
@@ -160,7 +160,7 @@ func (dcs *DoltChunkStore) Get(ctx context.Context, h hash.Hash) (chunks.Chunk,
|
||||
}
|
||||
}
|
||||
|
||||
func (dcs *DoltChunkStore) GetMany(ctx context.Context, hashes hash.HashSet, foundChunks chan *chunks.Chunk) error {
|
||||
func (dcs *DoltChunkStore) GetMany(ctx context.Context, hashes hash.HashSet, foundChunks chan<- *chunks.Chunk) error {
|
||||
ae := atomicerr.New()
|
||||
wg := &sync.WaitGroup{}
|
||||
foundCmp := make(chan chunks.Chunkable, 1024)
|
||||
@@ -204,7 +204,7 @@ func (dcs *DoltChunkStore) GetMany(ctx context.Context, hashes hash.HashSet, fou
|
||||
|
||||
// GetMany gets the Chunks with |hashes| from the store. On return, |foundChunks| will have been fully sent all chunks
|
||||
// which have been found. Any non-present chunks will silently be ignored.
|
||||
func (dcs *DoltChunkStore) GetManyCompressed(ctx context.Context, hashes hash.HashSet, foundChunks chan chunks.Chunkable) error {
|
||||
func (dcs *DoltChunkStore) GetManyCompressed(ctx context.Context, hashes hash.HashSet, foundChunks chan<- chunks.Chunkable) error {
|
||||
hashToChunk := dcs.cache.Get(hashes)
|
||||
|
||||
notCached := make([]hash.Hash, 0, len(hashes))
|
||||
@@ -323,7 +323,7 @@ func (dcs *DoltChunkStore) getDLLocs(ctx context.Context, hashes []hash.Hash) (m
|
||||
return resourceToUrlAndRanges, nil
|
||||
}
|
||||
|
||||
func (dcs *DoltChunkStore) readChunksAndCache(ctx context.Context, hashes hash.HashSet, notCached []hash.Hash, foundChunks chan chunks.Chunkable) error {
|
||||
func (dcs *DoltChunkStore) readChunksAndCache(ctx context.Context, hashes hash.HashSet, notCached []hash.Hash, foundChunks chan<- chunks.Chunkable) error {
|
||||
// get the locations where the chunks can be downloaded from
|
||||
resourceToUrlAndRanges, err := dcs.getDLLocs(ctx, notCached)
|
||||
|
||||
|
||||
@@ -51,11 +51,11 @@ type ChunkStore interface {
|
||||
// GetMany gets the Chunks with |hashes| from the store. On return,
|
||||
// |foundChunks| will have been fully sent all chunks which have been
|
||||
// found. Any non-present chunks will silently be ignored.
|
||||
GetMany(ctx context.Context, hashes hash.HashSet, foundChunks chan *Chunk) error
|
||||
GetMany(ctx context.Context, hashes hash.HashSet, foundChunks chan<- *Chunk) error
|
||||
|
||||
// GetManyCompressed gets the Chunkable obects with |hashes| from the store. On return, |foundChunks| will have been
|
||||
// fully sent all chunks which have been found. Any non-present chunks will silently be ignored.
|
||||
GetManyCompressed(ctx context.Context, hashes hash.HashSet, foundCmpChunks chan Chunkable) error
|
||||
GetManyCompressed(ctx context.Context, hashes hash.HashSet, foundCmpChunks chan<- Chunkable) error
|
||||
|
||||
// Returns true iff the value at the address |h| is contained in the
|
||||
// store
|
||||
|
||||
@@ -130,7 +130,7 @@ func (ms *MemoryStoreView) Get(ctx context.Context, h hash.Hash) (Chunk, error)
|
||||
return ms.storage.Get(ctx, h)
|
||||
}
|
||||
|
||||
func (ms *MemoryStoreView) GetMany(ctx context.Context, hashes hash.HashSet, foundChunks chan *Chunk) error {
|
||||
func (ms *MemoryStoreView) GetMany(ctx context.Context, hashes hash.HashSet, foundChunks chan<- *Chunk) error {
|
||||
for h := range hashes {
|
||||
c, err := ms.Get(ctx, h)
|
||||
|
||||
@@ -146,7 +146,7 @@ func (ms *MemoryStoreView) GetMany(ctx context.Context, hashes hash.HashSet, fou
|
||||
return nil
|
||||
}
|
||||
|
||||
func (ms *MemoryStoreView) GetManyCompressed(ctx context.Context, hashes hash.HashSet, foundChunks chan Chunkable) error {
|
||||
func (ms *MemoryStoreView) GetManyCompressed(ctx context.Context, hashes hash.HashSet, foundChunks chan<- Chunkable) error {
|
||||
for h := range hashes {
|
||||
c, err := ms.Get(ctx, h)
|
||||
|
||||
|
||||
@@ -63,12 +63,12 @@ func (s *TestStoreView) Get(ctx context.Context, h hash.Hash) (Chunk, error) {
|
||||
return s.ChunkStore.Get(ctx, h)
|
||||
}
|
||||
|
||||
func (s *TestStoreView) GetMany(ctx context.Context, hashes hash.HashSet, foundChunks chan *Chunk) error {
|
||||
func (s *TestStoreView) GetMany(ctx context.Context, hashes hash.HashSet, foundChunks chan<- *Chunk) error {
|
||||
s.Reads += len(hashes)
|
||||
return s.ChunkStore.GetMany(ctx, hashes, foundChunks)
|
||||
}
|
||||
|
||||
func (s *TestStoreView) GetManyCompressed(ctx context.Context, hashes hash.HashSet, foundCmpChunks chan Chunkable) error {
|
||||
func (s *TestStoreView) GetManyCompressed(ctx context.Context, hashes hash.HashSet, foundCmpChunks chan<- Chunkable) error {
|
||||
s.Reads += len(hashes)
|
||||
return s.ChunkStore.GetManyCompressed(ctx, hashes, foundCmpChunks)
|
||||
}
|
||||
|
||||
@@ -46,11 +46,11 @@ func (fb fileBlockStore) Get(ctx context.Context, h hash.Hash) (chunks.Chunk, er
|
||||
panic("not impl")
|
||||
}
|
||||
|
||||
func (fb fileBlockStore) GetMany(ctx context.Context, hashes hash.HashSet, foundChunks chan *chunks.Chunk) error {
|
||||
func (fb fileBlockStore) GetMany(ctx context.Context, hashes hash.HashSet, foundChunks chan<- *chunks.Chunk) error {
|
||||
panic("not impl")
|
||||
}
|
||||
|
||||
func (fb fileBlockStore) GetManyCompressed(ctx context.Context, hashes hash.HashSet, foundChunks chan chunks.Chunkable) error {
|
||||
func (fb fileBlockStore) GetManyCompressed(ctx context.Context, hashes hash.HashSet, foundChunks chan<- chunks.Chunkable) error {
|
||||
panic("not impl")
|
||||
}
|
||||
|
||||
|
||||
@@ -39,11 +39,11 @@ func (nb nullBlockStore) Get(ctx context.Context, h hash.Hash) (chunks.Chunk, er
|
||||
panic("not impl")
|
||||
}
|
||||
|
||||
func (nb nullBlockStore) GetMany(ctx context.Context, hashes hash.HashSet, foundChunks chan *chunks.Chunk) error {
|
||||
func (nb nullBlockStore) GetMany(ctx context.Context, hashes hash.HashSet, foundChunks chan<- *chunks.Chunk) error {
|
||||
panic("not impl")
|
||||
}
|
||||
|
||||
func (nb nullBlockStore) GetManyCompressed(ctx context.Context, hashes hash.HashSet, foundChunks chan chunks.Chunkable) error {
|
||||
func (nb nullBlockStore) GetManyCompressed(ctx context.Context, hashes hash.HashSet, foundChunks chan<- chunks.Chunkable) error {
|
||||
panic("not impl")
|
||||
}
|
||||
|
||||
|
||||
@@ -138,7 +138,7 @@ func (mt *memTable) get(ctx context.Context, h addr, stats *Stats) ([]byte, erro
|
||||
return mt.chunks[h], nil
|
||||
}
|
||||
|
||||
func (mt *memTable) getMany(ctx context.Context, reqs []getRecord, foundChunks chan *chunks.Chunk, wg *sync.WaitGroup, ae *atomicerr.AtomicError, stats *Stats) bool {
|
||||
func (mt *memTable) getMany(ctx context.Context, reqs []getRecord, foundChunks chan<- *chunks.Chunk, wg *sync.WaitGroup, ae *atomicerr.AtomicError, stats *Stats) bool {
|
||||
var remaining bool
|
||||
for _, r := range reqs {
|
||||
data := mt.chunks[*r.a]
|
||||
@@ -153,7 +153,7 @@ func (mt *memTable) getMany(ctx context.Context, reqs []getRecord, foundChunks c
|
||||
return remaining
|
||||
}
|
||||
|
||||
func (mt *memTable) getManyCompressed(ctx context.Context, reqs []getRecord, foundCmpChunks chan chunks.Chunkable, wg *sync.WaitGroup, ae *atomicerr.AtomicError, stats *Stats) bool {
|
||||
func (mt *memTable) getManyCompressed(ctx context.Context, reqs []getRecord, foundCmpChunks chan<- chunks.Chunkable, wg *sync.WaitGroup, ae *atomicerr.AtomicError, stats *Stats) bool {
|
||||
panic("not implemented")
|
||||
}
|
||||
|
||||
|
||||
@@ -265,7 +265,7 @@ func (crg chunkReaderGroup) hasMany(addrs []hasRecord) (bool, error) {
|
||||
return true, nil
|
||||
}
|
||||
|
||||
func (crg chunkReaderGroup) getMany(ctx context.Context, reqs []getRecord, foundChunks chan *chunks.Chunk, wg *sync.WaitGroup, ae *atomicerr.AtomicError, stats *Stats) bool {
|
||||
func (crg chunkReaderGroup) getMany(ctx context.Context, reqs []getRecord, foundChunks chan<- *chunks.Chunk, wg *sync.WaitGroup, ae *atomicerr.AtomicError, stats *Stats) bool {
|
||||
for _, haver := range crg {
|
||||
remaining := haver.getMany(ctx, reqs, foundChunks, wg, ae, stats)
|
||||
|
||||
@@ -277,7 +277,7 @@ func (crg chunkReaderGroup) getMany(ctx context.Context, reqs []getRecord, found
|
||||
return true
|
||||
}
|
||||
|
||||
func (crg chunkReaderGroup) getManyCompressed(ctx context.Context, reqs []getRecord, foundCmpChunks chan chunks.Chunkable, wg *sync.WaitGroup, ae *atomicerr.AtomicError, stats *Stats) bool {
|
||||
func (crg chunkReaderGroup) getManyCompressed(ctx context.Context, reqs []getRecord, foundCmpChunks chan<- chunks.Chunkable, wg *sync.WaitGroup, ae *atomicerr.AtomicError, stats *Stats) bool {
|
||||
for _, haver := range crg {
|
||||
remaining := haver.getManyCompressed(ctx, reqs, foundCmpChunks, wg, ae, stats)
|
||||
|
||||
|
||||
@@ -123,7 +123,7 @@ func (ccs *persistingChunkSource) get(ctx context.Context, h addr, stats *Stats)
|
||||
return cr.get(ctx, h, stats)
|
||||
}
|
||||
|
||||
func (ccs *persistingChunkSource) getMany(ctx context.Context, reqs []getRecord, foundChunks chan *chunks.Chunk, wg *sync.WaitGroup, ae *atomicerr.AtomicError, stats *Stats) bool {
|
||||
func (ccs *persistingChunkSource) getMany(ctx context.Context, reqs []getRecord, foundChunks chan<- *chunks.Chunk, wg *sync.WaitGroup, ae *atomicerr.AtomicError, stats *Stats) bool {
|
||||
cr := ccs.getReader()
|
||||
|
||||
if cr == nil {
|
||||
@@ -134,7 +134,7 @@ func (ccs *persistingChunkSource) getMany(ctx context.Context, reqs []getRecord,
|
||||
return cr.getMany(ctx, reqs, foundChunks, wg, ae, stats)
|
||||
}
|
||||
|
||||
func (ccs *persistingChunkSource) getManyCompressed(ctx context.Context, reqs []getRecord, foundCmpChunks chan chunks.Chunkable, wg *sync.WaitGroup, ae *atomicerr.AtomicError, stats *Stats) bool {
|
||||
func (ccs *persistingChunkSource) getManyCompressed(ctx context.Context, reqs []getRecord, foundCmpChunks chan<- chunks.Chunkable, wg *sync.WaitGroup, ae *atomicerr.AtomicError, stats *Stats) bool {
|
||||
cr := ccs.getReader()
|
||||
|
||||
if cr == nil {
|
||||
@@ -262,11 +262,11 @@ func (ecs emptyChunkSource) get(ctx context.Context, h addr, stats *Stats) ([]by
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (ecs emptyChunkSource) getMany(ctx context.Context, reqs []getRecord, foundChunks chan *chunks.Chunk, wg *sync.WaitGroup, ae *atomicerr.AtomicError, stats *Stats) bool {
|
||||
func (ecs emptyChunkSource) getMany(ctx context.Context, reqs []getRecord, foundChunks chan<- *chunks.Chunk, wg *sync.WaitGroup, ae *atomicerr.AtomicError, stats *Stats) bool {
|
||||
return true
|
||||
}
|
||||
|
||||
func (ecs emptyChunkSource) getManyCompressed(ctx context.Context, reqs []getRecord, foundCmpChunks chan chunks.Chunkable, wg *sync.WaitGroup, ae *atomicerr.AtomicError, stats *Stats) bool {
|
||||
func (ecs emptyChunkSource) getManyCompressed(ctx context.Context, reqs []getRecord, foundCmpChunks chan<- chunks.Chunkable, wg *sync.WaitGroup, ae *atomicerr.AtomicError, stats *Stats) bool {
|
||||
return true
|
||||
}
|
||||
|
||||
|
||||
@@ -404,13 +404,13 @@ func (nbs *NomsBlockStore) Get(ctx context.Context, h hash.Hash) (chunks.Chunk,
|
||||
return chunks.EmptyChunk, nil
|
||||
}
|
||||
|
||||
func (nbs *NomsBlockStore) GetMany(ctx context.Context, hashes hash.HashSet, foundChunks chan *chunks.Chunk) error {
|
||||
func (nbs *NomsBlockStore) GetMany(ctx context.Context, hashes hash.HashSet, foundChunks chan<- *chunks.Chunk) error {
|
||||
return nbs.getManyWithFunc(ctx, hashes, func(ctx context.Context, cr chunkReader, reqs []getRecord, wg *sync.WaitGroup, ae *atomicerr.AtomicError, stats *Stats) bool {
|
||||
return cr.getMany(ctx, reqs, foundChunks, wg, ae, nbs.stats)
|
||||
})
|
||||
}
|
||||
|
||||
func (nbs *NomsBlockStore) GetManyCompressed(ctx context.Context, hashes hash.HashSet, foundCmpChunks chan chunks.Chunkable) error {
|
||||
func (nbs *NomsBlockStore) GetManyCompressed(ctx context.Context, hashes hash.HashSet, foundCmpChunks chan<- chunks.Chunkable) error {
|
||||
return nbs.getManyWithFunc(ctx, hashes, func(ctx context.Context, cr chunkReader, reqs []getRecord, wg *sync.WaitGroup, ae *atomicerr.AtomicError, stats *Stats) bool {
|
||||
return cr.getManyCompressed(ctx, reqs, foundCmpChunks, wg, ae, nbs.stats)
|
||||
})
|
||||
|
||||
@@ -227,8 +227,8 @@ type chunkReader interface {
|
||||
has(h addr) (bool, error)
|
||||
hasMany(addrs []hasRecord) (bool, error)
|
||||
get(ctx context.Context, h addr, stats *Stats) ([]byte, error)
|
||||
getMany(ctx context.Context, reqs []getRecord, foundChunks chan *chunks.Chunk, wg *sync.WaitGroup, ae *atomicerr.AtomicError, stats *Stats) bool
|
||||
getManyCompressed(ctx context.Context, reqs []getRecord, foundCmpChunks chan chunks.Chunkable, wg *sync.WaitGroup, ae *atomicerr.AtomicError, stats *Stats) bool
|
||||
getMany(ctx context.Context, reqs []getRecord, foundChunks chan<- *chunks.Chunk, wg *sync.WaitGroup, ae *atomicerr.AtomicError, stats *Stats) bool
|
||||
getManyCompressed(ctx context.Context, reqs []getRecord, foundCmpChunks chan<- chunks.Chunkable, wg *sync.WaitGroup, ae *atomicerr.AtomicError, stats *Stats) bool
|
||||
extract(ctx context.Context, chunks chan<- extractRecord) error
|
||||
count() (uint32, error)
|
||||
uncompressedLen() (uint64, error)
|
||||
@@ -240,7 +240,7 @@ type chunkReadPlanner interface {
|
||||
ctx context.Context,
|
||||
reqs []getRecord,
|
||||
offsetRecords offsetRecSlice,
|
||||
foundChunks chan *chunks.Chunk,
|
||||
foundChunks chan<- *chunks.Chunk,
|
||||
wg *sync.WaitGroup,
|
||||
ae *atomicerr.AtomicError,
|
||||
stats *Stats,
|
||||
@@ -249,7 +249,7 @@ type chunkReadPlanner interface {
|
||||
ctx context.Context,
|
||||
reqs []getRecord,
|
||||
offsetRecords offsetRecSlice,
|
||||
foundCmpChunks chan chunks.Chunkable,
|
||||
foundCmpChunks chan<- chunks.Chunkable,
|
||||
wg *sync.WaitGroup,
|
||||
ae *atomicerr.AtomicError,
|
||||
stats *Stats,
|
||||
|
||||
@@ -383,7 +383,7 @@ func (tr tableReader) readCompressedAtOffsets(
|
||||
readStart, readEnd uint64,
|
||||
reqs []getRecord,
|
||||
offsets offsetRecSlice,
|
||||
foundCmpChunks chan chunks.Chunkable,
|
||||
foundCmpChunks chan<- chunks.Chunkable,
|
||||
stats *Stats,
|
||||
) error {
|
||||
return tr.readAtOffsetsWithCB(ctx, readStart, readEnd, reqs, offsets, stats, func(cmp CompressedChunk) error {
|
||||
@@ -397,7 +397,7 @@ func (tr tableReader) readAtOffsets(
|
||||
readStart, readEnd uint64,
|
||||
reqs []getRecord,
|
||||
offsets offsetRecSlice,
|
||||
foundChunks chan *chunks.Chunk,
|
||||
foundChunks chan<- *chunks.Chunk,
|
||||
stats *Stats,
|
||||
) error {
|
||||
return tr.readAtOffsetsWithCB(ctx, readStart, readEnd, reqs, offsets, stats, func(cmp CompressedChunk) error {
|
||||
@@ -466,7 +466,7 @@ func (tr tableReader) readAtOffsetsWithCB(
|
||||
func (tr tableReader) getMany(
|
||||
ctx context.Context,
|
||||
reqs []getRecord,
|
||||
foundChunks chan *chunks.Chunk,
|
||||
foundChunks chan<- *chunks.Chunk,
|
||||
wg *sync.WaitGroup,
|
||||
ae *atomicerr.AtomicError,
|
||||
stats *Stats) bool {
|
||||
@@ -477,7 +477,7 @@ func (tr tableReader) getMany(
|
||||
tr.getManyAtOffsets(ctx, reqs, offsetRecords, foundChunks, wg, ae, stats)
|
||||
return remaining
|
||||
}
|
||||
func (tr tableReader) getManyCompressed(ctx context.Context, reqs []getRecord, foundCmpChunks chan chunks.Chunkable, wg *sync.WaitGroup, ae *atomicerr.AtomicError, stats *Stats) bool {
|
||||
func (tr tableReader) getManyCompressed(ctx context.Context, reqs []getRecord, foundCmpChunks chan<- chunks.Chunkable, wg *sync.WaitGroup, ae *atomicerr.AtomicError, stats *Stats) bool {
|
||||
// Pass #1: Iterate over |reqs| and |tr.prefixes| (both sorted by address) and build the set
|
||||
// of table locations which must be read in order to satisfy the getMany operation.
|
||||
offsetRecords, remaining := tr.findOffsets(reqs)
|
||||
@@ -485,7 +485,7 @@ func (tr tableReader) getManyCompressed(ctx context.Context, reqs []getRecord, f
|
||||
return remaining
|
||||
}
|
||||
|
||||
func (tr tableReader) getManyCompressedAtOffsets(ctx context.Context, reqs []getRecord, offsetRecords offsetRecSlice, foundCmpChunks chan chunks.Chunkable, wg *sync.WaitGroup, ae *atomicerr.AtomicError, stats *Stats) {
|
||||
func (tr tableReader) getManyCompressedAtOffsets(ctx context.Context, reqs []getRecord, offsetRecords offsetRecSlice, foundCmpChunks chan<- chunks.Chunkable, wg *sync.WaitGroup, ae *atomicerr.AtomicError, stats *Stats) {
|
||||
tr.getManyAtOffsetsWithReadFunc(ctx, reqs, offsetRecords, wg, ae, stats, func(
|
||||
ctx context.Context,
|
||||
readStart, readEnd uint64,
|
||||
@@ -501,7 +501,7 @@ func (tr tableReader) getManyAtOffsets(
|
||||
ctx context.Context,
|
||||
reqs []getRecord,
|
||||
offsetRecords offsetRecSlice,
|
||||
foundChunks chan *chunks.Chunk,
|
||||
foundChunks chan<- *chunks.Chunk,
|
||||
wg *sync.WaitGroup,
|
||||
ae *atomicerr.AtomicError,
|
||||
stats *Stats,
|
||||
|
||||
@@ -131,7 +131,7 @@ func (ts tableSet) get(ctx context.Context, h addr, stats *Stats) ([]byte, error
|
||||
return f(ts.upstream)
|
||||
}
|
||||
|
||||
func (ts tableSet) getMany(ctx context.Context, reqs []getRecord, foundChunks chan *chunks.Chunk, wg *sync.WaitGroup, ae *atomicerr.AtomicError, stats *Stats) bool {
|
||||
func (ts tableSet) getMany(ctx context.Context, reqs []getRecord, foundChunks chan<- *chunks.Chunk, wg *sync.WaitGroup, ae *atomicerr.AtomicError, stats *Stats) bool {
|
||||
f := func(css chunkSources) bool {
|
||||
for _, haver := range css {
|
||||
if ae.IsSet() {
|
||||
@@ -163,7 +163,7 @@ func (ts tableSet) getMany(ctx context.Context, reqs []getRecord, foundChunks ch
|
||||
return f(ts.novel) && f(ts.upstream)
|
||||
}
|
||||
|
||||
func (ts tableSet) getManyCompressed(ctx context.Context, reqs []getRecord, foundCmpChunks chan chunks.Chunkable, wg *sync.WaitGroup, ae *atomicerr.AtomicError, stats *Stats) bool {
|
||||
func (ts tableSet) getManyCompressed(ctx context.Context, reqs []getRecord, foundCmpChunks chan<- chunks.Chunkable, wg *sync.WaitGroup, ae *atomicerr.AtomicError, stats *Stats) bool {
|
||||
f := func(css chunkSources) bool {
|
||||
for _, haver := range css {
|
||||
if ae.IsSet() {
|
||||
|
||||
Reference in New Issue
Block a user