Merge pull request #5307 from dolthub/aaron/nbs-chunkSource-reader

go/store/nbs: Make a chunkSource able to return a real io.Reader.
Author: Aaron Son
Date: 2023-02-03 17:21:52 -08:00
Committed by: GitHub
10 changed files with 64 additions and 13 deletions
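
The change threads a new Reader(ctx) method through every tableReaderAt implementation in the diff (blobstore, DynamoDB, file cache, S3, and the test adapter) and widens chunkSource.reader to return an io.ReadCloser instead of an io.Reader synthesized from ranged reads. A minimal sketch of how a caller might consume the new API (copyTableFile and dst are illustrative names, not part of this diff):

func copyTableFile(ctx context.Context, cs chunkSource, dst io.Writer) error {
    // reader returns a true ReadCloser plus the table file size in bytes.
    rc, sz, err := cs.reader(ctx)
    if err != nil {
        return err
    }
    defer rc.Close()
    // Copy exactly sz bytes; the caller now owns closing the stream.
    _, err = io.CopyN(dst, rc, int64(sz))
    return err
}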

View File

@@ -233,6 +233,11 @@ type bsTableReaderAt struct {
 	bs  blobstore.Blobstore
 }
 
+func (bsTRA *bsTableReaderAt) Reader(ctx context.Context) (io.ReadCloser, error) {
+	rc, _, err := bsTRA.bs.Get(ctx, bsTRA.key, blobstore.AllRange)
+	return rc, err
+}
+
 // ReadAtWithStats is the bsTableReaderAt implementation of the tableReaderAt interface
 func (bsTRA *bsTableReaderAt) ReadAtWithStats(ctx context.Context, p []byte, off int64, stats *Stats) (int, error) {
 	br := blobstore.NewBlobRange(off, int64(len(p)))

View File

@@ -22,6 +22,7 @@
 package nbs
 
 import (
+	"bytes"
 	"context"
 	"fmt"
 	"io"
@@ -53,6 +54,14 @@ func (t tableNotInDynamoErr) Error() string {
 	return fmt.Sprintf("NBS table %s not present in DynamoDB table %s", t.nbs, t.dynamo)
 }
 
+func (dtra *dynamoTableReaderAt) Reader(ctx context.Context) (io.ReadCloser, error) {
+	data, err := dtra.ddb.ReadTable(ctx, dtra.h, &Stats{})
+	if err != nil {
+		return nil, err
+	}
+	return io.NopCloser(bytes.NewReader(data)), nil
+}
+
 func (dtra *dynamoTableReaderAt) ReadAtWithStats(ctx context.Context, p []byte, off int64, stats *Stats) (n int, err error) {
 	data, err := dtra.ddb.ReadTable(ctx, dtra.h, stats)

View File

@@ -70,8 +70,8 @@ func (ecs emptyChunkSource) index() (tableIndex, error) {
 	return onHeapTableIndex{}, nil
 }
 
-func (ecs emptyChunkSource) reader(context.Context) (io.Reader, uint64, error) {
-	return &bytes.Buffer{}, 0, nil
+func (ecs emptyChunkSource) reader(context.Context) (io.ReadCloser, uint64, error) {
+	return io.NopCloser(&bytes.Buffer{}), 0, nil
 }
 
 func (ecs emptyChunkSource) getRecordRanges(lookups []getRecord) (map[hash.Hash]Range, error) {

View File

@@ -55,7 +55,7 @@ func tableFileExists(ctx context.Context, dir string, h addr) (bool, error) {
 func newFileTableReader(ctx context.Context, dir string, h addr, chunkCount uint32, q MemoryQuotaProvider, fc *fdCache) (cs chunkSource, err error) {
 	path := filepath.Join(dir, h.String())
 
-	index, err := func() (ti onHeapTableIndex, err error) {
+	index, sz, err := func() (ti onHeapTableIndex, sz int64, err error) {
 		// Be careful with how |f| is used below. |RefFile| returns a cached
 		// os.File pointer so the code needs to use f in a concurrency-safe
@@ -82,7 +82,8 @@ func newFileTableReader(ctx context.Context, dir string, h addr, chunkCount uint
 		}
 
 		idxSz := int64(indexSize(chunkCount) + footerSize)
-		indexOffset := fi.Size() - idxSz
+		sz = fi.Size()
+		indexOffset := sz - idxSz
 		r := io.NewSectionReader(f, indexOffset, idxSz)
 
 		var b []byte
@@ -122,7 +123,7 @@ func newFileTableReader(ctx context.Context, dir string, h addr, chunkCount uint
 		return nil, errors.New("unexpected chunk count")
 	}
 
-	tr, err := newTableReader(index, &cacheReaderAt{path, fc}, fileBlockSize)
+	tr, err := newTableReader(index, &cacheReaderAt{path, fc, sz}, fileBlockSize)
 	if err != nil {
 		index.Close()
 		return nil, err
@@ -153,6 +154,11 @@ func (mmtr *fileTableReader) clone() (chunkSource, error) {
 type cacheReaderAt struct {
 	path string
 	fc   *fdCache
+	sz   int64
 }
 
+func (cra *cacheReaderAt) Reader(ctx context.Context) (io.ReadCloser, error) {
+	return io.NopCloser(io.LimitReader(&readerAdapter{cra, 0, ctx}, cra.sz)), nil
+}
+
 func (cra *cacheReaderAt) ReadAtWithStats(ctx context.Context, p []byte, off int64, stats *Stats) (n int, err error) {
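
cacheReaderAt.Reader builds its stream by adapting the ReaderAt-style API: readerAdapter (defined elsewhere in the package) turns sequential Read calls into ReadAtWithStats calls at a running offset, and io.LimitReader bounds the stream at the newly tracked file size sz. A sketch of the shape the adapter plausibly has, inferred from the positional literal readerAdapter{cra, 0, ctx} (the field names here are assumptions):

type readerAdapter struct {
    rat tableReaderAt   // source of ranged reads
    off int64           // current position in the table file
    ctx context.Context
}

func (ra *readerAdapter) Read(p []byte) (int, error) {
    // Serve each sequential Read with a ranged read at the running offset.
    n, err := ra.rat.ReadAtWithStats(ra.ctx, p, ra.off, &Stats{})
    ra.off += int64(n)
    return n, err
}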

View File

@@ -193,9 +193,9 @@ func (s journalChunkSource) hash() addr {
 }
 
 // reader implements chunkSource.
-func (s journalChunkSource) reader(context.Context) (io.Reader, uint64, error) {
+func (s journalChunkSource) reader(context.Context) (io.ReadCloser, uint64, error) {
 	rdr, sz, err := s.journal.Snapshot()
-	return rdr, uint64(sz), err
+	return io.NopCloser(rdr), uint64(sz), err
 }
 
 func (s journalChunkSource) getRecordRanges(requests []getRecord) (map[hash.Hash]Range, error) {

View File

@@ -24,6 +24,7 @@ package nbs
 import (
 	"bytes"
 	"context"
+	"io"
 	"os"
 	"testing"
@@ -179,15 +180,20 @@ func TestMemTableWrite(t *testing.T) {
 }
 
 type tableReaderAtAdapter struct {
-	*bytes.Reader
+	br *bytes.Reader
 }
 
 func tableReaderAtFromBytes(b []byte) tableReaderAt {
 	return tableReaderAtAdapter{bytes.NewReader(b)}
 }
 
+func (adapter tableReaderAtAdapter) Reader(ctx context.Context) (io.ReadCloser, error) {
+	r := *adapter.br
+	return io.NopCloser(&r), nil
+}
+
 func (adapter tableReaderAtAdapter) ReadAtWithStats(ctx context.Context, p []byte, off int64, stats *Stats) (n int, err error) {
-	return adapter.ReadAt(p, off)
+	return adapter.br.ReadAt(p, off)
 }
 
 func TestMemTableSnappyWriteOutOfLine(t *testing.T) {
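
Note that the adapter now copies its bytes.Reader by value inside Reader(), so each returned stream gets an independent read cursor over the same backing bytes while the adapter's own reader is left untouched. A standalone illustration of that property (not from the diff):

br := bytes.NewReader([]byte("abc"))
cp := *br              // value copy: same backing slice, separate offset
_, _ = io.ReadAll(&cp) // drains only the copy
fmt.Println(br.Len())  // prints 3; the original cursor has not moved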

View File

@@ -60,6 +60,10 @@ type s3svc interface {
 	PutObjectWithContext(ctx aws.Context, input *s3.PutObjectInput, opts ...request.Option) (*s3.PutObjectOutput, error)
 }
 
+func (s3tra *s3TableReaderAt) Reader(ctx context.Context) (io.ReadCloser, error) {
+	return s3tra.s3.Reader(ctx, s3tra.h)
+}
+
 func (s3tra *s3TableReaderAt) ReadAtWithStats(ctx context.Context, p []byte, off int64, stats *Stats) (n int, err error) {
 	return s3tra.s3.ReadAt(ctx, s3tra.h, p, off, stats)
 }
@@ -79,6 +83,10 @@ func (s3or *s3ObjectReader) key(k string) string {
 	return k
 }
 
+func (s3or *s3ObjectReader) Reader(ctx context.Context, name addr) (io.ReadCloser, error) {
+	return s3or.reader(ctx, name)
+}
+
 func (s3or *s3ObjectReader) ReadAt(ctx context.Context, name addr, p []byte, off int64, stats *Stats) (n int, err error) {
 	t1 := time.Now()
@@ -143,6 +151,18 @@ func (s3or *s3ObjectReader) ReadFromEnd(ctx context.Context, name addr, p []byte
 	return s3or.readRange(ctx, name, p, fmt.Sprintf("%s=-%d", s3RangePrefix, len(p)))
 }
 
+func (s3or *s3ObjectReader) reader(ctx context.Context, name addr) (io.ReadCloser, error) {
+	input := &s3.GetObjectInput{
+		Bucket: aws.String(s3or.bucket),
+		Key:    aws.String(s3or.key(name.String())),
+	}
+	result, err := s3or.s3.GetObjectWithContext(ctx, input)
+	if err != nil {
+		return nil, err
+	}
+	return result.Body, nil
+}
+
 func (s3or *s3ObjectReader) readRange(ctx context.Context, name addr, p []byte, rangeHeader string) (n int, sz uint64, err error) {
 	read := func() (int, uint64, error) {
 		if s3or.readRl != nil {
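
Unlike the DynamoDB implementation, which buffers the whole table and wraps it in a NopCloser, the S3 path hands back GetObject's streaming Body directly, so closing the returned ReadCloser releases the underlying HTTP response. A hypothetical call site (name and dst are illustrative):

rc, err := s3or.Reader(ctx, name)
if err != nil {
    return err
}
defer rc.Close() // releases the S3 response body
_, err = io.Copy(dst, rc)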

View File

@@ -1292,7 +1292,7 @@ func newTableFile(cs chunkSource, info tableSpec) tableFile {
 			if err != nil {
 				return nil, 0, err
 			}
-			return io.NopCloser(r), s, nil
+			return r, s, nil
 		},
 	}
 }

View File

@@ -260,7 +260,7 @@ type chunkSource interface {
 	hash() addr
 
 	// opens a Reader to the first byte of the chunkData segment of this table.
-	reader(context.Context) (io.Reader, uint64, error)
+	reader(context.Context) (io.ReadCloser, uint64, error)
 
 	// getRecordRanges sets getRecord.found to true, and returns a Range for each present getRecord query.
 	getRecordRanges(requests []getRecord) (map[hash.Hash]Range, error)

View File

@@ -130,6 +130,7 @@ func (ir indexResult) Length() uint32 {
 
 type tableReaderAt interface {
 	ReadAtWithStats(ctx context.Context, p []byte, off int64, stats *Stats) (n int, err error)
+	Reader(ctx context.Context) (io.ReadCloser, error)
 }
 
 // tableReader implements get & has queries against a single nbs table. goroutine safe.
@@ -631,10 +632,14 @@ func (tr tableReader) extract(ctx context.Context, chunks chan<- extractRecord)
 	return nil
 }
 
-func (tr tableReader) reader(ctx context.Context) (io.Reader, uint64, error) {
+func (tr tableReader) reader(ctx context.Context) (io.ReadCloser, uint64, error) {
 	i, _ := tr.index()
 	sz := i.tableFileSize()
-	return io.LimitReader(&readerAdapter{tr.r, 0, ctx}, int64(sz)), sz, nil
+	r, err := tr.r.Reader(ctx)
+	if err != nil {
+		return nil, 0, err
+	}
+	return r, sz, nil
 }
 
 func (tr tableReader) getRecordRanges(requests []getRecord) (map[hash.Hash]Range, error) {
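
The net effect: tableReader.reader no longer manufactures a reader from ranged ReadAtWithStats calls. Each backing store now supplies its most natural stream (the "real io.Reader" of the commit title): S3 and blobstore return their native response bodies, DynamoDB wraps its buffered table bytes, and the file-backed path keeps the old LimitReader-over-ReadAt behavior inside cacheReaderAt.Reader. Callers that previously relied on the io.LimitReader bound presumably need to enforce the returned size themselves, as in the copyTableFile sketch near the top.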