fix: LockSignalCreatedEvents performance (#1476)

* fix: LockSignalCreatedEvents performance, part 2

* fix: query

* fix: LockSignalCreatedEvents

* Revert "fix: LockSignalCreatedEvents"

This reverts commit cce5af242f.

* fix: LockSignalCreatedEvents

* fix: whitespace

* Update run.ts

* Update worker.ts

* revert example

---------

Co-authored-by: gabriel ruttner <gabriel.ruttner@gmail.com>
This commit is contained in:
abelanger5
2025-04-03 08:55:20 -04:00
committed by GitHub
parent 5781bb8938
commit 6ab9c70d80
5 changed files with 66 additions and 39 deletions

View File

@@ -2,7 +2,7 @@ version: "3.8"
services:
postgres:
image: postgres:15.6
command: postgres -c 'max_connections=500'
command: postgres -c 'max_connections=500' -c 'shared_preload_libraries=auto_explain' -c 'auto_explain.log_min_duration=1000' -c 'auto_explain.log_analyze=on' -c 'auto_explain.log_verbose=on' -c 'auto_explain.log_timing=on' -c 'auto_explain.log_buffers=on' -c 'auto_explain.log_wal=on' -c 'auto_explain.log_triggers=on' -c 'auto_explain.log_format=text'
restart: always
environment:
- POSTGRES_USER=hatchet

View File

@@ -464,25 +464,38 @@ WITH input AS (
unnest(@taskInsertedAts::timestamptz[]) AS task_inserted_at,
unnest(@eventKeys::text[]) AS event_key
) AS subquery
),
distinct_events AS (
SELECT DISTINCT
task_id, task_inserted_at
FROM
input
),
events_to_lock AS (
SELECT
e.id,
e.event_key,
e.data,
e.task_id,
e.task_inserted_at
FROM
v1_task_event e
JOIN
distinct_events de
ON e.task_id = de.task_id
AND e.task_inserted_at = de.task_inserted_at
WHERE
e.tenant_id = @tenantId::uuid
AND e.event_type = 'SIGNAL_CREATED'
)
SELECT
e.id,
e.event_key,
e.data
e.id,
e.event_key,
e.data
FROM
v1_task_event e
events_to_lock e
WHERE
(e.task_id, e.task_inserted_at, e.event_key) IN (
SELECT
task_id, task_inserted_at, event_key
FROM
input
)
AND e.tenant_id = @tenantId::uuid
AND e.event_type = 'SIGNAL_CREATED'
ORDER BY
e.task_id, e.task_inserted_at, e.id
FOR UPDATE;
e.event_key = ANY(SELECT event_key FROM input);
-- name: ListMatchingSignalEvents :many
WITH input AS (

View File

@@ -1352,36 +1352,49 @@ WITH input AS (
FROM
(
SELECT
unnest($2::bigint[]) AS task_id,
unnest($3::timestamptz[]) AS task_inserted_at,
unnest($4::text[]) AS event_key
unnest($1::bigint[]) AS task_id,
unnest($2::timestamptz[]) AS task_inserted_at,
unnest($3::text[]) AS event_key
) AS subquery
),
distinct_events AS (
SELECT DISTINCT
task_id, task_inserted_at
FROM
input
),
events_to_lock AS (
SELECT
e.id,
e.event_key,
e.data,
e.task_id,
e.task_inserted_at
FROM
v1_task_event e
JOIN
distinct_events de
ON e.task_id = de.task_id
AND e.task_inserted_at = de.task_inserted_at
WHERE
e.tenant_id = $4::uuid
AND e.event_type = 'SIGNAL_CREATED'
)
SELECT
e.id,
e.event_key,
e.data
e.id,
e.event_key,
e.data
FROM
v1_task_event e
events_to_lock e
WHERE
(e.task_id, e.task_inserted_at, e.event_key) IN (
SELECT
task_id, task_inserted_at, event_key
FROM
input
)
AND e.tenant_id = $1::uuid
AND e.event_type = 'SIGNAL_CREATED'
ORDER BY
e.task_id, e.task_inserted_at, e.id
FOR UPDATE
e.event_key = ANY(SELECT event_key FROM input)
`
type LockSignalCreatedEventsParams struct {
Tenantid pgtype.UUID `json:"tenantid"`
Taskids []int64 `json:"taskids"`
Taskinsertedats []pgtype.Timestamptz `json:"taskinsertedats"`
Eventkeys []string `json:"eventkeys"`
Tenantid pgtype.UUID `json:"tenantid"`
}
type LockSignalCreatedEventsRow struct {
@@ -1394,10 +1407,10 @@ type LockSignalCreatedEventsRow struct {
// modify the events.
func (q *Queries) LockSignalCreatedEvents(ctx context.Context, db DBTX, arg LockSignalCreatedEventsParams) ([]*LockSignalCreatedEventsRow, error) {
rows, err := db.Query(ctx, lockSignalCreatedEvents,
arg.Tenantid,
arg.Taskids,
arg.Taskinsertedats,
arg.Eventkeys,
arg.Tenantid,
)
if err != nil {
return nil, err

View File

@@ -470,10 +470,10 @@ export class Context<T, K = {}> {
});
try {
// Batch workflowRuns in groups of 500
const batchSize = 1000;
let resp: WorkflowRunRef<P>[] = [];
for (let i = 0; i < workflowRuns.length; i += 500) {
const batch = workflowRuns.slice(i, i + 500);
for (let i = 0; i < workflowRuns.length; i += batchSize) {
const batch = workflowRuns.slice(i, i + batchSize);
const batchResp = await this.client.admin.runWorkflows<Q, P>(batch);
resp = resp.concat(batchResp);
}

View File

@@ -4,6 +4,7 @@ import { parent, child } from './workflow';
async function main() {
const worker = await hatchet.worker('child-workflow-worker', {
workflows: [parent, child],
slots: 100,
});
await worker.start();