go/libraries/utils/pipeline/stage.go: Fix how we finalize a stage so that every consumer can still see nil in the success case.

This commit is contained in:
Aaron Son
2022-05-18 14:31:09 -07:00
parent ec7af0c507
commit edf0485148
+4 -8
View File
@@ -89,14 +89,7 @@ func (s *Stage) start(eg *errgroup.Group, ctx context.Context) {
defer func() {
if atomic.AddInt32(&stageWorkers, -1) == 0 {
if s.outCh != nil {
// To finalize our channel in the non-error case,
// we send a `nil` sentinel indicating we are done
// and then close the channel.
if rerr == nil {
select {
case <-ctx.Done():
case s.outCh <- nil:
}
close(s.outCh)
} else {
// In the error case, we do not want to close the
@@ -159,13 +152,16 @@ func (s *Stage) runPipelineStage(ctx context.Context) error {
case <-ctx.Done():
return ctx.Err()
case inBatch, ok := <-s.inCh:
if !ok {
if ctx.Err() != nil {
return ctx.Err()
}
err := s.transformBatch(ctx, inBatch)
if err != nil {
return err
}
if !ok {
return nil
}
}
}
}