Bh/cmp chunks (#85)

Adds the ability to work with compressed chunks directly.
This commit is contained in:
Brian Hendriks
2019-09-19 10:53:10 -07:00
committed by GitHub
parent 25c427491d
commit af75a25acc
9 changed files with 728 additions and 43 deletions

View File

@@ -0,0 +1,318 @@
// Copyright 2019 Liquidata, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package nbs
import (
"crypto/sha512"
"encoding/binary"
"errors"
"hash"
"io"
"os"
"sort"
"github.com/golang/snappy"
"github.com/liquidata-inc/dolt/go/libraries/utils/iohelp"
)
// ByteSink is an io.Writer whose written bytes can later be flushed to another writer.
type ByteSink interface {
io.Writer
// Flush writes all the data that was written to the ByteSink to the supplied writer
Flush(wr io.Writer) error
}
// ErrBufferFull is returned by FixedBufferByteSink.Write when the data being written does not fit in the space
// left in the fixed buffer.
var ErrBufferFull = errors.New("buffer full")

// FixedBufferByteSink is a ByteSink backed by a buffer whose size never changes. A Write that needs more room
// than the buffer has left fails with ErrBufferFull.
type FixedBufferByteSink struct {
	// buff is the fixed backing storage supplied at construction
	buff []byte
	// pos is the number of bytes of buff that have been written so far
	pos uint64
}

// NewFixedBufferTableSink creates a FixedBufferByteSink which stores its data in the supplied buffer. It panics
// when the buffer is empty.
func NewFixedBufferTableSink(buff []byte) *FixedBufferByteSink {
	if len(buff) == 0 {
		panic("must provide a buffer")
	}

	sink := new(FixedBufferByteSink)
	sink.buff = buff
	return sink
}

// Write copies src into the unused tail of the buffer. The write is all-or-nothing: when src is longer than the
// space that is left, nothing is copied and 0, ErrBufferFull is returned.
func (sink *FixedBufferByteSink) Write(src []byte) (int, error) {
	n := len(src)
	free := sink.buff[sink.pos:]
	if n > len(free) {
		return 0, ErrBufferFull
	}

	copy(free, src)
	sink.pos += uint64(n)
	return n, nil
}
// Flush writes the portion of the fixed buffer that has been filled by Write calls to the supplied writer.
func (sink *FixedBufferByteSink) Flush(wr io.Writer) error {
	written := sink.buff[:sink.pos]
	return iohelp.WriteAll(wr, written)
}
// BlockBufferByteSink stores the bytes written to it in a list of blocks. Each block is allocated with a capacity
// of blockSize, and a new block is added whenever a Write does not fit in the space left in the last one.
type BlockBufferByteSink struct {
	// blockSize is the capacity every new block is allocated with
	blockSize int
	// pos is the total number of bytes written to the sink
	pos uint64
	// blocks holds the written data in write order
	blocks [][]byte
}

// NewBlockBufferTableSink creates a BlockBufferByteSink with the provided block size. The sink starts out with a
// single empty block.
func NewBlockBufferTableSink(blockSize int) *BlockBufferByteSink {
	return &BlockBufferByteSink{
		blockSize: blockSize,
		blocks:    [][]byte{make([]byte, 0, blockSize)},
	}
}

// Write appends src to the sink. Whatever fits in the unused capacity of the last block goes there; any bytes
// left over are placed together in one newly allocated block.
func (sink *BlockBufferByteSink) Write(src []byte) (int, error) {
	n := len(src)
	last := len(sink.blocks) - 1
	free := cap(sink.blocks[last]) - len(sink.blocks[last])

	if n <= free {
		// everything fits in the current block
		sink.blocks[last] = append(sink.blocks[last], src...)
		sink.pos += uint64(n)
		return n, nil
	}

	if free > 0 {
		// top off the current block before starting a new one
		sink.blocks[last] = append(sink.blocks[last], src[:free]...)
	}

	overflow := append(make([]byte, 0, sink.blockSize), src[free:]...)
	sink.blocks = append(sink.blocks, overflow)
	sink.pos += uint64(n)
	return n, nil
}
// Flush writes every block of the sink, in the order the data was written, to the supplied writer.
func (sink *BlockBufferByteSink) Flush(wr io.Writer) (err error) {
	err = iohelp.WriteAll(wr, sink.blocks...)
	return err
}
// defaultTableSinkBlockSize is the block size, in bytes (2 MiB), of the BlockBufferByteSink that
// NewCmpChunkTableWriter creates.
const defaultTableSinkBlockSize = 2 * 1024 * 1024
// ErrNotFinished is the error returned by a CmpChunkTableWriter when one of its Flush* methods is called before
// Finish has been called
var ErrNotFinished = errors.New("not finished")
// ErrAlreadyFinished is the error returned when Finish is called more than once on a CmpChunkTableWriter
var ErrAlreadyFinished = errors.New("already Finished")
// CmpChunkTableWriter writes CompressedChunks to a table file. Chunks are added with AddCmpChunk, Finish writes the
// index and footer, and Flush / FlushToFile write out the finished data.
type CmpChunkTableWriter struct {
// sink receives the chunk data, then the index, then the footer
sink ByteSink
// totalCompressedData is the sum of the lengths of the CompressedData of every chunk added
totalCompressedData uint64
// totalUncompressedData is the sum of the snappy decoded lengths of every chunk added; it is written to the footer
totalUncompressedData uint64
// prefixes holds one index record per chunk added
prefixes prefixIndexSlice // TODO: This is in danger of exploding memory
// blockAddr is nil until Finish succeeds, after which it holds the id of the table file
blockAddr *addr
}
// NewCmpChunkTableWriter creates an empty CmpChunkTableWriter which buffers its output in a BlockBufferByteSink
// using the default block size.
func NewCmpChunkTableWriter() *CmpChunkTableWriter {
	return &CmpChunkTableWriter{
		sink: NewBlockBufferTableSink(defaultTableSinkBlockSize),
	}
}
// AddCmpChunk appends the full bytes of a compressed chunk to the sink and records the index entry for it.
// It panics when the chunk has no compressed data, and returns an error when the snappy header of the chunk cannot
// be decoded or when the write to the sink fails.
func (tw *CmpChunkTableWriter) AddCmpChunk(c CompressedChunk) error {
	if len(c.CompressedData) == 0 {
		panic("NBS blocks cannot be zero length")
	}

	decodedLen, err := snappy.DecodedLen(c.CompressedData)
	if err != nil {
		return err
	}

	if _, err = tw.sink.Write(c.FullCompressedChunk); err != nil {
		return err
	}

	tw.totalCompressedData += uint64(len(c.CompressedData))
	tw.totalUncompressedData += uint64(decodedLen)

	// The ordinal of a record is its insertion position, so records are stored in insertion order here and only
	// sorted when the index is written.
	chunkAddr := addr(c.H)
	rec := prefixIndexRec{
		chunkAddr.Prefix(),
		chunkAddr[addrPrefixSize:],
		uint32(len(tw.prefixes)),
		uint32(len(c.FullCompressedChunk)),
	}
	tw.prefixes = append(tw.prefixes, rec)

	return nil
}
// Finish writes the index and the footer of the table file to the sink and returns the id of the file, which is
// derived from the hash returned by writeIndex. Calling Finish again after it has succeeded returns
// ErrAlreadyFinished.
func (tw *CmpChunkTableWriter) Finish() (string, error) {
	if tw.blockAddr != nil {
		return "", ErrAlreadyFinished
	}

	suffixHash, err := tw.writeIndex()
	if err != nil {
		return "", err
	}

	if err = tw.writeFooter(); err != nil {
		return "", err
	}

	// the id is the leading bytes of the digest, as many as fit in an addr
	var id addr
	copy(id[:], suffixHash.Sum(nil))
	tw.blockAddr = &id

	return tw.blockAddr.String(), nil
}
// FlushToFile writes the finished table file to the path provided, creating the file or truncating an existing one.
// It returns ErrNotFinished when called before Finish.
func (tw *CmpChunkTableWriter) FlushToFile(path string) error {
	if tw.blockAddr == nil {
		return ErrNotFinished
	}

	f, err := os.OpenFile(path, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, os.ModePerm)
	if err != nil {
		return err
	}

	if flushErr := tw.sink.Flush(f); flushErr != nil {
		// the flush failure is the error reported; a failure to close on this path is deliberately ignored
		_ = f.Close()
		return flushErr
	}

	return f.Close()
}
// Flush writes the finished table file to the writer provided. It returns ErrNotFinished when called before Finish.
func (tw *CmpChunkTableWriter) Flush(wr io.Writer) error {
	if tw.blockAddr == nil {
		return ErrNotFinished
	}

	return tw.sink.Flush(wr)
}
// writeIndex sorts the prefix records, serializes the index into one buffer and writes that buffer to the sink.
// The buffer holds a prefix/ordinal tuple per record in sorted order, followed by the chunk lengths and then the
// address suffixes, both of which are positioned by each record's ordinal. The returned hash has been fed the
// suffixes section of the index.
func (tw *CmpChunkTableWriter) writeIndex() (hash.Hash, error) {
	sort.Sort(tw.prefixes)

	count := uint32(len(tw.prefixes))
	lengthsStart := lengthsOffset(count)   // skip prefix and ordinal for each record
	suffixesStart := suffixesOffset(count) // skip size for each record
	suffixesSize := uint64(count) * addrSuffixSize
	index := make([]byte, suffixesSize+suffixesStart)

	var prefixBytes [addrPrefixSize]byte
	var cursor uint64
	for _, rec := range tw.prefixes {
		// hash prefix
		binary.BigEndian.PutUint64(prefixBytes[:], rec.prefix)
		copied := uint64(copy(index[cursor:], prefixBytes[:]))
		if copied != addrPrefixSize {
			return nil, errors.New("failed to copy all data")
		}
		cursor += copied

		// order
		binary.BigEndian.PutUint32(index[cursor:], rec.order)
		cursor += ordinalSize

		// length and hash suffix both live in the slot given by the record's ordinal
		slot := uint64(rec.order)
		binary.BigEndian.PutUint32(index[lengthsStart+slot*lengthSize:], rec.size)

		copied = uint64(copy(index[suffixesStart+slot*addrSuffixSize:], rec.suffix))
		if copied != addrSuffixSize {
			return nil, errors.New("failed to copy all bytes")
		}
	}

	suffixHash := sha512.New()
	suffixHash.Write(index[suffixesStart:])

	if _, err := tw.sink.Write(index); err != nil {
		return nil, err
	}

	return suffixHash, nil
}
// writeFooter writes the table file footer to the sink: the chunk count as a big-endian uint32, the total
// uncompressed size of the chunk data as a big-endian uint64, and finally the magic number.
func (tw *CmpChunkTableWriter) writeFooter() error {
	// chunk count
	chunkCount := uint32(len(tw.prefixes))
	if err := binary.Write(tw.sink, binary.BigEndian, chunkCount); err != nil {
		return err
	}

	// total uncompressed chunk data
	if err := binary.Write(tw.sink, binary.BigEndian, tw.totalUncompressedData); err != nil {
		return err
	}

	// magic number
	_, err := tw.sink.Write([]byte(magicNumber))
	return err
}

View File

@@ -0,0 +1,151 @@
// Copyright 2019 Liquidata, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package nbs
import (
"bytes"
"context"
"sync"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"github.com/liquidata-inc/dolt/go/store/atomicerr"
"github.com/liquidata-inc/dolt/go/store/chunks"
"github.com/liquidata-inc/dolt/go/store/hash"
)
func TestBlockBufferTableSink(t *testing.T) {
suite.Run(t, &TableSinkSuite{sink: NewBlockBufferTableSink(128)})
}
func TestFixedBufferTableSink(t *testing.T) {
suite.Run(t, &TableSinkSuite{sink: NewFixedBufferTableSink(make([]byte, 32*1024))})
}
// TableSinkSuite is a test suite that exercises a ByteSink implementation.
type TableSinkSuite struct {
// sink is the ByteSink under test
sink ByteSink
// t is the *testing.T stored by SetT and returned by T
t *testing.T
}
// SetT stores the *testing.T that the suite's tests use.
func (suite *TableSinkSuite) SetT(t *testing.T) {
suite.t = t
}
// T returns the *testing.T previously stored with SetT.
func (suite *TableSinkSuite) T() *testing.T {
return suite.t
}
// TestWrite writes a 64 byte payload to the sink 32 times and asserts that none of the writes returns an error.
func (suite *TableSinkSuite) TestWrite() {
	const (
		payloadSize = 64
		numWrites   = 32
	)

	payload := make([]byte, payloadSize)
	for idx := range payload {
		payload[idx] = byte(idx)
	}

	for written := 0; written < numWrites; written++ {
		_, err := suite.sink.Write(payload)
		assert.NoError(suite.t, err)
	}
}
// TestCmpChunkTableWriter writes testMDChunks to a table file, reads them back in compressed form, feeds them to a
// CmpChunkTableWriter and verifies that the table it produces has the expected id and the same contents.
func TestCmpChunkTableWriter(t *testing.T) {
	// Put some chunks in a table file and get the buffer back which contains the table file data
	ctx := context.Background()

	expectedId, buff, err := WriteChunks(testMDChunks)
	require.NoError(t, err)

	// Setup a TableReader to read compressed chunks out of
	ti, err := parseTableIndex(buff)
	require.NoError(t, err)
	tr := newTableReader(ti, tableReaderAtFromBytes(buff), fileBlockSize)

	hashes := make(hash.HashSet)
	for _, chnk := range testMDChunks {
		hashes.Insert(chnk.Hash())
	}

	ae := atomicerr.New()
	wg := &sync.WaitGroup{}
	reqs := toGetRecords(hashes)
	found := make(chan CompressedChunk, 128)

	go func() {
		// closing the channel once all reads are done ends the range loop below
		defer close(found)
		tr.getManyCompressed(ctx, reqs, found, wg, ae, &Stats{})
		wg.Wait()
	}()

	// for all the chunks we find, write them using the compressed writer
	tw := NewCmpChunkTableWriter()
	for cmpChnk := range found {
		err = tw.AddCmpChunk(cmpChnk)
		require.NoError(t, err)
	}

	require.NoError(t, ae.Get())

	id, err := tw.Finish()
	require.NoError(t, err)

	assert.Equal(t, expectedId, id)

	output := bytes.NewBuffer(nil)
	err = tw.Flush(output)
	require.NoError(t, err)

	outputBuff := output.Bytes()
	outputTI, err := parseTableIndex(outputBuff)
	require.NoError(t, err)

	// The output reader has to read from the bytes the CmpChunkTableWriter produced. Reading from the original
	// buffer here would only compare the source table with itself.
	outputTR := newTableReader(outputTI, tableReaderAtFromBytes(outputBuff), fileBlockSize)

	compareContentsOfTables(t, ctx, hashes, tr, outputTR)
}
// compareContentsOfTables reads the chunks for hashes from both readers and asserts that the two readers returned
// the same number of chunks with the same data.
func compareContentsOfTables(t *testing.T, ctx context.Context, hashes hash.HashSet, expectedRd, actualRd tableReader) {
	want, err := readAllChunks(ctx, hashes, expectedRd)
	require.NoError(t, err)

	got, err := readAllChunks(ctx, hashes, actualRd)
	require.NoError(t, err)

	assert.Equal(t, len(want), len(got))
	assert.Equal(t, want, got)
}
// readAllChunks requests the chunks for hashes from reader using getMany and returns the data of every chunk found,
// keyed by the chunk's hash. An error recorded during the read is returned in place of the map.
func readAllChunks(ctx context.Context, hashes hash.HashSet, reader tableReader) (map[hash.Hash][]byte, error) {
	var (
		readsWG = &sync.WaitGroup{}
		readErr = atomicerr.New()
		found   = make(chan *chunks.Chunk, 128)
	)
	reqs := toGetRecords(hashes)

	go func() {
		// the channel is closed only after every read tracked by the wait group has finished
		defer close(found)
		reader.getMany(ctx, reqs, found, readsWG, readErr, &Stats{})
		readsWG.Wait()
	}()

	dataByHash := make(map[hash.Hash][]byte)
	for chnk := range found {
		dataByHash[chnk.Hash()] = chnk.Data()
	}

	if err := readErr.Get(); err != nil {
		return nil, err
	}

	return dataByHash, nil
}

View File

@@ -41,6 +41,10 @@ func WriteChunks(chunks []chunks.Chunk) (string, []byte, error) {
mt := newMemTable(size)
return writeChunksToMT(mt, chunks)
}
func writeChunksToMT(mt *memTable, chunks []chunks.Chunk) (string, []byte, error) {
for _, chunk := range chunks {
if !mt.addChunk(addr(chunk.Hash()), chunk.Data()) {
return "", nil, errors.New("didn't create this memory table with enough space to add all the chunks")
@@ -149,6 +153,10 @@ func (mt *memTable) getMany(ctx context.Context, reqs []getRecord, foundChunks c
return remaining
}
// getManyCompressed is not implemented for memTable; calling it always panics.
func (mt *memTable) getManyCompressed(ctx context.Context, reqs []getRecord, foundCmpChunks chan CompressedChunk, wg *sync.WaitGroup, ae *atomicerr.AtomicError, stats *Stats) bool {
panic("not implemented")
}
func (mt *memTable) extract(ctx context.Context, chunks chan<- extractRecord) error {
for _, hrec := range mt.order {
chunks <- extractRecord{a: *hrec.a, data: mt.chunks[*hrec.a], err: nil}

View File

@@ -38,24 +38,32 @@ import (
"github.com/liquidata-inc/dolt/go/store/types"
)
// testMDChunks is a fixed set of chunks, each an encoded types.String holding a piece of text, shared by the tests
// in this package.
var testMDChunks = []chunks.Chunk{
mustChunk(types.EncodeValue(types.String("Call me Ishmael. Some years ago—never mind how long precisely—having little or no money in my purse, "), types.Format_7_18)),
mustChunk(types.EncodeValue(types.String("and nothing particular to interest me on shore, I thought I would sail about a little and see the watery "), types.Format_7_18)),
mustChunk(types.EncodeValue(types.String("part of the world. It is a way I have of driving off the spleen and regulating the "), types.Format_7_18)),
mustChunk(types.EncodeValue(types.String("circulation. Whenever I find myself growing grim about the mouth; whenever it is a damp, drizzly "), types.Format_7_18)),
mustChunk(types.EncodeValue(types.String("November in my soul; whenever I find myself involuntarily pausing before coffin warehouses, and bringing "), types.Format_7_18)),
mustChunk(types.EncodeValue(types.String("funeral I meet; and especially whenever my hypos get such an upper hand of me, that it requires "), types.Format_7_18)),
mustChunk(types.EncodeValue(types.String("a strong moral principle to prevent me from deliberately stepping into the street, and methodically "), types.Format_7_18)),
mustChunk(types.EncodeValue(types.String("knocking peoples hats off—then, I account it high time to get to sea as soon as I can."), types.Format_7_18)),
}
// testMDChunksSize is the total number of bytes of data in testMDChunks. It is computed once in init.
var testMDChunksSize uint64

func init() {
	for i := range testMDChunks {
		testMDChunksSize += uint64(len(testMDChunks[i].Data()))
	}
}
// mustChunk returns chunk after handing err to d.PanicIfError, which lets a (chunk, error) result be used inline
// in the test fixtures.
func mustChunk(chunk chunks.Chunk, err error) chunks.Chunk {
d.PanicIfError(err)
return chunk
}
func TestWriteChunks(t *testing.T) {
chunks := []chunks.Chunk{
mustChunk(types.EncodeValue(types.String("Call me Ishmael. Some years ago—never mind how long precisely—having little or no money in my purse, "), types.Format_7_18)),
mustChunk(types.EncodeValue(types.String("and nothing particular to interest me on shore, I thought I would sail about a little and see the watery "), types.Format_7_18)),
mustChunk(types.EncodeValue(types.String("part of the world. It is a way I have of driving off the spleen and regulating the "), types.Format_7_18)),
mustChunk(types.EncodeValue(types.String("circulation. Whenever I find myself growing grim about the mouth; whenever it is a damp, drizzly "), types.Format_7_18)),
mustChunk(types.EncodeValue(types.String("November in my soul; whenever I find myself involuntarily pausing before coffin warehouses, and bringing "), types.Format_7_18)),
mustChunk(types.EncodeValue(types.String("funeral I meet; and especially whenever my hypos get such an upper hand of me, that it requires "), types.Format_7_18)),
mustChunk(types.EncodeValue(types.String("a strong moral principle to prevent me from deliberately stepping into the street, and methodically "), types.Format_7_18)),
mustChunk(types.EncodeValue(types.String("knocking peoples hats off—then, I account it high time to get to sea as soon as I can."), types.Format_7_18)),
}
name, data, err := WriteChunks(chunks)
name, data, err := WriteChunks(testMDChunks)
if err != nil {
t.Error(err)
}
@@ -269,6 +277,18 @@ func (crg chunkReaderGroup) getMany(ctx context.Context, reqs []getRecord, found
return true
}
// getManyCompressed asks each reader of the group in turn for the requested chunks in compressed form. It returns
// false as soon as one reader reports false, and true when every reader reported true.
func (crg chunkReaderGroup) getManyCompressed(ctx context.Context, reqs []getRecord, foundCmpChunks chan CompressedChunk, wg *sync.WaitGroup, ae *atomicerr.AtomicError, stats *Stats) bool {
	for _, reader := range crg {
		if !reader.getManyCompressed(ctx, reqs, foundCmpChunks, wg, ae, stats) {
			return false
		}
	}

	return true
}
func (crg chunkReaderGroup) count() (count uint32, err error) {
for _, haver := range crg {
count += mustUint32(haver.count())

View File

@@ -134,6 +134,17 @@ func (ccs *persistingChunkSource) getMany(ctx context.Context, reqs []getRecord,
return cr.getMany(ctx, reqs, foundChunks, wg, ae, stats)
}
// getManyCompressed forwards the request to the underlying reader. When there is no reader, ErrNoReader is
// recorded in ae and false is returned.
func (ccs *persistingChunkSource) getManyCompressed(ctx context.Context, reqs []getRecord, foundCmpChunks chan CompressedChunk, wg *sync.WaitGroup, ae *atomicerr.AtomicError, stats *Stats) bool {
	if cr := ccs.getReader(); cr != nil {
		return cr.getManyCompressed(ctx, reqs, foundCmpChunks, wg, ae, stats)
	}

	ae.SetIfErrAndCheck(ErrNoReader)
	return false
}
func (ccs *persistingChunkSource) wait() error {
ccs.wg.Wait()
return ccs.ae.Get()
@@ -255,6 +266,10 @@ func (ecs emptyChunkSource) getMany(ctx context.Context, reqs []getRecord, found
return true
}
// getManyCompressed finds nothing in an empty chunk source: it sends no chunks and always returns true.
func (ecs emptyChunkSource) getManyCompressed(ctx context.Context, reqs []getRecord, foundCmpChunks chan CompressedChunk, wg *sync.WaitGroup, ae *atomicerr.AtomicError, stats *Stats) bool {
return true
}
// count always returns 0 and a nil error for an empty chunk source.
func (ecs emptyChunkSource) count() (uint32, error) {
return 0, nil
}

View File

@@ -405,6 +405,22 @@ func (nbs *NomsBlockStore) Get(ctx context.Context, h hash.Hash) (chunks.Chunk,
}
// GetMany runs getManyWithFunc for |hashes| with a callback that calls getMany on each chunkReader, passing it
// |foundChunks| as the destination channel.
// NOTE: the callback does not use its |stats| argument; it passes nbs.stats to getMany.
func (nbs *NomsBlockStore) GetMany(ctx context.Context, hashes hash.HashSet, foundChunks chan *chunks.Chunk) error {
return nbs.getManyWithFunc(ctx, hashes, func(ctx context.Context, cr chunkReader, reqs []getRecord, wg *sync.WaitGroup, ae *atomicerr.AtomicError, stats *Stats) bool {
return cr.getMany(ctx, reqs, foundChunks, wg, ae, nbs.stats)
})
}
// GetManyCompressed runs getManyWithFunc for |hashes| with a callback that calls getManyCompressed on each
// chunkReader, passing it |foundCmpChunks| as the destination channel for the CompressedChunks.
// NOTE: the callback does not use its |stats| argument; it passes nbs.stats to getManyCompressed.
func (nbs *NomsBlockStore) GetManyCompressed(ctx context.Context, hashes hash.HashSet, foundCmpChunks chan CompressedChunk) error {
return nbs.getManyWithFunc(ctx, hashes, func(ctx context.Context, cr chunkReader, reqs []getRecord, wg *sync.WaitGroup, ae *atomicerr.AtomicError, stats *Stats) bool {
return cr.getManyCompressed(ctx, reqs, foundCmpChunks, wg, ae, nbs.stats)
})
}
func (nbs *NomsBlockStore) getManyWithFunc(
ctx context.Context,
hashes hash.HashSet,
getManyFunc func(ctx context.Context, cr chunkReader, reqs []getRecord, wg *sync.WaitGroup, ae *atomicerr.AtomicError, stats *Stats) bool,
) error {
t1 := time.Now()
reqs := toGetRecords(hashes)
@@ -424,7 +440,7 @@ func (nbs *NomsBlockStore) GetMany(ctx context.Context, hashes hash.HashSet, fou
tables = nbs.tables
remaining = true
if nbs.mt != nil {
remaining = nbs.mt.getMany(ctx, reqs, foundChunks, nil, ae, nbs.stats)
remaining = getManyFunc(ctx, nbs.mt, reqs, nil, ae, nbs.stats)
}
return
@@ -435,7 +451,7 @@ func (nbs *NomsBlockStore) GetMany(ctx context.Context, hashes hash.HashSet, fou
}
if remaining {
tables.getMany(ctx, reqs, foundChunks, wg, ae, nbs.stats)
getManyFunc(ctx, tables, reqs, wg, ae, nbs.stats)
wg.Wait()
}

View File

@@ -228,6 +228,7 @@ type chunkReader interface {
hasMany(addrs []hasRecord) (bool, error)
get(ctx context.Context, h addr, stats *Stats) ([]byte, error)
getMany(ctx context.Context, reqs []getRecord, foundChunks chan *chunks.Chunk, wg *sync.WaitGroup, ae *atomicerr.AtomicError, stats *Stats) bool
getManyCompressed(ctx context.Context, reqs []getRecord, foundCmpChunks chan CompressedChunk, wg *sync.WaitGroup, ae *atomicerr.AtomicError, stats *Stats) bool
extract(ctx context.Context, chunks chan<- extractRecord) error
count() (uint32, error)
uncompressedLen() (uint64, error)
@@ -244,6 +245,15 @@ type chunkReadPlanner interface {
ae *atomicerr.AtomicError,
stats *Stats,
)
getManyCompressedAtOffsets(
ctx context.Context,
reqs []getRecord,
offsetRecords offsetRecSlice,
foundCmpChunks chan CompressedChunk,
wg *sync.WaitGroup,
ae *atomicerr.AtomicError,
stats *Stats,
)
}
type chunkSource interface {

View File

@@ -37,6 +37,35 @@ import (
"github.com/liquidata-inc/dolt/go/store/hash"
)
// CompressedChunk is a chunk in the compressed form it has in a table file.
type CompressedChunk struct {
// H is the hash the chunk was created with
H hash.Hash
// FullCompressedChunk is the compressed data followed by its checksum
FullCompressedChunk []byte
// CompressedData is FullCompressedChunk without the trailing checksum
CompressedData []byte
}
// NewCompressedChunk creates a CompressedChunk for the hash |h| from |buff|, which must hold the compressed data
// followed by a big-endian checksum of checksumSize bytes. The returned chunk references |buff|; nothing is copied.
// An error is returned when |buff| is too short to hold a checksum or when the stored checksum does not match
// the crc of the compressed data.
func NewCompressedChunk(h hash.Hash, buff []byte) (CompressedChunk, error) {
	// Without this guard the unsigned subtraction below would wrap around for a short buffer and the slice
	// expressions would panic instead of reporting the problem to the caller.
	if uint64(len(buff)) < checksumSize {
		return CompressedChunk{}, errors.New("compressed chunk is too short to contain a checksum")
	}

	dataLen := uint64(len(buff)) - checksumSize

	chksum := binary.BigEndian.Uint32(buff[dataLen:])
	compressedData := buff[:dataLen]

	if chksum != crc(compressedData) {
		return CompressedChunk{}, errors.New("checksum error")
	}

	return CompressedChunk{H: h, FullCompressedChunk: buff, CompressedData: compressedData}, nil
}
// Decompress snappy-decodes the compressed data and returns it as a chunks.Chunk. An error is returned when the
// data cannot be decoded.
func (cmp CompressedChunk) Decompress() (chunks.Chunk, error) {
	data, err := snappy.Decode(nil, cmp.CompressedData)
	if err != nil {
		return chunks.Chunk{}, err
	}

	// A CompressedChunk built without a hash still gets one computed from its data.
	if cmp.H == (hash.Hash{}) {
		return chunks.NewChunk(data), nil
	}

	// The address of the chunk is already known, so reuse it rather than hashing the decoded data again.
	return chunks.NewChunkWithHash(cmp.H, data), nil
}
// ErrInvalidTableFile is the error used to report a table file that is invalid or corrupt.
var ErrInvalidTableFile = errors.New("invalid or corrupt table file")
type tableIndex struct {
@@ -298,17 +327,23 @@ func (tr tableReader) get(ctx context.Context, h addr, stats *Stats) ([]byte, er
return nil, errors.New("failed to read all data")
}
data, err := tr.parseChunk(buff)
cmp, err := NewCompressedChunk(hash.Hash(h), buff)
if err != nil {
return nil, err
}
if data == nil {
if len(cmp.CompressedData) == 0 {
return nil, errors.New("failed to get data")
}
return data, nil
chnk, err := cmp.Decompress()
if err != nil {
return nil, err
}
return chnk.Data(), nil
}
type offsetRec struct {
@@ -323,6 +358,20 @@ func (hs offsetRecSlice) Len() int { return len(hs) }
func (hs offsetRecSlice) Less(i, j int) bool { return hs[i].offset < hs[j].offset }
func (hs offsetRecSlice) Swap(i, j int) { hs[i], hs[j] = hs[j], hs[i] }
// readCompressedAtOffsets calls readAtOffsetsWithCB with a callback that sends every CompressedChunk it is given
// to |foundCmpChunks| without decompressing it.
func (tr tableReader) readCompressedAtOffsets(
	ctx context.Context,
	readStart, readEnd uint64,
	reqs []getRecord,
	offsets offsetRecSlice,
	foundCmpChunks chan CompressedChunk,
	stats *Stats,
) error {
	sendCompressed := func(cmp CompressedChunk) error {
		foundCmpChunks <- cmp
		return nil
	}

	return tr.readAtOffsetsWithCB(ctx, readStart, readEnd, reqs, offsets, stats, sendCompressed)
}
func (tr tableReader) readAtOffsets(
ctx context.Context,
readStart, readEnd uint64,
@@ -330,6 +379,26 @@ func (tr tableReader) readAtOffsets(
offsets offsetRecSlice,
foundChunks chan *chunks.Chunk,
stats *Stats,
) error {
return tr.readAtOffsetsWithCB(ctx, readStart, readEnd, reqs, offsets, stats, func(cmp CompressedChunk) error {
chk, err := cmp.Decompress()
if err != nil {
return err
}
foundChunks <- &chk
return nil
})
}
func (tr tableReader) readAtOffsetsWithCB(
ctx context.Context,
readStart, readEnd uint64,
reqs []getRecord,
offsets offsetRecSlice,
stats *Stats,
cb func(cmp CompressedChunk) error,
) error {
readLength := readEnd - readStart
buff := make([]byte, readLength)
@@ -356,14 +425,17 @@ func (tr tableReader) readAtOffsets(
return errors.New("length goes past the end")
}
data, err := tr.parseChunk(buff[localStart:localEnd])
cmp, err := NewCompressedChunk(hash.Hash(*rec.a), buff[localStart:localEnd])
if err != nil {
return err
}
c := chunks.NewChunkWithHash(hash.Hash(*rec.a), data)
foundChunks <- &c
err = cb(cmp)
if err != nil {
return err
}
}
return nil
@@ -385,6 +457,33 @@ func (tr tableReader) getMany(
tr.getManyAtOffsets(ctx, reqs, offsetRecords, foundChunks, wg, ae, stats)
return remaining
}
// getManyCompressed finds the table locations of the chunks requested in |reqs| and starts the reads which send
// them, still compressed, to |foundCmpChunks|. The value returned is the second result of findOffsets.
func (tr tableReader) getManyCompressed(ctx context.Context, reqs []getRecord, foundCmpChunks chan CompressedChunk, wg *sync.WaitGroup, ae *atomicerr.AtomicError, stats *Stats) bool {
	// Pass #1: Iterate over |reqs| and |tr.prefixes| (both sorted by address) and build the set
	// of table locations which must be read in order to satisfy the getManyCompressed operation.
	locations, remaining := tr.findOffsets(reqs)

	tr.getManyCompressedAtOffsets(ctx, reqs, locations, foundCmpChunks, wg, ae, stats)

	return remaining
}
// getManyCompressedAtOffsets calls getManyAtOffsetsWithReadFunc with a read function that uses
// readCompressedAtOffsets, so the chunks read are sent to |foundCmpChunks| in compressed form.
func (tr tableReader) getManyCompressedAtOffsets(
	ctx context.Context,
	reqs []getRecord,
	offsetRecords offsetRecSlice,
	foundCmpChunks chan CompressedChunk,
	wg *sync.WaitGroup,
	ae *atomicerr.AtomicError,
	stats *Stats,
) {
	readCompressed := func(
		ctx context.Context,
		readStart, readEnd uint64,
		reqs []getRecord,
		offsets offsetRecSlice,
		stats *Stats,
	) error {
		return tr.readCompressedAtOffsets(ctx, readStart, readEnd, reqs, offsets, foundCmpChunks, stats)
	}

	tr.getManyAtOffsetsWithReadFunc(ctx, reqs, offsetRecords, wg, ae, stats, readCompressed)
}
func (tr tableReader) getManyAtOffsets(
ctx context.Context,
@@ -395,7 +494,31 @@ func (tr tableReader) getManyAtOffsets(
ae *atomicerr.AtomicError,
stats *Stats,
) {
tr.getManyAtOffsetsWithReadFunc(ctx, reqs, offsetRecords, wg, ae, stats, func(
ctx context.Context,
readStart, readEnd uint64,
reqs []getRecord,
offsets offsetRecSlice,
stats *Stats) error {
return tr.readAtOffsets(ctx, readStart, readEnd, reqs, offsets, foundChunks, stats)
})
}
func (tr tableReader) getManyAtOffsetsWithReadFunc(
ctx context.Context,
reqs []getRecord,
offsetRecords offsetRecSlice,
wg *sync.WaitGroup,
ae *atomicerr.AtomicError,
stats *Stats,
readAtOffsets func(
ctx context.Context,
readStart, readEnd uint64,
reqs []getRecord,
offsets offsetRecSlice,
stats *Stats) error,
) {
// Now |offsetRecords| contains all locations within the table which must be search (note
// that there may be duplicates of a particular location). Sort by offset and scan forward,
// grouping sequences of reads into large physical reads.
@@ -433,7 +556,7 @@ func (tr tableReader) getManyAtOffsets(
goBatch := batch
go func(batch offsetRecSlice) {
defer wg.Done()
err := tr.readAtOffsets(ctx, goReadStart, goReadEnd, reqs, goBatch, foundChunks, stats)
err := readAtOffsets(ctx, goReadStart, goReadEnd, reqs, goBatch, stats)
ae.SetIfError(err)
}(batch)
batch = nil
@@ -444,7 +567,7 @@ func (tr tableReader) getManyAtOffsets(
wg.Add(1)
go func(batch offsetRecSlice) {
defer wg.Done()
err := tr.readAtOffsets(ctx, readStart, readEnd, reqs, batch, foundChunks, stats)
err := readAtOffsets(ctx, readStart, readEnd, reqs, batch, stats)
ae.SetIfError(err)
}(batch)
batch = nil
@@ -515,25 +638,6 @@ func canReadAhead(fRec offsetRec, fLength uint32, readStart, readEnd, blockSize
return fRec.offset + uint64(fLength), true
}
// Fetches the byte stream of data logically encoded within the table starting at |pos|.
func (tr tableReader) parseChunk(buff []byte) ([]byte, error) {
dataLen := uint64(len(buff)) - checksumSize
chksum := binary.BigEndian.Uint32(buff[dataLen:])
if chksum != crc(buff[:dataLen]) {
return nil, errors.New("checksum error")
}
data, err := snappy.Decode(nil, buff[:dataLen])
if err != nil {
return nil, errors.New("decode error - likely corrupt data")
}
return data, nil
}
func (tr tableReader) calcReads(reqs []getRecord, blockSize uint64) (reads int, remaining bool, err error) {
var offsetRecords offsetRecSlice
// Pass #1: Build the set of table locations which must be read in order to find all the elements of |reqs| which are present in this table.
@@ -596,13 +700,20 @@ func (tr tableReader) extract(ctx context.Context, chunks chan<- extractRecord)
sendChunk := func(i uint32) error {
localOffset := tr.offsets[i] - tr.offsets[0]
data, err := tr.parseChunk(buff[localOffset : localOffset+uint64(tr.lengths[i])])
cmp, err := NewCompressedChunk(hash.Hash(hashes[i]), buff[localOffset:localOffset+uint64(tr.lengths[i])])
if err != nil {
return err
}
chunks <- extractRecord{a: hashes[i], data: data}
chnk, err := cmp.Decompress()
if err != nil {
return err
}
chunks <- extractRecord{a: hashes[i], data: chnk.Data()}
return nil
}

View File

@@ -167,6 +167,42 @@ func (ts tableSet) getMany(ctx context.Context, reqs []getRecord, foundChunks ch
return f(ts.novel) && f(ts.upstream)
}
// getManyCompressed requests the chunks in |reqs|, in compressed form, from the novel chunk sources and then, if
// that pass returned true, from the upstream chunk sources. Chunks found are sent to |foundCmpChunks|. It returns
// false as soon as |ae| holds an error or a source reports false for its "remaining" result (presumably meaning
// that no requests are left unsatisfied — confirm against findOffsets); otherwise it returns true.
func (ts tableSet) getManyCompressed(ctx context.Context, reqs []getRecord, foundCmpChunks chan CompressedChunk, wg *sync.WaitGroup, ae *atomicerr.AtomicError, stats *Stats) bool {
f := func(css chunkSources) bool {
for _, haver := range css {
// stop visiting further sources once an error has been recorded
if ae.IsSet() {
return false
}
// A source that is a chunkReadPlanner has its offsets computed synchronously here, while the reads
// themselves are started from a separate goroutine which is tracked by |wg|.
if rp, ok := haver.(chunkReadPlanner); ok {
offsets, remaining := rp.findOffsets(reqs)
wg.Add(1)
go func() {
defer wg.Done()
rp.getManyCompressedAtOffsets(ctx, reqs, offsets, foundCmpChunks, wg, ae, stats)
}()
if !remaining {
return false
}
continue
}
// any other source is asked directly
remaining := haver.getManyCompressed(ctx, reqs, foundCmpChunks, wg, ae, stats)
if !remaining {
return false
}
}
return true
}
// upstream sources are only consulted when the pass over the novel sources returned true
return f(ts.novel) && f(ts.upstream)
}
func (ts tableSet) calcReads(reqs []getRecord, blockSize uint64) (reads int, split, remaining bool, err error) {
f := func(css chunkSources) (int, bool, bool, error) {
reads, split := 0, false