integration-tests/go-sql-server-driver: concurrent_gc_test.go: Add an initial test case.

This commit is contained in:
Aaron Son
2023-03-16 16:12:48 -07:00
parent fd9e2a4bea
commit b9fbf6d733
7 changed files with 123 additions and 432 deletions
@@ -1,207 +0,0 @@
// Copyright 2023 Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package doltdb
import (
"context"
"flag"
"fmt"
"net/url"
"os"
"sync"
"testing"
"time"
"github.com/stretchr/testify/require"
"github.com/dolthub/dolt/go/libraries/doltcore/dbfactory"
"github.com/dolthub/dolt/go/store/datas"
"github.com/dolthub/dolt/go/store/hash"
"github.com/dolthub/dolt/go/store/pool"
"github.com/dolthub/dolt/go/store/prolly"
"github.com/dolthub/dolt/go/store/prolly/message"
"github.com/dolthub/dolt/go/store/prolly/tree"
"github.com/dolthub/dolt/go/store/types"
"github.com/dolthub/dolt/go/store/val"
)
var ConcGCMapEditsSleepMillis = flag.Int("conc_gc_map_edits_sleep_millis", 100, "number of seconds to sleep between GCs")
var ConcGCMapEditsNumIters = flag.Int("conc_gc_map_edits_num_iters", 16, "number of GC iterations to run")
var ConcGCMapEditsOldGenPerc = flag.Float64("conc_gc_map_edits_old_gen_perc", 1, "percentage of refs to put in the old gen")
func TestConcurrentMapEditsGC(t *testing.T) {
scales := []int{
1,
2,
4,
8,
16,
32,
64,
128,
256,
512,
1024,
2048,
4096,
}
tmpdir, err := os.MkdirTemp("", "prolly_concurrent_gc_test-*")
require.NoError(t, err)
t.Cleanup(func() {
os.RemoveAll(tmpdir)
})
ctx := context.Background()
db, _, ns, err := dbfactory.FileFactory{}.CreateDB(ctx, types.Format_DOLT, &url.URL{
Scheme: "file",
Path: tmpdir,
}, nil)
require.NoError(t, err)
var mu sync.RWMutex
roots := make([]hash.Hash, len(scales))
js := make([]int, len(scales))
sharedPool := pool.NewBuffPool()
stop := make(chan struct{})
var wg sync.WaitGroup
wg.Add(len(scales))
for i := range scales {
i := i
ctx := context.Background()
ds, err := db.GetDataset(ctx, fmt.Sprintf("scale_%d", i))
require.NoError(t, err)
kd := val.NewTupleDescriptor(
val.Type{Enc: val.Int64Enc, Nullable: false},
)
vd := val.NewTupleDescriptor(
val.Type{Enc: val.Int64Enc, Nullable: true},
)
var mutKeyBuilder = val.NewTupleBuilder(vd)
var mutValBuilder = val.NewTupleBuilder(kd)
serializer := message.NewProllyMapSerializer(vd, ns.Pool())
chunker, err := tree.NewEmptyChunker(ctx, ns, serializer)
require.NoError(t, err)
for j := 0; j < scales[i]; j++ {
newNumber := int64(j)
mutKeyBuilder.PutInt64(0, newNumber)
k := mutKeyBuilder.Build(sharedPool)
mutValBuilder.PutInt64(0, newNumber)
v := mutValBuilder.Build(sharedPool)
err := chunker.AddPair(ctx, tree.Item(k), tree.Item(v))
require.NoError(t, err)
}
root, err := chunker.Done(ctx)
require.NoError(t, err)
m := prolly.NewMap(root, ns, kd, vd)
cm, err := datas.NewCommitMeta("testing", "testing@testing.com", "some commit")
require.NoError(t, err)
ds, err = db.Commit(ctx, ds, tree.ValueFromNode(m.Node()), datas.CommitOptions{
Meta: cm,
})
require.NoError(t, err)
addr, ok := ds.MaybeHeadAddr()
require.True(t, ok)
roots[i] = addr
go func() {
defer wg.Done()
j := 1
for {
select {
case <-stop:
t.Logf("scales[i]: %d, js[i]: %d", scales[i], j)
js[i] = j
return
default:
}
newNumber := int64(scales[i] + j)
mutKeyBuilder.PutInt64(0, newNumber)
k := mutKeyBuilder.Build(sharedPool)
mutValBuilder.PutInt64(0, newNumber)
v := mutValBuilder.Build(sharedPool)
mut := m.Mutate()
err := mut.Put(ctx, k, v)
require.NoError(t, err)
j++
m, err = mut.Map(ctx)
require.NoError(t, err)
cm, err := datas.NewCommitMeta("testing", "testing@testing.com", "some commit")
require.NoError(t, err)
ds, err = db.Commit(ctx, ds, tree.ValueFromNode(m.Node()), datas.CommitOptions{
Meta: cm,
})
require.NoError(t, err)
addr, ok := ds.MaybeHeadAddr()
require.True(t, ok)
mu.RLock()
roots[i] = addr
mu.RUnlock()
}
}()
}
wg.Add(1)
go func() {
defer wg.Done()
for i := 0; i < *ConcGCMapEditsNumIters; i++ {
time.Sleep(time.Duration(*ConcGCMapEditsSleepMillis) * time.Millisecond)
ctx := context.Background()
newhashes := make(hash.HashSet)
oldhashes := make(hash.HashSet)
cutoff := int(float64(len(roots)-1) * (*ConcGCMapEditsOldGenPerc))
mu.Lock()
for i, h := range roots {
if i >= cutoff {
newhashes.Insert(h)
} else {
oldhashes.Insert(h)
}
}
mu.Unlock()
t.Logf("%v: running gc", time.Now())
i := 0
err := db.(datas.GarbageCollector).GC(ctx, oldhashes, newhashes, nil)
t.Logf("%v: finished gc with err: %v", i, err)
}
close(stop)
}()
wg.Wait()
// TODO: Load maps at roots and iterate them...
}
-221
View File
@@ -16,242 +16,21 @@ package doltdb_test
import (
"context"
"fmt"
"io"
"testing"
"github.com/dolthub/go-mysql-server/sql"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"
"github.com/dolthub/dolt/go/cmd/dolt/commands"
"github.com/dolthub/dolt/go/cmd/dolt/commands/engine"
"github.com/dolthub/dolt/go/libraries/doltcore/doltdb"
"github.com/dolthub/dolt/go/libraries/doltcore/dtestutils"
"github.com/dolthub/dolt/go/libraries/doltcore/env"
"github.com/dolthub/dolt/go/libraries/doltcore/ref"
"github.com/dolthub/dolt/go/libraries/doltcore/sqle"
"github.com/dolthub/dolt/go/libraries/doltcore/sqle/dprocedures"
"github.com/dolthub/dolt/go/store/hash"
)
func TestConcurrentGC(t *testing.T) {
dprocedures.DoltGCFeatureFlag = true
// Each test spawns concurrent clients execute queries, some clients
// will trigger gc processes. When all clients finish, we run a final
// gc process to validate that no dangling references remain.
tests := []concurrentGCtest{
{
name: "smoke test",
setup: []string{"CREATE TABLE t (id int primary key)"},
clients: []client{
{
id: "client",
queries: func(id string, i int) (queries []string) {
return []string{
fmt.Sprintf("INSERT INTO t VALUES (%d)", i),
"SELECT COUNT(*) FROM t",
}
}},
},
},
{
name: "aaron's repro",
// create 32 branches
setup: func() []string {
queries := []string{
"CREATE TABLE t (id int primary key, val TEXT)",
"CALL dcommit('-Am', 'new table t');",
}
for b := 0; b < 32; b++ {
q := fmt.Sprintf("CALL dolt_checkout('-b', 'branch_%d');", b)
queries = append(queries, q)
}
return queries
}(),
// for each branch, create a single client that
// writes only to that branch
clients: func() []client {
cc := []client{{
id: "gc_client",
queries: func(string, int) []string {
return []string{"CALL dolt_gc();"}
},
}}
for b := 0; b < 32; b++ {
branch := fmt.Sprintf("branch_%d", b)
cc = append(cc, client{
id: branch,
queries: func(id string, idx int) []string {
q := fmt.Sprintf("INSERT INTO `%s/%s`.t VALUES (%d, '%s_%d')",
testDB, id, idx, id, idx)
return []string{q}
}})
}
return cc
}(),
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
testConcurrentGC(t, test)
})
}
}
// concurrentGCtest tests concurrent GC
type concurrentGCtest struct {
name string
setup []string
clients []client
}
type client struct {
id string
queries func(id string, idx int) []string
}
func testConcurrentGC(t *testing.T, test concurrentGCtest) {
ctx := context.Background()
eng := setupSqlEngine(t, ctx)
err := runWithSqlSession(ctx, eng, func(sctx *sql.Context, eng *engine.SqlEngine) error {
for _, q := range test.setup {
if err := execQuery(sctx, eng, q); err != nil {
return err
}
}
return nil
})
require.NoError(t, err)
eg, ectx := errgroup.WithContext(ctx)
for _, c := range test.clients {
cl := c
require.NotZero(t, cl.id)
eg.Go(func() error {
return runWithSqlSession(ectx, eng, func(sctx *sql.Context, eng *engine.SqlEngine) error {
defer func() {
if r := recover(); r != nil {
// t.Logf("panic in client %s: %v", cl.id, r)
}
}()
// generate and run 128 batches of queries
for i := 0; i < 128; i++ {
batch := cl.queries(cl.id, i)
for _, q := range batch {
qerr := execQuery(sctx, eng, q)
if qerr != nil {
// allow clients to error, but close connection
// todo: restrict errors to dangling refs
// t.Logf("error in client %s: %s", cl.id, qerr.Error())
return nil
}
}
}
return nil
})
})
}
require.NoError(t, eg.Wait())
// now run a simple write, which is allowed to fail spuriously
runWithSqlSession(ctx, eng, func(sctx *sql.Context, eng *engine.SqlEngine) (err error) {
qq := []string{
// ensure we have garbage to collect
"CREATE TABLE garbage (val int)",
"DROP TABLE garbage",
}
for _, q := range qq {
if err = execQuery(sctx, eng, q); err != nil {
return err
}
}
return
})
// now run a full GC and assert we don't find dangling refs
err = runWithSqlSession(ctx, eng, func(sctx *sql.Context, eng *engine.SqlEngine) (err error) {
qq := []string{
// ensure we have garbage to collect
"CREATE TABLE garbage (val int)",
"DROP TABLE garbage",
"CALL dolt_gc()",
}
for _, q := range qq {
if err = execQuery(sctx, eng, q); err != nil {
return err
}
}
return
})
require.NoError(t, err)
}
func runWithSqlSession(ctx context.Context, eng *engine.SqlEngine, cb func(sctx *sql.Context, eng *engine.SqlEngine) error) error {
sess, err := eng.NewDoltSession(ctx, sql.NewBaseSession())
if err != nil {
return err
}
sctx := sql.NewContext(ctx, sql.WithSession(sess))
sctx.SetCurrentDatabase(testDB)
sctx.Session.SetClient(sql.Client{User: "root", Address: "%"})
return cb(sctx, eng)
}
func execQuery(sctx *sql.Context, eng *engine.SqlEngine, query string) (err error) {
_, iter, err := eng.Query(sctx, query)
if err != nil {
return err
}
defer func() {
// tx commit
if cerr := iter.Close(sctx); err == nil {
err = cerr
}
}()
for {
_, err = iter.Next(sctx)
if err == io.EOF {
err = nil
break
} else if err != nil {
return err
}
}
return
}
const (
// DB name matches dtestutils.CreateTestEnv()
testDB = "dolt"
)
func setupSqlEngine(t *testing.T, ctx context.Context) (eng *engine.SqlEngine) {
dEnv := dtestutils.CreateTestEnv()
mrEnv, err := env.MultiEnvForDirectory(
ctx,
dEnv.Config.WriteableConfig(),
dEnv.FS,
dEnv.Version,
dEnv.IgnoreLockFile,
dEnv)
if err != nil {
panic(err)
}
eng, err = engine.NewSqlEngine(ctx, mrEnv, engine.FormatNull, &engine.SqlEngineConfig{
ServerUser: "root",
ServerHost: "localhost",
Autocommit: true,
})
if err != nil {
panic(err)
}
return
}
func TestGarbageCollection(t *testing.T) {
require.True(t, true)
assert.True(t, true)
@@ -33,12 +33,12 @@ const (
)
func init() {
if os.Getenv("DOLT_ENABLE_GC_PROCEDURE") != "" {
DoltGCFeatureFlag = true
if os.Getenv("DOLT_DISABLE_GC_PROCEDURE") != "" {
DoltGCFeatureFlag = false
}
}
var DoltGCFeatureFlag = false
var DoltGCFeatureFlag = true
// doltGC is the stored procedure to run online garbage collection on a database.
func doltGC(ctx *sql.Context, args ...string) (sql.RowIter, error) {
@@ -69,7 +69,6 @@ SQL
}
@test "garbage_collection: call GC in sql script" {
export DOLT_ENABLE_GC_PROCEDURE="true"
dolt sql <<SQL
CREATE TABLE t (pk int primary key);
INSERT INTO t VALUES (1),(2),(3);
@@ -0,0 +1,117 @@
// Copyright 2023 Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package main
import (
"context"
"testing"
"strings"
"time"
"fmt"
sqldriver "database/sql/driver"
"golang.org/x/sync/errgroup"
"github.com/stretchr/testify/require"
driver "github.com/dolthub/dolt/go/libraries/doltcore/dtestutils/sql_server_driver"
)
func TestConcurrentGC(t *testing.T) {
u, err := driver.NewDoltUser()
require.NoError(t, err)
t.Cleanup(func() {
u.Cleanup()
})
rs, err := u.MakeRepoStore()
require.NoError(t, err)
repo, err := rs.MakeRepo("concurrent_gc_test")
require.NoError(t, err)
server := MakeServer(t, repo, &driver.Server{})
server.DBName = "concurrent_gc_test"
db, err := server.DB(driver.Connection{User: "root"})
require.NoError(t, err)
defer db.Close()
func() {
conn, err := db.Conn(context.Background())
require.NoError(t, err)
defer conn.Close()
// We're going to bootstrap the database with a table which has id, val, id == [0,7*1024], val == 0.
_, err = conn.ExecContext(context.Background(), "create table vals (id int primary key, val int)")
require.NoError(t, err)
vals := []string{}
for i := 0; i < 7 * 1024; i++ {
vals = append(vals, fmt.Sprintf("(%d,0)", i))
}
_, err = conn.ExecContext(context.Background(), "insert into vals values " + strings.Join(vals, ","))
require.NoError(t, err)
}()
start := time.Now()
dur := 30 * time.Second
var eg errgroup.Group
// We're going to spawn 8 threads, each running mutations on their own part of the table...
for i := 0; i < 8; i++ {
i := i * 1024
eg.Go(func() error {
for j := 0; time.Since(start) < dur; j++ {
func() {
conn, err := db.Conn(context.Background())
if err != nil {
t.Logf("err in Conn: %v", err)
return
}
defer conn.Close()
_, err = conn.ExecContext(context.Background(), "update vals set val = val+1 where id = ?", i)
if err != nil {
t.Logf("err in Exec: %v", err)
}
}()
}
return nil
})
}
// We spawn a thread which calls dolt_gc() periodically
eg.Go(func() error {
for time.Since(start) < dur {
func() {
conn, err := db.Conn(context.Background())
if err != nil {
t.Logf("err in Conn for dolt_gc: %v", err)
return
}
_, err = conn.ExecContext(context.Background(), "call dolt_gc()")
if err != nil {
t.Logf("err in Exec dolt_gc: %v", err)
}
conn.Raw(func(_ any) error {
return sqldriver.ErrBadConn
})
}()
time.Sleep(100 * time.Millisecond)
}
return nil
})
eg.Wait()
}
@@ -5,6 +5,7 @@ go 1.19
require (
github.com/dolthub/dolt/go v0.40.4
github.com/stretchr/testify v1.8.1
golang.org/x/sync v0.1.0
gopkg.in/yaml.v3 v3.0.1
)
@@ -14,6 +14,8 @@ github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o=
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.5.0 h1:MUK/U/4lj1t1oPg0HfuXDN/Z1wv31ZJ/YcPiGccS4DU=
golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=