diff --git a/go/cmd/dolt/commands/fsck.go b/go/cmd/dolt/commands/fsck.go
index 55809da306..73d0383eb0 100644
--- a/go/cmd/dolt/commands/fsck.go
+++ b/go/cmd/dolt/commands/fsck.go
@@ -55,6 +55,10 @@ var fsckDocs = cli.CommandDocumentationContent{
 	},
 }
 
+const (
+	journalReviveFlag = "revive-journal-with-data-loss"
+)
+
 func (cmd FsckCmd) Docs() *cli.CommandDocumentation {
 	return cli.NewCommandDocumentation(fsckDocs, cmd.ArgParser())
 }
@@ -62,6 +66,7 @@ func (cmd FsckCmd) Docs() *cli.CommandDocumentation {
 func (cmd FsckCmd) ArgParser() *argparser.ArgParser {
 	ap := argparser.NewArgParserWithMaxArgs(cmd.Name(), 0)
 	ap.SupportsFlag(cli.QuietFlag, "", "Don't show progress. Just print final report.")
+	ap.SupportsFlag(journalReviveFlag, "", "Revives a corrupted chunk journal by discarding invalid chunks. This may result in data loss.")
 	return ap
 }
 
@@ -81,6 +86,24 @@ func (cmd FsckCmd) Exec(ctx context.Context, commandStr string, args []string, d
 		return status
 	}
 
+	if apr.Contains(journalReviveFlag) {
+		root, err := dEnv.FS.Abs("")
+		if err != nil {
+			cli.PrintErrln("Could not get absolute path for dolt data directory:", err.Error())
+			return 1
+		}
+		noms := filepath.Join(root, "noms")
+
+		path, err := nbs.ReviveJournalWithDataLoss(noms)
+		if err != nil {
+			cli.PrintErrln("Could not revive chunk journal:", err.Error())
+			return 1
+		}
+
+		cli.Printf("Revived chunk journal at %s\n", path)
+		return 0
+	}
+
 	quiet := apr.Contains(cli.QuietFlag)
 
 	// We expect these to work because the database has already been initialized in higher layers. We'll check anyway
@@ -177,7 +200,7 @@ func printFSCKReport(report *FSCKReport) int {
 
 func fsckHandleProgress(ctx context.Context, progress <-chan string, quiet bool) {
 	for item := range progress {
-		// when ctx is cancelled, keep draining but stop printing
+		// when ctx is canceled, keep draining but stop printing
 		if !quiet && ctx.Err() == nil {
 			cli.Println(item)
 		}
diff --git a/go/store/nbs/journal_record.go b/go/store/nbs/journal_record.go
index 30de99c917..a7f3ea90b7 100644
--- a/go/store/nbs/journal_record.go
+++ b/go/store/nbs/journal_record.go
@@ -23,10 +23,12 @@ import (
 	"io"
 	"math"
 	"os"
+	"path/filepath"
 	"time"
 
 	"github.com/dolthub/dolt/go/store/d"
 	"github.com/dolthub/dolt/go/store/hash"
+	"github.com/dolthub/fslock"
 )
 
 // journalRec is a record in a chunk journal. Its serialization format uses
@@ -247,6 +249,66 @@ func validateJournalRecord(buf []byte) error {
 	return nil
 }
 
+// ReviveJournalWithDataLoss attempts to recover from a corrupted chunk journal file located in |nomsDir|. This is
+// a special access method for use by the FSCK command. It acquires the lock on the NBS store,
+// verifies that there is data loss, and if so, truncates the journal to the last known good offset. A backup of the
+// corrupted journal file is created with a timestamped suffix before truncating. If no data loss is detected, no action
+// is taken and an empty string is returned. If data loss is detected and recovery is performed, the path to the backup
+// file is returned.
+func ReviveJournalWithDataLoss(nomsDir string) (preservePath string, err error) {
+	lock := fslock.New(filepath.Join(nomsDir, lockFileName))
+	err = lock.TryLock()
+	if err != nil {
+		return "", fmt.Errorf("could not acquire lock on NBS store: %w", err)
+	}
+	defer lock.Unlock()
+
+	journalPath := filepath.Join(nomsDir, chunkJournalName)
+	journalFile, err := os.OpenFile(journalPath, os.O_RDWR, 0666)
+	if err != nil {
+		return "", fmt.Errorf("could not open chunk journal file: %w", err)
+	}
+	defer journalFile.Close()
+
+	noOp := func(o int64, r journalRec) error { return nil }
+	// First verify that the journal has data loss.
+	var offset int64
+	offset, err = processJournalRecords(context.Background(), journalFile, 0, noOp, nil)
+	if err == nil {
+		// No data loss detected, nothing to do.
+		return "", fmt.Errorf("no data loss detected in chunk journal file; no recovery performed")
+	}
+	if !errors.Is(err, ErrJournalDataLoss) {
+		return "", fmt.Errorf("could not process chunk journal file: %w", err)
+	}
+
+	// Seek back to the start, to perform a full copy.
+	if _, err = journalFile.Seek(0, io.SeekStart); err != nil {
+		return "", fmt.Errorf("could not seek to start of chunk journal file: %w", err)
+	}
+
+	// Create a backup of the journal file before truncating.
+	preservePath = fmt.Sprintf("%s_save_%s", journalPath, time.Now().Format("2006_01_02_15_04_05"))
+	saveFile, err := os.OpenFile(preservePath, os.O_RDWR|os.O_CREATE|os.O_EXCL, 0666)
+	if err != nil {
+		return "", fmt.Errorf("could not create backup of corrupted chunk journal file: %w", err)
+	}
+	defer saveFile.Close()
+	if _, err = io.Copy(saveFile, journalFile); err != nil {
+		return "", fmt.Errorf("could not backup corrupted chunk journal file: %w", err)
+	}
+
+	// Now truncate the journal file to the last known good offset.
+	if err = journalFile.Truncate(offset); err != nil {
+		return "", fmt.Errorf("could not truncate corrupted chunk journal file: %w", err)
+	}
+	if err = journalFile.Sync(); err != nil {
+		return "", fmt.Errorf("could not sync truncated chunk journal file: %w", err)
+	}
+
+	return preservePath, nil
+}
+
 // processJournalRecords iterates over a chunk journal's records by reading from disk using |r|, starting at
 // offset |off|, and calls the callback function |cb| with each journal record. The offset where reading was stopped
 // is returned, or any error encountered along the way.