mirror of
https://github.com/hatchet-dev/hatchet.git
synced 2026-05-07 02:19:46 -05:00
fix: race condition on playground inputs (#225)
* fix: race condition on playground inputs * add example slow network call
This commit is contained in:
@@ -116,6 +116,8 @@ export function StepRunPlayground({
|
||||
) {
|
||||
return 1000;
|
||||
}
|
||||
|
||||
return 5000;
|
||||
},
|
||||
});
|
||||
|
||||
|
||||
@@ -1,11 +1,5 @@
|
||||
package datautils
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/steebchen/prisma-client-go/runtime/types"
|
||||
)
|
||||
|
||||
type TriggeredBy string
|
||||
|
||||
const (
|
||||
@@ -34,75 +28,3 @@ type StepRunData struct {
|
||||
}
|
||||
|
||||
type StepData map[string]interface{}
|
||||
|
||||
func NewJobRunLookupDataFromInputBytes(input []byte, triggeredBy TriggeredBy) (JobRunLookupData, error) {
|
||||
inputMap, err := jsonBytesToMap(input)
|
||||
|
||||
if err != nil {
|
||||
return JobRunLookupData{}, fmt.Errorf("failed to convert input to map: %w", err)
|
||||
}
|
||||
|
||||
return NewJobRunLookupData(inputMap, triggeredBy), nil
|
||||
}
|
||||
|
||||
func NewJobRunLookupData(input map[string]interface{}, triggeredBy TriggeredBy) JobRunLookupData {
|
||||
return JobRunLookupData{
|
||||
Input: input,
|
||||
TriggeredBy: triggeredBy,
|
||||
Steps: map[string]StepData{},
|
||||
}
|
||||
}
|
||||
|
||||
func GetJobRunLookupData(data *types.JSON) (JobRunLookupData, error) {
|
||||
if data == nil {
|
||||
return JobRunLookupData{}, nil
|
||||
}
|
||||
|
||||
currData := JobRunLookupData{}
|
||||
err := FromJSONType(data, &currData)
|
||||
|
||||
if err != nil {
|
||||
return JobRunLookupData{}, fmt.Errorf("failed to convert data to map: %w", err)
|
||||
}
|
||||
|
||||
return currData, nil
|
||||
}
|
||||
|
||||
// func AddStepOutput(data *types.JSON, stepReadableId string, stepOutput []byte) ([]byte, error) {
|
||||
// if data == nil {
|
||||
// data = &types.JSON{}
|
||||
// }
|
||||
|
||||
// unquoted, err := strconv.Unquote(string(stepOutput))
|
||||
|
||||
// if err == nil {
|
||||
// stepOutput = []byte(unquoted)
|
||||
// }
|
||||
|
||||
// outputMap, err := jsonBytesToMap(stepOutput)
|
||||
|
||||
// if err != nil {
|
||||
// return nil, fmt.Errorf("failed to convert step output to map: %w", err)
|
||||
// }
|
||||
|
||||
// currData := JobRunLookupData{}
|
||||
// err = FromJSONType(data, &currData)
|
||||
|
||||
// if err != nil {
|
||||
// return nil, fmt.Errorf("failed to convert data to map: %w", err)
|
||||
// }
|
||||
|
||||
// if currData.Steps == nil {
|
||||
// currData.Steps = map[string]StepData{}
|
||||
// }
|
||||
|
||||
// currData.Steps[stepReadableId] = outputMap
|
||||
|
||||
// jsonBytes, err := json.Marshal(currData)
|
||||
|
||||
// if err != nil {
|
||||
// return nil, fmt.Errorf("failed to marshal data: %w", err)
|
||||
// }
|
||||
|
||||
// return jsonBytes, nil
|
||||
// }
|
||||
|
||||
@@ -23,10 +23,10 @@ func ToJSONMap(data interface{}) (map[string]interface{}, error) {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return jsonBytesToMap(jsonBytes)
|
||||
return JSONBytesToMap(jsonBytes)
|
||||
}
|
||||
|
||||
func jsonBytesToMap(jsonBytes []byte) (map[string]interface{}, error) {
|
||||
func JSONBytesToMap(jsonBytes []byte) (map[string]interface{}, error) {
|
||||
dataMap := map[string]interface{}{}
|
||||
|
||||
err := json.Unmarshal(jsonBytes, &dataMap)
|
||||
|
||||
@@ -400,9 +400,39 @@ func (ec *JobsControllerImpl) handleStepRunRetry(ctx context.Context, task *task
|
||||
var inputBytes []byte
|
||||
var retryCount = stepRun.RetryCount + 1
|
||||
|
||||
// update the input schema for the step run based on the new input
|
||||
if payload.InputData != "" {
|
||||
// update the input schema for the step run based on the new input
|
||||
jsonSchemaBytes, err := schema.SchemaBytesFromBytes([]byte(payload.InputData))
|
||||
inputBytes = []byte(payload.InputData)
|
||||
|
||||
// Merge the existing input data with the new input data. We don't blindly trust the
|
||||
// input data because the user could have deleted fields that are required by the step.
|
||||
// A better solution would be to validate the user input ahead of time.
|
||||
// NOTE: this is an expensive operation.
|
||||
if currentInput, ok := stepRun.Input(); ok {
|
||||
inputMap, err := datautils.JSONBytesToMap([]byte(payload.InputData))
|
||||
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not convert input data to map: %w", err)
|
||||
}
|
||||
|
||||
currentInputMap, err := datautils.JSONBytesToMap(currentInput)
|
||||
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not convert current input to map: %w", err)
|
||||
}
|
||||
|
||||
mergedInput := datautils.MergeMaps(currentInputMap, inputMap)
|
||||
|
||||
mergedInputBytes, err := json.Marshal(mergedInput)
|
||||
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not marshal merged input: %w", err)
|
||||
}
|
||||
|
||||
inputBytes = mergedInputBytes
|
||||
}
|
||||
|
||||
jsonSchemaBytes, err := schema.SchemaBytesFromBytes(inputBytes)
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -414,8 +444,6 @@ func (ec *JobsControllerImpl) handleStepRunRetry(ctx context.Context, task *task
|
||||
return err
|
||||
}
|
||||
|
||||
inputBytes = []byte(payload.InputData)
|
||||
|
||||
// if the input data has been manually set, we reset the retry count as this is a user-triggered retry
|
||||
retryCount = 0
|
||||
} else {
|
||||
|
||||
@@ -45,8 +45,15 @@ const workflow: Workflow = {
|
||||
{
|
||||
name: 'dag-step4',
|
||||
parents: ['dag-step1', 'dag-step3'],
|
||||
run: (ctx) => {
|
||||
console.log('executed step4!');
|
||||
run: async (ctx) => {
|
||||
await sleep(5000);
|
||||
|
||||
// simulate a really slow network call
|
||||
setTimeout(async () => {
|
||||
await sleep(1000);
|
||||
ctx.playground('slow', 'call');
|
||||
}, 5000);
|
||||
|
||||
return { step4: 'step4' };
|
||||
},
|
||||
},
|
||||
|
||||
Reference in New Issue
Block a user