Refactor archive to convert all table files incrementally

By all table files, I mean both newgen and oldgen.
Neil Macneale IV
2025-05-22 10:12:30 -07:00
parent a8da9839a8
commit d290a9f462

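In outline, this change replaces BuildArchive's single oldgen-only conversion loop with a per-store helper, archiveSingleBlockStore, which is applied to the new generation first and then the old generation. A rough sketch of the new shape (paraphrased, not verbatim; the actual code is in the diff below):

	// BuildArchive now delegates to a helper, once per block store:
	//
	//	if gs, ok := cs.(*GenerationalNBS); ok {
	//		archiveSingleBlockStore(ctx, gs.newGen, dagGroups, purge, progress)
	//		archiveSingleBlockStore(ctx, gs.oldGen, dagGroups, purge, progress)
	//	}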

@@ -40,6 +40,8 @@ const defaultDictionarySize = 1 << 12 // NM4 - maybe just select the largest chu
 const maxSamples = 1000
 const minSamples = 25
 
+var errNotEnoughChunks = errors.New("Not enough samples to build default dictionary")
+
 func UnArchive(ctx context.Context, cs chunks.ChunkStore, smd StorageMetadata, progress chan interface{}) error {
 	if gs, ok := cs.(*GenerationalNBS); ok {
 		outPath, _ := gs.oldGen.Path()
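The new package-level errNotEnoughChunks sentinel exists so that callers can tell "this table file has too few chunks to be worth archiving" apart from a real failure. As a small illustration of the pattern it enables (hypothetical caller, not code from this commit):

	// Skip sources that are too small to archive; propagate anything else.
	_, _, err := convertTableFileToArchive(ctx, cs, idx, dagGroups, path, progress, &stats)
	if errors.Is(err, errNotEnoughChunks) {
		// Non-fatal: the table file simply stays in its original format.
	} else if err != nil {
		return err
	}

The second hunk below performs exactly this errors.Is check inside archiveSingleBlockStore, so one small table file no longer aborts the whole run.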
@@ -115,86 +117,97 @@ func UnArchive(ctx context.Context, cs chunks.ChunkStore, smd StorageMetadata, p
 }
 
 func BuildArchive(ctx context.Context, cs chunks.ChunkStore, dagGroups *ChunkRelations, purge bool, progress chan interface{}) (err error) {
-	// Currently, we don't have any stats to report. Required for calls to the lower layers tho.
-	var stats Stats
 	if gs, ok := cs.(*GenerationalNBS); ok {
-		outPath, _ := gs.oldGen.Path()
-		oldgen := gs.oldGen.tables.upstream
-		swapMap := make(map[hash.Hash]hash.Hash)
-		for tf, ogcs := range oldgen {
-			if _, ok := ogcs.(archiveChunkSource); ok {
-				continue
-			}
-			idx, err := ogcs.index()
-			if err != nil {
-				return err
-			}
-			originalSize := idx.tableFileSize()
-			archivePath := ""
-			archiveName := hash.Hash{}
-			archivePath, archiveName, err = convertTableFileToArchive(ctx, ogcs, idx, dagGroups, outPath, progress, &stats)
-			if err != nil {
-				return err
-			}
-			fileInfo, err := os.Stat(archivePath)
-			if err != nil {
-				progress <- "Failed to stat archive file"
-				return err
-			}
-			archiveSize := fileInfo.Size()
-			err = verifyAllChunks(ctx, idx, archivePath, progress, &stats)
-			if err != nil {
-				return err
-			}
-			percentReduction := -100.0 * (float64(archiveSize)/float64(originalSize) - 1.0)
-			progress <- fmt.Sprintf("Archived %s (%d -> %d bytes, %.2f%% reduction)", archiveName, originalSize, archiveSize, percentReduction)
-			swapMap[tf] = archiveName
-		}
-		if len(swapMap) == 0 {
-			return fmt.Errorf("No tables found to archive. Run 'dolt gc' first")
-		}
-		cleanup := make([]hash.Hash, 0, len(swapMap))
-		//NM4 TODO: This code path must only be run on an offline database. We should add a check for that.
-		specs, err := gs.oldGen.tables.toSpecs()
-		newSpecs := make([]tableSpec, 0, len(specs))
-		for _, spec := range specs {
-			if newSpec, exists := swapMap[spec.name]; exists {
-				newSpecs = append(newSpecs, tableSpec{newSpec, spec.chunkCount})
-				cleanup = append(cleanup, spec.name)
-			} else {
-				newSpecs = append(newSpecs, spec)
-			}
-		}
-		err = gs.oldGen.swapTables(ctx, newSpecs, chunks.GCMode_Default)
+		err = archiveSingleBlockStore(ctx, gs.newGen, dagGroups, purge, progress)
 		if err != nil {
 			return err
 		}
-		if purge && len(cleanup) > 0 {
-			for _, h := range cleanup {
-				tf := filepath.Join(outPath, h.String())
-				err = os.Remove(tf)
-				if err != nil {
-					return err
-				}
-			}
+		err = archiveSingleBlockStore(ctx, gs.oldGen, dagGroups, purge, progress)
+		if err != nil {
+			return err
 		}
 	} else {
 		return errors.New("Modern DB Expected")
 	}
 	return nil
 }
+
+func archiveSingleBlockStore(ctx context.Context, blockStore *NomsBlockStore, dagGroups *ChunkRelations, purge bool, progress chan interface{}) error {
+	// Currently, we don't have any stats to report. Required for calls to the lower layers tho.
+	var stats Stats
+	path, _ := blockStore.Path()
+	sourceSet := blockStore.tables.upstream
+	for tf, cs := range sourceSet {
+		if _, ok := cs.(archiveChunkSource); ok {
+			continue
+		}
+		if isJournalAddr(cs.hash()) {
+			continue
+		}
+		idx, err := cs.index()
+		if err != nil {
+			return err
+		}
+		originalSize := idx.tableFileSize()
+		archivePath := ""
+		archiveName := hash.Hash{}
+		archivePath, archiveName, err = convertTableFileToArchive(ctx, cs, idx, dagGroups, path, progress, &stats)
+		if err != nil {
+			if errors.Is(err, errNotEnoughChunks) {
+				progress <- fmt.Sprintf("Not enough chunks to build archive for %s. Skipping.", cs.hash().String())
+				continue
+			}
+			return err
+		}
+		fileInfo, err := os.Stat(archivePath)
+		if err != nil {
+			progress <- "Failed to stat archive file"
+			return err
+		}
+		archiveSize := fileInfo.Size()
+		err = verifyAllChunks(ctx, idx, archivePath, progress, &stats)
+		if err != nil {
+			return err
+		}
+		percentReduction := -100.0 * (float64(archiveSize)/float64(originalSize) - 1.0)
+		progress <- fmt.Sprintf("Archived %s (%d -> %d bytes, %.2f%% reduction)", archiveName, originalSize, archiveSize, percentReduction)
+		specs, err := blockStore.tables.toSpecs()
+		newSpecs := make([]tableSpec, 0, len(specs))
+		purgeFile := ""
+		for _, spec := range specs {
+			if tf == spec.name {
+				newSpecs = append(newSpecs, tableSpec{archiveName, spec.chunkCount})
+				if purge {
+					purgeFile = filepath.Join(path, tf.String())
+				}
+			} else {
+				newSpecs = append(newSpecs, spec)
+			}
+		}
+		err = blockStore.swapTables(ctx, newSpecs, chunks.GCMode_Default)
+		if err != nil {
+			return err
+		}
+		if len(purgeFile) > 0 {
+			err = os.Remove(purgeFile)
+			if err != nil {
+				// failing to purge is non-fatal.
+				progress <- fmt.Sprintf("Failed to purge. %s", purgeFile)
+			}
+		}
+	}
+	return nil
+}
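The percentReduction value sent over the progress channel is ordinary arithmetic. A worked example with illustrative numbers (not from this commit):

	// percentReduction := -100.0 * (float64(archiveSize)/float64(originalSize) - 1.0)
	// e.g. originalSize = 1000, archiveSize = 750:
	//      -100.0 * (0.75 - 1.0) = 25.00, reported as "25.00% reduction"
	// An archive larger than its source yields a negative reduction,
	// which the same format string reports without special-casing.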
@@ -217,7 +230,7 @@ func convertTableFileToArchive(
 	if len(defaultSamples) >= minSamples {
 		defaultDict = buildDictionary(defaultSamples)
 	} else {
-		return "", hash.Hash{}, errors.New("Not enough samples to build default dictionary")
+		return "", hash.Hash{}, errNotEnoughChunks
 	}
 
 	defaultSamples = nil
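For completeness, both functions report status by sending strings into the progress channel, so a caller is expected to drain it concurrently. A minimal usage sketch, assuming a *GenerationalNBS-backed ChunkStore in cs (the surrounding setup is hypothetical, not part of this commit):

	progress := make(chan interface{}, 16)
	done := make(chan struct{})
	go func() {
		defer close(done)
		for msg := range progress {
			fmt.Println(msg) // e.g. "Archived <name> (N -> M bytes, P% reduction)"
		}
	}()
	err := BuildArchive(ctx, cs, dagGroups, true /* purge converted table files */, progress)
	close(progress)
	<-done
	if err != nil {
		// Non-generational stores fail fast with "Modern DB Expected".
	}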