mirror of
https://github.com/dolthub/dolt.git
synced 2026-04-22 11:29:06 -05:00
Merge pull request #9940 from dolthub/aaron/visit-gc-roots-map-editors
go: sqle: Add support for visting GC roots in current table editors.
This commit is contained in:
@@ -847,6 +847,10 @@ func (d *DoltSession) VisitGCRoots(ctx context.Context, dbName string, keep func
|
||||
panic("gc safepoint establishment found inconsistent state; process could not guarantee it would be able to keep a chunk if we continue")
|
||||
}
|
||||
}
|
||||
err = head.writeSession.VisitGCRoots(ctx, keep)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -15,11 +15,14 @@
|
||||
package dsess
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/dolthub/go-mysql-server/sql"
|
||||
|
||||
"github.com/dolthub/dolt/go/libraries/doltcore/doltdb"
|
||||
"github.com/dolthub/dolt/go/libraries/doltcore/schema"
|
||||
"github.com/dolthub/dolt/go/libraries/doltcore/table/editor"
|
||||
"github.com/dolthub/dolt/go/store/hash"
|
||||
"github.com/dolthub/dolt/go/store/val"
|
||||
)
|
||||
|
||||
@@ -35,6 +38,10 @@ type WriteSession interface {
|
||||
// SetWorkingSet modifies the state of the WriteSession. The WorkingSetRef of |ws| must match the existing Ref.
|
||||
SetWorkingSet(ctx *sql.Context, ws *doltdb.WorkingSet) error
|
||||
|
||||
// VisitGCRoots is used to ensure that a write session's GC roots are retained
|
||||
// in the case that a GC needs to proceed before the write session is flushed.
|
||||
VisitGCRoots(ctx context.Context, roots func(hash.Hash) bool) error
|
||||
|
||||
// GetOptions returns the editor.Options for this session.
|
||||
GetOptions() editor.Options
|
||||
|
||||
|
||||
@@ -16,6 +16,7 @@ package writer
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"sync"
|
||||
|
||||
@@ -29,6 +30,7 @@ import (
|
||||
"github.com/dolthub/dolt/go/libraries/doltcore/sqle/index"
|
||||
"github.com/dolthub/dolt/go/libraries/doltcore/sqle/sqlutil"
|
||||
"github.com/dolthub/dolt/go/libraries/doltcore/table/editor"
|
||||
"github.com/dolthub/dolt/go/store/hash"
|
||||
"github.com/dolthub/dolt/go/store/types"
|
||||
)
|
||||
|
||||
@@ -249,6 +251,10 @@ func (s *nomsWriteSession) getTableEditor(ctx context.Context, tableName string,
|
||||
return localTableEditor, nil
|
||||
}
|
||||
|
||||
func (s *nomsWriteSession) VisitGCRoots(ctx context.Context, roots func(hash.Hash) bool) error {
|
||||
return errors.New("unsupported session-aware GC use on __LD_1__ database.")
|
||||
}
|
||||
|
||||
// setRoot is the inner implementation for SetWorkingRoot that does not acquire any locks
|
||||
func (s *nomsWriteSession) setWorkingSet(ctx context.Context, ws *doltdb.WorkingSet) error {
|
||||
if ws == nil {
|
||||
|
||||
@@ -25,6 +25,7 @@ import (
|
||||
"github.com/dolthub/dolt/go/libraries/doltcore/doltdb"
|
||||
"github.com/dolthub/dolt/go/libraries/doltcore/doltdb/durable"
|
||||
"github.com/dolthub/dolt/go/libraries/doltcore/sqle/dsess"
|
||||
"github.com/dolthub/dolt/go/store/hash"
|
||||
"github.com/dolthub/dolt/go/store/prolly"
|
||||
"github.com/dolthub/dolt/go/store/prolly/tree"
|
||||
"github.com/dolthub/dolt/go/store/val"
|
||||
@@ -85,6 +86,7 @@ type indexWriter interface {
|
||||
Discard(ctx context.Context) error
|
||||
HasEdits(ctx context.Context) bool
|
||||
IterRange(ctx context.Context, rng prolly.Range) (prolly.MapIter, error)
|
||||
VisitGCRoots(ctx context.Context, roots func(hash.Hash) bool) error
|
||||
}
|
||||
|
||||
type primaryIndexErrBuilder interface {
|
||||
@@ -226,6 +228,10 @@ func (m prollyIndexWriter) Commit(ctx context.Context) error {
|
||||
return m.mut.Checkpoint(ctx)
|
||||
}
|
||||
|
||||
func (m prollyIndexWriter) VisitGCRoots(ctx context.Context, roots func(hash.Hash) bool) error {
|
||||
return m.mut.VisitGCRoots(ctx, roots)
|
||||
}
|
||||
|
||||
func (m prollyIndexWriter) Discard(ctx context.Context) error {
|
||||
m.mut.Revert(ctx)
|
||||
return nil
|
||||
@@ -332,6 +338,10 @@ func (m prollySecondaryIndexWriter) keyFromRow(ctx context.Context, sqlRow sql.R
|
||||
return m.keyBld.Build(sharePool)
|
||||
}
|
||||
|
||||
func (m prollySecondaryIndexWriter) VisitGCRoots(ctx context.Context, roots func(hash.Hash) bool) error {
|
||||
return m.mut.VisitGCRoots(ctx, roots)
|
||||
}
|
||||
|
||||
func (m prollySecondaryIndexWriter) Insert(ctx context.Context, sqlRow sql.Row) error {
|
||||
k, err := m.keyFromRow(ctx, sqlRow)
|
||||
if err != nil {
|
||||
|
||||
@@ -20,6 +20,7 @@ import (
|
||||
|
||||
"github.com/dolthub/go-mysql-server/sql"
|
||||
|
||||
"github.com/dolthub/dolt/go/store/hash"
|
||||
"github.com/dolthub/dolt/go/store/prolly"
|
||||
"github.com/dolthub/dolt/go/store/prolly/tree"
|
||||
"github.com/dolthub/dolt/go/store/val"
|
||||
@@ -50,6 +51,10 @@ func (k prollyKeylessWriter) ValidateKeyViolations(ctx context.Context, sqlRow s
|
||||
return nil
|
||||
}
|
||||
|
||||
func (k prollyKeylessWriter) VisitGCRoots(ctx context.Context, roots func(hash.Hash) bool) error {
|
||||
return k.mut.VisitGCRoots(ctx, roots)
|
||||
}
|
||||
|
||||
func (k prollyKeylessWriter) Insert(ctx context.Context, sqlRow sql.Row) error {
|
||||
hashId, value, err := k.tuplesFromRow(ctx, sqlRow)
|
||||
if err != nil {
|
||||
@@ -208,6 +213,10 @@ func (writer prollyKeylessSecondaryWriter) Map(ctx context.Context) (prolly.MapI
|
||||
return writer.mut.Map(ctx)
|
||||
}
|
||||
|
||||
func (writer prollyKeylessSecondaryWriter) VisitGCRoots(ctx context.Context, roots func(hash.Hash) bool) error {
|
||||
return writer.mut.VisitGCRoots(ctx, roots)
|
||||
}
|
||||
|
||||
// ValidateKeyViolations implements the interface indexWriter.
|
||||
func (writer prollyKeylessSecondaryWriter) ValidateKeyViolations(ctx context.Context, sqlRow sql.Row) error {
|
||||
return nil
|
||||
|
||||
@@ -25,6 +25,7 @@ import (
|
||||
"github.com/dolthub/dolt/go/libraries/doltcore/sqle/dsess"
|
||||
"github.com/dolthub/dolt/go/libraries/doltcore/sqle/globalstate"
|
||||
"github.com/dolthub/dolt/go/libraries/doltcore/sqle/index"
|
||||
"github.com/dolthub/dolt/go/store/hash"
|
||||
"github.com/dolthub/dolt/go/store/pool"
|
||||
"github.com/dolthub/dolt/go/store/val"
|
||||
)
|
||||
@@ -179,6 +180,20 @@ func (w *prollyTableWriter) Delete(ctx *sql.Context, sqlRow sql.Row) (err error)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (w *prollyTableWriter) VisitGCRoots(ctx context.Context, roots func(hash.Hash) bool) error {
|
||||
err := w.primary.VisitGCRoots(ctx, roots)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for _, writer := range w.secondary {
|
||||
err = writer.VisitGCRoots(ctx, roots)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Update implements TableWriter.
|
||||
func (w *prollyTableWriter) Update(ctx *sql.Context, oldRow sql.Row, newRow sql.Row) (err error) {
|
||||
for _, wr := range w.secondary {
|
||||
|
||||
@@ -15,6 +15,7 @@
|
||||
package writer
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
|
||||
"github.com/dolthub/go-mysql-server/sql"
|
||||
@@ -25,6 +26,7 @@ import (
|
||||
"github.com/dolthub/dolt/go/libraries/doltcore/sqle/dsess"
|
||||
"github.com/dolthub/dolt/go/libraries/doltcore/sqle/globalstate"
|
||||
"github.com/dolthub/dolt/go/libraries/doltcore/table/editor"
|
||||
"github.com/dolthub/dolt/go/store/hash"
|
||||
)
|
||||
|
||||
// prollyWriteSession handles all edit operations on a table that may also update other tables.
|
||||
@@ -43,6 +45,18 @@ func (s *prollyWriteSession) GetWorkingSet() *doltdb.WorkingSet {
|
||||
return s.workingSet
|
||||
}
|
||||
|
||||
func (s *prollyWriteSession) VisitGCRoots(ctx context.Context, roots func(hash.Hash) bool) error {
|
||||
s.mut.Lock()
|
||||
defer s.mut.Unlock()
|
||||
for _, writer := range s.tables {
|
||||
err := writer.VisitGCRoots(ctx, roots)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetTableWriter implemented WriteSession.
|
||||
func (s *prollyWriteSession) GetTableWriter(ctx *sql.Context, tableName doltdb.TableName, db string, setter dsess.SessionRootSetter, targetStaging bool) (dsess.TableWriter, error) {
|
||||
s.mut.Lock()
|
||||
|
||||
@@ -55,4 +55,5 @@ type MutableMapInterface interface {
|
||||
HasEdits() bool
|
||||
IterRange(ctx context.Context, rng Range) (MapIter, error)
|
||||
MapInterface(ctx context.Context) (MapInterface, error)
|
||||
VisitGCRoots(ctx context.Context, roots func(hash.Hash) bool) error
|
||||
}
|
||||
|
||||
@@ -145,7 +145,7 @@ func testRevertAfterFlush(t *testing.T, scale int) {
|
||||
// flush post-checkpoint edits halfway through
|
||||
// this creates a stashed tree in |mut|
|
||||
if i == len(post)/2 {
|
||||
err = mut.flushPending(ctx)
|
||||
err = mut.flushPending(ctx, false)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -20,6 +20,7 @@ import (
|
||||
"strconv"
|
||||
"strings"
|
||||
|
||||
"github.com/dolthub/dolt/go/store/hash"
|
||||
"github.com/dolthub/dolt/go/store/prolly/message"
|
||||
"github.com/dolthub/dolt/go/store/prolly/tree"
|
||||
"github.com/dolthub/dolt/go/store/val"
|
||||
@@ -63,6 +64,22 @@ func (mut *GenericMutableMap[M, T]) Map(ctx context.Context) (M, error) {
|
||||
return mut.flusher.Map(ctx, mut)
|
||||
}
|
||||
|
||||
func (mut *GenericMutableMap[M, T]) VisitGCRoots(ctx context.Context, roots func(hash.Hash) bool) error {
|
||||
// flushPending in order to ensure that our tuples.Static root
|
||||
// is a chunk which includes all of the reachable chunks for
|
||||
// our edits, including chunked internal values like text,
|
||||
// json and geometry.
|
||||
err := mut.flushPending(ctx, true)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
roots(mut.tuples.Static.GetRoot().HashOf())
|
||||
if mut.stash != nil {
|
||||
roots(mut.stash.Static.GetRoot().HashOf())
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// newMutableMap returns a new MutableMap.
|
||||
func newMutableMap(m Map) *MutableMap {
|
||||
return &MutableMap{
|
||||
@@ -108,7 +125,7 @@ func (mut *GenericMutableMap[M, T]) Put(ctx context.Context, key, value val.Tupl
|
||||
return err
|
||||
}
|
||||
if mut.tuples.Edits.Count() > mut.maxPending {
|
||||
return mut.flushPending(ctx)
|
||||
return mut.flushPending(ctx, false)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@@ -158,14 +175,43 @@ func (mut *GenericMutableMap[M, T]) Revert(ctx context.Context) {
|
||||
mut.tuples.Edits.Revert(ctx)
|
||||
}
|
||||
|
||||
func (mut *GenericMutableMap[M, T]) flushPending(ctx context.Context) error {
|
||||
func (mut *GenericMutableMap[M, T]) flushPending(ctx context.Context, deep bool) error {
|
||||
stash := mut.stash
|
||||
// if our in-memory edit set contains a checkpoint, we
|
||||
// must stash a copy of |mut.tuples| we can revert to.
|
||||
if mut.tuples.Edits.HasCheckpoint() {
|
||||
cp := mut.tuples.Copy()
|
||||
cp.Edits.Revert(ctx)
|
||||
stash = &cp
|
||||
if deep {
|
||||
// deep is used by online GC. In order to
|
||||
// ensure that all internal chunk pointers in
|
||||
// all reachable cached edits get visited by
|
||||
// GC, we flush the checkpointed edits to
|
||||
// storage as well.
|
||||
//
|
||||
// TODO: Because of the way apply mutations is
|
||||
// currently coupled with GenericMutableMap,
|
||||
// we construct this temporary generic mutable
|
||||
// map to make the call we want to. This could
|
||||
// be better if we could flush the map with a
|
||||
// more straightforward call which passed
|
||||
// along the dependencies.
|
||||
tmpGMM := &GenericMutableMap[M, T]{
|
||||
keyDesc: mut.keyDesc,
|
||||
valDesc: mut.valDesc,
|
||||
flusher: mut.flusher,
|
||||
tuples: cp,
|
||||
stash: nil,
|
||||
maxPending: mut.maxPending,
|
||||
}
|
||||
err := tmpGMM.flushPending(ctx, false)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
stash = &tmpGMM.tuples
|
||||
} else {
|
||||
stash = &cp
|
||||
}
|
||||
}
|
||||
serializer := mut.flusher.GetDefaultSerializer(ctx, mut)
|
||||
sm, err := mut.flusher.ApplyMutationsWithSerializer(ctx, serializer, mut)
|
||||
|
||||
Reference in New Issue
Block a user