go/store/nbs: store.go: Have MarkAndSweep respect NBSStore mutex. Improve error handling slightly.

This commit is contained in:
Aaron Son
2021-08-05 15:59:37 -07:00
parent cbf225ed22
commit 2e9ead3e78

View File

@@ -1406,14 +1406,25 @@ func (nbs *NomsBlockStore) MarkAndSweepChunks(ctx context.Context, last hash.Has
return chunks.ErrUnsupportedOperation
}
if nbs.upstream.root != last {
return errLastRootMismatch
}
precheck := func() error {
nbs.mu.RLock()
defer nbs.mu.RUnlock()
// check to see if the specs have changed since last gc. If they haven't bail early.
gcGenCheck := generateLockHash(last, nbs.upstream.specs, nbs.upstream.appendix)
if nbs.upstream.gcGen == gcGenCheck {
return chunks.ErrNothingToCollect
if nbs.upstream.root != last {
return errLastRootMismatch
}
// check to see if the specs have changed since last gc. If they haven't bail early.
gcGenCheck := generateLockHash(last, nbs.upstream.specs, nbs.upstream.appendix)
if nbs.upstream.gcGen == gcGenCheck {
return chunks.ErrNothingToCollect
}
return nil
}
err := precheck()
if err != nil {
return err
}
specs, err := nbs.copyMarkedChunks(ctx, keepChunks)
@@ -1432,18 +1443,13 @@ func (nbs *NomsBlockStore) MarkAndSweepChunks(ctx context.Context, last hash.Has
return ctx.Err()
}
ok, contents, err := nbs.mm.Fetch(ctx, &Stats{})
if err != nil {
return err
}
if !ok {
panic("no manifest")
}
if ctx.Err() != nil {
return ctx.Err()
}
currentContents := func() manifestContents {
nbs.mu.RLock()
defer nbs.mu.RUnlock()
return nbs.upstream
}()
return nbs.p.PruneTableFiles(ctx, contents)
return nbs.p.PruneTableFiles(ctx, currentContents)
}
func (nbs *NomsBlockStore) copyMarkedChunks(ctx context.Context, keepChunks <-chan []hash.Hash) ([]tableSpec, error) {
@@ -1503,7 +1509,18 @@ func (nbs *NomsBlockStore) gcTableSize() (uint64, error) {
return nbs.mtSize, nil
}
func (nbs *NomsBlockStore) swapTables(ctx context.Context, specs []tableSpec) error {
func (nbs *NomsBlockStore) swapTables(ctx context.Context, specs []tableSpec) (err error) {
nbs.mm.LockForUpdate()
defer func() {
unlockErr := nbs.mm.UnlockForUpdate()
if err == nil {
err = unlockErr
}
}()
nbs.mu.Lock()
defer nbs.mu.Unlock()
newLock := generateLockHash(nbs.upstream.root, specs, []tableSpec{})
newContents := manifestContents{
vers: nbs.upstream.vers,
@@ -1513,19 +1530,13 @@ func (nbs *NomsBlockStore) swapTables(ctx context.Context, specs []tableSpec) er
specs: specs,
}
var err error
nbs.mm.LockForUpdate()
defer func() {
unlockErr := nbs.mm.UnlockForUpdate()
upstream, uerr := nbs.mm.UpdateGCGen(ctx, nbs.upstream.lock, newContents, nbs.stats, nil)
if uerr != nil {
return uerr
}
if err == nil {
err = unlockErr
}
}()
upstream, err := nbs.mm.UpdateGCGen(ctx, nbs.upstream.lock, newContents, nbs.stats, nil)
if err != nil {
return err
if upstream.lock != newContents.lock {
return errors.New("concurrent manifest edit during GC, before swapTables. GC failed.")
}
// clear memTable
@@ -1533,15 +1544,13 @@ func (nbs *NomsBlockStore) swapTables(ctx context.Context, specs []tableSpec) er
// clear nbs.tables.novel
nbs.tables, err = nbs.tables.Flatten()
if err != nil {
return nil
return err
}
// replace nbs.tables.upstream with gc compacted tables
nbs.upstream = upstream
nbs.tables, err = nbs.tables.Rebase(ctx, specs, nbs.stats)
nbs.tables, err = nbs.tables.Rebase(ctx, upstream.specs, nbs.stats)
if err != nil {
return err
}