Merge pull request #5386 from dolthub/aaron/nbs-aws-conjoin-fix-short-read-err

go/store/nbs: aws_table_persister.go: Fix conjoin code to not error on a single short Read() by using io.ReadFull.
This commit is contained in:
Aaron Son
2023-02-17 18:50:04 -08:00
committed by GitHub
2 changed files with 17 additions and 12 deletions

View File

@@ -25,6 +25,7 @@ import (
"bytes"
"context"
"errors"
"fmt"
"io"
"net/url"
"sort"
@@ -392,9 +393,9 @@ func (s3p awsTablePersister) assembleTable(ctx context.Context, plan compactionP
readWg.Add(1)
go func(m manualPart) {
defer readWg.Done()
n, _ := m.srcR.Read(buff[m.dstStart:m.dstEnd])
if int64(n) < m.dstEnd-m.dstStart {
ae.SetIfError(errors.New("failed to read all the table data"))
err := m.run(ctx, buff)
if err != nil {
ae.SetIfError(fmt.Errorf("failed to read conjoin table data: %w", err))
}
}(man)
}
@@ -507,8 +508,17 @@ type copyPart struct {
}
type manualPart struct {
srcR io.Reader
dstStart, dstEnd int64
src chunkSource
start, end int64
}
func (mp manualPart) run(ctx context.Context, buff []byte) error {
reader, _, err := mp.src.reader(ctx)
if err != nil {
return err
}
_, err = io.ReadFull(reader, buff[mp.start:mp.end])
return err
}
// dividePlan assumes that plan.sources (which is of type chunkSourcesByDescendingDataSize) is correctly sorted by descending data size.
@@ -545,12 +555,7 @@ func dividePlan(ctx context.Context, plan compactionPlan, minPartSize, maxPartSi
var offset int64
for ; i < len(plan.sources.sws); i++ {
sws := plan.sources.sws[i]
rdr, _, err := sws.source.reader(ctx)
if err != nil {
return nil, nil, nil, err
}
manuals = append(manuals, manualPart{rdr, offset, offset + int64(sws.dataLen)})
manuals = append(manuals, manualPart{sws.source, offset, offset + int64(sws.dataLen)})
offset += int64(sws.dataLen)
buffSize += sws.dataLen
}

View File

@@ -304,7 +304,7 @@ func TestAWSTablePersisterDividePlan(t *testing.T) {
assert.Len(manuals, 1)
ti, err = tooSmall.index()
require.NoError(t, err)
assert.EqualValues(calcChunkRangeSize(ti), manuals[0].dstEnd-manuals[0].dstStart)
assert.EqualValues(calcChunkRangeSize(ti), manuals[0].end-manuals[0].start)
}
func TestAWSTablePersisterCalcPartSizes(t *testing.T) {