From ffef6e92d725f7af9adca48bc32a7825878be2e8 Mon Sep 17 00:00:00 2001
From: Aaron Son
Date: Tue, 20 Jan 2026 14:45:59 -0800
Subject: [PATCH] go/store/nbs: Fix possible data loss when the Dolt CLI is
 executed against a running server that is in the process of writing to its
 journal file.

Since Dolt v1.78.5, Dolt has truncated the journal file to the end of the
latest successfully loaded record whenever it loads a database. This is the
correct behavior when the CLI is operating as the exclusive writer to the
database. However, Dolt can also load the database in read-only mode, and due
to a bug it was truncating the journal file in that mode as well.

The end result was that running something like `dolt sql -r csv -q ...`
against a Dolt database that was being served by a running sql-server, and
that was currently accepting writes, could incorrectly truncate the journal
file. The Dolt sql-server process would continue writing into the journal at
its previously extended offset, and the space between the truncation point
and the next write offset would be zero-filled by the operating system.
Attempting to load the database later, or to access the chunks located in
that corrupted portion of the journal file, would result in checksum errors
or failure-to-load messages. (A standalone sketch reproducing this failure
mode follows the diff below.)

This change updates Dolt to only truncate the journal file when it is loaded
in read-write mode.
---
 go/store/nbs/journal.go             |  4 ++--
 go/store/nbs/journal_record.go      | 18 +++++++++---------
 go/store/nbs/journal_record_test.go | 12 ++++++------
 go/store/nbs/journal_writer.go      |  4 ++--
 go/store/nbs/journal_writer_test.go |  8 ++++----
 5 files changed, 23 insertions(+), 23 deletions(-)

diff --git a/go/store/nbs/journal.go b/go/store/nbs/journal.go
index c405558b43..ab3dd860df 100644
--- a/go/store/nbs/journal.go
+++ b/go/store/nbs/journal.go
@@ -155,7 +155,7 @@ func (j *ChunkJournal) bootstrapJournalWriter(ctx context.Context, warningsCb fu
 		return err
 	}
 
-	_, err = j.wr.bootstrapJournal(ctx, j.reflogRingBuffer, warningsCb)
+	_, err = j.wr.bootstrapJournal(ctx, canCreate, j.reflogRingBuffer, warningsCb)
 	if err != nil {
 		return err
 	}
@@ -183,7 +183,7 @@ func (j *ChunkJournal) bootstrapJournalWriter(ctx context.Context, warningsCb fu
 	}
 
 	// parse existing journal file
-	root, err := j.wr.bootstrapJournal(ctx, j.reflogRingBuffer, warningsCb)
+	root, err := j.wr.bootstrapJournal(ctx, canCreate, j.reflogRingBuffer, warningsCb)
 	if err != nil {
 		return err
 	}
diff --git a/go/store/nbs/journal_record.go b/go/store/nbs/journal_record.go
index 8b0d683d7c..d724ba3549 100644
--- a/go/store/nbs/journal_record.go
+++ b/go/store/nbs/journal_record.go
@@ -264,16 +264,16 @@ func ReviveJournalWithDataLoss(nomsDir string) (preservePath string, err error)
 	defer lock.Unlock()
 
 	journalPath := filepath.Join(nomsDir, chunkJournalName)
-	jornalFile, err := os.OpenFile(journalPath, os.O_RDWR, 0666)
+	journalFile, err := os.OpenFile(journalPath, os.O_RDWR, 0666)
 	if err != nil {
 		return "", fmt.Errorf("could not open chunk journal file: %w", err)
 	}
-	defer jornalFile.Close()
+	defer journalFile.Close()
 
 	noOp := func(o int64, r journalRec) error { return nil }
 	// First verify that the journal has data loss.
 	var offset int64
-	offset, err = processJournalRecords(context.Background(), jornalFile, 0, noOp, nil)
+	offset, err = processJournalRecords(context.Background(), journalFile, true /* tryTruncate */, 0, noOp, nil)
 	if err == nil {
 		// No data loss detected, nothing to do.
return "", fmt.Errorf("no data loss detected in chunk journal file; no recovery performed") @@ -283,7 +283,7 @@ func ReviveJournalWithDataLoss(nomsDir string) (preservePath string, err error) } // Seek back to the start, to perform a full copy. - if _, err = jornalFile.Seek(0, io.SeekStart); err != nil { + if _, err = journalFile.Seek(0, io.SeekStart); err != nil { return "", fmt.Errorf("could not seek to start of chunk journal file: %w", err) } @@ -296,7 +296,7 @@ func ReviveJournalWithDataLoss(nomsDir string) (preservePath string, err error) if err != nil { return "", fmt.Errorf("could not create backup of corrupted chunk journal file: %w", err) } - if _, err = io.Copy(saveFile, jornalFile); err != nil { + if _, err = io.Copy(saveFile, journalFile); err != nil { return "", fmt.Errorf("could not backup corrupted chunk journal file: %w", err) } @@ -305,10 +305,10 @@ func ReviveJournalWithDataLoss(nomsDir string) (preservePath string, err error) } // Now truncate the journal file to the last known good offset. - if err = jornalFile.Truncate(offset); err != nil { + if err = journalFile.Truncate(offset); err != nil { return "", fmt.Errorf("could not truncate corrupted chunk journal file: %w", err) } - if err = jornalFile.Sync(); err != nil { + if err = journalFile.Sync(); err != nil { return "", fmt.Errorf("could not sync truncated chunk journal file: %w", err) } @@ -327,7 +327,7 @@ func ReviveJournalWithDataLoss(nomsDir string) (preservePath string, err error) // // The |warningsCb| callback is called with any errors encountered that we automatically recover from. This allows the caller // to handle the situation in a context specific way. -func processJournalRecords(ctx context.Context, r io.ReadSeeker, off int64, cb func(o int64, r journalRec) error, warningsCb func(error)) (int64, error) { +func processJournalRecords(ctx context.Context, r io.ReadSeeker, tryTruncate bool, off int64, cb func(o int64, r journalRec) error, warningsCb func(error)) (int64, error) { var ( buf []byte err error @@ -444,7 +444,7 @@ func processJournalRecords(ctx context.Context, r io.ReadSeeker, off int64, cb f // When we have a real file, we truncate anything which is beyond the current offset. Historically we put // null bytes there, and there have been cases of garbage data being present instead of nulls. If there is any // data beyond the current offset which we can parse and looks like data loss, we would have errored out above. 
-	if f, ok := r.(*os.File); ok {
+	if f, ok := r.(*os.File); ok && tryTruncate {
 		err = f.Truncate(off)
 		if err != nil {
 			return 0, err
diff --git a/go/store/nbs/journal_record_test.go b/go/store/nbs/journal_record_test.go
index 5ca12007e7..84034620e6 100644
--- a/go/store/nbs/journal_record_test.go
+++ b/go/store/nbs/journal_record_test.go
@@ -116,7 +116,7 @@ func TestProcessJournalRecords(t *testing.T) {
 	}
 
 	var recoverErr error
-	n, err := processJournalRecords(ctx, bytes.NewReader(journal), 0, check, func(e error) { recoverErr = e })
+	n, err := processJournalRecords(ctx, bytes.NewReader(journal), true, 0, check, func(e error) { recoverErr = e })
 	assert.Equal(t, cnt, i)
 	assert.Equal(t, int(off), int(n))
 	require.NoError(t, err)
@@ -125,7 +125,7 @@
 	// write a bogus record to the end and verify that we don't get an error
 	i, sum = 0, 0
 	writeCorruptJournalRecord(journal[off:])
-	n, err = processJournalRecords(ctx, bytes.NewReader(journal), 0, check, func(e error) { recoverErr = e })
+	n, err = processJournalRecords(ctx, bytes.NewReader(journal), true, 0, check, func(e error) { recoverErr = e })
 	require.NoError(t, err)
 	assert.Equal(t, cnt, i)
 	assert.Equal(t, int(off), int(n))
@@ -227,7 +227,7 @@ func TestJournalForDataLoss(t *testing.T) {
 			}
 
 			var recoverErr error
-			_, err := processJournalRecords(ctx, bytes.NewReader(journal[:off]), 0, check, func(e error) { recoverErr = e })
+			_, err := processJournalRecords(ctx, bytes.NewReader(journal[:off]), true, 0, check, func(e error) { recoverErr = e })
 
 			if td.lossExpected {
 				require.Error(t, err)
@@ -288,7 +288,7 @@ func TestJournalForDataLossOnBoundary(t *testing.T) {
 	// no confidence in the rest of the test.
 	ctx := context.Background()
 	var recoverErr error
-	bytesRead, err := processJournalRecords(ctx, bytes.NewReader(journalBuf[:]), 0, check, func(e error) { recoverErr = e })
+	bytesRead, err := processJournalRecords(ctx, bytes.NewReader(journalBuf[:]), true, 0, check, func(e error) { recoverErr = e })
 	require.NoError(t, err)
 	require.Equal(t, off, uint32(bytesRead))
 	require.Error(t, recoverErr) // We do expect a warning here, but no data loss.
@@ -312,7 +312,7 @@
 		// Copy lost data into journal buffer at the test offset.
 		copy(journalBuf[startPoint:startPoint+uint32(len(lostData))], lostData)
 
-		_, err := processJournalRecords(ctx, bytes.NewReader(journalBuf[:]), 0, check, func(e error) { recoverErr = e })
+		_, err := processJournalRecords(ctx, bytes.NewReader(journalBuf[:]), true, 0, check, func(e error) { recoverErr = e })
 		require.Error(t, err)
 		require.True(t, errors.Is(err, ErrJournalDataLoss))
 		require.Error(t, recoverErr)
@@ -457,7 +457,7 @@ func processJournalAndCollectRecords(t *testing.T, journalData []byte) []testRec
 		t.FailNow()
 	}
 
-	_, err := processJournalRecords(ctx, bytes.NewReader(journalData), 0, func(offset int64, rec journalRec) error {
+	_, err := processJournalRecords(ctx, bytes.NewReader(journalData), true, 0, func(offset int64, rec journalRec) error {
 		records = append(records, testRecord{hash: rec.address, kind: rec.kind})
 		return nil
 	}, warnCb)
diff --git a/go/store/nbs/journal_writer.go b/go/store/nbs/journal_writer.go
index a462d54e4b..f9b16824ee 100644
--- a/go/store/nbs/journal_writer.go
+++ b/go/store/nbs/journal_writer.go
@@ -163,7 +163,7 @@ var _ io.Closer = &journalWriter{}
 // are added to the novel ranges map. If the number of novel lookups exceeds |wr.maxNovel|, we
 // extend the journal index with one metadata flush before existing this function to save indexing
 // progress.
-func (wr *journalWriter) bootstrapJournal(ctx context.Context, reflogRingBuffer *reflogRingBuffer, warningsCb func(error)) (last hash.Hash, err error) {
+func (wr *journalWriter) bootstrapJournal(ctx context.Context, canWrite bool, reflogRingBuffer *reflogRingBuffer, warningsCb func(error)) (last hash.Hash, err error) {
 	wr.lock.Lock()
 	defer wr.lock.Unlock()
 
@@ -276,7 +276,7 @@ func (wr *journalWriter) bootstrapJournal(ctx context.Context, reflogRingBuffer
 	// process the non-indexed portion of the journal starting at |wr.indexed|,
 	// at minimum the non-indexed portion will include a root hash record.
 	// Index lookups are added to the ongoing batch to re-synchronize.
-	wr.off, err = processJournalRecords(ctx, wr.journal, wr.indexed, func(o int64, r journalRec) error {
+	wr.off, err = processJournalRecords(ctx, wr.journal, canWrite, wr.indexed, func(o int64, r journalRec) error {
 		switch r.kind {
 		case chunkJournalRecKind:
 			rng := Range{
diff --git a/go/store/nbs/journal_writer_test.go b/go/store/nbs/journal_writer_test.go
index 27a540e45c..cea880820f 100644
--- a/go/store/nbs/journal_writer_test.go
+++ b/go/store/nbs/journal_writer_test.go
@@ -185,7 +185,7 @@ func newTestJournalWriter(t *testing.T, path string) *journalWriter {
 	j, err := createJournalWriter(ctx, path)
 	require.NoError(t, err)
 	require.NotNil(t, j)
-	_, err = j.bootstrapJournal(ctx, nil, nil)
+	_, err = j.bootstrapJournal(ctx, true, nil, nil)
 	require.NoError(t, err)
 	return j
 }
@@ -220,7 +220,7 @@ func TestJournalWriterBootstrap(t *testing.T) {
 		j, _, err := openJournalWriter(ctx, path)
 		require.NoError(t, err)
 		reflogBuffer := newReflogRingBuffer(10)
-		last, err = j.bootstrapJournal(ctx, reflogBuffer, nil)
+		last, err = j.bootstrapJournal(ctx, true, reflogBuffer, nil)
 		require.NoError(t, err)
 
 		assertExpectedIterationOrder(t, reflogBuffer, []string{last.String()})
@@ -363,7 +363,7 @@ func TestJournalIndexBootstrap(t *testing.T) {
 	require.NoError(t, err)
 	require.True(t, ok)
 	// bootstrap journal and validate chunk records
-	last, err := journal.bootstrapJournal(ctx, nil, nil)
+	last, err := journal.bootstrapJournal(ctx, true, nil, nil)
 	assert.NoError(t, err)
 	for _, e := range expected {
 		var act CompressedChunk
@@ -398,7 +398,7 @@ func TestJournalIndexBootstrap(t *testing.T) {
 		jnl, ok, err := openJournalWriter(ctx, idxPath)
 		require.NoError(t, err)
 		require.True(t, ok)
-		_, err = jnl.bootstrapJournal(ctx, nil, nil)
+		_, err = jnl.bootstrapJournal(ctx, true, nil, nil)
 		assert.Error(t, err)
 	})
 }
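Appendix (not part of the patch): a minimal, standalone sketch reproducing the failure mode described in the commit message, assuming POSIX truncate/zero-fill semantics. The file name, sizes, and offsets here are made up for illustration and do not correspond to Dolt's journal format; the point is only that truncating a file from one handle while another handle keeps writing at its old, larger offset leaves a zero-filled hole in between.

// journal_gap_demo.go: hypothetical illustration only, not part of the Dolt codebase.
package main

import (
	"bytes"
	"fmt"
	"os"
)

func main() {
	const path = "journal-demo.bin" // made-up file name

	// "sql-server" handle: writes 1 KiB of data; its own file offset is now 1024.
	server, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR, 0666)
	if err != nil {
		panic(err)
	}
	defer os.Remove(path)
	if _, err := server.Write(bytes.Repeat([]byte{0xAB}, 1024)); err != nil {
		panic(err)
	}

	// "CLI" handle: opens the same file and truncates it to an earlier offset,
	// as the read-only load path mistakenly did before this fix.
	cli, err := os.OpenFile(path, os.O_RDWR, 0666)
	if err != nil {
		panic(err)
	}
	if err := cli.Truncate(512); err != nil {
		panic(err)
	}
	cli.Close()

	// The server keeps writing at its previously extended offset (1024). The
	// operating system re-extends the file and zero-fills bytes 512..1023, so
	// that region now reads back as zeros instead of the data written earlier.
	if _, err := server.Write([]byte{0xCD}); err != nil {
		panic(err)
	}
	server.Close()

	data, err := os.ReadFile(path)
	if err != nil {
		panic(err)
	}
	fmt.Printf("byte at offset 600: %#x (original data replaced by zero fill)\n", data[600])
}

Running this prints a zero byte at an offset that previously held data, which is the analogue of the corrupted journal region that later surfaces as checksum errors or failure-to-load messages.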