Merge remote-tracking branch 'origin/main' into aaron/sql-cluster-status-procedure

This commit is contained in:
Aaron Son
2022-09-29 10:10:41 -07:00
27 changed files with 1330 additions and 62 deletions

View File

@@ -219,7 +219,7 @@ MySQL comes with a MySQL server called `mysqld` and a MySQL client called `mysql
mysql Ver 8.0.29 for macos12.2 on x86_64 (Homebrew)
```
Now, to connect the `mysql` client to Dolt, you have to force the MySQL client through the TCP interface by passing in a host and port. The default is the socket interface which Dolt supports, but is not on by default. The MySQL client also requires you specify a user, in this case `root`.
Now, to connect the `mysql` client to Dolt, you are going to force the MySQL client through the TCP interface by passing in a host and port. The default is the socket interface which Dolt supports, but is only available on `localhost`. So, it's better to show off the TCP interface. The MySQL client also requires you specify a user, in this case `root`.
```bash
% mysql --host 127.0.0.1 --port 3306 -uroot

View File

@@ -104,6 +104,8 @@ const (
CommitFlag = "commit"
NoCommitFlag = "no-commit"
NoEditFlag = "no-edit"
OursFlag = "ours"
TheirsFlag = "theirs"
)
const (
@@ -134,6 +136,13 @@ func CreateCommitArgParser() *argparser.ArgParser {
return ap
}
func CreateConflictsResolveArgParser() *argparser.ArgParser {
ap := argparser.NewArgParser()
ap.SupportsFlag(OursFlag, "", "For all conflicts, take the version from our branch and resolve the conflict")
ap.SupportsFlag(TheirsFlag, "", "For all conflicts, take the version from their branch and resolve the conflict")
return ap
}
func CreateMergeArgParser() *argparser.ArgParser {
ap := argparser.NewArgParser()
ap.SupportsFlag(NoFFParam, "", "Create a merge commit even when the merge resolves as a fast-forward.")

View File

@@ -57,7 +57,7 @@ import (
)
const (
Version = "0.41.6"
Version = "0.41.7"
)
var dumpDocsCommand = &commands.DumpDocsCmd{}

View File

@@ -21,11 +21,12 @@
package remotesapi
import (
reflect "reflect"
sync "sync"
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
timestamppb "google.golang.org/protobuf/types/known/timestamppb"
reflect "reflect"
sync "sync"
)
const (

View File

@@ -8,6 +8,7 @@ package remotesapi
import (
context "context"
grpc "google.golang.org/grpc"
codes "google.golang.org/grpc/codes"
status "google.golang.org/grpc/status"

View File

@@ -16,7 +16,7 @@ require (
github.com/dolthub/ishell v0.0.0-20220112232610-14e753f0f371
github.com/dolthub/mmap-go v1.0.4-0.20201107010347-f9f2a9588a66
github.com/dolthub/sqllogictest/go v0.0.0-20201107003712-816f3ae12d81
github.com/dolthub/vitess v0.0.0-20220927165657-8eb73ed0ff24
github.com/dolthub/vitess v0.0.0-20220929061157-c71cf6a7768e
github.com/dustin/go-humanize v1.0.0
github.com/fatih/color v1.13.0
github.com/flynn-archive/go-shlex v0.0.0-20150515145356-3f9db97f8568
@@ -57,7 +57,7 @@ require (
require (
github.com/aliyun/aliyun-oss-go-sdk v2.2.5+incompatible
github.com/cenkalti/backoff/v4 v4.1.3
github.com/dolthub/go-mysql-server v0.12.1-0.20220927222348-8b178aa50764
github.com/dolthub/go-mysql-server v0.12.1-0.20220929062247-323a847921de
github.com/google/flatbuffers v2.0.6+incompatible
github.com/kch42/buzhash v0.0.0-20160816060738-9bdec3dec7c6
github.com/mitchellh/go-ps v1.0.0

View File

@@ -178,8 +178,8 @@ github.com/dolthub/flatbuffers v1.13.0-dh.1 h1:OWJdaPep22N52O/0xsUevxJ6Qfw1M2txC
github.com/dolthub/flatbuffers v1.13.0-dh.1/go.mod h1:CorYGaDmXjHz1Z7i50PYXG1Ricn31GcA2wNOTFIQAKE=
github.com/dolthub/fslock v0.0.3 h1:iLMpUIvJKMKm92+N1fmHVdxJP5NdyDK5bK7z7Ba2s2U=
github.com/dolthub/fslock v0.0.3/go.mod h1:QWql+P17oAAMLnL4HGB5tiovtDuAjdDTPbuqx7bYfa0=
github.com/dolthub/go-mysql-server v0.12.1-0.20220927222348-8b178aa50764 h1:7cJCBFyyHl1pGrNgME77mUkkzlSZ94jo3vZOQb/xan4=
github.com/dolthub/go-mysql-server v0.12.1-0.20220927222348-8b178aa50764/go.mod h1:8zHF9V5MPmb3dWB2kGjeltnHQc15QdynK9GnwSutreA=
github.com/dolthub/go-mysql-server v0.12.1-0.20220929062247-323a847921de h1:YkKR9AOt/Mta3suApA5bEwwTF/GbdYLva3zVbP0lxi0=
github.com/dolthub/go-mysql-server v0.12.1-0.20220929062247-323a847921de/go.mod h1:Ndof+jmKE/AISRWgeyx+RUvNlAtMOPSUzTM/iCOfx70=
github.com/dolthub/ishell v0.0.0-20220112232610-14e753f0f371 h1:oyPHJlzumKta1vnOQqUnfdz+pk3EmnHS3Nd0cCT0I2g=
github.com/dolthub/ishell v0.0.0-20220112232610-14e753f0f371/go.mod h1:dhGBqcCEfK5kuFmeO5+WOx3hqc1k3M29c1oS/R7N4ms=
github.com/dolthub/jsonpath v0.0.0-20210609232853-d49537a30474 h1:xTrR+l5l+1Lfq0NvhiEsctylXinUMFhhsqaEcl414p8=
@@ -188,8 +188,8 @@ github.com/dolthub/mmap-go v1.0.4-0.20201107010347-f9f2a9588a66 h1:WRPDbpJWEnPxP
github.com/dolthub/mmap-go v1.0.4-0.20201107010347-f9f2a9588a66/go.mod h1:N5ZIbMGuDUpTpOFQ7HcsN6WSIpTGQjHP+Mz27AfmAgk=
github.com/dolthub/sqllogictest/go v0.0.0-20201107003712-816f3ae12d81 h1:7/v8q9XGFa6q5Ap4Z/OhNkAMBaK5YeuEzwJt+NZdhiE=
github.com/dolthub/sqllogictest/go v0.0.0-20201107003712-816f3ae12d81/go.mod h1:siLfyv2c92W1eN/R4QqG/+RjjX5W2+gCTRjZxBjI3TY=
github.com/dolthub/vitess v0.0.0-20220927165657-8eb73ed0ff24 h1:2ARLp21egNaEvWIN4/tfprcyAlWsRj4bdey+Hv7HMEM=
github.com/dolthub/vitess v0.0.0-20220927165657-8eb73ed0ff24/go.mod h1:oVFIBdqMFEkt4Xz2fzFJBNtzKhDEjwdCF0dzde39iKs=
github.com/dolthub/vitess v0.0.0-20220929061157-c71cf6a7768e h1:vC5OgmUm1Dd8vQP1YqgRvYMbHvrLNdQkd3S7udbS3BQ=
github.com/dolthub/vitess v0.0.0-20220929061157-c71cf6a7768e/go.mod h1:oVFIBdqMFEkt4Xz2fzFJBNtzKhDEjwdCF0dzde39iKs=
github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo=
github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=

View File

@@ -52,8 +52,8 @@ type ConflictReader struct {
}
// NewConflictReader returns a new conflict reader for a given table
func NewConflictReader(ctx context.Context, tbl *doltdb.Table) (*ConflictReader, error) {
base, sch, mergeSch, err := tbl.GetConflictSchemas(ctx, "") // tblName unused by old storage format
func NewConflictReader(ctx context.Context, tbl *doltdb.Table, tblName string) (*ConflictReader, error) {
base, sch, mergeSch, err := tbl.GetConflictSchemas(ctx, tblName) // tblName unused by old storage format
if err != nil {
return nil, err
}

View File

@@ -253,7 +253,7 @@ func updateProllySecondaryIndexes(ctx context.Context, tm TableMerger, cellWiseE
if err != nil {
return nil, nil, err
}
lm, err := getMutableSecondaryIdxs(ctx, tm.leftSch, ls)
lm, err := GetMutableSecondaryIdxs(ctx, tm.leftSch, ls)
if err != nil {
return nil, nil, err
}
@@ -262,7 +262,7 @@ func updateProllySecondaryIndexes(ctx context.Context, tm TableMerger, cellWiseE
if err != nil {
return nil, nil, err
}
rm, err := getMutableSecondaryIdxs(ctx, tm.rightSch, rs)
rm, err := GetMutableSecondaryIdxs(ctx, tm.rightSch, rs)
if err != nil {
return nil, nil, err
}

View File

@@ -26,8 +26,8 @@ import (
"github.com/dolthub/dolt/go/store/val"
)
// getMutableSecondaryIdxs returns a MutableSecondaryIdx for each secondary index in |indexes|.
func getMutableSecondaryIdxs(ctx context.Context, sch schema.Schema, indexes durable.IndexSet) ([]MutableSecondaryIdx, error) {
// GetMutableSecondaryIdxs returns a MutableSecondaryIdx for each secondary index in |indexes|.
func GetMutableSecondaryIdxs(ctx context.Context, sch schema.Schema, indexes durable.IndexSet) ([]MutableSecondaryIdx, error) {
mods := make([]MutableSecondaryIdx, sch.Indexes().Count())
for i, index := range sch.Indexes().AllIndexes() {
idx, err := indexes.GetIndex(ctx, sch, index.Name())

View File

@@ -20,6 +20,7 @@ import (
"fmt"
"io"
"sync"
"sync/atomic"
"time"
"github.com/dolthub/go-mysql-server/sql"
@@ -34,7 +35,7 @@ var _ doltdb.CommitHook = (*commithook)(nil)
type commithook struct {
rootLgr *logrus.Entry
lgr *logrus.Entry
lgr atomic.Value // *logrus.Entry
remotename string
dbname string
lout io.Writer
@@ -62,12 +63,15 @@ type commithook struct {
tempDir string
}
var errDestDBRootHashMoved error = errors.New("sqle: cluster: standby replication: destination database root hash moved during our write, while it is assumed we are the only writer.")
var errDestDBRootHashMoved error = errors.New("cluster/commithook: standby replication: destination database root hash moved during our write, while it is assumed we are the only writer.")
const logFieldThread = "thread"
const logFieldRole = "role"
func newCommitHook(lgr *logrus.Logger, remotename, dbname string, role Role, destDBF func(context.Context) (*doltdb.DoltDB, error), srcDB *doltdb.DoltDB, tempDir string) *commithook {
var ret commithook
ret.rootLgr = lgr.WithField("thread", "Standby Replication - "+dbname+" to "+remotename)
ret.lgr = ret.rootLgr.WithField("role", string(role))
ret.rootLgr = lgr.WithField(logFieldThread, "Standby Replication - "+dbname+" to "+remotename)
ret.lgr.Store(ret.rootLgr.WithField(logFieldRole, string(role)))
ret.remotename = remotename
ret.dbname = dbname
ret.role = role
@@ -82,15 +86,29 @@ func (h *commithook) Run(bt *sql.BackgroundThreads) error {
return bt.Add("Standby Replication - "+h.dbname+" to "+h.remotename, h.run)
}
func (h *commithook) run(ctx context.Context) {
// The hook comes up attempting to replicate the current head.
h.logger().Tracef("cluster/commithook: background thread: running.")
h.wg.Add(2)
go h.replicate(ctx)
go h.tick(ctx)
<-ctx.Done()
h.logger().Tracef("cluster/commithook: background thread: requested shutdown, signaling replication thread.")
h.cond.Signal()
h.wg.Wait()
h.logger().Tracef("cluster/commithook: background thread: completed.")
}
func (h *commithook) replicate(ctx context.Context) {
defer h.wg.Done()
defer h.lgr.Tracef("cluster/commithook: background thread: replicate: shutdown.")
defer h.logger().Tracef("cluster/commithook: background thread: replicate: shutdown.")
h.mu.Lock()
defer h.mu.Unlock()
for {
lgr := h.logger()
// Shutdown for context canceled.
if ctx.Err() != nil {
h.lgr.Tracef("cluster/commithook replicate thread exiting; saw ctx.Err(): %v", ctx.Err())
lgr.Tracef("cluster/commithook replicate thread exiting; saw ctx.Err(): %v", ctx.Err())
if h.shouldReplicate() {
// attempt a last true-up of our standby as we shutdown
// TODO: context.WithDeadline based on config / convention?
@@ -98,8 +116,8 @@ func (h *commithook) replicate(ctx context.Context) {
}
return
}
if h.role == RolePrimary && h.nextHead == (hash.Hash{}) {
h.lgr.Tracef("cluster/commithook: fetching current head.")
if h.primaryNeedsInit() {
lgr.Tracef("cluster/commithook: fetching current head.")
// When the replicate thread comes up, it attempts to replicate the current head.
datasDB := doltdb.HackDatasDatabaseFromDoltDB(h.srcDB)
cs := datas.ChunkStoreFromDatabase(datasDB)
@@ -107,19 +125,19 @@ func (h *commithook) replicate(ctx context.Context) {
h.nextHead, err = cs.Root(ctx)
if err != nil {
// TODO: if err != nil, something is really wrong; should shutdown or backoff.
h.lgr.Warningf("standby replication thread failed to load database root: %v", err)
lgr.Warningf("standby replication thread failed to load database root: %v", err)
h.nextHead = hash.Hash{}
}
// We do not know when this head was written, but we
// starting trying to replicate it now.
// are starting to try to replicate it now.
h.nextHeadIncomingTime = time.Now()
} else if h.shouldReplicate() {
h.attemptReplicate(ctx)
} else {
h.lgr.Tracef("cluster/commithook: background thread: waiting for signal.")
lgr.Tracef("cluster/commithook: background thread: waiting for signal.")
h.cond.Wait()
h.lgr.Tracef("cluster/commithook: background thread: woken up.")
lgr.Tracef("cluster/commithook: background thread: woken up.")
}
}
}
@@ -135,25 +153,31 @@ func (h *commithook) shouldReplicate() bool {
return (h.nextPushAttempt == (time.Time{}) || time.Now().After(h.nextPushAttempt))
}
// called with h.mu locked.
func (h *commithook) primaryNeedsInit() bool {
return h.role == RolePrimary && h.nextHead == (hash.Hash{})
}
// Called by the replicate thread to push the nextHead to the destDB and set
// its root to the new value.
//
// preconditions: h.mu is locked and shouldReplicate() returned true.
// when this function returns, h.mu is locked.
func (h *commithook) attemptReplicate(ctx context.Context) {
lgr := h.logger()
toPush := h.nextHead
incomingTime := h.nextHeadIncomingTime
destDB := h.destDB
h.mu.Unlock()
if destDB == nil {
h.lgr.Tracef("cluster/commithook: attempting to fetch destDB.")
lgr.Tracef("cluster/commithook: attempting to fetch destDB.")
var err error
destDB, err = h.destDBF(ctx)
if err != nil {
h.currentError = new(string)
*h.currentError = fmt.Sprintf("could not replicate to standby: error fetching destDB: %v", err)
h.lgr.Warnf("cluster/commithook: could not replicate to standby: error fetching destDB: %v", err)
lgr.Warnf("cluster/commithook: could not replicate to standby: error fetching destDB: %v.", err)
h.mu.Lock()
// TODO: We could add some backoff here.
if toPush == h.nextHead {
@@ -161,16 +185,16 @@ func (h *commithook) attemptReplicate(ctx context.Context) {
}
return
}
h.lgr.Tracef("cluster/commithook: fetched destDB")
lgr.Tracef("cluster/commithook: fetched destDB")
h.mu.Lock()
h.destDB = destDB
h.mu.Unlock()
}
h.lgr.Tracef("cluster/commithook: pushing chunks for root hash %v to destDB", toPush.String())
lgr.Tracef("cluster/commithook: pushing chunks for root hash %v to destDB", toPush.String())
err := destDB.PullChunks(ctx, h.tempDir, h.srcDB, toPush, nil, nil)
if err == nil {
h.lgr.Tracef("cluster/commithook: successfully pushed chunks, setting root")
lgr.Tracef("cluster/commithook: successfully pushed chunks, setting root")
datasDB := doltdb.HackDatasDatabaseFromDoltDB(destDB)
cs := datas.ChunkStoreFromDatabase(datasDB)
var curRootHash hash.Hash
@@ -186,14 +210,14 @@ func (h *commithook) attemptReplicate(ctx context.Context) {
h.mu.Lock()
if err == nil {
h.currentError = nil
h.lgr.Tracef("cluster/commithook: successfully Commited chunks on destDB")
lgr.Tracef("cluster/commithook: successfully Commited chunks on destDB")
h.lastPushedHead = toPush
h.lastPushedSuccess = incomingTime
h.nextPushAttempt = time.Time{}
} else {
h.currentError = new(string)
*h.currentError = fmt.Sprintf("failed to commit chunks on destDB: %v", err)
h.lgr.Warnf("cluster/commithook: failed to commit chunks on destDB: %v", err)
lgr.Warnf("cluster/commithook: failed to commit chunks on destDB: %v", err)
// add some delay if a new head didn't come in while we were pushing.
if toPush == h.nextHead {
// TODO: We could add some backoff here.
@@ -234,16 +258,18 @@ func (h *commithook) status() (replicationLag *time.Duration, lastUpdate *time.T
return
}
func (h *commithook) logger() *logrus.Entry {
return h.lgr.Load().(*logrus.Entry)
}
// TODO: Would be more efficient to only tick when we have outstanding work...
func (h *commithook) tick(ctx context.Context) {
defer h.wg.Done()
defer h.lgr.Trace("tick thread returning")
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
time.Sleep(1 * time.Second)
return
case <-ticker.C:
h.cond.Signal()
@@ -251,19 +277,6 @@ func (h *commithook) tick(ctx context.Context) {
}
}
func (h *commithook) run(ctx context.Context) {
// The hook comes up attempting to replicate the current head.
h.lgr.Tracef("cluster/commithook: background thread: running.")
h.wg.Add(2)
go h.replicate(ctx)
go h.tick(ctx)
<-ctx.Done()
h.lgr.Tracef("cluster/commithook: background thread: requested shutdown, signaling replication thread.")
h.cond.Signal()
h.wg.Wait()
h.lgr.Tracef("cluster/commithook: background thread: completed.")
}
func (h *commithook) setRole(role Role) {
h.mu.Lock()
defer h.mu.Unlock()
@@ -275,24 +288,25 @@ func (h *commithook) setRole(role Role) {
h.lastPushedSuccess = time.Time{}
h.nextPushAttempt = time.Time{}
h.role = role
h.lgr = h.rootLgr.WithField("role", string(role))
h.lgr.Store(h.rootLgr.WithField(logFieldRole, string(role)))
h.cond.Signal()
}
// Execute on this commithook updates the target root hash we're attempting to
// replicate and wakes the replication thread.
func (h *commithook) Execute(ctx context.Context, ds datas.Dataset, db datas.Database) error {
h.lgr.Warnf("cluster/commithook: Execute called post commit")
lgr := h.logger()
lgr.Warnf("cluster/commithook: Execute called post commit")
cs := datas.ChunkStoreFromDatabase(db)
root, err := cs.Root(ctx)
if err != nil {
h.lgr.Warnf("cluster/commithook: Execute: error retrieving local database root: %v", err)
lgr.Warnf("cluster/commithook: Execute: error retrieving local database root: %v", err)
return err
}
h.mu.Lock()
defer h.mu.Unlock()
if root != h.nextHead {
h.lgr.Tracef("signaling replication thread to push new head: %v", root.String())
lgr.Tracef("signaling replication thread to push new head: %v", root.String())
h.nextHeadIncomingTime = time.Now()
h.nextHead = root
h.nextPushAttempt = time.Time{}

View File

@@ -83,7 +83,9 @@ func NewController(lgr *logrus.Logger, cfg Config, pCfg config.ReadWriteConfig)
commithooks: make([]*commithook, 0),
lgr: lgr,
}
ret.sinterceptor.lgr = lgr.WithFields(logrus.Fields{})
ret.sinterceptor.setRole(role, epoch)
ret.cinterceptor.lgr = lgr.WithFields(logrus.Fields{})
ret.cinterceptor.setRole(role, epoch)
return ret, nil
}

View File

@@ -19,6 +19,8 @@ import (
"strconv"
"sync"
"github.com/sirupsen/logrus"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
@@ -43,6 +45,7 @@ const clusterRoleEpochHeader = "x-dolt-cluster-role-epoch"
// epoch than this server, this incterceptor coordinates with the Controller to
// immediately transition to standby and to stop replicating to the standby.
type clientinterceptor struct {
lgr *logrus.Entry
role Role
epoch int
mu sync.Mutex
@@ -93,10 +96,14 @@ func (ci *clientinterceptor) handleResponseHeaders(header metadata.MD, role Role
epochs := header.Get(clusterRoleEpochHeader)
roles := header.Get(clusterRoleHeader)
if len(epochs) > 0 && len(roles) > 0 && roles[0] == string(RolePrimary) {
if retepoch, err := strconv.Atoi(epochs[0]); err == nil {
if retepoch > epoch {
if respepoch, err := strconv.Atoi(epochs[0]); err == nil {
if respepoch == epoch {
ci.lgr.Errorf("cluster: this server and the server replicating to it are both primary at the same epoch. force transitioning to standby.")
// TODO: Signal to controller that we are forced to become a standby at epoch |respepoch|...
} else if respepoch > epoch {
// The server we replicate to thinks it is the primary at a higher epoch than us...
// TODO: Signal to controller that we are forced to become a standby at epoch |retepoch|...
ci.lgr.Warnf("cluster: this server is primary at epoch %d. the server replicating to it is primary at epoch %d. force transitioning to standby.", epoch, respepoch)
// TODO: Signal to controller that we are forced to become a standby at epoch |respepoch|...
}
}
}
@@ -124,6 +131,7 @@ func (ci *clientinterceptor) Options() []grpc.DialOption {
// than our current epoch, this interceptor coordinates with the Controller to
// immediately transition to standby and allow replication requests through.
type serverinterceptor struct {
lgr *logrus.Entry
role Role
epoch int
mu sync.Mutex
@@ -172,8 +180,17 @@ func (si *serverinterceptor) handleRequestHeaders(header metadata.MD, role Role,
roles := header.Get(clusterRoleHeader)
if len(epochs) > 0 && len(roles) > 0 && roles[0] == string(RolePrimary) && role == RolePrimary {
if reqepoch, err := strconv.Atoi(epochs[0]); err == nil {
if reqepoch > epoch {
if reqepoch == epoch {
// Misconfiguration in the cluster means this
// server and its standby are marked as Primary
// at the same epoch. We will become standby
// and our peer will become standby. An
// operator will need to get involved.
si.lgr.Errorf("cluster: this server and its standby replica are both primary at the same epoch. force transitioning to standby.")
// TODO: Signal to controller that we are forced to become a standby at epoch |reqepoch|
} else if reqepoch > epoch {
// The client replicating to us thinks it is the primary at a higher epoch than us.
si.lgr.Warnf("cluster: this server is primary at epoch %d. the server replicating to it is primary at epoch %d. force transitioning to standby.", epoch, reqepoch)
// TODO: Signal to controller that we are forced to become a standby at epoch |reqepoch|
}
}

View File

@@ -0,0 +1,448 @@
// Copyright 2022 Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package dprocedures
import (
"errors"
"fmt"
"io"
"github.com/dolthub/go-mysql-server/sql"
"github.com/dolthub/dolt/go/cmd/dolt/cli"
"github.com/dolthub/dolt/go/libraries/doltcore/conflict"
"github.com/dolthub/dolt/go/libraries/doltcore/doltdb"
"github.com/dolthub/dolt/go/libraries/doltcore/doltdb/durable"
"github.com/dolthub/dolt/go/libraries/doltcore/merge"
"github.com/dolthub/dolt/go/libraries/doltcore/row"
"github.com/dolthub/dolt/go/libraries/doltcore/schema"
"github.com/dolthub/dolt/go/libraries/doltcore/sqle/dsess"
"github.com/dolthub/dolt/go/libraries/doltcore/table"
"github.com/dolthub/dolt/go/libraries/doltcore/table/editor"
"github.com/dolthub/dolt/go/libraries/utils/set"
"github.com/dolthub/dolt/go/store/hash"
"github.com/dolthub/dolt/go/store/prolly"
"github.com/dolthub/dolt/go/store/prolly/tree"
"github.com/dolthub/dolt/go/store/types"
"github.com/dolthub/dolt/go/store/val"
)
var ErrConfSchIncompatible = errors.New("the conflict schema's columns are not equal to the current schema's columns, please resolve manually")
// doltConflictsResolve is the stored procedure version of the function `dolt conflict resolve`.
func doltConflictsResolve(ctx *sql.Context, args ...string) (sql.RowIter, error) {
res, err := DoDoltConflictsResolve(ctx, args)
if err != nil {
return nil, err
}
return rowToIter(res), nil
}
// DoltConflictsCatFunc runs a `dolt commit` in the SQL context, committing staged changes to head.
// Deprecated: please use the version in the dprocedures package
type DoltConflictsCatFunc struct {
children []sql.Expression
}
func getProllyRowMaps(ctx *sql.Context, vrw types.ValueReadWriter, ns tree.NodeStore, hash hash.Hash, tblName string) (prolly.Map, error) {
rootVal, err := doltdb.LoadRootValueFromRootIshAddr(ctx, vrw, ns, hash)
tbl, ok, err := rootVal.GetTable(ctx, tblName)
if err != nil {
return prolly.Map{}, err
}
if !ok {
return prolly.Map{}, doltdb.ErrTableNotFound
}
idx, err := tbl.GetRowData(ctx)
if err != nil {
return prolly.Map{}, err
}
return durable.ProllyMapFromIndex(idx), nil
}
func resolveProllyConflicts(ctx *sql.Context, tbl *doltdb.Table, tblName string, sch schema.Schema) (*doltdb.Table, error) {
var err error
artifactIdx, err := tbl.GetArtifacts(ctx)
if err != nil {
return nil, err
}
artifactMap := durable.ProllyMapFromArtifactIndex(artifactIdx)
iter, err := artifactMap.IterAllConflicts(ctx)
if err != nil {
return nil, err
}
// get mutable prolly map
ourIdx, err := tbl.GetRowData(ctx)
if err != nil {
return nil, err
}
ourMap := durable.ProllyMapFromIndex(ourIdx)
mutMap := ourMap.Mutate()
// get mutable secondary indexes
idxSet, err := tbl.GetIndexSet(ctx)
if err != nil {
return nil, err
}
mutIdxs, err := merge.GetMutableSecondaryIdxs(ctx, sch, idxSet)
if err != nil {
return nil, err
}
var theirRoot hash.Hash
var theirMap prolly.Map
for {
cnfArt, err := iter.Next(ctx)
if err == io.EOF {
break
}
if err != nil {
return nil, err
}
// reload if their root hash changes
if theirRoot != cnfArt.TheirRootIsh {
theirMap, err = getProllyRowMaps(ctx, tbl.ValueReadWriter(), tbl.NodeStore(), cnfArt.TheirRootIsh, tblName)
if err != nil {
return nil, err
}
theirRoot = cnfArt.TheirRootIsh
}
// get row data
var ourRow, theirRow val.Tuple
err = ourMap.Get(ctx, cnfArt.Key, func(_, v val.Tuple) error {
ourRow = v
return nil
})
if err != nil {
return nil, err
}
err = theirMap.Get(ctx, cnfArt.Key, func(_, v val.Tuple) error {
theirRow = v
return nil
})
if err != nil {
return nil, err
}
// update row data
if len(theirRow) == 0 {
err = mutMap.Delete(ctx, cnfArt.Key)
} else {
err = mutMap.Put(ctx, cnfArt.Key, theirRow)
}
if err != nil {
return nil, err
}
// update secondary indexes
for _, mutIdx := range mutIdxs {
if len(ourRow) == 0 {
err = mutIdx.InsertEntry(ctx, cnfArt.Key, theirRow)
} else if len(theirRow) == 0 {
err = mutIdx.DeleteEntry(ctx, cnfArt.Key, ourRow)
} else {
err = mutIdx.UpdateEntry(ctx, cnfArt.Key, ourRow, theirRow)
}
if err != nil {
return nil, err
}
}
}
// Update table
newMap, err := mutMap.Map(ctx)
if err != nil {
return nil, err
}
newIdx := durable.IndexFromProllyMap(newMap)
newTbl, err := tbl.UpdateRows(ctx, newIdx)
if err != nil {
return nil, err
}
// Apply index set changes
for _, mutIdx := range mutIdxs {
m, err := mutIdx.Map(ctx)
if err != nil {
return nil, err
}
idxSet, err = idxSet.PutIndex(ctx, mutIdx.Name, durable.IndexFromProllyMap(m))
if err != nil {
return nil, err
}
}
newTbl, err = newTbl.SetIndexSet(ctx, idxSet)
if err != nil {
return nil, err
}
return newTbl, nil
}
func resolvePkConflicts(ctx *sql.Context, opts editor.Options, tbl *doltdb.Table, tblName string, sch schema.Schema, conflicts types.Map) (*doltdb.Table, error) {
// Create table editor
tblEditor, err := editor.NewTableEditor(ctx, tbl, sch, tblName, opts)
if err != nil {
return nil, err
}
err = conflicts.Iter(ctx, func(key, val types.Value) (stop bool, err error) {
k := key.(types.Tuple)
cnf, err := conflict.ConflictFromTuple(val.(types.Tuple))
if err != nil {
return true, err
}
// row was removed
if types.IsNull(cnf.MergeValue) {
baseRow, err := row.FromNoms(sch, k, cnf.Base.(types.Tuple))
if err != nil {
return true, err
}
err = tblEditor.DeleteRow(ctx, baseRow)
if err != nil {
return true, err
}
return false, nil
}
newRow, err := row.FromNoms(sch, k, cnf.MergeValue.(types.Tuple))
if err != nil {
return true, err
}
if isValid, err := row.IsValid(newRow, sch); err != nil {
return true, err
} else if !isValid {
return true, table.NewBadRow(newRow, "error resolving conflicts", fmt.Sprintf("row with primary key %v in table %s does not match constraints or types of the table's schema.", key, tblName))
}
// row was added
if types.IsNull(cnf.Value) {
err = tblEditor.InsertRow(ctx, newRow, nil)
if err != nil {
return true, err
}
return false, nil
}
// row was modified
oldRow, err := row.FromNoms(sch, k, cnf.Value.(types.Tuple))
if err != nil {
return true, err
}
err = tblEditor.UpdateRow(ctx, oldRow, newRow, nil)
if err != nil {
return true, err
}
return false, nil
})
if err != nil {
return nil, err
}
return tblEditor.Table(ctx)
}
func resolveKeylessConflicts(ctx *sql.Context, tbl *doltdb.Table, conflicts types.Map) (*doltdb.Table, error) {
rowData, err := tbl.GetNomsRowData(ctx)
if err != nil {
return nil, err
}
mapEditor := rowData.Edit()
err = conflicts.Iter(ctx, func(key, value types.Value) (stop bool, err error) {
cnf, err := conflict.ConflictFromTuple(value.(types.Tuple))
if err != nil {
return true, err
}
if types.IsNull(cnf.MergeValue) {
mapEditor.Remove(key)
} else {
mapEditor.Set(key, cnf.MergeValue)
}
return false, nil
})
if err != nil {
return nil, err
}
rowData, err = mapEditor.Map(ctx)
if err != nil {
return nil, err
}
return tbl.UpdateNomsRows(ctx, rowData)
}
func resolveNomsConflicts(ctx *sql.Context, opts editor.Options, tbl *doltdb.Table, tblName string, sch schema.Schema) (*doltdb.Table, error) {
// Get conflicts
_, confIdx, err := tbl.GetConflicts(ctx)
if err != nil {
return nil, err
}
conflicts := durable.NomsMapFromConflictIndex(confIdx)
if schema.IsKeyless(sch) {
return resolveKeylessConflicts(ctx, tbl, conflicts)
}
return resolvePkConflicts(ctx, opts, tbl, tblName, sch, conflicts)
}
func validateConstraintViolations(ctx *sql.Context, before, after *doltdb.RootValue, table string) error {
tables, err := after.GetTableNames(ctx)
if err != nil {
return err
}
// todo: this is an expensive way to compute this
_, violators, err := merge.AddForeignKeyViolations(ctx, after, before, set.NewStrSet(tables), hash.Of(nil))
if err != nil {
return err
}
if violators.Size() > 0 {
return fmt.Errorf("resolving conflicts for table %s created foreign key violations", table)
}
return nil
}
func clearTableAndUpdateRoot(ctx *sql.Context, root *doltdb.RootValue, tbl *doltdb.Table, tblName string) (*doltdb.RootValue, error) {
newTbl, err := tbl.ClearConflicts(ctx)
if err != nil {
return nil, err
}
newRoot, err := root.PutTable(ctx, tblName, newTbl)
if err != nil {
return nil, err
}
return newRoot, nil
}
func ResolveConflicts(ctx *sql.Context, dSess *dsess.DoltSession, root *doltdb.RootValue, dbName string, ours bool, tblNames []string) error {
for _, tblName := range tblNames {
tbl, ok, err := root.GetTable(ctx, tblName)
if err != nil {
return err
}
if !ok {
return doltdb.ErrTableNotFound
}
if has, err := tbl.HasConflicts(ctx); err != nil {
return err
} else if !has {
return nil
}
sch, err := tbl.GetSchema(ctx)
if err != nil {
return err
}
_, ourSch, theirSch, err := tbl.GetConflictSchemas(ctx, tblName)
if err != nil {
return err
}
if ours && !schema.ColCollsAreEqual(sch.GetAllCols(), ourSch.GetAllCols()) {
return ErrConfSchIncompatible
} else if !ours && !schema.ColCollsAreEqual(sch.GetAllCols(), theirSch.GetAllCols()) {
return ErrConfSchIncompatible
}
if !ours {
if tbl.Format() == types.Format_DOLT {
tbl, err = resolveProllyConflicts(ctx, tbl, tblName, sch)
} else {
state, _, err := dSess.LookupDbState(ctx, dbName)
if err != nil {
return err
}
opts := state.WriteSession.GetOptions()
tbl, err = resolveNomsConflicts(ctx, opts, tbl, tblName, sch)
}
if err != nil {
return err
}
}
newRoot, err := clearTableAndUpdateRoot(ctx, root, tbl, tblName)
if err != nil {
return err
}
err = validateConstraintViolations(ctx, root, newRoot, tblName)
if err != nil {
return err
}
root = newRoot
}
return dSess.SetRoot(ctx, dbName, root)
}
func DoDoltConflictsResolve(ctx *sql.Context, args []string) (int, error) {
dbName := ctx.GetCurrentDatabase()
fmt.Printf("database name: %s", dbName)
apr, err := cli.CreateConflictsResolveArgParser().Parse(args)
if err != nil {
return 1, err
}
dSess := dsess.DSessFromSess(ctx.Session)
roots, ok := dSess.GetRoots(ctx, dbName)
if !ok {
return 1, fmt.Errorf("Could not load database %s", dbName)
}
ours := apr.Contains(cli.OursFlag)
theirs := apr.Contains(cli.TheirsFlag)
if ours && theirs {
return 1, fmt.Errorf("specify only either --ours or --theirs")
} else if !ours && !theirs {
return 1, fmt.Errorf("--ours or --theirs must be supplied")
}
if apr.NArg() == 0 {
return 1, fmt.Errorf("specify at least one table to resolve conflicts")
}
// get all tables in conflict
root := roots.Working
tbls := apr.Args
if len(tbls) == 1 && tbls[0] == "." {
if allTables, err := root.TablesInConflict(ctx); err != nil {
return 1, err
} else {
tbls = allTables
}
}
err = ResolveConflicts(ctx, dSess, root, dbName, ours, tbls)
if err != nil {
return 1, err
}
return 0, nil
}

View File

@@ -24,6 +24,7 @@ var DoltProcedures = []sql.ExternalStoredProcedureDetails{
{Name: "dolt_clean", Schema: int64Schema("status"), Function: doltClean},
{Name: "dolt_clone", Schema: int64Schema("status"), Function: doltClone},
{Name: "dolt_commit", Schema: stringSchema("hash"), Function: doltCommit},
{Name: "dolt_conflicts_resolve", Schema: int64Schema("status"), Function: doltConflictsResolve},
{Name: "dolt_fetch", Schema: int64Schema("success"), Function: doltFetch},
{Name: "dolt_merge", Schema: int64Schema("fast_forward", "conflicts"), Function: doltMerge},
{Name: "dolt_pull", Schema: int64Schema("fast_forward", "conflicts"), Function: doltPull},

View File

@@ -47,6 +47,7 @@ const (
var ErrWorkingSetChanges = goerrors.NewKind("Cannot switch working set, session state is dirty. " +
"Rollback or commit changes before changing working sets.")
var ErrSessionNotPeristable = errors.New("session is not persistable")
var ErrCurrentBranchDeleted = errors.New("current branch has been force deleted. run 'USE <database>/<branch>' to checkout a different branch, or reconnect to the server")
// DoltSession is the sql.Session implementation used by dolt. It is accessible through a *sql.Context instance
type DoltSession struct {
@@ -177,6 +178,36 @@ func (d *DoltSession) Flush(ctx *sql.Context, dbName string) error {
return d.SetRoot(ctx, dbName, ws.WorkingRoot())
}
// ValidateSession validates a working set if there are a valid sessionState with non-nil working set.
// If there is no sessionState or its current working set not defined, then no need for validation,
// so no error is returned.
func (d *DoltSession) ValidateSession(ctx *sql.Context, dbName string) error {
sessionState, ok, err := d.LookupDbState(ctx, dbName)
if !ok {
return nil
}
if err != nil {
return err
}
if sessionState.WorkingSet == nil {
return nil
}
wsRef := sessionState.WorkingSet.Ref()
_, err = sessionState.dbData.Ddb.ResolveWorkingSet(ctx, wsRef)
if err == doltdb.ErrWorkingSetNotFound {
_, err = d.newWorkingSetForHead(ctx, wsRef, dbName)
// if the current head is not found, the branch was force deleted, so use nil working set.
if errors.Is(err, doltdb.ErrBranchNotFound) {
return ErrCurrentBranchDeleted
} else if err != nil {
return err
}
} else if err != nil {
return err
}
return nil
}
// StartTransaction refreshes the state of this session and starts a new transaction.
func (d *DoltSession) StartTransaction(ctx *sql.Context, dbName string, tCharacteristic sql.TransactionCharacteristic) (sql.Transaction, error) {
if TransactionsDisabled(ctx) {

View File

@@ -43,7 +43,7 @@ func NewConflictsTable(ctx *sql.Context, tblName string, root *doltdb.RootValue,
}
func newNomsConflictsTable(ctx *sql.Context, tbl *doltdb.Table, tblName string, root *doltdb.RootValue, rs RootSetter) (sql.Table, error) {
rd, err := merge.NewConflictReader(ctx, tbl)
rd, err := merge.NewConflictReader(ctx, tbl, tblName)
if err != nil {
return nil, err
}
@@ -109,7 +109,7 @@ func (ct ConflictsTable) Partitions(ctx *sql.Context) (sql.PartitionIter, error)
// PartitionRows returns a RowIter for the given partition
func (ct ConflictsTable) PartitionRows(ctx *sql.Context, part sql.Partition) (sql.RowIter, error) {
// conflict reader must be reset each time partitionRows is called.
rd, err := merge.NewConflictReader(ctx, ct.tbl)
rd, err := merge.NewConflictReader(ctx, ct.tbl, ct.tblName)
if err != nil {
return nil, err
}

View File

@@ -201,6 +201,104 @@ var DoltBranchMultiSessionScriptTests = []queries.ScriptTest{
},
},
},
{
Name: "Test multi-session behavior for force deleting active branch with autocommit on",
Assertions: []queries.ScriptTestAssertion{
{
Query: "/* client a */ SET @@autocommit=1;",
Expected: []sql.Row{},
},
{
Query: "/* client a */ CALL DOLT_CHECKOUT('-b', 'branch1');",
Expected: []sql.Row{{0}},
},
{
Query: "/* client a */ select active_branch();",
Expected: []sql.Row{{"branch1"}},
},
{
Query: "/* client b */ select active_branch();",
Expected: []sql.Row{{"main"}},
},
{
Query: "/* client b */ select name from dolt_branches order by name;",
Expected: []sql.Row{{"branch1"}, {"main"}},
},
{
Query: "/* client b */ CALL DOLT_BRANCH('-D', 'branch1');",
Expected: []sql.Row{{0}},
},
{
Query: "/* client b */ select name from dolt_branches;",
Expected: []sql.Row{{"main"}},
},
{
Query: "/* client a */ select name from dolt_branches;",
ExpectedErrStr: "Error 1105: current branch has been force deleted. run 'USE <database>/<branch>' to checkout a different branch, or reconnect to the server",
},
{
Query: "/* client a */ CALL DOLT_CHECKOUT('main');",
ExpectedErrStr: "Error 1105: current branch has been force deleted. run 'USE <database>/<branch>' to checkout a different branch, or reconnect to the server",
},
{
Query: "/* client a */ USE dolt/main;",
Expected: []sql.Row{},
},
{
Query: "/* client a */ select active_branch();",
Expected: []sql.Row{{"main"}},
},
},
},
{
Name: "Test multi-session behavior for force deleting active branch with autocommit off",
Assertions: []queries.ScriptTestAssertion{
{
Query: "/* client a */ SET @@autocommit=0;",
Expected: []sql.Row{},
},
{
Query: "/* client a */ CALL DOLT_CHECKOUT('-b', 'branch1');",
Expected: []sql.Row{{0}},
},
{
Query: "/* client a */ select active_branch();",
Expected: []sql.Row{{"branch1"}},
},
{
Query: "/* client b */ select active_branch();",
Expected: []sql.Row{{"main"}},
},
{
Query: "/* client b */ select name from dolt_branches order by name;",
Expected: []sql.Row{{"branch1"}, {"main"}},
},
{
Query: "/* client b */ CALL DOLT_BRANCH('-D', 'branch1');",
Expected: []sql.Row{{0}},
},
{
Query: "/* client b */ select name from dolt_branches;",
Expected: []sql.Row{{"main"}},
},
{
Query: "/* client a */ select name from dolt_branches;",
ExpectedErrStr: "Error 1105: current branch has been force deleted. run 'USE <database>/<branch>' to checkout a different branch, or reconnect to the server",
},
{
Query: "/* client a */ CALL DOLT_CHECKOUT('main');",
ExpectedErrStr: "Error 1105: current branch has been force deleted. run 'USE <database>/<branch>' to checkout a different branch, or reconnect to the server",
},
{
Query: "/* client a */ USE dolt/main;",
Expected: []sql.Row{},
},
{
Query: "/* client a */ select active_branch();",
Expected: []sql.Row{{"main"}},
},
},
},
}
// TestDoltMultiSessionBehavior runs tests that exercise multi-session logic on a running SQL server. Statements

View File

@@ -435,6 +435,49 @@ teardown() {
[[ "$output" =~ "Merge:" ]] || false
}
@test "1pk5col-ints: generate a merge conflict and resolve with ours using stored procedure" {
dolt add test
dolt commit -m "added test table"
dolt branch test-branch
dolt sql -q "insert into test values (0, 1, 2, 3, 4, 5)"
dolt add test
dolt commit -m "added test row"
dolt checkout test-branch
dolt sql -q "insert into test values (0, 1, 2, 3, 4, 6)"
dolt add test
dolt commit -m "added conflicting test row"
dolt checkout main
run dolt merge test-branch --no-commit
[ "$status" -eq 0 ]
[[ "$output" =~ "CONFLICT (content)" ]]
run dolt conflicts cat test
[ "$status" -eq 0 ]
[[ "$output" =~ \+[[:space:]]+\|[[:space:]]+ours[[:space:]] ]] || false
[[ "$output" =~ \+[[:space:]]+\|[[:space:]]+theirs[[:space:]] ]] || false
EXPECTED=$(echo -e "table,num_conflicts\ntest,1")
run dolt sql -r csv -q 'SELECT * FROM dolt_conflicts'
[ "$status" -eq 0 ]
[[ "$output" =~ "$EXPECTED" ]] || false
run dolt sql -q "call dolt_conflicts_resolve('--ours', 'test')"
[ "$status" -eq 0 ]
run dolt sql -q "select * from test"
[ "$status" -eq 0 ]
[[ "$output" =~ \|[[:space:]]+5 ]] || false
[[ ! "$output" =~ \|[[:space:]]+6 ]] || false
run dolt conflicts cat test
[[ ! "$output" =~ "ours" ]] || false
[[ ! "$output" =~ "theirs" ]] || false
dolt add test
dolt commit -m "merged and resolved conflict"
run dolt log
[[ "$output" =~ "added test row" ]] || false
[[ "$output" =~ "added conflicting test row" ]] || false
[[ "$output" =~ "merged and resolved conflict" ]] || false
[[ "$output" =~ "Merge:" ]] || false
}
@test "1pk5col-ints: generate a merge conflict and try to roll back using dolt merge --abort" {
dolt add test
dolt commit -m "added test table"
@@ -487,6 +530,26 @@ teardown() {
[[ ! "$output" =~ "|5" ]] || false
}
@test "1pk5col-ints: generate a merge conflict and resolve with theirs using stored procedure" {
dolt add test
dolt commit -m "added test table"
dolt branch test-branch
dolt sql -q "insert into test values (0, 1, 2, 3, 4, 5)"
dolt add test
dolt commit -m "added test row"
dolt checkout test-branch
dolt sql -q "insert into test values (0, 1, 2, 3, 4, 6)"
dolt add test
dolt commit -m "added conflicting test row"
dolt checkout main
dolt merge test-branch
run dolt sql -q "call dolt_conflicts_resolve('--theirs', 'test')"
[ "$status" -eq 0 ]
run dolt sql -q "select * from test"
[[ "$output" =~ \|[[:space:]]+6 ]] || false
[[ ! "$output" =~ "|5" ]] || false
}
@test "1pk5col-ints: put a row that violates the schema" {
run dolt sql -q "insert into test values (0, 1, 2, 3, 4, 'foo')"
[ "$status" -ne 0 ]

View File

@@ -459,6 +459,84 @@ SQL
dolt reset --hard
}
@test "conflict-detection-2: two branches, one deletes rows, one modifies those same rows. merge. conflict. resolve with stored procedure" {
dolt sql -q 'CREATE TABLE foo (`pk` INT PRIMARY KEY, `col:1` INT);'
dolt sql -q "INSERT INTO foo VALUES (1, 1), (2, 1), (3, 1), (4, 1), (5, 1);"
dolt add foo
dolt commit -m 'initial commit.'
dolt checkout -b deleter
dolt sql -q 'delete from foo'
dolt add foo
dolt commit -m 'delete commit.'
dolt checkout -b modifier main
dolt sql -q 'update foo set `col:1` = `col:1` + 1 where pk in (1, 3, 5);'
dolt add foo
dolt commit -m 'modify commit.'
dolt checkout -b merge-into-modified modifier
run dolt merge deleter -m "merge"
[ "$status" -eq 0 ]
[[ "$output" =~ "CONFLICT" ]] || false
dolt merge --abort
# Accept theirs deletes all rows.
dolt checkout main
dolt branch -d -f merge-into-modified
dolt checkout -b merge-into-modified modifier
dolt merge deleter -m "merge"
dolt sql -q "call dolt_conflicts_resolve('--theirs', 'foo')"
run dolt sql -q 'select count(*) from foo'
[ "$status" -eq 0 ]
[[ "$output" =~ "| 0 |" ]] || false
dolt merge --abort
dolt reset --hard
# Accept ours deletes two rows.
dolt checkout main
dolt branch -d -f merge-into-modified
dolt checkout -b merge-into-modified modifier
dolt merge deleter -m "merge"
dolt sql -q "call dolt_conflicts_resolve('--ours', 'foo')"
run dolt sql -q 'select count(*) from foo'
[ "$status" -eq 0 ]
[[ "$output" =~ "| 3 |" ]] || false
dolt merge --abort
dolt reset --hard
dolt checkout -b merge-into-deleter deleter
run dolt merge modifier -m "merge"
[ "$status" -eq 0 ]
[[ "$output" =~ "CONFLICT" ]] || false
dolt merge --abort
# Accept ours deletes all rows.
dolt checkout main
dolt branch -d -f merge-into-deleter
dolt checkout -b merge-into-deleter deleter
dolt merge modifier -m "merge"
dolt sql -q "call dolt_conflicts_resolve('--ours', 'foo')"
run dolt sql -q 'select count(*) from foo'
[ "$status" -eq 0 ]
[[ "$output" =~ "| 0 |" ]] || false
dolt merge --abort
dolt reset --hard
# Accept theirs adds modified.
dolt checkout main
dolt branch -d -f merge-into-deleter
dolt checkout -b merge-into-deleter deleter
dolt merge modifier -m "merge"
dolt sql -q "call dolt_conflicts_resolve('--theirs', 'foo')"
run dolt sql -q 'select count(*) from foo'
[ "$status" -eq 0 ]
[[ "$output" =~ "| 3 |" ]] || false
dolt merge --abort
dolt reset --hard
}
@test "conflict-detection-2: dolt_force_transaction_commit along with dolt_allow_commit_conflicts ignores conflicts" {
dolt sql <<"SQL"
CREATE TABLE test (pk BIGINT PRIMARY KEY, v1 BIGINT);
@@ -528,3 +606,44 @@ SQL
[ "$output" = "" ]
! [[ "$output" =~ "pk" ]] || false
}
@test "conflict-detection-2: conflicts table properly cleared on dolt conflicts resolve with stored procedure" {
dolt sql -q "create table test(pk int, c1 int, primary key(pk))"
run dolt conflicts cat test
[ $status -eq 0 ]
[ "$output" = "" ]
! [[ "$output" =~ "pk" ]] || false
dolt add .
dolt commit -m "created table"
dolt branch branch1
dolt sql -q "insert into test values (0,0)"
dolt add .
dolt commit -m "inserted 0,0"
dolt checkout branch1
dolt sql -q "insert into test values (0,1)"
dolt add .
dolt commit -m "inserted 0,1"
dolt checkout main
dolt merge branch1 -m "merge"
run dolt sql -q "call dolt_conflicts_resolve('--ours', 'test')"
[ $status -eq 0 ]
run dolt conflicts cat test
[ $status -eq 0 ]
[ "$output" = "" ]
! [[ "$output" =~ "pk" ]] || false
run dolt sql -q "update test set c1=1"
[ $status -eq 0 ]
! [[ "$output" =~ "unresolved conflicts from the merge" ]] || false
dolt add .
dolt commit -m "Committing active merge"
run dolt conflicts cat test
[ $status -eq 0 ]
[ "$output" = "" ]
! [[ "$output" =~ "pk" ]] || false
}

View File

@@ -1290,6 +1290,44 @@ SQL
[[ "$output" =~ "violation" ]] || false
}
@test "foreign-keys: Resolve catches violations with stored procedure" {
dolt sql <<SQL
ALTER TABLE child ADD CONSTRAINT fk_v1 FOREIGN KEY (v1) REFERENCES parent(v1);
INSERT INTO parent VALUES (0,0,0);
INSERT INTO child VALUES (0,0,0);
SQL
dolt add -A
dolt commit -m "added tables"
dolt branch other
dolt sql <<SQL
INSERT INTO parent VALUES (1,1,1);
INSERT INTO child VALUES (1,1,1);
SQL
dolt add -A
dolt commit -m "added 1s"
dolt checkout other
dolt sql <<SQL
INSERT INTO parent VALUES (1,2,2);
INSERT INTO child VALUES (1,2,2);
SQL
dolt add -A
dolt commit -m "added 2s"
dolt checkout main
dolt merge other -m "merge other"
run dolt sql <<SQL
set @@dolt_allow_commit_conflicts = 1;
call dolt_conflicts_resolve('--theirs', 'parent');
SQL
[ "$status" -eq 1 ]
[[ "$output" =~ "violation" ]] || false
run dolt sql <<SQL
set @@dolt_allow_commit_conflicts = 1;
call dolt_conflicts_resolve('--theirs', 'child');
SQL
[ "$status" -eq 1 ]
[[ "$output" =~ "violation" ]] || false
}
@test "foreign-keys: FKs move with the working set on checkout" {
dolt add . && dolt commit -m "added parent and child tables"
dolt branch other

View File

@@ -181,6 +181,24 @@ setup_merge() {
[[ "${lines[3]}" =~ "2,12" ]] || false
}
@test "garbage_collection: leave merge commit with stored procedure" {
skip_nbf_dolt
setup_merge
dolt merge other -m "merge"
dolt gc
dolt sql -q "call dolt_conflicts_resolve('--ours', '.')"
dolt add .
dolt commit -am "resolved conflicts with ours"
run dolt sql -q "SELECT * FROM test;" -r csv
[ "$status" -eq 0 ]
[[ "${lines[1]}" =~ "0,10" ]] || false
[[ "${lines[2]}" =~ "1,11" ]] || false
[[ "${lines[3]}" =~ "2,12" ]] || false
}
@test "garbage_collection: leave working pre-merge" {
setup_merge

View File

@@ -2277,6 +2277,33 @@ SQL
[[ "${#lines[@]}" = "6" ]] || false
}
@test "index: Merge resolving all OURS with stored procedure" {
dolt sql -q "CREATE INDEX idx_v1 ON onepk(v1);"
dolt add -A
dolt commit -m "baseline commit"
dolt checkout -b other
dolt checkout main
dolt sql -q "INSERT INTO onepk VALUES (1, 11, 101), (2, 22, 202), (3, -33, 33), (4, 44, 404)"
dolt add -A
dolt commit -m "main changes"
dolt checkout other
dolt sql -q "INSERT INTO onepk VALUES (1, -11, 11), (2, -22, 22), (3, -33, 33), (4, -44, 44), (5, -55, 55)"
dolt add -A
dolt commit -m "other changes"
dolt checkout main
dolt merge other -m "merge"
dolt sql -q "call dolt_conflicts_resolve('--ours', 'onepk')"
run dolt index cat onepk idx_v1 -r=csv
[ "$status" -eq "0" ]
[[ "$output" =~ "v1,pk1" ]] || false
[[ "$output" =~ "-55,5" ]] || false
[[ "$output" =~ "-33,3" ]] || false
[[ "$output" =~ "11,1" ]] || false
[[ "$output" =~ "22,2" ]] || false
[[ "$output" =~ "44,4" ]] || false
[[ "${#lines[@]}" = "6" ]] || false
}
@test "index: Merge resolving all THEIRS" {
dolt sql -q "CREATE INDEX idx_v1 ON onepk(v1);"
@@ -2305,6 +2332,33 @@ SQL
[[ "${#lines[@]}" = "6" ]] || false
}
@test "index: Merge resolving all THEIRS with stored procedure" {
dolt sql -q "CREATE INDEX idx_v1 ON onepk(v1);"
dolt add -A
dolt commit -m "baseline commit"
dolt checkout -b other
dolt checkout main
dolt sql -q "INSERT INTO onepk VALUES (1, 11, 101), (2, 22, 202), (3, -33, 33), (4, 44, 404)"
dolt add -A
dolt commit -m "main changes"
dolt checkout other
dolt sql -q "INSERT INTO onepk VALUES (1, -11, 11), (2, -22, 22), (3, -33, 33), (4, -44, 44), (5, -55, 55)"
dolt add -A
dolt commit -m "other changes"
dolt checkout main
dolt merge other -m "merge"
dolt sql -q "call dolt_conflicts_resolve('--theirs', 'onepk')"
run dolt index cat onepk idx_v1 -r=csv
[ "$status" -eq "0" ]
[[ "$output" =~ "v1,pk1" ]] || false
[[ "$output" =~ "-55,5" ]] || false
[[ "$output" =~ "-44,4" ]] || false
[[ "$output" =~ "-33,3" ]] || false
[[ "$output" =~ "-22,2" ]] || false
[[ "$output" =~ "-11,1" ]] || false
[[ "${#lines[@]}" = "6" ]] || false
}
@test "index: Merge individually resolving OURS/THEIRS" {
dolt sql -q "CREATE INDEX idx_v1 ON onepk(v1);"

View File

@@ -169,3 +169,53 @@ SQL
[ "${lines[1]}" = '1,"{""a"": 1}"' ]
[ "${lines[2]}" = '2,"{""b"": 99}"' ]
}
@test "json: merge JSON values with stored procedure" {
dolt sql <<SQL
CREATE TABLE js (
pk int PRIMARY KEY,
js json
);
INSERT INTO js VALUES (1, '{"a":1}'), (2, '{"b":2}');
SQL
dolt add .
dolt commit -am "added JSON table"
dolt branch other
dolt branch another
dolt sql <<SQL
UPDATE js SET js = '{"a":11}' WHERE pk = 1;
SQL
dolt commit -am "made changes on branch main"
dolt checkout other
dolt sql <<SQL
UPDATE js SET js = '{"b":22}' WHERE pk = 2;
SQL
dolt commit -am "made changes on branch other"
dolt checkout main
dolt merge other --no-commit
run dolt sql -q "SELECT * FROM js;" -r csv
[ "$status" -eq 0 ]
[ "${lines[1]}" = '1,"{""a"": 11}"' ]
[ "${lines[2]}" = '2,"{""b"": 22}"' ]
dolt commit -am "merged other into main"
# test merge conflicts
dolt checkout another
dolt sql <<SQL
UPDATE js SET js = '{"b":99}' WHERE pk = 2;
SQL
dolt commit -am "made changes on branch another"
run dolt merge other -m "merge"
[ "$status" -eq 0 ]
[[ "$output" =~ "CONFLICT" ]] || false
run dolt sql -q "call dolt_conflicts_resolve('--ours', 'js')"
[ "$status" -eq 0 ]
run dolt sql -q "SELECT * FROM js;" -r csv
[ "$status" -eq 0 ]
[ "${lines[1]}" = '1,"{""a"": 1}"' ]
[ "${lines[2]}" = '2,"{""b"": 99}"' ]
}

View File

@@ -425,6 +425,31 @@ SQL
[[ "${lines[1]}" = "6,6" ]] || false
}
@test "keyless: merge duplicate deletes with stored procedure" {
make_dupe_table
dolt branch left
dolt checkout -b right
dolt sql -q "DELETE FROM dupe LIMIT 2;"
dolt commit -am "deleted two rows on right"
dolt checkout left
dolt sql -q "DELETE FROM dupe LIMIT 4;"
dolt commit -am "deleted four rows on left"
run dolt merge right -m "merge"
[ $status -eq 0 ]
[[ "$output" =~ "CONFLICT" ]] || false
run dolt sql -q "call dolt_conflicts_resolve('--ours', 'dupe')"
[ $status -eq 0 ]
dolt commit -am "resolved"
run dolt sql -q "select sum(c0), sum(c1) from dupe" -r csv
[ $status -eq 0 ]
[[ "${lines[1]}" = "6,6" ]] || false
}
@test "keyless: diff duplicate updates" {
make_dupe_table
@@ -473,6 +498,31 @@ SQL
[[ "${lines[1]}" = "10,12" ]] || false
}
@test "keyless: merge duplicate updates with stored procedure" {
make_dupe_table
dolt branch left
dolt checkout -b right
dolt sql -q "UPDATE dupe SET c1 = 2 LIMIT 2;"
dolt commit -am "updated two rows on right"
dolt checkout left
dolt sql -q "UPDATE dupe SET c1 = 2 LIMIT 4;"
dolt commit -am "updated four rows on left"
run dolt merge right -m "merge"
[ $status -eq 0 ]
[[ "$output" =~ "CONFLICT" ]] || false
run dolt sql -q "call dolt_conflicts_resolve('--theirs', 'dupe')"
[ $status -eq 0 ]
dolt commit -am "resolved"
run dolt sql -q "select sum(c0), sum(c1) from dupe" -r csv
[ $status -eq 0 ]
[[ "${lines[1]}" = "10,12" ]] || false
}
@test "keyless: sql diff" {
skip "unimplemented"
dolt sql <<SQL
@@ -629,7 +679,6 @@ CSV
[ $status -eq 0 ]
[[ "$output" =~ "CONFLICT" ]] || false
dolt conflicts resolve --ours keyless
run dolt conflicts resolve --ours keyless
[ $status -eq 0 ]
dolt commit -am "resolved"
@@ -734,6 +783,34 @@ SQL
[[ "${lines[3]}" = "9,9" ]] || false
}
@test "keyless: merge branches with convergent mutation history with stored procedure" {
dolt branch other
dolt sql -q "INSERT INTO keyless VALUES (7,7),(8,8),(9,9);"
dolt commit -am "inserted on main"
dolt checkout other
dolt sql <<SQL
INSERT INTO keyless VALUES (9,19),(8,8),(7,17);
UPDATE keyless SET c0 = 7, c1 = 7 WHERE c1 = 19;
UPDATE keyless SET c0 = 9, c1 = 9 WHERE c1 = 17;
SQL
dolt commit -am "inserted on other"
run dolt merge main -m "merge"
[ $status -eq 0 ]
[[ "$output" =~ "CONFLICT" ]] || false
run dolt sql -q "call dolt_conflicts_resolve('--theirs', 'keyless')"
[ $status -eq 0 ]
dolt commit -am "resolved"
run dolt sql -q "select * from keyless where c0 > 6 order by c0" -r csv
[ $status -eq 0 ]
[[ "${lines[1]}" = "7,7" ]] || false
[[ "${lines[2]}" = "8,8" ]] || false
[[ "${lines[3]}" = "9,9" ]] || false
}
@test "keyless: diff branches with offset mutation history" {
dolt branch other
@@ -775,6 +852,31 @@ SQL
[[ "${lines[4]}" = "9,9" ]] || false
}
@test "keyless: merge branches with offset mutation history with stored procedure" {
dolt branch other
dolt sql -q "INSERT INTO keyless VALUES (7,7),(8,8),(9,9);"
dolt commit -am "inserted on main"
dolt checkout other
dolt sql -q "INSERT INTO keyless VALUES (7,7),(7,7),(8,8),(9,9);"
dolt commit -am "inserted on other"
run dolt merge main -m "merge"
[ $status -eq 0 ]
[[ "$output" =~ "CONFLICT" ]] || false
run dolt sql -q "call dolt_conflicts_resolve('--ours', 'keyless')"
[ $status -eq 0 ]
dolt commit -am "resolved"
run dolt sql -q "select * from keyless where c0 > 6 order by c0" -r csv
[ $status -eq 0 ]
[[ "${lines[1]}" = "7,7" ]] || false
[[ "${lines[2]}" = "7,7" ]] || false
[[ "${lines[3]}" = "8,8" ]] || false
[[ "${lines[4]}" = "9,9" ]] || false
}
@test "keyless: diff delete+add against working" {
dolt sql <<SQL
@@ -833,6 +935,32 @@ SQL
[ "${#lines[@]}" -eq 4 ]
}
@test "keyless: merge delete+add on two branches with stored procedure" {
dolt branch left
dolt checkout -b right
dolt sql -q "DELETE FROM keyless WHERE c0 = 2;"
dolt commit -am "deleted twos on right"
dolt checkout left
dolt sql -q "INSERT INTO keyless VALUES (2,2);"
dolt commit -am "inserted twos on left"
run dolt merge right -m "merge"
[ $status -eq 0 ]
[[ "$output" =~ "CONFLICT" ]] || false
run dolt sql -q "call dolt_conflicts_resolve('--theirs', 'keyless')"
[ $status -eq 0 ]
dolt commit -am "resolved"
run dolt sql -q "select * from keyless order by c0" -r csv
[ $status -eq 0 ]
[[ "${lines[1]}" = "0,0" ]] || false
[[ "${lines[2]}" = "1,1" ]] || false
[[ "${lines[3]}" = "1,1" ]] || false
[ "${#lines[@]}" -eq 4 ]
}
@test "keyless: create secondary index" {
dolt sql -q "create index idx on keyless (c1)"

View File

@@ -379,7 +379,6 @@ SQL
run dolt merge other --no-commit
log_status_eq 0
[[ "$output" =~ "CONFLICT" ]] || false
dolt conflicts resolve --theirs dolt_schemas
run dolt conflicts resolve --theirs dolt_schemas
log_status_eq 0
run dolt sql -q "select name from dolt_schemas" -r csv
@@ -387,6 +386,26 @@ SQL
[[ "$output" =~ "c1c1" ]] || false
}
@test "merge: Add views on two branches, merge with stored procedure" {
dolt branch other
dolt sql -q "CREATE VIEW pkpk AS SELECT pk*pk FROM test1;"
dolt add . && dolt commit -m "added view on table test1"
dolt checkout other
dolt sql -q "CREATE VIEW c1c1 AS SELECT c1*c1 FROM test2;"
dolt add . && dolt commit -m "added view on table test2"
dolt checkout main
run dolt merge other --no-commit
log_status_eq 0
[[ "$output" =~ "CONFLICT" ]] || false
run dolt sql -q "call dolt_conflicts_resolve('--theirs', 'dolt_schemas')"
log_status_eq 0
run dolt sql -q "select name from dolt_schemas" -r csv
log_status_eq 0
[[ "$output" =~ "c1c1" ]] || false
}
@test "merge: Add views on two branches, merge without conflicts" {
dolt branch other
dolt sql -q "CREATE VIEW pkpk AS SELECT pk*pk FROM test1;"

View File

@@ -0,0 +1,157 @@
#!/usr/bin/env bats
load $BATS_TEST_DIRNAME/helper/common.bash
setup() {
setup_common
}
basic_conflict() {
dolt sql -q "create table t (i int primary key, t text)"
dolt add .
dolt commit -am "init commit"
dolt checkout -b other
dolt sql -q "insert into t values (1,'other')"
dolt commit -am "other commit"
dolt checkout main
dolt sql -q "insert into t values (1,'main')"
dolt commit -am "main commit"
}
teardown() {
assert_feature_version
teardown_common
}
@test "sql-conflicts-resolve: call with no arguments, errors" {
run dolt sql -q "call dolt_conflicts_resolve()"
[ $status -eq 1 ]
[[ $output =~ "--ours or --theirs must be supplied" ]] || false
}
@test "sql-conflicts-resolve: call without specifying table, errors" {
run dolt sql -q "call dolt_conflicts_resolve('--theirs')"
[ $status -eq 1 ]
[[ $output =~ "specify at least one table to resolve conflicts" ]] || false
}
@test "sql-conflicts-resolve: call with non-existent table, errors" {
run dolt sql -q "call dolt_conflicts_resolve('--ours', 'notexists')"
[ $status -eq 1 ]
[[ $output =~ "table not found" ]] || false
}
@test "sql-conflicts-resolve: no conflicts, no changes" {
basic_conflict
dolt checkout main
run dolt sql -q "select * from t"
[ $status -eq 0 ]
[[ $output =~ "main" ]] || false
dolt checkout other
run dolt sql -q "select * from t"
[ $status -eq 0 ]
[[ $output =~ "other" ]] || false
dolt checkout main
run dolt sql -q "CALL dolt_conflicts_resolve('--ours', 't')"
[ $status -eq 0 ]
run dolt sql -q "select * from t"
[ $status -eq 0 ]
[[ $output =~ "main" ]] || false
run dolt sql -q "CALL dolt_conflicts_resolve('--theirs', 't')"
[ $status -eq 0 ]
run dolt sql -q "select * from t"
[ $status -eq 0 ]
[[ $output =~ "main" ]] || false
dolt checkout other
run dolt sql -q "CALL dolt_conflicts_resolve('--ours', 't')"
[ $status -eq 0 ]
run dolt sql -q "select * from t"
[ $status -eq 0 ]
[[ $output =~ "other" ]] || false
run dolt sql -q "CALL dolt_conflicts_resolve('--theirs', 't')"
[ $status -eq 0 ]
run dolt sql -q "select * from t"
[ $status -eq 0 ]
[[ $output =~ "other" ]] || false
}
@test "sql-conflicts-resolve: merge other into main, resolve with ours" {
basic_conflict
dolt checkout main
run dolt sql -q "select * from t"
[ $status -eq 0 ]
[[ $output =~ "main" ]] || false
run dolt merge other
[ $status -eq 0 ]
[[ $output =~ "Automatic merge failed" ]] || false
run dolt sql -q "CALL dolt_conflicts_resolve('--ours', 't')"
[ $status -eq 0 ]
run dolt sql -q "select * from t"
[ $status -eq 0 ]
[[ $output =~ "main" ]] || false
}
@test "sql-conflicts-resolve: merge other into main, resolve with theirs" {
basic_conflict
dolt checkout main
run dolt sql -q "select * from t"
[ $status -eq 0 ]
[[ $output =~ "main" ]] || false
run dolt merge other
[ $status -eq 0 ]
[[ $output =~ "Automatic merge failed" ]] || false
run dolt sql -q "CALL dolt_conflicts_resolve('--theirs', 't')"
[ $status -eq 0 ]
run dolt sql -q "select * from t"
[ $status -eq 0 ]
[[ $output =~ "other" ]] || false
}
@test "sql-conflicts-resolve: merge main into other, resolve with ours" {
basic_conflict
dolt checkout other
run dolt sql -q "select * from t"
[ $status -eq 0 ]
[[ $output =~ "other" ]] || false
run dolt merge main
[ $status -eq 0 ]
[[ $output =~ "Automatic merge failed" ]] || false
run dolt sql -q "CALL dolt_conflicts_resolve('--ours', 't')"
[ $status -eq 0 ]
run dolt sql -q "select * from t"
[ $status -eq 0 ]
[[ $output =~ "other" ]] || false
}
@test "sql-conflicts-resolve: merge main into other, resolve with theirs" {
basic_conflict
dolt checkout other
run dolt sql -q "select * from t"
[ $status -eq 0 ]
[[ $output =~ "other" ]] || false
run dolt merge main
[ $status -eq 0 ]
[[ $output =~ "Automatic merge failed" ]] || false
run dolt sql -q "CALL dolt_conflicts_resolve('--theirs', 't')"
[ $status -eq 0 ]
run dolt sql -q "select * from t"
[ $status -eq 0 ]
[[ $output =~ "main" ]] || false
}