diff --git a/go/nbs/file_table_persister.go b/go/nbs/file_table_persister.go
index b3bf298161..5366bc8f3e 100644
--- a/go/nbs/file_table_persister.go
+++ b/go/nbs/file_table_persister.go
@@ -20,7 +20,17 @@ type fsTablePersister struct {
 }
 
 func (ftp fsTablePersister) Compact(mt *memTable, haver chunkReader) chunkSource {
-	return ftp.persistTable(mt.write(haver))
+	name, data, count, errata := mt.write(haver)
+	// TODO: remove when BUG 3156 is fixed
+	for h, eData := range errata {
+		func() {
+			temp, err := ioutil.TempFile(ftp.dir, h.String()+"-errata")
+			d.PanicIfError(err)
+			defer checkClose(temp)
+			io.Copy(temp, bytes.NewReader(eData))
+		}()
+	}
+	return ftp.persistTable(name, data, count)
 }
 
 func (ftp fsTablePersister) persistTable(name addr, data []byte, chunkCount uint32) chunkSource {
diff --git a/go/nbs/mem_table.go b/go/nbs/mem_table.go
index 07d2172f3e..6c47d1e223 100644
--- a/go/nbs/mem_table.go
+++ b/go/nbs/mem_table.go
@@ -10,6 +10,7 @@ import (
 
 	"github.com/attic-labs/noms/go/chunks"
 	"github.com/attic-labs/noms/go/hash"
+	"github.com/attic-labs/noms/go/util/verbose"
 )
 
 type memTable struct {
@@ -102,7 +103,7 @@ func (mt *memTable) extract(order EnumerationOrder, chunks chan<- extractRecord)
 	}
 }
 
-func (mt *memTable) write(haver chunkReader) (name addr, data []byte, count uint32) {
+func (mt *memTable) write(haver chunkReader) (name addr, data []byte, count uint32, errata map[addr][]byte) {
 	maxSize := maxTableSize(uint64(len(mt.order)), mt.totalData)
 	buff := make([]byte, maxSize)
 	tw := newTableWriter(buff)
@@ -121,5 +122,14 @@ func (mt *memTable) write(haver chunkReader) (name addr, data []byte, count uint
 		}
 	}
 	tableSize, name := tw.finish()
-	return name, buff[:tableSize], count
+
+	// TODO: remove when BUG 3156 is fixed
+	if len(tw.errata) > 0 {
+		verbose.Log("BUG 3156: table %s; %d chunks, %d total data; max table size %d\n", name.String(), len(mt.order), mt.totalData, maxSize)
+		for h, data := range tw.errata {
+			verbose.Log(" Failed to write %s of uncompressed length %d\n", h.String(), len(data))
+		}
+	}
+
+	return name, buff[:tableSize], count, tw.errata
 }
diff --git a/go/nbs/mem_table_test.go b/go/nbs/mem_table_test.go
index cc62d1d1d4..e316b2410a 100644
--- a/go/nbs/mem_table_test.go
+++ b/go/nbs/mem_table_test.go
@@ -88,7 +88,7 @@ func TestMemTableWrite(t *testing.T) {
 	assert.True(tr1.has(computeAddr(chunks[1])))
 	assert.True(tr2.has(computeAddr(chunks[2])))
 
-	_, data, count := mt.write(chunkReaderGroup{tr1, tr2})
+	_, data, count, _ := mt.write(chunkReaderGroup{tr1, tr2})
 	assert.Equal(uint32(1), count)
 
 	outReader := newTableReader(parseTableIndex(data), bytes.NewReader(data), fileBlockSize)
diff --git a/go/nbs/root_tracker_test.go b/go/nbs/root_tracker_test.go
index e8e7fbea02..46465795e9 100644
--- a/go/nbs/root_tracker_test.go
+++ b/go/nbs/root_tracker_test.go
@@ -197,7 +197,7 @@ type fakeTablePersister struct {
 }
 func (ftp fakeTablePersister) Compact(mt *memTable, haver chunkReader) chunkSource {
 	if mt.count() > 0 {
-		name, data, chunkCount := mt.write(haver)
+		name, data, chunkCount, _ := mt.write(haver)
 		if chunkCount > 0 {
 			ftp.sources[name] = newTableReader(parseTableIndex(data), bytes.NewReader(data), fileBlockSize)
 			return chunkSourceAdapter{ftp.sources[name], name}
diff --git a/go/nbs/s3_table_persister.go b/go/nbs/s3_table_persister.go
index 29a813cb64..97913eb7db 100644
--- a/go/nbs/s3_table_persister.go
+++ b/go/nbs/s3_table_persister.go
@@ -36,40 +36,21 @@ type s3UploadedPart struct {
 }
 
 func (s3p s3TablePersister) Compact(mt *memTable, haver chunkReader) chunkSource {
-	return s3p.persistTable(mt.write(haver))
+	name, data, count, errata := mt.write(haver)
+	// TODO: remove when BUG 3156 is fixed
+	for h, eData := range errata {
+		s3p.multipartUpload(eData, h.String()+"-errata")
+	}
+	return s3p.persistTable(name, data, count)
 }
 
 func (s3p s3TablePersister) persistTable(name addr, data []byte, chunkCount uint32) chunkSource {
 	if chunkCount > 0 {
 		t1 := time.Now()
-		result, err := s3p.s3.CreateMultipartUpload(&s3.CreateMultipartUploadInput{
-			Bucket: aws.String(s3p.bucket),
-			Key:    aws.String(name.String()),
-		})
-		d.PanicIfError(err)
-		uploadID := *result.UploadId
-
-		multipartUpload, err := s3p.uploadParts(data, name.String(), uploadID)
-		if err != nil {
-			_, abrtErr := s3p.s3.AbortMultipartUpload(&s3.AbortMultipartUploadInput{
-				Bucket:   aws.String(s3p.bucket),
-				Key:      aws.String(name.String()),
-				UploadId: aws.String(uploadID),
-			})
-			d.Chk.NoError(abrtErr)
-			panic(err) // TODO: Better error handling here
-		}
-
-		_, err = s3p.s3.CompleteMultipartUpload(&s3.CompleteMultipartUploadInput{
-			Bucket:          aws.String(s3p.bucket),
-			Key:             aws.String(name.String()),
-			MultipartUpload: multipartUpload,
-			UploadId:        aws.String(uploadID),
-		})
-		d.Chk.NoError(err)
-		s3tr := &s3TableReader{s3: s3p.s3, bucket: s3p.bucket, h: name}
-
+		s3p.multipartUpload(data, name.String())
 		verbose.Log("Compacted table of %d Kb in %s", len(data)/1024, time.Since(t1))
+
+		s3tr := &s3TableReader{s3: s3p.s3, bucket: s3p.bucket, h: name}
 		index := parseTableIndex(data)
 		if s3p.indexCache != nil {
 			s3p.indexCache.put(name, index)
@@ -84,6 +65,34 @@ func (s3p s3TablePersister) CompactAll(sources chunkSources) chunkSource {
 	return s3p.persistTable(compactSourcesToBuffer(sources, s3p.readRl))
 }
 
+func (s3p s3TablePersister) multipartUpload(data []byte, key string) {
+	result, err := s3p.s3.CreateMultipartUpload(&s3.CreateMultipartUploadInput{
+		Bucket: aws.String(s3p.bucket),
+		Key:    aws.String(key),
+	})
+	d.Chk.NoError(err)
+	uploadID := *result.UploadId
+
+	multipartUpload, err := s3p.uploadParts(data, key, uploadID)
+	if err != nil {
+		_, abrtErr := s3p.s3.AbortMultipartUpload(&s3.AbortMultipartUploadInput{
+			Bucket:   aws.String(s3p.bucket),
+			Key:      aws.String(key),
+			UploadId: aws.String(uploadID),
+		})
+		d.Chk.NoError(abrtErr)
+		panic(err) // TODO: Better error handling here
+	}
+
+	_, err = s3p.s3.CompleteMultipartUpload(&s3.CompleteMultipartUploadInput{
+		Bucket:          aws.String(s3p.bucket),
+		Key:             aws.String(key),
+		MultipartUpload: multipartUpload,
+		UploadId:        aws.String(uploadID),
+	})
+	d.Chk.NoError(err)
+}
+
 func (s3p s3TablePersister) uploadParts(data []byte, key, uploadID string) (*s3.CompletedMultipartUpload, error) {
 	sent, failed, done := make(chan s3UploadedPart), make(chan error), make(chan struct{})
 
diff --git a/go/nbs/store.go b/go/nbs/store.go
index 05901cb24b..93025e5f54 100644
--- a/go/nbs/store.go
+++ b/go/nbs/store.go
@@ -29,7 +29,7 @@ type EnumerationOrder uint8
 
 const (
 	// StorageVersion is the version of the on-disk Noms Chunks Store data format.
-	StorageVersion = "0"
+	StorageVersion = "1"
 
 	defaultMemTableSize uint64 = (1 << 20) * 128 // 128MB
 	defaultAWSReadLimit        = 1024
diff --git a/go/nbs/table_reader.go b/go/nbs/table_reader.go
index 9f7b67cffe..56a4203142 100644
--- a/go/nbs/table_reader.go
+++ b/go/nbs/table_reader.go
@@ -363,12 +363,12 @@ func canReadAhead(fRec offsetRec, fLength uint32, readStart, readEnd, blockSize
 // Fetches the byte stream of data logically encoded within the table starting at |pos|.
 func (tr tableReader) parseChunk(buff []byte) []byte {
 	dataLen := uint64(len(buff)) - checksumSize
+
+	chksum := binary.BigEndian.Uint32(buff[dataLen:])
+	d.Chk.True(chksum == crc(buff[:dataLen]))
+
 	data, err := snappy.Decode(nil, buff[:dataLen])
 	d.Chk.NoError(err)
-	buff = buff[dataLen:]
-
-	chksum := binary.BigEndian.Uint32(buff)
-	d.Chk.True(chksum == crc(data))
 	return data
 }
 
diff --git a/go/nbs/table_writer.go b/go/nbs/table_writer.go
index 5ba4f8e90b..2ed8efab85 100644
--- a/go/nbs/table_writer.go
+++ b/go/nbs/table_writer.go
@@ -11,6 +11,7 @@ import (
 	"sort"
 
 	"github.com/attic-labs/noms/go/d"
+	"github.com/attic-labs/noms/go/util/verbose"
 	"github.com/golang/snappy"
 )
 
@@ -21,6 +22,8 @@ type tableWriter struct {
 	totalPhysicalData uint64
 	prefixes          prefixIndexSlice // TODO: This is in danger of exploding memory
 	blockHash         hash.Hash
+
+	errata map[addr][]byte // TODO: Get rid of this once we've diagnosed and fixed BUG 3156
 }
 
 func maxTableSize(numChunks, totalData uint64) uint64 {
@@ -40,6 +43,7 @@ func newTableWriter(buff []byte) *tableWriter {
 	return &tableWriter{
 		buff:      buff,
 		blockHash: sha512.New(),
+		errata:    map[addr][]byte{},
 	}
 }
 
@@ -51,11 +55,27 @@ func (tw *tableWriter) addChunk(h addr, data []byte) bool {
 	// Compress data straight into tw.buff
 	compressed := snappy.Encode(tw.buff[tw.pos:], data)
 	dataLength := uint64(len(compressed))
+
+	// BUG 3156 indicates that, sometimes, snappy decides that there's not enough space in tw.buff[tw.pos:] to encode into.
+	// This _should not be_, because we believe that we allocate enough space in |tw.buff| to cover snappy's worst-case but...we've seen some instances.
+	// Since we know that |data| can't be 0-length, we also know that the compressed version of |data| has length greater than zero. The first element in a snappy-encoded blob is a Uvarint indicating how much data is present. Therefore, if there's a Uvarint-encoded 0 at tw.buff[tw.pos:], we know that snappy did not write anything there and we have a problem.
+	if v, n := binary.Uvarint(tw.buff[tw.pos:]); v == 0 {
+		d.Chk.True(n != 0)
+		d.Chk.True(uint64(len(tw.buff[tw.pos:])) >= dataLength)
+
+		verbose.Log("BUG 3156: unbuffered chunk %s: uncompressed %d, compressed %d, snappy max %d, tw.buff %d\n", h.String(), len(data), dataLength, snappy.MaxEncodedLen(len(data)), len(tw.buff[tw.pos:]))
+
+		// Copy the compressed data over to tw.buff.
+		copy(tw.buff[tw.pos:], compressed)
+		// Store the uncompressed data, so code with access to durable storage can save it off for analysis.
+		tw.errata[h] = data
+	}
+
 	tw.pos += dataLength
 	tw.totalPhysicalData += dataLength
 
 	// checksum (4 LSBytes, big-endian)
-	binary.BigEndian.PutUint32(tw.buff[tw.pos:], crc(data))
+	binary.BigEndian.PutUint32(tw.buff[tw.pos:], crc(compressed))
 	tw.pos += checksumSize
 
 	// Stored in insertion order
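Note on the BUG 3156 workaround in addChunk above: snappy.Encode(dst, src) only writes into dst when dst is at least snappy.MaxEncodedLen(len(src)) bytes long; otherwise it allocates and returns a fresh slice, leaving dst untouched. Because a snappy block begins with the uncompressed length as a Uvarint, and chunks are never empty, a Uvarint-encoded 0 at tw.buff[tw.pos:] can only mean that snappy did not encode in place. The standalone sketch below illustrates just that detection; the helper encodeInto and the demo buffers are invented for illustration and are not part of the nbs code, which additionally copies the returned slice back into its table buffer and records the chunk in tw.errata.

package main

import (
	"encoding/binary"
	"fmt"

	"github.com/golang/snappy"
)

// encodeInto compresses data into dst and reports whether snappy wrote the
// result in place. Illustrative only; not part of the noms codebase.
func encodeInto(dst, data []byte) (compressed []byte, inPlace bool) {
	compressed = snappy.Encode(dst, data)

	// A snappy block starts with the uncompressed length as a Uvarint. Since
	// len(data) > 0, an encoding written into dst can never begin with a
	// Uvarint-encoded zero, so seeing one means snappy ignored dst and
	// allocated its own output slice instead.
	if v, n := binary.Uvarint(dst); v == 0 && n != 0 {
		return compressed, false
	}
	return compressed, true
}

func main() {
	data := []byte("some chunk payload that compresses without trouble")

	roomy := make([]byte, snappy.MaxEncodedLen(len(data)))
	if _, ok := encodeInto(roomy, data); ok {
		fmt.Println("encoded in place, as the table writer expects")
	}

	tiny := make([]byte, 1) // deliberately smaller than MaxEncodedLen(len(data))
	if _, ok := encodeInto(tiny, data); !ok {
		fmt.Println("snappy allocated its own buffer; this is the BUG 3156 situation")
	}
}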
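The table_writer.go and table_reader.go hunks also change what the per-chunk checksum covers: it is now computed over the snappy-compressed bytes rather than the uncompressed chunk, so parseChunk can verify it before paying for decompression. That is an on-disk format change, which is why StorageVersion is bumped from "0" to "1". Below is a rough sketch of the resulting record layout, using crc32.ChecksumIEEE as a stand-in for the nbs crc helper (whose polynomial is not shown in this diff) and a literal 4 in place of checksumSize.

package main

import (
	"encoding/binary"
	"fmt"
	"hash/crc32"

	"github.com/golang/snappy"
)

// writeRecord lays out a chunk the way addChunk now does: the snappy-compressed
// bytes followed by a 4-byte big-endian checksum of those compressed bytes.
func writeRecord(data []byte) []byte {
	compressed := snappy.Encode(nil, data)
	record := make([]byte, len(compressed)+4)
	copy(record, compressed)
	binary.BigEndian.PutUint32(record[len(compressed):], crc32.ChecksumIEEE(compressed))
	return record
}

// parseRecord mirrors the new parseChunk: verify the checksum over the
// compressed bytes first, and only then decompress.
func parseRecord(record []byte) ([]byte, error) {
	dataLen := len(record) - 4
	if binary.BigEndian.Uint32(record[dataLen:]) != crc32.ChecksumIEEE(record[:dataLen]) {
		return nil, fmt.Errorf("chunk record failed checksum")
	}
	return snappy.Decode(nil, record[:dataLen])
}

func main() {
	record := writeRecord([]byte("hello, nbs"))
	data, err := parseRecord(record)
	fmt.Println(string(data), err)
}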