From ec1bcab2f9d1b092ac9aad02bd9ac51df967805b Mon Sep 17 00:00:00 2001 From: Aaron Son Date: Fri, 17 Feb 2023 16:38:52 -0800 Subject: [PATCH] go/store/nbs: aws_table_persister.go: Fix conjoin code to not error on a single short Read() by using io.ReadFull. In #5307 we started returning a real io.Reader for chunkSource.reader(ctx). That means the Read() calls can come back short if they are directly attached to a net.TCPConn, for example. It is not an error for a Read() to come back short. Use io.ReadFull to correctly express the intent in the conjoin code for AWS remotes. Fixes a bug where doltremoteapi fails commits sometimes. --- go/store/nbs/aws_table_persister.go | 27 ++++++++++++++---------- go/store/nbs/aws_table_persister_test.go | 2 +- 2 files changed, 17 insertions(+), 12 deletions(-) diff --git a/go/store/nbs/aws_table_persister.go b/go/store/nbs/aws_table_persister.go index 8f1cafd2d0..f27c85b10e 100644 --- a/go/store/nbs/aws_table_persister.go +++ b/go/store/nbs/aws_table_persister.go @@ -25,6 +25,7 @@ import ( "bytes" "context" "errors" + "fmt" "io" "net/url" "sort" @@ -392,9 +393,9 @@ func (s3p awsTablePersister) assembleTable(ctx context.Context, plan compactionP readWg.Add(1) go func(m manualPart) { defer readWg.Done() - n, _ := m.srcR.Read(buff[m.dstStart:m.dstEnd]) - if int64(n) < m.dstEnd-m.dstStart { - ae.SetIfError(errors.New("failed to read all the table data")) + err := m.run(ctx, buff) + if err != nil { + ae.SetIfError(fmt.Errorf("failed to read conjoin table data: %w", err)) } }(man) } @@ -507,8 +508,17 @@ type copyPart struct { } type manualPart struct { - srcR io.Reader - dstStart, dstEnd int64 + src chunkSource + start, end int64 +} + +func (mp manualPart) run(ctx context.Context, buff []byte) error { + reader, _, err := mp.src.reader(ctx) + if err != nil { + return err + } + _, err = io.ReadFull(reader, buff[mp.start:mp.end]) + return err } // dividePlan assumes that plan.sources (which is of type chunkSourcesByDescendingDataSize) is correctly sorted by descending data size. @@ -545,12 +555,7 @@ func dividePlan(ctx context.Context, plan compactionPlan, minPartSize, maxPartSi var offset int64 for ; i < len(plan.sources.sws); i++ { sws := plan.sources.sws[i] - rdr, _, err := sws.source.reader(ctx) - if err != nil { - return nil, nil, nil, err - } - - manuals = append(manuals, manualPart{rdr, offset, offset + int64(sws.dataLen)}) + manuals = append(manuals, manualPart{sws.source, offset, offset + int64(sws.dataLen)}) offset += int64(sws.dataLen) buffSize += sws.dataLen } diff --git a/go/store/nbs/aws_table_persister_test.go b/go/store/nbs/aws_table_persister_test.go index 4eec8f821b..49df3f4ef8 100644 --- a/go/store/nbs/aws_table_persister_test.go +++ b/go/store/nbs/aws_table_persister_test.go @@ -304,7 +304,7 @@ func TestAWSTablePersisterDividePlan(t *testing.T) { assert.Len(manuals, 1) ti, err = tooSmall.index() require.NoError(t, err) - assert.EqualValues(calcChunkRangeSize(ti), manuals[0].dstEnd-manuals[0].dstStart) + assert.EqualValues(calcChunkRangeSize(ti), manuals[0].end-manuals[0].start) } func TestAWSTablePersisterCalcPartSizes(t *testing.T) {