go: sqle/statspro: prollyStats: FlushFromMem: Fix long-held lock when flushing stats to disk.

For sql-server instances with many databases, many branches, or many tables and
indexes, flushing stats could cause queries to block until stats were finished
flushing. StatsController attempted to rate limit how much I/O bandwidth
would be used to flush stats, but this is currently counter-productive because
all queries are blocked from analyzing successfully until stats are flushed.

This changes it back to flush stats to disk as quickly as possible when we need
to do it. Computing stats is already rate limited, so for large flushes this
should not happen too often.

We will continue to improve things here so that stats writes have less impact
on server resource utilization and do not block user-facing work as they
proceed.
This commit is contained in:
Aaron Son
2025-05-20 15:42:53 -07:00
parent f829604e39
commit a90a4bca07
2 changed files with 244 additions and 23 deletions
@@ -0,0 +1,222 @@
// Copyright 2025 Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package main
import (
"context"
"errors"
"fmt"
"math/rand/v2"
"strings"
"testing"
"time"
"github.com/google/uuid"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"
driver "github.com/dolthub/dolt/go/libraries/doltcore/dtestutils/sql_server_driver"
)
// TestStatsGCConcurrency asserts that a concurrent `call dolt_stats_gc()`
// does not block user-facing queries for a long time.
//
// At one time, it was possible for a GC of stats to block the
// analyzer from accessing stats for a long time. This test sets
// up the following condition:
//
//   - Several branches (numbranches plus main), each with 10 tables,
//     each with 10 unique (and therefore indexed) columns. All of the
//     branches and tables and indexes are non-empty.
//
//   - One thread calls `call dolt_stats_gc()`.
//
//   - Other threads continue to connect to the database and run
//     queries.
//
//   - We expect all queries to complete without much delay. If
//     there is a long delay on the read queries, this fails the
//     test.
func TestStatsGCConcurrency(t *testing.T) {
	t.Parallel()
	ctx := context.Background()

	// Spin up a sql-server backed by a fresh repository.
	u, err := driver.NewDoltUser()
	require.NoError(t, err)
	t.Cleanup(func() {
		u.Cleanup()
	})
	rs, err := u.MakeRepoStore()
	require.NoError(t, err)
	repo, err := rs.MakeRepo("stats_gc_concurrency_test")
	require.NoError(t, err)
	srvSettings := &driver.Server{
		Args:        []string{"--port", `{{get_port "server"}}`},
		DynamicPort: "server",
	}
	var ports DynamicResources
	ports.global = &GlobalPorts
	ports.t = t
	server := MakeServer(t, repo, srvSettings, &ports)
	server.DBName = "stats_gc_concurrency_test"

	// Connect to the database.
	db, err := server.DB(driver.Connection{User: "root"})
	require.NoError(t, err)
	t.Cleanup(func() {
		db.Close()
	})
	db.SetMaxOpenConns(32)
	db.SetMaxIdleConns(32)

	numbranches := 3
	var branches = []string{"main"}
	for i := 0; i < numbranches; i++ {
		branches = append(branches, uuid.New().String())
	}
	var tables = []string{
		"t0", "t1", "t2", "t3", "t4",
		"t5", "t6", "t7", "t8", "t9",
	}
	var columns = []string{
		"c0", "c1", "c2", "c3", "c4",
		"c5", "c6", "c7", "c8", "c9",
	}
	// Each `unique` column carries its own index, so every table has
	// 10 secondary indexes plus the primary key.
	var columndefs = []string{}
	for _, col := range columns {
		columndefs = append(columndefs, fmt.Sprintf("`%s` int unique", col))
	}
	for _, table := range tables {
		_, err = db.ExecContext(ctx, fmt.Sprintf(strings.Join([]string{
			"CREATE TABLE `%s` (",
			"id INT AUTO_INCREMENT PRIMARY KEY,",
			strings.Join(columndefs, ",\n"),
			");",
		}, "\n"), table))
		require.NoError(t, err)
	}
	_, err = db.ExecContext(ctx, "call dolt_commit('-Am', 'initial commit')")
	require.NoError(t, err)

	// Give every table on every branch (including main) at least one row
	// so that stats are non-trivial everywhere.
	for _, branch := range branches[1:] {
		_, err = db.ExecContext(ctx, fmt.Sprintf("call dolt_checkout('-b', '%s')", branch))
		require.NoError(t, err)
		for _, table := range tables {
			_, err = db.ExecContext(ctx, fmt.Sprintf("insert into `%s` values (%d, %d, %d, %d, %d, %d, %d, %d, %d, %d, %d)",
				table, rand.IntN(1<<29), rand.IntN(1<<29), rand.IntN(1<<29), rand.IntN(1<<29), rand.IntN(1<<29), rand.IntN(1<<29),
				rand.IntN(1<<29), rand.IntN(1<<29), rand.IntN(1<<29), rand.IntN(1<<29), rand.IntN(1<<29)))
			require.NoError(t, err)
		}
	}
	_, err = db.ExecContext(ctx, "call dolt_checkout('main')")
	require.NoError(t, err)
	for _, table := range tables {
		_, err = db.ExecContext(ctx, fmt.Sprintf("insert into `%s` values (%d, %d, %d, %d, %d, %d, %d, %d, %d, %d, %d)",
			table, rand.IntN(1<<29), rand.IntN(1<<29), rand.IntN(1<<29), rand.IntN(1<<29), rand.IntN(1<<29), rand.IntN(1<<29),
			rand.IntN(1<<29), rand.IntN(1<<29), rand.IntN(1<<29), rand.IntN(1<<29), rand.IntN(1<<29)))
		require.NoError(t, err)
	}

	numThreads := 10
	// The workers will run until this is closed. This will be closed after the stats_gc() is finished.
	stop := make(chan struct{})
	// maxQuery[i] is written only by worker i and read only after
	// eg.Wait() returns, so no additional synchronization is needed.
	maxQuery := make([]time.Duration, numThreads)
	eg, egCtx := errgroup.WithContext(ctx)
	for i := 0; i < numThreads; i++ {
		eg.Go(func() error {
			for {
				select {
				case <-stop:
					return nil
				default:
				}
				// Each iteration runs in its own closure so the defers
				// (latency recording, conn/rows cleanup) fire per query.
				err := func() (err error) {
					start := time.Now()
					defer func() {
						elapsed := time.Since(start)
						if elapsed > maxQuery[i] {
							maxQuery[i] = elapsed
						}
					}()
					conn, err := db.Conn(egCtx)
					if err != nil {
						return err
					}
					defer func() {
						err = errors.Join(err, conn.Close())
					}()
					branch := branches[rand.IntN(len(branches))]
					_, err = conn.ExecContext(egCtx, fmt.Sprintf("call dolt_checkout('%s')", branch))
					if err != nil {
						return err
					}
					// Join two distinct random tables on random indexed
					// columns, the kind of query the analyzer consults
					// stats for.
					t1 := tables[rand.IntN(len(tables))]
					t2 := tables[rand.IntN(len(tables))]
					for t2 == t1 {
						t2 = tables[rand.IntN(len(tables))]
					}
					c1 := columns[rand.IntN(len(columns))]
					c2 := columns[rand.IntN(len(columns))]
					rows, err := conn.QueryContext(egCtx, fmt.Sprintf("select * from %s join %s on %s.%s = %s.%s",
						t1, t2, t1, c1, t2, c2))
					if err != nil {
						return err
					}
					defer func() {
						err = errors.Join(err, rows.Close())
					}()
					for rows.Next() {
					}
					// Surface any error that terminated iteration; the
					// original code silently dropped it.
					return rows.Err()
				}()
				if err != nil {
					return err
				}
			}
		})
	}
	var gcDuration time.Duration
	eg.Go(func() (err error) {
		// Closing stop releases all the query workers once GC is done.
		defer close(stop)
		start := time.Now()
		defer func() {
			gcDuration = time.Since(start)
		}()
		conn, err := db.Conn(egCtx)
		if err != nil {
			return err
		}
		defer func() {
			err = errors.Join(err, conn.Close())
		}()
		_, err = conn.ExecContext(egCtx, "call dolt_stats_gc()")
		if err != nil {
			return err
		}
		return nil
	})
	require.NoError(t, eg.Wait())
	for _, d := range maxQuery {
		// Every worker's slowest query must finish in under three seconds
		// and must be faster than the GC run it overlapped with.
		assert.Less(t, d, 3*time.Second)
		assert.Greater(t, gcDuration, d)
	}
}