Update DB queries

Sid Premkumar
2025-12-15 11:19:40 -05:00
parent cb15f6ebfe
commit 49cf7a9e24
5 changed files with 68 additions and 33 deletions

View File

@@ -3,7 +3,6 @@ package alerting
 import (
     "context"
     "fmt"
-    "strings"
     "time"

     "github.com/hashicorp/go-multierror"
@@ -27,7 +26,7 @@ type TenantAlertManager struct {
 }

 func New(repo repository.EngineRepository, e encryption.EncryptionService, serverURL string, email email.EmailService) *TenantAlertManager {
-    return &TenantAlertManager{repo, e, serverURL, email}
+    return &TenantAlertManager{repo: repo, enc: e, serverURL: serverURL, email: email}
 }

 func (t *TenantAlertManager) HandleAlert(tenantId string) error {
@@ -88,7 +87,10 @@ func (t *TenantAlertManager) SendWorkflowRunAlertV1(tenantId string, failedRuns
         return fmt.Errorf("could not get tenant alerting settings: %w", err)
     }

-    failedItems := t.getFailedItemsV1(failedRuns)
+    failedItems, err := t.getFailedItemsV1(failedRuns)
+    if err != nil {
+        return fmt.Errorf("could not process failed workflow runs: %w", err)
+    }

     if len(failedItems) == 0 {
         return nil
@@ -219,55 +221,69 @@ func (t *TenantAlertManager) getFailedItems(failedWorkflowRuns *repository.ListW
             WorkflowName:          workflowRun.Workflow.Name,
             WorkflowRunReadableId: readableId,
             RelativeDate:          timediff.TimeDiff(finishedAt),
-            AbsoluteDate:          finishedAt.Format("01/02/2006 03:04 pm UTC"),
+            AbsoluteDate:          finishedAt.Format("01/02/2006 03:04:05 pm UTC"),
         })
     }

     return res
 }

