diff --git a/go/libraries/doltcore/remotestorage/chunk_store.go b/go/libraries/doltcore/remotestorage/chunk_store.go index b35af5cc69..dd34b42f96 100644 --- a/go/libraries/doltcore/remotestorage/chunk_store.go +++ b/go/libraries/doltcore/remotestorage/chunk_store.go @@ -160,7 +160,7 @@ func (dcs *DoltChunkStore) Get(ctx context.Context, h hash.Hash) (chunks.Chunk, } } -func (dcs *DoltChunkStore) GetMany(ctx context.Context, hashes hash.HashSet, foundChunks chan *chunks.Chunk) error { +func (dcs *DoltChunkStore) GetMany(ctx context.Context, hashes hash.HashSet, foundChunks chan<- *chunks.Chunk) error { ae := atomicerr.New() wg := &sync.WaitGroup{} foundCmp := make(chan chunks.Chunkable, 1024) @@ -204,7 +204,7 @@ func (dcs *DoltChunkStore) GetMany(ctx context.Context, hashes hash.HashSet, fou // GetMany gets the Chunks with |hashes| from the store. On return, |foundChunks| will have been fully sent all chunks // which have been found. Any non-present chunks will silently be ignored. -func (dcs *DoltChunkStore) GetManyCompressed(ctx context.Context, hashes hash.HashSet, foundChunks chan chunks.Chunkable) error { +func (dcs *DoltChunkStore) GetManyCompressed(ctx context.Context, hashes hash.HashSet, foundChunks chan<- chunks.Chunkable) error { hashToChunk := dcs.cache.Get(hashes) notCached := make([]hash.Hash, 0, len(hashes)) @@ -323,7 +323,7 @@ func (dcs *DoltChunkStore) getDLLocs(ctx context.Context, hashes []hash.Hash) (m return resourceToUrlAndRanges, nil } -func (dcs *DoltChunkStore) readChunksAndCache(ctx context.Context, hashes hash.HashSet, notCached []hash.Hash, foundChunks chan chunks.Chunkable) error { +func (dcs *DoltChunkStore) readChunksAndCache(ctx context.Context, hashes hash.HashSet, notCached []hash.Hash, foundChunks chan<- chunks.Chunkable) error { // get the locations where the chunks can be downloaded from resourceToUrlAndRanges, err := dcs.getDLLocs(ctx, notCached) diff --git a/go/store/chunks/chunk_store.go b/go/store/chunks/chunk_store.go index 3bfc8985ce..9e1301a292 100644 --- a/go/store/chunks/chunk_store.go +++ b/go/store/chunks/chunk_store.go @@ -51,11 +51,11 @@ type ChunkStore interface { // GetMany gets the Chunks with |hashes| from the store. On return, // |foundChunks| will have been fully sent all chunks which have been // found. Any non-present chunks will silently be ignored. - GetMany(ctx context.Context, hashes hash.HashSet, foundChunks chan *Chunk) error + GetMany(ctx context.Context, hashes hash.HashSet, foundChunks chan<- *Chunk) error // GetManyCompressed gets the Chunkable obects with |hashes| from the store. On return, |foundChunks| will have been // fully sent all chunks which have been found. Any non-present chunks will silently be ignored. - GetManyCompressed(ctx context.Context, hashes hash.HashSet, foundCmpChunks chan Chunkable) error + GetManyCompressed(ctx context.Context, hashes hash.HashSet, foundCmpChunks chan<- Chunkable) error // Returns true iff the value at the address |h| is contained in the // store diff --git a/go/store/chunks/memory_store.go b/go/store/chunks/memory_store.go index 15be44992d..8da24349e4 100644 --- a/go/store/chunks/memory_store.go +++ b/go/store/chunks/memory_store.go @@ -130,7 +130,7 @@ func (ms *MemoryStoreView) Get(ctx context.Context, h hash.Hash) (Chunk, error) return ms.storage.Get(ctx, h) } -func (ms *MemoryStoreView) GetMany(ctx context.Context, hashes hash.HashSet, foundChunks chan *Chunk) error { +func (ms *MemoryStoreView) GetMany(ctx context.Context, hashes hash.HashSet, foundChunks chan<- *Chunk) error { for h := range hashes { c, err := ms.Get(ctx, h) @@ -146,7 +146,7 @@ func (ms *MemoryStoreView) GetMany(ctx context.Context, hashes hash.HashSet, fou return nil } -func (ms *MemoryStoreView) GetManyCompressed(ctx context.Context, hashes hash.HashSet, foundChunks chan Chunkable) error { +func (ms *MemoryStoreView) GetManyCompressed(ctx context.Context, hashes hash.HashSet, foundChunks chan<- Chunkable) error { for h := range hashes { c, err := ms.Get(ctx, h) diff --git a/go/store/chunks/test_utils.go b/go/store/chunks/test_utils.go index fd6335435b..92ddeb3e4a 100644 --- a/go/store/chunks/test_utils.go +++ b/go/store/chunks/test_utils.go @@ -63,12 +63,12 @@ func (s *TestStoreView) Get(ctx context.Context, h hash.Hash) (Chunk, error) { return s.ChunkStore.Get(ctx, h) } -func (s *TestStoreView) GetMany(ctx context.Context, hashes hash.HashSet, foundChunks chan *Chunk) error { +func (s *TestStoreView) GetMany(ctx context.Context, hashes hash.HashSet, foundChunks chan<- *Chunk) error { s.Reads += len(hashes) return s.ChunkStore.GetMany(ctx, hashes, foundChunks) } -func (s *TestStoreView) GetManyCompressed(ctx context.Context, hashes hash.HashSet, foundCmpChunks chan Chunkable) error { +func (s *TestStoreView) GetManyCompressed(ctx context.Context, hashes hash.HashSet, foundCmpChunks chan<- Chunkable) error { s.Reads += len(hashes) return s.ChunkStore.GetManyCompressed(ctx, hashes, foundCmpChunks) } diff --git a/go/store/nbs/benchmarks/file_block_store.go b/go/store/nbs/benchmarks/file_block_store.go index 891f4864fb..60ff4ca866 100644 --- a/go/store/nbs/benchmarks/file_block_store.go +++ b/go/store/nbs/benchmarks/file_block_store.go @@ -46,11 +46,11 @@ func (fb fileBlockStore) Get(ctx context.Context, h hash.Hash) (chunks.Chunk, er panic("not impl") } -func (fb fileBlockStore) GetMany(ctx context.Context, hashes hash.HashSet, foundChunks chan *chunks.Chunk) error { +func (fb fileBlockStore) GetMany(ctx context.Context, hashes hash.HashSet, foundChunks chan<- *chunks.Chunk) error { panic("not impl") } -func (fb fileBlockStore) GetManyCompressed(ctx context.Context, hashes hash.HashSet, foundChunks chan chunks.Chunkable) error { +func (fb fileBlockStore) GetManyCompressed(ctx context.Context, hashes hash.HashSet, foundChunks chan<- chunks.Chunkable) error { panic("not impl") } diff --git a/go/store/nbs/benchmarks/null_block_store.go b/go/store/nbs/benchmarks/null_block_store.go index 541f558cee..f6a2b8cf3e 100644 --- a/go/store/nbs/benchmarks/null_block_store.go +++ b/go/store/nbs/benchmarks/null_block_store.go @@ -39,11 +39,11 @@ func (nb nullBlockStore) Get(ctx context.Context, h hash.Hash) (chunks.Chunk, er panic("not impl") } -func (nb nullBlockStore) GetMany(ctx context.Context, hashes hash.HashSet, foundChunks chan *chunks.Chunk) error { +func (nb nullBlockStore) GetMany(ctx context.Context, hashes hash.HashSet, foundChunks chan<- *chunks.Chunk) error { panic("not impl") } -func (nb nullBlockStore) GetManyCompressed(ctx context.Context, hashes hash.HashSet, foundChunks chan chunks.Chunkable) error { +func (nb nullBlockStore) GetManyCompressed(ctx context.Context, hashes hash.HashSet, foundChunks chan<- chunks.Chunkable) error { panic("not impl") } diff --git a/go/store/nbs/mem_table.go b/go/store/nbs/mem_table.go index 95d517add0..6c66b7a98a 100644 --- a/go/store/nbs/mem_table.go +++ b/go/store/nbs/mem_table.go @@ -138,7 +138,7 @@ func (mt *memTable) get(ctx context.Context, h addr, stats *Stats) ([]byte, erro return mt.chunks[h], nil } -func (mt *memTable) getMany(ctx context.Context, reqs []getRecord, foundChunks chan *chunks.Chunk, wg *sync.WaitGroup, ae *atomicerr.AtomicError, stats *Stats) bool { +func (mt *memTable) getMany(ctx context.Context, reqs []getRecord, foundChunks chan<- *chunks.Chunk, wg *sync.WaitGroup, ae *atomicerr.AtomicError, stats *Stats) bool { var remaining bool for _, r := range reqs { data := mt.chunks[*r.a] @@ -153,7 +153,7 @@ func (mt *memTable) getMany(ctx context.Context, reqs []getRecord, foundChunks c return remaining } -func (mt *memTable) getManyCompressed(ctx context.Context, reqs []getRecord, foundCmpChunks chan chunks.Chunkable, wg *sync.WaitGroup, ae *atomicerr.AtomicError, stats *Stats) bool { +func (mt *memTable) getManyCompressed(ctx context.Context, reqs []getRecord, foundCmpChunks chan<- chunks.Chunkable, wg *sync.WaitGroup, ae *atomicerr.AtomicError, stats *Stats) bool { panic("not implemented") } diff --git a/go/store/nbs/mem_table_test.go b/go/store/nbs/mem_table_test.go index 0c89239ab0..bcd659ded4 100644 --- a/go/store/nbs/mem_table_test.go +++ b/go/store/nbs/mem_table_test.go @@ -265,7 +265,7 @@ func (crg chunkReaderGroup) hasMany(addrs []hasRecord) (bool, error) { return true, nil } -func (crg chunkReaderGroup) getMany(ctx context.Context, reqs []getRecord, foundChunks chan *chunks.Chunk, wg *sync.WaitGroup, ae *atomicerr.AtomicError, stats *Stats) bool { +func (crg chunkReaderGroup) getMany(ctx context.Context, reqs []getRecord, foundChunks chan<- *chunks.Chunk, wg *sync.WaitGroup, ae *atomicerr.AtomicError, stats *Stats) bool { for _, haver := range crg { remaining := haver.getMany(ctx, reqs, foundChunks, wg, ae, stats) @@ -277,7 +277,7 @@ func (crg chunkReaderGroup) getMany(ctx context.Context, reqs []getRecord, found return true } -func (crg chunkReaderGroup) getManyCompressed(ctx context.Context, reqs []getRecord, foundCmpChunks chan chunks.Chunkable, wg *sync.WaitGroup, ae *atomicerr.AtomicError, stats *Stats) bool { +func (crg chunkReaderGroup) getManyCompressed(ctx context.Context, reqs []getRecord, foundCmpChunks chan<- chunks.Chunkable, wg *sync.WaitGroup, ae *atomicerr.AtomicError, stats *Stats) bool { for _, haver := range crg { remaining := haver.getManyCompressed(ctx, reqs, foundCmpChunks, wg, ae, stats) diff --git a/go/store/nbs/persisting_chunk_source.go b/go/store/nbs/persisting_chunk_source.go index a2405fe526..f11d7c84bd 100644 --- a/go/store/nbs/persisting_chunk_source.go +++ b/go/store/nbs/persisting_chunk_source.go @@ -123,7 +123,7 @@ func (ccs *persistingChunkSource) get(ctx context.Context, h addr, stats *Stats) return cr.get(ctx, h, stats) } -func (ccs *persistingChunkSource) getMany(ctx context.Context, reqs []getRecord, foundChunks chan *chunks.Chunk, wg *sync.WaitGroup, ae *atomicerr.AtomicError, stats *Stats) bool { +func (ccs *persistingChunkSource) getMany(ctx context.Context, reqs []getRecord, foundChunks chan<- *chunks.Chunk, wg *sync.WaitGroup, ae *atomicerr.AtomicError, stats *Stats) bool { cr := ccs.getReader() if cr == nil { @@ -134,7 +134,7 @@ func (ccs *persistingChunkSource) getMany(ctx context.Context, reqs []getRecord, return cr.getMany(ctx, reqs, foundChunks, wg, ae, stats) } -func (ccs *persistingChunkSource) getManyCompressed(ctx context.Context, reqs []getRecord, foundCmpChunks chan chunks.Chunkable, wg *sync.WaitGroup, ae *atomicerr.AtomicError, stats *Stats) bool { +func (ccs *persistingChunkSource) getManyCompressed(ctx context.Context, reqs []getRecord, foundCmpChunks chan<- chunks.Chunkable, wg *sync.WaitGroup, ae *atomicerr.AtomicError, stats *Stats) bool { cr := ccs.getReader() if cr == nil { @@ -262,11 +262,11 @@ func (ecs emptyChunkSource) get(ctx context.Context, h addr, stats *Stats) ([]by return nil, nil } -func (ecs emptyChunkSource) getMany(ctx context.Context, reqs []getRecord, foundChunks chan *chunks.Chunk, wg *sync.WaitGroup, ae *atomicerr.AtomicError, stats *Stats) bool { +func (ecs emptyChunkSource) getMany(ctx context.Context, reqs []getRecord, foundChunks chan<- *chunks.Chunk, wg *sync.WaitGroup, ae *atomicerr.AtomicError, stats *Stats) bool { return true } -func (ecs emptyChunkSource) getManyCompressed(ctx context.Context, reqs []getRecord, foundCmpChunks chan chunks.Chunkable, wg *sync.WaitGroup, ae *atomicerr.AtomicError, stats *Stats) bool { +func (ecs emptyChunkSource) getManyCompressed(ctx context.Context, reqs []getRecord, foundCmpChunks chan<- chunks.Chunkable, wg *sync.WaitGroup, ae *atomicerr.AtomicError, stats *Stats) bool { return true } diff --git a/go/store/nbs/store.go b/go/store/nbs/store.go index c101cd0e97..238518c1d3 100644 --- a/go/store/nbs/store.go +++ b/go/store/nbs/store.go @@ -404,13 +404,13 @@ func (nbs *NomsBlockStore) Get(ctx context.Context, h hash.Hash) (chunks.Chunk, return chunks.EmptyChunk, nil } -func (nbs *NomsBlockStore) GetMany(ctx context.Context, hashes hash.HashSet, foundChunks chan *chunks.Chunk) error { +func (nbs *NomsBlockStore) GetMany(ctx context.Context, hashes hash.HashSet, foundChunks chan<- *chunks.Chunk) error { return nbs.getManyWithFunc(ctx, hashes, func(ctx context.Context, cr chunkReader, reqs []getRecord, wg *sync.WaitGroup, ae *atomicerr.AtomicError, stats *Stats) bool { return cr.getMany(ctx, reqs, foundChunks, wg, ae, nbs.stats) }) } -func (nbs *NomsBlockStore) GetManyCompressed(ctx context.Context, hashes hash.HashSet, foundCmpChunks chan chunks.Chunkable) error { +func (nbs *NomsBlockStore) GetManyCompressed(ctx context.Context, hashes hash.HashSet, foundCmpChunks chan<- chunks.Chunkable) error { return nbs.getManyWithFunc(ctx, hashes, func(ctx context.Context, cr chunkReader, reqs []getRecord, wg *sync.WaitGroup, ae *atomicerr.AtomicError, stats *Stats) bool { return cr.getManyCompressed(ctx, reqs, foundCmpChunks, wg, ae, nbs.stats) }) diff --git a/go/store/nbs/table.go b/go/store/nbs/table.go index b8390ff8cd..3f453cf964 100644 --- a/go/store/nbs/table.go +++ b/go/store/nbs/table.go @@ -227,8 +227,8 @@ type chunkReader interface { has(h addr) (bool, error) hasMany(addrs []hasRecord) (bool, error) get(ctx context.Context, h addr, stats *Stats) ([]byte, error) - getMany(ctx context.Context, reqs []getRecord, foundChunks chan *chunks.Chunk, wg *sync.WaitGroup, ae *atomicerr.AtomicError, stats *Stats) bool - getManyCompressed(ctx context.Context, reqs []getRecord, foundCmpChunks chan chunks.Chunkable, wg *sync.WaitGroup, ae *atomicerr.AtomicError, stats *Stats) bool + getMany(ctx context.Context, reqs []getRecord, foundChunks chan<- *chunks.Chunk, wg *sync.WaitGroup, ae *atomicerr.AtomicError, stats *Stats) bool + getManyCompressed(ctx context.Context, reqs []getRecord, foundCmpChunks chan<- chunks.Chunkable, wg *sync.WaitGroup, ae *atomicerr.AtomicError, stats *Stats) bool extract(ctx context.Context, chunks chan<- extractRecord) error count() (uint32, error) uncompressedLen() (uint64, error) @@ -240,7 +240,7 @@ type chunkReadPlanner interface { ctx context.Context, reqs []getRecord, offsetRecords offsetRecSlice, - foundChunks chan *chunks.Chunk, + foundChunks chan<- *chunks.Chunk, wg *sync.WaitGroup, ae *atomicerr.AtomicError, stats *Stats, @@ -249,7 +249,7 @@ type chunkReadPlanner interface { ctx context.Context, reqs []getRecord, offsetRecords offsetRecSlice, - foundCmpChunks chan chunks.Chunkable, + foundCmpChunks chan<- chunks.Chunkable, wg *sync.WaitGroup, ae *atomicerr.AtomicError, stats *Stats, diff --git a/go/store/nbs/table_reader.go b/go/store/nbs/table_reader.go index 8823cea3ea..e2a0dc7d25 100644 --- a/go/store/nbs/table_reader.go +++ b/go/store/nbs/table_reader.go @@ -383,7 +383,7 @@ func (tr tableReader) readCompressedAtOffsets( readStart, readEnd uint64, reqs []getRecord, offsets offsetRecSlice, - foundCmpChunks chan chunks.Chunkable, + foundCmpChunks chan<- chunks.Chunkable, stats *Stats, ) error { return tr.readAtOffsetsWithCB(ctx, readStart, readEnd, reqs, offsets, stats, func(cmp CompressedChunk) error { @@ -397,7 +397,7 @@ func (tr tableReader) readAtOffsets( readStart, readEnd uint64, reqs []getRecord, offsets offsetRecSlice, - foundChunks chan *chunks.Chunk, + foundChunks chan<- *chunks.Chunk, stats *Stats, ) error { return tr.readAtOffsetsWithCB(ctx, readStart, readEnd, reqs, offsets, stats, func(cmp CompressedChunk) error { @@ -466,7 +466,7 @@ func (tr tableReader) readAtOffsetsWithCB( func (tr tableReader) getMany( ctx context.Context, reqs []getRecord, - foundChunks chan *chunks.Chunk, + foundChunks chan<- *chunks.Chunk, wg *sync.WaitGroup, ae *atomicerr.AtomicError, stats *Stats) bool { @@ -477,7 +477,7 @@ func (tr tableReader) getMany( tr.getManyAtOffsets(ctx, reqs, offsetRecords, foundChunks, wg, ae, stats) return remaining } -func (tr tableReader) getManyCompressed(ctx context.Context, reqs []getRecord, foundCmpChunks chan chunks.Chunkable, wg *sync.WaitGroup, ae *atomicerr.AtomicError, stats *Stats) bool { +func (tr tableReader) getManyCompressed(ctx context.Context, reqs []getRecord, foundCmpChunks chan<- chunks.Chunkable, wg *sync.WaitGroup, ae *atomicerr.AtomicError, stats *Stats) bool { // Pass #1: Iterate over |reqs| and |tr.prefixes| (both sorted by address) and build the set // of table locations which must be read in order to satisfy the getMany operation. offsetRecords, remaining := tr.findOffsets(reqs) @@ -485,7 +485,7 @@ func (tr tableReader) getManyCompressed(ctx context.Context, reqs []getRecord, f return remaining } -func (tr tableReader) getManyCompressedAtOffsets(ctx context.Context, reqs []getRecord, offsetRecords offsetRecSlice, foundCmpChunks chan chunks.Chunkable, wg *sync.WaitGroup, ae *atomicerr.AtomicError, stats *Stats) { +func (tr tableReader) getManyCompressedAtOffsets(ctx context.Context, reqs []getRecord, offsetRecords offsetRecSlice, foundCmpChunks chan<- chunks.Chunkable, wg *sync.WaitGroup, ae *atomicerr.AtomicError, stats *Stats) { tr.getManyAtOffsetsWithReadFunc(ctx, reqs, offsetRecords, wg, ae, stats, func( ctx context.Context, readStart, readEnd uint64, @@ -501,7 +501,7 @@ func (tr tableReader) getManyAtOffsets( ctx context.Context, reqs []getRecord, offsetRecords offsetRecSlice, - foundChunks chan *chunks.Chunk, + foundChunks chan<- *chunks.Chunk, wg *sync.WaitGroup, ae *atomicerr.AtomicError, stats *Stats, diff --git a/go/store/nbs/table_set.go b/go/store/nbs/table_set.go index 93ee946116..f4d2ed8f7f 100644 --- a/go/store/nbs/table_set.go +++ b/go/store/nbs/table_set.go @@ -131,7 +131,7 @@ func (ts tableSet) get(ctx context.Context, h addr, stats *Stats) ([]byte, error return f(ts.upstream) } -func (ts tableSet) getMany(ctx context.Context, reqs []getRecord, foundChunks chan *chunks.Chunk, wg *sync.WaitGroup, ae *atomicerr.AtomicError, stats *Stats) bool { +func (ts tableSet) getMany(ctx context.Context, reqs []getRecord, foundChunks chan<- *chunks.Chunk, wg *sync.WaitGroup, ae *atomicerr.AtomicError, stats *Stats) bool { f := func(css chunkSources) bool { for _, haver := range css { if ae.IsSet() { @@ -163,7 +163,7 @@ func (ts tableSet) getMany(ctx context.Context, reqs []getRecord, foundChunks ch return f(ts.novel) && f(ts.upstream) } -func (ts tableSet) getManyCompressed(ctx context.Context, reqs []getRecord, foundCmpChunks chan chunks.Chunkable, wg *sync.WaitGroup, ae *atomicerr.AtomicError, stats *Stats) bool { +func (ts tableSet) getManyCompressed(ctx context.Context, reqs []getRecord, foundCmpChunks chan<- chunks.Chunkable, wg *sync.WaitGroup, ae *atomicerr.AtomicError, stats *Stats) bool { f := func(css chunkSources) bool { for _, haver := range css { if ae.IsSet() {