mirror of
https://github.com/dolthub/dolt.git
synced 2026-01-09 00:39:54 -06:00
go: sqle: dolt_gc: Add DOLT_GC_SAFEPOINT_CONTROLLER_CHOICE env variable to control dolt_gc safepoint behavior.
This is a short-term setting which will allow choosing the session-aware gc safepoint behavior, instead of the legacy behavior which kills all in-flight connections when performing a GC.
This commit is contained in:
@@ -45,4 +45,8 @@ const (
|
||||
EnvDbNameReplace = "DOLT_DBNAME_REPLACE"
|
||||
EnvDoltRootHost = "DOLT_ROOT_HOST"
|
||||
EnvDoltRootPassword = "DOLT_ROOT_PASSWORD"
|
||||
|
||||
// If set, must be "kill_connections" or "session_aware"
|
||||
// Will go away after session_aware is made default-and-only.
|
||||
EnvGCSafepointControllerChoice = "DOLT_GC_SAFEPOINT_CONTROLLER_CHOICE"
|
||||
)
|
||||
|
||||
@@ -37,10 +37,19 @@ const (
|
||||
cmdSuccess = 0
|
||||
)
|
||||
|
||||
var useSessionAwareSafepointController bool
|
||||
|
||||
func init() {
|
||||
if os.Getenv(dconfig.EnvDisableGcProcedure) != "" {
|
||||
DoltGCFeatureFlag = false
|
||||
}
|
||||
if choice := os.Getenv(dconfig.EnvGCSafepointControllerChoice); choice != "" {
|
||||
if choice == "session_aware" {
|
||||
useSessionAwareSafepointController = true
|
||||
} else if choice != "kill_connections" {
|
||||
panic("Invalid value for " + dconfig.EnvGCSafepointControllerChoice + ". must be session_aware or kill_connections")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
var DoltGCFeatureFlag = true
|
||||
@@ -162,8 +171,8 @@ type sessionAwareSafepointController struct {
|
||||
callCtx *sql.Context
|
||||
origEpoch int
|
||||
|
||||
waiter *dsess.GCSafepointWaiter
|
||||
keeper func(hash.Hash) bool
|
||||
waiter *dsess.GCSafepointWaiter
|
||||
keeper func(hash.Hash) bool
|
||||
}
|
||||
|
||||
func (sc *sessionAwareSafepointController) visit(ctx context.Context, sess *dsess.DoltSession) error {
|
||||
@@ -261,14 +270,17 @@ func (impl *DoltGCProcedure) doGC(ctx *sql.Context, args []string) (int, error)
|
||||
}
|
||||
|
||||
var sc types.GCSafepointController
|
||||
sc = killConnectionsSafepointController{
|
||||
origEpoch: origepoch,
|
||||
callCtx: ctx,
|
||||
}
|
||||
sc = &sessionAwareSafepointController{
|
||||
origEpoch: origepoch,
|
||||
callCtx: ctx,
|
||||
controller: impl.gcSafepointController,
|
||||
if useSessionAwareSafepointController {
|
||||
sc = &sessionAwareSafepointController{
|
||||
origEpoch: origepoch,
|
||||
callCtx: ctx,
|
||||
controller: impl.gcSafepointController,
|
||||
}
|
||||
} else {
|
||||
sc = killConnectionsSafepointController{
|
||||
origEpoch: origepoch,
|
||||
callCtx: ctx,
|
||||
}
|
||||
}
|
||||
err = ddb.GC(ctx, mode, sc)
|
||||
if err != nil {
|
||||
|
||||
@@ -144,6 +144,7 @@ func (w *GCSafepointWaiter) Wait(ctx context.Context) error {
|
||||
}
|
||||
|
||||
var closedCh = make(chan struct{})
|
||||
|
||||
func init() {
|
||||
close(closedCh)
|
||||
}
|
||||
|
||||
@@ -31,22 +31,43 @@ import (
|
||||
)
|
||||
|
||||
func TestConcurrentGC(t *testing.T) {
|
||||
var gct gcTest
|
||||
gct.numThreads = 8
|
||||
gct.duration = 10 * time.Second
|
||||
var base = gcTest {
|
||||
numThreads: 8,
|
||||
duration: 10 * time.Second,
|
||||
}
|
||||
t.Run("NoCommits", func(t *testing.T) {
|
||||
gct.run(t)
|
||||
var base = base
|
||||
base.commit = false
|
||||
t.Run("KillConnections", func(t *testing.T) {
|
||||
var gct = base
|
||||
gct.run(t)
|
||||
})
|
||||
t.Run("SessionAware", func(t *testing.T) {
|
||||
var gct = base
|
||||
gct.sessionAware = true
|
||||
gct.run(t)
|
||||
})
|
||||
})
|
||||
gct.commit = true
|
||||
t.Run("WithCommits", func(t *testing.T) {
|
||||
gct.run(t)
|
||||
var base = base
|
||||
base.commit = true
|
||||
t.Run("KillConnections", func(t *testing.T) {
|
||||
var gct = base
|
||||
gct.run(t)
|
||||
})
|
||||
t.Run("SessionAware", func(t *testing.T) {
|
||||
var gct = base
|
||||
gct.sessionAware = true
|
||||
gct.run(t)
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
type gcTest struct {
|
||||
numThreads int
|
||||
duration time.Duration
|
||||
commit bool
|
||||
numThreads int
|
||||
duration time.Duration
|
||||
commit bool
|
||||
sessionAware bool
|
||||
}
|
||||
|
||||
func (gct gcTest) createDB(t *testing.T, ctx context.Context, db *sql.DB) {
|
||||
@@ -69,13 +90,19 @@ func (gct gcTest) createDB(t *testing.T, ctx context.Context, db *sql.DB) {
|
||||
|
||||
func (gct gcTest) doUpdate(t *testing.T, ctx context.Context, db *sql.DB, i int) error {
|
||||
conn, err := db.Conn(ctx)
|
||||
if err != nil {
|
||||
if gct.sessionAware {
|
||||
if !assert.NoError(t, err) {
|
||||
return nil
|
||||
}
|
||||
} else if err != nil {
|
||||
t.Logf("err in Conn: %v", err)
|
||||
return nil
|
||||
}
|
||||
defer conn.Close()
|
||||
_, err = conn.ExecContext(ctx, "update vals set val = val+1 where id = ?", i)
|
||||
if err != nil {
|
||||
if gct.sessionAware {
|
||||
assert.NoError(t, err)
|
||||
} else if err != nil {
|
||||
if !assert.NotContains(t, err.Error(), "dangling ref") {
|
||||
return err
|
||||
}
|
||||
@@ -89,7 +116,9 @@ func (gct gcTest) doUpdate(t *testing.T, ctx context.Context, db *sql.DB, i int)
|
||||
}
|
||||
if gct.commit {
|
||||
_, err = conn.ExecContext(ctx, fmt.Sprintf("call dolt_commit('-am', 'increment vals id = %d')", i))
|
||||
if err != nil {
|
||||
if gct.sessionAware {
|
||||
assert.NoError(t, err)
|
||||
} else if err != nil {
|
||||
if !assert.NotContains(t, err.Error(), "dangling ref") {
|
||||
return err
|
||||
}
|
||||
@@ -107,30 +136,27 @@ func (gct gcTest) doUpdate(t *testing.T, ctx context.Context, db *sql.DB, i int)
|
||||
|
||||
func (gct gcTest) doGC(t *testing.T, ctx context.Context, db *sql.DB) error {
|
||||
conn, err := db.Conn(ctx)
|
||||
if err != nil {
|
||||
if gct.sessionAware {
|
||||
if !assert.NoError(t, err) {
|
||||
return nil
|
||||
}
|
||||
} else if err != nil {
|
||||
t.Logf("err in Conn for dolt_gc: %v", err)
|
||||
return nil
|
||||
}
|
||||
defer func() {
|
||||
// After calling dolt_gc, the connection is bad. Remove it from the connection pool.
|
||||
conn.Raw(func(_ any) error {
|
||||
return sqldriver.ErrBadConn
|
||||
})
|
||||
}()
|
||||
if !gct.sessionAware {
|
||||
defer func() {
|
||||
// After calling dolt_gc, the connection is bad. Remove it from the connection pool.
|
||||
conn.Raw(func(_ any) error {
|
||||
return sqldriver.ErrBadConn
|
||||
})
|
||||
}()
|
||||
} else {
|
||||
defer conn.Close()
|
||||
}
|
||||
b := time.Now()
|
||||
_, err = conn.ExecContext(ctx, "call dolt_gc()")
|
||||
if err != nil {
|
||||
if !assert.NotContains(t, err.Error(), "dangling ref") {
|
||||
return err
|
||||
}
|
||||
if !assert.NotContains(t, err.Error(), "is unexpected noms value") {
|
||||
return err
|
||||
}
|
||||
if !assert.NotContains(t, err.Error(), "interface conversion: types.Value is nil") {
|
||||
return err
|
||||
}
|
||||
t.Logf("err in Exec dolt_gc: %v", err)
|
||||
} else {
|
||||
if assert.NoError(t, err) {
|
||||
t.Logf("successful dolt_gc took %v", time.Since(b))
|
||||
}
|
||||
return nil
|
||||
@@ -184,7 +210,14 @@ func (gct gcTest) run(t *testing.T) {
|
||||
repo, err := rs.MakeRepo("concurrent_gc_test")
|
||||
require.NoError(t, err)
|
||||
|
||||
server := MakeServer(t, repo, &driver.Server{})
|
||||
srvSettings := &driver.Server{}
|
||||
if gct.sessionAware {
|
||||
srvSettings.Envs = append(srvSettings.Envs, "DOLT_GC_SAFEPOINT_CONTROLLER_CHOICE=session_aware")
|
||||
} else {
|
||||
srvSettings.Envs = append(srvSettings.Envs, "DOLT_GC_SAFEPOINT_CONTROLLER_CHOICE=kill_connections")
|
||||
}
|
||||
|
||||
server := MakeServer(t, repo, srvSettings)
|
||||
server.DBName = "concurrent_gc_test"
|
||||
|
||||
db, err := server.DB(driver.Connection{User: "root"})
|
||||
|
||||
Reference in New Issue
Block a user