mirror of
https://github.com/dolthub/dolt.git
synced 2026-05-23 02:40:49 -05:00
Clean up and refactor for better naming
This commit is contained in:
@@ -271,18 +271,9 @@ func ConfigureServices(
|
||||
InitAutoGCController := &svcs.AnonService{
|
||||
InitF: func(context.Context) error {
|
||||
if cfg.ServerConfig.AutoGCBehavior() != nil && cfg.ServerConfig.AutoGCBehavior().Enable() {
|
||||
|
||||
// NM4 - there has got to be a better way.
|
||||
var cmp chunks.GCCompression
|
||||
switch cfg.ServerConfig.AutoGCBehavior().ArchiveLevel() {
|
||||
case 0:
|
||||
cmp = chunks.OldSkhool
|
||||
case 1:
|
||||
cmp = chunks.NewSkhool
|
||||
case 2:
|
||||
cmp = chunks.FutureSkhool
|
||||
default:
|
||||
panic("invalid archive level")
|
||||
cmp := chunks.GCArchiveLevel(cfg.ServerConfig.AutoGCBehavior().ArchiveLevel())
|
||||
if cmp < chunks.NoArchive || cmp > chunks.MaxArchiveLevel {
|
||||
return fmt.Errorf("invalid value for %s: %d", cli.ArchiveLevelParam, cmp)
|
||||
}
|
||||
|
||||
config.AutoGCController = sqle.NewAutoGCController(cmp, lgr)
|
||||
|
||||
@@ -1775,7 +1775,7 @@ func (ddb *DoltDB) Rebase(ctx context.Context) error {
|
||||
// until no possibly-stale ChunkStore state is retained in memory, or failing
|
||||
// certain in-progress operations which cannot be finalized in a timely manner,
|
||||
// etc.
|
||||
func (ddb *DoltDB) GC(ctx context.Context, mode types.GCMode, cmp chunks.GCCompression, safepointController types.GCSafepointController) error {
|
||||
func (ddb *DoltDB) GC(ctx context.Context, mode types.GCMode, cmp chunks.GCArchiveLevel, safepointController types.GCSafepointController) error {
|
||||
collector, ok := ddb.db.Database.(datas.GarbageCollector)
|
||||
if !ok {
|
||||
return fmt.Errorf("this database does not support garbage collection")
|
||||
|
||||
@@ -141,7 +141,7 @@ func testGarbageCollection(t *testing.T, test gcTest) {
|
||||
}
|
||||
|
||||
ddb := dEnv.DoltDB(ctx)
|
||||
err := ddb.GC(ctx, types.GCModeDefault, chunks.OldSkhool, purgingSafepointController{ddb})
|
||||
err := ddb.GC(ctx, types.GCModeDefault, chunks.NoArchive, purgingSafepointController{ddb})
|
||||
require.NoError(t, err)
|
||||
test.postGCFunc(ctx, t, dEnv.DoltDB(ctx), res)
|
||||
|
||||
@@ -210,7 +210,7 @@ func testGarbageCollectionHasCacheDataCorruptionBugFix(t *testing.T) {
|
||||
_, err = ns.Write(ctx, c1.Node())
|
||||
require.NoError(t, err)
|
||||
|
||||
err = ddb.GC(ctx, types.GCModeDefault, chunks.OldSkhool, purgingSafepointController{ddb})
|
||||
err = ddb.GC(ctx, types.GCModeDefault, chunks.NoArchive, purgingSafepointController{ddb})
|
||||
require.NoError(t, err)
|
||||
|
||||
c2 := newIntMap(t, ctx, ns, 2, 2)
|
||||
|
||||
@@ -53,16 +53,15 @@ type AutoGCController struct {
|
||||
ctxF func(context.Context) (*sql.Context, error)
|
||||
threads *sql.BackgroundThreads
|
||||
|
||||
// NM4 - does config get stuffed in here???
|
||||
cmpLevel chunks.GCCompression
|
||||
arcLevel chunks.GCArchiveLevel
|
||||
}
|
||||
|
||||
func NewAutoGCController(cmpLevel chunks.GCCompression, lgr *logrus.Logger) *AutoGCController {
|
||||
func NewAutoGCController(arcLevel chunks.GCArchiveLevel, lgr *logrus.Logger) *AutoGCController {
|
||||
return &AutoGCController{
|
||||
workCh: make(chan autoGCWork),
|
||||
lgr: lgr,
|
||||
hooks: make(map[string]*autoGCCommitHook),
|
||||
cmpLevel: cmpLevel,
|
||||
arcLevel: arcLevel,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -155,7 +154,7 @@ func (c *AutoGCController) doWork(ctx context.Context, work autoGCWork, ctxF fun
|
||||
defer sql.SessionEnd(sqlCtx.Session)
|
||||
sql.SessionCommandBegin(sqlCtx.Session)
|
||||
defer sql.SessionCommandEnd(sqlCtx.Session)
|
||||
err = dprocedures.RunDoltGC(sqlCtx, work.db, types.GCModeDefault, c.cmpLevel, work.name)
|
||||
err = dprocedures.RunDoltGC(sqlCtx, work.db, types.GCModeDefault, c.arcLevel, work.name)
|
||||
if err != nil {
|
||||
if !errors.Is(err, chunks.ErrNothingToCollect) {
|
||||
c.lgr.Warnf("sqle/auto_gc: Attempt to auto GC database %s failed with error: %v", work.name, err)
|
||||
|
||||
@@ -40,12 +40,12 @@ func TestAutoGCController(t *testing.T) {
|
||||
}
|
||||
t.Run("Hook", func(t *testing.T) {
|
||||
t.Run("NeverStarted", func(t *testing.T) {
|
||||
controller := NewAutoGCController(chunks.OldSkhool, NewLogger())
|
||||
controller := NewAutoGCController(chunks.NoArchive, NewLogger())
|
||||
hook := controller.newCommitHook("some_database", nil)
|
||||
hook.stop()
|
||||
})
|
||||
t.Run("StartedBeforeNewHook", func(t *testing.T) {
|
||||
controller := NewAutoGCController(chunks.OldSkhool, NewLogger())
|
||||
controller := NewAutoGCController(chunks.NoArchive, NewLogger())
|
||||
bg := sql.NewBackgroundThreads()
|
||||
defer bg.Shutdown()
|
||||
err := controller.RunBackgroundThread(bg, CtxFactory)
|
||||
@@ -57,7 +57,7 @@ func TestAutoGCController(t *testing.T) {
|
||||
hook.stop()
|
||||
})
|
||||
t.Run("StartedAfterNewHook", func(t *testing.T) {
|
||||
controller := NewAutoGCController(chunks.OldSkhool, NewLogger())
|
||||
controller := NewAutoGCController(chunks.NoArchive, NewLogger())
|
||||
bg := sql.NewBackgroundThreads()
|
||||
defer bg.Shutdown()
|
||||
ctx := context.Background()
|
||||
@@ -69,7 +69,7 @@ func TestAutoGCController(t *testing.T) {
|
||||
hook.stop()
|
||||
})
|
||||
t.Run("ExecuteOnCanceledCtx", func(t *testing.T) {
|
||||
controller := NewAutoGCController(chunks.OldSkhool, NewLogger())
|
||||
controller := NewAutoGCController(chunks.NoArchive, NewLogger())
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
cancel()
|
||||
dEnv := CreateTestEnvWithName("some_database")
|
||||
@@ -79,7 +79,7 @@ func TestAutoGCController(t *testing.T) {
|
||||
})
|
||||
})
|
||||
t.Run("gcBgThread", func(t *testing.T) {
|
||||
controller := NewAutoGCController(chunks.OldSkhool, NewLogger())
|
||||
controller := NewAutoGCController(chunks.NoArchive, NewLogger())
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(1)
|
||||
@@ -93,7 +93,7 @@ func TestAutoGCController(t *testing.T) {
|
||||
})
|
||||
t.Run("DatabaseProviderHooks", func(t *testing.T) {
|
||||
t.Run("Unstarted", func(t *testing.T) {
|
||||
controller := NewAutoGCController(chunks.OldSkhool, NewLogger())
|
||||
controller := NewAutoGCController(chunks.NoArchive, NewLogger())
|
||||
ctx, err := CtxFactory(context.Background())
|
||||
require.NoError(t, err)
|
||||
dEnv := CreateTestEnvWithName("some_database")
|
||||
@@ -102,7 +102,7 @@ func TestAutoGCController(t *testing.T) {
|
||||
controller.DropDatabaseHook()(nil, "some_database")
|
||||
})
|
||||
t.Run("Started", func(t *testing.T) {
|
||||
controller := NewAutoGCController(chunks.OldSkhool, NewLogger())
|
||||
controller := NewAutoGCController(chunks.NoArchive, NewLogger())
|
||||
bg := sql.NewBackgroundThreads()
|
||||
defer bg.Shutdown()
|
||||
err := controller.RunBackgroundThread(bg, CtxFactory)
|
||||
|
||||
@@ -241,16 +241,16 @@ func doDoltGC(ctx *sql.Context, args []string) (int, error) {
|
||||
mode = types.GCModeFull
|
||||
}
|
||||
|
||||
cmpLvl := chunks.OldSkhool
|
||||
cmpLvl := chunks.NoArchive
|
||||
if apr.Contains(cli.ArchiveLevelParam) {
|
||||
lvl, ok := apr.GetInt(cli.ArchiveLevelParam)
|
||||
if !ok {
|
||||
return cmdFailure, fmt.Errorf("parse error for value for %s: %s", cli.ArchiveLevelParam, apr.GetValues()[cli.ArchiveLevelParam])
|
||||
}
|
||||
if lvl < int(chunks.OldSkhool) || lvl > int(chunks.FutureSkhool) {
|
||||
if lvl < int(chunks.NoArchive) || lvl > int(chunks.MaxArchiveLevel) {
|
||||
return cmdFailure, fmt.Errorf("invalid value for %s: %d", cli.ArchiveLevelParam, lvl)
|
||||
}
|
||||
cmpLvl = chunks.GCCompression(lvl)
|
||||
cmpLvl = chunks.GCArchiveLevel(lvl)
|
||||
}
|
||||
|
||||
err := RunDoltGC(ctx, ddb, mode, cmpLvl, ctx.GetCurrentDatabase())
|
||||
@@ -262,7 +262,7 @@ func doDoltGC(ctx *sql.Context, args []string) (int, error) {
|
||||
return cmdSuccess, nil
|
||||
}
|
||||
|
||||
func RunDoltGC(ctx *sql.Context, ddb *doltdb.DoltDB, mode types.GCMode, cmp chunks.GCCompression, dbname string) error {
|
||||
func RunDoltGC(ctx *sql.Context, ddb *doltdb.DoltDB, mode types.GCMode, cmp chunks.GCArchiveLevel, dbname string) error {
|
||||
var sc types.GCSafepointController
|
||||
if UseSessionAwareSafepointController {
|
||||
dSess := dsess.DSessFromSess(ctx.Session)
|
||||
|
||||
@@ -208,13 +208,16 @@ const (
|
||||
GCMode_Full
|
||||
)
|
||||
|
||||
type GCCompression int
|
||||
type GCArchiveLevel int
|
||||
|
||||
// NM4 - not sure why the types package is where we define stuff like this...
|
||||
const (
|
||||
OldSkhool GCCompression = iota
|
||||
NewSkhool
|
||||
FutureSkhool
|
||||
// NoArchive means that the GC process will write chunks into the classic table format with Snappy compression.
|
||||
NoArchive GCArchiveLevel = iota
|
||||
// SimpleArchive means that the GC process will write chunks into archives, using a single dictionary for all chunks.
|
||||
SimpleArchive
|
||||
// GroupedArchive means that the GC process will write chunks into archives, using chunk group dictionaries.
|
||||
GroupedArchive
|
||||
MaxArchiveLevel = SimpleArchive // Currently GroupedArchives are not supported in GC.
|
||||
)
|
||||
|
||||
// ChunkStoreGarbageCollector is a ChunkStore that supports garbage collection.
|
||||
@@ -245,7 +248,7 @@ type ChunkStoreGarbageCollector interface {
|
||||
// filtered through the |filter| and their references are walked with
|
||||
// |getAddrs|, each of those addresses being filtered and copied as
|
||||
// well.
|
||||
MarkAndSweepChunks(ctx context.Context, getAddrs GetAddrsCurry, filter HasManyFunc, dest ChunkStore, mode GCMode, cmp GCCompression) (MarkAndSweeper, error)
|
||||
MarkAndSweepChunks(ctx context.Context, getAddrs GetAddrsCurry, filter HasManyFunc, dest ChunkStore, mode GCMode, cmp GCArchiveLevel) (MarkAndSweeper, error)
|
||||
|
||||
// Count returns the number of chunks in the store.
|
||||
Count() (uint32, error)
|
||||
|
||||
@@ -406,7 +406,7 @@ func (i *msvMarkAndSweeper) Close(context.Context) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (ms *MemoryStoreView) MarkAndSweepChunks(ctx context.Context, getAddrs GetAddrsCurry, filter HasManyFunc, dest ChunkStore, _ GCMode, _ GCCompression) (MarkAndSweeper, error) {
|
||||
func (ms *MemoryStoreView) MarkAndSweepChunks(ctx context.Context, getAddrs GetAddrsCurry, filter HasManyFunc, dest ChunkStore, _ GCMode, _ GCArchiveLevel) (MarkAndSweeper, error) {
|
||||
if dest != ms {
|
||||
panic("unsupported")
|
||||
}
|
||||
|
||||
@@ -91,7 +91,7 @@ func (s *TestStoreView) EndGC(mode GCMode) {
|
||||
collector.EndGC(mode)
|
||||
}
|
||||
|
||||
func (s *TestStoreView) MarkAndSweepChunks(ctx context.Context, getAddrs GetAddrsCurry, filter HasManyFunc, dest ChunkStore, mode GCMode, cmp GCCompression) (MarkAndSweeper, error) {
|
||||
func (s *TestStoreView) MarkAndSweepChunks(ctx context.Context, getAddrs GetAddrsCurry, filter HasManyFunc, dest ChunkStore, mode GCMode, cmp GCArchiveLevel) (MarkAndSweeper, error) {
|
||||
collector, ok := s.ChunkStore.(ChunkStoreGarbageCollector)
|
||||
if !ok || dest != s {
|
||||
return nil, ErrUnsupportedOperation
|
||||
|
||||
@@ -198,7 +198,7 @@ type GarbageCollector interface {
|
||||
|
||||
// GC traverses the database starting at the Root and removes
|
||||
// all unreferenced data from persistent storage.
|
||||
GC(ctx context.Context, mode types.GCMode, cmp chunks.GCCompression, oldGenRefs, newGenRefs hash.HashSet, safepointController types.GCSafepointController) error
|
||||
GC(ctx context.Context, mode types.GCMode, cmp chunks.GCArchiveLevel, oldGenRefs, newGenRefs hash.HashSet, safepointController types.GCSafepointController) error
|
||||
}
|
||||
|
||||
// CanUsePuller returns true if a datas.Puller can be used to pull data from one Database into another. Not all
|
||||
|
||||
@@ -1168,7 +1168,7 @@ func (db *database) doDelete(ctx context.Context, datasetIDstr string, workingse
|
||||
}
|
||||
|
||||
// GC traverses the database starting at the Root and removes all unreferenced data from persistent storage.
|
||||
func (db *database) GC(ctx context.Context, mode types.GCMode, cmp chunks.GCCompression, oldGenRefs, newGenRefs hash.HashSet, safepointController types.GCSafepointController) error {
|
||||
func (db *database) GC(ctx context.Context, mode types.GCMode, cmp chunks.GCArchiveLevel, oldGenRefs, newGenRefs hash.HashSet, safepointController types.GCSafepointController) error {
|
||||
return db.ValueStore.GC(ctx, mode, cmp, oldGenRefs, newGenRefs, safepointController)
|
||||
}
|
||||
|
||||
|
||||
@@ -49,13 +49,16 @@ type gcCopier struct {
|
||||
tfp tableFilePersister
|
||||
}
|
||||
|
||||
func newGarbageCollectionCopier(cmp chunks.GCCompression, tfp tableFilePersister) (*gcCopier, error) {
|
||||
func newGarbageCollectionCopier(cmp chunks.GCArchiveLevel, tfp tableFilePersister) (*gcCopier, error) {
|
||||
var writer GenericTableWriter
|
||||
var err error
|
||||
if cmp == chunks.NewSkhool { // NM4 - FutureSkhool too??? May need to group after?? Not sure how this is gonna work.
|
||||
switch cmp {
|
||||
case chunks.SimpleArchive:
|
||||
writer, err = NewArchiveStreamWriter("")
|
||||
} else {
|
||||
case chunks.NoArchive:
|
||||
writer, err = NewCmpChunkTableWriter("")
|
||||
default:
|
||||
return nil, fmt.Errorf("invalid archive level: %s", cmp)
|
||||
}
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
||||
@@ -530,7 +530,7 @@ func (gcs *GenerationalNBS) EndGC(mode chunks.GCMode) {
|
||||
gcs.newGen.EndGC(mode)
|
||||
}
|
||||
|
||||
func (gcs *GenerationalNBS) MarkAndSweepChunks(ctx context.Context, getAddrs chunks.GetAddrsCurry, filter chunks.HasManyFunc, dest chunks.ChunkStore, mode chunks.GCMode, cmp chunks.GCCompression) (chunks.MarkAndSweeper, error) {
|
||||
func (gcs *GenerationalNBS) MarkAndSweepChunks(ctx context.Context, getAddrs chunks.GetAddrsCurry, filter chunks.HasManyFunc, dest chunks.ChunkStore, mode chunks.GCMode, cmp chunks.GCArchiveLevel) (chunks.MarkAndSweeper, error) {
|
||||
return markAndSweepChunks(ctx, gcs.newGen, gcs, dest, getAddrs, filter, mode, cmp)
|
||||
}
|
||||
|
||||
|
||||
@@ -74,7 +74,7 @@ func (nbsMW *NBSMetricWrapper) EndGC(mode chunks.GCMode) {
|
||||
nbsMW.nbs.EndGC(mode)
|
||||
}
|
||||
|
||||
func (nbsMW *NBSMetricWrapper) MarkAndSweepChunks(ctx context.Context, getAddrs chunks.GetAddrsCurry, filter chunks.HasManyFunc, dest chunks.ChunkStore, mode chunks.GCMode, cmp chunks.GCCompression) (chunks.MarkAndSweeper, error) {
|
||||
func (nbsMW *NBSMetricWrapper) MarkAndSweepChunks(ctx context.Context, getAddrs chunks.GetAddrsCurry, filter chunks.HasManyFunc, dest chunks.ChunkStore, mode chunks.GCMode, cmp chunks.GCArchiveLevel) (chunks.MarkAndSweeper, error) {
|
||||
return nbsMW.nbs.MarkAndSweepChunks(ctx, getAddrs, filter, dest, mode, cmp)
|
||||
}
|
||||
|
||||
|
||||
@@ -1896,12 +1896,12 @@ func (nbs *NomsBlockStore) beginRead() (endRead func()) {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (nbs *NomsBlockStore) MarkAndSweepChunks(ctx context.Context, getAddrs chunks.GetAddrsCurry, filter chunks.HasManyFunc, dest chunks.ChunkStore, mode chunks.GCMode, cmp chunks.GCCompression) (chunks.MarkAndSweeper, error) {
|
||||
func (nbs *NomsBlockStore) MarkAndSweepChunks(ctx context.Context, getAddrs chunks.GetAddrsCurry, filter chunks.HasManyFunc, dest chunks.ChunkStore, mode chunks.GCMode, cmp chunks.GCArchiveLevel) (chunks.MarkAndSweeper, error) {
|
||||
valctx.ValidateContext(ctx)
|
||||
return markAndSweepChunks(ctx, nbs, nbs, dest, getAddrs, filter, mode, cmp)
|
||||
}
|
||||
|
||||
func markAndSweepChunks(_ context.Context, nbs *NomsBlockStore, src CompressedChunkStoreForGC, dest chunks.ChunkStore, getAddrs chunks.GetAddrsCurry, filter chunks.HasManyFunc, mode chunks.GCMode, cmp chunks.GCCompression) (chunks.MarkAndSweeper, error) {
|
||||
func markAndSweepChunks(_ context.Context, nbs *NomsBlockStore, src CompressedChunkStoreForGC, dest chunks.ChunkStore, getAddrs chunks.GetAddrsCurry, filter chunks.HasManyFunc, mode chunks.GCMode, cmp chunks.GCArchiveLevel) (chunks.MarkAndSweeper, error) {
|
||||
ops := nbs.SupportedOperations()
|
||||
if !ops.CanGC || !ops.CanPrune {
|
||||
return nil, chunks.ErrUnsupportedOperation
|
||||
|
||||
@@ -339,7 +339,7 @@ func TestNBSCopyGC(t *testing.T) {
|
||||
noopFilter := func(ctx context.Context, hashes hash.HashSet) (hash.HashSet, error) {
|
||||
return hashes, nil
|
||||
}
|
||||
sweeper, err := st.MarkAndSweepChunks(ctx, noopGetAddrs, noopFilter, nil, chunks.GCMode_Full, chunks.OldSkhool)
|
||||
sweeper, err := st.MarkAndSweepChunks(ctx, noopGetAddrs, noopFilter, nil, chunks.GCMode_Full, chunks.NoArchive)
|
||||
require.NoError(t, err)
|
||||
keepersSlice := make([]hash.Hash, 0, len(keepers))
|
||||
for h := range keepers {
|
||||
|
||||
@@ -547,7 +547,7 @@ type GCSafepointController interface {
|
||||
}
|
||||
|
||||
// GC traverses the ValueStore from the root and removes unreferenced chunks from the ChunkStore
|
||||
func (lvs *ValueStore) GC(ctx context.Context, mode GCMode, cmp chunks.GCCompression, oldGenRefs, newGenRefs hash.HashSet, safepoint GCSafepointController) error {
|
||||
func (lvs *ValueStore) GC(ctx context.Context, mode GCMode, cmp chunks.GCArchiveLevel, oldGenRefs, newGenRefs hash.HashSet, safepoint GCSafepointController) error {
|
||||
lvs.versOnce.Do(lvs.expectVersion)
|
||||
|
||||
lvs.transitionToOldGenGC()
|
||||
@@ -718,7 +718,7 @@ func (lvs *ValueStore) gc(ctx context.Context,
|
||||
toVisit hash.HashSet,
|
||||
hashFilter chunks.HasManyFunc,
|
||||
chksMode chunks.GCMode,
|
||||
cmp chunks.GCCompression,
|
||||
cmp chunks.GCArchiveLevel,
|
||||
src, dest chunks.ChunkStoreGarbageCollector,
|
||||
safepointController GCSafepointController,
|
||||
finalize func() hash.HashSet) (chunks.GCFinalizer, error) {
|
||||
|
||||
@@ -201,7 +201,7 @@ func TestGC(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
assert.NotNil(v2)
|
||||
|
||||
err = vs.GC(ctx, GCModeDefault, chunks.OldSkhool, hash.HashSet{}, hash.HashSet{}, purgingSafepointController{vs})
|
||||
err = vs.GC(ctx, GCModeDefault, chunks.NoArchive, hash.HashSet{}, hash.HashSet{}, purgingSafepointController{vs})
|
||||
require.NoError(t, err)
|
||||
|
||||
v1, err = vs.ReadValue(ctx, h1) // non-nil
|
||||
|
||||
Reference in New Issue
Block a user