diff --git a/go/store/nbs/journal.go b/go/store/nbs/journal.go index c405558b43..ab3dd860df 100644 --- a/go/store/nbs/journal.go +++ b/go/store/nbs/journal.go @@ -155,7 +155,7 @@ func (j *ChunkJournal) bootstrapJournalWriter(ctx context.Context, warningsCb fu return err } - _, err = j.wr.bootstrapJournal(ctx, j.reflogRingBuffer, warningsCb) + _, err = j.wr.bootstrapJournal(ctx, canCreate, j.reflogRingBuffer, warningsCb) if err != nil { return err } @@ -183,7 +183,7 @@ func (j *ChunkJournal) bootstrapJournalWriter(ctx context.Context, warningsCb fu } // parse existing journal file - root, err := j.wr.bootstrapJournal(ctx, j.reflogRingBuffer, warningsCb) + root, err := j.wr.bootstrapJournal(ctx, canCreate, j.reflogRingBuffer, warningsCb) if err != nil { return err } diff --git a/go/store/nbs/journal_record.go b/go/store/nbs/journal_record.go index 8b0d683d7c..d724ba3549 100644 --- a/go/store/nbs/journal_record.go +++ b/go/store/nbs/journal_record.go @@ -264,16 +264,16 @@ func ReviveJournalWithDataLoss(nomsDir string) (preservePath string, err error) defer lock.Unlock() journalPath := filepath.Join(nomsDir, chunkJournalName) - jornalFile, err := os.OpenFile(journalPath, os.O_RDWR, 0666) + journalFile, err := os.OpenFile(journalPath, os.O_RDWR, 0666) if err != nil { return "", fmt.Errorf("could not open chunk journal file: %w", err) } - defer jornalFile.Close() + defer journalFile.Close() noOp := func(o int64, r journalRec) error { return nil } // First verify that the journal has data loss. var offset int64 - offset, err = processJournalRecords(context.Background(), jornalFile, 0, noOp, nil) + offset, err = processJournalRecords(context.Background(), journalFile, true /* tryTruncate */, 0, noOp, nil) if err == nil { // No data loss detected, nothing to do. return "", fmt.Errorf("no data loss detected in chunk journal file; no recovery performed") @@ -283,7 +283,7 @@ func ReviveJournalWithDataLoss(nomsDir string) (preservePath string, err error) } // Seek back to the start, to perform a full copy. - if _, err = jornalFile.Seek(0, io.SeekStart); err != nil { + if _, err = journalFile.Seek(0, io.SeekStart); err != nil { return "", fmt.Errorf("could not seek to start of chunk journal file: %w", err) } @@ -296,7 +296,7 @@ func ReviveJournalWithDataLoss(nomsDir string) (preservePath string, err error) if err != nil { return "", fmt.Errorf("could not create backup of corrupted chunk journal file: %w", err) } - if _, err = io.Copy(saveFile, jornalFile); err != nil { + if _, err = io.Copy(saveFile, journalFile); err != nil { return "", fmt.Errorf("could not backup corrupted chunk journal file: %w", err) } @@ -305,10 +305,10 @@ func ReviveJournalWithDataLoss(nomsDir string) (preservePath string, err error) } // Now truncate the journal file to the last known good offset. - if err = jornalFile.Truncate(offset); err != nil { + if err = journalFile.Truncate(offset); err != nil { return "", fmt.Errorf("could not truncate corrupted chunk journal file: %w", err) } - if err = jornalFile.Sync(); err != nil { + if err = journalFile.Sync(); err != nil { return "", fmt.Errorf("could not sync truncated chunk journal file: %w", err) } @@ -327,7 +327,7 @@ func ReviveJournalWithDataLoss(nomsDir string) (preservePath string, err error) // // The |warningsCb| callback is called with any errors encountered that we automatically recover from. This allows the caller // to handle the situation in a context specific way. -func processJournalRecords(ctx context.Context, r io.ReadSeeker, off int64, cb func(o int64, r journalRec) error, warningsCb func(error)) (int64, error) { +func processJournalRecords(ctx context.Context, r io.ReadSeeker, tryTruncate bool, off int64, cb func(o int64, r journalRec) error, warningsCb func(error)) (int64, error) { var ( buf []byte err error @@ -444,7 +444,7 @@ func processJournalRecords(ctx context.Context, r io.ReadSeeker, off int64, cb f // When we have a real file, we truncate anything which is beyond the current offset. Historically we put // null bytes there, and there have been cases of garbage data being present instead of nulls. If there is any // data beyond the current offset which we can parse and looks like data loss, we would have errored out above. - if f, ok := r.(*os.File); ok { + if f, ok := r.(*os.File); ok && tryTruncate { err = f.Truncate(off) if err != nil { return 0, err diff --git a/go/store/nbs/journal_record_test.go b/go/store/nbs/journal_record_test.go index 5ca12007e7..84034620e6 100644 --- a/go/store/nbs/journal_record_test.go +++ b/go/store/nbs/journal_record_test.go @@ -116,7 +116,7 @@ func TestProcessJournalRecords(t *testing.T) { } var recoverErr error - n, err := processJournalRecords(ctx, bytes.NewReader(journal), 0, check, func(e error) { recoverErr = e }) + n, err := processJournalRecords(ctx, bytes.NewReader(journal), true, 0, check, func(e error) { recoverErr = e }) assert.Equal(t, cnt, i) assert.Equal(t, int(off), int(n)) require.NoError(t, err) @@ -125,7 +125,7 @@ func TestProcessJournalRecords(t *testing.T) { // write a bogus record to the end and verify that we don't get an error i, sum = 0, 0 writeCorruptJournalRecord(journal[off:]) - n, err = processJournalRecords(ctx, bytes.NewReader(journal), 0, check, func(e error) { recoverErr = e }) + n, err = processJournalRecords(ctx, bytes.NewReader(journal), true, 0, check, func(e error) { recoverErr = e }) require.NoError(t, err) assert.Equal(t, cnt, i) assert.Equal(t, int(off), int(n)) @@ -227,7 +227,7 @@ func TestJournalForDataLoss(t *testing.T) { } var recoverErr error - _, err := processJournalRecords(ctx, bytes.NewReader(journal[:off]), 0, check, func(e error) { recoverErr = e }) + _, err := processJournalRecords(ctx, bytes.NewReader(journal[:off]), true, 0, check, func(e error) { recoverErr = e }) if td.lossExpected { require.Error(t, err) @@ -288,7 +288,7 @@ func TestJournalForDataLossOnBoundary(t *testing.T) { // no confidence in the rest of the test. ctx := context.Background() var recoverErr error - bytesRead, err := processJournalRecords(ctx, bytes.NewReader(journalBuf[:]), 0, check, func(e error) { recoverErr = e }) + bytesRead, err := processJournalRecords(ctx, bytes.NewReader(journalBuf[:]), true, 0, check, func(e error) { recoverErr = e }) require.NoError(t, err) require.Equal(t, off, uint32(bytesRead)) require.Error(t, recoverErr) // We do expect a warning here, but no data loss. @@ -312,7 +312,7 @@ func TestJournalForDataLossOnBoundary(t *testing.T) { // Copy lost data into journal buffer at the test offset. copy(journalBuf[startPoint:startPoint+uint32(len(lostData))], lostData) - _, err := processJournalRecords(ctx, bytes.NewReader(journalBuf[:]), 0, check, func(e error) { recoverErr = e }) + _, err := processJournalRecords(ctx, bytes.NewReader(journalBuf[:]), true, 0, check, func(e error) { recoverErr = e }) require.Error(t, err) require.True(t, errors.Is(err, ErrJournalDataLoss)) require.Error(t, recoverErr) @@ -457,7 +457,7 @@ func processJournalAndCollectRecords(t *testing.T, journalData []byte) []testRec t.FailNow() } - _, err := processJournalRecords(ctx, bytes.NewReader(journalData), 0, func(offset int64, rec journalRec) error { + _, err := processJournalRecords(ctx, bytes.NewReader(journalData), true, 0, func(offset int64, rec journalRec) error { records = append(records, testRecord{hash: rec.address, kind: rec.kind}) return nil }, warnCb) diff --git a/go/store/nbs/journal_writer.go b/go/store/nbs/journal_writer.go index a462d54e4b..f9b16824ee 100644 --- a/go/store/nbs/journal_writer.go +++ b/go/store/nbs/journal_writer.go @@ -163,7 +163,7 @@ var _ io.Closer = &journalWriter{} // are added to the novel ranges map. If the number of novel lookups exceeds |wr.maxNovel|, we // extend the journal index with one metadata flush before existing this function to save indexing // progress. -func (wr *journalWriter) bootstrapJournal(ctx context.Context, reflogRingBuffer *reflogRingBuffer, warningsCb func(error)) (last hash.Hash, err error) { +func (wr *journalWriter) bootstrapJournal(ctx context.Context, canWrite bool, reflogRingBuffer *reflogRingBuffer, warningsCb func(error)) (last hash.Hash, err error) { wr.lock.Lock() defer wr.lock.Unlock() @@ -276,7 +276,7 @@ func (wr *journalWriter) bootstrapJournal(ctx context.Context, reflogRingBuffer // process the non-indexed portion of the journal starting at |wr.indexed|, // at minimum the non-indexed portion will include a root hash record. // Index lookups are added to the ongoing batch to re-synchronize. - wr.off, err = processJournalRecords(ctx, wr.journal, wr.indexed, func(o int64, r journalRec) error { + wr.off, err = processJournalRecords(ctx, wr.journal, canWrite, wr.indexed, func(o int64, r journalRec) error { switch r.kind { case chunkJournalRecKind: rng := Range{ diff --git a/go/store/nbs/journal_writer_test.go b/go/store/nbs/journal_writer_test.go index 27a540e45c..cea880820f 100644 --- a/go/store/nbs/journal_writer_test.go +++ b/go/store/nbs/journal_writer_test.go @@ -185,7 +185,7 @@ func newTestJournalWriter(t *testing.T, path string) *journalWriter { j, err := createJournalWriter(ctx, path) require.NoError(t, err) require.NotNil(t, j) - _, err = j.bootstrapJournal(ctx, nil, nil) + _, err = j.bootstrapJournal(ctx, true, nil, nil) require.NoError(t, err) return j } @@ -220,7 +220,7 @@ func TestJournalWriterBootstrap(t *testing.T) { j, _, err := openJournalWriter(ctx, path) require.NoError(t, err) reflogBuffer := newReflogRingBuffer(10) - last, err = j.bootstrapJournal(ctx, reflogBuffer, nil) + last, err = j.bootstrapJournal(ctx, true, reflogBuffer, nil) require.NoError(t, err) assertExpectedIterationOrder(t, reflogBuffer, []string{last.String()}) @@ -363,7 +363,7 @@ func TestJournalIndexBootstrap(t *testing.T) { require.NoError(t, err) require.True(t, ok) // bootstrap journal and validate chunk records - last, err := journal.bootstrapJournal(ctx, nil, nil) + last, err := journal.bootstrapJournal(ctx, true, nil, nil) assert.NoError(t, err) for _, e := range expected { var act CompressedChunk @@ -398,7 +398,7 @@ func TestJournalIndexBootstrap(t *testing.T) { jnl, ok, err := openJournalWriter(ctx, idxPath) require.NoError(t, err) require.True(t, ok) - _, err = jnl.bootstrapJournal(ctx, nil, nil) + _, err = jnl.bootstrapJournal(ctx, true, nil, nil) assert.Error(t, err) }) }