Fix: Call PopulateTaskRunData sequentially (#2097)

* fix: call task data lookup query sequentially

* fix: error fmt

* feat: add more span attrs

* fix: end + ctx handling

* fix: int type

* fix: handle dupes, factor out into helper

* fix: naming

* fix: unwind naming change

* fix: naming
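For orientation before the diff: the change replaces the single batched PopulateTaskRunData query (UNNEST-ed arrays of task ids and inserted-at timestamps) with a helper that deduplicates (id, inserted_at) pairs and issues one single-row lookup per pair. The sketch below is a simplified, standalone illustration of that pattern, not the repository code; `lookupOne` is a hypothetical stand-in for the generated sqlc query.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// taskKey mirrors the TaskIdInsertedAt struct added in this commit:
// a (task id, inserted_at) pair used both for deduplication and lookup.
type taskKey struct {
	ID         int64
	InsertedAt time.Time
}

// lookupOne is a hypothetical stand-in for the generated single-row
// PopulateTaskRunData query; it returns the data for exactly one task.
func lookupOne(ctx context.Context, k taskKey) (string, error) {
	return fmt.Sprintf("task %d @ %s", k.ID, k.InsertedAt.Format(time.RFC3339)), nil
}

// populateSequentially deduplicates the requested keys, then queries them
// one at a time instead of in a single batched query.
func populateSequentially(ctx context.Context, keys []taskKey) ([]string, error) {
	unique := make(map[taskKey]struct{}, len(keys))
	for _, k := range keys {
		unique[k] = struct{}{}
	}

	results := make([]string, 0, len(unique))
	for k := range unique {
		row, err := lookupOne(ctx, k)
		if err != nil {
			return nil, err // the real helper also tolerates a "no rows" result and skips the key
		}
		results = append(results, row)
	}
	return results, nil
}

func main() {
	now := time.Now()
	rows, _ := populateSequentially(context.Background(), []taskKey{
		{ID: 1, InsertedAt: now},
		{ID: 1, InsertedAt: now}, // duplicate, collapsed by the map
		{ID: 2, InsertedAt: now},
	})
	fmt.Println(rows)
}
```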
matt
2025-08-06 18:40:02 -04:00
committed by GitHub
parent 797c043e6c
commit 285f1728d5
3 changed files with 220 additions and 201 deletions
+113 -71
@@ -15,6 +15,7 @@ import (
"github.com/jackc/pgx/v5/pgtype"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/rs/zerolog"
"go.opentelemetry.io/otel/attribute"
"golang.org/x/sync/errgroup"
"github.com/hatchet-dev/hatchet/internal/telemetry"
@@ -605,22 +606,18 @@ func (r *OLAPRepositoryImpl) ListTasks(ctx context.Context, tenantId string, opt
return nil, 0, err
}
taskIds := make([]int64, 0)
taskInsertedAts := make([]pgtype.Timestamptz, 0)
idsInsertedAts := make([]TaskIdInsertedAt, 0, len(rows))
for _, row := range rows {
taskIds = append(taskIds, row.ID)
taskInsertedAts = append(taskInsertedAts, row.InsertedAt)
idsInsertedAts = append(idsInsertedAts, TaskIdInsertedAt{
ID: row.ID,
InsertedAt: row.InsertedAt,
})
}
tasksWithData, err := r.queries.PopulateTaskRunData(ctx, tx, sqlcv1.PopulateTaskRunDataParams{
Includepayloads: opts.IncludePayloads,
Taskids: taskIds,
Taskinsertedats: taskInsertedAts,
Tenantid: sqlchelpers.UUIDFromStr(tenantId),
})
tasksWithData, err := r.populateTaskRunData(ctx, tx, tenantId, idsInsertedAts, opts.IncludePayloads)
if err != nil && !errors.Is(err, pgx.ErrNoRows) {
if err != nil {
return nil, 0, err
}
@@ -659,26 +656,19 @@ func (r *OLAPRepositoryImpl) ListTasksByDAGId(ctx context.Context, tenantId stri
return nil, taskIdToDagExternalId, err
}
idsInsertedAts := make([]TaskIdInsertedAt, 0, len(tasks))
for _, row := range tasks {
taskIdToDagExternalId[row.TaskID] = uuid.MustParse(sqlchelpers.UUIDToStr(row.DagExternalID))
idsInsertedAts = append(idsInsertedAts, TaskIdInsertedAt{
ID: row.TaskID,
InsertedAt: row.TaskInsertedAt,
})
}
taskIds := make([]int64, 0)
taskInsertedAts := make([]pgtype.Timestamptz, 0)
tasksWithData, err := r.populateTaskRunData(ctx, tx, tenantId, idsInsertedAts, includePayloads)
for _, row := range tasks {
taskIds = append(taskIds, row.TaskID)
taskInsertedAts = append(taskInsertedAts, row.TaskInsertedAt)
}
tasksWithData, err := r.queries.PopulateTaskRunData(ctx, tx, sqlcv1.PopulateTaskRunDataParams{
Taskids: taskIds,
Taskinsertedats: taskInsertedAts,
Tenantid: sqlchelpers.UUIDFromStr(tenantId),
Includepayloads: includePayloads,
})
if err != nil && !errors.Is(err, pgx.ErrNoRows) {
if err != nil {
return nil, taskIdToDagExternalId, err
}
@@ -690,6 +680,9 @@ func (r *OLAPRepositoryImpl) ListTasksByDAGId(ctx context.Context, tenantId stri
}
func (r *OLAPRepositoryImpl) ListTasksByIdAndInsertedAt(ctx context.Context, tenantId string, taskMetadata []TaskMetadata) ([]*sqlcv1.PopulateTaskRunDataRow, error) {
ctx, span := telemetry.NewSpan(ctx, "list-tasks-by-id-and-inserted-at-olap")
defer span.End()
tx, commit, rollback, err := sqlchelpers.PrepareTx(ctx, r.readPool, r.l, 15000)
if err != nil {
@@ -698,22 +691,18 @@ func (r *OLAPRepositoryImpl) ListTasksByIdAndInsertedAt(ctx context.Context, ten
defer rollback()
taskIds := make([]int64, 0)
taskInsertedAts := make([]pgtype.Timestamptz, 0)
idsInsertedAts := make([]TaskIdInsertedAt, 0, len(taskMetadata))
for _, metadata := range taskMetadata {
taskIds = append(taskIds, metadata.TaskID)
taskInsertedAts = append(taskInsertedAts, sqlchelpers.TimestamptzFromTime(metadata.TaskInsertedAt))
idsInsertedAts = append(idsInsertedAts, TaskIdInsertedAt{
ID: metadata.TaskID,
InsertedAt: pgtype.Timestamptz{Time: metadata.TaskInsertedAt, Valid: true},
})
}
tasksWithData, err := r.queries.PopulateTaskRunData(ctx, tx, sqlcv1.PopulateTaskRunDataParams{
Taskids: taskIds,
Taskinsertedats: taskInsertedAts,
Tenantid: sqlchelpers.UUIDFromStr(tenantId),
Includepayloads: true,
})
tasksWithData, err := r.populateTaskRunData(ctx, tx, tenantId, idsInsertedAts, true)
if err != nil && !errors.Is(err, pgx.ErrNoRows) {
if err != nil {
return nil, err
}
@@ -811,16 +800,17 @@ func (r *OLAPRepositoryImpl) ListWorkflowRuns(ctx context.Context, tenantId stri
runIdsWithDAGs := make([]int64, 0)
runInsertedAtsWithDAGs := make([]pgtype.Timestamptz, 0)
runIdsWithTasks := make([]int64, 0)
runInsertedAtsWithTasks := make([]pgtype.Timestamptz, 0)
idsInsertedAts := make([]TaskIdInsertedAt, 0, len(workflowRunIds))
for _, row := range workflowRunIds {
if row.Kind == sqlcv1.V1RunKindDAG {
runIdsWithDAGs = append(runIdsWithDAGs, row.ID)
runInsertedAtsWithDAGs = append(runInsertedAtsWithDAGs, row.InsertedAt)
} else {
runIdsWithTasks = append(runIdsWithTasks, row.ID)
runInsertedAtsWithTasks = append(runInsertedAtsWithTasks, row.InsertedAt)
idsInsertedAts = append(idsInsertedAts, TaskIdInsertedAt{
ID: row.ID,
InsertedAt: row.InsertedAt,
})
}
}
@@ -843,24 +833,6 @@ func (r *OLAPRepositoryImpl) ListWorkflowRuns(ctx context.Context, tenantId stri
dagsToPopulated[externalId] = dag
}
populatedTasks, err := r.queries.PopulateTaskRunData(ctx, tx, sqlcv1.PopulateTaskRunDataParams{
Taskids: runIdsWithTasks,
Taskinsertedats: runInsertedAtsWithTasks,
Tenantid: sqlchelpers.UUIDFromStr(tenantId),
Includepayloads: opts.IncludePayloads,
})
if err != nil && !errors.Is(err, pgx.ErrNoRows) {
return nil, 0, err
}
tasksToPopulated := make(map[string]*sqlcv1.PopulateTaskRunDataRow)
for _, task := range populatedTasks {
externalId := sqlchelpers.UUIDToStr(task.ExternalID)
tasksToPopulated[externalId] = task
}
count, err := r.queries.CountWorkflowRuns(ctx, tx, countParams)
if err != nil {
@@ -868,6 +840,19 @@ func (r *OLAPRepositoryImpl) ListWorkflowRuns(ctx context.Context, tenantId stri
count = int64(len(workflowRunIds))
}
tasksToPopulated := make(map[string]*sqlcv1.PopulateTaskRunDataRow)
populatedTasks, err := r.populateTaskRunData(ctx, tx, tenantId, idsInsertedAts, opts.IncludePayloads)
if err != nil {
return nil, 0, err
}
for _, task := range populatedTasks {
externalId := sqlchelpers.UUIDToStr(task.ExternalID)
tasksToPopulated[externalId] = task
}
if err := commit(ctx); err != nil {
return nil, 0, err
}
@@ -1446,6 +1431,9 @@ func (r *OLAPRepositoryImpl) ListWorkflowRunDisplayNames(ctx context.Context, te
}
func (r *OLAPRepositoryImpl) GetTaskTimings(ctx context.Context, tenantId string, workflowRunId pgtype.UUID, depth int32) ([]*sqlcv1.PopulateTaskRunDataRow, map[string]int32, error) {
ctx, span := telemetry.NewSpan(ctx, "get-task-timings-olap")
defer span.End()
if depth > 10 {
return nil, nil, fmt.Errorf("depth too large")
}
@@ -1490,26 +1478,21 @@ func (r *OLAPRepositoryImpl) GetTaskTimings(ctx context.Context, tenantId string
return nil, nil, err
}
taskIds := make([]int64, 0)
taskInsertedAts := make([]pgtype.Timestamptz, 0)
// associate each run external id with a depth
idsToDepth := make(map[string]int32)
idsInsertedAts := make([]TaskIdInsertedAt, 0, len(runsList))
for _, row := range runsList {
taskIds = append(taskIds, row.ID)
taskInsertedAts = append(taskInsertedAts, row.InsertedAt)
idsToDepth[sqlchelpers.UUIDToStr(row.ExternalID)] = row.Depth
idsInsertedAts = append(idsInsertedAts, TaskIdInsertedAt{
ID: row.ID,
InsertedAt: row.InsertedAt,
})
}
tasksWithData, err := r.queries.PopulateTaskRunData(ctx, r.readPool, sqlcv1.PopulateTaskRunDataParams{
Taskids: taskIds,
Taskinsertedats: taskInsertedAts,
Tenantid: sqlchelpers.UUIDFromStr(tenantId),
})
tasksWithData, err := r.populateTaskRunData(ctx, nil, tenantId, idsInsertedAts, false)
if err != nil && !errors.Is(err, pgx.ErrNoRows) {
if err != nil {
return nil, nil, err
}
@@ -1786,3 +1769,62 @@ func (r *OLAPRepositoryImpl) StoreCELEvaluationFailures(ctx context.Context, ten
Errors: errorMessages,
})
}
type TaskIdInsertedAt struct {
ID int64 `json:"id"`
InsertedAt pgtype.Timestamptz `json:"inserted_at"`
}
func (r *OLAPRepositoryImpl) populateTaskRunData(ctx context.Context, tx pgx.Tx, tenantId string, opts []TaskIdInsertedAt, includePayloads bool) ([]*sqlcv1.PopulateTaskRunDataRow, error) {
ctx, span := telemetry.NewSpan(ctx, "populate-task-run-data-olap")
defer span.End()
uniqueTaskIdInsertedAts := make(map[TaskIdInsertedAt]struct{})
for _, opt := range opts {
uniqueTaskIdInsertedAts[TaskIdInsertedAt{
ID: opt.ID,
InsertedAt: opt.InsertedAt,
}] = struct{}{}
}
span.SetAttributes(attribute.KeyValue{
Key: "populate-task-run-data-olap.batch_size",
Value: attribute.IntValue(len(uniqueTaskIdInsertedAts)),
})
if len(uniqueTaskIdInsertedAts) == 0 {
r.l.Warn().Msg("populateTaskRunData called with empty opts, returning empty result")
return []*sqlcv1.PopulateTaskRunDataRow{}, nil
}
idInsertedAtToData := make(map[TaskIdInsertedAt]*sqlcv1.PopulateTaskRunDataRow)
for idInsertedAt := range uniqueTaskIdInsertedAts {
taskData, err := r.queries.PopulateTaskRunData(ctx, tx, sqlcv1.PopulateTaskRunDataParams{
Includepayloads: includePayloads,
Taskid: idInsertedAt.ID,
Taskinsertedat: idInsertedAt.InsertedAt,
Tenantid: sqlchelpers.UUIDFromStr(tenantId),
})
if err != nil && !errors.Is(err, pgx.ErrNoRows) {
return nil, err
}
if errors.Is(err, pgx.ErrNoRows) {
r.l.Warn().Msgf("task %d not found with inserted at %s", idInsertedAt.ID, idInsertedAt.InsertedAt.Time)
continue
}
idInsertedAtToData[idInsertedAt] = taskData
}
result := make([]*sqlcv1.PopulateTaskRunDataRow, 0)
for _, taskData := range idInsertedAtToData {
result = append(result, taskData)
}
return result, nil
}
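As a side note on the telemetry change in the helper above: the batch-size span attribute is set through the repository's internal telemetry wrapper. Below is a minimal sketch of the same attribute call against the public OpenTelemetry API, assuming the go.opentelemetry.io/otel module; without a configured SDK the global tracer is a no-op, which is enough to show the shape.

```go
package main

import (
	"context"
	"fmt"

	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/attribute"
)

func main() {
	// Start a span named like the helper's; with no SDK installed this is a no-op span.
	_, span := otel.Tracer("example").Start(context.Background(), "populate-task-run-data-olap")
	defer span.End()

	// Record the deduplicated batch size, as the helper does via attribute.IntValue.
	batchSize := 3
	span.SetAttributes(attribute.Int("populate-task-run-data-olap.batch_size", batchSize))
	fmt.Println("recorded batch size:", batchSize)
}
```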
+35 -40
@@ -492,58 +492,53 @@ LEFT JOIN
WHERE
(t.tenant_id, t.id, t.inserted_at) = (@tenantId::uuid, @taskId::bigint, @taskInsertedAt::timestamptz);
-- name: PopulateTaskRunData :many
WITH input AS (
-- name: PopulateTaskRunData :one
WITH task AS (
SELECT
UNNEST(@taskIds::bigint[]) AS id,
UNNEST(@taskInsertedAts::timestamptz[]) AS inserted_at
), tasks AS (
SELECT
DISTINCT ON(t.tenant_id, t.id, t.inserted_at)
t.tenant_id,
t.id,
t.inserted_at,
t.queue,
t.action_id,
t.step_id,
t.workflow_id,
t.workflow_version_id,
t.schedule_timeout,
t.step_timeout,
t.priority,
t.sticky,
t.desired_worker_id,
t.external_id,
t.display_name,
t.input,
t.additional_metadata,
t.readable_status,
t.parent_task_external_id,
t.workflow_run_id,
t.latest_retry_count
tenant_id,
id,
inserted_at,
queue,
action_id,
step_id,
workflow_id,
workflow_version_id,
schedule_timeout,
step_timeout,
priority,
sticky,
desired_worker_id,
external_id,
display_name,
input,
additional_metadata,
readable_status,
parent_task_external_id,
workflow_run_id,
latest_retry_count
FROM
v1_tasks_olap t
JOIN
input i ON i.id = t.id AND i.inserted_at = t.inserted_at
v1_tasks_olap
WHERE
t.tenant_id = @tenantId::uuid
tenant_id = @tenantId::uuid
AND id = @taskId::bigint
AND inserted_at = @taskInsertedAt::timestamptz
), relevant_events AS (
SELECT
e.*
FROM
v1_task_events_olap e
JOIN
tasks t ON t.id = e.task_id AND t.tenant_id = e.tenant_id AND t.inserted_at = e.task_inserted_at
task t ON (t.id, t.inserted_at, t.tenant_id) = (e.task_id, e.task_inserted_at, e.tenant_id)
), max_retry_counts AS (
SELECT
e.tenant_id,
e.task_id,
e.task_inserted_at,
MAX(e.retry_count) AS max_retry_count
tenant_id,
task_id,
task_inserted_at,
MAX(retry_count) AS max_retry_count
FROM
relevant_events e
relevant_events
GROUP BY
e.tenant_id, e.task_id, e.task_inserted_at
tenant_id, task_id, task_inserted_at
), queued_ats AS (
SELECT
e.task_id::bigint,
@@ -649,7 +644,7 @@ SELECT
ELSE '{}'::JSONB
END::JSONB as output
FROM
tasks t
task t
LEFT JOIN
finished_ats f ON f.task_id = t.id
LEFT JOIN
+72 -90
@@ -1919,58 +1919,53 @@ func (q *Queries) PopulateSingleTaskRunData(ctx context.Context, db DBTX, arg Po
return &i, err
}
const populateTaskRunData = `-- name: PopulateTaskRunData :many
WITH input AS (
const populateTaskRunData = `-- name: PopulateTaskRunData :one
WITH task AS (
SELECT
UNNEST($2::bigint[]) AS id,
UNNEST($3::timestamptz[]) AS inserted_at
), tasks AS (
SELECT
DISTINCT ON(t.tenant_id, t.id, t.inserted_at)
t.tenant_id,
t.id,
t.inserted_at,
t.queue,
t.action_id,
t.step_id,
t.workflow_id,
t.workflow_version_id,
t.schedule_timeout,
t.step_timeout,
t.priority,
t.sticky,
t.desired_worker_id,
t.external_id,
t.display_name,
t.input,
t.additional_metadata,
t.readable_status,
t.parent_task_external_id,
t.workflow_run_id,
t.latest_retry_count
tenant_id,
id,
inserted_at,
queue,
action_id,
step_id,
workflow_id,
workflow_version_id,
schedule_timeout,
step_timeout,
priority,
sticky,
desired_worker_id,
external_id,
display_name,
input,
additional_metadata,
readable_status,
parent_task_external_id,
workflow_run_id,
latest_retry_count
FROM
v1_tasks_olap t
JOIN
input i ON i.id = t.id AND i.inserted_at = t.inserted_at
v1_tasks_olap
WHERE
t.tenant_id = $4::uuid
tenant_id = $2::uuid
AND id = $3::bigint
AND inserted_at = $4::timestamptz
), relevant_events AS (
SELECT
e.tenant_id, e.id, e.inserted_at, e.task_id, e.task_inserted_at, e.event_type, e.workflow_id, e.event_timestamp, e.readable_status, e.retry_count, e.error_message, e.output, e.worker_id, e.additional__event_data, e.additional__event_message
FROM
v1_task_events_olap e
JOIN
tasks t ON t.id = e.task_id AND t.tenant_id = e.tenant_id AND t.inserted_at = e.task_inserted_at
task t ON (t.id, t.inserted_at, t.tenant_id) = (e.task_id, e.task_inserted_at, e.tenant_id)
), max_retry_counts AS (
SELECT
e.tenant_id,
e.task_id,
e.task_inserted_at,
MAX(e.retry_count) AS max_retry_count
tenant_id,
task_id,
task_inserted_at,
MAX(retry_count) AS max_retry_count
FROM
relevant_events e
relevant_events
GROUP BY
e.tenant_id, e.task_id, e.task_inserted_at
tenant_id, task_id, task_inserted_at
), queued_ats AS (
SELECT
e.task_id::bigint,
@@ -2076,7 +2071,7 @@ SELECT
ELSE '{}'::JSONB
END::JSONB as output
FROM
tasks t
task t
LEFT JOIN
finished_ats f ON f.task_id = t.id
LEFT JOIN
@@ -2091,10 +2086,10 @@ ORDER BY t.inserted_at DESC, t.id DESC
`
type PopulateTaskRunDataParams struct {
Includepayloads bool `json:"includepayloads"`
Taskids []int64 `json:"taskids"`
Taskinsertedats []pgtype.Timestamptz `json:"taskinsertedats"`
Tenantid pgtype.UUID `json:"tenantid"`
Includepayloads bool `json:"includepayloads"`
Tenantid pgtype.UUID `json:"tenantid"`
Taskid int64 `json:"taskid"`
Taskinsertedat pgtype.Timestamptz `json:"taskinsertedat"`
}
type PopulateTaskRunDataRow struct {
@@ -2125,55 +2120,42 @@ type PopulateTaskRunDataRow struct {
Output []byte `json:"output"`
}
func (q *Queries) PopulateTaskRunData(ctx context.Context, db DBTX, arg PopulateTaskRunDataParams) ([]*PopulateTaskRunDataRow, error) {
rows, err := db.Query(ctx, populateTaskRunData,
func (q *Queries) PopulateTaskRunData(ctx context.Context, db DBTX, arg PopulateTaskRunDataParams) (*PopulateTaskRunDataRow, error) {
row := db.QueryRow(ctx, populateTaskRunData,
arg.Includepayloads,
arg.Taskids,
arg.Taskinsertedats,
arg.Tenantid,
arg.Taskid,
arg.Taskinsertedat,
)
if err != nil {
return nil, err
}
defer rows.Close()
var items []*PopulateTaskRunDataRow
for rows.Next() {
var i PopulateTaskRunDataRow
if err := rows.Scan(
&i.TenantID,
&i.ID,
&i.InsertedAt,
&i.ExternalID,
&i.Queue,
&i.ActionID,
&i.StepID,
&i.WorkflowID,
&i.WorkflowVersionID,
&i.ScheduleTimeout,
&i.StepTimeout,
&i.Priority,
&i.Sticky,
&i.DisplayName,
&i.AdditionalMetadata,
&i.ParentTaskExternalID,
&i.Input,
&i.Status,
&i.WorkflowRunID,
&i.FinishedAt,
&i.StartedAt,
&i.QueuedAt,
&i.ErrorMessage,
&i.RetryCount,
&i.Output,
); err != nil {
return nil, err
}
items = append(items, &i)
}
if err := rows.Err(); err != nil {
return nil, err
}
return items, nil
var i PopulateTaskRunDataRow
err := row.Scan(
&i.TenantID,
&i.ID,
&i.InsertedAt,
&i.ExternalID,
&i.Queue,
&i.ActionID,
&i.StepID,
&i.WorkflowID,
&i.WorkflowVersionID,
&i.ScheduleTimeout,
&i.StepTimeout,
&i.Priority,
&i.Sticky,
&i.DisplayName,
&i.AdditionalMetadata,
&i.ParentTaskExternalID,
&i.Input,
&i.Status,
&i.WorkflowRunID,
&i.FinishedAt,
&i.StartedAt,
&i.QueuedAt,
&i.ErrorMessage,
&i.RetryCount,
&i.Output,
)
return &i, err
}
const readDAGByExternalID = `-- name: ReadDAGByExternalID :one