From 71e03088f21bccf3c0078a8315d1e300d6c5391a Mon Sep 17 00:00:00 2001 From: Jason Fulghum Date: Fri, 3 Nov 2023 13:23:20 -0700 Subject: [PATCH 1/6] Updating BATS tests for reflog now that reflog uses a ring buffer to store root references (instead of ignoring root refs once the internal limit was hit) --- integration-tests/bats/reflog.bats | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) diff --git a/integration-tests/bats/reflog.bats b/integration-tests/bats/reflog.bats index 1884361c44..38d5a7e34e 100644 --- a/integration-tests/bats/reflog.bats +++ b/integration-tests/bats/reflog.bats @@ -6,18 +6,22 @@ teardown() { teardown_common } +# Asserts that when DOLT_DISABLE_REFLOG is set, the dolt_reflog() table +# function returns an empty result set with no error. @test "reflog: disabled with DOLT_DISABLE_REFLOG" { export DOLT_DISABLE_REFLOG=true setup_common dolt sql -q "create table t (i int primary key, j int);" dolt sql -q "insert into t values (1, 1), (2, 2), (3, 3)"; dolt commit -Am "initial commit" + dolt commit --allow-empty -m "test commit 1" run dolt sql -q "select * from dolt_reflog();" [ "$status" -eq 0 ] [ "${#lines[@]}" -eq 0 ] } +# Sanity check for the most basic case of querying the Dolt reflog @test "reflog: enabled by default" { setup_common dolt sql -q "create table t (i int primary key, j int);" @@ -31,16 +35,22 @@ teardown() { [[ "$output" =~ "Initialize data repository" ]] || false } +# Asserts that when DOLT_REFLOG_RECORD_LIMIT has been set, the reflog only contains the +# most recent entries and is limited by the env var's value. 
@test "reflog: set DOLT_REFLOG_RECORD_LIMIT" { export DOLT_REFLOG_RECORD_LIMIT=2 setup_common dolt sql -q "create table t (i int primary key, j int);" dolt sql -q "insert into t values (1, 1), (2, 2), (3, 3)"; dolt commit -Am "initial commit" - dolt commit --allow-empty -m "test commit" + dolt commit --allow-empty -m "test commit 1" + dolt commit --allow-empty -m "test commit 2" + # Only the most recent two ref changes should appear in the log run dolt sql -q "select * from dolt_reflog();" [ "$status" -eq 0 ] - [[ "$output" =~ "exceeded reflog record limit" ]] || false - [[ "$output" =~ "Initialize data repository" ]] || false -} \ No newline at end of file + [[ "$output" =~ "test commit 1" ]] || false + [[ "$output" =~ "test commit 2" ]] || false + [[ ! "$output" =~ "initial commit" ]] || false + [[ ! "$output" =~ "Initialize data repository" ]] || false +} From 881346fdb2b9bcc42cf6abb54c781cc68fce46e2 Mon Sep 17 00:00:00 2001 From: Jason Fulghum Date: Fri, 3 Nov 2023 15:44:43 -0700 Subject: [PATCH 2/6] First pass at moving reflog data to a ring buffer --- go/store/nbs/journal.go | 130 ++++++++------------ go/store/nbs/journal_writer.go | 29 ++--- go/store/nbs/journal_writer_test.go | 11 +- go/store/nbs/reflog_ring_buffer.go | 156 ++++++++++++++++++++++++ go/store/nbs/reflog_ring_buffer_test.go | 131 ++++++++++++++++++++ 5 files changed, 356 insertions(+), 101 deletions(-) create mode 100644 go/store/nbs/reflog_ring_buffer.go create mode 100644 go/store/nbs/reflog_ring_buffer_test.go diff --git a/go/store/nbs/journal.go b/go/store/nbs/journal.go index 7dc45e4ad7..87ab6de454 100644 --- a/go/store/nbs/journal.go +++ b/go/store/nbs/journal.go @@ -23,7 +23,6 @@ import ( "path/filepath" "sort" "strconv" - "sync" "time" "github.com/sirupsen/logrus" @@ -38,27 +37,19 @@ const ( chunkJournalName = chunkJournalAddr // todo ) +// reflogDisabled indicates whether access to the reflog has been disabled and if so, no chunk journal root references +// should be kept in memory. 
This is controlled by the DOLT_DISABLE_REFLOG env var and this var is ONLY written to +// during initialization. All access after initialization is read-only, so no additional locking is needed. var reflogDisabled = false -var reflogRecordLimit = 100_000 -var loggedReflogMaxSizeWarning = false + +// defaultReflogBufferSize controls how many of the most recent root references for root updates are kept in-memory. +// This default can be overridden by setting the DOLT_REFLOG_RECORD_LIMIT before Dolt starts. +const defaultReflogBufferSize = 5_000 func init() { if os.Getenv(dconfig.EnvDisableReflog) != "" { reflogDisabled = true } - - if limit := os.Getenv(dconfig.EnvReflogRecordLimit); limit != "" { - i, err := strconv.Atoi(limit) - if err != nil { - logrus.Warnf("unable to parse integer value for %s: %s", dconfig.EnvReflogRecordLimit, err.Error()) - } else { - if i <= 0 { - reflogDisabled = true - } else { - reflogRecordLimit = i - } - } - } } // ChunkJournal is a persistence abstraction for a NomsBlockStore. @@ -72,12 +63,9 @@ type ChunkJournal struct { backing *journalManifest persister *fsTablePersister - // mu locks access to the in-memory roots and root timestamp information that is queried by Dolt's reflog - mu sync.Mutex - // roots holds an in-memory representation of the root hashes that have been written to the ChunkJournal - roots []string - // rootTimestamps holds a timestamp for each of the root hashes that have been written to the ChunkJournal - rootTimestamps []time.Time + // reflogRingBuffer holds the most recent roots written to the chunk journal so that they can be + // quickly loaded for reflog queries without having to re-read the journal file from disk. 
+ reflogRingBuffer *reflogRingBuffer } var _ tablePersister = &ChunkJournal{} @@ -94,6 +82,7 @@ func newChunkJournal(ctx context.Context, nbfVers, dir string, m *journalManifes j := &ChunkJournal{path: path, backing: m, persister: p} j.contents.nbfVers = nbfVers + j.reflogRingBuffer = newReflogRingBuffer(reflogBufferSize()) ok, err := fileExists(path) if err != nil { @@ -108,6 +97,33 @@ func newChunkJournal(ctx context.Context, nbfVers, dir string, m *journalManifes return j, nil } +// reflogBufferSize returns the size of the ring buffer to allocate to store in-memory roots references when +// new roots are written to a chunk journal. If reflog queries have been disabled, this function will return 0. +// If the default buffer size has been overridden via DOLT_REFLOG_RECORD_LIMIT, that value will be returned if +// it can be successfully parsed. Otherwise, the default buffer size will be returned. +func reflogBufferSize() int { + if reflogDisabled { + return 0 + } + + reflogBufferSize := defaultReflogBufferSize + if limit := os.Getenv(dconfig.EnvReflogRecordLimit); limit != "" { + i, err := strconv.Atoi(limit) + if err != nil { + logrus.Warnf("unable to parse integer value for %s from %s: %s", + dconfig.EnvReflogRecordLimit, limit, err.Error()) + } else { + if i <= 0 { + reflogDisabled = true + } else { + reflogBufferSize = i + } + } + } + + return reflogBufferSize +} + // bootstrapJournalWriter initializes the journalWriter, which manages access to the // journal file for this ChunkJournal. The bootstrapping process differs depending // on whether a journal file exists at startup time. 
@@ -133,7 +149,7 @@ func (j *ChunkJournal) bootstrapJournalWriter(ctx context.Context) (err error) { return err } - _, _, _, err = j.wr.bootstrapJournal(ctx) + _, err = j.wr.bootstrapJournal(ctx, j.reflogRingBuffer) if err != nil { return err } @@ -161,16 +177,11 @@ func (j *ChunkJournal) bootstrapJournalWriter(ctx context.Context) (err error) { } // parse existing journal file - root, roots, rootTimestamps, err := j.wr.bootstrapJournal(ctx) + root, err := j.wr.bootstrapJournal(ctx, j.reflogRingBuffer) if err != nil { return err } - j.mu.Lock() - j.roots = roots - j.rootTimestamps = rootTimestamps - j.mu.Unlock() - mc, err := trueUpBackingManifest(ctx, root, j.backing) if err != nil { return err @@ -214,49 +225,17 @@ func trueUpBackingManifest(ctx context.Context, root hash.Hash, backing *journal // and passes the root and associated timestamp to a callback function, |f|. If |f| returns an error, iteration // is stopped and the error is returned. func (j *ChunkJournal) IterateRoots(f func(root string, timestamp *time.Time) error) error { - roots, rootTimestamps, length, err := j.readCurrentRootsAndTimestamps() - if err != nil { - return err - } - - // journal.roots are stored in chronological order. We need to process them in that order so that we can - // accurately detect the root where a ref was first set to a commit. Note that we are careful to not iterate - // beyond |length| in the slice, otherwise we risk a race condition that would read inconsistent data. - for i := 0; i < length; i++ { + return j.reflogRingBuffer.Iterate(func(entry reflogRootHashEntry) error { // If we're reading a chunk journal written with an older version of Dolt, the root hash journal record may // not have a timestamp value, so we'll have a time.Time instance in its zero value. If we see this, pass // nil instead to signal to callers that there is no valid timestamp available. 
- var timestamp *time.Time = nil - if time.Time.IsZero(rootTimestamps[i]) == false { - timestamp = &rootTimestamps[i] + var pTimestamp *time.Time = nil + if time.Time.IsZero(entry.timestamp) == false { + pTimestamp = &entry.timestamp } - err := f(roots[i], timestamp) - if err != nil { - return err - } - } - return nil -} - -// readCurrentRootsAndTimestamps grabs the mutex that protects the in-memory root and root timestamps that represent the -// root hash updates in the chunk journal and returns the references to the roots and root timestamps slices, as well as -// the length that can be safely read from them. Callers MUST honor this length and NOT read beyond it in the returned -// slices, otherwise they risk getting inconsistent data (since the chunk journal continues to append entries to these -// slices as new root update journal records are saved). -func (j *ChunkJournal) readCurrentRootsAndTimestamps() (roots []string, rootTimestamps []time.Time, length int, err error) { - j.mu.Lock() - defer j.mu.Unlock() - - roots = j.roots - rootTimestamps = j.rootTimestamps - length = len(roots) - if len(roots) != len(rootTimestamps) { - return nil, nil, -1, fmt.Errorf( - "different number of roots and root timestamps encountered in ChunkJournal") - } - - return roots, rootTimestamps, length, nil + return f(entry.root, pTimestamp) + }) } // Persist implements tablePersister. 
@@ -383,16 +362,10 @@ func (j *ChunkJournal) Update(ctx context.Context, lastLock addr, next manifestC // Update the in-memory structures so that the ChunkJournal can be queried for reflog data if !reflogDisabled { - j.mu.Lock() - defer j.mu.Unlock() - - if len(j.roots) < reflogRecordLimit { - j.roots = append(j.roots, next.root.String()) - j.rootTimestamps = append(j.rootTimestamps, time.Now()) - } else if !loggedReflogMaxSizeWarning { - loggedReflogMaxSizeWarning = true - logrus.Warnf("exceeded reflog record limit (%d)", reflogRecordLimit) - } + j.reflogRingBuffer.Push(reflogRootHashEntry{ + root: next.root.String(), + timestamp: time.Now(), + }) } return j.contents, nil @@ -433,8 +406,7 @@ func (j *ChunkJournal) UpdateGCGen(ctx context.Context, lastLock addr, next mani // Truncate the in-memory root and root timestamp metadata to the most recent // entry, and double check that it matches the root stored in the manifest. if !reflogDisabled { - j.mu.Lock() - defer j.mu.Unlock() + j.reflogRingBuffer.TruncateToLastRecord() if len(j.roots) == 0 { return manifestContents{}, fmt.Errorf( diff --git a/go/store/nbs/journal_writer.go b/go/store/nbs/journal_writer.go index 4bedf0820b..f18c1eda00 100644 --- a/go/store/nbs/journal_writer.go +++ b/go/store/nbs/journal_writer.go @@ -22,7 +22,6 @@ import ( "os" "path/filepath" "sync" - "time" "github.com/dolthub/swiss" "github.com/sirupsen/logrus" @@ -162,10 +161,10 @@ type journalWriter struct { var _ io.Closer = &journalWriter{} // bootstrapJournal reads in records from the journal file and the journal index file, initializing -// the state of the journalWriter. It returns the most recent root hash for the journal, as well as -// a slice of all root hash strings in reverse chronological order, and a slice of timestamps that -// indicate the time each returned root was created. 
-func (wr *journalWriter) bootstrapJournal(ctx context.Context) (last hash.Hash, roots []string, times []time.Time, err error) { +// the state of the journalWriter. Root hashes read from root update records in the journal are written +// to |reflogRingBuffer|, which maintains the most recently updated roots which are used to generate the +// reflog. This function returns the most recent root hash for the journal as well as any error encountered. +func (wr *journalWriter) bootstrapJournal(ctx context.Context, reflogRingBuffer *reflogRingBuffer) (last hash.Hash, err error) { wr.lock.Lock() defer wr.lock.Unlock() @@ -173,7 +172,6 @@ func (wr *journalWriter) bootstrapJournal(ctx context.Context) (last hash.Hash, wr.maxNovel = journalIndexDefaultMaxNovel } wr.ranges = newRangeIndex() - loggedReflogMaxSizeWarning := false p := filepath.Join(filepath.Dir(wr.path), journalIndexFileName) var ok bool @@ -192,7 +190,7 @@ func (wr *journalWriter) bootstrapJournal(ctx context.Context) (last hash.Hash, if ok { var info os.FileInfo if info, err = wr.index.Stat(); err != nil { - return hash.Hash{}, nil, nil, err + return hash.Hash{}, err } // initialize range index with enough capacity to @@ -254,7 +252,7 @@ func (wr *journalWriter) bootstrapJournal(ctx context.Context) (last hash.Hash, if cerr := wr.corruptIndexRecovery(ctx); cerr != nil { err = fmt.Errorf("error recovering corrupted chunk journal index: %s", err.Error()) } - return hash.Hash{}, nil, nil, err + return hash.Hash{}, err } wr.ranges = wr.ranges.flatten() } @@ -272,14 +270,11 @@ func (wr *journalWriter) bootstrapJournal(ctx context.Context) (last hash.Hash, case rootHashJournalRecKind: last = hash.Hash(r.address) - if !reflogDisabled { - if len(roots) < reflogRecordLimit { - roots = append(roots, r.address.String()) - times = append(times, r.timestamp) - } else if !loggedReflogMaxSizeWarning { - loggedReflogMaxSizeWarning = true - logrus.Warnf("journal writer exceeded reflog record limit (%d)", reflogRecordLimit) - } 
+ if !reflogDisabled && reflogRingBuffer != nil { + reflogRingBuffer.Push(reflogRootHashEntry{ + root: r.address.String(), + timestamp: r.timestamp, + }) } default: @@ -288,7 +283,7 @@ func (wr *journalWriter) bootstrapJournal(ctx context.Context) (last hash.Hash, return nil }) if err != nil { - return hash.Hash{}, nil, nil, err + return hash.Hash{}, err } return diff --git a/go/store/nbs/journal_writer_test.go b/go/store/nbs/journal_writer_test.go index ced412bd5d..ca164a7cfb 100644 --- a/go/store/nbs/journal_writer_test.go +++ b/go/store/nbs/journal_writer_test.go @@ -184,7 +184,7 @@ func newTestJournalWriter(t *testing.T, path string) *journalWriter { j, err := createJournalWriter(ctx, path) require.NoError(t, err) require.NotNil(t, j) - _, _, _, err = j.bootstrapJournal(ctx) + _, err = j.bootstrapJournal(ctx, nil) require.NoError(t, err) return j } @@ -217,8 +217,10 @@ func TestJournalWriterBootstrap(t *testing.T) { j, _, err := openJournalWriter(ctx, path) require.NoError(t, err) - _, _, _, err = j.bootstrapJournal(ctx) + reflogBuffer := newReflogRingBuffer(10) + last, err = j.bootstrapJournal(ctx, reflogBuffer) require.NoError(t, err) + assertExpectedIterationOrder(t, reflogBuffer, []string{last.String()}) validateAllLookups(t, j, data) @@ -353,7 +355,7 @@ func TestJournalIndexBootstrap(t *testing.T) { require.NoError(t, err) require.True(t, ok) // bootstrap journal and validate chunk records - last, _, _, err := journal.bootstrapJournal(ctx) + last, err := journal.bootstrapJournal(ctx, nil) assert.NoError(t, err) for _, e := range expected { var act CompressedChunk @@ -388,9 +390,8 @@ func TestJournalIndexBootstrap(t *testing.T) { jnl, ok, err := openJournalWriter(ctx, idxPath) require.NoError(t, err) require.True(t, ok) - _, _, _, err = jnl.bootstrapJournal(ctx) + _, err = jnl.bootstrapJournal(ctx, nil) assert.Error(t, err) - }) } } diff --git a/go/store/nbs/reflog_ring_buffer.go b/go/store/nbs/reflog_ring_buffer.go new file mode 100644 index 
0000000000..e2211e8c92 --- /dev/null +++ b/go/store/nbs/reflog_ring_buffer.go @@ -0,0 +1,156 @@ +// Copyright 2023 Dolthub, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package nbs + +import ( + "fmt" + "sync" + "time" +) + +// reflogRootHashEntry is a data container for a root hash update that was recorded to the chunk journal. It contains +// the root and the time at which it was written. +type reflogRootHashEntry struct { + root string + timestamp time.Time +} + +// reflogRingBuffer is a fixed size circular buffer that allows the most recent N entries to be iterated over (where +// N is equal to the size requested when this ring buffer is constructed). Its locking strategy assumes that +// only new entries are written to the head (through Push) and that existing entries will never need to be +// updated. Internally, it allocates a slice that is twice as large as the requested size, so that less locking +// is needed when iterating over entries to read them. +type reflogRingBuffer struct { + items []reflogRootHashEntry + mu *sync.Mutex + requestedSize int + totalSize int + insertIndex int + itemCount int +} + +// newReflogRingBuffer creates a new reflogRingBuffer that allows the reflog to query up to |size| records. +// Internally, the ring buffer allocates extra storage so that |size| records can be read while new root entries +// are still being recorded. 
+func newReflogRingBuffer(size int) *reflogRingBuffer { + if size < 0 { + panic(fmt.Sprintf("invalid size specified in newReflogRingBuffer construction: %d", size)) + } + + return &reflogRingBuffer{ + requestedSize: size, + totalSize: size * 2, + items: make([]reflogRootHashEntry, size*2), + mu: &sync.Mutex{}, + insertIndex: 0, + itemCount: 0, + } +} + +// Push pushes |newItem| onto this ring buffer, replacing the oldest entry in this ring buffer once the buffer +// is fully populated. +func (rb *reflogRingBuffer) Push(newItem reflogRootHashEntry) { + rb.mu.Lock() + defer rb.mu.Unlock() + + rb.items[rb.insertIndex] = newItem + rb.insertIndex = (rb.insertIndex + 1) % len(rb.items) + if rb.itemCount < rb.requestedSize { + rb.itemCount++ + } +} + +// Iterate traverses the entries in this ring buffer and invokes the specified callback function, |f|, on each +// entry. Iteration starts with the oldest entries inserted into this ring buffer and ends with the most recent +// entry. This function will iterate over at most N entries, where N is the requested size the caller specified +// when constructing this ring buffer. +func (rb *reflogRingBuffer) Iterate(f func(item reflogRootHashEntry) error) error { + startPosition, endPosition := rb.getIterationIndexes() + if startPosition == endPosition { + return nil + } + + for idx := startPosition; ; { + // The ring buffer holds twice as many entries as we ever expose through the Iterate function, so that + // entries can still be inserted without having to lock the whole ring buffer during iteration. However, + // as a sanity check, before we look at an index, we make sure the current insertion index doesn't conflict. + // TODO: this works if we happen to catch the currentInsertIndex at exactly the right time, but it could fly + // over with us missing it. It would be a better sanity check to make sure the current insertion index + // isn't within a range – perhaps 20% of the requested size? 
+ rb.mu.Lock() + currentInsertIndex := rb.insertIndex + rb.mu.Unlock() + + if idx == currentInsertIndex { + return fmt.Errorf("unable to finish iteration: insertion index has wrapped around into read range") + } + + err := f(rb.items[idx]) + if err != nil { + return err + } + + // Move to next spot + idx = (idx + 1) % rb.totalSize + if idx == endPosition { + break + } + } + + return nil +} + +// TruncateToLastRecord resets this ring buffer so that it only exposes the single, most recently written record. +// If this ring buffer has not already had any records pushed in it, then this is a no-op and the ring buffer +// remains with a zero item count. +func (rb *reflogRingBuffer) TruncateToLastRecord() { + rb.mu.Lock() + defer rb.mu.Unlock() + + if rb.itemCount == 0 { + return + } + + rb.itemCount = 1 +} + +// getIterationIndexes returns the start (inclusive) and end (exclusive) positions for iterating over the +// entries in this ring buffer. Note that the end position may be less than the start position, which +// indicates that iteration should wrap around the ring buffer. +func (rb *reflogRingBuffer) getIterationIndexes() (int, int) { + rb.mu.Lock() + defer rb.mu.Unlock() + + // If the buffer is empty, return the start position equal to the end position so that iteration is a no-op + if rb.itemCount == 0 || rb.totalSize == 0 { + return rb.insertIndex, rb.insertIndex + } + + // When the ring buffer isn't fully populated yet, we need to be careful to limit iteration to the number + // of items that have actually been inserted. Once more entries have been inserted than the requested size + // of this ring buffer, we will iterate over only the most recent entries and limit to the requested size. 
+ itemCount := rb.itemCount + if itemCount > rb.requestedSize { + itemCount = rb.requestedSize + } + + endPosition := rb.insertIndex + startPosition := (endPosition - itemCount) % rb.totalSize + if startPosition < 0 { + startPosition = rb.totalSize + startPosition + } + + return startPosition, endPosition +} diff --git a/go/store/nbs/reflog_ring_buffer_test.go b/go/store/nbs/reflog_ring_buffer_test.go new file mode 100644 index 0000000000..9d12ca6bbb --- /dev/null +++ b/go/store/nbs/reflog_ring_buffer_test.go @@ -0,0 +1,131 @@ +// Copyright 2023 Dolthub, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package nbs + +import ( + "github.com/stretchr/testify/assert" + "testing" + "time" +) + +// TestIteration asserts that we can iterate over the contents of a reflog ring buffer as the ring buffer grows. +func TestIteration(t *testing.T) { + buffer := newReflogRingBuffer(5) + + // Assert that Iterate returns the correct items in the correct order when the buffer + // contains fewer items than the requested buffer size. + insertTestRecord(buffer, "aaaa") + insertTestRecord(buffer, "bbbb") + insertTestRecord(buffer, "cccc") + assertExpectedIterationOrder(t, buffer, []string{"aaaa", "bbbb", "cccc"}) + + // Assert that Iterate returns the correct items in the correct order when the buffer + // contains the same number of items as the requested buffer size. 
+ insertTestRecord(buffer, "dddd") + insertTestRecord(buffer, "eeee") + assertExpectedIterationOrder(t, buffer, []string{"aaaa", "bbbb", "cccc", "dddd", "eeee"}) + + // Insert two new records that cause the buffer to exclude the first two records + insertTestRecord(buffer, "ffff") + insertTestRecord(buffer, "gggg") + assertExpectedIterationOrder(t, buffer, []string{"cccc", "dddd", "eeee", "ffff", "gggg"}) + + // Insert three records to fill up the buffer's internal capacity + insertTestRecord(buffer, "hhhh") + insertTestRecord(buffer, "iiii") + insertTestRecord(buffer, "jjjj") + assertExpectedIterationOrder(t, buffer, []string{"ffff", "gggg", "hhhh", "iiii", "jjjj"}) + + // Insert four records to test the buffer wrapping around for the first time + insertTestRecord(buffer, "kkkk") + insertTestRecord(buffer, "llll") + insertTestRecord(buffer, "mmmm") + insertTestRecord(buffer, "nnnn") + assertExpectedIterationOrder(t, buffer, []string{"jjjj", "kkkk", "llll", "mmmm", "nnnn"}) + + // Insert 10 records to test the buffer wrapping around a second time + insertTestRecord(buffer, "oooo") + insertTestRecord(buffer, "pppp") + insertTestRecord(buffer, "qqqq") + insertTestRecord(buffer, "rrrr") + insertTestRecord(buffer, "ssss") + insertTestRecord(buffer, "tttt") + insertTestRecord(buffer, "uuuu") + insertTestRecord(buffer, "vvvv") + insertTestRecord(buffer, "wwww") + insertTestRecord(buffer, "xxxx") + assertExpectedIterationOrder(t, buffer, []string{"tttt", "uuuu", "vvvv", "wwww", "xxxx"}) +} + +// TestTruncateToLastRecord asserts that the TruncateToLastRecord works correctly regardless of how much data +// is currently stored in the buffer. 
+func TestTruncateToLastRecord(t *testing.T) { + buffer := newReflogRingBuffer(5) + + // When the buffer is empty, TruncateToLastRecord is a no-op + buffer.TruncateToLastRecord() + assertExpectedIterationOrder(t, buffer, []string{}) + buffer.TruncateToLastRecord() + assertExpectedIterationOrder(t, buffer, []string{}) + + // When the buffer contains only a single item, TruncateToLastRecord is a no-op + insertTestRecord(buffer, "aaaa") + buffer.TruncateToLastRecord() + assertExpectedIterationOrder(t, buffer, []string{"aaaa"}) + buffer.TruncateToLastRecord() + assertExpectedIterationOrder(t, buffer, []string{"aaaa"}) + + // When the buffer is not full, TruncateToLastRecord reduces the buffer to the most recent logical record + insertTestRecord(buffer, "bbbb") + insertTestRecord(buffer, "cccc") + insertTestRecord(buffer, "dddd") + buffer.TruncateToLastRecord() + assertExpectedIterationOrder(t, buffer, []string{"dddd"}) + + // When the buffer is full, TruncateToLastRecord reduces the buffer to the most recent logical record + insertTestRecord(buffer, "aaaa") + insertTestRecord(buffer, "bbbb") + insertTestRecord(buffer, "cccc") + insertTestRecord(buffer, "dddd") + insertTestRecord(buffer, "eeee") + insertTestRecord(buffer, "ffff") + insertTestRecord(buffer, "gggg") + insertTestRecord(buffer, "hhhh") + insertTestRecord(buffer, "iiii") + insertTestRecord(buffer, "jjjj") + insertTestRecord(buffer, "kkkk") + insertTestRecord(buffer, "llll") + insertTestRecord(buffer, "mmmm") + buffer.TruncateToLastRecord() + assertExpectedIterationOrder(t, buffer, []string{"mmmm"}) +} + +func insertTestRecord(buffer *reflogRingBuffer, root string) { + buffer.Push(reflogRootHashEntry{ + root: root, + timestamp: time.Now(), + }) +} + +func assertExpectedIterationOrder(t *testing.T, buffer *reflogRingBuffer, expectedRoots []string) { + i := 0 + buffer.Iterate(func(item reflogRootHashEntry) error { + assert.Equal(t, expectedRoots[i], item.root) + assert.False(t, time.Time.IsZero(item.timestamp)) 
+ i++ + return nil + }) + assert.Equal(t, len(expectedRoots), i) +} From 68b5733aa08d0be4638fb27da696abc80d92be60 Mon Sep 17 00:00:00 2001 From: Jason Fulghum Date: Mon, 6 Nov 2023 08:56:25 -0800 Subject: [PATCH 3/6] Removing old sanity check --- go/store/nbs/journal.go | 12 +----------- 1 file changed, 1 insertion(+), 11 deletions(-) diff --git a/go/store/nbs/journal.go b/go/store/nbs/journal.go index 87ab6de454..edd20f8b0e 100644 --- a/go/store/nbs/journal.go +++ b/go/store/nbs/journal.go @@ -407,17 +407,7 @@ func (j *ChunkJournal) UpdateGCGen(ctx context.Context, lastLock addr, next mani // entry, and double check that it matches the root stored in the manifest. if !reflogDisabled { j.reflogRingBuffer.TruncateToLastRecord() - - if len(j.roots) == 0 { - return manifestContents{}, fmt.Errorf( - "ChunkJournal roots not intialized; no roots in memory") - } - j.roots = j.roots[len(j.roots)-1:] - j.rootTimestamps = j.rootTimestamps[len(j.rootTimestamps)-1:] - if j.roots[0] != latest.root.String() { - return manifestContents{}, fmt.Errorf( - "ChunkJournal root doesn't match manifest root") - } + // TODO: sanity check that j.reflogRingBuffer.Peek matches latest.root ? 
} return latest, nil From 829cc9f1eb412771ede5401b0034a8133066baaf Mon Sep 17 00:00:00 2001 From: fulghum Date: Mon, 6 Nov 2023 17:07:43 +0000 Subject: [PATCH 4/6] [ga-format-pr] Run go/utils/repofmt/format_repo.sh and go/Godeps/update.sh --- go/store/nbs/journal.go | 2 +- go/store/nbs/reflog_ring_buffer_test.go | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/go/store/nbs/journal.go b/go/store/nbs/journal.go index edd20f8b0e..284b1660f7 100644 --- a/go/store/nbs/journal.go +++ b/go/store/nbs/journal.go @@ -25,12 +25,12 @@ import ( "strconv" "time" + "github.com/dolthub/fslock" "github.com/sirupsen/logrus" "github.com/dolthub/dolt/go/libraries/doltcore/dconfig" "github.com/dolthub/dolt/go/store/chunks" "github.com/dolthub/dolt/go/store/hash" - "github.com/dolthub/fslock" ) const ( diff --git a/go/store/nbs/reflog_ring_buffer_test.go b/go/store/nbs/reflog_ring_buffer_test.go index 9d12ca6bbb..6ca962cc52 100644 --- a/go/store/nbs/reflog_ring_buffer_test.go +++ b/go/store/nbs/reflog_ring_buffer_test.go @@ -15,9 +15,10 @@ package nbs import ( - "github.com/stretchr/testify/assert" "testing" "time" + + "github.com/stretchr/testify/assert" ) // TestIteration asserts that we can iterate over the contents of a reflog ring buffer as the ring buffer grows. From 2b3097a285fbb368b7e047d1e496215de7989a14 Mon Sep 17 00:00:00 2001 From: Jason Fulghum Date: Mon, 6 Nov 2023 09:47:48 -0800 Subject: [PATCH 5/6] Adding a sanity check to exit iteration early if the insertion index wraps around into the iteration range. 
--- go/store/nbs/reflog_ring_buffer.go | 37 +++++++++++++------ go/store/nbs/reflog_ring_buffer_test.go | 47 ++++++++++++++++++++++++- 2 files changed, 73 insertions(+), 11 deletions(-) diff --git a/go/store/nbs/reflog_ring_buffer.go b/go/store/nbs/reflog_ring_buffer.go index e2211e8c92..771b6edb62 100644 --- a/go/store/nbs/reflog_ring_buffer.go +++ b/go/store/nbs/reflog_ring_buffer.go @@ -15,11 +15,17 @@ package nbs import ( + "errors" "fmt" "sync" "time" ) +// errUnsafeIteration is returned when iterating through a ring buffer too slowly and new, inserted data is detected +// as wrapping around into the iteration range. +var errUnsafeIteration = errors.New( + "unable to finish iteration: insertion index has wrapped around into iteration range") + // reflogRootHashEntry is a data container for a root hash update that was recorded to the chunk journal. It contains // the root and the time at which it was written. type reflogRootHashEntry struct { @@ -85,16 +91,10 @@ func (rb *reflogRingBuffer) Iterate(f func(item reflogRootHashEntry) error) erro for idx := startPosition; ; { // The ring buffer holds twice as many entries as we ever expose through the Iterate function, so that // entries can still be inserted without having to lock the whole ring buffer during iteration. However, - // as a sanity check, before we look at an index, we make sure the current insertion index doesn't conflict. - // TODO: this works if we happen to catch the currentInsertIndex at exactly the right time, but it could fly - // over with us missing it. It would be a better sanity check to make sure the current insertion index - // isn't within a range – perhaps 20% of the requested size? 
- rb.mu.Lock() - currentInsertIndex := rb.insertIndex - rb.mu.Unlock() - - if idx == currentInsertIndex { - return fmt.Errorf("unable to finish iteration: insertion index has wrapped around into read range") + // as a sanity check, before we look at an index, we make sure the current insertion index hasn't + // gone into the range we're iterating. + if rb.insertionIndexIsInRange(startPosition, endPosition) { + return fmt.Errorf("unable to finish iteration: insertion index has wrapped around into iteration range") } err := f(rb.items[idx]) @@ -154,3 +154,20 @@ func (rb *reflogRingBuffer) getIterationIndexes() (int, int) { return startPosition, endPosition } + +// insertionIndexIsInRange returns true if the current insertion pointer for this ring buffer is within the +// specified |rangeStart| and |rangeEnd| indexes. This function handles ranges that wrap around the ring buffer. +func (rb *reflogRingBuffer) insertionIndexIsInRange(rangeStart, rangeEnd int) bool { + rb.mu.Lock() + currentInsertIndex := rb.insertIndex + rb.mu.Unlock() + + // If the range wraps around the ring buffer, adjust currentInsertIndex and rangeEnd + // so that we can use the same logic for an in range check. + if rangeStart > rangeEnd { + currentInsertIndex += rb.totalSize + rangeEnd += rb.totalSize + } + + return currentInsertIndex >= rangeStart && currentInsertIndex <= rangeEnd +} diff --git a/go/store/nbs/reflog_ring_buffer_test.go b/go/store/nbs/reflog_ring_buffer_test.go index 9d12ca6bbb..6f60f89dcc 100644 --- a/go/store/nbs/reflog_ring_buffer_test.go +++ b/go/store/nbs/reflog_ring_buffer_test.go @@ -15,9 +15,13 @@ package nbs import ( - "github.com/stretchr/testify/assert" + "fmt" + "sync" "testing" "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) // TestIteration asserts that we can iterate over the contents of a reflog ring buffer as the ring buffer grows. 
@@ -112,6 +116,47 @@ func TestTruncateToLastRecord(t *testing.T) { assertExpectedIterationOrder(t, buffer, []string{"mmmm"}) } +// TestSlowIteration asserts that when iterating through a reflog ring buffer too slowly and the insertion index +// wraps around into the iteration range, that iteration stops early and an error is returned. +func TestSlowIteration(t *testing.T) { + buffer := newReflogRingBuffer(5) + buffer.Push(reflogRootHashEntry{"aaaa", time.Now()}) + buffer.Push(reflogRootHashEntry{"bbbb", time.Now()}) + buffer.Push(reflogRootHashEntry{"cccc", time.Now()}) + buffer.Push(reflogRootHashEntry{"dddd", time.Now()}) + buffer.Push(reflogRootHashEntry{"eeee", time.Now()}) + + // Create a wait group to allow our two go routines to complete before + wg := sync.WaitGroup{} + wg.Add(2) + + // Create one goroutine that *very* slowly iterates through the buffer + var err error + iterationCount := 0 + go func() { + err = buffer.Iterate(func(item reflogRootHashEntry) error { + time.Sleep(1 * time.Second) + iterationCount++ + return nil + }) + wg.Done() + }() + + // Create a second goroutine that quickly inserts more data than the buffer can hold + go func() { + for i := 0; i < 20; i++ { + buffer.Push(reflogRootHashEntry{fmt.Sprintf("i-%d", i), time.Now()}) + } + wg.Done() + }() + + // Wait for both goroutines to complete, then assert that the conflict detection logic triggered + wg.Wait() + require.Error(t, err) + require.Equal(t, errUnsafeIteration, err) + require.True(t, iterationCount < 5) +} + func insertTestRecord(buffer *reflogRingBuffer, root string) { buffer.Push(reflogRootHashEntry{ root: root, From 272b6d93dcadd459db557a6ed9a6803fa2040df7 Mon Sep 17 00:00:00 2001 From: Jason Fulghum Date: Mon, 6 Nov 2023 11:51:55 -0800 Subject: [PATCH 6/6] Fixing iteration conflict detection --- go/store/nbs/reflog_ring_buffer.go | 49 +++++++++++++++++++------ go/store/nbs/reflog_ring_buffer_test.go | 38 +++++-------------- 2 files changed, 48 insertions(+), 39 deletions(-) 
diff --git a/go/store/nbs/reflog_ring_buffer.go b/go/store/nbs/reflog_ring_buffer.go index 771b6edb62..a344a89464 100644 --- a/go/store/nbs/reflog_ring_buffer.go +++ b/go/store/nbs/reflog_ring_buffer.go @@ -45,6 +45,7 @@ type reflogRingBuffer struct { totalSize int insertIndex int itemCount int + epoch uint } // newReflogRingBuffer creates a new reflogRingBuffer that allows the reflog to query up to |size| records. @@ -62,6 +63,7 @@ func newReflogRingBuffer(size int) *reflogRingBuffer { mu: &sync.Mutex{}, insertIndex: 0, itemCount: 0, + epoch: 1, } } @@ -73,6 +75,10 @@ func (rb *reflogRingBuffer) Push(newItem reflogRootHashEntry) { rb.items[rb.insertIndex] = newItem rb.insertIndex = (rb.insertIndex + 1) % len(rb.items) + if rb.insertIndex == 0 { + rb.epoch++ + } + if rb.itemCount < rb.requestedSize { rb.itemCount++ } @@ -83,7 +89,7 @@ func (rb *reflogRingBuffer) Push(newItem reflogRootHashEntry) { // entry. This function will iterate over at most N entries, where N is the requested size the caller specified // when constructing this ring buffer. func (rb *reflogRingBuffer) Iterate(f func(item reflogRootHashEntry) error) error { - startPosition, endPosition := rb.getIterationIndexes() + startPosition, endPosition, startingEpoch := rb.getIterationIndexes() if startPosition == endPosition { return nil } @@ -93,8 +99,8 @@ func (rb *reflogRingBuffer) Iterate(f func(item reflogRootHashEntry) error) erro // entries can still be inserted without having to lock the whole ring buffer during iteration. However, // as a sanity check, before we look at an index, we make sure the current insertion index hasn't // gone into the range we're iterating. 
- if rb.insertionIndexIsInRange(startPosition, endPosition) { - return fmt.Errorf("unable to finish iteration: insertion index has wrapped around into iteration range") + if rb.insertionIndexIsInRange(startPosition, endPosition, startingEpoch) { + return errUnsafeIteration } err := f(rb.items[idx]) @@ -127,15 +133,16 @@ func (rb *reflogRingBuffer) TruncateToLastRecord() { } // getIterationIndexes returns the start (inclusive) and end (exclusive) positions for iterating over the -// entries in this ring buffer. Note that the end position may be less than the start position, which -// indicates that iteration should wrap around the ring buffer. -func (rb *reflogRingBuffer) getIterationIndexes() (int, int) { +// entries in this ring buffer, as well as the current epoch, or generation of the ring buffer for the starting +// position. Note that the end position may be less than the start position, which indicates that iteration +// wraps around the ring buffer. +func (rb *reflogRingBuffer) getIterationIndexes() (int, int, uint) { rb.mu.Lock() defer rb.mu.Unlock() // If the buffer is empty, return the start position equal to the end position so that iteration is a no-op if rb.itemCount == 0 || rb.totalSize == 0 { - return rb.insertIndex, rb.insertIndex + return rb.insertIndex, rb.insertIndex, rb.epoch } // When the ring buffer isn't fully populated yet, we need to be careful to limit iteration to the number @@ -148,26 +155,46 @@ func (rb *reflogRingBuffer) getIterationIndexes() (int, int) { endPosition := rb.insertIndex startPosition := (endPosition - itemCount) % rb.totalSize + epoch := rb.epoch if startPosition < 0 { startPosition = rb.totalSize + startPosition + epoch-- } - return startPosition, endPosition + return startPosition, endPosition, epoch } // insertionIndexIsInRange returns true if the current insertion pointer for this ring buffer is within the -// specified |rangeStart| and |rangeEnd| indexes. 
This function handles ranges that wrap around the ring buffer. -func (rb *reflogRingBuffer) insertionIndexIsInRange(rangeStart, rangeEnd int) bool { +// specified |rangeStart| and |rangeEnd| indexes. The |startingEpoch| parameter is used to determine if the +// current insertion index has wrapped around the ring buffer, possibly multiple times. +func (rb *reflogRingBuffer) insertionIndexIsInRange(rangeStart, rangeEnd int, startingEpoch uint) bool { rb.mu.Lock() currentInsertIndex := rb.insertIndex + currentEpoch := rb.epoch rb.mu.Unlock() + // When the epoch value overflows and wraps around to 0 again, adjust the starting epoch accordingly + epochDelta := currentEpoch - startingEpoch + if epochDelta < 0 { + maxUint := ^uint(0) + epochDelta += maxUint + } + // If the range wraps around the ring buffer, adjust currentInsertIndex and rangeEnd // so that we can use the same logic for an in range check. if rangeStart > rangeEnd { currentInsertIndex += rb.totalSize rangeEnd += rb.totalSize + epochDelta-- } - return currentInsertIndex >= rangeStart && currentInsertIndex <= rangeEnd + switch epochDelta { + case 0: + // same epoch + return currentInsertIndex >= rangeStart && currentInsertIndex < rangeEnd + case 1: + return currentInsertIndex >= rangeStart + default: + return true + } } diff --git a/go/store/nbs/reflog_ring_buffer_test.go b/go/store/nbs/reflog_ring_buffer_test.go index 6f60f89dcc..e2f4756c12 100644 --- a/go/store/nbs/reflog_ring_buffer_test.go +++ b/go/store/nbs/reflog_ring_buffer_test.go @@ -16,7 +16,6 @@ package nbs import ( "fmt" - "sync" "testing" "time" @@ -116,9 +115,9 @@ func TestTruncateToLastRecord(t *testing.T) { assertExpectedIterationOrder(t, buffer, []string{"mmmm"}) } -// TestSlowIteration asserts that when iterating through a reflog ring buffer too slowly and the insertion index -// wraps around into the iteration range, that iteration stops early and an error is returned. 
-func TestSlowIteration(t *testing.T) { +// TestIterationConflict asserts that when iterating through a reflog ring buffer and new items are written to the +// buffer and wrap around into the iteration range, that iteration stops early and an error is returned. +func TestIterationConflict(t *testing.T) { buffer := newReflogRingBuffer(5) buffer.Push(reflogRootHashEntry{"aaaa", time.Now()}) buffer.Push(reflogRootHashEntry{"bbbb", time.Now()}) @@ -126,32 +125,14 @@ func TestSlowIteration(t *testing.T) { buffer.Push(reflogRootHashEntry{"dddd", time.Now()}) buffer.Push(reflogRootHashEntry{"eeee", time.Now()}) - // Create a wait group to allow our two go routines to complete before - wg := sync.WaitGroup{} - wg.Add(2) - - // Create one goroutine that *very* slowly iterates through the buffer - var err error iterationCount := 0 - go func() { - err = buffer.Iterate(func(item reflogRootHashEntry) error { - time.Sleep(1 * time.Second) - iterationCount++ - return nil - }) - wg.Done() - }() - - // Create a second goroutine that quickly inserts more data than the buffer can hold - go func() { - for i := 0; i < 20; i++ { + err := buffer.Iterate(func(item reflogRootHashEntry) error { + for i := 0; i < 100; i++ { buffer.Push(reflogRootHashEntry{fmt.Sprintf("i-%d", i), time.Now()}) } - wg.Done() - }() - - // Wait for both goroutines to complete, then assert that the conflict detection logic triggered - wg.Wait() + iterationCount++ + return nil + }) require.Error(t, err) require.Equal(t, errUnsafeIteration, err) require.True(t, iterationCount < 5) @@ -166,11 +147,12 @@ func insertTestRecord(buffer *reflogRingBuffer, root string) { func assertExpectedIterationOrder(t *testing.T, buffer *reflogRingBuffer, expectedRoots []string) { i := 0 - buffer.Iterate(func(item reflogRootHashEntry) error { + err := buffer.Iterate(func(item reflogRootHashEntry) error { assert.Equal(t, expectedRoots[i], item.root) assert.False(t, time.Time.IsZero(item.timestamp)) i++ return nil }) + assert.NoError(t, 
err) assert.Equal(t, len(expectedRoots), i) }