go/store/nbs: aws_table_persister.go: Fix conjoin code to not error on a single short Read() by using io.ReadFull.

In #5307 we started returning a real io.Reader for chunkSource.reader(ctx).
That means the Read() calls can come back short if they are directly attached
to a net.TCPConn, for example. It is not an error for a Read() to come back
short. Use io.ReadFull to correctly express the intent in the conjoin code for
AWS remotes.

Fixes a bug where doltremoteapi fails commits sometimes.
This commit is contained in:
Aaron Son
2023-02-17 16:38:52 -08:00
parent 6880634515
commit ec1bcab2f9
2 changed files with 17 additions and 12 deletions
+16 -11
View File
@@ -25,6 +25,7 @@ import (
"bytes"
"context"
"errors"
"fmt"
"io"
"net/url"
"sort"
@@ -392,9 +393,9 @@ func (s3p awsTablePersister) assembleTable(ctx context.Context, plan compactionP
readWg.Add(1)
go func(m manualPart) {
defer readWg.Done()
n, _ := m.srcR.Read(buff[m.dstStart:m.dstEnd])
if int64(n) < m.dstEnd-m.dstStart {
ae.SetIfError(errors.New("failed to read all the table data"))
err := m.run(ctx, buff)
if err != nil {
ae.SetIfError(fmt.Errorf("failed to read conjoin table data: %w", err))
}
}(man)
}
@@ -507,8 +508,17 @@ type copyPart struct {
}
type manualPart struct {
srcR io.Reader
dstStart, dstEnd int64
src chunkSource
start, end int64
}
// run reads this part's full byte range from its chunkSource into
// buff[mp.start:mp.end].
//
// io.ReadFull is used deliberately instead of a single Read(): the reader
// returned by chunkSource.reader(ctx) may be backed directly by a network
// connection (see the commit message above), and a short Read() from such a
// reader is not an error. ReadFull keeps reading until the slice is full and
// only then returns nil; it returns io.ErrUnexpectedEOF (or io.EOF for zero
// bytes) if the source ends before end-start bytes arrive.
//
// NOTE(review): the reader obtained here is never closed — confirm that
// chunkSource.reader does not hand back a resource needing explicit cleanup.
func (mp manualPart) run(ctx context.Context, buff []byte) error {
reader, _, err := mp.src.reader(ctx)
if err != nil {
return err
}
_, err = io.ReadFull(reader, buff[mp.start:mp.end])
return err
}
// dividePlan assumes that plan.sources (which is of type chunkSourcesByDescendingDataSize) is correctly sorted by descending data size.
@@ -545,12 +555,7 @@ func dividePlan(ctx context.Context, plan compactionPlan, minPartSize, maxPartSi
var offset int64
for ; i < len(plan.sources.sws); i++ {
sws := plan.sources.sws[i]
rdr, _, err := sws.source.reader(ctx)
if err != nil {
return nil, nil, nil, err
}
manuals = append(manuals, manualPart{rdr, offset, offset + int64(sws.dataLen)})
manuals = append(manuals, manualPart{sws.source, offset, offset + int64(sws.dataLen)})
offset += int64(sws.dataLen)
buffSize += sws.dataLen
}
+1 -1
View File
@@ -304,7 +304,7 @@ func TestAWSTablePersisterDividePlan(t *testing.T) {
assert.Len(manuals, 1)
ti, err = tooSmall.index()
require.NoError(t, err)
assert.EqualValues(calcChunkRangeSize(ti), manuals[0].dstEnd-manuals[0].dstStart)
assert.EqualValues(calcChunkRangeSize(ti), manuals[0].end-manuals[0].start)
}
func TestAWSTablePersisterCalcPartSizes(t *testing.T) {