go/store/nbs: fix reentrant deadlock in ref checker

This commit is contained in:
Andy Arthur
2023-01-23 17:44:56 -08:00
parent dcce248733
commit f2cc89edc0
2 changed files with 11 additions and 19 deletions

View File

@@ -136,6 +136,8 @@ func (gcs *GenerationalNBS) Has(ctx context.Context, h hash.Hash) (bool, error)
// HasMany returns a new HashSet containing any members of |hashes| that are absent from the store.
func (gcs *GenerationalNBS) HasMany(ctx context.Context, hashes hash.HashSet) (absent hash.HashSet, err error) {
gcs.newGen.mu.RLock()
defer gcs.newGen.mu.RUnlock()
return gcs.hasMany(toHasRecords(hashes))
}
@@ -146,6 +148,9 @@ func (gcs *GenerationalNBS) hasMany(recs []hasRecord) (absent hash.HashSet, err
} else if len(absent) == 0 {
return absent, nil
}
gcs.oldGen.mu.RLock()
defer gcs.oldGen.mu.RUnlock()
return gcs.oldGen.hasMany(recs)
}
@@ -166,19 +171,7 @@ func (gcs *GenerationalNBS) errorIfDangling(ctx context.Context, addrs hash.Hash
// to Flush(). Put may be called concurrently with other calls to Put(),
// Get(), GetMany(), Has() and HasMany().
func (gcs *GenerationalNBS) Put(ctx context.Context, c chunks.Chunk, getAddrs chunks.GetAddrsCb) error {
addrs, err := getAddrs(ctx, c)
if err != nil {
return err
}
err = gcs.errorIfDangling(ctx, addrs)
if err != nil {
return err
}
return gcs.newGen.Put(ctx, c, func(ctx context.Context, c chunks.Chunk) (hash.HashSet, error) {
return nil, nil
})
return gcs.newGen.Put(ctx, c, getAddrs)
}
// Returns the NomsVersion with which this ChunkSource is compatible.

View File

@@ -623,12 +623,11 @@ func (nbs *NomsBlockStore) addChunk(ctx context.Context, ch chunks.Chunk, addrs
type refCheck func(reqs []hasRecord) (hash.HashSet, error)
func (nbs *NomsBlockStore) errorIfDangling(checker refCheck) error {
nbs.mu.RLock()
defer nbs.mu.RUnlock()
if (nbs.mt == nil || nbs.mt.pendingRefs == nil) && (len(nbs.tables.novel) == 0) {
return nil // no refs to check
}
sort.Sort(hasRecordByPrefix(nbs.mt.pendingRefs))
absent, err := checker(nbs.mt.pendingRefs)
if err != nil {
return err
@@ -860,15 +859,13 @@ func (nbs *NomsBlockStore) HasMany(ctx context.Context, hashes hash.HashSet) (ha
defer nbs.stats.HasLatency.SampleTimeSince(t1)
nbs.stats.AddressesPerHas.SampleLen(hashes.Size())
nbs.mu.RLock()
defer nbs.mu.RUnlock()
return nbs.hasMany(toHasRecords(hashes))
}
func (nbs *NomsBlockStore) hasMany(reqs []hasRecord) (hash.HashSet, error) {
sort.Sort(hasRecordByPrefix(reqs))
tables, remaining, err := func() (tables chunkReader, remaining bool, err error) {
nbs.mu.RLock()
defer nbs.mu.RUnlock()
tables = nbs.tables
remaining = true
@@ -983,9 +980,11 @@ func (nbs *NomsBlockStore) commit(ctx context.Context, current, last hash.Hash,
}
// check for dangling references in |nbs.mt|
nbs.mu.Lock()
if err = nbs.errorIfDangling(checker); err != nil {
return false, err
}
nbs.mu.Unlock()
err = func() error {
// This is unfortunate. We want to serialize commits to the same store