diff --git a/go/store/nbs/aws_table_persister.go b/go/store/nbs/aws_table_persister.go index 8f1cafd2d0..f27c85b10e 100644 --- a/go/store/nbs/aws_table_persister.go +++ b/go/store/nbs/aws_table_persister.go @@ -25,6 +25,7 @@ import ( "bytes" "context" "errors" + "fmt" "io" "net/url" "sort" @@ -392,9 +393,9 @@ func (s3p awsTablePersister) assembleTable(ctx context.Context, plan compactionP readWg.Add(1) go func(m manualPart) { defer readWg.Done() - n, _ := m.srcR.Read(buff[m.dstStart:m.dstEnd]) - if int64(n) < m.dstEnd-m.dstStart { - ae.SetIfError(errors.New("failed to read all the table data")) + err := m.run(ctx, buff) + if err != nil { + ae.SetIfError(fmt.Errorf("failed to read conjoin table data: %w", err)) } }(man) } @@ -507,8 +508,17 @@ type copyPart struct { } type manualPart struct { - srcR io.Reader - dstStart, dstEnd int64 + src chunkSource + start, end int64 +} + +func (mp manualPart) run(ctx context.Context, buff []byte) error { + reader, _, err := mp.src.reader(ctx) + if err != nil { + return err + } + _, err = io.ReadFull(reader, buff[mp.start:mp.end]) + return err } // dividePlan assumes that plan.sources (which is of type chunkSourcesByDescendingDataSize) is correctly sorted by descending data size. @@ -545,12 +555,7 @@ func dividePlan(ctx context.Context, plan compactionPlan, minPartSize, maxPartSi var offset int64 for ; i < len(plan.sources.sws); i++ { sws := plan.sources.sws[i] - rdr, _, err := sws.source.reader(ctx) - if err != nil { - return nil, nil, nil, err - } - - manuals = append(manuals, manualPart{rdr, offset, offset + int64(sws.dataLen)}) + manuals = append(manuals, manualPart{sws.source, offset, offset + int64(sws.dataLen)}) offset += int64(sws.dataLen) buffSize += sws.dataLen } diff --git a/go/store/nbs/aws_table_persister_test.go b/go/store/nbs/aws_table_persister_test.go index 4eec8f821b..49df3f4ef8 100644 --- a/go/store/nbs/aws_table_persister_test.go +++ b/go/store/nbs/aws_table_persister_test.go @@ -304,7 +304,7 @@ func TestAWSTablePersisterDividePlan(t *testing.T) { assert.Len(manuals, 1) ti, err = tooSmall.index() require.NoError(t, err) - assert.EqualValues(calcChunkRangeSize(ti), manuals[0].dstEnd-manuals[0].dstStart) + assert.EqualValues(calcChunkRangeSize(ti), manuals[0].end-manuals[0].start) } func TestAWSTablePersisterCalcPartSizes(t *testing.T) {