First steel thread for archive fetch.

This commit is contained in:
Neil Macneale IV
2025-02-04 10:17:04 -08:00
parent 66f1b9f73c
commit 6c270b1d2c
8 changed files with 295 additions and 37 deletions

View File

@@ -18,9 +18,12 @@ import (
"context"
"errors"
"io"
"sync"
"sync/atomic"
"time"
"github.com/cenkalti/backoff/v4"
"github.com/dolthub/gozstd"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
@@ -68,7 +71,14 @@ const (
reliableCallDeliverRespTimeout = 15 * time.Second
)
var globalDictCache *DictionaryCache
var once sync.Once
func NewChunkFetcher(ctx context.Context, dcs *DoltChunkStore) *ChunkFetcher {
once.Do(func() {
globalDictCache = NewDictionaryCache(newDownloads(), dcs.csClient)
})
eg, ctx := errgroup.WithContext(ctx)
ret := &ChunkFetcher{
eg: eg,
@@ -368,18 +378,27 @@ func (d downloads) Add(resp *remotesapi.DownloadLoc) {
d.refreshes[path] = refresh
}
for _, r := range gr.Ranges {
d.ranges.Insert(gr.Url, r.Hash, r.Offset, r.Length)
// NM4 - Split at this point? Break the dictionary into its own request.
d.ranges.Insert(gr.Url, r.Hash[:], r.Offset, r.Length, r.DictionaryOffset, r.DictionaryLength)
// if r.DictionaryLength == 0 {
// // NM4 - maybe invert the hash, and add it to a set of..... not sure.
// d.ranges.Insert(gr.Url, r.Hash, r.DictionaryOffset, r.DictionaryLength)
// }
}
}
// NM4 - On the client side, we only request HttpRanges for raw bytes. The struct includes the dictionary offset and length,
// but those fields are only meaningful in the DownloadLocations response.
func toGetRange(rs []*ranges.GetRange) *GetRange {
ret := new(GetRange)
for _, r := range rs {
ret.Url = r.Url
ret.Ranges = append(ret.Ranges, &remotesapi.RangeChunk{
Hash: r.Hash,
Offset: r.Offset,
Length: r.Length,
Hash: r.Hash,
Offset: r.Offset,
Length: r.Length,
DictionaryOffset: r.DictionaryOffset,
DictionaryLength: r.DictionaryLength,
})
}
return ret
@@ -593,3 +612,112 @@ func fetcherDownloadURLThread(ctx context.Context, fetchReqCh chan fetchReq, don
}
}
}
///////
type DictionaryKey struct {
url string
off uint64
len uint32
}
type DictionaryCache struct {
mu sync.Mutex
cache map[DictionaryKey]*gozstd.DDict
client remotesapi.ChunkStoreServiceClient
dlds downloads
}
func NewDictionaryCache(downloads downloads, client remotesapi.ChunkStoreServiceClient) *DictionaryCache {
return &DictionaryCache{
mu: sync.Mutex{},
cache: make(map[DictionaryKey]*gozstd.DDict),
client: client,
dlds: downloads,
}
}
func (dc *DictionaryCache) Get(rang *GetRange, idx int, stats StatsRecorder, recorder reliable.HealthRecorder) (*gozstd.DDict, error) {
// Way too granular... but I'll use a real cache for production. Prototype madness.
dc.mu.Lock()
defer dc.mu.Unlock()
path := rang.ResourcePath()
off := rang.Ranges[idx].DictionaryOffset
ln := rang.Ranges[idx].DictionaryLength
key := DictionaryKey{path, off, ln}
if v, ok := dc.cache[key]; ok {
return v, nil
} else {
pathToUrl := dc.dlds.refreshes[path]
if pathToUrl == nil {
// Kinda do what Add does....
refresh := new(locationRefresh)
sRang := &remotesapi.HttpGetRange{}
sRang.Url = rang.Url
sRang.Ranges = append(sRang.Ranges, &remotesapi.RangeChunk{Offset: off, Length: ln})
rang := &remotesapi.DownloadLoc_HttpGetRange{HttpGetRange: sRang}
dl := &remotesapi.DownloadLoc{Location: rang}
refresh.Add(dl)
dc.dlds.refreshes[path] = refresh
pathToUrl = refresh
}
ctx := context.Background()
fetcher := globalHttpFetcher
urlF := func(lastError error) (string, error) {
earl, err := pathToUrl.GetURL(ctx, lastError, dc.client)
if err != nil {
return "", err
}
if earl == "" {
earl = path
}
return earl, nil
}
resp := reliable.StreamingRangeDownload(ctx, reliable.StreamingRangeRequest{
Fetcher: fetcher,
Offset: off,
Length: uint64(ln),
UrlFact: urlF,
Stats: stats,
Health: recorder,
BackOffFact: func(ctx context.Context) backoff.BackOff {
return downloadBackOff(ctx, 3) // params.DownloadRetryCount)
},
Throughput: reliable.MinimumThroughputCheck{
CheckInterval: defaultRequestParams.ThroughputMinimumCheckInterval,
BytesPerCheck: defaultRequestParams.ThroughputMinimumBytesPerCheck,
NumIntervals: defaultRequestParams.ThroughputMinimumNumIntervals,
},
RespHeadersTimeout: defaultRequestParams.RespHeadersTimeout,
})
defer resp.Close()
buf := make([]byte, ln)
_, err := io.ReadFull(resp.Body, buf)
if err != nil {
return nil, err
}
rawDict, err := gozstd.Decompress(nil, buf)
if err != nil {
return nil, err
}
dict, err := gozstd.NewDDict(rawDict)
if err != nil {
return nil, err
}
dc.cache[key] = dict
return dict, nil
}
}
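
For orientation, the decode path that DictionaryCache.Get and ArchiveToChunker.ToChunk implement between them boils down to: inflate the fetched dictionary span with plain zstd, build a reusable DDict from it, then decompress the chunk payload against that dictionary. A minimal standalone sketch of that flow (the helper name and package are illustrative, not part of this change):

package archivefetch_example

import (
	"github.com/dolthub/gozstd"
)

// decodeArchivedChunk is an illustrative helper, not production code: it shows
// the two-step decode used above without the caching, retries, or range
// bookkeeping that DictionaryCache.Get wraps around it.
func decodeArchivedChunk(compressedDict, compressedChunk []byte) ([]byte, error) {
	// The dictionary bytes stored in the archive are themselves zstd compressed.
	rawDict, err := gozstd.Decompress(nil, compressedDict)
	if err != nil {
		return nil, err
	}
	// Build a decompression dictionary; the real code caches this per
	// (url, offset, length) key so it can be reused across chunks.
	ddict, err := gozstd.NewDDict(rawDict)
	if err != nil {
		return nil, err
	}
	// Decompress the chunk payload against the shared dictionary.
	return gozstd.DecompressDict(nil, compressedChunk, ddict)
}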

View File

@@ -32,6 +32,7 @@ import (
"time"
"github.com/cenkalti/backoff/v4"
"github.com/dolthub/gozstd"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
@@ -371,6 +372,7 @@ func (dcs *DoltChunkStore) GetManyCompressed(ctx context.Context, hashes hash.Ha
return nil
}
// NM4 - Extending the protobuf isn't really necessary. Possibly split this out into a new struct.
type GetRange remotesapi.HttpGetRange
func (gr *GetRange) ResourcePath() string {
@@ -436,6 +438,7 @@ func (gr *GetRange) GetDownloadFunc(ctx context.Context, stats StatsRecorder, he
if len(gr.Ranges) == 0 {
return func() error { return nil }
}
return func() error {
urlF := func(lastError error) (string, error) {
url, err := pathToUrl(ctx, lastError, gr.ResourcePath())
@@ -466,9 +469,9 @@ func (gr *GetRange) GetDownloadFunc(ctx context.Context, stats StatsRecorder, he
RespHeadersTimeout: params.RespHeadersTimeout,
})
defer resp.Close()
reader := &RangeChunkReader{GetRange: gr, Reader: resp.Body}
reader := &RangeChunkReader{Path: gr.ResourcePath(), GetRange: gr, Reader: resp.Body}
for {
cc, err := reader.ReadChunk()
cc, err := reader.ReadChunk(stats, health)
if errors.Is(err, io.EOF) {
return nil
}
@@ -484,14 +487,59 @@ func (gr *GetRange) GetDownloadFunc(ctx context.Context, stats StatsRecorder, he
}
}
type ArchiveToChunker struct {
h hash.Hash
dictionary *gozstd.DDict
chunkData []byte
}
func (a ArchiveToChunker) Hash() hash.Hash {
return a.h
}
func (a ArchiveToChunker) ToChunk() (chunks.Chunk, error) {
dict := a.dictionary
data := a.chunkData
rawChunk, err := gozstd.DecompressDict(nil, data, dict)
// NM4 - calculate chunk addr for safety while testing.
newChunk := chunks.NewChunk(rawChunk)
if newChunk.Hash() != a.h {
panic("Hash Mismatch!!")
}
return newChunk, err
}
func (a ArchiveToChunker) FullCompressedChunkLen() uint32 {
//TODO Not sure what the right impl for this is.... NM4.
return uint32(len(a.chunkData)) // + dictionary???
}
func (a ArchiveToChunker) IsEmpty() bool {
//TODO implement me
return len(a.chunkData) == 0
}
func (a ArchiveToChunker) IsGhost() bool {
//TODO implement me
// NM4 - yes, need to. Or maybe not????
return false
}
var _ nbs.ToChunker = (*ArchiveToChunker)(nil)
type RangeChunkReader struct {
Path string
GetRange *GetRange
Reader io.Reader
i int
skip int
}
func (r *RangeChunkReader) ReadChunk() (nbs.CompressedChunk, error) {
// NM4 - This is the place where we need to intercept responses and conjure the "full" chunk.
func (r *RangeChunkReader) ReadChunk(stats StatsRecorder, health reliable.HealthRecorder) (nbs.ToChunker, error) {
if r.skip > 0 {
_, err := io.CopyN(io.Discard, r.Reader, int64(r.skip))
if err != nil {
@@ -499,21 +547,41 @@ func (r *RangeChunkReader) ReadChunk() (nbs.CompressedChunk, error) {
}
r.skip = 0
}
if r.i >= len(r.GetRange.Ranges) {
idx := r.i
r.i += 1
if idx >= len(r.GetRange.Ranges) {
return nbs.CompressedChunk{}, io.EOF
}
if r.i < len(r.GetRange.Ranges)-1 {
r.skip = int(r.GetRange.GapBetween(r.i, r.i+1))
if idx < len(r.GetRange.Ranges)-1 {
r.skip = int(r.GetRange.GapBetween(idx, idx+1))
}
l := r.GetRange.Ranges[r.i].Length
h := hash.New(r.GetRange.Ranges[r.i].Hash)
r.i += 1
rang := r.GetRange.Ranges[idx]
l := rang.Length
h := hash.New(rang.Hash)
if strings.HasPrefix(h.String(), "eh9e0b3ou") {
_ = h.String()
}
buf := make([]byte, l)
_, err := io.ReadFull(r.Reader, buf)
if err != nil {
return nbs.CompressedChunk{}, err
} else {
return nbs.NewCompressedChunk(h, buf)
if rang.DictionaryLength == 0 {
// NOMS snappy compressed chunk.
return nbs.NewCompressedChunk(h, buf)
} else {
dict, err := globalDictCache.Get(r.GetRange, idx, stats, health)
if err != nil {
return nbs.CompressedChunk{}, err
}
return ArchiveToChunker{h: h, dictionary: dict, chunkData: buf}, nil
}
}
}
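
ReadChunk now returns the nbs.ToChunker interface rather than a concrete nbs.CompressedChunk, so callers can treat snappy chunks and archive (zstd-with-dictionary) chunks uniformly. A rough sketch of how a consumer might materialize either variant, mirroring the address check in ArchiveToChunker.ToChunk (the helper name and error text are hypothetical; the real consumer in this change is the Puller):

package archivefetch_example

import (
	"fmt"

	"github.com/dolthub/dolt/go/store/chunks"
	"github.com/dolthub/dolt/go/store/nbs"
)

// materializeChunk converts any nbs.ToChunker returned by ReadChunk back into
// a raw chunk and double-checks its address. Illustrative only.
func materializeChunk(tc nbs.ToChunker) (chunks.Chunk, error) {
	chk, err := tc.ToChunk()
	if err != nil {
		return chunks.Chunk{}, err
	}
	if chk.Hash() != tc.Hash() {
		return chunks.Chunk{}, fmt.Errorf("chunk address mismatch for %s", tc.Hash().String())
	}
	return chk, nil
}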

View File

@@ -33,6 +33,11 @@ type GetRange struct {
Offset uint64
Length uint32
Region *Region
// The archive file format requires the url/dictionary offset/length to be carried through to fully resolve the chunk.
// This information is not used within the range calculations at all, as the range is not related to the chunk content.
DictionaryOffset uint64
DictionaryLength uint32
}
// A |Region| represents a continuous range of bytes within a Url.
@@ -145,12 +150,14 @@ func (t *Tree) Len() int {
return t.t.Len()
}
func (t *Tree) Insert(url string, hash []byte, offset uint64, length uint32) {
func (t *Tree) Insert(url string, hash []byte, offset uint64, length uint32, dictOffset uint64, dictLength uint32) {
ins := &GetRange{
Url: t.intern(url),
Hash: hash,
Offset: offset,
Length: length,
Url: t.intern(url),
Hash: hash,
Offset: offset,
Length: length,
DictionaryOffset: dictOffset,
DictionaryLength: dictLength,
}
t.t.ReplaceOrInsert(ins)

View File

@@ -77,11 +77,11 @@ func TestTree(t *testing.T) {
tree := NewTree(8 * 1024)
// Insert 1KB ranges every 16 KB.
for i, j := 0, 0; i < 16; i, j = i+1, j+16*1024 {
tree.Insert("A", []byte{}, uint64(j), 1024)
tree.Insert("A", []byte{}, uint64(j), 1024, 0, 0)
}
// Insert 1KB ranges every 16 KB, offset by 8KB.
for i := 15*16*1024 + 8*1024; i >= 0; i -= 16 * 1024 {
tree.Insert("A", []byte{}, uint64(i), 1024)
tree.Insert("A", []byte{}, uint64(i), 1024, 0, 0)
}
assertTree(t, tree)
})
@@ -89,11 +89,11 @@ func TestTree(t *testing.T) {
tree := NewTree(8 * 1024)
// Insert 1KB ranges every 16 KB, offset by 8KB.
for i := 15*16*1024 + 8*1024; i >= 0; i -= 16 * 1024 {
tree.Insert("A", []byte{}, uint64(i), 1024)
tree.Insert("A", []byte{}, uint64(i), 1024, 0, 0)
}
// Insert 1KB ranges every 16 KB.
for i, j := 0, 0; i < 16; i, j = i+1, j+16*1024 {
tree.Insert("A", []byte{}, uint64(j), 1024)
tree.Insert("A", []byte{}, uint64(j), 1024, 0, 0)
}
assertTree(t, tree)
})
@@ -111,7 +111,7 @@ func TestTree(t *testing.T) {
})
tree := NewTree(8 * 1024)
for _, offset := range entries {
tree.Insert("A", []byte{}, offset, 1024)
tree.Insert("A", []byte{}, offset, 1024, 0, 0)
}
assertTree(t, tree)
}
@@ -126,7 +126,7 @@ func TestTree(t *testing.T) {
"B", "A", "9", "8",
}
for i, j := 0, 0; i < 16; i, j = i+1, j+1024 {
tree.Insert(files[i], []byte{}, uint64(j), 1024)
tree.Insert(files[i], []byte{}, uint64(j), 1024, 0, 0)
}
assert.Equal(t, 16, tree.regions.Len())
assert.Equal(t, 16, tree.t.Len())
@@ -134,17 +134,17 @@ func TestTree(t *testing.T) {
t.Run("MergeInMiddle", func(t *testing.T) {
tree := NewTree(8 * 1024)
// 1KB chunk at byte 0
tree.Insert("A", []byte{}, 0, 1024)
tree.Insert("A", []byte{}, 0, 1024, 0, 0)
// 1KB chunk at byte 16KB
tree.Insert("A", []byte{}, 16384, 1024)
tree.Insert("A", []byte{}, 16384, 1024, 0, 0)
assert.Equal(t, 2, tree.regions.Len())
assert.Equal(t, 2, tree.t.Len())
// 1KB chunk at byte 8KB
tree.Insert("A", []byte{}, 8192, 1024)
tree.Insert("A", []byte{}, 8192, 1024, 0, 0)
assert.Equal(t, 1, tree.regions.Len())
assert.Equal(t, 3, tree.t.Len())
tree.Insert("A", []byte{}, 4096, 1024)
tree.Insert("A", []byte{}, 12228, 1024)
tree.Insert("A", []byte{}, 4096, 1024, 0, 0)
tree.Insert("A", []byte{}, 12228, 1024, 0, 0)
assert.Equal(t, 1, tree.regions.Len())
assert.Equal(t, 5, tree.t.Len())
e, _ := tree.t.Min()
@@ -184,7 +184,7 @@ func TestTree(t *testing.T) {
t.Run("InsertAscending", func(t *testing.T) {
tree := NewTree(4 * 1024)
for _, e := range entries {
tree.Insert(e.url, []byte{e.id}, e.offset, e.length)
tree.Insert(e.url, []byte{e.id}, e.offset, e.length, 0, 0)
}
assertTree(t, tree)
})
@@ -192,7 +192,7 @@ func TestTree(t *testing.T) {
tree := NewTree(4 * 1024)
for i := len(entries) - 1; i >= 0; i-- {
e := entries[i]
tree.Insert(e.url, []byte{e.id}, e.offset, e.length)
tree.Insert(e.url, []byte{e.id}, e.offset, e.length, 0, 0)
}
assertTree(t, tree)
})
@@ -205,7 +205,7 @@ func TestTree(t *testing.T) {
})
tree := NewTree(4 * 1024)
for _, e := range entries {
tree.Insert(e.url, []byte{e.id}, e.offset, e.length)
tree.Insert(e.url, []byte{e.id}, e.offset, e.length, 0, 0)
}
assertTree(t, tree)
}

View File

@@ -28,6 +28,7 @@ import (
"sync/atomic"
"time"
"github.com/dolthub/dolt/go/libraries/doltcore/remotestorage"
"golang.org/x/sync/errgroup"
"github.com/dolthub/dolt/go/libraries/doltcore/dconfig"
@@ -384,8 +385,14 @@ func (p *Puller) Pull(ctx context.Context) error {
if err != nil {
return err
}
} else {
panic("TODO: handle ZStd-CompressedChunk") // NM4.
} else if _, ok := cChk.(remotestorage.ArchiveToChunker); ok {
// NM4 - Until we can write quickly to archives.....
cc := nbs.ChunkToCompressedChunk(chnk)
err = p.wr.AddCompressedChunk(ctx, cc)
if err != nil {
return err
}
}
}
})

View File

@@ -27,7 +27,7 @@ Chunks from the Archive.
ByteSpans are arbitrary offset/lengths into the file which store (1) zstd dictionary data, and (2) compressed chunk
data. Each Chunk is stored as a pair of ByteSpans (dict,data). Dictionary ByteSpans can (should) be used by multiple
Chunks, so there are more ByteSpans than Chunks. The Index is used to map Chunks to ByteSpan pairs. These pairs are
called ChunkRefs, and were store them as [uint32,uint32] on disk. This allows us to quickly find the ByteSpans for a
called ChunkRefs, and we store them as [uint32,uint32] on disk. This allows us to quickly find the ByteSpans for a
given Chunk with minimal processing at load time.
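
To make the ChunkRef wording concrete, a rough sketch of the relationship described above (type names and field widths are illustrative, not the on-disk encoding):

package archivefetch_example

// byteSpan is an arbitrary offset/length into the archive file, holding either
// zstd dictionary bytes or compressed chunk bytes.
type byteSpan struct {
	offset uint64
	length uint32
}

// chunkRef is the [uint32,uint32] pair kept in the Index for each Chunk: two
// indices into the ByteSpan table, one for the (shared) dictionary span and
// one for the chunk's data span. Many chunkRefs can reference the same
// dictionary span, which is why there are more ByteSpans than Chunks.
type chunkRef struct {
	dict uint32
	data uint32
}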
A Dolt Archive file follows the following format:

View File

@@ -57,7 +57,7 @@ func UnArchive(ctx context.Context, cs chunks.ChunkStore, smd StorageMetadata, p
return err
}
if exists {
// We have a fast path to follow because oritinal table file is still on disk.
// We have a fast path to follow because original table file is still on disk.
swapMap[arc.hash()] = orginTfId
} else {
// We don't have the original table file id, so we have to create a new one.

View File

@@ -213,3 +213,51 @@ mutations_and_gc_statement() {
[[ "$output" =~ "151525" ]] || false # i = 1 - 550, sum is 151525
}
@test "archive: can fetch chunks from an archived repo" {
mkdir -p remote/.dolt
mkdir cloned
# Copy the archive test repo to remote directory
cp -R $BATS_TEST_DIRNAME/archive-test-repo/* remote/.dolt
cd remote
port=$( definePORT )
remotesrv --http-port $port --grpc-port $port --repo-mode &
remotesrv_pid=$!
[[ "$remotesrv_pid" -gt 0 ]] || false
cd ../cloned
dolt clone http://localhost:$port/test-org/test-repo repo1
# Fetch when there are no changes.
cd repo1
dolt fetch
## Update the remote repo directly. We need to run the archive command while the server is stopped.
## This will result in archived files on the remote, which we will need to read chunks from when we fetch.
cd ../../remote
kill $remotesrv_pid
wait $remotesrv_pid || :
remotesrv_pid=""
dolt sql -q "$(mutations_and_gc_statement)"
dolt archive
remotesrv --http-port $port --grpc-port $port --repo-mode &
remotesrv_pid=$!
[[ "$remotesrv_pid" -gt 0 ]] || false
cd ../cloned/repo1
run dolt fetch
[ "$status" -eq 0 ]
run dolt status
[ "$status" -eq 0 ]
[[ "$output" =~ "Your branch is behind 'origin/main' by 20 commits, and can be fast-forwarded" ]] || false
# Verify the repo has integrity.
dolt fsck
}