mirror of
https://github.com/hatchet-dev/hatchet.git
synced 2026-01-31 05:08:51 -06:00
fix: nil checks
This commit is contained in:
@@ -44,7 +44,7 @@ func ToTaskSummary(task *v1.TaskWithPayloads) gen.V1TaskSummary {
|
||||
|
||||
var parentTaskExternalId *uuid.UUID
|
||||
|
||||
if task.ParentTaskExternalID != nil && *task.ParentTaskExternalID != uuid.Nil {
|
||||
if task.ParentTaskExternalID != nil {
|
||||
parentTaskExternalIdValue := uuid.MustParse(task.ParentTaskExternalID.String())
|
||||
parentTaskExternalId = &parentTaskExternalIdValue
|
||||
}
|
||||
@@ -153,7 +153,7 @@ func ToTaskRunEventMany(
|
||||
for i, event := range events {
|
||||
var workerId *types.UUID
|
||||
|
||||
if event.WorkerID != nil && *event.WorkerID != uuid.Nil {
|
||||
if event.WorkerID != nil {
|
||||
workerUUid := uuid.MustParse(event.WorkerID.String())
|
||||
workerId = &workerUUid
|
||||
}
|
||||
@@ -188,7 +188,7 @@ func ToWorkflowRunTaskRunEventsMany(
|
||||
for i, event := range events {
|
||||
var workerId *uuid.UUID
|
||||
|
||||
if event.WorkerID != nil && *event.WorkerID != uuid.Nil {
|
||||
if event.WorkerID != nil {
|
||||
workerUUID := uuid.MustParse(event.WorkerID.String())
|
||||
workerId = &workerUUID
|
||||
}
|
||||
@@ -294,7 +294,7 @@ func ToTask(taskWithData *v1.TaskWithPayloads, workflowRunExternalId uuid.UUID,
|
||||
|
||||
var parentTaskExternalId *uuid.UUID
|
||||
|
||||
if taskWithData.ParentTaskExternalID != nil && *taskWithData.ParentTaskExternalID != uuid.Nil {
|
||||
if taskWithData.ParentTaskExternalID != nil {
|
||||
parentTaskUUID, err := uuid.Parse(taskWithData.ParentTaskExternalID.String())
|
||||
|
||||
if err == nil {
|
||||
@@ -397,7 +397,7 @@ func ToWorkflowRunDetails(
|
||||
for i, event := range taskRunEvents {
|
||||
var workerId *uuid.UUID
|
||||
|
||||
if event.WorkerID != nil && *event.WorkerID != uuid.Nil {
|
||||
if event.WorkerID != nil {
|
||||
workerUUID := uuid.MustParse(event.WorkerID.String())
|
||||
workerId = &workerUUID
|
||||
}
|
||||
@@ -482,7 +482,7 @@ func ToTaskTimings(
|
||||
toReturn[i].FinishedAt = &timing.FinishedAt.Time
|
||||
}
|
||||
|
||||
if timing.ParentTaskExternalID != nil && *timing.ParentTaskExternalID != uuid.Nil {
|
||||
if timing.ParentTaskExternalID != nil {
|
||||
parentId := uuid.MustParse(timing.ParentTaskExternalID.String())
|
||||
toReturn[i].ParentTaskExternalId = &parentId
|
||||
}
|
||||
|
||||
@@ -68,7 +68,7 @@ func WorkflowRunDataToV1TaskSummary(task *v1.WorkflowRunData, workflowIdsToNames
|
||||
attempt := retryCount + 1
|
||||
|
||||
var parentTaskExternalId *uuid.UUID
|
||||
if task.ParentTaskExternalId != nil && *task.ParentTaskExternalId != uuid.Nil {
|
||||
if task.ParentTaskExternalId != nil {
|
||||
parentTaskExternalIdValue := *task.ParentTaskExternalId
|
||||
parentTaskExternalId = &parentTaskExternalIdValue
|
||||
}
|
||||
|
||||
@@ -35,7 +35,7 @@ func ToScheduledWorkflowsFromSQLC(scheduled *sqlcv1.ListScheduledWorkflowsRow) *
|
||||
|
||||
var workflowRunIdPtr *uuid.UUID
|
||||
|
||||
if scheduled.WorkflowRunId != nil && *scheduled.WorkflowRunId != uuid.Nil {
|
||||
if scheduled.WorkflowRunId != nil {
|
||||
workflowRunId := uuid.MustParse(scheduled.WorkflowRunId.String())
|
||||
workflowRunIdPtr = &workflowRunId
|
||||
}
|
||||
|
||||
@@ -529,7 +529,7 @@ func (m *sharedRepository) processEventMatches(ctx context.Context, tx sqlcv1.DB
|
||||
dependentMatches := make([]*sqlcv1.SaveSatisfiedMatchConditionsRow, 0)
|
||||
|
||||
for _, match := range satisfiedMatches {
|
||||
if match.TriggerStepID != nil && *match.TriggerStepID != uuid.Nil && match.TriggerExternalID != nil && *match.TriggerExternalID != uuid.Nil {
|
||||
if match.TriggerStepID != nil && match.TriggerExternalID != nil {
|
||||
if match.Action == sqlcv1.V1MatchConditionActionCREATEMATCH {
|
||||
dependentMatches = append(dependentMatches, match)
|
||||
continue
|
||||
@@ -595,7 +595,7 @@ func (m *sharedRepository) processEventMatches(ctx context.Context, tx sqlcv1.DB
|
||||
opt.DagInsertedAt = match.TriggerDagInsertedAt
|
||||
}
|
||||
|
||||
if match.TriggerParentTaskExternalID != nil && *match.TriggerParentTaskExternalID != uuid.Nil {
|
||||
if match.TriggerParentTaskExternalID != nil {
|
||||
externalId := match.TriggerParentTaskExternalID.String()
|
||||
opt.ParentTaskExternalId = &externalId
|
||||
}
|
||||
@@ -662,14 +662,18 @@ func (m *sharedRepository) processEventMatches(ctx context.Context, tx sqlcv1.DB
|
||||
externalIds := make([]uuid.UUID, 0, len(satisfiedMatches))
|
||||
|
||||
for _, match := range satisfiedMatches {
|
||||
if match.SignalTaskID.Valid && match.SignalTaskInsertedAt.Valid && match.SignalExternalID != nil {
|
||||
if match.SignalTaskID.Valid && match.SignalTaskInsertedAt.Valid {
|
||||
taskIds = append(taskIds, TaskIdInsertedAtRetryCount{
|
||||
Id: match.SignalTaskID.Int64,
|
||||
InsertedAt: match.SignalTaskInsertedAt,
|
||||
// signals are durable, meaning they persist between retries, so a retryCount of -1 is used
|
||||
RetryCount: -1,
|
||||
})
|
||||
externalIds = append(externalIds, *match.SignalExternalID)
|
||||
if match.SignalExternalID != nil {
|
||||
externalIds = append(externalIds, *match.SignalExternalID)
|
||||
} else {
|
||||
externalIds = append(externalIds, uuid.Nil)
|
||||
}
|
||||
datas = append(datas, match.McAggregatedData)
|
||||
eventKeys = append(eventKeys, match.SignalKey.String)
|
||||
}
|
||||
@@ -1137,7 +1141,7 @@ func (m *sharedRepository) createAdditionalMatches(ctx context.Context, tx sqlcv
|
||||
additionalMatches := make([]CreateMatchOpts, 0, len(satisfiedMatches))
|
||||
|
||||
for _, match := range satisfiedMatches {
|
||||
if match.TriggerStepID != nil && *match.TriggerStepID != uuid.Nil && match.Action == sqlcv1.V1MatchConditionActionCREATEMATCH {
|
||||
if match.TriggerStepID != nil && match.Action == sqlcv1.V1MatchConditionActionCREATEMATCH {
|
||||
conditions, ok := stepIdsToConditions[match.TriggerStepID.String()]
|
||||
|
||||
if !ok {
|
||||
|
||||
@@ -1079,7 +1079,7 @@ func (r *OLAPRepositoryImpl) ListWorkflowRuns(ctx context.Context, tenantId uuid
|
||||
dagsToPopulated[externalId] = dag
|
||||
externalIdsForPayloads = append(externalIdsForPayloads, dag.ExternalID)
|
||||
|
||||
if dag.OutputEventExternalID != nil && *dag.OutputEventExternalID != uuid.Nil {
|
||||
if dag.OutputEventExternalID != nil {
|
||||
externalIdsForPayloads = append(externalIdsForPayloads, *dag.OutputEventExternalID)
|
||||
}
|
||||
}
|
||||
@@ -1485,7 +1485,7 @@ func (r *OLAPRepositoryImpl) writeTaskEventBatch(ctx context.Context, tenantId u
|
||||
})
|
||||
}
|
||||
|
||||
if event.ExternalID != nil && *event.ExternalID != uuid.Nil {
|
||||
if event.ExternalID != nil {
|
||||
// randomly jitter the inserted at time by +/- 300ms to make collisions virtually impossible
|
||||
dummyInsertedAt := time.Now().Add(time.Duration(rand.Intn(2*300+1)-300) * time.Millisecond)
|
||||
|
||||
|
||||
@@ -50,7 +50,7 @@ func NewSkippedTaskOutputEventFromTask(task *V1TaskWithPayload) *TaskOutputEvent
|
||||
e.Output = outputMapBytes
|
||||
e.EventType = sqlcv1.V1TaskEventTypeCOMPLETED
|
||||
|
||||
if task.DesiredWorkerID != nil && *task.DesiredWorkerID != uuid.Nil {
|
||||
if task.DesiredWorkerID != nil {
|
||||
workerId := task.DesiredWorkerID.String()
|
||||
e.WorkerId = &workerId
|
||||
}
|
||||
@@ -64,7 +64,7 @@ func NewFailedTaskOutputEventFromTask(task *V1TaskWithPayload) *TaskOutputEvent
|
||||
e.ErrorMessage = task.InitialStateReason.String
|
||||
e.EventType = sqlcv1.V1TaskEventTypeFAILED
|
||||
|
||||
if task.DesiredWorkerID != nil && *task.DesiredWorkerID != uuid.Nil {
|
||||
if task.DesiredWorkerID != nil {
|
||||
workerId := task.DesiredWorkerID.String()
|
||||
e.WorkerId = &workerId
|
||||
}
|
||||
|
||||
@@ -5,7 +5,6 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"github.com/hatchet-dev/hatchet/pkg/repository/sqlcv1"
|
||||
)
|
||||
|
||||
@@ -201,9 +200,9 @@ func getRankedSlots(
|
||||
// if this is a HARD sticky strategy, and there's a desired worker id, it can only be assigned to that
|
||||
// worker. if there's no desired worker id, we assign to any worker.
|
||||
if qi.Sticky == sqlcv1.V1StickyStrategyHARD {
|
||||
if qi.DesiredWorkerID != nil && *qi.DesiredWorkerID != uuid.Nil && workerId == qi.DesiredWorkerID.String() {
|
||||
if qi.DesiredWorkerID != nil && workerId == qi.DesiredWorkerID.String() {
|
||||
validSlots.addSlot(slot, 0)
|
||||
} else if qi.DesiredWorkerID != nil && *qi.DesiredWorkerID == uuid.Nil {
|
||||
} else if qi.DesiredWorkerID == nil {
|
||||
validSlots.addSlot(slot, 0)
|
||||
}
|
||||
|
||||
@@ -213,7 +212,7 @@ func getRankedSlots(
|
||||
// if this is a SOFT sticky strategy, we should prefer the desired worker, but if it is not
|
||||
// available, we can assign to any worker.
|
||||
if qi.Sticky == sqlcv1.V1StickyStrategySOFT {
|
||||
if qi.DesiredWorkerID != nil && *qi.DesiredWorkerID != uuid.Nil && workerId == qi.DesiredWorkerID.String() {
|
||||
if qi.DesiredWorkerID != nil && workerId == qi.DesiredWorkerID.String() {
|
||||
validSlots.addSlot(slot, 1)
|
||||
} else {
|
||||
validSlots.addSlot(slot, 0)
|
||||
|
||||
Reference in New Issue
Block a user