-func (t *TenantAlertManager) getFailedItemsV1(failedRuns []*v1.WorkflowRunData) []alerttypes.WorkflowRunFailedItem {
+func (t *TenantAlertManager) getFailedItemsV1(failedRuns []*v1.WorkflowRunData) ([]alerttypes.WorkflowRunFailedItem, error) {
     res := make([]alerttypes.WorkflowRunFailedItem, 0)

-    for i, workflowRun := range failedRuns {
-        if i >= 5 {
-            break
+    if len(failedRuns) == 0 {
+        return res, nil
+    }
+
+    // Limit to first 5 failed runs
+    maxRuns := 5
+    if len(failedRuns) < maxRuns {
+        maxRuns = len(failedRuns)
+    }
+
+    // Build result items using workflow name directly from WorkflowRunData
+    for i := 0; i < maxRuns; i++ {
+        workflowRun := failedRuns[i]
+        if workflowRun == nil {
+            continue
+        }
+
+        if !workflowRun.ExternalID.Valid {
+            continue
+        }
+
+        if !workflowRun.TenantID.Valid {
+            continue
         }

         workflowRunId := sqlchelpers.UUIDToStr(workflowRun.ExternalID)
-        tenantId := sqlchelpers.UUIDToStr(workflowRun.TenantID)
+        tenantIdStr := sqlchelpers.UUIDToStr(workflowRun.TenantID)
         readableId := workflowRun.DisplayName

-        // Extract base workflow name (remove suffix pattern: -{6chars})
-        baseWorkflowName := readableId
-        if lastDash := strings.LastIndex(readableId, "-"); lastDash > 0 {
-            // Check if suffix looks like a 6-character random suffix (alphanumeric)
-            suffix := readableId[lastDash+1:]
-            if len(suffix) == 6 && isAlphanumeric(suffix) {
-                baseWorkflowName = readableId[:lastDash]
-            }
+        // Use WorkflowName directly from the query result (populated via SQL JOIN)
+        workflowName := workflowRun.WorkflowName
+        if workflowName == "" {
+            // Fallback to DisplayName if workflow was deleted or join failed
+            workflowName = readableId
         }

+        // Validate FinishedAt is valid
+        if !workflowRun.FinishedAt.Valid || workflowRun.FinishedAt.Time.IsZero() {
+            // Skip runs with invalid finished_at timestamps
+            continue
+        }
+
         res = append(res, alerttypes.WorkflowRunFailedItem{
-            Link:                  fmt.Sprintf("%s/tenants/%s/runs/%s", t.serverURL, tenantId, workflowRunId),
-            WorkflowName:          baseWorkflowName,
+            Link:                  fmt.Sprintf("%s/tenants/%s/runs/%s", t.serverURL, tenantIdStr, workflowRunId),
+            WorkflowName:          workflowName,
             WorkflowRunReadableId: readableId,
             RelativeDate:          timediff.TimeDiff(workflowRun.FinishedAt.Time),
-            AbsoluteDate:          workflowRun.FinishedAt.Time.Format("01/02/2006 03:04 pm UTC"),
+            AbsoluteDate:          workflowRun.FinishedAt.Time.Format("01/02/2006 03:04:05 pm UTC"),
         })
     }

-    return res
-}
-
-func isAlphanumeric(s string) bool {
-    for _, r := range s {
-        if !((r >= 'a' && r <= 'z') || (r >= 'A' && r <= 'Z') || (r >= '0' && r <= '9')) {
-            return false
-        }
-    }
-    return true
+    return res, nil
 }

 func (t *TenantAlertManager) SendExpiringTokenAlert(tenantId string, token *dbsqlc.PollExpiringTokensRow) error {

View File

@@ -66,7 +66,7 @@ func (t *TenantAlertManager) getSlackWorkflowRunTextAndBlocks(numFailed int, fai
     res = append(res, slack.NewSectionBlock(
         slack.NewTextBlockObject(
             slack.MarkdownType,
-            fmt.Sprintf(":warning: %s failed %s\n_%s_", workflowRun.WorkflowName, workflowRun.RelativeDate, workflowRun.AbsoluteDate),
+            fmt.Sprintf(":warning: %s failed %s\n`%s`", workflowRun.WorkflowName, workflowRun.RelativeDate, workflowRun.AbsoluteDate),
             false,
             false,
         ),

View File

@@ -113,6 +113,7 @@ type WorkflowRunData struct
     TaskInsertedAt     *pgtype.Timestamptz `json:"task_inserted_at,omitempty"`
     TenantID           pgtype.UUID         `json:"tenant_id"`
     WorkflowID         pgtype.UUID         `json:"workflow_id"`
+    WorkflowName       string              `json:"workflow_name"`
     WorkflowVersionId  pgtype.UUID         `json:"workflow_version_id"`
     RetryCount         *int                `json:"retry_count,omitempty"`
 }
@@ -1148,6 +1149,7 @@ func (r *OLAPRepositoryImpl) ListWorkflowRuns(ctx context.Context, tenantId stri
         InsertedAt:         dag.InsertedAt,
         ExternalID:         dag.ExternalID,
         WorkflowID:         dag.WorkflowID,
+        WorkflowName:       dag.WorkflowName,
         DisplayName:        dag.DisplayName,
         ReadableStatus:     dag.ReadableStatus,
         AdditionalMetadata: dag.AdditionalMetadata,
@@ -1198,6 +1200,7 @@ func (r *OLAPRepositoryImpl) ListWorkflowRuns(ctx context.Context, tenantId stri
         InsertedAt:        task.InsertedAt,
         ExternalID:        task.ExternalID,
         WorkflowID:        task.WorkflowID,
+        WorkflowName:      task.WorkflowName,
         WorkflowVersionId: task.WorkflowVersionID,
         DisplayName:       task.DisplayName,
         ReadableStatus:    task.Status,

View File

@@ -555,6 +555,7 @@ WITH input AS (
     t.step_id,
     t.workflow_id,
     t.workflow_version_id,
+    COALESCE(w.name, '') AS workflow_name,
     t.schedule_timeout,
     t.step_timeout,
     t.priority,
@@ -572,6 +573,8 @@ WITH input AS (
     v1_tasks_olap t
 JOIN
     input i ON i.id = t.id AND i.inserted_at = t.inserted_at
+LEFT JOIN
+    "Workflow" w ON t.workflow_id = w.id
 WHERE
     t.tenant_id = @tenantId::uuid
 ), relevant_events AS (
@@ -674,6 +677,7 @@ SELECT
     t.step_id,
     t.workflow_id,
     t.workflow_version_id,
+    t.workflow_name,
     t.schedule_timeout,
     t.step_timeout,
     t.priority,
@@ -1097,6 +1101,7 @@ WITH input AS (
     r.kind,
     r.workflow_id,
     d.display_name,
+    COALESCE(w.name, '') AS workflow_name,
     CASE
         WHEN @includePayloads::BOOLEAN THEN d.input
         ELSE '{}'::JSONB
@@ -1107,6 +1112,7 @@ WITH input AS (
 FROM input i
 JOIN v1_runs_olap r ON (i.id, i.inserted_at) = (r.id, r.inserted_at)
 JOIN v1_dags_olap d ON (r.id, r.inserted_at) = (d.id, d.inserted_at)
+LEFT JOIN "Workflow" w ON r.workflow_id = w.id
 WHERE r.tenant_id = @tenantId::uuid AND r.kind = 'DAG'
 ), relevant_events AS (
     SELECT r.run_id, e.*

View File

@@ -1900,6 +1900,7 @@ WITH input AS (
     r.kind,
     r.workflow_id,
     d.display_name,
+    COALESCE(w.name, '') AS workflow_name,
     CASE
         WHEN $1::BOOLEAN THEN d.input
         ELSE '{}'::JSONB
@@ -1910,6 +1911,7 @@ WITH input AS (
 FROM input i
 JOIN v1_runs_olap r ON (i.id, i.inserted_at) = (r.id, r.inserted_at)
 JOIN v1_dags_olap d ON (r.id, r.inserted_at) = (d.id, d.inserted_at)
+LEFT JOIN "Workflow" w ON r.workflow_id = w.id
 WHERE r.tenant_id = $4::uuid AND r.kind = 'DAG'
 ), relevant_events AS (
     SELECT r.run_id, e.tenant_id, e.id, e.inserted_at, e.external_id, e.task_id, e.task_inserted_at, e.event_type, e.workflow_id, e.event_timestamp, e.readable_status, e.retry_count, e.error_message, e.output, e.worker_id, e.additional__event_data, e.additional__event_message
@@ -1953,7 +1955,7 @@ WITH input AS (
 )
 SELECT
-    r.dag_id, r.run_id, r.tenant_id, r.inserted_at, r.external_id, r.readable_status, r.kind, r.workflow_id, r.display_name, r.input, r.additional_metadata, r.workflow_version_id, r.parent_task_external_id,
+    r.dag_id, r.run_id, r.tenant_id, r.inserted_at, r.external_id, r.readable_status, r.kind, r.workflow_id, r.display_name, r.workflow_name, r.input, r.additional_metadata, r.workflow_version_id, r.parent_task_external_id,
     m.created_at,
     m.started_at,
     m.finished_at,
@@ -1989,6 +1991,7 @@ type PopulateDAGMetadataRow struct {
     Kind               V1RunKind   `json:"kind"`
     WorkflowID         pgtype.UUID `json:"workflow_id"`
     DisplayName        string      `json:"display_name"`
+    WorkflowName       string      `json:"workflow_name"`
     Input              []byte      `json:"input"`
     AdditionalMetadata []byte      `json:"additional_metadata"`
     WorkflowVersionID  pgtype.UUID `json:"workflow_version_id"`
@@ -2026,6 +2029,7 @@ func (q *Queries) PopulateDAGMetadata(ctx context.Context, db DBTX, arg Populate
         &i.Kind,
         &i.WorkflowID,
         &i.DisplayName,
+        &i.WorkflowName,
         &i.Input,
         &i.AdditionalMetadata,
         &i.WorkflowVersionID,
@@ -2331,6 +2335,7 @@ WITH input AS (
     t.step_id,
     t.workflow_id,
     t.workflow_version_id,
+    COALESCE(w.name, '') AS workflow_name,
     t.schedule_timeout,
     t.step_timeout,
     t.priority,
@@ -2348,6 +2353,8 @@ WITH input AS (
     v1_tasks_olap t
 JOIN
     input i ON i.id = t.id AND i.inserted_at = t.inserted_at
+LEFT JOIN
+    "Workflow" w ON t.workflow_id = w.id
 WHERE
     t.tenant_id = $4::uuid
 ), relevant_events AS (
@@ -2450,6 +2457,7 @@ SELECT
     t.step_id,
     t.workflow_id,
     t.workflow_version_id,
+    t.workflow_name,
     t.schedule_timeout,
     t.step_timeout,
     t.priority,
@@ -2505,6 +2513,7 @@ type PopulateTaskRunDataRow struct {
     StepID            pgtype.UUID `json:"step_id"`
     WorkflowID        pgtype.UUID `json:"workflow_id"`
     WorkflowVersionID pgtype.UUID `json:"workflow_version_id"`
+    WorkflowName      string      `json:"workflow_name"`
     ScheduleTimeout   string      `json:"schedule_timeout"`
     StepTimeout       pgtype.Text `json:"step_timeout"`
     Priority          pgtype.Int4 `json:"priority"`
@@ -2548,6 +2557,7 @@ func (q *Queries) PopulateTaskRunData(ctx context.Context, db DBTX, arg Populate
         &i.StepID,
         &i.WorkflowID,
         &i.WorkflowVersionID,
+        &i.WorkflowName,
         &i.ScheduleTimeout,
         &i.StepTimeout,
         &i.Priority,