From 4c6f0fe809ee7f6888531e2519c5b05ade585bcb Mon Sep 17 00:00:00 2001 From: Brian Hendriks Date: Tue, 21 Sep 2021 14:30:38 -0700 Subject: [PATCH 1/5] pprof-server usage message on start (#2159) --- go/cmd/dolt/dolt.go | 23 +++++++++++++++++++++-- 1 file changed, 21 insertions(+), 2 deletions(-) diff --git a/go/cmd/dolt/dolt.go b/go/cmd/dolt/dolt.go index 7bbde32c8f..24f7b103b7 100644 --- a/go/cmd/dolt/dolt.go +++ b/go/cmd/dolt/dolt.go @@ -16,7 +16,6 @@ package main import ( "context" - "log" "net/http" _ "net/http/pprof" "os" @@ -148,7 +147,27 @@ func runMain() int { case pprofServerFlag: // serve the pprof endpoints setup in the init function run when "net/http/pprof" is imported go func() { - log.Println(http.ListenAndServe("localhost:6060", nil)) + cyanStar := color.CyanString("*") + cli.Println(cyanStar, "Starting pprof server on port 6060.") + cli.Println(cyanStar, "Go to", color.CyanString("http://localhost:6060/debug/pprof"), "in a browser to see supported endpoints.") + cli.Println(cyanStar) + cli.Println(cyanStar, "Known endpoints are:") + cli.Println(cyanStar, " /allocs: A sampling of all past memory allocations") + cli.Println(cyanStar, " /block: Stack traces that led to blocking on synchronization primitives") + cli.Println(cyanStar, " /cmdline: The command line invocation of the current program") + cli.Println(cyanStar, " /goroutine: Stack traces of all current goroutines") + cli.Println(cyanStar, " /heap: A sampling of memory allocations of live objects. You can specify the gc GET parameter to run GC before taking the heap sample.") + cli.Println(cyanStar, " /mutex: Stack traces of holders of contended mutexes") + cli.Println(cyanStar, " /profile: CPU profile. You can specify the duration in the seconds GET parameter. 
After you get the profile file, use the go tool pprof command to investigate the profile.") + cli.Println(cyanStar, " /threadcreate: Stack traces that led to the creation of new OS threads") + cli.Println(cyanStar, " /trace: A trace of execution of the current program. You can specify the duration in the seconds GET parameter. After you get the trace file, use the go tool trace command to investigate the trace.") + cli.Println() + + err := http.ListenAndServe("localhost:6060", nil) + + if err != nil { + cli.Println(color.YellowString("pprof server exited with error: %v", err)) + } }() args = args[1:] From b3c1b82b99c45068fe38f8bd9b8a2d73774e023b Mon Sep 17 00:00:00 2001 From: zachmu Date: Thu, 23 Sep 2021 16:30:18 +0000 Subject: [PATCH 2/5] [ga-bump-release] Update Dolt version to 0.28.5 and release v0.28.5 --- go/cmd/dolt/dolt.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go/cmd/dolt/dolt.go b/go/cmd/dolt/dolt.go index 24f7b103b7..c61caf258c 100644 --- a/go/cmd/dolt/dolt.go +++ b/go/cmd/dolt/dolt.go @@ -49,7 +49,7 @@ import ( ) const ( - Version = "0.28.4" + Version = "0.28.5" ) var dumpDocsCommand = &commands.DumpDocsCmd{} From 5cc08526923fabd036b479f700f3c27d63bac8b2 Mon Sep 17 00:00:00 2001 From: Brian Hendriks Date: Thu, 23 Sep 2021 10:57:31 -0700 Subject: [PATCH 3/5] optimize keyless read row (#2094) --- .../sqle/enginetest/dolt_engine_test.go | 3 +- go/libraries/doltcore/sqle/rows.go | 103 +++++++++++++----- go/libraries/doltcore/sqle/tables.go | 16 ++- integration-tests/bats/auto_increment.bats | 61 ++++++----- .../bats/helper/index-on-writes-common.bash | 4 +- .../bats/import-create-tables.bats | 8 +- integration-tests/bats/index.bats | 6 +- integration-tests/bats/merge.bats | 2 +- integration-tests/bats/query-catalog.bats | 6 +- integration-tests/bats/sql-server.bats | 2 +- integration-tests/bats/sql.bats | 10 +- 11 files changed, 141 insertions(+), 80 deletions(-) diff --git 
a/go/libraries/doltcore/sqle/enginetest/dolt_engine_test.go b/go/libraries/doltcore/sqle/enginetest/dolt_engine_test.go index 442de81239..9bbc6bac48 100644 --- a/go/libraries/doltcore/sqle/enginetest/dolt_engine_test.go +++ b/go/libraries/doltcore/sqle/enginetest/dolt_engine_test.go @@ -24,7 +24,8 @@ import ( ) func init() { - sqle.MinRowsPerPartition = 2 + sqle.MinRowsPerPartition = 8 + sqle.MaxRowsPerPartition = 1024 } func TestQueries(t *testing.T) { diff --git a/go/libraries/doltcore/sqle/rows.go b/go/libraries/doltcore/sqle/rows.go index b5765e5db0..66119c8e43 100644 --- a/go/libraries/doltcore/sqle/rows.go +++ b/go/libraries/doltcore/sqle/rows.go @@ -26,6 +26,38 @@ import ( "github.com/dolthub/dolt/go/store/types" ) +var _ sql.RowIter = (*keylessRowIter)(nil) + +type keylessRowIter struct { + keyedIter *DoltMapIter + + cardIdx int + nonCardCols int + + lastRead sql.Row + lastCard uint64 +} + +func (k *keylessRowIter) Next() (sql.Row, error) { + if k.lastCard == 0 { + r, err := k.keyedIter.Next() + + if err != nil { + return nil, err + } + + k.lastCard = r[k.cardIdx].(uint64) + k.lastRead = r[:k.nonCardCols] + } + + k.lastCard-- + return k.lastRead, nil +} + +func (k keylessRowIter) Close(ctx *sql.Context) error { + return k.keyedIter.Close(ctx) +} + // An iterator over the rows of a table. 
type doltTableRowIter struct { sql.RowIter @@ -43,48 +75,71 @@ func newRowIterator(ctx *sql.Context, tbl *doltdb.Table, projCols []string, part if schema.IsKeyless(sch) { // would be more optimal to project columns into keyless tables also - return newKeylessRowIterator(ctx, tbl, partition) + return newKeylessRowIterator(ctx, tbl, projCols, partition) } else { return newKeyedRowIter(ctx, tbl, projCols, partition) } } -func newKeylessRowIterator(ctx *sql.Context, tbl *doltdb.Table, partition *doltTablePartition) (*doltTableRowIter, error) { - var iter table.SqlTableReader - var err error - if partition.end == NoUpperBound { - iter, err = table.NewBufferedTableReader(ctx, tbl) - } else { - iter, err = table.NewBufferedTableReaderForPartition(ctx, tbl, partition.start, partition.end) - } - +func newKeylessRowIterator(ctx *sql.Context, tbl *doltdb.Table, projectedCols []string, partition *doltTablePartition) (sql.RowIter, error) { + mapIter, err := iterForPartition(ctx, partition) if err != nil { return nil, err } - return &doltTableRowIter{ - ctx: ctx, - reader: iter, + cols, tagToSqlColIdx, err := getTagToResColIdx(ctx, tbl, projectedCols) + if err != nil { + return nil, err + } + + idxOfCardinality := len(cols) + tagToSqlColIdx[schema.KeylessRowCardinalityTag] = idxOfCardinality + + colsCopy := make([]schema.Column, len(cols), len(cols)+1) + copy(colsCopy, cols) + colsCopy = append(colsCopy, schema.NewColumn("__cardinality__", schema.KeylessRowCardinalityTag, types.UintKind, false)) + + conv := NewKVToSqlRowConverter(tbl.Format(), tagToSqlColIdx, colsCopy, len(colsCopy)) + keyedItr, err := NewDoltMapIter(ctx, mapIter.NextTuple, nil, conv), nil + if err != nil { + return nil, err + } + + return &keylessRowIter{ + keyedIter: keyedItr, + cardIdx: idxOfCardinality, + nonCardCols: len(cols), }, nil } func newKeyedRowIter(ctx context.Context, tbl *doltdb.Table, projectedCols []string, partition *doltTablePartition) (sql.RowIter, error) { - var err error - var mapIter 
types.MapTupleIterator + mapIter, err := iterForPartition(ctx, partition) + if err != nil { + return nil, err + } + + cols, tagToSqlColIdx, err := getTagToResColIdx(ctx, tbl, projectedCols) + if err != nil { + return nil, err + } + + conv := NewKVToSqlRowConverter(tbl.Format(), tagToSqlColIdx, cols, len(cols)) + return NewDoltMapIter(ctx, mapIter.NextTuple, nil, conv), nil +} + +func iterForPartition(ctx context.Context, partition *doltTablePartition) (types.MapTupleIterator, error) { rowData := partition.rowData if partition.end == NoUpperBound { - mapIter, err = rowData.RangeIterator(ctx, 0, rowData.Len()) + return rowData.RangeIterator(ctx, 0, rowData.Len()) } else { - mapIter, err = partition.IteratorForPartition(ctx, rowData) - } - - if err != nil { - return nil, err + return partition.IteratorForPartition(ctx, rowData) } +} +func getTagToResColIdx(ctx context.Context, tbl *doltdb.Table, projectedCols []string) ([]schema.Column, map[uint64]int, error) { sch, err := tbl.GetSchema(ctx) if err != nil { - return nil, err + return nil, nil, err } cols := sch.GetAllCols().GetColumns() @@ -96,9 +151,7 @@ func newKeyedRowIter(ctx context.Context, tbl *doltdb.Table, projectedCols []str tagToSqlColIdx[col.Tag] = i } } - - conv := NewKVToSqlRowConverter(tbl.Format(), tagToSqlColIdx, cols, len(cols)) - return NewDoltMapIter(ctx, mapIter.NextTuple, nil, conv), nil + return cols, tagToSqlColIdx, nil } // Next returns the next row in this row iterator, or an io.EOF error if there aren't any more. 
diff --git a/go/libraries/doltcore/sqle/tables.go b/go/libraries/doltcore/sqle/tables.go index b1786ed197..243c15260b 100644 --- a/go/libraries/doltcore/sqle/tables.go +++ b/go/libraries/doltcore/sqle/tables.go @@ -47,6 +47,7 @@ const ( partitionMultiplier = 2.0 ) +var MaxRowsPerPartition uint64 = 32 * 1024 var MinRowsPerPartition uint64 = 1024 func init() { @@ -358,15 +359,20 @@ func (t *DoltTable) Partitions(ctx *sql.Context) (sql.PartitionIter, error) { return newDoltTablePartitionIter(rowData, doltTablePartition{0, 0, rowData}), nil } - maxPartitions := uint64(partitionMultiplier * runtime.NumCPU()) - numPartitions := (numElements / MinRowsPerPartition) + 1 + itemsPerPartition := MaxRowsPerPartition + numPartitions := (numElements / itemsPerPartition) + 1 + if numPartitions < uint64(partitionMultiplier*runtime.NumCPU()) { + itemsPerPartition = numElements / uint64(partitionMultiplier*runtime.NumCPU()) - if numPartitions > maxPartitions { - numPartitions = maxPartitions + if itemsPerPartition == 0 { + itemsPerPartition = numElements + numPartitions = 1 + } else { + numPartitions = (numElements / itemsPerPartition) + 1 + } } partitions := make([]doltTablePartition, numPartitions) - itemsPerPartition := numElements / numPartitions for i := uint64(0); i < numPartitions-1; i++ { partitions[i] = doltTablePartition{i * itemsPerPartition, (i + 1) * itemsPerPartition, rowData} } diff --git a/integration-tests/bats/auto_increment.bats b/integration-tests/bats/auto_increment.bats index d691108a0f..039259ab78 100644 --- a/integration-tests/bats/auto_increment.bats +++ b/integration-tests/bats/auto_increment.bats @@ -23,13 +23,13 @@ teardown() { run dolt sql -q "INSERT INTO test (c0) VALUES (44);" [ "$status" -eq 0 ] - run dolt sql -q "SELECT * FROM test WHERE c0 = 44;" -r csv + run dolt sql -q "SELECT * FROM test WHERE c0 = 44 ORDER BY pk;" -r csv [ "$status" -eq 0 ] [[ "$output" =~ "4,44" ]] || false run dolt sql -q "INSERT INTO test (c0) VALUES (55),(66);" [ "$status" 
-eq 0 ] - run dolt sql -q "SELECT * FROM test WHERE c0 > 50;" -r csv + run dolt sql -q "SELECT * FROM test WHERE c0 > 50 ORDER BY pk;" -r csv [ "$status" -eq 0 ] [[ "$output" =~ "5,55" ]] || false [[ "$output" =~ "6,66" ]] || false @@ -45,7 +45,7 @@ CREATE TABLE ai ( INSERT INTO ai VALUES (NULL,1),(NULL,2),(NULL,3); SQL [ "$status" -eq 0 ] - run dolt sql -q "SELECT * FROM ai;" -r csv + run dolt sql -q "SELECT * FROM ai ORDER BY pk;" -r csv [ "$status" -eq 0 ] [[ "$output" =~ "1,1" ]] || false [[ "$output" =~ "2,2" ]] || false @@ -55,7 +55,7 @@ SQL @test "auto_increment: insert into empty auto_increment table" { run dolt sql -q "INSERT INTO test (c0) VALUES (1);" [ "$status" -eq 0 ] - run dolt sql -q "SELECT * FROM test WHERE c0 = 1;" -r csv + run dolt sql -q "SELECT * FROM test WHERE c0 = 1 ORDER BY pk;" -r csv [ "$status" -eq 0 ] [[ "$output" =~ "1,1" ]] || false } @@ -67,7 +67,7 @@ SQL [ "$status" -eq 0 ] run dolt sql -q "INSERT INTO test VALUES (2,2);" [ "$status" -eq 0 ] - run dolt sql -q "SELECT * FROM test;" -r csv + run dolt sql -q "SELECT * FROM test ORDER BY pk;" -r csv [ "$status" -eq 0 ] [[ "$output" =~ "1,1" ]] || false [[ "$output" =~ "2,2" ]] || false @@ -80,13 +80,14 @@ SQL run dolt sql -q "INSERT INTO test (pk,c0) VALUES (NULL,2);" [ "$status" -eq 0 ] - run dolt sql -q "SELECT * FROM test WHERE c0 > 1;" -r csv + run dolt sql -q "SELECT * FROM test WHERE c0 > 1 ORDER BY pk;" -r csv [ "$status" -eq 0 ] [[ "$output" =~ "2,2" ]] || false run dolt sql -q "INSERT INTO test VALUES (NULL,3), (10,10), (NULL,11);" [ "$status" -eq 0 ] - run dolt sql -q "SELECT * FROM test WHERE c0 > 2;" -r csv + run dolt sql -q "SELECT * FROM test WHERE c0 > 2 ORDER BY pk;" -r csv + echo $output [ "$status" -eq 0 ] [[ "$output" =~ "3,3" ]] || false [[ "$output" =~ "10,10" ]] || false @@ -98,13 +99,13 @@ SQL run dolt sql -q "INSERT INTO test (pk,c0) VALUES (0,2);" [ "$status" -eq 0 ] - run dolt sql -q "SELECT * FROM test WHERE c0 > 1;" -r csv + run dolt sql -q "SELECT * FROM 
test WHERE c0 > 1 ORDER BY pk;" -r csv [ "$status" -eq 0 ] [[ "$output" =~ "2,2" ]] || false run dolt sql -q "INSERT INTO test VALUES (0,3), (10,10), (0,11);" [ "$status" -eq 0 ] - run dolt sql -q "SELECT * FROM test WHERE c0 > 2;" -r csv + run dolt sql -q "SELECT * FROM test WHERE c0 > 2 ORDER BY pk;" -r csv [ "$status" -eq 0 ] [[ "$output" =~ "3,3" ]] || false [[ "$output" =~ "10,10" ]] || false @@ -121,7 +122,7 @@ INSERT INTO test (c0) VALUES (21); SQL [ "$status" -eq 0 ] - run dolt sql -q "SELECT * FROM test;" -r csv + run dolt sql -q "SELECT * FROM test ORDER BY pk;" -r csv [ "$status" -eq 0 ] [[ "$output" =~ "1,1" ]] || false [[ "$output" =~ "2,2" ]] || false @@ -143,7 +144,7 @@ INSERT INTO test2 SELECT (pk + 20), c0 FROM test2; SQL [ "$status" -eq 0 ] - run dolt sql -q "select * from test2 order by pk" -r csv + run dolt sql -q "select * from test2 ORDER BY pk" -r csv [ "$status" -eq 0 ] [[ "$output" =~ "1,1" ]] || false [[ "$output" =~ "2,2" ]] || false @@ -170,7 +171,7 @@ SQL dolt sql -q "INSERT INTO auto_float (pk, c0) VALUES (3.9,4);" dolt sql -q "INSERT INTO auto_float (c0) VALUES (5);" - run dolt sql -q "SELECT * FROM auto_float;" -r csv + run dolt sql -q "SELECT * FROM auto_float ORDER BY pk;" -r csv [ "$status" -eq 0 ] [[ "$output" =~ "1,1" ]] || false [[ "$output" =~ "2.1,2" ]] || false @@ -193,7 +194,7 @@ SQL echo "$TYPE" run dolt sql -q "INSERT INTO auto_$TYPE (c0) VALUES (2);" [ "$status" -eq 0 ] - run dolt sql -q "SELECT * FROM auto_$TYPE WHERE c0 > 1;" -r csv + run dolt sql -q "SELECT * FROM auto_$TYPE WHERE c0 > 1 ORDER BY pk;" -r csv [ "$status" -eq 0 ] [[ "$output" =~ "2,2" ]] || false done @@ -211,7 +212,7 @@ SQL echo "$TYPE" run dolt sql -q "INSERT INTO auto2_$TYPE (c0) VALUES (2);" [ "$status" -eq 0 ] - run dolt sql -q "SELECT * FROM auto2_$TYPE WHERE c0 > 1;" -r csv + run dolt sql -q "SELECT * FROM auto2_$TYPE WHERE c0 > 1 ORDER BY pk;" -r csv [ "$status" -eq 0 ] [[ "$output" =~ "2,2" ]] || false done @@ -248,7 +249,7 @@ SQL dolt merge 
other dolt sql -q "INSERT INTO test VALUES (NULL,22);" - run dolt sql -q "SELECT pk FROM test WHERE c0 = 22;" -r csv + run dolt sql -q "SELECT pk FROM test WHERE c0 = 22 ORDER BY pk;" -r csv [ "$status" -eq 0 ] [[ "$output" =~ "22" ]] || false } @@ -271,7 +272,7 @@ SQL dolt checkout master dolt merge other dolt sql -q "INSERT INTO test VALUES (NULL,22);" - run dolt sql -q "SELECT pk FROM test WHERE c0 = 22;" -r csv + run dolt sql -q "SELECT pk FROM test WHERE c0 = 22 ORDER BY pk;" -r csv [ "$status" -eq 0 ] [[ "$output" =~ "22" ]] || false } @@ -280,7 +281,7 @@ SQL run dolt sql -q "ALTER TABLE test AUTO_INCREMENT = 10;" [ "$status" -eq 0 ] dolt sql -q "INSERT INTO test VALUES (NULL,10);" - run dolt sql -q "SELECT * FROM test;" -r csv + run dolt sql -q "SELECT * FROM test ORDER BY pk;" -r csv [ "$status" -eq 0 ] [[ "$output" =~ "10,10" ]] || false @@ -288,7 +289,7 @@ SQL ALTER TABLE test AUTO_INCREMENT = 20; INSERT INTO test VALUES (NULL,20),(30,30),(NULL,31); SQL - run dolt sql -q "SELECT * FROM test;" -r csv + run dolt sql -q "SELECT * FROM test ORDER BY pk;" -r csv [ "$status" -eq 0 ] [[ "$output" =~ "10,10" ]] || false [[ "$output" =~ "20,20" ]] || false @@ -310,7 +311,7 @@ ALTER TABLE index_test ADD INDEX (c0); INSERT INTO index_test (c0) VALUES (4),(5),(6); SQL - run dolt sql -q "select * from index_test" -r csv + run dolt sql -q "select * from index_test ORDER BY pk" -r csv [ "$status" -eq 0 ] [[ "${lines[0]}" =~ "pk,c0" ]] || false [[ "${lines[1]}" =~ "1,1" ]] || false @@ -329,7 +330,7 @@ SQL dolt sql -q "INSERT INTO test (c0) SELECT pk FROM other;" - run dolt sql -q "SELECT * FROM test;" -r csv + run dolt sql -q "SELECT * FROM test ORDER BY pk;" -r csv [ "$status" -eq 0 ] [[ "${lines[0]}" =~ "pk,c0" ]] || false [[ "${lines[1]}" =~ "1,1" ]] || false @@ -351,7 +352,7 @@ TRUNCATE t; INSERT INTO t (c0) VALUES (1),(2),(3); SQL - run dolt sql -q "SELECT * FROM t;" -r csv + run dolt sql -q "SELECT * FROM t ORDER BY pk;" -r csv [ "$status" -eq 0 ] [[ "${lines[0]}" 
=~ "pk,c0" ]] || false [[ "${lines[1]}" =~ "1,1" ]] || false @@ -370,7 +371,7 @@ INSERT INTO t (c0) VALUES (1),(2),(3); INSERT INTO t (c0) VALUES (4),(5),(6); SQL - run dolt sql -q "SELECT * FROM t;" -r csv + run dolt sql -q "SELECT * FROM t ORDER BY pk;" -r csv [ "$status" -eq 0 ] [[ "${lines[0]}" =~ "pk,c0" ]] || false [[ "${lines[1]}" =~ "1,1" ]] || false @@ -393,7 +394,7 @@ INSERT INTO t VALUES (4, 4); INSERT INTO t (c0) VALUES (5),(6),(7); SQL - run dolt sql -q "SELECT * FROM t;" -r csv + run dolt sql -q "SELECT * FROM t ORDER BY pk;" -r csv [ "$status" -eq 0 ] [[ "${lines[0]}" =~ "pk,c0" ]] || false [[ "${lines[1]}" =~ "1,1" ]] || false @@ -419,7 +420,7 @@ INSERT into t VALUES (3, 3); INSERT INTO t (c0) VALUES (8); SQL - run dolt sql -q "SELECT * FROM t;" -r csv + run dolt sql -q "SELECT * FROM t ORDER BY pk;" -r csv [ "$status" -eq 0 ] [[ "${lines[0]}" =~ "pk,c0" ]] || false [[ "${lines[1]}" =~ "1,1" ]] || false @@ -451,7 +452,7 @@ SELECT DOLT_MERGE('test'); INSERT INTO t VALUES (NULL,5),(6,6),(NULL,7); SQL - run dolt sql -q "SELECT * FROM t;" -r csv + run dolt sql -q "SELECT * FROM t ORDER BY pk;" -r csv [ "$status" -eq 0 ] [[ "${lines[0]}" =~ "pk,c0" ]] || false [[ "${lines[1]}" =~ "1,1" ]] || false @@ -482,7 +483,7 @@ SELECT DOLT_MERGE('test'); INSERT INTO t VALUES (10,10),(NULL,11); SQL - run dolt sql -q "SELECT * FROM t;" -r csv + run dolt sql -q "SELECT * FROM t ORDER BY pk;" -r csv [ "$status" -eq 0 ] [[ "${lines[0]}" =~ "pk,c0" ]] || false [[ "${lines[1]}" =~ "1,1" ]] || false @@ -515,7 +516,7 @@ SELECT DOLT_MERGE('test'); INSERT INTO t VALUES (3,3),(NULL,6); SQL - run dolt sql -q "SELECT * FROM t;" -r csv + run dolt sql -q "SELECT * FROM t ORDER BY pk;" -r csv [ "$status" -eq 0 ] [[ "${lines[0]}" =~ "pk,c0" ]] || false [[ "${lines[1]}" =~ "1,1" ]] || false @@ -548,7 +549,7 @@ SELECT DOLT_MERGE('test'); INSERT INTO t VALUES (NULL,6); SQL - run dolt sql -q "SELECT * FROM t;" -r csv + run dolt sql -q "SELECT * FROM t ORDER BY pk;" -r csv [ "$status" 
-eq 0 ] [[ "${lines[0]}" =~ "pk,c0" ]] || false [[ "${lines[1]}" =~ "1,1" ]] || false @@ -564,7 +565,7 @@ SQL [ "$status" -eq 0 ] dolt sql -q "insert into test2 values (0, 'john', 0)" - run dolt sql -q "SELECT * from test2" -r csv + run dolt sql -q "SELECT * from test2 ORDER BY pk" -r csv [ "$status" -eq 0 ] [[ "$output" =~ "0,john,0" ]] || false } @@ -574,7 +575,7 @@ SQL dolt sql -q "ALTER TABLE t CHANGE COLUMN pk pk int NOT NULL AUTO_INCREMENT PRIMARY KEY;" dolt sql -q 'insert into t values (NULL), (NULL), (NULL)' - run dolt sql -q "SELECT * FROM t" -r csv + run dolt sql -q "SELECT * FROM t ORDER BY pk" -r csv [[ "${lines[0]}" =~ "pk" ]] || false [[ "${lines[1]}" =~ "1" ]] || false [[ "${lines[2]}" =~ "2" ]] || false diff --git a/integration-tests/bats/helper/index-on-writes-common.bash b/integration-tests/bats/helper/index-on-writes-common.bash index 21f93e5336..036918fd90 100644 --- a/integration-tests/bats/helper/index-on-writes-common.bash +++ b/integration-tests/bats/helper/index-on-writes-common.bash @@ -100,12 +100,12 @@ test_mutation() { expected="$3" uses_pk="$4" dolt sql -q "$dml" - run dolt sql -q "select * from $table" -r csv + run dolt sql -q "select * from $table ORDER BY pk1" -r csv [ "$status" -eq "0" ] [ "$output" == "$expected" ] || (echo $output && exit 1) dolt reset --hard dolt sql --batch -q "$dml ; $dml" - run dolt sql -q "select * from $table" -r csv + run dolt sql -q "select * from $table ORDER BY pk1" -r csv [ "$status" -eq "0" ] [ "$output" == "$expected" ] || (echo $output && exit 1) run dolt sql -q "explain $dml" diff --git a/integration-tests/bats/import-create-tables.bats b/integration-tests/bats/import-create-tables.bats index 828a853752..d4e48cd166 100755 --- a/integration-tests/bats/import-create-tables.bats +++ b/integration-tests/bats/import-create-tables.bats @@ -392,7 +392,7 @@ SQL run dolt ls [ "$status" -eq 0 ] [[ "$output" =~ "test" ]] || false - run dolt sql -q "select * from test" + run dolt sql -q "select * from test 
ORDER BY pk" [ "$status" -eq 0 ] [ "${#lines[@]}" -eq 11 ] [ "${lines[3]}" = '| a | "" | 1 |' ] @@ -418,7 +418,7 @@ SQL [ "$status" -eq 0 ] # schema argument subsets the data and adds empty column - run dolt sql -r csv -q "select * from subset" + run dolt sql -r csv -q "select * from subset ORDER BY pk" [ "$status" -eq 0 ] [ "${lines[0]}" = "pk,c1,c3,noData" ] [ "${lines[1]}" = "0,1,3," ] @@ -440,7 +440,7 @@ SQL run dolt ls [ "$status" -eq 0 ] [[ "$output" =~ "empty_strings_null_values" ]] || false - run dolt sql -q "select * from empty_strings_null_values" + run dolt sql -q "select * from empty_strings_null_values ORDER BY pk" [ "$status" -eq 0 ] [ "${#lines[@]}" -eq 11 ] [ "${lines[3]}" = '| a | "" | 1 |' ] @@ -467,7 +467,7 @@ SQL run dolt ls [ "$status" -eq 0 ] [[ "$output" =~ "test" ]] || false - run dolt sql -q "select * from test" + run dolt sql -q "select * from test ORDER BY pk" [ "$status" -eq 0 ] [ "${#lines[@]}" -eq 11 ] [ "${lines[3]}" = '| a | "" | 1 |' ] diff --git a/integration-tests/bats/index.bats b/integration-tests/bats/index.bats index 6a366212af..91f6493431 100644 --- a/integration-tests/bats/index.bats +++ b/integration-tests/bats/index.bats @@ -2025,13 +2025,13 @@ CREATE TABLE test2 ( INSERT INTO test VALUES (0, NULL), (1, NULL), (2, NULL); INSERT INTO test2 VALUES (0, NULL, NULL), (1, NULL, NULL), (2, 1, NULL), (3, 1, NULL), (4, NULL, 1), (5, NULL, 1); SQL - run dolt sql -q "SELECT * FROM test" -r=json + run dolt sql -q "SELECT * FROM test order by pk" -r=json [ "$status" -eq "0" ] [[ "$output" =~ '{"rows": [{"pk":0},{"pk":1},{"pk":2}]}' ]] || false - run dolt sql -q "SELECT * FROM test WHERE v1 IS NULL" -r=json + run dolt sql -q "SELECT * FROM test WHERE v1 IS NULL order by pk" -r=json [ "$status" -eq "0" ] [[ "$output" =~ '{"rows": [{"pk":0},{"pk":1},{"pk":2}]}' ]] || false - run dolt sql -q "SELECT * FROM test2" -r=json + run dolt sql -q "SELECT * FROM test2 order by pk" -r=json [ "$status" -eq "0" ] [[ "$output" =~ '{"rows": 
[{"pk":0},{"pk":1},{"pk":2,"v1":1},{"pk":3,"v1":1},{"pk":4,"v2":1},{"pk":5,"v2":1}]}' ]] || false } diff --git a/integration-tests/bats/merge.bats b/integration-tests/bats/merge.bats index 55af6d9f8b..6d66c269ce 100644 --- a/integration-tests/bats/merge.bats +++ b/integration-tests/bats/merge.bats @@ -336,7 +336,7 @@ SQL dolt checkout master run dolt merge other [ "$status" -eq 0 ] - run dolt sql -q "SELECT * FROM quiz;" -r csv + run dolt sql -q "SELECT * FROM quiz ORDER BY pk;" -r csv [[ "${lines[0]}" =~ "pk" ]] || false [[ "${lines[1]}" =~ "10" ]] || false [[ "${lines[2]}" =~ "11" ]] || false diff --git a/integration-tests/bats/query-catalog.bats b/integration-tests/bats/query-catalog.bats index facad96269..2288ccc7a1 100644 --- a/integration-tests/bats/query-catalog.bats +++ b/integration-tests/bats/query-catalog.bats @@ -100,7 +100,7 @@ teardown() { @test "query-catalog: executed saved" { Q1="select pk, pk1, pk2 from one_pk,two_pk where one_pk.c1=two_pk.c1 order by 1" - Q2="select pk from one_pk" + Q2="select pk from one_pk order by pk" dolt sql -q "$Q1" -s name1 dolt sql -q "$Q2" -s name2 @@ -138,7 +138,7 @@ EOF EXPECTED=$(cat <<'EOF' id,display_order,name,query,description name1,1,name1,"select pk, pk1, pk2 from one_pk,two_pk where one_pk.c1=two_pk.c1 order by 1","" -name2,2,name2,select pk from one_pk,"" +name2,2,name2,select pk from one_pk order by pk,"" EOF ) @@ -154,7 +154,7 @@ EOF EXPECTED=$(cat <<'EOF' id,display_order,name,query,description name1,1,name1,"select pk, pk1, pk2 from one_pk,two_pk where one_pk.c1=two_pk.c1 and pk < 3 order by 1 desc","" -name2,2,name2,select pk from one_pk,"" +name2,2,name2,select pk from one_pk order by pk,"" EOF ) diff --git a/integration-tests/bats/sql-server.bats b/integration-tests/bats/sql-server.bats index 6c7739d5cd..19a32eecbe 100644 --- a/integration-tests/bats/sql-server.bats +++ b/integration-tests/bats/sql-server.bats @@ -683,7 +683,7 @@ SQL SELECT DOLT_MERGE('feature-branch'); " - server_query repo1 1 "SELECT 
* FROM test" "pk\n1\n2\n3\n1000" + server_query repo1 1 "SELECT * FROM test ORDER BY pk" "pk\n1\n2\n3\n1000" server_query repo1 1 "SELECT COUNT(*) FROM dolt_log" "COUNT(*)\n3" } diff --git a/integration-tests/bats/sql.bats b/integration-tests/bats/sql.bats index f712790e38..0682364e8f 100755 --- a/integration-tests/bats/sql.bats +++ b/integration-tests/bats/sql.bats @@ -324,23 +324,23 @@ SQL } @test "sql: basic inner join" { - run dolt sql -q "select pk,pk1,pk2 from one_pk join two_pk on one_pk.c1=two_pk.c1" + run dolt sql -q "select pk,pk1,pk2 from one_pk join two_pk on one_pk.c1=two_pk.c1 order by pk" [ "$status" -eq 0 ] [ "${#lines[@]}" -eq 8 ] first_join_output=$output - run dolt sql -q "select pk,pk1,pk2 from two_pk join one_pk on one_pk.c1=two_pk.c1" + run dolt sql -q "select pk,pk1,pk2 from two_pk join one_pk on one_pk.c1=two_pk.c1 order by pk" [ "$status" -eq 0 ] [ "${#lines[@]}" -eq 8 ] [ "$output" = "$first_join_output" ] - run dolt sql -q "select pk,pk1,pk2 from one_pk join two_pk on one_pk.c1=two_pk.c1 where pk=1" + run dolt sql -q "select pk,pk1,pk2 from one_pk join two_pk on one_pk.c1=two_pk.c1 where pk=1 order by pk" [ "$status" -eq 0 ] [ "${#lines[@]}" -eq 5 ] - run dolt sql -q "select pk,pk1,pk2,one_pk.c1 as foo,two_pk.c1 as bar from one_pk join two_pk on one_pk.c1=two_pk.c1" + run dolt sql -q "select pk,pk1,pk2,one_pk.c1 as foo,two_pk.c1 as bar from one_pk join two_pk on one_pk.c1=two_pk.c1 order by pk" [ "$status" -eq 0 ] [[ "$output" =~ foo ]] || false [[ "$output" =~ bar ]] || false [ "${#lines[@]}" -eq 8 ] - run dolt sql -q "select pk,pk1,pk2,one_pk.c1 as foo,two_pk.c1 as bar from one_pk join two_pk on one_pk.c1=two_pk.c1 where one_pk.c1=10" + run dolt sql -q "select pk,pk1,pk2,one_pk.c1 as foo,two_pk.c1 as bar from one_pk join two_pk on one_pk.c1=two_pk.c1 where one_pk.c1=10 order by pk" [ "$status" -eq 0 ] [ "${#lines[@]}" -eq 5 ] [[ "$output" =~ "10" ]] || false From b0a75ccbc07cb7f73164c0d41ed3a115133c95e4 Mon Sep 17 00:00:00 2001 From: 
Maximilian Hoffman Date: Thu, 23 Sep 2021 11:15:36 -0700 Subject: [PATCH 4/5] replicate HEAD on commit (#2151) * save progress * progress before stash pop * hacky prototype * bad merge * simple bats * delete unnecessary code * [ga-format-pr] Run go/utils/repofmt/format_repo.sh and go/Godeps/update.sh * format * refactor commitHook command structure * progress commit, local testing works * Fix head update issue * re: aaron, fix ctx cleanup * commit hooks recieve error handler from user facing packages * Add storage level replication hook test * Simplify replication parameters * Add bats test for server commit replication * rename backup key, was missing the stop progress channel call * small fixes * [ga-format-pr] Run go/utils/repofmt/format_repo.sh and go/Godeps/update.sh * Attach server Logger.Out to commit hooks * formatting * zach's comments * missed sql-server test * accidentally commented bats Co-authored-by: max-hoffman --- go/cmd/dolt/commands/push.go | 8 +- go/cmd/dolt/commands/sqlserver/server.go | 2 + go/cmd/dolt/dolt.go | 5 +- go/libraries/doltcore/doltdb/commit_hooks.go | 157 ++++++++++++++++++ .../doltcore/doltdb/commit_hooks_test.go | 149 +++++++++++++++++ go/libraries/doltcore/doltdb/doltdb.go | 13 ++ go/libraries/doltcore/env/actions/remotes.go | 3 +- go/libraries/doltcore/env/environment.go | 39 +++++ go/libraries/doltcore/env/remotes.go | 11 ++ .../doltcore/remotestorage/chunk_store.go | 3 +- .../doltcore/sqle/dfunctions/dolt_pull.go | 8 +- go/store/datas/database.go | 8 + go/store/datas/database_common.go | 43 ++++- go/store/datas/puller.go | 9 +- go/store/nbs/store.go | 5 + integration-tests/bats/replication.bats | 54 ++++++ integration-tests/bats/sql-server.bats | 31 +++- 17 files changed, 520 insertions(+), 28 deletions(-) create mode 100644 go/libraries/doltcore/doltdb/commit_hooks.go create mode 100644 go/libraries/doltcore/doltdb/commit_hooks_test.go create mode 100644 integration-tests/bats/replication.bats diff --git 
a/go/cmd/dolt/commands/push.go b/go/cmd/dolt/commands/push.go index af2f7eea9f..f7fa0f8b74 100644 --- a/go/cmd/dolt/commands/push.go +++ b/go/cmd/dolt/commands/push.go @@ -185,10 +185,8 @@ func pullerProgFunc(ctx context.Context, pullerEventCh chan datas.PullerEvent) { uploadRate := "" for evt := range pullerEventCh { - select { - case <-ctx.Done(): + if ctx.Err() != nil { return - default: } switch evt.EventType { case datas.NewLevelTWEvent: @@ -247,10 +245,8 @@ func progFunc(ctx context.Context, progChan chan datas.PullProgress) { lenPrinted := 0 done := false for !done { - select { - case <-ctx.Done(): + if ctx.Err() != nil { return - default: } select { case <-ctx.Done(): diff --git a/go/cmd/dolt/commands/sqlserver/server.go b/go/cmd/dolt/commands/sqlserver/server.go index 028f3907ac..5d30ec5eb4 100644 --- a/go/cmd/dolt/commands/sqlserver/server.go +++ b/go/cmd/dolt/commands/sqlserver/server.go @@ -238,6 +238,8 @@ func newSessionBuilder(sqlEngine *sqle.Engine, username string, email string, pr cli.PrintErr(err) return nil, nil, nil, err } + + db.GetDoltDB().SetCommitHookLogger(ctx, doltSess.GetLogger().Logger.Out) } return doltSess, ir, vr, nil diff --git a/go/cmd/dolt/dolt.go b/go/cmd/dolt/dolt.go index c61caf258c..13eb855b5c 100644 --- a/go/cmd/dolt/dolt.go +++ b/go/cmd/dolt/dolt.go @@ -253,7 +253,6 @@ func runMain() int { } root, err := env.GetCurrentUserHomeDir() - if err != nil { cli.PrintErrln(color.RedString("Failed to load the HOME directory: %v", err)) return 1 @@ -300,6 +299,10 @@ func runMain() int { defer tempfiles.MovableTempFileProvider.Clean() + if dEnv.DoltDB != nil { + dEnv.DoltDB.SetCommitHookLogger(ctx, cli.OutStream) + } + start := time.Now() res := doltCommand.Exec(ctx, "dolt", args, dEnv) diff --git a/go/libraries/doltcore/doltdb/commit_hooks.go b/go/libraries/doltcore/doltdb/commit_hooks.go new file mode 100644 index 0000000000..94e0e4597d --- /dev/null +++ b/go/libraries/doltcore/doltdb/commit_hooks.go @@ -0,0 +1,157 @@ +// Copyright 
2021 Dolthub, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package doltdb + +import ( + "context" + "io" + "sync" + + "github.com/dolthub/dolt/go/libraries/doltcore/ref" + + "github.com/dolthub/dolt/go/store/datas" +) + +const BackupToRemoteKey = "DOLT_BACKUP_TO_REMOTE" + +type ReplicateHook struct { + destDB datas.Database + tmpDir string + outf io.Writer +} + +// NewReplicateHook creates a ReplicateHook, parameterizaed by the backup database +// and a local tempfile for pushing +func NewReplicateHook(destDB *DoltDB, tmpDir string) *ReplicateHook { + return &ReplicateHook{destDB: destDB.db, tmpDir: tmpDir} +} + +// Execute implements datas.CommitHook, replicates head updates to the destDb field +func (rh *ReplicateHook) Execute(ctx context.Context, ds datas.Dataset, db datas.Database) error { + return replicate(ctx, rh.destDB, db, rh.tmpDir, ds) +} + +// HandleError implements datas.CommitHook +func (rh *ReplicateHook) HandleError(ctx context.Context, err error) error { + if rh.outf != nil { + rh.outf.Write([]byte(err.Error())) + } + return nil +} + +// SetLogger implements datas.CommitHook +func (rh *ReplicateHook) SetLogger(ctx context.Context, wr io.Writer) error { + rh.outf = wr + return nil +} + +// replicate pushes a dataset from srcDB to destDB and force sets the destDB ref to the new dataset value +func replicate(ctx context.Context, destDB, srcDB datas.Database, tempTableDir string, ds datas.Dataset) error { + stRef, ok, err := 
ds.MaybeHeadRef() + if err != nil { + return err + } + if !ok { + // No head ref, return + return nil + } + + rf, err := ref.Parse(ds.ID()) + if err != nil { + return err + } + + newCtx, cancelFunc := context.WithCancel(ctx) + wg, progChan, pullerEventCh := runProgFuncs(newCtx) + defer stopProgFuncs(cancelFunc, wg, progChan, pullerEventCh) + puller, err := datas.NewPuller(ctx, tempTableDir, defaultChunksPerTF, srcDB, destDB, stRef.TargetHash(), pullerEventCh) + if err == datas.ErrDBUpToDate { + return nil + } else if err != nil { + return err + } + + err = puller.Pull(ctx) + if err != nil { + return err + } + + if err != nil { + return err + } + + ds, err = destDB.GetDataset(ctx, rf.String()) + if err != nil { + return err + } + + _, err = destDB.SetHead(ctx, ds, stRef) + return err +} + +func pullerProgFunc(ctx context.Context, pullerEventCh <-chan datas.PullerEvent) { + for { + if ctx.Err() != nil { + return + } + select { + case <-ctx.Done(): + return + case <-pullerEventCh: + default: + } + } +} + +func progFunc(ctx context.Context, progChan <-chan datas.PullProgress) { + for { + if ctx.Err() != nil { + return + } + select { + case <-ctx.Done(): + return + case <-progChan: + default: + } + } +} + +func runProgFuncs(ctx context.Context) (*sync.WaitGroup, chan datas.PullProgress, chan datas.PullerEvent) { + pullerEventCh := make(chan datas.PullerEvent) + progChan := make(chan datas.PullProgress) + wg := &sync.WaitGroup{} + + wg.Add(1) + go func() { + defer wg.Done() + progFunc(ctx, progChan) + }() + + wg.Add(1) + go func() { + defer wg.Done() + pullerProgFunc(ctx, pullerEventCh) + }() + + return wg, progChan, pullerEventCh +} + +func stopProgFuncs(cancel context.CancelFunc, wg *sync.WaitGroup, progChan chan datas.PullProgress, pullerEventCh chan datas.PullerEvent) { + cancel() + close(progChan) + close(pullerEventCh) + wg.Wait() +} diff --git a/go/libraries/doltcore/doltdb/commit_hooks_test.go b/go/libraries/doltcore/doltdb/commit_hooks_test.go new file mode 
100644 index 0000000000..f9b11ff789 --- /dev/null +++ b/go/libraries/doltcore/doltdb/commit_hooks_test.go @@ -0,0 +1,149 @@ +// Copyright 2021 Dolthub, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package doltdb + +import ( + "bytes" + "context" + "errors" + "path/filepath" + "testing" + + "github.com/dolthub/dolt/go/libraries/doltcore/dbfactory" + "github.com/dolthub/dolt/go/libraries/doltcore/ref" + "github.com/dolthub/dolt/go/libraries/utils/filesys" + "github.com/dolthub/dolt/go/libraries/utils/test" + "github.com/dolthub/dolt/go/store/datas" + "github.com/dolthub/dolt/go/store/types" + + "github.com/stretchr/testify/assert" +) + +func TestReplicateHook(t *testing.T) { + ctx := context.Background() + + // destination repo + testDir, err := test.ChangeToTestDir("TestReplicationDest") + + if err != nil { + panic("Couldn't change the working directory to the test directory.") + } + + committerName := "Bill Billerson" + committerEmail := "bigbillieb@fake.horse" + + tmpDir := filepath.Join(testDir, dbfactory.DoltDataDir) + err = filesys.LocalFS.MkDirs(tmpDir) + + if err != nil { + t.Fatal("Failed to create noms directory") + } + + destDB, _ := LoadDoltDB(context.Background(), types.Format_Default, LocalDirDoltDB, filesys.LocalFS) + + // source repo + testDir, err = test.ChangeToTestDir("TestReplicationSource") + + if err != nil { + panic("Couldn't change the working directory to the test directory.") + } + + tmpDir = filepath.Join(testDir, 
dbfactory.DoltDataDir) + err = filesys.LocalFS.MkDirs(tmpDir) + + if err != nil { + t.Fatal("Failed to create noms directory") + } + + ddb, _ := LoadDoltDB(context.Background(), types.Format_Default, LocalDirDoltDB, filesys.LocalFS) + err = ddb.WriteEmptyRepo(context.Background(), "master", committerName, committerEmail) + + if err != nil { + t.Fatal("Unexpected error creating empty repo", err) + } + + // prepare a commit in the source repo + cs, _ := NewCommitSpec("master") + commit, err := ddb.Resolve(context.Background(), cs, nil) + + if err != nil { + t.Fatal("Couldn't find commit") + } + + meta, err := commit.GetCommitMeta() + assert.NoError(t, err) + + if meta.Name != committerName || meta.Email != committerEmail { + t.Error("Unexpected metadata") + } + + root, err := commit.GetRootValue() + + assert.NoError(t, err) + + names, err := root.GetTableNames(context.Background()) + assert.NoError(t, err) + if len(names) != 0 { + t.Fatal("There should be no tables in empty db") + } + + tSchema := createTestSchema(t) + rowData, _ := createTestRowData(t, ddb.db, tSchema) + tbl, err := CreateTestTable(ddb.db, tSchema, rowData) + + if err != nil { + t.Fatal("Failed to create test table with data") + } + + root, err = root.PutTable(context.Background(), "test", tbl) + assert.NoError(t, err) + + valHash, err := ddb.WriteRootValue(context.Background(), root) + assert.NoError(t, err) + + meta, err = NewCommitMeta(committerName, committerEmail, "Sample data") + if err != nil { + t.Error("Failed to commit") + } + + // setup hook + hook := NewReplicateHook(destDB, tmpDir) + ddb.SetCommitHooks(ctx, []datas.CommitHook{hook}) + + t.Run("replicate to backup remote", func(t *testing.T) { + srcCommit, err := ddb.Commit(context.Background(), valHash, ref.NewBranchRef("master"), meta) + ds, err := ddb.db.GetDataset(ctx, "refs/heads/master") + err = hook.Execute(ctx, ds, ddb.db) + assert.NoError(t, err) + + cs, _ = NewCommitSpec("master") + destCommit, err := 
destDB.Resolve(context.Background(), cs, nil) + + srcHash, _ := srcCommit.HashOf() + destHash, _ := destCommit.HashOf() + assert.Equal(t, srcHash, destHash) + }) + + t.Run("replicate handle error logs to writer", func(t *testing.T) { + var buffer = &bytes.Buffer{} + err = hook.SetLogger(ctx, buffer) + assert.NoError(t, err) + + msg := "prince charles is a vampire" + hook.HandleError(ctx, errors.New(msg)) + + assert.Equal(t, buffer.String(), msg) + }) +} diff --git a/go/libraries/doltcore/doltdb/doltdb.go b/go/libraries/doltcore/doltdb/doltdb.go index 60f7bedf82..45c6c7ccf8 100644 --- a/go/libraries/doltcore/doltdb/doltdb.go +++ b/go/libraries/doltcore/doltdb/doltdb.go @@ -18,6 +18,7 @@ import ( "context" "errors" "fmt" + "io" "path/filepath" "strings" "time" @@ -1279,3 +1280,15 @@ func (ddb *DoltDB) PullChunks(ctx context.Context, tempDir string, srcDB *DoltDB func (ddb *DoltDB) Clone(ctx context.Context, destDB *DoltDB, eventCh chan<- datas.TableFileEvent) error { return datas.Clone(ctx, ddb.db, destDB.db, eventCh) } + +func (ddb *DoltDB) SetCommitHooks(ctx context.Context, postHooks []datas.CommitHook) *DoltDB { + ddb.db = ddb.db.SetCommitHooks(ctx, postHooks) + return ddb +} + +func (ddb *DoltDB) SetCommitHookLogger(ctx context.Context, wr io.Writer) *DoltDB { + if ddb.db != nil { + ddb.db = ddb.db.SetCommitHookLogger(ctx, wr) + } + return ddb +} diff --git a/go/libraries/doltcore/env/actions/remotes.go b/go/libraries/doltcore/env/actions/remotes.go index 2deae6a7a5..c9fac9f392 100644 --- a/go/libraries/doltcore/env/actions/remotes.go +++ b/go/libraries/doltcore/env/actions/remotes.go @@ -20,12 +20,11 @@ import ( "fmt" "sync" - "github.com/dolthub/dolt/go/libraries/doltcore/remotestorage" - eventsapi "github.com/dolthub/dolt/go/gen/proto/dolt/services/eventsapi/v1alpha1" "github.com/dolthub/dolt/go/libraries/doltcore/doltdb" "github.com/dolthub/dolt/go/libraries/doltcore/env" "github.com/dolthub/dolt/go/libraries/doltcore/ref" + 
"github.com/dolthub/dolt/go/libraries/doltcore/remotestorage" "github.com/dolthub/dolt/go/libraries/events" "github.com/dolthub/dolt/go/libraries/utils/earl" "github.com/dolthub/dolt/go/store/datas" diff --git a/go/libraries/doltcore/env/environment.go b/go/libraries/doltcore/env/environment.go index d9ac340e81..6693c41679 100644 --- a/go/libraries/doltcore/env/environment.go +++ b/go/libraries/doltcore/env/environment.go @@ -19,6 +19,7 @@ import ( "crypto/tls" "errors" "fmt" + "os" "path/filepath" "runtime" "strings" @@ -38,6 +39,7 @@ import ( "github.com/dolthub/dolt/go/libraries/doltcore/table/editor" "github.com/dolthub/dolt/go/libraries/utils/config" "github.com/dolthub/dolt/go/libraries/utils/filesys" + "github.com/dolthub/dolt/go/store/datas" "github.com/dolthub/dolt/go/store/hash" "github.com/dolthub/dolt/go/store/types" ) @@ -56,6 +58,34 @@ const ( tempTablesDir = "temptf" ) +func getCommitHooks(ctx context.Context, dEnv *DoltEnv) ([]datas.CommitHook, error) { + postCommitHooks := make([]datas.CommitHook, 0) + + backupName := os.Getenv(doltdb.BackupToRemoteKey) + if backupName != "" { + remotes, err := dEnv.GetRemotes() + if err != nil { + return nil, err + } + rem, ok := remotes[backupName] + if !ok { + return nil, ErrRemoteNotFound + } + ddb, err := rem.GetRemoteDB(ctx, types.Format_Default) + + if err != nil { + return nil, err + } + replicateHook := doltdb.NewReplicateHook(ddb, dEnv.TempTableFilesDir()) + if err != nil { + return nil, err + } + postCommitHooks = append(postCommitHooks, replicateHook) + } + + return postCommitHooks, nil +} + var zeroHashStr = (hash.Hash{}).String() var ErrPreexistingDoltDir = errors.New(".dolt dir already exists") @@ -158,6 +188,15 @@ func Load(ctx context.Context, hdp HomeDirProvider, fs filesys.Filesys, urlStr, } } + if dbLoadErr == nil { + postCommitHooks, dbLoadErr := getCommitHooks(ctx, dEnv) + if dbLoadErr != nil { + dEnv.DBLoadError = dbLoadErr + } else { + dEnv.DoltDB.SetCommitHooks(ctx, postCommitHooks) + } + } 
+ return dEnv } diff --git a/go/libraries/doltcore/env/remotes.go b/go/libraries/doltcore/env/remotes.go index 0af18dc8b6..42e7f367d4 100644 --- a/go/libraries/doltcore/env/remotes.go +++ b/go/libraries/doltcore/env/remotes.go @@ -57,6 +57,17 @@ type Remote struct { dialer dbfactory.GRPCDialProvider } +func GetRemote(ctx context.Context, remoteName, remoteUrl string, params map[string]string, dialer dbfactory.GRPCDialProvider) (Remote, *doltdb.DoltDB, error) { + r := NewRemote(remoteName, remoteUrl, params, dialer) + ddb, err := r.GetRemoteDB(ctx, types.Format_Default) + + if err != nil { + return NoRemote, nil, err + } + + return r, ddb, nil +} + func NewRemote(name, url string, params map[string]string, dialer dbfactory.GRPCDialProvider) Remote { return Remote{name, url, []string{"refs/heads/*:refs/remotes/" + name + "/*"}, params, dialer} } diff --git a/go/libraries/doltcore/remotestorage/chunk_store.go b/go/libraries/doltcore/remotestorage/chunk_store.go index 1abf4ee56d..a13643c263 100644 --- a/go/libraries/doltcore/remotestorage/chunk_store.go +++ b/go/libraries/doltcore/remotestorage/chunk_store.go @@ -40,7 +40,6 @@ import ( "github.com/dolthub/dolt/go/libraries/utils/tracing" "github.com/dolthub/dolt/go/store/atomicerr" "github.com/dolthub/dolt/go/store/chunks" - "github.com/dolthub/dolt/go/store/datas" "github.com/dolthub/dolt/go/store/hash" "github.com/dolthub/dolt/go/store/nbs" "github.com/dolthub/dolt/go/store/types" @@ -66,7 +65,7 @@ var ErrInvalidDoltSpecPath = errors.New("invalid dolt spec path") var globalHttpFetcher HTTPFetcher = &http.Client{} var _ nbs.TableFileStore = (*DoltChunkStore)(nil) -var _ datas.NBSCompressedChunkStore = (*DoltChunkStore)(nil) +var _ nbs.NBSCompressedChunkStore = (*DoltChunkStore)(nil) var _ chunks.ChunkStore = (*DoltChunkStore)(nil) var _ chunks.LoggingChunkStore = (*DoltChunkStore)(nil) diff --git a/go/libraries/doltcore/sqle/dfunctions/dolt_pull.go b/go/libraries/doltcore/sqle/dfunctions/dolt_pull.go index 
375cd7f7cf..b323ac14ef 100644 --- a/go/libraries/doltcore/sqle/dfunctions/dolt_pull.go +++ b/go/libraries/doltcore/sqle/dfunctions/dolt_pull.go @@ -155,10 +155,8 @@ func (d DoltPullFunc) Eval(ctx *sql.Context, row sql.Row) (interface{}, error) { func pullerProgFunc(ctx context.Context, pullerEventCh <-chan datas.PullerEvent) { for { - select { - case <-ctx.Done(): + if ctx.Err() != nil { return - default: } select { case <-ctx.Done(): @@ -171,10 +169,8 @@ func pullerProgFunc(ctx context.Context, pullerEventCh <-chan datas.PullerEvent) func progFunc(ctx context.Context, progChan <-chan datas.PullProgress) { for { - select { - case <-ctx.Done(): + if ctx.Err() != nil { return - default: } select { case <-ctx.Done(): diff --git a/go/store/datas/database.go b/go/store/datas/database.go index 8959125cf9..8f121d12c9 100644 --- a/go/store/datas/database.go +++ b/go/store/datas/database.go @@ -169,6 +169,14 @@ type Database interface { // level detail of the database that should infrequently be needed by // clients. 
chunkStore() chunks.ChunkStore + + // SetCommitHooks attaches a list of CommitHook that can be executed + // after CommitWithWorkingSet + SetCommitHooks(context.Context, []CommitHook) *database + + // SetCommitHookLogger passes an error output writer from the user-facing session + // to a commit hook executed at the datas layer + SetCommitHookLogger(context.Context, io.Writer) *database } func NewDatabase(cs chunks.ChunkStore) Database { diff --git a/go/store/datas/database_common.go b/go/store/datas/database_common.go index a628b816df..e97c1a4e7d 100644 --- a/go/store/datas/database_common.go +++ b/go/store/datas/database_common.go @@ -25,6 +25,7 @@ import ( "context" "errors" "fmt" + "io" "github.com/dolthub/dolt/go/store/chunks" "github.com/dolthub/dolt/go/store/d" @@ -36,7 +37,8 @@ import ( type database struct { *types.ValueStore - rt rootTracker + rt rootTracker + postCommitHooks []CommitHook } var ( @@ -44,6 +46,16 @@ var ( ErrMergeNeeded = errors.New("dataset head is not ancestor of commit") ) + +// CommitHook is an abstraction for executing arbitrary commands after atomic database commits +type CommitHook interface { + // Execute is an arbitrary read-only function whose arguments are the new Dataset committed into a specific Database + Execute(ctx context.Context, ds Dataset, db Database) error + // HandleError is a bridge function to handle Execute errors + HandleError(ctx context.Context, err error) error + // SetLogger lets clients specify an output stream for HandleError + SetLogger(ctx context.Context, wr io.Writer) error +} + + // TODO: fix panics // rootTracker is a narrowing of the ChunkStore interface, to keep Database disciplined about working directly with Chunks type rootTracker interface { @@ -771,6 +783,8 @@ func (db *database) CommitWithWorkingSet( return Dataset{}, Dataset{}, err } + + db.callCommitHooks(ctx, commitDS) + return commitDS, workingSetDS, nil } @@ -963,10 +977,35 @@ func buildNewCommit(ctx context.Context, ds Dataset, v types.Value, opts CommitO
func (db *database) doHeadUpdate(ctx context.Context, ds Dataset, updateFunc func(ds Dataset) error) (Dataset, error) { err := updateFunc(ds) - if err != nil { return Dataset{}, err } return db.GetDataset(ctx, ds.ID()) } + +func (db *database) SetCommitHooks(ctx context.Context, postHooks []CommitHook) *database { + db.postCommitHooks = postHooks + return db +} + +func (db *database) SetCommitHookLogger(ctx context.Context, wr io.Writer) *database { + for _, h := range db.postCommitHooks { + h.SetLogger(ctx, wr) + } + return db +} + +func (db *database) PostCommitHooks() []CommitHook { + return db.postCommitHooks +} + +func (db *database) callCommitHooks(ctx context.Context, ds Dataset) { + var err error + for _, hook := range db.postCommitHooks { + err = hook.Execute(ctx, ds, db) + if err != nil { + hook.HandleError(ctx, err) + } + } +} diff --git a/go/store/datas/puller.go b/go/store/datas/puller.go index 523af57f24..36913e0224 100644 --- a/go/store/datas/puller.go +++ b/go/store/datas/puller.go @@ -59,17 +59,12 @@ type CmpChnkAndRefs struct { refs map[hash.Hash]int } -type NBSCompressedChunkStore interface { - chunks.ChunkStore - GetManyCompressed(context.Context, hash.HashSet, func(nbs.CompressedChunk)) error -} - // Puller is used to sync data between to Databases type Puller struct { fmt *types.NomsBinFormat srcDB Database - srcChunkStore NBSCompressedChunkStore + srcChunkStore nbs.NBSCompressedChunkStore sinkDBCS chunks.ChunkStore rootChunkHash hash.Hash downloaded hash.HashSet @@ -158,7 +153,7 @@ func NewPuller(ctx context.Context, tempDir string, chunksPerTF int, srcDB, sink return nil, fmt.Errorf("cannot pull from src to sink; src version is %v and sink version is %v", srcDB.chunkStore().Version(), sinkDB.chunkStore().Version()) } - srcChunkStore, ok := srcDB.chunkStore().(NBSCompressedChunkStore) + srcChunkStore, ok := srcDB.chunkStore().(nbs.NBSCompressedChunkStore) if !ok { return nil, ErrIncompatibleSourceChunkStore } diff --git 
a/go/store/nbs/store.go b/go/store/nbs/store.go index 454c7f19d2..3ce18484ef 100644 --- a/go/store/nbs/store.go +++ b/go/store/nbs/store.go @@ -81,6 +81,11 @@ func makeGlobalCaches() { makeManifestManager = func(m manifest) manifestManager { return manifestManager{m, manifestCache, manifestLocks} } } +type NBSCompressedChunkStore interface { + chunks.ChunkStore + GetManyCompressed(context.Context, hash.HashSet, func(CompressedChunk)) error +} + type NomsBlockStore struct { mm manifestManager p tablePersister diff --git a/integration-tests/bats/replication.bats b/integration-tests/bats/replication.bats new file mode 100644 index 0000000000..44c3edc337 --- /dev/null +++ b/integration-tests/bats/replication.bats @@ -0,0 +1,54 @@ +#!/usr/bin/env bats +load $BATS_TEST_DIRNAME/helper/common.bash + +setup() { + setup_common + TMPDIRS=$(pwd)/tmpdirs + mkdir -p $TMPDIRS/{bac1,repo1} + + # repo1 -> bac1 -> repo2 + cd $TMPDIRS/repo1 + dolt init + dolt branch feature + dolt remote add backup1 file://../bac1 + cd $TMPDIRS +} + +teardown() { + teardown_common + rm -rf $TMPDIRS + cd $BATS_TMPDIR +} + +@test "replication: default no replication" { + cd repo1 + dolt sql -q "create table t1 (a int primary key)" + dolt commit -am "cm" + + [ ! -d "../bac1/.dolt" ] || false +} + +@test "replication: push on commit" { + export DOLT_BACKUP_TO_REMOTE=backup1 + cd repo1 + dolt remote -v + dolt sql -q "create table t1 (a int primary key)" + dolt commit -am "cm" + + cd .. + dolt clone file://./bac1 repo2 + export DOLT_BACKUP_TO_REMOTE= + cd repo2 + run dolt ls + [ "$status" -eq 0 ] + [ "${#lines[@]}" -eq 2 ] + [[ "$output" =~ "t1" ]] || false +} + +@test "replication: no tags" { + export DOLT_BACKUP_TO_REMOTE=backup1 + cd repo1 + dolt tag + + [ ! 
-d "../bac1/.dolt" ] || false +} diff --git a/integration-tests/bats/sql-server.bats b/integration-tests/bats/sql-server.bats index 19a32eecbe..a2a1912cdd 100644 --- a/integration-tests/bats/sql-server.bats +++ b/integration-tests/bats/sql-server.bats @@ -1038,8 +1038,6 @@ while True: dolt remote add origin file://../rem1 start_sql_server repo1 - dolt status - dolt branch dolt push origin master run server_query repo1 1 "select dolt_push() as p" "p\n0" [ "$status" -eq 1 ] @@ -1050,3 +1048,32 @@ while True: skip "In-memory branch doesn't track upstream correctly" server_query repo1 1 "select dolt_push() as p" "p\n1" } + +@test "sql-server: replicate to backup after sql-session commit" { + skiponwindows "Has dependencies that are missing on the Jenkins Windows installation." + + mkdir bac1 + cd repo1 + dolt remote add backup1 file://../bac1 + export DOLT_BACKUP_TO_REMOTE=backup1 + start_sql_server repo1 + + multi_query repo1 1 " + CREATE TABLE test ( + pk int primary key + ); + INSERT INTO test VALUES (0),(1),(2); + SELECT DOLT_ADD('.'); + SELECT DOLT_COMMIT('-m', 'Step 1');" + + cd .. 
+ dolt clone file://./bac1 repo3 + cd repo3 + export DOLT_BACKUP_TO_REMOTE= + run dolt sql -q "select * from test" -r csv + [ "$status" -eq 0 ] + [[ "${lines[0]}" =~ "pk" ]] + [[ "${lines[1]}" =~ "0" ]] + [[ "${lines[2]}" =~ "1" ]] + [[ "${lines[3]}" =~ "2" ]] +} From 4a2070a4664ac71afd257ef007c3478798e6d1ff Mon Sep 17 00:00:00 2001 From: reltuk Date: Thu, 23 Sep 2021 21:36:09 +0000 Subject: [PATCH 5/5] [ga-bump-dep] Bump dependency in Dolt by reltuk --- go/go.mod | 2 +- go/go.sum | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/go/go.mod b/go/go.mod index 4673d831f6..33034dd4a2 100644 --- a/go/go.mod +++ b/go/go.mod @@ -23,7 +23,7 @@ require ( github.com/denisbrodbeck/machineid v1.0.1 github.com/dolthub/dolt/go/gen/proto/dolt/services/eventsapi v0.0.0-20201005193433-3ee972b1d078 github.com/dolthub/fslock v0.0.3 - github.com/dolthub/go-mysql-server v0.10.1-0.20210916214046-f178c6b9d470 + github.com/dolthub/go-mysql-server v0.10.1-0.20210923213428-67d872abc5ef github.com/dolthub/ishell v0.0.0-20210205014355-16a4ce758446 github.com/dolthub/mmap-go v1.0.4-0.20201107010347-f9f2a9588a66 github.com/dolthub/sqllogictest/go v0.0.0-20201107003712-816f3ae12d81 diff --git a/go/go.sum b/go/go.sum index 6736b6697c..6199be282a 100644 --- a/go/go.sum +++ b/go/go.sum @@ -144,8 +144,8 @@ github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZm github.com/dgryski/go-sip13 v0.0.0-20181026042036-e10d5fee7954/go.mod h1:vAd38F8PWV+bWy6jNmig1y/TA+kYO4g3RSRF0IAv0no= github.com/dolthub/fslock v0.0.3 h1:iLMpUIvJKMKm92+N1fmHVdxJP5NdyDK5bK7z7Ba2s2U= github.com/dolthub/fslock v0.0.3/go.mod h1:QWql+P17oAAMLnL4HGB5tiovtDuAjdDTPbuqx7bYfa0= -github.com/dolthub/go-mysql-server v0.10.1-0.20210916214046-f178c6b9d470 h1:NCX+0jWQ4UjaHCp+kxponOzzNR5OUUvlbnzlsusbNok= -github.com/dolthub/go-mysql-server v0.10.1-0.20210916214046-f178c6b9d470/go.mod h1:4tCPtzL2cJKkVZcmhD+MEE5SGAj1lBNvZ0mRPNLwKwY= +github.com/dolthub/go-mysql-server 
v0.10.1-0.20210923213428-67d872abc5ef h1:N/sF06RM8pkFtuPWxbml8mUD9BxjO9cBYgwZ/ohpOCQ= +github.com/dolthub/go-mysql-server v0.10.1-0.20210923213428-67d872abc5ef/go.mod h1:4tCPtzL2cJKkVZcmhD+MEE5SGAj1lBNvZ0mRPNLwKwY= github.com/dolthub/ishell v0.0.0-20210205014355-16a4ce758446 h1:0ol5pj+QlKUKAtqs1LiPM3ZJKs+rHPgLSsMXmhTrCAM= github.com/dolthub/ishell v0.0.0-20210205014355-16a4ce758446/go.mod h1:dhGBqcCEfK5kuFmeO5+WOx3hqc1k3M29c1oS/R7N4ms= github.com/dolthub/mmap-go v1.0.4-0.20201107010347-f9f2a9588a66 h1:WRPDbpJWEnPxPmiuOTndT+lUWUeGjx6eoNOK9O4tQQQ=