diff --git a/go/store/nbs/aws_table_persister.go b/go/store/nbs/aws_table_persister.go index 242f399bf5..f95aafcf3f 100644 --- a/go/store/nbs/aws_table_persister.go +++ b/go/store/nbs/aws_table_persister.go @@ -518,6 +518,7 @@ func (mp manualPart) run(ctx context.Context, buff []byte) error { if err != nil { return err } + defer reader.Close() _, err = io.ReadFull(reader, buff[mp.start:mp.end]) return err } diff --git a/go/store/nbs/conjoiner.go b/go/store/nbs/conjoiner.go index bc0085b84c..3c23e77f69 100644 --- a/go/store/nbs/conjoiner.go +++ b/go/store/nbs/conjoiner.go @@ -91,7 +91,7 @@ func (c noopConjoiner) chooseConjoinees(sources []tableSpec) (conjoinees, keeper // process actor has already landed a conjoin of its own. Callers must // handle this, likely by rebasing against upstream and re-evaluating the // situation. -func conjoin(ctx context.Context, s conjoinStrategy, upstream manifestContents, mm manifestUpdater, p tablePersister, stats *Stats) (manifestContents, error) { +func conjoin(ctx context.Context, s conjoinStrategy, upstream manifestContents, mm manifestUpdater, p tablePersister, stats *Stats) (manifestContents, cleanupFunc, error) { var conjoined tableSpec var conjoinees, keepers, appendixSpecs []tableSpec var cleanup cleanupFunc @@ -108,12 +108,12 @@ func conjoin(ctx context.Context, s conjoinStrategy, upstream manifestContents, var err error conjoinees, keepers, err = s.chooseConjoinees(upstream.specs) if err != nil { - return manifestContents{}, err + return manifestContents{}, nil, err } conjoined, cleanup, err = conjoinTables(ctx, conjoinees, p, stats) if err != nil { - return manifestContents{}, err + return manifestContents{}, nil, err } } @@ -137,12 +137,11 @@ func conjoin(ctx context.Context, s conjoinStrategy, upstream manifestContents, var err error upstream, err = mm.Update(ctx, upstream.lock, newContents, stats, nil) if err != nil { - return manifestContents{}, err + return manifestContents{}, nil, err } if newContents.lock == upstream.lock { - cleanup() - return upstream, nil + return upstream, cleanup, nil } // Optimistic lock failure. Someone else moved to the root, the @@ -158,11 +157,11 @@ func conjoin(ctx context.Context, s conjoinStrategy, upstream manifestContents, // and let the client retry if len(appendixSpecs) > 0 { if len(upstream.appendix) != len(appendixSpecs) { - return upstream, nil + return upstream, func() {}, nil } for i := range upstream.appendix { if upstream.appendix[i].name != appendixSpecs[i].name { - return upstream, nil + return upstream, func() {}, nil } } @@ -179,7 +178,7 @@ func conjoin(ctx context.Context, s conjoinStrategy, upstream manifestContents, } for _, c := range conjoinees { if _, present := upstreamNames[c.name]; !present { - return upstream, nil // Bail! + return upstream, func() {}, nil // Bail! } conjoineeSet[c.name] = struct{}{} } diff --git a/go/store/nbs/conjoiner_test.go b/go/store/nbs/conjoiner_test.go index 19cd246e21..fe0bdf4d4c 100644 --- a/go/store/nbs/conjoiner_test.go +++ b/go/store/nbs/conjoiner_test.go @@ -206,7 +206,7 @@ func testConjoin(t *testing.T, factory func(t *testing.T) tablePersister) { t.Run(c.name, func(t *testing.T) { fm, p, upstream := setup(startLock, startRoot, c.precompact) - _, err := conjoin(context.Background(), inlineConjoiner{}, upstream, fm, p, stats) + _, _, err := conjoin(context.Background(), inlineConjoiner{}, upstream, fm, p, stats) require.NoError(t, err) exists, newUpstream, err := fm.ParseIfExists(context.Background(), stats, nil) require.NoError(t, err) @@ -227,7 +227,7 @@ func testConjoin(t *testing.T, factory func(t *testing.T) tablePersister) { specs := append([]tableSpec{}, upstream.specs...) fm.set(constants.NomsVersion, computeAddr([]byte("lock2")), startRoot, append(specs, newTable), nil) }} - _, err := conjoin(context.Background(), inlineConjoiner{}, upstream, u, p, stats) + _, _, err := conjoin(context.Background(), inlineConjoiner{}, upstream, u, p, stats) require.NoError(t, err) exists, newUpstream, err := fm.ParseIfExists(context.Background(), stats, nil) require.NoError(t, err) @@ -247,7 +247,7 @@ func testConjoin(t *testing.T, factory func(t *testing.T) tablePersister) { u := updatePreemptManifest{fm, func() { fm.set(constants.NomsVersion, computeAddr([]byte("lock2")), startRoot, upstream.specs[1:], nil) }} - _, err := conjoin(context.Background(), inlineConjoiner{}, upstream, u, p, stats) + _, _, err := conjoin(context.Background(), inlineConjoiner{}, upstream, u, p, stats) require.NoError(t, err) exists, newUpstream, err := fm.ParseIfExists(context.Background(), stats, nil) require.NoError(t, err) @@ -289,7 +289,7 @@ func testConjoin(t *testing.T, factory func(t *testing.T) tablePersister) { t.Run(c.name, func(t *testing.T) { fm, p, upstream := setupAppendix(startLock, startRoot, c.precompact, c.appendix) - _, err := conjoin(context.Background(), inlineConjoiner{}, upstream, fm, p, stats) + _, _, err := conjoin(context.Background(), inlineConjoiner{}, upstream, fm, p, stats) require.NoError(t, err) exists, newUpstream, err := fm.ParseIfExists(context.Background(), stats, nil) require.NoError(t, err) @@ -313,7 +313,7 @@ func testConjoin(t *testing.T, factory func(t *testing.T) tablePersister) { fm.set(constants.NomsVersion, computeAddr([]byte("lock2")), startRoot, append(specs, newTable), upstream.appendix) }} - _, err := conjoin(context.Background(), inlineConjoiner{}, upstream, u, p, stats) + _, _, err := conjoin(context.Background(), inlineConjoiner{}, upstream, u, p, stats) require.NoError(t, err) exists, newUpstream, err := fm.ParseIfExists(context.Background(), stats, nil) require.NoError(t, err) @@ -338,7 +338,7 @@ func testConjoin(t *testing.T, factory func(t *testing.T) tablePersister) { fm.set(constants.NomsVersion, computeAddr([]byte("lock2")), startRoot, append(specs, upstream.specs...), append(app, newTable)) }} - _, err := conjoin(context.Background(), inlineConjoiner{}, upstream, u, p, stats) + _, _, err := conjoin(context.Background(), inlineConjoiner{}, upstream, u, p, stats) require.NoError(t, err) exists, newUpstream, err := fm.ParseIfExists(context.Background(), stats, nil) require.NoError(t, err) @@ -362,7 +362,7 @@ func testConjoin(t *testing.T, factory func(t *testing.T) tablePersister) { u := updatePreemptManifest{fm, func() { fm.set(constants.NomsVersion, computeAddr([]byte("lock2")), startRoot, upstream.specs[len(c.appendix)+1:], upstream.appendix[:]) }} - _, err := conjoin(context.Background(), inlineConjoiner{}, upstream, u, p, stats) + _, _, err := conjoin(context.Background(), inlineConjoiner{}, upstream, u, p, stats) require.NoError(t, err) exists, newUpstream, err := fm.ParseIfExists(context.Background(), stats, nil) require.NoError(t, err) @@ -386,7 +386,7 @@ func testConjoin(t *testing.T, factory func(t *testing.T) tablePersister) { fm.set(constants.NomsVersion, computeAddr([]byte("lock2")), startRoot, specs, append([]tableSpec{}, newTable)) }} - _, err := conjoin(context.Background(), inlineConjoiner{}, upstream, u, p, stats) + _, _, err := conjoin(context.Background(), inlineConjoiner{}, upstream, u, p, stats) require.NoError(t, err) exists, newUpstream, err := fm.ParseIfExists(context.Background(), stats, nil) require.NoError(t, err) diff --git a/go/store/nbs/file_table_persister.go b/go/store/nbs/file_table_persister.go index b3a339591b..a3670111f7 100644 --- a/go/store/nbs/file_table_persister.go +++ b/go/store/nbs/file_table_persister.go @@ -168,7 +168,7 @@ func (ftp *fsTablePersister) ConjoinAll(ctx context.Context, sources chunkSource } if plan.chunkCount == 0 { - return emptyChunkSource{}, nil, nil + return emptyChunkSource{}, func() {}, nil } name := nameFromSuffixes(plan.suffixes()) @@ -189,22 +189,27 @@ func (ftp *fsTablePersister) ConjoinAll(ctx context.Context, sources chunkSource }() for _, sws := range plan.sources.sws { - var r io.Reader + var r io.ReadCloser r, _, ferr = sws.source.reader(ctx) - if ferr != nil { return "", ferr } n, ferr := io.CopyN(temp, r, int64(sws.dataLen)) - if ferr != nil { + r.Close() return "", ferr } if uint64(n) != sws.dataLen { + r.Close() return "", errors.New("failed to copy all data") } + + err := r.Close() + if err != nil { + return "", err + } } _, ferr = temp.Write(plan.mergedIndex) diff --git a/go/store/nbs/store.go b/go/store/nbs/store.go index 8dc240ebb2..b4777c92b3 100644 --- a/go/store/nbs/store.go +++ b/go/store/nbs/store.go @@ -1120,7 +1120,7 @@ func (nbs *NomsBlockStore) updateManifest(ctx context.Context, current, last has } if nbs.c.conjoinRequired(nbs.tables) { - newUpstream, err := conjoin(ctx, nbs.c, nbs.upstream, nbs.mm, nbs.p, nbs.stats) + newUpstream, cleanup, err := conjoin(ctx, nbs.c, nbs.upstream, nbs.mm, nbs.p, nbs.stats) if err != nil { return err } @@ -1137,6 +1137,7 @@ func (nbs *NomsBlockStore) updateManifest(ctx context.Context, current, last has if err != nil { return err } + cleanup() return errOptimisticLockFailedTables }