Fixed race when multiple clients attempt to connect to the same branch the first time a replica fetches it

This commit is contained in:
Zach Musgrave
2023-02-07 15:29:34 -08:00
parent 0ae04783b7
commit bbc1824a40
2 changed files with 67 additions and 59 deletions

View File

@@ -152,9 +152,8 @@ func (p DoltDatabaseProvider) FileSystem() filesys.Filesys {
return p.fs
}
// If this DatabaseProvider is set to standby |true|, it returns every dolt
// database as a read only database. Set back to |false| to get read-write
// behavior from dolt databases again.
// SetIsStandby sets whether this provider is set to standby |true|. Standbys return every dolt database as a read only
// database. Set back to |false| to get read-write behavior from dolt databases again.
func (p DoltDatabaseProvider) SetIsStandby(standby bool) {
p.mu.Lock()
defer p.mu.Unlock()
@@ -194,6 +193,7 @@ func (p DoltDatabaseProvider) Database(ctx *sql.Context, name string) (db sql.Da
if !ok {
db, err = p.databaseForClone(ctx, name)
if err != nil {
return nil, err
}
@@ -733,8 +733,9 @@ func (p DoltDatabaseProvider) databaseForRevision(ctx *sql.Context, revDB string
if isBranch {
// fetch the upstream head if this is a replicated db
if replicaDb, ok := srcDb.(ReadReplicaDatabase); ok {
// TODO move this out of analysis phase, should only happen at read time
err := switchAndFetchReplicaHead(ctx, resolvedRevSpec, replicaDb)
// TODO move this out of analysis phase, should only happen at read time, when the transaction begins (like is
// the case with a branch that already exists locally)
err := p.ensureReplicaHeadExists(ctx, resolvedRevSpec, replicaDb)
if err != nil {
return nil, dsess.InitialDbState{}, false, err
}
@@ -759,6 +760,7 @@ func (p DoltDatabaseProvider) databaseForRevision(ctx *sql.Context, revDB string
if isTag {
// TODO: this should be an interface, not a struct
replicaDb, ok := srcDb.(ReadReplicaDatabase)
if ok {
srcDb = replicaDb.Database
}
@@ -981,64 +983,18 @@ func (p DoltDatabaseProvider) IsRevisionDatabase(ctx *sql.Context, dbName string
return revision != "", nil
}
// switchAndFetchReplicaHead tries to pull the latest version of a branch. Will fail if the branch
// does not exist on the ReadReplicaDatabase's remote. If the target branch is not a replication
// head, the new branch will not be continuously fetched.
func switchAndFetchReplicaHead(ctx *sql.Context, branch string, db ReadReplicaDatabase) error {
branchRef := ref.NewBranchRef(branch)
var branchExists bool
branches, err := db.ddb.GetBranches(ctx)
// ensureReplicaHeadExists tries to pull the latest version of a remote branch. Will fail if the branch
// does not exist on the ReadReplicaDatabase's remote.
func (p DoltDatabaseProvider) ensureReplicaHeadExists(ctx *sql.Context, branch string, db ReadReplicaDatabase) error {
_, branchExists, err := db.ddb.HasBranch(ctx, branch)
if err != nil {
return err
}
for _, br := range branches {
if br.String() == branch {
branchExists = true
break
}
}
// check whether branch is on remote before creating local tracking branch
cm, err := actions.FetchRemoteBranch(ctx, db.tmpDir, db.remote, db.srcDB, db.DbData().Ddb, branchRef, actions.NoopRunProgFuncs, actions.NoopStopProgFuncs)
if err != nil {
return err
}
cmHash, err := cm.HashOf()
if err != nil {
return err
}
// create refs/heads/branch dataset
if !branchExists {
err = db.ddb.NewBranchAtCommit(ctx, branchRef, cm)
if err != nil {
return err
}
return db.CreateLocalBranchFromRemote(ctx, ref.NewBranchRef(branch))
}
dSess := dsess.DSessFromSess(ctx.Session)
currentBranchRef, err := dSess.CWBHeadRef(ctx, db.name)
if err != nil {
return err
}
// create workingSets/heads/branch and update the working set
err = db.RebaseSourceDb(ctx)
if err != nil {
return err
}
err = pullBranches(ctx, db, []doltdb.RefWithHash{{
Ref: branchRef,
Hash: cmHash,
}}, nil, currentBranchRef, pullBehavior_fastForward)
if err != nil {
return err
}
return nil
}

View File

@@ -21,6 +21,7 @@ import (
"strings"
"sync"
"github.com/dolthub/dolt/go/libraries/doltcore/env/actions"
"github.com/dolthub/go-mysql-server/sql"
"github.com/dolthub/dolt/go/libraries/doltcore/doltdb"
@@ -215,8 +216,59 @@ func (rrd ReadReplicaDatabase) PullFromRemote(ctx *sql.Context) error {
return nil
}
func (rrd ReadReplicaDatabase) RebaseSourceDb(ctx *sql.Context) error {
return rrd.srcDB.Rebase(ctx)
// CreateLocalBranchFromRemote pulls the given branch from the remote database and creates a local tracking branch for it.
func (rrd ReadReplicaDatabase) CreateLocalBranchFromRemote(ctx *sql.Context, branchRef ref.BranchRef) error {
_, err := rrd.limiter.Run(ctx, "pullNewBranch", func() (any, error) {
// because several clients can queue up waiting to create the same local branch, double check to see if this
// work was already done and bail early if so
_, branchExists, err := rrd.ddb.HasBranch(ctx, branchRef.GetPath())
if err != nil {
return nil, err
}
if branchExists {
return nil, nil
}
cm, err := actions.FetchRemoteBranch(ctx, rrd.tmpDir, rrd.remote, rrd.srcDB, rrd.ddb, branchRef, actions.NoopRunProgFuncs, actions.NoopStopProgFuncs)
if err != nil {
return nil, err
}
cmHash, err := cm.HashOf()
if err != nil {
return nil, err
}
// create refs/heads/branch dataset
err = rrd.ddb.NewBranchAtCommit(ctx, branchRef, cm)
if err != nil {
return nil, err
}
dSess := dsess.DSessFromSess(ctx.Session)
currentBranchRef, err := dSess.CWBHeadRef(ctx, rrd.name)
if err != nil {
return nil, err
}
err = rrd.srcDB.Rebase(ctx)
if err != nil {
return nil, err
}
err = pullBranches(ctx, rrd, []doltdb.RefWithHash{{
Ref: branchRef,
Hash: cmHash,
}}, nil, currentBranchRef, pullBehavior_fastForward)
if err != nil {
return nil, err
}
return nil, err
})
return err
}
type pullBehavior bool