go/store/types: value_store.go: Minor cleanups to error handling and sanity checks in GC.

This commit is contained in:
Aaron Son
2021-08-06 09:16:26 -07:00
parent 467a519fb2
commit 690aed042d

View File

@@ -671,13 +671,23 @@ func (lvs *ValueStore) gc(ctx context.Context, root hash.Hash, toVisit hash.Hash
walker := newParallelRefWalker(ctx, lvs.nbf, concurrency)
eg.Go(func() error {
defer func() {
close(keepChunks)
_ = walker.Close()
}()
defer walker.Close()
visited := toVisit.Copy()
return lvs.gcProcessRefs(ctx, visited, []hash.HashSet{toVisit}, keepHashes, walker, hashFilter)
err := lvs.gcProcessRefs(ctx, visited, []hash.HashSet{toVisit}, keepHashes, walker, hashFilter)
if err != nil {
return err
}
// NOTE: We do not defer this close here. When keepChunk's
// closes, it signals to NBSStore.MarkAndSweepChunks that we
// are done walking the references. If gcProcessRefs returns an
// error, we did not successfully walk all references and we do
// not want MarkAndSweepChunks finishing its work, swaping
// table files, etc., to race with returning an error here.
// Instead, force it to fail when the errgroup ctx fails.
close(keepChunks)
return nil
})
err := eg.Wait()
@@ -685,16 +695,6 @@ func (lvs *ValueStore) gc(ctx context.Context, root hash.Hash, toVisit hash.Hash
return err
}
if lvs.numBuffChunks() > 0 {
return errors.New("invalid GC state; bufferedChunks started empty and was not empty at end of run.")
}
// purge the cache
lvs.decodedChunks = sizecache.New(lvs.decodedChunks.Size())
lvs.bufferedChunks = make(map[hash.Hash]chunks.Chunk, lvs.bufferedChunkSize)
lvs.bufferedChunkSize = 0
lvs.withBufferedChildren = map[hash.Hash]uint64{}
return eg.Wait()
}
@@ -736,6 +736,20 @@ func (lvs *ValueStore) gcProcessRefs(ctx context.Context, visited hash.HashSet,
toVisitCount += len(hashes)
}
}
lvs.bufferMu.Lock()
defer lvs.bufferMu.Unlock()
if len(lvs.bufferedChunks) > 0 {
return errors.New("invalid GC state; bufferedChunks started empty and was not empty at end of run.")
}
// purge the cache
lvs.decodedChunks = sizecache.New(lvs.decodedChunks.Size())
lvs.bufferedChunks = make(map[hash.Hash]chunks.Chunk, lvs.bufferedChunkSize)
lvs.bufferedChunkSize = 0
lvs.withBufferedChildren = map[hash.Hash]uint64{}
return nil
}