diff --git a/go/store/nbs/store.go b/go/store/nbs/store.go index 266b8f792b..a0e964353f 100644 --- a/go/store/nbs/store.go +++ b/go/store/nbs/store.go @@ -1406,14 +1406,25 @@ func (nbs *NomsBlockStore) MarkAndSweepChunks(ctx context.Context, last hash.Has return chunks.ErrUnsupportedOperation } - if nbs.upstream.root != last { - return errLastRootMismatch - } + precheck := func() error { + nbs.mu.RLock() + defer nbs.mu.RUnlock() - // check to see if the specs have changed since last gc. If they haven't bail early. - gcGenCheck := generateLockHash(last, nbs.upstream.specs, nbs.upstream.appendix) - if nbs.upstream.gcGen == gcGenCheck { - return chunks.ErrNothingToCollect + if nbs.upstream.root != last { + return errLastRootMismatch + } + + // check to see if the specs have changed since last gc. If they haven't bail early. + gcGenCheck := generateLockHash(last, nbs.upstream.specs, nbs.upstream.appendix) + if nbs.upstream.gcGen == gcGenCheck { + return chunks.ErrNothingToCollect + } + + return nil + } + err := precheck() + if err != nil { + return err } specs, err := nbs.copyMarkedChunks(ctx, keepChunks) @@ -1432,18 +1443,13 @@ func (nbs *NomsBlockStore) MarkAndSweepChunks(ctx context.Context, last hash.Has return ctx.Err() } - ok, contents, err := nbs.mm.Fetch(ctx, &Stats{}) - if err != nil { - return err - } - if !ok { - panic("no manifest") - } - if ctx.Err() != nil { - return ctx.Err() - } + currentContents := func() manifestContents { + nbs.mu.RLock() + defer nbs.mu.RUnlock() + return nbs.upstream + }() - return nbs.p.PruneTableFiles(ctx, contents) + return nbs.p.PruneTableFiles(ctx, currentContents) } func (nbs *NomsBlockStore) copyMarkedChunks(ctx context.Context, keepChunks <-chan []hash.Hash) ([]tableSpec, error) { @@ -1503,7 +1509,18 @@ func (nbs *NomsBlockStore) gcTableSize() (uint64, error) { return nbs.mtSize, nil } -func (nbs *NomsBlockStore) swapTables(ctx context.Context, specs []tableSpec) error { +func (nbs *NomsBlockStore) swapTables(ctx context.Context, specs []tableSpec) (err error) { + nbs.mm.LockForUpdate() + defer func() { + unlockErr := nbs.mm.UnlockForUpdate() + if err == nil { + err = unlockErr + } + }() + + nbs.mu.Lock() + defer nbs.mu.Unlock() + newLock := generateLockHash(nbs.upstream.root, specs, []tableSpec{}) newContents := manifestContents{ vers: nbs.upstream.vers, @@ -1513,19 +1530,13 @@ func (nbs *NomsBlockStore) swapTables(ctx context.Context, specs []tableSpec) er specs: specs, } - var err error - nbs.mm.LockForUpdate() - defer func() { - unlockErr := nbs.mm.UnlockForUpdate() + upstream, uerr := nbs.mm.UpdateGCGen(ctx, nbs.upstream.lock, newContents, nbs.stats, nil) + if uerr != nil { + return uerr + } - if err == nil { - err = unlockErr - } - }() - - upstream, err := nbs.mm.UpdateGCGen(ctx, nbs.upstream.lock, newContents, nbs.stats, nil) - if err != nil { - return err + if upstream.lock != newContents.lock { + return errors.New("concurrent manifest edit during GC, before swapTables. GC failed.") } // clear memTable @@ -1533,15 +1544,13 @@ func (nbs *NomsBlockStore) swapTables(ctx context.Context, specs []tableSpec) er // clear nbs.tables.novel nbs.tables, err = nbs.tables.Flatten() - if err != nil { - return nil + return err } // replace nbs.tables.upstream with gc compacted tables nbs.upstream = upstream - nbs.tables, err = nbs.tables.Rebase(ctx, specs, nbs.stats) - + nbs.tables, err = nbs.tables.Rebase(ctx, upstream.specs, nbs.stats) if err != nil { return err }