Stats auto refresh (#7424)

* Stats auto refresh prototype

* fix bugs

* test stats update

* add tests, fix bugs

* fix test

* fmt

* delete test

* more tests

* delete table test

* [ga-format-pr] Run go/utils/repofmt/format_repo.sh and go/Godeps/update.sh

* add and delete stats hooks

* fmt

* concurrency improvements

* [ga-format-pr] Run go/utils/repofmt/format_repo.sh and go/Godeps/update.sh

* write bug

* more tests and stats functions

* working dolt_stat funcs

* test fixes

* bump

* fmt

* fix wg panic

* [ga-format-pr] Run go/utils/repofmt/format_repo.sh and go/Godeps/update.sh

* nick comments

---------

Co-authored-by: max-hoffman <max-hoffman@users.noreply.github.com>
This commit is contained in:
Maximilian Hoffman
2024-02-07 18:19:56 -08:00
committed by GitHub
parent ff3a9d543a
commit d019cce6f7
27 changed files with 1741 additions and 188 deletions

View File

@@ -179,14 +179,21 @@ func NewSqlEngine(
"authentication_dolt_jwt": NewAuthenticateDoltJWTPlugin(config.JwksConfig),
})
statsPro := stats.NewProvider()
engine.Analyzer.Catalog.StatsProvider = statsPro
engine.Analyzer.ExecBuilder = rowexec.DefaultBuilder
sessFactory := doltSessionFactory(pro, mrEnv.Config(), bcController, config.Autocommit)
sessFactory := doltSessionFactory(pro, statsPro, mrEnv.Config(), bcController, config.Autocommit)
sqlEngine.provider = pro
sqlEngine.contextFactory = sqlContextFactory()
sqlEngine.dsessFactory = sessFactory
sqlEngine.engine = engine
engine.Analyzer.Catalog.StatsProvider = stats.NewProvider()
engine.Analyzer.Catalog.StatsProvider.(*stats.Provider).Load(sql.NewContext(ctx), dbs)
// configuring stats depends on sessionBuilder
// sessionBuilder needs ref to statsProv
if err = statsPro.Configure(ctx, sqlEngine.NewDefaultContext, bThreads, pro, dbs); err != nil {
return nil, err
}
// Load MySQL Db information
if err = engine.Analyzer.Catalog.MySQLDb.LoadData(sql.NewEmptyContext(), data); err != nil {
@@ -229,8 +236,6 @@ func NewSqlEngine(
}
}
sqlEngine.engine = engine
return sqlEngine, nil
}
@@ -386,9 +391,9 @@ func sqlContextFactory() contextFactory {
}
// doltSessionFactory returns a sessionFactory that creates a new DoltSession
func doltSessionFactory(pro *dsqle.DoltDatabaseProvider, config config.ReadWriteConfig, bc *branch_control.Controller, autocommit bool) sessionFactory {
func doltSessionFactory(pro *dsqle.DoltDatabaseProvider, statsPro sql.StatsProvider, config config.ReadWriteConfig, bc *branch_control.Controller, autocommit bool) sessionFactory {
return func(mysqlSess *sql.BaseSession, provider sql.DatabaseProvider) (*dsess.DoltSession, error) {
doltSession, err := dsess.NewDoltSession(mysqlSess, pro, config, bc)
doltSession, err := dsess.NewDoltSession(mysqlSess, pro, config, bc, statsPro)
if err != nil {
return nil, err
}

View File

@@ -57,7 +57,7 @@ require (
github.com/cespare/xxhash v1.1.0
github.com/creasty/defaults v1.6.0
github.com/dolthub/flatbuffers/v23 v23.3.3-dh.2
github.com/dolthub/go-mysql-server v0.17.1-0.20240207124505-c0f397a6aaca
github.com/dolthub/go-mysql-server v0.17.1-0.20240207160654-5ed05eb1cc4b
github.com/dolthub/swiss v0.1.0
github.com/goccy/go-json v0.10.2
github.com/google/go-github/v57 v57.0.0

View File

@@ -183,8 +183,8 @@ github.com/dolthub/fslock v0.0.3 h1:iLMpUIvJKMKm92+N1fmHVdxJP5NdyDK5bK7z7Ba2s2U=
github.com/dolthub/fslock v0.0.3/go.mod h1:QWql+P17oAAMLnL4HGB5tiovtDuAjdDTPbuqx7bYfa0=
github.com/dolthub/go-icu-regex v0.0.0-20230524105445-af7e7991c97e h1:kPsT4a47cw1+y/N5SSCkma7FhAPw7KeGmD6c9PBZW9Y=
github.com/dolthub/go-icu-regex v0.0.0-20230524105445-af7e7991c97e/go.mod h1:KPUcpx070QOfJK1gNe0zx4pA5sicIK1GMikIGLKC168=
github.com/dolthub/go-mysql-server v0.17.1-0.20240207124505-c0f397a6aaca h1:tI3X4fIUTOT0N8n+GYkPNa384WlJoOBcztK5c5mBzjU=
github.com/dolthub/go-mysql-server v0.17.1-0.20240207124505-c0f397a6aaca/go.mod h1:ANK0a6tyjrZ2cOzDJT3nFsDp80xksI4UfeijFlvnjwE=
github.com/dolthub/go-mysql-server v0.17.1-0.20240207160654-5ed05eb1cc4b h1:FDNaFIT63vRLfn6cTOr0rsgceMlFeuulaUru54HqIIg=
github.com/dolthub/go-mysql-server v0.17.1-0.20240207160654-5ed05eb1cc4b/go.mod h1:ANK0a6tyjrZ2cOzDJT3nFsDp80xksI4UfeijFlvnjwE=
github.com/dolthub/ishell v0.0.0-20221214210346-d7db0b066488 h1:0HHu0GWJH0N6a6keStrHhUAK5/o9LVfkh44pvsV4514=
github.com/dolthub/ishell v0.0.0-20221214210346-d7db0b066488/go.mod h1:ehexgi1mPxRTk0Mok/pADALuHbvATulTh6gzr7NzZto=
github.com/dolthub/jsonpath v0.0.2-0.20240201003050-392940944c15 h1:sfTETOpsrNJPDn2KydiCtDgVu6Xopq8k3JP8PjFT22s=

View File

@@ -1744,7 +1744,17 @@ func (ddb *DoltDB) SetStatisics(ctx context.Context, addr hash.Hash) error {
return err
}
var ErrNoStatistics = errors.New("No statistics found.")
// DropStatisics deletes the singleton stats dataset (ref.NewStatsRef) for
// this database, removing any persisted table statistics.
// Returns any error encountered looking up or deleting the dataset.
func (ddb *DoltDB) DropStatisics(ctx context.Context) error {
	statsDs, err := ddb.db.GetDataset(ctx, ref.NewStatsRef().String())
	// bug fix: the lookup error was previously assigned but never checked
	// before being clobbered by the Delete call below
	if err != nil {
		return err
	}

	_, err = ddb.db.Delete(ctx, statsDs, "")
	if err != nil {
		return err
	}
	return nil
}
var ErrNoStatistics = errors.New("no statistics found")
// GetStatistics returns the value of the singleton ref.StatsRef for this database
func (ddb *DoltDB) GetStatistics(ctx context.Context) (prolly.Map, error) {

View File

@@ -25,9 +25,9 @@ const StatsVersion int64 = 1
const (
StatsQualifierColName = "qualifier"
StatsDbColName = "database"
StatsTableColName = "table"
StatsIndexColName = "index"
StatsDbColName = "database_name"
StatsTableColName = "table_name"
StatsIndexColName = "index_name"
StatsPositionColName = "position"
StatsCommitHashColName = "commit_hash"
StatsRowCountColName = "row_count"

View File

@@ -625,6 +625,10 @@ func (p *DoltDatabaseProvider) ListDroppedDatabases(ctx *sql.Context) ([]string,
return p.droppedDatabaseManager.ListDroppedDatabases(ctx)
}
// DbFactoryUrl returns the db factory URL string configured for this
// provider (used when constructing new database environments).
func (p *DoltDatabaseProvider) DbFactoryUrl() string {
	return p.dbFactoryUrl
}
func (p *DoltDatabaseProvider) UndropDatabase(ctx *sql.Context, name string) (err error) {
p.mu.Lock()
defer p.mu.Unlock()

View File

@@ -47,6 +47,11 @@ var DoltProcedures = []sql.ExternalStoredProcedureDetails{
{Name: "dolt_revert", Schema: int64Schema("status"), Function: doltRevert},
{Name: "dolt_tag", Schema: int64Schema("status"), Function: doltTag},
{Name: "dolt_verify_constraints", Schema: int64Schema("violations"), Function: doltVerifyConstraints},
{Name: "dolt_stats_drop", Schema: doltMergeSchema, Function: statsFunc(statsDrop)},
{Name: "dolt_stats_restart", Schema: doltMergeSchema, Function: statsFunc(statsRestart)},
{Name: "dolt_stats_stop", Schema: doltMergeSchema, Function: statsFunc(statsStop)},
{Name: "dolt_stats_status", Schema: doltMergeSchema, Function: statsFunc(statsStatus)},
}
// stringSchema returns a non-nullable schema with all columns as LONGTEXT.

View File

@@ -0,0 +1,112 @@
// Copyright 2024 Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package dprocedures
import (
"fmt"
"strings"
"github.com/dolthub/go-mysql-server/sql"
"github.com/dolthub/dolt/go/libraries/doltcore/env"
"github.com/dolthub/dolt/go/libraries/doltcore/ref"
"github.com/dolthub/dolt/go/libraries/doltcore/sqle/dsess"
)
// statsFunc adapts a no-argument stats callback into the external stored
// procedure signature, wrapping the callback's single result in a row iterator.
// Positional string arguments to the procedure are ignored.
func statsFunc(fn func(ctx *sql.Context) (interface{}, error)) func(ctx *sql.Context, args ...string) (sql.RowIter, error) {
	return func(ctx *sql.Context, _ ...string) (sql.RowIter, error) {
		result, err := fn(ctx)
		if err != nil {
			return nil, err
		}
		return rowToIter(result), nil
	}
}
// AutoRefreshStatsProvider is a sql.StatsProvider that exposes hooks for
// observing and manipulating background database auto refresh threads.
type AutoRefreshStatsProvider interface {
	sql.StatsProvider
	// CancelRefreshThread stops the background refresh thread for the named database.
	CancelRefreshThread(string)
	// StartRefreshThread launches a background refresh thread for the named database.
	StartRefreshThread(*sql.Context, dsess.DoltDatabaseProvider, string, *env.DoltEnv) error
	// ThreadStatus returns the most recent status string for the named
	// database's refresh thread.
	ThreadStatus(string) string
}
// statsRestart tries to stop and then start a refresh thread
func statsRestart(ctx *sql.Context) (interface{}, error) {
	sess := dsess.DSessFromSess(ctx.Session)
	dbName := strings.ToLower(ctx.GetCurrentDatabase())

	// only providers with auto-refresh hooks can restart a thread
	afp, ok := sess.StatsProvider().(AutoRefreshStatsProvider)
	if !ok {
		return nil, fmt.Errorf("provider does not implement AutoRefreshStatsProvider")
	}

	dbProv := sess.Provider()
	workingFs, err := dbProv.FileSystem().WithWorkingDir(dbName)
	if err != nil {
		return nil, fmt.Errorf("failed to restart stats collection: %w", err)
	}
	dEnv := env.Load(ctx, env.GetCurrentUserHomeDir, workingFs, dbProv.DbFactoryUrl(), "TODO")

	// stop the current thread (if any) before launching a replacement
	afp.CancelRefreshThread(dbName)
	if err = afp.StartRefreshThread(ctx, dbProv, dbName, dEnv); err != nil {
		return nil, fmt.Errorf("failed to restart collection: %w", err)
	}
	return fmt.Sprintf("restarted stats collection: %s", ref.StatsRef{}.String()), nil
}
// statsStatus returns the last update for a stats thread
func statsStatus(ctx *sql.Context) (interface{}, error) {
	sess := dsess.DSessFromSess(ctx.Session)
	dbName := strings.ToLower(ctx.GetCurrentDatabase())

	// status is only tracked by providers with auto-refresh hooks
	afp, ok := sess.StatsProvider().(AutoRefreshStatsProvider)
	if !ok {
		return nil, fmt.Errorf("provider does not implement AutoRefreshStatsProvider")
	}
	return afp.ThreadStatus(dbName), nil
}
// statsStop cancels a refresh thread
func statsStop(ctx *sql.Context) (interface{}, error) {
	sess := dsess.DSessFromSess(ctx.Session)
	dbName := strings.ToLower(ctx.GetCurrentDatabase())

	// cancellation only applies to providers with auto-refresh hooks
	afp, ok := sess.StatsProvider().(AutoRefreshStatsProvider)
	if !ok {
		return nil, fmt.Errorf("provider does not implement AutoRefreshStatsProvider")
	}
	afp.CancelRefreshThread(dbName)
	return fmt.Sprintf("stopped thread: %s", dbName), nil
}
// statsDrop deletes the stats ref
func statsDrop(ctx *sql.Context) (interface{}, error) {
	sess := dsess.DSessFromSess(ctx.Session)
	dbName := strings.ToLower(ctx.GetCurrentDatabase())
	prov := sess.StatsProvider()

	// currently unsafe to drop stats while running refresh
	if afp, ok := prov.(AutoRefreshStatsProvider); ok {
		afp.CancelRefreshThread(dbName)
	}

	if err := prov.DropDbStats(ctx, dbName, true); err != nil {
		return nil, fmt.Errorf("failed to drop stats: %w", err)
	}
	return fmt.Sprintf("deleted stats ref for %s", dbName), nil
}

View File

@@ -251,6 +251,10 @@ type emptyRevisionDatabaseProvider struct {
sql.DatabaseProvider
}
// DbFactoryUrl returns an empty string; the empty provider has no
// configured db factory URL.
func (e emptyRevisionDatabaseProvider) DbFactoryUrl() string {
	return ""
}
func (e emptyRevisionDatabaseProvider) UndropDatabase(ctx *sql.Context, dbName string) error {
return nil
}

View File

@@ -25,6 +25,7 @@ import (
"github.com/dolthub/go-mysql-server/sql"
sqltypes "github.com/dolthub/go-mysql-server/sql/types"
"github.com/shopspring/decimal"
"github.com/dolthub/dolt/go/cmd/dolt/cli"
"github.com/dolthub/dolt/go/libraries/doltcore/branch_control"
@@ -58,6 +59,7 @@ type DoltSession struct {
tempTables map[string][]sql.Table
globalsConf config.ReadWriteConfig
branchController *branch_control.Controller
statsProv sql.StatsProvider
mu *sync.Mutex
fs filesys.Filesys
@@ -94,6 +96,7 @@ func NewDoltSession(
pro DoltDatabaseProvider,
conf config.ReadWriteConfig,
branchController *branch_control.Controller,
statsProvider sql.StatsProvider,
) (*DoltSession, error) {
username := conf.GetStringOrDefault(config.UserNameKey, "")
email := conf.GetStringOrDefault(config.UserEmailKey, "")
@@ -109,6 +112,7 @@ func NewDoltSession(
tempTables: make(map[string][]sql.Table),
globalsConf: globals,
branchController: branchController,
statsProv: statsProvider,
mu: &sync.Mutex{},
fs: pro.FileSystem(),
}
@@ -121,6 +125,11 @@ func (d *DoltSession) Provider() DoltDatabaseProvider {
return d.provider
}
// StatsProvider returns the sql.StatsProvider for this session.
func (d *DoltSession) StatsProvider() sql.StatsProvider {
return d.statsProv
}
// DSessFromSess retrieves a dolt session from a standard sql.Session
func DSessFromSess(sess sql.Session) *DoltSession {
return sess.(*DoltSession)
@@ -1588,6 +1597,9 @@ func setPersistedValue(conf config.WritableConfig, key string, value interface{}
return config.SetFloat(conf, key, float64(v))
case float64:
return config.SetFloat(conf, key, v)
case decimal.Decimal:
f64, _ := v.Float64()
return config.SetFloat(conf, key, f64)
case string:
return config.SetString(conf, key, v)
case bool:

View File

@@ -74,6 +74,7 @@ type DoltDatabaseProvider interface {
sql.MutableDatabaseProvider
// FileSystem returns the filesystem used by this provider, rooted at the data directory for all databases.
FileSystem() filesys.Filesys
DbFactoryUrl() string
// FileSystemForDatabase returns a filesystem, with the working directory set to the root directory
// of the requested database. If the requested database isn't found, a database not found error
// is returned.

View File

@@ -57,6 +57,11 @@ const (
DoltClusterRoleVariable = "dolt_cluster_role"
DoltClusterRoleEpochVariable = "dolt_cluster_role_epoch"
DoltClusterAckWritesTimeoutSecs = "dolt_cluster_ack_writes_timeout_secs"
DoltStatsAutoRefreshEnabled = "dolt_stats_auto_refresh_enabled"
DoltStatsAutoRefreshThreshold = "dolt_stats_auto_refresh_threshold"
DoltStatsAutoRefreshInterval = "dolt_stats_auto_refresh_interval"
DoltStatsMemoryOnly = "dolt_stats_memory_only"
)
const URLTemplateDatabasePlaceholder = "{database}"

View File

@@ -19,6 +19,7 @@ import (
"fmt"
"os"
"runtime"
"sync"
"testing"
"time"
@@ -40,6 +41,7 @@ import (
"github.com/dolthub/dolt/go/libraries/doltcore/schema"
"github.com/dolthub/dolt/go/libraries/doltcore/sqle"
"github.com/dolthub/dolt/go/libraries/doltcore/sqle/dsess"
"github.com/dolthub/dolt/go/libraries/doltcore/sqle/stats"
"github.com/dolthub/dolt/go/libraries/utils/config"
"github.com/dolthub/dolt/go/store/datas"
"github.com/dolthub/dolt/go/store/types"
@@ -2095,6 +2097,18 @@ func TestColumnDiffSystemTablePrepared(t *testing.T) {
}
}
// TestStatsFunctions runs the dolt_stats_* stored procedure scripts against
// a harness with background stats refresh configured.
func TestStatsFunctions(t *testing.T) {
	h := newDoltHarness(t)
	defer h.Close()
	h.Setup(setup.MydbData)
	// enable stats threads so the dolt_stats_* procedures have a target
	h.configureStats = true
	for _, script := range StatProcTests {
		t.Run(script.Name, func(t *testing.T) {
			enginetest.TestScript(t, h, script)
		})
	}
}
func TestDiffTableFunction(t *testing.T) {
harness := newDoltHarness(t)
defer harness.Close()
@@ -3062,6 +3076,112 @@ func TestCreateDatabaseErrorCleansUp(t *testing.T) {
require.True(t, isDir)
}
// TestStatsAutoRefreshConcurrency tests some common concurrent patterns that stats
// refresh is subject to -- namely reading/writing the stats objects in (1) DML statements
// (2) auto refresh threads, and (3) manual ANALYZE statements.
// todo: the dolt_stat functions should be concurrency tested
func TestStatsAutoRefreshConcurrency(t *testing.T) {
	// create engine
	harness := newDoltHarness(t)
	harness.Setup(setup.MydbData)
	engine := mustNewEngine(t, harness)
	defer engine.Close()

	// two tables, each with several secondary indexes, so refresh has
	// multiple histograms to maintain per table
	enginetest.RunQueryWithContext(t, engine, harness, nil, `create table xy (x int primary key, y int, z int, key (z), key (y,z), key (y,z,x))`)
	enginetest.RunQueryWithContext(t, engine, harness, nil, `create table uv (u int primary key, v int, w int, key (w), key (w,u), key (u,w,v))`)

	sqlDb, _ := harness.provider.BaseDatabase(harness.NewContext(), "mydb")

	// Setting an interval of 0 and a threshold of 0 will result
	// in the stats being updated after every operation
	intervalSec := time.Duration(0)
	thresholdf64 := 0.

	bThreads := sql.NewBackgroundThreads()
	statsProv := engine.EngineAnalyzer().Catalog.StatsProvider.(*stats.Provider)

	// it is important to use new sessions for this test, to avoid working root conflicts
	readCtx := enginetest.NewSession(harness)
	writeCtx := enginetest.NewSession(harness)

	// the refresh thread also gets a fresh session per wake-up
	newCtx := func(context.Context) (*sql.Context, error) {
		return enginetest.NewSession(harness), nil
	}
	err := statsProv.InitAutoRefresh(newCtx, sqlDb.Name(), bThreads, intervalSec, thresholdf64)
	require.NoError(t, err)

	// execQ runs |q| on |ctx| and drains the result, failing the test on error;
	// |id| and |tag| identify the operation for debugging
	execQ := func(ctx *sql.Context, q string, id int, tag string) {
		_, iter, err := engine.Query(ctx, q)
		require.NoError(t, err)
		_, err = sql.RowIterToRows(ctx, iter)
		//fmt.Printf("%s %d\n", tag, id)
		require.NoError(t, err)
	}

	iters := 1_000
	{
		// 3 threads to test auto-refresh/DML concurrency safety
		// - auto refresh (read + write)
		// - write (write only)
		// - read (read only)
		wg := sync.WaitGroup{}
		wg.Add(2)

		go func() {
			for i := 0; i < iters; i++ {
				q := "select count(*) from xy a join xy b on a.x = b.x"
				execQ(readCtx, q, i, "read")
				q = "select count(*) from uv a join uv b on a.u = b.u"
				execQ(readCtx, q, i, "read")
			}
			wg.Done()
		}()

		go func() {
			for i := 0; i < iters; i++ {
				q := fmt.Sprintf("insert into xy values (%d,%d,%d)", i, i, i)
				execQ(writeCtx, q, i, "write")
				q = fmt.Sprintf("insert into uv values (%d,%d,%d)", i, i, i)
				execQ(writeCtx, q, i, "write")
			}
			wg.Done()
		}()

		wg.Wait()
	}

	{
		// 3 threads to test auto-refresh/manual ANALYZE concurrency
		// - auto refresh (read + write)
		// - add (read + write)
		// - drop (write only)
		wg := sync.WaitGroup{}
		wg.Add(2)

		analyzeAddCtx := enginetest.NewSession(harness)
		analyzeDropCtx := enginetest.NewSession(harness)

		// hammer the provider with concurrent stat updates
		go func() {
			for i := 0; i < iters; i++ {
				execQ(analyzeAddCtx, "analyze table xy,uv", i, "analyze create")
			}
			wg.Done()
		}()

		go func() {
			for i := 0; i < iters; i++ {
				execQ(analyzeDropCtx, "analyze table xy drop histogram on (y,z)", i, "analyze drop yz")
				execQ(analyzeDropCtx, "analyze table uv drop histogram on (w,u)", i, "analyze drop wu")
			}
			wg.Done()
		}()

		wg.Wait()
	}
}
// runMergeScriptTestsInBothDirections creates a new test run, named |name|, and runs the specified merge |tests|
// in both directions (right to left merge, and left to right merge). If
// |runAsPrepared| is true then the test scripts will be run using the prepared

View File

@@ -43,6 +43,7 @@ import (
type DoltHarness struct {
t *testing.T
provider dsess.DoltDatabaseProvider
statsPro sql.StatsProvider
multiRepoEnv *env.MultiRepoEnv
session *dsess.DoltSession
branchControl *branch_control.Controller
@@ -52,6 +53,7 @@ type DoltHarness struct {
resetData []setup.SetupScript
engine *gms.Engine
skipSetupCommit bool
configureStats bool
useLocalFilesystem bool
setupTestProcedures bool
}
@@ -189,11 +191,14 @@ func (d *DoltHarness) NewEngine(t *testing.T) (enginetest.QueryEngine, error) {
require.True(t, ok)
d.provider = doltProvider
statsPro := stats.NewProvider()
d.statsPro = statsPro
var err error
d.session, err = dsess.NewDoltSession(enginetest.NewBaseSession(), d.provider, d.multiRepoEnv.Config(), d.branchControl)
d.session, err = dsess.NewDoltSession(enginetest.NewBaseSession(), d.provider, d.multiRepoEnv.Config(), d.branchControl, d.statsPro)
require.NoError(t, err)
e, err := enginetest.NewEngine(t, d, d.provider, d.setupData, stats.NewProvider())
e, err := enginetest.NewEngine(t, d, d.provider, d.setupData, d.statsPro)
if err != nil {
return nil, err
}
@@ -214,6 +219,22 @@ func (d *DoltHarness) NewEngine(t *testing.T) (enginetest.QueryEngine, error) {
}
}
if d.configureStats {
bThreads := sql.NewBackgroundThreads()
e = e.WithBackgroundThreads(bThreads)
dSess := dsess.DSessFromSess(ctx.Session)
dbCache := dSess.DatabaseCache(ctx)
dsessDbs := make([]dsess.SqlDatabase, len(dbs))
for i, dbName := range dbs {
dsessDbs[i], _ = dbCache.GetCachedRevisionDb(fmt.Sprintf("%s/main", dbName), dbName)
}
if err = statsPro.Configure(ctx, func(context.Context) (*sql.Context, error) { return d.NewSession(), nil }, bThreads, doltProvider, dsessDbs); err != nil {
return nil, err
}
}
return e, nil
}
@@ -226,7 +247,7 @@ func (d *DoltHarness) NewEngine(t *testing.T) (enginetest.QueryEngine, error) {
// Get a fresh session if we are reusing the engine
if !initializeEngine {
var err error
d.session, err = dsess.NewDoltSession(enginetest.NewBaseSession(), d.provider, d.multiRepoEnv.Config(), d.branchControl)
d.session, err = dsess.NewDoltSession(enginetest.NewBaseSession(), d.provider, d.multiRepoEnv.Config(), d.branchControl, nil)
require.NoError(t, err)
}
@@ -296,7 +317,7 @@ func (d *DoltHarness) newSessionWithClient(client sql.Client) *dsess.DoltSession
localConfig := d.multiRepoEnv.Config()
pro := d.session.Provider()
dSession, err := dsess.NewDoltSession(sql.NewBaseSessionWithClientServer("address", client, 1), pro.(dsess.DoltDatabaseProvider), localConfig, d.branchControl)
dSession, err := dsess.NewDoltSession(sql.NewBaseSessionWithClientServer("address", client, 1), pro.(dsess.DoltDatabaseProvider), localConfig, d.branchControl, d.statsPro)
dSession.SetCurrentDatabase("mydb")
require.NoError(d.t, err)
return dSession
@@ -318,6 +339,7 @@ func (d *DoltHarness) NewDatabases(names ...string) []sql.Database {
d.closeProvider()
d.engine = nil
d.provider = nil
d.statsPro = stats.NewProvider()
d.branchControl = branch_control.CreateDefaultController(context.Background())
@@ -327,7 +349,7 @@ func (d *DoltHarness) NewDatabases(names ...string) []sql.Database {
d.provider = doltProvider
var err error
d.session, err = dsess.NewDoltSession(enginetest.NewBaseSession(), doltProvider, d.multiRepoEnv.Config(), d.branchControl)
d.session, err = dsess.NewDoltSession(enginetest.NewBaseSession(), doltProvider, d.multiRepoEnv.Config(), d.branchControl, d.statsPro)
require.NoError(d.t, err)
// TODO: the engine tests should do this for us
@@ -385,7 +407,7 @@ func (d *DoltHarness) NewReadOnlyEngine(provider sql.DatabaseProvider) (enginete
}
// reset the session as well since we have swapped out the database provider, which invalidates caching assumptions
d.session, err = dsess.NewDoltSession(enginetest.NewBaseSession(), readOnlyProvider, d.multiRepoEnv.Config(), d.branchControl)
d.session, err = dsess.NewDoltSession(enginetest.NewBaseSession(), readOnlyProvider, d.multiRepoEnv.Config(), d.branchControl, d.statsPro)
require.NoError(d.t, err)
return enginetest.NewEngineWithProvider(nil, d, readOnlyProvider), nil

View File

@@ -26,6 +26,7 @@ import (
"github.com/dolthub/go-mysql-server/sql/types"
"github.com/stretchr/testify/require"
"github.com/dolthub/dolt/go/libraries/doltcore/doltdb"
"github.com/dolthub/dolt/go/libraries/doltcore/schema"
"github.com/dolthub/dolt/go/libraries/doltcore/sqle"
"github.com/dolthub/dolt/go/libraries/doltcore/sqle/stats"
@@ -310,7 +311,7 @@ var DoltStatsIOTests = []queries.ScriptTest{
},
Assertions: []queries.ScriptTestAssertion{
{
Query: "select `database`, `table`, `index`, commit_hash, columns, types from dolt_statistics",
Query: "select database_name, table_name, index_name, commit_hash, columns, types from dolt_statistics",
Expected: []sql.Row{
{"mydb", "xy", "primary", "f6la1u3ku5pucfctgrca2afq9vlr4nrs", "x", "bigint"},
{"mydb", "xy", "yz", "9ec31007jaqtahij0tmlmd7j9t9hl1he", "y,z", "int,varchar(500)"},
@@ -348,18 +349,18 @@ var DoltStatsIOTests = []queries.ScriptTest{
},
Assertions: []queries.ScriptTestAssertion{
{
Query: "select `database`, `table`, `index`, commit_hash, columns, types from dolt_statistics where `table` = 'xy'",
Query: "select database_name, table_name, index_name, commit_hash, columns, types from dolt_statistics where table_name = 'xy'",
Expected: []sql.Row{
{"mydb", "xy", "primary", "f6la1u3ku5pucfctgrca2afq9vlr4nrs", "x", "bigint"},
{"mydb", "xy", "yz", "9ec31007jaqtahij0tmlmd7j9t9hl1he", "y,z", "int,varchar(500)"},
},
},
{
Query: fmt.Sprintf("select %s, %s, %s from dolt_statistics where `table` = 'xy'", schema.StatsRowCountColName, schema.StatsDistinctCountColName, schema.StatsNullCountColName),
Query: fmt.Sprintf("select %s, %s, %s from dolt_statistics where table_name = 'xy'", schema.StatsRowCountColName, schema.StatsDistinctCountColName, schema.StatsNullCountColName),
Expected: []sql.Row{{uint64(6), uint64(6), uint64(0)}, {uint64(6), uint64(3), uint64(0)}},
},
{
Query: "select `table`, `index` from dolt_statistics",
Query: "select `table_name`, `index_name` from dolt_statistics",
Expected: []sql.Row{
{"ab", "primary"},
{"ab", "bc"},
@@ -368,18 +369,137 @@ var DoltStatsIOTests = []queries.ScriptTest{
},
},
{
Query: "select `database`, `table`, `index`, commit_hash, columns, types from dolt_statistics where `table` = 'ab'",
Query: "select database_name, table_name, index_name, commit_hash, columns, types from dolt_statistics where table_name = 'ab'",
Expected: []sql.Row{
{"mydb", "ab", "primary", "t6j206v6b9t8vnmhpcc2i57lom8kejk3", "a", "bigint"},
{"mydb", "ab", "bc", "sibnr73868rb5dqa76opfn4pkelhhqna", "b,c", "int,int"},
},
},
{
Query: fmt.Sprintf("select %s, %s, %s from dolt_statistics where `table` = 'ab'", schema.StatsRowCountColName, schema.StatsDistinctCountColName, schema.StatsNullCountColName),
Query: fmt.Sprintf("select %s, %s, %s from dolt_statistics where table_name = 'ab'", schema.StatsRowCountColName, schema.StatsDistinctCountColName, schema.StatsNullCountColName),
Expected: []sql.Row{{uint64(6), uint64(6), uint64(0)}, {uint64(6), uint64(3), uint64(0)}},
},
},
},
{
// only edited chunks are scanned and re-written
Name: "incremental stats updates",
SetUpScript: []string{
"CREATE table xy (x bigint primary key, y int, z varchar(500), key(y,z));",
"insert into xy values (0,0,'a'), (2,0,'a'), (4,1,'a'), (6,2,'a')",
"analyze table xy",
"insert into xy values (1,0,'a'), (3,0,'a'), (5,2,'a'), (7,1,'a')",
"analyze table xy",
},
Assertions: []queries.ScriptTestAssertion{
{
Query: fmt.Sprintf("select %s, %s, %s from dolt_statistics where table_name = 'xy'", schema.StatsRowCountColName, schema.StatsDistinctCountColName, schema.StatsNullCountColName),
Expected: []sql.Row{
{uint64(8), uint64(8), uint64(0)},
{uint64(8), uint64(3), uint64(0)},
},
},
},
},
}
// StatProcTests exercises the dolt_stats_* stored procedures end to end:
// the restart/status/stop lifecycle, interaction with the persisted
// auto-refresh system variables, manual ANALYZE updating stats while the
// thread is stopped, and dolt_stats_drop deleting the stats ref.
// NOTE(review): the expected commit hashes below are tied to the fixture
// data and chunking; they must be regenerated if the setup script changes.
var StatProcTests = []queries.ScriptTest{
	{
		Name: "basic start, status, stop loop",
		SetUpScript: []string{
			"CREATE table xy (x bigint primary key, y int, z varchar(500), key(y,z));",
			"insert into xy values (0,0,'a'), (2,0,'a'), (4,1,'a'), (6,2,'a')",
		},
		Assertions: []queries.ScriptTestAssertion{
			// no stats ref exists before any thread or ANALYZE has run
			{
				Query:          "select count(*) from dolt_statistics",
				ExpectedErrStr: doltdb.ErrNoStatistics.Error(),
			},
			{
				Query:    "call dolt_stats_status()",
				Expected: []sql.Row{{"no active stats thread"}},
			},
			// set refresh interval arbitrarily high to avoid updating when we restart
			{
				Query:    "set @@PERSIST.dolt_stats_auto_refresh_interval = 1000;",
				Expected: []sql.Row{{}},
			},
			{
				Query:    "set @@PERSIST.dolt_stats_auto_refresh_threshold = 0",
				Expected: []sql.Row{{}},
			},
			{
				Query: "call dolt_stats_restart()",
			},
			{
				Query:    "call dolt_stats_status()",
				Expected: []sql.Row{{"restarted thread: mydb"}},
			},
			{
				Query:    "set @@PERSIST.dolt_stats_auto_refresh_interval = 0;",
				Expected: []sql.Row{{}},
			},
			// new restart picks up 0-interval, will start refreshing immediately
			{
				Query: "call dolt_stats_restart()",
			},
			{
				Query: "select sleep(.1)",
			},
			{
				Query:    "call dolt_stats_status()",
				Expected: []sql.Row{{"updated to hash: vogi4fq0fe8n8rqa80pbsujlmmaljsoo"}},
			},
			{
				Query:    "select count(*) from dolt_statistics",
				Expected: []sql.Row{{2}},
			},
			// kill refresh thread
			{
				Query: "call dolt_stats_stop()",
			},
			{
				Query:    "call dolt_stats_status()",
				Expected: []sql.Row{{"cancelled thread: mydb"}},
			},
			// insert without refresh thread will not update stats
			{
				Query: "insert into xy values (1,0,'a'), (3,0,'a'), (5,2,'a'), (7,1,'a')",
			},
			{
				Query: "select sleep(.1)",
			},
			{
				Query:    "call dolt_stats_status()",
				Expected: []sql.Row{{"cancelled thread: mydb"}},
			},
			// manual analyze will update stats
			{
				Query:    "analyze table xy",
				Expected: []sql.Row{{"xy", "analyze", "status", "OK"}},
			},
			{
				Query:    "call dolt_stats_status()",
				Expected: []sql.Row{{"updated to hash: fhnmdo8psvs10od36pqfi0g4cvvu732h"}},
			},
			{
				Query:    "select count(*) from dolt_statistics",
				Expected: []sql.Row{{2}},
			},
			// kill refresh thread and delete stats ref
			{
				Query: "call dolt_stats_drop()",
			},
			{
				Query:    "call dolt_stats_status()",
				Expected: []sql.Row{{"dropped"}},
			},
			{
				Query:          "select count(*) from dolt_statistics",
				ExpectedErrStr: doltdb.ErrNoStatistics.Error(),
			},
		},
	},
}
// TestProviderReloadScriptWithEngine runs the test script given with the engine provided.

View File

@@ -33,6 +33,7 @@ import (
"github.com/dolthub/dolt/go/libraries/doltcore/env"
dsql "github.com/dolthub/dolt/go/libraries/doltcore/sqle"
"github.com/dolthub/dolt/go/libraries/doltcore/sqle/dsess"
"github.com/dolthub/dolt/go/libraries/doltcore/sqle/stats"
"github.com/dolthub/dolt/go/libraries/doltcore/table/editor"
"github.com/dolthub/dolt/go/libraries/utils/filesys"
"github.com/dolthub/dolt/go/store/types"
@@ -141,7 +142,7 @@ func innerInit(h *DoltHarness, dEnv *env.DoltEnv) error {
return err
}
ctx := dsql.NewTestSQLCtxWithProvider(context.Background(), pro)
ctx := dsql.NewTestSQLCtxWithProvider(context.Background(), pro, stats.NewProvider())
h.sess = ctx.Session.(*dsess.DoltSession)
dbs := h.engine.Analyzer.Catalog.AllDatabases(ctx)
@@ -303,6 +304,7 @@ func sqlNewEngine(dEnv *env.DoltEnv) (*sqle.Engine, dsess.DoltDatabaseProvider,
}
pro = pro.WithDbFactoryUrl(doltdb.InMemDoltDB)
engine := sqle.NewDefault(pro)
return engine, pro, nil

View File

@@ -1113,7 +1113,7 @@ func newTestEngine(ctx context.Context, dEnv *env.DoltEnv) (*gms.Engine, *sql.Co
panic(err)
}
doltSession, err := dsess.NewDoltSession(sql.NewBaseSession(), pro, dEnv.Config.WriteableConfig(), nil)
doltSession, err := dsess.NewDoltSession(sql.NewBaseSession(), pro, dEnv.Config.WriteableConfig(), nil, nil)
if err != nil {
panic(err)
}

View File

@@ -0,0 +1,356 @@
// Copyright 2024 Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package stats
import (
"context"
"fmt"
"strings"
"time"
"github.com/dolthub/go-mysql-server/sql"
types2 "github.com/dolthub/go-mysql-server/sql/types"
"github.com/dolthub/dolt/go/libraries/doltcore/doltdb"
"github.com/dolthub/dolt/go/libraries/doltcore/doltdb/durable"
"github.com/dolthub/dolt/go/libraries/doltcore/schema"
"github.com/dolthub/dolt/go/libraries/doltcore/sqle"
"github.com/dolthub/dolt/go/libraries/doltcore/sqle/dsess"
"github.com/dolthub/dolt/go/store/hash"
"github.com/dolthub/dolt/go/store/prolly"
"github.com/dolthub/dolt/go/store/prolly/tree"
)
const asyncAutoRefreshStats = "async_auto_refresh_stats"
// Configure wires the stats provider into the engine lifecycle. It registers
// the database-init starter hook, loads persisted statistics for |dbs|, and,
// when dolt_stats_auto_refresh_enabled is set, launches one background
// refresh thread per database and chains the provider's init/drop database
// hooks so databases created or dropped later are tracked too. When
// dolt_stats_memory_only is set it returns after registering the starter.
func (p *Provider) Configure(ctx context.Context, ctxFactory func(ctx context.Context) (*sql.Context, error), bThreads *sql.BackgroundThreads, pro *sqle.DoltDatabaseProvider, dbs []dsess.SqlDatabase) error {
	p.SetStarter(NewInitDatabaseHook(p, ctxFactory, bThreads, nil))

	// memory-only mode: skip loading persisted stats and auto-refresh setup
	if _, disabled, _ := sql.SystemVariables.GetGlobal(dsess.DoltStatsMemoryOnly); disabled == int8(1) {
		return nil
	}

	loadCtx, err := ctxFactory(ctx)
	if err != nil {
		return err
	}

	if err := p.Load(loadCtx, dbs); err != nil {
		return err
	}
	if _, enabled, _ := sql.SystemVariables.GetGlobal(dsess.DoltStatsAutoRefreshEnabled); enabled == int8(1) {
		_, threshold, _ := sql.SystemVariables.GetGlobal(dsess.DoltStatsAutoRefreshThreshold)
		_, interval, _ := sql.SystemVariables.GetGlobal(dsess.DoltStatsAutoRefreshInterval)
		interval64, _, _ := types2.Int64.Convert(interval)
		// NOTE(review): assumes the interval converts to int64 and the
		// threshold value is a float64; a misconfigured system variable
		// would panic on these assertions -- confirm variable definitions
		intervalSec := time.Second * time.Duration(interval64.(int64))
		thresholdf64 := threshold.(float64)
		for _, db := range dbs {
			if err := p.InitAutoRefresh(ctxFactory, db.Name(), bThreads, intervalSec, thresholdf64); err != nil {
				return err
			}
		}
		// chain existing hooks so later CREATE/DROP DATABASE also
		// starts/stops refresh threads
		pro.InitDatabaseHook = NewInitDatabaseHook(p, ctxFactory, bThreads, pro.InitDatabaseHook)
		pro.DropDatabaseHook = NewDropDatabaseHook(p, ctxFactory, pro.DropDatabaseHook)
	}
	return nil
}
// InitAutoRefresh launches a background thread for |dbName| that wakes
// every |checkInterval| and rebuilds index statistics for tables whose
// data changed, but only when the fraction of changed/deleted chunks for
// an index exceeds |updateThresh|. The thread stops when the background
// thread pool shuts down or when the database's cancel function
// (retained in p.cancelers) fires, e.g. on DROP DATABASE.
func (p *Provider) InitAutoRefresh(ctxFactory func(ctx context.Context) (*sql.Context, error), dbName string, bThreads *sql.BackgroundThreads, checkInterval time.Duration, updateThresh float64) error {
	// this is only called after initial statistics are finished loading
	// launch a thread that periodically checks freshness
	// retain handle to cancel on drop database
	// todo: add Cancel(name) to sql.BackgroundThreads interface
	p.mu.Lock()
	defer p.mu.Unlock()

	dropDbCtx, dbStatsCancel := context.WithCancel(context.Background())
	p.cancelers[dbName] = dbStatsCancel

	return bThreads.Add(fmt.Sprintf("%s_%s", asyncAutoRefreshStats, dbName), func(ctx context.Context) {
		timer := time.NewTimer(checkInterval)
		for {
			// wake up checker on interval
			select {
			case <-ctx.Done():
				timer.Stop()
				return
			case <-dropDbCtx.Done():
				timer.Stop()
				return
			case <-timer.C:
				sqlCtx, err := ctxFactory(ctx)
				if err != nil {
					return
				}

				sqlCtx.GetLogger().Debugf("starting statistics refresh check for '%s': %s", dbName, time.Now().String())
				// reset before doing the work so the cadence is measured
				// from wake-up, not from completion
				timer.Reset(checkInterval)

				// Iterate all dbs, tables, indexes. Each db will collect
				// []indexMeta above refresh threshold. We read and process those
				// chunks' statistics. We merge updated chunks with precomputed
				// chunks. The full set of statistics for each database lands
				// 1) in the provider's most recent set of database statistics, and
				// 2) on disk in the database's statistics ref'd prolly.Map.
				curStats := p.getStats(dbName)
				if curStats == nil {
					curStats = newDbStats(dbName)
				}

				newStats := make(map[sql.StatQualifier]*DoltStats)
				var deletedStats []sql.StatQualifier
				qualExists := make(map[sql.StatQualifier]bool)
				tableExistsAndSkipped := make(map[string]bool)

				// important: update session references every loop
				dSess := dsess.DSessFromSess(sqlCtx.Session)
				prov := dSess.Provider()
				ddb, ok := dSess.GetDoltDB(sqlCtx, dbName)
				if !ok {
					sqlCtx.GetLogger().Debugf("statistics refresh error: database not found %s", dbName)
					// bugfix: |ddb| is nil here; falling through would panic
					// below on ddb.NodeStore()/ddb.SetStatisics. Skip this
					// cycle and retry on the next tick.
					continue
				}
				sqlDb, err := prov.Database(sqlCtx, dbName)
				if err != nil {
					sqlCtx.GetLogger().Debugf("statistics refresh error: %s", err.Error())
					continue
				}
				tables, err := sqlDb.GetTableNames(sqlCtx)
				if err != nil {
					sqlCtx.GetLogger().Debugf("statistics refresh error: %s", err.Error())
					continue
				}

				for _, table := range tables {
					sqlTable, ok, err := sqlDb.GetTableInsensitive(sqlCtx, table)
					if err != nil {
						sqlCtx.GetLogger().Debugf("statistics refresh error: %s", err.Error())
						continue
					}
					if !ok {
						sqlCtx.GetLogger().Debugf("statistics refresh error: table not found %s", table)
						continue
					}

					// unwrap the concrete dolt table from the sql.Table wrapper
					var dTab *doltdb.Table
					switch t := sqlTable.(type) {
					case *sqle.AlterableDoltTable:
						dTab, err = t.DoltTable.DoltTable(sqlCtx)
					case *sqle.WritableDoltTable:
						dTab, err = t.DoltTable.DoltTable(sqlCtx)
					case *sqle.DoltTable:
						dTab, err = t.DoltTable(sqlCtx)
					default:
						err = fmt.Errorf("failed to unwrap dolt table from type: %T", sqlTable)
					}
					if err != nil {
						sqlCtx.GetLogger().Debugf("statistics refresh error: %s", err.Error())
						continue
					}

					tableHash, err := dTab.GetRowDataHash(ctx)
					if err != nil {
						sqlCtx.GetLogger().Debugf("statistics refresh error: %s", err.Error())
						continue
					}

					if curStats.getLatestHash(table) == tableHash {
						// no data changes since last check
						tableExistsAndSkipped[table] = true
						sqlCtx.GetLogger().Debugf("statistics refresh: table hash unchanged since last check: %s", tableHash)
						continue
					} else {
						sqlCtx.GetLogger().Debugf("statistics refresh: new table hash: %s", tableHash)
					}

					iat, ok := sqlTable.(sql.IndexAddressableTable)
					if !ok {
						sqlCtx.GetLogger().Debugf("statistics refresh error: table does not support indexes %s", table)
						continue
					}

					indexes, err := iat.GetIndexes(sqlCtx)
					if err != nil {
						sqlCtx.GetLogger().Debugf("statistics refresh error: %s", err.Error())
						continue
					}

					// collect indexes and ranges to be updated
					var idxMetas []indexMeta
					for _, index := range indexes {
						qual := sql.NewStatQualifier(dbName, table, strings.ToLower(index.ID()))
						qualExists[qual] = true
						curStat := curStats.getIndexStats(qual)
						if curStat == nil {
							curStat = NewDoltStats()
							curStat.Qual = qual

							cols := make([]string, len(index.Expressions()))
							tablePrefix := fmt.Sprintf("%s.", table)
							for i, c := range index.Expressions() {
								cols[i] = strings.TrimPrefix(strings.ToLower(c), tablePrefix)
							}
							curStat.Columns = cols
						}
						sqlCtx.GetLogger().Debugf("statistics refresh index: %s", qual.String())

						updateMeta, err := newIdxMeta(sqlCtx, curStat, dTab, index, curStat.Columns)
						if err != nil {
							sqlCtx.GetLogger().Debugf("statistics refresh error: %s", err.Error())
							continue
						}
						curCnt := float64(len(curStat.active))
						updateCnt := float64(len(updateMeta.updateChunks))
						deleteCnt := float64(len(curStat.active) - len(updateMeta.preexisting))
						sqlCtx.GetLogger().Debugf("statistics current: %d, new: %d, delete: %d", int(curCnt), int(updateCnt), int(deleteCnt))

						// refresh when the index is new (curCnt == 0) or when
						// the changed fraction crosses the threshold
						if curCnt == 0 || (deleteCnt+updateCnt)/curCnt > updateThresh {
							sqlCtx.GetLogger().Debugf("statistics updating: %s", updateMeta.qual)
							// mark index for updating
							idxMetas = append(idxMetas, updateMeta)
							// update lastest hash if we haven't already
							curStats.setLatestHash(table, tableHash)
						}
					}

					// get new buckets for index chunks to update
					newTableStats, err := updateStats(sqlCtx, sqlTable, dTab, indexes, idxMetas)
					if err != nil {
						sqlCtx.GetLogger().Debugf("statistics refresh error: %s", err.Error())
						continue
					}

					// merge new chunks with preexisting chunks
					for _, updateMeta := range idxMetas {
						stat := newTableStats[updateMeta.qual]
						if stat != nil {
							newStats[updateMeta.qual] = mergeStatUpdates(stat, updateMeta)
						}
					}
				}

				func() {
					curStats.mu.Lock()
					defer curStats.mu.Unlock()
					for _, s := range curStats.stats {
						// table or index delete leaves hole in stats
						// this is separate from threshold check
						if !tableExistsAndSkipped[s.Qual.Table()] && !qualExists[s.Qual] {
							// only delete stats we've verified are deleted
							deletedStats = append(deletedStats, s.Qual)
						}
					}
				}()

				prevMap := curStats.getCurrentMap()
				if prevMap.KeyDesc().Count() == 0 {
					// no on-disk map yet; start from an empty stats map
					kd, vd := schema.StatsTableDoltSchema.GetMapDescriptors()
					prevMap, err = prolly.NewMapFromTuples(ctx, ddb.NodeStore(), kd, vd)
					if err != nil {
						sqlCtx.GetLogger().Debugf("statistics refresh error: %s", err.Error())
						continue
					}
				}

				if len(deletedStats) == 0 && len(newStats) == 0 {
					continue
				}

				if len(deletedStats) > 0 {
					sqlCtx.GetLogger().Debugf("statistics refresh: deleting stats %#v", deletedStats)
				}
				delMap, err := deleteStats(sqlCtx, prevMap, deletedStats...)
				if err != nil {
					sqlCtx.GetLogger().Debugf("statistics refresh error: %s", err.Error())
					continue
				}

				newMap, err := flushStats(sqlCtx, delMap, newStats)
				if err != nil {
					sqlCtx.GetLogger().Debugf("statistics refresh error: %s", err.Error())
					continue
				}

				curStats.setCurrentMap(newMap)
				for q, s := range newStats {
					curStats.setIndexStats(q, s)
				}
				p.setStats(dbName, curStats)

				err = ddb.SetStatisics(ctx, newMap.HashOf())
				if err != nil {
					sqlCtx.GetLogger().Debugf("statistics refresh error: %s", err.Error())
					continue
				}
			}
		}
	})
}
// newIdxMeta compares the current histogram-level chunks of |sqlIndex|
// against the buckets recorded in |curStats| and partitions them into
// 1) preserved buckets that can be reused and 2) missing chunks whose
// row ranges (start/stop ordinals) must be rescanned. The returned
// indexMeta describes exactly the update work for this index.
//
// NOTE(review): curStats.active is read here without taking curStats'
// mutex — assumes the caller owns the stats during refresh; confirm.
func newIdxMeta(ctx *sql.Context, curStats *DoltStats, doltTable *doltdb.Table, sqlIndex sql.Index, cols []string) (indexMeta, error) {
	var idx durable.Index
	var err error
	// the clustered ("PRIMARY") index is stored as the table row data
	if strings.EqualFold(sqlIndex.ID(), "PRIMARY") {
		idx, err = doltTable.GetRowData(ctx)
	} else {
		idx, err = doltTable.GetIndexRowData(ctx, sqlIndex.ID())
	}
	if err != nil {
		return indexMeta{}, err
	}

	prollyMap := durable.ProllyMapFromIndex(idx)

	// get newest histogram target level hashes
	levelNodes, err := tree.GetHistogramLevel(ctx, prollyMap.Tuples(), bucketLowCnt)
	if err != nil {
		return indexMeta{}, err
	}

	var addrs []hash.Hash
	var preservedStats []DoltBucket
	var missingChunks []tree.Node
	var missingOffsets [][]uint64
	var offset uint64
	for _, n := range levelNodes {
		// Compare the previous histogram chunks to the newest tree chunks.
		// Partition the newest chunks into 1) preserved or 2) missing.
		// Missing chunks will need to be scanned on a stats update, so
		// track the (start, end) ordinal offsets to simplify the read iter.
		treeCnt, err := n.TreeCount()
		if err != nil {
			return indexMeta{}, err
		}

		addrs = append(addrs, n.HashOf())
		if bucketIdx, ok := curStats.active[n.HashOf()]; !ok {
			missingChunks = append(missingChunks, n)
			missingOffsets = append(missingOffsets, []uint64{offset, offset + uint64(treeCnt)})
		} else {
			preservedStats = append(preservedStats, curStats.Histogram[bucketIdx])
		}
		offset += uint64(treeCnt)
	}
	return indexMeta{
		qual:           curStats.Qual,
		cols:           cols,
		updateChunks:   missingChunks,
		updateOrdinals: missingOffsets,
		preexisting:    preservedStats,
		allAddrs:       addrs,
	}, nil
}

View File

@@ -0,0 +1,58 @@
// Copyright 2024 Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package stats
import (
"context"
"time"
"github.com/dolthub/go-mysql-server/sql"
types2 "github.com/dolthub/go-mysql-server/sql/types"
"github.com/dolthub/dolt/go/libraries/doltcore/env"
"github.com/dolthub/dolt/go/libraries/doltcore/sqle"
"github.com/dolthub/dolt/go/libraries/doltcore/sqle/dsess"
)
// NewInitDatabaseHook returns an InitDatabaseHook that first runs |orig|
// (when non-nil) and then starts a statistics auto-refresh thread for the
// newly created database, using the interval and threshold configured via
// the stats auto-refresh system variables.
func NewInitDatabaseHook(statsProv *Provider, ctxFactory func(ctx context.Context) (*sql.Context, error), bThreads *sql.BackgroundThreads, orig sqle.InitDatabaseHook) sqle.InitDatabaseHook {
	return func(ctx *sql.Context, pro *sqle.DoltDatabaseProvider, name string, denv *env.DoltEnv) error {
		// preserve any previously-installed hook behavior
		if orig != nil {
			if err := orig(ctx, pro, name, denv); err != nil {
				return err
			}
		}

		_, thresholdVal, _ := sql.SystemVariables.GetGlobal(dsess.DoltStatsAutoRefreshThreshold)
		_, intervalVal, _ := sql.SystemVariables.GetGlobal(dsess.DoltStatsAutoRefreshInterval)
		intervalInt, _, _ := types2.Int64.Convert(intervalVal)
		refreshInterval := time.Second * time.Duration(intervalInt.(int64))
		return statsProv.InitAutoRefresh(ctxFactory, name, bThreads, refreshInterval, thresholdVal.(float64))
	}
}
// NewDropDatabaseHook returns a DropDatabaseHook that first runs |orig|
// (when non-nil), then cancels the dropped database's refresh thread and
// clears its in-memory statistics. Stats cleanup is best-effort: if no
// session context can be built, the hook returns without cleaning up.
func NewDropDatabaseHook(statsProv *Provider, ctxFactory func(ctx context.Context) (*sql.Context, error), orig sqle.DropDatabaseHook) sqle.DropDatabaseHook {
	return func(name string) {
		// preserve any previously-installed hook behavior
		if orig != nil {
			orig(name)
		}

		sqlCtx, err := ctxFactory(context.Background())
		if err != nil {
			return
		}
		statsProv.CancelRefreshThread(name)
		// drop in-memory stats only; the false flag leaves on-disk stats alone
		statsProv.DropDbStats(sqlCtx, name, false)
	}
}

View File

@@ -36,13 +36,13 @@ import (
)
func loadStats(ctx *sql.Context, db dsess.SqlDatabase, m prolly.Map) (*dbStats, error) {
dbStat := &dbStats{db: db.Name(), active: make(map[hash.Hash]int), stats: make(map[sql.StatQualifier]*DoltStats)}
dbStat := newDbStats(db.Name())
iter, err := dtables.NewStatsIter(ctx, m)
if err != nil {
return nil, err
}
currentStat := &DoltStats{}
currentStat := NewDoltStats()
var lowerBound sql.Row
for {
row, err := iter.Next(ctx)
@@ -123,9 +123,14 @@ func loadStats(ctx *sql.Context, db dsess.SqlDatabase, m prolly.Map) (*dbStats,
}
currentStat.fds = fds
currentStat.colSet = colSet
currentStat.updateActive()
dbStat.stats[currentStat.Qual] = currentStat
}
currentStat = &DoltStats{Qual: qual, Columns: columns, LowerBound: lowerBound}
currentStat = NewDoltStats()
currentStat.Qual = qual
currentStat.Columns = columns
currentStat.LowerBound = lowerBound
}
if currentStat.Histogram == nil {
@@ -148,7 +153,7 @@ func loadStats(ctx *sql.Context, db dsess.SqlDatabase, m prolly.Map) (*dbStats,
UpperBound: boundRow,
}
dbStat.active[commit] = position
currentStat.active[commit] = position
currentStat.Histogram = append(currentStat.Histogram, bucket)
currentStat.RowCount += uint64(rowCount)
currentStat.DistinctCount += uint64(distinctCount)
@@ -157,6 +162,18 @@ func loadStats(ctx *sql.Context, db dsess.SqlDatabase, m prolly.Map) (*dbStats,
currentStat.CreatedAt = createdAt
}
}
currentStat.LowerBound, err = loadLowerBound(ctx, currentStat.Qual)
if err != nil {
return nil, err
}
fds, colSet, err := loadFuncDeps(ctx, db, currentStat.Qual)
if err != nil {
return nil, err
}
currentStat.fds = fds
currentStat.colSet = colSet
currentStat.updateActive()
dbStat.setIndexStats(currentStat.Qual, currentStat)
dbStat.stats[currentStat.Qual] = currentStat
return dbStat, nil
}

View File

@@ -15,6 +15,7 @@
package stats
import (
"context"
"errors"
"fmt"
"strings"
@@ -25,18 +26,27 @@ import (
"github.com/dolthub/go-mysql-server/sql/stats"
"github.com/dolthub/dolt/go/libraries/doltcore/doltdb"
"github.com/dolthub/dolt/go/libraries/doltcore/env"
"github.com/dolthub/dolt/go/libraries/doltcore/schema"
"github.com/dolthub/dolt/go/libraries/doltcore/sqle"
"github.com/dolthub/dolt/go/libraries/doltcore/sqle/dsess"
"github.com/dolthub/dolt/go/libraries/doltcore/sqle/dtables"
"github.com/dolthub/dolt/go/store/hash"
"github.com/dolthub/dolt/go/store/prolly"
"github.com/dolthub/dolt/go/store/prolly/tree"
)
var ErrFailedToLoad = errors.New("failed to load statistics")
type DoltStats struct {
level int
chunks []hash.Hash
mu *sync.Mutex
// chunks is a list of addresses for the histogram fanout level
chunks []hash.Hash
// active maps a chunk/bucket address to its position in
// the histogram. 1-indexed to differentiate from an empty
// field on disk
active map[hash.Hash]int
RowCount uint64
DistinctCount uint64
NullCount uint64
@@ -52,12 +62,17 @@ type DoltStats struct {
colSet sql.ColSet
}
func NewDoltStats() *DoltStats {
return &DoltStats{mu: &sync.Mutex{}, active: make(map[hash.Hash]int)}
}
func DoltStatsFromSql(stat sql.Statistic) (*DoltStats, error) {
hist, err := DoltHistFromSql(stat.Histogram(), stat.Types())
if err != nil {
return nil, err
}
return &DoltStats{
mu: &sync.Mutex{},
Qual: stat.Qualifier(),
RowCount: stat.RowCount(),
DistinctCount: stat.DistinctCount(),
@@ -74,7 +89,35 @@ func DoltStatsFromSql(stat sql.Statistic) (*DoltStats, error) {
}, nil
}
func (s *DoltStats) updateActive() {
s.mu.Lock()
defer s.mu.Unlock()
newActive := make(map[hash.Hash]int)
for i, hash := range s.chunks {
newActive[hash] = i
}
s.active = newActive
}
func (s *DoltStats) updateCounts() {
s.mu.Lock()
defer s.mu.Unlock()
var newDistinct uint64
var newRows uint64
var newNulls uint64
for _, b := range s.Histogram {
newDistinct += b.DistinctCount
newRows += b.RowCount
newNulls += b.NullCount
}
s.RowCount = newRows
s.DistinctCount = newDistinct
s.NullCount = newNulls
}
func (s *DoltStats) toSql() sql.Statistic {
s.mu.Lock()
defer s.mu.Unlock()
typStrs := make([]string, len(s.Types))
for i, typ := range s.Types {
typStrs[i] = typ.String()
@@ -142,16 +185,21 @@ func (s DoltHistogram) toSql() []*stats.Bucket {
}
type indexMeta struct {
db string
table string
index string
cols []string
qual sql.StatQualifier
cols []string
updateChunks []tree.Node
// [start, stop] ordinals for each chunk for update
updateOrdinals [][]uint64
preexisting []DoltBucket
allAddrs []hash.Hash
}
func NewProvider() *Provider {
return &Provider{
mu: &sync.Mutex{},
dbStats: make(map[string]*dbStats),
mu: &sync.Mutex{},
dbStats: make(map[string]*dbStats),
cancelers: make(map[string]context.CancelFunc),
status: make(map[string]string),
}
}
@@ -162,29 +210,127 @@ type Provider struct {
mu *sync.Mutex
latestRootAddr hash.Hash
dbStats map[string]*dbStats
cancelers map[string]context.CancelFunc
starter sqle.InitDatabaseHook
status map[string]string
}
// each database has one statistics table that is a collection of the
// table stats in the database
type dbStats struct {
db string
active map[hash.Hash]int
stats map[sql.StatQualifier]*DoltStats
currentMap prolly.Map
mu *sync.Mutex
db string
stats map[sql.StatQualifier]*DoltStats
currentMap prolly.Map
latestRoot *doltdb.RootValue
latestTableHashes map[string]hash.Hash
}
func newDbStats(dbName string) *dbStats {
return &dbStats{
mu: &sync.Mutex{},
db: dbName,
stats: make(map[sql.StatQualifier]*DoltStats),
latestTableHashes: make(map[string]hash.Hash),
}
}
var _ sql.StatsProvider = (*Provider)(nil)
func (p *Provider) StartRefreshThread(ctx *sql.Context, pro dsess.DoltDatabaseProvider, name string, env *env.DoltEnv) error {
err := p.starter(ctx, pro.(*sqle.DoltDatabaseProvider), name, env)
if err != nil {
p.UpdateStatus(name, fmt.Sprintf("error restarting thread %s: %s", name, err.Error()))
return err
}
p.UpdateStatus(name, fmt.Sprintf("restarted thread: %s", name))
return nil
}
func (p *Provider) SetStarter(hook sqle.InitDatabaseHook) {
p.starter = hook
}
func (p *Provider) CancelRefreshThread(dbName string) {
p.mu.Lock()
defer p.mu.Unlock()
if cancel, ok := p.cancelers[dbName]; ok {
cancel()
p.status[dbName] = fmt.Sprintf("cancelled thread: %s", dbName)
}
}
func (p *Provider) ThreadStatus(dbName string) string {
if msg, ok := p.status[dbName]; ok {
return msg
}
return "no active stats thread"
}
func (p *Provider) setStats(dbName string, s *dbStats) {
p.mu.Lock()
defer p.mu.Unlock()
p.dbStats[dbName] = s
if s != nil && len(s.stats) > 0 {
p.status[dbName] = fmt.Sprintf("updated to hash: %s", s.currentMap.HashOf())
}
}
func (p *Provider) getStats(dbName string) *dbStats {
p.mu.Lock()
defer p.mu.Unlock()
s, _ := p.dbStats[dbName]
return s
}
func (s *dbStats) getLatestHash(tableName string) hash.Hash {
s.mu.Lock()
defer s.mu.Unlock()
h, _ := s.latestTableHashes[tableName]
return h
}
func (s *dbStats) setLatestHash(tableName string, h hash.Hash) {
s.mu.Lock()
defer s.mu.Unlock()
s.latestTableHashes[tableName] = h
}
func (s *dbStats) getCurrentMap() prolly.Map {
return s.currentMap
}
func (s *dbStats) setCurrentMap(m prolly.Map) {
s.currentMap = m
}
func (s *dbStats) getIndexStats(qual sql.StatQualifier) *DoltStats {
s.mu.Lock()
defer s.mu.Unlock()
stat, _ := s.stats[qual]
return stat
}
func (s *dbStats) setIndexStats(qual sql.StatQualifier, stat *DoltStats) {
s.mu.Lock()
defer s.mu.Unlock()
s.stats[qual] = stat
}
func (s *dbStats) dropIndexStats(qual sql.StatQualifier) {
s.mu.Lock()
defer s.mu.Unlock()
delete(s.stats, qual)
}
// Init scans the statistics tables, populating the |stats| attribute.
// Statistics are not available for reading until we've finished loading.
func (p *Provider) Load(ctx *sql.Context, dbs []dsess.SqlDatabase) error {
p.mu.Lock()
defer p.mu.Unlock()
for _, db := range dbs {
// set map keys so concurrent orthogonal writes are OK
p.dbStats[strings.ToLower(db.Name())] = &dbStats{db: strings.ToLower(db.Name()), stats: make(map[sql.StatQualifier]*DoltStats)}
p.setStats(strings.ToLower(db.Name()), newDbStats(strings.ToLower(db.Name())))
}
eg, ctx := ctx.NewErrgroup()
for _, db := range dbs {
// copy closure variables
@@ -216,7 +362,7 @@ func (p *Provider) Load(ctx *sql.Context, dbs []dsess.SqlDatabase) error {
} else if err != nil {
return err
}
p.dbStats[dbName] = stats
p.setStats(dbName, stats)
return nil
})
}
@@ -225,8 +371,10 @@ func (p *Provider) Load(ctx *sql.Context, dbs []dsess.SqlDatabase) error {
func (p *Provider) GetTableStats(ctx *sql.Context, db, table string) ([]sql.Statistic, error) {
var ret []sql.Statistic
if dbStats := p.dbStats[strings.ToLower(db)]; dbStats != nil {
for qual, stat := range p.dbStats[strings.ToLower(db)].stats {
if dbStat := p.getStats(strings.ToLower(db)); dbStat != nil {
dbStat.mu.Lock()
defer dbStat.mu.Unlock()
for qual, stat := range dbStat.stats {
if strings.EqualFold(db, qual.Database) && strings.EqualFold(table, qual.Tab) {
ret = append(ret, stat.toSql())
}
@@ -241,49 +389,75 @@ func (p *Provider) SetStats(ctx *sql.Context, stats sql.Statistic) error {
return err
}
dbName := strings.ToLower(stats.Qualifier().Database)
if _, ok := p.dbStats[dbName]; !ok {
p.dbStats[dbName] = &dbStats{db: dbName, stats: make(map[sql.StatQualifier]*DoltStats)}
stat := p.getStats(dbName)
if stat == nil {
stat = newDbStats(dbName)
}
p.dbStats[dbName].stats[stats.Qualifier()] = doltStats
stat.setIndexStats(stats.Qualifier(), doltStats)
p.setStats(dbName, stat)
return nil
}
func (p *Provider) GetStats(ctx *sql.Context, qual sql.StatQualifier, cols []string) (sql.Statistic, bool) {
if dbStats := p.dbStats[strings.ToLower(qual.Database)]; dbStats != nil {
if s, ok := p.dbStats[strings.ToLower(qual.Database)].stats[qual]; ok {
return s.toSql(), true
if stat := p.getStats(strings.ToLower(qual.Database)); stat != nil {
idxStat := stat.getIndexStats(qual)
if idxStat != nil {
return idxStat.toSql(), true
}
}
return nil, false
}
func (p *Provider) DropStats(ctx *sql.Context, qual sql.StatQualifier, cols []string) error {
if dbStats := p.dbStats[strings.ToLower(qual.Database)]; dbStats != nil {
delete(p.dbStats[strings.ToLower(qual.Database)].stats, qual)
func (p *Provider) DropDbStats(ctx *sql.Context, db string, flush bool) error {
p.setStats(db, nil)
p.mu.Lock()
defer p.mu.Unlock()
p.status[db] = "dropped"
if flush {
dSess := dsess.DSessFromSess(ctx.Session)
ddb, ok := dSess.GetDoltDB(ctx, db)
if !ok {
return nil
}
return ddb.DropStatisics(ctx)
}
return nil
}
func (p *Provider) DropStats(ctx *sql.Context, qual sql.StatQualifier, cols []string) error {
if stat := p.getStats(strings.ToLower(qual.Database)); stat != nil {
stat.dropIndexStats(qual)
p.UpdateStatus(qual.Db(), fmt.Sprintf("dropped statisic: %s", qual.String()))
}
return nil
}
func (p *Provider) UpdateStatus(db string, msg string) {
p.mu.Lock()
defer p.mu.Unlock()
p.status[db] = msg
}
func (p *Provider) RowCount(ctx *sql.Context, db, table string) (uint64, error) {
var cnt uint64
if dbStats := p.dbStats[strings.ToLower(db)]; dbStats != nil {
for qual, s := range p.dbStats[strings.ToLower(db)].stats {
if strings.EqualFold(db, qual.Database) && strings.EqualFold(table, qual.Table()) {
if s.RowCount > cnt {
cnt = s.RowCount
}
if dbStat := p.getStats(strings.ToLower(db)); dbStat != nil {
dbStat.mu.Lock()
defer dbStat.mu.Unlock()
for qual, s := range dbStat.stats {
if strings.EqualFold(db, qual.Database) && strings.EqualFold(table, qual.Table()) && strings.EqualFold(qual.Index(), "primary") {
return s.RowCount, nil
}
}
}
return cnt, nil
return 0, nil
}
func (p *Provider) DataLength(_ *sql.Context, db, table string) (uint64, error) {
var avgSize uint64
for meta, s := range p.dbStats[strings.ToLower(db)].stats {
if strings.EqualFold(db, meta.Database) && strings.EqualFold(table, meta.Table()) {
if s.AvgSize > avgSize {
avgSize = s.AvgSize
if dbStat := p.getStats(strings.ToLower(db)); dbStat != nil {
dbStat.mu.Lock()
defer dbStat.mu.Unlock()
for qual, s := range dbStat.stats {
if strings.EqualFold(db, qual.Database) && strings.EqualFold(table, qual.Table()) && strings.EqualFold(qual.Index(), "primary") {
return s.AvgSize, nil
}
}
}
@@ -291,7 +465,9 @@ func (p *Provider) DataLength(_ *sql.Context, db, table string) (uint64, error)
}
func (p *Provider) RefreshTableStats(ctx *sql.Context, table sql.Table, db string) error {
tableName := strings.ToLower(table.Name())
dbName := strings.ToLower(db)
iat, ok := table.(sql.IndexAddressableTable)
if !ok {
return nil
@@ -301,7 +477,42 @@ func (p *Provider) RefreshTableStats(ctx *sql.Context, table sql.Table, db strin
return err
}
tablePrefix := fmt.Sprintf("%s.", strings.ToLower(table.Name()))
// it's important to update session references every call
dSess := dsess.DSessFromSess(ctx.Session)
prov := dSess.Provider()
sqlDb, err := prov.Database(ctx, dbName)
if err != nil {
return err
}
sqlTable, ok, err := sqlDb.GetTableInsensitive(ctx, tableName)
if err != nil {
return err
}
if !ok {
return fmt.Errorf("error creating statistics for table: %s; table not found", tableName)
}
var dTab *doltdb.Table
switch t := sqlTable.(type) {
case *sqle.AlterableDoltTable:
dTab, err = t.DoltTable.DoltTable(ctx)
case *sqle.WritableDoltTable:
dTab, err = t.DoltTable.DoltTable(ctx)
case *sqle.DoltTable:
dTab, err = t.DoltTable(ctx)
default:
return fmt.Errorf("failed to unwrap dolt table from type: %T", sqlTable)
}
if err != nil {
return err
}
curStats := p.getStats(dbName)
if curStats == nil {
curStats = newDbStats(dbName)
}
tablePrefix := fmt.Sprintf("%s.", tableName)
var idxMetas []indexMeta
for _, idx := range indexes {
cols := make([]string, len(idx.Expressions()))
@@ -309,34 +520,37 @@ func (p *Provider) RefreshTableStats(ctx *sql.Context, table sql.Table, db strin
cols[i] = strings.TrimPrefix(strings.ToLower(c), tablePrefix)
}
idxMeta := indexMeta{
db: db,
table: strings.ToLower(table.Name()),
index: strings.ToLower(idx.ID()),
cols: cols,
qual := sql.NewStatQualifier(db, table.Name(), strings.ToLower(idx.ID()))
curStat := curStats.getIndexStats(qual)
if curStat == nil {
curStat = NewDoltStats()
curStat.Qual = qual
}
idxMeta, err := newIdxMeta(ctx, curStat, dTab, idx, cols)
if err != nil {
return err
}
idxMetas = append(idxMetas, idxMeta)
}
newStats, err := refreshStats(ctx, indexes, idxMetas)
newTableStats, err := updateStats(ctx, sqlTable, dTab, indexes, idxMetas)
if err != nil {
return err
}
sess := dsess.DSessFromSess(ctx.Session)
ddb, ok := sess.GetDoltDB(ctx, dbName)
// merge new chunks with preexisting chunks
newStats := make(map[sql.StatQualifier]*DoltStats)
for _, idxMeta := range idxMetas {
stat := newTableStats[idxMeta.qual]
newStats[idxMeta.qual] = mergeStatUpdates(stat, idxMeta)
}
ddb, ok := dSess.GetDoltDB(ctx, dbName)
if !ok {
return fmt.Errorf("database not found in session for stats update: %s", db)
}
if _, ok := p.dbStats[dbName]; !ok {
p.dbStats[dbName] = &dbStats{db: strings.ToLower(db), stats: make(map[sql.StatQualifier]*DoltStats)}
}
for qual, stats := range newStats {
p.dbStats[dbName].stats[qual] = stats
}
prevMap := p.dbStats[dbName].currentMap
prevMap := curStats.currentMap
if prevMap.KeyDesc().Count() == 0 {
kd, vd := schema.StatsTableDoltSchema.GetMapDescriptors()
prevMap, err = prolly.NewMapFromTuples(ctx, ddb.NodeStore(), kd, vd)
@@ -349,7 +563,12 @@ func (p *Provider) RefreshTableStats(ctx *sql.Context, table sql.Table, db strin
return err
}
p.dbStats[dbName].currentMap = newMap
curStats.setCurrentMap(newMap)
for k, v := range newStats {
curStats.setIndexStats(k, v)
}
p.setStats(dbName, curStats)
return ddb.SetStatisics(ctx, newMap.HashOf())
}

View File

@@ -18,7 +18,6 @@ import (
"container/heap"
"context"
"errors"
"fmt"
"io"
"strings"
"time"
@@ -28,9 +27,7 @@ import (
"github.com/dolthub/dolt/go/libraries/doltcore/doltdb"
"github.com/dolthub/dolt/go/libraries/doltcore/doltdb/durable"
"github.com/dolthub/dolt/go/libraries/doltcore/sqle"
"github.com/dolthub/dolt/go/libraries/doltcore/sqle/dsess"
"github.com/dolthub/dolt/go/store/hash"
"github.com/dolthub/dolt/go/store/prolly"
"github.com/dolthub/dolt/go/store/prolly/tree"
"github.com/dolthub/dolt/go/store/val"
)
@@ -40,121 +37,103 @@ const (
mcvCnt = 3
)
// refreshStats builds histograms for each index statistic metadata
// indicated in |newStats|.
func refreshStats(ctx *sql.Context, indexes []sql.Index, idxMetas []indexMeta) (map[sql.StatQualifier]*DoltStats, error) {
dSess := dsess.DSessFromSess(ctx.Session)
prov := dSess.Provider()
db, err := prov.Database(ctx, idxMetas[0].db)
if err != nil {
return nil, err
}
tab, ok, err := db.GetTableInsensitive(ctx, idxMetas[0].table)
if err != nil {
return nil, err
}
if !ok {
return nil, fmt.Errorf("error creating statistics for table: %s; table not found", idxMetas[0].table)
}
// updateStats builds histograms for a list of index statistic metadata.
// We only read chunk ranges indicated by |indexMeta.updateOrdinals|. If
// the returned buckets are a subset of the index the caller is responsible
// for reconciling the difference.
func updateStats(ctx *sql.Context, sqlTable sql.Table, dTab *doltdb.Table, indexes []sql.Index, idxMetas []indexMeta) (map[sql.StatQualifier]*DoltStats, error) {
nameToIdx := make(map[string]sql.Index)
for _, idx := range indexes {
nameToIdx[strings.ToLower(idx.ID())] = idx
}
var dTab *doltdb.Table
switch t := tab.(type) {
case *sqle.AlterableDoltTable:
dTab, err = t.DoltTable.DoltTable(ctx)
case *sqle.WritableDoltTable:
dTab, err = t.DoltTable.DoltTable(ctx)
case *sqle.DoltTable:
dTab, err = t.DoltTable(ctx)
default:
return nil, fmt.Errorf("failed to unwrap dolt table from type: %T", tab)
}
if err != nil {
return nil, err
}
ret := make(map[sql.StatQualifier]*DoltStats)
for i, meta := range idxMetas {
for _, meta := range idxMetas {
var idx durable.Index
var err error
if strings.EqualFold(meta.index, "PRIMARY") {
if strings.EqualFold(meta.qual.Index(), "PRIMARY") {
idx, err = dTab.GetRowData(ctx)
} else {
idx, err = dTab.GetIndexRowData(ctx, meta.index)
idx, err = dTab.GetIndexRowData(ctx, meta.qual.Index())
}
if err != nil {
return nil, err
}
prollyMap := durable.ProllyMapFromIndex(idx)
keyBuilder := val.NewTupleBuilder(prollyMap.KeyDesc())
buffPool := prollyMap.NodeStore().Pool()
firstIter, err := prollyMap.IterOrdinalRange(ctx, 0, 1)
sqlIdx := nameToIdx[strings.ToLower(meta.qual.Index())]
fds, colSet, err := stats.IndexFds(meta.qual.Table(), sqlTable.Schema(), sqlIdx)
if err != nil {
return nil, err
}
keyBytes, _, err := firstIter.Next(ctx)
if err != nil {
return nil, err
}
for i := range keyBuilder.Desc.Types {
keyBuilder.PutRaw(i, keyBytes.GetField(i))
}
prefixLen := len(meta.cols)
firstKey := keyBuilder.BuildPrefixNoRecycle(buffPool, prefixLen)
firstRow := make(sql.Row, prefixLen)
for i := 0; i < prefixLen; i++ {
firstRow[i], err = tree.GetField(ctx, prollyMap.KeyDesc(), i, firstKey, prollyMap.NodeStore())
if err != nil {
return nil, err
}
}
var types []sql.Type
for _, cet := range indexes[i].ColumnExpressionTypes() {
for _, cet := range nameToIdx[strings.ToLower(meta.qual.Index())].ColumnExpressionTypes() {
types = append(types, cet.Type)
}
// find level
levelNodes, err := tree.GetHistogramLevel(ctx, prollyMap.Tuples(), bucketLowCnt)
if cnt, err := prollyMap.Count(); err != nil {
return nil, err
} else if cnt == 0 {
// table is empty
ret[meta.qual] = NewDoltStats()
ret[meta.qual].chunks = meta.allAddrs
ret[meta.qual].CreatedAt = time.Now()
ret[meta.qual].Columns = meta.cols
ret[meta.qual].Types = types
ret[meta.qual].Qual = meta.qual
ret[meta.qual].fds = fds
ret[meta.qual].colSet = colSet
continue
}
firstRow, err := firstRowForIndex(ctx, prollyMap, keyBuilder, len(meta.cols))
if err != nil {
return nil, err
}
var addrs []hash.Hash
for _, n := range levelNodes {
addrs = append(addrs, n.HashOf())
// find level if not exists
if len(meta.updateChunks) == 0 {
levelNodes, err := tree.GetHistogramLevel(ctx, prollyMap.Tuples(), bucketLowCnt)
if err != nil {
return nil, err
}
var chunks []tree.Node
var offsets [][]uint64
var offset uint64
for _, n := range levelNodes {
chunks = append(chunks, n)
treeCnt, err := n.TreeCount()
if err != nil {
return nil, err
}
offsets = append(offsets, []uint64{offset, offset + uint64(treeCnt)})
offset += uint64(treeCnt)
}
meta.updateChunks = chunks
meta.updateOrdinals = offsets
}
qual := sql.NewStatQualifier(meta.db, meta.table, meta.index)
updater := newBucketBuilder(qual, len(meta.cols), prollyMap.KeyDesc())
ret[qual] = &DoltStats{
level: levelNodes[0].Level(),
chunks: addrs,
CreatedAt: time.Now(),
Columns: meta.cols,
Types: types,
Qual: qual,
}
updater := newBucketBuilder(meta.qual, len(meta.cols), prollyMap.KeyDesc())
ret[meta.qual] = NewDoltStats()
ret[meta.qual].chunks = meta.allAddrs
ret[meta.qual].CreatedAt = time.Now()
ret[meta.qual].Columns = meta.cols
ret[meta.qual].Types = types
ret[meta.qual].Qual = meta.qual
var start, stop uint64
// read leaf rows for each bucket
for i, _ := range levelNodes {
for i, chunk := range meta.updateChunks {
// each node is a bucket
updater.newBucket()
// we read exclusive range [node first key, next node first key)
start = stop
leafCnt, err := levelNodes[i].TreeCount()
if err != nil {
return nil, err
}
stop = start + uint64(leafCnt)
start, stop = meta.updateOrdinals[i][0], meta.updateOrdinals[i][1]
iter, err := prollyMap.IterOrdinalRange(ctx, start, stop)
if err != nil {
return nil, err
@@ -172,7 +151,7 @@ func refreshStats(ctx *sql.Context, indexes []sql.Index, idxMetas []indexMeta) (
keyBuilder.PutRaw(i, keyBytes.GetField(i))
}
updater.add(keyBuilder.BuildPrefixNoRecycle(buffPool, updater.prefixLen))
updater.add(keyBuilder.BuildPrefixNoRecycle(prollyMap.Pool(), updater.prefixLen))
keyBuilder.Recycle()
}
@@ -181,16 +160,10 @@ func refreshStats(ctx *sql.Context, indexes []sql.Index, idxMetas []indexMeta) (
if err != nil {
return nil, err
}
bucket.Chunk = addrs[i]
bucket.Chunk = chunk.HashOf()
ret[updater.qual].Histogram = append(ret[updater.qual].Histogram, bucket)
}
sqlIdx := nameToIdx[strings.ToLower(qual.Index())]
fds, colSet, err := stats.IndexFds(qual.Table(), tab.Schema(), sqlIdx)
if err != nil {
return nil, err
}
ret[updater.qual].DistinctCount = uint64(updater.globalDistinct)
ret[updater.qual].RowCount = uint64(updater.globalCount)
ret[updater.qual].LowerBound = firstRow
@@ -200,6 +173,65 @@ func refreshStats(ctx *sql.Context, indexes []sql.Index, idxMetas []indexMeta) (
return ret, nil
}
// mergeStatUpdates stitches freshly rebuilt histogram buckets together with
// the preexisting buckets for chunks that did not change, walking the full
// chunk address list for the index so the merged histogram preserves chunk
// order. If every chunk was rebuilt there is nothing to merge and the new
// stats are returned directly.
func mergeStatUpdates(newStats *DoltStats, idxMeta indexMeta) *DoltStats {
	if len(newStats.Histogram) == len(idxMeta.allAddrs) {
		// full rebuild; no preexisting buckets to carry over
		newStats.updateActive()
		return newStats
	}
	var (
		merged  DoltHistogram
		oldHist = idxMeta.preexisting
		newHist = newStats.Histogram
		oldIdx  int
		newIdx  int
	)
	for _, addr := range idxMeta.allAddrs {
		// both slices are ordered by chunk address position, so a single
		// forward pass over each suffices
		switch {
		case oldIdx < len(oldHist) && oldHist[oldIdx].Chunk == addr:
			merged = append(merged, oldHist[oldIdx])
			oldIdx++
		case newIdx < len(newHist) && newHist[newIdx].Chunk == addr:
			merged = append(merged, newHist[newIdx])
			newIdx++
		}
	}
	newStats.Histogram = merged
	newStats.chunks = idxMeta.allAddrs
	newStats.updateActive()
	newStats.updateCounts()
	return newStats
}
// firstRowForIndex decodes the first |prefixLen| key fields of the
// lowest-ordered entry in |prollyMap| into a sql.Row. An empty map yields a
// nil row and nil error.
func firstRowForIndex(ctx *sql.Context, prollyMap prolly.Map, keyBuilder *val.TupleBuilder, prefixLen int) (sql.Row, error) {
	cnt, err := prollyMap.Count()
	if err != nil {
		return nil, err
	}
	if cnt == 0 {
		return nil, nil
	}

	// the first row in key order is ordinal 0
	iter, err := prollyMap.IterOrdinalRange(ctx, 0, 1)
	if err != nil {
		return nil, err
	}
	keyBytes, _, err := iter.Next(ctx)
	if err != nil {
		return nil, err
	}

	// copy every raw field into the builder, then truncate to the prefix
	for i := range keyBuilder.Desc.Types {
		keyBuilder.PutRaw(i, keyBytes.GetField(i))
	}
	pool := prollyMap.NodeStore().Pool()
	firstKey := keyBuilder.BuildPrefixNoRecycle(pool, prefixLen)

	row := make(sql.Row, prefixLen)
	for i := range row {
		row[i], err = tree.GetField(ctx, prollyMap.KeyDesc(), i, firstKey, prollyMap.NodeStore())
		if err != nil {
			return nil, err
		}
	}
	return row, nil
}
func newBucketBuilder(qual sql.StatQualifier, prefixLen int, tupleDesc val.TupleDesc) *bucketBuilder {
return &bucketBuilder{
qual: qual,

View File

@@ -24,21 +24,33 @@ import (
"github.com/dolthub/dolt/go/libraries/doltcore/doltdb"
"github.com/dolthub/dolt/go/libraries/doltcore/schema"
"github.com/dolthub/dolt/go/libraries/doltcore/sqle/dsess"
"github.com/dolthub/dolt/go/store/prolly"
"github.com/dolthub/dolt/go/store/prolly/tree"
stypes "github.com/dolthub/dolt/go/store/types"
"github.com/dolthub/dolt/go/store/val"
)
// About ~200 20 byte address fit in a ~4k chunk. Chunk sizes
// are approximate, but certainly shouldn't reach the square
// of the expected size.
const maxBucketFanout = 200 * 200
// newStatsTable creates an empty dolt table with the statistics schema,
// backed by the given node store and value read/writer.
func newStatsTable(ctx *sql.Context, ns tree.NodeStore, vrw stypes.ValueReadWriter) (*doltdb.Table, error) {
	return doltdb.CreateEmptyTable(ctx, ns, vrw, schema.StatsTableDoltSchema)
}
// flushStats writes a set of table statistics to the given node store, and returns a new prolly.Map
func flushStats(ctx *sql.Context, prev prolly.Map, tableStats map[sql.StatQualifier]*DoltStats) (prolly.Map, error) {
if _, disabled, _ := sql.SystemVariables.GetGlobal(dsess.DoltStatsMemoryOnly); disabled == int8(1) {
// do not write to disk
return prolly.Map{}, nil
}
sch := schema.StatsTableDoltSchema
kd, vd := sch.GetMapDescriptors()
m := prev.Mutate()
var m *prolly.MutableMap
m = prev.Mutate()
pool := prev.NodeStore().Pool()
keyBuilder := val.NewTupleBuilder(kd)
@@ -56,10 +68,10 @@ func flushStats(ctx *sql.Context, prev prolly.Map, tableStats map[sql.StatQualif
}
return b.String()
}
var pos int64
for qual, stats := range tableStats {
var pos int64
// delete previous entries for this index
// delete previous entries for this index -> (db, table, index, pos)
keyBuilder.PutString(0, qual.Database)
keyBuilder.PutString(1, qual.Table())
keyBuilder.PutString(2, qual.Index())
@@ -68,11 +80,11 @@ func flushStats(ctx *sql.Context, prev prolly.Map, tableStats map[sql.StatQualif
keyBuilder.PutString(0, qual.Database)
keyBuilder.PutString(1, qual.Table())
keyBuilder.PutString(2, qual.Index())
keyBuilder.PutInt64(3, 10000)
keyBuilder.PutInt64(3, maxBucketFanout+1)
maxKey := keyBuilder.Build(pool)
// there is a limit on the number of buckets for a given index, iter
// will terminate after we run over.
// will terminate before maxBucketFanout
iter, err := prev.IterKeyRange(ctx, firstKey, maxKey)
if err != nil {
return prolly.Map{}, err
@@ -100,6 +112,13 @@ func flushStats(ctx *sql.Context, prev prolly.Map, tableStats map[sql.StatQualif
}
typesStr := typesB.String()
if len(stats.Types) != len(stats.Columns) {
ctx.GetLogger().Println(stats.Qual.String())
ctx.GetLogger().Println(typesStr)
ctx.GetLogger().Println(strings.Join(stats.Columns, ","))
panic("invalid statistic")
}
for _, h := range stats.Histogram {
var upperBoundElems []string
for _, v := range h.UpperBound {
@@ -139,3 +158,54 @@ func flushStats(ctx *sql.Context, prev prolly.Map, tableStats map[sql.StatQualif
return m.Map(ctx)
}
// deleteStats removes all stored histogram buckets for the given index
// qualifiers from the stats map |prev| and returns the updated map. Buckets
// for one index live under a contiguous key range (db, table, index, pos),
// so each qualifier is cleared with a single ordered range scan.
func deleteStats(ctx *sql.Context, prev prolly.Map, quals ...sql.StatQualifier) (prolly.Map, error) {
	if cnt, err := prev.Count(); err != nil {
		return prolly.Map{}, err
	} else if cnt == 0 {
		// nothing stored; skip the mutation entirely
		return prev, nil
	}

	sch := schema.StatsTableDoltSchema
	kd, _ := sch.GetMapDescriptors()

	// idiom fix: collapse the two-step `var m *prolly.MutableMap; m = ...`
	// declaration (staticcheck S1021) into a single short declaration
	m := prev.Mutate()
	pool := prev.NodeStore().Pool()

	keyBuilder := val.NewTupleBuilder(kd)

	for _, qual := range quals {
		// lower bound for this index's entries -> (db, table, index, 0)
		keyBuilder.PutString(0, qual.Database)
		keyBuilder.PutString(1, qual.Table())
		keyBuilder.PutString(2, qual.Index())
		keyBuilder.PutInt64(3, 0)
		firstKey := keyBuilder.Build(pool)

		// upper bound just past the maximum bucket position
		keyBuilder.PutString(0, qual.Database)
		keyBuilder.PutString(1, qual.Table())
		keyBuilder.PutString(2, qual.Index())
		keyBuilder.PutInt64(3, maxBucketFanout+1)
		maxKey := keyBuilder.Build(pool)

		// there is a limit on the number of buckets for a given index, iter
		// will terminate before maxBucketFanout
		iter, err := prev.IterKeyRange(ctx, firstKey, maxKey)
		if err != nil {
			return prolly.Map{}, err
		}

		for {
			k, _, err := iter.Next(ctx)
			if errors.Is(err, io.EOF) {
				break
			} else if err != nil {
				return prolly.Map{}, err
			}
			// a nil value acts as a delete in the mutable map
			if err := m.Put(ctx, k, nil); err != nil {
				return prolly.Map{}, err
			}
		}
	}
	return m.Map(ctx)
}

View File

@@ -195,6 +195,34 @@ func AddDoltSystemVariables() {
Type: types.NewSystemBoolType("dolt_dont_merge_json"),
Default: int8(0),
},
{
Name: dsess.DoltStatsAutoRefreshEnabled,
Dynamic: true,
Scope: sql.SystemVariableScope_Global,
Type: types.NewSystemBoolType(dsess.DoltStatsAutoRefreshEnabled),
Default: int8(0),
},
{
Name: dsess.DoltStatsMemoryOnly,
Dynamic: true,
Scope: sql.SystemVariableScope_Global,
Type: types.NewSystemBoolType(dsess.DoltStatsMemoryOnly),
Default: int8(0),
},
{
	Name:    dsess.DoltStatsAutoRefreshThreshold,
	Dynamic: true,
	Scope:   sql.SystemVariableScope_Global,
	// bug fix: the type was registered under DoltStatsAutoRefreshEnabled
	// (copy-paste error); it must carry this variable's own name so type
	// errors report the correct system variable
	Type:    types.NewSystemDoubleType(dsess.DoltStatsAutoRefreshThreshold, 0, 10),
	Default: float64(.5),
},
{
Name: dsess.DoltStatsAutoRefreshInterval,
Dynamic: true,
Scope: sql.SystemVariableScope_Global,
Type: types.NewSystemIntType(dsess.DoltStatsAutoRefreshInterval, 0, 1<<10, false),
Default: 120,
},
})
}

View File

@@ -110,8 +110,8 @@ func ExecuteSql(dEnv *env.DoltEnv, root *doltdb.RootValue, statements string) (*
return db.GetRoot(ctx)
}
func NewTestSQLCtxWithProvider(ctx context.Context, pro dsess.DoltDatabaseProvider) *sql.Context {
s, err := dsess.NewDoltSession(sql.NewBaseSession(), pro, config2.NewMapConfig(make(map[string]string)), branch_control.CreateDefaultController(ctx))
func NewTestSQLCtxWithProvider(ctx context.Context, pro dsess.DoltDatabaseProvider, statsPro sql.StatsProvider) *sql.Context {
s, err := dsess.NewDoltSession(sql.NewBaseSession(), pro, config2.NewMapConfig(make(map[string]string)), branch_control.CreateDefaultController(ctx), statsPro)
if err != nil {
panic(err)
}
@@ -132,7 +132,8 @@ func NewTestEngine(dEnv *env.DoltEnv, ctx context.Context, db dsess.SqlDatabase)
}
engine := sqle.NewDefault(pro)
sqlCtx := NewTestSQLCtxWithProvider(ctx, pro)
sqlCtx := NewTestSQLCtxWithProvider(ctx, pro, nil)
sqlCtx.SetCurrentDatabase(db.Name())
return engine, sqlCtx, nil
}

View File

@@ -0,0 +1,350 @@
#!/usr/bin/env bats
load $BATS_TEST_DIRNAME/helper/common.bash
load $BATS_TEST_DIRNAME/helper/query-server-common.bash
# Create two scratch repos for stats tests:
#   repo1: table ab with a secondary key (b,a)
#   repo2: tables xy and ab, each with a secondary key
# so that auto-refresh can be exercised across multiple databases,
# tables, and indexes.
setup() {
    skiponwindows "tests are flaky on Windows"
    if [ "$SQL_ENGINE" = "remote-engine" ]; then
      skip "This test tests remote connections directly, SQL_ENGINE is not needed."
    fi
    setup_common
    TMPDIRS=$(pwd)/tmpdirs
    mkdir -p $TMPDIRS/{repo1,repo2}

    cd $TMPDIRS/repo1
    dolt init
    dolt sql <<SQL
create table ab (a int primary key, b int, key (b,a));
SQL

    cd $TMPDIRS/repo2
    dolt init
    dolt sql <<SQL
create table xy (x int primary key, y int, key (y,x));
create table ab (a int primary key, b int, key (b,a));
SQL
    cd $TMPDIRS
}
# Stop any server the test left running, then remove the scratch dirs.
# NOTE(review): cwd may still be inside $TMPDIRS when it is removed; the
# final cd restores a valid working directory — confirm ordering is intended.
teardown() {
    teardown_common
    stop_sql_server 1
    rm -rf $TMPDIRS
    cd $BATS_TMPDIR
}
# Verifies that stats are absent until auto-refresh is enabled, that the
# refresh variables can be persisted without error, and that inserts and
# updates after enabling are picked up on the next refresh cycle.
@test "stats: empty initial stats" {
    cd repo2
    dolt sql -q "insert into xy values (0,0), (1,1)"

    start_sql_server
    sleep 1
    stop_sql_server

    # no statistics error if ref does not exist
    run dolt sql -r csv -q "select database_name, table_name, index_name from dolt_statistics"
    [ "$status" -eq 1 ]
    [[ "$output" =~ "no statistics found" ]] || false

    # setting variables doesn't hang or error
    dolt sql -q "set @@PERSIST.dolt_stats_auto_refresh_enabled = 1;"
    dolt sql -q "set @@PERSIST.dolt_stats_auto_refresh_threshold = .5"
    dolt sql -q "set @@PERSIST.dolt_stats_auto_refresh_interval = 1;"

    # auto refresh can only initialize at server startup
    start_sql_server

    # need to trigger at least one refresh cycle
    sleep 1

    # only statistics for non-empty tables are collected
    run dolt sql -r csv -q "select database_name, table_name, index_name from dolt_statistics"
    [ "$status" -eq 0 ]
    [ "${lines[0]}" = "database_name,table_name,index_name" ]
    [ "${lines[1]}" = "repo2,xy,primary" ]
    [ "${lines[2]}" = "repo2,xy,yx" ]

    # appending new chunks picked up
    dolt sql -q "insert into xy select x, 1 from (with recursive inputs(x) as (select 4 union select x+1 from inputs where x < 1000) select * from inputs) dt;"
    sleep 1
    run dolt sql -r csv -q "select count(*) from dolt_statistics"
    [ "$status" -eq 0 ]
    [ "${lines[1]}" = "8" ]

    # updates picked up
    dolt sql -q "update xy set y = 2 where x between 100 and 800"
    sleep 1
    run dolt sql -r csv -q "select count(*) from dolt_statistics"
    [ "$status" -eq 0 ]
    [ "${lines[1]}" = "8" ]
}
# Verifies that deleting more than the refresh threshold of rows shrinks the
# stored bucket count on the next refresh cycle.
@test "stats: deletes refresh" {
    cd repo2
    dolt sql -q "insert into xy select x, 1 from (with recursive inputs(x) as (select 4 union select x+1 from inputs where x < 1000) select * from inputs) dt;"

    # setting variables doesn't hang or error
    dolt sql -q "set @@persist.dolt_stats_auto_refresh_enabled = 1;"
    dolt sql -q "set @@persist.dolt_stats_auto_refresh_threshold = .5"
    dolt sql -q "set @@persist.dolt_stats_auto_refresh_interval = 1;"

    start_sql_server
    sleep 1

    run dolt sql -r csv -q "select count(*) from dolt_statistics"
    [ "$status" -eq 0 ]
    [ "${lines[1]}" = "8" ]

    # delete >50% of rows
    dolt sql -q "delete from xy where x > 500"
    sleep 1

    run dolt sql -r csv -q "select count(*) from dolt_statistics"
    [ "$status" -eq 0 ]
    [ "${lines[1]}" = "4" ]
}
# Verifies that stats appear for a newly created table once it has rows, and
# disappear when the table is truncated or dropped.
@test "stats: add/delete table" {
    cd repo1
    dolt sql -q "insert into ab values (0,0), (1,0), (2,0)"

    # setting variables doesn't hang or error
    dolt sql -q "SET @@persist.dolt_stats_auto_refresh_enabled = 1;"
    dolt sql -q "SET @@persist.dolt_stats_auto_refresh_threshold = .5"
    dolt sql -q "SET @@persist.dolt_stats_auto_refresh_interval = 1;"

    start_sql_server
    sleep 1

    run dolt sql -r csv -q "select count(*) from dolt_statistics"
    [ "$status" -eq 0 ]
    [ "${lines[1]}" = "2" ]

    # add table
    dolt sql -q "create table xy (x int primary key, y int)"
    # schema changes don't impact the table hash
    dolt sql -q "insert into xy values (0,0)"
    sleep 1

    run dolt sql -r csv -q "select count(*) from dolt_statistics where table_name = 'xy'"
    [ "$status" -eq 0 ]
    [ "${lines[1]}" = "1" ]

    dolt sql -q "truncate table xy"
    sleep 1

    dolt sql -q "select * from xy"
    dolt sql -q "select * from dolt_statistics where table_name = 'xy'"
    run dolt sql -r csv -q "select count(*) from dolt_statistics where table_name = 'xy'"
    [ "$status" -eq 0 ]
    [ "${lines[1]}" = "0" ]

    dolt sql -q "drop table xy"
    run dolt sql -r csv -q "select count(*) from dolt_statistics where table_name = 'xy'"
    [ "$status" -eq 0 ]
    [ "${lines[1]}" = "0" ]
}
# Verifies that dropping a secondary index removes its stats and re-adding it
# restores them; row inserts force the table hash to change between refreshes.
@test "stats: add/delete index" {
    cd repo2
    dolt sql -q "insert into xy values (0,0), (1,0), (2,0)"

    # setting variables doesn't hang or error
    dolt sql -q "SET @@persist.dolt_stats_auto_refresh_enabled = 1;"
    dolt sql -q "SET @@persist.dolt_stats_auto_refresh_threshold = .5"
    dolt sql -q "SET @@persist.dolt_stats_auto_refresh_interval = 1;"

    start_sql_server
    sleep 1

    run dolt sql -r csv -q "select count(*) from dolt_statistics"
    [ "$status" -eq 0 ]
    [ "${lines[1]}" = "2" ]

    # delete secondary
    dolt sql -q "alter table xy drop index yx"
    # schema changes don't impact the table hash
    dolt sql -q "insert into xy values (3,0)"
    sleep 1

    run dolt sql -r csv -q "select count(*) from dolt_statistics"
    [ "$status" -eq 0 ]
    [ "${lines[1]}" = "1" ]

    dolt sql -q "alter table xy add index yx (y,x)"
    # row change to impact table hash
    dolt sql -q "insert into xy values (4,0)"
    sleep 1

    run dolt sql -r csv -q "select count(*) from dolt_statistics"
    [ "$status" -eq 0 ]
    [ "${lines[1]}" = "2" ]
}
# Verifies that the mcv (most common value) columns of dolt_statistics track
# value-frequency changes after updates trigger a refresh.
@test "stats: most common values" {
    cd repo2
    dolt sql -q "alter table xy add index (y)"
    dolt sql -q "insert into xy values (0,0), (1,0), (2,0), (3,0), (4,0), (5,0)"

    # setting variables doesn't hang or error
    dolt sql -q "SET @@persist.dolt_stats_auto_refresh_enabled = 1;"
    dolt sql -q "SET @@persist.dolt_stats_auto_refresh_threshold = .5"
    dolt sql -q "SET @@persist.dolt_stats_auto_refresh_interval = 1;"

    # auto refresh can only initialize at server startup
    start_sql_server

    # need to trigger at least one refresh cycle
    sleep 1

    run dolt sql -r csv -q "select mcv1 from dolt_statistics where index_name = 'y'"
    [ "$status" -eq 0 ]
    [ "${lines[1]}" = "0" ]

    sleep 1

    dolt sql -q "update xy set y = 2 where x between 0 and 3"

    sleep 1

    run dolt sql -r csv -q "select mcv1 as mcv from dolt_statistics where index_name = 'y' union select mcv2 as mcv from dolt_statistics where index_name = 'y' order by mcv"
    [ "$status" -eq 0 ]
    [ "${lines[1]}" = "0" ]
    [ "${lines[2]}" = "2" ]
}
# Verifies that auto-refresh collects stats for every database on the server,
# and that dolt_statistics is scoped per database (qualified access works).
@test "stats: multi db" {
    cd repo1
    dolt sql -q "insert into ab values (0,0), (1,1)"

    cd ../repo2
    dolt sql -q "insert into ab values (0,0), (1,1)"
    dolt sql -q "insert into xy values (0,0), (1,1)"

    cd ..

    start_sql_server
    sleep 1
    stop_sql_server

    run dolt sql -r csv -q "select database_name, table_name, index_name from dolt_statistics"
    [ "$status" -eq 1 ]
    [[ "$output" =~ "no statistics found" ]] || false

    dolt sql -q "SET @@persist.dolt_stats_auto_refresh_enabled = 1;"
    dolt sql -q "SET @@persist.dolt_stats_auto_refresh_threshold = 0.5"
    dolt sql -q "SET @@persist.dolt_stats_auto_refresh_interval = 1;"

    start_sql_server
    sleep 1

    dolt sql -q "use repo1"
    run dolt sql -r csv -q "select database_name, table_name, index_name from dolt_statistics order by index_name"
    [ "$status" -eq 0 ]
    [ "${lines[0]}" = "database_name,table_name,index_name" ]
    [ "${lines[1]}" = "repo1,ab,ba" ]
    [ "${lines[2]}" = "repo1,ab,primary" ]

    run dolt sql -r csv -q "select database_name, table_name, index_name from repo2.dolt_statistics order by index_name"
    [ "$status" -eq 0 ]
    [ "${lines[0]}" = "database_name,table_name,index_name" ]
    [ "${lines[1]}" = "repo2,ab,ba" ]
    [ "${lines[2]}" = "repo2,ab,primary" ]
    [ "${lines[3]}" = "repo2,xy,primary" ]
    [ "${lines[4]}" = "repo2,xy,yx" ]
}
# Verifies stats lifecycle across database create/drop/recreate while the
# server is running: stats appear for a new db's populated tables, vanish
# when the db is dropped, and re-initialize on first insert after recreate.
@test "stats: add/delete database" {
    cd repo1

    # setting variables doesn't hang or error
    dolt sql -q "SET @@persist.dolt_stats_auto_refresh_enabled = 1;"
    dolt sql -q "SET @@persist.dolt_stats_auto_refresh_threshold = .5"
    dolt sql -q "SET @@persist.dolt_stats_auto_refresh_interval = 1;"

    start_sql_server
    dolt sql -q "insert into ab values (0,0), (1,0), (2,0)"

    dolt sql <<SQL
create database repo2;
create table repo2.xy (x int primary key, y int, key(y,x));
insert into repo2.xy values (0,0), (1,0), (2,0);
SQL
    sleep 1

    # specify database_name filter even though can only see active db stats
    run dolt sql -r csv <<SQL
use repo2;
select count(*) from dolt_statistics where database_name = 'repo2';
SQL
    [ "$status" -eq 0 ]
    [ "${lines[2]}" = "2" ]

    # drop repo2
    dolt sql -q "drop database repo2"

    sleep 1

    # we can't access repo2 stats, but still try
    run dolt sql -r csv <<SQL
select count(*) from dolt_statistics where database_name = 'repo2';
SQL
    [ "$status" -eq 0 ]
    [ "${lines[1]}" = "0" ]

    dolt sql <<SQL
create database repo2;
create table repo2.xy (x int primary key, y int, key(y,x));
SQL

    sleep 1

    # no rows yet
    run dolt sql -r csv <<SQL
use repo2;
select count(*) from dolt_statistics where database_name = 'repo2';
SQL
    [ "$status" -eq 0 ]
    [ "${lines[2]}" = "0" ]

    dolt sql <<SQL
use repo2;
insert into xy values (0,0);
SQL

    sleep 1

    # insert initializes stats
    run dolt sql -r csv <<SQL
use repo2;
select count(*) from dolt_statistics where database_name = 'repo2';
SQL
    [ "$status" -eq 0 ]
    [ "${lines[2]}" = "2" ]
}