mirror of
https://github.com/dolthub/dolt.git
synced 2026-02-07 03:09:05 -06:00
NBS: Calculate maxTableSize precisely (#3165)
Though Raf and I can't figure out how, it's clear that the method we initially used for calculating the max amount of space for snappy-compressed chunk data was incorrect. That's the root cause of of all the chunks to be written and summing the snappy.MaxEncodedLen() for each. Fixes #3156
This commit is contained in:
@@ -81,10 +81,10 @@ func (ccs *compactingChunkSource) count() uint32 {
|
||||
return ccs.cs.count()
|
||||
}
|
||||
|
||||
func (ccs *compactingChunkSource) byteLen() uint64 {
|
||||
func (ccs *compactingChunkSource) lens() []uint32 {
|
||||
ccs.wg.Wait()
|
||||
d.Chk.True(ccs.cs != nil)
|
||||
return ccs.cs.byteLen()
|
||||
return ccs.cs.lens()
|
||||
}
|
||||
|
||||
func (ccs *compactingChunkSource) hash() addr {
|
||||
@@ -131,8 +131,8 @@ func (ecs emptyChunkSource) count() uint32 {
|
||||
return 0
|
||||
}
|
||||
|
||||
func (ecs emptyChunkSource) byteLen() uint64 {
|
||||
return 0
|
||||
func (ecs emptyChunkSource) lens() []uint32 {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (ecs emptyChunkSource) hash() addr {
|
||||
|
||||
@@ -20,17 +20,7 @@ type fsTablePersister struct {
|
||||
}
|
||||
|
||||
func (ftp fsTablePersister) Compact(mt *memTable, haver chunkReader) chunkSource {
|
||||
name, data, count, errata := mt.write(haver)
|
||||
// TODO: remove when BUG 3156 is fixed
|
||||
for h, eData := range errata {
|
||||
func() {
|
||||
temp, err := ioutil.TempFile(ftp.dir, "errata-"+h.String())
|
||||
d.PanicIfError(err)
|
||||
defer checkClose(temp)
|
||||
io.Copy(temp, bytes.NewReader(eData))
|
||||
}()
|
||||
}
|
||||
return ftp.persistTable(name, data, count)
|
||||
return ftp.persistTable(mt.write(haver))
|
||||
}
|
||||
|
||||
func (ftp fsTablePersister) persistTable(name addr, data []byte, chunkCount uint32) chunkSource {
|
||||
|
||||
@@ -5,8 +5,6 @@
|
||||
package nbs
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
"sort"
|
||||
"sync"
|
||||
|
||||
@@ -52,8 +50,12 @@ func (mt *memTable) count() uint32 {
|
||||
return uint32(len(mt.order))
|
||||
}
|
||||
|
||||
func (mt *memTable) byteLen() uint64 {
|
||||
return mt.totalData
|
||||
func (mt *memTable) lens() []uint32 {
|
||||
lengths := make([]uint32, 0, len(mt.chunks))
|
||||
for _, data := range mt.chunks {
|
||||
lengths = append(lengths, uint32(len(data)))
|
||||
}
|
||||
return lengths
|
||||
}
|
||||
|
||||
func (mt *memTable) has(h addr) (has bool) {
|
||||
@@ -106,8 +108,8 @@ func (mt *memTable) extract(order EnumerationOrder, chunks chan<- extractRecord)
|
||||
}
|
||||
}
|
||||
|
||||
func (mt *memTable) write(haver chunkReader) (name addr, data []byte, count uint32, errata map[addr][]byte) {
|
||||
maxSize := maxTableSize(uint64(len(mt.order)), mt.totalData)
|
||||
func (mt *memTable) write(haver chunkReader) (name addr, data []byte, count uint32) {
|
||||
maxSize := maxTableSize(mt.lens())
|
||||
buff := make([]byte, maxSize)
|
||||
tw := newTableWriter(buff, mt.snapper)
|
||||
|
||||
@@ -126,13 +128,5 @@ func (mt *memTable) write(haver chunkReader) (name addr, data []byte, count uint
|
||||
}
|
||||
tableSize, name := tw.finish()
|
||||
|
||||
// TODO: remove when BUG 3156 is fixed
|
||||
if len(tw.errata) > 0 {
|
||||
fmt.Fprintf(os.Stderr, "BUG 3156: table %s; %d chunks, %d total data; max table size %d\n", name.String(), len(mt.order), mt.totalData, maxSize)
|
||||
for h, data := range tw.errata {
|
||||
fmt.Fprintf(os.Stderr, " Failed to write %s of uncompressed length %d\n", h.String(), len(data))
|
||||
}
|
||||
}
|
||||
|
||||
return name, buff[:tableSize], count, tw.errata
|
||||
return name, buff[:tableSize], count
|
||||
}
|
||||
|
||||
@@ -89,7 +89,7 @@ func TestMemTableWrite(t *testing.T) {
|
||||
assert.True(tr1.has(computeAddr(chunks[1])))
|
||||
assert.True(tr2.has(computeAddr(chunks[2])))
|
||||
|
||||
_, data, count, _ := mt.write(chunkReaderGroup{tr1, tr2})
|
||||
_, data, count := mt.write(chunkReaderGroup{tr1, tr2})
|
||||
assert.Equal(uint32(1), count)
|
||||
|
||||
outReader := newTableReader(parseTableIndex(data), bytes.NewReader(data), fileBlockSize)
|
||||
@@ -98,7 +98,7 @@ func TestMemTableWrite(t *testing.T) {
|
||||
assert.False(outReader.has(computeAddr(chunks[2])))
|
||||
}
|
||||
|
||||
func TestMemTableWriteErrata(t *testing.T) {
|
||||
func TestMemTableSnappyWriteOutOfLine(t *testing.T) {
|
||||
assert := assert.New(t)
|
||||
mt := newMemTable(1024)
|
||||
|
||||
@@ -111,19 +111,9 @@ func TestMemTableWriteErrata(t *testing.T) {
|
||||
for _, c := range chunks {
|
||||
assert.True(mt.addChunk(computeAddr(c), c))
|
||||
}
|
||||
mt.snapper = &outOfLineSnappy{[]bool{false, true, false}} // chunks[1] should wind up in errata
|
||||
mt.snapper = &outOfLineSnappy{[]bool{false, true, false}} // chunks[1] should wind up getting written "out of line"
|
||||
|
||||
_, data, count, errata := mt.write(nil)
|
||||
assert.EqualValues(3, count)
|
||||
|
||||
outReader := newTableReader(parseTableIndex(data), bytes.NewReader(data), fileBlockSize)
|
||||
assertChunksInReader(chunks, outReader, assert)
|
||||
|
||||
if assert.Len(errata, 1) {
|
||||
data, present := errata[computeAddr(chunks[1])]
|
||||
assert.True(present)
|
||||
assert.EqualValues(chunks[1], data)
|
||||
}
|
||||
assert.Panics(func() { mt.write(nil) })
|
||||
}
|
||||
|
||||
type outOfLineSnappy struct {
|
||||
@@ -187,9 +177,9 @@ func (crg chunkReaderGroup) count() (count uint32) {
|
||||
return
|
||||
}
|
||||
|
||||
func (crg chunkReaderGroup) byteLen() (data uint64) {
|
||||
func (crg chunkReaderGroup) lens() (lengths []uint32) {
|
||||
for _, haver := range crg {
|
||||
data += haver.byteLen()
|
||||
lengths = append(lengths, haver.lens()...)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
@@ -197,7 +197,7 @@ type fakeTablePersister struct {
|
||||
|
||||
func (ftp fakeTablePersister) Compact(mt *memTable, haver chunkReader) chunkSource {
|
||||
if mt.count() > 0 {
|
||||
name, data, chunkCount, _ := mt.write(haver)
|
||||
name, data, chunkCount := mt.write(haver)
|
||||
if chunkCount > 0 {
|
||||
ftp.sources[name] = newTableReader(parseTableIndex(data), bytes.NewReader(data), fileBlockSize)
|
||||
return chunkSourceAdapter{ftp.sources[name], name}
|
||||
|
||||
@@ -36,12 +36,7 @@ type s3UploadedPart struct {
|
||||
}
|
||||
|
||||
func (s3p s3TablePersister) Compact(mt *memTable, haver chunkReader) chunkSource {
|
||||
name, data, count, errata := mt.write(haver)
|
||||
// TODO: remove when BUG 3156 is fixed
|
||||
for h, eData := range errata {
|
||||
s3p.multipartUpload(eData, "errata-"+h.String())
|
||||
}
|
||||
return s3p.persistTable(name, data, count)
|
||||
return s3p.persistTable(mt.write(haver))
|
||||
}
|
||||
|
||||
func (s3p s3TablePersister) persistTable(name addr, data []byte, chunkCount uint32) chunkSource {
|
||||
|
||||
@@ -36,7 +36,7 @@ func TestS3TablePersisterCompact(t *testing.T) {
|
||||
}
|
||||
|
||||
func calcPartSize(rdr chunkReader, maxPartNum int) int {
|
||||
return int(maxTableSize(uint64(rdr.count()), rdr.byteLen())) / maxPartNum
|
||||
return int(maxTableSize(rdr.lens())) / maxPartNum
|
||||
}
|
||||
|
||||
func TestS3TablePersisterCompactSinglePart(t *testing.T) {
|
||||
@@ -135,11 +135,11 @@ func TestS3TablePersisterCompactAll(t *testing.T) {
|
||||
}
|
||||
|
||||
func bytesToChunkSource(bs ...[]byte) chunkSource {
|
||||
sum := 0
|
||||
lengths := []uint32{}
|
||||
for _, b := range bs {
|
||||
sum += len(b)
|
||||
lengths = append(lengths, uint32(len(b)))
|
||||
}
|
||||
maxSize := maxTableSize(uint64(len(bs)), uint64(sum))
|
||||
maxSize := maxTableSize(lengths)
|
||||
buff := make([]byte, maxSize)
|
||||
tw := newTableWriter(buff, nil)
|
||||
for _, b := range bs {
|
||||
|
||||
@@ -205,7 +205,7 @@ type chunkReader interface {
|
||||
get(h addr) []byte
|
||||
getMany(reqs []getRecord, foundChunks chan *chunks.Chunk, wg *sync.WaitGroup) bool
|
||||
count() uint32
|
||||
byteLen() uint64
|
||||
lens() []uint32
|
||||
extract(order EnumerationOrder, chunks chan<- extractRecord)
|
||||
}
|
||||
|
||||
|
||||
@@ -57,16 +57,16 @@ func (csbc chunkSourcesByDescendingCount) Swap(i, j int) { csbc[i], csbc[j] = cs
|
||||
|
||||
func compactSourcesToBuffer(sources chunkSources, rl chan struct{}) (name addr, data []byte, chunkCount uint32) {
|
||||
d.Chk.True(rl != nil)
|
||||
totalData := uint64(0)
|
||||
|
||||
lengths := []uint32{}
|
||||
for _, src := range sources {
|
||||
chunkCount += src.count()
|
||||
totalData += src.byteLen()
|
||||
lengths = append(lengths, src.lens()...)
|
||||
}
|
||||
if chunkCount == 0 {
|
||||
if len(lengths) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
maxSize := maxTableSize(uint64(chunkCount), totalData)
|
||||
maxSize := maxTableSize(lengths)
|
||||
buff := make([]byte, maxSize) // This can blow up RAM (BUG 3130)
|
||||
tw := newTableWriter(buff, nil)
|
||||
|
||||
|
||||
@@ -19,7 +19,6 @@ import (
|
||||
|
||||
type tableIndex struct {
|
||||
chunkCount uint32
|
||||
totalPhysicalData uint64
|
||||
prefixes, offsets []uint64
|
||||
lengths, ordinals []uint32
|
||||
suffixes []byte
|
||||
@@ -40,9 +39,8 @@ func parseTableIndex(buff []byte) tableIndex {
|
||||
pos -= magicNumberSize
|
||||
d.Chk.True(string(buff[pos:]) == magicNumber)
|
||||
|
||||
// total chunk data
|
||||
// Ignore total chunk data
|
||||
pos -= uint64Size
|
||||
totalPhysicalData := binary.BigEndian.Uint64(buff[pos:])
|
||||
|
||||
pos -= uint32Size
|
||||
chunkCount := binary.BigEndian.Uint32(buff[pos:])
|
||||
@@ -62,7 +60,7 @@ func parseTableIndex(buff []byte) tableIndex {
|
||||
prefixes, ordinals := computePrefixes(chunkCount, buff[pos:pos+tuplesSize])
|
||||
|
||||
return tableIndex{
|
||||
chunkCount, totalPhysicalData,
|
||||
chunkCount,
|
||||
prefixes, offsets,
|
||||
lengths, ordinals,
|
||||
suffixes,
|
||||
@@ -188,8 +186,8 @@ func (tr tableReader) count() uint32 {
|
||||
return tr.chunkCount
|
||||
}
|
||||
|
||||
func (tr tableReader) byteLen() uint64 {
|
||||
return tr.totalPhysicalData
|
||||
func (tr tableReader) lens() []uint32 {
|
||||
return tr.lengths
|
||||
}
|
||||
|
||||
// returns true iff |h| can be found in this table.
|
||||
|
||||
@@ -117,14 +117,16 @@ func (ts tableSet) count() uint32 {
|
||||
return f(ts.novel) + f(ts.upstream)
|
||||
}
|
||||
|
||||
func (ts tableSet) byteLen() uint64 {
|
||||
f := func(css chunkSources) (data uint64) {
|
||||
func (ts tableSet) lens() (lengths []uint32) {
|
||||
f := func(css chunkSources) {
|
||||
for _, haver := range css {
|
||||
data += haver.byteLen()
|
||||
lengths = append(lengths, haver.lens()...)
|
||||
}
|
||||
return
|
||||
}
|
||||
return f(ts.novel) + f(ts.upstream)
|
||||
f(ts.novel)
|
||||
f(ts.upstream)
|
||||
return
|
||||
}
|
||||
|
||||
// Size returns the number of tables in this tableSet.
|
||||
|
||||
@@ -21,11 +21,11 @@ import (
|
||||
)
|
||||
|
||||
func buildTable(chunks [][]byte) ([]byte, addr) {
|
||||
totalData := uint64(0)
|
||||
lengths := []uint32{}
|
||||
for _, chunk := range chunks {
|
||||
totalData += uint64(len(chunk))
|
||||
lengths = append(lengths, uint32(len(chunk)))
|
||||
}
|
||||
capacity := maxTableSize(uint64(len(chunks)), totalData)
|
||||
capacity := maxTableSize(lengths)
|
||||
|
||||
buff := make([]byte, capacity)
|
||||
|
||||
@@ -125,9 +125,12 @@ func TestHasManySequentialPrefix(t *testing.T) {
|
||||
}
|
||||
|
||||
bogusData := []byte("bogus") // doesn't matter what this is. hasMany() won't check chunkRecords
|
||||
totalData := uint64(len(bogusData) * len(addrs))
|
||||
lengths := []uint32{}
|
||||
for range addrs {
|
||||
lengths = append(lengths, uint32(len(bogusData)))
|
||||
}
|
||||
|
||||
capacity := maxTableSize(uint64(len(addrs)), totalData)
|
||||
capacity := maxTableSize(lengths)
|
||||
buff := make([]byte, capacity)
|
||||
tw := newTableWriter(buff, nil)
|
||||
|
||||
|
||||
@@ -9,7 +9,6 @@ import (
|
||||
"encoding/binary"
|
||||
"fmt"
|
||||
"hash"
|
||||
"os"
|
||||
"sort"
|
||||
|
||||
"github.com/attic-labs/noms/go/d"
|
||||
@@ -24,7 +23,6 @@ type tableWriter struct {
|
||||
prefixes prefixIndexSlice // TODO: This is in danger of exploding memory
|
||||
blockHash hash.Hash
|
||||
|
||||
errata map[addr][]byte // TODO: Get rid of this once we've diagnosed and fixed BUG 3156
|
||||
snapper snappyEncoder
|
||||
}
|
||||
|
||||
@@ -38,12 +36,14 @@ func (r realSnappyEncoder) Encode(dst, src []byte) []byte {
|
||||
return snappy.Encode(dst, src)
|
||||
}
|
||||
|
||||
func maxTableSize(numChunks, totalData uint64) uint64 {
|
||||
avgChunkSize := totalData / numChunks
|
||||
d.Chk.True(avgChunkSize < maxChunkSize)
|
||||
maxSnappySize := snappy.MaxEncodedLen(int(avgChunkSize))
|
||||
d.Chk.True(maxSnappySize > 0)
|
||||
return numChunks*(prefixTupleSize+lengthSize+addrSuffixSize+checksumSize+uint64(maxSnappySize)) + footerSize
|
||||
func maxTableSize(lengths []uint32) (max uint64) {
|
||||
max = footerSize
|
||||
for _, length := range lengths {
|
||||
max += prefixTupleSize + lengthSize + addrSuffixSize + checksumSize // Metadata
|
||||
d.Chk.True(int64(length) < maxInt)
|
||||
max += uint64(snappy.MaxEncodedLen(int(length)))
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func indexSize(numChunks uint32) uint64 {
|
||||
@@ -58,7 +58,6 @@ func newTableWriter(buff []byte, snapper snappyEncoder) *tableWriter {
|
||||
return &tableWriter{
|
||||
buff: buff,
|
||||
blockHash: sha512.New(),
|
||||
errata: map[addr][]byte{},
|
||||
snapper: snapper,
|
||||
}
|
||||
}
|
||||
@@ -72,19 +71,12 @@ func (tw *tableWriter) addChunk(h addr, data []byte) bool {
|
||||
compressed := tw.snapper.Encode(tw.buff[tw.pos:], data)
|
||||
dataLength := uint64(len(compressed))
|
||||
|
||||
// BUG 3156 indicates that, sometimes, snappy decides that there's not enough space in tw.buff[tw.pos:] to encode into.
|
||||
// This _should not be_, because we believe that we allocate enough space in |tw.buff| to cover snappy's worst-case but...we've seen some instances.
|
||||
// BUG 3156 indicated that, sometimes, snappy decided that there's not enough space in tw.buff[tw.pos:] to encode into.
|
||||
// This _should never happen anymore be_, because we iterate over all chunks to be added and sum the max amount of space that snappy says it might need.
|
||||
// Since we know that |data| can't be 0-length, we also know that the compressed version of |data| has length greater than zero. The first element in a snappy-encoded blob is a Uvarint indicating how much data is present. Therefore, if there's a Uvarint-encoded 0 at tw.buff[tw.pos:], we know that snappy did not write anything there and we have a problem.
|
||||
if v, n := binary.Uvarint(tw.buff[tw.pos:]); v == 0 {
|
||||
d.Chk.True(n != 0)
|
||||
d.Chk.True(uint64(len(tw.buff[tw.pos:])) >= dataLength)
|
||||
|
||||
fmt.Fprintf(os.Stderr, "BUG 3156: unbuffered chunk %s: uncompressed %d, compressed %d, snappy max %d, tw.buff %d\n", h.String(), len(data), dataLength, snappy.MaxEncodedLen(len(data)), len(tw.buff[tw.pos:]))
|
||||
|
||||
// Copy the compressed data over to tw.buff.
|
||||
copy(tw.buff[tw.pos:], compressed)
|
||||
// Store the uncompressed data, so code with access to durable storage can save it off for analysis.
|
||||
tw.errata[h] = data
|
||||
panic(fmt.Errorf("BUG 3156: unbuffered chunk %s: uncompressed %d, compressed %d, snappy max %d, tw.buff %d\n", h.String(), len(data), dataLength, snappy.MaxEncodedLen(len(data)), len(tw.buff[tw.pos:])))
|
||||
}
|
||||
|
||||
tw.pos += dataLength
|
||||
|
||||
Reference in New Issue
Block a user