diff --git a/api/v1/server/oas/transformers/v1/events.go b/api/v1/server/oas/transformers/v1/events.go
index aa2f06b82..d8a9a307c 100644
--- a/api/v1/server/oas/transformers/v1/events.go
+++ b/api/v1/server/oas/transformers/v1/events.go
@@ -6,7 +6,7 @@ import (
 	"github.com/google/uuid"

 	"github.com/hatchet-dev/hatchet/api/v1/server/oas/gen"
-	"github.com/hatchet-dev/hatchet/pkg/repository/v1/sqlcv1"
+	v1 "github.com/hatchet-dev/hatchet/pkg/repository/v1"
 )

 func parseTriggeredRuns(triggeredRuns []byte) ([]gen.V1EventTriggeredRun, error) {
@@ -49,7 +49,7 @@ func parseTriggeredRuns(triggeredRuns []byte) ([]gen.V1EventTriggeredRun, error)
 	return result, nil
 }

-func ToV1EventList(events []*sqlcv1.ListEventsRow, limit, offset, total int64) gen.V1EventList {
+func ToV1EventList(events []*v1.ListEventsRow, limit, offset, total int64) gen.V1EventList {
 	rows := make([]gen.V1Event, len(events))

 	numPages := int64(math.Ceil(float64(total) / float64(limit)))
@@ -86,15 +86,15 @@ func ToV1EventList(events []*sqlcv1.ListEventsRow, limit, offset, total int64) g
 				Id: row.EventExternalID.String(),
 			},
 			WorkflowRunSummary: gen.V1EventWorkflowRunSummary{
-				Cancelled: row.CancelledCount.Int64,
-				Succeeded: row.CompletedCount.Int64,
-				Queued: row.QueuedCount.Int64,
-				Failed: row.FailedCount.Int64,
-				Running: row.RunningCount.Int64,
+				Cancelled: row.CancelledCount,
+				Succeeded: row.CompletedCount,
+				Queued: row.QueuedCount,
+				Failed: row.FailedCount,
+				Running: row.RunningCount,
 			},
 			Payload: &payload,
 			SeenAt: &row.EventSeenAt.Time,
-			Scope: &row.EventScope.String,
+			Scope: &row.EventScope,
 			TriggeredRuns: &triggeredRuns,
 		}
 	}
