From 78d451b8837d2fbabf43519b70a74ddadfa23645 Mon Sep 17 00:00:00 2001 From: Aaron Son Date: Thu, 13 Feb 2025 09:53:42 -0800 Subject: [PATCH] go: remotestorage: chunk_fetcher: Ranges carry the Dict{Offset,Len}, instead of a GetDict callback. --- .../doltcore/remotestorage/chunk_fetcher.go | 68 +++++++++---------- .../doltcore/remotestorage/chunk_store.go | 9 +-- .../remotestorage/internal/ranges/ranges.go | 26 +++---- .../internal/ranges/ranges_test.go | 28 ++++---- 4 files changed, 67 insertions(+), 64 deletions(-) diff --git a/go/libraries/doltcore/remotestorage/chunk_fetcher.go b/go/libraries/doltcore/remotestorage/chunk_fetcher.go index 870f171a0d..e9f7b3ad1d 100644 --- a/go/libraries/doltcore/remotestorage/chunk_fetcher.go +++ b/go/libraries/doltcore/remotestorage/chunk_fetcher.go @@ -349,7 +349,7 @@ type fetchResp struct { } type fetchReq struct { - respCh chan fetchResp + respCh chan fetchResp } // A simple structure to keep track of *GetRange requests along with @@ -363,11 +363,11 @@ type downloads struct { // (or concurrently with) any chunkRanges, since the chunk // range fetches will block the fetching thread on the // population of the dictionary cache entry. - dictRanges *ranges.Tree + dictRanges *ranges.Tree // Holds all pending and fetched dictionaries for any chunk // ranges that have gone into |chunkRanges|. - dictCache *dictionaryCache - refreshes map[string]*locationRefresh + dictCache *dictionaryCache + refreshes map[string]*locationRefresh } func newDownloads() downloads { @@ -390,15 +390,13 @@ func (d downloads) Add(resp *remotesapi.DownloadLoc) { d.refreshes[path] = refresh } for _, r := range hgr.Ranges { - var getDict func() (any, error) if r.DictionaryLength != 0 { - var first bool - getDict, first = d.dictCache.get(path, r.DictionaryOffset, r.DictionaryLength) - if first { - d.dictRanges.Insert(hgr.Url, nil, r.DictionaryOffset, r.DictionaryLength, nil) + _, has := d.dictCache.getOrCreate(path, r.DictionaryOffset, r.DictionaryLength) + if !has { + d.dictRanges.Insert(hgr.Url, nil, r.DictionaryOffset, r.DictionaryLength, 0, 0) } } - d.chunkRanges.Insert(hgr.Url, r.Hash[:], r.Offset, r.Length, getDict) + d.chunkRanges.Insert(hgr.Url, r.Hash[:], r.Offset, r.Length, r.DictionaryOffset, r.DictionaryLength) } } @@ -407,10 +405,11 @@ func toGetRange(rs []*ranges.GetRange) *GetRange { for _, r := range rs { ret.Url = r.Url ret.Ranges = append(ret.Ranges, &Range{ - Hash: r.Hash, - Offset: r.Offset, - Length: r.Length, - GetDict: r.GetDict, + Hash: r.Hash, + Offset: r.Offset, + Length: r.Length, + DictOffset: r.DictOffset, + DictLength: r.DictLength, }) } return ret @@ -575,12 +574,13 @@ func fetcherDownloadURLThreads(ctx context.Context, fetchReqCh chan fetchReq, do return nil } -func deliverChunkCallback(chunkCh chan nbs.ToChunker) func(context.Context, []byte, *Range) error { +func deliverChunkCallback(chunkCh chan nbs.ToChunker, path string, dictCache *dictionaryCache) func(context.Context, []byte, *Range) error { return func(ctx context.Context, bs []byte, rang *Range) error { h := hash.New(rang.Hash[:]) var cc nbs.ToChunker - if rang.GetDict != nil { - dictRes, err := rang.GetDict() + if rang.DictLength != 0 { + payload, _ := dictCache.getOrCreate(path, rang.DictOffset, rang.DictLength) + dictRes, err := payload.Get() if err != nil { return err } @@ -608,7 +608,8 @@ func setDictionaryCallback(dictCache *dictionaryCache, path string) func(context if err == nil { ddict, err = gozstd.NewDDict(decompressed) } - dictCache.set(path, rang.Offset, rang.Length, ddict, err) + payload, _ := dictCache.getOrCreate(path, rang.Offset, rang.Length) + payload.Set(ddict, err) // XXX: For now, we fail here on any error, instead of when we try to use the dictionary... // For now, the record in the cache will be terminally failed and is never removed. return err @@ -630,7 +631,7 @@ func fetcherDownloadURLThread(ctx context.Context, fetchReqCh chan fetchReq, don case fetchResp := <-respCh: var cb func(context.Context, []byte, *Range) error if fetchResp.rangeType == rangeType_Chunk { - cb = deliverChunkCallback(chunkCh) + cb = deliverChunkCallback(chunkCh, fetchResp.path, fetchResp.dictCache) } else { cb = setDictionaryCallback(fetchResp.dictCache, fetchResp.path) } @@ -653,7 +654,7 @@ func fetcherDownloadURLThread(ctx context.Context, fetchReqCh chan fetchReq, don // fetching those contents. Every |GetRange| that has a dictionary // dependency gets the record out of the dictionary cache. The first // time the cache entry is created, the download thread also schedules -// the dictionary itself to be fetched and populated through |set|. +// the dictionary itself to be fetched and populated through |Set|. type dictionaryCache struct { dictionaries sync.Map } @@ -673,21 +674,20 @@ type DictionaryPayload struct { err error } -func (dc *dictionaryCache) get(path string, offset uint64, length uint32) (func() (any, error), bool) { +func (p *DictionaryPayload) Get() (any, error) { + <-p.done + return p.res, p.err +} + +func (p *DictionaryPayload) Set(res any, err error) { + p.res = res + p.err = err + close(p.done) +} + +func (dc *dictionaryCache) getOrCreate(path string, offset uint64, length uint32) (*DictionaryPayload, bool) { key := DictionaryKey{path, offset, length} entry, loaded := dc.dictionaries.LoadOrStore(key, &DictionaryPayload{done: make(chan struct{})}) payload := entry.(*DictionaryPayload) - return func() (any, error) { - <-payload.done - return payload.res, payload.err - }, !loaded -} - -func (dc *dictionaryCache) set(path string, offset uint64, length uint32, res any, err error) { - key := DictionaryKey{path, offset, length} - entry, _ := dc.dictionaries.LoadOrStore(key, &DictionaryPayload{done: make(chan struct{})}) - payload := entry.(*DictionaryPayload) - payload.res = res - payload.err = err - close(payload.done) + return payload, loaded } diff --git a/go/libraries/doltcore/remotestorage/chunk_store.go b/go/libraries/doltcore/remotestorage/chunk_store.go index 1fd3d83003..cd89448dc4 100644 --- a/go/libraries/doltcore/remotestorage/chunk_store.go +++ b/go/libraries/doltcore/remotestorage/chunk_store.go @@ -379,10 +379,11 @@ type GetRange struct { } type Range struct { - Hash []byte - Offset uint64 - Length uint32 - GetDict func() (any, error) + Hash []byte + Offset uint64 + Length uint32 + DictOffset uint64 + DictLength uint32 } func ResourcePath(urlS string) string { diff --git a/go/libraries/doltcore/remotestorage/internal/ranges/ranges.go b/go/libraries/doltcore/remotestorage/internal/ranges/ranges.go index 404ad7db02..924756350e 100644 --- a/go/libraries/doltcore/remotestorage/internal/ranges/ranges.go +++ b/go/libraries/doltcore/remotestorage/internal/ranges/ranges.go @@ -35,12 +35,13 @@ import ( // some state which allows them to fetch those dictionaries from a shared // chache when they need them. That is their GetDict callback. type GetRange struct { - Url string - Hash []byte - Offset uint64 - Length uint32 - GetDict func() (any, error) - Region *Region + Url string + Hash []byte + Offset uint64 + Length uint32 + DictOffset uint64 + DictLength uint32 + Region *Region } // A |Region| represents a continuous range of bytes within in a Url. @@ -153,13 +154,14 @@ func (t *Tree) Len() int { return t.t.Len() } -func (t *Tree) Insert(url string, hash []byte, offset uint64, length uint32, getDict func() (any, error)) { +func (t *Tree) Insert(url string, hash []byte, offset uint64, length uint32, dictOffset uint64, dictLength uint32) { ins := &GetRange{ - Url: t.intern(url), - Hash: hash, - Offset: offset, - Length: length, - GetDict: getDict, + Url: t.intern(url), + Hash: hash, + Offset: offset, + Length: length, + DictOffset: dictOffset, + DictLength: dictLength, } t.t.ReplaceOrInsert(ins) diff --git a/go/libraries/doltcore/remotestorage/internal/ranges/ranges_test.go b/go/libraries/doltcore/remotestorage/internal/ranges/ranges_test.go index ea9ae352ee..1f36ec296f 100644 --- a/go/libraries/doltcore/remotestorage/internal/ranges/ranges_test.go +++ b/go/libraries/doltcore/remotestorage/internal/ranges/ranges_test.go @@ -77,11 +77,11 @@ func TestTree(t *testing.T) { tree := NewTree(8 * 1024) // Insert 1KB ranges every 16 KB. for i, j := 0, 0; i < 16; i, j = i+1, j+16*1024 { - tree.Insert("A", []byte{}, uint64(j), 1024, nil) + tree.Insert("A", []byte{}, uint64(j), 1024, 0, 0) } // Insert 1KB ranges every 16 KB, offset by 8KB. for i := 15*16*1024 + 8*1024; i >= 0; i -= 16 * 1024 { - tree.Insert("A", []byte{}, uint64(i), 1024, nil) + tree.Insert("A", []byte{}, uint64(i), 1024, 0, 0) } assertTree(t, tree) }) @@ -89,11 +89,11 @@ func TestTree(t *testing.T) { tree := NewTree(8 * 1024) // Insert 1KB ranges every 16 KB, offset by 8KB. for i := 15*16*1024 + 8*1024; i >= 0; i -= 16 * 1024 { - tree.Insert("A", []byte{}, uint64(i), 1024, nil) + tree.Insert("A", []byte{}, uint64(i), 1024, 0, 0) } // Insert 1KB ranges every 16 KB. for i, j := 0, 0; i < 16; i, j = i+1, j+16*1024 { - tree.Insert("A", []byte{}, uint64(j), 1024, nil) + tree.Insert("A", []byte{}, uint64(j), 1024, 0, 0) } assertTree(t, tree) }) @@ -111,7 +111,7 @@ func TestTree(t *testing.T) { }) tree := NewTree(8 * 1024) for _, offset := range entries { - tree.Insert("A", []byte{}, offset, 1024, nil) + tree.Insert("A", []byte{}, offset, 1024, 0, 0) } assertTree(t, tree) } @@ -126,7 +126,7 @@ func TestTree(t *testing.T) { "B", "A", "9", "8", } for i, j := 0, 0; i < 16; i, j = i+1, j+1024 { - tree.Insert(files[i], []byte{}, uint64(j), 1024, nil) + tree.Insert(files[i], []byte{}, uint64(j), 1024, 0, 0) } assert.Equal(t, 16, tree.regions.Len()) assert.Equal(t, 16, tree.t.Len()) @@ -134,17 +134,17 @@ func TestTree(t *testing.T) { t.Run("MergeInMiddle", func(t *testing.T) { tree := NewTree(8 * 1024) // 1KB chunk at byte 0 - tree.Insert("A", []byte{}, 0, 1024, nil) + tree.Insert("A", []byte{}, 0, 1024, 0, 0) // 1KB chunk at byte 16KB - tree.Insert("A", []byte{}, 16384, 1024, nil) + tree.Insert("A", []byte{}, 16384, 1024, 0, 0) assert.Equal(t, 2, tree.regions.Len()) assert.Equal(t, 2, tree.t.Len()) // 1KB chunk at byte 8KB - tree.Insert("A", []byte{}, 8192, 1024, nil) + tree.Insert("A", []byte{}, 8192, 1024, 0, 0) assert.Equal(t, 1, tree.regions.Len()) assert.Equal(t, 3, tree.t.Len()) - tree.Insert("A", []byte{}, 4096, 1024, nil) - tree.Insert("A", []byte{}, 12228, 1024, nil) + tree.Insert("A", []byte{}, 4096, 1024, 0, 0) + tree.Insert("A", []byte{}, 12228, 1024, 0, 0) assert.Equal(t, 1, tree.regions.Len()) assert.Equal(t, 5, tree.t.Len()) e, _ := tree.t.Min() @@ -184,7 +184,7 @@ func TestTree(t *testing.T) { t.Run("InsertAscending", func(t *testing.T) { tree := NewTree(4 * 1024) for _, e := range entries { - tree.Insert(e.url, []byte{e.id}, e.offset, e.length, nil) + tree.Insert(e.url, []byte{e.id}, e.offset, e.length, 0, 0) } assertTree(t, tree) }) @@ -192,7 +192,7 @@ func TestTree(t *testing.T) { tree := NewTree(4 * 1024) for i := len(entries) - 1; i >= 0; i-- { e := entries[i] - tree.Insert(e.url, []byte{e.id}, e.offset, e.length, nil) + tree.Insert(e.url, []byte{e.id}, e.offset, e.length, 0, 0) } assertTree(t, tree) }) @@ -205,7 +205,7 @@ func TestTree(t *testing.T) { }) tree := NewTree(4 * 1024) for _, e := range entries { - tree.Insert(e.url, []byte{e.id}, e.offset, e.length, nil) + tree.Insert(e.url, []byte{e.id}, e.offset, e.length, 0, 0) } assertTree(t, tree) }