Merge pull request #1252 from dolthub/aaron/dolt-s3-table-reader-parallelize-large-index-loads

go/store/nbs: s3_table_reader: Parallelize loading large table file indexes.
Aaron Son authored on 2021-01-26 10:57:13 -08:00, committed by GitHub.
3 changed files with 69 additions and 15 deletions
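
In outline: ReadFromEnd now returns the total size of the backing table file alongside the byte count, and when the index buffer exceeds 256MB it fans the read out into concurrent 128MB ranged GETs coordinated by an errgroup, using the object size learned from the footer request to compute absolute offsets. The sketch below distills that fan-out pattern on its own terms; readTail and readPart are hypothetical stand-ins for the readRange call in the diff, and the chunk size is a parameter rather than the 128MB constant.

package example

import (
	"context"
	"fmt"
	"sync/atomic"

	"golang.org/x/sync/errgroup"
)

// readTail fills p with the last len(p) bytes of an object whose total size is
// objectSize, splitting the work into chunkSize pieces fetched concurrently.
// readPart is a hypothetical ranged-GET callback (readRange plays this role in
// the diff); it must fill dst completely or return an error. chunkSize must be > 0.
func readTail(ctx context.Context, p []byte, objectSize uint64, chunkSize int,
	readPart func(ctx context.Context, dst []byte, rangeHeader string) (int, error)) (int, error) {
	eg, egctx := errgroup.WithContext(ctx)
	var total uint64
	base := objectSize - uint64(len(p)) // absolute offset of p[0] within the object
	for start := 0; start < len(p); start += chunkSize {
		end := start + chunkSize
		if end > len(p) {
			end = len(p)
		}
		dst := p[start:end]
		rangeHeader := fmt.Sprintf("bytes=%d-%d", base+uint64(start), base+uint64(end)-1)
		eg.Go(func() error {
			n, err := readPart(egctx, dst, rangeHeader)
			if err != nil {
				return err
			}
			atomic.AddUint64(&total, uint64(n))
			return nil
		})
	}
	if err := eg.Wait(); err != nil {
		return 0, err
	}
	return int(total), nil
}

In the diff itself, p holds index plus footer; the footer is fetched first so the total size (from Content-Range) is known before the loop, and only p[:len(p)-footerSize] is split into pieces.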

View File

@@ -1,4 +1,4 @@
-// Copyright 2019 Dolthub, Inc.
+// Copyright 2019-2021 Dolthub, Inc.
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -68,7 +68,7 @@ func newAWSChunkSource(ctx context.Context, ddb *ddbTableStore, s3 *s3ObjectRead
 	size := indexSize(chunkCount) + footerSize
 	buff := make([]byte, size)

-	n, err := s3.ReadFromEnd(ctx, name, buff, stats)
+	n, _, err := s3.ReadFromEnd(ctx, name, buff, stats)
 	if err != nil {
 		return nil, &dynamoTableReaderAt{}, err
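
The index is read from the tail of the table file (indexSize(chunkCount) + footerSize bytes), so the single-request path in ReadFromEnd uses an HTTP suffix range: Range: bytes=-N returns the last N bytes of the object. A minimal aws-sdk-go sketch of that request shape, with the bucket and key as placeholder values rather than anything from the diff:

package example

import (
	"context"
	"fmt"
	"io"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/service/s3"
)

// readLastN fetches the last n bytes of an S3 object with a suffix Range header,
// the same request shape ReadFromEnd builds via fmt.Sprintf("%s=-%d", s3RangePrefix, len(p)).
// The bucket and key arguments are placeholders for illustration only.
func readLastN(ctx context.Context, client *s3.S3, bucket, key string, n int64) ([]byte, error) {
	out, err := client.GetObjectWithContext(ctx, &s3.GetObjectInput{
		Bucket: aws.String(bucket),
		Key:    aws.String(key),
		Range:  aws.String(fmt.Sprintf("bytes=-%d", n)), // suffix range: last n bytes
	})
	if err != nil {
		return nil, err
	}
	defer out.Body.Close()
	return io.ReadAll(out.Body)
}

The ContentLength check in readRange (in the next file) guards against S3 returning a shorter range than the one requested.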

View File

@@ -1,4 +1,4 @@
-// Copyright 2019 Dolthub, Inc.
+// Copyright 2019-2021 Dolthub, Inc.
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -27,6 +27,9 @@ import (
"io"
"net"
"os"
"strconv"
"strings"
"sync/atomic"
"syscall"
"time"
@@ -34,6 +37,7 @@ import (
"github.com/aws/aws-sdk-go/aws/request"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/jpillora/backoff"
"golang.org/x/sync/errgroup"
)
const (
@@ -111,7 +115,7 @@ func (s3or *s3ObjectReader) ReadAt(ctx context.Context, name addr, p []byte, off
 		stats.S3ReadLatency.SampleTimeSince(t1)
 	}()

-	n, err = s3or.readRange(ctx, name, p, s3RangeHeader(off, int64(len(p))))
+	n, _, err = s3or.readRange(ctx, name, p, s3RangeHeader(off, int64(len(p))))
 	return
 }
@@ -120,17 +124,56 @@ func s3RangeHeader(off, length int64) string {
 	return fmt.Sprintf("%s=%d-%d", s3RangePrefix, off, lastByte)
 }

-func (s3or *s3ObjectReader) ReadFromEnd(ctx context.Context, name addr, p []byte, stats *Stats) (n int, err error) {
+const maxS3ReadFromEndReqSize = 256 * 1024 * 1024 // 256MB
+const preferredS3ReadFromEndReqSize = 128 * 1024 * 1024 // 128MB
+
+func (s3or *s3ObjectReader) ReadFromEnd(ctx context.Context, name addr, p []byte, stats *Stats) (n int, sz uint64, err error) {
 	// TODO: enable this to use the tableCache. The wrinkle is the tableCache currently just returns a ReaderAt, which doesn't give you the length of the object that backs it, so you can't calculate an offset if all you know is that you want the last N bytes.
 	defer func(t1 time.Time) {
 		stats.S3BytesPerRead.Sample(uint64(len(p)))
 		stats.S3ReadLatency.SampleTimeSince(t1)
 	}(time.Now())
+	totalN := uint64(0)
+	if len(p) > maxS3ReadFromEndReqSize {
+		// If we're bigger than 256MB, parallelize the read...
+		// Read the footer first and capture the size of the entire table file.
+		n, sz, err := s3or.readRange(ctx, name, p[len(p)-footerSize:], fmt.Sprintf("%s=-%d", s3RangePrefix, footerSize))
+		if err != nil {
+			return n, sz, err
+		}
+		totalN += uint64(n)
+		eg, egctx := errgroup.WithContext(ctx)
+		start := 0
+		for start < len(p)-footerSize {
+			// Make parallel read requests of up to 128MB.
+			end := start + preferredS3ReadFromEndReqSize
+			if end > len(p)-footerSize {
+				end = len(p) - footerSize
+			}
+			bs := p[start:end]
+			rangeStart := sz - uint64(len(p)) + uint64(start)
+			rangeEnd := sz - uint64(len(p)) + uint64(end) - 1
+			eg.Go(func() error {
+				n, _, err := s3or.readRange(egctx, name, bs, fmt.Sprintf("%s=%d-%d", s3RangePrefix, rangeStart, rangeEnd))
+				if err != nil {
+					return err
+				}
+				atomic.AddUint64(&totalN, uint64(n))
+				return nil
+			})
+			start = end
+		}
+		err = eg.Wait()
+		if err != nil {
+			return 0, 0, err
+		}
+		return int(totalN), sz, nil
+	}
 	return s3or.readRange(ctx, name, p, fmt.Sprintf("%s=-%d", s3RangePrefix, len(p)))
 }

-func (s3or *s3ObjectReader) readRange(ctx context.Context, name addr, p []byte, rangeHeader string) (n int, err error) {
-	read := func() (int, error) {
+func (s3or *s3ObjectReader) readRange(ctx context.Context, name addr, p []byte, rangeHeader string) (n int, sz uint64, err error) {
+	read := func() (int, uint64, error) {
 		if s3or.readRl != nil {
 			s3or.readRl <- struct{}{}
 			defer func() {
@@ -146,19 +189,30 @@ func (s3or *s3ObjectReader) readRange(ctx context.Context, name addr, p []byte,
 		result, err := s3or.s3.GetObjectWithContext(ctx, input)
 		if err != nil {
-			return 0, err
+			return 0, 0, err
 		}
 		defer result.Body.Close()

 		if *result.ContentLength != int64(len(p)) {
-			return 0, fmt.Errorf("failed to read entire range, key: %v, len(p): %d, rangeHeader: %s, ContentLength: %d", s3or.key(name.String()), len(p), rangeHeader, *result.ContentLength)
+			return 0, 0, fmt.Errorf("failed to read entire range, key: %v, len(p): %d, rangeHeader: %s, ContentLength: %d", s3or.key(name.String()), len(p), rangeHeader, *result.ContentLength)
 		}

+		sz := uint64(0)
+		if result.ContentRange != nil {
+			i := strings.Index(*result.ContentRange, "/")
+			if i != -1 {
+				sz, err = strconv.ParseUint((*result.ContentRange)[i+1:], 10, 64)
+				if err != nil {
+					return 0, 0, err
+				}
+			}
+		}
+
 		n, err = io.ReadFull(result.Body, p)
-		return n, err
+		return n, sz, err
 	}

-	n, err = read()
+	n, sz, err = read()
 	// We hit the point of diminishing returns investigating #3255, so add retries. In conversations with AWS people, it's not surprising to get transient failures when talking to S3, though SDKs are intended to have their own retrying. The issue may be that, in Go, making the S3 request and reading the data are separate operations, and the SDK kind of can't do its own retrying to handle failures in the latter.
 	if isConnReset(err) {
 		// We are backing off here because its possible and likely that the rate of requests to S3 is the underlying issue.
@@ -168,13 +222,13 @@ func (s3or *s3ObjectReader) readRange(ctx context.Context, name addr, p []byte,
 			Factor: 2,
 			Jitter: true,
 		}
-		for ; isConnReset(err); n, err = read() {
+		for ; isConnReset(err); n, sz, err = read() {
 			dur := b.Duration()
 			time.Sleep(dur)
 		}
 	}
-	return n, err
+	return n, sz, err
 }

 func isConnReset(err error) bool {
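
readRange now also returns sz, the total size of the object, which it recovers from the Content-Range response header (format: bytes <first>-<last>/<total>); ReadFromEnd needs that total to turn "last N bytes" into absolute offsets for the parallel requests. A standalone sketch of that parsing step follows; the helper name is mine, not from the diff, and unlike readRange (which just leaves sz at zero) it reports a missing "/" as an error.

package example

import (
	"fmt"
	"strconv"
	"strings"
)

// totalSizeFromContentRange extracts the total object size from a Content-Range
// header such as "bytes 96-127/128", mirroring what readRange does with
// *result.ContentRange: take everything after the "/" and parse it as a uint64.
func totalSizeFromContentRange(contentRange string) (uint64, error) {
	i := strings.Index(contentRange, "/")
	if i == -1 {
		return 0, fmt.Errorf("no total in Content-Range: %q", contentRange)
	}
	return strconv.ParseUint(contentRange[i+1:], 10, 64)
}

// totalSizeFromContentRange("bytes 96-127/128") returns 128: a suffix request
// for the last 32 bytes of a 128-byte object tells the caller the object's full
// length, from which absolute offsets for further range reads can be computed.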

View File

@@ -23,7 +23,7 @@ import (
"strings"
)
var ExpectedHeader = regexp.MustCompile(`// Copyright (2019|2020|2019-2020) Dolthub, Inc.
var ExpectedHeader = regexp.MustCompile(`// Copyright (2019|2020|2021|2019-2020|2019-2021|2020-2021) Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 \(the "License"\);
// you may not use this file except in compliance with the License.
@@ -39,7 +39,7 @@ var ExpectedHeader = regexp.MustCompile(`// Copyright (2019|2020|2019-2020) Dolt
 `)

-var ExpectedHeaderForFileFromNoms = regexp.MustCompile(`// Copyright (2019|2020|2019-2020) Dolthub, Inc.
+var ExpectedHeaderForFileFromNoms = regexp.MustCompile(`// Copyright (2019|2020|2021|2019-2020|2019-2021|2020-2021) Dolthub, Inc.
 //
 // Licensed under the Apache License, Version 2.0 \(the "License"\);
 // you may not use this file except in compliance with the License.
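
The license-header check simply gains the 2021 year forms in its alternation. A tiny illustration of the new first-line pattern in isolation: ExpectedHeader in the repo goes on to match the entire Apache 2.0 header block, and this variant adds anchors and escapes the trailing period, which the original leaves bare.

package example

import "regexp"

// firstHeaderLine isolates the year alternation added in this change; the real
// ExpectedHeader and ExpectedHeaderForFileFromNoms patterns continue across the
// whole license header rather than stopping at the first line.
var firstHeaderLine = regexp.MustCompile(`^// Copyright (2019|2020|2021|2019-2020|2019-2021|2020-2021) Dolthub, Inc\.$`)

func headerYearOK(line string) bool {
	return firstHeaderLine.MatchString(line)
}

// headerYearOK("// Copyright 2019-2021 Dolthub, Inc.") // true after this change
// headerYearOK("// Copyright 2018 Dolthub, Inc.")      // still false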