From d290a9f4628cd45c5bc19feebb50740fd0f7e177 Mon Sep 17 00:00:00 2001
From: Neil Macneale IV
Date: Thu, 22 May 2025 10:12:30 -0700
Subject: [PATCH] Refactor archive to convert all table files incrementally

By all table files, I mean both newgen and oldgen.
---
 go/store/nbs/archive_build.go | 157 ++++++++++++++++++----------------
 1 file changed, 85 insertions(+), 72 deletions(-)

diff --git a/go/store/nbs/archive_build.go b/go/store/nbs/archive_build.go
index a784e68e2a..d20858534b 100644
--- a/go/store/nbs/archive_build.go
+++ b/go/store/nbs/archive_build.go
@@ -40,6 +40,8 @@ const defaultDictionarySize = 1 << 12 // NM4 - maybe just select the largest chu
 const maxSamples = 1000
 const minSamples = 25
 
+var errNotEnoughChunks = errors.New("Not enough samples to build default dictionary")
+
 func UnArchive(ctx context.Context, cs chunks.ChunkStore, smd StorageMetadata, progress chan interface{}) error {
 	if gs, ok := cs.(*GenerationalNBS); ok {
 		outPath, _ := gs.oldGen.Path()
@@ -115,86 +117,97 @@ func UnArchive(ctx context.Context, cs chunks.ChunkStore, smd StorageMetadata, p
 }
 
 func BuildArchive(ctx context.Context, cs chunks.ChunkStore, dagGroups *ChunkRelations, purge bool, progress chan interface{}) (err error) {
-	// Currently, we don't have any stats to report. Required for calls to the lower layers tho.
-	var stats Stats
-
 	if gs, ok := cs.(*GenerationalNBS); ok {
-		outPath, _ := gs.oldGen.Path()
-		oldgen := gs.oldGen.tables.upstream
-
-		swapMap := make(map[hash.Hash]hash.Hash)
-
-		for tf, ogcs := range oldgen {
-			if _, ok := ogcs.(archiveChunkSource); ok {
-				continue
-			}
-
-			idx, err := ogcs.index()
-			if err != nil {
-				return err
-			}
-
-			originalSize := idx.tableFileSize()
-
-			archivePath := ""
-			archiveName := hash.Hash{}
-			archivePath, archiveName, err = convertTableFileToArchive(ctx, ogcs, idx, dagGroups, outPath, progress, &stats)
-			if err != nil {
-				return err
-			}
-
-			fileInfo, err := os.Stat(archivePath)
-			if err != nil {
-				progress <- "Failed to stat archive file"
-				return err
-			}
-			archiveSize := fileInfo.Size()
-
-			err = verifyAllChunks(ctx, idx, archivePath, progress, &stats)
-			if err != nil {
-				return err
-			}
-
-			percentReduction := -100.0 * (float64(archiveSize)/float64(originalSize) - 1.0)
-			progress <- fmt.Sprintf("Archived %s (%d -> %d bytes, %.2f%% reduction)", archiveName, originalSize, archiveSize, percentReduction)
-
-			swapMap[tf] = archiveName
-		}
-
-		if len(swapMap) == 0 {
-			return fmt.Errorf("No tables found to archive. Run 'dolt gc' first")
-		}
-
-		cleanup := make([]hash.Hash, 0, len(swapMap))
-
-		//NM4 TODO: This code path must only be run on an offline database. We should add a check for that.
-		specs, err := gs.oldGen.tables.toSpecs()
-		newSpecs := make([]tableSpec, 0, len(specs))
-		for _, spec := range specs {
-			if newSpec, exists := swapMap[spec.name]; exists {
-				newSpecs = append(newSpecs, tableSpec{newSpec, spec.chunkCount})
-				cleanup = append(cleanup, spec.name)
-			} else {
-				newSpecs = append(newSpecs, spec)
-			}
-		}
-		err = gs.oldGen.swapTables(ctx, newSpecs, chunks.GCMode_Default)
+		err = archiveSingleBlockStore(ctx, gs.newGen, dagGroups, purge, progress)
 		if err != nil {
 			return err
 		}
 
-		if purge && len(cleanup) > 0 {
-			for _, h := range cleanup {
-				tf := filepath.Join(outPath, h.String())
-				err = os.Remove(tf)
-				if err != nil {
-					return err
+		err = archiveSingleBlockStore(ctx, gs.oldGen, dagGroups, purge, progress)
+		if err != nil {
+			return err
+		}
+	} else {
+		return errors.New("Modern DB Expected")
+	}
+	return nil
+}
+
+func archiveSingleBlockStore(ctx context.Context, blockStore *NomsBlockStore, dagGroups *ChunkRelations, purge bool, progress chan interface{}) error {
+	// Currently, we don't have any stats to report. Required for calls to the lower layers tho.
+	var stats Stats
+
+	path, _ := blockStore.Path()
+	sourceSet := blockStore.tables.upstream
+
+	for tf, cs := range sourceSet {
+		if _, ok := cs.(archiveChunkSource); ok {
+			continue
+		}
+		if isJournalAddr(cs.hash()) {
+			continue
+		}
+
+		idx, err := cs.index()
+		if err != nil {
+			return err
+		}
+
+		originalSize := idx.tableFileSize()
+
+		archivePath := ""
+		archiveName := hash.Hash{}
+		archivePath, archiveName, err = convertTableFileToArchive(ctx, cs, idx, dagGroups, path, progress, &stats)
+		if err != nil {
+			if errors.Is(err, errNotEnoughChunks) {
+				progress <- fmt.Sprintf("Not enough chunks to build archive for %s. Skipping.", cs.hash().String())
+				continue
+			}
+
+			return err
+		}
+
+		fileInfo, err := os.Stat(archivePath)
+		if err != nil {
+			progress <- "Failed to stat archive file"
+			return err
+		}
+		archiveSize := fileInfo.Size()
+
+		err = verifyAllChunks(ctx, idx, archivePath, progress, &stats)
+		if err != nil {
+			return err
+		}
+
+		percentReduction := -100.0 * (float64(archiveSize)/float64(originalSize) - 1.0)
+		progress <- fmt.Sprintf("Archived %s (%d -> %d bytes, %.2f%% reduction)", archiveName, originalSize, archiveSize, percentReduction)
+
+		specs, err := blockStore.tables.toSpecs()
+		newSpecs := make([]tableSpec, 0, len(specs))
+		purgeFile := ""
+		for _, spec := range specs {
+			if tf == spec.name {
+				newSpecs = append(newSpecs, tableSpec{archiveName, spec.chunkCount})
+				if purge {
+					purgeFile = filepath.Join(path, tf.String())
 				}
+			} else {
+				newSpecs = append(newSpecs, spec)
 			}
 		}
-	} else {
-		return errors.New("Modern DB Expected")
+		err = blockStore.swapTables(ctx, newSpecs, chunks.GCMode_Default)
+		if err != nil {
+			return err
+		}
+
+		if len(purgeFile) > 0 {
+			err = os.Remove(purgeFile)
+			if err != nil {
+				// failing to purge is non-fatal.
+				progress <- fmt.Sprintf("Failed to purge. %s", purgeFile)
			}
+		}
 	}
 	return nil
 }
@@ -217,7 +230,7 @@ func convertTableFileToArchive(
 	if len(defaultSamples) >= minSamples {
 		defaultDict = buildDictionary(defaultSamples)
 	} else {
-		return "", hash.Hash{}, errors.New("Not enough samples to build default dictionary")
+		return "", hash.Hash{}, errNotEnoughChunks
 	}
 	defaultSamples = nil
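
An aside on the skip-and-continue behavior introduced above: the patch turns the "not enough samples" failure into the package-level sentinel errNotEnoughChunks, which convertTableFileToArchive returns and archiveSingleBlockStore detects with errors.Is so that an undersized table file is skipped instead of aborting the whole run. The following is a minimal, self-contained sketch of that sentinel-error pattern under assumed names; it is illustrative only, and sampleDictionary is a hypothetical stand-in rather than a Dolt function.

package main

import (
	"errors"
	"fmt"
)

// Package-level sentinel, analogous to errNotEnoughChunks in the patch.
var errNotEnoughSamples = errors.New("not enough samples to build dictionary")

// sampleDictionary is a hypothetical stand-in for the dictionary-building
// step inside convertTableFileToArchive.
func sampleDictionary(sampleCount, minSamples int) (string, error) {
	if sampleCount < minSamples {
		return "", errNotEnoughSamples
	}
	return "dict", nil
}

func main() {
	for _, n := range []int{3, 50} {
		dict, err := sampleDictionary(n, 25)
		if err != nil {
			if errors.Is(err, errNotEnoughSamples) {
				// Non-fatal: skip this input and keep going, mirroring the
				// `continue` in archiveSingleBlockStore.
				fmt.Printf("skipping input with %d samples: %v\n", n, err)
				continue
			}
			panic(err) // any other error would be fatal
		}
		fmt.Printf("built %q from %d samples\n", dict, n)
	}
}

Using a sentinel plus errors.Is lets the caller distinguish the recoverable case without matching on error strings, which is why the hard-coded errors.New in convertTableFileToArchive was replaced.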