mirror of
https://github.com/dolthub/dolt.git
synced 2026-03-13 11:09:10 -05:00
dolt_gc support: Make waitForGC unblock with error on context canceled. Use more agressive retry-with-backoff policy on awaiting process quiesence in dolt_gc.
This commit is contained in:
@@ -20,6 +20,7 @@ import (
|
||||
"os"
|
||||
"time"
|
||||
|
||||
"github.com/cenkalti/backoff/v4"
|
||||
"github.com/dolthub/go-mysql-server/sql"
|
||||
|
||||
"github.com/dolthub/dolt/go/cmd/dolt/cli"
|
||||
@@ -100,23 +101,23 @@ func doDoltGC(ctx *sql.Context, args []string) (int, error) {
|
||||
killed[p.Connection] = struct{}{}
|
||||
}
|
||||
}
|
||||
|
||||
// Look in processes until the connections are actually gone.
|
||||
for i := 0; i < 100; i++ {
|
||||
if i == 100 {
|
||||
return errors.New("unable to establish safepoint.")
|
||||
}
|
||||
params := backoff.NewExponentialBackOff()
|
||||
params.InitialInterval = 1 * time.Millisecond
|
||||
params.MaxInterval = 25 * time.Millisecond
|
||||
params.MaxElapsedTime = 3 * time.Second
|
||||
err := backoff.Retry(func() error {
|
||||
processes := ctx.ProcessList.Processes()
|
||||
done := true
|
||||
for _, p := range processes {
|
||||
if _, ok := killed[p.Connection]; ok {
|
||||
done = false
|
||||
break
|
||||
return errors.New("unable to establish safepoint.")
|
||||
}
|
||||
}
|
||||
if done {
|
||||
break
|
||||
}
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
return nil
|
||||
}, params)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
ctx.Session.SetTransaction(nil)
|
||||
dsess.DSessFromSess(ctx.Session).SetValidateErr(ErrServerPerformedGC)
|
||||
|
||||
@@ -73,10 +73,14 @@ func (s journalChunkSource) getMany(ctx context.Context, _ *errgroup.Group, reqs
|
||||
var remaining bool
|
||||
// todo: read planning
|
||||
for i := range reqs {
|
||||
if reqs[i].found {
|
||||
continue
|
||||
}
|
||||
data, err := s.get(ctx, *reqs[i].a, stats)
|
||||
if err != nil {
|
||||
return false, err
|
||||
} else if data != nil {
|
||||
reqs[i].found = true
|
||||
ch := chunks.NewChunkWithHash(hash.Hash(*reqs[i].a), data)
|
||||
found(ctx, &ch)
|
||||
} else {
|
||||
@@ -90,12 +94,16 @@ func (s journalChunkSource) getManyCompressed(ctx context.Context, _ *errgroup.G
|
||||
var remaining bool
|
||||
// todo: read planning
|
||||
for i := range reqs {
|
||||
if reqs[i].found {
|
||||
continue
|
||||
}
|
||||
cc, err := s.getCompressed(ctx, *reqs[i].a, stats)
|
||||
if err != nil {
|
||||
return false, err
|
||||
} else if cc.IsEmpty() {
|
||||
remaining = true
|
||||
} else {
|
||||
reqs[i].found = true
|
||||
found(ctx, cc)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -165,7 +165,10 @@ func (nbs *NomsBlockStore) GetChunkLocations(hashes hash.HashSet) (map[hash.Hash
|
||||
func (nbs *NomsBlockStore) UpdateManifest(ctx context.Context, updates map[hash.Hash]uint32) (mi ManifestInfo, err error) {
|
||||
nbs.mu.Lock()
|
||||
defer nbs.mu.Unlock()
|
||||
nbs.waitForGC()
|
||||
err = nbs.waitForGC(ctx)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
nbs.checkAllManifestUpdatesExist(ctx, updates)
|
||||
|
||||
@@ -244,7 +247,10 @@ func (nbs *NomsBlockStore) UpdateManifest(ctx context.Context, updates map[hash.
|
||||
func (nbs *NomsBlockStore) UpdateManifestWithAppendix(ctx context.Context, updates map[hash.Hash]uint32, option ManifestAppendixOption) (mi ManifestInfo, err error) {
|
||||
nbs.mu.Lock()
|
||||
defer nbs.mu.Unlock()
|
||||
nbs.waitForGC()
|
||||
err = nbs.waitForGC(ctx)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
nbs.checkAllManifestUpdatesExist(ctx, updates)
|
||||
|
||||
@@ -392,7 +398,10 @@ func fromManifestAppendixOptionNewContents(upstream manifestContents, appendixSp
|
||||
func OverwriteStoreManifest(ctx context.Context, store *NomsBlockStore, root hash.Hash, tableFiles map[hash.Hash]uint32, appendixTableFiles map[hash.Hash]uint32) (err error) {
|
||||
store.mu.Lock()
|
||||
defer store.mu.Unlock()
|
||||
store.waitForGC()
|
||||
err = store.waitForGC(ctx)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
contents := manifestContents{
|
||||
root: root,
|
||||
@@ -604,10 +613,20 @@ func (nbs *NomsBlockStore) WithoutConjoiner() *NomsBlockStore {
|
||||
}
|
||||
|
||||
// Wait for GC to complete to continue with writes
|
||||
func (nbs *NomsBlockStore) waitForGC() {
|
||||
for nbs.gcInProgress {
|
||||
func (nbs *NomsBlockStore) waitForGC(ctx context.Context) error {
|
||||
stop := make(chan struct{})
|
||||
defer close(stop)
|
||||
go func() {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
nbs.cond.Broadcast()
|
||||
case <-stop:
|
||||
}
|
||||
}()
|
||||
for nbs.gcInProgress && ctx.Err() == nil {
|
||||
nbs.cond.Wait()
|
||||
}
|
||||
return ctx.Err()
|
||||
}
|
||||
|
||||
func (nbs *NomsBlockStore) Put(ctx context.Context, c chunks.Chunk, getAddrs chunks.GetAddrsCb) error {
|
||||
@@ -668,7 +687,9 @@ func (nbs *NomsBlockStore) addChunk(ctx context.Context, ch chunks.Chunk, addrs
|
||||
if nbs.keeperFunc != nil {
|
||||
if nbs.keeperFunc(ch.Hash()) {
|
||||
retry = true
|
||||
nbs.waitForGC()
|
||||
if err := nbs.waitForGC(ctx); err != nil {
|
||||
return false, err
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1032,7 +1053,10 @@ func (nbs *NomsBlockStore) commit(ctx context.Context, current, last hash.Hash,
|
||||
|
||||
if nbs.keeperFunc != nil {
|
||||
if nbs.keeperFunc(current) {
|
||||
nbs.waitForGC()
|
||||
err = nbs.waitForGC(ctx)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1614,7 +1638,24 @@ func (nbs *NomsBlockStore) swapTables(ctx context.Context, specs []tableSpec) (e
|
||||
}
|
||||
oldTables := nbs.tables
|
||||
nbs.tables, nbs.upstream = ts, upstream
|
||||
return oldTables.close()
|
||||
err = oldTables.close()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// When this is called, we are at a safepoint in the GC process.
|
||||
// We clear novel and the memtable, which are not coming with us
|
||||
// into the new store.
|
||||
oldNovel := nbs.tables.novel
|
||||
nbs.tables.novel = make(chunkSourceSet)
|
||||
for _, css := range oldNovel {
|
||||
err = css.close()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
nbs.mt = nil
|
||||
return nil
|
||||
}
|
||||
|
||||
// SetRootChunk changes the root chunk hash from the previous value to the new root.
|
||||
@@ -1625,7 +1666,10 @@ func (nbs *NomsBlockStore) SetRootChunk(ctx context.Context, root, previous hash
|
||||
func (nbs *NomsBlockStore) setRootChunk(ctx context.Context, root, previous hash.Hash, checker refCheck) error {
|
||||
nbs.mu.Lock()
|
||||
defer nbs.mu.Unlock()
|
||||
nbs.waitForGC()
|
||||
err := nbs.waitForGC(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for {
|
||||
err := nbs.updateManifest(ctx, root, previous, checker)
|
||||
|
||||
|
||||
@@ -573,15 +573,15 @@ func (lvs *ValueStore) GC(ctx context.Context, oldGenRefs, newGenRefs hash.HashS
|
||||
return err
|
||||
}
|
||||
|
||||
if safepointF != nil {
|
||||
err = safepointF()
|
||||
if err != nil {
|
||||
newGen.EndGC()
|
||||
return err
|
||||
}
|
||||
err = lvs.gc(ctx, newGenRefs, oldGen.HasMany, newGen, newGen, lvs.transitionToFinalizingGC)
|
||||
if err != nil {
|
||||
newGen.EndGC()
|
||||
return err
|
||||
}
|
||||
|
||||
err = lvs.gc(ctx, newGenRefs, oldGen.HasMany, newGen, newGen, lvs.transitionToFinalizingGC)
|
||||
if safepointF != nil {
|
||||
err = safepointF()
|
||||
}
|
||||
newGen.EndGC()
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -610,15 +610,15 @@ func (lvs *ValueStore) GC(ctx context.Context, oldGenRefs, newGenRefs hash.HashS
|
||||
|
||||
newGenRefs.Insert(root)
|
||||
|
||||
if safepointF != nil {
|
||||
err = safepointF()
|
||||
if err != nil {
|
||||
collector.EndGC()
|
||||
return err
|
||||
}
|
||||
err = lvs.gc(ctx, newGenRefs, unfilteredHashFunc, collector, collector, lvs.transitionToFinalizingGC)
|
||||
if err != nil {
|
||||
collector.EndGC()
|
||||
return err
|
||||
}
|
||||
|
||||
err = lvs.gc(ctx, newGenRefs, unfilteredHashFunc, collector, collector, lvs.transitionToFinalizingGC)
|
||||
if safepointF != nil {
|
||||
err = safepointF()
|
||||
}
|
||||
collector.EndGC()
|
||||
if err != nil {
|
||||
return err
|
||||
|
||||
@@ -29,6 +29,9 @@ import (
|
||||
)
|
||||
|
||||
func TestConcurrentGC(t *testing.T) {
|
||||
NumThreads := 8
|
||||
Duration := 30 * time.Second
|
||||
|
||||
u, err := driver.NewDoltUser()
|
||||
require.NoError(t, err)
|
||||
t.Cleanup(func() {
|
||||
@@ -57,7 +60,7 @@ func TestConcurrentGC(t *testing.T) {
|
||||
_, err = conn.ExecContext(context.Background(), "create table vals (id int primary key, val int)")
|
||||
require.NoError(t, err)
|
||||
vals := []string{}
|
||||
for i := 0; i < 7 * 1024; i++ {
|
||||
for i := 0; i <= 7 * 1024; i++ {
|
||||
vals = append(vals, fmt.Sprintf("(%d,0)", i))
|
||||
}
|
||||
_, err = conn.ExecContext(context.Background(), "insert into vals values " + strings.Join(vals, ","))
|
||||
@@ -65,15 +68,14 @@ func TestConcurrentGC(t *testing.T) {
|
||||
}()
|
||||
|
||||
start := time.Now()
|
||||
dur := 30 * time.Second
|
||||
|
||||
var eg errgroup.Group
|
||||
|
||||
// We're going to spawn 8 threads, each running mutations on their own part of the table...
|
||||
for i := 0; i < 8; i++ {
|
||||
for i := 0; i < NumThreads; i++ {
|
||||
i := i * 1024
|
||||
eg.Go(func() error {
|
||||
for j := 0; time.Since(start) < dur; j++ {
|
||||
for j := 0; time.Since(start) < Duration; j++ {
|
||||
func() {
|
||||
conn, err := db.Conn(context.Background())
|
||||
if err != nil {
|
||||
@@ -93,17 +95,21 @@ func TestConcurrentGC(t *testing.T) {
|
||||
|
||||
// We spawn a thread which calls dolt_gc() periodically
|
||||
eg.Go(func() error {
|
||||
for time.Since(start) < dur {
|
||||
for time.Since(start) < Duration {
|
||||
func() {
|
||||
conn, err := db.Conn(context.Background())
|
||||
if err != nil {
|
||||
t.Logf("err in Conn for dolt_gc: %v", err)
|
||||
return
|
||||
}
|
||||
b := time.Now()
|
||||
_, err = conn.ExecContext(context.Background(), "call dolt_gc()")
|
||||
if err != nil {
|
||||
t.Logf("err in Exec dolt_gc: %v", err)
|
||||
} else {
|
||||
t.Logf("successful dolt_gc took %v", time.Since(b))
|
||||
}
|
||||
// After calling dolt_gc, the connection is bad. Remove it from the connection pool.
|
||||
conn.Raw(func(_ any) error {
|
||||
return sqldriver.ErrBadConn
|
||||
})
|
||||
@@ -114,4 +120,21 @@ func TestConcurrentGC(t *testing.T) {
|
||||
})
|
||||
|
||||
eg.Wait()
|
||||
|
||||
conn, err := db.Conn(context.Background())
|
||||
require.NoError(t, err)
|
||||
rows, err := conn.QueryContext(context.Background(), "select val from vals where id in (0, 1024, 2048, 3072, 4096, 5120, 6144, 7168)")
|
||||
i := 0
|
||||
cnt := 0
|
||||
for rows.Next() {
|
||||
var val int
|
||||
i += 1
|
||||
require.NoError(t, rows.Scan(&val))
|
||||
cnt += val
|
||||
}
|
||||
require.Equal(t, 8, i)
|
||||
t.Logf("successfully updated val %d times", cnt)
|
||||
require.NoError(t, rows.Close())
|
||||
require.NoError(t, rows.Err())
|
||||
require.NoError(t, conn.Close())
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user