feat: deduplicated enqueue error and additional context methods (#747)

* feat: additional context fields and dedupe error

* fix: case on error properly
This commit is contained in:
abelanger5
2024-07-26 11:32:56 -07:00
committed by GitHub
parent 16827223da
commit 9efd9368fd
3 changed files with 79 additions and 14 deletions
+22
View File
@@ -9,6 +9,8 @@ import (
"github.com/rs/zerolog"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/timestamppb"
admincontracts "github.com/hatchet-dev/hatchet/internal/services/admin/contracts"
@@ -36,6 +38,14 @@ type AdminClient interface {
PutRateLimit(key string, opts *types.RateLimitOpts) error
}
type DedupeViolationErr struct {
details string
}
func (d *DedupeViolationErr) Error() string {
return fmt.Sprintf("DedupeViolationErr: %s", d.details)
}
type adminClientImpl struct {
client admincontracts.WorkflowServiceClient
@@ -191,6 +201,12 @@ func (a *adminClientImpl) RunWorkflow(workflowName string, input interface{}, op
res, err := a.client.TriggerWorkflow(a.ctx.newContext(context.Background()), &request)
if err != nil {
if status.Code(err) == codes.AlreadyExists {
return "", &DedupeViolationErr{
details: fmt.Sprintf("could not trigger workflow: %s", err.Error()),
}
}
return "", fmt.Errorf("could not trigger workflow: %w", err)
}
@@ -221,6 +237,12 @@ func (a *adminClientImpl) RunChildWorkflow(workflowName string, input interface{
})
if err != nil {
if status.Code(err) == codes.AlreadyExists {
return "", &DedupeViolationErr{
details: fmt.Sprintf("could not trigger child workflow: %s", err.Error()),
}
}
return "", fmt.Errorf("could not trigger child workflow: %w", err)
}
+41 -14
View File
@@ -97,6 +97,18 @@ type Action struct {
// the count of the retry attempt
RetryCount int32 `json:"retryCount"`
// the additional metadata for the workflow run
AdditionalMetadata map[string]string
// the child index for the workflow run
ChildIndex *int32
// the child key for the workflow run
ChildKey *string
// the parent workflow run id
ParentWorkflowRunId *string
}
type WorkerActionListener interface {
@@ -329,21 +341,36 @@ func (a *actionListenerImpl) Actions(ctx context.Context) (<-chan *Action, error
unquoted := assignedAction.ActionPayload
var additionalMetadata map[string]string
if assignedAction.AdditionalMetadata != nil {
err := json.Unmarshal([]byte(*assignedAction.AdditionalMetadata), &additionalMetadata)
if err != nil {
a.l.Error().Err(err).Msgf("could not unmarshal additional metadata")
continue
}
}
ch <- &Action{
TenantId: assignedAction.TenantId,
WorkflowRunId: assignedAction.WorkflowRunId,
GetGroupKeyRunId: assignedAction.GetGroupKeyRunId,
WorkerId: a.workerId,
JobId: assignedAction.JobId,
JobName: assignedAction.JobName,
JobRunId: assignedAction.JobRunId,
StepId: assignedAction.StepId,
StepName: assignedAction.StepName,
StepRunId: assignedAction.StepRunId,
ActionId: assignedAction.ActionId,
ActionType: actionType,
ActionPayload: []byte(unquoted),
RetryCount: assignedAction.RetryCount,
TenantId: assignedAction.TenantId,
WorkflowRunId: assignedAction.WorkflowRunId,
GetGroupKeyRunId: assignedAction.GetGroupKeyRunId,
WorkerId: a.workerId,
JobId: assignedAction.JobId,
JobName: assignedAction.JobName,
JobRunId: assignedAction.JobRunId,
StepId: assignedAction.StepId,
StepName: assignedAction.StepName,
StepRunId: assignedAction.StepRunId,
ActionId: assignedAction.ActionId,
ActionType: actionType,
ActionPayload: []byte(unquoted),
RetryCount: assignedAction.RetryCount,
AdditionalMetadata: additionalMetadata,
ChildIndex: assignedAction.ChildWorkflowIndex,
ChildKey: assignedAction.ChildWorkflowKey,
ParentWorkflowRunId: assignedAction.ParentWorkflowRunId,
}
}
}()
+16
View File
@@ -321,6 +321,22 @@ func (h *hatchetContext) SpawnWorkflow(workflowName string, input any, opts *Spa
}, nil
}
func (h *hatchetContext) AdditionalMetadata() map[string]string {
return h.a.AdditionalMetadata
}
func (h *hatchetContext) ChildIndex() *int32 {
return h.a.ChildIndex
}
func (h *hatchetContext) ChildKey() *string {
return h.a.ChildKey
}
func (h *hatchetContext) ParentWorkflowRunId() *string {
return h.a.ParentWorkflowRunId
}
func (h *hatchetContext) populateStepDataForGroupKeyRun() error {
if h.stepData != nil {
return nil