diff --git a/go/nbs/table.go b/go/nbs/table.go index 07fef8cca0..80b5e02c2d 100644 --- a/go/nbs/table.go +++ b/go/nbs/table.go @@ -9,6 +9,7 @@ import ( "crypto/sha512" "encoding/base32" "encoding/binary" + "hash/crc32" ) /* @@ -22,9 +23,9 @@ import ( +----------------+----------------+-----+----------------+-------+--------+ Chunk Record: - +--------------------+---------------------------+ - | (4) Address suffix | (Chunk Length) Chunk Data | - +--------------------+---------------------------+ + +---------------------------+----------------+ + | (Chunk Length) Chunk Data | (Uint32) CRC32 | + +---------------------------+----------------+ -Address suffix is the 4 least-significant bytes of the Chunk's address. Used (e.g. in place of CRC32) as a checksum and a filter against false positive reads costing more than one IOP. @@ -112,11 +113,17 @@ const ( magicNumberSize uint64 = uint64(len(magicNumber)) footerSize = uint32Size + uint64Size + magicNumberSize prefixTupleSize = addrPrefixSize + ordinalSize - checksumSize uint64 = 4 + checksumSize uint64 = uint32Size maxChunkLengthSize uint64 = binary.MaxVarintLen64 maxChunkSize uint64 = 0xffffffff // Snappy won't compress slices bigger than this ) +var crcTable = crc32.MakeTable(crc32.Castagnoli) + +func crc(b []byte) uint32 { + return crc32.Update(0, crcTable, b) +} + func computeAddrDefault(data []byte) addr { r := sha512.Sum512(data) h := addr{} diff --git a/go/nbs/table_reader.go b/go/nbs/table_reader.go index 4734759b82..d370f93300 100644 --- a/go/nbs/table_reader.go +++ b/go/nbs/table_reader.go @@ -107,8 +107,7 @@ func (tr tableReader) hasMany(addrs []hasRecord) (remaining bool) { // prefixes are equal, so locate and compare against the corresponding suffix for j := filterIdx; j < filterLen && addr.prefix == tr.prefixes[j]; j++ { - li := uint64(tr.prefixIdxToOrdinal(j)) * addrSuffixSize - if bytes.Compare(addr.a[addrPrefixSize:], tr.suffixes[li:li+addrSuffixSize]) == 0 { + if 
tr.ordinalSuffixMatches(tr.prefixIdxToOrdinal(j), *addr.a) { addrs[i].has = true break } @@ -155,10 +154,7 @@ func (tr tableReader) has(h addr) bool { idx := tr.prefixIdx(prefix) for ; idx < tr.chunkCount && tr.prefixes[idx] == prefix; idx++ { - ordinal := tr.prefixIdxToOrdinal(idx) - suffixOffset := uint64(ordinal) * addrSuffixSize - - if bytes.Compare(tr.suffixes[suffixOffset:suffixOffset+addrSuffixSize], h[addrPrefixSize:]) == 0 { + if tr.ordinalSuffixMatches(tr.prefixIdxToOrdinal(idx), h) { return true } } @@ -166,6 +162,12 @@ func (tr tableReader) has(h addr) bool { return false } +// ordinalSuffixMatches returns true iff the suffix at insertion order |ordinal| matches the address |a|. +func (tr tableReader) ordinalSuffixMatches(ordinal uint32, a addr) bool { + li := uint64(ordinal) * addrSuffixSize + return bytes.Compare(a[addrPrefixSize:], tr.suffixes[li:li+addrSuffixSize]) == 0 +} + // returns the storage associated with |h|, iff present. Returns nil if absent. On success, // the returned byte slice directly references the underlying storage. func (tr tableReader) get(h addr) (data []byte) { @@ -174,6 +176,10 @@ func (tr tableReader) get(h addr) (data []byte) { for ; idx < tr.chunkCount && tr.prefixes[idx] == prefix; idx++ { ordinal := tr.prefixIdxToOrdinal(idx) + if !tr.ordinalSuffixMatches(ordinal, h) { + continue + } + offset := tr.offsets[ordinal] length := uint64(tr.lengths[ordinal]) buff := make([]byte, length) // TODO: Avoid this allocation for every get @@ -206,8 +212,8 @@ const readAmpThresh = 1 << 1 // getMany retrieves multiple stored blocks and optimizes by attempting to read in larger physical // blocks which contain multiple stored blocks. |reqs| must be sorted by address prefix. 
func (tr tableReader) getMany(reqs []getRecord) (remaining bool) { - filterIdx := uint64(0) - filterLen := uint64(len(tr.prefixes)) + filterIdx := uint32(0) + filterLen := uint32(len(tr.prefixes)) offsetRecords := make(offsetRecSlice, 0, len(reqs)) // Pass #1: Iterate over |reqs| and |tr.prefixes| (both sorted by address) and build the set @@ -228,9 +234,11 @@ func (tr tableReader) getMany(reqs []getRecord) (remaining bool) { continue } - // record all offsets within the table which *may* contain the address we are searching for. + // record all offsets within the table which contain the data required. for j := filterIdx; j < filterLen && req.prefix == tr.prefixes[j]; j++ { - offsetRecords = append(offsetRecords, offsetRec{uint32(i), tr.ordinals[j], tr.offsets[tr.ordinals[j]]}) + if tr.ordinalSuffixMatches(tr.prefixIdxToOrdinal(j), *req.a) { + offsetRecords = append(offsetRecords, offsetRec{uint32(i), tr.ordinals[j], tr.offsets[tr.ordinals[j]]}) + } } } @@ -326,23 +334,13 @@ func (tr tableReader) getMany(reqs []getRecord) (remaining bool) { // Fetches the byte stream of data logically encoded within the table starting at |pos|. 
func (tr tableReader) parseChunk(h addr, buff []byte) []byte { - // chksum (4 LSBytes, big-endian) - chksum := binary.BigEndian.Uint32(buff) - if chksum != h.Checksum() { - return nil // false positive - } - buff = buff[checksumSize:] - - // data - data, err := snappy.Decode(nil, buff) + dataLen := uint64(len(buff)) - checksumSize + data, err := snappy.Decode(nil, buff[:dataLen]) d.Chk.NoError(err) + buff = buff[dataLen:] - computedAddr := computeAddr(data) - d.Chk.True(chksum == computedAddr.Checksum()) // integrity check - - if computedAddr != h { - return nil // false positive - } + chksum := binary.BigEndian.Uint32(buff) + d.Chk.True(chksum == crc(data)) return data } diff --git a/go/nbs/table_writer.go b/go/nbs/table_writer.go index 0e988569c9..5ba4f8e90b 100644 --- a/go/nbs/table_writer.go +++ b/go/nbs/table_writer.go @@ -48,16 +48,16 @@ func (tw *tableWriter) addChunk(h addr, data []byte) bool { panic("NBS blocks cannont be zero length") } - // checksum (4 LSBytes, big-endian) - copy(tw.buff[tw.pos:tw.pos+checksumSize], h[addrSize-checksumSize:]) - tw.pos += checksumSize - // Compress data straight into tw.buff compressed := snappy.Encode(tw.buff[tw.pos:], data) dataLength := uint64(len(compressed)) tw.pos += dataLength tw.totalPhysicalData += dataLength + // checksum: CRC32 (Castagnoli) of the uncompressed chunk data, big-endian + binary.BigEndian.PutUint32(tw.buff[tw.pos:], crc(data)) + tw.pos += checksumSize + // Stored in insertion order tw.prefixes = append(tw.prefixes, prefixIndexRec{ h.Prefix(),