go/store/nbs: Fix possible data loss when the Dolt CLI is run against a database whose running sql-server is in the process of writing to its journal file.

Since Dolt v1.78.5, Dolt has truncated the journal file to the last successfully loaded record whenever it loads a database. This is the correct behavior when the CLI is operating as the exclusive writer to the database. However, Dolt can also load a database in read-only mode, and due to a bug it was truncating the journal file in that mode as well. The end result was that running something like `dolt sql -r csv -q ...` against a Dolt database that was being served by a running sql-server, and that was currently accepting writes, could incorrectly truncate the journal file. The Dolt sql-server process would keep writing into the journal at its previously extended offset, and the space between the truncation point and the next write offset would be zero-filled by the operating system. Attempting to load the database later, or accessing chunks located in that corrupted portion of the journal file, would result in checksum errors or failure-to-load messages.
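To make the failure mode concrete, here is a minimal, self-contained Go sketch (not Dolt code; the file name, record contents, and offsets are invented purely for illustration) of what happens when one handle truncates a file while another handle keeps writing at the offset it had already advanced to:

// Demonstrates the zero-filled gap left behind when a file is truncated by
// one handle while another handle continues writing at a larger offset.
// Error handling is omitted for brevity.
package main

import (
	"bytes"
	"fmt"
	"os"
	"path/filepath"
)

func main() {
	path := filepath.Join(os.TempDir(), "journal-demo")
	defer os.Remove(path)

	// The "server" handle has already written 8 records and its offset is now 56.
	server, _ := os.OpenFile(path, os.O_CREATE|os.O_RDWR, 0666)
	server.Write(bytes.Repeat([]byte("record\n"), 8))

	// A second handle, meant only to read, truncates the file mid-stream,
	// as the buggy CLI code path did.
	cli, _ := os.OpenFile(path, os.O_RDWR, 0666)
	cli.Truncate(21) // keep only the first three records
	cli.Close()

	// The server keeps writing at its old offset (56), unaware of the
	// truncation; bytes 21..55 become a hole that reads back as zeros.
	server.Write([]byte("record\n"))
	server.Close()

	data, _ := os.ReadFile(path)
	fmt.Printf("zero bytes in gap: %d\n", bytes.Count(data[21:56], []byte{0})) // prints 35
}

Writing past the new end of file leaves the gap filled with zero bytes, which is exactly the corrupted region of the journal described above: the record framing in that region no longer parses, so later loads hit checksum errors or fail outright.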

This change updates Dolt to only truncate the journal file when it is being loaded in read-write mode.
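The diff below threads a flag for this (named canCreate, canWrite, and tryTruncate at the different layers) from the bootstrap path down to the record scanner, which only truncates when the flag is set. As a rough standalone illustration of that pattern, and not the actual Dolt API, a scanner can gate truncation on both the flag and the reader actually being a file:

// Simplified stand-in for a journal scanner: it reads fixed-size records,
// returns the offset of the last complete record, and trims anything past
// that offset only when the caller loaded the file read-write.
package main

import (
	"fmt"
	"io"
	"os"
	"path/filepath"
)

func scanRecords(r io.ReadSeeker, tryTruncate bool) (int64, error) {
	var off int64
	buf := make([]byte, 8)
	for {
		n, err := io.ReadFull(r, buf)
		if err == io.EOF || err == io.ErrUnexpectedEOF {
			break // partial or missing record: stop at the last good offset
		} else if err != nil {
			return 0, err
		}
		off += int64(n)
	}
	if f, ok := r.(*os.File); ok && tryTruncate {
		if err := f.Truncate(off); err != nil {
			return 0, err
		}
	}
	return off, nil
}

func main() {
	path := filepath.Join(os.TempDir(), "guard-demo")
	defer os.Remove(path)
	os.WriteFile(path, make([]byte, 20), 0666) // two full records plus 4 stray bytes

	f, _ := os.OpenFile(path, os.O_RDWR, 0666)
	defer f.Close()

	// Read-only style load: parse, but leave the file alone.
	off, _ := scanRecords(f, false)
	st, _ := f.Stat()
	fmt.Println(off, st.Size()) // 16 20

	// Read-write load: safe to trim the trailing partial record.
	f.Seek(0, io.SeekStart)
	off, _ = scanRecords(f, true)
	st, _ = f.Stat()
	fmt.Println(off, st.Size()) // 16 16
}

A read-only load then reports the last good offset without shrinking the file, while a read-write load is still free to trim trailing garbage, which is the behavior the updated tests below exercise by passing true.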
Author: Aaron Son
Date: 2026-01-20 14:45:59 -08:00
Parent: 383b107d79
Commit: ffef6e92d7
5 changed files with 23 additions and 23 deletions

View File

@@ -155,7 +155,7 @@ func (j *ChunkJournal) bootstrapJournalWriter(ctx context.Context, warningsCb fu
return err
}
-_, err = j.wr.bootstrapJournal(ctx, j.reflogRingBuffer, warningsCb)
+_, err = j.wr.bootstrapJournal(ctx, canCreate, j.reflogRingBuffer, warningsCb)
if err != nil {
return err
}
@@ -183,7 +183,7 @@ func (j *ChunkJournal) bootstrapJournalWriter(ctx context.Context, warningsCb fu
}
// parse existing journal file
-root, err := j.wr.bootstrapJournal(ctx, j.reflogRingBuffer, warningsCb)
+root, err := j.wr.bootstrapJournal(ctx, canCreate, j.reflogRingBuffer, warningsCb)
if err != nil {
return err
}

View File

@@ -264,16 +264,16 @@ func ReviveJournalWithDataLoss(nomsDir string) (preservePath string, err error)
defer lock.Unlock()
journalPath := filepath.Join(nomsDir, chunkJournalName)
-jornalFile, err := os.OpenFile(journalPath, os.O_RDWR, 0666)
+journalFile, err := os.OpenFile(journalPath, os.O_RDWR, 0666)
if err != nil {
return "", fmt.Errorf("could not open chunk journal file: %w", err)
}
-defer jornalFile.Close()
+defer journalFile.Close()
noOp := func(o int64, r journalRec) error { return nil }
// First verify that the journal has data loss.
var offset int64
-offset, err = processJournalRecords(context.Background(), jornalFile, 0, noOp, nil)
+offset, err = processJournalRecords(context.Background(), journalFile, true /* tryTruncate */, 0, noOp, nil)
if err == nil {
// No data loss detected, nothing to do.
return "", fmt.Errorf("no data loss detected in chunk journal file; no recovery performed")
@@ -283,7 +283,7 @@ func ReviveJournalWithDataLoss(nomsDir string) (preservePath string, err error)
}
// Seek back to the start, to perform a full copy.
-if _, err = jornalFile.Seek(0, io.SeekStart); err != nil {
+if _, err = journalFile.Seek(0, io.SeekStart); err != nil {
return "", fmt.Errorf("could not seek to start of chunk journal file: %w", err)
}
@@ -296,7 +296,7 @@ func ReviveJournalWithDataLoss(nomsDir string) (preservePath string, err error)
if err != nil {
return "", fmt.Errorf("could not create backup of corrupted chunk journal file: %w", err)
}
-if _, err = io.Copy(saveFile, jornalFile); err != nil {
+if _, err = io.Copy(saveFile, journalFile); err != nil {
return "", fmt.Errorf("could not backup corrupted chunk journal file: %w", err)
}
@@ -305,10 +305,10 @@ func ReviveJournalWithDataLoss(nomsDir string) (preservePath string, err error)
}
// Now truncate the journal file to the last known good offset.
-if err = jornalFile.Truncate(offset); err != nil {
+if err = journalFile.Truncate(offset); err != nil {
return "", fmt.Errorf("could not truncate corrupted chunk journal file: %w", err)
}
-if err = jornalFile.Sync(); err != nil {
+if err = journalFile.Sync(); err != nil {
return "", fmt.Errorf("could not sync truncated chunk journal file: %w", err)
}
@@ -327,7 +327,7 @@ func ReviveJournalWithDataLoss(nomsDir string) (preservePath string, err error)
//
// The |warningsCb| callback is called with any errors encountered that we automatically recover from. This allows the caller
// to handle the situation in a context specific way.
-func processJournalRecords(ctx context.Context, r io.ReadSeeker, off int64, cb func(o int64, r journalRec) error, warningsCb func(error)) (int64, error) {
+func processJournalRecords(ctx context.Context, r io.ReadSeeker, tryTruncate bool, off int64, cb func(o int64, r journalRec) error, warningsCb func(error)) (int64, error) {
var (
buf []byte
err error
@@ -444,7 +444,7 @@ func processJournalRecords(ctx context.Context, r io.ReadSeeker, off int64, cb f
// When we have a real file, we truncate anything which is beyond the current offset. Historically we put
// null bytes there, and there have been cases of garbage data being present instead of nulls. If there is any
// data beyond the current offset which we can parse and looks like data loss, we would have errored out above.
-if f, ok := r.(*os.File); ok {
+if f, ok := r.(*os.File); ok && tryTruncate {
err = f.Truncate(off)
if err != nil {
return 0, err

View File

@@ -116,7 +116,7 @@ func TestProcessJournalRecords(t *testing.T) {
}
var recoverErr error
-n, err := processJournalRecords(ctx, bytes.NewReader(journal), 0, check, func(e error) { recoverErr = e })
+n, err := processJournalRecords(ctx, bytes.NewReader(journal), true, 0, check, func(e error) { recoverErr = e })
assert.Equal(t, cnt, i)
assert.Equal(t, int(off), int(n))
require.NoError(t, err)
@@ -125,7 +125,7 @@ func TestProcessJournalRecords(t *testing.T) {
// write a bogus record to the end and verify that we don't get an error
i, sum = 0, 0
writeCorruptJournalRecord(journal[off:])
-n, err = processJournalRecords(ctx, bytes.NewReader(journal), 0, check, func(e error) { recoverErr = e })
+n, err = processJournalRecords(ctx, bytes.NewReader(journal), true, 0, check, func(e error) { recoverErr = e })
require.NoError(t, err)
assert.Equal(t, cnt, i)
assert.Equal(t, int(off), int(n))
@@ -227,7 +227,7 @@ func TestJournalForDataLoss(t *testing.T) {
}
var recoverErr error
-_, err := processJournalRecords(ctx, bytes.NewReader(journal[:off]), 0, check, func(e error) { recoverErr = e })
+_, err := processJournalRecords(ctx, bytes.NewReader(journal[:off]), true, 0, check, func(e error) { recoverErr = e })
if td.lossExpected {
require.Error(t, err)
@@ -288,7 +288,7 @@ func TestJournalForDataLossOnBoundary(t *testing.T) {
// no confidence in the rest of the test.
ctx := context.Background()
var recoverErr error
-bytesRead, err := processJournalRecords(ctx, bytes.NewReader(journalBuf[:]), 0, check, func(e error) { recoverErr = e })
+bytesRead, err := processJournalRecords(ctx, bytes.NewReader(journalBuf[:]), true, 0, check, func(e error) { recoverErr = e })
require.NoError(t, err)
require.Equal(t, off, uint32(bytesRead))
require.Error(t, recoverErr) // We do expect a warning here, but no data loss.
@@ -312,7 +312,7 @@ func TestJournalForDataLossOnBoundary(t *testing.T) {
// Copy lost data into journal buffer at the test offset.
copy(journalBuf[startPoint:startPoint+uint32(len(lostData))], lostData)
-_, err := processJournalRecords(ctx, bytes.NewReader(journalBuf[:]), 0, check, func(e error) { recoverErr = e })
+_, err := processJournalRecords(ctx, bytes.NewReader(journalBuf[:]), true, 0, check, func(e error) { recoverErr = e })
require.Error(t, err)
require.True(t, errors.Is(err, ErrJournalDataLoss))
require.Error(t, recoverErr)
@@ -457,7 +457,7 @@ func processJournalAndCollectRecords(t *testing.T, journalData []byte) []testRec
t.FailNow()
}
-_, err := processJournalRecords(ctx, bytes.NewReader(journalData), 0, func(offset int64, rec journalRec) error {
+_, err := processJournalRecords(ctx, bytes.NewReader(journalData), true, 0, func(offset int64, rec journalRec) error {
records = append(records, testRecord{hash: rec.address, kind: rec.kind})
return nil
}, warnCb)

View File

@@ -163,7 +163,7 @@ var _ io.Closer = &journalWriter{}
// are added to the novel ranges map. If the number of novel lookups exceeds |wr.maxNovel|, we
// extend the journal index with one metadata flush before existing this function to save indexing
// progress.
-func (wr *journalWriter) bootstrapJournal(ctx context.Context, reflogRingBuffer *reflogRingBuffer, warningsCb func(error)) (last hash.Hash, err error) {
+func (wr *journalWriter) bootstrapJournal(ctx context.Context, canWrite bool, reflogRingBuffer *reflogRingBuffer, warningsCb func(error)) (last hash.Hash, err error) {
wr.lock.Lock()
defer wr.lock.Unlock()
@@ -276,7 +276,7 @@ func (wr *journalWriter) bootstrapJournal(ctx context.Context, reflogRingBuffer
// process the non-indexed portion of the journal starting at |wr.indexed|,
// at minimum the non-indexed portion will include a root hash record.
// Index lookups are added to the ongoing batch to re-synchronize.
-wr.off, err = processJournalRecords(ctx, wr.journal, wr.indexed, func(o int64, r journalRec) error {
+wr.off, err = processJournalRecords(ctx, wr.journal, canWrite, wr.indexed, func(o int64, r journalRec) error {
switch r.kind {
case chunkJournalRecKind:
rng := Range{

View File

@@ -185,7 +185,7 @@ func newTestJournalWriter(t *testing.T, path string) *journalWriter {
j, err := createJournalWriter(ctx, path)
require.NoError(t, err)
require.NotNil(t, j)
-_, err = j.bootstrapJournal(ctx, nil, nil)
+_, err = j.bootstrapJournal(ctx, true, nil, nil)
require.NoError(t, err)
return j
}
@@ -220,7 +220,7 @@ func TestJournalWriterBootstrap(t *testing.T) {
j, _, err := openJournalWriter(ctx, path)
require.NoError(t, err)
reflogBuffer := newReflogRingBuffer(10)
-last, err = j.bootstrapJournal(ctx, reflogBuffer, nil)
+last, err = j.bootstrapJournal(ctx, true, reflogBuffer, nil)
require.NoError(t, err)
assertExpectedIterationOrder(t, reflogBuffer, []string{last.String()})
@@ -363,7 +363,7 @@ func TestJournalIndexBootstrap(t *testing.T) {
require.NoError(t, err)
require.True(t, ok)
// bootstrap journal and validate chunk records
-last, err := journal.bootstrapJournal(ctx, nil, nil)
+last, err := journal.bootstrapJournal(ctx, true, nil, nil)
assert.NoError(t, err)
for _, e := range expected {
var act CompressedChunk
@@ -398,7 +398,7 @@ func TestJournalIndexBootstrap(t *testing.T) {
jnl, ok, err := openJournalWriter(ctx, idxPath)
require.NoError(t, err)
require.True(t, ok)
-_, err = jnl.bootstrapJournal(ctx, nil, nil)
+_, err = jnl.bootstrapJournal(ctx, true, nil, nil)
assert.Error(t, err)
})
}