mirror of
https://github.com/hatchet-dev/hatchet.git
synced 2025-12-23 01:30:17 -06:00
Fix: Streaming Bugs (#1913)
* fix: bug with json parsing failing * fix: hang up on cancel and fail * fix: pub stream events even if tenant pubs are disabled * fix: condition * fix: eq
This commit is contained in:
@@ -231,7 +231,7 @@ func (t *MessageQueueImpl) pubMessage(ctx context.Context, q msgqueue.Queue, msg
|
||||
}
|
||||
|
||||
// if this is a tenant msg, publish to the tenant exchange
|
||||
if !t.disableTenantExchangePubs && msg.TenantID != "" {
|
||||
if (!t.disableTenantExchangePubs || msg.ID == "task-stream-event") && msg.TenantID != "" {
|
||||
// determine if the tenant exchange exists
|
||||
if _, ok := t.tenantIdCache.Get(msg.TenantID); !ok {
|
||||
// register the tenant exchange
|
||||
|
||||
@@ -923,7 +923,7 @@ func (s *DispatcherImpl) subscribeToWorkflowEventsByWorkflowRunIdV1(workflowRunI
|
||||
}
|
||||
|
||||
isWorkflowRunCompletedEvent := e.ResourceType == contracts.ResourceType_RESOURCE_TYPE_WORKFLOW_RUN &&
|
||||
e.EventType == contracts.ResourceEventType_RESOURCE_EVENT_TYPE_COMPLETED
|
||||
(e.EventType == contracts.ResourceEventType_RESOURCE_EVENT_TYPE_COMPLETED || e.EventType == contracts.ResourceEventType_RESOURCE_EVENT_TYPE_FAILED || e.EventType == contracts.ResourceEventType_RESOURCE_EVENT_TYPE_CANCELLED)
|
||||
|
||||
if isWorkflowRunCompletedEvent {
|
||||
e.Hangup = true
|
||||
|
||||
@@ -417,6 +417,11 @@ type TaskMetadata struct {
|
||||
|
||||
func ParseTaskMetadata(jsonData []byte) ([]TaskMetadata, error) {
|
||||
var tasks []TaskMetadata
|
||||
|
||||
if len(jsonData) == 0 {
|
||||
return tasks, nil
|
||||
}
|
||||
|
||||
err := json.Unmarshal(jsonData, &tasks)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
||||
Reference in New Issue
Block a user