mirror of
https://github.com/hatchet-dev/hatchet.git
synced 2026-04-24 11:18:35 -05:00
refactor: buffered writes of step run statuses (#941)
* (wip) handle step run updates without deferred updates * refactor: buffered writes of step run statuses * fix: add more safety on tenant pools * add configurable flush period, remove wait for started * flush immediately if last flush time plus flush period is in the past * feat: add configurable flush internal/max items
This commit is contained in:
@@ -4,15 +4,18 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/hatchet-dev/hatchet/pkg/repository/prisma/dbsqlc"
|
||||
"github.com/hatchet-dev/hatchet/pkg/repository/prisma/sqlchelpers"
|
||||
"github.com/rs/zerolog"
|
||||
)
|
||||
|
||||
type OperationPool struct {
|
||||
ops sync.Map
|
||||
timeout time.Duration
|
||||
description string
|
||||
method OpMethod
|
||||
ql *zerolog.Logger
|
||||
ops sync.Map
|
||||
timeout time.Duration
|
||||
description string
|
||||
method OpMethod
|
||||
ql *zerolog.Logger
|
||||
setTenantsMu sync.RWMutex
|
||||
}
|
||||
|
||||
func NewOperationPool(ql *zerolog.Logger, timeout time.Duration, description string, method OpMethod) *OperationPool {
|
||||
@@ -24,11 +27,37 @@ func NewOperationPool(ql *zerolog.Logger, timeout time.Duration, description str
|
||||
}
|
||||
}
|
||||
|
||||
func (p *OperationPool) SetTenants(tenants []*dbsqlc.Tenant) {
|
||||
p.setTenantsMu.Lock()
|
||||
defer p.setTenantsMu.Unlock()
|
||||
|
||||
tenantMap := make(map[string]bool)
|
||||
|
||||
for _, t := range tenants {
|
||||
tenantMap[sqlchelpers.UUIDToStr(t.ID)] = true
|
||||
}
|
||||
|
||||
// delete tenants that are not in the list
|
||||
p.ops.Range(func(key, value interface{}) bool {
|
||||
if _, ok := tenantMap[key.(string)]; !ok {
|
||||
p.ops.Delete(key)
|
||||
}
|
||||
|
||||
return true
|
||||
})
|
||||
}
|
||||
|
||||
func (p *OperationPool) RunOrContinue(id string) {
|
||||
p.setTenantsMu.RLock()
|
||||
defer p.setTenantsMu.RUnlock()
|
||||
|
||||
p.GetOperation(id).RunOrContinue(p.ql)
|
||||
}
|
||||
|
||||
func (p *OperationPool) GetOperation(id string) *SerialOperation {
|
||||
p.setTenantsMu.RLock()
|
||||
defer p.setTenantsMu.RUnlock()
|
||||
|
||||
op, ok := p.ops.Load(id)
|
||||
|
||||
if !ok {
|
||||
|
||||
@@ -976,7 +976,7 @@ func (ec *JobsControllerImpl) handleStepRunFailed(ctx context.Context, task *msg
|
||||
}
|
||||
|
||||
func (ec *JobsControllerImpl) failStepRun(ctx context.Context, tenantId, stepRunId, errorReason string, failedAt time.Time) error {
|
||||
stepRun, err := ec.repo.StepRun().GetStepRunForEngine(ctx, tenantId, stepRunId)
|
||||
oldStepRun, err := ec.repo.StepRun().GetStepRunForEngine(ctx, tenantId, stepRunId)
|
||||
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not get step run: %w", err)
|
||||
@@ -986,7 +986,7 @@ func (ec *JobsControllerImpl) failStepRun(ctx context.Context, tenantId, stepRun
|
||||
defer ec.checkTenantQueue(ctx, tenantId)
|
||||
|
||||
// determine if step run should be retried or not
|
||||
shouldRetry := stepRun.SRRetryCount < stepRun.StepRetries
|
||||
shouldRetry := oldStepRun.SRRetryCount < oldStepRun.StepRetries
|
||||
|
||||
if shouldRetry {
|
||||
eventMessage := fmt.Sprintf("Step run failed on %s", failedAt.Format(time.RFC1123))
|
||||
@@ -995,7 +995,7 @@ func (ec *JobsControllerImpl) failStepRun(ctx context.Context, tenantId, stepRun
|
||||
|
||||
if errorReason == "TIMED_OUT" {
|
||||
eventReason = dbsqlc.StepRunEventReasonTIMEDOUT
|
||||
eventMessage = fmt.Sprintf("Step exceeded timeout duration (%s)", stepRun.StepTimeout.String)
|
||||
eventMessage = fmt.Sprintf("Step exceeded timeout duration (%s)", oldStepRun.StepTimeout.String)
|
||||
}
|
||||
|
||||
eventMessage += ", and will be retried."
|
||||
@@ -1005,7 +1005,7 @@ func (ec *JobsControllerImpl) failStepRun(ctx context.Context, tenantId, stepRun
|
||||
EventMessage: repository.StringPtr(eventMessage),
|
||||
EventSeverity: repository.StepRunEventSeverityPtr(dbsqlc.StepRunEventSeverityCRITICAL),
|
||||
EventData: map[string]interface{}{
|
||||
"retry_count": stepRun.SRRetryCount,
|
||||
"retry_count": oldStepRun.SRRetryCount,
|
||||
},
|
||||
})
|
||||
|
||||
@@ -1013,19 +1013,12 @@ func (ec *JobsControllerImpl) failStepRun(ctx context.Context, tenantId, stepRun
|
||||
return ec.mq.AddMessage(
|
||||
ctx,
|
||||
msgqueue.JOB_PROCESSING_QUEUE,
|
||||
tasktypes.StepRunRetryToTask(stepRun, nil, errorReason),
|
||||
tasktypes.StepRunRetryToTask(oldStepRun, nil, errorReason),
|
||||
)
|
||||
}
|
||||
|
||||
// get the old step run to figure out the worker and dispatcher id, before we update the step run
|
||||
oldStepRun, err := ec.repo.StepRun().GetStepRunForEngine(ctx, tenantId, stepRunId)
|
||||
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not get step run: %w", err)
|
||||
}
|
||||
|
||||
// fail step run
|
||||
err = ec.repo.StepRun().StepRunFailed(ctx, tenantId, stepRunId, failedAt, errorReason)
|
||||
err = ec.repo.StepRun().StepRunFailed(ctx, tenantId, stepRunId, failedAt, errorReason, int(oldStepRun.SRRetryCount))
|
||||
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not fail step run: %w", err)
|
||||
|
||||
@@ -35,9 +35,10 @@ type queue struct {
|
||||
// a custom queue logger
|
||||
ql *zerolog.Logger
|
||||
|
||||
tenantQueueOperations *queueutils.OperationPool
|
||||
updateStepRunOperations *queueutils.OperationPool
|
||||
timeoutStepRunOperations *queueutils.OperationPool
|
||||
tenantQueueOperations *queueutils.OperationPool
|
||||
updateStepRunOperations *queueutils.OperationPool
|
||||
updateStepRunV2Operations *queueutils.OperationPool
|
||||
timeoutStepRunOperations *queueutils.OperationPool
|
||||
}
|
||||
|
||||
func newQueue(
|
||||
@@ -68,6 +69,7 @@ func newQueue(
|
||||
|
||||
q.tenantQueueOperations = queueutils.NewOperationPool(ql, time.Second*5, "check tenant queue", q.scheduleStepRuns)
|
||||
q.updateStepRunOperations = queueutils.NewOperationPool(ql, time.Second*30, "update step runs", q.processStepRunUpdates)
|
||||
q.updateStepRunV2Operations = queueutils.NewOperationPool(ql, time.Second*30, "update step runs (v2)", q.processStepRunUpdatesV2)
|
||||
q.timeoutStepRunOperations = queueutils.NewOperationPool(ql, time.Second*30, "timeout step runs", q.processStepRunTimeouts)
|
||||
|
||||
return q, nil
|
||||
@@ -114,6 +116,18 @@ func (q *queue) Start() (func() error, error) {
|
||||
return nil, fmt.Errorf("could not schedule step run timeout: %w", err)
|
||||
}
|
||||
|
||||
_, err = q.s.NewJob(
|
||||
gocron.DurationJob(time.Second*1),
|
||||
gocron.NewTask(
|
||||
q.runTenantUpdateStepRunsV2(ctx),
|
||||
),
|
||||
)
|
||||
|
||||
if err != nil {
|
||||
cancel()
|
||||
return nil, fmt.Errorf("could not schedule step run update (v2): %w", err)
|
||||
}
|
||||
|
||||
q.s.Start()
|
||||
|
||||
postAck := func(task *msgqueue.Message) error {
|
||||
@@ -192,6 +206,7 @@ func (q *queue) handleCheckQueue(ctx context.Context, task *msgqueue.Message) er
|
||||
// if this tenant is registered, then we should check the queue
|
||||
q.tenantQueueOperations.RunOrContinue(metadata.TenantId)
|
||||
q.updateStepRunOperations.RunOrContinue(metadata.TenantId)
|
||||
q.updateStepRunV2Operations.RunOrContinue(metadata.TenantId)
|
||||
|
||||
return nil
|
||||
}
|
||||
@@ -208,6 +223,8 @@ func (q *queue) runTenantQueues(ctx context.Context) func() {
|
||||
return
|
||||
}
|
||||
|
||||
q.tenantQueueOperations.SetTenants(tenants)
|
||||
|
||||
for i := range tenants {
|
||||
tenantId := sqlchelpers.UUIDToStr(tenants[i].ID)
|
||||
|
||||
@@ -273,6 +290,8 @@ func (q *queue) runTenantUpdateStepRuns(ctx context.Context) func() {
|
||||
return
|
||||
}
|
||||
|
||||
q.updateStepRunOperations.SetTenants(tenants)
|
||||
|
||||
for i := range tenants {
|
||||
tenantId := sqlchelpers.UUIDToStr(tenants[i].ID)
|
||||
|
||||
@@ -356,6 +375,64 @@ func (q *queue) processStepRunUpdates(ctx context.Context, tenantId string) (boo
|
||||
return res.Continue, nil
|
||||
}
|
||||
|
||||
func (q *queue) runTenantUpdateStepRunsV2(ctx context.Context) func() {
|
||||
return func() {
|
||||
q.l.Debug().Msgf("partition: updating step run statuses (v2)")
|
||||
|
||||
// list all tenants
|
||||
tenants, err := q.repo.Tenant().ListTenantsByControllerPartition(ctx, q.p.GetControllerPartitionId())
|
||||
|
||||
if err != nil {
|
||||
q.l.Err(err).Msg("could not list tenants")
|
||||
return
|
||||
}
|
||||
|
||||
q.updateStepRunV2Operations.SetTenants(tenants)
|
||||
|
||||
for i := range tenants {
|
||||
tenantId := sqlchelpers.UUIDToStr(tenants[i].ID)
|
||||
|
||||
q.updateStepRunV2Operations.RunOrContinue(tenantId)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (q *queue) processStepRunUpdatesV2(ctx context.Context, tenantId string) (bool, error) {
|
||||
ctx, span := telemetry.NewSpan(ctx, "process-step-run-updates-v2")
|
||||
defer span.End()
|
||||
|
||||
dbCtx, cancel := context.WithTimeout(ctx, 300*time.Second)
|
||||
defer cancel()
|
||||
|
||||
res, err := q.repo.StepRun().ProcessStepRunUpdatesV2(dbCtx, q.ql, tenantId)
|
||||
|
||||
if err != nil {
|
||||
return false, fmt.Errorf("could not process step run updates (v2): %w", err)
|
||||
}
|
||||
|
||||
// for all finished workflow runs, send a message
|
||||
for _, finished := range res.CompletedWorkflowRuns {
|
||||
workflowRunId := sqlchelpers.UUIDToStr(finished.ID)
|
||||
status := string(finished.Status)
|
||||
|
||||
err := q.mq.AddMessage(
|
||||
context.Background(),
|
||||
msgqueue.WORKFLOW_PROCESSING_QUEUE,
|
||||
tasktypes.WorkflowRunFinishedToTask(
|
||||
tenantId,
|
||||
workflowRunId,
|
||||
status,
|
||||
),
|
||||
)
|
||||
|
||||
if err != nil {
|
||||
q.l.Error().Err(err).Msg("could not add workflow run finished task to task queue (v2)")
|
||||
}
|
||||
}
|
||||
|
||||
return res.Continue, nil
|
||||
}
|
||||
|
||||
func (q *queue) runTenantTimeoutStepRuns(ctx context.Context) func() {
|
||||
return func() {
|
||||
q.l.Debug().Msgf("partition: running timeout for step runs")
|
||||
@@ -368,6 +445,8 @@ func (q *queue) runTenantTimeoutStepRuns(ctx context.Context) func() {
|
||||
return
|
||||
}
|
||||
|
||||
q.timeoutStepRunOperations.SetTenants(tenants)
|
||||
|
||||
for i := range tenants {
|
||||
tenantId := sqlchelpers.UUIDToStr(tenants[i].ID)
|
||||
|
||||
|
||||
@@ -104,6 +104,12 @@ type ConfigFileRuntime struct {
|
||||
// QueueLimit is the limit of items to return from a single queue at a time
|
||||
SingleQueueLimit int `mapstructure:"singleQueueLimit" json:"singleQueueLimit,omitempty" default:"100"`
|
||||
|
||||
// FlushPeriodMilliseconds is the number of milliseconds before flush
|
||||
FlushPeriodMilliseconds int `mapstructure:"flushPeriodMilliseconds" json:"flushPeriodMilliseconds,omitempty" default:"10"`
|
||||
|
||||
// FlushItemsThreshold is the number of items to hold in memory until flushing to the database
|
||||
FlushItemsThreshold int `mapstructure:"flushItemsThreshold" json:"flushItemsThreshold,omitempty" default:"100"`
|
||||
|
||||
// Allow new tenants to be created
|
||||
AllowSignup bool `mapstructure:"allowSignup" json:"allowSignup,omitempty" default:"true"`
|
||||
|
||||
@@ -485,6 +491,8 @@ func BindAllEnv(v *viper.Viper) {
|
||||
_ = v.BindEnv("msgQueue.rabbitmq.qos", "SERVER_MSGQUEUE_RABBITMQ_QOS")
|
||||
_ = v.BindEnv("runtime.requeueLimit", "SERVER_REQUEUE_LIMIT")
|
||||
_ = v.BindEnv("runtime.singleQueueLimit", "SERVER_SINGLE_QUEUE_LIMIT")
|
||||
_ = v.BindEnv("runtime.flushPeriodMilliseconds", "SERVER_FLUSH_PERIOD_MILLISECONDS")
|
||||
_ = v.BindEnv("runtime.flushItemsThreshold", "SERVER_FLUSH_ITEMS_THRESHOLD")
|
||||
|
||||
// tls options
|
||||
_ = v.BindEnv("tls.tlsStrategy", "SERVER_TLS_STRATEGY")
|
||||
|
||||
@@ -139,10 +139,12 @@ func (b *IngestBuf[T, U]) buffWorker() {
|
||||
b.safeAppendInternalArray(e)
|
||||
b.safeIncSizeOfData(b.calcSizeOfData([]T{e.item}))
|
||||
|
||||
if b.safeCheckSizeOfBuffer() >= b.maxCapacity || b.safeFetchSizeOfData() >= b.maxDataSizeInQueue {
|
||||
// if last flush time + flush period is in the past, flush
|
||||
if time.Now().After(b.safeFetchLastFlush().Add(b.flushPeriod)) {
|
||||
b.flush(b.sliceInternalArray())
|
||||
} else if b.safeCheckSizeOfBuffer() >= b.maxCapacity || b.safeFetchSizeOfData() >= b.maxDataSizeInQueue {
|
||||
b.flush(b.sliceInternalArray())
|
||||
}
|
||||
|
||||
case <-time.After(time.Until(b.safeFetchLastFlush().Add(b.flushPeriod))):
|
||||
|
||||
b.flush(b.sliceInternalArray())
|
||||
|
||||
@@ -1419,6 +1419,7 @@ model QueueItem {
|
||||
enum InternalQueue {
|
||||
WORKER_SEMAPHORE_COUNT
|
||||
STEP_RUN_UPDATE
|
||||
STEP_RUN_UPDATE_V2
|
||||
WORKFLOW_RUN_UPDATE
|
||||
WORKFLOW_RUN_PAUSED
|
||||
}
|
||||
@@ -2271,6 +2272,7 @@ type InternalQueue string
|
||||
const (
|
||||
InternalQueueWorkerSemaphoreCount InternalQueue = "WORKER_SEMAPHORE_COUNT"
|
||||
InternalQueueStepRunUpdate InternalQueue = "STEP_RUN_UPDATE"
|
||||
InternalQueueStepRunUpdateV2 InternalQueue = "STEP_RUN_UPDATE_V2"
|
||||
InternalQueueWorkflowRunUpdate InternalQueue = "WORKFLOW_RUN_UPDATE"
|
||||
InternalQueueWorkflowRunPaused InternalQueue = "WORKFLOW_RUN_PAUSED"
|
||||
)
|
||||
|
||||
@@ -62,6 +62,7 @@ const (
|
||||
InternalQueueSTEPRUNUPDATE InternalQueue = "STEP_RUN_UPDATE"
|
||||
InternalQueueWORKFLOWRUNUPDATE InternalQueue = "WORKFLOW_RUN_UPDATE"
|
||||
InternalQueueWORKFLOWRUNPAUSED InternalQueue = "WORKFLOW_RUN_PAUSED"
|
||||
InternalQueueSTEPRUNUPDATEV2 InternalQueue = "STEP_RUN_UPDATE_V2"
|
||||
)
|
||||
|
||||
func (e *InternalQueue) Scan(src interface{}) error {
|
||||
|
||||
@@ -191,14 +191,16 @@ INSERT INTO
|
||||
"priority"
|
||||
)
|
||||
SELECT
|
||||
@queue::"InternalQueue",
|
||||
input."queue",
|
||||
true,
|
||||
input."data",
|
||||
@tenantId::uuid,
|
||||
input."tenantId",
|
||||
1
|
||||
FROM (
|
||||
SELECT
|
||||
unnest(@datas::json[]) AS "data"
|
||||
unnest(cast(@queues::text[] as"InternalQueue"[])) AS "queue",
|
||||
unnest(@datas::json[]) AS "data",
|
||||
unnest(@tenantIds::uuid[]) AS "tenantId"
|
||||
) AS input
|
||||
ON CONFLICT DO NOTHING;
|
||||
|
||||
|
||||
@@ -95,26 +95,28 @@ INSERT INTO
|
||||
"priority"
|
||||
)
|
||||
SELECT
|
||||
$1::"InternalQueue",
|
||||
input."queue",
|
||||
true,
|
||||
input."data",
|
||||
$2::uuid,
|
||||
input."tenantId",
|
||||
1
|
||||
FROM (
|
||||
SELECT
|
||||
unnest($3::json[]) AS "data"
|
||||
unnest(cast($1::text[] as"InternalQueue"[])) AS "queue",
|
||||
unnest($2::json[]) AS "data",
|
||||
unnest($3::uuid[]) AS "tenantId"
|
||||
) AS input
|
||||
ON CONFLICT DO NOTHING
|
||||
`
|
||||
|
||||
type CreateInternalQueueItemsBulkParams struct {
|
||||
Queue InternalQueue `json:"queue"`
|
||||
Tenantid pgtype.UUID `json:"tenantid"`
|
||||
Datas [][]byte `json:"datas"`
|
||||
Queues []string `json:"queues"`
|
||||
Datas [][]byte `json:"datas"`
|
||||
Tenantids []pgtype.UUID `json:"tenantids"`
|
||||
}
|
||||
|
||||
func (q *Queries) CreateInternalQueueItemsBulk(ctx context.Context, db DBTX, arg CreateInternalQueueItemsBulkParams) error {
|
||||
_, err := db.Exec(ctx, createInternalQueueItemsBulk, arg.Queue, arg.Tenantid, arg.Datas)
|
||||
_, err := db.Exec(ctx, createInternalQueueItemsBulk, arg.Queues, arg.Datas, arg.Tenantids)
|
||||
return err
|
||||
}
|
||||
|
||||
|
||||
@@ -2,7 +2,7 @@
|
||||
CREATE TYPE "ConcurrencyLimitStrategy" AS ENUM ('CANCEL_IN_PROGRESS', 'DROP_NEWEST', 'QUEUE_NEWEST', 'GROUP_ROUND_ROBIN');
|
||||
|
||||
-- CreateEnum
|
||||
CREATE TYPE "InternalQueue" AS ENUM ('WORKER_SEMAPHORE_COUNT', 'STEP_RUN_UPDATE', 'WORKFLOW_RUN_UPDATE', 'WORKFLOW_RUN_PAUSED');
|
||||
CREATE TYPE "InternalQueue" AS ENUM ('WORKER_SEMAPHORE_COUNT', 'STEP_RUN_UPDATE', 'WORKFLOW_RUN_UPDATE', 'WORKFLOW_RUN_PAUSED', 'STEP_RUN_UPDATE_V2');
|
||||
|
||||
-- CreateEnum
|
||||
CREATE TYPE "InviteLinkStatus" AS ENUM ('PENDING', 'ACCEPTED', 'REJECTED');
|
||||
|
||||
@@ -288,6 +288,8 @@ func NewEngineRepository(pool *pgxpool.Pool, queuePool *pgxpool.Pool, cf *server
|
||||
f(opts)
|
||||
}
|
||||
|
||||
setDefaults(cf)
|
||||
|
||||
newLogger := opts.l.With().Str("service", "database").Logger()
|
||||
opts.l = &newLogger
|
||||
|
||||
@@ -298,9 +300,23 @@ func NewEngineRepository(pool *pgxpool.Pool, queuePool *pgxpool.Pool, cf *server
|
||||
rlCache := cache.New(5 * time.Minute)
|
||||
eventEngine, cleanupEventEngine, err := NewEventEngineRepository(pool, opts.v, opts.l, opts.metered)
|
||||
|
||||
return func() error {
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
stepRunEngine, cleanupStepRunEngine, err := NewStepRunEngineRepository(queuePool, opts.v, opts.l, cf, rlCache)
|
||||
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
return func() error {
|
||||
rlCache.Stop()
|
||||
|
||||
if err := cleanupStepRunEngine(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return cleanupEventEngine()
|
||||
|
||||
}, &engineRepository{
|
||||
@@ -310,14 +326,14 @@ func NewEngineRepository(pool *pgxpool.Pool, queuePool *pgxpool.Pool, cf *server
|
||||
event: eventEngine,
|
||||
getGroupKeyRun: NewGetGroupKeyRunRepository(pool, opts.v, opts.l),
|
||||
jobRun: NewJobRunEngineRepository(pool, opts.v, opts.l),
|
||||
stepRun: NewStepRunEngineRepository(queuePool, opts.v, opts.l, cf, rlCache),
|
||||
stepRun: stepRunEngine,
|
||||
step: NewStepRepository(pool, opts.v, opts.l),
|
||||
tenant: NewTenantEngineRepository(pool, opts.v, opts.l, opts.cache),
|
||||
tenantAlerting: NewTenantAlertingEngineRepository(pool, opts.v, opts.l, opts.cache),
|
||||
ticker: NewTickerRepository(pool, opts.v, opts.l),
|
||||
worker: NewWorkerEngineRepository(pool, opts.v, opts.l, opts.metered),
|
||||
workflow: NewWorkflowEngineRepository(pool, opts.v, opts.l, opts.metered),
|
||||
workflowRun: NewWorkflowRunEngineRepository(pool, opts.v, opts.l, opts.metered),
|
||||
workflowRun: NewWorkflowRunEngineRepository(stepRunEngine, pool, opts.v, opts.l, opts.metered),
|
||||
streamEvent: NewStreamEventsEngineRepository(pool, opts.v, opts.l),
|
||||
log: NewLogEngineRepository(pool, opts.v, opts.l),
|
||||
rateLimit: NewRateLimitEngineRepository(pool, opts.v, opts.l),
|
||||
|
||||
+719
-193
File diff suppressed because it is too large
Load Diff
@@ -6,11 +6,27 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/hatchet-dev/hatchet/pkg/config/server"
|
||||
"github.com/hatchet-dev/hatchet/pkg/validator"
|
||||
|
||||
"github.com/rs/zerolog"
|
||||
)
|
||||
|
||||
var (
|
||||
defaultFlushPeriod = 10 * time.Millisecond
|
||||
defaultMaxCapacity = 100
|
||||
)
|
||||
|
||||
func setDefaults(cf *server.ConfigFileRuntime) {
|
||||
if cf.FlushPeriodMilliseconds != 0 {
|
||||
defaultFlushPeriod = time.Duration(cf.FlushPeriodMilliseconds) * time.Millisecond
|
||||
}
|
||||
|
||||
if cf.FlushItemsThreshold != 0 {
|
||||
defaultMaxCapacity = cf.FlushItemsThreshold
|
||||
}
|
||||
}
|
||||
|
||||
// This is a wrapper around the IngestBuf to manage multiple tenants
|
||||
// An example would be T is eventOps and U is *dbsqlc.Event
|
||||
|
||||
@@ -27,6 +43,9 @@ type TenantBufManagerOpts[T any, U any] struct {
|
||||
SizeFunc func(T) int `validate:"required"`
|
||||
L *zerolog.Logger `validate:"required"`
|
||||
V validator.Validator `validate:"required"`
|
||||
|
||||
FlushPeriod *time.Duration
|
||||
FlushItemsThreshold int
|
||||
}
|
||||
|
||||
// Create a new TenantBufferManager with generic types T for input and U for output
|
||||
@@ -43,14 +62,18 @@ func NewTenantBufManager[T any, U any](opts TenantBufManagerOpts[T, U]) (*Tenant
|
||||
|
||||
defaultOpts := IngestBufOpts[T, U]{
|
||||
// something we can tune if we see this DB transaction is too slow
|
||||
MaxCapacity: 10000,
|
||||
FlushPeriod: 50 * time.Millisecond,
|
||||
MaxCapacity: defaultMaxCapacity,
|
||||
FlushPeriod: defaultFlushPeriod,
|
||||
MaxDataSizeInQueue: 4 * megabyte,
|
||||
OutputFunc: opts.OutputFunc,
|
||||
SizeFunc: opts.SizeFunc,
|
||||
L: opts.L,
|
||||
}
|
||||
|
||||
if opts.FlushPeriod != nil {
|
||||
defaultOpts.FlushPeriod = *opts.FlushPeriod
|
||||
}
|
||||
|
||||
return &TenantBufferManager[T, U]{
|
||||
tenants: sync.Map{},
|
||||
l: opts.L,
|
||||
|
||||
@@ -126,7 +126,7 @@ func (w *workflowRunEngineRepository) QueuePausedWorkflowRun(ctx context.Context
|
||||
ctx,
|
||||
w.pool,
|
||||
w.queries,
|
||||
tenantId,
|
||||
sqlchelpers.UUIDFromStr(tenantId),
|
||||
unpauseWorkflowRunQueueData{
|
||||
WorkflowId: workflowId,
|
||||
WorkflowRunId: workflowRunId,
|
||||
@@ -420,26 +420,28 @@ func (w *workflowRunAPIRepository) GetStepRunsForJobRuns(ctx context.Context, te
|
||||
}
|
||||
|
||||
type workflowRunEngineRepository struct {
|
||||
pool *pgxpool.Pool
|
||||
v validator.Validator
|
||||
queries *dbsqlc.Queries
|
||||
l *zerolog.Logger
|
||||
m *metered.Metered
|
||||
pool *pgxpool.Pool
|
||||
v validator.Validator
|
||||
queries *dbsqlc.Queries
|
||||
l *zerolog.Logger
|
||||
m *metered.Metered
|
||||
stepRunRepository *stepRunEngineRepository
|
||||
|
||||
createCallbacks []repository.Callback[*dbsqlc.WorkflowRun]
|
||||
queuedCallbacks []repository.Callback[pgtype.UUID]
|
||||
}
|
||||
|
||||
func NewWorkflowRunEngineRepository(pool *pgxpool.Pool, v validator.Validator, l *zerolog.Logger, m *metered.Metered, cbs ...repository.Callback[*dbsqlc.WorkflowRun]) repository.WorkflowRunEngineRepository {
|
||||
func NewWorkflowRunEngineRepository(stepRunRepository *stepRunEngineRepository, pool *pgxpool.Pool, v validator.Validator, l *zerolog.Logger, m *metered.Metered, cbs ...repository.Callback[*dbsqlc.WorkflowRun]) repository.WorkflowRunEngineRepository {
|
||||
queries := dbsqlc.New()
|
||||
|
||||
return &workflowRunEngineRepository{
|
||||
v: v,
|
||||
pool: pool,
|
||||
queries: queries,
|
||||
l: l,
|
||||
m: m,
|
||||
createCallbacks: cbs,
|
||||
v: v,
|
||||
pool: pool,
|
||||
queries: queries,
|
||||
l: l,
|
||||
m: m,
|
||||
createCallbacks: cbs,
|
||||
stepRunRepository: stepRunRepository,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -698,10 +700,7 @@ func (s *workflowRunEngineRepository) ReplayWorkflowRun(ctx context.Context, ten
|
||||
sev := dbsqlc.StepRunEventSeverityINFO
|
||||
reason := dbsqlc.StepRunEventReasonRETRIEDBYUSER
|
||||
|
||||
defer deferredStepRunEvent(
|
||||
s.l,
|
||||
s.pool,
|
||||
s.queries,
|
||||
defer s.stepRunRepository.deferredStepRunEvent(
|
||||
tenantId,
|
||||
stepRunIdStr,
|
||||
repository.CreateStepRunEventOpts{
|
||||
@@ -1386,8 +1385,8 @@ func insertWorkflowRunQueueItem(
|
||||
ctx,
|
||||
dbtx,
|
||||
queries,
|
||||
tenantId,
|
||||
dbsqlc.InternalQueueWORKFLOWRUNUPDATE,
|
||||
[]pgtype.UUID{sqlchelpers.UUIDFromStr(tenantId)},
|
||||
[]dbsqlc.InternalQueue{dbsqlc.InternalQueueWORKFLOWRUNUPDATE},
|
||||
insertData,
|
||||
)
|
||||
}
|
||||
@@ -1396,7 +1395,7 @@ func insertPausedWorkflowRunQueueItem(
|
||||
ctx context.Context,
|
||||
dbtx dbsqlc.DBTX,
|
||||
queries *dbsqlc.Queries,
|
||||
tenantId string,
|
||||
tenantId pgtype.UUID,
|
||||
data unpauseWorkflowRunQueueData,
|
||||
) error {
|
||||
insertData := make([]any, 1)
|
||||
@@ -1406,8 +1405,8 @@ func insertPausedWorkflowRunQueueItem(
|
||||
ctx,
|
||||
dbtx,
|
||||
queries,
|
||||
tenantId,
|
||||
dbsqlc.InternalQueueWORKFLOWRUNPAUSED,
|
||||
[]pgtype.UUID{tenantId},
|
||||
[]dbsqlc.InternalQueue{dbsqlc.InternalQueueWORKFLOWRUNPAUSED},
|
||||
insertData,
|
||||
)
|
||||
}
|
||||
|
||||
@@ -166,6 +166,11 @@ type ProcessStepRunUpdatesResult struct {
|
||||
Continue bool
|
||||
}
|
||||
|
||||
type ProcessStepRunUpdatesResultV2 struct {
|
||||
CompletedWorkflowRuns []*dbsqlc.ResolveWorkflowRunStatusRow
|
||||
Continue bool
|
||||
}
|
||||
|
||||
type StepRunEngineRepository interface {
|
||||
RegisterWorkflowRunCompletedCallback(callback Callback[*dbsqlc.ResolveWorkflowRunStatusRow])
|
||||
|
||||
@@ -183,7 +188,7 @@ type StepRunEngineRepository interface {
|
||||
|
||||
StepRunCancelled(ctx context.Context, tenantId, stepRunId string, cancelledAt time.Time, cancelledReason string) error
|
||||
|
||||
StepRunFailed(ctx context.Context, tenantId, stepRunId string, failedAt time.Time, errStr string) error
|
||||
StepRunFailed(ctx context.Context, tenantId, stepRunId string, failedAt time.Time, errStr string, retryCount int) error
|
||||
|
||||
ReplayStepRun(ctx context.Context, tenantId, stepRunId string, input []byte) (*dbsqlc.GetStepRunForEngineRow, error)
|
||||
|
||||
@@ -212,6 +217,8 @@ type StepRunEngineRepository interface {
|
||||
|
||||
ProcessStepRunUpdates(ctx context.Context, qlp *zerolog.Logger, tenantId string) (ProcessStepRunUpdatesResult, error)
|
||||
|
||||
ProcessStepRunUpdatesV2(ctx context.Context, qlp *zerolog.Logger, tenantId string) (ProcessStepRunUpdatesResultV2, error)
|
||||
|
||||
QueueStepRuns(ctx context.Context, ql *zerolog.Logger, tenantId string) (QueueStepRunsResult, error)
|
||||
|
||||
CleanupQueueItems(ctx context.Context, tenantId string) error
|
||||
|
||||
@@ -0,0 +1,2 @@
|
||||
-- AlterTable
|
||||
ALTER TABLE "Event" ADD COLUMN "insertOrder" INTEGER;
|
||||
@@ -0,0 +1,2 @@
|
||||
-- AlterEnum
|
||||
ALTER TYPE "InternalQueue" ADD VALUE 'STEP_RUN_UPDATE_V2';
|
||||
@@ -1345,6 +1345,7 @@ model QueueItem {
|
||||
enum InternalQueue {
|
||||
WORKER_SEMAPHORE_COUNT
|
||||
STEP_RUN_UPDATE
|
||||
STEP_RUN_UPDATE_V2
|
||||
WORKFLOW_RUN_UPDATE
|
||||
WORKFLOW_RUN_PAUSED
|
||||
}
|
||||
|
||||
@@ -0,0 +1,2 @@
|
||||
-- Add value to enum type: "InternalQueue"
|
||||
ALTER TYPE "InternalQueue" ADD VALUE 'STEP_RUN_UPDATE_V2';
|
||||
@@ -1,4 +1,4 @@
|
||||
h1:/mlnkRldhqWkqHDSSy6fuNu9YCj10/Lvs2b1GhhKXX4=
|
||||
h1:MdUIwxqQIsS9pvLCRU1mB2nai1wTMJ7Vjl3FRjCr6uw=
|
||||
20240115180414_init.sql h1:Ef3ZyjAHkmJPdGF/dEWCahbwgcg6uGJKnDxW2JCRi2k=
|
||||
20240122014727_v0_6_0.sql h1:o/LdlteAeFgoHJ3e/M4Xnghqt9826IE/Y/h0q95Acuo=
|
||||
20240126235456_v0_7_0.sql h1:KiVzt/hXgQ6esbdC6OMJOOWuYEXmy1yeCpmsVAHTFKs=
|
||||
@@ -63,3 +63,4 @@ h1:/mlnkRldhqWkqHDSSy6fuNu9YCj10/Lvs2b1GhhKXX4=
|
||||
20240928144316_v0.48.0.sql h1:TX7/rN8ghoysQXy2OAMX+LdSIEA6tbdd4BZfL9wtxFQ=
|
||||
20240930202706_v0.48.1.sql h1:CcgVHTRA4c9u5rQvSFFy/R9AdiSdTapqfpyUs0KGAf0=
|
||||
20240930233257_v0.49.0.sql h1:B+JMbME62DxaCnesydvQXPg+ZNB0kB/V8gSclh1VdY4=
|
||||
20241004122206_v0.49.1.sql h1:Fas5TXOp4a2g+y5sGBJG9wTaVL/WCaVJ9+ZlASN9Md4=
|
||||
|
||||
@@ -2,7 +2,7 @@
|
||||
CREATE TYPE "ConcurrencyLimitStrategy" AS ENUM ('CANCEL_IN_PROGRESS', 'DROP_NEWEST', 'QUEUE_NEWEST', 'GROUP_ROUND_ROBIN');
|
||||
|
||||
-- CreateEnum
|
||||
CREATE TYPE "InternalQueue" AS ENUM ('WORKER_SEMAPHORE_COUNT', 'STEP_RUN_UPDATE', 'WORKFLOW_RUN_UPDATE', 'WORKFLOW_RUN_PAUSED');
|
||||
CREATE TYPE "InternalQueue" AS ENUM ('WORKER_SEMAPHORE_COUNT', 'STEP_RUN_UPDATE', 'WORKFLOW_RUN_UPDATE', 'WORKFLOW_RUN_PAUSED', 'STEP_RUN_UPDATE_V2');
|
||||
|
||||
-- CreateEnum
|
||||
CREATE TYPE "InviteLinkStatus" AS ENUM ('PENDING', 'ACCEPTED', 'REJECTED');
|
||||
|
||||
Reference in New Issue
Block a user