mirror of
https://github.com/hatchet-dev/hatchet.git
synced 2026-04-30 14:39:56 -05:00
fix: retry rabbitmq connections properly and retry published messages (#369)
This commit is contained in:
@@ -18,6 +18,9 @@ import (
|
||||
"github.com/hatchet-dev/hatchet/internal/msgqueue"
|
||||
)
|
||||
|
||||
const MAX_RETRY_COUNT = 15
|
||||
const RETRY_INTERVAL = 2 * time.Second
|
||||
|
||||
// session composes an amqp.Connection with an amqp.Channel
|
||||
type session struct {
|
||||
*amqp.Connection
|
||||
@@ -255,6 +258,10 @@ func (t *MessageQueueImpl) startPublishing() func() error {
|
||||
pub := <-session
|
||||
|
||||
for {
|
||||
if pub.Channel.IsClosed() || pub.Connection.IsClosed() {
|
||||
break
|
||||
}
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
@@ -276,9 +283,9 @@ func (t *MessageQueueImpl) startPublishing() func() error {
|
||||
Body: body,
|
||||
})
|
||||
|
||||
// TODO: retry failed delivery on the next session
|
||||
// retry failed delivery on the next session
|
||||
if err != nil {
|
||||
t.l.Error().Msgf("error publishing msg: %v", err)
|
||||
t.msgs <- msg
|
||||
return
|
||||
}
|
||||
|
||||
@@ -358,11 +365,17 @@ func (t *MessageQueueImpl) subscribe(
|
||||
return
|
||||
}
|
||||
|
||||
inner:
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case rabbitMsg := <-deliveries:
|
||||
case rabbitMsg, ok := <-deliveries:
|
||||
if !ok {
|
||||
t.l.Info().Msg("deliveries channel closed")
|
||||
break inner
|
||||
}
|
||||
|
||||
wg.Add(1)
|
||||
|
||||
go func(rabbitMsg amqp.Delivery) {
|
||||
@@ -437,6 +450,12 @@ func (t *MessageQueueImpl) subscribe(
|
||||
}(rabbitMsg)
|
||||
}
|
||||
}
|
||||
|
||||
err = sub.CloseDeadline(time.Now())
|
||||
|
||||
if err != nil {
|
||||
t.l.Error().Msgf("cannot close session: %s, %v", sub.LocalAddr().String(), err)
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
@@ -468,9 +487,25 @@ func (t *MessageQueueImpl) redial(ctx context.Context, l *zerolog.Logger, url st
|
||||
return
|
||||
}
|
||||
|
||||
newSession, err := getSession(ctx, l, url)
|
||||
var newSession session
|
||||
var err error
|
||||
|
||||
for i := 0; i < MAX_RETRY_COUNT; i++ {
|
||||
newSession, err = getSession(ctx, l, url)
|
||||
if err == nil {
|
||||
if i > 0 {
|
||||
l.Info().Msgf("re-established session after %d attempts", i)
|
||||
}
|
||||
|
||||
break
|
||||
}
|
||||
|
||||
l.Error().Msgf("error getting session (attempt %d): %v", i+1, err)
|
||||
time.Sleep(RETRY_INTERVAL)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
l.Error().Msgf("error getting session: %v", err)
|
||||
l.Error().Msgf("failed to get session after %d attempts", MAX_RETRY_COUNT)
|
||||
return
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user