go/store/nbs: archive_writer.go: Fix problem with archive stream writer where we did not remove temptf files when we were pushing changes.

This commit is contained in:
Aaron Son
2025-10-22 16:23:08 -07:00
parent cfa982c420
commit 52e673e445
3 changed files with 159 additions and 3 deletions

View File

@@ -62,7 +62,10 @@ type archiveWriter struct {
// times. It is not used for any other purpose, and there are cases where we bypass checking it (e.g. conjoining archives).
seenChunks hash.HashSet
// MD5 is calculated on the entire output, so this hash sink wraps actual ByteSink.
md5Summer *HashingByteSink
md5Summer *HashingByteSink
// the temporary file path of the output file where we were writing. "" if we were against an in-memory byte sink.
path string
// the final path we put the archive if we called FlushToFile.
finalPath string
stagedBytes stagedByteSpanSlice
stagedChunks stagedChunkRefSlice
@@ -107,6 +110,7 @@ func newArchiveWriter(tmpDir string) (*archiveWriter, error) {
hbSha := NewSHA512HashingByteSink(hbMd5)
return &archiveWriter{
md5Summer: hbMd5,
path: bs.path,
seenChunks: hash.HashSet{},
output: hbSha,
}, nil
@@ -608,10 +612,10 @@ func (asw *ArchiveStreamWriter) Cancel() error {
}
func (asw *ArchiveStreamWriter) Remove() error {
if asw.writer.finalPath == "" {
if asw.writer.path == "" {
return nil
}
return os.Remove(asw.writer.finalPath)
return os.Remove(asw.writer.path)
}
func (asw *ArchiveStreamWriter) writeArchiveToChunker(chunker ArchiveToChunker) (uint32, error) {

View File

@@ -0,0 +1,76 @@
// Copyright 2025 Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package nbs
import (
"crypto/rand"
"io"
"io/fs"
"os"
"testing"
"github.com/stretchr/testify/require"
"github.com/dolthub/dolt/go/store/chunks"
)
func CountFilesInDir(t *testing.T, path string) int {
cnt := 0
err := fs.WalkDir(os.DirFS(path), ".", func(path string, _ fs.DirEntry, err error) error {
if err != nil {
return err
}
cnt += 1
return nil
})
require.NoError(t, err)
return cnt
}
func TestArchiveStreamWriterRemove(t *testing.T) {
t.Run("RemoveOnFinishedWriterRemovesFile", func(t *testing.T) {
dir := t.TempDir()
require.Equal(t, 1, CountFilesInDir(t, dir))
asw, err := NewArchiveStreamWriter(dir)
require.NoError(t, err)
contents := make([]byte, 1024)
_, err = io.ReadFull(rand.Reader, contents)
require.NoError(t, err)
_, err = asw.AddChunk(ChunkToCompressedChunk(chunks.NewChunk(contents)))
require.NoError(t, err)
_, _, err = asw.Finish()
require.NoError(t, err)
rdr, err := asw.Reader()
require.NoError(t, err)
require.NoError(t, rdr.Close())
err = asw.Remove()
require.NoError(t, err)
require.Equal(t, 1, CountFilesInDir(t, dir))
})
t.Run("CancelOnWriterRemovesFile", func(t *testing.T) {
dir := t.TempDir()
require.Equal(t, 1, CountFilesInDir(t, dir))
asw, err := NewArchiveStreamWriter(dir)
require.NoError(t, err)
contents := make([]byte, 1024)
_, err = io.ReadFull(rand.Reader, contents)
require.NoError(t, err)
_, err = asw.AddChunk(ChunkToCompressedChunk(chunks.NewChunk(contents)))
require.NoError(t, err)
err = asw.Cancel()
require.NoError(t, err)
require.Equal(t, 1, CountFilesInDir(t, dir))
})
}

View File

@@ -0,0 +1,76 @@
// Copyright 2025 Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package main
import (
"io/fs"
"os"
"path/filepath"
"testing"
"github.com/stretchr/testify/require"
driver "github.com/dolthub/dolt/go/libraries/doltcore/dtestutils/sql_server_driver"
)
// A simple test to ensure that temptf is cleaned up when we push.
//
// This is inconvenient to do from somewhere like `bats` because
// the failure mode we are looking for is before process shutdown,
// i.e., when there is a long running server.
func TestPushTemptfCleanup(t *testing.T) {
t.Parallel()
u, err := driver.NewDoltUser()
require.NoError(t, err)
t.Cleanup(func() {
u.Cleanup()
})
rs, err := u.MakeRepoStore()
require.NoError(t, err)
dbOne, err := rs.MakeRepo("db_one")
require.NoError(t, err)
dir := t.TempDir()
dbOne.CreateRemote("origin", "file://"+dir)
var ports DynamicResources
ports.global = &GlobalPorts
ports.t = t
server := MakeServer(t, rs, &driver.Server{
Args: []string{"--port", `{{get_port "server_one"}}`},
DynamicPort: "server_one",
}, &ports)
require.NotNil(t, server)
db, err := server.DB(driver.Connection{User: "root"})
require.NoError(t, err)
t.Cleanup(func() {
db.Close()
})
conn, err := db.Conn(t.Context())
require.NoError(t, err)
_, err = conn.ExecContext(t.Context(), "USE db_one")
require.NoError(t, err)
_, err = conn.ExecContext(t.Context(), "CALL dolt_push('origin', 'main')")
require.NoError(t, err)
cnt := 0
err = fs.WalkDir(os.DirFS(filepath.Join(dbOne.Dir, ".dolt/temptf")), ".", func(path string, _ fs.DirEntry, err error) error {
if err != nil {
return err
}
cnt += 1
return nil
})
require.NoError(t, err)
require.Equal(t, 1, cnt)
}