diff --git a/go/store/nbs/archive_reader.go b/go/store/nbs/archive_reader.go index f17b96abd3..5ed91c1fb0 100644 --- a/go/store/nbs/archive_reader.go +++ b/go/store/nbs/archive_reader.go @@ -35,10 +35,10 @@ import ( // reconstructHashFromPrefixAndSuffix creates a hash from a prefix and suffix func reconstructHashFromPrefixAndSuffix(prefix uint64, suffix [hash.SuffixLen]byte) hash.Hash { - hashBytes := make([]byte, hash.ByteLen) + hashBytes := [hash.ByteLen]byte{} binary.BigEndian.PutUint64(hashBytes[:hash.PrefixLen], prefix) copy(hashBytes[hash.PrefixLen:], suffix[:]) - return hash.New(hashBytes) + return hash.New(hashBytes[:]) } // archiveReader is a reader for the archive format. We use primitive type slices where possible. These are read directly diff --git a/go/store/nbs/archive_test.go b/go/store/nbs/archive_test.go index 43b8b0b5b0..98f664b75f 100644 --- a/go/store/nbs/archive_test.go +++ b/go/store/nbs/archive_test.go @@ -751,7 +751,7 @@ func TestArchiveConjoinAll(t *testing.T) { awCombined := newArchiveWriterWithSink(writerCombined) readers := []archiveReader{archiveReader1, archiveReader2} - err := awCombined.conjoinAll(context.Background(), readersToSource(readers)) + err := awCombined.conjoinAll(context.Background(), readersToSource(readers), &Stats{}) assert.NoError(t, err) theBytes := writerCombined.buff[:writerCombined.pos] @@ -806,7 +806,7 @@ func TestArchiveConjoinAllDuplicateChunk(t *testing.T) { awCombined := newArchiveWriterWithSink(writerCombined) readers := []archiveReader{archiveReader1, archiveReader2} - err := awCombined.conjoinAll(context.Background(), readersToSource(readers)) + err := awCombined.conjoinAll(context.Background(), readersToSource(readers), &Stats{}) assert.NoError(t, err) theBytes := writerCombined.buff[:writerCombined.pos] @@ -884,7 +884,7 @@ func TestArchiveConjoinAllMixedCompression(t *testing.T) { awCombined := newArchiveWriterWithSink(writerCombined) readers := []archiveReader{archiveReader1, archiveReader2} - err := awCombined.conjoinAll(context.Background(), readersToSource(readers)) + err := awCombined.conjoinAll(context.Background(), readersToSource(readers), &Stats{}) assert.NoError(t, err) theBytes := writerCombined.buff[:writerCombined.pos] @@ -990,7 +990,7 @@ func TestArchiveConjoinAllComprehensive(t *testing.T) { writer1 := NewFixedBufferByteSink(make([]byte, 65536)) aw1 := newArchiveWriterWithSink(writer1) - err := aw1.conjoinAll(context.Background(), readersToSource(readers)) + err := aw1.conjoinAll(context.Background(), readersToSource(readers), &Stats{}) assert.NoError(t, err) // Create first combined reader @@ -1045,14 +1045,14 @@ func TestArchiveConjoinAllComprehensive(t *testing.T) { allExpectedChunks = append(allExpectedChunks, archiveChunks...) allExpectedHashes = append(allExpectedHashes, hashes...) } - + // Second conjoin: combine the first result with additional readers allReadersForSecondConjoin := append([]archiveReader{combinedReader1}, additionalReaders...) writer2 := NewFixedBufferByteSink(make([]byte, 131072)) aw2 := newArchiveWriterWithSink(writer2) - err = aw2.conjoinAll(context.Background(), readersToSource(allReadersForSecondConjoin)) + err = aw2.conjoinAll(context.Background(), readersToSource(allReadersForSecondConjoin), &Stats{}) assert.NoError(t, err) // Create final combined reader diff --git a/go/store/nbs/archive_writer.go b/go/store/nbs/archive_writer.go index d39c94882d..b7863ddcff 100644 --- a/go/store/nbs/archive_writer.go +++ b/go/store/nbs/archive_writer.go @@ -279,7 +279,7 @@ func (aw *archiveWriter) writeIndex() error { } } - // sort stagedChunks by hash.Prefix(). Note this isn't a perfect sort for hashes, we are just grouping them by prefix + // sort stagedChunks by hash. This is foundational to the archive format. sort.Sort(aw.stagedChunks) // We lay down the sorted chunk list in it's three forms. @@ -304,8 +304,6 @@ func (aw *archiveWriter) writeIndex() error { } } - indexSize := aw.bytesWritten - indexStart - // Suffixes output. This data is used to create the name for this archive. aw.output.ResetHasher() for _, scr := range aw.stagedChunks { @@ -315,11 +313,10 @@ func (aw *archiveWriter) writeIndex() error { } } dataWritten := uint64(len(aw.stagedChunks)) * hash.SuffixLen - indexSize += dataWritten aw.bytesWritten += dataWritten + aw.indexLen = aw.bytesWritten - indexStart aw.suffixCheckSum = sha512Sum(aw.output.GetSum()) - aw.indexLen = indexSize aw.output.ResetHasher() aw.workflowStage = stageMetadata @@ -733,31 +730,31 @@ func (asw *ArchiveStreamWriter) convertSnappyAndStage(cc CompressedChunk) (uint3 // // This method finalizes the index and footer. Effectively completes the in memory archive writing // process, but does not write it to disk. -func (aw *archiveWriter) conjoinAll(ctx context.Context, sources []chunkSource) error { +func (aw *archiveWriter) conjoinAll(ctx context.Context, sources []chunkSource, stats *Stats) error { if len(sources) < 2 { return fmt.Errorf("conjoinAll requires at least 2 archive readers, got %d", len(sources)) } - srcCnts := make([]sourceWithSize, 0, len(sources)) + srcSz := make([]sourceWithSize, 0, len(sources)) for _, src := range sources { - cnt, err := src.count() - if err != nil { - return fmt.Errorf("failed to count chunks in source %T: %w", src, err) + aSrc, ok := src.(archiveChunkSource) + if !ok { + return fmt.Errorf("runtime error: source %T is not an archiveChunkSource", src) } - srcCnts = append(srcCnts, sourceWithSize{src, uint64(cnt)}) + dataSpan := aSrc.aRdr.footer.dataSpan() + + srcSz = append(srcSz, sourceWithSize{src, dataSpan.length}) } // similar to cloud conjoin, we build the index first. It could come after in this case. - thePlan, err := planArchiveConjoin(srcCnts) + thePlan, err := planArchiveConjoin(srcSz, stats) if err != nil { return fmt.Errorf("failed to plan archive conjoin: %w", err) } - stats := &Stats{} - // Now that we have the plan, we slam all datablocks into the output stream then write the index last. - for _, src := range sources { - aSrc, ok := src.(archiveChunkSource) + for _, src := range thePlan.sources.sws { + aSrc, ok := src.source.(archiveChunkSource) if !ok { return fmt.Errorf("runtime error: source %T is not an archiveChunkSource", src) } @@ -791,18 +788,23 @@ type tableChunkRecord struct { hash hash.Hash } -func planArchiveConjoin(sources []sourceWithSize) (compactionPlan, error) { +func planArchiveConjoin(sources []sourceWithSize, stats *Stats) (compactionPlan, error) { if len(sources) < 2 { return compactionPlan{}, fmt.Errorf("conjoinIndexes requires at least 2 archive readers, got %d", len(sources)) } + // place largest chunk sources at the beginning of the conjoin + orderedSrcs := chunkSourcesByDescendingDataSize{sws: sources} + // sort.Sort(orderedSrcs) + sources = nil + writer := NewBlockBufferByteSink(fourMb) aw := newArchiveWriterWithSink(writer) currentDataOffset := uint64(0) chunkCounter := uint32(0) - for _, src := range sources { + for _, src := range orderedSrcs.sws { reader := src.source arcSrc, ok := reader.(archiveChunkSource) if !ok { @@ -816,11 +818,11 @@ func planArchiveConjoin(sources []sourceWithSize) (compactionPlan, error) { if err != nil { return compactionPlan{}, err } - chunks := index.chunkCount() - chunkCounter += chunks + chks := index.chunkCount() + chunkCounter += chks - chunkRecs := make([]tableChunkRecord, 0, chunks) - for i := uint32(0); i < chunks; i++ { + chunkRecs := make([]tableChunkRecord, 0, chks) + for i := uint32(0); i < chks; i++ { var h hash.Hash ie, err := index.indexEntry(i, &h) if err != nil { @@ -895,6 +897,8 @@ func planArchiveConjoin(sources []sourceWithSize) (compactionPlan, error) { } aw.bytesWritten = currentDataOffset + // Preserve this for stat reporting as aw.bytesWritten will be updated as we write the index. + dataBlocksLen := currentDataOffset // The conjoin process is a little different from the normal archive writing process. We manually stick everything // into the writer, and then finalize the index and footer at the end. The datablocks will be written in separately @@ -918,11 +922,14 @@ func planArchiveConjoin(sources []sourceWithSize) (compactionPlan, error) { return compactionPlan{}, fmt.Errorf("failed to build index buffer while conjoining archives: %w", err) } + stats.BytesPerConjoin.Sample(dataBlocksLen + uint64(len(buf.Bytes()))) + return compactionPlan{ - sources: chunkSourcesByDescendingDataSize{sources}, - name: name, - suffix: ArchiveFileSuffix, - mergedIndex: buf.Bytes(), - chunkCount: chunkCounter, + sources: orderedSrcs, + name: name, + suffix: ArchiveFileSuffix, + mergedIndex: buf.Bytes(), + chunkCount: chunkCounter, + totalCompressedData: dataBlocksLen, }, nil } diff --git a/go/store/nbs/aws_table_persister.go b/go/store/nbs/aws_table_persister.go index 296e1e19a1..c6fe961f78 100644 --- a/go/store/nbs/aws_table_persister.go +++ b/go/store/nbs/aws_table_persister.go @@ -424,7 +424,9 @@ func (mp manualPart) readFull(ctx context.Context, buff []byte) error { // dividePlan assumes that plan.sources (which is of type chunkSourcesByDescendingDataSize) is correctly sorted by descending data size. // -// NM4 - There is more to this interface. gDoc. +// This function divides |plan.sources| into two groups: those with enough chunk data to use S3's UploadPartCopy API (copies) and those without (manuals). +// The ordering of the parts is how we will upload them to S3, and the manual parts will be prefixed to the index, thus +// keeping |plan.sources| in the correct order so the index is correct. func dividePlan(plan compactionPlan, minPartSize, maxPartSize uint64) (copies []copyPart, manuals []manualPart, buff []byte, err error) { // NB: if maxPartSize < 2*minPartSize, splitting large copies apart isn't solvable. S3's limits are plenty far enough apart that this isn't a problem in production, but we could violate this in tests. if maxPartSize < 2*minPartSize { diff --git a/go/store/nbs/bs_persister.go b/go/store/nbs/bs_persister.go index 8e784710cd..e093833842 100644 --- a/go/store/nbs/bs_persister.go +++ b/go/store/nbs/bs_persister.go @@ -94,7 +94,13 @@ func (bsp *blobstorePersister) ConjoinAll(ctx context.Context, sources chunkSour sized = append(sized, sourceWithSize{src, src.currentSize()}) } - // NM4 - error out when we run into archives here..... + // Currently, archive tables are not supported in blobstorePersister. + for _, s := range sized { + _, ok := s.source.(archiveChunkSource) + if ok { + return nil, nil, errors.New("archive tables not supported in blobstorePersister") + } + } plan, err := planTableConjoin(sized, stats) if err != nil { diff --git a/go/store/nbs/table_persister.go b/go/store/nbs/table_persister.go index 351e39e8b0..b9d6559ca8 100644 --- a/go/store/nbs/table_persister.go +++ b/go/store/nbs/table_persister.go @@ -116,7 +116,7 @@ type compactionPlan struct { suffix string mergedIndex []byte chunkCount uint32 - totalCompressedData uint64 // This is currently only used for stats and logging. Ignoring for archives. NM4. + totalCompressedData uint64 } const ( @@ -155,7 +155,7 @@ func planRangeCopyConjoin(sources chunkSources, stats *Stats) (compactionPlan, e switch mode { case conjoinModeArchive: - return planArchiveConjoin(sized) + return planArchiveConjoin(sized, stats) case conjoinModeTable: return planTableConjoin(sized, stats) default: