From 42c6164aa4fffc465320c9427e2ebf38d17b0730 Mon Sep 17 00:00:00 2001 From: Andy Arthur Date: Tue, 12 Oct 2021 14:45:56 -0700 Subject: [PATCH] proper nested waiting for conflict streaming map --- go/libraries/doltcore/merge/merge.go | 13 +++++++++---- go/store/types/map.go | 2 +- 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/go/libraries/doltcore/merge/merge.go b/go/libraries/doltcore/merge/merge.go index d18974c69d..77ffdf0a6f 100644 --- a/go/libraries/doltcore/merge/merge.go +++ b/go/libraries/doltcore/merge/merge.go @@ -444,14 +444,19 @@ func mergeTableData(ctx context.Context, vrw types.ValueReadWriter, tblName stri return nil }) + var conflicts types.Map + eg.Go(func() error { + var err error + // |sm|'s errgroup is a child of |eg| + // so we must wait here, before |eg| finishes + conflicts, err = sm.Wait() + return err + }) + if err := eg.Wait(); err != nil { return nil, types.EmptyMap, nil, err } - conflicts, err := sm.Wait() - if err != nil { - return nil, types.EmptyMap, nil, err - } newRoot, err := sess.Flush(ctx) if err != nil { return nil, types.EmptyMap, nil, err diff --git a/go/store/types/map.go b/go/store/types/map.go index f6b623a6f3..65c2587913 100644 --- a/go/store/types/map.go +++ b/go/store/types/map.go @@ -110,7 +110,7 @@ func NewMap(ctx context.Context, vrw ValueReadWriter, kv ...Value) (Map, error) func NewStreamingMap(ctx context.Context, vrw ValueReadWriter, kvs <-chan Value) *StreamingMap { d.PanicIfTrue(vrw == nil) sm := &StreamingMap{} - sm.eg, sm.egCtx = errgroup.WithContext(context.TODO()) + sm.eg, sm.egCtx = errgroup.WithContext(ctx) sm.eg.Go(func() error { m, err := readMapInput(sm.egCtx, vrw, kvs) sm.m = m