Add Metadata and checksums

This commit is contained in:
Neil Macneale IV
2024-04-24 15:43:36 -07:00
parent 1ae578196d
commit 0cf305a481
6 changed files with 265 additions and 107 deletions

View File

@@ -59,12 +59,12 @@ func (r sectionReaderByteReader) ReadByte() (byte, error) {
}
func newArchiveIndex(reader io.ReaderAt, fileSize uint64) (archiveIndex, error) {
idx, bs, cc, err := loadFooter(reader, fileSize)
idx, bs, cc, md, err := loadFooter(reader, fileSize)
if err != nil {
return archiveIndex{}, err
}
indexStart := fileSize - archiveFooterSize - uint64(idx)
indexStart := fileSize - uint64(md) - archiveFooterSize - uint64(idx)
section := io.NewSectionReader(reader, int64(indexStart), int64(idx))
byteSpans := make([]byteSpan, bs+1)
@@ -127,8 +127,8 @@ func newArchiveIndex(reader io.ReaderAt, fileSize uint64) (archiveIndex, error)
}, nil
}
func loadFooter(reader io.ReaderAt, fileSize uint64) (indexSize, byteSpanCount, chunkCount uint32, err error) {
section := io.NewSectionReader(reader, int64(fileSize-archiveFooterSize), archiveFooterSize)
func loadFooter(reader io.ReaderAt, fileSize uint64) (indexSize, byteSpanCount, chunkCount, metadataSize uint32, err error) {
section := io.NewSectionReader(reader, int64(fileSize-archiveFooterSize), int64(archiveFooterSize))
bytesRead := 0
buf := make([]byte, archiveFooterSize)
@@ -136,25 +136,27 @@ func loadFooter(reader io.ReaderAt, fileSize uint64) (indexSize, byteSpanCount,
if err != nil {
return
}
if bytesRead != archiveFooterSize {
if bytesRead != int(archiveFooterSize) {
err = io.ErrUnexpectedEOF
return
}
// Verify File Signature
if string(buf[13:]) != archiveFileSignature {
if string(buf[archiveFooterSize-archiveFileSigSize:]) != archiveFileSignature {
err = ErrInvalidFileSignature
return
}
// Verify Format Version. Currently only one version is supported, but we'll need to be more flexible in the future.
if buf[12] != archiveFormatVersion {
if buf[archiveFooterSize-(archiveFileSigSize+1)] != archiveFormatVersion {
err = ErrInvalidFormatVersion
return
}
// NM4 - I hate this so much.
indexSize = binary.BigEndian.Uint32(buf[:uint32Size])
byteSpanCount = binary.BigEndian.Uint32(buf[uint32Size : uint32Size*2])
chunkCount = binary.BigEndian.Uint32(buf[uint32Size*2 : uint32Size*3])
metadataSize = binary.BigEndian.Uint32(buf[uint32Size*3 : uint32Size*4])
return
}
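To make the new offset math concrete, here is a quick sketch with made-up sizes; the real idx and md values come from the footer:

// Worked example of the new indexStart computation (hypothetical sizes).
// On-disk layout: byte spans | index | metadata | footer.
fileSize := uint64(1000)
indexSize := uint64(24)    // idx, from the footer
metadataSize := uint64(11) // md, from the footer
footerSize := uint64(216)  // archiveFooterSize for format version 1

indexStart := fileSize - metadataSize - footerSize - indexSize // 1000 - 11 - 216 - 24 = 749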

View File

@@ -27,7 +27,7 @@ import (
)
func TestArchiveSingleChunk(t *testing.T) {
writer := NewFixedBufferTableSink(make([]byte, 1024))
writer := NewFixedBufferByteSink(make([]byte, 1024))
aw := newArchiveWriter(writer)
testBlob := []byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}
@@ -41,15 +41,20 @@ func TestArchiveSingleChunk(t *testing.T) {
err = aw.stageChunk(oneHash, 0, 1)
assert.NoError(t, err)
n, err := aw.writeIndex()
aw.finalizeByteSpans()
err = aw.writeIndex()
assert.NoError(t, err)
assert.Equal(t, uint32(24), n) // Verified manually. A single chunk allows for single byte varints, so
assert.Equal(t, uint32(24), aw.indexLen) // Verified manually. A single chunk allows for single byte varints, so
// ByteSpan -> 2 bytes, Prefix -> 8 bytes, ChunkRef -> 2 bytes, Suffix -> 12 bytes. Total 24 bytes.
err = aw.writeFooter(n)
err = aw.writeMetadata([]byte(""))
assert.NoError(t, err)
assert.Equal(t, uint64(54), aw.bytesWritten) // 10 + 24 + 20 (footer is 20 bytes)
err = aw.writeFooter()
assert.NoError(t, err)
assert.Equal(t, 10+24+archiveFooterSize, aw.bytesWritten) // 10 data bytes, 24 index bytes + footer
theBytes := writer.buff[:writer.pos]
fileSize := uint64(len(theBytes))
@@ -67,7 +72,7 @@ func TestArchiveSingleChunk(t *testing.T) {
}
func TestArchiveSingleChunkWithDictionary(t *testing.T) {
writer := NewFixedBufferTableSink(make([]byte, 1024))
writer := NewFixedBufferByteSink(make([]byte, 1024))
aw := newArchiveWriter(writer)
testDict := []byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}
@@ -79,8 +84,10 @@ func TestArchiveSingleChunkWithDictionary(t *testing.T) {
err := aw.stageChunk(h, 1, 2)
assert.NoError(t, err)
n, _ := aw.writeIndex()
err = aw.writeFooter(n)
aw.finalizeByteSpans()
_ = aw.writeIndex()
_ = aw.writeMetadata([]byte(""))
err = aw.writeFooter()
assert.NoError(t, err)
theBytes := writer.buff[:writer.pos]
@@ -99,7 +106,7 @@ func TestArchiveSingleChunkWithDictionary(t *testing.T) {
}
func TestArchiverMultipleChunksMultipleDictionaries(t *testing.T) {
writer := NewFixedBufferTableSink(make([]byte, 1024))
writer := NewFixedBufferByteSink(make([]byte, 1024))
aw := newArchiveWriter(writer)
dict1 := []byte{1, 1, 1, 1, 1, 1, 1, 1, 1, 1} // span 1
@@ -136,8 +143,11 @@ func TestArchiverMultipleChunksMultipleDictionaries(t *testing.T) {
_ = aw.stageChunk(h6, 0, 6)
_ = aw.stageChunk(h7, 1, 7)
n, _ := aw.writeIndex()
_ = aw.writeFooter(n)
aw.finalizeByteSpans()
_ = aw.writeIndex()
_ = aw.writeMetadata([]byte(""))
_ = aw.writeFooter()
theBytes := writer.buff[:writer.pos]
fileSize := uint64(len(theBytes))
@@ -187,7 +197,7 @@ func TestArchiverMultipleChunksMultipleDictionaries(t *testing.T) {
}
func TestArchiveDictDecompression(t *testing.T) {
writer := NewFixedBufferTableSink(make([]byte, 4096))
writer := NewFixedBufferByteSink(make([]byte, 4096))
// This is 32K worth of data, but it's all very similar. Only fits in 4K if compressed with a dictionary.
chks := generateSimilarChunks(42, 32)
@@ -212,16 +222,19 @@ func TestArchiveDictDecompression(t *testing.T) {
err = aw.stageChunk(chk.Hash(), dictId, chId)
assert.NoError(t, err)
}
aw.finalizeByteSpans()
n, err := aw.writeIndex()
err = aw.writeIndex()
assert.NoError(t, err)
err = aw.writeFooter(n)
err = aw.writeMetadata([]byte("hello world"))
err = aw.writeFooter()
assert.NoError(t, err)
theBytes := writer.buff[:writer.pos]
fileSize := uint64(len(theBytes))
readerAt := bytes.NewReader(theBytes)
aIdx, err := newArchiveIndex(readerAt, fileSize)
assert.NoError(t, err)
// Now verify that we can look up the chunks by their original addresses, and the data is the same.
for _, chk := range chks {
@@ -232,16 +245,16 @@ func TestArchiveDictDecompression(t *testing.T) {
}
func TestArchiveBlockCorruption(t *testing.T) {
writer := NewFixedBufferTableSink(make([]byte, 1024))
writer := NewFixedBufferByteSink(make([]byte, 1024))
aw := newArchiveWriter(writer)
testBlob := []byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}
_, _ = aw.writeByteSpan(testBlob)
h := hashWithPrefix(t, 23)
_ = aw.stageChunk(h, 0, 1)
n, _ := aw.writeIndex()
_ = aw.writeFooter(n)
aw.finalizeByteSpans()
_ = aw.writeIndex()
_ = aw.writeFooter()
theBytes := writer.buff[:writer.pos]
fileSize := uint64(len(theBytes))

View File

@@ -15,6 +15,7 @@
package nbs
import (
"bytes"
"encoding/binary"
"errors"
"io"
@@ -32,24 +33,34 @@ There are byte spans within the archive which are not addressable by a Chunk Address. These are used
to aid in the compression of the Chunks.
A Dolt Archive file follows the following format:
+------------+------------+-----+------------+-------+--------+
| ByteSpan 1 | ByteSpan 2 | ... | ByteSpan N | Index | Footer |
+------------+------------+-----+------------+-------+--------+
+------------+------------+-----+------------+-------+----------+--------+
| ByteSpan 1 | ByteSpan 2 | ... | ByteSpan N | Index | Metadata | Footer |
+------------+------------+-----+------------+-------+----------+--------+
In reverse order, since that's how we read it
Footer:
+----------------------+-------------------------+----------------------+--------------------+--------------------+
| (Uint32) IndexLength | (Uint32) ByteSpan Count | (Uint32) Chunk Count | (1) Format Version | (7) File Signature |
+----------------------+-------------------------+----------------------+--------------------+--------------------+
+----------------------+-------------------------+----------------------+--------------------------+-----------------+------------------------+--------------------+
| (Uint32) IndexLength | (Uint32) ByteSpan Count | (Uint32) Chunk Count | (Uint32) Metadata Length | (192) CheckSums | (Uint8) Format Version | (7) File Signature |
+----------------------+-------------------------+----------------------+--------------------------+-----------------+------------------------+--------------------+
- Index Length: The length of the Index in bytes.
- ByteSpan Count: (N) The number of ByteSpans in the Archive. (does not include the null ByteSpan)
- Chunk Count: (M) The number of Chunk Records in the Archive.
* These 3 values are all required to properly parse the Index. Note that the NBS Index has a deterministic size
based on the Chunk Count. This is not the case with a Dolt Archive.
- Metadata Length: The length of the Metadata in bytes.
- CheckSums: See Below.
- Format Version: Sequence starting at 1.
- File Signature: Some would call this a magic number. Not on my watch. Dolt Archives have a 7 byte signature: "DOLTARC"
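For illustration, the fixed offsets of a version-1 footer decode roughly as in this sketch, which mirrors what loadFooter does with named constants:

package nbs

import "encoding/binary"

// parseFooterFields is a sketch of the version-1 footer layout (216 bytes).
// buf must hold the complete footer.
func parseFooterFields(buf []byte) (indexLen, spanCount, chunkCount, metadataLen uint32, version byte, sig string) {
	indexLen = binary.BigEndian.Uint32(buf[0:4])
	spanCount = binary.BigEndian.Uint32(buf[4:8])
	chunkCount = binary.BigEndian.Uint32(buf[8:12])
	metadataLen = binary.BigEndian.Uint32(buf[12:16])
	// buf[16:208] holds the three 64-byte SHA-512 checksums (data, index, metadata).
	version = buf[208]
	sig = string(buf[209:216])
	return
}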
CheckSums:
+----------------------------+-------------------+----------------------+
| (64) Sha512 ByteSpan 1 - N | (64) Sha512 Index | (64) Sha512 Metadata |
+----------------------------+-------------------+----------------------+
- The Sha512 checksums of the ByteSpans, Index, and Metadata. Currently unused, but they at least leave an
opening to verify integrity manually, and could in the future allow the file to be broken into parts while
still verifying the integrity of each part.
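Manual verification of the data section could look like this sketch (hypothetical helper; the checksums are written but not yet consumed anywhere in this commit):

package nbs

import (
	"bytes"
	"crypto/sha512"
	"io"
)

// verifyDataCheckSum recomputes the checksum of everything before the index
// and compares it to the footer's recorded value.
func verifyDataCheckSum(reader io.ReaderAt, fileSize uint64, indexSize, metadataSize uint32, want [64]byte) (bool, error) {
	dataLen := fileSize - archiveFooterSize - uint64(metadataSize) - uint64(indexSize)
	data := make([]byte, dataLen)
	if _, err := reader.ReadAt(data, 0); err != nil {
		return false, err
	}
	got := sha512.Sum512(data)
	return bytes.Equal(got[:], want[:]), nil
}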
Index:
+--------------+------------+-----------------+----------+
| ByteSpan Map | Prefix Map | ChunkReferences | Suffixes |
@@ -98,6 +109,12 @@ Index:
- Each Hash Suffix is the last 12 bytes of a Chunk in this Table.
- Hash Suffix M must correspond to Prefix M and Chunk Record M
Metadata:
The Metadata section is intended to be used for additional information about the Archive. This may include the version
of Dolt that created the archive, references to other archives, or other information. For format version 1,
we use a simple JSON object. The Metadata Length is the length of the JSON object in bytes. It could become a
Flatbuffer in the future, which would mandate a format version bump.
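A minimal sketch of producing such a payload; the key shown is hypothetical, since the commit only pins the section to a JSON object:

package main

import (
	"encoding/json"
	"fmt"
)

func main() {
	md, _ := json.Marshal(map[string]string{"dolt_version": "x.y.z"}) // hypothetical key
	fmt.Println(len(md)) // this length lands in the footer's Metadata Length field
	// The bytes themselves are passed to archiveWriter.writeMetadata.
}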
ByteSpan:
+----------------+
| Data as []byte |
@@ -129,15 +146,24 @@ Chunk Retrieval (phase 1 is similar to NBS):
- Decompress the Chunk data using zstd with the Dictionary data.
*/
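That last step could look like the sketch below, shown here with the pure-Go klauspost/compress zstd binding for illustration; the binding the archive code actually uses may differ:

package nbs

import "github.com/klauspost/compress/zstd"

// decompressChunk is a sketch of dictionary decompression. dictData and
// chunkData are the two byte spans resolved in the steps above.
func decompressChunk(dictData, chunkData []byte) ([]byte, error) {
	dec, err := zstd.NewReader(nil, zstd.WithDecoderDicts(dictData))
	if err != nil {
		return nil, err
	}
	defer dec.Close()
	return dec.DecodeAll(chunkData, nil)
}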
const archiveFormatVersion = 1
const archiveFileSignature = "DOLTARC"
const archiveFooterSize = 4 + 4 + 4 + 1 + 7
const (
archiveFormatVersion = 1
archiveFileSignature = "DOLTARC"
archiveFileSigSize = uint64(len(archiveFileSignature))
archiveCheckSumSize = 64 * 3 // sha512 3 times.
archiveFooterSize = uint32Size + // index length
uint32Size + // byte span count
uint32Size + // chunk count
uint32Size + // metadata length
archiveCheckSumSize +
1 + // version byte
archiveFileSigSize
)
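For format version 1 these constants work out to a fixed footer of 4 + 4 + 4 + 4 + 192 + 1 + 7 = 216 bytes, which is what readers subtract (along with the index and metadata lengths) to locate each section from the end of the file.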
var ErrInvalidChunkRange = errors.New("invalid chunk range")
var ErrInvalidDictionaryRange = errors.New("invalid dictionary range")
var ErrInvalidFileSignature = errors.New("invalid file signature")
var ErrInvalidFormatVersion = errors.New("invalid format version")
var ErrCRCMismatch = errors.New("CRC mismatch")
type stagedByteSpan struct {
offset uint64
@@ -151,15 +177,23 @@ type stagedChunkRef struct {
}
type stagedChunkRefSlice []stagedChunkRef
type sha512Sum [64]byte
type archiveWriter struct {
output io.Writer
bytesWritten uint64
stagedBytes stagedByteSpanSlice
stagedChunks stagedChunkRefSlice
output HashingByteSink
bytesWritten uint64
stagedBytes stagedByteSpanSlice
stagedChunks stagedChunkRefSlice
indexLen uint32
metadataLen uint32
dataCheckSum sha512Sum
indexCheckSum sha512Sum
metadataCheckSum sha512Sum
}
func newArchiveWriter(output io.Writer) *archiveWriter {
return &archiveWriter{output: output}
func newArchiveWriter(output ByteSink) *archiveWriter {
hbs := NewSHA512HashingByteSink(output)
return &archiveWriter{output: *hbs}
}
// writeByteSpan writes a byte span to the archive, returning the ByteSpan ID if the write was successful. Note
@@ -209,55 +243,58 @@ func (scrs stagedChunkRefSlice) Swap(i, j int) {
scrs[i], scrs[j] = scrs[j], scrs[i]
}
func (aw *archiveWriter) finalize() error {
indexLen, err := aw.writeIndex()
if err != nil {
return err
}
return aw.writeFooter(indexLen)
// finalizeByteSpans records the checksum of all byte spans written so far and resets the hasher. Must be called
// after the last writeByteSpan and before writeIndex.
func (aw *archiveWriter) finalizeByteSpans() {
// Get the checksum for the data written so far
aw.dataCheckSum = sha512Sum(aw.output.GetSum())
aw.output.ResetHasher()
}
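Putting the new writer API together, the tests above exercise this call order. A sketch using names from this diff, with error handling elided; h is a chunk address from Dolt's hash package:

func writeExampleArchive(chunkData []byte, h hash.Hash) []byte {
	writer := NewFixedBufferByteSink(make([]byte, 1024))
	aw := newArchiveWriter(writer)

	id, _ := aw.writeByteSpan(chunkData) // raw data first
	_ = aw.stageChunk(h, 0, id)          // chunk record referencing span id; 0 means no dictionary
	aw.finalizeByteSpans()               // snapshot the data checksum
	_ = aw.writeIndex()                  // sets indexLen and indexCheckSum
	_ = aw.writeMetadata([]byte("{}"))   // sets metadataLen and metadataCheckSum
	_ = aw.writeFooter()                 // lengths, all three checksums, version, signature

	return writer.buff[:writer.pos]
}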
func (aw *archiveWriter) writeFooter(indexLen uint32) error {
// Write out the index length
err := binary.Write(aw.output, binary.BigEndian, indexLen)
// writeUint32 writes a big-endian uint32 to the output, verifying the write length and updating bytesWritten.
func (aw *archiveWriter) writeUint32(val uint32) error {
bb := &bytes.Buffer{}
err := binary.Write(bb, binary.BigEndian, val)
if err != nil {
return err
}
n, err := aw.output.Write(bb.Bytes())
if err != nil {
return err
}
if n != uint32Size {
return io.ErrShortWrite
}
aw.bytesWritten += uint32Size
// Write out the byte span count
err = binary.Write(aw.output, binary.BigEndian, uint32(len(aw.stagedBytes)))
if err != nil {
return err
}
aw.bytesWritten += uint32Size
// Write out the chunk count
err = binary.Write(aw.output, binary.BigEndian, uint32(len(aw.stagedChunks)))
if err != nil {
return err
}
aw.bytesWritten += uint32Size
// Write out the format version
err = binary.Write(aw.output, binary.BigEndian, uint8(archiveFormatVersion))
if err != nil {
return err
}
aw.bytesWritten++
// Write out the file signature
_, err = aw.output.Write([]byte(archiveFileSignature))
if err != nil {
return err
}
aw.bytesWritten += 7
return nil
}
func (aw *archiveWriter) writeIndex() (uint32, error) {
func (aw *archiveWriter) writeUint64(val uint64) error {
bb := &bytes.Buffer{}
err := binary.Write(bb, binary.BigEndian, val)
if err != nil {
return err
}
n, err := aw.output.Write(bb.Bytes())
if err != nil {
return err
}
if n != uint64Size {
return io.ErrShortWrite
}
aw.bytesWritten += uint64Size
return nil
}
// writeIndex writes the index to the archive. Expects the hasher to be reset before being called, and will reset it. It
// sets the indexLen and indexCheckSum fields on the archiveWriter, and updates the bytesWritten field.
func (aw *archiveWriter) writeIndex() error {
startingByteCount := aw.bytesWritten
varIbuf := make([]byte, binary.MaxVarintLen64)
@@ -267,20 +304,20 @@ func (aw *archiveWriter) writeIndex() (uint32, error) {
n := binary.PutUvarint(varIbuf, bs.offset)
written, err := aw.output.Write(varIbuf[:n])
if err != nil {
return 0, err
return err
}
if written != n {
return 0, io.ErrShortWrite
return io.ErrShortWrite
}
aw.bytesWritten += uint64(written)
n = binary.PutUvarint(varIbuf, uint64(bs.length))
written, err = aw.output.Write(varIbuf[:n])
if err != nil {
return 0, err
return err
}
if written != n {
return 0, io.ErrShortWrite
return io.ErrShortWrite
}
aw.bytesWritten += uint64(written)
}
@@ -291,31 +328,30 @@ func (aw *archiveWriter) writeIndex() (uint32, error) {
// We lay down the sorted chunk list in its three forms.
// Prefix Map
for _, scr := range aw.stagedChunks {
err := binary.Write(aw.output, binary.BigEndian, scr.hash.Prefix())
err := aw.writeUint64(scr.hash.Prefix())
if err != nil {
return 0, err
return err
}
aw.bytesWritten += uint64Size
}
// ChunkReferences
for _, scr := range aw.stagedChunks {
n := binary.PutUvarint(varIbuf, uint64(scr.dictionary))
written, err := aw.output.Write(varIbuf[:n])
if err != nil {
return 0, err
return err
}
if written != n {
return 0, io.ErrShortWrite
return io.ErrShortWrite
}
aw.bytesWritten += uint64(written)
n = binary.PutUvarint(varIbuf, uint64(scr.data))
written, err = aw.output.Write(varIbuf[:n])
if err != nil {
return 0, err
return err
}
if written != n {
return 0, io.ErrShortWrite
return io.ErrShortWrite
}
aw.bytesWritten += uint64(written)
}
@@ -323,14 +359,110 @@ func (aw *archiveWriter) writeIndex() (uint32, error) {
for _, scr := range aw.stagedChunks {
n, err := aw.output.Write(scr.hash.Suffix())
if err != nil {
return 0, err
return err
}
if n != hash.SuffixLen {
return 0, io.ErrShortWrite
return io.ErrShortWrite
}
aw.bytesWritten += uint64(hash.SuffixLen)
}
return uint32(aw.bytesWritten - startingByteCount), nil
aw.indexLen = uint32(aw.bytesWritten - startingByteCount)
aw.indexCheckSum = sha512Sum(aw.output.GetSum())
aw.output.ResetHasher()
return nil
}
// writeMetadata writes the metadata to the archive. Expects the hasher to be reset before being called, and will reset it.
// It sets the metadataLen and metadataCheckSum fields on the archiveWriter, and updates the bytesWritten field.
//
// Empty input is allowed.
func (aw *archiveWriter) writeMetadata(data []byte) error {
if len(data) == 0 { // a nil slice has length 0, so one check covers both cases
aw.metadataCheckSum = sha512Sum(aw.output.GetSum())
aw.metadataLen = 0
aw.output.ResetHasher()
return nil
}
written, err := aw.output.Write(data)
if err != nil {
return err
}
if written != len(data) {
return io.ErrShortWrite
}
aw.bytesWritten += uint64(written)
aw.metadataLen = uint32(written)
aw.metadataCheckSum = sha512Sum(aw.output.GetSum())
aw.output.ResetHasher()
return nil
}
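Note that with empty input the recorded checksum is simply the SHA-512 of zero bytes, since the hasher was reset at the end of writeIndex, so a reader can still verify the (empty) metadata section.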
func (aw *archiveWriter) writeFooter() error {
// Write out the index length
err := aw.writeUint32(aw.indexLen)
if err != nil {
return err
}
// Write out the byte span count
err = aw.writeUint32(uint32(len(aw.stagedBytes)))
if err != nil {
return err
}
// Write out the chunk count
err = aw.writeUint32(uint32(len(aw.stagedChunks)))
if err != nil {
return err
}
// Write out the metadata length
err = aw.writeUint32(aw.metadataLen)
if err != nil {
return err
}
err = aw.writeCheckSums()
if err != nil {
return err
}
// Write out the format version
_, err = aw.output.Write([]byte{archiveFormatVersion})
if err != nil {
return err
}
aw.bytesWritten++
// Write out the file signature
_, err = aw.output.Write([]byte(archiveFileSignature))
if err != nil {
return err
}
aw.bytesWritten += archiveFileSigSize
return nil
}
func (aw *archiveWriter) writeCheckSums() error {
_, err := aw.output.Write(aw.dataCheckSum[:])
if err != nil {
return err
}
_, err = aw.output.Write(aw.indexCheckSum[:])
if err != nil {
return err
}
_, err = aw.output.Write(aw.metadataCheckSum[:])
if err != nil {
return err
}
aw.bytesWritten += archiveCheckSumSize
return nil
}

View File

@@ -17,6 +17,7 @@ package nbs
import (
"bytes"
"crypto/md5"
"crypto/sha512"
"errors"
"hash"
"io"
@@ -72,8 +73,8 @@ type FixedBufferByteSink struct {
pos uint64
}
// NewFixedBufferTableSink creates a FixedBufferTableSink which will use the supplied buffer
func NewFixedBufferTableSink(buff []byte) *FixedBufferByteSink {
// NewFixedBufferByteSink creates a FixedBufferByteSink which will use the supplied buffer
func NewFixedBufferByteSink(buff []byte) *FixedBufferByteSink {
if len(buff) == 0 {
panic("must provide a buffer")
}
@@ -119,8 +120,8 @@ type BlockBufferByteSink struct {
blocks [][]byte
}
// NewBlockBufferTableSink creates a BlockBufferByteSink with the provided block size.
func NewBlockBufferTableSink(blockSize int) *BlockBufferByteSink {
// NewBlockBufferByteSink creates a BlockBufferByteSink with the provided block size.
func NewBlockBufferByteSink(blockSize int) *BlockBufferByteSink {
block := make([]byte, 0, blockSize)
return &BlockBufferByteSink{blockSize, 0, [][]byte{block}}
}
@@ -322,7 +323,11 @@ type HashingByteSink struct {
size uint64
}
func NewHashingByteSink(backingSink ByteSink) *HashingByteSink {
func NewSHA512HashingByteSink(backingSink ByteSink) *HashingByteSink {
return &HashingByteSink{backingSink: backingSink, hasher: sha512.New(), size: 0}
}
func NewMD5HashingByteSink(backingSink ByteSink) *HashingByteSink {
return &HashingByteSink{backingSink: backingSink, hasher: md5.New(), size: 0}
}
@@ -361,11 +366,17 @@ func (sink *HashingByteSink) Reader() (io.ReadCloser, error) {
return sink.backingSink.Reader()
}
// GetMD5 gets the MD5 hash of all the bytes written to the sink
func (sink *HashingByteSink) GetMD5() []byte {
// GetSum returns the hash of all bytes written since the hasher was created or last reset
func (sink *HashingByteSink) GetSum() []byte {
return sink.hasher.Sum(nil)
}
// ResetHasher resets the hasher to allow for checksums at various points in the data stream. The expectation is that
// you would call GetSum prior to calling this function.
func (sink *HashingByteSink) ResetHasher() {
sink.hasher.Reset()
}
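The intended usage is to take per-section sums from a single sink: write a section, read the running sum, reset, and continue. A sketch using names from this diff:

func demoSectionedSums() ([]byte, []byte) {
	sink := NewSHA512HashingByteSink(NewBlockBufferByteSink(128))

	_, _ = sink.Write([]byte("section one"))
	sumOne := sink.GetSum() // SHA-512 of "section one"
	sink.ResetHasher()

	_, _ = sink.Write([]byte("section two"))
	sumTwo := sink.GetSum() // SHA-512 of "section two" only
	return sumOne, sumTwo
}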
// Size gets the number of bytes written to the sink
func (sink *HashingByteSink) Size() uint64 {
return sink.size

View File

@@ -28,7 +28,7 @@ import (
func TestBlockBufferTableSink(t *testing.T) {
createSink := func() ByteSink {
return NewBlockBufferTableSink(128)
return NewBlockBufferByteSink(128)
}
suite.Run(t, &TableSinkSuite{createSink, t})
@@ -36,7 +36,7 @@ func TestBlockBufferTableSink(t *testing.T) {
func TestFixedBufferTableSink(t *testing.T) {
createSink := func() ByteSink {
return NewFixedBufferTableSink(make([]byte, 32*1024))
return NewFixedBufferByteSink(make([]byte, 32*1024))
}
suite.Run(t, &TableSinkSuite{createSink, t})

View File

@@ -57,7 +57,7 @@ func NewCmpChunkTableWriter(tempDir string) (*CmpChunkTableWriter, error) {
return nil, err
}
return &CmpChunkTableWriter{NewHashingByteSink(s), 0, 0, nil, nil, s.path}, nil
return &CmpChunkTableWriter{NewMD5HashingByteSink(s), 0, 0, nil, nil, s.path}, nil
}
func (tw *CmpChunkTableWriter) ChunkCount() int {
@@ -71,7 +71,7 @@ func (tw *CmpChunkTableWriter) ContentLength() uint64 {
// Gets the MD5 of the entire table file
func (tw *CmpChunkTableWriter) GetMD5() []byte {
return tw.sink.GetMD5()
return tw.sink.GetSum()
}
// AddCmpChunk adds a compressed chunk