mirror of
https://github.com/dolthub/dolt.git
synced 2026-04-30 03:26:47 -05:00
PR Feedback: Making processJournalRecords() less aggressive about erroring out on invalid journal records, to preserve graceful crash consistency
This commit is contained in:
@@ -43,7 +43,6 @@ const (
|
||||
EnvDoltAuthorDate = "DOLT_AUTHOR_DATE"
|
||||
EnvDoltCommitterDate = "DOLT_COMMITTER_DATE"
|
||||
EnvDbNameReplace = "DOLT_DBNAME_REPLACE"
|
||||
EnvSkipInvalidJournalRecords = "DOLT_SKIP_INVALID_JOURNAL_RECORDS"
|
||||
EnvDoltRootHost = "DOLT_ROOT_HOST"
|
||||
EnvDoltRootPassword = "DOLT_ROOT_PASSWORD"
|
||||
)
|
||||
|
||||
@@ -22,10 +22,10 @@ import (
|
||||
"fmt"
|
||||
"io"
|
||||
"math"
|
||||
"os"
|
||||
"time"
|
||||
|
||||
"github.com/dolthub/dolt/go/libraries/doltcore/dconfig"
|
||||
"github.com/sirupsen/logrus"
|
||||
|
||||
"github.com/dolthub/dolt/go/store/d"
|
||||
"github.com/dolthub/dolt/go/store/hash"
|
||||
)
|
||||
@@ -227,7 +227,7 @@ func readJournalRecord(buf []byte) (rec journalRec, err error) {
|
||||
|
||||
// validateJournalRecord performs some sanity checks on the buffer |buf| containing a journal
|
||||
// record, such as checking that the length of the record is not too short, and checking the
|
||||
// checksum. If any problems are detected, an erorr is returned, otherwise nil is returned.
|
||||
// checksum. If any problems are detected, an error is returned, otherwise nil is returned.
|
||||
func validateJournalRecord(buf []byte) error {
|
||||
if len(buf) < (journalRecLenSz + journalRecChecksumSz) {
|
||||
return fmt.Errorf("invalid journal record: buffer length too small (%d < %d)", len(buf), (journalRecLenSz + journalRecChecksumSz))
|
||||
@@ -251,6 +251,12 @@ func validateJournalRecord(buf []byte) error {
|
||||
// processJournalRecords iterates over a chunk journal's records by reading from disk using |r|, starting at
|
||||
// offset |off|, and calls the callback function |cb| with each journal record. The offset where reading was stopped
|
||||
// is returned, or any error encountered along the way.
|
||||
// If an invalid journal record is found, it is not included and this function stops processing journal
|
||||
// entries, but does not return an error. Journal records may be incomplete if the system crashes while
|
||||
// records are being persisted to disk. This isn't likely, but the OS filesystem write is not an atomic
|
||||
// operation, so it's possible to leave the journal in a corrupted state. We must gracefully recover
|
||||
// without preventing the server from starting up, so we are careful to only return the journal file
|
||||
// offset that points to the end of the last valid record.
|
||||
func processJournalRecords(ctx context.Context, r io.ReadSeeker, off int64, cb func(o int64, r journalRec) error) (int64, error) {
|
||||
var (
|
||||
buf []byte
|
||||
@@ -266,11 +272,7 @@ func processJournalRecords(ctx context.Context, r io.ReadSeeker, off int64, cb f
|
||||
for {
|
||||
// peek to read next record size
|
||||
if buf, err = rdr.Peek(uint32Size); err != nil {
|
||||
if err == io.EOF {
|
||||
break
|
||||
} else {
|
||||
return 0, err
|
||||
}
|
||||
break
|
||||
}
|
||||
|
||||
// The first 4 bytes in the journal record are the total length of the record (including
|
||||
@@ -285,35 +287,44 @@ func processJournalRecords(ctx context.Context, r io.ReadSeeker, off int64, cb f
|
||||
}
|
||||
|
||||
if buf, err = rdr.Peek(int(l)); err != nil {
|
||||
return 0, err
|
||||
break
|
||||
}
|
||||
|
||||
if err = validateJournalRecord(buf); err != nil {
|
||||
// If the DOLT_SKIP_INVALID_JOURNAL_RECORDS env var is set, then we stop reading the journal
|
||||
// as soon as we hit an invalid record. This allows users to opt-in to the previous behavior
|
||||
// where we process as many journal records we can, but stop once we hit an invalid record.
|
||||
if os.Getenv(dconfig.EnvSkipInvalidJournalRecords) != "" {
|
||||
break
|
||||
} else {
|
||||
return 0, err
|
||||
}
|
||||
// TODO: The NomsBlockStore manifest is updated when the sql engine is shutdown cleanly. In the clean shutdown
|
||||
// case, we have the root hash value and the number of chunks written to the journal from the manifest
|
||||
// and we could use that to more aggressively validate journal records. When we are starting up from a
|
||||
// clean shutdown, we expect all journal records to be valid, and could safely error out during startup
|
||||
// for invalid records.
|
||||
if validationErr := validateJournalRecord(buf); validationErr != nil {
|
||||
// NOTE: We don't assign the validation error to err, because we want to stop processing journal records
|
||||
// when we see an invalid record and return successfully from processJournalRecords(), so that only
|
||||
// the preceding, valid records in the journal are used.
|
||||
logrus.Errorf("Error validating journal record; "+
|
||||
"skipping remaining journal records past offset %d: %s", off, validationErr)
|
||||
break
|
||||
}
|
||||
|
||||
var rec journalRec
|
||||
if rec, err = readJournalRecord(buf); err != nil {
|
||||
return 0, err
|
||||
break // failed to read valid record
|
||||
}
|
||||
if err = cb(off, rec); err != nil {
|
||||
return 0, err
|
||||
break
|
||||
}
|
||||
|
||||
// advance |rdr| state by |l| bytes
|
||||
if _, err = io.ReadFull(rdr, buf); err != nil {
|
||||
return 0, err
|
||||
break
|
||||
}
|
||||
off += int64(len(buf))
|
||||
}
|
||||
|
||||
// If a non-EOF error was captured while processing journal records, return a
|
||||
// journal offset of 0 and the error, which will cause startup to halt.
|
||||
if err != nil && err != io.EOF {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
// reset the file pointer to end of the last
|
||||
// successfully processed journal record
|
||||
if _, err = r.Seek(off, 0); err != nil {
|
||||
|
||||
@@ -18,14 +18,12 @@ import (
|
||||
"bytes"
|
||||
"context"
|
||||
"math/rand"
|
||||
"os"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/dolthub/dolt/go/libraries/doltcore/dconfig"
|
||||
"github.com/dolthub/dolt/go/store/chunks"
|
||||
"github.com/dolthub/dolt/go/store/d"
|
||||
"github.com/dolthub/dolt/go/store/hash"
|
||||
@@ -120,22 +118,10 @@ func TestProcessJournalRecords(t *testing.T) {
|
||||
assert.Equal(t, int(off), int(n))
|
||||
require.NoError(t, err)
|
||||
|
||||
// write a bogus record to the end and verify that we get an error
|
||||
// write a bogus record to the end and verify that we don't get an error
|
||||
i, sum = 0, 0
|
||||
writeCorruptJournalRecord(journal[off:])
|
||||
n, err = processJournalRecords(ctx, bytes.NewReader(journal), 0, check)
|
||||
require.Error(t, err)
|
||||
require.Contains(t, err.Error(), "CRC checksum does not match")
|
||||
assert.Equal(t, cnt, i)
|
||||
// Since an error was encountered, the returned offset is 0
|
||||
assert.Equal(t, 0, int(n))
|
||||
|
||||
// Turn on the env setting to stop processing journal records once we hit an invalid record
|
||||
require.NoError(t, os.Setenv(dconfig.EnvSkipInvalidJournalRecords, "1"))
|
||||
i, sum = 0, 0
|
||||
// write a bogus record to the end and process again
|
||||
writeCorruptJournalRecord(journal[off:])
|
||||
n, err = processJournalRecords(ctx, bytes.NewReader(journal), 0, check)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, cnt, i)
|
||||
assert.Equal(t, int(off), int(n))
|
||||
|
||||
Reference in New Issue
Block a user