Make archiveWriter thread safe

This commit is contained in:
Neil Macneale IV
2025-03-05 12:18:02 -08:00
parent 357a1cf201
commit a97eea5b8e

View File

@@ -22,6 +22,8 @@ import (
"io"
"path/filepath"
"sort"
"strings"
"sync"
"github.com/dolthub/dolt/go/store/hash"
)
@@ -59,6 +61,9 @@ type archiveWriter struct {
footerCheckSum sha512Sum
workflowStage stage
finalPath string
// Currently using a blunt lock for the writer. The writeByteSpan and stage* methods may benefit from
// a more nuanced approach.
lock sync.Mutex
}
/*
@@ -66,7 +71,7 @@ There is a workflow to writing an archive:
1. writeByteSpan: Write a group of bytes to the archive. This will immediately write the bytes to the output, and
return an ID for the byte span. Caller must keep track of this ID.
2. stageZStdChunk: Given a hash, dictionary (as byteSpan ID), and data (as byteSpan ID), stage a chunk for writing. This
does not write anything to disk yet.
does not write anything to disk yet. stageSnappyChunk is a similar function for snappy compressed chunks (no dictionary).
3. Repeat steps 1 and 2 as necessary. You can interleave them, but all chunks must be staged before the next step.
4. finalizeByteSpans: At this point, all byte spans have been written out, and the checksum for the data block
is calculated. No more byte spans can be written after this step.
@@ -75,12 +80,10 @@ There is a workflow to writing an archive:
6. writeMetadata: Write the metadataSpan to the archive. Calculate the metadataSpan checksum at the end of this step.
7. writeFooter: Write the footer to the archive. This will write out the index length, byte span count, chunk count.
8. flushToFile: Write the archive to disk and move into its new home.
When all of these steps have been completed without error, the ByteSink used to create the writer can be flushed and closed
to complete the archive writing process.
*/
// newArchiveWriter - Create an *archiveWriter with the given output ByteSync, which will be used to materialize an archive on disk.
// newArchiveWriter creates a new archiveWriter. Output is written to a temp file, as the file name won't be known
// until we've finished writing the footer.
func newArchiveWriter() (*archiveWriter, error) {
bs, err := NewBufferedFileByteSink("", defaultTableSinkBlockSize, defaultChBufferSize)
if err != nil {
@@ -88,18 +91,24 @@ func newArchiveWriter() (*archiveWriter, error) {
}
hbs := NewSHA512HashingByteSink(bs)
return &archiveWriter{output: hbs, seenChunks: hash.HashSet{}}, nil
return &archiveWriter{output: hbs, seenChunks: hash.HashSet{}, lock: sync.Mutex{}}, nil
}
// newArchiveWriter - Create an *archiveWriter with the given output ByteSync. This is used for testing.
func newArchiveWriterWithSink(bs ByteSink) *archiveWriter {
hbs := NewSHA512HashingByteSink(bs)
return &archiveWriter{output: hbs, seenChunks: hash.HashSet{}}
return &archiveWriter{output: hbs, seenChunks: hash.HashSet{}, lock: sync.Mutex{}}
}
// writeByteSpan writes a byte span to the archive, returning the ByteSpan ID if the write was successful. Note
// that writing an empty byte span is a no-op and will return 0. Also, the slice passed in is copied, so the caller
// can reuse the slice after this call.
//
// This method acquires the lock on the writer.
func (aw *archiveWriter) writeByteSpan(b []byte) (uint32, error) {
aw.lock.Lock()
defer aw.lock.Unlock()
if aw.workflowStage != stageByteSpan {
return 0, fmt.Errorf("Runtime error: writeByteSpan called out of order")
}
@@ -125,10 +134,20 @@ func (aw *archiveWriter) writeByteSpan(b []byte) (uint32, error) {
}
func (aw *archiveWriter) chunkSeen(h hash.Hash) bool {
aw.lock.Lock()
defer aw.lock.Unlock()
return aw.seenChunks.Has(h)
}
// stageZStdChunk stages a zStd compressed chunk for writing. The |dictionary| and |data| arguments must refer to IDs
// returned by |writeByteSpan|.
//
// This method acquires the lock on the writer. There will be races for sure.
func (aw *archiveWriter) stageZStdChunk(hash hash.Hash, dictionary, data uint32) error {
aw.lock.Lock()
defer aw.lock.Unlock()
if aw.workflowStage != stageByteSpan {
return fmt.Errorf("Runtime error: stageZStdChunk called out of order")
}
@@ -148,12 +167,19 @@ func (aw *archiveWriter) stageZStdChunk(hash hash.Hash, dictionary, data uint32)
return nil
}
func (aw *archiveWriter) stageSnappyChunk(hash hash.Hash, data uint32) error {
// stageSnappyChunk stages a snappy compressed chunk for writing. This is similar to stageZStdChunk, but does not require
// the dictionary. the |dataId| must refer to an ID returned by |writeByteSpan|.
//
// This method acquires the lock on the writer. There will be races to call this method for sure.
func (aw *archiveWriter) stageSnappyChunk(hash hash.Hash, dataId uint32) error {
aw.lock.Lock()
defer aw.lock.Unlock()
if aw.workflowStage != stageByteSpan {
return fmt.Errorf("Runtime error: stageSnappyChunk called out of order")
}
if data == 0 || data > uint32(len(aw.stagedBytes)) {
if dataId == 0 || dataId > uint32(len(aw.stagedBytes)) {
return ErrInvalidChunkRange
}
if aw.seenChunks.Has(hash) {
@@ -161,7 +187,7 @@ func (aw *archiveWriter) stageSnappyChunk(hash hash.Hash, data uint32) error {
}
aw.seenChunks.Insert(hash)
aw.stagedChunks = append(aw.stagedChunks, stagedChunkRef{hash, 0, data})
aw.stagedChunks = append(aw.stagedChunks, stagedChunkRef{hash, 0, dataId})
return nil
}
@@ -175,7 +201,15 @@ func (scrs stagedChunkRefSlice) Swap(i, j int) {
scrs[i], scrs[j] = scrs[j], scrs[i]
}
// finalizeByteSpans should be called after all byte spans have been written. It calculates the checksum for the data
// to be written later in the footer.
//
// This method acquires the lock on the writer. There should never be a race to call this method, but the lock
// guards against the |workflowStage| being changed by another thread.
func (aw *archiveWriter) finalizeByteSpans() error {
aw.lock.Lock()
defer aw.lock.Unlock()
if aw.workflowStage != stageByteSpan {
return fmt.Errorf("Runtime error: finalizeByteSpans called out of order")
}
@@ -204,7 +238,13 @@ var _ io.Writer = &streamCounter{}
// writeIndex writes the index to the archive. Expects the hasher to be reset before being called, and will reset it. It
// sets the indexLen and indexCheckSum fields on the archiveWriter, and updates the bytesWritten field.
//
// This method acquires the lock on the writer. There should never be a race to call this method, but the lock
// guards against the |workflowStage| being changed by another thread.
func (aw *archiveWriter) writeIndex() error {
aw.lock.Lock()
defer aw.lock.Unlock()
if aw.workflowStage != stageIndex {
return fmt.Errorf("Runtime error: writeIndex called out of order")
}
@@ -270,7 +310,13 @@ func (aw *archiveWriter) writeIndex() error {
// It sets the metadataLen and metadataCheckSum fields on the archiveWriter, and updates the bytesWritten field.
//
// Empty input is allowed.
//
// This method acquires the lock on the writer. There should never be a race to call this method, but the lock
// guards against the |workflowStage| being changed by another thread.
func (aw *archiveWriter) writeMetadata(data []byte) error {
aw.lock.Lock()
defer aw.lock.Unlock()
if aw.workflowStage != stageMetadata {
return fmt.Errorf("Runtime error: writeMetadata called out of order")
}
@@ -292,7 +338,15 @@ func (aw *archiveWriter) writeMetadata(data []byte) error {
return nil
}
// writeFooter writes the footer to the archive. This method is intended to be called after writeMetadata,
// and will complete the writing of bytes into the temp file.
//
// This method acquires the lock on the writer. There should never be a race to call this method, but the lock
// guards against the |workflowStage| being changed by another thread.
func (aw *archiveWriter) writeFooter() error {
aw.lock.Lock()
defer aw.lock.Unlock()
if aw.workflowStage != stageFooter {
return fmt.Errorf("Runtime error: writeFooter called out of order")
}
@@ -347,6 +401,8 @@ func (aw *archiveWriter) writeFooter() error {
return nil
}
// writeCheckSums writes the data, index and metadata checksums into the footer.
// Internal helper method. Really only should be used by |writeFooter| Assumes the lock is held.
func (aw *archiveWriter) writeCheckSums() error {
err := aw.writeSha512(aw.dataCheckSum)
if err != nil {
@@ -361,6 +417,8 @@ func (aw *archiveWriter) writeCheckSums() error {
return aw.writeSha512(aw.metadataCheckSum)
}
// writeSha512 writes a sha512Sum to the archive. Increments the bytesWritten field.
// Internal helper method. Assumes the lock is held.
func (aw *archiveWriter) writeSha512(sha sha512Sum) error {
_, err := aw.output.Write(sha[:])
if err != nil {
@@ -372,6 +430,7 @@ func (aw *archiveWriter) writeSha512(sha sha512Sum) error {
}
// Write a uint64 to the archive. Increments the bytesWritten field.
// Internal helper method. Assumes the lock is held.
func (aw *archiveWriter) writeUint64(val uint64) error {
err := binary.Write(aw.output, binary.BigEndian, val)
if err != nil {
@@ -383,6 +442,7 @@ func (aw *archiveWriter) writeUint64(val uint64) error {
}
// Write a uint32 to the archive. Increments the bytesWritten field.
// Internal helper method. Assumes the lock is held.
func (aw *archiveWriter) writeUint32(val uint32) error {
err := binary.Write(aw.output, binary.BigEndian, val)
if err != nil {
@@ -393,22 +453,19 @@ func (aw *archiveWriter) writeUint32(val uint32) error {
return nil
}
// Write a uint64 to the archive as a varint. This is used during the index writing process, so we expect the io.Writer
// to keep track of the written byte count.
func writeVarUint64(w io.Writer, val uint64) error {
var buf [binary.MaxVarintLen64]byte
n := binary.PutUvarint(buf[:], val)
_, err := w.Write(buf[:n])
return err
}
// flushToFile writes the archive to disk. The input is the directory where the file should be written, the file name
// will be the footer hash + ".darc" as a suffix.
// flushToFile writes the archive to disk. Path must end in ".darc"
func (aw *archiveWriter) flushToFile(fullPath string) error {
aw.lock.Lock()
defer aw.lock.Unlock()
if aw.workflowStage != stageFlush {
return fmt.Errorf("Runtime error: flushToFile called out of order")
}
if !strings.HasSuffix(fullPath, ArchiveFileSuffix) {
return fmt.Errorf("Invalid archive file path: %s", fullPath)
}
if bs, ok := aw.output.backingSink.(*BufferedFileByteSink); ok {
err := bs.finish()
if err != nil {
@@ -426,6 +483,9 @@ func (aw *archiveWriter) flushToFile(fullPath string) error {
}
func (aw *archiveWriter) getName() (hash.Hash, error) {
aw.lock.Lock()
defer aw.lock.Unlock()
if aw.workflowStage != stageFlush && aw.workflowStage != stageDone {
return hash.Hash{}, fmt.Errorf("Runtime error: getName called out of order")
}
@@ -434,6 +494,8 @@ func (aw *archiveWriter) getName() (hash.Hash, error) {
}
func (aw *archiveWriter) genFileName(path string) (string, error) {
// No need to lock here, as aw.getName() acquires the lock.
if aw.workflowStage != stageFlush {
return "", fmt.Errorf("Runtime error: genFileName called out of order")
}