replica benchmark, no event progress for server (#2258)

* small push on write perf benchmark with no prog channel

* remove unused prog funcs

* remove unused prog funcs

* fix bench name

* [ga-format-pr] Run go/utils/repofmt/format_repo.sh and go/Godeps/update.sh

Co-authored-by: max-hoffman <max-hoffman@users.noreply.github.com>

Author: Maximilian Hoffman
Date: 2021-10-28 15:34:04 -07:00 (committed by GitHub)
Parent: f12196df03
Commit: ca5ae29dd1
6 changed files with 372 additions and 103 deletions
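
The essence of the change, distilled: datas.NewPuller previously demanded a progress event channel, so the replica write path had to spin up goroutines whose only job was draining progress it never used. After this commit the channel is optional — every send inside the puller goes through a nil guard, and replication simply passes nil. A minimal standalone sketch of that pattern (type names shortened; only the addEvent guard mirrors the diff below):

package main

import "fmt"

// Event stands in for datas.PullerEvent.
type Event struct{ Msg string }

// Puller mirrors the shape of datas.Puller after this commit: the event
// channel may be nil when the caller wants no progress reporting.
type Puller struct {
	eventCh chan Event
}

// addEvent is the guard the commit adds: when no channel was supplied,
// sends are skipped entirely and the hot path never blocks.
func (p *Puller) addEvent(evt Event) {
	if p.eventCh != nil {
		p.eventCh <- evt
	}
}

func main() {
	quiet := &Puller{} // e.g. the replica write path: no progress wanted
	quiet.addEvent(Event{Msg: "dropped silently"})

	ch := make(chan Event, 1)
	verbose := &Puller{eventCh: ch}
	verbose.addEvent(Event{Msg: "delivered"})
	fmt.Println((<-ch).Msg)
}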


@@ -382,7 +382,7 @@ func TestReadReplica(t *testing.T) {
}
defer os.Chdir(cwd)
-multiSetup := testcommands.NewMultiRepoTestSetup(t)
+multiSetup := testcommands.NewMultiRepoTestSetup(t.Fatal)
defer os.RemoveAll(multiSetup.Root)
multiSetup.NewDB("read_replica")
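
Context for the t.Fatal argument: the setup helper is now shared with a benchmark, and a *testing.B is not a *testing.T. Both t.Fatal and b.Fatal have the signature func(args ...interface{}), so accepting a bare handler function lets tests and benchmarks drive the same setup. A compiling sketch of the idea in a _test.go file (Setup stands in for MultiRepoTestSetup):

package setup

import "testing"

// Setup mirrors MultiRepoTestSetup's error handling after this commit:
// a plain handler function instead of an embedded *testing.T.
type Setup struct {
	Errhand func(args ...interface{})
}

func NewSetup(errhand func(args ...interface{})) *Setup {
	return &Setup{Errhand: errhand}
}

// Both calls compile because t.Fatal and b.Fatal share the
// func(args ...interface{}) signature.
func TestUsesSetup(t *testing.T)      { _ = NewSetup(t.Fatal) }
func BenchmarkUsesSetup(b *testing.B) { _ = NewSetup(b.Fatal) }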


@@ -17,7 +17,6 @@ package doltdb
import (
"context"
"io"
"sync"
"github.com/dolthub/dolt/go/libraries/doltcore/ref"
@@ -73,10 +72,7 @@ func replicate(ctx context.Context, destDB, srcDB datas.Database, tempTableDir s
return err
}
-newCtx, cancelFunc := context.WithCancel(ctx)
-wg, progChan, pullerEventCh := runProgFuncs(newCtx)
-defer stopProgFuncs(cancelFunc, wg, progChan, pullerEventCh)
-puller, err := datas.NewPuller(ctx, tempTableDir, defaultChunksPerTF, srcDB, destDB, stRef.TargetHash(), pullerEventCh)
+puller, err := datas.NewPuller(ctx, tempTableDir, defaultChunksPerTF, srcDB, destDB, stRef.TargetHash(), nil)
if err == datas.ErrDBUpToDate {
return nil
} else if err != nil {
@@ -100,58 +96,3 @@ func replicate(ctx context.Context, destDB, srcDB datas.Database, tempTableDir s
_, err = destDB.SetHead(ctx, ds, stRef)
return err
}
-
-func pullerProgFunc(ctx context.Context, pullerEventCh <-chan datas.PullerEvent) {
-	for {
-		if ctx.Err() != nil {
-			return
-		}
-		select {
-		case <-ctx.Done():
-			return
-		case <-pullerEventCh:
-		default:
-		}
-	}
-}
-
-func progFunc(ctx context.Context, progChan <-chan datas.PullProgress) {
-	for {
-		if ctx.Err() != nil {
-			return
-		}
-		select {
-		case <-ctx.Done():
-			return
-		case <-progChan:
-		default:
-		}
-	}
-}
-
-func runProgFuncs(ctx context.Context) (*sync.WaitGroup, chan datas.PullProgress, chan datas.PullerEvent) {
-	pullerEventCh := make(chan datas.PullerEvent)
-	progChan := make(chan datas.PullProgress)
-	wg := &sync.WaitGroup{}
-
-	wg.Add(1)
-	go func() {
-		defer wg.Done()
-		progFunc(ctx, progChan)
-	}()
-
-	wg.Add(1)
-	go func() {
-		defer wg.Done()
-		pullerProgFunc(ctx, pullerEventCh)
-	}()
-
-	return wg, progChan, pullerEventCh
-}
-
-func stopProgFuncs(cancel context.CancelFunc, wg *sync.WaitGroup, progChan chan datas.PullProgress, pullerEventCh chan datas.PullerEvent) {
-	cancel()
-	close(progChan)
-	close(pullerEventCh)
-	wg.Wait()
-}
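
A note on the deleted helpers: each select carried a default branch inside an infinite loop, so these drain goroutines busy-spun between events rather than blocking, burning CPU on the very path the new benchmark measures. For contrast, a blocking drain (an editor's sketch, not part of this commit) would look like:

package main

import (
	"context"
	"fmt"
)

// drain consumes values until ch is closed or ctx is cancelled,
// blocking between receives instead of spinning through a default case.
func drain(ctx context.Context, ch <-chan int) {
	for {
		select {
		case <-ctx.Done():
			return
		case _, ok := <-ch:
			if !ok {
				return
			}
		}
	}
}

func main() {
	ch := make(chan int)
	go func() {
		for i := 0; i < 3; i++ {
			ch <- i
		}
		close(ch)
	}()
	drain(context.Background(), ch)
	fmt.Println("drained")
}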


@@ -19,7 +19,6 @@ import (
"fmt"
"os"
"path/filepath"
"testing"
"github.com/dolthub/dolt/go/cmd/dolt/cli"
"github.com/dolthub/dolt/go/libraries/doltcore/doltdb"
@@ -27,6 +26,11 @@ import (
"github.com/dolthub/dolt/go/libraries/doltcore/env"
"github.com/dolthub/dolt/go/libraries/doltcore/env/actions"
"github.com/dolthub/dolt/go/libraries/doltcore/row"
"github.com/dolthub/dolt/go/libraries/doltcore/schema"
"github.com/dolthub/dolt/go/libraries/doltcore/schema/encoding"
"github.com/dolthub/dolt/go/libraries/doltcore/table"
"github.com/dolthub/dolt/go/libraries/doltcore/table/editor"
"github.com/dolthub/dolt/go/libraries/doltcore/table/typed/noms"
"github.com/dolthub/dolt/go/libraries/utils/filesys"
"github.com/dolthub/dolt/go/store/types"
)
@@ -46,7 +50,7 @@ type MultiRepoTestSetup struct {
DbPaths map[string]string
Home string
Remotes map[string]env.Remote
-T *testing.T
+Errhand func(args ...interface{})
}
const (
@@ -56,15 +60,15 @@ const (
)
// TODO this is not a proper builder, dbs need to be added before remotes
-func NewMultiRepoTestSetup(t *testing.T) *MultiRepoTestSetup {
+func NewMultiRepoTestSetup(errhand func(args ...interface{})) *MultiRepoTestSetup {
dir, err := os.MkdirTemp("", "")
if err != nil {
-t.Fatal(err)
+errhand(err)
}
homeDir, err := os.MkdirTemp(dir, homePrefix)
if err != nil {
-t.Fatal(err)
+errhand(err)
}
return &MultiRepoTestSetup{
@@ -75,7 +79,7 @@ func NewMultiRepoTestSetup(t *testing.T) *MultiRepoTestSetup {
Root: dir,
Home: homeDir,
DbPaths: make(map[string]string, 0),
-T: t,
+Errhand: errhand,
}
}
@@ -95,13 +99,13 @@ func (mr *MultiRepoTestSetup) NewDB(dbName string) {
err := os.Chdir(repo)
if err != nil {
-mr.T.Fatal(err)
+mr.Errhand(err)
}
// TODO sometimes tempfiles scrubber is racy with tempfolder deleter
dEnv := env.Load(context.Background(), mr.homeProv, filesys.LocalFS, doltdb.LocalDirDoltDB, "test")
if err != nil {
mr.T.Fatal("Failed to initialize environment:" + err.Error())
mr.Errhand("Failed to initialize environment:" + err.Error())
}
cfg, _ := dEnv.Config.GetConfig(env.GlobalConfig)
cfg.SetStrings(map[string]string{
@@ -110,12 +114,12 @@ func (mr *MultiRepoTestSetup) NewDB(dbName string) {
})
err = dEnv.InitRepo(context.Background(), types.Format_Default, name, email, defaultBranch)
if err != nil {
mr.T.Fatal("Failed to initialize environment:" + err.Error())
mr.Errhand("Failed to initialize environment:" + err.Error())
}
ddb, err := doltdb.LoadDoltDB(ctx, types.Format_Default, doltdb.LocalDirDoltDB, filesys.LocalFS)
if err != nil {
mr.T.Fatal("Failed to initialize environment:" + err.Error())
mr.Errhand("Failed to initialize environment:" + err.Error())
}
dEnv = env.Load(context.Background(), mr.homeProv, filesys.LocalFS, doltdb.LocalDirDoltDB, "test")
@@ -152,23 +156,23 @@ func (mr *MultiRepoTestSetup) CloneDB(fromRemote, dbName string) {
dEnv := env.Load(context.Background(), mr.homeProv, filesys.LocalFS, doltdb.LocalDirDoltDB, "test")
dEnv, err = actions.EnvForClone(ctx, srcDB.Format(), r, cloneDir, dEnv.FS, dEnv.Version, mr.homeProv)
if err != nil {
-mr.T.Fatal(err)
+mr.Errhand(err)
}
err = actions.CloneRemote(ctx, srcDB, r.Name, "", dEnv)
if err != nil {
-mr.T.Fatal(err)
+mr.Errhand(err)
}
wd, err := os.Getwd()
if err != nil {
-mr.T.Fatal(err)
+mr.Errhand(err)
}
os.Chdir(cloneDir)
defer os.Chdir(wd)
ddb, err := doltdb.LoadDoltDB(ctx, types.Format_Default, doltdb.LocalDirDoltDB, filesys.LocalFS)
if err != nil {
mr.T.Fatal("Failed to initialize environment:" + err.Error())
mr.Errhand("Failed to initialize environment:" + err.Error())
}
dEnv = env.Load(context.Background(), mr.homeProv, filesys.LocalFS, doltdb.LocalDirDoltDB, "test")
@@ -182,7 +186,7 @@ func (mr *MultiRepoTestSetup) CloneDB(fromRemote, dbName string) {
func (mr *MultiRepoTestSetup) GetRemote(remoteName string) env.Remote {
rem, ok := mr.Remotes[remoteName]
if !ok {
mr.T.Fatal("remote not found")
mr.Errhand("remote not found")
}
return rem
}
@@ -190,7 +194,7 @@ func (mr *MultiRepoTestSetup) GetRemote(remoteName string) env.Remote {
func (mr *MultiRepoTestSetup) GetDB(dbName string) *doltdb.DoltDB {
db, ok := mr.DoltDBs[dbName]
if !ok {
mr.T.Fatal("db not found")
mr.Errhand("db not found")
}
return db
}
@@ -249,7 +253,7 @@ func (mr *MultiRepoTestSetup) CommitWithWorkingSet(dbName string) *doltdb.Commit
func (mr *MultiRepoTestSetup) CreateTable(dbName, tblName string) {
dEnv, ok := mr.MrEnv[dbName]
if !ok {
mr.T.Fatalf("Failed to find db: %s", dbName)
mr.Errhand(fmt.Sprintf("Failed to find db: %s", dbName))
}
imt, sch := dtestutils.CreateTestDataTable(true)
@@ -257,28 +261,28 @@ func (mr *MultiRepoTestSetup) CreateTable(dbName, tblName string) {
for i := 0; i < imt.NumRows(); i++ {
r, err := imt.GetRow(i)
if err != nil {
mr.T.Fatalf("Failed to create table: %s", err.Error())
mr.Errhand(fmt.Sprintf("Failed to create table: %s", err.Error()))
}
rows[i] = r
}
-dtestutils.CreateTestTable(mr.T, dEnv, tblName, sch, rows...)
+createTestTable(dEnv, tblName, sch, mr.Errhand, rows...)
}
func (mr *MultiRepoTestSetup) StageAll(dbName string) {
dEnv, ok := mr.MrEnv[dbName]
if !ok {
mr.T.Fatalf("Failed to find db: %s", dbName)
mr.Errhand(fmt.Sprintf("Failed to find db: %s", dbName))
}
ctx := context.Background()
roots, err := dEnv.Roots(ctx)
if !ok {
mr.T.Fatalf("Failed to get roots: %s", dbName)
mr.Errhand(fmt.Sprintf("Failed to get roots: %s", dbName))
}
roots, err = actions.StageAllTables(ctx, roots, dEnv.Docs)
err = dEnv.UpdateRoots(ctx, roots)
if err != nil {
mr.T.Fatalf("Failed to update roots: %s", dbName)
mr.Errhand(fmt.Sprintf("Failed to update roots: %s", dbName))
}
}
@@ -286,20 +290,118 @@ func (mr *MultiRepoTestSetup) PushToRemote(dbName, remoteName string) {
ctx := context.Background()
dEnv, ok := mr.MrEnv[dbName]
if !ok {
mr.T.Fatalf("Failed to find db: %s", dbName)
mr.Errhand(fmt.Sprintf("Failed to find db: %s", dbName))
}
ap := cli.CreatePushArgParser()
apr, err := ap.Parse([]string{remoteName, defaultBranch})
if err != nil {
mr.T.Fatalf("Failed to push remote: %s", err.Error())
mr.Errhand(fmt.Sprintf("Failed to push remote: %s", err.Error()))
}
opts, err := env.NewParseOpts(ctx, apr, dEnv.RepoStateReader(), dEnv.DoltDB, false, false)
if err != nil {
mr.T.Fatalf("Failed to push remote: %s", err.Error())
mr.Errhand(fmt.Sprintf("Failed to push remote: %s", err.Error()))
}
err = actions.DoPush(ctx, dEnv.RepoStateReader(), dEnv.RepoStateWriter(), dEnv.DoltDB, dEnv.TempTableFilesDir(), opts, actions.NoopRunProgFuncs, actions.NoopStopProgFuncs)
if err != nil {
mr.T.Fatalf("Failed to push remote: %s", err.Error())
mr.Errhand(fmt.Sprintf("Failed to push remote: %s", err.Error()))
}
}
+
+// createTestTable creates a new test table with the name, schema, and rows given.
+func createTestTable(dEnv *env.DoltEnv, tableName string, sch schema.Schema, errhand func(args ...interface{}), rs ...row.Row) {
+	imt := table.NewInMemTable(sch)
+
+	for _, r := range rs {
+		_ = imt.AppendRow(r)
+	}
+
+	ctx := context.Background()
+	vrw := dEnv.DoltDB.ValueReadWriter()
+	rd := table.NewInMemTableReader(imt)
+	wr := noms.NewNomsMapCreator(ctx, vrw, sch)
+
+	_, _, err := table.PipeRows(ctx, rd, wr, false)
+	if err != nil {
+		errhand(err)
+	}
+	err = rd.Close(ctx)
+	if err != nil {
+		errhand(err)
+	}
+	err = wr.Close(ctx)
+	if err != nil {
+		errhand(err)
+	}
+
+	schVal, err := encoding.MarshalSchemaAsNomsValue(ctx, vrw, sch)
+	if err != nil {
+		errhand(err)
+	}
+	empty, err := types.NewMap(ctx, vrw)
+	if err != nil {
+		errhand(err)
+	}
+	tbl, err := doltdb.NewTable(ctx, vrw, schVal, wr.GetMap(), empty, nil)
+	if err != nil {
+		errhand(err)
+	}
+	tbl, err = editor.RebuildAllIndexes(ctx, tbl, editor.TestEditorOptions(vrw))
+	if err != nil {
+		errhand(err)
+	}
+
+	sch, err = tbl.GetSchema(ctx)
+	if err != nil {
+		errhand(err)
+	}
+	rows, err := tbl.GetRowData(ctx)
+	if err != nil {
+		errhand(err)
+	}
+	indexes, err := tbl.GetIndexData(ctx)
+	if err != nil {
+		errhand(err)
+	}
+
+	err = putTableToWorking(ctx, dEnv, sch, rows, indexes, tableName, nil)
+	if err != nil {
+		errhand(err)
+	}
+}
+
+func putTableToWorking(ctx context.Context, dEnv *env.DoltEnv, sch schema.Schema, rows types.Map, indexData types.Map, tableName string, autoVal types.Value) error {
+	root, err := dEnv.WorkingRoot(ctx)
+	if err != nil {
+		return doltdb.ErrNomsIO
+	}
+
+	vrw := dEnv.DoltDB.ValueReadWriter()
+	schVal, err := encoding.MarshalSchemaAsNomsValue(ctx, vrw, sch)
+	if err != nil {
+		return env.ErrMarshallingSchema
+	}
+
+	tbl, err := doltdb.NewTable(ctx, vrw, schVal, rows, indexData, autoVal)
+	if err != nil {
+		return err
+	}
+
+	newRoot, err := root.PutTable(ctx, tableName, tbl)
+	if err != nil {
+		return err
+	}
+
+	rootHash, err := root.HashOf()
+	if err != nil {
+		return err
+	}
+	newRootHash, err := newRoot.HashOf()
+	if err != nil {
+		return err
+	}
+	if rootHash == newRootHash {
+		return nil
+	}
+
+	return dEnv.UpdateWorkingRoot(ctx, newRoot)
+}
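
A detail worth calling out in putTableToWorking: roots are content-addressed, so comparing HashOf() results is a cheap way to detect a no-op write and skip updating the working set. The same idea in isolation (Root here is a hypothetical stand-in; dolt's real root values hash their underlying noms values):

package main

import (
	"crypto/sha256"
	"fmt"
)

// Root is a hypothetical content-addressed value standing in for
// dolt's RootValue.
type Root struct{ data string }

func (r Root) HashOf() [32]byte { return sha256.Sum256([]byte(r.data)) }

// updateIfChanged mirrors the tail of putTableToWorking: skip the
// write when the new root hashes identically to the old one.
func updateIfChanged(prev, next Root) bool {
	if prev.HashOf() == next.HashOf() {
		return false // no-op: nothing to persist
	}
	// ...persist next here...
	return true
}

func main() {
	fmt.Println(updateIfChanged(Root{"table v1"}, Root{"table v1"})) // false
	fmt.Println(updateIfChanged(Root{"table v1"}, Root{"table v2"})) // true
}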


@@ -0,0 +1,2 @@
*.out
*.pdf


@@ -0,0 +1,222 @@
// Copyright 2021 Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package serverbench

import (
	"context"
	"fmt"
	"os"
	"path"
	"runtime"
	"runtime/pprof"
	"strings"
	"testing"

	"github.com/gocraft/dbr/v2"
	"golang.org/x/sync/errgroup"

	srv "github.com/dolthub/dolt/go/cmd/dolt/commands/sqlserver"
	"github.com/dolthub/dolt/go/libraries/doltcore/doltdb"
	"github.com/dolthub/dolt/go/libraries/doltcore/dtestutils/testcommands"
	"github.com/dolthub/dolt/go/libraries/doltcore/env"
)

type query string

type serverTest struct {
	name  string
	setup []query
	bench []query
}

// usage: `go test -bench .`
func BenchmarkPushOnWrite(b *testing.B) {
	setup := make([]query, 1)
	setup[0] = "CREATE TABLE bench (a int, b int, c int);"

	q := strings.Builder{}
	q.WriteString("INSERT INTO bench (a, b, c) VALUES (0, 0, 0)")
	i := 1
	for i < 1000 {
		q.WriteString(fmt.Sprintf(",(%d, %d, %d)", i, i, i))
		i++
	}
	qs := q.String()

	bench := make([]query, 100)
	commit := query("select dolt_commit('-am', 'cm')")
	i = 0
	for i < len(bench) {
		bench[i] = query(qs)
		bench[i+1] = commit
		i += 2
	}

	benchmarkServer(b, serverTest{
		name:  "smoke bench",
		setup: setup,
		bench: bench,
	})
}

func benchmarkServer(b *testing.B, test serverTest) {
	var dEnv *env.DoltEnv
	var cfg srv.ServerConfig
	ctx := context.Background()

	// setup
	dEnv, cfg = getEnvAndConfig(ctx, b)
	executeServerQueries(ctx, b, dEnv, cfg, test.setup)

	// bench
	f := getProfFile(b)
	err := pprof.StartCPUProfile(f)
	if err != nil {
		b.Fatal(err)
	}
	defer func() {
		pprof.StopCPUProfile()
		if err = f.Close(); err != nil {
			b.Fatal(err)
		}
		fmt.Printf("\twriting CPU profile for %s: %s\n", b.Name(), f.Name())
	}()

	b.Run(test.name, func(b *testing.B) {
		executeServerQueries(ctx, b, dEnv, cfg, test.bench)
	})
}

const (
	database = "dolt_bench"
	port     = 1234
	name     = "name"
	email    = "name@fake.horse"
)

func getEnvAndConfig(ctx context.Context, b *testing.B) (dEnv *env.DoltEnv, cfg srv.ServerConfig) {
	multiSetup := testcommands.NewMultiRepoTestSetup(b.Fatal)
	multiSetup.NewDB("dolt_bench")
	multiSetup.NewRemote("remote1")

	writerName := multiSetup.DbNames[0]

	localCfg, ok := multiSetup.MrEnv[writerName].Config.GetConfig(env.LocalConfig)
	if !ok {
		b.Fatal("local config does not exist")
	}
	localCfg.SetStrings(map[string]string{doltdb.ReplicateToRemoteKey: "remote1"})

	yaml := []byte(fmt.Sprintf(`
log_level: warning

behavior:
  read_only: false

user:
  name: "root"
  password: ""

databases:
  - name: "%s"
    path: "%s"

listener:
  host: localhost
  port: %d
  max_connections: 128
  read_timeout_millis: 28800000
  write_timeout_millis: 28800000
`, writerName, multiSetup.DbPaths[writerName], port))

	cfg, err := srv.NewYamlConfig(yaml)
	if err != nil {
		b.Fatal(err)
	}

	return multiSetup.MrEnv[writerName], cfg
}

func getProfFile(b *testing.B) *os.File {
	_, testFile, _, _ := runtime.Caller(0)

	f, err := os.Create(path.Join(path.Dir(testFile), b.Name()+".out"))
	if err != nil {
		b.Fatal(err)
	}
	return f
}

func executeServerQueries(ctx context.Context, b *testing.B, dEnv *env.DoltEnv, cfg srv.ServerConfig, queries []query) {
	serverController := srv.CreateServerController()

	eg, ctx := errgroup.WithContext(ctx)
	//b.Logf("Starting server with Config %v\n", srv.ConfigInfo(cfg))
	eg.Go(func() (err error) {
		startErr, closeErr := srv.Serve(ctx, "", cfg, serverController, dEnv)
		if startErr != nil {
			return startErr
		}
		if closeErr != nil {
			return closeErr
		}
		return nil
	})
	if err := serverController.WaitForStart(); err != nil {
		b.Fatal(err)
	}

	for _, q := range queries {
		if err := executeQuery(cfg, q); err != nil {
			b.Fatal(err)
		}
	}

	serverController.StopServer()
	if err := serverController.WaitForClose(); err != nil {
		b.Fatal(err)
	}
	if err := eg.Wait(); err != nil {
		b.Fatal(err)
	}
}

func executeQuery(cfg srv.ServerConfig, q query) error {
	cs := srv.ConnectionString(cfg) + database
	conn, err := dbr.Open("mysql", cs, nil)
	if err != nil {
		return err
	}

	rows, err := conn.Query(string(q))
	if err != nil {
		return err
	}

	for {
		if err = rows.Err(); err != nil {
			return err
		}
		if ok := rows.Next(); !ok {
			break
		}
	}

	return rows.Err()
}
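
The profiling scaffolding above is the reusable piece of this file: the CPU profile brackets only the timed b.Run section, and the .out file lands next to the test source (hence the new *.out/*.pdf .gitignore entries earlier in this commit). A trimmed, self-contained version of the same pattern (stand-in workload; naming the profile after b.Name() as the diff does):

package bench

import (
	"os"
	"runtime/pprof"
	"testing"
)

func BenchmarkProfiled(b *testing.B) {
	// Profile only the sub-benchmark below, mirroring benchmarkServer.
	f, err := os.Create(b.Name() + ".out")
	if err != nil {
		b.Fatal(err)
	}
	if err := pprof.StartCPUProfile(f); err != nil {
		b.Fatal(err)
	}
	defer func() {
		pprof.StopCPUProfile()
		if err := f.Close(); err != nil {
			b.Fatal(err)
		}
	}()

	b.Run("work", func(b *testing.B) {
		for i := 0; i < b.N; i++ {
			_ = make([]byte, 1024) // stand-in workload
		}
	})
}

Run with `go test -bench .` and inspect the result via `go tool pprof BenchmarkProfiled.out`.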


@@ -122,10 +122,6 @@ func NewTFPullerEvent(et PullerEventType, details *TableFileEventDetails) Puller
// NewPuller creates a new Puller instance to do the syncing. If a nil puller is returned without error that means
// that there is nothing to pull and the sinkDB is already up to date.
func NewPuller(ctx context.Context, tempDir string, chunksPerTF int, srcDB, sinkDB Database, rootChunkHash hash.Hash, eventCh chan PullerEvent) (*Puller, error) {
-if eventCh == nil {
-	panic("eventCh is required")
-}
-
// Sanity Check
exists, err := srcDB.chunkStore().Has(ctx, rootChunkHash)
@@ -226,10 +222,10 @@ func (p *Puller) uploadTempTableFile(ctx context.Context, ae *atomicerr.AtomicEr
fileSize := fi.Size()
fWithStats := iohelp.NewReaderWithStats(f, fileSize)
fWithStats.Start(func(stats iohelp.ReadStats) {
-p.eventCh <- NewTFPullerEvent(UploadTableFileUpdateEvent, &TableFileEventDetails{
+p.addEvent(NewTFPullerEvent(UploadTableFileUpdateEvent, &TableFileEventDetails{
CurrentFileSize: fileSize,
Stats: stats,
-})
+}))
})
defer func() {
fWithStats.Stop()
@@ -253,9 +249,9 @@ func (p *Puller) processCompletedTables(ctx context.Context, ae *atomicerr.Atomi
continue // drain
}
-p.eventCh <- NewTFPullerEvent(StartUploadTableFileEvent, &TableFileEventDetails{
+p.addEvent(NewTFPullerEvent(StartUploadTableFileEvent, &TableFileEventDetails{
CurrentFileSize: int64(tblFile.wr.ContentLength()),
-})
+}))
var id string
id, err = tblFile.wr.Finish()
@@ -284,9 +280,9 @@ func (p *Puller) processCompletedTables(ctx context.Context, ae *atomicerr.Atomi
continue
}
-p.eventCh <- NewTFPullerEvent(EndUploadTableFileEvent, &TableFileEventDetails{
+p.addEvent(NewTFPullerEvent(EndUploadTableFileEvent, &TableFileEventDetails{
CurrentFileSize: int64(ttf.contentLen),
-})
+}))
fileIdToNumChunks[id] = ttf.numChunks
}
@@ -323,7 +319,7 @@ func (p *Puller) Pull(ctx context.Context) error {
chunksInLevel := len(absent)
twDetails.ChunksInLevel = chunksInLevel
-p.eventCh <- NewTWPullerEvent(NewLevelTWEvent, twDetails)
+p.addEvent(NewTWPullerEvent(NewLevelTWEvent, twDetails))
var err error
absent, err = p.sinkDBCS.HasMany(ctx, absent)
@@ -333,7 +329,7 @@ func (p *Puller) Pull(ctx context.Context) error {
}
twDetails.ChunksAlreadyHad = chunksInLevel - len(absent)
-p.eventCh <- NewTWPullerEvent(DestDBHasTWEvent, twDetails)
+p.addEvent(NewTWPullerEvent(DestDBHasTWEvent, twDetails))
if len(absent) > 0 {
leaves, absent, err = p.getCmp(ctx, twDetails, leaves, absent, completedTables)
@@ -438,7 +434,7 @@ func (p *Puller) getCmp(ctx context.Context, twDetails *TreeWalkEventDetails, le
twDetails.ChunksBuffered++
if twDetails.ChunksBuffered%100 == 0 {
-p.eventCh <- NewTWPullerEvent(LevelUpdateTWEvent, twDetails)
+p.addEvent(NewTWPullerEvent(LevelUpdateTWEvent, twDetails))
}
err = p.wr.AddCmpChunk(cmpAndRef.cmpChnk)
@@ -448,9 +444,9 @@ func (p *Puller) getCmp(ctx context.Context, twDetails *TreeWalkEventDetails, le
}
if p.wr.Size() >= p.chunksPerTF {
-p.eventCh <- NewTFPullerEvent(TableFileClosedEvent, &TableFileEventDetails{
+p.addEvent(NewTFPullerEvent(TableFileClosedEvent, &TableFileEventDetails{
CurrentFileSize: int64(p.wr.ContentLength()),
-})
+}))
completedTables <- FilledWriters{p.wr}
p.wr = nil
@@ -486,8 +482,14 @@ func (p *Puller) getCmp(ctx context.Context, twDetails *TreeWalkEventDetails, le
return nil, nil, errors.New("failed to get all chunks.")
}
-p.eventCh <- NewTWPullerEvent(LevelDoneTWEvent, twDetails)
+p.addEvent(NewTWPullerEvent(LevelDoneTWEvent, twDetails))
twDetails.TreeLevel = maxHeight
return nextLeaves, nextLevel, nil
}
+
+func (p *Puller) addEvent(evt PullerEvent) {
+	if p.eventCh != nil {
+		p.eventCh <- evt
+	}
+}