chore: add sentry support to engine (#237)

* chore: add sentry support to engine

* chore: address PR comments
This commit is contained in:
abelanger5
2024-03-06 08:50:49 -08:00
committed by GitHub
parent 85d89886d3
commit 105aa08f3f
6 changed files with 110 additions and 31 deletions

View File

@@ -97,6 +97,7 @@ func Run(ctx context.Context, cf *loader.ConfigLoader) error {
if sc.HasService("jobscontroller") {
jc, err := jobs.New(
jobs.WithAlerter(sc.Alerter),
jobs.WithTaskQueue(sc.TaskQueue),
jobs.WithRepository(sc.Repository),
jobs.WithLogger(sc.Logger),

View File

@@ -32,6 +32,8 @@ import (
"github.com/hatchet-dev/hatchet/internal/taskqueue/rabbitmq"
"github.com/hatchet-dev/hatchet/internal/validator"
"github.com/hatchet-dev/hatchet/pkg/client"
"github.com/hatchet-dev/hatchet/pkg/errors"
"github.com/hatchet-dev/hatchet/pkg/errors/sentry"
)
// LoadDatabaseConfigFile loads the database config file via viper
@@ -189,6 +191,21 @@ func GetServerConfigFromConfigfile(dc *database.Config, cf *server.ServerConfigF
return nil, nil, fmt.Errorf("could not create ingestor: %w", err)
}
var alerter errors.Alerter
if cf.Alerting.Sentry.Enabled {
alerter, err = sentry.NewSentryAlerter(&sentry.SentryAlerterOpts{
DSN: cf.Alerting.Sentry.DSN,
Environment: cf.Alerting.Sentry.Environment,
})
if err != nil {
return nil, nil, fmt.Errorf("could not create sentry alerter: %w", err)
}
} else {
alerter = errors.NoOpAlerter{}
}
auth := server.AuthConfig{
ConfigFile: cf.Auth,
}
@@ -302,6 +319,7 @@ func GetServerConfigFromConfigfile(dc *database.Config, cf *server.ServerConfigF
}
return cleanup, &server.ServerConfig{
Alerter: alerter,
Runtime: cf.Runtime,
Auth: auth,
Encryption: encryptionSvc,

View File

@@ -18,11 +18,14 @@ import (
"github.com/hatchet-dev/hatchet/internal/taskqueue"
"github.com/hatchet-dev/hatchet/internal/validator"
"github.com/hatchet-dev/hatchet/pkg/client"
"github.com/hatchet-dev/hatchet/pkg/errors"
)
type ServerConfigFile struct {
Auth ConfigFileAuth `mapstructure:"auth" json:"auth,omitempty"`
Alerting AlertingConfigFile `mapstructure:"alerting" json:"alerting,omitempty"`
Encryption EncryptionConfigFile `mapstructure:"encryption" json:"encryption,omitempty"`
Runtime ConfigFileRuntime `mapstructure:"runtime" json:"runtime,omitempty"`
@@ -67,6 +70,22 @@ type ConfigFileRuntime struct {
ShutdownWait time.Duration `mapstructure:"shutdownWait" json:"shutdownWait,omitempty" default:"20s"`
}
// Alerting options
type AlertingConfigFile struct {
Sentry SentryConfigFile `mapstructure:"sentry" json:"sentry,omitempty"`
}
type SentryConfigFile struct {
// Enabled controls whether the Sentry service is enabled for this Hatchet instance.
Enabled bool `mapstructure:"enabled" json:"enabled,omitempty"`
// DSN is the Data Source Name for the Sentry instance
DSN string `mapstructure:"dsn" json:"dsn,omitempty"`
// Environment is the environment that the instance is running in
Environment string `mapstructure:"environment" json:"environment,omitempty" default:"development"`
}
// Encryption options
type EncryptionConfigFile struct {
// MasterKeyset is the raw master keyset for the instance. This should be a base64-encoded JSON string. You must set
@@ -180,6 +199,8 @@ type ServerConfig struct {
Auth AuthConfig
Alerter errors.Alerter
Encryption encryption.EncryptionService
Runtime ConfigFileRuntime
@@ -229,6 +250,11 @@ func BindAllEnv(v *viper.Viper) {
_ = v.BindEnv("runtime.shutdownWait", "SERVER_SHUTDOWN_WAIT")
_ = v.BindEnv("services", "SERVER_SERVICES")
// alerting options
_ = v.BindEnv("alerting.sentry.enabled", "SERVER_ALERTING_SENTRY_ENABLED")
_ = v.BindEnv("alerting.sentry.dsn", "SERVER_ALERTING_SENTRY_DSN")
_ = v.BindEnv("alerting.sentry.environment", "SERVER_ALERTING_SENTRY_ENVIRONMENT")
// encryption options
_ = v.BindEnv("encryption.masterKeyset", "SERVER_ENCRYPTION_MASTER_KEYSET")
_ = v.BindEnv("encryption.masterKeysetFile", "SERVER_ENCRYPTION_MASTER_KEYSET_FILE")

View File

@@ -1,4 +1,4 @@
package datautils
package merge
func isYAMLTable(v interface{}) bool {
_, ok := v.(map[string]interface{})

View File

@@ -14,6 +14,7 @@ import (
"golang.org/x/sync/errgroup"
"github.com/hatchet-dev/hatchet/internal/datautils"
"github.com/hatchet-dev/hatchet/internal/datautils/merge"
"github.com/hatchet-dev/hatchet/internal/logger"
"github.com/hatchet-dev/hatchet/internal/repository"
"github.com/hatchet-dev/hatchet/internal/repository/prisma/db"
@@ -23,6 +24,8 @@ import (
"github.com/hatchet-dev/hatchet/internal/taskqueue"
"github.com/hatchet-dev/hatchet/internal/telemetry"
"github.com/hatchet-dev/hatchet/internal/telemetry/servertel"
hatcheterrors "github.com/hatchet-dev/hatchet/pkg/errors"
)
type JobsController interface {
@@ -35,22 +38,27 @@ type JobsControllerImpl struct {
repo repository.Repository
dv datautils.DataDecoderValidator
s gocron.Scheduler
a *hatcheterrors.Wrapped
}
type JobsControllerOpt func(*JobsControllerOpts)
type JobsControllerOpts struct {
tq taskqueue.TaskQueue
l *zerolog.Logger
repo repository.Repository
dv datautils.DataDecoderValidator
tq taskqueue.TaskQueue
l *zerolog.Logger
repo repository.Repository
dv datautils.DataDecoderValidator
alerter hatcheterrors.Alerter
}
func defaultJobsControllerOpts() *JobsControllerOpts {
logger := logger.NewDefaultLogger("jobs-controller")
alerter := hatcheterrors.NoOpAlerter{}
return &JobsControllerOpts{
l: &logger,
dv: datautils.NewDataDecoderValidator(),
l: &logger,
dv: datautils.NewDataDecoderValidator(),
alerter: alerter,
}
}
@@ -66,6 +74,12 @@ func WithLogger(l *zerolog.Logger) JobsControllerOpt {
}
}
func WithAlerter(a hatcheterrors.Alerter) JobsControllerOpt {
return func(opts *JobsControllerOpts) {
opts.alerter = a
}
}
func WithRepository(r repository.Repository) JobsControllerOpt {
return func(opts *JobsControllerOpts) {
opts.repo = r
@@ -102,12 +116,16 @@ func New(fs ...JobsControllerOpt) (*JobsControllerImpl, error) {
return nil, fmt.Errorf("could not create scheduler: %w", err)
}
a := hatcheterrors.NewWrapped(opts.alerter)
a.WithData(map[string]interface{}{"service": "jobs-controller"})
return &JobsControllerImpl{
tq: opts.tq,
l: opts.l,
repo: opts.repo,
dv: opts.dv,
s: s,
a: a,
}, nil
}
@@ -158,6 +176,7 @@ func (jc *JobsControllerImpl) Start() (func() error, error) {
err := jc.handleTask(context.Background(), task)
if err != nil {
jc.l.Error().Err(err).Msg("could not handle job task")
jc.a.WrapErr(fmt.Errorf("could not handle job task: %w", err), map[string]interface{}{"task_id": task.ID}) // nolint: errcheck
}
}(task)
}
@@ -466,7 +485,7 @@ func (ec *JobsControllerImpl) handleStepRunRetry(ctx context.Context, task *task
inputOverridesMap, ok2 := inputMap["overrides"].(map[string]interface{})
if ok1 && ok2 {
mergedInputOverrides := datautils.MergeMaps(currentInputOverridesMap, inputOverridesMap)
mergedInputOverrides := merge.MergeMaps(currentInputOverridesMap, inputOverridesMap)
inputMap["overrides"] = mergedInputOverrides
@@ -738,8 +757,14 @@ func (ec *JobsControllerImpl) queueStepRun(ctx context.Context, tenantId, stepId
// add the rendered data to the step run
stepRun, err := ec.repo.StepRun().GetStepRunById(tenantId, stepRunId)
errData := map[string]interface{}{
"tenant_id": tenantId,
"step_id": stepId,
"step_run_id": stepRunId,
}
if err != nil {
return fmt.Errorf("could not get step run: %w", err)
return ec.a.WrapErr(fmt.Errorf("could not get step run: %w", err), errData)
}
servertel.WithStepRunModel(span, stepRun)
@@ -773,7 +798,7 @@ func (ec *JobsControllerImpl) queueStepRun(ctx context.Context, tenantId, stepId
data, ok := lookupDataModel.Data()
if !ok {
return fmt.Errorf("job run has no lookup data")
return ec.a.WrapErr(fmt.Errorf("job run has no lookup data"), errData)
}
lookupData := &datautils.JobRunLookupData{}
@@ -781,7 +806,7 @@ func (ec *JobsControllerImpl) queueStepRun(ctx context.Context, tenantId, stepId
err := datautils.FromJSONType(&data, lookupData)
if err != nil {
return fmt.Errorf("could not get job run lookup data: %w", err)
return ec.a.WrapErr(fmt.Errorf("could not get job run lookup data: %w", err), errData)
}
userData := map[string]interface{}{}
@@ -819,26 +844,9 @@ func (ec *JobsControllerImpl) queueStepRun(ctx context.Context, tenantId, stepId
inputDataBytes, err := json.Marshal(inputData)
if err != nil {
return fmt.Errorf("could not convert input data to json: %w", err)
return ec.a.WrapErr(fmt.Errorf("could not convert input data to json: %w", err), errData)
}
// defer the update of the input schema to the step run
// defer func() {
// jsonSchemaBytes, err := schema.SchemaBytesFromBytes(inputDataBytes)
// if err != nil {
// ec.l.Err(err).Msgf("could not get schema bytes from bytes: %s", err.Error())
// return
// }
// _, err = ec.repo.StepRun().UpdateStepRunInputSchema(stepRun.TenantID, stepRun.ID, jsonSchemaBytes)
// if err != nil {
// ec.l.Err(err).Msgf("could not update step run input schema: %s", err.Error())
// return
// }
// }()
updateStepOpts.Input = inputDataBytes
}
}
@@ -856,10 +864,10 @@ func (ec *JobsControllerImpl) queueStepRun(ctx context.Context, tenantId, stepId
return nil
}
return fmt.Errorf("could not update step run: %w", err)
return ec.a.WrapErr(fmt.Errorf("could not update step run: %w", err), errData)
}
return ec.scheduleStepRun(ctx, tenantId, stepId, stepRunId)
return ec.a.WrapErr(ec.scheduleStepRun(ctx, tenantId, stepId, stepRunId), errData)
}
func (ec *JobsControllerImpl) scheduleStepRun(ctx context.Context, tenantId, stepId, stepRunId string) error {

View File

@@ -2,6 +2,8 @@ package errors
import (
"context"
"github.com/hatchet-dev/hatchet/internal/datautils/merge"
)
type Alerter interface {
@@ -11,3 +13,27 @@ type Alerter interface {
type NoOpAlerter struct{}
func (s NoOpAlerter) SendAlert(ctx context.Context, err error, data map[string]interface{}) {}
type Wrapped struct {
a Alerter
data map[string]interface{}
}
func NewWrapped(a Alerter) *Wrapped {
return &Wrapped{
a: a,
}
}
func (w *Wrapped) WithData(data map[string]interface{}) {
w.data = data
}
func (w *Wrapped) WrapErr(err error, data map[string]interface{}) error {
if err == nil {
return nil
}
w.a.SendAlert(context.Background(), err, merge.MergeMaps(w.data, data))
return err
}