From cf4b81217a65ed3015cb768e1e10cd0bc9386a59 Mon Sep 17 00:00:00 2001 From: Aaron Son Date: Fri, 17 Mar 2023 11:48:18 -0700 Subject: [PATCH] dolt_gc support: Make waitForGC unblock with error on context canceled. Use more aggressive retry-with-backoff policy on awaiting process quiescence in dolt_gc. --- .../doltcore/sqle/dprocedures/dolt_gc.go | 23 +++---- go/store/nbs/journal_chunk_source.go | 8 +++ go/store/nbs/store.go | 62 ++++++++++++++++--- go/store/types/value_store.go | 28 ++++----- .../concurrent_gc_test.go | 33 ++++++++-- 5 files changed, 115 insertions(+), 39 deletions(-) diff --git a/go/libraries/doltcore/sqle/dprocedures/dolt_gc.go b/go/libraries/doltcore/sqle/dprocedures/dolt_gc.go index d5064950c7..ebecb1b749 100644 --- a/go/libraries/doltcore/sqle/dprocedures/dolt_gc.go +++ b/go/libraries/doltcore/sqle/dprocedures/dolt_gc.go @@ -20,6 +20,7 @@ import ( "os" "time" + "github.com/cenkalti/backoff/v4" "github.com/dolthub/go-mysql-server/sql" "github.com/dolthub/dolt/go/cmd/dolt/cli" @@ -100,23 +101,23 @@ func doDoltGC(ctx *sql.Context, args []string) (int, error) { killed[p.Connection] = struct{}{} } } + // Look in processes until the connections are actually gone. 
- for i := 0; i < 100; i++ { - if i == 100 { - return errors.New("unable to establish safepoint.") - } + params := backoff.NewExponentialBackOff() + params.InitialInterval = 1 * time.Millisecond + params.MaxInterval = 25 * time.Millisecond + params.MaxElapsedTime = 3 * time.Second + err := backoff.Retry(func() error { processes := ctx.ProcessList.Processes() - done := true for _, p := range processes { if _, ok := killed[p.Connection]; ok { - done = false - break + return errors.New("unable to establish safepoint.") } } - if done { - break - } - time.Sleep(50 * time.Millisecond) + return nil + }, params) + if err != nil { + return err } ctx.Session.SetTransaction(nil) dsess.DSessFromSess(ctx.Session).SetValidateErr(ErrServerPerformedGC) diff --git a/go/store/nbs/journal_chunk_source.go b/go/store/nbs/journal_chunk_source.go index 31b8cab188..16dbaff89c 100644 --- a/go/store/nbs/journal_chunk_source.go +++ b/go/store/nbs/journal_chunk_source.go @@ -73,10 +73,14 @@ func (s journalChunkSource) getMany(ctx context.Context, _ *errgroup.Group, reqs var remaining bool // todo: read planning for i := range reqs { + if reqs[i].found { + continue + } data, err := s.get(ctx, *reqs[i].a, stats) if err != nil { return false, err } else if data != nil { + reqs[i].found = true ch := chunks.NewChunkWithHash(hash.Hash(*reqs[i].a), data) found(ctx, &ch) } else { @@ -90,12 +94,16 @@ func (s journalChunkSource) getManyCompressed(ctx context.Context, _ *errgroup.G var remaining bool // todo: read planning for i := range reqs { + if reqs[i].found { + continue + } cc, err := s.getCompressed(ctx, *reqs[i].a, stats) if err != nil { return false, err } else if cc.IsEmpty() { remaining = true } else { + reqs[i].found = true found(ctx, cc) } } diff --git a/go/store/nbs/store.go b/go/store/nbs/store.go index ceb16cbd5b..989308bc61 100644 --- a/go/store/nbs/store.go +++ b/go/store/nbs/store.go @@ -165,7 +165,10 @@ func (nbs *NomsBlockStore) GetChunkLocations(hashes hash.HashSet) (map[hash.Hash 
func (nbs *NomsBlockStore) UpdateManifest(ctx context.Context, updates map[hash.Hash]uint32) (mi ManifestInfo, err error) { nbs.mu.Lock() defer nbs.mu.Unlock() - nbs.waitForGC() + err = nbs.waitForGC(ctx) + if err != nil { + return + } nbs.checkAllManifestUpdatesExist(ctx, updates) @@ -244,7 +247,10 @@ func (nbs *NomsBlockStore) UpdateManifest(ctx context.Context, updates map[hash. func (nbs *NomsBlockStore) UpdateManifestWithAppendix(ctx context.Context, updates map[hash.Hash]uint32, option ManifestAppendixOption) (mi ManifestInfo, err error) { nbs.mu.Lock() defer nbs.mu.Unlock() - nbs.waitForGC() + err = nbs.waitForGC(ctx) + if err != nil { + return + } nbs.checkAllManifestUpdatesExist(ctx, updates) @@ -392,7 +398,10 @@ func fromManifestAppendixOptionNewContents(upstream manifestContents, appendixSp func OverwriteStoreManifest(ctx context.Context, store *NomsBlockStore, root hash.Hash, tableFiles map[hash.Hash]uint32, appendixTableFiles map[hash.Hash]uint32) (err error) { store.mu.Lock() defer store.mu.Unlock() - store.waitForGC() + err = store.waitForGC(ctx) + if err != nil { + return + } contents := manifestContents{ root: root, @@ -604,10 +613,20 @@ func (nbs *NomsBlockStore) WithoutConjoiner() *NomsBlockStore { } // Wait for GC to complete to continue with writes -func (nbs *NomsBlockStore) waitForGC() { - for nbs.gcInProgress { +func (nbs *NomsBlockStore) waitForGC(ctx context.Context) error { + stop := make(chan struct{}) + defer close(stop) + go func() { + select { + case <-ctx.Done(): + nbs.cond.Broadcast() + case <-stop: + } + }() + for nbs.gcInProgress && ctx.Err() == nil { nbs.cond.Wait() } + return ctx.Err() } func (nbs *NomsBlockStore) Put(ctx context.Context, c chunks.Chunk, getAddrs chunks.GetAddrsCb) error { @@ -668,7 +687,9 @@ func (nbs *NomsBlockStore) addChunk(ctx context.Context, ch chunks.Chunk, addrs if nbs.keeperFunc != nil { if nbs.keeperFunc(ch.Hash()) { retry = true - nbs.waitForGC() + if err := nbs.waitForGC(ctx); err != nil { + return 
false, err + } } } } @@ -1032,7 +1053,10 @@ func (nbs *NomsBlockStore) commit(ctx context.Context, current, last hash.Hash, if nbs.keeperFunc != nil { if nbs.keeperFunc(current) { - nbs.waitForGC() + err = nbs.waitForGC(ctx) + if err != nil { + return false, err + } } } @@ -1614,7 +1638,24 @@ func (nbs *NomsBlockStore) swapTables(ctx context.Context, specs []tableSpec) (e } oldTables := nbs.tables nbs.tables, nbs.upstream = ts, upstream - return oldTables.close() + err = oldTables.close() + if err != nil { + return err + } + + // When this is called, we are at a safepoint in the GC process. + // We clear novel and the memtable, which are not coming with us + // into the new store. + oldNovel := nbs.tables.novel + nbs.tables.novel = make(chunkSourceSet) + for _, css := range oldNovel { + err = css.close() + if err != nil { + return err + } + } + nbs.mt = nil + return nil } // SetRootChunk changes the root chunk hash from the previous value to the new root. @@ -1625,7 +1666,10 @@ func (nbs *NomsBlockStore) SetRootChunk(ctx context.Context, root, previous hash func (nbs *NomsBlockStore) setRootChunk(ctx context.Context, root, previous hash.Hash, checker refCheck) error { nbs.mu.Lock() defer nbs.mu.Unlock() - nbs.waitForGC() + err := nbs.waitForGC(ctx) + if err != nil { + return err + } for { err := nbs.updateManifest(ctx, root, previous, checker) diff --git a/go/store/types/value_store.go b/go/store/types/value_store.go index e442a46f6f..ab9cbb0903 100644 --- a/go/store/types/value_store.go +++ b/go/store/types/value_store.go @@ -573,15 +573,15 @@ func (lvs *ValueStore) GC(ctx context.Context, oldGenRefs, newGenRefs hash.HashS return err } - if safepointF != nil { - err = safepointF() - if err != nil { - newGen.EndGC() - return err - } + err = lvs.gc(ctx, newGenRefs, oldGen.HasMany, newGen, newGen, lvs.transitionToFinalizingGC) + if err != nil { + newGen.EndGC() + return err } - err = lvs.gc(ctx, newGenRefs, oldGen.HasMany, newGen, newGen, 
lvs.transitionToFinalizingGC) + if safepointF != nil { + err = safepointF() + } newGen.EndGC() if err != nil { return err @@ -610,15 +610,15 @@ func (lvs *ValueStore) GC(ctx context.Context, oldGenRefs, newGenRefs hash.HashS newGenRefs.Insert(root) - if safepointF != nil { - err = safepointF() - if err != nil { - collector.EndGC() - return err - } + err = lvs.gc(ctx, newGenRefs, unfilteredHashFunc, collector, collector, lvs.transitionToFinalizingGC) + if err != nil { + collector.EndGC() + return err } - err = lvs.gc(ctx, newGenRefs, unfilteredHashFunc, collector, collector, lvs.transitionToFinalizingGC) + if safepointF != nil { + err = safepointF() + } collector.EndGC() if err != nil { return err diff --git a/integration-tests/go-sql-server-driver/concurrent_gc_test.go b/integration-tests/go-sql-server-driver/concurrent_gc_test.go index 0309549832..10d37d1015 100644 --- a/integration-tests/go-sql-server-driver/concurrent_gc_test.go +++ b/integration-tests/go-sql-server-driver/concurrent_gc_test.go @@ -29,6 +29,9 @@ import ( ) func TestConcurrentGC(t *testing.T) { + NumThreads := 8 + Duration := 30 * time.Second + u, err := driver.NewDoltUser() require.NoError(t, err) t.Cleanup(func() { @@ -57,7 +60,7 @@ func TestConcurrentGC(t *testing.T) { _, err = conn.ExecContext(context.Background(), "create table vals (id int primary key, val int)") require.NoError(t, err) vals := []string{} - for i := 0; i < 7 * 1024; i++ { + for i := 0; i <= 7 * 1024; i++ { vals = append(vals, fmt.Sprintf("(%d,0)", i)) } _, err = conn.ExecContext(context.Background(), "insert into vals values " + strings.Join(vals, ",")) @@ -65,15 +68,14 @@ func TestConcurrentGC(t *testing.T) { }() start := time.Now() - dur := 30 * time.Second var eg errgroup.Group // We're going to spawn 8 threads, each running mutations on their own part of the table... 
- for i := 0; i < 8; i++ { + for i := 0; i < NumThreads; i++ { i := i * 1024 eg.Go(func() error { - for j := 0; time.Since(start) < dur; j++ { + for j := 0; time.Since(start) < Duration; j++ { func() { conn, err := db.Conn(context.Background()) if err != nil { @@ -93,17 +95,21 @@ func TestConcurrentGC(t *testing.T) { // We spawn a thread which calls dolt_gc() periodically eg.Go(func() error { - for time.Since(start) < dur { + for time.Since(start) < Duration { func() { conn, err := db.Conn(context.Background()) if err != nil { t.Logf("err in Conn for dolt_gc: %v", err) return } + b := time.Now() _, err = conn.ExecContext(context.Background(), "call dolt_gc()") if err != nil { t.Logf("err in Exec dolt_gc: %v", err) + } else { + t.Logf("successful dolt_gc took %v", time.Since(b)) } + // After calling dolt_gc, the connection is bad. Remove it from the connection pool. conn.Raw(func(_ any) error { return sqldriver.ErrBadConn }) @@ -114,4 +120,21 @@ func TestConcurrentGC(t *testing.T) { }) eg.Wait() + + conn, err := db.Conn(context.Background()) + require.NoError(t, err) + rows, err := conn.QueryContext(context.Background(), "select val from vals where id in (0, 1024, 2048, 3072, 4096, 5120, 6144, 7168)") + i := 0 + cnt := 0 + for rows.Next() { + var val int + i += 1 + require.NoError(t, rows.Scan(&val)) + cnt += val + } + require.Equal(t, 8, i) + t.Logf("successfully updated val %d times", cnt) + require.NoError(t, rows.Close()) + require.NoError(t, rows.Err()) + require.NoError(t, conn.Close()) }