go: store/datas/database_common: store/nbs/store.go: Fix some issues when pushing to a dolt sql-server that is itself running GC.

A push to a remote works by uploading the missing content and then adding
references to it in the remote datastore. If the remote is running a GC during
the push, it is possible for the newly added data to be collected and no longer
be available when the references are added.

This should cause a transient failure which is safe to retry. There were a
couple of bugs which could instead cause a panic. This makes some changes to
safeguard against those cases.
Aaron Son
2025-06-09 17:19:19 -07:00
parent b787711666
commit 956f073dfd
8 changed files with 117 additions and 54 deletions
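
As a rough illustration of the failure mode described above (a sketch only; pushOnce and errDataCollected are hypothetical placeholders, not Dolt APIs): the upload step and the reference-update step are separate, so the reference update can fail if the remote collects the freshly uploaded chunks in between, and the safe client response is to retry the whole push rather than panic.

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// errDataCollected stands in for the transient error a remote reports when
// newly uploaded data was garbage collected before references to it were added.
var errDataCollected = errors.New("uploaded data collected before references were added")

// pushOnce stands in for one push attempt: upload the missing chunks, then
// add references to them in the remote datastore.
func pushOnce(ctx context.Context) error {
	// 1) upload the table files / chunks the remote is missing
	// 2) update the remote manifest or dataset head to reference them
	return nil
}

// pushWithRetry retries a push that failed only because of the GC race.
func pushWithRetry(ctx context.Context, attempts int) error {
	var err error
	for i := 0; i < attempts; i++ {
		if err = pushOnce(ctx); err == nil || !errors.Is(err, errDataCollected) {
			return err
		}
		// Back off briefly before re-uploading and re-adding references.
		time.Sleep(time.Duration(i+1) * 100 * time.Millisecond)
	}
	return fmt.Errorf("push failed after %d attempts: %w", attempts, err)
}

func main() {
	if err := pushWithRetry(context.Background(), 3); err != nil {
		fmt.Println("push error:", err)
	}
}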

View File

@@ -245,7 +245,6 @@ func PushToRemoteBranch[C doltdb.Context](ctx C, rsr env.RepoStateReader[C], tem
switch err {
case nil:
cli.Println()
return nil
case doltdb.ErrUpToDate, doltdb.ErrIsAhead, ErrCantFF, datas.ErrMergeNeeded, datas.ErrDirtyWorkspace, ErrShallowPushImpossible:
return err

View File

@@ -279,6 +279,11 @@ func (db *database) doSetHead(ctx context.Context, ds Dataset, addr hash.Hash, w
return err
}
if newHead == nil {
// This can happen on an attempt to set a head to an address which does not exist in the database.
return fmt.Errorf("SetHead failed: attempt to set a dataset head to an address which is not in the store")
}
newVal := newHead.value()
headType := newHead.TypeName()

View File

@@ -25,6 +25,7 @@ import (
"bytes"
"context"
"errors"
"fmt"
"io"
"io/fs"
"os"
@@ -133,8 +134,8 @@ func (ftp *fsTablePersister) CopyTableFile(ctx context.Context, r io.Reader, fil
defer func() {
cerr := temp.Close()
if err == nil {
err = cerr
if cerr != nil {
err = errors.Join(err, fmt.Errorf("error Closing temp in CopyTableFile: %w", cerr))
}
}()
@@ -161,6 +162,7 @@ func (ftp *fsTablePersister) CopyTableFile(ctx context.Context, r io.Reader, fil
ftp.toKeep[filepath.Clean(path)] = struct{}{}
}
defer ftp.removeMu.Unlock()
// logrus.Infof("added table file: %s", path)
return file.Rename(tn, path)
}
@@ -198,8 +200,8 @@ func (ftp *fsTablePersister) persistTable(ctx context.Context, name hash.Hash, d
defer func() {
closeErr := temp.Close()
if ferr == nil {
ferr = closeErr
if closeErr != nil {
ferr = errors.Join(ferr, fmt.Errorf("error Closing temp in persistTable: %w", closeErr))
}
}()
@@ -408,7 +410,7 @@ func (ftp *fsTablePersister) PruneTableFiles(ctx context.Context, keeper func()
if _, ok := ftp.curTmps[filepath.Clean(p)]; !ok {
err := file.Remove(p)
if err != nil && !errors.Is(err, fs.ErrNotExist) {
ea.add(p, err)
ea.add(p, fmt.Errorf("error file.Remove unfilteredTempFiles: %w", err))
}
}
ftp.removeMu.Unlock()
@@ -419,7 +421,7 @@ func (ftp *fsTablePersister) PruneTableFiles(ctx context.Context, keeper func()
if _, ok := ftp.toKeep[filepath.Clean(p)]; !ok {
err := file.Remove(p)
if err != nil && !errors.Is(err, fs.ErrNotExist) {
ea.add(p, err)
ea.add(p, fmt.Errorf("error file.Remove unfilteredTableFiles: %w", err))
}
}
ftp.removeMu.Unlock()

View File

@@ -29,6 +29,7 @@ import (
"os"
"path/filepath"
"strings"
"sync/atomic"
"time"
"github.com/dolthub/dolt/go/store/hash"
@@ -107,7 +108,9 @@ func newFileReaderAt(path string) (*fileReaderAt, error) {
// Size returns the number of bytes for regular files and is system dependent for others (Some of which can be negative).
return nil, fmt.Errorf("%s has invalid size: %d", path, fi.Size())
}
return &fileReaderAt{f, path, fi.Size()}, nil
cnt := new(int32)
*cnt = 1
return &fileReaderAt{f, path, fi.Size(), cnt}, nil
}
func nomsFileTableReader(ctx context.Context, path string, h hash.Hash, chunkCount uint32, q MemoryQuotaProvider) (cs chunkSource, err error) {
@@ -187,22 +190,31 @@ type fileReaderAt struct {
f *os.File
path string
sz int64
cnt *int32
}
func (fra *fileReaderAt) clone() (tableReaderAt, error) {
f, err := os.Open(fra.path)
if err != nil {
return nil, err
if atomic.AddInt32(fra.cnt, 1) == 1 {
panic("attempt to clone a closed fileReaderAt")
}
return &fileReaderAt{
f,
fra.f,
fra.path,
fra.sz,
fra.cnt,
}, nil
}
func (fra *fileReaderAt) Close() error {
return fra.f.Close()
cnt := atomic.AddInt32(fra.cnt, -1)
if cnt == 0 {
// logrus.Infof("closing %s", fra.path)
return fra.f.Close()
} else if cnt < 0 {
panic("invalid cnt on fileReaderAt")
} else {
return nil
}
}
func (fra *fileReaderAt) Reader(ctx context.Context) (io.ReadCloser, error) {

View File

@@ -216,14 +216,14 @@ func (nbs *NomsBlockStore) getChunkLocations(ctx context.Context, hashes hash.Ha
ranges := make(map[*chunkSource]map[hash.Hash]Range)
gcb, err := fn(tables.upstream, gr, ranges, keeper)
if needsContinue, err := nbs.handleUnlockedRead(ctx, gcb, endRead, err); err != nil {
if needsContinue, err := nbs.handleUnlockedRead(ctx, gcb, false, endRead, err); err != nil {
return nil, err
} else if needsContinue {
continue
}
gcb, err = fn(tables.novel, gr, ranges, keeper)
if needsContinue, err := nbs.handleUnlockedRead(ctx, gcb, endRead, err); err != nil {
if needsContinue, err := nbs.handleUnlockedRead(ctx, gcb, true, endRead, err); err != nil {
return nil, err
} else if needsContinue {
continue
@@ -251,7 +251,7 @@ func (nbs *NomsBlockStore) GetChunkLocations(ctx context.Context, hashes hash.Ha
return res, nil
}
func (nbs *NomsBlockStore) handleUnlockedRead(ctx context.Context, gcb gcBehavior, endRead func(), err error) (bool, error) {
func (nbs *NomsBlockStore) handleUnlockedRead(ctx context.Context, gcb gcBehavior, final bool, endRead func(), err error) (bool, error) {
if err != nil {
if endRead != nil {
nbs.mu.Lock()
@@ -269,7 +269,7 @@ func (nbs *NomsBlockStore) handleUnlockedRead(ctx context.Context, gcb gcBehavio
nbs.mu.Unlock()
return true, err
} else {
if endRead != nil {
if endRead != nil && final {
nbs.mu.Lock()
endRead()
nbs.mu.Unlock()
@@ -919,7 +919,7 @@ func (nbs *NomsBlockStore) Get(ctx context.Context, h hash.Hash) (chunks.Chunk,
nbs.mu.Unlock()
data, gcb, err := tables.get(ctx, h, keeper, nbs.stats)
needContinue, err := nbs.handleUnlockedRead(ctx, gcb, endRead, err)
needContinue, err := nbs.handleUnlockedRead(ctx, gcb, true, endRead, err)
if err != nil {
return chunks.EmptyChunk, err
}
@@ -1014,7 +1014,7 @@ func (nbs *NomsBlockStore) getManyWithFunc(
_, gcb, err := getManyFunc(ctx, tables, eg, reqs, keeper, nbs.stats)
return gcb, errors.Join(err, eg.Wait())
}()
needContinue, err := nbs.handleUnlockedRead(ctx, gcb, endRead, err)
needContinue, err := nbs.handleUnlockedRead(ctx, gcb, true, endRead, err)
if err != nil {
return err
}
@@ -1103,7 +1103,7 @@ func (nbs *NomsBlockStore) Has(ctx context.Context, h hash.Hash) (bool, error) {
nbs.mu.Unlock()
has, gcb, err := tables.has(h, keeper)
needsContinue, err := nbs.handleUnlockedRead(ctx, gcb, endRead, err)
needsContinue, err := nbs.handleUnlockedRead(ctx, gcb, true, endRead, err)
if err != nil {
return false, err
}
@@ -1165,7 +1165,7 @@ func (nbs *NomsBlockStore) hasManyDep(ctx context.Context, hashes hash.HashSet,
nbs.mu.Unlock()
remaining, gcb, err := tables.hasMany(reqs, keeper)
needContinue, err := nbs.handleUnlockedRead(ctx, gcb, endRead, err)
needContinue, err := nbs.handleUnlockedRead(ctx, gcb, true, endRead, err)
if err != nil {
return nil, err
}
@@ -1804,7 +1804,7 @@ func (nbs *NomsBlockStore) addTableFilesToManifest(ctx context.Context, fileIdTo
// checks pass.
sources, err := nbs.openChunkSourcesForAddTableFiles(ctx, fileIdHashToNumChunks)
if err != nil {
return err
return fmt.Errorf("addTableFiles, openChunkSources: %w", err)
}
// If these sources get added to the store, they will get cloned.
// Either way, we want to close these instances when we are done.
@@ -1826,7 +1826,7 @@ func (nbs *NomsBlockStore) addTableFilesToManifest(ctx context.Context, fileIdTo
err = refCheckAllSources(ctx, getAddrs, refCheck, sources.sources, nbs.stats)
if err != nil {
// There was an error checking all references.
return err
return fmt.Errorf("addTableFiles, refCheckAllSources: %w", err)
}
}
@@ -1834,7 +1834,7 @@ func (nbs *NomsBlockStore) addTableFilesToManifest(ctx context.Context, fileIdTo
// We add them to the set of table files in the store.
_, gcGenMismatch, err := nbs.updateManifestAddFiles(ctx, fileIdHashToNumChunks, nil, &sources.gcGen, sources.sources)
if err != nil {
return err
return fmt.Errorf("addTableFiles, updateManifestAddFiles: %w", err)
} else if gcGenMismatch {
// A gcGenMismatch means that the store has changed out from under
// us as we were running these checks. We want to retry.
@@ -2233,13 +2233,13 @@ func (nbs *NomsBlockStore) swapTables(ctx context.Context, specs []tableSpec, mo
// replace nbs.tables.upstream with gc compacted tables
ts, err := nbs.tables.rebase(ctx, upstream.specs, nil, nbs.stats)
if err != nil {
return err
return fmt.Errorf("swapTables, rebase: %w", err)
}
oldTables := nbs.tables
nbs.tables, nbs.upstream = ts, upstream
err = oldTables.close()
if err != nil {
return err
return fmt.Errorf("swapTables, oldTables.close(): %w", err)
}
// When this is called, we are at a safepoint in the GC process.
@@ -2250,13 +2250,7 @@ func (nbs *NomsBlockStore) swapTables(ctx context.Context, specs []tableSpec, mo
for _, css := range oldNovel {
err = css.close()
if err != nil {
return err
}
}
if nbs.memtable != nil {
var thrown []string
for a := range nbs.memtable.chunks {
thrown = append(thrown, a.String())
return fmt.Errorf("swapTables, oldNovel css.close(): %w", err)
}
}
nbs.memtable = nil

View File

@@ -742,12 +742,7 @@ func (tr tableReader) currentSize() uint64 {
}
func (tr tableReader) close() error {
err := tr.idx.Close()
if err != nil {
tr.r.Close()
return err
}
return tr.r.Close()
return errors.Join(tr.idx.Close(), tr.r.Close())
}
func (tr tableReader) clone() (tableReader, error) {

View File

@@ -338,23 +338,14 @@ func (ts tableSet) physicalLen() (uint64, error) {
return lenNovel + lenUp, nil
}
func (ts tableSet) close() error {
var firstErr error
setErr := func(err error) {
if err != nil && firstErr == nil {
firstErr = err
}
}
func (ts tableSet) close() (err error) {
for _, t := range ts.novel {
err := t.close()
setErr(err)
err = errors.Join(err, t.close())
}
for _, t := range ts.upstream {
err := t.close()
setErr(err)
err = errors.Join(err, t.close())
}
return firstErr
return
}
// Size returns the number of tables in this tableSet.

View File

@@ -73,6 +73,18 @@ func TestAutoGC(t *testing.T) {
require.NoError(t, err)
t.Logf("standby size: %v", rs)
})
t.Run("PushToRemotesAPI", func(t *testing.T) {
t.Parallel()
var s AutoGCTest
s.Enable = true
s.EnableRemotesAPI = true
s.Archive = sa.archive
enabled_16, final_16 = runAutoGCTest(t, &s, numStatements, 2)
assert.Contains(t, string(s.PrimaryServer.Output.Bytes()), "Successfully completed auto GC")
assert.Contains(t, string(s.StandbyServer.Output.Bytes()), "Successfully completed auto GC")
assert.NotContains(t, string(s.PrimaryServer.Output.Bytes()), "dangling references requested during GC")
assert.NotContains(t, string(s.StandbyServer.Output.Bytes()), "dangling references requested during GC")
})
})
}
})
@@ -103,6 +115,8 @@ type AutoGCTest struct {
StandbyServer *driver.SqlServer
StandbyDB *sql.DB
EnableRemotesAPI bool
Ports *DynamicResources
}
@@ -119,7 +133,7 @@ func (s *AutoGCTest) Setup(ctx context.Context, t *testing.T) {
s.CreatePrimaryServer(ctx, t, u)
if s.Replicate {
if s.Replicate || s.EnableRemotesAPI {
u, err := driver.NewDoltUser()
require.NoError(t, err)
t.Cleanup(func() {
@@ -129,6 +143,10 @@ func (s *AutoGCTest) Setup(ctx context.Context, t *testing.T) {
}
s.CreatePrimaryDatabase(ctx, t)
if s.EnableRemotesAPI {
s.CreateUsersAndRemotes(ctx, t, u)
}
}
func (s *AutoGCTest) CreatePrimaryServer(ctx context.Context, t *testing.T, u driver.DoltUser) {
@@ -176,6 +194,7 @@ cluster:
server := MakeServer(t, repo, &driver.Server{
Args: []string{"--config", "server.yaml"},
DynamicPort: "primary_server_port",
Envs: []string{"DOLT_REMOTE_PASSWORD=insecurepassword"},
}, s.Ports)
server.DBName = "auto_gc_test"
@@ -212,6 +231,15 @@ behavior:
enable: %v%v
`, s.Enable, archiveFragment)
var remotesapiFragment string
if s.EnableRemotesAPI {
remotesapiFragment = `
remotesapi:
port: {{get_port "standby_remotesapi_port"}}
read_only: false
`
}
var clusterFragment string
if s.Replicate {
clusterFragment = `
@@ -228,7 +256,7 @@ cluster:
err = driver.WithFile{
Name: "server.yaml",
Contents: behaviorFragment + clusterFragment,
Contents: behaviorFragment + remotesapiFragment + clusterFragment,
Template: s.Ports.ApplyTemplate,
}.WriteAtDir(repo.Dir)
require.NoError(t, err)
@@ -285,6 +313,26 @@ create table vals (
require.NoError(t, conn.Close())
}
// Create a user on the standby used for pushing from the primary to the standby.
// Create the remote itself on the primary, pointing to the standby.
func (s *AutoGCTest) CreateUsersAndRemotes(ctx context.Context, t *testing.T, u driver.DoltUser) {
conn, err := s.StandbyDB.Conn(ctx)
require.NoError(t, err)
_, err = conn.ExecContext(ctx, "create user remoteuser@'%' identified by 'insecurepassword'")
require.NoError(t, err)
_, err = conn.ExecContext(ctx, "grant all on *.* to remoteuser@'%'")
require.NoError(t, err)
require.NoError(t, conn.Close())
conn, err = s.PrimaryDB.Conn(ctx)
require.NoError(t, err)
port, ok := s.Ports.GetPort("standby_remotesapi_port")
require.True(t, ok)
_, err = conn.ExecContext(ctx, fmt.Sprintf("call dolt_remote('add', 'origin', 'http://localhost:%d/auto_gc_test')", port))
require.NoError(t, err)
require.NoError(t, conn.Close())
}
func autoGCInsertStatement(i int) string {
var vals []string
for j := i * 1024; j < (i+1)*1024; j++ {
@@ -307,6 +355,8 @@ func runAutoGCTest(t *testing.T, s *AutoGCTest, numStatements int, commitEvery i
ctx := context.Background()
s.Setup(ctx, t)
var pushAttempts, pushSuccesses int
for i := 0; i < numStatements; i++ {
stmt := autoGCInsertStatement(i)
conn, err := s.PrimaryDB.Conn(ctx)
@@ -315,10 +365,25 @@ func runAutoGCTest(t *testing.T, s *AutoGCTest, numStatements int, commitEvery i
if i%commitEvery == 0 {
_, err = conn.ExecContext(ctx, "call dolt_commit('-am', 'insert from "+strconv.Itoa(i*1024)+"')")
require.NoError(t, err)
if s.EnableRemotesAPI {
// Pushes are allowed to fail transiently, since the pushed data can be GCd before
// it is added to the store. But pushes should mostly succeed.
pushAttempts += 1
_, err = conn.ExecContext(ctx, "call dolt_push('origin', '--force', '--user', 'remoteuser', 'main')")
if err == nil {
pushSuccesses += 1
}
}
}
require.NoError(t, conn.Close())
}
if s.EnableRemotesAPI {
// Pushes should succeed at least 33% of the time.
// This is a conservative lower bound.
require.Less(t, float64(pushAttempts)*.33, float64(pushSuccesses))
}
before, err := GetRepoSize(s.PrimaryDir)
require.NoError(t, err)
conn, err := s.PrimaryDB.Conn(ctx)