diff --git a/go/cmd/dolt/commands/engine/sqlengine.go b/go/cmd/dolt/commands/engine/sqlengine.go index f87e4819f2..0abdb3f23b 100644 --- a/go/cmd/dolt/commands/engine/sqlengine.go +++ b/go/cmd/dolt/commands/engine/sqlengine.go @@ -18,15 +18,18 @@ import ( "context" "os" "runtime" + "strconv" "strings" gms "github.com/dolthub/go-mysql-server" + "github.com/dolthub/go-mysql-server/eventscheduler" "github.com/dolthub/go-mysql-server/sql" "github.com/dolthub/go-mysql-server/sql/analyzer" "github.com/dolthub/go-mysql-server/sql/binlogreplication" "github.com/dolthub/go-mysql-server/sql/mysql_db" "github.com/dolthub/go-mysql-server/sql/rowexec" _ "github.com/dolthub/go-mysql-server/sql/variables" + "github.com/sirupsen/logrus" "github.com/dolthub/dolt/go/cmd/dolt/cli" "github.com/dolthub/dolt/go/libraries/doltcore/branch_control" @@ -69,6 +72,7 @@ type SqlEngineConfig struct { SystemVariables SystemVariables ClusterController *cluster.Controller BinlogReplicaController binlogreplication.BinlogReplicaController + EventSchedulerStatus eventscheduler.SchedulerStatus } // NewSqlEngine returns a SqlEngine @@ -197,10 +201,17 @@ func NewSqlEngine( return nil, err } - sessionFactory := doltSessionFactory(pro, mrEnv.Config(), bcController, config.Autocommit) + sessFactory := doltSessionFactory(pro, mrEnv.Config(), bcController, config.Autocommit) + + if engine.EventScheduler == nil { + err = configureEventScheduler(config, engine, sessFactory, pro) + if err != nil { + return nil, err + } + } if config.BinlogReplicaController != nil { - binLogSession, err := sessionFactory(sql.NewBaseSession(), pro) + binLogSession, err := sessFactory(sql.NewBaseSession(), pro) if err != nil { return nil, err } @@ -214,7 +225,7 @@ func NewSqlEngine( return &SqlEngine{ provider: pro, contextFactory: sqlContextFactory(), - dsessFactory: sessionFactory, + dsessFactory: sessFactory, engine: engine, }, nil } @@ -298,9 +309,8 @@ func (se *SqlEngine) Close() error { // configureBinlogReplicaController configures the binlog replication controller with the |engine|. func configureBinlogReplicaController(config *SqlEngineConfig, engine *gms.Engine, session *dsess.DoltSession) error { - contextFactory := sqlContextFactory() - - executionCtx, err := contextFactory(context.Background(), session) + ctxFactory := sqlContextFactory() + executionCtx, err := ctxFactory(context.Background(), session) if err != nil { return err } @@ -310,6 +320,59 @@ func configureBinlogReplicaController(config *SqlEngineConfig, engine *gms.Engin return nil } +// configureEventScheduler configures the event scheduler with the |engine| for executing events, a |sessFactory| +// for creating sessions, and a DoltDatabaseProvider, |pro|. +func configureEventScheduler(config *SqlEngineConfig, engine *gms.Engine, sessFactory sessionFactory, pro dsqle.DoltDatabaseProvider) error { + // need to give correct user, use the definer as user to run the event definition queries + ctxFactory := sqlContextFactory() + + // getCtxFunc is used to create new session context for event scheduler. + // It starts a transaction that needs to be committed using the function returned. + getCtxFunc := func() (*sql.Context, func() error, error) { + sess, err := sessFactory(sql.NewBaseSession(), pro) + if err != nil { + return nil, func() error { return nil }, err + } + + newCtx, err := ctxFactory(context.Background(), sess) + if err != nil { + return nil, func() error { return nil }, err + } + + ts, ok := newCtx.Session.(sql.TransactionSession) + if !ok { + return nil, func() error { return nil }, nil + } + + tr, err := sess.StartTransaction(newCtx, sql.ReadWrite) + if err != nil { + return nil, func() error { return nil }, err + } + + ts.SetTransaction(tr) + + return newCtx, func() error { + return ts.CommitTransaction(newCtx, tr) + }, nil + } + + // A hidden env var allows overriding the event scheduler period for testing. This option is not + // exposed via configuration because we do not want to encourage customers to use it. If the value + // is equal to or less than 0, then the period is ignored and the default period, 30s, is used. + eventSchedulerPeriod := 0 + eventSchedulerPeriodEnvVar := "DOLT_EVENT_SCHEDULER_PERIOD" + if s, ok := os.LookupEnv(eventSchedulerPeriodEnvVar); ok { + i, err := strconv.Atoi(s) + if err != nil { + logrus.Warnf("unable to parse value '%s' from env var '%s' as an integer", s, eventSchedulerPeriodEnvVar) + } else { + logrus.Warnf("overriding Dolt event scheduler period to %d seconds", i) + eventSchedulerPeriod = i + } + } + return engine.InitializeEventScheduler(getCtxFunc, config.EventSchedulerStatus, eventSchedulerPeriod) +} + // sqlContextFactory returns a contextFactory that creates a new sql.Context with the initial database provided func sqlContextFactory() contextFactory { return func(ctx context.Context, session sql.Session) (*sql.Context, error) { diff --git a/go/cmd/dolt/commands/sqlserver/server.go b/go/cmd/dolt/commands/sqlserver/server.go index 8c48843353..879c8bf8ee 100644 --- a/go/cmd/dolt/commands/sqlserver/server.go +++ b/go/cmd/dolt/commands/sqlserver/server.go @@ -23,8 +23,10 @@ import ( "net/http" "runtime" "strconv" + "strings" "time" + "github.com/dolthub/go-mysql-server/eventscheduler" "github.com/dolthub/go-mysql-server/server" "github.com/dolthub/go-mysql-server/sql" "github.com/dolthub/go-mysql-server/sql/mysql_db" @@ -116,12 +118,14 @@ func Serve( logrus.TraceLevel.String(), ), Default: logrus.GetLevel().String(), - NotifyChanged: func(scope sql.SystemVariableScope, v sql.SystemVarValue) { - if level, err := logrus.ParseLevel(v.Val.(string)); err == nil { - logrus.SetLevel(level) - } else { - logrus.Warnf("could not parse requested log level %s as a log level. dolt_log_level variable value and logging behavior will diverge.", v.Val.(string)) + NotifyChanged: func(scope sql.SystemVariableScope, v sql.SystemVarValue) error { + level, err := logrus.ParseLevel(v.Val.(string)) + if err != nil { + return fmt.Errorf("could not parse requested log level %s as a log level. dolt_log_level variable value and logging behavior will diverge.", v.Val.(string)) } + + logrus.SetLevel(level) + return nil }, }, }) @@ -177,6 +181,12 @@ func Serve( ClusterController: clusterController, BinlogReplicaController: binlogreplication.DoltBinlogReplicaController, } + esStatus, err := getEventSchedulerStatus(serverConfig.EventSchedulerStatus()) + if err != nil { + return err, nil + } + config.EventSchedulerStatus = esStatus + sqlEngine, err := engine.NewSqlEngine( ctx, mrEnv, @@ -267,7 +277,6 @@ func Serve( var remoteSrv *remotesrv.Server if serverConfig.RemotesapiPort() != nil { - port := *serverConfig.RemotesapiPort() if remoteSrvSqlCtx, err := sqlEngine.NewDefaultContext(ctx); err == nil { listenaddr := fmt.Sprintf(":%d", port) @@ -592,3 +601,16 @@ func checkForUnixSocket(config ServerConfig) (string, bool, error) { return "", false, nil } + +func getEventSchedulerStatus(status string) (eventscheduler.SchedulerStatus, error) { + switch strings.ToLower(status) { + case "on", "1": + return eventscheduler.SchedulerOn, nil + case "off", "0": + return eventscheduler.SchedulerOff, nil + case "disabled": + return eventscheduler.SchedulerDisabled, nil + default: + return eventscheduler.SchedulerDisabled, fmt.Errorf("Error while setting value '%s' to 'event_scheduler'.", status) + } +} diff --git a/go/cmd/dolt/commands/sqlserver/server_test.go b/go/cmd/dolt/commands/sqlserver/server_test.go index 2e4fafefea..00f0142606 100644 --- a/go/cmd/dolt/commands/sqlserver/server_test.go +++ b/go/cmd/dolt/commands/sqlserver/server_test.go @@ -15,7 +15,6 @@ package sqlserver import ( - "fmt" "net/http" "os" "strings" @@ -445,7 +444,6 @@ func runDefaultBranchTests(t *testing.T, tests []defaultBranchTest, conn *dbr.Co } func TestReadReplica(t *testing.T) { - var err error cwd, err := os.Getwd() if err != nil { t.Fatalf("no working directory: %s", err.Error()) @@ -467,10 +465,8 @@ func TestReadReplica(t *testing.T) { sourceDbName := multiSetup.DbNames[1] localCfg, ok := multiSetup.GetEnv(readReplicaDbName).Config.GetConfig(env.LocalConfig) - if !ok { - t.Fatal("local config does not exist") - } - config.NewPrefixConfig(localCfg, env.SqlServerGlobalsPrefix).SetStrings(map[string]string{dsess.ReadReplicaRemote: "remote1", dsess.ReplicateHeads: "main,feature"}) + require.True(t, ok, "local config does not exist") + config.NewPrefixConfig(localCfg, env.SqlServerGlobalsPrefix).SetStrings(map[string]string{dsess.ReadReplicaRemote: "remote1", dsess.ReplicateHeads: "main"}) dsess.InitPersistedSystemVars(multiSetup.GetEnv(readReplicaDbName)) // start server as read replica @@ -480,14 +476,12 @@ func TestReadReplica(t *testing.T) { // set socket to nil to force tcp serverConfig = serverConfig.WithHost("127.0.0.1").WithSocket("") - func() { - os.Chdir(multiSetup.DbPaths[readReplicaDbName]) - go func() { - _, _ = Serve(context.Background(), "0.0.0", serverConfig, sc, multiSetup.GetEnv(readReplicaDbName)) - }() - err = sc.WaitForStart() + os.Chdir(multiSetup.DbPaths[readReplicaDbName]) + go func() { + err, _ = Serve(context.Background(), "0.0.0", serverConfig, sc, multiSetup.GetEnv(readReplicaDbName)) require.NoError(t, err) }() + require.NoError(t, sc.WaitForStart()) defer sc.StopServer() replicatedTable := "new_table" @@ -502,14 +496,16 @@ func TestReadReplica(t *testing.T) { require.NoError(t, err) sess := conn.NewSession(nil) - newBranch := "feature" - multiSetup.NewBranch(sourceDbName, newBranch) - multiSetup.CheckoutBranch(sourceDbName, newBranch) - multiSetup.PushToRemote(sourceDbName, "remote1", newBranch) + multiSetup.NewBranch(sourceDbName, "feature") + multiSetup.CheckoutBranch(sourceDbName, "feature") + multiSetup.PushToRemote(sourceDbName, "remote1", "feature") + + // Configure the read replica to pull the new feature branch we just created + config.NewPrefixConfig(localCfg, env.SqlServerGlobalsPrefix).SetStrings(map[string]string{dsess.ReadReplicaRemote: "remote1", dsess.ReplicateHeads: "main,feature"}) + dsess.InitPersistedSystemVars(multiSetup.GetEnv(readReplicaDbName)) var res []int - - q := sess.SelectBySql(fmt.Sprintf("call dolt_checkout('%s')", newBranch)) + q := sess.SelectBySql("call dolt_checkout('feature');") _, err = q.LoadContext(context.Background(), &res) require.NoError(t, err) assert.ElementsMatch(t, res, []int{0}) diff --git a/go/cmd/dolt/commands/sqlserver/serverconfig.go b/go/cmd/dolt/commands/sqlserver/serverconfig.go index 34bc606f14..3691cbb7ab 100644 --- a/go/cmd/dolt/commands/sqlserver/serverconfig.go +++ b/go/cmd/dolt/commands/sqlserver/serverconfig.go @@ -165,6 +165,8 @@ type ServerConfig interface { RemotesapiPort() *int // ClusterConfig is the configuration for clustering in this sql-server. ClusterConfig() cluster.Config + // EventSchedulerStatus is the configuration for enabling or disabling the event scheduler in this server. + EventSchedulerStatus() string } type validatingServerConfig interface { @@ -200,6 +202,7 @@ type commandLineServerConfig struct { socket string remotesapiPort *int goldenMysqlConn string + eventSchedulerStatus string } var _ ServerConfig = (*commandLineServerConfig)(nil) @@ -481,6 +484,22 @@ func (cfg *commandLineServerConfig) withGoldenMysqlConnectionString(cs string) * return cfg } +func (cfg *commandLineServerConfig) EventSchedulerStatus() string { + switch cfg.eventSchedulerStatus { + case "", "1": + return "ON" + case "0": + return "OFF" + default: + return strings.ToUpper(cfg.eventSchedulerStatus) + } +} + +func (cfg *commandLineServerConfig) withEventScheduler(es string) *commandLineServerConfig { + cfg.eventSchedulerStatus = es + return cfg +} + // DefaultServerConfig creates a `*ServerConfig` that has all of the options set to their default values. func DefaultServerConfig() *commandLineServerConfig { return &commandLineServerConfig{ diff --git a/go/cmd/dolt/commands/sqlserver/sqlserver.go b/go/cmd/dolt/commands/sqlserver/sqlserver.go index ff450939fa..204b6c01c2 100644 --- a/go/cmd/dolt/commands/sqlserver/sqlserver.go +++ b/go/cmd/dolt/commands/sqlserver/sqlserver.go @@ -48,6 +48,7 @@ const ( socketFlag = "socket" remotesapiPortFlag = "remotesapi-port" goldenMysqlConn = "golden" + eventSchedulerStatus = "event-scheduler" ) func indentLines(s string) string { @@ -166,6 +167,7 @@ func (cmd SqlServerCmd) ArgParserWithName(name string) *argparser.ArgParser { ap.SupportsOptionalString(socketFlag, "", "socket file", "Path for the unix socket file. Defaults to '/tmp/mysql.sock'.") ap.SupportsUint(remotesapiPortFlag, "", "remotesapi port", "Sets the port for a server which can expose the databases in this sql-server over remotesapi, so that clients can clone or pull from this server.") ap.SupportsString(goldenMysqlConn, "", "mysql connection string", "Provides a connection string to a MySQL instance to be used to validate query results") + ap.SupportsString(eventSchedulerStatus, "", "status", "Determines whether the Event Scheduler is enabled and running on the server. It has one of the following values: 'ON', 'OFF' or 'DISABLED'.") return ap } @@ -488,6 +490,15 @@ func getCommandLineConfig(creds *cli.UserPassword, apr *argparser.ArgParseResult config.withGoldenMysqlConnectionString(connStr) } + if esStatus, ok := apr.GetValue(eventSchedulerStatus); ok { + // make sure to assign eventSchedulerStatus first here + config.withEventScheduler(strings.ToUpper(esStatus)) + err := sql.SystemVariables.SetGlobal("event_scheduler", config.EventSchedulerStatus()) + if err != nil { + return nil, fmt.Errorf("failed to set event_scheduler. Error: %s", err.Error()) + } + } + return config, nil } @@ -521,6 +532,12 @@ func getYAMLServerConfig(fs filesys.Filesys, path string) (ServerConfig, error) return nil, fmt.Errorf("Failed to set net_write_timeout from yaml file '%s'. Error: %s", path, err.Error()) } } + if cfg.BehaviorConfig.EventSchedulerStatus != nil { + err = sql.SystemVariables.SetGlobal("event_scheduler", cfg.EventSchedulerStatus()) + if err != nil { + return nil, fmt.Errorf("Failed to set event_scheduler from yaml file '%s'. Error: %s", path, err.Error()) + } + } return cfg, nil } diff --git a/go/cmd/dolt/commands/sqlserver/yaml_config.go b/go/cmd/dolt/commands/sqlserver/yaml_config.go index d5fafd6bf6..a075fd880a 100644 --- a/go/cmd/dolt/commands/sqlserver/yaml_config.go +++ b/go/cmd/dolt/commands/sqlserver/yaml_config.go @@ -79,6 +79,8 @@ type BehaviorYAMLConfig struct { // DoltTransactionCommit enables the @@dolt_transaction_commit system variable, which // automatically creates a Dolt commit when any SQL transaction is committed. DoltTransactionCommit *bool `yaml:"dolt_transaction_commit"` + + EventSchedulerStatus *string `yaml:"event_scheduler"` } // UserYAMLConfig contains server configuration regarding the user account clients must use to connect @@ -183,6 +185,7 @@ func serverConfigAsYAMLConfig(cfg ServerConfig) YAMLConfig { strPtr(cfg.PersistenceBehavior()), boolPtr(cfg.DisableClientMultiStatements()), boolPtr(cfg.DoltTransactionCommit()), + strPtr(cfg.EventSchedulerStatus()), }, UserConfig: UserYAMLConfig{ Name: strPtr(cfg.User()), @@ -561,6 +564,20 @@ func (cfg YAMLConfig) ClusterConfig() cluster.Config { return cfg.ClusterCfg } +func (cfg YAMLConfig) EventSchedulerStatus() string { + if cfg.BehaviorConfig.EventSchedulerStatus == nil { + return "ON" + } + switch *cfg.BehaviorConfig.EventSchedulerStatus { + case "1": + return "ON" + case "0": + return "OFF" + default: + return strings.ToUpper(*cfg.BehaviorConfig.EventSchedulerStatus) + } +} + type ClusterYAMLConfig struct { StandbyRemotes_ []StandbyRemoteYAMLConfig `yaml:"standby_remotes"` BootstrapRole_ string `yaml:"bootstrap_role"` diff --git a/go/cmd/dolt/commands/sqlserver/yaml_config_test.go b/go/cmd/dolt/commands/sqlserver/yaml_config_test.go index fa5fcea922..fb48677be5 100644 --- a/go/cmd/dolt/commands/sqlserver/yaml_config_test.go +++ b/go/cmd/dolt/commands/sqlserver/yaml_config_test.go @@ -36,6 +36,7 @@ behavior: dolt_transaction_commit: true persistence_behavior: load disable_client_multi_statements: false + event_scheduler: ON user: name: "" diff --git a/go/go.mod b/go/go.mod index 21d42677e4..af7ce69e0e 100644 --- a/go/go.mod +++ b/go/go.mod @@ -59,7 +59,7 @@ require ( github.com/cespare/xxhash v1.1.0 github.com/creasty/defaults v1.6.0 github.com/dolthub/flatbuffers/v23 v23.3.3-dh.2 - github.com/dolthub/go-mysql-server v0.17.1-0.20230925200702-97cdc9c2cf91 + github.com/dolthub/go-mysql-server v0.17.1-0.20230927003048-44ab355c2753 github.com/dolthub/swiss v0.1.0 github.com/goccy/go-json v0.10.2 github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 diff --git a/go/go.sum b/go/go.sum index e50a7c4043..30c946e733 100644 --- a/go/go.sum +++ b/go/go.sum @@ -180,8 +180,8 @@ github.com/dolthub/fslock v0.0.3 h1:iLMpUIvJKMKm92+N1fmHVdxJP5NdyDK5bK7z7Ba2s2U= github.com/dolthub/fslock v0.0.3/go.mod h1:QWql+P17oAAMLnL4HGB5tiovtDuAjdDTPbuqx7bYfa0= github.com/dolthub/go-icu-regex v0.0.0-20230524105445-af7e7991c97e h1:kPsT4a47cw1+y/N5SSCkma7FhAPw7KeGmD6c9PBZW9Y= github.com/dolthub/go-icu-regex v0.0.0-20230524105445-af7e7991c97e/go.mod h1:KPUcpx070QOfJK1gNe0zx4pA5sicIK1GMikIGLKC168= -github.com/dolthub/go-mysql-server v0.17.1-0.20230925200702-97cdc9c2cf91 h1:6iBYeUupd44zQW0CjIAMHTNRXNTS3sM2LuXI6z1EScc= -github.com/dolthub/go-mysql-server v0.17.1-0.20230925200702-97cdc9c2cf91/go.mod h1:C0Q8nJKIEnCYXx+ot9F2V7bg7xz5wj+Z1o1BDyNnP7E= +github.com/dolthub/go-mysql-server v0.17.1-0.20230927003048-44ab355c2753 h1:68B9w6CNK+fQQ73rCqkrLaPuWg69TGCE8Kyv9hsZLiY= +github.com/dolthub/go-mysql-server v0.17.1-0.20230927003048-44ab355c2753/go.mod h1:C0Q8nJKIEnCYXx+ot9F2V7bg7xz5wj+Z1o1BDyNnP7E= github.com/dolthub/ishell v0.0.0-20221214210346-d7db0b066488 h1:0HHu0GWJH0N6a6keStrHhUAK5/o9LVfkh44pvsV4514= github.com/dolthub/ishell v0.0.0-20221214210346-d7db0b066488/go.mod h1:ehexgi1mPxRTk0Mok/pADALuHbvATulTh6gzr7NzZto= github.com/dolthub/jsonpath v0.0.2-0.20230525180605-8dc13778fd72 h1:NfWmngMi1CYUWU4Ix8wM+USEhjc+mhPlT9JUR/anvbQ= diff --git a/go/libraries/doltcore/sqle/clusterdb/database.go b/go/libraries/doltcore/sqle/clusterdb/database.go index 09188acb93..62b2824a7f 100644 --- a/go/libraries/doltcore/sqle/clusterdb/database.go +++ b/go/libraries/doltcore/sqle/clusterdb/database.go @@ -28,6 +28,8 @@ import ( "github.com/dolthub/dolt/go/libraries/doltcore/table/editor" ) +const DoltClusterDbName = "dolt_cluster" + type database struct { statusProvider ClusterStatusProvider } @@ -38,7 +40,7 @@ var _ dsess.SqlDatabase = database{} const StatusTableName = "dolt_cluster_status" func (database) Name() string { - return "dolt_cluster" + return DoltClusterDbName } func (db database) GetTableInsensitive(ctx *sql.Context, tblName string) (sql.Table, bool, error) { diff --git a/go/libraries/doltcore/sqle/database.go b/go/libraries/doltcore/sqle/database.go index be13ce5f4b..7c2b745feb 100644 --- a/go/libraries/doltcore/sqle/database.go +++ b/go/libraries/doltcore/sqle/database.go @@ -22,8 +22,12 @@ import ( "strings" "time" + sqle "github.com/dolthub/go-mysql-server" "github.com/dolthub/go-mysql-server/sql" + "github.com/dolthub/go-mysql-server/sql/analyzer" "github.com/dolthub/go-mysql-server/sql/fulltext" + "github.com/dolthub/go-mysql-server/sql/plan" + "github.com/dolthub/go-mysql-server/sql/planbuilder" "github.com/dolthub/go-mysql-server/sql/types" "github.com/dolthub/vitess/go/vt/sqlparser" "gopkg.in/src-d/go-errors.v1" @@ -1286,52 +1290,133 @@ func (db Database) GetEvent(ctx *sql.Context, name string) (sql.EventDefinition, for _, frag := range frags { if strings.ToLower(frag.name) == strings.ToLower(name) { - return sql.EventDefinition{ - Name: frag.name, - CreateStatement: frag.fragment, - CreatedAt: frag.created, - SqlMode: frag.sqlMode, - }, true, nil + event, err := db.createEventDefinitionFromFragment(ctx, frag) + if err != nil { + return sql.EventDefinition{}, false, err + } + return *event, true, nil } } return sql.EventDefinition{}, false, nil } // GetEvents implements sql.EventDatabase. -func (db Database) GetEvents(ctx *sql.Context) ([]sql.EventDefinition, error) { +func (db Database) GetEvents(ctx *sql.Context) (events []sql.EventDefinition, token interface{}, err error) { tbl, ok, err := db.GetTableInsensitive(ctx, doltdb.SchemasTableName) if err != nil { - return nil, err + return nil, nil, err } if !ok { - return nil, nil + // If the dolt_schemas table doesn't exist, it's not an error, just no events + return nil, nil, nil } frags, err := getSchemaFragmentsOfType(ctx, tbl.(*WritableDoltTable), eventFragment) + if err != nil { + return nil, nil, err + } + + for _, frag := range frags { + event, err := db.createEventDefinitionFromFragment(ctx, frag) + if err != nil { + return nil, nil, err + } + events = append(events, *event) + } + + // Grab a hash of the dolt_schemas table to use as the identifying token + // to track if events need to be reloaded. + tableHash, err := db.doltSchemaTableHash(ctx) + if err != nil { + return nil, nil, err + } + + return events, tableHash, nil +} + +// NeedsToReloadEvents implements sql.EventDatabase. +func (db Database) NeedsToReloadEvents(ctx *sql.Context, token interface{}) (bool, error) { + hash, ok := token.(hash.Hash) + if !ok { + return false, fmt.Errorf("expected token to be hash.Hash, but received %T", token) + } + + tableHash, err := db.doltSchemaTableHash(ctx) + if err != nil { + return false, err + } + + // If the current hash doesn't match what we last loaded, then we + // need to reload event definitions + return !tableHash.Equal(hash), nil +} + +// doltSchemaTableHash returns the hash of the dolt_schemas table, or any error encountered along the way. +func (db Database) doltSchemaTableHash(ctx *sql.Context) (hash.Hash, error) { + root, err := db.GetRoot(ctx) + if err != nil { + return hash.Hash{}, err + } + + tableHash, _, err := root.GetTableHash(ctx, doltdb.SchemasTableName) + return tableHash, err +} + +// createEventDefinitionFromFragment creates an EventDefinition instance from the schema fragment |frag|. +func (db Database) createEventDefinitionFromFragment(ctx *sql.Context, frag schemaFragment) (*sql.EventDefinition, error) { + catalog := db.getCatalog(ctx) + sqlMode := sql.NewSqlModeFromString(frag.sqlMode) + parsed, err := planbuilder.ParseWithOptions(ctx, catalog, updateEventStatusTemporarilyForNonDefaultBranch(db.revision, frag.fragment), sqlMode.ParserOptions()) if err != nil { return nil, err } - var events []sql.EventDefinition - for _, frag := range frags { - events = append(events, sql.EventDefinition{ - Name: frag.name, - CreateStatement: frag.fragment, - CreatedAt: frag.created, - SqlMode: frag.sqlMode, - }) + eventPlan, ok := parsed.(*plan.CreateEvent) + if !ok { + return nil, fmt.Errorf("unexpected type %T for create event statement", eventPlan) } - return events, nil + + // NOTE: Time fields for events are assumed to be specified in the session's timezone, which defaults to the + // system's timezone. When we store them, we store them at UTC, and when we send them back to a caller + // we convert them back to the caller's session timezone. + // Here we are loading the events from disk, so they are already in UTC and don't need any other + // timezone applied, so we specify "+00:00". + event, err := eventPlan.GetEventDefinition(ctx, frag.created, frag.created, frag.created, "+00:00") + if err != nil { + return nil, err + } + event.SqlMode = frag.sqlMode + + return &event, nil +} + +// getCatalog creates and returns the analyzer.Catalog instance for this database. +func (db Database) getCatalog(ctx *sql.Context) *analyzer.Catalog { + doltSession := dsess.DSessFromSess(ctx.Session) + return sqle.NewDefault(doltSession.Provider()).Analyzer.Catalog } // SaveEvent implements sql.EventDatabase. -func (db Database) SaveEvent(ctx *sql.Context, ed sql.EventDefinition) error { - return db.addFragToSchemasTable(ctx, +func (db Database) SaveEvent(ctx *sql.Context, event sql.EventDefinition) (bool, error) { + // If the database is not the default branch database, then the event is disabled. + // TODO: need better way to determine the default branch; currently it checks only 'main' + if db.revision != env.DefaultInitBranch && event.Status == sql.EventStatus_Enable.String() { + // using revision database name + event.Status = sql.EventStatus_Disable.String() + ctx.Session.Warn(&sql.Warning{ + Level: "Warning", + Code: 1105, + Message: fmt.Sprintf("Event status cannot be enabled for revision database."), + }) + } + + // TODO: store LastAltered, LastExecuted and TimezoneOffset in appropriate place + return event.Status == sql.EventStatus_Enable.String(), db.addFragToSchemasTable(ctx, eventFragment, - ed.Name, - ed.CreateStatement, - ed.CreatedAt, - sql.ErrEventAlreadyExists.New(ed.Name), + event.Name, + event.CreateEventStatement(), + event.CreatedAt, + sql.ErrEventAlreadyExists.New(event.Name), ) } @@ -1341,13 +1426,31 @@ func (db Database) DropEvent(ctx *sql.Context, name string) error { } // UpdateEvent implements sql.EventDatabase. -func (db Database) UpdateEvent(ctx *sql.Context, originalName string, ed sql.EventDefinition) error { +func (db Database) UpdateEvent(ctx *sql.Context, originalName string, event sql.EventDefinition) (bool, error) { // TODO: any EVENT STATUS change should also update the branch-specific event scheduling err := db.DropEvent(ctx, originalName) if err != nil { - return err + return false, err } - return db.SaveEvent(ctx, ed) + return db.SaveEvent(ctx, event) +} + +// UpdateLastExecuted implements sql.EventDatabase +func (db Database) UpdateLastExecuted(ctx *sql.Context, eventName string, lastExecuted time.Time) error { + // TODO: update LastExecuted in appropriate place + return nil +} + +// updateEventStatusTemporarilyForNonDefaultBranch updates the event status from ENABLE to DISABLE if it's not default branch. +// The event status metadata is not updated in storage, but only for display purposes we return event status as 'DISABLE'. +// This function is used temporarily to implement logic of only allowing enabled events to be executed on default branch. +func updateEventStatusTemporarilyForNonDefaultBranch(revision, createStmt string) string { + // TODO: need better way to determine the default branch; currently it checks only 'main' + + if revision == "" || revision == env.DefaultInitBranch { + return createStmt + } + return strings.Replace(createStmt, "ENABLE", "DISABLE", 1) } // GetStoredProcedure implements sql.StoredProcedureDatabase. diff --git a/go/libraries/doltcore/sqle/database_provider.go b/go/libraries/doltcore/sqle/database_provider.go index 224d599b46..d45d7b9f81 100644 --- a/go/libraries/doltcore/sqle/database_provider.go +++ b/go/libraries/doltcore/sqle/database_provider.go @@ -31,6 +31,7 @@ import ( "github.com/dolthub/dolt/go/libraries/doltcore/env/actions" "github.com/dolthub/dolt/go/libraries/doltcore/ref" "github.com/dolthub/dolt/go/libraries/doltcore/schema" + "github.com/dolthub/dolt/go/libraries/doltcore/sqle/clusterdb" "github.com/dolthub/dolt/go/libraries/doltcore/sqle/dfunctions" "github.com/dolthub/dolt/go/libraries/doltcore/sqle/dprocedures" "github.com/dolthub/dolt/go/libraries/doltcore/sqle/dsess" @@ -270,7 +271,7 @@ func (p DoltDatabaseProvider) AllDatabases(ctx *sql.Context) (all []sql.Database for _, db := range p.databases { all = append(all, db) - if showBranches { + if showBranches && db.Name() != clusterdb.DoltClusterDbName { revisionDbs, err := p.allRevisionDbs(ctx, db) if err != nil { // TODO: this interface is wrong, needs to return errors diff --git a/go/libraries/doltcore/sqle/dprocedures/dolt_checkout.go b/go/libraries/doltcore/sqle/dprocedures/dolt_checkout.go index 39c375027b..72f7ebbcdc 100644 --- a/go/libraries/doltcore/sqle/dprocedures/dolt_checkout.go +++ b/go/libraries/doltcore/sqle/dprocedures/dolt_checkout.go @@ -109,7 +109,7 @@ func doDoltCheckout(ctx *sql.Context, args []string) (statusCode int, successMes return 1, "", err } if !isModification { - return 0, fmt.Sprintf("Already on branch '%s'\n", branchName), nil + return 0, fmt.Sprintf("Already on branch '%s'", branchName), nil } // Check if user wants to checkout branch. diff --git a/integration-tests/bats/events.bats b/integration-tests/bats/events.bats new file mode 100644 index 0000000000..57e6a130b9 --- /dev/null +++ b/integration-tests/bats/events.bats @@ -0,0 +1,266 @@ +#!/usr/bin/env bats +load $BATS_TEST_DIRNAME/helper/common.bash +load $BATS_TEST_DIRNAME/helper/query-server-common.bash + +make_test_repo_and_start_server() { + rm -rf ./"$1" + mkdir "$1" + cd "$1" + + # Override the default event scheduler period (30s) and set it to 1s so that we can run + # tests faster, without having to wait for the default 30s period to elapse several times. + export DOLT_EVENT_SCHEDULER_PERIOD=1 + start_sql_server + + dolt sql-client -P $PORT -u dolt --use-db information_schema -q "CREATE DATABASE repo1;" + dolt sql-client -P $PORT -u dolt --use-db repo1 -q "CREATE TABLE totals (id int PRIMARY KEY AUTO_INCREMENT, int_col int);" + dolt sql-client -P $PORT -u dolt --use-db repo1 -q "call dolt_commit('-Am', 'creating table');" +} + +setup() { + skiponwindows "tests are flaky on Windows" + setup_no_dolt_init + make_test_repo_and_start_server repo1 +} + +teardown() { + stop_sql_server 1 && sleep 0.5 + teardown_common +} + +@test "events: disabling recurring event should not be dropped" { + dolt sql-client -P $PORT -u dolt --use-db 'repo1' -q "CREATE EVENT insert1 ON SCHEDULE EVERY 1 DAY DO INSERT INTO totals (int_col) VALUES (1);" + run dolt sql-client -P $PORT -u dolt --use-db 'repo1' -q "ALTER EVENT insert1 DISABLE; SELECT * FROM information_schema.events;" + [ $status -eq 0 ] + [[ $output =~ "DISABLED" ]] || false +} + +@test "events: disabling current_timestamp one time event after execution" { + dolt sql-client -P $PORT -u dolt --use-db 'repo1' -q "CREATE EVENT insert9 ON SCHEDULE AT CURRENT_TIMESTAMP DO INSERT INTO totals (int_col) VALUES (9);" + run dolt sql-client -P $PORT -u dolt --use-db 'repo1' -q "SELECT COUNT(*) FROM totals;" + [ $status -eq 0 ] + [[ $output =~ "| 1 |" ]] || false + + run dolt sql-client -P $PORT -u dolt --use-db 'repo1' -q "SELECT COUNT(*) FROM information_schema.events;" + [ $status -eq 0 ] + [[ $output =~ "| 0 |" ]] || false + + dolt sql-client -P $PORT -u dolt --use-db 'repo1' -q "CREATE EVENT insert8 ON SCHEDULE AT CURRENT_TIMESTAMP ON COMPLETION PRESERVE DO INSERT INTO totals (int_col) VALUES (8);" + run dolt sql-client -P $PORT -u dolt --use-db 'repo1' -q "SELECT COUNT(*) FROM totals;" + [ $status -eq 0 ] + [[ $output =~ "| 2 |" ]] || false + + run dolt sql-client -P $PORT -u dolt --use-db 'repo1' -q "SHOW CREATE EVENT insert8;" + [ $status -eq 0 ] + [[ $output =~ "ON COMPLETION PRESERVE DISABLE" ]] || false + + run dolt sql-client -P $PORT -u dolt --use-db 'repo1' -q "ALTER EVENT insert8 ON COMPLETION NOT PRESERVE; SELECT COUNT(*) FROM information_schema.events;" + [ $status -eq 0 ] + [[ $output =~ "| 1 |" ]] || false + + run dolt sql-client -P $PORT -u dolt --use-db 'repo1' -q "ALTER EVENT insert8 ENABLE; SELECT COUNT(*) FROM information_schema.events;" + [ $status -eq 0 ] + [[ $output =~ "| 0 |" ]] || false +} + +@test "events: disabling future one time event after execution from the scheduler" { + run dolt sql-client -P $PORT -u dolt --use-db 'repo1' -q "CREATE EVENT insert9 ON SCHEDULE AT CURRENT_TIMESTAMP + INTERVAL 3 SECOND ON COMPLETION PRESERVE DO INSERT INTO totals (int_col) VALUES (9); SHOW CREATE EVENT insert9;" + [ $status -eq 0 ] + [[ $output =~ "ON COMPLETION PRESERVE ENABLE" ]] || false + + sleep 4 + run dolt sql-client -P $PORT -u dolt --use-db 'repo1' -q "SELECT COUNT(*) FROM totals;" + [ $status -eq 0 ] + [[ $output =~ "| 1 |" ]] || false + + run dolt sql-client -P $PORT -u dolt --use-db 'repo1' -q "SELECT COUNT(*) FROM information_schema.events;" + [ $status -eq 0 ] + [[ $output =~ "| 1 |" ]] || false + + run dolt sql-client -P $PORT -u dolt --use-db 'repo1' -q "SHOW CREATE EVENT insert9;" + [ $status -eq 0 ] + [[ $output =~ "ON COMPLETION PRESERVE DISABLE" ]] || false +} + +@test "events: recurring event with STARTS and ENDS defined" { + dolt sql-client -P $PORT -u dolt --use-db 'repo1' -q "CREATE EVENT insert1 ON SCHEDULE EVERY 2 SECOND STARTS CURRENT_TIMESTAMP + INTERVAL 2 SECOND ENDS CURRENT_TIMESTAMP + INTERVAL 5 SECOND DO INSERT INTO totals (int_col) VALUES (1);" + sleep 10 + run dolt sql-client -P $PORT -u dolt --use-db 'repo1' -q "SELECT COUNT(*) FROM totals;" + [ $status -eq 0 ] + [[ $output =~ "| 2 |" ]] || false + + # should be dropped + run dolt sql-client -P $PORT -u dolt --use-db 'repo1' -q "SELECT COUNT(*) FROM information_schema.events;" + [ $status -eq 0 ] + [[ $output =~ "| 0 |" ]] || false +} + +@test "events: recurring event with ENDS defined" { + dolt sql-client -P $PORT -u dolt --use-db 'repo1' -q "CREATE EVENT insert1 ON SCHEDULE EVERY 2 SECOND ENDS CURRENT_TIMESTAMP + INTERVAL 3 SECOND ON COMPLETION PRESERVE DO INSERT INTO totals (int_col) VALUES (1); SELECT SLEEP(5);" + sleep 2 + run dolt sql-client -P $PORT -u dolt --use-db 'repo1' -q "SELECT COUNT(*) FROM totals;" + [ $status -eq 0 ] + [[ $output =~ "| 2 |" ]] || false + + # should be disabled + run dolt sql-client -P $PORT -u dolt --use-db 'repo1' -q "SELECT * FROM information_schema.events;" + [ $status -eq 0 ] + [[ $output =~ "DISABLED" ]] || false +} + +@test "events: checking out a branch should disable all events leaving the working set dirty" { + dolt sql-client -P $PORT -u dolt --use-db 'repo1' -q "CREATE EVENT insert1 ON SCHEDULE EVERY 2 SECOND ENDS CURRENT_TIMESTAMP + INTERVAL 3 SECOND ON COMPLETION PRESERVE DO INSERT INTO totals (int_col) VALUES (1); SELECT SLEEP(5);" + dolt sql-client -P $PORT -u dolt --use-db 'repo1' -q "CALL DOLT_COMMIT('-am','commit event changes to totals table')" + + dolt sql-client -P $PORT -u dolt --use-db 'repo1' -q "CALL DOLT_CHECKOUT('-b','newbranch')" + # should be disabled + run dolt sql-client -P $PORT -u dolt --use-db 'repo1' -q "SELECT * FROM information_schema.events;" + [ $status -eq 0 ] + [[ $output =~ "DISABLED" ]] || false +} + +@test "events: events on default branch still run after switching to non-default branch" { + dolt sql-client -P $PORT -u dolt --use-db 'repo1' -q "CREATE EVENT insert1 ON SCHEDULE EVERY 3 SECOND ENDS CURRENT_TIMESTAMP + INTERVAL 3 MINUTE DO INSERT INTO totals (int_col) VALUES (1);" + dolt sql-client -P $PORT -u dolt --use-db 'repo1' -q "CALL DOLT_COMMIT('-Am','commit with an event');" + + dolt sql-client -P $PORT -u dolt --use-db 'repo1' -q "CALL DOLT_CHECKOUT('-b','newbranch');" + # should be disabled + run dolt sql-client -P $PORT -u dolt --use-db 'repo1' -q "SELECT COUNT(*) FROM totals;" + [ $status -eq 0 ] + [[ $output =~ "| 1 |" ]] || false + + # While we are sleeping, the event executor should still be running our event on schedule, on the main branch + sleep 5 + run dolt sql-client -P $PORT -u dolt --use-db 'repo1' -q "CALL DOLT_CHECKOUT('main'); SELECT COUNT(*) FROM totals;" + [ $status -eq 0 ] + [[ ! $output =~ "| 1 |" ]] || false +} + +# Test that events with multiple statements in nested BEGIN/END blocks work correctly +@test "events: multiple statements in nested BEGIN END blocks in event body" { + # Use dolt sql to pipe in a HEREDOC; Note that this will connect to the running sql-server + cd repo1 + dolt sql << SQL +delimiter // +CREATE EVENT event1234 +ON SCHEDULE AT CURRENT_TIMESTAMP +DO +BEGIN +INSERT INTO totals (int_col) VALUES (111); +BEGIN +INSERT INTO totals (int_col) VALUES (222); +INSERT INTO totals (int_col) VALUES (333); +END; +END; +// +delimiter ; +SQL + + # Verify that our event ran correctly and inserted three rows + run dolt sql-client -P $PORT -u dolt --use-db 'repo1' -q "SELECT * FROM totals;" + [ $status -eq 0 ] + [[ $output =~ "| 1 | 111 |" ]] || false + [[ $output =~ "| 2 | 222 |" ]] || false + [[ $output =~ "| 3 | 333 |" ]] || false + + # Verify that the event did not persist after execution + run dolt sql-client -P $PORT -u dolt --use-db 'repo1' -q "SELECT COUNT(*) FROM information_schema.events;" + [ $status -eq 0 ] + [[ $output =~ "| 0 |" ]] || false +} + +# Test that events containing procedure calls work correctly +@test "events: procedure calls in events" { + # Create a procedure + cd repo1 + dolt sql << SQL +DELIMITER // +CREATE PROCEDURE InsertIntoTotals() +BEGIN + INSERT INTO totals (int_col) VALUES (42); +END // +DELIMITER ; +SQL + + # Use dolt sql to pipe in a HEREDOC; Note that this will connect to the running sql-server + dolt sql << SQL +delimiter // +CREATE EVENT event1234 +ON SCHEDULE AT CURRENT_TIMESTAMP +DO +BEGIN + CALL InsertIntoTotals(); +END; +// +delimiter ; +SQL + + # Verify that our event ran correctly and inserted one row + run dolt sql-client -P $PORT -u dolt --use-db 'repo1' -q "SELECT * FROM totals;" + [ $status -eq 0 ] + [[ $output =~ "| 1 | 42 |" ]] || false + run dolt sql-client -P $PORT -u dolt --use-db 'repo1' -q "SELECT COUNT(*) FROM totals;" + [ $status -eq 0 ] + [[ $output =~ "| 1 " ]] || false + + # Verify that the event did not persist after execution + run dolt sql-client -P $PORT -u dolt --use-db 'repo1' -q "SELECT COUNT(*) FROM information_schema.events;" + [ $status -eq 0 ] + [[ $output =~ "| 0 |" ]] || false +} + +# Test that out-of-band event definition changes (e.g. merges, reverts, or anything else that doesn't go through +# CREATE EVENT statements) are reflected correctly. +@test "events: out-of-band event changes are detected" { + # Use dolt sql to pipe in a HEREDOC; Note that this will connect to the running sql-server + dolt sql << SQL +call dolt_checkout('-b', 'other'); +CREATE EVENT event12345 +ON SCHEDULE EVERY 1 SECOND STARTS CURRENT_TIMESTAMP +DO INSERT INTO totals (int_col) VALUES (42); +call dolt_commit('-Am', 'Adding a new recurring event'); +SQL + + # Verify that our event IS NOT executing (on a non-main branch) + sleep 1 + run dolt sql-client -P $PORT -u dolt --use-db 'repo1' -q "SELECT COUNT(*) FROM totals;" + [ $status -eq 0 ] + [[ $output =~ "| 0 " ]] || false + + # Merge our event from other back to main and enable it + dolt sql << SQL +call dolt_checkout('main'); +call dolt_merge('other'); +ALTER EVENT event12345 ENABLE; +call dolt_commit('-am', 'committing enabled event'); +SQL + + # Verify that the new event starts executing on main after we merge it over + sleep 2 + run dolt sql-client -P $PORT -u dolt --use-db 'repo1' -q "SELECT (SELECT COUNT(*) FROM totals) > 0;" + [ $status -eq 0 ] + [[ $output =~ "| 1 " ]] || false +} + +@test "events: restarting a sql-server correctly schedules existing events" { + # Create the recurring event and make sure it runs at least once + dolt sql-client -P $PORT -u dolt --use-db 'repo1' -q "CREATE EVENT eventTest1 ON SCHEDULE EVERY 1 SECOND STARTS CURRENT_TIMESTAMP DO INSERT INTO totals (int_col) VALUES (111);" + sleep 1 + run dolt sql-client -P $PORT -u dolt --use-db 'repo1' -q "SELECT (SELECT COUNT(*) FROM totals) > 0;" + [ $status -eq 0 ] + [[ $output =~ "| 1 " ]] || false + + # Stop the sql-server, truncate the totals table, and assert it's empty + stop_sql_server 1 + dolt sql -q "truncate totals;" + run dolt sql -q "SELECT (SELECT COUNT(*) FROM totals) > 0;" + [ $status -eq 0 ] + [[ $output =~ "| false " ]] || false + + # Restart the server and assert that the event gets scheduled and executed again + start_sql_server + sleep 1 + run dolt sql-client -P $PORT -u dolt --use-db 'repo1' -q "SELECT (SELECT COUNT(*) FROM totals) > 0;" + [ $status -eq 0 ] + [[ $output =~ "| 1 " ]] || false +}