Revamp the Import Benchmarker (#3744)

Vinai Rachakonda
2022-07-13 18:27:14 -07:00
committed by GitHub
parent 35a6eeca6b
commit 3f985c76d1
22 changed files with 1356 additions and 504 deletions

View File

@@ -0,0 +1,80 @@
#!/bin/sh
set -e
if [ "$#" -lt 8 ]; then
echo "Usage: ./get-dolt-dolt-job-json.sh <jobName> <fromServer> <fromVersion> <toServer> <toVersion> <timePrefix> <actorPrefix> <issueNumber>"
exit 1
fi
jobName="$1"
fromServer="$2"
fromVersion="$3"
toServer="$4"
toVersion="$5"
timePrefix="$6"
actorPrefix="$7"
issueNumber="$8" # TODO: Use this to paste the results onto the github issue
echo '
{
"apiVersion": "batch/v1",
"kind": "Job",
"metadata": {
"name": "'$jobName'",
"namespace": "performance-benchmarking"
},
"spec": {
"backoffLimit": 1,
"template": {
"spec": {
"serviceAccountName": "import-benchmarking",
"containers": [
{
"name": "import-benchmarking",
"image": "407903926827.dkr.ecr.us-west-2.amazonaws.com/liquidata/import-benchmarking:latest",
"resources": {
"limits": {
"cpu": "7000m"
}
},
"env": [
{ "name": "GOMAXPROCS", "value": "7" },
{ "name": "ACTOR", "value": "'$ACTOR'" },
{ "name": "ACTOR_EMAIL", "value": "'$ACTOR_EMAIL'" },
{ "name": "REPO_ACCESS_TOKEN", "value": "'$REPO_ACCESS_TOKEN'" }
],
"imagePullPolicy": "Always",
"args": [
"--from-server='$fromServer'",
"--from-version='$fromVersion'",
"--to-server='$toServer'",
"--to-version='$toVersion'",
"--bucket=import-benchmarking-github-actions-results",
"--region=us-west-2",
"--results-dir='$timePrefix'",
"--results-prefix='$actorPrefix'",
"--fileNames=100k-sorted.csv",
"--fileNames=100k-random.csv",
"--fileNames=1m-sorted.csv",
"--fileNames=1m-random.csv"
]
}
],
"restartPolicy": "Never",
"nodeSelector": {
"performance-benchmarking-worker": "true"
},
"tolerations": [
{
"effect": "NoSchedule",
"key": "dedicated",
"operator": "Equal",
"value": "performance-benchmarking-worker"
}
]
}
}
}
}
'

View File

@@ -0,0 +1,82 @@
#!/bin/sh
set -e
if [ "$#" -lt 8 ]; then
echo "Usage: ./get-mysql-dolt-job-json.sh <jobName> <fromServer> <fromVersion> <toServer> <toVersion> <timePrefix> <actorPrefix> <issueNumber>"
exit 1
fi
jobName="$1"
fromServer="$2"
fromVersion="$3"
toServer="$4"
toVersion="$5"
timePrefix="$6"
actorPrefix="$7"
issueNumber="$8" # TODO: Use this to paste the results onto the github issue
echo '
{
"apiVersion": "batch/v1",
"kind": "Job",
"metadata": {
"name": "'$jobName'",
"namespace": "performance-benchmarking"
},
"spec": {
"backoffLimit": 1,
"template": {
"spec": {
"serviceAccountName": "import-benchmarking",
"containers": [
{
"name": "import-benchmarking",
"image": "407903926827.dkr.ecr.us-west-2.amazonaws.com/liquidata/import-benchmarking:latest",
"resources": {
"limits": {
"cpu": "7000m"
}
},
"env": [
{ "name": "GOMAXPROCS", "value": "7" },
{ "name": "ACTOR", "value": "'$ACTOR'" },
{ "name": "ACTOR_EMAIL", "value": "'$ACTOR_EMAIL'" },
{ "name": "REPO_ACCESS_TOKEN", "value": "'$REPO_ACCESS_TOKEN'" }
],
"imagePullPolicy": "Always",
"args": [
"--from-server='$fromServer'",
"--from-version='$fromVersion'",
"--to-server='$toServer'",
"--to-version='$toVersion'",
"--bucket=import-benchmarking-github-actions-results",
"--region=us-west-2",
"--results-dir='$timePrefix'",
"--results-prefix='$actorPrefix'",
"--mysql-exec=/usr/sbin/mysqld",
"--mysql-schema-file=schema.sql",
"--fileNames=100k-sorted.csv",
"--fileNames=100k-random.csv",
"--fileNames=1m-sorted.csv",
"--fileNames=1m-random.csv"
]
}
],
"restartPolicy": "Never",
"nodeSelector": {
"performance-benchmarking-worker": "true"
},
"tolerations": [
{
"effect": "NoSchedule",
"key": "dedicated",
"operator": "Equal",
"value": "performance-benchmarking-worker"
}
]
}
}
}
}
'

View File

@@ -0,0 +1,71 @@
#!/bin/bash
set -e
if [ -z "$KUBECONFIG" ]; then
echo "Must set KUBECONFIG"
exit 1
fi
if [ -z "$TEMPLATE_SCRIPT" ]; then
echo "Must set TEMPLATE_SCRIPT"
exit 1
fi
if [ -z "$FROM_SERVER" ] || [ -z "$FROM_VERSION" ] || [ -z "$TO_SERVER" ] || [ -z "$TO_VERSION" ]; then
echo "Must set FROM_SERVER FROM_VERSION TO_SERVER and TO_VERSION"
exit 1
fi
if [ -z "$ACTOR" ]; then
echo "Must set ACTOR"
exit 1
fi
if [ -z "$MODE" ]; then
echo "Must set MODE"
exit 1
fi
echo "Setting from $FROM_SERVER: $FROM_VERSION"
# use first 8 characters of TO_VERSION to differentiate
# jobs
short=${TO_VERSION:0:8}
lowered=$(echo "$ACTOR" | tr '[:upper:]' '[:lower:]')
actorShort="$lowered-$short"
# random sleep
sleep 0.$[ ( $RANDOM % 10 ) + 1 ]s
timesuffix=`date +%s%N`
jobname="$actorShort-$timesuffix"
timeprefix=$(date +%Y/%m/%d)
actorprefix="$MODE/$ACTOR/$actorShort"
# set value to ISSUE_NUMBER environment variable
# or default to -1
issuenumber=${ISSUE_NUMBER:-"-1"}
source \
"$TEMPLATE_SCRIPT" \
"$jobname" \
"$FROM_SERVER" \
"$FROM_VERSION" \
"$TO_SERVER" \
"$TO_VERSION" \
"$timeprefix" \
"$actorprefix" \
"$issuenumber" > job.json
out=$(KUBECONFIG="$KUBECONFIG" kubectl apply -f job.json || true)
if [ "$out" != "job.batch/$jobname created" ]; then
echo "something went wrong creating job... this job likely already exists in the cluster"
echo "$out"
exit 1
else
echo "$out"
fi
exit 0

View File

@@ -0,0 +1,98 @@
name: Run Import Benchmark on Pull Requests
on:
pull_request:
types: [ opened ]
issue_comment:
types: [ created ]
jobs:
validate-commentor:
runs-on: ubuntu-18.04
outputs:
valid: ${{ steps.set_valid.outputs.valid }}
steps:
- uses: actions/checkout@v2
- name: Validate Commentor
id: set_valid
run: ./.github/scripts/performance-benchmarking/validate-commentor.sh "$ACTOR"
env:
ACTOR: ${{ github.actor }}
check-comments:
runs-on: ubuntu-18.04
needs: validate-commentor
if: ${{ needs.validate-commentor.outputs.valid == 'true' }}
outputs:
benchmark: ${{ steps.set_benchmark.outputs.benchmark }}
comment-body: ${{ steps.set_body.outputs.body }}
steps:
- name: Check for Deploy Trigger
uses: dolthub/pull-request-comment-trigger@master
id: check
with:
trigger: '#import-benchmark'
reaction: rocket
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
- name: Set Benchmark
if: ${{ steps.check.outputs.triggered == 'true' }}
id: set_benchmark
run: |
echo "::set-output name=benchmark::true"
performance:
runs-on: ubuntu-18.04
needs: [validate-commentor, check-comments]
if: ${{ needs.check-comments.outputs.benchmark == 'true' }}
name: Benchmark Import Performance
steps:
- name: Checkout
uses: actions/checkout@v2
- uses: azure/setup-kubectl@v2.0
with:
version: 'v1.23.6'
- name: Install aws-iam-authenticator
run: |
curl -o aws-iam-authenticator https://amazon-eks.s3.us-west-2.amazonaws.com/1.18.8/2020-09-18/bin/linux/amd64/aws-iam-authenticator && \
chmod +x ./aws-iam-authenticator && \
sudo cp ./aws-iam-authenticator /usr/local/bin/aws-iam-authenticator
aws-iam-authenticator version
- name: Configure AWS Credentials
uses: aws-actions/configure-aws-credentials@v1
with:
aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }}
aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
aws-region: us-west-2
- uses: xt0rted/pull-request-comment-branch@v1
id: comment-branch
with:
repo_token: ${{ secrets.GITHUB_TOKEN }}
- name: Create and Auth kubeconfig
run: |
echo "$CONFIG" > kubeconfig
KUBECONFIG=kubeconfig kubectl config set-credentials github-actions-dolt --exec-api-version=client.authentication.k8s.io/v1alpha1 --exec-command=aws-iam-authenticator --exec-arg=token --exec-arg=-i --exec-arg=eks-cluster-1
KUBECONFIG=kubeconfig kubectl config set-context github-actions-dolt-context --cluster=eks-cluster-1 --user=github-actions-dolt --namespace=performance-benchmarking
KUBECONFIG=kubeconfig kubectl config use-context github-actions-dolt-context
env:
CONFIG: ${{ secrets.CORP_KUBECONFIG }}
- name: Get pull number
uses: actions/github-script@v3
id: get_pull_number
with:
github-token: ${{ secrets.GITHUB_TOKEN }}
script: core.setOutput("pull_number", JSON.stringify(context.issue.number));
- name: Run benchmarks
id: run-benchmarks
run: ./.github/scripts/import-benchmarking/run-benchmarks.sh
env:
FROM_SERVER: "dolt"
FROM_VERSION: ${{ github.sha }}
TO_SERVER: "dolt"
TO_VERSION: ${{ steps.comment-branch.outputs.head_sha }}
MODE: 'pullRequest'
ISSUE_NUMBER: ${{ steps.get_pull_number.outputs.pull_number }}
ACTOR: ${{ github.actor }}
REPO_ACCESS_TOKEN: ${{ secrets.REPO_ACCESS_TOKEN }}
KUBECONFIG: "./kubeconfig"
TEMPLATE_SCRIPT: "./.github/scripts/import-benchmarking/get-dolt-dolt-job-json.sh"

View File

@@ -0,0 +1,16 @@
## Import Benchmarker
This library benchmarks `dolt table import` with csv/json files and `dolt sql` with SQL data. It uses
Go's `testing.B` package to time the import commands.
### Test Files
This package uses several test files, stored in a private S3 bucket (import-benchmarking-github-actions-results),
that cover different sort orders, primary keys, etc.
You can use the sample config in the `cmd` package to benchmark against a sample set of files. If a file path
is not specified, the benchmarker generates a random test file for you.
### Notes
* Name your table `test` in the schema file used for MySQL provisioning.
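### Usage
The `cmd` package wires this together behind a `-config` flag. Below is a minimal sketch of programmatic use that mirrors `cmd/main.go` in this commit; the `sample-config.json` path is only an example and should point at your own config file.

```go
package main

import (
	"log"

	"github.com/dolthub/dolt/go/performance/import_benchmarker"
)

func main() {
	// Load a JSON config; FromFileConfig validates it and fills in defaults.
	config, err := import_benchmarker.FromFileConfig("sample-config.json")
	if err != nil {
		log.Fatal(err)
	}

	// Run every job in the config and serialize the results to a csv file
	// (benchmark_results-YYYY-MM-DD.csv) in the working directory.
	wd := import_benchmarker.GetWorkingDir()
	results := import_benchmarker.RunBenchmarkTests(config, wd)
	resultsFile := import_benchmarker.SerializeResults(results, wd, "results", "csv")
	log.Printf("wrote results to %s", resultsFile)
}
```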

View File

@@ -1,248 +0,0 @@
// Copyright 2019 Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package main
import (
"context"
"fmt"
"log"
"os"
"path/filepath"
"testing"
"github.com/dolthub/dolt/go/libraries/utils/file"
"github.com/dolthub/dolt/go/cmd/dolt/commands"
"github.com/dolthub/dolt/go/cmd/dolt/commands/tblcmds"
"github.com/dolthub/dolt/go/libraries/doltcore/dbfactory"
"github.com/dolthub/dolt/go/libraries/doltcore/doltdb"
"github.com/dolthub/dolt/go/libraries/doltcore/env"
"github.com/dolthub/dolt/go/libraries/utils/filesys"
"github.com/dolthub/dolt/go/libraries/utils/test"
"github.com/dolthub/dolt/go/store/types"
)
const (
testHomeDir = "/user/tester"
)
type doltCommandFunc func(ctx context.Context, commandStr string, args []string, dEnv *env.DoltEnv) int
func removeTempDoltDataDir(fs filesys.Filesys) {
cwd, err := os.Getwd()
if err != nil {
log.Fatal(err)
}
doltDir := filepath.Join(cwd, dbfactory.DoltDir)
exists, _ := fs.Exists(doltDir)
if exists {
err := fs.Delete(doltDir, true)
if err != nil {
log.Fatal(err)
}
}
}
func getWorkingDir(fs filesys.Filesys) string {
workingDir := test.TestDir(testHomeDir)
err := fs.MkDirs(workingDir)
if err != nil {
log.Fatal(err)
}
return workingDir
}
func createTestEnvWithFS(fs filesys.Filesys, workingDir string) *env.DoltEnv {
removeTempDoltDataDir(fs)
testHomeDirFunc := func() (string, error) { return workingDir, nil }
const name = "test mcgibbins"
const email = "bigfakeytester@fake.horse"
dEnv := env.Load(context.Background(), testHomeDirFunc, fs, doltdb.LocalDirDoltDB, "test")
err := dEnv.InitRepo(context.Background(), types.Format_Default, name, email, env.DefaultInitBranch)
if err != nil {
panic("Failed to initialize environment")
}
return dEnv
}
// BenchmarkDoltImport returns a function that runs benchmarks for importing
// a test dataset into Dolt
func BenchmarkDoltImport(rows int, cols []*SeedColumn, format string) func(b *testing.B) {
fs := filesys.LocalFS
wd := getWorkingDir(fs)
return func(b *testing.B) {
doltImport(b, fs, rows, cols, wd, format)
}
}
// BenchmarkDoltExport returns a function that runs benchmarks for exporting
// a test dataset out of Dolt
func BenchmarkDoltExport(rows int, cols []*SeedColumn, format string) func(b *testing.B) {
fs := filesys.LocalFS
wd := getWorkingDir(fs)
return func(b *testing.B) {
doltExport(b, fs, rows, cols, wd, format)
}
}
// BenchmarkDoltSQLSelect returns a function that runs benchmarks for executing a sql query
// against a Dolt table
func BenchmarkDoltSQLSelect(rows int, cols []*SeedColumn, format string) func(b *testing.B) {
fs := filesys.LocalFS
wd := getWorkingDir(fs)
return func(b *testing.B) {
doltSQLSelect(b, fs, rows, cols, wd, format)
}
}
func doltImport(b *testing.B, fs filesys.Filesys, rows int, cols []*SeedColumn, workingDir, format string) {
pathToImportFile := filepath.Join(workingDir, fmt.Sprintf("testData%s", format))
oldStdin := os.Stdin
defer func() { os.Stdin = oldStdin }()
commandFunc, commandStr, args, dEnv := getBenchmarkingTools(fs, rows, cols, workingDir, pathToImportFile, format)
runBenchmark(b, commandFunc, commandStr, args, dEnv)
}
func doltExport(b *testing.B, fs filesys.Filesys, rows int, cols []*SeedColumn, workingDir, format string) {
pathToImportFile := filepath.Join(workingDir, fmt.Sprintf("testData%s", format))
oldStdin := os.Stdin
commandFunc, commandStr, args, dEnv := getBenchmarkingTools(fs, rows, cols, workingDir, pathToImportFile, format)
// import
status := commandFunc(context.Background(), commandStr, args, dEnv)
if status != 0 {
log.Fatalf("failed to import table successfully with exit code %d \n", status)
}
// revert stdin
os.Stdin = oldStdin
args = []string{"-f", "testTable", pathToImportFile}
runBenchmark(b, tblcmds.ExportCmd{}.Exec, "dolt table export", args, dEnv)
}
func doltSQLSelect(b *testing.B, fs filesys.Filesys, rows int, cols []*SeedColumn, workingDir, format string) {
testTable := "testTable"
pathToImportFile := filepath.Join(workingDir, fmt.Sprintf("testData%s", format))
oldStdin := os.Stdin
commandFunc, commandStr, args, dEnv := getBenchmarkingTools(fs, rows, cols, workingDir, pathToImportFile, format)
// import
status := commandFunc(context.Background(), commandStr, args, dEnv)
if status != 0 {
log.Fatalf("failed to import table successfully with exit code %d \n", status)
}
// revert stdin
os.Stdin = oldStdin
args = []string{"-q", fmt.Sprintf("select count(*) from %s", testTable)}
runBenchmark(b, commands.SqlCmd{}.Exec, "dolt sql", args, dEnv)
}
func runBenchmark(b *testing.B, commandFunc doltCommandFunc, commandStr string, args []string, dEnv *env.DoltEnv) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
status := commandFunc(context.Background(), commandStr, args, dEnv)
if status != 0 {
log.Fatalf("running benchmark failed with exit code... %d \n", status)
}
}
}
func getBenchmarkingTools(fs filesys.Filesys, rows int, cols []*SeedColumn, workingDir, pathToImportFile, format string) (commandFunc doltCommandFunc, commandStr string, args []string, dEnv *env.DoltEnv) {
testTable := "testTable"
sch := NewSeedSchema(rows, cols, format)
switch format {
case csvExt:
dEnv = setupDEnvImport(fs, sch, workingDir, testTable, "", pathToImportFile)
args = []string{"-c", "-f", testTable, pathToImportFile}
commandStr = "dolt table import"
commandFunc = tblcmds.ImportCmd{}.Exec
case sqlExt:
dEnv = setupDEnvImport(fs, sch, workingDir, testTable, "", pathToImportFile)
args = []string{}
commandStr = "dolt sql"
commandFunc = commands.SqlCmd{}.Exec
stdin := getStdinForSQLBenchmark(fs, pathToImportFile)
os.Stdin = stdin
case jsonExt:
pathToSchemaFile := filepath.Join(workingDir, fmt.Sprintf("testSchema%s", format))
dEnv = setupDEnvImport(fs, sch, workingDir, testTable, pathToSchemaFile, pathToImportFile)
args = []string{"-c", "-f", "-s", pathToSchemaFile, testTable, pathToImportFile}
commandStr = "dolt table import"
commandFunc = tblcmds.ImportCmd{}.Exec
default:
log.Fatalf("cannot import file, unsupported file format %s \n", format)
}
return commandFunc, commandStr, args, dEnv
}
func setupDEnvImport(fs filesys.Filesys, sch *SeedSchema, workingDir, tableName, pathToSchemaFile, pathToImportFile string) *env.DoltEnv {
wc, err := fs.OpenForWrite(pathToImportFile, os.ModePerm)
if err != nil {
log.Fatal(err)
}
defer wc.Close()
ds := NewDSImpl(wc, sch, seedRandom, tableName)
if pathToSchemaFile != "" {
// write schema file
err := fs.WriteFile(pathToSchemaFile, sch.Bytes())
if err != nil {
panic("unable to write data file to filesystem")
}
}
ds.GenerateData()
return createTestEnvWithFS(fs, workingDir)
}
func getStdinForSQLBenchmark(fs filesys.Filesys, pathToImportFile string) *os.File {
content, err := fs.ReadFile(pathToImportFile)
if err != nil {
log.Fatal(err)
}
tmpfile, err := os.CreateTemp("", "temp")
if err != nil {
log.Fatal(err)
}
defer file.Remove(tmpfile.Name()) // clean up
if _, err := tmpfile.Write(content); err != nil {
log.Fatal(err)
}
if err := tmpfile.Close(); err != nil {
log.Fatal(err)
}
f, err := os.Open(tmpfile.Name())
if err != nil {
log.Fatal(err)
}
return f
}

View File

@@ -0,0 +1,53 @@
// Copyright 2019 Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package main
import (
"flag"
"log"
"os"
"github.com/dolthub/dolt/go/performance/import_benchmarker"
)
const (
resultsTableName = "results"
)
var configPath = flag.String("config", "", "the path to a config file")
func main() {
flag.Parse()
// Construct a config
config, err := import_benchmarker.NewDefaultImportBenchmarkConfig()
if *configPath != "" {
config, err = import_benchmarker.FromFileConfig(*configPath)
}
if err != nil {
log.Fatal(err.Error())
}
// Get the working directory the tests will be executing in
wd := import_benchmarker.GetWorkingDir()
// Run the benchmark tests and collect the results.
results := import_benchmarker.RunBenchmarkTests(config, wd)
import_benchmarker.SerializeResults(results, wd, resultsTableName, "csv")
os.Exit(0)
}

View File

@@ -0,0 +1,27 @@
{
"Jobs": [
{
"Name": "Medium Dolt import with Sorted Rows",
"NumRows": 1000000,
"Sorted": true,
"Format": "csv",
"FilePath": "/Users/vinairachakonda/misc/import-benchmark-files/100k-sorted.csv",
"Program": "mysql",
"Version": "8.0.22",
"ExecPath": "/usr/local/bin/mysqld",
"SchemaPath": "/Users/vinairachakonda/go/src/dolthub/dolt/go/performance/import_benchmarker/cmd/schema.sql"
},
{
"Name": "Medium MySQL import with Sorted Rows",
"NumRows": 1000000,
"Sorted": true,
"Format": "csv",
"FilePath": "/Users/vinairachakonda/misc/import-benchmark-files/100k-sorted.csv",
"Program": "dolt",
"Version": "HEAD",
"ExecPath": "/Users/vinairachakonda/go/bin/dolt",
"SchemaPath": "/Users/vinairachakonda/go/src/dolthub/dolt/go/performance/import_benchmarker/cmd/schema.sql"
}
],
"MysqlConnectionProtocol": "tcp"
}

View File

@@ -0,0 +1,13 @@
CREATE TABLE `test` (
`pk` int NOT NULL,
`c1` bigint DEFAULT NULL,
`c2` char(1) DEFAULT NULL,
`c3` datetime DEFAULT NULL,
`c4` double DEFAULT NULL,
`c5` tinyint DEFAULT NULL,
`c6` float DEFAULT NULL,
`c7` varchar(255) DEFAULT NULL,
`c8` varbinary(255) DEFAULT NULL,
`c9` text DEFAULT NULL,
PRIMARY KEY (`pk`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_bin;

View File

@@ -0,0 +1,270 @@
// Copyright 2022 Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package import_benchmarker
import (
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"log"
"os"
"path/filepath"
"strings"
"github.com/dolthub/dolt/go/performance/utils/sysbench_runner"
"github.com/dolthub/dolt/go/libraries/utils/filesys"
)
const (
smallSet = 100000
mediumSet = 1000000
largeSet = 10000000
testTable = "test"
)
var (
ErrMissingMysqlSchemaFile = errors.New("error: Must supply schema file for mysql jobs")
ErrImproperMysqlFileFormat = errors.New("error: MySQL import jobs only support the csv format")
ErrUnsupportedProgram = errors.New("error: Unsupported program. Only dolt or mysql are allowed")
ErrUnsupportedFileFormat = errors.New("error: Unsupported format. Only csv, json or sql are allowed")
)
type ImportBenchmarkJob struct {
// Name of the job
Name string
// NumRows represents the number of rows being imported in the job.
NumRows int
// Sorted represents whether the data is sorted or not.
Sorted bool
// Format is either csv, json or sql.
Format string
// Filepath is the path to the data file. If empty, data is generated instead.
Filepath string
// Program is either Dolt or MySQL.
Program string
// Version tracks the current version of Dolt or MySQL being used
Version string
// ExecPath is the path to a Dolt or MySQL executable. This is also useful when running different versions of Dolt.
ExecPath string
// SchemaPath is the path to a schema file. It is required for MySQL testing and optional for Dolt testing.
SchemaPath string
}
type ImportBenchmarkConfig struct {
Jobs []*ImportBenchmarkJob
// MysqlConnectionProtocol is either tcp or unix. Our Kubernetes benchmarking deployments need unix; to run
// locally, use tcp.
MysqlConnectionProtocol string
// MysqlPort is the port used to connect to MySQL.
MysqlPort int
// MysqlHost is the host used to connect to MySQL.
MysqlHost string
}
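// Illustrative example only (not used by the benchmarker): a MySQL job must use the
// csv format and supply a schema file; a Dolt job needs neither. The paths, version,
// and file name below are placeholders, not defaults.
var exampleMysqlJob = &ImportBenchmarkJob{
Name:       "mysql_import_small",
NumRows:    smallSet,
Sorted:     true,
Format:     csvExt,
Program:    "mysql",
Version:    "8.0.22",           // placeholder MySQL version
ExecPath:   "/usr/sbin/mysqld", // placeholder path to the mysqld binary
SchemaPath: "schema.sql",       // placeholder CREATE TABLE file; the table must be named "test"
Filepath:   "100k-sorted.csv",  // placeholder data file; leave empty to generate data instead
}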
// NewDefaultImportBenchmarkConfig returns a default import configuration where data is generated according to
// the small set.
func NewDefaultImportBenchmarkConfig() (*ImportBenchmarkConfig, error) {
jobs := []*ImportBenchmarkJob{
{
Name: "dolt_import_small",
NumRows: smallSet,
Sorted: false,
Format: csvExt,
Version: "HEAD", // Use whatever dolt is installed locally
ExecPath: "dolt", // Assumes dolt is installed locally
Program: "dolt",
},
}
config := &ImportBenchmarkConfig{
Jobs: jobs,
}
err := config.ValidateAndUpdateDefaults()
if err != nil {
return nil, err
}
return config, nil
}
// FromFileConfig reads a JSON-encoded configuration file and returns the corresponding ImportBenchmarkConfig.
func FromFileConfig(configPath string) (*ImportBenchmarkConfig, error) {
data, err := ioutil.ReadFile(configPath)
if err != nil {
return nil, err
}
config := &ImportBenchmarkConfig{
Jobs: make([]*ImportBenchmarkJob, 0),
}
err = json.Unmarshal(data, config)
if err != nil {
return nil, err
}
err = config.ValidateAndUpdateDefaults()
if err != nil {
return nil, err
}
return config, nil
}
func (c *ImportBenchmarkConfig) ValidateAndUpdateDefaults() error {
if c.MysqlConnectionProtocol == "" {
c.MysqlConnectionProtocol = "tcp"
}
if c.MysqlHost == "" {
c.MysqlHost = defaultHost
}
if c.MysqlPort == 0 {
c.MysqlPort = defaultPort
}
for _, job := range c.Jobs {
err := job.updateDefaultsAndValidate()
if err != nil {
return err
}
}
return nil
}
func (j *ImportBenchmarkJob) updateDefaultsAndValidate() error {
j.Program = strings.ToLower(j.Program)
programAsServerType := sysbench_runner.ServerType(j.Program)
switch programAsServerType {
case sysbench_runner.MySql:
if j.SchemaPath == "" {
return ErrMissingMysqlSchemaFile
}
if j.Format != csvExt {
return ErrImproperMysqlFileFormat
}
case sysbench_runner.Dolt:
default:
return ErrUnsupportedProgram
}
j.Format = strings.ToLower(j.Format)
seen := false
for _, f := range supportedFormats {
if f == j.Format {
seen = true
}
}
if !seen {
return ErrUnsupportedFileFormat
}
return nil
}
func getMysqlConfigFromConfig(c *ImportBenchmarkConfig) sysbench_runner.MysqlConfig {
return sysbench_runner.MysqlConfig{Socket: defaultSocket, Host: c.MysqlHost, ConnectionProtocol: c.MysqlConnectionProtocol, Port: c.MysqlPort}
}
// generateTestFilesIfNeeded creates the test conditions for an import benchmark to execute. If a job does not
// specify a file path, this function generates the data file for it.
func generateTestFilesIfNeeded(config *ImportBenchmarkConfig) *ImportBenchmarkConfig {
jobs := make([]*ImportBenchmarkJob, 0)
for _, job := range config.Jobs {
// Preset csv path
if job.Filepath != "" {
jobs = append(jobs, job)
} else {
filePath, fileFormat := generateTestFile(job)
job.Filepath = filePath
job.Format = fileFormat
jobs = append(jobs, job)
}
}
config.Jobs = jobs
return config
}
// generateTestFile is used to create a generated test case with a randomly generated csv file.
func generateTestFile(job *ImportBenchmarkJob) (string, string) {
sch := NewSeedSchema(job.NumRows, genSampleCols(), job.Format)
pathToImportFile := filepath.Join(GetWorkingDir(), fmt.Sprintf("testData.%s", sch.FileFormatExt))
wc, err := filesys.LocalFS.OpenForWrite(pathToImportFile, os.ModePerm)
if err != nil {
log.Fatalf(err.Error())
}
defer wc.Close()
ds := NewDSImpl(wc, sch, seedRandom, testTable)
ds.GenerateData()
return pathToImportFile, sch.FileFormatExt
}
func RunBenchmarkTests(config *ImportBenchmarkConfig, workingDir string) []result {
config = generateTestFilesIfNeeded(config)
// Split the jobs by program so the Dolt and MySQL benchmarks can be run separately.
doltJobs := make([]*ImportBenchmarkJob, 0)
mySQLJobs := make([]*ImportBenchmarkJob, 0)
for _, job := range config.Jobs {
switch strings.ToLower(job.Program) {
case "dolt":
doltJobs = append(doltJobs, job)
case "mysql":
if job.Format != csvExt {
log.Fatal("mysql import benchmarking only supports csv files")
}
mySQLJobs = append(mySQLJobs, job)
default:
log.Fatal("error: Invalid program. Must use dolt or mysql. See the sample config")
}
}
results := make([]result, 0)
for _, doltJob := range doltJobs {
results = append(results, BenchmarkDoltImportJob(doltJob, workingDir))
}
results = append(results, BenchmarkMySQLImportJobs(mySQLJobs, getMysqlConfigFromConfig(config))...)
return results
}

View File

@@ -0,0 +1,149 @@
// Copyright 2022 Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package import_benchmarker
import (
"os"
"path/filepath"
"testing"
"time"
"github.com/stretchr/testify/assert"
)
func TestGeneratedConfigCanBeImported(t *testing.T) {
t.Skip() // Skipping since dolt isn't installed on the github actions vm
config, err := NewDefaultImportBenchmarkConfig()
assert.NoError(t, err)
wd := GetWorkingDir()
results := RunBenchmarkTests(config, wd)
assert.Equal(t, 1, len(results))
assert.Equal(t, "dolt_import_small", results[0].name)
// Sanity check: An import of 100,000 rows should never take more than 15 seconds
assert.LessOrEqual(t, results[0].br.T, time.Second*15)
os.RemoveAll(filepath.Join(wd, "testData.csv"))
}
func TestCanGenerateFilesForAllFormats(t *testing.T) {
config := &ImportBenchmarkConfig{Jobs: make([]*ImportBenchmarkJob, 0)}
// Create jobs for all configs
for _, format := range supportedFormats {
job := &ImportBenchmarkJob{
Name: "dolt_import_small",
NumRows: smallSet,
Sorted: false,
Format: format,
Version: "HEAD", // Use whatever dolt is installed locally
ExecPath: "dolt", // Assumes dolt is installed locally
}
config.Jobs = append(config.Jobs, job)
}
assert.Equal(t, 3, len(config.Jobs))
config = generateTestFilesIfNeeded(config)
for _, job := range config.Jobs {
file, err := os.Open(job.Filepath)
assert.NoError(t, err)
err = file.Close()
assert.NoError(t, err)
err = os.Remove(job.Filepath)
assert.NoError(t, err)
}
}
func TestBadConfigurations(t *testing.T) {
t.Run("non csv format MySQL Job is considered invalid", func(t *testing.T) {
mysqlJob := createSampleMysqlJob()
mysqlJob.Format = jsonExt
config := &ImportBenchmarkConfig{Jobs: []*ImportBenchmarkJob{mysqlJob}}
err := config.ValidateAndUpdateDefaults()
assert.Error(t, err)
assert.Equal(t, ErrImproperMysqlFileFormat, err)
})
t.Run("MySQL Job with no schema file errors", func(t *testing.T) {
mysqlJob := createSampleMysqlJob()
mysqlJob.SchemaPath = ""
config := &ImportBenchmarkConfig{Jobs: []*ImportBenchmarkJob{mysqlJob}}
err := config.ValidateAndUpdateDefaults()
assert.Error(t, err)
assert.Equal(t, ErrMissingMysqlSchemaFile, err)
})
t.Run("improper program type passed in ", func(t *testing.T) {
doltJob := createSampleDoltJob()
doltJob.Program = "fake-program"
config := &ImportBenchmarkConfig{Jobs: []*ImportBenchmarkJob{doltJob}}
err := config.ValidateAndUpdateDefaults()
assert.Error(t, err)
assert.Equal(t, ErrUnsupportedProgram, err)
})
t.Run("improper file extension passed in", func(t *testing.T) {
doltJob := createSampleDoltJob()
doltJob.Format = "psv"
config := &ImportBenchmarkConfig{Jobs: []*ImportBenchmarkJob{doltJob}}
err := config.ValidateAndUpdateDefaults()
assert.Error(t, err)
assert.Equal(t, ErrUnsupportedFileFormat, err)
})
}
// createSampleMysqlJob creates a sample MySQL job used by the config validation tests
func createSampleMysqlJob() *ImportBenchmarkJob {
job := ImportBenchmarkJob{
Name: "Mysql Dummy",
Format: csvExt,
Version: "8.0.22", // Sample MySQL version
ExecPath: "/usr/mysql", // Sample path to a MySQL executable
Program: "mysql",
SchemaPath: "/schema",
}
return &job
}
func createSampleDoltJob() *ImportBenchmarkJob {
job := &ImportBenchmarkJob{
Name: "dolt_import_small",
NumRows: smallSet,
Sorted: false,
Format: csvExt,
Version: "HEAD", // Use whatever dolt is installed locally
ExecPath: "dolt", // Assumes dolt is installed locally
Program: "dolt",
}
return job
}

View File

@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package main
package import_benchmarker
import (
"io"

View File

@@ -0,0 +1,208 @@
// Copyright 2022 Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package import_benchmarker
import (
"bytes"
"context"
"fmt"
"log"
"os"
"os/exec"
"path/filepath"
"strconv"
"testing"
"github.com/dolthub/dolt/go/libraries/doltcore/dbfactory"
"github.com/dolthub/dolt/go/libraries/utils/file"
"github.com/dolthub/dolt/go/libraries/utils/filesys"
"github.com/dolthub/dolt/go/performance/utils/sysbench_runner"
)
// BenchmarkDoltImportJob returns a function that runs benchmarks for importing
// a test dataset into Dolt
func BenchmarkDoltImportJob(job *ImportBenchmarkJob, workingDir string) result {
oldStdin := os.Stdin
defer func() { os.Stdin = oldStdin }()
setupAndInitializeDoltRepo(filesys.LocalFS, workingDir, job.ExecPath)
defer RemoveDoltDataDir(filesys.LocalFS, workingDir) // remove the repo each time
commandStr, args := getBenchmarkingTools(job, workingDir)
br := testing.Benchmark(func(b *testing.B) {
runBenchmarkCommand(b, commandStr, args, workingDir)
})
return result{
name: job.Name,
format: job.Format,
rows: job.NumRows,
columns: len(genSampleCols()),
sizeOnDisk: getSizeOnDisk(filesys.LocalFS, workingDir),
br: br,
doltVersion: job.Version,
program: "dolt",
}
}
// setupAndInitializeDoltRepo calls the `dolt init` command on the workingDir to create a new Dolt repository.
func setupAndInitializeDoltRepo(fs filesys.Filesys, workingDir, doltExecPath string) {
RemoveDoltDataDir(fs, workingDir)
err := sysbench_runner.DoltVersion(context.Background(), doltExecPath)
if err != nil {
log.Fatal(err.Error())
}
err = sysbench_runner.UpdateDoltConfig(context.Background(), doltExecPath)
if err != nil {
log.Fatal(err.Error())
}
init := execCommand(context.Background(), doltExecPath, "init")
init.Dir = workingDir
err = init.Run()
if err != nil {
log.Fatal(err.Error())
}
}
// getBenchmarkingTools sets up the relevant environment for testing.
func getBenchmarkingTools(job *ImportBenchmarkJob, workingDir string) (commandStr string, args []string) {
switch job.Format {
case csvExt:
args = []string{"table", "import", "-c", "-f", testTable, job.Filepath}
if job.SchemaPath != "" {
args = append(args, "-s", job.SchemaPath)
}
case sqlExt:
stdin := getStdinForSQLBenchmark(filesys.LocalFS, job.Filepath)
os.Stdin = stdin
args = []string{"sql"}
case jsonExt:
pathToSchemaFile := filepath.Join(workingDir, fmt.Sprintf("testSchema%s", job.Format))
if job.SchemaPath != "" {
pathToSchemaFile = job.SchemaPath
}
args = []string{"table", "import", "-c", "-f", "-s", pathToSchemaFile, testTable, job.Filepath}
default:
log.Fatalf("cannot import file, unsupported file format %s \n", job.Format)
}
return job.ExecPath, args
}
// runBenchmarkCommand runs and times the benchmark. This is the critical portion of the code
func runBenchmarkCommand(b *testing.B, commandStr string, args []string, wd string) {
// Note that we can rerun this because dolt import uses the -f parameter
for i := 0; i < b.N; i++ {
cmd := execCommand(context.Background(), commandStr, args...)
var errBytes bytes.Buffer
cmd.Dir = wd
cmd.Stdout = os.Stdout
cmd.Stderr = &errBytes
err := cmd.Run()
if err != nil {
log.Fatalf("error running benchmark: %v", errBytes.String())
}
}
}
// RemoveDoltDataDir is used to remove the .dolt repository
func RemoveDoltDataDir(fs filesys.Filesys, dir string) {
doltDir := filepath.Join(dir, dbfactory.DoltDir)
exists, _ := fs.Exists(doltDir)
if exists {
err := fs.Delete(doltDir, true)
if err != nil {
log.Fatal(err)
}
}
}
func execCommand(ctx context.Context, name string, arg ...string) *exec.Cmd {
e := exec.CommandContext(ctx, name, arg...)
return e
}
// getSizeOnDisk returns the size of the .dolt repo. This is useful for understanding how a repo grows in size in
// proportion to the number of rows.
func getSizeOnDisk(fs filesys.Filesys, workingDir string) float64 {
doltDir := filepath.Join(workingDir, dbfactory.DoltDir)
exists, _ := fs.Exists(doltDir)
if !exists {
return 0
}
size, err := dirSizeMB(doltDir)
if err != nil {
log.Fatal(err.Error())
}
roundedStr := fmt.Sprintf("%.2f", size)
rounded, _ := strconv.ParseFloat(roundedStr, 2)
return rounded
}
// cc: https://stackoverflow.com/questions/32482673/how-to-get-directory-total-size
func dirSizeMB(path string) (float64, error) {
var size int64
err := filepath.Walk(path, func(_ string, info os.FileInfo, err error) error {
if err != nil {
return err
}
if !info.IsDir() {
size += info.Size()
}
return err
})
sizeMB := float64(size) / 1024.0 / 1024.0
return sizeMB, err
}
func getStdinForSQLBenchmark(fs filesys.Filesys, pathToImportFile string) *os.File {
content, err := fs.ReadFile(pathToImportFile)
if err != nil {
log.Fatal(err)
}
tmpfile, err := os.CreateTemp("", "temp")
if err != nil {
log.Fatal(err)
}
defer file.Remove(tmpfile.Name()) // clean up
if _, err := tmpfile.Write(content); err != nil {
log.Fatal(err)
}
if err := tmpfile.Close(); err != nil {
log.Fatal(err)
}
f, err := os.Open(tmpfile.Name())
if err != nil {
log.Fatal(err)
}
return f
}

View File

@@ -12,12 +12,13 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package main
package import_benchmarker
import (
"fmt"
"log"
"math/rand"
"os"
"strconv"
"strings"
@@ -188,3 +189,8 @@ func getSQLHeader(cols []*SeedColumn, tableName, format string) string {
return strings.Join(statement, "")
}
func GetWorkingDir() string {
wd, _ := os.Getwd()
return wd
}

View File

@@ -1,186 +0,0 @@
// Copyright 2019 Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package main
import (
"flag"
"log"
"os"
"testing"
"github.com/dolthub/dolt/go/libraries/utils/filesys"
)
const (
smallSet = 1000
mediumSet = 100000
largeSet = 10000000
)
var outputPath = flag.String("outputPath", "./", "the path where the serialized results file will be stored.")
var outputFormat = flag.String("outputFormat", ".csv", "the format used to serialize the benchmarking results.")
var resultsTableName = flag.String("resultsTableName", "results", "the name of the results table.")
var csvFlag = flag.Bool("csv", false, "test importing .csv file into dolt")
var jsonFlag = flag.Bool("json", false, "test importing .json file into dolt")
var sqlFlag = flag.Bool("sql", false, "test importing .sql file into dolt")
var flagStrs = []flagStr{
{b: csvFlag, s: csvExt},
{b: jsonFlag, s: jsonExt},
{b: sqlFlag, s: sqlExt},
}
type flagStr struct {
b *bool
s string
}
func main() {
flag.Parse()
results := make([]result, 0)
testFmts := make([]string, 0)
for _, fs := range flagStrs {
if *fs.b {
if fs.s == sqlExt {
log.Fatal("benchmarking dolt sql imports currently disabled")
}
testFmts = append(testFmts, fs.s)
}
}
if len(testFmts) == 0 {
log.Fatal("must provide flag(s) format for testing dolt imports, ie -csv, -json, -sql \n")
}
for _, frmt := range testFmts {
benchmarks := []struct {
Name string
Format string
Rows int
Columns int
BM func(b *testing.B)
}{
{
Name: "dolt_import_small",
Format: frmt,
Rows: smallSet,
Columns: len(genSampleCols()),
BM: BenchmarkDoltImport(smallSet, genSampleCols(), frmt),
},
{
Name: "dolt_import_medium",
Format: frmt,
Rows: mediumSet,
Columns: len(genSampleCols()),
BM: BenchmarkDoltImport(mediumSet, genSampleCols(), frmt),
},
{
Name: "dolt_import_large",
Format: frmt,
Rows: largeSet,
Columns: len(genSampleCols()),
BM: BenchmarkDoltImport(largeSet, genSampleCols(), frmt),
},
}
for _, b := range benchmarks {
br := testing.Benchmark(b.BM)
res := result{
name: b.Name,
format: b.Format,
rows: b.Rows,
columns: b.Columns,
br: br,
}
results = append(results, res)
}
}
// benchmark other dolt commands with and just use a single import format
for _, frmt := range []string{csvExt} {
benchmarks := []struct {
Name string
Format string
Rows int
Columns int
BM func(b *testing.B)
}{
{
Name: "dolt_export_small",
Format: frmt,
Rows: smallSet,
Columns: len(genSampleCols()),
BM: BenchmarkDoltExport(smallSet, genSampleCols(), frmt),
},
{
Name: "dolt_export_medium",
Format: frmt,
Rows: mediumSet,
Columns: len(genSampleCols()),
BM: BenchmarkDoltExport(mediumSet, genSampleCols(), frmt),
},
{
Name: "dolt_export_large",
Format: frmt,
Rows: largeSet,
Columns: len(genSampleCols()),
BM: BenchmarkDoltExport(largeSet, genSampleCols(), frmt),
},
{
Name: "dolt_sql_select_small",
Format: frmt,
Rows: smallSet,
Columns: len(genSampleCols()),
BM: BenchmarkDoltSQLSelect(smallSet, genSampleCols(), frmt),
},
{
Name: "dolt_sql_select_medium",
Format: frmt,
Rows: mediumSet,
Columns: len(genSampleCols()),
BM: BenchmarkDoltSQLSelect(mediumSet, genSampleCols(), frmt),
},
{
Name: "dolt_sql_select_large",
Format: frmt,
Rows: largeSet,
Columns: len(genSampleCols()),
BM: BenchmarkDoltSQLSelect(largeSet, genSampleCols(), frmt),
},
}
for _, b := range benchmarks {
br := testing.Benchmark(b.BM)
res := result{
name: b.Name,
format: b.Format,
rows: b.Rows,
columns: b.Columns,
br: br,
}
results = append(results, res)
}
}
// write results data
serializeResults(results, *outputPath, *resultsTableName, *outputFormat)
// cleanup temp dolt data dir
removeTempDoltDataDir(filesys.LocalFS)
os.Exit(0)
}

View File

@@ -0,0 +1,189 @@
// Copyright 2022 Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package import_benchmarker
import (
"bytes"
"context"
"database/sql"
"fmt"
"io/ioutil"
"log"
"os"
"os/exec"
"os/signal"
"sync"
"syscall"
"testing"
"time"
"github.com/go-sql-driver/mysql"
"golang.org/x/sync/errgroup"
"github.com/dolthub/dolt/go/performance/utils/sysbench_runner"
)
const (
defaultHost = "127.0.0.1"
defaultPort = 3306
defaultSocket = "/var/run/mysqld/mysqld.sock"
dbName = "test"
)
func BenchmarkMySQLImportJobs(jobs []*ImportBenchmarkJob, mConfig sysbench_runner.MysqlConfig) []result {
if len(jobs) == 0 {
return nil
}
ctx := context.Background()
withCancelCtx, cancel := context.WithCancel(ctx)
gServer, serverCtx := errgroup.WithContext(withCancelCtx)
var serverErr bytes.Buffer
// Assume all MySQL jobs use the same server executable; start the server from the first job's ExecPath
server := getMysqlServer(serverCtx, jobs[0].ExecPath, getServersArgs())
server.Stderr = &serverErr
// launch the mysql server
gServer.Go(func() error {
err := server.Run()
if err != nil {
log.Fatal(serverErr.String())
return err
}
return nil
})
// sleep to allow the server to start
time.Sleep(5 * time.Second)
// setup the relevant testing database and permissions
err := sysbench_runner.SetupDB(ctx, mConfig, dbName)
if err != nil {
cancel()
log.Fatal(err.Error())
}
log.Println("successfully setup the database")
// handle user interrupt
quit := make(chan os.Signal, 1)
signal.Notify(quit, os.Interrupt, syscall.SIGTERM)
var wg sync.WaitGroup
wg.Add(1)
go func() {
<-quit
defer wg.Done()
signal.Stop(quit)
cancel()
}()
results := make([]result, len(jobs))
for i, job := range jobs {
// benchmark the actual job
br := testing.Benchmark(func(b *testing.B) {
benchmarkLoadData(ctx, b, mConfig, job)
})
results[i] = result{
name: job.Name,
format: job.Format,
rows: job.NumRows,
columns: len(genSampleCols()),
sizeOnDisk: -1, // TODO: Think about how to collect MySQL table size
br: br,
doltVersion: job.Version,
program: "mysql",
}
}
return results
}
func benchmarkLoadData(ctx context.Context, b *testing.B, mConfig sysbench_runner.MysqlConfig, job *ImportBenchmarkJob) {
dsn, err := sysbench_runner.FormatDsn(mConfig)
if err != nil {
log.Fatal(err)
}
db, err := sql.Open("mysql", dsn)
if err != nil {
log.Fatal(err)
}
defer func() {
rerr := db.Close()
if err == nil {
err = rerr
}
}()
err = db.Ping()
if err != nil {
log.Fatal(err)
}
_, err = db.ExecContext(ctx, fmt.Sprintf("USE %s", dbName))
if err != nil {
log.Fatal(err)
}
// Load the schema for the test table. This assumes the table has the same name as testTable
data, err := ioutil.ReadFile(job.SchemaPath)
if err != nil {
log.Fatal(err)
}
// Register the local file as per https://github.com/go-sql-driver/mysql#load-data-local-infile-support
mysql.RegisterLocalFile(job.Filepath)
b.ResetTimer()
for i := 0; i < b.N; i++ {
// Since dolt also creates the table on import we'll add dropping and creating the table to the benchmark
_, err = db.ExecContext(ctx, fmt.Sprintf("DROP TABLE IF EXISTS %s", testTable))
if err != nil {
log.Fatal(err)
}
// Run the CREATE TABLE command stored in the schema file
// TODO: The table defined in this schema file must have the same name as testTable.
_, err = db.ExecContext(ctx, string(data))
if err != nil {
log.Fatal(err)
}
// Run LOAD DATA on the csv file
_, err = db.ExecContext(ctx, fmt.Sprintf(`LOAD DATA LOCAL INFILE '%s' REPLACE INTO TABLE %s FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n' IGNORE 1 LINES`, job.Filepath, testTable))
if err != nil {
log.Fatal(err)
}
log.Printf("MySQL server loaded file %s \n", job.Filepath)
}
}
// getServersArgs returns the arguments used to start the mysql server
func getServersArgs() []string {
return []string{"--user=mysql", fmt.Sprintf("--port=%d", defaultPort), "--local-infile=ON"}
}
// getMysqlServer returns an exec.Cmd for the mysql server
func getMysqlServer(ctx context.Context, serverExec string, params []string) *exec.Cmd {
return execCommand(ctx, serverExec, params...)
}

View File

@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package main
package import_benchmarker
import (
"fmt"
@@ -28,14 +28,17 @@ import (
)
type result struct {
name string
format string
rows int
columns int
br testing.BenchmarkResult
name string
format string
rows int
columns int
sizeOnDisk float64
br testing.BenchmarkResult
program string
doltVersion string
}
// RDSImpl is a Dataset containing results of benchmarking
// RSImpl is a Dataset containing results of benchmarking
type RSImpl struct {
// Schema defines the structure of the Dataset
Schema *SeedSchema
@@ -103,50 +106,47 @@ func getResultsRow(res result, cols []*SeedColumn) []string {
// set name
row[0] = res.name
// set program
row[1] = res.program
// set version
row[2] = res.doltVersion
// set format
row[1] = res.format
row[3] = res.format
// set rows
row[2] = fmt.Sprintf("%d", res.rows)
row[4] = fmt.Sprintf("%d", res.rows)
// set cols
row[3] = fmt.Sprintf("%d", res.columns)
row[5] = fmt.Sprintf("%d", res.columns)
// set iterations
row[4] = fmt.Sprintf("%d", res.br.N)
row[6] = fmt.Sprintf("%d", res.br.N)
// set time
row[5] = res.br.T.String()
// set bytes
row[6] = fmt.Sprintf("%v", res.br.Bytes)
// set mem_allocs
row[7] = fmt.Sprintf("%v", res.br.MemAllocs)
// set mem_bytes
row[8] = fmt.Sprintf("%v", res.br.MemBytes)
// set alloced_bytes_per_op
row[9] = fmt.Sprintf("%v", res.br.AllocedBytesPerOp())
//set allocs_per_op
row[10] = fmt.Sprintf("%v", res.br.AllocsPerOp())
row[7] = res.br.T.String()
// set size_on_disk
row[8] = fmt.Sprintf("%v", res.sizeOnDisk)
// set rows_per_second
row[9] = fmt.Sprintf("%.2f", float64(res.rows)/res.br.T.Seconds())
// set datetime
t := time.Now()
row[11] = fmt.Sprintf("%04d-%02d-%02d %02d:%02d", t.Year(), t.Month(), t.Day(), t.Hour(), t.Minute())
row[10] = fmt.Sprintf("%04d-%02d-%02d %02d:%02d", t.Year(), t.Month(), t.Day(), t.Hour(), t.Minute())
return row
}
func genResultsCols() []*SeedColumn {
return []*SeedColumn{
NewSeedColumn("name", false, types.StringKind, supplied),
NewSeedColumn("program", false, types.StringKind, supplied),
NewSeedColumn("version", false, types.StringKind, supplied),
NewSeedColumn("format", false, types.StringKind, supplied),
NewSeedColumn("rows", false, types.StringKind, supplied),
NewSeedColumn("columns", false, types.StringKind, supplied),
NewSeedColumn("iterations", false, types.StringKind, supplied),
NewSeedColumn("time", false, types.TimestampKind, supplied),
NewSeedColumn("bytes", false, types.IntKind, supplied),
NewSeedColumn("mem_allocs", false, types.IntKind, supplied),
NewSeedColumn("mem_bytes", false, types.IntKind, supplied),
NewSeedColumn("alloced_bytes_per_op", false, types.StringKind, supplied),
NewSeedColumn("allocs_per_op", false, types.StringKind, supplied),
NewSeedColumn("size_on_disk(MB)", false, types.StringKind, supplied),
NewSeedColumn("rows_per_second", false, types.StringKind, supplied),
NewSeedColumn("date_time", false, types.StringKind, supplied),
}
}
func serializeResults(results []result, path, tableName, format string) {
func SerializeResults(results []result, path, tableName, format string) string {
var sch *SeedSchema
switch format {
case csvExt:
@@ -156,7 +156,7 @@ func serializeResults(results []result, path, tableName, format string) {
}
now := time.Now()
fs := filesys.LocalFS
resultsFile := filepath.Join(path, fmt.Sprintf("benchmark_results-%04d-%02d-%02d%s", now.Year(), now.Month(), now.Day(), format))
resultsFile := filepath.Join(path, fmt.Sprintf("benchmark_results-%04d-%02d-%02d.%s", now.Year(), now.Month(), now.Day(), format))
wc, err := fs.OpenForWrite(resultsFile, os.ModePerm)
if err != nil {
log.Fatal(err)
@@ -165,4 +165,6 @@ func serializeResults(results []result, path, tableName, format string) {
ds := NewRSImpl(wc, sch, results, tableName)
ds.GenerateData()
return resultsFile
}

View File

@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package main
package import_benchmarker
import (
"fmt"
@@ -23,9 +23,9 @@ import (
)
const (
csvExt = ".csv"
jsonExt = ".json"
sqlExt = ".sql"
csvExt = "csv"
jsonExt = "json"
sqlExt = "sql"
increment = GenType("increment")
random = GenType("random")
@@ -148,18 +148,18 @@ func getColSchemaJSON(seedCols []*SeedColumn) []byte {
return []byte(strings.Join(statement, ""))
}
// TODO: Support autogeneration for a wider variety of types
func genSampleCols() []*SeedColumn {
return []*SeedColumn{
NewSeedColumn("id", true, types.IntKind, increment),
NewSeedColumn("int1", false, types.IntKind, random),
NewSeedColumn("int2", false, types.IntKind, increment),
NewSeedColumn("int3", false, types.IntKind, random),
NewSeedColumn("int4", false, types.IntKind, increment),
NewSeedColumn("int5", false, types.IntKind, increment),
NewSeedColumn("str1", false, types.StringKind, random),
NewSeedColumn("str2", false, types.StringKind, random),
NewSeedColumn("str3", false, types.StringKind, random),
NewSeedColumn("str4", false, types.StringKind, random),
NewSeedColumn("str5", false, types.StringKind, random),
NewSeedColumn("pk", true, types.IntKind, increment),
NewSeedColumn("c1", false, types.IntKind, random),
NewSeedColumn("c2", false, types.IntKind, increment),
NewSeedColumn("c3", false, types.IntKind, random),
NewSeedColumn("c4", false, types.IntKind, increment),
NewSeedColumn("c5", false, types.IntKind, increment),
NewSeedColumn("c6", false, types.StringKind, random),
NewSeedColumn("c7", false, types.StringKind, random),
NewSeedColumn("c8", false, types.StringKind, random),
NewSeedColumn("c9", false, types.StringKind, random),
}
}

View File

@@ -41,12 +41,12 @@ var stampFunc = func() string { return time.Now().UTC().Format(stampFormat) }
func BenchmarkDolt(ctx context.Context, config *Config, serverConfig *ServerConfig) (Results, error) {
serverParams := serverConfig.GetServerArgs()
err := doltVersion(ctx, serverConfig)
err := DoltVersion(ctx, serverConfig.ServerExec)
if err != nil {
return nil, err
}
err = UpdateDoltConfig(ctx, serverConfig)
err = UpdateDoltConfig(ctx, serverConfig.ServerExec)
if err != nil {
return nil, err
}
@@ -127,9 +127,9 @@ func BenchmarkDolt(ctx context.Context, config *Config, serverConfig *ServerConf
return results, nil
}
// doltVersion ensures the dolt binary can run
func doltVersion(ctx context.Context, config *ServerConfig) error {
doltVersion := ExecCommand(ctx, config.ServerExec, "version")
// DoltVersion ensures the dolt binary can run
func DoltVersion(ctx context.Context, serverExec string) error {
doltVersion := ExecCommand(ctx, serverExec, "version")
return doltVersion.Run()
}
@@ -170,17 +170,17 @@ func initDoltRepo(ctx context.Context, config *ServerConfig, initBigRepo bool, n
}
// UpdateDoltConfig updates the dolt config if necessary
func UpdateDoltConfig(ctx context.Context, config *ServerConfig) error {
err := checkSetDoltConfig(ctx, config, "user.name", "benchmark")
func UpdateDoltConfig(ctx context.Context, serverExec string) error {
err := checkSetDoltConfig(ctx, serverExec, "user.name", "benchmark")
if err != nil {
return err
}
return checkSetDoltConfig(ctx, config, "user.email", "benchmark@dolthub.com")
return checkSetDoltConfig(ctx, serverExec, "user.email", "benchmark@dolthub.com")
}
// checkSetDoltConfig checks the output of `dolt config --global --get` and sets the key, val if necessary
func checkSetDoltConfig(ctx context.Context, config *ServerConfig, key, val string) error {
check := ExecCommand(ctx, config.ServerExec, "config", "--global", "--get", key)
func checkSetDoltConfig(ctx context.Context, serverExec, key, val string) error {
check := ExecCommand(ctx, serverExec, "config", "--global", "--get", key)
err := check.Run()
if err != nil {
// config get calls exit with 1 if not set
@@ -188,7 +188,7 @@ func checkSetDoltConfig(ctx context.Context, config *ServerConfig, key, val stri
return err
}
set := ExecCommand(ctx, config.ServerExec, "config", "--global", "--add", key, val)
set := ExecCommand(ctx, serverExec, "config", "--global", "--add", key, val)
err := set.Run()
if err != nil {
return err

View File

@@ -18,6 +18,7 @@ import (
"context"
"database/sql"
"fmt"
"log"
"os"
"os/exec"
"os/signal"
@@ -29,6 +30,13 @@ import (
"golang.org/x/sync/errgroup"
)
type MysqlConfig struct {
Socket string
ConnectionProtocol string
Port int
Host string
}
// BenchmarkMysql benchmarks mysql based on the provided configurations
func BenchmarkMysql(ctx context.Context, config *Config, serverConfig *ServerConfig) (Results, error) {
withKeyCtx, cancel := context.WithCancel(ctx)
@@ -38,7 +46,7 @@ func BenchmarkMysql(ctx context.Context, config *Config, serverConfig *ServerCon
var serverCtx context.Context
var server *exec.Cmd
if serverConfig.Host == defaultHost {
fmt.Println("Launching the default server")
log.Println("Launching the default server")
localServer = true
gServer, serverCtx = errgroup.WithContext(withKeyCtx)
serverParams := serverConfig.GetServerArgs()
@@ -53,11 +61,12 @@ func BenchmarkMysql(ctx context.Context, config *Config, serverConfig *ServerCon
time.Sleep(10 * time.Second)
// setup mysqldb
err := SetupDB(ctx, serverConfig, dbName)
err := SetupDB(ctx, GetMysqlConnectionConfigFromServerConfig(serverConfig), dbName)
if err != nil {
cancel()
return nil, err
}
log.Println("Successfully set up the MySQL database")
}
// handle user interrupt
@@ -120,8 +129,8 @@ func getMysqlServer(ctx context.Context, config *ServerConfig, params []string)
return ExecCommand(ctx, config.ServerExec, params...)
}
func SetupDB(ctx context.Context, serverConfig *ServerConfig, databaseName string) (err error) {
dsn, err := formatDSN(serverConfig)
func SetupDB(ctx context.Context, mConfig MysqlConfig, databaseName string) (err error) {
dsn, err := FormatDsn(mConfig)
if err != nil {
return err
}
@@ -161,6 +170,10 @@ func SetupDB(ctx context.Context, serverConfig *ServerConfig, databaseName strin
if err != nil {
return err
}
_, err = db.ExecContext(ctx, fmt.Sprintf("SET GLOBAL local_infile = 'ON'"))
if err != nil {
return err
}
// Required for running groupby_scan.lua without error
_, err = db.ExecContext(ctx, "SET GLOBAL sql_mode=(SELECT REPLACE(@@sql_mode,'ONLY_FULL_GROUP_BY',''));")
@@ -171,18 +184,27 @@ func SetupDB(ctx context.Context, serverConfig *ServerConfig, databaseName strin
return
}
func formatDSN(serverConfig *ServerConfig) (string, error) {
func FormatDsn(mConfig MysqlConfig) (string, error) {
var socketPath string
if serverConfig.Socket != "" {
socketPath = serverConfig.Socket
if mConfig.Socket != "" {
socketPath = mConfig.Socket
} else {
socketPath = defaultSocket
}
if serverConfig.ConnectionProtocol == tcpProtocol {
return fmt.Sprintf("root@tcp(%s:%d)/", defaultHost, serverConfig.Port), nil
} else if serverConfig.ConnectionProtocol == unixProtocol {
if mConfig.ConnectionProtocol == tcpProtocol {
return fmt.Sprintf("root@tcp(%s:%d)/", mConfig.Host, mConfig.Port), nil
} else if mConfig.ConnectionProtocol == unixProtocol {
return fmt.Sprintf("root@unix(%s)/", socketPath), nil
} else {
return "", ErrUnsupportedConnectionProtocol
}
}
func GetMysqlConnectionConfigFromServerConfig(config *ServerConfig) MysqlConfig {
return MysqlConfig{
Socket: config.Socket,
ConnectionProtocol: config.ConnectionProtocol,
Port: config.Port,
Host: defaultHost,
}
}

View File

@@ -38,7 +38,7 @@ const (
func BenchmarkDolt(ctx context.Context, tppcConfig *TpccBenchmarkConfig, serverConfig *sysbench_runner.ServerConfig) (sysbench_runner.Results, error) {
serverParams := serverConfig.GetServerArgs()
err := sysbench_runner.UpdateDoltConfig(ctx, serverConfig)
err := sysbench_runner.UpdateDoltConfig(ctx, serverConfig.ServerExec)
if err != nil {
return nil, err
}

View File

@@ -51,7 +51,7 @@ func BenchmarkMysql(ctx context.Context, config *TpccBenchmarkConfig, serverConf
time.Sleep(10 * time.Second)
// setup mysqldb
err := sysbench_runner.SetupDB(ctx, serverConfig, dbName)
err := sysbench_runner.SetupDB(ctx, sysbench_runner.GetMysqlConnectionConfigFromServerConfig(serverConfig), dbName)
if err != nil {
cancel()
return nil, err