mirror of
https://github.com/dolthub/dolt.git
synced 2026-04-19 19:21:44 -05:00
/go/{cmd,libraries}: close dbs in no cache mode
This commit is contained in:
96
go/cmd/dolt/commands/engine/lock_release_test.go
Normal file
96
go/cmd/dolt/commands/engine/lock_release_test.go
Normal file
@@ -0,0 +1,96 @@
|
||||
// Copyright 2026 Dolthub, Inc.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package engine
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"runtime"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/dolthub/fslock"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/dolthub/dolt/go/libraries/doltcore/dbfactory"
|
||||
"github.com/dolthub/dolt/go/libraries/doltcore/doltdb"
|
||||
"github.com/dolthub/dolt/go/libraries/doltcore/env"
|
||||
"github.com/dolthub/dolt/go/libraries/utils/config"
|
||||
"github.com/dolthub/dolt/go/libraries/utils/filesys"
|
||||
)
|
||||
|
||||
// TestCreateDatabase_ReleasesLockOnEngineClose asserts that when embedded callers opt into
|
||||
// disable_singleton_cache, closing the SQL engine releases the underlying filesystem lock
|
||||
// for a newly created database so subsequent opens can proceed.
|
||||
func TestCreateDatabase_ReleasesLockOnEngineClose(t *testing.T) {
|
||||
if runtime.GOOS == "windows" {
|
||||
t.Skip("skipping on windows due to differing file locking semantics")
|
||||
}
|
||||
|
||||
ctx := context.Background()
|
||||
root := t.TempDir()
|
||||
|
||||
fs, err := filesys.LocalFS.WithWorkingDir(root)
|
||||
require.NoError(t, err)
|
||||
|
||||
cfg := config.NewMapConfig(map[string]string{
|
||||
config.UserNameKey: "test",
|
||||
config.UserEmailKey: "test@example.com",
|
||||
})
|
||||
|
||||
dbLoadParams := map[string]interface{}{
|
||||
dbfactory.DisableSingletonCacheParam: struct{}{},
|
||||
dbfactory.FailOnJournalLockTimeoutParam: struct{}{},
|
||||
}
|
||||
|
||||
rootEnv := env.LoadWithoutDB(ctx, env.GetCurrentUserHomeDir, fs, doltdb.LocalDirDoltDB, "test")
|
||||
rootEnv.DBLoadParams = map[string]interface{}{
|
||||
dbfactory.DisableSingletonCacheParam: struct{}{},
|
||||
dbfactory.FailOnJournalLockTimeoutParam: struct{}{},
|
||||
}
|
||||
mrEnv, err := env.MultiEnvForDirectory(ctx, cfg, fs, "test", rootEnv)
|
||||
require.NoError(t, err)
|
||||
|
||||
seCfg := &SqlEngineConfig{
|
||||
ServerUser: "root",
|
||||
ServerHost: "localhost",
|
||||
Autocommit: true,
|
||||
DBLoadParams: dbLoadParams,
|
||||
}
|
||||
|
||||
se, err := NewSqlEngine(ctx, mrEnv, seCfg)
|
||||
require.NoError(t, err)
|
||||
|
||||
sqlCtx, err := se.NewLocalContext(ctx)
|
||||
require.NoError(t, err)
|
||||
|
||||
_, _, _, err = se.Query(sqlCtx, "CREATE DATABASE IF NOT EXISTS testdb")
|
||||
require.NoError(t, err)
|
||||
|
||||
err = se.Close()
|
||||
require.True(t, err == nil || errors.Is(err, context.Canceled), "unexpected close error: %v", err)
|
||||
|
||||
// If the DB is properly closed, we should be able to take the lock quickly.
|
||||
lockPath := filepath.Join(root, "testdb", ".dolt", "noms", "LOCK")
|
||||
_, err = os.Stat(lockPath)
|
||||
require.NoError(t, err, "expected lock file to exist at %s", lockPath)
|
||||
|
||||
lck := fslock.New(lockPath)
|
||||
err = lck.LockWithTimeout(25 * time.Millisecond)
|
||||
require.NoError(t, err, "expected lock to be free after engine close (path=%s)", lockPath)
|
||||
require.NoError(t, lck.Unlock())
|
||||
}
|
||||
@@ -186,6 +186,9 @@ func NewSqlEngine(
|
||||
return nil, err
|
||||
}
|
||||
pro = pro.WithRemoteDialer(mrEnv.RemoteDialProvider())
|
||||
if config != nil && len(config.DBLoadParams) > 0 {
|
||||
pro.SetDBLoadParams(config.DBLoadParams)
|
||||
}
|
||||
|
||||
config.ClusterController.RegisterStoredProcedures(pro)
|
||||
if config.ClusterController != nil {
|
||||
|
||||
18
go/libraries/doltcore/env/environment.go
vendored
18
go/libraries/doltcore/env/environment.go
vendored
@@ -127,6 +127,9 @@ func (dEnv *DoltEnv) DoltDB(ctx context.Context) *doltdb.DoltDB {
|
||||
|
||||
func (dEnv *DoltEnv) LoadDoltDBWithParams(ctx context.Context, nbf *types.NomsBinFormat, urlStr string, fs filesys.Filesys, params map[string]interface{}) error {
|
||||
if dEnv.doltDB == nil {
|
||||
if nbf == nil {
|
||||
nbf = types.Format_Default
|
||||
}
|
||||
// Merge any environment-level DB load params without mutating the caller's map.
|
||||
if len(dEnv.DBLoadParams) > 0 {
|
||||
if params == nil {
|
||||
@@ -136,11 +139,14 @@ func (dEnv *DoltEnv) LoadDoltDBWithParams(ctx context.Context, nbf *types.NomsBi
|
||||
maps.Copy(params, dEnv.DBLoadParams)
|
||||
}
|
||||
}
|
||||
ddb, err := doltdb.LoadDoltDBWithParams(ctx, types.Format_Default, urlStr, fs, params)
|
||||
ddb, err := doltdb.LoadDoltDBWithParams(ctx, nbf, urlStr, fs, params)
|
||||
if err != nil {
|
||||
dEnv.DBLoadError = err
|
||||
return err
|
||||
}
|
||||
dEnv.doltDB = ddb
|
||||
dEnv.urlStr = urlStr
|
||||
dEnv.DBLoadError = nil
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@@ -506,9 +512,7 @@ func (dEnv *DoltEnv) InitRepoWithNoData(ctx context.Context, nbf *types.NomsBinF
|
||||
return err
|
||||
}
|
||||
|
||||
dEnv.doltDB, err = doltdb.LoadDoltDB(ctx, nbf, dEnv.urlStr, dEnv.FS)
|
||||
|
||||
return err
|
||||
return dEnv.LoadDoltDBWithParams(ctx, nbf, dEnv.urlStr, dEnv.FS, nil)
|
||||
}
|
||||
|
||||
var ErrCannotCreateDirDoesNotExist = errors.New("dir does not exist")
|
||||
@@ -640,13 +644,11 @@ func (dEnv *DoltEnv) InitDBWithTime(ctx context.Context, nbf *types.NomsBinForma
|
||||
}
|
||||
|
||||
func (dEnv *DoltEnv) InitDBWithCommitMetaGenerator(ctx context.Context, nbf *types.NomsBinFormat, branchName string, commitMeta datas.CommitMetaGenerator) error {
|
||||
var err error
|
||||
dEnv.doltDB, err = doltdb.LoadDoltDB(ctx, nbf, dEnv.urlStr, dEnv.FS)
|
||||
if err != nil {
|
||||
if err := dEnv.LoadDoltDBWithParams(ctx, nbf, dEnv.urlStr, dEnv.FS, nil); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = dEnv.DoltDB(ctx).WriteEmptyRepoWithCommitMetaGenerator(ctx, branchName, commitMeta)
|
||||
err := dEnv.DoltDB(ctx).WriteEmptyRepoWithCommitMetaGenerator(ctx, branchName, commitMeta)
|
||||
if err != nil {
|
||||
return fmt.Errorf("%w: %v", doltdb.ErrNomsIO, err)
|
||||
}
|
||||
|
||||
@@ -18,6 +18,7 @@ import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"maps"
|
||||
"path/filepath"
|
||||
"sort"
|
||||
"strings"
|
||||
@@ -52,6 +53,13 @@ type DoltDatabaseProvider struct {
|
||||
fs filesys.Filesys
|
||||
remoteDialer dbfactory.GRPCDialProvider // TODO: why isn't this a method defined on the remote object
|
||||
|
||||
// dbLoadParams are optional parameters to apply when loading local file-backed databases created
|
||||
// via provider-managed code paths (e.g. CREATE DATABASE / dolt_clone / undrop). These should match
|
||||
// the params threaded into env.DoltEnv.DBLoadParams by higher layers (e.g. engine.SqlEngineConfig.DBLoadParams).
|
||||
//
|
||||
// Note: these params are only effective if they are set before the underlying DoltDB is loaded.
|
||||
dbLoadParams map[string]interface{}
|
||||
|
||||
// dbLocations maps a database name to its file system root
|
||||
dbLocations map[string]filesys.Filesys
|
||||
databases map[string]dsess.SqlDatabase
|
||||
@@ -195,6 +203,31 @@ func (p *DoltDatabaseProvider) WithRemoteDialer(provider dbfactory.GRPCDialProvi
|
||||
return &cp
|
||||
}
|
||||
|
||||
// SetDBLoadParams sets optional DB load params for newly created / registered databases. The provided map is cloned.
|
||||
func (p *DoltDatabaseProvider) SetDBLoadParams(params map[string]interface{}) {
|
||||
p.mu.Lock()
|
||||
defer p.mu.Unlock()
|
||||
if len(params) == 0 {
|
||||
p.dbLoadParams = nil
|
||||
return
|
||||
}
|
||||
p.dbLoadParams = maps.Clone(params)
|
||||
}
|
||||
|
||||
func (p *DoltDatabaseProvider) applyDBLoadParamsToEnv(dEnv *env.DoltEnv) {
|
||||
if dEnv == nil {
|
||||
return
|
||||
}
|
||||
if len(p.dbLoadParams) == 0 {
|
||||
return
|
||||
}
|
||||
if dEnv.DBLoadParams == nil {
|
||||
dEnv.DBLoadParams = maps.Clone(p.dbLoadParams)
|
||||
return
|
||||
}
|
||||
maps.Copy(dEnv.DBLoadParams, p.dbLoadParams)
|
||||
}
|
||||
|
||||
// AddInitDatabaseHook adds an InitDatabaseHook to this provider. The hook will be invoked
|
||||
// whenever this provider creates a new database.
|
||||
func (p *DoltDatabaseProvider) AddInitDatabaseHook(hook InitDatabaseHook) {
|
||||
@@ -212,9 +245,43 @@ func (p *DoltDatabaseProvider) FileSystem() filesys.Filesys {
|
||||
}
|
||||
|
||||
func (p *DoltDatabaseProvider) Close() {
|
||||
p.mu.RLock()
|
||||
closeDoltDBs := p.dbLoadParams != nil
|
||||
if closeDoltDBs {
|
||||
_, closeDoltDBs = p.dbLoadParams[dbfactory.DisableSingletonCacheParam]
|
||||
}
|
||||
|
||||
// Copy the databases so we can close outside the lock.
|
||||
dbs := make([]dsess.SqlDatabase, 0, len(p.databases))
|
||||
for _, db := range p.databases {
|
||||
dbs = append(dbs, db)
|
||||
}
|
||||
p.mu.RUnlock()
|
||||
|
||||
for _, db := range dbs {
|
||||
db.Close()
|
||||
}
|
||||
|
||||
// In embedded / nocache mode, the underlying DoltDBs are not tracked by the singleton cache, so
|
||||
// the provider is responsible for closing them to release filesystem locks.
|
||||
if closeDoltDBs {
|
||||
seen := make(map[*doltdb.DoltDB]struct{})
|
||||
for _, db := range dbs {
|
||||
if db == nil {
|
||||
continue
|
||||
}
|
||||
for _, ddb := range db.DoltDatabases() {
|
||||
if ddb == nil {
|
||||
continue
|
||||
}
|
||||
if _, ok := seen[ddb]; ok {
|
||||
continue
|
||||
}
|
||||
seen[ddb] = struct{}{}
|
||||
_ = ddb.Close()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Installs an InitDatabaseHook which configures new databases--those
|
||||
@@ -512,7 +579,9 @@ func (p *DoltDatabaseProvider) CreateCollatedDatabase(ctx *sql.Context, name str
|
||||
}
|
||||
|
||||
// TODO: fill in version appropriately
|
||||
newEnv := env.Load(ctx, env.GetCurrentUserHomeDir, newFs, p.dbFactoryUrl, "TODO")
|
||||
// Use LoadWithoutDB so we can apply db-load params before any DB is opened.
|
||||
newEnv := env.LoadWithoutDB(ctx, env.GetCurrentUserHomeDir, newFs, p.dbFactoryUrl, "TODO")
|
||||
p.applyDBLoadParamsToEnv(newEnv)
|
||||
|
||||
newDbStorageFormat := types.Format_Default
|
||||
err = newEnv.InitRepo(ctx, newDbStorageFormat, sess.Username(), sess.Email(), p.defaultBranch)
|
||||
@@ -753,6 +822,7 @@ func (p *DoltDatabaseProvider) cloneDatabaseFromRemote(
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
p.applyDBLoadParamsToEnv(dEnv)
|
||||
|
||||
err = actions.CloneRemote(ctx, srcDB, remoteName, branch, false, depth, dEnv)
|
||||
if err != nil {
|
||||
@@ -860,7 +930,9 @@ func (p *DoltDatabaseProvider) UndropDatabase(ctx *sql.Context, name string) (er
|
||||
return err
|
||||
}
|
||||
|
||||
newEnv := env.Load(ctx, env.GetCurrentUserHomeDir, newFs, p.dbFactoryUrl, "TODO")
|
||||
// Use LoadWithoutDB so we can apply db-load params before any DB is opened.
|
||||
newEnv := env.LoadWithoutDB(ctx, env.GetCurrentUserHomeDir, newFs, p.dbFactoryUrl, "TODO")
|
||||
p.applyDBLoadParamsToEnv(newEnv)
|
||||
return p.registerNewDatabase(ctx, exactCaseName, newEnv)
|
||||
}
|
||||
|
||||
@@ -882,6 +954,9 @@ func (p *DoltDatabaseProvider) registerNewDatabase(ctx *sql.Context, name string
|
||||
return fmt.Errorf("unable to register new database without database provider mutex being locked")
|
||||
}
|
||||
|
||||
// Ensure any provider-supplied DB load params are applied before any lazy DB load occurs.
|
||||
p.applyDBLoadParamsToEnv(newEnv)
|
||||
|
||||
fkChecks, err := ctx.GetSessionVariable(ctx, "foreign_key_checks")
|
||||
if err != nil {
|
||||
return err
|
||||
|
||||
@@ -20,6 +20,7 @@ import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"log"
|
||||
"maps"
|
||||
"path"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
@@ -588,7 +589,11 @@ func (sc *StatsController) initStorage(ctx context.Context, fs filesys.Filesys)
|
||||
return nil, fmt.Errorf("unable to make directory '%s', cause: %s", dbfactory.DoltStatsDir, err.Error())
|
||||
}
|
||||
|
||||
dEnv = env.Load(ctx, sc.hdpEnv.GetUserHomeDir, statsFs, urlPath, "test")
|
||||
// Use LoadWithoutDB so DB load params can be applied before any DB is opened.
|
||||
dEnv = env.LoadWithoutDB(ctx, sc.hdpEnv.GetUserHomeDir, statsFs, urlPath, doltversion.Version)
|
||||
if sc.hdpEnv != nil && len(sc.hdpEnv.DBLoadParams) > 0 {
|
||||
dEnv.DBLoadParams = maps.Clone(sc.hdpEnv.DBLoadParams)
|
||||
}
|
||||
err = dEnv.InitRepo(ctx, types.Format_Default, "stats", "stats@stats.com", env.DefaultInitBranch)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -597,6 +602,9 @@ func (sc *StatsController) initStorage(ctx context.Context, fs filesys.Filesys)
|
||||
return nil, fmt.Errorf("file exists where the dolt stats directory should be")
|
||||
} else {
|
||||
dEnv = env.LoadWithoutDB(ctx, sc.hdpEnv.GetUserHomeDir, statsFs, "", doltversion.Version)
|
||||
if sc.hdpEnv != nil && len(sc.hdpEnv.DBLoadParams) > 0 {
|
||||
dEnv.DBLoadParams = maps.Clone(sc.hdpEnv.DBLoadParams)
|
||||
}
|
||||
}
|
||||
|
||||
if err := dEnv.LoadDoltDBWithParams(ctx, types.Format_Default, urlPath, statsFs, params); err != nil {
|
||||
|
||||
@@ -242,12 +242,17 @@ func (sc *StatsController) Gc(ctx *sql.Context) error {
|
||||
}
|
||||
|
||||
func (sc *StatsController) Close() {
|
||||
var (
|
||||
doneCh chan struct{}
|
||||
kv StatsKv
|
||||
)
|
||||
|
||||
sc.mu.Lock()
|
||||
defer sc.mu.Unlock()
|
||||
|
||||
// Already closed.
|
||||
select {
|
||||
case <-sc.closed:
|
||||
sc.mu.Unlock()
|
||||
return
|
||||
default:
|
||||
}
|
||||
@@ -260,5 +265,26 @@ func (sc *StatsController) Close() {
|
||||
sc.signalListener(leStop)
|
||||
|
||||
close(sc.closed)
|
||||
doneCh = sc.workerDoneCh
|
||||
kv = sc.kv
|
||||
sc.mu.Unlock()
|
||||
|
||||
// Best-effort wait for worker exit to avoid racing a close of underlying storage.
|
||||
if doneCh != nil {
|
||||
select {
|
||||
case <-doneCh:
|
||||
case <-time.After(1 * time.Second):
|
||||
}
|
||||
}
|
||||
|
||||
// If we're using a prolly-backed stats store, it owns a DoltDB with its own filesystem locks.
|
||||
// Close it best-effort on shutdown so embedded callers can reopen without contention.
|
||||
if ps, ok := kv.(*prollyStats); ok && ps != nil && ps.destDb != nil {
|
||||
for _, ddb := range ps.destDb.DoltDatabases() {
|
||||
if ddb != nil {
|
||||
_ = ddb.Close()
|
||||
}
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user