proper nested waiting for conflict streaming map

This commit is contained in:
Andy Arthur
2021-10-12 14:45:56 -07:00
parent feca3225c4
commit 42c6164aa4
2 changed files with 10 additions and 5 deletions

View File

@@ -444,14 +444,19 @@ func mergeTableData(ctx context.Context, vrw types.ValueReadWriter, tblName stri
return nil
})
var conflicts types.Map
eg.Go(func() error {
var err error
// |sm|'s errgroup is a child of |eg|
// so we must wait here, before |eg| finishes
conflicts, err = sm.Wait()
return err
})
if err := eg.Wait(); err != nil {
return nil, types.EmptyMap, nil, err
}
conflicts, err := sm.Wait()
if err != nil {
return nil, types.EmptyMap, nil, err
}
newRoot, err := sess.Flush(ctx)
if err != nil {
return nil, types.EmptyMap, nil, err

View File

@@ -110,7 +110,7 @@ func NewMap(ctx context.Context, vrw ValueReadWriter, kv ...Value) (Map, error)
func NewStreamingMap(ctx context.Context, vrw ValueReadWriter, kvs <-chan Value) *StreamingMap {
d.PanicIfTrue(vrw == nil)
sm := &StreamingMap{}
sm.eg, sm.egCtx = errgroup.WithContext(context.TODO())
sm.eg, sm.egCtx = errgroup.WithContext(ctx)
sm.eg.Go(func() error {
m, err := readMapInput(sm.egCtx, vrw, kvs)
sm.m = m