mirror of
https://github.com/hatchet-dev/hatchet.git
synced 2025-12-29 12:49:45 -06:00
fix: race condition on err in pgmq (#1198)
This commit is contained in:
@@ -150,7 +150,7 @@ func (p *PostgresMessageQueue) Subscribe(queue msgqueue.Queue, preAck msgqueue.A
|
|||||||
}()
|
}()
|
||||||
|
|
||||||
doTask := func(task msgqueue.Message, ackId *int64) error {
|
doTask := func(task msgqueue.Message, ackId *int64) error {
|
||||||
err = preAck(&task)
|
err := preAck(&task)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
p.l.Error().Err(err).Msg("error pre-acking message")
|
p.l.Error().Err(err).Msg("error pre-acking message")
|
||||||
@@ -229,7 +229,7 @@ func (p *PostgresMessageQueue) Subscribe(queue msgqueue.Queue, preAck msgqueue.A
|
|||||||
|
|
||||||
// start the listener
|
// start the listener
|
||||||
go func() {
|
go func() {
|
||||||
err = p.repo.Listen(subscribeCtx, queue.Name(), func(ctx context.Context, notification *repository.PubMessage) error {
|
err := p.repo.Listen(subscribeCtx, queue.Name(), func(ctx context.Context, notification *repository.PubMessage) error {
|
||||||
// if this is an exchange queue, and the message starts with JSON '{', then we process the message directly
|
// if this is an exchange queue, and the message starts with JSON '{', then we process the message directly
|
||||||
if queue.FanoutExchangeKey() != "" && len(notification.Payload) >= 1 && notification.Payload[0] == '{' {
|
if queue.FanoutExchangeKey() != "" && len(notification.Payload) >= 1 && notification.Payload[0] == '{' {
|
||||||
var task msgqueue.Message
|
var task msgqueue.Message
|
||||||
|
|||||||
Reference in New Issue
Block a user