diff --git a/pkg/repository/v1/olap.go b/pkg/repository/v1/olap.go
index fdd225dbe..530510cb6 100644
--- a/pkg/repository/v1/olap.go
+++ b/pkg/repository/v1/olap.go
@@ -227,7 +227,7 @@ type OLAPRepository interface {
 	GetTaskTimings(ctx context.Context, tenantId string, workflowRunId pgtype.UUID, depth int32) ([]*sqlcv1.PopulateTaskRunDataRow, map[string]int32, error)

 	BulkCreateEventsAndTriggers(ctx context.Context, events sqlcv1.BulkCreateEventsParams, triggers []EventTriggersFromExternalId) error
-	ListEvents(ctx context.Context, opts sqlcv1.ListEventsParams) ([]*sqlcv1.ListEventsRow, *int64, error)
+	ListEvents(ctx context.Context, opts sqlcv1.ListEventsParams) ([]*ListEventsRow, *int64, error)
 	ListEventKeys(ctx context.Context, tenantId string) ([]string, error)

 	GetDagDurationsByDagIds(ctx context.Context, tenantId string, dagIds []int64, dagInsertedAts []pgtype.Timestamptz, readableStatuses []sqlcv1.V1ReadableStatusOlap) ([]*sqlcv1.GetDagDurationsByDagIdsRow, error)
@@ -1535,7 +1535,24 @@ func (r *OLAPRepositoryImpl) BulkCreateEventsAndTriggers(ctx context.Context, ev
 	return nil
 }

-func (r *OLAPRepositoryImpl) ListEvents(ctx context.Context, opts sqlcv1.ListEventsParams) ([]*sqlcv1.ListEventsRow, *int64, error) {
+type ListEventsRow struct {
+	TenantID pgtype.UUID `json:"tenant_id"`
+	EventID int64 `json:"event_id"`
+	EventExternalID pgtype.UUID `json:"event_external_id"`
+	EventSeenAt pgtype.Timestamptz `json:"event_seen_at"`
+	EventKey string `json:"event_key"`
+	EventPayload []byte `json:"event_payload"`
+	EventAdditionalMetadata []byte `json:"event_additional_metadata"`
+	EventScope string `json:"event_scope"`
+	QueuedCount int64 `json:"queued_count"`
+	RunningCount int64 `json:"running_count"`
+	CompletedCount int64 `json:"completed_count"`
+	CancelledCount int64 `json:"cancelled_count"`
+	FailedCount int64 `json:"failed_count"`
+	TriggeredRuns []byte `json:"triggered_runs"`
+}
+
+func (r *OLAPRepositoryImpl) ListEvents(ctx context.Context, opts sqlcv1.ListEventsParams) ([]*ListEventsRow, *int64, error) {
 	events, err := r.queries.ListEvents(ctx, r.readPool, opts)

 	if err != nil {
@@ -1558,7 +1575,71 @@ func (r *OLAPRepositoryImpl) ListEvents(ctx context.Context, opts sqlcv1.ListEve
 		return nil, nil, err
 	}

-	return events, &eventCount, nil
+	eventExternalIds := make([]pgtype.UUID, len(events))
+
+	for i, event := range events {
+		eventExternalIds[i] = event.ExternalID
+	}
+
+	eventData, err := r.queries.PopulateEventData(ctx, r.readPool, sqlcv1.PopulateEventDataParams{
+		Eventexternalids: eventExternalIds,
+		Tenantid: opts.Tenantid,
+	})
+
+	if err != nil {
+		return nil, nil, fmt.Errorf("error populating event data: %v", err)
+	}
+
+	externalIdToEventData := make(map[pgtype.UUID][]*sqlcv1.PopulateEventDataRow)
+
+	for _, data := range eventData {
+		externalIdToEventData[data.ExternalID] = append(externalIdToEventData[data.ExternalID], data)
+	}
+
+	result := make([]*ListEventsRow, 0)
+
+	for _, event := range events {
+		data, exists := externalIdToEventData[event.ExternalID]
+
+		if !exists || len(data) == 0 {
+			result = append(result, &ListEventsRow{
+				TenantID: event.TenantID,
+				EventID: event.ID,
+				EventExternalID: event.ExternalID,
+				EventSeenAt: event.SeenAt,
+				EventKey: event.Key,
+				EventPayload: event.Payload,
+				EventAdditionalMetadata: event.AdditionalMetadata,
+				EventScope: event.Scope.String,
+				QueuedCount: 0,
+				RunningCount: 0,
+				CompletedCount: 0,
+				CancelledCount: 0,
+				FailedCount: 0,
+			})
+		} else {
+			for _, d := range data {
+				result = append(result, &ListEventsRow{
+					TenantID: event.TenantID,
+					EventID: event.ID,
+					EventExternalID: event.ExternalID,
+					EventSeenAt: event.SeenAt,
+					EventKey: event.Key,
+					EventPayload: event.Payload,
+					EventAdditionalMetadata: event.AdditionalMetadata,
+					EventScope: event.Scope.String,
+					QueuedCount: d.QueuedCount,
+					RunningCount: d.RunningCount,
+					CompletedCount: d.CompletedCount,
+					CancelledCount: d.CancelledCount,
+					FailedCount: d.FailedCount,
+					TriggeredRuns: d.TriggeredRuns,
+				})
+			}
+		}
+	}
+
+	return result, &eventCount, nil
 }

 func (r *OLAPRepositoryImpl) ListEventKeys(ctx context.Context, tenantId string) ([]string, error) {
diff --git a/pkg/repository/v1/sqlcv1/olap.sql b/pkg/repository/v1/sqlcv1/olap.sql
index 72cf54a79..f47630f61 100644
--- a/pkg/repository/v1/sqlcv1/olap.sql
+++ b/pkg/repository/v1/sqlcv1/olap.sql
@@ -1348,109 +1348,86 @@ WHERE
 ;

--- name: ListEvents :many
-WITH included_events AS (
-    SELECT
-        e.*,
-        JSON_AGG(JSON_BUILD_OBJECT('run_external_id', r.external_id, 'filter_id', etr.filter_id)) FILTER (WHERE r.external_id IS NOT NULL)::JSONB AS triggered_runs
-    FROM v1_event_lookup_table_olap elt
-    JOIN v1_events_olap e ON (elt.tenant_id, elt.event_id, elt.event_seen_at) = (e.tenant_id, e.id, e.seen_at)
-    LEFT JOIN v1_event_to_run_olap etr ON (e.id, e.seen_at) = (etr.event_id, etr.event_seen_at)
-    LEFT JOIN v1_runs_olap r ON (etr.run_id, etr.run_inserted_at) = (r.id, r.inserted_at)
-    WHERE
-        e.tenant_id = @tenantId
-        AND (
-            sqlc.narg('keys')::TEXT[] IS NULL OR
-            "key" = ANY(sqlc.narg('keys')::TEXT[])
-        )
-        AND e.seen_at >= @since::TIMESTAMPTZ
-        AND (
-            sqlc.narg('until')::TIMESTAMPTZ IS NULL OR
-            e.seen_at <= sqlc.narg('until')::TIMESTAMPTZ
-        )
-        AND (
-            sqlc.narg('workflowIds')::UUID[] IS NULL OR
-            r.workflow_id = ANY(sqlc.narg('workflowIds')::UUID[])
-        )
-        AND (
-            sqlc.narg('eventIds')::UUID[] IS NULL OR
-            elt.external_id = ANY(sqlc.narg('eventIds')::UUID[])
-        )
-        AND (
-            sqlc.narg('additionalMetadata')::JSONB IS NULL OR
-            e.additional_metadata @> sqlc.narg('additionalMetadata')::JSONB
-        )
-        AND (
-            CAST(sqlc.narg('statuses')::text[] AS v1_readable_status_olap[]) IS NULL OR
-            r.readable_status = ANY(CAST(sqlc.narg('statuses')::text[] AS v1_readable_status_olap[]))
-        )
-        AND (
-            sqlc.narg('scopes')::TEXT[] IS NULL OR
-            e.scope = ANY(sqlc.narg('scopes')::TEXT[])
-        )
-    GROUP BY
-        e.tenant_id,
-        e.id,
-        e.external_id,
-        e.seen_at,
-        e.key,
-        e.payload,
-        e.additional_metadata,
-        e.scope
-    ORDER BY e.seen_at DESC, e.id
-    OFFSET
-        COALESCE(sqlc.narg('offset')::BIGINT, 0)
-    LIMIT
-        COALESCE(sqlc.narg('limit')::BIGINT, 50)
-), status_counts AS (
-    SELECT
-        e.tenant_id,
-        e.id,
-        e.seen_at,
-        COUNT(*) FILTER (WHERE r.readable_status = 'QUEUED') AS queued_count,
-        COUNT(*) FILTER (WHERE r.readable_status = 'RUNNING') AS running_count,
-        COUNT(*) FILTER (WHERE r.readable_status = 'COMPLETED') AS completed_count,
-        COUNT(*) FILTER (WHERE r.readable_status = 'CANCELLED') AS cancelled_count,
-        COUNT(*) FILTER (WHERE r.readable_status = 'FAILED') AS failed_count
-    FROM
-        included_events e
-    LEFT JOIN
-        v1_event_to_run_olap etr ON (e.id, e.seen_at) = (etr.event_id, etr.event_seen_at)
-    LEFT JOIN
-        v1_runs_olap r ON (etr.run_id, etr.run_inserted_at) = (r.id, r.inserted_at)
-    GROUP BY
-        e.tenant_id, e.id, e.seen_at
-)
-
+-- name: PopulateEventData :many
 SELECT
-    e.tenant_id,
-    e.id AS event_id,
-    e.external_id AS event_external_id,
-    e.seen_at AS event_seen_at,
-    e.key AS event_key,
-    e.payload AS event_payload,
-    e.additional_metadata AS event_additional_metadata,
-    e.scope AS event_scope,
-    sc.queued_count,
-    sc.running_count,
-    sc.completed_count,
-    sc.cancelled_count,
-    sc.failed_count,
-    e.triggered_runs
-FROM
-    included_events e
-LEFT JOIN
-    status_counts sc ON (e.tenant_id, e.id, e.seen_at) = (sc.tenant_id, sc.id, sc.seen_at)
-ORDER BY e.seen_at DESC
+    elt.external_id,
+    COUNT(*) FILTER (WHERE r.readable_status = 'QUEUED') AS queued_count,
+    COUNT(*) FILTER (WHERE r.readable_status = 'RUNNING') AS running_count,
+    COUNT(*) FILTER (WHERE r.readable_status = 'COMPLETED') AS completed_count,
+    COUNT(*) FILTER (WHERE r.readable_status = 'CANCELLED') AS cancelled_count,
+    COUNT(*) FILTER (WHERE r.readable_status = 'FAILED') AS failed_count,
+    JSON_AGG(JSON_BUILD_OBJECT('run_external_id', r.external_id, 'filter_id', etr.filter_id)) FILTER (WHERE r.external_id IS NOT NULL)::JSONB AS triggered_runs
+FROM v1_event_lookup_table_olap elt
+JOIN v1_events_olap e ON (elt.tenant_id, elt.event_id, elt.event_seen_at) = (e.tenant_id, e.id, e.seen_at)
+JOIN v1_event_to_run_olap etr ON (e.id, e.seen_at) = (etr.event_id, etr.event_seen_at)
+JOIN v1_runs_olap r ON (etr.run_id, etr.run_inserted_at) = (r.id, r.inserted_at)
+WHERE
+    elt.external_id = ANY(@eventExternalIds::uuid[])
+    AND elt.tenant_id = @tenantId::uuid
+GROUP BY elt.external_id
+;
+
+-- name: ListEvents :many
+SELECT e.*
+FROM v1_event_lookup_table_olap elt
+JOIN v1_events_olap e ON (elt.tenant_id, elt.event_id, elt.event_seen_at) = (e.tenant_id, e.id, e.seen_at)
+WHERE
+    e.tenant_id = @tenantId
+    AND (
+        sqlc.narg('keys')::TEXT[] IS NULL OR
+        "key" = ANY(sqlc.narg('keys')::TEXT[])
+    )
+    AND e.seen_at >= @since::TIMESTAMPTZ
+    AND (
+        sqlc.narg('until')::TIMESTAMPTZ IS NULL OR
+        e.seen_at <= sqlc.narg('until')::TIMESTAMPTZ
+    )
+    AND (
+        sqlc.narg('workflowIds')::UUID[] IS NULL OR
+        EXISTS (
+            SELECT 1
+            FROM v1_event_to_run_olap etr
+            JOIN v1_runs_olap r ON (etr.run_id, etr.run_inserted_at) = (r.id, r.inserted_at)
+            WHERE
+                (etr.event_id, etr.event_seen_at) = (e.id, e.seen_at)
+                AND r.workflow_id = ANY(sqlc.narg('workflowIds')::UUID[]::UUID[])
+        )
+    )
+    AND (
+        sqlc.narg('eventIds')::UUID[] IS NULL OR
+        elt.external_id = ANY(sqlc.narg('eventIds')::UUID[])
+    )
+    AND (
+        sqlc.narg('additionalMetadata')::JSONB IS NULL OR
+        e.additional_metadata @> sqlc.narg('additionalMetadata')::JSONB
+    )
+    AND (
+        CAST(sqlc.narg('statuses')::TEXT[] AS v1_readable_status_olap[]) IS NULL OR
+        EXISTS (
+            SELECT 1
+            FROM v1_event_to_run_olap etr
+            JOIN v1_runs_olap r ON (etr.run_id, etr.run_inserted_at) = (r.id, r.inserted_at)
+            WHERE
+                (etr.event_id, etr.event_seen_at) = (e.id, e.seen_at)
+                AND r.readable_status = ANY(CAST(sqlc.narg('statuses')::text[]::TEXT[] AS v1_readable_status_olap[]))
+        )
+    )
+    AND (
+        sqlc.narg('scopes')::TEXT[] IS NULL OR
+        e.scope = ANY(sqlc.narg('scopes')::TEXT[])
+    )
+ORDER BY e.seen_at DESC, e.id
+OFFSET
+    COALESCE(sqlc.narg('offset')::BIGINT, 0)
+LIMIT
+    COALESCE(sqlc.narg('limit')::BIGINT, 50)
 ;

 -- name: CountEvents :one
 WITH included_events AS (
-    SELECT DISTINCT e.*
+    SELECT e.*
     FROM v1_event_lookup_table_olap elt
     JOIN v1_events_olap e ON (elt.tenant_id, elt.event_id, elt.event_seen_at) = (e.tenant_id, e.id, e.seen_at)
-    LEFT JOIN v1_event_to_run_olap etr ON (e.id, e.seen_at) = (etr.event_id, etr.event_seen_at)
-    LEFT JOIN v1_runs_olap r ON (etr.run_id, etr.run_inserted_at) = (r.id, r.inserted_at)
     WHERE
         e.tenant_id = @tenantId
         AND (
@@ -1464,7 +1441,14 @@ WITH included_events AS (
         )
         AND (
             sqlc.narg('workflowIds')::UUID[] IS NULL OR
-            r.workflow_id = ANY(sqlc.narg('workflowIds')::UUID[])
+            EXISTS (
+                SELECT 1
+                FROM v1_event_to_run_olap etr
+                JOIN v1_runs_olap r ON (etr.run_id, etr.run_inserted_at) = (r.id, r.inserted_at)
+                WHERE
+                    (etr.event_id, etr.event_seen_at) = (e.id, e.seen_at)
+                    AND r.workflow_id = ANY(sqlc.narg('workflowIds')::UUID[]::UUID[])
+            )
         )
         AND (
             sqlc.narg('eventIds')::UUID[] IS NULL OR
@@ -1475,8 +1459,15 @@ WITH included_events AS (
             e.additional_metadata @> sqlc.narg('additionalMetadata')::JSONB
         )
         AND (
-            CAST(sqlc.narg('statuses')::text[] AS v1_readable_status_olap[]) IS NULL OR
-            r.readable_status = ANY(CAST(sqlc.narg('statuses')::text[] AS v1_readable_status_olap[]))
+            CAST(sqlc.narg('statuses')::TEXT[] AS v1_readable_status_olap[]) IS NULL OR
+            EXISTS (
+                SELECT 1
+                FROM v1_event_to_run_olap etr
+                JOIN v1_runs_olap r ON (etr.run_id, etr.run_inserted_at) = (r.id, r.inserted_at)
+                WHERE
+                    (etr.event_id, etr.event_seen_at) = (e.id, e.seen_at)
+                    AND r.readable_status = ANY(CAST(sqlc.narg('statuses')::text[]::TEXT[] AS v1_readable_status_olap[]))
+            )
         )
         AND (
             sqlc.narg('scopes')::TEXT[] IS NULL OR
diff --git a/pkg/repository/v1/sqlcv1/olap.sql.go b/pkg/repository/v1/sqlcv1/olap.sql.go
index 215192eed..c81917548 100644
--- a/pkg/repository/v1/sqlcv1/olap.sql.go
+++ b/pkg/repository/v1/sqlcv1/olap.sql.go
@@ -21,11 +21,9 @@ type BulkCreateEventTriggersParams struct {

 const countEvents = `-- name: CountEvents :one
 WITH included_events AS (
-    SELECT DISTINCT e.tenant_id, e.id, e.external_id, e.seen_at, e.key, e.payload, e.additional_metadata, e.scope
+    SELECT e.tenant_id, e.id, e.external_id, e.seen_at, e.key, e.payload, e.additional_metadata, e.scope
     FROM v1_event_lookup_table_olap elt
     JOIN v1_events_olap e ON (elt.tenant_id, elt.event_id, elt.event_seen_at) = (e.tenant_id, e.id, e.seen_at)
-    LEFT JOIN v1_event_to_run_olap etr ON (e.id, e.seen_at) = (etr.event_id, etr.event_seen_at)
-    LEFT JOIN v1_runs_olap r ON (etr.run_id, etr.run_inserted_at) = (r.id, r.inserted_at)
     WHERE
         e.tenant_id = $1
         AND (
@@ -39,7 +37,14 @@ WITH included_events AS (
         )
         AND (
             $5::UUID[] IS NULL OR
-            r.workflow_id = ANY($5::UUID[])
+            EXISTS (
+                SELECT 1
+                FROM v1_event_to_run_olap etr
+                JOIN v1_runs_olap r ON (etr.run_id, etr.run_inserted_at) = (r.id, r.inserted_at)
+                WHERE
+                    (etr.event_id, etr.event_seen_at) = (e.id, e.seen_at)
+                    AND r.workflow_id = ANY($5::UUID[]::UUID[])
+            )
         )
         AND (
             $6::UUID[] IS NULL OR
@@ -50,8 +55,15 @@ WITH included_events AS (
             e.additional_metadata @> $7::JSONB
         )
         AND (
-            CAST($8::text[] AS v1_readable_status_olap[]) IS NULL OR
-            r.readable_status = ANY(CAST($8::text[] AS v1_readable_status_olap[]))
+            CAST($8::TEXT[] AS v1_readable_status_olap[]) IS NULL OR
+            EXISTS (
+                SELECT 1
+                FROM v1_event_to_run_olap etr
+                JOIN v1_runs_olap r ON (etr.run_id, etr.run_inserted_at) = (r.id, r.inserted_at)
+                WHERE
+                    (etr.event_id, etr.event_seen_at) = (e.id, e.seen_at)
+                    AND r.readable_status = ANY(CAST($8::text[]::TEXT[] AS v1_readable_status_olap[]))
+            )
         )
         AND (
             $9::TEXT[] IS NULL OR
@@ -779,99 +791,59 @@ func (q *Queries) ListEventKeys(ctx context.Context, db DBTX, tenantid pgtype.UU
 }

 const listEvents = `-- name: ListEvents :many
-WITH included_events AS (
-    SELECT
-        e.tenant_id, e.id, e.external_id, e.seen_at, e.key, e.payload, e.additional_metadata, e.scope,
-        JSON_AGG(JSON_BUILD_OBJECT('run_external_id', r.external_id, 'filter_id', etr.filter_id)) FILTER (WHERE r.external_id IS NOT NULL)::JSONB AS triggered_runs
-    FROM v1_event_lookup_table_olap elt
-    JOIN v1_events_olap e ON (elt.tenant_id, elt.event_id, elt.event_seen_at) = (e.tenant_id, e.id, e.seen_at)
-    LEFT JOIN v1_event_to_run_olap etr ON (e.id, e.seen_at) = (etr.event_id, etr.event_seen_at)
-    LEFT JOIN v1_runs_olap r ON (etr.run_id, etr.run_inserted_at) = (r.id, r.inserted_at)
-    WHERE
-        e.tenant_id = $1
-        AND (
-            $2::TEXT[] IS NULL OR
-            "key" = ANY($2::TEXT[])
+SELECT e.tenant_id, e.id, e.external_id, e.seen_at, e.key, e.payload, e.additional_metadata, e.scope
+FROM v1_event_lookup_table_olap elt
+JOIN v1_events_olap e ON (elt.tenant_id, elt.event_id, elt.event_seen_at) = (e.tenant_id, e.id, e.seen_at)
+WHERE
+    e.tenant_id = $1
+    AND (
+        $2::TEXT[] IS NULL OR
+        "key" = ANY($2::TEXT[])
+    )
+    AND e.seen_at >= $3::TIMESTAMPTZ
+    AND (
+        $4::TIMESTAMPTZ IS NULL OR
+        e.seen_at <= $4::TIMESTAMPTZ
+    )
+    AND (
+        $5::UUID[] IS NULL OR
+        EXISTS (
+            SELECT 1
+            FROM v1_event_to_run_olap etr
+            JOIN v1_runs_olap r ON (etr.run_id, etr.run_inserted_at) = (r.id, r.inserted_at)
+            WHERE
+                (etr.event_id, etr.event_seen_at) = (e.id, e.seen_at)
+                AND r.workflow_id = ANY($5::UUID[]::UUID[])
         )
-        AND e.seen_at >= $3::TIMESTAMPTZ
-        AND (
-            $4::TIMESTAMPTZ IS NULL OR
-            e.seen_at <= $4::TIMESTAMPTZ
+    )
+    AND (
+        $6::UUID[] IS NULL OR
+        elt.external_id = ANY($6::UUID[])
+    )
+    AND (
+        $7::JSONB IS NULL OR
+        e.additional_metadata @> $7::JSONB
+    )
+    AND (
+        CAST($8::TEXT[] AS v1_readable_status_olap[]) IS NULL OR
+        EXISTS (
+            SELECT 1
+            FROM v1_event_to_run_olap etr
+            JOIN v1_runs_olap r ON (etr.run_id, etr.run_inserted_at) = (r.id, r.inserted_at)
+            WHERE
+                (etr.event_id, etr.event_seen_at) = (e.id, e.seen_at)
+                AND r.readable_status = ANY(CAST($8::text[]::TEXT[] AS v1_readable_status_olap[]))
        )
-        AND (
-            $5::UUID[] IS NULL OR
-            r.workflow_id = ANY($5::UUID[])
-        )
-        AND (
-            $6::UUID[] IS NULL OR
-            elt.external_id = ANY($6::UUID[])
-        )
-        AND (
-            $7::JSONB IS NULL OR
-            e.additional_metadata @> $7::JSONB
-        )
-        AND (
-            CAST($8::text[] AS v1_readable_status_olap[]) IS NULL OR
-            r.readable_status = ANY(CAST($8::text[] AS v1_readable_status_olap[]))
-        )
-        AND (
-            $9::TEXT[] IS NULL OR
-            e.scope = ANY($9::TEXT[])
-        )
-    GROUP BY
-        e.tenant_id,
-        e.id,
-        e.external_id,
-        e.seen_at,
-        e.key,
-        e.payload,
-        e.additional_metadata,
-        e.scope
-    ORDER BY e.seen_at DESC, e.id
-    OFFSET
-        COALESCE($10::BIGINT, 0)
-    LIMIT
-        COALESCE($11::BIGINT, 50)
-), status_counts AS (
-    SELECT
-        e.tenant_id,
-        e.id,
-        e.seen_at,
-        COUNT(*) FILTER (WHERE r.readable_status = 'QUEUED') AS queued_count,
-        COUNT(*) FILTER (WHERE r.readable_status = 'RUNNING') AS running_count,
-        COUNT(*) FILTER (WHERE r.readable_status = 'COMPLETED') AS completed_count,
-        COUNT(*) FILTER (WHERE r.readable_status = 'CANCELLED') AS cancelled_count,
-        COUNT(*) FILTER (WHERE r.readable_status = 'FAILED') AS failed_count
-    FROM
-        included_events e
-    LEFT JOIN
-        v1_event_to_run_olap etr ON (e.id, e.seen_at) = (etr.event_id, etr.event_seen_at)
-    LEFT JOIN
-        v1_runs_olap r ON (etr.run_id, etr.run_inserted_at) = (r.id, r.inserted_at)
-    GROUP BY
-        e.tenant_id, e.id, e.seen_at
-)
-
-SELECT
-    e.tenant_id,
-    e.id AS event_id,
-    e.external_id AS event_external_id,
-    e.seen_at AS event_seen_at,
-    e.key AS event_key,
-    e.payload AS event_payload,
-    e.additional_metadata AS event_additional_metadata,
-    e.scope AS event_scope,
-    sc.queued_count,
-    sc.running_count,
-    sc.completed_count,
-    sc.cancelled_count,
-    sc.failed_count,
-    e.triggered_runs
-FROM
-    included_events e
-LEFT JOIN
-    status_counts sc ON (e.tenant_id, e.id, e.seen_at) = (sc.tenant_id, sc.id, sc.seen_at)
-ORDER BY e.seen_at DESC
+    )
+    AND (
+        $9::TEXT[] IS NULL OR
+        e.scope = ANY($9::TEXT[])
+    )
+ORDER BY e.seen_at DESC, e.id
+OFFSET
+    COALESCE($10::BIGINT, 0)
+LIMIT
+    COALESCE($11::BIGINT, 50)
 `

 type ListEventsParams struct {
@@ -888,24 +860,7 @@ type ListEventsParams struct {
 	Limit pgtype.Int8 `json:"limit"`
 }

-type ListEventsRow struct {
-	TenantID pgtype.UUID `json:"tenant_id"`
-	EventID int64 `json:"event_id"`
-	EventExternalID pgtype.UUID `json:"event_external_id"`
-	EventSeenAt pgtype.Timestamptz `json:"event_seen_at"`
-	EventKey string `json:"event_key"`
-	EventPayload []byte `json:"event_payload"`
-	EventAdditionalMetadata []byte `json:"event_additional_metadata"`
-	EventScope pgtype.Text `json:"event_scope"`
-	QueuedCount pgtype.Int8 `json:"queued_count"`
-	RunningCount pgtype.Int8 `json:"running_count"`
-	CompletedCount pgtype.Int8 `json:"completed_count"`
-	CancelledCount pgtype.Int8 `json:"cancelled_count"`
-	FailedCount pgtype.Int8 `json:"failed_count"`
-	TriggeredRuns []byte `json:"triggered_runs"`
-}
-
-func (q *Queries) ListEvents(ctx context.Context, db DBTX, arg ListEventsParams) ([]*ListEventsRow, error) {
+func (q *Queries) ListEvents(ctx context.Context, db DBTX, arg ListEventsParams) ([]*V1EventsOlap, error) {
 	rows, err := db.Query(ctx, listEvents,
 		arg.Tenantid,
 		arg.Keys,
@@ -923,24 +878,18 @@ func (q *Queries) ListEvents(ctx context.Context, db DBTX, arg ListEventsParams)
 		return nil, err
 	}
 	defer rows.Close()
-	var items []*ListEventsRow
+	var items []*V1EventsOlap
 	for rows.Next() {
-		var i ListEventsRow
+		var i V1EventsOlap
 		if err := rows.Scan(
 			&i.TenantID,
-			&i.EventID,
-			&i.EventExternalID,
-			&i.EventSeenAt,
-			&i.EventKey,
-			&i.EventPayload,
-			&i.EventAdditionalMetadata,
-			&i.EventScope,
-			&i.QueuedCount,
-			&i.RunningCount,
-			&i.CompletedCount,
-			&i.CancelledCount,
-			&i.FailedCount,
-			&i.TriggeredRuns,
+			&i.ID,
+			&i.ExternalID,
+			&i.SeenAt,
+			&i.Key,
+			&i.Payload,
+			&i.AdditionalMetadata,
+			&i.Scope,
 		); err != nil {
 			return nil, err
 		}
@@ -1581,6 +1530,68 @@ func (q *Queries) PopulateDAGMetadata(ctx context.Context, db DBTX, arg Populate
 	return items, nil
 }

+const populateEventData = `-- name: PopulateEventData :many
+SELECT
+    elt.external_id,
+    COUNT(*) FILTER (WHERE r.readable_status = 'QUEUED') AS queued_count,
+    COUNT(*) FILTER (WHERE r.readable_status = 'RUNNING') AS running_count,
+    COUNT(*) FILTER (WHERE r.readable_status = 'COMPLETED') AS completed_count,
+    COUNT(*) FILTER (WHERE r.readable_status = 'CANCELLED') AS cancelled_count,
+    COUNT(*) FILTER (WHERE r.readable_status = 'FAILED') AS failed_count,
+    JSON_AGG(JSON_BUILD_OBJECT('run_external_id', r.external_id, 'filter_id', etr.filter_id)) FILTER (WHERE r.external_id IS NOT NULL)::JSONB AS triggered_runs
+FROM v1_event_lookup_table_olap elt
+JOIN v1_events_olap e ON (elt.tenant_id, elt.event_id, elt.event_seen_at) = (e.tenant_id, e.id, e.seen_at)
+JOIN v1_event_to_run_olap etr ON (e.id, e.seen_at) = (etr.event_id, etr.event_seen_at)
+JOIN v1_runs_olap r ON (etr.run_id, etr.run_inserted_at) = (r.id, r.inserted_at)
+WHERE
+    elt.external_id = ANY($1::uuid[])
+    AND elt.tenant_id = $2::uuid
+GROUP BY elt.external_id
+`
+
+type PopulateEventDataParams struct {
+	Eventexternalids []pgtype.UUID `json:"eventexternalids"`
+	Tenantid pgtype.UUID `json:"tenantid"`
+}
+
+type PopulateEventDataRow struct {
+	ExternalID pgtype.UUID `json:"external_id"`
+	QueuedCount int64 `json:"queued_count"`
+	RunningCount int64 `json:"running_count"`
+	CompletedCount int64 `json:"completed_count"`
+	CancelledCount int64 `json:"cancelled_count"`
+	FailedCount int64 `json:"failed_count"`
+	TriggeredRuns []byte `json:"triggered_runs"`
+}
+
+func (q *Queries) PopulateEventData(ctx context.Context, db DBTX, arg PopulateEventDataParams) ([]*PopulateEventDataRow, error) {
+	rows, err := db.Query(ctx, populateEventData, arg.Eventexternalids, arg.Tenantid)
+	if err != nil {
+		return nil, err
+	}
+	defer rows.Close()
+	var items []*PopulateEventDataRow
+	for rows.Next() {
+		var i PopulateEventDataRow
+		if err := rows.Scan(
+			&i.ExternalID,
+			&i.QueuedCount,
+			&i.RunningCount,
+			&i.CompletedCount,
+			&i.CancelledCount,
+			&i.FailedCount,
+			&i.TriggeredRuns,
+		); err != nil {
+			return nil, err
+		}
+		items = append(items, &i)
+	}
+	if err := rows.Err(); err != nil {
+		return nil, err
+	}
+	return items, nil
+}
+
 const populateSingleTaskRunData = `-- name: PopulateSingleTaskRunData :one
 WITH selected_retry_count AS (
     SELECT