diff --git a/go/store/nbs/archive_build.go b/go/store/nbs/archive_build.go
index 7e48561cdd..4038fd5017 100644
--- a/go/store/nbs/archive_build.go
+++ b/go/store/nbs/archive_build.go
@@ -44,88 +44,14 @@ const fourMb = 1 << 22
 
 var errNotEnoughChunks = errors.New("Not enough samples to build default dictionary")
 
-func UnArchive(ctx context.Context, cs chunks.ChunkStore, smd StorageMetadata, progress chan interface{}) error {
+func UnArchive(ctx context.Context, cs chunks.ChunkStore, smd StorageMetadata, progress chan interface{}) (err error) {
 	if gs, ok := cs.(*GenerationalNBS); ok {
-		outPath, _ := gs.oldGen.Path()
-		oldgen := gs.oldGen.tables.upstream
-
-		swapMap := make(map[hash.Hash]hash.Hash)
-
-		revertMap := smd.RevertMap()
-
-		for id, ogcs := range oldgen {
-			if arc, ok := ogcs.(archiveChunkSource); ok {
-				orginTfId := revertMap[id]
-				exists, err := smd.oldGenTableExists(orginTfId)
-				if err != nil {
-					return err
-				}
-				if exists {
-					// We have a fast path to follow because original table file is still on disk.
-					swapMap[arc.hash()] = orginTfId
-				} else {
-					// We don't have the original table file id, so we have to create a new one.
-					classicTable, err := NewCmpChunkTableWriter("")
-					if err != nil {
-						return err
-					}
-
-					err = arc.iterate(ctx, func(chk chunks.Chunk) error {
-						cmpChk := ChunkToCompressedChunk(chk)
-						_, err := classicTable.AddChunk(cmpChk)
-						if err != nil {
-							return err
-						}
-
-						progress <- fmt.Sprintf("Unarchiving %s (bytes: %d)", chk.Hash().String(), len(chk.Data()))
-						return nil
-					}, &Stats{})
-					if err != nil {
-						return err
-					}
-
-					_, id, err := classicTable.Finish()
-					if err != nil {
-						return err
-					}
-					err = classicTable.FlushToFile(filepath.Join(outPath, id))
-					if err != nil {
-						return err
-					}
-
-					swapMap[arc.hash()] = hash.Parse(id)
-				}
-			}
-		}
-
-		if len(swapMap) > 0 {
-			//NM4 TODO: This code path must only be run on an offline database. We should add a check for that.
-			specs, err := gs.oldGen.tables.toSpecs()
-			newSpecs := make([]tableSpec, 0, len(specs))
-			for _, spec := range specs {
-				if newSpec, exists := swapMap[spec.name]; exists {
-					newSpecs = append(newSpecs, tableSpec{newSpec, spec.chunkCount})
-				} else {
-					newSpecs = append(newSpecs, spec)
-				}
-			}
-			err = gs.oldGen.swapTables(ctx, newSpecs, chunks.GCMode_Default)
-			if err != nil {
-				return err
-			}
-		}
-	}
-	return nil
-}
-
-func BuildArchive(ctx context.Context, cs chunks.ChunkStore, dagGroups *ChunkRelations, purge bool, progress chan interface{}) (err error) {
-	if gs, ok := cs.(*GenerationalNBS); ok {
-		err = archiveSingleBlockStore(ctx, func() *NomsBlockStore { return gs.newGen }, dagGroups, purge, progress)
+		err = unArchiveSingleBlockStore(ctx, gs.newGen, smd, progress)
 		if err != nil {
 			return err
 		}
 
-		err = archiveSingleBlockStore(ctx, func() *NomsBlockStore { return gs.oldGen }, dagGroups, purge, progress)
+		err = unArchiveSingleBlockStore(ctx, gs.oldGen, smd, progress)
 		if err != nil {
 			return err
 		}
@@ -135,11 +61,126 @@ func BuildArchive(ctx context.Context, cs chunks.ChunkStore, dagGroups *ChunkRel
 	return nil
 }
 
-func archiveSingleBlockStore(ctx context.Context, bscb func() *NomsBlockStore, dagGroups *ChunkRelations, purge bool, progress chan interface{}) error {
+func unArchiveSingleBlockStore(ctx context.Context, blockStore *NomsBlockStore, smd StorageMetadata, progress chan interface{}) error {
+	outPath, _ := blockStore.Path()
+
+	// The source set changes out from under us, but the names of the table files will stay stable enough
+	// to iterate over them.
+	allFiles := make([]hash.Hash, 0, len(blockStore.tables.upstream))
+	sourceSet := blockStore.tables.upstream
+	for tf := range sourceSet {
+		allFiles = append(allFiles, tf)
+	}
+
+	revertMap := smd.RevertMap()
+
+	for _, id := range allFiles {
+		sourceSet = blockStore.tables.upstream
+		cs := sourceSet[id]
+
+		arc, ok := cs.(archiveChunkSource)
+		if !ok {
+			continue
+		}
+		if isJournalAddr(id) {
+			continue
+		}
+
+		orginTfId := revertMap[id]
+		exists, err := smd.oldGenTableExists(orginTfId)
+		if err != nil {
+			return err
+		}
+
+		chkCnt, err := arc.count()
+		if err != nil {
+			return fmt.Errorf("failed to count chunks in archive %s: %w", id.String(), err)
+		}
+
+		var newTF hash.Hash
+		if exists {
+			// We have a fast path to follow because original table file is still on disk.
+			newTF = orginTfId
+		} else {
+			// We don't have the original table file id, so we have to create a new one.
+			classicTable, err := NewCmpChunkTableWriter("")
+			if err != nil {
+				return err
+			}
+
+			err = arc.iterate(ctx, func(chk chunks.Chunk) error {
+				cmpChk := ChunkToCompressedChunk(chk)
+				_, err := classicTable.AddChunk(cmpChk)
+				if err != nil {
+					return err
+				}
+
+				return nil
+			}, &Stats{})
+			if err != nil {
+				return err
+			}
+
+			_, id, err := classicTable.Finish()
+			if err != nil {
+				return err
+			}
+			err = classicTable.FlushToFile(filepath.Join(outPath, id))
+			if err != nil {
+				return err
+			}
+
+			newTF = hash.Parse(id)
+		}
+
+		specs, err := blockStore.tables.toSpecs()
+		newSpecs := make([]tableSpec, 0, len(specs))
+		purgeFile := ""
+		for _, spec := range specs {
+			if id == spec.name {
+				newSpecs = append(newSpecs, tableSpec{newTF, chkCnt})
+				purgeFile = filepath.Join(outPath, id.String()+ArchiveFileSuffix)
+			} else {
+				newSpecs = append(newSpecs, spec)
+			}
+		}
+
+		err = blockStore.swapTables(ctx, newSpecs, chunks.GCMode_Default)
+		if err != nil {
+			return err
+		}
+
+		err = os.Remove(purgeFile)
+		if err != nil {
+			// failing to purge is non-fatal.
+			progress <- fmt.Sprintf("Failed to purge. %s", purgeFile)
+		}
+	}
+
+	return nil
+}
+
+func BuildArchive(ctx context.Context, cs chunks.ChunkStore, dagGroups *ChunkRelations, purge bool, progress chan interface{}) (err error) {
+	if gs, ok := cs.(*GenerationalNBS); ok {
+		err = archiveSingleBlockStore(ctx, gs.newGen, dagGroups, purge, progress)
+		if err != nil {
+			return err
+		}
+
+		err = archiveSingleBlockStore(ctx, gs.oldGen, dagGroups, purge, progress)
+		if err != nil {
+			return err
+		}
+	} else {
+		return errors.New("runtime error: GenerationalNBS Expected")
+	}
+	return nil
+}
+
+func archiveSingleBlockStore(ctx context.Context, blockStore *NomsBlockStore, dagGroups *ChunkRelations, purge bool, progress chan interface{}) error {
 	// Currently, we don't have any stats to report. Required for calls to the lower layers tho.
 	var stats Stats
 
-	blockStore := bscb()
 	path, _ := blockStore.Path()
 
 	allFiles := make([]hash.Hash, 0, len(blockStore.tables.upstream))
diff --git a/integration-tests/bats/archive.bats b/integration-tests/bats/archive.bats
index a38f2a2495..98e521d494 100755
--- a/integration-tests/bats/archive.bats
+++ b/integration-tests/bats/archive.bats
@@ -104,6 +104,36 @@ mutations_and_gc_statement() {
     dolt sql -q "$(update_statement)"
 }
 
+@test "archive: multi archive newgen then revert" {
+    # Getting multiple table files in `newgen` is a little gross.
+    dolt sql -q "$(mutations_and_gc_statement)"
+    mkdir remote
+    dolt remote add origin file://remote
+    dolt push origin main
+
+    dolt clone file://remote cloned
+    cd cloned
+    dolt archive --purge
+    files=$(find . -name "*darc" | wc -l | sed 's/[ \t]//g')
+    [ "$files" -eq "1" ]
+
+    cd ..
+    dolt sql -q "$(mutations_and_gc_statement)"
+    dolt push origin main
+
+    cd cloned
+    dolt fetch
+    dolt archive --purge
+    files=$(find . -name "*darc" | wc -l | sed 's/[ \t]//g')
+    [ "$files" -eq "2" ]
+
+    dolt archive --revert
+    files=$(find . -name "*darc" | wc -l | sed 's/[ \t]//g')
+    [ "$files" -eq "0" ]
+
+    dolt fsck
+}
+
 @test "archive: multiple archives" {
     dolt sql -q "$(mutations_and_gc_statement)"
     dolt sql -q "$(mutations_and_gc_statement)"