Merge remote-tracking branch 'origin/main' into aaron/dsess-lifecycle

This commit is contained in:
Aaron Son
2025-02-03 10:01:27 -08:00
19 changed files with 1319 additions and 1066 deletions

View File

@@ -47,6 +47,7 @@ import (
"github.com/dolthub/dolt/go/libraries/doltcore/sqle/statspro"
"github.com/dolthub/dolt/go/libraries/doltcore/sqle/writer"
"github.com/dolthub/dolt/go/libraries/utils/config"
"github.com/dolthub/dolt/go/libraries/utils/filesys"
)
// SqlEngine packages up the context necessary to run sql queries against dsqle.
@@ -55,6 +56,7 @@ type SqlEngine struct {
contextFactory contextFactory
dsessFactory sessionFactory
engine *gms.Engine
fs filesys.Filesys
}
type sessionFactory func(mysqlSess *sql.BaseSession, pro sql.DatabaseProvider) (*dsess.DoltSession, error)
@@ -196,6 +198,7 @@ func NewSqlEngine(
sqlEngine.contextFactory = sqlContextFactory()
sqlEngine.dsessFactory = sessFactory
sqlEngine.engine = engine
sqlEngine.fs = pro.FileSystem()
// configuring stats depends on sessionBuilder
// sessionBuilder needs ref to statsProv
@@ -316,6 +319,10 @@ func (se *SqlEngine) GetUnderlyingEngine() *gms.Engine {
return se.engine
}
func (se *SqlEngine) FileSystem() filesys.Filesys {
return se.fs
}
func (se *SqlEngine) Close() error {
if se.engine != nil {
if se.engine.Analyzer.Catalog.BinlogReplicaController != nil {

View File

@@ -559,21 +559,27 @@ func ConfigureServices(
}
listenaddr := fmt.Sprintf(":%d", port)
sqlContextInterceptor := sqle.SqlContextServerInterceptor{
Factory: sqlEngine.NewDefaultContext,
}
args := remotesrv.ServerArgs{
Logger: logrus.NewEntry(lgr),
ReadOnly: apiReadOnly || serverConfig.ReadOnly(),
HttpListenAddr: listenaddr,
GrpcListenAddr: listenaddr,
ConcurrencyControl: remotesapi.PushConcurrencyControl_PUSH_CONCURRENCY_CONTROL_ASSERT_WORKING_SET,
Options: sqlContextInterceptor.Options(),
HttpInterceptor: sqlContextInterceptor.HTTP(nil),
}
var err error
args.FS, args.DBCache, err = sqle.RemoteSrvFSAndDBCache(sqlEngine.NewDefaultContext, sqle.DoNotCreateUnknownDatabases)
args.FS = sqlEngine.FileSystem()
args.DBCache, err = sqle.RemoteSrvDBCache(sqle.GetInterceptorSqlContext, sqle.DoNotCreateUnknownDatabases)
if err != nil {
lgr.Errorf("error creating SQL engine context for remotesapi server: %v", err)
return err
}
authenticator := newAccessController(sqlEngine.NewDefaultContext, sqlEngine.GetUnderlyingEngine().Analyzer.Catalog.MySQLDb)
authenticator := newAccessController(sqle.GetInterceptorSqlContext, sqlEngine.GetUnderlyingEngine().Analyzer.Catalog.MySQLDb)
args = sqle.WithUserPasswordAuth(args, authenticator)
args.TLSConfig = serverConf.TLSConfig
@@ -621,6 +627,7 @@ func ConfigureServices(
lgr.Errorf("error creating SQL engine context for remotesapi server: %v", err)
return err
}
args.FS = sqlEngine.FileSystem()
clusterRemoteSrvTLSConfig, err := LoadClusterTLSConfig(serverConfig.ClusterConfig())
if err != nil {
@@ -634,7 +641,7 @@ func ConfigureServices(
lgr.Errorf("error creating remotesapi server on port %d: %v", *serverConfig.RemotesapiPort(), err)
return err
}
clusterController.RegisterGrpcServices(sqlEngine.NewDefaultContext, clusterRemoteSrv.srv.GrpcServer())
clusterController.RegisterGrpcServices(sqle.GetInterceptorSqlContext, clusterRemoteSrv.srv.GrpcServer())
clusterRemoteSrv.lis, err = clusterRemoteSrv.srv.Listeners()
if err != nil {

View File

@@ -56,7 +56,7 @@ require (
github.com/cespare/xxhash/v2 v2.2.0
github.com/creasty/defaults v1.6.0
github.com/dolthub/flatbuffers/v23 v23.3.3-dh.2
github.com/dolthub/go-mysql-server v0.19.1-0.20250128182847-3f5bb8c52cd8
github.com/dolthub/go-mysql-server v0.19.1-0.20250131110511-67aa2a430366
github.com/dolthub/gozstd v0.0.0-20240423170813-23a2903bca63
github.com/dolthub/swiss v0.1.0
github.com/esote/minmaxheap v1.0.0

View File

@@ -179,8 +179,8 @@ github.com/dolthub/fslock v0.0.3 h1:iLMpUIvJKMKm92+N1fmHVdxJP5NdyDK5bK7z7Ba2s2U=
github.com/dolthub/fslock v0.0.3/go.mod h1:QWql+P17oAAMLnL4HGB5tiovtDuAjdDTPbuqx7bYfa0=
github.com/dolthub/go-icu-regex v0.0.0-20241215010122-db690dd53c90 h1:Sni8jrP0sy/w9ZYXoff4g/ixe+7bFCZlfCqXKJSU+zM=
github.com/dolthub/go-icu-regex v0.0.0-20241215010122-db690dd53c90/go.mod h1:ylU4XjUpsMcvl/BKeRRMXSH7e7WBrPXdSLvnRJYrxEA=
github.com/dolthub/go-mysql-server v0.19.1-0.20250128182847-3f5bb8c52cd8 h1:eEGYHOC5Ft+56yPaH26gsdbonrZ2EiTwQLy8Oj3TAFE=
github.com/dolthub/go-mysql-server v0.19.1-0.20250128182847-3f5bb8c52cd8/go.mod h1:jYEJ8tNkA7K3k39X8iMqaX3MSMmViRgh222JSLHDgVc=
github.com/dolthub/go-mysql-server v0.19.1-0.20250131110511-67aa2a430366 h1:pJ+upgX6hrhyqgpkmk9Ye9lIPSualMHZcUMs8kWknV4=
github.com/dolthub/go-mysql-server v0.19.1-0.20250131110511-67aa2a430366/go.mod h1:jYEJ8tNkA7K3k39X8iMqaX3MSMmViRgh222JSLHDgVc=
github.com/dolthub/gozstd v0.0.0-20240423170813-23a2903bca63 h1:OAsXLAPL4du6tfbBgK0xXHZkOlos63RdKYS3Sgw/dfI=
github.com/dolthub/gozstd v0.0.0-20240423170813-23a2903bca63/go.mod h1:lV7lUeuDhH5thVGDCKXbatwKy2KW80L4rMT46n+Y2/Q=
github.com/dolthub/ishell v0.0.0-20240701202509-2b217167d718 h1:lT7hE5k+0nkBdj/1UOSFwjWpNxf+LCApbRHgnCA17XE=

View File

@@ -27,27 +27,27 @@ import (
// TestBinlogReplicationForAllTypes tests that operations (inserts, updates, and deletes) on all SQL
// data types can be successfully replicated.
func TestBinlogReplicationForAllTypes(t *testing.T) {
defer teardown(t)
startSqlServersWithDoltSystemVars(t, doltReplicaSystemVars)
startReplicationAndCreateTestDb(t, mySqlPort)
h := newHarness(t)
h.startSqlServersWithDoltSystemVars(doltReplicaSystemVars)
h.startReplicationAndCreateTestDb(h.mySqlPort)
// Set the session's timezone to UTC, to avoid TIMESTAMP test values changing
// when they are converted to UTC for storage.
primaryDatabase.MustExec("SET @@time_zone = '+0:00';")
h.primaryDatabase.MustExec("SET @@time_zone = '+0:00';")
// Create the test table
tableName := "alltypes"
createTableStatement := generateCreateTableStatement(tableName)
primaryDatabase.MustExec(createTableStatement)
h.primaryDatabase.MustExec(createTableStatement)
// Make inserts on the primary small, large, and null values
primaryDatabase.MustExec(generateInsertValuesStatement(tableName, 0))
primaryDatabase.MustExec(generateInsertValuesStatement(tableName, 1))
primaryDatabase.MustExec(generateInsertNullValuesStatement(tableName))
h.primaryDatabase.MustExec(generateInsertValuesStatement(tableName, 0))
h.primaryDatabase.MustExec(generateInsertValuesStatement(tableName, 1))
h.primaryDatabase.MustExec(generateInsertNullValuesStatement(tableName))
// Verify inserts on replica
waitForReplicaToCatchUp(t)
rows, err := replicaDatabase.Queryx("select * from db01.alltypes order by pk asc;")
h.waitForReplicaToCatchUp()
rows, err := h.replicaDatabase.Queryx("select * from db01.alltypes order by pk asc;")
require.NoError(t, err)
row := convertMapScanResultToStrings(readNextRow(t, rows))
require.Equal(t, "1", row["pk"])
@@ -62,14 +62,14 @@ func TestBinlogReplicationForAllTypes(t *testing.T) {
require.NoError(t, rows.Close())
// Make updates on the primary
primaryDatabase.MustExec(generateUpdateToNullValuesStatement(tableName, 1))
primaryDatabase.MustExec(generateUpdateValuesStatement(tableName, 2, 0))
primaryDatabase.MustExec(generateUpdateValuesStatement(tableName, 3, 1))
h.primaryDatabase.MustExec(generateUpdateToNullValuesStatement(tableName, 1))
h.primaryDatabase.MustExec(generateUpdateValuesStatement(tableName, 2, 0))
h.primaryDatabase.MustExec(generateUpdateValuesStatement(tableName, 3, 1))
// Verify updates on the replica
waitForReplicaToCatchUp(t)
replicaDatabase.MustExec("use db01;")
rows, err = replicaDatabase.Queryx("select * from db01.alltypes order by pk asc;")
h.waitForReplicaToCatchUp()
h.replicaDatabase.MustExec("use db01;")
rows, err = h.replicaDatabase.Queryx("select * from db01.alltypes order by pk asc;")
require.NoError(t, err)
row = convertMapScanResultToStrings(readNextRow(t, rows))
require.Equal(t, "1", row["pk"])
@@ -84,13 +84,13 @@ func TestBinlogReplicationForAllTypes(t *testing.T) {
require.NoError(t, rows.Close())
// Make deletes on the primary
primaryDatabase.MustExec("delete from alltypes where pk=1;")
primaryDatabase.MustExec("delete from alltypes where pk=2;")
primaryDatabase.MustExec("delete from alltypes where pk=3;")
h.primaryDatabase.MustExec("delete from alltypes where pk=1;")
h.primaryDatabase.MustExec("delete from alltypes where pk=2;")
h.primaryDatabase.MustExec("delete from alltypes where pk=3;")
// Verify deletes on the replica
waitForReplicaToCatchUp(t)
rows, err = replicaDatabase.Queryx("select * from db01.alltypes order by pk asc;")
h.waitForReplicaToCatchUp()
rows, err = h.replicaDatabase.Queryx("select * from db01.alltypes order by pk asc;")
require.NoError(t, err)
require.False(t, rows.Next())
require.NoError(t, rows.Close())

View File

@@ -24,37 +24,37 @@ import (
// TestBinlogReplicationFilters_ignoreTablesOnly tests that the ignoreTables replication
// filtering option is correctly applied and honored.
func TestBinlogReplicationFilters_ignoreTablesOnly(t *testing.T) {
defer teardown(t)
startSqlServersWithDoltSystemVars(t, doltReplicaSystemVars)
startReplicationAndCreateTestDb(t, mySqlPort)
h := newHarness(t)
h.startSqlServersWithDoltSystemVars(doltReplicaSystemVars)
h.startReplicationAndCreateTestDb(h.mySqlPort)
// Ignore replication events for db01.t2. Also tests that the first filter setting is overwritten by
// the second and that db and that db and table names are case-insensitive.
replicaDatabase.MustExec("CHANGE REPLICATION FILTER REPLICATE_IGNORE_TABLE=(db01.t1);")
replicaDatabase.MustExec("CHANGE REPLICATION FILTER REPLICATE_IGNORE_TABLE=(DB01.T2);")
h.replicaDatabase.MustExec("CHANGE REPLICATION FILTER REPLICATE_IGNORE_TABLE=(db01.t1);")
h.replicaDatabase.MustExec("CHANGE REPLICATION FILTER REPLICATE_IGNORE_TABLE=(DB01.T2);")
// Assert that status shows replication filters
status := showReplicaStatus(t)
status := h.showReplicaStatus()
require.Equal(t, "db01.t2", status["Replicate_Ignore_Table"])
require.Equal(t, "", status["Replicate_Do_Table"])
// Make changes on the primary
primaryDatabase.MustExec("CREATE TABLE db01.t1 (pk INT PRIMARY KEY);")
primaryDatabase.MustExec("CREATE TABLE db01.t2 (pk INT PRIMARY KEY);")
h.primaryDatabase.MustExec("CREATE TABLE db01.t1 (pk INT PRIMARY KEY);")
h.primaryDatabase.MustExec("CREATE TABLE db01.t2 (pk INT PRIMARY KEY);")
for i := 1; i < 12; i++ {
primaryDatabase.MustExec(fmt.Sprintf("INSERT INTO db01.t1 VALUES (%d);", i))
primaryDatabase.MustExec(fmt.Sprintf("INSERT INTO db01.t2 VALUES (%d);", i))
h.primaryDatabase.MustExec(fmt.Sprintf("INSERT INTO db01.t1 VALUES (%d);", i))
h.primaryDatabase.MustExec(fmt.Sprintf("INSERT INTO db01.t2 VALUES (%d);", i))
}
primaryDatabase.MustExec("UPDATE db01.t1 set pk = pk-1;")
primaryDatabase.MustExec("UPDATE db01.t2 set pk = pk-1;")
primaryDatabase.MustExec("DELETE FROM db01.t1 WHERE pk = 10;")
primaryDatabase.MustExec("DELETE FROM db01.t2 WHERE pk = 10;")
h.primaryDatabase.MustExec("UPDATE db01.t1 set pk = pk-1;")
h.primaryDatabase.MustExec("UPDATE db01.t2 set pk = pk-1;")
h.primaryDatabase.MustExec("DELETE FROM db01.t1 WHERE pk = 10;")
h.primaryDatabase.MustExec("DELETE FROM db01.t2 WHERE pk = 10;")
// Pause to let the replica catch up
waitForReplicaToCatchUp(t)
h.waitForReplicaToCatchUp()
// Verify that all changes from t1 were applied on the replica
rows, err := replicaDatabase.Queryx("SELECT COUNT(pk) as count, MIN(pk) as min, MAX(pk) as max from db01.t1;")
rows, err := h.replicaDatabase.Queryx("SELECT COUNT(pk) as count, MIN(pk) as min, MAX(pk) as max from db01.t1;")
require.NoError(t, err)
row := convertMapScanResultToStrings(readNextRow(t, rows))
require.Equal(t, "10", row["count"])
@@ -63,7 +63,7 @@ func TestBinlogReplicationFilters_ignoreTablesOnly(t *testing.T) {
require.NoError(t, rows.Close())
// Verify that no changes from t2 were applied on the replica
rows, err = replicaDatabase.Queryx("SELECT COUNT(pk) as count, MIN(pk) as min, MAX(pk) as max from db01.t2;")
rows, err = h.replicaDatabase.Queryx("SELECT COUNT(pk) as count, MIN(pk) as min, MAX(pk) as max from db01.t2;")
require.NoError(t, err)
row = convertMapScanResultToStrings(readNextRow(t, rows))
require.Equal(t, "0", row["count"])
@@ -75,37 +75,37 @@ func TestBinlogReplicationFilters_ignoreTablesOnly(t *testing.T) {
// TestBinlogReplicationFilters_doTablesOnly tests that the doTables replication
// filtering option is correctly applied and honored.
func TestBinlogReplicationFilters_doTablesOnly(t *testing.T) {
defer teardown(t)
startSqlServersWithDoltSystemVars(t, doltReplicaSystemVars)
startReplicationAndCreateTestDb(t, mySqlPort)
h := newHarness(t)
h.startSqlServersWithDoltSystemVars(doltReplicaSystemVars)
h.startReplicationAndCreateTestDb(h.mySqlPort)
// Do replication events for db01.t1. Also tests that the first filter setting is overwritten by
// the second and that db and that db and table names are case-insensitive.
replicaDatabase.MustExec("CHANGE REPLICATION FILTER REPLICATE_DO_TABLE=(db01.t2);")
replicaDatabase.MustExec("CHANGE REPLICATION FILTER REPLICATE_DO_TABLE=(DB01.T1);")
h.replicaDatabase.MustExec("CHANGE REPLICATION FILTER REPLICATE_DO_TABLE=(db01.t2);")
h.replicaDatabase.MustExec("CHANGE REPLICATION FILTER REPLICATE_DO_TABLE=(DB01.T1);")
// Assert that status shows replication filters
status := showReplicaStatus(t)
status := h.showReplicaStatus()
require.Equal(t, "db01.t1", status["Replicate_Do_Table"])
require.Equal(t, "", status["Replicate_Ignore_Table"])
// Make changes on the primary
primaryDatabase.MustExec("CREATE TABLE db01.t1 (pk INT PRIMARY KEY);")
primaryDatabase.MustExec("CREATE TABLE db01.t2 (pk INT PRIMARY KEY);")
h.primaryDatabase.MustExec("CREATE TABLE db01.t1 (pk INT PRIMARY KEY);")
h.primaryDatabase.MustExec("CREATE TABLE db01.t2 (pk INT PRIMARY KEY);")
for i := 1; i < 12; i++ {
primaryDatabase.MustExec(fmt.Sprintf("INSERT INTO db01.t1 VALUES (%d);", i))
primaryDatabase.MustExec(fmt.Sprintf("INSERT INTO db01.t2 VALUES (%d);", i))
h.primaryDatabase.MustExec(fmt.Sprintf("INSERT INTO db01.t1 VALUES (%d);", i))
h.primaryDatabase.MustExec(fmt.Sprintf("INSERT INTO db01.t2 VALUES (%d);", i))
}
primaryDatabase.MustExec("UPDATE db01.t1 set pk = pk-1;")
primaryDatabase.MustExec("UPDATE db01.t2 set pk = pk-1;")
primaryDatabase.MustExec("DELETE FROM db01.t1 WHERE pk = 10;")
primaryDatabase.MustExec("DELETE FROM db01.t2 WHERE pk = 10;")
h.primaryDatabase.MustExec("UPDATE db01.t1 set pk = pk-1;")
h.primaryDatabase.MustExec("UPDATE db01.t2 set pk = pk-1;")
h.primaryDatabase.MustExec("DELETE FROM db01.t1 WHERE pk = 10;")
h.primaryDatabase.MustExec("DELETE FROM db01.t2 WHERE pk = 10;")
// Pause to let the replica catch up
waitForReplicaToCatchUp(t)
h.waitForReplicaToCatchUp()
// Verify that all changes from t1 were applied on the replica
rows, err := replicaDatabase.Queryx("SELECT COUNT(pk) as count, MIN(pk) as min, MAX(pk) as max from db01.t1;")
rows, err := h.replicaDatabase.Queryx("SELECT COUNT(pk) as count, MIN(pk) as min, MAX(pk) as max from db01.t1;")
require.NoError(t, err)
row := convertMapScanResultToStrings(readNextRow(t, rows))
require.Equal(t, "10", row["count"])
@@ -114,7 +114,7 @@ func TestBinlogReplicationFilters_doTablesOnly(t *testing.T) {
require.NoError(t, rows.Close())
// Verify that no changes from t2 were applied on the replica
rows, err = replicaDatabase.Queryx("SELECT COUNT(pk) as count, MIN(pk) as min, MAX(pk) as max from db01.t2;")
rows, err = h.replicaDatabase.Queryx("SELECT COUNT(pk) as count, MIN(pk) as min, MAX(pk) as max from db01.t2;")
require.NoError(t, err)
row = convertMapScanResultToStrings(readNextRow(t, rows))
require.Equal(t, "0", row["count"])
@@ -126,38 +126,38 @@ func TestBinlogReplicationFilters_doTablesOnly(t *testing.T) {
// TestBinlogReplicationFilters_doTablesAndIgnoreTables tests that the doTables and ignoreTables
// replication filtering options are correctly applied and honored when used together.
func TestBinlogReplicationFilters_doTablesAndIgnoreTables(t *testing.T) {
defer teardown(t)
startSqlServersWithDoltSystemVars(t, doltReplicaSystemVars)
startReplicationAndCreateTestDb(t, mySqlPort)
h := newHarness(t)
h.startSqlServersWithDoltSystemVars(doltReplicaSystemVars)
h.startReplicationAndCreateTestDb(h.mySqlPort)
// Do replication events for db01.t1, and db01.t2
replicaDatabase.MustExec("CHANGE REPLICATION FILTER REPLICATE_DO_TABLE=(db01.t1, db01.t2);")
h.replicaDatabase.MustExec("CHANGE REPLICATION FILTER REPLICATE_DO_TABLE=(db01.t1, db01.t2);")
// Ignore replication events for db01.t2
replicaDatabase.MustExec("CHANGE REPLICATION FILTER REPLICATE_IGNORE_TABLE=(db01.t2);")
h.replicaDatabase.MustExec("CHANGE REPLICATION FILTER REPLICATE_IGNORE_TABLE=(db01.t2);")
// Assert that replica status shows replication filters
status := showReplicaStatus(t)
status := h.showReplicaStatus()
require.True(t, status["Replicate_Do_Table"] == "db01.t1,db01.t2" ||
status["Replicate_Do_Table"] == "db01.t2,db01.t1")
require.Equal(t, "db01.t2", status["Replicate_Ignore_Table"])
// Make changes on the primary
primaryDatabase.MustExec("CREATE TABLE db01.t1 (pk INT PRIMARY KEY);")
primaryDatabase.MustExec("CREATE TABLE db01.t2 (pk INT PRIMARY KEY);")
h.primaryDatabase.MustExec("CREATE TABLE db01.t1 (pk INT PRIMARY KEY);")
h.primaryDatabase.MustExec("CREATE TABLE db01.t2 (pk INT PRIMARY KEY);")
for i := 1; i < 12; i++ {
primaryDatabase.MustExec(fmt.Sprintf("INSERT INTO db01.t1 VALUES (%d);", i))
primaryDatabase.MustExec(fmt.Sprintf("INSERT INTO db01.t2 VALUES (%d);", i))
h.primaryDatabase.MustExec(fmt.Sprintf("INSERT INTO db01.t1 VALUES (%d);", i))
h.primaryDatabase.MustExec(fmt.Sprintf("INSERT INTO db01.t2 VALUES (%d);", i))
}
primaryDatabase.MustExec("UPDATE db01.t1 set pk = pk-1;")
primaryDatabase.MustExec("UPDATE db01.t2 set pk = pk-1;")
primaryDatabase.MustExec("DELETE FROM db01.t1 WHERE pk = 10;")
primaryDatabase.MustExec("DELETE FROM db01.t2 WHERE pk = 10;")
h.primaryDatabase.MustExec("UPDATE db01.t1 set pk = pk-1;")
h.primaryDatabase.MustExec("UPDATE db01.t2 set pk = pk-1;")
h.primaryDatabase.MustExec("DELETE FROM db01.t1 WHERE pk = 10;")
h.primaryDatabase.MustExec("DELETE FROM db01.t2 WHERE pk = 10;")
// Pause to let the replica catch up
waitForReplicaToCatchUp(t)
h.waitForReplicaToCatchUp()
// Verify that all changes from t1 were applied on the replica
rows, err := replicaDatabase.Queryx("SELECT COUNT(pk) as count, MIN(pk) as min, MAX(pk) as max from db01.t1;")
rows, err := h.replicaDatabase.Queryx("SELECT COUNT(pk) as count, MIN(pk) as min, MAX(pk) as max from db01.t1;")
require.NoError(t, err)
row := convertMapScanResultToStrings(readNextRow(t, rows))
require.Equal(t, "10", row["count"])
@@ -166,7 +166,7 @@ func TestBinlogReplicationFilters_doTablesAndIgnoreTables(t *testing.T) {
require.NoError(t, rows.Close())
// Verify that no changes from t2 were applied on the replica
rows, err = replicaDatabase.Queryx("SELECT COUNT(pk) as count, MIN(pk) as min, MAX(pk) as max from db01.t2;")
rows, err = h.replicaDatabase.Queryx("SELECT COUNT(pk) as count, MIN(pk) as min, MAX(pk) as max from db01.t2;")
require.NoError(t, err)
row = convertMapScanResultToStrings(readNextRow(t, rows))
require.Equal(t, "0", row["count"])
@@ -177,15 +177,15 @@ func TestBinlogReplicationFilters_doTablesAndIgnoreTables(t *testing.T) {
// TestBinlogReplicationFilters_errorCases test returned errors for various error cases.
func TestBinlogReplicationFilters_errorCases(t *testing.T) {
defer teardown(t)
startSqlServers(t)
h := newHarness(t)
h.startSqlServers()
// All tables must be qualified with a database
_, err := replicaDatabase.Queryx("CHANGE REPLICATION FILTER REPLICATE_DO_TABLE=(t1);")
_, err := h.replicaDatabase.Queryx("CHANGE REPLICATION FILTER REPLICATE_DO_TABLE=(t1);")
require.Error(t, err)
require.ErrorContains(t, err, "no database specified for table")
_, err = replicaDatabase.Queryx("CHANGE REPLICATION FILTER REPLICATE_IGNORE_TABLE=(t1);")
_, err = h.replicaDatabase.Queryx("CHANGE REPLICATION FILTER REPLICATE_IGNORE_TABLE=(t1);")
require.Error(t, err)
require.ErrorContains(t, err, "no database specified for table")
}

View File

@@ -23,30 +23,30 @@ import (
// TestBinlogReplicationMultiDb tests that binlog events spanning multiple databases are correctly
// applied by a replica.
func TestBinlogReplicationMultiDb(t *testing.T) {
defer teardown(t)
startSqlServersWithDoltSystemVars(t, doltReplicaSystemVars)
startReplicationAndCreateTestDb(t, mySqlPort)
h := newHarness(t)
h.startSqlServersWithDoltSystemVars(doltReplicaSystemVars)
h.startReplicationAndCreateTestDb(h.mySqlPort)
// Make changes on the primary to db01 and db02
primaryDatabase.MustExec("create database db02;")
primaryDatabase.MustExec("use db01;")
primaryDatabase.MustExec("create table t01 (pk int primary key, c1 int default (0))")
primaryDatabase.MustExec("use db02;")
primaryDatabase.MustExec("create table t02 (pk int primary key, c1 int default (0))")
primaryDatabase.MustExec("use db01;")
primaryDatabase.MustExec("insert into t01 (pk) values (1), (3), (5), (8), (9);")
primaryDatabase.MustExec("use db02;")
primaryDatabase.MustExec("insert into t02 (pk) values (2), (4), (6), (7), (10);")
primaryDatabase.MustExec("use db01;")
primaryDatabase.MustExec("delete from t01 where pk=9;")
primaryDatabase.MustExec("delete from db02.t02 where pk=10;")
primaryDatabase.MustExec("use db02;")
primaryDatabase.MustExec("update db01.t01 set pk=7 where pk=8;")
primaryDatabase.MustExec("update t02 set pk=8 where pk=7;")
h.primaryDatabase.MustExec("create database db02;")
h.primaryDatabase.MustExec("use db01;")
h.primaryDatabase.MustExec("create table t01 (pk int primary key, c1 int default (0))")
h.primaryDatabase.MustExec("use db02;")
h.primaryDatabase.MustExec("create table t02 (pk int primary key, c1 int default (0))")
h.primaryDatabase.MustExec("use db01;")
h.primaryDatabase.MustExec("insert into t01 (pk) values (1), (3), (5), (8), (9);")
h.primaryDatabase.MustExec("use db02;")
h.primaryDatabase.MustExec("insert into t02 (pk) values (2), (4), (6), (7), (10);")
h.primaryDatabase.MustExec("use db01;")
h.primaryDatabase.MustExec("delete from t01 where pk=9;")
h.primaryDatabase.MustExec("delete from db02.t02 where pk=10;")
h.primaryDatabase.MustExec("use db02;")
h.primaryDatabase.MustExec("update db01.t01 set pk=7 where pk=8;")
h.primaryDatabase.MustExec("update t02 set pk=8 where pk=7;")
// Verify the changes in db01 on the replica
waitForReplicaToCatchUp(t)
rows, err := replicaDatabase.Queryx("select * from db01.t01 order by pk asc;")
h.waitForReplicaToCatchUp()
rows, err := h.replicaDatabase.Queryx("select * from db01.t01 order by pk asc;")
require.NoError(t, err)
row := convertMapScanResultToStrings(readNextRow(t, rows))
require.Equal(t, "1", row["pk"])
@@ -61,8 +61,8 @@ func TestBinlogReplicationMultiDb(t *testing.T) {
require.NoError(t, rows.Close())
// Verify db01.dolt_diff
replicaDatabase.MustExec("use db01;")
rows, err = replicaDatabase.Queryx("select * from db01.dolt_diff;")
h.replicaDatabase.MustExec("use db01;")
rows, err = h.replicaDatabase.Queryx("select * from db01.dolt_diff;")
require.NoError(t, err)
row = convertMapScanResultToStrings(readNextRow(t, rows))
require.Equal(t, "t01", row["table_name"])
@@ -85,8 +85,8 @@ func TestBinlogReplicationMultiDb(t *testing.T) {
require.NoError(t, rows.Close())
// Verify the changes in db02 on the replica
replicaDatabase.MustExec("use db02;")
rows, err = replicaDatabase.Queryx("select * from db02.t02 order by pk asc;")
h.replicaDatabase.MustExec("use db02;")
rows, err = h.replicaDatabase.Queryx("select * from db02.t02 order by pk asc;")
require.NoError(t, err)
row = convertMapScanResultToStrings(readNextRow(t, rows))
require.Equal(t, "2", row["pk"])
@@ -100,7 +100,7 @@ func TestBinlogReplicationMultiDb(t *testing.T) {
require.NoError(t, rows.Close())
// Verify db02.dolt_diff
rows, err = replicaDatabase.Queryx("select * from db02.dolt_diff;")
rows, err = h.replicaDatabase.Queryx("select * from db02.dolt_diff;")
require.NoError(t, err)
row = convertMapScanResultToStrings(readNextRow(t, rows))
require.Equal(t, "t02", row["table_name"])
@@ -125,28 +125,28 @@ func TestBinlogReplicationMultiDb(t *testing.T) {
// TestBinlogReplicationMultiDbTransactions tests that binlog events for transactions that span
// multiple DBs are applied correctly to a replica.
func TestBinlogReplicationMultiDbTransactions(t *testing.T) {
defer teardown(t)
startSqlServersWithDoltSystemVars(t, doltReplicaSystemVars)
startReplicationAndCreateTestDb(t, mySqlPort)
h := newHarness(t)
h.startSqlServersWithDoltSystemVars(doltReplicaSystemVars)
h.startReplicationAndCreateTestDb(h.mySqlPort)
// Make changes on the primary to db01 and db02
primaryDatabase.MustExec("create database db02;")
primaryDatabase.MustExec("create table db01.t01 (pk int primary key, c1 int default (0))")
primaryDatabase.MustExec("create table db02.t02 (pk int primary key, c1 int default (0))")
primaryDatabase.MustExec("set @autocommit = 0;")
h.primaryDatabase.MustExec("create database db02;")
h.primaryDatabase.MustExec("create table db01.t01 (pk int primary key, c1 int default (0))")
h.primaryDatabase.MustExec("create table db02.t02 (pk int primary key, c1 int default (0))")
h.primaryDatabase.MustExec("set @autocommit = 0;")
primaryDatabase.MustExec("start transaction;")
primaryDatabase.MustExec("insert into db01.t01 (pk) values (1), (3), (5), (8), (9);")
primaryDatabase.MustExec("insert into db02.t02 (pk) values (2), (4), (6), (7), (10);")
primaryDatabase.MustExec("delete from db01.t01 where pk=9;")
primaryDatabase.MustExec("delete from db02.t02 where pk=10;")
primaryDatabase.MustExec("update db01.t01 set pk=7 where pk=8;")
primaryDatabase.MustExec("update db02.t02 set pk=8 where pk=7;")
primaryDatabase.MustExec("commit;")
h.primaryDatabase.MustExec("start transaction;")
h.primaryDatabase.MustExec("insert into db01.t01 (pk) values (1), (3), (5), (8), (9);")
h.primaryDatabase.MustExec("insert into db02.t02 (pk) values (2), (4), (6), (7), (10);")
h.primaryDatabase.MustExec("delete from db01.t01 where pk=9;")
h.primaryDatabase.MustExec("delete from db02.t02 where pk=10;")
h.primaryDatabase.MustExec("update db01.t01 set pk=7 where pk=8;")
h.primaryDatabase.MustExec("update db02.t02 set pk=8 where pk=7;")
h.primaryDatabase.MustExec("commit;")
// Verify the changes in db01 on the replica
waitForReplicaToCatchUp(t)
rows, err := replicaDatabase.Queryx("select * from db01.t01 order by pk asc;")
h.waitForReplicaToCatchUp()
rows, err := h.replicaDatabase.Queryx("select * from db01.t01 order by pk asc;")
require.NoError(t, err)
row := convertMapScanResultToStrings(readNextRow(t, rows))
require.Equal(t, "1", row["pk"])
@@ -160,8 +160,8 @@ func TestBinlogReplicationMultiDbTransactions(t *testing.T) {
require.NoError(t, rows.Close())
// Verify db01.dolt_diff
replicaDatabase.MustExec("use db01;")
rows, err = replicaDatabase.Queryx("select * from db01.dolt_diff;")
h.replicaDatabase.MustExec("use db01;")
rows, err = h.replicaDatabase.Queryx("select * from db01.dolt_diff;")
require.NoError(t, err)
row = convertMapScanResultToStrings(readNextRow(t, rows))
require.Equal(t, "t01", row["table_name"])
@@ -175,9 +175,9 @@ func TestBinlogReplicationMultiDbTransactions(t *testing.T) {
require.NoError(t, rows.Close())
// Verify the changes in db02 on the replica
waitForReplicaToCatchUp(t)
replicaDatabase.MustExec("use db02;")
rows, err = replicaDatabase.Queryx("select * from db02.t02 order by pk asc;")
h.waitForReplicaToCatchUp()
h.replicaDatabase.MustExec("use db02;")
rows, err = h.replicaDatabase.Queryx("select * from db02.t02 order by pk asc;")
require.NoError(t, err)
row = convertMapScanResultToStrings(readNextRow(t, rows))
require.Equal(t, "2", row["pk"])
@@ -191,7 +191,7 @@ func TestBinlogReplicationMultiDbTransactions(t *testing.T) {
require.NoError(t, rows.Close())
// Verify db02.dolt_diff
rows, err = replicaDatabase.Queryx("select * from db02.dolt_diff;")
rows, err = h.replicaDatabase.Queryx("select * from db02.dolt_diff;")
require.NoError(t, err)
row = convertMapScanResultToStrings(readNextRow(t, rows))
require.Equal(t, "t02", row["table_name"])

View File

@@ -28,38 +28,34 @@ import (
"github.com/stretchr/testify/require"
)
var toxiClient *toxiproxyclient.Client
var mysqlProxy *toxiproxyclient.Proxy
var proxyPort int
// TestBinlogReplicationAutoReconnect tests that the replica's connection to the primary is correctly
// reestablished if it drops.
func TestBinlogReplicationAutoReconnect(t *testing.T) {
defer teardown(t)
startSqlServersWithDoltSystemVars(t, doltReplicaSystemVars)
configureToxiProxy(t)
configureFastConnectionRetry(t)
startReplicationAndCreateTestDb(t, proxyPort)
h := newHarness(t)
h.startSqlServersWithDoltSystemVars(doltReplicaSystemVars)
h.configureToxiProxy()
h.configureFastConnectionRetry()
h.startReplicationAndCreateTestDb(h.proxyPort)
// Get the replica started up and ensure it's in sync with the primary before turning on the limit_data toxic
testInitialReplicaStatus(t)
primaryDatabase.MustExec("create table reconnect_test(pk int primary key, c1 varchar(255));")
waitForReplicaToCatchUp(t)
turnOnLimitDataToxic(t)
h.testInitialReplicaStatus()
h.primaryDatabase.MustExec("create table reconnect_test(pk int primary key, c1 varchar(255));")
h.waitForReplicaToCatchUp()
h.turnOnLimitDataToxic()
for i := 0; i < 1000; i++ {
value := "foobarbazbashfoobarbazbashfoobarbazbashfoobarbazbashfoobarbazbash"
primaryDatabase.MustExec(fmt.Sprintf("insert into reconnect_test values (%v, %q)", i, value))
h.primaryDatabase.MustExec(fmt.Sprintf("insert into reconnect_test values (%v, %q)", i, value))
}
// Remove the limit_data toxic so that a connection can be reestablished
err := mysqlProxy.RemoveToxic("limit_data")
err := h.mysqlProxy.RemoveToxic("limit_data")
require.NoError(t, err)
t.Logf("Toxiproxy proxy limit_data toxic removed")
// Assert that all records get written to the table
waitForReplicaToCatchUp(t)
h.waitForReplicaToCatchUp()
rows, err := replicaDatabase.Queryx("select min(pk) as min, max(pk) as max, count(pk) as count from db01.reconnect_test;")
rows, err := h.replicaDatabase.Queryx("select min(pk) as min, max(pk) as max, count(pk) as count from db01.reconnect_test;")
require.NoError(t, err)
row := convertMapScanResultToStrings(readNextRow(t, rows))
@@ -69,7 +65,7 @@ func TestBinlogReplicationAutoReconnect(t *testing.T) {
require.NoError(t, rows.Close())
// Assert that show replica status show reconnection IO error
status := showReplicaStatus(t)
status := h.showReplicaStatus()
require.Equal(t, "1158", status["Last_IO_Errno"])
require.True(t, strings.Contains(status["Last_IO_Error"].(string), "EOF"))
requireRecentTimeString(t, status["Last_IO_Error_Timestamp"])
@@ -77,54 +73,54 @@ func TestBinlogReplicationAutoReconnect(t *testing.T) {
// configureFastConnectionRetry configures the replica to retry a failed connection after 5s, instead of the default 60s
// connection retry interval. This is used for testing connection retry logic without waiting the full default period.
func configureFastConnectionRetry(_ *testing.T) {
replicaDatabase.MustExec(
func (h *harness) configureFastConnectionRetry() {
h.replicaDatabase.MustExec(
"change replication source to SOURCE_CONNECT_RETRY=5;")
}
// testInitialReplicaStatus tests the data returned by SHOW REPLICA STATUS and errors
// out if any values are not what is expected for a replica that has just connected
// to a MySQL primary.
func testInitialReplicaStatus(t *testing.T) {
status := showReplicaStatus(t)
func (h *harness) testInitialReplicaStatus() {
status := h.showReplicaStatus()
// Positioning settings
require.Equal(t, "1", status["Auto_Position"])
require.Equal(h.t, "1", status["Auto_Position"])
// Connection settings
require.Equal(t, "5", status["Connect_Retry"])
require.Equal(t, "86400", status["Source_Retry_Count"])
require.Equal(t, "localhost", status["Source_Host"])
require.NotEmpty(t, status["Source_Port"])
require.NotEmpty(t, status["Source_User"])
require.Equal(h.t, "5", status["Connect_Retry"])
require.Equal(h.t, "86400", status["Source_Retry_Count"])
require.Equal(h.t, "localhost", status["Source_Host"])
require.NotEmpty(h.t, status["Source_Port"])
require.NotEmpty(h.t, status["Source_User"])
// Error status
require.Equal(t, "0", status["Last_Errno"])
require.Equal(t, "", status["Last_Error"])
require.Equal(t, "0", status["Last_IO_Errno"])
require.Equal(t, "", status["Last_IO_Error"])
require.Equal(t, "", status["Last_IO_Error_Timestamp"])
require.Equal(t, "0", status["Last_SQL_Errno"])
require.Equal(t, "", status["Last_SQL_Error"])
require.Equal(t, "", status["Last_SQL_Error_Timestamp"])
require.Equal(h.t, "0", status["Last_Errno"])
require.Equal(h.t, "", status["Last_Error"])
require.Equal(h.t, "0", status["Last_IO_Errno"])
require.Equal(h.t, "", status["Last_IO_Error"])
require.Equal(h.t, "", status["Last_IO_Error_Timestamp"])
require.Equal(h.t, "0", status["Last_SQL_Errno"])
require.Equal(h.t, "", status["Last_SQL_Error"])
require.Equal(h.t, "", status["Last_SQL_Error_Timestamp"])
// Empty filter configuration
require.Equal(t, "", status["Replicate_Do_Table"])
require.Equal(t, "", status["Replicate_Ignore_Table"])
require.Equal(h.t, "", status["Replicate_Do_Table"])
require.Equal(h.t, "", status["Replicate_Ignore_Table"])
// Thread status
require.True(t,
require.True(h.t,
status["Replica_IO_Running"] == "Yes" ||
status["Replica_IO_Running"] == "Connecting")
require.Equal(t, "Yes", status["Replica_SQL_Running"])
require.Equal(h.t, "Yes", status["Replica_SQL_Running"])
// Unsupported fields
require.Equal(t, "INVALID", status["Source_Log_File"])
require.Equal(t, "Ignored", status["Source_SSL_Allowed"])
require.Equal(t, "None", status["Until_Condition"])
require.Equal(t, "0", status["SQL_Delay"])
require.Equal(t, "0", status["SQL_Remaining_Delay"])
require.Equal(t, "0", status["Seconds_Behind_Source"])
require.Equal(h.t, "INVALID", status["Source_Log_File"])
require.Equal(h.t, "Ignored", status["Source_SSL_Allowed"])
require.Equal(h.t, "None", status["Until_Condition"])
require.Equal(h.t, "0", status["SQL_Delay"])
require.Equal(h.t, "0", status["SQL_Remaining_Delay"])
require.Equal(h.t, "0", status["Seconds_Behind_Source"])
}
// requireRecentTimeString asserts that the specified |datetime| is a non-nil timestamp string
@@ -141,14 +137,14 @@ func requireRecentTimeString(t *testing.T, datetime interface{}) {
// showReplicaStatus returns a map with the results of SHOW REPLICA STATUS, keyed by the
// name of each column.
func showReplicaStatus(t *testing.T) map[string]interface{} {
rows, err := replicaDatabase.Queryx("show replica status;")
require.NoError(t, err)
func (h *harness) showReplicaStatus() map[string]interface{} {
rows, err := h.replicaDatabase.Queryx("show replica status;")
require.NoError(h.t, err)
defer rows.Close()
return convertMapScanResultToStrings(readNextRow(t, rows))
return convertMapScanResultToStrings(readNextRow(h.t, rows))
}
func configureToxiProxy(t *testing.T) {
func (h *harness) configureToxiProxy() {
toxiproxyPort := findFreePort()
metrics := toxiproxy.NewMetricsContainer(prometheus.NewRegistry())
@@ -157,31 +153,31 @@ func configureToxiProxy(t *testing.T) {
toxiproxyServer.Listen("localhost", strconv.Itoa(toxiproxyPort))
}()
time.Sleep(500 * time.Millisecond)
t.Logf("Toxiproxy control plane running on port %d", toxiproxyPort)
h.t.Logf("Toxiproxy control plane running on port %d", toxiproxyPort)
toxiClient = toxiproxyclient.NewClient(fmt.Sprintf("localhost:%d", toxiproxyPort))
h.toxiClient = toxiproxyclient.NewClient(fmt.Sprintf("localhost:%d", toxiproxyPort))
proxyPort = findFreePort()
h.proxyPort = findFreePort()
var err error
mysqlProxy, err = toxiClient.CreateProxy("mysql",
fmt.Sprintf("localhost:%d", proxyPort), // downstream
fmt.Sprintf("localhost:%d", mySqlPort)) // upstream
h.mysqlProxy, err = h.toxiClient.CreateProxy("mysql",
fmt.Sprintf("localhost:%d", h.proxyPort), // downstream
fmt.Sprintf("localhost:%d", h.mySqlPort)) // upstream
if err != nil {
panic(fmt.Sprintf("unable to create toxiproxy: %v", err.Error()))
}
t.Logf("Toxiproxy proxy started on port %d", proxyPort)
h.t.Logf("Toxiproxy proxy started on port %d", h.proxyPort)
}
// turnOnLimitDataToxic adds a limit_data toxic to the active Toxiproxy, which prevents more than 1KB of data
// from being sent from the primary through the proxy to the replica. Callers MUST call configureToxiProxy
// before calling this function.
func turnOnLimitDataToxic(t *testing.T) {
require.NotNil(t, mysqlProxy)
_, err := mysqlProxy.AddToxic("limit_data", "limit_data", "downstream", 1.0, toxiproxyclient.Attributes{
func (h *harness) turnOnLimitDataToxic() {
require.NotNil(h.t, h.mysqlProxy)
_, err := h.mysqlProxy.AddToxic("limit_data", "limit_data", "downstream", 1.0, toxiproxyclient.Attributes{
"bytes": 1_000,
})
require.NoError(t, err)
t.Logf("Toxiproxy proxy with limit_data toxic (1KB) started on port %d", proxyPort)
require.NoError(h.t, err)
h.t.Logf("Toxiproxy proxy with limit_data toxic (1KB) started on port %d", h.proxyPort)
}
// convertMapScanResultToStrings converts each value in the specified map |m| into a string.

View File

@@ -25,11 +25,11 @@ import (
// TestBinlogReplicationServerRestart tests that a replica can be configured and started, then the
// server process can be restarted and replica can be restarted without problems.
func TestBinlogReplicationServerRestart(t *testing.T) {
defer teardown(t)
startSqlServersWithDoltSystemVars(t, doltReplicaSystemVars)
startReplicationAndCreateTestDb(t, mySqlPort)
h := newHarness(t)
h.startSqlServersWithDoltSystemVars(doltReplicaSystemVars)
h.startReplicationAndCreateTestDb(h.mySqlPort)
primaryDatabase.MustExec("create table t (pk int auto_increment primary key)")
h.primaryDatabase.MustExec("create table t (pk int auto_increment primary key)")
// Launch a goroutine that inserts data for 5 seconds
var wg sync.WaitGroup
@@ -38,22 +38,22 @@ func TestBinlogReplicationServerRestart(t *testing.T) {
defer wg.Done()
limit := 5 * time.Second
for startTime := time.Now(); time.Now().Sub(startTime) <= limit; {
primaryDatabase.MustExec("insert into t values (DEFAULT);")
h.primaryDatabase.MustExec("insert into t values (DEFAULT);")
time.Sleep(100 * time.Millisecond)
}
}()
// Let the replica process a few transactions, then stop the server and pause a second
waitForReplicaToReachGtid(t, 3)
stopDoltSqlServer(t)
h.waitForReplicaToReachGtid(3)
h.stopDoltSqlServer()
time.Sleep(1000 * time.Millisecond)
var err error
doltPort, doltProcess, err = startDoltSqlServer(t, testDir, nil)
h.doltPort, h.doltProcess, err = h.startDoltSqlServer(nil)
require.NoError(t, err)
// Check replication status on the replica and assert configuration persisted
status := showReplicaStatus(t)
status := h.showReplicaStatus()
// The default Connect_Retry interval is 60s; but some tests configure a faster connection retry interval
require.True(t, status["Connect_Retry"] == "5" || status["Connect_Retry"] == "60")
require.Equal(t, "86400", status["Source_Retry_Count"])
@@ -64,16 +64,16 @@ func TestBinlogReplicationServerRestart(t *testing.T) {
// Restart replication on replica
// TODO: For now, we have to set server_id each time we start the service.
// Turn this into a persistent sys var
replicaDatabase.MustExec("set @@global.server_id=123;")
replicaDatabase.MustExec("START REPLICA")
h.replicaDatabase.MustExec("set @@global.server_id=123;")
h.replicaDatabase.MustExec("START REPLICA")
// Assert that all changes have replicated from the primary
wg.Wait()
waitForReplicaToCatchUp(t)
h.waitForReplicaToCatchUp()
countMaxQuery := "SELECT COUNT(pk) AS count, MAX(pk) as max FROM db01.t;"
primaryRows, err := primaryDatabase.Queryx(countMaxQuery)
primaryRows, err := h.primaryDatabase.Queryx(countMaxQuery)
require.NoError(t, err)
replicaRows, err := replicaDatabase.Queryx(countMaxQuery)
replicaRows, err := h.replicaDatabase.Queryx(countMaxQuery)
require.NoError(t, err)
primaryRow := convertMapScanResultToStrings(readNextRow(t, primaryRows))
replicaRow := convertMapScanResultToStrings(readNextRow(t, replicaRows))

View File

@@ -0,0 +1,72 @@
// Copyright 2025 Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//go:build darwin || dragonfly || freebsd || linux || netbsd || openbsd
// +build darwin dragonfly freebsd linux netbsd openbsd
package binlogreplication
import (
"os"
"os/exec"
"os/signal"
"syscall"
"time"
)
func ApplyCmdAttributes(cmd *exec.Cmd) {
// Nothing...
}
func StopProcess(proc *os.Process) error {
err := proc.Signal(syscall.SIGTERM)
if err != nil {
return err
}
_, err = proc.Wait()
return err
}
// These tests spawn child process for go compiling, dolt sql-server
// and for mysqld. We would like to clean up these child processes
// when the program exits. In general, we use *testing.T.Cleanup to
// terminate any running processes associated with the test.
//
// On a shell, when a user runs 'go test .', and then they deliver
// an interrupt, '^C', the shell delivers a SIGINT to the process
// group of the foreground process. In our case, `dolt`, `go`, and
// the default signal handler for the golang runtime (this test
// program) will all terminate the program on delivery of a SIGINT.
// `mysqld`, however, does not terminate on receiving SIGINT. Thus,
// we install a handler here, and we translate the Interrupt into
// a SIGTERM against the process group. That will get `mysqld` to
// shutdown as well.
func InstallSignalHandlers() {
interrupts := make(chan os.Signal, 1)
signal.Notify(interrupts, os.Interrupt)
go func() {
<-interrupts
// |mysqld| will exit on SIGTERM
syscall.Kill(-os.Getpid(), syscall.SIGTERM)
time.Sleep(1 * time.Second)
// Canceling this context will cause os.Process.Kill
// to get called on any still-running processes.
commandCtxCancel()
time.Sleep(1 * time.Second)
// Redeliver SIGINT to ourselves with the default
// signal handler restored.
signal.Reset(os.Interrupt)
syscall.Kill(-os.Getpid(), syscall.SIGINT)
}()
}

View File

@@ -0,0 +1,50 @@
// Copyright 2025 Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//go:build windows
// +build windows
package binlogreplication
import (
"os"
"os/exec"
"syscall"
"golang.org/x/sys/windows"
)
func ApplyCmdAttributes(cmd *exec.Cmd) {
// Creating a new process group for the process will allow GracefulStop to send the break signal to that process
// without also killing the parent process
cmd.SysProcAttr = &syscall.SysProcAttr{
CreationFlags: syscall.CREATE_NEW_PROCESS_GROUP,
}
}
func StopProcess(proc *os.Process) error {
err := windows.GenerateConsoleCtrlEvent(windows.CTRL_BREAK_EVENT, uint32(proc.Pid))
if err != nil {
return err
}
_, err = proc.Wait()
return err
}
// I don't know if there is any magic necessary here, but regardless,
// we don't run these tests on windows, so there are never child
// mysqld processes to worry about.
func InstallSignalHandlers() {
}

View File

@@ -688,9 +688,14 @@ func (c *Controller) RemoteSrvServerArgs(ctxFactory func(context.Context) (*sql.
listenaddr := c.RemoteSrvListenAddr()
args.HttpListenAddr = listenaddr
args.GrpcListenAddr = listenaddr
args.Options = c.ServerOptions()
ctxInterceptor := sqle.SqlContextServerInterceptor{
Factory: ctxFactory,
}
args.Options = append(args.Options, ctxInterceptor.Options()...)
args.Options = append(args.Options, c.ServerOptions()...)
args.HttpInterceptor = ctxInterceptor.HTTP(args.HttpInterceptor)
var err error
args.FS, args.DBCache, err = sqle.RemoteSrvFSAndDBCache(ctxFactory, sqle.CreateUnknownDatabases)
args.DBCache, err = sqle.RemoteSrvDBCache(sqle.GetInterceptorSqlContext, sqle.CreateUnknownDatabases)
if err != nil {
return remotesrv.ServerArgs{}, err
}
@@ -699,7 +704,7 @@ func (c *Controller) RemoteSrvServerArgs(ctxFactory func(context.Context) (*sql.
keyID := creds.PubKeyToKID(c.pub)
keyIDStr := creds.B32CredsEncoding.EncodeToString(keyID)
args.HttpInterceptor = JWKSHandlerInterceptor(keyIDStr, c.pub)
args.HttpInterceptor = JWKSHandlerInterceptor(args.HttpInterceptor, keyIDStr, c.pub)
return args, nil
}

View File

@@ -46,16 +46,21 @@ func (h JWKSHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
w.Write(b)
}
func JWKSHandlerInterceptor(keyID string, pub ed25519.PublicKey) func(http.Handler) http.Handler {
func JWKSHandlerInterceptor(existing func(http.Handler) http.Handler, keyID string, pub ed25519.PublicKey) func(http.Handler) http.Handler {
jh := JWKSHandler{KeyID: keyID, PublicKey: pub}
return func(h http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
this := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.URL.EscapedPath() == "/.well-known/jwks.json" {
jh.ServeHTTP(w, r)
return
}
h.ServeHTTP(w, r)
})
if existing != nil {
return existing(this)
} else {
return this
}
}
}

View File

@@ -688,7 +688,8 @@ func (p *DoltDatabaseProvider) CloneDatabaseFromRemote(
if exists {
deleteErr := p.fs.Delete(dbName, true)
if deleteErr != nil {
err = fmt.Errorf("%s: unable to clean up failed clone in directory '%s'", err.Error(), dbName)
err = fmt.Errorf("%s: unable to clean up failed clone in directory '%s': %s",
err.Error(), dbName, deleteErr.Error())
}
}
return err

View File

@@ -16,13 +16,15 @@ package sqle
import (
"context"
"errors"
"net/http"
"github.com/dolthub/go-mysql-server/sql"
"google.golang.org/grpc"
"github.com/dolthub/dolt/go/libraries/doltcore/doltdb"
"github.com/dolthub/dolt/go/libraries/doltcore/remotesrv"
"github.com/dolthub/dolt/go/libraries/doltcore/sqle/dsess"
"github.com/dolthub/dolt/go/libraries/utils/filesys"
"github.com/dolthub/dolt/go/store/datas"
)
@@ -81,17 +83,12 @@ type CreateUnknownDatabasesSetting bool
const CreateUnknownDatabases CreateUnknownDatabasesSetting = true
const DoNotCreateUnknownDatabases CreateUnknownDatabasesSetting = false
// Considers |args| and returns a new |remotesrv.ServerArgs| instance which
// will serve databases accessible through |ctxFactory|.
func RemoteSrvFSAndDBCache(ctxFactory func(context.Context) (*sql.Context, error), createSetting CreateUnknownDatabasesSetting) (filesys.Filesys, remotesrv.DBCache, error) {
sqlCtx, err := ctxFactory(context.Background())
if err != nil {
return nil, nil, err
}
sess := dsess.DSessFromSess(sqlCtx.Session)
fs := sess.Provider().FileSystem()
// Returns a remotesrv.DBCache instance which will use the *sql.Context
// returned from |ctxFactory| to access a database in the session
// DatabaseProvider.
func RemoteSrvDBCache(ctxFactory func(context.Context) (*sql.Context, error), createSetting CreateUnknownDatabasesSetting) (remotesrv.DBCache, error) {
dbcache := remotesrvStore{ctxFactory, bool(createSetting)}
return fs, dbcache, nil
return dbcache, nil
}
func WithUserPasswordAuth(args remotesrv.ServerArgs, authnz remotesrv.AccessControl) remotesrv.ServerArgs {
@@ -102,3 +99,88 @@ func WithUserPasswordAuth(args remotesrv.ServerArgs, authnz remotesrv.AccessCont
args.Options = append(args.Options, si.Options()...)
return args
}
type SqlContextServerInterceptor struct {
Factory func(context.Context) (*sql.Context, error)
}
type serverStreamWrapper struct {
grpc.ServerStream
ctx context.Context
}
func (s serverStreamWrapper) Context() context.Context {
return s.ctx
}
type sqlContextInterceptorKey struct{}
func GetInterceptorSqlContext(ctx context.Context) (*sql.Context, error) {
if v := ctx.Value(sqlContextInterceptorKey{}); v != nil {
return v.(*sql.Context), nil
}
return nil, errors.New("misconfiguration; a sql.Context should always be available from the interceptor chain.")
}
func (si SqlContextServerInterceptor) Stream() grpc.StreamServerInterceptor {
return func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
sqlCtx, err := si.Factory(ss.Context())
if err != nil {
return err
}
sql.SessionCommandBegin(sqlCtx.Session)
defer sql.SessionCommandEnd(sqlCtx.Session)
defer sql.SessionEnd(sqlCtx.Session)
newCtx := context.WithValue(ss.Context(), sqlContextInterceptorKey{}, sqlCtx)
newSs := serverStreamWrapper{
ServerStream: ss,
ctx: newCtx,
}
return handler(srv, newSs)
}
}
func (si SqlContextServerInterceptor) Unary() grpc.UnaryServerInterceptor {
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
sqlCtx, err := si.Factory(ctx)
if err != nil {
return nil, err
}
sql.SessionCommandBegin(sqlCtx.Session)
defer sql.SessionCommandEnd(sqlCtx.Session)
defer sql.SessionEnd(sqlCtx.Session)
newCtx := context.WithValue(ctx, sqlContextInterceptorKey{}, sqlCtx)
return handler(newCtx, req)
}
}
func (si SqlContextServerInterceptor) HTTP(existing func(http.Handler) http.Handler) func(http.Handler) http.Handler {
return func(h http.Handler) http.Handler {
this := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
sqlCtx, err := si.Factory(ctx)
if err != nil {
http.Error(w, "could not initialize sql.Context", http.StatusInternalServerError)
return
}
sql.SessionCommandBegin(sqlCtx.Session)
defer sql.SessionCommandEnd(sqlCtx.Session)
defer sql.SessionEnd(sqlCtx.Session)
newCtx := context.WithValue(ctx, sqlContextInterceptorKey{}, sqlCtx)
newReq := r.WithContext(newCtx)
h.ServeHTTP(w, newReq)
})
if existing != nil {
return existing(this)
} else {
return this
}
}
}
func (si SqlContextServerInterceptor) Options() []grpc.ServerOption {
return []grpc.ServerOption{
grpc.ChainUnaryInterceptor(si.Unary()),
grpc.ChainStreamInterceptor(si.Stream()),
}
}

View File

@@ -769,10 +769,10 @@ func (sm SerialMessage) WalkAddrs(nbf *NomsBinFormat, cb func(addr hash.Hash) er
return err
}
}
case serial.TableSchemaFileID, serial.ForeignKeyCollectionFileID:
case serial.TableSchemaFileID, serial.ForeignKeyCollectionFileID, serial.TupleFileID:
// no further references from these file types
return nil
case serial.ProllyTreeNodeFileID, serial.AddressMapFileID, serial.MergeArtifactsFileID, serial.BlobFileID, serial.CommitClosureFileID:
case serial.ProllyTreeNodeFileID, serial.AddressMapFileID, serial.MergeArtifactsFileID, serial.BlobFileID, serial.CommitClosureFileID, serial.VectorIndexNodeFileID:
return message.WalkAddresses(context.TODO(), serial.Message(sm), func(ctx context.Context, addr hash.Hash) error {
return cb(addr)
})

View File

@@ -430,3 +430,14 @@ SQL
[[ "$output" =~ "pk1" ]] || false
[[ "${#lines[@]}" = "1" ]] || false
}
@test "vector-index: can GC" {
dolt sql <<SQL
CREATE VECTOR INDEX idx_v1 ON onepk(v1);
INSERT INTO onepk VALUES (1, '[99, 51]'), (2, '[11, 55]'), (3, '[88, 52]'), (4, '[22, 54]'), (5, '[77, 53]');
SQL
dolt gc
dolt sql <<SQL
INSERT INTO onepk VALUES (6, '[99, 51]'), (7, '[11, 55]'), (8, '[88, 52]'), (9, '[22, 54]'), (10, '[77, 53]');
SQL
}