mirror of
https://github.com/dolthub/dolt.git
synced 2026-04-26 19:46:39 -05:00
Merge pull request #8861 from dolthub/aaron/remotestorage-cache-cleanup
[no-release-notes] go: remotestorage: chunk_store.go: Clean up ChunkCache.
This commit is contained in:
@@ -19,22 +19,21 @@ import (
|
||||
"github.com/dolthub/dolt/go/store/nbs"
|
||||
)
|
||||
|
||||
// ChunkCache is an interface used for caching chunks
|
||||
// ChunkCache is an interface used for caching chunks and has presence that
|
||||
// has already been fetched from remotestorage. Care should be taken when
|
||||
// using ChunkCache if it is possible for the remote to GC, since in that
|
||||
// case the cache could contain stale data.
|
||||
type ChunkCache interface {
|
||||
// Put puts a slice of chunks into the cache. Error returned if the cache capacity has been exceeded.
|
||||
Put(c []nbs.ToChunker) error
|
||||
// Insert some observed / fetched chunks into the cache. These
|
||||
// chunks may or may not be returned in the future.
|
||||
InsertChunks(cs []nbs.ToChunker)
|
||||
// Get previously cached chunks, if they are still available.
|
||||
GetCachedChunks(h hash.HashSet) map[hash.Hash]nbs.ToChunker
|
||||
|
||||
// Get gets a map of hash to chunk for a set of hashes. In the event that a chunk is not in the cache, chunks.Empty.
|
||||
// is put in it's place
|
||||
Get(h hash.HashSet) map[hash.Hash]nbs.ToChunker
|
||||
|
||||
// Has takes a set of hashes and returns the set of hashes that the cache currently does not have in it.
|
||||
Has(h hash.HashSet) (absent hash.HashSet)
|
||||
|
||||
// PutChunk puts a single chunk in the cache. Returns an error if the cache capacity has been exceeded.
|
||||
PutChunk(chunk nbs.ToChunker) error
|
||||
|
||||
// GetAndClearChunksToFlush gets a map of hash to chunk which includes all the chunks that were put in the cache
|
||||
// between the last time GetAndClearChunksToFlush was called and now.
|
||||
GetAndClearChunksToFlush() map[hash.Hash]nbs.ToChunker
|
||||
// Insert all hashes in |h| as existing in the remote.
|
||||
InsertHas(h hash.HashSet)
|
||||
// Returns the absent set from |h|, filtering it by records
|
||||
// which are known to be present in the remote based on
|
||||
// previous |InsertHas| calls.
|
||||
GetCachedHas(h hash.HashSet) (absent hash.HashSet)
|
||||
}
|
||||
|
||||
@@ -46,7 +46,7 @@ import (
|
||||
"github.com/dolthub/dolt/go/store/types"
|
||||
)
|
||||
|
||||
var ErrCacheCapacityExceeded = errors.New("too much data: the cache capacity has been reached")
|
||||
var ErrWriteBufferCapacityExceeded = errors.New("too much data: the write buffer capacity has been reached")
|
||||
|
||||
var ErrUploadFailed = errors.New("upload failed")
|
||||
|
||||
@@ -123,6 +123,7 @@ type DoltChunkStore struct {
|
||||
root hash.Hash
|
||||
csClient remotesapi.ChunkStoreServiceClient
|
||||
finalizer func() error
|
||||
wb WriteBuffer
|
||||
cache ChunkCache
|
||||
metadata *remotesapi.GetRepoMetadataResponse
|
||||
nbf *types.NomsBinFormat
|
||||
@@ -172,6 +173,7 @@ func NewDoltChunkStoreFromPath(ctx context.Context, nbf *types.NomsBinFormat, pa
|
||||
csClient: csClient,
|
||||
finalizer: func() error { return nil },
|
||||
cache: newMapChunkCache(),
|
||||
wb: newMapWriteBuffer(),
|
||||
metadata: metadata,
|
||||
nbf: nbf,
|
||||
httpFetcher: globalHttpFetcher,
|
||||
@@ -185,79 +187,46 @@ func NewDoltChunkStoreFromPath(ctx context.Context, nbf *types.NomsBinFormat, pa
|
||||
return cs, nil
|
||||
}
|
||||
|
||||
func (dcs *DoltChunkStore) clone() *DoltChunkStore {
|
||||
ret := *dcs
|
||||
ret.repoToken = new(atomic.Value)
|
||||
return &ret
|
||||
}
|
||||
|
||||
func (dcs *DoltChunkStore) WithHTTPFetcher(fetcher HTTPFetcher) *DoltChunkStore {
|
||||
return &DoltChunkStore{
|
||||
repoId: dcs.repoId,
|
||||
repoPath: dcs.repoPath,
|
||||
repoToken: new(atomic.Value),
|
||||
host: dcs.host,
|
||||
root: dcs.root,
|
||||
csClient: dcs.csClient,
|
||||
finalizer: dcs.finalizer,
|
||||
cache: dcs.cache,
|
||||
metadata: dcs.metadata,
|
||||
nbf: dcs.nbf,
|
||||
httpFetcher: fetcher,
|
||||
params: dcs.params,
|
||||
stats: dcs.stats,
|
||||
}
|
||||
ret := dcs.clone()
|
||||
ret.httpFetcher = fetcher
|
||||
return ret
|
||||
}
|
||||
|
||||
func (dcs *DoltChunkStore) WithNoopWriteBuffer() *DoltChunkStore {
|
||||
ret := dcs.clone()
|
||||
ret.wb = noopWriteBuffer{}
|
||||
return ret
|
||||
}
|
||||
|
||||
func (dcs *DoltChunkStore) WithWriteBuffer(wb WriteBuffer) *DoltChunkStore {
|
||||
ret := dcs.clone()
|
||||
ret.wb = wb
|
||||
return ret
|
||||
}
|
||||
|
||||
func (dcs *DoltChunkStore) WithNoopChunkCache() *DoltChunkStore {
|
||||
return &DoltChunkStore{
|
||||
repoId: dcs.repoId,
|
||||
repoPath: dcs.repoPath,
|
||||
repoToken: new(atomic.Value),
|
||||
host: dcs.host,
|
||||
root: dcs.root,
|
||||
csClient: dcs.csClient,
|
||||
finalizer: dcs.finalizer,
|
||||
cache: noopChunkCache,
|
||||
metadata: dcs.metadata,
|
||||
nbf: dcs.nbf,
|
||||
httpFetcher: dcs.httpFetcher,
|
||||
params: dcs.params,
|
||||
stats: dcs.stats,
|
||||
logger: dcs.logger,
|
||||
}
|
||||
ret := dcs.clone()
|
||||
ret.cache = noopChunkCache
|
||||
return ret
|
||||
}
|
||||
|
||||
func (dcs *DoltChunkStore) WithChunkCache(cache ChunkCache) *DoltChunkStore {
|
||||
return &DoltChunkStore{
|
||||
repoId: dcs.repoId,
|
||||
repoPath: dcs.repoPath,
|
||||
repoToken: new(atomic.Value),
|
||||
host: dcs.host,
|
||||
root: dcs.root,
|
||||
csClient: dcs.csClient,
|
||||
finalizer: dcs.finalizer,
|
||||
cache: cache,
|
||||
metadata: dcs.metadata,
|
||||
nbf: dcs.nbf,
|
||||
httpFetcher: dcs.httpFetcher,
|
||||
params: dcs.params,
|
||||
stats: dcs.stats,
|
||||
logger: dcs.logger,
|
||||
}
|
||||
ret := dcs.clone()
|
||||
ret.cache = cache
|
||||
return ret
|
||||
}
|
||||
|
||||
func (dcs *DoltChunkStore) WithNetworkRequestParams(params NetworkRequestParams) *DoltChunkStore {
|
||||
return &DoltChunkStore{
|
||||
repoId: dcs.repoId,
|
||||
repoPath: dcs.repoPath,
|
||||
repoToken: new(atomic.Value),
|
||||
host: dcs.host,
|
||||
root: dcs.root,
|
||||
csClient: dcs.csClient,
|
||||
finalizer: dcs.finalizer,
|
||||
cache: dcs.cache,
|
||||
metadata: dcs.metadata,
|
||||
nbf: dcs.nbf,
|
||||
httpFetcher: dcs.httpFetcher,
|
||||
params: params,
|
||||
stats: dcs.stats,
|
||||
logger: dcs.logger,
|
||||
}
|
||||
ret := dcs.clone()
|
||||
ret.params = params
|
||||
return ret
|
||||
}
|
||||
|
||||
func (dcs *DoltChunkStore) SetLogger(logger chunks.DebugLogger) {
|
||||
@@ -344,19 +313,18 @@ func (dcs *DoltChunkStore) GetManyCompressed(ctx context.Context, hashes hash.Ha
|
||||
ctx, span := tracer.Start(ctx, "remotestorage.GetManyCompressed")
|
||||
defer span.End()
|
||||
|
||||
hashToChunk := dcs.cache.Get(hashes)
|
||||
hashToChunk := dcs.cache.GetCachedChunks(hashes)
|
||||
dcs.wb.AddBufferedChunks(hashes, hashToChunk)
|
||||
|
||||
span.SetAttributes(attribute.Int("num_hashes", len(hashes)), attribute.Int("cache_hits", len(hashToChunk)))
|
||||
atomic.AddUint32(&dcs.stats.Hits, uint32(len(hashToChunk)))
|
||||
|
||||
notCached := make([]hash.Hash, 0, len(hashes))
|
||||
for h := range hashes {
|
||||
c := hashToChunk[h]
|
||||
|
||||
if c == nil || c.IsEmpty() {
|
||||
notCached = append(notCached, h)
|
||||
} else {
|
||||
if c, ok := hashToChunk[h]; ok {
|
||||
found(ctx, c)
|
||||
} else {
|
||||
notCached = append(notCached, h)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -633,9 +601,7 @@ func (dcs *DoltChunkStore) readChunksAndCache(ctx context.Context, hashes []hash
|
||||
}
|
||||
// Don't forward on empty/not found chunks.
|
||||
if !cc.IsEmpty() {
|
||||
if err := dcs.cache.PutChunk(cc); err != nil {
|
||||
return err
|
||||
}
|
||||
dcs.cache.InsertChunks([]nbs.ToChunker{cc})
|
||||
found(egCtx, cc)
|
||||
}
|
||||
}
|
||||
@@ -663,8 +629,8 @@ const maxHasManyBatchSize = 16 * 1024
|
||||
// absent from the store.
|
||||
func (dcs *DoltChunkStore) HasMany(ctx context.Context, hashes hash.HashSet) (hash.HashSet, error) {
|
||||
// get the set of hashes that isn't already in the cache
|
||||
notCached := dcs.cache.Has(hashes)
|
||||
|
||||
notCached := dcs.cache.GetCachedHas(hashes)
|
||||
dcs.wb.RemovePresentChunks(notCached)
|
||||
if len(notCached) == 0 {
|
||||
return notCached, nil
|
||||
}
|
||||
@@ -673,7 +639,7 @@ func (dcs *DoltChunkStore) HasMany(ctx context.Context, hashes hash.HashSet) (ha
|
||||
hashSl, byteSl := HashSetToSlices(notCached)
|
||||
|
||||
absent := make(hash.HashSet)
|
||||
var found []nbs.ToChunker
|
||||
found := make(hash.HashSet)
|
||||
var err error
|
||||
|
||||
batchItr(len(hashSl), maxHasManyBatchSize, func(st, end int) (stop bool) {
|
||||
@@ -714,8 +680,7 @@ func (dcs *DoltChunkStore) HasMany(ctx context.Context, hashes hash.HashSet) (ha
|
||||
absent[currHash] = struct{}{}
|
||||
j++
|
||||
} else {
|
||||
c := nbs.ChunkToCompressedChunk(chunks.NewChunkWithHash(currHash, []byte{}))
|
||||
found = append(found, c)
|
||||
found.Insert(currHash)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -731,9 +696,7 @@ func (dcs *DoltChunkStore) HasMany(ctx context.Context, hashes hash.HashSet) (ha
|
||||
}
|
||||
|
||||
if len(found) > 0 {
|
||||
if err := dcs.cache.Put(found); err != nil {
|
||||
return hash.HashSet{}, err
|
||||
}
|
||||
dcs.cache.InsertHas(found)
|
||||
}
|
||||
|
||||
return absent, nil
|
||||
@@ -767,7 +730,8 @@ func (dcs *DoltChunkStore) Put(ctx context.Context, c chunks.Chunk, getAddrs chu
|
||||
}
|
||||
|
||||
cc := nbs.ChunkToCompressedChunk(c)
|
||||
if err := dcs.cache.Put([]nbs.ToChunker{cc}); err != nil {
|
||||
err = dcs.wb.Put(cc)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
@@ -850,8 +814,26 @@ func (dcs *DoltChunkStore) loadRoot(ctx context.Context) error {
|
||||
// persisted root hash from last to current (or keeps it the same).
|
||||
// If last doesn't match the root in persistent storage, returns false.
|
||||
func (dcs *DoltChunkStore) Commit(ctx context.Context, current, last hash.Hash) (bool, error) {
|
||||
hashToChunkCount, err := dcs.uploadChunks(ctx)
|
||||
toUpload := dcs.wb.GetAllForWrite()
|
||||
var resp *remotesapi.CommitResponse
|
||||
defer func() {
|
||||
// We record success based on the CommitResponse
|
||||
// |Success| field, which is only |true| when the call
|
||||
// successfully updated the root hash of the
|
||||
// remote. With the current API, we cannot distinguish
|
||||
// the case where the commit failed because |last| was
|
||||
// stale but the provided chunks were still
|
||||
// successfully added to the remote. If the write is
|
||||
// retried in such a case, we will currently write the
|
||||
// chunks to the remote again.
|
||||
if resp != nil {
|
||||
dcs.wb.WriteCompleted(resp.Success)
|
||||
} else {
|
||||
dcs.wb.WriteCompleted(false)
|
||||
}
|
||||
}()
|
||||
|
||||
hashToChunkCount, err := dcs.uploadChunks(ctx, toUpload)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
@@ -873,7 +855,7 @@ func (dcs *DoltChunkStore) Commit(ctx context.Context, current, last hash.Hash)
|
||||
NbsVersion: nbs.StorageVersion,
|
||||
},
|
||||
}
|
||||
resp, err := dcs.csClient.Commit(ctx, req)
|
||||
resp, err = dcs.csClient.Commit(ctx, req)
|
||||
if err != nil {
|
||||
return false, NewRpcError(err, "Commit", dcs.host, req)
|
||||
}
|
||||
@@ -911,10 +893,11 @@ func (dcs *DoltChunkStore) Close() error {
|
||||
return dcs.finalizer()
|
||||
}
|
||||
|
||||
// getting this working using the simplest approach first
|
||||
func (dcs *DoltChunkStore) uploadChunks(ctx context.Context) (map[hash.Hash]int, error) {
|
||||
hashToChunk := dcs.cache.GetAndClearChunksToFlush()
|
||||
|
||||
// Uploads all chunks in |hashToChunk| to the remote store and returns
|
||||
// the manifest entries that correspond to the new table files. Used
|
||||
// by |Commit|. Typically |hashToChunk| will have come from our |wb|
|
||||
// |writeBuffer|.
|
||||
func (dcs *DoltChunkStore) uploadChunks(ctx context.Context, hashToChunk map[hash.Hash]nbs.CompressedChunk) (map[hash.Hash]int, error) {
|
||||
if len(hashToChunk) == 0 {
|
||||
return map[hash.Hash]int{}, nil
|
||||
}
|
||||
@@ -922,7 +905,6 @@ func (dcs *DoltChunkStore) uploadChunks(ctx context.Context) (map[hash.Hash]int,
|
||||
chnks := make([]chunks.Chunk, 0, len(hashToChunk))
|
||||
for _, chable := range hashToChunk {
|
||||
ch, err := chable.ToChunk()
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -951,7 +933,6 @@ func (dcs *DoltChunkStore) uploadChunks(ctx context.Context) (map[hash.Hash]int,
|
||||
}
|
||||
|
||||
for h, contentHash := range hashToContentHash {
|
||||
// Can parallelize this in the future if needed
|
||||
err := dcs.uploadTableFileWithRetries(ctx, h, uint64(hashToCount[h]), contentHash, func() (io.ReadCloser, uint64, error) {
|
||||
data := hashToData[h]
|
||||
return io.NopCloser(bytes.NewReader(data)), uint64(len(data)), nil
|
||||
|
||||
@@ -15,134 +15,70 @@
|
||||
package remotestorage
|
||||
|
||||
import (
|
||||
"sync"
|
||||
lru "github.com/hashicorp/golang-lru/v2"
|
||||
|
||||
"github.com/dolthub/dolt/go/store/hash"
|
||||
"github.com/dolthub/dolt/go/store/nbs"
|
||||
)
|
||||
|
||||
const (
|
||||
// averageChunkSize is used to estimate the size of chunk for purposes of avoiding excessive memory usage
|
||||
averageChunkSize = 1 << 12
|
||||
)
|
||||
|
||||
// mapChunkCache is a ChunkCache implementation that stores everything in an in memory map.
|
||||
// mapChunkCache is a simple ChunkCache implementation that stores
|
||||
// cached chunks and has records in two separate lru caches.
|
||||
type mapChunkCache struct {
|
||||
mu *sync.Mutex
|
||||
hashToChunk map[hash.Hash]nbs.ToChunker
|
||||
toFlush map[hash.Hash]nbs.ToChunker
|
||||
cm CapacityMonitor
|
||||
chunks *lru.TwoQueueCache[hash.Hash, nbs.ToChunker]
|
||||
has *lru.TwoQueueCache[hash.Hash, struct{}]
|
||||
}
|
||||
|
||||
const defaultCacheChunkCapacity = 32 * 1024
|
||||
const defaultCacheHasCapacity = 1024 * 1024
|
||||
|
||||
func newMapChunkCache() *mapChunkCache {
|
||||
return NewMapChunkCacheWithCapacity(defaultCacheChunkCapacity, defaultCacheHasCapacity)
|
||||
}
|
||||
|
||||
func NewMapChunkCacheWithCapacity(maxChunkCapacity, maxHasCapacity int) *mapChunkCache {
|
||||
chunks, err := lru.New2Q[hash.Hash, nbs.ToChunker](maxChunkCapacity)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
has, err := lru.New2Q[hash.Hash, struct{}](maxHasCapacity)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return &mapChunkCache{
|
||||
&sync.Mutex{},
|
||||
make(map[hash.Hash]nbs.ToChunker),
|
||||
make(map[hash.Hash]nbs.ToChunker),
|
||||
NewUncappedCapacityMonitor(),
|
||||
chunks,
|
||||
has,
|
||||
}
|
||||
}
|
||||
|
||||
// used by DoltHub API
|
||||
func NewMapChunkCacheWithMaxCapacity(maxCapacity int64) *mapChunkCache {
|
||||
return &mapChunkCache{
|
||||
&sync.Mutex{},
|
||||
make(map[hash.Hash]nbs.ToChunker),
|
||||
make(map[hash.Hash]nbs.ToChunker),
|
||||
NewFixedCapacityMonitor(maxCapacity),
|
||||
func (cache *mapChunkCache) InsertChunks(cs []nbs.ToChunker) {
|
||||
for _, c := range cs {
|
||||
cache.chunks.Add(c.Hash(), c)
|
||||
}
|
||||
}
|
||||
|
||||
// Put puts a slice of chunks into the cache. Returns an error if the cache capacity has been exceeded.
|
||||
func (mcc *mapChunkCache) Put(chnks []nbs.ToChunker) error {
|
||||
mcc.mu.Lock()
|
||||
defer mcc.mu.Unlock()
|
||||
|
||||
for i := 0; i < len(chnks); i++ {
|
||||
c := chnks[i]
|
||||
h := c.Hash()
|
||||
|
||||
if curr, ok := mcc.hashToChunk[h]; ok {
|
||||
if !curr.IsEmpty() {
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
if mcc.cm.CapacityExceeded(averageChunkSize) {
|
||||
return ErrCacheCapacityExceeded
|
||||
}
|
||||
|
||||
mcc.hashToChunk[h] = c
|
||||
|
||||
if !c.IsEmpty() {
|
||||
mcc.toFlush[h] = c
|
||||
func (cache *mapChunkCache) GetCachedChunks(hs hash.HashSet) map[hash.Hash]nbs.ToChunker {
|
||||
ret := make(map[hash.Hash]nbs.ToChunker)
|
||||
for h := range hs {
|
||||
c, ok := cache.chunks.Get(h)
|
||||
if ok {
|
||||
ret[h] = c
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
return ret
|
||||
}
|
||||
|
||||
// Get gets a map of hash to chunk for a set of hashes. In the event that a chunk is not in the cache, chunks.Empty.
|
||||
// is put in it's place
|
||||
func (mcc *mapChunkCache) Get(hashes hash.HashSet) map[hash.Hash]nbs.ToChunker {
|
||||
hashToChunk := make(map[hash.Hash]nbs.ToChunker)
|
||||
func (cache *mapChunkCache) InsertHas(hs hash.HashSet) {
|
||||
for h := range hs {
|
||||
cache.has.Add(h, struct{}{})
|
||||
}
|
||||
}
|
||||
|
||||
mcc.mu.Lock()
|
||||
defer mcc.mu.Unlock()
|
||||
|
||||
for h := range hashes {
|
||||
if c, ok := mcc.hashToChunk[h]; ok {
|
||||
hashToChunk[h] = c
|
||||
} else {
|
||||
hashToChunk[h] = nbs.EmptyCompressedChunk
|
||||
func (cache *mapChunkCache) GetCachedHas(hs hash.HashSet) (absent hash.HashSet) {
|
||||
ret := make(hash.HashSet)
|
||||
for h := range hs {
|
||||
if !cache.has.Contains(h) {
|
||||
ret.Insert(h)
|
||||
}
|
||||
}
|
||||
|
||||
return hashToChunk
|
||||
}
|
||||
|
||||
// Has takes a set of hashes and returns the set of hashes that the cache currently does not have in it.
|
||||
func (mcc *mapChunkCache) Has(hashes hash.HashSet) (absent hash.HashSet) {
|
||||
absent = make(hash.HashSet)
|
||||
|
||||
mcc.mu.Lock()
|
||||
defer mcc.mu.Unlock()
|
||||
|
||||
for h := range hashes {
|
||||
if _, ok := mcc.hashToChunk[h]; !ok {
|
||||
absent[h] = struct{}{}
|
||||
}
|
||||
}
|
||||
|
||||
return absent
|
||||
}
|
||||
|
||||
func (mcc *mapChunkCache) PutChunk(ch nbs.ToChunker) error {
|
||||
mcc.mu.Lock()
|
||||
defer mcc.mu.Unlock()
|
||||
|
||||
h := ch.Hash()
|
||||
if existing, ok := mcc.hashToChunk[h]; !ok || existing.IsEmpty() {
|
||||
if mcc.cm.CapacityExceeded(averageChunkSize) {
|
||||
return ErrCacheCapacityExceeded
|
||||
}
|
||||
mcc.hashToChunk[h] = ch
|
||||
mcc.toFlush[h] = ch
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetAndClearChunksToFlush gets a map of hash to chunk which includes all the chunks that were put in the cache
|
||||
// between the last time GetAndClearChunksToFlush was called and now.
|
||||
func (mcc *mapChunkCache) GetAndClearChunksToFlush() map[hash.Hash]nbs.ToChunker {
|
||||
newToFlush := make(map[hash.Hash]nbs.ToChunker)
|
||||
|
||||
mcc.mu.Lock()
|
||||
defer mcc.mu.Unlock()
|
||||
|
||||
toFlush := mcc.toFlush
|
||||
mcc.toFlush = newToFlush
|
||||
|
||||
return toFlush
|
||||
return ret
|
||||
}
|
||||
|
||||
@@ -15,10 +15,8 @@
|
||||
package remotestorage
|
||||
|
||||
import (
|
||||
"math/rand"
|
||||
"reflect"
|
||||
"math/rand/v2"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
|
||||
@@ -27,68 +25,111 @@ import (
|
||||
"github.com/dolthub/dolt/go/store/nbs"
|
||||
)
|
||||
|
||||
func genRandomChunks(rng *rand.Rand, n int) (hash.HashSet, []nbs.ToChunker) {
|
||||
chks := make([]nbs.ToChunker, n)
|
||||
hashes := make(hash.HashSet)
|
||||
for i := 0; i < n; i++ {
|
||||
size := int(rng.Int31n(99) + 1)
|
||||
bytes := make([]byte, size)
|
||||
for j := 0; j < size; j++ {
|
||||
bytes[j] = byte(rng.Int31n(255))
|
||||
func TestMapChunkCache(t *testing.T) {
|
||||
t.Run("New", func(t *testing.T) {
|
||||
assert.NotNil(t, newMapChunkCache())
|
||||
assert.NotNil(t, NewMapChunkCacheWithCapacity(32, 32))
|
||||
assert.Panics(t, func() {
|
||||
assert.NotNil(t, NewMapChunkCacheWithCapacity(-1, 32))
|
||||
})
|
||||
assert.Panics(t, func() {
|
||||
assert.NotNil(t, NewMapChunkCacheWithCapacity(32, -1))
|
||||
})
|
||||
})
|
||||
t.Run("CachesChunks", func(t *testing.T) {
|
||||
var seed [32]byte
|
||||
rand := rand.NewChaCha8(seed)
|
||||
cache := NewMapChunkCacheWithCapacity(8, 8)
|
||||
inserted := make(hash.HashSet)
|
||||
// Insert some chunks.
|
||||
for i := 0; i < 8; i++ {
|
||||
bs := make([]byte, 512)
|
||||
rand.Read(bs)
|
||||
chk := chunks.NewChunk(bs)
|
||||
inserted.Insert(chk.Hash())
|
||||
cc := nbs.ChunkToCompressedChunk(chk)
|
||||
cache.InsertChunks([]nbs.ToChunker{cc})
|
||||
}
|
||||
|
||||
chk := nbs.ChunkToCompressedChunk(chunks.NewChunk(bytes))
|
||||
chks[i] = chk
|
||||
// Query for those chunks, plus some that were not inserted.
|
||||
query := make(hash.HashSet)
|
||||
for h := range inserted {
|
||||
query.Insert(h)
|
||||
}
|
||||
for i := 0; i < 8; i++ {
|
||||
var bs [512]byte
|
||||
rand.Read(bs[:])
|
||||
query.Insert(hash.Of(bs[:]))
|
||||
}
|
||||
|
||||
hashes[chk.Hash()] = struct{}{}
|
||||
}
|
||||
// Only got back the inserted chunks...
|
||||
cached := cache.GetCachedChunks(query)
|
||||
assert.Len(t, cached, 8)
|
||||
|
||||
return hashes, chks
|
||||
}
|
||||
|
||||
func TestMapChunkCache(t *testing.T) {
|
||||
const chunkBatchSize = 10
|
||||
|
||||
seed := time.Now().UnixNano()
|
||||
rng := rand.New(rand.NewSource(seed))
|
||||
hashes, chks := genRandomChunks(rng, chunkBatchSize)
|
||||
|
||||
mapChunkCache := newMapChunkCache()
|
||||
mapChunkCache.Put(chks)
|
||||
hashToChunk := mapChunkCache.Get(hashes)
|
||||
|
||||
assert.Equal(t, len(hashToChunk), chunkBatchSize, "Did not read back all chunks (seed %d)", seed)
|
||||
|
||||
absent := mapChunkCache.Has(hashes)
|
||||
|
||||
assert.Equal(t, len(absent), 0, "Missing chunks that were added (seed %d)", seed)
|
||||
|
||||
toFlush := mapChunkCache.GetAndClearChunksToFlush()
|
||||
|
||||
assert.True(t, reflect.DeepEqual(toFlush, hashToChunk), "unexpected or missing chunks to flush (seed %d)", seed)
|
||||
|
||||
moreHashes, moreChks := genRandomChunks(rng, chunkBatchSize)
|
||||
|
||||
joinedHashes := make(hash.HashSet)
|
||||
|
||||
for h := range hashes {
|
||||
joinedHashes[h] = struct{}{}
|
||||
}
|
||||
|
||||
for h := range moreHashes {
|
||||
joinedHashes[h] = struct{}{}
|
||||
}
|
||||
|
||||
absent = mapChunkCache.Has(joinedHashes)
|
||||
|
||||
assert.True(t, reflect.DeepEqual(absent, moreHashes), "unexpected absent hashset (seed %d)", seed)
|
||||
|
||||
mapChunkCache.PutChunk(chks[0])
|
||||
mapChunkCache.PutChunk(moreChks[0])
|
||||
|
||||
toFlush = mapChunkCache.GetAndClearChunksToFlush()
|
||||
|
||||
expected := map[hash.Hash]nbs.ToChunker{moreChks[0].Hash(): moreChks[0]}
|
||||
eq := reflect.DeepEqual(toFlush, expected)
|
||||
assert.True(t, eq, "Missing or unexpected chunks to flush (seed %d)", seed)
|
||||
// If we insert more than our max size, and query
|
||||
// for everything inserted, we get back a result
|
||||
// set matching our max size.
|
||||
for i := 0; i < 64; i++ {
|
||||
bs := make([]byte, 512)
|
||||
rand.Read(bs)
|
||||
chk := chunks.NewChunk(bs)
|
||||
inserted.Insert(chk.Hash())
|
||||
cc := nbs.ChunkToCompressedChunk(chk)
|
||||
cache.InsertChunks([]nbs.ToChunker{cc})
|
||||
}
|
||||
cached = cache.GetCachedChunks(inserted)
|
||||
assert.Len(t, cached, 8)
|
||||
})
|
||||
t.Run("CachesHasRecords", func(t *testing.T) {
|
||||
var seed [32]byte
|
||||
rand := rand.NewChaCha8(seed)
|
||||
cache := NewMapChunkCacheWithCapacity(8, 8)
|
||||
query := make(hash.HashSet)
|
||||
for i := 0; i < 64; i++ {
|
||||
var bs [512]byte
|
||||
rand.Read(bs[:])
|
||||
query.Insert(hash.Of(bs[:]))
|
||||
}
|
||||
|
||||
// Querying an empty cache returns all the hashes.
|
||||
res := cache.GetCachedHas(query)
|
||||
assert.NotSame(t, res, query)
|
||||
assert.Len(t, res, 64)
|
||||
for h := range query {
|
||||
_, ok := res[h]
|
||||
assert.True(t, ok, "everything in query is in res")
|
||||
}
|
||||
|
||||
// Insert 8 of our query hashes into the cache.
|
||||
insert := make(hash.HashSet)
|
||||
insertTwo := make(hash.HashSet)
|
||||
i := 0
|
||||
for h := range query {
|
||||
if i < 8 {
|
||||
insert.Insert(h)
|
||||
} else {
|
||||
insertTwo.Insert(h)
|
||||
}
|
||||
i += 1
|
||||
if i == 16 {
|
||||
break
|
||||
}
|
||||
}
|
||||
cache.InsertHas(insert)
|
||||
|
||||
// Querying our original query set returns expected results.
|
||||
res = cache.GetCachedHas(query)
|
||||
assert.Len(t, res, 64-8)
|
||||
for h := range query {
|
||||
if _, ok := insert[h]; !ok {
|
||||
_, ok = res[h]
|
||||
assert.True(t, ok, "everything in query that is not in insert is in res")
|
||||
}
|
||||
}
|
||||
|
||||
// Inserting another 8 hashes hits max limit. Only 8 records cached.
|
||||
cache.InsertHas(insertTwo)
|
||||
res = cache.GetCachedHas(query)
|
||||
assert.Len(t, res, 64-8)
|
||||
})
|
||||
}
|
||||
|
||||
@@ -20,31 +20,22 @@ import (
|
||||
)
|
||||
|
||||
// noopChunkCache is a ChunkCache implementation that stores nothing
|
||||
// ever. Using a noopChunkCache with a remotestore.DoltChunkStore
|
||||
// will cause the DoltChunkStore to behave incorrectly when _writing_
|
||||
// dolt repositories; this should only be used for read-only use
|
||||
// cases.
|
||||
// ever. This causes all fetches to go to the remote server.
|
||||
var noopChunkCache = &noopChunkCacheImpl{}
|
||||
|
||||
type noopChunkCacheImpl struct {
|
||||
}
|
||||
|
||||
func (*noopChunkCacheImpl) Put(chnks []nbs.ToChunker) error {
|
||||
func (*noopChunkCacheImpl) InsertChunks(cs []nbs.ToChunker) {
|
||||
}
|
||||
|
||||
func (*noopChunkCacheImpl) GetCachedChunks(h hash.HashSet) map[hash.Hash]nbs.ToChunker {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (*noopChunkCacheImpl) Get(hashes hash.HashSet) map[hash.Hash]nbs.ToChunker {
|
||||
return make(map[hash.Hash]nbs.ToChunker)
|
||||
func (*noopChunkCacheImpl) InsertHas(h hash.HashSet) {
|
||||
}
|
||||
|
||||
func (*noopChunkCacheImpl) Has(hashes hash.HashSet) (absent hash.HashSet) {
|
||||
return hashes
|
||||
}
|
||||
|
||||
func (*noopChunkCacheImpl) PutChunk(ch nbs.ToChunker) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (*noopChunkCacheImpl) GetAndClearChunksToFlush() map[hash.Hash]nbs.ToChunker {
|
||||
panic("noopChunkCache does not support GetAndClearChunksToFlush().")
|
||||
func (*noopChunkCacheImpl) GetCachedHas(h hash.HashSet) (absent hash.HashSet) {
|
||||
return h
|
||||
}
|
||||
|
||||
@@ -0,0 +1,163 @@
|
||||
// Copyright 2025 Dolthub, Inc.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package remotestorage
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"sync"
|
||||
|
||||
"github.com/dolthub/dolt/go/store/hash"
|
||||
"github.com/dolthub/dolt/go/store/nbs"
|
||||
)
|
||||
|
||||
type WriteBuffer interface {
|
||||
// Add a compressed chunk to the write buffer. It will be
|
||||
// returned from future calls to |GetAllForWrite| until a
|
||||
// write is successful.
|
||||
Put(nbs.CompressedChunk) error
|
||||
|
||||
// Returns the current set of written chunks. After this
|
||||
// returns, concurrent calls to other methods may block until
|
||||
// |WriteCompleted| is called. Calls to |GetAllForWrite| must
|
||||
// be followed by a call to |WriteCompleted| once the write
|
||||
// attempt is finished.
|
||||
GetAllForWrite() map[hash.Hash]nbs.CompressedChunk
|
||||
|
||||
// Called after a call to |GetAllForWrite|, this records
|
||||
// success or failure of the write operation. If the write
|
||||
// operation was successful, then the written chunks are now
|
||||
// in the upstream and they can be cleared from the write
|
||||
// buffer. Otherwise, the written chunks are retained in the
|
||||
// write buffer so that the write can be retried.
|
||||
WriteCompleted(success bool)
|
||||
|
||||
// ChunkStore clients expect to read their own writes before a
|
||||
// commit. On the get path, remotestorage should add buffered
|
||||
// chunks matching a given |query| to its |result|. On the
|
||||
// HasMany path, remotestorage should remove present chunks
|
||||
// from its absent set on the HasMany response.
|
||||
AddBufferedChunks(query hash.HashSet, result map[hash.Hash]nbs.ToChunker)
|
||||
// Removes the addresses of any buffered chunks from |hashes|.
|
||||
// Used to filter the |absent| response of a HasMany call so
|
||||
// that buffered chunks are not considered absent.
|
||||
RemovePresentChunks(hashes hash.HashSet)
|
||||
}
|
||||
|
||||
type noopWriteBuffer struct {
|
||||
}
|
||||
|
||||
var _ WriteBuffer = noopWriteBuffer{}
|
||||
|
||||
func (noopWriteBuffer) Put(nbs.CompressedChunk) error {
|
||||
return errors.New("unsupported operation: write on a read-only remotestorage chunk store")
|
||||
}
|
||||
|
||||
func (noopWriteBuffer) GetAllForWrite() map[hash.Hash]nbs.CompressedChunk {
|
||||
panic("attempt to upload chunks on a read-only remotestorage chunk store")
|
||||
}
|
||||
|
||||
func (noopWriteBuffer) WriteCompleted(success bool) {
|
||||
panic("call to WriteCompleted on a noopWriteBuffer")
|
||||
}
|
||||
|
||||
func (noopWriteBuffer) AddBufferedChunks(hash.HashSet, map[hash.Hash]nbs.ToChunker) {
|
||||
}
|
||||
|
||||
func (noopWriteBuffer) RemovePresentChunks(hash.HashSet) {
|
||||
}
|
||||
|
||||
// A simple WriteBuffer which buffers unlimited data in memory and
|
||||
// waits to flush it.
|
||||
type mapWriteBuffer struct {
|
||||
mu sync.Mutex
|
||||
cond sync.Cond
|
||||
// Set when an outstanding write is in progress, |Put| will
|
||||
// block against this. Reset by |WriteCompleted| after the
|
||||
// appropriate updates to |chunks| have been made.
|
||||
writing bool
|
||||
chunks map[hash.Hash]nbs.CompressedChunk
|
||||
}
|
||||
|
||||
func newMapWriteBuffer() *mapWriteBuffer {
|
||||
ret := &mapWriteBuffer{
|
||||
chunks: make(map[hash.Hash]nbs.CompressedChunk),
|
||||
}
|
||||
ret.cond.L = &ret.mu
|
||||
return ret
|
||||
}
|
||||
|
||||
func (b *mapWriteBuffer) Put(cc nbs.CompressedChunk) error {
|
||||
b.mu.Lock()
|
||||
defer b.mu.Unlock()
|
||||
for b.writing {
|
||||
b.cond.Wait()
|
||||
}
|
||||
b.chunks[cc.H] = cc
|
||||
return nil
|
||||
}
|
||||
|
||||
func (b *mapWriteBuffer) GetAllForWrite() map[hash.Hash]nbs.CompressedChunk {
|
||||
b.mu.Lock()
|
||||
defer b.mu.Unlock()
|
||||
for b.writing {
|
||||
b.cond.Wait()
|
||||
}
|
||||
b.writing = true
|
||||
return b.chunks
|
||||
}
|
||||
|
||||
func (b *mapWriteBuffer) WriteCompleted(success bool) {
|
||||
b.mu.Lock()
|
||||
defer b.mu.Unlock()
|
||||
if !b.writing {
|
||||
panic("mapWriteBuffer got WriteCompleted while no write was in progress")
|
||||
}
|
||||
b.writing = false
|
||||
if success {
|
||||
b.chunks = make(map[hash.Hash]nbs.CompressedChunk)
|
||||
}
|
||||
b.cond.Broadcast()
|
||||
}
|
||||
|
||||
func (b *mapWriteBuffer) AddBufferedChunks(hs hash.HashSet, res map[hash.Hash]nbs.ToChunker) {
|
||||
b.mu.Lock()
|
||||
defer b.mu.Unlock()
|
||||
for h := range hs {
|
||||
cc, ok := b.chunks[h]
|
||||
if ok {
|
||||
res[h] = cc
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (b *mapWriteBuffer) RemovePresentChunks(hashes hash.HashSet) {
|
||||
b.mu.Lock()
|
||||
defer b.mu.Unlock()
|
||||
if len(b.chunks) < len(hashes) {
|
||||
for h := range b.chunks {
|
||||
hashes.Remove(h)
|
||||
}
|
||||
} else {
|
||||
var toRemove []hash.Hash
|
||||
for h := range hashes {
|
||||
if _, ok := b.chunks[h]; ok {
|
||||
toRemove = append(toRemove, h)
|
||||
}
|
||||
}
|
||||
for _, h := range toRemove {
|
||||
hashes.Remove(h)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,154 @@
|
||||
// Copyright 2025 Dolthub, Inc.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package remotestorage
|
||||
|
||||
import (
|
||||
"math/rand/v2"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
|
||||
"github.com/dolthub/dolt/go/store/chunks"
|
||||
"github.com/dolthub/dolt/go/store/hash"
|
||||
"github.com/dolthub/dolt/go/store/nbs"
|
||||
)
|
||||
|
||||
func TestNoopWriteBuffer(t *testing.T) {
|
||||
cache := noopWriteBuffer{}
|
||||
err := cache.Put(nbs.CompressedChunk{})
|
||||
assert.NotNil(t, err)
|
||||
assert.Panics(t, func() {
|
||||
cache.GetAllForWrite()
|
||||
})
|
||||
assert.Panics(t, func() {
|
||||
cache.WriteCompleted(false)
|
||||
})
|
||||
cache.AddBufferedChunks(make(hash.HashSet), make(map[hash.Hash]nbs.ToChunker))
|
||||
cache.RemovePresentChunks(make(hash.HashSet))
|
||||
}
|
||||
|
||||
func TestMapWriteBuffer(t *testing.T) {
|
||||
t.Run("SmokeTest", func(t *testing.T) {
|
||||
// A bit of a typical usage...
|
||||
cache := newMapWriteBuffer()
|
||||
var seed [32]byte
|
||||
rand := rand.NewChaCha8(seed)
|
||||
query := make(hash.HashSet)
|
||||
for i := 0; i < 64; i++ {
|
||||
var bs [512]byte
|
||||
rand.Read(bs[:])
|
||||
query.Insert(hash.Of(bs[:]))
|
||||
}
|
||||
res := make(map[hash.Hash]nbs.ToChunker)
|
||||
cache.AddBufferedChunks(query, res)
|
||||
assert.Len(t, res, 0)
|
||||
|
||||
// Insert some chunks.
|
||||
inserted := make(hash.HashSet)
|
||||
for i := 0; i < 8; i++ {
|
||||
bs := make([]byte, 512)
|
||||
rand.Read(bs)
|
||||
chk := chunks.NewChunk(bs)
|
||||
cache.Put(nbs.ChunkToCompressedChunk(chk))
|
||||
inserted.Insert(chk.Hash())
|
||||
}
|
||||
cache.AddBufferedChunks(query, res)
|
||||
assert.Len(t, res, 0)
|
||||
for h := range inserted {
|
||||
query.Insert(h)
|
||||
}
|
||||
cache.AddBufferedChunks(query, res)
|
||||
assert.Len(t, res, 8)
|
||||
|
||||
cache.RemovePresentChunks(query)
|
||||
assert.Len(t, query, 64)
|
||||
for h := range inserted {
|
||||
query.Insert(h)
|
||||
}
|
||||
|
||||
// Cache continues working for reads during a pending write.
|
||||
toWrite := cache.GetAllForWrite()
|
||||
assert.Len(t, toWrite, 8)
|
||||
res = make(map[hash.Hash]nbs.ToChunker)
|
||||
cache.AddBufferedChunks(query, res)
|
||||
assert.Len(t, res, 8)
|
||||
cache.RemovePresentChunks(query)
|
||||
assert.Len(t, query, 64)
|
||||
|
||||
// After a failure, chunks are still present.
|
||||
cache.WriteCompleted(false)
|
||||
toWrite = cache.GetAllForWrite()
|
||||
assert.Len(t, toWrite, 8)
|
||||
// And after a success, they are cleared.
|
||||
cache.WriteCompleted(true)
|
||||
toWrite = cache.GetAllForWrite()
|
||||
assert.Len(t, toWrite, 0)
|
||||
cache.WriteCompleted(true)
|
||||
})
|
||||
t.Run("ConcurrentPuts", func(t *testing.T) {
|
||||
cache := newMapWriteBuffer()
|
||||
var seed [32]byte
|
||||
seedRand := rand.NewChaCha8(seed)
|
||||
const numThreads = 16
|
||||
var wg sync.WaitGroup
|
||||
// One thread is writing and failing...
|
||||
wg.Add(1)
|
||||
var writes int
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
for {
|
||||
cached := cache.GetAllForWrite()
|
||||
writes += 1
|
||||
time.Sleep(5 * time.Millisecond)
|
||||
if len(cached) == numThreads*32 {
|
||||
cache.WriteCompleted(false)
|
||||
return
|
||||
}
|
||||
cache.WriteCompleted(false)
|
||||
time.Sleep(100 * time.Microsecond)
|
||||
}
|
||||
}()
|
||||
wg.Add(numThreads)
|
||||
var inserted [numThreads][]hash.Hash
|
||||
for i := 0; i < numThreads; i++ {
|
||||
var seed [32]byte
|
||||
seedRand.Read(seed[:])
|
||||
randCha := rand.NewChaCha8(seed)
|
||||
go func() {
|
||||
for j := 0; j < 32; j++ {
|
||||
var bs [512]byte
|
||||
randCha.Read(bs[:])
|
||||
chk := chunks.NewChunk(bs[:])
|
||||
cache.Put(nbs.ChunkToCompressedChunk(chk))
|
||||
inserted[i] = append(inserted[i], chk.Hash())
|
||||
}
|
||||
defer wg.Done()
|
||||
}()
|
||||
}
|
||||
wg.Wait()
|
||||
// All writes failed. Let's make sure we have everything we expect.
|
||||
cached := cache.GetAllForWrite()
|
||||
assert.Len(t, cached, 32*numThreads)
|
||||
for i := range inserted {
|
||||
for _, h := range inserted[i] {
|
||||
_, ok := cached[h]
|
||||
assert.True(t, ok)
|
||||
}
|
||||
}
|
||||
cache.WriteCompleted(true)
|
||||
})
|
||||
}
|
||||
Reference in New Issue
Block a user