Optimizing how we commit SQL transactions in the binlog replication applier thread so that we don't have to commit in all databases

This commit is contained in:
Jason Fulghum
2024-08-27 14:48:36 -07:00
parent a4cef150f6
commit 4c4e9bc312
3 changed files with 103 additions and 27 deletions

View File

@@ -55,15 +55,16 @@ const (
// This type is NOT used concurrently; there is currently only one applier process running to process binlog
// events, so the state in this type is NOT protected with a mutex.
type binlogReplicaApplier struct {
format *mysql.BinlogFormat
tableMapsById map[uint64]*mysql.TableMap
stopReplicationChan chan struct{}
currentGtid mysql.GTID
replicationSourceUuid string
currentPosition *mysql.Position // successfully executed GTIDs
filters *filterConfiguration
running atomic.Bool
engine *gms.Engine
format *mysql.BinlogFormat
tableMapsById map[uint64]*mysql.TableMap
stopReplicationChan chan struct{}
currentGtid mysql.GTID
replicationSourceUuid string
currentPosition *mysql.Position // successfully executed GTIDs
filters *filterConfiguration
running atomic.Bool
engine *gms.Engine
dbsWithUncommittedChanges map[string]struct{}
}
func newBinlogReplicaApplier(filters *filterConfiguration) *binlogReplicaApplier {
@@ -326,7 +327,6 @@ func (a *binlogReplicaApplier) replicaBinlogEventHandler(ctx *sql.Context) error
func (a *binlogReplicaApplier) processBinlogEvent(ctx *sql.Context, engine *gms.Engine, event mysql.BinlogEvent) error {
var err error
createCommit := false
commitToAllDatabases := false
// We don't support checksum validation, so we MUST strip off any checksum bytes if present, otherwise it gets
// interpreted as part of the payload and corrupts the data. Future checksum sizes, are not guaranteed to be the
@@ -356,7 +356,6 @@ func (a *binlogReplicaApplier) processBinlogEvent(ctx *sql.Context, engine *gms.
// XA-capable storage engine. For more details, see: https://mariadb.com/kb/en/xid_event/
ctx.GetLogger().Trace("Received binlog event: XID")
createCommit = true
commitToAllDatabases = true
case event.IsQuery():
// A Query event represents a statement executed on the source server that should be executed on the
@@ -375,13 +374,6 @@ func (a *binlogReplicaApplier) processBinlogEvent(ctx *sql.Context, engine *gms.
"sql_mode": fmt.Sprintf("0x%x", query.SqlMode),
}).Trace("Received binlog event: Query")
// When executing SQL statements sent from the primary, we can't be sure what database was modified unless we
// look closely at the statement. For example, we could be connected to db01, but executed
// "create table db02.t (...);" i.e., looking at query.Database is NOT enough to always determine the correct
// database that was modified, so instead, we commit to all databases when we see a Query binlog event to
// avoid issues with correctness, at the cost of being slightly less efficient
commitToAllDatabases = true
if query.Options&mysql.QFlagOptionAutoIsNull > 0 {
ctx.GetLogger().Tracef("Setting sql_auto_is_null ON")
ctx.SetSessionVariable(ctx, "sql_auto_is_null", 1)
@@ -541,13 +533,10 @@ func (a *binlogReplicaApplier) processBinlogEvent(ctx *sql.Context, engine *gms.
}
if createCommit {
var databasesToCommit []string
if commitToAllDatabases {
databasesToCommit = getAllUserDatabaseNames(ctx, engine)
for _, database := range databasesToCommit {
executeQueryWithEngine(ctx, engine, "use `"+database+"`;")
executeQueryWithEngine(ctx, engine, "commit;")
}
doltSession := dsess.DSessFromSess(ctx.Session)
databasesToCommit := doltSession.DirtyDatabases()
if err = doltSession.CommitTransaction(ctx, doltSession.GetTransaction()); err != nil {
return err
}
// Record the last GTID processed after the commit
@@ -562,17 +551,46 @@ func (a *binlogReplicaApplier) processBinlogEvent(ctx *sql.Context, engine *gms.
}
// For now, create a Dolt commit from every data update. Eventually, we'll want to make this configurable.
ctx.GetLogger().Trace("Creating Dolt commit(s)")
for _, database := range databasesToCommit {
// We commit to every database that we saw had a dirty session; these identify the databases where we have
// run DML commands through the engine. We also commit to every database that was modified through a RowEvent,
// which is all tracked through the applier's dbsWithUncommittedChanges field; these don't show up
// as dirty in our session, since we used TableWriter to update them.
a.addDatabasesWithUncommittedChanges(databasesToCommit...)
for _, database := range a.databasesWithUncommittedChanges() {
executeQueryWithEngine(ctx, engine, "use `"+database+"`;")
executeQueryWithEngine(ctx, engine,
fmt.Sprintf("call dolt_commit('-Am', 'Dolt binlog replica commit: GTID %s');", a.currentGtid))
}
a.dbsWithUncommittedChanges = nil
}
return nil
}
// addDatabasesWithUncommittedChanges marks the specified |dbNames| as databases with uncommitted changes so that
// the replica applier knows which databases need to have Dolt commits created.
func (a *binlogReplicaApplier) addDatabasesWithUncommittedChanges(dbNames ...string) {
	// Lazily initialize the set on first use; the zero value of the applier leaves it nil.
	if a.dbsWithUncommittedChanges == nil {
		a.dbsWithUncommittedChanges = make(map[string]struct{})
	}
	for _, name := range dbNames {
		a.dbsWithUncommittedChanges[name] = struct{}{}
	}
}
// databasesWithUncommittedChanges returns a slice of database names indicating which databases have uncommitted
// changes and need a Dolt commit created. Returns nil if no databases have been marked. Note that map iteration
// order is random, so the returned slice is in no particular order.
func (a *binlogReplicaApplier) databasesWithUncommittedChanges() []string {
	if a.dbsWithUncommittedChanges == nil {
		return nil
	}
	dbNames := make([]string, 0, len(a.dbsWithUncommittedChanges))
	// Only the key is needed; ranging without the blank identifier is the gofmt-clean form.
	for dbName := range a.dbsWithUncommittedChanges {
		dbNames = append(dbNames, dbName)
	}
	return dbNames
}
// processRowEvent processes a WriteRows, DeleteRows, or UpdateRows binlog event and returns an error if any problems
// were encountered.
func (a *binlogReplicaApplier) processRowEvent(ctx *sql.Context, event mysql.BinlogEvent, engine *gms.Engine) error {
@@ -599,6 +617,7 @@ func (a *binlogReplicaApplier) processRowEvent(ctx *sql.Context, event mysql.Bin
return nil
}
a.addDatabasesWithUncommittedChanges(tableMap.Database)
rows, err := event.Rows(*a.format, tableMap)
if err != nil {
return err

View File

@@ -35,6 +35,7 @@ import (
_ "github.com/go-sql-driver/mysql"
"github.com/jmoiron/sqlx"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/require"
"github.com/dolthub/go-mysql-server/sql/binlogreplication"
@@ -123,6 +124,47 @@ func TestBinlogReplicationSanityCheck(t *testing.T) {
requireReplicaResults(t, "select * from db01.tableT", [][]any{{"300"}})
}
// TestBinlogReplicationWithHundredsOfDatabases asserts that we can efficiently replicate the creation of hundreds of databases.
func TestBinlogReplicationWithHundredsOfDatabases(t *testing.T) {
	defer teardown(t)
	startSqlServersWithDoltSystemVars(t, doltReplicaSystemVars)
	startReplicationAndCreateTestDb(t, mySqlPort)

	// Create a table on the primary and verify on the replica
	primaryDatabase.MustExec("create table tableT (pk int primary key)")
	waitForReplicaToCatchUp(t)
	assertCreateTableStatement(t, replicaDatabase, "tableT",
		"CREATE TABLE tableT ( pk int NOT NULL, PRIMARY KEY (pk)) "+
			"ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_bin")
	assertRepoStateFileExists(t, "db01")

	// Create a few hundred databases on the primary and let them replicate to the replica
	dbCount := 300
	startTime := time.Now()
	for i := range dbCount {
		dbName := fmt.Sprintf("db%03d", i)
		primaryDatabase.MustExec(fmt.Sprintf("create database %s", dbName))
	}
	waitForReplicaToCatchUp(t)
	endTime := time.Now()
	logrus.Infof("Time to replicate %d databases: %v", dbCount, endTime.Sub(startTime))

	// Spot check the presence of a database on the replica
	assertRepoStateFileExists(t, "db042")

	// Insert some data in one database
	startTime = time.Now()
	primaryDatabase.MustExec("use db042;")
	primaryDatabase.MustExec("create table t (pk int primary key);")
	primaryDatabase.MustExec("insert into t values (100), (101), (102);")

	// Verify the results on the replica
	waitForReplicaToCatchUp(t)
	requireReplicaResults(t, "select * from db042.t;", [][]any{{"100"}, {"101"}, {"102"}})
	endTime = time.Now()
	// BUG FIX: the arguments were previously swapped, passing a time.Duration to %d and the
	// database count to %v; the order now matches the format string.
	logrus.Infof("Time to replicate inserts to 1 database (out of %d): %v", dbCount, endTime.Sub(startTime))
}
// TestAutoRestartReplica tests that a Dolt replica automatically starts up replication if
// replication was running when the replica was shut down.
func TestAutoRestartReplica(t *testing.T) {

View File

@@ -563,6 +563,21 @@ func (d *DoltSession) dirtyWorkingSets() []*branchState {
return dirtyStates
}
// DirtyDatabases returns the names of databases who have outstanding changes in this session and need to be committed
// in a SQL transaction before they are visible to other sessions.
func (d *DoltSession) DirtyDatabases() []string {
	var dirty []string
	for _, dbState := range d.dbStates {
		// A database is dirty if any of its branch heads has uncommitted changes.
		hasDirtyHead := false
		for _, head := range dbState.heads {
			if head.dirty {
				hasDirtyHead = true
				break
			}
		}
		if hasDirtyHead {
			dirty = append(dirty, dbState.dbName)
		}
	}
	return dirty
}
// CommitWorkingSet commits the working set for the transaction given, without creating a new dolt commit.
// Clients should typically use CommitTransaction, which performs additional checks, instead of this method.
func (d *DoltSession) CommitWorkingSet(ctx *sql.Context, dbName string, tx sql.Transaction) error {