fix: checksums on workflow versions (#1410)

This commit is contained in:
abelanger5
2025-03-26 08:00:39 -07:00
committed by GitHub
parent 33b15c915c
commit cff2b37a6a
5 changed files with 72 additions and 39 deletions

View File

@@ -0,0 +1,40 @@
package dagutils
import (
"github.com/hatchet-dev/hatchet/internal/datautils"
"github.com/hatchet-dev/hatchet/internal/digest"
"github.com/hatchet-dev/hatchet/pkg/repository"
)
func Checksum(opts *repository.CreateWorkflowVersionOpts) (string, error) {
// compute a checksum for the workflow
declaredValues, err := datautils.ToJSONMap(opts)
if err != nil {
return "", err
}
// ensure no cycles
for i, job := range opts.Jobs {
if HasCycle(job.Steps) {
return "", &repository.JobRunHasCycleError{
JobName: job.Name,
}
}
var err error
opts.Jobs[i].Steps, err = OrderWorkflowSteps(job.Steps)
if err != nil {
return "", err
}
}
workflowChecksum, err := digest.DigestValues(declaredValues)
if err != nil {
return "", err
}
return workflowChecksum.String(), nil
}

View File

@@ -13,6 +13,7 @@ import (
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/timestamppb"
"github.com/hatchet-dev/hatchet/internal/dagutils"
"github.com/hatchet-dev/hatchet/internal/msgqueue"
"github.com/hatchet-dev/hatchet/internal/services/admin/contracts"
"github.com/hatchet-dev/hatchet/internal/services/shared/tasktypes"
@@ -245,7 +246,7 @@ func (a *AdminServiceImpl) PutWorkflow(ctx context.Context, req *contracts.PutWo
}
// workflow exists, look at checksum
newCS, err := createOpts.Checksum()
newCS, err := dagutils.Checksum(createOpts)
if err != nil {
return nil, err

View File

@@ -902,7 +902,7 @@ func (r *workflowEngineRepository) createWorkflowVersionTxs(ctx context.Context,
version = sqlchelpers.TextFromStr(*opts.Version)
}
cs, err := opts.Checksum()
cs, err := dagutils.Checksum(opts)
if err != nil {
return "", err

View File

@@ -70,23 +70,6 @@ type CreateConcurrencyOpts struct {
Expression string `validate:"celworkflowrunstr"`
}
func (o *CreateWorkflowVersionOpts) Checksum() (string, error) {
// compute a checksum for the workflow
declaredValues, err := datautils.ToJSONMap(o)
if err != nil {
return "", err
}
workflowChecksum, err := digest.DigestValues(declaredValues)
if err != nil {
return "", err
}
return workflowChecksum.String(), nil
}
type CreateStepOpts struct {
// (required) the task name
ReadableId string `validate:"hatchetName"`
@@ -354,12 +337,17 @@ func (r *workflowRepository) PutWorkflowVersion(ctx context.Context, tenantId st
func (r *workflowRepository) createWorkflowVersionTxs(ctx context.Context, tx sqlcv1.DBTX, tenantId, workflowId pgtype.UUID, opts *CreateWorkflowVersionOpts, oldWorkflowVersion *sqlcv1.GetWorkflowVersionForEngineRow) (string, error) {
workflowVersionId := uuid.New().String()
cs, err := opts.Checksum()
cs, err := checksumV1(opts)
if err != nil {
return "", err
}
// if the checksum matches the old checksum, we don't need to create a new workflow version
if oldWorkflowVersion != nil && oldWorkflowVersion.WorkflowVersion.Checksum == cs {
return sqlchelpers.UUIDToStr(oldWorkflowVersion.WorkflowVersion.ID), nil
}
createParams := sqlcv1.CreateWorkflowVersionParams{
ID: sqlchelpers.UUIDFromStr(workflowVersionId),
Checksum: cs,
@@ -854,6 +842,29 @@ func (r *workflowRepository) createJobTx(ctx context.Context, tx sqlcv1.DBTX, te
return jobId, nil
}
func checksumV1(opts *CreateWorkflowVersionOpts) (string, error) {
// compute a checksum for the workflow
declaredValues, err := datautils.ToJSONMap(opts)
if err != nil {
return "", err
}
opts.Tasks, err = orderWorkflowStepsV1(opts.Tasks)
if err != nil {
return "", err
}
workflowChecksum, err := digest.DigestValues(declaredValues)
if err != nil {
return "", err
}
return workflowChecksum.String(), nil
}
func hasCycleV1(steps []CreateStepOpts) bool {
graph := make(map[string][]string)
for _, step := range steps {

View File

@@ -6,8 +6,6 @@ import (
"fmt"
"time"
"github.com/hatchet-dev/hatchet/internal/datautils"
"github.com/hatchet-dev/hatchet/internal/digest"
"github.com/hatchet-dev/hatchet/pkg/repository/postgres/dbsqlc"
)
@@ -85,23 +83,6 @@ type CreateWorkflowConcurrencyOpts struct {
Expression *string `validate:"omitempty,celworkflowrunstr"`
}
func (o *CreateWorkflowVersionOpts) Checksum() (string, error) {
// compute a checksum for the workflow
declaredValues, err := datautils.ToJSONMap(o)
if err != nil {
return "", err
}
workflowChecksum, err := digest.DigestValues(declaredValues)
if err != nil {
return "", err
}
return workflowChecksum.String(), nil
}
type CreateWorkflowSchedulesOpts struct {
ScheduledTriggers []time.Time