Merge remote-tracking branch 'origin/main' into nicktobey/lazy-load
@@ -152,8 +152,6 @@ jobs:
           cwd: "."
           pull: "--ff"
       - name: Check generated protobufs
-        env:
-          USE_BAZEL_VERSION: 7.4.0
         working-directory: ./proto
         env:
          USE_BAZEL_VERSION: 7.4.0
@@ -16,5 +16,5 @@
 package doltversion

 const (
-    Version = "1.47.1"
+    Version = "1.47.2"
 )
@@ -1728,7 +1728,7 @@ func (ddb *DoltDB) Rebase(ctx context.Context) error {
 // until no possibly-stale ChunkStore state is retained in memory, or failing
 // certain in-progress operations which cannot be finalized in a timely manner,
 // etc.
-func (ddb *DoltDB) GC(ctx context.Context, mode types.GCMode, safepointF func() error) error {
+func (ddb *DoltDB) GC(ctx context.Context, mode types.GCMode, safepointController types.GCSafepointController) error {
     collector, ok := ddb.db.Database.(datas.GarbageCollector)
     if !ok {
         return fmt.Errorf("this database does not support garbage collection")
@@ -1772,7 +1772,7 @@ func (ddb *DoltDB) GC(ctx context.Context, mode types.GCMode, safepointF func()
         return err
     }

-    return collector.GC(ctx, mode, oldGen, newGen, safepointF)
+    return collector.GC(ctx, mode, oldGen, newGen, safepointController)
 }

 func (ddb *DoltDB) ShallowGC(ctx context.Context) error {
@@ -419,12 +419,15 @@ func (p *DoltDatabaseProvider) CreateDatabase(ctx *sql.Context, name string) err
 }

 func commitTransaction(ctx *sql.Context, dSess *dsess.DoltSession, rsc *doltdb.ReplicationStatusController) error {
+    // there is no current transaction to commit; this happens in certain tests like
     currentTx := ctx.GetTransaction()
-    err := dSess.CommitTransaction(ctx, currentTx)
-    if err != nil {
-        return err
+    if currentTx != nil {
+        err := dSess.CommitTransaction(ctx, currentTx)
+        if err != nil {
+            return err
+        }
     }

     newTx, err := dSess.StartTransaction(ctx, sql.ReadWrite)
     if err != nil {
         return err
@@ -15,6 +15,7 @@
 package dprocedures

 import (
+    "context"
     "errors"
     "fmt"
     "os"
@@ -27,6 +28,7 @@ import (
     "github.com/dolthub/dolt/go/libraries/doltcore/branch_control"
     "github.com/dolthub/dolt/go/libraries/doltcore/dconfig"
     "github.com/dolthub/dolt/go/libraries/doltcore/sqle/dsess"
+    "github.com/dolthub/dolt/go/store/hash"
     "github.com/dolthub/dolt/go/store/types"
 )
@@ -57,6 +59,29 @@ func doltGC(ctx *sql.Context, args ...string) (sql.RowIter, error) {

 var ErrServerPerformedGC = errors.New("this connection was established when this server performed an online garbage collection. this connection can no longer be used. please reconnect.")

+type safepointController struct {
+    begin        func(context.Context, func(hash.Hash) bool) error
+    preFinalize  func(context.Context) error
+    postFinalize func(context.Context) error
+    cancel       func()
+}
+
+func (sc safepointController) BeginGC(ctx context.Context, keeper func(hash.Hash) bool) error {
+    return sc.begin(ctx, keeper)
+}
+
+func (sc safepointController) EstablishPreFinalizeSafepoint(ctx context.Context) error {
+    return sc.preFinalize(ctx)
+}
+
+func (sc safepointController) EstablishPostFinalizeSafepoint(ctx context.Context) error {
+    return sc.postFinalize(ctx)
+}
+
+func (sc safepointController) CancelSafepoint() {
+    sc.cancel()
+}
+
 func doDoltGC(ctx *sql.Context, args []string) (int, error) {
     dbName := ctx.GetCurrentDatabase()
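The closure-field struct above acts as a small adapter: doDoltGC can assemble a GC safepoint controller inline from plain functions. As a minimal sketch of the conformance this relies on, a compile-time assertion (illustrative only, not part of this change) would read:

    // safepointController must satisfy the types.GCSafepointController
    // interface introduced later in this diff.
    var _ types.GCSafepointController = safepointController{}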
@@ -116,66 +141,72 @@ func doDoltGC(ctx *sql.Context, args []string) (int, error) {
         mode = types.GCModeFull
     }

-    // TODO: If we got a callback at the beginning and an
-    // (allowed-to-block) callback at the end, we could more
-    // gracefully tear things down.
-    err = ddb.GC(ctx, mode, func() error {
-        if origepoch != -1 {
-            // Here we need to sanity check role and epoch.
-            if _, role, ok := sql.SystemVariables.GetGlobal(dsess.DoltClusterRoleVariable); ok {
-                if role.(string) != "primary" {
-                    return fmt.Errorf("dolt_gc failed: when we began we were a primary in a cluster, but now our role is %s", role.(string))
-                }
-                _, epoch, ok := sql.SystemVariables.GetGlobal(dsess.DoltClusterRoleEpochVariable)
-                if !ok {
-                    return fmt.Errorf("dolt_gc failed: when we began we were a primary in a cluster, but we can no longer read the cluster role epoch.")
-                }
-                if origepoch != epoch.(int) {
-                    return fmt.Errorf("dolt_gc failed: when we began we were primary in the cluster at epoch %d, but now we are at epoch %d. for gc to safely finalize, our role and epoch must not change throughout the gc.", origepoch, epoch.(int))
-                }
-            } else {
-                return fmt.Errorf("dolt_gc failed: when we began we were a primary in a cluster, but we can no longer read the cluster role.")
-            }
-        }
-
-        killed := make(map[uint32]struct{})
-        processes := ctx.ProcessList.Processes()
-        for _, p := range processes {
-            if p.Connection != ctx.Session.ID() {
-                // Kill any inflight query.
-                ctx.ProcessList.Kill(p.Connection)
-                // Tear down the connection itself.
-                ctx.KillConnection(p.Connection)
-                killed[p.Connection] = struct{}{}
-            }
-        }
-
-        // Look in processes until the connections are actually gone.
-        params := backoff.NewExponentialBackOff()
-        params.InitialInterval = 1 * time.Millisecond
-        params.MaxInterval = 25 * time.Millisecond
-        params.MaxElapsedTime = 3 * time.Second
-        err := backoff.Retry(func() error {
-            processes := ctx.ProcessList.Processes()
-            allgood := true
-            for _, p := range processes {
-                if _, ok := killed[p.Connection]; ok {
-                    allgood = false
-                    ctx.ProcessList.Kill(p.Connection)
-                }
-            }
-            if !allgood {
-                return errors.New("unable to establish safepoint.")
-            }
-            return nil
-        }, params)
-        if err != nil {
-            return err
-        }
-        ctx.Session.SetTransaction(nil)
-        dsess.DSessFromSess(ctx.Session).SetValidateErr(ErrServerPerformedGC)
-        return nil
-    })
+    // TODO: Implement safepointController so that begin can capture inflight sessions
+    // and preFinalize can ensure they're all in a good place before returning.
+    sc := safepointController{
+        begin:       func(context.Context, func(hash.Hash) bool) error { return nil },
+        preFinalize: func(context.Context) error { return nil },
+        postFinalize: func(context.Context) error {
+            if origepoch != -1 {
+                // Here we need to sanity check role and epoch.
+                if _, role, ok := sql.SystemVariables.GetGlobal(dsess.DoltClusterRoleVariable); ok {
+                    if role.(string) != "primary" {
+                        return fmt.Errorf("dolt_gc failed: when we began we were a primary in a cluster, but now our role is %s", role.(string))
+                    }
+                    _, epoch, ok := sql.SystemVariables.GetGlobal(dsess.DoltClusterRoleEpochVariable)
+                    if !ok {
+                        return fmt.Errorf("dolt_gc failed: when we began we were a primary in a cluster, but we can no longer read the cluster role epoch.")
+                    }
+                    if origepoch != epoch.(int) {
+                        return fmt.Errorf("dolt_gc failed: when we began we were primary in the cluster at epoch %d, but now we are at epoch %d. for gc to safely finalize, our role and epoch must not change throughout the gc.", origepoch, epoch.(int))
+                    }
+                } else {
+                    return fmt.Errorf("dolt_gc failed: when we began we were a primary in a cluster, but we can no longer read the cluster role.")
+                }
+            }
+
+            killed := make(map[uint32]struct{})
+            processes := ctx.ProcessList.Processes()
+            for _, p := range processes {
+                if p.Connection != ctx.Session.ID() {
+                    // Kill any inflight query.
+                    ctx.ProcessList.Kill(p.Connection)
+                    // Tear down the connection itself.
+                    ctx.KillConnection(p.Connection)
+                    killed[p.Connection] = struct{}{}
+                }
+            }
+
+            // Look in processes until the connections are actually gone.
+            params := backoff.NewExponentialBackOff()
+            params.InitialInterval = 1 * time.Millisecond
+            params.MaxInterval = 25 * time.Millisecond
+            params.MaxElapsedTime = 3 * time.Second
+            err := backoff.Retry(func() error {
+                processes := ctx.ProcessList.Processes()
+                allgood := true
+                for _, p := range processes {
+                    if _, ok := killed[p.Connection]; ok {
+                        allgood = false
+                        ctx.ProcessList.Kill(p.Connection)
+                    }
+                }
+                if !allgood {
+                    return errors.New("unable to establish safepoint.")
+                }
+                return nil
+            }, params)
+            if err != nil {
+                return err
+            }
+            ctx.Session.SetTransaction(nil)
+            dsess.DSessFromSess(ctx.Session).SetValidateErr(ErrServerPerformedGC)
+            return nil
+        },
+        cancel: func() {},
+    }
+
+    err = ddb.GC(ctx, mode, sc)
     if err != nil {
         return cmdFailure, err
     }
@@ -448,11 +448,6 @@ func (d *DoltSession) CommitTransaction(ctx *sql.Context, tx sql.Transaction) (e
         return nil
     }

-    // There is no transaction to commit
-    if tx == nil {
-        return nil
-    }
-
     dirties := d.dirtyWorkingSets()
     if len(dirties) == 0 {
         return nil
@@ -7671,7 +7671,7 @@ var DoltTempTableScripts = []queries.ScriptTest{
         },
     },
     {
-        Name:    "drop temporary table behavior",
+        Name:    "drop temporary table behavior",
         Dialect: "mysql",
         SetUpScript: []string{
             "create table t (i int);",
@@ -7723,7 +7723,7 @@ var DoltTempTableScripts = []queries.ScriptTest{
             },
             {
-                Query:       "drop temporary table t;",
+                Query:       "drop temporary table t;",
                 ExpectedErr: sql.ErrUnknownTable,
             },
         },
@@ -18,6 +18,7 @@ import (
     "context"
     "errors"
     "fmt"
+    "sort"
     "strings"
     "sync"

@@ -509,9 +510,20 @@ func getReplicationRefs(ctx *sql.Context, rrd ReadReplicaDatabase) (
 func refsToDelete(remRefs, localRefs []doltdb.RefWithHash) []doltdb.RefWithHash {
     toDelete := make([]doltdb.RefWithHash, 0, len(localRefs))
     var i, j int
+
+    // Before we map remote refs to local refs to determine which refs to delete, we need to sort them
+    // by Ref.String() – this ensures a unique identifier that does not conflict with other refs, unlike
+    // Ref.GetPath(), which can conflict if a branch or tag has the same name.
+    sort.Slice(remRefs, func(i, j int) bool {
+        return remRefs[i].Ref.String() < remRefs[j].Ref.String()
+    })
+    sort.Slice(localRefs, func(i, j int) bool {
+        return localRefs[i].Ref.String() < localRefs[j].Ref.String()
+    })
+
     for i < len(remRefs) && j < len(localRefs) {
-        rem := remRefs[i].Ref.GetPath()
-        local := localRefs[j].Ref.GetPath()
+        rem := remRefs[i].Ref.String()
+        local := localRefs[j].Ref.String()
         if rem == local {
             i++
             j++
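Sorting and comparing by Ref.String() rather than Ref.GetPath() matters because branches, tags, and remotes share the bare-name space. A minimal sketch of the collision, assuming the ref package's NewBranchRef and NewTagRef constructors and the usual refs/heads and refs/tags prefixes:

    // A branch and a tag both named "a1" collide under GetPath() but
    // stay distinct under String(), which includes the ref namespace.
    b := ref.NewBranchRef("a1") // String() == "refs/heads/a1"
    tg := ref.NewTagRef("a1")   // String() == "refs/tags/a1"
    fmt.Println(b.GetPath() == tg.GetPath()) // true: ambiguous key
    fmt.Println(b.String() == tg.String())   // false: unique key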
@@ -81,12 +81,17 @@ func TestReplicationBranches(t *testing.T) {
             local:       []string{"feature4", "feature5", "feature6", "feature7", "feature8", "feature9"},
             expToDelete: []string{"feature4", "feature5", "feature6", "feature7", "feature8", "feature9"},
         },
+        {
+            remote:      []string{"main", "new1", "a1"},
+            local:       []string{"main", "a1"},
+            expToDelete: []string{},
+        },
     }

     for _, tt := range tests {
         remoteRefs := make([]doltdb.RefWithHash, len(tt.remote))
         for i := range tt.remote {
-            remoteRefs[i] = doltdb.RefWithHash{Ref: ref.NewRemoteRef("", tt.remote[i])}
+            remoteRefs[i] = doltdb.RefWithHash{Ref: ref.NewBranchRef(tt.remote[i])}
         }
         localRefs := make([]doltdb.RefWithHash, len(tt.local))
         for i := range tt.local {
@@ -97,6 +102,6 @@ func TestReplicationBranches(t *testing.T) {
         for i := range diff {
             diffNames[i] = diff[i].Ref.GetPath()
         }
-        assert.Equal(t, diffNames, tt.expToDelete)
+        assert.Equal(t, tt.expToDelete, diffNames)
     }
 }
@@ -103,9 +103,13 @@ func ExecuteSql(ctx context.Context, dEnv *env.DoltEnv, root doltdb.RootValue, s
         }
     }

-    err = dsess.DSessFromSess(sqlCtx.Session).CommitTransaction(sqlCtx, sqlCtx.GetTransaction())
-    if err != nil {
-        return nil, err
+    // commit leftover transaction
+    trx := sqlCtx.GetTransaction()
+    if trx != nil {
+        err = dsess.DSessFromSess(sqlCtx.Session).CommitTransaction(sqlCtx, trx)
+        if err != nil {
+            return nil, err
+        }
     }

     return db.GetRoot(sqlCtx)
@@ -172,7 +172,9 @@ type MarkAndSweeper interface {
     // chunks is accessed and copied.
     SaveHashes(context.Context, []hash.Hash) error

-    Close(context.Context) (GCFinalizer, error)
+    Finalize(context.Context) (GCFinalizer, error)
+
+    Close(context.Context) error
 }

 // A GCFinalizer is returned from a MarkAndSweeper after it is closed.
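The split separates producing the GC result from releasing the sweeper's resources. A minimal sketch of the new call order, mirroring the TestNBSCopyGC change later in this diff (error handling elided; names taken from this diff):

    // Mark: feed reachable hashes to the sweeper, possibly in batches.
    _ = sweeper.SaveHashes(ctx, keepers)
    // Finalize: produce the GCFinalizer describing the swept store.
    finalizer, _ := sweeper.Finalize(ctx)
    // Close: release sweeper resources; if Finalize was never reached,
    // this cancels the in-progress copy instead.
    _ = sweeper.Close(ctx)
    // Commit the collected chunks into the store.
    _ = finalizer.SwapChunksInStore(ctx)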
@@ -398,10 +398,14 @@ func (i *msvMarkAndSweeper) SaveHashes(ctx context.Context, hashes []hash.Hash)
     return nil
 }

-func (i *msvMarkAndSweeper) Close(context.Context) (GCFinalizer, error) {
+func (i *msvMarkAndSweeper) Finalize(context.Context) (GCFinalizer, error) {
     return msvGcFinalizer{i.ms, i.keepers}, nil
 }

+func (i *msvMarkAndSweeper) Close(context.Context) error {
+    return nil
+}
+
 func (ms *MemoryStoreView) MarkAndSweepChunks(ctx context.Context, getAddrs GetAddrsCurry, filter HasManyFunc, dest ChunkStore, mode GCMode) (MarkAndSweeper, error) {
     if dest != ms {
         panic("unsupported")
@@ -198,7 +198,7 @@ type GarbageCollector interface {

     // GC traverses the database starting at the Root and removes
     // all unreferenced data from persistent storage.
-    GC(ctx context.Context, mode types.GCMode, oldGenRefs, newGenRefs hash.HashSet, safepointF func() error) error
+    GC(ctx context.Context, mode types.GCMode, oldGenRefs, newGenRefs hash.HashSet, safepointController types.GCSafepointController) error
 }

 // CanUsePuller returns true if a datas.Puller can be used to pull data from one Database into another. Not all
@@ -1168,8 +1168,8 @@ func (db *database) doDelete(ctx context.Context, datasetIDstr string, workingse
 }

 // GC traverses the database starting at the Root and removes all unreferenced data from persistent storage.
-func (db *database) GC(ctx context.Context, mode types.GCMode, oldGenRefs, newGenRefs hash.HashSet, safepointF func() error) error {
-    return db.ValueStore.GC(ctx, mode, oldGenRefs, newGenRefs, safepointF)
+func (db *database) GC(ctx context.Context, mode types.GCMode, oldGenRefs, newGenRefs hash.HashSet, safepointController types.GCSafepointController) error {
+    return db.ValueStore.GC(ctx, mode, oldGenRefs, newGenRefs, safepointController)
 }

 func (db *database) tryCommitChunks(ctx context.Context, newRootHash hash.Hash, currentRootHash hash.Hash) error {
@@ -173,6 +173,21 @@ func (tw *CmpChunkTableWriter) Remove() error {
     return os.Remove(tw.path)
 }

+// Cancel the in-progress write and attempt to clean up any
+// resources associated with it. It is an error to call
+// Flush{,ToFile} or Reader after canceling the writer.
+func (tw *CmpChunkTableWriter) Cancel() error {
+    closer, err := tw.sink.Reader()
+    if err != nil {
+        return err
+    }
+    err = closer.Close()
+    if err != nil {
+        return err
+    }
+    return tw.Remove()
+}
+
 func containsDuplicates(prefixes prefixIndexSlice) bool {
     if len(prefixes) == 0 {
         return false
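A brief usage sketch of the intended pairing (names from this diff; error handling elided; the abandoned flag is illustrative):

    tw, _ := NewCmpChunkTableWriter("")
    _ = tw.AddCmpChunk(c)
    if abandoned {
        // A writer that will never be flushed should be canceled so its
        // backing file is removed; Flush{,ToFile} and Reader must not
        // be called after this point.
        _ = tw.Cancel()
    }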
@@ -45,21 +45,28 @@ func (ea gcErrAccum) Error() string {

 type gcCopier struct {
     writer *CmpChunkTableWriter
+    tfp    tableFilePersister
 }

-func newGarbageCollectionCopier() (*gcCopier, error) {
+func newGarbageCollectionCopier(tfp tableFilePersister) (*gcCopier, error) {
     writer, err := NewCmpChunkTableWriter("")
     if err != nil {
         return nil, err
     }
-    return &gcCopier{writer}, nil
+    return &gcCopier{writer, tfp}, nil
 }

 func (gcc *gcCopier) addChunk(ctx context.Context, c CompressedChunk) error {
     return gcc.writer.AddCmpChunk(c)
 }

-func (gcc *gcCopier) copyTablesToDir(ctx context.Context, tfp tableFilePersister) (ts []tableSpec, err error) {
+// If the writer should be closed and deleted, instead of being used with
+// copyTablesToDir, call this method.
+func (gcc *gcCopier) cancel(_ context.Context) error {
+    return gcc.writer.Cancel()
+}
+
+func (gcc *gcCopier) copyTablesToDir(ctx context.Context) (ts []tableSpec, err error) {
     var filename string
     filename, err = gcc.writer.Finish()
     if err != nil {
@@ -79,7 +86,7 @@ func (gcc *gcCopier) copyTablesToDir(ctx context.Context, tfp tableFilePersister
         return nil, fmt.Errorf("invalid filename: %s", filename)
     }

-    exists, err := tfp.Exists(ctx, addr, uint32(gcc.writer.ChunkCount()), nil)
+    exists, err := gcc.tfp.Exists(ctx, addr, uint32(gcc.writer.ChunkCount()), nil)
     if err != nil {
         return nil, err
     }
@@ -94,7 +101,7 @@ func (gcc *gcCopier) copyTablesToDir(ctx context.Context, tfp tableFilePersister
     }

     // Attempt to rename the file to the destination if we are working with a fsTablePersister...
-    if mover, ok := tfp.(movingTableFilePersister); ok {
+    if mover, ok := gcc.tfp.(movingTableFilePersister); ok {
         err = mover.TryMoveCmpChunkTableWriter(ctx, filename, gcc.writer)
         if err == nil {
             return []tableSpec{
@@ -114,7 +121,7 @@ func (gcc *gcCopier) copyTablesToDir(ctx context.Context, tfp tableFilePersister
     defer r.Close()
     sz := gcc.writer.ContentLength()

-    err = tfp.CopyTableFile(ctx, r, filename, sz, uint32(gcc.writer.ChunkCount()))
+    err = gcc.tfp.CopyTableFile(ctx, r, filename, sz, uint32(gcc.writer.ChunkCount()))
     if err != nil {
         return nil, err
     }
@@ -1652,7 +1652,7 @@ func markAndSweepChunks(ctx context.Context, nbs *NomsBlockStore, src NBSCompres
         return nil, fmt.Errorf("NBS does not support copying garbage collection")
     }

-    gcc, err := newGarbageCollectionCopier()
+    gcc, err := newGarbageCollectionCopier(tfp)
     if err != nil {
         return nil, err
     }
@@ -1758,11 +1758,12 @@ func (i *markAndSweeper) SaveHashes(ctx context.Context, hashes []hash.Hash) err
     return nil
 }

-func (i *markAndSweeper) Close(ctx context.Context) (chunks.GCFinalizer, error) {
-    specs, err := i.gcc.copyTablesToDir(ctx, i.tfp)
+func (i *markAndSweeper) Finalize(ctx context.Context) (chunks.GCFinalizer, error) {
+    specs, err := i.gcc.copyTablesToDir(ctx)
     if err != nil {
         return nil, err
     }
+    i.gcc = nil

     return gcFinalizer{
         nbs: i.dest,
@@ -1771,6 +1772,13 @@ func (i *markAndSweeper) Close(ctx context.Context) (chunks.GCFinalizer, error)
     }, nil
 }

+func (i *markAndSweeper) Close(ctx context.Context) error {
+    if i.gcc != nil {
+        return i.gcc.cancel(ctx)
+    }
+    return nil
+}
+
 type gcFinalizer struct {
     nbs   *NomsBlockStore
     specs []tableSpec
@@ -345,8 +345,9 @@ func TestNBSCopyGC(t *testing.T) {
         keepersSlice = append(keepersSlice, h)
     }
     require.NoError(t, sweeper.SaveHashes(ctx, keepersSlice))
-    finalizer, err := sweeper.Close(ctx)
+    finalizer, err := sweeper.Finalize(ctx)
     require.NoError(t, err)
+    require.NoError(t, sweeper.Close(ctx))
     require.NoError(t, finalizer.SwapChunksInStore(ctx))
     st.EndGC()
@@ -565,8 +565,15 @@ const (
     GCModeFull
 )

+type GCSafepointController interface {
+    BeginGC(ctx context.Context, keeper func(h hash.Hash) bool) error
+    EstablishPreFinalizeSafepoint(context.Context) error
+    EstablishPostFinalizeSafepoint(context.Context) error
+    CancelSafepoint()
+}
+
 // GC traverses the ValueStore from the root and removes unreferenced chunks from the ChunkStore
-func (lvs *ValueStore) GC(ctx context.Context, mode GCMode, oldGenRefs, newGenRefs hash.HashSet, safepointF func() error) error {
+func (lvs *ValueStore) GC(ctx context.Context, mode GCMode, oldGenRefs, newGenRefs hash.HashSet, safepoint GCSafepointController) error {
     lvs.versOnce.Do(lvs.expectVersion)

     lvs.transitionToOldGenGC()
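Read together with the lvs.gc changes below, the controller's hooks fire in this order (a summary of this diff, not new behavior):

    // 1. BeginGC(ctx, lvs.gcAddChunk)           before marking starts
    // 2. EstablishPreFinalizeSafepoint(ctx)     before the final mark pass
    // 3. EstablishPostFinalizeSafepoint(ctx)    after the final mark pass,
    //                                           before chunks are swapped in
    // 4. CancelSafepoint()                      only if GC fails after BeginGC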
@@ -600,6 +607,20 @@ func (lvs *ValueStore) GC(ctx context.Context, mode GCMode, oldGenRefs, newGenRe
     }
     defer collector.EndGC()

+    var callCancelSafepoint bool
+    if safepoint != nil {
+        err = safepoint.BeginGC(ctx, lvs.gcAddChunk)
+        if err != nil {
+            return err
+        }
+        callCancelSafepoint = true
+        defer func() {
+            if callCancelSafepoint {
+                safepoint.CancelSafepoint()
+            }
+        }()
+    }
+
     root, err := lvs.Root(ctx)
     if err != nil {
         return err
@@ -634,10 +655,11 @@ func (lvs *ValueStore) GC(ctx context.Context, mode GCMode, oldGenRefs, newGenRe
         oldGenHasMany = newFileHasMany
     }

-    newGenFinalizer, err = lvs.gc(ctx, newGenRefs, oldGenHasMany, chksMode, collector, newGen, safepointF, lvs.transitionToFinalizingGC)
+    newGenFinalizer, err = lvs.gc(ctx, newGenRefs, oldGenHasMany, chksMode, collector, newGen, safepoint, lvs.transitionToFinalizingGC)
     if err != nil {
         return err
     }
+    callCancelSafepoint = false

     err = newGenFinalizer.SwapChunksInStore(ctx)
     if err != nil {
@@ -669,6 +691,20 @@ func (lvs *ValueStore) GC(ctx context.Context, mode GCMode, oldGenRefs, newGenRe
     }
     defer collector.EndGC()

+    var callCancelSafepoint bool
+    if safepoint != nil {
+        err = safepoint.BeginGC(ctx, lvs.gcAddChunk)
+        if err != nil {
+            return err
+        }
+        callCancelSafepoint = true
+        defer func() {
+            if callCancelSafepoint {
+                safepoint.CancelSafepoint()
+            }
+        }()
+    }
+
     root, err := lvs.Root(ctx)
     if err != nil {
         return err
@@ -682,10 +718,11 @@ func (lvs *ValueStore) GC(ctx context.Context, mode GCMode, oldGenRefs, newGenRe
     newGenRefs.Insert(root)

     var finalizer chunks.GCFinalizer
-    finalizer, err = lvs.gc(ctx, newGenRefs, unfilteredHashFunc, chunks.GCMode_Full, collector, collector, safepointF, lvs.transitionToFinalizingGC)
+    finalizer, err = lvs.gc(ctx, newGenRefs, unfilteredHashFunc, chunks.GCMode_Full, collector, collector, safepoint, lvs.transitionToFinalizingGC)
     if err != nil {
         return err
     }
+    callCancelSafepoint = false

     err = finalizer.SwapChunksInStore(ctx)
     if err != nil {
@@ -718,7 +755,7 @@ func (lvs *ValueStore) gc(ctx context.Context,
     hashFilter chunks.HasManyFunc,
     chksMode chunks.GCMode,
     src, dest chunks.ChunkStoreGarbageCollector,
-    safepointF func() error,
+    safepointController GCSafepointController,
     finalize func() hash.HashSet) (chunks.GCFinalizer, error) {
     sweeper, err := src.MarkAndSweepChunks(ctx, lvs.getAddrs, hashFilter, dest, chksMode)
     if err != nil {
@@ -727,18 +764,26 @@ func (lvs *ValueStore) gc(ctx context.Context,

     err = sweeper.SaveHashes(ctx, toVisit.ToSlice())
     if err != nil {
-        _, cErr := sweeper.Close(ctx)
+        cErr := sweeper.Close(ctx)
         return nil, errors.Join(err, cErr)
     }
     toVisit = nil

+    if safepointController != nil {
+        err = safepointController.EstablishPreFinalizeSafepoint(ctx)
+        if err != nil {
+            cErr := sweeper.Close(ctx)
+            return nil, errors.Join(err, cErr)
+        }
+    }
+
     // Before we call finalize(), we can process the current set of
     // NewGenToVisit. NewGen -> Finalize is going to block writes until
     // we are done, so it's best to keep it as small as possible.
     next := lvs.readAndResetNewGenToVisit()
     err = sweeper.SaveHashes(ctx, next.ToSlice())
     if err != nil {
-        _, cErr := sweeper.Close(ctx)
+        cErr := sweeper.Close(ctx)
         return nil, errors.Join(err, cErr)
     }
     next = nil
@@ -746,18 +791,22 @@ func (lvs *ValueStore) gc(ctx context.Context,
     final := finalize()
     err = sweeper.SaveHashes(ctx, final.ToSlice())
     if err != nil {
-        _, cErr := sweeper.Close(ctx)
+        cErr := sweeper.Close(ctx)
         return nil, errors.Join(err, cErr)
     }

-    if safepointF != nil {
-        err = safepointF()
+    if safepointController != nil {
+        err = safepointController.EstablishPostFinalizeSafepoint(ctx)
         if err != nil {
-            _, cErr := sweeper.Close(ctx)
+            cErr := sweeper.Close(ctx)
             return nil, errors.Join(err, cErr)
         }
     }
-    return sweeper.Close(ctx)
+    finalizer, err := sweeper.Finalize(ctx)
+    if err != nil {
+        return nil, err
+    }
+    return finalizer, sweeper.Close(ctx)
 }

 // Close closes the underlying ChunkStore
@@ -201,6 +201,48 @@ teardown() {
     [[ "$output" =~ "b1" ]] || false
 }

+# When a replica pulls refs, the remote refs are compared with the local refs to identify which local refs
+# need to be deleted. Branches, tags, and remotes all share the ref space, and previous versions of Dolt could
+# incorrectly map remote refs to local refs, resulting in local refs being incorrectly removed until future
+# runs of replica synchronization.
+@test "replication: local tag refs are not deleted" {
+    # Configure repo1 to push changes on commit and create tag a1
+    cd repo1
+    dolt config --local --add sqlserver.global.dolt_replicate_to_remote remote1
+    dolt sql -q "call dolt_tag('a1');"
+
+    # Configure repo2 to pull changes on read
+    cd ..
+    dolt clone file://./rem1 repo2
+    cd repo2
+    dolt config --local --add sqlserver.global.dolt_read_replica_remote origin
+    dolt config --local --add sqlserver.global.dolt_replicate_all_heads 1
+    run dolt sql -q "select tag_name from dolt_tags;"
+    [ "$status" -eq 0 ]
+    [[ "$output" =~ "| tag_name |" ]] || false
+    [[ "$output" =~ "| a1 |" ]] || false
+
+    # Create branch new1 in repo1 – "new1" sorts after "main", but before "a1", and previous
+    # versions of Dolt had problems computing which local refs to delete in this case.
+    cd ../repo1
+    dolt sql -q "call dolt_branch('new1');"
+
+    # Confirm that tag a1 has not been deleted. Note that we need to check for this immediately after
+    # creating branch new1 (i.e. before looking at branches), because the bug in the previous versions
+    # of Dolt would only manifest in the next command, and would be fixed by later remote pulls.
+    cd ../repo2
+    run dolt sql -q "select tag_name from dolt_tags;"
+    [ "$status" -eq 0 ]
+    [[ "$output" =~ "| tag_name |" ]] || false
+    [[ "$output" =~ "| a1 |" ]] || false
+
+    # Try again to make sure the results are stable
+    run dolt sql -q "select tag_name from dolt_tags;"
+    [ "$status" -eq 0 ]
+    [[ "$output" =~ "| tag_name |" ]] || false
+    [[ "$output" =~ "| a1 |" ]] || false
+}
+
 @test "replication: pull branch delete current branch" {
     skip "broken by latest transaction changes"

@@ -627,7 +669,6 @@ SQL
 }

 @test "replication: pull all heads pulls tags" {
-
     dolt clone file://./rem1 repo2
     cd repo2
     dolt checkout -b new_feature