Add a perf test for CSV map import (#2461)

Currently we only have a perf test for CSV list import, which uses the
sf-crime dataset. This test uses the 43MB sf-registered-businesses
dataset instead, since sf-crime is too slow — which is ironic, given that
we normally parse sf-crime into a map.

I've also tightened up some of the other perf tests.
- Fixed a bug where the Database was shared between runs.
- Made the pure CSV parsing test use a smaller dataset; it doesn't need
  to be as large as ny-vehicle-registrations.
This commit is contained in:
Ben Kalman
2016-08-31 17:05:00 -07:00
committed by GitHub
parent b857b95bb4
commit 9c694f024b
2 changed files with 94 additions and 69 deletions
+40 -34
View File
@@ -207,37 +207,6 @@ func Run(datasetID string, t *testing.T, suiteT perfSuiteT) {
db, err := spec.GetDatabase(*perfFlag)
assert.NoError(err)
// This is the temporary database for tests to use.
//
// * Why not use a local database + memory store?
// Firstly, because the spec would be "mem", and the spec library doesn't know how to reuse stores.
// Secondly, because it's an unrealistic performance measurement.
//
// * Why use a remote (HTTP) database?
// It's more realistic to exercise the HTTP stack, even if it's just talking over localhost.
//
// * Why provide an option for leveldb vs memory underlying store?
// Again, leveldb is more realistic than memory, and in common cases disk space > memory space.
// However, on this developer's laptop, there is actually very little disk space, and a lot of memory;
// plus making the test run a little bit faster locally is nice.
var chunkStore chunks.ChunkStore
if *perfMemFlag {
chunkStore = chunks.NewMemoryStore()
} else {
ldbDir := suite.TempDir("suite.suite")
chunkStore = chunks.NewLevelDBStoreUseFlags(ldbDir, "")
}
server := datas.NewRemoteDatabaseServer(chunkStore, 0)
portChan := make(chan int)
server.Ready = func() { portChan <- server.Port() }
go server.Run()
defer server.Stop()
port := <-portChan
suite.DatabaseSpec = fmt.Sprintf("http://localhost:%d", port)
suite.Database = datas.NewRemoteDatabase(suite.DatabaseSpec, "")
// List of test runs, each a map of test name => timing info.
testReps := make([]testRep, *perfRepeatFlag)
@@ -276,6 +245,10 @@ func Run(datasetID string, t *testing.T, suiteT perfSuiteT) {
for repIdx := 0; repIdx < *perfRepeatFlag; repIdx++ {
testReps[repIdx] = testRep{}
serverHost, stopServerFn := suite.startServer()
suite.DatabaseSpec = serverHost
suite.Database = datas.NewRemoteDatabase(serverHost, "")
if t, ok := suiteT.(SetupRepSuite); ok {
t.SetupRep()
}
@@ -322,6 +295,8 @@ func Run(datasetID string, t *testing.T, suiteT perfSuiteT) {
if t, ok := suiteT.(TearDownRepSuite); ok {
t.TearDownRep()
}
stopServerFn()
}
if t, ok := suiteT.(testifySuite.TearDownAllSuite); ok {
@@ -363,9 +338,7 @@ func (suite *PerfSuite) Pause(fn func()) {
func callSafe(name string, fun reflect.Value, args ...interface{}) (err interface{}) {
defer func() {
if r := recover(); r != nil {
err = r
}
err = recover()
}()
funArgs := make([]reflect.Value, len(args))
for i, arg := range args {
@@ -422,3 +395,36 @@ func (suite *PerfSuite) getGitHead(dir string) string {
}
return strings.TrimSpace(stdout.String())
}
// startServer brings up a temporary remote (HTTP) database server for a
// single test rep and returns its URL plus a function that shuts it down.
//
// Rationale for the setup:
//   - A local database + memory store is avoided because the spec would be
//     "mem" (the spec library doesn't know how to reuse stores) and because
//     it would be an unrealistic performance measurement.
//   - A remote (HTTP) database is used because exercising the HTTP stack is
//     more realistic, even if it's just talking over localhost.
//   - LevelDB vs memory is selectable because leveldb is the more realistic
//     backing store (and disk space usually exceeds memory space), but a
//     memory store keeps local runs fast on disk-constrained machines.
func (suite *PerfSuite) startServer() (host string, stopFn func()) {
	// Pick the backing chunk store per the -mem flag.
	var cs chunks.ChunkStore
	if *perfMemFlag {
		cs = chunks.NewMemoryStore()
	} else {
		cs = chunks.NewLevelDBStoreUseFlags(suite.TempDir("suite.suite"), "")
	}

	srv := datas.NewRemoteDatabaseServer(cs, 0)

	// Run the server in the background and wait for it to report its port
	// before handing the URL back to the caller.
	ready := make(chan int)
	srv.Ready = func() { ready <- srv.Port() }
	go srv.Run()

	host = fmt.Sprintf("http://localhost:%d", <-ready)
	stopFn = srv.Stop
	return
}
+54 -35
View File
@@ -14,7 +14,6 @@ import (
"testing"
"github.com/attic-labs/noms/go/dataset"
"github.com/attic-labs/noms/go/hash"
"github.com/attic-labs/noms/go/perf/suite"
"github.com/attic-labs/noms/go/types"
"github.com/attic-labs/noms/samples/go/csv"
@@ -24,12 +23,9 @@ import (
// CSV perf suites require the testdata directory to be checked out at $GOPATH/src/github.com/attic-labs/testdata (i.e. ../testdata relative to the noms directory).
// TODO: Add ny-vehicle-registrations test when CSV importing is faster (testdata/ny-vehicle-registrations/20150218.*).
// perfSuite measures CSV import/parse performance. It requires the testdata
// directory checked out at $GOPATH/src/github.com/attic-labs/testdata (see
// the note above).
type perfSuite struct {
suite.PerfSuite
// csvImportExe is the path to the built csv-import executable, run by
// execCsvImportExe.
csvImportExe string
// sfcBlobHash appears unused in the visible code — NOTE(review): this may
// be a leftover field slated for removal; confirm against the full file.
sfcBlobHash hash.Hash
}
func (s *perfSuite) SetupSuite() {
@@ -46,36 +42,57 @@ func (s *perfSuite) SetupSuite() {
func (s *perfSuite) Test01ImportSfCrimeBlobFromTestdata() {
assert := s.NewAssert()
raw := s.openGlob(path.Join(s.Testdata, "sf-crime", "2016-07-28.*"))
defer s.closeGlob(raw)
files := s.openGlob(s.Testdata, "sf-crime", "2016-07-28.*")
defer s.closeGlob(files)
blob := types.NewBlob(io.MultiReader(raw...))
fmt.Fprintf(s.W, "csv/raw is %s\n", humanize.Bytes(blob.Len()))
blob := types.NewBlob(io.MultiReader(files...))
fmt.Fprintf(s.W, "\tsf-crime is %s\n", humanize.Bytes(blob.Len()))
ds := dataset.NewDataset(s.Database, "csv/raw")
ds := dataset.NewDataset(s.Database, "sf-crime/raw")
_, err := ds.CommitValue(blob)
assert.NoError(err)
}
// Test02ImportSfCrimeCSVFromBlob parses the sf-crime raw blob into Noms
// values via the csv-import executable. Tests run in name order, so the
// "sf-crime/raw" dataset is expected to have been committed by Test01 —
// NOTE(review): confirm the suite runner guarantees this ordering.
func (s *perfSuite) Test02ImportSfCrimeCSVFromBlob() {
s.execCsvImportExe("sf-crime")
}
func (s *perfSuite) Test03ImportSfRegisteredBusinessesFromBlobAsMap() {
assert := s.NewAssert()
blobSpec := fmt.Sprintf("%s::csv/raw.value", s.DatabaseSpec)
destSpec := fmt.Sprintf("%s::csv", s.DatabaseSpec)
importCmd := exec.Command(s.csvImportExe, "-p", blobSpec, destSpec)
files := s.openGlob(s.Testdata, "sf-registered-businesses", "2016-07-25.csv")
defer s.closeGlob(files)
blob := types.NewBlob(io.MultiReader(files...))
fmt.Fprintf(s.W, "\tsf-reg-bus is %s\n", humanize.Bytes(blob.Len()))
ds := dataset.NewDataset(s.Database, "sf-reg-bus/raw")
_, err := ds.CommitValue(blob)
assert.NoError(err)
s.execCsvImportExe("sf-reg-bus", "--dest-type", "map:0")
}
// execCsvImportExe runs the csv-import executable over the raw CSV blob
// previously committed at the "<dsName>/raw" dataset, writing the parsed
// result to the "<dsName>" dataset. Any extra importer flags (e.g.
// "--dest-type") are passed through args. The import must succeed.
func (s *perfSuite) execCsvImportExe(dsName string, args ...string) {
	assert := s.NewAssert()

	// Source is the blob value committed by an earlier test; dest is the
	// dataset the importer writes its parsed output to.
	source := fmt.Sprintf("%s::%s/raw.value", s.DatabaseSpec, dsName)
	dest := fmt.Sprintf("%s::%s", s.DatabaseSpec, dsName)

	cmd := exec.Command(s.csvImportExe, append(args, "-p", source, dest)...)
	cmd.Stdout = s.W
	cmd.Stderr = os.Stderr
	assert.NoError(cmd.Run())
}
func (s *perfSuite) TestParseNyVehicleRegistrations() {
func (s *perfSuite) TestParseSfCrime() {
assert := s.NewAssert()
raw := s.openGlob(path.Join(s.Testdata, "ny-vehicle-registrations", "20150218.*"))
defer s.closeGlob(raw)
files := s.openGlob(path.Join(s.Testdata, "sf-crime", "2016-07-28.*"))
defer s.closeGlob(files)
reader := csv.NewCSVReader(io.MultiReader(raw...), ',')
reader := csv.NewCSVReader(io.MultiReader(files...), ',')
for {
_, err := reader.Read()
if err != nil {
@@ -85,32 +102,34 @@ func (s *perfSuite) TestParseNyVehicleRegistrations() {
}
}
// openGlob opens all files that match `pattern`. Large CSV files in testdata are broken up into foo.a, foo.b, etc to get around GitHub file size restrictions.
func (s *perfSuite) openGlob(pattern string) (readers []io.Reader) {
// openGlob opens the concatenation of all files that match `pattern`, returned
// as []io.Reader so it can be used immediately with io.MultiReader.
//
// Large CSV files in testdata are broken up into foo.a, foo.b, etc to get
// around GitHub file size restrictions.
func (s *perfSuite) openGlob(pattern ...string) []io.Reader {
assert := s.NewAssert()
s.Pause(func() {
glob, err := filepath.Glob(pattern)
glob, err := filepath.Glob(path.Join(pattern...))
assert.NoError(err)
files := make([]io.Reader, len(glob))
for i, m := range glob {
f, err := os.Open(m)
assert.NoError(err)
readers = make([]io.Reader, len(glob))
for i, m := range glob {
r, err := os.Open(m)
assert.NoError(err)
readers[i] = r
}
})
return
files[i] = f
}
return files
}
// closeGlob closes `readers`. Intended to be used after `openGlob`.
func (s *perfSuite) closeGlob(readers []io.Reader) {
// closeGlob closes all of the files, designed to be used with openGlob.
func (s *perfSuite) closeGlob(files []io.Reader) {
assert := s.NewAssert()
s.Pause(func() {
for _, r := range readers {
assert.NoError(r.(io.ReadCloser).Close())
}
})
for _, f := range files {
assert.NoError(f.(*os.File).Close())
}
}
func TestPerf(t *testing.T) {