go: remotestorage: chunk_fetcher: Ranges carry the Dict{Offset,Len}, instead of a GetDict callback.

This commit is contained in:
Aaron Son
2025-02-13 09:53:42 -08:00
parent 52fab61a3d
commit 78d451b883
4 changed files with 67 additions and 64 deletions

View File

@@ -349,7 +349,7 @@ type fetchResp struct {
}
type fetchReq struct {
respCh chan fetchResp
respCh chan fetchResp
}
// A simple structure to keep track of *GetRange requests along with
@@ -363,11 +363,11 @@ type downloads struct {
// (or concurrently with) any chunkRanges, since the chunk
// range fetches will block the fetching thread on the
// population of the dictionary cache entry.
dictRanges *ranges.Tree
dictRanges *ranges.Tree
// Holds all pending and fetched dictionaries for any chunk
// ranges that have gone into |chunkRanges|.
dictCache *dictionaryCache
refreshes map[string]*locationRefresh
dictCache *dictionaryCache
refreshes map[string]*locationRefresh
}
func newDownloads() downloads {
@@ -390,15 +390,13 @@ func (d downloads) Add(resp *remotesapi.DownloadLoc) {
d.refreshes[path] = refresh
}
for _, r := range hgr.Ranges {
var getDict func() (any, error)
if r.DictionaryLength != 0 {
var first bool
getDict, first = d.dictCache.get(path, r.DictionaryOffset, r.DictionaryLength)
if first {
d.dictRanges.Insert(hgr.Url, nil, r.DictionaryOffset, r.DictionaryLength, nil)
_, has := d.dictCache.getOrCreate(path, r.DictionaryOffset, r.DictionaryLength)
if !has {
d.dictRanges.Insert(hgr.Url, nil, r.DictionaryOffset, r.DictionaryLength, 0, 0)
}
}
d.chunkRanges.Insert(hgr.Url, r.Hash[:], r.Offset, r.Length, getDict)
d.chunkRanges.Insert(hgr.Url, r.Hash[:], r.Offset, r.Length, r.DictionaryOffset, r.DictionaryLength)
}
}
@@ -407,10 +405,11 @@ func toGetRange(rs []*ranges.GetRange) *GetRange {
for _, r := range rs {
ret.Url = r.Url
ret.Ranges = append(ret.Ranges, &Range{
Hash: r.Hash,
Offset: r.Offset,
Length: r.Length,
GetDict: r.GetDict,
Hash: r.Hash,
Offset: r.Offset,
Length: r.Length,
DictOffset: r.DictOffset,
DictLength: r.DictLength,
})
}
return ret
@@ -575,12 +574,13 @@ func fetcherDownloadURLThreads(ctx context.Context, fetchReqCh chan fetchReq, do
return nil
}
func deliverChunkCallback(chunkCh chan nbs.ToChunker) func(context.Context, []byte, *Range) error {
func deliverChunkCallback(chunkCh chan nbs.ToChunker, path string, dictCache *dictionaryCache) func(context.Context, []byte, *Range) error {
return func(ctx context.Context, bs []byte, rang *Range) error {
h := hash.New(rang.Hash[:])
var cc nbs.ToChunker
if rang.GetDict != nil {
dictRes, err := rang.GetDict()
if rang.DictLength != 0 {
payload, _ := dictCache.getOrCreate(path, rang.DictOffset, rang.DictLength)
dictRes, err := payload.Get()
if err != nil {
return err
}
@@ -608,7 +608,8 @@ func setDictionaryCallback(dictCache *dictionaryCache, path string) func(context
if err == nil {
ddict, err = gozstd.NewDDict(decompressed)
}
dictCache.set(path, rang.Offset, rang.Length, ddict, err)
payload, _ := dictCache.getOrCreate(path, rang.Offset, rang.Length)
payload.Set(ddict, err)
// XXX: For now, we fail here on any error, instead of when we try to use the dictionary...
// For now, the record in the cache will be terminally failed and is never removed.
return err
@@ -630,7 +631,7 @@ func fetcherDownloadURLThread(ctx context.Context, fetchReqCh chan fetchReq, don
case fetchResp := <-respCh:
var cb func(context.Context, []byte, *Range) error
if fetchResp.rangeType == rangeType_Chunk {
cb = deliverChunkCallback(chunkCh)
cb = deliverChunkCallback(chunkCh, fetchResp.path, fetchResp.dictCache)
} else {
cb = setDictionaryCallback(fetchResp.dictCache, fetchResp.path)
}
@@ -653,7 +654,7 @@ func fetcherDownloadURLThread(ctx context.Context, fetchReqCh chan fetchReq, don
// fetching those contents. Every |GetRange| that has a dictionary
// dependency gets the record out of the dictionary cache. The first
// time the cache entry is created, the download thread also schedules
// the dictionary itself to be fetched and populated through |set|.
// the dictionary itself to be fetched and populated through |Set|.
type dictionaryCache struct {
dictionaries sync.Map
}
@@ -673,21 +674,20 @@ type DictionaryPayload struct {
err error
}
func (dc *dictionaryCache) get(path string, offset uint64, length uint32) (func() (any, error), bool) {
func (p *DictionaryPayload) Get() (any, error) {
<-p.done
return p.res, p.err
}
func (p *DictionaryPayload) Set(res any, err error) {
p.res = res
p.err = err
close(p.done)
}
func (dc *dictionaryCache) getOrCreate(path string, offset uint64, length uint32) (*DictionaryPayload, bool) {
key := DictionaryKey{path, offset, length}
entry, loaded := dc.dictionaries.LoadOrStore(key, &DictionaryPayload{done: make(chan struct{})})
payload := entry.(*DictionaryPayload)
return func() (any, error) {
<-payload.done
return payload.res, payload.err
}, !loaded
}
func (dc *dictionaryCache) set(path string, offset uint64, length uint32, res any, err error) {
key := DictionaryKey{path, offset, length}
entry, _ := dc.dictionaries.LoadOrStore(key, &DictionaryPayload{done: make(chan struct{})})
payload := entry.(*DictionaryPayload)
payload.res = res
payload.err = err
close(payload.done)
return payload, loaded
}

View File

@@ -379,10 +379,11 @@ type GetRange struct {
}
type Range struct {
Hash []byte
Offset uint64
Length uint32
GetDict func() (any, error)
Hash []byte
Offset uint64
Length uint32
DictOffset uint64
DictLength uint32
}
func ResourcePath(urlS string) string {

View File

@@ -35,12 +35,13 @@ import (
// some state which allows them to fetch those dictionaries from a shared
// cache when they need them. That is their GetDict callback.
type GetRange struct {
Url string
Hash []byte
Offset uint64
Length uint32
GetDict func() (any, error)
Region *Region
Url string
Hash []byte
Offset uint64
Length uint32
DictOffset uint64
DictLength uint32
Region *Region
}
// A |Region| represents a continuous range of bytes within a Url.
@@ -153,13 +154,14 @@ func (t *Tree) Len() int {
return t.t.Len()
}
func (t *Tree) Insert(url string, hash []byte, offset uint64, length uint32, getDict func() (any, error)) {
func (t *Tree) Insert(url string, hash []byte, offset uint64, length uint32, dictOffset uint64, dictLength uint32) {
ins := &GetRange{
Url: t.intern(url),
Hash: hash,
Offset: offset,
Length: length,
GetDict: getDict,
Url: t.intern(url),
Hash: hash,
Offset: offset,
Length: length,
DictOffset: dictOffset,
DictLength: dictLength,
}
t.t.ReplaceOrInsert(ins)

View File

@@ -77,11 +77,11 @@ func TestTree(t *testing.T) {
tree := NewTree(8 * 1024)
// Insert 1KB ranges every 16 KB.
for i, j := 0, 0; i < 16; i, j = i+1, j+16*1024 {
tree.Insert("A", []byte{}, uint64(j), 1024, nil)
tree.Insert("A", []byte{}, uint64(j), 1024, 0, 0)
}
// Insert 1KB ranges every 16 KB, offset by 8KB.
for i := 15*16*1024 + 8*1024; i >= 0; i -= 16 * 1024 {
tree.Insert("A", []byte{}, uint64(i), 1024, nil)
tree.Insert("A", []byte{}, uint64(i), 1024, 0, 0)
}
assertTree(t, tree)
})
@@ -89,11 +89,11 @@ func TestTree(t *testing.T) {
tree := NewTree(8 * 1024)
// Insert 1KB ranges every 16 KB, offset by 8KB.
for i := 15*16*1024 + 8*1024; i >= 0; i -= 16 * 1024 {
tree.Insert("A", []byte{}, uint64(i), 1024, nil)
tree.Insert("A", []byte{}, uint64(i), 1024, 0, 0)
}
// Insert 1KB ranges every 16 KB.
for i, j := 0, 0; i < 16; i, j = i+1, j+16*1024 {
tree.Insert("A", []byte{}, uint64(j), 1024, nil)
tree.Insert("A", []byte{}, uint64(j), 1024, 0, 0)
}
assertTree(t, tree)
})
@@ -111,7 +111,7 @@ func TestTree(t *testing.T) {
})
tree := NewTree(8 * 1024)
for _, offset := range entries {
tree.Insert("A", []byte{}, offset, 1024, nil)
tree.Insert("A", []byte{}, offset, 1024, 0, 0)
}
assertTree(t, tree)
}
@@ -126,7 +126,7 @@ func TestTree(t *testing.T) {
"B", "A", "9", "8",
}
for i, j := 0, 0; i < 16; i, j = i+1, j+1024 {
tree.Insert(files[i], []byte{}, uint64(j), 1024, nil)
tree.Insert(files[i], []byte{}, uint64(j), 1024, 0, 0)
}
assert.Equal(t, 16, tree.regions.Len())
assert.Equal(t, 16, tree.t.Len())
@@ -134,17 +134,17 @@ func TestTree(t *testing.T) {
t.Run("MergeInMiddle", func(t *testing.T) {
tree := NewTree(8 * 1024)
// 1KB chunk at byte 0
tree.Insert("A", []byte{}, 0, 1024, nil)
tree.Insert("A", []byte{}, 0, 1024, 0, 0)
// 1KB chunk at byte 16KB
tree.Insert("A", []byte{}, 16384, 1024, nil)
tree.Insert("A", []byte{}, 16384, 1024, 0, 0)
assert.Equal(t, 2, tree.regions.Len())
assert.Equal(t, 2, tree.t.Len())
// 1KB chunk at byte 8KB
tree.Insert("A", []byte{}, 8192, 1024, nil)
tree.Insert("A", []byte{}, 8192, 1024, 0, 0)
assert.Equal(t, 1, tree.regions.Len())
assert.Equal(t, 3, tree.t.Len())
tree.Insert("A", []byte{}, 4096, 1024, nil)
tree.Insert("A", []byte{}, 12228, 1024, nil)
tree.Insert("A", []byte{}, 4096, 1024, 0, 0)
tree.Insert("A", []byte{}, 12228, 1024, 0, 0)
assert.Equal(t, 1, tree.regions.Len())
assert.Equal(t, 5, tree.t.Len())
e, _ := tree.t.Min()
@@ -184,7 +184,7 @@ func TestTree(t *testing.T) {
t.Run("InsertAscending", func(t *testing.T) {
tree := NewTree(4 * 1024)
for _, e := range entries {
tree.Insert(e.url, []byte{e.id}, e.offset, e.length, nil)
tree.Insert(e.url, []byte{e.id}, e.offset, e.length, 0, 0)
}
assertTree(t, tree)
})
@@ -192,7 +192,7 @@ func TestTree(t *testing.T) {
tree := NewTree(4 * 1024)
for i := len(entries) - 1; i >= 0; i-- {
e := entries[i]
tree.Insert(e.url, []byte{e.id}, e.offset, e.length, nil)
tree.Insert(e.url, []byte{e.id}, e.offset, e.length, 0, 0)
}
assertTree(t, tree)
})
@@ -205,7 +205,7 @@ func TestTree(t *testing.T) {
})
tree := NewTree(4 * 1024)
for _, e := range entries {
tree.Insert(e.url, []byte{e.id}, e.offset, e.length, nil)
tree.Insert(e.url, []byte{e.id}, e.offset, e.length, 0, 0)
}
assertTree(t, tree)
}