mirror of
https://github.com/dolthub/dolt.git
synced 2026-01-24 03:09:22 -06:00
go/store/nbs: add dedicated journalManifest that hold the file lock from startup time
This commit is contained in:
@@ -43,6 +43,7 @@ import (
|
||||
const (
|
||||
manifestFileName = "manifest"
|
||||
lockFileName = "LOCK"
|
||||
lockFileTimeout = time.Millisecond * 100
|
||||
|
||||
storageVersion4 = "4"
|
||||
|
||||
@@ -95,14 +96,17 @@ func MaybeMigrateFileManifest(ctx context.Context, dir string) (bool, error) {
|
||||
return true, err
|
||||
}
|
||||
|
||||
// parse the manifest in its given format
|
||||
// getFileManifest makes a new file manifest.
|
||||
func getFileManifest(ctx context.Context, dir string, mode updateMode) (m manifest, err error) {
|
||||
lock := fslock.New(filepath.Join(dir, lockFileName))
|
||||
m = fileManifest{dir: dir, mode: mode, lock: lock}
|
||||
|
||||
var f *os.File
|
||||
f, err = openIfExists(filepath.Join(dir, manifestFileName))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
} else if f == nil {
|
||||
return fileManifest{dir: dir}, nil
|
||||
return m, nil
|
||||
}
|
||||
defer func() {
|
||||
// keep first error
|
||||
@@ -111,8 +115,6 @@ func getFileManifest(ctx context.Context, dir string, mode updateMode) (m manife
|
||||
}
|
||||
}()
|
||||
|
||||
m = fileManifest{dir: dir, mode: mode}
|
||||
|
||||
var ok bool
|
||||
ok, _, err = m.ParseIfExists(ctx, &Stats{}, nil)
|
||||
if err != nil {
|
||||
@@ -133,28 +135,7 @@ const (
|
||||
type fileManifest struct {
|
||||
dir string
|
||||
mode updateMode
|
||||
}
|
||||
|
||||
func newLock(dir string) *fslock.Lock {
|
||||
lockPath := filepath.Join(dir, lockFileName)
|
||||
return fslock.New(lockPath)
|
||||
}
|
||||
|
||||
func lockFileExists(dir string) (bool, error) {
|
||||
lockPath := filepath.Join(dir, lockFileName)
|
||||
info, err := os.Stat(lockPath)
|
||||
|
||||
if err != nil {
|
||||
if os.IsNotExist(err) {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
return false, errors.New("failed to determine if lock file exists")
|
||||
} else if info.IsDir() {
|
||||
return false, errors.New("lock file is a directory")
|
||||
}
|
||||
|
||||
return true, nil
|
||||
lock *fslock.Lock
|
||||
}
|
||||
|
||||
// Returns nil if path does not exist
|
||||
@@ -165,7 +146,6 @@ func openIfExists(path string) (*os.File, error) {
|
||||
} else if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return f, err
|
||||
}
|
||||
|
||||
@@ -185,10 +165,9 @@ func (fm fileManifest) ParseIfExists(
|
||||
readHook func() error,
|
||||
) (exists bool, contents manifestContents, err error) {
|
||||
t1 := time.Now()
|
||||
defer func() {
|
||||
stats.ReadManifestLatency.SampleTimeSince(t1)
|
||||
}()
|
||||
defer func() { stats.ReadManifestLatency.SampleTimeSince(t1) }()
|
||||
|
||||
// no file lock on the read path
|
||||
return parseIfExists(ctx, fm.dir, readHook)
|
||||
}
|
||||
|
||||
@@ -196,6 +175,16 @@ func (fm fileManifest) Update(ctx context.Context, lastLock addr, newContents ma
|
||||
t1 := time.Now()
|
||||
defer func() { stats.WriteManifestLatency.SampleTimeSince(t1) }()
|
||||
|
||||
// hold the file lock while we update
|
||||
if err = tryFileLock(fm.lock); err != nil {
|
||||
return manifestContents{}, err
|
||||
}
|
||||
defer func() {
|
||||
if cerr := fm.lock.Unlock(); err == nil {
|
||||
err = cerr // keep first error
|
||||
}
|
||||
}()
|
||||
|
||||
checker := func(upstream, contents manifestContents) error {
|
||||
if contents.gcGen != upstream.gcGen {
|
||||
return chunks.ErrGCGenerationExpired
|
||||
@@ -210,6 +199,16 @@ func (fm fileManifest) UpdateGCGen(ctx context.Context, lastLock addr, newConten
|
||||
t1 := time.Now()
|
||||
defer func() { stats.WriteManifestLatency.SampleTimeSince(t1) }()
|
||||
|
||||
// hold the file lock while we update
|
||||
if err = tryFileLock(fm.lock); err != nil {
|
||||
return manifestContents{}, err
|
||||
}
|
||||
defer func() {
|
||||
if cerr := fm.lock.Unlock(); err == nil {
|
||||
err = cerr // keep first error
|
||||
}
|
||||
}()
|
||||
|
||||
checker := func(upstream, contents manifestContents) error {
|
||||
if contents.gcGen == upstream.gcGen {
|
||||
return errors.New("UpdateGCGen() must update the garbage collection generation")
|
||||
@@ -346,71 +345,30 @@ func parseV4Manifest(r io.Reader) (manifestContents, error) {
|
||||
}, nil
|
||||
}
|
||||
|
||||
// parseIfExists parses the manifest file if it exists, callers must hold the file lock.
|
||||
func parseIfExists(_ context.Context, dir string, readHook func() error) (exists bool, contents manifestContents, err error) {
|
||||
var locked bool
|
||||
locked, err = lockFileExists(dir)
|
||||
if readHook != nil {
|
||||
if err = readHook(); err != nil {
|
||||
return false, manifestContents{}, err
|
||||
}
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
var f *os.File
|
||||
if f, err = openIfExists(filepath.Join(dir, manifestFileName)); err != nil {
|
||||
return false, manifestContents{}, err
|
||||
} else if f == nil {
|
||||
return false, manifestContents{}, nil
|
||||
}
|
||||
|
||||
// !exists(lockFileName) => uninitialized store
|
||||
if locked {
|
||||
var f *os.File
|
||||
err = func() (ferr error) {
|
||||
lck := newLock(dir)
|
||||
ferr = lck.Lock()
|
||||
if ferr != nil {
|
||||
return ferr
|
||||
}
|
||||
|
||||
defer func() {
|
||||
unlockErr := lck.Unlock()
|
||||
if ferr == nil {
|
||||
ferr = unlockErr
|
||||
}
|
||||
}()
|
||||
|
||||
if readHook != nil {
|
||||
ferr = readHook()
|
||||
|
||||
if ferr != nil {
|
||||
return ferr
|
||||
}
|
||||
}
|
||||
|
||||
f, ferr = openIfExists(filepath.Join(dir, manifestFileName))
|
||||
if ferr != nil {
|
||||
return ferr
|
||||
}
|
||||
return nil
|
||||
}()
|
||||
|
||||
if err != nil {
|
||||
return exists, contents, err
|
||||
}
|
||||
|
||||
if f != nil {
|
||||
defer func() {
|
||||
closeErr := f.Close()
|
||||
|
||||
if err == nil {
|
||||
err = closeErr
|
||||
}
|
||||
}()
|
||||
|
||||
exists = true
|
||||
|
||||
contents, err = parseManifest(f)
|
||||
|
||||
if err != nil {
|
||||
return false, contents, err
|
||||
}
|
||||
}
|
||||
contents, err = parseManifest(f)
|
||||
if err != nil {
|
||||
return false, contents, err
|
||||
}
|
||||
return exists, contents, nil
|
||||
exists = true
|
||||
return
|
||||
}
|
||||
|
||||
// updateWithChecker updates the manifest if |validate| is satisfied, callers must hold the file lock.
|
||||
func updateWithChecker(_ context.Context, dir string, mode updateMode, validate manifestChecker, lastLock addr, newContents manifestContents, writeHook func() error) (mc manifestContents, err error) {
|
||||
var tempManifestPath string
|
||||
|
||||
@@ -451,22 +409,6 @@ func updateWithChecker(_ context.Context, dir string, mode updateMode, validate
|
||||
|
||||
defer file.Remove(tempManifestPath) // If we rename below, this will be a no-op
|
||||
|
||||
// Take manifest file lock
|
||||
lck := newLock(dir)
|
||||
err = lck.Lock()
|
||||
|
||||
if err != nil {
|
||||
return manifestContents{}, err
|
||||
}
|
||||
|
||||
defer func() {
|
||||
unlockErr := lck.Unlock()
|
||||
|
||||
if err == nil {
|
||||
err = unlockErr
|
||||
}
|
||||
}()
|
||||
|
||||
// writeHook is for testing, allowing other code to slip in and try to do stuff while we hold the lock.
|
||||
if writeHook != nil {
|
||||
err = writeHook()
|
||||
@@ -551,3 +493,11 @@ func syncDirectoryHandle(dir string) (err error) {
|
||||
}
|
||||
return d.Sync()
|
||||
}
|
||||
|
||||
func tryFileLock(lock *fslock.Lock) (err error) {
|
||||
err = lock.LockWithTimeout(lockFileTimeout)
|
||||
if errors.Is(err, fslock.ErrTimeout) {
|
||||
err = errors.New("timed out reading database manifest")
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
@@ -41,7 +41,9 @@ import (
|
||||
func makeFileManifestTempDir(t *testing.T) fileManifest {
|
||||
dir, err := os.MkdirTemp("", "")
|
||||
require.NoError(t, err)
|
||||
return fileManifest{dir: dir} //, cache: newManifestCache(defaultManifestCacheSize)}
|
||||
fm, err := getFileManifest(context.Background(), dir, asyncFlush)
|
||||
require.NoError(t, err)
|
||||
return fm.(fileManifest)
|
||||
}
|
||||
|
||||
func TestFileManifestLoadIfExists(t *testing.T) {
|
||||
@@ -77,6 +79,7 @@ func TestFileManifestLoadIfExists(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestFileManifestLoadIfExistsHoldsLock(t *testing.T) {
|
||||
t.Skip("TODO")
|
||||
assert := assert.New(t)
|
||||
fm := makeFileManifestTempDir(t)
|
||||
defer file.RemoveAll(fm.dir)
|
||||
@@ -140,7 +143,8 @@ func TestFileManifestUpdateEmpty(t *testing.T) {
|
||||
assert.True(upstream.root.IsEmpty())
|
||||
assert.Empty(upstream.specs)
|
||||
|
||||
fm2 := fileManifest{dir: fm.dir, mode: asyncFlush} // Open existent, but empty manifest
|
||||
fm2, err := getFileManifest(context.Background(), fm.dir, asyncFlush) // Open existent, but empty manifest
|
||||
require.NoError(t, err)
|
||||
exists, upstream, err := fm2.ParseIfExists(context.Background(), stats, nil)
|
||||
require.NoError(t, err)
|
||||
assert.True(exists)
|
||||
|
||||
@@ -19,10 +19,13 @@ import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sort"
|
||||
"time"
|
||||
|
||||
"github.com/dolthub/fslock"
|
||||
|
||||
"github.com/dolthub/dolt/go/store/chunks"
|
||||
"github.com/dolthub/dolt/go/store/hash"
|
||||
)
|
||||
@@ -400,6 +403,101 @@ func (c journalConjoiner) chooseConjoinees(upstream []tableSpec) (conjoinees, ke
|
||||
return
|
||||
}
|
||||
|
||||
// newJournalManifest makes a new file manifest.
|
||||
func newJournalManifest(ctx context.Context, dir string) (m manifest, err error) {
|
||||
var readOnly bool
|
||||
// try to take the file lock. if we fail, make the manifest read-only.
|
||||
// if we succeed, hold the file lock until the process terminates.
|
||||
err = fslock.New(filepath.Join(dir, lockFileName)).LockWithTimeout(lockFileTimeout)
|
||||
if errors.Is(err, fslock.ErrTimeout) {
|
||||
readOnly, err = true, nil
|
||||
} else if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
m = journalManifest{dir: dir, readOnly: readOnly}
|
||||
|
||||
var f *os.File
|
||||
f, err = openIfExists(filepath.Join(dir, manifestFileName))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
} else if f == nil {
|
||||
return m, nil
|
||||
}
|
||||
defer func() {
|
||||
// keep first error
|
||||
if cerr := f.Close(); err == nil {
|
||||
err = cerr
|
||||
}
|
||||
}()
|
||||
|
||||
var ok bool
|
||||
ok, _, err = m.ParseIfExists(ctx, &Stats{}, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
} else if !ok {
|
||||
return nil, ErrUnreadableManifest
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
type journalManifest struct {
|
||||
dir string
|
||||
readOnly bool
|
||||
}
|
||||
|
||||
// Name implements manifest.
|
||||
func (jm journalManifest) Name() string {
|
||||
return jm.dir
|
||||
}
|
||||
|
||||
// ParseIfExists implements manifest.
|
||||
func (jm journalManifest) ParseIfExists(
|
||||
ctx context.Context,
|
||||
stats *Stats,
|
||||
readHook func() error,
|
||||
) (exists bool, contents manifestContents, err error) {
|
||||
t1 := time.Now()
|
||||
defer func() { stats.ReadManifestLatency.SampleTimeSince(t1) }()
|
||||
return parseIfExists(ctx, jm.dir, readHook)
|
||||
}
|
||||
|
||||
// Update implements manifest.
|
||||
func (jm journalManifest) Update(ctx context.Context, lastLock addr, newContents manifestContents, stats *Stats, writeHook func() error) (mc manifestContents, err error) {
|
||||
if jm.readOnly {
|
||||
return manifestContents{}, errors.New("cannot update manifest: database is read only")
|
||||
}
|
||||
|
||||
t1 := time.Now()
|
||||
defer func() { stats.WriteManifestLatency.SampleTimeSince(t1) }()
|
||||
checker := func(upstream, contents manifestContents) error {
|
||||
if contents.gcGen != upstream.gcGen {
|
||||
return chunks.ErrGCGenerationExpired
|
||||
}
|
||||
return nil
|
||||
}
|
||||
return updateWithChecker(ctx, jm.dir, syncFlush, checker, lastLock, newContents, writeHook)
|
||||
}
|
||||
|
||||
// UpdateGCGen implements manifest.
|
||||
func (jm journalManifest) UpdateGCGen(ctx context.Context, lastLock addr, newContents manifestContents, stats *Stats, writeHook func() error) (mc manifestContents, err error) {
|
||||
if jm.readOnly {
|
||||
return manifestContents{}, errors.New("cannot update manifest: database is read only")
|
||||
}
|
||||
|
||||
t1 := time.Now()
|
||||
defer func() { stats.WriteManifestLatency.SampleTimeSince(t1) }()
|
||||
checker := func(upstream, contents manifestContents) error {
|
||||
if contents.gcGen == upstream.gcGen {
|
||||
return errors.New("UpdateGCGen() must update the garbage collection generation")
|
||||
} else if contents.root != upstream.root {
|
||||
return errors.New("UpdateGCGen() cannot update the root")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
return updateWithChecker(ctx, jm.dir, syncFlush, checker, lastLock, newContents, writeHook)
|
||||
}
|
||||
|
||||
func containsJournalSpec(specs []tableSpec) (ok bool) {
|
||||
for _, spec := range specs {
|
||||
if spec.name == journalAddr {
|
||||
|
||||
@@ -35,7 +35,7 @@ func makeTestChunkJournal(t *testing.T) *chunkJournal {
|
||||
dir, err := os.MkdirTemp("", "")
|
||||
require.NoError(t, err)
|
||||
t.Cleanup(func() { file.RemoveAll(dir) })
|
||||
m, err := getFileManifest(ctx, dir, syncFlush)
|
||||
m, err := newJournalManifest(ctx, dir)
|
||||
require.NoError(t, err)
|
||||
q := NewUnlimitedMemQuotaProvider()
|
||||
p := newFSTablePersister(dir, q)
|
||||
|
||||
@@ -511,7 +511,7 @@ func NewLocalJournalingStore(ctx context.Context, nbfVers, dir string, q MemoryQ
|
||||
return nil, err
|
||||
}
|
||||
|
||||
m, err := getFileManifest(ctx, dir, syncFlush)
|
||||
m, err := newJournalManifest(ctx, dir)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@@ -47,7 +47,9 @@ func makeTestLocalStore(t *testing.T, maxTableFiles int) (st *NomsBlockStore, no
|
||||
require.NoError(t, err)
|
||||
|
||||
// create a v5 manifest
|
||||
_, err = fileManifest{dir: nomsDir, mode: asyncFlush}.Update(ctx, addr{}, manifestContents{}, &Stats{}, nil)
|
||||
fm, err := getFileManifest(ctx, nomsDir, asyncFlush)
|
||||
require.NoError(t, err)
|
||||
_, err = fm.Update(ctx, addr{}, manifestContents{}, &Stats{}, nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
q = NewUnlimitedMemQuotaProvider()
|
||||
|
||||
Reference in New Issue
Block a user