Remove usage of ReadAll in journal dataloss checking

Neil Macneale IV
2025-11-24 18:19:28 -08:00
parent a84723fe91
commit 108a2f1d8e
2 changed files with 138 additions and 10 deletions

View File

@@ -362,8 +362,7 @@ func processJournalRecords(ctx context.Context, r io.ReadSeeker, off int64, cb f
}
}
if dataLossFound {
err = fmt.Errorf("possible data loss detected in journal file at offset %d", off)
return 0, err
return 0, NewJournalDataLossError(off)
}
}
@@ -376,25 +375,50 @@ func processJournalRecords(ctx context.Context, r io.ReadSeeker, off int64, cb f
return off, nil
}
var ErrJournalDataLoss = errors.New("corrupted journal")
func NewJournalDataLossError(offset int64) error {
return fmt.Errorf("possible data loss detected in journal file at offset %d: %w", offset, ErrJournalDataLoss)
}
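Because NewJournalDataLossError wraps the ErrJournalDataLoss sentinel with %w, callers can detect the condition with errors.Is instead of matching the message text. A minimal standalone sketch of that pattern follows; it re-declares the two helpers above so it compiles on its own, and the offset 4096 is an arbitrary example value.

package main

import (
	"errors"
	"fmt"
	"log"
)

// Re-declared stand-ins for the package-level error helpers shown above.
var ErrJournalDataLoss = errors.New("corrupted journal")

func NewJournalDataLossError(offset int64) error {
	return fmt.Errorf("possible data loss detected in journal file at offset %d: %w", offset, ErrJournalDataLoss)
}

func main() {
	err := NewJournalDataLossError(4096)
	if errors.Is(err, ErrJournalDataLoss) {
		// Branch on the sentinel; the offset stays available in the message.
		log.Printf("journal corruption detected: %v", err)
	}
}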
// possibleDataLossCheck checks for non-zero data starting at |off| in |r|. When calling this method, we've already hit
// a state where we can't read a record at |off|, so we are going to scan forward from |off| to see if there is any
// valid root record followed by another record (root hash or chunk).
func possibleDataLossCheck(reader *bufio.Reader) (dataLoss bool, err error) {
firstRootFound := false
remainingData, err := io.ReadAll(reader)
if err != nil {
atEOF := false
bufferPrefix := 0
const buffSize = journalWriterBuffSize * 2
buf := make([]byte, buffSize)
ReadBatchGoto:
n, err := io.ReadFull(reader, buf[bufferPrefix:])
switch err {
case nil:
// Got a full buffer. Let's go.
case io.ErrUnexpectedEOF:
// Final short read before EOF: n bytes valid
atEOF = true
buf = buf[:bufferPrefix+n]
case io.EOF:
// ReadFull only returns EOF if no bytes were read. We may have plenty of unprocessed data in buf.
atEOF = true
buf = buf[:bufferPrefix]
default:
return false, err
}
idx := 0
for idx <= len(remainingData)-rootHashRecordSize() {
sz := readUint32(remainingData[idx : idx+uint32Size])
for idx <= len(buf)-rootHashRecordSize() {
sz := readUint32(buf[idx : idx+uint32Size])
if sz > 0 && sz <= journalWriterBuffSize {
// in the right range.
if int(sz) <= len(remainingData[idx:]) {
if int(sz) <= len(buf[idx:]) {
// try to validate it.
candidate := remainingData[idx : idx+int(sz)]
candidate := buf[idx : idx+int(sz)]
e := validateJournalRecord(candidate)
if e == nil {
record, err := readJournalRecord(candidate)
@@ -403,7 +427,7 @@ func possibleDataLossCheck(reader *bufio.Reader) (dataLoss bool, err error) {
return false, err
}
if firstRootFound {
// found the second record!
// found the second record! => possible data loss
return true, nil
}
if record.kind == rootHashJournalRecKind {
@@ -414,11 +438,32 @@ func possibleDataLossCheck(reader *bufio.Reader) (dataLoss bool, err error) {
idx += int(sz)
continue
}
} else {
// Not enough data to validate this record. Break out to try to read more data. Interestingly, if you are
// looking at random data, and you've parsed a size which is one byte shy of the max size (5 MB), this means
// we only got halfway through the buffer, and we're forced to perform a largish copy of the remaining data to the front
// of the buffer.
if !atEOF {
// We can read more data. The remaining data is shifted after the loop, then we'll hit that sweet goto.
break
}
// We are at EOF, so we can't read more data. Just finish the scan byte by byte.
// We got a length which is in range, but there isn't enough data left in the file, so we just assume
// the current position isn't the start of a valid record.
}
}
idx++
}
if !atEOF {
// Shift remaining data to front of buffer and read more.
remaining := len(buf) - idx
copy(buf[0:], buf[idx:idx+remaining])
bufferPrefix = remaining
goto ReadBatchGoto
}
return false, nil
}
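The loop above swaps io.ReadAll for a fixed-size window refilled with io.ReadFull, treating io.ErrUnexpectedEOF and io.EOF as the two end-of-data cases and shifting unscanned bytes to the front before each refill. The same shape, reduced to searching a reader for a fixed marker, is sketched below; scanForMarker, the 64-byte window, and the NEEDLE input are illustrative and not part of this change.

package main

import (
	"bytes"
	"fmt"
	"io"
	"strings"
)

// scanForMarker reports whether marker appears in r, scanning with a bounded
// window instead of reading the whole stream into memory.
func scanForMarker(r io.Reader, marker []byte, windowSize int) (bool, error) {
	if windowSize < len(marker) {
		return false, fmt.Errorf("window size %d is smaller than marker size %d", windowSize, len(marker))
	}
	buf := make([]byte, windowSize)
	prefix := 0 // bytes carried over from the previous window
	for {
		n, err := io.ReadFull(r, buf[prefix:])
		valid := prefix + n
		atEOF := false
		switch err {
		case nil:
			// Full window read; keep going after this scan.
		case io.ErrUnexpectedEOF, io.EOF:
			// Short or empty final read; scan whatever we already hold, then stop.
			atEOF = true
		default:
			return false, err
		}
		if bytes.Contains(buf[:valid], marker) {
			return true, nil
		}
		if atEOF {
			return false, nil
		}
		// Carry the last len(marker)-1 bytes forward so a match that straddles
		// two windows is not missed, mirroring the shift-to-front above.
		keep := len(marker) - 1
		copy(buf, buf[valid-keep:valid])
		prefix = keep
	}
}

func main() {
	r := strings.NewReader(strings.Repeat("x", 100) + "NEEDLE" + strings.Repeat("y", 50))
	found, err := scanForMarker(r, []byte("NEEDLE"), 64)
	fmt.Println(found, err) // true <nil>
}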

View File

@@ -17,6 +17,7 @@ package nbs
import (
"bytes"
"context"
"errors"
"fmt"
"math/rand"
"testing"
@@ -244,6 +245,88 @@ func TestJournalForDataLoss(t *testing.T) {
}
}
func TestJournalForDataLossOnBoundary(t *testing.T) {
r := rand.New(rand.NewSource(987654321))
// The data loss detection logic has some special cases around buffer boundaries to avoid reading all data into
// memory at once. This test constructs a journal which starts with a few valid records, then a bunch of garbage
// data, then sticks two records which constitute data loss at a series of offsets spanning the buffer boundary.
//
// The data loss code uses a 10 MB buffer, so we'll make a 10 MB journal plus 1 KB.
bufSz := 2*journalWriterBuffSize + (1 << 10)
// Pre generate random buffer to speed this process up.
randBuf := make([]byte, bufSz)
r.Read(randBuf)
nullBuf := make([]byte, bufSz)
for i := range nullBuf {
nullBuf[i] = 0
}
type backerType struct {
buf []byte
name string
}
var backers = []backerType{
{buf: randBuf, name: "randBuf"},
{buf: nullBuf, name: "nullBuf"},
}
for _, backer := range backers {
journalBuf := make([]byte, bufSz)
copy(journalBuf, backer.buf)
var rec journalRec
var off uint32
rec, _ = makeRootHashRecord()
off += writeRootHashRecord(journalBuf[:], rec.address)
rec, _ = makeChunkRecord()
off += writeChunkRecord(journalBuf[off:], mustCompressedChunk(rec))
check := func(o int64, r journalRec) (_ error) { return nil }
// Verify that the journal as it exists at this moment does not trigger data loss. If it did, we'd have
// no confidence in the rest of the test.
ctx := context.Background()
var recoverErr error
bytesRead, err := processJournalRecords(ctx, bytes.NewReader(journalBuf[:]), 0, check, func(e error) { recoverErr = e })
require.NoError(t, err)
require.Equal(t, off, uint32(bytesRead))
require.Error(t, recoverErr) // We do expect a warning here, but no data loss.
// TODO: create a lost chunk which is slightly smaller than journalWriterBuffSize.
lostData := make([]byte, 1<<10) // lost data is only ~180 bytes. Allocate 1K.
off = 0
rec, _ = makeRootHashRecord()
off += writeRootHashRecord(lostData[:], rec.address)
rec, _ = makeChunkRecord()
off += writeChunkRecord(lostData[off:], mustCompressedChunk(rec))
lostData = lostData[:off]
t.Run(backer.name, func(t *testing.T) {
// startPoint and endPoint define the range of offsets to test for data loss on the boundary.
startPoint := (2 * journalWriterBuffSize) - uint32(len(lostData)) - uint32(rootHashRecordSize())
// endPoint puts the first byte of lostData one root hash record past the 10 MB read boundary.
endPoint := (2 * journalWriterBuffSize) + uint32(rootHashRecordSize())
for startPoint <= endPoint {
// Copy lost data into journal buffer at the test offset.
copy(journalBuf[startPoint:startPoint+uint32(len(lostData))], lostData)
_, err := processJournalRecords(ctx, bytes.NewReader(journalBuf[:]), 0, check, func(e error) { recoverErr = e })
require.Error(t, err)
require.True(t, errors.Is(err, ErrJournalDataLoss))
require.Error(t, recoverErr)
// Reset the journal buffer to original status.
copy(journalBuf[startPoint:startPoint+uint32(len(lostData))], backer.buf[startPoint:startPoint+uint32(len(lostData))])
// Testing every option takes a couple hours. Don't use `r` because we want this to be non-deterministic.
startPoint += uint32(rand.Intn(38) + 1) // Don't want to skip rootHashRecordSize() entirely.
}
})
}
}
func randomMemTable(cnt int) (*memTable, map[hash.Hash]chunks.Chunk) {
chnx := make(map[hash.Hash]chunks.Chunk, cnt)
for i := 0; i < cnt; i++ {
@@ -355,7 +438,7 @@ func writeCorruptJournalRecord(buf []byte) (n uint32) {
func mustCompressedChunk(rec journalRec) CompressedChunk {
d.PanicIfFalse(rec.kind == chunkJournalRecKind)
cc, err := NewCompressedChunk(hash.Hash(rec.address), rec.payload)
cc, err := NewCompressedChunk(rec.address, rec.payload)
d.PanicIfError(err)
return cc
}