Update archive --revert to handle newgen archive files
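
At the command line, the flow this change extends looks roughly like the sketch below. The commands and flags are the ones exercised by the bats test at the end of this diff; the exact sequence is illustrative, not prescriptive.

# Illustrative sequence only (mirrors the bats test below):
dolt archive --purge     # rewrite table files as .darc archives, removing the replaced table files
dolt archive --revert    # unpack archives back into table files; now covers archives in newgen too
dolt fsck                # verify chunk integrity after the round trip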
@@ -44,88 +44,14 @@ const fourMb = 1 << 22
var errNotEnoughChunks = errors.New("Not enough samples to build default dictionary")

func UnArchive(ctx context.Context, cs chunks.ChunkStore, smd StorageMetadata, progress chan interface{}) error {
func UnArchive(ctx context.Context, cs chunks.ChunkStore, smd StorageMetadata, progress chan interface{}) (err error) {
    if gs, ok := cs.(*GenerationalNBS); ok {
        outPath, _ := gs.oldGen.Path()
        oldgen := gs.oldGen.tables.upstream
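
        // swapMap records, for each archive file, the hash of the classic table file that will replace it in the oldgen manifest.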
        swapMap := make(map[hash.Hash]hash.Hash)

        revertMap := smd.RevertMap()

        for id, ogcs := range oldgen {
            if arc, ok := ogcs.(archiveChunkSource); ok {
                orginTfId := revertMap[id]
                exists, err := smd.oldGenTableExists(orginTfId)
                if err != nil {
                    return err
                }
                if exists {
                    // We have a fast path to follow because the original table file is still on disk.
                    swapMap[arc.hash()] = orginTfId
                } else {
                    // We don't have the original table file id, so we have to create a new one.
                    classicTable, err := NewCmpChunkTableWriter("")
                    if err != nil {
                        return err
                    }

                    err = arc.iterate(ctx, func(chk chunks.Chunk) error {
                        cmpChk := ChunkToCompressedChunk(chk)
                        _, err := classicTable.AddChunk(cmpChk)
                        if err != nil {
                            return err
                        }

                        progress <- fmt.Sprintf("Unarchiving %s (bytes: %d)", chk.Hash().String(), len(chk.Data()))
                        return nil
                    }, &Stats{})
                    if err != nil {
                        return err
                    }

                    _, id, err := classicTable.Finish()
                    if err != nil {
                        return err
                    }
                    err = classicTable.FlushToFile(filepath.Join(outPath, id))
                    if err != nil {
                        return err
                    }

                    swapMap[arc.hash()] = hash.Parse(id)
                }
            }
        }
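
        // Second pass: rewrite the oldgen manifest so each reverted archive is replaced by its classic table file.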
        if len(swapMap) > 0 {
            //NM4 TODO: This code path must only be run on an offline database. We should add a check for that.
            specs, err := gs.oldGen.tables.toSpecs()
            newSpecs := make([]tableSpec, 0, len(specs))
            for _, spec := range specs {
                if newSpec, exists := swapMap[spec.name]; exists {
                    newSpecs = append(newSpecs, tableSpec{newSpec, spec.chunkCount})
                } else {
                    newSpecs = append(newSpecs, spec)
                }
            }
            err = gs.oldGen.swapTables(ctx, newSpecs, chunks.GCMode_Default)
            if err != nil {
                return err
            }
        }
    }
    return nil
}

func BuildArchive(ctx context.Context, cs chunks.ChunkStore, dagGroups *ChunkRelations, purge bool, progress chan interface{}) (err error) {
    if gs, ok := cs.(*GenerationalNBS); ok {
        err = archiveSingleBlockStore(ctx, func() *NomsBlockStore { return gs.newGen }, dagGroups, purge, progress)
        err = unArchiveSingleBlockStore(ctx, gs.newGen, smd, progress)
        if err != nil {
            return err
        }

        err = archiveSingleBlockStore(ctx, func() *NomsBlockStore { return gs.oldGen }, dagGroups, purge, progress)
        err = unArchiveSingleBlockStore(ctx, gs.oldGen, smd, progress)
        if err != nil {
            return err
        }
@@ -135,11 +61,126 @@ func BuildArchive(ctx context.Context, cs chunks.ChunkStore, dagGroups *ChunkRel
    return nil
}

func archiveSingleBlockStore(ctx context.Context, bscb func() *NomsBlockStore, dagGroups *ChunkRelations, purge bool, progress chan interface{}) error {
func unArchiveSingleBlockStore(ctx context.Context, blockStore *NomsBlockStore, smd StorageMetadata, progress chan interface{}) error {
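    // For every archive file in this block store: rewrite its chunks into a classic table file
    // (reusing the original table file when storage metadata says it is still on disk), swap the
    // manifest spec to point at that file, and then delete the archive file from disk.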
    outPath, _ := blockStore.Path()

    // The source set changes out from under us, but the names of the table files will stay stable enough
    // to iterate over them.
    allFiles := make([]hash.Hash, 0, len(blockStore.tables.upstream))
    sourceSet := blockStore.tables.upstream
    for tf := range sourceSet {
        allFiles = append(allFiles, tf)
    }
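
    // revertMap maps each archive file hash back to the table file it was originally built from.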
    revertMap := smd.RevertMap()

    for _, id := range allFiles {
        sourceSet = blockStore.tables.upstream
        cs := sourceSet[id]
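
        // Only archive chunk sources need reverting; plain table files and the chunk journal are skipped.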
        arc, ok := cs.(archiveChunkSource)
        if !ok {
            continue
        }
        if isJournalAddr(id) {
            continue
        }

        orginTfId := revertMap[id]
        exists, err := smd.oldGenTableExists(orginTfId)
        if err != nil {
            return err
        }
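
        // The archive's chunk count carries over to the table spec that replaces it in the manifest.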
        chkCnt, err := arc.count()
        if err != nil {
            return fmt.Errorf("failed to count chunks in archive %s: %w", id.String(), err)
        }

        var newTF hash.Hash
        if exists {
            // We have a fast path to follow because the original table file is still on disk.
            newTF = orginTfId
        } else {
            // We don't have the original table file id, so we have to create a new one.
            classicTable, err := NewCmpChunkTableWriter("")
            if err != nil {
                return err
            }

            err = arc.iterate(ctx, func(chk chunks.Chunk) error {
                cmpChk := ChunkToCompressedChunk(chk)
                _, err := classicTable.AddChunk(cmpChk)
                if err != nil {
                    return err
                }

                return nil
            }, &Stats{})
            if err != nil {
                return err
            }

            _, id, err := classicTable.Finish()
            if err != nil {
                return err
            }
            err = classicTable.FlushToFile(filepath.Join(outPath, id))
            if err != nil {
                return err
            }

            newTF = hash.Parse(id)
        }
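
        // Rebuild the spec list, substituting the replacement table file for this archive and leaving every other spec untouched.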
        specs, err := blockStore.tables.toSpecs()
        newSpecs := make([]tableSpec, 0, len(specs))
        purgeFile := ""
        for _, spec := range specs {
            if id == spec.name {
                newSpecs = append(newSpecs, tableSpec{newTF, chkCnt})
                purgeFile = filepath.Join(outPath, id.String()+ArchiveFileSuffix)
            } else {
                newSpecs = append(newSpecs, spec)
            }
        }
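
        // swapTables installs the updated specs in the manifest; only after it succeeds is it safe to remove the archive file from disk.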
        err = blockStore.swapTables(ctx, newSpecs, chunks.GCMode_Default)
        if err != nil {
            return err
        }

        err = os.Remove(purgeFile)
        if err != nil {
            // failing to purge is non-fatal.
            progress <- fmt.Sprintf("Failed to purge. %s", purgeFile)
        }
    }

    return nil
}

func BuildArchive(ctx context.Context, cs chunks.ChunkStore, dagGroups *ChunkRelations, purge bool, progress chan interface{}) (err error) {
    if gs, ok := cs.(*GenerationalNBS); ok {
        err = archiveSingleBlockStore(ctx, gs.newGen, dagGroups, purge, progress)
        if err != nil {
            return err
        }

        err = archiveSingleBlockStore(ctx, gs.oldGen, dagGroups, purge, progress)
        if err != nil {
            return err
        }
    } else {
        return errors.New("runtime error: GenerationalNBS Expected")
    }
    return nil
}

func archiveSingleBlockStore(ctx context.Context, blockStore *NomsBlockStore, dagGroups *ChunkRelations, purge bool, progress chan interface{}) error {
    // Currently, we don't have any stats to report. Required for calls to the lower layers though.
    var stats Stats

    blockStore := bscb()
    path, _ := blockStore.Path()

    allFiles := make([]hash.Hash, 0, len(blockStore.tables.upstream))

@@ -104,6 +104,36 @@ mutations_and_gc_statement() {
    dolt sql -q "$(update_statement)"
}

@test "archive: multi archive newgen then revert" {
    # Getting multiple table files in `newgen` is a little gross.
    dolt sql -q "$(mutations_and_gc_statement)"
    mkdir remote
    dolt remote add origin file://remote
    dolt push origin main

    dolt clone file://remote cloned
    cd cloned
    dolt archive --purge
    files=$(find . -name "*darc" | wc -l | sed 's/[ \t]//g')
    [ "$files" -eq "1" ]

    cd ..
    dolt sql -q "$(mutations_and_gc_statement)"
    dolt push origin main

    cd cloned
    dolt fetch
    dolt archive --purge
    files=$(find . -name "*darc" | wc -l | sed 's/[ \t]//g')
    [ "$files" -eq "2" ]
    dolt archive --revert
    files=$(find . -name "*darc" | wc -l | sed 's/[ \t]//g')
    [ "$files" -eq "0" ]
    dolt fsck
}

@test "archive: multiple archives" {
    dolt sql -q "$(mutations_and_gc_statement)"
    dolt sql -q "$(mutations_and_gc_statement)"