Merge pull request #9135 from dolthub/aaron/async-conjoin

go/store/nbs: conjoiner.go,store.go: Make conjoin asynchronous.
This commit is contained in:
Aaron Son
2025-04-23 10:22:09 -07:00
committed by GitHub
5 changed files with 252 additions and 143 deletions
+4 -2
View File
@@ -62,8 +62,8 @@ func (bs *InMemoryBlobstore) Path() string {
// Get retrieves an io.reader for the portion of a blob specified by br along with
// its version
func (bs *InMemoryBlobstore) Get(ctx context.Context, key string, br BlobRange) (io.ReadCloser, string, error) {
bs.mutex.Lock()
defer bs.mutex.Unlock()
bs.mutex.RLock()
defer bs.mutex.RUnlock()
if val, ok := bs.blobs[key]; ok {
if ver, ok := bs.versions[key]; ok && ver != "" {
@@ -114,6 +114,8 @@ func (bs *InMemoryBlobstore) CheckAndPut(ctx context.Context, expectedVersion, k
// Exists reports whether a blob with the given key is present in the store.
// For InMemoryBlobstore instances error should never be returned (though other
// implementations of this interface can)
func (bs *InMemoryBlobstore) Exists(ctx context.Context, key string) (bool, error) {
	bs.mutex.RLock()
	defer bs.mutex.RUnlock()
	_, found := bs.blobs[key]
	return found, nil
}
+149 -101
View File
@@ -28,8 +28,6 @@ import (
"time"
"golang.org/x/sync/errgroup"
"github.com/dolthub/dolt/go/store/hash"
)
type conjoinStrategy interface {
@@ -52,6 +50,7 @@ func (c inlineConjoiner) conjoinRequired(ts tableSet) bool {
// chooseConjoinees implements conjoinStrategy. Current approach is to choose the smallest N tables which,
// when removed and replaced with the conjoinment, will leave the conjoinment as the smallest table.
// We also keep taking table files until we get below maxTables.
func (c inlineConjoiner) chooseConjoinees(upstream []tableSpec) (conjoinees, keepers []tableSpec, err error) {
sorted := make([]tableSpec, len(upstream))
copy(sorted, upstream)
@@ -65,7 +64,9 @@ func (c inlineConjoiner) chooseConjoinees(upstream []tableSpec) (conjoinees, kee
for i < len(sorted) {
next := sorted[i].chunkCount
if sum <= next {
break
if c.maxTables == 0 || len(sorted)-i < c.maxTables {
break
}
}
sum += next
i++
@@ -86,6 +87,142 @@ func (c noopConjoiner) chooseConjoinees(sources []tableSpec) (conjoinees, keeper
return
}
// A conjoinOperation is a multi-step process that a NomsBlockStore runs to
// conjoin the table files in the store.
//
// Conjoining the table files in a store involves copying all the data
// from |n| files into a single file, and replacing the entries for
// those table files in the manifest with the single, conjoin table
// file. Conjoining is a periodic maintenance operation which is
// automatically done against NomsBlockStores.
//
// Conjoining a lot of chunks across a number of table files can take
// a long time. On every manifest update, including every Commit,
// NomsBlockStore checks if the store needs conjoining. If it does, it
// starts an asynchronous process which will create the new table
// file from the table files which have been chosen to be conjoined.
// This process will run in the background until the table file is
// created and in the right place. Then the conjoin finalization will
// take place. When finalizing a conjoin, the manifest contents of the
// store are updated. The conjoin only succeeds if all the table files
// which were conjoined are still in the manifest when we go to update
// it. Otherwise the conjoined table file is deleted and the store can
// try to create a new conjoined file if it is still necessary.
//
// A conjoinOperation is created when a conjoinStrategy |conjoinRequired| returns true.
// A conjoinOperation tracks the state of a single in-flight conjoin: the
// table files chosen to be merged, the resulting merged table file, and any
// cleanup work to run once the result has landed in the manifest.
type conjoinOperation struct {
	// The table specs chosen by |prepareConjoin|; these are the files
	// that |conjoin| will merge into a single table file.
	conjoinees []tableSpec
	// The tableSpec for the conjoined file, populated by |conjoin|.
	conjoined tableSpec
	// Anything to run as cleanup after we complete successfully.
	// This comes directly from persister.ConjoinAll, but needs to
	// be run after the manifest update lands successfully.
	cleanup cleanupFunc
}
// prepareConjoin computes what this operation will conjoin. It must be
// called synchronously, with the NomsBlockStore mutex held.
func (op *conjoinOperation) prepareConjoin(ctx context.Context, strat conjoinStrategy, upstream manifestContents) error {
	// Appendix table files are never conjoined, so drop them from the
	// candidate set before choosing conjoinees.
	if upstream.NumAppendixSpecs() != 0 {
		upstream, _ = upstream.removeAppendixSpecs()
	}
	conjoinees, _, err := strat.chooseConjoinees(upstream.specs)
	if err != nil {
		return err
	}
	op.conjoinees = conjoinees
	return nil
}
// conjoin actually runs persister.ConjoinAll over the conjoinees chosen by
// |prepareConjoin|. NomsBlockStore runs this step asynchronously.
func (op *conjoinOperation) conjoin(ctx context.Context, persister tablePersister, stats *Stats) error {
	conjoined, cleanup, err := conjoinTables(ctx, op.conjoinees, persister, stats)
	if err != nil {
		return err
	}
	op.conjoined = conjoined
	op.cleanup = cleanup
	return nil
}
// Land the result of the conjoin in the manifest as an update which removes
// the conjoinees and adds the conjoined file. Only applies the update if all
// conjoinees are still present in the manifest; otherwise the conjoin is
// abandoned (the conjoined file is left for GC to collect).
//
// Whether the conjoined file lands or not, this returns a nil error
// if it runs to completion successfully and it returns a cleanupFunc
// which should be run.
func (op *conjoinOperation) updateManifest(ctx context.Context, upstream manifestContents, mm manifestUpdater, stats *Stats) (manifestContents, cleanupFunc, error) {
	conjoineeSet := toSpecSet(op.conjoinees)
	// Optimistic-concurrency loop: build the new spec list against the
	// current |upstream|, attempt the manifest CAS, and retry against the
	// updated contents on lock failure.
	for {
		upstreamSet := toSpecSet(upstream.specs)
		canApply := true
		alreadyApplied := false
		// We can only apply the conjoin if every conjoinee is still
		// present upstream.
		for h := range conjoineeSet {
			if _, ok := upstreamSet[h]; !ok {
				canApply = false
				break
			}
		}
		if canApply {
			// New specs: upstream minus the conjoinees, plus the one conjoined file.
			newSpecs := make([]tableSpec, len(upstream.specs)-len(conjoineeSet)+1)
			ins := 0
			for i, s := range upstream.specs {
				if _, ok := conjoineeSet[s.name]; !ok {
					newSpecs[ins] = s
					ins += 1
				}
				// Insert the conjoined spec immediately after the appendix
				// entries, which come first in |upstream.specs|.
				// NOTE(review): if len(upstream.appendix) >= len(upstream.specs)
				// this insertion never fires — presumably impossible since the
				// conjoinees are non-appendix specs present upstream; confirm.
				if i == len(upstream.appendix) {
					newSpecs[ins] = op.conjoined
					ins += 1
				}
			}
			newContents := manifestContents{
				nbfVers:  upstream.nbfVers,
				root:     upstream.root,
				lock:     generateLockHash(upstream.root, newSpecs, upstream.appendix, nil),
				gcGen:    upstream.gcGen,
				specs:    newSpecs,
				appendix: upstream.appendix,
			}
			updated, err := mm.Update(ctx, upstream.lock, newContents, stats, nil)
			if err != nil {
				return manifestContents{}, func() {}, err
			}
			// Lock matched: our contents landed; hand back the cleanup work.
			if newContents.lock == updated.lock {
				return updated, op.cleanup, nil
			}
			// Go back around the loop, trying to apply against the new upstream.
			upstream = updated
		} else {
			if _, ok := upstreamSet[op.conjoined.name]; ok {
				alreadyApplied = true
			}
			// NOTE(review): both branches below return identical values, so
			// |alreadyApplied| has no behavioral effect; the split exists only
			// to document the two distinct situations.
			if !alreadyApplied {
				// In theory we could delete the conjoined
				// table file here, since its conjoinees are
				// no longer in the manifest and it itself is
				// not in the manifest either.
				//
				// tablePersister does not expose a
				// functionality to prune it, and it will get
				// picked up by GC anyway, so we do not do
				// that here.
				return upstream, func() {}, nil
			} else {
				return upstream, func() {}, nil
			}
		}
	}
}
// conjoin attempts to use |p| to conjoin some number of tables referenced
// by |upstream|, allowing it to update |mm| with a new, smaller, set of tables
// that references precisely the same set of chunks. Conjoin() may not
@@ -94,105 +231,16 @@ func (c noopConjoiner) chooseConjoinees(sources []tableSpec) (conjoinees, keeper
// handle this, likely by rebasing against upstream and re-evaluating the
// situation.
func conjoin(ctx context.Context, s conjoinStrategy, upstream manifestContents, mm manifestUpdater, p tablePersister, stats *Stats) (manifestContents, cleanupFunc, error) {
var conjoined tableSpec
var conjoinees, keepers, appendixSpecs []tableSpec
var cleanup cleanupFunc
for {
if conjoinees == nil {
// Appendix table files should never be conjoined
// so we remove them before conjoining and add them
// back after
if upstream.NumAppendixSpecs() != 0 {
upstream, appendixSpecs = upstream.removeAppendixSpecs()
}
var err error
conjoinees, keepers, err = s.chooseConjoinees(upstream.specs)
if err != nil {
return manifestContents{}, nil, err
}
conjoined, cleanup, err = conjoinTables(ctx, conjoinees, p, stats)
if err != nil {
return manifestContents{}, nil, err
}
}
specs := append(make([]tableSpec, 0, len(keepers)+1), conjoined)
if len(appendixSpecs) > 0 {
specs = append(make([]tableSpec, 0, len(specs)+len(appendixSpecs)), appendixSpecs...)
specs = append(specs, conjoined)
}
specs = append(specs, keepers...)
newContents := manifestContents{
nbfVers: upstream.nbfVers,
root: upstream.root,
lock: generateLockHash(upstream.root, specs, appendixSpecs, nil),
gcGen: upstream.gcGen,
specs: specs,
appendix: appendixSpecs,
}
var err error
upstream, err = mm.Update(ctx, upstream.lock, newContents, stats, nil)
if err != nil {
return manifestContents{}, nil, err
}
if newContents.lock == upstream.lock {
return upstream, cleanup, nil
}
// Optimistic lock failure. Someone else moved to the root, the
// set of tables, or both out from under us. If we can re-use
// the conjoin we already performed, we want to try again.
// Currently, we will only do so if ALL conjoinees are still
// present upstream. If we can't re-use...then someone else
// almost certainly landed a conjoin upstream. In this case,
// bail and let clients ask again if they think they still
// can't proceed.
// If the appendix has changed we simply bail
// and let the client retry
if len(appendixSpecs) > 0 {
if len(upstream.appendix) != len(appendixSpecs) {
return upstream, func() {}, nil
}
for i := range upstream.appendix {
if upstream.appendix[i].name != appendixSpecs[i].name {
return upstream, func() {}, nil
}
}
// No appendix change occurred, so we remove the appendix
// on the "latest" upstream which will be added back
// before the conjoin completes
upstream, appendixSpecs = upstream.removeAppendixSpecs()
}
conjoineeSet := map[hash.Hash]struct{}{}
upstreamNames := map[hash.Hash]struct{}{}
for _, spec := range upstream.specs {
upstreamNames[spec.name] = struct{}{}
}
for _, c := range conjoinees {
if _, present := upstreamNames[c.name]; !present {
return upstream, func() {}, nil // Bail!
}
conjoineeSet[c.name] = struct{}{}
}
// Filter conjoinees out of upstream.specs to generate new set of keepers
keepers = make([]tableSpec, 0, len(upstream.specs)-len(conjoinees))
for _, spec := range upstream.specs {
if _, present := conjoineeSet[spec.name]; !present {
keepers = append(keepers, spec)
}
}
var op conjoinOperation
err := op.prepareConjoin(ctx, s, upstream)
if err != nil {
return manifestContents{}, nil, err
}
err = op.conjoin(ctx, p, stats)
if err != nil {
return manifestContents{}, nil, err
}
return op.updateManifest(ctx, upstream, mm, stats)
}
func conjoinTables(ctx context.Context, conjoinees []tableSpec, p tablePersister, stats *Stats) (conjoined tableSpec, cleanup cleanupFunc, err error) {
+10
View File
@@ -147,6 +147,16 @@ func TestStats(t *testing.T) {
_, err = store.Commit(context.Background(), h, h)
require.NoError(t, err)
waitForConjoin(store)
assert.Equal(uint64(1), stats(store).ConjoinLatency.Samples())
// TODO: Once random conjoin hack is out, test other conjoin stats
}
// waitForConjoin blocks until any in-flight conjoin operation on |nbs| has
// completed. Used by tests to make the asynchronous conjoin deterministic.
func waitForConjoin(nbs *NomsBlockStore) {
	nbs.mu.Lock()
	for nbs.conjoinOp != nil {
		nbs.conjoinOpCond.Wait()
	}
	nbs.mu.Unlock()
}
+86 -40
View File
@@ -109,6 +109,9 @@ type NomsBlockStore struct {
tables tableSet
upstream manifestContents
conjoinOp *conjoinOperation
conjoinOpCond *sync.Cond
// Guarded by |mu|. Notified on gcInProgress and gcOutstandingReads changes.
// Used to implement |waitForGC|.
gcCond *sync.Cond
@@ -269,34 +272,76 @@ func (nbs *NomsBlockStore) handleUnlockedRead(ctx context.Context, gcb gcBehavio
}
}
func (nbs *NomsBlockStore) conjoinIfRequired(ctx context.Context) (bool, error) {
func (nbs *NomsBlockStore) startConjoinIfRequired(ctx context.Context) error {
if nbs.conjoinOp != nil {
return nil
}
if nbs.conjoiner.conjoinRequired(nbs.tables) {
nbs.logger.WithField("upstream_len", len(nbs.tables.upstream)).Info("beginning conjoin of database")
newUpstream, cleanup, err := conjoin(ctx, nbs.conjoiner, nbs.upstream, nbs.manifestMgr, nbs.persister, nbs.stats)
var op = &conjoinOperation{}
err := op.prepareConjoin(ctx, nbs.conjoiner, nbs.upstream)
if err != nil {
nbs.logger.WithError(err).Info("conjoin of database failed")
return false, err
return err
}
newTables, err := nbs.tables.rebase(ctx, newUpstream.specs, nil, nbs.stats)
if err != nil {
nbs.logger.WithError(err).Info("during conjoin, updating database with new table files failed")
return false, err
}
nbs.upstream = newUpstream
oldTables := nbs.tables
nbs.tables = newTables
nbs.logger.WithField("new_upstream_len", len(nbs.tables.upstream)).Info("conjoin completed successfully")
err = oldTables.close()
if err != nil {
return true, err
}
cleanup()
return true, nil
} else {
return false, nil
nbs.conjoinOp = op
go func(ctx context.Context) {
// We use context.Background(), since this context will outlive the caller
// and it does not access NomsBlockStore storage directly, instead operating
// only on tablePersister and manifestUpdater.
err := op.conjoin(ctx, nbs.persister, nbs.stats)
nbs.finalizeConjoin(ctx, err)
}(context.Background())
}
return nil
}
// finalizeConjoin is called in an asynchronous context from the goroutine
// that |startConjoinIfRequired| kicks off.
//
// Responsible for calling conjoinOp.updateManifest under lock and dealing
// with its results. |err| is the outcome of the background conjoin step; if
// it is non-nil the operation is abandoned without touching the manifest.
func (nbs *NomsBlockStore) finalizeConjoin(ctx context.Context, err error) {
	nbs.mu.Lock()
	defer nbs.mu.Unlock()
	// Always clear the in-flight operation and wake waiters (waitForConjoin,
	// Close, BeginGC), regardless of outcome.
	defer func() {
		nbs.conjoinOp = nil
		nbs.conjoinOpCond.Broadcast()
	}()
	if err != nil {
		nbs.logger.WithError(err).Warn("conjoin of database failed with error")
		return
	}
	nbs.manifestMgr.LockForUpdate()
	defer func() {
		err := nbs.manifestMgr.UnlockForUpdate()
		if err != nil {
			nbs.logger.WithError(err).Warn("during conjoin, unlocking manifest manager for update failed with error")
		}
	}()
	newUpstream, cleanup, err := nbs.conjoinOp.updateManifest(ctx, nbs.upstream, nbs.manifestMgr, nbs.stats)
	if err != nil {
		nbs.logger.WithError(err).Warn("during conjoin, updating database manifest with new table files failed")
		// BUGFIX: bail out here. On error |newUpstream| is the zero
		// manifestContents; falling through would rebase against empty
		// specs and incorrectly drop every table from the store.
		return
	}
	newTables, err := nbs.tables.rebase(ctx, newUpstream.specs, nil, nbs.stats)
	if err != nil {
		nbs.logger.WithError(err).Warn("during conjoin, updating database with new table files failed")
		return
	}
	nbs.upstream = newUpstream
	oldTables := nbs.tables
	nbs.tables = newTables
	nbs.logger.WithField("new_upstream_len", len(nbs.tables.upstream)).Info("conjoin completed successfully")
	err = oldTables.close()
	if err != nil {
		nbs.logger.WithError(err).Warn("during conjoin, closing old table files failed with error")
		return
	}
	cleanup()
}
func (nbs *NomsBlockStore) UpdateManifest(ctx context.Context, updates map[hash.Hash]uint32) (ManifestInfo, error) {
@@ -321,7 +366,7 @@ func (nbs *NomsBlockStore) updateManifestAddFiles(ctx context.Context, updates m
err = errors.Join(err, nbs.manifestMgr.UnlockForUpdate())
}()
_, err = nbs.conjoinIfRequired(ctx)
err = nbs.startConjoinIfRequired(ctx)
if err != nil {
return manifestContents{}, false, err
}
@@ -655,6 +700,7 @@ func newNomsBlockStore(ctx context.Context, nbfVerStr string, mm manifestManager
logger: logrus.StandardLogger().WithField("pkg", "store.noms"),
}
nbs.gcCond = sync.NewCond(&nbs.mu)
nbs.conjoinOpCond = sync.NewCond(&nbs.mu)
t1 := time.Now()
defer nbs.stats.OpenLatency.SampleTimeSince(t1)
@@ -1376,13 +1422,10 @@ func (nbs *NomsBlockStore) updateManifest(ctx context.Context, current, last has
break
}
didConjoin, err := nbs.conjoinIfRequired(ctx)
err := nbs.startConjoinIfRequired(ctx)
if err != nil {
return err
}
if didConjoin {
return errOptimisticLockFailedTables
}
// check for dangling reference to the new root
if err = nbs.errorIfDangling(current, checker); err != nil {
@@ -1453,17 +1496,16 @@ func (nbs *NomsBlockStore) AccessMode() chunks.ExclusiveAccessMode {
return nbs.persister.AccessMode()
}
func (nbs *NomsBlockStore) Close() (err error) {
if cerr := nbs.persister.Close(); cerr != nil {
err = cerr
func (nbs *NomsBlockStore) Close() error {
nbs.mu.Lock()
defer nbs.mu.Unlock()
for nbs.conjoinOp != nil {
nbs.conjoinOpCond.Wait()
}
if cerr := nbs.tables.close(); cerr != nil {
err = cerr
}
if cerr := nbs.manifestMgr.Close(); cerr != nil {
err = cerr
}
return
err := nbs.persister.Close()
err = errors.Join(err, nbs.tables.close())
err = errors.Join(err, nbs.manifestMgr.Close())
return err
}
func (nbs *NomsBlockStore) Stats() interface{} {
@@ -1849,8 +1891,12 @@ func (nbs *NomsBlockStore) pruneTableFiles(ctx context.Context) (err error) {
}
func (nbs *NomsBlockStore) BeginGC(keeper func(hash.Hash) bool, _ chunks.GCMode) error {
nbs.gcCond.L.Lock()
defer nbs.gcCond.L.Unlock()
nbs.mu.Lock()
defer nbs.mu.Unlock()
// Block until there is no ongoing conjoin...
for nbs.conjoinOp != nil {
nbs.conjoinOpCond.Wait()
}
if nbs.gcInProgress {
return errors.New("gc already in progress")
}
+3
View File
@@ -213,10 +213,13 @@ func TestNBSPruneTableFiles(t *testing.T) {
}, st.refCheck)
require.NoError(t, err)
require.True(t, ok)
ok, err = st.Commit(ctx, st.upstream.root, st.upstream.root)
require.True(t, ok)
require.NoError(t, err)
waitForConjoin(st)
_, sources, _, err := st.Sources(ctx)
require.NoError(t, err)
assert.Greater(t, numTableFiles, len(sources))