Remote support for archives, and pulling impl toggles by env var

Neil Macneale IV
2025-03-06 18:25:21 -08:00
parent ff94e296ae
commit aa2175c1a0
11 changed files with 98 additions and 70 deletions
+1 -1
@@ -418,7 +418,7 @@ func (rs *RemoteChunkStore) GetUploadLocations(ctx context.Context, req *remotes
}
func (rs *RemoteChunkStore) getUploadUrl(md metadata.MD, repoPath string, tfd *remotesapi.TableFileDetails) *url.URL {
fileID := hash.New(tfd.Id).String()
fileID := hash.New(tfd.Id).String() + tfd.Suffix
params := url.Values{}
params.Add("num_chunks", strconv.Itoa(int(tfd.NumChunks)))
params.Add("content_length", strconv.Itoa(int(tfd.ContentLength)))
+15 -5
@@ -123,8 +123,7 @@ func (fh filehandler) ServeHTTP(respWr http.ResponseWriter, req *http.Request) {
}
i := strings.LastIndex(path, "/")
// a table file name is currently 32 characters, plus the '/' is 33.
if i < 0 || len(path[i:]) != 33 {
if i < 0 || !validateFileName(path[i+1:]) {
logger = logger.WithField("status", http.StatusNotFound)
respWr.WriteHeader(http.StatusNotFound)
return
@@ -231,7 +230,7 @@ func readTableFile(logger *logrus.Entry, path string, respWr http.ResponseWriter
}
}()
if rangeStr == "" {
if rangeStr != "" {
respWr.WriteHeader(http.StatusPartialContent)
} else {
respWr.WriteHeader(http.StatusOK)
@@ -288,8 +287,7 @@ func (u *uploadreader) Close() error {
}
func writeTableFile(ctx context.Context, logger *logrus.Entry, dbCache DBCache, path, fileId string, numChunks int, contentHash []byte, contentLength uint64, body io.ReadCloser) (*logrus.Entry, int) {
_, ok := hash.MaybeParse(fileId)
if !ok {
if !validateFileName(fileId) {
logger = logger.WithField("status", http.StatusBadRequest)
logger.Warnf("%s is not a valid hash", fileId)
return logger, http.StatusBadRequest
@@ -443,3 +441,15 @@ func ExtractBasicAuthCreds(ctx context.Context) (*RequestCredentials, error) {
return &RequestCredentials{Username: username, Password: password, Address: addr.Addr.String()}, nil
}
}
func validateFileName(fileName string) bool {
if len(fileName) == 32 {
_, ok := hash.MaybeParse(fileName)
return ok
}
if len(fileName) == 32+len(nbs.ArchiveFileSuffix) && strings.HasSuffix(fileName, nbs.ArchiveFileSuffix) {
_, ok := hash.MaybeParse(fileName[:32])
return ok
}
return false
}
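
For clarity, here is a hedged sketch of what the widened name check accepts, written as if it were a test in the same package (the inputs are illustrative, and the real suffix value is whatever nbs.ArchiveFileSuffix is defined as):

    func TestValidateFileName_Sketch(t *testing.T) {
        // A bare 32-character table file hash is still accepted.
        require.True(t, validateFileName("0123456789abcdefghijklmnopqrstuv"))
        // The same hash with the archive suffix appended is now also accepted.
        require.True(t, validateFileName("0123456789abcdefghijklmnopqrstuv"+nbs.ArchiveFileSuffix))
        // Anything else (wrong length, unparseable hash, unknown suffix) is rejected.
        require.False(t, validateFileName("not-a-table-file"))
        require.False(t, validateFileName("0123456789abcdefghijklmnopqrstuv.bak"))
    }
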
@@ -933,7 +933,9 @@ func (dcs *DoltChunkStore) uploadChunks(ctx context.Context, hashToChunk map[has
}
for h, contentHash := range hashToContentHash {
err := dcs.uploadTableFileWithRetries(ctx, h, uint64(hashToCount[h]), contentHash, func() (io.ReadCloser, uint64, error) {
// Tables created on this path always start from memory tables and end up as noms table files.
// As a result, the suffix is always empty.
err := dcs.uploadTableFileWithRetries(ctx, h, "", uint64(hashToCount[h]), contentHash, func() (io.ReadCloser, uint64, error) {
data := hashToData[h]
return io.NopCloser(bytes.NewReader(data)), uint64(len(data)), nil
})
@@ -945,7 +947,7 @@ func (dcs *DoltChunkStore) uploadChunks(ctx context.Context, hashToChunk map[has
return hashToCount, nil
}
func (dcs *DoltChunkStore) uploadTableFileWithRetries(ctx context.Context, tableFileId hash.Hash, numChunks uint64, tableFileContentHash []byte, getContent func() (io.ReadCloser, uint64, error)) error {
func (dcs *DoltChunkStore) uploadTableFileWithRetries(ctx context.Context, tableFileId hash.Hash, suffix string, numChunks uint64, tableFileContentHash []byte, getContent func() (io.ReadCloser, uint64, error)) error {
op := func() error {
body, contentLength, err := getContent()
if err != nil {
@@ -957,6 +959,7 @@ func (dcs *DoltChunkStore) uploadTableFileWithRetries(ctx context.Context, table
ContentLength: contentLength,
ContentHash: tableFileContentHash,
NumChunks: numChunks,
Suffix: suffix,
}
dcs.logf("getting upload location for file %s", tableFileId.String())
@@ -1061,17 +1064,14 @@ func (dcs *DoltChunkStore) SupportedOperations() chunks.TableFileStoreOps {
// WriteTableFile reads a table file from the provided reader and writes it to the chunk store.
func (dcs *DoltChunkStore) WriteTableFile(ctx context.Context, fileId string, numChunks int, contentHash []byte, getRd func() (io.ReadCloser, uint64, error)) error {
// Err if the suffix is an archive file
suffix := ""
if strings.HasSuffix(fileId, nbs.ArchiveFileSuffix) {
return errors.New("cannot write archive file ids currently.")
suffix = nbs.ArchiveFileSuffix
fileId = strings.TrimSuffix(fileId, nbs.ArchiveFileSuffix)
}
fileIdBytes := hash.Parse(fileId)
err := dcs.uploadTableFileWithRetries(ctx, fileIdBytes, uint64(numChunks), contentHash, getRd)
if err != nil {
return err
}
return nil
return dcs.uploadTableFileWithRetries(ctx, fileIdBytes, suffix, uint64(numChunks), contentHash, getRd)
}
// AddTableFilesToManifest adds table files to the manifest
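
A rough usage sketch of the client-side change: an archive file id can now be handed to WriteTableFile as-is, and the suffix is split off before the hash is parsed and forwarded in the upload request (the file name below is illustrative):

    // Illustrative only: uploading an archive through the remote chunk store.
    fileId := "0123456789abcdefghijklmnopqrstuv" + nbs.ArchiveFileSuffix
    if err := dcs.WriteTableFile(ctx, fileId, numChunks, contentHash, getRd); err != nil {
        return err
    }
    // Internally this calls uploadTableFileWithRetries with the bare hash and the
    // suffix as separate arguments, and the server's getUploadUrl appends the
    // suffix back onto the upload URL it hands out.
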
+19 -8
@@ -17,6 +17,8 @@ package pull
import (
"context"
"io"
"os"
"strings"
"sync"
"sync/atomic"
@@ -54,7 +56,7 @@ import (
type PullTableFileWriter struct {
cfg PullTableFileWriterConfig
addChunkCh chan nbs.CompressedChunk
addChunkCh chan nbs.ToChunker
newWriterCh chan nbs.GenericTableWriter
egCtx context.Context
eg *errgroup.Group
@@ -99,7 +101,7 @@ type PullTableFileWriterStats struct {
func NewPullTableFileWriter(ctx context.Context, cfg PullTableFileWriterConfig) *PullTableFileWriter {
ret := &PullTableFileWriter{
cfg: cfg,
addChunkCh: make(chan nbs.CompressedChunk),
addChunkCh: make(chan nbs.ToChunker),
newWriterCh: make(chan nbs.GenericTableWriter, cfg.MaximumBufferedFiles),
getAddrs: cfg.GetAddrs,
}
@@ -124,7 +126,7 @@ func (w *PullTableFileWriter) GetStats() PullTableFileWriterStats {
// This method may block for arbitrary amounts of time if there is already a
// lot of buffered table files and we are waiting for uploads to succeed before
// creating more table files.
func (w *PullTableFileWriter) AddCompressedChunk(ctx context.Context, chk nbs.CompressedChunk) error {
func (w *PullTableFileWriter) AddToChunker(ctx context.Context, chk nbs.ToChunker) error {
select {
case w.addChunkCh <- chk:
return nil
@@ -169,7 +171,12 @@ func (w *PullTableFileWriter) uploadAndFinalizeThread() (err error) {
go func() {
defer manifestWg.Done()
for ttf := range respCh {
manifestUpdates[ttf.id] = ttf.numChunks
id := ttf.id
if strings.HasSuffix(id, nbs.ArchiveFileSuffix) {
id = id[:len(id)-len(nbs.ArchiveFileSuffix)]
}
manifestUpdates[id] = ttf.numChunks
}
}()
@@ -235,19 +242,23 @@ LOOP:
}
if curWr == nil {
// curWr, err = nbs.NewCmpChunkTableWriter(w.cfg.TempDir)
curWr, err = nbs.NewArchiveStreamWriter()
if os.Getenv("DOLT_ARCHIVE_PULL_STREAMER") != "" {
curWr, err = nbs.NewArchiveStreamWriter(w.cfg.TempDir)
} else {
curWr, err = nbs.NewCmpChunkTableWriter(w.cfg.TempDir)
}
if err != nil {
return err
}
}
// Add the chunk to writer.
err = curWr.AddChunk(newChnk)
bytes, err := curWr.AddChunk(newChnk)
if err != nil {
return err
}
atomic.AddUint64(&w.bufferedSendBytes, uint64(len(newChnk.FullCompressedChunk)))
atomic.AddUint64(&w.bufferedSendBytes, uint64(bytes))
}
}
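
The new streaming-archive path is opt-in: unless DOLT_ARCHIVE_PULL_STREAMER is set to a non-empty value, pulls keep writing classic noms table files via NewCmpChunkTableWriter. A minimal sketch of flipping the toggle in-process (illustrative; the variable is checked each time a new table file writer is created):

    // Illustrative only: opt in to the archive stream writer for pulls.
    if err := os.Setenv("DOLT_ARCHIVE_PULL_STREAMER", "1"); err != nil {
        return err
    }
    // Table files produced by PullTableFileWriter from this point on are archives
    // (NewArchiveStreamWriter) rather than noms table files (NewCmpChunkTableWriter).
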
+3 -13
@@ -381,19 +381,9 @@ func (p *Puller) Pull(ctx context.Context) error {
}
tracker.TickProcessed()
if compressedChunk, ok := cChk.(nbs.CompressedChunk); ok {
err = p.wr.AddCompressedChunk(ctx, compressedChunk)
if err != nil {
return err
}
} else if _, ok := cChk.(nbs.ArchiveToChunker); ok {
// NM4 - Until we can write quickly to archives.....
cc := nbs.ChunkToCompressedChunk(chnk)
err = p.wr.AddCompressedChunk(ctx, cc)
if err != nil {
return err
}
err = p.wr.AddToChunker(ctx, cChk)
if err != nil {
return err
}
}
})
+2 -2
@@ -68,7 +68,7 @@ func UnArchive(ctx context.Context, cs chunks.ChunkStore, smd StorageMetadata, p
err = arc.iterate(ctx, func(chk chunks.Chunk) error {
cmpChk := ChunkToCompressedChunk(chk)
err := classicTable.AddChunk(cmpChk)
_, err := classicTable.AddChunk(cmpChk)
if err != nil {
return err
}
@@ -233,7 +233,7 @@ func convertTableFileToArchive(
cmpBuff = gozstd.Compress(cmpBuff[:0], defaultDict)
// p("Default Dict Raw vs Compressed: %d , %d\n", len(defaultDict), len(cmpDefDict))
arcW, err := newArchiveWriter()
arcW, err := newArchiveWriter("")
if err != nil {
return "", hash.Hash{}, err
}
+18 -13
@@ -91,8 +91,8 @@ There is a workflow to writing an archive:
// newArchiveWriter creates a new archiveWriter. Output is written to a temp file, as the file name won't be known
// until we've finished writing the footer.
func newArchiveWriter() (*archiveWriter, error) {
bs, err := NewBufferedFileByteSink("", defaultTableSinkBlockSize, defaultChBufferSize)
func newArchiveWriter(tmpDir string) (*archiveWriter, error) {
bs, err := NewBufferedFileByteSink(tmpDir, defaultTableSinkBlockSize, defaultChBufferSize)
if err != nil {
return nil, err
}
@@ -537,8 +537,8 @@ type ArchiveStreamWriter struct {
chunkCount *int32
}
func NewArchiveStreamWriter() (*ArchiveStreamWriter, error) {
writer, err := newArchiveWriter()
func NewArchiveStreamWriter(tmpDir string) (*ArchiveStreamWriter, error) {
writer, err := newArchiveWriter(tmpDir)
if err != nil {
return nil, err
}
@@ -575,18 +575,18 @@ func (asw *ArchiveStreamWriter) ChunkCount() int {
return int(atomic.LoadInt32(asw.chunkCount))
}
func (asw *ArchiveStreamWriter) AddChunk(chunker ToChunker) error {
func (asw *ArchiveStreamWriter) AddChunk(chunker ToChunker) (uint32, error) {
if cc, ok := chunker.(CompressedChunk); ok {
return asw.writeCompressedChunk(cc)
}
if ac, ok := chunker.(ArchiveToChunker); ok {
return asw.writeArchiveToChunker(ac)
}
return fmt.Errorf("Unknown chunk type: %T", chunker)
return 0, fmt.Errorf("Unknown chunk type: %T", chunker)
}
func (asw *ArchiveStreamWriter) ContentLength() uint64 {
return asw.writer.bytesWritten
return asw.writer.md5Summer.Size()
}
func (asw *ArchiveStreamWriter) GetMD5() []byte {
@@ -597,9 +597,11 @@ func (asw *ArchiveStreamWriter) Remove() error {
return os.Remove(asw.writer.finalPath)
}
func (asw *ArchiveStreamWriter) writeArchiveToChunker(chunker ArchiveToChunker) error {
func (asw *ArchiveStreamWriter) writeArchiveToChunker(chunker ArchiveToChunker) (uint32, error) {
dict := chunker.dict
bytesWritten := uint32(0)
var err error
dictId, ok := asw.dictMap[dict]
if !ok {
@@ -618,6 +620,7 @@ func (asw *ArchiveStreamWriter) writeArchiveToChunker(chunker ArchiveToChunker)
if err != nil {
return err
}
bytesWritten += uint32(len(*dict.rawDictionary))
asw.dictMap[dict] = dictId
return nil
}()
@@ -625,18 +628,20 @@ func (asw *ArchiveStreamWriter) writeArchiveToChunker(chunker ArchiveToChunker)
dataId, err := asw.writer.writeByteSpan(chunker.chunkData)
if err != nil {
return err
return bytesWritten, err
}
bytesWritten += uint32(len(chunker.chunkData))
atomic.AddInt32(asw.chunkCount, 1)
return asw.writer.stageZStdChunk(chunker.Hash(), dictId, dataId)
return bytesWritten, asw.writer.stageZStdChunk(chunker.Hash(), dictId, dataId)
}
func (asw *ArchiveStreamWriter) writeCompressedChunk(chunker CompressedChunk) error {
func (asw *ArchiveStreamWriter) writeCompressedChunk(chunker CompressedChunk) (uint32, error) {
writeCount := uint32(len(chunker.FullCompressedChunk))
dataId, err := asw.writer.writeByteSpan(chunker.FullCompressedChunk)
if err != nil {
return err
return 0, err
}
atomic.AddInt32(asw.chunkCount, 1)
return asw.writer.stageSnappyChunk(chunker.Hash(), dataId)
return writeCount, asw.writer.stageSnappyChunk(chunker.Hash(), dataId)
}
+1 -1
@@ -316,7 +316,7 @@ func (sink *BufferedFileByteSink) Reader() (io.ReadCloser, error) {
return os.Open(sink.path)
}
// HashingByteSink is a ByteSink that keeps an md5 hash of all the data written to it.
// HashingByteSink is a ByteSink that keeps a hash of all the data written to it.
type HashingByteSink struct {
backingSink ByteSink
hasher hash.Hash
+23 -13
@@ -18,6 +18,7 @@ import (
"crypto/sha512"
"encoding/binary"
"errors"
"fmt"
gohash "hash"
"io"
"os"
@@ -38,9 +39,11 @@ type GenericTableWriter interface {
// ChunkCount returns the number of chunks written to the table file. This can be called before Finish to determine
// if the maximum number of chunks has been reached.
ChunkCount() int
// AddChunk adds a chunk to the table file. The underlying implementation of ToChunker will probably be exploided
// AddChunk adds a chunk to the table file. The underlying implementation of ToChunker will probably be exploited
// by implementors of GenericTableWriter so that their bytes can be efficiently written to the table file.
AddChunk(ToChunker) error
//
// If no error occurs, the number of bytes written to the store is returned.
AddChunk(ToChunker) (uint32, error)
// ContentLength returns the number of bytes written to the table file. This can be called before Finish to determine
// if the file is too large.
ContentLength() uint64
@@ -65,7 +68,6 @@ var ErrDuplicateChunkWritten = errors.New("duplicate chunks written")
// CmpChunkTableWriter writes CompressedChunks to a table file
type CmpChunkTableWriter struct {
sink *HashingByteSink
totalCompressedData uint64
totalUncompressedData uint64
prefixes prefixIndexSlice
blockAddr *hash.Hash
@@ -81,7 +83,7 @@ func NewCmpChunkTableWriter(tempDir string) (*CmpChunkTableWriter, error) {
return nil, err
}
return &CmpChunkTableWriter{NewMD5HashingByteSink(s), 0, 0, nil, nil, s.path}, nil
return &CmpChunkTableWriter{NewMD5HashingByteSink(s), 0, nil, nil, s.path}, nil
}
func (tw *CmpChunkTableWriter) ChunkCount() int {
@@ -99,12 +101,12 @@ func (tw *CmpChunkTableWriter) GetMD5() []byte {
}
// AddCmpChunk adds a compressed chunk
func (tw *CmpChunkTableWriter) AddChunk(tc ToChunker) error {
func (tw *CmpChunkTableWriter) AddChunk(tc ToChunker) (uint32, error) {
if tc.IsGhost() {
// Ghost chunks cannot be written to a table file. They should
// always be filtered by the write processes before landing
// here.
return ErrGhostChunkRequested
return 0, ErrGhostChunkRequested
}
if tc.IsEmpty() {
panic("NBS blocks cannot be zero length")
@@ -112,33 +114,41 @@ func (tw *CmpChunkTableWriter) AddChunk(tc ToChunker) error {
c, ok := tc.(CompressedChunk)
if !ok {
panic("runtime error: Require a CompressedChunk instance")
if arc, ok := tc.(ArchiveToChunker); ok {
// Decompress, and recompress since we can only write snappy compressed objects to this store.
chk, err := arc.ToChunk()
if err != nil {
return 0, err
}
c = ChunkToCompressedChunk(chk)
} else {
panic(fmt.Sprintf("Unknown chunk type: %T", tc))
}
}
uncmpLen, err := snappy.DecodedLen(c.CompressedData)
if err != nil {
return err
return 0, err
}
fullLen := len(c.FullCompressedChunk)
fullLen := uint32(len(c.FullCompressedChunk))
_, err = tw.sink.Write(c.FullCompressedChunk)
if err != nil {
return err
return 0, err
}
tw.totalCompressedData += uint64(len(c.CompressedData))
tw.totalUncompressedData += uint64(uncmpLen)
// Stored in insertion order
tw.prefixes = append(tw.prefixes, prefixIndexRec{
c.H,
uint32(len(tw.prefixes)),
uint32(fullLen),
fullLen,
})
return nil
return fullLen, nil
}
// Finish will write the index and footer of the table file and return the id of the file.
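
CmpChunkTableWriter.AddChunk now accepts the ToChunker interface rather than a concrete CompressedChunk: an ArchiveToChunker is converted back to a plain chunk and re-compressed with snappy so it can still land in a classic table file, and the returned count is the number of bytes actually written to the sink. A hedged caller sketch (tempDir, tc, and bufferedBytes are placeholder names):

    // Illustrative only: byte accounting with the new AddChunk signature.
    tw, err := nbs.NewCmpChunkTableWriter(tempDir)
    if err != nil {
        return err
    }
    n, err := tw.AddChunk(tc) // tc may be a CompressedChunk or an ArchiveToChunker
    if err != nil {
        return err
    }
    bufferedBytes += uint64(n) // n is the length of the snappy-framed chunk written to the sink
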
+5 -4
@@ -59,7 +59,7 @@ func TestCmpChunkTableWriter(t *testing.T) {
tw, err := NewCmpChunkTableWriter("")
require.NoError(t, err)
for _, cmpChnk := range found {
err = tw.AddChunk(cmpChnk)
_, err = tw.AddChunk(cmpChnk)
require.NoError(t, err)
}
@@ -70,9 +70,9 @@ func TestCmpChunkTableWriter(t *testing.T) {
tw, err := NewCmpChunkTableWriter("")
require.NoError(t, err)
for _, cmpChnk := range found {
err = tw.AddChunk(cmpChnk)
_, err = tw.AddChunk(cmpChnk)
require.NoError(t, err)
err = tw.AddChunk(cmpChnk)
_, err = tw.AddChunk(cmpChnk)
require.NoError(t, err)
}
_, err = tw.Finish()
@@ -98,7 +98,8 @@ func TestCmpChunkTableWriter(t *testing.T) {
func TestCmpChunkTableWriterGhostChunk(t *testing.T) {
tw, err := NewCmpChunkTableWriter("")
require.NoError(t, err)
require.Error(t, tw.AddChunk(NewGhostCompressedChunk(hash.Parse("6af71afc2ea0hmp4olev0vp9q1q5gvb1"))))
_, err = tw.AddChunk(NewGhostCompressedChunk(hash.Parse("6af71afc2ea0hmp4olev0vp9q1q5gvb1")))
require.Error(t, err)
}
func TestContainsDuplicates(t *testing.T) {
+2 -1
@@ -57,7 +57,8 @@ func newGarbageCollectionCopier(tfp tableFilePersister) (*gcCopier, error) {
}
func (gcc *gcCopier) addChunk(ctx context.Context, c ToChunker) error {
return gcc.writer.AddChunk(c)
_, err := gcc.writer.AddChunk(c)
return err
}
// If the writer should be closed and deleted, instead of being used with