mirror of
https://github.com/dolthub/dolt.git
synced 2026-05-02 19:39:56 -05:00
[no-release-notes] go: store/nbs: file_table_reader.go: Fix clone() so that it actually uses the reference count.
A bug in the usage of dynassert meant that clone() was always opening a new table file, instead of using the existing open file. There were a few known consequences: * the window of opportunity when unlocked reads could try to read a closed chunk source and receive an error was much larger than typical. As a result, auto_gc on a standby replica in a cluster was much more likely to fail spuriously. * the number of file descriptors a running server would churn was higher than typical, and some operations were a bit slower / more prone to context switches than they needed to be. This PR also updates some errors to be slightly more verbose about their origin. This PR also updates TestAutoGC in go-sql-server-tests to run less iterations in CI and in -test.short mode. The iterations should be able to be lower since auto GC will fail less often now.
This commit is contained in:
@@ -258,7 +258,7 @@ func buildArchiveReader(ctx context.Context, reader tableReaderAt, footer archiv
|
||||
func (ar archiveReader) clone() (archiveReader, error) {
|
||||
reader, err := ar.reader.clone()
|
||||
if err != nil {
|
||||
return archiveReader{}, nil
|
||||
return archiveReader{}, err
|
||||
}
|
||||
return archiveReader{
|
||||
reader: reader,
|
||||
|
||||
@@ -592,6 +592,9 @@ func (asw *ArchiveStreamWriter) Cancel() error {
|
||||
}
|
||||
|
||||
func (asw *ArchiveStreamWriter) Remove() error {
|
||||
if asw.writer.finalPath == "" {
|
||||
return nil
|
||||
}
|
||||
return os.Remove(asw.writer.finalPath)
|
||||
}
|
||||
|
||||
|
||||
@@ -196,8 +196,8 @@ type fileReaderAt struct {
|
||||
}
|
||||
|
||||
func (fra *fileReaderAt) clone() (tableReaderAt, error) {
|
||||
if dynassert.Assert(atomic.AddInt32(fra.cnt, 1) > 1, "attempt to clone a closed fileReaderAt") {
|
||||
// Restore previous refcnt, even know we're in a weird state...
|
||||
if !dynassert.Assert(atomic.AddInt32(fra.cnt, 1) > 1, "attempt to clone a closed fileReaderAt") {
|
||||
// Restore previous refcnt, despite being in a weird state...
|
||||
atomic.AddInt32(fra.cnt, -1)
|
||||
return newFileReaderAt(fra.path)
|
||||
}
|
||||
|
||||
@@ -74,7 +74,11 @@ func (gcc *gcCopier) addChunk(ctx context.Context, c ToChunker) error {
|
||||
// If the writer should be closed and deleted, instead of being used with
|
||||
// copyTablesToDir, call this method.
|
||||
func (gcc *gcCopier) cancel(_ context.Context) error {
|
||||
return gcc.writer.Cancel()
|
||||
err := gcc.writer.Cancel()
|
||||
if err != nil {
|
||||
return fmt.Errorf("gcCopier cancel err: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (gcc *gcCopier) copyTablesToDir(ctx context.Context) (ts []tableSpec, err error) {
|
||||
@@ -127,14 +131,14 @@ func (gcc *gcCopier) copyTablesToDir(ctx context.Context) (ts []tableSpec, err e
|
||||
// Otherwise, write the file through CopyTableFile.
|
||||
r, err := gcc.writer.Reader()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, fmt.Errorf("gc_copier, Reader() error: %w", err)
|
||||
}
|
||||
defer r.Close()
|
||||
sz := gcc.writer.FullLength()
|
||||
|
||||
err = gcc.tfp.CopyTableFile(ctx, r, filename, sz, uint32(gcc.writer.ChunkCount()))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, fmt.Errorf("gc_copier, CopyTableFile error: %w", err)
|
||||
}
|
||||
|
||||
return []tableSpec{
|
||||
|
||||
@@ -2047,7 +2047,7 @@ func (i *markAndSweeper) SaveHashes(ctx context.Context, hashes []hash.Hash) err
|
||||
// the work we are doing here does not result in a timely
|
||||
// error once the context is canceled.
|
||||
if ctx.Err() != nil {
|
||||
return ctx.Err()
|
||||
return context.Cause(ctx)
|
||||
}
|
||||
|
||||
if !first {
|
||||
@@ -2099,10 +2099,10 @@ func (i *markAndSweeper) SaveHashes(ctx context.Context, hashes []hash.Hash) err
|
||||
addErr = i.getAddrs(c)(ctx, nextToVisit, func(h hash.Hash) bool { return false })
|
||||
}, gcDependencyMode_NoDependency)
|
||||
if err != nil {
|
||||
return err
|
||||
return fmt.Errorf("SaveHashes, error calling getManyCompressed: %w", err)
|
||||
}
|
||||
if addErr != nil {
|
||||
return addErr
|
||||
return fmt.Errorf("SaveHashes, error calling gcc.addChunk: %w", addErr)
|
||||
}
|
||||
if found != len(toVisit) {
|
||||
return fmt.Errorf("dangling references requested during GC. GC not successful. %v", toVisit)
|
||||
@@ -2250,7 +2250,7 @@ func (nbs *NomsBlockStore) swapTables(ctx context.Context, specs []tableSpec, mo
|
||||
for _, css := range oldNovel {
|
||||
err = css.close()
|
||||
if err != nil {
|
||||
return fmt.Errorf("swapTables, oldNovel css.close(): %w", err)
|
||||
return fmt.Errorf("swapTables, oldNovel css.close(), err: %w", err)
|
||||
}
|
||||
}
|
||||
nbs.memtable = nil
|
||||
|
||||
@@ -735,7 +735,7 @@ func (lvs *ValueStore) gc(ctx context.Context,
|
||||
err = sweeper.SaveHashes(ctx, toVisit.ToSlice())
|
||||
if err != nil {
|
||||
cErr := sweeper.Close(ctx)
|
||||
return nil, errors.Join(err, cErr)
|
||||
return nil, errors.Join(fmt.Errorf("Error in SaveHashes call: %w", err), cErr)
|
||||
}
|
||||
toVisit = nil
|
||||
|
||||
|
||||
@@ -36,7 +36,7 @@ func TestAutoGC(t *testing.T) {
|
||||
var enabled_16, final_16, disabled, final_disabled RepoSize
|
||||
numStatements, numCommits := 512, 16
|
||||
if testing.Short() || os.Getenv("CI") != "" {
|
||||
numStatements = 96
|
||||
numStatements = 64
|
||||
}
|
||||
t.Run("Enable", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
Reference in New Issue
Block a user