Add data loss checking to journal parsing

Neil Macneale IV
2025-11-17 13:16:09 -08:00
parent 9b24ca378d
commit d672608348
2 changed files with 218 additions and 25 deletions

View File

@@ -265,7 +265,10 @@ func (e *CorruptJournalRecortError) Error() string {
// operation, so it's possible to leave the journal in a corrupted state. We must gracefully recover
// without preventing the server from starting up, so we are careful to only return the journal file
// offset that points to end of the last valid record.
func processJournalRecords(ctx context.Context, r io.ReadSeeker, off int64, cb func(o int64, r journalRec) error, valCb func(error)) (int64, error) {
//
// The |recoveredCb| callback is called with any errors encountered that we automatically recover
// from. This allows the caller to handle the situation in a context-specific way.
func processJournalRecords(ctx context.Context, r io.ReadSeeker, off int64, cb func(o int64, r journalRec) error, recoveredCb func(error)) (int64, error) {
var (
buf []byte
err error
@@ -276,10 +279,16 @@ func processJournalRecords(ctx context.Context, r io.ReadSeeker, off int64, cb f
return 0, err
}
// There are a few ways we can recover from journals which seem truncated or corrupted, but we want to be sure
// to scan to the end of the file in these cases to ensure there is no indication of data loss.
recovered := false
rdr := bufio.NewReaderSize(r, journalWriterBuffSize)
for {
// peek to read next record size
if buf, err = rdr.Peek(uint32Size); err != nil {
// If we hit EOF here, it's expected. We can have no more than 3 bytes of data left, and we don't really have
// a way to determine if that was valid data or just padding. So we simply stop scanning.
break
}
@@ -291,24 +300,28 @@ func processJournalRecords(ctx context.Context, r io.ReadSeeker, off int64, cb f
// length of 0, we know we've reached the end of the journal records and are starting to
// read the zero padding.
if l == 0 {
recovered = true
break
}
if l > journalWriterBuffSize {
if valCb != nil {
jErr := &CorruptJournalRecortError{
Data: buf,
Offset: off,
Why: fmt.Sprintf("invalid journal record length: %d exceeds max allowed size of %d", l, journalWriterBuffSize),
}
// We've run into an invalid journal record length, so we stop processing further records allowing
// the system to start, but we report the error via the validation callback.
valCb(jErr)
// Probably hit a corrupted record. Report error and stop processing.
if recoveredCb != nil {
// We don't assign this error to err, because we want to recover from this.
jErr := fmt.Errorf("invalid journal record length: %d exceeds max allowed size of %d", l, journalWriterBuffSize)
recoveredCb(jErr)
}
recovered = true
break
}
if buf, err = rdr.Peek(int(l)); err != nil {
if recoveredCb != nil {
// We probably went off the end of the file. Report the error and recover.
jErr := fmt.Errorf("failed to read full journal record of length %d at offset %d: %w", l, off, err)
recoveredCb(jErr)
}
recovered = true
break
}
@@ -318,18 +331,12 @@ func processJournalRecords(ctx context.Context, r io.ReadSeeker, off int64, cb f
// clean shutdown, we expect all journal records to be valid, and could safely error out during startup
// for invalid records.
if validationErr := validateJournalRecord(buf); validationErr != nil {
if valCb != nil {
jErr := &CorruptJournalRecortError{
Data: buf,
Offset: off,
Why: validationErr.Error(),
}
// NOTE: We don't assign the validation error to err, because we want to stop processing journal records
// when we see an invalid record and return successfully from processJournalRecords(), so that only
// the preceding, valid records in the journal are used.
valCb(jErr)
if recoveredCb != nil {
// We don't assign the validation error to err, because we want to recover from this.
jErr := fmt.Errorf("invalid journal record at offset %d: %w", off, validationErr)
recoveredCb(jErr)
}
recovered = true
break
}
@@ -354,14 +361,74 @@ func processJournalRecords(ctx context.Context, r io.ReadSeeker, off int64, cb f
return 0, err
}
// If we hit a recovery state, we want to check if there is any parseable data in the remainder of the journal.
if recovered {
dataLoss, dlErr := possibleDataLossCheck(rdr)
if dlErr != nil {
return 0, dlErr
}
if dataLoss {
return 0, fmt.Errorf("possible data loss detected in journal file at offset %d", off)
}
}
// reset the file pointer to the end of the last
// successfully processed journal record
if _, err = r.Seek(off, 0); err != nil {
return 0, err
}
return off, nil
}
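// Illustrative sketch, not part of this commit: how a caller might wire up
// |recoveredCb| to surface recovered corruption without failing startup. The
// function and logger names here are hypothetical.
func exampleProcessWithRecovery(ctx context.Context, r io.ReadSeeker, logf func(string, ...any)) (int64, error) {
    cb := func(o int64, rec journalRec) error {
        return nil // apply the record; elided here
    }
    recoveredCb := func(e error) {
        // Recovered errors are non-fatal: processing stops at the last valid
        // record, but the caller can log or alert on the corruption.
        logf("recovered from journal corruption: %v", e)
    }
    return processJournalRecords(ctx, r, 0, cb, recoveredCb)
}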
// possibleDataLossCheck scans the remaining data in |reader| for parseable records. When calling this method, we've
// already hit a state where we could not read a record, so we scan forward to see if there is any valid root hash
// record followed by another parseable record (root hash or chunk).
func possibleDataLossCheck(reader *bufio.Reader) (dataLoss bool, err error) {
firstRootFound := false
secondRecordFound := false
remainingData, err := io.ReadAll(reader)
if err != nil {
return false, err
}
idx := 0
for idx <= len(remainingData)-rootHashRecordSize() {
sz := readUint32(remainingData[idx : idx+uint32Size])
if sz > 0 && sz <= journalWriterBuffSize {
// in the right range.
if int(sz) <= len(remainingData[idx:]) {
// try to validate it.
candidate := remainingData[idx : idx+int(sz)]
e := validateJournalRecord(candidate)
if e == nil {
record, err := readJournalRecord(candidate)
if err != nil {
// Unexpected, since we already validated it.
return false, err
}
if firstRootFound {
// found the second record!
secondRecordFound = true
break
}
if record.kind == rootHashJournalRecKind {
// found the first root hash record
firstRootFound = true
}
// If we have a valid record, skip ahead by its size even if it's not an interesting record.
idx += int(sz)
continue
}
}
}
// Found unparsable data. Reset any found root hash state and continue scanning.
firstRootFound = false
idx++
}
return firstRootFound && secondRecordFound, nil
}
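// Illustrative sketch, not part of this commit: exercising the data loss
// heuristic directly. makeRootHashRecord and writeRootHashRecord are the test
// helpers used elsewhere in this commit; the buffer size is arbitrary.
func exampleLoneRootIsNotDataLoss() (bool, error) {
    journal := make([]byte, 4096)
    rec, _ := makeRootHashRecord()
    n := writeRootHashRecord(journal, rec.address)
    // A lone trailing root is not data loss: the heuristic requires a
    // parseable root followed by a second parseable record.
    rdr := bufio.NewReaderSize(bytes.NewReader(journal[:n]), journalWriterBuffSize)
    return possibleDataLossCheck(rdr)
}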
func peekRootHashAt(journal io.ReaderAt, offset int64) (root hash.Hash, err error) {
expSz := rootHashRecordSize()
buf := make([]byte, expSz) // assumes len(rec) is exactly rootHashRecordSize
@@ -374,8 +441,15 @@ func peekRootHashAt(journal io.ReaderAt, offset int64) (root hash.Hash, err erro
err = fmt.Errorf("invalid root hash record at %d: %d", offset, n)
return
}
return rootHashFromBuffer(buf, offset)
}
// rootHashFromBuffer extracts the root hash from a root hash journal record stored in |buf|. The buffer must always
// be exactly the size of a root hash record (i.e. rootHashRecordSize()), and the CRC is checked before parsing. |offset|
// is only used for error reporting, so it should be relative to the start of the journal file.
func rootHashFromBuffer(buf []byte, offset int64) (root hash.Hash, err error) {
sz := readUint32(buf)
if sz > uint32(expSz) {
if sz > uint32(rootHashRecordSize()) {
err = fmt.Errorf("invalid root hash record size at %d", offset)
return
}
@@ -392,7 +466,7 @@ func peekRootHashAt(journal io.ReaderAt, offset int64) (root hash.Hash, err erro
err = fmt.Errorf("expected root hash record, got kind: %d", rec.kind)
return
}
return hash.Hash(rec.address), nil
return rec.address, nil
}
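// Illustrative sketch, not part of this commit: rootHashFromBuffer can also be
// applied to an in-memory copy of the journal. |journal| and |offset| here are
// hypothetical inputs.
func exampleRootFromMemory(journal []byte, offset int64) (hash.Hash, error) {
    end := offset + int64(rootHashRecordSize())
    if end > int64(len(journal)) {
        return hash.Hash{}, fmt.Errorf("journal too short for root hash record at offset %d", offset)
    }
    // The buffer passed must be exactly rootHashRecordSize() bytes; the CRC is
    // verified before the record is parsed.
    return rootHashFromBuffer(journal[offset:end], offset)
}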
func readUint32(buf []byte) uint32 {

View File

@@ -17,6 +17,7 @@ package nbs
import (
"bytes"
"context"
"fmt"
"math/rand"
"testing"
"time"
@@ -113,18 +114,136 @@ func TestProcessJournalRecords(t *testing.T) {
return
}
n, err := processJournalRecords(ctx, bytes.NewReader(journal), 0, check)
var recoverErr error
n, err := processJournalRecords(ctx, bytes.NewReader(journal), 0, check, func(e error) { recoverErr = e })
assert.Equal(t, cnt, i)
assert.Equal(t, int(off), int(n))
require.NoError(t, err)
require.NoError(t, recoverErr)
// write a bogus record to the end and verify that we don't get an error, but the recovery callback does fire
i, sum = 0, 0
writeCorruptJournalRecord(journal[off:])
n, err = processJournalRecords(ctx, bytes.NewReader(journal), 0, check)
n, err = processJournalRecords(ctx, bytes.NewReader(journal), 0, check, func(e error) { recoverErr = e })
require.NoError(t, err)
assert.Equal(t, cnt, i)
assert.Equal(t, int(off), int(n))
require.Error(t, recoverErr)
}
func TestJournalForDataLoss(t *testing.T) {
type section byte
const (
root section = iota // root hash record
chnk // chunk record
garb // garbage data.
null // null bytes.
)
type journalDesc struct {
lossExpected bool
recoveryExpected bool
rootsExpected int
chunksExpected int
sections []section
}
tests := []journalDesc{
// Normal cases - records followed by 4 null bytes, EOF, or null bytes then garbage
{false, false, 1, 0, []section{root}},
{false, false, 1, 0, []section{root, null, null, null}},
{false, false, 2, 1, []section{root, chnk, root, null, chnk, chnk}}, // No roots after the null bytes.
{false, false, 1, 0, []section{root, null, root}}, // a single root is not data loss - needs to be followed by a valid record.
{false, false, 1, 0, []section{root, null, garb, null, root, null}},
{false, false, 1, 0, []section{root, null, garb, null, root, garb}},
// Recovery cases, where non-null bytes immediately follow a record of any type.
{false, true, 1, 0, []section{root, garb, garb, garb}},
{false, true, 1, 2, []section{root, chnk, chnk, garb}}, // valid chunks still get reported to callback, even if they aren't followed by a root record.
{false, true, 1, 0, []section{root, garb, null, chnk, chnk}},
// Data loss cases. Any mystery data containing a parseable root followed by another parseable record is data loss.
{true, false, 1, 0, []section{root, null, root, chnk}},
{true, false, 2, 1, []section{root, chnk, root, null, root, chnk, chnk, chnk}},
{true, true, 1, 0, []section{root, garb, root, chnk}},
{true, false, 1, 0, []section{root, null, root, root}},
{true, true, 1, 0, []section{root, garb, root, root}},
{true, false, 1, 0, []section{root, null, root, chnk, chnk, null}},
{true, true, 1, 0, []section{root, garb, root, chnk, chnk, garb}},
// Edge cases where we see multiple roots, but with garbage between them.
{false, false, 1, 0, []section{root, null, root, null, root, null, root, garb}},
{false, false, 1, 0, []section{root, null, root, garb, root, garb, root, garb}},
{false, true, 1, 0, []section{root, garb, root, null, root, null, root, null}},
// Chunks in the suffix garbage shouldn't matter.
{false, false, 1, 0, []section{root, null, chnk, chnk, chnk}},
{false, false, 1, 0, []section{root, null, chnk, chnk, chnk, root}},
}
journalRecordTimestampGenerator = testTimestampGenerator
rnd := rand.New(rand.NewSource(123454321))
for ti, td := range tests {
t.Run(fmt.Sprintf("data check %d", ti), func(t *testing.T) {
ctx := context.Background()
journal := make([]byte, 1<<20) // 1 MB should be plenty for these tests.
var off uint32
for _, section := range td.sections {
var r journalRec
switch section {
case root:
r, _ = makeRootHashRecord()
off += writeRootHashRecord(journal[off:], r.address)
case chnk:
r, _ = makeChunkRecord()
off += writeChunkRecord(journal[off:], mustCompressedChunk(r))
case garb:
n := uint32(rnd.Intn(256) + 256)
rnd.Read(journal[off : off+n])
off += n
case null:
n := uint32(rnd.Intn(256) + 256)
for i := uint32(0); i < n; i++ {
journal[off+i] = 0
}
off += n
}
}
// When we go into the recovery state, we should not invoke the callback for any more records.
// Verify that here with counters of each record type.
chunksFound := 0
rootsFound := 0
check := func(o int64, r journalRec) (_ error) {
switch r.kind {
case rootHashJournalRecKind:
rootsFound++
case chunkJournalRecKind:
chunksFound++
}
return
}
var recoverErr error
_, err := processJournalRecords(ctx, bytes.NewReader(journal[:off]), 0, check, func(e error) { recoverErr = e })
if td.lossExpected {
require.Error(t, err)
} else {
require.NoError(t, err)
}
require.Equal(t, td.chunksExpected, chunksFound)
require.Equal(t, td.rootsExpected, rootsFound)
if td.recoveryExpected {
require.Error(t, recoverErr)
} else {
require.NoError(t, recoverErr)
}
})
}
}
func randomMemTable(cnt int) (*memTable, map[hash.Hash]chunks.Chunk) {