snappy->zStd automatically with a default dictionary

This commit is contained in:
Neil Macneale IV
2025-03-13 18:05:26 -07:00
parent 60fdbf8b1b
commit e674839b90
8 changed files with 201 additions and 46 deletions

View File

@@ -206,7 +206,7 @@ func (w *PullTableFileWriter) addChunkThread() (err error) {
defer func() {
if curWr != nil {
// Cleanup dangling writer, whose contents will never be used.
_, _ = curWr.Finish()
_, _, _ = curWr.Finish()
rd, _ := curWr.Reader()
if rd != nil {
rd.Close()
@@ -292,11 +292,13 @@ func (w *PullTableFileWriter) uploadThread(ctx context.Context, reqCh chan nbs.G
if !ok {
return nil
}
// content length before we finish the write, which will
// add the index and table file footer.
chunksLen := wr.ContentLength()
id, err := wr.Finish()
_, id, err := wr.Finish()
if err != nil {
return err
}
chunkData, err := wr.ChunkDataLength()
if err != nil {
return err
}
@@ -305,8 +307,8 @@ func (w *PullTableFileWriter) uploadThread(ctx context.Context, reqCh chan nbs.G
id: id,
read: wr,
numChunks: wr.ChunkCount(),
chunksLen: chunksLen,
contentLen: wr.ContentLength(),
chunksLen: chunkData,
contentLen: wr.FullLength(),
contentHash: wr.GetMD5(),
}
err = w.uploadTempTableFile(ctx, ttf)

View File

@@ -80,7 +80,7 @@ func UnArchive(ctx context.Context, cs chunks.ChunkStore, smd StorageMetadata, p
return err
}
id, err := classicTable.Finish()
_, id, err := classicTable.Finish()
if err != nil {
return err
}

View File

@@ -21,11 +21,14 @@ import (
"github.com/dolthub/dolt/go/store/hash"
)
// DecompBundle is a bundle of a dictionary and its raw bytes. This is necesary because we sometimes need to copy
// DecompBundle is a bundle of a dictionary and its raw bytes. This is necessary because we sometimes need to copy
// the raw dictionary from one archive to another. The C interface around zStd objects doesn't give us a way to
// get the raw dictionary bytes, so we'll use this struct as the primary interface to pass around dictionaries.
//
// We also track the compression dictionary (CDict) because we sometimes need it too.
type DecompBundle struct {
dictionary *gozstd.DDict
dDict *gozstd.DDict
cDict *gozstd.CDict
rawDictionary *[]byte
}
@@ -37,18 +40,23 @@ func NewDecompBundle(compressedDict []byte) (*DecompBundle, error) {
if err != nil {
return nil, err
}
cDict, err := gozstd.NewCDict(rawDict)
if err != nil {
return nil, err
}
dict, err := gozstd.NewDDict(rawDict)
if err != nil {
return nil, err
}
return &DecompBundle{dictionary: dict, rawDictionary: &rawDict}, nil
return &DecompBundle{dDict: dict, rawDictionary: &rawDict, cDict: cDict}, nil
}
type ArchiveToChunker struct {
h hash.Hash
dict *DecompBundle
h hash.Hash
dict *DecompBundle
// The chunk data in it's compressed form, using the dict
chunkData []byte
}
@@ -66,7 +74,7 @@ func (a ArchiveToChunker) Hash() hash.Hash {
}
func (a ArchiveToChunker) ToChunk() (chunks.Chunk, error) {
dict := a.dict.dictionary
dict := a.dict.dDict
data := a.chunkData
rawChunk, err := gozstd.DecompressDict(nil, data, dict)
if err != nil {

View File

@@ -362,7 +362,7 @@ func (ar archiveReader) get(ctx context.Context, hash hash.Hash, stats *Stats) (
}
var result []byte
result, err = gozstd.DecompressDict(nil, data, dict.dictionary)
result, err = gozstd.DecompressDict(nil, data, dict.dDict)
if err != nil {
return nil, err
}

View File

@@ -18,12 +18,14 @@ import (
"bytes"
"crypto/sha512"
"encoding/binary"
"errors"
"fmt"
"io"
"os"
"path/filepath"
"sort"
"github.com/dolthub/dolt/go/store/chunks"
"github.com/dolthub/gozstd"
"github.com/dolthub/dolt/go/store/hash"
@@ -67,6 +69,7 @@ type archiveWriter struct {
fullMD5 md5Sum
workflowStage stage
finalPath string
chunkDataLength uint64
}
/*
@@ -210,6 +213,7 @@ func (aw *archiveWriter) finalizeByteSpans() error {
// Get the checksum for the data written so far
aw.dataCheckSum = sha512Sum(aw.output.GetSum())
aw.output.ResetHasher()
aw.chunkDataLength = aw.md5Summer.Size()
aw.workflowStage = stageIndex
return nil
@@ -473,10 +477,23 @@ func (aw *archiveWriter) genFileName(path string) (string, error) {
return fullPath, nil
}
func (aw *archiveWriter) getChunkDataLength() (uint64, error) {
if aw.workflowStage == stageByteSpan {
return 0, errors.New("runtime error: chunkData not valid until finalized")
}
return aw.chunkDataLength, nil
}
type ArchiveStreamWriter struct {
writer *archiveWriter
dictMap map[*DecompBundle]uint32
chunkCount int32
// snappyQueue is a queue of CompressedChunk that have been written, but not flushed to the archive.
// These are kept in memory until we have enough to create a compression dictionary for them (and subsequent
// snappy chunks). When this value is nil, the snappyDict must be set (they are exclusive)
snappyQueue *[]CompressedChunk
snappyDict *DecompBundle
}
func NewArchiveStreamWriter(tmpDir string) (*ArchiveStreamWriter, error) {
@@ -484,10 +501,15 @@ func NewArchiveStreamWriter(tmpDir string) (*ArchiveStreamWriter, error) {
if err != nil {
return nil, err
}
sq := make([]CompressedChunk, 0, 1000)
return &ArchiveStreamWriter{
writer,
map[*DecompBundle]uint32{},
0,
writer: writer,
dictMap: map[*DecompBundle]uint32{},
chunkCount: 0,
snappyQueue: &sq,
snappyDict: nil,
}, nil
}
@@ -497,25 +519,48 @@ func (asw *ArchiveStreamWriter) Reader() (io.ReadCloser, error) {
return asw.writer.output.Reader()
}
func (asw *ArchiveStreamWriter) Finish() (string, error) {
func (asw *ArchiveStreamWriter) Finish() (uint32, string, error) {
bytesWritten := uint32(0)
if asw.snappyQueue != nil {
// There may be snappy chunks queued up because we didn't get enough to build a dictionary.
for _, cc := range *asw.snappyQueue {
dataId, err := asw.writer.writeByteSpan(cc.FullCompressedChunk)
if err != nil {
return bytesWritten, "", err
}
bytesWritten += uint32(len(cc.FullCompressedChunk))
asw.chunkCount += 1
err = asw.writer.stageSnappyChunk(cc.Hash(), dataId)
if err != nil {
return bytesWritten, "", err
}
}
}
// This will perform all the steps to construct an archive file - starting with the finalization of byte spans.
// All writeByteSpan calls and stage* calls must be completed before this.
err := indexFinalize(asw.writer, hash.Hash{})
if err != nil {
return "", err
return 0, "", err
}
h, err := asw.writer.getName()
if err != nil {
return "", err
return 0, "", err
}
return h.String() + ArchiveFileSuffix, nil
return 0, h.String() + ArchiveFileSuffix, nil
}
func (asw *ArchiveStreamWriter) ChunkCount() int {
return int(asw.chunkCount)
}
func (asw *ArchiveStreamWriter) ChunkDataLength() (uint64, error) {
return asw.writer.getChunkDataLength()
}
func (asw *ArchiveStreamWriter) AddChunk(chunker ToChunker) (uint32, error) {
if cc, ok := chunker.(CompressedChunk); ok {
return asw.writeCompressedChunk(cc)
@@ -526,7 +571,7 @@ func (asw *ArchiveStreamWriter) AddChunk(chunker ToChunker) (uint32, error) {
return 0, fmt.Errorf("Unknown chunk type: %T", chunker)
}
func (asw *ArchiveStreamWriter) ContentLength() uint64 {
func (asw *ArchiveStreamWriter) FullLength() uint64 {
return asw.writer.md5Summer.Size()
}
@@ -554,7 +599,7 @@ func (asw *ArchiveStreamWriter) writeArchiveToChunker(chunker ArchiveToChunker)
if err != nil {
return 0, err
}
bytesWritten += uint32(len(*dict.rawDictionary))
bytesWritten += uint32(len(compressedDict))
asw.dictMap[dict] = dictId
}
@@ -567,12 +612,84 @@ func (asw *ArchiveStreamWriter) writeArchiveToChunker(chunker ArchiveToChunker)
return bytesWritten, asw.writer.stageZStdChunk(chunker.Hash(), dictId, dataId)
}
func (asw *ArchiveStreamWriter) writeCompressedChunk(chunker CompressedChunk) (uint32, error) {
writeCount := uint32(len(chunker.FullCompressedChunk))
dataId, err := asw.writer.writeByteSpan(chunker.FullCompressedChunk)
func (asw *ArchiveStreamWriter) writeCompressedChunk(chunker CompressedChunk) (bytesWritten uint32, err error) {
if asw.snappyQueue != nil {
// We have a queue of compressed chunks that we are waiting to flush.
// Add this chunk to the queue.
*asw.snappyQueue = append(*asw.snappyQueue, chunker)
if len(*asw.snappyQueue) < maxSamples {
return 0, nil
}
// We have enough to build a dictionary. Build it, and flush the queue.
samples := make([]*chunks.Chunk, len(*asw.snappyQueue))
for i, cc := range *asw.snappyQueue {
chk, err := cc.ToChunk()
if err != nil {
return 0, err
}
samples[i] = &chk
}
rawDictionary := buildDictionary(samples)
compressedDict := gozstd.Compress(nil, rawDictionary)
asw.snappyDict, err = NewDecompBundle(compressedDict)
if err != nil {
return 0, err
}
// New dictionary. Write it out, and add id to the map.
dictId, err := asw.writer.writeByteSpan(compressedDict)
if err != nil {
return 0, err
}
bytesWritten += uint32(len(compressedDict))
asw.dictMap[asw.snappyDict] = dictId
// Now stage all the
for _, cc := range *asw.snappyQueue {
bw := uint32(0)
bw, err = asw.convertSnappyAndStage(cc)
if err != nil {
return bytesWritten, err
}
bytesWritten += bw
asw.chunkCount += 1
}
asw.snappyQueue = nil
return bytesWritten, err
} else {
// Convert this chunk from snappy to zstd, and write it out.
bw, err := asw.convertSnappyAndStage(chunker)
if err != nil {
return 0, err
}
asw.chunkCount += 1
return bw, nil
}
}
// convertSnappyAndStage converts a snappy compressed chunk to zstd compression and stages it for writing.
// It returns the number of bytes written and an error if any occurred during the process. This method
// assumes that the snappyDict is already created and available in the ArchiveStreamWriter.
func (asw *ArchiveStreamWriter) convertSnappyAndStage(cc CompressedChunk) (uint32, error) {
dictId, ok := asw.dictMap[asw.snappyDict]
if !ok {
return 0, errors.New("runtime error: snappyDict not found in dictMap")
}
h := cc.Hash()
chk, err := cc.ToChunk()
if err != nil {
return 0, err
}
asw.chunkCount += 1
return writeCount, asw.writer.stageSnappyChunk(chunker.Hash(), dataId)
compressedData := gozstd.CompressDict(nil, chk.Data(), asw.snappyDict.cDict)
dataId, err := asw.writer.writeByteSpan(compressedData)
if err != nil {
return 0, err
}
bytesWritten := uint32(len(compressedData))
return bytesWritten, asw.writer.stageZStdChunk(h, dictId, dataId)
}

View File

@@ -24,9 +24,8 @@ import (
"os"
"sort"
"github.com/golang/snappy"
"github.com/dolthub/dolt/go/store/hash"
"github.com/golang/snappy"
)
// GenericTableWriter is an interface for writing table files regardless of the output format
@@ -35,18 +34,25 @@ type GenericTableWriter interface {
Reader() (io.ReadCloser, error)
// Finish completed the writing of the table file and returns the calculated name of the table. Note that Finish
// doesn't move the file, but it returns the name that the file should be moved to.
Finish() (string, error)
// It also returns the additional bytes written to the table file. Those bytes are included in the ContentLength.
Finish() (uint32, string, error)
// ChunkCount returns the number of chunks written to the table file. This can be called before Finish to determine
// if the maximum number of chunks has been reached.
ChunkCount() int
// AddChunk adds a chunk to the table file. The underlying implementation of ToChunker will probably be exploited
// by implementors of GenericTableWriter so that their bytes can be efficiently written to the table file.
//
// The number of bytes written to storage is returned. This could be 0 even on success if the writer decides
// to defer writing the chunks. In the event that AddChunk triggers a flush, the number of bytes written to storage
// will be returned.
//
// If no error occurs, the number of bytes written to the store is returned.
AddChunk(ToChunker) (uint32, error)
// ContentLength returns the number of bytes written to the table file. This can be called before Finish to determine
// if the file is too large.
ContentLength() uint64
// ChunkDataLentgh returns the number of bytes written which are specifically tracked data. It will not include
// data written for the indexes of the storage files. The returned value is only valid after Finish is called.
ChunkDataLength() (uint64, error)
// FullLength returns the number of bytes written to the table file.
FullLength() uint64
// GetMD5 returns the MD5 hash of the table file. This can can only be called after Finish.
GetMD5() []byte
// Remove cleans up and artifacts created by the table writer. Called after everything else is done.
@@ -68,6 +74,7 @@ var ErrDuplicateChunkWritten = errors.New("duplicate chunks written")
// CmpChunkTableWriter writes CompressedChunks to a table file
type CmpChunkTableWriter struct {
sink *HashingByteSink
chunkDataLength uint64
totalUncompressedData uint64
prefixes prefixIndexSlice
blockAddr *hash.Hash
@@ -83,7 +90,14 @@ func NewCmpChunkTableWriter(tempDir string) (*CmpChunkTableWriter, error) {
return nil, err
}
return &CmpChunkTableWriter{NewMD5HashingByteSink(s), 0, nil, nil, s.path}, nil
return &CmpChunkTableWriter{
sink: NewMD5HashingByteSink(s),
chunkDataLength: 0,
totalUncompressedData: 0,
prefixes: nil,
blockAddr: nil,
path: s.path,
}, nil
}
func (tw *CmpChunkTableWriter) ChunkCount() int {
@@ -91,7 +105,7 @@ func (tw *CmpChunkTableWriter) ChunkCount() int {
}
// Gets the size of the entire table file in bytes
func (tw *CmpChunkTableWriter) ContentLength() uint64 {
func (tw *CmpChunkTableWriter) FullLength() uint64 {
return tw.sink.Size()
}
@@ -152,21 +166,25 @@ func (tw *CmpChunkTableWriter) AddChunk(tc ToChunker) (uint32, error) {
}
// Finish will write the index and footer of the table file and return the id of the file.
func (tw *CmpChunkTableWriter) Finish() (string, error) {
func (tw *CmpChunkTableWriter) Finish() (uint32, string, error) {
if tw.blockAddr != nil {
return "", ErrAlreadyFinished
return 0, "", ErrAlreadyFinished
}
startSize := tw.sink.Size()
// This happens to be the chunk data size.
tw.chunkDataLength = startSize
blockHash, err := tw.writeIndex()
if err != nil {
return "", err
return 0, "", err
}
err = tw.writeFooter()
if err != nil {
return "", err
return 0, "", err
}
var h []byte
@@ -174,7 +192,17 @@ func (tw *CmpChunkTableWriter) Finish() (string, error) {
blockAddr := hash.New(h[:hash.ByteLen])
tw.blockAddr = &blockAddr
return tw.blockAddr.String(), nil
endSize := tw.sink.Size()
return uint32(endSize - startSize), tw.blockAddr.String(), nil
}
func (tw *CmpChunkTableWriter) ChunkDataLength() (uint64, error) {
if tw.chunkDataLength == 0 {
return 0, errors.New("runtime error: ChunkDataLength invalid before Finish")
}
return tw.chunkDataLength, nil
}
// FlushToFile can be called after Finish in order to write the data out to the path provided.

View File

@@ -63,7 +63,7 @@ func TestCmpChunkTableWriter(t *testing.T) {
require.NoError(t, err)
}
id, err := tw.Finish()
_, id, err := tw.Finish()
require.NoError(t, err)
t.Run("ErrDuplicateChunkWritten", func(t *testing.T) {
@@ -75,7 +75,7 @@ func TestCmpChunkTableWriter(t *testing.T) {
_, err = tw.AddChunk(cmpChnk)
require.NoError(t, err)
}
_, err = tw.Finish()
_, _, err = tw.Finish()
require.Error(t, err, ErrDuplicateChunkWritten)
})

View File

@@ -69,7 +69,7 @@ func (gcc *gcCopier) cancel(_ context.Context) error {
func (gcc *gcCopier) copyTablesToDir(ctx context.Context) (ts []tableSpec, err error) {
var filename string
filename, err = gcc.writer.Finish()
_, filename, err = gcc.writer.Finish()
if err != nil {
return nil, err
}
@@ -120,7 +120,7 @@ func (gcc *gcCopier) copyTablesToDir(ctx context.Context) (ts []tableSpec, err e
return nil, err
}
defer r.Close()
sz := gcc.writer.ContentLength()
sz := gcc.writer.FullLength()
err = gcc.tfp.CopyTableFile(ctx, r, filename, sz, uint32(gcc.writer.ChunkCount()))
if err != nil {