Remove dispatch backlog, replace with timeout lock acquisition (#3290)

This commit is contained in:
Julius Park
2026-03-17 16:09:58 -04:00
committed by GitHub
parent f784ea7612
commit 86b25fe4e8
8 changed files with 153 additions and 125 deletions

View File

@@ -345,7 +345,7 @@ func runV0Config(ctx context.Context, sc *server.ServerConfig, cleanup *cleanup.
dispatcher.WithLogger(sc.Logger),
dispatcher.WithCache(cacheInstance),
dispatcher.WithPayloadSizeThreshold(sc.Runtime.GRPCMaxMsgSize),
dispatcher.WithDefaultMaxWorkerBacklogSize(int64(sc.Runtime.GRPCWorkerStreamMaxBacklogSize)),
dispatcher.WithDefaultMaxWorkerLockAcquisitionTime(sc.Runtime.GRPCMaxWorkerLockAcquisitionTime),
dispatcher.WithWorkflowRunBufferSize(sc.Runtime.WorkflowRunBufferSize),
dispatcher.WithStreamEventBufferTimeout(sc.Runtime.StreamEventBufferTimeout),
dispatcher.WithVersion(sc.Version),
@@ -771,7 +771,7 @@ func runV1Config(ctx context.Context, sc *server.ServerConfig, cleanup *cleanup.
dispatcher.WithLogger(sc.Logger),
dispatcher.WithCache(cacheInstance),
dispatcher.WithPayloadSizeThreshold(sc.Runtime.GRPCMaxMsgSize),
dispatcher.WithDefaultMaxWorkerBacklogSize(int64(sc.Runtime.GRPCWorkerStreamMaxBacklogSize)),
dispatcher.WithDefaultMaxWorkerLockAcquisitionTime(sc.Runtime.GRPCMaxWorkerLockAcquisitionTime),
dispatcher.WithWorkflowRunBufferSize(sc.Runtime.WorkflowRunBufferSize),
dispatcher.WithStreamEventBufferTimeout(sc.Runtime.StreamEventBufferTimeout),
dispatcher.WithVersion(sc.Version),

View File

@@ -33,20 +33,20 @@ type Dispatcher interface {
type DispatcherImpl struct {
contracts.UnimplementedDispatcherServer
s gocron.Scheduler
mqv1 msgqueue.MessageQueue
pubBuffer *msgqueue.MQPubBuffer
sharedNonBufferedReaderv1 *msgqueue.SharedTenantReader
sharedBufferedReaderv1 *msgqueue.SharedBufferedTenantReader
l *zerolog.Logger
dv datautils.DataDecoderValidator
v validator.Validator
repov1 v1.Repository
cache cache.Cacheable
payloadSizeThreshold int
defaultMaxWorkerBacklogSize int64
workflowRunBufferSize int
streamEventBufferTimeout time.Duration
s gocron.Scheduler
mqv1 msgqueue.MessageQueue
pubBuffer *msgqueue.MQPubBuffer
sharedNonBufferedReaderv1 *msgqueue.SharedTenantReader
sharedBufferedReaderv1 *msgqueue.SharedBufferedTenantReader
l *zerolog.Logger
dv datautils.DataDecoderValidator
v validator.Validator
repov1 v1.Repository
cache cache.Cacheable
payloadSizeThreshold int
defaultMaxWorkerLockAcquisitionTime time.Duration
workflowRunBufferSize int
streamEventBufferTimeout time.Duration
dispatcherId uuid.UUID
workers *workers
@@ -121,19 +121,19 @@ func (w *workers) Delete(workerId uuid.UUID) {
type DispatcherOpt func(*DispatcherOpts)
type DispatcherOpts struct {
mqv1 msgqueue.MessageQueue
l *zerolog.Logger
dv datautils.DataDecoderValidator
repov1 v1.Repository
dispatcherId uuid.UUID
alerter hatcheterrors.Alerter
cache cache.Cacheable
analytics analytics.Analytics
payloadSizeThreshold int
defaultMaxWorkerBacklogSize int64
workflowRunBufferSize int
streamEventBufferTimeout time.Duration
version string
mqv1 msgqueue.MessageQueue
l *zerolog.Logger
dv datautils.DataDecoderValidator
repov1 v1.Repository
dispatcherId uuid.UUID
alerter hatcheterrors.Alerter
cache cache.Cacheable
analytics analytics.Analytics
payloadSizeThreshold int
defaultMaxWorkerLockAcquisitionTime time.Duration
workflowRunBufferSize int
streamEventBufferTimeout time.Duration
version string
}
func defaultDispatcherOpts() *DispatcherOpts {
@@ -141,15 +141,15 @@ func defaultDispatcherOpts() *DispatcherOpts {
alerter := hatcheterrors.NoOpAlerter{}
return &DispatcherOpts{
l: &logger,
dv: datautils.NewDataDecoderValidator(),
dispatcherId: uuid.New(),
alerter: alerter,
analytics: analytics.NoOpAnalytics{},
payloadSizeThreshold: 3 * 1024 * 1024,
defaultMaxWorkerBacklogSize: 20,
workflowRunBufferSize: 1000,
streamEventBufferTimeout: 5 * time.Second,
l: &logger,
dv: datautils.NewDataDecoderValidator(),
dispatcherId: uuid.New(),
alerter: alerter,
analytics: analytics.NoOpAnalytics{},
payloadSizeThreshold: 3 * 1024 * 1024,
defaultMaxWorkerLockAcquisitionTime: 250 * time.Millisecond,
workflowRunBufferSize: 1000,
streamEventBufferTimeout: 5 * time.Second,
}
}
@@ -201,9 +201,9 @@ func WithPayloadSizeThreshold(threshold int) DispatcherOpt {
}
}
func WithDefaultMaxWorkerBacklogSize(size int64) DispatcherOpt {
func WithDefaultMaxWorkerLockAcquisitionTime(t time.Duration) DispatcherOpt {
return func(opts *DispatcherOpts) {
opts.defaultMaxWorkerBacklogSize = size
opts.defaultMaxWorkerLockAcquisitionTime = t
}
}
@@ -266,23 +266,23 @@ func New(fs ...DispatcherOpt) (*DispatcherImpl, error) {
pubBuffer := msgqueue.NewMQPubBuffer(opts.mqv1)
return &DispatcherImpl{
mqv1: opts.mqv1,
pubBuffer: pubBuffer,
l: opts.l,
dv: opts.dv,
v: validator.NewDefaultValidator(),
repov1: opts.repov1,
dispatcherId: opts.dispatcherId,
workers: &workers{},
s: s,
a: a,
cache: opts.cache,
payloadSizeThreshold: opts.payloadSizeThreshold,
defaultMaxWorkerBacklogSize: opts.defaultMaxWorkerBacklogSize,
workflowRunBufferSize: opts.workflowRunBufferSize,
analytics: opts.analytics,
streamEventBufferTimeout: opts.streamEventBufferTimeout,
version: opts.version,
mqv1: opts.mqv1,
pubBuffer: pubBuffer,
l: opts.l,
dv: opts.dv,
v: validator.NewDefaultValidator(),
repov1: opts.repov1,
dispatcherId: opts.dispatcherId,
workers: &workers{},
s: s,
a: a,
cache: opts.cache,
payloadSizeThreshold: opts.payloadSizeThreshold,
defaultMaxWorkerLockAcquisitionTime: opts.defaultMaxWorkerLockAcquisitionTime,
workflowRunBufferSize: opts.workflowRunBufferSize,
analytics: opts.analytics,
streamEventBufferTimeout: opts.streamEventBufferTimeout,
version: opts.version,
}, nil
}

View File

@@ -213,7 +213,7 @@ func (s *DispatcherImpl) Listen(request *contracts.WorkerListenRequest, stream c
fin := make(chan bool)
s.workers.Add(workerId, sessionId, newSubscribedWorker(stream, fin, workerId, 20, s.pubBuffer))
s.workers.Add(workerId, sessionId, newSubscribedWorker(stream, fin, workerId, s.defaultMaxWorkerLockAcquisitionTime, s.pubBuffer))
defer func() {
// non-blocking send
@@ -341,7 +341,7 @@ func (s *DispatcherImpl) ListenV2(request *contracts.WorkerListenRequest, stream
fin := make(chan bool)
s.workers.Add(workerId, sessionId, newSubscribedWorker(stream, fin, workerId, s.defaultMaxWorkerBacklogSize, s.pubBuffer))
s.workers.Add(workerId, sessionId, newSubscribedWorker(stream, fin, workerId, s.defaultMaxWorkerLockAcquisitionTime, s.pubBuffer))
defer func() {
// non-blocking send

View File

@@ -1,7 +1,7 @@
package dispatcher
import (
"sync"
"time"
"github.com/google/uuid"
@@ -16,15 +16,10 @@ type subscribedWorker struct {
// finished is used to signal closure of a client subscribing goroutine
finished chan<- bool
sendMu sync.Mutex
sendLock *TimeoutLock
workerId uuid.UUID
backlogSize int64
backlogSizeMu sync.Mutex
maxBacklogSize int64
pubBuffer *msgqueue.MQPubBuffer
}
@@ -32,18 +27,15 @@ func newSubscribedWorker(
stream contracts.Dispatcher_ListenServer,
fin chan<- bool,
workerId uuid.UUID,
maxBacklogSize int64,
maxLockAcquisitionTime time.Duration,
pubBuffer *msgqueue.MQPubBuffer,
) *subscribedWorker {
if maxBacklogSize <= 0 {
maxBacklogSize = 20
}
lock := NewTimeoutLock(maxLockAcquisitionTime)
return &subscribedWorker{
stream: stream,
finished: fin,
workerId: workerId,
maxBacklogSize: maxBacklogSize,
pubBuffer: pubBuffer,
stream: stream,
finished: fin,
workerId: workerId,
pubBuffer: pubBuffer,
sendLock: lock,
}
}

View File

@@ -59,32 +59,6 @@ func (worker *subscribedWorker) StartTaskFromBulk(
return nil
}
func (worker *subscribedWorker) incBacklogSize(delta int64) bool {
worker.backlogSizeMu.Lock()
defer worker.backlogSizeMu.Unlock()
if worker.backlogSize+delta > worker.maxBacklogSize {
return false
}
worker.backlogSize += delta
return true
}
func (worker *subscribedWorker) decBacklogSize(delta int64) int64 {
worker.backlogSizeMu.Lock()
defer worker.backlogSizeMu.Unlock()
worker.backlogSize -= delta
if worker.backlogSize < 0 {
worker.backlogSize = 0
}
return worker.backlogSize
}
func (worker *subscribedWorker) sendToWorker(
ctx context.Context,
action *contracts.AssignedAction,
@@ -121,12 +95,10 @@ func (worker *subscribedWorker) sendToWorker(
encodeSpan.End()
incSuccess := worker.incBacklogSize(1)
if !incSuccess {
err := fmt.Errorf("worker backlog size exceeded max of %d", worker.maxBacklogSize)
if !worker.sendLock.Acquire() {
err = fmt.Errorf("could not acquire worker send mutex, flow control is active")
span.RecordError(err)
span.SetStatus(codes.Error, "worker backlog size exceeded max")
span.SetStatus(codes.Error, "flow control is active")
return err
}
@@ -134,10 +106,8 @@ func (worker *subscribedWorker) sendToWorker(
_, lockSpan := telemetry.NewSpan(ctx, "acquire-worker-stream-lock")
worker.sendMu.Lock()
defer worker.sendMu.Unlock()
lockSpan.End()
defer worker.sendLock.Release()
defer lockSpan.End()
telemetry.WithAttributes(span, telemetry.AttributeKV{
Key: "lock.duration_ms",
@@ -153,8 +123,6 @@ func (worker *subscribedWorker) sendToWorker(
go func() {
defer close(sentCh)
defer worker.decBacklogSize(1)
err = worker.stream.SendMsg(msg)
if err != nil {
@@ -195,9 +163,8 @@ func (worker *subscribedWorker) CancelTask(
action.ActionType = contracts.ActionType_CANCEL_STEP_RUN
sentCh := make(chan error, 1)
incSuccess := worker.incBacklogSize(1)
if !incSuccess {
acquiredLock := worker.sendLock.Acquire()
if !acquiredLock {
msg, err := tasktypesv1.MonitoringEventMessageFromInternal(
task.TenantID,
tasktypesv1.CreateMonitoringEventPayload{
@@ -206,7 +173,7 @@ func (worker *subscribedWorker) CancelTask(
WorkerId: &worker.workerId,
EventType: sqlcv1.V1EventTypeOlapCOULDNOTSENDTOWORKER,
EventTimestamp: time.Now().UTC(),
EventMessage: "Worker backlog size exceeded",
EventMessage: fmt.Sprintf("Could not acquire send lock before timeout of %s ", worker.sendLock.timeout),
},
)
if err != nil {
@@ -223,10 +190,7 @@ func (worker *subscribedWorker) CancelTask(
go func() {
defer close(sentCh)
defer worker.decBacklogSize(1)
worker.sendMu.Lock()
defer worker.sendMu.Unlock()
defer worker.sendLock.Release()
sentCh <- worker.stream.Send(action)
}()

View File

@@ -0,0 +1,32 @@
package dispatcher
import (
"time"
)
type TimeoutLock struct {
sem chan struct{}
timeout time.Duration
}
func (l *TimeoutLock) Acquire() bool {
select {
// attempt to send to the semaphore, blocks on contention because it has a buffer of 1
case l.sem <- struct{}{}:
return true
// timing out dequeues the semaphore send
case <-time.After(l.timeout):
return false
}
}
func (l *TimeoutLock) Release() {
<-l.sem
}
func NewTimeoutLock(timeout time.Duration) *TimeoutLock {
return &TimeoutLock{
sem: make(chan struct{}, 1),
timeout: timeout,
}
}

View File

@@ -0,0 +1,40 @@
package dispatcher
import (
"sync"
"testing"
"time"
"github.com/stretchr/testify/assert"
)
// BenchmarkLockAcquisition measures concurrent acquire/release cycles on a
// single TimeoutLock under goroutine contention.
func BenchmarkLockAcquisition(b *testing.B) {
	l := NewTimeoutLock(time.Second)

	// Wait for every spawned goroutine so the timed region actually covers
	// the acquire/release work (and no goroutines leak past the benchmark).
	wg := sync.WaitGroup{}
	wg.Add(b.N)

	for i := 0; i < b.N; i++ {
		go func() {
			defer wg.Done()

			if l.Acquire() {
				l.Release()
			}
		}()
	}

	wg.Wait()
}
// TestLockAcquisitionTimeout verifies that contended Acquire calls give up
// after the timeout: with 10 goroutines each holding the lock for 1ms and a
// 5ms acquisition timeout, at least one acquisition must fail.
func TestLockAcquisitionTimeout(t *testing.T) {
	// Not all the locks should be acquired because of the timeout
	l := NewTimeoutLock(5 * time.Millisecond)

	// numberAcquired is incremented from multiple goroutines, so it must be
	// guarded by its own mutex to avoid a data race under `go test -race`.
	numberAcquired := 0
	countMu := sync.Mutex{}

	wg := sync.WaitGroup{}
	wg.Add(10)

	for i := 0; i < 10; i++ {
		go func() {
			defer wg.Done()

			if l.Acquire() {
				defer l.Release()
				time.Sleep(time.Millisecond)

				countMu.Lock()
				numberAcquired++
				countMu.Unlock()
			}
		}()
	}

	wg.Wait()

	assert.NotEqual(t, 10, numberAcquired)
}

View File

@@ -185,9 +185,9 @@ type ConfigFileRuntime struct {
// GRPCMaxMsgSize is the maximum message size that the grpc server will accept
GRPCMaxMsgSize int `mapstructure:"grpcMaxMsgSize" json:"grpcMaxMsgSize,omitempty" default:"4194304"`
// GRPCWorkerStreamMaxBacklogSize is the maximum number of messages that can be queued for a worker before we start rejecting new
// messages. Default is 20.
GRPCWorkerStreamMaxBacklogSize int `mapstructure:"grpcWorkerStreamMaxBacklogSize" json:"grpcWorkerStreamMaxBacklogSize,omitempty" default:"20"`
// GRPCMaxWorkerLockAcquisitionTime is the maximum amount of time the dispatcher will wait while attempting
// to send a message to a worker. If it waits longer, the request will be rejected. Default is 250ms.
GRPCMaxWorkerLockAcquisitionTime time.Duration `mapstructure:"grpcWorkerMaxLockAcquisitionTime" json:"grpcWorkerMaxLockAcquisitionTime,omitempty" default:"250ms"`
// GRPCStaticStreamWindowSize sets the static stream window size for the grpc server. This can help with performance
// with overloaded workers and large messages. Default is 10MB.
@@ -684,7 +684,7 @@ func BindAllEnv(v *viper.Viper) {
_ = v.BindEnv("runtime.grpcBroadcastAddress", "SERVER_GRPC_BROADCAST_ADDRESS")
_ = v.BindEnv("runtime.grpcInsecure", "SERVER_GRPC_INSECURE")
_ = v.BindEnv("runtime.grpcMaxMsgSize", "SERVER_GRPC_MAX_MSG_SIZE")
_ = v.BindEnv("runtime.grpcWorkerStreamMaxBacklogSize", "SERVER_GRPC_WORKER_STREAM_MAX_BACKLOG_SIZE")
_ = v.BindEnv("runtime.grpcWorkerMaxLockAcquisitionTime", "SERVER_GRPC_WORKER_MAX_LOCK_ACQUISITION_TIME")
_ = v.BindEnv("runtime.grpcStaticStreamWindowSize", "SERVER_GRPC_STATIC_STREAM_WINDOW_SIZE")
_ = v.BindEnv("runtime.grpcRateLimit", "SERVER_GRPC_RATE_LIMIT")
_ = v.BindEnv("runtime.schedulerConcurrencyRateLimit", "SCHEDULER_CONCURRENCY_RATE_LIMIT")