Mirror of https://github.com/dolthub/dolt.git, synced 2026-04-30 03:26:47 -05:00
Merge pull request #1980 from dolthub/aaron/store-nbs-manifest-update-lock-fixes
[no-release-notes] go/store/nbs: Improve manifest lock handling.
@@ -561,7 +561,7 @@ func (fc *fakeConjoiner) Conjoin(ctx context.Context, upstream manifestContents,
         vers:  constants.NomsVersion,
         root:  upstream.root,
         specs: canned.specs,
-        lock:  generateLockHash(upstream.root, canned.specs),
+        lock:  generateLockHash(upstream.root, canned.specs, []tableSpec{}),
     }

     var err error
@@ -102,7 +102,7 @@ func conjoin(ctx context.Context, upstream manifestContents, mm manifestUpdater,
     newContents := manifestContents{
         vers:     upstream.vers,
         root:     upstream.root,
-        lock:     generateLockHash(upstream.root, specs),
+        lock:     generateLockHash(upstream.root, specs, appendixSpecs),
         gcGen:    upstream.gcGen,
         specs:    specs,
         appendix: appendixSpecs,
@@ -509,9 +509,13 @@ func formatSpecs(specs []tableSpec, tableInfo []string) {
 // persisted manifest against the lock hash it saw last time it loaded the
 // contents of a manifest. If they do not match, the client must not update
 // the persisted manifest.
-func generateLockHash(root hash.Hash, specs []tableSpec) (lock addr) {
+func generateLockHash(root hash.Hash, specs []tableSpec, appendix []tableSpec) (lock addr) {
     blockHash := sha512.New()
     blockHash.Write(root[:])
+    for _, spec := range appendix {
+        blockHash.Write(spec.name[:])
+    }
+    blockHash.Write([]byte{0})
     for _, spec := range specs {
         blockHash.Write(spec.name[:])
     }
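The appendix table names are folded into the lock hash ahead of the regular specs, with a zero byte written between the two sections. That byte acts as a domain separator: without it, moving a name from the front of the specs to the end of the appendix would leave the digest input, and therefore the lock, unchanged. A minimal standalone sketch of the idea (the names here are illustrative only; real entries are fixed-width table-file addresses):

package main

import (
	"bytes"
	"crypto/sha512"
	"fmt"
)

// lockHash mirrors the shape of generateLockHash above: root, then
// appendix names, then a 0x00 separator, then spec names.
func lockHash(root []byte, appendix, specs [][]byte) []byte {
	h := sha512.New()
	h.Write(root)
	for _, name := range appendix {
		h.Write(name)
	}
	h.Write([]byte{0}) // domain separator between the two sections
	for _, name := range specs {
		h.Write(name)
	}
	return h.Sum(nil)
}

func main() {
	root := []byte("root")
	a, b := []byte("tableA"), []byte("tableB")

	// Same names in the same order, split differently between
	// appendix and specs: the separator keeps the locks distinct.
	l1 := lockHash(root, [][]byte{a}, [][]byte{b})
	l2 := lockHash(root, [][]byte{a, b}, nil)
	fmt.Println(bytes.Equal(l1, l2)) // false
}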
+134 -114
@@ -206,50 +206,54 @@ func (nbs *NomsBlockStore) UpdateManifest(ctx context.Context, updates map[hash.
     nbs.mu.Lock()
     defer nbs.mu.Unlock()

-    var stats Stats
-    var ok bool
-    var contents manifestContents
-    ok, contents, err = nbs.mm.Fetch(ctx, &stats)
-
-    if err != nil {
-        return manifestContents{}, err
-    } else if !ok {
-        contents = manifestContents{vers: nbs.upstream.vers}
-    }
-
-    currSpecs := contents.getSpecSet()
-
-    var addCount int
-    for h, count := range updates {
-        a := addr(h)
-
-        if _, ok := currSpecs[a]; !ok {
-            addCount++
-            contents.specs = append(contents.specs, tableSpec{a, count})
-        }
-    }
-
-    if addCount == 0 {
-        return contents, nil
-    }
-
-    // ensure we dont drop existing appendices
-    if contents.appendix != nil && len(contents.appendix) > 0 {
-        contents, err = fromManifestAppendixOptionNewContents(contents, contents.appendix, ManifestAppendixOption_Set)
-        if err != nil {
-            return manifestContents{}, err
-        }
-    }
-
-    var updatedContents manifestContents
-    updatedContents, err = nbs.mm.Update(ctx, contents.lock, contents, &stats, nil)
-
-    if err != nil {
-        return manifestContents{}, err
-    }
-
-    newTables, err := nbs.tables.Rebase(ctx, contents.specs, nbs.stats)
+    var updatedContents manifestContents
+    for {
+        ok, contents, ferr := nbs.mm.Fetch(ctx, nbs.stats)
+        if ferr != nil {
+            return manifestContents{}, ferr
+        } else if !ok {
+            contents = manifestContents{vers: nbs.upstream.vers}
+        }
+
+        currSpecs := contents.getSpecSet()
+
+        var addCount int
+        for h, count := range updates {
+            a := addr(h)
+
+            if _, ok := currSpecs[a]; !ok {
+                addCount++
+                contents.specs = append(contents.specs, tableSpec{a, count})
+            }
+        }
+
+        if addCount == 0 {
+            return contents, nil
+        }
+
+        originalLock := contents.lock
+        contents.lock = generateLockHash(contents.root, contents.specs, contents.appendix)
+
+        // ensure we dont drop existing appendices
+        if contents.appendix != nil && len(contents.appendix) > 0 {
+            contents, err = fromManifestAppendixOptionNewContents(contents, contents.appendix, ManifestAppendixOption_Set)
+            if err != nil {
+                return manifestContents{}, err
+            }
+        }
+
+        updatedContents, err = nbs.mm.Update(ctx, originalLock, contents, nbs.stats, nil)
+        if err != nil {
+            return manifestContents{}, err
+        }
+
+        if updatedContents.lock == contents.lock {
+            break
+        }
+    }
+
+    newTables, err := nbs.tables.Rebase(ctx, updatedContents.specs, nbs.stats)
     if err != nil {
         return manifestContents{}, err
     }
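Both UpdateManifest above and UpdateManifestWithAppendix below are rewritten around the same optimistic-concurrency loop: fetch the upstream manifest, remember the lock it carried, build the desired new contents, attempt a conditional update, and retry from a fresh fetch if another writer won the race. A condensed sketch of the loop using hypothetical stand-in types (fetch, build, and update stand in for the manifest manager's Fetch/Update; the update is conditioned on the previously observed lock and returns whatever contents ended up upstream):

package sketch

// contents is a stand-in for manifestContents: a lock plus payload.
type contents struct {
	lock  string
	specs []string
}

// updateWithRetry loops until its write lands: it observes upstream,
// builds the desired next state (with a freshly computed lock), and
// issues a conditional update. If the returned winner does not carry
// the lock we proposed, another writer got there first, so we retry
// against the new upstream state.
func updateWithRetry(
	fetch func() contents,
	build func(contents) contents,
	update func(originalLock string, next contents) contents,
) contents {
	for {
		current := fetch()
		originalLock := current.lock

		next := build(current)

		won := update(originalLock, next)
		if won.lock == next.lock {
			return won // our proposal is now upstream
		}
		// lost the race; loop and rebuild against the winner
	}
}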
@@ -278,50 +282,55 @@ func (nbs *NomsBlockStore) UpdateManifestWithAppendix(ctx context.Context, updat
     nbs.mu.Lock()
     defer nbs.mu.Unlock()

-    var stats Stats
-    var ok bool
-    var contents manifestContents
-    ok, contents, err = nbs.mm.Fetch(ctx, &stats)
-
-    if err != nil {
-        return manifestContents{}, err
-    } else if !ok {
-        contents = manifestContents{vers: nbs.upstream.vers}
-    }
-
-    currAppendixSpecs := contents.getAppendixSet()
-
-    appendixSpecs := make([]tableSpec, 0)
-    var addCount int
-    for h, count := range updates {
-        a := addr(h)
-
-        if option == ManifestAppendixOption_Set {
-            appendixSpecs = append(appendixSpecs, tableSpec{a, count})
-        } else {
-            if _, ok := currAppendixSpecs[a]; !ok {
-                addCount++
-                appendixSpecs = append(appendixSpecs, tableSpec{a, count})
-            }
-        }
-    }
-
-    if addCount == 0 && option != ManifestAppendixOption_Set {
-        return contents, nil
-    }
-
-    contents, err = fromManifestAppendixOptionNewContents(contents, appendixSpecs, option)
-    if err != nil {
-        return manifestContents{}, err
-    }
-
-    var updatedContents manifestContents
-    updatedContents, err = nbs.mm.Update(ctx, contents.lock, contents, &stats, nil)
-    if err != nil {
-        return manifestContents{}, err
-    }
-
-    newTables, err := nbs.tables.Rebase(ctx, contents.specs, nbs.stats)
+    var updatedContents manifestContents
+    for {
+        ok, contents, ferr := nbs.mm.Fetch(ctx, nbs.stats)
+        if ferr != nil {
+            return manifestContents{}, ferr
+        } else if !ok {
+            contents = manifestContents{vers: nbs.upstream.vers}
+        }
+
+        originalLock := contents.lock
+
+        currAppendixSpecs := contents.getAppendixSet()
+
+        appendixSpecs := make([]tableSpec, 0)
+        var addCount int
+        for h, count := range updates {
+            a := addr(h)
+
+            if option == ManifestAppendixOption_Set {
+                appendixSpecs = append(appendixSpecs, tableSpec{a, count})
+            } else {
+                if _, ok := currAppendixSpecs[a]; !ok {
+                    addCount++
+                    appendixSpecs = append(appendixSpecs, tableSpec{a, count})
+                }
+            }
+        }
+
+        if addCount == 0 && option != ManifestAppendixOption_Set {
+            return contents, nil
+        }
+
+        contents, err = fromManifestAppendixOptionNewContents(contents, appendixSpecs, option)
+        if err != nil {
+            return manifestContents{}, err
+        }
+
+        updatedContents, err = nbs.mm.Update(ctx, originalLock, contents, nbs.stats, nil)
+        if err != nil {
+            return manifestContents{}, err
+        }
+
+        if updatedContents.lock == contents.lock {
+            break
+        }
+    }
+
+    newTables, err := nbs.tables.Rebase(ctx, updatedContents.specs, nbs.stats)
     if err != nil {
         return manifestContents{}, err
     }
@@ -349,6 +358,7 @@ func fromManifestAppendixOptionNewContents(upstream manifestContents, appendixSp
         newAppendixSpecs := append([]tableSpec{}, upstreamAppendixSpecs...)
         contents.appendix = append(newAppendixSpecs, appendixSpecs...)

+        contents.lock = generateLockHash(contents.root, contents.specs, contents.appendix)
         return contents, nil
     case ManifestAppendixOption_Set:
         if len(appendixSpecs) < 1 {
@@ -362,6 +372,8 @@ func fromManifestAppendixOptionNewContents(upstream manifestContents, appendixSp

         // append new appendix specs to contents.appendix
         contents.appendix = append([]tableSpec{}, appendixSpecs...)

+        contents.lock = generateLockHash(contents.root, contents.specs, contents.appendix)
+
         return contents, nil
     default:
         return manifestContents{}, ErrUnsupportedManifestAppendixOption
@@ -1070,7 +1082,7 @@ func (nbs *NomsBlockStore) updateManifest(ctx context.Context, current, last has
     newContents := manifestContents{
         vers:     nbs.upstream.vers,
         root:     current,
-        lock:     generateLockHash(current, specs),
+        lock:     generateLockHash(current, specs, appendixSpecs),
         gcGen:    nbs.upstream.gcGen,
         specs:    specs,
         appendix: appendixSpecs,
@@ -1145,8 +1157,7 @@ func (nbs *NomsBlockStore) Sources(ctx context.Context) (hash.Hash, []TableFile,
     nbs.mu.Lock()
     defer nbs.mu.Unlock()

-    stats := &Stats{}
-    exists, contents, err := nbs.mm.m.ParseIfExists(ctx, stats, nil)
+    exists, contents, err := nbs.mm.m.ParseIfExists(ctx, nbs.stats, nil)

     if err != nil {
         return hash.Hash{}, nil, nil, err
@@ -1212,8 +1223,7 @@ func (nbs *NomsBlockStore) Size(ctx context.Context) (uint64, error) {
     nbs.mu.Lock()
     defer nbs.mu.Unlock()

-    stats := &Stats{}
-    exists, contents, err := nbs.mm.m.ParseIfExists(ctx, stats, nil)
+    exists, contents, err := nbs.mm.m.ParseIfExists(ctx, nbs.stats, nil)

     if err != nil {
         return uint64(0), err
@@ -1402,14 +1412,25 @@ func (nbs *NomsBlockStore) MarkAndSweepChunks(ctx context.Context, last hash.Has
         return chunks.ErrUnsupportedOperation
     }

-    if nbs.upstream.root != last {
-        return errLastRootMismatch
-    }
-
-    // check to see if the specs have changed since last gc. If they haven't bail early.
-    gcGenCheck := generateLockHash(last, nbs.upstream.specs)
-    if nbs.upstream.gcGen == gcGenCheck {
-        return chunks.ErrNothingToCollect
+    precheck := func() error {
+        nbs.mu.RLock()
+        defer nbs.mu.RUnlock()
+
+        if nbs.upstream.root != last {
+            return errLastRootMismatch
+        }
+
+        // check to see if the specs have changed since last gc. If they haven't bail early.
+        gcGenCheck := generateLockHash(last, nbs.upstream.specs, nbs.upstream.appendix)
+        if nbs.upstream.gcGen == gcGenCheck {
+            return chunks.ErrNothingToCollect
+        }
+
+        return nil
+    }
+    err := precheck()
+    if err != nil {
+        return err
     }

     destNBS := nbs
@@ -1439,6 +1460,14 @@ func (nbs *NomsBlockStore) MarkAndSweepChunks(ctx context.Context, last hash.Has
         if ctx.Err() != nil {
             return ctx.Err()
         }
+
+        currentContents := func() manifestContents {
+            nbs.mu.RLock()
+            defer nbs.mu.RUnlock()
+            return nbs.upstream
+        }()
+
+        return nbs.p.PruneTableFiles(ctx, currentContents)
     } else {
         fileIdToNumChunks := tableSpecsToMap(specs)
         err = destNBS.AddTableFilesToManifest(ctx, fileIdToNumChunks)
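Both the new precheck and the currentContents snapshot read nbs.upstream under the read lock inside a closure, so the lock is held for exactly the duration of the read and the copied value is used afterwards without holding it. A small sketch of that snapshot idiom, with a hypothetical store type standing in for NomsBlockStore:

package sketch

import "sync"

// store is a hypothetical stand-in for a struct whose upstream
// state is guarded by an RWMutex.
type store struct {
	mu       sync.RWMutex
	upstream []string
}

// snapshot copies the guarded state inside an immediately invoked
// closure: the read lock is scoped to the copy, and the caller is
// free to do slow work (pruning, I/O) on the snapshot afterwards.
func (s *store) snapshot() []string {
	return func() []string {
		s.mu.RLock()
		defer s.mu.RUnlock()
		return append([]string(nil), s.upstream...)
	}()
}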
@@ -1446,20 +1475,8 @@ func (nbs *NomsBlockStore) MarkAndSweepChunks(ctx context.Context, last hash.Has
         if err != nil {
             return err
         }
+        return nil
     }
-
-    ok, contents, err := nbs.mm.Fetch(ctx, &Stats{})
-    if err != nil {
-        return err
-    }
-    if !ok {
-        panic("no manifest")
-    }
-    if ctx.Err() != nil {
-        return ctx.Err()
-    }
-
-    return nbs.p.PruneTableFiles(ctx, contents)
 }

 func (nbs *NomsBlockStore) copyMarkedChunks(ctx context.Context, keepChunks <-chan []hash.Hash, dest *NomsBlockStore) ([]tableSpec, error) {
@@ -1519,8 +1536,19 @@ func (nbs *NomsBlockStore) gcTableSize() (uint64, error) {
     return nbs.mtSize, nil
 }

-func (nbs *NomsBlockStore) swapTables(ctx context.Context, specs []tableSpec) error {
-    newLock := generateLockHash(nbs.upstream.root, specs)
+func (nbs *NomsBlockStore) swapTables(ctx context.Context, specs []tableSpec) (err error) {
+    nbs.mm.LockForUpdate()
+    defer func() {
+        unlockErr := nbs.mm.UnlockForUpdate()
+        if err == nil {
+            err = unlockErr
+        }
+    }()
+
+    nbs.mu.Lock()
+    defer nbs.mu.Unlock()
+
+    newLock := generateLockHash(nbs.upstream.root, specs, []tableSpec{})
     newContents := manifestContents{
         vers: nbs.upstream.vers,
         root: nbs.upstream.root,
@@ -1534,19 +1562,13 @@ func (nbs *NomsBlockStore) swapTables(ctx context.Context, specs []tableSpec) er
         return nil
     }

-    var err error
-    nbs.mm.LockForUpdate()
-    defer func() {
-        unlockErr := nbs.mm.UnlockForUpdate()
-
-        if err == nil {
-            err = unlockErr
-        }
-    }()
-
-    upstream, err := nbs.mm.UpdateGCGen(ctx, nbs.upstream.lock, newContents, nbs.stats, nil)
-    if err != nil {
-        return err
+    upstream, uerr := nbs.mm.UpdateGCGen(ctx, nbs.upstream.lock, newContents, nbs.stats, nil)
+    if uerr != nil {
+        return uerr
+    }
+
+    if upstream.lock != newContents.lock {
+        return errors.New("concurrent manifest edit during GC, before swapTables. GC failed.")
     }

     // clear memTable
@@ -1554,15 +1576,13 @@ func (nbs *NomsBlockStore) swapTables(ctx context.Context, specs []tableSpec) er

     // clear nbs.tables.novel
     nbs.tables, err = nbs.tables.Flatten()
-
     if err != nil {
-        return nil
+        return err
     }

     // replace nbs.tables.upstream with gc compacted tables
     nbs.upstream = upstream
-    nbs.tables, err = nbs.tables.Rebase(ctx, specs, nbs.stats)
-
+    nbs.tables, err = nbs.tables.Rebase(ctx, upstream.specs, nbs.stats)
     if err != nil {
         return err
     }
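swapTables now holds the manifest's update lock for its whole body and switches to a named error return, so a failure from UnlockForUpdate in the deferred cleanup is surfaced rather than swallowed when the body itself succeeded (the hunk above also fixes the old bug of returning nil instead of err after Flatten). The deferred-unlock idiom in isolation, with a hypothetical lock type:

package sketch

import "sync"

// updateLock stands in for the manifest manager's
// LockForUpdate/UnlockForUpdate pair (hypothetical).
type updateLock struct{ mu sync.Mutex }

func (l *updateLock) LockForUpdate()         { l.mu.Lock() }
func (l *updateLock) UnlockForUpdate() error { l.mu.Unlock(); return nil }

// doSwap shows the named-return pattern: the deferred unlock's
// error is surfaced only when the body itself returned nil.
func doSwap(l *updateLock, body func() error) (err error) {
	l.LockForUpdate()
	defer func() {
		unlockErr := l.UnlockForUpdate()
		if err == nil {
			err = unlockErr
		}
	}()
	return body()
}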
@@ -671,13 +671,24 @@ func (lvs *ValueStore) gc(ctx context.Context, root hash.Hash, toVisit hash.Hash
     walker := newParallelRefWalker(ctx, lvs.nbf, concurrency)

     eg.Go(func() error {
-        defer func() {
-            close(keepChunks)
-            _ = walker.Close()
-        }()
+        defer walker.Close()

         visited := toVisit.Copy()
-        return lvs.gcProcessRefs(ctx, visited, []hash.HashSet{toVisit}, keepHashes, walker, hashFilter)
+        err := lvs.gcProcessRefs(ctx, visited, []hash.HashSet{toVisit}, keepHashes, walker, hashFilter)
+        if err != nil {
+            return err
+        }
+
+        // NOTE: We do not defer this close here. When keepChunks
+        // closes, it signals to NBSStore.MarkAndSweepChunks that we
+        // are done walking the references. If gcProcessRefs returns an
+        // error, we did not successfully walk all references and we do
+        // not want MarkAndSweepChunks finishing its work, swaping
+        // table files, etc. It would be racing with returning an error
+        // here. Instead, we have returned the error above and that
+        // will force it to fail when the errgroup ctx fails.
+        close(keepChunks)
+        return nil
     })

     err := eg.Wait()
@@ -685,16 +696,6 @@ func (lvs *ValueStore) gc(ctx context.Context, root hash.Hash, toVisit hash.Hash
         return err
     }

-    if lvs.numBuffChunks() > 0 {
-        return errors.New("invalid GC state; bufferedChunks started empty and was not empty at end of run.")
-    }
-
-    // purge the cache
-    lvs.decodedChunks = sizecache.New(lvs.decodedChunks.Size())
-    lvs.bufferedChunks = make(map[hash.Hash]chunks.Chunk, lvs.bufferedChunkSize)
-    lvs.bufferedChunkSize = 0
-    lvs.withBufferedChildren = map[hash.Hash]uint64{}
-
     return eg.Wait()
 }

@@ -736,6 +737,20 @@ func (lvs *ValueStore) gcProcessRefs(ctx context.Context, visited hash.HashSet,
             toVisitCount += len(hashes)
         }
     }

+    lvs.bufferMu.Lock()
+    defer lvs.bufferMu.Unlock()
+
+    if len(lvs.bufferedChunks) > 0 {
+        return errors.New("invalid GC state; bufferedChunks started empty and was not empty at end of run.")
+    }
+
+    // purge the cache
+    lvs.decodedChunks = sizecache.New(lvs.decodedChunks.Size())
+    lvs.bufferedChunks = make(map[hash.Hash]chunks.Chunk, lvs.bufferedChunkSize)
+    lvs.bufferedChunkSize = 0
+    lvs.withBufferedChildren = map[hash.Hash]uint64{}
+
     return nil
 }
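The structural point of the gc change: close(keepChunks) is the producer's success signal, so it moved out of the defer and now runs only after gcProcessRefs returns nil, exactly as the NOTE in the diff explains. A self-contained sketch of that producer/consumer shape with errgroup (all names here are illustrative, not the ValueStore API):

package sketch

import (
	"context"

	"golang.org/x/sync/errgroup"
)

// run wires a reference walker to a consumer. The channel close is
// the "all references walked" signal, so it happens only on success;
// on error the errgroup context is cancelled, which unblocks the
// consumer without letting it mistake failure for completion.
func run(ctx context.Context, walk func(chan<- int) error) error {
	eg, ctx := errgroup.WithContext(ctx)
	out := make(chan int)

	eg.Go(func() error {
		if err := walk(out); err != nil {
			// No close(out) here: the consumer must not treat a
			// failed walk as a complete one. Returning the error
			// cancels ctx, which unblocks the consumer below.
			return err
		}
		close(out) // success: signal that the walk is complete
		return nil
	})

	eg.Go(func() error {
		for {
			select {
			case _, ok := <-out:
				if !ok {
					return nil // clean close: safe to finish up
				}
			case <-ctx.Done():
				return ctx.Err()
			}
		}
	})

	return eg.Wait()
}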