mirror of
https://github.com/dolthub/dolt.git
synced 2025-12-21 11:59:41 -06:00
Journal repair. Untested
This commit is contained in:
@@ -55,6 +55,10 @@ var fsckDocs = cli.CommandDocumentationContent{
|
||||
},
|
||||
}
|
||||
|
||||
const (
|
||||
journalReviveFlag = "revive-journal-with-data-loss"
|
||||
)
|
||||
|
||||
func (cmd FsckCmd) Docs() *cli.CommandDocumentation {
|
||||
return cli.NewCommandDocumentation(fsckDocs, cmd.ArgParser())
|
||||
}
|
||||
@@ -62,6 +66,7 @@ func (cmd FsckCmd) Docs() *cli.CommandDocumentation {
|
||||
func (cmd FsckCmd) ArgParser() *argparser.ArgParser {
|
||||
ap := argparser.NewArgParserWithMaxArgs(cmd.Name(), 0)
|
||||
ap.SupportsFlag(cli.QuietFlag, "", "Don't show progress. Just print final report.")
|
||||
ap.SupportsFlag(journalReviveFlag, "", "Revives a corrupted chunk journal by discarding invalid chunks. This may result in data loss.")
|
||||
|
||||
return ap
|
||||
}
|
||||
@@ -81,6 +86,23 @@ func (cmd FsckCmd) Exec(ctx context.Context, commandStr string, args []string, d
|
||||
return status
|
||||
}
|
||||
|
||||
if apr.Contains(journalReviveFlag) {
|
||||
root, err := dEnv.FS.Abs("")
|
||||
if err != nil {
|
||||
cli.PrintErrln("Could not get absolute path for dolt data directory:", err.Error())
|
||||
}
|
||||
noms := filepath.Join(root, "noms")
|
||||
|
||||
path, err := nbs.ReviveJournalWithDataLoss(noms)
|
||||
if err != nil {
|
||||
cli.PrintErrln("Could not revive chunk journal:", err.Error())
|
||||
return 1
|
||||
}
|
||||
|
||||
cli.Printf("Revived chunk journal at %s\n", path)
|
||||
return 0
|
||||
}
|
||||
|
||||
quiet := apr.Contains(cli.QuietFlag)
|
||||
|
||||
// We expect these to work because the database has already been initialized in higher layers. We'll check anyway
|
||||
@@ -177,7 +199,7 @@ func printFSCKReport(report *FSCKReport) int {
|
||||
|
||||
func fsckHandleProgress(ctx context.Context, progress <-chan string, quiet bool) {
|
||||
for item := range progress {
|
||||
// when ctx is cancelled, keep draining but stop printing
|
||||
// when ctx is canceled, keep draining but stop printing
|
||||
if !quiet && ctx.Err() == nil {
|
||||
cli.Println(item)
|
||||
}
|
||||
|
||||
@@ -23,10 +23,12 @@ import (
|
||||
"io"
|
||||
"math"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"time"
|
||||
|
||||
"github.com/dolthub/dolt/go/store/d"
|
||||
"github.com/dolthub/dolt/go/store/hash"
|
||||
"github.com/dolthub/fslock"
|
||||
)
|
||||
|
||||
// journalRec is a record in a chunk journal. Its serialization format uses
|
||||
@@ -247,6 +249,66 @@ func validateJournalRecord(buf []byte) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// ReviveJournalWithDataLoss attempts to recover from a corrupted chunk journal file located in |nomsDir|. This is
|
||||
// a special access method for use by the FSCK command. It acquired the lock on the NBS store,
|
||||
// verifies that there is dataloss, and if so, truncates the journal to the last known good offset. A backup of the
|
||||
// corrupted journal file is created with a timestamped suffix before truncating. If no data loss is detected, no action
|
||||
// is taken and an empty string is returned. If data loss is detected and recovery is performed, the path to the backup
|
||||
// file is returned.
|
||||
func ReviveJournalWithDataLoss(nomsDir string) (preservePath string, err error) {
|
||||
lock := fslock.New(filepath.Join(nomsDir, lockFileName))
|
||||
err = lock.TryLock()
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("could not acquire lock on NBS store: %w", err)
|
||||
}
|
||||
defer lock.Unlock()
|
||||
|
||||
journalPath := filepath.Join(nomsDir, chunkJournalName)
|
||||
jornalFile, err := os.OpenFile(journalPath, os.O_RDWR, 0666)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("could not open chunk journal file: %w", err)
|
||||
}
|
||||
defer jornalFile.Close()
|
||||
|
||||
noOp := func(o int64, r journalRec) error { return nil }
|
||||
// First verify that the journal has data loss.
|
||||
var offset int64
|
||||
offset, err = processJournalRecords(context.Background(), jornalFile, 0, noOp, nil)
|
||||
if err == nil {
|
||||
// No data loss detected, nothing to do.
|
||||
return "", fmt.Errorf("no data loss detected in chunk journal file; no recovery performed")
|
||||
}
|
||||
if !errors.Is(err, ErrJournalDataLoss) {
|
||||
return "", fmt.Errorf("could not process chunk journal file: %w", err)
|
||||
}
|
||||
|
||||
// Seek back to the start, to perform a full copy.
|
||||
if _, err = jornalFile.Seek(0, io.SeekStart); err != nil {
|
||||
return "", fmt.Errorf("could not seek to start of chunk journal file: %w", err)
|
||||
}
|
||||
|
||||
// Create a backup of the journal file before truncating.
|
||||
preservePath = fmt.Sprintf("%s_save_%d", journalPath, time.Now().Format("2006_01_02_15_04_05"))
|
||||
saveFile, err := os.OpenFile(preservePath, os.O_RDWR|os.O_CREATE|os.O_EXCL, 0666)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("could not create backup of corrupted chunk journal file: %w", err)
|
||||
}
|
||||
defer saveFile.Close()
|
||||
if _, err = io.Copy(saveFile, jornalFile); err != nil {
|
||||
return "", fmt.Errorf("could not backup corrupted chunk journal file: %w", err)
|
||||
}
|
||||
|
||||
// Now truncate the journal file to the last known good offset.
|
||||
if err = jornalFile.Truncate(offset); err != nil {
|
||||
return "", fmt.Errorf("could not truncate corrupted chunk journal file: %w", err)
|
||||
}
|
||||
if err = jornalFile.Sync(); err != nil {
|
||||
return "", fmt.Errorf("could not sync truncated chunk journal file: %w", err)
|
||||
}
|
||||
|
||||
return preservePath, nil
|
||||
}
|
||||
|
||||
// processJournalRecords iterates over a chunk journal's records by reading from disk using |r|, starting at
|
||||
// offset |off|, and calls the callback function |cb| with each journal record. The offset where reading was stopped
|
||||
// is returned, or any error encountered along the way.
|
||||
|
||||
Reference in New Issue
Block a user