mirror of
https://github.com/dolthub/dolt.git
synced 2026-04-29 11:31:28 -05:00
go: remotestorage: Rework how dictionary fetching and dictionary cache is populated.
This commit is contained in:
@@ -22,9 +22,7 @@ import (
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/cenkalti/backoff/v4"
|
||||
"github.com/dolthub/gozstd"
|
||||
lru "github.com/hashicorp/golang-lru/v2"
|
||||
"golang.org/x/sync/errgroup"
|
||||
"google.golang.org/grpc"
|
||||
|
||||
@@ -72,14 +70,7 @@ const (
|
||||
reliableCallDeliverRespTimeout = 15 * time.Second
|
||||
)
|
||||
|
||||
var globalDictCache *dictionaryCache
|
||||
var once sync.Once
|
||||
|
||||
func NewChunkFetcher(ctx context.Context, dcs *DoltChunkStore) *ChunkFetcher {
|
||||
once.Do(func() {
|
||||
globalDictCache = NewDictionaryCache(dcs.csClient)
|
||||
})
|
||||
|
||||
eg, ctx := errgroup.WithContext(ctx)
|
||||
ret := &ChunkFetcher{
|
||||
eg: eg,
|
||||
@@ -345,8 +336,11 @@ func getMissingChunks(req *remotesapi.GetDownloadLocsRequest, resp *remotesapi.G
|
||||
}
|
||||
|
||||
type fetchResp struct {
|
||||
get *GetRange
|
||||
refresh func(ctx context.Context, err error, client remotesapi.ChunkStoreServiceClient) (string, error)
|
||||
get *GetRange
|
||||
refresh func(ctx context.Context, err error, client remotesapi.ChunkStoreServiceClient) (string, error)
|
||||
rangeType rangeType
|
||||
dictCache *dictionaryCache
|
||||
path string
|
||||
}
|
||||
|
||||
type fetchReq struct {
|
||||
@@ -357,20 +351,24 @@ type fetchReq struct {
|
||||
// A simple structure to keep track of *GetRange requests along with
|
||||
// |locationRefreshes| for the URL paths we have seen.
|
||||
type downloads struct {
|
||||
ranges *ranges.Tree
|
||||
refreshes map[string]*locationRefresh
|
||||
chunkRanges *ranges.Tree
|
||||
dictRanges *ranges.Tree
|
||||
dictCache *dictionaryCache
|
||||
refreshes map[string]*locationRefresh
|
||||
}
|
||||
|
||||
func newDownloads() downloads {
|
||||
return downloads{
|
||||
ranges: ranges.NewTree(chunkAggDistance),
|
||||
refreshes: make(map[string]*locationRefresh),
|
||||
chunkRanges: ranges.NewTree(chunkAggDistance),
|
||||
dictRanges: ranges.NewTree(chunkAggDistance),
|
||||
dictCache: &dictionaryCache{},
|
||||
refreshes: make(map[string]*locationRefresh),
|
||||
}
|
||||
}
|
||||
|
||||
func (d downloads) Add(resp *remotesapi.DownloadLoc) {
|
||||
gr := (*GetRange)(resp.Location.(*remotesapi.DownloadLoc_HttpGetRange).HttpGetRange)
|
||||
path := gr.ResourcePath()
|
||||
hgr := resp.Location.(*remotesapi.DownloadLoc_HttpGetRange).HttpGetRange
|
||||
path := ResourcePath(hgr.Url)
|
||||
if v, ok := d.refreshes[path]; ok {
|
||||
v.Add(resp)
|
||||
} else {
|
||||
@@ -378,8 +376,16 @@ func (d downloads) Add(resp *remotesapi.DownloadLoc) {
|
||||
refresh.Add(resp)
|
||||
d.refreshes[path] = refresh
|
||||
}
|
||||
for _, r := range gr.Ranges {
|
||||
d.ranges.Insert(gr.Url, r.Hash[:], r.Offset, r.Length, r.DictionaryOffset, r.DictionaryLength)
|
||||
for _, r := range hgr.Ranges {
|
||||
var getDict func() (any, error)
|
||||
if r.DictionaryLength != 0 {
|
||||
var first bool
|
||||
getDict, first = d.dictCache.get(path, r.DictionaryOffset, r.DictionaryLength)
|
||||
if first {
|
||||
d.dictRanges.Insert(hgr.Url, nil, r.DictionaryOffset, r.DictionaryLength, nil)
|
||||
}
|
||||
}
|
||||
d.chunkRanges.Insert(hgr.Url, r.Hash[:], r.Offset, r.Length, getDict)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -387,22 +393,30 @@ func toGetRange(rs []*ranges.GetRange) *GetRange {
|
||||
ret := new(GetRange)
|
||||
for _, r := range rs {
|
||||
ret.Url = r.Url
|
||||
ret.Ranges = append(ret.Ranges, &remotesapi.RangeChunk{
|
||||
Hash: r.Hash,
|
||||
Offset: r.Offset,
|
||||
Length: r.Length,
|
||||
DictionaryOffset: r.DictionaryOffset,
|
||||
DictionaryLength: r.DictionaryLength,
|
||||
ret.Ranges = append(ret.Ranges, &Range{
|
||||
Hash: r.Hash,
|
||||
Offset: r.Offset,
|
||||
Length: r.Length,
|
||||
GetDict: r.GetDict,
|
||||
})
|
||||
}
|
||||
return ret
|
||||
}
|
||||
|
||||
type rangeType int
|
||||
|
||||
const (
|
||||
rangeType_Chunk rangeType = iota
|
||||
rangeType_Dictionary
|
||||
)
|
||||
|
||||
// Reads off |locCh| and assembles DownloadLocs into download ranges.
|
||||
func fetcherDownloadRangesThread(ctx context.Context, locCh chan []*remotesapi.DownloadLoc, fetchReqCh chan fetchReq, doneCh chan struct{}) error {
|
||||
downloads := newDownloads()
|
||||
pending := make([]fetchReq, 0)
|
||||
var toSend *GetRange
|
||||
var toSendType rangeType
|
||||
|
||||
for {
|
||||
// pending is our slice of request threads that showed up
|
||||
// asking for a download. We range through it and try to send
|
||||
@@ -413,11 +427,16 @@ func fetcherDownloadRangesThread(ctx context.Context, locCh chan []*remotesapi.D
|
||||
// can get the next range to download from
|
||||
// |downloads.ranges|.
|
||||
if toSend == nil {
|
||||
max := downloads.ranges.DeleteMaxRegion()
|
||||
max := downloads.dictRanges.DeleteMaxRegion()
|
||||
if len(max) == 0 {
|
||||
break
|
||||
max = downloads.chunkRanges.DeleteMaxRegion()
|
||||
if len(max) == 0 {
|
||||
break
|
||||
}
|
||||
toSend, toSendType = toGetRange(max), rangeType_Chunk
|
||||
} else {
|
||||
toSend, toSendType = toGetRange(max), rangeType_Dictionary
|
||||
}
|
||||
toSend = toGetRange(max)
|
||||
}
|
||||
path := toSend.ResourcePath()
|
||||
refresh := downloads.refreshes[path]
|
||||
@@ -427,6 +446,9 @@ func fetcherDownloadRangesThread(ctx context.Context, locCh chan []*remotesapi.D
|
||||
refresh: func(ctx context.Context, err error, client remotesapi.ChunkStoreServiceClient) (string, error) {
|
||||
return refresh.GetURL(ctx, err, client)
|
||||
},
|
||||
rangeType: toSendType,
|
||||
path: path,
|
||||
dictCache: downloads.dictCache,
|
||||
}
|
||||
|
||||
select {
|
||||
@@ -462,7 +484,7 @@ func fetcherDownloadRangesThread(ctx context.Context, locCh chan []*remotesapi.D
|
||||
// nil and our ranges Tree is empty, then we have delivered
|
||||
// every download we will ever see to a download thread. We can
|
||||
// close |doneCh| and return nil.
|
||||
if locCh == nil && downloads.ranges.Len() == 0 {
|
||||
if locCh == nil && downloads.chunkRanges.Len() == 0 && downloads.dictRanges.Len() == 0 {
|
||||
close(doneCh)
|
||||
return nil
|
||||
}
|
||||
@@ -595,7 +617,50 @@ func fetcherDownloadURLThread(ctx context.Context, fetchReqCh chan fetchReq, don
|
||||
case <-ctx.Done():
|
||||
return context.Cause(ctx)
|
||||
case fetchResp := <-respCh:
|
||||
f := fetchResp.get.GetDownloadFunc(ctx, stats, health, fetcher, params, chunkCh, func(ctx context.Context, lastError error, resourcePath string) (string, error) {
|
||||
var i int
|
||||
var cb func(context.Context, []byte) error
|
||||
if fetchResp.rangeType == rangeType_Chunk {
|
||||
cb = func(ctx context.Context, bs []byte) error {
|
||||
rng := fetchResp.get.Ranges[i]
|
||||
i += 1
|
||||
h := hash.New(rng.Hash[:])
|
||||
var cc nbs.ToChunker
|
||||
if rng.GetDict != nil {
|
||||
dictRes, err := rng.GetDict()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
cc = nbs.NewArchiveToChunker(h, dictRes.(*gozstd.DDict), bs)
|
||||
} else {
|
||||
var err error
|
||||
cc, err = nbs.NewCompressedChunk(h, bs)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
select {
|
||||
case chunkCh <- cc:
|
||||
case <-ctx.Done():
|
||||
return context.Cause(ctx)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
} else {
|
||||
cb = func(ctx context.Context, bs []byte) error {
|
||||
rng := fetchResp.get.Ranges[i]
|
||||
i += 1
|
||||
var ddict *gozstd.DDict
|
||||
decompressed, err := gozstd.Decompress(nil, bs)
|
||||
if err == nil {
|
||||
ddict, err = gozstd.NewDDict(decompressed)
|
||||
}
|
||||
fetchResp.dictCache.set(fetchResp.path, rng.Offset, rng.Length, ddict, err)
|
||||
// XXX: For now, we fail here on any error, instead of when we try to use the dictionary...
|
||||
// For now, the record in the cache will be terminally failed and is never removed.
|
||||
return err
|
||||
}
|
||||
}
|
||||
f := fetchResp.get.GetDownloadFunc(ctx, stats, health, fetcher, params, cb, func(ctx context.Context, lastError error, resourcePath string) (string, error) {
|
||||
return fetchResp.refresh(ctx, lastError, client)
|
||||
})
|
||||
err := f()
|
||||
@@ -607,17 +672,8 @@ func fetcherDownloadURLThread(ctx context.Context, fetchReqCh chan fetchReq, don
|
||||
}
|
||||
}
|
||||
|
||||
// dictionaryCache caches dictionaries for the chunks in an archive store. When we fetch from a database with an archive,
|
||||
// we get back the path/offset/length of the dictionary for each chunk. These, by definition, are repeatedly used
|
||||
// and we don't want to request the same dictionary multiple times.
|
||||
//
|
||||
// Currently (feb '25), archives generally have only a default dictionary, so this is kind of overkill. Mainly planning
|
||||
// for the future when chunk grouping is the default and we could have thousands of dictionaries.
|
||||
type dictionaryCache struct {
|
||||
cache *lru.TwoQueueCache[DictionaryKey, *gozstd.DDict]
|
||||
pending sync.Map
|
||||
client remotesapi.ChunkStoreServiceClient
|
||||
dlds downloads
|
||||
dictionaries sync.Map
|
||||
}
|
||||
|
||||
// DictionaryKey is the a globaly unique identifier for an archive dictionary.
|
||||
@@ -629,118 +685,27 @@ type DictionaryKey struct {
|
||||
len uint32
|
||||
}
|
||||
|
||||
func NewDictionaryCache(client remotesapi.ChunkStoreServiceClient) *dictionaryCache {
|
||||
c, err := lru.New2Q[DictionaryKey, *gozstd.DDict](1024)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
return &dictionaryCache{
|
||||
cache: c,
|
||||
client: client,
|
||||
dlds: newDownloads(),
|
||||
}
|
||||
type DictionaryPayload struct {
|
||||
done chan struct{}
|
||||
res any
|
||||
err error
|
||||
}
|
||||
|
||||
func (dc *dictionaryCache) get(rang *GetRange, idx int, stats StatsRecorder, recorder reliable.HealthRecorder) (*gozstd.DDict, error) {
|
||||
path := rang.ResourcePath()
|
||||
off := rang.Ranges[idx].DictionaryOffset
|
||||
ln := rang.Ranges[idx].DictionaryLength
|
||||
|
||||
key := DictionaryKey{path, off, ln}
|
||||
if dict, ok := dc.cache.Get(key); ok {
|
||||
return dict, nil
|
||||
}
|
||||
|
||||
// Check for an in-flight request. Default dictionary will be requested many times, so we want to avoid
|
||||
// making multiple requests for the same resource.
|
||||
if ch, loaded := dc.pending.LoadOrStore(key, make(chan struct{})); loaded {
|
||||
// There's an ongoing fetch, wait for its completion
|
||||
<-ch.(chan struct{})
|
||||
if dict, ok := dc.cache.Get(key); ok {
|
||||
return dict, nil
|
||||
}
|
||||
return nil, errors.New("failed to fetch dictionary due to in-flight request")
|
||||
}
|
||||
// When update is done, regardless of success or failure, we need to unblock anyone waiting.
|
||||
defer func() {
|
||||
if ch, found := dc.pending.LoadAndDelete(key); found {
|
||||
close(ch.(chan struct{}))
|
||||
}
|
||||
}()
|
||||
|
||||
// Fetch the dictionary
|
||||
ddict, err := dc.fetchDictionary(path, rang.Url, off, ln, stats, recorder)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Store the dictionary in the cache
|
||||
dc.cache.Add(key, ddict)
|
||||
|
||||
return ddict, nil
|
||||
func (dc *dictionaryCache) get(path string, offset uint64, length uint32) (func() (any, error), bool) {
|
||||
key := DictionaryKey{path, offset, length}
|
||||
entry, loaded := dc.dictionaries.LoadOrStore(key, &DictionaryPayload{done: make(chan struct{})})
|
||||
payload := entry.(*DictionaryPayload)
|
||||
return func() (any, error) {
|
||||
<-payload.done
|
||||
return payload.res, payload.err
|
||||
}, !loaded
|
||||
}
|
||||
|
||||
// fetchDictionary performs an GET request for a single span which is for a zstd dictionary.
|
||||
func (dc *dictionaryCache) fetchDictionary(path, url string, off uint64, ln uint32, stats StatsRecorder, recorder reliable.HealthRecorder) (*gozstd.DDict, error) {
|
||||
ctx := context.Background()
|
||||
pathToUrl := dc.dlds.refreshes[path]
|
||||
if pathToUrl == nil {
|
||||
// We manually construct the RangeChunk and DownloadLoc in this case because we are retrieving the dictionary span.
|
||||
// We'll make a single span request, and consume the entire response to create the dictionary.
|
||||
sRang := &remotesapi.HttpGetRange{}
|
||||
sRang.Url = url
|
||||
sRang.Ranges = append(sRang.Ranges, &remotesapi.RangeChunk{Offset: off, Length: ln})
|
||||
rang := &remotesapi.DownloadLoc_HttpGetRange{HttpGetRange: sRang}
|
||||
dl := &remotesapi.DownloadLoc{Location: rang}
|
||||
|
||||
refresh := new(locationRefresh)
|
||||
refresh.Add(dl)
|
||||
dc.dlds.refreshes[path] = refresh
|
||||
pathToUrl = refresh
|
||||
}
|
||||
|
||||
urlF := func(lastError error) (string, error) {
|
||||
earl, err := pathToUrl.GetURL(ctx, lastError, dc.client)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
if earl == "" {
|
||||
earl = path
|
||||
}
|
||||
return earl, nil
|
||||
}
|
||||
|
||||
resp := reliable.StreamingRangeDownload(ctx, reliable.StreamingRangeRequest{
|
||||
Fetcher: globalHttpFetcher,
|
||||
Offset: off,
|
||||
Length: uint64(ln),
|
||||
UrlFact: urlF,
|
||||
Stats: stats,
|
||||
Health: recorder,
|
||||
BackOffFact: func(ctx context.Context) backoff.BackOff {
|
||||
return downloadBackOff(ctx, defaultRequestParams.DownloadRetryCount)
|
||||
},
|
||||
Throughput: reliable.MinimumThroughputCheck{
|
||||
CheckInterval: defaultRequestParams.ThroughputMinimumCheckInterval,
|
||||
BytesPerCheck: defaultRequestParams.ThroughputMinimumBytesPerCheck,
|
||||
NumIntervals: defaultRequestParams.ThroughputMinimumNumIntervals,
|
||||
},
|
||||
RespHeadersTimeout: defaultRequestParams.RespHeadersTimeout,
|
||||
})
|
||||
defer resp.Close()
|
||||
|
||||
buf := make([]byte, ln)
|
||||
_, err := io.ReadFull(resp.Body, buf)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Dictionaries are compressed, but with vanilla zstd, so there is no dictionary.
|
||||
rawDict, err := gozstd.Decompress(nil, buf)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return gozstd.NewDDict(rawDict)
|
||||
func (dc *dictionaryCache) set(path string, offset uint64, length uint32, res any, err error) {
|
||||
key := DictionaryKey{path, offset, length}
|
||||
entry, _ := dc.dictionaries.LoadOrStore(key, &DictionaryPayload{done: make(chan struct{})})
|
||||
payload := entry.(*DictionaryPayload)
|
||||
payload.res = res
|
||||
payload.err = err
|
||||
close(payload.done)
|
||||
}
|
||||
|
||||
@@ -373,11 +373,25 @@ func (dcs *DoltChunkStore) GetManyCompressed(ctx context.Context, hashes hash.Ha
|
||||
|
||||
// GetRange is structurally the same as remotesapi.HttpGetRange, but with added functions. Instances of GetRange
|
||||
// don't get sent over the wire, so it is not necessary to use the remotesapi, just convenient.
|
||||
type GetRange remotesapi.HttpGetRange
|
||||
type GetRange struct {
|
||||
Url string
|
||||
Ranges []*Range
|
||||
}
|
||||
|
||||
type Range struct {
|
||||
Hash []byte
|
||||
Offset uint64
|
||||
Length uint32
|
||||
GetDict func() (any, error)
|
||||
}
|
||||
|
||||
func ResourcePath(urlS string) string {
|
||||
u, _ := url.Parse(urlS)
|
||||
return fmt.Sprintf("%s://%s%s", u.Scheme, u.Host, u.Path)
|
||||
}
|
||||
|
||||
func (gr *GetRange) ResourcePath() string {
|
||||
u, _ := url.Parse(gr.Url)
|
||||
return fmt.Sprintf("%s://%s%s", u.Scheme, u.Host, u.Path)
|
||||
return ResourcePath(gr.Url)
|
||||
}
|
||||
|
||||
func (gr *GetRange) Append(other *GetRange) {
|
||||
@@ -434,7 +448,7 @@ func sortRangesBySize(ranges []*GetRange) {
|
||||
|
||||
type resourcePathToUrlFunc func(ctx context.Context, lastError error, resourcePath string) (url string, err error)
|
||||
|
||||
func (gr *GetRange) GetDownloadFunc(ctx context.Context, stats StatsRecorder, health reliable.HealthRecorder, fetcher HTTPFetcher, params NetworkRequestParams, chunkChan chan nbs.ToChunker, pathToUrl resourcePathToUrlFunc) func() error {
|
||||
func (gr *GetRange) GetDownloadFunc(ctx context.Context, stats StatsRecorder, health reliable.HealthRecorder, fetcher HTTPFetcher, params NetworkRequestParams, resCb func(context.Context, []byte) error, pathToUrl resourcePathToUrlFunc) func() error {
|
||||
if len(gr.Ranges) == 0 {
|
||||
return func() error { return nil }
|
||||
}
|
||||
@@ -469,7 +483,7 @@ func (gr *GetRange) GetDownloadFunc(ctx context.Context, stats StatsRecorder, he
|
||||
RespHeadersTimeout: params.RespHeadersTimeout,
|
||||
})
|
||||
defer resp.Close()
|
||||
reader := &RangeChunkReader{Path: gr.ResourcePath(), GetRange: gr, Reader: resp.Body}
|
||||
reader := &RangeChunkReader{GetRange: gr, Reader: resp.Body}
|
||||
for {
|
||||
cc, err := reader.ReadChunk(stats, health)
|
||||
if errors.Is(err, io.EOF) {
|
||||
@@ -478,28 +492,26 @@ func (gr *GetRange) GetDownloadFunc(ctx context.Context, stats StatsRecorder, he
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
select {
|
||||
case chunkChan <- cc:
|
||||
case <-ctx.Done():
|
||||
return context.Cause(ctx)
|
||||
err = resCb(ctx, cc)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
type RangeChunkReader struct {
|
||||
Path string
|
||||
GetRange *GetRange
|
||||
Reader io.Reader
|
||||
i int
|
||||
skip int
|
||||
}
|
||||
|
||||
func (r *RangeChunkReader) ReadChunk(stats StatsRecorder, health reliable.HealthRecorder) (nbs.ToChunker, error) {
|
||||
func (r *RangeChunkReader) ReadChunk(stats StatsRecorder, health reliable.HealthRecorder) ([]byte, error) {
|
||||
if r.skip > 0 {
|
||||
_, err := io.CopyN(io.Discard, r.Reader, int64(r.skip))
|
||||
if err != nil {
|
||||
return nbs.CompressedChunk{}, err
|
||||
return nil, err
|
||||
}
|
||||
r.skip = 0
|
||||
}
|
||||
@@ -508,7 +520,7 @@ func (r *RangeChunkReader) ReadChunk(stats StatsRecorder, health reliable.Health
|
||||
r.i += 1
|
||||
|
||||
if idx >= len(r.GetRange.Ranges) {
|
||||
return nbs.CompressedChunk{}, io.EOF
|
||||
return nil, io.EOF
|
||||
}
|
||||
if idx < len(r.GetRange.Ranges)-1 {
|
||||
r.skip = int(r.GetRange.GapBetween(idx, idx+1))
|
||||
@@ -516,24 +528,13 @@ func (r *RangeChunkReader) ReadChunk(stats StatsRecorder, health reliable.Health
|
||||
|
||||
rang := r.GetRange.Ranges[idx]
|
||||
l := rang.Length
|
||||
h := hash.New(rang.Hash)
|
||||
|
||||
buf := make([]byte, l)
|
||||
_, err := io.ReadFull(r.Reader, buf)
|
||||
if err != nil {
|
||||
return nbs.CompressedChunk{}, err
|
||||
} else {
|
||||
if rang.DictionaryLength == 0 {
|
||||
return nbs.NewCompressedChunk(h, buf)
|
||||
} else {
|
||||
dict, err := globalDictCache.get(r.GetRange, idx, stats, health)
|
||||
if err != nil {
|
||||
return nbs.CompressedChunk{}, err
|
||||
}
|
||||
|
||||
return nbs.NewArchiveToChunker(h, dict, buf), nil
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
return buf, nil
|
||||
}
|
||||
|
||||
type locationRefresh struct {
|
||||
|
||||
@@ -21,23 +21,26 @@ import (
|
||||
"github.com/google/btree"
|
||||
)
|
||||
|
||||
// GetRange represents a way to get the contents for a Chunk from a given Url
|
||||
// with an HTTP Range request. The chunk with hash |Hash| can be fetched using
|
||||
// GetRange represents a range of remote data that has semantic meaning to the
|
||||
// ChunkFetcher. These ranges are currently either Chunks, or Dictionaries.
|
||||
// They can be fetched from the remote URL with an HTTP Range request.
|
||||
// For a chunk range, the chunk with hash |Hash| can be fetched using
|
||||
// the |Url| with a Range request starting at |Offset| and reading |Length|
|
||||
// bytes.
|
||||
// bytes. A Dictionary does not have a meaningful Hash, but its identity is
|
||||
// unique for a Url and Offset.
|
||||
//
|
||||
// A |GetRange| struct is a member of a |Region| in the |RegionHeap|.
|
||||
//
|
||||
// Chunk |GetRange|s which depend on Dictionaries can be constructed with
|
||||
// some state which allows them to fetch those dictionaries from a shared
|
||||
// chache when they need them. That is their GetDict callback.
|
||||
type GetRange struct {
|
||||
Url string
|
||||
Hash []byte
|
||||
Offset uint64
|
||||
Length uint32
|
||||
Region *Region
|
||||
|
||||
// Archive file format requires the url/dictionary offset/length to be carried through to fully resolve the chunk.
|
||||
// This information is not used withing the range calculations at all, as the range is not related to the chunk content.
|
||||
DictionaryOffset uint64
|
||||
DictionaryLength uint32
|
||||
Url string
|
||||
Hash []byte
|
||||
Offset uint64
|
||||
Length uint32
|
||||
GetDict func() (any, error)
|
||||
Region *Region
|
||||
}
|
||||
|
||||
// A |Region| represents a continuous range of bytes within in a Url.
|
||||
@@ -150,14 +153,13 @@ func (t *Tree) Len() int {
|
||||
return t.t.Len()
|
||||
}
|
||||
|
||||
func (t *Tree) Insert(url string, hash []byte, offset uint64, length uint32, dictOffset uint64, dictLength uint32) {
|
||||
func (t *Tree) Insert(url string, hash []byte, offset uint64, length uint32, getDict func() (any, error)) {
|
||||
ins := &GetRange{
|
||||
Url: t.intern(url),
|
||||
Hash: hash,
|
||||
Offset: offset,
|
||||
Length: length,
|
||||
DictionaryOffset: dictOffset,
|
||||
DictionaryLength: dictLength,
|
||||
Url: t.intern(url),
|
||||
Hash: hash,
|
||||
Offset: offset,
|
||||
Length: length,
|
||||
GetDict: getDict,
|
||||
}
|
||||
t.t.ReplaceOrInsert(ins)
|
||||
|
||||
|
||||
@@ -77,11 +77,11 @@ func TestTree(t *testing.T) {
|
||||
tree := NewTree(8 * 1024)
|
||||
// Insert 1KB ranges every 16 KB.
|
||||
for i, j := 0, 0; i < 16; i, j = i+1, j+16*1024 {
|
||||
tree.Insert("A", []byte{}, uint64(j), 1024, 0, 0)
|
||||
tree.Insert("A", []byte{}, uint64(j), 1024, nil)
|
||||
}
|
||||
// Insert 1KB ranges every 16 KB, offset by 8KB.
|
||||
for i := 15*16*1024 + 8*1024; i >= 0; i -= 16 * 1024 {
|
||||
tree.Insert("A", []byte{}, uint64(i), 1024, 0, 0)
|
||||
tree.Insert("A", []byte{}, uint64(i), 1024, nil)
|
||||
}
|
||||
assertTree(t, tree)
|
||||
})
|
||||
@@ -89,11 +89,11 @@ func TestTree(t *testing.T) {
|
||||
tree := NewTree(8 * 1024)
|
||||
// Insert 1KB ranges every 16 KB, offset by 8KB.
|
||||
for i := 15*16*1024 + 8*1024; i >= 0; i -= 16 * 1024 {
|
||||
tree.Insert("A", []byte{}, uint64(i), 1024, 0, 0)
|
||||
tree.Insert("A", []byte{}, uint64(i), 1024, nil)
|
||||
}
|
||||
// Insert 1KB ranges every 16 KB.
|
||||
for i, j := 0, 0; i < 16; i, j = i+1, j+16*1024 {
|
||||
tree.Insert("A", []byte{}, uint64(j), 1024, 0, 0)
|
||||
tree.Insert("A", []byte{}, uint64(j), 1024, nil)
|
||||
}
|
||||
assertTree(t, tree)
|
||||
})
|
||||
@@ -111,7 +111,7 @@ func TestTree(t *testing.T) {
|
||||
})
|
||||
tree := NewTree(8 * 1024)
|
||||
for _, offset := range entries {
|
||||
tree.Insert("A", []byte{}, offset, 1024, 0, 0)
|
||||
tree.Insert("A", []byte{}, offset, 1024, nil)
|
||||
}
|
||||
assertTree(t, tree)
|
||||
}
|
||||
@@ -126,7 +126,7 @@ func TestTree(t *testing.T) {
|
||||
"B", "A", "9", "8",
|
||||
}
|
||||
for i, j := 0, 0; i < 16; i, j = i+1, j+1024 {
|
||||
tree.Insert(files[i], []byte{}, uint64(j), 1024, 0, 0)
|
||||
tree.Insert(files[i], []byte{}, uint64(j), 1024, nil)
|
||||
}
|
||||
assert.Equal(t, 16, tree.regions.Len())
|
||||
assert.Equal(t, 16, tree.t.Len())
|
||||
@@ -134,17 +134,17 @@ func TestTree(t *testing.T) {
|
||||
t.Run("MergeInMiddle", func(t *testing.T) {
|
||||
tree := NewTree(8 * 1024)
|
||||
// 1KB chunk at byte 0
|
||||
tree.Insert("A", []byte{}, 0, 1024, 0, 0)
|
||||
tree.Insert("A", []byte{}, 0, 1024, nil)
|
||||
// 1KB chunk at byte 16KB
|
||||
tree.Insert("A", []byte{}, 16384, 1024, 0, 0)
|
||||
tree.Insert("A", []byte{}, 16384, 1024, nil)
|
||||
assert.Equal(t, 2, tree.regions.Len())
|
||||
assert.Equal(t, 2, tree.t.Len())
|
||||
// 1KB chunk at byte 8KB
|
||||
tree.Insert("A", []byte{}, 8192, 1024, 0, 0)
|
||||
tree.Insert("A", []byte{}, 8192, 1024, nil)
|
||||
assert.Equal(t, 1, tree.regions.Len())
|
||||
assert.Equal(t, 3, tree.t.Len())
|
||||
tree.Insert("A", []byte{}, 4096, 1024, 0, 0)
|
||||
tree.Insert("A", []byte{}, 12228, 1024, 0, 0)
|
||||
tree.Insert("A", []byte{}, 4096, 1024, nil)
|
||||
tree.Insert("A", []byte{}, 12228, 1024, nil)
|
||||
assert.Equal(t, 1, tree.regions.Len())
|
||||
assert.Equal(t, 5, tree.t.Len())
|
||||
e, _ := tree.t.Min()
|
||||
@@ -184,7 +184,7 @@ func TestTree(t *testing.T) {
|
||||
t.Run("InsertAscending", func(t *testing.T) {
|
||||
tree := NewTree(4 * 1024)
|
||||
for _, e := range entries {
|
||||
tree.Insert(e.url, []byte{e.id}, e.offset, e.length, 0, 0)
|
||||
tree.Insert(e.url, []byte{e.id}, e.offset, e.length, nil)
|
||||
}
|
||||
assertTree(t, tree)
|
||||
})
|
||||
@@ -192,7 +192,7 @@ func TestTree(t *testing.T) {
|
||||
tree := NewTree(4 * 1024)
|
||||
for i := len(entries) - 1; i >= 0; i-- {
|
||||
e := entries[i]
|
||||
tree.Insert(e.url, []byte{e.id}, e.offset, e.length, 0, 0)
|
||||
tree.Insert(e.url, []byte{e.id}, e.offset, e.length, nil)
|
||||
}
|
||||
assertTree(t, tree)
|
||||
})
|
||||
@@ -205,7 +205,7 @@ func TestTree(t *testing.T) {
|
||||
})
|
||||
tree := NewTree(4 * 1024)
|
||||
for _, e := range entries {
|
||||
tree.Insert(e.url, []byte{e.id}, e.offset, e.length, 0, 0)
|
||||
tree.Insert(e.url, []byte{e.id}, e.offset, e.length, nil)
|
||||
}
|
||||
assertTree(t, tree)
|
||||
}
|
||||
|
||||
@@ -258,8 +258,7 @@ mutations_and_gc_statement() {
|
||||
[[ "$remotesrv_pid" -gt 0 ]] || false
|
||||
|
||||
cd ../cloned/repo1
|
||||
run dolt fetch
|
||||
[ "$status" -eq 0 ]
|
||||
dolt fetch
|
||||
|
||||
run dolt status
|
||||
[ "$status" -eq 0 ]
|
||||
|
||||
Reference in New Issue
Block a user