go: sqle: dsess/transactions: Turn replication failures into session warnings if dolt_cluster_ack_writes_timeout_secs is on.

This commit is contained in:
Aaron Son
2023-05-15 10:51:11 -07:00
parent c1fedfc457
commit db9dce8e50
@@ -24,6 +24,7 @@ import (
"github.com/dolthub/go-mysql-server/sql"
"github.com/sirupsen/logrus"
"github.com/dolthub/vitess/go/mysql"
"github.com/dolthub/dolt/go/libraries/doltcore/doltdb"
"github.com/dolthub/dolt/go/libraries/doltcore/env"
@@ -255,14 +256,17 @@ func WaitForReplicationController(ctx *sql.Context, rsc doltdb.ReplicationStatus
}
cCtx, cancel := context.WithCancel(ctx)
defer cancel()
var wg sync.WaitGroup
wg.Add(len(rsc.Wait))
for _, f := range rsc.Wait {
for i, f := range rsc.Wait {
f := f
go func() error {
i := i
go func() {
defer wg.Done()
return f(cCtx)
err := f(cCtx)
if err == nil {
rsc.Wait[i] = nil
}
}()
}
@@ -274,9 +278,28 @@ func WaitForReplicationController(ctx *sql.Context, rsc doltdb.ReplicationStatus
select {
case <-time.After(time.Duration(timeoutI) * time.Second):
// TODO: Error, warning, something...
// We timed out before all the waiters were done.
// First we make certain to finalize everything.
cancel()
<-done
case <-done:
cancel()
}
// Just because our waiters all completed does not mean they all
// returned nil errors. Any non-nil entries in rsc.Wait returned an
// error. We turn those into warnings here.
numFailed := 0
for _, f := range rsc.Wait {
if f != nil {
numFailed += 1
}
}
ctx.Session.Warn(&sql.Warning{
Level: "Warning",
Code: mysql.ERQueryTimeout,
Message: fmt.Sprintf("Timed out replication of commit to %d out of %d replicas.", numFailed, len(rsc.Wait)),
})
}
// doCommit commits this transaction with the write function provided. It takes the same params as DoltCommit