From dfdb4a4b2a920184a7f641f7dad3e12a275a3207 Mon Sep 17 00:00:00 2001 From: Aaron Son Date: Thu, 11 May 2023 15:46:55 -0700 Subject: [PATCH 01/13] go: sqle: cluster: Implement a first pass at dolt_cluster_ack_writes_timeout_secs. Setting this system variable to a non-zero value on a primary replica in a sql-server cluster will cause dolt to block a SQL client performing a commit until that client's commit is fully replicated to the replicas. If there is a timeout, currently a warning is logged in the logs. --- .../doltcore/sqle/cluster/commithook.go | 81 ++++++++++++- .../doltcore/sqle/cluster/controller.go | 19 ++++ .../doltcore/sqle/cluster/initdbhook.go | 4 + go/libraries/doltcore/sqle/dsess/variables.go | 5 +- .../doltcore/sqle/system_variables.go | 7 ++ .../tests/sql-server-cluster.yaml | 106 ++++++++++++++++++ 6 files changed, 219 insertions(+), 3 deletions(-) diff --git a/go/libraries/doltcore/sqle/cluster/commithook.go b/go/libraries/doltcore/sqle/cluster/commithook.go index 1b540f028d..f3b0bbb94a 100644 --- a/go/libraries/doltcore/sqle/cluster/commithook.go +++ b/go/libraries/doltcore/sqle/cluster/commithook.go @@ -53,6 +53,16 @@ type commithook struct { // commithooks are caught up with replicating to the standby. waitNotify func() + // This is a slice of notification channels maintained by the + // commithook. The semantics are: + // 1. All accesses to |successChs| must happen with |mu| held. + // 2. There maybe be |0| or more channels in the slice. + // 3. As a reader, if |successChs| is non-empty, you should just read a value, for example, |successChs[0]| and use it. All entries will be closed at the same time. If |successChs| is empty when you need a channel, you should add one to it. + // 4. If you read a channel out of |successChs|, that channel will be closed on the next successful replication attempt. It will not be closed before then. + successChs []chan struct{} + + execTimeout time.Duration + role Role // The standby replica to which the new root gets replicated. @@ -143,6 +153,12 @@ func (h *commithook) replicate(ctx context.Context) { if h.waitNotify != nil { h.waitNotify() } + if len(h.successChs) != 0 { + for _, ch := range h.successChs { + close(ch) + } + h.successChs = nil + } h.cond.Wait() lgr.Tracef("cluster/commithook: background thread: woken up.") } @@ -192,6 +208,13 @@ func (h *commithook) attemptReplicate(ctx context.Context) { } h.cancelReplicate = nil }() + successChs := h.successChs + h.successChs = nil + defer func() { + if len(successChs) != 0 { + h.successChs = append(h.successChs, successChs...) + } + }() h.mu.Unlock() if destDB == nil { @@ -242,6 +265,12 @@ func (h *commithook) attemptReplicate(ctx context.Context) { h.lastPushedHead = toPush h.lastSuccess = incomingTime h.nextPushAttempt = time.Time{} + if len(successChs) != 0 { + for _, ch := range successChs { + close(ch) + } + successChs = nil + } } else { h.currentError = new(string) *h.currentError = fmt.Sprintf("failed to commit chunks on destDB: %v", err) @@ -350,6 +379,47 @@ func (h *commithook) setWaitNotify(f func()) bool { return true } +type replicationResult int + +const replicationResultTimeout = 0 +const replicationResultContextCanceled = 1 +const replicationResultSuccess = 2 + +// Blocks the current goroutine until: +// 1. There is no replication necessary, i.e., isCaughtUp() == true. This returns replicationResultSuccess. +// 2. The replication of |nextHead|, or a later head, at the time this method was called succeeds. This returns replicationResultSuccess. +// 3. ctx.Done() closes. This returns replicationResultContextCanceled. +// 4. timeout passes. This returns replicationResultSuccess. +func (h *commithook) waitForReplicationSuccess(ctx context.Context, timeout time.Duration) replicationResult { + h.mu.Lock() + if h.isCaughtUp() { + h.mu.Unlock() + return replicationResultSuccess + } + if len(h.successChs) == 0 { + h.successChs = append(h.successChs, make(chan struct{})) + } + ch := h.successChs[0] + h.mu.Unlock() + select { + case <-ch: + return replicationResultSuccess + case <-ctx.Done(): + return replicationResultContextCanceled + case <-time.After(timeout): + return replicationResultTimeout + } +} + +// Set by the controller. If it is non-zero, the Execute() DatabaseHook +// callback will block the calling goroutine for that many seconds waiting for +// replication quiescence. +func (h *commithook) setExecTimeout(timeout time.Duration) { + h.mu.Lock() + h.execTimeout = timeout + h.mu.Unlock() +} + var errDetectedBrokenConfigStr = "error: more than one server was configured as primary in the same epoch. this server has stopped accepting writes. choose a primary in the cluster and call dolt_assume_cluster_role() on servers in the cluster to start replication at a higher epoch" // Execute on this commithook updates the target root hash we're attempting to @@ -364,10 +434,10 @@ func (h *commithook) Execute(ctx context.Context, ds datas.Dataset, db datas.Dat return err } h.mu.Lock() - defer h.mu.Unlock() lgr = h.logger() if h.role != RolePrimary { lgr.Warnf("cluster/commithook received commit callback for a commit on %s, but we are not role primary; not replicating the commit, which is likely to be lost.", ds.ID()) + h.mu.Unlock() return nil } if root != h.nextHead { @@ -377,6 +447,15 @@ func (h *commithook) Execute(ctx context.Context, ds datas.Dataset, db datas.Dat h.nextPushAttempt = time.Time{} h.cond.Signal() } + execTimeout := h.execTimeout + h.mu.Unlock() + if execTimeout != time.Duration(0) { + res := h.waitForReplicationSuccess(ctx, execTimeout) + if res != replicationResultSuccess { + // TODO: Get this failure into the *sql.Context warnings. + lgr.Warnf("cluster/commithook failed to replicate write before the timeout. timeout: %d, wait result: %v", execTimeout, res) + } + } return nil } diff --git a/go/libraries/doltcore/sqle/cluster/controller.go b/go/libraries/doltcore/sqle/cluster/controller.go index 322af3ed63..e4facd8eab 100644 --- a/go/libraries/doltcore/sqle/cluster/controller.go +++ b/go/libraries/doltcore/sqle/cluster/controller.go @@ -82,6 +82,7 @@ type Controller struct { type sqlvars interface { AddSystemVariables(sysVars []sql.SystemVariable) + GetGlobal(name string) (sql.SystemVariable, interface{}, bool) } // We can manage certain aspects of the exposed databases on the server through @@ -175,6 +176,20 @@ func (c *Controller) ManageSystemVariables(variables sqlvars) { c.mu.Lock() defer c.mu.Unlock() c.systemVars = variables + + // We reset this system variable here to put our NotifyChanged on it. + v, _, ok := variables.GetGlobal(dsess.DoltClusterAckWritesTimeoutSecs) + if !ok { + panic(fmt.Sprintf("internal error: did not find required global system variable %s", dsess.DoltClusterAckWritesTimeoutSecs)) + } + v.NotifyChanged = func(scope sql.SystemVariableScope, v sql.SystemVarValue) { + c.mu.Lock() + defer c.mu.Unlock() + for _, hook := range c.commithooks { + hook.setExecTimeout(time.Duration(v.Val.(int64)) * time.Second) + } + } + variables.AddSystemVariables([]sql.SystemVariable{v}) c.refreshSystemVars() } @@ -194,6 +209,10 @@ func (c *Controller) ApplyStandbyReplicationConfig(ctx context.Context, bt *sql. if err != nil { return err } + _, execTimeoutVal, _ := c.systemVars.GetGlobal(dsess.DoltClusterAckWritesTimeoutSecs) + for _, h := range hooks { + h.setExecTimeout(time.Duration(execTimeoutVal.(int64)) * time.Second) + } c.commithooks = append(c.commithooks, hooks...) } return nil diff --git a/go/libraries/doltcore/sqle/cluster/initdbhook.go b/go/libraries/doltcore/sqle/cluster/initdbhook.go index 7713c91540..588f35a235 100644 --- a/go/libraries/doltcore/sqle/cluster/initdbhook.go +++ b/go/libraries/doltcore/sqle/cluster/initdbhook.go @@ -17,6 +17,7 @@ package cluster import ( "context" "strings" + "time" "github.com/dolthub/go-mysql-server/sql" @@ -57,6 +58,8 @@ func NewInitDatabaseHook(controller *Controller, bt *sql.BackgroundThreads, orig }) } + _, execTimeoutVal, _ := controller.systemVars.GetGlobal(dsess.DoltClusterAckWritesTimeoutSecs) + role, _ := controller.roleAndEpoch() for i, r := range controller.cfg.StandbyRemotes() { ttfdir, err := denv.TempTableFilesDir() @@ -64,6 +67,7 @@ func NewInitDatabaseHook(controller *Controller, bt *sql.BackgroundThreads, orig return err } commitHook := newCommitHook(controller.lgr, r.Name(), name, role, remoteDBs[i], denv.DoltDB, ttfdir) + commitHook.setExecTimeout(time.Duration(execTimeoutVal.(int64)) * time.Second) denv.DoltDB.PrependCommitHook(ctx, commitHook) controller.registerCommitHook(commitHook) if err := commitHook.Run(bt); err != nil { diff --git a/go/libraries/doltcore/sqle/dsess/variables.go b/go/libraries/doltcore/sqle/dsess/variables.go index c979dded55..07b18431e7 100644 --- a/go/libraries/doltcore/sqle/dsess/variables.go +++ b/go/libraries/doltcore/sqle/dsess/variables.go @@ -52,8 +52,9 @@ const ( ShowBranchDatabases = "dolt_show_branch_databases" DoltLogLevel = "dolt_log_level" - DoltClusterRoleVariable = "dolt_cluster_role" - DoltClusterRoleEpochVariable = "dolt_cluster_role_epoch" + DoltClusterRoleVariable = "dolt_cluster_role" + DoltClusterRoleEpochVariable = "dolt_cluster_role_epoch" + DoltClusterAckWritesTimeoutSecs = "dolt_cluster_ack_writes_timeout_secs" ) const URLTemplateDatabasePlaceholder = "{database}" diff --git a/go/libraries/doltcore/sqle/system_variables.go b/go/libraries/doltcore/sqle/system_variables.go index 9875b7642b..a18ec75336 100644 --- a/go/libraries/doltcore/sqle/system_variables.go +++ b/go/libraries/doltcore/sqle/system_variables.go @@ -166,6 +166,13 @@ func AddDoltSystemVariables() { Type: types.NewSystemBoolType(dsess.ShowBranchDatabases), Default: int8(0), }, + { + Name: dsess.DoltClusterAckWritesTimeoutSecs, + Dynamic: true, + Scope: sql.SystemVariableScope_Persist, + Type: types.NewSystemIntType(dsess.DoltClusterAckWritesTimeoutSecs, 0, 60, false), + Default: int64(0), + }, }) } diff --git a/integration-tests/go-sql-server-driver/tests/sql-server-cluster.yaml b/integration-tests/go-sql-server-driver/tests/sql-server-cluster.yaml index 1bd5b6ec00..c8a75205d5 100644 --- a/integration-tests/go-sql-server-driver/tests/sql-server-cluster.yaml +++ b/integration-tests/go-sql-server-driver/tests/sql-server-cluster.yaml @@ -992,6 +992,112 @@ tests: result: columns: ["count(*)"] rows: [["5"]] +- name: dolt_cluster_ack_writes_timeout_secs behavior + multi_repos: + - name: server1 + with_files: + - name: server.yaml + contents: | + log_level: trace + listener: + host: 0.0.0.0 + port: 3309 + cluster: + standby_remotes: + - name: standby + remote_url_template: http://localhost:3852/{database} + bootstrap_role: primary + bootstrap_epoch: 1 + remotesapi: + port: 3851 + server: + args: ["--config", "server.yaml"] + port: 3309 + - name: server2 + with_files: + - name: server.yaml + contents: | + log_level: trace + listener: + host: 0.0.0.0 + port: 3310 + cluster: + standby_remotes: + - name: standby + remote_url_template: http://localhost:3851/{database} + bootstrap_role: standby + bootstrap_epoch: 1 + remotesapi: + port: 3852 + server: + args: ["--config", "server.yaml"] + port: 3310 + # This test writes new commits on the primary and quickly checks for them on + # the secondary. If dolt_cluster_ack_writes_timeout_secs is working as + # intended, the new writes will always be present. + connections: + - on: server1 + queries: + - exec: 'SET @@PERSIST.dolt_cluster_ack_writes_timeout_secs = 10' + - exec: 'CREATE DATABASE repo1' + - exec: 'USE repo1' + - exec: 'CREATE TABLE vals (i INT PRIMARY KEY)' + - on: server2 + queries: + - exec: 'USE repo1' + - query: "SHOW TABLES" + result: + columns: ["Tables_in_repo1"] + rows: [["vals"]] + - on: server1 + queries: + - exec: 'USE repo1' + - exec: 'INSERT INTO vals VALUES (0),(1),(2),(3),(4)' + - on: server2 + queries: + - exec: 'USE repo1' + - query: 'SELECT COUNT(*) FROM vals' + result: + columns: ["COUNT(*)"] + rows: [["5"]] + - on: server1 + queries: + - exec: 'USE repo1' + - exec: 'INSERT INTO vals VALUES (5),(6),(7),(8),(9)' + - on: server2 + queries: + - exec: 'USE repo1' + - query: 'SELECT COUNT(*) FROM vals' + result: + columns: ["COUNT(*)"] + rows: [["10"]] + # Restart both servers to test the behavior of the persisted variable. + - on: server1 + restart_server: {} + - on: server2 + restart_server: {} + - on: server1 + queries: + - exec: 'USE repo1' + - exec: 'INSERT INTO vals VALUES (10),(11),(12),(13),(14)' + - on: server2 + queries: + - exec: 'USE repo1' + - query: 'SELECT COUNT(*) FROM vals' + result: + columns: ["COUNT(*)"] + rows: [["15"]] + - on: server1 + queries: + - exec: 'USE repo1' + - exec: 'INSERT INTO vals VALUES (15),(16),(17),(18),(19)' + - on: server2 + queries: + - exec: 'USE repo1' + - query: 'SELECT COUNT(*) FROM vals' + result: + columns: ["COUNT(*)"] + rows: [["20"]] - name: call dolt checkout multi_repos: - name: server1 From af966fa9fdf798be17a456e526cf0b8393e99211 Mon Sep 17 00:00:00 2001 From: Aaron Son Date: Fri, 12 May 2023 16:13:15 -0700 Subject: [PATCH 02/13] Add ReplicationStatusController out parameter to some DoltDB write methods. --- go/cmd/dolt/commands/commit.go | 1 + go/cmd/dolt/commands/merge.go | 1 + go/libraries/doltcore/doltdb/commit_hooks.go | 16 +++---- .../doltcore/doltdb/commit_hooks_test.go | 4 +- go/libraries/doltcore/doltdb/doltdb.go | 17 ++++++-- go/libraries/doltcore/doltdb/hooksdatabase.go | 43 +++++++++++++++++-- .../dtestutils/testcommands/multienv.go | 1 + go/libraries/doltcore/env/actions/checkout.go | 1 + go/libraries/doltcore/env/actions/reset.go | 2 +- go/libraries/doltcore/env/environment.go | 8 ++-- go/libraries/doltcore/env/memory.go | 4 +- go/libraries/doltcore/migrate/progress.go | 2 +- go/libraries/doltcore/migrate/transform.go | 2 +- .../binlog_replica_applier.go | 2 +- .../doltcore/sqle/cluster/commithook.go | 8 ++-- .../sqle/dprocedures/dolt_checkout.go | 2 +- .../doltcore/sqle/dsess/transactions.go | 4 +- .../doltcore/sqle/read_replica_database.go | 2 +- 18 files changed, 85 insertions(+), 35 deletions(-) diff --git a/go/cmd/dolt/commands/commit.go b/go/cmd/dolt/commands/commit.go index b0b7a53818..40c197c120 100644 --- a/go/cmd/dolt/commands/commit.go +++ b/go/cmd/dolt/commands/commit.go @@ -229,6 +229,7 @@ func performCommit(ctx context.Context, commandStr string, args []string, dEnv * ws.WithStagedRoot(pendingCommit.Roots.Staged).WithWorkingRoot(pendingCommit.Roots.Working).ClearMerge(), prevHash, dEnv.NewWorkingSetMeta(fmt.Sprintf("Updated by %s %s", commandStr, strings.Join(args, " "))), + nil, ) if err != nil { if apr.Contains(cli.AmendFlag) { diff --git a/go/cmd/dolt/commands/merge.go b/go/cmd/dolt/commands/merge.go index 0b3731ca48..b9751f2fd5 100644 --- a/go/cmd/dolt/commands/merge.go +++ b/go/cmd/dolt/commands/merge.go @@ -538,6 +538,7 @@ func executeNoFFMergeAndCommit(ctx context.Context, dEnv *env.DoltEnv, spec *mer ws.WithStagedRoot(pendingCommit.Roots.Staged).WithWorkingRoot(pendingCommit.Roots.Working).ClearMerge(), wsHash, dEnv.NewWorkingSetMeta(msg), + nil, ) if err != nil { diff --git a/go/libraries/doltcore/doltdb/commit_hooks.go b/go/libraries/doltcore/doltdb/commit_hooks.go index faf8cac34c..9fc349f78e 100644 --- a/go/libraries/doltcore/doltdb/commit_hooks.go +++ b/go/libraries/doltcore/doltdb/commit_hooks.go @@ -49,8 +49,8 @@ func NewPushOnWriteHook(destDB *DoltDB, tmpDir string) *PushOnWriteHook { } // Execute implements CommitHook, replicates head updates to the destDb field -func (ph *PushOnWriteHook) Execute(ctx context.Context, ds datas.Dataset, db datas.Database) error { - return pushDataset(ctx, ph.destDB, db, ds, ph.tmpDir) +func (ph *PushOnWriteHook) Execute(ctx context.Context, ds datas.Dataset, db datas.Database) (func(context.Context) error, error) { + return nil, pushDataset(ctx, ph.destDB, db, ds, ph.tmpDir) } func pushDataset(ctx context.Context, destDB, srcDB datas.Database, ds datas.Dataset, tmpDir string) error { @@ -135,16 +135,16 @@ func (*AsyncPushOnWriteHook) ExecuteForWorkingSets() bool { } // Execute implements CommitHook, replicates head updates to the destDb field -func (ah *AsyncPushOnWriteHook) Execute(ctx context.Context, ds datas.Dataset, db datas.Database) error { +func (ah *AsyncPushOnWriteHook) Execute(ctx context.Context, ds datas.Dataset, db datas.Database) (func(context.Context) error, error) { addr, _ := ds.MaybeHeadAddr() select { case ah.ch <- PushArg{ds: ds, db: db, hash: addr}: case <-ctx.Done(): ah.ch <- PushArg{ds: ds, db: db, hash: addr} - return ctx.Err() + return nil, ctx.Err() } - return nil + return nil, nil } // HandleError implements CommitHook @@ -174,12 +174,12 @@ func NewLogHook(msg []byte) *LogHook { } // Execute implements CommitHook, writes message to log channel -func (lh *LogHook) Execute(ctx context.Context, ds datas.Dataset, db datas.Database) error { +func (lh *LogHook) Execute(ctx context.Context, ds datas.Dataset, db datas.Database) (func(context.Context) error, error) { if lh.out != nil { _, err := lh.out.Write(lh.msg) - return err + return nil, err } - return nil + return nil, nil } // HandleError implements CommitHook diff --git a/go/libraries/doltcore/doltdb/commit_hooks_test.go b/go/libraries/doltcore/doltdb/commit_hooks_test.go index 02c5806673..80e10facff 100644 --- a/go/libraries/doltcore/doltdb/commit_hooks_test.go +++ b/go/libraries/doltcore/doltdb/commit_hooks_test.go @@ -136,7 +136,7 @@ func TestPushOnWriteHook(t *testing.T) { ds, err := ddb.db.GetDataset(ctx, "refs/heads/main") require.NoError(t, err) - err = hook.Execute(ctx, ds, ddb.db) + _, err = hook.Execute(ctx, ds, ddb.db) require.NoError(t, err) cs, _ = NewCommitSpec(defaultBranch) @@ -269,7 +269,7 @@ func TestAsyncPushOnWrite(t *testing.T) { require.NoError(t, err) ds, err := ddb.db.GetDataset(ctx, "refs/heads/main") require.NoError(t, err) - err = hook.Execute(ctx, ds, ddb.db) + _, err = hook.Execute(ctx, ds, ddb.db) require.NoError(t, err) } }) diff --git a/go/libraries/doltcore/doltdb/doltdb.go b/go/libraries/doltcore/doltdb/doltdb.go index 07654e85bb..56a2fc1e54 100644 --- a/go/libraries/doltcore/doltdb/doltdb.go +++ b/go/libraries/doltcore/doltdb/doltdb.go @@ -1124,7 +1124,7 @@ func (ddb *DoltDB) NewBranchAtCommit(ctx context.Context, branchRef ref.DoltRef, } ws = ws.WithWorkingRoot(commitRoot).WithStagedRoot(commitRoot) - return ddb.UpdateWorkingSet(ctx, wsRef, ws, currWsHash, TodoWorkingSetMeta()) + return ddb.UpdateWorkingSet(ctx, wsRef, ws, currWsHash, TodoWorkingSetMeta(), nil) } // CopyWorkingSet copies a WorkingSetRef from one ref to another. If `force` is @@ -1155,7 +1155,7 @@ func (ddb *DoltDB) CopyWorkingSet(ctx context.Context, fromWSRef ref.WorkingSetR } } - return ddb.UpdateWorkingSet(ctx, toWSRef, ws, currWsHash, TodoWorkingSetMeta()) + return ddb.UpdateWorkingSet(ctx, toWSRef, ws, currWsHash, TodoWorkingSetMeta(), nil) } // DeleteBranch deletes the branch given, returning an error if it doesn't exist. @@ -1216,6 +1216,13 @@ func (ddb *DoltDB) NewTagAtCommit(ctx context.Context, tagRef ref.DoltRef, c *Co return err } +type ReplicationStatusController struct { + // A slice of funcs which can be called to wait for the replication + // associated with a commithook to complete. Must return if the + // associated Context is canceled. + Wait []func(ctx context.Context) error +} + // UpdateWorkingSet updates the working set with the ref given to the root value given // |prevHash| is the hash of the expected WorkingSet struct stored in the ref, not the hash of the RootValue there. func (ddb *DoltDB) UpdateWorkingSet( @@ -1224,6 +1231,7 @@ func (ddb *DoltDB) UpdateWorkingSet( workingSet *WorkingSet, prevHash hash.Hash, meta *datas.WorkingSetMeta, + replicationStatus *ReplicationStatusController, ) error { ds, err := ddb.db.GetDataset(ctx, workingSetRef.String()) if err != nil { @@ -1235,6 +1243,7 @@ func (ddb *DoltDB) UpdateWorkingSet( return err } + ctx = withReplicaState(ctx, replicationStatus) _, err = ddb.db.UpdateWorkingSet(ctx, ds, datas.WorkingSetSpec{ Meta: meta, WorkingRoot: workingRootRef, @@ -1255,6 +1264,7 @@ func (ddb *DoltDB) CommitWithWorkingSet( commit *PendingCommit, workingSet *WorkingSet, prevHash hash.Hash, meta *datas.WorkingSetMeta, + replicationStatus *ReplicationStatusController, ) (*Commit, error) { wsDs, err := ddb.db.GetDataset(ctx, workingSetRef.String()) if err != nil { @@ -1271,7 +1281,8 @@ func (ddb *DoltDB) CommitWithWorkingSet( return nil, err } - commitDataset, _, err := ddb.db.CommitWithWorkingSet(ctx, headDs, wsDs, commit.Roots.Staged.nomsValue(), datas.WorkingSetSpec{ + rsCtx := withReplicaState(ctx, replicationStatus) + commitDataset, _, err := ddb.db.CommitWithWorkingSet(rsCtx, headDs, wsDs, commit.Roots.Staged.nomsValue(), datas.WorkingSetSpec{ Meta: meta, WorkingRoot: workingRootRef, StagedRoot: stagedRef, diff --git a/go/libraries/doltcore/doltdb/hooksdatabase.go b/go/libraries/doltcore/doltdb/hooksdatabase.go index 54d128d043..656d666251 100644 --- a/go/libraries/doltcore/doltdb/hooksdatabase.go +++ b/go/libraries/doltcore/doltdb/hooksdatabase.go @@ -25,6 +25,24 @@ import ( "sync" ) +type replicaStateContextKey struct{ +} + +func withReplicaState(ctx context.Context, c *ReplicationStatusController) context.Context { + if c != nil { + return context.WithValue(ctx, replicaStateContextKey{}, c) + } + return ctx +} + +func getReplicaState(ctx context.Context) *ReplicationStatusController { + v := ctx.Value(replicaStateContextKey{}) + if v == nil { + return nil + } + return v.(*ReplicationStatusController) +} + type hooksDatabase struct { datas.Database postCommitHooks []CommitHook @@ -33,7 +51,7 @@ type hooksDatabase struct { // CommitHook is an abstraction for executing arbitrary commands after atomic database commits type CommitHook interface { // Execute is arbitrary read-only function whose arguments are new Dataset commit into a specific Database - Execute(ctx context.Context, ds datas.Dataset, db datas.Database) error + Execute(ctx context.Context, ds datas.Dataset, db datas.Database) (func(context.Context) error, error) // HandleError is an bridge function to handle Execute errors HandleError(ctx context.Context, err error) error // SetLogger lets clients specify an output stream for HandleError @@ -59,22 +77,39 @@ func (db hooksDatabase) PostCommitHooks() []CommitHook { } func (db hooksDatabase) ExecuteCommitHooks(ctx context.Context, ds datas.Dataset, onlyWS bool) { - var err error var wg sync.WaitGroup - for _, hook := range db.postCommitHooks { + rsc := getReplicaState(ctx) + if rsc != nil { + rsc.Wait = make([]func(context.Context) error, len(db.postCommitHooks)) + } + for il, hook := range db.postCommitHooks { if !onlyWS || hook.ExecuteForWorkingSets() { + i := il hook := hook wg.Add(1) go func() { defer wg.Done() - err = hook.Execute(ctx, ds, db) + f, err := hook.Execute(ctx, ds, db) if err != nil { hook.HandleError(ctx, err) } + if rsc != nil { + rsc.Wait[i] = f + } }() } } wg.Wait() + if rsc != nil { + j := 0 + for i := range rsc.Wait { + if rsc.Wait[i] != nil { + rsc.Wait[j] = rsc.Wait[i] + j++ + } + } + rsc.Wait = rsc.Wait[:j] + } } func (db hooksDatabase) CommitWithWorkingSet( diff --git a/go/libraries/doltcore/dtestutils/testcommands/multienv.go b/go/libraries/doltcore/dtestutils/testcommands/multienv.go index 202c249e96..1a02907169 100644 --- a/go/libraries/doltcore/dtestutils/testcommands/multienv.go +++ b/go/libraries/doltcore/dtestutils/testcommands/multienv.go @@ -268,6 +268,7 @@ func (mr *MultiRepoTestSetup) CommitWithWorkingSet(dbName string) *doltdb.Commit ws.WithStagedRoot(pendingCommit.Roots.Staged).WithWorkingRoot(pendingCommit.Roots.Working).ClearMerge(), prevHash, doltdb.TodoWorkingSetMeta(), + nil, ) if err != nil { panic("couldn't commit: " + err.Error()) diff --git a/go/libraries/doltcore/env/actions/checkout.go b/go/libraries/doltcore/env/actions/checkout.go index 54b9e07980..9f8a8bc26e 100644 --- a/go/libraries/doltcore/env/actions/checkout.go +++ b/go/libraries/doltcore/env/actions/checkout.go @@ -331,6 +331,7 @@ func cleanOldWorkingSet( initialWs.WithWorkingRoot(newRoots.Working).WithStagedRoot(newRoots.Staged).ClearMerge(), h, dEnv.NewWorkingSetMeta("reset hard"), + nil, ) if err != nil { return err diff --git a/go/libraries/doltcore/env/actions/reset.go b/go/libraries/doltcore/env/actions/reset.go index 0a601ff365..62b8903b0e 100644 --- a/go/libraries/doltcore/env/actions/reset.go +++ b/go/libraries/doltcore/env/actions/reset.go @@ -164,7 +164,7 @@ func ResetHard( return err } - err = dEnv.DoltDB.UpdateWorkingSet(ctx, ws.Ref(), ws.WithWorkingRoot(roots.Working).WithStagedRoot(roots.Staged).ClearMerge(), h, dEnv.NewWorkingSetMeta("reset hard")) + err = dEnv.DoltDB.UpdateWorkingSet(ctx, ws.Ref(), ws.WithWorkingRoot(roots.Working).WithStagedRoot(roots.Staged).ClearMerge(), h, dEnv.NewWorkingSetMeta("reset hard"), nil) if err != nil { return err } diff --git a/go/libraries/doltcore/env/environment.go b/go/libraries/doltcore/env/environment.go index d7cb31a44b..4eca2b6a9a 100644 --- a/go/libraries/doltcore/env/environment.go +++ b/go/libraries/doltcore/env/environment.go @@ -629,7 +629,7 @@ func (dEnv *DoltEnv) UpdateWorkingRoot(ctx context.Context, newRoot *doltdb.Root wsRef = ws.Ref() } - return dEnv.DoltDB.UpdateWorkingSet(ctx, wsRef, ws.WithWorkingRoot(newRoot), h, dEnv.workingSetMeta()) + return dEnv.DoltDB.UpdateWorkingSet(ctx, wsRef, ws.WithWorkingRoot(newRoot), h, dEnv.workingSetMeta(), nil) } // UpdateWorkingSet updates the working set for the current working branch to the value given. @@ -648,7 +648,7 @@ func (dEnv *DoltEnv) UpdateWorkingSet(ctx context.Context, ws *doltdb.WorkingSet } } - return dEnv.DoltDB.UpdateWorkingSet(ctx, ws.Ref(), ws, h, dEnv.workingSetMeta()) + return dEnv.DoltDB.UpdateWorkingSet(ctx, ws.Ref(), ws, h, dEnv.workingSetMeta(), nil) } type repoStateReader struct { @@ -758,7 +758,7 @@ func (dEnv *DoltEnv) UpdateStagedRoot(ctx context.Context, newRoot *doltdb.RootV wsRef = ws.Ref() } - return dEnv.DoltDB.UpdateWorkingSet(ctx, wsRef, ws.WithStagedRoot(newRoot), h, dEnv.workingSetMeta()) + return dEnv.DoltDB.UpdateWorkingSet(ctx, wsRef, ws.WithStagedRoot(newRoot), h, dEnv.workingSetMeta(), nil) } func (dEnv *DoltEnv) AbortMerge(ctx context.Context) error { @@ -772,7 +772,7 @@ func (dEnv *DoltEnv) AbortMerge(ctx context.Context) error { return err } - return dEnv.DoltDB.UpdateWorkingSet(ctx, ws.Ref(), ws.AbortMerge(), h, dEnv.workingSetMeta()) + return dEnv.DoltDB.UpdateWorkingSet(ctx, ws.Ref(), ws.AbortMerge(), h, dEnv.workingSetMeta(), nil) } func (dEnv *DoltEnv) workingSetMeta() *datas.WorkingSetMeta { diff --git a/go/libraries/doltcore/env/memory.go b/go/libraries/doltcore/env/memory.go index 7be9bdfadf..c973fc428f 100644 --- a/go/libraries/doltcore/env/memory.go +++ b/go/libraries/doltcore/env/memory.go @@ -136,7 +136,7 @@ func (m MemoryRepoState) UpdateStagedRoot(ctx context.Context, newRoot *doltdb.R wsRef = ws.Ref() } - return m.DoltDB.UpdateWorkingSet(ctx, wsRef, ws.WithStagedRoot(newRoot), h, m.workingSetMeta()) + return m.DoltDB.UpdateWorkingSet(ctx, wsRef, ws.WithStagedRoot(newRoot), h, m.workingSetMeta(), nil) } func (m MemoryRepoState) UpdateWorkingRoot(ctx context.Context, newRoot *doltdb.RootValue) error { @@ -162,7 +162,7 @@ func (m MemoryRepoState) UpdateWorkingRoot(ctx context.Context, newRoot *doltdb. wsRef = ws.Ref() } - return m.DoltDB.UpdateWorkingSet(ctx, wsRef, ws.WithWorkingRoot(newRoot), h, m.workingSetMeta()) + return m.DoltDB.UpdateWorkingSet(ctx, wsRef, ws.WithWorkingRoot(newRoot), h, m.workingSetMeta(), nil) } func (m MemoryRepoState) WorkingSet(ctx context.Context) (*doltdb.WorkingSet, error) { diff --git a/go/libraries/doltcore/migrate/progress.go b/go/libraries/doltcore/migrate/progress.go index 12139e6b54..26592dee8c 100644 --- a/go/libraries/doltcore/migrate/progress.go +++ b/go/libraries/doltcore/migrate/progress.go @@ -298,6 +298,6 @@ func commitRoot( Name: meta.Name, Email: meta.Email, Timestamp: uint64(time.Now().Unix()), - }) + }, nil) return err } diff --git a/go/libraries/doltcore/migrate/transform.go b/go/libraries/doltcore/migrate/transform.go index 5c17cbc605..5101475c31 100644 --- a/go/libraries/doltcore/migrate/transform.go +++ b/go/libraries/doltcore/migrate/transform.go @@ -93,7 +93,7 @@ func migrateWorkingSet(ctx context.Context, menv Environment, brRef ref.BranchRe newWs := doltdb.EmptyWorkingSet(wsRef).WithWorkingRoot(wr).WithStagedRoot(sr) - return new.UpdateWorkingSet(ctx, wsRef, newWs, hash.Hash{}, oldWs.Meta()) + return new.UpdateWorkingSet(ctx, wsRef, newWs, hash.Hash{}, oldWs.Meta(), nil) } func migrateCommit(ctx context.Context, menv Environment, oldCm *doltdb.Commit, new *doltdb.DoltDB, prog *progress) error { diff --git a/go/libraries/doltcore/sqle/binlogreplication/binlog_replica_applier.go b/go/libraries/doltcore/sqle/binlogreplication/binlog_replica_applier.go index f286fcbb95..c83c9e25d0 100644 --- a/go/libraries/doltcore/sqle/binlogreplication/binlog_replica_applier.go +++ b/go/libraries/doltcore/sqle/binlogreplication/binlog_replica_applier.go @@ -706,7 +706,7 @@ func closeWriteSession(ctx *sql.Context, engine *gms.Engine, databaseName string return err } - return sqlDatabase.DbData().Ddb.UpdateWorkingSet(ctx, newWorkingSet.Ref(), newWorkingSet, hash, newWorkingSet.Meta()) + return sqlDatabase.DbData().Ddb.UpdateWorkingSet(ctx, newWorkingSet.Ref(), newWorkingSet, hash, newWorkingSet.Meta(), nil) } // getTableSchema returns a sql.Schema for the specified table in the specified database. diff --git a/go/libraries/doltcore/sqle/cluster/commithook.go b/go/libraries/doltcore/sqle/cluster/commithook.go index f3b0bbb94a..0ed653d5dc 100644 --- a/go/libraries/doltcore/sqle/cluster/commithook.go +++ b/go/libraries/doltcore/sqle/cluster/commithook.go @@ -424,21 +424,21 @@ var errDetectedBrokenConfigStr = "error: more than one server was configured as // Execute on this commithook updates the target root hash we're attempting to // replicate and wakes the replication thread. -func (h *commithook) Execute(ctx context.Context, ds datas.Dataset, db datas.Database) error { +func (h *commithook) Execute(ctx context.Context, ds datas.Dataset, db datas.Database) (func(context.Context) error, error) { lgr := h.logger() lgr.Tracef("cluster/commithook: Execute called post commit") cs := datas.ChunkStoreFromDatabase(db) root, err := cs.Root(ctx) if err != nil { lgr.Errorf("cluster/commithook: Execute: error retrieving local database root: %v", err) - return err + return nil, err } h.mu.Lock() lgr = h.logger() if h.role != RolePrimary { lgr.Warnf("cluster/commithook received commit callback for a commit on %s, but we are not role primary; not replicating the commit, which is likely to be lost.", ds.ID()) h.mu.Unlock() - return nil + return nil, nil } if root != h.nextHead { lgr.Tracef("signaling replication thread to push new head: %v", root.String()) @@ -456,7 +456,7 @@ func (h *commithook) Execute(ctx context.Context, ds datas.Dataset, db datas.Dat lgr.Warnf("cluster/commithook failed to replicate write before the timeout. timeout: %d, wait result: %v", execTimeout, res) } } - return nil + return nil, nil } func (h *commithook) HandleError(ctx context.Context, err error) error { diff --git a/go/libraries/doltcore/sqle/dprocedures/dolt_checkout.go b/go/libraries/doltcore/sqle/dprocedures/dolt_checkout.go index 21ef428198..5ab0417340 100644 --- a/go/libraries/doltcore/sqle/dprocedures/dolt_checkout.go +++ b/go/libraries/doltcore/sqle/dprocedures/dolt_checkout.go @@ -168,7 +168,7 @@ func createWorkingSetForLocalBranch(ctx *sql.Context, ddb *doltdb.DoltDB, branch } ws := doltdb.EmptyWorkingSet(wsRef).WithWorkingRoot(commitRoot).WithStagedRoot(commitRoot) - return ddb.UpdateWorkingSet(ctx, wsRef, ws, hash.Hash{} /* current hash... */, doltdb.TodoWorkingSetMeta()) + return ddb.UpdateWorkingSet(ctx, wsRef, ws, hash.Hash{} /* current hash... */, doltdb.TodoWorkingSetMeta(), nil) } // getRevisionForRevisionDatabase returns the root database name and revision for a database, or just the root database name if the specified db name is not a revision database. diff --git a/go/libraries/doltcore/sqle/dsess/transactions.go b/go/libraries/doltcore/sqle/dsess/transactions.go index f3786e2379..0c1a66201b 100644 --- a/go/libraries/doltcore/sqle/dsess/transactions.go +++ b/go/libraries/doltcore/sqle/dsess/transactions.go @@ -216,7 +216,7 @@ func doltCommit(ctx *sql.Context, workingSet = workingSet.ClearMerge() - newCommit, err := tx.dbData.Ddb.CommitWithWorkingSet(ctx, headRef, tx.workingSetRef, &pending, workingSet, currHash, tx.getWorkingSetMeta(ctx)) + newCommit, err := tx.dbData.Ddb.CommitWithWorkingSet(ctx, headRef, tx.workingSetRef, &pending, workingSet, currHash, tx.getWorkingSetMeta(ctx), nil) return workingSet, newCommit, err } @@ -227,7 +227,7 @@ func txCommit(ctx *sql.Context, workingSet *doltdb.WorkingSet, hash hash.Hash, ) (*doltdb.WorkingSet, *doltdb.Commit, error) { - return workingSet, nil, tx.dbData.Ddb.UpdateWorkingSet(ctx, tx.workingSetRef, workingSet, hash, tx.getWorkingSetMeta(ctx)) + return workingSet, nil, tx.dbData.Ddb.UpdateWorkingSet(ctx, tx.workingSetRef, workingSet, hash, tx.getWorkingSetMeta(ctx), nil) } // DoltCommit commits the working set and creates a new DoltCommit as specified, in one atomic write diff --git a/go/libraries/doltcore/sqle/read_replica_database.go b/go/libraries/doltcore/sqle/read_replica_database.go index 511878bc95..beb4ae50fd 100644 --- a/go/libraries/doltcore/sqle/read_replica_database.go +++ b/go/libraries/doltcore/sqle/read_replica_database.go @@ -334,7 +334,7 @@ func pullBranchesAndUpdateWorkingSet( if commitRootHash != wsWorkingRootHash || commitRootHash != wsStagedRootHash { ws = ws.WithWorkingRoot(commitRoot).WithStagedRoot(commitRoot) - err = rrd.ddb.UpdateWorkingSet(ctx, ws.Ref(), ws, prevHash, doltdb.TodoWorkingSetMeta()) + err = rrd.ddb.UpdateWorkingSet(ctx, ws.Ref(), ws, prevHash, doltdb.TodoWorkingSetMeta(), nil) if err == nil { return nil } From 261b10c92bbd7262f4c8daf38d995d4dbb7bfde6 Mon Sep 17 00:00:00 2001 From: Aaron Son Date: Fri, 12 May 2023 16:40:42 -0700 Subject: [PATCH 03/13] go: sqle: transactions,cluster: Move block-for-cluster-replication behavior to Dolt transaction logic and out of the commit hook. --- .../doltcore/sqle/cluster/commithook.go | 67 +++++-------------- .../doltcore/sqle/cluster/controller.go | 18 ----- .../doltcore/sqle/cluster/initdbhook.go | 4 -- .../doltcore/sqle/dsess/transactions.go | 48 ++++++++++++- 4 files changed, 61 insertions(+), 76 deletions(-) diff --git a/go/libraries/doltcore/sqle/cluster/commithook.go b/go/libraries/doltcore/sqle/cluster/commithook.go index 0ed653d5dc..f49ce7123d 100644 --- a/go/libraries/doltcore/sqle/cluster/commithook.go +++ b/go/libraries/doltcore/sqle/cluster/commithook.go @@ -61,8 +61,6 @@ type commithook struct { // 4. If you read a channel out of |successChs|, that channel will be closed on the next successful replication attempt. It will not be closed before then. successChs []chan struct{} - execTimeout time.Duration - role Role // The standby replica to which the new root gets replicated. @@ -379,47 +377,6 @@ func (h *commithook) setWaitNotify(f func()) bool { return true } -type replicationResult int - -const replicationResultTimeout = 0 -const replicationResultContextCanceled = 1 -const replicationResultSuccess = 2 - -// Blocks the current goroutine until: -// 1. There is no replication necessary, i.e., isCaughtUp() == true. This returns replicationResultSuccess. -// 2. The replication of |nextHead|, or a later head, at the time this method was called succeeds. This returns replicationResultSuccess. -// 3. ctx.Done() closes. This returns replicationResultContextCanceled. -// 4. timeout passes. This returns replicationResultSuccess. -func (h *commithook) waitForReplicationSuccess(ctx context.Context, timeout time.Duration) replicationResult { - h.mu.Lock() - if h.isCaughtUp() { - h.mu.Unlock() - return replicationResultSuccess - } - if len(h.successChs) == 0 { - h.successChs = append(h.successChs, make(chan struct{})) - } - ch := h.successChs[0] - h.mu.Unlock() - select { - case <-ch: - return replicationResultSuccess - case <-ctx.Done(): - return replicationResultContextCanceled - case <-time.After(timeout): - return replicationResultTimeout - } -} - -// Set by the controller. If it is non-zero, the Execute() DatabaseHook -// callback will block the calling goroutine for that many seconds waiting for -// replication quiescence. -func (h *commithook) setExecTimeout(timeout time.Duration) { - h.mu.Lock() - h.execTimeout = timeout - h.mu.Unlock() -} - var errDetectedBrokenConfigStr = "error: more than one server was configured as primary in the same epoch. this server has stopped accepting writes. choose a primary in the cluster and call dolt_assume_cluster_role() on servers in the cluster to start replication at a higher epoch" // Execute on this commithook updates the target root hash we're attempting to @@ -434,10 +391,10 @@ func (h *commithook) Execute(ctx context.Context, ds datas.Dataset, db datas.Dat return nil, err } h.mu.Lock() + defer h.mu.Unlock() lgr = h.logger() if h.role != RolePrimary { lgr.Warnf("cluster/commithook received commit callback for a commit on %s, but we are not role primary; not replicating the commit, which is likely to be lost.", ds.ID()) - h.mu.Unlock() return nil, nil } if root != h.nextHead { @@ -447,16 +404,22 @@ func (h *commithook) Execute(ctx context.Context, ds datas.Dataset, db datas.Dat h.nextPushAttempt = time.Time{} h.cond.Signal() } - execTimeout := h.execTimeout - h.mu.Unlock() - if execTimeout != time.Duration(0) { - res := h.waitForReplicationSuccess(ctx, execTimeout) - if res != replicationResultSuccess { - // TODO: Get this failure into the *sql.Context warnings. - lgr.Warnf("cluster/commithook failed to replicate write before the timeout. timeout: %d, wait result: %v", execTimeout, res) + var waitF func(context.Context) error + if !h.isCaughtUp() { + if len(h.successChs) == 0 { + h.successChs = append(h.successChs, make(chan struct{})) + } + successCh := h.successChs[0] + waitF = func(ctx context.Context) error { + select { + case <-successCh: + return nil + case <-ctx.Done(): + return ctx.Err() + } } } - return nil, nil + return waitF, nil } func (h *commithook) HandleError(ctx context.Context, err error) error { diff --git a/go/libraries/doltcore/sqle/cluster/controller.go b/go/libraries/doltcore/sqle/cluster/controller.go index e4facd8eab..7c821aa6e0 100644 --- a/go/libraries/doltcore/sqle/cluster/controller.go +++ b/go/libraries/doltcore/sqle/cluster/controller.go @@ -176,20 +176,6 @@ func (c *Controller) ManageSystemVariables(variables sqlvars) { c.mu.Lock() defer c.mu.Unlock() c.systemVars = variables - - // We reset this system variable here to put our NotifyChanged on it. - v, _, ok := variables.GetGlobal(dsess.DoltClusterAckWritesTimeoutSecs) - if !ok { - panic(fmt.Sprintf("internal error: did not find required global system variable %s", dsess.DoltClusterAckWritesTimeoutSecs)) - } - v.NotifyChanged = func(scope sql.SystemVariableScope, v sql.SystemVarValue) { - c.mu.Lock() - defer c.mu.Unlock() - for _, hook := range c.commithooks { - hook.setExecTimeout(time.Duration(v.Val.(int64)) * time.Second) - } - } - variables.AddSystemVariables([]sql.SystemVariable{v}) c.refreshSystemVars() } @@ -209,10 +195,6 @@ func (c *Controller) ApplyStandbyReplicationConfig(ctx context.Context, bt *sql. if err != nil { return err } - _, execTimeoutVal, _ := c.systemVars.GetGlobal(dsess.DoltClusterAckWritesTimeoutSecs) - for _, h := range hooks { - h.setExecTimeout(time.Duration(execTimeoutVal.(int64)) * time.Second) - } c.commithooks = append(c.commithooks, hooks...) } return nil diff --git a/go/libraries/doltcore/sqle/cluster/initdbhook.go b/go/libraries/doltcore/sqle/cluster/initdbhook.go index 588f35a235..7713c91540 100644 --- a/go/libraries/doltcore/sqle/cluster/initdbhook.go +++ b/go/libraries/doltcore/sqle/cluster/initdbhook.go @@ -17,7 +17,6 @@ package cluster import ( "context" "strings" - "time" "github.com/dolthub/go-mysql-server/sql" @@ -58,8 +57,6 @@ func NewInitDatabaseHook(controller *Controller, bt *sql.BackgroundThreads, orig }) } - _, execTimeoutVal, _ := controller.systemVars.GetGlobal(dsess.DoltClusterAckWritesTimeoutSecs) - role, _ := controller.roleAndEpoch() for i, r := range controller.cfg.StandbyRemotes() { ttfdir, err := denv.TempTableFilesDir() @@ -67,7 +64,6 @@ func NewInitDatabaseHook(controller *Controller, bt *sql.BackgroundThreads, orig return err } commitHook := newCommitHook(controller.lgr, r.Name(), name, role, remoteDBs[i], denv.DoltDB, ttfdir) - commitHook.setExecTimeout(time.Duration(execTimeoutVal.(int64)) * time.Second) denv.DoltDB.PrependCommitHook(ctx, commitHook) controller.registerCommitHook(commitHook) if err := commitHook.Run(bt); err != nil { diff --git a/go/libraries/doltcore/sqle/dsess/transactions.go b/go/libraries/doltcore/sqle/dsess/transactions.go index 0c1a66201b..3234ec9e5a 100644 --- a/go/libraries/doltcore/sqle/dsess/transactions.go +++ b/go/libraries/doltcore/sqle/dsess/transactions.go @@ -15,6 +15,7 @@ package dsess import ( + "context" "errors" "fmt" "strings" @@ -216,7 +217,9 @@ func doltCommit(ctx *sql.Context, workingSet = workingSet.ClearMerge() - newCommit, err := tx.dbData.Ddb.CommitWithWorkingSet(ctx, headRef, tx.workingSetRef, &pending, workingSet, currHash, tx.getWorkingSetMeta(ctx), nil) + var rsc doltdb.ReplicationStatusController + newCommit, err := tx.dbData.Ddb.CommitWithWorkingSet(ctx, headRef, tx.workingSetRef, &pending, workingSet, currHash, tx.getWorkingSetMeta(ctx), &rsc) + waitForReplicationController(ctx, rsc) return workingSet, newCommit, err } @@ -227,7 +230,10 @@ func txCommit(ctx *sql.Context, workingSet *doltdb.WorkingSet, hash hash.Hash, ) (*doltdb.WorkingSet, *doltdb.Commit, error) { - return workingSet, nil, tx.dbData.Ddb.UpdateWorkingSet(ctx, tx.workingSetRef, workingSet, hash, tx.getWorkingSetMeta(ctx), nil) + var rsc doltdb.ReplicationStatusController + err := tx.dbData.Ddb.UpdateWorkingSet(ctx, tx.workingSetRef, workingSet, hash, tx.getWorkingSetMeta(ctx), &rsc) + waitForReplicationController(ctx, rsc) + return workingSet, nil, err } // DoltCommit commits the working set and creates a new DoltCommit as specified, in one atomic write @@ -235,6 +241,44 @@ func (tx *DoltTransaction) DoltCommit(ctx *sql.Context, workingSet *doltdb.Worki return tx.doCommit(ctx, workingSet, commit, doltCommit) } +func waitForReplicationController(ctx *sql.Context, rsc doltdb.ReplicationStatusController) { + if len(rsc.Wait) == 0 { + return + } + _, timeout, ok := sql.SystemVariables.GetGlobal(DoltClusterAckWritesTimeoutSecs) + if !ok { + return + } + timeoutI := timeout.(int64) + if timeoutI == 0 { + return + } + + cCtx, cancel := context.WithCancel(ctx) + defer cancel() + var wg sync.WaitGroup + wg.Add(len(rsc.Wait)) + for _, f := range rsc.Wait { + f := f + go func() error { + defer wg.Done() + return f(cCtx) + }() + } + + done := make(chan struct{}) + go func() { + wg.Wait() + close(done) + }() + + select { + case <-time.After(time.Duration(timeoutI) * time.Second): + // TODO: Error, warning, something... + case <-done: + } +} + // doCommit commits this transaction with the write function provided. It takes the same params as DoltCommit func (tx *DoltTransaction) doCommit( ctx *sql.Context, From 2e4107fe22948a383ec68872b0281dc32458dd84 Mon Sep 17 00:00:00 2001 From: reltuk Date: Fri, 12 May 2023 23:50:15 +0000 Subject: [PATCH 04/13] [ga-format-pr] Run go/utils/repofmt/format_repo.sh and go/Godeps/update.sh --- go/libraries/doltcore/doltdb/hooksdatabase.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go/libraries/doltcore/doltdb/hooksdatabase.go b/go/libraries/doltcore/doltdb/hooksdatabase.go index 656d666251..96bf911e3e 100644 --- a/go/libraries/doltcore/doltdb/hooksdatabase.go +++ b/go/libraries/doltcore/doltdb/hooksdatabase.go @@ -25,7 +25,7 @@ import ( "sync" ) -type replicaStateContextKey struct{ +type replicaStateContextKey struct { } func withReplicaState(ctx context.Context, c *ReplicationStatusController) context.Context { From 535e59141d9a75af63e4ef8097c88957d68df548 Mon Sep 17 00:00:00 2001 From: Aaron Son Date: Sat, 13 May 2023 07:59:39 -0700 Subject: [PATCH 05/13] integration-tests/go-sql-server-driver: Cluster, dolt_cluster_ack_writes_timeout_secs, add a failing test to assert that dolt_branch modifications are waited on. --- .../tests/sql-server-cluster.yaml | 45 +++++++++++++++++++ 1 file changed, 45 insertions(+) diff --git a/integration-tests/go-sql-server-driver/tests/sql-server-cluster.yaml b/integration-tests/go-sql-server-driver/tests/sql-server-cluster.yaml index c8a75205d5..48d9892d97 100644 --- a/integration-tests/go-sql-server-driver/tests/sql-server-cluster.yaml +++ b/integration-tests/go-sql-server-driver/tests/sql-server-cluster.yaml @@ -1098,6 +1098,51 @@ tests: result: columns: ["COUNT(*)"] rows: [["20"]] + # Assert that branch creation and deletion also blocks on replication. + - on: server1 + queries: + - exec: 'USE repo1' + - exec: 'CALL DOLT_BRANCH("new_branch")' + - on: server2 + queries: + - exec: 'USE repo1' + - query: 'SELECT COUNT(*) FROM dolt_branches' + result: + columns: ["COUNT(*)"] + rows: [["2"]] + - on: server1 + queries: + - exec: 'USE repo1' + - exec: 'CALL DOLT_BRANCH("-d", "new_branch")' + - on: server2 + queries: + - exec: 'USE repo1' + - query: 'SELECT COUNT(*) FROM dolt_branches' + result: + columns: ["COUNT(*)"] + rows: [["1"]] + - on: server1 + queries: + - exec: 'USE repo1' + - exec: 'CALL DOLT_BRANCH("new_branch")' + - on: server2 + queries: + - exec: 'USE repo1' + - query: 'SELECT COUNT(*) FROM dolt_branches' + result: + columns: ["COUNT(*)"] + rows: [["2"]] + - on: server1 + queries: + - exec: 'USE repo1' + - exec: 'CALL DOLT_BRANCH("-d", "new_branch")' + - on: server2 + queries: + - exec: 'USE repo1' + - query: 'SELECT COUNT(*) FROM dolt_branches' + result: + columns: ["COUNT(*)"] + rows: [["1"]] - name: call dolt checkout multi_repos: - name: server1 From c1fedfc4577c078bb0586e9f22e0a79d7ea79116 Mon Sep 17 00:00:00 2001 From: Aaron Son Date: Mon, 15 May 2023 10:20:12 -0700 Subject: [PATCH 06/13] go/libraries/doltcore/sqle: dprocedures: dolt_branch: Make dolt branch operations (and some related operations, like dolt checkout) able to block on cluster replication. --- go/cmd/dolt/commands/branch.go | 6 +-- go/cmd/dolt/commands/checkout.go | 2 +- go/libraries/doltcore/doltdb/doltdb.go | 9 +++-- go/libraries/doltcore/env/actions/branch.go | 34 +++++++++-------- go/libraries/doltcore/env/actions/clone.go | 2 +- go/libraries/doltcore/env/actions/remotes.go | 4 +- go/libraries/doltcore/env/environment.go | 2 +- go/libraries/doltcore/migrate/progress.go | 2 +- go/libraries/doltcore/rebase/rebase.go | 2 +- .../doltcore/sqle/dprocedures/dolt_branch.go | 38 +++++++++++-------- .../sqle/dprocedures/dolt_checkout.go | 16 +++++--- .../doltcore/sqle/dprocedures/dolt_remote.go | 12 ++++-- .../doltcore/sqle/dsess/transactions.go | 6 +-- .../doltcore/sqle/read_replica_database.go | 6 +-- 14 files changed, 80 insertions(+), 61 deletions(-) diff --git a/go/cmd/dolt/commands/branch.go b/go/cmd/dolt/commands/branch.go index d1ba4b1856..a63f6d99a6 100644 --- a/go/cmd/dolt/commands/branch.go +++ b/go/cmd/dolt/commands/branch.go @@ -245,7 +245,7 @@ func moveBranch(ctx context.Context, dEnv *env.DoltEnv, apr *argparser.ArgParseR force := apr.Contains(cli.ForceFlag) src := apr.Arg(0) dest := apr.Arg(1) - err := actions.RenameBranch(ctx, dEnv.DbData(), src, apr.Arg(1), dEnv, force) + err := actions.RenameBranch(ctx, dEnv.DbData(), src, apr.Arg(1), dEnv, force, nil) var verr errhand.VerboseError if err != nil { @@ -306,7 +306,7 @@ func deleteBranches(ctx context.Context, dEnv *env.DoltEnv, apr *argparser.ArgPa err := actions.DeleteBranch(ctx, dEnv.DbData(), brName, actions.DeleteOptions{ Force: force, Remote: apr.Contains(cli.RemoteParam), - }, dEnv) + }, dEnv, nil) if err != nil { var verr errhand.VerboseError @@ -379,7 +379,7 @@ func createBranch(ctx context.Context, dEnv *env.DoltEnv, apr *argparser.ArgPars } } - err := actions.CreateBranchWithStartPt(ctx, dEnv.DbData(), newBranch, startPt, apr.Contains(cli.ForceFlag)) + err := actions.CreateBranchWithStartPt(ctx, dEnv.DbData(), newBranch, startPt, apr.Contains(cli.ForceFlag), nil) if err != nil { return HandleVErrAndExitCode(errhand.BuildDError(err.Error()).Build(), usage) } diff --git a/go/cmd/dolt/commands/checkout.go b/go/cmd/dolt/commands/checkout.go index 59a2d3f206..5badb3d5e7 100644 --- a/go/cmd/dolt/commands/checkout.go +++ b/go/cmd/dolt/commands/checkout.go @@ -238,7 +238,7 @@ func checkoutRemoteBranchOrSuggestNew(ctx context.Context, dEnv *env.DoltEnv, na } func checkoutNewBranchFromStartPt(ctx context.Context, dEnv *env.DoltEnv, newBranch, startPt string) errhand.VerboseError { - err := actions.CreateBranchWithStartPt(ctx, dEnv.DbData(), newBranch, startPt, false) + err := actions.CreateBranchWithStartPt(ctx, dEnv.DbData(), newBranch, startPt, false, nil) if err != nil { return errhand.BuildDError(err.Error()).Build() } diff --git a/go/libraries/doltcore/doltdb/doltdb.go b/go/libraries/doltcore/doltdb/doltdb.go index 56a2fc1e54..a7d81b508d 100644 --- a/go/libraries/doltcore/doltdb/doltdb.go +++ b/go/libraries/doltcore/doltdb/doltdb.go @@ -1079,7 +1079,7 @@ func (ddb *DoltDB) GetRefsOfTypeByNomsRoot(ctx context.Context, refTypeFilter ma // NewBranchAtCommit creates a new branch with HEAD at the commit given. Branch names must pass IsValidUserBranchName. // Silently overwrites any existing branch with the same name given, if one exists. -func (ddb *DoltDB) NewBranchAtCommit(ctx context.Context, branchRef ref.DoltRef, commit *Commit) error { +func (ddb *DoltDB) NewBranchAtCommit(ctx context.Context, branchRef ref.DoltRef, commit *Commit, replicationStatus *ReplicationStatusController) error { if !IsValidBranchRef(branchRef) { panic(fmt.Sprintf("invalid branch name %s, use IsValidUserBranchName check", branchRef.String())) } @@ -1124,7 +1124,7 @@ func (ddb *DoltDB) NewBranchAtCommit(ctx context.Context, branchRef ref.DoltRef, } ws = ws.WithWorkingRoot(commitRoot).WithStagedRoot(commitRoot) - return ddb.UpdateWorkingSet(ctx, wsRef, ws, currWsHash, TodoWorkingSetMeta(), nil) + return ddb.UpdateWorkingSet(ctx, wsRef, ws, currWsHash, TodoWorkingSetMeta(), replicationStatus) } // CopyWorkingSet copies a WorkingSetRef from one ref to another. If `force` is @@ -1159,8 +1159,9 @@ func (ddb *DoltDB) CopyWorkingSet(ctx context.Context, fromWSRef ref.WorkingSetR } // DeleteBranch deletes the branch given, returning an error if it doesn't exist. -func (ddb *DoltDB) DeleteBranch(ctx context.Context, branch ref.DoltRef) error { - return ddb.deleteRef(ctx, branch) +func (ddb *DoltDB) DeleteBranch(ctx context.Context, branch ref.DoltRef, replicationStatus *ReplicationStatusController) error { + rsCtx := withReplicaState(ctx, replicationStatus) + return ddb.deleteRef(rsCtx, branch) } func (ddb *DoltDB) deleteRef(ctx context.Context, dref ref.DoltRef) error { diff --git a/go/libraries/doltcore/env/actions/branch.go b/go/libraries/doltcore/env/actions/branch.go index 313bdf7004..0c182cce86 100644 --- a/go/libraries/doltcore/env/actions/branch.go +++ b/go/libraries/doltcore/env/actions/branch.go @@ -31,11 +31,13 @@ var ErrCOBranchDelete = errors.New("attempted to delete checked out branch") var ErrUnmergedBranch = errors.New("branch is not fully merged") var ErrWorkingSetsOnBothBranches = errors.New("checkout would overwrite uncommitted changes on target branch") -func RenameBranch(ctx context.Context, dbData env.DbData, oldBranch, newBranch string, remoteDbPro env.RemoteDbProvider, force bool) error { +func RenameBranch(ctx context.Context, dbData env.DbData, oldBranch, newBranch string, remoteDbPro env.RemoteDbProvider, force bool, rsc *doltdb.ReplicationStatusController) error { oldRef := ref.NewBranchRef(oldBranch) newRef := ref.NewBranchRef(newBranch) - err := CopyBranchOnDB(ctx, dbData.Ddb, oldBranch, newBranch, force) + // TODO: This function smears the branch updates across multiple commits of the datas.Database. + + err := CopyBranchOnDB(ctx, dbData.Ddb, oldBranch, newBranch, force, rsc) if err != nil { return err } @@ -66,14 +68,14 @@ func RenameBranch(ctx context.Context, dbData env.DbData, oldBranch, newBranch s } } - return DeleteBranch(ctx, dbData, oldBranch, DeleteOptions{Force: true}, remoteDbPro) + return DeleteBranch(ctx, dbData, oldBranch, DeleteOptions{Force: true}, remoteDbPro, rsc) } func CopyBranch(ctx context.Context, dEnv *env.DoltEnv, oldBranch, newBranch string, force bool) error { - return CopyBranchOnDB(ctx, dEnv.DoltDB, oldBranch, newBranch, force) + return CopyBranchOnDB(ctx, dEnv.DoltDB, oldBranch, newBranch, force, nil) } -func CopyBranchOnDB(ctx context.Context, ddb *doltdb.DoltDB, oldBranch, newBranch string, force bool) error { +func CopyBranchOnDB(ctx context.Context, ddb *doltdb.DoltDB, oldBranch, newBranch string, force bool, rsc *doltdb.ReplicationStatusController) error { oldRef := ref.NewBranchRef(oldBranch) newRef := ref.NewBranchRef(newBranch) @@ -104,7 +106,7 @@ func CopyBranchOnDB(ctx context.Context, ddb *doltdb.DoltDB, oldBranch, newBranc return err } - return ddb.NewBranchAtCommit(ctx, newRef, cm) + return ddb.NewBranchAtCommit(ctx, newRef, cm, rsc) } type DeleteOptions struct { @@ -112,7 +114,7 @@ type DeleteOptions struct { Remote bool } -func DeleteBranch(ctx context.Context, dbData env.DbData, brName string, opts DeleteOptions, remoteDbPro env.RemoteDbProvider) error { +func DeleteBranch(ctx context.Context, dbData env.DbData, brName string, opts DeleteOptions, remoteDbPro env.RemoteDbProvider, rsc *doltdb.ReplicationStatusController) error { var branchRef ref.DoltRef if opts.Remote { var err error @@ -127,10 +129,10 @@ func DeleteBranch(ctx context.Context, dbData env.DbData, brName string, opts De } } - return DeleteBranchOnDB(ctx, dbData, branchRef, opts, remoteDbPro) + return DeleteBranchOnDB(ctx, dbData, branchRef, opts, remoteDbPro, rsc) } -func DeleteBranchOnDB(ctx context.Context, dbdata env.DbData, branchRef ref.DoltRef, opts DeleteOptions, pro env.RemoteDbProvider) error { +func DeleteBranchOnDB(ctx context.Context, dbdata env.DbData, branchRef ref.DoltRef, opts DeleteOptions, pro env.RemoteDbProvider, rsc *doltdb.ReplicationStatusController) error { ddb := dbdata.Ddb hasRef, err := ddb.HasRef(ctx, branchRef) @@ -173,7 +175,7 @@ func DeleteBranchOnDB(ctx context.Context, dbdata env.DbData, branchRef ref.Dolt } } - return ddb.DeleteBranch(ctx, branchRef) + return ddb.DeleteBranch(ctx, branchRef, rsc) } // validateBranchMergedIntoCurrentWorkingBranch returns an error if the given branch is not fully merged into the HEAD of the current branch. @@ -267,8 +269,8 @@ func validateBranchMergedIntoUpstream(ctx context.Context, dbdata env.DbData, br return nil } -func CreateBranchWithStartPt(ctx context.Context, dbData env.DbData, newBranch, startPt string, force bool) error { - err := createBranch(ctx, dbData, newBranch, startPt, force) +func CreateBranchWithStartPt(ctx context.Context, dbData env.DbData, newBranch, startPt string, force bool, rsc *doltdb.ReplicationStatusController) error { + err := createBranch(ctx, dbData, newBranch, startPt, force, rsc) if err != nil { if err == ErrAlreadyExists { @@ -289,7 +291,7 @@ func CreateBranchWithStartPt(ctx context.Context, dbData env.DbData, newBranch, return nil } -func CreateBranchOnDB(ctx context.Context, ddb *doltdb.DoltDB, newBranch, startingPoint string, force bool, headRef ref.DoltRef) error { +func CreateBranchOnDB(ctx context.Context, ddb *doltdb.DoltDB, newBranch, startingPoint string, force bool, headRef ref.DoltRef, rsc *doltdb.ReplicationStatusController) error { branchRef := ref.NewBranchRef(newBranch) hasRef, err := ddb.HasRef(ctx, branchRef) if err != nil { @@ -314,7 +316,7 @@ func CreateBranchOnDB(ctx context.Context, ddb *doltdb.DoltDB, newBranch, starti return err } - err = ddb.NewBranchAtCommit(ctx, branchRef, cm) + err = ddb.NewBranchAtCommit(ctx, branchRef, cm, rsc) if err != nil { return err } @@ -322,8 +324,8 @@ func CreateBranchOnDB(ctx context.Context, ddb *doltdb.DoltDB, newBranch, starti return nil } -func createBranch(ctx context.Context, dbData env.DbData, newBranch, startingPoint string, force bool) error { - return CreateBranchOnDB(ctx, dbData.Ddb, newBranch, startingPoint, force, dbData.Rsr.CWBHeadRef()) +func createBranch(ctx context.Context, dbData env.DbData, newBranch, startingPoint string, force bool, rsc *doltdb.ReplicationStatusController) error { + return CreateBranchOnDB(ctx, dbData.Ddb, newBranch, startingPoint, force, dbData.Rsr.CWBHeadRef(), rsc) } var emptyHash = hash.Hash{} diff --git a/go/libraries/doltcore/env/actions/clone.go b/go/libraries/doltcore/env/actions/clone.go index a6907d1c60..779a4027b5 100644 --- a/go/libraries/doltcore/env/actions/clone.go +++ b/go/libraries/doltcore/env/actions/clone.go @@ -230,7 +230,7 @@ func CloneRemote(ctx context.Context, srcDB *doltdb.DoltDB, remoteName, branch s } if brnch.GetPath() != branch { - err := dEnv.DoltDB.DeleteBranch(ctx, brnch) + err := dEnv.DoltDB.DeleteBranch(ctx, brnch, nil) if err != nil { return fmt.Errorf("%w: %s; %s", ErrFailedToDeleteBranch, brnch.String(), err.Error()) } diff --git a/go/libraries/doltcore/env/actions/remotes.go b/go/libraries/doltcore/env/actions/remotes.go index 21dab4f914..179e8efeef 100644 --- a/go/libraries/doltcore/env/actions/remotes.go +++ b/go/libraries/doltcore/env/actions/remotes.go @@ -220,14 +220,14 @@ func DeleteRemoteBranch(ctx context.Context, targetRef ref.BranchRef, remoteRef } if hasRef { - err = remoteDB.DeleteBranch(ctx, targetRef) + err = remoteDB.DeleteBranch(ctx, targetRef, nil) } if err != nil { return err } - err = localDB.DeleteBranch(ctx, remoteRef) + err = localDB.DeleteBranch(ctx, remoteRef, nil) if err != nil { return err diff --git a/go/libraries/doltcore/env/environment.go b/go/libraries/doltcore/env/environment.go index 4eca2b6a9a..3fb3a4e562 100644 --- a/go/libraries/doltcore/env/environment.go +++ b/go/libraries/doltcore/env/environment.go @@ -911,7 +911,7 @@ func (dEnv *DoltEnv) RemoveRemote(ctx context.Context, name string) error { rr := r.(ref.RemoteRef) if rr.GetRemote() == remote.Name { - err = ddb.DeleteBranch(ctx, rr) + err = ddb.DeleteBranch(ctx, rr, nil) if err != nil { return fmt.Errorf("%w; failed to delete remote tracking ref '%s'; %s", ErrFailedToDeleteRemote, rr.String(), err.Error()) diff --git a/go/libraries/doltcore/migrate/progress.go b/go/libraries/doltcore/migrate/progress.go index 26592dee8c..7d9656bc57 100644 --- a/go/libraries/doltcore/migrate/progress.go +++ b/go/libraries/doltcore/migrate/progress.go @@ -182,7 +182,7 @@ func persistMigratedCommitMapping(ctx context.Context, ddb *doltdb.DoltDB, mappi } br := ref.NewBranchRef(MigratedCommitsBranch) - err = ddb.NewBranchAtCommit(ctx, br, init) + err = ddb.NewBranchAtCommit(ctx, br, init, nil) if err != nil { return err } diff --git a/go/libraries/doltcore/rebase/rebase.go b/go/libraries/doltcore/rebase/rebase.go index 5ade41afba..3cad66c404 100644 --- a/go/libraries/doltcore/rebase/rebase.go +++ b/go/libraries/doltcore/rebase/rebase.go @@ -152,7 +152,7 @@ func rebaseRefs(ctx context.Context, dbData env.DbData, replay ReplayCommitFn, n for i, r := range refs { switch dRef := r.(type) { case ref.BranchRef: - err = ddb.NewBranchAtCommit(ctx, dRef, newHeads[i]) + err = ddb.NewBranchAtCommit(ctx, dRef, newHeads[i], nil) case ref.TagRef: // rewrite tag with new commit diff --git a/go/libraries/doltcore/sqle/dprocedures/dolt_branch.go b/go/libraries/doltcore/sqle/dprocedures/dolt_branch.go index 3d0c7f2776..5002f8d872 100644 --- a/go/libraries/doltcore/sqle/dprocedures/dolt_branch.go +++ b/go/libraries/doltcore/sqle/dprocedures/dolt_branch.go @@ -64,30 +64,36 @@ func doDoltBranch(ctx *sql.Context, args []string) (int, error) { return 1, fmt.Errorf("Could not load database %s", dbName) } + var rsc doltdb.ReplicationStatusController + switch { case apr.Contains(cli.CopyFlag): - err = copyBranch(ctx, dbData, apr) + err = copyBranch(ctx, dbData, apr, &rsc) case apr.Contains(cli.MoveFlag): - err = renameBranch(ctx, dbData, apr, dSess, dbName) + err = renameBranch(ctx, dbData, apr, dSess, dbName, &rsc) case apr.Contains(cli.DeleteFlag), apr.Contains(cli.DeleteForceFlag): - err = deleteBranches(ctx, dbData, apr, dSess, dbName) + err = deleteBranches(ctx, dbData, apr, dSess, dbName, &rsc) default: - err = createNewBranch(ctx, dbData, apr) + err = createNewBranch(ctx, dbData, apr, &rsc) } if err != nil { return 1, err } else { - return 0, commitTransaction(ctx, dSess) + return 0, commitTransaction(ctx, dSess, &rsc) } } -func commitTransaction(ctx *sql.Context, dSess *dsess.DoltSession) error { +func commitTransaction(ctx *sql.Context, dSess *dsess.DoltSession, rsc *doltdb.ReplicationStatusController) error { err := dSess.CommitTransaction(ctx, ctx.GetTransaction()) if err != nil { return err } + if rsc != nil { + dsess.WaitForReplicationController(ctx, *rsc) + } + // Because this transaction manipulation is happening outside the engine's awareness, we need to set it to nil here // to get a fresh transaction started on the next statement. // TODO: put this under engine control @@ -97,7 +103,7 @@ func commitTransaction(ctx *sql.Context, dSess *dsess.DoltSession) error { // renameBranch takes DoltSession and database name to try accessing file system for dolt database. // If the oldBranch being renamed is the current branch on CLI, then RepoState head will be updated with the newBranch ref. -func renameBranch(ctx *sql.Context, dbData env.DbData, apr *argparser.ArgParseResults, sess *dsess.DoltSession, dbName string) error { +func renameBranch(ctx *sql.Context, dbData env.DbData, apr *argparser.ArgParseResults, sess *dsess.DoltSession, dbName string, rsc *doltdb.ReplicationStatusController) error { if apr.NArg() != 2 { return InvalidArgErr } @@ -124,7 +130,7 @@ func renameBranch(ctx *sql.Context, dbData env.DbData, apr *argparser.ArgParseRe return err } - err := actions.RenameBranch(ctx, dbData, oldBranchName, newBranchName, sess.Provider(), force) + err := actions.RenameBranch(ctx, dbData, oldBranchName, newBranchName, sess.Provider(), force, rsc) if err != nil { return err } @@ -150,7 +156,7 @@ func renameBranch(ctx *sql.Context, dbData env.DbData, apr *argparser.ArgParseRe // deleteBranches takes DoltSession and database name to try accessing file system for dolt database. // If the database is not session state db and the branch being deleted is the current branch on CLI, it will update // the RepoState to set head as empty branchRef. -func deleteBranches(ctx *sql.Context, dbData env.DbData, apr *argparser.ArgParseResults, sess *dsess.DoltSession, dbName string) error { +func deleteBranches(ctx *sql.Context, dbData env.DbData, apr *argparser.ArgParseResults, sess *dsess.DoltSession, dbName string, rsc *doltdb.ReplicationStatusController) error { if apr.NArg() == 0 { return InvalidArgErr } @@ -194,7 +200,7 @@ func deleteBranches(ctx *sql.Context, dbData env.DbData, apr *argparser.ArgParse err = actions.DeleteBranch(ctx, dbData, branchName, actions.DeleteOptions{ Force: force, - }, dSess.Provider()) + }, dSess.Provider(), rsc) if err != nil { return err } @@ -274,7 +280,7 @@ func loadConfig(ctx *sql.Context) *env.DoltCliConfig { return dEnv.Config } -func createNewBranch(ctx *sql.Context, dbData env.DbData, apr *argparser.ArgParseResults) error { +func createNewBranch(ctx *sql.Context, dbData env.DbData, apr *argparser.ArgParseResults, rsc *doltdb.ReplicationStatusController) error { if apr.NArg() == 0 || apr.NArg() > 2 { return InvalidArgErr } @@ -332,7 +338,7 @@ func createNewBranch(ctx *sql.Context, dbData env.DbData, apr *argparser.ArgPars return err } - err = actions.CreateBranchWithStartPt(ctx, dbData, branchName, startPt, apr.Contains(cli.ForceFlag)) + err = actions.CreateBranchWithStartPt(ctx, dbData, branchName, startPt, apr.Contains(cli.ForceFlag), rsc) if err != nil { return err } @@ -348,7 +354,7 @@ func createNewBranch(ctx *sql.Context, dbData env.DbData, apr *argparser.ArgPars return nil } -func copyBranch(ctx *sql.Context, dbData env.DbData, apr *argparser.ArgParseResults) error { +func copyBranch(ctx *sql.Context, dbData env.DbData, apr *argparser.ArgParseResults, rsc *doltdb.ReplicationStatusController) error { if apr.NArg() != 2 { return InvalidArgErr } @@ -364,10 +370,10 @@ func copyBranch(ctx *sql.Context, dbData env.DbData, apr *argparser.ArgParseResu } force := apr.Contains(cli.ForceFlag) - return copyABranch(ctx, dbData, srcBr, destBr, force) + return copyABranch(ctx, dbData, srcBr, destBr, force, rsc) } -func copyABranch(ctx *sql.Context, dbData env.DbData, srcBr string, destBr string, force bool) error { +func copyABranch(ctx *sql.Context, dbData env.DbData, srcBr string, destBr string, force bool, rsc *doltdb.ReplicationStatusController) error { if err := branch_control.CanCreateBranch(ctx, destBr); err != nil { return err } @@ -378,7 +384,7 @@ func copyABranch(ctx *sql.Context, dbData env.DbData, srcBr string, destBr strin return err } } - err := actions.CopyBranchOnDB(ctx, dbData.Ddb, srcBr, destBr, force) + err := actions.CopyBranchOnDB(ctx, dbData.Ddb, srcBr, destBr, force, rsc) if err != nil { if err == doltdb.ErrBranchNotFound { return errors.New(fmt.Sprintf("fatal: A branch named '%s' not found", srcBr)) diff --git a/go/libraries/doltcore/sqle/dprocedures/dolt_checkout.go b/go/libraries/doltcore/sqle/dprocedures/dolt_checkout.go index 5ab0417340..4e851e18fc 100644 --- a/go/libraries/doltcore/sqle/dprocedures/dolt_checkout.go +++ b/go/libraries/doltcore/sqle/dprocedures/dolt_checkout.go @@ -71,9 +71,11 @@ func doDoltCheckout(ctx *sql.Context, args []string) (int, error) { return 1, fmt.Errorf("Could not load database %s", currentDbName) } + var rsc doltdb.ReplicationStatusController + // Checking out new branch. if branchOrTrack { - err = checkoutNewBranch(ctx, dbName, dbData, apr) + err = checkoutNewBranch(ctx, dbName, dbData, apr, &rsc) if err != nil { return 1, err } else { @@ -121,13 +123,15 @@ func doDoltCheckout(ctx *sql.Context, args []string) (int, error) { err = checkoutTables(ctx, roots, dbName, args) if err != nil && apr.NArg() == 1 { - err = checkoutRemoteBranch(ctx, dbName, dbData, branchName, apr) + err = checkoutRemoteBranch(ctx, dbName, dbData, branchName, apr, &rsc) } if err != nil { return 1, err } + dsess.WaitForReplicationController(ctx, rsc) + return 0, nil } @@ -196,7 +200,7 @@ func getRevisionForRevisionDatabase(ctx *sql.Context, dbName string) (string, st // checkoutRemoteBranch checks out a remote branch creating a new local branch with the same name as the remote branch // and set its upstream. The upstream persists out of sql session. -func checkoutRemoteBranch(ctx *sql.Context, dbName string, dbData env.DbData, branchName string, apr *argparser.ArgParseResults) error { +func checkoutRemoteBranch(ctx *sql.Context, dbName string, dbData env.DbData, branchName string, apr *argparser.ArgParseResults, rsc *doltdb.ReplicationStatusController) error { remoteRefs, err := actions.GetRemoteBranchRef(ctx, dbData.Ddb, branchName) if err != nil { return errors.New("fatal: unable to read from data repository") @@ -206,7 +210,7 @@ func checkoutRemoteBranch(ctx *sql.Context, dbName string, dbData env.DbData, br return fmt.Errorf("error: could not find %s", branchName) } else if len(remoteRefs) == 1 { remoteRef := remoteRefs[0] - err = actions.CreateBranchWithStartPt(ctx, dbData, branchName, remoteRef.String(), false) + err = actions.CreateBranchWithStartPt(ctx, dbData, branchName, remoteRef.String(), false, rsc) if err != nil { return err } @@ -226,7 +230,7 @@ func checkoutRemoteBranch(ctx *sql.Context, dbName string, dbData env.DbData, br } } -func checkoutNewBranch(ctx *sql.Context, dbName string, dbData env.DbData, apr *argparser.ArgParseResults) error { +func checkoutNewBranch(ctx *sql.Context, dbName string, dbData env.DbData, apr *argparser.ArgParseResults, rsc *doltdb.ReplicationStatusController) error { var newBranchName string var remoteName, remoteBranchName string var startPt = "head" @@ -259,7 +263,7 @@ func checkoutNewBranch(ctx *sql.Context, dbName string, dbData env.DbData, apr * newBranchName = newBranch } - err = actions.CreateBranchWithStartPt(ctx, dbData, newBranchName, startPt, false) + err = actions.CreateBranchWithStartPt(ctx, dbData, newBranchName, startPt, false, rsc) if err != nil { return err } diff --git a/go/libraries/doltcore/sqle/dprocedures/dolt_remote.go b/go/libraries/doltcore/sqle/dprocedures/dolt_remote.go index aa1b59840d..2e6d32451e 100644 --- a/go/libraries/doltcore/sqle/dprocedures/dolt_remote.go +++ b/go/libraries/doltcore/sqle/dprocedures/dolt_remote.go @@ -24,6 +24,7 @@ import ( "github.com/dolthub/dolt/go/cmd/dolt/errhand" "github.com/dolthub/dolt/go/libraries/doltcore/branch_control" "github.com/dolthub/dolt/go/libraries/doltcore/dbfactory" + "github.com/dolthub/dolt/go/libraries/doltcore/doltdb" "github.com/dolthub/dolt/go/libraries/doltcore/env" "github.com/dolthub/dolt/go/libraries/doltcore/ref" "github.com/dolthub/dolt/go/libraries/doltcore/sqle/dsess" @@ -65,11 +66,13 @@ func doDoltRemote(ctx *sql.Context, args []string) (int, error) { return 1, fmt.Errorf("error: invalid argument, use 'dolt_remotes' system table to list remotes") } + var rsc doltdb.ReplicationStatusController + switch apr.Arg(0) { case "add": err = addRemote(ctx, dbName, dbData, apr, dSess) case "remove", "rm": - err = removeRemote(ctx, dbData, apr) + err = removeRemote(ctx, dbData, apr, &rsc) default: err = fmt.Errorf("error: invalid argument") } @@ -77,6 +80,9 @@ func doDoltRemote(ctx *sql.Context, args []string) (int, error) { if err != nil { return 1, err } + + dsess.WaitForReplicationController(ctx, rsc) + return 0, nil } @@ -106,7 +112,7 @@ func addRemote(_ *sql.Context, dbName string, dbd env.DbData, apr *argparser.Arg return dbd.Rsw.AddRemote(r) } -func removeRemote(ctx *sql.Context, dbd env.DbData, apr *argparser.ArgParseResults) error { +func removeRemote(ctx *sql.Context, dbd env.DbData, apr *argparser.ArgParseResults, rsc *doltdb.ReplicationStatusController) error { if apr.NArg() != 2 { return fmt.Errorf("error: invalid argument") } @@ -133,7 +139,7 @@ func removeRemote(ctx *sql.Context, dbd env.DbData, apr *argparser.ArgParseResul rr := r.(ref.RemoteRef) if rr.GetRemote() == remote.Name { - err = ddb.DeleteBranch(ctx, rr) + err = ddb.DeleteBranch(ctx, rr, rsc) if err != nil { return fmt.Errorf("%w; failed to delete remote tracking ref '%s'; %s", env.ErrFailedToDeleteRemote, rr.String(), err.Error()) diff --git a/go/libraries/doltcore/sqle/dsess/transactions.go b/go/libraries/doltcore/sqle/dsess/transactions.go index 3234ec9e5a..eb9159db85 100644 --- a/go/libraries/doltcore/sqle/dsess/transactions.go +++ b/go/libraries/doltcore/sqle/dsess/transactions.go @@ -219,7 +219,7 @@ func doltCommit(ctx *sql.Context, var rsc doltdb.ReplicationStatusController newCommit, err := tx.dbData.Ddb.CommitWithWorkingSet(ctx, headRef, tx.workingSetRef, &pending, workingSet, currHash, tx.getWorkingSetMeta(ctx), &rsc) - waitForReplicationController(ctx, rsc) + WaitForReplicationController(ctx, rsc) return workingSet, newCommit, err } @@ -232,7 +232,7 @@ func txCommit(ctx *sql.Context, ) (*doltdb.WorkingSet, *doltdb.Commit, error) { var rsc doltdb.ReplicationStatusController err := tx.dbData.Ddb.UpdateWorkingSet(ctx, tx.workingSetRef, workingSet, hash, tx.getWorkingSetMeta(ctx), &rsc) - waitForReplicationController(ctx, rsc) + WaitForReplicationController(ctx, rsc) return workingSet, nil, err } @@ -241,7 +241,7 @@ func (tx *DoltTransaction) DoltCommit(ctx *sql.Context, workingSet *doltdb.Worki return tx.doCommit(ctx, workingSet, commit, doltCommit) } -func waitForReplicationController(ctx *sql.Context, rsc doltdb.ReplicationStatusController) { +func WaitForReplicationController(ctx *sql.Context, rsc doltdb.ReplicationStatusController) { if len(rsc.Wait) == 0 { return } diff --git a/go/libraries/doltcore/sqle/read_replica_database.go b/go/libraries/doltcore/sqle/read_replica_database.go index beb4ae50fd..1cb591e3ef 100644 --- a/go/libraries/doltcore/sqle/read_replica_database.go +++ b/go/libraries/doltcore/sqle/read_replica_database.go @@ -243,7 +243,7 @@ func (rrd ReadReplicaDatabase) CreateLocalBranchFromRemote(ctx *sql.Context, bra } // create refs/heads/branch dataset - err = rrd.ddb.NewBranchAtCommit(ctx, branchRef, cm) + err = rrd.ddb.NewBranchAtCommit(ctx, branchRef, cm, nil) if err != nil { return nil, err } @@ -458,7 +458,7 @@ func (rrd ReadReplicaDatabase) createNewBranchFromRemote(ctx *sql.Context, remot return err } - err = rrd.ddb.NewBranchAtCommit(ctx, remoteRef.Ref, cm) + err = rrd.ddb.NewBranchAtCommit(ctx, remoteRef.Ref, cm, nil) err = rrd.ddb.SetHead(ctx, trackingRef, remoteRef.Hash) if err != nil { return err @@ -534,7 +534,7 @@ func refsToDelete(remRefs, localRefs []doltdb.RefWithHash) []doltdb.RefWithHash func deleteBranches(ctx *sql.Context, rrd ReadReplicaDatabase, branches []doltdb.RefWithHash) error { for _, b := range branches { - err := rrd.ddb.DeleteBranch(ctx, b.Ref) + err := rrd.ddb.DeleteBranch(ctx, b.Ref, nil) if errors.Is(err, doltdb.ErrBranchNotFound) { continue } else if err != nil { From db9dce8e5043fd6aad5eccc4f61bac3b4e293b53 Mon Sep 17 00:00:00 2001 From: Aaron Son Date: Mon, 15 May 2023 10:51:11 -0700 Subject: [PATCH 07/13] go: sqle: dsess/transactions: Turn replication failures into session warnings if dolt_cluster_ack_writes_timeout_secs is on. --- .../doltcore/sqle/dsess/transactions.go | 33 ++++++++++++++++--- 1 file changed, 28 insertions(+), 5 deletions(-) diff --git a/go/libraries/doltcore/sqle/dsess/transactions.go b/go/libraries/doltcore/sqle/dsess/transactions.go index eb9159db85..077c6c0fe2 100644 --- a/go/libraries/doltcore/sqle/dsess/transactions.go +++ b/go/libraries/doltcore/sqle/dsess/transactions.go @@ -24,6 +24,7 @@ import ( "github.com/dolthub/go-mysql-server/sql" "github.com/sirupsen/logrus" + "github.com/dolthub/vitess/go/mysql" "github.com/dolthub/dolt/go/libraries/doltcore/doltdb" "github.com/dolthub/dolt/go/libraries/doltcore/env" @@ -255,14 +256,17 @@ func WaitForReplicationController(ctx *sql.Context, rsc doltdb.ReplicationStatus } cCtx, cancel := context.WithCancel(ctx) - defer cancel() var wg sync.WaitGroup wg.Add(len(rsc.Wait)) - for _, f := range rsc.Wait { + for i, f := range rsc.Wait { f := f - go func() error { + i := i + go func() { defer wg.Done() - return f(cCtx) + err := f(cCtx) + if err == nil { + rsc.Wait[i] = nil + } }() } @@ -274,9 +278,28 @@ func WaitForReplicationController(ctx *sql.Context, rsc doltdb.ReplicationStatus select { case <-time.After(time.Duration(timeoutI) * time.Second): - // TODO: Error, warning, something... + // We timed out before all the waiters were done. + // First we make certain to finalize everything. + cancel() + <-done case <-done: + cancel() } + + // Just because our waiters all completed does not mean they all + // returned nil errors. Any non-nil entries in rsc.Wait returned an + // error. We turn those into warnings here. + numFailed := 0 + for _, f := range rsc.Wait { + if f != nil { + numFailed += 1 + } + } + ctx.Session.Warn(&sql.Warning{ + Level: "Warning", + Code: mysql.ERQueryTimeout, + Message: fmt.Sprintf("Timed out replication of commit to %d out of %d replicas.", numFailed, len(rsc.Wait)), + }) } // doCommit commits this transaction with the write function provided. It takes the same params as DoltCommit From a886ab276a828cab62124d519fa9696339a77048 Mon Sep 17 00:00:00 2001 From: Aaron Son Date: Mon, 15 May 2023 13:56:50 -0700 Subject: [PATCH 08/13] go: sqle: cluster: Add a circuit breaker for dolt_cluster_ack_writes_timeout_secs. Once a replica fails the timeout, do not block on it going forward. --- go/libraries/doltcore/doltdb/doltdb.go | 7 ++++ go/libraries/doltcore/doltdb/hooksdatabase.go | 10 +++++ .../doltcore/sqle/cluster/commithook.go | 39 ++++++++++++++----- .../doltcore/sqle/dsess/transactions.go | 7 +++- 4 files changed, 52 insertions(+), 11 deletions(-) diff --git a/go/libraries/doltcore/doltdb/doltdb.go b/go/libraries/doltcore/doltdb/doltdb.go index a7d81b508d..a95be4e82d 100644 --- a/go/libraries/doltcore/doltdb/doltdb.go +++ b/go/libraries/doltcore/doltdb/doltdb.go @@ -1222,6 +1222,13 @@ type ReplicationStatusController struct { // associated with a commithook to complete. Must return if the // associated Context is canceled. Wait []func(ctx context.Context) error + + // There is an entry here for each function in Wait. If a Wait fails, + // you can notify the corresponding function in this slice. This might + // control resiliency behaviors like adaptive retry and timeouts, + // circuit breakers, etc. and might feed into exposed replication + // metrics. + NotifyWaitFailed []func() } // UpdateWorkingSet updates the working set with the ref given to the root value given diff --git a/go/libraries/doltcore/doltdb/hooksdatabase.go b/go/libraries/doltcore/doltdb/hooksdatabase.go index 96bf911e3e..464bf48942 100644 --- a/go/libraries/doltcore/doltdb/hooksdatabase.go +++ b/go/libraries/doltcore/doltdb/hooksdatabase.go @@ -81,6 +81,7 @@ func (db hooksDatabase) ExecuteCommitHooks(ctx context.Context, ds datas.Dataset rsc := getReplicaState(ctx) if rsc != nil { rsc.Wait = make([]func(context.Context) error, len(db.postCommitHooks)) + rsc.NotifyWaitFailed = make([]func(), len(db.postCommitHooks)) } for il, hook := range db.postCommitHooks { if !onlyWS || hook.ExecuteForWorkingSets() { @@ -95,6 +96,13 @@ func (db hooksDatabase) ExecuteCommitHooks(ctx context.Context, ds datas.Dataset } if rsc != nil { rsc.Wait[i] = f + if nf, ok := hook.(interface{ + NotifyWaitFailed() + }); ok { + rsc.NotifyWaitFailed[i] = nf.NotifyWaitFailed + } else { + rsc.NotifyWaitFailed[i] = func() {} + } } }() } @@ -105,10 +113,12 @@ func (db hooksDatabase) ExecuteCommitHooks(ctx context.Context, ds datas.Dataset for i := range rsc.Wait { if rsc.Wait[i] != nil { rsc.Wait[j] = rsc.Wait[i] + rsc.NotifyWaitFailed[j] = rsc.NotifyWaitFailed[i] j++ } } rsc.Wait = rsc.Wait[:j] + rsc.NotifyWaitFailed = rsc.NotifyWaitFailed[:j] } } diff --git a/go/libraries/doltcore/sqle/cluster/commithook.go b/go/libraries/doltcore/sqle/cluster/commithook.go index f49ce7123d..475a9fc180 100644 --- a/go/libraries/doltcore/sqle/cluster/commithook.go +++ b/go/libraries/doltcore/sqle/cluster/commithook.go @@ -61,6 +61,12 @@ type commithook struct { // 4. If you read a channel out of |successChs|, that channel will be closed on the next successful replication attempt. It will not be closed before then. successChs []chan struct{} + // If this is true, the waitF returned by Execute() will fast fail if + // we are not already caught up, instead of blocking on a successCh + // actually indicated we are caught up. This is set to by a call to + // NotifyWaitFailed(), an optional interface on CommitHook. + circuitBreakerOpen bool + role Role // The standby replica to which the new root gets replicated. @@ -157,6 +163,7 @@ func (h *commithook) replicate(ctx context.Context) { } h.successChs = nil } + h.circuitBreakerOpen = false h.cond.Wait() lgr.Tracef("cluster/commithook: background thread: woken up.") } @@ -406,22 +413,34 @@ func (h *commithook) Execute(ctx context.Context, ds datas.Dataset, db datas.Dat } var waitF func(context.Context) error if !h.isCaughtUp() { - if len(h.successChs) == 0 { - h.successChs = append(h.successChs, make(chan struct{})) - } - successCh := h.successChs[0] - waitF = func(ctx context.Context) error { - select { - case <-successCh: - return nil - case <-ctx.Done(): - return ctx.Err() + if h.circuitBreakerOpen { + waitF = func(ctx context.Context) error { + return fmt.Errorf("circuit breaker for replication to %s/%s is open. this commit did not necessarily replicate successfully.", h.remotename, h.dbname) + } + } else { + if len(h.successChs) == 0 { + h.successChs = append(h.successChs, make(chan struct{})) + } + successCh := h.successChs[0] + waitF = func(ctx context.Context) error { + select { + case <-successCh: + return nil + case <-ctx.Done(): + return ctx.Err() + } } } } return waitF, nil } +func (h *commithook) NotifyWaitFailed() { + h.mu.Lock() + defer h.mu.Unlock() + h.circuitBreakerOpen = true +} + func (h *commithook) HandleError(ctx context.Context, err error) error { return nil } diff --git a/go/libraries/doltcore/sqle/dsess/transactions.go b/go/libraries/doltcore/sqle/dsess/transactions.go index 077c6c0fe2..e259ed29eb 100644 --- a/go/libraries/doltcore/sqle/dsess/transactions.go +++ b/go/libraries/doltcore/sqle/dsess/transactions.go @@ -276,12 +276,14 @@ func WaitForReplicationController(ctx *sql.Context, rsc doltdb.ReplicationStatus close(done) }() + waitFailed := false select { case <-time.After(time.Duration(timeoutI) * time.Second): // We timed out before all the waiters were done. // First we make certain to finalize everything. cancel() <-done + waitFailed = true case <-done: cancel() } @@ -290,9 +292,12 @@ func WaitForReplicationController(ctx *sql.Context, rsc doltdb.ReplicationStatus // returned nil errors. Any non-nil entries in rsc.Wait returned an // error. We turn those into warnings here. numFailed := 0 - for _, f := range rsc.Wait { + for i, f := range rsc.Wait { if f != nil { numFailed += 1 + if waitFailed { + rsc.NotifyWaitFailed[i]() + } } } ctx.Session.Warn(&sql.Warning{ From 09eb9aaed57739e07cffbf9bc30c3ecb8b15f649 Mon Sep 17 00:00:00 2001 From: Aaron Son Date: Mon, 15 May 2023 13:58:14 -0700 Subject: [PATCH 09/13] go: sqle: cluster: Only close successChs when we are actually caught up. --- go/libraries/doltcore/sqle/cluster/commithook.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/go/libraries/doltcore/sqle/cluster/commithook.go b/go/libraries/doltcore/sqle/cluster/commithook.go index 475a9fc180..1889257f4e 100644 --- a/go/libraries/doltcore/sqle/cluster/commithook.go +++ b/go/libraries/doltcore/sqle/cluster/commithook.go @@ -157,13 +157,13 @@ func (h *commithook) replicate(ctx context.Context) { if h.waitNotify != nil { h.waitNotify() } - if len(h.successChs) != 0 { + if len(h.successChs) != 0 && h.isCaughtUp() { for _, ch := range h.successChs { close(ch) } h.successChs = nil + h.circuitBreakerOpen = false } - h.circuitBreakerOpen = false h.cond.Wait() lgr.Tracef("cluster/commithook: background thread: woken up.") } From a5358ff02dfb4c26fe6c220068c560c78ebde252 Mon Sep 17 00:00:00 2001 From: Aaron Son Date: Mon, 15 May 2023 14:17:21 -0700 Subject: [PATCH 10/13] go: Catch some stragglers in _test files for an interface change. --- go/libraries/doltcore/dtestutils/testcommands/multienv.go | 2 +- go/libraries/doltcore/env/actions/commitwalk/commitwalk_test.go | 2 +- go/libraries/doltcore/merge/merge_test.go | 2 +- go/libraries/doltcore/sqle/sqlselect_test.go | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/go/libraries/doltcore/dtestutils/testcommands/multienv.go b/go/libraries/doltcore/dtestutils/testcommands/multienv.go index 1a02907169..f669255be3 100644 --- a/go/libraries/doltcore/dtestutils/testcommands/multienv.go +++ b/go/libraries/doltcore/dtestutils/testcommands/multienv.go @@ -156,7 +156,7 @@ func (mr *MultiRepoTestSetup) NewRemote(remoteName string) { func (mr *MultiRepoTestSetup) NewBranch(dbName, branchName string) { dEnv := mr.envs[dbName] - err := actions.CreateBranchWithStartPt(context.Background(), dEnv.DbData(), branchName, "head", false) + err := actions.CreateBranchWithStartPt(context.Background(), dEnv.DbData(), branchName, "head", false, nil) if err != nil { mr.Errhand(err) } diff --git a/go/libraries/doltcore/env/actions/commitwalk/commitwalk_test.go b/go/libraries/doltcore/env/actions/commitwalk/commitwalk_test.go index 8f79fefc3c..6188662261 100644 --- a/go/libraries/doltcore/env/actions/commitwalk/commitwalk_test.go +++ b/go/libraries/doltcore/env/actions/commitwalk/commitwalk_test.go @@ -86,7 +86,7 @@ func TestGetDotDotRevisions(t *testing.T) { // Create a feature branch. bref := ref.NewBranchRef("feature") - err = dEnv.DoltDB.NewBranchAtCommit(context.Background(), bref, mainCommits[5]) + err = dEnv.DoltDB.NewBranchAtCommit(context.Background(), bref, mainCommits[5], nil) require.NoError(t, err) // Create 3 commits on feature branch. diff --git a/go/libraries/doltcore/merge/merge_test.go b/go/libraries/doltcore/merge/merge_test.go index 2b5fa3acdf..c1e54c6749 100644 --- a/go/libraries/doltcore/merge/merge_test.go +++ b/go/libraries/doltcore/merge/merge_test.go @@ -743,7 +743,7 @@ func buildLeftRightAncCommitsAndBranches(t *testing.T, ddb *doltdb.DoltDB, rootT commit, err := ddb.Commit(context.Background(), hash, ref.NewBranchRef(env.DefaultInitBranch), meta) require.NoError(t, err) - err = ddb.NewBranchAtCommit(context.Background(), ref.NewBranchRef("to-merge"), initialCommit) + err = ddb.NewBranchAtCommit(context.Background(), ref.NewBranchRef("to-merge"), initialCommit, nil) require.NoError(t, err) mergeCommit, err := ddb.Commit(context.Background(), mergeHash, ref.NewBranchRef("to-merge"), meta) require.NoError(t, err) diff --git a/go/libraries/doltcore/sqle/sqlselect_test.go b/go/libraries/doltcore/sqle/sqlselect_test.go index ff3caee88a..88caf6d0b6 100644 --- a/go/libraries/doltcore/sqle/sqlselect_test.go +++ b/go/libraries/doltcore/sqle/sqlselect_test.go @@ -1675,7 +1675,7 @@ func processNode(t *testing.T, ctx context.Context, dEnv *env.DoltEnv, node Hist require.NoError(t, err) if !ok { - err = dEnv.DoltDB.NewBranchAtCommit(ctx, branchRef, parent) + err = dEnv.DoltDB.NewBranchAtCommit(ctx, branchRef, parent, nil) require.NoError(t, err) } From 05125426dc48da639d26ddc4f1dd6d80bfda7a4f Mon Sep 17 00:00:00 2001 From: reltuk Date: Mon, 15 May 2023 21:26:49 +0000 Subject: [PATCH 11/13] [ga-format-pr] Run go/utils/repofmt/format_repo.sh and go/Godeps/update.sh --- go/libraries/doltcore/doltdb/hooksdatabase.go | 2 +- go/libraries/doltcore/sqle/dsess/transactions.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/go/libraries/doltcore/doltdb/hooksdatabase.go b/go/libraries/doltcore/doltdb/hooksdatabase.go index 464bf48942..2b60c5dea5 100644 --- a/go/libraries/doltcore/doltdb/hooksdatabase.go +++ b/go/libraries/doltcore/doltdb/hooksdatabase.go @@ -96,7 +96,7 @@ func (db hooksDatabase) ExecuteCommitHooks(ctx context.Context, ds datas.Dataset } if rsc != nil { rsc.Wait[i] = f - if nf, ok := hook.(interface{ + if nf, ok := hook.(interface { NotifyWaitFailed() }); ok { rsc.NotifyWaitFailed[i] = nf.NotifyWaitFailed diff --git a/go/libraries/doltcore/sqle/dsess/transactions.go b/go/libraries/doltcore/sqle/dsess/transactions.go index e259ed29eb..9be838ed59 100644 --- a/go/libraries/doltcore/sqle/dsess/transactions.go +++ b/go/libraries/doltcore/sqle/dsess/transactions.go @@ -23,8 +23,8 @@ import ( "time" "github.com/dolthub/go-mysql-server/sql" - "github.com/sirupsen/logrus" "github.com/dolthub/vitess/go/mysql" + "github.com/sirupsen/logrus" "github.com/dolthub/dolt/go/libraries/doltcore/doltdb" "github.com/dolthub/dolt/go/libraries/doltcore/env" From 2b159425f59a3b05e16d7ed0da5dec81364d452d Mon Sep 17 00:00:00 2001 From: Aaron Son Date: Tue, 16 May 2023 10:12:36 -0700 Subject: [PATCH 12/13] go: doltdb: hooksdatabase: Pass replication status controller as a field on the pass-by-value struct, instead of through the context. --- go/libraries/doltcore/doltdb/doltdb.go | 30 +++++------ go/libraries/doltcore/doltdb/hooksdatabase.go | 52 ++++++++----------- 2 files changed, 36 insertions(+), 46 deletions(-) diff --git a/go/libraries/doltcore/doltdb/doltdb.go b/go/libraries/doltcore/doltdb/doltdb.go index a95be4e82d..4722582ab7 100644 --- a/go/libraries/doltcore/doltdb/doltdb.go +++ b/go/libraries/doltcore/doltdb/doltdb.go @@ -1160,11 +1160,10 @@ func (ddb *DoltDB) CopyWorkingSet(ctx context.Context, fromWSRef ref.WorkingSetR // DeleteBranch deletes the branch given, returning an error if it doesn't exist. func (ddb *DoltDB) DeleteBranch(ctx context.Context, branch ref.DoltRef, replicationStatus *ReplicationStatusController) error { - rsCtx := withReplicaState(ctx, replicationStatus) - return ddb.deleteRef(rsCtx, branch) + return ddb.deleteRef(ctx, branch, replicationStatus) } -func (ddb *DoltDB) deleteRef(ctx context.Context, dref ref.DoltRef) error { +func (ddb *DoltDB) deleteRef(ctx context.Context, dref ref.DoltRef, replicationStatus *ReplicationStatusController) error { ds, err := ddb.db.GetDataset(ctx, dref.String()) if err != nil { @@ -1185,7 +1184,7 @@ func (ddb *DoltDB) deleteRef(ctx context.Context, dref ref.DoltRef) error { } } - _, err = ddb.db.Delete(ctx, ds) + _, err = ddb.db.withReplicationStatusController(replicationStatus).Delete(ctx, ds) return err } @@ -1251,8 +1250,7 @@ func (ddb *DoltDB) UpdateWorkingSet( return err } - ctx = withReplicaState(ctx, replicationStatus) - _, err = ddb.db.UpdateWorkingSet(ctx, ds, datas.WorkingSetSpec{ + _, err = ddb.db.withReplicationStatusController(replicationStatus).UpdateWorkingSet(ctx, ds, datas.WorkingSetSpec{ Meta: meta, WorkingRoot: workingRootRef, StagedRoot: stagedRef, @@ -1289,13 +1287,13 @@ func (ddb *DoltDB) CommitWithWorkingSet( return nil, err } - rsCtx := withReplicaState(ctx, replicationStatus) - commitDataset, _, err := ddb.db.CommitWithWorkingSet(rsCtx, headDs, wsDs, commit.Roots.Staged.nomsValue(), datas.WorkingSetSpec{ - Meta: meta, - WorkingRoot: workingRootRef, - StagedRoot: stagedRef, - MergeState: mergeState, - }, prevHash, commit.CommitOptions) + commitDataset, _, err := ddb.db.withReplicationStatusController(replicationStatus). + CommitWithWorkingSet(ctx, headDs, wsDs, commit.Roots.Staged.nomsValue(), datas.WorkingSetSpec{ + Meta: meta, + WorkingRoot: workingRootRef, + StagedRoot: stagedRef, + MergeState: mergeState, + }, prevHash, commit.CommitOptions) if err != nil { return nil, err @@ -1329,7 +1327,7 @@ func (ddb *DoltDB) DeleteWorkingSet(ctx context.Context, workingSetRef ref.Worki } func (ddb *DoltDB) DeleteTag(ctx context.Context, tag ref.DoltRef) error { - err := ddb.deleteRef(ctx, tag) + err := ddb.deleteRef(ctx, tag, nil) if err == ErrBranchNotFound { return ErrTagNotFound @@ -1356,7 +1354,7 @@ func (ddb *DoltDB) NewWorkspaceAtCommit(ctx context.Context, workRef ref.DoltRef } func (ddb *DoltDB) DeleteWorkspace(ctx context.Context, workRef ref.DoltRef) error { - err := ddb.deleteRef(ctx, workRef) + err := ddb.deleteRef(ctx, workRef, nil) if err == ErrBranchNotFound { return ErrWorkspaceNotFound @@ -1671,7 +1669,7 @@ func (ddb *DoltDB) RemoveStashAtIdx(ctx context.Context, idx int) error { // RemoveAllStashes removes the stash list Dataset from the database, // which equivalent to removing Stash entries from the stash list. func (ddb *DoltDB) RemoveAllStashes(ctx context.Context) error { - err := ddb.deleteRef(ctx, ref.NewStashRef()) + err := ddb.deleteRef(ctx, ref.NewStashRef(), nil) if err == ErrBranchNotFound { return nil } diff --git a/go/libraries/doltcore/doltdb/hooksdatabase.go b/go/libraries/doltcore/doltdb/hooksdatabase.go index 2b60c5dea5..9565485ac9 100644 --- a/go/libraries/doltcore/doltdb/hooksdatabase.go +++ b/go/libraries/doltcore/doltdb/hooksdatabase.go @@ -17,35 +17,17 @@ package doltdb import ( "context" "io" + "sync" "github.com/dolthub/dolt/go/store/datas" "github.com/dolthub/dolt/go/store/hash" "github.com/dolthub/dolt/go/store/types" - - "sync" ) -type replicaStateContextKey struct { -} - -func withReplicaState(ctx context.Context, c *ReplicationStatusController) context.Context { - if c != nil { - return context.WithValue(ctx, replicaStateContextKey{}, c) - } - return ctx -} - -func getReplicaState(ctx context.Context) *ReplicationStatusController { - v := ctx.Value(replicaStateContextKey{}) - if v == nil { - return nil - } - return v.(*ReplicationStatusController) -} - type hooksDatabase struct { datas.Database postCommitHooks []CommitHook + rsc *ReplicationStatusController } // CommitHook is an abstraction for executing arbitrary commands after atomic database commits @@ -61,7 +43,8 @@ type CommitHook interface { } func (db hooksDatabase) SetCommitHooks(ctx context.Context, postHooks []CommitHook) hooksDatabase { - db.postCommitHooks = postHooks + db.postCommitHooks = make([]CommitHook, len(postHooks)) + copy(db.postCommitHooks, postHooks) return db } @@ -72,16 +55,25 @@ func (db hooksDatabase) SetCommitHookLogger(ctx context.Context, wr io.Writer) h return db } +func (db hooksDatabase) withReplicationStatusController(rsc *ReplicationStatusController) hooksDatabase { + db.rsc = rsc + return db +} + func (db hooksDatabase) PostCommitHooks() []CommitHook { - return db.postCommitHooks + toret := make([]CommitHook, len(db.postCommitHooks)) + copy(toret, db.postCommitHooks) + return toret } func (db hooksDatabase) ExecuteCommitHooks(ctx context.Context, ds datas.Dataset, onlyWS bool) { var wg sync.WaitGroup - rsc := getReplicaState(ctx) + rsc := db.rsc + var ioff int if rsc != nil { - rsc.Wait = make([]func(context.Context) error, len(db.postCommitHooks)) - rsc.NotifyWaitFailed = make([]func(), len(db.postCommitHooks)) + ioff = len(rsc.Wait) + rsc.Wait = append(rsc.Wait, make([]func(context.Context) error, len(db.postCommitHooks))...) + rsc.NotifyWaitFailed = append(rsc.NotifyWaitFailed, make([]func(), len(db.postCommitHooks))...) } for il, hook := range db.postCommitHooks { if !onlyWS || hook.ExecuteForWorkingSets() { @@ -95,13 +87,13 @@ func (db hooksDatabase) ExecuteCommitHooks(ctx context.Context, ds datas.Dataset hook.HandleError(ctx, err) } if rsc != nil { - rsc.Wait[i] = f + rsc.Wait[i+ioff] = f if nf, ok := hook.(interface { NotifyWaitFailed() }); ok { - rsc.NotifyWaitFailed[i] = nf.NotifyWaitFailed + rsc.NotifyWaitFailed[i+ioff] = nf.NotifyWaitFailed } else { - rsc.NotifyWaitFailed[i] = func() {} + rsc.NotifyWaitFailed[i+ioff] = func() {} } } }() @@ -109,8 +101,8 @@ func (db hooksDatabase) ExecuteCommitHooks(ctx context.Context, ds datas.Dataset } wg.Wait() if rsc != nil { - j := 0 - for i := range rsc.Wait { + j := ioff + for i := ioff; i < len(rsc.Wait); i++ { if rsc.Wait[i] != nil { rsc.Wait[j] = rsc.Wait[i] rsc.NotifyWaitFailed[j] = rsc.NotifyWaitFailed[i] From 8bbb49b79efdb159618a7656ae794deae4b1f466 Mon Sep 17 00:00:00 2001 From: Aaron Son Date: Tue, 16 May 2023 15:42:14 -0700 Subject: [PATCH 13/13] go: doltdb,sqle/cluster: Some PR feedback. Small cleanups. --- go/libraries/doltcore/doltdb/hooksdatabase.go | 11 ++++++++--- go/libraries/doltcore/sqle/cluster/commithook.go | 11 ++++++----- 2 files changed, 14 insertions(+), 8 deletions(-) diff --git a/go/libraries/doltcore/doltdb/hooksdatabase.go b/go/libraries/doltcore/doltdb/hooksdatabase.go index 9565485ac9..8b14765ca1 100644 --- a/go/libraries/doltcore/doltdb/hooksdatabase.go +++ b/go/libraries/doltcore/doltdb/hooksdatabase.go @@ -42,6 +42,13 @@ type CommitHook interface { ExecuteForWorkingSets() bool } +// If a commit hook supports this interface, it can be notified if waiting for +// replication in the callback returned by |Execute| failed to complete in time +// or returned an error. +type NotifyWaitFailedCommitHook interface { + NotifyWaitFailed() +} + func (db hooksDatabase) SetCommitHooks(ctx context.Context, postHooks []CommitHook) hooksDatabase { db.postCommitHooks = make([]CommitHook, len(postHooks)) copy(db.postCommitHooks, postHooks) @@ -88,9 +95,7 @@ func (db hooksDatabase) ExecuteCommitHooks(ctx context.Context, ds datas.Dataset } if rsc != nil { rsc.Wait[i+ioff] = f - if nf, ok := hook.(interface { - NotifyWaitFailed() - }); ok { + if nf, ok := hook.(NotifyWaitFailedCommitHook); ok { rsc.NotifyWaitFailed[i+ioff] = nf.NotifyWaitFailed } else { rsc.NotifyWaitFailed[i+ioff] = func() {} diff --git a/go/libraries/doltcore/sqle/cluster/commithook.go b/go/libraries/doltcore/sqle/cluster/commithook.go index 1889257f4e..2e3d822ed2 100644 --- a/go/libraries/doltcore/sqle/cluster/commithook.go +++ b/go/libraries/doltcore/sqle/cluster/commithook.go @@ -32,6 +32,7 @@ import ( ) var _ doltdb.CommitHook = (*commithook)(nil) +var _ doltdb.NotifyWaitFailedCommitHook = (*commithook)(nil) type commithook struct { rootLgr *logrus.Entry @@ -56,7 +57,7 @@ type commithook struct { // This is a slice of notification channels maintained by the // commithook. The semantics are: // 1. All accesses to |successChs| must happen with |mu| held. - // 2. There maybe be |0| or more channels in the slice. + // 2. There may be |0| or more channels in the slice. // 3. As a reader, if |successChs| is non-empty, you should just read a value, for example, |successChs[0]| and use it. All entries will be closed at the same time. If |successChs| is empty when you need a channel, you should add one to it. // 4. If you read a channel out of |successChs|, that channel will be closed on the next successful replication attempt. It will not be closed before then. successChs []chan struct{} @@ -65,7 +66,7 @@ type commithook struct { // we are not already caught up, instead of blocking on a successCh // actually indicated we are caught up. This is set to by a call to // NotifyWaitFailed(), an optional interface on CommitHook. - circuitBreakerOpen bool + fastFailReplicationWait bool role Role @@ -162,7 +163,7 @@ func (h *commithook) replicate(ctx context.Context) { close(ch) } h.successChs = nil - h.circuitBreakerOpen = false + h.fastFailReplicationWait = false } h.cond.Wait() lgr.Tracef("cluster/commithook: background thread: woken up.") @@ -413,7 +414,7 @@ func (h *commithook) Execute(ctx context.Context, ds datas.Dataset, db datas.Dat } var waitF func(context.Context) error if !h.isCaughtUp() { - if h.circuitBreakerOpen { + if h.fastFailReplicationWait { waitF = func(ctx context.Context) error { return fmt.Errorf("circuit breaker for replication to %s/%s is open. this commit did not necessarily replicate successfully.", h.remotename, h.dbname) } @@ -438,7 +439,7 @@ func (h *commithook) Execute(ctx context.Context, ds datas.Dataset, db datas.Dat func (h *commithook) NotifyWaitFailed() { h.mu.Lock() defer h.mu.Unlock() - h.circuitBreakerOpen = true + h.fastFailReplicationWait = true } func (h *commithook) HandleError(ctx context.Context, err error) error {