mirror of
https://github.com/dolthub/dolt.git
synced 2026-05-06 19:35:18 -05:00
Merge pull request #6744 from dolthub/aaron/fix-hasCache-dangling-references-bug-2
go/store/nbs: store.go: Clean up how we update hasCache so that we only update it after successfully writing the memtable.
This commit is contained in:
@@ -16,6 +16,8 @@ package doltdb_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"os"
|
||||
"testing"
|
||||
|
||||
"github.com/dolthub/go-mysql-server/sql"
|
||||
@@ -28,7 +30,13 @@ import (
|
||||
"github.com/dolthub/dolt/go/libraries/doltcore/env"
|
||||
"github.com/dolthub/dolt/go/libraries/doltcore/ref"
|
||||
"github.com/dolthub/dolt/go/libraries/doltcore/sqle"
|
||||
"github.com/dolthub/dolt/go/libraries/utils/filesys"
|
||||
"github.com/dolthub/dolt/go/store/hash"
|
||||
"github.com/dolthub/dolt/go/store/nbs"
|
||||
"github.com/dolthub/dolt/go/store/prolly"
|
||||
"github.com/dolthub/dolt/go/store/prolly/tree"
|
||||
"github.com/dolthub/dolt/go/store/types"
|
||||
"github.com/dolthub/dolt/go/store/val"
|
||||
)
|
||||
|
||||
func TestGarbageCollection(t *testing.T) {
|
||||
@@ -40,6 +48,8 @@ func TestGarbageCollection(t *testing.T) {
|
||||
testGarbageCollection(t, gct)
|
||||
})
|
||||
}
|
||||
|
||||
t.Run("HasCacheDataCorruption", testGarbageCollectionHasCacheDataCorruptionBugFix)
|
||||
}
|
||||
|
||||
type stage struct {
|
||||
@@ -140,3 +150,118 @@ func testGarbageCollection(t *testing.T, test gcTest) {
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, test.expected, actual)
|
||||
}
|
||||
|
||||
// In September 2023, we found a failure to handle the `hasCache` in
|
||||
// `*NomsBlockStore` appropriately while cleaning up a memtable into which
|
||||
// dangling references had been written could result in writing chunks to a
|
||||
// database which referenced non-existant chunks.
|
||||
//
|
||||
// The general pattern was to get new chunk addresses into the hasCache, but
|
||||
// not written to the store, and then to have an incoming chunk add a refenece
|
||||
// to missing chunk. At that time, we would clear the memtable, since it had
|
||||
// invalid chunks in it, but we wouldn't purge the hasCache. Later writes which
|
||||
// attempted to reference the chunks which had made it into the hasCache would
|
||||
// succeed.
|
||||
//
|
||||
// One such concrete pattern for doing this is implemented below. We do:
|
||||
//
|
||||
// 1) Put a new chunk to the database -- C1.
|
||||
//
|
||||
// 2) Run a GC.
|
||||
//
|
||||
// 3) Put a new chunk to the database -- C2.
|
||||
//
|
||||
// 4) Call NBS.Commit() with a stale last hash.Hash. This causes us to cache C2
|
||||
// as present in the store, but it does not get written to disk, because the
|
||||
// optimistic concurrency control on the value of the current root hash fails.
|
||||
//
|
||||
// 5) Put a chunk referencing C1 to the database -- R1.
|
||||
//
|
||||
// 5) Call NBS.Commit(). This causes ErrDanglingRef. C1 was written before the
|
||||
// GC and is no longer in the store. C2 is also cleared from the pending write
|
||||
// set.
|
||||
//
|
||||
// 6) Put a chunk referencing C2 to the database -- R2.
|
||||
//
|
||||
// 7) Call NBS.Commit(). This should fail, since R2 references C2 and C2 is not
|
||||
// in the store. However, C2 is in the cache as a result of step #4, and so
|
||||
// this does not fail. R2 gets written to disk with a dangling reference to C2.
|
||||
func testGarbageCollectionHasCacheDataCorruptionBugFix(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
|
||||
d, err := os.MkdirTemp(t.TempDir(), "hascachetest-")
|
||||
require.NoError(t, err)
|
||||
|
||||
ddb, err := doltdb.LoadDoltDB(ctx, types.Format_DOLT, "file://"+d, filesys.LocalFS)
|
||||
require.NoError(t, err)
|
||||
defer ddb.Close()
|
||||
|
||||
err = ddb.WriteEmptyRepo(ctx, "main", "Aaron Son", "aaron@dolthub.com")
|
||||
require.NoError(t, err)
|
||||
|
||||
root, err := ddb.NomsRoot(ctx)
|
||||
require.NoError(t, err)
|
||||
|
||||
ns := ddb.NodeStore()
|
||||
|
||||
c1 := newIntMap(t, ctx, ns, 1, 1)
|
||||
_, err = ns.Write(ctx, c1.Node())
|
||||
require.NoError(t, err)
|
||||
|
||||
err = ddb.GC(ctx, nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
c2 := newIntMap(t, ctx, ns, 2, 2)
|
||||
_, err = ns.Write(ctx, c2.Node())
|
||||
require.NoError(t, err)
|
||||
|
||||
success, err := ddb.CommitRoot(ctx, c2.HashOf(), c2.HashOf())
|
||||
require.NoError(t, err)
|
||||
require.False(t, success, "committing the root with a last hash which does not match the current root must fail")
|
||||
|
||||
r1 := newAddrMap(t, ctx, ns, "r1", c1.HashOf())
|
||||
_, err = ns.Write(ctx, r1.Node())
|
||||
require.NoError(t, err)
|
||||
|
||||
success, err = ddb.CommitRoot(ctx, root, root)
|
||||
require.True(t, errors.Is(err, nbs.ErrDanglingRef), "committing a reference to just-collected c1 must fail with ErrDanglingRef")
|
||||
|
||||
r2 := newAddrMap(t, ctx, ns, "r2", c2.HashOf())
|
||||
_, err = ns.Write(ctx, r2.Node())
|
||||
require.NoError(t, err)
|
||||
|
||||
success, err = ddb.CommitRoot(ctx, root, root)
|
||||
require.True(t, errors.Is(err, nbs.ErrDanglingRef), "committing a reference to c2, which was erased with the ErrDanglingRef above, must also fail with ErrDanglingRef")
|
||||
}
|
||||
|
||||
func newIntMap(t *testing.T, ctx context.Context, ns tree.NodeStore, k, v int8) prolly.Map {
|
||||
desc := val.NewTupleDescriptor(val.Type{
|
||||
Enc: val.Int8Enc,
|
||||
Nullable: false,
|
||||
})
|
||||
|
||||
tb := val.NewTupleBuilder(desc)
|
||||
tb.PutInt8(0, k)
|
||||
keyTuple := tb.Build(ns.Pool())
|
||||
|
||||
tb.PutInt8(0, v)
|
||||
valueTuple := tb.Build(ns.Pool())
|
||||
|
||||
m, err := prolly.NewMapFromTuples(ctx, ns, desc, desc, keyTuple, valueTuple)
|
||||
require.NoError(t, err)
|
||||
return m
|
||||
}
|
||||
|
||||
func newAddrMap(t *testing.T, ctx context.Context, ns tree.NodeStore, key string, h hash.Hash) prolly.AddressMap {
|
||||
m, err := prolly.NewEmptyAddressMap(ns)
|
||||
require.NoError(t, err)
|
||||
|
||||
editor := m.Editor()
|
||||
err = editor.Add(ctx, key, h)
|
||||
require.NoError(t, err)
|
||||
|
||||
m, err = editor.Flush(ctx)
|
||||
require.NoError(t, err)
|
||||
|
||||
return m
|
||||
}
|
||||
|
||||
@@ -16,7 +16,6 @@ package nbs
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
@@ -154,18 +153,6 @@ func (gcs *GenerationalNBS) hasMany(recs []hasRecord) (absent hash.HashSet, err
|
||||
return gcs.oldGen.hasMany(recs)
|
||||
}
|
||||
|
||||
func (gcs *GenerationalNBS) errorIfDangling(ctx context.Context, addrs hash.HashSet) error {
|
||||
absent, err := gcs.HasMany(ctx, addrs)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if len(absent) != 0 {
|
||||
s := absent.String()
|
||||
return fmt.Errorf("Found dangling references to %s", s)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Put caches c in the ChunkSource. Upon return, c must be visible to
|
||||
// subsequent Get and Has calls, but must not be persistent until a call
|
||||
// to Flush(). Put may be called concurrently with other calls to Put(),
|
||||
|
||||
+40
-66
@@ -705,6 +705,36 @@ func (nbs *NomsBlockStore) putChunk(ctx context.Context, c chunks.Chunk, getAddr
|
||||
return nil
|
||||
}
|
||||
|
||||
// When we have chunks with dangling references in our memtable, we have to
|
||||
// throw away the entire memtable.
|
||||
func (nbs *NomsBlockStore) handlePossibleDanglingRefError(err error) {
|
||||
if errors.Is(err, ErrDanglingRef) {
|
||||
nbs.mt = nil
|
||||
}
|
||||
}
|
||||
|
||||
// Writes to a Dolt database typically involve mutating some tuple maps and
|
||||
// then mutating the top-level address map which points to all the branch heads
|
||||
// and working sets. Each internal node of the address map can have many
|
||||
// references and many of them typically change quite slowly. We keep a cache
|
||||
// of recently written references which we know are in the database so that we
|
||||
// don't have to check the table file indexes for these chunks when we write
|
||||
// references to them again in the near future.
|
||||
//
|
||||
// This cache needs to be treated in a principled manner. The integrity checks
|
||||
// that we run against the a set of chunks we are attempting to write consider
|
||||
// the to-be-written chunks themselves as also being in the database. This is
|
||||
// correct, assuming that all the chunks are written at the same time. However,
|
||||
// we should not add the results of those presence checks to the cache until
|
||||
// those chunks actually land in the database.
|
||||
func (nbs *NomsBlockStore) addPendingRefsToHasCache() {
|
||||
for _, e := range nbs.mt.pendingRefs {
|
||||
if e.has {
|
||||
nbs.hasCache.Add(*e.a, struct{}{})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (nbs *NomsBlockStore) addChunk(ctx context.Context, ch chunks.Chunk, addrs hash.HashSet, checker refCheck) (bool, error) {
|
||||
if err := ctx.Err(); err != nil {
|
||||
return false, err
|
||||
@@ -725,11 +755,10 @@ func (nbs *NomsBlockStore) addChunk(ctx context.Context, ch chunks.Chunk, addrs
|
||||
if addChunkRes == chunkNotAdded {
|
||||
ts, err := nbs.tables.append(ctx, nbs.mt, checker, nbs.hasCache, nbs.stats)
|
||||
if err != nil {
|
||||
if errors.Is(err, ErrDanglingRef) {
|
||||
nbs.mt = nil
|
||||
}
|
||||
nbs.handlePossibleDanglingRefError(err)
|
||||
return false, err
|
||||
}
|
||||
nbs.addPendingRefsToHasCache()
|
||||
nbs.tables = ts
|
||||
nbs.mt = newMemTable(nbs.mtSize)
|
||||
addChunkRes = nbs.mt.addChunk(a, ch.Data())
|
||||
@@ -757,7 +786,6 @@ type refCheck func(reqs []hasRecord) (hash.HashSet, error)
|
||||
func (nbs *NomsBlockStore) errorIfDangling(root hash.Hash, checker refCheck) error {
|
||||
if !root.IsEmpty() {
|
||||
a := addr(root)
|
||||
// We use |Get| here, since it updates recency of the entry.
|
||||
if _, ok := nbs.hasCache.Get(a); !ok {
|
||||
var hr [1]hasRecord
|
||||
hr[0].a = &a
|
||||
@@ -771,32 +799,6 @@ func (nbs *NomsBlockStore) errorIfDangling(root hash.Hash, checker refCheck) err
|
||||
nbs.hasCache.Add(a, struct{}{})
|
||||
}
|
||||
}
|
||||
|
||||
if nbs.mt == nil || nbs.mt.pendingRefs == nil {
|
||||
return nil // no pending refs to check
|
||||
}
|
||||
|
||||
for i := range nbs.mt.pendingRefs {
|
||||
// All of these are going to be |Add|ed after the call. We use
|
||||
// |Contains| to check here so the frequency count only gets
|
||||
// bumped once.
|
||||
if nbs.hasCache.Contains(*nbs.mt.pendingRefs[i].a) {
|
||||
nbs.mt.pendingRefs[i].has = true
|
||||
}
|
||||
}
|
||||
|
||||
sort.Sort(hasRecordByPrefix(nbs.mt.pendingRefs))
|
||||
absent, err := checker(nbs.mt.pendingRefs)
|
||||
if err != nil {
|
||||
return err
|
||||
} else if absent.Size() > 0 {
|
||||
return fmt.Errorf("%w: found dangling references to %s", ErrDanglingRef, absent.String())
|
||||
}
|
||||
|
||||
for _, e := range nbs.mt.pendingRefs {
|
||||
nbs.hasCache.Add(*e.a, struct{}{})
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -1130,39 +1132,6 @@ func (nbs *NomsBlockStore) commit(ctx context.Context, current, last hash.Hash,
|
||||
return true, nil
|
||||
}
|
||||
|
||||
// check for dangling references in |nbs.mt|
|
||||
if err = nbs.errorIfDangling(current, checker); err != nil {
|
||||
if errors.Is(err, ErrDanglingRef) {
|
||||
nbs.mt = nil
|
||||
}
|
||||
return false, err
|
||||
}
|
||||
|
||||
// This is unfortunate. We want to serialize commits to the same store
|
||||
// so that we avoid writing a bunch of unreachable small tables which result
|
||||
// from optimistic lock failures. However, this means that the time to
|
||||
// write tables is included in "commit" time and if all commits are
|
||||
// serialized, it means a lot more waiting.
|
||||
// "non-trivial" tables are persisted here, outside of the commit-lock.
|
||||
// all other tables are persisted in updateManifest()
|
||||
if nbs.mt != nil {
|
||||
cnt, err := nbs.mt.count()
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
if cnt > preflushChunkCount {
|
||||
ts, err := nbs.tables.append(ctx, nbs.mt, checker, nbs.hasCache, nbs.stats)
|
||||
if err != nil {
|
||||
if errors.Is(err, ErrDanglingRef) {
|
||||
nbs.mt = nil
|
||||
}
|
||||
return false, err
|
||||
}
|
||||
nbs.tables, nbs.mt = ts, nil
|
||||
}
|
||||
}
|
||||
|
||||
nbs.mm.LockForUpdate()
|
||||
defer func() {
|
||||
unlockErr := nbs.mm.UnlockForUpdate()
|
||||
@@ -1233,11 +1202,10 @@ func (nbs *NomsBlockStore) updateManifest(ctx context.Context, current, last has
|
||||
if cnt > 0 {
|
||||
ts, err := nbs.tables.append(ctx, nbs.mt, checker, nbs.hasCache, nbs.stats)
|
||||
if err != nil {
|
||||
if errors.Is(err, ErrDanglingRef) {
|
||||
nbs.mt = nil
|
||||
}
|
||||
nbs.handlePossibleDanglingRefError(err)
|
||||
return err
|
||||
}
|
||||
nbs.addPendingRefsToHasCache()
|
||||
nbs.tables, nbs.mt = ts, nil
|
||||
}
|
||||
}
|
||||
@@ -1250,6 +1218,12 @@ func (nbs *NomsBlockStore) updateManifest(ctx context.Context, current, last has
|
||||
return errOptimisticLockFailedTables
|
||||
}
|
||||
|
||||
// check for dangling reference to the new root
|
||||
if err = nbs.errorIfDangling(current, checker); err != nil {
|
||||
nbs.handlePossibleDanglingRefError(err)
|
||||
return err
|
||||
}
|
||||
|
||||
specs, err := nbs.tables.toSpecs()
|
||||
if err != nil {
|
||||
return err
|
||||
|
||||
@@ -302,10 +302,6 @@ func (ts tableSet) append(ctx context.Context, mt *memTable, checker refCheck, h
|
||||
return tableSet{}, fmt.Errorf("%w: found dangling references to %s", ErrDanglingRef, absent.String())
|
||||
}
|
||||
|
||||
for _, e := range mt.pendingRefs {
|
||||
hasCache.Add(*e.a, struct{}{})
|
||||
}
|
||||
|
||||
cs, err := ts.p.Persist(ctx, mt, ts, stats)
|
||||
if err != nil {
|
||||
return tableSet{}, err
|
||||
|
||||
Reference in New Issue
Block a user