go/doltcore/doltdb: fix GC race in value_store

This commit is contained in:
Andy Arthur
2023-02-02 16:17:27 -08:00
parent 64deebfa4e
commit 76b457e566
4 changed files with 36 additions and 9 deletions
+13 -8
View File
@@ -46,12 +46,14 @@ func TestConcurrentGC(t *testing.T) {
name: "smoke test",
setup: []string{"CREATE TABLE t (id int primary key)"},
clients: []client{
{queries: func(id string, i int) (queries []string) {
return []string{
fmt.Sprintf("INSERT INTO t VALUES (%d)", i),
"SELECT COUNT(*) FROM t",
}
}},
{
id: "client",
queries: func(id string, i int) (queries []string) {
return []string{
fmt.Sprintf("INSERT INTO t VALUES (%d)", i),
"SELECT COUNT(*) FROM t",
}
}},
},
},
{
@@ -72,6 +74,7 @@ func TestConcurrentGC(t *testing.T) {
// writes only to that branch
clients: func() []client {
cc := []client{{
id: "gc_client",
queries: func(string, int) []string {
return []string{"CALL dolt_gc();"}
},
@@ -125,6 +128,7 @@ func testConcurrentGC(t *testing.T, test concurrentGCtest) {
eg, ectx := errgroup.WithContext(ctx)
for _, c := range test.clients {
cl := c
require.NotZero(t, cl.id)
eg.Go(func() error {
return runWithSqlSession(ectx, eng, func(sctx *sql.Context, eng *engine.SqlEngine) error {
// generate and run 128 batches of queries
@@ -133,9 +137,10 @@ func testConcurrentGC(t *testing.T, test concurrentGCtest) {
for _, q := range batch {
qerr := execQuery(sctx, eng, q)
if qerr != nil {
// allow clients to error
// allow clients to error, but close connection
// todo: restrict errors to dangling refs
t.Logf("error in client %s: %s", cl.id, qerr.Error())
// t.Logf("error in client %s: %s", cl.id, qerr.Error())
return nil
}
}
}
+1 -1
View File
@@ -821,7 +821,7 @@ func (lvs *ValueStore) gcProcessRefs(ctx context.Context, visited hash.HashSet,
}
// purge the cache
lvs.decodedChunks = sizecache.New(lvs.decodedChunks.Size())
lvs.decodedChunks.Purge()
lvs.bufferedChunks = make(map[hash.Hash]chunks.Chunk, lvs.bufferedChunkSize)
lvs.bufferedChunkSize = 0
lvs.withBufferedChildren = map[hash.Hash]uint64{}
+11
View File
@@ -140,6 +140,17 @@ func (c *SizeCache) Drop(key interface{}) {
}
}
func (c *SizeCache) Purge() {
c.mu.Lock()
defer c.mu.Unlock()
for key := range c.cache {
delete(c.cache, key)
}
c.totalSize = 0
c.lru = list.List{}
}
func (c *SizeCache) Size() uint64 {
return c.maxSize
}
@@ -93,6 +93,17 @@ func TestSizeCache(t *testing.T) {
assert.Equal(uint64(800), c.totalSize)
assert.Equal(4, c.lru.Len())
assert.Equal(4, len(c.cache))
c.Purge()
assert.Equal(uint64(0), c.totalSize)
for i, v := range []string{"data-1", "data-2", "data-3", "data-4", "data-5", "data-6", "data-7", "data-8", "data-9"} {
c.Add(hashFromString(v), defSize, v)
maxElements := uint64(i + 1)
if maxElements >= uint64(5) {
maxElements = uint64(5)
}
assert.Equal(maxElements*defSize, c.totalSize)
}
}
func TestSizeCacheWithExpiry(t *testing.T) {