mirror of
https://github.com/dolthub/dolt.git
synced 2026-03-13 11:09:10 -05:00
Merge remote-tracking branch 'origin/main' into aaron/sqlserver-yaml-tests
This commit is contained in:
@@ -114,6 +114,7 @@ func NewSqlEngine(
|
||||
|
||||
config.ClusterController.RegisterStoredProcedures(pro)
|
||||
pro.InitDatabaseHook = cluster.NewInitDatabaseHook(config.ClusterController, bThreads, pro.InitDatabaseHook)
|
||||
config.ClusterController.ManageDatabaseProvider(pro)
|
||||
|
||||
// Load in privileges from file, if it exists
|
||||
persister := mysql_file_handler.NewPersister(config.PrivFilePath, config.DoltCfgDirPath)
|
||||
|
||||
@@ -266,11 +266,18 @@ func Serve(
|
||||
clusterRemoteSrv.Serve(listeners)
|
||||
}()
|
||||
}
|
||||
|
||||
clusterController.ManageQueryConnections(
|
||||
mySQLServer.SessionManager().Iter,
|
||||
sqlEngine.GetUnderlyingEngine().ProcessList.Kill,
|
||||
mySQLServer.SessionManager().KillConnection,
|
||||
)
|
||||
} else {
|
||||
lgr.Errorf("error creating SQL engine context for remotesapi server: %v", err)
|
||||
startError = err
|
||||
return
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
if ok, f := mrEnv.IsLocked(); ok {
|
||||
|
||||
@@ -16,7 +16,7 @@ require (
|
||||
github.com/dolthub/ishell v0.0.0-20220112232610-14e753f0f371
|
||||
github.com/dolthub/mmap-go v1.0.4-0.20201107010347-f9f2a9588a66
|
||||
github.com/dolthub/sqllogictest/go v0.0.0-20201107003712-816f3ae12d81
|
||||
github.com/dolthub/vitess v0.0.0-20220929061157-c71cf6a7768e
|
||||
github.com/dolthub/vitess v0.0.0-20220930181015-759b1acd9188
|
||||
github.com/dustin/go-humanize v1.0.0
|
||||
github.com/fatih/color v1.13.0
|
||||
github.com/flynn-archive/go-shlex v0.0.0-20150515145356-3f9db97f8568
|
||||
@@ -57,7 +57,7 @@ require (
|
||||
require (
|
||||
github.com/aliyun/aliyun-oss-go-sdk v2.2.5+incompatible
|
||||
github.com/cenkalti/backoff/v4 v4.1.3
|
||||
github.com/dolthub/go-mysql-server v0.12.1-0.20220929211840-02a9c38c169f
|
||||
github.com/dolthub/go-mysql-server v0.12.1-0.20221003181623-42a4fb9ec5f8
|
||||
github.com/google/flatbuffers v2.0.6+incompatible
|
||||
github.com/kch42/buzhash v0.0.0-20160816060738-9bdec3dec7c6
|
||||
github.com/mitchellh/go-ps v1.0.0
|
||||
|
||||
@@ -178,8 +178,8 @@ github.com/dolthub/flatbuffers v1.13.0-dh.1 h1:OWJdaPep22N52O/0xsUevxJ6Qfw1M2txC
|
||||
github.com/dolthub/flatbuffers v1.13.0-dh.1/go.mod h1:CorYGaDmXjHz1Z7i50PYXG1Ricn31GcA2wNOTFIQAKE=
|
||||
github.com/dolthub/fslock v0.0.3 h1:iLMpUIvJKMKm92+N1fmHVdxJP5NdyDK5bK7z7Ba2s2U=
|
||||
github.com/dolthub/fslock v0.0.3/go.mod h1:QWql+P17oAAMLnL4HGB5tiovtDuAjdDTPbuqx7bYfa0=
|
||||
github.com/dolthub/go-mysql-server v0.12.1-0.20220929211840-02a9c38c169f h1:+7jgFuHeF3Vj2eG7thWUprQNFz18Dg6f1Y/yow35KVk=
|
||||
github.com/dolthub/go-mysql-server v0.12.1-0.20220929211840-02a9c38c169f/go.mod h1:Ndof+jmKE/AISRWgeyx+RUvNlAtMOPSUzTM/iCOfx70=
|
||||
github.com/dolthub/go-mysql-server v0.12.1-0.20221003181623-42a4fb9ec5f8 h1:PcD6hKf6GkK+pNXZL+aD5rP4iyX8yjmXY21Puh5iF9U=
|
||||
github.com/dolthub/go-mysql-server v0.12.1-0.20221003181623-42a4fb9ec5f8/go.mod h1:qQReFVsJ0CPJLxX3lAuZxYWi6T4twzErI//D2bSSttY=
|
||||
github.com/dolthub/ishell v0.0.0-20220112232610-14e753f0f371 h1:oyPHJlzumKta1vnOQqUnfdz+pk3EmnHS3Nd0cCT0I2g=
|
||||
github.com/dolthub/ishell v0.0.0-20220112232610-14e753f0f371/go.mod h1:dhGBqcCEfK5kuFmeO5+WOx3hqc1k3M29c1oS/R7N4ms=
|
||||
github.com/dolthub/jsonpath v0.0.0-20210609232853-d49537a30474 h1:xTrR+l5l+1Lfq0NvhiEsctylXinUMFhhsqaEcl414p8=
|
||||
@@ -188,8 +188,8 @@ github.com/dolthub/mmap-go v1.0.4-0.20201107010347-f9f2a9588a66 h1:WRPDbpJWEnPxP
|
||||
github.com/dolthub/mmap-go v1.0.4-0.20201107010347-f9f2a9588a66/go.mod h1:N5ZIbMGuDUpTpOFQ7HcsN6WSIpTGQjHP+Mz27AfmAgk=
|
||||
github.com/dolthub/sqllogictest/go v0.0.0-20201107003712-816f3ae12d81 h1:7/v8q9XGFa6q5Ap4Z/OhNkAMBaK5YeuEzwJt+NZdhiE=
|
||||
github.com/dolthub/sqllogictest/go v0.0.0-20201107003712-816f3ae12d81/go.mod h1:siLfyv2c92W1eN/R4QqG/+RjjX5W2+gCTRjZxBjI3TY=
|
||||
github.com/dolthub/vitess v0.0.0-20220929061157-c71cf6a7768e h1:vC5OgmUm1Dd8vQP1YqgRvYMbHvrLNdQkd3S7udbS3BQ=
|
||||
github.com/dolthub/vitess v0.0.0-20220929061157-c71cf6a7768e/go.mod h1:oVFIBdqMFEkt4Xz2fzFJBNtzKhDEjwdCF0dzde39iKs=
|
||||
github.com/dolthub/vitess v0.0.0-20220930181015-759b1acd9188 h1:TJVpwZ8XY/fNK1ZgV9QMw+DE6WKJRi28Ruilk/qTmOU=
|
||||
github.com/dolthub/vitess v0.0.0-20220930181015-759b1acd9188/go.mod h1:oVFIBdqMFEkt4Xz2fzFJBNtzKhDEjwdCF0dzde39iKs=
|
||||
github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
|
||||
github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo=
|
||||
github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
|
||||
|
||||
@@ -16,7 +16,6 @@ package dtestutils
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"math"
|
||||
"testing"
|
||||
|
||||
@@ -28,8 +27,6 @@ import (
|
||||
"github.com/dolthub/dolt/go/libraries/doltcore/env"
|
||||
"github.com/dolthub/dolt/go/libraries/doltcore/row"
|
||||
"github.com/dolthub/dolt/go/libraries/doltcore/schema"
|
||||
"github.com/dolthub/dolt/go/libraries/doltcore/table"
|
||||
"github.com/dolthub/dolt/go/libraries/doltcore/table/editor"
|
||||
"github.com/dolthub/dolt/go/store/types"
|
||||
)
|
||||
|
||||
@@ -120,79 +117,6 @@ func CreateEmptyTestTable(t *testing.T, dEnv *env.DoltEnv, tableName string, sch
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
// CreateTestTable creates a new test table with the name, schema, and rows given.
|
||||
func CreateTestTable(t *testing.T, dEnv *env.DoltEnv, tableName string, sch schema.Schema, rs ...row.Row) {
|
||||
imt := table.NewInMemTable(sch)
|
||||
|
||||
for _, r := range rs {
|
||||
_ = imt.AppendRow(r)
|
||||
}
|
||||
|
||||
ctx := context.Background()
|
||||
vrw := dEnv.DoltDB.ValueReadWriter()
|
||||
ns := dEnv.DoltDB.NodeStore()
|
||||
|
||||
rowMap, err := types.NewMap(ctx, vrw)
|
||||
require.NoError(t, err)
|
||||
me := rowMap.Edit()
|
||||
for i := 0; i < imt.NumRows(); i++ {
|
||||
r, err := imt.GetRow(i)
|
||||
require.NoError(t, err)
|
||||
k, v := r.NomsMapKey(sch), r.NomsMapValue(sch)
|
||||
me.Set(k, v)
|
||||
}
|
||||
rowMap, err = me.Map(ctx)
|
||||
require.NoError(t, err)
|
||||
|
||||
tbl, err := doltdb.NewNomsTable(ctx, vrw, ns, sch, rowMap, nil, nil)
|
||||
require.NoError(t, err)
|
||||
tbl, err = editor.RebuildAllIndexes(ctx, tbl, editor.TestEditorOptions(vrw))
|
||||
require.NoError(t, err)
|
||||
|
||||
sch, err = tbl.GetSchema(ctx)
|
||||
require.NoError(t, err)
|
||||
rows, err := tbl.GetRowData(ctx)
|
||||
require.NoError(t, err)
|
||||
indexes, err := tbl.GetIndexSet(ctx)
|
||||
require.NoError(t, err)
|
||||
err = putTableToWorking(ctx, dEnv, sch, rows, indexes, tableName, nil)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
func putTableToWorking(ctx context.Context, dEnv *env.DoltEnv, sch schema.Schema, rows durable.Index, indexData durable.IndexSet, tableName string, autoVal types.Value) error {
|
||||
root, err := dEnv.WorkingRoot(ctx)
|
||||
if err != nil {
|
||||
return fmt.Errorf("%w: %v", doltdb.ErrNomsIO, err)
|
||||
}
|
||||
|
||||
vrw := dEnv.DoltDB.ValueReadWriter()
|
||||
ns := dEnv.DoltDB.NodeStore()
|
||||
tbl, err := doltdb.NewTable(ctx, vrw, ns, sch, rows, indexData, autoVal)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
newRoot, err := root.PutTable(ctx, tableName, tbl)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
rootHash, err := root.HashOf()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
newRootHash, err := newRoot.HashOf()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if rootHash == newRootHash {
|
||||
return nil
|
||||
}
|
||||
|
||||
return dEnv.UpdateWorkingRoot(ctx, newRoot)
|
||||
}
|
||||
|
||||
// MustSchema takes a variable number of columns and returns a schema.
|
||||
func MustSchema(cols ...schema.Column) schema.Schema {
|
||||
hasPKCols := false
|
||||
|
||||
@@ -17,6 +17,7 @@ package remotesrv
|
||||
import (
|
||||
"context"
|
||||
"encoding/base64"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/url"
|
||||
@@ -38,6 +39,8 @@ import (
|
||||
"github.com/dolthub/dolt/go/store/types"
|
||||
)
|
||||
|
||||
var ErrUnimplemented = errors.New("unimplemented")
|
||||
|
||||
type RemoteChunkStore struct {
|
||||
HttpHost string
|
||||
csCache DBCache
|
||||
@@ -79,7 +82,10 @@ func (rs *RemoteChunkStore) HasChunks(ctx context.Context, req *remotesapi.HasCh
|
||||
defer func() { logger.Println("finished") }()
|
||||
|
||||
repoPath := getRepoPath(req)
|
||||
cs := rs.getStore(logger, repoPath)
|
||||
cs, err := rs.getStore(logger, repoPath)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if cs == nil {
|
||||
return nil, status.Error(codes.Internal, "Could not get chunkstore")
|
||||
@@ -133,7 +139,10 @@ func (rs *RemoteChunkStore) GetDownloadLocations(ctx context.Context, req *remot
|
||||
defer func() { logger.Println("finished") }()
|
||||
|
||||
repoPath := getRepoPath(req)
|
||||
cs := rs.getStore(logger, repoPath)
|
||||
cs, err := rs.getStore(logger, repoPath)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if cs == nil {
|
||||
return nil, status.Error(codes.Internal, "Could not get chunkstore")
|
||||
@@ -199,7 +208,10 @@ func (rs *RemoteChunkStore) StreamDownloadLocations(stream remotesapi.ChunkStore
|
||||
nextPath := getRepoPath(req)
|
||||
if nextPath != repoPath {
|
||||
repoPath = nextPath
|
||||
cs = rs.getStore(logger, repoPath)
|
||||
cs, err = rs.getStore(logger, repoPath)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if cs == nil {
|
||||
return status.Error(codes.Internal, "Could not get chunkstore")
|
||||
}
|
||||
@@ -292,7 +304,10 @@ func (rs *RemoteChunkStore) GetUploadLocations(ctx context.Context, req *remotes
|
||||
defer func() { logger.Println("finished") }()
|
||||
|
||||
repoPath := getRepoPath(req)
|
||||
cs := rs.getStore(logger, repoPath)
|
||||
cs, err := rs.getStore(logger, repoPath)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if cs == nil {
|
||||
return nil, status.Error(codes.Internal, "Could not get chunkstore")
|
||||
@@ -341,7 +356,10 @@ func (rs *RemoteChunkStore) Rebase(ctx context.Context, req *remotesapi.RebaseRe
|
||||
defer func() { logger.Println("finished") }()
|
||||
|
||||
repoPath := getRepoPath(req)
|
||||
cs := rs.getStore(logger, repoPath)
|
||||
cs, err := rs.getStore(logger, repoPath)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if cs == nil {
|
||||
return nil, status.Error(codes.Internal, "Could not get chunkstore")
|
||||
@@ -349,7 +367,7 @@ func (rs *RemoteChunkStore) Rebase(ctx context.Context, req *remotesapi.RebaseRe
|
||||
|
||||
logger.Printf("found %s", repoPath)
|
||||
|
||||
err := cs.Rebase(ctx)
|
||||
err = cs.Rebase(ctx)
|
||||
|
||||
if err != nil {
|
||||
logger.Printf("error occurred during processing of Rebace rpc of %s details: %v", repoPath, err)
|
||||
@@ -364,7 +382,10 @@ func (rs *RemoteChunkStore) Root(ctx context.Context, req *remotesapi.RootReques
|
||||
defer func() { logger.Println("finished") }()
|
||||
|
||||
repoPath := getRepoPath(req)
|
||||
cs := rs.getStore(logger, repoPath)
|
||||
cs, err := rs.getStore(logger, repoPath)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if cs == nil {
|
||||
return nil, status.Error(codes.Internal, "Could not get chunkstore")
|
||||
@@ -385,7 +406,10 @@ func (rs *RemoteChunkStore) Commit(ctx context.Context, req *remotesapi.CommitRe
|
||||
defer func() { logger.Println("finished") }()
|
||||
|
||||
repoPath := getRepoPath(req)
|
||||
cs := rs.getStore(logger, repoPath)
|
||||
cs, err := rs.getStore(logger, repoPath)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if cs == nil {
|
||||
return nil, status.Error(codes.Internal, "Could not get chunkstore")
|
||||
@@ -399,7 +423,7 @@ func (rs *RemoteChunkStore) Commit(ctx context.Context, req *remotesapi.CommitRe
|
||||
updates[hash.New(cti.Hash).String()] = int(cti.ChunkCount)
|
||||
}
|
||||
|
||||
err := cs.AddTableFilesToManifest(ctx, updates)
|
||||
err = cs.AddTableFilesToManifest(ctx, updates)
|
||||
|
||||
if err != nil {
|
||||
logger.Printf("error occurred updating the manifest: %s", err.Error())
|
||||
@@ -426,12 +450,15 @@ func (rs *RemoteChunkStore) GetRepoMetadata(ctx context.Context, req *remotesapi
|
||||
defer func() { logger.Println("finished") }()
|
||||
|
||||
repoPath := getRepoPath(req)
|
||||
cs := rs.getOrCreateStore(logger, repoPath, req.ClientRepoFormat.NbfVersion)
|
||||
cs, err := rs.getOrCreateStore(logger, repoPath, req.ClientRepoFormat.NbfVersion)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if cs == nil {
|
||||
return nil, status.Error(codes.Internal, "Could not get chunkstore")
|
||||
}
|
||||
|
||||
err := cs.Rebase(ctx)
|
||||
err = cs.Rebase(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -453,7 +480,10 @@ func (rs *RemoteChunkStore) ListTableFiles(ctx context.Context, req *remotesapi.
|
||||
defer func() { logger.Println("finished") }()
|
||||
|
||||
repoPath := getRepoPath(req)
|
||||
cs := rs.getStore(logger, repoPath)
|
||||
cs, err := rs.getStore(logger, repoPath)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if cs == nil {
|
||||
return nil, status.Error(codes.Internal, "Could not get chunkstore")
|
||||
@@ -522,7 +552,10 @@ func (rs *RemoteChunkStore) AddTableFiles(ctx context.Context, req *remotesapi.A
|
||||
defer func() { logger.Println("finished") }()
|
||||
|
||||
repoPath := getRepoPath(req)
|
||||
cs := rs.getStore(logger, repoPath)
|
||||
cs, err := rs.getStore(logger, repoPath)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if cs == nil {
|
||||
return nil, status.Error(codes.Internal, "Could not get chunkstore")
|
||||
@@ -536,7 +569,7 @@ func (rs *RemoteChunkStore) AddTableFiles(ctx context.Context, req *remotesapi.A
|
||||
updates[hash.New(cti.Hash).String()] = int(cti.ChunkCount)
|
||||
}
|
||||
|
||||
err := cs.AddTableFilesToManifest(ctx, updates)
|
||||
err = cs.AddTableFilesToManifest(ctx, updates)
|
||||
|
||||
if err != nil {
|
||||
logger.Printf("error occurred updating the manifest: %s", err.Error())
|
||||
@@ -546,18 +579,20 @@ func (rs *RemoteChunkStore) AddTableFiles(ctx context.Context, req *remotesapi.A
|
||||
return &remotesapi.AddTableFilesResponse{Success: true}, nil
|
||||
}
|
||||
|
||||
func (rs *RemoteChunkStore) getStore(logger *logrus.Entry, repoPath string) RemoteSrvStore {
|
||||
func (rs *RemoteChunkStore) getStore(logger *logrus.Entry, repoPath string) (RemoteSrvStore, error) {
|
||||
return rs.getOrCreateStore(logger, repoPath, types.Format_Default.VersionString())
|
||||
}
|
||||
|
||||
func (rs *RemoteChunkStore) getOrCreateStore(logger *logrus.Entry, repoPath, nbfVerStr string) RemoteSrvStore {
|
||||
func (rs *RemoteChunkStore) getOrCreateStore(logger *logrus.Entry, repoPath, nbfVerStr string) (RemoteSrvStore, error) {
|
||||
cs, err := rs.csCache.Get(repoPath, nbfVerStr)
|
||||
|
||||
if err != nil {
|
||||
logger.Printf("Failed to retrieve chunkstore for %s\n", repoPath)
|
||||
if errors.Is(err, ErrUnimplemented) {
|
||||
return nil, status.Error(codes.Unimplemented, err.Error())
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return cs
|
||||
return cs, nil
|
||||
}
|
||||
|
||||
var requestId int32
|
||||
|
||||
@@ -15,9 +15,15 @@
|
||||
package cluster
|
||||
|
||||
import (
|
||||
"errors"
|
||||
|
||||
"github.com/dolthub/go-mysql-server/sql"
|
||||
|
||||
"github.com/dolthub/dolt/go/libraries/doltcore/sqle/dsess"
|
||||
)
|
||||
|
||||
var ErrServerTransitionedRolesErr = errors.New("this server transitioned cluster roles. this connection can no longer be used. please reconnect.")
|
||||
|
||||
func newAssumeRoleProcedure(controller *Controller) sql.ExternalStoredProcedureDetails {
|
||||
return sql.ExternalStoredProcedureDetails{
|
||||
Name: "dolt_assume_cluster_role",
|
||||
@@ -29,10 +35,19 @@ func newAssumeRoleProcedure(controller *Controller) sql.ExternalStoredProcedureD
|
||||
},
|
||||
},
|
||||
Function: func(ctx *sql.Context, role string, epoch int) (sql.RowIter, error) {
|
||||
err := controller.setRoleAndEpoch(role, epoch)
|
||||
if role == string(RoleDetectedBrokenConfig) {
|
||||
return nil, errors.New("cannot set role to detected_broken_config; valid values are 'primary' and 'standby'")
|
||||
}
|
||||
changerole, err := controller.setRoleAndEpoch(role, epoch, true /* graceful */, int(ctx.Session.ID()))
|
||||
if err != nil {
|
||||
// We did not transition, no need to set our session to read-only, etc.
|
||||
return nil, err
|
||||
}
|
||||
if changerole {
|
||||
// We transitioned, make sure we do not run anymore queries on this session.
|
||||
ctx.Session.SetTransaction(nil)
|
||||
dsess.DSessFromSess(ctx.Session).SetValidateErr(ErrServerTransitionedRolesErr)
|
||||
}
|
||||
return sql.RowsToRowIter(sql.Row{0}), nil
|
||||
},
|
||||
}
|
||||
|
||||
@@ -38,7 +38,6 @@ type commithook struct {
|
||||
lgr atomic.Value // *logrus.Entry
|
||||
remotename string
|
||||
dbname string
|
||||
lout io.Writer
|
||||
mu sync.Mutex
|
||||
wg sync.WaitGroup
|
||||
cond *sync.Cond
|
||||
@@ -48,6 +47,11 @@ type commithook struct {
|
||||
nextHeadIncomingTime time.Time
|
||||
lastSuccess time.Time
|
||||
currentError *string
|
||||
cancelReplicate func()
|
||||
|
||||
// waitNotify is set by controller when it needs to track whether the
|
||||
// commithooks are caught up with replicating to the standby.
|
||||
waitNotify func()
|
||||
|
||||
role Role
|
||||
|
||||
@@ -136,6 +140,9 @@ func (h *commithook) replicate(ctx context.Context) {
|
||||
h.attemptReplicate(ctx)
|
||||
} else {
|
||||
lgr.Tracef("cluster/commithook: background thread: waiting for signal.")
|
||||
if h.waitNotify != nil {
|
||||
h.waitNotify()
|
||||
}
|
||||
h.cond.Wait()
|
||||
lgr.Tracef("cluster/commithook: background thread: woken up.")
|
||||
}
|
||||
@@ -144,15 +151,28 @@ func (h *commithook) replicate(ctx context.Context) {
|
||||
|
||||
// called with h.mu locked.
|
||||
func (h *commithook) shouldReplicate() bool {
|
||||
if h.role != RolePrimary {
|
||||
return false
|
||||
}
|
||||
if h.nextHead == h.lastPushedHead {
|
||||
if h.isCaughtUp() {
|
||||
return false
|
||||
}
|
||||
return (h.nextPushAttempt == (time.Time{}) || time.Now().After(h.nextPushAttempt))
|
||||
}
|
||||
|
||||
// called with h.mu locked. Returns true if the standby is true-d up, false
|
||||
// otherwise. Different from shouldReplicate() in that it does not care about
|
||||
// nextPushAttempt, for example. Used in Controller.waitForReplicate.
|
||||
func (h *commithook) isCaughtUp() bool {
|
||||
if h.role != RolePrimary {
|
||||
return true
|
||||
}
|
||||
return h.nextHead == h.lastPushedHead
|
||||
}
|
||||
|
||||
func (h *commithook) isCaughtUpLocking() bool {
|
||||
h.mu.Lock()
|
||||
defer h.mu.Unlock()
|
||||
return h.isCaughtUp()
|
||||
}
|
||||
|
||||
// called with h.mu locked.
|
||||
func (h *commithook) primaryNeedsInit() bool {
|
||||
return h.role == RolePrimary && h.nextHead == (hash.Hash{})
|
||||
@@ -168,6 +188,13 @@ func (h *commithook) attemptReplicate(ctx context.Context) {
|
||||
toPush := h.nextHead
|
||||
incomingTime := h.nextHeadIncomingTime
|
||||
destDB := h.destDB
|
||||
ctx, h.cancelReplicate = context.WithCancel(ctx)
|
||||
defer func() {
|
||||
if h.cancelReplicate != nil {
|
||||
h.cancelReplicate()
|
||||
}
|
||||
h.cancelReplicate = nil
|
||||
}()
|
||||
h.mu.Unlock()
|
||||
|
||||
if destDB == nil {
|
||||
@@ -183,6 +210,7 @@ func (h *commithook) attemptReplicate(ctx context.Context) {
|
||||
if toPush == h.nextHead {
|
||||
h.nextPushAttempt = time.Now().Add(1 * time.Second)
|
||||
}
|
||||
h.cancelReplicate = nil
|
||||
return
|
||||
}
|
||||
lgr.Tracef("cluster/commithook: fetched destDB")
|
||||
@@ -208,20 +236,22 @@ func (h *commithook) attemptReplicate(ctx context.Context) {
|
||||
}
|
||||
|
||||
h.mu.Lock()
|
||||
if err == nil {
|
||||
h.currentError = nil
|
||||
lgr.Tracef("cluster/commithook: successfully Commited chunks on destDB")
|
||||
h.lastPushedHead = toPush
|
||||
h.lastSuccess = incomingTime
|
||||
h.nextPushAttempt = time.Time{}
|
||||
} else {
|
||||
h.currentError = new(string)
|
||||
*h.currentError = fmt.Sprintf("failed to commit chunks on destDB: %v", err)
|
||||
lgr.Warnf("cluster/commithook: failed to commit chunks on destDB: %v", err)
|
||||
// add some delay if a new head didn't come in while we were pushing.
|
||||
if toPush == h.nextHead {
|
||||
// TODO: We could add some backoff here.
|
||||
h.nextPushAttempt = time.Now().Add(1 * time.Second)
|
||||
if h.role == RolePrimary {
|
||||
if err == nil {
|
||||
h.currentError = nil
|
||||
lgr.Tracef("cluster/commithook: successfully Commited chunks on destDB")
|
||||
h.lastPushedHead = toPush
|
||||
h.lastSuccess = incomingTime
|
||||
h.nextPushAttempt = time.Time{}
|
||||
} else {
|
||||
h.currentError = new(string)
|
||||
*h.currentError = fmt.Sprintf("failed to commit chunks on destDB: %v", err)
|
||||
lgr.Warnf("cluster/commithook: failed to commit chunks on destDB: %v", err)
|
||||
// add some delay if a new head didn't come in while we were pushing.
|
||||
if toPush == h.nextHead {
|
||||
// TODO: We could add some backoff here.
|
||||
h.nextPushAttempt = time.Now().Add(1 * time.Second)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -279,7 +309,7 @@ func (h *commithook) tick(ctx context.Context) {
|
||||
func (h *commithook) recordSuccessfulRemoteSrvCommit() {
|
||||
h.mu.Lock()
|
||||
defer h.mu.Unlock()
|
||||
if h.role == RolePrimary {
|
||||
if h.role != RoleStandby {
|
||||
return
|
||||
}
|
||||
h.lastSuccess = time.Now()
|
||||
@@ -298,9 +328,24 @@ func (h *commithook) setRole(role Role) {
|
||||
h.nextPushAttempt = time.Time{}
|
||||
h.role = role
|
||||
h.lgr.Store(h.rootLgr.WithField(logFieldRole, string(role)))
|
||||
if h.cancelReplicate != nil {
|
||||
h.cancelReplicate()
|
||||
h.cancelReplicate = nil
|
||||
}
|
||||
if role == RoleDetectedBrokenConfig {
|
||||
h.currentError = &errDetectedBrokenConfigStr
|
||||
}
|
||||
h.cond.Signal()
|
||||
}
|
||||
|
||||
func (h *commithook) setWaitNotify(f func()) {
|
||||
h.mu.Lock()
|
||||
defer h.mu.Unlock()
|
||||
h.waitNotify = f
|
||||
}
|
||||
|
||||
var errDetectedBrokenConfigStr = "error: more than one server was configured as primary in the same epoch. this server has stopped accepting writes. choose a primary in the cluster and call dolt_assume_cluster_role() on servers in the cluster to start replication at a higher epoch"
|
||||
|
||||
// Execute on this commithook updates the target root hash we're attempting to
|
||||
// replicate and wakes the replication thread.
|
||||
func (h *commithook) Execute(ctx context.Context, ds datas.Dataset, db datas.Database) error {
|
||||
@@ -314,6 +359,11 @@ func (h *commithook) Execute(ctx context.Context, ds datas.Dataset, db datas.Dat
|
||||
}
|
||||
h.mu.Lock()
|
||||
defer h.mu.Unlock()
|
||||
lgr = h.logger()
|
||||
if h.role != RolePrimary {
|
||||
lgr.Warnf("cluster/commithook received commit callback for a commit on %s, but we are not role primary; not replicating the commit, which is likely to be lost.", ds.ID())
|
||||
return nil
|
||||
}
|
||||
if root != h.nextHead {
|
||||
lgr.Tracef("signaling replication thread to push new head: %v", root.String())
|
||||
h.nextHeadIncomingTime = time.Now()
|
||||
@@ -329,7 +379,6 @@ func (h *commithook) HandleError(ctx context.Context, err error) error {
|
||||
}
|
||||
|
||||
func (h *commithook) SetLogger(ctx context.Context, wr io.Writer) error {
|
||||
h.lout = wr
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
@@ -16,9 +16,11 @@ package cluster
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"strconv"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/dolthub/go-mysql-server/sql"
|
||||
"github.com/sirupsen/logrus"
|
||||
@@ -38,6 +40,7 @@ type Role string
|
||||
|
||||
const RolePrimary Role = "primary"
|
||||
const RoleStandby Role = "standby"
|
||||
const RoleDetectedBrokenConfig Role = "detected_broken_config"
|
||||
|
||||
const PersistentConfigPrefix = "sqlserver.cluster"
|
||||
|
||||
@@ -52,12 +55,23 @@ type Controller struct {
|
||||
sinterceptor serverinterceptor
|
||||
cinterceptor clientinterceptor
|
||||
lgr *logrus.Logger
|
||||
|
||||
provider dbProvider
|
||||
iterSessions IterSessions
|
||||
killQuery func(uint32)
|
||||
killConnection func(uint32) error
|
||||
}
|
||||
|
||||
type sqlvars interface {
|
||||
AddSystemVariables(sysVars []sql.SystemVariable)
|
||||
}
|
||||
|
||||
// We can manage certain aspects of the exposed databases on the server through
|
||||
// this.
|
||||
type dbProvider interface {
|
||||
SetIsStandby(bool)
|
||||
}
|
||||
|
||||
type procedurestore interface {
|
||||
Register(sql.ExternalStoredProcedureDetails)
|
||||
}
|
||||
@@ -84,10 +98,15 @@ func NewController(lgr *logrus.Logger, cfg Config, pCfg config.ReadWriteConfig)
|
||||
commithooks: make([]*commithook, 0),
|
||||
lgr: lgr,
|
||||
}
|
||||
roleSetter := func(role string, epoch int) {
|
||||
ret.setRoleAndEpoch(role, epoch, false /* graceful */, -1 /* saveConnID */)
|
||||
}
|
||||
ret.sinterceptor.lgr = lgr.WithFields(logrus.Fields{})
|
||||
ret.sinterceptor.setRole(role, epoch)
|
||||
ret.sinterceptor.roleSetter = roleSetter
|
||||
ret.cinterceptor.lgr = lgr.WithFields(logrus.Fields{})
|
||||
ret.cinterceptor.setRole(role, epoch)
|
||||
ret.cinterceptor.roleSetter = roleSetter
|
||||
return ret, nil
|
||||
}
|
||||
|
||||
@@ -122,6 +141,29 @@ func (c *Controller) ApplyStandbyReplicationConfig(ctx context.Context, bt *sql.
|
||||
return nil
|
||||
}
|
||||
|
||||
type IterSessions func(func(sql.Session) (bool, error)) error
|
||||
|
||||
func (c *Controller) ManageDatabaseProvider(p dbProvider) {
|
||||
if c == nil {
|
||||
return
|
||||
}
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
c.provider = p
|
||||
c.setProviderIsStandby(c.role != RolePrimary)
|
||||
}
|
||||
|
||||
func (c *Controller) ManageQueryConnections(iterSessions IterSessions, killQuery func(uint32), killConnection func(uint32) error) {
|
||||
if c == nil {
|
||||
return
|
||||
}
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
c.iterSessions = iterSessions
|
||||
c.killQuery = killQuery
|
||||
c.killConnection = killConnection
|
||||
}
|
||||
|
||||
func (c *Controller) applyCommitHooks(ctx context.Context, name string, bt *sql.BackgroundThreads, denv *env.DoltEnv) ([]*commithook, error) {
|
||||
ttfdir, err := denv.TempTableFilesDir()
|
||||
if err != nil {
|
||||
@@ -210,6 +252,7 @@ func (c *Controller) persistVariables() error {
|
||||
func applyBootstrapClusterConfig(lgr *logrus.Logger, cfg Config, pCfg config.ReadWriteConfig) (Role, int, error) {
|
||||
toset := make(map[string]string)
|
||||
persistentRole := pCfg.GetStringOrDefault(DoltClusterRoleVariable, "")
|
||||
var roleFromPersistentConfig bool
|
||||
persistentEpoch := pCfg.GetStringOrDefault(DoltClusterRoleEpochVariable, "")
|
||||
if persistentRole == "" {
|
||||
if cfg.BootstrapRole() != "" {
|
||||
@@ -221,6 +264,7 @@ func applyBootstrapClusterConfig(lgr *logrus.Logger, cfg Config, pCfg config.Rea
|
||||
}
|
||||
toset[DoltClusterRoleVariable] = persistentRole
|
||||
} else {
|
||||
roleFromPersistentConfig = true
|
||||
lgr.Tracef("cluster/controller: persisted cluster role is %s", persistentRole)
|
||||
}
|
||||
if persistentEpoch == "" {
|
||||
@@ -231,7 +275,10 @@ func applyBootstrapClusterConfig(lgr *logrus.Logger, cfg Config, pCfg config.Rea
|
||||
lgr.Tracef("cluster/controller: persisted cluster role epoch is %s", persistentEpoch)
|
||||
}
|
||||
if persistentRole != string(RolePrimary) && persistentRole != string(RoleStandby) {
|
||||
return "", 0, fmt.Errorf("persisted role %s.%s = %s must be \"primary\" or \"secondary\"", PersistentConfigPrefix, DoltClusterRoleVariable, persistentRole)
|
||||
isallowed := persistentRole == string(RoleDetectedBrokenConfig) && roleFromPersistentConfig
|
||||
if !isallowed {
|
||||
return "", 0, fmt.Errorf("persisted role %s.%s = %s must be \"primary\" or \"secondary\"", PersistentConfigPrefix, DoltClusterRoleVariable, persistentRole)
|
||||
}
|
||||
}
|
||||
epochi, err := strconv.Atoi(persistentEpoch)
|
||||
if err != nil {
|
||||
@@ -246,39 +293,61 @@ func applyBootstrapClusterConfig(lgr *logrus.Logger, cfg Config, pCfg config.Rea
|
||||
return Role(persistentRole), epochi, nil
|
||||
}
|
||||
|
||||
func (c *Controller) setRoleAndEpoch(role string, epoch int) error {
|
||||
func (c *Controller) setRoleAndEpoch(role string, epoch int, graceful bool, saveConnID int) (bool, error) {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
if epoch == c.epoch && role == string(c.role) {
|
||||
return nil
|
||||
}
|
||||
if epoch == c.epoch {
|
||||
return fmt.Errorf("error assuming role '%s' at epoch %d; already at epoch %d with different role, '%s'", role, epoch, c.epoch, c.role)
|
||||
}
|
||||
if epoch < c.epoch {
|
||||
return fmt.Errorf("error assuming role '%s' at epoch %d; already at epoch %d", role, epoch, c.epoch)
|
||||
return false, nil
|
||||
}
|
||||
|
||||
if role != "primary" && role != "standby" {
|
||||
return fmt.Errorf("error assuming role '%s'; valid roles are 'primary' and 'standby'", role)
|
||||
if role != string(RolePrimary) && role != string(RoleStandby) && role != string(RoleDetectedBrokenConfig) {
|
||||
return false, fmt.Errorf("error assuming role '%s'; valid roles are 'primary' and 'standby'", role)
|
||||
}
|
||||
|
||||
if epoch < c.epoch {
|
||||
return false, fmt.Errorf("error assuming role '%s' at epoch %d; already at epoch %d", role, epoch, c.epoch)
|
||||
}
|
||||
if epoch == c.epoch {
|
||||
// This is allowed for non-graceful transitions to 'standby', which only occur from interceptors and
|
||||
// other signals that the cluster is misconfigured.
|
||||
isallowed := !graceful && (role == string(RoleStandby) || role == string(RoleDetectedBrokenConfig))
|
||||
if !isallowed {
|
||||
return false, fmt.Errorf("error assuming role '%s' at epoch %d; already at epoch %d with different role, '%s'", role, epoch, c.epoch, c.role)
|
||||
}
|
||||
}
|
||||
|
||||
changedrole := role != string(c.role)
|
||||
|
||||
if changedrole {
|
||||
var err error
|
||||
if role == string(RoleStandby) {
|
||||
if graceful {
|
||||
err = c.gracefulTransitionToStandby(saveConnID)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
} else {
|
||||
c.immediateTransitionToStandby()
|
||||
}
|
||||
} else if role == string(RoleDetectedBrokenConfig) {
|
||||
c.immediateTransitionToStandby()
|
||||
} else {
|
||||
c.transitionToPrimary(saveConnID)
|
||||
}
|
||||
}
|
||||
|
||||
c.role = Role(role)
|
||||
c.epoch = epoch
|
||||
|
||||
if changedrole {
|
||||
// TODO: Role is transitioning...lots of stuff to do.
|
||||
}
|
||||
|
||||
c.refreshSystemVars()
|
||||
c.cinterceptor.setRole(c.role, c.epoch)
|
||||
c.sinterceptor.setRole(c.role, c.epoch)
|
||||
for _, h := range c.commithooks {
|
||||
h.setRole(c.role)
|
||||
if changedrole {
|
||||
for _, h := range c.commithooks {
|
||||
h.setRole(c.role)
|
||||
}
|
||||
}
|
||||
return c.persistVariables()
|
||||
return changedrole, c.persistVariables()
|
||||
}
|
||||
|
||||
func (c *Controller) roleAndEpoch() (Role, int) {
|
||||
@@ -339,3 +408,136 @@ func (c *Controller) RemoteSrvServerArgs(ctx *sql.Context, args remotesrv.Server
|
||||
args.DBCache = remotesrvStoreCache{args.DBCache, c}
|
||||
return args
|
||||
}
|
||||
|
||||
// The order of operations is:
|
||||
// * Set all databases in database_provider to read-only.
|
||||
// * Kill all running queries in GMS.
|
||||
// * Replicate all databases to their standby remotes.
|
||||
// - If success, return success.
|
||||
// - If failure, set all databases in database_provider back to their original state. Return failure.
|
||||
//
|
||||
// saveConnID is potentially a connID of the caller to
|
||||
// dolt_assume_cluster_role(), which should not be killed with the other
|
||||
// connections. That connection will be transitioned to a terminal error state
|
||||
// after returning the results of dolt_assume_cluster_role().
|
||||
//
|
||||
// called with c.mu held
|
||||
func (c *Controller) gracefulTransitionToStandby(saveConnID int) error {
|
||||
c.setProviderIsStandby(true)
|
||||
c.killRunningQueries(saveConnID)
|
||||
// TODO: this can block with c.mu held, although we are not too
|
||||
// interested in the server proceeding gracefully while this is
|
||||
// happening.
|
||||
if err := c.waitForHooksToReplicate(); err != nil {
|
||||
c.setProviderIsStandby(false)
|
||||
c.killRunningQueries(saveConnID)
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// The order of operations is:
|
||||
// * Set all databases in database_provider to read-only.
|
||||
// * Kill all running queries in GMS.
|
||||
// * Return success. NOTE: we do not attempt to replicate to the standby.
|
||||
//
|
||||
// called with c.mu held
|
||||
func (c *Controller) immediateTransitionToStandby() error {
|
||||
c.setProviderIsStandby(true)
|
||||
c.killRunningQueries(-1)
|
||||
return nil
|
||||
}
|
||||
|
||||
// The order of operations is:
|
||||
// * Set all databases in database_provider back to their original mode: read-write or read only.
|
||||
// * Kill all running queries in GMS.
|
||||
// * Return success.
|
||||
//
|
||||
// saveConnID is potentially the connID of the caller to
|
||||
// dolt_assume_cluster_role().
|
||||
//
|
||||
// called with c.mu held
|
||||
func (c *Controller) transitionToPrimary(saveConnID int) error {
|
||||
c.setProviderIsStandby(false)
|
||||
c.killRunningQueries(saveConnID)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Kills all running queries in the managed GMS engine.
|
||||
// called with c.mu held
|
||||
func (c *Controller) killRunningQueries(saveConnID int) {
|
||||
if c.iterSessions != nil {
|
||||
c.iterSessions(func(session sql.Session) (stop bool, err error) {
|
||||
if int(session.ID()) != saveConnID {
|
||||
c.killQuery(session.ID())
|
||||
c.killConnection(session.ID())
|
||||
}
|
||||
return
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// called with c.mu held
|
||||
func (c *Controller) setProviderIsStandby(standby bool) {
|
||||
if c.provider != nil {
|
||||
c.provider.SetIsStandby(standby)
|
||||
}
|
||||
}
|
||||
|
||||
const waitForHooksToReplicateTimeout = 10 * time.Second
|
||||
|
||||
// Called during a graceful transition from primary to standby. Waits until all
|
||||
// commithooks report nextHead == lastPushedHead.
|
||||
//
|
||||
// TODO: make the deadline here configurable or something.
|
||||
//
|
||||
// called with c.mu held
|
||||
func (c *Controller) waitForHooksToReplicate() error {
|
||||
caughtup := make([]bool, len(c.commithooks))
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(len(c.commithooks))
|
||||
for li, lch := range c.commithooks {
|
||||
i := li
|
||||
ch := lch
|
||||
if ch.isCaughtUpLocking() {
|
||||
caughtup[i] = true
|
||||
wg.Done()
|
||||
} else {
|
||||
ch.setWaitNotify(func() {
|
||||
// called with ch.mu locked.
|
||||
if !caughtup[i] && ch.isCaughtUp() {
|
||||
caughtup[i] = true
|
||||
wg.Done()
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
wg.Wait()
|
||||
close(done)
|
||||
}()
|
||||
var success bool
|
||||
select {
|
||||
case <-done:
|
||||
success = true
|
||||
case <-time.After(waitForHooksToReplicateTimeout):
|
||||
success = false
|
||||
}
|
||||
for _, ch := range c.commithooks {
|
||||
ch.setWaitNotify(nil)
|
||||
}
|
||||
// make certain we don't leak the wg.Wait goroutine in the failure case.
|
||||
for _, b := range caughtup {
|
||||
if !b {
|
||||
wg.Done()
|
||||
}
|
||||
}
|
||||
if success {
|
||||
c.lgr.Tracef("cluster/controller: successfully replicated all databases to all standbys; transitioning to standby.")
|
||||
return nil
|
||||
} else {
|
||||
c.lgr.Warnf("cluster/controller: failed to replicate all databases to all standbys; not transitioning to standby.")
|
||||
return errors.New("cluster/controller: failed to transition from primary to standby gracefully; could not replicate databases to standby in a timely manner.")
|
||||
}
|
||||
}
|
||||
|
||||
@@ -36,19 +36,20 @@ const clusterRoleEpochHeader = "x-dolt-cluster-role-epoch"
|
||||
// interceptor anytime it changes. In turn, this interceptor:
|
||||
// * adds the server's current role and epoch to the request headers for every
|
||||
// outbound request.
|
||||
// * fails all outgoing requests immediately with codes.Unavailable if the role
|
||||
// == RoleStandby, since this server should not be replicating when it believes
|
||||
// it is a standby.
|
||||
// * fails all outgoing requests immediately with codes.FailedPrecondition if
|
||||
// the role == RoleStandby, since this server should not be replicating when it
|
||||
// believes it is a standby.
|
||||
// * watches returned response headers for a situation which causes this server
|
||||
// to force downgrade from primary to standby. In particular, when a returned
|
||||
// response header asserts that the standby replica is a primary at a higher
|
||||
// epoch than this server, this incterceptor coordinates with the Controller to
|
||||
// immediately transition to standby and to stop replicating to the standby.
|
||||
type clientinterceptor struct {
|
||||
lgr *logrus.Entry
|
||||
role Role
|
||||
epoch int
|
||||
mu sync.Mutex
|
||||
lgr *logrus.Entry
|
||||
role Role
|
||||
epoch int
|
||||
mu sync.Mutex
|
||||
roleSetter func(role string, epoch int)
|
||||
}
|
||||
|
||||
func (ci *clientinterceptor) setRole(role Role, epoch int) {
|
||||
@@ -67,8 +68,12 @@ func (ci *clientinterceptor) getRole() (Role, int) {
|
||||
func (ci *clientinterceptor) Stream() grpc.StreamClientInterceptor {
|
||||
return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
|
||||
role, epoch := ci.getRole()
|
||||
ci.lgr.Tracef("cluster: clientinterceptor: processing request to %s, role %s", method, string(role))
|
||||
if role == RoleStandby {
|
||||
return nil, status.Error(codes.Unavailable, "this server is a standby and is not currently replicating to its standby")
|
||||
return nil, status.Error(codes.FailedPrecondition, "this server is a standby and is not currently replicating to its standby")
|
||||
}
|
||||
if role == RoleDetectedBrokenConfig {
|
||||
return nil, status.Error(codes.FailedPrecondition, "this server is in detected_broken_config and is not currently replicating to its standby")
|
||||
}
|
||||
ctx = metadata.AppendToOutgoingContext(ctx, clusterRoleHeader, string(role), clusterRoleEpochHeader, strconv.Itoa(epoch))
|
||||
var header metadata.MD
|
||||
@@ -81,8 +86,12 @@ func (ci *clientinterceptor) Stream() grpc.StreamClientInterceptor {
|
||||
func (ci *clientinterceptor) Unary() grpc.UnaryClientInterceptor {
|
||||
return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
|
||||
role, epoch := ci.getRole()
|
||||
ci.lgr.Tracef("cluster: clientinterceptor: processing request to %s, role %s", method, string(role))
|
||||
if role == RoleStandby {
|
||||
return status.Error(codes.Unavailable, "this server is a standby and is not currently replicating to its standby")
|
||||
return status.Error(codes.FailedPrecondition, "this server is a standby and is not currently replicating to its standby")
|
||||
}
|
||||
if role == RoleDetectedBrokenConfig {
|
||||
return status.Error(codes.FailedPrecondition, "this server is in detected_broken_config and is not currently replicating to its standby")
|
||||
}
|
||||
ctx = metadata.AppendToOutgoingContext(ctx, clusterRoleHeader, string(role), clusterRoleEpochHeader, strconv.Itoa(epoch))
|
||||
var header metadata.MD
|
||||
@@ -95,18 +104,24 @@ func (ci *clientinterceptor) Unary() grpc.UnaryClientInterceptor {
|
||||
func (ci *clientinterceptor) handleResponseHeaders(header metadata.MD, role Role, epoch int) {
|
||||
epochs := header.Get(clusterRoleEpochHeader)
|
||||
roles := header.Get(clusterRoleHeader)
|
||||
if len(epochs) > 0 && len(roles) > 0 && roles[0] == string(RolePrimary) {
|
||||
if len(epochs) > 0 && len(roles) > 0 {
|
||||
if respepoch, err := strconv.Atoi(epochs[0]); err == nil {
|
||||
if respepoch == epoch {
|
||||
ci.lgr.Errorf("cluster: this server and the server replicating to it are both primary at the same epoch. force transitioning to standby.")
|
||||
// TODO: Signal to controller that we are forced to become a standby at epoch |respepoch|...
|
||||
} else if respepoch > epoch {
|
||||
// The server we replicate to thinks it is the primary at a higher epoch than us...
|
||||
ci.lgr.Warnf("cluster: this server is primary at epoch %d. the server replicating to it is primary at epoch %d. force transitioning to standby.", epoch, respepoch)
|
||||
// TODO: Signal to controller that we are forced to become a standby at epoch |respepoch|...
|
||||
if roles[0] == string(RolePrimary) {
|
||||
if respepoch == epoch {
|
||||
ci.lgr.Errorf("cluster: clientinterceptor: this server and the server replicating to it are both primary at the same epoch. force transitioning to detected_broken_config.")
|
||||
ci.roleSetter(string(RoleDetectedBrokenConfig), respepoch)
|
||||
} else if respepoch > epoch {
|
||||
// The server we replicate to thinks it is the primary at a higher epoch than us...
|
||||
ci.lgr.Warnf("cluster: clientinterceptor: this server is primary at epoch %d. the server replicating to it is primary at epoch %d. force transitioning to standby.", epoch, respepoch)
|
||||
ci.roleSetter(string(RoleStandby), respepoch)
|
||||
}
|
||||
} else if roles[0] == string(RoleDetectedBrokenConfig) && respepoch >= epoch {
|
||||
ci.lgr.Errorf("cluster: clientinterceptor: this server learned from its standby that the standby is in detected_broken_config. force transitioning to detected_broken_config.")
|
||||
ci.roleSetter(string(RoleDetectedBrokenConfig), respepoch)
|
||||
}
|
||||
}
|
||||
}
|
||||
ci.lgr.Warnf("cluster: clientinterceptor: response was missing role and epoch metadata")
|
||||
}
|
||||
|
||||
func (ci *clientinterceptor) Options() []grpc.DialOption {
|
||||
@@ -122,8 +137,8 @@ func (ci *clientinterceptor) Options() []grpc.DialOption {
|
||||
// interceptor anytime it changes. In turn, this interceptor:
|
||||
// * adds the server's current role and epoch to the response headers for every
|
||||
// request.
|
||||
// * fails all incoming requests immediately with codes.Unavailable if the
|
||||
// current role == RolePrimary, since nothing should be replicating to us in
|
||||
// * fails all incoming requests immediately with codes.FailedPrecondition if the
|
||||
// current role != RoleStandby, since nothing should be replicating to us in
|
||||
// that state.
|
||||
// * watches incoming request headers for a situation which causes this server
|
||||
// to force downgrade from primary to standby. In particular, when an incoming
|
||||
@@ -131,10 +146,11 @@ func (ci *clientinterceptor) Options() []grpc.DialOption {
|
||||
// than our current epoch, this interceptor coordinates with the Controller to
|
||||
// immediately transition to standby and allow replication requests through.
|
||||
type serverinterceptor struct {
|
||||
lgr *logrus.Entry
|
||||
role Role
|
||||
epoch int
|
||||
mu sync.Mutex
|
||||
lgr *logrus.Entry
|
||||
role Role
|
||||
epoch int
|
||||
mu sync.Mutex
|
||||
roleSetter func(role string, epoch int)
|
||||
}
|
||||
|
||||
func (si *serverinterceptor) Stream() grpc.StreamServerInterceptor {
|
||||
@@ -150,7 +166,11 @@ func (si *serverinterceptor) Stream() grpc.StreamServerInterceptor {
|
||||
}
|
||||
if role == RolePrimary {
|
||||
// As a primary, we do not accept replication requests.
|
||||
return status.Error(codes.Unavailable, "this server is a primary and is not currently accepting replication")
|
||||
return status.Error(codes.FailedPrecondition, "this server is a primary and is not currently accepting replication")
|
||||
}
|
||||
if role == RoleDetectedBrokenConfig {
|
||||
// As a primary, we do not accept replication requests.
|
||||
return status.Error(codes.FailedPrecondition, "this server is currently in detected_broken_config and is not currently accepting replication")
|
||||
}
|
||||
return handler(srv, ss)
|
||||
}
|
||||
@@ -169,7 +189,11 @@ func (si *serverinterceptor) Unary() grpc.UnaryServerInterceptor {
|
||||
}
|
||||
if role == RolePrimary {
|
||||
// As a primary, we do not accept replication requests.
|
||||
return nil, status.Error(codes.Unavailable, "this server is a primary and is not currently accepting replication")
|
||||
return nil, status.Error(codes.FailedPrecondition, "this server is a primary and is not currently accepting replication")
|
||||
}
|
||||
if role == RoleDetectedBrokenConfig {
|
||||
// As a primary, we do not accept replication requests.
|
||||
return nil, status.Error(codes.FailedPrecondition, "this server is currently in detected_broken_config and is not currently accepting replication")
|
||||
}
|
||||
return handler(ctx, req)
|
||||
}
|
||||
@@ -186,12 +210,12 @@ func (si *serverinterceptor) handleRequestHeaders(header metadata.MD, role Role,
|
||||
// at the same epoch. We will become standby
|
||||
// and our peer will become standby. An
|
||||
// operator will need to get involved.
|
||||
si.lgr.Errorf("cluster: this server and its standby replica are both primary at the same epoch. force transitioning to standby.")
|
||||
// TODO: Signal to controller that we are forced to become a standby at epoch |reqepoch|
|
||||
si.lgr.Errorf("cluster: serverinterceptor: this server and its standby replica are both primary at the same epoch. force transitioning to detected_broken_config.")
|
||||
si.roleSetter(string(RoleDetectedBrokenConfig), reqepoch)
|
||||
} else if reqepoch > epoch {
|
||||
// The client replicating to us thinks it is the primary at a higher epoch than us.
|
||||
si.lgr.Warnf("cluster: this server is primary at epoch %d. the server replicating to it is primary at epoch %d. force transitioning to standby.", epoch, reqepoch)
|
||||
// TODO: Signal to controller that we are forced to become a standby at epoch |reqepoch|
|
||||
si.lgr.Warnf("cluster: serverinterceptor: this server is primary at epoch %d. the server replicating to it is primary at epoch %d. force transitioning to standby.", epoch, reqepoch)
|
||||
si.roleSetter(string(RoleStandby), reqepoch)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -20,6 +20,8 @@ import (
|
||||
"sync"
|
||||
"testing"
|
||||
|
||||
"github.com/sirupsen/logrus"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
"google.golang.org/grpc"
|
||||
@@ -43,6 +45,11 @@ func (s *server) Watch(req *grpc_health_v1.HealthCheckRequest, ss grpc_health_v1
|
||||
return status.Errorf(codes.Unimplemented, "method Watch not implemented")
|
||||
}
|
||||
|
||||
func noopSetRole(string, int) {
|
||||
}
|
||||
|
||||
var lgr = logrus.StandardLogger().WithFields(logrus.Fields{})
|
||||
|
||||
func withClient(t *testing.T, cb func(*testing.T, grpc_health_v1.HealthClient), serveropts []grpc.ServerOption, dialopts []grpc.DialOption) *server {
|
||||
addr, err := net.ResolveUnixAddr("unix", "test_grpc.socket")
|
||||
require.NoError(t, err)
|
||||
@@ -85,6 +92,8 @@ func withClient(t *testing.T, cb func(*testing.T, grpc_health_v1.HealthClient),
|
||||
func TestServerInterceptorAddsUnaryResponseHeaders(t *testing.T) {
|
||||
var si serverinterceptor
|
||||
si.setRole(RoleStandby, 10)
|
||||
si.roleSetter = noopSetRole
|
||||
si.lgr = lgr
|
||||
withClient(t, func(t *testing.T, client grpc_health_v1.HealthClient) {
|
||||
var md metadata.MD
|
||||
_, err := client.Check(context.Background(), &grpc_health_v1.HealthCheckRequest{}, grpc.Header(&md))
|
||||
@@ -101,6 +110,8 @@ func TestServerInterceptorAddsUnaryResponseHeaders(t *testing.T) {
|
||||
func TestServerInterceptorAddsStreamResponseHeaders(t *testing.T) {
|
||||
var si serverinterceptor
|
||||
si.setRole(RoleStandby, 10)
|
||||
si.roleSetter = noopSetRole
|
||||
si.lgr = lgr
|
||||
withClient(t, func(t *testing.T, client grpc_health_v1.HealthClient) {
|
||||
var md metadata.MD
|
||||
srv, err := client.Watch(context.Background(), &grpc_health_v1.HealthCheckRequest{}, grpc.Header(&md))
|
||||
@@ -119,15 +130,17 @@ func TestServerInterceptorAddsStreamResponseHeaders(t *testing.T) {
|
||||
func TestServerInterceptorAsPrimaryDoesNotSendRequest(t *testing.T) {
|
||||
var si serverinterceptor
|
||||
si.setRole(RolePrimary, 10)
|
||||
si.roleSetter = noopSetRole
|
||||
si.lgr = lgr
|
||||
srv := withClient(t, func(t *testing.T, client grpc_health_v1.HealthClient) {
|
||||
ctx := metadata.AppendToOutgoingContext(context.Background(), "test-header", "test-header-value")
|
||||
_, err := client.Check(ctx, &grpc_health_v1.HealthCheckRequest{})
|
||||
assert.Equal(t, codes.Unavailable, status.Code(err))
|
||||
assert.Equal(t, codes.FailedPrecondition, status.Code(err))
|
||||
ctx = metadata.AppendToOutgoingContext(context.Background(), "test-header", "test-header-value")
|
||||
ss, err := client.Watch(ctx, &grpc_health_v1.HealthCheckRequest{})
|
||||
assert.NoError(t, err)
|
||||
_, err = ss.Recv()
|
||||
assert.Equal(t, codes.Unavailable, status.Code(err))
|
||||
assert.Equal(t, codes.FailedPrecondition, status.Code(err))
|
||||
}, si.Options(), nil)
|
||||
assert.Nil(t, srv.md)
|
||||
}
|
||||
@@ -135,6 +148,8 @@ func TestServerInterceptorAsPrimaryDoesNotSendRequest(t *testing.T) {
|
||||
func TestClientInterceptorAddsUnaryRequestHeaders(t *testing.T) {
|
||||
var ci clientinterceptor
|
||||
ci.setRole(RolePrimary, 10)
|
||||
ci.roleSetter = noopSetRole
|
||||
ci.lgr = lgr
|
||||
srv := withClient(t, func(t *testing.T, client grpc_health_v1.HealthClient) {
|
||||
_, err := client.Check(context.Background(), &grpc_health_v1.HealthCheckRequest{})
|
||||
assert.Equal(t, codes.Unimplemented, status.Code(err))
|
||||
@@ -150,6 +165,8 @@ func TestClientInterceptorAddsUnaryRequestHeaders(t *testing.T) {
|
||||
func TestClientInterceptorAddsStreamRequestHeaders(t *testing.T) {
|
||||
var ci clientinterceptor
|
||||
ci.setRole(RolePrimary, 10)
|
||||
ci.roleSetter = noopSetRole
|
||||
ci.lgr = lgr
|
||||
srv := withClient(t, func(t *testing.T, client grpc_health_v1.HealthClient) {
|
||||
srv, err := client.Watch(context.Background(), &grpc_health_v1.HealthCheckRequest{})
|
||||
require.NoError(t, err)
|
||||
@@ -167,14 +184,16 @@ func TestClientInterceptorAddsStreamRequestHeaders(t *testing.T) {
|
||||
func TestClientInterceptorAsStandbyDoesNotSendRequest(t *testing.T) {
|
||||
var ci clientinterceptor
|
||||
ci.setRole(RolePrimary, 10)
|
||||
ci.roleSetter = noopSetRole
|
||||
ci.lgr = lgr
|
||||
srv := withClient(t, func(t *testing.T, client grpc_health_v1.HealthClient) {
|
||||
_, err := client.Check(context.Background(), &grpc_health_v1.HealthCheckRequest{})
|
||||
assert.Equal(t, codes.Unimplemented, status.Code(err))
|
||||
ci.setRole(RoleStandby, 11)
|
||||
_, err = client.Check(context.Background(), &grpc_health_v1.HealthCheckRequest{})
|
||||
assert.Equal(t, codes.Unavailable, status.Code(err))
|
||||
assert.Equal(t, codes.FailedPrecondition, status.Code(err))
|
||||
_, err = client.Watch(context.Background(), &grpc_health_v1.HealthCheckRequest{})
|
||||
assert.Equal(t, codes.Unavailable, status.Code(err))
|
||||
assert.Equal(t, codes.FailedPrecondition, status.Code(err))
|
||||
}, nil, ci.Options())
|
||||
if assert.Len(t, srv.md.Get(clusterRoleHeader), 1) {
|
||||
assert.Equal(t, "primary", srv.md.Get(clusterRoleHeader)[0])
|
||||
|
||||
@@ -24,7 +24,7 @@ import (
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/dolthub/dolt/go/libraries/doltcore/doltdb"
|
||||
"github.com/dolthub/dolt/go/libraries/doltcore/dtestutils"
|
||||
"github.com/dolthub/dolt/go/libraries/doltcore/doltdb/durable"
|
||||
"github.com/dolthub/dolt/go/libraries/doltcore/env"
|
||||
"github.com/dolthub/dolt/go/libraries/doltcore/row"
|
||||
"github.com/dolthub/dolt/go/libraries/doltcore/schema"
|
||||
@@ -181,12 +181,34 @@ func assertSchemasEqual(t *testing.T, expected, actual sql.Schema) {
|
||||
|
||||
// CreateTableFn returns a SetupFunc that creates a table with the rows given
|
||||
// todo(andy): replace with ExecuteSetupSQL
|
||||
func CreateTableFn(tableName string, tableSchema schema.Schema, initialRows ...row.Row) SetupFn {
|
||||
func CreateTableFn(tableName string, tableSchema schema.Schema, queries string) SetupFn {
|
||||
return func(t *testing.T, dEnv *env.DoltEnv) {
|
||||
dtestutils.CreateTestTable(t, dEnv, tableName, tableSchema, initialRows...)
|
||||
CreateTestTable(t, dEnv, tableName, tableSchema, queries)
|
||||
}
|
||||
}
|
||||
|
||||
// CreateTestTable creates a new test table with the name, schema, and rows given.
|
||||
func CreateTestTable(t *testing.T, dEnv *env.DoltEnv, tableName string, sch schema.Schema, queries string) {
|
||||
ctx := context.Background()
|
||||
root, err := dEnv.WorkingRoot(ctx)
|
||||
require.NoError(t, err)
|
||||
vrw := dEnv.DoltDB.ValueReadWriter()
|
||||
ns := dEnv.DoltDB.NodeStore()
|
||||
|
||||
rows, err := durable.NewEmptyIndex(ctx, vrw, ns, sch)
|
||||
require.NoError(t, err)
|
||||
tbl, err := doltdb.NewTable(ctx, vrw, ns, sch, rows, nil, nil)
|
||||
require.NoError(t, err)
|
||||
root, err = root.PutTable(ctx, tableName, tbl)
|
||||
require.NoError(t, err)
|
||||
err = dEnv.UpdateWorkingRoot(ctx, root)
|
||||
require.NoError(t, err)
|
||||
root, err = ExecuteSql(t, dEnv, root, queries)
|
||||
require.NoError(t, err)
|
||||
err = dEnv.UpdateWorkingRoot(ctx, root)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
func ExecuteSetupSQL(ctx context.Context, queries string) SetupFn {
|
||||
return func(t *testing.T, dEnv *env.DoltEnv) {
|
||||
root, err := dEnv.WorkingRoot(ctx)
|
||||
|
||||
@@ -54,6 +54,7 @@ type DoltDatabaseProvider struct {
|
||||
remoteDialer dbfactory.GRPCDialProvider // TODO: why isn't this a method defined on the remote object
|
||||
|
||||
dbFactoryUrl string
|
||||
isStandby *bool
|
||||
}
|
||||
|
||||
var _ sql.DatabaseProvider = (*DoltDatabaseProvider)(nil)
|
||||
@@ -115,6 +116,7 @@ func NewDoltDatabaseProviderWithDatabases(defaultBranch string, fs filesys.Files
|
||||
defaultBranch: defaultBranch,
|
||||
dbFactoryUrl: doltdb.LocalDirDoltDB,
|
||||
InitDatabaseHook: ConfigureReplicationDatabaseHook,
|
||||
isStandby: new(bool),
|
||||
}, nil
|
||||
}
|
||||
|
||||
@@ -147,6 +149,15 @@ func (p DoltDatabaseProvider) FileSystem() filesys.Filesys {
|
||||
return p.fs
|
||||
}
|
||||
|
||||
// If this DatabaseProvider is set to standby |true|, it returns every dolt
|
||||
// database as a read only database. Set back to |false| to get read-write
|
||||
// behavior from dolt databases again.
|
||||
func (p DoltDatabaseProvider) SetIsStandby(standby bool) {
|
||||
p.mu.Lock()
|
||||
defer p.mu.Unlock()
|
||||
*p.isStandby = standby
|
||||
}
|
||||
|
||||
// FileSystemForDatabase returns a filesystem, with the working directory set to the root directory
|
||||
// of the requested database. If the requested database isn't found, a database not found error
|
||||
// is returned.
|
||||
@@ -167,9 +178,10 @@ func (p DoltDatabaseProvider) Database(ctx *sql.Context, name string) (db sql.Da
|
||||
var ok bool
|
||||
p.mu.RLock()
|
||||
db, ok = p.databases[formatDbMapKeyName(name)]
|
||||
standby := *p.isStandby
|
||||
p.mu.RUnlock()
|
||||
if ok {
|
||||
return db, nil
|
||||
return wrapForStandby(db, standby), nil
|
||||
}
|
||||
|
||||
db, _, ok, err = p.databaseForRevision(ctx, name)
|
||||
@@ -188,7 +200,21 @@ func (p DoltDatabaseProvider) Database(ctx *sql.Context, name string) (db sql.Da
|
||||
}
|
||||
|
||||
// Don't track revision databases, just instantiate them on demand
|
||||
return db, nil
|
||||
return wrapForStandby(db, standby), nil
|
||||
}
|
||||
|
||||
func wrapForStandby(db sql.Database, standby bool) sql.Database {
|
||||
if !standby {
|
||||
return db
|
||||
}
|
||||
if _, ok := db.(ReadOnlyDatabase); ok {
|
||||
return db
|
||||
}
|
||||
if db, ok := db.(Database); ok {
|
||||
// :-/. Hopefully it's not too sliced.
|
||||
return ReadOnlyDatabase{db}
|
||||
}
|
||||
return db
|
||||
}
|
||||
|
||||
// attemptCloneReplica attempts to clone a database from the configured replication remote URL template, returning an error
|
||||
|
||||
@@ -60,6 +60,10 @@ type DoltSession struct {
|
||||
tempTables map[string][]sql.Table
|
||||
globalsConf config.ReadWriteConfig
|
||||
mu *sync.Mutex
|
||||
|
||||
// If non-nil, this will be returned from ValidateSession.
|
||||
// Used by sqle/cluster to put a session into a terminal err state.
|
||||
validateErr error
|
||||
}
|
||||
|
||||
var _ sql.Session = (*DoltSession)(nil)
|
||||
@@ -178,10 +182,22 @@ func (d *DoltSession) Flush(ctx *sql.Context, dbName string) error {
|
||||
return d.SetRoot(ctx, dbName, ws.WorkingRoot())
|
||||
}
|
||||
|
||||
// SetValidateErr sets an error on this session to be returned from every call
|
||||
// to ValidateSession. This is effectively a way to disable a session.
|
||||
//
|
||||
// Used by sql/cluster logic to make sessions on a server which has
|
||||
// transitioned roles termainlly error.
|
||||
func (d *DoltSession) SetValidateErr(err error) {
|
||||
d.validateErr = err
|
||||
}
|
||||
|
||||
// ValidateSession validates a working set if there are a valid sessionState with non-nil working set.
|
||||
// If there is no sessionState or its current working set not defined, then no need for validation,
|
||||
// so no error is returned.
|
||||
func (d *DoltSession) ValidateSession(ctx *sql.Context, dbName string) error {
|
||||
if d.validateErr != nil {
|
||||
return d.validateErr
|
||||
}
|
||||
sessionState, ok, err := d.LookupDbState(ctx, dbName)
|
||||
if !ok {
|
||||
return nil
|
||||
|
||||
@@ -46,7 +46,7 @@ var skipPrepared bool
|
||||
// SkipPreparedsCount is used by the "ci-check-repo CI workflow
|
||||
// as a reminder to consider prepareds when adding a new
|
||||
// enginetest suite.
|
||||
const SkipPreparedsCount = 79
|
||||
const SkipPreparedsCount = 80
|
||||
|
||||
const skipPreparedFlag = "DOLT_SKIP_PREPARED_ENGINETESTS"
|
||||
|
||||
@@ -258,6 +258,11 @@ func TestQueryPlans(t *testing.T) {
|
||||
enginetest.TestQueryPlans(t, harness, queries.PlanTests)
|
||||
}
|
||||
|
||||
func TestIntegrationQueryPlans(t *testing.T) {
|
||||
harness := newDoltHarness(t).WithParallelism(1)
|
||||
enginetest.TestIntegrationPlans(t, harness)
|
||||
}
|
||||
|
||||
func TestDoltDiffQueryPlans(t *testing.T) {
|
||||
harness := newDoltHarness(t).WithParallelism(2) // want Exchange nodes
|
||||
harness.Setup(setup.SimpleSetup...)
|
||||
|
||||
@@ -39,7 +39,7 @@ import (
|
||||
// that the final output is as expected.
|
||||
func TestMergeableIndexes(t *testing.T) {
|
||||
if types.Format_Default != types.Format_LD_1 {
|
||||
t.Skip()
|
||||
t.Skip() // this test is specific to Noms ranges
|
||||
}
|
||||
|
||||
engine, denv, root, db, indexTuples := setupIndexes(t, "test", `INSERT INTO test VALUES
|
||||
@@ -1379,7 +1379,7 @@ func TestMergeableIndexes(t *testing.T) {
|
||||
// TODO: disassociate NULL ranges from value ranges and fix the intermediate ranges (finalRanges).
|
||||
func TestMergeableIndexesNulls(t *testing.T) {
|
||||
if types.Format_Default != types.Format_LD_1 {
|
||||
t.Skip()
|
||||
t.Skip() // this test is specific to Noms ranges
|
||||
}
|
||||
|
||||
engine, denv, root, db, indexTuples := setupIndexes(t, "test", `INSERT INTO test VALUES
|
||||
|
||||
@@ -20279,10 +20279,10 @@ func TestExplain(t *testing.T) {
|
||||
expectedExplain := "IndexedJoin(d.Symbol = t.Symbol)\n" +
|
||||
" ├─ TableAlias(d)\n" +
|
||||
" │ └─ Table(daily_summary)\n" +
|
||||
" │ └─ columns: [Type Symbol Country TradingDate Open High Low Close Volume OpenInt]\n" +
|
||||
" │ └─ columns: [type symbol country tradingdate open high low close volume openint]\n" +
|
||||
" └─ TableAlias(t)\n" +
|
||||
" └─ IndexedTableAccess(symbols)\n" +
|
||||
" ├─ index: [symbols.Symbol]\n" +
|
||||
" └─ columns: [Symbol Name Sector IPOYear]"
|
||||
" └─ columns: [symbol name sector ipoyear]"
|
||||
assert.Equal(t, expectedExplain, strings.Join(rowStrings, "\n"))
|
||||
}
|
||||
|
||||
@@ -15,8 +15,6 @@
|
||||
package sqle
|
||||
|
||||
import (
|
||||
"errors"
|
||||
|
||||
"github.com/dolthub/go-mysql-server/sql"
|
||||
|
||||
"github.com/dolthub/dolt/go/libraries/doltcore/doltdb"
|
||||
@@ -37,15 +35,15 @@ func (s remotesrvStore) Get(path, nbfVerStr string) (remotesrv.RemoteSrvStore, e
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
ddb, ok := db.(Database)
|
||||
sdb, ok := db.(SqlDatabase)
|
||||
if !ok {
|
||||
return nil, errors.New("unimplemented")
|
||||
return nil, remotesrv.ErrUnimplemented
|
||||
}
|
||||
datasdb := doltdb.HackDatasDatabaseFromDoltDB(ddb.DbData().Ddb)
|
||||
datasdb := doltdb.HackDatasDatabaseFromDoltDB(sdb.DbData().Ddb)
|
||||
cs := datas.ChunkStoreFromDatabase(datasdb)
|
||||
rss, ok := cs.(remotesrv.RemoteSrvStore)
|
||||
if !ok {
|
||||
return nil, errors.New("unimplemented")
|
||||
return nil, remotesrv.ErrUnimplemented
|
||||
}
|
||||
return rss, nil
|
||||
}
|
||||
|
||||
@@ -16,27 +16,24 @@ package sqle
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"sort"
|
||||
"testing"
|
||||
|
||||
"github.com/google/go-cmp/cmp"
|
||||
"github.com/dolthub/go-mysql-server/sql"
|
||||
"github.com/google/uuid"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/dolthub/dolt/go/libraries/doltcore/dtestutils"
|
||||
"github.com/dolthub/dolt/go/libraries/doltcore/row"
|
||||
"github.com/dolthub/dolt/go/libraries/doltcore/schema"
|
||||
"github.com/dolthub/dolt/go/libraries/doltcore/sqle/dsess"
|
||||
"github.com/dolthub/dolt/go/libraries/doltcore/sqle/sqlutil"
|
||||
"github.com/dolthub/dolt/go/libraries/doltcore/table/editor"
|
||||
"github.com/dolthub/dolt/go/store/types"
|
||||
)
|
||||
|
||||
func TestSqlBatchInserts(t *testing.T) {
|
||||
if types.Format_Default != types.Format_LD_1 {
|
||||
t.Skip() // todo: convert to enginetests
|
||||
}
|
||||
|
||||
insertStatements := []string{
|
||||
`insert into people (id, first_name, last_name, is_married, age, rating, uuid, num_episodes) values
|
||||
(7, "Maggie", "Simpson", false, 1, 5.1, '00000000-0000-0000-0000-000000000007', 677)`,
|
||||
@@ -90,36 +87,39 @@ func TestSqlBatchInserts(t *testing.T) {
|
||||
allAppearanceRows, err := GetAllRows(root, AppearancesTableName)
|
||||
require.NoError(t, err)
|
||||
|
||||
assert.ElementsMatch(t, AllPeopleRows, allPeopleRows)
|
||||
assert.ElementsMatch(t, AllEpsRows, allEpsRows)
|
||||
assert.ElementsMatch(t, AllAppsRows, allAppearanceRows)
|
||||
AllPeopleSqlRows := ToSqlRows(PeopleTestSchema, AllPeopleRows...)
|
||||
AllEpsSqlRows := ToSqlRows(EpisodesTestSchema, AllEpsRows...)
|
||||
AllAppsSqlRows := ToSqlRows(AppearancesTestSchema, AllAppsRows...)
|
||||
assert.ElementsMatch(t, AllPeopleSqlRows, allPeopleRows)
|
||||
assert.ElementsMatch(t, AllEpsSqlRows, allEpsRows)
|
||||
assert.ElementsMatch(t, AllAppsSqlRows, allAppearanceRows)
|
||||
|
||||
// Now commit the batch and check for new rows
|
||||
err = db.Flush(sqlCtx)
|
||||
require.NoError(t, err)
|
||||
|
||||
var expectedPeople, expectedEpisodes, expectedAppearances []row.Row
|
||||
var expectedPeople, expectedEpisodes, expectedAppearances []sql.Row
|
||||
|
||||
expectedPeople = append(expectedPeople, AllPeopleRows...)
|
||||
expectedPeople = append(expectedPeople,
|
||||
expectedPeople = append(expectedPeople, AllPeopleSqlRows...)
|
||||
expectedPeople = append(expectedPeople, ToSqlRows(PeopleTestSchema,
|
||||
NewPeopleRowWithOptionalFields(7, "Maggie", "Simpson", false, 1, 5.1, uuid.MustParse("00000000-0000-0000-0000-000000000007"), 677),
|
||||
NewPeopleRowWithOptionalFields(8, "Milhouse", "VanHouten", false, 1, 5.1, uuid.MustParse("00000000-0000-0000-0000-000000000008"), 677),
|
||||
newPeopleRow(9, "Clancey", "Wiggum"),
|
||||
newPeopleRow(10, "Montgomery", "Burns"),
|
||||
newPeopleRow(11, "Ned", "Flanders"),
|
||||
)
|
||||
)...)
|
||||
|
||||
expectedEpisodes = append(expectedEpisodes, AllEpsRows...)
|
||||
expectedEpisodes = append(expectedEpisodes,
|
||||
expectedEpisodes = append(expectedEpisodes, AllEpsSqlRows...)
|
||||
expectedEpisodes = append(expectedEpisodes, ToSqlRows(EpisodesTestSchema,
|
||||
newEpsRow(5, "Bart the General"),
|
||||
newEpsRow(6, "Moaning Lisa"),
|
||||
newEpsRow(7, "The Call of the Simpsons"),
|
||||
newEpsRow(8, "The Telltale Head"),
|
||||
newEpsRow(9, "Life on the Fast Lane"),
|
||||
)
|
||||
)...)
|
||||
|
||||
expectedAppearances = append(expectedAppearances, AllAppsRows...)
|
||||
expectedAppearances = append(expectedAppearances,
|
||||
expectedAppearances = append(expectedAppearances, AllAppsSqlRows...)
|
||||
expectedAppearances = append(expectedAppearances, ToSqlRows(AppearancesTestSchema,
|
||||
newAppsRow(7, 5),
|
||||
newAppsRow(7, 6),
|
||||
newAppsRow(8, 7),
|
||||
@@ -128,7 +128,7 @@ func TestSqlBatchInserts(t *testing.T) {
|
||||
newAppsRow(10, 5),
|
||||
newAppsRow(10, 6),
|
||||
newAppsRow(11, 9),
|
||||
)
|
||||
)...)
|
||||
|
||||
root, err = db.GetRoot(sqlCtx)
|
||||
require.NoError(t, err)
|
||||
@@ -139,9 +139,9 @@ func TestSqlBatchInserts(t *testing.T) {
|
||||
allAppearanceRows, err = GetAllRows(root, AppearancesTableName)
|
||||
require.NoError(t, err)
|
||||
|
||||
assertRowSetsEqual(t, expectedPeople, allPeopleRows)
|
||||
assertRowSetsEqual(t, expectedEpisodes, allEpsRows)
|
||||
assertRowSetsEqual(t, expectedAppearances, allAppearanceRows)
|
||||
assertRowSetsEqual(t, PeopleTestSchema, expectedPeople, allPeopleRows)
|
||||
assertRowSetsEqual(t, EpisodesTestSchema, expectedEpisodes, allEpsRows)
|
||||
assertRowSetsEqual(t, AppearancesTestSchema, expectedAppearances, allAppearanceRows)
|
||||
}
|
||||
|
||||
func TestSqlBatchInsertIgnoreReplace(t *testing.T) {
|
||||
@@ -193,14 +193,10 @@ func TestSqlBatchInsertIgnoreReplace(t *testing.T) {
|
||||
|
||||
allPeopleRows, err = GetAllRows(root, PeopleTableName)
|
||||
assert.NoError(t, err)
|
||||
assertRowSetsEqual(t, expectedPeople, allPeopleRows)
|
||||
assertRowSetsEqual(t, PeopleTestSchema, ToSqlRows(PeopleTestSchema, expectedPeople...), allPeopleRows)
|
||||
}
|
||||
|
||||
func TestSqlBatchInsertErrors(t *testing.T) {
|
||||
if types.Format_Default != types.Format_LD_1 {
|
||||
t.Skip() // todo: convert to enginetests
|
||||
}
|
||||
|
||||
ctx := context.Background()
|
||||
dEnv := CreateTestDatabase(t)
|
||||
root, err := dEnv.WorkingRoot(ctx)
|
||||
@@ -230,64 +226,31 @@ func TestSqlBatchInsertErrors(t *testing.T) {
|
||||
assert.NoError(t, db.Flush(sqlCtx))
|
||||
}
|
||||
|
||||
func assertRowSetsEqual(t *testing.T, expected, actual []row.Row) {
|
||||
equal, diff := rowSetsEqual(expected, actual)
|
||||
assert.True(t, equal, diff)
|
||||
func assertRowSetsEqual(t *testing.T, sch schema.Schema, expected, actual []sql.Row) {
|
||||
require.Equal(t, len(expected), len(actual),
|
||||
"Sets have different sizes: expected %d, was %d", len(expected), len(actual))
|
||||
sqlSch, err := sqlutil.FromDoltSchema("", sch)
|
||||
require.NoError(t, err)
|
||||
sortSqlRows(t, sqlSch.Schema, expected)
|
||||
sortSqlRows(t, sqlSch.Schema, actual)
|
||||
if !assert.Equal(t, expected, actual) {
|
||||
t.Skip("")
|
||||
}
|
||||
}
|
||||
|
||||
// Returns whether the two slices of rows contain the same elements using set semantics (no duplicates), and an error
|
||||
// string if they aren't.
|
||||
func rowSetsEqual(expected, actual []row.Row) (bool, string) {
|
||||
if len(expected) != len(actual) {
|
||||
return false, fmt.Sprintf("Sets have different sizes: expected %d, was %d", len(expected), len(actual))
|
||||
}
|
||||
|
||||
for _, ex := range expected {
|
||||
if !containsRow(actual, ex) {
|
||||
return false, fmt.Sprintf("Missing row: %v", ex)
|
||||
func sortSqlRows(t *testing.T, sch sql.Schema, rows []sql.Row) {
|
||||
sort.Slice(rows, func(i, j int) bool {
|
||||
l, r := rows[i], rows[j]
|
||||
for idx, col := range sch {
|
||||
c, err := col.Type.Compare(l[idx], r[idx])
|
||||
require.NoError(t, err)
|
||||
if c == 0 {
|
||||
continue
|
||||
}
|
||||
return c < 0
|
||||
}
|
||||
}
|
||||
|
||||
return true, ""
|
||||
}
|
||||
|
||||
func containsRow(rs []row.Row, r row.Row) bool {
|
||||
for _, r2 := range rs {
|
||||
equal, _ := rowsEqual(r, r2)
|
||||
if equal {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func rowsEqual(expected, actual row.Row) (bool, string) {
|
||||
er, ar := make(map[uint64]types.Value), make(map[uint64]types.Value)
|
||||
_, err := expected.IterCols(func(t uint64, v types.Value) (bool, error) {
|
||||
er[t] = v
|
||||
return false, nil
|
||||
return false
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
_, err = actual.IterCols(func(t uint64, v types.Value) (bool, error) {
|
||||
ar[t] = v
|
||||
return false, nil
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
opts := cmp.Options{cmp.AllowUnexported(), dtestutils.FloatComparer, dtestutils.TimestampComparer}
|
||||
eq := cmp.Equal(er, ar, opts)
|
||||
var diff string
|
||||
if !eq {
|
||||
diff = cmp.Diff(er, ar, opts)
|
||||
}
|
||||
return eq, diff
|
||||
}
|
||||
|
||||
func newPeopleRow(id int, firstName, lastName string) row.Row {
|
||||
|
||||
@@ -782,9 +782,6 @@ func TestRenameTableStatements(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestAlterSystemTables(t *testing.T) {
|
||||
if types.Format_Default != types.Format_LD_1 {
|
||||
t.Skip("") // todo: convert to enginetests
|
||||
}
|
||||
systemTableNames := []string{"dolt_log", "dolt_history_people", "dolt_diff_people", "dolt_commit_diff_people"} // "dolt_docs",
|
||||
reservedTableNames := []string{"dolt_schemas", "dolt_query_catalog"}
|
||||
|
||||
@@ -794,15 +791,12 @@ func TestAlterSystemTables(t *testing.T) {
|
||||
dtestutils.CreateEmptyTestTable(t, dEnv, "dolt_docs", doltdb.DocsSchema)
|
||||
dtestutils.CreateEmptyTestTable(t, dEnv, doltdb.SchemasTableName, SchemasTableSchema())
|
||||
|
||||
dtestutils.CreateTestTable(t, dEnv, "dolt_docs",
|
||||
doltdb.DocsSchema,
|
||||
NewRow(types.String("LICENSE.md"), types.String("A license")))
|
||||
dtestutils.CreateTestTable(t, dEnv, doltdb.DoltQueryCatalogTableName,
|
||||
dtables.DoltQueryCatalogSchema,
|
||||
NewRow(types.String("abc123"), types.Uint(1), types.String("example"), types.String("select 2+2 from dual"), types.String("description")))
|
||||
dtestutils.CreateTestTable(t, dEnv, doltdb.SchemasTableName,
|
||||
SchemasTableSchema(),
|
||||
NewRowWithPks([]types.Value{types.String("view"), types.String("name")}, types.String("select 2+2 from dual")))
|
||||
CreateTestTable(t, dEnv, "dolt_docs", doltdb.DocsSchema,
|
||||
"INSERT INTO dolt_docs VALUES ('LICENSE.md','A license')")
|
||||
CreateTestTable(t, dEnv, doltdb.DoltQueryCatalogTableName, dtables.DoltQueryCatalogSchema,
|
||||
"INSERT INTO dolt_query_catalog VALUES ('abc123', 1, 'example', 'select 2+2 from dual', 'description')")
|
||||
CreateTestTable(t, dEnv, doltdb.SchemasTableName, SchemasTableSchema(),
|
||||
"INSERT INTO dolt_schemas (type, name, fragment, id) VALUES ('view', 'name', 'select 2+2 from dual', 1)")
|
||||
}
|
||||
|
||||
t.Run("Create", func(t *testing.T) {
|
||||
|
||||
@@ -25,7 +25,6 @@ import (
|
||||
"github.com/dolthub/dolt/go/libraries/doltcore/doltdb"
|
||||
"github.com/dolthub/dolt/go/libraries/doltcore/schema"
|
||||
"github.com/dolthub/dolt/go/libraries/doltcore/sqle/dtables"
|
||||
"github.com/dolthub/dolt/go/store/types"
|
||||
)
|
||||
|
||||
// Set to the name of a single test to run just that test, useful for debugging
|
||||
@@ -180,9 +179,6 @@ func TestExecuteDelete(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestExecuteDeleteSystemTables(t *testing.T) {
|
||||
if types.Format_Default != types.Format_LD_1 {
|
||||
t.Skip() // todo: convert to enginetest
|
||||
}
|
||||
for _, test := range systemTableDeleteTests {
|
||||
t.Run(test.Name, func(t *testing.T) {
|
||||
testDeleteQuery(t, test)
|
||||
@@ -193,9 +189,8 @@ func TestExecuteDeleteSystemTables(t *testing.T) {
|
||||
var systemTableDeleteTests = []DeleteTest{
|
||||
{
|
||||
Name: "delete dolt_docs",
|
||||
AdditionalSetup: CreateTableFn("dolt_docs",
|
||||
doltdb.DocsSchema,
|
||||
NewRow(types.String("LICENSE.md"), types.String("A license"))),
|
||||
AdditionalSetup: CreateTableFn("dolt_docs", doltdb.DocsSchema,
|
||||
"INSERT INTO dolt_docs VALUES ('LICENSE.md','A license')"),
|
||||
DeleteQuery: "delete from dolt_docs where doc_name = 'LICENSE.md'",
|
||||
SelectQuery: "select * from dolt_docs",
|
||||
ExpectedRows: []sql.Row{},
|
||||
@@ -203,9 +198,8 @@ var systemTableDeleteTests = []DeleteTest{
|
||||
},
|
||||
{
|
||||
Name: "delete dolt_query_catalog",
|
||||
AdditionalSetup: CreateTableFn(doltdb.DoltQueryCatalogTableName,
|
||||
dtables.DoltQueryCatalogSchema,
|
||||
NewRow(types.String("abc123"), types.Uint(1), types.String("example"), types.String("select 2+2 from dual"), types.String("description"))),
|
||||
AdditionalSetup: CreateTableFn(doltdb.DoltQueryCatalogTableName, dtables.DoltQueryCatalogSchema,
|
||||
"INSERT INTO dolt_query_catalog VALUES ('abc123', 1, 'example', 'select 2+2 from dual', 'description')"),
|
||||
DeleteQuery: "delete from dolt_query_catalog",
|
||||
SelectQuery: "select * from dolt_query_catalog",
|
||||
ExpectedRows: ToSqlRows(dtables.DoltQueryCatalogSchema),
|
||||
@@ -213,9 +207,8 @@ var systemTableDeleteTests = []DeleteTest{
|
||||
},
|
||||
{
|
||||
Name: "delete dolt_schemas",
|
||||
AdditionalSetup: CreateTableFn(doltdb.SchemasTableName,
|
||||
SchemasTableSchema(),
|
||||
NewRowWithPks([]types.Value{types.String("view"), types.String("name")}, types.String("select 2+2 from dual"))),
|
||||
AdditionalSetup: CreateTableFn(doltdb.SchemasTableName, SchemasTableSchema(),
|
||||
"INSERT INTO dolt_schemas (type, name, fragment, id) VALUES ('view', 'name', 'select 2+2 from dual', 1)"),
|
||||
DeleteQuery: "delete from dolt_schemas",
|
||||
SelectQuery: "select * from dolt_schemas",
|
||||
ExpectedRows: ToSqlRows(dtables.DoltQueryCatalogSchema),
|
||||
|
||||
@@ -377,7 +377,7 @@ func TestExecuteInsert(t *testing.T) {
|
||||
var systemTableInsertTests = []InsertTest{
|
||||
{
|
||||
Name: "insert into dolt_docs",
|
||||
AdditionalSetup: CreateTableFn("dolt_docs", doltdb.DocsSchema),
|
||||
AdditionalSetup: CreateTableFn("dolt_docs", doltdb.DocsSchema, ""),
|
||||
InsertQuery: "insert into dolt_docs (doc_name, doc_text) values ('README.md', 'Some text')",
|
||||
SelectQuery: "select * from dolt_docs",
|
||||
ExpectedRows: []sql.Row{{"README.md", "Some text"}},
|
||||
@@ -385,14 +385,8 @@ var systemTableInsertTests = []InsertTest{
|
||||
},
|
||||
{
|
||||
Name: "insert into dolt_query_catalog",
|
||||
AdditionalSetup: CreateTableFn(doltdb.DoltQueryCatalogTableName,
|
||||
dtables.DoltQueryCatalogSchema,
|
||||
NewRowWithSchema(dtables.DoltQueryCatalogSchema,
|
||||
types.String("existingEntry"),
|
||||
types.Uint(2),
|
||||
types.String("example"),
|
||||
types.String("select 2+2 from dual"),
|
||||
types.String("description"))),
|
||||
AdditionalSetup: CreateTableFn(doltdb.DoltQueryCatalogTableName, dtables.DoltQueryCatalogSchema,
|
||||
"INSERT INTO dolt_query_catalog VALUES ('existingEntry', 2, 'example', 'select 2+2 from dual', 'description')"),
|
||||
InsertQuery: "insert into dolt_query_catalog (id, display_order, name, query, description) values ('abc123', 1, 'example', 'select 1+1 from dual', 'description')",
|
||||
SelectQuery: "select * from dolt_query_catalog ORDER BY id",
|
||||
ExpectedRows: ToSqlRows(CompressSchema(dtables.DoltQueryCatalogSchema),
|
||||
@@ -403,7 +397,7 @@ var systemTableInsertTests = []InsertTest{
|
||||
},
|
||||
{
|
||||
Name: "insert into dolt_schemas",
|
||||
AdditionalSetup: CreateTableFn(doltdb.SchemasTableName, SchemasTableSchema()),
|
||||
AdditionalSetup: CreateTableFn(doltdb.SchemasTableName, SchemasTableSchema(), ""),
|
||||
InsertQuery: "insert into dolt_schemas (id, type, name, fragment) values (1, 'view', 'name', 'select 2+2 from dual')",
|
||||
SelectQuery: "select * from dolt_schemas ORDER BY id",
|
||||
ExpectedRows: ToSqlRows(CompressSchema(SchemasTableSchema()),
|
||||
@@ -414,9 +408,6 @@ var systemTableInsertTests = []InsertTest{
|
||||
}
|
||||
|
||||
func TestInsertIntoSystemTables(t *testing.T) {
|
||||
if types.Format_Default != types.Format_LD_1 {
|
||||
t.Skip() // todo: convert to enginetest
|
||||
}
|
||||
for _, test := range systemTableInsertTests {
|
||||
t.Run(test.Name, func(t *testing.T) {
|
||||
testInsertQuery(t, test)
|
||||
|
||||
@@ -252,9 +252,8 @@ func TestExecuteReplace(t *testing.T) {
|
||||
var systemTableReplaceTests = []ReplaceTest{
|
||||
{
|
||||
Name: "replace into dolt_docs",
|
||||
AdditionalSetup: CreateTableFn("dolt_docs",
|
||||
doltdb.DocsSchema,
|
||||
NewRow(types.String("LICENSE.md"), types.String("A license"))),
|
||||
AdditionalSetup: CreateTableFn("dolt_docs", doltdb.DocsSchema,
|
||||
"INSERT INTO dolt_docs VALUES ('LICENSE.md','A license')"),
|
||||
ReplaceQuery: "replace into dolt_docs (doc_name, doc_text) values ('LICENSE.md', 'Some text')",
|
||||
SelectQuery: "select * from dolt_docs",
|
||||
ExpectedRows: []sql.Row{{"LICENSE.md", "Some text"}},
|
||||
@@ -262,9 +261,8 @@ var systemTableReplaceTests = []ReplaceTest{
|
||||
},
|
||||
{
|
||||
Name: "replace into dolt_query_catalog",
|
||||
AdditionalSetup: CreateTableFn(doltdb.DoltQueryCatalogTableName,
|
||||
dtables.DoltQueryCatalogSchema,
|
||||
NewRow(types.String("existingEntry"), types.Uint(1), types.String("example"), types.String("select 2+2 from dual"), types.String("description"))),
|
||||
AdditionalSetup: CreateTableFn(doltdb.DoltQueryCatalogTableName, dtables.DoltQueryCatalogSchema,
|
||||
"INSERT INTO dolt_query_catalog VALUES ('existingEntry', 1, 'example', 'select 2+2 from dual', 'description')"),
|
||||
ReplaceQuery: "replace into dolt_query_catalog (id, display_order, name, query, description) values ('existingEntry', 1, 'example', 'select 1+1 from dual', 'description')",
|
||||
SelectQuery: "select * from dolt_query_catalog",
|
||||
ExpectedRows: ToSqlRows(CompressSchema(dtables.DoltQueryCatalogSchema),
|
||||
@@ -274,9 +272,8 @@ var systemTableReplaceTests = []ReplaceTest{
|
||||
},
|
||||
{
|
||||
Name: "replace into dolt_schemas",
|
||||
AdditionalSetup: CreateTableFn(doltdb.SchemasTableName,
|
||||
SchemasTableSchema(),
|
||||
NewRowWithSchema(SchemasTableSchema(), types.String("view"), types.String("name"), types.String("select 2+2 from dual"), types.Int(1), types.NullValue)),
|
||||
AdditionalSetup: CreateTableFn(doltdb.SchemasTableName, SchemasTableSchema(),
|
||||
"INSERT INTO dolt_schemas VALUES ('view', 'name', 'select 2+2 from dual', 1, NULL)"),
|
||||
ReplaceQuery: "replace into dolt_schemas (id, type, name, fragment) values ('1', 'view', 'name', 'select 1+1 from dual')",
|
||||
SelectQuery: "select type, name, fragment, id, extra from dolt_schemas",
|
||||
ExpectedRows: []sql.Row{{"view", "name", "select 1+1 from dual", int64(1), nil}},
|
||||
@@ -285,9 +282,6 @@ var systemTableReplaceTests = []ReplaceTest{
|
||||
}
|
||||
|
||||
func TestReplaceIntoSystemTables(t *testing.T) {
|
||||
if types.Format_Default != types.Format_LD_1 {
|
||||
t.Skip() // todo: convert to enginetest
|
||||
}
|
||||
for _, test := range systemTableReplaceTests {
|
||||
t.Run(test.Name, func(t *testing.T) {
|
||||
testReplaceQuery(t, test)
|
||||
|
||||
@@ -1283,12 +1283,8 @@ func TestJoins(t *testing.T) {
|
||||
var systemTableSelectTests = []SelectTest{
|
||||
{
|
||||
Name: "select from dolt_docs",
|
||||
AdditionalSetup: CreateTableFn("dolt_docs",
|
||||
doltdb.DocsSchema,
|
||||
NewRowWithSchema(doltdb.DocsSchema,
|
||||
types.String("LICENSE.md"),
|
||||
types.String("A license")),
|
||||
),
|
||||
AdditionalSetup: CreateTableFn("dolt_docs", doltdb.DocsSchema,
|
||||
"INSERT INTO dolt_docs VALUES ('LICENSE.md','A license')"),
|
||||
Query: "select * from dolt_docs",
|
||||
ExpectedRows: ToSqlRows(CompressSchema(doltdb.DocsSchema),
|
||||
NewRow(types.String("LICENSE.md"), types.String("A license"))),
|
||||
@@ -1296,15 +1292,8 @@ var systemTableSelectTests = []SelectTest{
|
||||
},
|
||||
{
|
||||
Name: "select from dolt_query_catalog",
|
||||
AdditionalSetup: CreateTableFn(doltdb.DoltQueryCatalogTableName,
|
||||
dtables.DoltQueryCatalogSchema,
|
||||
NewRowWithSchema(dtables.DoltQueryCatalogSchema,
|
||||
types.String("existingEntry"),
|
||||
types.Uint(2),
|
||||
types.String("example"),
|
||||
types.String("select 2+2 from dual"),
|
||||
types.String("description")),
|
||||
),
|
||||
AdditionalSetup: CreateTableFn(doltdb.DoltQueryCatalogTableName, dtables.DoltQueryCatalogSchema,
|
||||
"INSERT INTO dolt_query_catalog VALUES ('existingEntry', 2, 'example', 'select 2+2 from dual', 'description')"),
|
||||
Query: "select * from dolt_query_catalog",
|
||||
ExpectedRows: ToSqlRows(CompressSchema(dtables.DoltQueryCatalogSchema),
|
||||
NewRow(types.String("existingEntry"), types.Uint(2), types.String("example"), types.String("select 2+2 from dual"), types.String("description")),
|
||||
@@ -1313,19 +1302,10 @@ var systemTableSelectTests = []SelectTest{
|
||||
},
|
||||
{
|
||||
Name: "select from dolt_schemas",
|
||||
AdditionalSetup: CreateTableFn(doltdb.SchemasTableName,
|
||||
SchemasTableSchema(),
|
||||
NewRowWithSchema(SchemasTableSchema(),
|
||||
types.String("view"),
|
||||
types.String("name"),
|
||||
types.String("select 2+2 from dual"),
|
||||
types.Int(1),
|
||||
CreateTestJSON(),
|
||||
)),
|
||||
Query: "select * from dolt_schemas",
|
||||
ExpectedRows: ToSqlRows(CompressSchema(SchemasTableSchema()),
|
||||
NewRow(types.String("view"), types.String("name"), types.String("select 2+2 from dual"), types.Int(1), CreateTestJSON()),
|
||||
),
|
||||
AdditionalSetup: CreateTableFn(doltdb.SchemasTableName, SchemasTableSchema(),
|
||||
`INSERT INTO dolt_schemas VALUES ('view', 'name', 'select 2+2 from dual', 1, NULL)`),
|
||||
Query: "select * from dolt_schemas",
|
||||
ExpectedRows: []sql.Row{{"view", "name", "select 2+2 from dual", int64(1), nil}},
|
||||
ExpectedSchema: CompressSchema(SchemasTableSchema()),
|
||||
},
|
||||
}
|
||||
@@ -1338,9 +1318,6 @@ func CreateTestJSON() types.JSON {
|
||||
}
|
||||
|
||||
func TestSelectSystemTables(t *testing.T) {
|
||||
if types.Format_Default != types.Format_LD_1 {
|
||||
t.Skip() // todo: convert to enginetest
|
||||
}
|
||||
for _, test := range systemTableSelectTests {
|
||||
t.Run(test.Name, func(t *testing.T) {
|
||||
testSelectQuery(t, test)
|
||||
@@ -1421,17 +1398,10 @@ func testSelectQuery(t *testing.T, test SelectTest) {
|
||||
}
|
||||
|
||||
func testSelectDiffQuery(t *testing.T, test SelectTest) {
|
||||
if types.Format_Default != types.Format_LD_1 {
|
||||
t.Skip("") // todo: convert to enginetests
|
||||
}
|
||||
|
||||
validateTest(t, test)
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
cleanup := installTestCommitClock()
|
||||
defer cleanup()
|
||||
|
||||
dEnv := dtestutils.CreateTestEnv()
|
||||
InitializeWithHistory(t, ctx, dEnv, CreateHistory(ctx, dEnv, t)...)
|
||||
if test.AdditionalSetup != nil {
|
||||
|
||||
@@ -27,7 +27,6 @@ import (
|
||||
"github.com/dolthub/dolt/go/libraries/doltcore/schema"
|
||||
"github.com/dolthub/dolt/go/libraries/doltcore/sqle/dtables"
|
||||
"github.com/dolthub/dolt/go/libraries/doltcore/sqle/json"
|
||||
"github.com/dolthub/dolt/go/store/types"
|
||||
)
|
||||
|
||||
// Set to the name of a single test to run just that test, useful for debugging
|
||||
@@ -350,9 +349,6 @@ func TestExecuteUpdate(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestExecuteUpdateSystemTables(t *testing.T) {
|
||||
if types.Format_Default != types.Format_LD_1 {
|
||||
t.Skip() // todo: convert to enginetest
|
||||
}
|
||||
for _, test := range systemTableUpdateTests {
|
||||
t.Run(test.Name, func(t *testing.T) {
|
||||
testUpdateQuery(t, test)
|
||||
@@ -363,11 +359,8 @@ func TestExecuteUpdateSystemTables(t *testing.T) {
|
||||
var systemTableUpdateTests = []UpdateTest{
|
||||
{
|
||||
Name: "update dolt_docs",
|
||||
AdditionalSetup: CreateTableFn("dolt_docs",
|
||||
doltdb.DocsSchema,
|
||||
NewRowWithSchema(doltdb.DocsSchema,
|
||||
types.String("LICENSE.md"), types.String("A license"),
|
||||
)),
|
||||
AdditionalSetup: CreateTableFn("dolt_docs", doltdb.DocsSchema,
|
||||
"INSERT INTO dolt_docs VALUES ('LICENSE.md','A license')"),
|
||||
UpdateQuery: "update dolt_docs set doc_text = 'Some text';",
|
||||
SelectQuery: "select * from dolt_docs",
|
||||
ExpectedRows: []sql.Row{{"LICENSE.md", "Some text"}},
|
||||
@@ -375,37 +368,20 @@ var systemTableUpdateTests = []UpdateTest{
|
||||
},
|
||||
{
|
||||
Name: "update dolt_query_catalog",
|
||||
AdditionalSetup: CreateTableFn(doltdb.DoltQueryCatalogTableName,
|
||||
dtables.DoltQueryCatalogSchema,
|
||||
NewRowWithSchema(dtables.DoltQueryCatalogSchema,
|
||||
types.String("abc123"),
|
||||
types.Uint(1),
|
||||
types.String("example"),
|
||||
types.String("select 2+2 from dual"),
|
||||
types.String("description"),
|
||||
)),
|
||||
UpdateQuery: "update dolt_query_catalog set display_order = display_order + 1",
|
||||
SelectQuery: "select * from dolt_query_catalog",
|
||||
ExpectedRows: ToSqlRows(CompressSchema(dtables.DoltQueryCatalogSchema),
|
||||
NewRow(types.String("abc123"), types.Uint(2), types.String("example"), types.String("select 2+2 from dual"), types.String("description"))),
|
||||
AdditionalSetup: CreateTableFn(doltdb.DoltQueryCatalogTableName, dtables.DoltQueryCatalogSchema,
|
||||
"INSERT INTO dolt_query_catalog VALUES ('abc123', 1, 'example', 'select 2+2 from dual', 'description')"),
|
||||
UpdateQuery: "update dolt_query_catalog set display_order = display_order + 1",
|
||||
SelectQuery: "select * from dolt_query_catalog",
|
||||
ExpectedRows: []sql.Row{{"abc123", uint64(2), "example", "select 2+2 from dual", "description"}},
|
||||
ExpectedSchema: CompressSchema(dtables.DoltQueryCatalogSchema),
|
||||
},
|
||||
{
|
||||
Name: "update dolt_schemas",
|
||||
AdditionalSetup: CreateTableFn(doltdb.SchemasTableName,
|
||||
SchemasTableSchema(),
|
||||
NewRowWithSchema(SchemasTableSchema(),
|
||||
types.String("view"),
|
||||
types.String("name"),
|
||||
types.String("select 2+2 from dual"),
|
||||
types.Int(1),
|
||||
CreateTestJSON(),
|
||||
)),
|
||||
UpdateQuery: "update dolt_schemas set type = 'not a view'",
|
||||
SelectQuery: "select * from dolt_schemas",
|
||||
ExpectedRows: ToSqlRows(CompressSchema(SchemasTableSchema()),
|
||||
NewRow(types.String("not a view"), types.String("name"), types.String("select 2+2 from dual"), types.Int(1), CreateTestJSON()),
|
||||
),
|
||||
AdditionalSetup: CreateTableFn(doltdb.SchemasTableName, SchemasTableSchema(),
|
||||
`INSERT INTO dolt_schemas VALUES ('view', 'name', 'select 2+2 from dual', 1, NULL)`),
|
||||
UpdateQuery: "update dolt_schemas set type = 'not a view'",
|
||||
SelectQuery: "select * from dolt_schemas",
|
||||
ExpectedRows: []sql.Row{{"not a view", "name", "select 2+2 from dual", int64(1), nil}},
|
||||
ExpectedSchema: CompressSchema(SchemasTableSchema()),
|
||||
},
|
||||
}
|
||||
|
||||
@@ -17,7 +17,6 @@ package sqle
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"math"
|
||||
"reflect"
|
||||
"sort"
|
||||
@@ -29,15 +28,8 @@ import (
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/dolthub/dolt/go/libraries/doltcore/doltdb"
|
||||
"github.com/dolthub/dolt/go/libraries/doltcore/doltdb/durable"
|
||||
"github.com/dolthub/dolt/go/libraries/doltcore/dtestutils"
|
||||
"github.com/dolthub/dolt/go/libraries/doltcore/env"
|
||||
"github.com/dolthub/dolt/go/libraries/doltcore/row"
|
||||
"github.com/dolthub/dolt/go/libraries/doltcore/schema"
|
||||
"github.com/dolthub/dolt/go/libraries/doltcore/sqle/index"
|
||||
"github.com/dolthub/dolt/go/libraries/doltcore/sqle/sqlutil"
|
||||
"github.com/dolthub/dolt/go/store/types"
|
||||
"github.com/dolthub/dolt/go/store/val"
|
||||
)
|
||||
|
||||
func setupEditorFkTest(t *testing.T) (*env.DoltEnv, *doltdb.RootValue) {
|
||||
@@ -665,7 +657,8 @@ func assertTableEditorRows(t *testing.T, root *doltdb.RootValue, expected []sql.
|
||||
|
||||
var sqlRows []sql.Row
|
||||
if len(expected) > 0 {
|
||||
sqlRows = sqlRowsFromDurableIndex(t, rows, sch)
|
||||
sqlRows, err = SqlRowsFromDurableIndex(rows, sch)
|
||||
require.NoError(t, err)
|
||||
expected = sortInt64Rows(convertSqlRowToInt64(expected))
|
||||
sqlRows = sortInt64Rows(convertSqlRowToInt64(sqlRows))
|
||||
if !assert.Equal(t, expected, sqlRows) {
|
||||
@@ -708,7 +701,8 @@ func assertTableEditorRows(t *testing.T, root *doltdb.RootValue, expected []sql.
|
||||
expectedIndexRows = sortInt64Rows(expectedIndexRows)
|
||||
|
||||
if len(expectedIndexRows) > 0 {
|
||||
sqlRows = sqlRowsFromDurableIndex(t, indexRowData, indexSch)
|
||||
sqlRows, err = SqlRowsFromDurableIndex(indexRowData, indexSch)
|
||||
require.NoError(t, err)
|
||||
expected = sortInt64Rows(convertSqlRowToInt64(expected))
|
||||
sqlRows = sortInt64Rows(convertSqlRowToInt64(sqlRows))
|
||||
if !reflect.DeepEqual(expectedIndexRows, sqlRows) {
|
||||
@@ -883,67 +877,3 @@ ALTER TABLE three ADD FOREIGN KEY (v1, v2) REFERENCES two(v1, v2) ON DELETE CASC
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func sqlRowsFromDurableIndex(t *testing.T, idx durable.Index, sch schema.Schema) []sql.Row {
|
||||
ctx := context.Background()
|
||||
var sqlRows []sql.Row
|
||||
if types.Format_Default == types.Format_DOLT {
|
||||
rowData := durable.ProllyMapFromIndex(idx)
|
||||
kd, vd := rowData.Descriptors()
|
||||
iter, err := rowData.IterAll(ctx)
|
||||
require.NoError(t, err)
|
||||
for {
|
||||
var k, v val.Tuple
|
||||
k, v, err = iter.Next(ctx)
|
||||
if err == io.EOF {
|
||||
break
|
||||
}
|
||||
require.NoError(t, err)
|
||||
sqlRow, err := sqlRowFromTuples(sch, kd, vd, k, v)
|
||||
require.NoError(t, err)
|
||||
sqlRows = append(sqlRows, sqlRow)
|
||||
}
|
||||
|
||||
} else {
|
||||
// types.Format_LD_1 and types.Format_DOLT_DEV
|
||||
rowData := durable.NomsMapFromIndex(idx)
|
||||
_ = rowData.IterAll(ctx, func(key, value types.Value) error {
|
||||
r, err := row.FromNoms(sch, key.(types.Tuple), value.(types.Tuple))
|
||||
assert.NoError(t, err)
|
||||
sqlRow, err := sqlutil.DoltRowToSqlRow(r, sch)
|
||||
assert.NoError(t, err)
|
||||
sqlRows = append(sqlRows, sqlRow)
|
||||
return nil
|
||||
})
|
||||
}
|
||||
return sqlRows
|
||||
}
|
||||
|
||||
func sqlRowFromTuples(sch schema.Schema, kd, vd val.TupleDesc, k, v val.Tuple) (sql.Row, error) {
|
||||
var err error
|
||||
ctx := context.Background()
|
||||
r := make(sql.Row, sch.GetAllCols().Size())
|
||||
keyless := schema.IsKeyless(sch)
|
||||
|
||||
for i, col := range sch.GetAllCols().GetColumns() {
|
||||
pos, ok := sch.GetPKCols().TagToIdx[col.Tag]
|
||||
if ok {
|
||||
r[i], err = index.GetField(ctx, kd, pos, k, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
pos, ok = sch.GetNonPKCols().TagToIdx[col.Tag]
|
||||
if keyless {
|
||||
pos += 1 // compensate for cardinality field
|
||||
}
|
||||
if ok {
|
||||
r[i], err = index.GetField(ctx, vd, pos, v, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
}
|
||||
return r, nil
|
||||
}
|
||||
|
||||
@@ -21,6 +21,7 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/dolthub/go-mysql-server/sql"
|
||||
"github.com/google/uuid"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
@@ -314,43 +315,24 @@ func MutateRow(sch schema.Schema, r row.Row, tagsAndVals ...interface{}) row.Row
|
||||
return mutated
|
||||
}
|
||||
|
||||
func GetAllRows(root *doltdb.RootValue, tableName string) ([]row.Row, error) {
|
||||
func GetAllRows(root *doltdb.RootValue, tableName string) ([]sql.Row, error) {
|
||||
ctx := context.Background()
|
||||
table, _, err := root.GetTable(ctx, tableName)
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
rowData, err := table.GetNomsRowData(ctx)
|
||||
|
||||
rowIdx, err := table.GetRowData(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
sch, err := table.GetSchema(ctx)
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var rows []row.Row
|
||||
err = rowData.Iter(ctx, func(key, value types.Value) (stop bool, err error) {
|
||||
r, err := row.FromNoms(sch, key.(types.Tuple), value.(types.Tuple))
|
||||
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
rows = append(rows, r)
|
||||
return false, nil
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return rows, nil
|
||||
return SqlRowsFromDurableIndex(rowIdx, sch)
|
||||
}
|
||||
|
||||
var idColTag0TypeUUID = schema.NewColumn("id", 0, types.IntKind, true)
|
||||
|
||||
@@ -29,15 +29,18 @@ import (
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/dolthub/dolt/go/libraries/doltcore/doltdb"
|
||||
"github.com/dolthub/dolt/go/libraries/doltcore/doltdb/durable"
|
||||
"github.com/dolthub/dolt/go/libraries/doltcore/dtestutils"
|
||||
"github.com/dolthub/dolt/go/libraries/doltcore/env"
|
||||
"github.com/dolthub/dolt/go/libraries/doltcore/row"
|
||||
"github.com/dolthub/dolt/go/libraries/doltcore/schema"
|
||||
"github.com/dolthub/dolt/go/libraries/doltcore/sqle/dsess"
|
||||
"github.com/dolthub/dolt/go/libraries/doltcore/sqle/index"
|
||||
"github.com/dolthub/dolt/go/libraries/doltcore/sqle/sqlutil"
|
||||
"github.com/dolthub/dolt/go/libraries/doltcore/table/editor"
|
||||
config2 "github.com/dolthub/dolt/go/libraries/utils/config"
|
||||
"github.com/dolthub/dolt/go/store/types"
|
||||
"github.com/dolthub/dolt/go/store/val"
|
||||
)
|
||||
|
||||
// ExecuteSql executes all the SQL non-select statements given in the string against the root value given and returns
|
||||
@@ -426,3 +429,76 @@ func CreateTestDatabase(t *testing.T) *env.DoltEnv {
|
||||
require.NoError(t, err)
|
||||
return dEnv
|
||||
}
|
||||
|
||||
func SqlRowsFromDurableIndex(idx durable.Index, sch schema.Schema) ([]sql.Row, error) {
|
||||
ctx := context.Background()
|
||||
var sqlRows []sql.Row
|
||||
if types.Format_Default == types.Format_DOLT {
|
||||
rowData := durable.ProllyMapFromIndex(idx)
|
||||
kd, vd := rowData.Descriptors()
|
||||
iter, err := rowData.IterAll(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for {
|
||||
var k, v val.Tuple
|
||||
k, v, err = iter.Next(ctx)
|
||||
if err == io.EOF {
|
||||
break
|
||||
} else if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
sqlRow, err := sqlRowFromTuples(sch, kd, vd, k, v)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
sqlRows = append(sqlRows, sqlRow)
|
||||
}
|
||||
|
||||
} else {
|
||||
// types.Format_LD_1 and types.Format_DOLT_DEV
|
||||
rowData := durable.NomsMapFromIndex(idx)
|
||||
_ = rowData.IterAll(ctx, func(key, value types.Value) error {
|
||||
r, err := row.FromNoms(sch, key.(types.Tuple), value.(types.Tuple))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
sqlRow, err := sqlutil.DoltRowToSqlRow(r, sch)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
sqlRows = append(sqlRows, sqlRow)
|
||||
return nil
|
||||
})
|
||||
}
|
||||
return sqlRows, nil
|
||||
}
|
||||
|
||||
func sqlRowFromTuples(sch schema.Schema, kd, vd val.TupleDesc, k, v val.Tuple) (sql.Row, error) {
|
||||
var err error
|
||||
ctx := context.Background()
|
||||
r := make(sql.Row, sch.GetAllCols().Size())
|
||||
keyless := schema.IsKeyless(sch)
|
||||
|
||||
for i, col := range sch.GetAllCols().GetColumns() {
|
||||
pos, ok := sch.GetPKCols().TagToIdx[col.Tag]
|
||||
if ok {
|
||||
r[i], err = index.GetField(ctx, kd, pos, k, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
pos, ok = sch.GetNonPKCols().TagToIdx[col.Tag]
|
||||
if keyless {
|
||||
pos += 1 // compensate for cardinality field
|
||||
}
|
||||
if ok {
|
||||
r[i], err = index.GetField(ctx, vd, pos, v, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
}
|
||||
return r, nil
|
||||
}
|
||||
|
||||
@@ -131,7 +131,9 @@ cluster:
|
||||
# same role, new epoch
|
||||
server_query_with_port "${SERVERONE_MYSQL_PORT}" repo1 1 dolt "" "call dolt_assume_cluster_role('standby', '12'); select @@GLOBAL.dolt_cluster_role, @@GLOBAL.dolt_cluster_role_epoch;" "status\n0;@@GLOBAL.dolt_cluster_role,@@GLOBAL.dolt_cluster_role_epoch\nstandby,12"
|
||||
# new role, new epoch
|
||||
server_query_with_port "${SERVERONE_MYSQL_PORT}" repo1 1 dolt "" "call dolt_assume_cluster_role('primary', '13'); select @@GLOBAL.dolt_cluster_role, @@GLOBAL.dolt_cluster_role_epoch;" "status\n0;@@GLOBAL.dolt_cluster_role,@@GLOBAL.dolt_cluster_role_epoch\nprimary,13"
|
||||
server_query_with_port "${SERVERONE_MYSQL_PORT}" repo1 1 dolt "" "call dolt_assume_cluster_role('primary', '13');" "status\n0"
|
||||
# we assert on a new connection, since the server leaves the previous connection in an permanent error state.
|
||||
server_query_with_port "${SERVERONE_MYSQL_PORT}" repo1 1 dolt "" "select @@GLOBAL.dolt_cluster_role, @@GLOBAL.dolt_cluster_role_epoch;" "@@GLOBAL.dolt_cluster_role,@@GLOBAL.dolt_cluster_role_epoch\nprimary,13"
|
||||
|
||||
# Server comes back up with latest assumed role.
|
||||
kill $SERVER_PID
|
||||
@@ -205,6 +207,7 @@ cluster:
|
||||
cd serverone
|
||||
|
||||
echo "
|
||||
log_level: trace
|
||||
user:
|
||||
name: dolt
|
||||
listener:
|
||||
@@ -236,6 +239,7 @@ cluster:
|
||||
cd ../servertwo
|
||||
|
||||
echo "
|
||||
log_level: trace
|
||||
user:
|
||||
name: dolt
|
||||
listener:
|
||||
@@ -280,3 +284,609 @@ cluster:
|
||||
[ "$status" -eq 0 ]
|
||||
[[ "$output" =~ "| 5 " ]] || false
|
||||
}
|
||||
|
||||
# A server that boots with bootstrap_role: standby must serve reads for its
# databases but reject writes with a read-only error.
@test "sql-server-cluster: booted standby server is read only" {
    cd serverone

    # Seed repo1 with data before the server comes up.
    cd repo1
    dolt sql -q 'create table vals (i int primary key)'
    dolt sql -q 'insert into vals values (1), (2), (3), (4), (5)'
    cd ..

    echo "
log_level: trace
user:
  name: dolt
listener:
  host: 0.0.0.0
  port: ${SERVERONE_MYSQL_PORT}
behavior:
  read_only: false
  autocommit: true
cluster:
  standby_remotes:
  - name: standby
    remote_url_template: http://localhost:${SERVERTWO_GRPC_PORT}/{database}
  bootstrap_role: standby
  bootstrap_epoch: 10
  remotesapi:
    port: ${SERVERONE_GRPC_PORT}" > server.yaml

    DOLT_ROOT_PATH=`pwd` dolt config --global --add user.email bats@email.fake
    DOLT_ROOT_PATH=`pwd` dolt config --global --add user.name "Bats Tests"
    DOLT_ROOT_PATH=`pwd` dolt config --global --add metrics.disabled true

    (cd repo1 && dolt remote add standby http://localhost:${SERVERTWO_GRPC_PORT}/repo1)
    (cd repo2 && dolt remote add standby http://localhost:${SERVERTWO_GRPC_PORT}/repo2)
    DOLT_ROOT_PATH=`pwd` dolt sql-server --config server.yaml &
    SERVER_PID=$!

    wait_for_connection "${SERVERONE_MYSQL_PORT}" 5000

    # Reads work; writes are rejected and leave the data unchanged.
    server_query_with_port "${SERVERONE_MYSQL_PORT}" repo1 1 dolt "" "select count(*) from vals" "count(*)\n5"
    run server_query_with_port "${SERVERONE_MYSQL_PORT}" repo1 1 dolt "" "insert into vals values (6),(7),(8),(9),(10)" "" 1
    [[ "$output" =~ "Database repo1 is read-only" ]] || false
    server_query_with_port "${SERVERONE_MYSQL_PORT}" repo1 1 dolt "" "select count(*) from vals" "count(*)\n5"
}
|
||||
|
||||
# A server that boots with bootstrap_role: primary must accept both reads
# and writes.
@test "sql-server-cluster: booted primary server is read write" {
    cd serverone

    # Seed repo1 with data before the server comes up.
    cd repo1
    dolt sql -q 'create table vals (i int primary key)'
    dolt sql -q 'insert into vals values (1), (2), (3), (4), (5)'
    cd ..

    echo "
log_level: trace
user:
  name: dolt
listener:
  host: 0.0.0.0
  port: ${SERVERONE_MYSQL_PORT}
behavior:
  read_only: false
  autocommit: true
cluster:
  standby_remotes:
  - name: standby
    remote_url_template: http://localhost:${SERVERTWO_GRPC_PORT}/{database}
  bootstrap_role: primary
  bootstrap_epoch: 10
  remotesapi:
    port: ${SERVERONE_GRPC_PORT}" > server.yaml

    DOLT_ROOT_PATH=`pwd` dolt config --global --add user.email bats@email.fake
    DOLT_ROOT_PATH=`pwd` dolt config --global --add user.name "Bats Tests"
    DOLT_ROOT_PATH=`pwd` dolt config --global --add metrics.disabled true

    (cd repo1 && dolt remote add standby http://localhost:${SERVERTWO_GRPC_PORT}/repo1)
    (cd repo2 && dolt remote add standby http://localhost:${SERVERTWO_GRPC_PORT}/repo2)
    DOLT_ROOT_PATH=`pwd` dolt sql-server --config server.yaml &
    SERVER_PID=$!

    wait_for_connection "${SERVERONE_MYSQL_PORT}" 5000

    # Reads and writes both succeed on a booted primary.
    server_query_with_port "${SERVERONE_MYSQL_PORT}" repo1 1 dolt "" "select count(*) from vals" "count(*)\n5"
    server_query_with_port "${SERVERONE_MYSQL_PORT}" repo1 1 dolt "" "insert into vals values (6),(7),(8),(9),(10)" ""
    server_query_with_port "${SERVERONE_MYSQL_PORT}" repo1 1 dolt "" "select count(*) from vals" "count(*)\n10"
}
|
||||
|
||||
# After dolt_assume_cluster_role('primary', ...) a booted standby must start
# accepting writes.
@test "sql-server-cluster: standby transitioned to primary becomes writable" {
    cd serverone

    # Seed repo1 with data before the server comes up.
    cd repo1
    dolt sql -q 'create table vals (i int primary key)'
    dolt sql -q 'insert into vals values (1), (2), (3), (4), (5)'
    cd ..

    echo "
log_level: trace
user:
  name: dolt
listener:
  host: 0.0.0.0
  port: ${SERVERONE_MYSQL_PORT}
behavior:
  read_only: false
  autocommit: true
cluster:
  standby_remotes:
  - name: standby
    remote_url_template: http://localhost:${SERVERTWO_GRPC_PORT}/{database}
  bootstrap_role: standby
  bootstrap_epoch: 10
  remotesapi:
    port: ${SERVERONE_GRPC_PORT}" > server.yaml

    DOLT_ROOT_PATH=`pwd` dolt config --global --add user.email bats@email.fake
    DOLT_ROOT_PATH=`pwd` dolt config --global --add user.name "Bats Tests"
    DOLT_ROOT_PATH=`pwd` dolt config --global --add metrics.disabled true

    (cd repo1 && dolt remote add standby http://localhost:${SERVERTWO_GRPC_PORT}/repo1)
    (cd repo2 && dolt remote add standby http://localhost:${SERVERTWO_GRPC_PORT}/repo2)
    DOLT_ROOT_PATH=`pwd` dolt sql-server --config server.yaml &
    SERVER_PID=$!

    wait_for_connection "${SERVERONE_MYSQL_PORT}" 5000

    # Writes fail while still a standby...
    run server_query_with_port "${SERVERONE_MYSQL_PORT}" repo1 1 dolt "" "insert into vals values (6),(7),(8),(9),(10)" "" 1
    [[ "$output" =~ "Database repo1 is read-only" ]] || false
    # ...then succeed after we assume the primary role at a newer epoch.
    server_query_with_port "${SERVERONE_MYSQL_PORT}" repo1 1 dolt "" "call dolt_assume_cluster_role('primary', 11)" "status\n0"
    server_query_with_port "${SERVERONE_MYSQL_PORT}" repo1 1 dolt "" "select count(*) from vals" "count(*)\n5"
    server_query_with_port "${SERVERONE_MYSQL_PORT}" repo1 1 dolt "" "insert into vals values (6),(7),(8),(9),(10)" "" 0
    server_query_with_port "${SERVERONE_MYSQL_PORT}" repo1 1 dolt "" "select count(*) from vals" "count(*)\n10"
}
|
||||
|
||||
# After dolt_assume_cluster_role('standby', ...) a primary must stop accepting
# writes. A live standby peer is required so the graceful handoff can replicate.
@test "sql-server-cluster: primary transitioned to standby becomes read only" {
    # In order to gracefully transition to standby, we will need the standby running.
    cd serverone

    echo "
log_level: trace
user:
  name: dolt
listener:
  host: 0.0.0.0
  port: ${SERVERONE_MYSQL_PORT}
behavior:
  read_only: false
  autocommit: true
cluster:
  standby_remotes:
  - name: standby
    remote_url_template: http://localhost:${SERVERTWO_GRPC_PORT}/{database}
  bootstrap_role: standby
  bootstrap_epoch: 10
  remotesapi:
    port: ${SERVERONE_GRPC_PORT}" > server.yaml

    DOLT_ROOT_PATH=`pwd` dolt config --global --add user.email bats@email.fake
    DOLT_ROOT_PATH=`pwd` dolt config --global --add user.name "Bats Tests"
    DOLT_ROOT_PATH=`pwd` dolt config --global --add metrics.disabled true

    (cd repo1 && dolt remote add standby http://localhost:${SERVERTWO_GRPC_PORT}/repo1)
    (cd repo2 && dolt remote add standby http://localhost:${SERVERTWO_GRPC_PORT}/repo2)
    DOLT_ROOT_PATH=`pwd` dolt sql-server --config server.yaml &
    serverone_pid=$!

    wait_for_connection "${SERVERONE_MYSQL_PORT}" 5000

    cd ../servertwo

    echo "
log_level: trace
user:
  name: dolt
listener:
  host: 0.0.0.0
  port: ${SERVERTWO_MYSQL_PORT}
behavior:
  read_only: false
  autocommit: true
cluster:
  standby_remotes:
  - name: standby
    remote_url_template: http://localhost:${SERVERONE_GRPC_PORT}/{database}
  bootstrap_role: primary
  bootstrap_epoch: 10
  remotesapi:
    port: ${SERVERTWO_GRPC_PORT}" > server.yaml

    DOLT_ROOT_PATH=`pwd` dolt config --global --add user.email bats@email.fake
    DOLT_ROOT_PATH=`pwd` dolt config --global --add user.name "Bats Tests"
    DOLT_ROOT_PATH=`pwd` dolt config --global --add metrics.disabled true

    (cd repo1 && dolt remote add standby http://localhost:${SERVERONE_GRPC_PORT}/repo1)
    (cd repo2 && dolt remote add standby http://localhost:${SERVERONE_GRPC_PORT}/repo2)
    DOLT_ROOT_PATH=`pwd` dolt sql-server --config server.yaml &
    servertwo_pid=$!

    wait_for_connection "${SERVERTWO_MYSQL_PORT}" 5000

    # Write on the primary, then step it down. The role-change call exits
    # non-zero because the transitioning connection is left unusable.
    server_query_with_port "${SERVERTWO_MYSQL_PORT}" repo1 1 dolt "" "create table vals (i int primary key);insert into vals values (1),(2),(3),(4),(5)"
    run server_query_with_port "${SERVERTWO_MYSQL_PORT}" repo1 1 dolt "" "call dolt_assume_cluster_role('standby', 11)" "" 1
    run server_query_with_port "${SERVERTWO_MYSQL_PORT}" repo1 1 dolt "" "insert into vals values (6),(7),(8),(9),(10)" "" 1
    [[ "$output" =~ "Database repo1 is read-only" ]] || false
    server_query_with_port "${SERVERTWO_MYSQL_PORT}" repo1 1 dolt "" "select count(*) from vals" "count(*)\n5"

    kill $servertwo_pid
    wait $servertwo_pid

    kill $serverone_pid
    wait $serverone_pid
}
|
||||
|
||||
# Two servers booted as primary at the same epoch is an invalid configuration;
# once they see each other, both must transition to detected_broken_config.
@test "sql-server-cluster: misconfigured cluster with primaries at same epoch, both transition to detected_broken_config" {
    cd serverone

    echo "
log_level: trace
user:
  name: dolt
listener:
  host: 0.0.0.0
  port: ${SERVERONE_MYSQL_PORT}
behavior:
  read_only: false
  autocommit: true
cluster:
  standby_remotes:
  - name: standby
    remote_url_template: http://localhost:${SERVERTWO_GRPC_PORT}/{database}
  bootstrap_role: primary
  bootstrap_epoch: 10
  remotesapi:
    port: ${SERVERONE_GRPC_PORT}" > server.yaml

    DOLT_ROOT_PATH=`pwd` dolt config --global --add user.email bats@email.fake
    DOLT_ROOT_PATH=`pwd` dolt config --global --add user.name "Bats Tests"
    DOLT_ROOT_PATH=`pwd` dolt config --global --add metrics.disabled true

    (cd repo1 && dolt remote add standby http://localhost:${SERVERTWO_GRPC_PORT}/repo1)
    (cd repo2 && dolt remote add standby http://localhost:${SERVERTWO_GRPC_PORT}/repo2)
    DOLT_ROOT_PATH=`pwd` dolt sql-server --config server.yaml &
    serverone_pid=$!

    wait_for_connection "${SERVERONE_MYSQL_PORT}" 5000

    cd ../servertwo

    echo "
log_level: trace
user:
  name: dolt
listener:
  host: 0.0.0.0
  port: ${SERVERTWO_MYSQL_PORT}
behavior:
  read_only: false
  autocommit: true
cluster:
  standby_remotes:
  - name: standby
    remote_url_template: http://localhost:${SERVERONE_GRPC_PORT}/{database}
  bootstrap_role: primary
  bootstrap_epoch: 10
  remotesapi:
    port: ${SERVERTWO_GRPC_PORT}" > server.yaml

    DOLT_ROOT_PATH=`pwd` dolt config --global --add user.email bats@email.fake
    DOLT_ROOT_PATH=`pwd` dolt config --global --add user.name "Bats Tests"
    DOLT_ROOT_PATH=`pwd` dolt config --global --add metrics.disabled true

    (cd repo1 && dolt remote add standby http://localhost:${SERVERONE_GRPC_PORT}/repo1)
    (cd repo2 && dolt remote add standby http://localhost:${SERVERONE_GRPC_PORT}/repo2)
    DOLT_ROOT_PATH=`pwd` dolt sql-server --config server.yaml &
    servertwo_pid=$!

    wait_for_connection "${SERVERTWO_MYSQL_PORT}" 5000

    # Run a query to make sure everyone sees everyone...
    run server_query_with_port "${SERVERTWO_MYSQL_PORT}" repo1 1 dolt "" "CREATE TABLE vals (i int primary key)" "" 1

    # Both servers must now report the broken-config role at the conflicting epoch.
    server_query_with_port "${SERVERTWO_MYSQL_PORT}" repo1 1 dolt "" "SELECT @@GLOBAL.dolt_cluster_role,@@GLOBAL.dolt_cluster_role_epoch" "@@GLOBAL.dolt_cluster_role,@@GLOBAL.dolt_cluster_role_epoch\ndetected_broken_config,10"
    server_query_with_port "${SERVERONE_MYSQL_PORT}" repo1 1 dolt "" "SELECT @@GLOBAL.dolt_cluster_role,@@GLOBAL.dolt_cluster_role_epoch" "@@GLOBAL.dolt_cluster_role,@@GLOBAL.dolt_cluster_role_epoch\ndetected_broken_config,10"

    kill $servertwo_pid
    wait $servertwo_pid

    kill $serverone_pid
    wait $serverone_pid
}
|
||||
|
||||
# When a primary at a lower epoch joins a cluster that already has a primary at
# a higher epoch, the older one must step down to standby and its (divergent)
# state must not overwrite the newer primary's data.
@test "sql-server-cluster: an older primary comes up, becomes a standby and does not overwrite newer primary" {
    cd serverone

    # serverone: 10 rows, primary at the newer epoch (15).
    cd repo1
    dolt sql -q 'create table vals (i int primary key)'
    dolt sql -q 'insert into vals values (1),(2),(3),(4),(5),(6),(7),(8),(9),(10)'
    cd ../

    echo "
log_level: trace
user:
  name: dolt
listener:
  host: 0.0.0.0
  port: ${SERVERONE_MYSQL_PORT}
behavior:
  read_only: false
  autocommit: true
cluster:
  standby_remotes:
  - name: standby
    remote_url_template: http://localhost:${SERVERTWO_GRPC_PORT}/{database}
  bootstrap_role: primary
  bootstrap_epoch: 15
  remotesapi:
    port: ${SERVERONE_GRPC_PORT}" > server.yaml

    DOLT_ROOT_PATH=`pwd` dolt config --global --add user.email bats@email.fake
    DOLT_ROOT_PATH=`pwd` dolt config --global --add user.name "Bats Tests"
    DOLT_ROOT_PATH=`pwd` dolt config --global --add metrics.disabled true

    (cd repo1 && dolt remote add standby http://localhost:${SERVERTWO_GRPC_PORT}/repo1)
    (cd repo2 && dolt remote add standby http://localhost:${SERVERTWO_GRPC_PORT}/repo2)
    DOLT_ROOT_PATH=`pwd` dolt sql-server --config server.yaml &
    serverone_pid=$!

    wait_for_connection "${SERVERONE_MYSQL_PORT}" 5000

    cd ../servertwo

    # servertwo: 5 rows, primary at the older epoch (10).
    cd repo1
    dolt sql -q 'create table vals (i int primary key)'
    dolt sql -q 'insert into vals values (1),(2),(3),(4),(5)'
    cd ../

    echo "
log_level: trace
user:
  name: dolt
listener:
  host: 0.0.0.0
  port: ${SERVERTWO_MYSQL_PORT}
behavior:
  read_only: false
  autocommit: true
cluster:
  standby_remotes:
  - name: standby
    remote_url_template: http://localhost:${SERVERONE_GRPC_PORT}/{database}
  bootstrap_role: primary
  bootstrap_epoch: 10
  remotesapi:
    port: ${SERVERTWO_GRPC_PORT}" > server.yaml

    DOLT_ROOT_PATH=`pwd` dolt config --global --add user.email bats@email.fake
    DOLT_ROOT_PATH=`pwd` dolt config --global --add user.name "Bats Tests"
    DOLT_ROOT_PATH=`pwd` dolt config --global --add metrics.disabled true

    (cd repo1 && dolt remote add standby http://localhost:${SERVERONE_GRPC_PORT}/repo1)
    (cd repo2 && dolt remote add standby http://localhost:${SERVERONE_GRPC_PORT}/repo2)
    DOLT_ROOT_PATH=`pwd` dolt sql-server --config server.yaml &
    servertwo_pid=$!

    wait_for_connection "${SERVERTWO_MYSQL_PORT}" 5000

    # The newer primary keeps its 10 rows; the older primary is now a standby
    # at the newer epoch.
    server_query_with_port "${SERVERONE_MYSQL_PORT}" repo1 1 dolt "" "SELECT count(*) FROM vals" "count(*)\n10"
    server_query_with_port "${SERVERTWO_MYSQL_PORT}" repo1 1 dolt "" "SELECT @@GLOBAL.dolt_cluster_role,@@GLOBAL.dolt_cluster_role_epoch" "@@GLOBAL.dolt_cluster_role,@@GLOBAL.dolt_cluster_role_epoch\nstandby,15"

    kill $servertwo_pid
    wait $servertwo_pid

    kill $serverone_pid
    wait $serverone_pid
}
|
||||
|
||||
# When a primary at a higher epoch joins, the existing lower-epoch primary must
# step down to standby and have its state replaced by the new primary's data.
@test "sql-server-cluster: a new primary comes up, old primary becomes a standby and has its state overwritten" {
    cd serverone

    # serverone: 5 rows, primary at the older epoch (10).
    cd repo1
    dolt sql -q 'create table vals (i int primary key)'
    dolt sql -q 'insert into vals values (1),(2),(3),(4),(5)'
    cd ../

    echo "
log_level: trace
user:
  name: dolt
listener:
  host: 0.0.0.0
  port: ${SERVERONE_MYSQL_PORT}
behavior:
  read_only: false
  autocommit: true
cluster:
  standby_remotes:
  - name: standby
    remote_url_template: http://localhost:${SERVERTWO_GRPC_PORT}/{database}
  bootstrap_role: primary
  bootstrap_epoch: 10
  remotesapi:
    port: ${SERVERONE_GRPC_PORT}" > server.yaml

    DOLT_ROOT_PATH=`pwd` dolt config --global --add user.email bats@email.fake
    DOLT_ROOT_PATH=`pwd` dolt config --global --add user.name "Bats Tests"
    DOLT_ROOT_PATH=`pwd` dolt config --global --add metrics.disabled true

    (cd repo1 && dolt remote add standby http://localhost:${SERVERTWO_GRPC_PORT}/repo1)
    (cd repo2 && dolt remote add standby http://localhost:${SERVERTWO_GRPC_PORT}/repo2)
    DOLT_ROOT_PATH=`pwd` dolt sql-server --config server.yaml &
    serverone_pid=$!

    wait_for_connection "${SERVERONE_MYSQL_PORT}" 5000

    cd ../servertwo

    # servertwo: 10 rows, primary at the newer epoch (15).
    cd repo1
    dolt sql -q 'create table vals (i int primary key)'
    dolt sql -q 'insert into vals values (1),(2),(3),(4),(5),(6),(7),(8),(9),(10)'
    cd ../

    echo "
log_level: trace
user:
  name: dolt
listener:
  host: 0.0.0.0
  port: ${SERVERTWO_MYSQL_PORT}
behavior:
  read_only: false
  autocommit: true
cluster:
  standby_remotes:
  - name: standby
    remote_url_template: http://localhost:${SERVERONE_GRPC_PORT}/{database}
  bootstrap_role: primary
  bootstrap_epoch: 15
  remotesapi:
    port: ${SERVERTWO_GRPC_PORT}" > server.yaml

    DOLT_ROOT_PATH=`pwd` dolt config --global --add user.email bats@email.fake
    DOLT_ROOT_PATH=`pwd` dolt config --global --add user.name "Bats Tests"
    DOLT_ROOT_PATH=`pwd` dolt config --global --add metrics.disabled true

    (cd repo1 && dolt remote add standby http://localhost:${SERVERONE_GRPC_PORT}/repo1)
    (cd repo2 && dolt remote add standby http://localhost:${SERVERONE_GRPC_PORT}/repo2)
    DOLT_ROOT_PATH=`pwd` dolt sql-server --config server.yaml &
    servertwo_pid=$!

    wait_for_connection "${SERVERTWO_MYSQL_PORT}" 5000

    # serverone is now a standby at epoch 15 and carries the new primary's
    # 10 rows; servertwo remains primary at epoch 15.
    server_query_with_port "${SERVERONE_MYSQL_PORT}" repo1 1 dolt "" "SELECT @@GLOBAL.dolt_cluster_role,@@GLOBAL.dolt_cluster_role_epoch" "@@GLOBAL.dolt_cluster_role,@@GLOBAL.dolt_cluster_role_epoch\nstandby,15"
    server_query_with_port "${SERVERONE_MYSQL_PORT}" repo1 1 dolt "" "SELECT count(*) FROM vals" "count(*)\n10"
    server_query_with_port "${SERVERTWO_MYSQL_PORT}" repo1 1 dolt "" "SELECT @@GLOBAL.dolt_cluster_role,@@GLOBAL.dolt_cluster_role_epoch" "@@GLOBAL.dolt_cluster_role,@@GLOBAL.dolt_cluster_role_epoch\nprimary,15"

    kill $servertwo_pid
    wait $servertwo_pid

    kill $serverone_pid
    wait $serverone_pid
}
|
||||
|
||||
# A graceful primary -> standby transition requires replicating to the standby;
# with no standby reachable the transition must fail and the server must remain
# a writable primary.
@test "sql-server-cluster: primary -> standby without the standby up fails" {
    cd serverone

    echo "
log_level: trace
user:
  name: dolt
listener:
  host: 0.0.0.0
  port: ${SERVERONE_MYSQL_PORT}
behavior:
  read_only: false
  autocommit: true
cluster:
  standby_remotes:
  - name: standby
    remote_url_template: http://localhost:${SERVERTWO_GRPC_PORT}/{database}
  bootstrap_role: primary
  bootstrap_epoch: 10
  remotesapi:
    port: ${SERVERONE_GRPC_PORT}" > server.yaml

    (cd repo1 && dolt remote add standby http://localhost:"${SERVERTWO_GRPC_PORT}"/repo1)
    (cd repo2 && dolt remote add standby http://localhost:"${SERVERTWO_GRPC_PORT}"/repo2)
    dolt sql-server --config server.yaml &
    SERVER_PID=$!
    wait_for_connection "${SERVERONE_MYSQL_PORT}" 5000

    # when we do this, we become read only for a little bit...
    run server_query_with_port "${SERVERONE_MYSQL_PORT}" repo1 1 dolt "" "call dolt_assume_cluster_role('standby', '11');" "" 1
    [[ "$output" =~ "failed to transition from primary to standby gracefully" ]] || false

    # after that fails, we should still be primary and we should accept writes.
    server_query_with_port "${SERVERONE_MYSQL_PORT}" repo1 1 dolt "" "create table vals (i int primary key);insert into vals values (0);select i from vals" ";;i\n0"
}
|
||||
|
||||
# The connection that issues dolt_assume_cluster_role('primary', ...) on a
# standby is intentionally left unusable afterwards; any further statement on
# it must error.
@test "sql-server-cluster: standby -> primary leave connection used to transition in an error state" {
    cd serverone

    echo "
log_level: trace
user:
  name: dolt
listener:
  host: 0.0.0.0
  port: ${SERVERONE_MYSQL_PORT}
behavior:
  read_only: false
  autocommit: true
cluster:
  standby_remotes:
  - name: standby
    remote_url_template: http://localhost:${SERVERTWO_GRPC_PORT}/{database}
  bootstrap_role: standby
  bootstrap_epoch: 10
  remotesapi:
    port: ${SERVERONE_GRPC_PORT}" > server.yaml

    (cd repo1 && dolt remote add standby http://localhost:"${SERVERTWO_GRPC_PORT}"/repo1)
    (cd repo2 && dolt remote add standby http://localhost:"${SERVERTWO_GRPC_PORT}"/repo2)
    dolt sql-server --config server.yaml &
    SERVER_PID=$!
    wait_for_connection "${SERVERONE_MYSQL_PORT}" 5000

    # The follow-up select on the same connection must fail: the role change
    # succeeded but poisoned this connection.
    run server_query_with_port "${SERVERONE_MYSQL_PORT}" repo1 1 dolt "" "call dolt_assume_cluster_role('primary', '11');select 2 from dual" "" 1
    [[ "$output" =~ "this connection can no longer be used" ]] || false
}
|
||||
|
||||
# End-to-end failover: writes on the primary replicate to the standby, roles
# swap, more writes replicate back, roles swap again — after two failovers the
# final primary must have every write.
@test "sql-server-cluster: primary replicates to standby, fails over, new primary replicates to standby, fails over, new primary has all writes" {
    cd serverone

    echo "
log_level: trace
user:
  name: dolt
listener:
  host: 0.0.0.0
  port: ${SERVERONE_MYSQL_PORT}
behavior:
  read_only: false
  autocommit: true
cluster:
  standby_remotes:
  - name: standby
    remote_url_template: http://localhost:${SERVERTWO_GRPC_PORT}/{database}
  bootstrap_role: standby
  bootstrap_epoch: 1
  remotesapi:
    port: ${SERVERONE_GRPC_PORT}" > server.yaml

    DOLT_ROOT_PATH=`pwd` dolt config --global --add user.email bats@email.fake
    DOLT_ROOT_PATH=`pwd` dolt config --global --add user.name "Bats Tests"
    DOLT_ROOT_PATH=`pwd` dolt config --global --add metrics.disabled true

    (cd repo1 && dolt remote add standby http://localhost:${SERVERTWO_GRPC_PORT}/repo1)
    (cd repo2 && dolt remote add standby http://localhost:${SERVERTWO_GRPC_PORT}/repo2)
    DOLT_ROOT_PATH=`pwd` dolt sql-server --config server.yaml &
    serverone_pid=$!

    wait_for_connection "${SERVERONE_MYSQL_PORT}" 5000

    cd ../servertwo

    echo "
log_level: trace
user:
  name: dolt
listener:
  host: 0.0.0.0
  port: ${SERVERTWO_MYSQL_PORT}
behavior:
  read_only: false
  autocommit: true
cluster:
  standby_remotes:
  - name: standby
    remote_url_template: http://localhost:${SERVERONE_GRPC_PORT}/{database}
  bootstrap_role: primary
  bootstrap_epoch: 1
  remotesapi:
    port: ${SERVERTWO_GRPC_PORT}" > server.yaml

    DOLT_ROOT_PATH=`pwd` dolt config --global --add user.email bats@email.fake
    DOLT_ROOT_PATH=`pwd` dolt config --global --add user.name "Bats Tests"
    DOLT_ROOT_PATH=`pwd` dolt config --global --add metrics.disabled true

    (cd repo1 && dolt remote add standby http://localhost:${SERVERONE_GRPC_PORT}/repo1)
    (cd repo2 && dolt remote add standby http://localhost:${SERVERONE_GRPC_PORT}/repo2)
    DOLT_ROOT_PATH=`pwd` dolt sql-server --config server.yaml &
    servertwo_pid=$!

    wait_for_connection "${SERVERTWO_MYSQL_PORT}" 5000
    # Write 5 rows on servertwo (primary), fail over to serverone, verify the
    # rows arrived, write 5 more, fail back, and verify all 10 on servertwo.
    server_query_with_port "${SERVERTWO_MYSQL_PORT}" repo1 1 dolt "" "create table vals (i int primary key);insert into vals values (0),(1),(2),(3),(4);call dolt_assume_cluster_role('standby', 2);" ";;status\n0"
    server_query_with_port "${SERVERONE_MYSQL_PORT}" repo1 1 dolt "" "select count(*) from vals;call dolt_assume_cluster_role('primary', 2)" "count(*)\n5;status\n0"
    server_query_with_port "${SERVERONE_MYSQL_PORT}" repo1 1 dolt "" "insert into vals values (5),(6),(7),(8),(9);call dolt_assume_cluster_role('standby', 3)" ";status\n0"
    server_query_with_port "${SERVERTWO_MYSQL_PORT}" repo1 1 dolt "" "select count(*) from vals;call dolt_assume_cluster_role('primary', 3)" "count(*)\n10;status\n0"

    kill $servertwo_pid
    wait $servertwo_pid

    kill $serverone_pid
    wait $serverone_pid
}
|
||||
|
||||
Reference in New Issue
Block a user