diff --git a/internal/dagutils/checksum.go b/internal/dagutils/checksum.go new file mode 100644 index 000000000..3fcbd6ff0 --- /dev/null +++ b/internal/dagutils/checksum.go @@ -0,0 +1,40 @@ +package dagutils + +import ( + "github.com/hatchet-dev/hatchet/internal/datautils" + "github.com/hatchet-dev/hatchet/internal/digest" + "github.com/hatchet-dev/hatchet/pkg/repository" +) + +func Checksum(opts *repository.CreateWorkflowVersionOpts) (string, error) { + // compute a checksum for the workflow + declaredValues, err := datautils.ToJSONMap(opts) + + if err != nil { + return "", err + } + + // ensure no cycles + for i, job := range opts.Jobs { + if HasCycle(job.Steps) { + return "", &repository.JobRunHasCycleError{ + JobName: job.Name, + } + } + + var err error + opts.Jobs[i].Steps, err = OrderWorkflowSteps(job.Steps) + + if err != nil { + return "", err + } + } + + workflowChecksum, err := digest.DigestValues(declaredValues) + + if err != nil { + return "", err + } + + return workflowChecksum.String(), nil +} diff --git a/internal/services/admin/server.go b/internal/services/admin/server.go index 26e91e29b..4dd3e8ca2 100644 --- a/internal/services/admin/server.go +++ b/internal/services/admin/server.go @@ -13,6 +13,7 @@ import ( "google.golang.org/grpc/status" "google.golang.org/protobuf/types/known/timestamppb" + "github.com/hatchet-dev/hatchet/internal/dagutils" "github.com/hatchet-dev/hatchet/internal/msgqueue" "github.com/hatchet-dev/hatchet/internal/services/admin/contracts" "github.com/hatchet-dev/hatchet/internal/services/shared/tasktypes" @@ -245,7 +246,7 @@ func (a *AdminServiceImpl) PutWorkflow(ctx context.Context, req *contracts.PutWo } // workflow exists, look at checksum - newCS, err := createOpts.Checksum() + newCS, err := dagutils.Checksum(createOpts) if err != nil { return nil, err diff --git a/pkg/repository/postgres/workflow.go b/pkg/repository/postgres/workflow.go index 67f592245..a3fb8f11a 100644 --- a/pkg/repository/postgres/workflow.go +++ b/pkg/repository/postgres/workflow.go @@ -902,7 +902,7 @@ func (r *workflowEngineRepository) createWorkflowVersionTxs(ctx context.Context, version = sqlchelpers.TextFromStr(*opts.Version) } - cs, err := opts.Checksum() + cs, err := dagutils.Checksum(opts) if err != nil { return "", err diff --git a/pkg/repository/v1/workflow.go b/pkg/repository/v1/workflow.go index 83c125d32..e65814b81 100644 --- a/pkg/repository/v1/workflow.go +++ b/pkg/repository/v1/workflow.go @@ -70,23 +70,6 @@ type CreateConcurrencyOpts struct { Expression string `validate:"celworkflowrunstr"` } -func (o *CreateWorkflowVersionOpts) Checksum() (string, error) { - // compute a checksum for the workflow - declaredValues, err := datautils.ToJSONMap(o) - - if err != nil { - return "", err - } - - workflowChecksum, err := digest.DigestValues(declaredValues) - - if err != nil { - return "", err - } - - return workflowChecksum.String(), nil -} - type CreateStepOpts struct { // (required) the task name ReadableId string `validate:"hatchetName"` @@ -354,12 +337,17 @@ func (r *workflowRepository) PutWorkflowVersion(ctx context.Context, tenantId st func (r *workflowRepository) createWorkflowVersionTxs(ctx context.Context, tx sqlcv1.DBTX, tenantId, workflowId pgtype.UUID, opts *CreateWorkflowVersionOpts, oldWorkflowVersion *sqlcv1.GetWorkflowVersionForEngineRow) (string, error) { workflowVersionId := uuid.New().String() - cs, err := opts.Checksum() + cs, err := checksumV1(opts) if err != nil { return "", err } + // if the checksum matches the old checksum, we don't need to create a new workflow version + if oldWorkflowVersion != nil && oldWorkflowVersion.WorkflowVersion.Checksum == cs { + return sqlchelpers.UUIDToStr(oldWorkflowVersion.WorkflowVersion.ID), nil + } + createParams := sqlcv1.CreateWorkflowVersionParams{ ID: sqlchelpers.UUIDFromStr(workflowVersionId), Checksum: cs, @@ -854,6 +842,29 @@ func (r *workflowRepository) createJobTx(ctx context.Context, tx sqlcv1.DBTX, te return jobId, nil } +func checksumV1(opts *CreateWorkflowVersionOpts) (string, error) { + // compute a checksum for the workflow + declaredValues, err := datautils.ToJSONMap(opts) + + if err != nil { + return "", err + } + + opts.Tasks, err = orderWorkflowStepsV1(opts.Tasks) + + if err != nil { + return "", err + } + + workflowChecksum, err := digest.DigestValues(declaredValues) + + if err != nil { + return "", err + } + + return workflowChecksum.String(), nil +} + func hasCycleV1(steps []CreateStepOpts) bool { graph := make(map[string][]string) for _, step := range steps { diff --git a/pkg/repository/workflow.go b/pkg/repository/workflow.go index a9c06d4b9..e6a558845 100644 --- a/pkg/repository/workflow.go +++ b/pkg/repository/workflow.go @@ -6,8 +6,6 @@ import ( "fmt" "time" - "github.com/hatchet-dev/hatchet/internal/datautils" - "github.com/hatchet-dev/hatchet/internal/digest" "github.com/hatchet-dev/hatchet/pkg/repository/postgres/dbsqlc" ) @@ -85,23 +83,6 @@ type CreateWorkflowConcurrencyOpts struct { Expression *string `validate:"omitempty,celworkflowrunstr"` } -func (o *CreateWorkflowVersionOpts) Checksum() (string, error) { - // compute a checksum for the workflow - declaredValues, err := datautils.ToJSONMap(o) - - if err != nil { - return "", err - } - - workflowChecksum, err := digest.DigestValues(declaredValues) - - if err != nil { - return "", err - } - - return workflowChecksum.String(), nil -} - type CreateWorkflowSchedulesOpts struct { ScheduledTriggers []time.Time