correctly unwrap PrivilegedDatabases in validation

This commit is contained in:
Andy Arthur
2022-06-07 10:20:28 -07:00
parent fcf78b5b8e
commit 906c875f47
2 changed files with 68 additions and 48 deletions

View File

@@ -69,6 +69,7 @@ var _ enginetest.VersionedDBHarness = (*DoltHarness)(nil)
var _ enginetest.ForeignKeyHarness = (*DoltHarness)(nil)
var _ enginetest.KeylessTableHarness = (*DoltHarness)(nil)
var _ enginetest.ReadOnlyDatabaseHarness = (*DoltHarness)(nil)
var _ enginetest.ValidatingHarness = (*DoltHarness)(nil)
func newDoltHarness(t *testing.T) *DoltHarness {
dEnv := dtestutils.CreateTestEnv()
@@ -447,10 +448,7 @@ func (d *DoltHarness) SnapshotTable(db sql.VersionedDatabase, name string, asOf
func (d *DoltHarness) ValidateEngine(ctx *sql.Context, e *gms.Engine) (err error) {
for _, db := range e.Analyzer.Catalog.AllDatabases(ctx) {
if _, ok := db.(sqle.Database); !ok {
continue
}
if err = ValidateDatabase(ctx, db.(sqle.Database)); err != nil {
if err = ValidateDatabase(ctx, db); err != nil {
return err
}
}

View File

@@ -18,16 +18,29 @@ import (
"context"
"fmt"
"github.com/dolthub/go-mysql-server/sql"
"github.com/dolthub/go-mysql-server/sql/mysql_db"
"github.com/dolthub/dolt/go/libraries/doltcore/doltdb"
"github.com/dolthub/dolt/go/libraries/doltcore/doltdb/durable"
"github.com/dolthub/dolt/go/libraries/doltcore/schema"
"github.com/dolthub/dolt/go/libraries/doltcore/sqle"
"github.com/dolthub/dolt/go/store/prolly/tree"
"github.com/dolthub/dolt/go/store/types"
"github.com/dolthub/dolt/go/libraries/doltcore/sqle"
)
func ValidateDatabase(ctx context.Context, db sqle.Database) (err error) {
func ValidateDatabase(ctx context.Context, db sql.Database) (err error) {
switch tdb := db.(type) {
case sqle.Database:
return ValidateDoltDatabase(ctx, tdb)
case mysql_db.PrivilegedDatabase:
return ValidateDatabase(ctx, tdb.Unwrap())
default:
return nil
}
}
func ValidateDoltDatabase(ctx context.Context, db sqle.Database) (err error) {
if !types.IsFormat_DOLT_1(db.GetDoltDB().Format()) {
return nil
}
@@ -45,18 +58,64 @@ var validationStages = []validator{
validateChunkReferences,
}
// validateChunkReferences checks for dangling chunks.
func validateChunkReferences(ctx context.Context, db sqle.Database) error {
validateIndex := func(ctx context.Context, idx durable.Index) error {
pm := durable.ProllyMapFromIndex(idx)
return pm.WalkNodes(ctx, func(ctx context.Context, nd tree.Node) error {
if nd.Size() <= 0 {
return fmt.Errorf("encountered nil tree.Node")
}
return nil
})
}
cb := func(n string, t *doltdb.Table, sch schema.Schema) (stop bool, err error) {
if sch == nil {
return true, fmt.Errorf("expected non-nil schema: %v", sch)
}
rows, err := t.GetRowData(ctx)
if err != nil {
return true, err
}
if err = validateIndex(ctx, rows); err != nil {
return true, err
}
indexes, err := t.GetIndexSet(ctx)
if err != nil {
return true, err
}
err = durable.IterAllIndexes(ctx, sch, indexes, func(_ string, idx durable.Index) error {
return validateIndex(ctx, idx)
})
if err != nil {
return true, err
}
return
}
return iterDatabaseTables(ctx, db, cb)
}
// iterDatabaseTables is a utility to factor out common validation access patterns.
func iterDatabaseTables(
ctx context.Context,
db sqle.Database,
cb func(name string, t *doltdb.Table, sch schema.Schema) (bool, error),
) error {
ddb := db.GetDoltDB()
bb, err := ddb.GetBranches(ctx)
branches, err := ddb.GetBranches(ctx)
if err != nil {
return err
}
for i := range bb {
for i := range branches {
var c *doltdb.Commit
var r *doltdb.RootValue
c, err = ddb.ResolveCommitRef(ctx, bb[i])
c, err = ddb.ResolveCommitRef(ctx, branches[i])
if err != nil {
return err
}
@@ -64,46 +123,9 @@ func validateChunkReferences(ctx context.Context, db sqle.Database) error {
if err != nil {
return err
}
err = r.IterTables(ctx, func(_ string, t *doltdb.Table, sch schema.Schema) (stop bool, err error) {
if sch == nil {
return true, fmt.Errorf("expected non-nil schema: %v", sch)
}
rows, err := t.GetRowData(ctx)
if err != nil {
return true, err
}
if err = validateIndexChunkReferences(ctx, rows); err != nil {
return false, err
}
indexes, err := t.GetIndexSet(ctx)
if err != nil {
return true, err
}
err = durable.IterAllIndexes(ctx, sch, indexes, func(_ string, idx durable.Index) error {
return validateIndexChunkReferences(ctx, idx)
})
if err != nil {
return false, err
}
return
})
if err != nil {
if err = r.IterTables(ctx, cb); err != nil {
return err
}
}
return nil
}
func validateIndexChunkReferences(ctx context.Context, idx durable.Index) error {
pm := durable.ProllyMapFromIndex(idx)
return pm.WalkNodes(ctx, func(ctx context.Context, nd tree.Node) error {
if nd.Size() <= 0 {
return fmt.Errorf("encountered nil tree.Node")
}
return nil
})
}