Verify chunks using suffix index not computing address from data. (#2907)
Revert to verifying chunks using the suffix index. Replace the inline 4-byte suffix used as an integrity check with a more standard and efficient CRC32.
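As a rough, self-contained sketch of the first point (not code from this commit: the addr and suffixIndex types below are illustrative, though ordinalSuffixMatches mirrors the helper the diff adds, and the sizes follow NBS's addrPrefixSize/addrSuffixSize), a reader confirms a prefix hit by comparing the stored 12-byte address suffix from the table index, with no need to read, decompress, or hash the chunk data:

package main

import (
	"bytes"
	"fmt"
)

const (
	addrSize       = 20 // NBS addresses are 20 bytes
	addrPrefixSize = 8  // first 8 bytes, kept sorted in the index
	addrSuffixSize = addrSize - addrPrefixSize
)

type addr [addrSize]byte

// suffixIndex holds the 12-byte suffixes concatenated in insertion (ordinal) order.
type suffixIndex []byte

// ordinalSuffixMatches reports whether the suffix stored at |ordinal| matches |a|,
// echoing the helper the diff adds to tableReader.
// (The diff uses bytes.Compare(...) == 0; bytes.Equal is equivalent here.)
func (s suffixIndex) ordinalSuffixMatches(ordinal uint32, a addr) bool {
	off := uint64(ordinal) * addrSuffixSize
	return bytes.Equal(a[addrPrefixSize:], s[off:off+addrSuffixSize])
}

func main() {
	var a addr
	copy(a[:], "0123456789abcdefghij")          // stand-in 20-byte address
	idx := suffixIndex(a[addrPrefixSize:])      // an index holding just this suffix at ordinal 0
	fmt.Println(idx.ordinalSuffixMatches(0, a)) // true
}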
@@ -9,6 +9,7 @@ import (
 	"crypto/sha512"
 	"encoding/base32"
 	"encoding/binary"
+	"hash/crc32"
 )
 
 /*
@@ -22,9 +23,9 @@ import (
 +----------------+----------------+-----+----------------+-------+--------+
 
 Chunk Record:
-+--------------------+---------------------------+
-| (4) Address suffix | (Chunk Length) Chunk Data |
-+--------------------+---------------------------+
++---------------------------+----------------+
+| (Chunk Length) Chunk Data | (Uint32) CRC32 |
++---------------------------+----------------+
 
 -Address suffix is the 4 least-significant bytes of the Chunk's address. Used (e.g. in place
 of CRC32) as a checksum and a filter against false positive reads costing more than one IOP.
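For concreteness, here is a minimal standalone sketch of the new chunk record layout described above: snappy-compressed chunk data followed by a big-endian Uint32 CRC32 (Castagnoli) of the uncompressed data. This is not the table writer/reader code; writeChunkRecord and readChunkRecord are made-up helpers, and github.com/golang/snappy stands in for the snappy package the tables use, while crcTable mirrors the crc helper the diff adds below.

package main

import (
	"encoding/binary"
	"errors"
	"fmt"
	"hash/crc32"

	"github.com/golang/snappy"
)

// Same polynomial as the crc helper added in the diff below.
var crcTable = crc32.MakeTable(crc32.Castagnoli)

// writeChunkRecord lays out: [snappy(data)][uint32 big-endian CRC32 of the uncompressed data].
func writeChunkRecord(data []byte) []byte {
	rec := snappy.Encode(nil, data)
	var sum [4]byte
	binary.BigEndian.PutUint32(sum[:], crc32.Update(0, crcTable, data))
	return append(rec, sum[:]...)
}

// readChunkRecord decompresses the data portion and checks it against the trailing CRC.
func readChunkRecord(rec []byte) ([]byte, error) {
	if len(rec) < 4 {
		return nil, errors.New("record too short")
	}
	dataLen := len(rec) - 4
	data, err := snappy.Decode(nil, rec[:dataLen])
	if err != nil {
		return nil, err
	}
	if binary.BigEndian.Uint32(rec[dataLen:]) != crc32.Update(0, crcTable, data) {
		return nil, errors.New("chunk record checksum mismatch")
	}
	return data, nil
}

func main() {
	rec := writeChunkRecord([]byte("hello, nbs"))
	data, err := readChunkRecord(rec)
	fmt.Println(string(data), err)
}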
@@ -112,11 +113,17 @@ const (
 	magicNumberSize uint64 = uint64(len(magicNumber))
 	footerSize = uint32Size + uint64Size + magicNumberSize
 	prefixTupleSize = addrPrefixSize + ordinalSize
-	checksumSize uint64 = 4
+	checksumSize uint64 = uint32Size
 	maxChunkLengthSize uint64 = binary.MaxVarintLen64
 	maxChunkSize uint64 = 0xffffffff // Snappy won't compress slices bigger than this
 )
 
+var crcTable = crc32.MakeTable(crc32.Castagnoli)
+
+func crc(b []byte) uint32 {
+	return crc32.Update(0, crcTable, b)
+}
+
 func computeAddrDefault(data []byte) addr {
 	r := sha512.Sum512(data)
 	h := addr{}
@@ -107,8 +107,7 @@ func (tr tableReader) hasMany(addrs []hasRecord) (remaining bool)
 
 		// prefixes are equal, so locate and compare against the corresponding suffix
 		for j := filterIdx; j < filterLen && addr.prefix == tr.prefixes[j]; j++ {
-			li := uint64(tr.prefixIdxToOrdinal(j)) * addrSuffixSize
-			if bytes.Compare(addr.a[addrPrefixSize:], tr.suffixes[li:li+addrSuffixSize]) == 0 {
+			if tr.ordinalSuffixMatches(tr.prefixIdxToOrdinal(j), *addr.a) {
 				addrs[i].has = true
 				break
 			}
@@ -155,10 +154,7 @@ func (tr tableReader) has(h addr) bool
 	idx := tr.prefixIdx(prefix)
 
 	for ; idx < tr.chunkCount && tr.prefixes[idx] == prefix; idx++ {
-		ordinal := tr.prefixIdxToOrdinal(idx)
-		suffixOffset := uint64(ordinal) * addrSuffixSize
-
-		if bytes.Compare(tr.suffixes[suffixOffset:suffixOffset+addrSuffixSize], h[addrPrefixSize:]) == 0 {
+		if tr.ordinalSuffixMatches(tr.prefixIdxToOrdinal(idx), h) {
 			return true
 		}
 	}
@@ -166,6 +162,12 @@ func (tr tableReader) has(h addr) bool
 	return false
 }
 
+// Return true IFF the suffix at insertion order |ordinal| matches the address |a|.
+func (tr tableReader) ordinalSuffixMatches(ordinal uint32, a addr) bool {
+	li := uint64(ordinal) * addrSuffixSize
+	return bytes.Compare(a[addrPrefixSize:], tr.suffixes[li:li+addrSuffixSize]) == 0
+}
+
 // returns the storage associated with |h|, iff present. Returns nil if absent. On success,
 // the returned byte slice directly references the underlying storage.
 func (tr tableReader) get(h addr) (data []byte) {
@@ -174,6 +176,10 @@ func (tr tableReader) get(h addr) (data []byte)
 
 	for ; idx < tr.chunkCount && tr.prefixes[idx] == prefix; idx++ {
 		ordinal := tr.prefixIdxToOrdinal(idx)
+		if !tr.ordinalSuffixMatches(ordinal, h) {
+			continue
+		}
+
 		offset := tr.offsets[ordinal]
 		length := uint64(tr.lengths[ordinal])
 		buff := make([]byte, length) // TODO: Avoid this allocation for every get
@@ -206,8 +212,8 @@ const readAmpThresh = 1 << 1
 // getMany retrieves multiple stored blocks and optimizes by attempting to read in larger physical
 // blocks which contain multiple stored blocks. |reqs| must be sorted by address prefix.
 func (tr tableReader) getMany(reqs []getRecord) (remaining bool) {
-	filterIdx := uint64(0)
-	filterLen := uint64(len(tr.prefixes))
+	filterIdx := uint32(0)
+	filterLen := uint32(len(tr.prefixes))
 	offsetRecords := make(offsetRecSlice, 0, len(reqs))
 
 	// Pass #1: Iterate over |reqs| and |tr.prefixes| (both sorted by address) and build the set
@@ -228,9 +234,11 @@ func (tr tableReader) getMany(reqs []getRecord) (remaining bool)
 			continue
 		}
 
-		// record all offsets within the table which *may* contain the address we are searching for.
+		// record all offsets within the table which contain the data required.
 		for j := filterIdx; j < filterLen && req.prefix == tr.prefixes[j]; j++ {
-			offsetRecords = append(offsetRecords, offsetRec{uint32(i), tr.ordinals[j], tr.offsets[tr.ordinals[j]]})
+			if tr.ordinalSuffixMatches(tr.prefixIdxToOrdinal(j), *req.a) {
+				offsetRecords = append(offsetRecords, offsetRec{uint32(i), tr.ordinals[j], tr.offsets[tr.ordinals[j]]})
+			}
 		}
 	}
 
@@ -326,23 +334,13 @@ func (tr tableReader) getMany(reqs []getRecord) (remaining bool)
 
 // Fetches the byte stream of data logically encoded within the table starting at |pos|.
 func (tr tableReader) parseChunk(h addr, buff []byte) []byte {
-	// chksum (4 LSBytes, big-endian)
-	chksum := binary.BigEndian.Uint32(buff)
-	if chksum != h.Checksum() {
-		return nil // false positive
-	}
-	buff = buff[checksumSize:]
-
-	// data
-	data, err := snappy.Decode(nil, buff)
+	dataLen := uint64(len(buff)) - checksumSize
+	data, err := snappy.Decode(nil, buff[:dataLen])
 	d.Chk.NoError(err)
+	buff = buff[dataLen:]
 
-	computedAddr := computeAddr(data)
-	d.Chk.True(chksum == computedAddr.Checksum()) // integrity check
-
-	if computedAddr != h {
-		return nil // false positive
-	}
+	chksum := binary.BigEndian.Uint32(buff)
+	d.Chk.True(chksum == crc(data))
 
 	return data
 }
@@ -48,16 +48,16 @@ func (tw *tableWriter) addChunk(h addr, data []byte) bool {
 		panic("NBS blocks cannont be zero length")
 	}
 
-	// checksum (4 LSBytes, big-endian)
-	copy(tw.buff[tw.pos:tw.pos+checksumSize], h[addrSize-checksumSize:])
-	tw.pos += checksumSize
-
 	// Compress data straight into tw.buff
 	compressed := snappy.Encode(tw.buff[tw.pos:], data)
 	dataLength := uint64(len(compressed))
 	tw.pos += dataLength
 	tw.totalPhysicalData += dataLength
 
+	// checksum (4 LSBytes, big-endian)
+	binary.BigEndian.PutUint32(tw.buff[tw.pos:], crc(data))
+	tw.pos += checksumSize
+
 	// Stored in insertion order
 	tw.prefixes = append(tw.prefixes, prefixIndexRec{
 		h.Prefix(),