diff --git a/internal/queueutils/pool.go b/internal/queueutils/pool.go
index 04e1b7830..bdfe6d3d7 100644
--- a/internal/queueutils/pool.go
+++ b/internal/queueutils/pool.go
@@ -4,15 +4,18 @@ import (
 	"sync"
 	"time"
 
+	"github.com/hatchet-dev/hatchet/pkg/repository/prisma/dbsqlc"
+	"github.com/hatchet-dev/hatchet/pkg/repository/prisma/sqlchelpers"
 	"github.com/rs/zerolog"
 )
 
 type OperationPool struct {
-	ops         sync.Map
-	timeout     time.Duration
-	description string
-	method      OpMethod
-	ql          *zerolog.Logger
+	ops          sync.Map
+	timeout      time.Duration
+	description  string
+	method       OpMethod
+	ql           *zerolog.Logger
+	setTenantsMu sync.RWMutex
 }
 
 func NewOperationPool(ql *zerolog.Logger, timeout time.Duration, description string, method OpMethod) *OperationPool {
@@ -24,11 +27,42 @@ func NewOperationPool(ql *zerolog.Logger, timeout time.Duration, description str
 	}
 }
 
+// SetTenants removes from the pool every operation whose key is not the ID of
+// one of the given tenants. It holds the write lock for the whole pass, so
+// lookups through GetOperation wait until pruning has finished.
+func (p *OperationPool) SetTenants(tenants []*dbsqlc.Tenant) {
+	p.setTenantsMu.Lock()
+	defer p.setTenantsMu.Unlock()
+
+	tenantMap := make(map[string]bool)
+
+	for _, t := range tenants {
+		tenantMap[sqlchelpers.UUIDToStr(t.ID)] = true
+	}
+
+	// delete tenants that are not in the list
+	p.ops.Range(func(key, value interface{}) bool {
+		if _, ok := tenantMap[key.(string)]; !ok {
+			p.ops.Delete(key)
+		}
+
+		return true
+	})
+}
+
+// RunOrContinue fetches the operation for id via GetOperation and calls its RunOrContinue.
+//
+// It deliberately does not take setTenantsMu itself: GetOperation already
+// acquires the read lock, and sync.RWMutex prohibits recursive read locking
+// (a SetTenants call queued between the two RLocks would deadlock).
 func (p *OperationPool) RunOrContinue(id string) {
 	p.GetOperation(id).RunOrContinue(p.ql)
 }
 
 func (p *OperationPool) GetOperation(id string) *SerialOperation {
+	p.setTenantsMu.RLock()
+	defer p.setTenantsMu.RUnlock()
+
 	op, ok := p.ops.Load(id)
 
 	if !ok {
diff --git a/internal/services/controllers/jobs/controller.go b/internal/services/controllers/jobs/controller.go
index ac25b48d9..b8eac5e15 100644
--- a/internal/services/controllers/jobs/controller.go
+++ b/internal/services/controllers/jobs/controller.go
@@ -976,7 +976,7 @@ func (ec *JobsControllerImpl) handleStepRunFailed(ctx context.Context, task *msg
 }
 
 func (ec *JobsControllerImpl) failStepRun(ctx context.Context, tenantId, stepRunId, errorReason string, failedAt time.Time) error {
-	stepRun, err := ec.repo.StepRun().GetStepRunForEngine(ctx, tenantId, stepRunId)
+	oldStepRun, err := ec.repo.StepRun().GetStepRunForEngine(ctx, tenantId, stepRunId)
 
 	if err != nil {
 		return fmt.Errorf("could not get step run: %w", err)
@@ -986,7 +986,7 @@ func (ec *JobsControllerImpl) failStepRun(ctx context.Context, tenantId, stepRun
 	defer ec.checkTenantQueue(ctx, tenantId)
 
 	// determine if step run should be retried or not
-	shouldRetry := stepRun.SRRetryCount < stepRun.StepRetries
+	shouldRetry := oldStepRun.SRRetryCount < oldStepRun.StepRetries
 
 	if shouldRetry {
 		eventMessage := fmt.Sprintf("Step run failed on %s", failedAt.Format(time.RFC1123))
@@ -995,7 +995,7 @@ func (ec *JobsControllerImpl) failStepRun(ctx context.Context, tenantId, stepRun
 
 		if errorReason == "TIMED_OUT" {
 			eventReason = dbsqlc.StepRunEventReasonTIMEDOUT
-			eventMessage = fmt.Sprintf("Step exceeded timeout duration (%s)", stepRun.StepTimeout.String)
+			eventMessage = fmt.Sprintf("Step exceeded timeout duration (%s)", oldStepRun.StepTimeout.String)
 		}
 
 		eventMessage += ", and will be retried."
@@ -1005,7 +1005,7 @@ func (ec *JobsControllerImpl) failStepRun(ctx context.Context, tenantId, stepRun EventMessage: repository.StringPtr(eventMessage), EventSeverity: repository.StepRunEventSeverityPtr(dbsqlc.StepRunEventSeverityCRITICAL), EventData: map[string]interface{}{ - "retry_count": stepRun.SRRetryCount, + "retry_count": oldStepRun.SRRetryCount, }, }) @@ -1013,19 +1013,12 @@ func (ec *JobsControllerImpl) failStepRun(ctx context.Context, tenantId, stepRun return ec.mq.AddMessage( ctx, msgqueue.JOB_PROCESSING_QUEUE, - tasktypes.StepRunRetryToTask(stepRun, nil, errorReason), + tasktypes.StepRunRetryToTask(oldStepRun, nil, errorReason), ) } - // get the old step run to figure out the worker and dispatcher id, before we update the step run - oldStepRun, err := ec.repo.StepRun().GetStepRunForEngine(ctx, tenantId, stepRunId) - - if err != nil { - return fmt.Errorf("could not get step run: %w", err) - } - // fail step run - err = ec.repo.StepRun().StepRunFailed(ctx, tenantId, stepRunId, failedAt, errorReason) + err = ec.repo.StepRun().StepRunFailed(ctx, tenantId, stepRunId, failedAt, errorReason, int(oldStepRun.SRRetryCount)) if err != nil { return fmt.Errorf("could not fail step run: %w", err) diff --git a/internal/services/controllers/jobs/queue.go b/internal/services/controllers/jobs/queue.go index 477ff9eed..2e4793930 100644 --- a/internal/services/controllers/jobs/queue.go +++ b/internal/services/controllers/jobs/queue.go @@ -35,9 +35,10 @@ type queue struct { // a custom queue logger ql *zerolog.Logger - tenantQueueOperations *queueutils.OperationPool - updateStepRunOperations *queueutils.OperationPool - timeoutStepRunOperations *queueutils.OperationPool + tenantQueueOperations *queueutils.OperationPool + updateStepRunOperations *queueutils.OperationPool + updateStepRunV2Operations *queueutils.OperationPool + timeoutStepRunOperations *queueutils.OperationPool } func newQueue( @@ -68,6 +69,7 @@ func newQueue( q.tenantQueueOperations = 
queueutils.NewOperationPool(ql, time.Second*5, "check tenant queue", q.scheduleStepRuns) q.updateStepRunOperations = queueutils.NewOperationPool(ql, time.Second*30, "update step runs", q.processStepRunUpdates) + q.updateStepRunV2Operations = queueutils.NewOperationPool(ql, time.Second*30, "update step runs (v2)", q.processStepRunUpdatesV2) q.timeoutStepRunOperations = queueutils.NewOperationPool(ql, time.Second*30, "timeout step runs", q.processStepRunTimeouts) return q, nil @@ -114,6 +116,18 @@ func (q *queue) Start() (func() error, error) { return nil, fmt.Errorf("could not schedule step run timeout: %w", err) } + _, err = q.s.NewJob( + gocron.DurationJob(time.Second*1), + gocron.NewTask( + q.runTenantUpdateStepRunsV2(ctx), + ), + ) + + if err != nil { + cancel() + return nil, fmt.Errorf("could not schedule step run update (v2): %w", err) + } + q.s.Start() postAck := func(task *msgqueue.Message) error { @@ -192,6 +206,7 @@ func (q *queue) handleCheckQueue(ctx context.Context, task *msgqueue.Message) er // if this tenant is registered, then we should check the queue q.tenantQueueOperations.RunOrContinue(metadata.TenantId) q.updateStepRunOperations.RunOrContinue(metadata.TenantId) + q.updateStepRunV2Operations.RunOrContinue(metadata.TenantId) return nil } @@ -208,6 +223,8 @@ func (q *queue) runTenantQueues(ctx context.Context) func() { return } + q.tenantQueueOperations.SetTenants(tenants) + for i := range tenants { tenantId := sqlchelpers.UUIDToStr(tenants[i].ID) @@ -273,6 +290,8 @@ func (q *queue) runTenantUpdateStepRuns(ctx context.Context) func() { return } + q.updateStepRunOperations.SetTenants(tenants) + for i := range tenants { tenantId := sqlchelpers.UUIDToStr(tenants[i].ID) @@ -356,6 +375,64 @@ func (q *queue) processStepRunUpdates(ctx context.Context, tenantId string) (boo return res.Continue, nil } +func (q *queue) runTenantUpdateStepRunsV2(ctx context.Context) func() { + return func() { + q.l.Debug().Msgf("partition: updating step run statuses (v2)") + 
+ // list all tenants + tenants, err := q.repo.Tenant().ListTenantsByControllerPartition(ctx, q.p.GetControllerPartitionId()) + + if err != nil { + q.l.Err(err).Msg("could not list tenants") + return + } + + q.updateStepRunV2Operations.SetTenants(tenants) + + for i := range tenants { + tenantId := sqlchelpers.UUIDToStr(tenants[i].ID) + + q.updateStepRunV2Operations.RunOrContinue(tenantId) + } + } +} + +func (q *queue) processStepRunUpdatesV2(ctx context.Context, tenantId string) (bool, error) { + ctx, span := telemetry.NewSpan(ctx, "process-step-run-updates-v2") + defer span.End() + + dbCtx, cancel := context.WithTimeout(ctx, 300*time.Second) + defer cancel() + + res, err := q.repo.StepRun().ProcessStepRunUpdatesV2(dbCtx, q.ql, tenantId) + + if err != nil { + return false, fmt.Errorf("could not process step run updates (v2): %w", err) + } + + // for all finished workflow runs, send a message + for _, finished := range res.CompletedWorkflowRuns { + workflowRunId := sqlchelpers.UUIDToStr(finished.ID) + status := string(finished.Status) + + err := q.mq.AddMessage( + context.Background(), + msgqueue.WORKFLOW_PROCESSING_QUEUE, + tasktypes.WorkflowRunFinishedToTask( + tenantId, + workflowRunId, + status, + ), + ) + + if err != nil { + q.l.Error().Err(err).Msg("could not add workflow run finished task to task queue (v2)") + } + } + + return res.Continue, nil +} + func (q *queue) runTenantTimeoutStepRuns(ctx context.Context) func() { return func() { q.l.Debug().Msgf("partition: running timeout for step runs") @@ -368,6 +445,8 @@ func (q *queue) runTenantTimeoutStepRuns(ctx context.Context) func() { return } + q.timeoutStepRunOperations.SetTenants(tenants) + for i := range tenants { tenantId := sqlchelpers.UUIDToStr(tenants[i].ID) diff --git a/pkg/config/server/server.go b/pkg/config/server/server.go index f34e196c5..25aa52bca 100644 --- a/pkg/config/server/server.go +++ b/pkg/config/server/server.go @@ -104,6 +104,12 @@ type ConfigFileRuntime struct { // QueueLimit is the 
limit of items to return from a single queue at a time SingleQueueLimit int `mapstructure:"singleQueueLimit" json:"singleQueueLimit,omitempty" default:"100"` + // FlushPeriodMilliseconds is the number of milliseconds before flush + FlushPeriodMilliseconds int `mapstructure:"flushPeriodMilliseconds" json:"flushPeriodMilliseconds,omitempty" default:"10"` + + // FlushItemsThreshold is the number of items to hold in memory until flushing to the database + FlushItemsThreshold int `mapstructure:"flushItemsThreshold" json:"flushItemsThreshold,omitempty" default:"100"` + // Allow new tenants to be created AllowSignup bool `mapstructure:"allowSignup" json:"allowSignup,omitempty" default:"true"` @@ -485,6 +491,8 @@ func BindAllEnv(v *viper.Viper) { _ = v.BindEnv("msgQueue.rabbitmq.qos", "SERVER_MSGQUEUE_RABBITMQ_QOS") _ = v.BindEnv("runtime.requeueLimit", "SERVER_REQUEUE_LIMIT") _ = v.BindEnv("runtime.singleQueueLimit", "SERVER_SINGLE_QUEUE_LIMIT") + _ = v.BindEnv("runtime.flushPeriodMilliseconds", "SERVER_FLUSH_PERIOD_MILLISECONDS") + _ = v.BindEnv("runtime.flushItemsThreshold", "SERVER_FLUSH_ITEMS_THRESHOLD") // tls options _ = v.BindEnv("tls.tlsStrategy", "SERVER_TLS_STRATEGY") diff --git a/pkg/repository/prisma/buffered.go b/pkg/repository/prisma/buffered.go index 9fe55ceda..4984252d3 100644 --- a/pkg/repository/prisma/buffered.go +++ b/pkg/repository/prisma/buffered.go @@ -139,10 +139,12 @@ func (b *IngestBuf[T, U]) buffWorker() { b.safeAppendInternalArray(e) b.safeIncSizeOfData(b.calcSizeOfData([]T{e.item})) - if b.safeCheckSizeOfBuffer() >= b.maxCapacity || b.safeFetchSizeOfData() >= b.maxDataSizeInQueue { + // if last flush time + flush period is in the past, flush + if time.Now().After(b.safeFetchLastFlush().Add(b.flushPeriod)) { + b.flush(b.sliceInternalArray()) + } else if b.safeCheckSizeOfBuffer() >= b.maxCapacity || b.safeFetchSizeOfData() >= b.maxDataSizeInQueue { b.flush(b.sliceInternalArray()) } - case 
<-time.After(time.Until(b.safeFetchLastFlush().Add(b.flushPeriod))): b.flush(b.sliceInternalArray()) diff --git a/pkg/repository/prisma/db/db_gen.go b/pkg/repository/prisma/db/db_gen.go index c63e60024..65fd52348 100644 --- a/pkg/repository/prisma/db/db_gen.go +++ b/pkg/repository/prisma/db/db_gen.go @@ -1419,6 +1419,7 @@ model QueueItem { enum InternalQueue { WORKER_SEMAPHORE_COUNT STEP_RUN_UPDATE + STEP_RUN_UPDATE_V2 WORKFLOW_RUN_UPDATE WORKFLOW_RUN_PAUSED } @@ -2271,6 +2272,7 @@ type InternalQueue string const ( InternalQueueWorkerSemaphoreCount InternalQueue = "WORKER_SEMAPHORE_COUNT" InternalQueueStepRunUpdate InternalQueue = "STEP_RUN_UPDATE" + InternalQueueStepRunUpdateV2 InternalQueue = "STEP_RUN_UPDATE_V2" InternalQueueWorkflowRunUpdate InternalQueue = "WORKFLOW_RUN_UPDATE" InternalQueueWorkflowRunPaused InternalQueue = "WORKFLOW_RUN_PAUSED" ) diff --git a/pkg/repository/prisma/dbsqlc/models.go b/pkg/repository/prisma/dbsqlc/models.go index aacde3531..b248b0161 100644 --- a/pkg/repository/prisma/dbsqlc/models.go +++ b/pkg/repository/prisma/dbsqlc/models.go @@ -62,6 +62,7 @@ const ( InternalQueueSTEPRUNUPDATE InternalQueue = "STEP_RUN_UPDATE" InternalQueueWORKFLOWRUNUPDATE InternalQueue = "WORKFLOW_RUN_UPDATE" InternalQueueWORKFLOWRUNPAUSED InternalQueue = "WORKFLOW_RUN_PAUSED" + InternalQueueSTEPRUNUPDATEV2 InternalQueue = "STEP_RUN_UPDATE_V2" ) func (e *InternalQueue) Scan(src interface{}) error { diff --git a/pkg/repository/prisma/dbsqlc/queue.sql b/pkg/repository/prisma/dbsqlc/queue.sql index b565d6387..59e728305 100644 --- a/pkg/repository/prisma/dbsqlc/queue.sql +++ b/pkg/repository/prisma/dbsqlc/queue.sql @@ -191,14 +191,16 @@ INSERT INTO "priority" ) SELECT - @queue::"InternalQueue", + input."queue", true, input."data", - @tenantId::uuid, + input."tenantId", 1 FROM ( SELECT - unnest(@datas::json[]) AS "data" + unnest(cast(@queues::text[] as"InternalQueue"[])) AS "queue", + unnest(@datas::json[]) AS "data", + unnest(@tenantIds::uuid[]) AS "tenantId" 
) AS input ON CONFLICT DO NOTHING; diff --git a/pkg/repository/prisma/dbsqlc/queue.sql.go b/pkg/repository/prisma/dbsqlc/queue.sql.go index e47a6256d..359920ecd 100644 --- a/pkg/repository/prisma/dbsqlc/queue.sql.go +++ b/pkg/repository/prisma/dbsqlc/queue.sql.go @@ -95,26 +95,28 @@ INSERT INTO "priority" ) SELECT - $1::"InternalQueue", + input."queue", true, input."data", - $2::uuid, + input."tenantId", 1 FROM ( SELECT - unnest($3::json[]) AS "data" + unnest(cast($1::text[] as"InternalQueue"[])) AS "queue", + unnest($2::json[]) AS "data", + unnest($3::uuid[]) AS "tenantId" ) AS input ON CONFLICT DO NOTHING ` type CreateInternalQueueItemsBulkParams struct { - Queue InternalQueue `json:"queue"` - Tenantid pgtype.UUID `json:"tenantid"` - Datas [][]byte `json:"datas"` + Queues []string `json:"queues"` + Datas [][]byte `json:"datas"` + Tenantids []pgtype.UUID `json:"tenantids"` } func (q *Queries) CreateInternalQueueItemsBulk(ctx context.Context, db DBTX, arg CreateInternalQueueItemsBulkParams) error { - _, err := db.Exec(ctx, createInternalQueueItemsBulk, arg.Queue, arg.Tenantid, arg.Datas) + _, err := db.Exec(ctx, createInternalQueueItemsBulk, arg.Queues, arg.Datas, arg.Tenantids) return err } diff --git a/pkg/repository/prisma/dbsqlc/schema.sql b/pkg/repository/prisma/dbsqlc/schema.sql index 98901b22e..003d36d60 100644 --- a/pkg/repository/prisma/dbsqlc/schema.sql +++ b/pkg/repository/prisma/dbsqlc/schema.sql @@ -2,7 +2,7 @@ CREATE TYPE "ConcurrencyLimitStrategy" AS ENUM ('CANCEL_IN_PROGRESS', 'DROP_NEWEST', 'QUEUE_NEWEST', 'GROUP_ROUND_ROBIN'); -- CreateEnum -CREATE TYPE "InternalQueue" AS ENUM ('WORKER_SEMAPHORE_COUNT', 'STEP_RUN_UPDATE', 'WORKFLOW_RUN_UPDATE', 'WORKFLOW_RUN_PAUSED'); +CREATE TYPE "InternalQueue" AS ENUM ('WORKER_SEMAPHORE_COUNT', 'STEP_RUN_UPDATE', 'WORKFLOW_RUN_UPDATE', 'WORKFLOW_RUN_PAUSED', 'STEP_RUN_UPDATE_V2'); -- CreateEnum CREATE TYPE "InviteLinkStatus" AS ENUM ('PENDING', 'ACCEPTED', 'REJECTED'); diff --git 
a/pkg/repository/prisma/repository.go b/pkg/repository/prisma/repository.go index 15f43fb01..18e7003b7 100644 --- a/pkg/repository/prisma/repository.go +++ b/pkg/repository/prisma/repository.go @@ -288,6 +288,8 @@ func NewEngineRepository(pool *pgxpool.Pool, queuePool *pgxpool.Pool, cf *server f(opts) } + setDefaults(cf) + newLogger := opts.l.With().Str("service", "database").Logger() opts.l = &newLogger @@ -298,9 +300,23 @@ func NewEngineRepository(pool *pgxpool.Pool, queuePool *pgxpool.Pool, cf *server rlCache := cache.New(5 * time.Minute) eventEngine, cleanupEventEngine, err := NewEventEngineRepository(pool, opts.v, opts.l, opts.metered) - return func() error { + if err != nil { + return nil, nil, err + } + stepRunEngine, cleanupStepRunEngine, err := NewStepRunEngineRepository(queuePool, opts.v, opts.l, cf, rlCache) + + if err != nil { + return nil, nil, err + } + + return func() error { rlCache.Stop() + + if err := cleanupStepRunEngine(); err != nil { + return err + } + return cleanupEventEngine() }, &engineRepository{ @@ -310,14 +326,14 @@ func NewEngineRepository(pool *pgxpool.Pool, queuePool *pgxpool.Pool, cf *server event: eventEngine, getGroupKeyRun: NewGetGroupKeyRunRepository(pool, opts.v, opts.l), jobRun: NewJobRunEngineRepository(pool, opts.v, opts.l), - stepRun: NewStepRunEngineRepository(queuePool, opts.v, opts.l, cf, rlCache), + stepRun: stepRunEngine, step: NewStepRepository(pool, opts.v, opts.l), tenant: NewTenantEngineRepository(pool, opts.v, opts.l, opts.cache), tenantAlerting: NewTenantAlertingEngineRepository(pool, opts.v, opts.l, opts.cache), ticker: NewTickerRepository(pool, opts.v, opts.l), worker: NewWorkerEngineRepository(pool, opts.v, opts.l, opts.metered), workflow: NewWorkflowEngineRepository(pool, opts.v, opts.l, opts.metered), - workflowRun: NewWorkflowRunEngineRepository(pool, opts.v, opts.l, opts.metered), + workflowRun: NewWorkflowRunEngineRepository(stepRunEngine, pool, opts.v, opts.l, opts.metered), streamEvent: 
NewStreamEventsEngineRepository(pool, opts.v, opts.l), log: NewLogEngineRepository(pool, opts.v, opts.l), rateLimit: NewRateLimitEngineRepository(pool, opts.v, opts.l), diff --git a/pkg/repository/prisma/step_run.go b/pkg/repository/prisma/step_run.go index 4ed757452..b5f96a9c8 100644 --- a/pkg/repository/prisma/step_run.go +++ b/pkg/repository/prisma/step_run.go @@ -264,12 +264,23 @@ type stepRunEngineRepository struct { cachedMinQueuedIds sync.Map cachedStepIdHasRateLimit *cache.Cache callbacks []repository.Callback[*dbsqlc.ResolveWorkflowRunStatusRow] + + bulkStatusBuffer *TenantBufferManager[*updateStepRunQueueData, pgtype.UUID] + bulkEventBuffer *TenantBufferManager[*updateStepRunQueueData, int] } -func NewStepRunEngineRepository(pool *pgxpool.Pool, v validator.Validator, l *zerolog.Logger, cf *server.ConfigFileRuntime, rlCache *cache.Cache) repository.StepRunEngineRepository { +func (s *stepRunEngineRepository) cleanup() error { + if err := s.bulkStatusBuffer.cleanup(); err != nil { + return err + } + + return s.bulkEventBuffer.cleanup() +} + +func NewStepRunEngineRepository(pool *pgxpool.Pool, v validator.Validator, l *zerolog.Logger, cf *server.ConfigFileRuntime, rlCache *cache.Cache) (*stepRunEngineRepository, func() error, error) { queries := dbsqlc.New() - return &stepRunEngineRepository{ + s := &stepRunEngineRepository{ pool: pool, v: v, l: l, @@ -277,6 +288,344 @@ func NewStepRunEngineRepository(pool *pgxpool.Pool, v validator.Validator, l *ze cf: cf, cachedStepIdHasRateLimit: rlCache, } + + err := s.startBuffers() + + if err != nil { + l.Err(err).Msg("could not start buffers") + return nil, nil, err + } + + return s, s.cleanup, nil +} + +func sizeOfUpdateData(item *updateStepRunQueueData) int { + size := len(item.Output) + len(item.StepRunId) + + if item.Event != nil && item.Event.EventMessage != nil { + eventLength := len(*item.Event.EventMessage) + size += eventLength + } + + if item.Error != nil { + errorLength := len(*item.Error) + size += 
errorLength + } + + return size +} + +func (s *stepRunEngineRepository) startBuffers() error { + statusBufOpts := TenantBufManagerOpts[*updateStepRunQueueData, pgtype.UUID]{ + OutputFunc: s.bulkUpdateStepRunStatuses, + SizeFunc: sizeOfUpdateData, + L: s.l, + V: s.v, + } + + var err error + s.bulkStatusBuffer, err = NewTenantBufManager(statusBufOpts) + + if err != nil { + return err + } + + eventBufOpts := TenantBufManagerOpts[*updateStepRunQueueData, int]{ + OutputFunc: s.bulkWriteStepRunEvents, + SizeFunc: sizeOfUpdateData, + L: s.l, + V: s.v, + } + + s.bulkEventBuffer, err = NewTenantBufManager(eventBufOpts) + + return err +} + +func (s *stepRunEngineRepository) bulkUpdateStepRunStatuses(ctx context.Context, opts []*updateStepRunQueueData) ([]pgtype.UUID, error) { + startParams := dbsqlc.BulkStartStepRunParams{} + failParams := dbsqlc.BulkFailStepRunParams{} + cancelParams := dbsqlc.BulkCancelStepRunParams{} + finishParams := dbsqlc.BulkFinishStepRunParams{} + stepRunIds := make([]pgtype.UUID, 0, len(opts)) + + eventTimeSeen := make([]pgtype.Timestamp, 0, len(opts)) + eventReasons := make([]dbsqlc.StepRunEventReason, 0, len(opts)) + eventStepRunIds := make([]pgtype.UUID, 0, len(opts)) + eventSeverities := make([]dbsqlc.StepRunEventSeverity, 0, len(opts)) + eventMessages := make([]string, 0, len(opts)) + eventData := make([]map[string]interface{}, 0, len(opts)) + + for _, item := range opts { + stepRunId := sqlchelpers.UUIDFromStr(item.StepRunId) + stepRunIds = append(stepRunIds, stepRunId) + + if item.Status == nil { + continue + } + + switch dbsqlc.StepRunStatus(*item.Status) { + case dbsqlc.StepRunStatusRUNNING: + startParams.Steprunids = append(startParams.Steprunids, stepRunId) + startParams.Startedats = append(startParams.Startedats, sqlchelpers.TimestampFromTime(*item.StartedAt)) + case dbsqlc.StepRunStatusFAILED: + failParams.Steprunids = append(failParams.Steprunids, stepRunId) + failParams.Finishedats = append(failParams.Finishedats, 
sqlchelpers.TimestampFromTime(*item.FinishedAt)) + failParams.Errors = append(failParams.Errors, *item.Error) + case dbsqlc.StepRunStatusCANCELLED: + cancelParams.Steprunids = append(cancelParams.Steprunids, stepRunId) + cancelParams.Cancelledats = append(cancelParams.Cancelledats, sqlchelpers.TimestampFromTime(*item.CancelledAt)) + cancelParams.Finishedats = append(cancelParams.Finishedats, sqlchelpers.TimestampFromTime(*item.CancelledAt)) + cancelParams.Cancelledreasons = append(cancelParams.Cancelledreasons, *item.CancelledReason) + case dbsqlc.StepRunStatusSUCCEEDED: + finishParams.Steprunids = append(finishParams.Steprunids, stepRunId) + finishParams.Finishedats = append(finishParams.Finishedats, sqlchelpers.TimestampFromTime(*item.FinishedAt)) + finishParams.Outputs = append(finishParams.Outputs, item.Output) + } + + switch dbsqlc.StepRunStatus(*item.Status) { + case dbsqlc.StepRunStatusRUNNING: + eventStepRunIds = append(eventStepRunIds, stepRunId) + eventTimeSeen = append(eventTimeSeen, sqlchelpers.TimestampFromTime(*item.StartedAt)) + eventReasons = append(eventReasons, dbsqlc.StepRunEventReasonSTARTED) + eventSeverities = append(eventSeverities, dbsqlc.StepRunEventSeverityINFO) + eventMessages = append(eventMessages, fmt.Sprintf("Step run started at %s", item.StartedAt.Format(time.RFC1123))) + eventData = append(eventData, map[string]interface{}{}) + case dbsqlc.StepRunStatusFAILED: + eventTimeSeen = append(eventTimeSeen, sqlchelpers.TimestampFromTime(*item.FinishedAt)) + + eventStepRunIds = append(eventStepRunIds, stepRunId) + eventMessage := fmt.Sprintf("Step run failed on %s", item.FinishedAt.Format(time.RFC1123)) + eventReason := dbsqlc.StepRunEventReasonFAILED + + if item.Error != nil && *item.Error == "TIMED_OUT" { + eventReason = dbsqlc.StepRunEventReasonTIMEDOUT + eventMessage = "Step exceeded timeout duration" + } + + eventReasons = append(eventReasons, eventReason) + eventSeverities = append(eventSeverities, dbsqlc.StepRunEventSeverityCRITICAL) 
+ eventMessages = append(eventMessages, eventMessage) + eventData = append(eventData, map[string]interface{}{ + "retry_count": item.RetryCount, + }) + case dbsqlc.StepRunStatusCANCELLED: + eventTimeSeen = append(eventTimeSeen, sqlchelpers.TimestampFromTime(*item.CancelledAt)) + eventStepRunIds = append(eventStepRunIds, stepRunId) + eventReasons = append(eventReasons, dbsqlc.StepRunEventReasonCANCELLED) + eventSeverities = append(eventSeverities, dbsqlc.StepRunEventSeverityWARNING) + eventMessages = append(eventMessages, fmt.Sprintf("Step run was cancelled on %s for the following reason: %s", item.CancelledAt.Format(time.RFC1123), *item.CancelledReason)) + eventData = append(eventData, map[string]interface{}{}) + case dbsqlc.StepRunStatusSUCCEEDED: + eventTimeSeen = append(eventTimeSeen, sqlchelpers.TimestampFromTime(*item.FinishedAt)) + eventStepRunIds = append(eventStepRunIds, stepRunId) + eventReasons = append(eventReasons, dbsqlc.StepRunEventReasonFINISHED) + eventSeverities = append(eventSeverities, dbsqlc.StepRunEventSeverityINFO) + eventMessages = append(eventMessages, fmt.Sprintf("Step run finished at %s", item.FinishedAt.Format(time.RFC1123))) + eventData = append(eventData, map[string]interface{}{}) + } + } + + tx, commit, rollback, err := prepareTx(ctx, s.pool, s.l, 25000) + + if err != nil { + return nil, err + } + + defer rollback() + + if len(startParams.Steprunids) > 0 { + err = s.queries.BulkStartStepRun(ctx, tx, startParams) + + if err != nil { + return nil, fmt.Errorf("could not start step runs: %w", err) + } + } + + if len(failParams.Steprunids) > 0 { + err = s.queries.BulkFailStepRun(ctx, tx, failParams) + + if err != nil { + return nil, fmt.Errorf("could not fail step runs: %w", err) + } + } + + if len(cancelParams.Steprunids) > 0 { + err = s.queries.BulkCancelStepRun(ctx, tx, cancelParams) + + if err != nil { + return nil, fmt.Errorf("could not cancel step runs: %w", err) + } + } + + if len(finishParams.Steprunids) > 0 { + err = 
s.queries.BulkFinishStepRun(ctx, tx, finishParams) + + if err != nil { + return nil, fmt.Errorf("could not finish step runs: %w", err) + } + } + + insertInternalQITenantIds := make([]pgtype.UUID, 0, len(opts)) + insertInternalQIQueues := make([]dbsqlc.InternalQueue, 0, len(opts)) + insertInternalQIData := make([]any, 0, len(opts)) + + for _, item := range opts { + itemCp := item + + insertInternalQITenantIds = append(insertInternalQITenantIds, sqlchelpers.UUIDFromStr(itemCp.TenantId)) + insertInternalQIQueues = append(insertInternalQIQueues, dbsqlc.InternalQueueSTEPRUNUPDATEV2) + insertInternalQIData = append(insertInternalQIData, itemCp) + } + + err = bulkInsertInternalQueueItem( + ctx, + tx, + s.queries, + insertInternalQITenantIds, + insertInternalQIQueues, + insertInternalQIData, + ) + + if err != nil { + return nil, err + } + + err = commit(ctx) + + if err != nil { + return nil, err + } + + bulkStepRunEvents( + ctx, + s.l, + s.pool, + s.queries, + eventStepRunIds, + eventTimeSeen, + eventReasons, + eventSeverities, + eventMessages, + eventData, + ) + + return stepRunIds, nil +} + +func (s *stepRunEngineRepository) bulkWriteStepRunEvents(ctx context.Context, opts []*updateStepRunQueueData) ([]int, error) { + res := make([]int, 0, len(opts)) + eventTimeSeen := make([]pgtype.Timestamp, 0, len(opts)) + eventReasons := make([]dbsqlc.StepRunEventReason, 0, len(opts)) + eventStepRunIds := make([]pgtype.UUID, 0, len(opts)) + eventSeverities := make([]dbsqlc.StepRunEventSeverity, 0, len(opts)) + eventMessages := make([]string, 0, len(opts)) + eventData := make([]map[string]interface{}, 0, len(opts)) + dedupe := make(map[string]bool) + + for i, item := range opts { + stepRunId := sqlchelpers.UUIDFromStr(item.StepRunId) + res = append(res, i) + + if item.Event != nil { + if item.Event.EventMessage == nil || item.Event.EventReason == nil { + continue + } + + dedupeKey := fmt.Sprintf("EVENT-%s-%s", item.StepRunId, *item.Event.EventReason) + + if _, ok := dedupe[dedupeKey]; 
ok { + continue + } + + dedupe[dedupeKey] = true + + eventStepRunIds = append(eventStepRunIds, stepRunId) + eventMessages = append(eventMessages, *item.Event.EventMessage) + eventReasons = append(eventReasons, *item.Event.EventReason) + + if item.Event.EventSeverity != nil { + eventSeverities = append(eventSeverities, *item.Event.EventSeverity) + } else { + eventSeverities = append(eventSeverities, dbsqlc.StepRunEventSeverityINFO) + } + + if item.Event.EventData != nil { + eventData = append(eventData, item.Event.EventData) + } else { + eventData = append(eventData, map[string]interface{}{}) + } + + if item.Event.Timestamp != nil { + eventTimeSeen = append(eventTimeSeen, sqlchelpers.TimestampFromTime(*item.Event.Timestamp)) + } else { + eventTimeSeen = append(eventTimeSeen, sqlchelpers.TimestampFromTime(time.Now().UTC())) + } + + continue + } + + if item.Status == nil { + continue + } + + switch dbsqlc.StepRunStatus(*item.Status) { + case dbsqlc.StepRunStatusRUNNING: + eventStepRunIds = append(eventStepRunIds, stepRunId) + eventTimeSeen = append(eventTimeSeen, sqlchelpers.TimestampFromTime(*item.StartedAt)) + eventReasons = append(eventReasons, dbsqlc.StepRunEventReasonSTARTED) + eventSeverities = append(eventSeverities, dbsqlc.StepRunEventSeverityINFO) + eventMessages = append(eventMessages, fmt.Sprintf("Step run started at %s", item.StartedAt.Format(time.RFC1123))) + eventData = append(eventData, map[string]interface{}{}) + case dbsqlc.StepRunStatusFAILED: + eventTimeSeen = append(eventTimeSeen, sqlchelpers.TimestampFromTime(*item.FinishedAt)) + + eventStepRunIds = append(eventStepRunIds, stepRunId) + eventMessage := fmt.Sprintf("Step run failed on %s", item.FinishedAt.Format(time.RFC1123)) + eventReason := dbsqlc.StepRunEventReasonFAILED + + if item.Error != nil && *item.Error == "TIMED_OUT" { + eventReason = dbsqlc.StepRunEventReasonTIMEDOUT + eventMessage = "Step exceeded timeout duration" + } + + eventReasons = append(eventReasons, eventReason) + 
eventSeverities = append(eventSeverities, dbsqlc.StepRunEventSeverityCRITICAL) + eventMessages = append(eventMessages, eventMessage) + eventData = append(eventData, map[string]interface{}{ + "retry_count": item.RetryCount, + }) + case dbsqlc.StepRunStatusCANCELLED: + eventTimeSeen = append(eventTimeSeen, sqlchelpers.TimestampFromTime(*item.CancelledAt)) + eventStepRunIds = append(eventStepRunIds, stepRunId) + eventReasons = append(eventReasons, dbsqlc.StepRunEventReasonCANCELLED) + eventSeverities = append(eventSeverities, dbsqlc.StepRunEventSeverityWARNING) + eventMessages = append(eventMessages, fmt.Sprintf("Step run was cancelled on %s for the following reason: %s", item.CancelledAt.Format(time.RFC1123), *item.CancelledReason)) + eventData = append(eventData, map[string]interface{}{}) + case dbsqlc.StepRunStatusSUCCEEDED: + eventTimeSeen = append(eventTimeSeen, sqlchelpers.TimestampFromTime(*item.FinishedAt)) + eventStepRunIds = append(eventStepRunIds, stepRunId) + eventReasons = append(eventReasons, dbsqlc.StepRunEventReasonFINISHED) + eventSeverities = append(eventSeverities, dbsqlc.StepRunEventSeverityINFO) + eventMessages = append(eventMessages, fmt.Sprintf("Step run finished at %s", item.FinishedAt.Format(time.RFC1123))) + eventData = append(eventData, map[string]interface{}{}) + } + } + + bulkStepRunEvents( + ctx, + s.l, + s.pool, + s.queries, + eventStepRunIds, + eventTimeSeen, + eventReasons, + eventSeverities, + eventMessages, + eventData, + ) + + return res, nil } func (s *stepRunEngineRepository) RegisterWorkflowRunCompletedCallback(callback repository.Callback[*dbsqlc.ResolveWorkflowRunStatusRow]) { @@ -630,34 +979,26 @@ func (s *stepRunEngineRepository) DeferredStepRunEvent( return } - deferredStepRunEvent( - s.l, - s.pool, - s.queries, + s.deferredStepRunEvent( tenantId, stepRunId, opts, ) } -func deferredStepRunEvent( - l *zerolog.Logger, - dbtx dbsqlc.DBTX, - queries *dbsqlc.Queries, +func (s *stepRunEngineRepository) deferredStepRunEvent( 
tenantId, stepRunId string, opts repository.CreateStepRunEventOpts, ) { - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - - err := insertStepRunQueueItem(ctx, dbtx, queries, tenantId, updateStepRunQueueData{ + // fire-and-forget for events + _, err := s.bulkEventBuffer.BuffItem(tenantId, &updateStepRunQueueData{ StepRunId: stepRunId, + TenantId: tenantId, Event: &opts, }) if err != nil { - l.Err(err).Msg("could not create deferred step run event") - return + s.l.Error().Err(err).Msg("could not buffer event") } } @@ -1578,7 +1919,7 @@ func (s *stepRunEngineRepository) GetQueueCounts(ctx context.Context, tenantId s func (s *stepRunEngineRepository) ProcessStepRunUpdates(ctx context.Context, qlp *zerolog.Logger, tenantId string) (repository.ProcessStepRunUpdatesResult, error) { ql := qlp.With().Str("tenant_id", tenantId).Logger() - startedAt := time.Now().UTC() + // startedAt := time.Now().UTC() emptyRes := repository.ProcessStepRunUpdatesResult{ Continue: false, @@ -1623,6 +1964,143 @@ func (s *stepRunEngineRepository) ProcessStepRunUpdates(ctx context.Context, qlp return emptyRes, fmt.Errorf("could not convert internal queue item data to worker semaphore queue data: %w", err) } + succeededStepRuns, completedWorkflowRuns, err := s.processStepRunUpdates(ctx, &ql, tenantId, tx, data) + + if err != nil { + return emptyRes, fmt.Errorf("could not process step run updates v0: %w", err) + } + + qiIds := make([]int64, 0, len(data)) + + for _, item := range queueItems { + qiIds = append(qiIds, item.ID) + } + + // update the processed semaphore queue items + err = s.queries.MarkInternalQueueItemsProcessed(ctx, tx, qiIds) + + if err != nil { + return emptyRes, fmt.Errorf("could not mark worker semaphore queue items processed: %w", err) + } + + err = commit(ctx) + + if err != nil { + return emptyRes, fmt.Errorf("could not commit transaction: %w", err) + } + + for _, cb := range s.callbacks { + for _, wr := range completedWorkflowRuns { 
+ wrCp := wr + cb.Do(s.l, tenantId, wrCp) + } + } + + return repository.ProcessStepRunUpdatesResult{ + SucceededStepRuns: succeededStepRuns, + CompletedWorkflowRuns: completedWorkflowRuns, + Continue: len(queueItems) == limit, + }, nil +} + +func (s *stepRunEngineRepository) ProcessStepRunUpdatesV2(ctx context.Context, qlp *zerolog.Logger, tenantId string) (repository.ProcessStepRunUpdatesResultV2, error) { + ql := qlp.With().Str("tenant_id", tenantId).Logger() + + emptyRes := repository.ProcessStepRunUpdatesResultV2{ + Continue: false, + } + + ctx, span := telemetry.NewSpan(ctx, "process-step-run-updates-database") + defer span.End() + + pgTenantId := sqlchelpers.UUIDFromStr(tenantId) + + limit := 100 + + if s.cf.SingleQueueLimit != 0 { + limit = s.cf.SingleQueueLimit * 4 // we call update step run 4x + } + + tx, commit, rollback, err := prepareTx(ctx, s.pool, s.l, 25000) + + if err != nil { + return emptyRes, err + } + + defer rollback() + + // list queues + queueItems, err := s.queries.ListInternalQueueItems(ctx, tx, dbsqlc.ListInternalQueueItemsParams{ + Tenantid: pgTenantId, + Queue: dbsqlc.InternalQueueSTEPRUNUPDATEV2, + Limit: pgtype.Int4{ + Int32: int32(limit), // nolint: gosec + Valid: true, + }, + }) + + if err != nil { + return emptyRes, fmt.Errorf("could not list queues: %w", err) + } + + data, err := toQueueItemData[updateStepRunQueueData](queueItems) + + if err != nil { + return emptyRes, fmt.Errorf("could not convert internal queue item data to worker semaphore queue data: %w", err) + } + + var completedWorkflowRuns []*dbsqlc.ResolveWorkflowRunStatusRow + + completedWorkflowRunsV1, err := s.processStepRunUpdatesV2(ctx, &ql, tenantId, tx, data) + + if err != nil { + return emptyRes, fmt.Errorf("could not process step run updates v1: %w", err) + } + + completedWorkflowRuns = append(completedWorkflowRuns, completedWorkflowRunsV1...) 
+ + qiIds := make([]int64, 0, len(data)) + + for _, item := range queueItems { + qiIds = append(qiIds, item.ID) + } + + // update the processed semaphore queue items + err = s.queries.MarkInternalQueueItemsProcessed(ctx, tx, qiIds) + + if err != nil { + return emptyRes, fmt.Errorf("could not mark worker semaphore queue items processed: %w", err) + } + + err = commit(ctx) + + if err != nil { + return emptyRes, fmt.Errorf("could not commit transaction: %w", err) + } + + for _, cb := range s.callbacks { + for _, wr := range completedWorkflowRuns { + wrCp := wr + cb.Do(s.l, tenantId, wrCp) + } + } + + return repository.ProcessStepRunUpdatesResultV2{ + CompletedWorkflowRuns: completedWorkflowRuns, + Continue: len(queueItems) == limit, + }, nil +} + +func (s *stepRunEngineRepository) processStepRunUpdates( + ctx context.Context, + qlp *zerolog.Logger, + tenantId string, + tx dbsqlc.DBTX, + data []updateStepRunQueueData, +) (succeededStepRuns []*dbsqlc.GetStepRunForEngineRow, completedWorkflowRuns []*dbsqlc.ResolveWorkflowRunStatusRow, err error) { + // startedAt := time.Now().UTC() + pgTenantId := sqlchelpers.UUIDFromStr(tenantId) + startParams := dbsqlc.BulkStartStepRunParams{} failParams := dbsqlc.BulkFailStepRunParams{} cancelParams := dbsqlc.BulkCancelStepRunParams{} @@ -1718,6 +2196,7 @@ func (s *stepRunEngineRepository) ProcessStepRunUpdates(ctx context.Context, qlp case dbsqlc.StepRunStatusCANCELLED: cancelParams.Steprunids = append(cancelParams.Steprunids, stepRunId) cancelParams.Cancelledats = append(cancelParams.Cancelledats, sqlchelpers.TimestampFromTime(*item.CancelledAt)) + cancelParams.Finishedats = append(cancelParams.Finishedats, sqlchelpers.TimestampFromTime(*item.CancelledAt)) eventTimeSeen = append(eventTimeSeen, sqlchelpers.TimestampFromTime(*item.CancelledAt)) cancelParams.Cancelledreasons = append(cancelParams.Cancelledreasons, *item.CancelledReason) eventStepRunIds = append(eventStepRunIds, stepRunId) @@ -1742,7 +2221,7 @@ func (s 
*stepRunEngineRepository) ProcessStepRunUpdates(ctx context.Context, qlp err = s.queries.BulkStartStepRun(ctx, tx, startParams) if err != nil { - return emptyRes, fmt.Errorf("could not start step runs: %w", err) + return nil, nil, fmt.Errorf("could not start step runs: %w", err) } } @@ -1750,7 +2229,7 @@ func (s *stepRunEngineRepository) ProcessStepRunUpdates(ctx context.Context, qlp err = s.queries.BulkFailStepRun(ctx, tx, failParams) if err != nil { - return emptyRes, fmt.Errorf("could not fail step runs: %w", err) + return nil, nil, fmt.Errorf("could not fail step runs: %w", err) } } @@ -1758,7 +2237,7 @@ func (s *stepRunEngineRepository) ProcessStepRunUpdates(ctx context.Context, qlp err = s.queries.BulkCancelStepRun(ctx, tx, cancelParams) if err != nil { - return emptyRes, fmt.Errorf("could not cancel step runs: %w", err) + return nil, nil, fmt.Errorf("could not cancel step runs: %w", err) } } @@ -1766,13 +2245,13 @@ func (s *stepRunEngineRepository) ProcessStepRunUpdates(ctx context.Context, qlp err = s.queries.BulkFinishStepRun(ctx, tx, finishParams) if err != nil { - return emptyRes, fmt.Errorf("could not finish step runs: %w", err) + return nil, nil, fmt.Errorf("could not finish step runs: %w", err) } } - durationUpdateStepRuns := time.Since(startedAt) + // durationUpdateStepRuns := time.Since(startedAt) - startResolveJobRunStatus := time.Now() + // startResolveJobRunStatus := time.Now() // update the job runs and workflow runs as well jobRunIds, err := s.queries.ResolveJobRunStatus(ctx, tx, dbsqlc.ResolveJobRunStatusParams{ @@ -1781,77 +2260,78 @@ func (s *stepRunEngineRepository) ProcessStepRunUpdates(ctx context.Context, qlp }) if err != nil { - return emptyRes, fmt.Errorf("could not resolve job run status: %w", err) + return nil, nil, fmt.Errorf("could not resolve job run status: %w", err) } - durationResolveJobRunStatus := time.Since(startResolveJobRunStatus) + // durationResolveJobRunStatus := time.Since(startResolveJobRunStatus) - 
startResolveWorkflowRuns := time.Now() + // startResolveWorkflowRuns := time.Now() - succeededStepRuns, err := s.queries.GetStepRunForEngine(ctx, tx, dbsqlc.GetStepRunForEngineParams{ + succeededStepRuns, err = s.queries.GetStepRunForEngine(ctx, tx, dbsqlc.GetStepRunForEngineParams{ Ids: finishParams.Steprunids, TenantId: pgTenantId, }) if err != nil { - return emptyRes, fmt.Errorf("could not get succeeded step runs: %w", err) + return nil, nil, fmt.Errorf("could not get succeeded step runs: %w", err) } - completedWorkflowRuns, err := s.queries.ResolveWorkflowRunStatus(ctx, tx, dbsqlc.ResolveWorkflowRunStatusParams{ + completedWorkflowRuns, err = s.queries.ResolveWorkflowRunStatus(ctx, tx, dbsqlc.ResolveWorkflowRunStatusParams{ Jobrunids: jobRunIds, Tenantid: pgTenantId, }) if err != nil { - return emptyRes, fmt.Errorf("could not resolve workflow run status: %w", err) + return nil, nil, fmt.Errorf("could not resolve workflow run status: %w", err) } - durationResolveWorkflowRuns := time.Since(startResolveWorkflowRuns) - - qiIds := make([]int64, 0, len(data)) - - for _, item := range queueItems { - qiIds = append(qiIds, item.ID) - } - - startMarkQueueItemsProcessed := time.Now() - - // update the processed semaphore queue items - err = s.queries.MarkInternalQueueItemsProcessed(ctx, tx, qiIds) - - if err != nil { - return emptyRes, fmt.Errorf("could not mark worker semaphore queue items processed: %w", err) - } - - durationMarkQueueItemsProcessed := time.Since(startMarkQueueItemsProcessed) - - startRunEvents := time.Now() + // durationResolveWorkflowRuns := time.Since(startResolveWorkflowRuns) // NOTE: actually not deferred bulkStepRunEvents(ctx, s.l, tx, s.queries, eventStepRunIds, eventTimeSeen, eventReasons, eventSeverities, eventMessages, eventData) - durationRunEvents := time.Since(startRunEvents) + // defer printProcessStepRunUpdateInfo(ql, tenantId, startedAt, len(stepRunIds), durationUpdateStepRuns, durationResolveJobRunStatus, durationResolveWorkflowRuns, 
durationMarkQueueItemsProcessed, durationRunEvents) - err = commit(ctx) + return succeededStepRuns, completedWorkflowRuns, nil +} + +func (s *stepRunEngineRepository) processStepRunUpdatesV2( + ctx context.Context, + qlp *zerolog.Logger, + tenantId string, + tx dbsqlc.DBTX, + data []updateStepRunQueueData, +) (completedWorkflowRuns []*dbsqlc.ResolveWorkflowRunStatusRow, err error) { + // startedAt := time.Now().UTC() + pgTenantId := sqlchelpers.UUIDFromStr(tenantId) + + stepRunIds := make([]pgtype.UUID, 0, len(data)) + + for _, item := range data { + stepRunId := sqlchelpers.UUIDFromStr(item.StepRunId) + stepRunIds = append(stepRunIds, stepRunId) + } + + // update the job runs and workflow runs as well + jobRunIds, err := s.queries.ResolveJobRunStatus(ctx, tx, dbsqlc.ResolveJobRunStatusParams{ + Steprunids: stepRunIds, + Tenantid: pgTenantId, + }) if err != nil { - return emptyRes, fmt.Errorf("could not commit transaction: %w", err) + return nil, fmt.Errorf("could not resolve job run status: %w", err) } - for _, cb := range s.callbacks { - for _, wr := range completedWorkflowRuns { - wrCp := wr - cb.Do(s.l, tenantId, wrCp) - } + completedWorkflowRuns, err = s.queries.ResolveWorkflowRunStatus(ctx, tx, dbsqlc.ResolveWorkflowRunStatusParams{ + Jobrunids: jobRunIds, + Tenantid: pgTenantId, + }) + + if err != nil { + return nil, fmt.Errorf("could not resolve workflow run status: %w", err) } - defer printProcessStepRunUpdateInfo(ql, tenantId, startedAt, len(stepRunIds), durationUpdateStepRuns, durationResolveJobRunStatus, durationResolveWorkflowRuns, durationMarkQueueItemsProcessed, durationRunEvents) - - return repository.ProcessStepRunUpdatesResult{ - SucceededStepRuns: succeededStepRuns, - CompletedWorkflowRuns: completedWorkflowRuns, - Continue: len(queueItems) == limit, - }, nil + return completedWorkflowRuns, nil } func (s *stepRunEngineRepository) CleanupQueueItems(ctx context.Context, tenantId string) error { @@ -1998,21 +2478,24 @@ func (s 
*stepRunEngineRepository) StepRunStarted(ctx context.Context, tenantId, running := string(dbsqlc.StepRunStatusRUNNING) - // write a queue item that the step run has started - err := insertStepRunQueueItem( - ctx, - s.pool, - s.queries, - tenantId, - updateStepRunQueueData{ - StepRunId: stepRunId, - StartedAt: &startedAt, - Status: &running, - }, - ) + data := &updateStepRunQueueData{ + StepRunId: stepRunId, + TenantId: tenantId, + StartedAt: &startedAt, + Status: &running, + } + + _, err := s.bulkStatusBuffer.BuffItem(tenantId, data) if err != nil { - return fmt.Errorf("could not insert step run queue item: %w", err) + return fmt.Errorf("could not buffer event: %w", err) + } + + // fire-and-forget for events + _, err = s.bulkEventBuffer.BuffItem(tenantId, data) + + if err != nil { + return fmt.Errorf("could not buffer event: %w", err) } return nil @@ -2022,8 +2505,51 @@ func (s *stepRunEngineRepository) StepRunSucceeded(ctx context.Context, tenantId ctx, span := telemetry.NewSpan(ctx, "step-run-started-db") defer span.End() + // write a queue item to release the worker semaphore + err := s.releaseWorkerSemaphoreSlot(ctx, s.pool, tenantId, stepRunId) + + if err != nil { + return fmt.Errorf("could not release worker semaphore queue items: %w", err) + } + finished := string(dbsqlc.StepRunStatusSUCCEEDED) + data := &updateStepRunQueueData{ + StepRunId: stepRunId, + TenantId: tenantId, + FinishedAt: &finishedAt, + Status: &finished, + Output: output, + } + + // we write to the buffer first so we don't get race conditions when we resolve workflow run statuses + done, err := s.bulkStatusBuffer.BuffItem(tenantId, data) + + if err != nil { + return fmt.Errorf("could not buffer step run succeeded: %w", err) + } + + var response *flushResponse[pgtype.UUID] + + select { + case response = <-done: + case <-ctx.Done(): + return ctx.Err() + case <-time.After(20 * time.Second): + return fmt.Errorf("timeout waiting for step run succeeded to be flushed to db") + } + + if response.err 
!= nil { + return fmt.Errorf("could not flush step run succeeded: %w", response.err) + } + + // fire-and-forget for events + _, err = s.bulkEventBuffer.BuffItem(tenantId, data) + + if err != nil { + return fmt.Errorf("could not buffer event: %w", err) + } + tx, err := s.pool.Begin(ctx) if err != nil { @@ -2032,31 +2558,6 @@ func (s *stepRunEngineRepository) StepRunSucceeded(ctx context.Context, tenantId defer deferRollback(ctx, s.l, tx.Rollback) - // write a queue item to release the worker semaphore - err = s.releaseWorkerSemaphoreSlot(ctx, tx, tenantId, stepRunId) - - if err != nil { - return fmt.Errorf("could not release worker semaphore queue items: %w", err) - } - - // write a queue item that the step run has finished - err = insertStepRunQueueItem( - ctx, - tx, - s.queries, - tenantId, - updateStepRunQueueData{ - StepRunId: stepRunId, - FinishedAt: &finishedAt, - Status: &finished, - Output: output, - }, - ) - - if err != nil { - return fmt.Errorf("could not insert step run queue item: %w", err) - } - // update the job run lookup data err = s.queries.UpdateJobRunLookupDataWithStepRun(ctx, tx, dbsqlc.UpdateJobRunLookupDataWithStepRunParams{ Steprunid: sqlchelpers.UUIDFromStr(stepRunId), @@ -2079,8 +2580,43 @@ func (s *stepRunEngineRepository) StepRunCancelled(ctx context.Context, tenantId ctx, span := telemetry.NewSpan(ctx, "step-run-cancelled-db") defer span.End() + // write a queue item to release the worker semaphore + err := s.releaseWorkerSemaphoreSlot(ctx, s.pool, tenantId, stepRunId) + + if err != nil { + return fmt.Errorf("could not release worker semaphore queue items: %w", err) + } + cancelled := string(dbsqlc.StepRunStatusCANCELLED) + data := &updateStepRunQueueData{ + StepRunId: stepRunId, + TenantId: tenantId, + CancelledAt: &cancelledAt, + CancelledReason: &cancelledReason, + Status: &cancelled, + } + + done, err := s.bulkStatusBuffer.BuffItem(tenantId, data) + + if err != nil { + return fmt.Errorf("could not buffer step run succeeded: %w", err) 
+ } + + var response *flushResponse[pgtype.UUID] + + select { + case response = <-done: + case <-ctx.Done(): + return ctx.Err() + case <-time.After(20 * time.Second): + return fmt.Errorf("timeout waiting for step run succeeded to be flushed to db") + } + + if response.err != nil { + return fmt.Errorf("could not flush step run succeeded: %w", response.err) + } + tx, err := s.pool.Begin(ctx) if err != nil { @@ -2089,13 +2625,6 @@ func (s *stepRunEngineRepository) StepRunCancelled(ctx context.Context, tenantId defer deferRollback(ctx, s.l, tx.Rollback) - // release the worker semaphore - err = s.releaseWorkerSemaphoreSlot(ctx, tx, tenantId, stepRunId) - - if err != nil { - return fmt.Errorf("could not release worker semaphore queue items: %w", err) - } - // check that the step run is not in a final state stepRun, err := s.getStepRunForEngineTx(ctx, tx, tenantId, stepRunId) @@ -2104,24 +2633,6 @@ func (s *stepRunEngineRepository) StepRunCancelled(ctx context.Context, tenantId } if !repository.IsFinalStepRunStatus(stepRun.SRStatus) { - // write a queue item that the step run has failed - err = insertStepRunQueueItem( - ctx, - tx, - s.queries, - tenantId, - updateStepRunQueueData{ - StepRunId: stepRunId, - CancelledAt: &cancelledAt, - CancelledReason: &cancelledReason, - Status: &cancelled, - }, - ) - - if err != nil { - return fmt.Errorf("could not insert step run queue item: %w", err) - } - _, err = s.queries.ResolveLaterStepRuns(ctx, tx, dbsqlc.ResolveLaterStepRunsParams{ Steprunid: sqlchelpers.UUIDFromStr(stepRunId), Tenantid: sqlchelpers.UUIDFromStr(tenantId), @@ -2140,12 +2651,55 @@ func (s *stepRunEngineRepository) StepRunCancelled(ctx context.Context, tenantId return nil } -func (s *stepRunEngineRepository) StepRunFailed(ctx context.Context, tenantId, stepRunId string, failedAt time.Time, errStr string) error { +func (s *stepRunEngineRepository) StepRunFailed(ctx context.Context, tenantId, stepRunId string, failedAt time.Time, errStr string, retryCount int) error 
{ ctx, span := telemetry.NewSpan(ctx, "step-run-failed-db") defer span.End() + // release the worker semaphore + err := s.releaseWorkerSemaphoreSlot(ctx, s.pool, tenantId, stepRunId) + + if err != nil { + return fmt.Errorf("could not release worker semaphore queue items: %w", err) + } + failed := string(dbsqlc.StepRunStatusFAILED) + data := &updateStepRunQueueData{ + StepRunId: stepRunId, + TenantId: tenantId, + RetryCount: retryCount, + FinishedAt: &failedAt, + Error: &errStr, + Status: &failed, + } + + done, err := s.bulkStatusBuffer.BuffItem(tenantId, data) + + if err != nil { + return fmt.Errorf("could not buffer step run succeeded: %w", err) + } + + var response *flushResponse[pgtype.UUID] + + select { + case response = <-done: + case <-ctx.Done(): + return ctx.Err() + case <-time.After(20 * time.Second): + return fmt.Errorf("timeout waiting for step run succeeded to be flushed to db") + } + + if response.err != nil { + return fmt.Errorf("could not flush step run succeeded: %w", response.err) + } + + // fire-and-forget for events + _, err = s.bulkEventBuffer.BuffItem(tenantId, data) + + if err != nil { + return fmt.Errorf("could not buffer event: %w", err) + } + tx, err := s.pool.Begin(ctx) if err != nil { @@ -2154,13 +2708,6 @@ func (s *stepRunEngineRepository) StepRunFailed(ctx context.Context, tenantId, s defer deferRollback(ctx, s.l, tx.Rollback) - // release the worker semaphore - err = s.releaseWorkerSemaphoreSlot(ctx, tx, tenantId, stepRunId) - - if err != nil { - return fmt.Errorf("could not release worker semaphore queue items: %w", err) - } - // check that the step run is not in a final state stepRun, err := s.getStepRunForEngineTx(ctx, tx, tenantId, stepRunId) @@ -2169,25 +2716,6 @@ func (s *stepRunEngineRepository) StepRunFailed(ctx context.Context, tenantId, s } if !repository.IsFinalStepRunStatus(stepRun.SRStatus) { - // write a queue item that the step run has failed - err = insertStepRunQueueItem( - ctx, - tx, - s.queries, - tenantId, - 
updateStepRunQueueData{ - StepRunId: stepRunId, - RetryCount: int(stepRun.SRRetryCount), - FinishedAt: &failedAt, - Error: &errStr, - Status: &failed, - }, - ) - - if err != nil { - return fmt.Errorf("could not insert step run queue item: %w", err) - } - _, err = s.queries.ResolveLaterStepRuns(ctx, tx, dbsqlc.ResolveLaterStepRunsParams{ Steprunid: sqlchelpers.UUIDFromStr(stepRunId), Tenantid: sqlchelpers.UUIDFromStr(tenantId), @@ -2227,10 +2755,7 @@ func (s *stepRunEngineRepository) ReplayStepRun(ctx context.Context, tenantId, s sev := dbsqlc.StepRunEventSeverityINFO reason := dbsqlc.StepRunEventReasonRETRIEDBYUSER - defer deferredStepRunEvent( - s.l, - s.pool, - s.queries, + defer s.deferredStepRunEvent( tenantId, stepRunId, repository.CreateStepRunEventOpts{ @@ -2295,10 +2820,7 @@ func (s *stepRunEngineRepository) ReplayStepRun(ctx context.Context, tenantId, s sev := dbsqlc.StepRunEventSeverityINFO reason := dbsqlc.StepRunEventReasonRETRIEDBYUSER - defer deferredStepRunEvent( - s.l, - s.pool, - s.queries, + defer s.deferredStepRunEvent( tenantId, laterStepRunId, repository.CreateStepRunEventOpts{ @@ -2801,10 +3323,7 @@ func (s *stepRunEngineRepository) RefreshTimeoutBy(ctx context.Context, tenantId sev := dbsqlc.StepRunEventSeverityINFO reason := dbsqlc.StepRunEventReasonTIMEOUTREFRESHED - defer deferredStepRunEvent( - s.l, - s.pool, - s.queries, + defer s.deferredStepRunEvent( tenantId, stepRunId, repository.CreateStepRunEventOpts{ @@ -2874,8 +3393,8 @@ func (s *stepRunEngineRepository) removeFinalizedStepRuns(ctx context.Context, t return remaining, cancelled, nil } -func (s *stepRunEngineRepository) releaseWorkerSemaphoreSlot(ctx context.Context, tx pgx.Tx, tenantId, stepRunId string) error { - oldWorkerIdAndRetryCount, err := s.queries.UpdateStepRunUnsetWorkerId(ctx, tx, dbsqlc.UpdateStepRunUnsetWorkerIdParams{ +func (s *stepRunEngineRepository) releaseWorkerSemaphoreSlot(ctx context.Context, dbtx dbsqlc.DBTX, tenantId, stepRunId string) error { + 
oldWorkerIdAndRetryCount, err := s.queries.UpdateStepRunUnsetWorkerId(ctx, dbtx, dbsqlc.UpdateStepRunUnsetWorkerIdParams{ Steprunid: sqlchelpers.UUIDFromStr(stepRunId), Tenantid: sqlchelpers.UUIDFromStr(tenantId), }) @@ -2884,7 +3403,7 @@ func (s *stepRunEngineRepository) releaseWorkerSemaphoreSlot(ctx context.Context return err } - return s.queries.RemoveTimeoutQueueItem(ctx, tx, dbsqlc.RemoveTimeoutQueueItemParams{ + return s.queries.RemoveTimeoutQueueItem(ctx, dbtx, dbsqlc.RemoveTimeoutQueueItemParams{ Steprunid: sqlchelpers.UUIDFromStr(stepRunId), Retrycount: oldWorkerIdAndRetryCount.RetryCount, }) @@ -2910,6 +3429,7 @@ func toQueueItemData[d any](items []*dbsqlc.InternalQueueItem) ([]d, error) { type updateStepRunQueueData struct { StepRunId string `json:"step_run_id"` + TenantId string `json:"tenant_id"` RetryCount int `json:"retry_count,omitempty"` Event *repository.CreateStepRunEventOpts `json:"event,omitempty"` @@ -2923,32 +3443,32 @@ type updateStepRunQueueData struct { Status *string `json:"status,omitempty"` } -func insertStepRunQueueItem( - ctx context.Context, - dbtx dbsqlc.DBTX, - queries *dbsqlc.Queries, - tenantId string, - data updateStepRunQueueData, -) error { - insertData := make([]any, 1) - insertData[0] = data +// func insertStepRunQueueItem( +// ctx context.Context, +// dbtx dbsqlc.DBTX, +// queries *dbsqlc.Queries, +// tenantId string, +// data updateStepRunQueueData, +// ) error { +// insertData := make([]any, 1) +// insertData[0] = data - return bulkInsertInternalQueueItem( - ctx, - dbtx, - queries, - tenantId, - dbsqlc.InternalQueueSTEPRUNUPDATE, - insertData, - ) -} +// return bulkInsertInternalQueueItem( +// ctx, +// dbtx, +// queries, +// tenantId, +// dbsqlc.InternalQueueSTEPRUNUPDATEV2, +// insertData, +// ) +// } func bulkInsertInternalQueueItem( ctx context.Context, dbtx dbsqlc.DBTX, queries *dbsqlc.Queries, - tenantId string, - queue dbsqlc.InternalQueue, + tenantIds []pgtype.UUID, + queues []dbsqlc.InternalQueue, data []any, ) 
error { // construct bytes for the data @@ -2964,10 +3484,16 @@ func bulkInsertInternalQueueItem( insertData[i] = b } + insertQueues := make([]string, len(queues)) + + for i, q := range queues { + insertQueues[i] = string(q) + } + err := queries.CreateInternalQueueItemsBulk(ctx, dbtx, dbsqlc.CreateInternalQueueItemsBulkParams{ - Tenantid: sqlchelpers.UUIDFromStr(tenantId), - Queue: queue, - Datas: insertData, + Tenantids: tenantIds, + Queues: insertQueues, + Datas: insertData, }) if err != nil { diff --git a/pkg/repository/prisma/tenant_buffer.go b/pkg/repository/prisma/tenant_buffer.go index 0bdfe961e..b5f660924 100644 --- a/pkg/repository/prisma/tenant_buffer.go +++ b/pkg/repository/prisma/tenant_buffer.go @@ -6,11 +6,27 @@ import ( "sync" "time" + "github.com/hatchet-dev/hatchet/pkg/config/server" "github.com/hatchet-dev/hatchet/pkg/validator" "github.com/rs/zerolog" ) +var ( + defaultFlushPeriod = 10 * time.Millisecond + defaultMaxCapacity = 100 +) + +func setDefaults(cf *server.ConfigFileRuntime) { + if cf.FlushPeriodMilliseconds != 0 { + defaultFlushPeriod = time.Duration(cf.FlushPeriodMilliseconds) * time.Millisecond + } + + if cf.FlushItemsThreshold != 0 { + defaultMaxCapacity = cf.FlushItemsThreshold + } +} + // This is a wrapper around the IngestBuf to manage multiple tenants // An example would be T is eventOps and U is *dbsqlc.Event @@ -27,6 +43,9 @@ type TenantBufManagerOpts[T any, U any] struct { SizeFunc func(T) int `validate:"required"` L *zerolog.Logger `validate:"required"` V validator.Validator `validate:"required"` + + FlushPeriod *time.Duration + FlushItemsThreshold int } // Create a new TenantBufferManager with generic types T for input and U for output @@ -43,14 +62,18 @@ func NewTenantBufManager[T any, U any](opts TenantBufManagerOpts[T, U]) (*Tenant defaultOpts := IngestBufOpts[T, U]{ // something we can tune if we see this DB transaction is too slow - MaxCapacity: 10000, - FlushPeriod: 50 * time.Millisecond, + MaxCapacity: 
defaultMaxCapacity, + FlushPeriod: defaultFlushPeriod, MaxDataSizeInQueue: 4 * megabyte, OutputFunc: opts.OutputFunc, SizeFunc: opts.SizeFunc, L: opts.L, } + if opts.FlushPeriod != nil { + defaultOpts.FlushPeriod = *opts.FlushPeriod + } + return &TenantBufferManager[T, U]{ tenants: sync.Map{}, l: opts.L, diff --git a/pkg/repository/prisma/workflow_run.go b/pkg/repository/prisma/workflow_run.go index a776158f1..fb0c88630 100644 --- a/pkg/repository/prisma/workflow_run.go +++ b/pkg/repository/prisma/workflow_run.go @@ -126,7 +126,7 @@ func (w *workflowRunEngineRepository) QueuePausedWorkflowRun(ctx context.Context ctx, w.pool, w.queries, - tenantId, + sqlchelpers.UUIDFromStr(tenantId), unpauseWorkflowRunQueueData{ WorkflowId: workflowId, WorkflowRunId: workflowRunId, @@ -420,26 +420,28 @@ func (w *workflowRunAPIRepository) GetStepRunsForJobRuns(ctx context.Context, te } type workflowRunEngineRepository struct { - pool *pgxpool.Pool - v validator.Validator - queries *dbsqlc.Queries - l *zerolog.Logger - m *metered.Metered + pool *pgxpool.Pool + v validator.Validator + queries *dbsqlc.Queries + l *zerolog.Logger + m *metered.Metered + stepRunRepository *stepRunEngineRepository createCallbacks []repository.Callback[*dbsqlc.WorkflowRun] queuedCallbacks []repository.Callback[pgtype.UUID] } -func NewWorkflowRunEngineRepository(pool *pgxpool.Pool, v validator.Validator, l *zerolog.Logger, m *metered.Metered, cbs ...repository.Callback[*dbsqlc.WorkflowRun]) repository.WorkflowRunEngineRepository { +func NewWorkflowRunEngineRepository(stepRunRepository *stepRunEngineRepository, pool *pgxpool.Pool, v validator.Validator, l *zerolog.Logger, m *metered.Metered, cbs ...repository.Callback[*dbsqlc.WorkflowRun]) repository.WorkflowRunEngineRepository { queries := dbsqlc.New() return &workflowRunEngineRepository{ - v: v, - pool: pool, - queries: queries, - l: l, - m: m, - createCallbacks: cbs, + v: v, + pool: pool, + queries: queries, + l: l, + m: m, + createCallbacks: cbs, + 
stepRunRepository: stepRunRepository, } } @@ -698,10 +700,7 @@ func (s *workflowRunEngineRepository) ReplayWorkflowRun(ctx context.Context, ten sev := dbsqlc.StepRunEventSeverityINFO reason := dbsqlc.StepRunEventReasonRETRIEDBYUSER - defer deferredStepRunEvent( - s.l, - s.pool, - s.queries, + defer s.stepRunRepository.deferredStepRunEvent( tenantId, stepRunIdStr, repository.CreateStepRunEventOpts{ @@ -1386,8 +1385,8 @@ func insertWorkflowRunQueueItem( ctx, dbtx, queries, - tenantId, - dbsqlc.InternalQueueWORKFLOWRUNUPDATE, + []pgtype.UUID{sqlchelpers.UUIDFromStr(tenantId)}, + []dbsqlc.InternalQueue{dbsqlc.InternalQueueWORKFLOWRUNUPDATE}, insertData, ) } @@ -1396,7 +1395,7 @@ func insertPausedWorkflowRunQueueItem( ctx context.Context, dbtx dbsqlc.DBTX, queries *dbsqlc.Queries, - tenantId string, + tenantId pgtype.UUID, data unpauseWorkflowRunQueueData, ) error { insertData := make([]any, 1) @@ -1406,8 +1405,8 @@ func insertPausedWorkflowRunQueueItem( ctx, dbtx, queries, - tenantId, - dbsqlc.InternalQueueWORKFLOWRUNPAUSED, + []pgtype.UUID{tenantId}, + []dbsqlc.InternalQueue{dbsqlc.InternalQueueWORKFLOWRUNPAUSED}, insertData, ) } diff --git a/pkg/repository/step_run.go b/pkg/repository/step_run.go index 5d8985bc3..97d56b590 100644 --- a/pkg/repository/step_run.go +++ b/pkg/repository/step_run.go @@ -166,6 +166,11 @@ type ProcessStepRunUpdatesResult struct { Continue bool } +type ProcessStepRunUpdatesResultV2 struct { + CompletedWorkflowRuns []*dbsqlc.ResolveWorkflowRunStatusRow + Continue bool +} + type StepRunEngineRepository interface { RegisterWorkflowRunCompletedCallback(callback Callback[*dbsqlc.ResolveWorkflowRunStatusRow]) @@ -183,7 +188,7 @@ type StepRunEngineRepository interface { StepRunCancelled(ctx context.Context, tenantId, stepRunId string, cancelledAt time.Time, cancelledReason string) error - StepRunFailed(ctx context.Context, tenantId, stepRunId string, failedAt time.Time, errStr string) error + StepRunFailed(ctx context.Context, tenantId, stepRunId 
string, failedAt time.Time, errStr string, retryCount int) error ReplayStepRun(ctx context.Context, tenantId, stepRunId string, input []byte) (*dbsqlc.GetStepRunForEngineRow, error) @@ -212,6 +217,8 @@ type StepRunEngineRepository interface { ProcessStepRunUpdates(ctx context.Context, qlp *zerolog.Logger, tenantId string) (ProcessStepRunUpdatesResult, error) + ProcessStepRunUpdatesV2(ctx context.Context, qlp *zerolog.Logger, tenantId string) (ProcessStepRunUpdatesResultV2, error) + QueueStepRuns(ctx context.Context, ql *zerolog.Logger, tenantId string) (QueueStepRunsResult, error) CleanupQueueItems(ctx context.Context, tenantId string) error diff --git a/prisma/migrations/20241003135009_v0_49_0/migration.sql b/prisma/migrations/20241003135009_v0_49_0/migration.sql new file mode 100644 index 000000000..71a9aabdb --- /dev/null +++ b/prisma/migrations/20241003135009_v0_49_0/migration.sql @@ -0,0 +1,2 @@ +-- AlterTable +ALTER TABLE "Event" ADD COLUMN "insertOrder" INTEGER; diff --git a/prisma/migrations/20241004122152_v0_49_1/migration.sql b/prisma/migrations/20241004122152_v0_49_1/migration.sql new file mode 100644 index 000000000..ec909e885 --- /dev/null +++ b/prisma/migrations/20241004122152_v0_49_1/migration.sql @@ -0,0 +1,2 @@ +-- AlterEnum +ALTER TYPE "InternalQueue" ADD VALUE 'STEP_RUN_UPDATE_V2'; diff --git a/prisma/schema.prisma b/prisma/schema.prisma index a645f36dc..943bdd1a4 100644 --- a/prisma/schema.prisma +++ b/prisma/schema.prisma @@ -1345,6 +1345,7 @@ model QueueItem { enum InternalQueue { WORKER_SEMAPHORE_COUNT STEP_RUN_UPDATE + STEP_RUN_UPDATE_V2 WORKFLOW_RUN_UPDATE WORKFLOW_RUN_PAUSED } diff --git a/sql/migrations/20241004122206_v0.49.1.sql b/sql/migrations/20241004122206_v0.49.1.sql new file mode 100644 index 000000000..9a39e07c7 --- /dev/null +++ b/sql/migrations/20241004122206_v0.49.1.sql @@ -0,0 +1,2 @@ +-- Add value to enum type: "InternalQueue" +ALTER TYPE "InternalQueue" ADD VALUE 'STEP_RUN_UPDATE_V2'; diff --git a/sql/migrations/atlas.sum 
b/sql/migrations/atlas.sum index 7de1568a9..37ba2f2b8 100644 --- a/sql/migrations/atlas.sum +++ b/sql/migrations/atlas.sum @@ -1,4 +1,4 @@ -h1:/mlnkRldhqWkqHDSSy6fuNu9YCj10/Lvs2b1GhhKXX4= +h1:MdUIwxqQIsS9pvLCRU1mB2nai1wTMJ7Vjl3FRjCr6uw= 20240115180414_init.sql h1:Ef3ZyjAHkmJPdGF/dEWCahbwgcg6uGJKnDxW2JCRi2k= 20240122014727_v0_6_0.sql h1:o/LdlteAeFgoHJ3e/M4Xnghqt9826IE/Y/h0q95Acuo= 20240126235456_v0_7_0.sql h1:KiVzt/hXgQ6esbdC6OMJOOWuYEXmy1yeCpmsVAHTFKs= @@ -63,3 +63,4 @@ h1:/mlnkRldhqWkqHDSSy6fuNu9YCj10/Lvs2b1GhhKXX4= 20240928144316_v0.48.0.sql h1:TX7/rN8ghoysQXy2OAMX+LdSIEA6tbdd4BZfL9wtxFQ= 20240930202706_v0.48.1.sql h1:CcgVHTRA4c9u5rQvSFFy/R9AdiSdTapqfpyUs0KGAf0= 20240930233257_v0.49.0.sql h1:B+JMbME62DxaCnesydvQXPg+ZNB0kB/V8gSclh1VdY4= +20241004122206_v0.49.1.sql h1:Fas5TXOp4a2g+y5sGBJG9wTaVL/WCaVJ9+ZlASN9Md4= diff --git a/sql/schema/schema.sql b/sql/schema/schema.sql index 525fc943c..f08acce29 100644 --- a/sql/schema/schema.sql +++ b/sql/schema/schema.sql @@ -2,7 +2,7 @@ CREATE TYPE "ConcurrencyLimitStrategy" AS ENUM ('CANCEL_IN_PROGRESS', 'DROP_NEWEST', 'QUEUE_NEWEST', 'GROUP_ROUND_ROBIN'); -- CreateEnum -CREATE TYPE "InternalQueue" AS ENUM ('WORKER_SEMAPHORE_COUNT', 'STEP_RUN_UPDATE', 'WORKFLOW_RUN_UPDATE', 'WORKFLOW_RUN_PAUSED'); +CREATE TYPE "InternalQueue" AS ENUM ('WORKER_SEMAPHORE_COUNT', 'STEP_RUN_UPDATE', 'WORKFLOW_RUN_UPDATE', 'WORKFLOW_RUN_PAUSED', 'STEP_RUN_UPDATE_V2'); -- CreateEnum CREATE TYPE "InviteLinkStatus" AS ENUM ('PENDING', 'ACCEPTED', 'REJECTED');