Run cleanup on more tables (#2467)

* cleanup more tables

* use task retention period

* use task retention period

* cleanup

* fix query
This commit is contained in:
Mohammed Nafees
2025-10-31 03:47:36 +05:30
committed by GitHub
parent bc3dc53433
commit 1aabbe3e94
5 changed files with 159 additions and 0 deletions
+17
View File
@@ -260,3 +260,20 @@ JOIN
ORDER BY
m.id
FOR UPDATE;
-- name: CleanupMatchWithMatchConditions :exec
WITH deleted_match_ids AS (
DELETE FROM
v1_match
WHERE
signal_task_inserted_at < @date::date
OR trigger_dag_inserted_at < @date::date
OR trigger_parent_task_inserted_at < @date::date
OR trigger_existing_task_inserted_at < @date::date
RETURNING
id
)
DELETE FROM
v1_match_condition
WHERE
v1_match_id IN (SELECT id FROM deleted_match_ids);
+23
View File
@@ -11,6 +11,29 @@ import (
"github.com/jackc/pgx/v5/pgtype"
)
const cleanupMatchWithMatchConditions = `-- name: CleanupMatchWithMatchConditions :exec
WITH deleted_match_ids AS (
DELETE FROM
v1_match
WHERE
signal_task_inserted_at < $1::date
OR trigger_dag_inserted_at < $1::date
OR trigger_parent_task_inserted_at < $1::date
OR trigger_existing_task_inserted_at < $1::date
RETURNING
id
)
DELETE FROM
v1_match_condition
WHERE
v1_match_id IN (SELECT id FROM deleted_match_ids)
`
func (q *Queries) CleanupMatchWithMatchConditions(ctx context.Context, db DBTX, date pgtype.Date) error {
_, err := db.Exec(ctx, cleanupMatchWithMatchConditions, date)
return err
}
type CreateMatchConditionsParams struct {
V1MatchID int64 `json:"v1_match_id"`
TenantID pgtype.UUID `json:"tenant_id"`
+40
View File
@@ -471,3 +471,43 @@ WHERE (task_id, task_inserted_at, retry_count) IN (
SELECT task_id, task_inserted_at, retry_count
FROM locked_qis
);
-- name: CleanupV1RetryQueueItem :execresult
WITH locked_qis as (
SELECT qi.task_id, qi.task_inserted_at, qi.task_retry_count
FROM v1_retry_queue_item qi
WHERE NOT EXISTS (
SELECT 1
FROM v1_task vt
WHERE qi.task_id = vt.id
AND qi.task_inserted_at = vt.inserted_at
)
ORDER BY qi.task_id, qi.task_inserted_at, qi.task_retry_count
LIMIT @batchSize::int
FOR UPDATE SKIP LOCKED
)
DELETE FROM v1_retry_queue_item
WHERE (task_id, task_inserted_at) IN (
SELECT task_id, task_inserted_at
FROM locked_qis
);
-- name: CleanupV1RateLimitedQueueItem :execresult
WITH locked_qis as (
SELECT qi.task_id, qi.task_inserted_at, qi.retry_count
FROM v1_rate_limited_queue_items qi
WHERE NOT EXISTS (
SELECT 1
FROM v1_task vt
WHERE qi.task_id = vt.id
AND qi.task_inserted_at = vt.inserted_at
)
ORDER BY qi.task_id, qi.task_inserted_at, qi.retry_count
LIMIT @batchSize::int
FOR UPDATE SKIP LOCKED
)
DELETE FROM v1_rate_limited_queue_items
WHERE (task_id, task_inserted_at) IN (
SELECT task_id, task_inserted_at
FROM locked_qis
);
+50
View File
@@ -77,6 +77,56 @@ func (q *Queries) CleanupV1QueueItem(ctx context.Context, db DBTX, batchsize int
return db.Exec(ctx, cleanupV1QueueItem, batchsize)
}
const cleanupV1RateLimitedQueueItem = `-- name: CleanupV1RateLimitedQueueItem :execresult
WITH locked_qis as (
SELECT qi.task_id, qi.task_inserted_at, qi.retry_count
FROM v1_rate_limited_queue_items qi
WHERE NOT EXISTS (
SELECT 1
FROM v1_task vt
WHERE qi.task_id = vt.id
AND qi.task_inserted_at = vt.inserted_at
)
ORDER BY qi.task_id, qi.task_inserted_at, qi.retry_count
LIMIT $1::int
FOR UPDATE SKIP LOCKED
)
DELETE FROM v1_rate_limited_queue_items
WHERE (task_id, task_inserted_at) IN (
SELECT task_id, task_inserted_at
FROM locked_qis
)
`
func (q *Queries) CleanupV1RateLimitedQueueItem(ctx context.Context, db DBTX, batchsize int32) (pgconn.CommandTag, error) {
return db.Exec(ctx, cleanupV1RateLimitedQueueItem, batchsize)
}
const cleanupV1RetryQueueItem = `-- name: CleanupV1RetryQueueItem :execresult
WITH locked_qis as (
SELECT qi.task_id, qi.task_inserted_at, qi.task_retry_count
FROM v1_retry_queue_item qi
WHERE NOT EXISTS (
SELECT 1
FROM v1_task vt
WHERE qi.task_id = vt.id
AND qi.task_inserted_at = vt.inserted_at
)
ORDER BY qi.task_id, qi.task_inserted_at, qi.task_retry_count
LIMIT $1::int
FOR UPDATE SKIP LOCKED
)
DELETE FROM v1_retry_queue_item
WHERE (task_id, task_inserted_at) IN (
SELECT task_id, task_inserted_at
FROM locked_qis
)
`
func (q *Queries) CleanupV1RetryQueueItem(ctx context.Context, db DBTX, batchsize int32) (pgconn.CommandTag, error) {
return db.Exec(ctx, cleanupV1RetryQueueItem, batchsize)
}
const deleteTasksFromQueue = `-- name: DeleteTasksFromQueue :exec
WITH input AS (
SELECT
+29
View File
@@ -3664,6 +3664,35 @@ func (r *TaskRepositoryImpl) Cleanup(ctx context.Context) (bool, error) {
shouldContinue = true
}
result, err = r.queries.CleanupV1RetryQueueItem(ctx, tx, batchSize)
if err != nil {
return false, fmt.Errorf("error cleaning up v1_retry_queue_item: %v", err)
}
if result.RowsAffected() == batchSize {
shouldContinue = true
}
result, err = r.queries.CleanupV1RateLimitedQueueItem(ctx, tx, batchSize)
if err != nil {
return false, fmt.Errorf("error cleaning up v1_rate_limited_queue_items: %v", err)
}
if result.RowsAffected() == batchSize {
shouldContinue = true
}
today := time.Now().UTC()
removeBefore := today.Add(-1 * r.taskRetentionPeriod)
err = r.queries.CleanupMatchWithMatchConditions(ctx, tx, pgtype.Date{
Time: removeBefore,
Valid: true,
})
if err != nil {
return false, fmt.Errorf("error cleaning up v1_match and v1_match_condition: %v", err)
}
result, err = r.queries.CleanupV1TaskRuntime(ctx, tx, batchSize)
if err != nil {
return false, fmt.Errorf("error cleaning up v1_task_runtime: %v", err)