mirror of
https://github.com/hatchet-dev/hatchet.git
synced 2026-02-19 22:59:08 -06:00
Do not run cleanup on v1_workflow_concurrency_slot (#2463)
* do not run cleanup on v1_workflow_concurrency_slot
* fix health endpoints for engine
This commit is contained in:
@@ -164,7 +164,7 @@ func runV0Config(ctx context.Context, sc *server.ServerConfig) ([]Teardown, erro
|
||||
var h *health.Health
|
||||
healthProbes := sc.HasService("health")
|
||||
if healthProbes {
|
||||
h = health.New(sc.EngineRepository, sc.MessageQueue, sc.Version)
|
||||
h = health.New(sc.EngineRepository, sc.MessageQueue, sc.Version, l)
|
||||
cleanup, err := h.Start(sc.Runtime.HealthcheckPort)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not start health: %w", err)
|
||||
@@ -651,7 +651,7 @@ func runV1Config(ctx context.Context, sc *server.ServerConfig) ([]Teardown, erro
|
||||
var h *health.Health
|
||||
|
||||
if healthProbes {
|
||||
h = health.New(sc.EngineRepository, sc.MessageQueue, sc.Version)
|
||||
h = health.New(sc.EngineRepository, sc.MessageQueue, sc.Version, l)
|
||||
|
||||
cleanup, err := h.Start(sc.Runtime.HealthcheckPort)
|
||||
|
||||
|
||||
@@ -9,6 +9,8 @@ import (
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
"github.com/rs/zerolog"
|
||||
|
||||
"github.com/hatchet-dev/hatchet/internal/msgqueue"
|
||||
"github.com/hatchet-dev/hatchet/pkg/repository"
|
||||
)
|
||||
@@ -19,13 +21,15 @@ type Health struct {
|
||||
|
||||
repository repository.EngineRepository
|
||||
queue msgqueue.MessageQueue
|
||||
l *zerolog.Logger
|
||||
}
|
||||
|
||||
func New(repo repository.EngineRepository, queue msgqueue.MessageQueue, version string) *Health {
|
||||
func New(repo repository.EngineRepository, queue msgqueue.MessageQueue, version string, l *zerolog.Logger) *Health {
|
||||
return &Health{
|
||||
version: version,
|
||||
repository: repo,
|
||||
queue: queue,
|
||||
l: l,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -36,11 +40,12 @@ func (h *Health) SetReady(ready bool) {
|
||||
func (h *Health) Start(port int) (func() error, error) {
|
||||
mux := http.NewServeMux()
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
|
||||
defer cancel()
|
||||
|
||||
mux.HandleFunc("/live", func(w http.ResponseWriter, r *http.Request) {
|
||||
ctx, cancel := context.WithTimeout(r.Context(), 3*time.Second)
|
||||
defer cancel()
|
||||
|
||||
if !h.ready || !h.queue.IsReady() || !h.repository.Health().IsHealthy(ctx) {
|
||||
h.l.Error().Msg("liveness check failed")
|
||||
w.WriteHeader(http.StatusServiceUnavailable)
|
||||
return
|
||||
}
|
||||
@@ -49,7 +54,11 @@ func (h *Health) Start(port int) (func() error, error) {
|
||||
})
|
||||
|
||||
mux.HandleFunc("/ready", func(w http.ResponseWriter, r *http.Request) {
|
||||
ctx, cancel := context.WithTimeout(r.Context(), 3*time.Second)
|
||||
defer cancel()
|
||||
|
||||
if !h.ready || !h.queue.IsReady() || !h.repository.Health().IsHealthy(ctx) {
|
||||
h.l.Error().Msg("readiness check failed")
|
||||
w.WriteHeader(http.StatusServiceUnavailable)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -4,6 +4,7 @@ import (
|
||||
"context"
|
||||
|
||||
"github.com/jackc/pgx/v5/pgxpool"
|
||||
"github.com/rs/zerolog"
|
||||
|
||||
"github.com/hatchet-dev/hatchet/pkg/repository"
|
||||
"github.com/hatchet-dev/hatchet/pkg/repository/postgres/dbsqlc"
|
||||
@@ -23,6 +24,7 @@ func (a *healthAPIRepository) IsHealthy(ctx context.Context) bool {
|
||||
_, err := a.queries.Health(ctx, a.pool)
|
||||
|
||||
if err != nil { //nolint:gosimple
|
||||
a.l.Err(err).Msg("health check failed")
|
||||
return false
|
||||
}
|
||||
|
||||
@@ -37,21 +39,24 @@ func (a *healthAPIRepository) PgStat() *pgxpool.Stat {
|
||||
type healthEngineRepository struct {
|
||||
queries *dbsqlc.Queries
|
||||
pool *pgxpool.Pool
|
||||
l *zerolog.Logger
|
||||
}
|
||||
|
||||
func NewHealthEngineRepository(pool *pgxpool.Pool) repository.HealthRepository {
|
||||
func NewHealthEngineRepository(pool *pgxpool.Pool, l *zerolog.Logger) repository.HealthRepository {
|
||||
queries := dbsqlc.New()
|
||||
|
||||
return &healthEngineRepository{
|
||||
queries: queries,
|
||||
pool: pool,
|
||||
l: l,
|
||||
}
|
||||
}
|
||||
|
||||
func (a *healthEngineRepository) IsHealthy(ctx context.Context) bool {
|
||||
_, err := a.queries.Health(ctx, a.pool)
|
||||
|
||||
if err != nil { //nolint:gosimple
|
||||
if err != nil {
|
||||
a.l.Err(err).Msg("health check failed")
|
||||
return false
|
||||
}
|
||||
|
||||
|
||||
@@ -374,7 +374,7 @@ func NewEngineRepository(pool *pgxpool.Pool, cf *server.ConfigFileRuntime, fs ..
|
||||
|
||||
return cleanup()
|
||||
}, &engineRepository{
|
||||
health: NewHealthEngineRepository(pool),
|
||||
health: NewHealthEngineRepository(pool, opts.l),
|
||||
apiToken: NewAPITokenRepository(shared, opts.cache),
|
||||
dispatcher: NewDispatcherRepository(pool, opts.v, opts.l),
|
||||
event: NewEventEngineRepository(shared, opts.metered, cf.EventBuffer),
|
||||
|
||||
@@ -962,24 +962,6 @@ WHERE (task_id, task_inserted_at, task_retry_count) IN (
|
||||
FROM locked_cs
|
||||
);
|
||||
|
||||
-- name: CleanupV1WorkflowConcurrencySlot :execresult
|
||||
WITH active_slots AS (
|
||||
SELECT DISTINCT
|
||||
wcs.strategy_id,
|
||||
wcs.workflow_version_id,
|
||||
wcs.workflow_run_id
|
||||
FROM v1_workflow_concurrency_slot wcs
|
||||
ORDER BY wcs.strategy_id, wcs.workflow_version_id, wcs.workflow_run_id
|
||||
LIMIT @batchSize::int
|
||||
)
|
||||
SELECT
|
||||
cleanup_workflow_concurrency_slots(
|
||||
slot.strategy_id,
|
||||
slot.workflow_version_id,
|
||||
slot.workflow_run_id
|
||||
)
|
||||
FROM active_slots slot;
|
||||
|
||||
-- name: GetTenantTaskStats :many
|
||||
WITH queued_tasks AS (
|
||||
SELECT
|
||||
|
||||
@@ -89,29 +89,6 @@ func (q *Queries) CleanupV1TaskRuntime(ctx context.Context, db DBTX, batchsize i
|
||||
return db.Exec(ctx, cleanupV1TaskRuntime, batchsize)
|
||||
}
|
||||
|
||||
const cleanupV1WorkflowConcurrencySlot = `-- name: CleanupV1WorkflowConcurrencySlot :execresult
|
||||
WITH active_slots AS (
|
||||
SELECT DISTINCT
|
||||
wcs.strategy_id,
|
||||
wcs.workflow_version_id,
|
||||
wcs.workflow_run_id
|
||||
FROM v1_workflow_concurrency_slot wcs
|
||||
ORDER BY wcs.strategy_id, wcs.workflow_version_id, wcs.workflow_run_id
|
||||
LIMIT $1::int
|
||||
)
|
||||
SELECT
|
||||
cleanup_workflow_concurrency_slots(
|
||||
slot.strategy_id,
|
||||
slot.workflow_version_id,
|
||||
slot.workflow_run_id
|
||||
)
|
||||
FROM active_slots slot
|
||||
`
|
||||
|
||||
func (q *Queries) CleanupV1WorkflowConcurrencySlot(ctx context.Context, db DBTX, batchsize int32) (pgconn.CommandTag, error) {
|
||||
return db.Exec(ctx, cleanupV1WorkflowConcurrencySlot, batchsize)
|
||||
}
|
||||
|
||||
const cleanupWorkflowConcurrencySlotsAfterInsert = `-- name: CleanupWorkflowConcurrencySlotsAfterInsert :exec
|
||||
WITH input AS (
|
||||
SELECT
|
||||
|
||||
@@ -3692,15 +3692,6 @@ func (r *TaskRepositoryImpl) Cleanup(ctx context.Context) (bool, error) {
|
||||
shouldContinue = true
|
||||
}
|
||||
|
||||
result, err = r.queries.CleanupV1WorkflowConcurrencySlot(ctx, tx, batchSize)
|
||||
if err != nil {
|
||||
return false, fmt.Errorf("error cleaning up v1_workflow_concurrency_slot: %v", err)
|
||||
}
|
||||
|
||||
if result.RowsAffected() == batchSize {
|
||||
shouldContinue = true
|
||||
}
|
||||
|
||||
if err := commit(ctx); err != nil {
|
||||
return false, fmt.Errorf("error committing transaction: %v", err)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user