refactor: pointers and lists

This commit is contained in:
mrkaye97
2026-01-30 19:51:22 -05:00
parent e8624f617b
commit dfe04ed78e
15 changed files with 45 additions and 45 deletions
@@ -36,7 +36,7 @@ func (tc *TasksControllerImpl) processTaskReassignments(ctx context.Context, ten
prometheus.TenantReassignedTasks.WithLabelValues(tenantId).Add(float64(len(res.RetriedTasks)))
for _, task := range res.ReleasedTasks {
var workerId *string
var workerId *uuid.UUID
if task.WorkerID != uuid.Nil {
workerIdStr := task.WorkerID.String()
+2 -2
View File
@@ -357,7 +357,7 @@ func (s *DispatcherImpl) subscribeToWorkflowRunsV1(server contracts.Dispatcher_S
return nil
}
iter := func(workflowRunIds []string) error {
iter := func(workflowRunIds []uuid.UUID) error {
if len(workflowRunIds) == 0 {
return nil
}
@@ -1121,7 +1121,7 @@ type listWorkflowRunsResult struct {
AdditionalMetadata map[string]interface{}
}
func (s *DispatcherImpl) listWorkflowRuns(ctx context.Context, tenantId uuid.UUID, workflowRunIds []string) ([]*listWorkflowRunsResult, error) {
func (s *DispatcherImpl) listWorkflowRuns(ctx context.Context, tenantId uuid.UUID, workflowRunIds []uuid.UUID) ([]*listWorkflowRunsResult, error) {
// use cache heavily
res := make([]*listWorkflowRunsResult, 0)
workflowRunIdsToLookup := make([]string, 0)
@@ -92,7 +92,7 @@ type CreateMonitoringEventPayload struct {
RetryCount int32 `json:"retry_count"`
WorkerId *string `json:"worker_id,omitempty"`
WorkerId *uuid.UUID `json:"worker_id,omitempty"`
EventType sqlcv1.V1EventTypeOlap `json:"event_type"`
@@ -102,7 +102,7 @@ type CreateMonitoringEventPayload struct {
}
func MonitoringEventMessageFromActionEvent(tenantId uuid.UUID, taskId int64, retryCount int32, request *contracts.StepActionEvent) (*msgqueue.Message, error) {
var workerId *string
var workerId *uuid.UUID
if _, err := uuid.Parse(request.WorkerId); err == nil {
workerId = &request.WorkerId
+2 -2
View File
@@ -9,7 +9,7 @@ type Analytics interface {
// tenantId is an optional tenant ID to associate with this event.
// set contains key-value pairs to set on the user/group profile (e.g. email, name, etc.).
// metadata contains additional metadata to attach to the event.
Enqueue(event string, userId string, tenantId *string, set map[string]interface{}, metadata map[string]interface{})
Enqueue(event string, userId string, tenantId *uuid.UUID, set map[string]interface{}, metadata map[string]interface{})
// Tenant updates properties for a tenant group.
// tenantId is the ID of the tenant to update.
@@ -19,7 +19,7 @@ type Analytics interface {
type NoOpAnalytics struct{}
func (a NoOpAnalytics) Enqueue(event string, userId string, tenantId *string, set map[string]interface{}, metadata map[string]interface{}) {
func (a NoOpAnalytics) Enqueue(event string, userId string, tenantId *uuid.UUID, set map[string]interface{}, metadata map[string]interface{}) {
}
func (a NoOpAnalytics) Tenant(tenantId uuid.UUID, data map[string]interface{}) {}
+1 -1
View File
@@ -37,7 +37,7 @@ func NewPosthogAnalytics(opts *PosthogAnalyticsOpts) (*PosthogAnalytics, error)
}, nil
}
func (p *PosthogAnalytics) Enqueue(event string, userId string, tenantId *string, set map[string]interface{}, metadata map[string]interface{}) {
func (p *PosthogAnalytics) Enqueue(event string, userId string, tenantId *uuid.UUID, set map[string]interface{}, metadata map[string]interface{}) {
var group posthog.Groups
+1 -1
View File
@@ -28,7 +28,7 @@ type ChildWorkflowOpts struct {
ParentStepRunId uuid.UUID
ChildIndex int
ChildKey *string
DesiredWorkerId *string
DesiredWorkerId *uuid.UUID
AdditionalMetadata *map[string]string
Priority *int32
}
+4 -4
View File
@@ -48,7 +48,7 @@ type GetActionListenerRequest struct {
Actions []string
MaxRuns *int
Labels map[string]interface{}
WebhookId *string
WebhookId *uuid.UUID
}
// ActionPayload unmarshals the action payload into the target. It also validates the resulting target.
@@ -115,13 +115,13 @@ type Action struct {
ChildKey *string
// the parent workflow run id
ParentWorkflowRunId *string
ParentWorkflowRunId *uuid.UUID
Priority int32 `json:"priority,omitempty"`
WorkflowId *string `json:"workflowId,omitempty"`
WorkflowId *uuid.UUID `json:"workflowId,omitempty"`
WorkflowVersionId *string `json:"workflowVersionId,omitempty"`
WorkflowVersionId *uuid.UUID `json:"workflowVersionId,omitempty"`
}
type WorkerActionListener interface {
+10 -10
View File
@@ -1036,7 +1036,7 @@ type StepRun struct {
TenantId uuid.UUID `json:"tenantId"`
TimeoutAt *time.Time `json:"timeoutAt,omitempty"`
TimeoutAtEpoch *int `json:"timeoutAtEpoch,omitempty"`
WorkerId *string `json:"workerId,omitempty"`
WorkerId *uuid.UUID `json:"workerId,omitempty"`
}
// StepRunArchive defines model for StepRunArchive.
@@ -1074,10 +1074,10 @@ type StepRunEvent struct {
Message string `json:"message"`
Reason StepRunEventReason `json:"reason"`
Severity StepRunEventSeverity `json:"severity"`
StepRunId *string `json:"stepRunId,omitempty"`
StepRunId *uuid.UUID `json:"stepRunId,omitempty"`
TimeFirstSeen time.Time `json:"timeFirstSeen"`
TimeLastSeen time.Time `json:"timeLastSeen"`
WorkflowRunId *string `json:"workflowRunId,omitempty"`
WorkflowRunId *uuid.UUID `json:"workflowRunId,omitempty"`
}
// StepRunEventList defines model for StepRunEventList.
@@ -2257,7 +2257,7 @@ type WorkflowRunShape struct {
Status WorkflowRunStatus `json:"status"`
TenantId uuid.UUID `json:"tenantId"`
TriggeredBy WorkflowRunTriggeredBy `json:"triggeredBy"`
WorkflowId *string `json:"workflowId,omitempty"`
WorkflowId *uuid.UUID `json:"workflowId,omitempty"`
WorkflowVersion *WorkflowVersion `json:"workflowVersion,omitempty"`
WorkflowVersionId uuid.UUID `json:"workflowVersionId"`
}
@@ -2285,7 +2285,7 @@ type WorkflowRunTriggeredBy struct {
CronSchedule *string `json:"cronSchedule,omitempty"`
EventId *string `json:"eventId,omitempty"`
Metadata APIResourceMeta `json:"metadata"`
ParentWorkflowRunId *string `json:"parentWorkflowRunId,omitempty"`
ParentWorkflowRunId *uuid.UUID `json:"parentWorkflowRunId,omitempty"`
}
// WorkflowRunsCancelRequest defines model for WorkflowRunsCancelRequest.
@@ -2334,8 +2334,8 @@ type WorkflowTriggers struct {
Crons *[]WorkflowTriggerCronRef `json:"crons,omitempty"`
Events *[]WorkflowTriggerEventRef `json:"events,omitempty"`
Metadata *APIResourceMeta `json:"metadata,omitempty"`
TenantId *string `json:"tenant_id,omitempty"`
WorkflowVersionId *string `json:"workflow_version_id,omitempty"`
TenantId *uuid.UUID `json:"tenant_id,omitempty"`
WorkflowVersionId *uuid.UUID `json:"workflow_version_id,omitempty"`
}
// WorkflowUpdateRequest defines model for WorkflowUpdateRequest.
@@ -2383,9 +2383,9 @@ type WorkflowVersionMeta struct {
// WorkflowWorkersCount defines model for WorkflowWorkersCount.
type WorkflowWorkersCount struct {
FreeSlotCount *int `json:"freeSlotCount,omitempty"`
MaxSlotCount *int `json:"maxSlotCount,omitempty"`
WorkflowRunId *string `json:"workflowRunId,omitempty"`
FreeSlotCount *int `json:"freeSlotCount,omitempty"`
MaxSlotCount *int `json:"maxSlotCount,omitempty"`
WorkflowRunId *uuid.UUID `json:"workflowRunId,omitempty"`
}
// V1DagListTasksParams defines parameters for V1DagListTasks.
+3 -3
View File
@@ -78,9 +78,9 @@ type CreateMatchOpts struct {
TriggerDAGInsertedAt pgtype.Timestamptz
TriggerExternalId *string
TriggerExternalId *uuid.UUID
TriggerWorkflowRunId *string
TriggerWorkflowRunId *uuid.UUID
TriggerStepId *string
@@ -106,7 +106,7 @@ type CreateMatchOpts struct {
SignalTaskInsertedAt pgtype.Timestamptz
SignalExternalId *string
SignalExternalId *uuid.UUID
SignalKey *string
}
+2 -2
View File
@@ -243,7 +243,7 @@ type OLAPRepository interface {
// ListTasksByExternalIds returns a list of tasks based on their external ids or the external id of their parent DAG.
// In the case of a DAG, we flatten the result into the list of tasks which belong to that DAG.
ListTasksByExternalIds(ctx context.Context, tenantId uuid.UUID, externalIds []string) ([]*sqlcv1.FlattenTasksByExternalIdsRow, error)
ListTasksByExternalIds(ctx context.Context, tenantId uuid.UUID, externalIds []uuid.UUID) ([]*sqlcv1.FlattenTasksByExternalIdsRow, error)
GetTaskTimings(ctx context.Context, tenantId uuid.UUID, workflowRunId uuid.UUID, depth int32) ([]*sqlcv1.PopulateTaskRunDataRow, map[string]int32, error)
BulkCreateEventsAndTriggers(ctx context.Context, events sqlcv1.BulkCreateEventsParams, triggers []EventTriggersFromExternalId) error
@@ -1914,7 +1914,7 @@ func (r *OLAPRepositoryImpl) ReadDAG(ctx context.Context, dagExternalId uuid.UUI
return r.queries.ReadDAGByExternalID(ctx, r.readPool, uuid.MustParse(dagExternalId))
}
func (r *OLAPRepositoryImpl) ListTasksByExternalIds(ctx context.Context, tenantId uuid.UUID, externalIds []string) ([]*sqlcv1.FlattenTasksByExternalIdsRow, error) {
func (r *OLAPRepositoryImpl) ListTasksByExternalIds(ctx context.Context, tenantId uuid.UUID, externalIds []uuid.UUID) ([]*sqlcv1.FlattenTasksByExternalIdsRow, error) {
externalUUIDs := make([]uuid.UUID, 0)
for _, id := range externalIds {
+1 -1
View File
@@ -18,7 +18,7 @@ type TaskOutputEvent struct {
RetryCount int32 `json:"retry_count"`
WorkerId *string `json:"worker_id"`
WorkerId *uuid.UUID `json:"worker_id"`
Output []byte `json:"output"`
+8 -8
View File
@@ -47,7 +47,7 @@ type CreateTaskOpts struct {
AdditionalMetadata []byte
// (optional) the desired worker id
DesiredWorkerId *string
DesiredWorkerId *uuid.UUID
// (optional) the DAG id for the task
DagId *int64
@@ -59,7 +59,7 @@ type CreateTaskOpts struct {
InitialState sqlcv1.V1TaskInitialState
// (optional) the parent task external id
ParentTaskExternalId *string
ParentTaskExternalId *uuid.UUID
// (optional) the parent task id
ParentTaskId *int64
@@ -219,7 +219,7 @@ type TaskRepository interface {
// FlattenExternalIds is a non-cached method to look up all tasks in a workflow run by their external ids.
// This is non-cacheable because tasks can be added to a workflow run as it executes.
FlattenExternalIds(ctx context.Context, tenantId uuid.UUID, externalIds []string) ([]*sqlcv1.FlattenExternalIdsRow, error)
FlattenExternalIds(ctx context.Context, tenantId uuid.UUID, externalIds []uuid.UUID) ([]*sqlcv1.FlattenExternalIdsRow, error)
CompleteTasks(ctx context.Context, tenantId uuid.UUID, tasks []CompleteTaskOpts) (*FinalizedTaskResponse, error)
@@ -231,7 +231,7 @@ type TaskRepository interface {
ListTaskMetas(ctx context.Context, tenantId uuid.UUID, tasks []int64) ([]*sqlcv1.ListTaskMetasRow, error)
ListFinalizedWorkflowRuns(ctx context.Context, tenantId uuid.UUID, rootExternalIds []string) ([]*ListFinalizedWorkflowRunsResponse, error)
ListFinalizedWorkflowRuns(ctx context.Context, tenantId uuid.UUID, rootExternalIds []uuid.UUID) ([]*ListFinalizedWorkflowRunsResponse, error)
// ListTaskParentOutputs is a method to return the output of a task's parent and grandparent tasks. This is for v0 compatibility
// with the v1 engine, and shouldn't be called from new v1 endpoints.
@@ -449,11 +449,11 @@ func (r *sharedRepository) GetTaskByExternalId(ctx context.Context, tenantId, ta
return res, nil
}
func (r *TaskRepositoryImpl) FlattenExternalIds(ctx context.Context, tenantId uuid.UUID, externalIds []string) ([]*sqlcv1.FlattenExternalIdsRow, error) {
func (r *TaskRepositoryImpl) FlattenExternalIds(ctx context.Context, tenantId uuid.UUID, externalIds []uuid.UUID) ([]*sqlcv1.FlattenExternalIdsRow, error) {
return r.lookupExternalIds(ctx, r.pool, tenantId, externalIds)
}
func (r *sharedRepository) lookupExternalIds(ctx context.Context, tx sqlcv1.DBTX, tenantId uuid.UUID, externalIds []string) ([]*sqlcv1.FlattenExternalIdsRow, error) {
func (r *sharedRepository) lookupExternalIds(ctx context.Context, tx sqlcv1.DBTX, tenantId uuid.UUID, externalIds []uuid.UUID) ([]*sqlcv1.FlattenExternalIdsRow, error) {
externalIdsToLookup := make([]uuid.UUID, 0, len(externalIds))
res := make([]*sqlcv1.FlattenExternalIdsRow, 0, len(externalIds))
@@ -860,7 +860,7 @@ func (r *TaskRepositoryImpl) failTasksTx(ctx context.Context, tx sqlcv1.DBTX, te
}, nil
}
func (r *TaskRepositoryImpl) ListFinalizedWorkflowRuns(ctx context.Context, tenantId uuid.UUID, rootExternalIds []string) ([]*ListFinalizedWorkflowRunsResponse, error) {
func (r *TaskRepositoryImpl) ListFinalizedWorkflowRuns(ctx context.Context, tenantId uuid.UUID, rootExternalIds []uuid.UUID) ([]*ListFinalizedWorkflowRunsResponse, error) {
start := time.Now()
checkpoint := time.Now()
@@ -1085,7 +1085,7 @@ func (r *sharedRepository) listTasks(ctx context.Context, dbtx sqlcv1.DBTX, tena
})
}
func (r *TaskRepositoryImpl) listTaskOutputEvents(ctx context.Context, tx sqlcv1.DBTX, tenantId uuid.UUID, taskExternalIds []string) ([]*TaskOutputEvent, error) {
func (r *TaskRepositoryImpl) listTaskOutputEvents(ctx context.Context, tx sqlcv1.DBTX, tenantId uuid.UUID, taskExternalIds []uuid.UUID) ([]*TaskOutputEvent, error) {
externalIds := make([]uuid.UUID, 0)
eventTypes := make([][]string, 0)
+1 -1
View File
@@ -61,7 +61,7 @@ type UpdateTenantMemberOpts struct {
type GetQueueMetricsOpts struct {
// (optional) a list of workflow ids to filter by
WorkflowIds []string `validate:"omitempty,dive,uuid"`
WorkflowIds []uuid.UUID `validate:"omitempty,dive,uuid"`
// (optional) exact metadata to filter by
AdditionalMetadata map[string]interface{} `validate:"omitempty"`
+4 -4
View File
@@ -45,10 +45,10 @@ type TriggerTaskData struct {
AdditionalMetadata []byte `json:"additional_metadata"`
// (optional) the desired worker id
DesiredWorkerId *string `json:"desired_worker_id"`
DesiredWorkerId *uuid.UUID `json:"desired_worker_id"`
// (optional) the parent external id
ParentExternalId *string `json:"parent_external_id"`
ParentExternalId *uuid.UUID `json:"parent_external_id"`
// (optional) the parent task id
ParentTaskId *int64 `json:"parent_task_id"`
@@ -667,12 +667,12 @@ type triggerTuple struct {
additionalMetadata []byte
desiredWorkerId *string
desiredWorkerId *uuid.UUID
priority *int32
// relevant parameters for child workflows
parentExternalId *string
parentExternalId *uuid.UUID
parentTaskId *int64
parentTaskInsertedAt *time.Time
childIndex *int64
+3 -3
View File
@@ -112,7 +112,7 @@ type WorkerRepository interface {
DeleteOldWorkers(ctx context.Context, tenantId uuid.UUID, lastHeartbeatBefore time.Time) (bool, error)
GetDispatcherIdsForWorkers(ctx context.Context, tenantId uuid.UUID, workerIds []string) (map[string][]string, error)
GetDispatcherIdsForWorkers(ctx context.Context, tenantId uuid.UUID, workerIds []uuid.UUID) (map[string][]string, error)
}
type workerRepository struct {
@@ -254,7 +254,7 @@ func (w *workerRepository) CountActiveWorkersPerTenant() (map[uuid.UUID]int64, e
return tenantToWorkers, nil
}
func (w *workerRepository) GetWorkerActionsByWorkerId(tenantId uuid.UUID, workerIds []string) (map[string][]string, error) {
func (w *workerRepository) GetWorkerActionsByWorkerId(tenantId uuid.UUID, workerIds []uuid.UUID) (map[string][]string, error) {
uuidWorkerIds := make([]uuid.UUID, len(workerIds))
for i, workerId := range workerIds {
uuidWorkerIds[i] = uuid.MustParse(workerId)
@@ -641,7 +641,7 @@ func (w *workerRepository) DeleteOldWorkers(ctx context.Context, tenantId uuid.U
return hasMore, nil
}
func (w *workerRepository) GetDispatcherIdsForWorkers(ctx context.Context, tenantId uuid.UUID, workerIds []string) (map[string][]string, error) {
func (w *workerRepository) GetDispatcherIdsForWorkers(ctx context.Context, tenantId uuid.UUID, workerIds []uuid.UUID) (map[string][]string, error) {
pgWorkerIds := make([]uuid.UUID, len(workerIds))
for i, workerId := range workerIds {