mirror of
https://github.com/dolthub/dolt.git
synced 2026-05-04 19:41:26 -05:00
go/libraries/utils/pipeline: Fix racey finalization of a pipeline.
Before this change, when an initial stage function returned a non-nil err, it would return non-nil, resulting in closing its output channel and leaving the errgroup responsible for canceling the egCtx. The next transform stage would read `nil, false` on its inCh. That stage would forward the `nil` down the transform pipeline. If the inCh selects all resolved down the transform pipeline before any of the `<- ctx.Done()` selects, then the pipeline would see the exact same behavior for the error case as it would see for an `io.EOF` being reached in the first stage. Instead, transform stages in the pipeline should abort without forwarding data if they see a closed input channel. In turn, input stages should forward one last `nil` sentinel value to their output channel when the see an `io.EOF` before they close the channel.
This commit is contained in:
@@ -666,14 +666,6 @@ func runMultiStatementMode(ctx *sql.Context, se *engine.SqlEngine, input io.Read
|
||||
if err != nil {
|
||||
return errhand.VerboseErrorFromError(err)
|
||||
}
|
||||
if err != nil {
|
||||
verr := formatQueryError(fmt.Sprintf("error on line %d for query %s", scanner.statementStartLine, query), err)
|
||||
cli.PrintErrln(verr.Verbose())
|
||||
// If continueOnErr is set keep executing the remaining queries but print the error out anyway.
|
||||
if !continueOnErr {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
query = ""
|
||||
|
||||
@@ -115,24 +115,27 @@ func (s *Stage) start(eg *errgroup.Group, ctx context.Context) {
|
||||
// which move through the pipeline.
|
||||
func (s *Stage) runFirstStageInPipeline(ctx context.Context) error {
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
if ctx.Err() != nil {
|
||||
return nil
|
||||
default:
|
||||
}
|
||||
|
||||
iwp, err := s.stageFunc(ctx, nil)
|
||||
|
||||
if err != nil {
|
||||
if err != io.EOF {
|
||||
return err
|
||||
if err == io.EOF {
|
||||
// We send one last `nil` as an end-of-stream sentinel
|
||||
// before we close the channel.
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return nil
|
||||
case s.outCh <- nil:
|
||||
return nil
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
select {
|
||||
case <-ctx.Done(): // prevents potential write deadlock
|
||||
case <-ctx.Done():
|
||||
return nil
|
||||
case s.outCh <- iwp:
|
||||
}
|
||||
@@ -145,14 +148,14 @@ func (s *Stage) runPipelineStage(ctx context.Context) error {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return nil
|
||||
|
||||
case inBatch, ok := <-s.inCh:
|
||||
err := s.transformBatch(ctx, inBatch)
|
||||
|
||||
if !ok && (err == io.EOF || err == nil) {
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
err := s.transformBatch(ctx, inBatch)
|
||||
if err == io.EOF {
|
||||
return nil
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -162,7 +165,6 @@ func (s *Stage) runPipelineStage(ctx context.Context) error {
|
||||
|
||||
func (s *Stage) transformBatch(ctx context.Context, inBatch []ItemWithProps) error {
|
||||
outBatch, err := s.stageFunc(ctx, inBatch)
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user