mirror of
https://github.com/dolthub/dolt.git
synced 2026-04-23 13:48:42 -05:00
Merge pull request #5330 from dolthub/zachmu/remote-ref-replication
Fixed race when multiple clients attempt to connect to the same branc…
This commit is contained in:
@@ -931,6 +931,7 @@ func (ddb *DoltDB) GetRefsOfType(ctx context.Context, refTypeFilter map[ref.RefT
|
||||
}
|
||||
|
||||
// NewBranchAtCommit creates a new branch with HEAD at the commit given. Branch names must pass IsValidUserBranchName.
|
||||
// Silently overwrites any existing branch with the same name given, if one exists.
|
||||
func (ddb *DoltDB) NewBranchAtCommit(ctx context.Context, branchRef ref.DoltRef, commit *Commit) error {
|
||||
if !IsValidBranchRef(branchRef) {
|
||||
panic(fmt.Sprintf("invalid branch name %s, use IsValidUserBranchName check", branchRef.String()))
|
||||
|
||||
@@ -152,9 +152,8 @@ func (p DoltDatabaseProvider) FileSystem() filesys.Filesys {
|
||||
return p.fs
|
||||
}
|
||||
|
||||
// If this DatabaseProvider is set to standby |true|, it returns every dolt
|
||||
// database as a read only database. Set back to |false| to get read-write
|
||||
// behavior from dolt databases again.
|
||||
// SetIsStandby sets whether this provider is set to standby |true|. Standbys return every dolt database as a read only
|
||||
// database. Set back to |false| to get read-write behavior from dolt databases again.
|
||||
func (p DoltDatabaseProvider) SetIsStandby(standby bool) {
|
||||
p.mu.Lock()
|
||||
defer p.mu.Unlock()
|
||||
@@ -187,13 +186,16 @@ func (p DoltDatabaseProvider) Database(ctx *sql.Context, name string) (db sql.Da
|
||||
return wrapForStandby(db, standby), nil
|
||||
}
|
||||
|
||||
// Revision databases aren't tracked in the map, just instantiated on demand
|
||||
db, _, ok, err = p.databaseForRevision(ctx, name)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// A final check: if the database doesn't exist and this is a read replica, attempt to clone it from the remote
|
||||
if !ok {
|
||||
db, err = p.databaseForClone(ctx, name)
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -203,7 +205,6 @@ func (p DoltDatabaseProvider) Database(ctx *sql.Context, name string) (db sql.Da
|
||||
}
|
||||
}
|
||||
|
||||
// Don't track revision databases, just instantiate them on demand
|
||||
return wrapForStandby(db, standby), nil
|
||||
}
|
||||
|
||||
@@ -263,7 +264,7 @@ func (p DoltDatabaseProvider) attemptCloneReplica(ctx *sql.Context, dbName strin
|
||||
func (p DoltDatabaseProvider) HasDatabase(ctx *sql.Context, name string) bool {
|
||||
_, err := p.Database(ctx, name)
|
||||
if err != nil && !sql.ErrDatabaseNotFound.Is(err) {
|
||||
ctx.GetLogger().Errorf(err.Error())
|
||||
ctx.GetLogger().Warnf("Error getting database %s: %s", name, err.Error())
|
||||
}
|
||||
return err == nil
|
||||
}
|
||||
@@ -725,7 +726,7 @@ func (p DoltDatabaseProvider) databaseForRevision(ctx *sql.Context, revDB string
|
||||
return nil, dsess.InitialDbState{}, false, err
|
||||
}
|
||||
|
||||
caseSensitiveBranchName, isBranch, err := isBranch(ctx, srcDb, resolvedRevSpec, p.remoteDialer)
|
||||
caseSensitiveBranchName, isBranch, err := isBranch(ctx, srcDb, resolvedRevSpec)
|
||||
if err != nil {
|
||||
return nil, dsess.InitialDbState{}, false, err
|
||||
}
|
||||
@@ -733,8 +734,9 @@ func (p DoltDatabaseProvider) databaseForRevision(ctx *sql.Context, revDB string
|
||||
if isBranch {
|
||||
// fetch the upstream head if this is a replicated db
|
||||
if replicaDb, ok := srcDb.(ReadReplicaDatabase); ok {
|
||||
// TODO move this out of analysis phase, should only happen at read time
|
||||
err := switchAndFetchReplicaHead(ctx, resolvedRevSpec, replicaDb)
|
||||
// TODO move this out of analysis phase, should only happen at read time, when the transaction begins (like is
|
||||
// the case with a branch that already exists locally)
|
||||
err := p.ensureReplicaHeadExists(ctx, resolvedRevSpec, replicaDb)
|
||||
if err != nil {
|
||||
return nil, dsess.InitialDbState{}, false, err
|
||||
}
|
||||
@@ -751,7 +753,7 @@ func (p DoltDatabaseProvider) databaseForRevision(ctx *sql.Context, revDB string
|
||||
return db, init, true, nil
|
||||
}
|
||||
|
||||
isTag, err := isTag(ctx, srcDb, resolvedRevSpec, p.remoteDialer)
|
||||
isTag, err := isTag(ctx, srcDb, resolvedRevSpec)
|
||||
if err != nil {
|
||||
return nil, dsess.InitialDbState{}, false, err
|
||||
}
|
||||
@@ -759,6 +761,7 @@ func (p DoltDatabaseProvider) databaseForRevision(ctx *sql.Context, revDB string
|
||||
if isTag {
|
||||
// TODO: this should be an interface, not a struct
|
||||
replicaDb, ok := srcDb.(ReadReplicaDatabase)
|
||||
|
||||
if ok {
|
||||
srcDb = replicaDb.Database
|
||||
}
|
||||
@@ -981,69 +984,14 @@ func (p DoltDatabaseProvider) IsRevisionDatabase(ctx *sql.Context, dbName string
|
||||
return revision != "", nil
|
||||
}
|
||||
|
||||
// switchAndFetchReplicaHead tries to pull the latest version of a branch. Will fail if the branch
|
||||
// does not exist on the ReadReplicaDatabase's remote. If the target branch is not a replication
|
||||
// head, the new branch will not be continuously fetched.
|
||||
func switchAndFetchReplicaHead(ctx *sql.Context, branch string, db ReadReplicaDatabase) error {
|
||||
branchRef := ref.NewBranchRef(branch)
|
||||
|
||||
var branchExists bool
|
||||
branches, err := db.ddb.GetBranches(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for _, br := range branches {
|
||||
if br.String() == branch {
|
||||
branchExists = true
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
// check whether branch is on remote before creating local tracking branch
|
||||
cm, err := actions.FetchRemoteBranch(ctx, db.tmpDir, db.remote, db.srcDB, db.DbData().Ddb, branchRef, actions.NoopRunProgFuncs, actions.NoopStopProgFuncs)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
cmHash, err := cm.HashOf()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// create refs/heads/branch dataset
|
||||
if !branchExists {
|
||||
err = db.ddb.NewBranchAtCommit(ctx, branchRef, cm)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
dSess := dsess.DSessFromSess(ctx.Session)
|
||||
currentBranchRef, err := dSess.CWBHeadRef(ctx, db.name)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// create workingSets/heads/branch and update the working set
|
||||
err = db.RebaseSourceDb(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = pullBranches(ctx, db, []doltdb.RefWithHash{{
|
||||
Ref: branchRef,
|
||||
Hash: cmHash,
|
||||
}}, nil, currentBranchRef, pullBehavior_fastForward)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
// ensureReplicaHeadExists tries to pull the latest version of a remote branch. Will fail if the branch
|
||||
// does not exist on the ReadReplicaDatabase's remote.
|
||||
func (p DoltDatabaseProvider) ensureReplicaHeadExists(ctx *sql.Context, branch string, db ReadReplicaDatabase) error {
|
||||
return db.CreateLocalBranchFromRemote(ctx, ref.NewBranchRef(branch))
|
||||
}
|
||||
|
||||
// isBranch returns whether a branch with the given name is in scope for the database given
|
||||
func isBranch(ctx context.Context, db SqlDatabase, branchName string, dialer dbfactory.GRPCDialProvider) (string, bool, error) {
|
||||
func isBranch(ctx context.Context, db SqlDatabase, branchName string) (string, bool, error) {
|
||||
var ddbs []*doltdb.DoltDB
|
||||
|
||||
if rdb, ok := db.(ReadReplicaDatabase); ok {
|
||||
@@ -1062,7 +1010,7 @@ func isBranch(ctx context.Context, db SqlDatabase, branchName string, dialer dbf
|
||||
return brName, true, nil
|
||||
}
|
||||
|
||||
brName, branchExists, err = isRemoteBranch(ctx, db, ddbs, branchName)
|
||||
brName, branchExists, err = isRemoteBranch(ctx, ddbs, branchName)
|
||||
if err != nil {
|
||||
return "", false, err
|
||||
}
|
||||
@@ -1088,21 +1036,15 @@ func isLocalBranch(ctx context.Context, ddbs []*doltdb.DoltDB, branchName string
|
||||
return "", false, nil
|
||||
}
|
||||
|
||||
// isRemoteBranch is called when the branch in connection string is not available as a local branch, so it searches
|
||||
// for a remote tracking branch. If there is only one match, it creates a new local branch from the remote tracking
|
||||
// branch and sets its upstream to it.
|
||||
func isRemoteBranch(ctx context.Context, srcDB SqlDatabase, ddbs []*doltdb.DoltDB, branchName string) (string, bool, error) {
|
||||
// isRemoteBranch returns whether the given branch name is a remote branch on any of the databases provided.
|
||||
func isRemoteBranch(ctx context.Context, ddbs []*doltdb.DoltDB, branchName string) (string, bool, error) {
|
||||
for _, ddb := range ddbs {
|
||||
bn, branchExists, remoteRef, err := ddb.HasRemoteTrackingBranch(ctx, branchName)
|
||||
bn, branchExists, _, err := ddb.HasRemoteTrackingBranch(ctx, branchName)
|
||||
if err != nil {
|
||||
return "", false, err
|
||||
}
|
||||
|
||||
if branchExists {
|
||||
err = createLocalBranchFromRemoteTrackingBranch(ctx, srcDB.DbData(), ddb, branchName, remoteRef)
|
||||
if err != nil {
|
||||
return "", false, err
|
||||
}
|
||||
return bn, true, nil
|
||||
}
|
||||
}
|
||||
@@ -1110,36 +1052,8 @@ func isRemoteBranch(ctx context.Context, srcDB SqlDatabase, ddbs []*doltdb.DoltD
|
||||
return "", false, nil
|
||||
}
|
||||
|
||||
// createLocalBranchFromRemoteTrackingBranch creates a new local branch from given remote tracking branch
|
||||
// and sets its upstream to it.
|
||||
func createLocalBranchFromRemoteTrackingBranch(ctx context.Context, dbData env.DbData, ddb *doltdb.DoltDB, branchName string, remoteRef ref.RemoteRef) error {
|
||||
startPt := remoteRef.GetPath()
|
||||
err := actions.CreateBranchOnDB(ctx, ddb, branchName, startPt, false, remoteRef)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// at this point the branch is created on db
|
||||
branchRef := ref.NewBranchRef(branchName)
|
||||
remote := remoteRef.GetRemote()
|
||||
refSpec, err := ref.ParseRefSpecForRemote(remote, remoteRef.GetBranch())
|
||||
if err != nil {
|
||||
return fmt.Errorf("%w: '%s'", err, remote)
|
||||
}
|
||||
|
||||
src := refSpec.SrcRef(branchRef)
|
||||
dest := refSpec.DestRef(src)
|
||||
|
||||
return dbData.Rsw.UpdateBranch(branchRef.GetPath(), env.BranchConfig{
|
||||
Merge: ref.MarshalableRef{
|
||||
Ref: dest,
|
||||
},
|
||||
Remote: remote,
|
||||
})
|
||||
}
|
||||
|
||||
// isTag returns whether a tag with the given name is in scope for the database given
|
||||
func isTag(ctx context.Context, db SqlDatabase, tagName string, dialer dbfactory.GRPCDialProvider) (bool, error) {
|
||||
func isTag(ctx context.Context, db SqlDatabase, tagName string) (bool, error) {
|
||||
var ddbs []*doltdb.DoltDB
|
||||
|
||||
if rdb, ok := db.(ReadReplicaDatabase); ok {
|
||||
|
||||
@@ -25,6 +25,7 @@ import (
|
||||
|
||||
"github.com/dolthub/dolt/go/libraries/doltcore/doltdb"
|
||||
"github.com/dolthub/dolt/go/libraries/doltcore/env"
|
||||
"github.com/dolthub/dolt/go/libraries/doltcore/env/actions"
|
||||
"github.com/dolthub/dolt/go/libraries/doltcore/ref"
|
||||
"github.com/dolthub/dolt/go/libraries/doltcore/sqle/dsess"
|
||||
"github.com/dolthub/dolt/go/store/datas"
|
||||
@@ -183,7 +184,7 @@ func (rrd ReadReplicaDatabase) PullFromRemote(ctx *sql.Context) error {
|
||||
}
|
||||
|
||||
remoteRefs = prunedRefs
|
||||
err = pullBranches(ctx, rrd, remoteRefs, localRefs, currentBranchRef, behavior)
|
||||
err = pullBranchesAndUpdateWorkingSet(ctx, rrd, remoteRefs, localRefs, currentBranchRef, behavior)
|
||||
|
||||
if err != nil && !dsess.IgnoreReplicationErrors() {
|
||||
return err
|
||||
@@ -193,7 +194,7 @@ func (rrd ReadReplicaDatabase) PullFromRemote(ctx *sql.Context) error {
|
||||
}
|
||||
|
||||
case allHeads == int8(1):
|
||||
err = pullBranches(ctx, rrd, remoteRefs, localRefs, currentBranchRef, behavior)
|
||||
err = pullBranchesAndUpdateWorkingSet(ctx, rrd, remoteRefs, localRefs, currentBranchRef, behavior)
|
||||
if err != nil && !dsess.IgnoreReplicationErrors() {
|
||||
return err
|
||||
} else if err != nil {
|
||||
@@ -215,8 +216,55 @@ func (rrd ReadReplicaDatabase) PullFromRemote(ctx *sql.Context) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (rrd ReadReplicaDatabase) RebaseSourceDb(ctx *sql.Context) error {
|
||||
return rrd.srcDB.Rebase(ctx)
|
||||
// CreateLocalBranchFromRemote pulls the given branch from the remote database and creates a local tracking branch for
|
||||
// it. This is only used for initializing a new local branch being pulled from a remote during connection
|
||||
// initialization, and doesn't do the full work of remote synchronization that happens on transaction start.
|
||||
func (rrd ReadReplicaDatabase) CreateLocalBranchFromRemote(ctx *sql.Context, branchRef ref.BranchRef) error {
|
||||
_, err := rrd.limiter.Run(ctx, "pullNewBranch", func() (any, error) {
|
||||
// because several clients can queue up waiting to create the same local branch, double check to see if this
|
||||
// work was already done and bail early if so
|
||||
_, branchExists, err := rrd.ddb.HasBranch(ctx, branchRef.GetPath())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if branchExists {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
cm, err := actions.FetchRemoteBranch(ctx, rrd.tmpDir, rrd.remote, rrd.srcDB, rrd.ddb, branchRef, actions.NoopRunProgFuncs, actions.NoopStopProgFuncs)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
cmHash, err := cm.HashOf()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// create refs/heads/branch dataset
|
||||
err = rrd.ddb.NewBranchAtCommit(ctx, branchRef, cm)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
err = rrd.srcDB.Rebase(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
_, err = pullBranches(ctx, rrd, []doltdb.RefWithHash{{
|
||||
Ref: branchRef,
|
||||
Hash: cmHash,
|
||||
}}, nil, pullBehavior_fastForward)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return nil, err
|
||||
})
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
type pullBehavior bool
|
||||
@@ -224,9 +272,10 @@ type pullBehavior bool
|
||||
const pullBehavior_fastForward pullBehavior = false
|
||||
const pullBehavior_forcePull pullBehavior = true
|
||||
|
||||
// pullBranches pulls the remote branches named. If a corresponding local branch exists, it will be fast-forwarded. If
|
||||
// it doesn't exist, it will be created.
|
||||
func pullBranches(
|
||||
// pullBranchesAndUpdateWorkingSet pulls the remote branches named. If a corresponding local branch exists, it will be
|
||||
// fast-forwarded. If it doesn't exist, it will be created. Afterward, the working set of the current branch is
|
||||
// updated if the current branch ref was updated by the pull.
|
||||
func pullBranchesAndUpdateWorkingSet(
|
||||
ctx *sql.Context,
|
||||
rrd ReadReplicaDatabase,
|
||||
remoteRefs []doltdb.RefWithHash,
|
||||
@@ -234,68 +283,8 @@ func pullBranches(
|
||||
currentBranchRef ref.DoltRef,
|
||||
behavior pullBehavior,
|
||||
) error {
|
||||
localRefsByPath := make(map[string]doltdb.RefWithHash)
|
||||
remoteRefsByPath := make(map[string]doltdb.RefWithHash)
|
||||
remoteHashes := make([]hash.Hash, len(remoteRefs))
|
||||
|
||||
for i, b := range remoteRefs {
|
||||
remoteRefsByPath[b.Ref.GetPath()] = b
|
||||
remoteHashes[i] = b.Hash
|
||||
}
|
||||
|
||||
for _, b := range localRefs {
|
||||
localRefsByPath[b.Ref.GetPath()] = b
|
||||
}
|
||||
|
||||
// XXX: Our view of which remote branches to pull and what to set the
|
||||
// local branches to was computed outside of the limiter, concurrently
|
||||
// with other possible attempts to pull from the remote. Now we are
|
||||
// applying changes based on that view. This seems capable of rolling
|
||||
// back changes which were applied from another thread.
|
||||
|
||||
_, err := rrd.limiter.Run(ctx, "-all", func() (any, error) {
|
||||
err := rrd.ddb.PullChunks(ctx, rrd.tmpDir, rrd.srcDB, remoteHashes, nil)
|
||||
|
||||
for _, remoteRef := range remoteRefs {
|
||||
localRef, localRefExists := localRefsByPath[remoteRef.Ref.GetPath()]
|
||||
switch {
|
||||
case err != nil:
|
||||
case localRefExists:
|
||||
// TODO: this should work for workspaces too but doesn't, only branches
|
||||
if localRef.Ref.GetType() == ref.BranchRefType {
|
||||
if localRef.Hash != remoteRef.Hash {
|
||||
if behavior == pullBehavior_forcePull {
|
||||
err = rrd.ddb.SetHead(ctx, remoteRef.Ref, remoteRef.Hash)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
} else {
|
||||
err = rrd.ddb.FastForwardToHash(ctx, remoteRef.Ref, remoteRef.Hash)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
default:
|
||||
switch remoteRef.Ref.GetType() {
|
||||
case ref.BranchRefType:
|
||||
err = rrd.ddb.SetHead(ctx, remoteRef.Ref, remoteRef.Hash)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
case ref.TagRefType:
|
||||
err = rrd.ddb.SetHead(ctx, remoteRef.Ref, remoteRef.Hash)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
default:
|
||||
ctx.GetLogger().Warnf("skipping replication for unhandled remote ref %s", remoteRef.Ref.String())
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil, nil
|
||||
})
|
||||
remoteRefsByPath, err := pullBranches(ctx, rrd, remoteRefs, localRefs, behavior)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -361,6 +350,142 @@ func pullBranches(
|
||||
return nil
|
||||
}
|
||||
|
||||
// pullBranches pulls the remote branches named and returns the map of their hashes keyed by branch path.
|
||||
func pullBranches(
|
||||
ctx *sql.Context,
|
||||
rrd ReadReplicaDatabase,
|
||||
remoteRefs []doltdb.RefWithHash,
|
||||
localRefs []doltdb.RefWithHash,
|
||||
behavior pullBehavior,
|
||||
) (map[string]doltdb.RefWithHash, error) {
|
||||
localRefsByPath := make(map[string]doltdb.RefWithHash)
|
||||
remoteRefsByPath := make(map[string]doltdb.RefWithHash)
|
||||
remoteHashes := make([]hash.Hash, len(remoteRefs))
|
||||
|
||||
for i, b := range remoteRefs {
|
||||
remoteRefsByPath[b.Ref.GetPath()] = b
|
||||
remoteHashes[i] = b.Hash
|
||||
}
|
||||
|
||||
for _, b := range localRefs {
|
||||
localRefsByPath[b.Ref.GetPath()] = b
|
||||
}
|
||||
|
||||
// XXX: Our view of which remote branches to pull and what to set the
|
||||
// local branches to was computed outside of the limiter, concurrently
|
||||
// with other possible attempts to pull from the remote. Now we are
|
||||
// applying changes based on that view. This seems capable of rolling
|
||||
// back changes which were applied from another thread.
|
||||
|
||||
_, err := rrd.limiter.Run(ctx, "-all", func() (any, error) {
|
||||
pullErr := rrd.ddb.PullChunks(ctx, rrd.tmpDir, rrd.srcDB, remoteHashes, nil)
|
||||
|
||||
REFS: // every successful pass through the loop below must end with CONTINUE REFS to get out of the retry loop
|
||||
for _, remoteRef := range remoteRefs {
|
||||
trackingRef := ref.NewRemoteRef(rrd.remote.Name, remoteRef.Ref.GetPath())
|
||||
localRef, localRefExists := localRefsByPath[remoteRef.Ref.GetPath()]
|
||||
|
||||
// loop on optimistic lock failures
|
||||
OPTIMISTIC_RETRY:
|
||||
for {
|
||||
if pullErr != nil || localRefExists {
|
||||
pullErr = nil
|
||||
|
||||
// TODO: this should work for workspaces too but doesn't, only branches
|
||||
if localRef.Ref.GetType() == ref.BranchRefType {
|
||||
err := rrd.pullLocalBranch(ctx, localRef, remoteRef, trackingRef, behavior)
|
||||
if errors.Is(err, datas.ErrOptimisticLockFailed) {
|
||||
continue OPTIMISTIC_RETRY
|
||||
} else if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
continue REFS
|
||||
} else {
|
||||
switch remoteRef.Ref.GetType() {
|
||||
case ref.BranchRefType:
|
||||
err := rrd.createNewBranchFromRemote(ctx, remoteRef, trackingRef)
|
||||
if errors.Is(err, datas.ErrOptimisticLockFailed) {
|
||||
continue OPTIMISTIC_RETRY
|
||||
} else if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// TODO: Establish upstream tracking for this new branch
|
||||
continue REFS
|
||||
case ref.TagRefType:
|
||||
err := rrd.ddb.SetHead(ctx, remoteRef.Ref, remoteRef.Hash)
|
||||
if errors.Is(err, datas.ErrOptimisticLockFailed) {
|
||||
continue OPTIMISTIC_RETRY
|
||||
} else if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
continue REFS
|
||||
default:
|
||||
ctx.GetLogger().Warnf("skipping replication for unhandled remote ref %s", remoteRef.Ref.String())
|
||||
continue REFS
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil, nil
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return remoteRefsByPath, nil
|
||||
}
|
||||
|
||||
func (rrd ReadReplicaDatabase) createNewBranchFromRemote(ctx *sql.Context, remoteRef doltdb.RefWithHash, trackingRef ref.RemoteRef) error {
|
||||
ctx.GetLogger().Tracef("creating local branch %s", remoteRef.Ref.GetPath())
|
||||
|
||||
// If a local branch isn't present for the remote branch, create a new branch for it. We need to use
|
||||
// NewBranchAtCommit so that the branch has its associated working set created at the same time. Creating
|
||||
// branch refs without associate working sets causes errors in other places.
|
||||
spec, err := doltdb.NewCommitSpec(remoteRef.Hash.String())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
cm, err := rrd.ddb.Resolve(ctx, spec, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = rrd.ddb.NewBranchAtCommit(ctx, remoteRef.Ref, cm)
|
||||
err = rrd.ddb.SetHead(ctx, trackingRef, remoteRef.Hash)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return rrd.ddb.SetHead(ctx, trackingRef, remoteRef.Hash)
|
||||
}
|
||||
|
||||
func (rrd ReadReplicaDatabase) pullLocalBranch(ctx *sql.Context, localRef doltdb.RefWithHash, remoteRef doltdb.RefWithHash, trackingRef ref.RemoteRef, behavior pullBehavior) error {
|
||||
if localRef.Hash != remoteRef.Hash {
|
||||
if behavior == pullBehavior_forcePull {
|
||||
err := rrd.ddb.SetHead(ctx, remoteRef.Ref, remoteRef.Hash)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
err := rrd.ddb.FastForwardToHash(ctx, remoteRef.Ref, remoteRef.Hash)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
err := rrd.ddb.SetHead(ctx, trackingRef, remoteRef.Hash)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func getReplicationRefs(ctx *sql.Context, rrd ReadReplicaDatabase) (
|
||||
remoteRefs []doltdb.RefWithHash,
|
||||
localRefs []doltdb.RefWithHash,
|
||||
|
||||
@@ -112,15 +112,13 @@ stop_sql_server() {
|
||||
|
||||
wait=$1
|
||||
if [ ! -z "$SERVER_PID" ]; then
|
||||
serverpidinuse=$(lsof -i -P -n | grep LISTEN | grep $SERVER_PID | wc -l)
|
||||
if [ $serverpidinuse -gt 0 ]; then
|
||||
kill $SERVER_PID
|
||||
# ignore failures of kill command in the case the server is already dead
|
||||
run kill $SERVER_PID
|
||||
if [ $wait ]; then
|
||||
while ps -p $SERVER_PID > /dev/null; do
|
||||
sleep .1;
|
||||
done
|
||||
fi;
|
||||
fi
|
||||
fi
|
||||
SERVER_PID=
|
||||
}
|
||||
|
||||
@@ -82,6 +82,8 @@ teardown() {
|
||||
cd repo1
|
||||
dolt config --local --add sqlserver.global.dolt_replicate_to_remote remote1
|
||||
dolt config --local --add sqlserver.global.dolt_async_replication 1
|
||||
dolt config --local --add sqlserver.global.dolt_replicate_all_heads 1
|
||||
|
||||
start_sql_server repo1
|
||||
|
||||
dolt sql-client --use-db repo1 -P $PORT -u dolt -q "CALL DOLT_COMMIT('-am', 'Step 1');"
|
||||
@@ -91,6 +93,7 @@ teardown() {
|
||||
|
||||
cd ../repo2
|
||||
dolt pull remote1
|
||||
|
||||
run dolt sql -q "select * from test" -r csv
|
||||
[ "$status" -eq 0 ]
|
||||
[[ "${lines[0]}" =~ "pk" ]]
|
||||
@@ -198,11 +201,13 @@ teardown() {
|
||||
skiponwindows "Missing dependencies"
|
||||
|
||||
cd repo1
|
||||
dolt checkout -b new_feature
|
||||
dolt checkout -b b1
|
||||
dolt commit -am "first commit"
|
||||
dolt branch new_feature2
|
||||
dolt push remote1 new_feature
|
||||
dolt push remote1 new_feature2
|
||||
dolt branch b2
|
||||
dolt branch b3
|
||||
dolt push remote1 b1
|
||||
dolt push remote1 b2
|
||||
dolt push remote1 b3
|
||||
dolt checkout main
|
||||
dolt push remote1 main
|
||||
|
||||
@@ -216,12 +221,18 @@ teardown() {
|
||||
[ $status -eq 0 ]
|
||||
[ "$output" = "" ]
|
||||
|
||||
# Can't use dolt sql-client to connect to branches
|
||||
|
||||
# Connecting to heads that exist only on the remote should work fine (they get fetched)
|
||||
dolt sql-client --use-db "repo2/new_feature" -u dolt -P $PORT -q "show tables" "Tables_in_repo2/new_feature\ntest"
|
||||
dolt sql-client --use-db repo2 -P $PORT -u dolt -q 'use `repo2/new_feature2`'
|
||||
run dolt sql-client --use-db repo2 -P $PORT -u dolt -q 'select * from `repo2/new_feature2`.test'
|
||||
dolt sql-client --use-db "repo2/b1" -u dolt -P $PORT -q "show tables" "Tables_in_repo2/b1\ntest"
|
||||
dolt sql-client --use-db repo2 -P $PORT -u dolt -q 'use `repo2/b2`'
|
||||
run dolt sql-client --use-db repo2 -P $PORT -u dolt -q 'select * from `repo2/b2`.test'
|
||||
[ $status -eq 0 ]
|
||||
[[ "$output" =~ "pk" ]] || false
|
||||
[[ "$output" =~ " 0 " ]] || false
|
||||
[[ "$output" =~ " 1 " ]] || false
|
||||
[[ "$output" =~ " 2 " ]] || false
|
||||
|
||||
# Remote branch we have never USEd before
|
||||
run dolt sql-client --use-db repo2 -P $PORT -u dolt -q 'select * from `repo2/b3`.test'
|
||||
[ $status -eq 0 ]
|
||||
[[ "$output" =~ "pk" ]] || false
|
||||
[[ "$output" =~ " 0 " ]] || false
|
||||
@@ -229,21 +240,13 @@ teardown() {
|
||||
[[ "$output" =~ " 2 " ]] || false
|
||||
|
||||
# Connecting to heads that don't exist should error out
|
||||
run dolt sql-client --use-db "repo2/notexist" -u dolt -P $PORT -q 'use `repo2/new_feature2`'
|
||||
run dolt sql-client --use-db "repo2/notexist" -u dolt -P $PORT -q 'use `repo2/b2`'
|
||||
[ $status -ne 0 ]
|
||||
[[ $output =~ "database not found" ]] || false
|
||||
|
||||
run dolt sql-client --use-db repo2 -P $PORT -u dolt -q 'use `repo2/notexist`'
|
||||
[ $status -ne 0 ]
|
||||
[[ $output =~ "database not found" ]] || false
|
||||
|
||||
# Creating a branch locally that doesn't exist on the remote
|
||||
# works, but connecting to it is an error (nothing to pull)
|
||||
dolt sql-client --use-db "repo2/new_feature" -u dolt -P $PORT -q "call dolt_checkout('-b', 'new_branch')"
|
||||
|
||||
run dolt sql-client --use-db "repo2/new_branch" -u dolt -P $PORT -q "show tables"
|
||||
[ $status -ne 0 ]
|
||||
[[ $output =~ "database not found" ]] || false
|
||||
}
|
||||
|
||||
@test "remotes-sql-server: pull all heads" {
|
||||
@@ -398,6 +401,9 @@ teardown() {
|
||||
run dolt branch
|
||||
[[ ! "$output" =~ "feature" ]] || false
|
||||
|
||||
dolt config --local --add sqlserver.global.dolt_replicate_all_heads 1
|
||||
dolt config --local --add sqlserver.global.dolt_read_replica_remote remote1
|
||||
|
||||
start_sql_server repo2
|
||||
|
||||
# No data on main
|
||||
@@ -410,11 +416,6 @@ teardown() {
|
||||
[[ "$output" =~ "feature" ]] || false
|
||||
[[ ! "$output" =~ "main" ]] || false
|
||||
|
||||
# connecting to remote branch that does not exist creates new local branch and sets upstream
|
||||
run dolt sql-client --use-db repo2/feature -P $PORT -u dolt -q "call dolt_commit('--allow-empty', '-m', 'empty'); call dolt_push()"
|
||||
[ $status -eq 0 ]
|
||||
[[ ! "$output" =~ "the current branch has no upstream branch" ]] || false
|
||||
|
||||
run dolt sql-client --use-db repo2/feature -P $PORT -u dolt -q "show tables"
|
||||
[ $status -eq 0 ]
|
||||
[[ "$output" =~ "Tables_in_repo2/feature" ]] || false
|
||||
@@ -422,12 +423,43 @@ teardown() {
|
||||
|
||||
run dolt branch
|
||||
[[ "$output" =~ "feature" ]] || false
|
||||
}
|
||||
|
||||
@test "remotes-sql-server: connect to remote branch pushed after server starts" {
|
||||
skiponwindows "Missing dependencies"
|
||||
|
||||
cd repo1
|
||||
dolt checkout -b feature
|
||||
dolt commit -am "first commit"
|
||||
dolt push remote1 feature
|
||||
dolt checkout main
|
||||
dolt push remote1 main
|
||||
|
||||
cd ../repo2
|
||||
dolt fetch
|
||||
run dolt branch
|
||||
[[ ! "$output" =~ "feature" ]] || false
|
||||
|
||||
dolt config --local --add sqlserver.global.dolt_replicate_all_heads 1
|
||||
dolt config --local --add sqlserver.global.dolt_read_replica_remote remote1
|
||||
|
||||
start_sql_server repo2
|
||||
|
||||
cd ../repo1
|
||||
dolt checkout feature
|
||||
dolt pull remote1 feature
|
||||
run dolt log -n 1 --oneline
|
||||
[[ "$output" =~ "empty" ]] || false
|
||||
dolt branch newbranch
|
||||
dolt push remote1 newbranch
|
||||
|
||||
run dolt sql-client --use-db repo2/feature -P $PORT -u dolt -q "select active_branch()"
|
||||
[ $status -eq 0 ]
|
||||
[[ "$output" =~ "feature" ]] || false
|
||||
|
||||
run dolt sql-client --use-db repo2/newbranch -P $PORT -u dolt -q "select active_branch()"
|
||||
[ $status -eq 0 ]
|
||||
[[ "$output" =~ "newbranch" ]] || false
|
||||
|
||||
run dolt branch
|
||||
[[ "$output" =~ "feature" ]] || false
|
||||
[[ "$output" =~ "newbranch" ]] || false
|
||||
}
|
||||
|
||||
@test "remotes-sql-server: connect to remote tracking branch fails if there are multiple remotes" {
|
||||
|
||||
@@ -493,6 +493,54 @@ SQL
|
||||
[[ "$output" =~ "v1" ]] || false
|
||||
}
|
||||
|
||||
@test "replication: pull creates remote tracking branches" {
|
||||
dolt clone file://./rem1 repo2
|
||||
cd repo2
|
||||
dolt sql -q "create table t1 (a int primary key);"
|
||||
dolt commit -Am "new table"
|
||||
dolt branch b1
|
||||
dolt branch b2
|
||||
dolt push origin b1
|
||||
dolt push origin b2
|
||||
|
||||
cd ../repo1
|
||||
dolt config --local --add sqlserver.global.dolt_replicate_all_heads 1
|
||||
dolt config --local --add sqlserver.global.dolt_read_replica_remote remote1
|
||||
|
||||
run dolt sql -q 'USE `repo1/b2`; show tables;' -r csv
|
||||
[ "$status" -eq 0 ]
|
||||
[ "${#lines[@]}" -eq 3 ]
|
||||
[[ "$output" =~ "t1" ]] || false
|
||||
|
||||
run dolt branch -a
|
||||
[ "$status" -eq 0 ]
|
||||
[ "${#lines[@]}" -eq 6 ]
|
||||
[[ "$output" =~ "remotes/remote1/b1" ]] || false
|
||||
[[ "$output" =~ "remotes/remote1/b2" ]] || false
|
||||
}
|
||||
|
||||
@test "replication: connect to a branch not on the remote" {
|
||||
dolt clone file://./rem1 repo2
|
||||
cd repo2
|
||||
dolt sql -q "create table t1 (a int primary key);"
|
||||
dolt commit -Am "new table"
|
||||
dolt branch b1
|
||||
dolt push origin b1
|
||||
|
||||
cd ../repo1
|
||||
dolt config --local --add sqlserver.global.dolt_replicate_all_heads 1
|
||||
dolt config --local --add sqlserver.global.dolt_read_replica_remote remote1
|
||||
|
||||
run dolt sql -q 'USE `repo1/B1`; show tables;' -r csv
|
||||
[ "$status" -eq 0 ]
|
||||
[ "${#lines[@]}" -eq 3 ]
|
||||
[[ "$output" =~ "t1" ]] || false
|
||||
|
||||
run dolt sql -q 'USE `repo1/notfound`;' -r csv
|
||||
[ "$status" -ne 0 ]
|
||||
[[ "$output" =~ "database not found" ]] || false
|
||||
}
|
||||
|
||||
@test "replication: push feature head" {
|
||||
cd repo1
|
||||
dolt config --local --add sqlserver.global.dolt_replicate_to_remote remote1
|
||||
@@ -675,6 +723,8 @@ SQL
|
||||
cd repo1
|
||||
dolt config --local --add sqlserver.global.dolt_replicate_to_remote remote1
|
||||
dolt config --local --add sqlserver.global.dolt_async_replication 1
|
||||
dolt config --local --add sqlserver.global.dolt_replicate_all_heads 1
|
||||
|
||||
dolt sql -q "create table t1 (a int primary key)"
|
||||
dolt sql -q "call dolt_add('.')"
|
||||
dolt sql -q "call dolt_commit('-am', 'cm')"
|
||||
|
||||
Reference in New Issue
Block a user