From 8cfc5e6512061f7dbbc3ae5dd59611a77e1cc25a Mon Sep 17 00:00:00 2001 From: cmasone-attic Date: Tue, 7 Feb 2017 15:43:06 -0800 Subject: [PATCH] Gather more info about Bug 3156 (#3158) There's some case that causes chunks that compress to more than about 55k (we think these are quite big, chunks that are many hundreds of K in size) not to wind up correctly inserted into tables. It looks like the snappy library believes the buffer we've allocated may not be large enough, so it allocates its own space and this screws us up. This patch changes two things: 1) The CRC in the NBS format is now the CRC of the _compressed_ data 2) Such chunks will be manually copied into the table, so they won't be missing anymore Also, when the code detects a case where the snappy library decided to allocate its own storage, it saves the uncompressed data off to the side, so that it can be pushed to durable storage. Such chunks are stored on disk or in S3 named like "-errata", and logging is dumped out so we can figure out which tables were supposed to contain these chunks. Towards #3156 --- go/nbs/file_table_persister.go | 12 ++++++- go/nbs/mem_table.go | 14 ++++++-- go/nbs/mem_table_test.go | 2 +- go/nbs/root_tracker_test.go | 2 +- go/nbs/s3_table_persister.go | 65 +++++++++++++++++++--------------- go/nbs/store.go | 2 +- go/nbs/table_reader.go | 8 ++--- go/nbs/table_writer.go | 22 +++++++++++- 8 files changed, 88 insertions(+), 39 deletions(-) diff --git a/go/nbs/file_table_persister.go b/go/nbs/file_table_persister.go index b3bf298161..5366bc8f3e 100644 --- a/go/nbs/file_table_persister.go +++ b/go/nbs/file_table_persister.go @@ -20,7 +20,17 @@ type fsTablePersister struct { } func (ftp fsTablePersister) Compact(mt *memTable, haver chunkReader) chunkSource { - return ftp.persistTable(mt.write(haver)) + name, data, count, errata := mt.write(haver) + // TODO: remove when BUG 3156 is fixed + for h, eData := range errata { + func() { + temp, err := ioutil.TempFile(ftp.dir, h.String()+"-errata") + d.PanicIfError(err) + defer checkClose(temp) + io.Copy(temp, bytes.NewReader(eData)) + }() + } + return ftp.persistTable(name, data, count) } func (ftp fsTablePersister) persistTable(name addr, data []byte, chunkCount uint32) chunkSource { diff --git a/go/nbs/mem_table.go b/go/nbs/mem_table.go index 07d2172f3e..6c47d1e223 100644 --- a/go/nbs/mem_table.go +++ b/go/nbs/mem_table.go @@ -10,6 +10,7 @@ import ( "github.com/attic-labs/noms/go/chunks" "github.com/attic-labs/noms/go/hash" + "github.com/attic-labs/noms/go/util/verbose" ) type memTable struct { @@ -102,7 +103,7 @@ func (mt *memTable) extract(order EnumerationOrder, chunks chan<- extractRecord) } } -func (mt *memTable) write(haver chunkReader) (name addr, data []byte, count uint32) { +func (mt *memTable) write(haver chunkReader) (name addr, data []byte, count uint32, errata map[addr][]byte) { maxSize := maxTableSize(uint64(len(mt.order)), mt.totalData) buff := make([]byte, maxSize) tw := newTableWriter(buff) @@ -121,5 +122,14 @@ func (mt *memTable) write(haver chunkReader) (name addr, data []byte, count uint } } tableSize, name := tw.finish() - return name, buff[:tableSize], count + + // TODO: remove when BUG 3156 is fixed + if len(tw.errata) > 0 { + verbose.Log("BUG 3156: table %s; %d chunks, %d total data; max table size %d\n", name.String(), len(mt.order), mt.totalData, maxSize) + for h, data := range tw.errata { + verbose.Log(" Failed to write %s of uncompressed length %d\n", h.String(), len(data)) + } + } + + return name, buff[:tableSize], count, tw.errata } diff --git a/go/nbs/mem_table_test.go b/go/nbs/mem_table_test.go index cc62d1d1d4..e316b2410a 100644 --- a/go/nbs/mem_table_test.go +++ b/go/nbs/mem_table_test.go @@ -88,7 +88,7 @@ func TestMemTableWrite(t *testing.T) { assert.True(tr1.has(computeAddr(chunks[1]))) assert.True(tr2.has(computeAddr(chunks[2]))) - _, data, count := mt.write(chunkReaderGroup{tr1, tr2}) + _, data, count, _ := mt.write(chunkReaderGroup{tr1, tr2}) assert.Equal(uint32(1), count) outReader := newTableReader(parseTableIndex(data), bytes.NewReader(data), fileBlockSize) diff --git a/go/nbs/root_tracker_test.go b/go/nbs/root_tracker_test.go index e8e7fbea02..46465795e9 100644 --- a/go/nbs/root_tracker_test.go +++ b/go/nbs/root_tracker_test.go @@ -197,7 +197,7 @@ type fakeTablePersister struct { func (ftp fakeTablePersister) Compact(mt *memTable, haver chunkReader) chunkSource { if mt.count() > 0 { - name, data, chunkCount := mt.write(haver) + name, data, chunkCount, _ := mt.write(haver) if chunkCount > 0 { ftp.sources[name] = newTableReader(parseTableIndex(data), bytes.NewReader(data), fileBlockSize) return chunkSourceAdapter{ftp.sources[name], name} diff --git a/go/nbs/s3_table_persister.go b/go/nbs/s3_table_persister.go index 29a813cb64..97913eb7db 100644 --- a/go/nbs/s3_table_persister.go +++ b/go/nbs/s3_table_persister.go @@ -36,40 +36,21 @@ type s3UploadedPart struct { } func (s3p s3TablePersister) Compact(mt *memTable, haver chunkReader) chunkSource { - return s3p.persistTable(mt.write(haver)) + name, data, count, errata := mt.write(haver) + // TODO: remove when BUG 3156 is fixed + for h, eData := range errata { + s3p.multipartUpload(eData, h.String()+"-errata") + } + return s3p.persistTable(name, data, count) } func (s3p s3TablePersister) persistTable(name addr, data []byte, chunkCount uint32) chunkSource { if chunkCount > 0 { t1 := time.Now() - result, err := s3p.s3.CreateMultipartUpload(&s3.CreateMultipartUploadInput{ - Bucket: aws.String(s3p.bucket), - Key: aws.String(name.String()), - }) - d.PanicIfError(err) - uploadID := *result.UploadId - - multipartUpload, err := s3p.uploadParts(data, name.String(), uploadID) - if err != nil { - _, abrtErr := s3p.s3.AbortMultipartUpload(&s3.AbortMultipartUploadInput{ - Bucket: aws.String(s3p.bucket), - Key: aws.String(name.String()), - UploadId: aws.String(uploadID), - }) - d.Chk.NoError(abrtErr) - panic(err) // TODO: Better error handling here - } - - _, err = s3p.s3.CompleteMultipartUpload(&s3.CompleteMultipartUploadInput{ - Bucket: aws.String(s3p.bucket), - Key: aws.String(name.String()), - MultipartUpload: multipartUpload, - UploadId: aws.String(uploadID), - }) - d.Chk.NoError(err) - s3tr := &s3TableReader{s3: s3p.s3, bucket: s3p.bucket, h: name} - + s3p.multipartUpload(data, name.String()) verbose.Log("Compacted table of %d Kb in %s", len(data)/1024, time.Since(t1)) + + s3tr := &s3TableReader{s3: s3p.s3, bucket: s3p.bucket, h: name} index := parseTableIndex(data) if s3p.indexCache != nil { s3p.indexCache.put(name, index) @@ -84,6 +65,34 @@ func (s3p s3TablePersister) CompactAll(sources chunkSources) chunkSource { return s3p.persistTable(compactSourcesToBuffer(sources, s3p.readRl)) } +func (s3p s3TablePersister) multipartUpload(data []byte, key string) { + result, err := s3p.s3.CreateMultipartUpload(&s3.CreateMultipartUploadInput{ + Bucket: aws.String(s3p.bucket), + Key: aws.String(key), + }) + d.Chk.NoError(err) + uploadID := *result.UploadId + + multipartUpload, err := s3p.uploadParts(data, key, uploadID) + if err != nil { + _, abrtErr := s3p.s3.AbortMultipartUpload(&s3.AbortMultipartUploadInput{ + Bucket: aws.String(s3p.bucket), + Key: aws.String(key), + UploadId: aws.String(uploadID), + }) + d.Chk.NoError(abrtErr) + panic(err) // TODO: Better error handling here + } + + _, err = s3p.s3.CompleteMultipartUpload(&s3.CompleteMultipartUploadInput{ + Bucket: aws.String(s3p.bucket), + Key: aws.String(key), + MultipartUpload: multipartUpload, + UploadId: aws.String(uploadID), + }) + d.Chk.NoError(err) +} + func (s3p s3TablePersister) uploadParts(data []byte, key, uploadID string) (*s3.CompletedMultipartUpload, error) { sent, failed, done := make(chan s3UploadedPart), make(chan error), make(chan struct{}) diff --git a/go/nbs/store.go b/go/nbs/store.go index 05901cb24b..93025e5f54 100644 --- a/go/nbs/store.go +++ b/go/nbs/store.go @@ -29,7 +29,7 @@ type EnumerationOrder uint8 const ( // StorageVersion is the version of the on-disk Noms Chunks Store data format. - StorageVersion = "0" + StorageVersion = "1" defaultMemTableSize uint64 = (1 << 20) * 128 // 128MB defaultAWSReadLimit = 1024 diff --git a/go/nbs/table_reader.go b/go/nbs/table_reader.go index 9f7b67cffe..56a4203142 100644 --- a/go/nbs/table_reader.go +++ b/go/nbs/table_reader.go @@ -363,12 +363,12 @@ func canReadAhead(fRec offsetRec, fLength uint32, readStart, readEnd, blockSize // Fetches the byte stream of data logically encoded within the table starting at |pos|. func (tr tableReader) parseChunk(buff []byte) []byte { dataLen := uint64(len(buff)) - checksumSize + + chksum := binary.BigEndian.Uint32(buff[dataLen:]) + d.Chk.True(chksum == crc(buff[:dataLen])) + data, err := snappy.Decode(nil, buff[:dataLen]) d.Chk.NoError(err) - buff = buff[dataLen:] - - chksum := binary.BigEndian.Uint32(buff) - d.Chk.True(chksum == crc(data)) return data } diff --git a/go/nbs/table_writer.go b/go/nbs/table_writer.go index 5ba4f8e90b..2ed8efab85 100644 --- a/go/nbs/table_writer.go +++ b/go/nbs/table_writer.go @@ -11,6 +11,7 @@ import ( "sort" "github.com/attic-labs/noms/go/d" + "github.com/attic-labs/noms/go/util/verbose" "github.com/golang/snappy" ) @@ -21,6 +22,8 @@ type tableWriter struct { totalPhysicalData uint64 prefixes prefixIndexSlice // TODO: This is in danger of exploding memory blockHash hash.Hash + + errata map[addr][]byte // TODO: Get rid of this once we've diagnosed and fixed BUG 3156 } func maxTableSize(numChunks, totalData uint64) uint64 { @@ -40,6 +43,7 @@ func newTableWriter(buff []byte) *tableWriter { return &tableWriter{ buff: buff, blockHash: sha512.New(), + errata: map[addr][]byte{}, } } @@ -51,11 +55,27 @@ func (tw *tableWriter) addChunk(h addr, data []byte) bool { // Compress data straight into tw.buff compressed := snappy.Encode(tw.buff[tw.pos:], data) dataLength := uint64(len(compressed)) + + // BUG 3156 indicates that, sometimes, snappy decides that there's not enough space in tw.buff[tw.pos:] to encode into. + // This _should not be_, because we believe that we allocate enough space in |tw.buff| to cover snappy's worst-case but...we've seen some instances. + // Since we know that |data| can't be 0-length, we also know that the compressed version of |data| has length greater than zero. The first element in a snappy-encoded blob is a Uvarint indicating how much data is present. Therefore, if there's a Uvarint-encoded 0 at tw.buff[tw.pos:], we know that snappy did not write anything there and we have a problem. + if v, n := binary.Uvarint(tw.buff[tw.pos:]); v == 0 { + d.Chk.True(n != 0) + d.Chk.True(uint64(len(tw.buff[tw.pos:])) >= dataLength) + + verbose.Log("BUG 3156: unbuffered chunk %s: uncompressed %d, compressed %d, snappy max %d, tw.buff %d\n", h.String(), len(data), dataLength, snappy.MaxEncodedLen(len(data)), len(tw.buff[tw.pos:])) + + // Copy the compressed data over to tw.buff. + copy(tw.buff[tw.pos:], compressed) + // Store the uncompressed data, so code with access to durable storage can save it off for analysis. + tw.errata[h] = data + } + tw.pos += dataLength tw.totalPhysicalData += dataLength // checksum (4 LSBytes, big-endian) - binary.BigEndian.PutUint32(tw.buff[tw.pos:], crc(data)) + binary.BigEndian.PutUint32(tw.buff[tw.pos:], crc(compressed)) tw.pos += checksumSize // Stored in insertion order