mirror of
https://github.com/dolthub/dolt.git
synced 2026-02-09 03:09:12 -06:00
merge replication warnings PR
This commit is contained in:
@@ -58,6 +58,8 @@ import (
|
||||
"github.com/dolthub/dolt/go/libraries/utils/iohelp"
|
||||
"github.com/dolthub/dolt/go/libraries/utils/osutil"
|
||||
"github.com/dolthub/dolt/go/libraries/utils/tracing"
|
||||
"github.com/dolthub/dolt/go/store/datas"
|
||||
"github.com/dolthub/dolt/go/store/types"
|
||||
)
|
||||
|
||||
type batchMode int64
|
||||
@@ -66,8 +68,6 @@ const (
|
||||
invalidBatchMode batchMode = iota
|
||||
insertBatchMode
|
||||
deleteBatchMode
|
||||
|
||||
currentBatchModeKey = "batch_mode"
|
||||
)
|
||||
|
||||
var sqlDocs = cli.CommandDocumentationContent{
|
||||
@@ -108,40 +108,7 @@ const (
|
||||
var delimiterRegex = regexp.MustCompile(`(?i)^\s*DELIMITER\s+(\S+)\s*(\s+\S+\s*)?$`)
|
||||
|
||||
func init() {
|
||||
sql.SystemVariables.AddSystemVariables([]sql.SystemVariable{
|
||||
{
|
||||
Name: currentBatchModeKey,
|
||||
Scope: sql.SystemVariableScope_Session,
|
||||
Dynamic: true,
|
||||
SetVarHintApplies: false,
|
||||
Type: sql.NewSystemIntType(currentBatchModeKey, -9223372036854775808, 9223372036854775807, false),
|
||||
Default: int64(0),
|
||||
},
|
||||
{
|
||||
Name: dsess.DoltDefaultBranchKey,
|
||||
Scope: sql.SystemVariableScope_Global,
|
||||
Dynamic: true,
|
||||
SetVarHintApplies: false,
|
||||
Type: sql.NewSystemStringType(dsess.DoltDefaultBranchKey),
|
||||
Default: "",
|
||||
},
|
||||
{
|
||||
Name: doltdb.ReplicateToRemoteKey,
|
||||
Scope: sql.SystemVariableScope_Global,
|
||||
Dynamic: true,
|
||||
SetVarHintApplies: false,
|
||||
Type: sql.NewSystemStringType(doltdb.ReplicateToRemoteKey),
|
||||
Default: "",
|
||||
},
|
||||
{
|
||||
Name: doltdb.DoltReadReplicaKey,
|
||||
Scope: sql.SystemVariableScope_Global,
|
||||
Dynamic: true,
|
||||
SetVarHintApplies: false,
|
||||
Type: sql.NewSystemStringType(doltdb.DoltReadReplicaKey),
|
||||
Default: "",
|
||||
},
|
||||
})
|
||||
dsqle.AddDoltSystemVariables()
|
||||
}
|
||||
|
||||
type SqlCmd struct {
|
||||
@@ -502,6 +469,35 @@ func newDatabase(name string, dEnv *env.DoltEnv) dsqle.Database {
|
||||
return dsqle.NewDatabase(name, dEnv.DbData(), opts)
|
||||
}
|
||||
|
||||
// newReplicaDatabase creates a new dsqle.ReadReplicaDatabase. If the doltdb.SkipReplicationErrorsKey global variable is set,
|
||||
// skip errors related to database construction only and return a partially functional dsqle.ReadReplicaDatabase
|
||||
// that will log warnings when attempting to perform replica commands.
|
||||
func newReplicaDatabase(ctx context.Context, name string, remoteName string, dEnv *env.DoltEnv) (dsqle.ReadReplicaDatabase, error) {
|
||||
var skipErrors bool
|
||||
if _, val, ok := sql.SystemVariables.GetGlobal(dsqle.SkipReplicationErrorsKey); !ok {
|
||||
return dsqle.ReadReplicaDatabase{}, sql.ErrUnknownSystemVariable.New(dsqle.SkipReplicationErrorsKey)
|
||||
} else if val == int8(1) {
|
||||
skipErrors = true
|
||||
}
|
||||
|
||||
opts := editor.Options{
|
||||
Deaf: dEnv.DbEaFactory(),
|
||||
}
|
||||
|
||||
db := dsqle.NewDatabase(name, dEnv.DbData(), opts)
|
||||
|
||||
rrd, err := dsqle.NewReadReplicaDatabase(ctx, db, remoteName, dEnv.RepoStateReader(), dEnv.TempTableFilesDir())
|
||||
if err != nil {
|
||||
err = fmt.Errorf("%w from remote '%s'; %s", dsqle.ErrFailedToLoadReplicaDB, remoteName, err.Error())
|
||||
if !skipErrors {
|
||||
return dsqle.ReadReplicaDatabase{}, err
|
||||
}
|
||||
cli.Println(err)
|
||||
return dsqle.ReadReplicaDatabase{Database: db}, nil
|
||||
}
|
||||
return rrd, nil
|
||||
}
|
||||
|
||||
func execQuery(
|
||||
ctx context.Context,
|
||||
mrEnv *env.MultiRepoEnv,
|
||||
@@ -538,13 +534,70 @@ func execQuery(
|
||||
return nil
|
||||
}
|
||||
|
||||
func getPushOnWriteHook(ctx context.Context, dEnv *env.DoltEnv) (*doltdb.PushOnWriteHook, error) {
|
||||
_, val, ok := sql.SystemVariables.GetGlobal(dsqle.ReplicateToRemoteKey)
|
||||
if !ok {
|
||||
return nil, sql.ErrUnknownSystemVariable.New(dsqle.SkipReplicationErrorsKey)
|
||||
} else if val == "" {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
remoteName, ok := val.(string)
|
||||
if !ok {
|
||||
return nil, sql.ErrInvalidSystemVariableValue.New(val)
|
||||
}
|
||||
|
||||
remotes, err := dEnv.GetRemotes()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
rem, ok := remotes[remoteName]
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("%w: '%s'", env.ErrRemoteNotFound, remoteName)
|
||||
}
|
||||
|
||||
ddb, err := rem.GetRemoteDB(ctx, types.Format_Default)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
pushHook := doltdb.NewPushOnWriteHook(ddb, dEnv.TempTableFilesDir())
|
||||
return pushHook, nil
|
||||
}
|
||||
|
||||
// GetCommitHooks creates a list of hooks to execute on database commit. If doltdb.SkipReplicationErrorsKey is set,
|
||||
// replace misconfigured hooks with doltdb.LogHook instances that prints a warning when trying to execute.
|
||||
func GetCommitHooks(ctx context.Context, dEnv *env.DoltEnv) ([]datas.CommitHook, error) {
|
||||
postCommitHooks := make([]datas.CommitHook, 0)
|
||||
var skipErrors bool
|
||||
if _, val, ok := sql.SystemVariables.GetGlobal(dsqle.SkipReplicationErrorsKey); !ok {
|
||||
return nil, sql.ErrUnknownSystemVariable.New(dsqle.SkipReplicationErrorsKey)
|
||||
} else if val == int8(1) {
|
||||
skipErrors = true
|
||||
}
|
||||
|
||||
if hook, err := getPushOnWriteHook(ctx, dEnv); err != nil {
|
||||
err = fmt.Errorf("failure loading hook; %w", err)
|
||||
if skipErrors {
|
||||
postCommitHooks = append(postCommitHooks, doltdb.NewLogHook([]byte(err.Error()+"\n")))
|
||||
} else {
|
||||
return nil, err
|
||||
}
|
||||
} else if hook != nil {
|
||||
postCommitHooks = append(postCommitHooks, hook)
|
||||
}
|
||||
|
||||
return postCommitHooks, nil
|
||||
}
|
||||
|
||||
// CollectDBs takes a MultiRepoEnv and creates Database objects from each environment and returns a slice of these
|
||||
// objects.
|
||||
func CollectDBs(ctx context.Context, mrEnv *env.MultiRepoEnv) ([]dsqle.SqlDatabase, error) {
|
||||
var dbs []dsqle.SqlDatabase
|
||||
var db dsqle.SqlDatabase
|
||||
err := mrEnv.Iter(func(name string, dEnv *env.DoltEnv) (stop bool, err error) {
|
||||
postCommitHooks, err := env.GetCommitHooks(ctx, dEnv)
|
||||
postCommitHooks, err := GetCommitHooks(ctx, dEnv)
|
||||
if err != nil {
|
||||
return true, err
|
||||
}
|
||||
@@ -552,13 +605,12 @@ func CollectDBs(ctx context.Context, mrEnv *env.MultiRepoEnv) ([]dsqle.SqlDataba
|
||||
|
||||
db = newDatabase(name, dEnv)
|
||||
|
||||
if _, val, ok := sql.SystemVariables.GetGlobal(doltdb.DoltReadReplicaKey); ok && val != "" {
|
||||
remoteName, ok := val.(string)
|
||||
if _, remote, ok := sql.SystemVariables.GetGlobal(dsqle.ReadReplicaRemoteKey); ok && remote != "" {
|
||||
remoteName, ok := remote.(string)
|
||||
if !ok {
|
||||
return true, sql.ErrInvalidSystemVariableValue.New(val)
|
||||
return true, sql.ErrInvalidSystemVariableValue.New(remote)
|
||||
}
|
||||
|
||||
db, err = dsqle.NewReadReplicaDatabase(ctx, db.(dsqle.Database), remoteName, dEnv.RepoStateReader(), dEnv.TempTableFilesDir(), doltdb.TodoWorkingSetMeta())
|
||||
db, err = newReplicaDatabase(ctx, name, remoteName, dEnv)
|
||||
if err != nil {
|
||||
return true, err
|
||||
}
|
||||
@@ -1184,7 +1236,7 @@ func processBatchQuery(ctx *sql.Context, query string, se *sqlEngine) error {
|
||||
}
|
||||
|
||||
currentBatchMode := invalidBatchMode
|
||||
if v, err := ctx.GetSessionVariable(ctx, currentBatchModeKey); err == nil {
|
||||
if v, err := ctx.GetSessionVariable(ctx, dsqle.CurrentBatchModeKey); err == nil {
|
||||
currentBatchMode = batchMode(v.(int64))
|
||||
} else {
|
||||
return err
|
||||
@@ -1204,7 +1256,7 @@ func processBatchQuery(ctx *sql.Context, query string, se *sqlEngine) error {
|
||||
}
|
||||
}
|
||||
|
||||
err = ctx.SetSessionVariable(ctx, currentBatchModeKey, int64(newBatchMode))
|
||||
err = ctx.SetSessionVariable(ctx, dsqle.CurrentBatchModeKey, int64(newBatchMode))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -1507,6 +1559,11 @@ func newSqlEngine(
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// this is overwritten only for server sessions
|
||||
for _, db := range dbs {
|
||||
db.DbData().Ddb.SetCommitHookLogger(ctx, cli.CliOut)
|
||||
}
|
||||
|
||||
// TODO: this should just be the session default like it is with MySQL
|
||||
err = sess.SetSessionVariable(sql.NewContext(ctx), sql.AutoCommitSessionVar, true)
|
||||
if err != nil {
|
||||
|
||||
@@ -18,11 +18,14 @@ import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"github.com/dolthub/go-mysql-server/sql"
|
||||
"github.com/google/uuid"
|
||||
"github.com/stretchr/testify/assert"
|
||||
|
||||
"github.com/dolthub/dolt/go/libraries/doltcore/doltdb"
|
||||
"github.com/dolthub/dolt/go/libraries/doltcore/dtestutils"
|
||||
"github.com/dolthub/dolt/go/libraries/doltcore/row"
|
||||
"github.com/dolthub/dolt/go/libraries/doltcore/sqle"
|
||||
"github.com/dolthub/dolt/go/libraries/doltcore/table"
|
||||
"github.com/dolthub/dolt/go/store/types"
|
||||
)
|
||||
@@ -559,3 +562,21 @@ func TestDelete(t *testing.T) {
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestCommitHooksNoErrors(t *testing.T) {
|
||||
dEnv := dtestutils.CreateEnvWithSeedData(t)
|
||||
sqle.AddDoltSystemVariables()
|
||||
sql.SystemVariables.SetGlobal(sqle.SkipReplicationErrorsKey, true)
|
||||
sql.SystemVariables.SetGlobal(sqle.ReplicateToRemoteKey, "unknown")
|
||||
hooks, err := GetCommitHooks(context.Background(), dEnv)
|
||||
assert.NoError(t, err)
|
||||
if len(hooks) < 1 {
|
||||
t.Error("failed to produce noop hook")
|
||||
} else {
|
||||
switch h := hooks[0].(type) {
|
||||
case *doltdb.LogHook:
|
||||
default:
|
||||
t.Errorf("expected LogHook, found: %s", h)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -228,7 +228,7 @@ func getDbStates(ctx context.Context, dbs []dsqle.SqlDatabase) ([]dsess.InitialD
|
||||
var init dsess.InitialDbState
|
||||
var err error
|
||||
|
||||
_, val, ok := sql.SystemVariables.GetGlobal(dsess.DoltDefaultBranchKey)
|
||||
_, val, ok := sql.SystemVariables.GetGlobal(dsqle.DefaultBranchKey)
|
||||
if ok && val != "" {
|
||||
init, err = GetInitialDBStateWithDefaultBranch(ctx, db, val.(string))
|
||||
} else {
|
||||
|
||||
@@ -26,10 +26,10 @@ import (
|
||||
"github.com/stretchr/testify/require"
|
||||
"golang.org/x/net/context"
|
||||
|
||||
"github.com/dolthub/dolt/go/libraries/doltcore/doltdb"
|
||||
"github.com/dolthub/dolt/go/libraries/doltcore/dtestutils"
|
||||
"github.com/dolthub/dolt/go/libraries/doltcore/dtestutils/testcommands"
|
||||
"github.com/dolthub/dolt/go/libraries/doltcore/env"
|
||||
"github.com/dolthub/dolt/go/libraries/doltcore/sqle"
|
||||
"github.com/dolthub/dolt/go/libraries/doltcore/sqle/dsess"
|
||||
"github.com/dolthub/dolt/go/libraries/utils/config"
|
||||
)
|
||||
@@ -399,7 +399,7 @@ func TestReadReplica(t *testing.T) {
|
||||
if !ok {
|
||||
t.Fatal("local config does not exist")
|
||||
}
|
||||
config.NewPrefixConfig(localCfg, env.SqlServerGlobalsPrefix).SetStrings(map[string]string{doltdb.DoltReadReplicaKey: "remote1"})
|
||||
config.NewPrefixConfig(localCfg, env.SqlServerGlobalsPrefix).SetStrings(map[string]string{sqle.ReadReplicaRemoteKey: "remote1"})
|
||||
dsess.InitPersistedSystemVars(multiSetup.MrEnv.GetEnv(readReplicaDbName))
|
||||
|
||||
// start server as read replica
|
||||
|
||||
@@ -23,44 +23,41 @@ import (
|
||||
"github.com/dolthub/dolt/go/store/datas"
|
||||
)
|
||||
|
||||
const (
|
||||
ReplicateToRemoteKey = "dolt_replicate_to_remote"
|
||||
DoltReadReplicaKey = "dolt_read_replica_remote"
|
||||
)
|
||||
|
||||
type ReplicateHook struct {
|
||||
type PushOnWriteHook struct {
|
||||
destDB datas.Database
|
||||
tmpDir string
|
||||
outf io.Writer
|
||||
out io.Writer
|
||||
}
|
||||
|
||||
// NewReplicateHook creates a ReplicateHook, parameterizaed by the backup database
|
||||
var _ datas.CommitHook = (*PushOnWriteHook)(nil)
|
||||
|
||||
// NewPushOnWriteHook creates a ReplicateHook, parameterizaed by the backup database
|
||||
// and a local tempfile for pushing
|
||||
func NewReplicateHook(destDB *DoltDB, tmpDir string) *ReplicateHook {
|
||||
return &ReplicateHook{destDB: destDB.db, tmpDir: tmpDir}
|
||||
func NewPushOnWriteHook(destDB *DoltDB, tmpDir string) *PushOnWriteHook {
|
||||
return &PushOnWriteHook{destDB: destDB.db, tmpDir: tmpDir}
|
||||
}
|
||||
|
||||
// Execute implements datas.CommitHook, replicates head updates to the destDb field
|
||||
func (rh *ReplicateHook) Execute(ctx context.Context, ds datas.Dataset, db datas.Database) error {
|
||||
return replicate(ctx, rh.destDB, db, rh.tmpDir, ds)
|
||||
func (rh *PushOnWriteHook) Execute(ctx context.Context, ds datas.Dataset, db datas.Database) error {
|
||||
return pushDataset(ctx, rh.destDB, db, rh.tmpDir, ds)
|
||||
}
|
||||
|
||||
// HandleError implements datas.CommitHook
|
||||
func (rh *ReplicateHook) HandleError(ctx context.Context, err error) error {
|
||||
if rh.outf != nil {
|
||||
rh.outf.Write([]byte(err.Error()))
|
||||
func (rh *PushOnWriteHook) HandleError(ctx context.Context, err error) error {
|
||||
if rh.out != nil {
|
||||
rh.out.Write([]byte(err.Error()))
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// SetLogger implements datas.CommitHook
|
||||
func (rh *ReplicateHook) SetLogger(ctx context.Context, wr io.Writer) error {
|
||||
rh.outf = wr
|
||||
func (rh *PushOnWriteHook) SetLogger(ctx context.Context, wr io.Writer) error {
|
||||
rh.out = wr
|
||||
return nil
|
||||
}
|
||||
|
||||
// replicate pushes a dataset from srcDB to destDB and force sets the destDB ref to the new dataset value
|
||||
func replicate(ctx context.Context, destDB, srcDB datas.Database, tempTableDir string, ds datas.Dataset) error {
|
||||
func pushDataset(ctx context.Context, destDB, srcDB datas.Database, tempTableDir string, ds datas.Dataset) error {
|
||||
stRef, ok, err := ds.MaybeHeadRef()
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -99,3 +96,38 @@ func replicate(ctx context.Context, destDB, srcDB datas.Database, tempTableDir s
|
||||
_, err = destDB.SetHead(ctx, ds, stRef)
|
||||
return err
|
||||
}
|
||||
|
||||
type LogHook struct {
|
||||
msg []byte
|
||||
out io.Writer
|
||||
}
|
||||
|
||||
var _ datas.CommitHook = (*LogHook)(nil)
|
||||
|
||||
// NewLogHook is a noop that logs to a writer when invoked
|
||||
func NewLogHook(msg []byte) *LogHook {
|
||||
return &LogHook{msg: msg}
|
||||
}
|
||||
|
||||
// Execute implements datas.CommitHook, writes message to log channel
|
||||
func (lh *LogHook) Execute(ctx context.Context, ds datas.Dataset, db datas.Database) error {
|
||||
if lh.out != nil {
|
||||
_, err := lh.out.Write(lh.msg)
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// HandleError implements datas.CommitHook
|
||||
func (lh *LogHook) HandleError(ctx context.Context, err error) error {
|
||||
if lh.out != nil {
|
||||
lh.out.Write([]byte(err.Error()))
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// SetLogger implements datas.CommitHook
|
||||
func (lh *LogHook) SetLogger(ctx context.Context, wr io.Writer) error {
|
||||
lh.out = wr
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -33,7 +33,7 @@ import (
|
||||
|
||||
const defaultBranch = "main"
|
||||
|
||||
func TestReplicateHook(t *testing.T) {
|
||||
func TestPushOnWriteHook(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
|
||||
// destination repo
|
||||
@@ -121,10 +121,10 @@ func TestReplicateHook(t *testing.T) {
|
||||
}
|
||||
|
||||
// setup hook
|
||||
hook := NewReplicateHook(destDB, tmpDir)
|
||||
hook := NewPushOnWriteHook(destDB, tmpDir)
|
||||
ddb.SetCommitHooks(ctx, []datas.CommitHook{hook})
|
||||
|
||||
t.Run("replicate to backup remote", func(t *testing.T) {
|
||||
t.Run("replicate to remote", func(t *testing.T) {
|
||||
srcCommit, err := ddb.Commit(context.Background(), valHash, ref.NewBranchRef(defaultBranch), meta)
|
||||
ds, err := ddb.db.GetDataset(ctx, "refs/heads/main")
|
||||
err = hook.Execute(ctx, ds, ddb.db)
|
||||
@@ -149,3 +149,17 @@ func TestReplicateHook(t *testing.T) {
|
||||
assert.Equal(t, buffer.String(), msg)
|
||||
})
|
||||
}
|
||||
|
||||
func TestLogHook(t *testing.T) {
|
||||
msg := []byte("hello")
|
||||
var err error
|
||||
t.Run("new log hook", func(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
hook := NewLogHook(msg)
|
||||
var buffer = &bytes.Buffer{}
|
||||
err = hook.SetLogger(ctx, buffer)
|
||||
assert.NoError(t, err)
|
||||
hook.Execute(ctx, datas.Dataset{}, nil)
|
||||
assert.Equal(t, buffer.Bytes(), msg)
|
||||
})
|
||||
}
|
||||
|
||||
33
go/libraries/doltcore/env/environment.go
vendored
33
go/libraries/doltcore/env/environment.go
vendored
@@ -25,7 +25,6 @@ import (
|
||||
"time"
|
||||
"unicode"
|
||||
|
||||
"github.com/dolthub/go-mysql-server/sql"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/credentials"
|
||||
|
||||
@@ -39,7 +38,6 @@ import (
|
||||
"github.com/dolthub/dolt/go/libraries/doltcore/table/editor"
|
||||
"github.com/dolthub/dolt/go/libraries/utils/config"
|
||||
"github.com/dolthub/dolt/go/libraries/utils/filesys"
|
||||
"github.com/dolthub/dolt/go/store/datas"
|
||||
"github.com/dolthub/dolt/go/store/hash"
|
||||
"github.com/dolthub/dolt/go/store/types"
|
||||
)
|
||||
@@ -58,37 +56,6 @@ const (
|
||||
tempTablesDir = "temptf"
|
||||
)
|
||||
|
||||
func GetCommitHooks(ctx context.Context, dEnv *DoltEnv) ([]datas.CommitHook, error) {
|
||||
postCommitHooks := make([]datas.CommitHook, 0)
|
||||
if _, val, ok := sql.SystemVariables.GetGlobal(doltdb.ReplicateToRemoteKey); ok && val != "" {
|
||||
backupName, ok := val.(string)
|
||||
if !ok {
|
||||
return nil, sql.ErrInvalidSystemVariableValue.New(val)
|
||||
}
|
||||
|
||||
remotes, err := dEnv.GetRemotes()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
rem, ok := remotes[backupName]
|
||||
if !ok {
|
||||
return nil, ErrRemoteNotFound
|
||||
}
|
||||
ddb, err := rem.GetRemoteDB(ctx, types.Format_Default)
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
replicateHook := doltdb.NewReplicateHook(ddb, dEnv.TempTableFilesDir())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
postCommitHooks = append(postCommitHooks, replicateHook)
|
||||
}
|
||||
|
||||
return postCommitHooks, nil
|
||||
}
|
||||
|
||||
var zeroHashStr = (hash.Hash{}).String()
|
||||
|
||||
var ErrPreexistingDoltDir = errors.New(".dolt dir already exists")
|
||||
|
||||
@@ -65,7 +65,10 @@ func DbsAsDSQLDBs(dbs []sql.Database) []SqlDatabase {
|
||||
switch v := sqlDb.(type) {
|
||||
case ReadReplicaDatabase, Database:
|
||||
dsqlDBs = append(dsqlDBs, v)
|
||||
case ReadOnlyDatabase, *UserSpaceDatabase:
|
||||
default:
|
||||
// esoteric analyzer errors occur if we silently drop databases, usually caused by pointer receivers
|
||||
panic("cannot cast to SqlDatabase")
|
||||
}
|
||||
}
|
||||
return dsqlDBs
|
||||
|
||||
@@ -144,12 +144,18 @@ func getPersistedValue(conf config.ReadableConfig, k string) (interface{}, error
|
||||
|
||||
var res interface{}
|
||||
switch value.(type) {
|
||||
case int, int8, int16, int32, int64:
|
||||
case int8:
|
||||
var tmp int64
|
||||
tmp, err = strconv.ParseInt(v, 10, 8)
|
||||
res = int8(tmp)
|
||||
case int, int16, int32, int64:
|
||||
res, err = strconv.ParseInt(v, 10, 64)
|
||||
case uint, uint8, uint16, uint32, uint64:
|
||||
res, err = strconv.ParseUint(v, 10, 64)
|
||||
case float32, float64:
|
||||
res, err = strconv.ParseFloat(v, 64)
|
||||
case bool:
|
||||
return nil, sql.ErrInvalidType.New(value)
|
||||
case string:
|
||||
return v, nil
|
||||
default:
|
||||
@@ -192,6 +198,8 @@ func setPersistedValue(conf config.WritableConfig, key string, value interface{}
|
||||
return config.SetFloat(conf, key, v)
|
||||
case string:
|
||||
return config.SetString(conf, key, v)
|
||||
case bool:
|
||||
return sql.ErrInvalidType.New(v)
|
||||
default:
|
||||
return sql.ErrInvalidType.New(v)
|
||||
}
|
||||
|
||||
@@ -136,6 +136,16 @@ func TestSetPersistedValue(t *testing.T) {
|
||||
Name: "string",
|
||||
Value: "7",
|
||||
},
|
||||
{
|
||||
Name: "bool",
|
||||
Value: true,
|
||||
Err: sql.ErrInvalidType,
|
||||
},
|
||||
{
|
||||
Name: "bool",
|
||||
Value: false,
|
||||
Err: sql.ErrInvalidType,
|
||||
},
|
||||
{
|
||||
Value: complex64(7),
|
||||
Err: sql.ErrInvalidType,
|
||||
@@ -160,32 +170,67 @@ func TestSetPersistedValue(t *testing.T) {
|
||||
func TestGetPersistedValue(t *testing.T) {
|
||||
tests := []struct {
|
||||
Name string
|
||||
Value string
|
||||
ExpectedRes interface{}
|
||||
Err *errors.Kind
|
||||
Err bool
|
||||
}{
|
||||
{
|
||||
Name: "long_query_time",
|
||||
Value: "7",
|
||||
ExpectedRes: float64(7),
|
||||
},
|
||||
{
|
||||
Name: "tls_ciphersuites",
|
||||
Value: "7",
|
||||
ExpectedRes: "7",
|
||||
},
|
||||
{
|
||||
Name: "max_connections",
|
||||
Value: "7",
|
||||
ExpectedRes: int64(7),
|
||||
},
|
||||
{
|
||||
Name: "tmp_table_size",
|
||||
Value: "7",
|
||||
ExpectedRes: uint64(7),
|
||||
},
|
||||
{
|
||||
Name: "activate_all_roles_on_login",
|
||||
Value: "true",
|
||||
Err: true,
|
||||
},
|
||||
{
|
||||
Name: "activate_all_roles_on_login",
|
||||
Value: "on",
|
||||
Err: true,
|
||||
},
|
||||
{
|
||||
Name: "activate_all_roles_on_login",
|
||||
Value: "1",
|
||||
ExpectedRes: int8(1),
|
||||
},
|
||||
{
|
||||
Name: "activate_all_roles_on_login",
|
||||
Value: "false",
|
||||
Err: true,
|
||||
},
|
||||
{
|
||||
Name: "activate_all_roles_on_login",
|
||||
Value: "off",
|
||||
Err: true,
|
||||
},
|
||||
{
|
||||
Name: "activate_all_roles_on_login",
|
||||
Value: "0",
|
||||
ExpectedRes: int8(0),
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.Name, func(t *testing.T) {
|
||||
conf := config.NewMapConfig(map[string]string{tt.Name: "7"})
|
||||
if val, err := getPersistedValue(conf, tt.Name); tt.Err != nil {
|
||||
assert.True(t, tt.Err.Is(err))
|
||||
conf := config.NewMapConfig(map[string]string{tt.Name: tt.Value})
|
||||
if val, err := getPersistedValue(conf, tt.Name); tt.Err {
|
||||
assert.Error(t, err)
|
||||
} else {
|
||||
assert.Equal(t, tt.ExpectedRes, val)
|
||||
}
|
||||
|
||||
@@ -44,7 +44,6 @@ const (
|
||||
DoltCommitOnTransactionCommit = "dolt_transaction_commit"
|
||||
TransactionsDisabledSysVar = "dolt_transactions_disabled"
|
||||
ForceTransactionCommit = "dolt_force_transaction_commit"
|
||||
DoltDefaultBranchKey = "dolt_default_branch"
|
||||
)
|
||||
|
||||
const NonpersistableSessionCode = 1105 // default
|
||||
|
||||
@@ -16,6 +16,8 @@ package sqle
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
|
||||
"github.com/dolthub/go-mysql-server/sql"
|
||||
|
||||
@@ -45,9 +47,11 @@ var _ sql.TriggerDatabase = &ReadReplicaDatabase{}
|
||||
var _ sql.StoredProcedureDatabase = ReadReplicaDatabase{}
|
||||
var _ sql.TransactionDatabase = ReadReplicaDatabase{}
|
||||
|
||||
var ErrFailedToLoadReplicaDB = errors.New("failed to load replica database")
|
||||
|
||||
var EmptyReadReplica = ReadReplicaDatabase{}
|
||||
|
||||
func NewReadReplicaDatabase(ctx context.Context, db Database, remoteName string, rsr env.RepoStateReader, tmpDir string, meta *doltdb.WorkingSetMeta) (ReadReplicaDatabase, error) {
|
||||
func NewReadReplicaDatabase(ctx context.Context, db Database, remoteName string, rsr env.RepoStateReader, tmpDir string) (ReadReplicaDatabase, error) {
|
||||
remotes, err := rsr.GetRemotes()
|
||||
if err != nil {
|
||||
return EmptyReadReplica, err
|
||||
@@ -55,7 +59,7 @@ func NewReadReplicaDatabase(ctx context.Context, db Database, remoteName string,
|
||||
|
||||
remote, ok := remotes[remoteName]
|
||||
if !ok {
|
||||
return EmptyReadReplica, env.ErrRemoteNotFound
|
||||
return EmptyReadReplica, fmt.Errorf("%w: '%s'", env.ErrRemoteNotFound, remoteName)
|
||||
}
|
||||
|
||||
srcDB, err := remote.GetRemoteDB(ctx, types.Format_Default)
|
||||
@@ -91,9 +95,13 @@ func NewReadReplicaDatabase(ctx context.Context, db Database, remoteName string,
|
||||
}
|
||||
|
||||
func (rrd ReadReplicaDatabase) StartTransaction(ctx *sql.Context, tCharacteristic sql.TransactionCharacteristic) (sql.Transaction, error) {
|
||||
err := rrd.pullFromReplica(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
if rrd.srcDB != nil {
|
||||
err := rrd.pullFromReplica(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
} else {
|
||||
ctx.GetLogger().Warn("replication failure; dolt_replication_remote value is misconfigured")
|
||||
}
|
||||
return rrd.Database.StartTransaction(ctx, tCharacteristic)
|
||||
}
|
||||
|
||||
72
go/libraries/doltcore/sqle/system_variables.go
Normal file
72
go/libraries/doltcore/sqle/system_variables.go
Normal file
@@ -0,0 +1,72 @@
|
||||
// Copyright 2021 Dolthub, Inc.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package sqle
|
||||
|
||||
import (
|
||||
"github.com/dolthub/go-mysql-server/sql"
|
||||
)
|
||||
|
||||
const (
|
||||
DefaultBranchKey = "dolt_default_branch"
|
||||
ReplicateToRemoteKey = "dolt_replicate_to_remote"
|
||||
ReadReplicaRemoteKey = "dolt_read_replica_remote"
|
||||
SkipReplicationErrorsKey = "dolt_skip_replication_errors"
|
||||
CurrentBatchModeKey = "batch_mode"
|
||||
)
|
||||
|
||||
func AddDoltSystemVariables() {
|
||||
sql.SystemVariables.AddSystemVariables([]sql.SystemVariable{
|
||||
{
|
||||
Name: CurrentBatchModeKey,
|
||||
Scope: sql.SystemVariableScope_Session,
|
||||
Dynamic: true,
|
||||
SetVarHintApplies: false,
|
||||
Type: sql.NewSystemIntType(CurrentBatchModeKey, -9223372036854775808, 9223372036854775807, false),
|
||||
Default: int64(0),
|
||||
},
|
||||
{
|
||||
Name: DefaultBranchKey,
|
||||
Scope: sql.SystemVariableScope_Global,
|
||||
Dynamic: true,
|
||||
SetVarHintApplies: false,
|
||||
Type: sql.NewSystemStringType(DefaultBranchKey),
|
||||
Default: "",
|
||||
},
|
||||
{
|
||||
Name: ReplicateToRemoteKey,
|
||||
Scope: sql.SystemVariableScope_Global,
|
||||
Dynamic: true,
|
||||
SetVarHintApplies: false,
|
||||
Type: sql.NewSystemStringType(ReplicateToRemoteKey),
|
||||
Default: "",
|
||||
},
|
||||
{
|
||||
Name: ReadReplicaRemoteKey,
|
||||
Scope: sql.SystemVariableScope_Global,
|
||||
Dynamic: true,
|
||||
SetVarHintApplies: false,
|
||||
Type: sql.NewSystemStringType(ReadReplicaRemoteKey),
|
||||
Default: "",
|
||||
},
|
||||
{
|
||||
Name: SkipReplicationErrorsKey,
|
||||
Scope: sql.SystemVariableScope_Global,
|
||||
Dynamic: true,
|
||||
SetVarHintApplies: false,
|
||||
Type: sql.NewSystemBoolType(SkipReplicationErrorsKey),
|
||||
Default: int8(0),
|
||||
},
|
||||
})
|
||||
}
|
||||
@@ -28,9 +28,9 @@ import (
|
||||
"golang.org/x/sync/errgroup"
|
||||
|
||||
srv "github.com/dolthub/dolt/go/cmd/dolt/commands/sqlserver"
|
||||
"github.com/dolthub/dolt/go/libraries/doltcore/doltdb"
|
||||
"github.com/dolthub/dolt/go/libraries/doltcore/dtestutils/testcommands"
|
||||
"github.com/dolthub/dolt/go/libraries/doltcore/env"
|
||||
"github.com/dolthub/dolt/go/libraries/doltcore/sqle"
|
||||
)
|
||||
|
||||
type query string
|
||||
@@ -120,7 +120,7 @@ func getEnvAndConfig(ctx context.Context, b *testing.B) (dEnv *env.DoltEnv, cfg
|
||||
if !ok {
|
||||
b.Fatal("local config does not exist")
|
||||
}
|
||||
localCfg.SetStrings(map[string]string{doltdb.ReplicateToRemoteKey: "remote1"})
|
||||
localCfg.SetStrings(map[string]string{sqle.ReplicateToRemoteKey: "remote1"})
|
||||
|
||||
yaml := []byte(fmt.Sprintf(`
|
||||
log_level: warning
|
||||
|
||||
@@ -99,3 +99,58 @@ teardown() {
|
||||
[ "${#lines[@]}" -eq 2 ]
|
||||
[[ "$output" =~ "t1" ]] || false
|
||||
}
|
||||
|
||||
@test "replication: no remote error" {
|
||||
cd repo1
|
||||
dolt config --local --add sqlserver.global.DOLT_REPLICATE_TO_REMOTE unknown
|
||||
run dolt sql -q "create table t1 (a int primary key)"
|
||||
[ "$status" -eq 1 ]
|
||||
[[ ! "$output" =~ "panic" ]] || false
|
||||
[[ "$output" =~ "failure loading hook; remote not found: 'unknown'" ]] || false
|
||||
}
|
||||
|
||||
@test "replication: quiet replication warnings" {
|
||||
cd repo1
|
||||
dolt config --local --add sqlserver.global.dolt_skip_replication_errors 1
|
||||
dolt config --local --add sqlserver.global.DOLT_REPLICATE_TO_REMOTE unknown
|
||||
run dolt sql -q "create table t1 (a int primary key)"
|
||||
[ "$status" -eq 0 ]
|
||||
[[ ! "$output" =~ "remote not found" ]] || false
|
||||
|
||||
run dolt sql -q "select dolt_commit('-am', 'cm')"
|
||||
[ "$status" -eq 0 ]
|
||||
[[ "$output" =~ "failure loading hook; remote not found: 'unknown'" ]] || false
|
||||
[[ "$output" =~ "dolt_commit('-am', 'cm')" ]] || false
|
||||
}
|
||||
|
||||
@test "replication: bad source doesn't error during non-transactional commands" {
|
||||
cd repo1
|
||||
dolt config --local --add sqlserver.global.DOLT_READ_REPLICA_REMOTE unknown
|
||||
|
||||
run dolt status
|
||||
[ "$status" -eq 0 ]
|
||||
[[ ! "$output" =~ "remote not found: 'unknown'" ]] || false
|
||||
}
|
||||
|
||||
@test "replication: replica sink errors" {
|
||||
cd repo1
|
||||
dolt config --local --add sqlserver.global.DOLT_READ_REPLICA_REMOTE unknown
|
||||
|
||||
run dolt sql -q "show tables"
|
||||
[ "$status" -eq 1 ]
|
||||
[[ ! "$output" =~ "panic" ]]
|
||||
[[ "$output" =~ "remote not found: 'unknown'" ]] || false
|
||||
}
|
||||
|
||||
@test "replication: replica sink quiet warning" {
|
||||
cd repo1
|
||||
dolt config --local --add sqlserver.global.DOLT_READ_REPLICA_REMOTE unknown
|
||||
dolt config --local --add sqlserver.global.dolt_skip_replication_errors 1
|
||||
|
||||
run dolt sql -q "show tables"
|
||||
[ "$status" -eq 0 ]
|
||||
[[ ! "$output" =~ "panic" ]]
|
||||
[[ "$output" =~ "remote not found: 'unknown'" ]] || false
|
||||
[[ "$output" =~ "dolt_replication_remote value is misconfigured" ]] || false
|
||||
}
|
||||
|
||||
|
||||
139
integration-tests/bats/sql-server-remotes.bats
Normal file
139
integration-tests/bats/sql-server-remotes.bats
Normal file
@@ -0,0 +1,139 @@
|
||||
#!/usr/bin/env bats
|
||||
load $BATS_TEST_DIRNAME/helper/common.bash
|
||||
load $BATS_TEST_DIRNAME/helper/query-server-common.bash
|
||||
|
||||
make_repo() {
|
||||
mkdir "$1"
|
||||
cd "$1"
|
||||
dolt init
|
||||
cd ..
|
||||
}
|
||||
|
||||
setup() {
|
||||
setup_no_dolt_init
|
||||
make_repo repo1
|
||||
make_repo repo2
|
||||
mkdir rem1
|
||||
}
|
||||
|
||||
teardown() {
|
||||
stop_sql_server
|
||||
teardown_common
|
||||
}
|
||||
|
||||
@test "sql-server-remotes: sql-push --set-remote within session" {
|
||||
skiponwindows "Has dependencies that are missing on the Jenkins Windows installation."
|
||||
|
||||
cd repo1
|
||||
dolt remote add remote1 file://../rem1
|
||||
dolt remote add origin file://../rem1
|
||||
start_sql_server repo1
|
||||
|
||||
dolt push origin main
|
||||
run server_query repo1 1 "select dolt_push() as p" "p\n0"
|
||||
[ "$status" -eq 1 ]
|
||||
[[ "$output" =~ "the current branch has no upstream branch" ]] || false
|
||||
|
||||
server_query repo1 1 "select dolt_push('--set-upstream', 'origin', 'main') as p" "p\n1"
|
||||
|
||||
skip "In-memory branch doesn't track upstream"
|
||||
server_query repo1 1 "select dolt_push() as p" "p\n1"
|
||||
}
|
||||
|
||||
@test "sql-server-remotes: replicate to remote after sql-session commit" {
|
||||
skiponwindows "Has dependencies that are missing on the Jenkins Windows installation."
|
||||
|
||||
cd repo1
|
||||
dolt remote add remote1 file://../rem1
|
||||
dolt config --local --add sqlserver.global.dolt_replicate_to_remote remote1
|
||||
start_sql_server repo1
|
||||
|
||||
multi_query repo1 1 "
|
||||
CREATE TABLE test (
|
||||
pk int primary key
|
||||
);
|
||||
INSERT INTO test VALUES (0),(1),(2);
|
||||
SELECT DOLT_ADD('.');
|
||||
SELECT DOLT_COMMIT('-m', 'Step 1');"
|
||||
|
||||
cd ..
|
||||
dolt clone file://./rem1 repo3
|
||||
cd repo3
|
||||
run dolt sql -q "select * from test" -r csv
|
||||
[ "$status" -eq 0 ]
|
||||
[[ "${lines[0]}" =~ "pk" ]]
|
||||
[[ "${lines[1]}" =~ "0" ]]
|
||||
[[ "${lines[2]}" =~ "1" ]]
|
||||
[[ "${lines[3]}" =~ "2" ]]
|
||||
}
|
||||
|
||||
@test "sql-server-remotes: read-replica pulls new commits on read" {
|
||||
skiponwindows "Has dependencies that are missing on the Jenkins Windows installation."
|
||||
|
||||
cd repo2
|
||||
dolt remote add remote1 file://../rem1
|
||||
dolt push -u remote1 main
|
||||
|
||||
cd ..
|
||||
rm -rf repo1
|
||||
dolt clone file://./rem1 repo1
|
||||
cd repo1
|
||||
dolt remote add remote1 file://../rem1
|
||||
|
||||
cd ../repo2
|
||||
dolt sql -q "create table test (a int)"
|
||||
dolt commit -am "new commit"
|
||||
dolt push -u remote1 main
|
||||
|
||||
cd ../repo1
|
||||
dolt config --local --add sqlserver.global.dolt_read_replica_remote remote1
|
||||
start_sql_server repo1
|
||||
|
||||
server_query repo1 1 "show tables" "Table\ntest"
|
||||
}
|
||||
|
||||
@test "sql-server-remotes: replica remote not found error" {
|
||||
skiponwindows "Has dependencies that are missing on the Jenkins Windows installation."
|
||||
|
||||
cd repo1
|
||||
dolt config --local --add sqlserver.global.dolt_read_replica_remote unknown
|
||||
|
||||
run dolt sql-server
|
||||
[ "$status" -eq 1 ]
|
||||
[[ ! "$output" =~ "panic" ]]
|
||||
[[ "$output" =~ "remote not found: 'unknown'" ]] || false
|
||||
}
|
||||
|
||||
@test "sql-server-remotes: quiet replica warnings" {
|
||||
skiponwindows "Has dependencies that are missing on the Jenkins Windows installation."
|
||||
|
||||
cd repo1
|
||||
dolt config --local --add sqlserver.global.dolt_skip_replication_errors 1
|
||||
dolt config --local --add sqlserver.global.dolt_read_replica_remote unknown
|
||||
start_sql_server repo1
|
||||
|
||||
run server_query repo1 1 "show tables" "Table\n"
|
||||
}
|
||||
|
||||
@test "sql-server-remotes: replication source remote not found error" {
|
||||
skiponwindows "Has dependencies that are missing on the Jenkins Windows installation."
|
||||
|
||||
cd repo1
|
||||
dolt config --local --add sqlserver.global.dolt_replicate_to_remote unknown
|
||||
|
||||
run dolt sql-server
|
||||
[ "$status" -eq 1 ]
|
||||
[[ ! "$output" =~ "panic" ]]
|
||||
[[ "$output" =~ "remote not found: 'unknown'" ]] || false
|
||||
}
|
||||
|
||||
@test "sql-server-remotes: quiet replication source warnings" {
|
||||
skiponwindows "Has dependencies that are missing on the Jenkins Windows installation."
|
||||
|
||||
cd repo1
|
||||
dolt config --local --add sqlserver.global.dolt_skip_replication_errors 1
|
||||
dolt config --local --add sqlserver.global.dolt_replicate_to_remote unknown
|
||||
start_sql_server repo1
|
||||
|
||||
server_query repo1 1 "show tables" "Table\n"
|
||||
}
|
||||
Reference in New Issue
Block a user