diff --git a/go/nbs/compacting_chunk_source.go b/go/nbs/compacting_chunk_source.go
index b5db1ef01c..abf7acc0b7 100644
--- a/go/nbs/compacting_chunk_source.go
+++ b/go/nbs/compacting_chunk_source.go
@@ -81,10 +81,10 @@ func (ccs *compactingChunkSource) count() uint32 {
 	return ccs.cs.count()
 }
 
-func (ccs *compactingChunkSource) byteLen() uint64 {
+func (ccs *compactingChunkSource) lens() []uint32 {
 	ccs.wg.Wait()
 	d.Chk.True(ccs.cs != nil)
-	return ccs.cs.byteLen()
+	return ccs.cs.lens()
 }
 
 func (ccs *compactingChunkSource) hash() addr {
@@ -131,8 +131,8 @@ func (ecs emptyChunkSource) count() uint32 {
 	return 0
 }
 
-func (ecs emptyChunkSource) byteLen() uint64 {
-	return 0
+func (ecs emptyChunkSource) lens() []uint32 {
+	return nil
 }
 
 func (ecs emptyChunkSource) hash() addr {
diff --git a/go/nbs/file_table_persister.go b/go/nbs/file_table_persister.go
index b5c14e5af8..b3bf298161 100644
--- a/go/nbs/file_table_persister.go
+++ b/go/nbs/file_table_persister.go
@@ -20,17 +20,7 @@ type fsTablePersister struct {
 }
 
 func (ftp fsTablePersister) Compact(mt *memTable, haver chunkReader) chunkSource {
-	name, data, count, errata := mt.write(haver)
-	// TODO: remove when BUG 3156 is fixed
-	for h, eData := range errata {
-		func() {
-			temp, err := ioutil.TempFile(ftp.dir, "errata-"+h.String())
-			d.PanicIfError(err)
-			defer checkClose(temp)
-			io.Copy(temp, bytes.NewReader(eData))
-		}()
-	}
-	return ftp.persistTable(name, data, count)
+	return ftp.persistTable(mt.write(haver))
 }
 
 func (ftp fsTablePersister) persistTable(name addr, data []byte, chunkCount uint32) chunkSource {
diff --git a/go/nbs/mem_table.go b/go/nbs/mem_table.go
index b6e3f4028e..68f4de0e5d 100644
--- a/go/nbs/mem_table.go
+++ b/go/nbs/mem_table.go
@@ -5,8 +5,6 @@
 package nbs
 
 import (
-	"fmt"
-	"os"
 	"sort"
 	"sync"
 
@@ -52,8 +50,12 @@ func (mt *memTable) count() uint32 {
 	return uint32(len(mt.order))
 }
 
-func (mt *memTable) byteLen() uint64 {
-	return mt.totalData
+func (mt *memTable) lens() []uint32 {
+	lengths := make([]uint32, 0, len(mt.chunks))
+	for _, data := range mt.chunks {
+		lengths = append(lengths, uint32(len(data)))
+	}
+	return lengths
 }
 
 func (mt *memTable) has(h addr) (has bool) {
@@ -106,8 +108,8 @@ func (mt *memTable) extract(order EnumerationOrder, chunks chan<- extractRecord)
 	}
 }
 
-func (mt *memTable) write(haver chunkReader) (name addr, data []byte, count uint32, errata map[addr][]byte) {
-	maxSize := maxTableSize(uint64(len(mt.order)), mt.totalData)
+func (mt *memTable) write(haver chunkReader) (name addr, data []byte, count uint32) {
+	maxSize := maxTableSize(mt.lens())
 	buff := make([]byte, maxSize)
 	tw := newTableWriter(buff, mt.snapper)
 
@@ -126,13 +128,5 @@ func (mt *memTable) write(haver chunkReader) (name addr, data []byte, count uint
 	}
 	tableSize, name := tw.finish()
 
-	// TODO: remove when BUG 3156 is fixed
-	if len(tw.errata) > 0 {
-		fmt.Fprintf(os.Stderr, "BUG 3156: table %s; %d chunks, %d total data; max table size %d\n", name.String(), len(mt.order), mt.totalData, maxSize)
-		for h, data := range tw.errata {
-			fmt.Fprintf(os.Stderr, "	Failed to write %s of uncompressed length %d\n", h.String(), len(data))
-		}
-	}
-
-	return name, buff[:tableSize], count, tw.errata
+	return name, buff[:tableSize], count
 }
diff --git a/go/nbs/mem_table_test.go b/go/nbs/mem_table_test.go
index 5d3781fa78..8b58e8857b 100644
--- a/go/nbs/mem_table_test.go
+++ b/go/nbs/mem_table_test.go
@@ -89,7 +89,7 @@ func TestMemTableWrite(t *testing.T) {
 	assert.True(tr1.has(computeAddr(chunks[1])))
 	assert.True(tr2.has(computeAddr(chunks[2])))
 
-	_, data, count, _ := mt.write(chunkReaderGroup{tr1, tr2})
+	_, data, count := mt.write(chunkReaderGroup{tr1, tr2})
 	assert.Equal(uint32(1), count)
 
 	outReader := newTableReader(parseTableIndex(data), bytes.NewReader(data), fileBlockSize)
@@ -98,7 +98,7 @@ func TestMemTableWrite(t *testing.T) {
 	assert.False(outReader.has(computeAddr(chunks[2])))
 }
 
-func TestMemTableWriteErrata(t *testing.T) {
+func TestMemTableSnappyWriteOutOfLine(t *testing.T) {
 	assert := assert.New(t)
 
 	mt := newMemTable(1024)
@@ -111,19 +111,9 @@ func TestMemTableWriteErrata(t *testing.T) {
 	for _, c := range chunks {
 		assert.True(mt.addChunk(computeAddr(c), c))
 	}
-	mt.snapper = &outOfLineSnappy{[]bool{false, true, false}} // chunks[1] should wind up in errata
+	mt.snapper = &outOfLineSnappy{[]bool{false, true, false}} // chunks[1] should wind up getting written "out of line"
 
-	_, data, count, errata := mt.write(nil)
-	assert.EqualValues(3, count)
-
-	outReader := newTableReader(parseTableIndex(data), bytes.NewReader(data), fileBlockSize)
-	assertChunksInReader(chunks, outReader, assert)
-
-	if assert.Len(errata, 1) {
-		data, present := errata[computeAddr(chunks[1])]
-		assert.True(present)
-		assert.EqualValues(chunks[1], data)
-	}
+	assert.Panics(func() { mt.write(nil) })
 }
 
 type outOfLineSnappy struct {
@@ -187,9 +177,9 @@ func (crg chunkReaderGroup) count() (count uint32) {
 	return
 }
 
-func (crg chunkReaderGroup) byteLen() (data uint64) {
+func (crg chunkReaderGroup) lens() (lengths []uint32) {
 	for _, haver := range crg {
-		data += haver.byteLen()
+		lengths = append(lengths, haver.lens()...)
 	}
 	return
 }
diff --git a/go/nbs/root_tracker_test.go b/go/nbs/root_tracker_test.go
index 46465795e9..e8e7fbea02 100644
--- a/go/nbs/root_tracker_test.go
+++ b/go/nbs/root_tracker_test.go
@@ -197,7 +197,7 @@ type fakeTablePersister struct {
 
 func (ftp fakeTablePersister) Compact(mt *memTable, haver chunkReader) chunkSource {
 	if mt.count() > 0 {
-		name, data, chunkCount, _ := mt.write(haver)
+		name, data, chunkCount := mt.write(haver)
 		if chunkCount > 0 {
 			ftp.sources[name] = newTableReader(parseTableIndex(data), bytes.NewReader(data), fileBlockSize)
 			return chunkSourceAdapter{ftp.sources[name], name}
diff --git a/go/nbs/s3_table_persister.go b/go/nbs/s3_table_persister.go
index 8489644af5..038804d2a4 100644
--- a/go/nbs/s3_table_persister.go
+++ b/go/nbs/s3_table_persister.go
@@ -36,12 +36,7 @@ type s3UploadedPart struct {
 }
 
 func (s3p s3TablePersister) Compact(mt *memTable, haver chunkReader) chunkSource {
-	name, data, count, errata := mt.write(haver)
-	// TODO: remove when BUG 3156 is fixed
-	for h, eData := range errata {
-		s3p.multipartUpload(eData, "errata-"+h.String())
-	}
-	return s3p.persistTable(name, data, count)
+	return s3p.persistTable(mt.write(haver))
 }
 
 func (s3p s3TablePersister) persistTable(name addr, data []byte, chunkCount uint32) chunkSource {
diff --git a/go/nbs/s3_table_persister_test.go b/go/nbs/s3_table_persister_test.go
index fa605c84f6..2f46416d4b 100644
--- a/go/nbs/s3_table_persister_test.go
+++ b/go/nbs/s3_table_persister_test.go
@@ -36,7 +36,7 @@ func TestS3TablePersisterCompact(t *testing.T) {
 }
 
 func calcPartSize(rdr chunkReader, maxPartNum int) int {
-	return int(maxTableSize(uint64(rdr.count()), rdr.byteLen())) / maxPartNum
+	return int(maxTableSize(rdr.lens())) / maxPartNum
 }
 
 func TestS3TablePersisterCompactSinglePart(t *testing.T) {
@@ -135,11 +135,11 @@ func TestS3TablePersisterCompactAll(t *testing.T) {
 }
 
 func bytesToChunkSource(bs ...[]byte) chunkSource {
-	sum := 0
+	lengths := []uint32{}
 	for _, b := range bs {
-		sum += len(b)
+		lengths = append(lengths, uint32(len(b)))
 	}
-	maxSize := maxTableSize(uint64(len(bs)), uint64(sum))
+	maxSize := maxTableSize(lengths)
 	buff := make([]byte, maxSize)
 	tw := newTableWriter(buff, nil)
 	for _, b := range bs {
diff --git a/go/nbs/table.go b/go/nbs/table.go
index 330bc468a0..1d9f425d00 100644
--- a/go/nbs/table.go
+++ b/go/nbs/table.go
@@ -205,7 +205,7 @@ type chunkReader interface {
 	get(h addr) []byte
 	getMany(reqs []getRecord, foundChunks chan *chunks.Chunk, wg *sync.WaitGroup) bool
 	count() uint32
-	byteLen() uint64
+	lens() []uint32
 	extract(order EnumerationOrder, chunks chan<- extractRecord)
 }
 
diff --git a/go/nbs/table_persister.go b/go/nbs/table_persister.go
index 5a8bee3874..c51359aea7 100644
--- a/go/nbs/table_persister.go
+++ b/go/nbs/table_persister.go
@@ -57,16 +57,16 @@ func (csbc chunkSourcesByDescendingCount) Swap(i, j int) { csbc[i], csbc[j] = cs
 
 func compactSourcesToBuffer(sources chunkSources, rl chan struct{}) (name addr, data []byte, chunkCount uint32) {
 	d.Chk.True(rl != nil)
-	totalData := uint64(0)
+
+	lengths := []uint32{}
 	for _, src := range sources {
-		chunkCount += src.count()
-		totalData += src.byteLen()
+		lengths = append(lengths, src.lens()...)
 	}
-	if chunkCount == 0 {
+	if len(lengths) == 0 {
 		return
 	}
-	maxSize := maxTableSize(uint64(chunkCount), totalData)
+	maxSize := maxTableSize(lengths)
 	buff := make([]byte, maxSize) // This can blow up RAM (BUG 3130)
 	tw := newTableWriter(buff, nil)
 
diff --git a/go/nbs/table_reader.go b/go/nbs/table_reader.go
index 56a4203142..dcf944e868 100644
--- a/go/nbs/table_reader.go
+++ b/go/nbs/table_reader.go
@@ -19,7 +19,6 @@ import (
 
 type tableIndex struct {
 	chunkCount        uint32
-	totalPhysicalData uint64
 	prefixes, offsets []uint64
 	lengths, ordinals []uint32
 	suffixes          []byte
@@ -40,9 +39,8 @@ func parseTableIndex(buff []byte) tableIndex {
 	pos -= magicNumberSize
 	d.Chk.True(string(buff[pos:]) == magicNumber)
 
-	// total chunk data
+	// Ignore total chunk data
 	pos -= uint64Size
-	totalPhysicalData := binary.BigEndian.Uint64(buff[pos:])
 
 	pos -= uint32Size
 	chunkCount := binary.BigEndian.Uint32(buff[pos:])
@@ -62,7 +60,7 @@ func parseTableIndex(buff []byte) tableIndex {
 	prefixes, ordinals := computePrefixes(chunkCount, buff[pos:pos+tuplesSize])
 
 	return tableIndex{
-		chunkCount, totalPhysicalData,
+		chunkCount,
 		prefixes, offsets,
 		lengths, ordinals,
 		suffixes,
@@ -188,8 +186,8 @@ func (tr tableReader) count() uint32 {
 	return tr.chunkCount
 }
 
-func (tr tableReader) byteLen() uint64 {
-	return tr.totalPhysicalData
+func (tr tableReader) lens() []uint32 {
+	return tr.lengths
 }
 
 // returns true iff |h| can be found in this table.
diff --git a/go/nbs/table_set.go b/go/nbs/table_set.go
index 5399494433..e86a7a7eff 100644
--- a/go/nbs/table_set.go
+++ b/go/nbs/table_set.go
@@ -117,14 +117,16 @@ func (ts tableSet) count() uint32 {
 	return f(ts.novel) + f(ts.upstream)
 }
 
-func (ts tableSet) byteLen() uint64 {
-	f := func(css chunkSources) (data uint64) {
+func (ts tableSet) lens() (lengths []uint32) {
+	f := func(css chunkSources) {
 		for _, haver := range css {
-			data += haver.byteLen()
+			lengths = append(lengths, haver.lens()...)
 		}
 		return
 	}
-	return f(ts.novel) + f(ts.upstream)
+	f(ts.novel)
+	f(ts.upstream)
+	return
 }
 
 // Size returns the number of tables in this tableSet.
diff --git a/go/nbs/table_test.go b/go/nbs/table_test.go
index 6b9773266a..245dac84ba 100644
--- a/go/nbs/table_test.go
+++ b/go/nbs/table_test.go
@@ -21,11 +21,11 @@ import (
 )
 
 func buildTable(chunks [][]byte) ([]byte, addr) {
-	totalData := uint64(0)
+	lengths := []uint32{}
 	for _, chunk := range chunks {
-		totalData += uint64(len(chunk))
+		lengths = append(lengths, uint32(len(chunk)))
 	}
-	capacity := maxTableSize(uint64(len(chunks)), totalData)
+	capacity := maxTableSize(lengths)
 
 	buff := make([]byte, capacity)
 
@@ -125,9 +125,12 @@ func TestHasManySequentialPrefix(t *testing.T) {
 	}
 
 	bogusData := []byte("bogus") // doesn't matter what this is. hasMany() won't check chunkRecords
-	totalData := uint64(len(bogusData) * len(addrs))
+	lengths := []uint32{}
+	for range addrs {
+		lengths = append(lengths, uint32(len(bogusData)))
+	}
 
-	capacity := maxTableSize(uint64(len(addrs)), totalData)
+	capacity := maxTableSize(lengths)
 	buff := make([]byte, capacity)
 	tw := newTableWriter(buff, nil)
 
diff --git a/go/nbs/table_writer.go b/go/nbs/table_writer.go
index 41fd958193..471e3388f5 100644
--- a/go/nbs/table_writer.go
+++ b/go/nbs/table_writer.go
@@ -9,7 +9,6 @@ import (
 	"encoding/binary"
 	"fmt"
 	"hash"
-	"os"
 	"sort"
 
 	"github.com/attic-labs/noms/go/d"
@@ -24,7 +23,6 @@ type tableWriter struct {
 
 	prefixes  prefixIndexSlice // TODO: This is in danger of exploding memory
 	blockHash hash.Hash
-	errata    map[addr][]byte // TODO: Get rid of this once we've diagnosed and fixed BUG 3156
 	snapper   snappyEncoder
 }
 
@@ -38,12 +36,14 @@ func (r realSnappyEncoder) Encode(dst, src []byte) []byte {
 	return snappy.Encode(dst, src)
 }
 
-func maxTableSize(numChunks, totalData uint64) uint64 {
-	avgChunkSize := totalData / numChunks
-	d.Chk.True(avgChunkSize < maxChunkSize)
-	maxSnappySize := snappy.MaxEncodedLen(int(avgChunkSize))
-	d.Chk.True(maxSnappySize > 0)
-	return numChunks*(prefixTupleSize+lengthSize+addrSuffixSize+checksumSize+uint64(maxSnappySize)) + footerSize
+func maxTableSize(lengths []uint32) (max uint64) {
+	max = footerSize
+	for _, length := range lengths {
+		max += prefixTupleSize + lengthSize + addrSuffixSize + checksumSize // Metadata
+		d.Chk.True(int64(length) < maxInt)
+		max += uint64(snappy.MaxEncodedLen(int(length)))
+	}
+	return
 }
 
 func indexSize(numChunks uint32) uint64 {
@@ -58,7 +58,6 @@ func newTableWriter(buff []byte, snapper snappyEncoder) *tableWriter {
	return &tableWriter{
 		buff:      buff,
 		blockHash: sha512.New(),
-		errata:    map[addr][]byte{},
 		snapper:   snapper,
 	}
 }
@@ -72,19 +71,12 @@ func (tw *tableWriter) addChunk(h addr, data []byte) bool {
 	compressed := tw.snapper.Encode(tw.buff[tw.pos:], data)
 	dataLength := uint64(len(compressed))
 
-	// BUG 3156 indicates that, sometimes, snappy decides that there's not enough space in tw.buff[tw.pos:] to encode into.
-	// This _should not be_, because we believe that we allocate enough space in |tw.buff| to cover snappy's worst-case but...we've seen some instances.
+	// BUG 3156 indicated that, sometimes, snappy decided that there wasn't enough space in tw.buff[tw.pos:] to encode into.
+	// This _should never happen anymore_, because we iterate over all chunks to be added and sum the max amount of space that snappy says each might need.
 	// Since we know that |data| can't be 0-length, we also know that the compressed version of |data| has length greater than zero. The first element in a snappy-encoded blob is a Uvarint indicating how much data is present. Therefore, if there's a Uvarint-encoded 0 at tw.buff[tw.pos:], we know that snappy did not write anything there and we have a problem.
 	if v, n := binary.Uvarint(tw.buff[tw.pos:]); v == 0 {
 		d.Chk.True(n != 0)
-		d.Chk.True(uint64(len(tw.buff[tw.pos:])) >= dataLength)
-
-		fmt.Fprintf(os.Stderr, "BUG 3156: unbuffered chunk %s: uncompressed %d, compressed %d, snappy max %d, tw.buff %d\n", h.String(), len(data), dataLength, snappy.MaxEncodedLen(len(data)), len(tw.buff[tw.pos:]))
-
-		// Copy the compressed data over to tw.buff.
-		copy(tw.buff[tw.pos:], compressed)
-		// Store the uncompressed data, so code with access to durable storage can save it off for analysis.
-		tw.errata[h] = data
+		panic(fmt.Errorf("BUG 3156: unbuffered chunk %s: uncompressed %d, compressed %d, snappy max %d, tw.buff %d\n", h.String(), len(data), dataLength, snappy.MaxEncodedLen(len(data)), len(tw.buff[tw.pos:])))
 	}
 	tw.pos += dataLength
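
The sizing change above is the crux of the patch: maxTableSize no longer derives the table buffer size from an "average" chunk (total data divided by chunk count); it sums snappy's worst-case encoded length for every chunk individually, plus fixed per-chunk metadata and the footer, which is why addChunk can now panic instead of collecting "errata" when snappy writes out of line. Below is a minimal standalone sketch of that bound; the constant values and the maxTableSizeSketch/main names are illustrative assumptions, not the definitions in go/nbs.

// A sketch of the per-chunk worst-case bound adopted by this change.
// All constant values below are assumed for illustration only.
package main

import (
	"fmt"

	"github.com/golang/snappy"
)

const (
	prefixTupleSize = 12        // assumption: 8-byte address prefix + 4-byte ordinal
	lengthSize      = 4         // assumption: uint32 chunk length
	addrSuffixSize  = 12        // assumption: remaining bytes of a 20-byte address
	checksumSize    = 4         // assumption: per-chunk checksum
	footerSize      = 8 + 4 + 8 // assumption: total-data + chunk-count + magic number
)

// maxTableSizeSketch mirrors the shape of the new maxTableSize(lengths []uint32):
// a fixed footer, plus per-chunk metadata, plus snappy's worst-case encoded
// length for each chunk individually. Because no chunk is assumed to be
// "average" sized, the buffer can never be too small for any mix of chunk
// sizes, so a failed in-place snappy encode becomes a genuine bug.
func maxTableSizeSketch(lengths []uint32) (max uint64) {
	max = footerSize
	for _, length := range lengths {
		max += prefixTupleSize + lengthSize + addrSuffixSize + checksumSize // metadata
		max += uint64(snappy.MaxEncodedLen(int(length)))                    // payload, worst case
	}
	return
}

func main() {
	// A tiny chunk next to a large one: the old average-based estimate could
	// undershoot for a mix like this; the per-chunk bound cannot.
	fmt.Println(maxTableSizeSketch([]uint32{16, 1 << 20}))
}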