mirror of
https://github.com/hatchet-dev/hatchet.git
synced 2026-05-08 02:49:17 -05:00
fix: patch list queries to only count 10k rows (#708)
* fix: patch list queries to only count 10k rows * fix: from --------- Co-authored-by: gabriel ruttner <gabriel.ruttner@gmail.com>
This commit is contained in:
@@ -7,36 +7,52 @@ WHERE
|
||||
"id" = @id::uuid;
|
||||
|
||||
-- name: CountEvents :one
|
||||
WITH events AS (
|
||||
SELECT
|
||||
events."id", events."createdAt"
|
||||
FROM
|
||||
"Event" as events
|
||||
LEFT JOIN
|
||||
"WorkflowRunTriggeredBy" as runTriggers ON events."id" = runTriggers."eventId"
|
||||
LEFT JOIN
|
||||
"WorkflowRun" as runs ON runTriggers."parentId" = runs."id"
|
||||
LEFT JOIN
|
||||
"WorkflowVersion" as workflowVersion ON workflowVersion."id" = runs."workflowVersionId"
|
||||
LEFT JOIN
|
||||
"Workflow" as workflow ON workflowVersion."workflowId" = workflow."id"
|
||||
WHERE
|
||||
events."tenantId" = $1 AND
|
||||
(
|
||||
sqlc.narg('keys')::text[] IS NULL OR
|
||||
events."key" = ANY(sqlc.narg('keys')::text[])
|
||||
) AND
|
||||
(
|
||||
sqlc.narg('additionalMetadata')::jsonb IS NULL OR
|
||||
events."additionalMetadata" @> sqlc.narg('additionalMetadata')::jsonb
|
||||
) AND
|
||||
(
|
||||
(sqlc.narg('workflows')::text[])::uuid[] IS NULL OR
|
||||
(workflow."id" = ANY(sqlc.narg('workflows')::text[]::uuid[]))
|
||||
) AND
|
||||
(
|
||||
sqlc.narg('search')::text IS NULL OR
|
||||
jsonb_path_exists(events."data", cast(concat('$.** ? (@.type() == "string" && @ like_regex "', sqlc.narg('search')::text, '")') as jsonpath))
|
||||
) AND
|
||||
(
|
||||
sqlc.narg('statuses')::text[] IS NULL OR
|
||||
"status" = ANY(cast(sqlc.narg('statuses')::text[] as "WorkflowRunStatus"[]))
|
||||
)
|
||||
GROUP BY
|
||||
events."id"
|
||||
ORDER BY
|
||||
case when @orderBy = 'createdAt ASC' THEN events."createdAt" END ASC ,
|
||||
case when @orderBy = 'createdAt DESC' then events."createdAt" END DESC
|
||||
LIMIT 10000
|
||||
)
|
||||
SELECT
|
||||
count(*) OVER() AS total
|
||||
count(events) AS total
|
||||
FROM
|
||||
"Event" as events
|
||||
LEFT JOIN
|
||||
"WorkflowRunTriggeredBy" as runTriggers ON events."id" = runTriggers."eventId"
|
||||
LEFT JOIN
|
||||
"WorkflowRun" as runs ON runTriggers."parentId" = runs."id"
|
||||
LEFT JOIN
|
||||
"WorkflowVersion" as workflowVersion ON workflowVersion."id" = runs."workflowVersionId"
|
||||
LEFT JOIN
|
||||
"Workflow" as workflow ON workflowVersion."workflowId" = workflow."id"
|
||||
WHERE
|
||||
events."tenantId" = $1 AND
|
||||
(
|
||||
sqlc.narg('keys')::text[] IS NULL OR
|
||||
events."key" = ANY(sqlc.narg('keys')::text[])
|
||||
) AND
|
||||
(
|
||||
(sqlc.narg('workflows')::text[])::uuid[] IS NULL OR
|
||||
(workflow."id" = ANY(sqlc.narg('workflows')::text[]::uuid[]))
|
||||
) AND
|
||||
(
|
||||
sqlc.narg('search')::text IS NULL OR
|
||||
jsonb_path_exists(events."data", cast(concat('$.** ? (@.type() == "string" && @ like_regex "', sqlc.narg('search')::text, '")') as jsonpath))
|
||||
) AND
|
||||
(
|
||||
sqlc.narg('statuses')::text[] IS NULL OR
|
||||
"status" = ANY(cast(sqlc.narg('statuses')::text[] as "WorkflowRunStatus"[]))
|
||||
);
|
||||
events;
|
||||
|
||||
-- name: CreateEvent :one
|
||||
INSERT INTO "Event" (
|
||||
|
||||
@@ -12,53 +12,73 @@ import (
|
||||
)
|
||||
|
||||
const countEvents = `-- name: CountEvents :one
|
||||
WITH events AS (
|
||||
SELECT
|
||||
events."id", events."createdAt"
|
||||
FROM
|
||||
"Event" as events
|
||||
LEFT JOIN
|
||||
"WorkflowRunTriggeredBy" as runTriggers ON events."id" = runTriggers."eventId"
|
||||
LEFT JOIN
|
||||
"WorkflowRun" as runs ON runTriggers."parentId" = runs."id"
|
||||
LEFT JOIN
|
||||
"WorkflowVersion" as workflowVersion ON workflowVersion."id" = runs."workflowVersionId"
|
||||
LEFT JOIN
|
||||
"Workflow" as workflow ON workflowVersion."workflowId" = workflow."id"
|
||||
WHERE
|
||||
events."tenantId" = $1 AND
|
||||
(
|
||||
$2::text[] IS NULL OR
|
||||
events."key" = ANY($2::text[])
|
||||
) AND
|
||||
(
|
||||
$3::jsonb IS NULL OR
|
||||
events."additionalMetadata" @> $3::jsonb
|
||||
) AND
|
||||
(
|
||||
($4::text[])::uuid[] IS NULL OR
|
||||
(workflow."id" = ANY($4::text[]::uuid[]))
|
||||
) AND
|
||||
(
|
||||
$5::text IS NULL OR
|
||||
jsonb_path_exists(events."data", cast(concat('$.** ? (@.type() == "string" && @ like_regex "', $5::text, '")') as jsonpath))
|
||||
) AND
|
||||
(
|
||||
$6::text[] IS NULL OR
|
||||
"status" = ANY(cast($6::text[] as "WorkflowRunStatus"[]))
|
||||
)
|
||||
GROUP BY
|
||||
events."id"
|
||||
ORDER BY
|
||||
case when $7 = 'createdAt ASC' THEN events."createdAt" END ASC ,
|
||||
case when $7 = 'createdAt DESC' then events."createdAt" END DESC
|
||||
LIMIT 10000
|
||||
)
|
||||
SELECT
|
||||
count(*) OVER() AS total
|
||||
count(events) AS total
|
||||
FROM
|
||||
"Event" as events
|
||||
LEFT JOIN
|
||||
"WorkflowRunTriggeredBy" as runTriggers ON events."id" = runTriggers."eventId"
|
||||
LEFT JOIN
|
||||
"WorkflowRun" as runs ON runTriggers."parentId" = runs."id"
|
||||
LEFT JOIN
|
||||
"WorkflowVersion" as workflowVersion ON workflowVersion."id" = runs."workflowVersionId"
|
||||
LEFT JOIN
|
||||
"Workflow" as workflow ON workflowVersion."workflowId" = workflow."id"
|
||||
WHERE
|
||||
events."tenantId" = $1 AND
|
||||
(
|
||||
$2::text[] IS NULL OR
|
||||
events."key" = ANY($2::text[])
|
||||
) AND
|
||||
(
|
||||
($3::text[])::uuid[] IS NULL OR
|
||||
(workflow."id" = ANY($3::text[]::uuid[]))
|
||||
) AND
|
||||
(
|
||||
$4::text IS NULL OR
|
||||
jsonb_path_exists(events."data", cast(concat('$.** ? (@.type() == "string" && @ like_regex "', $4::text, '")') as jsonpath))
|
||||
) AND
|
||||
(
|
||||
$5::text[] IS NULL OR
|
||||
"status" = ANY(cast($5::text[] as "WorkflowRunStatus"[]))
|
||||
)
|
||||
events
|
||||
`
|
||||
|
||||
type CountEventsParams struct {
|
||||
TenantId pgtype.UUID `json:"tenantId"`
|
||||
Keys []string `json:"keys"`
|
||||
Workflows []string `json:"workflows"`
|
||||
Search pgtype.Text `json:"search"`
|
||||
Statuses []string `json:"statuses"`
|
||||
TenantId pgtype.UUID `json:"tenantId"`
|
||||
Keys []string `json:"keys"`
|
||||
AdditionalMetadata []byte `json:"additionalMetadata"`
|
||||
Workflows []string `json:"workflows"`
|
||||
Search pgtype.Text `json:"search"`
|
||||
Statuses []string `json:"statuses"`
|
||||
Orderby interface{} `json:"orderby"`
|
||||
}
|
||||
|
||||
func (q *Queries) CountEvents(ctx context.Context, db DBTX, arg CountEventsParams) (int64, error) {
|
||||
row := db.QueryRow(ctx, countEvents,
|
||||
arg.TenantId,
|
||||
arg.Keys,
|
||||
arg.AdditionalMetadata,
|
||||
arg.Workflows,
|
||||
arg.Search,
|
||||
arg.Statuses,
|
||||
arg.Orderby,
|
||||
)
|
||||
var total int64
|
||||
err := row.Scan(&total)
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
-- name: CountWorkflowRuns :one
|
||||
SELECT
|
||||
count(runs) OVER() AS total
|
||||
FROM
|
||||
WITH runs AS (
|
||||
SELECT runs."id", runs."createdAt"
|
||||
FROM
|
||||
"WorkflowRun" as runs
|
||||
LEFT JOIN
|
||||
"WorkflowRunTriggeredBy" as runTriggers ON runTriggers."parentId" = runs."id"
|
||||
@@ -56,7 +56,17 @@ WHERE
|
||||
(
|
||||
sqlc.narg('finishedAfter')::timestamp IS NULL OR
|
||||
runs."finishedAt" > sqlc.narg('finishedAfter')::timestamp
|
||||
);
|
||||
)
|
||||
ORDER BY
|
||||
case when @orderBy = 'createdAt ASC' THEN runs."createdAt" END ASC ,
|
||||
case when @orderBy = 'createdAt DESC' then runs."createdAt" END DESC,
|
||||
runs."id" ASC
|
||||
LIMIT 10000
|
||||
)
|
||||
SELECT
|
||||
count(runs) AS total
|
||||
FROM
|
||||
runs;
|
||||
|
||||
-- name: WorkflowRunsMetricsCount :one
|
||||
SELECT
|
||||
|
||||
@@ -12,9 +12,9 @@ import (
|
||||
)
|
||||
|
||||
const countWorkflowRuns = `-- name: CountWorkflowRuns :one
|
||||
SELECT
|
||||
count(runs) OVER() AS total
|
||||
FROM
|
||||
WITH runs AS (
|
||||
SELECT runs."id", runs."createdAt"
|
||||
FROM
|
||||
"WorkflowRun" as runs
|
||||
LEFT JOIN
|
||||
"WorkflowRunTriggeredBy" as runTriggers ON runTriggers."parentId" = runs."id"
|
||||
@@ -70,6 +70,16 @@ WHERE
|
||||
$12::timestamp IS NULL OR
|
||||
runs."finishedAt" > $12::timestamp
|
||||
)
|
||||
ORDER BY
|
||||
case when $13 = 'createdAt ASC' THEN runs."createdAt" END ASC ,
|
||||
case when $13 = 'createdAt DESC' then runs."createdAt" END DESC,
|
||||
runs."id" ASC
|
||||
LIMIT 10000
|
||||
)
|
||||
SELECT
|
||||
count(runs) AS total
|
||||
FROM
|
||||
runs
|
||||
`
|
||||
|
||||
type CountWorkflowRunsParams struct {
|
||||
@@ -85,6 +95,7 @@ type CountWorkflowRunsParams struct {
|
||||
Statuses []string `json:"statuses"`
|
||||
CreatedAfter pgtype.Timestamp `json:"createdAfter"`
|
||||
FinishedAfter pgtype.Timestamp `json:"finishedAfter"`
|
||||
Orderby interface{} `json:"orderby"`
|
||||
}
|
||||
|
||||
func (q *Queries) CountWorkflowRuns(ctx context.Context, db DBTX, arg CountWorkflowRunsParams) (int64, error) {
|
||||
@@ -101,6 +112,7 @@ func (q *Queries) CountWorkflowRuns(ctx context.Context, db DBTX, arg CountWorkf
|
||||
arg.Statuses,
|
||||
arg.CreatedAfter,
|
||||
arg.FinishedAfter,
|
||||
arg.Orderby,
|
||||
)
|
||||
var total int64
|
||||
err := row.Scan(&total)
|
||||
|
||||
@@ -98,6 +98,7 @@ func (r *eventAPIRepository) ListEvents(tenantId string, opts *repository.ListEv
|
||||
|
||||
if opts.AdditionalMetadata != nil {
|
||||
queryParams.AdditionalMetadata = opts.AdditionalMetadata
|
||||
countParams.AdditionalMetadata = opts.AdditionalMetadata
|
||||
}
|
||||
|
||||
orderByField := "createdAt"
|
||||
@@ -112,6 +113,7 @@ func (r *eventAPIRepository) ListEvents(tenantId string, opts *repository.ListEv
|
||||
}
|
||||
|
||||
queryParams.Orderby = orderByField + " " + orderByDirection
|
||||
countParams.Orderby = orderByField + " " + orderByDirection
|
||||
|
||||
tx, err := r.pool.Begin(context.Background())
|
||||
|
||||
|
||||
@@ -387,6 +387,7 @@ func listWorkflowRuns(ctx context.Context, pool *pgxpool.Pool, queries *dbsqlc.Q
|
||||
}
|
||||
|
||||
queryParams.Orderby = orderByField + " " + orderByDirection
|
||||
countParams.Orderby = orderByField + " " + orderByDirection
|
||||
|
||||
tx, err := pool.Begin(ctx)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user