mirror of
https://github.com/dolthub/dolt.git
synced 2026-01-06 08:50:04 -06:00
Merge pull request #9929 from dolthub/macneale4/get-records-range-duplication
[no-release-notes] get records range duplication
This commit is contained in:
@@ -87,16 +87,20 @@ func (acs archiveChunkSource) has(h hash.Hash, keeper keeperF) (bool, gcBehavior
|
|||||||
return res, gcBehavior_Continue, nil
|
return res, gcBehavior_Continue, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (acs archiveChunkSource) hasMany(addrs []hasRecord, keeper keeperF) (bool, gcBehavior, error) {
|
func (acs archiveChunkSource) hasMany(records []hasRecord, keeper keeperF) (bool, gcBehavior, error) {
|
||||||
// single threaded first pass.
|
// single threaded first pass.
|
||||||
foundAll := true
|
foundAll := true
|
||||||
for i, addr := range addrs {
|
for i, req := range records {
|
||||||
h := *addr.a
|
if req.has {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
h := *req.a
|
||||||
if acs.aRdr.has(h) {
|
if acs.aRdr.has(h) {
|
||||||
if keeper != nil && keeper(h) {
|
if keeper != nil && keeper(h) {
|
||||||
return false, gcBehavior_Block, nil
|
return false, gcBehavior_Block, nil
|
||||||
}
|
}
|
||||||
addrs[i].has = true
|
records[i].has = true
|
||||||
} else {
|
} else {
|
||||||
foundAll = false
|
foundAll = false
|
||||||
}
|
}
|
||||||
@@ -115,10 +119,13 @@ func (acs archiveChunkSource) get(ctx context.Context, h hash.Hash, keeper keepe
|
|||||||
return res, gcBehavior_Continue, nil
|
return res, gcBehavior_Continue, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (acs archiveChunkSource) getMany(ctx context.Context, eg *errgroup.Group, reqs []getRecord, found func(context.Context, *chunks.Chunk), keeper keeperF, stats *Stats) (bool, gcBehavior, error) {
|
func (acs archiveChunkSource) getMany(ctx context.Context, eg *errgroup.Group, records []getRecord, found func(context.Context, *chunks.Chunk), keeper keeperF, stats *Stats) (bool, gcBehavior, error) {
|
||||||
// single threaded first pass.
|
// single threaded first pass.
|
||||||
foundAll := true
|
foundAll := true
|
||||||
for i, req := range reqs {
|
for i, req := range records {
|
||||||
|
if req.found {
|
||||||
|
continue
|
||||||
|
}
|
||||||
h := *req.a
|
h := *req.a
|
||||||
data, err := acs.aRdr.get(ctx, h, stats)
|
data, err := acs.aRdr.get(ctx, h, stats)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -132,7 +139,7 @@ func (acs archiveChunkSource) getMany(ctx context.Context, eg *errgroup.Group, r
|
|||||||
}
|
}
|
||||||
chunk := chunks.NewChunk(data)
|
chunk := chunks.NewChunk(data)
|
||||||
found(ctx, &chunk)
|
found(ctx, &chunk)
|
||||||
reqs[i].found = true
|
records[i].found = true
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return !foundAll, gcBehavior_Continue, nil
|
return !foundAll, gcBehavior_Continue, nil
|
||||||
@@ -188,15 +195,20 @@ func (acs archiveChunkSource) clone() (chunkSource, error) {
|
|||||||
return archiveChunkSource{reader, acs.file}, nil
|
return archiveChunkSource{reader, acs.file}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (acs archiveChunkSource) getRecordRanges(_ context.Context, requests []getRecord, keeper keeperF) (map[hash.Hash]Range, gcBehavior, error) {
|
func (acs archiveChunkSource) getRecordRanges(_ context.Context, records []getRecord, keeper keeperF) (map[hash.Hash]Range, gcBehavior, error) {
|
||||||
result := make(map[hash.Hash]Range, len(requests))
|
result := make(map[hash.Hash]Range, len(records))
|
||||||
for _, req := range requests {
|
for i, req := range records {
|
||||||
|
if req.found {
|
||||||
|
continue
|
||||||
|
}
|
||||||
hAddr := *req.a
|
hAddr := *req.a
|
||||||
idx := acs.aRdr.search(hAddr)
|
idx := acs.aRdr.search(hAddr)
|
||||||
if idx < 0 {
|
if idx < 0 {
|
||||||
// Chunk not found.
|
// Chunk not found.
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
records[i].found = true
|
||||||
|
|
||||||
if keeper != nil && keeper(hAddr) {
|
if keeper != nil && keeper(hAddr) {
|
||||||
return nil, gcBehavior_Block, nil
|
return nil, gcBehavior_Block, nil
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -921,6 +921,58 @@ func TestArchiveConjoinAllMixedCompression(t *testing.T) {
|
|||||||
assert.Equal(t, expectedChunkCount, actualChunkCount, "Combined archive should have correct chunk count")
|
assert.Equal(t, expectedChunkCount, actualChunkCount, "Combined archive should have correct chunk count")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestArchiveGetRecordRanges(t *testing.T) {
|
||||||
|
// This test creates two archives with one chunk in common. Then we call `getRecordRanges` on both, and verify
|
||||||
|
// that the range for the shared chunk is returned only once, by whichever archive we call first.
|
||||||
|
|
||||||
|
sharedChunk := []byte{100, 101, 102, 103, 104, 105, 106, 107, 108, 109}
|
||||||
|
sharedHash := hash.Of(sharedChunk)
|
||||||
|
chunks1 := [][]byte{
|
||||||
|
{10, 11, 12, 13, 14, 15, 16, 17, 18, 19},
|
||||||
|
sharedChunk,
|
||||||
|
}
|
||||||
|
firstHash := hash.Of(chunks1[0])
|
||||||
|
ar1, h1 := createTestArchiveWithHashes(t, chunks1, []hash.Hash{firstHash, sharedHash}, nil, "")
|
||||||
|
src1 := readersToSource([]archiveReader{ar1})[0]
|
||||||
|
|
||||||
|
chunks2 := [][]byte{
|
||||||
|
{20, 21, 22, 23, 24, 25, 26, 27, 28, 29},
|
||||||
|
sharedChunk,
|
||||||
|
}
|
||||||
|
firstHash = hash.Of(chunks2[0])
|
||||||
|
ar2, h2 := createTestArchiveWithHashes(t, chunks2, []hash.Hash{firstHash, sharedHash}, nil, "")
|
||||||
|
src2 := readersToSource([]archiveReader{ar2})[0]
|
||||||
|
|
||||||
|
records := make([]getRecord, 0, 3)
|
||||||
|
records = append(records, getRecord{a: &h1[0], prefix: h1[0].Prefix(), found: false})
|
||||||
|
records = append(records, getRecord{a: &h2[0], prefix: h2[0].Prefix(), found: false})
|
||||||
|
records = append(records, getRecord{a: &sharedHash, prefix: sharedHash.Prefix(), found: false})
|
||||||
|
|
||||||
|
rang1, _, err := src1.getRecordRanges(context.Background(), records, nil)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
assert.Equal(t, 2, len(rang1))
|
||||||
|
|
||||||
|
rang2, _, err := src2.getRecordRanges(context.Background(), records, nil)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
assert.Equal(t, 1, len(rang2))
|
||||||
|
_, ok := rang2[sharedHash]
|
||||||
|
assert.False(t, ok)
|
||||||
|
|
||||||
|
// reset records, and search in reverse order.
|
||||||
|
for i := range records {
|
||||||
|
records[i].found = false
|
||||||
|
}
|
||||||
|
rang1, _, err = src2.getRecordRanges(context.Background(), records, nil)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
assert.Equal(t, 2, len(rang1))
|
||||||
|
|
||||||
|
rang2, _, err = src1.getRecordRanges(context.Background(), records, nil)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
assert.Equal(t, 1, len(rang2))
|
||||||
|
_, ok = rang2[sharedHash]
|
||||||
|
assert.False(t, ok)
|
||||||
|
}
|
||||||
|
|
||||||
func TestArchiveConjoinAllComprehensive(t *testing.T) {
|
func TestArchiveConjoinAllComprehensive(t *testing.T) {
|
||||||
rand.Seed(42)
|
rand.Seed(42)
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user