diff --git a/go/store/nbs/journal_record.go b/go/store/nbs/journal_record.go
index b03acf95e2..19bf7dcc1d 100644
--- a/go/store/nbs/journal_record.go
+++ b/go/store/nbs/journal_record.go
@@ -362,8 +362,7 @@ func processJournalRecords(ctx context.Context, r io.ReadSeeker, off int64, cb f
 			}
 		}
 		if dataLossFound {
-			err = fmt.Errorf("possible data loss detected in journal file at offset %d", off)
-			return 0, err
+			return 0, NewJournalDataLossError(off)
 		}
 	}
 
@@ -376,25 +375,50 @@ func processJournalRecords(ctx context.Context, r io.ReadSeeker, off int64, cb f
 	return off, nil
 }
 
+var ErrJournalDataLoss = errors.New("corrupted journal")
+
+func NewJournalDataLossError(offset int64) error {
+	return fmt.Errorf("possible data loss detected in journal file at offset %d: %w", offset, ErrJournalDataLoss)
+}
+
 // possibleDataLossCheck checks for non-zero data starting at |off| in |r|. When calling this method, we've already hit
 // as state where we can't read a record at |off|, so we are going to scan forward from |off| to see if there is any
 // valid root record followed by another record (root hash or chunk).
 func possibleDataLossCheck(reader *bufio.Reader) (dataLoss bool, err error) {
 	firstRootFound := false
-	remainingData, err := io.ReadAll(reader)
-	if err != nil {
+	atEOF := false
+
+	bufferPrefix := 0
+	const buffSize = journalWriterBuffSize * 2
+
+	buf := make([]byte, buffSize)
+
+ReadBatchGoto:
+	n, err := io.ReadFull(reader, buf[bufferPrefix:])
+	switch err {
+	case nil:
+		// Got a full buffer. Let's go.
+	case io.ErrUnexpectedEOF:
+		// Final short read before EOF: only n bytes of this read are valid.
+		atEOF = true
+		buf = buf[:bufferPrefix+n]
+	case io.EOF:
+		// ReadFull only returns EOF if no bytes were read. We may have plenty of unprocessed data in buf.
+		atEOF = true
+		buf = buf[:bufferPrefix]
+	default:
 		return false, err
 	}
 
 	idx := 0
-	for idx <= len(remainingData)-rootHashRecordSize() {
-		sz := readUint32(remainingData[idx : idx+uint32Size])
+	for idx <= len(buf)-rootHashRecordSize() {
+		sz := readUint32(buf[idx : idx+uint32Size])
 		if sz > 0 && sz <= journalWriterBuffSize { // in the right range.
-			if int(sz) <= len(remainingData[idx:]) { // try to validate it.
-				candidate := remainingData[idx : idx+int(sz)]
+			if int(sz) <= len(buf[idx:]) { // try to validate it.
+				candidate := buf[idx : idx+int(sz)]
 				e := validateJournalRecord(candidate)
 				if e == nil {
 					record, err := readJournalRecord(candidate)
@@ -403,7 +427,7 @@ func possibleDataLossCheck(reader *bufio.Reader) (dataLoss bool, err error) {
 						return false, err
 					}
 					if firstRootFound {
-						// found the second record!
+						// found the second record! => possible data loss
 						return true, nil
 					}
 					if record.kind == rootHashJournalRecKind {
@@ -414,11 +438,32 @@ func possibleDataLossCheck(reader *bufio.Reader) (dataLoss bool, err error) {
 					idx += int(sz)
 					continue
 				}
+			} else {
+				// Not enough data to validate this record, so break out and try to read more. Note that if we are
+				// looking at random data and we've parsed a size which is one byte shy of the max size (5 MB), we are
+				// only halfway through the buffer, which forces a largish copy of the remaining data to the front
+				// of the buffer.
+				if !atEOF {
+					// We can read more data. The remaining bytes are shifted to the front after the loop, then we goto another read.
+					break
+				}
+
+				// We are at EOF, so we can't read more data. Just finish scanning what's left in the buffer.
+				// We found a length which is in range, but there isn't enough data left in the file, so we assume
+				// the current position isn't the start of a valid record.
 			}
 		}
 		idx++
 	}
 
+	if !atEOF {
+		// Shift remaining data to the front of the buffer and read more.
+		remaining := len(buf) - idx
+		copy(buf[0:], buf[idx:idx+remaining])
+		bufferPrefix = remaining
+		goto ReadBatchGoto
+	}
+
 	return false, nil
 }
 
diff --git a/go/store/nbs/journal_record_test.go b/go/store/nbs/journal_record_test.go
index b17d92e0f8..bbf9cfedab 100644
--- a/go/store/nbs/journal_record_test.go
+++ b/go/store/nbs/journal_record_test.go
@@ -17,6 +17,7 @@ package nbs
 import (
 	"bytes"
 	"context"
+	"errors"
 	"fmt"
 	"math/rand"
 	"testing"
@@ -244,6 +245,88 @@ func TestJournalForDataLoss(t *testing.T) {
 	}
 }
 
+func TestJournalForDataLossOnBoundary(t *testing.T) {
+	r := rand.New(rand.NewSource(987654321))
+	// The data loss detection logic has some special cases around buffer boundaries to avoid reading all data into
+	// memory at once. This test constructs a journal which starts with a few valid records followed by garbage data,
+	// then writes two records which constitute data loss at each offset in a window spanning the buffer boundary.
+	//
+	// The data loss code uses a 10 MB buffer, so we'll make a 10 MB journal plus 1 KB.
+	bufSz := 2*journalWriterBuffSize + (1 << 10)
+
+	// Pre-generate a random buffer to speed this process up.
+	randBuf := make([]byte, bufSz)
+	r.Read(randBuf)
+
+	nullBuf := make([]byte, bufSz)
+	for i := range nullBuf {
+		nullBuf[i] = 0
+	}
+
+	type backerType struct {
+		buf  []byte
+		name string
+	}
+	var backers = []backerType{
+		{buf: randBuf, name: "randBuf"},
+		{buf: nullBuf, name: "nullBuf"},
+	}
+	for _, backer := range backers {
+		journalBuf := make([]byte, bufSz)
+		copy(journalBuf, backer.buf)
+
+		var rec journalRec
+		var off uint32
+		rec, _ = makeRootHashRecord()
+		off += writeRootHashRecord(journalBuf[:], rec.address)
+		rec, _ = makeChunkRecord()
+		off += writeChunkRecord(journalBuf[off:], mustCompressedChunk(rec))
+
+		check := func(o int64, r journalRec) (_ error) { return nil }
+
+		// Verify that the journal as it exists at this moment does not trigger data loss. If it did, we'd have
+		// no confidence in the rest of the test.
+		ctx := context.Background()
+		var recoverErr error
+		bytesRead, err := processJournalRecords(ctx, bytes.NewReader(journalBuf[:]), 0, check, func(e error) { recoverErr = e })
+		require.NoError(t, err)
+		require.Equal(t, off, uint32(bytesRead))
+		require.Error(t, recoverErr) // We do expect a warning here, but no data loss.
+
+		// TODO: create a lost chunk which is slightly smaller than journalWriterBuffSize.
+		lostData := make([]byte, 1<<10) // lost data is only ~180 bytes. Allocate 1 KB.
+		off = 0
+		rec, _ = makeRootHashRecord()
+		off += writeRootHashRecord(lostData[:], rec.address)
+		rec, _ = makeChunkRecord()
+		off += writeChunkRecord(lostData[off:], mustCompressedChunk(rec))
+		lostData = lostData[:off]
+
+		t.Run(backer.name, func(t *testing.T) {
+			// startPoint and endPoint define the range of offsets to test for data loss on the boundary.
+			startPoint := (2 * journalWriterBuffSize) - uint32(len(lostData)) - uint32(rootHashRecordSize())
+			// endPoint puts the first byte of lostData just past the 10 MB read boundary.
+			endPoint := (2 * journalWriterBuffSize) + uint32(rootHashRecordSize())
+
+			for startPoint <= endPoint {
+				// Copy lost data into the journal buffer at the test offset.
+				copy(journalBuf[startPoint:startPoint+uint32(len(lostData))], lostData)
+
+				_, err := processJournalRecords(ctx, bytes.NewReader(journalBuf[:]), 0, check, func(e error) { recoverErr = e })
+				require.Error(t, err)
+				require.True(t, errors.Is(err, ErrJournalDataLoss))
+				require.Error(t, recoverErr)
+
+				// Reset the journal buffer to its original state.
+				copy(journalBuf[startPoint:startPoint+uint32(len(lostData))], backer.buf[startPoint:startPoint+uint32(len(lostData))])
+
+				// Testing every offset takes a couple of hours. Don't use `r` because we want this run to be non-deterministic.
+				startPoint += uint32(rand.Intn(38) + 1) // Keep the stride small so we don't skip over a full rootHashRecordSize().
+			}
+		})
+	}
+}
+
 func randomMemTable(cnt int) (*memTable, map[hash.Hash]chunks.Chunk) {
 	chnx := make(map[hash.Hash]chunks.Chunk, cnt)
 	for i := 0; i < cnt; i++ {
@@ -355,7 +438,7 @@ func writeCorruptJournalRecord(buf []byte) (n uint32) {
 
 func mustCompressedChunk(rec journalRec) CompressedChunk {
 	d.PanicIfFalse(rec.kind == chunkJournalRecKind)
-	cc, err := NewCompressedChunk(hash.Hash(rec.address), rec.payload)
+	cc, err := NewCompressedChunk(rec.address, rec.payload)
 	d.PanicIfError(err)
 	return cc
 }
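For context on the buffering approach in `possibleDataLossCheck`, here is a minimal, standalone sketch of the same sliding-buffer pattern: read fixed-size batches with io.ReadFull, scan as far as complete records allow, then shift the unconsumed tail to the front of the buffer before the next read. The 4-byte big-endian length prefix, the 64-byte batch size, and the `scanRecords`/`batchSize` names are illustrative assumptions for the sketch, not the journal's actual record layout or API.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"io"
)

// batchSize is an illustrative batch size; it must be at least as large as the
// biggest record (payload plus 4-byte length prefix) we are willing to accept.
const batchSize = 64

// scanRecords counts well-formed length-prefixed records without ever holding
// more than one batch of the input in memory.
func scanRecords(r io.Reader) (count int, err error) {
	buf := make([]byte, batchSize)
	prefix := 0 // unconsumed bytes carried over from the previous batch
	atEOF := false

	for {
		n, err := io.ReadFull(r, buf[prefix:])
		switch err {
		case nil:
			// Full batch read; all of buf is valid.
		case io.ErrUnexpectedEOF:
			// Final short read: only prefix+n bytes are valid.
			atEOF = true
			buf = buf[:prefix+n]
		case io.EOF:
			// ReadFull returns EOF only when zero new bytes were read; prefix bytes may remain.
			atEOF = true
			buf = buf[:prefix]
		default:
			return count, err
		}

		idx := 0
		for idx+4 <= len(buf) {
			sz := int(binary.BigEndian.Uint32(buf[idx : idx+4]))
			if sz > batchSize-4 {
				// Larger than any record we accept (analogous to the sz <= journalWriterBuffSize
				// check in the patch); treat it as garbage and resynchronize byte by byte.
				idx++
				continue
			}
			if idx+4+sz > len(buf) {
				// Record straddles the batch boundary; read more data, or stop if at EOF.
				break
			}
			count++
			idx += 4 + sz
		}

		if atEOF {
			return count, nil
		}
		// Shift the unconsumed tail to the front and top the buffer back up.
		prefix = copy(buf, buf[idx:])
	}
}

func main() {
	// Build a stream of 20 records with 10-byte payloads so several straddle batch boundaries.
	var stream bytes.Buffer
	payload := make([]byte, 10)
	for i := 0; i < 20; i++ {
		var hdr [4]byte
		binary.BigEndian.PutUint32(hdr[:], uint32(len(payload)))
		stream.Write(hdr[:])
		stream.Write(payload)
	}
	n, err := scanRecords(&stream)
	fmt.Println(n, err) // prints: 20 <nil>
}
```

Sizing the scratch buffer to at least the maximum accepted record plus its length prefix guarantees that a record straddling a batch boundary always fits after a single shift, which is the same reasoning behind the patch's `buffSize = journalWriterBuffSize * 2`.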