Merge branch 'master' into zachmu/functions
@@ -185,10 +185,8 @@ func pullerProgFunc(ctx context.Context, pullerEventCh chan datas.PullerEvent) {
	uploadRate := ""

	for evt := range pullerEventCh {
		select {
		case <-ctx.Done():
			if ctx.Err() != nil {
				return
		default:
		}
		switch evt.EventType {
		case datas.NewLevelTWEvent:
@@ -247,10 +245,8 @@ func progFunc(ctx context.Context, progChan chan datas.PullProgress) {
	lenPrinted := 0
	done := false
	for !done {
		select {
		case <-ctx.Done():
			if ctx.Err() != nil {
				return
		default:
		}
		select {
		case <-ctx.Done():
@@ -232,6 +232,8 @@ func newSessionBuilder(sqlEngine *sqle.Engine, username string, email string, pr
		cli.PrintErr(err)
		return nil, nil, nil, err
	}

	db.GetDoltDB().SetCommitHookLogger(ctx, doltSess.GetLogger().Logger.Out)
}

return doltSess, ir, vr, nil
@@ -16,7 +16,6 @@ package main

import (
	"context"
	"log"
	"net/http"
	_ "net/http/pprof"
	"os"
@@ -50,7 +49,7 @@ import (
)

const (
	Version = "0.28.4"
	Version = "0.28.5"
)

var dumpDocsCommand = &commands.DumpDocsCmd{}
@@ -148,7 +147,27 @@ func runMain() int {
	case pprofServerFlag:
		// serve the pprof endpoints setup in the init function run when "net/http/pprof" is imported
		go func() {
			log.Println(http.ListenAndServe("localhost:6060", nil))
			cyanStar := color.CyanString("*")
			cli.Println(cyanStar, "Starting pprof server on port 6060.")
			cli.Println(cyanStar, "Go to", color.CyanString("http://localhost:6060/debug/pprof"), "in a browser to see supported endpoints.")
			cli.Println(cyanStar)
			cli.Println(cyanStar, "Known endpoints are:")
			cli.Println(cyanStar, " /allocs: A sampling of all past memory allocations")
			cli.Println(cyanStar, " /block: Stack traces that led to blocking on synchronization primitives")
			cli.Println(cyanStar, " /cmdline: The command line invocation of the current program")
			cli.Println(cyanStar, " /goroutine: Stack traces of all current goroutines")
			cli.Println(cyanStar, " /heap: A sampling of memory allocations of live objects. You can specify the gc GET parameter to run GC before taking the heap sample.")
			cli.Println(cyanStar, " /mutex: Stack traces of holders of contended mutexes")
			cli.Println(cyanStar, " /profile: CPU profile. You can specify the duration in the seconds GET parameter. After you get the profile file, use the go tool pprof command to investigate the profile.")
			cli.Println(cyanStar, " /threadcreate: Stack traces that led to the creation of new OS threads")
			cli.Println(cyanStar, " /trace: A trace of execution of the current program. You can specify the duration in the seconds GET parameter. After you get the trace file, use the go tool trace command to investigate the trace.")
			cli.Println()

			err := http.ListenAndServe("localhost:6060", nil)

			if err != nil {
				cli.Println(color.YellowString("pprof server exited with error: %v", err))
			}
		}()
		args = args[1:]
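The blank import above is what makes this work: importing net/http/pprof only for its side effect registers the /debug/pprof handlers on http.DefaultServeMux, so a nil handler passed to ListenAndServe exposes them. A minimal standalone sketch of the same pattern (not part of this commit):

package main

import (
	"log"
	"net/http"
	_ "net/http/pprof" // side effect: registers /debug/pprof/* on the default mux
)

func main() {
	// A nil handler means http.DefaultServeMux, where pprof registered itself.
	log.Println(http.ListenAndServe("localhost:6060", nil))
}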
@@ -234,7 +253,6 @@ func runMain() int {
	}

	root, err := env.GetCurrentUserHomeDir()

	if err != nil {
		cli.PrintErrln(color.RedString("Failed to load the HOME directory: %v", err))
		return 1
@@ -281,6 +299,10 @@ func runMain() int {

	defer tempfiles.MovableTempFileProvider.Clean()

	if dEnv.DoltDB != nil {
		dEnv.DoltDB.SetCommitHookLogger(ctx, cli.OutStream)
	}

	start := time.Now()
	res := doltCommand.Exec(ctx, "dolt", args, dEnv)
157  go/libraries/doltcore/doltdb/commit_hooks.go  Normal file
@@ -0,0 +1,157 @@
// Copyright 2021 Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package doltdb

import (
	"context"
	"io"
	"sync"

	"github.com/dolthub/dolt/go/libraries/doltcore/ref"

	"github.com/dolthub/dolt/go/store/datas"
)

const BackupToRemoteKey = "DOLT_BACKUP_TO_REMOTE"

type ReplicateHook struct {
	destDB datas.Database
	tmpDir string
	outf   io.Writer
}

// NewReplicateHook creates a ReplicateHook, parameterized by the backup database
// and a local tempfile for pushing
func NewReplicateHook(destDB *DoltDB, tmpDir string) *ReplicateHook {
	return &ReplicateHook{destDB: destDB.db, tmpDir: tmpDir}
}

// Execute implements datas.CommitHook, replicates head updates to the destDB field
func (rh *ReplicateHook) Execute(ctx context.Context, ds datas.Dataset, db datas.Database) error {
	return replicate(ctx, rh.destDB, db, rh.tmpDir, ds)
}

// HandleError implements datas.CommitHook
func (rh *ReplicateHook) HandleError(ctx context.Context, err error) error {
	if rh.outf != nil {
		rh.outf.Write([]byte(err.Error()))
	}
	return nil
}

// SetLogger implements datas.CommitHook
func (rh *ReplicateHook) SetLogger(ctx context.Context, wr io.Writer) error {
	rh.outf = wr
	return nil
}

// replicate pushes a dataset from srcDB to destDB and force sets the destDB ref to the new dataset value
func replicate(ctx context.Context, destDB, srcDB datas.Database, tempTableDir string, ds datas.Dataset) error {
	stRef, ok, err := ds.MaybeHeadRef()
	if err != nil {
		return err
	}
	if !ok {
		// No head ref, return
		return nil
	}

	rf, err := ref.Parse(ds.ID())
	if err != nil {
		return err
	}

	newCtx, cancelFunc := context.WithCancel(ctx)
	wg, progChan, pullerEventCh := runProgFuncs(newCtx)
	defer stopProgFuncs(cancelFunc, wg, progChan, pullerEventCh)
	puller, err := datas.NewPuller(ctx, tempTableDir, defaultChunksPerTF, srcDB, destDB, stRef.TargetHash(), pullerEventCh)
	if err == datas.ErrDBUpToDate {
		return nil
	} else if err != nil {
		return err
	}

	err = puller.Pull(ctx)
	if err != nil {
		return err
	}

	if err != nil {
		return err
	}

	ds, err = destDB.GetDataset(ctx, rf.String())
	if err != nil {
		return err
	}

	_, err = destDB.SetHead(ctx, ds, stRef)
	return err
}

func pullerProgFunc(ctx context.Context, pullerEventCh <-chan datas.PullerEvent) {
	for {
		if ctx.Err() != nil {
			return
		}
		select {
		case <-ctx.Done():
			return
		case <-pullerEventCh:
		default:
		}
	}
}

func progFunc(ctx context.Context, progChan <-chan datas.PullProgress) {
	for {
		if ctx.Err() != nil {
			return
		}
		select {
		case <-ctx.Done():
			return
		case <-progChan:
		default:
		}
	}
}

func runProgFuncs(ctx context.Context) (*sync.WaitGroup, chan datas.PullProgress, chan datas.PullerEvent) {
	pullerEventCh := make(chan datas.PullerEvent)
	progChan := make(chan datas.PullProgress)
	wg := &sync.WaitGroup{}

	wg.Add(1)
	go func() {
		defer wg.Done()
		progFunc(ctx, progChan)
	}()

	wg.Add(1)
	go func() {
		defer wg.Done()
		pullerProgFunc(ctx, pullerEventCh)
	}()

	return wg, progChan, pullerEventCh
}

func stopProgFuncs(cancel context.CancelFunc, wg *sync.WaitGroup, progChan chan datas.PullProgress, pullerEventCh chan datas.PullerEvent) {
	cancel()
	close(progChan)
	close(pullerEventCh)
	wg.Wait()
}
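Taken together, NewReplicateHook, SetCommitHooks, and the progress helpers are the wiring for commit-time replication. A sketch of how a caller might attach the hook, assuming a source and a backup *DoltDB are already loaded (attachBackupHook is a hypothetical helper, not in this commit):

package example

import (
	"context"

	"github.com/dolthub/dolt/go/libraries/doltcore/doltdb"
	"github.com/dolthub/dolt/go/store/datas"
)

// attachBackupHook is hypothetical glue: after this call, every commit
// on src is pushed to backup by the ReplicateHook defined above.
func attachBackupHook(ctx context.Context, src, backup *doltdb.DoltDB, tmpDir string) {
	hook := doltdb.NewReplicateHook(backup, tmpDir)
	src.SetCommitHooks(ctx, []datas.CommitHook{hook})
}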
149  go/libraries/doltcore/doltdb/commit_hooks_test.go  Normal file
@@ -0,0 +1,149 @@
// Copyright 2021 Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package doltdb

import (
	"bytes"
	"context"
	"errors"
	"path/filepath"
	"testing"

	"github.com/dolthub/dolt/go/libraries/doltcore/dbfactory"
	"github.com/dolthub/dolt/go/libraries/doltcore/ref"
	"github.com/dolthub/dolt/go/libraries/utils/filesys"
	"github.com/dolthub/dolt/go/libraries/utils/test"
	"github.com/dolthub/dolt/go/store/datas"
	"github.com/dolthub/dolt/go/store/types"

	"github.com/stretchr/testify/assert"
)

func TestReplicateHook(t *testing.T) {
	ctx := context.Background()

	// destination repo
	testDir, err := test.ChangeToTestDir("TestReplicationDest")

	if err != nil {
		panic("Couldn't change the working directory to the test directory.")
	}

	committerName := "Bill Billerson"
	committerEmail := "bigbillieb@fake.horse"

	tmpDir := filepath.Join(testDir, dbfactory.DoltDataDir)
	err = filesys.LocalFS.MkDirs(tmpDir)

	if err != nil {
		t.Fatal("Failed to create noms directory")
	}

	destDB, _ := LoadDoltDB(context.Background(), types.Format_Default, LocalDirDoltDB, filesys.LocalFS)

	// source repo
	testDir, err = test.ChangeToTestDir("TestReplicationSource")

	if err != nil {
		panic("Couldn't change the working directory to the test directory.")
	}

	tmpDir = filepath.Join(testDir, dbfactory.DoltDataDir)
	err = filesys.LocalFS.MkDirs(tmpDir)

	if err != nil {
		t.Fatal("Failed to create noms directory")
	}

	ddb, _ := LoadDoltDB(context.Background(), types.Format_Default, LocalDirDoltDB, filesys.LocalFS)
	err = ddb.WriteEmptyRepo(context.Background(), "master", committerName, committerEmail)

	if err != nil {
		t.Fatal("Unexpected error creating empty repo", err)
	}

	// prepare a commit in the source repo
	cs, _ := NewCommitSpec("master")
	commit, err := ddb.Resolve(context.Background(), cs, nil)

	if err != nil {
		t.Fatal("Couldn't find commit")
	}

	meta, err := commit.GetCommitMeta()
	assert.NoError(t, err)

	if meta.Name != committerName || meta.Email != committerEmail {
		t.Error("Unexpected metadata")
	}

	root, err := commit.GetRootValue()

	assert.NoError(t, err)

	names, err := root.GetTableNames(context.Background())
	assert.NoError(t, err)
	if len(names) != 0 {
		t.Fatal("There should be no tables in empty db")
	}

	tSchema := createTestSchema(t)
	rowData, _ := createTestRowData(t, ddb.db, tSchema)
	tbl, err := CreateTestTable(ddb.db, tSchema, rowData)

	if err != nil {
		t.Fatal("Failed to create test table with data")
	}

	root, err = root.PutTable(context.Background(), "test", tbl)
	assert.NoError(t, err)

	valHash, err := ddb.WriteRootValue(context.Background(), root)
	assert.NoError(t, err)

	meta, err = NewCommitMeta(committerName, committerEmail, "Sample data")
	if err != nil {
		t.Error("Failed to commit")
	}

	// setup hook
	hook := NewReplicateHook(destDB, tmpDir)
	ddb.SetCommitHooks(ctx, []datas.CommitHook{hook})

	t.Run("replicate to backup remote", func(t *testing.T) {
		srcCommit, err := ddb.Commit(context.Background(), valHash, ref.NewBranchRef("master"), meta)
		ds, err := ddb.db.GetDataset(ctx, "refs/heads/master")
		err = hook.Execute(ctx, ds, ddb.db)
		assert.NoError(t, err)

		cs, _ = NewCommitSpec("master")
		destCommit, err := destDB.Resolve(context.Background(), cs, nil)

		srcHash, _ := srcCommit.HashOf()
		destHash, _ := destCommit.HashOf()
		assert.Equal(t, srcHash, destHash)
	})

	t.Run("replicate handle error logs to writer", func(t *testing.T) {
		var buffer = &bytes.Buffer{}
		err = hook.SetLogger(ctx, buffer)
		assert.NoError(t, err)

		msg := "prince charles is a vampire"
		hook.HandleError(ctx, errors.New(msg))

		assert.Equal(t, buffer.String(), msg)
	})
}
@@ -18,6 +18,7 @@ import (
	"context"
	"errors"
	"fmt"
	"io"
	"path/filepath"
	"strings"
	"time"
@@ -1279,3 +1280,15 @@ func (ddb *DoltDB) PullChunks(ctx context.Context, tempDir string, srcDB *DoltDB
func (ddb *DoltDB) Clone(ctx context.Context, destDB *DoltDB, eventCh chan<- datas.TableFileEvent) error {
	return datas.Clone(ctx, ddb.db, destDB.db, eventCh)
}

func (ddb *DoltDB) SetCommitHooks(ctx context.Context, postHooks []datas.CommitHook) *DoltDB {
	ddb.db = ddb.db.SetCommitHooks(ctx, postHooks)
	return ddb
}

func (ddb *DoltDB) SetCommitHookLogger(ctx context.Context, wr io.Writer) *DoltDB {
	if ddb.db != nil {
		ddb.db = ddb.db.SetCommitHookLogger(ctx, wr)
	}
	return ddb
}
3  go/libraries/doltcore/env/actions/remotes.go  vendored
@@ -20,12 +20,11 @@ import (
	"fmt"
	"sync"

	"github.com/dolthub/dolt/go/libraries/doltcore/remotestorage"

	eventsapi "github.com/dolthub/dolt/go/gen/proto/dolt/services/eventsapi/v1alpha1"
	"github.com/dolthub/dolt/go/libraries/doltcore/doltdb"
	"github.com/dolthub/dolt/go/libraries/doltcore/env"
	"github.com/dolthub/dolt/go/libraries/doltcore/ref"
	"github.com/dolthub/dolt/go/libraries/doltcore/remotestorage"
	"github.com/dolthub/dolt/go/libraries/events"
	"github.com/dolthub/dolt/go/libraries/utils/earl"
	"github.com/dolthub/dolt/go/store/datas"
39  go/libraries/doltcore/env/environment.go  vendored
@@ -19,6 +19,7 @@ import (
	"crypto/tls"
	"errors"
	"fmt"
	"os"
	"path/filepath"
	"runtime"
	"strings"
@@ -38,6 +39,7 @@ import (
	"github.com/dolthub/dolt/go/libraries/doltcore/table/editor"
	"github.com/dolthub/dolt/go/libraries/utils/config"
	"github.com/dolthub/dolt/go/libraries/utils/filesys"
	"github.com/dolthub/dolt/go/store/datas"
	"github.com/dolthub/dolt/go/store/hash"
	"github.com/dolthub/dolt/go/store/types"
)
@@ -56,6 +58,34 @@ const (
	tempTablesDir = "temptf"
)

func getCommitHooks(ctx context.Context, dEnv *DoltEnv) ([]datas.CommitHook, error) {
	postCommitHooks := make([]datas.CommitHook, 0)

	backupName := os.Getenv(doltdb.BackupToRemoteKey)
	if backupName != "" {
		remotes, err := dEnv.GetRemotes()
		if err != nil {
			return nil, err
		}
		rem, ok := remotes[backupName]
		if !ok {
			return nil, ErrRemoteNotFound
		}
		ddb, err := rem.GetRemoteDB(ctx, types.Format_Default)

		if err != nil {
			return nil, err
		}
		replicateHook := doltdb.NewReplicateHook(ddb, dEnv.TempTableFilesDir())
		if err != nil {
			return nil, err
		}
		postCommitHooks = append(postCommitHooks, replicateHook)
	}

	return postCommitHooks, nil
}

var zeroHashStr = (hash.Hash{}).String()

var ErrPreexistingDoltDir = errors.New(".dolt dir already exists")
@@ -158,6 +188,15 @@ func Load(ctx context.Context, hdp HomeDirProvider, fs filesys.Filesys, urlStr,
		}
	}

	if dbLoadErr == nil {
		postCommitHooks, dbLoadErr := getCommitHooks(ctx, dEnv)
		if dbLoadErr != nil {
			dEnv.DBLoadError = dbLoadErr
		} else {
			dEnv.DoltDB.SetCommitHooks(ctx, postCommitHooks)
		}
	}

	return dEnv
}
11  go/libraries/doltcore/env/remotes.go  vendored
@@ -57,6 +57,17 @@ type Remote struct {
	dialer dbfactory.GRPCDialProvider
}

func GetRemote(ctx context.Context, remoteName, remoteUrl string, params map[string]string, dialer dbfactory.GRPCDialProvider) (Remote, *doltdb.DoltDB, error) {
	r := NewRemote(remoteName, remoteUrl, params, dialer)
	ddb, err := r.GetRemoteDB(ctx, types.Format_Default)

	if err != nil {
		return NoRemote, nil, err
	}

	return r, ddb, nil
}

func NewRemote(name, url string, params map[string]string, dialer dbfactory.GRPCDialProvider) Remote {
	return Remote{name, url, []string{"refs/heads/*:refs/remotes/" + name + "/*"}, params, dialer}
}
@@ -40,7 +40,6 @@ import (
	"github.com/dolthub/dolt/go/libraries/utils/tracing"
	"github.com/dolthub/dolt/go/store/atomicerr"
	"github.com/dolthub/dolt/go/store/chunks"
	"github.com/dolthub/dolt/go/store/datas"
	"github.com/dolthub/dolt/go/store/hash"
	"github.com/dolthub/dolt/go/store/nbs"
	"github.com/dolthub/dolt/go/store/types"
@@ -66,7 +65,7 @@ var ErrInvalidDoltSpecPath = errors.New("invalid dolt spec path")
var globalHttpFetcher HTTPFetcher = &http.Client{}

var _ nbs.TableFileStore = (*DoltChunkStore)(nil)
var _ datas.NBSCompressedChunkStore = (*DoltChunkStore)(nil)
var _ nbs.NBSCompressedChunkStore = (*DoltChunkStore)(nil)
var _ chunks.ChunkStore = (*DoltChunkStore)(nil)
var _ chunks.LoggingChunkStore = (*DoltChunkStore)(nil)
@@ -155,10 +155,8 @@ func (d DoltPullFunc) Eval(ctx *sql.Context, row sql.Row) (interface{}, error) {

func pullerProgFunc(ctx context.Context, pullerEventCh <-chan datas.PullerEvent) {
	for {
		select {
		case <-ctx.Done():
			if ctx.Err() != nil {
				return
		default:
		}
		select {
		case <-ctx.Done():
@@ -171,10 +169,8 @@ func pullerProgFunc(ctx context.Context, pullerEventCh <-chan datas.PullerEvent)

func progFunc(ctx context.Context, progChan <-chan datas.PullProgress) {
	for {
		select {
		case <-ctx.Done():
			if ctx.Err() != nil {
				return
		default:
		}
		select {
		case <-ctx.Done():
@@ -24,7 +24,8 @@ import (
)

func init() {
	sqle.MinRowsPerPartition = 2
	sqle.MinRowsPerPartition = 8
	sqle.MaxRowsPerPartition = 1024
}

func TestQueries(t *testing.T) {
@@ -26,6 +26,38 @@ import (
	"github.com/dolthub/dolt/go/store/types"
)

var _ sql.RowIter = (*keylessRowIter)(nil)

type keylessRowIter struct {
	keyedIter *DoltMapIter

	cardIdx     int
	nonCardCols int

	lastRead sql.Row
	lastCard uint64
}

func (k *keylessRowIter) Next() (sql.Row, error) {
	if k.lastCard == 0 {
		r, err := k.keyedIter.Next()

		if err != nil {
			return nil, err
		}

		k.lastCard = r[k.cardIdx].(uint64)
		k.lastRead = r[:k.nonCardCols]
	}

	k.lastCard--
	return k.lastRead, nil
}

func (k keylessRowIter) Close(ctx *sql.Context) error {
	return k.keyedIter.Close(ctx)
}

// An iterator over the rows of a table.
type doltTableRowIter struct {
	sql.RowIter
@@ -43,48 +75,71 @@ func newRowIterator(ctx *sql.Context, tbl *doltdb.Table, projCols []string, part

	if schema.IsKeyless(sch) {
		// would be more optimal to project columns into keyless tables also
		return newKeylessRowIterator(ctx, tbl, partition)
		return newKeylessRowIterator(ctx, tbl, projCols, partition)
	} else {
		return newKeyedRowIter(ctx, tbl, projCols, partition)
	}
}

func newKeylessRowIterator(ctx *sql.Context, tbl *doltdb.Table, partition *doltTablePartition) (*doltTableRowIter, error) {
	var iter table.SqlTableReader
	var err error
	if partition.end == NoUpperBound {
		iter, err = table.NewBufferedTableReader(ctx, tbl)
	} else {
		iter, err = table.NewBufferedTableReaderForPartition(ctx, tbl, partition.start, partition.end)
	}

func newKeylessRowIterator(ctx *sql.Context, tbl *doltdb.Table, projectedCols []string, partition *doltTablePartition) (sql.RowIter, error) {
	mapIter, err := iterForPartition(ctx, partition)
	if err != nil {
		return nil, err
	}

	return &doltTableRowIter{
		ctx:    ctx,
		reader: iter,
	cols, tagToSqlColIdx, err := getTagToResColIdx(ctx, tbl, projectedCols)
	if err != nil {
		return nil, err
	}

	idxOfCardinality := len(cols)
	tagToSqlColIdx[schema.KeylessRowCardinalityTag] = idxOfCardinality

	colsCopy := make([]schema.Column, len(cols), len(cols)+1)
	copy(colsCopy, cols)
	colsCopy = append(colsCopy, schema.NewColumn("__cardinality__", schema.KeylessRowCardinalityTag, types.UintKind, false))

	conv := NewKVToSqlRowConverter(tbl.Format(), tagToSqlColIdx, colsCopy, len(colsCopy))
	keyedItr, err := NewDoltMapIter(ctx, mapIter.NextTuple, nil, conv), nil
	if err != nil {
		return nil, err
	}

	return &keylessRowIter{
		keyedIter:   keyedItr,
		cardIdx:     idxOfCardinality,
		nonCardCols: len(cols),
	}, nil
}

func newKeyedRowIter(ctx context.Context, tbl *doltdb.Table, projectedCols []string, partition *doltTablePartition) (sql.RowIter, error) {
	var err error
	var mapIter types.MapTupleIterator
	mapIter, err := iterForPartition(ctx, partition)
	if err != nil {
		return nil, err
	}

	cols, tagToSqlColIdx, err := getTagToResColIdx(ctx, tbl, projectedCols)
	if err != nil {
		return nil, err
	}

	conv := NewKVToSqlRowConverter(tbl.Format(), tagToSqlColIdx, cols, len(cols))
	return NewDoltMapIter(ctx, mapIter.NextTuple, nil, conv), nil
}

func iterForPartition(ctx context.Context, partition *doltTablePartition) (types.MapTupleIterator, error) {
	rowData := partition.rowData
	if partition.end == NoUpperBound {
		mapIter, err = rowData.RangeIterator(ctx, 0, rowData.Len())
		return rowData.RangeIterator(ctx, 0, rowData.Len())
	} else {
		mapIter, err = partition.IteratorForPartition(ctx, rowData)
	}

	if err != nil {
		return nil, err
		return partition.IteratorForPartition(ctx, rowData)
	}
}

func getTagToResColIdx(ctx context.Context, tbl *doltdb.Table, projectedCols []string) ([]schema.Column, map[uint64]int, error) {
	sch, err := tbl.GetSchema(ctx)
	if err != nil {
		return nil, err
		return nil, nil, err
	}

	cols := sch.GetAllCols().GetColumns()
@@ -96,9 +151,7 @@ func newKeyedRowIter(ctx context.Context, tbl *doltdb.Table, projectedCols []str
			tagToSqlColIdx[col.Tag] = i
		}
	}

	conv := NewKVToSqlRowConverter(tbl.Format(), tagToSqlColIdx, cols, len(cols))
	return NewDoltMapIter(ctx, mapIter.NextTuple, nil, conv), nil
	return cols, tagToSqlColIdx, nil
}

// Next returns the next row in this row iterator, or an io.EOF error if there aren't any more.
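The keylessRowIter above relies on keyless tables storing each distinct row once alongside a hidden __cardinality__ column; Next decrements lastCard and replays the visible columns until the count is exhausted. A self-contained sketch of that expansion (toy types, not the dolt source):

package main

import "fmt"

// storedRow mimics one physical keyless row: the visible column values
// plus the hidden cardinality count appended by the converter.
type storedRow struct {
	vals []interface{}
	card uint64
}

// expand replays each stored row card times, as keylessRowIter.Next does.
func expand(rows []storedRow) [][]interface{} {
	var out [][]interface{}
	for _, r := range rows {
		for n := uint64(0); n < r.card; n++ {
			out = append(out, r.vals)
		}
	}
	return out
}

func main() {
	rows := []storedRow{
		{vals: []interface{}{1, "a"}, card: 2}, // duplicate row stored once
		{vals: []interface{}{2, "b"}, card: 1},
	}
	fmt.Println(len(expand(rows))) // 3: (1,"a") twice, (2,"b") once
}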
@@ -47,6 +47,7 @@ const (
	partitionMultiplier = 2.0
)

var MaxRowsPerPartition uint64 = 32 * 1024
var MinRowsPerPartition uint64 = 1024

func init() {
@@ -358,15 +359,20 @@ func (t *DoltTable) Partitions(ctx *sql.Context) (sql.PartitionIter, error) {
		return newDoltTablePartitionIter(rowData, doltTablePartition{0, 0, rowData}), nil
	}

	maxPartitions := uint64(partitionMultiplier * runtime.NumCPU())
	numPartitions := (numElements / MinRowsPerPartition) + 1
	itemsPerPartition := MaxRowsPerPartition
	numPartitions := (numElements / itemsPerPartition) + 1
	if numPartitions < uint64(partitionMultiplier*runtime.NumCPU()) {
		itemsPerPartition = numElements / uint64(partitionMultiplier*runtime.NumCPU())

	if numPartitions > maxPartitions {
		numPartitions = maxPartitions
		if itemsPerPartition == 0 {
			itemsPerPartition = numElements
			numPartitions = 1
		} else {
			numPartitions = (numElements / itemsPerPartition) + 1
		}
	}

	partitions := make([]doltTablePartition, numPartitions)
	itemsPerPartition := numElements / numPartitions
	for i := uint64(0); i < numPartitions-1; i++ {
		partitions[i] = doltTablePartition{i * itemsPerPartition, (i + 1) * itemsPerPartition, rowData}
	}
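The rewritten sizing starts from a fixed MaxRowsPerPartition and only shrinks the partition size when that would leave fewer partitions than partitionMultiplier * NumCPU. A standalone sketch of the arithmetic with the constants inlined (illustration only; the helper name and sample values are mine):

package main

import "fmt"

// partitionCounts mirrors the new sizing logic: fixed-size partitions,
// shrunk only when they would underutilize the available CPUs.
func partitionCounts(numElements, maxRows, minParts uint64) (items, parts uint64) {
	items = maxRows
	parts = numElements/items + 1
	if parts < minParts {
		items = numElements / minParts
		if items == 0 {
			items = numElements
			parts = 1
		} else {
			parts = numElements/items + 1
		}
	}
	return items, parts
}

func main() {
	// e.g. 100,000 rows, 32*1024-row partitions, 8 CPUs * 2.0 multiplier = 16
	items, parts := partitionCounts(100_000, 32*1024, 16)
	fmt.Println(items, parts) // 6250 17
}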
@@ -169,6 +169,14 @@ type Database interface {
	// level detail of the database that should infrequently be needed by
	// clients.
	chunkStore() chunks.ChunkStore

	// SetCommitHooks attaches a list of CommitHook that can be executed
	// after CommitWithWorkingSet
	SetCommitHooks(context.Context, []CommitHook) *database

	// SetCommitHookLogger passes an error handler from the user-facing session
	// to a commit hook executed at the datas layer
	SetCommitHookLogger(context.Context, io.Writer) *database
}

func NewDatabase(cs chunks.ChunkStore) Database {
@@ -25,6 +25,7 @@ import (
	"context"
	"errors"
	"fmt"
	"io"

	"github.com/dolthub/dolt/go/store/chunks"
	"github.com/dolthub/dolt/go/store/d"
@@ -36,7 +37,8 @@ import (

type database struct {
	*types.ValueStore
	rt rootTracker
	rt              rootTracker
	postCommitHooks []CommitHook
}

var (
@@ -44,6 +46,16 @@ var (
	ErrMergeNeeded = errors.New("dataset head is not ancestor of commit")
)

// CommitHook is an abstraction for executing arbitrary commands after atomic database commits
type CommitHook interface {
	// Execute is an arbitrary read-only function whose arguments are the new Dataset committed into a specific Database
	Execute(ctx context.Context, ds Dataset, db Database) error
	// HandleError is a bridge function to handle Execute errors
	HandleError(ctx context.Context, err error) error
	// SetLogger lets clients specify an output stream for HandleError
	SetLogger(ctx context.Context, wr io.Writer) error
}

// TODO: fix panics
// rootTracker is a narrowing of the ChunkStore interface, to keep Database disciplined about working directly with Chunks
type rootTracker interface {
@@ -771,6 +783,8 @@ func (db *database) CommitWithWorkingSet(
		return Dataset{}, Dataset{}, err
	}

	db.callCommitHooks(ctx, commitDS)

	return commitDS, workingSetDS, nil
}

@@ -963,10 +977,35 @@ func buildNewCommit(ctx context.Context, ds Dataset, v types.Value, opts CommitO

func (db *database) doHeadUpdate(ctx context.Context, ds Dataset, updateFunc func(ds Dataset) error) (Dataset, error) {
	err := updateFunc(ds)

	if err != nil {
		return Dataset{}, err
	}

	return db.GetDataset(ctx, ds.ID())
}

func (db *database) SetCommitHooks(ctx context.Context, postHooks []CommitHook) *database {
	db.postCommitHooks = postHooks
	return db
}

func (db *database) SetCommitHookLogger(ctx context.Context, wr io.Writer) *database {
	for _, h := range db.postCommitHooks {
		h.SetLogger(ctx, wr)
	}
	return db
}

func (db *database) PostCommitHooks() []CommitHook {
	return db.postCommitHooks
}

func (db *database) callCommitHooks(ctx context.Context, ds Dataset) {
	var err error
	for _, hook := range db.postCommitHooks {
		err = hook.Execute(ctx, ds, db)
		if err != nil {
			hook.HandleError(ctx, err)
		}
	}
}
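The loop above runs every registered hook and routes failures to HandleError rather than failing the commit. A minimal sketch of another hook against this interface (printHook is hypothetical, not part of the commit):

package example

import (
	"context"
	"fmt"
	"io"

	"github.com/dolthub/dolt/go/store/datas"
)

// printHook is a hypothetical CommitHook that announces each commit;
// it follows the same contract ReplicateHook implements.
type printHook struct {
	out io.Writer
}

func (p *printHook) Execute(ctx context.Context, ds datas.Dataset, db datas.Database) error {
	_, err := fmt.Fprintf(p.out, "post-commit hook ran for dataset %s\n", ds.ID())
	return err
}

func (p *printHook) HandleError(ctx context.Context, err error) error {
	if p.out != nil {
		fmt.Fprintln(p.out, "hook error:", err)
	}
	return nil
}

func (p *printHook) SetLogger(ctx context.Context, wr io.Writer) error {
	p.out = wr
	return nil
}

var _ datas.CommitHook = (*printHook)(nil) // compile-time interface check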
@@ -59,17 +59,12 @@ type CmpChnkAndRefs struct {
	refs map[hash.Hash]int
}

type NBSCompressedChunkStore interface {
	chunks.ChunkStore
	GetManyCompressed(context.Context, hash.HashSet, func(nbs.CompressedChunk)) error
}

// Puller is used to sync data between two Databases
type Puller struct {
	fmt *types.NomsBinFormat

	srcDB         Database
	srcChunkStore NBSCompressedChunkStore
	srcChunkStore nbs.NBSCompressedChunkStore
	sinkDBCS      chunks.ChunkStore
	rootChunkHash hash.Hash
	downloaded    hash.HashSet
@@ -158,7 +153,7 @@ func NewPuller(ctx context.Context, tempDir string, chunksPerTF int, srcDB, sink
		return nil, fmt.Errorf("cannot pull from src to sink; src version is %v and sink version is %v", srcDB.chunkStore().Version(), sinkDB.chunkStore().Version())
	}

	srcChunkStore, ok := srcDB.chunkStore().(NBSCompressedChunkStore)
	srcChunkStore, ok := srcDB.chunkStore().(nbs.NBSCompressedChunkStore)
	if !ok {
		return nil, ErrIncompatibleSourceChunkStore
	}
@@ -81,6 +81,11 @@ func makeGlobalCaches() {
	makeManifestManager = func(m manifest) manifestManager { return manifestManager{m, manifestCache, manifestLocks} }
}

type NBSCompressedChunkStore interface {
	chunks.ChunkStore
	GetManyCompressed(context.Context, hash.HashSet, func(CompressedChunk)) error
}

type NomsBlockStore struct {
	mm manifestManager
	p  tablePersister
@@ -23,13 +23,13 @@ teardown() {

run dolt sql -q "INSERT INTO test (c0) VALUES (44);"
[ "$status" -eq 0 ]
run dolt sql -q "SELECT * FROM test WHERE c0 = 44;" -r csv
run dolt sql -q "SELECT * FROM test WHERE c0 = 44 ORDER BY pk;" -r csv
[ "$status" -eq 0 ]
[[ "$output" =~ "4,44" ]] || false

run dolt sql -q "INSERT INTO test (c0) VALUES (55),(66);"
[ "$status" -eq 0 ]
run dolt sql -q "SELECT * FROM test WHERE c0 > 50;" -r csv
run dolt sql -q "SELECT * FROM test WHERE c0 > 50 ORDER BY pk;" -r csv
[ "$status" -eq 0 ]
[[ "$output" =~ "5,55" ]] || false
[[ "$output" =~ "6,66" ]] || false
@@ -45,7 +45,7 @@ CREATE TABLE ai (
INSERT INTO ai VALUES (NULL,1),(NULL,2),(NULL,3);
SQL
[ "$status" -eq 0 ]
run dolt sql -q "SELECT * FROM ai;" -r csv
run dolt sql -q "SELECT * FROM ai ORDER BY pk;" -r csv
[ "$status" -eq 0 ]
[[ "$output" =~ "1,1" ]] || false
[[ "$output" =~ "2,2" ]] || false
@@ -55,7 +55,7 @@ SQL
@test "auto_increment: insert into empty auto_increment table" {
run dolt sql -q "INSERT INTO test (c0) VALUES (1);"
[ "$status" -eq 0 ]
run dolt sql -q "SELECT * FROM test WHERE c0 = 1;" -r csv
run dolt sql -q "SELECT * FROM test WHERE c0 = 1 ORDER BY pk;" -r csv
[ "$status" -eq 0 ]
[[ "$output" =~ "1,1" ]] || false
}
@@ -67,7 +67,7 @@ SQL
[ "$status" -eq 0 ]
run dolt sql -q "INSERT INTO test VALUES (2,2);"
[ "$status" -eq 0 ]
run dolt sql -q "SELECT * FROM test;" -r csv
run dolt sql -q "SELECT * FROM test ORDER BY pk;" -r csv
[ "$status" -eq 0 ]
[[ "$output" =~ "1,1" ]] || false
[[ "$output" =~ "2,2" ]] || false
@@ -80,13 +80,14 @@ SQL

run dolt sql -q "INSERT INTO test (pk,c0) VALUES (NULL,2);"
[ "$status" -eq 0 ]
run dolt sql -q "SELECT * FROM test WHERE c0 > 1;" -r csv
run dolt sql -q "SELECT * FROM test WHERE c0 > 1 ORDER BY pk;" -r csv
[ "$status" -eq 0 ]
[[ "$output" =~ "2,2" ]] || false

run dolt sql -q "INSERT INTO test VALUES (NULL,3), (10,10), (NULL,11);"
[ "$status" -eq 0 ]
run dolt sql -q "SELECT * FROM test WHERE c0 > 2;" -r csv
run dolt sql -q "SELECT * FROM test WHERE c0 > 2 ORDER BY pk;" -r csv
echo $output
[ "$status" -eq 0 ]
[[ "$output" =~ "3,3" ]] || false
[[ "$output" =~ "10,10" ]] || false
@@ -98,13 +99,13 @@ SQL

run dolt sql -q "INSERT INTO test (pk,c0) VALUES (0,2);"
[ "$status" -eq 0 ]
run dolt sql -q "SELECT * FROM test WHERE c0 > 1;" -r csv
run dolt sql -q "SELECT * FROM test WHERE c0 > 1 ORDER BY pk;" -r csv
[ "$status" -eq 0 ]
[[ "$output" =~ "2,2" ]] || false

run dolt sql -q "INSERT INTO test VALUES (0,3), (10,10), (0,11);"
[ "$status" -eq 0 ]
run dolt sql -q "SELECT * FROM test WHERE c0 > 2;" -r csv
run dolt sql -q "SELECT * FROM test WHERE c0 > 2 ORDER BY pk;" -r csv
[ "$status" -eq 0 ]
[[ "$output" =~ "3,3" ]] || false
[[ "$output" =~ "10,10" ]] || false
@@ -121,7 +122,7 @@ INSERT INTO test (c0) VALUES (21);
SQL
[ "$status" -eq 0 ]

run dolt sql -q "SELECT * FROM test;" -r csv
run dolt sql -q "SELECT * FROM test ORDER BY pk;" -r csv
[ "$status" -eq 0 ]
[[ "$output" =~ "1,1" ]] || false
[[ "$output" =~ "2,2" ]] || false
@@ -143,7 +144,7 @@ INSERT INTO test2 SELECT (pk + 20), c0 FROM test2;
SQL
[ "$status" -eq 0 ]

run dolt sql -q "select * from test2 order by pk" -r csv
run dolt sql -q "select * from test2 ORDER BY pk" -r csv
[ "$status" -eq 0 ]
[[ "$output" =~ "1,1" ]] || false
[[ "$output" =~ "2,2" ]] || false
@@ -170,7 +171,7 @@ SQL
dolt sql -q "INSERT INTO auto_float (pk, c0) VALUES (3.9,4);"
dolt sql -q "INSERT INTO auto_float (c0) VALUES (5);"

run dolt sql -q "SELECT * FROM auto_float;" -r csv
run dolt sql -q "SELECT * FROM auto_float ORDER BY pk;" -r csv
[ "$status" -eq 0 ]
[[ "$output" =~ "1,1" ]] || false
[[ "$output" =~ "2.1,2" ]] || false
@@ -193,7 +194,7 @@ SQL
echo "$TYPE"
run dolt sql -q "INSERT INTO auto_$TYPE (c0) VALUES (2);"
[ "$status" -eq 0 ]
run dolt sql -q "SELECT * FROM auto_$TYPE WHERE c0 > 1;" -r csv
run dolt sql -q "SELECT * FROM auto_$TYPE WHERE c0 > 1 ORDER BY pk;" -r csv
[ "$status" -eq 0 ]
[[ "$output" =~ "2,2" ]] || false
done
@@ -211,7 +212,7 @@ SQL
echo "$TYPE"
run dolt sql -q "INSERT INTO auto2_$TYPE (c0) VALUES (2);"
[ "$status" -eq 0 ]
run dolt sql -q "SELECT * FROM auto2_$TYPE WHERE c0 > 1;" -r csv
run dolt sql -q "SELECT * FROM auto2_$TYPE WHERE c0 > 1 ORDER BY pk;" -r csv
[ "$status" -eq 0 ]
[[ "$output" =~ "2,2" ]] || false
done
@@ -248,7 +249,7 @@ SQL
dolt merge other

dolt sql -q "INSERT INTO test VALUES (NULL,22);"
run dolt sql -q "SELECT pk FROM test WHERE c0 = 22;" -r csv
run dolt sql -q "SELECT pk FROM test WHERE c0 = 22 ORDER BY pk;" -r csv
[ "$status" -eq 0 ]
[[ "$output" =~ "22" ]] || false
}
@@ -271,7 +272,7 @@ SQL
dolt checkout master
dolt merge other
dolt sql -q "INSERT INTO test VALUES (NULL,22);"
run dolt sql -q "SELECT pk FROM test WHERE c0 = 22;" -r csv
run dolt sql -q "SELECT pk FROM test WHERE c0 = 22 ORDER BY pk;" -r csv
[ "$status" -eq 0 ]
[[ "$output" =~ "22" ]] || false
}
@@ -280,7 +281,7 @@ SQL
run dolt sql -q "ALTER TABLE test AUTO_INCREMENT = 10;"
[ "$status" -eq 0 ]
dolt sql -q "INSERT INTO test VALUES (NULL,10);"
run dolt sql -q "SELECT * FROM test;" -r csv
run dolt sql -q "SELECT * FROM test ORDER BY pk;" -r csv
[ "$status" -eq 0 ]
[[ "$output" =~ "10,10" ]] || false

@@ -288,7 +289,7 @@ SQL
ALTER TABLE test AUTO_INCREMENT = 20;
INSERT INTO test VALUES (NULL,20),(30,30),(NULL,31);
SQL
run dolt sql -q "SELECT * FROM test;" -r csv
run dolt sql -q "SELECT * FROM test ORDER BY pk;" -r csv
[ "$status" -eq 0 ]
[[ "$output" =~ "10,10" ]] || false
[[ "$output" =~ "20,20" ]] || false
@@ -310,7 +311,7 @@ ALTER TABLE index_test ADD INDEX (c0);
INSERT INTO index_test (c0) VALUES (4),(5),(6);
SQL

run dolt sql -q "select * from index_test" -r csv
run dolt sql -q "select * from index_test ORDER BY pk" -r csv
[ "$status" -eq 0 ]
[[ "${lines[0]}" =~ "pk,c0" ]] || false
[[ "${lines[1]}" =~ "1,1" ]] || false
@@ -329,7 +330,7 @@ SQL

dolt sql -q "INSERT INTO test (c0) SELECT pk FROM other;"

run dolt sql -q "SELECT * FROM test;" -r csv
run dolt sql -q "SELECT * FROM test ORDER BY pk;" -r csv
[ "$status" -eq 0 ]
[[ "${lines[0]}" =~ "pk,c0" ]] || false
[[ "${lines[1]}" =~ "1,1" ]] || false
@@ -351,7 +352,7 @@ TRUNCATE t;
INSERT INTO t (c0) VALUES (1),(2),(3);
SQL

run dolt sql -q "SELECT * FROM t;" -r csv
run dolt sql -q "SELECT * FROM t ORDER BY pk;" -r csv
[ "$status" -eq 0 ]
[[ "${lines[0]}" =~ "pk,c0" ]] || false
[[ "${lines[1]}" =~ "1,1" ]] || false
@@ -370,7 +371,7 @@ INSERT INTO t (c0) VALUES (1),(2),(3);
INSERT INTO t (c0) VALUES (4),(5),(6);
SQL

run dolt sql -q "SELECT * FROM t;" -r csv
run dolt sql -q "SELECT * FROM t ORDER BY pk;" -r csv
[ "$status" -eq 0 ]
[[ "${lines[0]}" =~ "pk,c0" ]] || false
[[ "${lines[1]}" =~ "1,1" ]] || false
@@ -393,7 +394,7 @@ INSERT INTO t VALUES (4, 4);
INSERT INTO t (c0) VALUES (5),(6),(7);
SQL

run dolt sql -q "SELECT * FROM t;" -r csv
run dolt sql -q "SELECT * FROM t ORDER BY pk;" -r csv
[ "$status" -eq 0 ]
[[ "${lines[0]}" =~ "pk,c0" ]] || false
[[ "${lines[1]}" =~ "1,1" ]] || false
@@ -419,7 +420,7 @@ INSERT into t VALUES (3, 3);
INSERT INTO t (c0) VALUES (8);
SQL

run dolt sql -q "SELECT * FROM t;" -r csv
run dolt sql -q "SELECT * FROM t ORDER BY pk;" -r csv
[ "$status" -eq 0 ]
[[ "${lines[0]}" =~ "pk,c0" ]] || false
[[ "${lines[1]}" =~ "1,1" ]] || false
@@ -451,7 +452,7 @@ SELECT DOLT_MERGE('test');
INSERT INTO t VALUES (NULL,5),(6,6),(NULL,7);
SQL

run dolt sql -q "SELECT * FROM t;" -r csv
run dolt sql -q "SELECT * FROM t ORDER BY pk;" -r csv
[ "$status" -eq 0 ]
[[ "${lines[0]}" =~ "pk,c0" ]] || false
[[ "${lines[1]}" =~ "1,1" ]] || false
@@ -482,7 +483,7 @@ SELECT DOLT_MERGE('test');
INSERT INTO t VALUES (10,10),(NULL,11);
SQL

run dolt sql -q "SELECT * FROM t;" -r csv
run dolt sql -q "SELECT * FROM t ORDER BY pk;" -r csv
[ "$status" -eq 0 ]
[[ "${lines[0]}" =~ "pk,c0" ]] || false
[[ "${lines[1]}" =~ "1,1" ]] || false
@@ -515,7 +516,7 @@ SELECT DOLT_MERGE('test');
INSERT INTO t VALUES (3,3),(NULL,6);
SQL

run dolt sql -q "SELECT * FROM t;" -r csv
run dolt sql -q "SELECT * FROM t ORDER BY pk;" -r csv
[ "$status" -eq 0 ]
[[ "${lines[0]}" =~ "pk,c0" ]] || false
[[ "${lines[1]}" =~ "1,1" ]] || false
@@ -548,7 +549,7 @@ SELECT DOLT_MERGE('test');
INSERT INTO t VALUES (NULL,6);
SQL

run dolt sql -q "SELECT * FROM t;" -r csv
run dolt sql -q "SELECT * FROM t ORDER BY pk;" -r csv
[ "$status" -eq 0 ]
[[ "${lines[0]}" =~ "pk,c0" ]] || false
[[ "${lines[1]}" =~ "1,1" ]] || false
@@ -564,7 +565,7 @@ SQL
[ "$status" -eq 0 ]

dolt sql -q "insert into test2 values (0, 'john', 0)"
run dolt sql -q "SELECT * from test2" -r csv
run dolt sql -q "SELECT * from test2 ORDER BY pk" -r csv
[ "$status" -eq 0 ]
[[ "$output" =~ "0,john,0" ]] || false
}
@@ -574,7 +575,7 @@ SQL
dolt sql -q "ALTER TABLE t CHANGE COLUMN pk pk int NOT NULL AUTO_INCREMENT PRIMARY KEY;"

dolt sql -q 'insert into t values (NULL), (NULL), (NULL)'
run dolt sql -q "SELECT * FROM t" -r csv
run dolt sql -q "SELECT * FROM t ORDER BY pk" -r csv
[[ "${lines[0]}" =~ "pk" ]] || false
[[ "${lines[1]}" =~ "1" ]] || false
[[ "${lines[2]}" =~ "2" ]] || false
@@ -100,12 +100,12 @@ test_mutation() {
expected="$3"
uses_pk="$4"
dolt sql -q "$dml"
run dolt sql -q "select * from $table" -r csv
run dolt sql -q "select * from $table ORDER BY pk1" -r csv
[ "$status" -eq "0" ]
[ "$output" == "$expected" ] || (echo $output && exit 1)
dolt reset --hard
dolt sql --batch -q "$dml ; $dml"
run dolt sql -q "select * from $table" -r csv
run dolt sql -q "select * from $table ORDER BY pk1" -r csv
[ "$status" -eq "0" ]
[ "$output" == "$expected" ] || (echo $output && exit 1)
run dolt sql -q "explain $dml"
@@ -392,7 +392,7 @@ SQL
run dolt ls
[ "$status" -eq 0 ]
[[ "$output" =~ "test" ]] || false
run dolt sql -q "select * from test"
run dolt sql -q "select * from test ORDER BY pk"
[ "$status" -eq 0 ]
[ "${#lines[@]}" -eq 11 ]
[ "${lines[3]}" = '| a | "" | 1 |' ]
@@ -418,7 +418,7 @@ SQL
[ "$status" -eq 0 ]

# schema argument subsets the data and adds empty column
run dolt sql -r csv -q "select * from subset"
run dolt sql -r csv -q "select * from subset ORDER BY pk"
[ "$status" -eq 0 ]
[ "${lines[0]}" = "pk,c1,c3,noData" ]
[ "${lines[1]}" = "0,1,3," ]
@@ -440,7 +440,7 @@ SQL
run dolt ls
[ "$status" -eq 0 ]
[[ "$output" =~ "empty_strings_null_values" ]] || false
run dolt sql -q "select * from empty_strings_null_values"
run dolt sql -q "select * from empty_strings_null_values ORDER BY pk"
[ "$status" -eq 0 ]
[ "${#lines[@]}" -eq 11 ]
[ "${lines[3]}" = '| a | "" | 1 |' ]
@@ -467,7 +467,7 @@ SQL
run dolt ls
[ "$status" -eq 0 ]
[[ "$output" =~ "test" ]] || false
run dolt sql -q "select * from test"
run dolt sql -q "select * from test ORDER BY pk"
[ "$status" -eq 0 ]
[ "${#lines[@]}" -eq 11 ]
[ "${lines[3]}" = '| a | "" | 1 |' ]
@@ -2025,13 +2025,13 @@ CREATE TABLE test2 (
INSERT INTO test VALUES (0, NULL), (1, NULL), (2, NULL);
INSERT INTO test2 VALUES (0, NULL, NULL), (1, NULL, NULL), (2, 1, NULL), (3, 1, NULL), (4, NULL, 1), (5, NULL, 1);
SQL
run dolt sql -q "SELECT * FROM test" -r=json
run dolt sql -q "SELECT * FROM test order by pk" -r=json
[ "$status" -eq "0" ]
[[ "$output" =~ '{"rows": [{"pk":0},{"pk":1},{"pk":2}]}' ]] || false
run dolt sql -q "SELECT * FROM test WHERE v1 IS NULL" -r=json
run dolt sql -q "SELECT * FROM test WHERE v1 IS NULL order by pk" -r=json
[ "$status" -eq "0" ]
[[ "$output" =~ '{"rows": [{"pk":0},{"pk":1},{"pk":2}]}' ]] || false
run dolt sql -q "SELECT * FROM test2" -r=json
run dolt sql -q "SELECT * FROM test2 order by pk" -r=json
[ "$status" -eq "0" ]
[[ "$output" =~ '{"rows": [{"pk":0},{"pk":1},{"pk":2,"v1":1},{"pk":3,"v1":1},{"pk":4,"v2":1},{"pk":5,"v2":1}]}' ]] || false
}
@@ -336,7 +336,7 @@ SQL
dolt checkout master
run dolt merge other
[ "$status" -eq 0 ]
run dolt sql -q "SELECT * FROM quiz;" -r csv
run dolt sql -q "SELECT * FROM quiz ORDER BY pk;" -r csv
[[ "${lines[0]}" =~ "pk" ]] || false
[[ "${lines[1]}" =~ "10" ]] || false
[[ "${lines[2]}" =~ "11" ]] || false
@@ -100,7 +100,7 @@ teardown() {

@test "query-catalog: executed saved" {
Q1="select pk, pk1, pk2 from one_pk,two_pk where one_pk.c1=two_pk.c1 order by 1"
Q2="select pk from one_pk"
Q2="select pk from one_pk order by pk"
dolt sql -q "$Q1" -s name1
dolt sql -q "$Q2" -s name2

@@ -138,7 +138,7 @@ EOF
EXPECTED=$(cat <<'EOF'
id,display_order,name,query,description
name1,1,name1,"select pk, pk1, pk2 from one_pk,two_pk where one_pk.c1=two_pk.c1 order by 1",""
name2,2,name2,select pk from one_pk,""
name2,2,name2,select pk from one_pk order by pk,""
EOF
)

@@ -154,7 +154,7 @@ EOF
EXPECTED=$(cat <<'EOF'
id,display_order,name,query,description
name1,1,name1,"select pk, pk1, pk2 from one_pk,two_pk where one_pk.c1=two_pk.c1 and pk < 3 order by 1 desc",""
name2,2,name2,select pk from one_pk,""
name2,2,name2,select pk from one_pk order by pk,""
EOF
)
54  integration-tests/bats/replication.bats  Normal file
@@ -0,0 +1,54 @@
#!/usr/bin/env bats
load $BATS_TEST_DIRNAME/helper/common.bash

setup() {
    setup_common
    TMPDIRS=$(pwd)/tmpdirs
    mkdir -p $TMPDIRS/{bac1,repo1}

    # repo1 -> bac1 -> repo2
    cd $TMPDIRS/repo1
    dolt init
    dolt branch feature
    dolt remote add backup1 file://../bac1
    cd $TMPDIRS
}

teardown() {
    teardown_common
    rm -rf $TMPDIRS
    cd $BATS_TMPDIR
}

@test "replication: default no replication" {
    cd repo1
    dolt sql -q "create table t1 (a int primary key)"
    dolt commit -am "cm"

    [ ! -d "../bac1/.dolt" ] || false
}

@test "replication: push on commit" {
    export DOLT_BACKUP_TO_REMOTE=backup1
    cd repo1
    dolt remote -v
    dolt sql -q "create table t1 (a int primary key)"
    dolt commit -am "cm"

    cd ..
    dolt clone file://./bac1 repo2
    export DOLT_BACKUP_TO_REMOTE=
    cd repo2
    run dolt ls
    [ "$status" -eq 0 ]
    [ "${#lines[@]}" -eq 2 ]
    [[ "$output" =~ "t1" ]] || false
}

@test "replication: no tags" {
    export DOLT_BACKUP_TO_REMOTE=backup1
    cd repo1
    dolt tag

    [ ! -d "../bac1/.dolt" ] || false
}
@@ -683,7 +683,7 @@ SQL
SELECT DOLT_MERGE('feature-branch');
"

server_query repo1 1 "SELECT * FROM test" "pk\n1\n2\n3\n1000"
server_query repo1 1 "SELECT * FROM test ORDER BY pk" "pk\n1\n2\n3\n1000"

server_query repo1 1 "SELECT COUNT(*) FROM dolt_log" "COUNT(*)\n3"
}
@@ -1038,8 +1038,6 @@ while True:
dolt remote add origin file://../rem1
start_sql_server repo1

dolt status
dolt branch
dolt push origin master
run server_query repo1 1 "select dolt_push() as p" "p\n0"
[ "$status" -eq 1 ]
@@ -1050,3 +1048,32 @@ while True:
skip "In-memory branch doesn't track upstream correctly"
server_query repo1 1 "select dolt_push() as p" "p\n1"
}

@test "sql-server: replicate to backup after sql-session commit" {
    skiponwindows "Has dependencies that are missing on the Jenkins Windows installation."

    mkdir bac1
    cd repo1
    dolt remote add backup1 file://../bac1
    export DOLT_BACKUP_TO_REMOTE=backup1
    start_sql_server repo1

    multi_query repo1 1 "
    CREATE TABLE test (
        pk int primary key
    );
    INSERT INTO test VALUES (0),(1),(2);
    SELECT DOLT_ADD('.');
    SELECT DOLT_COMMIT('-m', 'Step 1');"

    cd ..
    dolt clone file://./bac1 repo3
    cd repo3
    export DOLT_BACKUP_TO_REMOTE=
    run dolt sql -q "select * from test" -r csv
    [ "$status" -eq 0 ]
    [[ "${lines[0]}" =~ "pk" ]]
    [[ "${lines[1]}" =~ "0" ]]
    [[ "${lines[2]}" =~ "1" ]]
    [[ "${lines[3]}" =~ "2" ]]
}
@@ -324,23 +324,23 @@ SQL
}

@test "sql: basic inner join" {
run dolt sql -q "select pk,pk1,pk2 from one_pk join two_pk on one_pk.c1=two_pk.c1"
run dolt sql -q "select pk,pk1,pk2 from one_pk join two_pk on one_pk.c1=two_pk.c1 order by pk"
[ "$status" -eq 0 ]
[ "${#lines[@]}" -eq 8 ]
first_join_output=$output
run dolt sql -q "select pk,pk1,pk2 from two_pk join one_pk on one_pk.c1=two_pk.c1"
run dolt sql -q "select pk,pk1,pk2 from two_pk join one_pk on one_pk.c1=two_pk.c1 order by pk"
[ "$status" -eq 0 ]
[ "${#lines[@]}" -eq 8 ]
[ "$output" = "$first_join_output" ]
run dolt sql -q "select pk,pk1,pk2 from one_pk join two_pk on one_pk.c1=two_pk.c1 where pk=1"
run dolt sql -q "select pk,pk1,pk2 from one_pk join two_pk on one_pk.c1=two_pk.c1 where pk=1 order by pk"
[ "$status" -eq 0 ]
[ "${#lines[@]}" -eq 5 ]
run dolt sql -q "select pk,pk1,pk2,one_pk.c1 as foo,two_pk.c1 as bar from one_pk join two_pk on one_pk.c1=two_pk.c1"
run dolt sql -q "select pk,pk1,pk2,one_pk.c1 as foo,two_pk.c1 as bar from one_pk join two_pk on one_pk.c1=two_pk.c1 order by pk"
[ "$status" -eq 0 ]
[[ "$output" =~ foo ]] || false
[[ "$output" =~ bar ]] || false
[ "${#lines[@]}" -eq 8 ]
run dolt sql -q "select pk,pk1,pk2,one_pk.c1 as foo,two_pk.c1 as bar from one_pk join two_pk on one_pk.c1=two_pk.c1 where one_pk.c1=10"
run dolt sql -q "select pk,pk1,pk2,one_pk.c1 as foo,two_pk.c1 as bar from one_pk join two_pk on one_pk.c1=two_pk.c1 where one_pk.c1=10 order by pk"
[ "$status" -eq 0 ]
[ "${#lines[@]}" -eq 5 ]
[[ "$output" =~ "10" ]] || false