Send create:user Event from OAuth Flow (#2683)

* feat: Send create:user event from OAuth flow

* feat: Implement user and tenant creation events in callbacks

* move callback into cb.Do

---------

Co-authored-by: Alexander Belanger <alexander@hatchet.run>
This commit is contained in:
Andrei Gaspar
2026-01-06 20:06:38 +01:00
committed by GitHub
parent 7d662d42a4
commit 4dda2b2884
13 changed files with 171 additions and 125 deletions
+5 -10
View File
@@ -4,7 +4,6 @@ import (
"context"
"encoding/json"
"fmt"
"sync"
"time"
"github.com/google/uuid"
@@ -26,9 +25,6 @@ type PostgresMessageQueue struct {
l *zerolog.Logger
qos int
upsertedQueues map[string]bool
upsertedQueuesMu sync.RWMutex
ttlCache *cache.TTLCache[string, bool]
configFs []MessageQueueImplOpt
@@ -74,12 +70,11 @@ func NewPostgresMQ(repo v1.MessageQueueRepository, fs ...MessageQueueImplOpt) (f
c := cache.NewTTL[string, bool]()
p := &PostgresMessageQueue{
repo: repo,
l: opts.l,
qos: opts.qos,
upsertedQueues: make(map[string]bool),
configFs: fs,
ttlCache: c,
repo: repo,
l: opts.l,
qos: opts.qos,
configFs: fs,
ttlCache: c,
}
err := p.upsertQueue(context.Background(), msgqueue.TASK_PROCESSING_QUEUE)
-47
View File
@@ -629,53 +629,6 @@ func (t *MessageQueueImpl) initQueue(ch *amqp.Channel, q msgqueue.Queue) (string
return name, nil
}
// deleteQueue is a helper function for removing durable queues which are used for tests.
func (t *MessageQueueImpl) deleteQueue(q msgqueue.Queue) error {
poolCh, err := t.subChannels.Acquire(context.Background())
if err != nil {
t.l.Error().Msgf("[deleteQueue] cannot acquire channel for deleting queue: %v", err)
return err
}
ch := poolCh.Value()
if ch.IsClosed() {
poolCh.Destroy()
return fmt.Errorf("channel is closed")
}
defer poolCh.Release()
_, err = ch.QueueDelete(q.Name(), true, true, false)
if err != nil {
t.l.Error().Msgf("cannot delete queue: %q, %v", q.Name(), err)
return err
}
if q.DLQ().Name() != "" {
dlq1 := getTmpDLQName(q.DLQ().Name())
dlq2 := getProcDLQName(q.DLQ().Name())
_, err = ch.QueueDelete(dlq1, true, true, false)
if err != nil {
t.l.Error().Msgf("cannot delete dead letter queue: %q, %v", dlq1, err)
return err
}
_, err = ch.QueueDelete(dlq2, true, true, false)
if err != nil {
t.l.Error().Msgf("cannot delete dead letter queue: %q, %v", dlq2, err)
return err
}
}
return nil
}
func (t *MessageQueueImpl) subscribe(
ctx context.Context,
subId string,
@@ -338,3 +338,50 @@ func TestDeadLetteringSuccess(t *testing.T) {
t.Fatalf("error cleaning up queue: %v", err)
}
}
// deleteQueue is a helper function for removing durable queues which are used for tests.
func (t *MessageQueueImpl) deleteQueue(q msgqueue.Queue) error {
poolCh, err := t.subChannels.Acquire(context.Background())
if err != nil {
t.l.Error().Msgf("[deleteQueue] cannot acquire channel for deleting queue: %v", err)
return err
}
ch := poolCh.Value()
if ch.IsClosed() {
poolCh.Destroy()
return fmt.Errorf("channel is closed")
}
defer poolCh.Release()
_, err = ch.QueueDelete(q.Name(), true, true, false)
if err != nil {
t.l.Error().Msgf("cannot delete queue: %q, %v", q.Name(), err)
return err
}
if q.DLQ().Name() != "" {
dlq1 := getTmpDLQName(q.DLQ().Name())
dlq2 := getProcDLQName(q.DLQ().Name())
_, err = ch.QueueDelete(dlq1, true, true, false)
if err != nil {
t.l.Error().Msgf("cannot delete dead letter queue: %q, %v", dlq1, err)
return err
}
_, err = ch.QueueDelete(dlq2, true, true, false)
if err != nil {
t.l.Error().Msgf("cannot delete dead letter queue: %q, %v", dlq2, err)
return err
}
}
return nil
}