mirror of
https://github.com/dolthub/dolt.git
synced 2026-02-11 18:49:14 -06:00
go/store/nbs: drop chunk journal in swapTables -> UpdateGCGen following a garbage collection
This commit is contained in:
@@ -213,9 +213,7 @@ func (fm fileManifest) UpdateGCGen(ctx context.Context, lastLock addr, newConten
|
||||
checker := func(upstream, contents manifestContents) error {
|
||||
if contents.gcGen == upstream.gcGen {
|
||||
return errors.New("UpdateGCGen() must update the garbage collection generation")
|
||||
}
|
||||
|
||||
if contents.root != upstream.root {
|
||||
} else if contents.root != upstream.root {
|
||||
return errors.New("UpdateGCGen() cannot update the root")
|
||||
}
|
||||
return nil
|
||||
|
||||
@@ -199,35 +199,15 @@ func (j *chunkJournal) Exists(ctx context.Context, name addr, chunkCount uint32,
|
||||
|
||||
// PruneTableFiles implements tablePersister.
|
||||
func (j *chunkJournal) PruneTableFiles(ctx context.Context, keeper func() []addr, mtime time.Time) error {
|
||||
// sanity check that we're not deleting the journal
|
||||
var keepJournal bool
|
||||
for _, a := range keeper() {
|
||||
if a == journalAddr {
|
||||
keepJournal = true
|
||||
}
|
||||
}
|
||||
if !keepJournal {
|
||||
ok, prev, err := j.backing.ParseIfExists(ctx, &Stats{}, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
} else if !ok {
|
||||
return errors.New("failed to find manifest at " + j.path)
|
||||
}
|
||||
// flush |j.contents| (with latest root hash) to the backing manifest file
|
||||
contents, err := j.backing.Update(ctx, prev.lock, j.contents, &Stats{}, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
} else if contents.lock != j.contents.lock {
|
||||
return errOptimisticLockFailedTables
|
||||
}
|
||||
wr := j.wr
|
||||
j.wr = nil
|
||||
// close current journal writer and delete files
|
||||
if err = wr.Close(); err != nil {
|
||||
return err
|
||||
}
|
||||
if err = deleteJournalAndIndexFiles(ctx, wr.path); err != nil {
|
||||
return err
|
||||
}
|
||||
if j.wr != nil && !keepJournal {
|
||||
return errors.New("cannot drop chunk journal through tablePersister.PruneTableFiles()")
|
||||
}
|
||||
return j.persister.PruneTableFiles(ctx, keeper, mtime)
|
||||
}
|
||||
@@ -266,18 +246,9 @@ func (j *chunkJournal) Update(ctx context.Context, lastLock addr, next manifestC
|
||||
|
||||
// if |next| has a different table file set, flush to |j.backing|
|
||||
if !equalSpecs(j.contents.specs, next.specs) {
|
||||
_, mc, err := j.backing.ParseIfExists(ctx, stats, nil)
|
||||
if err != nil {
|
||||
if err := j.flushToBackingManifest(ctx, next, stats); err != nil {
|
||||
return manifestContents{}, err
|
||||
}
|
||||
lastLock = mc.lock
|
||||
|
||||
mc, err = j.backing.Update(ctx, lastLock, next, stats, nil)
|
||||
if err != nil {
|
||||
return manifestContents{}, err
|
||||
} else if mc.lock != next.lock {
|
||||
return manifestContents{}, errOptimisticLockFailedTables
|
||||
}
|
||||
}
|
||||
|
||||
if err := j.wr.commitRootHash(next.root); err != nil {
|
||||
@@ -292,19 +263,63 @@ func (j *chunkJournal) Update(ctx context.Context, lastLock addr, next manifestC
|
||||
func (j *chunkJournal) UpdateGCGen(ctx context.Context, lastLock addr, next manifestContents, stats *Stats, writeHook func() error) (manifestContents, error) {
|
||||
updater, ok := j.backing.(manifestGCGenUpdater)
|
||||
if !ok {
|
||||
err := fmt.Errorf("backing manifest (%s) does not support garbage collection", j.backing.Name())
|
||||
return manifestContents{}, fmt.Errorf("manifest (%s) does not support garbage collection", j.backing.Name())
|
||||
} else if j.contents.lock != lastLock {
|
||||
return j.contents, nil // |next| is stale
|
||||
}
|
||||
|
||||
// UpdateGCGen below cannot update the root hash, only the GC generation
|
||||
// flush |j.contents| with the latest root hash here
|
||||
if err := j.flushToBackingManifest(ctx, j.contents, stats); err != nil {
|
||||
return manifestContents{}, err
|
||||
}
|
||||
|
||||
latest, err := updater.UpdateGCGen(ctx, lastLock, next, stats, writeHook)
|
||||
latest, err := updater.UpdateGCGen(ctx, j.contents.lock, next, stats, writeHook)
|
||||
if err != nil {
|
||||
return manifestContents{}, err
|
||||
} else if latest.root == next.root {
|
||||
j.contents = next // success
|
||||
}
|
||||
|
||||
// if we're landing a new manifest without the chunk journal
|
||||
// then physically delete the journal here and cleanup |j.wr|
|
||||
if !containsJournalSpec(latest.specs) {
|
||||
if err = j.dropJournalWriter(ctx); err != nil {
|
||||
return manifestContents{}, err
|
||||
}
|
||||
}
|
||||
return latest, nil
|
||||
}
|
||||
|
||||
// flushToBackingManifest attempts to update the backing file manifest with |next|. This is necessary
|
||||
// when making manifest updates other than root hash updates (adding new table files, updating GC gen, etc).
|
||||
func (j *chunkJournal) flushToBackingManifest(ctx context.Context, next manifestContents, stats *Stats) error {
|
||||
_, prev, err := j.backing.ParseIfExists(ctx, stats, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
var mc manifestContents
|
||||
mc, err = j.backing.Update(ctx, prev.lock, next, stats, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
} else if mc.lock != next.lock {
|
||||
return errOptimisticLockFailedTables
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (j *chunkJournal) dropJournalWriter(ctx context.Context) error {
|
||||
curr := j.wr
|
||||
if j.wr == nil {
|
||||
return nil
|
||||
}
|
||||
j.wr = nil
|
||||
if err := curr.Close(); err != nil {
|
||||
return err
|
||||
}
|
||||
return deleteJournalAndIndexFiles(ctx, curr.path)
|
||||
}
|
||||
|
||||
// ParseIfExists implements manifest.
|
||||
func (j *chunkJournal) ParseIfExists(ctx context.Context, stats *Stats, readHook func() error) (ok bool, mc manifestContents, err error) {
|
||||
if j.wr == nil {
|
||||
@@ -371,3 +386,12 @@ func (c journalConjoiner) chooseConjoinees(upstream []tableSpec) (conjoinees, ke
|
||||
keepers = append(keepers, stash)
|
||||
return
|
||||
}
|
||||
|
||||
func containsJournalSpec(specs []tableSpec) (ok bool) {
|
||||
for _, spec := range specs {
|
||||
if spec.name == journalAddr {
|
||||
ok = true
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
@@ -68,6 +68,22 @@ SQL
|
||||
[ "$status" -eq "0" ]
|
||||
}
|
||||
|
||||
@test "garbage_collection: call GC in sql script" {
|
||||
export DOLT_ENABLE_GC_PROCEDURE="true"
|
||||
dolt sql <<SQL
|
||||
CREATE TABLE t (pk int primary key);
|
||||
INSERT INTO t VALUES (1),(2),(3);
|
||||
CALL dolt_commit('-Am', 'new table with three rows');
|
||||
INSERT INTO t VALUES (11),(12),(13);
|
||||
SQL
|
||||
dolt reset --hard
|
||||
dolt sql <<SQL
|
||||
INSERT INTO t VALUES (21),(22),(23);
|
||||
CALL dolt_commit('-Am', 'new table with three rows');
|
||||
CALL dolt_gc();
|
||||
SQL
|
||||
}
|
||||
|
||||
@test "garbage_collection: blob types work after GC" {
|
||||
dolt sql -q "create table t(pk int primary key, val text)"
|
||||
dolt sql -q "insert into t values (1, 'one'), (2, 'two');"
|
||||
|
||||
Reference in New Issue
Block a user