mirror of
https://github.com/dolthub/dolt.git
synced 2026-05-12 19:39:32 -05:00
Use a 4MB window, and clean up the excess comments
This commit is contained in:
@@ -637,11 +637,7 @@ func (ar archiveReader) getMetadata(ctx context.Context, stats *Stats) ([]byte,
|
||||
}
|
||||
|
||||
func (ar archiveReader) iterate(ctx context.Context, cb func(chunks.Chunk) error, stats *Stats) error {
|
||||
// ar.spanIndex already contains sorted end offsets of ByteSpans
|
||||
// spanIndex[i] is the end offset of ByteSpan ID (i+1)
|
||||
// We can use this directly for efficient offset lookup
|
||||
|
||||
// Build separate reverse indexes for dictionary and data ByteSpans
|
||||
// Build reverse indexes for dictionary and data ByteSpans
|
||||
// dictReverseIndex: Dictionary ByteSpan ID -> struct{} - indicates that we expect that span to be a dictionary.
|
||||
// dataReverseIndex: Data ByteSpan ID -> chunk ref index - indicates that we expect that span to be a data chunk,
|
||||
// and the value is the index into the chunkRefs slice where the chunk reference is stored.
|
||||
@@ -651,27 +647,25 @@ func (ar archiveReader) iterate(ctx context.Context, cb func(chunks.Chunk) error
|
||||
for chunkRefIdx := uint32(0); chunkRefIdx < ar.footer.chunkCount; chunkRefIdx++ {
|
||||
dictId, dataId := ar.getChunkRef(int(chunkRefIdx))
|
||||
|
||||
// Add mapping for dictionary ByteSpan (if not null)
|
||||
if dictId != 0 {
|
||||
dictReverseIndex[dictId] = struct{}{}
|
||||
}
|
||||
|
||||
// Add mapping for data ByteSpan
|
||||
dataReverseIndex[dataId] = chunkRefIdx
|
||||
}
|
||||
|
||||
// Load data in 1MB chunks starting from the first byte of the data section
|
||||
const bufferSize = 1024 * 1024 // 1MB
|
||||
// Load data in 4MB chunks starting from the first byte of the data section
|
||||
const bufferSize = 4 * 1024 * 1024
|
||||
dataSpan := ar.footer.dataSpan()
|
||||
currentBlockStart := dataSpan.offset // This is 0 with all current archive formats. Probably won't ever change.
|
||||
|
||||
loadedDictionaries := make(map[uint32]*DecompBundle)
|
||||
loadedDictionaries := make(map[uint32]*gozstd.DDict)
|
||||
byteSpanCounter := uint32(1)
|
||||
|
||||
// Read the data block
|
||||
dataBlock := make([]byte, bufferSize)
|
||||
for currentBlockStart < (dataSpan.offset + dataSpan.length) {
|
||||
// Calculate how much data to read (up to 1MB or remaining data)
|
||||
// Calculate how much data to read (up to bufferSize) from the current block.
|
||||
remainingData := dataSpan.offset + dataSpan.length - currentBlockStart
|
||||
readSize := bufferSize
|
||||
if remainingData < bufferSize {
|
||||
@@ -690,6 +684,10 @@ func (ar archiveReader) iterate(ctx context.Context, cb func(chunks.Chunk) error
|
||||
currentBlockStart = blockEnd
|
||||
|
||||
for byteSpanCounter <= ar.footer.byteSpanCount {
|
||||
if ctx.Err() != nil {
|
||||
return ctx.Err()
|
||||
}
|
||||
|
||||
span := ar.getByteSpanByID(byteSpanCounter)
|
||||
|
||||
adjustedOffset := span.offset - blockStart
|
||||
@@ -707,10 +705,8 @@ func (ar archiveReader) iterate(ctx context.Context, cb func(chunks.Chunk) error
|
||||
if err != nil {
|
||||
return fmt.Errorf("Failure creating dictionary from bytes: %w", err)
|
||||
}
|
||||
loadedDictionaries[byteSpanCounter] = dict
|
||||
goto NEXT
|
||||
loadedDictionaries[byteSpanCounter] = dict.dDict
|
||||
} else if _, exists := dataReverseIndex[byteSpanCounter]; exists {
|
||||
// Process data ByteSpan - determine compression type
|
||||
chunkId := dataReverseIndex[byteSpanCounter]
|
||||
dictId, dataId := ar.getChunkRef(int(chunkId))
|
||||
|
||||
@@ -745,13 +741,12 @@ func (ar archiveReader) iterate(ctx context.Context, cb func(chunks.Chunk) error
|
||||
panic("Reverse Index incomplete: Dictionary ID not found in loaded dictionaries")
|
||||
}
|
||||
|
||||
chunkData, err = gozstd.DecompressDict(nil, spanData, dict.dDict)
|
||||
chunkData, err = gozstd.DecompressDict(nil, spanData, dict)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// Create and process the chunk
|
||||
chk := chunks.NewChunkWithHash(h, chunkData)
|
||||
err = cb(chk)
|
||||
if err != nil {
|
||||
@@ -760,7 +755,6 @@ func (ar archiveReader) iterate(ctx context.Context, cb func(chunks.Chunk) error
|
||||
} else {
|
||||
panic("Reverse Index incomplete: ByteSpan ID not found in either dictionary or data reverse index")
|
||||
}
|
||||
NEXT:
|
||||
byteSpanCounter++
|
||||
}
|
||||
}
|
||||
|
||||
@@ -515,9 +515,9 @@ func (tr tableReader) getManyAtOffsetsWithReadFunc(
|
||||
offsetRecords offsetRecSlice,
|
||||
stats *Stats,
|
||||
readAtOffsets func(
|
||||
ctx context.Context,
|
||||
rb readBatch,
|
||||
stats *Stats) error,
|
||||
ctx context.Context,
|
||||
rb readBatch,
|
||||
stats *Stats) error,
|
||||
) error {
|
||||
batches := toReadBatches(offsetRecords, tr.blockSize)
|
||||
for i := range batches {
|
||||
@@ -786,17 +786,14 @@ func (tr tableReader) iterateAllChunks(ctx context.Context, cb func(chunk chunks
|
||||
return nil
|
||||
}
|
||||
|
||||
// Build offset records similar to the extract method
|
||||
// Collect all chunk info then sort by offset.
|
||||
// The index is sorted by prefix, but we need to process chunkRecs in storage order (by offset)
|
||||
type chunkRecord struct {
|
||||
offset uint64
|
||||
length uint32
|
||||
hash hash.Hash
|
||||
}
|
||||
|
||||
chunkRecs := make([]chunkRecord, 0, count)
|
||||
|
||||
// First pass: collect all chunk info, and sort by offset.
|
||||
for i := uint32(0); i < count; i++ {
|
||||
var h hash.Hash
|
||||
ie, err := tr.idx.indexEntry(i, &h)
|
||||
@@ -817,12 +814,11 @@ func (tr tableReader) iterateAllChunks(ctx context.Context, cb func(chunk chunks
|
||||
lastChunk := chunkRecs[len(chunkRecs)-1]
|
||||
totalDataSize := lastChunk.offset + uint64(lastChunk.length)
|
||||
|
||||
// Read data in 1MB chunkRecs
|
||||
const bufferSize = 1024 * 1024 // 1MB
|
||||
// Read data in 4MB blocks
|
||||
const bufferSize = 4 * 1024 * 1024
|
||||
currentOffset := uint64(0)
|
||||
chunkIndex := 0
|
||||
|
||||
// Reuse buffer across reads
|
||||
dataBlock := make([]byte, bufferSize)
|
||||
|
||||
for chunkIndex < len(chunkRecs) {
|
||||
@@ -842,7 +838,7 @@ func (tr tableReader) iterateAllChunks(ctx context.Context, cb func(chunk chunks
|
||||
blockStart := currentOffset
|
||||
blockEnd := currentOffset + uint64(readSize)
|
||||
|
||||
// Process all chunkRecs that are fully contained within this block
|
||||
// Process the chunks in the current block
|
||||
for chunkIndex < len(chunkRecs) {
|
||||
if ctx.Err() != nil {
|
||||
return ctx.Err()
|
||||
|
||||
Reference in New Issue
Block a user