mirror of
https://github.com/hatchet-dev/hatchet.git
synced 2026-04-23 02:34:48 -05:00
fix: enhance SQL query name extraction in otel tracer and fallback (#3277)
* fix: enhance SQL query name extraction in otel tracer and fallback * feat: propagate context to logger * feat: correlation ids * feat: add tenant ids * feat: more richness for partitionable things * fix: tests * feat: capture http errors on spans * Potential fix for pull request finding Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com> --------- Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
This commit is contained in:
@@ -21,7 +21,7 @@ func (p *PostgresMessageQueue) addTenantExchangeMessage(ctx context.Context, q m
|
||||
err := p.RegisterTenant(ctx, tenantId)
|
||||
|
||||
if err != nil {
|
||||
p.l.Error().Msgf("error registering tenant exchange: %v", err)
|
||||
p.l.Error().Ctx(ctx).Msgf("error registering tenant exchange: %v", err)
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -51,7 +51,7 @@ func (p *PostgresMessageQueue) pubNonDurableMessages(ctx context.Context, queueN
|
||||
return p.repo.Notify(ctx, queueName, string(msgBytes))
|
||||
})
|
||||
} else {
|
||||
p.l.Error().Err(err).Msg("error marshalling message")
|
||||
p.l.Error().Ctx(ctx).Err(err).Msg("error marshalling message")
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -477,19 +477,19 @@ func (t *MessageQueueImpl) pubMessage(ctx context.Context, q msgqueue.Queue, msg
|
||||
err = t.RegisterTenant(ctx, msg.TenantID)
|
||||
|
||||
if err != nil {
|
||||
t.l.Error().Msgf("error registering tenant exchange: %v", err)
|
||||
t.l.Error().Ctx(ctx).Str("tenant_id", msg.TenantID.String()).Msgf("error registering tenant exchange: %v", err)
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
t.l.Debug().Msgf("publishing tenant msg to exchange %s", msg.TenantID)
|
||||
t.l.Debug().Ctx(ctx).Str("tenant_id", msg.TenantID.String()).Msgf("publishing tenant msg to exchange %s", msg.TenantID)
|
||||
|
||||
err = pub.PublishWithContext(ctx, msgqueue.GetTenantExchangeName(msg.TenantID), "", false, false, amqp.Publishing{
|
||||
Body: body,
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
t.l.Error().Msgf("error publishing tenant msg: %v", err)
|
||||
t.l.Error().Ctx(ctx).Str("tenant_id", msg.TenantID.String()).Msgf("error publishing tenant msg: %v", err)
|
||||
return err
|
||||
}
|
||||
}
|
||||
@@ -558,7 +558,7 @@ func (t *MessageQueueImpl) RegisterTenant(ctx context.Context, tenantId uuid.UUI
|
||||
poolCh, err := t.pubChannels.Acquire(ctx)
|
||||
|
||||
if err != nil {
|
||||
t.l.Error().Msgf("[RegisterTenant] cannot acquire channel: %v", err)
|
||||
t.l.Error().Ctx(ctx).Str("tenant_id", tenantId.String()).Msgf("[RegisterTenant] cannot acquire channel: %v", err)
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -571,7 +571,7 @@ func (t *MessageQueueImpl) RegisterTenant(ctx context.Context, tenantId uuid.UUI
|
||||
|
||||
defer poolCh.Release()
|
||||
|
||||
t.l.Debug().Msgf("registering tenant exchange: %s", tenantId)
|
||||
t.l.Debug().Ctx(ctx).Str("tenant_id", tenantId.String()).Msgf("registering tenant exchange: %s", tenantId)
|
||||
|
||||
// create a fanout exchange for the tenant. each consumer of the fanout exchange will get notified
|
||||
// with the tenant events.
|
||||
@@ -586,7 +586,7 @@ func (t *MessageQueueImpl) RegisterTenant(ctx context.Context, tenantId uuid.UUI
|
||||
)
|
||||
|
||||
if err != nil {
|
||||
t.l.Error().Msgf("cannot declare exchange: %q, %v", tenantId, err)
|
||||
t.l.Error().Ctx(ctx).Str("tenant_id", tenantId.String()).Msgf("cannot declare exchange: %q, %v", tenantId, err)
|
||||
return err
|
||||
}
|
||||
|
||||
|
||||
@@ -343,10 +343,11 @@ func TestDeadLetteringSuccess(t *testing.T) {
|
||||
|
||||
// deleteQueue is a helper function for removing durable queues which are used for tests.
|
||||
func (t *MessageQueueImpl) deleteQueue(q msgqueue.Queue) error {
|
||||
poolCh, err := t.subChannels.Acquire(context.Background())
|
||||
ctx := context.Background()
|
||||
poolCh, err := t.subChannels.Acquire(ctx)
|
||||
|
||||
if err != nil {
|
||||
t.l.Error().Msgf("[deleteQueue] cannot acquire channel for deleting queue: %v", err)
|
||||
t.l.Error().Ctx(ctx).Msgf("[deleteQueue] cannot acquire channel for deleting queue: %v", err)
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -362,7 +363,7 @@ func (t *MessageQueueImpl) deleteQueue(q msgqueue.Queue) error {
|
||||
_, err = ch.QueueDelete(q.Name(), true, true, false)
|
||||
|
||||
if err != nil {
|
||||
t.l.Error().Msgf("cannot delete queue: %q, %v", q.Name(), err)
|
||||
t.l.Error().Ctx(ctx).Msgf("cannot delete queue: %q, %v", q.Name(), err)
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -373,14 +374,14 @@ func (t *MessageQueueImpl) deleteQueue(q msgqueue.Queue) error {
|
||||
_, err = ch.QueueDelete(dlq1, true, true, false)
|
||||
|
||||
if err != nil {
|
||||
t.l.Error().Msgf("cannot delete dead letter queue: %q, %v", dlq1, err)
|
||||
t.l.Error().Ctx(ctx).Msgf("cannot delete dead letter queue: %q, %v", dlq1, err)
|
||||
return err
|
||||
}
|
||||
|
||||
_, err = ch.QueueDelete(dlq2, true, true, false)
|
||||
|
||||
if err != nil {
|
||||
t.l.Error().Msgf("cannot delete dead letter queue: %q, %v", dlq2, err)
|
||||
t.l.Error().Ctx(ctx).Msgf("cannot delete dead letter queue: %q, %v", dlq2, err)
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user