fix: use separate connections for pub and sub (#2358)

* use separate connections for pub and sub

* Update internal/msgqueue/v1/rabbitmq/rabbitmq.go

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

---------

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
This commit is contained in:
abelanger5
2025-09-29 14:29:45 -04:00
committed by GitHub
parent a0afe49c85
commit 733feedbff
+20 -10
View File
@@ -41,13 +41,14 @@ type MessageQueueImpl struct {
// lru cache for tenant ids
tenantIdCache *lru.Cache[string, bool]
channels *channelPool
pubChannels *channelPool
subChannels *channelPool
deadLetterBackoff time.Duration
}
func (t *MessageQueueImpl) IsReady() bool {
return t.channels.hasActiveConnection()
return t.pubChannels.hasActiveConnection() && t.subChannels.hasActiveConnection()
}
type MessageQueueImplOpt func(*MessageQueueImplOpts)
@@ -113,13 +114,21 @@ func New(fs ...MessageQueueImplOpt) (func() error, *MessageQueueImpl) {
newLogger := opts.l.With().Str("service", "rabbitmq").Logger()
opts.l = &newLogger
channelPool, err := newChannelPool(ctx, opts.l, opts.url)
pubChannelPool, err := newChannelPool(ctx, opts.l, opts.url)
if err != nil {
cancel()
return nil, nil
}
subChannelPool, err := newChannelPool(ctx, opts.l, opts.url)
if err != nil {
pubChannelPool.Close()
cancel()
return nil, nil
}
t := &MessageQueueImpl{
ctx: ctx,
identity: identity(),
@@ -127,7 +136,8 @@ func New(fs ...MessageQueueImplOpt) (func() error, *MessageQueueImpl) {
qos: opts.qos,
configFs: fs,
disableTenantExchangePubs: opts.disableTenantExchangePubs,
channels: channelPool,
pubChannels: pubChannelPool,
subChannels: subChannelPool,
deadLetterBackoff: opts.deadLetterBackoff,
}
@@ -135,7 +145,7 @@ func New(fs ...MessageQueueImplOpt) (func() error, *MessageQueueImpl) {
t.tenantIdCache, _ = lru.New[string, bool](2000) // nolint: errcheck - this only returns an error if the size is less than 0
// init the queues in a blocking fashion
poolCh, err := channelPool.Acquire(ctx)
poolCh, err := subChannelPool.Acquire(ctx)
if err != nil {
t.l.Error().Msgf("cannot acquire channel: %v", err)
@@ -224,7 +234,7 @@ func (t *MessageQueueImpl) pubMessage(ctx context.Context, q msgqueue.Queue, msg
msg.SetOtelCarrier(otelCarrier)
poolCh, err := t.channels.Acquire(ctx)
poolCh, err := t.pubChannels.Acquire(ctx)
if err != nil {
t.l.Error().Msgf("cannot acquire channel: %v", err)
@@ -356,7 +366,7 @@ func (t *MessageQueueImpl) Subscribe(
func (t *MessageQueueImpl) RegisterTenant(ctx context.Context, tenantId string) error {
// create a new fanout exchange for the tenant
poolCh, err := t.channels.Acquire(ctx)
poolCh, err := t.pubChannels.Acquire(ctx)
if err != nil {
t.l.Error().Msgf("cannot acquire channel: %v", err)
@@ -466,7 +476,7 @@ func (t *MessageQueueImpl) initQueue(ch *amqp.Channel, q msgqueue.Queue) (string
// deleteQueue is a helper function for removing durable queues which are used for tests.
func (t *MessageQueueImpl) deleteQueue(q msgqueue.Queue) error {
poolCh, err := t.channels.Acquire(context.Background())
poolCh, err := t.subChannels.Acquire(context.Background())
if err != nil {
t.l.Error().Msgf("cannot acquire channel for deleting queue: %v", err)
@@ -524,7 +534,7 @@ func (t *MessageQueueImpl) subscribe(
var queueName string
if !q.Exclusive() {
poolCh, err := t.channels.Acquire(ctx)
poolCh, err := t.subChannels.Acquire(ctx)
if err != nil {
return nil, fmt.Errorf("cannot acquire channel for initializing queue: %v", err)
@@ -554,7 +564,7 @@ func (t *MessageQueueImpl) subscribe(
}
innerFn := func() error {
poolCh, err := t.channels.Acquire(ctx)
poolCh, err := t.subChannels.Acquire(ctx)
if err != nil {
return err