Bug fix for commit-locked databases with replication

This commit is contained in:
Zach Musgrave
2022-03-21 14:29:25 -07:00
parent 0fe92461e5
commit 3d1fd2b236

View File

@@ -244,23 +244,31 @@ func (p DoltDatabaseProvider) databaseForRevision(ctx context.Context, revDB str
return nil, dsess.InitialDbState{}, false, nil
}
if replicaDb, ok := srcDb.(ReadReplicaDatabase); ok {
// TODO move this out of analysis phase, should only happen at read time
err := switchAndFetchReplicaHead(ctx, revSpec, replicaDb)
if err != nil {
return nil, dsess.InitialDbState{}, false, err
}
}
if isBranch(ctx, srcDb.DbData().Ddb, revSpec) {
db, init, err := dbRevisionForBranch(ctx, srcDb, revSpec)
if err != nil {
return nil, dsess.InitialDbState{}, false, err
}
// fetch the replica head if this is a replicated db
if replicaDb, ok := srcDb.(ReadReplicaDatabase); ok {
// TODO move this out of analysis phase, should only happen at read time
err := switchAndFetchReplicaHead(ctx, revSpec, replicaDb)
if err != nil {
return nil, dsess.InitialDbState{}, false, err
}
}
return db, init, true, nil
}
if doltdb.IsValidCommitHash(revSpec) {
// TODO: this should be an interface, not a struct
replicaDb, ok := srcDb.(ReadReplicaDatabase)
if ok {
srcDb = replicaDb.Database
}
srcDb, ok = srcDb.(Database)
if !ok {
return nil, dsess.InitialDbState{}, false, nil
@@ -297,16 +305,11 @@ func (p DoltDatabaseProvider) Function(ctx *sql.Context, name string) (sql.Funct
// switchAndFetchReplicaHead tries to pull the latest version of a branch. Will fail if the branch
// does not exist on the ReadReplicaDatabase's remote. If the target branch is not a replication
// head, the new branch will not be continuously fetched.
func switchAndFetchReplicaHead(ctx context.Context, branch string, db sql.Database) error {
destDb, ok := db.(ReadReplicaDatabase)
if !ok {
return ErrFailedToCastToReplicaDb
}
func switchAndFetchReplicaHead(ctx context.Context, branch string, db ReadReplicaDatabase) error {
branchRef := ref.NewBranchRef(branch)
var branchExists bool
branches, err := destDb.ddb.GetBranches(ctx)
branches, err := db.ddb.GetBranches(ctx)
if err != nil {
return err
}
@@ -319,14 +322,14 @@ func switchAndFetchReplicaHead(ctx context.Context, branch string, db sql.Databa
}
// check whether branch is on remote before creating local tracking branch
cm, err := actions.FetchRemoteBranch(ctx, destDb.tmpDir, destDb.remote, destDb.srcDB, destDb.DbData().Ddb, branchRef, nil, actions.NoopRunProgFuncs, actions.NoopStopProgFuncs)
cm, err := actions.FetchRemoteBranch(ctx, db.tmpDir, db.remote, db.srcDB, db.DbData().Ddb, branchRef, nil, actions.NoopRunProgFuncs, actions.NoopStopProgFuncs)
if err != nil {
return err
}
// create refs/heads/branch dataset
if !branchExists {
err = destDb.ddb.NewBranchAtCommit(ctx, branchRef, cm)
err = db.ddb.NewBranchAtCommit(ctx, branchRef, cm)
if err != nil {
return err
}
@@ -334,13 +337,13 @@ func switchAndFetchReplicaHead(ctx context.Context, branch string, db sql.Databa
// update ReadReplicaRemote with new HEAD
// dolt_replicate_heads configuration remains unchanged
destDb, err = destDb.SetHeadRef(branchRef)
db, err = db.SetHeadRef(branchRef)
if err != nil {
return err
}
// create workingSets/heads/branch and update the working set
err = pullBranches(ctx, destDb, []string{branch})
err = pullBranches(ctx, db, []string{branch})
if err != nil {
return err
}