Bug fix for pushing: need to SetHead on the remote DB

This commit is contained in:
Zach Musgrave
2022-09-06 20:12:11 -07:00
parent 9bfc968e04
commit ee6580f76e

View File

@@ -53,19 +53,45 @@ func (ph *PushOnWriteHook) Execute(ctx context.Context, ds datas.Dataset, db dat
// TODO: this code and pushDataset are largely duplicated from doltDb.PullChunks.
// Clean it up, and preferably make more db stores capable of using the puller interface
if datas.CanUsePuller(db) && datas.CanUsePuller(ph.destDB) {
return pushDataset(ctx, ph.destDB, db, ph.tmpDir, ds)
return pushDatasetWithPuller(ctx, ph.destDB, db, ph.tmpDir, ds)
}
return ph.pushDataset(ctx, ds, db)
}
func (ph *PushOnWriteHook) pushDataset(ctx context.Context, ds datas.Dataset, db datas.Database) error {
targetHash, ok := ds.MaybeHeadAddr()
if !ok {
return fmt.Errorf("dataset empty")
}
addr, ok := ds.MaybeHeadAddr()
if !ok {
_, err := ph.destDB.Delete(ctx, ds)
return err
}
rf, err := ref.Parse(ds.ID())
if err != nil {
return err
}
srcCS := datas.ChunkStoreFromDatabase(db)
destCS := datas.ChunkStoreFromDatabase(ph.destDB)
waf := types.WalkAddrsForNBF(ph.fmt)
targetHash, ok := ds.MaybeHeadAddr()
if !ok {
return fmt.Errorf("dataset empty")
err = pull.Pull(ctx, srcCS, destCS, waf, targetHash, nil)
if err != nil {
return err
}
return pull.Pull(ctx, srcCS, destCS, waf, targetHash, nil)
ds, err = ph.destDB.GetDataset(ctx, rf.String())
if err != nil {
return err
}
_, err = ph.destDB.SetHead(ctx, ds, addr)
return err
}
// HandleError implements CommitHook
@@ -86,7 +112,7 @@ func (ph *PushOnWriteHook) SetLogger(ctx context.Context, wr io.Writer) error {
}
// replicate pushes a dataset from srcDB to destDB and force sets the destDB ref to the new dataset value
func pushDataset(ctx context.Context, destDB, srcDB datas.Database, tempTableDir string, ds datas.Dataset) error {
func pushDatasetWithPuller(ctx context.Context, destDB, srcDB datas.Database, tempTableDir string, ds datas.Dataset) error {
addr, ok := ds.MaybeHeadAddr()
if !ok {
_, err := destDB.Delete(ctx, ds)
@@ -284,7 +310,7 @@ func RunAsyncReplicationThreads(bThreads *sql.BackgroundThreads, ch chan PushArg
for id, newCm := range newHeadsCopy {
if latest, ok := latestHeads[id]; !ok || latest != newCm.hash {
// use background context to drain after sql context is canceled
err := pushDataset(context.Background(), destDB.db, newCm.db, tmpDir, newCm.ds)
err := pushDatasetWithPuller(context.Background(), destDB.db, newCm.db, tmpDir, newCm.ds)
if err != nil {
logger.Write([]byte("replication failed: " + err.Error()))
}