Feat: OTel Collector (#2863)

* feat: initial otel collector work

* chore: vendor otel protos

* feat: add rate limiter for otelcol

* fix: clean up naming

* feat: shell implementation

* feat: add buf + generate protos

* fix: naming

* fix: naming

* chore: simplify a bit

* fix: rename logger

* fix: naming cleanup

* fix: rm unused var

* fix: rm unused struct

* chore: rm vendored stuff, don't need it apparently
This commit is contained in:
matt
2026-01-28 14:10:15 -05:00
committed by GitHub
parent db724c331f
commit 3c416ee949
7 changed files with 449 additions and 35 deletions
+21
View File
@@ -23,6 +23,7 @@ import (
"github.com/hatchet-dev/hatchet/internal/services/ingestor"
"github.com/hatchet-dev/hatchet/internal/services/partition"
schedulerv1 "github.com/hatchet-dev/hatchet/internal/services/scheduler/v1"
"github.com/hatchet-dev/hatchet/internal/services/otelcol"
"github.com/hatchet-dev/hatchet/internal/services/ticker"
"github.com/hatchet-dev/hatchet/pkg/config/loader"
"github.com/hatchet-dev/hatchet/pkg/config/server"
@@ -405,6 +406,15 @@ func runV0Config(ctx context.Context, sc *server.ServerConfig) ([]Teardown, erro
return nil, fmt.Errorf("could not create admin service (v1): %w", err)
}
oc, err := otelcol.NewOTelCollector(
otelcol.WithRepository(sc.V1),
otelcol.WithLogger(sc.Logger),
)
if err != nil {
return nil, fmt.Errorf("could not create otel collector: %w", err)
}
grpcOpts := []grpc.ServerOpt{
grpc.WithConfig(sc),
grpc.WithIngestor(ei),
@@ -412,6 +422,7 @@ func runV0Config(ctx context.Context, sc *server.ServerConfig) ([]Teardown, erro
grpc.WithDispatcherV1(dv1),
grpc.WithAdmin(adminSvc),
grpc.WithAdminV1(adminv1Svc),
grpc.WithOTelCollector(oc),
grpc.WithLogger(sc.Logger),
grpc.WithAlerter(sc.Alerter),
grpc.WithTLSConfig(sc.TLSConfig),
@@ -763,6 +774,15 @@ func runV1Config(ctx context.Context, sc *server.ServerConfig) ([]Teardown, erro
return nil, fmt.Errorf("could not create admin service (v1): %w", err)
}
oc, err := otelcol.NewOTelCollector(
otelcol.WithRepository(sc.V1),
otelcol.WithLogger(sc.Logger),
)
if err != nil {
return nil, fmt.Errorf("could not create otel collector: %w", err)
}
grpcOpts := []grpc.ServerOpt{
grpc.WithConfig(sc),
grpc.WithIngestor(ei),
@@ -770,6 +790,7 @@ func runV1Config(ctx context.Context, sc *server.ServerConfig) ([]Teardown, erro
grpc.WithDispatcherV1(dv1),
grpc.WithAdmin(adminSvc),
grpc.WithAdminV1(adminv1Svc),
grpc.WithOTelCollector(oc),
grpc.WithLogger(sc.Logger),
grpc.WithAlerter(sc.Alerter),
grpc.WithTLSConfig(sc.TLSConfig),
@@ -18,6 +18,7 @@ type HatchetApiTokenRateLimiter struct {
dispatcherLimiter *rate.Limiter
workflowLimiter *rate.Limiter
adminV1Limiter *rate.Limiter
otelColLimiter *rate.Limiter
}
type HatchetRateLimiter struct {
@@ -37,8 +38,9 @@ func (rl *HatchetRateLimiter) GetOrCreateTenantRateLimiter(rateLimitToken string
eventsLimiter: rate.NewLimiter(rl.rate, rl.burst),
workflowLimiter: rate.NewLimiter(rl.rate, rl.burst),
adminV1Limiter: rate.NewLimiter(rl.rate, rl.burst),
// 10x the rate for dispatcher
// 10x the rate for dispatcher and otelcol
dispatcherLimiter: rate.NewLimiter(rl.rate*10, rl.burst*10),
otelColLimiter: rate.NewLimiter(rl.rate*10, rl.burst*10),
}
}
@@ -93,6 +95,12 @@ func (r *HatchetRateLimiter) Limit(ctx context.Context) error {
return status.Errorf(codes.ResourceExhausted, "admin rate limit exceeded")
}
case "otelcol":
if !r.GetOrCreateTenantRateLimiter(rateLimitToken).otelColLimiter.Allow() {
r.l.Info().Msgf("otel collector rate limit (%v per second) exceeded", r.GetOrCreateTenantRateLimiter(rateLimitToken).otelColLimiter.Limit())
return status.Errorf(codes.ResourceExhausted, "otel collector rate limit exceeded")
}
default:
return status.Errorf(codes.Internal, "service %s not recognized", serviceName)
}
@@ -153,6 +161,8 @@ func matchServiceName(name string) string {
return "workflow"
case strings.HasPrefix(name, "/v1.AdminService"):
return "admin"
case strings.HasPrefix(name, "/opentelemetry.proto.collector"):
return "otelcol"
default:
return "unknown"
}
+51 -34
View File
@@ -30,11 +30,13 @@ import (
"github.com/hatchet-dev/hatchet/internal/services/grpc/middleware"
"github.com/hatchet-dev/hatchet/internal/services/ingestor"
eventcontracts "github.com/hatchet-dev/hatchet/internal/services/ingestor/contracts"
"github.com/hatchet-dev/hatchet/internal/services/otelcol"
v1contracts "github.com/hatchet-dev/hatchet/internal/services/shared/proto/v1"
"github.com/hatchet-dev/hatchet/pkg/analytics"
"github.com/hatchet-dev/hatchet/pkg/config/server"
"github.com/hatchet-dev/hatchet/pkg/errors"
"github.com/hatchet-dev/hatchet/pkg/logger"
collectortracev1 "go.opentelemetry.io/proto/otlp/collector/trace/v1"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
_ "google.golang.org/grpc/encoding/gzip" // Register gzip compression codec
@@ -46,6 +48,7 @@ type Server struct {
admincontracts.UnimplementedWorkflowServiceServer
v1contracts.UnimplementedAdminServiceServer
v1contracts.UnimplementedV1DispatcherServer
collectortracev1.UnimplementedTraceServiceServer
l *zerolog.Logger
a errors.Alerter
@@ -53,32 +56,34 @@ type Server struct {
port int
bindAddress string
config *server.ServerConfig
ingestor ingestor.Ingestor
dispatcher dispatcher.Dispatcher
dispatcherv1 dispatcherv1.DispatcherService
admin admin.AdminService
adminv1 adminv1.AdminService
tls *tls.Config
insecure bool
config *server.ServerConfig
ingestor ingestor.Ingestor
dispatcher dispatcher.Dispatcher
dispatcherv1 dispatcherv1.DispatcherService
admin admin.AdminService
adminv1 adminv1.AdminService
otelCollector otelcol.OTelCollector
tls *tls.Config
insecure bool
}
type ServerOpt func(*ServerOpts)
type ServerOpts struct {
config *server.ServerConfig
l *zerolog.Logger
a errors.Alerter
analytics analytics.Analytics
port int
bindAddress string
ingestor ingestor.Ingestor
dispatcher dispatcher.Dispatcher
dispatcherv1 dispatcherv1.DispatcherService
admin admin.AdminService
adminv1 adminv1.AdminService
tls *tls.Config
insecure bool
config *server.ServerConfig
l *zerolog.Logger
a errors.Alerter
analytics analytics.Analytics
port int
bindAddress string
ingestor ingestor.Ingestor
dispatcher dispatcher.Dispatcher
dispatcherv1 dispatcherv1.DispatcherService
admin admin.AdminService
adminv1 adminv1.AdminService
otelCollector otelcol.OTelCollector
tls *tls.Config
insecure bool
}
func defaultServerOpts() *ServerOpts {
@@ -173,6 +178,12 @@ func WithAdminV1(a adminv1.AdminService) ServerOpt {
}
}
func WithOTelCollector(oc otelcol.OTelCollector) ServerOpt {
return func(opts *ServerOpts) {
opts.otelCollector = oc
}
}
func NewServer(fs ...ServerOpt) (*Server, error) {
opts := defaultServerOpts()
@@ -192,19 +203,20 @@ func NewServer(fs ...ServerOpt) (*Server, error) {
opts.l = &newLogger
return &Server{
l: opts.l,
a: opts.a,
analytics: opts.analytics,
config: opts.config,
port: opts.port,
bindAddress: opts.bindAddress,
ingestor: opts.ingestor,
dispatcher: opts.dispatcher,
dispatcherv1: opts.dispatcherv1,
admin: opts.admin,
adminv1: opts.adminv1,
tls: opts.tls,
insecure: opts.insecure,
l: opts.l,
a: opts.a,
analytics: opts.analytics,
config: opts.config,
port: opts.port,
bindAddress: opts.bindAddress,
ingestor: opts.ingestor,
dispatcher: opts.dispatcher,
dispatcherv1: opts.dispatcherv1,
admin: opts.admin,
adminv1: opts.adminv1,
otelCollector: opts.otelCollector,
tls: opts.tls,
insecure: opts.insecure,
}, nil
}
@@ -341,6 +353,11 @@ func (s *Server) startGRPC() (func() error, error) {
v1contracts.RegisterAdminServiceServer(grpcServer, s.adminv1)
}
if s.otelCollector != nil {
// Register as the standard OTLP TraceService for OTEL SDK compatibility
collectortracev1.RegisterTraceServiceServer(grpcServer, s.otelCollector)
}
go func() {
if err := grpcServer.Serve(lis); err != nil {
panic(fmt.Errorf("failed to serve: %w", err))
+56
View File
@@ -0,0 +1,56 @@
package otelcol
import (
"fmt"
"github.com/rs/zerolog"
collectortracev1 "go.opentelemetry.io/proto/otlp/collector/trace/v1"
"github.com/hatchet-dev/hatchet/pkg/repository"
)
type OTelCollector interface {
collectortracev1.TraceServiceServer
}
type OTelCollectorOpt func(*OTelCollectorOpts)
type OTelCollectorOpts struct {
repo repository.Repository
l *zerolog.Logger
}
func WithRepository(r repository.Repository) OTelCollectorOpt {
return func(opts *OTelCollectorOpts) {
opts.repo = r
}
}
func WithLogger(l *zerolog.Logger) OTelCollectorOpt {
return func(opts *OTelCollectorOpts) {
opts.l = l
}
}
func NewOTelCollector(fs ...OTelCollectorOpt) (OTelCollector, error) {
opts := &OTelCollectorOpts{}
for _, f := range fs {
f(opts)
}
if opts.repo == nil {
return nil, fmt.Errorf("repository is required. use WithRepository")
}
if opts.l == nil {
return nil, fmt.Errorf("logger is required. use WithLogger")
}
newLogger := opts.l.With().Str("service", "otel-collector").Logger()
return &otelCollectorImpl{
repo: opts.repo,
l: &newLogger,
}, nil
}
+244
View File
@@ -0,0 +1,244 @@
package otelcol
import (
"context"
"encoding/json"
"github.com/jackc/pgx/v5/pgtype"
"github.com/rs/zerolog"
collectortracev1 "go.opentelemetry.io/proto/otlp/collector/trace/v1"
commonv1 "go.opentelemetry.io/proto/otlp/common/v1"
tracev1 "go.opentelemetry.io/proto/otlp/trace/v1"
"github.com/hatchet-dev/hatchet/pkg/repository"
"github.com/hatchet-dev/hatchet/pkg/repository/sqlchelpers"
"github.com/hatchet-dev/hatchet/pkg/repository/sqlcv1"
)
const (
// keep these in sync with attributes sent from the sdks
AttrHatchetTaskRunID = "hatchet.step_run_id" // Task run external ID from SDK
AttrHatchetWorkflowRunID = "hatchet.workflow_run_id" // Workflow run ID from SDK
)
type otelCollectorImpl struct {
collectortracev1.UnimplementedTraceServiceServer
repo repository.Repository
l *zerolog.Logger
}
func (oc *otelCollectorImpl) Export(ctx context.Context, req *collectortracev1.ExportTraceServiceRequest) (*collectortracev1.ExportTraceServiceResponse, error) {
tenant, ok := ctx.Value("tenant").(*sqlcv1.Tenant)
if !ok {
oc.l.Warn().Msg("no tenant in context for trace export")
return &collectortracev1.ExportTraceServiceResponse{}, nil
}
tenantId := sqlchelpers.UUIDToStr(tenant.ID)
otelColRepo := oc.repo.OTelCollector()
if otelColRepo == nil {
oc.l.Debug().Msg("otel collector repository not configured, discarding spans")
return &collectortracev1.ExportTraceServiceResponse{}, nil
}
spans := oc.convertOTLPToSpanData(req.GetResourceSpans(), tenant.ID)
if len(spans) == 0 {
return &collectortracev1.ExportTraceServiceResponse{}, nil
}
err := otelColRepo.CreateSpans(ctx, tenantId, &repository.CreateSpansOpts{
TenantID: tenantId,
Spans: spans,
})
if err != nil {
oc.l.Error().Err(err).Msg("failed to store spans")
return &collectortracev1.ExportTraceServiceResponse{
PartialSuccess: &collectortracev1.ExportTracePartialSuccess{
RejectedSpans: int64(len(spans)),
ErrorMessage: err.Error(),
},
}, nil
}
oc.l.Debug().Int("span_count", len(spans)).Str("tenant_id", tenantId).Msg("stored spans")
return &collectortracev1.ExportTraceServiceResponse{}, nil
}
func (oc *otelCollectorImpl) convertOTLPToSpanData(resourceSpans []*tracev1.ResourceSpans, tenantID pgtype.UUID) []*repository.SpanData {
var spans []*repository.SpanData
for _, rs := range resourceSpans {
resourceAttrs := oc.serializeAttributes(rs.GetResource().GetAttributes())
for _, ss := range rs.GetScopeSpans() {
scopeName := ss.GetScope().GetName()
for _, span := range ss.GetSpans() {
spanData := &repository.SpanData{
TraceID: span.GetTraceId(),
SpanID: span.GetSpanId(),
ParentSpanID: span.GetParentSpanId(),
Name: span.GetName(),
Kind: int32(span.GetKind()),
StartTimeUnixNano: span.GetStartTimeUnixNano(),
EndTimeUnixNano: span.GetEndTimeUnixNano(),
StatusCode: int32(span.GetStatus().GetCode()),
StatusMessage: span.GetStatus().GetMessage(),
Attributes: oc.serializeAttributes(span.GetAttributes()),
Events: oc.serializeEvents(span.GetEvents()),
Links: oc.serializeLinks(span.GetLinks()),
ResourceAttributes: resourceAttrs,
TenantID: tenantID,
InstrumentationScope: scopeName,
}
oc.extractHatchetCorrelation(span.GetAttributes(), spanData)
spans = append(spans, spanData)
}
}
}
return spans
}
func (oc *otelCollectorImpl) extractHatchetCorrelation(attrs []*commonv1.KeyValue, spanData *repository.SpanData) {
for _, attr := range attrs {
switch attr.GetKey() {
case AttrHatchetTaskRunID:
if strVal := attr.GetValue().GetStringValue(); strVal != "" {
uuid := sqlchelpers.UUIDFromStr(strVal)
spanData.TaskRunExternalID = &uuid
}
case AttrHatchetWorkflowRunID:
if strVal := attr.GetValue().GetStringValue(); strVal != "" {
uuid := sqlchelpers.UUIDFromStr(strVal)
spanData.WorkflowRunID = &uuid
}
}
}
}
func (oc *otelCollectorImpl) serializeAttributes(attrs []*commonv1.KeyValue) []byte {
if len(attrs) == 0 {
return nil
}
attrMap := make(map[string]any, len(attrs))
for _, kv := range attrs {
attrMap[kv.GetKey()] = oc.anyValueToInterface(kv.GetValue())
}
data, err := json.Marshal(attrMap)
if err != nil {
oc.l.Warn().Err(err).Msg("failed to serialize attributes")
return nil
}
return data
}
func (oc *otelCollectorImpl) serializeEvents(events []*tracev1.Span_Event) []byte {
if len(events) == 0 {
return nil
}
eventList := make([]map[string]any, 0, len(events))
for _, event := range events {
eventMap := map[string]any{
"name": event.GetName(),
"time_unix_nano": event.GetTimeUnixNano(),
"dropped_attributes_count": event.GetDroppedAttributesCount(),
}
if len(event.GetAttributes()) > 0 {
attrMap := make(map[string]any, len(event.GetAttributes()))
for _, kv := range event.GetAttributes() {
attrMap[kv.GetKey()] = oc.anyValueToInterface(kv.GetValue())
}
eventMap["attributes"] = attrMap
}
eventList = append(eventList, eventMap)
}
data, err := json.Marshal(eventList)
if err != nil {
oc.l.Warn().Err(err).Msg("failed to serialize events")
return nil
}
return data
}
func (oc *otelCollectorImpl) serializeLinks(links []*tracev1.Span_Link) []byte {
if len(links) == 0 {
return nil
}
linkList := make([]map[string]any, 0, len(links))
for _, link := range links {
linkMap := map[string]any{
"trace_id": link.GetTraceId(),
"span_id": link.GetSpanId(),
"trace_state": link.GetTraceState(),
"dropped_attributes_count": link.GetDroppedAttributesCount(),
}
if len(link.GetAttributes()) > 0 {
attrMap := make(map[string]any, len(link.GetAttributes()))
for _, kv := range link.GetAttributes() {
attrMap[kv.GetKey()] = oc.anyValueToInterface(kv.GetValue())
}
linkMap["attributes"] = attrMap
}
linkList = append(linkList, linkMap)
}
data, err := json.Marshal(linkList)
if err != nil {
oc.l.Warn().Err(err).Msg("failed to serialize links")
return nil
}
return data
}
func (oc *otelCollectorImpl) anyValueToInterface(v *commonv1.AnyValue) interface{} {
if v == nil {
return nil
}
switch val := v.GetValue().(type) {
case *commonv1.AnyValue_StringValue:
return val.StringValue
case *commonv1.AnyValue_BoolValue:
return val.BoolValue
case *commonv1.AnyValue_IntValue:
return val.IntValue
case *commonv1.AnyValue_DoubleValue:
return val.DoubleValue
case *commonv1.AnyValue_ArrayValue:
arr := make([]any, 0, len(val.ArrayValue.GetValues()))
for _, item := range val.ArrayValue.GetValues() {
arr = append(arr, oc.anyValueToInterface(item))
}
return arr
case *commonv1.AnyValue_KvlistValue:
m := make(map[string]any, len(val.KvlistValue.GetValues()))
for _, kv := range val.KvlistValue.GetValues() {
m[kv.GetKey()] = oc.anyValueToInterface(kv.GetValue())
}
return m
case *commonv1.AnyValue_BytesValue:
return val.BytesValue
default:
return nil
}
}
+54
View File
@@ -0,0 +1,54 @@
package repository
import (
"context"
"github.com/jackc/pgx/v5/pgtype"
)
type SpanData struct {
TraceID []byte // 16 bytes
SpanID []byte // 8 bytes
ParentSpanID []byte // optional
Name string
Kind int32
StartTimeUnixNano uint64
EndTimeUnixNano uint64
StatusCode int32
StatusMessage string
Attributes []byte
Events []byte
Links []byte
ResourceAttributes []byte
TaskRunExternalID *pgtype.UUID // from hatchet.step_run_id attribute
WorkflowRunID *pgtype.UUID // from hatchet.workflow_run_id attribute
TenantID pgtype.UUID // from auth context
InstrumentationScope string
}
type CreateSpansOpts struct {
TenantID string `validate:"required,uuid"`
Spans []*SpanData
}
type OTelCollectorRepository interface {
CreateSpans(ctx context.Context, tenantId string, opts *CreateSpansOpts) error
}
type otelCollectorRepositoryImpl struct {
*sharedRepository
}
func newOTelCollectorRepository(s *sharedRepository) OTelCollectorRepository {
return &otelCollectorRepositoryImpl{
sharedRepository: s,
}
}
func (o *otelCollectorRepositoryImpl) CreateSpans(ctx context.Context, tenantId string, opts *CreateSpansOpts) error {
// intentional no-op, intended to be overridden
return nil
}
+12
View File
@@ -53,6 +53,8 @@ type Repository interface {
User() UserRepository
UserSession() UserSessionRepository
WorkflowSchedules() WorkflowScheduleRepository
OTelCollector() OTelCollectorRepository
OverwriteOTelCollectorRepository(o OTelCollectorRepository)
}
type repositoryImpl struct {
@@ -86,6 +88,7 @@ type repositoryImpl struct {
user UserRepository
userSession UserSessionRepository
workflowSchedules WorkflowScheduleRepository
otelcol OTelCollectorRepository
}
func NewRepository(
@@ -138,6 +141,7 @@ func NewRepository(
user: newUserRepository(shared),
userSession: newUserSessionRepository(shared),
workflowSchedules: newWorkflowScheduleRepository(shared),
otelcol: newOTelCollectorRepository(shared),
}
return impl, func() error {
@@ -286,3 +290,11 @@ func (r *repositoryImpl) UserSession() UserSessionRepository {
func (r *repositoryImpl) WorkflowSchedules() WorkflowScheduleRepository {
return r.workflowSchedules
}
func (r *repositoryImpl) OTelCollector() OTelCollectorRepository {
return r.otelcol
}
func (r *repositoryImpl) OverwriteOTelCollectorRepository(o OTelCollectorRepository) {
r.otelcol = o
}