diff --git a/frontend/app/src/pages/main/workflow-runs/$run/components/step-run-playground.tsx b/frontend/app/src/pages/main/workflow-runs/$run/components/step-run-playground.tsx index e46a5e21b..3389f5f79 100644 --- a/frontend/app/src/pages/main/workflow-runs/$run/components/step-run-playground.tsx +++ b/frontend/app/src/pages/main/workflow-runs/$run/components/step-run-playground.tsx @@ -116,6 +116,8 @@ export function StepRunPlayground({ ) { return 1000; } + + return 5000; }, }); diff --git a/internal/datautils/job_data.go b/internal/datautils/job_data.go index 368706c4f..4a98cc062 100644 --- a/internal/datautils/job_data.go +++ b/internal/datautils/job_data.go @@ -1,11 +1,5 @@ package datautils -import ( - "fmt" - - "github.com/steebchen/prisma-client-go/runtime/types" -) - type TriggeredBy string const ( @@ -34,75 +28,3 @@ type StepRunData struct { } type StepData map[string]interface{} - -func NewJobRunLookupDataFromInputBytes(input []byte, triggeredBy TriggeredBy) (JobRunLookupData, error) { - inputMap, err := jsonBytesToMap(input) - - if err != nil { - return JobRunLookupData{}, fmt.Errorf("failed to convert input to map: %w", err) - } - - return NewJobRunLookupData(inputMap, triggeredBy), nil -} - -func NewJobRunLookupData(input map[string]interface{}, triggeredBy TriggeredBy) JobRunLookupData { - return JobRunLookupData{ - Input: input, - TriggeredBy: triggeredBy, - Steps: map[string]StepData{}, - } -} - -func GetJobRunLookupData(data *types.JSON) (JobRunLookupData, error) { - if data == nil { - return JobRunLookupData{}, nil - } - - currData := JobRunLookupData{} - err := FromJSONType(data, &currData) - - if err != nil { - return JobRunLookupData{}, fmt.Errorf("failed to convert data to map: %w", err) - } - - return currData, nil -} - -// func AddStepOutput(data *types.JSON, stepReadableId string, stepOutput []byte) ([]byte, error) { -// if data == nil { -// data = &types.JSON{} -// } - -// unquoted, err := strconv.Unquote(string(stepOutput)) - -// if err == nil { -// stepOutput = []byte(unquoted) -// } - -// outputMap, err := jsonBytesToMap(stepOutput) - -// if err != nil { -// return nil, fmt.Errorf("failed to convert step output to map: %w", err) -// } - -// currData := JobRunLookupData{} -// err = FromJSONType(data, &currData) - -// if err != nil { -// return nil, fmt.Errorf("failed to convert data to map: %w", err) -// } - -// if currData.Steps == nil { -// currData.Steps = map[string]StepData{} -// } - -// currData.Steps[stepReadableId] = outputMap - -// jsonBytes, err := json.Marshal(currData) - -// if err != nil { -// return nil, fmt.Errorf("failed to marshal data: %w", err) -// } - -// return jsonBytes, nil -// } diff --git a/internal/datautils/map.go b/internal/datautils/map.go index d2bce8c96..aa6354515 100644 --- a/internal/datautils/map.go +++ b/internal/datautils/map.go @@ -23,10 +23,10 @@ func ToJSONMap(data interface{}) (map[string]interface{}, error) { return nil, err } - return jsonBytesToMap(jsonBytes) + return JSONBytesToMap(jsonBytes) } -func jsonBytesToMap(jsonBytes []byte) (map[string]interface{}, error) { +func JSONBytesToMap(jsonBytes []byte) (map[string]interface{}, error) { dataMap := map[string]interface{}{} err := json.Unmarshal(jsonBytes, &dataMap) diff --git a/internal/services/controllers/jobs/controller.go b/internal/services/controllers/jobs/controller.go index 8f3c83cf3..8b42a6e52 100644 --- a/internal/services/controllers/jobs/controller.go +++ b/internal/services/controllers/jobs/controller.go @@ -400,9 +400,39 @@ func (ec *JobsControllerImpl) handleStepRunRetry(ctx context.Context, task *task var inputBytes []byte var retryCount = stepRun.RetryCount + 1 + // update the input schema for the step run based on the new input if payload.InputData != "" { - // update the input schema for the step run based on the new input - jsonSchemaBytes, err := schema.SchemaBytesFromBytes([]byte(payload.InputData)) + inputBytes = []byte(payload.InputData) + + // Merge the existing input data with the new input data. We don't blindly trust the + // input data because the user could have deleted fields that are required by the step. + // A better solution would be to validate the user input ahead of time. + // NOTE: this is an expensive operation. + if currentInput, ok := stepRun.Input(); ok { + inputMap, err := datautils.JSONBytesToMap([]byte(payload.InputData)) + + if err != nil { + return fmt.Errorf("could not convert input data to map: %w", err) + } + + currentInputMap, err := datautils.JSONBytesToMap(currentInput) + + if err != nil { + return fmt.Errorf("could not convert current input to map: %w", err) + } + + mergedInput := datautils.MergeMaps(currentInputMap, inputMap) + + mergedInputBytes, err := json.Marshal(mergedInput) + + if err != nil { + return fmt.Errorf("could not marshal merged input: %w", err) + } + + inputBytes = mergedInputBytes + } + + jsonSchemaBytes, err := schema.SchemaBytesFromBytes(inputBytes) if err != nil { return err @@ -414,8 +444,6 @@ func (ec *JobsControllerImpl) handleStepRunRetry(ctx context.Context, task *task return err } - inputBytes = []byte(payload.InputData) - // if the input data has been manually set, we reset the retry count as this is a user-triggered retry retryCount = 0 } else { diff --git a/typescript-sdk/examples/dag-worker.ts b/typescript-sdk/examples/dag-worker.ts index 3180d9c04..24ccf57a2 100644 --- a/typescript-sdk/examples/dag-worker.ts +++ b/typescript-sdk/examples/dag-worker.ts @@ -45,8 +45,15 @@ const workflow: Workflow = { { name: 'dag-step4', parents: ['dag-step1', 'dag-step3'], - run: (ctx) => { - console.log('executed step4!'); + run: async (ctx) => { + await sleep(5000); + + // simulate a really slow network call + setTimeout(async () => { + await sleep(1000); + ctx.playground('slow', 'call'); + }, 5000); + return { step4: 'step4' }; }, },