Merge remote-tracking branch 'origin/main' into aaron/autogc

Aaron Son
2025-02-21 08:41:59 -08:00
14 changed files with 752 additions and 295 deletions

View File

@@ -206,6 +206,8 @@ var getWriteableSystemTables = func() []string {
}
}
// getGeneratedSystemTables is a function which returns the names of all generated system tables. This is not
// simply a list of constants because doltgres swaps out the functions used to generate different names.
var getGeneratedSystemTables = func() []string {
return []string{
GetBranchesTableName(),
@@ -218,6 +220,7 @@ var getGeneratedSystemTables = func() []string {
GetStatusTableName(),
GetRemotesTableName(),
GetHelpTableName(),
GetBackupsTableName(),
}
}
@@ -386,6 +389,10 @@ var GetHelpTableName = func() string {
return HelpTableName
}
var GetBackupsTableName = func() string {
return BackupsTableName
}
const (
// LogTableName is the log system table name
LogTableName = "dolt_log"
@@ -593,5 +600,6 @@ const (
)
const (
HelpTableName = "dolt_help"
BackupsTableName = "dolt_backups"
)
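
The getters above are package-level vars of func type rather than constants so that an embedding project (per the comment, Doltgres) can swap in its own generated names. A minimal sketch of what such an override might look like, assuming the usual doltdb import path; the package name and "pg_" prefix are purely illustrative:

package nameoverride // hypothetical package, for illustration only

import "github.com/dolthub/dolt/go/libraries/doltcore/doltdb"

// init swaps in a different generated name for the backups system table.
// This works because GetBackupsTableName is a var of func type, not a
// constant; the "pg_" prefix is made up for this sketch.
func init() {
	doltdb.GetBackupsTableName = func() string {
		return "pg_" + doltdb.BackupsTableName
	}
}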

View File

@@ -19,22 +19,21 @@ import (
"github.com/dolthub/dolt/go/store/nbs"
)
// ChunkCache is an interface used for caching chunks
// ChunkCache is an interface used for caching chunks and has-presence records that
// have already been fetched from remotestorage. Care should be taken when
// using ChunkCache if it is possible for the remote to GC, since in that
// case the cache could contain stale data.
type ChunkCache interface {
// Put puts a slice of chunks into the cache. Error returned if the cache capacity has been exceeded.
Put(c []nbs.ToChunker) error
// Insert some observed / fetched chunks into the cache. These
// chunks may or may not be returned in the future.
InsertChunks(cs []nbs.ToChunker)
// Get previously cached chunks, if they are still available.
GetCachedChunks(h hash.HashSet) map[hash.Hash]nbs.ToChunker
// Get gets a map of hash to chunk for a set of hashes. In the event that a chunk is not in the cache, chunks.Empty
// is put in its place.
Get(h hash.HashSet) map[hash.Hash]nbs.ToChunker
// Has takes a set of hashes and returns the set of hashes that the cache currently does not have in it.
Has(h hash.HashSet) (absent hash.HashSet)
// PutChunk puts a single chunk in the cache. Returns an error if the cache capacity has been exceeded.
PutChunk(chunk nbs.ToChunker) error
// GetAndClearChunksToFlush gets a map of hash to chunk which includes all the chunks that were put in the cache
// between the last time GetAndClearChunksToFlush was called and now.
GetAndClearChunksToFlush() map[hash.Hash]nbs.ToChunker
// Insert all hashes in |h| as existing in the remote.
InsertHas(h hash.HashSet)
// Returns the absent set from |h|, filtering it by records
// which are known to be present in the remote based on
// previous |InsertHas| calls.
GetCachedHas(h hash.HashSet) (absent hash.HashSet)
}
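
A rough sketch of how a read path might layer on this interface: serve what the cache already has, fetch the rest, then record both the fetched chunks and their presence for next time. This is not the real implementation; the fetchFromRemote callback stands in for the actual gRPC download path.

package remotestorage

import (
	"github.com/dolthub/dolt/go/store/hash"
	"github.com/dolthub/dolt/go/store/nbs"
)

// getThroughCache is a sketch, not part of the real implementation.
func getThroughCache(
	cache ChunkCache,
	want hash.HashSet,
	fetchFromRemote func(hash.HashSet) ([]nbs.ToChunker, error),
	found func(nbs.ToChunker),
) error {
	cached := cache.GetCachedChunks(want)
	missing := make(hash.HashSet)
	for h := range want {
		if c, ok := cached[h]; ok {
			found(c)
		} else {
			missing.Insert(h)
		}
	}
	if len(missing) == 0 {
		return nil
	}
	fetched, err := fetchFromRemote(missing)
	if err != nil {
		return err
	}
	got := make(hash.HashSet)
	for _, c := range fetched {
		got.Insert(c.Hash())
		found(c)
	}
	// Remember both the chunk contents and their presence so future
	// Get and HasMany calls can be answered locally.
	cache.InsertChunks(fetched)
	cache.InsertHas(got)
	return nil
}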

View File

@@ -46,7 +46,7 @@ import (
"github.com/dolthub/dolt/go/store/types"
)
var ErrCacheCapacityExceeded = errors.New("too much data: the cache capacity has been reached")
var ErrWriteBufferCapacityExceeded = errors.New("too much data: the write buffer capacity has been reached")
var ErrUploadFailed = errors.New("upload failed")
@@ -123,6 +123,7 @@ type DoltChunkStore struct {
root hash.Hash
csClient remotesapi.ChunkStoreServiceClient
finalizer func() error
wb WriteBuffer
cache ChunkCache
metadata *remotesapi.GetRepoMetadataResponse
nbf *types.NomsBinFormat
@@ -172,6 +173,7 @@ func NewDoltChunkStoreFromPath(ctx context.Context, nbf *types.NomsBinFormat, pa
csClient: csClient,
finalizer: func() error { return nil },
cache: newMapChunkCache(),
wb: newMapWriteBuffer(),
metadata: metadata,
nbf: nbf,
httpFetcher: globalHttpFetcher,
@@ -185,79 +187,46 @@ func NewDoltChunkStoreFromPath(ctx context.Context, nbf *types.NomsBinFormat, pa
return cs, nil
}
func (dcs *DoltChunkStore) clone() *DoltChunkStore {
ret := *dcs
ret.repoToken = new(atomic.Value)
return &ret
}
func (dcs *DoltChunkStore) WithHTTPFetcher(fetcher HTTPFetcher) *DoltChunkStore {
return &DoltChunkStore{
repoId: dcs.repoId,
repoPath: dcs.repoPath,
repoToken: new(atomic.Value),
host: dcs.host,
root: dcs.root,
csClient: dcs.csClient,
finalizer: dcs.finalizer,
cache: dcs.cache,
metadata: dcs.metadata,
nbf: dcs.nbf,
httpFetcher: fetcher,
params: dcs.params,
stats: dcs.stats,
}
ret := dcs.clone()
ret.httpFetcher = fetcher
return ret
}
func (dcs *DoltChunkStore) WithNoopWriteBuffer() *DoltChunkStore {
ret := dcs.clone()
ret.wb = noopWriteBuffer{}
return ret
}
func (dcs *DoltChunkStore) WithWriteBuffer(wb WriteBuffer) *DoltChunkStore {
ret := dcs.clone()
ret.wb = wb
return ret
}
func (dcs *DoltChunkStore) WithNoopChunkCache() *DoltChunkStore {
return &DoltChunkStore{
repoId: dcs.repoId,
repoPath: dcs.repoPath,
repoToken: new(atomic.Value),
host: dcs.host,
root: dcs.root,
csClient: dcs.csClient,
finalizer: dcs.finalizer,
cache: noopChunkCache,
metadata: dcs.metadata,
nbf: dcs.nbf,
httpFetcher: dcs.httpFetcher,
params: dcs.params,
stats: dcs.stats,
logger: dcs.logger,
}
ret := dcs.clone()
ret.cache = noopChunkCache
return ret
}
func (dcs *DoltChunkStore) WithChunkCache(cache ChunkCache) *DoltChunkStore {
return &DoltChunkStore{
repoId: dcs.repoId,
repoPath: dcs.repoPath,
repoToken: new(atomic.Value),
host: dcs.host,
root: dcs.root,
csClient: dcs.csClient,
finalizer: dcs.finalizer,
cache: cache,
metadata: dcs.metadata,
nbf: dcs.nbf,
httpFetcher: dcs.httpFetcher,
params: dcs.params,
stats: dcs.stats,
logger: dcs.logger,
}
ret := dcs.clone()
ret.cache = cache
return ret
}
func (dcs *DoltChunkStore) WithNetworkRequestParams(params NetworkRequestParams) *DoltChunkStore {
return &DoltChunkStore{
repoId: dcs.repoId,
repoPath: dcs.repoPath,
repoToken: new(atomic.Value),
host: dcs.host,
root: dcs.root,
csClient: dcs.csClient,
finalizer: dcs.finalizer,
cache: dcs.cache,
metadata: dcs.metadata,
nbf: dcs.nbf,
httpFetcher: dcs.httpFetcher,
params: params,
stats: dcs.stats,
logger: dcs.logger,
}
ret := dcs.clone()
ret.params = params
return ret
}
func (dcs *DoltChunkStore) SetLogger(logger chunks.DebugLogger) {
@@ -344,19 +313,18 @@ func (dcs *DoltChunkStore) GetManyCompressed(ctx context.Context, hashes hash.Ha
ctx, span := tracer.Start(ctx, "remotestorage.GetManyCompressed")
defer span.End()
hashToChunk := dcs.cache.Get(hashes)
hashToChunk := dcs.cache.GetCachedChunks(hashes)
dcs.wb.AddBufferedChunks(hashes, hashToChunk)
span.SetAttributes(attribute.Int("num_hashes", len(hashes)), attribute.Int("cache_hits", len(hashToChunk)))
atomic.AddUint32(&dcs.stats.Hits, uint32(len(hashToChunk)))
notCached := make([]hash.Hash, 0, len(hashes))
for h := range hashes {
c := hashToChunk[h]
if c == nil || c.IsEmpty() {
notCached = append(notCached, h)
} else {
if c, ok := hashToChunk[h]; ok {
found(ctx, c)
} else {
notCached = append(notCached, h)
}
}
@@ -633,9 +601,7 @@ func (dcs *DoltChunkStore) readChunksAndCache(ctx context.Context, hashes []hash
}
// Don't forward on empty/not found chunks.
if !cc.IsEmpty() {
if err := dcs.cache.PutChunk(cc); err != nil {
return err
}
dcs.cache.InsertChunks([]nbs.ToChunker{cc})
found(egCtx, cc)
}
}
@@ -663,8 +629,8 @@ const maxHasManyBatchSize = 16 * 1024
// absent from the store.
func (dcs *DoltChunkStore) HasMany(ctx context.Context, hashes hash.HashSet) (hash.HashSet, error) {
// get the set of hashes that isn't already in the cache
notCached := dcs.cache.Has(hashes)
notCached := dcs.cache.GetCachedHas(hashes)
dcs.wb.RemovePresentChunks(notCached)
if len(notCached) == 0 {
return notCached, nil
}
@@ -673,7 +639,7 @@ func (dcs *DoltChunkStore) HasMany(ctx context.Context, hashes hash.HashSet) (ha
hashSl, byteSl := HashSetToSlices(notCached)
absent := make(hash.HashSet)
var found []nbs.ToChunker
found := make(hash.HashSet)
var err error
batchItr(len(hashSl), maxHasManyBatchSize, func(st, end int) (stop bool) {
@@ -714,8 +680,7 @@ func (dcs *DoltChunkStore) HasMany(ctx context.Context, hashes hash.HashSet) (ha
absent[currHash] = struct{}{}
j++
} else {
c := nbs.ChunkToCompressedChunk(chunks.NewChunkWithHash(currHash, []byte{}))
found = append(found, c)
found.Insert(currHash)
}
}
@@ -731,9 +696,7 @@ func (dcs *DoltChunkStore) HasMany(ctx context.Context, hashes hash.HashSet) (ha
}
if len(found) > 0 {
if err := dcs.cache.Put(found); err != nil {
return hash.HashSet{}, err
}
dcs.cache.InsertHas(found)
}
return absent, nil
@@ -767,7 +730,8 @@ func (dcs *DoltChunkStore) Put(ctx context.Context, c chunks.Chunk, getAddrs chu
}
cc := nbs.ChunkToCompressedChunk(c)
if err := dcs.cache.Put([]nbs.ToChunker{cc}); err != nil {
err = dcs.wb.Put(cc)
if err != nil {
return err
}
return nil
@@ -850,8 +814,26 @@ func (dcs *DoltChunkStore) loadRoot(ctx context.Context) error {
// persisted root hash from last to current (or keeps it the same).
// If last doesn't match the root in persistent storage, returns false.
func (dcs *DoltChunkStore) Commit(ctx context.Context, current, last hash.Hash) (bool, error) {
hashToChunkCount, err := dcs.uploadChunks(ctx)
toUpload := dcs.wb.GetAllForWrite()
var resp *remotesapi.CommitResponse
defer func() {
// We record success based on the CommitResponse
// |Success| field, which is only |true| when the call
// successfully updated the root hash of the
// remote. With the current API, we cannot distinguish
// the case where the commit failed because |last| was
// stale but the provided chunks were still
// successfully added to the remote. If the write is
// retried in such a case, we will currently write the
// chunks to the remote again.
if resp != nil {
dcs.wb.WriteCompleted(resp.Success)
} else {
dcs.wb.WriteCompleted(false)
}
}()
hashToChunkCount, err := dcs.uploadChunks(ctx, toUpload)
if err != nil {
return false, err
}
@@ -873,7 +855,7 @@ func (dcs *DoltChunkStore) Commit(ctx context.Context, current, last hash.Hash)
NbsVersion: nbs.StorageVersion,
},
}
resp, err := dcs.csClient.Commit(ctx, req)
resp, err = dcs.csClient.Commit(ctx, req)
if err != nil {
return false, NewRpcError(err, "Commit", dcs.host, req)
}
@@ -911,10 +893,11 @@ func (dcs *DoltChunkStore) Close() error {
return dcs.finalizer()
}
// getting this working using the simplest approach first
func (dcs *DoltChunkStore) uploadChunks(ctx context.Context) (map[hash.Hash]int, error) {
hashToChunk := dcs.cache.GetAndClearChunksToFlush()
// Uploads all chunks in |hashToChunk| to the remote store and returns
// the manifest entries that correspond to the new table files. Used
// by |Commit|. Typically |hashToChunk| will have come from our |wb|
// WriteBuffer.
func (dcs *DoltChunkStore) uploadChunks(ctx context.Context, hashToChunk map[hash.Hash]nbs.CompressedChunk) (map[hash.Hash]int, error) {
if len(hashToChunk) == 0 {
return map[hash.Hash]int{}, nil
}
@@ -922,7 +905,6 @@ func (dcs *DoltChunkStore) uploadChunks(ctx context.Context) (map[hash.Hash]int,
chnks := make([]chunks.Chunk, 0, len(hashToChunk))
for _, chable := range hashToChunk {
ch, err := chable.ToChunk()
if err != nil {
return nil, err
}
@@ -951,7 +933,6 @@ func (dcs *DoltChunkStore) uploadChunks(ctx context.Context) (map[hash.Hash]int,
}
for h, contentHash := range hashToContentHash {
// Can parallelize this in the future if needed
err := dcs.uploadTableFileWithRetries(ctx, h, uint64(hashToCount[h]), contentHash, func() (io.ReadCloser, uint64, error) {
data := hashToData[h]
return io.NopCloser(bytes.NewReader(data)), uint64(len(data)), nil

View File

@@ -15,134 +15,70 @@
package remotestorage
import (
"sync"
lru "github.com/hashicorp/golang-lru/v2"
"github.com/dolthub/dolt/go/store/hash"
"github.com/dolthub/dolt/go/store/nbs"
)
const (
// averageChunkSize is used to estimate the size of chunk for purposes of avoiding excessive memory usage
averageChunkSize = 1 << 12
)
// mapChunkCache is a ChunkCache implementation that stores everything in an in memory map.
// mapChunkCache is a simple ChunkCache implementation that stores
// cached chunks and has-records in two separate LRU caches.
type mapChunkCache struct {
mu *sync.Mutex
hashToChunk map[hash.Hash]nbs.ToChunker
toFlush map[hash.Hash]nbs.ToChunker
cm CapacityMonitor
chunks *lru.TwoQueueCache[hash.Hash, nbs.ToChunker]
has *lru.TwoQueueCache[hash.Hash, struct{}]
}
const defaultCacheChunkCapacity = 32 * 1024
const defaultCacheHasCapacity = 1024 * 1024
func newMapChunkCache() *mapChunkCache {
return NewMapChunkCacheWithCapacity(defaultCacheChunkCapacity, defaultCacheHasCapacity)
}
func NewMapChunkCacheWithCapacity(maxChunkCapacity, maxHasCapacity int) *mapChunkCache {
chunks, err := lru.New2Q[hash.Hash, nbs.ToChunker](maxChunkCapacity)
if err != nil {
panic(err)
}
has, err := lru.New2Q[hash.Hash, struct{}](maxHasCapacity)
if err != nil {
panic(err)
}
return &mapChunkCache{
&sync.Mutex{},
make(map[hash.Hash]nbs.ToChunker),
make(map[hash.Hash]nbs.ToChunker),
NewUncappedCapacityMonitor(),
chunks,
has,
}
}
// used by DoltHub API
func NewMapChunkCacheWithMaxCapacity(maxCapacity int64) *mapChunkCache {
return &mapChunkCache{
&sync.Mutex{},
make(map[hash.Hash]nbs.ToChunker),
make(map[hash.Hash]nbs.ToChunker),
NewFixedCapacityMonitor(maxCapacity),
func (cache *mapChunkCache) InsertChunks(cs []nbs.ToChunker) {
for _, c := range cs {
cache.chunks.Add(c.Hash(), c)
}
}
// Put puts a slice of chunks into the cache. Returns an error if the cache capacity has been exceeded.
func (mcc *mapChunkCache) Put(chnks []nbs.ToChunker) error {
mcc.mu.Lock()
defer mcc.mu.Unlock()
for i := 0; i < len(chnks); i++ {
c := chnks[i]
h := c.Hash()
if curr, ok := mcc.hashToChunk[h]; ok {
if !curr.IsEmpty() {
continue
}
}
if mcc.cm.CapacityExceeded(averageChunkSize) {
return ErrCacheCapacityExceeded
}
mcc.hashToChunk[h] = c
if !c.IsEmpty() {
mcc.toFlush[h] = c
func (cache *mapChunkCache) GetCachedChunks(hs hash.HashSet) map[hash.Hash]nbs.ToChunker {
ret := make(map[hash.Hash]nbs.ToChunker)
for h := range hs {
c, ok := cache.chunks.Get(h)
if ok {
ret[h] = c
}
}
return nil
return ret
}
// Get gets a map of hash to chunk for a set of hashes. In the event that a chunk is not in the cache, chunks.Empty
// is put in its place.
func (mcc *mapChunkCache) Get(hashes hash.HashSet) map[hash.Hash]nbs.ToChunker {
hashToChunk := make(map[hash.Hash]nbs.ToChunker)
func (cache *mapChunkCache) InsertHas(hs hash.HashSet) {
for h := range hs {
cache.has.Add(h, struct{}{})
}
}
mcc.mu.Lock()
defer mcc.mu.Unlock()
for h := range hashes {
if c, ok := mcc.hashToChunk[h]; ok {
hashToChunk[h] = c
} else {
hashToChunk[h] = nbs.EmptyCompressedChunk
func (cache *mapChunkCache) GetCachedHas(hs hash.HashSet) (absent hash.HashSet) {
ret := make(hash.HashSet)
for h := range hs {
if !cache.has.Contains(h) {
ret.Insert(h)
}
}
return hashToChunk
}
// Has takes a set of hashes and returns the set of hashes that the cache currently does not have in it.
func (mcc *mapChunkCache) Has(hashes hash.HashSet) (absent hash.HashSet) {
absent = make(hash.HashSet)
mcc.mu.Lock()
defer mcc.mu.Unlock()
for h := range hashes {
if _, ok := mcc.hashToChunk[h]; !ok {
absent[h] = struct{}{}
}
}
return absent
}
func (mcc *mapChunkCache) PutChunk(ch nbs.ToChunker) error {
mcc.mu.Lock()
defer mcc.mu.Unlock()
h := ch.Hash()
if existing, ok := mcc.hashToChunk[h]; !ok || existing.IsEmpty() {
if mcc.cm.CapacityExceeded(averageChunkSize) {
return ErrCacheCapacityExceeded
}
mcc.hashToChunk[h] = ch
mcc.toFlush[h] = ch
}
return nil
}
// GetAndClearChunksToFlush gets a map of hash to chunk which includes all the chunks that were put in the cache
// between the last time GetAndClearChunksToFlush was called and now.
func (mcc *mapChunkCache) GetAndClearChunksToFlush() map[hash.Hash]nbs.ToChunker {
newToFlush := make(map[hash.Hash]nbs.ToChunker)
mcc.mu.Lock()
defer mcc.mu.Unlock()
toFlush := mcc.toFlush
mcc.toFlush = newToFlush
return toFlush
return ret
}
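
The two caches above come from hashicorp/golang-lru/v2. A tiny standalone illustration of the same constructor and the calls used here (Add, Get, Contains); the capacity and keys are arbitrary.

package main

import (
	"fmt"

	lru "github.com/hashicorp/golang-lru/v2"
)

func main() {
	// New2Q only fails for a non-positive size, which is why the
	// constructor above turns the error into a panic.
	cache, err := lru.New2Q[string, int](128)
	if err != nil {
		panic(err)
	}
	cache.Add("a", 1)
	if v, ok := cache.Get("a"); ok {
		fmt.Println("hit:", v) // hit: 1
	}
	fmt.Println(cache.Contains("b")) // false
}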

View File

@@ -15,10 +15,8 @@
package remotestorage
import (
"math/rand"
"reflect"
"math/rand/v2"
"testing"
"time"
"github.com/stretchr/testify/assert"
@@ -27,68 +25,111 @@ import (
"github.com/dolthub/dolt/go/store/nbs"
)
func genRandomChunks(rng *rand.Rand, n int) (hash.HashSet, []nbs.ToChunker) {
chks := make([]nbs.ToChunker, n)
hashes := make(hash.HashSet)
for i := 0; i < n; i++ {
size := int(rng.Int31n(99) + 1)
bytes := make([]byte, size)
for j := 0; j < size; j++ {
bytes[j] = byte(rng.Int31n(255))
func TestMapChunkCache(t *testing.T) {
t.Run("New", func(t *testing.T) {
assert.NotNil(t, newMapChunkCache())
assert.NotNil(t, NewMapChunkCacheWithCapacity(32, 32))
assert.Panics(t, func() {
assert.NotNil(t, NewMapChunkCacheWithCapacity(-1, 32))
})
assert.Panics(t, func() {
assert.NotNil(t, NewMapChunkCacheWithCapacity(32, -1))
})
})
t.Run("CachesChunks", func(t *testing.T) {
var seed [32]byte
rand := rand.NewChaCha8(seed)
cache := NewMapChunkCacheWithCapacity(8, 8)
inserted := make(hash.HashSet)
// Insert some chunks.
for i := 0; i < 8; i++ {
bs := make([]byte, 512)
rand.Read(bs)
chk := chunks.NewChunk(bs)
inserted.Insert(chk.Hash())
cc := nbs.ChunkToCompressedChunk(chk)
cache.InsertChunks([]nbs.ToChunker{cc})
}
chk := nbs.ChunkToCompressedChunk(chunks.NewChunk(bytes))
chks[i] = chk
// Query for those chunks, plus some that were not inserted.
query := make(hash.HashSet)
for h := range inserted {
query.Insert(h)
}
for i := 0; i < 8; i++ {
var bs [512]byte
rand.Read(bs[:])
query.Insert(hash.Of(bs[:]))
}
hashes[chk.Hash()] = struct{}{}
}
// Only got back the inserted chunks...
cached := cache.GetCachedChunks(query)
assert.Len(t, cached, 8)
return hashes, chks
}
func TestMapChunkCache(t *testing.T) {
const chunkBatchSize = 10
seed := time.Now().UnixNano()
rng := rand.New(rand.NewSource(seed))
hashes, chks := genRandomChunks(rng, chunkBatchSize)
mapChunkCache := newMapChunkCache()
mapChunkCache.Put(chks)
hashToChunk := mapChunkCache.Get(hashes)
assert.Equal(t, len(hashToChunk), chunkBatchSize, "Did not read back all chunks (seed %d)", seed)
absent := mapChunkCache.Has(hashes)
assert.Equal(t, len(absent), 0, "Missing chunks that were added (seed %d)", seed)
toFlush := mapChunkCache.GetAndClearChunksToFlush()
assert.True(t, reflect.DeepEqual(toFlush, hashToChunk), "unexpected or missing chunks to flush (seed %d)", seed)
moreHashes, moreChks := genRandomChunks(rng, chunkBatchSize)
joinedHashes := make(hash.HashSet)
for h := range hashes {
joinedHashes[h] = struct{}{}
}
for h := range moreHashes {
joinedHashes[h] = struct{}{}
}
absent = mapChunkCache.Has(joinedHashes)
assert.True(t, reflect.DeepEqual(absent, moreHashes), "unexpected absent hashset (seed %d)", seed)
mapChunkCache.PutChunk(chks[0])
mapChunkCache.PutChunk(moreChks[0])
toFlush = mapChunkCache.GetAndClearChunksToFlush()
expected := map[hash.Hash]nbs.ToChunker{moreChks[0].Hash(): moreChks[0]}
eq := reflect.DeepEqual(toFlush, expected)
assert.True(t, eq, "Missing or unexpected chunks to flush (seed %d)", seed)
// If we insert more than our max size, and query
// for everything inserted, we get back a result
// set matching our max size.
for i := 0; i < 64; i++ {
bs := make([]byte, 512)
rand.Read(bs)
chk := chunks.NewChunk(bs)
inserted.Insert(chk.Hash())
cc := nbs.ChunkToCompressedChunk(chk)
cache.InsertChunks([]nbs.ToChunker{cc})
}
cached = cache.GetCachedChunks(inserted)
assert.Len(t, cached, 8)
})
t.Run("CachesHasRecords", func(t *testing.T) {
var seed [32]byte
rand := rand.NewChaCha8(seed)
cache := NewMapChunkCacheWithCapacity(8, 8)
query := make(hash.HashSet)
for i := 0; i < 64; i++ {
var bs [512]byte
rand.Read(bs[:])
query.Insert(hash.Of(bs[:]))
}
// Querying an empty cache returns all the hashes.
res := cache.GetCachedHas(query)
assert.NotSame(t, res, query)
assert.Len(t, res, 64)
for h := range query {
_, ok := res[h]
assert.True(t, ok, "everything in query is in res")
}
// Insert 8 of our query hashes into the cache.
insert := make(hash.HashSet)
insertTwo := make(hash.HashSet)
i := 0
for h := range query {
if i < 8 {
insert.Insert(h)
} else {
insertTwo.Insert(h)
}
i += 1
if i == 16 {
break
}
}
cache.InsertHas(insert)
// Querying our original query set returns expected results.
res = cache.GetCachedHas(query)
assert.Len(t, res, 64-8)
for h := range query {
if _, ok := insert[h]; !ok {
_, ok = res[h]
assert.True(t, ok, "everything in query that is not in insert is in res")
}
}
// Inserting another 8 hashes hits max limit. Only 8 records cached.
cache.InsertHas(insertTwo)
res = cache.GetCachedHas(query)
assert.Len(t, res, 64-8)
})
}

View File

@@ -20,31 +20,22 @@ import (
)
// noopChunkCache is a ChunkCache implementation that stores nothing
// ever. Using a noopChunkCache with a remotestore.DoltChunkStore
// will cause the DoltChunkStore to behave incorrectly when _writing_
// dolt repositories; this should only be used for read-only use
// cases.
// ever. This causes all fetches to go to the remote server.
var noopChunkCache = &noopChunkCacheImpl{}
type noopChunkCacheImpl struct {
}
func (*noopChunkCacheImpl) Put(chnks []nbs.ToChunker) error {
func (*noopChunkCacheImpl) InsertChunks(cs []nbs.ToChunker) {
}
func (*noopChunkCacheImpl) GetCachedChunks(h hash.HashSet) map[hash.Hash]nbs.ToChunker {
return nil
}
func (*noopChunkCacheImpl) Get(hashes hash.HashSet) map[hash.Hash]nbs.ToChunker {
return make(map[hash.Hash]nbs.ToChunker)
func (*noopChunkCacheImpl) InsertHas(h hash.HashSet) {
}
func (*noopChunkCacheImpl) Has(hashes hash.HashSet) (absent hash.HashSet) {
return hashes
}
func (*noopChunkCacheImpl) PutChunk(ch nbs.ToChunker) error {
return nil
}
func (*noopChunkCacheImpl) GetAndClearChunksToFlush() map[hash.Hash]nbs.ToChunker {
panic("noopChunkCache does not support GetAndClearChunksToFlush().")
func (*noopChunkCacheImpl) GetCachedHas(h hash.HashSet) (absent hash.HashSet) {
return h
}

View File

@@ -0,0 +1,163 @@
// Copyright 2025 Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package remotestorage
import (
"errors"
"sync"
"github.com/dolthub/dolt/go/store/hash"
"github.com/dolthub/dolt/go/store/nbs"
)
type WriteBuffer interface {
// Add a compressed chunk to the write buffer. It will be
// returned from future calls to |GetAllForWrite| until a
// write is successful.
Put(nbs.CompressedChunk) error
// Returns the current set of written chunks. After this
// returns, concurrent calls to other methods may block until
// |WriteCompleted| is called. Calls to |GetAllForWrite| must
// be followed by a call to |WriteCompleted| once the write
// attempt is finished.
GetAllForWrite() map[hash.Hash]nbs.CompressedChunk
// Called after a call to |GetAllForWrite|, this records
// success or failure of the write operation. If the write
// operation was successful, then the written chunks are now
// in the upstream and they can be cleared from the write
// buffer. Otherwise, the written chunks are retained in the
// write buffer so that the write can be retried.
WriteCompleted(success bool)
// ChunkStore clients expect to read their own writes before a
// commit. On the get path, remotestorage should add buffered
// chunks matching a given |query| to its |result|. On the
// HasMany path, remotestorage should remove present chunks
// from its absent set on the HasMany response.
AddBufferedChunks(query hash.HashSet, result map[hash.Hash]nbs.ToChunker)
// Removes the addresses of any buffered chunks from |hashes|.
// Used to filter the |absent| response of a HasMany call so
// that buffered chunks are not considered absent.
RemovePresentChunks(hashes hash.HashSet)
}
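
A hedged sketch of the intended write lifecycle: chunks accumulate through Put, a flush snapshots them with GetAllForWrite, and WriteCompleted either discards them (success) or keeps them for a retry (failure). The upload callback below is a stand-in for the real table-file upload plus Commit RPC.

package remotestorage

import (
	"github.com/dolthub/dolt/go/store/hash"
	"github.com/dolthub/dolt/go/store/nbs"
)

// flushWriteBuffer is a sketch, not part of the real implementation.
func flushWriteBuffer(wb WriteBuffer, upload func(map[hash.Hash]nbs.CompressedChunk) error) error {
	toWrite := wb.GetAllForWrite()
	err := upload(toWrite)
	// Success drops the buffered chunks; failure retains them so the
	// same chunks are included in the next flush attempt.
	wb.WriteCompleted(err == nil)
	return err
}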
type noopWriteBuffer struct {
}
var _ WriteBuffer = noopWriteBuffer{}
func (noopWriteBuffer) Put(nbs.CompressedChunk) error {
return errors.New("unsupported operation: write on a read-only remotestorage chunk store")
}
func (noopWriteBuffer) GetAllForWrite() map[hash.Hash]nbs.CompressedChunk {
panic("attempt to upload chunks on a read-only remotestorage chunk store")
}
func (noopWriteBuffer) WriteCompleted(success bool) {
panic("call to WriteCompleted on a noopWriteBuffer")
}
func (noopWriteBuffer) AddBufferedChunks(hash.HashSet, map[hash.Hash]nbs.ToChunker) {
}
func (noopWriteBuffer) RemovePresentChunks(hash.HashSet) {
}
// A simple WriteBuffer which buffers unlimited data in memory and
// waits to flush it.
type mapWriteBuffer struct {
mu sync.Mutex
cond sync.Cond
// Set when an outstanding write is in progress, |Put| will
// block against this. Reset by |WriteCompleted| after the
// appropriate updates to |chunks| have been made.
writing bool
chunks map[hash.Hash]nbs.CompressedChunk
}
func newMapWriteBuffer() *mapWriteBuffer {
ret := &mapWriteBuffer{
chunks: make(map[hash.Hash]nbs.CompressedChunk),
}
ret.cond.L = &ret.mu
return ret
}
func (b *mapWriteBuffer) Put(cc nbs.CompressedChunk) error {
b.mu.Lock()
defer b.mu.Unlock()
for b.writing {
b.cond.Wait()
}
b.chunks[cc.H] = cc
return nil
}
func (b *mapWriteBuffer) GetAllForWrite() map[hash.Hash]nbs.CompressedChunk {
b.mu.Lock()
defer b.mu.Unlock()
for b.writing {
b.cond.Wait()
}
b.writing = true
return b.chunks
}
func (b *mapWriteBuffer) WriteCompleted(success bool) {
b.mu.Lock()
defer b.mu.Unlock()
if !b.writing {
panic("mapWriteBuffer got WriteCompleted while no write was in progress")
}
b.writing = false
if success {
b.chunks = make(map[hash.Hash]nbs.CompressedChunk)
}
b.cond.Broadcast()
}
func (b *mapWriteBuffer) AddBufferedChunks(hs hash.HashSet, res map[hash.Hash]nbs.ToChunker) {
b.mu.Lock()
defer b.mu.Unlock()
for h := range hs {
cc, ok := b.chunks[h]
if ok {
res[h] = cc
}
}
}
func (b *mapWriteBuffer) RemovePresentChunks(hashes hash.HashSet) {
b.mu.Lock()
defer b.mu.Unlock()
if len(b.chunks) < len(hashes) {
for h := range b.chunks {
hashes.Remove(h)
}
} else {
var toRemove []hash.Hash
for h := range hashes {
if _, ok := b.chunks[h]; ok {
toRemove = append(toRemove, h)
}
}
for _, h := range toRemove {
hashes.Remove(h)
}
}
}
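
A small read-your-writes sketch against the buffer above, assuming a chunk built from arbitrary bytes: a chunk that has been Put but not yet flushed is visible through AddBufferedChunks and is excluded from an absent set by RemovePresentChunks.

package remotestorage

import (
	"github.com/dolthub/dolt/go/store/chunks"
	"github.com/dolthub/dolt/go/store/hash"
	"github.com/dolthub/dolt/go/store/nbs"
)

// bufferReadYourWrites is a sketch, not part of the real implementation.
func bufferReadYourWrites() {
	wb := newMapWriteBuffer()
	cc := nbs.ChunkToCompressedChunk(chunks.NewChunk([]byte("hello")))
	_ = wb.Put(cc)

	// The get path overlays buffered chunks onto its result map.
	results := make(map[hash.Hash]nbs.ToChunker)
	wb.AddBufferedChunks(hash.NewHashSet(cc.Hash()), results) // results now holds cc

	// The HasMany path removes buffered addresses from its absent set.
	absent := hash.NewHashSet(cc.Hash())
	wb.RemovePresentChunks(absent) // absent is now empty
}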

View File

@@ -0,0 +1,154 @@
// Copyright 2025 Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package remotestorage
import (
"math/rand/v2"
"sync"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/dolthub/dolt/go/store/chunks"
"github.com/dolthub/dolt/go/store/hash"
"github.com/dolthub/dolt/go/store/nbs"
)
func TestNoopWriteBuffer(t *testing.T) {
cache := noopWriteBuffer{}
err := cache.Put(nbs.CompressedChunk{})
assert.NotNil(t, err)
assert.Panics(t, func() {
cache.GetAllForWrite()
})
assert.Panics(t, func() {
cache.WriteCompleted(false)
})
cache.AddBufferedChunks(make(hash.HashSet), make(map[hash.Hash]nbs.ToChunker))
cache.RemovePresentChunks(make(hash.HashSet))
}
func TestMapWriteBuffer(t *testing.T) {
t.Run("SmokeTest", func(t *testing.T) {
// A bit of a typical usage...
cache := newMapWriteBuffer()
var seed [32]byte
rand := rand.NewChaCha8(seed)
query := make(hash.HashSet)
for i := 0; i < 64; i++ {
var bs [512]byte
rand.Read(bs[:])
query.Insert(hash.Of(bs[:]))
}
res := make(map[hash.Hash]nbs.ToChunker)
cache.AddBufferedChunks(query, res)
assert.Len(t, res, 0)
// Insert some chunks.
inserted := make(hash.HashSet)
for i := 0; i < 8; i++ {
bs := make([]byte, 512)
rand.Read(bs)
chk := chunks.NewChunk(bs)
cache.Put(nbs.ChunkToCompressedChunk(chk))
inserted.Insert(chk.Hash())
}
cache.AddBufferedChunks(query, res)
assert.Len(t, res, 0)
for h := range inserted {
query.Insert(h)
}
cache.AddBufferedChunks(query, res)
assert.Len(t, res, 8)
cache.RemovePresentChunks(query)
assert.Len(t, query, 64)
for h := range inserted {
query.Insert(h)
}
// Cache continues working for reads during a pending write.
toWrite := cache.GetAllForWrite()
assert.Len(t, toWrite, 8)
res = make(map[hash.Hash]nbs.ToChunker)
cache.AddBufferedChunks(query, res)
assert.Len(t, res, 8)
cache.RemovePresentChunks(query)
assert.Len(t, query, 64)
// After a failure, chunks are still present.
cache.WriteCompleted(false)
toWrite = cache.GetAllForWrite()
assert.Len(t, toWrite, 8)
// And after a success, they are cleared.
cache.WriteCompleted(true)
toWrite = cache.GetAllForWrite()
assert.Len(t, toWrite, 0)
cache.WriteCompleted(true)
})
t.Run("ConcurrentPuts", func(t *testing.T) {
cache := newMapWriteBuffer()
var seed [32]byte
seedRand := rand.NewChaCha8(seed)
const numThreads = 16
var wg sync.WaitGroup
// One thread is writing and failing...
wg.Add(1)
var writes int
go func() {
defer wg.Done()
for {
cached := cache.GetAllForWrite()
writes += 1
time.Sleep(5 * time.Millisecond)
if len(cached) == numThreads*32 {
cache.WriteCompleted(false)
return
}
cache.WriteCompleted(false)
time.Sleep(100 * time.Microsecond)
}
}()
wg.Add(numThreads)
var inserted [numThreads][]hash.Hash
for i := 0; i < numThreads; i++ {
var seed [32]byte
seedRand.Read(seed[:])
randCha := rand.NewChaCha8(seed)
go func() {
for j := 0; j < 32; j++ {
var bs [512]byte
randCha.Read(bs[:])
chk := chunks.NewChunk(bs[:])
cache.Put(nbs.ChunkToCompressedChunk(chk))
inserted[i] = append(inserted[i], chk.Hash())
}
defer wg.Done()
}()
}
wg.Wait()
// All writes failed. Let's make sure we have everything we expect.
cached := cache.GetAllForWrite()
assert.Len(t, cached, 32*numThreads)
for i := range inserted {
for _, h := range inserted[i] {
_, ok := cached[h]
assert.True(t, ok)
}
}
cache.WriteCompleted(true)
})
}

View File

@@ -722,6 +722,14 @@ func (db Database) getTableInsensitive(ctx *sql.Context, head *doltdb.Commit, ds
if !resolve.UseSearchPath || isDoltgresSystemTable {
dt, found = dtables.NewHelpTable(ctx, db.Name(), lwrName), true
}
case doltdb.GetBackupsTableName(), doltdb.BackupsTableName:
isDoltgresSystemTable, err := resolve.IsDoltgresSystemTable(ctx, tname, root)
if err != nil {
return nil, false, err
}
if !resolve.UseSearchPath || isDoltgresSystemTable {
dt, found = dtables.NewBackupsTable(db, lwrName), true
}
}
if found {

View File

@@ -0,0 +1,117 @@
// Copyright 2025 Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package dtables
import (
"fmt"
"io"
"sort"
"github.com/dolthub/go-mysql-server/sql"
"github.com/dolthub/go-mysql-server/sql/types"
"github.com/dolthub/dolt/go/libraries/doltcore/env"
"github.com/dolthub/dolt/go/libraries/doltcore/sqle/dsess"
"github.com/dolthub/dolt/go/libraries/doltcore/sqle/index"
)
type BackupsTable struct {
db dsess.SqlDatabase
tableName string
}
var _ sql.Table = (*BackupsTable)(nil)
func NewBackupsTable(db dsess.SqlDatabase, tableName string) *BackupsTable {
return &BackupsTable{db: db, tableName: tableName}
}
func (bt BackupsTable) Name() string {
return bt.tableName
}
func (bt BackupsTable) String() string {
return bt.tableName
}
func (bt BackupsTable) Schema() sql.Schema {
columns := []*sql.Column{
{Name: "name", Type: types.Text, Source: bt.tableName, PrimaryKey: true, Nullable: false, DatabaseSource: bt.db.Name()},
{Name: "url", Type: types.Text, Source: bt.tableName, PrimaryKey: false, Nullable: false, DatabaseSource: bt.db.Name()},
}
return columns
}
func (bt BackupsTable) Collation() sql.CollationID {
return sql.Collation_Default
}
func (bt BackupsTable) Partitions(context *sql.Context) (sql.PartitionIter, error) {
return index.SinglePartitionIterFromNomsMap(nil), nil
}
func (bt BackupsTable) PartitionRows(context *sql.Context, _ sql.Partition) (sql.RowIter, error) {
return newBackupsIter(context)
}
type backupsItr struct {
names []string
urls map[string]string
idx int
}
var _ sql.RowIter = (*backupsItr)(nil)
func (bi *backupsItr) Next(ctx *sql.Context) (sql.Row, error) {
if bi.idx < len(bi.names) {
bi.idx++
name := bi.names[bi.idx-1]
return sql.NewRow(name, bi.urls[name]), nil
}
return nil, io.EOF
}
func (bi *backupsItr) Close(_ *sql.Context) error { return nil }
func newBackupsIter(ctx *sql.Context) (*backupsItr, error) {
dbName := ctx.GetCurrentDatabase()
if len(dbName) == 0 {
return nil, fmt.Errorf("Empty database name.")
}
sess := dsess.DSessFromSess(ctx.Session)
dbData, ok := sess.GetDbData(ctx, dbName)
if !ok {
return nil, sql.ErrDatabaseNotFound.New(dbName)
}
backups, err := dbData.Rsr.GetBackups()
if err != nil {
return nil, err
}
names := make([]string, 0)
urls := map[string]string{}
backups.Iter(func(key string, val env.Remote) bool {
names = append(names, key)
urls[key] = val.Url
return true
})
sort.Strings(names)
return &backupsItr{names: names, urls: urls, idx: 0}, nil
}
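
For context, a row iterator like backupsItr is consumed with the usual go-mysql-server loop: call Next until io.EOF, then Close. A minimal sketch of that consumption pattern; the helper name is made up.

package dtables

import (
	"errors"
	"io"

	"github.com/dolthub/go-mysql-server/sql"
)

// drainRows is a sketch, not part of the real implementation.
func drainRows(ctx *sql.Context, iter sql.RowIter) ([]sql.Row, error) {
	defer iter.Close(ctx)
	var rows []sql.Row
	for {
		row, err := iter.Next(ctx)
		if errors.Is(err, io.EOF) {
			return rows, nil
		}
		if err != nil {
			return nil, err
		}
		rows = append(rows, row)
	}
}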

View File

@@ -1404,6 +1404,12 @@ func TestBrokenSystemTableQueries(t *testing.T) {
enginetest.RunQueryTests(t, h, BrokenSystemTableQueries)
}
func TestBackupsSystemTable(t *testing.T) {
h := newDoltHarness(t)
defer h.Close()
enginetest.TestScript(t, h, BackupsSystemTableQueries)
}
func TestHistorySystemTable(t *testing.T) {
harness := newDoltEnginetestHarness(t).WithParallelism(2)
RunHistorySystemTableTests(t, harness)

View File

@@ -7556,6 +7556,7 @@ var DoltSystemVariables = []queries.ScriptTest{
{
Query: "SHOW TABLES;",
Expected: []sql.Row{
{"dolt_backups"},
{"dolt_branches"},
{"dolt_commit_ancestors"},
{"dolt_commit_diff_test"},

View File

@@ -52,3 +52,54 @@ var BrokenSystemTableQueries = []queries.QueryTest{
Expected: []sql.Row{},
},
}
var BackupsSystemTableQueries = queries.ScriptTest{
Name: "dolt_backups table",
SetUpScript: []string{
`call dolt_backup("add", "backup3", "file:///tmp/backup3");`,
`call dolt_backup("add", "backup1", "file:///tmp/backup1");`,
`call dolt_backup("add", "backup2", "aws://[ddb_table:ddb_s3_bucket]/db1");`,
},
Assertions: []queries.ScriptTestAssertion{
{
// Query for just the names because on Windows the drive letter is inserted into the file path
Query: "select name from dolt_backups;",
Expected: []sql.Row{
{"backup1"},
{"backup2"},
{"backup3"},
},
},
{
Query: "select url from dolt_backups where name = 'backup2';",
Expected: []sql.Row{{"aws://[ddb_table:ddb_s3_bucket]/db1"}},
},
{
Query: "delete from dolt_backups where name = 'backup1';",
ExpectedErrStr: "table doesn't support DELETE FROM",
},
{
Query: "update dolt_backups set name = 'backup1' where name = 'backup2';",
ExpectedErrStr: "table doesn't support UPDATE",
},
{
Query: "insert into dolt_backups values ('backup4', 'file:///tmp/broken');", // nolint: gas
ExpectedErrStr: "table doesn't support INSERT INTO",
},
{
Query: "call dolt_backup('add', 'backup4', 'aws://[ddb_table_4:ddb_s3_bucket_4]/db1');",
Expected: []sql.Row{{0}},
},
{
Query: "call dolt_backup('remove', 'backup1');",
Expected: []sql.Row{{0}},
},
{
Query: "select * from dolt_backups where url like 'aws://%'",
Expected: []sql.Row{
{"backup2", "aws://[ddb_table:ddb_s3_bucket]/db1"},
{"backup4", "aws://[ddb_table_4:ddb_s3_bucket_4]/db1"},
},
},
},
}

View File

@@ -60,7 +60,7 @@ teardown() {
@test "ls: --system shows system tables" {
run dolt ls --system
[ "$status" -eq 0 ]
[ "${#lines[@]}" -eq 23 ]
[ "${#lines[@]}" -eq 24 ]
[[ "$output" =~ "System tables:" ]] || false
[[ "$output" =~ "dolt_status" ]] || false
[[ "$output" =~ "dolt_commits" ]] || false
@@ -70,6 +70,7 @@ teardown() {
[[ "$output" =~ "dolt_conflicts" ]] || false
[[ "$output" =~ "dolt_remotes" ]] || false
[[ "$output" =~ "dolt_branches" ]] || false
[[ "$output" =~ "dolt_backups" ]] || false
[[ "$output" =~ "dolt_remote_branches" ]] || false
[[ "$output" =~ "dolt_help" ]] || false
[[ "$output" =~ "dolt_constraint_violations_table_one" ]] || false