More cleanup and tests for AWS support of archives

Neil Macneale IV
2025-02-25 11:02:30 -08:00
parent f71585642f
commit 9c9cb8be28
6 changed files with 142 additions and 105 deletions

View File

@@ -50,6 +50,30 @@ func newArchiveChunkSource(ctx context.Context, dir string, h hash.Hash, chunkCo
return archiveChunkSource{archiveFile, aRdr}, nil
}
func newAWSArchiveChunkSource(ctx context.Context,
s3 *s3ObjectReader,
al awsLimits,
name string,
chunkCount uint32,
q MemoryQuotaProvider,
stats *Stats) (cs chunkSource, err error) {
footer := make([]byte, archiveFooterSize)
// sz is what we are really after here, but we'll use the bytes to construct the footer to avoid another call.
_, sz, err := s3.readRange(ctx, name, footer, httpEndRangeHeader(int(archiveFooterSize)))
if err != nil {
return emptyChunkSource{}, err
}
rdr := s3ReaderAt{name, s3}
aRdr, err := newArchiveReaderFromFooter(rdr, sz, footer)
if err != nil {
return archiveChunkSource{}, err
}
return archiveChunkSource{"", aRdr}, nil
}
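A note on the single readRange call above: a suffix-range GET returns both the requested footer bytes and, via the Content-Range response header, the total size of the object, which is how |sz| can be recovered without a second round trip. A minimal sketch of that idea using aws-sdk-go v1; the readTail helper and its parameters are hypothetical, and the package's own readRange fills this role in the real code.

import (
	"context"
	"fmt"
	"io"
	"strconv"
	"strings"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/service/s3"
	"github.com/aws/aws-sdk-go/service/s3/s3iface"
)

// readTail fetches the last |n| bytes of an object and recovers the object's
// total size from the Content-Range header, e.g. "bytes 9900-9999/10000".
func readTail(ctx context.Context, client s3iface.S3API, bucket, key string, n int) ([]byte, uint64, error) {
	out, err := client.GetObjectWithContext(ctx, &s3.GetObjectInput{
		Bucket: aws.String(bucket),
		Key:    aws.String(key),
		Range:  aws.String(fmt.Sprintf("bytes=-%d", n)), // suffix range: last n bytes
	})
	if err != nil {
		return nil, 0, err
	}
	defer out.Body.Close()

	buf, err := io.ReadAll(out.Body)
	if err != nil {
		return nil, 0, err
	}

	cr := aws.StringValue(out.ContentRange) // "bytes <start>-<end>/<total>"
	slash := strings.LastIndex(cr, "/")
	if slash < 0 {
		return nil, 0, fmt.Errorf("unexpected Content-Range: %q", cr)
	}
	total, err := strconv.ParseUint(cr[slash+1:], 10, 64)
	if err != nil {
		return nil, 0, err
	}
	return buf, total, nil
}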
func openReader(file string) (io.ReaderAt, uint64, error) {
f, err := os.Open(file)
if err != nil {
@@ -149,15 +173,13 @@ func (acs archiveChunkSource) currentSize() uint64 {
return acs.aRdr.footer.fileSize
}
// reader returns a reader for the entire archive file.
func (acs archiveChunkSource) reader(ctx context.Context) (io.ReadCloser, uint64, error) {
rdr := acs.aRdr.reader
chks, err := acs.count()
if err != nil {
return nil, 0, err
}
fz := acs.currentSize()
rc := io.NewSectionReader(rdr, 0, int64(acs.currentSize()))
return io.NopCloser(rc), uint64(chks), nil
rc := io.NewSectionReader(rdr, 0, int64(fz))
return io.NopCloser(rc), fz, nil
}
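The io.NewSectionReader/io.NopCloser pairing above is the whole trick for exposing a random-access archive as a streaming ReadCloser of known length. A hypothetical caller (not part of this change, and assuming the usual context, fmt, and io imports) might consume it like this:

// copyArchive streams the entire archive to |w| and checks that the byte count
// matches the size reported alongside the reader.
func copyArchive(ctx context.Context, acs archiveChunkSource, w io.Writer) error {
	rc, sz, err := acs.reader(ctx)
	if err != nil {
		return err
	}
	defer rc.Close()
	n, err := io.Copy(w, rc)
	if err != nil {
		return err
	}
	if uint64(n) != sz {
		return fmt.Errorf("short copy: wrote %d of %d bytes", n, sz)
	}
	return nil
}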
func (acs archiveChunkSource) uncompressedLen() (uint64, error) {
return 0, errors.New("Archive chunk source does not support uncompressedLen")
@@ -177,7 +199,7 @@ func (acs archiveChunkSource) clone() (chunkSource, error) {
rdr = acs.aRdr.clone(newReader)
} else {
// NM4 - S3 reader is stateless, so we can just use the same one.
// S3 reader is stateless, so we can just use the same one.
rdr = acs.aRdr.clone(acs.aRdr.reader)
}

View File

@@ -136,17 +136,35 @@ func newArchiveMetadata(reader io.ReaderAt, fileSize uint64) (*ArchiveMetadata,
}, nil
}
func newArchiveReaderFromFooter(reader io.ReaderAt, fileSz uint64, footer []byte) (archiveReader, error) {
if uint64(len(footer)) != archiveFooterSize {
return archiveReader{}, errors.New("runtime error: invalid footer.")
}
ftr, err := buildFooter(fileSz, footer)
if err != nil {
return archiveReader{}, err
}
return buildArchiveReader(reader, fileSz, ftr)
}
func newArchiveReader(reader io.ReaderAt, fileSize uint64) (archiveReader, error) {
footer, err := loadFooter(reader, fileSize)
if err != nil {
return archiveReader{}, err
}
return buildArchiveReader(reader, fileSize, footer)
}
func buildArchiveReader(reader io.ReaderAt, fileSize uint64, footer archiveFooter) (archiveReader, error) {
byteOffSpan := footer.indexByteOffsetSpan()
secRdr := io.NewSectionReader(reader, int64(byteOffSpan.offset), int64(byteOffSpan.length))
byteSpans := make([]uint64, footer.byteSpanCount+1)
byteSpans[0] = 0 // Null byteSpan to simplify logic.
err = binary.Read(secRdr, binary.BigEndian, byteSpans[1:])
err := binary.Read(secRdr, binary.BigEndian, byteSpans[1:])
if err != nil {
return archiveReader{}, err
}
@@ -213,7 +231,10 @@ func loadFooter(reader io.ReaderAt, fileSize uint64) (f archiveFooter, err error
if err != nil {
return
}
return buildFooter(fileSize, buf)
}
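Splitting buildFooter out of loadFooter is what lets the AWS path reuse footer bytes it already has in hand, while the local-file path keeps reading them itself. Two hypothetical wrappers, only to show the call shapes (names are illustrative; assumes io and os imports):

// openLocalArchive lets newArchiveReader locate and parse the footer itself.
func openLocalArchive(path string) (archiveReader, error) {
	f, err := os.Open(path)
	if err != nil {
		return archiveReader{}, err
	}
	st, err := f.Stat()
	if err != nil {
		return archiveReader{}, err
	}
	return newArchiveReader(f, uint64(st.Size()))
}

// openRemoteArchive hands over footer bytes that arrived with the size-probing
// ranged read, so no second round trip is needed to parse the footer.
func openRemoteArchive(rdr io.ReaderAt, objectSize uint64, footer []byte) (archiveReader, error) {
	return newArchiveReaderFromFooter(rdr, objectSize, footer)
}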
func buildFooter(fileSize uint64, buf []byte) (f archiveFooter, err error) {
f.formatVersion = buf[afrVersionOffset]
f.fileSignature = string(buf[afrSigOffset:])
// Verify File Signature

View File

@@ -22,39 +22,18 @@
package nbs
import (
"bytes"
"context"
"errors"
"io"
"strings"
"time"
"github.com/dolthub/dolt/go/store/hash"
)
// NM4 - rename to objectExistsInChunkSource
func tableExistsInChunkSource(ctx context.Context, s3 *s3ObjectReader, name string, stats *Stats) (bool, error) {
magic := make([]byte, magicNumberSize)
n, err := s3.readS3TableFileFromEnd(ctx, name, magic, stats)
if err != nil {
return false, err
}
if n != len(magic) {
return false, errors.New("failed to read all data")
}
if strings.HasSuffix(name, ArchiveFileSuffix) {
// dolt magic number is a version byte + DOLTARC. We ignore the version byte here.
return bytes.Equal(magic[magicNumberSize-doltMagicSize:], []byte(doltMagicNumber)), nil
} else {
return bytes.Equal(magic, []byte(magicNumber)), nil
}
}
func newAWSTableFileChunkSource(ctx context.Context, s3 *s3ObjectReader, al awsLimits, name hash.Hash, chunkCount uint32, q MemoryQuotaProvider, stats *Stats) (cs chunkSource, err error) {
var tra tableReaderAt
index, err := loadTableIndex(ctx, stats, chunkCount, q, func(p []byte) error {
n, err := s3.readS3TableFileFromEnd(ctx, name.String(), p, stats)
n, err := s3.readS3ObjectFromEnd(ctx, name.String(), p, stats)
if err != nil {
return err
}
@@ -76,31 +55,6 @@ func newAWSTableFileChunkSource(ctx context.Context, s3 *s3ObjectReader, al awsL
return &chunkSourceAdapter{tr, name}, nil
}
func newAWSArchiveChunkSource(ctx context.Context,
s3 *s3ObjectReader,
al awsLimits,
name string,
chunkCount uint32,
q MemoryQuotaProvider,
stats *Stats) (cs chunkSource, err error) {
// Perform a readrange of the footer to get the size of the file.
footer := make([]byte, archiveFooterSize)
_, sz, err := s3.readRange(ctx, name, footer, httpEndRangeHeader(int(archiveFooterSize)))
if err != nil {
return emptyChunkSource{}, err
}
rdr := s3ReaderAt{name, s3}
aRdr, err := newArchiveReader(rdr, sz)
if err != nil {
return archiveChunkSource{}, err
}
return archiveChunkSource{"panic if we use this", aRdr}, nil
}
func loadTableIndex(ctx context.Context, stats *Stats, cnt uint32, q MemoryQuotaProvider, loadIndexBytes func(p []byte) error) (tableIndex, error) {
idxSz := int(indexSize(cnt) + footerSize)
offsetSz := int((cnt - (cnt / 2)) * offsetSize)

View File

@@ -33,6 +33,7 @@ import (
"time"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3iface"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
@@ -67,6 +68,8 @@ type awsLimits struct {
partTarget, partMin, partMax uint64
}
// Open takes the named object, and returns a chunkSource for it. This function works for both table files and archive
// files. If the table file doesn't exist, but |name| + ".darc" does, then an archive chunk source is returned instead.
func (s3p awsTablePersister) Open(ctx context.Context, name hash.Hash, chunkCount uint32, stats *Stats) (chunkSource, error) {
cs, err := newAWSTableFileChunkSource(
ctx,
@@ -81,8 +84,15 @@ func (s3p awsTablePersister) Open(ctx context.Context, name hash.Hash, chunkCoun
return cs, nil
}
// if the error is for an object not found, we may be looking for an archive. We could check the error
// before trying to see if there is an archive.... NM4.
// errors.Is doesn't work with aws errors
reqErr, ok := err.(awserr.RequestFailure)
if !ok {
// Probably won't ever happen.
return emptyChunkSource{}, err
}
if reqErr.Code() != "NoSuchKey" || reqErr.StatusCode() != 404 {
return emptyChunkSource{}, err
}
e, err2 := s3p.Exists(ctx, name.String()+ArchiveFileSuffix, chunkCount, stats)
if e && err2 == nil {
@@ -100,12 +110,9 @@ func (s3p awsTablePersister) Open(ctx context.Context, name hash.Hash, chunkCoun
}
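As the new comment notes, errors.Is does not see through aws-sdk-go v1 errors, so the fallback-to-archive path has to type-assert awserr.RequestFailure and compare the code and status directly. The same check, isolated into a hedged helper sketch (s3.ErrCodeNoSuchKey is the SDK constant for "NoSuchKey"; assumes a net/http import alongside the awserr and s3 packages already imported here):

// isNoSuchKey reports whether |err| is S3 saying the object does not exist,
// as opposed to a credential, throttling, or network failure.
func isNoSuchKey(err error) bool {
	reqErr, ok := err.(awserr.RequestFailure)
	if !ok {
		return false
	}
	return reqErr.Code() == s3.ErrCodeNoSuchKey && reqErr.StatusCode() == http.StatusNotFound
}

Open would then fall back to probing for |name| + ArchiveFileSuffix only when isNoSuchKey(err) is true, and return the original error otherwise.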
func (s3p awsTablePersister) Exists(ctx context.Context, name string, _ uint32, stats *Stats) (bool, error) {
return tableExistsInChunkSource(
ctx,
&s3ObjectReader{s3: s3p.s3, bucket: s3p.bucket, readRl: s3p.rl, ns: s3p.ns},
name,
stats,
)
s3or := &s3ObjectReader{s3: s3p.s3, bucket: s3p.bucket, readRl: s3p.rl, ns: s3p.ns}
return s3or.objectExistsInChunkSource(ctx, name, stats)
}
func (s3p awsTablePersister) CopyTableFile(ctx context.Context, r io.Reader, fileId string, fileSz uint64, chunkCount uint32) error {

View File

@@ -22,13 +22,16 @@
package nbs
import (
"bytes"
"context"
"errors"
"fmt"
"io"
"net"
"os"
"strconv"
"strings"
"sync/atomic"
"syscall"
"time"
@@ -36,10 +39,11 @@ import (
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3iface"
"github.com/jpillora/backoff"
"golang.org/x/sync/errgroup"
)
// s3ObjectReader is a wrapper for S3 that gives us some nice to haves for reading objects from S3.
// TODO: Bring all the multipart upload and remote-conjoin stuff over here and make this a better analogue to ddbTableStore
// TODO: Bring all the multipart upload and remote-conjoin stuff in.
type s3ObjectReader struct {
s3 s3iface.S3API
bucket string
@@ -156,57 +160,74 @@ func (s3or *s3ObjectReader) readRange(ctx context.Context, name string, p []byte
return n, sz, err
}
// NM4 - still table specific. Clean up. Doesn't need to be, IMO.
func (s3or *s3ObjectReader) readS3TableFileFromEnd(ctx context.Context, name string, p []byte, stats *Stats) (n int, err error) {
// readS3ObjectFromEnd reads the last |len(p)| bytes of the named object into |p|. The number of bytes read is returned, along with any error encountered.

func (s3or *s3ObjectReader) readS3ObjectFromEnd(ctx context.Context, name string, p []byte, stats *Stats) (n int, err error) {
defer func(t1 time.Time) {
stats.S3BytesPerRead.Sample(uint64(len(p)))
stats.S3ReadLatency.SampleTimeSince(t1)
}(time.Now())
if len(p) > maxS3ReadFromEndReqSize {
panic("ReadAtFromEnd: re-instate!")
/*
totalN := uint64(0)
// If we're bigger than 256MB, parallelize the read...
// Read the footer first and capture the size of the entire table file.
n, sz, err := s3or.readRange(ctx, name, p[len(p)-footerSize:], httpEndRangeHeader(footerSize))
if err != nil {
return n, err
totalN := uint64(0)
// If we're bigger than 256MB, parallelize the read...
// Read the last |footerSize| bytes to get the size of the file. We know that all table files are at least this big.
n, sz, err := s3or.readRange(ctx, name, p[len(p)-footerSize:], httpEndRangeHeader(footerSize))
if err != nil {
return n, err
}
totalN += uint64(n)
eg, egctx := errgroup.WithContext(ctx)
start := 0
for start < len(p)-footerSize {
// Make parallel read requests of up to 128MB.
end := start + preferredS3ReadFromEndReqSize
if end > len(p)-footerSize {
end = len(p) - footerSize
}
totalN += uint64(n)
eg, egctx := errgroup.WithContext(ctx)
start := 0
for start < len(p)-footerSize {
// Make parallel read requests of up to 128MB.
end := start + preferredS3ReadFromEndReqSize
if end > len(p)-footerSize {
end = len(p) - footerSize
bs := p[start:end]
rangeStart := sz - uint64(len(p)) + uint64(start)
rangeEnd := sz - uint64(len(p)) + uint64(end) - 1
length := rangeEnd - rangeStart
eg.Go(func() error {
n, _, err := s3or.readRange(egctx, name, bs, httpRangeHeader(int64(rangeStart), int64(length)))
if err != nil {
return err
}
bs := p[start:end]
rangeStart := sz - uint64(len(p)) + uint64(start)
rangeEnd := sz - uint64(len(p)) + uint64(end) - 1
length := rangeEnd - rangeStart
eg.Go(func() error {
n, _, err := s3or.readRange(egctx, name, bs, httpRangeHeader(int64(rangeStart), int64(length)))
if err != nil {
return err
}
atomic.AddUint64(&totalN, uint64(n))
return nil
})
start = end
}
err = eg.Wait()
if err != nil {
return 0, err
}
return int(totalN), nil
atomic.AddUint64(&totalN, uint64(n))
return nil
})
start = end
}
err = eg.Wait()
if err != nil {
return 0, err
}
return int(totalN), nil
} else {
n, _, err = s3or.readRange(ctx, name, p, httpEndRangeHeader(len(p)))
return n, err
}
}
*/
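Stripped of the S3 plumbing, the re-instated large-read path above does: one suffix-range read of the footer to learn the object size, then fixed-size ranged reads fanned out with errgroup until the rest of the tail buffer is filled. A compact sketch of that shape, with a caller-supplied readRange standing in for s3ObjectReader.readRange; the helper and its parameters are illustrative, and the production code additionally sums the bytes read (assumes context and golang.org/x/sync/errgroup, both imported in this file):

// readTailParallel fills |p| with the last |len(p)| bytes of an object of total
// size |sz|, issuing |partSize|-byte ranged reads concurrently. readRange must
// fill |dst| from object offset |start|.
func readTailParallel(ctx context.Context, sz uint64, p []byte, partSize int,
	readRange func(ctx context.Context, start uint64, dst []byte) error) error {
	base := sz - uint64(len(p)) // object offset corresponding to p[0]
	eg, egctx := errgroup.WithContext(ctx)
	for start := 0; start < len(p); start += partSize {
		end := start + partSize
		if end > len(p) {
			end = len(p)
		}
		bs := p[start:end] // each goroutine writes a disjoint slice of p
		off := base + uint64(start)
		eg.Go(func() error {
			return readRange(egctx, off, bs)
		})
	}
	return eg.Wait()
}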
// objectExistsInChunkSource returns true if the object exists in the chunk source and its signature matches the
// |name|: a |name| ending in .darc indicates an archive file, otherwise we verify the Noms table file magic number.
// False is returned if the signature does not match, and an error is returned if the read fails.
func (s3or *s3ObjectReader) objectExistsInChunkSource(ctx context.Context, name string, stats *Stats) (bool, error) {
magic := make([]byte, magicNumberSize)
n, err := s3or.readS3ObjectFromEnd(ctx, name, magic, stats)
if err != nil {
return false, err
}
if n != len(magic) {
return false, errors.New("failed to read all data")
}
n, _, err = s3or.readRange(ctx, name, p, httpEndRangeHeader(len(p)))
return n, err
if strings.HasSuffix(name, ArchiveFileSuffix) {
// dolt magic number is a version byte + DOLTARC. We ignore the version byte here.
return bytes.Equal(magic[magicNumberSize-doltMagicSize:], []byte(doltMagicNumber)), nil
} else {
return bytes.Equal(magic, []byte(magicNumber)), nil
}
}
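For concreteness, the suffix comparison above relies on both formats ending in a fixed signature: table files end with the Noms magic number, and archives end with a single format-version byte followed by DOLTARC. A worked example of the archive case; the 8-byte tail length and the 0x01 version byte are assumptions for illustration, with the real sizes coming from magicNumberSize and doltMagicSize:

func exampleArchiveTailCheck() bool {
	// Hypothetical tail of an archive object: one version byte, then "DOLTARC".
	tail := []byte{0x01, 'D', 'O', 'L', 'T', 'A', 'R', 'C'}
	// Ignore the version byte and compare only the trailing magic, mirroring
	// the check in objectExistsInChunkSource.
	return bytes.Equal(tail[len(tail)-len("DOLTARC"):], []byte("DOLTARC")) // true
}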
func isConnReset(err error) bool {

View File

@@ -46,10 +46,13 @@ skip_if_no_aws_tests() {
## is currently the only way we can get archive files from AWS backed databases
## when cloning.
dolt clone "$url" cloneddb
cd cloneddb
# Verify we can read data
run dolt sql -q 'select sum(i) from tbl;'
[[ "$status" -eq 0 ]] || false
[[ "$output" =~ "138075" ]] || false # i = 1 - 525, sum is 138075
dolt fsck
}
# bats test_tags=no_lambda
@@ -79,10 +82,12 @@ skip_if_no_aws_tests() {
run dolt sql -q 'select sum(i) from tbl;'
[[ "$status" -eq 0 ]] || false
[[ "$output" =~ "138033" ]] || false # i = 1 - 525, sum is 138075, then substract 42
dolt fsck
}
# bats test_tags=no_lambda
@test "archive-aws: can push and and clone archive" {
@test "archive-aws: can push and clone archive" {
skip_if_no_aws_tests
rm -rf .dolt
@@ -105,4 +110,11 @@ skip_if_no_aws_tests() {
run dolt sql -q 'select sum(i) from tbl;'
[[ "$status" -eq 0 ]] || false
[[ "$output" =~ "138075" ]] || false # i = 1 - 525, sum is 138075
dolt fsck
}
## NM4 - Need additional tests when we support streaming to archive files.
## - Push archive content to AWS, verify it is in archive format when we clone.
## - Incremental push/fetch produces archive files (remote and local).
## - GC then Archive, then push to AWS.