diff --git a/go/cmd/dolt/commands/engine/sqlengine.go b/go/cmd/dolt/commands/engine/sqlengine.go index 9984d8a8d8..cd354afc44 100644 --- a/go/cmd/dolt/commands/engine/sqlengine.go +++ b/go/cmd/dolt/commands/engine/sqlengine.go @@ -114,6 +114,7 @@ func NewSqlEngine( config.ClusterController.RegisterStoredProcedures(pro) pro.InitDatabaseHook = cluster.NewInitDatabaseHook(config.ClusterController, bThreads, pro.InitDatabaseHook) + config.ClusterController.ManageDatabaseProvider(pro) // Load in privileges from file, if it exists persister := mysql_file_handler.NewPersister(config.PrivFilePath, config.DoltCfgDirPath) diff --git a/go/cmd/dolt/commands/sqlserver/server.go b/go/cmd/dolt/commands/sqlserver/server.go index 0f2eabc84f..34539696f7 100644 --- a/go/cmd/dolt/commands/sqlserver/server.go +++ b/go/cmd/dolt/commands/sqlserver/server.go @@ -266,11 +266,18 @@ func Serve( clusterRemoteSrv.Serve(listeners) }() } + + clusterController.ManageQueryConnections( + mySQLServer.SessionManager().Iter, + sqlEngine.GetUnderlyingEngine().ProcessList.Kill, + mySQLServer.SessionManager().KillConnection, + ) } else { lgr.Errorf("error creating SQL engine context for remotesapi server: %v", err) startError = err return } + } if ok, f := mrEnv.IsLocked(); ok { diff --git a/go/go.mod b/go/go.mod index 1b1f2b401c..71869e80ea 100644 --- a/go/go.mod +++ b/go/go.mod @@ -16,7 +16,7 @@ require ( github.com/dolthub/ishell v0.0.0-20220112232610-14e753f0f371 github.com/dolthub/mmap-go v1.0.4-0.20201107010347-f9f2a9588a66 github.com/dolthub/sqllogictest/go v0.0.0-20201107003712-816f3ae12d81 - github.com/dolthub/vitess v0.0.0-20220929061157-c71cf6a7768e + github.com/dolthub/vitess v0.0.0-20220930181015-759b1acd9188 github.com/dustin/go-humanize v1.0.0 github.com/fatih/color v1.13.0 github.com/flynn-archive/go-shlex v0.0.0-20150515145356-3f9db97f8568 @@ -57,7 +57,7 @@ require ( require ( github.com/aliyun/aliyun-oss-go-sdk v2.2.5+incompatible github.com/cenkalti/backoff/v4 v4.1.3 - github.com/dolthub/go-mysql-server v0.12.1-0.20220929211840-02a9c38c169f + github.com/dolthub/go-mysql-server v0.12.1-0.20221003181623-42a4fb9ec5f8 github.com/google/flatbuffers v2.0.6+incompatible github.com/kch42/buzhash v0.0.0-20160816060738-9bdec3dec7c6 github.com/mitchellh/go-ps v1.0.0 diff --git a/go/go.sum b/go/go.sum index 319ff917e8..7d3f381d62 100644 --- a/go/go.sum +++ b/go/go.sum @@ -178,8 +178,8 @@ github.com/dolthub/flatbuffers v1.13.0-dh.1 h1:OWJdaPep22N52O/0xsUevxJ6Qfw1M2txC github.com/dolthub/flatbuffers v1.13.0-dh.1/go.mod h1:CorYGaDmXjHz1Z7i50PYXG1Ricn31GcA2wNOTFIQAKE= github.com/dolthub/fslock v0.0.3 h1:iLMpUIvJKMKm92+N1fmHVdxJP5NdyDK5bK7z7Ba2s2U= github.com/dolthub/fslock v0.0.3/go.mod h1:QWql+P17oAAMLnL4HGB5tiovtDuAjdDTPbuqx7bYfa0= -github.com/dolthub/go-mysql-server v0.12.1-0.20220929211840-02a9c38c169f h1:+7jgFuHeF3Vj2eG7thWUprQNFz18Dg6f1Y/yow35KVk= -github.com/dolthub/go-mysql-server v0.12.1-0.20220929211840-02a9c38c169f/go.mod h1:Ndof+jmKE/AISRWgeyx+RUvNlAtMOPSUzTM/iCOfx70= +github.com/dolthub/go-mysql-server v0.12.1-0.20221003181623-42a4fb9ec5f8 h1:PcD6hKf6GkK+pNXZL+aD5rP4iyX8yjmXY21Puh5iF9U= +github.com/dolthub/go-mysql-server v0.12.1-0.20221003181623-42a4fb9ec5f8/go.mod h1:qQReFVsJ0CPJLxX3lAuZxYWi6T4twzErI//D2bSSttY= github.com/dolthub/ishell v0.0.0-20220112232610-14e753f0f371 h1:oyPHJlzumKta1vnOQqUnfdz+pk3EmnHS3Nd0cCT0I2g= github.com/dolthub/ishell v0.0.0-20220112232610-14e753f0f371/go.mod h1:dhGBqcCEfK5kuFmeO5+WOx3hqc1k3M29c1oS/R7N4ms= github.com/dolthub/jsonpath v0.0.0-20210609232853-d49537a30474 h1:xTrR+l5l+1Lfq0NvhiEsctylXinUMFhhsqaEcl414p8= @@ -188,8 +188,8 @@ github.com/dolthub/mmap-go v1.0.4-0.20201107010347-f9f2a9588a66 h1:WRPDbpJWEnPxP github.com/dolthub/mmap-go v1.0.4-0.20201107010347-f9f2a9588a66/go.mod h1:N5ZIbMGuDUpTpOFQ7HcsN6WSIpTGQjHP+Mz27AfmAgk= github.com/dolthub/sqllogictest/go v0.0.0-20201107003712-816f3ae12d81 h1:7/v8q9XGFa6q5Ap4Z/OhNkAMBaK5YeuEzwJt+NZdhiE= github.com/dolthub/sqllogictest/go v0.0.0-20201107003712-816f3ae12d81/go.mod h1:siLfyv2c92W1eN/R4QqG/+RjjX5W2+gCTRjZxBjI3TY= -github.com/dolthub/vitess v0.0.0-20220929061157-c71cf6a7768e h1:vC5OgmUm1Dd8vQP1YqgRvYMbHvrLNdQkd3S7udbS3BQ= -github.com/dolthub/vitess v0.0.0-20220929061157-c71cf6a7768e/go.mod h1:oVFIBdqMFEkt4Xz2fzFJBNtzKhDEjwdCF0dzde39iKs= +github.com/dolthub/vitess v0.0.0-20220930181015-759b1acd9188 h1:TJVpwZ8XY/fNK1ZgV9QMw+DE6WKJRi28Ruilk/qTmOU= +github.com/dolthub/vitess v0.0.0-20220930181015-759b1acd9188/go.mod h1:oVFIBdqMFEkt4Xz2fzFJBNtzKhDEjwdCF0dzde39iKs= github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo= github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= diff --git a/go/libraries/doltcore/dtestutils/schema.go b/go/libraries/doltcore/dtestutils/schema.go index ec2ce74290..ca0e2b9252 100644 --- a/go/libraries/doltcore/dtestutils/schema.go +++ b/go/libraries/doltcore/dtestutils/schema.go @@ -16,7 +16,6 @@ package dtestutils import ( "context" - "fmt" "math" "testing" @@ -28,8 +27,6 @@ import ( "github.com/dolthub/dolt/go/libraries/doltcore/env" "github.com/dolthub/dolt/go/libraries/doltcore/row" "github.com/dolthub/dolt/go/libraries/doltcore/schema" - "github.com/dolthub/dolt/go/libraries/doltcore/table" - "github.com/dolthub/dolt/go/libraries/doltcore/table/editor" "github.com/dolthub/dolt/go/store/types" ) @@ -120,79 +117,6 @@ func CreateEmptyTestTable(t *testing.T, dEnv *env.DoltEnv, tableName string, sch require.NoError(t, err) } -// CreateTestTable creates a new test table with the name, schema, and rows given. -func CreateTestTable(t *testing.T, dEnv *env.DoltEnv, tableName string, sch schema.Schema, rs ...row.Row) { - imt := table.NewInMemTable(sch) - - for _, r := range rs { - _ = imt.AppendRow(r) - } - - ctx := context.Background() - vrw := dEnv.DoltDB.ValueReadWriter() - ns := dEnv.DoltDB.NodeStore() - - rowMap, err := types.NewMap(ctx, vrw) - require.NoError(t, err) - me := rowMap.Edit() - for i := 0; i < imt.NumRows(); i++ { - r, err := imt.GetRow(i) - require.NoError(t, err) - k, v := r.NomsMapKey(sch), r.NomsMapValue(sch) - me.Set(k, v) - } - rowMap, err = me.Map(ctx) - require.NoError(t, err) - - tbl, err := doltdb.NewNomsTable(ctx, vrw, ns, sch, rowMap, nil, nil) - require.NoError(t, err) - tbl, err = editor.RebuildAllIndexes(ctx, tbl, editor.TestEditorOptions(vrw)) - require.NoError(t, err) - - sch, err = tbl.GetSchema(ctx) - require.NoError(t, err) - rows, err := tbl.GetRowData(ctx) - require.NoError(t, err) - indexes, err := tbl.GetIndexSet(ctx) - require.NoError(t, err) - err = putTableToWorking(ctx, dEnv, sch, rows, indexes, tableName, nil) - require.NoError(t, err) -} - -func putTableToWorking(ctx context.Context, dEnv *env.DoltEnv, sch schema.Schema, rows durable.Index, indexData durable.IndexSet, tableName string, autoVal types.Value) error { - root, err := dEnv.WorkingRoot(ctx) - if err != nil { - return fmt.Errorf("%w: %v", doltdb.ErrNomsIO, err) - } - - vrw := dEnv.DoltDB.ValueReadWriter() - ns := dEnv.DoltDB.NodeStore() - tbl, err := doltdb.NewTable(ctx, vrw, ns, sch, rows, indexData, autoVal) - if err != nil { - return err - } - - newRoot, err := root.PutTable(ctx, tableName, tbl) - if err != nil { - return err - } - - rootHash, err := root.HashOf() - if err != nil { - return err - } - - newRootHash, err := newRoot.HashOf() - if err != nil { - return err - } - if rootHash == newRootHash { - return nil - } - - return dEnv.UpdateWorkingRoot(ctx, newRoot) -} - // MustSchema takes a variable number of columns and returns a schema. func MustSchema(cols ...schema.Column) schema.Schema { hasPKCols := false diff --git a/go/libraries/doltcore/remotesrv/grpc.go b/go/libraries/doltcore/remotesrv/grpc.go index f7ef49c52c..4f8407a6f4 100644 --- a/go/libraries/doltcore/remotesrv/grpc.go +++ b/go/libraries/doltcore/remotesrv/grpc.go @@ -17,6 +17,7 @@ package remotesrv import ( "context" "encoding/base64" + "errors" "fmt" "io" "net/url" @@ -38,6 +39,8 @@ import ( "github.com/dolthub/dolt/go/store/types" ) +var ErrUnimplemented = errors.New("unimplemented") + type RemoteChunkStore struct { HttpHost string csCache DBCache @@ -79,7 +82,10 @@ func (rs *RemoteChunkStore) HasChunks(ctx context.Context, req *remotesapi.HasCh defer func() { logger.Println("finished") }() repoPath := getRepoPath(req) - cs := rs.getStore(logger, repoPath) + cs, err := rs.getStore(logger, repoPath) + if err != nil { + return nil, err + } if cs == nil { return nil, status.Error(codes.Internal, "Could not get chunkstore") @@ -133,7 +139,10 @@ func (rs *RemoteChunkStore) GetDownloadLocations(ctx context.Context, req *remot defer func() { logger.Println("finished") }() repoPath := getRepoPath(req) - cs := rs.getStore(logger, repoPath) + cs, err := rs.getStore(logger, repoPath) + if err != nil { + return nil, err + } if cs == nil { return nil, status.Error(codes.Internal, "Could not get chunkstore") @@ -199,7 +208,10 @@ func (rs *RemoteChunkStore) StreamDownloadLocations(stream remotesapi.ChunkStore nextPath := getRepoPath(req) if nextPath != repoPath { repoPath = nextPath - cs = rs.getStore(logger, repoPath) + cs, err = rs.getStore(logger, repoPath) + if err != nil { + return err + } if cs == nil { return status.Error(codes.Internal, "Could not get chunkstore") } @@ -292,7 +304,10 @@ func (rs *RemoteChunkStore) GetUploadLocations(ctx context.Context, req *remotes defer func() { logger.Println("finished") }() repoPath := getRepoPath(req) - cs := rs.getStore(logger, repoPath) + cs, err := rs.getStore(logger, repoPath) + if err != nil { + return nil, err + } if cs == nil { return nil, status.Error(codes.Internal, "Could not get chunkstore") @@ -341,7 +356,10 @@ func (rs *RemoteChunkStore) Rebase(ctx context.Context, req *remotesapi.RebaseRe defer func() { logger.Println("finished") }() repoPath := getRepoPath(req) - cs := rs.getStore(logger, repoPath) + cs, err := rs.getStore(logger, repoPath) + if err != nil { + return nil, err + } if cs == nil { return nil, status.Error(codes.Internal, "Could not get chunkstore") @@ -349,7 +367,7 @@ func (rs *RemoteChunkStore) Rebase(ctx context.Context, req *remotesapi.RebaseRe logger.Printf("found %s", repoPath) - err := cs.Rebase(ctx) + err = cs.Rebase(ctx) if err != nil { logger.Printf("error occurred during processing of Rebace rpc of %s details: %v", repoPath, err) @@ -364,7 +382,10 @@ func (rs *RemoteChunkStore) Root(ctx context.Context, req *remotesapi.RootReques defer func() { logger.Println("finished") }() repoPath := getRepoPath(req) - cs := rs.getStore(logger, repoPath) + cs, err := rs.getStore(logger, repoPath) + if err != nil { + return nil, err + } if cs == nil { return nil, status.Error(codes.Internal, "Could not get chunkstore") @@ -385,7 +406,10 @@ func (rs *RemoteChunkStore) Commit(ctx context.Context, req *remotesapi.CommitRe defer func() { logger.Println("finished") }() repoPath := getRepoPath(req) - cs := rs.getStore(logger, repoPath) + cs, err := rs.getStore(logger, repoPath) + if err != nil { + return nil, err + } if cs == nil { return nil, status.Error(codes.Internal, "Could not get chunkstore") @@ -399,7 +423,7 @@ func (rs *RemoteChunkStore) Commit(ctx context.Context, req *remotesapi.CommitRe updates[hash.New(cti.Hash).String()] = int(cti.ChunkCount) } - err := cs.AddTableFilesToManifest(ctx, updates) + err = cs.AddTableFilesToManifest(ctx, updates) if err != nil { logger.Printf("error occurred updating the manifest: %s", err.Error()) @@ -426,12 +450,15 @@ func (rs *RemoteChunkStore) GetRepoMetadata(ctx context.Context, req *remotesapi defer func() { logger.Println("finished") }() repoPath := getRepoPath(req) - cs := rs.getOrCreateStore(logger, repoPath, req.ClientRepoFormat.NbfVersion) + cs, err := rs.getOrCreateStore(logger, repoPath, req.ClientRepoFormat.NbfVersion) + if err != nil { + return nil, err + } if cs == nil { return nil, status.Error(codes.Internal, "Could not get chunkstore") } - err := cs.Rebase(ctx) + err = cs.Rebase(ctx) if err != nil { return nil, err } @@ -453,7 +480,10 @@ func (rs *RemoteChunkStore) ListTableFiles(ctx context.Context, req *remotesapi. defer func() { logger.Println("finished") }() repoPath := getRepoPath(req) - cs := rs.getStore(logger, repoPath) + cs, err := rs.getStore(logger, repoPath) + if err != nil { + return nil, err + } if cs == nil { return nil, status.Error(codes.Internal, "Could not get chunkstore") @@ -522,7 +552,10 @@ func (rs *RemoteChunkStore) AddTableFiles(ctx context.Context, req *remotesapi.A defer func() { logger.Println("finished") }() repoPath := getRepoPath(req) - cs := rs.getStore(logger, repoPath) + cs, err := rs.getStore(logger, repoPath) + if err != nil { + return nil, err + } if cs == nil { return nil, status.Error(codes.Internal, "Could not get chunkstore") @@ -536,7 +569,7 @@ func (rs *RemoteChunkStore) AddTableFiles(ctx context.Context, req *remotesapi.A updates[hash.New(cti.Hash).String()] = int(cti.ChunkCount) } - err := cs.AddTableFilesToManifest(ctx, updates) + err = cs.AddTableFilesToManifest(ctx, updates) if err != nil { logger.Printf("error occurred updating the manifest: %s", err.Error()) @@ -546,18 +579,20 @@ func (rs *RemoteChunkStore) AddTableFiles(ctx context.Context, req *remotesapi.A return &remotesapi.AddTableFilesResponse{Success: true}, nil } -func (rs *RemoteChunkStore) getStore(logger *logrus.Entry, repoPath string) RemoteSrvStore { +func (rs *RemoteChunkStore) getStore(logger *logrus.Entry, repoPath string) (RemoteSrvStore, error) { return rs.getOrCreateStore(logger, repoPath, types.Format_Default.VersionString()) } -func (rs *RemoteChunkStore) getOrCreateStore(logger *logrus.Entry, repoPath, nbfVerStr string) RemoteSrvStore { +func (rs *RemoteChunkStore) getOrCreateStore(logger *logrus.Entry, repoPath, nbfVerStr string) (RemoteSrvStore, error) { cs, err := rs.csCache.Get(repoPath, nbfVerStr) - if err != nil { logger.Printf("Failed to retrieve chunkstore for %s\n", repoPath) + if errors.Is(err, ErrUnimplemented) { + return nil, status.Error(codes.Unimplemented, err.Error()) + } + return nil, err } - - return cs + return cs, nil } var requestId int32 diff --git a/go/libraries/doltcore/sqle/cluster/assume_role.go b/go/libraries/doltcore/sqle/cluster/assume_role.go index 52ea6cf7e6..8845ae8bed 100644 --- a/go/libraries/doltcore/sqle/cluster/assume_role.go +++ b/go/libraries/doltcore/sqle/cluster/assume_role.go @@ -15,9 +15,15 @@ package cluster import ( + "errors" + "github.com/dolthub/go-mysql-server/sql" + + "github.com/dolthub/dolt/go/libraries/doltcore/sqle/dsess" ) +var ErrServerTransitionedRolesErr = errors.New("this server transitioned cluster roles. this connection can no longer be used. please reconnect.") + func newAssumeRoleProcedure(controller *Controller) sql.ExternalStoredProcedureDetails { return sql.ExternalStoredProcedureDetails{ Name: "dolt_assume_cluster_role", @@ -29,10 +35,19 @@ func newAssumeRoleProcedure(controller *Controller) sql.ExternalStoredProcedureD }, }, Function: func(ctx *sql.Context, role string, epoch int) (sql.RowIter, error) { - err := controller.setRoleAndEpoch(role, epoch) + if role == string(RoleDetectedBrokenConfig) { + return nil, errors.New("cannot set role to detected_broken_config; valid values are 'primary' and 'standby'") + } + changerole, err := controller.setRoleAndEpoch(role, epoch, true /* graceful */, int(ctx.Session.ID())) if err != nil { + // We did not transition, no need to set our session to read-only, etc. return nil, err } + if changerole { + // We transitioned, make sure we do not run anymore queries on this session. + ctx.Session.SetTransaction(nil) + dsess.DSessFromSess(ctx.Session).SetValidateErr(ErrServerTransitionedRolesErr) + } return sql.RowsToRowIter(sql.Row{0}), nil }, } diff --git a/go/libraries/doltcore/sqle/cluster/commithook.go b/go/libraries/doltcore/sqle/cluster/commithook.go index 44a475607e..d3a5e6cac1 100644 --- a/go/libraries/doltcore/sqle/cluster/commithook.go +++ b/go/libraries/doltcore/sqle/cluster/commithook.go @@ -38,7 +38,6 @@ type commithook struct { lgr atomic.Value // *logrus.Entry remotename string dbname string - lout io.Writer mu sync.Mutex wg sync.WaitGroup cond *sync.Cond @@ -48,6 +47,11 @@ type commithook struct { nextHeadIncomingTime time.Time lastSuccess time.Time currentError *string + cancelReplicate func() + + // waitNotify is set by controller when it needs to track whether the + // commithooks are caught up with replicating to the standby. + waitNotify func() role Role @@ -136,6 +140,9 @@ func (h *commithook) replicate(ctx context.Context) { h.attemptReplicate(ctx) } else { lgr.Tracef("cluster/commithook: background thread: waiting for signal.") + if h.waitNotify != nil { + h.waitNotify() + } h.cond.Wait() lgr.Tracef("cluster/commithook: background thread: woken up.") } @@ -144,15 +151,28 @@ func (h *commithook) replicate(ctx context.Context) { // called with h.mu locked. func (h *commithook) shouldReplicate() bool { - if h.role != RolePrimary { - return false - } - if h.nextHead == h.lastPushedHead { + if h.isCaughtUp() { return false } return (h.nextPushAttempt == (time.Time{}) || time.Now().After(h.nextPushAttempt)) } +// called with h.mu locked. Returns true if the standby is true-d up, false +// otherwise. Different from shouldReplicate() in that it does not care about +// nextPushAttempt, for example. Used in Controller.waitForReplicate. +func (h *commithook) isCaughtUp() bool { + if h.role != RolePrimary { + return true + } + return h.nextHead == h.lastPushedHead +} + +func (h *commithook) isCaughtUpLocking() bool { + h.mu.Lock() + defer h.mu.Unlock() + return h.isCaughtUp() +} + // called with h.mu locked. func (h *commithook) primaryNeedsInit() bool { return h.role == RolePrimary && h.nextHead == (hash.Hash{}) @@ -168,6 +188,13 @@ func (h *commithook) attemptReplicate(ctx context.Context) { toPush := h.nextHead incomingTime := h.nextHeadIncomingTime destDB := h.destDB + ctx, h.cancelReplicate = context.WithCancel(ctx) + defer func() { + if h.cancelReplicate != nil { + h.cancelReplicate() + } + h.cancelReplicate = nil + }() h.mu.Unlock() if destDB == nil { @@ -183,6 +210,7 @@ func (h *commithook) attemptReplicate(ctx context.Context) { if toPush == h.nextHead { h.nextPushAttempt = time.Now().Add(1 * time.Second) } + h.cancelReplicate = nil return } lgr.Tracef("cluster/commithook: fetched destDB") @@ -208,20 +236,22 @@ func (h *commithook) attemptReplicate(ctx context.Context) { } h.mu.Lock() - if err == nil { - h.currentError = nil - lgr.Tracef("cluster/commithook: successfully Commited chunks on destDB") - h.lastPushedHead = toPush - h.lastSuccess = incomingTime - h.nextPushAttempt = time.Time{} - } else { - h.currentError = new(string) - *h.currentError = fmt.Sprintf("failed to commit chunks on destDB: %v", err) - lgr.Warnf("cluster/commithook: failed to commit chunks on destDB: %v", err) - // add some delay if a new head didn't come in while we were pushing. - if toPush == h.nextHead { - // TODO: We could add some backoff here. - h.nextPushAttempt = time.Now().Add(1 * time.Second) + if h.role == RolePrimary { + if err == nil { + h.currentError = nil + lgr.Tracef("cluster/commithook: successfully Commited chunks on destDB") + h.lastPushedHead = toPush + h.lastSuccess = incomingTime + h.nextPushAttempt = time.Time{} + } else { + h.currentError = new(string) + *h.currentError = fmt.Sprintf("failed to commit chunks on destDB: %v", err) + lgr.Warnf("cluster/commithook: failed to commit chunks on destDB: %v", err) + // add some delay if a new head didn't come in while we were pushing. + if toPush == h.nextHead { + // TODO: We could add some backoff here. + h.nextPushAttempt = time.Now().Add(1 * time.Second) + } } } } @@ -279,7 +309,7 @@ func (h *commithook) tick(ctx context.Context) { func (h *commithook) recordSuccessfulRemoteSrvCommit() { h.mu.Lock() defer h.mu.Unlock() - if h.role == RolePrimary { + if h.role != RoleStandby { return } h.lastSuccess = time.Now() @@ -298,9 +328,24 @@ func (h *commithook) setRole(role Role) { h.nextPushAttempt = time.Time{} h.role = role h.lgr.Store(h.rootLgr.WithField(logFieldRole, string(role))) + if h.cancelReplicate != nil { + h.cancelReplicate() + h.cancelReplicate = nil + } + if role == RoleDetectedBrokenConfig { + h.currentError = &errDetectedBrokenConfigStr + } h.cond.Signal() } +func (h *commithook) setWaitNotify(f func()) { + h.mu.Lock() + defer h.mu.Unlock() + h.waitNotify = f +} + +var errDetectedBrokenConfigStr = "error: more than one server was configured as primary in the same epoch. this server has stopped accepting writes. choose a primary in the cluster and call dolt_assume_cluster_role() on servers in the cluster to start replication at a higher epoch" + // Execute on this commithook updates the target root hash we're attempting to // replicate and wakes the replication thread. func (h *commithook) Execute(ctx context.Context, ds datas.Dataset, db datas.Database) error { @@ -314,6 +359,11 @@ func (h *commithook) Execute(ctx context.Context, ds datas.Dataset, db datas.Dat } h.mu.Lock() defer h.mu.Unlock() + lgr = h.logger() + if h.role != RolePrimary { + lgr.Warnf("cluster/commithook received commit callback for a commit on %s, but we are not role primary; not replicating the commit, which is likely to be lost.", ds.ID()) + return nil + } if root != h.nextHead { lgr.Tracef("signaling replication thread to push new head: %v", root.String()) h.nextHeadIncomingTime = time.Now() @@ -329,7 +379,6 @@ func (h *commithook) HandleError(ctx context.Context, err error) error { } func (h *commithook) SetLogger(ctx context.Context, wr io.Writer) error { - h.lout = wr return nil } diff --git a/go/libraries/doltcore/sqle/cluster/controller.go b/go/libraries/doltcore/sqle/cluster/controller.go index 9014fb56c7..492ea7c012 100644 --- a/go/libraries/doltcore/sqle/cluster/controller.go +++ b/go/libraries/doltcore/sqle/cluster/controller.go @@ -16,9 +16,11 @@ package cluster import ( "context" + "errors" "fmt" "strconv" "sync" + "time" "github.com/dolthub/go-mysql-server/sql" "github.com/sirupsen/logrus" @@ -38,6 +40,7 @@ type Role string const RolePrimary Role = "primary" const RoleStandby Role = "standby" +const RoleDetectedBrokenConfig Role = "detected_broken_config" const PersistentConfigPrefix = "sqlserver.cluster" @@ -52,12 +55,23 @@ type Controller struct { sinterceptor serverinterceptor cinterceptor clientinterceptor lgr *logrus.Logger + + provider dbProvider + iterSessions IterSessions + killQuery func(uint32) + killConnection func(uint32) error } type sqlvars interface { AddSystemVariables(sysVars []sql.SystemVariable) } +// We can manage certain aspects of the exposed databases on the server through +// this. +type dbProvider interface { + SetIsStandby(bool) +} + type procedurestore interface { Register(sql.ExternalStoredProcedureDetails) } @@ -84,10 +98,15 @@ func NewController(lgr *logrus.Logger, cfg Config, pCfg config.ReadWriteConfig) commithooks: make([]*commithook, 0), lgr: lgr, } + roleSetter := func(role string, epoch int) { + ret.setRoleAndEpoch(role, epoch, false /* graceful */, -1 /* saveConnID */) + } ret.sinterceptor.lgr = lgr.WithFields(logrus.Fields{}) ret.sinterceptor.setRole(role, epoch) + ret.sinterceptor.roleSetter = roleSetter ret.cinterceptor.lgr = lgr.WithFields(logrus.Fields{}) ret.cinterceptor.setRole(role, epoch) + ret.cinterceptor.roleSetter = roleSetter return ret, nil } @@ -122,6 +141,29 @@ func (c *Controller) ApplyStandbyReplicationConfig(ctx context.Context, bt *sql. return nil } +type IterSessions func(func(sql.Session) (bool, error)) error + +func (c *Controller) ManageDatabaseProvider(p dbProvider) { + if c == nil { + return + } + c.mu.Lock() + defer c.mu.Unlock() + c.provider = p + c.setProviderIsStandby(c.role != RolePrimary) +} + +func (c *Controller) ManageQueryConnections(iterSessions IterSessions, killQuery func(uint32), killConnection func(uint32) error) { + if c == nil { + return + } + c.mu.Lock() + defer c.mu.Unlock() + c.iterSessions = iterSessions + c.killQuery = killQuery + c.killConnection = killConnection +} + func (c *Controller) applyCommitHooks(ctx context.Context, name string, bt *sql.BackgroundThreads, denv *env.DoltEnv) ([]*commithook, error) { ttfdir, err := denv.TempTableFilesDir() if err != nil { @@ -210,6 +252,7 @@ func (c *Controller) persistVariables() error { func applyBootstrapClusterConfig(lgr *logrus.Logger, cfg Config, pCfg config.ReadWriteConfig) (Role, int, error) { toset := make(map[string]string) persistentRole := pCfg.GetStringOrDefault(DoltClusterRoleVariable, "") + var roleFromPersistentConfig bool persistentEpoch := pCfg.GetStringOrDefault(DoltClusterRoleEpochVariable, "") if persistentRole == "" { if cfg.BootstrapRole() != "" { @@ -221,6 +264,7 @@ func applyBootstrapClusterConfig(lgr *logrus.Logger, cfg Config, pCfg config.Rea } toset[DoltClusterRoleVariable] = persistentRole } else { + roleFromPersistentConfig = true lgr.Tracef("cluster/controller: persisted cluster role is %s", persistentRole) } if persistentEpoch == "" { @@ -231,7 +275,10 @@ func applyBootstrapClusterConfig(lgr *logrus.Logger, cfg Config, pCfg config.Rea lgr.Tracef("cluster/controller: persisted cluster role epoch is %s", persistentEpoch) } if persistentRole != string(RolePrimary) && persistentRole != string(RoleStandby) { - return "", 0, fmt.Errorf("persisted role %s.%s = %s must be \"primary\" or \"secondary\"", PersistentConfigPrefix, DoltClusterRoleVariable, persistentRole) + isallowed := persistentRole == string(RoleDetectedBrokenConfig) && roleFromPersistentConfig + if !isallowed { + return "", 0, fmt.Errorf("persisted role %s.%s = %s must be \"primary\" or \"secondary\"", PersistentConfigPrefix, DoltClusterRoleVariable, persistentRole) + } } epochi, err := strconv.Atoi(persistentEpoch) if err != nil { @@ -246,39 +293,61 @@ func applyBootstrapClusterConfig(lgr *logrus.Logger, cfg Config, pCfg config.Rea return Role(persistentRole), epochi, nil } -func (c *Controller) setRoleAndEpoch(role string, epoch int) error { +func (c *Controller) setRoleAndEpoch(role string, epoch int, graceful bool, saveConnID int) (bool, error) { c.mu.Lock() defer c.mu.Unlock() if epoch == c.epoch && role == string(c.role) { - return nil - } - if epoch == c.epoch { - return fmt.Errorf("error assuming role '%s' at epoch %d; already at epoch %d with different role, '%s'", role, epoch, c.epoch, c.role) - } - if epoch < c.epoch { - return fmt.Errorf("error assuming role '%s' at epoch %d; already at epoch %d", role, epoch, c.epoch) + return false, nil } - if role != "primary" && role != "standby" { - return fmt.Errorf("error assuming role '%s'; valid roles are 'primary' and 'standby'", role) + if role != string(RolePrimary) && role != string(RoleStandby) && role != string(RoleDetectedBrokenConfig) { + return false, fmt.Errorf("error assuming role '%s'; valid roles are 'primary' and 'standby'", role) + } + + if epoch < c.epoch { + return false, fmt.Errorf("error assuming role '%s' at epoch %d; already at epoch %d", role, epoch, c.epoch) + } + if epoch == c.epoch { + // This is allowed for non-graceful transitions to 'standby', which only occur from interceptors and + // other signals that the cluster is misconfigured. + isallowed := !graceful && (role == string(RoleStandby) || role == string(RoleDetectedBrokenConfig)) + if !isallowed { + return false, fmt.Errorf("error assuming role '%s' at epoch %d; already at epoch %d with different role, '%s'", role, epoch, c.epoch, c.role) + } } changedrole := role != string(c.role) + if changedrole { + var err error + if role == string(RoleStandby) { + if graceful { + err = c.gracefulTransitionToStandby(saveConnID) + if err != nil { + return false, err + } + } else { + c.immediateTransitionToStandby() + } + } else if role == string(RoleDetectedBrokenConfig) { + c.immediateTransitionToStandby() + } else { + c.transitionToPrimary(saveConnID) + } + } + c.role = Role(role) c.epoch = epoch - if changedrole { - // TODO: Role is transitioning...lots of stuff to do. - } - c.refreshSystemVars() c.cinterceptor.setRole(c.role, c.epoch) c.sinterceptor.setRole(c.role, c.epoch) - for _, h := range c.commithooks { - h.setRole(c.role) + if changedrole { + for _, h := range c.commithooks { + h.setRole(c.role) + } } - return c.persistVariables() + return changedrole, c.persistVariables() } func (c *Controller) roleAndEpoch() (Role, int) { @@ -339,3 +408,136 @@ func (c *Controller) RemoteSrvServerArgs(ctx *sql.Context, args remotesrv.Server args.DBCache = remotesrvStoreCache{args.DBCache, c} return args } + +// The order of operations is: +// * Set all databases in database_provider to read-only. +// * Kill all running queries in GMS. +// * Replicate all databases to their standby remotes. +// - If success, return success. +// - If failure, set all databases in database_provider back to their original state. Return failure. +// +// saveConnID is potentially a connID of the caller to +// dolt_assume_cluster_role(), which should not be killed with the other +// connections. That connection will be transitioned to a terminal error state +// after returning the results of dolt_assume_cluster_role(). +// +// called with c.mu held +func (c *Controller) gracefulTransitionToStandby(saveConnID int) error { + c.setProviderIsStandby(true) + c.killRunningQueries(saveConnID) + // TODO: this can block with c.mu held, although we are not too + // interested in the server proceeding gracefully while this is + // happening. + if err := c.waitForHooksToReplicate(); err != nil { + c.setProviderIsStandby(false) + c.killRunningQueries(saveConnID) + return err + } + return nil +} + +// The order of operations is: +// * Set all databases in database_provider to read-only. +// * Kill all running queries in GMS. +// * Return success. NOTE: we do not attempt to replicate to the standby. +// +// called with c.mu held +func (c *Controller) immediateTransitionToStandby() error { + c.setProviderIsStandby(true) + c.killRunningQueries(-1) + return nil +} + +// The order of operations is: +// * Set all databases in database_provider back to their original mode: read-write or read only. +// * Kill all running queries in GMS. +// * Return success. +// +// saveConnID is potentially the connID of the caller to +// dolt_assume_cluster_role(). +// +// called with c.mu held +func (c *Controller) transitionToPrimary(saveConnID int) error { + c.setProviderIsStandby(false) + c.killRunningQueries(saveConnID) + return nil +} + +// Kills all running queries in the managed GMS engine. +// called with c.mu held +func (c *Controller) killRunningQueries(saveConnID int) { + if c.iterSessions != nil { + c.iterSessions(func(session sql.Session) (stop bool, err error) { + if int(session.ID()) != saveConnID { + c.killQuery(session.ID()) + c.killConnection(session.ID()) + } + return + }) + } +} + +// called with c.mu held +func (c *Controller) setProviderIsStandby(standby bool) { + if c.provider != nil { + c.provider.SetIsStandby(standby) + } +} + +const waitForHooksToReplicateTimeout = 10 * time.Second + +// Called during a graceful transition from primary to standby. Waits until all +// commithooks report nextHead == lastPushedHead. +// +// TODO: make the deadline here configurable or something. +// +// called with c.mu held +func (c *Controller) waitForHooksToReplicate() error { + caughtup := make([]bool, len(c.commithooks)) + var wg sync.WaitGroup + wg.Add(len(c.commithooks)) + for li, lch := range c.commithooks { + i := li + ch := lch + if ch.isCaughtUpLocking() { + caughtup[i] = true + wg.Done() + } else { + ch.setWaitNotify(func() { + // called with ch.mu locked. + if !caughtup[i] && ch.isCaughtUp() { + caughtup[i] = true + wg.Done() + } + }) + } + } + done := make(chan struct{}) + go func() { + wg.Wait() + close(done) + }() + var success bool + select { + case <-done: + success = true + case <-time.After(waitForHooksToReplicateTimeout): + success = false + } + for _, ch := range c.commithooks { + ch.setWaitNotify(nil) + } + // make certain we don't leak the wg.Wait goroutine in the failure case. + for _, b := range caughtup { + if !b { + wg.Done() + } + } + if success { + c.lgr.Tracef("cluster/controller: successfully replicated all databases to all standbys; transitioning to standby.") + return nil + } else { + c.lgr.Warnf("cluster/controller: failed to replicate all databases to all standbys; not transitioning to standby.") + return errors.New("cluster/controller: failed to transition from primary to standby gracefully; could not replicate databases to standby in a timely manner.") + } +} diff --git a/go/libraries/doltcore/sqle/cluster/interceptors.go b/go/libraries/doltcore/sqle/cluster/interceptors.go index a0b40f0093..9bfff5bc90 100644 --- a/go/libraries/doltcore/sqle/cluster/interceptors.go +++ b/go/libraries/doltcore/sqle/cluster/interceptors.go @@ -36,19 +36,20 @@ const clusterRoleEpochHeader = "x-dolt-cluster-role-epoch" // interceptor anytime it changes. In turn, this interceptor: // * adds the server's current role and epoch to the request headers for every // outbound request. -// * fails all outgoing requests immediately with codes.Unavailable if the role -// == RoleStandby, since this server should not be replicating when it believes -// it is a standby. +// * fails all outgoing requests immediately with codes.FailedPrecondition if +// the role == RoleStandby, since this server should not be replicating when it +// believes it is a standby. // * watches returned response headers for a situation which causes this server // to force downgrade from primary to standby. In particular, when a returned // response header asserts that the standby replica is a primary at a higher // epoch than this server, this incterceptor coordinates with the Controller to // immediately transition to standby and to stop replicating to the standby. type clientinterceptor struct { - lgr *logrus.Entry - role Role - epoch int - mu sync.Mutex + lgr *logrus.Entry + role Role + epoch int + mu sync.Mutex + roleSetter func(role string, epoch int) } func (ci *clientinterceptor) setRole(role Role, epoch int) { @@ -67,8 +68,12 @@ func (ci *clientinterceptor) getRole() (Role, int) { func (ci *clientinterceptor) Stream() grpc.StreamClientInterceptor { return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) { role, epoch := ci.getRole() + ci.lgr.Tracef("cluster: clientinterceptor: processing request to %s, role %s", method, string(role)) if role == RoleStandby { - return nil, status.Error(codes.Unavailable, "this server is a standby and is not currently replicating to its standby") + return nil, status.Error(codes.FailedPrecondition, "this server is a standby and is not currently replicating to its standby") + } + if role == RoleDetectedBrokenConfig { + return nil, status.Error(codes.FailedPrecondition, "this server is in detected_broken_config and is not currently replicating to its standby") } ctx = metadata.AppendToOutgoingContext(ctx, clusterRoleHeader, string(role), clusterRoleEpochHeader, strconv.Itoa(epoch)) var header metadata.MD @@ -81,8 +86,12 @@ func (ci *clientinterceptor) Stream() grpc.StreamClientInterceptor { func (ci *clientinterceptor) Unary() grpc.UnaryClientInterceptor { return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { role, epoch := ci.getRole() + ci.lgr.Tracef("cluster: clientinterceptor: processing request to %s, role %s", method, string(role)) if role == RoleStandby { - return status.Error(codes.Unavailable, "this server is a standby and is not currently replicating to its standby") + return status.Error(codes.FailedPrecondition, "this server is a standby and is not currently replicating to its standby") + } + if role == RoleDetectedBrokenConfig { + return status.Error(codes.FailedPrecondition, "this server is in detected_broken_config and is not currently replicating to its standby") } ctx = metadata.AppendToOutgoingContext(ctx, clusterRoleHeader, string(role), clusterRoleEpochHeader, strconv.Itoa(epoch)) var header metadata.MD @@ -95,18 +104,24 @@ func (ci *clientinterceptor) Unary() grpc.UnaryClientInterceptor { func (ci *clientinterceptor) handleResponseHeaders(header metadata.MD, role Role, epoch int) { epochs := header.Get(clusterRoleEpochHeader) roles := header.Get(clusterRoleHeader) - if len(epochs) > 0 && len(roles) > 0 && roles[0] == string(RolePrimary) { + if len(epochs) > 0 && len(roles) > 0 { if respepoch, err := strconv.Atoi(epochs[0]); err == nil { - if respepoch == epoch { - ci.lgr.Errorf("cluster: this server and the server replicating to it are both primary at the same epoch. force transitioning to standby.") - // TODO: Signal to controller that we are forced to become a standby at epoch |respepoch|... - } else if respepoch > epoch { - // The server we replicate to thinks it is the primary at a higher epoch than us... - ci.lgr.Warnf("cluster: this server is primary at epoch %d. the server replicating to it is primary at epoch %d. force transitioning to standby.", epoch, respepoch) - // TODO: Signal to controller that we are forced to become a standby at epoch |respepoch|... + if roles[0] == string(RolePrimary) { + if respepoch == epoch { + ci.lgr.Errorf("cluster: clientinterceptor: this server and the server replicating to it are both primary at the same epoch. force transitioning to detected_broken_config.") + ci.roleSetter(string(RoleDetectedBrokenConfig), respepoch) + } else if respepoch > epoch { + // The server we replicate to thinks it is the primary at a higher epoch than us... + ci.lgr.Warnf("cluster: clientinterceptor: this server is primary at epoch %d. the server replicating to it is primary at epoch %d. force transitioning to standby.", epoch, respepoch) + ci.roleSetter(string(RoleStandby), respepoch) + } + } else if roles[0] == string(RoleDetectedBrokenConfig) && respepoch >= epoch { + ci.lgr.Errorf("cluster: clientinterceptor: this server learned from its standby that the standby is in detected_broken_config. force transitioning to detected_broken_config.") + ci.roleSetter(string(RoleDetectedBrokenConfig), respepoch) } } } + ci.lgr.Warnf("cluster: clientinterceptor: response was missing role and epoch metadata") } func (ci *clientinterceptor) Options() []grpc.DialOption { @@ -122,8 +137,8 @@ func (ci *clientinterceptor) Options() []grpc.DialOption { // interceptor anytime it changes. In turn, this interceptor: // * adds the server's current role and epoch to the response headers for every // request. -// * fails all incoming requests immediately with codes.Unavailable if the -// current role == RolePrimary, since nothing should be replicating to us in +// * fails all incoming requests immediately with codes.FailedPrecondition if the +// current role != RoleStandby, since nothing should be replicating to us in // that state. // * watches incoming request headers for a situation which causes this server // to force downgrade from primary to standby. In particular, when an incoming @@ -131,10 +146,11 @@ func (ci *clientinterceptor) Options() []grpc.DialOption { // than our current epoch, this interceptor coordinates with the Controller to // immediately transition to standby and allow replication requests through. type serverinterceptor struct { - lgr *logrus.Entry - role Role - epoch int - mu sync.Mutex + lgr *logrus.Entry + role Role + epoch int + mu sync.Mutex + roleSetter func(role string, epoch int) } func (si *serverinterceptor) Stream() grpc.StreamServerInterceptor { @@ -150,7 +166,11 @@ func (si *serverinterceptor) Stream() grpc.StreamServerInterceptor { } if role == RolePrimary { // As a primary, we do not accept replication requests. - return status.Error(codes.Unavailable, "this server is a primary and is not currently accepting replication") + return status.Error(codes.FailedPrecondition, "this server is a primary and is not currently accepting replication") + } + if role == RoleDetectedBrokenConfig { + // As a primary, we do not accept replication requests. + return status.Error(codes.FailedPrecondition, "this server is currently in detected_broken_config and is not currently accepting replication") } return handler(srv, ss) } @@ -169,7 +189,11 @@ func (si *serverinterceptor) Unary() grpc.UnaryServerInterceptor { } if role == RolePrimary { // As a primary, we do not accept replication requests. - return nil, status.Error(codes.Unavailable, "this server is a primary and is not currently accepting replication") + return nil, status.Error(codes.FailedPrecondition, "this server is a primary and is not currently accepting replication") + } + if role == RoleDetectedBrokenConfig { + // As a primary, we do not accept replication requests. + return nil, status.Error(codes.FailedPrecondition, "this server is currently in detected_broken_config and is not currently accepting replication") } return handler(ctx, req) } @@ -186,12 +210,12 @@ func (si *serverinterceptor) handleRequestHeaders(header metadata.MD, role Role, // at the same epoch. We will become standby // and our peer will become standby. An // operator will need to get involved. - si.lgr.Errorf("cluster: this server and its standby replica are both primary at the same epoch. force transitioning to standby.") - // TODO: Signal to controller that we are forced to become a standby at epoch |reqepoch| + si.lgr.Errorf("cluster: serverinterceptor: this server and its standby replica are both primary at the same epoch. force transitioning to detected_broken_config.") + si.roleSetter(string(RoleDetectedBrokenConfig), reqepoch) } else if reqepoch > epoch { // The client replicating to us thinks it is the primary at a higher epoch than us. - si.lgr.Warnf("cluster: this server is primary at epoch %d. the server replicating to it is primary at epoch %d. force transitioning to standby.", epoch, reqepoch) - // TODO: Signal to controller that we are forced to become a standby at epoch |reqepoch| + si.lgr.Warnf("cluster: serverinterceptor: this server is primary at epoch %d. the server replicating to it is primary at epoch %d. force transitioning to standby.", epoch, reqepoch) + si.roleSetter(string(RoleStandby), reqepoch) } } } diff --git a/go/libraries/doltcore/sqle/cluster/interceptors_test.go b/go/libraries/doltcore/sqle/cluster/interceptors_test.go index 132b3bab8e..99b4897be5 100644 --- a/go/libraries/doltcore/sqle/cluster/interceptors_test.go +++ b/go/libraries/doltcore/sqle/cluster/interceptors_test.go @@ -20,6 +20,8 @@ import ( "sync" "testing" + "github.com/sirupsen/logrus" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "google.golang.org/grpc" @@ -43,6 +45,11 @@ func (s *server) Watch(req *grpc_health_v1.HealthCheckRequest, ss grpc_health_v1 return status.Errorf(codes.Unimplemented, "method Watch not implemented") } +func noopSetRole(string, int) { +} + +var lgr = logrus.StandardLogger().WithFields(logrus.Fields{}) + func withClient(t *testing.T, cb func(*testing.T, grpc_health_v1.HealthClient), serveropts []grpc.ServerOption, dialopts []grpc.DialOption) *server { addr, err := net.ResolveUnixAddr("unix", "test_grpc.socket") require.NoError(t, err) @@ -85,6 +92,8 @@ func withClient(t *testing.T, cb func(*testing.T, grpc_health_v1.HealthClient), func TestServerInterceptorAddsUnaryResponseHeaders(t *testing.T) { var si serverinterceptor si.setRole(RoleStandby, 10) + si.roleSetter = noopSetRole + si.lgr = lgr withClient(t, func(t *testing.T, client grpc_health_v1.HealthClient) { var md metadata.MD _, err := client.Check(context.Background(), &grpc_health_v1.HealthCheckRequest{}, grpc.Header(&md)) @@ -101,6 +110,8 @@ func TestServerInterceptorAddsUnaryResponseHeaders(t *testing.T) { func TestServerInterceptorAddsStreamResponseHeaders(t *testing.T) { var si serverinterceptor si.setRole(RoleStandby, 10) + si.roleSetter = noopSetRole + si.lgr = lgr withClient(t, func(t *testing.T, client grpc_health_v1.HealthClient) { var md metadata.MD srv, err := client.Watch(context.Background(), &grpc_health_v1.HealthCheckRequest{}, grpc.Header(&md)) @@ -119,15 +130,17 @@ func TestServerInterceptorAddsStreamResponseHeaders(t *testing.T) { func TestServerInterceptorAsPrimaryDoesNotSendRequest(t *testing.T) { var si serverinterceptor si.setRole(RolePrimary, 10) + si.roleSetter = noopSetRole + si.lgr = lgr srv := withClient(t, func(t *testing.T, client grpc_health_v1.HealthClient) { ctx := metadata.AppendToOutgoingContext(context.Background(), "test-header", "test-header-value") _, err := client.Check(ctx, &grpc_health_v1.HealthCheckRequest{}) - assert.Equal(t, codes.Unavailable, status.Code(err)) + assert.Equal(t, codes.FailedPrecondition, status.Code(err)) ctx = metadata.AppendToOutgoingContext(context.Background(), "test-header", "test-header-value") ss, err := client.Watch(ctx, &grpc_health_v1.HealthCheckRequest{}) assert.NoError(t, err) _, err = ss.Recv() - assert.Equal(t, codes.Unavailable, status.Code(err)) + assert.Equal(t, codes.FailedPrecondition, status.Code(err)) }, si.Options(), nil) assert.Nil(t, srv.md) } @@ -135,6 +148,8 @@ func TestServerInterceptorAsPrimaryDoesNotSendRequest(t *testing.T) { func TestClientInterceptorAddsUnaryRequestHeaders(t *testing.T) { var ci clientinterceptor ci.setRole(RolePrimary, 10) + ci.roleSetter = noopSetRole + ci.lgr = lgr srv := withClient(t, func(t *testing.T, client grpc_health_v1.HealthClient) { _, err := client.Check(context.Background(), &grpc_health_v1.HealthCheckRequest{}) assert.Equal(t, codes.Unimplemented, status.Code(err)) @@ -150,6 +165,8 @@ func TestClientInterceptorAddsUnaryRequestHeaders(t *testing.T) { func TestClientInterceptorAddsStreamRequestHeaders(t *testing.T) { var ci clientinterceptor ci.setRole(RolePrimary, 10) + ci.roleSetter = noopSetRole + ci.lgr = lgr srv := withClient(t, func(t *testing.T, client grpc_health_v1.HealthClient) { srv, err := client.Watch(context.Background(), &grpc_health_v1.HealthCheckRequest{}) require.NoError(t, err) @@ -167,14 +184,16 @@ func TestClientInterceptorAddsStreamRequestHeaders(t *testing.T) { func TestClientInterceptorAsStandbyDoesNotSendRequest(t *testing.T) { var ci clientinterceptor ci.setRole(RolePrimary, 10) + ci.roleSetter = noopSetRole + ci.lgr = lgr srv := withClient(t, func(t *testing.T, client grpc_health_v1.HealthClient) { _, err := client.Check(context.Background(), &grpc_health_v1.HealthCheckRequest{}) assert.Equal(t, codes.Unimplemented, status.Code(err)) ci.setRole(RoleStandby, 11) _, err = client.Check(context.Background(), &grpc_health_v1.HealthCheckRequest{}) - assert.Equal(t, codes.Unavailable, status.Code(err)) + assert.Equal(t, codes.FailedPrecondition, status.Code(err)) _, err = client.Watch(context.Background(), &grpc_health_v1.HealthCheckRequest{}) - assert.Equal(t, codes.Unavailable, status.Code(err)) + assert.Equal(t, codes.FailedPrecondition, status.Code(err)) }, nil, ci.Options()) if assert.Len(t, srv.md.Get(clusterRoleHeader), 1) { assert.Equal(t, "primary", srv.md.Get(clusterRoleHeader)[0]) diff --git a/go/libraries/doltcore/sqle/common_test.go b/go/libraries/doltcore/sqle/common_test.go index 7117238219..8858172864 100644 --- a/go/libraries/doltcore/sqle/common_test.go +++ b/go/libraries/doltcore/sqle/common_test.go @@ -24,7 +24,7 @@ import ( "github.com/stretchr/testify/require" "github.com/dolthub/dolt/go/libraries/doltcore/doltdb" - "github.com/dolthub/dolt/go/libraries/doltcore/dtestutils" + "github.com/dolthub/dolt/go/libraries/doltcore/doltdb/durable" "github.com/dolthub/dolt/go/libraries/doltcore/env" "github.com/dolthub/dolt/go/libraries/doltcore/row" "github.com/dolthub/dolt/go/libraries/doltcore/schema" @@ -181,12 +181,34 @@ func assertSchemasEqual(t *testing.T, expected, actual sql.Schema) { // CreateTableFn returns a SetupFunc that creates a table with the rows given // todo(andy): replace with ExecuteSetupSQL -func CreateTableFn(tableName string, tableSchema schema.Schema, initialRows ...row.Row) SetupFn { +func CreateTableFn(tableName string, tableSchema schema.Schema, queries string) SetupFn { return func(t *testing.T, dEnv *env.DoltEnv) { - dtestutils.CreateTestTable(t, dEnv, tableName, tableSchema, initialRows...) + CreateTestTable(t, dEnv, tableName, tableSchema, queries) } } +// CreateTestTable creates a new test table with the name, schema, and rows given. +func CreateTestTable(t *testing.T, dEnv *env.DoltEnv, tableName string, sch schema.Schema, queries string) { + ctx := context.Background() + root, err := dEnv.WorkingRoot(ctx) + require.NoError(t, err) + vrw := dEnv.DoltDB.ValueReadWriter() + ns := dEnv.DoltDB.NodeStore() + + rows, err := durable.NewEmptyIndex(ctx, vrw, ns, sch) + require.NoError(t, err) + tbl, err := doltdb.NewTable(ctx, vrw, ns, sch, rows, nil, nil) + require.NoError(t, err) + root, err = root.PutTable(ctx, tableName, tbl) + require.NoError(t, err) + err = dEnv.UpdateWorkingRoot(ctx, root) + require.NoError(t, err) + root, err = ExecuteSql(t, dEnv, root, queries) + require.NoError(t, err) + err = dEnv.UpdateWorkingRoot(ctx, root) + require.NoError(t, err) +} + func ExecuteSetupSQL(ctx context.Context, queries string) SetupFn { return func(t *testing.T, dEnv *env.DoltEnv) { root, err := dEnv.WorkingRoot(ctx) diff --git a/go/libraries/doltcore/sqle/database_provider.go b/go/libraries/doltcore/sqle/database_provider.go index 498534bff8..c8f0de9cfc 100644 --- a/go/libraries/doltcore/sqle/database_provider.go +++ b/go/libraries/doltcore/sqle/database_provider.go @@ -54,6 +54,7 @@ type DoltDatabaseProvider struct { remoteDialer dbfactory.GRPCDialProvider // TODO: why isn't this a method defined on the remote object dbFactoryUrl string + isStandby *bool } var _ sql.DatabaseProvider = (*DoltDatabaseProvider)(nil) @@ -115,6 +116,7 @@ func NewDoltDatabaseProviderWithDatabases(defaultBranch string, fs filesys.Files defaultBranch: defaultBranch, dbFactoryUrl: doltdb.LocalDirDoltDB, InitDatabaseHook: ConfigureReplicationDatabaseHook, + isStandby: new(bool), }, nil } @@ -147,6 +149,15 @@ func (p DoltDatabaseProvider) FileSystem() filesys.Filesys { return p.fs } +// If this DatabaseProvider is set to standby |true|, it returns every dolt +// database as a read only database. Set back to |false| to get read-write +// behavior from dolt databases again. +func (p DoltDatabaseProvider) SetIsStandby(standby bool) { + p.mu.Lock() + defer p.mu.Unlock() + *p.isStandby = standby +} + // FileSystemForDatabase returns a filesystem, with the working directory set to the root directory // of the requested database. If the requested database isn't found, a database not found error // is returned. @@ -167,9 +178,10 @@ func (p DoltDatabaseProvider) Database(ctx *sql.Context, name string) (db sql.Da var ok bool p.mu.RLock() db, ok = p.databases[formatDbMapKeyName(name)] + standby := *p.isStandby p.mu.RUnlock() if ok { - return db, nil + return wrapForStandby(db, standby), nil } db, _, ok, err = p.databaseForRevision(ctx, name) @@ -188,7 +200,21 @@ func (p DoltDatabaseProvider) Database(ctx *sql.Context, name string) (db sql.Da } // Don't track revision databases, just instantiate them on demand - return db, nil + return wrapForStandby(db, standby), nil +} + +func wrapForStandby(db sql.Database, standby bool) sql.Database { + if !standby { + return db + } + if _, ok := db.(ReadOnlyDatabase); ok { + return db + } + if db, ok := db.(Database); ok { + // :-/. Hopefully it's not too sliced. + return ReadOnlyDatabase{db} + } + return db } // attemptCloneReplica attempts to clone a database from the configured replication remote URL template, returning an error diff --git a/go/libraries/doltcore/sqle/dsess/session.go b/go/libraries/doltcore/sqle/dsess/session.go index 6caf876373..e0678672c8 100644 --- a/go/libraries/doltcore/sqle/dsess/session.go +++ b/go/libraries/doltcore/sqle/dsess/session.go @@ -60,6 +60,10 @@ type DoltSession struct { tempTables map[string][]sql.Table globalsConf config.ReadWriteConfig mu *sync.Mutex + + // If non-nil, this will be returned from ValidateSession. + // Used by sqle/cluster to put a session into a terminal err state. + validateErr error } var _ sql.Session = (*DoltSession)(nil) @@ -178,10 +182,22 @@ func (d *DoltSession) Flush(ctx *sql.Context, dbName string) error { return d.SetRoot(ctx, dbName, ws.WorkingRoot()) } +// SetValidateErr sets an error on this session to be returned from every call +// to ValidateSession. This is effectively a way to disable a session. +// +// Used by sql/cluster logic to make sessions on a server which has +// transitioned roles termainlly error. +func (d *DoltSession) SetValidateErr(err error) { + d.validateErr = err +} + // ValidateSession validates a working set if there are a valid sessionState with non-nil working set. // If there is no sessionState or its current working set not defined, then no need for validation, // so no error is returned. func (d *DoltSession) ValidateSession(ctx *sql.Context, dbName string) error { + if d.validateErr != nil { + return d.validateErr + } sessionState, ok, err := d.LookupDbState(ctx, dbName) if !ok { return nil diff --git a/go/libraries/doltcore/sqle/enginetest/dolt_engine_test.go b/go/libraries/doltcore/sqle/enginetest/dolt_engine_test.go index 5acf95ab0a..b6705fd7a0 100644 --- a/go/libraries/doltcore/sqle/enginetest/dolt_engine_test.go +++ b/go/libraries/doltcore/sqle/enginetest/dolt_engine_test.go @@ -46,7 +46,7 @@ var skipPrepared bool // SkipPreparedsCount is used by the "ci-check-repo CI workflow // as a reminder to consider prepareds when adding a new // enginetest suite. -const SkipPreparedsCount = 79 +const SkipPreparedsCount = 80 const skipPreparedFlag = "DOLT_SKIP_PREPARED_ENGINETESTS" @@ -258,6 +258,11 @@ func TestQueryPlans(t *testing.T) { enginetest.TestQueryPlans(t, harness, queries.PlanTests) } +func TestIntegrationQueryPlans(t *testing.T) { + harness := newDoltHarness(t).WithParallelism(1) + enginetest.TestIntegrationPlans(t, harness) +} + func TestDoltDiffQueryPlans(t *testing.T) { harness := newDoltHarness(t).WithParallelism(2) // want Exchange nodes harness.Setup(setup.SimpleSetup...) diff --git a/go/libraries/doltcore/sqle/index/mergeable_indexes_test.go b/go/libraries/doltcore/sqle/index/mergeable_indexes_test.go index 4dc2e762ab..d2310890d1 100644 --- a/go/libraries/doltcore/sqle/index/mergeable_indexes_test.go +++ b/go/libraries/doltcore/sqle/index/mergeable_indexes_test.go @@ -39,7 +39,7 @@ import ( // that the final output is as expected. func TestMergeableIndexes(t *testing.T) { if types.Format_Default != types.Format_LD_1 { - t.Skip() + t.Skip() // this test is specific to Noms ranges } engine, denv, root, db, indexTuples := setupIndexes(t, "test", `INSERT INTO test VALUES @@ -1379,7 +1379,7 @@ func TestMergeableIndexes(t *testing.T) { // TODO: disassociate NULL ranges from value ranges and fix the intermediate ranges (finalRanges). func TestMergeableIndexesNulls(t *testing.T) { if types.Format_Default != types.Format_LD_1 { - t.Skip() + t.Skip() // this test is specific to Noms ranges } engine, denv, root, db, indexTuples := setupIndexes(t, "test", `INSERT INTO test VALUES diff --git a/go/libraries/doltcore/sqle/integration_test/stockmarket_test.go b/go/libraries/doltcore/sqle/integration_test/stockmarket_test.go index 03f75be5fe..f12e49aca2 100644 --- a/go/libraries/doltcore/sqle/integration_test/stockmarket_test.go +++ b/go/libraries/doltcore/sqle/integration_test/stockmarket_test.go @@ -20279,10 +20279,10 @@ func TestExplain(t *testing.T) { expectedExplain := "IndexedJoin(d.Symbol = t.Symbol)\n" + " ├─ TableAlias(d)\n" + " │ └─ Table(daily_summary)\n" + - " │ └─ columns: [Type Symbol Country TradingDate Open High Low Close Volume OpenInt]\n" + + " │ └─ columns: [type symbol country tradingdate open high low close volume openint]\n" + " └─ TableAlias(t)\n" + " └─ IndexedTableAccess(symbols)\n" + " ├─ index: [symbols.Symbol]\n" + - " └─ columns: [Symbol Name Sector IPOYear]" + " └─ columns: [symbol name sector ipoyear]" assert.Equal(t, expectedExplain, strings.Join(rowStrings, "\n")) } diff --git a/go/libraries/doltcore/sqle/remotesrv.go b/go/libraries/doltcore/sqle/remotesrv.go index 8932cd040d..d172df5bce 100644 --- a/go/libraries/doltcore/sqle/remotesrv.go +++ b/go/libraries/doltcore/sqle/remotesrv.go @@ -15,8 +15,6 @@ package sqle import ( - "errors" - "github.com/dolthub/go-mysql-server/sql" "github.com/dolthub/dolt/go/libraries/doltcore/doltdb" @@ -37,15 +35,15 @@ func (s remotesrvStore) Get(path, nbfVerStr string) (remotesrv.RemoteSrvStore, e if err != nil { return nil, err } - ddb, ok := db.(Database) + sdb, ok := db.(SqlDatabase) if !ok { - return nil, errors.New("unimplemented") + return nil, remotesrv.ErrUnimplemented } - datasdb := doltdb.HackDatasDatabaseFromDoltDB(ddb.DbData().Ddb) + datasdb := doltdb.HackDatasDatabaseFromDoltDB(sdb.DbData().Ddb) cs := datas.ChunkStoreFromDatabase(datasdb) rss, ok := cs.(remotesrv.RemoteSrvStore) if !ok { - return nil, errors.New("unimplemented") + return nil, remotesrv.ErrUnimplemented } return rss, nil } diff --git a/go/libraries/doltcore/sqle/sqlbatch_test.go b/go/libraries/doltcore/sqle/sqlbatch_test.go index f813104fdf..7911a2840d 100644 --- a/go/libraries/doltcore/sqle/sqlbatch_test.go +++ b/go/libraries/doltcore/sqle/sqlbatch_test.go @@ -16,27 +16,24 @@ package sqle import ( "context" - "fmt" "math/rand" + "sort" "testing" - "github.com/google/go-cmp/cmp" + "github.com/dolthub/go-mysql-server/sql" "github.com/google/uuid" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "github.com/dolthub/dolt/go/libraries/doltcore/dtestutils" "github.com/dolthub/dolt/go/libraries/doltcore/row" + "github.com/dolthub/dolt/go/libraries/doltcore/schema" "github.com/dolthub/dolt/go/libraries/doltcore/sqle/dsess" + "github.com/dolthub/dolt/go/libraries/doltcore/sqle/sqlutil" "github.com/dolthub/dolt/go/libraries/doltcore/table/editor" "github.com/dolthub/dolt/go/store/types" ) func TestSqlBatchInserts(t *testing.T) { - if types.Format_Default != types.Format_LD_1 { - t.Skip() // todo: convert to enginetests - } - insertStatements := []string{ `insert into people (id, first_name, last_name, is_married, age, rating, uuid, num_episodes) values (7, "Maggie", "Simpson", false, 1, 5.1, '00000000-0000-0000-0000-000000000007', 677)`, @@ -90,36 +87,39 @@ func TestSqlBatchInserts(t *testing.T) { allAppearanceRows, err := GetAllRows(root, AppearancesTableName) require.NoError(t, err) - assert.ElementsMatch(t, AllPeopleRows, allPeopleRows) - assert.ElementsMatch(t, AllEpsRows, allEpsRows) - assert.ElementsMatch(t, AllAppsRows, allAppearanceRows) + AllPeopleSqlRows := ToSqlRows(PeopleTestSchema, AllPeopleRows...) + AllEpsSqlRows := ToSqlRows(EpisodesTestSchema, AllEpsRows...) + AllAppsSqlRows := ToSqlRows(AppearancesTestSchema, AllAppsRows...) + assert.ElementsMatch(t, AllPeopleSqlRows, allPeopleRows) + assert.ElementsMatch(t, AllEpsSqlRows, allEpsRows) + assert.ElementsMatch(t, AllAppsSqlRows, allAppearanceRows) // Now commit the batch and check for new rows err = db.Flush(sqlCtx) require.NoError(t, err) - var expectedPeople, expectedEpisodes, expectedAppearances []row.Row + var expectedPeople, expectedEpisodes, expectedAppearances []sql.Row - expectedPeople = append(expectedPeople, AllPeopleRows...) - expectedPeople = append(expectedPeople, + expectedPeople = append(expectedPeople, AllPeopleSqlRows...) + expectedPeople = append(expectedPeople, ToSqlRows(PeopleTestSchema, NewPeopleRowWithOptionalFields(7, "Maggie", "Simpson", false, 1, 5.1, uuid.MustParse("00000000-0000-0000-0000-000000000007"), 677), NewPeopleRowWithOptionalFields(8, "Milhouse", "VanHouten", false, 1, 5.1, uuid.MustParse("00000000-0000-0000-0000-000000000008"), 677), newPeopleRow(9, "Clancey", "Wiggum"), newPeopleRow(10, "Montgomery", "Burns"), newPeopleRow(11, "Ned", "Flanders"), - ) + )...) - expectedEpisodes = append(expectedEpisodes, AllEpsRows...) - expectedEpisodes = append(expectedEpisodes, + expectedEpisodes = append(expectedEpisodes, AllEpsSqlRows...) + expectedEpisodes = append(expectedEpisodes, ToSqlRows(EpisodesTestSchema, newEpsRow(5, "Bart the General"), newEpsRow(6, "Moaning Lisa"), newEpsRow(7, "The Call of the Simpsons"), newEpsRow(8, "The Telltale Head"), newEpsRow(9, "Life on the Fast Lane"), - ) + )...) - expectedAppearances = append(expectedAppearances, AllAppsRows...) - expectedAppearances = append(expectedAppearances, + expectedAppearances = append(expectedAppearances, AllAppsSqlRows...) + expectedAppearances = append(expectedAppearances, ToSqlRows(AppearancesTestSchema, newAppsRow(7, 5), newAppsRow(7, 6), newAppsRow(8, 7), @@ -128,7 +128,7 @@ func TestSqlBatchInserts(t *testing.T) { newAppsRow(10, 5), newAppsRow(10, 6), newAppsRow(11, 9), - ) + )...) root, err = db.GetRoot(sqlCtx) require.NoError(t, err) @@ -139,9 +139,9 @@ func TestSqlBatchInserts(t *testing.T) { allAppearanceRows, err = GetAllRows(root, AppearancesTableName) require.NoError(t, err) - assertRowSetsEqual(t, expectedPeople, allPeopleRows) - assertRowSetsEqual(t, expectedEpisodes, allEpsRows) - assertRowSetsEqual(t, expectedAppearances, allAppearanceRows) + assertRowSetsEqual(t, PeopleTestSchema, expectedPeople, allPeopleRows) + assertRowSetsEqual(t, EpisodesTestSchema, expectedEpisodes, allEpsRows) + assertRowSetsEqual(t, AppearancesTestSchema, expectedAppearances, allAppearanceRows) } func TestSqlBatchInsertIgnoreReplace(t *testing.T) { @@ -193,14 +193,10 @@ func TestSqlBatchInsertIgnoreReplace(t *testing.T) { allPeopleRows, err = GetAllRows(root, PeopleTableName) assert.NoError(t, err) - assertRowSetsEqual(t, expectedPeople, allPeopleRows) + assertRowSetsEqual(t, PeopleTestSchema, ToSqlRows(PeopleTestSchema, expectedPeople...), allPeopleRows) } func TestSqlBatchInsertErrors(t *testing.T) { - if types.Format_Default != types.Format_LD_1 { - t.Skip() // todo: convert to enginetests - } - ctx := context.Background() dEnv := CreateTestDatabase(t) root, err := dEnv.WorkingRoot(ctx) @@ -230,64 +226,31 @@ func TestSqlBatchInsertErrors(t *testing.T) { assert.NoError(t, db.Flush(sqlCtx)) } -func assertRowSetsEqual(t *testing.T, expected, actual []row.Row) { - equal, diff := rowSetsEqual(expected, actual) - assert.True(t, equal, diff) +func assertRowSetsEqual(t *testing.T, sch schema.Schema, expected, actual []sql.Row) { + require.Equal(t, len(expected), len(actual), + "Sets have different sizes: expected %d, was %d", len(expected), len(actual)) + sqlSch, err := sqlutil.FromDoltSchema("", sch) + require.NoError(t, err) + sortSqlRows(t, sqlSch.Schema, expected) + sortSqlRows(t, sqlSch.Schema, actual) + if !assert.Equal(t, expected, actual) { + t.Skip("") + } } -// Returns whether the two slices of rows contain the same elements using set semantics (no duplicates), and an error -// string if they aren't. -func rowSetsEqual(expected, actual []row.Row) (bool, string) { - if len(expected) != len(actual) { - return false, fmt.Sprintf("Sets have different sizes: expected %d, was %d", len(expected), len(actual)) - } - - for _, ex := range expected { - if !containsRow(actual, ex) { - return false, fmt.Sprintf("Missing row: %v", ex) +func sortSqlRows(t *testing.T, sch sql.Schema, rows []sql.Row) { + sort.Slice(rows, func(i, j int) bool { + l, r := rows[i], rows[j] + for idx, col := range sch { + c, err := col.Type.Compare(l[idx], r[idx]) + require.NoError(t, err) + if c == 0 { + continue + } + return c < 0 } - } - - return true, "" -} - -func containsRow(rs []row.Row, r row.Row) bool { - for _, r2 := range rs { - equal, _ := rowsEqual(r, r2) - if equal { - return true - } - } - return false -} - -func rowsEqual(expected, actual row.Row) (bool, string) { - er, ar := make(map[uint64]types.Value), make(map[uint64]types.Value) - _, err := expected.IterCols(func(t uint64, v types.Value) (bool, error) { - er[t] = v - return false, nil + return false }) - - if err != nil { - panic(err) - } - - _, err = actual.IterCols(func(t uint64, v types.Value) (bool, error) { - ar[t] = v - return false, nil - }) - - if err != nil { - panic(err) - } - - opts := cmp.Options{cmp.AllowUnexported(), dtestutils.FloatComparer, dtestutils.TimestampComparer} - eq := cmp.Equal(er, ar, opts) - var diff string - if !eq { - diff = cmp.Diff(er, ar, opts) - } - return eq, diff } func newPeopleRow(id int, firstName, lastName string) row.Row { diff --git a/go/libraries/doltcore/sqle/sqlddl_test.go b/go/libraries/doltcore/sqle/sqlddl_test.go index 26d2020787..5412834ec5 100644 --- a/go/libraries/doltcore/sqle/sqlddl_test.go +++ b/go/libraries/doltcore/sqle/sqlddl_test.go @@ -782,9 +782,6 @@ func TestRenameTableStatements(t *testing.T) { } func TestAlterSystemTables(t *testing.T) { - if types.Format_Default != types.Format_LD_1 { - t.Skip("") // todo: convert to enginetests - } systemTableNames := []string{"dolt_log", "dolt_history_people", "dolt_diff_people", "dolt_commit_diff_people"} // "dolt_docs", reservedTableNames := []string{"dolt_schemas", "dolt_query_catalog"} @@ -794,15 +791,12 @@ func TestAlterSystemTables(t *testing.T) { dtestutils.CreateEmptyTestTable(t, dEnv, "dolt_docs", doltdb.DocsSchema) dtestutils.CreateEmptyTestTable(t, dEnv, doltdb.SchemasTableName, SchemasTableSchema()) - dtestutils.CreateTestTable(t, dEnv, "dolt_docs", - doltdb.DocsSchema, - NewRow(types.String("LICENSE.md"), types.String("A license"))) - dtestutils.CreateTestTable(t, dEnv, doltdb.DoltQueryCatalogTableName, - dtables.DoltQueryCatalogSchema, - NewRow(types.String("abc123"), types.Uint(1), types.String("example"), types.String("select 2+2 from dual"), types.String("description"))) - dtestutils.CreateTestTable(t, dEnv, doltdb.SchemasTableName, - SchemasTableSchema(), - NewRowWithPks([]types.Value{types.String("view"), types.String("name")}, types.String("select 2+2 from dual"))) + CreateTestTable(t, dEnv, "dolt_docs", doltdb.DocsSchema, + "INSERT INTO dolt_docs VALUES ('LICENSE.md','A license')") + CreateTestTable(t, dEnv, doltdb.DoltQueryCatalogTableName, dtables.DoltQueryCatalogSchema, + "INSERT INTO dolt_query_catalog VALUES ('abc123', 1, 'example', 'select 2+2 from dual', 'description')") + CreateTestTable(t, dEnv, doltdb.SchemasTableName, SchemasTableSchema(), + "INSERT INTO dolt_schemas (type, name, fragment, id) VALUES ('view', 'name', 'select 2+2 from dual', 1)") } t.Run("Create", func(t *testing.T) { diff --git a/go/libraries/doltcore/sqle/sqldelete_test.go b/go/libraries/doltcore/sqle/sqldelete_test.go index 0fbcfccaee..0381dcd9e9 100644 --- a/go/libraries/doltcore/sqle/sqldelete_test.go +++ b/go/libraries/doltcore/sqle/sqldelete_test.go @@ -25,7 +25,6 @@ import ( "github.com/dolthub/dolt/go/libraries/doltcore/doltdb" "github.com/dolthub/dolt/go/libraries/doltcore/schema" "github.com/dolthub/dolt/go/libraries/doltcore/sqle/dtables" - "github.com/dolthub/dolt/go/store/types" ) // Set to the name of a single test to run just that test, useful for debugging @@ -180,9 +179,6 @@ func TestExecuteDelete(t *testing.T) { } func TestExecuteDeleteSystemTables(t *testing.T) { - if types.Format_Default != types.Format_LD_1 { - t.Skip() // todo: convert to enginetest - } for _, test := range systemTableDeleteTests { t.Run(test.Name, func(t *testing.T) { testDeleteQuery(t, test) @@ -193,9 +189,8 @@ func TestExecuteDeleteSystemTables(t *testing.T) { var systemTableDeleteTests = []DeleteTest{ { Name: "delete dolt_docs", - AdditionalSetup: CreateTableFn("dolt_docs", - doltdb.DocsSchema, - NewRow(types.String("LICENSE.md"), types.String("A license"))), + AdditionalSetup: CreateTableFn("dolt_docs", doltdb.DocsSchema, + "INSERT INTO dolt_docs VALUES ('LICENSE.md','A license')"), DeleteQuery: "delete from dolt_docs where doc_name = 'LICENSE.md'", SelectQuery: "select * from dolt_docs", ExpectedRows: []sql.Row{}, @@ -203,9 +198,8 @@ var systemTableDeleteTests = []DeleteTest{ }, { Name: "delete dolt_query_catalog", - AdditionalSetup: CreateTableFn(doltdb.DoltQueryCatalogTableName, - dtables.DoltQueryCatalogSchema, - NewRow(types.String("abc123"), types.Uint(1), types.String("example"), types.String("select 2+2 from dual"), types.String("description"))), + AdditionalSetup: CreateTableFn(doltdb.DoltQueryCatalogTableName, dtables.DoltQueryCatalogSchema, + "INSERT INTO dolt_query_catalog VALUES ('abc123', 1, 'example', 'select 2+2 from dual', 'description')"), DeleteQuery: "delete from dolt_query_catalog", SelectQuery: "select * from dolt_query_catalog", ExpectedRows: ToSqlRows(dtables.DoltQueryCatalogSchema), @@ -213,9 +207,8 @@ var systemTableDeleteTests = []DeleteTest{ }, { Name: "delete dolt_schemas", - AdditionalSetup: CreateTableFn(doltdb.SchemasTableName, - SchemasTableSchema(), - NewRowWithPks([]types.Value{types.String("view"), types.String("name")}, types.String("select 2+2 from dual"))), + AdditionalSetup: CreateTableFn(doltdb.SchemasTableName, SchemasTableSchema(), + "INSERT INTO dolt_schemas (type, name, fragment, id) VALUES ('view', 'name', 'select 2+2 from dual', 1)"), DeleteQuery: "delete from dolt_schemas", SelectQuery: "select * from dolt_schemas", ExpectedRows: ToSqlRows(dtables.DoltQueryCatalogSchema), diff --git a/go/libraries/doltcore/sqle/sqlinsert_test.go b/go/libraries/doltcore/sqle/sqlinsert_test.go index b48bb85f34..d3d5ef709d 100644 --- a/go/libraries/doltcore/sqle/sqlinsert_test.go +++ b/go/libraries/doltcore/sqle/sqlinsert_test.go @@ -377,7 +377,7 @@ func TestExecuteInsert(t *testing.T) { var systemTableInsertTests = []InsertTest{ { Name: "insert into dolt_docs", - AdditionalSetup: CreateTableFn("dolt_docs", doltdb.DocsSchema), + AdditionalSetup: CreateTableFn("dolt_docs", doltdb.DocsSchema, ""), InsertQuery: "insert into dolt_docs (doc_name, doc_text) values ('README.md', 'Some text')", SelectQuery: "select * from dolt_docs", ExpectedRows: []sql.Row{{"README.md", "Some text"}}, @@ -385,14 +385,8 @@ var systemTableInsertTests = []InsertTest{ }, { Name: "insert into dolt_query_catalog", - AdditionalSetup: CreateTableFn(doltdb.DoltQueryCatalogTableName, - dtables.DoltQueryCatalogSchema, - NewRowWithSchema(dtables.DoltQueryCatalogSchema, - types.String("existingEntry"), - types.Uint(2), - types.String("example"), - types.String("select 2+2 from dual"), - types.String("description"))), + AdditionalSetup: CreateTableFn(doltdb.DoltQueryCatalogTableName, dtables.DoltQueryCatalogSchema, + "INSERT INTO dolt_query_catalog VALUES ('existingEntry', 2, 'example', 'select 2+2 from dual', 'description')"), InsertQuery: "insert into dolt_query_catalog (id, display_order, name, query, description) values ('abc123', 1, 'example', 'select 1+1 from dual', 'description')", SelectQuery: "select * from dolt_query_catalog ORDER BY id", ExpectedRows: ToSqlRows(CompressSchema(dtables.DoltQueryCatalogSchema), @@ -403,7 +397,7 @@ var systemTableInsertTests = []InsertTest{ }, { Name: "insert into dolt_schemas", - AdditionalSetup: CreateTableFn(doltdb.SchemasTableName, SchemasTableSchema()), + AdditionalSetup: CreateTableFn(doltdb.SchemasTableName, SchemasTableSchema(), ""), InsertQuery: "insert into dolt_schemas (id, type, name, fragment) values (1, 'view', 'name', 'select 2+2 from dual')", SelectQuery: "select * from dolt_schemas ORDER BY id", ExpectedRows: ToSqlRows(CompressSchema(SchemasTableSchema()), @@ -414,9 +408,6 @@ var systemTableInsertTests = []InsertTest{ } func TestInsertIntoSystemTables(t *testing.T) { - if types.Format_Default != types.Format_LD_1 { - t.Skip() // todo: convert to enginetest - } for _, test := range systemTableInsertTests { t.Run(test.Name, func(t *testing.T) { testInsertQuery(t, test) diff --git a/go/libraries/doltcore/sqle/sqlreplace_test.go b/go/libraries/doltcore/sqle/sqlreplace_test.go index fcc7324931..d727173af5 100644 --- a/go/libraries/doltcore/sqle/sqlreplace_test.go +++ b/go/libraries/doltcore/sqle/sqlreplace_test.go @@ -252,9 +252,8 @@ func TestExecuteReplace(t *testing.T) { var systemTableReplaceTests = []ReplaceTest{ { Name: "replace into dolt_docs", - AdditionalSetup: CreateTableFn("dolt_docs", - doltdb.DocsSchema, - NewRow(types.String("LICENSE.md"), types.String("A license"))), + AdditionalSetup: CreateTableFn("dolt_docs", doltdb.DocsSchema, + "INSERT INTO dolt_docs VALUES ('LICENSE.md','A license')"), ReplaceQuery: "replace into dolt_docs (doc_name, doc_text) values ('LICENSE.md', 'Some text')", SelectQuery: "select * from dolt_docs", ExpectedRows: []sql.Row{{"LICENSE.md", "Some text"}}, @@ -262,9 +261,8 @@ var systemTableReplaceTests = []ReplaceTest{ }, { Name: "replace into dolt_query_catalog", - AdditionalSetup: CreateTableFn(doltdb.DoltQueryCatalogTableName, - dtables.DoltQueryCatalogSchema, - NewRow(types.String("existingEntry"), types.Uint(1), types.String("example"), types.String("select 2+2 from dual"), types.String("description"))), + AdditionalSetup: CreateTableFn(doltdb.DoltQueryCatalogTableName, dtables.DoltQueryCatalogSchema, + "INSERT INTO dolt_query_catalog VALUES ('existingEntry', 1, 'example', 'select 2+2 from dual', 'description')"), ReplaceQuery: "replace into dolt_query_catalog (id, display_order, name, query, description) values ('existingEntry', 1, 'example', 'select 1+1 from dual', 'description')", SelectQuery: "select * from dolt_query_catalog", ExpectedRows: ToSqlRows(CompressSchema(dtables.DoltQueryCatalogSchema), @@ -274,9 +272,8 @@ var systemTableReplaceTests = []ReplaceTest{ }, { Name: "replace into dolt_schemas", - AdditionalSetup: CreateTableFn(doltdb.SchemasTableName, - SchemasTableSchema(), - NewRowWithSchema(SchemasTableSchema(), types.String("view"), types.String("name"), types.String("select 2+2 from dual"), types.Int(1), types.NullValue)), + AdditionalSetup: CreateTableFn(doltdb.SchemasTableName, SchemasTableSchema(), + "INSERT INTO dolt_schemas VALUES ('view', 'name', 'select 2+2 from dual', 1, NULL)"), ReplaceQuery: "replace into dolt_schemas (id, type, name, fragment) values ('1', 'view', 'name', 'select 1+1 from dual')", SelectQuery: "select type, name, fragment, id, extra from dolt_schemas", ExpectedRows: []sql.Row{{"view", "name", "select 1+1 from dual", int64(1), nil}}, @@ -285,9 +282,6 @@ var systemTableReplaceTests = []ReplaceTest{ } func TestReplaceIntoSystemTables(t *testing.T) { - if types.Format_Default != types.Format_LD_1 { - t.Skip() // todo: convert to enginetest - } for _, test := range systemTableReplaceTests { t.Run(test.Name, func(t *testing.T) { testReplaceQuery(t, test) diff --git a/go/libraries/doltcore/sqle/sqlselect_test.go b/go/libraries/doltcore/sqle/sqlselect_test.go index 69d6512293..6604cb3b25 100644 --- a/go/libraries/doltcore/sqle/sqlselect_test.go +++ b/go/libraries/doltcore/sqle/sqlselect_test.go @@ -1283,12 +1283,8 @@ func TestJoins(t *testing.T) { var systemTableSelectTests = []SelectTest{ { Name: "select from dolt_docs", - AdditionalSetup: CreateTableFn("dolt_docs", - doltdb.DocsSchema, - NewRowWithSchema(doltdb.DocsSchema, - types.String("LICENSE.md"), - types.String("A license")), - ), + AdditionalSetup: CreateTableFn("dolt_docs", doltdb.DocsSchema, + "INSERT INTO dolt_docs VALUES ('LICENSE.md','A license')"), Query: "select * from dolt_docs", ExpectedRows: ToSqlRows(CompressSchema(doltdb.DocsSchema), NewRow(types.String("LICENSE.md"), types.String("A license"))), @@ -1296,15 +1292,8 @@ var systemTableSelectTests = []SelectTest{ }, { Name: "select from dolt_query_catalog", - AdditionalSetup: CreateTableFn(doltdb.DoltQueryCatalogTableName, - dtables.DoltQueryCatalogSchema, - NewRowWithSchema(dtables.DoltQueryCatalogSchema, - types.String("existingEntry"), - types.Uint(2), - types.String("example"), - types.String("select 2+2 from dual"), - types.String("description")), - ), + AdditionalSetup: CreateTableFn(doltdb.DoltQueryCatalogTableName, dtables.DoltQueryCatalogSchema, + "INSERT INTO dolt_query_catalog VALUES ('existingEntry', 2, 'example', 'select 2+2 from dual', 'description')"), Query: "select * from dolt_query_catalog", ExpectedRows: ToSqlRows(CompressSchema(dtables.DoltQueryCatalogSchema), NewRow(types.String("existingEntry"), types.Uint(2), types.String("example"), types.String("select 2+2 from dual"), types.String("description")), @@ -1313,19 +1302,10 @@ var systemTableSelectTests = []SelectTest{ }, { Name: "select from dolt_schemas", - AdditionalSetup: CreateTableFn(doltdb.SchemasTableName, - SchemasTableSchema(), - NewRowWithSchema(SchemasTableSchema(), - types.String("view"), - types.String("name"), - types.String("select 2+2 from dual"), - types.Int(1), - CreateTestJSON(), - )), - Query: "select * from dolt_schemas", - ExpectedRows: ToSqlRows(CompressSchema(SchemasTableSchema()), - NewRow(types.String("view"), types.String("name"), types.String("select 2+2 from dual"), types.Int(1), CreateTestJSON()), - ), + AdditionalSetup: CreateTableFn(doltdb.SchemasTableName, SchemasTableSchema(), + `INSERT INTO dolt_schemas VALUES ('view', 'name', 'select 2+2 from dual', 1, NULL)`), + Query: "select * from dolt_schemas", + ExpectedRows: []sql.Row{{"view", "name", "select 2+2 from dual", int64(1), nil}}, ExpectedSchema: CompressSchema(SchemasTableSchema()), }, } @@ -1338,9 +1318,6 @@ func CreateTestJSON() types.JSON { } func TestSelectSystemTables(t *testing.T) { - if types.Format_Default != types.Format_LD_1 { - t.Skip() // todo: convert to enginetest - } for _, test := range systemTableSelectTests { t.Run(test.Name, func(t *testing.T) { testSelectQuery(t, test) @@ -1421,17 +1398,10 @@ func testSelectQuery(t *testing.T, test SelectTest) { } func testSelectDiffQuery(t *testing.T, test SelectTest) { - if types.Format_Default != types.Format_LD_1 { - t.Skip("") // todo: convert to enginetests - } - validateTest(t, test) - ctx := context.Background() - cleanup := installTestCommitClock() defer cleanup() - dEnv := dtestutils.CreateTestEnv() InitializeWithHistory(t, ctx, dEnv, CreateHistory(ctx, dEnv, t)...) if test.AdditionalSetup != nil { diff --git a/go/libraries/doltcore/sqle/sqlupdate_test.go b/go/libraries/doltcore/sqle/sqlupdate_test.go index 2a98a8969d..c56300b2cc 100644 --- a/go/libraries/doltcore/sqle/sqlupdate_test.go +++ b/go/libraries/doltcore/sqle/sqlupdate_test.go @@ -27,7 +27,6 @@ import ( "github.com/dolthub/dolt/go/libraries/doltcore/schema" "github.com/dolthub/dolt/go/libraries/doltcore/sqle/dtables" "github.com/dolthub/dolt/go/libraries/doltcore/sqle/json" - "github.com/dolthub/dolt/go/store/types" ) // Set to the name of a single test to run just that test, useful for debugging @@ -350,9 +349,6 @@ func TestExecuteUpdate(t *testing.T) { } func TestExecuteUpdateSystemTables(t *testing.T) { - if types.Format_Default != types.Format_LD_1 { - t.Skip() // todo: convert to enginetest - } for _, test := range systemTableUpdateTests { t.Run(test.Name, func(t *testing.T) { testUpdateQuery(t, test) @@ -363,11 +359,8 @@ func TestExecuteUpdateSystemTables(t *testing.T) { var systemTableUpdateTests = []UpdateTest{ { Name: "update dolt_docs", - AdditionalSetup: CreateTableFn("dolt_docs", - doltdb.DocsSchema, - NewRowWithSchema(doltdb.DocsSchema, - types.String("LICENSE.md"), types.String("A license"), - )), + AdditionalSetup: CreateTableFn("dolt_docs", doltdb.DocsSchema, + "INSERT INTO dolt_docs VALUES ('LICENSE.md','A license')"), UpdateQuery: "update dolt_docs set doc_text = 'Some text';", SelectQuery: "select * from dolt_docs", ExpectedRows: []sql.Row{{"LICENSE.md", "Some text"}}, @@ -375,37 +368,20 @@ var systemTableUpdateTests = []UpdateTest{ }, { Name: "update dolt_query_catalog", - AdditionalSetup: CreateTableFn(doltdb.DoltQueryCatalogTableName, - dtables.DoltQueryCatalogSchema, - NewRowWithSchema(dtables.DoltQueryCatalogSchema, - types.String("abc123"), - types.Uint(1), - types.String("example"), - types.String("select 2+2 from dual"), - types.String("description"), - )), - UpdateQuery: "update dolt_query_catalog set display_order = display_order + 1", - SelectQuery: "select * from dolt_query_catalog", - ExpectedRows: ToSqlRows(CompressSchema(dtables.DoltQueryCatalogSchema), - NewRow(types.String("abc123"), types.Uint(2), types.String("example"), types.String("select 2+2 from dual"), types.String("description"))), + AdditionalSetup: CreateTableFn(doltdb.DoltQueryCatalogTableName, dtables.DoltQueryCatalogSchema, + "INSERT INTO dolt_query_catalog VALUES ('abc123', 1, 'example', 'select 2+2 from dual', 'description')"), + UpdateQuery: "update dolt_query_catalog set display_order = display_order + 1", + SelectQuery: "select * from dolt_query_catalog", + ExpectedRows: []sql.Row{{"abc123", uint64(2), "example", "select 2+2 from dual", "description"}}, ExpectedSchema: CompressSchema(dtables.DoltQueryCatalogSchema), }, { Name: "update dolt_schemas", - AdditionalSetup: CreateTableFn(doltdb.SchemasTableName, - SchemasTableSchema(), - NewRowWithSchema(SchemasTableSchema(), - types.String("view"), - types.String("name"), - types.String("select 2+2 from dual"), - types.Int(1), - CreateTestJSON(), - )), - UpdateQuery: "update dolt_schemas set type = 'not a view'", - SelectQuery: "select * from dolt_schemas", - ExpectedRows: ToSqlRows(CompressSchema(SchemasTableSchema()), - NewRow(types.String("not a view"), types.String("name"), types.String("select 2+2 from dual"), types.Int(1), CreateTestJSON()), - ), + AdditionalSetup: CreateTableFn(doltdb.SchemasTableName, SchemasTableSchema(), + `INSERT INTO dolt_schemas VALUES ('view', 'name', 'select 2+2 from dual', 1, NULL)`), + UpdateQuery: "update dolt_schemas set type = 'not a view'", + SelectQuery: "select * from dolt_schemas", + ExpectedRows: []sql.Row{{"not a view", "name", "select 2+2 from dual", int64(1), nil}}, ExpectedSchema: CompressSchema(SchemasTableSchema()), }, } diff --git a/go/libraries/doltcore/sqle/table_editor_fk_test.go b/go/libraries/doltcore/sqle/table_editor_fk_test.go index b286845239..35600daaa9 100644 --- a/go/libraries/doltcore/sqle/table_editor_fk_test.go +++ b/go/libraries/doltcore/sqle/table_editor_fk_test.go @@ -17,7 +17,6 @@ package sqle import ( "context" "fmt" - "io" "math" "reflect" "sort" @@ -29,15 +28,8 @@ import ( "github.com/stretchr/testify/require" "github.com/dolthub/dolt/go/libraries/doltcore/doltdb" - "github.com/dolthub/dolt/go/libraries/doltcore/doltdb/durable" "github.com/dolthub/dolt/go/libraries/doltcore/dtestutils" "github.com/dolthub/dolt/go/libraries/doltcore/env" - "github.com/dolthub/dolt/go/libraries/doltcore/row" - "github.com/dolthub/dolt/go/libraries/doltcore/schema" - "github.com/dolthub/dolt/go/libraries/doltcore/sqle/index" - "github.com/dolthub/dolt/go/libraries/doltcore/sqle/sqlutil" - "github.com/dolthub/dolt/go/store/types" - "github.com/dolthub/dolt/go/store/val" ) func setupEditorFkTest(t *testing.T) (*env.DoltEnv, *doltdb.RootValue) { @@ -665,7 +657,8 @@ func assertTableEditorRows(t *testing.T, root *doltdb.RootValue, expected []sql. var sqlRows []sql.Row if len(expected) > 0 { - sqlRows = sqlRowsFromDurableIndex(t, rows, sch) + sqlRows, err = SqlRowsFromDurableIndex(rows, sch) + require.NoError(t, err) expected = sortInt64Rows(convertSqlRowToInt64(expected)) sqlRows = sortInt64Rows(convertSqlRowToInt64(sqlRows)) if !assert.Equal(t, expected, sqlRows) { @@ -708,7 +701,8 @@ func assertTableEditorRows(t *testing.T, root *doltdb.RootValue, expected []sql. expectedIndexRows = sortInt64Rows(expectedIndexRows) if len(expectedIndexRows) > 0 { - sqlRows = sqlRowsFromDurableIndex(t, indexRowData, indexSch) + sqlRows, err = SqlRowsFromDurableIndex(indexRowData, indexSch) + require.NoError(t, err) expected = sortInt64Rows(convertSqlRowToInt64(expected)) sqlRows = sortInt64Rows(convertSqlRowToInt64(sqlRows)) if !reflect.DeepEqual(expectedIndexRows, sqlRows) { @@ -883,67 +877,3 @@ ALTER TABLE three ADD FOREIGN KEY (v1, v2) REFERENCES two(v1, v2) ON DELETE CASC }) } } - -func sqlRowsFromDurableIndex(t *testing.T, idx durable.Index, sch schema.Schema) []sql.Row { - ctx := context.Background() - var sqlRows []sql.Row - if types.Format_Default == types.Format_DOLT { - rowData := durable.ProllyMapFromIndex(idx) - kd, vd := rowData.Descriptors() - iter, err := rowData.IterAll(ctx) - require.NoError(t, err) - for { - var k, v val.Tuple - k, v, err = iter.Next(ctx) - if err == io.EOF { - break - } - require.NoError(t, err) - sqlRow, err := sqlRowFromTuples(sch, kd, vd, k, v) - require.NoError(t, err) - sqlRows = append(sqlRows, sqlRow) - } - - } else { - // types.Format_LD_1 and types.Format_DOLT_DEV - rowData := durable.NomsMapFromIndex(idx) - _ = rowData.IterAll(ctx, func(key, value types.Value) error { - r, err := row.FromNoms(sch, key.(types.Tuple), value.(types.Tuple)) - assert.NoError(t, err) - sqlRow, err := sqlutil.DoltRowToSqlRow(r, sch) - assert.NoError(t, err) - sqlRows = append(sqlRows, sqlRow) - return nil - }) - } - return sqlRows -} - -func sqlRowFromTuples(sch schema.Schema, kd, vd val.TupleDesc, k, v val.Tuple) (sql.Row, error) { - var err error - ctx := context.Background() - r := make(sql.Row, sch.GetAllCols().Size()) - keyless := schema.IsKeyless(sch) - - for i, col := range sch.GetAllCols().GetColumns() { - pos, ok := sch.GetPKCols().TagToIdx[col.Tag] - if ok { - r[i], err = index.GetField(ctx, kd, pos, k, nil) - if err != nil { - return nil, err - } - } - - pos, ok = sch.GetNonPKCols().TagToIdx[col.Tag] - if keyless { - pos += 1 // compensate for cardinality field - } - if ok { - r[i], err = index.GetField(ctx, vd, pos, v, nil) - if err != nil { - return nil, err - } - } - } - return r, nil -} diff --git a/go/libraries/doltcore/sqle/testdata.go b/go/libraries/doltcore/sqle/testdata.go index c190927e33..67efe3b43c 100644 --- a/go/libraries/doltcore/sqle/testdata.go +++ b/go/libraries/doltcore/sqle/testdata.go @@ -21,6 +21,7 @@ import ( "testing" "time" + "github.com/dolthub/go-mysql-server/sql" "github.com/google/uuid" "github.com/stretchr/testify/require" @@ -314,43 +315,24 @@ func MutateRow(sch schema.Schema, r row.Row, tagsAndVals ...interface{}) row.Row return mutated } -func GetAllRows(root *doltdb.RootValue, tableName string) ([]row.Row, error) { +func GetAllRows(root *doltdb.RootValue, tableName string) ([]sql.Row, error) { ctx := context.Background() table, _, err := root.GetTable(ctx, tableName) - if err != nil { return nil, err } - rowData, err := table.GetNomsRowData(ctx) - + rowIdx, err := table.GetRowData(ctx) if err != nil { return nil, err } sch, err := table.GetSchema(ctx) - if err != nil { return nil, err } - var rows []row.Row - err = rowData.Iter(ctx, func(key, value types.Value) (stop bool, err error) { - r, err := row.FromNoms(sch, key.(types.Tuple), value.(types.Tuple)) - - if err != nil { - return false, err - } - - rows = append(rows, r) - return false, nil - }) - - if err != nil { - return nil, err - } - - return rows, nil + return SqlRowsFromDurableIndex(rowIdx, sch) } var idColTag0TypeUUID = schema.NewColumn("id", 0, types.IntKind, true) diff --git a/go/libraries/doltcore/sqle/testutil.go b/go/libraries/doltcore/sqle/testutil.go index 5af6c8c04d..88870361d7 100644 --- a/go/libraries/doltcore/sqle/testutil.go +++ b/go/libraries/doltcore/sqle/testutil.go @@ -29,15 +29,18 @@ import ( "github.com/stretchr/testify/require" "github.com/dolthub/dolt/go/libraries/doltcore/doltdb" + "github.com/dolthub/dolt/go/libraries/doltcore/doltdb/durable" "github.com/dolthub/dolt/go/libraries/doltcore/dtestutils" "github.com/dolthub/dolt/go/libraries/doltcore/env" "github.com/dolthub/dolt/go/libraries/doltcore/row" "github.com/dolthub/dolt/go/libraries/doltcore/schema" "github.com/dolthub/dolt/go/libraries/doltcore/sqle/dsess" + "github.com/dolthub/dolt/go/libraries/doltcore/sqle/index" "github.com/dolthub/dolt/go/libraries/doltcore/sqle/sqlutil" "github.com/dolthub/dolt/go/libraries/doltcore/table/editor" config2 "github.com/dolthub/dolt/go/libraries/utils/config" "github.com/dolthub/dolt/go/store/types" + "github.com/dolthub/dolt/go/store/val" ) // ExecuteSql executes all the SQL non-select statements given in the string against the root value given and returns @@ -426,3 +429,76 @@ func CreateTestDatabase(t *testing.T) *env.DoltEnv { require.NoError(t, err) return dEnv } + +func SqlRowsFromDurableIndex(idx durable.Index, sch schema.Schema) ([]sql.Row, error) { + ctx := context.Background() + var sqlRows []sql.Row + if types.Format_Default == types.Format_DOLT { + rowData := durable.ProllyMapFromIndex(idx) + kd, vd := rowData.Descriptors() + iter, err := rowData.IterAll(ctx) + if err != nil { + return nil, err + } + for { + var k, v val.Tuple + k, v, err = iter.Next(ctx) + if err == io.EOF { + break + } else if err != nil { + return nil, err + } + sqlRow, err := sqlRowFromTuples(sch, kd, vd, k, v) + if err != nil { + return nil, err + } + sqlRows = append(sqlRows, sqlRow) + } + + } else { + // types.Format_LD_1 and types.Format_DOLT_DEV + rowData := durable.NomsMapFromIndex(idx) + _ = rowData.IterAll(ctx, func(key, value types.Value) error { + r, err := row.FromNoms(sch, key.(types.Tuple), value.(types.Tuple)) + if err != nil { + return err + } + sqlRow, err := sqlutil.DoltRowToSqlRow(r, sch) + if err != nil { + return err + } + sqlRows = append(sqlRows, sqlRow) + return nil + }) + } + return sqlRows, nil +} + +func sqlRowFromTuples(sch schema.Schema, kd, vd val.TupleDesc, k, v val.Tuple) (sql.Row, error) { + var err error + ctx := context.Background() + r := make(sql.Row, sch.GetAllCols().Size()) + keyless := schema.IsKeyless(sch) + + for i, col := range sch.GetAllCols().GetColumns() { + pos, ok := sch.GetPKCols().TagToIdx[col.Tag] + if ok { + r[i], err = index.GetField(ctx, kd, pos, k, nil) + if err != nil { + return nil, err + } + } + + pos, ok = sch.GetNonPKCols().TagToIdx[col.Tag] + if keyless { + pos += 1 // compensate for cardinality field + } + if ok { + r[i], err = index.GetField(ctx, vd, pos, v, nil) + if err != nil { + return nil, err + } + } + } + return r, nil +} diff --git a/integration-tests/bats/sql-server-cluster.bats b/integration-tests/bats/sql-server-cluster.bats index 083295b65c..b0fab89046 100644 --- a/integration-tests/bats/sql-server-cluster.bats +++ b/integration-tests/bats/sql-server-cluster.bats @@ -131,7 +131,9 @@ cluster: # same role, new epoch server_query_with_port "${SERVERONE_MYSQL_PORT}" repo1 1 dolt "" "call dolt_assume_cluster_role('standby', '12'); select @@GLOBAL.dolt_cluster_role, @@GLOBAL.dolt_cluster_role_epoch;" "status\n0;@@GLOBAL.dolt_cluster_role,@@GLOBAL.dolt_cluster_role_epoch\nstandby,12" # new role, new epoch - server_query_with_port "${SERVERONE_MYSQL_PORT}" repo1 1 dolt "" "call dolt_assume_cluster_role('primary', '13'); select @@GLOBAL.dolt_cluster_role, @@GLOBAL.dolt_cluster_role_epoch;" "status\n0;@@GLOBAL.dolt_cluster_role,@@GLOBAL.dolt_cluster_role_epoch\nprimary,13" + server_query_with_port "${SERVERONE_MYSQL_PORT}" repo1 1 dolt "" "call dolt_assume_cluster_role('primary', '13');" "status\n0" + # we assert on a new connection, since the server leaves the previous connection in an permanent error state. + server_query_with_port "${SERVERONE_MYSQL_PORT}" repo1 1 dolt "" "select @@GLOBAL.dolt_cluster_role, @@GLOBAL.dolt_cluster_role_epoch;" "@@GLOBAL.dolt_cluster_role,@@GLOBAL.dolt_cluster_role_epoch\nprimary,13" # Server comes back up with latest assumed role. kill $SERVER_PID @@ -205,6 +207,7 @@ cluster: cd serverone echo " +log_level: trace user: name: dolt listener: @@ -236,6 +239,7 @@ cluster: cd ../servertwo echo " +log_level: trace user: name: dolt listener: @@ -280,3 +284,609 @@ cluster: [ "$status" -eq 0 ] [[ "$output" =~ "| 5 " ]] || false } + +@test "sql-server-cluster: booted standby server is read only" { + cd serverone + + cd repo1 + dolt sql -q 'create table vals (i int primary key)' + dolt sql -q 'insert into vals values (1), (2), (3), (4), (5)' + cd .. + + echo " +log_level: trace +user: + name: dolt +listener: + host: 0.0.0.0 + port: ${SERVERONE_MYSQL_PORT} +behavior: + read_only: false + autocommit: true +cluster: + standby_remotes: + - name: standby + remote_url_template: http://localhost:${SERVERTWO_GRPC_PORT}/{database} + bootstrap_role: standby + bootstrap_epoch: 10 + remotesapi: + port: ${SERVERONE_GRPC_PORT}" > server.yaml + + DOLT_ROOT_PATH=`pwd` dolt config --global --add user.email bats@email.fake + DOLT_ROOT_PATH=`pwd` dolt config --global --add user.name "Bats Tests" + DOLT_ROOT_PATH=`pwd` dolt config --global --add metrics.disabled true + + (cd repo1 && dolt remote add standby http://localhost:${SERVERTWO_GRPC_PORT}/repo1) + (cd repo2 && dolt remote add standby http://localhost:${SERVERTWO_GRPC_PORT}/repo2) + DOLT_ROOT_PATH=`pwd` dolt sql-server --config server.yaml & + SERVER_PID=$! + + wait_for_connection "${SERVERONE_MYSQL_PORT}" 5000 + + server_query_with_port "${SERVERONE_MYSQL_PORT}" repo1 1 dolt "" "select count(*) from vals" "count(*)\n5" + run server_query_with_port "${SERVERONE_MYSQL_PORT}" repo1 1 dolt "" "insert into vals values (6),(7),(8),(9),(10)" "" 1 + [[ "$output" =~ "Database repo1 is read-only" ]] || false + server_query_with_port "${SERVERONE_MYSQL_PORT}" repo1 1 dolt "" "select count(*) from vals" "count(*)\n5" +} + +@test "sql-server-cluster: booted primary server is read write" { + cd serverone + + cd repo1 + dolt sql -q 'create table vals (i int primary key)' + dolt sql -q 'insert into vals values (1), (2), (3), (4), (5)' + cd .. + + echo " +log_level: trace +user: + name: dolt +listener: + host: 0.0.0.0 + port: ${SERVERONE_MYSQL_PORT} +behavior: + read_only: false + autocommit: true +cluster: + standby_remotes: + - name: standby + remote_url_template: http://localhost:${SERVERTWO_GRPC_PORT}/{database} + bootstrap_role: primary + bootstrap_epoch: 10 + remotesapi: + port: ${SERVERONE_GRPC_PORT}" > server.yaml + + DOLT_ROOT_PATH=`pwd` dolt config --global --add user.email bats@email.fake + DOLT_ROOT_PATH=`pwd` dolt config --global --add user.name "Bats Tests" + DOLT_ROOT_PATH=`pwd` dolt config --global --add metrics.disabled true + + (cd repo1 && dolt remote add standby http://localhost:${SERVERTWO_GRPC_PORT}/repo1) + (cd repo2 && dolt remote add standby http://localhost:${SERVERTWO_GRPC_PORT}/repo2) + DOLT_ROOT_PATH=`pwd` dolt sql-server --config server.yaml & + SERVER_PID=$! + + wait_for_connection "${SERVERONE_MYSQL_PORT}" 5000 + + server_query_with_port "${SERVERONE_MYSQL_PORT}" repo1 1 dolt "" "select count(*) from vals" "count(*)\n5" + server_query_with_port "${SERVERONE_MYSQL_PORT}" repo1 1 dolt "" "insert into vals values (6),(7),(8),(9),(10)" "" + server_query_with_port "${SERVERONE_MYSQL_PORT}" repo1 1 dolt "" "select count(*) from vals" "count(*)\n10" +} + +@test "sql-server-cluster: standby transitioned to primary becomes writable" { + cd serverone + + cd repo1 + dolt sql -q 'create table vals (i int primary key)' + dolt sql -q 'insert into vals values (1), (2), (3), (4), (5)' + cd .. + + echo " +log_level: trace +user: + name: dolt +listener: + host: 0.0.0.0 + port: ${SERVERONE_MYSQL_PORT} +behavior: + read_only: false + autocommit: true +cluster: + standby_remotes: + - name: standby + remote_url_template: http://localhost:${SERVERTWO_GRPC_PORT}/{database} + bootstrap_role: standby + bootstrap_epoch: 10 + remotesapi: + port: ${SERVERONE_GRPC_PORT}" > server.yaml + + DOLT_ROOT_PATH=`pwd` dolt config --global --add user.email bats@email.fake + DOLT_ROOT_PATH=`pwd` dolt config --global --add user.name "Bats Tests" + DOLT_ROOT_PATH=`pwd` dolt config --global --add metrics.disabled true + + (cd repo1 && dolt remote add standby http://localhost:${SERVERTWO_GRPC_PORT}/repo1) + (cd repo2 && dolt remote add standby http://localhost:${SERVERTWO_GRPC_PORT}/repo2) + DOLT_ROOT_PATH=`pwd` dolt sql-server --config server.yaml & + SERVER_PID=$! + + wait_for_connection "${SERVERONE_MYSQL_PORT}" 5000 + + run server_query_with_port "${SERVERONE_MYSQL_PORT}" repo1 1 dolt "" "insert into vals values (6),(7),(8),(9),(10)" "" 1 + [[ "$output" =~ "Database repo1 is read-only" ]] || false + server_query_with_port "${SERVERONE_MYSQL_PORT}" repo1 1 dolt "" "call dolt_assume_cluster_role('primary', 11)" "status\n0" + server_query_with_port "${SERVERONE_MYSQL_PORT}" repo1 1 dolt "" "select count(*) from vals" "count(*)\n5" + server_query_with_port "${SERVERONE_MYSQL_PORT}" repo1 1 dolt "" "insert into vals values (6),(7),(8),(9),(10)" "" 0 + server_query_with_port "${SERVERONE_MYSQL_PORT}" repo1 1 dolt "" "select count(*) from vals" "count(*)\n10" +} + +@test "sql-server-cluster: primary transitioned to standby becomes read only" { + # In order to gracefully transition to standby, we will need the standby running. + cd serverone + + echo " +log_level: trace +user: + name: dolt +listener: + host: 0.0.0.0 + port: ${SERVERONE_MYSQL_PORT} +behavior: + read_only: false + autocommit: true +cluster: + standby_remotes: + - name: standby + remote_url_template: http://localhost:${SERVERTWO_GRPC_PORT}/{database} + bootstrap_role: standby + bootstrap_epoch: 10 + remotesapi: + port: ${SERVERONE_GRPC_PORT}" > server.yaml + + DOLT_ROOT_PATH=`pwd` dolt config --global --add user.email bats@email.fake + DOLT_ROOT_PATH=`pwd` dolt config --global --add user.name "Bats Tests" + DOLT_ROOT_PATH=`pwd` dolt config --global --add metrics.disabled true + + (cd repo1 && dolt remote add standby http://localhost:${SERVERTWO_GRPC_PORT}/repo1) + (cd repo2 && dolt remote add standby http://localhost:${SERVERTWO_GRPC_PORT}/repo2) + DOLT_ROOT_PATH=`pwd` dolt sql-server --config server.yaml & + serverone_pid=$! + + wait_for_connection "${SERVERONE_MYSQL_PORT}" 5000 + + cd ../servertwo + + echo " +log_level: trace +user: + name: dolt +listener: + host: 0.0.0.0 + port: ${SERVERTWO_MYSQL_PORT} +behavior: + read_only: false + autocommit: true +cluster: + standby_remotes: + - name: standby + remote_url_template: http://localhost:${SERVERONE_GRPC_PORT}/{database} + bootstrap_role: primary + bootstrap_epoch: 10 + remotesapi: + port: ${SERVERTWO_GRPC_PORT}" > server.yaml + + DOLT_ROOT_PATH=`pwd` dolt config --global --add user.email bats@email.fake + DOLT_ROOT_PATH=`pwd` dolt config --global --add user.name "Bats Tests" + DOLT_ROOT_PATH=`pwd` dolt config --global --add metrics.disabled true + + (cd repo1 && dolt remote add standby http://localhost:${SERVERONE_GRPC_PORT}/repo1) + (cd repo2 && dolt remote add standby http://localhost:${SERVERONE_GRPC_PORT}/repo2) + DOLT_ROOT_PATH=`pwd` dolt sql-server --config server.yaml & + servertwo_pid=$! + + wait_for_connection "${SERVERTWO_MYSQL_PORT}" 5000 + + server_query_with_port "${SERVERTWO_MYSQL_PORT}" repo1 1 dolt "" "create table vals (i int primary key);insert into vals values (1),(2),(3),(4),(5)" + run server_query_with_port "${SERVERTWO_MYSQL_PORT}" repo1 1 dolt "" "call dolt_assume_cluster_role('standby', 11)" "" 1 + run server_query_with_port "${SERVERTWO_MYSQL_PORT}" repo1 1 dolt "" "insert into vals values (6),(7),(8),(9),(10)" "" 1 + [[ "$output" =~ "Database repo1 is read-only" ]] || false + server_query_with_port "${SERVERTWO_MYSQL_PORT}" repo1 1 dolt "" "select count(*) from vals" "count(*)\n5" + + kill $servertwo_pid + wait $servertwo_pid + + kill $serverone_pid + wait $serverone_pid +} + +@test "sql-server-cluster: misconfigured cluster with primaries at same epoch, both transition to detected_broken_config" { + cd serverone + + echo " +log_level: trace +user: + name: dolt +listener: + host: 0.0.0.0 + port: ${SERVERONE_MYSQL_PORT} +behavior: + read_only: false + autocommit: true +cluster: + standby_remotes: + - name: standby + remote_url_template: http://localhost:${SERVERTWO_GRPC_PORT}/{database} + bootstrap_role: primary + bootstrap_epoch: 10 + remotesapi: + port: ${SERVERONE_GRPC_PORT}" > server.yaml + + DOLT_ROOT_PATH=`pwd` dolt config --global --add user.email bats@email.fake + DOLT_ROOT_PATH=`pwd` dolt config --global --add user.name "Bats Tests" + DOLT_ROOT_PATH=`pwd` dolt config --global --add metrics.disabled true + + (cd repo1 && dolt remote add standby http://localhost:${SERVERTWO_GRPC_PORT}/repo1) + (cd repo2 && dolt remote add standby http://localhost:${SERVERTWO_GRPC_PORT}/repo2) + DOLT_ROOT_PATH=`pwd` dolt sql-server --config server.yaml & + serverone_pid=$! + + wait_for_connection "${SERVERONE_MYSQL_PORT}" 5000 + + cd ../servertwo + + echo " +log_level: trace +user: + name: dolt +listener: + host: 0.0.0.0 + port: ${SERVERTWO_MYSQL_PORT} +behavior: + read_only: false + autocommit: true +cluster: + standby_remotes: + - name: standby + remote_url_template: http://localhost:${SERVERONE_GRPC_PORT}/{database} + bootstrap_role: primary + bootstrap_epoch: 10 + remotesapi: + port: ${SERVERTWO_GRPC_PORT}" > server.yaml + + DOLT_ROOT_PATH=`pwd` dolt config --global --add user.email bats@email.fake + DOLT_ROOT_PATH=`pwd` dolt config --global --add user.name "Bats Tests" + DOLT_ROOT_PATH=`pwd` dolt config --global --add metrics.disabled true + + (cd repo1 && dolt remote add standby http://localhost:${SERVERONE_GRPC_PORT}/repo1) + (cd repo2 && dolt remote add standby http://localhost:${SERVERONE_GRPC_PORT}/repo2) + DOLT_ROOT_PATH=`pwd` dolt sql-server --config server.yaml & + servertwo_pid=$! + + wait_for_connection "${SERVERTWO_MYSQL_PORT}" 5000 + + # Run a query to make sure everyone sees everyone... + run server_query_with_port "${SERVERTWO_MYSQL_PORT}" repo1 1 dolt "" "CREATE TABLE vals (i int primary key)" "" 1 + + server_query_with_port "${SERVERTWO_MYSQL_PORT}" repo1 1 dolt "" "SELECT @@GLOBAL.dolt_cluster_role,@@GLOBAL.dolt_cluster_role_epoch" "@@GLOBAL.dolt_cluster_role,@@GLOBAL.dolt_cluster_role_epoch\ndetected_broken_config,10" + server_query_with_port "${SERVERONE_MYSQL_PORT}" repo1 1 dolt "" "SELECT @@GLOBAL.dolt_cluster_role,@@GLOBAL.dolt_cluster_role_epoch" "@@GLOBAL.dolt_cluster_role,@@GLOBAL.dolt_cluster_role_epoch\ndetected_broken_config,10" + + kill $servertwo_pid + wait $servertwo_pid + + kill $serverone_pid + wait $serverone_pid +} + +@test "sql-server-cluster: an older primary comes up, becomes a standby and does not overwrite newer primary" { + cd serverone + + cd repo1 + dolt sql -q 'create table vals (i int primary key)' + dolt sql -q 'insert into vals values (1),(2),(3),(4),(5),(6),(7),(8),(9),(10)' + cd ../ + + echo " +log_level: trace +user: + name: dolt +listener: + host: 0.0.0.0 + port: ${SERVERONE_MYSQL_PORT} +behavior: + read_only: false + autocommit: true +cluster: + standby_remotes: + - name: standby + remote_url_template: http://localhost:${SERVERTWO_GRPC_PORT}/{database} + bootstrap_role: primary + bootstrap_epoch: 15 + remotesapi: + port: ${SERVERONE_GRPC_PORT}" > server.yaml + + DOLT_ROOT_PATH=`pwd` dolt config --global --add user.email bats@email.fake + DOLT_ROOT_PATH=`pwd` dolt config --global --add user.name "Bats Tests" + DOLT_ROOT_PATH=`pwd` dolt config --global --add metrics.disabled true + + (cd repo1 && dolt remote add standby http://localhost:${SERVERTWO_GRPC_PORT}/repo1) + (cd repo2 && dolt remote add standby http://localhost:${SERVERTWO_GRPC_PORT}/repo2) + DOLT_ROOT_PATH=`pwd` dolt sql-server --config server.yaml & + serverone_pid=$! + + wait_for_connection "${SERVERONE_MYSQL_PORT}" 5000 + + cd ../servertwo + + cd repo1 + dolt sql -q 'create table vals (i int primary key)' + dolt sql -q 'insert into vals values (1),(2),(3),(4),(5)' + cd ../ + + echo " +log_level: trace +user: + name: dolt +listener: + host: 0.0.0.0 + port: ${SERVERTWO_MYSQL_PORT} +behavior: + read_only: false + autocommit: true +cluster: + standby_remotes: + - name: standby + remote_url_template: http://localhost:${SERVERONE_GRPC_PORT}/{database} + bootstrap_role: primary + bootstrap_epoch: 10 + remotesapi: + port: ${SERVERTWO_GRPC_PORT}" > server.yaml + + DOLT_ROOT_PATH=`pwd` dolt config --global --add user.email bats@email.fake + DOLT_ROOT_PATH=`pwd` dolt config --global --add user.name "Bats Tests" + DOLT_ROOT_PATH=`pwd` dolt config --global --add metrics.disabled true + + (cd repo1 && dolt remote add standby http://localhost:${SERVERONE_GRPC_PORT}/repo1) + (cd repo2 && dolt remote add standby http://localhost:${SERVERONE_GRPC_PORT}/repo2) + DOLT_ROOT_PATH=`pwd` dolt sql-server --config server.yaml & + servertwo_pid=$! + + wait_for_connection "${SERVERTWO_MYSQL_PORT}" 5000 + + server_query_with_port "${SERVERONE_MYSQL_PORT}" repo1 1 dolt "" "SELECT count(*) FROM vals" "count(*)\n10" + server_query_with_port "${SERVERTWO_MYSQL_PORT}" repo1 1 dolt "" "SELECT @@GLOBAL.dolt_cluster_role,@@GLOBAL.dolt_cluster_role_epoch" "@@GLOBAL.dolt_cluster_role,@@GLOBAL.dolt_cluster_role_epoch\nstandby,15" + + kill $servertwo_pid + wait $servertwo_pid + + kill $serverone_pid + wait $serverone_pid +} + +@test "sql-server-cluster: a new primary comes up, old primary becomes a standby and has its state overwritten" { + cd serverone + + cd repo1 + dolt sql -q 'create table vals (i int primary key)' + dolt sql -q 'insert into vals values (1),(2),(3),(4),(5)' + cd ../ + + echo " +log_level: trace +user: + name: dolt +listener: + host: 0.0.0.0 + port: ${SERVERONE_MYSQL_PORT} +behavior: + read_only: false + autocommit: true +cluster: + standby_remotes: + - name: standby + remote_url_template: http://localhost:${SERVERTWO_GRPC_PORT}/{database} + bootstrap_role: primary + bootstrap_epoch: 10 + remotesapi: + port: ${SERVERONE_GRPC_PORT}" > server.yaml + + DOLT_ROOT_PATH=`pwd` dolt config --global --add user.email bats@email.fake + DOLT_ROOT_PATH=`pwd` dolt config --global --add user.name "Bats Tests" + DOLT_ROOT_PATH=`pwd` dolt config --global --add metrics.disabled true + + (cd repo1 && dolt remote add standby http://localhost:${SERVERTWO_GRPC_PORT}/repo1) + (cd repo2 && dolt remote add standby http://localhost:${SERVERTWO_GRPC_PORT}/repo2) + DOLT_ROOT_PATH=`pwd` dolt sql-server --config server.yaml & + serverone_pid=$! + + wait_for_connection "${SERVERONE_MYSQL_PORT}" 5000 + + cd ../servertwo + + cd repo1 + dolt sql -q 'create table vals (i int primary key)' + dolt sql -q 'insert into vals values (1),(2),(3),(4),(5),(6),(7),(8),(9),(10)' + cd ../ + + echo " +log_level: trace +user: + name: dolt +listener: + host: 0.0.0.0 + port: ${SERVERTWO_MYSQL_PORT} +behavior: + read_only: false + autocommit: true +cluster: + standby_remotes: + - name: standby + remote_url_template: http://localhost:${SERVERONE_GRPC_PORT}/{database} + bootstrap_role: primary + bootstrap_epoch: 15 + remotesapi: + port: ${SERVERTWO_GRPC_PORT}" > server.yaml + + DOLT_ROOT_PATH=`pwd` dolt config --global --add user.email bats@email.fake + DOLT_ROOT_PATH=`pwd` dolt config --global --add user.name "Bats Tests" + DOLT_ROOT_PATH=`pwd` dolt config --global --add metrics.disabled true + + (cd repo1 && dolt remote add standby http://localhost:${SERVERONE_GRPC_PORT}/repo1) + (cd repo2 && dolt remote add standby http://localhost:${SERVERONE_GRPC_PORT}/repo2) + DOLT_ROOT_PATH=`pwd` dolt sql-server --config server.yaml & + servertwo_pid=$! + + wait_for_connection "${SERVERTWO_MYSQL_PORT}" 5000 + + server_query_with_port "${SERVERONE_MYSQL_PORT}" repo1 1 dolt "" "SELECT @@GLOBAL.dolt_cluster_role,@@GLOBAL.dolt_cluster_role_epoch" "@@GLOBAL.dolt_cluster_role,@@GLOBAL.dolt_cluster_role_epoch\nstandby,15" + server_query_with_port "${SERVERONE_MYSQL_PORT}" repo1 1 dolt "" "SELECT count(*) FROM vals" "count(*)\n10" + server_query_with_port "${SERVERTWO_MYSQL_PORT}" repo1 1 dolt "" "SELECT @@GLOBAL.dolt_cluster_role,@@GLOBAL.dolt_cluster_role_epoch" "@@GLOBAL.dolt_cluster_role,@@GLOBAL.dolt_cluster_role_epoch\nprimary,15" + + kill $servertwo_pid + wait $servertwo_pid + + kill $serverone_pid + wait $serverone_pid +} + +@test "sql-server-cluster: primary -> standby without the standby up fails" { + cd serverone + + echo " +log_level: trace +user: + name: dolt +listener: + host: 0.0.0.0 + port: ${SERVERONE_MYSQL_PORT} +behavior: + read_only: false + autocommit: true +cluster: + standby_remotes: + - name: standby + remote_url_template: http://localhost:${SERVERTWO_GRPC_PORT}/{database} + bootstrap_role: primary + bootstrap_epoch: 10 + remotesapi: + port: ${SERVERONE_GRPC_PORT}" > server.yaml + + (cd repo1 && dolt remote add standby http://localhost:"${SERVERTWO_GRPC_PORT}"/repo1) + (cd repo2 && dolt remote add standby http://localhost:"${SERVERTWO_GRPC_PORT}"/repo2) + dolt sql-server --config server.yaml & + SERVER_PID=$! + wait_for_connection "${SERVERONE_MYSQL_PORT}" 5000 + + # when we do this, we become read only for a little bit... + run server_query_with_port "${SERVERONE_MYSQL_PORT}" repo1 1 dolt "" "call dolt_assume_cluster_role('standby', '11');" "" 1 + [[ "$output" =~ "failed to transition from primary to standby gracefully" ]] || false + + # after that fails, we should still be primary and we should accept writes. + server_query_with_port "${SERVERONE_MYSQL_PORT}" repo1 1 dolt "" "create table vals (i int primary key);insert into vals values (0);select i from vals" ";;i\n0" +} + +@test "sql-server-cluster: standby -> primary leave connection used to transition in an error state" { + cd serverone + + echo " +log_level: trace +user: + name: dolt +listener: + host: 0.0.0.0 + port: ${SERVERONE_MYSQL_PORT} +behavior: + read_only: false + autocommit: true +cluster: + standby_remotes: + - name: standby + remote_url_template: http://localhost:${SERVERTWO_GRPC_PORT}/{database} + bootstrap_role: standby + bootstrap_epoch: 10 + remotesapi: + port: ${SERVERONE_GRPC_PORT}" > server.yaml + + (cd repo1 && dolt remote add standby http://localhost:"${SERVERTWO_GRPC_PORT}"/repo1) + (cd repo2 && dolt remote add standby http://localhost:"${SERVERTWO_GRPC_PORT}"/repo2) + dolt sql-server --config server.yaml & + SERVER_PID=$! + wait_for_connection "${SERVERONE_MYSQL_PORT}" 5000 + + # when we do this, we become read only for a little bit... + run server_query_with_port "${SERVERONE_MYSQL_PORT}" repo1 1 dolt "" "call dolt_assume_cluster_role('primary', '11');select 2 from dual" "" 1 + [[ "$output" =~ "this connection can no longer be used" ]] || false +} + +@test "sql-server-cluster: primary replicates to standby, fails over, new primary replicates to standby, fails over, new primary has all writes" { + cd serverone + + echo " +log_level: trace +user: + name: dolt +listener: + host: 0.0.0.0 + port: ${SERVERONE_MYSQL_PORT} +behavior: + read_only: false + autocommit: true +cluster: + standby_remotes: + - name: standby + remote_url_template: http://localhost:${SERVERTWO_GRPC_PORT}/{database} + bootstrap_role: standby + bootstrap_epoch: 1 + remotesapi: + port: ${SERVERONE_GRPC_PORT}" > server.yaml + + DOLT_ROOT_PATH=`pwd` dolt config --global --add user.email bats@email.fake + DOLT_ROOT_PATH=`pwd` dolt config --global --add user.name "Bats Tests" + DOLT_ROOT_PATH=`pwd` dolt config --global --add metrics.disabled true + + (cd repo1 && dolt remote add standby http://localhost:${SERVERTWO_GRPC_PORT}/repo1) + (cd repo2 && dolt remote add standby http://localhost:${SERVERTWO_GRPC_PORT}/repo2) + DOLT_ROOT_PATH=`pwd` dolt sql-server --config server.yaml & + serverone_pid=$! + + wait_for_connection "${SERVERONE_MYSQL_PORT}" 5000 + + cd ../servertwo + + echo " +log_level: trace +user: + name: dolt +listener: + host: 0.0.0.0 + port: ${SERVERTWO_MYSQL_PORT} +behavior: + read_only: false + autocommit: true +cluster: + standby_remotes: + - name: standby + remote_url_template: http://localhost:${SERVERONE_GRPC_PORT}/{database} + bootstrap_role: primary + bootstrap_epoch: 1 + remotesapi: + port: ${SERVERTWO_GRPC_PORT}" > server.yaml + + DOLT_ROOT_PATH=`pwd` dolt config --global --add user.email bats@email.fake + DOLT_ROOT_PATH=`pwd` dolt config --global --add user.name "Bats Tests" + DOLT_ROOT_PATH=`pwd` dolt config --global --add metrics.disabled true + + (cd repo1 && dolt remote add standby http://localhost:${SERVERONE_GRPC_PORT}/repo1) + (cd repo2 && dolt remote add standby http://localhost:${SERVERONE_GRPC_PORT}/repo2) + DOLT_ROOT_PATH=`pwd` dolt sql-server --config server.yaml & + servertwo_pid=$! + + wait_for_connection "${SERVERTWO_MYSQL_PORT}" 5000 + server_query_with_port "${SERVERTWO_MYSQL_PORT}" repo1 1 dolt "" "create table vals (i int primary key);insert into vals values (0),(1),(2),(3),(4);call dolt_assume_cluster_role('standby', 2);" ";;status\n0" + server_query_with_port "${SERVERONE_MYSQL_PORT}" repo1 1 dolt "" "select count(*) from vals;call dolt_assume_cluster_role('primary', 2)" "count(*)\n5;status\n0" + server_query_with_port "${SERVERONE_MYSQL_PORT}" repo1 1 dolt "" "insert into vals values (5),(6),(7),(8),(9);call dolt_assume_cluster_role('standby', 3)" ";status\n0" + server_query_with_port "${SERVERTWO_MYSQL_PORT}" repo1 1 dolt "" "select count(*) from vals;call dolt_assume_cluster_role('primary', 3)" "count(*)\n10;status\n0" + + kill $servertwo_pid + wait $servertwo_pid + + kill $serverone_pid + wait $serverone_pid +}