From 6c270b1d2cebbd650ae5f6c76c502fa423cb4264 Mon Sep 17 00:00:00 2001 From: Neil Macneale IV Date: Tue, 4 Feb 2025 10:17:04 -0800 Subject: [PATCH] First steel thread for archive fetch. --- .../doltcore/remotestorage/chunk_fetcher.go | 136 +++++++++++++++++- .../doltcore/remotestorage/chunk_store.go | 88 ++++++++++-- .../remotestorage/internal/ranges/ranges.go | 17 ++- .../internal/ranges/ranges_test.go | 28 ++-- go/store/datas/pull/puller.go | 11 +- go/store/nbs/archive.go | 2 +- go/store/nbs/archive_build.go | 2 +- integration-tests/bats/archive.bats | 48 +++++++ 8 files changed, 295 insertions(+), 37 deletions(-) diff --git a/go/libraries/doltcore/remotestorage/chunk_fetcher.go b/go/libraries/doltcore/remotestorage/chunk_fetcher.go index 830c90d35e..997512513c 100644 --- a/go/libraries/doltcore/remotestorage/chunk_fetcher.go +++ b/go/libraries/doltcore/remotestorage/chunk_fetcher.go @@ -18,9 +18,12 @@ import ( "context" "errors" "io" + "sync" "sync/atomic" "time" + "github.com/cenkalti/backoff/v4" + "github.com/dolthub/gozstd" "golang.org/x/sync/errgroup" "google.golang.org/grpc" @@ -68,7 +71,14 @@ const ( reliableCallDeliverRespTimeout = 15 * time.Second ) +var globalDictCache *DictionaryCache +var once sync.Once + func NewChunkFetcher(ctx context.Context, dcs *DoltChunkStore) *ChunkFetcher { + once.Do(func() { + globalDictCache = NewDictionaryCache(newDownloads(), dcs.csClient) + }) + eg, ctx := errgroup.WithContext(ctx) ret := &ChunkFetcher{ eg: eg, @@ -368,18 +378,27 @@ func (d downloads) Add(resp *remotesapi.DownloadLoc) { d.refreshes[path] = refresh } for _, r := range gr.Ranges { - d.ranges.Insert(gr.Url, r.Hash, r.Offset, r.Length) + // NM4 - Split at this point? Break the dictionary into its own request. + d.ranges.Insert(gr.Url, r.Hash[:], r.Offset, r.Length, r.DictionaryOffset, r.DictionaryLength) + // if r.DictionaryLength == 0 { + // // NM4 - maybe invert the hash, and add it to a set of..... not sure. + // d.ranges.Insert(gr.Url, r.Hash, r.DictionaryOffset, r.DictionaryLength) + // } } } +// NM4 - On the client side, we only request HttpRanges for raw bytes. The struct includes the dictionary offset and length, +// but those only make sense in the response of DownloadLocations. func toGetRange(rs []*ranges.GetRange) *GetRange { ret := new(GetRange) for _, r := range rs { ret.Url = r.Url ret.Ranges = append(ret.Ranges, &remotesapi.RangeChunk{ - Hash: r.Hash, - Offset: r.Offset, - Length: r.Length, + Hash: r.Hash, + Offset: r.Offset, + Length: r.Length, + DictionaryOffset: r.DictionaryOffset, + DictionaryLength: r.DictionaryLength, }) } return ret @@ -593,3 +612,112 @@ func fetcherDownloadURLThread(ctx context.Context, fetchReqCh chan fetchReq, don } } } + +/////// + +type DictionaryKey struct { + url string + off uint64 + len uint32 +} + +type DictionaryCache struct { + mu sync.Mutex + cache map[DictionaryKey]*gozstd.DDict + client remotesapi.ChunkStoreServiceClient + dlds downloads +} + +func NewDictionaryCache(downloads downloads, client remotesapi.ChunkStoreServiceClient) *DictionaryCache { + return &DictionaryCache{ + mu: sync.Mutex{}, + cache: make(map[DictionaryKey]*gozstd.DDict), + client: client, + dlds: downloads, + } +} + +func (dc *DictionaryCache) Get(rang *GetRange, idx int, stats StatsRecorder, recorder reliable.HealthRecorder) (*gozstd.DDict, error) { + // Way too granular... but I'll use a real cache for production. 
+	dc.mu.Lock()
+	defer dc.mu.Unlock()
+
+	path := rang.ResourcePath()
+	off := rang.Ranges[idx].DictionaryOffset
+	ln := rang.Ranges[idx].DictionaryLength
+
+	key := DictionaryKey{path, off, ln}
+	if v, ok := dc.cache[key]; ok {
+		return v, nil
+	} else {
+
+		pathToUrl := dc.dlds.refreshes[path]
+		if pathToUrl == nil {
+			// Mirror what downloads.Add does, but for a single dictionary range.
+			refresh := new(locationRefresh)
+
+			sRang := &remotesapi.HttpGetRange{}
+			sRang.Url = rang.Url
+			sRang.Ranges = append(sRang.Ranges, &remotesapi.RangeChunk{Offset: off, Length: ln})
+			loc := &remotesapi.DownloadLoc_HttpGetRange{HttpGetRange: sRang}
+			dl := &remotesapi.DownloadLoc{Location: loc}
+
+			refresh.Add(dl)
+			dc.dlds.refreshes[path] = refresh
+
+			pathToUrl = refresh
+		}
+
+		ctx := context.Background()
+		fetcher := globalHttpFetcher
+
+		urlF := func(lastError error) (string, error) {
+			earl, err := pathToUrl.GetURL(ctx, lastError, dc.client)
+			if err != nil {
+				return "", err
+			}
+			if earl == "" {
+				earl = path
+			}
+			return earl, nil
+		}
+
+		resp := reliable.StreamingRangeDownload(ctx, reliable.StreamingRangeRequest{
+			Fetcher: fetcher,
+			Offset:  off,
+			Length:  uint64(ln),
+			UrlFact: urlF,
+			Stats:   stats,
+			Health:  recorder,
+			BackOffFact: func(ctx context.Context) backoff.BackOff {
+				return downloadBackOff(ctx, 3) // TODO: use params.DownloadRetryCount
+			},
+			Throughput: reliable.MinimumThroughputCheck{
+				CheckInterval: defaultRequestParams.ThroughputMinimumCheckInterval,
+				BytesPerCheck: defaultRequestParams.ThroughputMinimumBytesPerCheck,
+				NumIntervals:  defaultRequestParams.ThroughputMinimumNumIntervals,
+			},
+			RespHeadersTimeout: defaultRequestParams.RespHeadersTimeout,
+		})
+		defer resp.Close()
+
+		buf := make([]byte, ln)
+		_, err := io.ReadFull(resp.Body, buf)
+		if err != nil {
+			return nil, err
+		}
+
+		// Dictionary ByteSpans are themselves zstd compressed (with no dictionary).
+		rawDict, err := gozstd.Decompress(nil, buf)
+		if err != nil {
+			return nil, err
+		}
+
+		dict, err := gozstd.NewDDict(rawDict)
+		if err != nil {
+			return nil, err
+		}
+
+		dc.cache[key] = dict
+		return dict, nil
+	}
+}
diff --git a/go/libraries/doltcore/remotestorage/chunk_store.go b/go/libraries/doltcore/remotestorage/chunk_store.go
index 75fa614c2c..f9c8d4ca8d 100644
--- a/go/libraries/doltcore/remotestorage/chunk_store.go
+++ b/go/libraries/doltcore/remotestorage/chunk_store.go
@@ -32,6 +32,7 @@ import (
 	"time"
 
 	"github.com/cenkalti/backoff/v4"
+	"github.com/dolthub/gozstd"
 	"go.opentelemetry.io/otel"
 	"go.opentelemetry.io/otel/attribute"
 	"go.opentelemetry.io/otel/trace"
@@ -371,6 +372,7 @@ func (dcs *DoltChunkStore) GetManyCompressed(ctx context.Context, hashes hash.Ha
 		return nil
 	}
 
+// NM4 - Extending the protobuf isn't really necessary. Possibly split this out into a new struct.
 type GetRange remotesapi.HttpGetRange
 
 func (gr *GetRange) ResourcePath() string {
@@ -436,6 +438,7 @@ func (gr *GetRange) GetDownloadFunc(ctx context.Context, stats StatsRecorder, he
 	if len(gr.Ranges) == 0 {
 		return func() error { return nil }
 	}
+
 	return func() error {
 		urlF := func(lastError error) (string, error) {
 			url, err := pathToUrl(ctx, lastError, gr.ResourcePath())
@@ -466,9 +469,9 @@ func (gr *GetRange) GetDownloadFunc(ctx context.Context, stats StatsRecorder, he
 			RespHeadersTimeout: params.RespHeadersTimeout,
 		})
 		defer resp.Close()
-		reader := &RangeChunkReader{GetRange: gr, Reader: resp.Body}
+		reader := &RangeChunkReader{Path: gr.ResourcePath(), GetRange: gr, Reader: resp.Body}
 		for {
-			cc, err := reader.ReadChunk()
+			cc, err := reader.ReadChunk(stats, health)
 			if errors.Is(err, io.EOF) {
 				return nil
 			}
@@ -484,14 +487,59 @@ func (gr *GetRange) GetDownloadFunc(ctx context.Context, stats StatsRecorder, he
 	}
 }
 
+type ArchiveToChunker struct {
+	h          hash.Hash
+	dictionary *gozstd.DDict
+	chunkData  []byte
+}
+
+func (a ArchiveToChunker) Hash() hash.Hash {
+	return a.h
+}
+
+func (a ArchiveToChunker) ToChunk() (chunks.Chunk, error) {
+	dict := a.dictionary
+	data := a.chunkData
+	rawChunk, err := gozstd.DecompressDict(nil, data, dict)
+	if err != nil {
+		return chunks.Chunk{}, err
+	}
+
+	// NM4 - calculate chunk addr for safety while testing.
+	newChunk := chunks.NewChunk(rawChunk)
+	if newChunk.Hash() != a.h {
+		panic("Hash Mismatch!!")
+	}
+
+	return newChunk, nil
+}
+
+func (a ArchiveToChunker) FullCompressedChunkLen() uint32 {
+	// TODO: Not sure what the right impl for this is. NM4.
+	return uint32(len(a.chunkData)) // + dictionary???
+}
+
+func (a ArchiveToChunker) IsEmpty() bool {
+	return len(a.chunkData) == 0
+}
+
+func (a ArchiveToChunker) IsGhost() bool {
+	// TODO: implement me. NM4 - yes, need to. Or maybe not?
+	return false
+}
+
+var _ nbs.ToChunker = (*ArchiveToChunker)(nil)
+
 type RangeChunkReader struct {
+	Path     string
 	GetRange *GetRange
 	Reader   io.Reader
 	i        int
 	skip     int
 }
 
-func (r *RangeChunkReader) ReadChunk() (nbs.CompressedChunk, error) {
+// NM4 - This is the place where we need to intercept responses and conjure the "full" chunk.
+func (r *RangeChunkReader) ReadChunk(stats StatsRecorder, health reliable.HealthRecorder) (nbs.ToChunker, error) {
 	if r.skip > 0 {
 		_, err := io.CopyN(io.Discard, r.Reader, int64(r.skip))
 		if err != nil {
@@ -499,21 +547,41 @@ func (r *RangeChunkReader) ReadChunk() (nbs.CompressedChunk, error) {
 		}
 		r.skip = 0
 	}
-	if r.i >= len(r.GetRange.Ranges) {
+
+	idx := r.i
+	r.i += 1
+
+	if idx >= len(r.GetRange.Ranges) {
 		return nbs.CompressedChunk{}, io.EOF
 	}
-	if r.i < len(r.GetRange.Ranges)-1 {
-		r.skip = int(r.GetRange.GapBetween(r.i, r.i+1))
+	if idx < len(r.GetRange.Ranges)-1 {
+		r.skip = int(r.GetRange.GapBetween(idx, idx+1))
 	}
-	l := r.GetRange.Ranges[r.i].Length
-	h := hash.New(r.GetRange.Ranges[r.i].Hash)
-	r.i += 1
+
+	rang := r.GetRange.Ranges[idx]
+	l := rang.Length
+	h := hash.New(rang.Hash)
+
+	if strings.HasPrefix(h.String(), "eh9e0b3ou") {
+		_ = h.String() // NM4 - debugging hook for a specific chunk; remove before merge.
+	}
+
 	buf := make([]byte, l)
 	_, err := io.ReadFull(r.Reader, buf)
 	if err != nil {
 		return nbs.CompressedChunk{}, err
 	} else {
-		return nbs.NewCompressedChunk(h, buf)
+		if rang.DictionaryLength == 0 {
+			// NOMS snappy-compressed chunk.
+			return nbs.NewCompressedChunk(h, buf)
+		} else {
+			dict, err := globalDictCache.Get(r.GetRange, idx, stats, health)
+			if err != nil {
+				return nbs.CompressedChunk{}, err
+			}
+
+			return ArchiveToChunker{h: h, dictionary: dict, chunkData: buf}, nil
+		}
 	}
 }
diff --git a/go/libraries/doltcore/remotestorage/internal/ranges/ranges.go b/go/libraries/doltcore/remotestorage/internal/ranges/ranges.go
index de1600bd0a..9e3ebe1623 100644
--- a/go/libraries/doltcore/remotestorage/internal/ranges/ranges.go
+++ b/go/libraries/doltcore/remotestorage/internal/ranges/ranges.go
@@ -33,6 +33,11 @@ type GetRange struct {
 	Offset uint64
 	Length uint32
 	Region *Region
+
+	// The archive file format requires the url/dictionary offset/length to be carried through to fully resolve the chunk.
+	// This information is not used within the range calculations, as the dictionary span is independent of the chunk's own byte range.
+	DictionaryOffset uint64
+	DictionaryLength uint32
 }
 
 // A |Region| represents a continuous range of bytes within in a Url.
@@ -145,12 +150,14 @@ func (t *Tree) Len() int {
 	return t.t.Len()
 }
 
-func (t *Tree) Insert(url string, hash []byte, offset uint64, length uint32) {
+func (t *Tree) Insert(url string, hash []byte, offset uint64, length uint32, dictOffset uint64, dictLength uint32) {
 	ins := &GetRange{
-		Url:    t.intern(url),
-		Hash:   hash,
-		Offset: offset,
-		Length: length,
+		Url:              t.intern(url),
+		Hash:             hash,
+		Offset:           offset,
+		Length:           length,
+		DictionaryOffset: dictOffset,
+		DictionaryLength: dictLength,
 	}
 	t.t.ReplaceOrInsert(ins)
diff --git a/go/libraries/doltcore/remotestorage/internal/ranges/ranges_test.go b/go/libraries/doltcore/remotestorage/internal/ranges/ranges_test.go
index 9ec6b04e33..1f36ec296f 100644
--- a/go/libraries/doltcore/remotestorage/internal/ranges/ranges_test.go
+++ b/go/libraries/doltcore/remotestorage/internal/ranges/ranges_test.go
@@ -77,11 +77,11 @@ func TestTree(t *testing.T) {
 		tree := NewTree(8 * 1024)
 		// Insert 1KB ranges every 16 KB.
 		for i, j := 0, 0; i < 16; i, j = i+1, j+16*1024 {
-			tree.Insert("A", []byte{}, uint64(j), 1024)
+			tree.Insert("A", []byte{}, uint64(j), 1024, 0, 0)
 		}
 		// Insert 1KB ranges every 16 KB, offset by 8KB.
 		for i := 15*16*1024 + 8*1024; i >= 0; i -= 16 * 1024 {
-			tree.Insert("A", []byte{}, uint64(i), 1024)
+			tree.Insert("A", []byte{}, uint64(i), 1024, 0, 0)
 		}
 		assertTree(t, tree)
 	})
@@ -89,11 +89,11 @@ func TestTree(t *testing.T) {
 		tree := NewTree(8 * 1024)
 		// Insert 1KB ranges every 16 KB, offset by 8KB.
 		for i := 15*16*1024 + 8*1024; i >= 0; i -= 16 * 1024 {
-			tree.Insert("A", []byte{}, uint64(i), 1024)
+			tree.Insert("A", []byte{}, uint64(i), 1024, 0, 0)
 		}
 		// Insert 1KB ranges every 16 KB.
 		for i, j := 0, 0; i < 16; i, j = i+1, j+16*1024 {
-			tree.Insert("A", []byte{}, uint64(j), 1024)
+			tree.Insert("A", []byte{}, uint64(j), 1024, 0, 0)
 		}
 		assertTree(t, tree)
 	})
@@ -111,7 +111,7 @@ func TestTree(t *testing.T) {
 			})
 			tree := NewTree(8 * 1024)
 			for _, offset := range entries {
-				tree.Insert("A", []byte{}, offset, 1024)
+				tree.Insert("A", []byte{}, offset, 1024, 0, 0)
 			}
 			assertTree(t, tree)
 		}
@@ -126,7 +126,7 @@ func TestTree(t *testing.T) {
 		"B", "A", "9", "8",
 	}
 	for i, j := 0, 0; i < 16; i, j = i+1, j+1024 {
-		tree.Insert(files[i], []byte{}, uint64(j), 1024)
+		tree.Insert(files[i], []byte{}, uint64(j), 1024, 0, 0)
 	}
 	assert.Equal(t, 16, tree.regions.Len())
 	assert.Equal(t, 16, tree.t.Len())
@@ -134,17 +134,17 @@ func TestTree(t *testing.T) {
 	t.Run("MergeInMiddle", func(t *testing.T) {
 		tree := NewTree(8 * 1024)
 		// 1KB chunk at byte 0
-		tree.Insert("A", []byte{}, 0, 1024)
+		tree.Insert("A", []byte{}, 0, 1024, 0, 0)
 		// 1KB chunk at byte 16KB
-		tree.Insert("A", []byte{}, 16384, 1024)
+		tree.Insert("A", []byte{}, 16384, 1024, 0, 0)
 		assert.Equal(t, 2, tree.regions.Len())
 		assert.Equal(t, 2, tree.t.Len())
 		// 1KB chunk at byte 8KB
-		tree.Insert("A", []byte{}, 8192, 1024)
+		tree.Insert("A", []byte{}, 8192, 1024, 0, 0)
 		assert.Equal(t, 1, tree.regions.Len())
 		assert.Equal(t, 3, tree.t.Len())
-		tree.Insert("A", []byte{}, 4096, 1024)
-		tree.Insert("A", []byte{}, 12228, 1024)
+		tree.Insert("A", []byte{}, 4096, 1024, 0, 0)
+		tree.Insert("A", []byte{}, 12228, 1024, 0, 0)
 		assert.Equal(t, 1, tree.regions.Len())
 		assert.Equal(t, 5, tree.t.Len())
 		e, _ := tree.t.Min()
@@ -184,7 +184,7 @@ func TestTree(t *testing.T) {
 	t.Run("InsertAscending", func(t *testing.T) {
 		tree := NewTree(4 * 1024)
 		for _, e := range entries {
-			tree.Insert(e.url, []byte{e.id}, e.offset, e.length)
+			tree.Insert(e.url, []byte{e.id}, e.offset, e.length, 0, 0)
 		}
 		assertTree(t, tree)
 	})
@@ -192,7 +192,7 @@ func TestTree(t *testing.T) {
 		tree := NewTree(4 * 1024)
 		for i := len(entries) - 1; i >= 0; i-- {
 			e := entries[i]
-			tree.Insert(e.url, []byte{e.id}, e.offset, e.length)
+			tree.Insert(e.url, []byte{e.id}, e.offset, e.length, 0, 0)
 		}
 		assertTree(t, tree)
 	})
@@ -205,7 +205,7 @@ func TestTree(t *testing.T) {
 			})
 			tree := NewTree(4 * 1024)
 			for _, e := range entries {
-				tree.Insert(e.url, []byte{e.id}, e.offset, e.length)
+				tree.Insert(e.url, []byte{e.id}, e.offset, e.length, 0, 0)
 			}
 			assertTree(t, tree)
 		}
diff --git a/go/store/datas/pull/puller.go b/go/store/datas/pull/puller.go
index 63ddf25ff4..c215988689 100644
--- a/go/store/datas/pull/puller.go
+++ b/go/store/datas/pull/puller.go
@@ -28,6 +28,7 @@ import (
 	"sync/atomic"
 	"time"
 
+	"github.com/dolthub/dolt/go/libraries/doltcore/remotestorage"
 	"golang.org/x/sync/errgroup"
 
 	"github.com/dolthub/dolt/go/libraries/doltcore/dconfig"
@@ -384,8 +385,14 @@ func (p *Puller) Pull(ctx context.Context) error {
 				if err != nil {
 					return err
 				}
-			} else {
-				panic("TODO: handle ZStd-CompressedChunk") // NM4.
+			} else if _, ok := cChk.(remotestorage.ArchiveToChunker); ok {
+				// NM4 - Until we can write archives quickly, convert back to a snappy-compressed chunk.
+				cc := nbs.ChunkToCompressedChunk(chnk)
+
+				err = p.wr.AddCompressedChunk(ctx, cc)
+				if err != nil {
+					return err
+				}
 			}
 		}
 	})
diff --git a/go/store/nbs/archive.go b/go/store/nbs/archive.go
index 11594201d9..98692571fd 100644
--- a/go/store/nbs/archive.go
+++ b/go/store/nbs/archive.go
@@ -27,7 +27,7 @@
 Chunks from the Archive. ByteSpans are arbitrary offset/lengths into the file which store (1) zstd dictionary data,
 and (2) compressed chunk data.
 Each Chunk is stored as a pair of ByteSpans (dict,data). Dictionary ByteSpans can (should) be used by multiple
 Chunks, so there are more ByteSpans than Chunks. The Index is used to map Chunks to ByteSpan pairs. These pairs are
-called ChunkRefs, and were store them as [uint32,uint32] on disk. This allows us to quickly find the ByteSpans for a
+called ChunkRefs, and we store them as [uint32,uint32] on disk. This allows us to quickly find the ByteSpans for a
 given Chunk with minimal processing at load time.
 
 A Dolt Archive file follows the following format:
diff --git a/go/store/nbs/archive_build.go b/go/store/nbs/archive_build.go
index 349bf239d1..e30f61b7d7 100644
--- a/go/store/nbs/archive_build.go
+++ b/go/store/nbs/archive_build.go
@@ -57,7 +57,7 @@ func UnArchive(ctx context.Context, cs chunks.ChunkStore, smd StorageMetadata, p
 			return err
 		}
 		if exists {
-			// We have a fast path to follow because oritinal table file is still on disk.
+			// We have a fast path to follow because original table file is still on disk.
 			swapMap[arc.hash()] = orginTfId
 		} else {
 			// We don't have the original table file id, so we have to create a new one.
diff --git a/integration-tests/bats/archive.bats b/integration-tests/bats/archive.bats
index 2a8675d6b7..32100da5d3 100755
--- a/integration-tests/bats/archive.bats
+++ b/integration-tests/bats/archive.bats
@@ -213,3 +213,51 @@ mutations_and_gc_statement() {
 
     [[ "$output" =~ "151525" ]] || false # i = 1 - 550, sum is 151525
 }
+
+@test "archive: can fetch chunks from an archived repo" {
+    mkdir -p remote/.dolt
+    mkdir cloned
+
+    # Copy the archive test repo to the remote directory.
+    cp -R $BATS_TEST_DIRNAME/archive-test-repo/* remote/.dolt
+    cd remote
+
+    port=$( definePORT )
+
+    remotesrv --http-port $port --grpc-port $port --repo-mode &
+    remotesrv_pid=$!
+    [[ "$remotesrv_pid" -gt 0 ]] || false
+
+    cd ../cloned
+    dolt clone http://localhost:$port/test-org/test-repo repo1
+    # Fetch when there are no changes.
+    cd repo1
+    dolt fetch
+
+    ## Update the remote repo directly. The archive command must be run while the server is stopped.
+    ## This results in archived files on the remote, which we will need to read chunks from when we fetch.
+    cd ../../remote
+    kill $remotesrv_pid
+    wait $remotesrv_pid || :
+    remotesrv_pid=""
+    dolt sql -q "$(mutations_and_gc_statement)"
+    dolt archive
+
+    remotesrv --http-port $port --grpc-port $port --repo-mode &
+    remotesrv_pid=$!
+    [[ "$remotesrv_pid" -gt 0 ]] || false
+
+    cd ../cloned/repo1
+
+    run dolt fetch
+    [ "$status" -eq 0 ]
+
+    run dolt status
+    [ "$status" -eq 0 ]
+
+    [[ "$output" =~ "Your branch is behind 'origin/main' by 20 commits, and can be fast-forwarded" ]] || false
+
+    # Verify the repo has integrity.
+    dolt fsck
+}
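
---

Note for reviewers: DictionaryCache (chunk_fetcher.go above) is keyed by the
dictionary's physical location rather than a content address, so every
RangeChunk that carries the same (resource path, DictionaryOffset,
DictionaryLength) triple resolves to one shared *gozstd.DDict. A minimal
runnable sketch of that keying behavior, using a stand-in value type and
made-up offsets:

package main

import "fmt"

// dictKey mirrors remotestorage.DictionaryKey: a dictionary's identity is
// where it lives in the archive, not what it contains.
type dictKey struct {
	url string
	off uint64
	len uint32
}

func main() {
	// Stand-in for the map[DictionaryKey]*gozstd.DDict inside DictionaryCache.
	cache := map[dictKey]string{}

	a := dictKey{"test-org/test-repo", 4096, 512} // chunk A's dictionary ref
	b := dictKey{"test-org/test-repo", 4096, 512} // chunk B references the same span

	cache[a] = "parsed DDict"
	if v, ok := cache[b]; ok {
		fmt.Println("chunk B reuses:", v) // one fetch and parse serves both chunks
	}
}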
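
The decompression path that ArchiveToChunker.ToChunk relies on, reduced to its
gozstd essentials. This is a sketch, not the archive code: it uses a raw-content
dictionary built from made-up bytes, whereas in the archive format the
dictionary lives in its own ByteSpan, is stored zstd-compressed, and is
decompressed once (gozstd.Decompress) before gozstd.NewDDict is called, as
DictionaryCache.Get does above.

package main

import (
	"bytes"
	"fmt"

	"github.com/dolthub/gozstd"
)

func main() {
	// Any blob can act as a raw-content dictionary; real archive dictionaries
	// are trained over many chunks so that shared structure compresses away.
	dictBytes := bytes.Repeat([]byte("row: some repetitive payload | "), 32)

	cd, err := gozstd.NewCDict(dictBytes)
	if err != nil {
		panic(err)
	}
	defer cd.Release()

	dd, err := gozstd.NewDDict(dictBytes)
	if err != nil {
		panic(err)
	}
	defer dd.Release()

	chunk := []byte("row: some repetitive payload | plus a unique tail")
	compressed := gozstd.CompressDict(nil, chunk, cd)

	// The call ToChunk makes: dictionary + compressed bytes -> raw chunk bytes,
	// which are then rehashed and checked against the expected chunk address.
	raw, err := gozstd.DecompressDict(nil, compressed, dd)
	if err != nil {
		panic(err)
	}
	fmt.Println(bytes.Equal(raw, chunk)) // true
}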
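
And the ChunkRef layout described in the archive.go comment, illustrated with
stand-in types (the real nbs index structures differ): each chunk's on-disk
[uint32,uint32] pair indexes two ByteSpans, and many ChunkRefs can share the
dictionary index, which is why there are more ByteSpans than Chunks.

package main

import "fmt"

// byteSpan and chunkRef are illustrative stand-ins for the archive index
// structures; only the [uint32,uint32] pairing is taken from the format notes.
type byteSpan struct {
	offset uint64
	length uint32
}

type chunkRef struct {
	dict uint32 // ByteSpan holding the zstd dictionary (shared by many chunks)
	data uint32 // ByteSpan holding this chunk's compressed data
}

func main() {
	spans := []byteSpan{
		{offset: 0, length: 512},   // span 0: one zstd dictionary
		{offset: 512, length: 100}, // span 1: chunk A's data
		{offset: 612, length: 80},  // span 2: chunk B's data
	}
	refs := []chunkRef{
		{dict: 0, data: 1}, // chunk A
		{dict: 0, data: 2}, // chunk B: same dictionary, different data
	}
	for _, r := range refs {
		fmt.Printf("dict@%d+%d data@%d+%d\n",
			spans[r.dict].offset, spans[r.dict].length,
			spans[r.data].offset, spans[r.data].length)
	}
}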