mirror of
https://github.com/hatchet-dev/hatchet.git
synced 2026-05-07 10:31:35 -05:00
feat(engine): readiness & liveness probes (#197)
This commit is contained in:
@@ -3,6 +3,7 @@ package engine
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/hatchet-dev/hatchet/internal/config/loader"
|
||||
"github.com/hatchet-dev/hatchet/internal/services/admin"
|
||||
@@ -11,6 +12,7 @@ import (
|
||||
"github.com/hatchet-dev/hatchet/internal/services/controllers/workflows"
|
||||
"github.com/hatchet-dev/hatchet/internal/services/dispatcher"
|
||||
"github.com/hatchet-dev/hatchet/internal/services/grpc"
|
||||
"github.com/hatchet-dev/hatchet/internal/services/health"
|
||||
"github.com/hatchet-dev/hatchet/internal/services/heartbeat"
|
||||
"github.com/hatchet-dev/hatchet/internal/services/ingestor"
|
||||
"github.com/hatchet-dev/hatchet/internal/services/ticker"
|
||||
@@ -39,6 +41,17 @@ func Run(ctx context.Context, cf *loader.ConfigLoader) error {
|
||||
|
||||
var teardown []Teardown
|
||||
|
||||
var h *health.Health
|
||||
healthProbes := sc.HasService("health")
|
||||
if healthProbes {
|
||||
h = health.New(sc.Repository, sc.TaskQueue)
|
||||
cleanup := h.Start()
|
||||
teardown = append(teardown, Teardown{
|
||||
name: "health",
|
||||
fn: cleanup,
|
||||
})
|
||||
}
|
||||
|
||||
if sc.HasService("ticker") {
|
||||
t, err := ticker.New(
|
||||
ticker.WithTaskQueue(sc.TaskQueue),
|
||||
@@ -237,8 +250,18 @@ func Run(ctx context.Context, cf *loader.ConfigLoader) error {
|
||||
|
||||
l.Debug().Msgf("engine has started")
|
||||
|
||||
if healthProbes {
|
||||
h.SetReady(true)
|
||||
}
|
||||
|
||||
<-ctx.Done()
|
||||
|
||||
if healthProbes {
|
||||
h.SetReady(false)
|
||||
}
|
||||
|
||||
time.Sleep(sc.Runtime.ShutdownWait)
|
||||
|
||||
l.Debug().Msgf("interrupt received, shutting down")
|
||||
|
||||
l.Debug().Msgf("waiting for all other services to gracefully exit...")
|
||||
|
||||
@@ -2,6 +2,7 @@ package server
|
||||
|
||||
import (
|
||||
"crypto/tls"
|
||||
"time"
|
||||
|
||||
"github.com/rs/zerolog"
|
||||
"github.com/spf13/viper"
|
||||
@@ -28,7 +29,7 @@ type ServerConfigFile struct {
|
||||
|
||||
TaskQueue TaskQueueConfigFile `mapstructure:"taskQueue" json:"taskQueue,omitempty"`
|
||||
|
||||
Services []string `mapstructure:"services" json:"services,omitempty" default:"[\"ticker\", \"grpc\", \"eventscontroller\", \"jobscontroller\", \"workflowscontroller\", \"heartbeater\"]"`
|
||||
Services []string `mapstructure:"services" json:"services,omitempty" default:"[\"health\", \"ticker\", \"grpc\", \"eventscontroller\", \"jobscontroller\", \"workflowscontroller\", \"heartbeater\"]"`
|
||||
|
||||
TLS shared.TLSConfigFile `mapstructure:"tls" json:"tls,omitempty"`
|
||||
|
||||
@@ -61,6 +62,9 @@ type ConfigFileRuntime struct {
|
||||
|
||||
// Whether the internal worker is enabled for this instance
|
||||
WorkerEnabled bool `mapstructure:"workerEnabled" json:"workerEnabled,omitempty" default:"false"`
|
||||
|
||||
// ShutdownWait is the time between the readiness probe being offline when a shutdown is triggered and the actual start of cleaning up resources.
|
||||
ShutdownWait time.Duration `mapstructure:"shutdownWait" json:"shutdownWait,omitempty" default:"20s"`
|
||||
}
|
||||
|
||||
// Encryption options
|
||||
@@ -222,6 +226,7 @@ func BindAllEnv(v *viper.Viper) {
|
||||
_ = v.BindEnv("runtime.grpcBroadcastAddress", "SERVER_GRPC_BROADCAST_ADDRESS")
|
||||
_ = v.BindEnv("runtime.grpcInsecure", "SERVER_GRPC_INSECURE")
|
||||
_ = v.BindEnv("runtime.workerEnabled", "SERVER_WORKER_ENABLED")
|
||||
_ = v.BindEnv("runtime.shutdownWait", "SERVER_SHUTDOWN_WAIT")
|
||||
_ = v.BindEnv("services", "SERVER_SERVICES")
|
||||
|
||||
// encryption options
|
||||
|
||||
@@ -0,0 +1,5 @@
|
||||
package repository

// HealthRepository exposes a database connectivity probe used by the
// engine's readiness endpoint.
type HealthRepository interface {
	// IsHealthy reports whether the underlying database can execute a
	// trivial query. It returns false on any error.
	IsHealthy() bool
}
|
||||
@@ -0,0 +1,4 @@
|
||||
-- name: Health :many
-- Lightweight connectivity probe: fetches at most one user id. A successful
-- round trip is all the readiness check needs; the result rows are ignored.
SELECT id
FROM "User"
LIMIT 1;
||||
@@ -0,0 +1,38 @@
|
||||
// Code generated by sqlc. DO NOT EDIT.
// versions:
// sqlc v1.24.0
// source: health.sql

package dbsqlc

import (
	"context"

	"github.com/jackc/pgx/v5/pgtype"
)

const health = `-- name: Health :many
SELECT id
FROM "User"
LIMIT 1
`

// Health executes the connectivity-probe query (SELECT id FROM "User"
// LIMIT 1) against db. A nil error means the database is reachable; the
// returned ids (zero or one of them) are not meaningful to callers.
func (q *Queries) Health(ctx context.Context, db DBTX) ([]pgtype.UUID, error) {
	rows, err := db.Query(ctx, health)
	if err != nil {
		return nil, err
	}
	defer rows.Close()
	var items []pgtype.UUID
	for rows.Next() {
		var id pgtype.UUID
		if err := rows.Scan(&id); err != nil {
			return nil, err
		}
		items = append(items, id)
	}
	if err := rows.Err(); err != nil {
		return nil, err
	}
	return items, nil
}
|
||||
@@ -4,6 +4,7 @@ sql:
|
||||
# database:
|
||||
# uri: "postgres://hatchet:hatchet@localhost:5431/hatchet"
|
||||
queries:
|
||||
- health.sql
|
||||
- events.sql
|
||||
- workflow_runs.sql
|
||||
- workflows.sql
|
||||
|
||||
@@ -0,0 +1,41 @@
|
||||
package prisma

import (
	"context"
	"time"

	"github.com/jackc/pgx/v5/pgxpool"

	"github.com/hatchet-dev/hatchet/internal/repository"
	"github.com/hatchet-dev/hatchet/internal/repository/prisma/db"
	"github.com/hatchet-dev/hatchet/internal/repository/prisma/dbsqlc"
)

// healthRepository implements repository.HealthRepository by probing the
// database through both access paths the engine uses: the Prisma client and
// the pgx connection pool.
type healthRepository struct {
	client  *db.PrismaClient
	queries *dbsqlc.Queries
	pool    *pgxpool.Pool
}

// NewHealthRepository returns a HealthRepository backed by the given Prisma
// client and pgx pool.
func NewHealthRepository(client *db.PrismaClient, pool *pgxpool.Pool) repository.HealthRepository {
	return &healthRepository{
		client:  client,
		queries: dbsqlc.New(),
		pool:    pool,
	}
}

// IsHealthy reports whether both database access paths can execute a trivial
// query. Each probe shares a single 5s deadline so a hung database cannot
// block the readiness endpoint indefinitely (the original used an unbounded
// context.Background()).
func (a *healthRepository) IsHealthy() bool {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	// Probe 1: Prisma client path.
	if _, err := a.client.User.FindMany().Take(1).Exec(ctx); err != nil {
		return false
	}

	// Probe 2: raw pgx pool path via the generated sqlc query.
	if _, err := a.queries.Health(ctx, a.pool); err != nil {
		return false
	}

	return true
}
|
||||
@@ -26,6 +26,7 @@ type prismaRepository struct {
|
||||
ticker repository.TickerRepository
|
||||
userSession repository.UserSessionRepository
|
||||
user repository.UserRepository
|
||||
health repository.HealthRepository
|
||||
}
|
||||
|
||||
type PrismaRepositoryOpt func(*PrismaRepositoryOpts)
|
||||
@@ -80,9 +81,14 @@ func NewPrismaRepository(client *db.PrismaClient, pool *pgxpool.Pool, fs ...Pris
|
||||
ticker: NewTickerRepository(client, pool, opts.v, opts.l),
|
||||
userSession: NewUserSessionRepository(client, opts.v),
|
||||
user: NewUserRepository(client, opts.v),
|
||||
health: NewHealthRepository(client, pool),
|
||||
}
|
||||
}
|
||||
|
||||
// Health returns the repository used for database health checks.
func (r *prismaRepository) Health() repository.HealthRepository {
	return r.health
}
|
||||
|
||||
// APIToken returns the repository for API token storage and lookup.
func (r *prismaRepository) APIToken() repository.APITokenRepository {
	return r.apiToken
}
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package repository
|
||||
|
||||
type Repository interface {
|
||||
Health() HealthRepository
|
||||
APIToken() APITokenRepository
|
||||
Event() EventRepository
|
||||
Tenant() TenantRepository
|
||||
|
||||
@@ -0,0 +1,71 @@
|
||||
package health

import (
	"context"
	"errors"
	"fmt"
	"net/http"
	"sync"
	"time"

	"github.com/hatchet-dev/hatchet/internal/repository"
	"github.com/hatchet-dev/hatchet/internal/taskqueue"
)

// Health serves HTTP liveness (/live) and readiness (/ready) probes for the
// engine. Readiness combines an explicitly set flag (SetReady) with live
// checks of the task queue connection and database connectivity.
type Health struct {
	// mu guards ready: SetReady runs on the engine's main goroutine while
	// the HTTP handlers read it on server goroutines, so unsynchronized
	// access is a data race.
	mu    sync.RWMutex
	ready bool

	repository repository.Repository
	queue      taskqueue.TaskQueue
}

// New constructs a Health service over the given repository and task queue.
// The service reports not-ready until SetReady(true) is called; Start must
// be called to begin serving probes.
func New(prisma repository.Repository, queue taskqueue.TaskQueue) *Health {
	return &Health{
		repository: prisma,
		queue:      queue,
	}
}

// SetReady toggles the readiness flag. Safe for concurrent use.
func (h *Health) SetReady(ready bool) {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.ready = ready
}

// isReady returns the current readiness flag under the read lock.
func (h *Health) isReady() bool {
	h.mu.RLock()
	defer h.mu.RUnlock()
	return h.ready
}

// Start launches the probe HTTP server on :8733 in a background goroutine
// and returns a cleanup function that gracefully shuts it down with a 5s
// deadline.
func (h *Health) Start() func() error {
	mux := http.NewServeMux()

	// Liveness: the process is up and able to serve HTTP.
	mux.HandleFunc("/live", func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(http.StatusOK)
	})

	// Readiness: startup has completed, the queue connection is established,
	// and the database answers a trivial query. (The original checked
	// IsHealthy() twice in the same condition; one call suffices.)
	mux.HandleFunc("/ready", func(w http.ResponseWriter, r *http.Request) {
		if !h.isReady() || !h.queue.IsReady() || !h.repository.Health().IsHealthy() {
			w.WriteHeader(http.StatusServiceUnavailable)
			return
		}

		w.WriteHeader(http.StatusOK)
	})

	server := &http.Server{
		Addr:         ":8733",
		Handler:      mux,
		ReadTimeout:  5 * time.Second,
		WriteTimeout: 5 * time.Second,
	}

	go func() {
		// ErrServerClosed is the expected result of Shutdown; anything else
		// is fatal for the probe server.
		if err := server.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
			panic(err)
		}
	}()

	cleanup := func() error {
		ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
		defer cancel()

		if err := server.Shutdown(ctx); err != nil {
			return fmt.Errorf("could not shutdown server: %w", err)
		}

		return nil
	}

	return cleanup
}
|
||||
@@ -27,7 +27,7 @@ type session struct {
|
||||
type taskWithQueue struct {
|
||||
*taskqueue.Task
|
||||
|
||||
q taskqueue.Queue `json:"-"`
|
||||
q taskqueue.Queue
|
||||
}
|
||||
|
||||
// TaskQueueImpl implements TaskQueue interface using AMQP.
|
||||
@@ -39,10 +39,16 @@ type TaskQueueImpl struct {
|
||||
|
||||
l *zerolog.Logger
|
||||
|
||||
ready bool
|
||||
|
||||
// lru cache for tenant ids
|
||||
tenantIdCache *lru.Cache[string, bool]
|
||||
}
|
||||
|
||||
func (t *TaskQueueImpl) IsReady() bool {
|
||||
return t.ready
|
||||
}
|
||||
|
||||
type TaskQueueImplOpt func(*TaskQueueImplOpts)
|
||||
|
||||
type TaskQueueImplOpts struct {
|
||||
@@ -51,10 +57,10 @@ type TaskQueueImplOpts struct {
|
||||
}
|
||||
|
||||
func defaultTaskQueueImplOpts() *TaskQueueImplOpts {
|
||||
logger := logger.NewDefaultLogger("rabbitmq")
|
||||
l := logger.NewDefaultLogger("rabbitmq")
|
||||
|
||||
return &TaskQueueImplOpts{
|
||||
l: &logger,
|
||||
l: &l,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -83,23 +89,20 @@ func New(fs ...TaskQueueImplOpt) (func() error, *TaskQueueImpl) {
|
||||
newLogger := opts.l.With().Str("service", "events-controller").Logger()
|
||||
opts.l = &newLogger
|
||||
|
||||
sessions := redial(ctx, opts.l, opts.url)
|
||||
tasks := make(chan *taskWithQueue)
|
||||
|
||||
// create a new lru cache for tenant ids
|
||||
tenantIdCache, _ := lru.New[string, bool](2000) // nolint: errcheck - this only returns an error if the size is less than 0
|
||||
|
||||
t := &TaskQueueImpl{
|
||||
ctx: ctx,
|
||||
sessions: sessions,
|
||||
tasks: tasks,
|
||||
identity: identity(),
|
||||
l: opts.l,
|
||||
tenantIdCache: tenantIdCache,
|
||||
ctx: ctx,
|
||||
identity: identity(),
|
||||
l: opts.l,
|
||||
}
|
||||
|
||||
t.sessions = t.redial(ctx, opts.l, opts.url)
|
||||
t.tasks = make(chan *taskWithQueue)
|
||||
|
||||
// create a new lru cache for tenant ids
|
||||
t.tenantIdCache, _ = lru.New[string, bool](2000) // nolint: errcheck - this only returns an error if the size is less than 0
|
||||
|
||||
// init the queues in a blocking fashion
|
||||
sub := <-<-sessions
|
||||
sub := <-<-t.sessions
|
||||
if _, err := t.initQueue(sub, taskqueue.EVENT_PROCESSING_QUEUE); err != nil {
|
||||
t.l.Debug().Msgf("error initializing queue: %v", err)
|
||||
cancel()
|
||||
@@ -362,7 +365,7 @@ func (t *TaskQueueImpl) subscribe(subId string, q taskqueue.Queue, sessions chan
|
||||
}
|
||||
|
||||
// redial continually connects to the URL, exiting the program when no longer possible
|
||||
func redial(ctx context.Context, l *zerolog.Logger, url string) chan chan session {
|
||||
func (t *TaskQueueImpl) redial(ctx context.Context, l *zerolog.Logger, url string) chan chan session {
|
||||
sessions := make(chan chan session)
|
||||
|
||||
go func() {
|
||||
@@ -378,6 +381,18 @@ func redial(ctx context.Context, l *zerolog.Logger, url string) chan chan sessio
|
||||
}
|
||||
|
||||
newSession, err := getSession(ctx, l, url)
|
||||
t.ready = true
|
||||
|
||||
ch := newSession.Connection.NotifyClose(make(chan *amqp.Error, 1))
|
||||
|
||||
go func() {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-ch:
|
||||
t.ready = false
|
||||
}
|
||||
}()
|
||||
|
||||
if err != nil {
|
||||
l.Error().Msgf("error getting session: %v", err)
|
||||
@@ -401,9 +416,9 @@ func redial(ctx context.Context, l *zerolog.Logger, url string) chan chan sessio
|
||||
func identity() string {
|
||||
hostname, err := os.Hostname()
|
||||
h := sha256.New()
|
||||
fmt.Fprint(h, hostname)
|
||||
fmt.Fprint(h, err)
|
||||
fmt.Fprint(h, os.Getpid())
|
||||
_, _ = fmt.Fprint(h, hostname)
|
||||
_, _ = fmt.Fprint(h, err)
|
||||
_, _ = fmt.Fprint(h, os.Getpid())
|
||||
return fmt.Sprintf("%x", h.Sum(nil))
|
||||
}
|
||||
|
||||
|
||||
@@ -146,4 +146,7 @@ type TaskQueue interface {
|
||||
// on the first message to a tenant to ensure that the tenant is registered, and store the tenant
|
||||
// in an LRU cache which lives in-memory.
|
||||
RegisterTenant(ctx context.Context, tenantId string) error
|
||||
|
||||
// IsReady returns true if the task queue is ready to accept tasks.
|
||||
IsReady() bool
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user