fix: loop

This commit is contained in:
mrkaye97
2025-07-21 12:28:04 -04:00
parent 12dd4d05e1
commit ffd345dd8a
3 changed files with 27 additions and 16 deletions

View File

@@ -76,6 +76,7 @@ func (o *OLAPControllerImpl) notifyDAGsUpdated(ctx context.Context, rows []v1.Up
tenantId := sqlchelpers.UUIDToStr(row.TenantId)
tenantIdToWorkflowIds[tenantId] = append(tenantIdToWorkflowIds[tenantId], row.WorkflowId)
dagIds = append(dagIds, row.DagId)
dagInsertedAts = append(dagInsertedAts, row.DagInsertedAt)
readableStatuses = append(readableStatuses, row.ReadableStatus)
@@ -96,21 +97,15 @@ func (o *OLAPControllerImpl) notifyDAGsUpdated(ctx context.Context, rows []v1.Up
return err
}
for _, row := range rows {
if row.ReadableStatus == sqlcv1.V1ReadableStatusOlapCOMPLETED || row.ReadableStatus == sqlcv1.V1ReadableStatusOlapFAILED || row.ReadableStatus == sqlcv1.V1ReadableStatusOlapCANCELLED {
workflowName := workflowNames[row.WorkflowId]
for _, duration := range dagDurations {
if duration.ReadableStatus == sqlcv1.V1ReadableStatusOlapCOMPLETED || duration.ReadableStatus == sqlcv1.V1ReadableStatusOlapFAILED || duration.ReadableStatus == sqlcv1.V1ReadableStatusOlapCANCELLED {
workflowName := workflowNames[duration.WorkflowID]
if workflowName == "" {
continue
}
dagDuration := dagDurations[row.ExternalId]
if dagDuration == nil {
continue
}
prometheus.TenantWorkflowDurationBuckets.WithLabelValues(tenantId, workflowName, string(row.ReadableStatus)).Observe(float64(dagDuration.FinishedAt.Time.Sub(dagDuration.StartedAt.Time).Milliseconds()))
prometheus.TenantWorkflowDurationBuckets.WithLabelValues(tenantId, workflowName, string(duration.ReadableStatus)).Observe(float64(duration.FinishedAt.Time.Sub(duration.StartedAt.Time).Milliseconds()))
}
}

View File

@@ -1557,7 +1557,9 @@ WITH input AS (
i.inserted_at,
d.external_id,
d.display_name,
d.tenant_id
d.tenant_id,
d.workflow_id,
d.readable_status
FROM
input i
JOIN
@@ -1614,6 +1616,8 @@ WITH input AS (
)
SELECT
dd.external_id,
dd.workflow_id,
dd.readable_status,
dt.started_at::timestamptz AS started_at,
dt.finished_at::timestamptz AS finished_at
FROM

View File

@@ -366,7 +366,9 @@ WITH input AS (
i.inserted_at,
d.external_id,
d.display_name,
d.tenant_id
d.tenant_id,
d.workflow_id,
d.readable_status
FROM
input i
JOIN
@@ -423,6 +425,8 @@ WITH input AS (
)
SELECT
dd.external_id,
dd.workflow_id,
dd.readable_status,
dt.started_at::timestamptz AS started_at,
dt.finished_at::timestamptz AS finished_at
FROM
@@ -440,9 +444,11 @@ type GetDagDurationsByDagIdsParams struct {
}
type GetDagDurationsByDagIdsRow struct {
ExternalID pgtype.UUID `json:"external_id"`
StartedAt pgtype.Timestamptz `json:"started_at"`
FinishedAt pgtype.Timestamptz `json:"finished_at"`
ExternalID pgtype.UUID `json:"external_id"`
WorkflowID pgtype.UUID `json:"workflow_id"`
ReadableStatus V1ReadableStatusOlap `json:"readable_status"`
StartedAt pgtype.Timestamptz `json:"started_at"`
FinishedAt pgtype.Timestamptz `json:"finished_at"`
}
func (q *Queries) GetDagDurationsByDagIds(ctx context.Context, db DBTX, arg GetDagDurationsByDagIdsParams) ([]*GetDagDurationsByDagIdsRow, error) {
@@ -459,7 +465,13 @@ func (q *Queries) GetDagDurationsByDagIds(ctx context.Context, db DBTX, arg GetD
var items []*GetDagDurationsByDagIdsRow
for rows.Next() {
var i GetDagDurationsByDagIdsRow
if err := rows.Scan(&i.ExternalID, &i.StartedAt, &i.FinishedAt); err != nil {
if err := rows.Scan(
&i.ExternalID,
&i.WorkflowID,
&i.ReadableStatus,
&i.StartedAt,
&i.FinishedAt,
); err != nil {
return nil, err
}
items = append(items, &i)