mirror of
https://github.com/hatchet-dev/hatchet.git
synced 2025-12-30 05:09:44 -06:00
fix: continue processing events after local scheduling
This commit is contained in:
@@ -996,6 +996,10 @@ func (tc *TasksControllerImpl) handleProcessUserEventTrigger(ctx context.Context
|
||||
eventIdToOpts := make(map[string]v1.EventTriggerOpts)
|
||||
|
||||
for _, msg := range msgs {
|
||||
if msg.WasProcessedLocally {
|
||||
continue
|
||||
}
|
||||
|
||||
opt := v1.EventTriggerOpts{
|
||||
ExternalId: msg.EventExternalId,
|
||||
Key: msg.EventKey,
|
||||
|
||||
@@ -89,6 +89,8 @@ func (i *IngestorImpl) ingest(ctx context.Context, tenant *dbsqlc.Tenant, eventO
|
||||
res = append(res, e)
|
||||
}
|
||||
|
||||
wasProcessedLocally := false
|
||||
|
||||
if i.localScheduler != nil {
|
||||
localWorkerIds := map[string]struct{}{}
|
||||
|
||||
@@ -138,15 +140,12 @@ func (i *IngestorImpl) ingest(ctx context.Context, tenant *dbsqlc.Tenant, eventO
|
||||
if dispatcherErr != nil {
|
||||
i.l.Error().Err(dispatcherErr).Msg("could not handle local assignments")
|
||||
}
|
||||
|
||||
// we return nil because the failed assignments would have been requeued by the local dispatcher,
|
||||
// and we have already written the tasks to the database
|
||||
return res, nil
|
||||
}
|
||||
|
||||
// if there's no scheduling error, we return here because the tasks have been scheduled optimistically
|
||||
// if there's no scheduling error, the event was processed locally. Note that we don't return here because
|
||||
// we still need to enqueue the event to ensure downstream processing (triggers, durable events)
|
||||
if schedulingErr == nil {
|
||||
return res, nil
|
||||
wasProcessedLocally = true
|
||||
}
|
||||
}
|
||||
|
||||
@@ -156,6 +155,8 @@ func (i *IngestorImpl) ingest(ctx context.Context, tenant *dbsqlc.Tenant, eventO
|
||||
var outerErr error
|
||||
|
||||
for _, event := range eventOpts {
|
||||
event.WasProcessedLocally = wasProcessedLocally
|
||||
|
||||
msg, err := msgqueue.NewTenantMessage(
|
||||
tenantId,
|
||||
"user-event",
|
||||
|
||||
@@ -15,6 +15,7 @@ type UserEventTaskPayload struct {
|
||||
EventPriority *int32 `json:"event_priority,omitempty"`
|
||||
EventScope *string `json:"event_scope,omitempty"`
|
||||
TriggeringWebhookName *string `json:"triggering_webhook_name,omitempty"`
|
||||
WasProcessedLocally bool `json:"was_processed_locally"`
|
||||
}
|
||||
|
||||
func NewInternalEventMessage(tenantId string, timestamp time.Time, events ...v1.InternalTaskEvent) (*msgqueue.Message, error) {
|
||||
|
||||
Reference in New Issue
Block a user