mirror of
https://github.com/dolthub/dolt.git
synced 2026-03-09 11:19:01 -05:00
Merge pull request #6947 from dolthub/fulghum/reflog-2
Switch reflog data to be stored in a ring buffer
This commit is contained in:
@@ -23,7 +23,6 @@ import (
|
||||
"path/filepath"
|
||||
"sort"
|
||||
"strconv"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/dolthub/fslock"
|
||||
@@ -38,27 +37,19 @@ const (
|
||||
chunkJournalName = chunkJournalAddr // todo
|
||||
)
|
||||
|
||||
// reflogDisabled indicates whether access to the reflog has been disabled and if so, no chunk journal root references
|
||||
// should be kept in memory. This is controlled by the DOLT_DISABLE_REFLOG env var and this var is ONLY written to
|
||||
// during initialization. All access after initialization is read-only, so no additional locking is needed.
|
||||
var reflogDisabled = false
|
||||
var reflogRecordLimit = 100_000
|
||||
var loggedReflogMaxSizeWarning = false
|
||||
|
||||
// defaultReflogBufferSize controls how many of the most recent root references for root updates are kept in-memory.
|
||||
// This default can be overridden by setting the DOLT_REFLOG_RECORD_LIMIT before Dolt starts.
|
||||
const defaultReflogBufferSize = 5_000
|
||||
|
||||
func init() {
|
||||
if os.Getenv(dconfig.EnvDisableReflog) != "" {
|
||||
reflogDisabled = true
|
||||
}
|
||||
|
||||
if limit := os.Getenv(dconfig.EnvReflogRecordLimit); limit != "" {
|
||||
i, err := strconv.Atoi(limit)
|
||||
if err != nil {
|
||||
logrus.Warnf("unable to parse integer value for %s: %s", dconfig.EnvReflogRecordLimit, err.Error())
|
||||
} else {
|
||||
if i <= 0 {
|
||||
reflogDisabled = true
|
||||
} else {
|
||||
reflogRecordLimit = i
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ChunkJournal is a persistence abstraction for a NomsBlockStore.
|
||||
@@ -72,12 +63,9 @@ type ChunkJournal struct {
|
||||
backing *journalManifest
|
||||
persister *fsTablePersister
|
||||
|
||||
// mu locks access to the in-memory roots and root timestamp information that is queried by Dolt's reflog
|
||||
mu sync.Mutex
|
||||
// roots holds an in-memory representation of the root hashes that have been written to the ChunkJournal
|
||||
roots []string
|
||||
// rootTimestamps holds a timestamp for each of the root hashes that have been written to the ChunkJournal
|
||||
rootTimestamps []time.Time
|
||||
// reflogRingBuffer holds the most recent roots written to the chunk journal so that they can be
|
||||
// quickly loaded for reflog queries without having to re-read the journal file from disk.
|
||||
reflogRingBuffer *reflogRingBuffer
|
||||
}
|
||||
|
||||
var _ tablePersister = &ChunkJournal{}
|
||||
@@ -94,6 +82,7 @@ func newChunkJournal(ctx context.Context, nbfVers, dir string, m *journalManifes
|
||||
|
||||
j := &ChunkJournal{path: path, backing: m, persister: p}
|
||||
j.contents.nbfVers = nbfVers
|
||||
j.reflogRingBuffer = newReflogRingBuffer(reflogBufferSize())
|
||||
|
||||
ok, err := fileExists(path)
|
||||
if err != nil {
|
||||
@@ -108,6 +97,33 @@ func newChunkJournal(ctx context.Context, nbfVers, dir string, m *journalManifes
|
||||
return j, nil
|
||||
}
|
||||
|
||||
// reflogBufferSize returns the size of the ring buffer to allocate to store in-memory roots references when
|
||||
// new roots are written to a chunk journal. If reflog queries have been disabled, this function will return 0.
|
||||
// If the default buffer size has been overridden via DOLT_REFLOG_RECORD_LIMIT, that value will be returned if
|
||||
// it can be successfully parsed. Otherwise, the default buffer size will be returned.
|
||||
func reflogBufferSize() int {
|
||||
if reflogDisabled {
|
||||
return 0
|
||||
}
|
||||
|
||||
reflogBufferSize := defaultReflogBufferSize
|
||||
if limit := os.Getenv(dconfig.EnvReflogRecordLimit); limit != "" {
|
||||
i, err := strconv.Atoi(limit)
|
||||
if err != nil {
|
||||
logrus.Warnf("unable to parse integer value for %s from %s: %s",
|
||||
dconfig.EnvReflogRecordLimit, limit, err.Error())
|
||||
} else {
|
||||
if i <= 0 {
|
||||
reflogDisabled = true
|
||||
} else {
|
||||
reflogBufferSize = i
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return reflogBufferSize
|
||||
}
|
||||
|
||||
// bootstrapJournalWriter initializes the journalWriter, which manages access to the
|
||||
// journal file for this ChunkJournal. The bootstrapping process differs depending
|
||||
// on whether a journal file exists at startup time.
|
||||
@@ -133,7 +149,7 @@ func (j *ChunkJournal) bootstrapJournalWriter(ctx context.Context) (err error) {
|
||||
return err
|
||||
}
|
||||
|
||||
_, _, _, err = j.wr.bootstrapJournal(ctx)
|
||||
_, err = j.wr.bootstrapJournal(ctx, j.reflogRingBuffer)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -161,16 +177,11 @@ func (j *ChunkJournal) bootstrapJournalWriter(ctx context.Context) (err error) {
|
||||
}
|
||||
|
||||
// parse existing journal file
|
||||
root, roots, rootTimestamps, err := j.wr.bootstrapJournal(ctx)
|
||||
root, err := j.wr.bootstrapJournal(ctx, j.reflogRingBuffer)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
j.mu.Lock()
|
||||
j.roots = roots
|
||||
j.rootTimestamps = rootTimestamps
|
||||
j.mu.Unlock()
|
||||
|
||||
mc, err := trueUpBackingManifest(ctx, root, j.backing)
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -214,49 +225,17 @@ func trueUpBackingManifest(ctx context.Context, root hash.Hash, backing *journal
|
||||
// and passes the root and associated timestamp to a callback function, |f|. If |f| returns an error, iteration
|
||||
// is stopped and the error is returned.
|
||||
func (j *ChunkJournal) IterateRoots(f func(root string, timestamp *time.Time) error) error {
|
||||
roots, rootTimestamps, length, err := j.readCurrentRootsAndTimestamps()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// journal.roots are stored in chronological order. We need to process them in that order so that we can
|
||||
// accurately detect the root where a ref was first set to a commit. Note that we are careful to not iterate
|
||||
// beyond |length| in the slice, otherwise we risk a race condition that would read inconsistent data.
|
||||
for i := 0; i < length; i++ {
|
||||
return j.reflogRingBuffer.Iterate(func(entry reflogRootHashEntry) error {
|
||||
// If we're reading a chunk journal written with an older version of Dolt, the root hash journal record may
|
||||
// not have a timestamp value, so we'll have a time.Time instance in its zero value. If we see this, pass
|
||||
// nil instead to signal to callers that there is no valid timestamp available.
|
||||
var timestamp *time.Time = nil
|
||||
if time.Time.IsZero(rootTimestamps[i]) == false {
|
||||
timestamp = &rootTimestamps[i]
|
||||
var pTimestamp *time.Time = nil
|
||||
if time.Time.IsZero(entry.timestamp) == false {
|
||||
pTimestamp = &entry.timestamp
|
||||
}
|
||||
err := f(roots[i], timestamp)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// readCurrentRootsAndTimestamps grabs the mutex that protects the in-memory root and root timestamps that represent the
|
||||
// root hash updates in the chunk journal and returns the references to the roots and root timestamps slices, as well as
|
||||
// the length that can be safely read from them. Callers MUST honor this length and NOT read beyond it in the returned
|
||||
// slices, otherwise they risk getting inconsistent data (since the chunk journal continues to append entries to these
|
||||
// slices as new root update journal records are saved).
|
||||
func (j *ChunkJournal) readCurrentRootsAndTimestamps() (roots []string, rootTimestamps []time.Time, length int, err error) {
|
||||
j.mu.Lock()
|
||||
defer j.mu.Unlock()
|
||||
|
||||
roots = j.roots
|
||||
rootTimestamps = j.rootTimestamps
|
||||
length = len(roots)
|
||||
if len(roots) != len(rootTimestamps) {
|
||||
return nil, nil, -1, fmt.Errorf(
|
||||
"different number of roots and root timestamps encountered in ChunkJournal")
|
||||
}
|
||||
|
||||
return roots, rootTimestamps, length, nil
|
||||
return f(entry.root, pTimestamp)
|
||||
})
|
||||
}
|
||||
|
||||
// Persist implements tablePersister.
|
||||
@@ -383,16 +362,10 @@ func (j *ChunkJournal) Update(ctx context.Context, lastLock addr, next manifestC
|
||||
|
||||
// Update the in-memory structures so that the ChunkJournal can be queried for reflog data
|
||||
if !reflogDisabled {
|
||||
j.mu.Lock()
|
||||
defer j.mu.Unlock()
|
||||
|
||||
if len(j.roots) < reflogRecordLimit {
|
||||
j.roots = append(j.roots, next.root.String())
|
||||
j.rootTimestamps = append(j.rootTimestamps, time.Now())
|
||||
} else if !loggedReflogMaxSizeWarning {
|
||||
loggedReflogMaxSizeWarning = true
|
||||
logrus.Warnf("exceeded reflog record limit (%d)", reflogRecordLimit)
|
||||
}
|
||||
j.reflogRingBuffer.Push(reflogRootHashEntry{
|
||||
root: next.root.String(),
|
||||
timestamp: time.Now(),
|
||||
})
|
||||
}
|
||||
|
||||
return j.contents, nil
|
||||
@@ -433,19 +406,8 @@ func (j *ChunkJournal) UpdateGCGen(ctx context.Context, lastLock addr, next mani
|
||||
// Truncate the in-memory root and root timestamp metadata to the most recent
|
||||
// entry, and double check that it matches the root stored in the manifest.
|
||||
if !reflogDisabled {
|
||||
j.mu.Lock()
|
||||
defer j.mu.Unlock()
|
||||
|
||||
if len(j.roots) == 0 {
|
||||
return manifestContents{}, fmt.Errorf(
|
||||
"ChunkJournal roots not intialized; no roots in memory")
|
||||
}
|
||||
j.roots = j.roots[len(j.roots)-1:]
|
||||
j.rootTimestamps = j.rootTimestamps[len(j.rootTimestamps)-1:]
|
||||
if j.roots[0] != latest.root.String() {
|
||||
return manifestContents{}, fmt.Errorf(
|
||||
"ChunkJournal root doesn't match manifest root")
|
||||
}
|
||||
j.reflogRingBuffer.TruncateToLastRecord()
|
||||
// TODO: sanity check that j.reflogRingBuffer.Peek matches latest.root ?
|
||||
}
|
||||
|
||||
return latest, nil
|
||||
|
||||
@@ -22,7 +22,6 @@ import (
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/dolthub/swiss"
|
||||
"github.com/sirupsen/logrus"
|
||||
@@ -162,10 +161,10 @@ type journalWriter struct {
|
||||
var _ io.Closer = &journalWriter{}
|
||||
|
||||
// bootstrapJournal reads in records from the journal file and the journal index file, initializing
|
||||
// the state of the journalWriter. It returns the most recent root hash for the journal, as well as
|
||||
// a slice of all root hash strings in reverse chronological order, and a slice of timestamps that
|
||||
// indicate the time each returned root was created.
|
||||
func (wr *journalWriter) bootstrapJournal(ctx context.Context) (last hash.Hash, roots []string, times []time.Time, err error) {
|
||||
// the state of the journalWriter. Root hashes read from root update records in the journal are written
|
||||
// to |reflogRingBuffer|, which maintains the most recently updated roots which are used to generate the
|
||||
// reflog. This function returns the most recent root hash for the journal as well as any error encountered.
|
||||
func (wr *journalWriter) bootstrapJournal(ctx context.Context, reflogRingBuffer *reflogRingBuffer) (last hash.Hash, err error) {
|
||||
wr.lock.Lock()
|
||||
defer wr.lock.Unlock()
|
||||
|
||||
@@ -173,7 +172,6 @@ func (wr *journalWriter) bootstrapJournal(ctx context.Context) (last hash.Hash,
|
||||
wr.maxNovel = journalIndexDefaultMaxNovel
|
||||
}
|
||||
wr.ranges = newRangeIndex()
|
||||
loggedReflogMaxSizeWarning := false
|
||||
|
||||
p := filepath.Join(filepath.Dir(wr.path), journalIndexFileName)
|
||||
var ok bool
|
||||
@@ -192,7 +190,7 @@ func (wr *journalWriter) bootstrapJournal(ctx context.Context) (last hash.Hash,
|
||||
if ok {
|
||||
var info os.FileInfo
|
||||
if info, err = wr.index.Stat(); err != nil {
|
||||
return hash.Hash{}, nil, nil, err
|
||||
return hash.Hash{}, err
|
||||
}
|
||||
|
||||
// initialize range index with enough capacity to
|
||||
@@ -254,7 +252,7 @@ func (wr *journalWriter) bootstrapJournal(ctx context.Context) (last hash.Hash,
|
||||
if cerr := wr.corruptIndexRecovery(ctx); cerr != nil {
|
||||
err = fmt.Errorf("error recovering corrupted chunk journal index: %s", err.Error())
|
||||
}
|
||||
return hash.Hash{}, nil, nil, err
|
||||
return hash.Hash{}, err
|
||||
}
|
||||
wr.ranges = wr.ranges.flatten()
|
||||
}
|
||||
@@ -272,14 +270,11 @@ func (wr *journalWriter) bootstrapJournal(ctx context.Context) (last hash.Hash,
|
||||
|
||||
case rootHashJournalRecKind:
|
||||
last = hash.Hash(r.address)
|
||||
if !reflogDisabled {
|
||||
if len(roots) < reflogRecordLimit {
|
||||
roots = append(roots, r.address.String())
|
||||
times = append(times, r.timestamp)
|
||||
} else if !loggedReflogMaxSizeWarning {
|
||||
loggedReflogMaxSizeWarning = true
|
||||
logrus.Warnf("journal writer exceeded reflog record limit (%d)", reflogRecordLimit)
|
||||
}
|
||||
if !reflogDisabled && reflogRingBuffer != nil {
|
||||
reflogRingBuffer.Push(reflogRootHashEntry{
|
||||
root: r.address.String(),
|
||||
timestamp: r.timestamp,
|
||||
})
|
||||
}
|
||||
|
||||
default:
|
||||
@@ -288,7 +283,7 @@ func (wr *journalWriter) bootstrapJournal(ctx context.Context) (last hash.Hash,
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return hash.Hash{}, nil, nil, err
|
||||
return hash.Hash{}, err
|
||||
}
|
||||
|
||||
return
|
||||
|
||||
@@ -184,7 +184,7 @@ func newTestJournalWriter(t *testing.T, path string) *journalWriter {
|
||||
j, err := createJournalWriter(ctx, path)
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, j)
|
||||
_, _, _, err = j.bootstrapJournal(ctx)
|
||||
_, err = j.bootstrapJournal(ctx, nil)
|
||||
require.NoError(t, err)
|
||||
return j
|
||||
}
|
||||
@@ -217,8 +217,10 @@ func TestJournalWriterBootstrap(t *testing.T) {
|
||||
|
||||
j, _, err := openJournalWriter(ctx, path)
|
||||
require.NoError(t, err)
|
||||
_, _, _, err = j.bootstrapJournal(ctx)
|
||||
reflogBuffer := newReflogRingBuffer(10)
|
||||
last, err = j.bootstrapJournal(ctx, reflogBuffer)
|
||||
require.NoError(t, err)
|
||||
assertExpectedIterationOrder(t, reflogBuffer, []string{last.String()})
|
||||
|
||||
validateAllLookups(t, j, data)
|
||||
|
||||
@@ -353,7 +355,7 @@ func TestJournalIndexBootstrap(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
require.True(t, ok)
|
||||
// bootstrap journal and validate chunk records
|
||||
last, _, _, err := journal.bootstrapJournal(ctx)
|
||||
last, err := journal.bootstrapJournal(ctx, nil)
|
||||
assert.NoError(t, err)
|
||||
for _, e := range expected {
|
||||
var act CompressedChunk
|
||||
@@ -388,9 +390,8 @@ func TestJournalIndexBootstrap(t *testing.T) {
|
||||
jnl, ok, err := openJournalWriter(ctx, idxPath)
|
||||
require.NoError(t, err)
|
||||
require.True(t, ok)
|
||||
_, _, _, err = jnl.bootstrapJournal(ctx)
|
||||
_, err = jnl.bootstrapJournal(ctx, nil)
|
||||
assert.Error(t, err)
|
||||
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
200
go/store/nbs/reflog_ring_buffer.go
Normal file
200
go/store/nbs/reflog_ring_buffer.go
Normal file
@@ -0,0 +1,200 @@
|
||||
// Copyright 2023 Dolthub, Inc.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package nbs
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
// errUnsafeIteration is returned by reflogRingBuffer.Iterate when a writer has lapped the reader:
// new entries pushed during a slow iteration have wrapped around into the index range being
// iterated, so the remaining entries can no longer be read consistently.
var errUnsafeIteration = errors.New(
	"unable to finish iteration: insertion index has wrapped around into iteration range")

// reflogRootHashEntry is a data container for a root hash update that was recorded to the chunk
// journal. It contains the root and the time at which it was written.
type reflogRootHashEntry struct {
	// root is the string form of the root hash that was written to the journal.
	root string
	// timestamp records when the root update was written. It may be the zero time.Time for
	// records read from journals written by older Dolt versions that did not record timestamps
	// (callers check IsZero before using it).
	timestamp time.Time
}
|
||||
|
||||
// reflogRingBuffer is a fixed size circular buffer that allows the most recent N entries to be
// iterated over (where N is equal to the size requested when this ring buffer is constructed).
// Its locking strategy assumes that only new entries are written to the head (through Push) and
// that existing entries will never need to be updated. Internally, it allocates a slice that is
// twice as large as the requested size, so that less locking is needed when iterating over
// entries to read them.
type reflogRingBuffer struct {
	// items is the physical backing storage; len(items) == totalSize == 2 * requestedSize.
	items []reflogRootHashEntry
	// mu guards insertIndex, itemCount, and epoch.
	mu *sync.Mutex
	// requestedSize is the maximum number of entries exposed through Iterate.
	requestedSize int
	// totalSize is the physical capacity of |items| (twice requestedSize).
	totalSize int
	// insertIndex is the position in |items| that the next Push will write to.
	insertIndex int
	// itemCount is the number of logical entries available to readers; Push caps it at
	// requestedSize and TruncateToLastRecord can shrink it back to 1.
	itemCount int
	// epoch counts how many times insertIndex has wrapped past the end of |items|; it is used
	// by iteration to detect a writer lapping a reader.
	epoch uint
}
|
||||
|
||||
// newReflogRingBuffer creates a new reflogRingBuffer that allows the reflog to query up to |size| records.
|
||||
// Internally, the ring buffer allocates extra storage so that |size| records can be read while new root entries
|
||||
// are still being recorded.
|
||||
func newReflogRingBuffer(size int) *reflogRingBuffer {
|
||||
if size < 0 {
|
||||
panic(fmt.Sprintf("invalid size specified in newReflogRingBuffer construction: %d", size))
|
||||
}
|
||||
|
||||
return &reflogRingBuffer{
|
||||
requestedSize: size,
|
||||
totalSize: size * 2,
|
||||
items: make([]reflogRootHashEntry, size*2),
|
||||
mu: &sync.Mutex{},
|
||||
insertIndex: 0,
|
||||
itemCount: 0,
|
||||
epoch: 1,
|
||||
}
|
||||
}
|
||||
|
||||
// Push pushes |newItem| onto this ring buffer, replacing the oldest entry in this ring buffer once the buffer
|
||||
// is fully populated.
|
||||
func (rb *reflogRingBuffer) Push(newItem reflogRootHashEntry) {
|
||||
rb.mu.Lock()
|
||||
defer rb.mu.Unlock()
|
||||
|
||||
rb.items[rb.insertIndex] = newItem
|
||||
rb.insertIndex = (rb.insertIndex + 1) % len(rb.items)
|
||||
if rb.insertIndex == 0 {
|
||||
rb.epoch++
|
||||
}
|
||||
|
||||
if rb.itemCount < rb.requestedSize {
|
||||
rb.itemCount++
|
||||
}
|
||||
}
|
||||
|
||||
// Iterate traverses the entries in this ring buffer and invokes the specified callback function, |f|, on each
// entry. Iteration starts with the oldest entries inserted into this ring buffer and ends with the most recent
// entry. This function will iterate over at most N entries, where N is the requested size the caller specified
// when constructing this ring buffer. If a concurrent writer wraps into the iteration range, iteration stops
// early and errUnsafeIteration is returned; if |f| returns an error, iteration stops and that error is returned.
//
// NOTE(review): rb.items[idx] is read here without holding rb.mu, while Push mutates rb.items under the lock.
// The epoch/range check bounds how stale the data can be, but this still looks like a data race under the Go
// memory model — confirm whether concurrent Push/Iterate is intended and run the race detector against it.
func (rb *reflogRingBuffer) Iterate(f func(item reflogRootHashEntry) error) error {
	startPosition, endPosition, startingEpoch := rb.getIterationIndexes()
	// An empty iteration range (including an empty buffer) means there is nothing to visit.
	if startPosition == endPosition {
		return nil
	}

	for idx := startPosition; ; {
		// The ring buffer holds twice as many entries as we ever expose through the Iterate function, so that
		// entries can still be inserted without having to lock the whole ring buffer during iteration. However,
		// as a sanity check, before we look at an index, we make sure the current insertion index hasn't
		// gone into the range we're iterating.
		if rb.insertionIndexIsInRange(startPosition, endPosition, startingEpoch) {
			return errUnsafeIteration
		}

		err := f(rb.items[idx])
		if err != nil {
			return err
		}

		// Move to next spot, wrapping around the physical end of the buffer.
		idx = (idx + 1) % rb.totalSize
		if idx == endPosition {
			break
		}
	}

	return nil
}
|
||||
|
||||
// TruncateToLastRecord resets this ring buffer so that it only exposes the single, most recently written record.
|
||||
// If this ring buffer has not already had any records pushed in it, then this is a no-op and the ring buffer
|
||||
// remains with a zero item count.
|
||||
func (rb *reflogRingBuffer) TruncateToLastRecord() {
|
||||
rb.mu.Lock()
|
||||
defer rb.mu.Unlock()
|
||||
|
||||
if rb.itemCount == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
rb.itemCount = 1
|
||||
}
|
||||
|
||||
// getIterationIndexes returns the start (inclusive) and end (exclusive) positions for iterating over the
// entries in this ring buffer, as well as the epoch, or generation of the ring buffer, that the starting
// position belongs to. Note that the end position may be less than the start position, which indicates that
// iteration wraps around the physical end of the ring buffer.
func (rb *reflogRingBuffer) getIterationIndexes() (int, int, uint) {
	rb.mu.Lock()
	defer rb.mu.Unlock()

	// If the buffer is empty, return the start position equal to the end position so that iteration is a no-op
	if rb.itemCount == 0 || rb.totalSize == 0 {
		return rb.insertIndex, rb.insertIndex, rb.epoch
	}

	// When the ring buffer isn't fully populated yet, we need to be careful to limit iteration to the number
	// of items that have actually been inserted. Once more entries have been inserted than the requested size
	// of this ring buffer, we will iterate over only the most recent entries and limit to the requested size.
	itemCount := rb.itemCount
	if itemCount > rb.requestedSize {
		itemCount = rb.requestedSize
	}

	endPosition := rb.insertIndex
	// Go's % operator yields a negative result when the dividend is negative, so a start position that
	// lands "before" index 0 is shifted back into range below. That shift also means the starting entries
	// were written in the previous epoch (the insert index has wrapped since then), hence epoch--.
	startPosition := (endPosition - itemCount) % rb.totalSize
	epoch := rb.epoch
	if startPosition < 0 {
		startPosition = rb.totalSize + startPosition
		epoch--
	}

	return startPosition, endPosition, epoch
}
|
||||
|
||||
// insertionIndexIsInRange returns true if the current insertion pointer for this ring buffer is within the
|
||||
// specified |rangeStart| and |rangeEnd| indexes. The |startingEpoch| parameter is used to determine if the
|
||||
// current insertion index has wrapped around the ring buffer, possibly multiple times.
|
||||
func (rb *reflogRingBuffer) insertionIndexIsInRange(rangeStart, rangeEnd int, startingEpoch uint) bool {
|
||||
rb.mu.Lock()
|
||||
currentInsertIndex := rb.insertIndex
|
||||
currentEpoch := rb.epoch
|
||||
rb.mu.Unlock()
|
||||
|
||||
// When the epoch value overflows and wraps around to 0 again, adjust the starting epoch accordingly
|
||||
epochDelta := currentEpoch - startingEpoch
|
||||
if epochDelta < 0 {
|
||||
maxUint := ^uint(0)
|
||||
epochDelta += maxUint
|
||||
}
|
||||
|
||||
// If the range wraps around the ring buffer, adjust currentInsertIndex and rangeEnd
|
||||
// so that we can use the same logic for an in range check.
|
||||
if rangeStart > rangeEnd {
|
||||
currentInsertIndex += rb.totalSize
|
||||
rangeEnd += rb.totalSize
|
||||
epochDelta--
|
||||
}
|
||||
|
||||
switch epochDelta {
|
||||
case 0:
|
||||
// same epoch
|
||||
return currentInsertIndex >= rangeStart && currentInsertIndex < rangeEnd
|
||||
case 1:
|
||||
return currentInsertIndex >= rangeStart
|
||||
default:
|
||||
return true
|
||||
}
|
||||
}
|
||||
158
go/store/nbs/reflog_ring_buffer_test.go
Normal file
158
go/store/nbs/reflog_ring_buffer_test.go
Normal file
@@ -0,0 +1,158 @@
|
||||
// Copyright 2023 Dolthub, Inc.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package nbs
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
// TestIteration asserts that we can iterate over the contents of a reflog ring buffer as the ring buffer
// grows. The buffer is constructed with a requested size of 5, so its physical storage holds 10 slots.
func TestIteration(t *testing.T) {
	buffer := newReflogRingBuffer(5)

	// Assert that Iterate returns the correct items in the correct order when the buffer
	// contains fewer items than the requested buffer size.
	insertTestRecord(buffer, "aaaa")
	insertTestRecord(buffer, "bbbb")
	insertTestRecord(buffer, "cccc")
	assertExpectedIterationOrder(t, buffer, []string{"aaaa", "bbbb", "cccc"})

	// Assert that Iterate returns the correct items in the correct order when the buffer
	// contains the same number of items as the requested buffer size.
	insertTestRecord(buffer, "dddd")
	insertTestRecord(buffer, "eeee")
	assertExpectedIterationOrder(t, buffer, []string{"aaaa", "bbbb", "cccc", "dddd", "eeee"})

	// Insert two new records that cause the buffer to exclude the first two records
	insertTestRecord(buffer, "ffff")
	insertTestRecord(buffer, "gggg")
	assertExpectedIterationOrder(t, buffer, []string{"cccc", "dddd", "eeee", "ffff", "gggg"})

	// Insert three records to fill up the buffer's internal capacity (10 physical slots)
	insertTestRecord(buffer, "hhhh")
	insertTestRecord(buffer, "iiii")
	insertTestRecord(buffer, "jjjj")
	assertExpectedIterationOrder(t, buffer, []string{"ffff", "gggg", "hhhh", "iiii", "jjjj"})

	// Insert four records to test the buffer wrapping around for the first time
	insertTestRecord(buffer, "kkkk")
	insertTestRecord(buffer, "llll")
	insertTestRecord(buffer, "mmmm")
	insertTestRecord(buffer, "nnnn")
	assertExpectedIterationOrder(t, buffer, []string{"jjjj", "kkkk", "llll", "mmmm", "nnnn"})

	// Insert 10 records to test the buffer wrapping around a second time
	insertTestRecord(buffer, "oooo")
	insertTestRecord(buffer, "pppp")
	insertTestRecord(buffer, "qqqq")
	insertTestRecord(buffer, "rrrr")
	insertTestRecord(buffer, "ssss")
	insertTestRecord(buffer, "tttt")
	insertTestRecord(buffer, "uuuu")
	insertTestRecord(buffer, "vvvv")
	insertTestRecord(buffer, "wwww")
	insertTestRecord(buffer, "xxxx")
	assertExpectedIterationOrder(t, buffer, []string{"tttt", "uuuu", "vvvv", "wwww", "xxxx"})
}
|
||||
|
||||
// TestTruncateToLastRecord asserts that the TruncateToLastRecord works correctly regardless of how much data
// is currently stored in the buffer: empty, single entry, partially filled, and fully wrapped.
func TestTruncateToLastRecord(t *testing.T) {
	buffer := newReflogRingBuffer(5)

	// When the buffer is empty, TruncateToLastRecord is a no-op
	buffer.TruncateToLastRecord()
	assertExpectedIterationOrder(t, buffer, []string{})
	buffer.TruncateToLastRecord()
	assertExpectedIterationOrder(t, buffer, []string{})

	// When the buffer contains only a single item, TruncateToLastRecord is a no-op
	insertTestRecord(buffer, "aaaa")
	buffer.TruncateToLastRecord()
	assertExpectedIterationOrder(t, buffer, []string{"aaaa"})
	buffer.TruncateToLastRecord()
	assertExpectedIterationOrder(t, buffer, []string{"aaaa"})

	// When the buffer is not full, TruncateToLastRecord reduces the buffer to the most recent logical record
	insertTestRecord(buffer, "bbbb")
	insertTestRecord(buffer, "cccc")
	insertTestRecord(buffer, "dddd")
	buffer.TruncateToLastRecord()
	assertExpectedIterationOrder(t, buffer, []string{"dddd"})

	// When the buffer is full, TruncateToLastRecord reduces the buffer to the most recent logical record.
	// Thirteen inserts here are enough to wrap the 10-slot physical storage past its end.
	insertTestRecord(buffer, "aaaa")
	insertTestRecord(buffer, "bbbb")
	insertTestRecord(buffer, "cccc")
	insertTestRecord(buffer, "dddd")
	insertTestRecord(buffer, "eeee")
	insertTestRecord(buffer, "ffff")
	insertTestRecord(buffer, "gggg")
	insertTestRecord(buffer, "hhhh")
	insertTestRecord(buffer, "iiii")
	insertTestRecord(buffer, "jjjj")
	insertTestRecord(buffer, "kkkk")
	insertTestRecord(buffer, "llll")
	insertTestRecord(buffer, "mmmm")
	buffer.TruncateToLastRecord()
	assertExpectedIterationOrder(t, buffer, []string{"mmmm"})
}
|
||||
|
||||
// TestIterationConflict asserts that when iterating through a reflog ring buffer and new items are written to the
// buffer and wrap around into the iteration range, that iteration stops early and an error is returned.
func TestIterationConflict(t *testing.T) {
	buffer := newReflogRingBuffer(5)
	buffer.Push(reflogRootHashEntry{"aaaa", time.Now()})
	buffer.Push(reflogRootHashEntry{"bbbb", time.Now()})
	buffer.Push(reflogRootHashEntry{"cccc", time.Now()})
	buffer.Push(reflogRootHashEntry{"dddd", time.Now()})
	buffer.Push(reflogRootHashEntry{"eeee", time.Now()})

	iterationCount := 0
	// Pushing 100 entries from inside the callback guarantees the writer laps the iteration range,
	// so the next in-range check must fail with errUnsafeIteration before all 5 entries are visited.
	err := buffer.Iterate(func(item reflogRootHashEntry) error {
		for i := 0; i < 100; i++ {
			buffer.Push(reflogRootHashEntry{fmt.Sprintf("i-%d", i), time.Now()})
		}
		iterationCount++
		return nil
	})
	require.Error(t, err)
	require.Equal(t, errUnsafeIteration, err)
	require.True(t, iterationCount < 5)
}
|
||||
|
||||
func insertTestRecord(buffer *reflogRingBuffer, root string) {
|
||||
buffer.Push(reflogRootHashEntry{
|
||||
root: root,
|
||||
timestamp: time.Now(),
|
||||
})
|
||||
}
|
||||
|
||||
func assertExpectedIterationOrder(t *testing.T, buffer *reflogRingBuffer, expectedRoots []string) {
|
||||
i := 0
|
||||
err := buffer.Iterate(func(item reflogRootHashEntry) error {
|
||||
assert.Equal(t, expectedRoots[i], item.root)
|
||||
assert.False(t, time.Time.IsZero(item.timestamp))
|
||||
i++
|
||||
return nil
|
||||
})
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, len(expectedRoots), i)
|
||||
}
|
||||
@@ -6,18 +6,22 @@ teardown() {
|
||||
teardown_common
|
||||
}
|
||||
|
||||
# Asserts that when DOLT_DISABLE_REFLOG is set, the dolt_reflog() table
|
||||
# function returns an empty result set with no error.
|
||||
@test "reflog: disabled with DOLT_DISABLE_REFLOG" {
|
||||
export DOLT_DISABLE_REFLOG=true
|
||||
setup_common
|
||||
dolt sql -q "create table t (i int primary key, j int);"
|
||||
dolt sql -q "insert into t values (1, 1), (2, 2), (3, 3)";
|
||||
dolt commit -Am "initial commit"
|
||||
dolt commit --allow-empty -m "test commit 1"
|
||||
|
||||
run dolt sql -q "select * from dolt_reflog();"
|
||||
[ "$status" -eq 0 ]
|
||||
[ "${#lines[@]}" -eq 0 ]
|
||||
}
|
||||
|
||||
# Sanity check for the most basic case of querying the Dolt reflog
|
||||
@test "reflog: enabled by default" {
|
||||
setup_common
|
||||
dolt sql -q "create table t (i int primary key, j int);"
|
||||
@@ -31,16 +35,22 @@ teardown() {
|
||||
[[ "$output" =~ "Initialize data repository" ]] || false
|
||||
}
|
||||
|
||||
# Asserts that when DOLT_REFLOG_RECORD_LIMIT has been set, the reflog only contains the
|
||||
# most recent entries and is limited by the env var's value.
|
||||
@test "reflog: set DOLT_REFLOG_RECORD_LIMIT" {
|
||||
export DOLT_REFLOG_RECORD_LIMIT=2
|
||||
setup_common
|
||||
dolt sql -q "create table t (i int primary key, j int);"
|
||||
dolt sql -q "insert into t values (1, 1), (2, 2), (3, 3)";
|
||||
dolt commit -Am "initial commit"
|
||||
dolt commit --allow-empty -m "test commit"
|
||||
dolt commit --allow-empty -m "test commit 1"
|
||||
dolt commit --allow-empty -m "test commit 2"
|
||||
|
||||
# Only the most recent two ref changes should appear in the log
|
||||
run dolt sql -q "select * from dolt_reflog();"
|
||||
[ "$status" -eq 0 ]
|
||||
[[ "$output" =~ "exceeded reflog record limit" ]] || false
|
||||
[[ "$output" =~ "Initialize data repository" ]] || false
|
||||
}
|
||||
[[ "$output" =~ "test commit 1" ]] || false
|
||||
[[ "$output" =~ "test commit 2" ]] || false
|
||||
[[ ! "$output" =~ "initial commit" ]] || false
|
||||
[[ ! "$output" =~ "Initialize data repository" ]] || false
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user