mirror of
https://github.com/hatchet-dev/hatchet.git
synced 2026-04-23 10:39:45 -05:00
Fix: Move event log to a tab on the task run detail (#3067)
* fix: separate tab for activity on run detail * fix: output payload bug * fix: couple more uuid bugs * fix: set var * fix: add event type check
This commit is contained in:
+12
-11
@@ -13,7 +13,6 @@ import { CopyWorkflowConfigButton } from '@/components/v1/shared/copy-workflow-c
|
||||
import { Button } from '@/components/v1/ui/button';
|
||||
import { CodeHighlighter } from '@/components/v1/ui/code-highlighter';
|
||||
import { Loading } from '@/components/v1/ui/loading';
|
||||
import { Separator } from '@/components/v1/ui/separator';
|
||||
import {
|
||||
Tabs,
|
||||
TabsContent,
|
||||
@@ -38,6 +37,7 @@ export enum TabOption {
|
||||
Logs = 'logs',
|
||||
Waterfall = 'waterfall',
|
||||
AdditionalMetadata = 'additional-metadata',
|
||||
Activity = 'activity',
|
||||
}
|
||||
|
||||
interface TaskRunDetailProps {
|
||||
@@ -228,6 +228,9 @@ export const TaskRunDetail = ({
|
||||
}}
|
||||
>
|
||||
<TabsList layout="underlined">
|
||||
<TabsTrigger variant="underlined" value={TabOption.Activity}>
|
||||
Activity
|
||||
</TabsTrigger>
|
||||
<TabsTrigger variant="underlined" value={TabOption.Output}>
|
||||
Output
|
||||
</TabsTrigger>
|
||||
@@ -261,6 +264,14 @@ export const TaskRunDetail = ({
|
||||
<TabsContent value={TabOption.Output}>
|
||||
<V1StepRunOutput taskRunId={taskRunId} />
|
||||
</TabsContent>
|
||||
<TabsContent value={TabOption.Activity}>
|
||||
<div className="py-4">
|
||||
<StepRunEvents
|
||||
taskRunId={taskRunId}
|
||||
fallbackTaskDisplayName={taskRun.displayName}
|
||||
/>
|
||||
</div>
|
||||
</TabsContent>
|
||||
<TabsContent value={TabOption.ChildWorkflowRuns} className="mt-4">
|
||||
<div className="flex flex-col h-96">
|
||||
<RunsProvider
|
||||
@@ -314,16 +325,6 @@ export const TaskRunDetail = ({
|
||||
</TabsContent>
|
||||
)}
|
||||
</Tabs>
|
||||
<Separator className="my-4" />
|
||||
<div className="mb-2 flex flex-col gap-y-2">
|
||||
<h3 className="flex flex-row items-center gap-4 text-lg font-semibold leading-tight text-foreground">
|
||||
Events
|
||||
</h3>
|
||||
<StepRunEvents
|
||||
taskRunId={taskRunId}
|
||||
fallbackTaskDisplayName={taskRun.displayName}
|
||||
/>
|
||||
</div>
|
||||
</div>
|
||||
);
|
||||
};
|
||||
|
||||
@@ -867,7 +867,7 @@ func (m *sharedRepository) createEventMatches(ctx context.Context, tx sqlcv1.DBT
|
||||
triggerStepIds := make([]uuid.UUID, len(dagMatches))
|
||||
triggerStepIndices := make([]int64, len(dagMatches))
|
||||
triggerExternalIds := make([]uuid.UUID, len(dagMatches))
|
||||
triggerWorkflowRunIds := make([]uuid.UUID, len(dagMatches))
|
||||
triggerWorkflowRunIds := make([]*uuid.UUID, len(dagMatches))
|
||||
triggerExistingTaskIds := make([]pgtype.Int8, len(dagMatches))
|
||||
triggerExistingTaskInsertedAts := make([]pgtype.Timestamptz, len(dagMatches))
|
||||
triggerParentExternalIds := make([]*uuid.UUID, len(dagMatches))
|
||||
@@ -899,11 +899,7 @@ func (m *sharedRepository) createEventMatches(ctx context.Context, tx sqlcv1.DBT
|
||||
triggerExistingTaskIds[i] = pgtype.Int8{}
|
||||
}
|
||||
|
||||
if match.TriggerWorkflowRunId != nil {
|
||||
triggerWorkflowRunIds[i] = *match.TriggerWorkflowRunId
|
||||
} else {
|
||||
triggerWorkflowRunIds[i] = uuid.UUID{}
|
||||
}
|
||||
triggerWorkflowRunIds[i] = match.TriggerWorkflowRunId
|
||||
|
||||
triggerExistingTaskInsertedAts[i] = match.TriggerExistingTaskInsertedAt
|
||||
}
|
||||
|
||||
+74
-25
@@ -589,7 +589,13 @@ func (r *OLAPRepositoryImpl) ReadTaskRunData(ctx context.Context, tenantId uuid.
|
||||
workflowRunId = taskRun.ExternalID
|
||||
}
|
||||
|
||||
payloads, err := r.readPayloads(ctx, r.readPool, tenantId, workflowRunId, taskRun.OutputEventExternalID)
|
||||
externalIds := make([]uuid.UUID, 0)
|
||||
|
||||
if taskRun.OutputEventExternalID != nil {
|
||||
externalIds = append(externalIds, *taskRun.OutputEventExternalID)
|
||||
}
|
||||
|
||||
payloads, err := r.readPayloads(ctx, r.readPool, tenantId, externalIds...)
|
||||
|
||||
if err != nil {
|
||||
return nil, emptyUUID, err
|
||||
@@ -601,9 +607,14 @@ func (r *OLAPRepositoryImpl) ReadTaskRunData(ctx context.Context, tenantId uuid.
|
||||
input = taskRun.Input
|
||||
}
|
||||
|
||||
output, exists := payloads[taskRun.OutputEventExternalID]
|
||||
var output []byte
|
||||
if taskRun.OutputEventExternalID != nil {
|
||||
output, exists = payloads[*taskRun.OutputEventExternalID]
|
||||
|
||||
if !exists {
|
||||
if !exists {
|
||||
output = taskRun.Output
|
||||
}
|
||||
} else {
|
||||
output = taskRun.Output
|
||||
}
|
||||
|
||||
@@ -757,7 +768,10 @@ func (r *OLAPRepositoryImpl) ListTasks(ctx context.Context, tenantId uuid.UUID,
|
||||
externalIds := make([]uuid.UUID, 0)
|
||||
for _, task := range tasksWithData {
|
||||
externalIds = append(externalIds, task.ExternalID)
|
||||
externalIds = append(externalIds, task.OutputEventExternalID)
|
||||
|
||||
if task.OutputEventExternalID != nil {
|
||||
externalIds = append(externalIds, *task.OutputEventExternalID)
|
||||
}
|
||||
}
|
||||
|
||||
payloads, err = r.readPayloads(ctx, tx, tenantId, externalIds...)
|
||||
@@ -776,9 +790,15 @@ func (r *OLAPRepositoryImpl) ListTasks(ctx context.Context, tenantId uuid.UUID,
|
||||
input = task.Input
|
||||
}
|
||||
|
||||
output, exists := payloads[task.OutputEventExternalID]
|
||||
var output []byte
|
||||
|
||||
if !exists {
|
||||
if task.OutputEventExternalID != nil {
|
||||
output, exists = payloads[*task.OutputEventExternalID]
|
||||
|
||||
if !exists {
|
||||
output = task.Output
|
||||
}
|
||||
} else {
|
||||
output = task.Output
|
||||
}
|
||||
|
||||
@@ -845,7 +865,10 @@ func (r *OLAPRepositoryImpl) ListTasksByDAGId(ctx context.Context, tenantId uuid
|
||||
externalIds := make([]uuid.UUID, 0)
|
||||
for _, task := range tasksWithData {
|
||||
externalIds = append(externalIds, task.ExternalID)
|
||||
externalIds = append(externalIds, task.OutputEventExternalID)
|
||||
|
||||
if task.OutputEventExternalID != nil {
|
||||
externalIds = append(externalIds, *task.OutputEventExternalID)
|
||||
}
|
||||
}
|
||||
|
||||
payloads, err = r.readPayloads(ctx, tx, tenantId, externalIds...)
|
||||
@@ -864,9 +887,15 @@ func (r *OLAPRepositoryImpl) ListTasksByDAGId(ctx context.Context, tenantId uuid
|
||||
input = task.Input
|
||||
}
|
||||
|
||||
output, exists := payloads[task.OutputEventExternalID]
|
||||
var output []byte
|
||||
|
||||
if !exists {
|
||||
if task.OutputEventExternalID != nil {
|
||||
output, exists = payloads[*task.OutputEventExternalID]
|
||||
|
||||
if !exists {
|
||||
output = task.Output
|
||||
}
|
||||
} else {
|
||||
output = task.Output
|
||||
}
|
||||
|
||||
@@ -918,7 +947,11 @@ func (r *OLAPRepositoryImpl) ListTasksByIdAndInsertedAt(ctx context.Context, ten
|
||||
externalIds := make([]uuid.UUID, 0)
|
||||
for _, task := range tasksWithData {
|
||||
externalIds = append(externalIds, task.ExternalID)
|
||||
externalIds = append(externalIds, task.OutputEventExternalID)
|
||||
|
||||
if task.OutputEventExternalID != nil {
|
||||
|
||||
externalIds = append(externalIds, *task.OutputEventExternalID)
|
||||
}
|
||||
}
|
||||
|
||||
payloads, err = r.readPayloads(ctx, tx, tenantId, externalIds...)
|
||||
@@ -937,9 +970,14 @@ func (r *OLAPRepositoryImpl) ListTasksByIdAndInsertedAt(ctx context.Context, ten
|
||||
input = task.Input
|
||||
}
|
||||
|
||||
output, exists := payloads[task.OutputEventExternalID]
|
||||
var output []byte
|
||||
if task.OutputEventExternalID != nil {
|
||||
output, exists = payloads[*task.OutputEventExternalID]
|
||||
|
||||
if !exists {
|
||||
if !exists {
|
||||
output = task.Output
|
||||
}
|
||||
} else {
|
||||
output = task.Output
|
||||
}
|
||||
|
||||
@@ -1100,7 +1138,10 @@ func (r *OLAPRepositoryImpl) ListWorkflowRuns(ctx context.Context, tenantId uuid
|
||||
tasksToPopulated[externalId] = task
|
||||
|
||||
externalIdsForPayloads = append(externalIdsForPayloads, task.ExternalID)
|
||||
externalIdsForPayloads = append(externalIdsForPayloads, task.OutputEventExternalID)
|
||||
|
||||
if task.OutputEventExternalID != nil {
|
||||
externalIdsForPayloads = append(externalIdsForPayloads, *task.OutputEventExternalID)
|
||||
}
|
||||
}
|
||||
|
||||
if err := commit(ctx); err != nil {
|
||||
@@ -1140,18 +1181,19 @@ func (r *OLAPRepositoryImpl) ListWorkflowRuns(ctx context.Context, tenantId uuid
|
||||
continue
|
||||
}
|
||||
|
||||
outputEventExternalId := uuid.Nil
|
||||
var outputPayload []byte
|
||||
var exists bool
|
||||
|
||||
if dag.OutputEventExternalID != nil {
|
||||
outputEventExternalId = *dag.OutputEventExternalID
|
||||
}
|
||||
outputPayload, exists = externalIdToPayload[*dag.OutputEventExternalID]
|
||||
|
||||
outputPayload, exists := externalIdToPayload[outputEventExternalId]
|
||||
|
||||
if !exists {
|
||||
if opts.IncludePayloads && outputEventExternalId != uuid.Nil && dag.ReadableStatus == sqlcv1.V1ReadableStatusOlapCOMPLETED {
|
||||
r.l.Error().Msgf("ListWorkflowRuns-1: dag with external_id %s and inserted_at %s has empty payload, falling back to output", dag.ExternalID, dag.InsertedAt.Time)
|
||||
if !exists {
|
||||
if opts.IncludePayloads && dag.OutputEventExternalID != nil && dag.ReadableStatus == sqlcv1.V1ReadableStatusOlapCOMPLETED {
|
||||
r.l.Error().Msgf("ListWorkflowRuns-1: dag with external_id %s and inserted_at %s has empty payload, falling back to output", dag.ExternalID, dag.InsertedAt.Time)
|
||||
}
|
||||
outputPayload = dag.Output
|
||||
}
|
||||
} else {
|
||||
outputPayload = dag.Output
|
||||
}
|
||||
|
||||
@@ -1198,12 +1240,19 @@ func (r *OLAPRepositoryImpl) ListWorkflowRuns(ctx context.Context, tenantId uuid
|
||||
|
||||
retryCount := int(task.RetryCount)
|
||||
|
||||
outputPayload, exists := externalIdToPayload[task.OutputEventExternalID]
|
||||
var outputPayload []byte
|
||||
var exists bool
|
||||
|
||||
if !exists {
|
||||
if opts.IncludePayloads && task.OutputEventExternalID != uuid.Nil && task.Status == sqlcv1.V1ReadableStatusOlapCOMPLETED {
|
||||
r.l.Error().Msgf("ListWorkflowRuns-3: task with external_id %s and inserted_at %s has empty payload, falling back to output", task.ExternalID, task.InsertedAt.Time)
|
||||
if task.OutputEventExternalID != nil {
|
||||
outputPayload, exists = externalIdToPayload[*task.OutputEventExternalID]
|
||||
|
||||
if !exists {
|
||||
if opts.IncludePayloads && task.OutputEventExternalID != nil && task.Status == sqlcv1.V1ReadableStatusOlapCOMPLETED {
|
||||
r.l.Error().Msgf("ListWorkflowRuns-3: task with external_id %s and inserted_at %s has empty payload, falling back to output", task.ExternalID, task.InsertedAt.Time)
|
||||
}
|
||||
outputPayload = task.Output
|
||||
}
|
||||
} else {
|
||||
outputPayload = task.Output
|
||||
}
|
||||
|
||||
|
||||
@@ -45,7 +45,7 @@ SELECT
|
||||
i.display_name,
|
||||
i.workflow_id,
|
||||
i.workflow_version_id,
|
||||
i.parent_task_external_id
|
||||
NULLIF(i.parent_task_external_id, '00000000-0000-0000-0000-000000000000'::uuid)
|
||||
FROM
|
||||
input i
|
||||
RETURNING
|
||||
|
||||
@@ -48,7 +48,7 @@ SELECT
|
||||
i.display_name,
|
||||
i.workflow_id,
|
||||
i.workflow_version_id,
|
||||
i.parent_task_external_id
|
||||
NULLIF(i.parent_task_external_id, '00000000-0000-0000-0000-000000000000'::uuid)
|
||||
FROM
|
||||
input i
|
||||
RETURNING
|
||||
|
||||
@@ -235,7 +235,7 @@ type CreateMatchesForDAGTriggersParams struct {
|
||||
Triggerstepids []uuid.UUID `json:"triggerstepids"`
|
||||
Triggerstepindex []int64 `json:"triggerstepindex"`
|
||||
Triggerexternalids []uuid.UUID `json:"triggerexternalids"`
|
||||
Triggerworkflowrunids []uuid.UUID `json:"triggerworkflowrunids"`
|
||||
Triggerworkflowrunids []*uuid.UUID `json:"triggerworkflowrunids"`
|
||||
Triggerexistingtaskids []pgtype.Int8 `json:"triggerexistingtaskids"`
|
||||
Triggerexistingtaskinsertedat []pgtype.Timestamptz `json:"triggerexistingtaskinsertedat"`
|
||||
TriggerParentTaskExternalIds []*uuid.UUID `json:"triggerparentTaskExternalIds"`
|
||||
|
||||
@@ -515,7 +515,7 @@ SELECT
|
||||
f.finished_at::timestamptz as finished_at,
|
||||
s.started_at::timestamptz as started_at,
|
||||
q.queued_at::timestamptz as queued_at,
|
||||
o.external_id::UUID AS output_event_external_id,
|
||||
o.external_id AS output_event_external_id,
|
||||
o.output as output,
|
||||
e.error_message as error_message,
|
||||
sc.spawned_children,
|
||||
@@ -654,15 +654,17 @@ WITH input AS (
|
||||
e.task_id, e.retry_count DESC
|
||||
), task_output AS (
|
||||
SELECT
|
||||
DISTINCT ON (task_id)
|
||||
task_id,
|
||||
MAX(output::TEXT) FILTER (WHERE readable_status = 'COMPLETED')::JSONB AS output,
|
||||
MAX(external_id::TEXT) FILTER (WHERE readable_status = 'COMPLETED')::UUID AS output_event_external_id
|
||||
output,
|
||||
external_id AS output_event_external_id
|
||||
FROM
|
||||
relevant_events
|
||||
WHERE
|
||||
readable_status = 'COMPLETED'
|
||||
GROUP BY
|
||||
task_id
|
||||
AND event_type = 'FINISHED'
|
||||
ORDER BY
|
||||
task_id, event_timestamp DESC
|
||||
)
|
||||
SELECT
|
||||
t.tenant_id,
|
||||
@@ -696,7 +698,7 @@ SELECT
|
||||
WHEN @includePayloads::BOOLEAN THEN o.output::JSONB
|
||||
ELSE '{}'::JSONB
|
||||
END::JSONB as output,
|
||||
o.output_event_external_id::UUID AS output_event_external_id
|
||||
o.output_event_external_id AS output_event_external_id
|
||||
FROM
|
||||
tasks t
|
||||
LEFT JOIN
|
||||
|
||||
@@ -2441,7 +2441,7 @@ SELECT
|
||||
f.finished_at::timestamptz as finished_at,
|
||||
s.started_at::timestamptz as started_at,
|
||||
q.queued_at::timestamptz as queued_at,
|
||||
o.external_id::UUID AS output_event_external_id,
|
||||
o.external_id AS output_event_external_id,
|
||||
o.output as output,
|
||||
e.error_message as error_message,
|
||||
sc.spawned_children,
|
||||
@@ -2502,7 +2502,7 @@ type PopulateSingleTaskRunDataRow struct {
|
||||
FinishedAt pgtype.Timestamptz `json:"finished_at"`
|
||||
StartedAt pgtype.Timestamptz `json:"started_at"`
|
||||
QueuedAt pgtype.Timestamptz `json:"queued_at"`
|
||||
OutputEventExternalID uuid.UUID `json:"output_event_external_id"`
|
||||
OutputEventExternalID *uuid.UUID `json:"output_event_external_id"`
|
||||
Output []byte `json:"output"`
|
||||
ErrorMessage pgtype.Text `json:"error_message"`
|
||||
SpawnedChildren pgtype.Int8 `json:"spawned_children"`
|
||||
@@ -2670,15 +2670,17 @@ WITH input AS (
|
||||
e.task_id, e.retry_count DESC
|
||||
), task_output AS (
|
||||
SELECT
|
||||
DISTINCT ON (task_id)
|
||||
task_id,
|
||||
MAX(output::TEXT) FILTER (WHERE readable_status = 'COMPLETED')::JSONB AS output,
|
||||
MAX(external_id::TEXT) FILTER (WHERE readable_status = 'COMPLETED')::UUID AS output_event_external_id
|
||||
output,
|
||||
external_id AS output_event_external_id
|
||||
FROM
|
||||
relevant_events
|
||||
WHERE
|
||||
readable_status = 'COMPLETED'
|
||||
GROUP BY
|
||||
task_id
|
||||
AND event_type = 'FINISHED'
|
||||
ORDER BY
|
||||
task_id, event_timestamp DESC
|
||||
)
|
||||
SELECT
|
||||
t.tenant_id,
|
||||
@@ -2712,7 +2714,7 @@ SELECT
|
||||
WHEN $1::BOOLEAN THEN o.output::JSONB
|
||||
ELSE '{}'::JSONB
|
||||
END::JSONB as output,
|
||||
o.output_event_external_id::UUID AS output_event_external_id
|
||||
o.output_event_external_id AS output_event_external_id
|
||||
FROM
|
||||
tasks t
|
||||
LEFT JOIN
|
||||
@@ -2761,7 +2763,7 @@ type PopulateTaskRunDataRow struct {
|
||||
ErrorMessage pgtype.Text `json:"error_message"`
|
||||
RetryCount int32 `json:"retry_count"`
|
||||
Output []byte `json:"output"`
|
||||
OutputEventExternalID uuid.UUID `json:"output_event_external_id"`
|
||||
OutputEventExternalID *uuid.UUID `json:"output_event_external_id"`
|
||||
}
|
||||
|
||||
func (q *Queries) PopulateTaskRunData(ctx context.Context, db DBTX, arg PopulateTaskRunDataParams) ([]*PopulateTaskRunDataRow, error) {
|
||||
|
||||
@@ -153,7 +153,7 @@ type CreateTasksParams struct {
|
||||
Concurrencyparentstrategyids [][]pgtype.Int8 `json:"concurrencyparentstrategyids"`
|
||||
ConcurrencyStrategyIds [][]int64 `json:"concurrencyStrategyIds"`
|
||||
ConcurrencyKeys [][]string `json:"concurrencyKeys"`
|
||||
ParentTaskExternalIds []uuid.UUID `json:"parentTaskExternalIds"`
|
||||
ParentTaskExternalIds []*uuid.UUID `json:"parentTaskExternalIds"`
|
||||
ParentTaskIds []pgtype.Int8 `json:"parentTaskIds"`
|
||||
ParentTaskInsertedAts []pgtype.Timestamptz `json:"parentTaskInsertedAts"`
|
||||
ChildIndex []pgtype.Int8 `json:"childIndex"`
|
||||
|
||||
@@ -1744,7 +1744,7 @@ func (r *sharedRepository) insertTasks(
|
||||
parentStrategyIds := make([][]pgtype.Int8, len(tasks))
|
||||
strategyIds := make([][]int64, len(tasks))
|
||||
concurrencyKeys := make([][]string, len(tasks))
|
||||
parentTaskExternalIds := make([]uuid.UUID, len(tasks))
|
||||
parentTaskExternalIds := make([]*uuid.UUID, len(tasks))
|
||||
parentTaskIds := make([]pgtype.Int8, len(tasks))
|
||||
parentTaskInsertedAts := make([]pgtype.Timestamptz, len(tasks))
|
||||
childIndices := make([]pgtype.Int8, len(tasks))
|
||||
@@ -1823,9 +1823,7 @@ func (r *sharedRepository) insertTasks(
|
||||
dagInsertedAts[i] = task.DagInsertedAt
|
||||
}
|
||||
|
||||
if task.ParentTaskExternalId != nil {
|
||||
parentTaskExternalIds[i] = *task.ParentTaskExternalId
|
||||
}
|
||||
parentTaskExternalIds[i] = task.ParentTaskExternalId
|
||||
|
||||
if task.ParentTaskId != nil {
|
||||
parentTaskIds[i] = pgtype.Int8{
|
||||
@@ -2055,7 +2053,7 @@ func (r *sharedRepository) insertTasks(
|
||||
Concurrencyparentstrategyids: make([][]pgtype.Int8, 0),
|
||||
ConcurrencyStrategyIds: make([][]int64, 0),
|
||||
ConcurrencyKeys: make([][]string, 0),
|
||||
ParentTaskExternalIds: make([]uuid.UUID, 0),
|
||||
ParentTaskExternalIds: make([]*uuid.UUID, 0),
|
||||
ParentTaskIds: make([]pgtype.Int8, 0),
|
||||
ParentTaskInsertedAts: make([]pgtype.Timestamptz, 0),
|
||||
ChildIndex: make([]pgtype.Int8, 0),
|
||||
@@ -3139,7 +3137,7 @@ func (r *TaskRepositoryImpl) ReplayTasks(ctx context.Context, tenantId uuid.UUID
|
||||
|
||||
var parentExternalId uuid.UUID
|
||||
|
||||
if task.ParentTaskExternalID == nil {
|
||||
if task.ParentTaskExternalID != nil {
|
||||
parentExternalId = *task.ParentTaskExternalID
|
||||
}
|
||||
k := getChildSignalEventKey(parentExternalId, task.StepIndex, task.ChildIndex.Int64, childKey)
|
||||
|
||||
Reference in New Issue
Block a user