feat: postgres-backed message queue (#1119)

This commit is contained in:
abelanger5
2024-12-18 09:00:54 -05:00
committed by GitHub
parent c696263d20
commit dcb67a1dac
29 changed files with 1588 additions and 30 deletions
+165
View File
@@ -197,6 +197,95 @@ jobs:
- name: Teardown
run: docker compose down
e2e-pgmq:
runs-on: ubuntu-latest
timeout-minutes: 30
env:
DATABASE_URL: postgresql://hatchet:hatchet@127.0.0.1:5431/hatchet?sslmode=disable
steps:
- uses: actions/checkout@v4
- name: Install Task
uses: arduino/setup-task@v2
- name: Setup Go
uses: actions/setup-go@v5
with:
go-version: "1.22"
- name: Setup pnpm
uses: pnpm/action-setup@v4
with:
version: 9.1.1
run_install: false
- name: Install Atlas
run: |
curl -sSf https://atlasgo.sh | sh
- name: Compose
run: docker compose up -d
- name: Go deps
run: go mod download
- name: Prepare
run: |
cat > .env <<EOF
HATCHET_CLIENT_TENANT_ID=707d0855-80ab-4e1f-a156-f1c4546cbf52
DATABASE_URL="postgresql://hatchet:hatchet@127.0.0.1:5431/hatchet"
HATCHET_CLIENT_TLS_ROOT_CA_FILE=./hack/dev/certs/ca.cert
HATCHET_CLIENT_TLS_SERVER_NAME="cluster"
SERVER_TLS_CERT_FILE=./hack/dev/certs/cluster.pem
SERVER_TLS_KEY_FILE=./hack/dev/certs/cluster.key
SERVER_TLS_ROOT_CA_FILE=./hack/dev/certs/ca.cert
SERVER_ENCRYPTION_MASTER_KEYSET_FILE=./hack/dev/encryption-keys/master.key
SERVER_ENCRYPTION_JWT_PRIVATE_KEYSET_FILE=./hack/dev/encryption-keys/private_ec256.key
SERVER_ENCRYPTION_JWT_PUBLIC_KEYSET_FILE=./hack/dev/encryption-keys/public_ec256.key
SERVER_LOGGER_LEVEL=warn
SERVER_LOGGER_FORMAT=console
DATABASE_LOGGER_LEVEL=warn
DATABASE_LOGGER_FORMAT=console
SERVER_PORT=8080
SERVER_URL=http://localhost:8080
SERVER_AUTH_COOKIE_SECRETS="kPpegRDNpofgkUsr HoWe67haMOF5qnaB"
SERVER_AUTH_COOKIE_DOMAIN=app.dev.hatchet-tools.com
SERVER_AUTH_COOKIE_INSECURE=false
SERVER_AUTH_SET_EMAIL_VERIFIED=true
SERVER_TASKQUEUE_KIND=postgres
EOF
- name: Generate
run: |
sh ./hack/db/atlas-apply.sh
task generate-go
task generate-certs
task generate-local-encryption-keys
- name: Run engine
run: |
set -a
. .env
set +a
go run ./cmd/hatchet-admin quickstart
go run ./cmd/hatchet-engine --config ./generated/ &
go run ./cmd/hatchet-api --config ./generated/ &
sleep 30
- name: Test
run: |
go test -tags e2e ./... -p 1 -v -failfast
- name: Teardown
run: docker compose down
load:
runs-on: ubuntu-latest
timeout-minutes: 30
@@ -271,3 +360,79 @@ jobs:
- name: Teardown
run: docker compose down
load-pgmq:
runs-on: ubuntu-latest
timeout-minutes: 30
env:
DATABASE_URL: postgresql://hatchet:hatchet@127.0.0.1:5431/hatchet?sslmode=disable
steps:
- uses: actions/checkout@v4
- name: Install Task
uses: arduino/setup-task@v2
- name: Setup Go
uses: actions/setup-go@v5
with:
go-version: "1.22"
- name: Setup pnpm
uses: pnpm/action-setup@v4
with:
version: 9.1.1
run_install: false
- name: Install Atlas
run: |
curl -sSf https://atlasgo.sh | sh
- name: Compose
run: docker compose up -d
- name: Go deps
run: go mod download
- name: Prepare
run: |
cat > .env <<EOF
DATABASE_URL='postgresql://hatchet:hatchet@127.0.0.1:5431/hatchet'
SERVER_TLS_CERT_FILE=./hack/dev/certs/cluster.pem
SERVER_TLS_KEY_FILE=./hack/dev/certs/cluster.key
SERVER_TLS_ROOT_CA_FILE=./hack/dev/certs/ca.cert
SERVER_PORT=8080
SERVER_URL=http://localhost:8080
SERVER_AUTH_COOKIE_SECRETS="something something"
SERVER_AUTH_COOKIE_DOMAIN=app.dev.hatchet-tools.com
SERVER_AUTH_COOKIE_INSECURE=false
SERVER_AUTH_SET_EMAIL_VERIFIED=true
SERVER_LOGGER_LEVEL=warn
SERVER_LOGGER_FORMAT=console
DATABASE_LOGGER_LEVEL=warn
DATABASE_LOGGER_FORMAT=console
SERVER_TASKQUEUE_KIND=postgres
EOF
- name: Generate
run: |
sh ./hack/db/atlas-apply.sh
task generate-go
task generate-certs
task generate-local-encryption-keys
- name: Setup
run: |
set -a
. .env
set +a
go run ./cmd/hatchet-admin quickstart --generated-config-dir ./generated/
- name: Test
run: |
export HATCHET_CLIENT_TOKEN="$(go run ./cmd/hatchet-admin token create --config ./generated/ --tenant-id 707d0855-80ab-4e1f-a156-f1c4546cbf52)"
RAMP_UP_DURATION_TIMEOUT=20s go test -tags load ./... -p 1 -v -race -failfast
- name: Teardown
run: docker compose down
@@ -278,7 +278,6 @@ func (m *MonitoringService) run(ctx context.Context, cf clientconfig.ClientConfi
_, err = m.config.APIRepository.Workflow().DeleteWorkflow(ctx, cf.TenantId, workflowId)
if err != nil {
m.l.Error().Msgf("error deleting workflow: %s", err)
} else {
+16 -1
View File
@@ -5,6 +5,7 @@ package rampup
import (
"context"
"log"
"os"
"sync"
"testing"
"time"
@@ -41,6 +42,20 @@ func TestRampUp(t *testing.T) {
"loadtest",
)
// get ramp up duration from env
maxAcceptableDurationSeconds := 2 * time.Second
if os.Getenv("RAMP_UP_DURATION_TIMEOUT") != "" {
var parseErr error
maxAcceptableDurationSeconds, parseErr = time.ParseDuration(os.Getenv("RAMP_UP_DURATION_TIMEOUT"))
if parseErr != nil {
t.Fatalf("could not parse RAMP_UP_DURATION_TIMEOUT %s: %s", os.Getenv("RAMP_UP_DURATION_TIMEOUT"), parseErr)
}
}
log.Printf("TestRampUp with maxAcceptableDurationSeconds: %s", maxAcceptableDurationSeconds.String())
tests := []struct {
name string
args args
@@ -55,7 +70,7 @@ func TestRampUp(t *testing.T) {
delay: 0 * time.Second,
wait: 30 * time.Second,
includeDroppedEvents: true,
maxAcceptableDuration: 2 * time.Second,
maxAcceptableDuration: maxAcceptableDurationSeconds,
maxAcceptableSchedule: 2 * time.Second,
concurrency: 0,
},
+1
View File
@@ -18,6 +18,7 @@ require (
github.com/hatchet-dev/timediff v0.0.4
github.com/jackc/pgx-zerolog v0.0.0-20230315001418-f978528409eb
github.com/jackc/pgx/v5 v5.7.1
github.com/jackc/pgxlisten v0.0.0-20241106001234-1d6f6656415c
github.com/jackc/puddle/v2 v2.2.2
github.com/joho/godotenv v1.5.1
github.com/labstack/echo/v4 v4.13.1
+2
View File
@@ -154,6 +154,8 @@ github.com/jackc/pgx-zerolog v0.0.0-20230315001418-f978528409eb h1:pSv+zRVeAYjbX
github.com/jackc/pgx-zerolog v0.0.0-20230315001418-f978528409eb/go.mod h1:CRUuPsmIajLt3dZIlJ5+O8IDSib6y8yrst8DkCthTa4=
github.com/jackc/pgx/v5 v5.7.1 h1:x7SYsPBYDkHDksogeSmZZ5xzThcTgRz++I5E+ePFUcs=
github.com/jackc/pgx/v5 v5.7.1/go.mod h1:e7O26IywZZ+naJtWWos6i6fvWK+29etgITqrqHLfoZA=
github.com/jackc/pgxlisten v0.0.0-20241106001234-1d6f6656415c h1:bTgmg761ac9Ki27HoLx8IBvc+T+Qj6eptBpKahKIRT4=
github.com/jackc/pgxlisten v0.0.0-20241106001234-1d6f6656415c/go.mod h1:N4E1APLOYrbM11HH5kdqAjDa8RJWVwD3JqWpvH22h64=
github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo=
github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4=
github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0=
+16
View File
@@ -0,0 +1,16 @@
package postgres
import (
"context"
)
// addTenantExchangeMessage publishes a tenant exchange message, choosing the
// transport based on payload size: Postgres NOTIFY payloads are limited to
// 8000 bytes by default, so anything larger is persisted as a queue row.
func (p *PostgresMessageQueue) addTenantExchangeMessage(ctx context.Context, tenantId string, msgBytes []byte) error {
	const notifyLimitBytes = 8000

	if len(msgBytes) <= notifyLimitBytes {
		// small payload: deliver directly over the pub/sub channel
		return p.repo.Notify(ctx, tenantId, string(msgBytes))
	}

	// large payload: store in the database so subscribers pick it up on poll
	return p.repo.AddMessage(ctx, tenantId, msgBytes)
}
+331
View File
@@ -0,0 +1,331 @@
package postgres
import (
"context"
"encoding/json"
"sync"
"time"
"github.com/google/uuid"
"github.com/hashicorp/go-multierror"
"github.com/rs/zerolog"
"golang.org/x/sync/errgroup"
"github.com/hatchet-dev/hatchet/internal/msgqueue"
"github.com/hatchet-dev/hatchet/internal/queueutils"
"github.com/hatchet-dev/hatchet/internal/telemetry"
"github.com/hatchet-dev/hatchet/pkg/logger"
"github.com/hatchet-dev/hatchet/pkg/repository"
"github.com/hatchet-dev/hatchet/pkg/repository/prisma/dbsqlc"
)
// PostgresMessageQueue is a msgqueue.MessageQueue implementation backed by
// Postgres tables plus LISTEN/NOTIFY, via MessageQueueRepository.
type PostgresMessageQueue struct {
	repo repository.MessageQueueRepository
	l    *zerolog.Logger
	// qos is the maximum number of messages read per poll (prefetch count)
	qos int
	// upsertedQueues caches queue names already bound so BindQueue runs at
	// most once per queue per process; guarded by upsertedQueuesMu
	upsertedQueues   map[string]bool
	upsertedQueuesMu sync.RWMutex
}

// MessageQueueImplOpt mutates MessageQueueImplOpts; applied by NewPostgresMQ.
type MessageQueueImplOpt func(*MessageQueueImplOpts)

// MessageQueueImplOpts holds constructor options for the Postgres queue.
type MessageQueueImplOpts struct {
	l   *zerolog.Logger
	qos int
}
// defaultMessageQueueImplOpts returns the option set used when the caller
// provides no overrides: a default "postgresmq" logger and a QOS of 100.
func defaultMessageQueueImplOpts() *MessageQueueImplOpts {
	defaultLogger := logger.NewDefaultLogger("postgresmq")

	opts := &MessageQueueImplOpts{
		l:   &defaultLogger,
		qos: 100,
	}

	return opts
}
// WithLogger overrides the default logger for the message queue.
func WithLogger(l *zerolog.Logger) MessageQueueImplOpt {
	return func(o *MessageQueueImplOpts) {
		o.l = l
	}
}
// WithQos overrides the default prefetch count (messages read per poll).
func WithQos(qos int) MessageQueueImplOpt {
	return func(o *MessageQueueImplOpts) {
		o.qos = qos
	}
}
// NewPostgresMQ constructs a Postgres-backed message queue on top of the given
// repository, applying any functional options over the defaults.
func NewPostgresMQ(repo repository.MessageQueueRepository, fs ...MessageQueueImplOpt) *PostgresMessageQueue {
	opts := defaultMessageQueueImplOpts()

	for _, apply := range fs {
		apply(opts)
	}

	mq := &PostgresMessageQueue{
		repo:           repo,
		l:              opts.l,
		qos:            opts.qos,
		upsertedQueues: map[string]bool{},
	}

	return mq
}
// cleanup releases resources held by the queue. The Postgres implementation
// holds no connections of its own (the repository owns the pool), so this is
// currently a no-op.
func (p *PostgresMessageQueue) cleanup() error {
	return nil
}
// Clone returns a cleanup function and an independent copy of this message
// queue. The copy shares the underlying repository but gets its own queue
// cache. The configured logger and QOS are propagated to the clone —
// previously the clone was built without options, silently resetting both
// to their defaults.
func (p *PostgresMessageQueue) Clone() (func() error, msgqueue.MessageQueue) {
	pCp := NewPostgresMQ(p.repo, WithLogger(p.l), WithQos(p.qos))

	return pCp.cleanup, pCp
}
// SetQOS sets the prefetch count: the maximum number of messages read from
// the database in a single poll.
//
// NOTE(review): p.qos is read by subscriber goroutines without
// synchronization, so call this before Subscribe — confirm with callers.
func (p *PostgresMessageQueue) SetQOS(prefetchCount int) {
	p.qos = prefetchCount
}
// AddMessage enqueues a message on the given queue. It ensures the queue row
// exists, persists the JSON-encoded message, and additionally publishes to
// the tenant exchange when the message carries a tenant ID.
func (p *PostgresMessageQueue) AddMessage(ctx context.Context, queue msgqueue.Queue, task *msgqueue.Message) error {
	ctx, span := telemetry.NewSpan(ctx, "add-message")
	defer span.End()

	// attach the otel carrier so consumers can continue the trace
	if task.OtelCarrier == nil {
		task.OtelCarrier = telemetry.GetCarrier(ctx)
	}

	if err := p.upsertQueue(ctx, queue); err != nil {
		return err
	}

	msgBytes, err := json.Marshal(task)

	if err != nil {
		p.l.Error().Err(err).Msg("error marshalling message")
		return err
	}

	if err := p.repo.AddMessage(ctx, queue.Name(), msgBytes); err != nil {
		p.l.Error().Err(err).Msg("error adding message")
		return err
	}

	// fan out to the tenant exchange if the message belongs to a tenant
	if tenantId := task.TenantID(); tenantId != "" {
		return p.addTenantExchangeMessage(ctx, tenantId, msgBytes)
	}

	return nil
}
// Subscribe starts consuming messages from the given queue, invoking preAck
// before and postAck after each message is acknowledged. It returns a cleanup
// function which stops all background goroutines.
//
// Fixes over the previous version:
//   - a message whose payload fails to unmarshal is no longer delivered to
//     the hooks as a zero-value task (we skip it and record the error);
//   - goroutines no longer write to the enclosing function's err variable
//     (a data race with the caller reading the returned error);
//   - newMsgCh is no longer closed on cleanup, which could panic with a
//     concurrent send from the listener; sends are guarded by the context;
//   - a ReadMessages failure is now surfaced to the operation pool.
func (p *PostgresMessageQueue) Subscribe(queue msgqueue.Queue, preAck msgqueue.AckHook, postAck msgqueue.AckHook) (func() error, error) {
	if err := p.upsertQueue(context.Background(), queue); err != nil {
		return nil, err
	}

	subscribeCtx, cancel := context.WithCancel(context.Background())

	// spawn a goroutine to update the lastActive time on the message queue
	// every 60 seconds, so auto-deleted queues are not cleaned up while a
	// subscriber is attached
	go func() {
		heartbeat := time.NewTicker(60 * time.Second)
		defer heartbeat.Stop()

		for {
			select {
			case <-subscribeCtx.Done():
				return
			case <-heartbeat.C:
				if err := p.repo.UpdateQueueLastActive(subscribeCtx, queue.Name()); err != nil {
					p.l.Error().Err(err).Msg("error updating lastActive time")
				}
			}
		}
	}()

	// doTask runs the pre-ack hook, acks the message (when it was read from
	// the database; NOTIFY-delivered messages have no row, so ackId is nil),
	// and then runs the post-ack hook.
	doTask := func(task msgqueue.Message, ackId *int64) error {
		if err := preAck(&task); err != nil {
			p.l.Error().Err(err).Msg("error pre-acking message")
			return err
		}

		if ackId != nil {
			if err := p.repo.AckMessage(subscribeCtx, *ackId); err != nil {
				p.l.Error().Err(err).Msg("error acking message")
				return err
			}
		}

		if err := postAck(&task); err != nil {
			p.l.Error().Err(err).Msg("error post-acking message")
			return err
		}

		return nil
	}

	// do processes a batch of database rows, collecting (not aborting on)
	// per-message errors so one bad message doesn't stall the batch.
	do := func(messages []*dbsqlc.ReadMessagesRow) error {
		var errs error

		for _, message := range messages {
			var task msgqueue.Message

			if err := json.Unmarshal(message.Payload, &task); err != nil {
				p.l.Error().Err(err).Msg("error unmarshalling message")
				errs = multierror.Append(errs, err)

				// do not deliver a zero-value task to the hooks
				continue
			}

			if err := doTask(task, &message.ID); err != nil {
				p.l.Error().Err(err).Msg("error running task")
				errs = multierror.Append(errs, err)
			}
		}

		return errs
	}

	op := queueutils.NewOperationPool(p.l, 60*time.Second, "postgresmq", queueutils.OpMethod(func(ctx context.Context, id string) (bool, error) {
		messages, err := p.repo.ReadMessages(subscribeCtx, queue.Name(), p.qos)

		if err != nil {
			p.l.Error().Err(err).Msg("error reading messages")
			return false, err
		}

		if err := do(messages); err != nil {
			p.l.Error().Err(err).Msg("error processing messages")
		}

		// a full batch suggests more messages are waiting; ask to continue
		return len(messages) == p.qos, nil
	}))

	// we poll once per second for new messages
	ticker := time.NewTicker(time.Second)

	// the listener signals this channel when new messages arrive so we can
	// react faster than the 1-second ticker
	newMsgCh := make(chan struct{})

	// start the listener
	go func() {
		listenErr := p.repo.Listen(subscribeCtx, queue.Name(), func(ctx context.Context, notification *repository.PubMessage) error {
			// if this is an exchange queue and the payload starts with '{',
			// the notification carries the message itself: process directly
			if queue.FanoutExchangeKey() != "" && len(notification.Payload) >= 1 && notification.Payload[0] == '{' {
				var task msgqueue.Message

				if err := json.Unmarshal([]byte(notification.Payload), &task); err != nil {
					p.l.Error().Err(err).Msg("error unmarshalling message")
					return err
				}

				return doTask(task, nil)
			}

			// otherwise this is just a wakeup signal; guard the send with the
			// subscribe context so we never block (or send) after cleanup
			select {
			case newMsgCh <- struct{}{}:
			case <-subscribeCtx.Done():
			}

			return nil
		})

		if listenErr != nil {
			p.l.Error().Err(listenErr).Msg("error listening for new messages")
		}
	}()

	go func() {
		for {
			select {
			case <-subscribeCtx.Done():
				return
			case <-ticker.C:
				op.RunOrContinue(queue.Name())
			case <-newMsgCh:
				op.RunOrContinue(queue.Name())
			}
		}
	}()

	return func() error {
		cancel()
		ticker.Stop()

		// newMsgCh is intentionally not closed: the listener goroutine may
		// still attempt a send, and the context guard above makes it safe to
		// simply abandon the channel to the garbage collector
		return nil
	}, nil
}
// RegisterTenant is a no-op for the Postgres implementation: there are no
// per-tenant exchanges to declare, since tenant messages are delivered over a
// NOTIFY channel named after the tenant ID (see addTenantExchangeMessage).
func (p *PostgresMessageQueue) RegisterTenant(ctx context.Context, tenantId string) error {
	return nil
}
// IsReady always reports true; connectivity is delegated to the repository's
// connection pool rather than tracked here.
func (p *PostgresMessageQueue) IsReady() bool {
	return true
}
// upsertQueue binds the queue in the database at most once per process,
// caching successful binds in upsertedQueues. The fast path takes only a
// read lock; the write lock is taken after the bind succeeds.
func (p *PostgresMessageQueue) upsertQueue(ctx context.Context, queue msgqueue.Queue) error {
	// fast path: check the cache under a read lock
	p.upsertedQueuesMu.RLock()

	// check if the queue has been upserted
	if _, exists := p.upsertedQueues[queue.Name()]; exists {
		p.upsertedQueuesMu.RUnlock()
		return nil
	}

	// NOTE(review): between this RUnlock and the Lock below, two goroutines
	// can both call BindQueue for the same queue; BindQueue is an upsert so
	// the duplicate work looks benign — confirm against the repository.
	p.upsertedQueuesMu.RUnlock()

	exclusive := queue.Exclusive()

	// If the queue is a fanout exchange, then it is not exclusive. This is different from the RabbitMQ
	// implementation, where a fanout exchange will map to an exclusively bound queue which has a random
	// suffix appended to the queue name. In this implementation, there is no concept of an exchange.
	if queue.FanoutExchangeKey() != "" {
		exclusive = false
	}

	// exclusive queues are bound with a unique consumer id
	var consumer *string

	if exclusive {
		str := uuid.New().String()
		consumer = &str
	}

	// bind the queue
	err := p.repo.BindQueue(ctx, queue.Name(), queue.Durable(), queue.AutoDeleted(), exclusive, consumer)

	if err != nil {
		p.l.Error().Err(err).Msg("error binding queue")
		return err
	}

	// record the successful bind under the write lock
	p.upsertedQueuesMu.Lock()
	defer p.upsertedQueuesMu.Unlock()

	p.upsertedQueues[queue.Name()] = true

	return nil
}
@@ -265,6 +265,18 @@ func (rc *RetentionControllerImpl) Start() (func() error, error) {
cancel()
return nil, fmt.Errorf("could not set up runDeleteRetryQueueItems: %w", err)
}
_, err = rc.s.NewJob(
gocron.DurationJob(queueInterval),
gocron.NewTask(
rc.runDeleteMessageQueueItems(ctx),
),
)
if err != nil {
cancel()
return nil, fmt.Errorf("could not set up runDeleteMessageQueueItems: %w", err)
}
}
rc.s.Start()
@@ -0,0 +1,41 @@
package retention
import (
"context"
"time"
"github.com/hatchet-dev/hatchet/internal/telemetry"
"github.com/hatchet-dev/hatchet/pkg/repository/prisma/dbsqlc"
)
// runDeleteMessageQueueItems returns a scheduled task that purges expired
// message queue items and stale queues, bounded to 60 seconds per run.
func (rc *RetentionControllerImpl) runDeleteMessageQueueItems(ctx context.Context) func() {
	return func() {
		runCtx, cancel := context.WithTimeout(ctx, 60*time.Second)
		defer cancel()

		rc.l.Debug().Msgf("retention controller: deleting queue items")

		if err := rc.ForTenants(runCtx, rc.runDeleteMessageQueueItemsTenant); err != nil {
			rc.l.Err(err).Msg("could not run delete queue items")
		}
	}
}
// runDeleteMessageQueueItemsTenant removes stale queues and expired queue
// items. The MessageQueue repository methods are not tenant-scoped, so the
// cleanup runs only once, piggybacking on the "internal" tenant's iteration.
func (rc *RetentionControllerImpl) runDeleteMessageQueueItemsTenant(ctx context.Context, tenant dbsqlc.Tenant) error {
	ctx, span := telemetry.NewSpan(ctx, "delete-queue-items-tenant")
	defer span.End()

	// skip all tenants except "internal" so the global cleanup runs once
	if tenant.Name != "internal" {
		return nil
	}

	if err := rc.repo.MessageQueue().CleanupQueues(ctx); err != nil {
		return err
	}

	return rc.repo.MessageQueue().CleanupMessageQueueItems(ctx)
}
+18 -6
View File
@@ -21,6 +21,7 @@ import (
"github.com/hatchet-dev/hatchet/internal/integrations/email"
"github.com/hatchet-dev/hatchet/internal/integrations/email/postmark"
"github.com/hatchet-dev/hatchet/internal/msgqueue"
"github.com/hatchet-dev/hatchet/internal/msgqueue/postgres"
"github.com/hatchet-dev/hatchet/internal/msgqueue/rabbitmq"
"github.com/hatchet-dev/hatchet/internal/services/ingestor"
"github.com/hatchet-dev/hatchet/pkg/analytics"
@@ -278,12 +279,23 @@ func GetServerConfigFromConfigfile(dc *database.Config, cf *server.ServerConfigF
var ing ingestor.Ingestor
if cf.MessageQueue.Enabled {
cleanup1, mq = rabbitmq.New(
rabbitmq.WithURL(cf.MessageQueue.RabbitMQ.URL),
rabbitmq.WithLogger(&l),
rabbitmq.WithQos(cf.MessageQueue.RabbitMQ.Qos),
rabbitmq.WithDisableTenantExchangePubs(cf.Runtime.DisableTenantPubs),
)
switch strings.ToLower(cf.MessageQueue.Kind) {
case "postgres":
l.Warn().Msg("Using a Postgres-backed message queue. This feature is still in beta.")
mq = postgres.NewPostgresMQ(
dc.EngineRepository.MessageQueue(),
postgres.WithLogger(&l),
postgres.WithQos(cf.MessageQueue.Postgres.Qos),
)
case "rabbitmq":
cleanup1, mq = rabbitmq.New(
rabbitmq.WithURL(cf.MessageQueue.RabbitMQ.URL),
rabbitmq.WithLogger(&l),
rabbitmq.WithQos(cf.MessageQueue.RabbitMQ.Qos),
rabbitmq.WithDisableTenantExchangePubs(cf.Runtime.DisableTenantPubs),
)
}
ing, err = ingestor.NewIngestor(
ingestor.WithEventRepository(dc.EngineRepository.Event()),
+7 -1
View File
@@ -338,11 +338,17 @@ type ConfigFileAuthCookie struct {
type MessageQueueConfigFile struct {
Enabled bool `mapstructure:"enabled" json:"enabled,omitempty" default:"true"`
Kind string `mapstructure:"kind" json:"kind,omitempty" validate:"required"`
Kind string `mapstructure:"kind" json:"kind,omitempty" validate:"required" default:"rabbitmq"`
Postgres PostgresMQConfigFile `mapstructure:"postgres" json:"postgres,omitempty"`
RabbitMQ RabbitMQConfigFile `mapstructure:"rabbitmq" json:"rabbitmq,omitempty" validate:"required"`
}
type PostgresMQConfigFile struct {
Qos int `mapstructure:"qos" json:"qos,omitempty" default:"100"`
}
type RabbitMQConfigFile struct {
URL string `mapstructure:"url" json:"url,omitempty" validate:"required" default:"amqp://user:password@localhost:5672/"`
Qos int `mapstructure:"qos" json:"qos,omitempty" default:"100"`
+29
View File
@@ -0,0 +1,29 @@
package repository
import (
"context"
"github.com/hatchet-dev/hatchet/pkg/repository/prisma/dbsqlc"
)
// PubMessage is a single LISTEN/NOTIFY notification: the channel it arrived
// on and its raw payload.
type PubMessage struct {
	Channel string
	Payload string
}

// MessageQueueRepository abstracts the Postgres primitives behind the
// Postgres-backed message queue: pub/sub (LISTEN/NOTIFY), queue binding and
// lifecycle, and durable message storage.
type MessageQueueRepository interface {
	// PubSub

	// Listen subscribes to the named channel and invokes f for each
	// notification received until ctx is cancelled.
	Listen(ctx context.Context, name string, f func(ctx context.Context, notification *PubMessage) error) error
	// Notify publishes payload on the named channel.
	Notify(ctx context.Context, name string, payload string) error

	// Queues

	// BindQueue creates or updates the queue with the given attributes.
	BindQueue(ctx context.Context, queue string, durable, autoDeleted, exclusive bool, exclusiveConsumer *string) error
	// UpdateQueueLastActive refreshes the queue's lastActive timestamp.
	UpdateQueueLastActive(ctx context.Context, queue string) error
	// CleanupQueues removes stale auto-deleted queues.
	CleanupQueues(ctx context.Context) error

	// Messages

	// AddMessage enqueues a payload on the named queue.
	AddMessage(ctx context.Context, queue string, payload []byte) error
	// ReadMessages reads up to qos pending messages from the named queue.
	ReadMessages(ctx context.Context, queue string, qos int) ([]*dbsqlc.ReadMessagesRow, error)
	// AckMessage acknowledges the message with the given id.
	AckMessage(ctx context.Context, id int64) error
	// CleanupMessageQueueItems removes expired messages.
	CleanupMessageQueueItems(ctx context.Context) error
}
+64
View File
@@ -0,0 +1,64 @@
package prisma
import (
"context"
"fmt"
"github.com/hatchet-dev/hatchet/pkg/repository/buffer"
"github.com/hatchet-dev/hatchet/pkg/repository/prisma/sqlchelpers"
)
// newAckMQBuffer builds a tenant buffer manager that batches message-queue
// acks (message IDs) into bulk deletes via bulkAckMessages.
func newAckMQBuffer(shared *sharedRepository) (*buffer.TenantBufferManager[int64, int], error) {
	ackBufOpts := buffer.TenantBufManagerOpts[int64, int]{
		Name:       "ack_mq",
		OutputFunc: shared.bulkAckMessages,
		SizeFunc:   sizeOfMessage,
		L:          shared.l,
		V:          shared.v,
	}

	manager, err := buffer.NewTenantBufManager(ackBufOpts)

	if err != nil {
		shared.l.Err(err).Msg("could not create tenant buffer manager")
		return nil, err
	}

	return manager, nil
}
// sizeOfMessage reports the buffered size of an ack entry. A flat 64 is used
// as a per-item estimate (the item itself is just an int64 message id), so
// buffer flushing is driven mainly by item count rather than byte size.
func sizeOfMessage(item int64) int {
	return 64
}
// bulkAckMessages deletes the given message IDs in a single transaction and
// returns one placeholder result per input, as the buffer's OutputFunc
// contract requires.
func (r *sharedRepository) bulkAckMessages(ctx context.Context, opts []int64) ([]*int, error) {
	res := make([]*int, 0, len(opts))

	for _, id := range opts {
		v := int(id)
		res = append(res, &v)
	}

	tx, commit, rollback, err := sqlchelpers.PrepareTx(ctx, r.pool, r.l, 10000)

	if err != nil {
		return nil, fmt.Errorf("could not prepare transaction: %w", err)
	}

	// best-effort rollback if commit does not run
	defer rollback()

	if err := r.queries.BulkAckMessages(ctx, tx, opts); err != nil {
		return nil, fmt.Errorf("could not ack messages: %w", err)
	}

	if err := commit(ctx); err != nil {
		return nil, fmt.Errorf("could not commit transaction: %w", err)
	}

	return res, nil
}
+83
View File
@@ -0,0 +1,83 @@
package prisma
import (
"context"
"fmt"
"time"
"github.com/hatchet-dev/hatchet/pkg/repository/buffer"
"github.com/hatchet-dev/hatchet/pkg/repository/prisma/dbsqlc"
"github.com/hatchet-dev/hatchet/pkg/repository/prisma/sqlchelpers"
"github.com/jackc/pgx/v5/pgtype"
)
// newAddMQBuffer builds a tenant buffer manager that batches message enqueues
// into bulk inserts via bulkAddMessages.
func newAddMQBuffer(shared *sharedRepository) (*buffer.TenantBufferManager[addMessage, int], error) {
	addBufOpts := buffer.TenantBufManagerOpts[addMessage, int]{
		Name:       "add_mq",
		OutputFunc: shared.bulkAddMessages,
		SizeFunc:   sizeOfMQMessage,
		L:          shared.l,
		V:          shared.v,
	}

	manager, err := buffer.NewTenantBufManager(addBufOpts)

	if err != nil {
		shared.l.Err(err).Msg("could not create tenant buffer manager")
		return nil, err
	}

	return manager, nil
}
// addMessage is a single pending enqueue: the destination queue name and the
// payload bytes to insert.
type addMessage struct {
	queue   string
	payload []byte
}

// sizeOfMQMessage reports the buffered size of an enqueue as its payload length.
func sizeOfMQMessage(item addMessage) int {
	return len(item.payload)
}
// bulkAddMessages inserts the buffered messages in a single bulk insert
// inside one transaction, returning one placeholder result per input as the
// buffer's OutputFunc contract requires.
//
// Fixes: the error message on insert failure previously said "could not ack
// messages" (copy-paste from bulkAckMessages); the read-after/expires-at
// timestamps are now computed once per batch instead of twice per item.
func (r *sharedRepository) bulkAddMessages(ctx context.Context, opts []addMessage) ([]*int, error) {
	res := make([]*int, 0, len(opts))
	p := make([]dbsqlc.BulkAddMessageParams, 0, len(opts))

	// one timestamp pair for the whole batch
	now := time.Now().UTC()
	readAfter := sqlchelpers.TimestampFromTime(now)
	expiresAt := sqlchelpers.TimestampFromTime(now.Add(5 * time.Minute))

	for index, opt := range opts {
		i := index
		res = append(res, &i)

		p = append(p, dbsqlc.BulkAddMessageParams{
			QueueId: pgtype.Text{
				String: opt.queue,
				Valid:  true,
			},
			Payload:   opt.payload,
			ExpiresAt: expiresAt,
			ReadAfter: readAfter,
		})
	}

	tx, commit, rollback, err := sqlchelpers.PrepareTx(ctx, r.pool, r.l, 10000)

	if err != nil {
		return nil, fmt.Errorf("could not prepare transaction: %w", err)
	}

	// best-effort rollback if commit does not run
	defer rollback()

	if _, err := r.queries.BulkAddMessage(ctx, tx, p); err != nil {
		return nil, fmt.Errorf("could not add messages: %w", err)
	}

	if err := commit(ctx); err != nil {
		return nil, fmt.Errorf("could not commit transaction: %w", err)
	}

	return res, nil
}
+35
View File
@@ -9,6 +9,41 @@ import (
"context"
)
// iteratorForBulkAddMessage implements pgx.CopyFromSource.
type iteratorForBulkAddMessage struct {
rows []BulkAddMessageParams
skippedFirstNextCall bool
}
func (r *iteratorForBulkAddMessage) Next() bool {
if len(r.rows) == 0 {
return false
}
if !r.skippedFirstNextCall {
r.skippedFirstNextCall = true
return true
}
r.rows = r.rows[1:]
return len(r.rows) > 0
}
func (r iteratorForBulkAddMessage) Values() ([]interface{}, error) {
return []interface{}{
r.rows[0].Payload,
r.rows[0].QueueId,
r.rows[0].ReadAfter,
r.rows[0].ExpiresAt,
}, nil
}
func (r iteratorForBulkAddMessage) Err() error {
return nil
}
func (q *Queries) BulkAddMessage(ctx context.Context, db DBTX, arg []BulkAddMessageParams) (int64, error) {
return db.CopyFrom(ctx, []string{"MessageQueueItem"}, []string{"payload", "queueId", "readAfter", "expiresAt"}, &iteratorForBulkAddMessage{rows: arg})
}
// iteratorForCreateEvents implements pgx.CopyFromSource.
type iteratorForCreateEvents struct {
rows []CreateEventsParams
+60
View File
@@ -362,6 +362,48 @@ func (ns NullLogLineLevel) Value() (driver.Value, error) {
return string(ns.LogLineLevel), nil
}
type MessageQueueItemStatus string
const (
MessageQueueItemStatusPENDING MessageQueueItemStatus = "PENDING"
MessageQueueItemStatusASSIGNED MessageQueueItemStatus = "ASSIGNED"
)
func (e *MessageQueueItemStatus) Scan(src interface{}) error {
switch s := src.(type) {
case []byte:
*e = MessageQueueItemStatus(s)
case string:
*e = MessageQueueItemStatus(s)
default:
return fmt.Errorf("unsupported scan type for MessageQueueItemStatus: %T", src)
}
return nil
}
type NullMessageQueueItemStatus struct {
MessageQueueItemStatus MessageQueueItemStatus `json:"MessageQueueItemStatus"`
Valid bool `json:"valid"` // Valid is true if MessageQueueItemStatus is not NULL
}
// Scan implements the Scanner interface.
func (ns *NullMessageQueueItemStatus) Scan(value interface{}) error {
if value == nil {
ns.MessageQueueItemStatus, ns.Valid = "", false
return nil
}
ns.Valid = true
return ns.MessageQueueItemStatus.Scan(value)
}
// Value implements the driver Valuer interface.
func (ns NullMessageQueueItemStatus) Value() (driver.Value, error) {
if !ns.Valid {
return nil, nil
}
return string(ns.MessageQueueItemStatus), nil
}
type StepExpressionKind string
const (
@@ -1269,6 +1311,24 @@ type LogLine struct {
Metadata []byte `json:"metadata"`
}
type MessageQueue struct {
Name string `json:"name"`
LastActive pgtype.Timestamp `json:"lastActive"`
Durable bool `json:"durable"`
AutoDeleted bool `json:"autoDeleted"`
Exclusive bool `json:"exclusive"`
ExclusiveConsumerId pgtype.UUID `json:"exclusiveConsumerId"`
}
type MessageQueueItem struct {
ID int64 `json:"id"`
Payload []byte `json:"payload"`
ReadAfter pgtype.Timestamp `json:"readAfter"`
ExpiresAt pgtype.Timestamp `json:"expiresAt"`
QueueId pgtype.Text `json:"queueId"`
Status MessageQueueItemStatus `json:"status"`
}
type Queue struct {
ID int64 `json:"id"`
TenantId pgtype.UUID `json:"tenantId"`
+124
View File
@@ -0,0 +1,124 @@
-- name: UpsertMessageQueue :one
-- Creates or updates a queue row; backs MessageQueueRepository.BindQueue.
INSERT INTO
    "MessageQueue" (
        "name",
        "durable",
        "autoDeleted",
        "exclusive",
        "exclusiveConsumerId"
    )
VALUES (
    @name::text,
    @durable::boolean,
    @autoDeleted::boolean,
    @exclusive::boolean,
    CASE WHEN sqlc.narg('exclusiveConsumerId')::uuid IS NOT NULL THEN sqlc.narg('exclusiveConsumerId')::uuid ELSE NULL END
) ON CONFLICT ("name") DO UPDATE
SET
    "durable" = @durable::boolean,
    "autoDeleted" = @autoDeleted::boolean,
    "exclusive" = @exclusive::boolean,
    "exclusiveConsumerId" = CASE WHEN sqlc.narg('exclusiveConsumerId')::uuid IS NOT NULL THEN sqlc.narg('exclusiveConsumerId')::uuid ELSE NULL END
RETURNING *;

-- name: UpdateMessageQueueActive :exec
-- Subscriber heartbeat: keeps auto-deleted queues from being cleaned up.
UPDATE
    "MessageQueue"
SET
    "lastActive" = NOW()
WHERE
    "name" = @name::text;

-- name: CleanupMessageQueue :exec
-- Removes auto-deleted queues with no heartbeat in the last hour.
DELETE FROM
    "MessageQueue"
WHERE
    "lastActive" < NOW() - INTERVAL '1 hour'
    AND "autoDeleted" = true;

-- name: AddMessage :exec
-- Enqueues a single message, readable immediately and expiring in 5 minutes.
INSERT INTO
    "MessageQueueItem" (
        "payload",
        "queueId",
        "readAfter",
        "expiresAt"
    )
VALUES
    (
        @payload::jsonb,
        @queueId::text,
        NOW(),
        NOW() + INTERVAL '5 minutes'
    );

-- name: BulkAddMessage :copyfrom
-- COPY-based bulk enqueue; timestamps are supplied by the caller.
INSERT INTO
    "MessageQueueItem" (
        "payload",
        "queueId",
        "readAfter",
        "expiresAt"
    )
VALUES (
    $1,
    $2,
    $3,
    $4
);

-- name: ReadMessages :many
-- Claims up to the limit of pending, unexpired messages for a queue and marks
-- them ASSIGNED. SKIP LOCKED lets concurrent readers proceed without blocking
-- on each other's claimed rows.
WITH messages AS (
    SELECT
        *
    FROM
        "MessageQueueItem"
    WHERE
        "expiresAt" > NOW()
        AND "queueId" = @queueId::text
        AND "readAfter" <= NOW()
        AND "status" = 'PENDING'
    ORDER BY
        "id" ASC
    LIMIT
        COALESCE(sqlc.narg('limit')::integer, 1000)
    FOR UPDATE SKIP LOCKED
)
UPDATE
    "MessageQueueItem"
SET
    "status" = 'ASSIGNED'
FROM
    messages
WHERE
    "MessageQueueItem"."id" = messages."id"
RETURNING messages.*;

-- name: BulkAckMessages :exec
-- Acknowledges (deletes) previously claimed messages by id.
DELETE FROM
    "MessageQueueItem"
WHERE
    "id" = ANY(@ids::bigint[])
    AND "status" = 'ASSIGNED';

-- name: DeleteExpiredMessages :exec
-- Unbounded purge of expired messages (see CleanupMessageQueueItems for the
-- id-range-bounded variant used by the retention controller).
DELETE FROM
    "MessageQueueItem"
WHERE
    "expiresAt" < NOW();

-- name: GetMinMaxExpiredMessageQueueItems :one
-- Returns the id range of expired messages (0,0 when none) so cleanup can be
-- bounded to that range.
SELECT
    COALESCE(MIN("id"), 0)::bigint AS "minId",
    COALESCE(MAX("id"), 0)::bigint AS "maxId"
FROM
    "MessageQueueItem"
WHERE
    "expiresAt" < NOW();

-- name: CleanupMessageQueueItems :exec
-- Deletes expired messages within an id range supplied by the caller.
DELETE FROM "MessageQueueItem"
WHERE "expiresAt" < NOW()
AND
    "id" >= @minId::bigint
    AND "id" <= @maxId::bigint;
+259
View File
@@ -0,0 +1,259 @@
// Code generated by sqlc. DO NOT EDIT.
// versions:
// sqlc v1.24.0
// source: mq.sql
package dbsqlc
import (
"context"
"github.com/jackc/pgx/v5/pgtype"
)
const addMessage = `-- name: AddMessage :exec
INSERT INTO
"MessageQueueItem" (
"payload",
"queueId",
"readAfter",
"expiresAt"
)
VALUES
(
$1::jsonb,
$2::text,
NOW(),
NOW() + INTERVAL '5 minutes'
)
`
type AddMessageParams struct {
Payload []byte `json:"payload"`
Queueid string `json:"queueid"`
}
func (q *Queries) AddMessage(ctx context.Context, db DBTX, arg AddMessageParams) error {
_, err := db.Exec(ctx, addMessage, arg.Payload, arg.Queueid)
return err
}
const bulkAckMessages = `-- name: BulkAckMessages :exec
DELETE FROM
"MessageQueueItem"
WHERE
"id" = ANY($1::bigint[])
AND "status" = 'ASSIGNED'
`
func (q *Queries) BulkAckMessages(ctx context.Context, db DBTX, ids []int64) error {
_, err := db.Exec(ctx, bulkAckMessages, ids)
return err
}
type BulkAddMessageParams struct {
Payload []byte `json:"payload"`
QueueId pgtype.Text `json:"queueId"`
ReadAfter pgtype.Timestamp `json:"readAfter"`
ExpiresAt pgtype.Timestamp `json:"expiresAt"`
}
const cleanupMessageQueue = `-- name: CleanupMessageQueue :exec
DELETE FROM
"MessageQueue"
WHERE
"lastActive" < NOW() - INTERVAL '1 hour'
AND "autoDeleted" = true
`
func (q *Queries) CleanupMessageQueue(ctx context.Context, db DBTX) error {
_, err := db.Exec(ctx, cleanupMessageQueue)
return err
}
const cleanupMessageQueueItems = `-- name: CleanupMessageQueueItems :exec
DELETE FROM "MessageQueueItem"
WHERE "expiresAt" < NOW()
AND
"id" >= $1::bigint
AND "id" <= $2::bigint
`
type CleanupMessageQueueItemsParams struct {
Minid int64 `json:"minid"`
Maxid int64 `json:"maxid"`
}
func (q *Queries) CleanupMessageQueueItems(ctx context.Context, db DBTX, arg CleanupMessageQueueItemsParams) error {
_, err := db.Exec(ctx, cleanupMessageQueueItems, arg.Minid, arg.Maxid)
return err
}
const deleteExpiredMessages = `-- name: DeleteExpiredMessages :exec
DELETE FROM
"MessageQueueItem"
WHERE
"expiresAt" < NOW()
`
func (q *Queries) DeleteExpiredMessages(ctx context.Context, db DBTX) error {
_, err := db.Exec(ctx, deleteExpiredMessages)
return err
}
const getMinMaxExpiredMessageQueueItems = `-- name: GetMinMaxExpiredMessageQueueItems :one
SELECT
COALESCE(MIN("id"), 0)::bigint AS "minId",
COALESCE(MAX("id"), 0)::bigint AS "maxId"
FROM
"MessageQueueItem"
WHERE
"expiresAt" < NOW()
`
type GetMinMaxExpiredMessageQueueItemsRow struct {
MinId int64 `json:"minId"`
MaxId int64 `json:"maxId"`
}
func (q *Queries) GetMinMaxExpiredMessageQueueItems(ctx context.Context, db DBTX) (*GetMinMaxExpiredMessageQueueItemsRow, error) {
row := db.QueryRow(ctx, getMinMaxExpiredMessageQueueItems)
var i GetMinMaxExpiredMessageQueueItemsRow
err := row.Scan(&i.MinId, &i.MaxId)
return &i, err
}
const readMessages = `-- name: ReadMessages :many
WITH messages AS (
SELECT
id, payload, "readAfter", "expiresAt", "queueId", status
FROM
"MessageQueueItem"
WHERE
"expiresAt" > NOW()
AND "queueId" = $1::text
AND "readAfter" <= NOW()
AND "status" = 'PENDING'
ORDER BY
"id" ASC
LIMIT
COALESCE($2::integer, 1000)
FOR UPDATE SKIP LOCKED
)
UPDATE
"MessageQueueItem"
SET
"status" = 'ASSIGNED'
FROM
messages
WHERE
"MessageQueueItem"."id" = messages."id"
RETURNING messages.id, messages.payload, messages."readAfter", messages."expiresAt", messages."queueId", messages.status
`
type ReadMessagesParams struct {
Queueid string `json:"queueid"`
Limit pgtype.Int4 `json:"limit"`
}
type ReadMessagesRow struct {
ID int64 `json:"id"`
Payload []byte `json:"payload"`
ReadAfter pgtype.Timestamp `json:"readAfter"`
ExpiresAt pgtype.Timestamp `json:"expiresAt"`
QueueId pgtype.Text `json:"queueId"`
Status MessageQueueItemStatus `json:"status"`
}
func (q *Queries) ReadMessages(ctx context.Context, db DBTX, arg ReadMessagesParams) ([]*ReadMessagesRow, error) {
rows, err := db.Query(ctx, readMessages, arg.Queueid, arg.Limit)
if err != nil {
return nil, err
}
defer rows.Close()
var items []*ReadMessagesRow
for rows.Next() {
var i ReadMessagesRow
if err := rows.Scan(
&i.ID,
&i.Payload,
&i.ReadAfter,
&i.ExpiresAt,
&i.QueueId,
&i.Status,
); err != nil {
return nil, err
}
items = append(items, &i)
}
if err := rows.Err(); err != nil {
return nil, err
}
return items, nil
}
const updateMessageQueueActive = `-- name: UpdateMessageQueueActive :exec
UPDATE
"MessageQueue"
SET
"lastActive" = NOW()
WHERE
"name" = $1::text
`
func (q *Queries) UpdateMessageQueueActive(ctx context.Context, db DBTX, name string) error {
_, err := db.Exec(ctx, updateMessageQueueActive, name)
return err
}
const upsertMessageQueue = `-- name: UpsertMessageQueue :one
INSERT INTO
"MessageQueue" (
"name",
"durable",
"autoDeleted",
"exclusive",
"exclusiveConsumerId"
)
VALUES (
$1::text,
$2::boolean,
$3::boolean,
$4::boolean,
CASE WHEN $5::uuid IS NOT NULL THEN $5::uuid ELSE NULL END
) ON CONFLICT ("name") DO UPDATE
SET
"durable" = $2::boolean,
"autoDeleted" = $3::boolean,
"exclusive" = $4::boolean,
"exclusiveConsumerId" = CASE WHEN $5::uuid IS NOT NULL THEN $5::uuid ELSE NULL END
RETURNING name, "lastActive", durable, "autoDeleted", exclusive, "exclusiveConsumerId"
`
// UpsertMessageQueueParams holds the arguments for the generated
// UpsertMessageQueue query.
type UpsertMessageQueueParams struct {
	Name        string `json:"name"` // queue name (primary key)
	Durable     bool   `json:"durable"`
	Autodeleted bool   `json:"autodeleted"`
	Exclusive   bool   `json:"exclusive"`
	// ExclusiveConsumerId may be invalid (NULL); the query stores NULL in
	// that case.
	ExclusiveConsumerId pgtype.UUID `json:"exclusiveConsumerId"`
}
// UpsertMessageQueue creates the named queue or, on a name conflict, updates
// its durability/auto-delete/exclusivity settings, returning the stored row.
func (q *Queries) UpsertMessageQueue(ctx context.Context, db DBTX, arg UpsertMessageQueueParams) (*MessageQueue, error) {
	row := db.QueryRow(
		ctx,
		upsertMessageQueue,
		arg.Name,
		arg.Durable,
		arg.Autodeleted,
		arg.Exclusive,
		arg.ExclusiveConsumerId,
	)

	queue := new(MessageQueue)

	scanErr := row.Scan(
		&queue.Name,
		&queue.LastActive,
		&queue.Durable,
		&queue.AutoDeleted,
		&queue.Exclusive,
		&queue.ExclusiveConsumerId,
	)

	return queue, scanErr
}
+1
View File
@@ -24,6 +24,7 @@ sql:
- webhook_workers.sql
- queue.sql
- lease.sql
- mq.sql
schema:
- ../../../../sql/schema/schema.sql
strict_order_by: false
+3 -7
View File
@@ -230,8 +230,7 @@ tenants_to_update AS (
ROW_NUMBER() OVER () AS row_number
FROM
"Tenant" AS tenants
WHERE
tenants."slug" != 'internal'
-- For the controller partition, we DO use the internal tenant as well
)
UPDATE
"Tenant" AS tenants
@@ -267,11 +266,8 @@ WITH active_partitions AS (
FROM
"Tenant" AS tenants
WHERE
tenants."slug" != 'internal' AND
(
"controllerPartitionId" IS NULL OR
"controllerPartitionId" IN (SELECT "id" FROM inactive_partitions)
)
"controllerPartitionId" IS NULL OR
"controllerPartitionId" IN (SELECT "id" FROM inactive_partitions)
), update_tenants AS (
UPDATE "Tenant" AS tenants
SET "controllerPartitionId" = partitions."id"
+3 -7
View File
@@ -723,8 +723,7 @@ tenants_to_update AS (
ROW_NUMBER() OVER () AS row_number
FROM
"Tenant" AS tenants
WHERE
tenants."slug" != 'internal'
-- For the controller partition, we DO use the internal tenant as well
)
UPDATE
"Tenant" AS tenants
@@ -838,11 +837,8 @@ WITH active_partitions AS (
FROM
"Tenant" AS tenants
WHERE
tenants."slug" != 'internal' AND
(
"controllerPartitionId" IS NULL OR
"controllerPartitionId" IN (SELECT "id" FROM inactive_partitions)
)
"controllerPartitionId" IS NULL OR
"controllerPartitionId" IN (SELECT "id" FROM inactive_partitions)
), update_tenants AS (
UPDATE "Tenant" AS tenants
SET "controllerPartitionId" = partitions."id"
+184
View File
@@ -0,0 +1,184 @@
package prisma
import (
"context"
"errors"
"fmt"
"time"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgconn"
"github.com/jackc/pgx/v5/pgtype"
"github.com/jackc/pgxlisten"
"github.com/hatchet-dev/hatchet/internal/telemetry"
"github.com/hatchet-dev/hatchet/pkg/repository"
"github.com/hatchet-dev/hatchet/pkg/repository/prisma/dbsqlc"
"github.com/hatchet-dev/hatchet/pkg/repository/prisma/sqlchelpers"
)
// messageQueueRepository is a Postgres-backed message queue implementation
// built on the shared repository's pool, generated queries, and bulk buffers.
type messageQueueRepository struct {
	*sharedRepository
}
// NewMessageQueueRepository constructs a Postgres-backed message queue
// repository on top of the given shared repository.
func NewMessageQueueRepository(shared *sharedRepository) *messageQueueRepository {
	repo := new(messageQueueRepository)
	repo.sharedRepository = shared

	return repo
}
// Listen blocks on a Postgres LISTEN for the given channel name and invokes f
// for each notification received. The underlying listener reconnects with a
// 10s delay on failure; this call returns only when ctx is cancelled or the
// listener fails fatally.
func (m *messageQueueRepository) Listen(ctx context.Context, name string, f func(ctx context.Context, notification *repository.PubMessage) error) error {
	connect := func(ctx context.Context) (*pgx.Conn, error) {
		acquired, err := m.pool.Acquire(ctx)

		if err != nil {
			return nil, err
		}

		// NOTE(review): the pgxpool connection is handed to the listener
		// without a matching Release; presumably the listener owns it for
		// its lifetime — confirm against pgxpool/pgxlisten semantics.
		return acquired.Conn(), nil
	}

	listener := &pgxlisten.Listener{
		Connect: connect,
		LogError: func(ctx context.Context, err error) {
			m.l.Warn().Err(err).Msg("error in listener")
		},
		ReconnectDelay: 10 * time.Second,
	}

	listener.Handle(name, pgxlisten.HandlerFunc(func(ctx context.Context, notification *pgconn.Notification, conn *pgx.Conn) error {
		return f(ctx, &repository.PubMessage{
			Channel: notification.Channel,
			Payload: notification.Payload,
		})
	}))

	return listener.Listen(ctx)
}
// Notify publishes payload on the given channel via pg_notify.
//
// NOTE(review): the channel name is passed through pgx.Identifier.Sanitize(),
// which wraps it in double quotes. pg_notify takes its first argument as a
// literal channel string, so the quotes become part of the channel name and
// may not match the raw name Listen subscribes with — confirm the intended
// pairing before relying on Notify/Listen round-trips.
func (m *messageQueueRepository) Notify(ctx context.Context, name string, payload string) error {
	_, err := m.pool.Exec(ctx, "select pg_notify($1,$2)", pgx.Identifier{name}.Sanitize(), payload)
	return err
}
// AddMessage enqueues payload onto the named queue. The write is batched
// through bulkAddMQBuffer; FireAndWait presumably blocks until the batch
// containing this message is flushed — confirm against the buffer
// implementation.
func (m *messageQueueRepository) AddMessage(ctx context.Context, queue string, payload []byte) error {
	// NOTE: hack for tenant, just passing in an empty string for now
	_, err := m.bulkAddMQBuffer.FireAndWait(ctx, "", addMessage{
		queue:   queue,
		payload: payload,
	})

	return err
}
// BindQueue creates the named queue or updates its settings if it already
// exists. An exclusive queue must name its exclusive consumer.
func (m *messageQueueRepository) BindQueue(ctx context.Context, queue string, durable, autoDeleted, exclusive bool, exclusiveConsumer *string) error {
	// an exclusive queue without a consumer is invalid
	if exclusive && exclusiveConsumer == nil {
		return errors.New("exclusive queue must have exclusive consumer")
	}

	var consumerId pgtype.UUID

	if exclusiveConsumer != nil {
		consumerId = sqlchelpers.UUIDFromStr(*exclusiveConsumer)
	}

	_, err := m.queries.UpsertMessageQueue(ctx, m.pool, dbsqlc.UpsertMessageQueueParams{
		Name:                queue,
		Durable:             durable,
		Autodeleted:         autoDeleted,
		Exclusive:           exclusive,
		ExclusiveConsumerId: consumerId,
	})

	return err
}
// UpdateQueueLastActive bumps the queue's lastActive timestamp to NOW(),
// acting as a liveness heartbeat for the queue row.
func (m *messageQueueRepository) UpdateQueueLastActive(ctx context.Context, queue string) error {
	return m.queries.UpdateMessageQueueActive(ctx, m.pool, queue)
}
// CleanupQueues runs the CleanupMessageQueue query; presumably this drops
// stale/inactive queues — confirm semantics in mq.sql.
func (m *messageQueueRepository) CleanupQueues(ctx context.Context) error {
	return m.queries.CleanupMessageQueue(ctx, m.pool)
}
// ReadMessages claims up to qos pending messages from the named queue,
// marking them ASSIGNED and returning the claimed rows.
func (m *messageQueueRepository) ReadMessages(ctx context.Context, queue string, qos int) ([]*dbsqlc.ReadMessagesRow, error) {
	ctx, span := telemetry.NewSpan(ctx, "pgmq-read-messages")
	defer span.End()

	return m.queries.ReadMessages(ctx, m.pool, dbsqlc.ReadMessagesParams{
		Queueid: queue,
		Limit:   pgtype.Int4{Int32: int32(qos), Valid: true}, // nolint: gosec
	})
}
// AckMessage acknowledges a consumed message by id. The ack is buffered via
// FireForget, so ctx is currently unused and the error reflects only the
// enqueue into the buffer, not the eventual database write.
func (m *messageQueueRepository) AckMessage(ctx context.Context, id int64) error {
	// NOTE: hack for tenant, just passing in an empty string for now
	return m.bulkAckMQBuffer.FireForget("", id)
}
// CleanupMessageQueueItems deletes expired message queue items in id-bounded
// batches of 10k so no single DELETE scans the whole table. It snapshots the
// current [min, max] expired-item id range and walks it batch by batch,
// honoring context cancellation between batches.
func (m *messageQueueRepository) CleanupMessageQueueItems(ctx context.Context) error {
	// setup telemetry
	ctx, span := telemetry.NewSpan(ctx, "cleanup-message-queues-database")
	defer span.End()

	// get the min and max expired queue item ids to bound the batched deletes
	minMax, err := m.queries.GetMinMaxExpiredMessageQueueItems(ctx, m.pool)

	if err != nil {
		if errors.Is(err, pgx.ErrNoRows) {
			return nil
		}

		return fmt.Errorf("could not get min max processed queue items: %w", err)
	}

	if minMax == nil {
		return nil
	}

	minId := minMax.MinId
	maxId := minMax.MaxId

	// a max id of 0 means there are no expired items to clean up
	if maxId == 0 {
		return nil
	}

	// iterate until we have no more queue items to process
	var batchSize int64 = 10000
	var currBatch int64

	for {
		if ctx.Err() != nil {
			return ctx.Err()
		}

		currBatch++

		// clamp the batch upper bound to the snapshotted max id
		currMax := minId + batchSize*currBatch

		if currMax > maxId {
			currMax = maxId
		}

		// delete the next batch of expired queue items. BUGFIX: pass the
		// clamped currMax — the previous code passed the unclamped
		// minId + batchSize*currBatch, so the clamp above was dead and the
		// delete range could extend past the [minId, maxId] snapshot.
		err := m.queries.CleanupMessageQueueItems(ctx, m.pool, dbsqlc.CleanupMessageQueueItemsParams{
			Minid: minId,
			Maxid: currMax,
		})

		if err != nil {
			if errors.Is(err, pgx.ErrNoRows) {
				return nil
			}

			return fmt.Errorf("could not cleanup queue items: %w", err)
		}

		if currMax == maxId {
			break
		}
	}

	return nil
}
+6
View File
@@ -215,6 +215,7 @@ type engineRepository struct {
rateLimit repository.RateLimitEngineRepository
webhookWorker repository.WebhookWorkerEngineRepository
scheduler repository.SchedulerRepository
mq repository.MessageQueueRepository
}
func (r *engineRepository) Health() repository.HealthRepository {
@@ -293,6 +294,10 @@ func (r *engineRepository) Scheduler() repository.SchedulerRepository {
return r.scheduler
}
func (r *engineRepository) MessageQueue() repository.MessageQueueRepository {
return r.mq
}
func NewEngineRepository(pool *pgxpool.Pool, essentialPool *pgxpool.Pool, cf *server.ConfigFileRuntime, fs ...PrismaRepositoryOpt) (func() error, repository.EngineRepository, error) {
opts := defaultPrismaRepositoryOpts()
@@ -343,6 +348,7 @@ func NewEngineRepository(pool *pgxpool.Pool, essentialPool *pgxpool.Pool, cf *se
rateLimit: NewRateLimitEngineRepository(pool, opts.v, opts.l),
webhookWorker: NewWebhookWorkerEngineRepository(pool, opts.v, opts.l),
scheduler: newSchedulerRepository(shared),
mq: NewMessageQueueRepository(shared),
},
err
}
+28
View File
@@ -25,6 +25,8 @@ type sharedRepository struct {
bulkQueuer *buffer.TenantBufferManager[bulkQueueStepRunOpts, pgtype.UUID]
bulkUserEventBuffer *buffer.TenantBufferManager[*repository.CreateEventOpts, dbsqlc.Event]
bulkWorkflowRunBuffer *buffer.TenantBufferManager[*repository.CreateWorkflowRunOpts, dbsqlc.WorkflowRun]
bulkAckMQBuffer *buffer.TenantBufferManager[int64, int]
bulkAddMQBuffer *buffer.TenantBufferManager[addMessage, int]
wrRunningCallbacks []repository.TenantScopedCallback[pgtype.UUID]
}
@@ -75,12 +77,26 @@ func newSharedRepository(pool *pgxpool.Pool, v validator.Validator, l *zerolog.L
return nil, nil, err
}
ackMQBuffer, err := newAckMQBuffer(s)
if err != nil {
return nil, nil, err
}
addMQBuffer, err := newAddMQBuffer(s)
if err != nil {
return nil, nil, err
}
s.bulkStatusBuffer = statusBuffer
s.bulkEventBuffer = eventBuffer
s.bulkSemaphoreReleaser = semaphoreReleaser
s.bulkQueuer = queuer
s.bulkUserEventBuffer = userEventBuffer
s.bulkWorkflowRunBuffer = workflowRunBuffer
s.bulkAckMQBuffer = ackMQBuffer
s.bulkAddMQBuffer = addMQBuffer
return s, func() error {
var multiErr error
@@ -121,6 +137,18 @@ func newSharedRepository(pool *pgxpool.Pool, v validator.Validator, l *zerolog.L
multiErr = multierror.Append(multiErr, err)
}
err = ackMQBuffer.Cleanup()
if err != nil {
multiErr = multierror.Append(multiErr, err)
}
err = addMQBuffer.Cleanup()
if err != nil {
multiErr = multierror.Append(multiErr, err)
}
return multiErr
}, nil
}
+3 -4
View File
@@ -609,9 +609,8 @@ func (w *workflowRunAPIRepository) GetStepRunsForJobRuns(ctx context.Context, te
type workflowRunEngineRepository struct {
*sharedRepository
m *metered.Metered
cf *server.ConfigFileRuntime
stepRunRepository *stepRunEngineRepository
m *metered.Metered
cf *server.ConfigFileRuntime
createCallbacks []repository.TenantScopedCallback[*dbsqlc.WorkflowRun]
queuedCallbacks []repository.TenantScopedCallback[pgtype.UUID]
@@ -1446,7 +1445,7 @@ func (s *workflowRunEngineRepository) ReplayWorkflowRun(ctx context.Context, ten
sev := dbsqlc.StepRunEventSeverityINFO
reason := dbsqlc.StepRunEventReasonRETRIEDBYUSER
defer s.stepRunRepository.deferredStepRunEvent(
defer s.deferredStepRunEvent(
tenantId,
repository.CreateStepRunEventOpts{
StepRunId: stepRunIdStr,
+1
View File
@@ -46,6 +46,7 @@ type EngineRepository interface {
RateLimit() RateLimitEngineRepository
WebhookWorker() WebhookWorkerEngineRepository
Scheduler() SchedulerRepository
MessageQueue() MessageQueueRepository
}
type EntitlementsRepository interface {
+62 -1
View File
@@ -3,4 +3,65 @@ ALTER TYPE "ConcurrencyLimitStrategy" ADD VALUE 'CANCEL_NEWEST';
-- Add value to enum type: "WorkflowRunStatus"
ALTER TYPE "WorkflowRunStatus" ADD VALUE 'CANCELLING';
-- Add value to enum type: "WorkflowRunStatus"
ALTER TYPE "WorkflowRunStatus" ADD VALUE 'CANCELLED';
ALTER TYPE "WorkflowRunStatus" ADD VALUE 'CANCELLED';
-- Create enum type "MessageQueueItemStatus"
CREATE TYPE "MessageQueueItemStatus" AS ENUM('PENDING', 'ASSIGNED');
-- Create "MessageQueue" table
CREATE TABLE
"MessageQueue" (
"name" text NOT NULL,
"lastActive" TIMESTAMP(3) DEFAULT CURRENT_TIMESTAMP,
"durable" boolean NOT NULL DEFAULT true,
"autoDeleted" boolean NOT NULL DEFAULT false,
"exclusive" boolean NOT NULL DEFAULT false,
"exclusiveConsumerId" uuid NULL,
PRIMARY KEY ("name")
);
-- Create "MessageQueueItem" table
CREATE TABLE
"MessageQueueItem" (
"id" bigint NOT NULL GENERATED ALWAYS AS IDENTITY,
"payload" jsonb NOT NULL,
"readAfter" timestamp(3) NULL,
"expiresAt" timestamp(3) NULL,
"queueId" text,
"status" "MessageQueueItemStatus" NOT NULL DEFAULT 'PENDING',
PRIMARY KEY ("id"),
CONSTRAINT "MessageQueueItem_queueId_fkey" FOREIGN KEY ("queueId") REFERENCES "MessageQueue" ("name") ON UPDATE NO ACTION ON DELETE SET NULL
);
-- Create index "MessageQueueItem_queueId_expiresAt_readAfter_status_id_idx" to table: "MessageQueueItem"
CREATE INDEX "MessageQueueItem_queueId_expiresAt_readAfter_status_id_idx" ON "MessageQueueItem" (
"expiresAt",
"queueId",
"readAfter",
"status",
"id"
);
-- Function to publish NOTIFY message on insert into MessageQueueItem
CREATE
OR REPLACE FUNCTION notify_message_queue_item () RETURNS TRIGGER AS $$
BEGIN
PERFORM pg_notify(
NEW."queueId"::TEXT,
NEW."id"::TEXT
);
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
-- Trigger to invoke the notify function after insert
CREATE TRIGGER trigger_notify_message_queue_item
AFTER INSERT ON "MessageQueueItem" FOR EACH ROW
EXECUTE FUNCTION notify_message_queue_item ();
-- Update the existing function to prevent internal name or slug to be a no-op
CREATE
OR REPLACE FUNCTION prevent_internal_name_or_slug () RETURNS trigger AS $$
BEGIN
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
+2 -2
View File
@@ -1,4 +1,4 @@
h1:kdjvzXzgnUl33aT2nO6fMW3ZNMIfO5zf4/dxS6cp1sQ=
h1:1Az5U4thlaLVJj4xo1BN9WtRVjaMytq41j5vy94dyuE=
20240115180414_init.sql h1:Ef3ZyjAHkmJPdGF/dEWCahbwgcg6uGJKnDxW2JCRi2k=
20240122014727_v0_6_0.sql h1:o/LdlteAeFgoHJ3e/M4Xnghqt9826IE/Y/h0q95Acuo=
20240126235456_v0_7_0.sql h1:KiVzt/hXgQ6esbdC6OMJOOWuYEXmy1yeCpmsVAHTFKs=
@@ -79,4 +79,4 @@ h1:kdjvzXzgnUl33aT2nO6fMW3ZNMIfO5zf4/dxS6cp1sQ=
20241204191714_v0.52.5.sql h1:6oJgHJynK+YtwQoD/VnqiCMda409K96A4Oq2l8h3dQ0=
20241206231312_v0.52.12.sql h1:6L/zXbiVC24nqSzJzqItPFKCA3HPyMk0T5pBPnmXQgg=
20241216175807_v0.52.13.sql h1:rMwIaYvy3WX/F7/go1J3vI+WNYnABpASv0ATPJt1pE8=
20241217152316_v0.53.0.sql h1:sXmW2KigCn3hGZxCJhSPk6GjO3b+ppDgfMiLz5Xv3RQ=
20241217152316_v0.53.0.sql h1:iFz58oq8r6rDcM3HcainoblLXwOpCgayvNdQwC77Sho=
+32
View File
@@ -1077,6 +1077,38 @@ CREATE TABLE
CREATE TABLE
"_WorkflowToWorkflowTag" ("A" UUID NOT NULL, "B" UUID NOT NULL);
-- CreateTable
CREATE TABLE "MessageQueue" (
"name" TEXT NOT NULL,
"lastActive" TIMESTAMP(3) DEFAULT CURRENT_TIMESTAMP,
"durable" BOOLEAN NOT NULL DEFAULT true,
"autoDeleted" BOOLEAN NOT NULL DEFAULT false,
"exclusive" BOOLEAN NOT NULL DEFAULT false,
"exclusiveConsumerId" UUID,
CONSTRAINT "MessageQueue_pkey" PRIMARY KEY ("name")
);
-- CreateEnum
CREATE TYPE "MessageQueueItemStatus" AS ENUM (
'PENDING',
'ASSIGNED'
);
-- CreateTable
CREATE TABLE "MessageQueueItem" (
"id" bigint GENERATED ALWAYS AS IDENTITY,
"payload" JSONB NOT NULL,
"readAfter" TIMESTAMP(3),
"expiresAt" TIMESTAMP(3),
"queueId" TEXT,
"status" "MessageQueueItemStatus" NOT NULL DEFAULT 'PENDING',
CONSTRAINT "MessageQueueItem_pkey" PRIMARY KEY ("id"),
CONSTRAINT "MessageQueueItem_queueId_fkey" FOREIGN KEY ("queueId") REFERENCES "MessageQueue" ("name") ON DELETE SET NULL
);
-- Create an index for message queue item
CREATE INDEX "MessageQueueItem_queueId_expiresAt_readAfter_status_id_idx" ON "MessageQueueItem" ("expiresAt", "queueId", "readAfter", "status", "id");
-- CreateIndex
CREATE UNIQUE INDEX "APIToken_id_key" ON "APIToken" ("id" ASC);