fix: internal queue items performance and race conditions (#943)

* fix: don't use xmin hack

* fix: assign not append

* refactor: parallel step run updates via hashes

* fix: intermittent double execution of child step runs

* fix: rollback rate limits

* fix: bulk event writes from single buffer

* expose cleanup

* fix: race conditions on failures and cancellations

* change logger defaults to warn and console
This commit is contained in:
abelanger5
2024-10-07 11:16:53 -04:00
committed by GitHub
parent f20ba7bb66
commit 3d218302ff
16 changed files with 682 additions and 546 deletions

View File

@@ -324,7 +324,7 @@ func (ec *JobsControllerImpl) handleJobRunQueued(ctx context.Context, task *msgq
}
// list the step runs which are startable
startableStepRuns, err := ec.repo.StepRun().ListStartableStepRuns(ctx, metadata.TenantId, payload.JobRunId, nil)
startableStepRuns, err := ec.repo.StepRun().ListInitialStepRunsForJobRun(ctx, metadata.TenantId, payload.JobRunId)
if err != nil {
return fmt.Errorf("could not list startable step runs: %w", err)
@@ -445,7 +445,8 @@ func (ec *JobsControllerImpl) handleStepRunRetry(ctx context.Context, task *msgq
retryCount := int(stepRun.SRRetryCount) + 1
// write an event
defer ec.repo.StepRun().DeferredStepRunEvent(metadata.TenantId, sqlchelpers.UUIDToStr(stepRun.SRID), repository.CreateStepRunEventOpts{
defer ec.repo.StepRun().DeferredStepRunEvent(metadata.TenantId, repository.CreateStepRunEventOpts{
StepRunId: sqlchelpers.UUIDToStr(stepRun.SRID),
EventReason: repository.StepRunEventReasonPtr(dbsqlc.StepRunEventReasonRETRYING),
EventMessage: repository.StringPtr(
fmt.Sprintf("Retrying step run. This is retry %d / %d", retryCount, stepRun.StepRetries),
@@ -918,13 +919,7 @@ func (ec *JobsControllerImpl) handleStepRunFinished(ctx context.Context, task *m
return fmt.Errorf("could not update step run: %w", err)
}
stepRun, err := ec.repo.StepRun().GetStepRunForEngine(ctx, metadata.TenantId, payload.StepRunId)
if err != nil {
return fmt.Errorf("could not get step run: %w", err)
}
nextStepRuns, err := ec.repo.StepRun().ListStartableStepRuns(ctx, metadata.TenantId, sqlchelpers.UUIDToStr(stepRun.JobRunId), &payload.StepRunId)
nextStepRuns, err := ec.repo.StepRun().ListStartableStepRuns(ctx, metadata.TenantId, payload.StepRunId, true)
if err != nil {
ec.l.Error().Err(err).Msg("could not list startable step runs")
@@ -1000,7 +995,8 @@ func (ec *JobsControllerImpl) failStepRun(ctx context.Context, tenantId, stepRun
eventMessage += ", and will be retried."
defer ec.repo.StepRun().DeferredStepRunEvent(tenantId, stepRunId, repository.CreateStepRunEventOpts{
defer ec.repo.StepRun().DeferredStepRunEvent(tenantId, repository.CreateStepRunEventOpts{
StepRunId: stepRunId,
EventReason: repository.StepRunEventReasonPtr(eventReason),
EventMessage: repository.StringPtr(eventMessage),
EventSeverity: repository.StepRunEventSeverityPtr(dbsqlc.StepRunEventSeverityCRITICAL),

View File

@@ -322,10 +322,9 @@ func (q *queue) processStepRunUpdates(ctx context.Context, tenantId string) (boo
// queue the next step runs
tenantId := sqlchelpers.UUIDToStr(stepRun.SRTenantId)
jobRunId := sqlchelpers.UUIDToStr(stepRun.JobRunId)
stepRunId := sqlchelpers.UUIDToStr(stepRun.SRID)
nextStepRuns, err := q.repo.StepRun().ListStartableStepRuns(ctx, tenantId, jobRunId, &stepRunId)
nextStepRuns, err := q.repo.StepRun().ListStartableStepRuns(ctx, tenantId, stepRunId, false)
if err != nil {
q.l.Error().Err(err).Msg("could not list startable step runs")
@@ -410,6 +409,44 @@ func (q *queue) processStepRunUpdatesV2(ctx context.Context, tenantId string) (b
return false, fmt.Errorf("could not process step run updates (v2): %w", err)
}
// for all succeeded step runs, check for startable child step runs
err = queueutils.MakeBatched(20, res.SucceededStepRuns, func(group []*dbsqlc.GetStepRunForEngineRow) error {
for _, stepRun := range group {
if stepRun.SRChildCount == 0 {
continue
}
// queue the next step runs
tenantId := sqlchelpers.UUIDToStr(stepRun.SRTenantId)
stepRunId := sqlchelpers.UUIDToStr(stepRun.SRID)
nextStepRuns, err := q.repo.StepRun().ListStartableStepRuns(ctx, tenantId, stepRunId, false)
if err != nil {
q.l.Error().Err(err).Msg("could not list startable step runs")
continue
}
for _, nextStepRun := range nextStepRuns {
err := q.mq.AddMessage(
context.Background(),
msgqueue.JOB_PROCESSING_QUEUE,
tasktypes.StepRunQueuedToTask(nextStepRun),
)
if err != nil {
q.l.Error().Err(err).Msg("could not queue next step run")
}
}
}
return nil
})
if err != nil {
return false, fmt.Errorf("could not process succeeded step runs: %w", err)
}
// for all finished workflow runs, send a message
for _, finished := range res.CompletedWorkflowRuns {
workflowRunId := sqlchelpers.UUIDToStr(finished.ID)

View File

@@ -783,7 +783,7 @@ func (wc *WorkflowsControllerImpl) startJobRun(ctx context.Context, tenantId, jo
}
// list the step runs which are startable
startableStepRuns, err := wc.repo.StepRun().ListStartableStepRuns(ctx, tenantId, jobRunId, nil)
startableStepRuns, err := wc.repo.StepRun().ListInitialStepRunsForJobRun(ctx, tenantId, jobRunId)
if err != nil {
return fmt.Errorf("could not list startable step runs: %w", err)

View File

@@ -514,8 +514,8 @@ func (d *DispatcherImpl) handleStepRunAssignedTask(ctx context.Context, task *ms
if success {
defer d.repo.StepRun().DeferredStepRunEvent(
metadata.TenantId,
sqlchelpers.UUIDToStr(stepRun.SRID),
repository.CreateStepRunEventOpts{
StepRunId: sqlchelpers.UUIDToStr(stepRun.SRID),
EventMessage: repository.StringPtr("Sent step run to the assigned worker"),
EventReason: repository.StepRunEventReasonPtr(dbsqlc.StepRunEventReasonSENTTOWORKER),
EventSeverity: repository.StepRunEventSeverityPtr(dbsqlc.StepRunEventSeverityINFO),
@@ -529,8 +529,8 @@ func (d *DispatcherImpl) handleStepRunAssignedTask(ctx context.Context, task *ms
defer d.repo.StepRun().DeferredStepRunEvent(
metadata.TenantId,
sqlchelpers.UUIDToStr(stepRun.SRID),
repository.CreateStepRunEventOpts{
StepRunId: sqlchelpers.UUIDToStr(stepRun.SRID),
EventMessage: repository.StringPtr("Could not send step run to assigned worker"),
EventReason: repository.StepRunEventReasonPtr(dbsqlc.StepRunEventReasonREASSIGNED),
EventSeverity: repository.StepRunEventSeverityPtr(dbsqlc.StepRunEventSeverityWARNING),

View File

@@ -13,10 +13,10 @@ type TLSConfigFile struct {
}
type LoggerConfigFile struct {
Level string `mapstructure:"level" json:"level,omitempty" default:"debug"`
Level string `mapstructure:"level" json:"level,omitempty" default:"warn"`
// format can be "json" or "console"
Format string `mapstructure:"format" json:"format,omitempty" default:"json"`
Format string `mapstructure:"format" json:"format,omitempty" default:"console"`
}
type OpenTelemetryConfigFile struct {

View File

@@ -104,12 +104,10 @@ INSERT INTO "Event" (
-- name: GetInsertedEvents :many
SELECT * FROM "Event"
WHERE xmin::text = (txid_current() % (2^32)::bigint)::text
WHERE "id" = ANY(@ids::uuid[])
ORDER BY "insertOrder" ASC;
-- name: ListEvents :many
WITH filtered_events AS (
SELECT

View File

@@ -281,14 +281,13 @@ func (q *Queries) GetEventsForRange(ctx context.Context, db DBTX) ([]*GetEventsF
}
const getInsertedEvents = `-- name: GetInsertedEvents :many
SELECT id, "createdAt", "updatedAt", "deletedAt", key, "tenantId", "replayedFromId", data, "additionalMetadata", "insertOrder" FROM "Event"
WHERE xmin::text = (txid_current() % (2^32)::bigint)::text
WHERE "id" = ANY($1::uuid[])
ORDER BY "insertOrder" ASC
`
func (q *Queries) GetInsertedEvents(ctx context.Context, db DBTX) ([]*Event, error) {
rows, err := db.Query(ctx, getInsertedEvents)
func (q *Queries) GetInsertedEvents(ctx context.Context, db DBTX, ids []pgtype.UUID) ([]*Event, error) {
rows, err := db.Query(ctx, getInsertedEvents, ids)
if err != nil {
return nil, err
}

View File

@@ -203,37 +203,60 @@ WHERE
AND child_run."status" = 'PENDING'
AND step_run_order."A" IS NULL;
-- name: ListStartableStepRuns :many
WITH job_run AS (
SELECT "status", "deletedAt"
FROM "JobRun"
WHERE
"id" = @jobRunId::uuid
AND "status" = 'RUNNING'
AND "deletedAt" IS NULL
)
-- name: ListStartableStepRunsManyParents :many
SELECT
DISTINCT ON (child_run."id")
child_run."id" AS "id"
FROM
"StepRun" AS child_run
"StepRun" AS parent_run
LEFT JOIN
"_StepRunOrder" AS step_run_order ON step_run_order."B" = child_run."id"
"_StepRunOrder" AS step_run_order ON step_run_order."A" = parent_run."id"
JOIN
job_run ON true
"StepRun" AS child_run ON step_run_order."B" = child_run."id"
WHERE
child_run."jobRunId" = @jobRunId::uuid
parent_run."id" = @parentStepRunId::uuid
AND child_run."status" = 'PENDING'
-- we look for whether the step run is startable ASSUMING that succeededParentStepRunId has succeeded,
-- so we are making sure that all other parent step runs have succeeded
-- we look for whether the step run is startable by ensuring that all parent step runs have succeeded
AND NOT EXISTS (
SELECT 1
FROM "_StepRunOrder" AS parent_order
JOIN "StepRun" AS parent_run ON parent_order."A" = parent_run."id"
WHERE
parent_order."B" = child_run."id"
AND parent_run."id" != sqlc.arg('succeededParentStepRunId')::uuid
AND parent_run."status" != 'SUCCEEDED'
)
-- AND we ensure that there's at least 2 parent step runs
AND EXISTS (
SELECT 1
FROM "_StepRunOrder" AS parent_order
JOIN "StepRun" AS parent_run ON parent_order."A" = parent_run."id"
WHERE
parent_order."B" = child_run."id"
OFFSET 1
);
-- name: ListStartableStepRunsSingleParent :many
SELECT
DISTINCT ON (child_run."id")
child_run."id" AS "id"
FROM
"StepRun" AS parent_run
LEFT JOIN
"_StepRunOrder" AS step_run_order ON step_run_order."A" = parent_run."id"
JOIN
"StepRun" AS child_run ON step_run_order."B" = child_run."id"
WHERE
parent_run."id" = @parentStepRunId::uuid
AND child_run."status" = 'PENDING'
-- we look for whether the step run is startable ASSUMING that parentStepRunId has succeeded,
-- but we only have one parent step run
AND NOT EXISTS (
SELECT 1
FROM "_StepRunOrder" AS parent_order
JOIN "StepRun" AS parent_run ON parent_order."A" = parent_run."id"
WHERE
parent_order."B" = child_run."id"
AND parent_run."id" != @parentStepRunId::uuid
);
-- name: ListStepRuns :many

View File

@@ -1523,47 +1523,86 @@ func (q *Queries) ListNonFinalChildStepRuns(ctx context.Context, db DBTX, arg Li
return items, nil
}
const listStartableStepRuns = `-- name: ListStartableStepRuns :many
WITH job_run AS (
SELECT "status", "deletedAt"
FROM "JobRun"
WHERE
"id" = $1::uuid
AND "status" = 'RUNNING'
AND "deletedAt" IS NULL
)
const listStartableStepRunsManyParents = `-- name: ListStartableStepRunsManyParents :many
SELECT
DISTINCT ON (child_run."id")
child_run."id" AS "id"
FROM
"StepRun" AS child_run
"StepRun" AS parent_run
LEFT JOIN
"_StepRunOrder" AS step_run_order ON step_run_order."B" = child_run."id"
"_StepRunOrder" AS step_run_order ON step_run_order."A" = parent_run."id"
JOIN
job_run ON true
"StepRun" AS child_run ON step_run_order."B" = child_run."id"
WHERE
child_run."jobRunId" = $1::uuid
parent_run."id" = $1::uuid
AND child_run."status" = 'PENDING'
-- we look for whether the step run is startable ASSUMING that succeededParentStepRunId has succeeded,
-- so we are making sure that all other parent step runs have succeeded
-- we look for whether the step run is startable by ensuring that all parent step runs have succeeded
AND NOT EXISTS (
SELECT 1
FROM "_StepRunOrder" AS parent_order
JOIN "StepRun" AS parent_run ON parent_order."A" = parent_run."id"
WHERE
parent_order."B" = child_run."id"
AND parent_run."id" != $2::uuid
AND parent_run."status" != 'SUCCEEDED'
)
-- AND we ensure that there's at least 2 parent step runs
AND EXISTS (
SELECT 1
FROM "_StepRunOrder" AS parent_order
JOIN "StepRun" AS parent_run ON parent_order."A" = parent_run."id"
WHERE
parent_order."B" = child_run."id"
OFFSET 1
)
`
type ListStartableStepRunsParams struct {
Jobrunid pgtype.UUID `json:"jobrunid"`
SucceededParentStepRunId pgtype.UUID `json:"succeededParentStepRunId"`
func (q *Queries) ListStartableStepRunsManyParents(ctx context.Context, db DBTX, parentsteprunid pgtype.UUID) ([]pgtype.UUID, error) {
rows, err := db.Query(ctx, listStartableStepRunsManyParents, parentsteprunid)
if err != nil {
return nil, err
}
defer rows.Close()
var items []pgtype.UUID
for rows.Next() {
var id pgtype.UUID
if err := rows.Scan(&id); err != nil {
return nil, err
}
items = append(items, id)
}
if err := rows.Err(); err != nil {
return nil, err
}
return items, nil
}
func (q *Queries) ListStartableStepRuns(ctx context.Context, db DBTX, arg ListStartableStepRunsParams) ([]pgtype.UUID, error) {
rows, err := db.Query(ctx, listStartableStepRuns, arg.Jobrunid, arg.SucceededParentStepRunId)
const listStartableStepRunsSingleParent = `-- name: ListStartableStepRunsSingleParent :many
SELECT
DISTINCT ON (child_run."id")
child_run."id" AS "id"
FROM
"StepRun" AS parent_run
LEFT JOIN
"_StepRunOrder" AS step_run_order ON step_run_order."A" = parent_run."id"
JOIN
"StepRun" AS child_run ON step_run_order."B" = child_run."id"
WHERE
parent_run."id" = $1::uuid
AND child_run."status" = 'PENDING'
-- we look for whether the step run is startable ASSUMING that parentStepRunId has succeeded,
-- but we only have one parent step run
AND NOT EXISTS (
SELECT 1
FROM "_StepRunOrder" AS parent_order
JOIN "StepRun" AS parent_run ON parent_order."A" = parent_run."id"
WHERE
parent_order."B" = child_run."id"
AND parent_run."id" != $1::uuid
)
`
func (q *Queries) ListStartableStepRunsSingleParent(ctx context.Context, db DBTX, parentsteprunid pgtype.UUID) ([]pgtype.UUID, error) {
rows, err := db.Query(ctx, listStartableStepRunsSingleParent, parentsteprunid)
if err != nil {
return nil, err
}

View File

@@ -234,7 +234,7 @@ type eventEngineRepository struct {
}
func (r *eventEngineRepository) cleanup() error {
return r.bulkCreateBuffer.cleanup()
return r.bulkCreateBuffer.Cleanup()
}
func NewEventEngineRepository(pool *pgxpool.Pool, v validator.Validator, l *zerolog.Logger, m *metered.Metered) (repository.EventEngineRepository, func() error, error) {
@@ -333,11 +333,13 @@ func (r *eventEngineRepository) BulkCreateEvent(ctx context.Context, opts *repos
return nil, nil, err
}
params := make([]dbsqlc.CreateEventsParams, len(opts.Events))
ids := make([]pgtype.UUID, len(opts.Events))
for i, event := range opts.Events {
eventId := uuid.New().String()
params[i] = dbsqlc.CreateEventsParams{
ID: sqlchelpers.UUIDFromStr(uuid.New().String()),
ID: sqlchelpers.UUIDFromStr(eventId),
Key: event.Key,
TenantId: sqlchelpers.UUIDFromStr(event.TenantId),
Data: event.Data,
@@ -348,6 +350,7 @@ func (r *eventEngineRepository) BulkCreateEvent(ctx context.Context, opts *repos
params[i].ReplayedFromId = sqlchelpers.UUIDFromStr(*event.ReplayedEvent)
}
ids[i] = sqlchelpers.UUIDFromStr(eventId)
}
// start a transaction
@@ -371,7 +374,7 @@ func (r *eventEngineRepository) BulkCreateEvent(ctx context.Context, opts *repos
r.l.Info().Msgf("inserted %d events", insertCount)
events, err := r.queries.GetInsertedEvents(ctx, tx)
events, err := r.queries.GetInsertedEvents(ctx, tx, ids)
if err != nil {
return nil, nil, fmt.Errorf("could not retrieve inserted events: %w", err)
@@ -419,6 +422,7 @@ func (r *eventEngineRepository) BulkCreateEventSharedTenant(ctx context.Context,
}
}
params := make([]dbsqlc.CreateEventsParams, len(opts))
ids := make([]pgtype.UUID, len(opts))
for i, event := range opts {
@@ -426,8 +430,10 @@ func (r *eventEngineRepository) BulkCreateEventSharedTenant(ctx context.Context,
return nil, fmt.Errorf("number of resources is out of range for int 32")
}
eventId := uuid.New().String()
params[i] = dbsqlc.CreateEventsParams{
ID: sqlchelpers.UUIDFromStr(uuid.New().String()),
ID: sqlchelpers.UUIDFromStr(eventId),
Key: event.Key,
TenantId: sqlchelpers.UUIDFromStr(event.TenantId),
Data: event.Data,
@@ -439,6 +445,7 @@ func (r *eventEngineRepository) BulkCreateEventSharedTenant(ctx context.Context,
params[i].ReplayedFromId = sqlchelpers.UUIDFromStr(*event.ReplayedEvent)
}
ids[i] = sqlchelpers.UUIDFromStr(eventId)
}
// start a transaction
@@ -462,7 +469,7 @@ func (r *eventEngineRepository) BulkCreateEventSharedTenant(ctx context.Context,
r.l.Info().Msgf("inserted %d events", insertCount)
events, err := r.queries.GetInsertedEvents(ctx, tx)
events, err := r.queries.GetInsertedEvents(ctx, tx, ids)
if err != nil {
return nil, fmt.Errorf("could not retrieve inserted events: %w", err)

File diff suppressed because it is too large Load Diff

View File

@@ -115,7 +115,7 @@ func (t *TenantBufferManager[T, U]) createTenantBuf(
}
// cleanup all tenant buffers
func (t *TenantBufferManager[T, U]) cleanup() error {
func (t *TenantBufferManager[T, U]) Cleanup() error {
t.tenants.Range(func(key, value interface{}) bool {
ingestBuf := value.(*IngestBuf[T, U])
_ = ingestBuf.cleanup()

View File

@@ -214,7 +214,7 @@ func TestTenantBufferManager_Cleanup(t *testing.T) {
require.NoError(t, err)
// Ensure buffers are cleaned up
err = manager.cleanup()
err = manager.Cleanup()
require.NoError(t, err)
// Try to buff an item after cleanup, should return an error

View File

@@ -702,8 +702,8 @@ func (s *workflowRunEngineRepository) ReplayWorkflowRun(ctx context.Context, ten
defer s.stepRunRepository.deferredStepRunEvent(
tenantId,
stepRunIdStr,
repository.CreateStepRunEventOpts{
StepRunId: stepRunIdStr,
EventMessage: repository.StringPtr("Workflow run was replayed, resetting step run result"),
EventSeverity: &sev,
EventReason: &reason,

View File

@@ -40,6 +40,8 @@ func IsFinalWorkflowRunStatus(status dbsqlc.WorkflowRunStatus) bool {
}
type CreateStepRunEventOpts struct {
StepRunId string `validate:"required,uuid"`
EventMessage *string
EventReason *dbsqlc.StepRunEventReason
@@ -167,6 +169,7 @@ type ProcessStepRunUpdatesResult struct {
}
type ProcessStepRunUpdatesResultV2 struct {
SucceededStepRuns []*dbsqlc.GetStepRunForEngineRow
CompletedWorkflowRuns []*dbsqlc.ResolveWorkflowRunStatusRow
Continue bool
}
@@ -225,14 +228,20 @@ type StepRunEngineRepository interface {
CleanupInternalQueueItems(ctx context.Context, tenantId string) error
ListStartableStepRuns(ctx context.Context, tenantId, jobRunId string, parentStepRunId *string) ([]*dbsqlc.GetStepRunForEngineRow, error)
ListInitialStepRunsForJobRun(ctx context.Context, tenantId, jobRunId string) ([]*dbsqlc.GetStepRunForEngineRow, error)
// ListStartableStepRuns returns a list of step runs that are in a startable state, assuming that the parentStepRunId has succeeded.
// The singleParent flag is used to determine if we should reject listing step runs with many parents. This is important to avoid
// race conditions where a step run is started by multiple parents completing at the same time. As a result, singleParent=false should
// be called from a serializable process after processing step run status updates.
ListStartableStepRuns(ctx context.Context, tenantId, parentStepRunId string, singleParent bool) ([]*dbsqlc.GetStepRunForEngineRow, error)
ArchiveStepRunResult(ctx context.Context, tenantId, stepRunId string, err *string) error
RefreshTimeoutBy(ctx context.Context, tenantId, stepRunId string, opts RefreshTimeoutBy) (pgtype.Timestamp, error)
DeferredStepRunEvent(
tenantId, stepRunId string,
tenantId string,
opts CreateStepRunEventOpts,
)

View File

@@ -146,7 +146,7 @@ func GeneratePlan(
// if we're rate limited then call rollback on the rate limits (this can happen if we've succeeded on one rate limit
// but failed on another)
for key := range stepRunRateUnits[stepId] {
for key := range stepRunRateUnits[stepRunId] {
if rateLimit, ok := rateLimits[key]; ok {
rateLimit.Rollback(stepRunId)
}
@@ -155,7 +155,7 @@ func GeneratePlan(
plan.HandleUnassigned(qi)
// if we can't assign the slot to any worker then we rollback the rate limit
for key := range stepRunRateUnits[stepId] {
for key := range stepRunRateUnits[stepRunId] {
if rateLimit, ok := rateLimits[key]; ok {
rateLimit.Rollback(stepRunId)
}