diff --git a/go/cmd/dolt/commands/push.go b/go/cmd/dolt/commands/push.go index af2f7eea9f..f7fa0f8b74 100644 --- a/go/cmd/dolt/commands/push.go +++ b/go/cmd/dolt/commands/push.go @@ -185,10 +185,8 @@ func pullerProgFunc(ctx context.Context, pullerEventCh chan datas.PullerEvent) { uploadRate := "" for evt := range pullerEventCh { - select { - case <-ctx.Done(): + if ctx.Err() != nil { return - default: } switch evt.EventType { case datas.NewLevelTWEvent: @@ -247,10 +245,8 @@ func progFunc(ctx context.Context, progChan chan datas.PullProgress) { lenPrinted := 0 done := false for !done { - select { - case <-ctx.Done(): + if ctx.Err() != nil { return - default: } select { case <-ctx.Done(): diff --git a/go/cmd/dolt/commands/sqlserver/server.go b/go/cmd/dolt/commands/sqlserver/server.go index bd1b792ae6..efd69151e3 100644 --- a/go/cmd/dolt/commands/sqlserver/server.go +++ b/go/cmd/dolt/commands/sqlserver/server.go @@ -232,6 +232,8 @@ func newSessionBuilder(sqlEngine *sqle.Engine, username string, email string, pr cli.PrintErr(err) return nil, nil, nil, err } + + db.GetDoltDB().SetCommitHookLogger(ctx, doltSess.GetLogger().Logger.Out) } return doltSess, ir, vr, nil diff --git a/go/cmd/dolt/dolt.go b/go/cmd/dolt/dolt.go index 7bbde32c8f..13eb855b5c 100644 --- a/go/cmd/dolt/dolt.go +++ b/go/cmd/dolt/dolt.go @@ -16,7 +16,6 @@ package main import ( "context" - "log" "net/http" _ "net/http/pprof" "os" @@ -50,7 +49,7 @@ import ( ) const ( - Version = "0.28.4" + Version = "0.28.5" ) var dumpDocsCommand = &commands.DumpDocsCmd{} @@ -148,7 +147,27 @@ func runMain() int { case pprofServerFlag: // serve the pprof endpoints setup in the init function run when "net/http/pprof" is imported go func() { - log.Println(http.ListenAndServe("localhost:6060", nil)) + cyanStar := color.CyanString("*") + cli.Println(cyanStar, "Starting pprof server on port 6060.") + cli.Println(cyanStar, "Go to", color.CyanString("http://localhost:6060/debug/pprof"), "in a browser to see supported endpoints.") 
+ cli.Println(cyanStar) + cli.Println(cyanStar, "Known endpoints are:") + cli.Println(cyanStar, " /allocs: A sampling of all past memory allocations") + cli.Println(cyanStar, " /block: Stack traces that led to blocking on synchronization primitives") + cli.Println(cyanStar, " /cmdline: The command line invocation of the current program") + cli.Println(cyanStar, " /goroutine: Stack traces of all current goroutines") + cli.Println(cyanStar, " /heap: A sampling of memory allocations of live objects. You can specify the gc GET parameter to run GC before taking the heap sample.") + cli.Println(cyanStar, " /mutex: Stack traces of holders of contended mutexes") + cli.Println(cyanStar, " /profile: CPU profile. You can specify the duration in the seconds GET parameter. After you get the profile file, use the go tool pprof command to investigate the profile.") + cli.Println(cyanStar, " /threadcreate: Stack traces that led to the creation of new OS threads") + cli.Println(cyanStar, " /trace: A trace of execution of the current program. You can specify the duration in the seconds GET parameter. 
After you get the trace file, use the go tool trace command to investigate the trace.") + cli.Println() + + err := http.ListenAndServe("localhost:6060", nil) + + if err != nil { + cli.Println(color.YellowString("pprof server exited with error: %v", err)) + } }() args = args[1:] @@ -234,7 +253,6 @@ func runMain() int { } root, err := env.GetCurrentUserHomeDir() - if err != nil { cli.PrintErrln(color.RedString("Failed to load the HOME directory: %v", err)) return 1 @@ -281,6 +299,10 @@ func runMain() int { defer tempfiles.MovableTempFileProvider.Clean() + if dEnv.DoltDB != nil { + dEnv.DoltDB.SetCommitHookLogger(ctx, cli.OutStream) + } + start := time.Now() res := doltCommand.Exec(ctx, "dolt", args, dEnv) diff --git a/go/libraries/doltcore/doltdb/commit_hooks.go b/go/libraries/doltcore/doltdb/commit_hooks.go new file mode 100644 index 0000000000..94e0e4597d --- /dev/null +++ b/go/libraries/doltcore/doltdb/commit_hooks.go @@ -0,0 +1,157 @@ +// Copyright 2021 Dolthub, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+
+package doltdb
+
+import (
+	"context"
+	"io"
+	"sync"
+
+	"github.com/dolthub/dolt/go/libraries/doltcore/ref"
+
+	"github.com/dolthub/dolt/go/store/datas"
+)
+
+// BackupToRemoteKey is the environment variable that names the remote commits are replicated to.
+const BackupToRemoteKey = "DOLT_BACKUP_TO_REMOTE"
+
+// ReplicateHook is a commit hook that pushes each updated dataset head to destDB.
+type ReplicateHook struct {
+	destDB datas.Database
+	tmpDir string
+	outf   io.Writer
+}
+
+// NewReplicateHook creates a ReplicateHook, parameterized by the backup database
+// and a local tempfile for pushing
+func NewReplicateHook(destDB *DoltDB, tmpDir string) *ReplicateHook {
+	return &ReplicateHook{destDB: destDB.db, tmpDir: tmpDir}
+}
+
+// Execute implements datas.CommitHook, replicates head updates to the destDb field
+func (rh *ReplicateHook) Execute(ctx context.Context, ds datas.Dataset, db datas.Database) error {
+	return replicate(ctx, rh.destDB, db, rh.tmpDir, ds)
+}
+
+// HandleError implements datas.CommitHook
+func (rh *ReplicateHook) HandleError(ctx context.Context, err error) error {
+	if rh.outf != nil {
+		rh.outf.Write([]byte(err.Error()))
+	}
+	return nil
+}
+
+// SetLogger implements datas.CommitHook
+func (rh *ReplicateHook) SetLogger(ctx context.Context, wr io.Writer) error {
+	rh.outf = wr
+	return nil
+}
+
+// replicate pushes a dataset from srcDB to destDB and force sets the destDB ref to the new dataset value
+func replicate(ctx context.Context, destDB, srcDB datas.Database, tempTableDir string, ds datas.Dataset) error {
+	stRef, ok, err := ds.MaybeHeadRef()
+	if err != nil {
+		return err
+	}
+	if !ok {
+		// No head ref, return
+		return nil
+	}
+
+	rf, err := ref.Parse(ds.ID())
+	if err != nil {
+		return err
+	}
+
+	newCtx, cancelFunc := context.WithCancel(ctx)
+	wg, progChan, pullerEventCh := runProgFuncs(newCtx)
+	defer stopProgFuncs(cancelFunc, wg, progChan, pullerEventCh)
+	puller, err := datas.NewPuller(ctx, tempTableDir, defaultChunksPerTF, srcDB, destDB, stRef.TargetHash(), pullerEventCh)
+	if err == datas.ErrDBUpToDate {
+		return nil
+	} else if err != nil {
+		return err
+	}
+
+	err = puller.Pull(ctx)
+	if err != nil {
+		return err
+	}
+
+	ds, err = destDB.GetDataset(ctx, rf.String())
+	if err != nil {
+		return err
+	}
+
+	_, err = destDB.SetHead(ctx, ds, stRef)
+	return err
+}
+
+// pullerProgFunc receives and discards puller events until ctx is done.
+func pullerProgFunc(ctx context.Context, pullerEventCh <-chan datas.PullerEvent) {
+	for {
+		if ctx.Err() != nil {
+			return
+		}
+		select {
+		case <-ctx.Done():
+			return
+		case <-pullerEventCh:
+		}
+	}
+}
+
+// progFunc receives and discards pull progress updates until ctx is done.
+func progFunc(ctx context.Context, progChan <-chan datas.PullProgress) {
+	for {
+		if ctx.Err() != nil {
+			return
+		}
+		select {
+		case <-ctx.Done():
+			return
+		case <-progChan:
+		}
+	}
+}
+
+// runProgFuncs starts the two draining goroutines and returns their WaitGroup and channels.
+func runProgFuncs(ctx context.Context) (*sync.WaitGroup, chan datas.PullProgress, chan datas.PullerEvent) {
+	pullerEventCh := make(chan datas.PullerEvent)
+	progChan := make(chan datas.PullProgress)
+	wg := &sync.WaitGroup{}
+
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		progFunc(ctx, progChan)
+	}()
+
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		pullerProgFunc(ctx, pullerEventCh)
+	}()
+
+	return wg, progChan, pullerEventCh
+}
+
+// stopProgFuncs cancels the draining goroutines, closes their channels and waits for them to exit.
+func stopProgFuncs(cancel context.CancelFunc, wg *sync.WaitGroup, progChan chan datas.PullProgress, pullerEventCh chan datas.PullerEvent) {
+	cancel()
+	close(progChan)
+	close(pullerEventCh)
+	wg.Wait()
+}
diff --git a/go/libraries/doltcore/doltdb/commit_hooks_test.go b/go/libraries/doltcore/doltdb/commit_hooks_test.go
new file mode 100644
index 0000000000..f9b11ff789
--- /dev/null
+++ b/go/libraries/doltcore/doltdb/commit_hooks_test.go
@@ -0,0 +1,149 @@
+// Copyright 2021 Dolthub, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package doltdb
+
+import (
+	"bytes"
+	"context"
+	"errors"
+	"path/filepath"
+	"testing"
+
+	"github.com/dolthub/dolt/go/libraries/doltcore/dbfactory"
+	"github.com/dolthub/dolt/go/libraries/doltcore/ref"
+	"github.com/dolthub/dolt/go/libraries/utils/filesys"
+	"github.com/dolthub/dolt/go/libraries/utils/test"
+	"github.com/dolthub/dolt/go/store/datas"
+	"github.com/dolthub/dolt/go/store/types"
+
+	"github.com/stretchr/testify/assert"
+)
+
+func TestReplicateHook(t *testing.T) {
+	ctx := context.Background()
+
+	// destination repo
+	testDir, err := test.ChangeToTestDir("TestReplicationDest")
+	if err != nil {
+		t.Fatal("Couldn't change the working directory to the test directory.")
+	}
+
+	committerName := "Bill Billerson"
+	committerEmail := "bigbillieb@fake.horse"
+
+	tmpDir := filepath.Join(testDir, dbfactory.DoltDataDir)
+	err = filesys.LocalFS.MkDirs(tmpDir)
+
+	if err != nil {
+		t.Fatal("Failed to create noms directory")
+	}
+
+	destDB, _ := LoadDoltDB(context.Background(), types.Format_Default, LocalDirDoltDB, filesys.LocalFS)
+
+	// source repo
+	testDir, err = test.ChangeToTestDir("TestReplicationSource")
+	if err != nil {
+		t.Fatal("Couldn't change the working directory to the test directory.")
+	}
+
+	tmpDir = filepath.Join(testDir, dbfactory.DoltDataDir)
+	err = filesys.LocalFS.MkDirs(tmpDir)
+
+	if err != nil {
+		t.Fatal("Failed to create noms directory")
+	}
+
+	ddb, _ := LoadDoltDB(context.Background(), types.Format_Default, LocalDirDoltDB, filesys.LocalFS)
+	err = ddb.WriteEmptyRepo(context.Background(), "master", committerName, committerEmail)
+
+	if err != nil {
+		t.Fatal("Unexpected error creating empty repo", err)
+	}
+
+	// prepare a commit in the source repo
+	cs, _ := NewCommitSpec("master")
+	commit, err := ddb.Resolve(context.Background(), cs, nil)
+
+	if err != nil {
+		t.Fatal("Couldn't find commit")
+	}
+
+	meta, err := commit.GetCommitMeta()
+	assert.NoError(t, err)
+
+	if meta.Name != committerName || meta.Email != committerEmail {
+		t.Error("Unexpected metadata")
+	}
+
+	root, err := commit.GetRootValue()
+	assert.NoError(t, err)
+
+	names, err := root.GetTableNames(context.Background())
+	assert.NoError(t, err)
+	if len(names) != 0 {
+		t.Fatal("There should be no tables in empty db")
+	}
+
+	tSchema := createTestSchema(t)
+	rowData, _ := createTestRowData(t, ddb.db, tSchema)
+	tbl, err := CreateTestTable(ddb.db, tSchema, rowData)
+
+	if err != nil {
+		t.Fatal("Failed to create test table with data")
+	}
+
+	root, err = root.PutTable(context.Background(), "test", tbl)
+	assert.NoError(t, err)
+
+	valHash, err := ddb.WriteRootValue(context.Background(), root)
+	assert.NoError(t, err)
+
+	meta, err = NewCommitMeta(committerName, committerEmail, "Sample data")
+	if err != nil {
+		t.Error("Failed to commit")
+	}
+
+	// setup hook
+	hook := NewReplicateHook(destDB, tmpDir)
+	ddb.SetCommitHooks(ctx, []datas.CommitHook{hook})
+
+	t.Run("replicate to backup remote", func(t *testing.T) {
+		srcCommit, err := ddb.Commit(context.Background(), valHash, ref.NewBranchRef("master"), meta)
+		assert.NoError(t, err)
+		ds, err := ddb.db.GetDataset(ctx, "refs/heads/master")
+		assert.NoError(t, err)
+		err = hook.Execute(ctx, ds, ddb.db)
+		assert.NoError(t, err)
+
+		cs, _ = NewCommitSpec("master")
+		destCommit, err := destDB.Resolve(context.Background(), cs, nil)
+		assert.NoError(t, err)
+
+		srcHash, _ := srcCommit.HashOf()
+		destHash, _ := destCommit.HashOf()
+		assert.Equal(t, srcHash, destHash)
+	})
+
+	t.Run("replicate handle error logs to writer", func(t *testing.T) {
+		var buffer = &bytes.Buffer{}
+		err = hook.SetLogger(ctx, buffer)
+		assert.NoError(t, err)
+
+		msg := "prince charles is a vampire"
+		hook.HandleError(ctx, errors.New(msg))
+
+		// testify expects (expected, actual); the written log must equal the error text exactly.
+		assert.Equal(t, msg, buffer.String())
+	})
+}
diff --git a/go/libraries/doltcore/doltdb/doltdb.go b/go/libraries/doltcore/doltdb/doltdb.go
index 60f7bedf82..45c6c7ccf8 100644
--- a/go/libraries/doltcore/doltdb/doltdb.go
+++ b/go/libraries/doltcore/doltdb/doltdb.go
@@ -18,6 +18,7 @@ import (
 	"context"
 	"errors"
 	"fmt"
+	"io"
 	"path/filepath"
 	"strings"
 	"time"
@@ -1279,3 +1280,17 @@ func (ddb *DoltDB) PullChunks(ctx context.Context, tempDir string, srcDB *DoltDB
 func (ddb *DoltDB) Clone(ctx context.Context, destDB *DoltDB, eventCh chan<- datas.TableFileEvent) error {
 	return datas.Clone(ctx, ddb.db, destDB.db, eventCh)
 }
+
+// SetCommitHooks installs postHooks on the underlying datas database and returns ddb for chaining.
+func (ddb *DoltDB) SetCommitHooks(ctx context.Context, postHooks []datas.CommitHook) *DoltDB {
+	ddb.db = ddb.db.SetCommitHooks(ctx, postHooks)
+	return ddb
+}
+
+// SetCommitHookLogger forwards wr to the underlying database's commit hooks; it is a no-op when no database is loaded.
+func (ddb *DoltDB) SetCommitHookLogger(ctx context.Context, wr io.Writer) *DoltDB {
+	if ddb.db != nil {
+		ddb.db = ddb.db.SetCommitHookLogger(ctx, wr)
+	}
+	return ddb
+}
diff --git a/go/libraries/doltcore/env/actions/remotes.go b/go/libraries/doltcore/env/actions/remotes.go
index 2deae6a7a5..c9fac9f392 100644
--- a/go/libraries/doltcore/env/actions/remotes.go
+++ b/go/libraries/doltcore/env/actions/remotes.go
@@ -20,12 +20,11 @@ import (
 	"fmt"
 	"sync"
 
-	"github.com/dolthub/dolt/go/libraries/doltcore/remotestorage"
-
 	eventsapi "github.com/dolthub/dolt/go/gen/proto/dolt/services/eventsapi/v1alpha1"
 	"github.com/dolthub/dolt/go/libraries/doltcore/doltdb"
 	"github.com/dolthub/dolt/go/libraries/doltcore/env"
 	"github.com/dolthub/dolt/go/libraries/doltcore/ref"
+	"github.com/dolthub/dolt/go/libraries/doltcore/remotestorage"
 	"github.com/dolthub/dolt/go/libraries/events"
 	"github.com/dolthub/dolt/go/libraries/utils/earl"
 	"github.com/dolthub/dolt/go/store/datas"
diff --git a/go/libraries/doltcore/env/environment.go b/go/libraries/doltcore/env/environment.go
index d9ac340e81..6693c41679 100644
--- 
a/go/libraries/doltcore/env/environment.go
+++ b/go/libraries/doltcore/env/environment.go
@@ -19,6 +19,7 @@ import (
 	"crypto/tls"
 	"errors"
 	"fmt"
+	"os"
 	"path/filepath"
 	"runtime"
 	"strings"
@@ -38,6 +39,7 @@ import (
 	"github.com/dolthub/dolt/go/libraries/doltcore/table/editor"
 	"github.com/dolthub/dolt/go/libraries/utils/config"
 	"github.com/dolthub/dolt/go/libraries/utils/filesys"
+	"github.com/dolthub/dolt/go/store/datas"
 	"github.com/dolthub/dolt/go/store/hash"
 	"github.com/dolthub/dolt/go/store/types"
 )
@@ -56,6 +58,34 @@ const (
 	tempTablesDir = "temptf"
 )
 
+// getCommitHooks returns the commit hooks configured for dEnv. When the
+// BackupToRemoteKey environment variable names a remote, a ReplicateHook
+// targeting that remote is included; an unknown remote name is an error.
+func getCommitHooks(ctx context.Context, dEnv *DoltEnv) ([]datas.CommitHook, error) {
+	postCommitHooks := make([]datas.CommitHook, 0)
+
+	backupName := os.Getenv(doltdb.BackupToRemoteKey)
+	if backupName != "" {
+		remotes, err := dEnv.GetRemotes()
+		if err != nil {
+			return nil, err
+		}
+		rem, ok := remotes[backupName]
+		if !ok {
+			return nil, ErrRemoteNotFound
+		}
+		ddb, err := rem.GetRemoteDB(ctx, types.Format_Default)
+
+		if err != nil {
+			return nil, err
+		}
+		replicateHook := doltdb.NewReplicateHook(ddb, dEnv.TempTableFilesDir())
+		postCommitHooks = append(postCommitHooks, replicateHook)
+	}
+
+	return postCommitHooks, nil
+}
+
 var zeroHashStr = (hash.Hash{}).String()
 
 var ErrPreexistingDoltDir = errors.New(".dolt dir already exists")
@@ -158,6 +188,15 @@ func Load(ctx context.Context, hdp HomeDirProvider, fs filesys.Filesys, urlStr,
 		}
 	}
 
+	if dbLoadErr == nil {
+		postCommitHooks, hooksErr := getCommitHooks(ctx, dEnv)
+		if hooksErr != nil {
+			dEnv.DBLoadError = hooksErr
+		} else {
+			dEnv.DoltDB.SetCommitHooks(ctx, postCommitHooks)
+		}
+	}
+
 	return dEnv
 }
 
diff --git a/go/libraries/doltcore/env/remotes.go b/go/libraries/doltcore/env/remotes.go
index 0af18dc8b6..42e7f367d4 100644
--- a/go/libraries/doltcore/env/remotes.go
+++ b/go/libraries/doltcore/env/remotes.go
@@ -57,6 +57,17 @@ type Remote struct {
 	dialer dbfactory.GRPCDialProvider
 }
 
+// GetRemote builds the named Remote and opens its database with the default noms format.
+func GetRemote(ctx context.Context, remoteName, remoteUrl string, params map[string]string, dialer dbfactory.GRPCDialProvider) (Remote, *doltdb.DoltDB, error) {
+	r := NewRemote(remoteName, remoteUrl, params, dialer)
+	ddb, err := r.GetRemoteDB(ctx, types.Format_Default)
+	if err != nil {
+		return NoRemote, nil, err
+	}
+
+	return r, ddb, nil
+}
+
 func NewRemote(name, url string, params map[string]string, dialer dbfactory.GRPCDialProvider) Remote {
 	return Remote{name, url, []string{"refs/heads/*:refs/remotes/" + name + "/*"}, params, dialer}
 }
diff --git a/go/libraries/doltcore/remotestorage/chunk_store.go b/go/libraries/doltcore/remotestorage/chunk_store.go
index 1abf4ee56d..a13643c263 100644
--- a/go/libraries/doltcore/remotestorage/chunk_store.go
+++ b/go/libraries/doltcore/remotestorage/chunk_store.go
@@ -40,7 +40,6 @@ import (
 	"github.com/dolthub/dolt/go/libraries/utils/tracing"
 	"github.com/dolthub/dolt/go/store/atomicerr"
 	"github.com/dolthub/dolt/go/store/chunks"
-	"github.com/dolthub/dolt/go/store/datas"
 	"github.com/dolthub/dolt/go/store/hash"
 	"github.com/dolthub/dolt/go/store/nbs"
 	"github.com/dolthub/dolt/go/store/types"
@@ -66,7 +65,7 @@ var ErrInvalidDoltSpecPath = errors.New("invalid dolt spec path")
 var globalHttpFetcher HTTPFetcher = &http.Client{}
 
 var _ nbs.TableFileStore = (*DoltChunkStore)(nil)
-var _ datas.NBSCompressedChunkStore = (*DoltChunkStore)(nil)
+var _ nbs.NBSCompressedChunkStore = (*DoltChunkStore)(nil)
 var _ chunks.ChunkStore = (*DoltChunkStore)(nil)
 var _ chunks.LoggingChunkStore = (*DoltChunkStore)(nil)
 
diff --git a/go/libraries/doltcore/sqle/dfunctions/dolt_pull.go b/go/libraries/doltcore/sqle/dfunctions/dolt_pull.go
index 375cd7f7cf..b323ac14ef 100644
--- a/go/libraries/doltcore/sqle/dfunctions/dolt_pull.go
+++ b/go/libraries/doltcore/sqle/dfunctions/dolt_pull.go
@@ -155,10 +155,8 @@ func (d DoltPullFunc) Eval(ctx *sql.Context, row sql.Row) (interface{}, error) {
 
 func pullerProgFunc(ctx context.Context, pullerEventCh <-chan datas.PullerEvent) {
 	for {
-		select {
-		case <-ctx.Done():
+		if ctx.Err() != nil {
 			return
-		default:
 		}
 		select {
 		case <-ctx.Done():
@@ -171,10 +169,8 @@ func pullerProgFunc(ctx context.Context, pullerEventCh <-chan datas.PullerEvent)
 
 func progFunc(ctx context.Context, progChan <-chan datas.PullProgress) {
 	for {
-		select {
-		case <-ctx.Done():
+		if ctx.Err() != nil {
 			return
-		default:
 		}
 		select {
 		case <-ctx.Done():
diff --git a/go/libraries/doltcore/sqle/enginetest/dolt_engine_test.go b/go/libraries/doltcore/sqle/enginetest/dolt_engine_test.go
index 442de81239..9bbc6bac48 100644
--- a/go/libraries/doltcore/sqle/enginetest/dolt_engine_test.go
+++ b/go/libraries/doltcore/sqle/enginetest/dolt_engine_test.go
@@ -24,7 +24,8 @@ import (
 )
 
 func init() {
-	sqle.MinRowsPerPartition = 2
+	sqle.MinRowsPerPartition = 8
+	sqle.MaxRowsPerPartition = 1024
 }
 
 func TestQueries(t *testing.T) {
diff --git a/go/libraries/doltcore/sqle/rows.go b/go/libraries/doltcore/sqle/rows.go
index b5765e5db0..66119c8e43 100644
--- a/go/libraries/doltcore/sqle/rows.go
+++ b/go/libraries/doltcore/sqle/rows.go
@@ -26,6 +26,41 @@ import (
 	"github.com/dolthub/dolt/go/store/types"
 )
 
+var _ sql.RowIter = (*keylessRowIter)(nil)
+
+// keylessRowIter wraps a DoltMapIter over a keyless table and yields each
+// stored row as many times as its cardinality column says.
+type keylessRowIter struct {
+	keyedIter *DoltMapIter
+
+	cardIdx     int
+	nonCardCols int
+
+	lastRead sql.Row
+	lastCard uint64
+}
+
+// Next repeats the last read row until its cardinality is used up, then reads the next stored row.
+func (k *keylessRowIter) Next() (sql.Row, error) {
+	if k.lastCard == 0 {
+		r, err := k.keyedIter.Next()
+
+		if err != nil {
+			return nil, err
+		}
+
+		k.lastCard = r[k.cardIdx].(uint64)
+		k.lastRead = r[:k.nonCardCols]
+	}
+
+	k.lastCard--
+	return k.lastRead, nil
+}
+
+func (k *keylessRowIter) Close(ctx *sql.Context) error {
+	return k.keyedIter.Close(ctx)
+}
+
 // An iterator over the rows of a table.
 type doltTableRowIter struct {
 	sql.RowIter
@@ -43,48 +78,68 @@ func newRowIterator(ctx *sql.Context, tbl *doltdb.Table, projCols []string, part
 
 	if schema.IsKeyless(sch) {
 		// would be more optimal to project columns into keyless tables also
-		return newKeylessRowIterator(ctx, tbl, partition)
+		return newKeylessRowIterator(ctx, tbl, projCols, partition)
 	} else {
 		return newKeyedRowIter(ctx, tbl, projCols, partition)
 	}
 }
 
-func newKeylessRowIterator(ctx *sql.Context, tbl *doltdb.Table, partition *doltTablePartition) (*doltTableRowIter, error) {
-	var iter table.SqlTableReader
-	var err error
-	if partition.end == NoUpperBound {
-		iter, err = table.NewBufferedTableReader(ctx, tbl)
-	} else {
-		iter, err = table.NewBufferedTableReaderForPartition(ctx, tbl, partition.start, partition.end)
-	}
-
+func newKeylessRowIterator(ctx *sql.Context, tbl *doltdb.Table, projectedCols []string, partition *doltTablePartition) (sql.RowIter, error) {
+	mapIter, err := iterForPartition(ctx, partition)
 	if err != nil {
 		return nil, err
 	}
 
-	return &doltTableRowIter{
-		ctx:    ctx,
-		reader: iter,
+	cols, tagToSqlColIdx, err := getTagToResColIdx(ctx, tbl, projectedCols)
+	if err != nil {
+		return nil, err
+	}
+
+	idxOfCardinality := len(cols)
+	tagToSqlColIdx[schema.KeylessRowCardinalityTag] = idxOfCardinality
+
+	colsCopy := make([]schema.Column, len(cols), len(cols)+1)
+	copy(colsCopy, cols)
+	colsCopy = append(colsCopy, schema.NewColumn("__cardinality__", schema.KeylessRowCardinalityTag, types.UintKind, false))
+
+	conv := NewKVToSqlRowConverter(tbl.Format(), tagToSqlColIdx, colsCopy, len(colsCopy))
+	keyedItr := NewDoltMapIter(ctx, mapIter.NextTuple, nil, conv)
+
+	return &keylessRowIter{
+		keyedIter:   keyedItr,
+		cardIdx:     idxOfCardinality,
+		nonCardCols: len(cols),
 	}, nil
 }
 
 func newKeyedRowIter(ctx context.Context, tbl *doltdb.Table, projectedCols []string, partition *doltTablePartition) (sql.RowIter, error) {
-	var err error
-	var mapIter types.MapTupleIterator
+	mapIter, err := iterForPartition(ctx, partition)
+	if err != nil {
+		return nil, err
+	}
+
+	cols, tagToSqlColIdx, err := getTagToResColIdx(ctx, tbl, projectedCols)
+	if err != nil {
+		return nil, err
+	}
+
+	conv := NewKVToSqlRowConverter(tbl.Format(), tagToSqlColIdx, cols, len(cols))
+	return NewDoltMapIter(ctx, mapIter.NextTuple, nil, conv), nil
+}
+
+func iterForPartition(ctx context.Context, partition *doltTablePartition) (types.MapTupleIterator, error) {
 	rowData := partition.rowData
 	if partition.end == NoUpperBound {
-		mapIter, err = rowData.RangeIterator(ctx, 0, rowData.Len())
+		return rowData.RangeIterator(ctx, 0, rowData.Len())
 	} else {
-		mapIter, err = partition.IteratorForPartition(ctx, rowData)
-	}
-
-	if err != nil {
-		return nil, err
+		return partition.IteratorForPartition(ctx, rowData)
 	}
+}
 
+func getTagToResColIdx(ctx context.Context, tbl *doltdb.Table, projectedCols []string) ([]schema.Column, map[uint64]int, error) {
 	sch, err := tbl.GetSchema(ctx)
 	if err != nil {
-		return nil, err
+		return nil, nil, err
 	}
 
 	cols := sch.GetAllCols().GetColumns()
@@ -96,9 +151,7 @@ func newKeyedRowIter(ctx context.Context, tbl *doltdb.Table, projectedCols []str
 			tagToSqlColIdx[col.Tag] = i
 		}
 	}
-
-	conv := NewKVToSqlRowConverter(tbl.Format(), tagToSqlColIdx, cols, len(cols))
-	return NewDoltMapIter(ctx, mapIter.NextTuple, nil, conv), nil
+	return cols, tagToSqlColIdx, nil
 }
 
 // Next returns the next row in this row iterator, or an io.EOF error if there aren't any more.
diff --git a/go/libraries/doltcore/sqle/tables.go b/go/libraries/doltcore/sqle/tables.go index b1786ed197..243c15260b 100644 --- a/go/libraries/doltcore/sqle/tables.go +++ b/go/libraries/doltcore/sqle/tables.go @@ -47,6 +47,7 @@ const ( partitionMultiplier = 2.0 ) +var MaxRowsPerPartition uint64 = 32 * 1024 var MinRowsPerPartition uint64 = 1024 func init() { @@ -358,15 +359,20 @@ func (t *DoltTable) Partitions(ctx *sql.Context) (sql.PartitionIter, error) { return newDoltTablePartitionIter(rowData, doltTablePartition{0, 0, rowData}), nil } - maxPartitions := uint64(partitionMultiplier * runtime.NumCPU()) - numPartitions := (numElements / MinRowsPerPartition) + 1 + itemsPerPartition := MaxRowsPerPartition + numPartitions := (numElements / itemsPerPartition) + 1 + if numPartitions < uint64(partitionMultiplier*runtime.NumCPU()) { + itemsPerPartition = numElements / uint64(partitionMultiplier*runtime.NumCPU()) - if numPartitions > maxPartitions { - numPartitions = maxPartitions + if itemsPerPartition == 0 { + itemsPerPartition = numElements + numPartitions = 1 + } else { + numPartitions = (numElements / itemsPerPartition) + 1 + } } partitions := make([]doltTablePartition, numPartitions) - itemsPerPartition := numElements / numPartitions for i := uint64(0); i < numPartitions-1; i++ { partitions[i] = doltTablePartition{i * itemsPerPartition, (i + 1) * itemsPerPartition, rowData} } diff --git a/go/store/datas/database.go b/go/store/datas/database.go index 8959125cf9..8f121d12c9 100644 --- a/go/store/datas/database.go +++ b/go/store/datas/database.go @@ -169,6 +169,14 @@ type Database interface { // level detail of the database that should infrequently be needed by // clients. 
 	chunkStore() chunks.ChunkStore
+
+	// SetCommitHooks attaches a list of CommitHook that can be executed
+	// after CommitWithWorkingSet
+	SetCommitHooks(context.Context, []CommitHook) *database
+
+	// SetCommitHookLogger passes an output writer from the user-facing session
+	// to the commit hooks executed at the datas layer
+	SetCommitHookLogger(context.Context, io.Writer) *database
 }
 
 func NewDatabase(cs chunks.ChunkStore) Database {
diff --git a/go/store/datas/database_common.go b/go/store/datas/database_common.go
index a628b816df..e97c1a4e7d 100644
--- a/go/store/datas/database_common.go
+++ b/go/store/datas/database_common.go
@@ -25,6 +25,7 @@ import (
 	"context"
 	"errors"
 	"fmt"
+	"io"
 
 	"github.com/dolthub/dolt/go/store/chunks"
 	"github.com/dolthub/dolt/go/store/d"
@@ -36,7 +37,8 @@ import (
 
 type database struct {
 	*types.ValueStore
-	rt rootTracker
+	rt              rootTracker
+	postCommitHooks []CommitHook
 }
 
 var (
@@ -44,6 +46,16 @@ var (
 	ErrMergeNeeded = errors.New("dataset head is not ancestor of commit")
 )
 
+// CommitHook is an abstraction for executing arbitrary commands after atomic database commits
+type CommitHook interface {
+	// Execute is an arbitrary read-only function whose arguments are the newly committed Dataset and the Database it was committed to
+	Execute(ctx context.Context, ds Dataset, db Database) error
+	// HandleError is a bridge function to handle Execute errors
+	HandleError(ctx context.Context, err error) error
+	// SetLogger lets clients specify an output stream for HandleError
+	SetLogger(ctx context.Context, wr io.Writer) error
+}
+
 // TODO: fix panics
 // rootTracker is a narrowing of the ChunkStore interface, to keep Database disciplined about working directly with Chunks
 type rootTracker interface {
@@ -771,6 +783,8 @@ func (db *database) CommitWithWorkingSet(
 		return Dataset{}, Dataset{}, err
 	}
 
+	db.callCommitHooks(ctx, commitDS)
+
 	return commitDS, workingSetDS, nil
 }
 
@@ -963,10 +977,35 @@ func buildNewCommit(ctx context.Context, ds Dataset, v types.Value, opts CommitO
 
 func (db *database) doHeadUpdate(ctx context.Context, ds Dataset, updateFunc func(ds Dataset) error) (Dataset, error) {
 	err := updateFunc(ds)
-
 	if err != nil {
 		return Dataset{}, err
 	}
 
 	return db.GetDataset(ctx, ds.ID())
 }
+
+func (db *database) SetCommitHooks(ctx context.Context, postHooks []CommitHook) *database {
+	db.postCommitHooks = postHooks
+	return db
+}
+
+func (db *database) SetCommitHookLogger(ctx context.Context, wr io.Writer) *database {
+	for _, h := range db.postCommitHooks {
+		h.SetLogger(ctx, wr)
+	}
+	return db
+}
+
+func (db *database) PostCommitHooks() []CommitHook {
+	return db.postCommitHooks
+}
+
+func (db *database) callCommitHooks(ctx context.Context, ds Dataset) {
+	var err error
+	for _, hook := range db.postCommitHooks {
+		err = hook.Execute(ctx, ds, db)
+		if err != nil {
+			hook.HandleError(ctx, err)
+		}
+	}
+}
diff --git a/go/store/datas/puller.go b/go/store/datas/puller.go
index 523af57f24..36913e0224 100644
--- a/go/store/datas/puller.go
+++ b/go/store/datas/puller.go
@@ -59,17 +59,12 @@ type CmpChnkAndRefs struct {
 	refs map[hash.Hash]int
 }
 
-type NBSCompressedChunkStore interface {
-	chunks.ChunkStore
-	GetManyCompressed(context.Context, hash.HashSet, func(nbs.CompressedChunk)) error
-}
-
 // Puller is used to sync data between to Databases
 type Puller struct {
 	fmt *types.NomsBinFormat
 
 	srcDB         Database
-	srcChunkStore NBSCompressedChunkStore
+	srcChunkStore nbs.NBSCompressedChunkStore
 	sinkDBCS      chunks.ChunkStore
 	rootChunkHash hash.Hash
 	downloaded    hash.HashSet
@@ -158,7 +153,7 @@ func NewPuller(ctx context.Context, tempDir string, chunksPerTF int, srcDB, sink
 		return nil, fmt.Errorf("cannot pull from src to sink; src version is %v and sink version is %v", srcDB.chunkStore().Version(), sinkDB.chunkStore().Version())
 	}
 
-	srcChunkStore, ok := srcDB.chunkStore().(NBSCompressedChunkStore)
+	srcChunkStore, ok := srcDB.chunkStore().(nbs.NBSCompressedChunkStore)
 	if !ok {
 		return nil, ErrIncompatibleSourceChunkStore
 	}
diff --git 
a/go/store/nbs/store.go b/go/store/nbs/store.go index 454c7f19d2..3ce18484ef 100644 --- a/go/store/nbs/store.go +++ b/go/store/nbs/store.go @@ -81,6 +81,11 @@ func makeGlobalCaches() { makeManifestManager = func(m manifest) manifestManager { return manifestManager{m, manifestCache, manifestLocks} } } +type NBSCompressedChunkStore interface { + chunks.ChunkStore + GetManyCompressed(context.Context, hash.HashSet, func(CompressedChunk)) error +} + type NomsBlockStore struct { mm manifestManager p tablePersister diff --git a/integration-tests/bats/auto_increment.bats b/integration-tests/bats/auto_increment.bats index d691108a0f..039259ab78 100644 --- a/integration-tests/bats/auto_increment.bats +++ b/integration-tests/bats/auto_increment.bats @@ -23,13 +23,13 @@ teardown() { run dolt sql -q "INSERT INTO test (c0) VALUES (44);" [ "$status" -eq 0 ] - run dolt sql -q "SELECT * FROM test WHERE c0 = 44;" -r csv + run dolt sql -q "SELECT * FROM test WHERE c0 = 44 ORDER BY pk;" -r csv [ "$status" -eq 0 ] [[ "$output" =~ "4,44" ]] || false run dolt sql -q "INSERT INTO test (c0) VALUES (55),(66);" [ "$status" -eq 0 ] - run dolt sql -q "SELECT * FROM test WHERE c0 > 50;" -r csv + run dolt sql -q "SELECT * FROM test WHERE c0 > 50 ORDER BY pk;" -r csv [ "$status" -eq 0 ] [[ "$output" =~ "5,55" ]] || false [[ "$output" =~ "6,66" ]] || false @@ -45,7 +45,7 @@ CREATE TABLE ai ( INSERT INTO ai VALUES (NULL,1),(NULL,2),(NULL,3); SQL [ "$status" -eq 0 ] - run dolt sql -q "SELECT * FROM ai;" -r csv + run dolt sql -q "SELECT * FROM ai ORDER BY pk;" -r csv [ "$status" -eq 0 ] [[ "$output" =~ "1,1" ]] || false [[ "$output" =~ "2,2" ]] || false @@ -55,7 +55,7 @@ SQL @test "auto_increment: insert into empty auto_increment table" { run dolt sql -q "INSERT INTO test (c0) VALUES (1);" [ "$status" -eq 0 ] - run dolt sql -q "SELECT * FROM test WHERE c0 = 1;" -r csv + run dolt sql -q "SELECT * FROM test WHERE c0 = 1 ORDER BY pk;" -r csv [ "$status" -eq 0 ] [[ "$output" =~ "1,1" ]] || false } @@ 
-67,7 +67,7 @@ SQL [ "$status" -eq 0 ] run dolt sql -q "INSERT INTO test VALUES (2,2);" [ "$status" -eq 0 ] - run dolt sql -q "SELECT * FROM test;" -r csv + run dolt sql -q "SELECT * FROM test ORDER BY pk;" -r csv [ "$status" -eq 0 ] [[ "$output" =~ "1,1" ]] || false [[ "$output" =~ "2,2" ]] || false @@ -80,13 +80,14 @@ SQL run dolt sql -q "INSERT INTO test (pk,c0) VALUES (NULL,2);" [ "$status" -eq 0 ] - run dolt sql -q "SELECT * FROM test WHERE c0 > 1;" -r csv + run dolt sql -q "SELECT * FROM test WHERE c0 > 1 ORDER BY pk;" -r csv [ "$status" -eq 0 ] [[ "$output" =~ "2,2" ]] || false run dolt sql -q "INSERT INTO test VALUES (NULL,3), (10,10), (NULL,11);" [ "$status" -eq 0 ] - run dolt sql -q "SELECT * FROM test WHERE c0 > 2;" -r csv + run dolt sql -q "SELECT * FROM test WHERE c0 > 2 ORDER BY pk;" -r csv + echo $output [ "$status" -eq 0 ] [[ "$output" =~ "3,3" ]] || false [[ "$output" =~ "10,10" ]] || false @@ -98,13 +99,13 @@ SQL run dolt sql -q "INSERT INTO test (pk,c0) VALUES (0,2);" [ "$status" -eq 0 ] - run dolt sql -q "SELECT * FROM test WHERE c0 > 1;" -r csv + run dolt sql -q "SELECT * FROM test WHERE c0 > 1 ORDER BY pk;" -r csv [ "$status" -eq 0 ] [[ "$output" =~ "2,2" ]] || false run dolt sql -q "INSERT INTO test VALUES (0,3), (10,10), (0,11);" [ "$status" -eq 0 ] - run dolt sql -q "SELECT * FROM test WHERE c0 > 2;" -r csv + run dolt sql -q "SELECT * FROM test WHERE c0 > 2 ORDER BY pk;" -r csv [ "$status" -eq 0 ] [[ "$output" =~ "3,3" ]] || false [[ "$output" =~ "10,10" ]] || false @@ -121,7 +122,7 @@ INSERT INTO test (c0) VALUES (21); SQL [ "$status" -eq 0 ] - run dolt sql -q "SELECT * FROM test;" -r csv + run dolt sql -q "SELECT * FROM test ORDER BY pk;" -r csv [ "$status" -eq 0 ] [[ "$output" =~ "1,1" ]] || false [[ "$output" =~ "2,2" ]] || false @@ -143,7 +144,7 @@ INSERT INTO test2 SELECT (pk + 20), c0 FROM test2; SQL [ "$status" -eq 0 ] - run dolt sql -q "select * from test2 order by pk" -r csv + run dolt sql -q "select * from test2 ORDER BY pk" -r 
csv [ "$status" -eq 0 ] [[ "$output" =~ "1,1" ]] || false [[ "$output" =~ "2,2" ]] || false @@ -170,7 +171,7 @@ SQL dolt sql -q "INSERT INTO auto_float (pk, c0) VALUES (3.9,4);" dolt sql -q "INSERT INTO auto_float (c0) VALUES (5);" - run dolt sql -q "SELECT * FROM auto_float;" -r csv + run dolt sql -q "SELECT * FROM auto_float ORDER BY pk;" -r csv [ "$status" -eq 0 ] [[ "$output" =~ "1,1" ]] || false [[ "$output" =~ "2.1,2" ]] || false @@ -193,7 +194,7 @@ SQL echo "$TYPE" run dolt sql -q "INSERT INTO auto_$TYPE (c0) VALUES (2);" [ "$status" -eq 0 ] - run dolt sql -q "SELECT * FROM auto_$TYPE WHERE c0 > 1;" -r csv + run dolt sql -q "SELECT * FROM auto_$TYPE WHERE c0 > 1 ORDER BY pk;" -r csv [ "$status" -eq 0 ] [[ "$output" =~ "2,2" ]] || false done @@ -211,7 +212,7 @@ SQL echo "$TYPE" run dolt sql -q "INSERT INTO auto2_$TYPE (c0) VALUES (2);" [ "$status" -eq 0 ] - run dolt sql -q "SELECT * FROM auto2_$TYPE WHERE c0 > 1;" -r csv + run dolt sql -q "SELECT * FROM auto2_$TYPE WHERE c0 > 1 ORDER BY pk;" -r csv [ "$status" -eq 0 ] [[ "$output" =~ "2,2" ]] || false done @@ -248,7 +249,7 @@ SQL dolt merge other dolt sql -q "INSERT INTO test VALUES (NULL,22);" - run dolt sql -q "SELECT pk FROM test WHERE c0 = 22;" -r csv + run dolt sql -q "SELECT pk FROM test WHERE c0 = 22 ORDER BY pk;" -r csv [ "$status" -eq 0 ] [[ "$output" =~ "22" ]] || false } @@ -271,7 +272,7 @@ SQL dolt checkout master dolt merge other dolt sql -q "INSERT INTO test VALUES (NULL,22);" - run dolt sql -q "SELECT pk FROM test WHERE c0 = 22;" -r csv + run dolt sql -q "SELECT pk FROM test WHERE c0 = 22 ORDER BY pk;" -r csv [ "$status" -eq 0 ] [[ "$output" =~ "22" ]] || false } @@ -280,7 +281,7 @@ SQL run dolt sql -q "ALTER TABLE test AUTO_INCREMENT = 10;" [ "$status" -eq 0 ] dolt sql -q "INSERT INTO test VALUES (NULL,10);" - run dolt sql -q "SELECT * FROM test;" -r csv + run dolt sql -q "SELECT * FROM test ORDER BY pk;" -r csv [ "$status" -eq 0 ] [[ "$output" =~ "10,10" ]] || false @@ -288,7 +289,7 @@ SQL 
ALTER TABLE test AUTO_INCREMENT = 20; INSERT INTO test VALUES (NULL,20),(30,30),(NULL,31); SQL - run dolt sql -q "SELECT * FROM test;" -r csv + run dolt sql -q "SELECT * FROM test ORDER BY pk;" -r csv [ "$status" -eq 0 ] [[ "$output" =~ "10,10" ]] || false [[ "$output" =~ "20,20" ]] || false @@ -310,7 +311,7 @@ ALTER TABLE index_test ADD INDEX (c0); INSERT INTO index_test (c0) VALUES (4),(5),(6); SQL - run dolt sql -q "select * from index_test" -r csv + run dolt sql -q "select * from index_test ORDER BY pk" -r csv [ "$status" -eq 0 ] [[ "${lines[0]}" =~ "pk,c0" ]] || false [[ "${lines[1]}" =~ "1,1" ]] || false @@ -329,7 +330,7 @@ SQL dolt sql -q "INSERT INTO test (c0) SELECT pk FROM other;" - run dolt sql -q "SELECT * FROM test;" -r csv + run dolt sql -q "SELECT * FROM test ORDER BY pk;" -r csv [ "$status" -eq 0 ] [[ "${lines[0]}" =~ "pk,c0" ]] || false [[ "${lines[1]}" =~ "1,1" ]] || false @@ -351,7 +352,7 @@ TRUNCATE t; INSERT INTO t (c0) VALUES (1),(2),(3); SQL - run dolt sql -q "SELECT * FROM t;" -r csv + run dolt sql -q "SELECT * FROM t ORDER BY pk;" -r csv [ "$status" -eq 0 ] [[ "${lines[0]}" =~ "pk,c0" ]] || false [[ "${lines[1]}" =~ "1,1" ]] || false @@ -370,7 +371,7 @@ INSERT INTO t (c0) VALUES (1),(2),(3); INSERT INTO t (c0) VALUES (4),(5),(6); SQL - run dolt sql -q "SELECT * FROM t;" -r csv + run dolt sql -q "SELECT * FROM t ORDER BY pk;" -r csv [ "$status" -eq 0 ] [[ "${lines[0]}" =~ "pk,c0" ]] || false [[ "${lines[1]}" =~ "1,1" ]] || false @@ -393,7 +394,7 @@ INSERT INTO t VALUES (4, 4); INSERT INTO t (c0) VALUES (5),(6),(7); SQL - run dolt sql -q "SELECT * FROM t;" -r csv + run dolt sql -q "SELECT * FROM t ORDER BY pk;" -r csv [ "$status" -eq 0 ] [[ "${lines[0]}" =~ "pk,c0" ]] || false [[ "${lines[1]}" =~ "1,1" ]] || false @@ -419,7 +420,7 @@ INSERT into t VALUES (3, 3); INSERT INTO t (c0) VALUES (8); SQL - run dolt sql -q "SELECT * FROM t;" -r csv + run dolt sql -q "SELECT * FROM t ORDER BY pk;" -r csv [ "$status" -eq 0 ] [[ "${lines[0]}" =~ "pk,c0" 
]] || false [[ "${lines[1]}" =~ "1,1" ]] || false @@ -451,7 +452,7 @@ SELECT DOLT_MERGE('test'); INSERT INTO t VALUES (NULL,5),(6,6),(NULL,7); SQL - run dolt sql -q "SELECT * FROM t;" -r csv + run dolt sql -q "SELECT * FROM t ORDER BY pk;" -r csv [ "$status" -eq 0 ] [[ "${lines[0]}" =~ "pk,c0" ]] || false [[ "${lines[1]}" =~ "1,1" ]] || false @@ -482,7 +483,7 @@ SELECT DOLT_MERGE('test'); INSERT INTO t VALUES (10,10),(NULL,11); SQL - run dolt sql -q "SELECT * FROM t;" -r csv + run dolt sql -q "SELECT * FROM t ORDER BY pk;" -r csv [ "$status" -eq 0 ] [[ "${lines[0]}" =~ "pk,c0" ]] || false [[ "${lines[1]}" =~ "1,1" ]] || false @@ -515,7 +516,7 @@ SELECT DOLT_MERGE('test'); INSERT INTO t VALUES (3,3),(NULL,6); SQL - run dolt sql -q "SELECT * FROM t;" -r csv + run dolt sql -q "SELECT * FROM t ORDER BY pk;" -r csv [ "$status" -eq 0 ] [[ "${lines[0]}" =~ "pk,c0" ]] || false [[ "${lines[1]}" =~ "1,1" ]] || false @@ -548,7 +549,7 @@ SELECT DOLT_MERGE('test'); INSERT INTO t VALUES (NULL,6); SQL - run dolt sql -q "SELECT * FROM t;" -r csv + run dolt sql -q "SELECT * FROM t ORDER BY pk;" -r csv [ "$status" -eq 0 ] [[ "${lines[0]}" =~ "pk,c0" ]] || false [[ "${lines[1]}" =~ "1,1" ]] || false @@ -564,7 +565,7 @@ SQL [ "$status" -eq 0 ] dolt sql -q "insert into test2 values (0, 'john', 0)" - run dolt sql -q "SELECT * from test2" -r csv + run dolt sql -q "SELECT * from test2 ORDER BY pk" -r csv [ "$status" -eq 0 ] [[ "$output" =~ "0,john,0" ]] || false } @@ -574,7 +575,7 @@ SQL dolt sql -q "ALTER TABLE t CHANGE COLUMN pk pk int NOT NULL AUTO_INCREMENT PRIMARY KEY;" dolt sql -q 'insert into t values (NULL), (NULL), (NULL)' - run dolt sql -q "SELECT * FROM t" -r csv + run dolt sql -q "SELECT * FROM t ORDER BY pk" -r csv [[ "${lines[0]}" =~ "pk" ]] || false [[ "${lines[1]}" =~ "1" ]] || false [[ "${lines[2]}" =~ "2" ]] || false diff --git a/integration-tests/bats/helper/index-on-writes-common.bash b/integration-tests/bats/helper/index-on-writes-common.bash index 
21f93e5336..036918fd90 100644 --- a/integration-tests/bats/helper/index-on-writes-common.bash +++ b/integration-tests/bats/helper/index-on-writes-common.bash @@ -100,12 +100,12 @@ test_mutation() { expected="$3" uses_pk="$4" dolt sql -q "$dml" - run dolt sql -q "select * from $table" -r csv + run dolt sql -q "select * from $table ORDER BY pk1" -r csv [ "$status" -eq "0" ] [ "$output" == "$expected" ] || (echo $output && exit 1) dolt reset --hard dolt sql --batch -q "$dml ; $dml" - run dolt sql -q "select * from $table" -r csv + run dolt sql -q "select * from $table ORDER BY pk1" -r csv [ "$status" -eq "0" ] [ "$output" == "$expected" ] || (echo $output && exit 1) run dolt sql -q "explain $dml" diff --git a/integration-tests/bats/import-create-tables.bats b/integration-tests/bats/import-create-tables.bats index 828a853752..d4e48cd166 100755 --- a/integration-tests/bats/import-create-tables.bats +++ b/integration-tests/bats/import-create-tables.bats @@ -392,7 +392,7 @@ SQL run dolt ls [ "$status" -eq 0 ] [[ "$output" =~ "test" ]] || false - run dolt sql -q "select * from test" + run dolt sql -q "select * from test ORDER BY pk" [ "$status" -eq 0 ] [ "${#lines[@]}" -eq 11 ] [ "${lines[3]}" = '| a | "" | 1 |' ] @@ -418,7 +418,7 @@ SQL [ "$status" -eq 0 ] # schema argument subsets the data and adds empty column - run dolt sql -r csv -q "select * from subset" + run dolt sql -r csv -q "select * from subset ORDER BY pk" [ "$status" -eq 0 ] [ "${lines[0]}" = "pk,c1,c3,noData" ] [ "${lines[1]}" = "0,1,3," ] @@ -440,7 +440,7 @@ SQL run dolt ls [ "$status" -eq 0 ] [[ "$output" =~ "empty_strings_null_values" ]] || false - run dolt sql -q "select * from empty_strings_null_values" + run dolt sql -q "select * from empty_strings_null_values ORDER BY pk" [ "$status" -eq 0 ] [ "${#lines[@]}" -eq 11 ] [ "${lines[3]}" = '| a | "" | 1 |' ] @@ -467,7 +467,7 @@ SQL run dolt ls [ "$status" -eq 0 ] [[ "$output" =~ "test" ]] || false - run dolt sql -q "select * from test" + run dolt sql -q 
"select * from test ORDER BY pk" [ "$status" -eq 0 ] [ "${#lines[@]}" -eq 11 ] [ "${lines[3]}" = '| a | "" | 1 |' ] diff --git a/integration-tests/bats/index.bats b/integration-tests/bats/index.bats index 6a366212af..91f6493431 100644 --- a/integration-tests/bats/index.bats +++ b/integration-tests/bats/index.bats @@ -2025,13 +2025,13 @@ CREATE TABLE test2 ( INSERT INTO test VALUES (0, NULL), (1, NULL), (2, NULL); INSERT INTO test2 VALUES (0, NULL, NULL), (1, NULL, NULL), (2, 1, NULL), (3, 1, NULL), (4, NULL, 1), (5, NULL, 1); SQL - run dolt sql -q "SELECT * FROM test" -r=json + run dolt sql -q "SELECT * FROM test order by pk" -r=json [ "$status" -eq "0" ] [[ "$output" =~ '{"rows": [{"pk":0},{"pk":1},{"pk":2}]}' ]] || false - run dolt sql -q "SELECT * FROM test WHERE v1 IS NULL" -r=json + run dolt sql -q "SELECT * FROM test WHERE v1 IS NULL order by pk" -r=json [ "$status" -eq "0" ] [[ "$output" =~ '{"rows": [{"pk":0},{"pk":1},{"pk":2}]}' ]] || false - run dolt sql -q "SELECT * FROM test2" -r=json + run dolt sql -q "SELECT * FROM test2 order by pk" -r=json [ "$status" -eq "0" ] [[ "$output" =~ '{"rows": [{"pk":0},{"pk":1},{"pk":2,"v1":1},{"pk":3,"v1":1},{"pk":4,"v2":1},{"pk":5,"v2":1}]}' ]] || false } diff --git a/integration-tests/bats/merge.bats b/integration-tests/bats/merge.bats index 55af6d9f8b..6d66c269ce 100644 --- a/integration-tests/bats/merge.bats +++ b/integration-tests/bats/merge.bats @@ -336,7 +336,7 @@ SQL dolt checkout master run dolt merge other [ "$status" -eq 0 ] - run dolt sql -q "SELECT * FROM quiz;" -r csv + run dolt sql -q "SELECT * FROM quiz ORDER BY pk;" -r csv [[ "${lines[0]}" =~ "pk" ]] || false [[ "${lines[1]}" =~ "10" ]] || false [[ "${lines[2]}" =~ "11" ]] || false diff --git a/integration-tests/bats/query-catalog.bats b/integration-tests/bats/query-catalog.bats index facad96269..2288ccc7a1 100644 --- a/integration-tests/bats/query-catalog.bats +++ b/integration-tests/bats/query-catalog.bats @@ -100,7 +100,7 @@ teardown() { @test 
"query-catalog: executed saved" { Q1="select pk, pk1, pk2 from one_pk,two_pk where one_pk.c1=two_pk.c1 order by 1" - Q2="select pk from one_pk" + Q2="select pk from one_pk order by pk" dolt sql -q "$Q1" -s name1 dolt sql -q "$Q2" -s name2 @@ -138,7 +138,7 @@ EOF EXPECTED=$(cat <<'EOF' id,display_order,name,query,description name1,1,name1,"select pk, pk1, pk2 from one_pk,two_pk where one_pk.c1=two_pk.c1 order by 1","" -name2,2,name2,select pk from one_pk,"" +name2,2,name2,select pk from one_pk order by pk,"" EOF ) @@ -154,7 +154,7 @@ EOF EXPECTED=$(cat <<'EOF' id,display_order,name,query,description name1,1,name1,"select pk, pk1, pk2 from one_pk,two_pk where one_pk.c1=two_pk.c1 and pk < 3 order by 1 desc","" -name2,2,name2,select pk from one_pk,"" +name2,2,name2,select pk from one_pk order by pk,"" EOF ) diff --git a/integration-tests/bats/replication.bats b/integration-tests/bats/replication.bats new file mode 100644 index 0000000000..44c3edc337 --- /dev/null +++ b/integration-tests/bats/replication.bats @@ -0,0 +1,54 @@ +#!/usr/bin/env bats +load $BATS_TEST_DIRNAME/helper/common.bash + +setup() { + setup_common + TMPDIRS=$(pwd)/tmpdirs + mkdir -p $TMPDIRS/{bac1,repo1} + + # repo1 -> bac1 -> repo2 + cd $TMPDIRS/repo1 + dolt init + dolt branch feature + dolt remote add backup1 file://../bac1 + cd $TMPDIRS +} + +teardown() { + teardown_common + rm -rf $TMPDIRS + cd $BATS_TMPDIR +} + +@test "replication: default no replication" { + cd repo1 + dolt sql -q "create table t1 (a int primary key)" + dolt commit -am "cm" + + [ ! -d "../bac1/.dolt" ] || false +} + +@test "replication: push on commit" { + export DOLT_BACKUP_TO_REMOTE=backup1 + cd repo1 + dolt remote -v + dolt sql -q "create table t1 (a int primary key)" + dolt commit -am "cm" + + cd .. 
+ dolt clone file://./bac1 repo2 + export DOLT_BACKUP_TO_REMOTE= + cd repo2 + run dolt ls + [ "$status" -eq 0 ] + [ "${#lines[@]}" -eq 2 ] + [[ "$output" =~ "t1" ]] || false +} + +@test "replication: no tags" { + export DOLT_BACKUP_TO_REMOTE=backup1 + cd repo1 + dolt tag + + [ ! -d "../bac1/.dolt" ] || false +} diff --git a/integration-tests/bats/sql-server.bats b/integration-tests/bats/sql-server.bats index 6c7739d5cd..a2a1912cdd 100644 --- a/integration-tests/bats/sql-server.bats +++ b/integration-tests/bats/sql-server.bats @@ -683,7 +683,7 @@ SQL SELECT DOLT_MERGE('feature-branch'); " - server_query repo1 1 "SELECT * FROM test" "pk\n1\n2\n3\n1000" + server_query repo1 1 "SELECT * FROM test ORDER BY pk" "pk\n1\n2\n3\n1000" server_query repo1 1 "SELECT COUNT(*) FROM dolt_log" "COUNT(*)\n3" } @@ -1038,8 +1038,6 @@ while True: dolt remote add origin file://../rem1 start_sql_server repo1 - dolt status - dolt branch dolt push origin master run server_query repo1 1 "select dolt_push() as p" "p\n0" [ "$status" -eq 1 ] @@ -1050,3 +1048,32 @@ while True: skip "In-memory branch doesn't track upstream correctly" server_query repo1 1 "select dolt_push() as p" "p\n1" } + +@test "sql-server: replicate to backup after sql-session commit" { + skiponwindows "Has dependencies that are missing on the Jenkins Windows installation." + + mkdir bac1 + cd repo1 + dolt remote add backup1 file://../bac1 + export DOLT_BACKUP_TO_REMOTE=backup1 + start_sql_server repo1 + + multi_query repo1 1 " + CREATE TABLE test ( + pk int primary key + ); + INSERT INTO test VALUES (0),(1),(2); + SELECT DOLT_ADD('.'); + SELECT DOLT_COMMIT('-m', 'Step 1');" + + cd .. 
+ dolt clone file://./bac1 repo3 + cd repo3 + export DOLT_BACKUP_TO_REMOTE= + run dolt sql -q "select * from test" -r csv + [ "$status" -eq 0 ] + [[ "${lines[0]}" =~ "pk" ]] + [[ "${lines[1]}" =~ "0" ]] + [[ "${lines[2]}" =~ "1" ]] + [[ "${lines[3]}" =~ "2" ]] +} diff --git a/integration-tests/bats/sql.bats b/integration-tests/bats/sql.bats index f712790e38..0682364e8f 100755 --- a/integration-tests/bats/sql.bats +++ b/integration-tests/bats/sql.bats @@ -324,23 +324,23 @@ SQL } @test "sql: basic inner join" { - run dolt sql -q "select pk,pk1,pk2 from one_pk join two_pk on one_pk.c1=two_pk.c1" + run dolt sql -q "select pk,pk1,pk2 from one_pk join two_pk on one_pk.c1=two_pk.c1 order by pk" [ "$status" -eq 0 ] [ "${#lines[@]}" -eq 8 ] first_join_output=$output - run dolt sql -q "select pk,pk1,pk2 from two_pk join one_pk on one_pk.c1=two_pk.c1" + run dolt sql -q "select pk,pk1,pk2 from two_pk join one_pk on one_pk.c1=two_pk.c1 order by pk" [ "$status" -eq 0 ] [ "${#lines[@]}" -eq 8 ] [ "$output" = "$first_join_output" ] - run dolt sql -q "select pk,pk1,pk2 from one_pk join two_pk on one_pk.c1=two_pk.c1 where pk=1" + run dolt sql -q "select pk,pk1,pk2 from one_pk join two_pk on one_pk.c1=two_pk.c1 where pk=1 order by pk" [ "$status" -eq 0 ] [ "${#lines[@]}" -eq 5 ] - run dolt sql -q "select pk,pk1,pk2,one_pk.c1 as foo,two_pk.c1 as bar from one_pk join two_pk on one_pk.c1=two_pk.c1" + run dolt sql -q "select pk,pk1,pk2,one_pk.c1 as foo,two_pk.c1 as bar from one_pk join two_pk on one_pk.c1=two_pk.c1 order by pk" [ "$status" -eq 0 ] [[ "$output" =~ foo ]] || false [[ "$output" =~ bar ]] || false [ "${#lines[@]}" -eq 8 ] - run dolt sql -q "select pk,pk1,pk2,one_pk.c1 as foo,two_pk.c1 as bar from one_pk join two_pk on one_pk.c1=two_pk.c1 where one_pk.c1=10" + run dolt sql -q "select pk,pk1,pk2,one_pk.c1 as foo,two_pk.c1 as bar from one_pk join two_pk on one_pk.c1=two_pk.c1 where one_pk.c1=10 order by pk" [ "$status" -eq 0 ] [ "${#lines[@]}" -eq 5 ] [[ "$output" =~ "10" ]] || 
false