mirror of
https://github.com/dolthub/dolt.git
synced 2026-01-25 18:49:36 -06:00
go/store/nbs: Fix up some file handle leaks and attempts at early cleanup which fail on Windows.
This commit is contained in:
@@ -518,6 +518,7 @@ func (mp manualPart) run(ctx context.Context, buff []byte) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer reader.Close()
|
||||
_, err = io.ReadFull(reader, buff[mp.start:mp.end])
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -91,7 +91,7 @@ func (c noopConjoiner) chooseConjoinees(sources []tableSpec) (conjoinees, keeper
|
||||
// process actor has already landed a conjoin of its own. Callers must
|
||||
// handle this, likely by rebasing against upstream and re-evaluating the
|
||||
// situation.
|
||||
func conjoin(ctx context.Context, s conjoinStrategy, upstream manifestContents, mm manifestUpdater, p tablePersister, stats *Stats) (manifestContents, error) {
|
||||
func conjoin(ctx context.Context, s conjoinStrategy, upstream manifestContents, mm manifestUpdater, p tablePersister, stats *Stats) (manifestContents, cleanupFunc, error) {
|
||||
var conjoined tableSpec
|
||||
var conjoinees, keepers, appendixSpecs []tableSpec
|
||||
var cleanup cleanupFunc
|
||||
@@ -108,12 +108,12 @@ func conjoin(ctx context.Context, s conjoinStrategy, upstream manifestContents,
|
||||
var err error
|
||||
conjoinees, keepers, err = s.chooseConjoinees(upstream.specs)
|
||||
if err != nil {
|
||||
return manifestContents{}, err
|
||||
return manifestContents{}, nil, err
|
||||
}
|
||||
|
||||
conjoined, cleanup, err = conjoinTables(ctx, conjoinees, p, stats)
|
||||
if err != nil {
|
||||
return manifestContents{}, err
|
||||
return manifestContents{}, nil, err
|
||||
}
|
||||
}
|
||||
|
||||
@@ -137,12 +137,11 @@ func conjoin(ctx context.Context, s conjoinStrategy, upstream manifestContents,
|
||||
var err error
|
||||
upstream, err = mm.Update(ctx, upstream.lock, newContents, stats, nil)
|
||||
if err != nil {
|
||||
return manifestContents{}, err
|
||||
return manifestContents{}, nil, err
|
||||
}
|
||||
|
||||
if newContents.lock == upstream.lock {
|
||||
cleanup()
|
||||
return upstream, nil
|
||||
return upstream, cleanup, nil
|
||||
}
|
||||
|
||||
// Optimistic lock failure. Someone else moved to the root, the
|
||||
@@ -158,11 +157,11 @@ func conjoin(ctx context.Context, s conjoinStrategy, upstream manifestContents,
|
||||
// and let the client retry
|
||||
if len(appendixSpecs) > 0 {
|
||||
if len(upstream.appendix) != len(appendixSpecs) {
|
||||
return upstream, nil
|
||||
return upstream, func() {}, nil
|
||||
}
|
||||
for i := range upstream.appendix {
|
||||
if upstream.appendix[i].name != appendixSpecs[i].name {
|
||||
return upstream, nil
|
||||
return upstream, func() {}, nil
|
||||
}
|
||||
}
|
||||
|
||||
@@ -179,7 +178,7 @@ func conjoin(ctx context.Context, s conjoinStrategy, upstream manifestContents,
|
||||
}
|
||||
for _, c := range conjoinees {
|
||||
if _, present := upstreamNames[c.name]; !present {
|
||||
return upstream, nil // Bail!
|
||||
return upstream, func() {}, nil // Bail!
|
||||
}
|
||||
conjoineeSet[c.name] = struct{}{}
|
||||
}
|
||||
|
||||
@@ -206,7 +206,7 @@ func testConjoin(t *testing.T, factory func(t *testing.T) tablePersister) {
|
||||
t.Run(c.name, func(t *testing.T) {
|
||||
fm, p, upstream := setup(startLock, startRoot, c.precompact)
|
||||
|
||||
_, err := conjoin(context.Background(), inlineConjoiner{}, upstream, fm, p, stats)
|
||||
_, _, err := conjoin(context.Background(), inlineConjoiner{}, upstream, fm, p, stats)
|
||||
require.NoError(t, err)
|
||||
exists, newUpstream, err := fm.ParseIfExists(context.Background(), stats, nil)
|
||||
require.NoError(t, err)
|
||||
@@ -227,7 +227,7 @@ func testConjoin(t *testing.T, factory func(t *testing.T) tablePersister) {
|
||||
specs := append([]tableSpec{}, upstream.specs...)
|
||||
fm.set(constants.NomsVersion, computeAddr([]byte("lock2")), startRoot, append(specs, newTable), nil)
|
||||
}}
|
||||
_, err := conjoin(context.Background(), inlineConjoiner{}, upstream, u, p, stats)
|
||||
_, _, err := conjoin(context.Background(), inlineConjoiner{}, upstream, u, p, stats)
|
||||
require.NoError(t, err)
|
||||
exists, newUpstream, err := fm.ParseIfExists(context.Background(), stats, nil)
|
||||
require.NoError(t, err)
|
||||
@@ -247,7 +247,7 @@ func testConjoin(t *testing.T, factory func(t *testing.T) tablePersister) {
|
||||
u := updatePreemptManifest{fm, func() {
|
||||
fm.set(constants.NomsVersion, computeAddr([]byte("lock2")), startRoot, upstream.specs[1:], nil)
|
||||
}}
|
||||
_, err := conjoin(context.Background(), inlineConjoiner{}, upstream, u, p, stats)
|
||||
_, _, err := conjoin(context.Background(), inlineConjoiner{}, upstream, u, p, stats)
|
||||
require.NoError(t, err)
|
||||
exists, newUpstream, err := fm.ParseIfExists(context.Background(), stats, nil)
|
||||
require.NoError(t, err)
|
||||
@@ -289,7 +289,7 @@ func testConjoin(t *testing.T, factory func(t *testing.T) tablePersister) {
|
||||
t.Run(c.name, func(t *testing.T) {
|
||||
fm, p, upstream := setupAppendix(startLock, startRoot, c.precompact, c.appendix)
|
||||
|
||||
_, err := conjoin(context.Background(), inlineConjoiner{}, upstream, fm, p, stats)
|
||||
_, _, err := conjoin(context.Background(), inlineConjoiner{}, upstream, fm, p, stats)
|
||||
require.NoError(t, err)
|
||||
exists, newUpstream, err := fm.ParseIfExists(context.Background(), stats, nil)
|
||||
require.NoError(t, err)
|
||||
@@ -313,7 +313,7 @@ func testConjoin(t *testing.T, factory func(t *testing.T) tablePersister) {
|
||||
fm.set(constants.NomsVersion, computeAddr([]byte("lock2")), startRoot, append(specs, newTable), upstream.appendix)
|
||||
}}
|
||||
|
||||
_, err := conjoin(context.Background(), inlineConjoiner{}, upstream, u, p, stats)
|
||||
_, _, err := conjoin(context.Background(), inlineConjoiner{}, upstream, u, p, stats)
|
||||
require.NoError(t, err)
|
||||
exists, newUpstream, err := fm.ParseIfExists(context.Background(), stats, nil)
|
||||
require.NoError(t, err)
|
||||
@@ -338,7 +338,7 @@ func testConjoin(t *testing.T, factory func(t *testing.T) tablePersister) {
|
||||
fm.set(constants.NomsVersion, computeAddr([]byte("lock2")), startRoot, append(specs, upstream.specs...), append(app, newTable))
|
||||
}}
|
||||
|
||||
_, err := conjoin(context.Background(), inlineConjoiner{}, upstream, u, p, stats)
|
||||
_, _, err := conjoin(context.Background(), inlineConjoiner{}, upstream, u, p, stats)
|
||||
require.NoError(t, err)
|
||||
exists, newUpstream, err := fm.ParseIfExists(context.Background(), stats, nil)
|
||||
require.NoError(t, err)
|
||||
@@ -362,7 +362,7 @@ func testConjoin(t *testing.T, factory func(t *testing.T) tablePersister) {
|
||||
u := updatePreemptManifest{fm, func() {
|
||||
fm.set(constants.NomsVersion, computeAddr([]byte("lock2")), startRoot, upstream.specs[len(c.appendix)+1:], upstream.appendix[:])
|
||||
}}
|
||||
_, err := conjoin(context.Background(), inlineConjoiner{}, upstream, u, p, stats)
|
||||
_, _, err := conjoin(context.Background(), inlineConjoiner{}, upstream, u, p, stats)
|
||||
require.NoError(t, err)
|
||||
exists, newUpstream, err := fm.ParseIfExists(context.Background(), stats, nil)
|
||||
require.NoError(t, err)
|
||||
@@ -386,7 +386,7 @@ func testConjoin(t *testing.T, factory func(t *testing.T) tablePersister) {
|
||||
fm.set(constants.NomsVersion, computeAddr([]byte("lock2")), startRoot, specs, append([]tableSpec{}, newTable))
|
||||
}}
|
||||
|
||||
_, err := conjoin(context.Background(), inlineConjoiner{}, upstream, u, p, stats)
|
||||
_, _, err := conjoin(context.Background(), inlineConjoiner{}, upstream, u, p, stats)
|
||||
require.NoError(t, err)
|
||||
exists, newUpstream, err := fm.ParseIfExists(context.Background(), stats, nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
@@ -168,7 +168,7 @@ func (ftp *fsTablePersister) ConjoinAll(ctx context.Context, sources chunkSource
|
||||
}
|
||||
|
||||
if plan.chunkCount == 0 {
|
||||
return emptyChunkSource{}, nil, nil
|
||||
return emptyChunkSource{}, func() {}, nil
|
||||
}
|
||||
|
||||
name := nameFromSuffixes(plan.suffixes())
|
||||
@@ -189,22 +189,27 @@ func (ftp *fsTablePersister) ConjoinAll(ctx context.Context, sources chunkSource
|
||||
}()
|
||||
|
||||
for _, sws := range plan.sources.sws {
|
||||
var r io.Reader
|
||||
var r io.ReadCloser
|
||||
r, _, ferr = sws.source.reader(ctx)
|
||||
|
||||
if ferr != nil {
|
||||
return "", ferr
|
||||
}
|
||||
|
||||
n, ferr := io.CopyN(temp, r, int64(sws.dataLen))
|
||||
|
||||
if ferr != nil {
|
||||
r.Close()
|
||||
return "", ferr
|
||||
}
|
||||
|
||||
if uint64(n) != sws.dataLen {
|
||||
r.Close()
|
||||
return "", errors.New("failed to copy all data")
|
||||
}
|
||||
|
||||
err := r.Close()
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
}
|
||||
|
||||
_, ferr = temp.Write(plan.mergedIndex)
|
||||
|
||||
@@ -1120,7 +1120,7 @@ func (nbs *NomsBlockStore) updateManifest(ctx context.Context, current, last has
|
||||
}
|
||||
|
||||
if nbs.c.conjoinRequired(nbs.tables) {
|
||||
newUpstream, err := conjoin(ctx, nbs.c, nbs.upstream, nbs.mm, nbs.p, nbs.stats)
|
||||
newUpstream, cleanup, err := conjoin(ctx, nbs.c, nbs.upstream, nbs.mm, nbs.p, nbs.stats)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -1137,6 +1137,7 @@ func (nbs *NomsBlockStore) updateManifest(ctx context.Context, current, last has
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
cleanup()
|
||||
return errOptimisticLockFailedTables
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user