archive support to read version 2 format.
Docs updated, and updated tests.
@@ -25,11 +25,21 @@ ByteSpans concatenated together, with an index at the end of the file. Chunk Add
Chunks from the Archive.

ByteSpans are arbitrary offset/lengths into the file which store (1) zstd dictionary data, and (2) compressed chunk
data. Each Chunk is stored as a pair of ByteSpans (dict,data). Dictionary ByteSpans can (should) be used by multiple
data.

Each Chunk is stored as one or two ByteSpans. Dictionary ByteSpans can (should) be used by multiple
Chunks, so there are more ByteSpans than Chunks. The Index is used to map Chunks to ByteSpan pairs. These pairs are
called ChunkRefs, and we store them as [uint32,uint32] on disk. This allows us to quickly find the ByteSpans for a
given Chunk with minimal processing at load time.

Format Version Differences:
- Version 1: All chunks are compressed with zStd. Dictionaries are stored as a chunk ref with dictionary ID 0.
             The dictionaries themselves are zStd compressed. Chunks are stored with a pair of ByteSpans, the first
             being the dictionary, and the second being the chunk data.
- Version 2: In addition to zStd compressed chunks, we also support Snappy compressed chunks, in the same format
             as Noms table files. Any Snappy compressed chunk will have a dictionary ID of 0, and the chunk data
             will be stored in the second ByteSpan. It is stored with a 32-bit CRC, just like Noms table files.

A Dolt Archive file follows the following format:
+------------+------------+-----+------------+-------+----------+--------+
| ByteSpan 1 | ByteSpan 2 | ... | ByteSpan N | Index | Metadata | Footer |
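To make the ChunkRef described above concrete, here is a minimal Go sketch of the [uint32,uint32] pair and of how the two format versions interpret a dictionary ID of 0. It is not part of this diff, and the type and function names are illustrative only, not dolt's.

// Illustrative sketch; type and function names are hypothetical, not dolt's.
package sketch

// A chunkRef is the [uint32,uint32] pair stored in the Index for each Chunk.
type chunkRef struct {
    dict uint32 // ByteSpan ID of the zstd dictionary; 0 means "no dictionary"
    data uint32 // ByteSpan ID of the compressed chunk payload; never 0
}

// compressionOf shows how the same on-disk pair is read under each format version:
// version 1 treats dict==0 as zstd with no dictionary, while version 2 treats
// dict==0 as a Snappy-compressed chunk in the Noms table-file format.
func compressionOf(ref chunkRef, formatVersion uint8) string {
    switch {
    case ref.dict != 0:
        return "zstd with shared dictionary"
    case formatVersion >= 2:
        return "snappy, Noms-style, with trailing CRC32"
    default:
        return "zstd without dictionary"
    }
}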
@@ -48,7 +58,7 @@ Footer:
based on the Chunk Count. This is not the case with a Dolt Archive.
- Metadata Length: The length of the Metadata in bytes.
- CheckSums: See Below.
- Format Version: Sequence starting at 1.
- Format Version: Sequence starting at 1. Currently, 1 and 2 are supported.
- File Signature: Some would call this a magic number. Not on my watch. Dolt Archives have a 7 byte signature: "DOLTARC"

CheckSums:
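Read back, the trailing footer fields amount to a 1-byte format version followed by the 7-byte signature at the very end of the file (the same ordering implied by afrSigOffset = afrVersionOffset + 1 further down). A minimal, self-contained sketch of validating that tail follows; it is not the dolt implementation, and the other footer fields are ignored here.

// Sketch only: validates just the trailing version byte and "DOLTARC" signature.
package sketch

import "errors"

const (
    fileSignature    = "DOLTARC"
    formatVersionMax = uint8(2) // versions 1 and 2 are readable
)

var (
    errInvalidFileSignature = errors.New("invalid file signature")
    errInvalidFormatVersion = errors.New("invalid format version")
)

// checkFooterTail inspects the last bytes of an archive file.
func checkFooterTail(tail []byte) (version uint8, err error) {
    if len(tail) < 1+len(fileSignature) {
        return 0, errInvalidFileSignature
    }
    if string(tail[len(tail)-len(fileSignature):]) != fileSignature {
        return 0, errInvalidFileSignature
    }
    version = tail[len(tail)-len(fileSignature)-1]
    if version == 0 || version > formatVersionMax {
        return 0, errInvalidFormatVersion
    }
    return version, nil
}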
@@ -104,6 +114,8 @@ Index:
+--------------------------------+---------------------------+
- Dictionary: ID for a ByteSpan to be used as zstd dictionary. 0 refers to the empty ByteSpan, which indicates no dictionary.
- Chunk: ID for the ByteSpan containing the Chunk data. Never 0.
- ChunkRefs with a Dictionary ID of 0 are zStd compressed Chunks. The Chunk data is stored in the second ByteSpan. (version 1)
- ChunkRefs with a Dictionary ID of 0 are Snappy compressed Chunks. The Chunk data is stored in the second ByteSpan. (version 2)

Suffixes:
+--------------------+--------------------+-----+----------------------+
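The Prefix Map and Suffixes sections support the address lookup that the retrieval hunk below calls phase 1. A hedged sketch of that lookup (not part of this diff): binary search the sorted uint64 prefixes, then confirm the remaining 12 bytes of the address. The in-memory layout here is simplified and hypothetical, not the reader's actual representation.

// Hedged sketch of the phase-1 lookup; layout and names are hypothetical.
package sketch

import "sort"

type chunkIndex struct {
    prefixes []uint64   // sorted ascending, one per chunk (first 8 bytes of the address)
    suffixes [][12]byte // parallel to prefixes (remaining 12 bytes of the address)
}

// find returns the ordinal of the chunk with the given address parts, or -1.
func (idx chunkIndex) find(prefix uint64, suffix [12]byte) int {
    i := sort.Search(len(idx.prefixes), func(i int) bool { return idx.prefixes[i] >= prefix })
    for ; i < len(idx.prefixes) && idx.prefixes[i] == prefix; i++ {
        if idx.suffixes[i] == suffix {
            return i
        }
    }
    return -1
}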
@@ -144,14 +156,14 @@ Chunk Retrieval (phase 1 is similar to NBS):
- Take the Chunk Id discovered in Phase one, and use it to grab that index from the ChunkRefs Map.
- Retrieve the ByteSpan Id for the Chunk data. Verify integrity with CRC.
- If Dictionary is 0:
  - Decompress the Chunk data using zstd (no dictionary)
  - Decompress the Chunk data using zstd (no dictionary, version 1).
  - Decompress the Chunk data using snappy (no dictionary, version 2).
- Otherwise:
  - Retrieve the ByteSpan ID for the Dictionary data.
  - Decompress the Chunk data using zstd with the Dictionary data.
*/

const (
    archiveFormatVersion = uint8(1)
    archiveFileSignature = "DOLTARC"
    archiveFileSigSize   = uint64(len(archiveFileSignature))
    archiveCheckSumSize  = sha512.Size * 3 // sha512 3 times.
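The version-dependent branch in the retrieval steps above can be written out directly. The following sketch is not the dolt implementation: it assumes the Noms-style Snappy record layout (compressed payload followed by a 4-byte big-endian CRC32) and the Castagnoli polynomial, both of which should be treated as assumptions here.

// Illustrative retrieval dispatch; record layout and CRC details are assumptions.
package sketch

import (
    "encoding/binary"
    "errors"
    "hash/crc32"

    "github.com/dolthub/gozstd"
    "github.com/golang/snappy"
)

var crcTable = crc32.MakeTable(crc32.Castagnoli)

// decompressChunk turns the raw bytes of a chunk ByteSpan back into chunk data.
// dict is nil when the ChunkRef's dictionary ID was 0.
func decompressChunk(raw []byte, dict *gozstd.DDict, formatVersion uint8) ([]byte, error) {
    if dict != nil {
        // Both versions: zstd with the shared dictionary.
        return gozstd.DecompressDict(nil, raw, dict)
    }
    if formatVersion >= 2 {
        // Version 2: Snappy record with a trailing CRC32, as in Noms table files.
        if len(raw) < 4 {
            return nil, errors.New("short snappy record")
        }
        payload, sum := raw[:len(raw)-4], binary.BigEndian.Uint32(raw[len(raw)-4:])
        if crc32.Checksum(payload, crcTable) != sum {
            return nil, errors.New("crc mismatch")
        }
        return snappy.Decode(nil, payload)
    }
    // Version 1: zstd without a dictionary.
    return gozstd.Decompress(nil, raw)
}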
@@ -182,6 +194,13 @@ const ( // afr = Archive FooteR
    afrSigOffset = afrVersionOffset + 1
)

// Archive Format Versions.
const (
    archiveVersionInitial       = uint8(1)
    archiveVersionSnappySupport = uint8(2)
    archiveFormatVersionMax     = archiveVersionSnappySupport
)

// Archive Metadata Data Keys are the fields in the archive metadata that are stored in the footer. These are used
// to store information about the archive that is semi-structured. The data is stored in JSON format, all values are strings.
const ( //amdk = Archive Metadata Data Key

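Per the comment above, the metadata block is a flat JSON object whose values are all strings, keyed by the amdk constants (which this hunk truncates before listing). A minimal sketch of producing such a block; the key name used below is a hypothetical placeholder, not one of the real constants.

// Sketch only; "originDatabase" is a placeholder key, not a real amdk constant.
package sketch

import "encoding/json"

// buildMetadata marshals the semi-structured archive metadata: every value is a string.
func buildMetadata(kv map[string]string) ([]byte, error) {
    return json.Marshal(kv)
}

// buildMetadata(map[string]string{"originDatabase": "mydb"}) yields {"originDatabase":"mydb"}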
@@ -359,7 +359,7 @@ func writeDataToArchive(
    if err != nil {
        return 0, 0, 0, err
    }
    err = arcW.stageChunk(cs.chunkId, dictId, dataId)
    err = arcW.stageZStdChunk(cs.chunkId, dictId, dataId)
    if err != nil {
        return 0, 0, 0, err
    }
@@ -376,7 +376,7 @@ func writeDataToArchive(
    ungroupedChunkCount := int32(len(allChunks))
    ungroupedChunkProgress := int32(0)

    // Any chunks remaining will be written out individually.
    // Any chunks remaining will be written out individually, using the default dictionary.
    for h := range allChunks {
        select {
        case <-ctx.Done():
@@ -396,7 +396,7 @@ func writeDataToArchive(
    if err != nil {
        return 0, 0, 0, err
    }
    err = arcW.stageChunk(h, dictId, id)
    err = arcW.stageZStdChunk(h, dictId, id)
    if err != nil {
        return 0, 0, 0, err
    }

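Pulled out of the diff context, the staging sequence these hunks touch looks like the following. It is a sketch, not part of this commit: it would live in the nbs package, the call signatures are taken from the tests in archive_test.go further down, and compressedChunk is assumed to already be zstd-compressed against the dictionary bytes in dict.

// sketchWriteOneChunk stages a single zstd chunk the way the tests below do.
func sketchWriteOneChunk(h hash.Hash, dict, compressedChunk []byte) error {
    writer := NewFixedBufferByteSink(make([]byte, 4096))
    aw := newArchiveWriterWithSink(writer)

    dictId, err := aw.writeByteSpan(dict) // ByteSpan 1: the shared dictionary
    if err != nil {
        return err
    }
    dataId, err := aw.writeByteSpan(compressedChunk) // ByteSpan 2: the chunk payload
    if err != nil {
        return err
    }
    if err = aw.stageZStdChunk(h, dictId, dataId); err != nil {
        return err
    }
    if err = aw.finalizeByteSpans(); err != nil { // data block done
        return err
    }
    if err = aw.writeIndex(); err != nil { // SpanIndex, Prefix Map, ChunkRefs, Suffixes
        return err
    }
    if err = aw.writeMetadata(nil); err != nil {
        return err
    }
    return aw.writeFooter() // lengths, checksums, format version, "DOLTARC"
}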
@@ -110,7 +110,7 @@ func newArchiveMetadata(ctx context.Context, reader tableReaderAt, fileSize uint
    return nil, err
}

if footer.formatVersion != archiveFormatVersion {
if footer.formatVersion > archiveFormatVersionMax {
    return nil, ErrInvalidFormatVersion
}

@@ -258,8 +258,8 @@ func buildFooter(fileSize uint64, buf []byte) (f archiveFooter, err error) {
    err = ErrInvalidFileSignature
    return
}
// Verify Format Version. Currently only one version is supported, but we'll need to be more flexible in the future.
if f.formatVersion != archiveFormatVersion {
// Verify Format Version. 1 and 2 supported.
if f.formatVersion > archiveFormatVersionMax {
    err = ErrInvalidFormatVersion
    return
}
@@ -309,7 +309,20 @@ func (ar archiveReader) get(ctx context.Context, hash hash.Hash, stats *Stats) (
if err != nil || data == nil {
    return nil, err
}

if dict == nil {
    if ar.footer.formatVersion >= archiveVersionSnappySupport {
        // Snappy compression format. The data is compressed with a checksum at the end.
        cc, err := NewCompressedChunk(hash, data)
        if err != nil {
            return nil, err
        }
        chk, err := cc.ToChunk()
        if err != nil {
            return nil, err
        }
        return chk.Data(), nil
    }
    return nil, errors.New("runtime error: unable to get archived chunk. dictionary is nil")
}

@@ -329,6 +342,18 @@ func (ar archiveReader) getAsToChunker(ctx context.Context, h hash.Hash, stats *
    return nil, err
}

if dict == nil {
    if ar.footer.formatVersion >= archiveVersionSnappySupport {
        // Snappy compression format. The data is compressed with a checksum at the end.
        cc, err := NewCompressedChunk(h, data)
        if err != nil {
            return nil, err
        }
        return cc, nil
    }
    return nil, errors.New("runtime error: unable to get archived chunk. dictionary is nil")
}

if data == nil {
    return ArchiveToChunker{h, nil, []byte{}}, nil
}
@@ -354,10 +379,14 @@ func (ar archiveReader) readByteSpan(ctx context.Context, bs byteSpan, stats *St
    return buff, nil
}

// getRaw returns the raw data for the given hash. If the hash is not found, nil is returned for both slices. Also,
// no error is returned in this case. Errors will only be returned if there is an io error.
// getRaw returns the raw data for the given hash. If the hash is not found, nil is returned for both output, and no error.
//
// The data returned is still compressed, and the DDict is required to decompress it.
// The data is returned still compressed:
// Format Version 1: Only zStd compression is supported. The data returned requires the dictionary to be decompressed.
// Format Version 2: The compression format of the data is:
//   - zStd when a dictionary is returned. The data is decompressed with the dictionary.
//   - Snappy compression when no dictionary is returned. The data has a 32 bit checksum at the end. This
//     format matches the noms format.
func (ar archiveReader) getRaw(ctx context.Context, hash hash.Hash, stats *Stats) (dict *gozstd.DDict, data []byte, err error) {
    idx := ar.search(hash)
    if idx < 0 {

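A sketch of a caller honoring that getRaw contract, reusing the decompressChunk sketch shown earlier; it is not part of this diff and not one of the real call sites (get and getAsToChunker above are), but it makes the nil/nil "not found" convention and the version dispatch explicit.

// Sketch only; not an actual call site.
func readChunkData(ctx context.Context, ar archiveReader, h hash.Hash, stats *Stats) ([]byte, error) {
    dict, data, err := ar.getRaw(ctx, h, stats)
    if err != nil || data == nil {
        return nil, err // absent chunk: nil data and nil error, per the comment above
    }
    // decompressChunk is the earlier version-dispatch sketch: zstd+dict when a
    // dictionary comes back, otherwise Snappy (version 2) or bare zstd (version 1).
    return decompressChunk(data, dict, ar.footer.formatVersion)
}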
@@ -24,27 +24,42 @@ import (
    "math/rand"
    "testing"

    "github.com/dolthub/dolt/go/store/chunks"
    "github.com/dolthub/dolt/go/store/hash"
    "github.com/dolthub/gozstd"
    "github.com/stretchr/testify/assert"
    "github.com/stretchr/testify/require"
    "golang.org/x/sync/errgroup"

    "github.com/dolthub/dolt/go/store/chunks"
    "github.com/dolthub/dolt/go/store/hash"
)

// There are many tests which don't actually use the dictionary to compress. But some dictionary is required, so
// we'll use this one.
var defaultDict []byte
var defaultCDict *gozstd.CDict

func init() {
    defaultDict, defaultCDict = generateTerribleDefaultDictionary()
}

func TestArchiveSingleChunk(t *testing.T) {
    writer := NewFixedBufferByteSink(make([]byte, 1024))
    writer := NewFixedBufferByteSink(make([]byte, 4096))
    aw := newArchiveWriterWithSink(writer)

    dId, err := aw.writeByteSpan(defaultDict)
    assert.NoError(t, err)
    assert.Equal(t, uint32(1), dId)

    testBlob := []byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}
    bsId, err := aw.writeByteSpan(testBlob)
    assert.NoError(t, err)
    assert.Equal(t, uint32(1), bsId)
    assert.Equal(t, uint64(10), aw.bytesWritten) // 10 data bytes. No CRC or anything.
    assert.Equal(t, uint32(2), bsId)

    dataSz := uint64(len(defaultDict)) + uint64(len(testBlob))
    assert.Equal(t, dataSz, aw.bytesWritten)

    oneHash := hashWithPrefix(t, 23)

    err = aw.stageChunk(oneHash, 0, 1)
    err = aw.stageZStdChunk(oneHash, dId, bsId)
    assert.NoError(t, err)

    err = aw.finalizeByteSpans()
@@ -52,9 +67,10 @@ func TestArchiveSingleChunk(t *testing.T) {

    err = aw.writeIndex()
    assert.NoError(t, err)
    // Index size is not deterministic from the number of chunks, but when no dictionaries are in play, 36 bytes is correct
    // because: 8 (uint64,prefix) + 8 (uint64,offset) + 4 (uint32,dict) + 4 (uint32,data) + 12 (hash.Suffix) = 36
    assert.Equal(t, uint32(36), aw.indexLen)
    // Index size is not deterministic from the number of chunks, but when 1 dictionary and one chunk are in play, 44 bytes is correct:
    // [SpanIndex - two ByteSpans] [Prefix Map] [chunk ref ] [hash.Suffix --]
    //       16 (2 uint64s)      + 8 (1 uint64) + 8 (2 uint32s) + 12 = ___44___
    assert.Equal(t, uint32(44), aw.indexLen)

    err = aw.writeMetadata([]byte(""))
    assert.NoError(t, err)
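The per-entry sizes in that comment generalize directly. The helper below is hypothetical, shown only to make the arithmetic explicit; it is not part of the archive writer or this commit.

// Hypothetical helper that restates the index-size arithmetic from the comment above.
func expectedIndexLen(byteSpans, chunks int) uint32 {
    const (
        spanEntry   = 8  // uint64 per ByteSpan in the SpanIndex
        prefixEntry = 8  // uint64 hash prefix per chunk in the Prefix Map
        chunkRef    = 8  // two uint32s per chunk: dictionary ID + data ID
        suffix      = 12 // remaining 12 bytes of the 20-byte address per chunk
    )
    return uint32(byteSpans*spanEntry + chunks*(prefixEntry+chunkRef+suffix))
}

// expectedIndexLen(2, 1) == 44, matching the new assertion above;
// expectedIndexLen(1, 1) == 36, matching the pre-change assertion.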
@@ -62,7 +78,7 @@ func TestArchiveSingleChunk(t *testing.T) {
    err = aw.writeFooter()
    assert.NoError(t, err)

    assert.Equal(t, 10+36+archiveFooterSize, aw.bytesWritten) // 10 data bytes, 36 index bytes + footer
    assert.Equal(t, dataSz+44+archiveFooterSize, aw.bytesWritten)

    theBytes := writer.buff[:writer.pos]
    fileSize := uint64(len(theBytes))
@@ -77,20 +93,19 @@ func TestArchiveSingleChunk(t *testing.T) {

    dict, data, err := aIdx.getRaw(context.Background(), oneHash, &Stats{})
    assert.NoError(t, err)
    assert.Nil(t, dict)
    assert.NotNil(t, dict)
    assert.Equal(t, testBlob, data)
}

func TestArchiveSingleChunkWithDictionary(t *testing.T) {
    writer := NewFixedBufferByteSink(make([]byte, 4096))
    aw := newArchiveWriterWithSink(writer)
    testDict, _ := generateTerribleDefaultDictionary()
    testData := []byte{9, 8, 7, 6, 5, 4, 3, 2, 1, 0}
    _, _ = aw.writeByteSpan(testDict)
    _, _ = aw.writeByteSpan(defaultDict)
    _, _ = aw.writeByteSpan(testData)

    h := hashWithPrefix(t, 42)
    err := aw.stageChunk(h, 1, 2)
    err := aw.stageZStdChunk(h, 1, 2)
    assert.NoError(t, err)

    _ = aw.finalizeByteSpans()
@@ -119,7 +134,6 @@ func TestArchiverMultipleChunksMultipleDictionaries(t *testing.T) {
    writer := NewFixedBufferByteSink(make([]byte, 4096))
    aw := newArchiveWriterWithSink(writer)

    defDict, _ := generateTerribleDefaultDictionary() // span 1
    data1 := []byte{11, 11, 11, 11, 11, 11, 11, 11, 11, 11} // span 2
    dict1, _ := generateDictionary(1) // span 3
    data2 := []byte{22, 22, 22, 22, 22, 22, 22, 22, 22, 22} // span 4
@@ -127,41 +141,41 @@ func TestArchiverMultipleChunksMultipleDictionaries(t *testing.T) {
    data4 := []byte{44, 44, 44, 44, 44, 44, 44, 44, 44, 44} // span 6
    dict2, _ := generateDictionary(2) // span 7

    id, _ := aw.writeByteSpan(defDict)
    id, _ := aw.writeByteSpan(defaultDict)
    assert.Equal(t, uint32(1), id)

    h1 := hashWithPrefix(t, 42)
    id, _ = aw.writeByteSpan(data1)
    assert.Equal(t, uint32(2), id)
    _ = aw.stageChunk(h1, 1, 2)
    _ = aw.stageZStdChunk(h1, 1, 2)

    h2 := hashWithPrefix(t, 42)
    id, _ = aw.writeByteSpan(dict1)
    assert.Equal(t, uint32(3), id)
    id, _ = aw.writeByteSpan(data2)
    assert.Equal(t, uint32(4), id)
    _ = aw.stageChunk(h2, 3, 4)
    _ = aw.stageZStdChunk(h2, 3, 4)

    h3 := hashWithPrefix(t, 42)
    id, _ = aw.writeByteSpan(data3)
    assert.Equal(t, uint32(5), id)
    _ = aw.stageChunk(h3, 3, 5)
    _ = aw.stageZStdChunk(h3, 3, 5)

    h4 := hashWithPrefix(t, 81)
    id, _ = aw.writeByteSpan(data4)
    assert.Equal(t, uint32(6), id)
    _ = aw.stageChunk(h4, 0, 6)
    _ = aw.stageZStdChunk(h4, 1, 6)

    h5 := hashWithPrefix(t, 21)
    id, _ = aw.writeByteSpan(dict2)
    assert.Equal(t, uint32(7), id)
    _ = aw.stageChunk(h5, 7, 2)
    _ = aw.stageZStdChunk(h5, 7, 2)

    h6 := hashWithPrefix(t, 88)
    _ = aw.stageChunk(h6, 7, 2)
    _ = aw.stageZStdChunk(h6, 7, 2)

    h7 := hashWithPrefix(t, 42)
    _ = aw.stageChunk(h7, 3, 5)
    _ = aw.stageZStdChunk(h7, 3, 5)

    _ = aw.finalizeByteSpans()
    _ = aw.writeIndex()
@@ -203,7 +217,7 @@ func TestArchiverMultipleChunksMultipleDictionaries(t *testing.T) {
    assert.Equal(t, data3, data)

    dict, data, _ = aIdx.getRaw(c, h4, s)
    assert.Nil(t, dict)
    assert.NotNil(t, dict)
    assert.Equal(t, data, data)

    dict, data, _ = aIdx.getRaw(c, h5, s)
@@ -243,7 +257,7 @@ func TestArchiveDictDecompression(t *testing.T) {
        chId, err := aw.writeByteSpan(cmp)
        assert.NoError(t, err)

        err = aw.stageChunk(chk.Hash(), dictId, chId)
        err = aw.stageZStdChunk(chk.Hash(), dictId, chId)
        assert.NoError(t, err)
    }
    err = aw.finalizeByteSpans()
@@ -304,14 +318,13 @@ func TestArchiveChunkCorruption(t *testing.T) {
    writer := NewFixedBufferByteSink(make([]byte, 4096))
    aw := newArchiveWriterWithSink(writer)

    defDict, _ := generateTerribleDefaultDictionary()
    _, _ = aw.writeByteSpan(defDict)
    _, _ = aw.writeByteSpan(defaultDict)

    testBlob := []byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}
    _, _ = aw.writeByteSpan(testBlob)

    h := hashWithPrefix(t, 23)
    _ = aw.stageChunk(h, 1, 2)
    _ = aw.stageZStdChunk(h, 1, 2)
    _ = aw.finalizeByteSpans()
    _ = aw.writeIndex()
    _ = aw.writeMetadata(nil)
@@ -325,7 +338,7 @@ func TestArchiveChunkCorruption(t *testing.T) {
    assert.NoError(t, err)

    // Corrupt the data
    writer.buff[len(defDict)+3] = writer.buff[len(defDict)+3] + 1
    writer.buff[len(defaultDict)+3] = writer.buff[len(defaultDict)+3] + 1

    data, err := idx.get(context.Background(), h, &Stats{})
    assert.ErrorContains(t, err, "cannot decompress invalid src")
@@ -341,7 +354,7 @@ func TestArchiveCheckSumValidations(t *testing.T) {
    _, _ = aw.writeByteSpan(testBlob)

    h := hashWithPrefix(t, 23)
    _ = aw.stageChunk(h, 0, 1)
    _ = aw.stageZStdChunk(h, 0, 1)
    err := aw.finalizeByteSpans()
    assert.NoError(t, err)
    err = aw.writeIndex()
@@ -437,29 +450,51 @@ func TestProllyBinSearch(t *testing.T) {

}

func TestDuplicateInsertion(t *testing.T) {
func TestDictionaryRangeError(t *testing.T) {
    writer := NewFixedBufferByteSink(make([]byte, 1024))
    aw := newArchiveWriterWithSink(writer)
    testBlob := []byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}
    _, _ = aw.writeByteSpan(testBlob)
    h := hashWithPrefix(t, 23)
    err := aw.stageZStdChunk(h, 0, 1)
    assert.Equal(t, ErrInvalidDictionaryRange, err)
}

func TestDuplicateInsertion(t *testing.T) {
    writer := NewFixedBufferByteSink(make([]byte, 4096))
    aw := newArchiveWriterWithSink(writer)

    _, _ = aw.writeByteSpan(defaultDict)

    testBlob := []byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}
    _, _ = aw.writeByteSpan(testBlob)

    h := hashWithPrefix(t, 23)
    _ = aw.stageChunk(h, 0, 1)
    err := aw.stageChunk(h, 0, 1)
    _ = aw.stageZStdChunk(h, 1, 2)
    err := aw.stageZStdChunk(h, 1, 2)
    assert.Equal(t, ErrDuplicateChunkWritten, err)
}

func TestInsertRanges(t *testing.T) {
    writer := NewFixedBufferByteSink(make([]byte, 1024))
    aw := newArchiveWriterWithSink(writer)

    _, _ = aw.writeByteSpan(defaultDict)

    testBlob := []byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}
    _, _ = aw.writeByteSpan(testBlob)

    h := hashWithPrefix(t, 23)
    err := aw.stageChunk(h, 0, 2)
    err := aw.stageZStdChunk(h, 1, 3)
    assert.Equal(t, ErrInvalidChunkRange, err)

    err = aw.stageChunk(h, 2, 1)
    err = aw.stageZStdChunk(h, 0, 1)
    assert.Equal(t, ErrInvalidDictionaryRange, err)

    err = aw.stageZStdChunk(h, 1, 0)
    assert.Equal(t, ErrInvalidChunkRange, err)

    err = aw.stageZStdChunk(h, 4, 1)
    assert.Equal(t, ErrInvalidDictionaryRange, err)
}

@@ -482,7 +517,7 @@ func TestFooterVersionAndSignature(t *testing.T) {
    rdr, err := newArchiveReader(context.Background(), tra, fileSize, &Stats{})
    assert.NoError(t, err)

    assert.Equal(t, archiveFormatVersion, rdr.footer.formatVersion)
    assert.Equal(t, archiveFormatVersionMax, rdr.footer.formatVersion)
    assert.Equal(t, archiveFileSignature, rdr.footer.fileSignature)

    // Corrupt the version
@@ -493,13 +528,12 @@ func TestFooterVersionAndSignature(t *testing.T) {
    assert.ErrorContains(t, err, "invalid format version")

    // Corrupt the signature, but first restore the version.
    theBytes[fileSize-archiveFooterSize+afrVersionOffset] = archiveFormatVersion
    theBytes[fileSize-archiveFooterSize+afrVersionOffset] = archiveFormatVersionMax
    theBytes[fileSize-archiveFooterSize+afrSigOffset+2] = 'X'
    readerAt = bytes.NewReader(theBytes)
    tra = tableReaderAtAdapter{readerAt}
    _, err = newArchiveReader(context.Background(), tra, fileSize, &Stats{})
    assert.ErrorContains(t, err, "invalid file signature")

}

func TestChunkRelations(t *testing.T) {
@@ -564,10 +598,9 @@ func TestArchiveChunkGroup(t *testing.T) {
    //
    // There is also some non-determinism in the compression library itself, so the compression ratios are compared against
    // ranges we've seen over several runs of the tests.
    _, defDict := generateTerribleDefaultDictionary()
    var stats Stats
    _, cache, hs := generateSimilarChunks(42, 10)
    cg, err := newChunkGroup(context.TODO(), cache, hs, defDict, &stats)
    cg, err := newChunkGroup(context.TODO(), cache, hs, defaultCDict, &stats)
    require.NoError(t, err)
    assertFloatBetween(t, cg.totalRatioWDict, 0.86, 0.87)
    assertIntBetween(t, cg.totalBytesSavedWDict, 8690, 8720)
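The generate* helpers referenced here are not shown in this diff. For orientation, this is roughly how a *gozstd.CDict such as defaultCDict can be produced from sample data with gozstd's dictionary APIs; the samples below are placeholders and this is not the helpers' actual implementation.

// Generic gozstd dictionary-training sketch; placeholder samples, not the test data.
package sketch

import (
    "fmt"

    "github.com/dolthub/gozstd"
)

func buildSampleDict() ([]byte, *gozstd.CDict, error) {
    samples := make([][]byte, 0, 64)
    for i := 0; i < 64; i++ {
        samples = append(samples, []byte(fmt.Sprintf("row-%04d-with-a-shared-shape", i)))
    }
    dict := gozstd.BuildDict(samples, 4096) // train a small dictionary from the samples
    cd, err := gozstd.NewCDict(dict)
    if err != nil {
        return nil, nil, err
    }
    return dict, cd, nil
}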
@@ -580,7 +613,7 @@ func TestArchiveChunkGroup(t *testing.T) {
    // Adding an unsimilar chunk should change the ratio significantly downward. Doing this mainly to ensure the next
    // chunk tests positive because of its high similarity.
    addChunkToCache(cache, unsimilar)
    err = cg.addChunk(context.TODO(), cache, unsimilar, defDict, &stats)
    err = cg.addChunk(context.TODO(), cache, unsimilar, defaultCDict, &stats)
    assert.NoError(t, err)
    assertFloatBetween(t, cg.totalRatioWDict, 0.78, 0.81)
    assertIntBetween(t, cg.totalBytesSavedWDict, 8650, 8700)
@@ -592,7 +625,7 @@ func TestArchiveChunkGroup(t *testing.T) {
    assert.True(t, v)

    addChunkToCache(cache, similar)
    err = cg.addChunk(context.TODO(), cache, similar, defDict, &stats)
    err = cg.addChunk(context.TODO(), cache, similar, defaultCDict, &stats)
    assert.NoError(t, err)
    assertFloatBetween(t, cg.totalRatioWDict, 0.80, 0.81)
    assertIntBetween(t, cg.totalBytesSavedWDict, 9650, 9700)
@@ -611,7 +644,7 @@ func assertIntBetween(t *testing.T, actual, min, max int) {
    }
}

// Helper functions to create test data below...dd.
// Helper functions to create test data below...
func hashWithPrefix(t *testing.T, prefix uint64) hash.Hash {
    randomBytes := make([]byte, 20)
    n, err := rand.Read(randomBytes)

@@ -65,7 +65,7 @@ type archiveWriter struct
There is a workflow to writing an archive:
 1. writeByteSpan: Write a group of bytes to the archive. This will immediately write the bytes to the output, and
    return an ID for the byte span. Caller must keep track of this ID.
 2. stageChunk: Given a hash, dictionary (as byteSpan ID), and data (as byteSpan ID), stage a chunk for writing. This
 2. stageZStdChunk: Given a hash, dictionary (as byteSpan ID), and data (as byteSpan ID), stage a chunk for writing. This
    does not write anything to disk yet.
 3. Repeat steps 1 and 2 as necessary. You can interleave them, but all chunks must be staged before the next step.
 4. finalizeByteSpans: At this point, all byte spans have been written out, and the checksum for the data block
@@ -128,9 +128,9 @@ func (aw *archiveWriter) chunkSeen(h hash.Hash) bool {
    return aw.seenChunks.Has(h)
}

func (aw *archiveWriter) stageChunk(hash hash.Hash, dictionary, data uint32) error {
func (aw *archiveWriter) stageZStdChunk(hash hash.Hash, dictionary, data uint32) error {
    if aw.workflowStage != stageByteSpan {
        return fmt.Errorf("Runtime error: stageChunk called out of order")
        return fmt.Errorf("Runtime error: stageZStdChunk called out of order")
    }

    if data == 0 || data > uint32(len(aw.stagedBytes)) {
@@ -139,7 +139,7 @@ func (aw *archiveWriter) stageChunk(hash hash.Hash, dictionary, data uint32) err
    if aw.seenChunks.Has(hash) {
        return ErrDuplicateChunkWritten
    }
    if dictionary > uint32(len(aw.stagedBytes)) {
    if dictionary == 0 || dictionary > uint32(len(aw.stagedBytes)) {
        return ErrInvalidDictionaryRange
    }

@@ -148,6 +148,23 @@ func (aw *archiveWriter) stageChunk(hash hash.Hash, dictionary, data uint32) err
    return nil
}

func (aw *archiveWriter) stageSnappyChunk(hash hash.Hash, data uint32) error {
    if aw.workflowStage != stageByteSpan {
        return fmt.Errorf("Runtime error: stageSnappyChunk called out of order")
    }

    if data == 0 || data > uint32(len(aw.stagedBytes)) {
        return ErrInvalidChunkRange
    }
    if aw.seenChunks.Has(hash) {
        return ErrDuplicateChunkWritten
    }

    aw.seenChunks.Insert(hash)
    aw.stagedChunks = append(aw.stagedChunks, stagedChunkRef{hash, 0, data})
    return nil
}

func (scrs stagedChunkRefSlice) Len() int {
    return len(scrs)
}
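To contrast the two staging paths introduced above, here is a short sketch that would live in the nbs package; it is not part of this commit and not new API, just a restatement of the validation rules shown. It assumes spanDict and spanData are ByteSpan IDs returned by earlier writeByteSpan calls.

// stageBothKinds contrasts the zstd and Snappy staging paths; sketch only.
func stageBothKinds(aw *archiveWriter, zstdHash, snappyHash hash.Hash, spanDict, spanData uint32) error {
    // zstd chunks must now reference a real dictionary span; a dictionary ID of 0 is
    // rejected with ErrInvalidDictionaryRange.
    if err := aw.stageZStdChunk(zstdHash, spanDict, spanData); err != nil {
        return err
    }
    // Snappy chunks carry no dictionary; the writer records dictionary ID 0 for them,
    // which is how version 2 readers recognize the Snappy format.
    return aw.stageSnappyChunk(snappyHash, spanData)
}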
@@ -310,7 +327,7 @@ func (aw *archiveWriter) writeFooter() error {
    }

    // Write out the format version
    _, err = aw.output.Write([]byte{archiveFormatVersion})
    _, err = aw.output.Write([]byte{archiveFormatVersionMax})
    if err != nil {
        return err
    }
