Merge pull request #5451 from dolthub/aaron/nbs-exists-check-only-on-UpdateManifest

[no-release-notes] go/store/nbs: Rework tablePersister.Exists checks to only do them when we add files external to the NomsBlockStore.
This commit is contained in:
Aaron Son
2023-03-01 14:34:36 -08:00
committed by GitHub
6 changed files with 59 additions and 61 deletions
+1
View File
@@ -518,6 +518,7 @@ func (mp manualPart) run(ctx context.Context, buff []byte) error {
if err != nil {
return err
}
defer reader.Close()
_, err = io.ReadFull(reader, buff[mp.start:mp.end])
return err
}
+8 -9
View File
@@ -91,7 +91,7 @@ func (c noopConjoiner) chooseConjoinees(sources []tableSpec) (conjoinees, keeper
// process actor has already landed a conjoin of its own. Callers must
// handle this, likely by rebasing against upstream and re-evaluating the
// situation.
func conjoin(ctx context.Context, s conjoinStrategy, upstream manifestContents, mm manifestUpdater, p tablePersister, stats *Stats) (manifestContents, error) {
func conjoin(ctx context.Context, s conjoinStrategy, upstream manifestContents, mm manifestUpdater, p tablePersister, stats *Stats) (manifestContents, cleanupFunc, error) {
var conjoined tableSpec
var conjoinees, keepers, appendixSpecs []tableSpec
var cleanup cleanupFunc
@@ -108,12 +108,12 @@ func conjoin(ctx context.Context, s conjoinStrategy, upstream manifestContents,
var err error
conjoinees, keepers, err = s.chooseConjoinees(upstream.specs)
if err != nil {
return manifestContents{}, err
return manifestContents{}, nil, err
}
conjoined, cleanup, err = conjoinTables(ctx, conjoinees, p, stats)
if err != nil {
return manifestContents{}, err
return manifestContents{}, nil, err
}
}
@@ -137,12 +137,11 @@ func conjoin(ctx context.Context, s conjoinStrategy, upstream manifestContents,
var err error
upstream, err = mm.Update(ctx, upstream.lock, newContents, stats, nil)
if err != nil {
return manifestContents{}, err
return manifestContents{}, nil, err
}
if newContents.lock == upstream.lock {
cleanup()
return upstream, nil
return upstream, cleanup, nil
}
// Optimistic lock failure. Someone else moved to the root, the
@@ -158,11 +157,11 @@ func conjoin(ctx context.Context, s conjoinStrategy, upstream manifestContents,
// and let the client retry
if len(appendixSpecs) > 0 {
if len(upstream.appendix) != len(appendixSpecs) {
return upstream, nil
return upstream, func() {}, nil
}
for i := range upstream.appendix {
if upstream.appendix[i].name != appendixSpecs[i].name {
return upstream, nil
return upstream, func() {}, nil
}
}
@@ -179,7 +178,7 @@ func conjoin(ctx context.Context, s conjoinStrategy, upstream manifestContents,
}
for _, c := range conjoinees {
if _, present := upstreamNames[c.name]; !present {
return upstream, nil // Bail!
return upstream, func() {}, nil // Bail!
}
conjoineeSet[c.name] = struct{}{}
}
+8 -8
View File
@@ -206,7 +206,7 @@ func testConjoin(t *testing.T, factory func(t *testing.T) tablePersister) {
t.Run(c.name, func(t *testing.T) {
fm, p, upstream := setup(startLock, startRoot, c.precompact)
_, err := conjoin(context.Background(), inlineConjoiner{}, upstream, fm, p, stats)
_, _, err := conjoin(context.Background(), inlineConjoiner{}, upstream, fm, p, stats)
require.NoError(t, err)
exists, newUpstream, err := fm.ParseIfExists(context.Background(), stats, nil)
require.NoError(t, err)
@@ -227,7 +227,7 @@ func testConjoin(t *testing.T, factory func(t *testing.T) tablePersister) {
specs := append([]tableSpec{}, upstream.specs...)
fm.set(constants.NomsVersion, computeAddr([]byte("lock2")), startRoot, append(specs, newTable), nil)
}}
_, err := conjoin(context.Background(), inlineConjoiner{}, upstream, u, p, stats)
_, _, err := conjoin(context.Background(), inlineConjoiner{}, upstream, u, p, stats)
require.NoError(t, err)
exists, newUpstream, err := fm.ParseIfExists(context.Background(), stats, nil)
require.NoError(t, err)
@@ -247,7 +247,7 @@ func testConjoin(t *testing.T, factory func(t *testing.T) tablePersister) {
u := updatePreemptManifest{fm, func() {
fm.set(constants.NomsVersion, computeAddr([]byte("lock2")), startRoot, upstream.specs[1:], nil)
}}
_, err := conjoin(context.Background(), inlineConjoiner{}, upstream, u, p, stats)
_, _, err := conjoin(context.Background(), inlineConjoiner{}, upstream, u, p, stats)
require.NoError(t, err)
exists, newUpstream, err := fm.ParseIfExists(context.Background(), stats, nil)
require.NoError(t, err)
@@ -289,7 +289,7 @@ func testConjoin(t *testing.T, factory func(t *testing.T) tablePersister) {
t.Run(c.name, func(t *testing.T) {
fm, p, upstream := setupAppendix(startLock, startRoot, c.precompact, c.appendix)
_, err := conjoin(context.Background(), inlineConjoiner{}, upstream, fm, p, stats)
_, _, err := conjoin(context.Background(), inlineConjoiner{}, upstream, fm, p, stats)
require.NoError(t, err)
exists, newUpstream, err := fm.ParseIfExists(context.Background(), stats, nil)
require.NoError(t, err)
@@ -313,7 +313,7 @@ func testConjoin(t *testing.T, factory func(t *testing.T) tablePersister) {
fm.set(constants.NomsVersion, computeAddr([]byte("lock2")), startRoot, append(specs, newTable), upstream.appendix)
}}
_, err := conjoin(context.Background(), inlineConjoiner{}, upstream, u, p, stats)
_, _, err := conjoin(context.Background(), inlineConjoiner{}, upstream, u, p, stats)
require.NoError(t, err)
exists, newUpstream, err := fm.ParseIfExists(context.Background(), stats, nil)
require.NoError(t, err)
@@ -338,7 +338,7 @@ func testConjoin(t *testing.T, factory func(t *testing.T) tablePersister) {
fm.set(constants.NomsVersion, computeAddr([]byte("lock2")), startRoot, append(specs, upstream.specs...), append(app, newTable))
}}
_, err := conjoin(context.Background(), inlineConjoiner{}, upstream, u, p, stats)
_, _, err := conjoin(context.Background(), inlineConjoiner{}, upstream, u, p, stats)
require.NoError(t, err)
exists, newUpstream, err := fm.ParseIfExists(context.Background(), stats, nil)
require.NoError(t, err)
@@ -362,7 +362,7 @@ func testConjoin(t *testing.T, factory func(t *testing.T) tablePersister) {
u := updatePreemptManifest{fm, func() {
fm.set(constants.NomsVersion, computeAddr([]byte("lock2")), startRoot, upstream.specs[len(c.appendix)+1:], upstream.appendix[:])
}}
_, err := conjoin(context.Background(), inlineConjoiner{}, upstream, u, p, stats)
_, _, err := conjoin(context.Background(), inlineConjoiner{}, upstream, u, p, stats)
require.NoError(t, err)
exists, newUpstream, err := fm.ParseIfExists(context.Background(), stats, nil)
require.NoError(t, err)
@@ -386,7 +386,7 @@ func testConjoin(t *testing.T, factory func(t *testing.T) tablePersister) {
fm.set(constants.NomsVersion, computeAddr([]byte("lock2")), startRoot, specs, append([]tableSpec{}, newTable))
}}
_, err := conjoin(context.Background(), inlineConjoiner{}, upstream, u, p, stats)
_, _, err := conjoin(context.Background(), inlineConjoiner{}, upstream, u, p, stats)
require.NoError(t, err)
exists, newUpstream, err := fm.ParseIfExists(context.Background(), stats, nil)
require.NoError(t, err)
+9 -4
View File
@@ -220,7 +220,7 @@ func (ftp *fsTablePersister) ConjoinAll(ctx context.Context, sources chunkSource
}
if plan.chunkCount == 0 {
return emptyChunkSource{}, nil, nil
return emptyChunkSource{}, func() {}, nil
}
name := nameFromSuffixes(plan.suffixes())
@@ -250,22 +250,27 @@ func (ftp *fsTablePersister) ConjoinAll(ctx context.Context, sources chunkSource
}()
for _, sws := range plan.sources.sws {
var r io.Reader
var r io.ReadCloser
r, _, ferr = sws.source.reader(ctx)
if ferr != nil {
return "", cleanup, ferr
}
n, ferr := io.CopyN(temp, r, int64(sws.dataLen))
if ferr != nil {
r.Close()
return "", cleanup, ferr
}
if uint64(n) != sws.dataLen {
r.Close()
return "", cleanup, errors.New("failed to copy all data")
}
err := r.Close()
if err != nil {
return "", cleanup, err
}
}
_, ferr = temp.Write(plan.mergedIndex)
+27 -11
View File
@@ -165,6 +165,8 @@ func (nbs *NomsBlockStore) UpdateManifest(ctx context.Context, updates map[hash.
defer nbs.mu.Unlock()
nbs.waitForGC()
nbs.checkAllManifestUpdatesExist(ctx, updates)
nbs.mm.LockForUpdate()
defer func() {
unlockErr := nbs.mm.UnlockForUpdate()
@@ -211,11 +213,6 @@ func (nbs *NomsBlockStore) UpdateManifest(ctx context.Context, updates map[hash.
}
}
err = nbs.tables.checkAllTablesExist(ctx, contents.specs, nbs.stats)
if err != nil {
return manifestContents{}, err
}
updatedContents, err = nbs.mm.Update(ctx, originalLock, contents, nbs.stats, nil)
if err != nil {
return manifestContents{}, err
@@ -247,6 +244,8 @@ func (nbs *NomsBlockStore) UpdateManifestWithAppendix(ctx context.Context, updat
defer nbs.mu.Unlock()
nbs.waitForGC()
nbs.checkAllManifestUpdatesExist(ctx, updates)
nbs.mm.LockForUpdate()
defer func() {
unlockErr := nbs.mm.UnlockForUpdate()
@@ -294,11 +293,6 @@ func (nbs *NomsBlockStore) UpdateManifestWithAppendix(ctx context.Context, updat
return manifestContents{}, err
}
err = nbs.tables.checkAllTablesExist(ctx, contents.specs, nbs.stats)
if err != nil {
return manifestContents{}, err
}
updatedContents, err = nbs.mm.Update(ctx, originalLock, contents, nbs.stats, nil)
if err != nil {
return manifestContents{}, err
@@ -324,6 +318,27 @@ func (nbs *NomsBlockStore) UpdateManifestWithAppendix(ctx context.Context, updat
return updatedContents, nil
}
// checkAllManifestUpdatesExist verifies with the table persister that every
// table file named in |updates| actually exists before the manifest is updated
// to reference it. Checks run concurrently, at most 128 in flight; the first
// failure cancels the remaining checks through the errgroup-derived context.
// NOTE(review): the uint32 values appear to be per-table chunk counts — they
// are forwarded to p.Exists as the count argument — TODO confirm with callers.
func (nbs *NomsBlockStore) checkAllManifestUpdatesExist(ctx context.Context, updates map[hash.Hash]uint32) error {
eg, ctx := errgroup.WithContext(ctx)
// Bound the number of concurrent existence checks.
eg.SetLimit(128)
for h, c := range updates {
// Capture per-iteration copies for the closure (required pre-Go 1.22).
h := h
c := c
eg.Go(func() error {
a := addr(h)
ok, err := nbs.p.Exists(ctx, a, c, nbs.stats)
if err != nil {
return err
}
if !ok {
// The caller asked us to reference a table file that the
// persister cannot find; refuse the manifest update.
return fmt.Errorf("missing table file referenced in UpdateManifest call: %v", a)
}
return nil
})
}
// Wait returns the first non-nil error from any check, or nil if all passed.
return eg.Wait()
}
func fromManifestAppendixOptionNewContents(upstream manifestContents, appendixSpecs []tableSpec, option ManifestAppendixOption) (manifestContents, error) {
contents, upstreamAppendixSpecs := upstream.removeAppendixSpecs()
switch option {
@@ -1105,7 +1120,7 @@ func (nbs *NomsBlockStore) updateManifest(ctx context.Context, current, last has
}
if nbs.c.conjoinRequired(nbs.tables) {
newUpstream, err := conjoin(ctx, nbs.c, nbs.upstream, nbs.mm, nbs.p, nbs.stats)
newUpstream, cleanup, err := conjoin(ctx, nbs.c, nbs.upstream, nbs.mm, nbs.p, nbs.stats)
if err != nil {
return err
}
@@ -1122,6 +1137,7 @@ func (nbs *NomsBlockStore) updateManifest(ctx context.Context, current, last has
if err != nil {
return err
}
cleanup()
return errOptimisticLockFailedTables
}
+6 -29
View File
@@ -332,29 +332,6 @@ func (ts tableSet) flatten(ctx context.Context) (tableSet, error) {
return flattened, nil
}
// checkAllTablesExist confirms, via the table persister, that every table
// file in |specs| exists, skipping specs already tracked in ts.upstream.
// Checks run concurrently with at most 128 in flight; the first failure
// cancels the rest through the errgroup-derived context. (This PR removes
// this helper in favor of checkAllManifestUpdatesExist on NomsBlockStore.)
func (ts tableSet) checkAllTablesExist(ctx context.Context, specs []tableSpec, stats *Stats) error {
eg, ectx := errgroup.WithContext(ctx)
// Bound the number of concurrent existence checks.
eg.SetLimit(128)
for _, s := range specs {
// if the table file already exists in our upstream chunkSourceSet, we do not need to
// check with the upstream if it still exists.
if _, ok := ts.upstream[s.name]; ok {
continue
}
// Capture a per-iteration copy for the closure (required pre-Go 1.22).
spec := s
eg.Go(func() error {
exists, err := ts.p.Exists(ectx, spec.name, spec.chunkCount, stats)
if err != nil {
return err
} else if !exists {
return fmt.Errorf("table spec does not exist")
}
return nil
})
}
// Wait returns the first non-nil error from any check, or nil if all passed.
return eg.Wait()
}
// rebase returns a new tableSet holding the novel tables managed by |ts| and
// those specified by |specs|.
func (ts tableSet) rebase(ctx context.Context, specs []tableSpec, stats *Stats) (tableSet, error) {
@@ -389,10 +366,6 @@ func (ts tableSet) rebase(ctx context.Context, specs []tableSpec, stats *Stats)
novel[t2.hash()] = t2
}
// newly opened tables are unowned, we must
// close them if the rebase operation fails
opened := make(chunkSourceSet, len(specs))
eg, ctx := errgroup.WithContext(ctx)
mu := new(sync.Mutex)
upstream := make(chunkSourceSet, len(specs))
@@ -401,6 +374,11 @@ func (ts tableSet) rebase(ctx context.Context, specs []tableSpec, stats *Stats)
if cs, ok := ts.upstream[s.name]; ok {
cl, err := cs.clone()
if err != nil {
_ = eg.Wait()
for _, cs := range upstream {
// close any opened chunkSources
_ = cs.close()
}
return tableSet{}, err
}
mu.Lock()
@@ -417,14 +395,13 @@ func (ts tableSet) rebase(ctx context.Context, specs []tableSpec, stats *Stats)
}
mu.Lock()
upstream[cs.hash()] = cs
opened[cs.hash()] = cs
mu.Unlock()
return nil
})
}
if err := eg.Wait(); err != nil {
for _, cs := range opened {
for _, cs := range upstream {
// close any opened chunkSources
_ = cs.close()
}