add visibility to stream send event (#2174)

* add visibility to stream send event

* more otel

* track down stream timings

* experimental: use PreparedMsg before writing to the stream

* add control over stream window size, add error to span if large delays in stream sends
abelanger5
2025-08-22 09:51:31 -04:00
committed by GitHub
parent 22322fdc94
commit 67aef4fa64
4 changed files with 82 additions and 3 deletions
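
The core dispatcher change below follows one pattern: encode the action into a grpc.PreparedMsg before taking the per-stream send lock, so protobuf marshalling happens outside the critical section and SendMsg only performs the wire write. A minimal standalone sketch of that pattern, assuming a hypothetical lockedStream type and send helper (illustrative stand-ins, not the Hatchet code):

package example

import (
	"fmt"
	"sync"

	"google.golang.org/grpc"
)

type lockedStream struct {
	mu     sync.Mutex        // serializes concurrent senders on one stream
	stream grpc.ServerStream // the underlying gRPC server stream
}

// send pre-encodes msg, then holds the lock only for the actual write.
func (l *lockedStream) send(msg any) error {
	prepared := &grpc.PreparedMsg{}

	// Marshal (and compress, if configured) outside the lock, so slow
	// encoding of large payloads does not block other senders.
	if err := prepared.Encode(l.stream, msg); err != nil {
		return fmt.Errorf("could not encode message: %w", err)
	}

	l.mu.Lock()
	defer l.mu.Unlock()

	// SendMsg recognizes *grpc.PreparedMsg and skips re-encoding.
	return l.stream.SendMsg(prepared)
}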

View File

@@ -9,7 +9,9 @@ import (
 	"time"
 
 	"github.com/hashicorp/go-multierror"
+	"go.opentelemetry.io/otel/codes"
 	"golang.org/x/sync/errgroup"
+	"google.golang.org/grpc"
 
 	msgqueuev1 "github.com/hatchet-dev/hatchet/internal/msgqueue/v1"
 	"github.com/hatchet-dev/hatchet/internal/queueutils"
@@ -40,10 +42,76 @@ func (worker *subscribedWorker) StartTaskFromBulk(
 	action.ActionType = contracts.ActionType_START_STEP_RUN
 	action.ActionPayload = string(inputBytes)
 
+	return worker.sendToWorker(ctx, action)
+}
+
+func (worker *subscribedWorker) sendToWorker(
+	ctx context.Context,
+	action *contracts.AssignedAction,
+) error {
+	ctx, span := telemetry.NewSpan(ctx, "send-to-worker") // nolint:ineffassign
+	defer span.End()
+
+	telemetry.WithAttributes(
+		span,
+		telemetry.AttributeKV{
+			Key:   "worker_id",
+			Value: worker.workerId,
+		},
+	)
+
+	telemetry.WithAttributes(
+		span,
+		telemetry.AttributeKV{
+			Key:   "payload_size",
+			Value: len(action.ActionPayload),
+		},
+	)
+
+	_, encodeSpan := telemetry.NewSpan(ctx, "encode-action")
+
+	msg := &grpc.PreparedMsg{}
+	err := msg.Encode(worker.stream, action)
+
+	if err != nil {
+		encodeSpan.RecordError(err)
+		encodeSpan.End()
+
+		return fmt.Errorf("could not encode action: %w", err)
+	}
+
+	encodeSpan.End()
+
+	lockBegin := time.Now()
+	_, lockSpan := telemetry.NewSpan(ctx, "acquire-worker-stream-lock")
+
 	worker.sendMu.Lock()
 	defer worker.sendMu.Unlock()
 
-	return worker.stream.Send(action)
+	lockSpan.End()
+
+	telemetry.WithAttributes(span, telemetry.AttributeKV{
+		Key:   "lock_duration_ms",
+		Value: time.Since(lockBegin).Milliseconds(),
+	})
+
+	_, streamSpan := telemetry.NewSpan(ctx, "send-worker-stream")
+	defer streamSpan.End()
+
+	sendMsgBegin := time.Now()
+
+	err = worker.stream.SendMsg(msg)
+
+	if err != nil {
+		span.RecordError(err)
+	}
+
+	if time.Since(sendMsgBegin) > 50*time.Millisecond {
+		span.SetStatus(codes.Error, "flow control detected")
+		span.RecordError(fmt.Errorf("send took too long, we may be in flow control: %s", time.Since(sendMsgBegin)))
+	}
+
+	return err
 }
 
 func (worker *subscribedWorker) CancelTask(
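
The telemetry.NewSpan and telemetry.WithAttributes calls above are Hatchet's internal wrappers. Expressed against the upstream OpenTelemetry API, the send timing and flow-control heuristic look roughly like the sketch below; the tracer name and the send callback are illustrative, and only the 50ms threshold is taken from the commit:

package example

import (
	"context"
	"errors"
	"time"

	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/codes"
)

// sendWithTiming wraps a blocking stream write in a span and flags the
// span as an error when the write stalls, which usually means the peer's
// HTTP/2 flow-control window is exhausted.
func sendWithTiming(ctx context.Context, send func() error) error {
	_, span := otel.Tracer("dispatcher").Start(ctx, "send-worker-stream")
	defer span.End()

	begin := time.Now()
	err := send()

	span.SetAttributes(attribute.Int64("send_duration_ms", time.Since(begin).Milliseconds()))

	if err != nil {
		span.RecordError(err)
	}

	// Same 50ms threshold as the commit: a slow send means the receiver
	// is not draining its window and we are likely in flow control.
	if time.Since(begin) > 50*time.Millisecond {
		span.SetStatus(codes.Error, "flow control detected")
		span.RecordError(errors.New("send took too long, we may be in flow control"))
	}

	return err
}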

View File

@@ -41,6 +41,8 @@ type subscribedWorker struct {
 	finished chan<- bool
 
 	sendMu sync.Mutex
+
+	workerId string
 }
 
 func (worker *subscribedWorker) StartStepRun(
@@ -358,7 +360,7 @@ func (s *DispatcherImpl) Listen(request *contracts.WorkerListenRequest, stream c
 	fin := make(chan bool)
 
-	s.workers.Add(request.WorkerId, sessionId, &subscribedWorker{stream: stream, finished: fin})
+	s.workers.Add(request.WorkerId, sessionId, &subscribedWorker{stream: stream, finished: fin, workerId: request.WorkerId})
 
 	defer func() {
 		// non-blocking send
@@ -480,7 +482,7 @@ func (s *DispatcherImpl) ListenV2(request *contracts.WorkerListenRequest, stream
 	fin := make(chan bool)
 
-	s.workers.Add(request.WorkerId, sessionId, &subscribedWorker{stream: stream, finished: fin})
+	s.workers.Add(request.WorkerId, sessionId, &subscribedWorker{stream: stream, finished: fin, workerId: request.WorkerId})
 
 	defer func() {
 		// non-blocking send

View File

@@ -307,6 +307,10 @@ func (s *Server) startGRPC() (func() error, error) {
 		s.config.Runtime.GRPCMaxMsgSize,
 	))
 
+	serverOpts = append(serverOpts, grpc.StaticStreamWindowSize(
+		s.config.Runtime.GRPCStaticStreamWindowSize,
+	))
+
 	grpcServer := grpc.NewServer(serverOpts...)
 
 	if s.ingestor != nil {
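
grpc.StaticStreamWindowSize is a newer, experimental grpc-go server option that pins the per-stream flow-control window instead of letting the BDP estimator resize it, which helps when large assigned-action payloads are pushed to slow or overloaded workers. A minimal sketch of wiring it up outside of Hatchet's server setup (the address and window size here are illustrative; the 10MB value mirrors the new default below):

package main

import (
	"log"
	"net"

	"google.golang.org/grpc"
)

func main() {
	lis, err := net.Listen("tcp", ":7070") // address is illustrative
	if err != nil {
		log.Fatal(err)
	}

	srv := grpc.NewServer(
		// Fix the per-stream flow-control window at 10MB rather than
		// letting dynamic window sizing shrink it under load.
		grpc.StaticStreamWindowSize(10 * 1024 * 1024),
	)

	if err := srv.Serve(lis); err != nil {
		log.Fatal(err)
	}
}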

View File

@@ -148,6 +148,10 @@ type ConfigFileRuntime struct {
 	// GRPCMaxMsgSize is the maximum message size that the grpc server will accept
 	GRPCMaxMsgSize int `mapstructure:"grpcMaxMsgSize" json:"grpcMaxMsgSize,omitempty" default:"4194304"`
 
+	// GRPCStaticStreamWindowSize sets the static stream window size for the grpc server. This can help with
+	// performance when workers are overloaded or messages are large. Default is 10MB.
+	GRPCStaticStreamWindowSize int32 `mapstructure:"grpcStaticStreamWindowSize" json:"grpcStaticStreamWindowSize,omitempty" default:"10485760"`
+
 	// GRPCRateLimit is the rate limit for the grpc server. We count limits separately for the Workflow, Dispatcher and Events services. Workflow and Events service are set to this rate, Dispatcher is 10X this rate. The rate limit is per second, per engine, per api token.
 	GRPCRateLimit float64 `mapstructure:"grpcRateLimit" json:"grpcRateLimit,omitempty" default:"1000"`
@@ -609,6 +613,7 @@ func BindAllEnv(v *viper.Viper) {
 	_ = v.BindEnv("runtime.grpcBroadcastAddress", "SERVER_GRPC_BROADCAST_ADDRESS")
 	_ = v.BindEnv("runtime.grpcInsecure", "SERVER_GRPC_INSECURE")
 	_ = v.BindEnv("runtime.grpcMaxMsgSize", "SERVER_GRPC_MAX_MSG_SIZE")
+	_ = v.BindEnv("runtime.grpcStaticStreamWindowSize", "SERVER_GRPC_STATIC_STREAM_WINDOW_SIZE")
 	_ = v.BindEnv("runtime.grpcRateLimit", "SERVER_GRPC_RATE_LIMIT")
 	_ = v.BindEnv("runtime.webhookRateLimit", "SERVER_INCOMING_WEBHOOK_RATE_LIMIT")
 	_ = v.BindEnv("runtime.webhookRateLimitBurst", "SERVER_INCOMING_WEBHOOK_RATE_LIMIT_BURST")