mirror of
https://github.com/dolthub/dolt.git
synced 2026-02-04 10:25:17 -06:00
/go/libraries/doltcore/sqle/dsess/globalstate.go: parallelize global state also
This commit is contained in:
@@ -22,6 +22,7 @@ import (
|
||||
|
||||
"github.com/dolthub/go-mysql-server/sql"
|
||||
"github.com/fatih/color"
|
||||
"golang.org/x/sync/errgroup"
|
||||
|
||||
"github.com/dolthub/dolt/go/libraries/doltcore/doltdb"
|
||||
"github.com/dolthub/dolt/go/libraries/doltcore/ref"
|
||||
@@ -46,43 +47,75 @@ func NewGlobalStateStoreForDb(ctx context.Context, dbName string, db *doltdb.Dol
|
||||
rootRefs = append(rootRefs, branches...)
|
||||
rootRefs = append(rootRefs, remotes...)
|
||||
|
||||
var roots []doltdb.Rootish
|
||||
for _, b := range rootRefs {
|
||||
switch b.GetType() {
|
||||
case ref.BranchRefType:
|
||||
wsRef, err := ref.WorkingSetRefForHead(b)
|
||||
if err != nil {
|
||||
fmt.Fprintf(color.Output, "DUSTIN: NewGlobalStateStoreForDb: working set error: elapsed: %v\n", time.Since(start))
|
||||
return GlobalStateImpl{}, err
|
||||
}
|
||||
rootRefsChan := make(chan doltdb.Rootish, len(rootRefs))
|
||||
eg, egCtx := errgroup.WithContext(ctx)
|
||||
wg := sync.WaitGroup{}
|
||||
|
||||
ws, err := db.ResolveWorkingSet(ctx, wsRef)
|
||||
if err == doltdb.ErrWorkingSetNotFound {
|
||||
// use the branch head if there isn't a working set for it
|
||||
cm, err := db.ResolveCommitRef(ctx, b)
|
||||
eg.Go(func() error {
|
||||
defer close(rootRefsChan)
|
||||
wg.Wait()
|
||||
return nil
|
||||
})
|
||||
|
||||
for _, b := range rootRefs {
|
||||
wg.Add(1)
|
||||
eg.Go(func() error {
|
||||
defer wg.Done()
|
||||
switch b.GetType() {
|
||||
case ref.BranchRefType:
|
||||
wsRef, err := ref.WorkingSetRefForHead(b)
|
||||
if err != nil {
|
||||
fmt.Fprintf(color.Output, "DUSTIN: NewGlobalStateStoreForDb: resolve commit error: elapsed: %v\n", time.Since(start))
|
||||
return GlobalStateImpl{}, err
|
||||
fmt.Fprintf(color.Output, "DUSTIN: NewGlobalStateStoreForDb: working set error: elapsed: %v\n", time.Since(start))
|
||||
return err
|
||||
}
|
||||
roots = append(roots, cm)
|
||||
} else if err != nil {
|
||||
fmt.Fprintf(color.Output, "DUSTIN: NewGlobalStateStoreForDb: resolve working set error: elapsed: %v\n", time.Since(start))
|
||||
return GlobalStateImpl{}, err
|
||||
} else {
|
||||
roots = append(roots, ws)
|
||||
|
||||
ws, err := db.ResolveWorkingSet(egCtx, wsRef)
|
||||
if err == doltdb.ErrWorkingSetNotFound {
|
||||
// use the branch head if there isn't a working set for it
|
||||
cm, err := db.ResolveCommitRef(egCtx, b)
|
||||
if err != nil {
|
||||
fmt.Fprintf(color.Output, "DUSTIN: NewGlobalStateStoreForDb: resolve commit error: elapsed: %v\n", time.Since(start))
|
||||
return err
|
||||
}
|
||||
rootRefsChan <- cm
|
||||
} else if err != nil {
|
||||
fmt.Fprintf(color.Output, "DUSTIN: NewGlobalStateStoreForDb: resolve working set error: elapsed: %v\n", time.Since(start))
|
||||
return err
|
||||
} else {
|
||||
rootRefsChan <- ws
|
||||
}
|
||||
case ref.RemoteRefType:
|
||||
cm, err := db.ResolveCommitRef(egCtx, b)
|
||||
if err != nil {
|
||||
fmt.Fprintf(color.Output, "DUSTIN: NewGlobalStateStoreForDb: resolve commit remote ref error: elapsed: %v\n", time.Since(start))
|
||||
return err
|
||||
}
|
||||
rootRefsChan <- cm
|
||||
}
|
||||
case ref.RemoteRefType:
|
||||
cm, err := db.ResolveCommitRef(ctx, b)
|
||||
if err != nil {
|
||||
fmt.Fprintf(color.Output, "DUSTIN: NewGlobalStateStoreForDb: resolve commit remote ref error: elapsed: %v\n", time.Since(start))
|
||||
return GlobalStateImpl{}, err
|
||||
}
|
||||
roots = append(roots, cm)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
err = eg.Wait()
|
||||
if err != nil {
|
||||
return GlobalStateImpl{}, err
|
||||
}
|
||||
|
||||
if len(rootRefsChan) != len(rootRefs) {
|
||||
fmt.Fprintf(color.Output, "DUSTIN: NewGlobalStateStoreForDb: rootRefsChan does not equal rootRefs\n")
|
||||
}
|
||||
|
||||
fmt.Fprintf(color.Output, "DUSTIN: NewGlobalStateStoreForDb: success: elapsed: %v\n", time.Since(start))
|
||||
|
||||
var roots []doltdb.Rootish
|
||||
for rootRef := range rootRefsChan {
|
||||
roots = append(roots, rootRef)
|
||||
}
|
||||
|
||||
if len(roots) != len(rootRefs) {
|
||||
fmt.Fprintf(color.Output, "DUSTIN: NewGlobalStateStoreForDb: roots does not equal rootRefs\n")
|
||||
}
|
||||
|
||||
tracker, err := NewAutoIncrementTracker(ctx, dbName, roots...)
|
||||
if err != nil {
|
||||
return GlobalStateImpl{}, err
|
||||
|
||||
Reference in New Issue
Block a user