Async push for replication (#2369)

* starter code

* async replication prototype

* benchmark async push

* [ga-format-pr] Run go/utils/repofmt/format_repo.sh and go/Godeps/update.sh

* delete hanging line

* add comment for concurrent map access

* move replication into databaseProvider

* top-level wait group for async threads

* fix tests

* standardize wg format

* missing header

* address data race

* make async test shorter

* delete unnecessary lines

* starter code for new sqlEngine interface

* new engine interface with background threads

* brian's comments

* [ga-format-pr] Run go/utils/repofmt/format_repo.sh and go/Godeps/update.sh

* missed a data race

* fix bats

* bad go.sum

* add latest GMS commit

* bump GMS

Co-authored-by: max-hoffman <max-hoffman@users.noreply.github.com>
This commit is contained in:
Maximilian Hoffman
2021-12-11 13:41:58 -08:00
committed by GitHub
parent 57ab97e743
commit 0cbd7e780c
40 changed files with 760 additions and 62 deletions
+2 -2
View File
@@ -69,9 +69,9 @@ func (cmd *trackedCommand) RequiresRepo() bool {
return false
}
func (cmd *trackedCommand) Exec(ctx context.Context, cmdStr string, args []string, dEnv *env.DoltEnv) int {
func (cmd *trackedCommand) Exec(ctx context.Context, commandStr string, args []string, dEnv *env.DoltEnv) int {
cmd.called = true
cmd.cmdStr = cmdStr
cmd.cmdStr = commandStr
cmd.args = args
return 0
}
+1 -1
View File
@@ -67,7 +67,7 @@ func (cmd *DumpDocsCmd) ArgParser() *argparser.ArgParser {
}
// Exec executes the command
func (cmd *DumpDocsCmd) Exec(_ context.Context, commandStr string, args []string, dEnv *env.DoltEnv) int {
func (cmd *DumpDocsCmd) Exec(ctx context.Context, commandStr string, args []string, dEnv *env.DoltEnv) int {
ap := cmd.ArgParser()
help, usage := cli.HelpAndUsagePrinters(cli.GetCommandDocumentation(commandStr, cli.CommandDocumentationContent{}, ap))
+14 -1
View File
@@ -63,12 +63,18 @@ func NewSqlEngine(
return nil, err
}
bThreads := sql.NewBackgroundThreads()
dbs, err = dsqle.ApplyReplicationConfig(ctx, bThreads, mrEnv, cli.CliOut, dbs...)
if err != nil {
return nil, err
}
infoDB := information_schema.NewInformationSchemaDatabase()
all := append(dsqleDBsAsSqlDBs(dbs), infoDB)
pro := dsqle.NewDoltDatabaseProvider(mrEnv.Config(), mrEnv.FileSystem(), all...)
engine := gms.New(analyzer.NewBuilder(pro).WithParallelism(parallelism).Build(), &gms.Config{Auth: au})
engine := gms.New(analyzer.NewBuilder(pro).WithParallelism(parallelism).Build(), &gms.Config{Auth: au}).WithBackgroundThreads(bThreads)
if dbg, ok := os.LookupEnv("DOLT_SQL_DEBUG_LOG"); ok && strings.ToLower(dbg) == "true" {
engine.Analyzer.Debug = true
@@ -216,6 +222,13 @@ func (se *SqlEngine) GetUnderlyingEngine() *gms.Engine {
return se.engine
}
// Close releases resources held by the wrapped go-mysql-server engine.
// It is a no-op when no engine has been constructed.
func (se *SqlEngine) Close() error {
	if se.engine == nil {
		return nil
	}
	return se.engine.Close()
}
func dsqleDBsAsSqlDBs(dbs []dsqle.SqlDatabase) []sql.Database {
sqlDbs := make([]sql.Database, 0, len(dbs))
for _, db := range dbs {
+1
View File
@@ -34,6 +34,7 @@ import (
func CollectDBs(ctx context.Context, mrEnv *env.MultiRepoEnv) ([]sqle.SqlDatabase, error) {
var dbs []sqle.SqlDatabase
var db sqle.SqlDatabase
err := mrEnv.Iter(func(name string, dEnv *env.DoltEnv) (stop bool, err error) {
postCommitHooks, err := GetCommitHooks(ctx, dEnv)
if err != nil {
+3 -1
View File
@@ -251,11 +251,13 @@ func rebaseSqlEngine(ctx context.Context, dEnv *env.DoltEnv, cm *doltdb.Commit)
opts := editor.Options{Deaf: dEnv.DbEaFactory()}
db := dsqle.NewDatabase(dbName, dEnv.DbData(), opts)
pro := dsqle.NewDoltDatabaseProvider(dEnv.Config, dEnv.FS, db)
mrEnv, err := env.DoltEnvAsMultiEnv(ctx, dEnv)
if err != nil {
return nil, nil, err
}
pro := dsqle.NewDoltDatabaseProvider(dEnv.Config, mrEnv.FileSystem(), db)
parallelism := runtime.GOMAXPROCS(0)
azr := analyzer.NewBuilder(pro).WithParallelism(parallelism).Build()
+1 -1
View File
@@ -47,7 +47,7 @@ func (z GenZshCompCmd) Description() string {
return "Creates a zsh autocomp file for all dolt commands"
}
func (z GenZshCompCmd) Exec(_ context.Context, commandStr string, args []string, dEnv *env.DoltEnv) int {
func (z GenZshCompCmd) Exec(ctx context.Context, commandStr string, args []string, dEnv *env.DoltEnv) int {
ap := z.ArgParser()
help, usage := cli.HelpAndUsagePrinters(cli.GetCommandDocumentation(commandStr, cli.CommandDocumentationContent{}, ap))
+1
View File
@@ -141,6 +141,7 @@ func exportSchemas(ctx context.Context, apr *argparser.ArgParseResults, root *do
func exportTblSchema(ctx context.Context, tblName string, root *doltdb.RootValue, wr io.Writer, opts editor.Options) errhand.VerboseError {
sqlCtx, engine, _ := dsqle.PrepareCreateTableStmt(ctx, dsqle.NewUserSpaceDatabase(root, opts))
stmt, err := dsqle.GetCreateTableStmt(sqlCtx, engine, tblName)
if err != nil {
return errhand.VerboseErrorFromError(err)
+1
View File
@@ -285,6 +285,7 @@ func importSchema(ctx context.Context, dEnv *env.DoltEnv, apr *argparser.ArgPars
// inferred schemas have no foreign keys
sqlDb := sqle.NewSingleTableDatabase(tblName, sch, nil, nil)
sqlCtx, engine, _ := sqle.PrepareCreateTableStmt(ctx, sqlDb)
stmt, err := sqle.GetCreateTableStmt(sqlCtx, engine, tblName)
if err != nil {
return errhand.VerboseErrorFromError(err)
+6 -2
View File
@@ -365,6 +365,7 @@ func execShell(
if err != nil {
return errhand.VerboseErrorFromError(err)
}
defer se.Close()
err = runShell(ctx, se, mrEnv)
if err != nil {
@@ -385,6 +386,7 @@ func execBatch(
if err != nil {
return errhand.VerboseErrorFromError(err)
}
defer se.Close()
sqlCtx, err := se.NewContext(ctx)
if err != nil {
@@ -419,6 +421,7 @@ func execMultiStatements(
if err != nil {
return errhand.VerboseErrorFromError(err)
}
defer se.Close()
sqlCtx, err := se.NewContext(ctx)
if err != nil {
@@ -445,6 +448,7 @@ func execQuery(
if err != nil {
return errhand.VerboseErrorFromError(err)
}
defer se.Close()
sqlCtx, err := se.NewContext(ctx)
if err != nil {
@@ -1084,7 +1088,7 @@ func processBatchQuery(ctx *sql.Context, query string, se *engine.SqlEngine) err
}
currentBatchMode := invalidBatchMode
if v, err := ctx.GetSessionVariable(ctx, dsqle.CurrentBatchModeKey); err == nil {
if v, err := ctx.GetSessionVariable(ctx, dsess.CurrentBatchModeKey); err == nil {
currentBatchMode = batchMode(v.(int64))
} else {
return err
@@ -1104,7 +1108,7 @@ func processBatchQuery(ctx *sql.Context, query string, se *engine.SqlEngine) err
}
}
err = ctx.SetSessionVariable(ctx, dsqle.CurrentBatchModeKey, int64(newBatchMode))
err = ctx.SetSessionVariable(ctx, dsess.CurrentBatchModeKey, int64(newBatchMode))
if err != nil {
return err
}
+2 -1
View File
@@ -164,8 +164,9 @@ func Serve(
sqlEngine, err := engine.NewSqlEngine(ctx, mrEnv, engine.FormatTabular, "", serverConf.Auth, serverConfig.AutoCommit())
if err != nil {
return nil, err
return err, nil
}
defer sqlEngine.Close()
mySQLServer, startError = server.NewServer(
serverConf,
+5
View File
@@ -22,6 +22,7 @@ import (
"os/exec"
"strconv"
"strings"
"sync"
"time"
"github.com/fatih/color"
@@ -311,7 +312,11 @@ func runMain() int {
}
start := time.Now()
var wg sync.WaitGroup
ctx, stop := context.WithCancel(ctx)
res := doltCommand.Exec(ctx, "dolt", args, dEnv)
stop()
wg.Wait()
if csMetrics && dEnv.DoltDB != nil {
metricsSummary := dEnv.DoltDB.CSMetricsSummary()
@@ -23,13 +23,14 @@
package eventsapi
import (
reflect "reflect"
sync "sync"
proto "github.com/golang/protobuf/proto"
duration "github.com/golang/protobuf/ptypes/duration"
timestamp "github.com/golang/protobuf/ptypes/timestamp"
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
reflect "reflect"
sync "sync"
)
const (
@@ -4,6 +4,7 @@ package eventsapi
import (
context "context"
grpc "google.golang.org/grpc"
codes "google.golang.org/grpc/codes"
status "google.golang.org/grpc/status"
@@ -23,11 +23,12 @@
package eventsapi
import (
reflect "reflect"
sync "sync"
proto "github.com/golang/protobuf/proto"
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
reflect "reflect"
sync "sync"
)
const (
@@ -21,12 +21,13 @@
package remotesapi
import (
reflect "reflect"
sync "sync"
proto "github.com/golang/protobuf/proto"
timestamp "github.com/golang/protobuf/ptypes/timestamp"
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
reflect "reflect"
sync "sync"
)
const (
@@ -4,6 +4,7 @@ package remotesapi
import (
context "context"
grpc "google.golang.org/grpc"
codes "google.golang.org/grpc/codes"
status "google.golang.org/grpc/status"
@@ -21,11 +21,12 @@
package remotesapi
import (
reflect "reflect"
sync "sync"
proto "github.com/golang/protobuf/proto"
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
reflect "reflect"
sync "sync"
)
const (
@@ -4,6 +4,7 @@ package remotesapi
import (
context "context"
grpc "google.golang.org/grpc"
codes "google.golang.org/grpc/codes"
status "google.golang.org/grpc/status"
+1 -1
View File
@@ -19,7 +19,7 @@ require (
github.com/denisbrodbeck/machineid v1.0.1
github.com/dolthub/dolt/go/gen/proto/dolt/services/eventsapi v0.0.0-20201005193433-3ee972b1d078
github.com/dolthub/fslock v0.0.3
github.com/dolthub/go-mysql-server v0.11.1-0.20211210010708-cabe8547ece9
github.com/dolthub/go-mysql-server v0.11.1-0.20211211201600-84ec026ac27d
github.com/dolthub/ishell v0.0.0-20210205014355-16a4ce758446
github.com/dolthub/mmap-go v1.0.4-0.20201107010347-f9f2a9588a66
github.com/dolthub/sqllogictest/go v0.0.0-20201107003712-816f3ae12d81
+2 -4
View File
@@ -173,10 +173,8 @@ github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZm
github.com/dgryski/go-sip13 v0.0.0-20181026042036-e10d5fee7954/go.mod h1:vAd38F8PWV+bWy6jNmig1y/TA+kYO4g3RSRF0IAv0no=
github.com/dolthub/fslock v0.0.3 h1:iLMpUIvJKMKm92+N1fmHVdxJP5NdyDK5bK7z7Ba2s2U=
github.com/dolthub/fslock v0.0.3/go.mod h1:QWql+P17oAAMLnL4HGB5tiovtDuAjdDTPbuqx7bYfa0=
github.com/dolthub/go-mysql-server v0.11.1-0.20211208235858-69503651e330 h1:sg68DTU/kILE9MGDonGLRmMZ45VJ7+a/cSM7CKQ9IfI=
github.com/dolthub/go-mysql-server v0.11.1-0.20211208235858-69503651e330/go.mod h1:jz6842q6Q+fhvhS4opzQb7dgLMDf2895WXd4UDbCqoI=
github.com/dolthub/go-mysql-server v0.11.1-0.20211210010708-cabe8547ece9 h1:nQiLFVeosujN83EQigI4vBp1g+uSzEHfQaTUsSinRz4=
github.com/dolthub/go-mysql-server v0.11.1-0.20211210010708-cabe8547ece9/go.mod h1:jz6842q6Q+fhvhS4opzQb7dgLMDf2895WXd4UDbCqoI=
github.com/dolthub/go-mysql-server v0.11.1-0.20211211201600-84ec026ac27d h1:lUS1CCm+r0kHcuGs2FD2uope0tqCX1VUYEXF2xwxbuo=
github.com/dolthub/go-mysql-server v0.11.1-0.20211211201600-84ec026ac27d/go.mod h1:jz6842q6Q+fhvhS4opzQb7dgLMDf2895WXd4UDbCqoI=
github.com/dolthub/ishell v0.0.0-20210205014355-16a4ce758446 h1:0ol5pj+QlKUKAtqs1LiPM3ZJKs+rHPgLSsMXmhTrCAM=
github.com/dolthub/ishell v0.0.0-20210205014355-16a4ce758446/go.mod h1:dhGBqcCEfK5kuFmeO5+WOx3hqc1k3M29c1oS/R7N4ms=
github.com/dolthub/jsonpath v0.0.0-20210609232853-d49537a30474 h1:xTrR+l5l+1Lfq0NvhiEsctylXinUMFhhsqaEcl414p8=
+175 -8
View File
@@ -17,10 +17,14 @@ package doltdb
import (
"context"
"io"
"sync"
"time"
"github.com/dolthub/go-mysql-server/sql"
"github.com/dolthub/dolt/go/libraries/doltcore/ref"
"github.com/dolthub/dolt/go/store/datas"
"github.com/dolthub/dolt/go/store/hash"
)
type PushOnWriteHook struct {
@@ -38,21 +42,21 @@ func NewPushOnWriteHook(destDB *DoltDB, tmpDir string) *PushOnWriteHook {
}
// Execute implements datas.CommitHook, replicates head updates to the destDb field
func (rh *PushOnWriteHook) Execute(ctx context.Context, ds datas.Dataset, db datas.Database) error {
return pushDataset(ctx, rh.destDB, db, rh.tmpDir, ds)
func (ph *PushOnWriteHook) Execute(ctx context.Context, ds datas.Dataset, db datas.Database) error {
return pushDataset(ctx, ph.destDB, db, ph.tmpDir, ds)
}
// HandleError implements datas.CommitHook
func (rh *PushOnWriteHook) HandleError(ctx context.Context, err error) error {
if rh.out != nil {
rh.out.Write([]byte(err.Error()))
func (ph *PushOnWriteHook) HandleError(ctx context.Context, err error) error {
if ph.out != nil {
ph.out.Write([]byte(err.Error()))
}
return nil
}
// SetLogger implements datas.CommitHook
func (rh *PushOnWriteHook) SetLogger(ctx context.Context, wr io.Writer) error {
rh.out = wr
func (ph *PushOnWriteHook) SetLogger(ctx context.Context, wr io.Writer) error {
ph.out = wr
return nil
}
@@ -93,6 +97,69 @@ func pushDataset(ctx context.Context, destDB, srcDB datas.Database, tempTableDir
return err
}
// PushArg is one unit of replication work for the async push threads: the
// dataset whose head moved, the database it lives in, and the new head hash.
type PushArg struct {
	ds   datas.Dataset
	db   datas.Database
	hash hash.Hash
}
// AsyncPushOnWriteHook replicates head updates asynchronously: Execute only
// enqueues a PushArg on ch, and background threads (started by
// RunAsyncReplicationThreads) perform the actual pushes.
type AsyncPushOnWriteHook struct {
	out io.Writer    // error log sink; set via SetLogger, may be nil
	ch  chan PushArg // buffered work queue consumed by the replication threads
}
const (
	// asyncPushBufferSize is the capacity of the push channel and the size
	// hint for the head-tracking maps.
	asyncPushBufferSize = 2048
	// asyncPushInterval is how often pending heads are flushed to the replica.
	asyncPushInterval = 500 * time.Millisecond
	// names under which the two replication goroutines are registered with
	// sql.BackgroundThreads
	asyncPushProcessCommit = "async_push_process_commit"
	asyncPushSyncReplica   = "async_push_sync_replica"
)

// compile-time assertion that AsyncPushOnWriteHook implements datas.CommitHook
var _ datas.CommitHook = (*AsyncPushOnWriteHook)(nil)
// NewAsyncPushOnWriteHook creates an AsyncPushOnWriteHook and starts the
// background threads that service its channel, replicating enqueued heads
// to destDB.
func NewAsyncPushOnWriteHook(bThreads *sql.BackgroundThreads, destDB *DoltDB, tmpDir string, logger io.Writer) (*AsyncPushOnWriteHook, error) {
	ch := make(chan PushArg, asyncPushBufferSize)
	err := RunAsyncReplicationThreads(bThreads, ch, destDB, tmpDir, logger)
	if err != nil {
		return nil, err
	}
	// NOTE(review): logger is handed to the background threads only; the
	// hook's out field stays nil until SetLogger is called.
	return &AsyncPushOnWriteHook{ch: ch}, nil
}
// Execute implements datas.CommitHook, replicates head updates to the destDb field.
// It does not push synchronously; it enqueues the dataset's new head for the
// background replication threads.
func (ah *AsyncPushOnWriteHook) Execute(ctx context.Context, ds datas.Dataset, db datas.Database) error {
	rf, ok, err := ds.MaybeHeadRef()
	if err != nil {
		// NOTE(review): the underlying error is discarded and reported as
		// ErrHashNotFound, same as the missing-head case below — confirm this
		// is intended
		return ErrHashNotFound
	}
	if !ok {
		return ErrHashNotFound
	}
	select {
	case ah.ch <- PushArg{ds: ds, db: db, hash: rf.TargetHash()}:
	case <-ctx.Done():
		// ctx was canceled before the buffered send succeeded; still enqueue
		// the head (blocking send) so it is not lost, then surface the
		// cancellation to the caller
		ah.ch <- PushArg{ds: ds, db: db, hash: rf.TargetHash()}
		return ctx.Err()
	}
	return nil
}
// HandleError implements datas.CommitHook; it logs the error to the
// configured writer, if any, and never fails.
func (ah *AsyncPushOnWriteHook) HandleError(ctx context.Context, err error) error {
	if ah.out == nil {
		return nil
	}
	ah.out.Write([]byte(err.Error()))
	return nil
}
// SetLogger implements datas.CommitHook; wr receives replication error
// messages written by HandleError.
func (ah *AsyncPushOnWriteHook) SetLogger(ctx context.Context, wr io.Writer) error {
	ah.out = wr
	return nil
}
type LogHook struct {
msg []byte
out io.Writer
@@ -127,3 +194,103 @@ func (lh *LogHook) SetLogger(ctx context.Context, wr io.Writer) error {
lh.out = wr
return nil
}
// RunAsyncReplicationThreads starts two background threads that replicate
// head updates, received on |ch|, to |destDB|.
//
// The first thread coalesces incoming PushArgs into a map keyed by dataset
// id, so only the latest head per dataset is pushed. We do not track
// sequential commits because push follows historical dependencies. This does
// not account for reset --force, which breaks historical dependence.
//
// The second thread periodically flushes that map, pushing every head that
// differs from the last one replicated. On cancellation the first thread
// signals the second (via newCtx) so it can drain outstanding heads before
// closing |ch| and exiting.
func RunAsyncReplicationThreads(bThreads *sql.BackgroundThreads, ch chan PushArg, destDB *DoltDB, tmpDir string, logger io.Writer) error {
	mu := &sync.Mutex{}
	// newHeads is shared between the two threads; all access goes through mu
	var newHeads = make(map[string]PushArg, asyncPushBufferSize)

	updateHead := func(p PushArg) {
		mu.Lock()
		newHeads[p.ds.ID()] = p
		mu.Unlock()
	}

	// newCtx lets first goroutine drain before the second goroutine finalizes
	newCtx, stop := context.WithCancel(context.Background())

	// The first goroutine amortizes commits into a map keyed by dataset id.
	// When the parent context cancels, this goroutine drains and kills its
	// dependent goroutine.
	err := bThreads.Add(asyncPushProcessCommit, func(ctx context.Context) {
		for {
			select {
			case p, ok := <-ch:
				if !ok {
					return
				}
				updateHead(p)
			case <-ctx.Done():
				stop()
				return
			}
		}
	})
	if err != nil {
		return err
	}

	// getHeadsCopy snapshots the pending heads under lock; nil means there is
	// nothing to push
	getHeadsCopy := func() map[string]PushArg {
		mu.Lock()
		defer mu.Unlock()
		if len(newHeads) == 0 {
			return nil
		}
		newHeadsCopy := make(map[string]PushArg, len(newHeads))
		for k, v := range newHeads {
			newHeadsCopy[k] = v
		}
		return newHeadsCopy
	}

	flush := func(latestHeads map[string]hash.Hash) {
		// fix: the previous version took an unused, shadowing newHeads
		// parameter and re-acquired mu just to test the length of this
		// purely local snapshot; a plain len check needs no lock
		newHeadsCopy := getHeadsCopy()
		if len(newHeadsCopy) == 0 {
			return
		}
		for id, newCm := range newHeadsCopy {
			if latest, ok := latestHeads[id]; !ok || latest != newCm.hash {
				// use background context to drain after sql context is canceled
				err := pushDataset(context.Background(), destDB.db, newCm.db, tmpDir, newCm.ds)
				if err != nil {
					logger.Write([]byte("replication failed: " + err.Error()))
				}
				latestHeads[id] = newCm.hash
			}
		}
	}

	// The second goroutine pushes updates to a remote chunkstore.
	// This goroutine waits for first goroutine to drain before closing
	// the channel and exiting.
	err = bThreads.Add(asyncPushSyncReplica, func(ctx context.Context) {
		defer close(ch)
		var latestHeads = make(map[string]hash.Hash, asyncPushBufferSize)
		ticker := time.NewTicker(asyncPushInterval)
		// fix: the ticker was previously never stopped and kept firing after
		// the thread exited
		defer ticker.Stop()
		for {
			select {
			case <-newCtx.Done():
				flush(latestHeads)
				return
			case <-ticker.C:
				flush(latestHeads)
			}
		}
	})
	return err
}
@@ -21,14 +21,16 @@ import (
"path/filepath"
"testing"
"github.com/dolthub/go-mysql-server/sql"
"github.com/stretchr/testify/assert"
"go.uber.org/zap/buffer"
"github.com/dolthub/dolt/go/libraries/doltcore/dbfactory"
"github.com/dolthub/dolt/go/libraries/doltcore/ref"
"github.com/dolthub/dolt/go/libraries/utils/filesys"
"github.com/dolthub/dolt/go/libraries/utils/test"
"github.com/dolthub/dolt/go/store/datas"
"github.com/dolthub/dolt/go/store/types"
"github.com/stretchr/testify/assert"
)
const defaultBranch = "main"
@@ -70,14 +72,14 @@ func TestPushOnWriteHook(t *testing.T) {
}
ddb, _ := LoadDoltDB(context.Background(), types.Format_Default, LocalDirDoltDB, filesys.LocalFS)
err = ddb.WriteEmptyRepo(context.Background(), "master", committerName, committerEmail)
err = ddb.WriteEmptyRepo(context.Background(), "main", committerName, committerEmail)
if err != nil {
t.Fatal("Unexpected error creating empty repo", err)
}
// prepare a commit in the source repo
cs, _ := NewCommitSpec("master")
cs, _ := NewCommitSpec("main")
commit, err := ddb.Resolve(context.Background(), cs, nil)
if err != nil {
@@ -163,3 +165,99 @@ func TestLogHook(t *testing.T) {
assert.Equal(t, buffer.Bytes(), msg)
})
}
// TestAsyncPushOnWrite exercises AsyncPushOnWriteHook end to end: it builds a
// destination repo and a source repo on the local filesystem, registers an
// async push hook backed by fresh background threads, then enqueues 200 head
// updates through hook.Execute.
func TestAsyncPushOnWrite(t *testing.T) {
	ctx := context.Background()

	// destination repo
	testDir, err := test.ChangeToTestDir("TestReplicationDest")
	if err != nil {
		panic("Couldn't change the working directory to the test directory.")
	}
	committerName := "Bill Billerson"
	committerEmail := "bigbillieb@fake.horse"
	tmpDir := filepath.Join(testDir, dbfactory.DoltDataDir)
	err = filesys.LocalFS.MkDirs(tmpDir)
	if err != nil {
		t.Fatal("Failed to create noms directory")
	}
	destDB, _ := LoadDoltDB(context.Background(), types.Format_Default, LocalDirDoltDB, filesys.LocalFS)

	// source repo (ChangeToTestDir switches the cwd, so this load is isolated
	// from the destination)
	testDir, err = test.ChangeToTestDir("TestReplicationSource")
	if err != nil {
		panic("Couldn't change the working directory to the test directory.")
	}
	tmpDir = filepath.Join(testDir, dbfactory.DoltDataDir)
	err = filesys.LocalFS.MkDirs(tmpDir)
	if err != nil {
		t.Fatal("Failed to create noms directory")
	}
	ddb, _ := LoadDoltDB(context.Background(), types.Format_Default, LocalDirDoltDB, filesys.LocalFS)
	err = ddb.WriteEmptyRepo(context.Background(), "main", committerName, committerEmail)
	if err != nil {
		t.Fatal("Unexpected error creating empty repo", err)
	}

	// setup hook; this also starts the background replication threads
	bThreads := sql.NewBackgroundThreads()
	hook, err := NewAsyncPushOnWriteHook(bThreads, destDB, tmpDir, &buffer.Buffer{})
	if err != nil {
		t.Fatal("Unexpected error creating push hook", err)
	}

	t.Run("replicate to remote", func(t *testing.T) {
		for i := 0; i < 200; i++ {
			cs, _ := NewCommitSpec("main")
			commit, err := ddb.Resolve(context.Background(), cs, nil)
			if err != nil {
				t.Fatal("Couldn't find commit")
			}
			meta, err := commit.GetCommitMeta()
			assert.NoError(t, err)
			if meta.Name != committerName || meta.Email != committerEmail {
				t.Error("Unexpected metadata")
			}
			root, err := commit.GetRootValue()
			assert.NoError(t, err)
			// build a fresh table and write it into a new root value
			tSchema := createTestSchema(t)
			rowData, _ := createTestRowData(t, ddb.db, tSchema)
			tbl, err := CreateTestTable(ddb.db, tSchema, rowData)
			if err != nil {
				t.Fatal("Failed to create test table with data")
			}
			root, err = root.PutTable(context.Background(), "test", tbl)
			assert.NoError(t, err)
			valHash, err := ddb.WriteRootValue(context.Background(), root)
			assert.NoError(t, err)
			meta, err = NewCommitMeta(committerName, committerEmail, "Sample data")
			if err != nil {
				t.Error("Failed to commit")
			}
			// NOTE(review): the errors from Commit, GetDataset, and
			// hook.Execute below are ignored — consider asserting on them
			_, err = ddb.Commit(context.Background(), valHash, ref.NewBranchRef(defaultBranch), meta)
			ds, err := ddb.db.GetDataset(ctx, "refs/heads/main")
			err = hook.Execute(ctx, ds, ddb.db)
		}
	})
}
@@ -58,7 +58,6 @@ func (cmd fvCommand) exec(ctx context.Context, dEnv *env.DoltEnv) int {
// execute the command using |cmd.user|'s Feature Version
doltdb.DoltFeatureVersion = cmd.user.vers
defer func() { doltdb.DoltFeatureVersion = DoltFeatureVersionCopy }()
return cmd.cmd.Exec(ctx, cmd.cmd.Name(), cmd.args, dEnv)
}
@@ -19,6 +19,7 @@ import (
"context"
"fmt"
"io"
"sync"
"testing"
"time"
@@ -348,7 +349,7 @@ type ConflictsCat struct {
func (c ConflictsCat) CommandString() string { return fmt.Sprintf("conflicts_cat: %s", c.TableName) }
// Exec executes a ConflictsCat command on a test dolt environment.
func (c ConflictsCat) Exec(t *testing.T, dEnv *env.DoltEnv) error {
func (c ConflictsCat) Exec(t *testing.T, wg *sync.WaitGroup, dEnv *env.DoltEnv) error {
out := cnfcmds.CatCmd{}.Exec(context.Background(), "dolt conflicts cat", []string{c.TableName}, dEnv)
require.Equal(t, 0, out)
return nil
@@ -288,6 +288,7 @@ func TestKeylessMergeConflicts(t *testing.T) {
t.Run(test.name+"_resolved_ours", func(t *testing.T) {
dEnv := dtu.CreateTestEnv()
setupTest(t, ctx, dEnv, test.setup)
resolve := cnfcmds.ResolveCmd{}
@@ -304,6 +305,7 @@ func TestKeylessMergeConflicts(t *testing.T) {
})
t.Run(test.name+"_resolved_theirs", func(t *testing.T) {
dEnv := dtu.CreateTestEnv()
setupTest(t, ctx, dEnv, test.setup)
resolve := cnfcmds.ResolveCmd{}
@@ -471,6 +471,7 @@ func fkCollection(fks ...doltdb.ForeignKey) *doltdb.ForeignKeyCollection {
func testMergeSchemas(t *testing.T, test mergeSchemaTest) {
dEnv := dtestutils.CreateTestEnv()
ctx := context.Background()
for _, c := range setupCommon {
c.exec(t, ctx, dEnv)
}
@@ -73,6 +73,7 @@ func NewSqlEngineMover(ctx context.Context, dEnv *env.DoltEnv, writeSch schema.S
if err != nil {
return nil, err
}
defer se.Close()
sqlCtx, err := se.NewContext(ctx)
if err != nil {
@@ -203,7 +203,6 @@ func setupFilterBranchTests(t *testing.T) *env.DoltEnv {
func testFilterBranch(t *testing.T, test filterBranchTest) {
ctx := context.Background()
dEnv := setupFilterBranchTests(t)
for _, c := range test.setup {
exitCode := c.cmd.Exec(ctx, c.cmd.Name(), c.args, dEnv)
require.Equal(t, 0, exitCode)
@@ -44,6 +44,7 @@ const (
DoltCommitOnTransactionCommit = "dolt_transaction_commit"
TransactionsDisabledSysVar = "dolt_transactions_disabled"
ForceTransactionCommit = "dolt_force_transaction_commit"
CurrentBatchModeKey = "batch_mode"
)
const NonpersistableSessionCode = 1105 // default
@@ -99,6 +100,14 @@ func init() {
Type: sql.NewSystemBoolType(ForceTransactionCommit),
Default: int8(0),
},
{
Name: CurrentBatchModeKey,
Scope: sql.SystemVariableScope_Session,
Dynamic: true,
SetVarHintApplies: false,
Type: sql.NewSystemIntType(CurrentBatchModeKey, -9223372036854775808, 9223372036854775807, false),
Default: int64(0),
},
})
_, ok := os.LookupEnv(TransactionMergeStompEnvKey)
@@ -58,7 +58,11 @@ var _ enginetest.ReadOnlyDatabaseHarness = (*DoltHarness)(nil)
func newDoltHarness(t *testing.T) *DoltHarness {
dEnv := dtestutils.CreateTestEnv()
pro := sqle.NewDoltDatabaseProvider(dEnv.Config, dEnv.FS).WithDbFactoryUrl(doltdb.InMemDoltDB)
mrEnv, err := env.DoltEnvAsMultiEnv(context.Background(), dEnv)
require.NoError(t, err)
pro := sqle.NewDoltDatabaseProvider(dEnv.Config, mrEnv.FileSystem())
require.NoError(t, err)
pro = pro.WithDbFactoryUrl(doltdb.InMemDoltDB)
localConfig := dEnv.Config.WriteableConfig()
@@ -72,13 +76,14 @@ func newDoltHarness(t *testing.T) *DoltHarness {
}
var defaultSkippedQueries = []string{
"show variables", // we set extra variables
"show create table fk_tbl", // we create an extra key for the FK that vanilla gms does not
"show indexes from", // we create / expose extra indexes (for foreign keys)
"json_arrayagg", // TODO: aggregation ordering
"json_objectagg", // TODO: aggregation ordering
"typestable", // Bit type isn't working?
"dolt_commit_diff_", // see broken queries in `dolt_system_table_queries.go`
"show variables", // we set extra variables
"show create table fk_tbl", // we create an extra key for the FK that vanilla gms does not
"show indexes from", // we create / expose extra indexes (for foreign keys)
"json_arrayagg", // TODO: aggregation ordering
"json_objectagg", // TODO: aggregation ordering
"typestable", // Bit type isn't working?
"dolt_commit_diff_", // see broken queries in `dolt_system_table_queries.go`
"show global variables like", // we set extra variables
}
// WithParallelism returns a copy of the harness with parallelism set to the given number of threads. A value of 0 or
@@ -199,7 +204,10 @@ func (d *DoltHarness) NewReadOnlyDatabases(names ...string) (dbs []sql.ReadOnlyD
}
func (d *DoltHarness) NewDatabaseProvider(dbs ...sql.Database) sql.MutableDatabaseProvider {
return sqle.NewDoltDatabaseProvider(d.env.Config, d.env.FS, dbs...).WithDbFactoryUrl(doltdb.InMemDoltDB)
mrEnv, err := env.DoltEnvAsMultiEnv(context.Background(), d.env)
require.NoError(d.t, err)
pro := sqle.NewDoltDatabaseProvider(d.env.Config, mrEnv.FileSystem(), dbs...)
return pro.WithDbFactoryUrl(doltdb.InMemDoltDB)
}
func getDbState(t *testing.T, db sqle.Database, dEnv *env.DoltEnv) dsess.InitialDbState {
@@ -342,7 +342,14 @@ func schemaToSchemaString(sch sql.Schema) (string, error) {
func sqlNewEngine(dEnv *env.DoltEnv) (*sqle.Engine, error) {
opts := editor.Options{Deaf: dEnv.DbEaFactory()}
db := dsql.NewDatabase("dolt", dEnv.DbData(), opts)
pro := dsql.NewDoltDatabaseProvider(dEnv.Config, dEnv.FS, db)
mrEnv, err := env.DoltEnvAsMultiEnv(context.Background(), dEnv)
if err != nil {
return nil, err
}
pro := dsql.NewDoltDatabaseProvider(dEnv.Config, mrEnv.FileSystem(), db)
pro = pro.WithDbFactoryUrl(doltdb.InMemDoltDB)
engine := sqle.NewDefault(pro)
return engine, nil
@@ -100,7 +100,12 @@ func setupIndexes(t *testing.T, tableName, insertQuery string) (*sqle.Engine, *e
tbl: tbl,
editOpts: opts,
}
pro := NewDoltDatabaseProvider(dEnv.Config, dEnv.FS, tiDb)
mrEnv, err := env.DoltEnvAsMultiEnv(context.Background(), dEnv)
require.NoError(t, err)
pro := NewDoltDatabaseProvider(dEnv.Config, mrEnv.FileSystem(), tiDb)
pro = pro.WithDbFactoryUrl(doltdb.InMemDoltDB)
engine = sqle.NewDefault(pro)
// Get an updated root to use for the rest of the test
@@ -108,7 +108,7 @@ func (rrd ReadReplicaDatabase) PullFromRemote(ctx context.Context) error {
}
switch {
case headsArg != "" && allHeads == int8(1):
case headsArg != "" && allHeads == SysVarTrue:
return fmt.Errorf("%w; cannot set both 'dolt_replicate_heads' and 'dolt_replicate_all_heads'", ErrInvalidReplicateHeadsSetting)
case headsArg != "":
heads, ok := headsArg.(string)
+140
View File
@@ -0,0 +1,140 @@
// Copyright 2021 Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package sqle
import (
"context"
"fmt"
"io"
"github.com/dolthub/go-mysql-server/sql"
"github.com/dolthub/dolt/go/cmd/dolt/cli"
"github.com/dolthub/dolt/go/libraries/doltcore/doltdb"
"github.com/dolthub/dolt/go/libraries/doltcore/env"
"github.com/dolthub/dolt/go/libraries/doltcore/table/editor"
"github.com/dolthub/dolt/go/store/datas"
"github.com/dolthub/dolt/go/store/types"
)
// getPushOnWriteHook returns a datas.CommitHook that replicates new heads to
// the remote named by the ReplicateToRemoteKey system variable, or (nil, nil)
// when replication is not configured. When AsyncReplicationKey is enabled, an
// asynchronous hook backed by |bThreads| is returned instead of the
// synchronous one.
func getPushOnWriteHook(ctx context.Context, bThreads *sql.BackgroundThreads, dEnv *env.DoltEnv, logger io.Writer) (datas.CommitHook, error) {
	_, val, ok := sql.SystemVariables.GetGlobal(ReplicateToRemoteKey)
	if !ok {
		return nil, sql.ErrUnknownSystemVariable.New(ReplicateToRemoteKey)
	} else if val == "" {
		// replication disabled
		return nil, nil
	}

	remoteName, ok := val.(string)
	if !ok {
		return nil, sql.ErrInvalidSystemVariableValue.New(val)
	}

	remotes, err := dEnv.GetRemotes()
	if err != nil {
		return nil, err
	}

	rem, ok := remotes[remoteName]
	if !ok {
		return nil, fmt.Errorf("%w: '%s'", env.ErrRemoteNotFound, remoteName)
	}

	ddb, err := rem.GetRemoteDB(ctx, types.Format_Default)
	if err != nil {
		return nil, err
	}

	// fix: dropped a redundant duplicate GetGlobal(AsyncReplicationKey) call
	// whose results were immediately overwritten by the call below
	if _, val, ok := sql.SystemVariables.GetGlobal(AsyncReplicationKey); ok && val == SysVarTrue {
		return doltdb.NewAsyncPushOnWriteHook(bThreads, ddb, dEnv.TempTableFilesDir(), logger)
	}

	return doltdb.NewPushOnWriteHook(ddb, dEnv.TempTableFilesDir()), nil
}
// GetCommitHooks creates the list of hooks to execute on database commit. If
// doltdb.SkipReplicationErrorsKey is set, a misconfigured hook is replaced by
// a doltdb.LogHook instance that prints a warning when executed.
func GetCommitHooks(ctx context.Context, bThreads *sql.BackgroundThreads, dEnv *env.DoltEnv, logger io.Writer) ([]datas.CommitHook, error) {
	postCommitHooks := make([]datas.CommitHook, 0)

	hook, err := getPushOnWriteHook(ctx, bThreads, dEnv, logger)
	switch {
	case err != nil:
		wrapped := fmt.Errorf("failure loading hook; %w", err)
		if !SkipReplicationWarnings() {
			return nil, wrapped
		}
		postCommitHooks = append(postCommitHooks, doltdb.NewLogHook([]byte(wrapped.Error()+"\n")))
	case hook != nil:
		postCommitHooks = append(postCommitHooks, hook)
	}

	for _, h := range postCommitHooks {
		h.SetLogger(ctx, logger)
	}

	return postCommitHooks, nil
}
// newReplicaDatabase creates a new dsqle.ReadReplicaDatabase. If the
// doltdb.SkipReplicationErrorsKey global variable is set, errors from
// database construction are only logged, and a partially functional
// dsqle.ReadReplicaDatabase is returned that warns when replica commands are
// attempted.
func newReplicaDatabase(ctx context.Context, name string, remoteName string, dEnv *env.DoltEnv) (ReadReplicaDatabase, error) {
	db := NewDatabase(name, dEnv.DbData(), editor.Options{
		Deaf: dEnv.DbEaFactory(),
	})

	rrd, err := NewReadReplicaDatabase(ctx, db, remoteName, dEnv)
	if err == nil {
		return rrd, nil
	}

	err = fmt.Errorf("%w from remote '%s'; %s", ErrFailedToLoadReplicaDB, remoteName, err.Error())
	if !SkipReplicationWarnings() {
		return ReadReplicaDatabase{}, err
	}
	cli.Println(err)
	return ReadReplicaDatabase{Database: db}, nil
}
// ApplyReplicationConfig installs commit hooks on each database's environment
// and, when a read-replica remote is configured via ReadReplicaRemoteKey,
// substitutes the database with a ReadReplicaDatabase. The returned slice
// parallels |dbs| one-to-one. Databases with no matching environment are
// passed through unchanged.
func ApplyReplicationConfig(ctx context.Context, bThreads *sql.BackgroundThreads, mrEnv *env.MultiRepoEnv, logger io.Writer, dbs ...SqlDatabase) ([]SqlDatabase, error) {
	outputDbs := make([]SqlDatabase, len(dbs))
	for i, db := range dbs {
		dEnv := mrEnv.GetEnv(db.Name())
		if dEnv == nil {
			// fix: this branch previously append()ed to the pre-sized slice,
			// growing it beyond len(dbs) and leaving a nil entry at index i
			outputDbs[i] = db
			continue
		}

		postCommitHooks, err := GetCommitHooks(ctx, bThreads, dEnv, logger)
		if err != nil {
			return nil, err
		}
		dEnv.DoltDB.SetCommitHooks(ctx, postCommitHooks)

		if _, remote, ok := sql.SystemVariables.GetGlobal(ReadReplicaRemoteKey); ok && remote != "" {
			remoteName, ok := remote.(string)
			if !ok {
				return nil, sql.ErrInvalidSystemVariableValue.New(remote)
			}
			db, err = newReplicaDatabase(ctx, db.Name(), remoteName, dEnv)
			if err != nil {
				return nil, err
			}
		}

		outputDbs[i] = db
	}
	return outputDbs, nil
}
@@ -0,0 +1,46 @@
// Copyright 2021 Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package sqle
import (
"context"
"testing"
"github.com/dolthub/go-mysql-server/sql"
"github.com/stretchr/testify/assert"
"go.uber.org/zap/buffer"
"github.com/dolthub/dolt/go/libraries/doltcore/doltdb"
"github.com/dolthub/dolt/go/libraries/doltcore/dtestutils"
)
// TestCommitHooksNoErrors verifies that with replication errors skipped and an
// unknown replication remote configured, GetCommitHooks still succeeds and
// falls back to a no-op LogHook as the first hook.
func TestCommitHooksNoErrors(t *testing.T) {
	dEnv := dtestutils.CreateEnvWithSeedData(t)
	AddDoltSystemVariables()
	sql.SystemVariables.SetGlobal(SkipReplicationErrorsKey, true)
	sql.SystemVariables.SetGlobal(ReplicateToRemoteKey, "unknown")

	bThreads := sql.NewBackgroundThreads()
	hooks, err := GetCommitHooks(context.Background(), bThreads, dEnv, &buffer.Buffer{})
	assert.NoError(t, err)

	if len(hooks) < 1 {
		t.Error("failed to produce noop hook")
		return
	}
	if _, ok := hooks[0].(*doltdb.LogHook); !ok {
		t.Errorf("expected LogHook, found: %s", hooks[0])
	}
}
+19 -10
View File
@@ -25,19 +25,20 @@ const (
SkipReplicationErrorsKey = "dolt_skip_replication_errors"
ReplicateHeadsKey = "dolt_replicate_heads"
ReplicateAllHeadsKey = "dolt_replicate_all_heads"
CurrentBatchModeKey = "batch_mode"
AsyncReplicationKey = "dolt_async_replication"
)
const (
SysVarFalse = int8(0)
SysVarTrue = int8(1)
)
func init() {
AddDoltSystemVariables()
}
func AddDoltSystemVariables() {
sql.SystemVariables.AddSystemVariables([]sql.SystemVariable{
{
Name: CurrentBatchModeKey,
Scope: sql.SystemVariableScope_Session,
Dynamic: true,
SetVarHintApplies: false,
Type: sql.NewSystemIntType(CurrentBatchModeKey, -9223372036854775808, 9223372036854775807, false),
Default: int64(0),
},
{
Name: DefaultBranchKey,
Scope: sql.SystemVariableScope_Global,
@@ -86,6 +87,14 @@ func AddDoltSystemVariables() {
Type: sql.NewSystemBoolType(ReplicateAllHeadsKey),
Default: int8(0),
},
{
Name: AsyncReplicationKey,
Scope: sql.SystemVariableScope_Session,
Dynamic: true,
SetVarHintApplies: false,
Type: sql.NewSystemBoolType(AsyncReplicationKey),
Default: int8(0),
},
})
}
@@ -94,5 +103,5 @@ func SkipReplicationWarnings() bool {
if !ok {
panic("dolt system variables not loaded")
}
return skip == int8(1)
return skip == SysVarTrue
}
@@ -0,0 +1,132 @@
// Copyright 2021 Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package serverbench
import (
"context"
"fmt"
"runtime/pprof"
"strings"
"testing"
srv "github.com/dolthub/dolt/go/cmd/dolt/commands/sqlserver"
"github.com/dolthub/dolt/go/libraries/doltcore/dtestutils/testcommands"
"github.com/dolthub/dolt/go/libraries/doltcore/env"
"github.com/dolthub/dolt/go/libraries/doltcore/sqle"
"github.com/dolthub/dolt/go/libraries/doltcore/sqle/dsess"
)
// usage: `go test -bench .`
func BenchmarkAsyncPushOnWrite(b *testing.B) {
setup := make([]query, 1)
setup[0] = "CREATE TABLE bench (a int, b int, c int);"
q := strings.Builder{}
q.WriteString("INSERT INTO bench (a, b, c) VALUES (0, 0, 0)")
i := 1
for i < 1000 {
q.WriteString(fmt.Sprintf(",(%d, %d, %d)", i, i, i))
i++
}
qs := q.String()
bench := make([]query, 100)
commit := query("select dolt_commit('-am', 'cm')")
i = 0
for i < len(bench) {
bench[i] = query(qs)
bench[i+1] = commit
i += 2
}
benchmarkAsyncPush(b, serverTest{
name: "smoke bench",
setup: setup,
bench: bench,
})
}
// benchmarkAsyncPush runs test.setup against a freshly configured
// async-replication sql-server environment, then times test.bench under
// b.Run while recording a CPU profile of the benchmarked queries.
func benchmarkAsyncPush(b *testing.B, test serverTest) {
	var dEnv *env.DoltEnv
	var cfg srv.ServerConfig
	ctx := context.Background()

	// setup
	dEnv, cfg = getAsyncEnvAndConfig(ctx, b)
	dsess.InitPersistedSystemVars(dEnv)
	executeServerQueries(ctx, b, dEnv, cfg, test.setup)

	// bench
	f := getProfFile(b)
	err := pprof.StartCPUProfile(f)
	if err != nil {
		b.Fatal(err)
	}
	defer func() {
		// runs after b.Run below finishes: stop profiling first so the
		// profile data is flushed before the output file is closed
		pprof.StopCPUProfile()
		if err = f.Close(); err != nil {
			b.Fatal(err)
		}
		fmt.Printf("\twriting CPU profile for %s: %s\n", b.Name(), f.Name())
	}()
	b.Run(test.name, func(b *testing.B) {
		executeServerQueries(ctx, b, dEnv, cfg, test.bench)
	})
}
// getAsyncEnvAndConfig creates a single-database multi-repo test setup with a
// file remote ("remote1"), enables async replication to that remote via
// persisted sql-server globals, and returns the writer's environment together
// with a server config parsed from an inline YAML document.
// NOTE(review): |port| is a package-level value defined elsewhere in this
// package — confirm it does not collide across concurrently running benchmarks.
func getAsyncEnvAndConfig(ctx context.Context, b *testing.B) (dEnv *env.DoltEnv, cfg srv.ServerConfig) {
	multiSetup := testcommands.NewMultiRepoTestSetup(b.Fatal)
	multiSetup.NewDB("dolt_bench")
	multiSetup.NewRemote("remote1")

	writerName := multiSetup.DbNames[0]

	localCfg, ok := multiSetup.MrEnv.GetEnv(writerName).Config.GetConfig(env.LocalConfig)
	if !ok {
		b.Fatal("local config does not exist")
	}
	// persist sqlserver.global.dolt_replicate_to_remote=remote1 and
	// sqlserver.global.dolt_async_replication=1 so the server starts with
	// async push enabled
	localCfg.SetStrings(map[string]string{fmt.Sprintf("%s.%s", env.SqlServerGlobalsPrefix, sqle.ReplicateToRemoteKey): "remote1", fmt.Sprintf("%s.%s", env.SqlServerGlobalsPrefix, sqle.AsyncReplicationKey): "1"})

	// minimal server config: one database, localhost listener, long timeouts
	// so slow benchmark queries are not killed
	yaml := []byte(fmt.Sprintf(`
log_level: warning

behavior:
  read_only: false

user:
  name: "root"
  password: ""

databases:
- name: "%s"
  path: "%s"

listener:
  host: localhost
  port: %d
  max_connections: 128
  read_timeout_millis: 28800000
  write_timeout_millis: 28800000
`, writerName, multiSetup.DbPaths[writerName], port))
	cfg, err := srv.NewYamlConfig(yaml)
	if err != nil {
		b.Fatal(err)
	}
	return multiSetup.MrEnv.GetEnv(writerName), cfg
}
+3 -3
View File
@@ -79,7 +79,7 @@ SQL
[ "$status" -eq "0" ]
[[ "$output" =~ 'UNIQUE KEY `v1` (`v1`)' ]] || false
run dolt index ls test2
run dolt index ls test2
[ "$status" -eq "0" ]
[[ "$output" =~ "v1(v1)" ]] || false
run dolt schema show test2
@@ -117,11 +117,11 @@ SQL
run dolt index ls test
[ "$status" -eq "0" ]
[[ "$output" =~ "v1(v1)" ]] || false
[[ "$output" =~ "v1v2(v1, v2)" ]] || false
[[ "$output" =~ "v1v2(v1, v2)" ]] || false
run dolt schema show test
[ "$status" -eq "0" ]
[[ "$output" =~ 'KEY `v1` (`v1`)' ]] || false
[[ "$output" =~ 'KEY `v1v2` (`v1`,`v2`)' ]] || false
[[ "$output" =~ 'KEY `v1v2` (`v1`,`v2`)' ]] || false
}
@test "index: CREATE INDEX then INSERT" {
+17
View File
@@ -375,3 +375,20 @@ teardown() {
[ "$status" -eq 0 ]
[[ ! "output" =~ "t1" ]] || false
}
@test "replication: async push on cli engine commit" {
    cd repo1
    # enable async replication to remote1 for cli (non-server) engine commits
    dolt config --local --add sqlserver.global.dolt_replicate_to_remote remote1
    dolt config --local --add sqlserver.global.dolt_async_replication 1
    dolt sql -q "create table t1 (a int primary key)"
    dolt sql -q "select dolt_commit('-am', 'cm')"
    # push runs on a background thread; give it time to reach the remote
    sleep 5
    cd ..
    # clone from the file remote and verify the commit arrived
    dolt clone file://./rem1 repo2
    cd repo2
    run dolt ls
    [ "$status" -eq 0 ]
    [ "${#lines[@]}" -eq 2 ]
    [[ "$output" =~ "t1" ]] || false
}
@@ -73,6 +73,30 @@ teardown() {
[[ "${lines[3]}" =~ "2" ]]
}
@test "sql-server-remotes: async push on sql-session commit" {
    skiponwindows "Has dependencies that are missing on the Jenkins Windows installation."
    cd repo1
    # enable async replication to remote1 before starting the server
    dolt config --local --add sqlserver.global.dolt_replicate_to_remote remote1
    dolt config --local --add sqlserver.global.dolt_async_replication 1
    start_sql_server repo1
    multi_query repo1 1 "
    SELECT DOLT_COMMIT('-am', 'Step 1');"
    # threads guaranteed to flush after we stop server
    stop_sql_server
    cd ../repo2
    # pull from the shared remote and verify the async-pushed commit arrived
    dolt pull remote1
    run dolt sql -q "select * from test" -r csv
    [ "$status" -eq 0 ]
    [[ "${lines[0]}" =~ "pk" ]]
    [[ "${lines[1]}" =~ "0" ]]
    [[ "${lines[2]}" =~ "1" ]]
    [[ "${lines[3]}" =~ "2" ]]
}
@test "sql-server-remotes: pull new commits on read" {
skiponwindows "Has dependencies that are missing on the Jenkins Windows installation."