Merge pull request #4379 from dolthub/aaron/doltdb-commit_hooks-push-logic-cleanup

[no-release-notes] go/libraries/doltcore/doltdb: Cleanup commit_hooks pull chunks duplication.
This commit is contained in:
Aaron Son
2022-09-21 16:37:32 -07:00
committed by GitHub
3 changed files with 21 additions and 72 deletions
+12 -69
View File
@@ -25,7 +25,6 @@ import (
"github.com/dolthub/dolt/go/libraries/doltcore/ref"
"github.com/dolthub/dolt/go/store/datas"
"github.com/dolthub/dolt/go/store/datas/pull"
"github.com/dolthub/dolt/go/store/hash"
"github.com/dolthub/dolt/go/store/types"
)
@@ -51,19 +50,18 @@ func NewPushOnWriteHook(destDB *DoltDB, tmpDir string) *PushOnWriteHook {
// Execute implements CommitHook, replicates head updates to the destDb field
func (ph *PushOnWriteHook) Execute(ctx context.Context, ds datas.Dataset, db datas.Database) error {
// TODO: this code and pushDataset are largely duplicated from doltDb.PullChunks.
// Clean it up, and preferably make more db stores capable of using the puller interface
if datas.CanUsePuller(db) && datas.CanUsePuller(ph.destDB) {
return pushDatasetWithPuller(ctx, ph.destDB, db, ph.tmpDir, ds)
}
return ph.pushDataset(ctx, ds, db)
return pushDataset(ctx, ph.destDB, db, ds, ph.tmpDir)
}
func (ph *PushOnWriteHook) pushDataset(ctx context.Context, ds datas.Dataset, db datas.Database) error {
func pushDataset(ctx context.Context, destDB, srcDB datas.Database, ds datas.Dataset, tmpDir string) error {
addr, ok := ds.MaybeHeadAddr()
if !ok {
_, err := ph.destDB.Delete(ctx, ds)
_, err := destDB.Delete(ctx, ds)
return err
}
err := pullHash(ctx, destDB, srcDB, addr, tmpDir, nil, nil)
if err != nil {
return err
}
@@ -72,21 +70,12 @@ func (ph *PushOnWriteHook) pushDataset(ctx context.Context, ds datas.Dataset, db
return err
}
srcCS := datas.ChunkStoreFromDatabase(db)
destCS := datas.ChunkStoreFromDatabase(ph.destDB)
waf := types.WalkAddrsForNBF(ph.fmt)
err = pull.Pull(ctx, srcCS, destCS, waf, addr, nil)
ds, err = destDB.GetDataset(ctx, rf.String())
if err != nil {
return err
}
ds, err = ph.destDB.GetDataset(ctx, rf.String())
if err != nil {
return err
}
_, err = ph.destDB.SetHead(ctx, ds, addr)
_, err = destDB.SetHead(ctx, ds, addr)
return err
}
@@ -107,46 +96,6 @@ func (ph *PushOnWriteHook) SetLogger(ctx context.Context, wr io.Writer) error {
return nil
}
// replicate pushes a dataset from srcDB to destDB and force sets the destDB ref to the new dataset value
func pushDatasetWithPuller(ctx context.Context, destDB, srcDB datas.Database, tempTableDir string, ds datas.Dataset) error {
addr, ok := ds.MaybeHeadAddr()
if !ok {
_, err := destDB.Delete(ctx, ds)
return err
}
rf, err := ref.Parse(ds.ID())
if err != nil {
return err
}
srcCS := datas.ChunkStoreFromDatabase(srcDB)
destCS := datas.ChunkStoreFromDatabase(destDB)
waf, err := types.WalkAddrsForChunkStore(srcCS)
if err != nil {
return err
}
puller, err := pull.NewPuller(ctx, tempTableDir, defaultChunksPerTF, srcCS, destCS, waf, addr, nil)
if err != nil && err != pull.ErrDBUpToDate {
return err
}
if err != pull.ErrDBUpToDate {
err = puller.Pull(ctx)
if err != nil {
return err
}
}
ds, err = destDB.GetDataset(ctx, rf.String())
if err != nil {
return err
}
_, err = destDB.SetHead(ctx, ds, addr)
return err
}
type PushArg struct {
ds datas.Dataset
db datas.Database
@@ -292,21 +241,15 @@ func RunAsyncReplicationThreads(bThreads *sql.BackgroundThreads, ch chan PushArg
return newHeadsCopy
}
isNewHeads := func(newHeads map[string]PushArg) bool {
defer mu.Unlock()
mu.Lock()
return len(newHeads) != 0
}
flush := func(newHeads map[string]PushArg, latestHeads map[string]hash.Hash) {
newHeadsCopy := getHeadsCopy()
if !isNewHeads(newHeadsCopy) {
if len(newHeadsCopy) == 0 {
return
}
for id, newCm := range newHeadsCopy {
if latest, ok := latestHeads[id]; !ok || latest != newCm.hash {
// use background context to drain after sql context is canceled
err := pushDatasetWithPuller(context.Background(), destDB.db, newCm.db, tmpDir, newCm.ds)
err := pushDataset(context.Background(), destDB.db, newCm.db, newCm.ds, tmpDir)
if err != nil {
logger.Write([]byte("replication failed: " + err.Error()))
}
+7 -3
View File
@@ -1238,11 +1238,15 @@ func (ddb *DoltDB) pruneUnreferencedDatasets(ctx context.Context) error {
// given, pulling all chunks reachable from the given targetHash. Pull progress
// is communicated over the provided channel.
func (ddb *DoltDB) PullChunks(ctx context.Context, tempDir string, srcDB *DoltDB, targetHash hash.Hash, progChan chan pull.PullProgress, statsCh chan pull.Stats) error {
srcCS := datas.ChunkStoreFromDatabase(srcDB.db)
destCS := datas.ChunkStoreFromDatabase(ddb.db)
return pullHash(ctx, ddb.db, srcDB.db, targetHash, tempDir, progChan, statsCh)
}
func pullHash(ctx context.Context, destDB, srcDB datas.Database, targetHash hash.Hash, tempDir string, progChan chan pull.PullProgress, statsCh chan pull.Stats) error {
srcCS := datas.ChunkStoreFromDatabase(srcDB)
destCS := datas.ChunkStoreFromDatabase(destDB)
waf := types.WalkAddrsForNBF(srcDB.Format())
if datas.CanUsePuller(srcDB.db) && datas.CanUsePuller(ddb.db) {
if datas.CanUsePuller(srcDB) && datas.CanUsePuller(destDB) {
puller, err := pull.NewPuller(ctx, tempDir, defaultChunksPerTF, srcCS, destCS, waf, targetHash, statsCh)
if err == pull.ErrDBUpToDate {
return nil
+2
View File
@@ -136,6 +136,8 @@ type Database interface {
// if this operation is not supported.
StatsSummary() string
Format() *types.NomsBinFormat
// chunkStore returns the ChunkStore used to read and write
// groups of values to the database efficiently. This interface is a low-
// level detail of the database that should infrequently be needed by