mirror of
https://github.com/hatchet-dev/hatchet.git
synced 2025-12-21 08:40:10 -06:00
Debug: Add some tracing to OLAP workflow run list (#1630)
* feat: add spans for workflow run list * feat: add more spans
This commit is contained in:
@@ -8,6 +8,7 @@ import (
|
||||
"github.com/labstack/echo/v4"
|
||||
|
||||
"github.com/hatchet-dev/hatchet/api/v1/server/oas/gen"
|
||||
"github.com/hatchet-dev/hatchet/internal/telemetry"
|
||||
"github.com/hatchet-dev/hatchet/pkg/repository/postgres/dbsqlc"
|
||||
"github.com/hatchet-dev/hatchet/pkg/repository/postgres/sqlchelpers"
|
||||
v1 "github.com/hatchet-dev/hatchet/pkg/repository/v1"
|
||||
@@ -17,6 +18,9 @@ import (
|
||||
)
|
||||
|
||||
func (t *V1WorkflowRunsService) WithDags(ctx echo.Context, request gen.V1WorkflowRunListRequestObject) (gen.V1WorkflowRunListResponseObject, error) {
|
||||
spanContext, span := telemetry.NewSpan(ctx.Request().Context(), "v1-workflow-runs-list-only-tasks")
|
||||
defer span.End()
|
||||
|
||||
tenant := ctx.Get("tenant").(*dbsqlc.Tenant)
|
||||
tenantId := sqlchelpers.UUIDToStr(tenant.ID)
|
||||
|
||||
@@ -87,7 +91,7 @@ func (t *V1WorkflowRunsService) WithDags(ctx echo.Context, request gen.V1Workflo
|
||||
}
|
||||
|
||||
dags, total, err := t.config.V1.OLAP().ListWorkflowRuns(
|
||||
ctx.Request().Context(),
|
||||
spanContext,
|
||||
tenantId,
|
||||
opts,
|
||||
)
|
||||
@@ -105,7 +109,7 @@ func (t *V1WorkflowRunsService) WithDags(ctx echo.Context, request gen.V1Workflo
|
||||
}
|
||||
|
||||
tasks, taskIdToDagExternalId, err := t.config.V1.OLAP().ListTasksByDAGId(
|
||||
ctx.Request().Context(),
|
||||
spanContext,
|
||||
tenantId,
|
||||
dagExternalIds,
|
||||
)
|
||||
@@ -121,7 +125,7 @@ func (t *V1WorkflowRunsService) WithDags(ctx echo.Context, request gen.V1Workflo
|
||||
}
|
||||
|
||||
workflowNames, err := t.config.V1.Workflows().ListWorkflowNamesByIds(
|
||||
ctx.Request().Context(),
|
||||
spanContext,
|
||||
tenantId,
|
||||
pgWorkflowIds,
|
||||
)
|
||||
@@ -168,6 +172,9 @@ func (t *V1WorkflowRunsService) WithDags(ctx echo.Context, request gen.V1Workflo
|
||||
}
|
||||
|
||||
func (t *V1WorkflowRunsService) OnlyTasks(ctx echo.Context, request gen.V1WorkflowRunListRequestObject) (gen.V1WorkflowRunListResponseObject, error) {
|
||||
spanContext, span := telemetry.NewSpan(ctx.Request().Context(), "v1-workflow-runs-list-only-tasks")
|
||||
defer span.End()
|
||||
|
||||
tenant := ctx.Get("tenant").(*dbsqlc.Tenant)
|
||||
tenantId := sqlchelpers.UUIDToStr(tenant.ID)
|
||||
|
||||
@@ -233,7 +240,7 @@ func (t *V1WorkflowRunsService) OnlyTasks(ctx echo.Context, request gen.V1Workfl
|
||||
}
|
||||
|
||||
tasks, total, err := t.config.V1.OLAP().ListTasks(
|
||||
ctx.Request().Context(),
|
||||
spanContext,
|
||||
tenantId,
|
||||
opts,
|
||||
)
|
||||
|
||||
@@ -17,6 +17,7 @@ import (
|
||||
"github.com/rs/zerolog"
|
||||
"golang.org/x/sync/errgroup"
|
||||
|
||||
"github.com/hatchet-dev/hatchet/internal/telemetry"
|
||||
"github.com/hatchet-dev/hatchet/pkg/repository"
|
||||
"github.com/hatchet-dev/hatchet/pkg/repository/postgres/sqlchelpers"
|
||||
"github.com/hatchet-dev/hatchet/pkg/repository/v1/sqlcv1"
|
||||
@@ -441,6 +442,9 @@ func (r *OLAPRepositoryImpl) ReadTaskRunData(ctx context.Context, tenantId pgtyp
|
||||
}
|
||||
|
||||
func (r *OLAPRepositoryImpl) ListTasks(ctx context.Context, tenantId string, opts ListTaskRunOpts) ([]*sqlcv1.PopulateTaskRunDataRow, int, error) {
|
||||
ctx, span := telemetry.NewSpan(ctx, "list-tasks-olap")
|
||||
defer span.End()
|
||||
|
||||
tx, err := r.readPool.Begin(ctx)
|
||||
|
||||
if err != nil {
|
||||
@@ -549,6 +553,9 @@ func (r *OLAPRepositoryImpl) ListTasks(ctx context.Context, tenantId string, opt
|
||||
}
|
||||
|
||||
func (r *OLAPRepositoryImpl) ListTasksByDAGId(ctx context.Context, tenantId string, dagids []pgtype.UUID) ([]*sqlcv1.PopulateTaskRunDataRow, map[int64]uuid.UUID, error) {
|
||||
ctx, span := telemetry.NewSpan(ctx, "list-tasks-by-dag-id-olap")
|
||||
defer span.End()
|
||||
|
||||
tx, err := r.readPool.Begin(ctx)
|
||||
taskIdToDagExternalId := make(map[int64]uuid.UUID)
|
||||
|
||||
@@ -631,6 +638,9 @@ func (r *OLAPRepositoryImpl) ListTasksByIdAndInsertedAt(ctx context.Context, ten
|
||||
}
|
||||
|
||||
func (r *OLAPRepositoryImpl) ListWorkflowRuns(ctx context.Context, tenantId string, opts ListWorkflowRunOpts) ([]*WorkflowRunData, int, error) {
|
||||
ctx, span := telemetry.NewSpan(ctx, "list-workflow-runs-olap")
|
||||
defer span.End()
|
||||
|
||||
tx, err := r.readPool.Begin(ctx)
|
||||
|
||||
if err != nil {
|
||||
|
||||
@@ -12,6 +12,7 @@ import (
|
||||
"github.com/hatchet-dev/hatchet/internal/cel"
|
||||
"github.com/hatchet-dev/hatchet/internal/datautils"
|
||||
"github.com/hatchet-dev/hatchet/internal/digest"
|
||||
"github.com/hatchet-dev/hatchet/internal/telemetry"
|
||||
"github.com/hatchet-dev/hatchet/pkg/repository/postgres/sqlchelpers"
|
||||
"github.com/hatchet-dev/hatchet/pkg/repository/v1/sqlcv1"
|
||||
)
|
||||
@@ -193,6 +194,9 @@ func newWorkflowRepository(shared *sharedRepository) WorkflowRepository {
|
||||
}
|
||||
|
||||
func (w *workflowRepository) ListWorkflowNamesByIds(ctx context.Context, tenantId string, workflowIds []pgtype.UUID) (map[pgtype.UUID]string, error) {
|
||||
ctx, span := telemetry.NewSpan(ctx, "list-workflow-names-by-ids")
|
||||
defer span.End()
|
||||
|
||||
workflowNames, err := w.queries.ListWorkflowNamesByIds(ctx, w.pool, workflowIds)
|
||||
|
||||
if err != nil {
|
||||
|
||||
Reference in New Issue
Block a user