Merge branch 'main' into fulghum/binlog-replication

This commit is contained in:
Jason Fulghum
2023-03-20 14:35:59 -07:00
46 changed files with 1774 additions and 595 deletions

View File

@@ -22,7 +22,6 @@ import (
"strconv"
"strings"
textdiff "github.com/andreyvit/diff"
"github.com/dolthub/go-mysql-server/sql"
"github.com/dolthub/go-mysql-server/sql/types"
@@ -43,7 +42,6 @@ import (
type diffOutput int
type diffPart int
type diffMode int
const (
SchemaOnlyDiff diffPart = 1 // 0b0001
@@ -63,7 +61,6 @@ const (
SummaryFlag = "summary"
whereParam = "where"
limitParam = "limit"
SQLFlag = "sql"
SkinnyFlag = "skinny"
MergeBase = "merge-base"
DiffMode = "diff-mode"
@@ -526,21 +523,27 @@ func diffUserTables(ctx context.Context, dEnv *env.DoltEnv, dArgs *diffArgs) err
return errhand.VerboseErrorFromError(err)
}
doltSchemasChanged := false
for _, td := range tableDeltas {
if !dArgs.tableSet.Contains(td.FromName) && !dArgs.tableSet.Contains(td.ToName) {
if !shouldPrintTableDelta(dArgs.tableSet, td) {
continue
}
if strings.ToLower(td.ToName) != doltdb.SchemasTableName {
if isDoltSchemasTable(td) {
// save dolt_schemas table diff for last in diff output
doltSchemasChanged = true
} else {
verr := diffUserTable(sqlCtx, td, sqlEng, dArgs, dw)
if verr != nil {
return verr
}
} else {
// dolt_schemas table is treated as a special case. diff the rows of the table, and print fragments as DDL
verr := diffDoltSchemasTable(sqlCtx, td, sqlEng, dArgs, dw)
if verr != nil {
return verr
}
}
}
if doltSchemasChanged {
verr := diffDoltSchemasTable(sqlCtx, sqlEng, dArgs, dw)
if verr != nil {
return verr
}
}
@@ -552,6 +555,15 @@ func diffUserTables(ctx context.Context, dEnv *env.DoltEnv, dArgs *diffArgs) err
return nil
}
func shouldPrintTableDelta(tablesToPrint *set.StrSet, td diff.TableDelta) bool {
// TODO: this should be case insensitive
return tablesToPrint.Contains(td.FromName) || tablesToPrint.Contains(td.ToName)
}
func isDoltSchemasTable(td diff.TableDelta) bool {
return td.FromName == doltdb.SchemasTableName || td.ToName == doltdb.SchemasTableName
}
func diffUserTable(
ctx *sql.Context,
td diff.TableDelta,
@@ -581,7 +593,7 @@ func diffUserTable(
}
if dArgs.diffParts&SchemaOnlyDiff != 0 {
err := dw.WriteSchemaDiff(ctx, dArgs.toRoot, td)
err := dw.WriteTableSchemaDiff(ctx, dArgs.toRoot, td)
if err != nil {
return errhand.VerboseErrorFromError(err)
}
@@ -603,17 +615,14 @@ func diffUserTable(
func diffDoltSchemasTable(
sqlCtx *sql.Context,
td diff.TableDelta,
sqlEng *engine.SqlEngine,
dArgs *diffArgs,
dw diffWriter,
) errhand.VerboseError {
err := dw.BeginTable(sqlCtx, td)
if err != nil {
return errhand.VerboseErrorFromError(err)
}
query := fmt.Sprintf("select from_fragment,to_fragment from dolt_diff('%s','%s','%s')", dArgs.fromRef, dArgs.toRef, doltdb.SchemasTableName)
query := fmt.Sprintf("select from_name,to_name,from_type,to_type,from_fragment,to_fragment "+
"from dolt_diff('%s','%s','%s') "+
"order by coalesce(from_type, to_type), coalesce(from_name, to_name)",
dArgs.fromRef, dArgs.toRef, doltdb.SchemasTableName)
_, rowIter, err := sqlEng.Query(sqlCtx, query)
if err != nil {
@@ -629,30 +638,52 @@ func diffDoltSchemasTable(
return errhand.VerboseErrorFromError(err)
}
from := ""
var fragmentName string
if row[0] != nil {
from = fmt.Sprintf("%v;", row[0])
fragmentName = row[0].(string)
} else {
fragmentName = row[1].(string)
}
to := ""
if row[1] != nil {
to = fmt.Sprintf("%v;", row[1])
var fragmentType string
if row[2] != nil {
fragmentType = row[2].(string)
} else {
fragmentType = row[3].(string)
}
if from != to {
cli.Println(textdiff.LineDiff(from, to))
var oldFragment string
var newFragment string
if row[4] != nil {
oldFragment = row[4].(string)
// Typically schema fragments have the semicolon stripped, so put it back on
if len(oldFragment) > 0 && oldFragment[len(oldFragment)-1] != ';' {
oldFragment += ";"
}
}
if row[5] != nil {
newFragment = row[5].(string)
// Typically schema fragments have the semicolon stripped, so put it back on
if len(newFragment) > 0 && newFragment[len(newFragment)-1] != ';' {
newFragment += ";"
}
}
}
return nil
}
func writeSqlSchemaDiff(ctx context.Context, td diff.TableDelta, toSchemas map[string]schema.Schema) errhand.VerboseError {
ddlStatements, err := diff.SqlSchemaDiff(ctx, td, toSchemas)
if err != nil {
return errhand.VerboseErrorFromError(err)
}
for _, stmt := range ddlStatements {
cli.Println(stmt)
switch fragmentType {
case "view":
err := dw.WriteViewDiff(sqlCtx, fragmentName, oldFragment, newFragment)
if err != nil {
return nil
}
case "trigger":
err := dw.WriteTriggerDiff(sqlCtx, fragmentName, oldFragment, newFragment)
if err != nil {
return nil
}
default:
cli.PrintErrf("Unrecognized schema element type: %s", fragmentType)
continue
}
}
return nil

View File

@@ -16,6 +16,7 @@ package commands
import (
"context"
ejson "encoding/json"
"fmt"
"io"
@@ -42,8 +43,12 @@ import (
type diffWriter interface {
// BeginTable is called when a new table is about to be written, before any schema or row diffs are written
BeginTable(ctx context.Context, td diff.TableDelta) error
// WriteSchemaDiff is called to write a schema diff for the table given (if requested by args)
WriteSchemaDiff(ctx context.Context, toRoot *doltdb.RootValue, td diff.TableDelta) error
// WriteTableSchemaDiff is called to write a schema diff for the table given (if requested by args)
WriteTableSchemaDiff(ctx context.Context, toRoot *doltdb.RootValue, td diff.TableDelta) error
// WriteTriggerDiff is called to write a trigger diff
WriteTriggerDiff(ctx context.Context, triggerName, oldDefn, newDefn string) error
// WriteViewDiff is called to write a view diff
WriteViewDiff(ctx context.Context, viewName, oldDefn, newDefn string) error
// RowWriter returns a row writer for the table delta provided, which will have Close() called on it when rows are
// done being written.
RowWriter(ctx context.Context, td diff.TableDelta, unionSch sql.Schema) (diff.SqlRowDiffWriter, error)
@@ -216,7 +221,7 @@ func (t tabularDiffWriter) BeginTable(ctx context.Context, td diff.TableDelta) e
return nil
}
func (t tabularDiffWriter) WriteSchemaDiff(ctx context.Context, toRoot *doltdb.RootValue, td diff.TableDelta) error {
func (t tabularDiffWriter) WriteTableSchemaDiff(ctx context.Context, toRoot *doltdb.RootValue, td diff.TableDelta) error {
fromSch, toSch, err := td.GetSchemas(ctx)
if err != nil {
return errhand.BuildDError("cannot retrieve schema for table %s", td.ToName).AddCause(err).Build()
@@ -266,6 +271,17 @@ func (t tabularDiffWriter) WriteSchemaDiff(ctx context.Context, toRoot *doltdb.R
return nil
}
func (t tabularDiffWriter) WriteTriggerDiff(ctx context.Context, triggerName, oldDefn, newDefn string) error {
// identical implementation
return t.WriteViewDiff(ctx, triggerName, oldDefn, newDefn)
}
func (t tabularDiffWriter) WriteViewDiff(ctx context.Context, viewName, oldDefn, newDefn string) error {
diffString := textdiff.LineDiff(oldDefn, newDefn)
cli.Println(diffString)
return nil
}
func (t tabularDiffWriter) RowWriter(ctx context.Context, td diff.TableDelta, unionSch sql.Schema) (diff.SqlRowDiffWriter, error) {
return tabular.NewFixedWidthDiffTableWriter(unionSch, iohelp.NopWrCloser(cli.CliOut), 100), nil
}
@@ -282,13 +298,50 @@ func (s sqlDiffWriter) BeginTable(ctx context.Context, td diff.TableDelta) error
return nil
}
func (s sqlDiffWriter) WriteSchemaDiff(ctx context.Context, toRoot *doltdb.RootValue, td diff.TableDelta) error {
func (s sqlDiffWriter) WriteTableSchemaDiff(ctx context.Context, toRoot *doltdb.RootValue, td diff.TableDelta) error {
toSchemas, err := toRoot.GetAllSchemas(ctx)
if err != nil {
return errhand.BuildDError("could not read schemas from toRoot").AddCause(err).Build()
}
return writeSqlSchemaDiff(ctx, td, toSchemas)
ddlStatements, err := diff.SqlSchemaDiff(ctx, td, toSchemas)
if err != nil {
return errhand.VerboseErrorFromError(err)
}
for _, stmt := range ddlStatements {
cli.Println(stmt)
}
return nil
}
func (s sqlDiffWriter) WriteTriggerDiff(ctx context.Context, triggerName, oldDefn, newDefn string) error {
// definitions will already be semicolon terminated, no need to add additional ones
if oldDefn == "" {
cli.Println(newDefn)
} else if newDefn == "" {
cli.Println(fmt.Sprintf("DROP TRIGGER %s;", sql.QuoteIdentifier(triggerName)))
} else {
cli.Println(fmt.Sprintf("DROP TRIGGER %s;", sql.QuoteIdentifier(triggerName)))
cli.Println(newDefn)
}
return nil
}
func (s sqlDiffWriter) WriteViewDiff(ctx context.Context, viewName, oldDefn, newDefn string) error {
// definitions will already be semicolon terminated, no need to add additional ones
if oldDefn == "" {
cli.Println(newDefn)
} else if newDefn == "" {
cli.Println(fmt.Sprintf("DROP VIEW %s;", sql.QuoteIdentifier(viewName)))
} else {
cli.Println(fmt.Sprintf("DROP VIEW %s;", sql.QuoteIdentifier(viewName)))
cli.Println(newDefn)
}
return nil
}
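
As an illustration of the branches above, here is a hedged sketch of what this SQL writer would print for a view whose definition changed: a DROP of the old view followed by the new, already semicolon-terminated definition. The helper function, view name, and definitions are hypothetical, not part of this change.

// Illustrative only: mirrors the SQL writer's three branches for a view diff.
// The names and definitions below are made up for the example.
package main

import "fmt"

func printViewDiffSQL(viewName, oldDefn, newDefn string) {
	switch {
	case oldDefn == "": // view was added; the new definition is already a full statement
		fmt.Println(newDefn)
	case newDefn == "": // view was dropped
		fmt.Printf("DROP VIEW `%s`;\n", viewName)
	default: // view was modified: drop it, then recreate it with the new definition
		fmt.Printf("DROP VIEW `%s`;\n", viewName)
		fmt.Println(newDefn)
	}
}

func main() {
	printViewDiffSQL("active_users",
		"CREATE VIEW active_users AS SELECT * FROM users;",
		"CREATE VIEW active_users AS SELECT * FROM users WHERE active = 1;")
	// Prints:
	//   DROP VIEW `active_users`;
	//   CREATE VIEW active_users AS SELECT * FROM users WHERE active = 1;
}
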
func (s sqlDiffWriter) RowWriter(ctx context.Context, td diff.TableDelta, unionSch sql.Schema) (diff.SqlRowDiffWriter, error) {
@@ -301,11 +354,12 @@ func (s sqlDiffWriter) RowWriter(ctx context.Context, td diff.TableDelta, unionS
}
type jsonDiffWriter struct {
wr io.WriteCloser
schemaDiffWriter diff.SchemaDiffWriter
rowDiffWriter diff.SqlRowDiffWriter
schemaDiffsWritten int
tablesWritten int
wr io.WriteCloser
schemaDiffWriter diff.SchemaDiffWriter
rowDiffWriter diff.SqlRowDiffWriter
tablesWritten int
triggersWritten int
viewsWritten int
}
var _ diffWriter = (*tabularDiffWriter)(nil)
@@ -316,12 +370,27 @@ func newJsonDiffWriter(wr io.WriteCloser) (*jsonDiffWriter, error) {
}, nil
}
const tablesHeader = `"tables":[`
const jsonDiffTableHeader = `{"name":"%s","schema_diff":`
const jsonDiffFooter = `}]}`
const jsonDiffDataDiffHeader = `],"data_diff":[`
const jsonDataDiffFooter = `}]`
func (j *jsonDiffWriter) beginDocumentIfNecessary() error {
if j.tablesWritten == 0 && j.triggersWritten == 0 && j.viewsWritten == 0 {
_, err := j.wr.Write([]byte("{"))
return err
}
return nil
}
func (j *jsonDiffWriter) BeginTable(ctx context.Context, td diff.TableDelta) error {
if j.schemaDiffWriter == nil {
err := iohelp.WriteAll(j.wr, []byte(`{"tables":[`))
err := j.beginDocumentIfNecessary()
if err != nil {
return err
}
if j.tablesWritten == 0 {
err := iohelp.WriteAll(j.wr, []byte(tablesHeader))
if err != nil {
return err
}
@@ -332,7 +401,12 @@ func (j *jsonDiffWriter) BeginTable(ctx context.Context, td diff.TableDelta) err
}
}
err := iohelp.WriteAll(j.wr, []byte(fmt.Sprintf(jsonDiffTableHeader, td.ToName)))
tableName := td.FromName
if len(tableName) == 0 {
tableName = td.ToName
}
err = iohelp.WriteAll(j.wr, []byte(fmt.Sprintf(jsonDiffTableHeader, tableName)))
if err != nil {
return err
}
@@ -343,7 +417,7 @@ func (j *jsonDiffWriter) BeginTable(ctx context.Context, td diff.TableDelta) err
return err
}
func (j *jsonDiffWriter) WriteSchemaDiff(ctx context.Context, toRoot *doltdb.RootValue, td diff.TableDelta) error {
func (j *jsonDiffWriter) WriteTableSchemaDiff(ctx context.Context, toRoot *doltdb.RootValue, td diff.TableDelta) error {
toSchemas, err := toRoot.GetAllSchemas(ctx)
if err != nil {
return errhand.BuildDError("could not read schemas from toRoot").AddCause(err).Build()
@@ -366,7 +440,7 @@ func (j *jsonDiffWriter) WriteSchemaDiff(ctx context.Context, toRoot *doltdb.Roo
func (j *jsonDiffWriter) RowWriter(ctx context.Context, td diff.TableDelta, unionSch sql.Schema) (diff.SqlRowDiffWriter, error) {
// close off the schema diff block, start the data block
err := iohelp.WriteAll(j.wr, []byte(`],"data_diff":[`))
err := iohelp.WriteAll(j.wr, []byte(jsonDiffDataDiffHeader))
if err != nil {
return nil, err
}
@@ -390,9 +464,133 @@ func (j *jsonDiffWriter) RowWriter(ctx context.Context, td diff.TableDelta, unio
return j.rowDiffWriter, err
}
func (j *jsonDiffWriter) WriteTriggerDiff(ctx context.Context, triggerName, oldDefn, newDefn string) error {
err := j.beginDocumentIfNecessary()
if err != nil {
return err
}
if j.triggersWritten == 0 {
// end the table if necessary
if j.tablesWritten > 0 {
_, err := j.wr.Write([]byte(jsonDataDiffFooter + ","))
if err != nil {
return err
}
}
_, err := j.wr.Write([]byte(`"triggers":[`))
if err != nil {
return err
}
} else {
_, err := j.wr.Write([]byte(","))
if err != nil {
return err
}
}
triggerNameBytes, err := ejson.Marshal(triggerName)
if err != nil {
return err
}
oldDefnBytes, err := ejson.Marshal(oldDefn)
if err != nil {
return err
}
newDefnBytes, err := ejson.Marshal(newDefn)
if err != nil {
return err
}
_, err = j.wr.Write([]byte(fmt.Sprintf(`{"name":%s,"from_definition":%s,"to_definition":%s}`,
triggerNameBytes, oldDefnBytes, newDefnBytes)))
if err != nil {
return err
}
j.triggersWritten++
return nil
}
func (j *jsonDiffWriter) WriteViewDiff(ctx context.Context, viewName, oldDefn, newDefn string) error {
err := j.beginDocumentIfNecessary()
if err != nil {
return err
}
if j.viewsWritten == 0 {
// end the previous block if necessary
if j.tablesWritten > 0 && j.triggersWritten == 0 {
_, err := j.wr.Write([]byte(jsonDataDiffFooter + ","))
if err != nil {
return err
}
} else if j.triggersWritten > 0 {
_, err := j.wr.Write([]byte("],"))
if err != nil {
return err
}
}
}
if j.viewsWritten == 0 {
_, err := j.wr.Write([]byte(`"views":[`))
if err != nil {
return err
}
} else {
_, err := j.wr.Write([]byte(","))
if err != nil {
return err
}
}
viewNameBytes, err := ejson.Marshal(viewName)
if err != nil {
return err
}
oldDefnBytes, err := ejson.Marshal(oldDefn)
if err != nil {
return err
}
newDefnBytes, err := ejson.Marshal(newDefn)
if err != nil {
return err
}
_, err = j.wr.Write([]byte(fmt.Sprintf(`{"name":%s,"from_definition":%s,"to_definition":%s}`,
viewNameBytes, oldDefnBytes, newDefnBytes)))
if err != nil {
return err
}
j.viewsWritten++
return nil
}
func (j *jsonDiffWriter) Close(ctx context.Context) error {
if j.tablesWritten > 0 {
err := iohelp.WriteLine(j.wr, jsonDiffFooter)
if j.tablesWritten > 0 || j.triggersWritten > 0 || j.viewsWritten > 0 {
// We only need to close off the "tables" array if we didn't also write a view / trigger
// (which also closes that array)
if j.triggersWritten == 0 && j.viewsWritten == 0 {
_, err := j.wr.Write([]byte(jsonDataDiffFooter))
if err != nil {
return err
}
} else {
// if we did write a trigger or view, we need to close off that array
_, err := j.wr.Write([]byte("]"))
if err != nil {
return err
}
}
err := iohelp.WriteLine(j.wr, "}")
if err != nil {
return err
}
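
Reading the header and footer constants together, the JSON writer appears to emit one document containing optional "tables", "triggers", and "views" arrays. The sketch below is an illustration only; the names, definitions, and the contents of the schema/data diff arrays are hypothetical placeholders.

// Illustrative only: an approximation of the document shape assembled by
// jsonDiffWriter from the headers and footers above. All values are made up.
package main

import "fmt"

const exampleDiffJSON = `{
  "tables": [
    {"name": "t", "schema_diff": ["..."], "data_diff": ["..."]}
  ],
  "triggers": [
    {"name": "trg", "from_definition": "...", "to_definition": "..."}
  ],
  "views": [
    {"name": "v", "from_definition": "...", "to_definition": "..."}
  ]
}`

func main() { fmt.Println(exampleDiffJSON) }
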

View File

@@ -110,7 +110,7 @@ func (cmd GarbageCollectionCmd) Exec(ctx context.Context, commandStr string, arg
return HandleVErrAndExitCode(verr, usage)
}
err = dEnv.DoltDB.GC(ctx)
err = dEnv.DoltDB.GC(ctx, nil)
if err != nil {
if errors.Is(err, chunks.ErrNothingToCollect) {
cli.PrintErrln(color.YellowString("Nothing to collect."))

View File

@@ -75,6 +75,11 @@ func (cmd SendMetricsCmd) ArgParser() *argparser.ArgParser {
// Exec is the implementation of the command that flushes the events to the grpc service
// Exec executes the command
func (cmd SendMetricsCmd) Exec(ctx context.Context, commandStr string, args []string, dEnv *env.DoltEnv) int {
if dEnv.DoltDB != nil { // see go/cmd/dolt/dolt.go:interceptSendMetrics()
cli.PrintErrln("expected DoltEnv without DoltDB")
return 1
}
ap := cmd.ArgParser()
help, _ := cli.HelpAndUsagePrinters(cli.CommandDocsForCommandString(commandStr, cli.CommandDocumentationContent{ShortDesc: sendMetricsShortDesc}, ap))

View File

@@ -57,7 +57,7 @@ import (
)
const (
Version = "0.75.3"
Version = "0.75.4"
)
var dumpDocsCommand = &commands.DumpDocsCmd{}
@@ -308,6 +308,10 @@ func runMain() int {
warnIfMaxFilesTooLow()
ctx := context.Background()
if ok, exit := interceptSendMetrics(ctx, args); ok {
return exit
}
dEnv := env.Load(ctx, env.GetCurrentUserHomeDir, filesys.LocalFS, doltdb.LocalDirDoltDB, Version)
dEnv.IgnoreLockFile = ignoreLockFile
@@ -447,3 +451,11 @@ func processEventsDir(args []string, dEnv *env.DoltEnv) error {
return nil
}
func interceptSendMetrics(ctx context.Context, args []string) (bool, int) {
if len(args) < 1 || args[0] != commands.SendMetricsCommand {
return false, 0
}
dEnv := env.LoadWithoutDB(ctx, env.GetCurrentUserHomeDir, filesys.LocalFS, Version)
return true, doltCommand.Exec(ctx, "dolt", args, dEnv)
}

View File

@@ -58,7 +58,7 @@ require (
github.com/cenkalti/backoff/v4 v4.1.3
github.com/cespare/xxhash v1.1.0
github.com/creasty/defaults v1.6.0
github.com/dolthub/go-mysql-server v0.14.1-0.20230315164027-6ed5b50a67be
github.com/dolthub/go-mysql-server v0.14.1-0.20230316190411-50a2e79d79aa
github.com/google/flatbuffers v2.0.6+incompatible
github.com/jmoiron/sqlx v1.3.4
github.com/kch42/buzhash v0.0.0-20160816060738-9bdec3dec7c6

View File

@@ -166,8 +166,8 @@ github.com/dolthub/flatbuffers v1.13.0-dh.1 h1:OWJdaPep22N52O/0xsUevxJ6Qfw1M2txC
github.com/dolthub/flatbuffers v1.13.0-dh.1/go.mod h1:CorYGaDmXjHz1Z7i50PYXG1Ricn31GcA2wNOTFIQAKE=
github.com/dolthub/fslock v0.0.3 h1:iLMpUIvJKMKm92+N1fmHVdxJP5NdyDK5bK7z7Ba2s2U=
github.com/dolthub/fslock v0.0.3/go.mod h1:QWql+P17oAAMLnL4HGB5tiovtDuAjdDTPbuqx7bYfa0=
github.com/dolthub/go-mysql-server v0.14.1-0.20230315164027-6ed5b50a67be h1:pAY54vAbP2TmJhuV2nId7/IPO/+I2JSV1IogRj9RFWw=
github.com/dolthub/go-mysql-server v0.14.1-0.20230315164027-6ed5b50a67be/go.mod h1:Mo0dPxaaVFWQoxLRBH7UXKO2H6yHXq3dRmq4/vvARbI=
github.com/dolthub/go-mysql-server v0.14.1-0.20230316190411-50a2e79d79aa h1:Sw6IR/LHmTm2OueJjjftSbOH5kLEZVBfrWgpJ25ZsPg=
github.com/dolthub/go-mysql-server v0.14.1-0.20230316190411-50a2e79d79aa/go.mod h1:Mo0dPxaaVFWQoxLRBH7UXKO2H6yHXq3dRmq4/vvARbI=
github.com/dolthub/ishell v0.0.0-20221214210346-d7db0b066488 h1:0HHu0GWJH0N6a6keStrHhUAK5/o9LVfkh44pvsV4514=
github.com/dolthub/ishell v0.0.0-20221214210346-d7db0b066488/go.mod h1:ehexgi1mPxRTk0Mok/pADALuHbvATulTh6gzr7NzZto=
github.com/dolthub/jsonpath v0.0.0-20210609232853-d49537a30474 h1:xTrR+l5l+1Lfq0NvhiEsctylXinUMFhhsqaEcl414p8=

View File

@@ -159,6 +159,14 @@ func GetTableDeltas(ctx context.Context, fromRoot, toRoot *doltdb.RootValue) (de
return nil, err
}
// Make sure we always return the same order of deltas
sort.Slice(deltas, func(i, j int) bool {
if deltas[i].FromName == deltas[j].FromName {
return deltas[i].ToName < deltas[j].ToName
}
return deltas[i].FromName < deltas[j].FromName
})
return deltas, nil
}
@@ -552,7 +560,6 @@ func fkSlicesAreEqual(from, to []doltdb.ForeignKey) bool {
// SqlSchemaDiff returns a slice of DDL statements that will transform the schema in the from delta to the schema in
// the to delta.
// TODO: this doesn't handle constraints or triggers
func SqlSchemaDiff(ctx context.Context, td TableDelta, toSchemas map[string]schema.Schema) ([]string, error) {
fromSch, toSch, err := td.GetSchemas(ctx)
if err != nil {

View File

@@ -1221,7 +1221,16 @@ func (ddb *DoltDB) Rebase(ctx context.Context) error {
}
// GC performs garbage collection on this ddb.
func (ddb *DoltDB) GC(ctx context.Context) error {
//
// If |safepointF| is non-nil, it will be called at some point after the GC begins
// and before the GC ends. It will be called without
// Database/ValueStore/NomsBlockStore locks held. It should establish
// safepoints in every application-level in-progress read and write workflow
// against this DoltDB. Examples of doing this include blocking
// until no possibly-stale ChunkStore state is retained in memory, or failing
// certain in-progress operations which cannot be finalized in a timely manner,
// etc.
func (ddb *DoltDB) GC(ctx context.Context, safepointF func() error) error {
collector, ok := ddb.db.Database.(datas.GarbageCollector)
if !ok {
return fmt.Errorf("this database does not support garbage collection")
@@ -1265,7 +1274,7 @@ func (ddb *DoltDB) GC(ctx context.Context) error {
return err
}
return collector.GC(ctx, oldGen, newGen)
return collector.GC(ctx, oldGen, newGen, safepointF)
}
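
Stepping back from the hunk above: a minimal, hypothetical sketch of how a caller might use the new safepoint parameter. The wrapper function is invented for illustration; in this change the CLI gc command passes nil and the dolt_gc() procedure passes a closure that quiesces other connections.

// A hedged sketch of the new DoltDB.GC contract: safepointF (if non-nil) runs
// after the GC begins and before it ends, without store locks held, and should
// quiesce in-flight work against this DoltDB. The wrapper below is hypothetical.
package gcsketch

import (
	"context"

	"github.com/dolthub/dolt/go/libraries/doltcore/doltdb"
)

func runGC(ctx context.Context, ddb *doltdb.DoltDB, quiesce func() error) error {
	if quiesce == nil {
		// e.g. the offline `dolt gc` command path
		return ddb.GC(ctx, nil)
	}
	// e.g. the dolt_gc() procedure path, where the callback kills and drains
	// other connections before the collection finalizes
	return ddb.GC(ctx, quiesce)
}
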
func (ddb *DoltDB) ShallowGC(ctx context.Context) error {

View File

@@ -16,227 +16,21 @@ package doltdb_test
import (
"context"
"fmt"
"io"
"testing"
"github.com/dolthub/go-mysql-server/sql"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"
"github.com/dolthub/dolt/go/cmd/dolt/commands"
"github.com/dolthub/dolt/go/cmd/dolt/commands/engine"
"github.com/dolthub/dolt/go/libraries/doltcore/doltdb"
"github.com/dolthub/dolt/go/libraries/doltcore/dtestutils"
"github.com/dolthub/dolt/go/libraries/doltcore/env"
"github.com/dolthub/dolt/go/libraries/doltcore/ref"
"github.com/dolthub/dolt/go/libraries/doltcore/sqle"
"github.com/dolthub/dolt/go/libraries/doltcore/sqle/dprocedures"
"github.com/dolthub/dolt/go/store/hash"
)
func TestConcurrentGC(t *testing.T) {
dprocedures.DoltGCFeatureFlag = true
// Each test spawns concurrent clients that execute queries; some clients
// will trigger gc processes. When all clients finish, we run a final
// gc process to validate that no dangling references remain.
tests := []concurrentGCtest{
{
name: "smoke test",
setup: []string{"CREATE TABLE t (id int primary key)"},
clients: []client{
{
id: "client",
queries: func(id string, i int) (queries []string) {
return []string{
fmt.Sprintf("INSERT INTO t VALUES (%d)", i),
"SELECT COUNT(*) FROM t",
}
}},
},
},
{
name: "aaron's repro",
// create 32 branches
setup: func() []string {
queries := []string{
"CREATE TABLE t (id int primary key, val TEXT)",
"CALL dcommit('-Am', 'new table t');",
}
for b := 0; b < 32; b++ {
q := fmt.Sprintf("CALL dolt_checkout('-b', 'branch_%d');", b)
queries = append(queries, q)
}
return queries
}(),
// for each branch, create a single client that
// writes only to that branch
clients: func() []client {
cc := []client{{
id: "gc_client",
queries: func(string, int) []string {
return []string{"CALL dolt_gc();"}
},
}}
for b := 0; b < 32; b++ {
branch := fmt.Sprintf("branch_%d", b)
cc = append(cc, client{
id: branch,
queries: func(id string, idx int) []string {
q := fmt.Sprintf("INSERT INTO `%s/%s`.t VALUES (%d, '%s_%d')",
testDB, id, idx, id, idx)
return []string{q}
}})
}
return cc
}(),
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
testConcurrentGC(t, test)
})
}
}
// concurrentGCtest tests concurrent GC
type concurrentGCtest struct {
name string
setup []string
clients []client
}
type client struct {
id string
queries func(id string, idx int) []string
}
func testConcurrentGC(t *testing.T, test concurrentGCtest) {
ctx := context.Background()
eng := setupSqlEngine(t, ctx)
err := runWithSqlSession(ctx, eng, func(sctx *sql.Context, eng *engine.SqlEngine) error {
for _, q := range test.setup {
if err := execQuery(sctx, eng, q); err != nil {
return err
}
}
return nil
})
require.NoError(t, err)
eg, ectx := errgroup.WithContext(ctx)
for _, c := range test.clients {
cl := c
require.NotZero(t, cl.id)
eg.Go(func() error {
return runWithSqlSession(ectx, eng, func(sctx *sql.Context, eng *engine.SqlEngine) error {
defer func() {
if r := recover(); r != nil {
//t.Logf("panic in client %s: %v", cl.id, r)
}
}()
// generate and run 128 batches of queries
for i := 0; i < 128; i++ {
batch := cl.queries(cl.id, i)
for _, q := range batch {
qerr := execQuery(sctx, eng, q)
if qerr != nil {
// allow clients to error, but close connection
// todo: restrict errors to dangling refs
// t.Logf("error in client %s: %s", cl.id, qerr.Error())
return nil
}
}
}
return nil
})
})
}
require.NoError(t, eg.Wait())
// now run a full GC and assert we don't find dangling refs
err = runWithSqlSession(ctx, eng, func(sctx *sql.Context, eng *engine.SqlEngine) (err error) {
qq := []string{
// ensure we have garbage to collect
"CREATE TABLE garbage (val int)",
"DROP TABLE garbage",
"CALL dolt_gc()",
}
for _, q := range qq {
if err = execQuery(sctx, eng, q); err != nil {
return err
}
}
return
})
require.NoError(t, err)
}
func runWithSqlSession(ctx context.Context, eng *engine.SqlEngine, cb func(sctx *sql.Context, eng *engine.SqlEngine) error) error {
sess, err := eng.NewDoltSession(ctx, sql.NewBaseSession())
if err != nil {
return err
}
sctx := sql.NewContext(ctx, sql.WithSession(sess))
sctx.SetCurrentDatabase(testDB)
sctx.Session.SetClient(sql.Client{User: "root", Address: "%"})
return cb(sctx, eng)
}
func execQuery(sctx *sql.Context, eng *engine.SqlEngine, query string) (err error) {
_, iter, err := eng.Query(sctx, query)
if err != nil {
return err
}
defer func() {
// tx commit
if cerr := iter.Close(sctx); err == nil {
err = cerr
}
}()
for {
_, err = iter.Next(sctx)
if err == io.EOF {
err = nil
break
} else if err != nil {
return err
}
}
return
}
const (
// DB name matches dtestutils.CreateTestEnv()
testDB = "dolt"
)
func setupSqlEngine(t *testing.T, ctx context.Context) (eng *engine.SqlEngine) {
dEnv := dtestutils.CreateTestEnv()
mrEnv, err := env.MultiEnvForDirectory(
ctx,
dEnv.Config.WriteableConfig(),
dEnv.FS,
dEnv.Version,
dEnv.IgnoreLockFile,
dEnv)
if err != nil {
panic(err)
}
eng, err = engine.NewSqlEngine(ctx, mrEnv, engine.FormatNull, &engine.SqlEngineConfig{
ServerUser: "root",
ServerHost: "localhost",
Autocommit: true,
})
if err != nil {
panic(err)
}
return
}
func TestGarbageCollection(t *testing.T) {
require.True(t, true)
assert.True(t, true)
@@ -331,7 +125,7 @@ func testGarbageCollection(t *testing.T, test gcTest) {
}
}
err := dEnv.DoltDB.GC(ctx)
err := dEnv.DoltDB.GC(ctx, nil)
require.NoError(t, err)
test.postGCFunc(ctx, t, dEnv.DoltDB, res)

View File

@@ -106,40 +106,46 @@ func (dEnv *DoltEnv) GetRemoteDB(ctx context.Context, format *types.NomsBinForma
}
}
// Load loads the DoltEnv for the .dolt directory determined by resolving the specified urlStr with the specified Filesys.
func Load(ctx context.Context, hdp HomeDirProvider, fs filesys.Filesys, urlStr string, version string) *DoltEnv {
func LoadWithoutDB(ctx context.Context, hdp HomeDirProvider, fs filesys.Filesys, version string) *DoltEnv {
cfg, cfgErr := LoadDoltCliConfig(hdp, fs)
repoState, rsErr := LoadRepoState(fs)
ddb, dbLoadErr := doltdb.LoadDoltDB(ctx, types.Format_Default, urlStr, fs)
dEnv := &DoltEnv{
Version: version,
Config: cfg,
CfgLoadErr: cfgErr,
RepoState: repoState,
RSLoadErr: rsErr,
DoltDB: ddb,
DBLoadError: dbLoadErr,
FS: fs,
urlStr: urlStr,
hdp: hdp,
}
if dEnv.RepoState != nil {
remotes := make(map[string]Remote, len(dEnv.RepoState.Remotes))
for n, r := range dEnv.RepoState.Remotes {
// deep copy remotes and backups ¯\_(ツ)_/¯ (see commit c59cbead)
if repoState != nil {
remotes := make(map[string]Remote, len(repoState.Remotes))
for n, r := range repoState.Remotes {
remotes[n] = r
}
dEnv.RepoState.Remotes = remotes
repoState.Remotes = remotes
backups := make(map[string]Remote, len(dEnv.RepoState.Backups))
for n, r := range dEnv.RepoState.Backups {
backups := make(map[string]Remote, len(repoState.Backups))
for n, r := range repoState.Backups {
backups[n] = r
}
dEnv.RepoState.Backups = backups
repoState.Backups = backups
}
return &DoltEnv{
Version: version,
Config: cfg,
CfgLoadErr: cfgErr,
RepoState: repoState,
RSLoadErr: rsErr,
FS: fs,
hdp: hdp,
}
}
// Load loads the DoltEnv for the .dolt directory determined by resolving the specified urlStr with the specified Filesys.
func Load(ctx context.Context, hdp HomeDirProvider, fs filesys.Filesys, urlStr string, version string) *DoltEnv {
dEnv := LoadWithoutDB(ctx, hdp, fs, version)
ddb, dbLoadErr := doltdb.LoadDoltDB(ctx, types.Format_Default, urlStr, fs)
dEnv.DoltDB = ddb
dEnv.DBLoadError = dbLoadErr
dEnv.urlStr = urlStr
if dbLoadErr == nil && dEnv.HasDoltDir() {
if !dEnv.HasDoltTempTableDir() {
tmpDir, err := dEnv.TempTableFilesDir()
@@ -172,7 +178,7 @@ func Load(ctx context.Context, hdp HomeDirProvider, fs filesys.Filesys, urlStr s
}
}
if rsErr == nil && dbLoadErr == nil {
if dEnv.RSLoadErr == nil && dbLoadErr == nil {
// If the working set isn't present in the DB, create it from the repo state. This step can be removed post 1.0.
_, err := dEnv.WorkingSet(ctx)
if errors.Is(err, doltdb.ErrWorkingSetNotFound) {

View File

@@ -106,6 +106,7 @@ func NewMerger(
}
// MergeTable merges schema and table data for the table tblName.
// TODO: this code will loop infinitely when merging certain schema changes
func (rm *RootMerger) MergeTable(ctx context.Context, tblName string, opts editor.Options, mergeOpts MergeOpts) (*doltdb.Table, *MergeStats, error) {
tm, err := rm.makeTableMerger(ctx, tblName)
if err != nil {

View File

@@ -118,7 +118,7 @@ func (c ChkConflict) String() string {
return ""
}
var ErrMergeWithDifferentPkSets = errors.New("error: cannot merge two tables with different primary key sets")
var ErrMergeWithDifferentPks = errors.New("error: cannot merge two tables with different primary keys")
// SchemaMerge performs a three-way merge of ourSch, theirSch, and ancSch.
func SchemaMerge(ctx context.Context, format *types.NomsBinFormat, ourSch, theirSch, ancSch schema.Schema, tblName string) (sch schema.Schema, sc SchemaConflict, err error) {
@@ -129,8 +129,8 @@ func SchemaMerge(ctx context.Context, format *types.NomsBinFormat, ourSch, their
// TODO: We'll remove this once it's possible to get diff and merge on different primary key sets
// TODO: decide how to merge different orders of PKS
if !schema.ArePrimaryKeySetsDiffable(format, ourSch, theirSch) {
return nil, SchemaConflict{}, ErrMergeWithDifferentPkSets
if !schema.ArePrimaryKeySetsDiffable(format, ourSch, theirSch) || !schema.ArePrimaryKeySetsDiffable(format, ourSch, ancSch) {
return nil, SchemaConflict{}, ErrMergeWithDifferentPks
}
var mergedCC *schema.ColCollection

View File

@@ -458,7 +458,7 @@ var mergeSchemaConflictTests = []mergeSchemaConflictTest{
{commands.CommitCmd{}, []string{"-m", "modified branch other"}},
{commands.CheckoutCmd{}, []string{env.DefaultInitBranch}},
},
expectedErr: merge.ErrMergeWithDifferentPkSets,
expectedErr: merge.ErrMergeWithDifferentPks,
},
}

View File

@@ -29,6 +29,7 @@ import (
)
func TestBlobStringConvertNomsValueToValue(t *testing.T) {
vrw := types.NewMemoryValueStore()
tests := []struct {
typ *blobStringType
input types.Blob
@@ -37,19 +38,19 @@ func TestBlobStringConvertNomsValueToValue(t *testing.T) {
}{
{
generateBlobStringType(t, 10),
mustBlobString(t, "0 "),
mustBlobString(t, vrw, "0 "),
"0 ",
false,
},
{
generateBlobStringType(t, 80),
mustBlobString(t, "this is some text that will be returned"),
mustBlobString(t, vrw, "this is some text that will be returned"),
"this is some text that will be returned",
false,
},
{
&blobStringType{gmstypes.CreateLongText(sql.Collation_Default)},
mustBlobString(t, " This is a sentence. "),
mustBlobString(t, vrw, " This is a sentence. "),
" This is a sentence. ",
false,
},
@@ -69,6 +70,7 @@ func TestBlobStringConvertNomsValueToValue(t *testing.T) {
}
func TestBlobStringConvertValueToNomsValue(t *testing.T) {
vrw := types.NewMemoryValueStore()
tests := []struct {
typ *blobStringType
input interface{}
@@ -78,32 +80,31 @@ func TestBlobStringConvertValueToNomsValue(t *testing.T) {
{
generateBlobStringType(t, 10),
"0 ",
mustBlobString(t, "0 "),
mustBlobString(t, vrw, "0 "),
false,
},
{
generateBlobStringType(t, 80),
int64(28354),
mustBlobString(t, "28354"),
mustBlobString(t, vrw, "28354"),
false,
},
{
&blobStringType{gmstypes.CreateLongText(sql.Collation_Default)},
float32(3724.75),
mustBlobString(t, "3724.75"),
mustBlobString(t, vrw, "3724.75"),
false,
},
{
generateBlobStringType(t, 80),
time.Date(2030, 1, 2, 4, 6, 3, 472382485, time.UTC),
mustBlobString(t, "2030-01-02 04:06:03.472382"),
mustBlobString(t, vrw, "2030-01-02 04:06:03.472382"),
false,
},
}
for _, test := range tests {
t.Run(fmt.Sprintf(`%v %v`, test.typ.String(), test.input), func(t *testing.T) {
vrw := types.NewMemoryValueStore()
output, err := test.typ.ConvertValueToNomsValue(context.Background(), vrw, test.input)
if !test.expectedErr {
require.NoError(t, err)
@@ -116,6 +117,7 @@ func TestBlobStringConvertValueToNomsValue(t *testing.T) {
}
func TestBlobStringFormatValue(t *testing.T) {
vrw := types.NewMemoryValueStore()
tests := []struct {
typ *blobStringType
input types.Blob
@@ -124,19 +126,19 @@ func TestBlobStringFormatValue(t *testing.T) {
}{
{
generateBlobStringType(t, 10),
mustBlobString(t, "0 "),
mustBlobString(t, vrw, "0 "),
"0 ",
false,
},
{
generateBlobStringType(t, 80),
mustBlobString(t, "this is some text that will be returned"),
mustBlobString(t, vrw, "this is some text that will be returned"),
"this is some text that will be returned",
false,
},
{
&blobStringType{gmstypes.CreateLongText(sql.Collation_Default)},
mustBlobString(t, " This is a sentence. "),
mustBlobString(t, vrw, " This is a sentence. "),
" This is a sentence. ",
false,
},
@@ -156,6 +158,7 @@ func TestBlobStringFormatValue(t *testing.T) {
}
func TestBlobStringParseValue(t *testing.T) {
vrw := types.NewMemoryValueStore()
tests := []struct {
typ *blobStringType
input string
@@ -165,26 +168,25 @@ func TestBlobStringParseValue(t *testing.T) {
{
generateBlobStringType(t, 10),
"0 ",
mustBlobString(t, "0 "),
mustBlobString(t, vrw, "0 "),
false,
},
{
generateBlobStringType(t, 80),
"this is some text that will be returned",
mustBlobString(t, "this is some text that will be returned"),
mustBlobString(t, vrw, "this is some text that will be returned"),
false,
},
{
&blobStringType{gmstypes.CreateLongText(sql.Collation_Default)},
" This is a sentence. ",
mustBlobString(t, " This is a sentence. "),
mustBlobString(t, vrw, " This is a sentence. "),
false,
},
}
for _, test := range tests {
t.Run(fmt.Sprintf(`%v %v`, test.typ.String(), test.input), func(t *testing.T) {
vrw := types.NewMemoryValueStore()
output, err := StringDefaultType.ConvertToType(context.Background(), vrw, test.typ, types.String(test.input))
if !test.expectedErr {
require.NoError(t, err)

View File

@@ -168,8 +168,7 @@ func generateBlobStringType(t *testing.T, length int64) *blobStringType {
return &blobStringType{gmstypes.MustCreateStringWithDefaults(sqltypes.Text, length)}
}
func mustBlobString(t *testing.T, str string) types.Blob {
vrw := types.NewMemoryValueStore()
func mustBlobString(t *testing.T, vrw types.ValueReadWriter, str string) types.Blob {
blob, err := types.NewBlob(context.Background(), vrw, strings.NewReader(str))
require.NoError(t, err)
return blob

View File

@@ -32,7 +32,8 @@ import (
func TestTypeInfoSuite(t *testing.T) {
t.Skip()
typeInfoArrays, validTypeValues := generateTypeInfoArrays(t)
vrw := types.NewMemoryValueStore()
typeInfoArrays, validTypeValues := generateTypeInfoArrays(t, vrw)
t.Run("VerifyArray", func(t *testing.T) {
verifyTypeInfoArrays(t, typeInfoArrays, validTypeValues)
})
@@ -343,7 +344,7 @@ func testTypeInfoConversionsExist(t *testing.T, tiArrays [][]TypeInfo) {
}
// generate unique TypeInfos for each type, and also values that are valid for at least one of the TypeInfos for the matching row
func generateTypeInfoArrays(t *testing.T) ([][]TypeInfo, [][]types.Value) {
func generateTypeInfoArrays(t *testing.T, vrw types.ValueReadWriter) ([][]TypeInfo, [][]types.Value) {
return [][]TypeInfo{
generateBitTypes(t, 16),
{&blobStringType{gmstypes.TinyText}, &blobStringType{gmstypes.Text},
@@ -377,8 +378,8 @@ func generateTypeInfoArrays(t *testing.T) ([][]TypeInfo, [][]types.Value) {
},
[][]types.Value{
{types.Uint(1), types.Uint(207), types.Uint(79147), types.Uint(34845728), types.Uint(9274618927)}, //Bit
{mustBlobString(t, ""), mustBlobString(t, "a"), mustBlobString(t, "abc"), //BlobString
mustBlobString(t, "abcdefghijklmnopqrstuvwxyz"), mustBlobString(t, "هذا هو بعض نماذج النص التي أستخدمها لاختبار عناصر")},
{mustBlobString(t, vrw, ""), mustBlobString(t, vrw, "a"), mustBlobString(t, vrw, "abc"), //BlobString
mustBlobString(t, vrw, "abcdefghijklmnopqrstuvwxyz"), mustBlobString(t, vrw, "هذا هو بعض نماذج النص التي أستخدمها لاختبار عناصر")},
{types.Bool(false), types.Bool(true)}, //Bool
{types.Timestamp(time.Date(1000, 1, 1, 0, 0, 0, 0, time.UTC)), //Datetime
types.Timestamp(time.Date(1970, 1, 1, 0, 0, 1, 0, time.UTC)),

View File

@@ -18,7 +18,9 @@ import (
"errors"
"fmt"
"os"
"time"
"github.com/cenkalti/backoff/v4"
"github.com/dolthub/go-mysql-server/sql"
"github.com/dolthub/dolt/go/cmd/dolt/cli"
@@ -32,12 +34,12 @@ const (
)
func init() {
if os.Getenv("DOLT_ENABLE_GC_PROCEDURE") != "" {
DoltGCFeatureFlag = true
if os.Getenv("DOLT_DISABLE_GC_PROCEDURE") != "" {
DoltGCFeatureFlag = false
}
}
var DoltGCFeatureFlag = false
var DoltGCFeatureFlag = true
// doltGC is the stored procedure to run online garbage collection on a database.
func doltGC(ctx *sql.Context, args ...string) (sql.RowIter, error) {
@@ -51,6 +53,8 @@ func doltGC(ctx *sql.Context, args ...string) (sql.RowIter, error) {
return rowToIter(int64(res)), nil
}
var ErrServerPerformedGC = errors.New("this connection was established when this server performed an online garbage collection. this connection can no longer be used. please reconnect.")
func doDoltGC(ctx *sql.Context, args []string) (int, error) {
dbName := ctx.GetCurrentDatabase()
@@ -82,7 +86,43 @@ func doDoltGC(ctx *sql.Context, args []string) (int, error) {
return cmdFailure, err
}
} else {
err = ddb.GC(ctx)
// TODO: If we got a callback at the beginning and an
// (allowed-to-block) callback at the end, we could more
// gracefully tear things down.
err = ddb.GC(ctx, func() error {
killed := make(map[uint32]struct{})
processes := ctx.ProcessList.Processes()
for _, p := range processes {
if p.Connection != ctx.Session.ID() {
// Kill any inflight query.
ctx.ProcessList.Kill(p.Connection)
// Tear down the connection itself.
ctx.KillConnection(p.Connection)
killed[p.Connection] = struct{}{}
}
}
// Look in processes until the connections are actually gone.
params := backoff.NewExponentialBackOff()
params.InitialInterval = 1 * time.Millisecond
params.MaxInterval = 25 * time.Millisecond
params.MaxElapsedTime = 3 * time.Second
err := backoff.Retry(func() error {
processes := ctx.ProcessList.Processes()
for _, p := range processes {
if _, ok := killed[p.Connection]; ok {
return errors.New("unable to establish safepoint.")
}
}
return nil
}, params)
if err != nil {
return err
}
ctx.Session.SetTransaction(nil)
dsess.DSessFromSess(ctx.Session).SetValidateErr(ErrServerPerformedGC)
return nil
})
if err != nil {
return cmdFailure, err
}

View File

@@ -708,3 +708,49 @@ var AddIndexScripts = []queries.ScriptTest{
},
},
}
var AddDropPrimaryKeysScripts = []queries.ScriptTest{
{
Name: "drop primary key blocked when foreign key present",
SetUpScript: []string{
"create table parent (a int primary key )",
"create table child (b int primary key, c int, key (c))",
"alter table child add constraint fk1 foreign key (c) references parent (a)",
},
Assertions: []queries.ScriptTestAssertion{
{
Query: "alter table parent drop primary key",
ExpectedErr: sql.ErrCantDropIndex,
},
},
},
{
Name: "drop primary key succeeds when foreign key present on other column",
SetUpScript: []string{
"create table parent (a int primary key, d int, key (d))",
"create table child (b int primary key, c int, key (c))",
"alter table child add constraint fk1 foreign key (c) references parent (d)",
},
Assertions: []queries.ScriptTestAssertion{
{
Query: "alter table parent drop primary key",
Expected: []sql.Row{{types.OkResult{RowsAffected: 0x0, InsertID: 0x0}}},
},
},
},
{
Name: "drop primary key succeeds when foreign key present on other table",
SetUpScript: []string{
"create table unrelated (a int primary key, d int)",
"create table parent (a int primary key)",
"create table child (b int primary key, c int, key (c))",
"alter table child add constraint fk1 foreign key (c) references parent (a)",
},
Assertions: []queries.ScriptTestAssertion{
{
Query: "alter table unrelated drop primary key",
Expected: []sql.Row{{types.OkResult{RowsAffected: 0x0, InsertID: 0x0}}},
},
},
},
}

View File

@@ -938,6 +938,13 @@ func TestDoltDdlScripts(t *testing.T) {
require.NoError(t, err)
enginetest.TestScriptWithEngine(t, e, harness, script)
}
// TODO: these scripts should be general enough to go in GMS
for _, script := range AddDropPrimaryKeysScripts {
e, err := harness.NewEngine(t)
require.NoError(t, err)
enginetest.TestScriptWithEngine(t, e, harness, script)
}
}
func TestBrokenDdlScripts(t *testing.T) {

View File

@@ -1428,7 +1428,7 @@ var Dolt1MergeScripts = []queries.ScriptTest{
Assertions: []queries.ScriptTestAssertion{
{
Query: "CALL DOLT_MERGE('right');",
ExpectedErrStr: "error: cannot merge two tables with different primary key sets",
ExpectedErrStr: "error: cannot merge two tables with different primary keys",
},
},
},

View File

@@ -102,16 +102,47 @@ type LoggingChunkStore interface {
SetLogger(logger DebugLogger)
}
// The sentinel error returned by BeginGC(addChunk) implementations
// indicating that the store must wait until the GC is over and then
// ensure that the attempted write makes it into the new data for the chunk
// store.
var ErrAddChunkMustBlock = errors.New("chunk keeper: add chunk must block")
// ChunkStoreGarbageCollector is a ChunkStore that supports garbage collection.
type ChunkStoreGarbageCollector interface {
ChunkStore
// MarkAndSweepChunks expects |keepChunks| to receive the chunk hashes
// that should be kept in the chunk store. Once |keepChunks| is closed
// and MarkAndSweepChunks returns, the chunk store will only have the
// chunks sent on |keepChunks| and will have removed all other content
// from the ChunkStore.
MarkAndSweepChunks(ctx context.Context, last hash.Hash, keepChunks <-chan []hash.Hash, dest ChunkStore) error
// After BeginGC returns, every newly written chunk or root should be
// passed to the provided |addChunk| function. This behavior must
// persist until EndGC is called. MarkAndSweepChunks will only ever
// be called bracketed between a BeginGC and an EndGC call.
//
// If during processing the |addChunk| function returns the
// |true|, then the ChunkStore must block the write until |EndGC| is
// called. At that point, the ChunkStore is responsible for ensuring
// that the chunk which it was attempting to write makes it into chunk
// store.
//
// This function should not block indefinitely and should return an
// error if a GC is already in progress.
BeginGC(addChunk func(hash.Hash) bool) error
// EndGC indicates that the GC is over. The previously provided
// addChunk function must not be called after this function returns.
EndGC()
// MarkAndSweepChunks is expected to read chunk addresses off of
// |hashes|, which represent chunks which should be copied into the
// provided |dest| store. Once |hashes| is closed,
// MarkAndSweepChunks is expected to update the contents of the store
// to only include the chunks whose addresses were sent along on
// |hashes|.
//
// This behavior is a little different for ValueStore.GC()'s
// interactions with generational stores. See ValueStore and
// NomsBlockStore/GenerationalNBS for details.
MarkAndSweepChunks(ctx context.Context, hashes <-chan []hash.Hash, dest ChunkStore) error
}
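
The bracketing these comments describe — BeginGC, then MarkAndSweepChunks, then EndGC, with every concurrent write reported through the keeper callback — can be summarized with a hedged sketch. The orchestration function below is hypothetical; the real driver lives in ValueStore.GC and the NBS implementations.

// Hypothetical sketch of the collector protocol implied by the interface
// above. Names other than the interface methods are illustrative only.
package gcsketch

import (
	"context"

	"github.com/dolthub/dolt/go/store/chunks"
	"github.com/dolthub/dolt/go/store/hash"
)

func collect(ctx context.Context, cs chunks.ChunkStoreGarbageCollector, live <-chan []hash.Hash) error {
	// Chunks written while the GC runs are reported here; returning true asks
	// the store to block that write until EndGC (cf. ErrAddChunkMustBlock).
	keeper := func(h hash.Hash) bool { return false }

	if err := cs.BeginGC(keeper); err != nil {
		return err // e.g. a GC is already in progress
	}
	defer cs.EndGC()

	// |live| carries addresses of chunks to keep; closing it lets the store
	// finalize its contents to exactly those chunks.
	return cs.MarkAndSweepChunks(ctx, live, cs)
}
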
type PrefixChunkStore interface {

View File

@@ -23,6 +23,7 @@ package chunks
import (
"context"
"errors"
"fmt"
"sync"
@@ -50,19 +51,25 @@ func (ms *MemoryStorage) NewView() ChunkStore {
if version == "" {
version = constants.FormatLD1String
}
v := &MemoryStoreView{storage: ms, rootHash: ms.rootHash, version: version}
v.gcCond = sync.NewCond(&v.mu)
return &MemoryStoreView{storage: ms, rootHash: ms.rootHash, version: version}
return v
}
// NewViewWithFormat makes a MemoryStoreView with a specific NomsBinFormat.
func (ms *MemoryStorage) NewViewWithFormat(nbf string) ChunkStore {
return &MemoryStoreView{storage: ms, rootHash: ms.rootHash, version: nbf}
v := &MemoryStoreView{storage: ms, rootHash: ms.rootHash, version: nbf}
v.gcCond = sync.NewCond(&v.mu)
return v
}
// NewViewWithVersion vends a MemoryStoreView backed by this MemoryStorage. It's
// initialized with the currently "persisted" root. Uses the default format.
func (ms *MemoryStorage) NewViewWithDefaultFormat() ChunkStore {
return &MemoryStoreView{storage: ms, rootHash: ms.rootHash, version: constants.FormatDefaultString}
v := &MemoryStoreView{storage: ms, rootHash: ms.rootHash, version: constants.FormatDefaultString}
v.gcCond = sync.NewCond(&v.mu)
return v
}
// Get retrieves the Chunk with the Hash h, returning EmptyChunk if it's not
@@ -130,8 +137,13 @@ type MemoryStoreView struct {
pending map[hash.Hash]Chunk
pendingRefs hash.HashSet
rootHash hash.Hash
mu sync.RWMutex
version string
mu sync.RWMutex
isGC bool
gcCond *sync.Cond
keeperFunc func(hash.Hash) bool
version string
storage *MemoryStorage
}
@@ -221,6 +233,12 @@ func (ms *MemoryStoreView) Put(ctx context.Context, c Chunk, getAddrs GetAddrsCb
ms.mu.Lock()
defer ms.mu.Unlock()
if ms.keeperFunc != nil {
if ms.keeperFunc(c.Hash()) {
ms.waitForGC()
}
}
if ms.pendingRefs == nil {
ms.pendingRefs = addrs
} else {
@@ -255,9 +273,45 @@ func (ms *MemoryStoreView) Root(ctx context.Context) (hash.Hash, error) {
return ms.rootHash, nil
}
func (ms *MemoryStoreView) waitForGC() {
for ms.isGC {
ms.gcCond.Wait()
}
}
func (ms *MemoryStoreView) transitionToGC(keeperFunc func(hash.Hash) bool) error {
ms.mu.Lock()
defer ms.mu.Unlock()
if ms.isGC == true {
return errors.New("gc already in progress")
}
ms.isGC = true
ms.keeperFunc = keeperFunc
ms.gcCond.Broadcast()
return nil
}
func (ms *MemoryStoreView) transitionToNoGC() {
ms.mu.Lock()
defer ms.mu.Unlock()
if !ms.isGC {
panic("attempt to toggle GC to false when GC is not true")
}
ms.isGC = false
ms.keeperFunc = nil
ms.gcCond.Broadcast()
}
func (ms *MemoryStoreView) Commit(ctx context.Context, current, last hash.Hash) (bool, error) {
ms.mu.Lock()
defer ms.mu.Unlock()
if ms.keeperFunc != nil {
if ms.keeperFunc(current) {
ms.waitForGC()
}
}
if last != ms.rootHash {
return false, nil
}
@@ -275,21 +329,31 @@ func (ms *MemoryStoreView) Commit(ctx context.Context, current, last hash.Hash)
return success, nil
}
func (ms *MemoryStoreView) MarkAndSweepChunks(ctx context.Context, last hash.Hash, keepChunks <-chan []hash.Hash, dest ChunkStore) error {
func (ms *MemoryStoreView) BeginGC(keeper func(hash.Hash) bool) error {
return ms.transitionToGC(keeper)
}
func (ms *MemoryStoreView) EndGC() {
ms.transitionToNoGC()
}
func (ms *MemoryStoreView) MarkAndSweepChunks(ctx context.Context, hashes <-chan []hash.Hash, dest ChunkStore) error {
if dest != ms {
panic("unsupported")
}
if last != ms.rootHash {
return fmt.Errorf("last does not match ms.Root()")
ms.mu.Lock()
if !ms.isGC {
panic("MarkAndSweepChunks called without BeginGC")
}
ms.mu.Unlock()
keepers := make(map[hash.Hash]Chunk, ms.storage.Len())
LOOP:
for {
select {
case hs, ok := <-keepChunks:
case hs, ok := <-hashes:
if !ok {
break LOOP
}
@@ -305,6 +369,8 @@ LOOP:
}
}
ms.mu.Lock()
defer ms.mu.Unlock()
ms.storage = &MemoryStorage{rootHash: ms.rootHash, data: keepers}
ms.pending = map[hash.Hash]Chunk{}
return nil

View File

@@ -71,13 +71,28 @@ func (s *TestStoreView) Put(ctx context.Context, c Chunk, getAddrs GetAddrsCb) e
return s.ChunkStore.Put(ctx, c, getAddrs)
}
func (s *TestStoreView) MarkAndSweepChunks(ctx context.Context, last hash.Hash, keepChunks <-chan []hash.Hash, dest ChunkStore) error {
func (s *TestStoreView) BeginGC(keeper func(hash.Hash) bool) error {
collector, ok := s.ChunkStore.(ChunkStoreGarbageCollector)
if !ok {
return ErrUnsupportedOperation
}
return collector.BeginGC(keeper)
}
func (s *TestStoreView) EndGC() {
collector, ok := s.ChunkStore.(ChunkStoreGarbageCollector)
if !ok {
panic(ErrUnsupportedOperation)
}
collector.EndGC()
}
func (s *TestStoreView) MarkAndSweepChunks(ctx context.Context, hashes <-chan []hash.Hash, dest ChunkStore) error {
collector, ok := s.ChunkStore.(ChunkStoreGarbageCollector)
if !ok || dest != s {
return ErrUnsupportedOperation
}
return collector.MarkAndSweepChunks(ctx, last, keepChunks, collector)
return collector.MarkAndSweepChunks(ctx, hashes, collector)
}
func (s *TestStoreView) Reads() int {

View File

@@ -166,7 +166,7 @@ type GarbageCollector interface {
// GC traverses the database starting at the Root and removes
// all unreferenced data from persistent storage.
GC(ctx context.Context, oldGenRefs, newGenRefs hash.HashSet) error
GC(ctx context.Context, oldGenRefs, newGenRefs hash.HashSet, safepointF func() error) error
}
// CanUsePuller returns true if a datas.Puller can be used to pull data from one Database into another. Not all

View File

@@ -97,7 +97,7 @@ func (db *database) loadDatasetsNomsMap(ctx context.Context, rootHash hash.Hash)
}
if val == nil {
return types.EmptyMap, errors.New("Root hash doesn't exist")
return types.EmptyMap, fmt.Errorf("Root hash doesn't exist: %v", rootHash)
}
return val.(types.Map), nil
@@ -114,7 +114,7 @@ func (db *database) loadDatasetsRefmap(ctx context.Context, rootHash hash.Hash)
}
if val == nil {
return prolly.AddressMap{}, errors.New("Root hash doesn't exist")
return prolly.AddressMap{}, fmt.Errorf("Root hash doesn't exist: %v", rootHash)
}
return parse_storeroot([]byte(val.(types.SerialMessage)), db.nodeStore())
@@ -855,8 +855,8 @@ func (db *database) doDelete(ctx context.Context, datasetIDstr string) error {
}
// GC traverses the database starting at the Root and removes all unreferenced data from persistent storage.
func (db *database) GC(ctx context.Context, oldGenRefs, newGenRefs hash.HashSet) error {
return db.ValueStore.GC(ctx, oldGenRefs, newGenRefs)
func (db *database) GC(ctx context.Context, oldGenRefs, newGenRefs hash.HashSet, safepointF func() error) error {
return db.ValueStore.GC(ctx, oldGenRefs, newGenRefs, safepointF)
}
func (db *database) tryCommitChunks(ctx context.Context, newRootHash hash.Hash, currentRootHash hash.Hash) error {

View File

@@ -129,26 +129,39 @@ func (j *chunkJournal) bootstrapJournalWriter(ctx context.Context) (err error) {
return err
}
var contents manifestContents
ok, contents, err = j.backing.ParseIfExists(ctx, &Stats{}, nil)
mc, err := trueUpBackingManifest(ctx, root, j.backing)
if err != nil {
return err
}
if ok {
// the journal file is the source of truth for the root hash, true-up persisted manifest
contents.root = root
contents, err = j.backing.Update(ctx, contents.lock, contents, &Stats{}, nil)
if err != nil {
return err
}
} else {
return fmt.Errorf("manifest not found when opening chunk journal")
}
j.contents = contents
j.contents = mc
return
}
// the journal file is the source of truth for the root hash, true-up persisted manifest
func trueUpBackingManifest(ctx context.Context, root hash.Hash, backing manifest) (manifestContents, error) {
ok, mc, err := backing.ParseIfExists(ctx, &Stats{}, nil)
if err != nil {
return manifestContents{}, err
} else if !ok {
return manifestContents{}, fmt.Errorf("manifest not found when opening chunk journal")
}
prev := mc.lock
next := generateLockHash(root, mc.specs, mc.appendix)
mc.lock = next
mc.root = root
mc, err = backing.Update(ctx, prev, mc, &Stats{}, nil)
if err != nil {
return manifestContents{}, err
} else if mc.lock != next {
return manifestContents{}, errOptimisticLockFailedTables
} else if mc.root != root {
return manifestContents{}, errOptimisticLockFailedRoot
}
return mc, nil
}
// Persist implements tablePersister.
func (j *chunkJournal) Persist(ctx context.Context, mt *memTable, haver chunkReader, stats *Stats) (chunkSource, error) {
if err := j.maybeInit(ctx); err != nil {

View File

@@ -73,10 +73,14 @@ func (s journalChunkSource) getMany(ctx context.Context, _ *errgroup.Group, reqs
var remaining bool
// todo: read planning
for i := range reqs {
if reqs[i].found {
continue
}
data, err := s.get(ctx, *reqs[i].a, stats)
if err != nil {
return false, err
} else if data != nil {
reqs[i].found = true
ch := chunks.NewChunkWithHash(hash.Hash(*reqs[i].a), data)
found(ctx, &ch)
} else {
@@ -90,12 +94,16 @@ func (s journalChunkSource) getManyCompressed(ctx context.Context, _ *errgroup.G
var remaining bool
// todo: read planning
for i := range reqs {
if reqs[i].found {
continue
}
cc, err := s.getCompressed(ctx, *reqs[i].a, stats)
if err != nil {
return false, err
} else if cc.IsEmpty() {
remaining = true
} else {
reqs[i].found = true
found(ctx, cc)
}
}

View File

@@ -71,8 +71,16 @@ func (nbsMW *NBSMetricWrapper) SupportedOperations() chunks.TableFileStoreOps {
return nbsMW.nbs.SupportedOperations()
}
func (nbsMW *NBSMetricWrapper) MarkAndSweepChunks(ctx context.Context, last hash.Hash, keepChunks <-chan []hash.Hash, dest chunks.ChunkStore) error {
return nbsMW.nbs.MarkAndSweepChunks(ctx, last, keepChunks, dest)
func (nbsMW *NBSMetricWrapper) BeginGC(keeper func(hash.Hash) bool) error {
return nbsMW.nbs.BeginGC(keeper)
}
func (nbsMW *NBSMetricWrapper) EndGC() {
nbsMW.nbs.EndGC()
}
func (nbsMW *NBSMetricWrapper) MarkAndSweepChunks(ctx context.Context, hashes <-chan []hash.Hash, dest chunks.ChunkStore) error {
return nbsMW.nbs.MarkAndSweepChunks(ctx, hashes, dest)
}
// PruneTableFiles deletes old table files that are no longer referenced in the manifest.

View File

@@ -59,7 +59,12 @@ func TestChunkStoreVersion(t *testing.T) {
}()
assert.Equal(constants.FormatLD1String, store.Version())
newRoot := hash.Of([]byte("new root"))
newChunk := chunks.NewChunk([]byte("new root"))
require.NoError(t, store.Put(context.Background(), newChunk, func(ctx context.Context, c chunks.Chunk) (hash.HashSet, error) {
return nil, nil
}))
newRoot := newChunk.Hash()
if assert.True(store.Commit(context.Background(), newRoot, hash.Hash{})) {
assert.Equal(constants.FormatLD1String, store.Version())
}
@@ -204,14 +209,18 @@ func TestChunkStoreCommitOptimisticLockFail(t *testing.T) {
}()
// Simulate another process writing a manifest behind store's back.
newRoot, chunks, err := interloperWrite(fm, p, []byte("new root"), []byte("hello2"), []byte("goodbye2"), []byte("badbye2"))
newRoot, chks, err := interloperWrite(fm, p, []byte("new root"), []byte("hello2"), []byte("goodbye2"), []byte("badbye2"))
require.NoError(t, err)
newRoot2 := hash.Of([]byte("new root 2"))
newChunk := chunks.NewChunk([]byte("new root 2"))
require.NoError(t, store.Put(context.Background(), newChunk, func(ctx context.Context, c chunks.Chunk) (hash.HashSet, error) {
return nil, nil
}))
newRoot2 := newChunk.Hash()
success, err := store.Commit(context.Background(), newRoot2, hash.Hash{})
require.NoError(t, err)
assert.False(success)
assertDataInStore(chunks, store, assert)
assertDataInStore(chks, store, assert)
success, err = store.Commit(context.Background(), newRoot2, newRoot)
require.NoError(t, err)
assert.True(success)

View File

@@ -95,7 +95,8 @@ type NomsBlockStore struct {
upstream manifestContents
cond *sync.Cond
gcInProgress atomic.Bool
gcInProgress bool
keeperFunc func(hash.Hash) bool
mtSize uint64
putCount uint64
@@ -164,7 +165,10 @@ func (nbs *NomsBlockStore) GetChunkLocations(hashes hash.HashSet) (map[hash.Hash
func (nbs *NomsBlockStore) UpdateManifest(ctx context.Context, updates map[hash.Hash]uint32) (mi ManifestInfo, err error) {
nbs.mu.Lock()
defer nbs.mu.Unlock()
nbs.waitForGC()
err = nbs.waitForGC(ctx)
if err != nil {
return
}
nbs.checkAllManifestUpdatesExist(ctx, updates)
@@ -243,7 +247,10 @@ func (nbs *NomsBlockStore) UpdateManifest(ctx context.Context, updates map[hash.
func (nbs *NomsBlockStore) UpdateManifestWithAppendix(ctx context.Context, updates map[hash.Hash]uint32, option ManifestAppendixOption) (mi ManifestInfo, err error) {
nbs.mu.Lock()
defer nbs.mu.Unlock()
nbs.waitForGC()
err = nbs.waitForGC(ctx)
if err != nil {
return
}
nbs.checkAllManifestUpdatesExist(ctx, updates)
@@ -391,7 +398,10 @@ func fromManifestAppendixOptionNewContents(upstream manifestContents, appendixSp
func OverwriteStoreManifest(ctx context.Context, store *NomsBlockStore, root hash.Hash, tableFiles map[hash.Hash]uint32, appendixTableFiles map[hash.Hash]uint32) (err error) {
store.mu.Lock()
defer store.mu.Unlock()
store.waitForGC()
err = store.waitForGC(ctx)
if err != nil {
return
}
contents := manifestContents{
root: root,
@@ -603,10 +613,20 @@ func (nbs *NomsBlockStore) WithoutConjoiner() *NomsBlockStore {
}
// Wait for GC to complete to continue with writes
func (nbs *NomsBlockStore) waitForGC() {
for nbs.gcInProgress.Load() {
func (nbs *NomsBlockStore) waitForGC(ctx context.Context) error {
stop := make(chan struct{})
defer close(stop)
go func() {
select {
case <-ctx.Done():
nbs.cond.Broadcast()
case <-stop:
}
}()
for nbs.gcInProgress && ctx.Err() == nil {
nbs.cond.Wait()
}
return ctx.Err()
}
func (nbs *NomsBlockStore) Put(ctx context.Context, c chunks.Chunk, getAddrs chunks.GetAddrsCb) error {
@@ -639,28 +659,43 @@ func (nbs *NomsBlockStore) addChunk(ctx context.Context, ch chunks.Chunk, addrs
}
nbs.mu.Lock()
defer nbs.mu.Unlock()
nbs.waitForGC()
if nbs.mt == nil {
nbs.mt = newMemTable(nbs.mtSize)
}
a := addr(ch.Hash())
addChunkRes := nbs.mt.addChunk(a, ch.Data())
if addChunkRes == chunkNotAdded {
ts, err := nbs.tables.append(ctx, nbs.mt, checker, nbs.stats)
if err != nil {
if errors.Is(err, ErrDanglingRef) {
nbs.mt = nil
}
return false, err
retry := true
var addChunkRes addChunkResult
for retry {
retry = false
if nbs.mt == nil {
nbs.mt = newMemTable(nbs.mtSize)
}
nbs.tables = ts
nbs.mt = newMemTable(nbs.mtSize)
a := addr(ch.Hash())
addChunkRes = nbs.mt.addChunk(a, ch.Data())
if addChunkRes == chunkNotAdded {
ts, err := nbs.tables.append(ctx, nbs.mt, checker, nbs.stats)
if err != nil {
if errors.Is(err, ErrDanglingRef) {
nbs.mt = nil
}
return false, err
}
nbs.tables = ts
nbs.mt = newMemTable(nbs.mtSize)
addChunkRes = nbs.mt.addChunk(a, ch.Data())
}
if addChunkRes == chunkAdded || addChunkRes == chunkExists {
if nbs.keeperFunc != nil && nbs.keeperFunc(ch.Hash()) {
retry = true
if err := nbs.waitForGC(ctx); err != nil {
return false, err
}
continue
}
}
if addChunkRes == chunkAdded {
nbs.mt.addChildRefs(addrs)
}
}
if addChunkRes == chunkAdded {
nbs.mt.addChildRefs(addrs)
}
return addChunkRes == chunkAdded || addChunkRes == chunkExists, nil
}
@@ -668,8 +703,21 @@ func (nbs *NomsBlockStore) addChunk(ctx context.Context, ch chunks.Chunk, addrs
type refCheck func(reqs []hasRecord) (hash.HashSet, error)
func (nbs *NomsBlockStore) errorIfDangling(root hash.Hash, checker refCheck) error {
if !root.IsEmpty() {
var hr [1]hasRecord
a := addr(root)
hr[0].a = &a
hr[0].prefix = a.Prefix()
absent, err := checker(hr[:])
if err != nil {
return err
} else if absent.Size() > 0 {
return fmt.Errorf("%w: found dangling references to %s", ErrDanglingRef, absent.String())
}
}
if nbs.mt == nil || nbs.mt.pendingRefs == nil {
return nil // no refs to check
return nil // no pending refs to check
}
sort.Sort(hasRecordByPrefix(nbs.mt.pendingRefs))
@@ -680,17 +728,6 @@ func (nbs *NomsBlockStore) errorIfDangling(root hash.Hash, checker refCheck) err
return fmt.Errorf("%w: found dangling references to %s", ErrDanglingRef, absent.String())
}
var hr [1]hasRecord
a := addr(root)
hr[0].a = &a
hr[0].prefix = a.Prefix()
absent, err = checker(hr[:])
if err != nil {
return err
} else if absent.Size() > 0 {
return fmt.Errorf("%w: found dangling references to %s", ErrDanglingRef, absent.String())
}
return nil
}
@@ -956,7 +993,10 @@ func toHasRecords(hashes hash.HashSet) []hasRecord {
func (nbs *NomsBlockStore) Rebase(ctx context.Context) error {
nbs.mu.Lock()
defer nbs.mu.Unlock()
nbs.waitForGC()
return nbs.rebase(ctx)
}
func (nbs *NomsBlockStore) rebase(ctx context.Context) error {
exists, contents, _, err := nbs.mm.Fetch(ctx, nbs.stats)
if err != nil {
return err
@@ -1000,21 +1040,27 @@ func (nbs *NomsBlockStore) commit(ctx context.Context, current, last hash.Hash,
defer nbs.stats.CommitLatency.SampleTimeSince(t1)
nbs.mu.Lock()
nbs.waitForGC()
defer nbs.mu.Unlock()
if nbs.keeperFunc != nil {
if nbs.keeperFunc(current) {
err = nbs.waitForGC(ctx)
if err != nil {
return false, err
}
}
}
anyPossiblyNovelChunks := nbs.mt != nil || len(nbs.tables.novel) > 0
if !anyPossiblyNovelChunks && current == last {
nbs.mu.Unlock()
err := nbs.Rebase(ctx)
err := nbs.rebase(ctx)
if err != nil {
return false, err
}
return true, nil
}
defer nbs.mu.Unlock()
// check for dangling references in |nbs.mt|
if err = nbs.errorIfDangling(current, checker); err != nil {
if errors.Is(err, ErrDanglingRef) {
@@ -1192,7 +1238,7 @@ func (nbs *NomsBlockStore) updateManifest(ctx context.Context, current, last has
newTables, err := nbs.tables.flatten(ctx)
if err != nil {
return nil
return err
}
nbs.upstream = newContents
@@ -1430,26 +1476,30 @@ func (nbs *NomsBlockStore) pruneTableFiles(ctx context.Context, checker refCheck
}, mtime)
}
func (nbs *NomsBlockStore) setGCInProgress(inProgress bool) bool {
func (nbs *NomsBlockStore) BeginGC(keeper func(hash.Hash) bool) error {
nbs.cond.L.Lock()
defer nbs.cond.L.Unlock()
swapped := nbs.gcInProgress.CompareAndSwap(!inProgress, inProgress)
if swapped {
nbs.cond.Broadcast()
return true
}
return false
}
func (nbs *NomsBlockStore) MarkAndSweepChunks(ctx context.Context, last hash.Hash, keepChunks <-chan []hash.Hash, dest chunks.ChunkStore) error {
swapped := nbs.setGCInProgress(true)
if !swapped {
if nbs.gcInProgress {
return errors.New("gc already in progress")
}
defer nbs.setGCInProgress(false)
nbs.gcInProgress = true
nbs.keeperFunc = keeper
nbs.cond.Broadcast()
return nil
}
func (nbs *NomsBlockStore) EndGC() {
nbs.cond.L.Lock()
defer nbs.cond.L.Unlock()
if !nbs.gcInProgress {
panic("EndGC called when gc was not in progress")
}
nbs.gcInProgress = false
nbs.keeperFunc = nil
nbs.cond.Broadcast()
}
func (nbs *NomsBlockStore) MarkAndSweepChunks(ctx context.Context, hashes <-chan []hash.Hash, dest chunks.ChunkStore) error {
ops := nbs.SupportedOperations()
if !ops.CanGC || !ops.CanPrune {
return chunks.ErrUnsupportedOperation
@@ -1459,12 +1509,8 @@ func (nbs *NomsBlockStore) MarkAndSweepChunks(ctx context.Context, last hash.Has
nbs.mu.RLock()
defer nbs.mu.RUnlock()
if nbs.upstream.root != last {
return errLastRootMismatch
}
// Check to see if the specs have changed since the last GC. If they haven't, bail early.
gcGenCheck := generateLockHash(last, nbs.upstream.specs, nbs.upstream.appendix)
gcGenCheck := generateLockHash(nbs.upstream.root, nbs.upstream.specs, nbs.upstream.appendix)
if nbs.upstream.gcGen == gcGenCheck {
return chunks.ErrNothingToCollect
}
@@ -1486,7 +1532,7 @@ func (nbs *NomsBlockStore) MarkAndSweepChunks(ctx context.Context, last hash.Has
}
}
specs, err := nbs.copyMarkedChunks(ctx, keepChunks, destNBS)
specs, err := nbs.copyMarkedChunks(ctx, hashes, destNBS)
if err != nil {
return err
}
@@ -1498,12 +1544,7 @@ func (nbs *NomsBlockStore) MarkAndSweepChunks(ctx context.Context, last hash.Has
return nbs.swapTables(ctx, specs)
} else {
fileIdToNumChunks := tableSpecsToMap(specs)
err = destNBS.AddTableFilesToManifest(ctx, fileIdToNumChunks)
if err != nil {
return err
}
return nil
return destNBS.AddTableFilesToManifest(ctx, fileIdToNumChunks)
}
}
@@ -1518,6 +1559,8 @@ func (nbs *NomsBlockStore) copyMarkedChunks(ctx context.Context, keepChunks <-ch
return nil, err
}
// TODO: We should clean up gcc on error.
LOOP:
for {
select {
@@ -1528,12 +1571,14 @@ LOOP:
var addErr error
mu := new(sync.Mutex)
hashset := hash.NewHashSet(hs...)
found := 0
err := nbs.GetManyCompressed(ctx, hashset, func(ctx context.Context, c CompressedChunk) {
mu.Lock()
defer mu.Unlock()
if addErr != nil {
return
}
found += 1
addErr = gcc.addChunk(ctx, c)
})
if err != nil {
@@ -1542,6 +1587,9 @@ LOOP:
if addErr != nil {
return nil, addErr
}
if found != len(hashset) {
return nil, fmt.Errorf("dangling references requested during GC. GC not successful. %v", hashset)
}
case <-ctx.Done():
return nil, ctx.Err()
}
@@ -1549,23 +1597,6 @@ LOOP:
return gcc.copyTablesToDir(ctx, tfp)
}
// todo: what's the optimal table size to copy to?
func (nbs *NomsBlockStore) gcTableSize() (uint64, error) {
total, err := nbs.tables.physicalLen()
if err != nil {
return 0, err
}
avgTableSize := total / uint64(nbs.tables.Size()+1)
// max(avgTableSize, defaultMemTableSize)
if avgTableSize > nbs.mtSize {
return avgTableSize, nil
}
return nbs.mtSize, nil
}
func (nbs *NomsBlockStore) swapTables(ctx context.Context, specs []tableSpec) (err error) {
nbs.mu.Lock()
defer nbs.mu.Unlock()
@@ -1587,7 +1618,7 @@ func (nbs *NomsBlockStore) swapTables(ctx context.Context, specs []tableSpec) (e
specs: specs,
}
// nothing has changed. Bail early
// Nothing has changed. Bail early.
if newContents.gcGen == nbs.upstream.gcGen {
return nil
}
@@ -1608,7 +1639,30 @@ func (nbs *NomsBlockStore) swapTables(ctx context.Context, specs []tableSpec) (e
}
oldTables := nbs.tables
nbs.tables, nbs.upstream = ts, upstream
return oldTables.close()
err = oldTables.close()
if err != nil {
return err
}
// When this is called, we are at a safepoint in the GC process.
// We clear novel and the memtable, which are not coming with us
// into the new store.
oldNovel := nbs.tables.novel
nbs.tables.novel = make(chunkSourceSet)
for _, css := range oldNovel {
err = css.close()
if err != nil {
return err
}
}
if nbs.mt != nil {
var thrown []string
for a := range nbs.mt.chunks {
thrown = append(thrown, a.String())
}
}
nbs.mt = nil
return nil
}
// SetRootChunk changes the root chunk hash from the previous value to the new root.
@@ -1619,7 +1673,10 @@ func (nbs *NomsBlockStore) SetRootChunk(ctx context.Context, root, previous hash
func (nbs *NomsBlockStore) setRootChunk(ctx context.Context, root, previous hash.Hash, checker refCheck) error {
nbs.mu.Lock()
defer nbs.mu.Unlock()
nbs.waitForGC()
err := nbs.waitForGC(ctx)
if err != nil {
return err
}
for {
err := nbs.updateManifest(ctx, root, previous, checker)

View File

@@ -330,7 +330,9 @@ func TestNBSCopyGC(t *testing.T) {
wg := &sync.WaitGroup{}
wg.Add(1)
go func() {
msErr = st.MarkAndSweepChunks(ctx, r, keepChan, nil)
require.NoError(t, st.BeginGC(nil))
msErr = st.MarkAndSweepChunks(ctx, keepChan, nil)
st.EndGC()
wg.Done()
}()
for h := range keepers {

View File

@@ -79,11 +79,22 @@ type ValueStore struct {
nbf *NomsBinFormat
versOnce sync.Once
gcMu sync.Mutex
gcCond *sync.Cond
doingGC bool
gcMu sync.Mutex
gcCond *sync.Cond
gcState gcState
gcOut int
gcNewAddrs hash.HashSet
}
type gcState int
const (
gcState_NoGC gcState = 0
gcState_NewGen = iota
gcState_OldGen
gcState_Finalizing
)
func AddrsFromNomsValue(ctx context.Context, c chunks.Chunk, nbf *NomsBinFormat) (addrs hash.HashSet, err error) {
addrs = hash.NewHashSet()
if NomsKind(c.Data()[0]) == SerialMessageKind {
@@ -143,6 +154,7 @@ func newValueStoreWithCacheAndPending(cs chunks.ChunkStore, cacheSize, pendingMa
cs: cs,
decodedChunks: sizecache.New(cacheSize),
versOnce: sync.Once{},
gcNewAddrs: make(hash.HashSet),
}
vs.gcCond = sync.NewCond(&vs.gcMu)
return vs
@@ -243,9 +255,9 @@ func (lvs *ValueStore) ReadManyValues(ctx context.Context, hashes hash.HashSlice
d.PanicIfTrue(v == nil)
nv := v.(Value)
foundValues[h] = nv
continue
} else {
remaining.Insert(h)
}
remaining.Insert(h)
}
if len(remaining) != 0 {
@@ -311,6 +323,12 @@ func (lvs *ValueStore) WriteValue(ctx context.Context, v Value) (Ref, error) {
return Ref{}, err
}
finalize, err := lvs.waitForNotFinalizingGC(ctx)
if err != nil {
return Ref{}, err
}
defer finalize()
err = lvs.cs.Put(ctx, c, lvs.getAddrs)
if err != nil {
return Ref{}, err
@@ -321,11 +339,9 @@ func (lvs *ValueStore) WriteValue(ctx context.Context, v Value) (Ref, error) {
func (lvs *ValueStore) Root(ctx context.Context) (hash.Hash, error) {
root, err := lvs.cs.Root(ctx)
if err != nil {
return hash.Hash{}, err
}
return root, nil
}
@@ -333,47 +349,167 @@ func (lvs *ValueStore) Rebase(ctx context.Context) error {
return lvs.cs.Rebase(ctx)
}
// Call with lvs.gcMu locked. Blocks until doingGC == false, releasing the
// lock while we are blocked. Returns with the lock held, doingGC == false.
func (lvs *ValueStore) waitForGC() {
for lvs.doingGC {
// Call with lvs.gcMu locked. Blocks until gcState == gcState_NoGC,
// releasing the lock while blocked.
//
// Returns with the lock held and gcState == gcState_NoGC.
func (lvs *ValueStore) waitForNoGC() {
for lvs.gcState != gcState_NoGC {
lvs.gcCond.Wait()
}
}
// Call without lvs.gcMu held. If val == false, then doingGC must equal
// true when this is called. We will set it to false and return without
// lvs.gcMu held. If val == true, we will set doingGC to true and return
// with lvs.gcMu not held.
// Call without lvs.gcMu locked.
//
// When val == true, this routine will block until it has a unique opportunity
// to toggle doingGC from false to true while holding the lock.
func (lvs *ValueStore) toggleGC(val bool) {
// Will block the caller until gcState != gcState_Finalizing.
//
// When this function returns, ValueStore is guaranteed to be in a state such
// that lvs.gcAddChunk will not return ErrAddChunkMustBlock. This function
// returns a finalizer which must be called. gcAddChunk will not return
// MustBlock until after the finalizer which this function returns is called.
//
// The critical sections delimited by `waitForNotFinalizingGC` and its return
// value should fully enclose any critical section which takes `bufferMu`.
//
// These sections cope with the fact that no call into NomsBlockStore made
// while we hold `lvs.bufferMu` may see `ErrAddChunkMustBlock`; if one did, we
// would block with the lock held. While the lock is held, reads cannot
// progress, and the GC process will not complete.
func (lvs *ValueStore) waitForNotFinalizingGC(ctx context.Context) (func(), error) {
lvs.gcMu.Lock()
defer lvs.gcMu.Unlock()
if !val {
if !lvs.doingGC {
panic("tried to toggleGC to false while it was not true...")
stop := make(chan struct{})
defer close(stop)
go func() {
select {
case <-ctx.Done():
lvs.gcCond.Broadcast()
case <-stop:
}
lvs.doingGC = false
lvs.gcCond.Broadcast()
} else {
lvs.waitForGC()
lvs.doingGC = true
}()
for lvs.gcState == gcState_Finalizing && ctx.Err() == nil {
lvs.gcCond.Wait()
}
return
if err := ctx.Err(); err != nil {
return nil, err
}
lvs.gcOut += 1
lvs.gcCond.Broadcast()
return func() {
lvs.gcMu.Lock()
defer lvs.gcMu.Unlock()
lvs.gcOut -= 1
lvs.gcCond.Broadcast()
}, nil
}
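WriteValue and Commit below both follow this contract. As a hedged illustration, assuming it lives in the same package as ValueStore, the sketch below (writeUnderGuard is an invented name; imports are those of the surrounding package) shows the intended nesting of the guard around any work that takes bufferMu or writes into the chunk store.

// writeUnderGuard is an illustrative helper, not part of this change. The
// waitForNotFinalizingGC critical section fully encloses the work passed in,
// so gcAddChunk never needs to block while that work holds bufferMu.
func writeUnderGuard(ctx context.Context, lvs *ValueStore, do func() error) error {
	finalize, err := lvs.waitForNotFinalizingGC(ctx)
	if err != nil {
		return err
	}
	// The finalizer must always run; transitionToFinalizingGC waits for gcOut
	// to drain back to zero before it can take the accumulated addresses.
	defer finalize()
	return do()
}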
// Commit flushes all bufferedChunks into the ChunkStore
// and attempts to Commit, updating the root to |current| (or keeping
// it the same as Root()). If the root has moved since this ValueStore was
// opened, or last Rebased(), it will return false and will have internally
// rebased. Until Commit() succeeds, no work of the ValueStore will be visible
// to other readers of the underlying ChunkStore.
func (lvs *ValueStore) Commit(ctx context.Context, current, last hash.Hash) (bool, error) {
// Call without lvs.gcMu held. Puts the ValueStore into its initial GC state,
// where it is collecting OldGen. gcAddChunk begins accumulating new gen addrs.
func (lvs *ValueStore) transitionToOldGenGC() {
lvs.gcMu.Lock()
defer lvs.gcMu.Unlock()
lvs.waitForGC()
lvs.waitForNoGC()
lvs.gcState = gcState_OldGen
lvs.gcCond.Broadcast()
}
// Call without lvs.gcMu held. Puts the ValueStore into the second in-progress
// GC state, where we are copying newgen to newgen. Returns all the novel
// addresses which were collected by lvs.gcAddChunk while we were collecting
// into the oldgen.
func (lvs *ValueStore) transitionToNewGenGC() hash.HashSet {
lvs.gcMu.Lock()
defer lvs.gcMu.Unlock()
if lvs.gcState != gcState_OldGen {
panic("attempt to transition to NewGenGC from state != OldGenGC.")
}
lvs.gcState = gcState_NewGen
ret := lvs.gcNewAddrs
lvs.gcNewAddrs = make(hash.HashSet)
lvs.gcCond.Broadcast()
return ret
}
// Call without lvs.gcMu held. Puts the ValueStore into the third and final
// in-progress GC state, where we are finalizing the newgen GC. Returns all the
// novel addresses which were collected by lvs.gcAddChunk. This function will
// block until all in-progress `waitForNotFinalizingGC()` critical sections are
// complete, and will take the accumulated addresses then.
//
// The attempt to start new critical sections will block because the gcState is
// already Finalizing, but existing critical sections run to completion and
// count down gcOut.
func (lvs *ValueStore) transitionToFinalizingGC() hash.HashSet {
lvs.gcMu.Lock()
defer lvs.gcMu.Unlock()
if lvs.gcState != gcState_NewGen {
panic("attempt to transition to FinalizingGC from state != NewGenGC.")
}
lvs.gcState = gcState_Finalizing
for lvs.gcOut != 0 {
lvs.gcCond.Wait()
}
ret := lvs.gcNewAddrs
lvs.gcNewAddrs = make(hash.HashSet)
lvs.gcCond.Broadcast()
return ret
}
// Call without lvs.gcMu held. Transitions the ValueStore to the quiescent
// state where we are not running a GC. This is a valid transition from any
// gcState in the case of an error.
//
// gcOut is not reset here, because it is maintained by paired up increments
// and decrements which do not have to do with the gcState per se.
func (lvs *ValueStore) transitionToNoGC() {
lvs.gcMu.Lock()
defer lvs.gcMu.Unlock()
if len(lvs.gcNewAddrs) > 0 {
// In the case of an error during a GC, we transition to NoGC
// and it's expected to drop these addresses.
lvs.gcNewAddrs = make(hash.HashSet)
}
lvs.gcState = gcState_NoGC
lvs.gcCond.Broadcast()
}
func (lvs *ValueStore) gcAddChunk(h hash.Hash) bool {
lvs.gcMu.Lock()
defer lvs.gcMu.Unlock()
if lvs.gcState == gcState_NoGC {
panic("ValueStore gcAddChunk called while no GC is ongoing")
}
if lvs.gcState == gcState_Finalizing && lvs.gcOut == 0 {
return true
}
lvs.gcNewAddrs.Insert(h)
return false
}
func (lvs *ValueStore) readAndResetNewGenToVisit() hash.HashSet {
lvs.gcMu.Lock()
defer lvs.gcMu.Unlock()
if lvs.gcState == gcState_NewGen {
ret := lvs.gcNewAddrs
lvs.gcNewAddrs = make(hash.HashSet)
return ret
}
return make(hash.HashSet)
}
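Taken together, the transitions above form a small state machine: NoGC -> OldGen -> NewGen -> Finalizing -> NoGC. The outline below mirrors the order in which GC() further down drives them; it is a sketch only, with the traversal and error handling elided and the runGCPhases name invented for illustration.

// runGCPhases is an illustrative outline of the phase ordering, not the real
// collector.
func runGCPhases(lvs *ValueStore) error {
	lvs.transitionToOldGenGC()   // writers start reporting new addresses via gcAddChunk
	defer lvs.transitionToNoGC() // always return to the quiescent state, even on error

	// ... first pass: mark and copy reachable chunks into the old generation ...

	newAddrs := lvs.transitionToNewGenGC() // addresses written during the oldgen pass
	_ = newAddrs
	// ... second pass: mark and copy into the new generation, revisiting newAddrs ...

	final := lvs.transitionToFinalizingGC() // blocks new guards; waits for gcOut to drain
	_ = final
	// ... sweep the final stragglers, then swap table files at the safepoint ...
	return nil
}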
// Commit attempts to Commit, updating the root to |current| (or
// keeping it the same as Root()). If the root has moved since this
// ValueStore was opened, or last Rebased(), it will return false and
// will have internally rebased. Until Commit() succeeds, no work of
// the ValueStore will be visible to other readers of the underlying
// ChunkStore.
func (lvs *ValueStore) Commit(ctx context.Context, current, last hash.Hash) (bool, error) {
c, err := lvs.waitForNotFinalizingGC(ctx)
if err != nil {
return false, err
}
defer c()
success, err := lvs.cs.Commit(ctx, current, last)
if err != nil {
@@ -414,47 +550,82 @@ func makeBatches(hss []hash.HashSet, count int) [][]hash.Hash {
}
// GC traverses the ValueStore from the root and removes unreferenced chunks from the ChunkStore
func (lvs *ValueStore) GC(ctx context.Context, oldGenRefs, newGenRefs hash.HashSet) error {
lvs.toggleGC(true)
defer lvs.toggleGC(false)
func (lvs *ValueStore) GC(ctx context.Context, oldGenRefs, newGenRefs hash.HashSet, safepointF func() error) error {
lvs.versOnce.Do(lvs.expectVersion)
root, err := lvs.Root(ctx)
lvs.transitionToOldGenGC()
defer lvs.transitionToNoGC()
if err != nil {
return err
}
rootVal, err := lvs.ReadValue(ctx, root)
if err != nil {
return err
}
if rootVal == nil {
// empty root
return nil
}
newGenRefs.Insert(root)
if gcs, ok := lvs.cs.(chunks.GenerationalCS); ok {
oldGen := gcs.OldGen()
newGen := gcs.NewGen()
err = lvs.gc(ctx, root, oldGenRefs, oldGen.HasMany, newGen, oldGen)
err := newGen.BeginGC(lvs.gcAddChunk)
if err != nil {
return err
}
err = lvs.gc(ctx, root, newGenRefs, oldGen.HasMany, newGen, newGen)
root, err := lvs.Root(ctx)
if err != nil {
newGen.EndGC()
return err
}
if root == (hash.Hash{}) {
// empty root
newGen.EndGC()
return nil
}
oldGenRefs, err = oldGen.HasMany(ctx, oldGenRefs)
if err != nil {
return err
}
newGenRefs.Insert(root)
err = lvs.gc(ctx, oldGenRefs, oldGen.HasMany, newGen, oldGen, nil, func() hash.HashSet {
n := lvs.transitionToNewGenGC()
newGenRefs.InsertAll(n)
return make(hash.HashSet)
})
if err != nil {
newGen.EndGC()
return err
}
err = lvs.gc(ctx, newGenRefs, oldGen.HasMany, newGen, newGen, safepointF, lvs.transitionToFinalizingGC)
newGen.EndGC()
if err != nil {
return err
}
} else if collector, ok := lvs.cs.(chunks.ChunkStoreGarbageCollector); ok {
if len(oldGenRefs) > 0 {
newGenRefs.InsertAll(oldGenRefs)
extraNewGenRefs := lvs.transitionToNewGenGC()
newGenRefs.InsertAll(extraNewGenRefs)
newGenRefs.InsertAll(oldGenRefs)
err := collector.BeginGC(lvs.gcAddChunk)
if err != nil {
return err
}
err = lvs.gc(ctx, root, newGenRefs, unfilteredHashFunc, collector, collector)
root, err := lvs.Root(ctx)
if err != nil {
collector.EndGC()
return err
}
if root == (hash.Hash{}) {
// empty root
collector.EndGC()
return nil
}
newGenRefs.Insert(root)
err = lvs.gc(ctx, newGenRefs, unfilteredHashFunc, collector, collector, safepointF, lvs.transitionToFinalizingGC)
collector.EndGC()
if err != nil {
return err
}
@@ -462,18 +633,28 @@ func (lvs *ValueStore) GC(ctx context.Context, oldGenRefs, newGenRefs hash.HashS
return chunks.ErrUnsupportedOperation
}
// TODO: The decodedChunks cache can potentially allow phantom reads of
// already collected chunks until we clear it...
lvs.decodedChunks.Purge()
if tfs, ok := lvs.cs.(chunks.TableFileStore); ok {
return tfs.PruneTableFiles(ctx)
}
return nil
}
func (lvs *ValueStore) gc(ctx context.Context, root hash.Hash, toVisit hash.HashSet, hashFilter HashFilterFunc, src, dest chunks.ChunkStoreGarbageCollector) error {
func (lvs *ValueStore) gc(ctx context.Context,
toVisit hash.HashSet,
hashFilter HashFilterFunc,
src, dest chunks.ChunkStoreGarbageCollector,
safepointF func() error,
finalize func() hash.HashSet) error {
keepChunks := make(chan []hash.Hash, gcBuffSize)
eg, ctx := errgroup.WithContext(ctx)
eg.Go(func() error {
return src.MarkAndSweepChunks(ctx, root, keepChunks, dest)
return src.MarkAndSweepChunks(ctx, keepChunks, dest)
})
keepHashes := func(hs []hash.Hash) error {
@@ -494,8 +675,7 @@ func (lvs *ValueStore) gc(ctx context.Context, root hash.Hash, toVisit hash.Hash
eg.Go(func() error {
defer walker.Close()
visited := toVisit.Copy()
err := lvs.gcProcessRefs(ctx, visited, []hash.HashSet{toVisit}, keepHashes, walker, hashFilter)
err := lvs.gcProcessRefs(ctx, toVisit, keepHashes, walker, hashFilter, safepointF, finalize)
if err != nil {
return err
}
@@ -515,47 +695,102 @@ func (lvs *ValueStore) gc(ctx context.Context, root hash.Hash, toVisit hash.Hash
return eg.Wait()
}
func (lvs *ValueStore) gcProcessRefs(ctx context.Context, visited hash.HashSet, toVisit []hash.HashSet, keepHashes func(hs []hash.Hash) error, walker *parallelRefWalker, hashFilter HashFilterFunc) error {
if len(toVisit) != 1 {
panic("Must be one initial hashset to visit")
func (lvs *ValueStore) gcProcessRefs(ctx context.Context,
initialToVisit hash.HashSet, keepHashes func(hs []hash.Hash) error,
walker *parallelRefWalker, hashFilter HashFilterFunc,
safepointF func() error,
finalize func() hash.HashSet) error {
visited := make(hash.HashSet)
process := func(initialToVisit hash.HashSet) error {
visited.InsertAll(initialToVisit)
toVisitCount := len(initialToVisit)
toVisit := []hash.HashSet{initialToVisit}
for toVisitCount > 0 {
batches := makeBatches(toVisit, toVisitCount)
toVisit = make([]hash.HashSet, len(batches)+1)
toVisitCount = 0
for i, batch := range batches {
if err := keepHashes(batch); err != nil {
return err
}
vals, err := lvs.ReadManyValues(ctx, batch)
if err != nil {
return err
}
for i, v := range vals {
if v == nil {
return fmt.Errorf("gc failed, dangling reference requested %v", batch[i])
}
}
hashes, err := walker.GetRefSet(visited, vals)
if err != nil {
return err
}
// continue processing
hashes, err = hashFilter(ctx, hashes)
if err != nil {
return err
}
toVisit[i] = hashes
toVisitCount += len(hashes)
}
}
return nil
}
err := process(initialToVisit)
if err != nil {
return err
}
toVisitCount := len(toVisit[0])
for toVisitCount > 0 {
batches := makeBatches(toVisit, toVisitCount)
toVisit = make([]hash.HashSet, len(batches))
toVisitCount = 0
for i, batch := range batches {
if err := keepHashes(batch); err != nil {
return err
}
// We can accumulate hashes which are already visited. We prune
// those here.
vals, err := lvs.ReadManyValues(ctx, batch)
if err != nil {
return err
// Before we call finalize(), we can process the current set of
// NewGenToVisit. NewGen -> Finalize is going to block writes until
// we are done, so it's best to keep it as small as possible.
next := lvs.readAndResetNewGenToVisit()
if len(next) > 0 {
nextCopy := next.Copy()
for h := range nextCopy {
if visited.Has(h) {
next.Remove(h)
}
if len(vals) != len(batch) {
return errors.New("dangling reference found in chunk store")
}
hashes, err := walker.GetRefSet(visited, vals)
if err != nil {
return err
}
// continue processing
hashes, err = hashFilter(ctx, hashes)
if err != nil {
return err
}
toVisit[i] = hashes
toVisitCount += len(hashes)
}
next, err = hashFilter(ctx, next)
if err != nil {
return err
}
err = process(next)
if err != nil {
return err
}
}
lvs.decodedChunks.Purge()
final := finalize()
finalCopy := final.Copy()
for h := range finalCopy {
if visited.Has(h) {
final.Remove(h)
}
}
finalCopy = nil
final, err = hashFilter(ctx, final)
if err != nil {
return err
}
err = process(final)
if err != nil {
return err
}
if safepointF != nil {
return safepointF()
}
return nil
}
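gcProcessRefs is, at its core, a batched breadth-first walk over the chunk graph with a visited set. A standalone toy version of that walk follows; the string ids and the refs map stand in for chunk hashes and for ReadManyValues/GetRefSet, and the names are invented for illustration.

package main

import "fmt"

// markReachable is a toy, self-contained version of the batched traversal in
// gcProcessRefs: visit the graph level by level, skip anything already
// visited, and hand every batch to keep (the analogue of keepHashes).
func markReachable(refs map[string][]string, roots []string, keep func([]string)) {
	visited := map[string]bool{}
	var toVisit []string
	for _, r := range roots {
		if !visited[r] {
			visited[r] = true
			toVisit = append(toVisit, r)
		}
	}
	for len(toVisit) > 0 {
		batch := toVisit
		toVisit = nil
		keep(batch)
		for _, id := range batch {
			for _, child := range refs[id] {
				if !visited[child] {
					visited[child] = true
					toVisit = append(toVisit, child)
				}
			}
		}
	}
}

func main() {
	refs := map[string][]string{
		"root": {"a", "b"},
		"a":    {"c"},
		"b":    {"c", "d"},
	}
	markReachable(refs, []string{"root"}, func(batch []string) {
		fmt.Println("keep:", batch)
	})
	// Output:
	// keep: [root]
	// keep: [a b]
	// keep: [c d]
}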

View File

@@ -193,7 +193,7 @@ func TestGC(t *testing.T) {
require.NoError(t, err)
assert.NotNil(v2)
err = vs.GC(ctx, hash.HashSet{}, hash.HashSet{})
err = vs.GC(ctx, hash.HashSet{}, hash.HashSet{}, nil)
require.NoError(t, err)
v1, err = vs.ReadValue(ctx, h1) // non-nil

View File

@@ -50,6 +50,7 @@ RUN apt update -y && \
postgresql-contrib \
libpq-dev \
nodejs \
lsof \
postgresql-server-dev-all && \
update-ca-certificates -f

View File

@@ -93,11 +93,13 @@ SQL
run dolt ls --all
[ "$status" -eq 0 ]
[[ "$output" =~ "dolt_schemas" ]]
[[ "$output" =~ "dolt_schemas" ]] || false
dolt diff
run dolt diff
[ "$status" -eq 0 ]
[[ "$output" =~ "dolt_schemas" ]]
[[ "$output" =~ "-create view four as select 2+2 as res from dual" ]] || false
[[ ! "$output" =~ "dolt_schemas" ]] || false
dolt commit -Am "dropped a view"
dolt sql -q "drop view six"
@@ -105,11 +107,12 @@ SQL
# Dropping all views should result in the dolt_schemas table deleting itself
run dolt ls --all
[ "$status" -eq 0 ]
[[ ! "$output" =~ "dolt_schemas" ]]
[[ ! "$output" =~ "dolt_schemas" ]] || false
run dolt diff
[ "$status" -eq 0 ]
[[ "$output" =~ "deleted table" ]]
[[ "$output" =~ "-create view six as select 3+3 as res from dual" ]] || false
[[ ! "$output" =~ "deleted table" ]] || false
dolt commit -Am "no views left"

View File

@@ -0,0 +1 @@
{"tables":[{"name":"test","schema_diff":[],"data_diff":[{"from_row":{"c1":5,"c2":6,"pk":4},"to_row":{}},{"from_row":{},"to_row":{"c1":8,"c2":9,"pk":7}}]}],"triggers":[{"name":"tr1","from_definition":"","to_definition":"create trigger tr1 before insert on test for each row set new.c1 = new.c1 + 1;"}],"views":[{"name":"v1","from_definition":"","to_definition":"create view v1 as select \"hello\" from test;"}]}

View File

@@ -69,7 +69,6 @@ SQL
}
@test "garbage_collection: call GC in sql script" {
export DOLT_ENABLE_GC_PROCEDURE="true"
dolt sql <<SQL
CREATE TABLE t (pk int primary key);
INSERT INTO t VALUES (1),(2),(3);

View File

@@ -120,6 +120,161 @@ EOF
[[ "$output" =~ "$EXPECTED" ]] || false
}
@test "json-diff: views" {
dolt sql <<SQL
drop table test;
create table test (pk int primary key, c1 int, c2 int);
call dolt_add('.');
insert into test values (1,2,3);
insert into test values (4,5,6);
SQL
dolt commit -am "First commit"
dolt sql <<SQL
create view v1 as select * from test;
SQL
dolt commit -Am "Second commit"
dolt diff -r json HEAD HEAD~
run dolt diff -r json HEAD HEAD~
EXPECTED=$(cat <<'EOF'
{"views":[{"name":"v1","from_definition":"create view v1 as select * from test;","to_definition":""}]}
EOF
)
[ "$status" -eq 0 ]
[[ "$output" =~ "$EXPECTED" ]] || false
dolt diff -r json HEAD~ HEAD
run dolt diff -r json HEAD~ HEAD
EXPECTED=$(cat <<'EOF'
{"views":[{"name":"v1","from_definition":"","to_definition":"create view v1 as select * from test;"}]}
EOF
)
[ "$status" -eq 0 ]
[[ "$output" =~ "$EXPECTED" ]] || false
dolt sql <<SQL
drop view v1;
create view v1 as select "one" from dual;
SQL
dolt commit -Am "redefined view"
dolt diff -r json HEAD~ HEAD
run dolt diff -r json HEAD~ HEAD
EXPECTED=$(cat <<'EOF'
{"views":[{"name":"v1","from_definition":"create view v1 as select * from test;","to_definition":"create view v1 as select \"one\" from dual;"}]}
EOF
)
[ "$status" -eq 0 ]
[[ "$output" =~ "$EXPECTED" ]] || false
}
@test "json-diff: views, triggers, tables" {
dolt sql <<SQL
drop table test;
create table test (pk int primary key, c1 int, c2 int);
call dolt_add('.');
insert into test values (1,2,3);
insert into test values (4,5,6);
SQL
dolt commit -am "Table with rows"
dolt sql <<SQL
insert into test values (7,8,9);
delete from test where pk = 4;
SQL
dolt commit -Am "Table data diff"
dolt sql <<SQL
create view v1 as select "hello" from test;
SQL
dolt commit -Am "View"
dolt sql <<SQL
create trigger tr1 before insert on test for each row set new.c1 = new.c1 + 1;
SQL
dolt commit -Am "Trigger"
# Only table data diff
dolt diff -r json HEAD~3 HEAD~2
run dolt diff -r json HEAD~3 HEAD~2
EXPECTED=$(cat <<'EOF'
{"tables":[{"name":"test","schema_diff":[],"data_diff":[{"from_row":{"c1":5,"c2":6,"pk":4},"to_row":{}},{"from_row":{},"to_row":{"c1":8,"c2":9,"pk":7}}]}]}
EOF
)
[ "$status" -eq 0 ]
[[ "$output" =~ "$EXPECTED" ]] || false
# Only view diff
dolt diff -r json HEAD~2 HEAD~
run dolt diff -r json HEAD~2 HEAD~
EXPECTED=$(cat <<'EOF'
{"views":[{"name":"v1","from_definition":"","to_definition":"create view v1 as select \"hello\" from test;"}]}
EOF
)
[ "$status" -eq 0 ]
[[ "$output" =~ "$EXPECTED" ]] || false
# Only trigger diff
dolt diff -r json HEAD~ HEAD
run dolt diff -r json HEAD~ HEAD
EXPECTED=$(cat <<'EOF'
{"triggers":[{"name":"tr1","from_definition":"","to_definition":"create trigger tr1 before insert on test for each row set new.c1 = new.c1 + 1;"}]}
EOF
)
[ "$status" -eq 0 ]
[[ "$output" =~ "$EXPECTED" ]] || false
# View and trigger diff
dolt diff -r json HEAD~2 HEAD
run dolt diff -r json HEAD~2 HEAD
EXPECTED=$(cat <<'EOF'
{"triggers":[{"name":"tr1","from_definition":"","to_definition":"create trigger tr1 before insert on test for each row set new.c1 = new.c1 + 1;"}],"views":[{"name":"v1","from_definition":"","to_definition":"create view v1 as select \"hello\" from test;"}]}
EOF
)
[ "$status" -eq 0 ]
[[ "$output" =~ "$EXPECTED" ]] || false
# Table and view diff
dolt diff -r json HEAD~3 HEAD~
run dolt diff -r json HEAD~3 HEAD~
EXPECTED=$(cat <<'EOF'
{"tables":[{"name":"test","schema_diff":[],"data_diff":[{"from_row":{"c1":5,"c2":6,"pk":4},"to_row":{}},{"from_row":{},"to_row":{"c1":8,"c2":9,"pk":7}}]}],"views":[{"name":"v1","from_definition":"","to_definition":"create view v1 as select \"hello\" from test;"}]}
EOF
)
[ "$status" -eq 0 ]
[[ "$output" =~ "$EXPECTED" ]] || false
# All three kinds of diff
dolt diff -r json HEAD~3 HEAD
run dolt diff -r json HEAD~3 HEAD
EXPECTED=$(cat <<'EOF'
{"tables":[{"name":"test","schema_diff":[],"data_diff":[{"from_row":{"c1":5,"c2":6,"pk":4},"to_row":{}},{"from_row":{},"to_row":{"c1":8,"c2":9,"pk":7}}]}],"triggers":[{"name":"tr1","from_definition":"","to_definition":"create trigger tr1 before insert on test for each row set new.c1 = new.c1 + 1;"}],"views":[{"name":"v1","from_definition":"","to_definition":"create view v1 as select \"hello\" from test;"}]}
EOF
)
[ "$status" -eq 0 ]
[[ "$output" =~ "$EXPECTED" ]] || false
}
@test "json-diff: with table args" {
dolt sql -q 'create table other (pk int not null primary key)'
dolt add .

View File

@@ -27,6 +27,43 @@ teardown() {
teardown_common
}
@test "merge: three-way merge with longer key on both left and right" {
# Base table has a key length of 2. Left and right will both add a column to
# the key, and the keys for all rows will differ in the last column.
dolt sql <<SQL
create table t1 (a int, b int, c int, primary key (a,b));
insert into t1 values (1,1,1), (2,2,2);
call dolt_commit('-Am', 'new table');
call dolt_branch('b1');
call dolt_branch('b2');
call dolt_checkout('b1');
alter table t1 add column d int not null default 4;
alter table t1 drop primary key;
alter table t1 add primary key (a,b,d);
update t1 set d = 5;
call dolt_commit('-Am', 'added a column to the primary key with value 5');
call dolt_checkout('b2');
alter table t1 add column d int not null default 4;
alter table t1 drop primary key;
alter table t1 add primary key (a,b,d);
update t1 set d = 6;
call dolt_commit('-Am', 'added a column to the primary key with value 6');
SQL
dolt merge b1
skip "merge hangs"
run dolt merge b2
log_status_eq 1
[[ "$output" =~ "cause: error: cannot merge table t1 because its different primary keys differ" ]]
}
@test "merge: 3way merge doesn't stomp working changes" {
dolt checkout -b merge_branch
dolt SQL -q "INSERT INTO test1 values (0,1,2)"

View File

@@ -261,7 +261,7 @@ teardown() {
run dolt merge test -m "merge other"
[ "$status" -eq 1 ]
[[ "$output" =~ 'error: cannot merge two tables with different primary key sets' ]] || false
[[ "$output" =~ 'error: cannot merge two tables with different primary keys' ]] || false
}
@test "primary-key-changes: merge on branch with primary key added throws an error" {
@@ -288,7 +288,7 @@ teardown() {
run dolt merge test -m "merge other"
[ "$status" -eq 1 ]
[[ "$output" =~ 'error: cannot merge two tables with different primary key sets' ]] || false
[[ "$output" =~ 'error: cannot merge two tables with different primary keys' ]] || false
}
@test "primary-key-changes: diff on primary key schema change shows schema level diff but does not show row level diff" {
@@ -527,11 +527,11 @@ SQL
run dolt merge test -m "merge other"
[ "$status" -eq 1 ]
[[ "$output" =~ 'error: cannot merge two tables with different primary key sets' ]] || false
[[ "$output" =~ 'error: cannot merge two tables with different primary keys' ]] || false
run dolt sql -q "call dolt_merge('test')"
[ "$status" -eq 1 ]
[[ "$output" =~ 'error: cannot merge two tables with different primary key sets' ]] || false
[[ "$output" =~ 'error: cannot merge two tables with different primary keys' ]] || false
skip "Dolt doesn't correctly store primary key order if it doesn't match the column order"
}

View File

@@ -713,3 +713,154 @@ SQL
[ "$status" -eq 0 ]
[[ "$output" =~ 'INSERT INTO `test` (`pk`,`c1`) VALUES (0,NULL)' ]] || false
}
@test "sql-diff: views, triggers, tables" {
dolt sql <<SQL
create table test (pk int primary key, c1 int, c2 int);
call dolt_add('.');
insert into test values (1,2,3);
insert into test values (4,5,6);
SQL
dolt commit -am "Table with rows"
dolt sql <<SQL
insert into test values (7,8,9);
delete from test where pk = 4;
SQL
dolt commit -Am "Table data diff"
dolt sql <<SQL
create view v1 as select "hello" from test;
SQL
dolt commit -Am "View"
dolt sql <<SQL
create trigger tr1 before insert on test for each row set new.c1 = new.c1 + 1;
SQL
dolt commit -Am "Trigger"
# Only table data diff
dolt diff -r sql HEAD~3 HEAD~2
run dolt diff -r sql HEAD~3 HEAD~2
[ "$status" -eq 0 ]
cat > expected <<'EOF'
DELETE FROM `test` WHERE `pk`=4;
INSERT INTO `test` (`pk`,`c1`,`c2`) VALUES (7,8,9);
EOF
# We can't do direct bash comparisons because of newlines, so use diff to compare
# diff exits non-zero unless the files are identical
cat > actual <<EOF
${output}
EOF
diff -w expected actual
# Only view diff
dolt diff -r sql HEAD~2 HEAD~
run dolt diff -r sql HEAD~2 HEAD~
[ "$status" -eq 0 ]
cat > expected <<'EOF'
create view v1 as select "hello" from test;
EOF
cat > actual <<EOF
${output}
EOF
diff -w expected actual
# Only trigger diff
dolt diff -r sql HEAD~ HEAD
run dolt diff -r sql HEAD~ HEAD
[ "$status" -eq 0 ]
cat > expected <<'EOF'
create trigger tr1 before insert on test for each row set new.c1 = new.c1 + 1;
EOF
cat > actual <<EOF
${output}
EOF
diff -w expected actual
# View and trigger diff
dolt diff -r sql HEAD~2 HEAD
run dolt diff -r sql HEAD~2 HEAD
[ "$status" -eq 0 ]
cat > expected <<EOF
create trigger tr1 before insert on test for each row set new.c1 = new.c1 + 1;
create view v1 as select "hello" from test;
EOF
cat > actual <<EOF
${output}
EOF
diff -w expected actual
# Table and view diff
dolt diff -r sql HEAD~3 HEAD~
run dolt diff -r sql HEAD~3 HEAD~
[ "$status" -eq 0 ]
cat > expected <<'EOF'
DELETE FROM `test` WHERE `pk`=4;
INSERT INTO `test` (`pk`,`c1`,`c2`) VALUES (7,8,9);
create view v1 as select "hello" from test;
EOF
cat > actual <<EOF
${output}
EOF
diff -w expected actual
# All three kinds of diff
dolt diff -r sql HEAD~3 HEAD
run dolt diff -r sql HEAD~3 HEAD
[ "$status" -eq 0 ]
cat > expected <<'EOF'
DELETE FROM `test` WHERE `pk`=4;
INSERT INTO `test` (`pk`,`c1`,`c2`) VALUES (7,8,9);
create trigger tr1 before insert on test for each row set new.c1 = new.c1 + 1;
create view v1 as select "hello" from test;
EOF
cat > actual <<EOF
${output}
EOF
diff -w expected actual
# check alterations of triggers and views
dolt sql <<SQL
drop trigger tr1;
drop view v1;
create trigger tr1 before insert on test for each row set new.c1 = new.c1 + 100;
create view v1 as select "goodbye" from test;
SQL
dolt commit -am "new view and trigger defs"
dolt diff -r sql HEAD~ HEAD
run dolt diff -r sql HEAD~ HEAD
[ "$status" -eq 0 ]
cat > expected <<'EOF'
DROP TRIGGER `tr1`;
create trigger tr1 before insert on test for each row set new.c1 = new.c1 + 100;
DROP VIEW `v1`;
create view v1 as select "goodbye" from test;
EOF
cat > actual <<EOF
${output}
EOF
diff -w expected actual
}

View File

@@ -0,0 +1,227 @@
// Copyright 2023 Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package main
import (
"context"
"database/sql"
sqldriver "database/sql/driver"
"fmt"
"strings"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"
driver "github.com/dolthub/dolt/go/libraries/doltcore/dtestutils/sql_server_driver"
)
func TestConcurrentGC(t *testing.T) {
var gct gcTest
gct.numThreads = 8
gct.duration = 10 * time.Second
t.Run("NoCommits", func(t *testing.T) {
gct.run(t)
})
gct.commit = true
t.Run("WithCommits", func(t *testing.T) {
gct.run(t)
})
}
type gcTest struct {
numThreads int
duration time.Duration
commit bool
}
func (gct gcTest) createDB(t *testing.T, ctx context.Context, db *sql.DB) {
conn, err := db.Conn(ctx)
require.NoError(t, err)
defer conn.Close()
// We're going to bootstrap the database with a table which has id, val, id == [0,7*1024], val == 0.
_, err = conn.ExecContext(ctx, "create table vals (id int primary key, val int)")
require.NoError(t, err)
vals := []string{}
for i := 0; i <= (gct.numThreads-1)*1024; i++ {
vals = append(vals, fmt.Sprintf("(%d,0)", i))
}
_, err = conn.ExecContext(ctx, "insert into vals values "+strings.Join(vals, ","))
require.NoError(t, err)
_, err = conn.ExecContext(ctx, "call dolt_commit('-Am', 'create vals table')")
require.NoError(t, err)
}
func (gct gcTest) doUpdate(t *testing.T, ctx context.Context, db *sql.DB, i int) error {
conn, err := db.Conn(ctx)
if err != nil {
t.Logf("err in Conn: %v", err)
return nil
}
defer conn.Close()
_, err = conn.ExecContext(ctx, "update vals set val = val+1 where id = ?", i)
if err != nil {
if !assert.NotContains(t, err.Error(), "dangling ref") {
return err
}
if !assert.NotContains(t, err.Error(), "is unexpected noms value") {
return err
}
if !assert.NotContains(t, err.Error(), "interface conversion: types.Value is nil") {
return err
}
t.Logf("err in Exec update: %v", err)
}
if gct.commit {
_, err = conn.ExecContext(ctx, fmt.Sprintf("call dolt_commit('-am', 'increment vals id = %d')", i))
if err != nil {
if !assert.NotContains(t, err.Error(), "dangling ref") {
return err
}
if !assert.NotContains(t, err.Error(), "is unexpected noms value") {
return err
}
if !assert.NotContains(t, err.Error(), "interface conversion: types.Value is nil") {
return err
}
t.Logf("err in Exec call dolt_commit: %v", err)
}
}
return nil
}
func (gct gcTest) doGC(t *testing.T, ctx context.Context, db *sql.DB) error {
conn, err := db.Conn(ctx)
if err != nil {
t.Logf("err in Conn for dolt_gc: %v", err)
return nil
}
defer func() {
// After calling dolt_gc, the connection is bad. Remove it from the connection pool.
conn.Raw(func(_ any) error {
return sqldriver.ErrBadConn
})
}()
b := time.Now()
_, err = conn.ExecContext(ctx, "call dolt_gc()")
if err != nil {
if !assert.NotContains(t, err.Error(), "dangling ref") {
return err
}
if !assert.NotContains(t, err.Error(), "is unexpected noms value") {
return err
}
if !assert.NotContains(t, err.Error(), "interface conversion: types.Value is nil") {
return err
}
t.Logf("err in Exec dolt_gc: %v", err)
} else {
t.Logf("successful dolt_gc took %v", time.Since(b))
}
return nil
}
func (gct gcTest) finalize(t *testing.T, ctx context.Context, db *sql.DB) {
conn, err := db.Conn(ctx)
require.NoError(t, err)
var ids []any
var qmarks []string
for i := 0; i < gct.numThreads*1024; i += 1024 {
ids = append(ids, i)
qmarks = append(qmarks, "?")
}
rows, err := conn.QueryContext(ctx, fmt.Sprintf("select val from vals where id in (%s)", strings.Join(qmarks, ",")), ids...)
require.NoError(t, err)
i := 0
cnt := 0
for rows.Next() {
var val int
i += 1
require.NoError(t, rows.Scan(&val))
cnt += val
}
require.Equal(t, len(ids), i)
t.Logf("successfully updated val %d times", cnt)
require.NoError(t, rows.Close())
require.NoError(t, rows.Err())
rows, err = conn.QueryContext(ctx, "select count(*) from dolt_log")
require.NoError(t, err)
require.True(t, rows.Next())
require.NoError(t, rows.Scan(&cnt))
t.Logf("database has %d commit(s)", cnt)
require.False(t, rows.Next())
require.NoError(t, rows.Close())
require.NoError(t, rows.Err())
require.NoError(t, conn.Close())
}
func (gct gcTest) run(t *testing.T) {
u, err := driver.NewDoltUser()
require.NoError(t, err)
t.Cleanup(func() {
u.Cleanup()
})
rs, err := u.MakeRepoStore()
require.NoError(t, err)
repo, err := rs.MakeRepo("concurrent_gc_test")
require.NoError(t, err)
server := MakeServer(t, repo, &driver.Server{})
server.DBName = "concurrent_gc_test"
db, err := server.DB(driver.Connection{User: "root"})
require.NoError(t, err)
defer db.Close()
gct.createDB(t, context.Background(), db)
start := time.Now()
eg, egCtx := errgroup.WithContext(context.Background())
// We're going to spawn 8 threads, each running mutations on their own part of the table...
for i := 0; i < gct.numThreads; i++ {
i := i * 1024
eg.Go(func() error {
for j := 0; time.Since(start) < gct.duration && egCtx.Err() == nil; j++ {
if err := gct.doUpdate(t, egCtx, db, i); err != nil {
return err
}
}
return nil
})
}
// We spawn a thread which calls dolt_gc() periodically
eg.Go(func() error {
for time.Since(start) < gct.duration && egCtx.Err() == nil {
if err := gct.doGC(t, egCtx, db); err != nil {
return err
}
time.Sleep(100 * time.Millisecond)
}
return nil
})
require.NoError(t, eg.Wait())
gct.finalize(t, context.Background(), db)
}

View File

@@ -5,6 +5,7 @@ go 1.19
require (
github.com/dolthub/dolt/go v0.40.4
github.com/stretchr/testify v1.8.1
golang.org/x/sync v0.1.0
gopkg.in/yaml.v3 v3.0.1
)

View File

@@ -14,6 +14,8 @@ github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o=
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.5.0 h1:MUK/U/4lj1t1oPg0HfuXDN/Z1wv31ZJ/YcPiGccS4DU=
golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=

View File

@@ -68,6 +68,20 @@ export const diffTests = [
q: "SELECT * FROM dolt_diff_stat(:fromRefName, :toRefName)",
p: { fromRefName: "HEAD", toRefName: "WORKING" },
res: [
{
table_name: "dolt_schemas",
rows_unmodified: 0,
rows_added: 1,
rows_deleted: 0,
rows_modified: 0,
cells_added: 4,
cells_deleted: 0,
cells_modified: 0,
old_row_count: 0,
new_row_count: 1,
old_cell_count: 0,
new_cell_count: 4,
},
{
table_name: "test",
rows_unmodified: 2,
@@ -96,20 +110,6 @@ export const diffTests = [
old_cell_count: 3,
new_cell_count: 0,
},
{
table_name: "dolt_schemas",
rows_unmodified: 0,
rows_added: 1,
rows_deleted: 0,
rows_modified: 0,
cells_added: 4,
cells_deleted: 0,
cells_modified: 0,
old_row_count: 0,
new_row_count: 1,
old_cell_count: 0,
new_cell_count: 4,
},
],
},
{
@@ -306,6 +306,20 @@ export const diffTests = [
q: "SELECT * FROM dolt_diff_stat(:refRange)",
p: { refRange: "main...HEAD" },
res: [
{
table_name: "dolt_schemas",
rows_unmodified: 0,
rows_added: 1,
rows_deleted: 0,
rows_modified: 0,
cells_added: 4,
cells_deleted: 0,
cells_modified: 0,
old_row_count: 0,
new_row_count: 1,
old_cell_count: 0,
new_cell_count: 4,
},
{
table_name: "test",
rows_unmodified: 2,
@@ -334,20 +348,6 @@ export const diffTests = [
old_cell_count: 3,
new_cell_count: 0,
},
{
table_name: "dolt_schemas",
rows_unmodified: 0,
rows_added: 1,
rows_deleted: 0,
rows_modified: 0,
cells_added: 4,
cells_deleted: 0,
cells_modified: 0,
old_row_count: 0,
new_row_count: 1,
old_cell_count: 0,
new_cell_count: 4,
},
],
},
{