Fix building largest tables first.

Also address several todos.
This commit is contained in:
Neil Macneale IV
2025-08-25 16:32:50 -07:00
parent 5141526baf
commit f3d2b9052f
6 changed files with 54 additions and 39 deletions
+2 -2
View File
@@ -35,10 +35,10 @@ import (
// reconstructHashFromPrefixAndSuffix creates a hash from a prefix and suffix
func reconstructHashFromPrefixAndSuffix(prefix uint64, suffix [hash.SuffixLen]byte) hash.Hash {
hashBytes := make([]byte, hash.ByteLen)
hashBytes := [hash.ByteLen]byte{}
binary.BigEndian.PutUint64(hashBytes[:hash.PrefixLen], prefix)
copy(hashBytes[hash.PrefixLen:], suffix[:])
return hash.New(hashBytes)
return hash.New(hashBytes[:])
}
// archiveReader is a reader for the archive format. We use primitive type slices where possible. These are read directly
+6 -6
View File
@@ -751,7 +751,7 @@ func TestArchiveConjoinAll(t *testing.T) {
awCombined := newArchiveWriterWithSink(writerCombined)
readers := []archiveReader{archiveReader1, archiveReader2}
err := awCombined.conjoinAll(context.Background(), readersToSource(readers))
err := awCombined.conjoinAll(context.Background(), readersToSource(readers), &Stats{})
assert.NoError(t, err)
theBytes := writerCombined.buff[:writerCombined.pos]
@@ -806,7 +806,7 @@ func TestArchiveConjoinAllDuplicateChunk(t *testing.T) {
awCombined := newArchiveWriterWithSink(writerCombined)
readers := []archiveReader{archiveReader1, archiveReader2}
err := awCombined.conjoinAll(context.Background(), readersToSource(readers))
err := awCombined.conjoinAll(context.Background(), readersToSource(readers), &Stats{})
assert.NoError(t, err)
theBytes := writerCombined.buff[:writerCombined.pos]
@@ -884,7 +884,7 @@ func TestArchiveConjoinAllMixedCompression(t *testing.T) {
awCombined := newArchiveWriterWithSink(writerCombined)
readers := []archiveReader{archiveReader1, archiveReader2}
err := awCombined.conjoinAll(context.Background(), readersToSource(readers))
err := awCombined.conjoinAll(context.Background(), readersToSource(readers), &Stats{})
assert.NoError(t, err)
theBytes := writerCombined.buff[:writerCombined.pos]
@@ -990,7 +990,7 @@ func TestArchiveConjoinAllComprehensive(t *testing.T) {
writer1 := NewFixedBufferByteSink(make([]byte, 65536))
aw1 := newArchiveWriterWithSink(writer1)
err := aw1.conjoinAll(context.Background(), readersToSource(readers))
err := aw1.conjoinAll(context.Background(), readersToSource(readers), &Stats{})
assert.NoError(t, err)
// Create first combined reader
@@ -1045,14 +1045,14 @@ func TestArchiveConjoinAllComprehensive(t *testing.T) {
allExpectedChunks = append(allExpectedChunks, archiveChunks...)
allExpectedHashes = append(allExpectedHashes, hashes...)
}
// Second conjoin: combine the first result with additional readers
allReadersForSecondConjoin := append([]archiveReader{combinedReader1}, additionalReaders...)
writer2 := NewFixedBufferByteSink(make([]byte, 131072))
aw2 := newArchiveWriterWithSink(writer2)
err = aw2.conjoinAll(context.Background(), readersToSource(allReadersForSecondConjoin))
err = aw2.conjoinAll(context.Background(), readersToSource(allReadersForSecondConjoin), &Stats{})
assert.NoError(t, err)
// Create final combined reader
+34 -27
View File
@@ -279,7 +279,7 @@ func (aw *archiveWriter) writeIndex() error {
}
}
// sort stagedChunks by hash.Prefix(). Note this isn't a perfect sort for hashes, we are just grouping them by prefix
// sort stagedChunks by hash. This is foundational to the archive format.
sort.Sort(aw.stagedChunks)
// We lay down the sorted chunk list in its three forms.
@@ -304,8 +304,6 @@ func (aw *archiveWriter) writeIndex() error {
}
}
indexSize := aw.bytesWritten - indexStart
// Suffixes output. This data is used to create the name for this archive.
aw.output.ResetHasher()
for _, scr := range aw.stagedChunks {
@@ -315,11 +313,10 @@ func (aw *archiveWriter) writeIndex() error {
}
}
dataWritten := uint64(len(aw.stagedChunks)) * hash.SuffixLen
indexSize += dataWritten
aw.bytesWritten += dataWritten
aw.indexLen = aw.bytesWritten - indexStart
aw.suffixCheckSum = sha512Sum(aw.output.GetSum())
aw.indexLen = indexSize
aw.output.ResetHasher()
aw.workflowStage = stageMetadata
@@ -733,31 +730,31 @@ func (asw *ArchiveStreamWriter) convertSnappyAndStage(cc CompressedChunk) (uint3
//
// This method finalizes the index and footer. Effectively completes the in memory archive writing
// process, but does not write it to disk.
func (aw *archiveWriter) conjoinAll(ctx context.Context, sources []chunkSource) error {
func (aw *archiveWriter) conjoinAll(ctx context.Context, sources []chunkSource, stats *Stats) error {
if len(sources) < 2 {
return fmt.Errorf("conjoinAll requires at least 2 archive readers, got %d", len(sources))
}
srcCnts := make([]sourceWithSize, 0, len(sources))
srcSz := make([]sourceWithSize, 0, len(sources))
for _, src := range sources {
cnt, err := src.count()
if err != nil {
return fmt.Errorf("failed to count chunks in source %T: %w", src, err)
aSrc, ok := src.(archiveChunkSource)
if !ok {
return fmt.Errorf("runtime error: source %T is not an archiveChunkSource", src)
}
srcCnts = append(srcCnts, sourceWithSize{src, uint64(cnt)})
dataSpan := aSrc.aRdr.footer.dataSpan()
srcSz = append(srcSz, sourceWithSize{src, dataSpan.length})
}
// Similar to cloud conjoin, we build the index first. It could come after in this case.
thePlan, err := planArchiveConjoin(srcCnts)
thePlan, err := planArchiveConjoin(srcSz, stats)
if err != nil {
return fmt.Errorf("failed to plan archive conjoin: %w", err)
}
stats := &Stats{}
// Now that we have the plan, we slam all datablocks into the output stream then write the index last.
for _, src := range sources {
aSrc, ok := src.(archiveChunkSource)
for _, src := range thePlan.sources.sws {
aSrc, ok := src.source.(archiveChunkSource)
if !ok {
return fmt.Errorf("runtime error: source %T is not an archiveChunkSource", src)
}
@@ -791,18 +788,23 @@ type tableChunkRecord struct {
hash hash.Hash
}
func planArchiveConjoin(sources []sourceWithSize) (compactionPlan, error) {
func planArchiveConjoin(sources []sourceWithSize, stats *Stats) (compactionPlan, error) {
if len(sources) < 2 {
return compactionPlan{}, fmt.Errorf("conjoinIndexes requires at least 2 archive readers, got %d", len(sources))
}
// place largest chunk sources at the beginning of the conjoin
orderedSrcs := chunkSourcesByDescendingDataSize{sws: sources}
// sort.Sort(orderedSrcs)
sources = nil
writer := NewBlockBufferByteSink(fourMb)
aw := newArchiveWriterWithSink(writer)
currentDataOffset := uint64(0)
chunkCounter := uint32(0)
for _, src := range sources {
for _, src := range orderedSrcs.sws {
reader := src.source
arcSrc, ok := reader.(archiveChunkSource)
if !ok {
@@ -816,11 +818,11 @@ func planArchiveConjoin(sources []sourceWithSize) (compactionPlan, error) {
if err != nil {
return compactionPlan{}, err
}
chunks := index.chunkCount()
chunkCounter += chunks
chks := index.chunkCount()
chunkCounter += chks
chunkRecs := make([]tableChunkRecord, 0, chunks)
for i := uint32(0); i < chunks; i++ {
chunkRecs := make([]tableChunkRecord, 0, chks)
for i := uint32(0); i < chks; i++ {
var h hash.Hash
ie, err := index.indexEntry(i, &h)
if err != nil {
@@ -895,6 +897,8 @@ func planArchiveConjoin(sources []sourceWithSize) (compactionPlan, error) {
}
aw.bytesWritten = currentDataOffset
// Preserve this for stat reporting as aw.bytesWritten will be updated as we write the index.
dataBlocksLen := currentDataOffset
// The conjoin process is a little different from the normal archive writing process. We manually stick everything
// into the writer, and then finalize the index and footer at the end. The datablocks will be written in separately
@@ -918,11 +922,14 @@ func planArchiveConjoin(sources []sourceWithSize) (compactionPlan, error) {
return compactionPlan{}, fmt.Errorf("failed to build index buffer while conjoining archives: %w", err)
}
stats.BytesPerConjoin.Sample(dataBlocksLen + uint64(len(buf.Bytes())))
return compactionPlan{
sources: chunkSourcesByDescendingDataSize{sources},
name: name,
suffix: ArchiveFileSuffix,
mergedIndex: buf.Bytes(),
chunkCount: chunkCounter,
sources: orderedSrcs,
name: name,
suffix: ArchiveFileSuffix,
mergedIndex: buf.Bytes(),
chunkCount: chunkCounter,
totalCompressedData: dataBlocksLen,
}, nil
}
+3 -1
View File
@@ -424,7 +424,9 @@ func (mp manualPart) readFull(ctx context.Context, buff []byte) error {
// dividePlan assumes that plan.sources (which is of type chunkSourcesByDescendingDataSize) is correctly sorted by descending data size.
//
// NM4 - There is more to this interface. gDoc.
// This function divides |plan.sources| into two groups: those with enough chunk data to use S3's UploadPartCopy API (copies) and those without (manuals).
// The ordering of the parts is how we will upload them to S3, and the manual parts will be prefixed to the index, thus
// keeping |plan.sources| in the correct order so the index is correct.
func dividePlan(plan compactionPlan, minPartSize, maxPartSize uint64) (copies []copyPart, manuals []manualPart, buff []byte, err error) {
// NB: if maxPartSize < 2*minPartSize, splitting large copies apart isn't solvable. S3's limits are plenty far enough apart that this isn't a problem in production, but we could violate this in tests.
if maxPartSize < 2*minPartSize {
+7 -1
View File
@@ -94,7 +94,13 @@ func (bsp *blobstorePersister) ConjoinAll(ctx context.Context, sources chunkSour
sized = append(sized, sourceWithSize{src, src.currentSize()})
}
// NM4 - error out when we run into archives here.....
// Currently, archive tables are not supported in blobstorePersister.
for _, s := range sized {
_, ok := s.source.(archiveChunkSource)
if ok {
return nil, nil, errors.New("archive tables not supported in blobstorePersister")
}
}
plan, err := planTableConjoin(sized, stats)
if err != nil {
+2 -2
View File
@@ -116,7 +116,7 @@ type compactionPlan struct {
suffix string
mergedIndex []byte
chunkCount uint32
totalCompressedData uint64 // This is currently only used for stats and logging. Ignoring for archives. NM4.
totalCompressedData uint64
}
const (
@@ -155,7 +155,7 @@ func planRangeCopyConjoin(sources chunkSources, stats *Stats) (compactionPlan, e
switch mode {
case conjoinModeArchive:
return planArchiveConjoin(sized)
return planArchiveConjoin(sized, stats)
case conjoinModeTable:
return planTableConjoin(sized, stats)
default: