diff --git a/cmd/hatchet-engine/engine/run.go b/cmd/hatchet-engine/engine/run.go index 5ba21af93..cd3bae3b7 100644 --- a/cmd/hatchet-engine/engine/run.go +++ b/cmd/hatchet-engine/engine/run.go @@ -431,6 +431,7 @@ func runV0Config(ctx context.Context, sc *server.ServerConfig) ([]Teardown, erro dispatcher.WithLogger(sc.Logger), dispatcher.WithEntitlementsRepository(sc.EntitlementRepository), dispatcher.WithCache(cacheInstance), + dispatcher.WithPayloadSizeThreshold(sc.Runtime.GRPCMaxMsgSize), ) if err != nil { @@ -879,6 +880,7 @@ func runV1Config(ctx context.Context, sc *server.ServerConfig) ([]Teardown, erro dispatcher.WithLogger(sc.Logger), dispatcher.WithEntitlementsRepository(sc.EntitlementRepository), dispatcher.WithCache(cacheInstance), + dispatcher.WithPayloadSizeThreshold(sc.Runtime.GRPCMaxMsgSize), ) if err != nil { diff --git a/internal/services/dispatcher/dispatcher.go b/internal/services/dispatcher/dispatcher.go index 7c85d5e76..33fdc3ddc 100644 --- a/internal/services/dispatcher/dispatcher.go +++ b/internal/services/dispatcher/dispatcher.go @@ -54,6 +54,7 @@ type DispatcherImpl struct { repo repository.EngineRepository repov1 v1.Repository cache cache.Cacheable + payloadSizeThreshold int entitlements repository.EntitlementsRepository @@ -126,16 +127,17 @@ func (w *workers) Delete(workerId string) { type DispatcherOpt func(*DispatcherOpts) type DispatcherOpts struct { - mq msgqueue.MessageQueue - mqv1 msgqueuev1.MessageQueue - l *zerolog.Logger - dv datautils.DataDecoderValidator - repo repository.EngineRepository - repov1 v1.Repository - entitlements repository.EntitlementsRepository - dispatcherId string - alerter hatcheterrors.Alerter - cache cache.Cacheable + mq msgqueue.MessageQueue + mqv1 msgqueuev1.MessageQueue + l *zerolog.Logger + dv datautils.DataDecoderValidator + repo repository.EngineRepository + repov1 v1.Repository + entitlements repository.EntitlementsRepository + dispatcherId string + alerter hatcheterrors.Alerter + cache cache.Cacheable 
+ payloadSizeThreshold int } func defaultDispatcherOpts() *DispatcherOpts { @@ -143,10 +145,11 @@ func defaultDispatcherOpts() *DispatcherOpts { alerter := hatcheterrors.NoOpAlerter{} return &DispatcherOpts{ - l: &logger, - dv: datautils.NewDataDecoderValidator(), - dispatcherId: uuid.New().String(), - alerter: alerter, + l: &logger, + dv: datautils.NewDataDecoderValidator(), + dispatcherId: uuid.New().String(), + alerter: alerter, + payloadSizeThreshold: 3 * 1024 * 1024, } } @@ -210,6 +213,16 @@ func WithCache(cache cache.Cacheable) DispatcherOpt { } } +// WithPayloadSizeThreshold sets the size limit, in bytes, that cleanResults applies to the results of a workflow run event. +// Non-positive values are ignored so that the default threshold stays in effect instead of every result set being dropped. +func WithPayloadSizeThreshold(threshold int) DispatcherOpt { + return func(opts *DispatcherOpts) { + if threshold > 0 { + opts.payloadSizeThreshold = threshold + } + } +} + func New(fs ...DispatcherOpt) (*DispatcherImpl, error) { opts := defaultDispatcherOpts() @@ -257,20 +270,21 @@ func New(fs ...DispatcherOpt) (*DispatcherImpl, error) { pubBuffer := msgqueuev1.NewMQPubBuffer(opts.mqv1) return &DispatcherImpl{ - mq: opts.mq, - mqv1: opts.mqv1, - pubBuffer: pubBuffer, - l: opts.l, - dv: opts.dv, - v: validator.NewDefaultValidator(), - repo: opts.repo, - repov1: opts.repov1, - entitlements: opts.entitlements, - dispatcherId: opts.dispatcherId, - workers: &workers{}, - s: s, - a: a, - cache: opts.cache, + mq: opts.mq, + mqv1: opts.mqv1, + pubBuffer: pubBuffer, + l: opts.l, + dv: opts.dv, + v: validator.NewDefaultValidator(), + repo: opts.repo, + repov1: opts.repov1, + entitlements: opts.entitlements, + dispatcherId: opts.dispatcherId, + workers: &workers{}, + s: s, + a: a, + cache: opts.cache, + payloadSizeThreshold: opts.payloadSizeThreshold, }, nil } diff --git a/internal/services/dispatcher/server.go b/internal/services/dispatcher/server.go index 7ff4bc2ce..678dc577d 100644 --- a/internal/services/dispatcher/server.go +++ b/internal/services/dispatcher/server.go @@ -865,23 +865,21 @@ func (s *sendTimeFilter) canSend() bool { return true } -const payloadSizeThreshold = 3 * 1024 * 1024 - -func cleanResults(results 
[]*contracts.StepRunResult) []*contracts.StepRunResult { +func (s *DispatcherImpl) cleanResults(results []*contracts.StepRunResult) []*contracts.StepRunResult { totalSize, sizeOfOutputs, _ := calculateResultsSize(results) - if totalSize < payloadSizeThreshold { + if totalSize < s.payloadSizeThreshold { return results } - if sizeOfOutputs >= payloadSizeThreshold { + if sizeOfOutputs >= s.payloadSizeThreshold { return nil } // otherwise, attempt to clean the results by removing large error fields cleanedResults := make([]*contracts.StepRunResult, 0, len(results)) - fieldThreshold := (payloadSizeThreshold - sizeOfOutputs) / len(results) // how much overhead we'd have per result or error field, in the worst case + fieldThreshold := (s.payloadSizeThreshold - sizeOfOutputs) / len(results) // how much overhead we'd have per result or error field, in the worst case for _, result := range results { if result == nil { @@ -897,7 +895,7 @@ func cleanResults(results []*contracts.StepRunResult) []*contracts.StepRunResult } // if we are still over the limit, we just return nil - if totalSize, _, _ := calculateResultsSize(cleanedResults); totalSize > payloadSizeThreshold { + if totalSize, _, _ := calculateResultsSize(cleanedResults); totalSize > s.payloadSizeThreshold { return nil } @@ -951,7 +949,7 @@ func (s *DispatcherImpl) subscribeToWorkflowRunsV0(server contracts.Dispatcher_S sendMu := sync.Mutex{} sendEvent := func(e *contracts.WorkflowRunEvent) error { - results := cleanResults(e.Results) + results := s.cleanResults(e.Results) if results == nil { - s.l.Warn().Msgf("results size for workflow run %s exceeds 3MB and cannot be reduced", e.WorkflowRunId) + s.l.Warn().Msgf("results size for workflow run %s exceeds the configured limit of %d bytes and cannot be reduced", e.WorkflowRunId, s.payloadSizeThreshold) diff --git a/internal/services/dispatcher/server_v1.go b/internal/services/dispatcher/server_v1.go index 5aa3308e5..17430daa2 100644 --- a/internal/services/dispatcher/server_v1.go +++ b/internal/services/dispatcher/server_v1.go @@ -47,7 +47,7 @@ func (s *DispatcherImpl) subscribeToWorkflowRunsV1(server contracts.Dispatcher_S 
iterMu := sync.Mutex{} sendEvent := func(e *contracts.WorkflowRunEvent) error { - results := cleanResults(e.Results) + results := s.cleanResults(e.Results) if results == nil { - s.l.Warn().Msgf("results size for workflow run %s exceeds 3MB and cannot be reduced", e.WorkflowRunId) + s.l.Warn().Msgf("results size for workflow run %s exceeds the configured limit of %d bytes and cannot be reduced", e.WorkflowRunId, s.payloadSizeThreshold)