Feat: env var for stream event buffer timeout (#3223)

* feat: env var for stream event buffer timeout

* chore: gen
This commit is contained in:
matt
2026-03-11 08:41:18 -07:00
committed by GitHub
parent f9d083600f
commit b942092ba5
4 changed files with 18 additions and 1 deletions
+2
View File
@@ -362,6 +362,7 @@ func runV0Config(ctx context.Context, sc *server.ServerConfig) ([]Teardown, erro
dispatcher.WithPayloadSizeThreshold(sc.Runtime.GRPCMaxMsgSize),
dispatcher.WithDefaultMaxWorkerBacklogSize(int64(sc.Runtime.GRPCWorkerStreamMaxBacklogSize)),
dispatcher.WithWorkflowRunBufferSize(sc.Runtime.WorkflowRunBufferSize),
dispatcher.WithStreamEventBufferTimeout(sc.Runtime.StreamEventBufferTimeout),
dispatcher.WithVersion(sc.Version),
)
@@ -800,6 +801,7 @@ func runV1Config(ctx context.Context, sc *server.ServerConfig) ([]Teardown, erro
dispatcher.WithPayloadSizeThreshold(sc.Runtime.GRPCMaxMsgSize),
dispatcher.WithDefaultMaxWorkerBacklogSize(int64(sc.Runtime.GRPCWorkerStreamMaxBacklogSize)),
dispatcher.WithWorkflowRunBufferSize(sc.Runtime.WorkflowRunBufferSize),
dispatcher.WithStreamEventBufferTimeout(sc.Runtime.StreamEventBufferTimeout),
dispatcher.WithVersion(sc.Version),
)
@@ -44,6 +44,7 @@ type DispatcherImpl struct {
payloadSizeThreshold int
defaultMaxWorkerBacklogSize int64
workflowRunBufferSize int
streamEventBufferTimeout time.Duration
dispatcherId uuid.UUID
workers *workers
@@ -125,6 +126,7 @@ type DispatcherOpts struct {
payloadSizeThreshold int
defaultMaxWorkerBacklogSize int64
workflowRunBufferSize int
streamEventBufferTimeout time.Duration
version string
}
@@ -140,6 +142,7 @@ func defaultDispatcherOpts() *DispatcherOpts {
payloadSizeThreshold: 3 * 1024 * 1024,
defaultMaxWorkerBacklogSize: 20,
workflowRunBufferSize: 1000,
streamEventBufferTimeout: 5 * time.Second,
}
}
@@ -203,6 +206,12 @@ func WithWorkflowRunBufferSize(size int) DispatcherOpt {
}
}
func WithStreamEventBufferTimeout(timeout time.Duration) DispatcherOpt {
return func(opts *DispatcherOpts) {
opts.streamEventBufferTimeout = timeout
}
}
func WithVersion(version string) DispatcherOpt {
return func(opts *DispatcherOpts) {
opts.version = version
@@ -258,6 +267,7 @@ func New(fs ...DispatcherOpt) (*DispatcherImpl, error) {
payloadSizeThreshold: opts.payloadSizeThreshold,
defaultMaxWorkerBacklogSize: opts.defaultMaxWorkerBacklogSize,
workflowRunBufferSize: opts.workflowRunBufferSize,
streamEventBufferTimeout: opts.streamEventBufferTimeout,
version: opts.version,
}, nil
}
+1 -1
View File
@@ -884,7 +884,7 @@ func (s *DispatcherImpl) subscribeToWorkflowEventsByWorkflowRunIdV1(workflowRunI
var mu sync.Mutex // Mutex to protect activeRunIds
var sendMu sync.Mutex // Mutex to protect sending messages
streamBuffer := NewStreamEventBuffer(5 * time.Second)
streamBuffer := NewStreamEventBuffer(s.streamEventBufferTimeout)
defer streamBuffer.Close()
// Handle events from the stream buffer
+5
View File
@@ -298,6 +298,10 @@ type ConfigFileRuntime struct {
// WorkflowRunBufferSize is the buffer size for workflow run event batching in the dispatcher
WorkflowRunBufferSize int `mapstructure:"workflowRunBufferSize" json:"workflowRunBufferSize,omitempty" default:"1000"`
// StreamEventBufferTimeout is the timeout duration for the stream event buffer in the dispatcher.
// This controls how long the buffer waits for out-of-order events before flushing them.
StreamEventBufferTimeout time.Duration `mapstructure:"streamEventBufferTimeout" json:"streamEventBufferTimeout,omitempty" default:"5s"`
}
type InternalClientTLSConfigFile struct {
@@ -907,6 +911,7 @@ func BindAllEnv(v *viper.Viper) {
// dispatcher options
_ = v.BindEnv("runtime.workflowRunBufferSize", "SERVER_WORKFLOW_RUN_BUFFER_SIZE")
_ = v.BindEnv("runtime.streamEventBufferTimeout", "SERVER_STREAM_EVENT_BUFFER_TIMEOUT")
// payload store options
_ = v.BindEnv("payloadStore.enablePayloadDualWrites", "SERVER_PAYLOAD_STORE_ENABLE_PAYLOAD_DUAL_WRITES")