drain after break

This commit is contained in:
Andy Arthur
2021-06-24 16:34:46 -07:00
parent fb858ced8f
commit 8d668547c5

View File

@@ -161,7 +161,6 @@ func NewPuller(ctx context.Context, tempDir string, chunksPerTF int, srcDB, sink
}
wr, err := nbs.NewCmpChunkTableWriter(tempDir)
if err != nil {
return nil, err
}
@@ -386,11 +385,6 @@ func (p *Puller) getCmp(ctx context.Context, twDetails *TreeWalkEventDetails, le
twDetails.ChunksBuffered = 0
for cmpAndRef := range processed {
if err != nil {
// drain to prevent deadlock
continue
}
twDetails.ChunksBuffered++
if twDetails.ChunksBuffered%1000 == 0 {
@@ -400,7 +394,7 @@ func (p *Puller) getCmp(ctx context.Context, twDetails *TreeWalkEventDetails, le
err = p.wr.AddCmpChunk(cmpAndRef.cmpChnk)
if ae.SetIfError(err) {
continue
break
}
if p.wr.Size() >= p.chunksPerTF {
@@ -408,7 +402,7 @@ func (p *Puller) getCmp(ctx context.Context, twDetails *TreeWalkEventDetails, le
p.wr, err = nbs.NewCmpChunkTableWriter(p.tempDir)
if ae.SetIfError(err) {
continue
break
}
}
@@ -425,7 +419,9 @@ func (p *Puller) getCmp(ctx context.Context, twDetails *TreeWalkEventDetails, le
}
}
}
for _ = range processed {
// drain to prevent deadlock
}
if err := ae.Get(); err != nil {
return nil, nil, err
}