diff --git a/internal/services/dispatcher/dispatcher_v1.go b/internal/services/dispatcher/dispatcher_v1.go
index c799e37c8..35b87134c 100644
--- a/internal/services/dispatcher/dispatcher_v1.go
+++ b/internal/services/dispatcher/dispatcher_v1.go
@@ -9,7 +9,9 @@ import (
 	"time"
 
 	"github.com/hashicorp/go-multierror"
+	"go.opentelemetry.io/otel/codes"
 	"golang.org/x/sync/errgroup"
+	"google.golang.org/grpc"
 
 	msgqueuev1 "github.com/hatchet-dev/hatchet/internal/msgqueue/v1"
 	"github.com/hatchet-dev/hatchet/internal/queueutils"
@@ -40,10 +42,76 @@ func (worker *subscribedWorker) StartTaskFromBulk(
 	action.ActionType = contracts.ActionType_START_STEP_RUN
 	action.ActionPayload = string(inputBytes)
 
+	return worker.sendToWorker(ctx, action)
+}
+
+func (worker *subscribedWorker) sendToWorker(
+	ctx context.Context,
+	action *contracts.AssignedAction,
+) error {
+	ctx, span := telemetry.NewSpan(ctx, "send-to-worker") // nolint:ineffassign
+	defer span.End()
+
+	telemetry.WithAttributes(
+		span,
+		telemetry.AttributeKV{
+			Key:   "worker_id",
+			Value: worker.workerId,
+		},
+	)
+
+	telemetry.WithAttributes(
+		span,
+		telemetry.AttributeKV{
+			Key:   "payload_size",
+			Value: len(action.ActionPayload),
+		},
+	)
+
+	_, encodeSpan := telemetry.NewSpan(ctx, "encode-action")
+
+	msg := &grpc.PreparedMsg{}
+	err := msg.Encode(worker.stream, action)
+
+	if err != nil {
+		encodeSpan.RecordError(err)
+		encodeSpan.End()
+		return fmt.Errorf("could not encode action: %w", err)
+	}
+
+	encodeSpan.End()
+
+	lockBegin := time.Now()
+
+	_, lockSpan := telemetry.NewSpan(ctx, "acquire-worker-stream-lock")
+
 	worker.sendMu.Lock()
 	defer worker.sendMu.Unlock()
 
-	return worker.stream.Send(action)
+	lockSpan.End()
+
+	telemetry.WithAttributes(span, telemetry.AttributeKV{
+		Key:   "lock_duration_ms",
+		Value: time.Since(lockBegin).Milliseconds(),
+	})
+
+	_, streamSpan := telemetry.NewSpan(ctx, "send-worker-stream")
+	defer streamSpan.End()
+
+	sendMsgBegin := time.Now()
+
+	err = worker.stream.SendMsg(msg)
+
+	if err != nil {
+		span.RecordError(err)
+	}
+
+	if time.Since(sendMsgBegin) > 50*time.Millisecond {
+		span.SetStatus(codes.Error, "flow control detected")
+		span.RecordError(fmt.Errorf("send took too long, we may be in flow control: %s", time.Since(sendMsgBegin)))
+	}
+
+	return err
 }
 
 func (worker *subscribedWorker) CancelTask(
diff --git a/internal/services/dispatcher/server.go b/internal/services/dispatcher/server.go
index e659095ec..6f7fc3e2b 100644
--- a/internal/services/dispatcher/server.go
+++ b/internal/services/dispatcher/server.go
@@ -41,6 +41,8 @@ type subscribedWorker struct {
 	finished chan<- bool
 
 	sendMu sync.Mutex
+
+	workerId string
 }
 
 func (worker *subscribedWorker) StartStepRun(
@@ -358,7 +360,7 @@ func (s *DispatcherImpl) Listen(request *contracts.WorkerListenRequest, stream c
 	fin := make(chan bool)
 
-	s.workers.Add(request.WorkerId, sessionId, &subscribedWorker{stream: stream, finished: fin})
+	s.workers.Add(request.WorkerId, sessionId, &subscribedWorker{stream: stream, finished: fin, workerId: request.WorkerId})
 
 	defer func() {
 		// non-blocking send
@@ -480,7 +482,7 @@ func (s *DispatcherImpl) ListenV2(request *contracts.WorkerListenRequest, stream
 	fin := make(chan bool)
 
-	s.workers.Add(request.WorkerId, sessionId, &subscribedWorker{stream: stream, finished: fin})
+	s.workers.Add(request.WorkerId, sessionId, &subscribedWorker{stream: stream, finished: fin, workerId: request.WorkerId})
 
 	defer func() {
 		// non-blocking send
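The core change above is that `sendToWorker` marshals the action into a `grpc.PreparedMsg` before acquiring `sendMu`, so only the raw write happens under the lock, and it times `SendMsg` to surface HTTP/2 flow control. A minimal sketch of that pattern in isolation (the `lockedSender` type and the 50ms threshold are illustrative assumptions, not the Hatchet code itself):

```go
// Sketch: pre-encode outside the lock, then time SendMsg to spot
// HTTP/2 flow-control stalls. lockedSender and slowSendThreshold
// are hypothetical names for illustration.
package dispatcher

import (
	"fmt"
	"log"
	"sync"
	"time"

	"google.golang.org/grpc"
)

const slowSendThreshold = 50 * time.Millisecond // assumed heuristic, mirrors the diff

type lockedSender struct {
	mu     sync.Mutex
	stream grpc.ServerStream // the worker's long-lived Listen stream
}

func (s *lockedSender) send(msg any) error {
	// Encode outside the critical section so marshaling a large payload
	// never blocks other goroutines sharing this stream.
	prepared := &grpc.PreparedMsg{}
	if err := prepared.Encode(s.stream, msg); err != nil {
		return fmt.Errorf("could not encode message: %w", err)
	}

	s.mu.Lock()
	defer s.mu.Unlock()

	start := time.Now()
	// SendMsg recognizes *PreparedMsg and writes the pre-encoded bytes
	// instead of marshaling again.
	err := s.stream.SendMsg(prepared)

	if elapsed := time.Since(start); err == nil && elapsed > slowSendThreshold {
		// A slow send usually means the stream's flow-control window is
		// exhausted, i.e. the worker is not reading fast enough.
		log.Printf("send took %s; stream may be in flow control", elapsed)
	}

	return err
}
```

Keeping the encode outside the mutex matters because many dispatch goroutines share one worker stream; under the old `worker.stream.Send(action)`, marshaling happened while the lock was held.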
diff --git a/internal/services/grpc/server.go b/internal/services/grpc/server.go
index c322edf71..3e2e88a42 100644
--- a/internal/services/grpc/server.go
+++ b/internal/services/grpc/server.go
@@ -307,6 +307,10 @@ func (s *Server) startGRPC() (func() error, error) {
 		s.config.Runtime.GRPCMaxMsgSize,
 	))
 
+	serverOpts = append(serverOpts, grpc.StaticStreamWindowSize(
+		s.config.Runtime.GRPCStaticStreamWindowSize,
+	))
+
 	grpcServer := grpc.NewServer(serverOpts...)
 
 	if s.ingestor != nil {
diff --git a/pkg/config/server/server.go b/pkg/config/server/server.go
index f0bb326d7..2739b1b39 100644
--- a/pkg/config/server/server.go
+++ b/pkg/config/server/server.go
@@ -148,6 +148,10 @@ type ConfigFileRuntime struct {
 	// GRPCMaxMsgSize is the maximum message size that the grpc server will accept
 	GRPCMaxMsgSize int `mapstructure:"grpcMaxMsgSize" json:"grpcMaxMsgSize,omitempty" default:"4194304"`
 
+	// GRPCStaticStreamWindowSize sets the static stream window size for the grpc server. This can help performance
+	// when workers are overloaded or messages are large. Default is 10MB.
+	GRPCStaticStreamWindowSize int32 `mapstructure:"grpcStaticStreamWindowSize" json:"grpcStaticStreamWindowSize,omitempty" default:"10485760"`
+
 	// GRPCRateLimit is the rate limit for the grpc server. We count limits separately for the Workflow, Dispatcher and Events services. Workflow and Events service are set to this rate, Dispatcher is 10X this rate. The rate limit is per second, per engine, per api token.
 	GRPCRateLimit float64 `mapstructure:"grpcRateLimit" json:"grpcRateLimit,omitempty" default:"1000"`
 
@@ -609,6 +613,7 @@ func BindAllEnv(v *viper.Viper) {
 	_ = v.BindEnv("runtime.grpcBroadcastAddress", "SERVER_GRPC_BROADCAST_ADDRESS")
 	_ = v.BindEnv("runtime.grpcInsecure", "SERVER_GRPC_INSECURE")
 	_ = v.BindEnv("runtime.grpcMaxMsgSize", "SERVER_GRPC_MAX_MSG_SIZE")
+	_ = v.BindEnv("runtime.grpcStaticStreamWindowSize", "SERVER_GRPC_STATIC_STREAM_WINDOW_SIZE")
 	_ = v.BindEnv("runtime.grpcRateLimit", "SERVER_GRPC_RATE_LIMIT")
 	_ = v.BindEnv("runtime.webhookRateLimit", "SERVER_INCOMING_WEBHOOK_RATE_LIMIT")
 	_ = v.BindEnv("runtime.webhookRateLimitBurst", "SERVER_INCOMING_WEBHOOK_RATE_LIMIT_BURST")
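For context on the new knob: `StaticStreamWindowSize` pins each stream's HTTP/2 flow-control window instead of letting grpc-go's BDP estimation resize it. A minimal sketch of the resulting server wiring, assuming the pinned grpc-go version exposes this option (the `newServer` helper is hypothetical; values mirror the defaults above):

```go
// Sketch of the server options this change produces, with the config
// defaults inlined. newServer is an illustrative helper, not Hatchet code.
package grpcserver

import "google.golang.org/grpc"

func newServer() *grpc.Server {
	return grpc.NewServer(
		grpc.MaxRecvMsgSize(4194304), // 4MB, the grpcMaxMsgSize default
		// Pin each stream's flow-control window to 10MB rather than letting
		// BDP estimation resize it; a larger static window gives slow or
		// overloaded workers room to buffer big assigned-action payloads.
		grpc.StaticStreamWindowSize(10485760),
	)
}
```

The window can then be tuned without a rebuild via the environment variable bound above, e.g. `SERVER_GRPC_STATIC_STREAM_WINDOW_SIZE=16777216` for a 16MB window.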