mirror of
https://github.com/hatchet-dev/hatchet.git
synced 2026-04-24 19:29:16 -05:00
test: improve Go testing harness (#1631)
* test: improves testing harness for engine * update CI test * fix: race condition in test * make tests more stable * cleanup pub and sub buffers * fix: goleak on rampup test * feat: matrix tests for engine
This commit is contained in:
+9
-124
@@ -293,8 +293,11 @@ jobs:
|
||||
load:
|
||||
runs-on: ubuntu-latest
|
||||
timeout-minutes: 30
|
||||
env:
|
||||
DATABASE_URL: postgresql://hatchet:hatchet@127.0.0.1:5431/hatchet?sslmode=disable
|
||||
strategy:
|
||||
matrix:
|
||||
migrate-strategy: ["latest", "penultimate"]
|
||||
rabbitmq-enabled: ["true", "false"]
|
||||
pg-version: ["17-alpine", "16-alpine", "15-alpine"]
|
||||
|
||||
steps:
|
||||
- uses: actions/checkout@v4
|
||||
@@ -312,131 +315,13 @@ jobs:
|
||||
version: 9.15.4
|
||||
run_install: false
|
||||
|
||||
- name: Install Atlas
|
||||
run: |
|
||||
curl -sSf https://atlasgo.sh | sh
|
||||
|
||||
- name: Compose
|
||||
run: docker compose up -d
|
||||
|
||||
- name: Go deps
|
||||
run: go mod download
|
||||
|
||||
- name: Prepare
|
||||
run: |
|
||||
cat > .env <<EOF
|
||||
DATABASE_URL='postgresql://hatchet:hatchet@127.0.0.1:5431/hatchet'
|
||||
SERVER_TLS_CERT_FILE=./hack/dev/certs/cluster.pem
|
||||
SERVER_TLS_KEY_FILE=./hack/dev/certs/cluster.key
|
||||
SERVER_TLS_ROOT_CA_FILE=./hack/dev/certs/ca.cert
|
||||
SERVER_PORT=8080
|
||||
SERVER_URL=http://localhost:8080
|
||||
SERVER_AUTH_COOKIE_SECRETS="something something"
|
||||
SERVER_AUTH_COOKIE_DOMAIN=app.dev.hatchet-tools.com
|
||||
SERVER_AUTH_COOKIE_INSECURE=false
|
||||
SERVER_AUTH_SET_EMAIL_VERIFIED=true
|
||||
SERVER_LOGGER_LEVEL=warn
|
||||
SERVER_LOGGER_FORMAT=console
|
||||
DATABASE_LOGGER_LEVEL=warn
|
||||
DATABASE_LOGGER_FORMAT=console
|
||||
EOF
|
||||
|
||||
- name: Generate
|
||||
run: |
|
||||
go run ./cmd/hatchet-migrate
|
||||
task generate-go
|
||||
task generate-certs
|
||||
task generate-local-encryption-keys
|
||||
|
||||
- name: Setup
|
||||
run: |
|
||||
set -a
|
||||
. .env
|
||||
set +a
|
||||
|
||||
go run ./cmd/hatchet-admin quickstart --generated-config-dir ./generated/
|
||||
|
||||
- name: Test
|
||||
run: |
|
||||
export HATCHET_CLIENT_TOKEN="$(go run ./cmd/hatchet-admin token create --config ./generated/ --tenant-id 707d0855-80ab-4e1f-a156-f1c4546cbf52)"
|
||||
|
||||
go test -tags load ./... -p 1 -v -race -failfast
|
||||
|
||||
- name: Teardown
|
||||
run: docker compose down
|
||||
|
||||
load-pgmq:
|
||||
runs-on: ubuntu-latest
|
||||
timeout-minutes: 30
|
||||
env:
|
||||
DATABASE_URL: postgresql://hatchet:hatchet@127.0.0.1:5431/hatchet?sslmode=disable
|
||||
|
||||
steps:
|
||||
- uses: actions/checkout@v4
|
||||
|
||||
- name: Install Task
|
||||
uses: arduino/setup-task@v2
|
||||
- name: Setup Go
|
||||
uses: actions/setup-go@v5
|
||||
with:
|
||||
go-version: "1.22"
|
||||
|
||||
- name: Setup pnpm
|
||||
uses: pnpm/action-setup@v4
|
||||
with:
|
||||
version: 9.15.4
|
||||
run_install: false
|
||||
|
||||
- name: Install Atlas
|
||||
run: |
|
||||
curl -sSf https://atlasgo.sh | sh
|
||||
|
||||
- name: Compose
|
||||
run: docker compose up -d
|
||||
|
||||
- name: Go deps
|
||||
run: go mod download
|
||||
|
||||
- name: Prepare
|
||||
run: |
|
||||
cat > .env <<EOF
|
||||
DATABASE_URL='postgresql://hatchet:hatchet@127.0.0.1:5431/hatchet'
|
||||
SERVER_TLS_CERT_FILE=./hack/dev/certs/cluster.pem
|
||||
SERVER_TLS_KEY_FILE=./hack/dev/certs/cluster.key
|
||||
SERVER_TLS_ROOT_CA_FILE=./hack/dev/certs/ca.cert
|
||||
SERVER_PORT=8080
|
||||
SERVER_URL=http://localhost:8080
|
||||
SERVER_AUTH_COOKIE_SECRETS="something something"
|
||||
SERVER_AUTH_COOKIE_DOMAIN=app.dev.hatchet-tools.com
|
||||
SERVER_AUTH_COOKIE_INSECURE=false
|
||||
SERVER_AUTH_SET_EMAIL_VERIFIED=true
|
||||
SERVER_LOGGER_LEVEL=warn
|
||||
SERVER_LOGGER_FORMAT=console
|
||||
DATABASE_LOGGER_LEVEL=warn
|
||||
DATABASE_LOGGER_FORMAT=console
|
||||
SERVER_TASKQUEUE_KIND=postgres
|
||||
EOF
|
||||
|
||||
- name: Generate
|
||||
run: |
|
||||
go run ./cmd/hatchet-migrate
|
||||
task generate-go
|
||||
task generate-certs
|
||||
task generate-local-encryption-keys
|
||||
|
||||
- name: Setup
|
||||
run: |
|
||||
set -a
|
||||
. .env
|
||||
set +a
|
||||
|
||||
go run ./cmd/hatchet-admin quickstart --generated-config-dir ./generated/
|
||||
|
||||
- name: Test
|
||||
run: |
|
||||
export HATCHET_CLIENT_TOKEN="$(go run ./cmd/hatchet-admin token create --config ./generated/ --tenant-id 707d0855-80ab-4e1f-a156-f1c4546cbf52)"
|
||||
|
||||
RAMP_UP_DURATION_TIMEOUT=20s go test -tags load ./... -p 1 -v -race -failfast
|
||||
|
||||
- name: Teardown
|
||||
run: docker compose down
|
||||
env:
|
||||
TESTING_MATRIX_MIGRATE: ${{ matrix.migrate-strategy }}
|
||||
TESTING_MATRIX_RABBITMQ_ENABLED: ${{ matrix.rabbitmq-enabled }}
|
||||
TESTING_MATRIX_PG_VERSION: ${{ matrix.pg-version }}
|
||||
|
||||
@@ -1,18 +1,13 @@
|
||||
package cli
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"log"
|
||||
"os"
|
||||
|
||||
"github.com/jackc/pgx/v5"
|
||||
"github.com/spf13/cobra"
|
||||
|
||||
"github.com/hatchet-dev/hatchet/cmd/hatchet-admin/cli/seed"
|
||||
"github.com/hatchet-dev/hatchet/pkg/config/loader"
|
||||
"github.com/hatchet-dev/hatchet/pkg/repository"
|
||||
"github.com/hatchet-dev/hatchet/pkg/repository/postgres/sqlchelpers"
|
||||
)
|
||||
|
||||
// seedCmd seeds the database with initial data
|
||||
@@ -46,146 +41,5 @@ func runSeed(cf *loader.ConfigLoader) error {
|
||||
|
||||
defer dc.Disconnect() // nolint: errcheck
|
||||
|
||||
shouldSeedUser := dc.Seed.AdminEmail != "" && dc.Seed.AdminPassword != ""
|
||||
var userId string
|
||||
|
||||
if shouldSeedUser {
|
||||
// seed an example user
|
||||
hashedPw, err := repository.HashPassword(dc.Seed.AdminPassword)
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
user, err := dc.APIRepository.User().GetUserByEmail(context.Background(), dc.Seed.AdminEmail)
|
||||
|
||||
if err != nil {
|
||||
if errors.Is(err, pgx.ErrNoRows) {
|
||||
user, err = dc.APIRepository.User().CreateUser(context.Background(), &repository.CreateUserOpts{
|
||||
Email: dc.Seed.AdminEmail,
|
||||
Name: repository.StringPtr(dc.Seed.AdminName),
|
||||
EmailVerified: repository.BoolPtr(true),
|
||||
Password: hashedPw,
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
userId = sqlchelpers.UUIDToStr(user.ID)
|
||||
}
|
||||
|
||||
tenant, err := dc.APIRepository.Tenant().GetTenantBySlug(context.Background(), "default")
|
||||
|
||||
if err != nil {
|
||||
if errors.Is(err, pgx.ErrNoRows) {
|
||||
// seed an example tenant
|
||||
// initialize a tenant
|
||||
sqlcTenant, err := dc.APIRepository.Tenant().CreateTenant(context.Background(), &repository.CreateTenantOpts{
|
||||
ID: &dc.Seed.DefaultTenantID,
|
||||
Name: dc.Seed.DefaultTenantName,
|
||||
Slug: dc.Seed.DefaultTenantSlug,
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
tenant, err = dc.APIRepository.Tenant().GetTenantByID(context.Background(), sqlchelpers.UUIDToStr(sqlcTenant.ID))
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
fmt.Println("created tenant", sqlchelpers.UUIDToStr(tenant.ID))
|
||||
|
||||
// add the user to the tenant
|
||||
_, err = dc.APIRepository.Tenant().CreateTenantMember(context.Background(), sqlchelpers.UUIDToStr(tenant.ID), &repository.CreateTenantMemberOpts{
|
||||
Role: "OWNER",
|
||||
UserId: userId,
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if dc.Seed.IsDevelopment {
|
||||
err = seedDev(dc.EngineRepository, sqlchelpers.UUIDToStr(tenant.ID))
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func seedDev(repo repository.EngineRepository, tenantId string) error {
|
||||
_, err := repo.Workflow().GetWorkflowByName(context.Background(), tenantId, "test-workflow")
|
||||
|
||||
if err != nil {
|
||||
if !errors.Is(err, pgx.ErrNoRows) {
|
||||
return err
|
||||
}
|
||||
|
||||
wf, err := repo.Workflow().CreateNewWorkflow(context.Background(), tenantId, &repository.CreateWorkflowVersionOpts{
|
||||
Name: "test-workflow",
|
||||
Description: repository.StringPtr("This is a test workflow."),
|
||||
Version: repository.StringPtr("v0.1.0"),
|
||||
EventTriggers: []string{
|
||||
"user:create",
|
||||
},
|
||||
Tags: []repository.CreateWorkflowTagOpts{
|
||||
{
|
||||
Name: "Preview",
|
||||
},
|
||||
},
|
||||
Concurrency: &repository.CreateWorkflowConcurrencyOpts{
|
||||
Action: repository.StringPtr("test:concurrency"),
|
||||
},
|
||||
Jobs: []repository.CreateWorkflowJobOpts{
|
||||
{
|
||||
Name: "job-name",
|
||||
Kind: "DEFAULT",
|
||||
Steps: []repository.CreateWorkflowStepOpts{
|
||||
{
|
||||
ReadableId: "echo1",
|
||||
Action: "echo:echo",
|
||||
},
|
||||
{
|
||||
ReadableId: "echo2",
|
||||
Action: "echo:echo",
|
||||
Parents: []string{
|
||||
"echo1",
|
||||
},
|
||||
},
|
||||
{
|
||||
ReadableId: "echo3",
|
||||
Action: "echo:echo",
|
||||
Parents: []string{
|
||||
"echo2",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
workflowVersionId := sqlchelpers.UUIDToStr(wf.WorkflowVersion.ID)
|
||||
|
||||
fmt.Println("created workflow version", workflowVersionId)
|
||||
}
|
||||
|
||||
return nil
|
||||
return seed.SeedDatabase(dc)
|
||||
}
|
||||
|
||||
@@ -0,0 +1,88 @@
|
||||
package seed
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
|
||||
"github.com/jackc/pgx/v5"
|
||||
|
||||
"github.com/hatchet-dev/hatchet/pkg/config/database"
|
||||
"github.com/hatchet-dev/hatchet/pkg/repository"
|
||||
"github.com/hatchet-dev/hatchet/pkg/repository/postgres/sqlchelpers"
|
||||
)
|
||||
|
||||
func SeedDatabase(dc *database.Layer) error {
|
||||
shouldSeedUser := dc.Seed.AdminEmail != "" && dc.Seed.AdminPassword != ""
|
||||
var userID string
|
||||
|
||||
if shouldSeedUser {
|
||||
// seed an example user
|
||||
hashedPw, err := repository.HashPassword(dc.Seed.AdminPassword)
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
user, err := dc.APIRepository.User().GetUserByEmail(context.Background(), dc.Seed.AdminEmail)
|
||||
|
||||
if err != nil {
|
||||
if errors.Is(err, pgx.ErrNoRows) {
|
||||
user, err = dc.APIRepository.User().CreateUser(context.Background(), &repository.CreateUserOpts{
|
||||
Email: dc.Seed.AdminEmail,
|
||||
Name: repository.StringPtr(dc.Seed.AdminName),
|
||||
EmailVerified: repository.BoolPtr(true),
|
||||
Password: hashedPw,
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
userID = sqlchelpers.UUIDToStr(user.ID)
|
||||
}
|
||||
|
||||
_, err := dc.APIRepository.Tenant().GetTenantBySlug(context.Background(), "default")
|
||||
|
||||
if err != nil {
|
||||
if errors.Is(err, pgx.ErrNoRows) {
|
||||
// seed an example tenant
|
||||
// initialize a tenant
|
||||
sqlcTenant, err := dc.APIRepository.Tenant().CreateTenant(context.Background(), &repository.CreateTenantOpts{
|
||||
ID: &dc.Seed.DefaultTenantID,
|
||||
Name: dc.Seed.DefaultTenantName,
|
||||
Slug: dc.Seed.DefaultTenantSlug,
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
tenant, err := dc.APIRepository.Tenant().GetTenantByID(context.Background(), sqlchelpers.UUIDToStr(sqlcTenant.ID))
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
fmt.Println("created tenant", sqlchelpers.UUIDToStr(tenant.ID))
|
||||
|
||||
// add the user to the tenant
|
||||
_, err = dc.APIRepository.Tenant().CreateTenantMember(context.Background(), sqlchelpers.UUIDToStr(tenant.ID), &repository.CreateTenantMemberOpts{
|
||||
Role: "OWNER",
|
||||
UserId: userID,
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
@@ -22,7 +22,26 @@ import (
|
||||
//go:embed migrations/*.sql
|
||||
var embedMigrations embed.FS
|
||||
|
||||
func RunMigrations(ctx context.Context) {
|
||||
type runMigrationsOpt struct {
|
||||
upToPenultimate bool
|
||||
}
|
||||
|
||||
type RunMigrationsOpt func(*runMigrationsOpt)
|
||||
|
||||
func WithUpToPenultimate() RunMigrationsOpt {
|
||||
return func(o *runMigrationsOpt) {
|
||||
o.upToPenultimate = true
|
||||
}
|
||||
}
|
||||
|
||||
func RunMigrations(ctx context.Context, opts ...RunMigrationsOpt) {
|
||||
// Set default options
|
||||
options := &runMigrationsOpt{}
|
||||
|
||||
for _, opt := range opts {
|
||||
opt(options)
|
||||
}
|
||||
|
||||
var db *sql.DB
|
||||
var conn *sql.Conn
|
||||
|
||||
@@ -215,9 +234,32 @@ func RunMigrations(ctx context.Context) {
|
||||
}
|
||||
goose.SetBaseFS(fsys)
|
||||
|
||||
err = goose.Up(db, ".")
|
||||
if err != nil {
|
||||
log.Fatalf("goose: failed to apply migrations: %v", err)
|
||||
switch {
|
||||
case options.upToPenultimate:
|
||||
// Get the second-to-last migration version.
|
||||
migrations, err := listMigrations()
|
||||
|
||||
if err != nil {
|
||||
log.Fatalf("goose: failed to list migrations: %v", err)
|
||||
}
|
||||
|
||||
if len(migrations) < 2 {
|
||||
log.Fatalf("goose: not enough migrations to roll back to penultimate version")
|
||||
}
|
||||
|
||||
// Get the second-to-last migration version.
|
||||
secondToLastVersion := migrations[len(migrations)-2].Version
|
||||
|
||||
err = goose.UpTo(db, ".", secondToLastVersion)
|
||||
|
||||
if err != nil {
|
||||
log.Fatalf("goose: failed to apply migrations up to penultimate version: %v", err)
|
||||
}
|
||||
default:
|
||||
err = goose.Up(db, ".")
|
||||
if err != nil {
|
||||
log.Fatalf("goose: failed to apply migrations: %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -274,3 +316,13 @@ func parseTableIdentifier(name string) (schema, table string) {
|
||||
}
|
||||
return schema, table
|
||||
}
|
||||
|
||||
// listAllDBVersions returns a list of all migrations, ordered ascending.
|
||||
func listMigrations() (goose.Migrations, error) {
|
||||
var (
|
||||
minVersion = int64(0)
|
||||
maxVersion = int64((1 << 63) - 1)
|
||||
)
|
||||
|
||||
return goose.CollectMigrations(".", minVersion, maxVersion)
|
||||
}
|
||||
|
||||
@@ -3,22 +3,20 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"log"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"go.uber.org/goleak"
|
||||
|
||||
"github.com/hatchet-dev/hatchet/internal/testutils"
|
||||
"github.com/hatchet-dev/hatchet/pkg/config/shared"
|
||||
"github.com/hatchet-dev/hatchet/pkg/logger"
|
||||
"github.com/hatchet-dev/hatchet/pkg/testing/harness"
|
||||
)
|
||||
|
||||
func TestLoadCLI(t *testing.T) {
|
||||
testutils.Prepare(t)
|
||||
func TestMain(m *testing.M) {
|
||||
harness.RunTestWithEngine(m)
|
||||
}
|
||||
|
||||
func TestLoadCLI(t *testing.T) {
|
||||
type args struct {
|
||||
duration time.Duration
|
||||
eventsPerSecond int
|
||||
@@ -70,18 +68,6 @@ func TestLoadCLI(t *testing.T) {
|
||||
},
|
||||
}}
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute)
|
||||
|
||||
setup := sync.WaitGroup{}
|
||||
|
||||
go func() {
|
||||
setup.Add(1)
|
||||
log.Printf("setup start")
|
||||
testutils.SetupEngine(ctx, t)
|
||||
setup.Done()
|
||||
log.Printf("setup end")
|
||||
}()
|
||||
|
||||
// TODO instead of waiting, figure out when the engine setup is complete
|
||||
time.Sleep(15 * time.Second)
|
||||
|
||||
@@ -93,23 +79,5 @@ func TestLoadCLI(t *testing.T) {
|
||||
})
|
||||
}
|
||||
|
||||
cancel()
|
||||
|
||||
log.Printf("test complete")
|
||||
setup.Wait()
|
||||
log.Printf("cleanup complete")
|
||||
|
||||
goleak.VerifyNone(
|
||||
t,
|
||||
// worker
|
||||
goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"),
|
||||
goleak.IgnoreTopFunction("google.golang.org/grpc/internal/grpcsync.(*CallbackSerializer).run"),
|
||||
goleak.IgnoreTopFunction("internal/poll.runtime_pollWait"),
|
||||
goleak.IgnoreTopFunction("google.golang.org/grpc/internal/transport.(*controlBuffer).get"),
|
||||
// all engine related packages
|
||||
goleak.IgnoreTopFunction("github.com/jackc/pgx/v5/pgxpool.(*Pool).backgroundHealthCheck"),
|
||||
goleak.IgnoreTopFunction("github.com/rabbitmq/amqp091-go.(*Connection).heartbeater"),
|
||||
goleak.IgnoreTopFunction("github.com/rabbitmq/amqp091-go.(*consumers).buffer"),
|
||||
goleak.IgnoreTopFunction("google.golang.org/grpc/internal/transport.(*http2Server).keepalive"),
|
||||
)
|
||||
}
|
||||
|
||||
@@ -77,6 +77,10 @@ func do(duration time.Duration, startEventsPerSecond, amount int, increase, dela
|
||||
|
||||
time.Sleep(after)
|
||||
|
||||
close(scheduled)
|
||||
close(executed)
|
||||
close(hook)
|
||||
|
||||
log.Printf("✅ success")
|
||||
|
||||
return nil
|
||||
|
||||
@@ -30,6 +30,11 @@ func emit(ctx context.Context, startEventsPerSecond, amount int, increase, durat
|
||||
var eventsPerSecond int
|
||||
go func() {
|
||||
took := <-hook
|
||||
|
||||
if took == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
panic(fmt.Errorf("event took too long to schedule: %s at %d events/s", took, eventsPerSecond))
|
||||
}()
|
||||
for {
|
||||
|
||||
@@ -3,21 +3,21 @@
|
||||
package rampup
|
||||
|
||||
import (
|
||||
"context"
|
||||
"log"
|
||||
"os"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/hatchet-dev/hatchet/internal/testutils"
|
||||
"github.com/hatchet-dev/hatchet/pkg/config/shared"
|
||||
"github.com/hatchet-dev/hatchet/pkg/logger"
|
||||
"github.com/hatchet-dev/hatchet/pkg/testing/harness"
|
||||
)
|
||||
|
||||
func TestRampUp(t *testing.T) {
|
||||
testutils.Prepare(t)
|
||||
func TestMain(m *testing.M) {
|
||||
harness.RunTestWithEngine(m)
|
||||
}
|
||||
|
||||
func TestRampUp(t *testing.T) {
|
||||
type args struct {
|
||||
duration time.Duration
|
||||
increase time.Duration
|
||||
@@ -76,18 +76,6 @@ func TestRampUp(t *testing.T) {
|
||||
},
|
||||
}}
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute)
|
||||
|
||||
setup := sync.WaitGroup{}
|
||||
|
||||
go func() {
|
||||
setup.Add(1)
|
||||
log.Printf("setup start")
|
||||
testutils.SetupEngine(ctx, t)
|
||||
setup.Done()
|
||||
log.Printf("setup end")
|
||||
}()
|
||||
|
||||
// TODO instead of waiting, figure out when the engine setup is complete
|
||||
time.Sleep(15 * time.Second)
|
||||
|
||||
@@ -99,10 +87,4 @@ func TestRampUp(t *testing.T) {
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
cancel()
|
||||
|
||||
log.Printf("test complete")
|
||||
setup.Wait()
|
||||
log.Printf("cleanup complete")
|
||||
}
|
||||
|
||||
@@ -30,6 +30,8 @@ require (
|
||||
github.com/sethvargo/go-retry v0.3.0
|
||||
github.com/spf13/cobra v1.9.1
|
||||
github.com/spf13/viper v1.20.1
|
||||
github.com/testcontainers/testcontainers-go/modules/postgres v0.37.0
|
||||
github.com/testcontainers/testcontainers-go/modules/rabbitmq v0.37.0
|
||||
github.com/tink-crypto/tink-go v0.0.0-20230613075026-d6de17e3f164
|
||||
github.com/tink-crypto/tink-go-gcpkms v0.0.0-20230602082706-31d0d09ccc8d
|
||||
go.opentelemetry.io/otel v1.35.0
|
||||
@@ -53,6 +55,9 @@ require (
|
||||
cloud.google.com/go/auth v0.16.0 // indirect
|
||||
cloud.google.com/go/auth/oauth2adapt v0.2.8 // indirect
|
||||
cloud.google.com/go/compute/metadata v0.6.0 // indirect
|
||||
dario.cat/mergo v1.0.1 // indirect
|
||||
github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 // indirect
|
||||
github.com/Microsoft/go-winio v0.6.2 // indirect
|
||||
github.com/antlr4-go/antlr/v4 v4.13.1 // indirect
|
||||
github.com/apapsch/go-jsonmerge/v2 v2.0.0 // indirect
|
||||
github.com/bahlo/generic-list-go v0.2.0 // indirect
|
||||
@@ -60,12 +65,21 @@ require (
|
||||
github.com/buger/jsonparser v1.1.1 // indirect
|
||||
github.com/cenkalti/backoff/v4 v4.3.0 // indirect
|
||||
github.com/cespare/xxhash/v2 v2.3.0 // indirect
|
||||
github.com/containerd/log v0.1.0 // indirect
|
||||
github.com/containerd/platforms v0.2.1 // indirect
|
||||
github.com/cpuguy83/dockercfg v0.3.2 // indirect
|
||||
github.com/distribution/reference v0.6.0 // indirect
|
||||
github.com/docker/docker v28.0.1+incompatible // indirect
|
||||
github.com/docker/go-connections v0.5.0 // indirect
|
||||
github.com/docker/go-units v0.5.0 // indirect
|
||||
github.com/ebitengine/purego v0.8.2 // indirect
|
||||
github.com/emicklei/go-restful/v3 v3.11.0 // indirect
|
||||
github.com/felixge/httpsnoop v1.0.4 // indirect
|
||||
github.com/fxamacker/cbor/v2 v2.7.0 // indirect
|
||||
github.com/gabriel-vasile/mimetype v1.4.8 // indirect
|
||||
github.com/go-logr/logr v1.4.2 // indirect
|
||||
github.com/go-logr/stdr v1.2.2 // indirect
|
||||
github.com/go-ole/go-ole v1.2.6 // indirect
|
||||
github.com/go-openapi/jsonpointer v0.21.0 // indirect
|
||||
github.com/go-openapi/jsonreference v0.20.2 // indirect
|
||||
github.com/go-openapi/swag v0.23.0 // indirect
|
||||
@@ -88,29 +102,47 @@ require (
|
||||
github.com/jonboulle/clockwork v0.5.0 // indirect
|
||||
github.com/josharian/intern v1.0.0 // indirect
|
||||
github.com/json-iterator/go v1.1.12 // indirect
|
||||
github.com/klauspost/compress v1.18.0 // indirect
|
||||
github.com/labstack/gommon v0.4.2 // indirect
|
||||
github.com/leodido/go-urn v1.4.0 // indirect
|
||||
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect
|
||||
github.com/magiconair/properties v1.8.10 // indirect
|
||||
github.com/mailru/easyjson v0.7.7 // indirect
|
||||
github.com/mfridman/interpolate v0.0.2 // indirect
|
||||
github.com/moby/docker-image-spec v1.3.1 // indirect
|
||||
github.com/moby/patternmatcher v0.6.0 // indirect
|
||||
github.com/moby/sys/sequential v0.5.0 // indirect
|
||||
github.com/moby/sys/user v0.1.0 // indirect
|
||||
github.com/moby/sys/userns v0.1.0 // indirect
|
||||
github.com/moby/term v0.5.0 // indirect
|
||||
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
|
||||
github.com/modern-go/reflect2 v1.0.2 // indirect
|
||||
github.com/mohae/deepcopy v0.0.0-20170929034955-c48cc78d4826 // indirect
|
||||
github.com/morikuni/aec v1.0.0 // indirect
|
||||
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
|
||||
github.com/oasdiff/yaml v0.0.0-20250309154309-f31be36b4037 // indirect
|
||||
github.com/oasdiff/yaml3 v0.0.0-20250309153720-d2182401db90 // indirect
|
||||
github.com/opencontainers/image-spec v1.1.1 // indirect
|
||||
github.com/perimeterx/marshmallow v1.1.5 // indirect
|
||||
github.com/pkg/errors v0.9.1 // indirect
|
||||
github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect
|
||||
github.com/prometheus/client_model v0.6.1 // indirect
|
||||
github.com/prometheus/common v0.62.0 // indirect
|
||||
github.com/prometheus/procfs v0.16.0 // indirect
|
||||
github.com/sagikazarmark/locafero v0.7.0 // indirect
|
||||
github.com/shirou/gopsutil/v4 v4.25.1 // indirect
|
||||
github.com/sirupsen/logrus v1.9.3 // indirect
|
||||
github.com/sourcegraph/conc v0.3.0 // indirect
|
||||
github.com/stoewer/go-strcase v1.3.0 // indirect
|
||||
github.com/stretchr/objx v0.5.2 // indirect
|
||||
github.com/testcontainers/testcontainers-go v0.37.0 // indirect
|
||||
github.com/tklauser/go-sysconf v0.3.12 // indirect
|
||||
github.com/tklauser/numcpus v0.6.1 // indirect
|
||||
github.com/valyala/bytebufferpool v1.0.0 // indirect
|
||||
github.com/valyala/fasttemplate v1.2.2 // indirect
|
||||
github.com/wk8/go-ordered-map/v2 v2.1.8 // indirect
|
||||
github.com/x448/float16 v0.8.4 // indirect
|
||||
github.com/yusufpapurcu/wmi v1.2.4 // indirect
|
||||
go.opentelemetry.io/auto/sdk v1.1.0 // indirect
|
||||
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.60.0 // indirect
|
||||
go.opentelemetry.io/otel/metric v1.35.0 // indirect
|
||||
|
||||
@@ -6,8 +6,16 @@ cloud.google.com/go/auth/oauth2adapt v0.2.8 h1:keo8NaayQZ6wimpNSmW5OPc283g65QNIi
|
||||
cloud.google.com/go/auth/oauth2adapt v0.2.8/go.mod h1:XQ9y31RkqZCcwJWNSx2Xvric3RrU88hAYYbjDWYDL+c=
|
||||
cloud.google.com/go/compute/metadata v0.6.0 h1:A6hENjEsCDtC1k8byVsgwvVcioamEHvZ4j01OwKxG9I=
|
||||
cloud.google.com/go/compute/metadata v0.6.0/go.mod h1:FjyFAW1MW0C203CEOMDTu3Dk1FlqW3Rga40jzHL4hfg=
|
||||
dario.cat/mergo v1.0.1 h1:Ra4+bf83h2ztPIQYNP99R6m+Y7KfnARDfID+a+vLl4s=
|
||||
dario.cat/mergo v1.0.1/go.mod h1:uNxQE+84aUszobStD9th8a29P2fMDhsBdgRYvZOxGmk=
|
||||
github.com/AdaLogics/go-fuzz-headers v0.0.0-20230811130428-ced1acdcaa24 h1:bvDV9vkmnHYOMsOr4WLk+Vo07yKIzd94sVoIqshQ4bU=
|
||||
github.com/AdaLogics/go-fuzz-headers v0.0.0-20230811130428-ced1acdcaa24/go.mod h1:8o94RPi1/7XTJvwPpRSzSUedZrtlirdB3r9Z20bi2f8=
|
||||
github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 h1:UQHMgLO+TxOElx5B5HZ4hJQsoJ/PvUvKRhJHDQXO8P8=
|
||||
github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1/go.mod h1:xomTg63KZ2rFqZQzSB4Vz2SUXa1BpHTVz9L5PTmPC4E=
|
||||
github.com/Masterminds/semver/v3 v3.3.1 h1:QtNSWtVZ3nBfk8mAOu/B6v7FMJ+NHTIgUPi7rj+4nv4=
|
||||
github.com/Masterminds/semver/v3 v3.3.1/go.mod h1:4V+yj/TJE1HU9XfppCwVMZq3I84lprf4nC11bSS5beM=
|
||||
github.com/Microsoft/go-winio v0.6.2 h1:F2VQgta7ecxGYO8k3ZZz3RS8fVIXVxONVUPlNERoyfY=
|
||||
github.com/Microsoft/go-winio v0.6.2/go.mod h1:yd8OoFMLzJbo9gZq8j5qaps8bJ9aShtEA8Ipt1oGCvU=
|
||||
github.com/RaveNoX/go-jsoncommentstrip v1.0.0/go.mod h1:78ihd09MekBnJnxpICcwzCMzGrKSKYe4AqU6PDYYpjk=
|
||||
github.com/antlr4-go/antlr/v4 v4.13.1 h1:SqQKkuVZ+zWkMMNkjy5FZe5mr5WURWnlpmOuzYWrPrQ=
|
||||
github.com/antlr4-go/antlr/v4 v4.13.1/go.mod h1:GKmUxMtwp6ZgGwZSva4eWPC5mS6vUAmOABFgjdkM7Nw=
|
||||
@@ -24,17 +32,35 @@ github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK3
|
||||
github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
|
||||
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
|
||||
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
|
||||
github.com/containerd/log v0.1.0 h1:TCJt7ioM2cr/tfR8GPbGf9/VRAX8D2B4PjzCpfX540I=
|
||||
github.com/containerd/log v0.1.0/go.mod h1:VRRf09a7mHDIRezVKTRCrOq78v577GXq3bSa3EhrzVo=
|
||||
github.com/containerd/platforms v0.2.1 h1:zvwtM3rz2YHPQsF2CHYM8+KtB5dvhISiXh5ZpSBQv6A=
|
||||
github.com/containerd/platforms v0.2.1/go.mod h1:XHCb+2/hzowdiut9rkudds9bE5yJ7npe7dG/wG+uFPw=
|
||||
github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc=
|
||||
github.com/cpuguy83/dockercfg v0.3.2 h1:DlJTyZGBDlXqUZ2Dk2Q3xHs/FtnooJJVaad2S9GKorA=
|
||||
github.com/cpuguy83/dockercfg v0.3.2/go.mod h1:sugsbF4//dDlL/i+S+rtpIWp+5h0BHJHfjj5/jFyUJc=
|
||||
github.com/cpuguy83/go-md2man/v2 v2.0.6/go.mod h1:oOW0eioCTA6cOiMLiUPZOpcVxMig6NIQQ7OS05n1F4g=
|
||||
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
|
||||
github.com/creack/pty v1.1.18 h1:n56/Zwd5o6whRC5PMGretI4IdRLlmBXYNjScPaBgsbY=
|
||||
github.com/creack/pty v1.1.18/go.mod h1:MOBLtS5ELjhRRrroQr9kyvTxUAFNvYEK993ew/Vr4O4=
|
||||
github.com/creasty/defaults v1.8.0 h1:z27FJxCAa0JKt3utc0sCImAEb+spPucmKoOdLHvHYKk=
|
||||
github.com/creasty/defaults v1.8.0/go.mod h1:iGzKe6pbEHnpMPtfDXZEr0NVxWnPTjb1bbDy08fPzYM=
|
||||
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM=
|
||||
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/distribution/reference v0.6.0 h1:0IXCQ5g4/QMHHkarYzh5l+u8T3t73zM5QvfrDyIgxBk=
|
||||
github.com/distribution/reference v0.6.0/go.mod h1:BbU0aIcezP1/5jX/8MP0YiH4SdvB5Y4f/wlDRiLyi3E=
|
||||
github.com/docker/docker v28.0.1+incompatible h1:FCHjSRdXhNRFjlHMTv4jUNlIBbTeRjrWfeFuJp7jpo0=
|
||||
github.com/docker/docker v28.0.1+incompatible/go.mod h1:eEKB0N0r5NX/I1kEveEz05bcu8tLC/8azJZsviup8Sk=
|
||||
github.com/docker/go-connections v0.5.0 h1:USnMq7hx7gwdVZq1L49hLXaFtUdTADjXGp+uj1Br63c=
|
||||
github.com/docker/go-connections v0.5.0/go.mod h1:ov60Kzw0kKElRwhNs9UlUHAE/F9Fe6GLaXnqyDdmEXc=
|
||||
github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4=
|
||||
github.com/docker/go-units v0.5.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk=
|
||||
github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY=
|
||||
github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto=
|
||||
github.com/ebitengine/purego v0.8.2 h1:jPPGWs2sZ1UgOSgD2bClL0MJIqu58nOmIcBuXr62z1I=
|
||||
github.com/ebitengine/purego v0.8.2/go.mod h1:iIjxzd6CiRiOG0UyXP+V1+jWqUXVjPKLAI0mRfJZTmQ=
|
||||
github.com/emicklei/go-restful/v3 v3.11.0 h1:rAQeMHw1c7zTmncogyy8VvRZwtkmkZ4FxERmMY4rD+g=
|
||||
github.com/emicklei/go-restful/v3 v3.11.0/go.mod h1:6n3XBCmQQb25CM2LCACGz8ukIrRry+4bhvbpWn3mrbc=
|
||||
github.com/exaring/otelpgx v0.9.0 h1:Bo0RIhBNrzLlVzih46qBy/KQRvRs9vwRbgT/fE363NM=
|
||||
@@ -66,6 +92,8 @@ github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY=
|
||||
github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
|
||||
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
|
||||
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
|
||||
github.com/go-ole/go-ole v1.2.6 h1:/Fpf6oFPoeFik9ty7siob0G6Ke8QvQEuVcuChpwXzpY=
|
||||
github.com/go-ole/go-ole v1.2.6/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0=
|
||||
github.com/go-openapi/jsonpointer v0.19.6/go.mod h1:osyAmYz/mB/C3I+WsTTSgw1ONzaLJoLCyoi6/zppojs=
|
||||
github.com/go-openapi/jsonpointer v0.21.0 h1:YgdVicSA9vH5RiHs9TZW5oyafXZFc6+2Vc1rr/O9oNQ=
|
||||
github.com/go-openapi/jsonpointer v0.21.0/go.mod h1:IUyH9l/+uyhIYQ/PXVA41Rexl+kOkAPDdXEYns6fzUY=
|
||||
@@ -101,6 +129,7 @@ github.com/google/cel-go v0.25.0/go.mod h1:hjEb6r5SuOSlhCHmFoLzu8HGCERvIsDAbxDAy
|
||||
github.com/google/gnostic-models v0.6.9 h1:MU/8wDLif2qCXZmzncUQ/BOfxWfthHi63KqpoNbWqVw=
|
||||
github.com/google/gnostic-models v0.6.9/go.mod h1:CiWsm0s6BSQd1hRn8/QmxqB6BesYcbSZxsz9b0KuDBw=
|
||||
github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
|
||||
github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
|
||||
github.com/google/go-cmp v0.5.7/go.mod h1:n+brtR0CgQNWTVd5ZUFpTBC8YFBDLK/h/bpaJ8/DtOE=
|
||||
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
|
||||
github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8=
|
||||
@@ -190,6 +219,12 @@ github.com/labstack/gommon v0.4.2 h1:F8qTUNXgG1+6WQmqoUWnz8WiEU60mXVVw0P4ht1WRA0
|
||||
github.com/labstack/gommon v0.4.2/go.mod h1:QlUFxVM+SNXhDL/Z7YhocGIBYOiwB0mXm1+1bAPHPyU=
|
||||
github.com/leodido/go-urn v1.4.0 h1:WT9HwE9SGECu3lg4d/dIA+jxlljEa1/ffXKmRjqdmIQ=
|
||||
github.com/leodido/go-urn v1.4.0/go.mod h1:bvxc+MVxLKB4z00jd1z+Dvzr47oO32F/QSNjSBOlFxI=
|
||||
github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw=
|
||||
github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
|
||||
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 h1:6E+4a0GO5zZEnZ81pIr0yLvtUWk2if982qA3F3QD6H4=
|
||||
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0/go.mod h1:zJYVVT2jmtg6P3p1VtQj7WsuWi/y4VnjVBn7F8KPB3I=
|
||||
github.com/magiconair/properties v1.8.10 h1:s31yESBquKXCV9a/ScB3ESkOjUYYv+X0rg8SYxI99mE=
|
||||
github.com/magiconair/properties v1.8.10/go.mod h1:Dhd985XPs7jluiymwWYZ0G4Z61jb3vdS329zhj2hYo0=
|
||||
github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0=
|
||||
github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc=
|
||||
github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA=
|
||||
@@ -198,10 +233,24 @@ github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/
|
||||
github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
|
||||
github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY=
|
||||
github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
|
||||
github.com/mdelapenya/tlscert v0.2.0 h1:7H81W6Z/4weDvZBNOfQte5GpIMo0lGYEeWbkGp5LJHI=
|
||||
github.com/mdelapenya/tlscert v0.2.0/go.mod h1:O4njj3ELLnJjGdkN7M/vIVCpZ+Cf0L6muqOG4tLSl8o=
|
||||
github.com/mfridman/interpolate v0.0.2 h1:pnuTK7MQIxxFz1Gr+rjSIx9u7qVjf5VOoM/u6BbAxPY=
|
||||
github.com/mfridman/interpolate v0.0.2/go.mod h1:p+7uk6oE07mpE/Ik1b8EckO0O4ZXiGAfshKBWLUM9Xg=
|
||||
github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY=
|
||||
github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo=
|
||||
github.com/moby/docker-image-spec v1.3.1 h1:jMKff3w6PgbfSa69GfNg+zN/XLhfXJGnEx3Nl2EsFP0=
|
||||
github.com/moby/docker-image-spec v1.3.1/go.mod h1:eKmb5VW8vQEh/BAr2yvVNvuiJuY6UIocYsFu/DxxRpo=
|
||||
github.com/moby/patternmatcher v0.6.0 h1:GmP9lR19aU5GqSSFko+5pRqHi+Ohk1O69aFiKkVGiPk=
|
||||
github.com/moby/patternmatcher v0.6.0/go.mod h1:hDPoyOpDY7OrrMDLaYoY3hf52gNCR/YOUYxkhApJIxc=
|
||||
github.com/moby/sys/sequential v0.5.0 h1:OPvI35Lzn9K04PBbCLW0g4LcFAJgHsvXsRyewg5lXtc=
|
||||
github.com/moby/sys/sequential v0.5.0/go.mod h1:tH2cOOs5V9MlPiXcQzRC+eEyab644PWKGRYaaV5ZZlo=
|
||||
github.com/moby/sys/user v0.1.0 h1:WmZ93f5Ux6het5iituh9x2zAG7NFY9Aqi49jjE1PaQg=
|
||||
github.com/moby/sys/user v0.1.0/go.mod h1:fKJhFOnsCN6xZ5gSfbM6zaHGgDJMrqt9/reuj4T7MmU=
|
||||
github.com/moby/sys/userns v0.1.0 h1:tVLXkFOxVu9A64/yh59slHVv9ahO9UIev4JZusOLG/g=
|
||||
github.com/moby/sys/userns v0.1.0/go.mod h1:IHUYgu/kao6N8YZlp9Cf444ySSvCmDlmzUcYfDHOl28=
|
||||
github.com/moby/term v0.5.0 h1:xt8Q1nalod/v7BqbG21f8mQPqH+xAaC9C3N3wfWbVP0=
|
||||
github.com/moby/term v0.5.0/go.mod h1:8FzsFHVUBGZdbDsJw/ot+X+d5HLUbvklYLJ9uGfcI3Y=
|
||||
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
|
||||
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg=
|
||||
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
|
||||
@@ -209,6 +258,8 @@ github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9G
|
||||
github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
|
||||
github.com/mohae/deepcopy v0.0.0-20170929034955-c48cc78d4826 h1:RWengNIwukTxcDr9M+97sNutRR1RKhG96O6jWumTTnw=
|
||||
github.com/mohae/deepcopy v0.0.0-20170929034955-c48cc78d4826/go.mod h1:TaXosZuwdSHYgviHp1DAtfrULt5eUgsSMsZf+YrPgl8=
|
||||
github.com/morikuni/aec v1.0.0 h1:nP9CBfwrvYnBRgY6qfDQkygYDmYwOilePFkwzv4dU8A=
|
||||
github.com/morikuni/aec v1.0.0/go.mod h1:BbKIizmSmc5MMPqRYbxO4ZU0S0+P200+tUnFx7PXmsc=
|
||||
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA=
|
||||
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ=
|
||||
github.com/ncruces/go-strftime v0.1.9 h1:bY0MQC28UADQmHmaF5dgpLmImcShSi2kHU9XLdhx/f4=
|
||||
@@ -225,6 +276,8 @@ github.com/onsi/gomega v1.35.1 h1:Cwbd75ZBPxFSuZ6T+rN/WCb/gOc6YgFBXLlZLhC7Ds4=
|
||||
github.com/onsi/gomega v1.35.1/go.mod h1:PvZbdDc8J6XJEpDK4HCuRBm8a6Fzp9/DmhC9C7yFlog=
|
||||
github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U=
|
||||
github.com/opencontainers/go-digest v1.0.0/go.mod h1:0JzlMkj0TRzQZfJkVvzbP0HBR3IKzErnv2BNG4W4MAM=
|
||||
github.com/opencontainers/image-spec v1.1.1 h1:y0fUlFfIZhPF1W537XOLg0/fcx6zcHCJwooC2xJA040=
|
||||
github.com/opencontainers/image-spec v1.1.1/go.mod h1:qpqAh3Dmcf36wStyyWU+kCeDgrGnAve2nCC8+7h8Q0M=
|
||||
github.com/pelletier/go-toml/v2 v2.2.3 h1:YmeHyLY8mFWbdkNWwpr+qIL2bEqT0o95WSdkNHvL12M=
|
||||
github.com/pelletier/go-toml/v2 v2.2.3/go.mod h1:MfCQTFTvCcUyyvvwm1+G6H/jORL20Xlb6rzQu9GuUkc=
|
||||
github.com/perimeterx/marshmallow v1.1.5 h1:a2LALqQ1BlHM8PZblsDdidgv1mWi1DgC2UmX50IvK2s=
|
||||
@@ -238,6 +291,8 @@ github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRI
|
||||
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||
github.com/posthog/posthog-go v1.4.10 h1:rpCRxxe2a4UPq9VM7rANRNRFZk0w/5To4mhYvNK9ipU=
|
||||
github.com/posthog/posthog-go v1.4.10/go.mod h1:uYC2l1Yktc8E+9FAHJ9QZG4vQf/NHJPD800Hsm7DzoM=
|
||||
github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c h1:ncq/mPwQF4JjgDlrVEn3C11VoGHZN7m8qihwgMEtzYw=
|
||||
github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c/go.mod h1:OmDBASR4679mdNQnz2pUhc2G8CO2JrUAVFDRBDP/hJE=
|
||||
github.com/pressly/goose/v3 v3.24.2 h1:c/ie0Gm8rnIVKvnDQ/scHErv46jrDv9b4I0WRcFJzYU=
|
||||
github.com/pressly/goose/v3 v3.24.2/go.mod h1:kjefwFB0eR4w30Td2Gj2Mznyw94vSP+2jJYkOVNbD1k=
|
||||
github.com/prometheus/client_golang v1.22.0 h1:rb93p9lokFEsctTys46VnV1kLCDpVZ0a/Y92Vm0Zc6Q=
|
||||
@@ -264,6 +319,10 @@ github.com/sagikazarmark/locafero v0.7.0 h1:5MqpDsTGNDhY8sGp0Aowyf0qKsPrhewaLSsF
|
||||
github.com/sagikazarmark/locafero v0.7.0/go.mod h1:2za3Cg5rMaTMoG/2Ulr9AwtFaIppKXTRYnozin4aB5k=
|
||||
github.com/sethvargo/go-retry v0.3.0 h1:EEt31A35QhrcRZtrYFDTBg91cqZVnFL2navjDrah2SE=
|
||||
github.com/sethvargo/go-retry v0.3.0/go.mod h1:mNX17F0C/HguQMyMyJxcnU471gOZGxCLyYaFyAZraas=
|
||||
github.com/shirou/gopsutil/v4 v4.25.1 h1:QSWkTc+fu9LTAWfkZwZ6j8MSUk4A2LV7rbH0ZqmLjXs=
|
||||
github.com/shirou/gopsutil/v4 v4.25.1/go.mod h1:RoUCUpndaJFtT+2zsZzzmhvbfGoDCJ7nFXKJf8GqJbI=
|
||||
github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ=
|
||||
github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ=
|
||||
github.com/slack-go/slack v0.16.0 h1:khp/WCFv+Hb/B/AJaAwvcxKun0hM6grN0bUZ8xG60P8=
|
||||
github.com/slack-go/slack v0.16.0/go.mod h1:hlGi5oXA+Gt+yWTPP0plCdRKmjsDxecdHxYQdlMQKOw=
|
||||
github.com/sourcegraph/conc v0.3.0 h1:OQTbbt6P72L20UqAkXXuLOj79LfEanQ+YQFNpLA9ySo=
|
||||
@@ -296,10 +355,20 @@ github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOf
|
||||
github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
|
||||
github.com/subosito/gotenv v1.6.0 h1:9NlTDc1FTs4qu0DDq7AEtTPNw6SVm7uBMsUCUjABIf8=
|
||||
github.com/subosito/gotenv v1.6.0/go.mod h1:Dk4QP5c2W3ibzajGcXpNraDfq2IrhjMIvMSWPKKo0FU=
|
||||
github.com/testcontainers/testcontainers-go v0.37.0 h1:L2Qc0vkTw2EHWQ08djon0D2uw7Z/PtHS/QzZZ5Ra/hg=
|
||||
github.com/testcontainers/testcontainers-go v0.37.0/go.mod h1:QPzbxZhQ6Bclip9igjLFj6z0hs01bU8lrl2dHQmgFGM=
|
||||
github.com/testcontainers/testcontainers-go/modules/postgres v0.37.0 h1:hsVwFkS6s+79MbKEO+W7A1wNIw1fmkMtF4fg83m6kbc=
|
||||
github.com/testcontainers/testcontainers-go/modules/postgres v0.37.0/go.mod h1:Qj/eGbRbO/rEYdcRLmN+bEojzatP/+NS1y8ojl2PQsc=
|
||||
github.com/testcontainers/testcontainers-go/modules/rabbitmq v0.37.0 h1:JiPjs8fV3qpHWDKyNEhA4Phtjwduj/bgd14Ltz9fzy0=
|
||||
github.com/testcontainers/testcontainers-go/modules/rabbitmq v0.37.0/go.mod h1:5tThy7LY0XMUQCR72cWfqPstL3lxBiG6GVYRhwbj8ZQ=
|
||||
github.com/tink-crypto/tink-go v0.0.0-20230613075026-d6de17e3f164 h1:yhVO0Yhq84FjdcotvFFvDJRNHJ7mO743G12VdcW4Evc=
|
||||
github.com/tink-crypto/tink-go v0.0.0-20230613075026-d6de17e3f164/go.mod h1:HhtDVdE/PRZFRia834tkmcwuscnaAzda1RJUW9Pr3Rg=
|
||||
github.com/tink-crypto/tink-go-gcpkms v0.0.0-20230602082706-31d0d09ccc8d h1:+In5BwTMe2nF3FC6LrYqg71jDyaOOMZ4EQBFUhFq23g=
|
||||
github.com/tink-crypto/tink-go-gcpkms v0.0.0-20230602082706-31d0d09ccc8d/go.mod h1:TXKMH7TDt0h7QXtI9TdYPyly6xZL+ooPpbw30qekmEc=
|
||||
github.com/tklauser/go-sysconf v0.3.12 h1:0QaGUFOdQaIVdPgfITYzaTegZvdCjmYO52cSFAEVmqU=
|
||||
github.com/tklauser/go-sysconf v0.3.12/go.mod h1:Ho14jnntGE1fpdOqQEEaiKRpvIavV0hSfmBq8nJbHYI=
|
||||
github.com/tklauser/numcpus v0.6.1 h1:ng9scYS7az0Bk4OZLvrNXNSAO2Pxr1XXRAPyjhIx+Fk=
|
||||
github.com/tklauser/numcpus v0.6.1/go.mod h1:1XfjsgE2zo8GVw7POkMbHENHzVg3GzmoZ9fESEdAacY=
|
||||
github.com/ugorji/go/codec v1.2.11 h1:BMaWp1Bb6fHwEtbplGBGJ498wD+LKlNSl25MjdZY4dU=
|
||||
github.com/ugorji/go/codec v1.2.11/go.mod h1:UNopzCgEMSXjBc6AOMqYvWC1ktqTAfzJZUZgYf6w6lg=
|
||||
github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw=
|
||||
@@ -312,6 +381,8 @@ github.com/x448/float16 v0.8.4 h1:qLwI1I70+NjRFUR3zs1JPUCgaCXSh3SW62uAKT1mSBM=
|
||||
github.com/x448/float16 v0.8.4/go.mod h1:14CWIYCyZA/cWjXOioeEpHeN/83MdbZDRQHoFcYsOfg=
|
||||
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
|
||||
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
|
||||
github.com/yusufpapurcu/wmi v1.2.4 h1:zFUKzehAFReQwLys1b/iSMl+JQGSCSjtVqQn9bBrPo0=
|
||||
github.com/yusufpapurcu/wmi v1.2.4/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0=
|
||||
go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA=
|
||||
go.opentelemetry.io/auto/sdk v1.1.0/go.mod h1:3wSPjt5PWp2RhlCcmmOial7AvC4DQqZb7a7wCow3W8A=
|
||||
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.60.0 h1:sbiXRNDSWJOTobXh5HyQKjq6wUC5tNybqjIqDpAY4CU=
|
||||
@@ -322,6 +393,8 @@ go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.35.0 h1:1fTNlAIJZGWLP5FVu0f
|
||||
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.35.0/go.mod h1:zjPK58DtkqQFn+YUMbx0M2XV3QgKU0gS9LeGohREyK4=
|
||||
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.35.0 h1:m639+BofXTvcY1q8CGs4ItwQarYtJPOWmVobfM1HpVI=
|
||||
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.35.0/go.mod h1:LjReUci/F4BUyv+y4dwnq3h/26iNOeC3wAIqgvTIZVo=
|
||||
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.19.0 h1:IeMeyr1aBvBiPVYihXIaeIZba6b8E1bYp7lbdxK8CQg=
|
||||
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.19.0/go.mod h1:oVdCUtjq9MK9BlS7TtucsQwUcXcymNiEDjgDD2jMtZU=
|
||||
go.opentelemetry.io/otel/metric v1.35.0 h1:0znxYu2SNyuMSQT4Y9WDWej0VpcsxkuklLa4/siN90M=
|
||||
go.opentelemetry.io/otel/metric v1.35.0/go.mod h1:nKVFgxBZ2fReX6IlyW28MgZojkoAkJGaE8CpgeAU3oE=
|
||||
go.opentelemetry.io/otel/sdk v1.35.0 h1:iPctf8iprVySXSKJffSS79eOjl9pvxV9ZqOWT0QejKY=
|
||||
@@ -360,9 +433,15 @@ golang.org/x/sync v0.13.0 h1:AauUjRAJ9OSnvULf/ARrrVywoJDy0YS2AwQ98I37610=
|
||||
golang.org/x/sync v0.13.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA=
|
||||
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20201204225414-ed752295db88/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20210616094352-59db8d763f22/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.32.0 h1:s77OFDvIQeibCmezSnk/q6iAfkdiQaJi4VzroCFrN20=
|
||||
golang.org/x/sys v0.32.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
|
||||
@@ -404,6 +483,8 @@ gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw=
|
||||
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
|
||||
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||
gotest.tools/v3 v3.5.1 h1:EENdUnS3pdur5nybKYIh2Vfgc8IUNBjxDPSjtiJcOzU=
|
||||
gotest.tools/v3 v3.5.1/go.mod h1:isy3WKz7GK6uNw/sbHzfKBLvlvXwUyV06n6brMxxopU=
|
||||
k8s.io/api v0.33.0 h1:yTgZVn1XEe6opVpP1FylmNrIFWuDqe2H0V8CT5gxfIU=
|
||||
k8s.io/api v0.33.0/go.mod h1:CTO61ECK/KU7haa3qq8sarQ0biLq2ju405IZAd9zsiM=
|
||||
k8s.io/apimachinery v0.33.0 h1:1a6kHrJxb2hs4t8EE5wuR/WxKDwGN1FKH3JvDtA0CIQ=
|
||||
|
||||
@@ -21,14 +21,25 @@ type MQPubBuffer struct {
|
||||
|
||||
// buffers is keyed on (tenantId, msgId) and contains a buffer of messages for that tenantId and msgId.
|
||||
buffers sync.Map
|
||||
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
}
|
||||
|
||||
func NewMQPubBuffer(mq MessageQueue) *MQPubBuffer {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
|
||||
return &MQPubBuffer{
|
||||
mq: mq,
|
||||
mq: mq,
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
}
|
||||
}
|
||||
|
||||
func (m *MQPubBuffer) Stop() {
|
||||
m.cancel()
|
||||
}
|
||||
|
||||
type msgWithErrCh struct {
|
||||
msg *Message
|
||||
errCh chan error
|
||||
@@ -44,7 +55,7 @@ func (m *MQPubBuffer) Pub(ctx context.Context, queue Queue, msg *Message, wait b
|
||||
buf, ok := m.buffers.Load(k)
|
||||
|
||||
if !ok {
|
||||
buf, _ = m.buffers.LoadOrStore(k, newMsgIdPubBuffer(msg.TenantID, msg.ID, func(msg *Message) error {
|
||||
buf, _ = m.buffers.LoadOrStore(k, newMsgIDPubBuffer(m.ctx, msg.TenantID, msg.ID, func(msg *Message) error {
|
||||
msgCtx, cancel := context.WithTimeout(context.Background(), PUB_TIMEOUT)
|
||||
defer cancel()
|
||||
|
||||
@@ -90,10 +101,10 @@ type msgIdPubBuffer struct {
|
||||
serialize func(t any) ([]byte, error)
|
||||
}
|
||||
|
||||
func newMsgIdPubBuffer(tenantId, msgId string, pub PubFunc) *msgIdPubBuffer {
|
||||
func newMsgIDPubBuffer(ctx context.Context, tenantID, msgID string, pub PubFunc) *msgIdPubBuffer {
|
||||
b := &msgIdPubBuffer{
|
||||
tenantId: tenantId,
|
||||
msgId: msgId,
|
||||
tenantId: tenantID,
|
||||
msgId: msgID,
|
||||
msgIdPubBufferCh: make(chan *msgWithErrCh, PUB_BUFFER_SIZE),
|
||||
notifier: make(chan struct{}),
|
||||
pub: pub,
|
||||
@@ -101,22 +112,20 @@ func newMsgIdPubBuffer(tenantId, msgId string, pub PubFunc) *msgIdPubBuffer {
|
||||
semaphore: make(chan struct{}, PUB_MAX_CONCURRENCY),
|
||||
}
|
||||
|
||||
err := b.startFlusher()
|
||||
|
||||
if err != nil {
|
||||
// TODO: remove panic
|
||||
panic(err)
|
||||
}
|
||||
b.startFlusher(ctx)
|
||||
|
||||
return b
|
||||
}
|
||||
|
||||
func (m *msgIdPubBuffer) startFlusher() error {
|
||||
func (m *msgIdPubBuffer) startFlusher(ctx context.Context) {
|
||||
ticker := time.NewTicker(PUB_FLUSH_INTERVAL)
|
||||
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
m.flush()
|
||||
return
|
||||
case <-ticker.C:
|
||||
go m.flush()
|
||||
case <-m.notifier:
|
||||
@@ -124,8 +133,6 @@ func (m *msgIdPubBuffer) startFlusher() error {
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *msgIdPubBuffer) flush() {
|
||||
|
||||
@@ -185,7 +185,7 @@ func (m *MQSubBuffer) handleMsg(ctx context.Context, msg *Message) error {
|
||||
buf, ok := m.buffers.Load(k)
|
||||
|
||||
if !ok {
|
||||
buf, _ = m.buffers.LoadOrStore(k, newMsgIdBuffer(msg.TenantID, msg.ID, m.dst, m.flushInterval, m.bufferSize, m.maxConcurrency, m.disableImmediateFlush))
|
||||
buf, _ = m.buffers.LoadOrStore(k, newMsgIDBuffer(ctx, msg.TenantID, msg.ID, m.dst, m.flushInterval, m.bufferSize, m.maxConcurrency, m.disableImmediateFlush))
|
||||
}
|
||||
|
||||
// this places some backpressure on the consumer if buffers are full
|
||||
@@ -226,10 +226,10 @@ type msgIdBuffer struct {
|
||||
flushInterval time.Duration
|
||||
}
|
||||
|
||||
func newMsgIdBuffer(tenantId, msgId string, dst DstFunc, flushInterval time.Duration, bufferSize, maxConcurrency int, disableImmediateFlush bool) *msgIdBuffer {
|
||||
func newMsgIDBuffer(ctx context.Context, tenantID, msgID string, dst DstFunc, flushInterval time.Duration, bufferSize, maxConcurrency int, disableImmediateFlush bool) *msgIdBuffer {
|
||||
b := &msgIdBuffer{
|
||||
tenantId: tenantId,
|
||||
msgId: msgId,
|
||||
tenantId: tenantID,
|
||||
msgId: msgID,
|
||||
msgIdBufferCh: make(chan *msgWithResultCh, bufferSize),
|
||||
notifier: make(chan struct{}),
|
||||
dst: dst,
|
||||
@@ -238,22 +238,19 @@ func newMsgIdBuffer(tenantId, msgId string, dst DstFunc, flushInterval time.Dura
|
||||
flushInterval: flushInterval,
|
||||
}
|
||||
|
||||
err := b.startFlusher()
|
||||
|
||||
if err != nil {
|
||||
// TODO: remove panic
|
||||
panic(err)
|
||||
}
|
||||
b.startFlusher(ctx)
|
||||
|
||||
return b
|
||||
}
|
||||
|
||||
func (m *msgIdBuffer) startFlusher() error {
|
||||
func (m *msgIdBuffer) startFlusher(ctx context.Context) {
|
||||
ticker := time.NewTicker(m.flushInterval)
|
||||
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-ticker.C:
|
||||
go m.flush()
|
||||
case <-m.notifier:
|
||||
@@ -263,8 +260,6 @@ func (m *msgIdBuffer) startFlusher() error {
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *msgIdBuffer) flush() {
|
||||
|
||||
@@ -289,6 +289,8 @@ func (tc *TasksControllerImpl) Start() (func() error, error) {
|
||||
return err
|
||||
}
|
||||
|
||||
tc.pubBuffer.Stop()
|
||||
|
||||
if err := tc.s.Shutdown(); err != nil {
|
||||
return fmt.Errorf("could not shutdown scheduler: %w", err)
|
||||
}
|
||||
|
||||
@@ -370,6 +370,8 @@ func (d *DispatcherImpl) Start() (func() error, error) {
|
||||
|
||||
wg.Wait()
|
||||
|
||||
d.pubBuffer.Stop()
|
||||
|
||||
// drain the existing connections
|
||||
d.l.Debug().Msg("draining existing connections")
|
||||
|
||||
|
||||
@@ -244,7 +244,7 @@ func (s *Scheduler) Start() (func() error, error) {
|
||||
}
|
||||
|
||||
go func(results *v1.QueueResults) {
|
||||
err = s.scheduleStepRuns(ctx, sqlchelpers.UUIDToStr(results.TenantId), results)
|
||||
err := s.scheduleStepRuns(ctx, sqlchelpers.UUIDToStr(results.TenantId), results)
|
||||
|
||||
if err != nil {
|
||||
s.l.Error().Err(err).Msg("could not schedule step runs")
|
||||
@@ -286,6 +286,8 @@ func (s *Scheduler) Start() (func() error, error) {
|
||||
|
||||
wg.Wait()
|
||||
|
||||
s.pubBuffer.Stop()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
@@ -55,7 +55,17 @@ func (m *messageQueueRepository) Listen(ctx context.Context, name string, f func
|
||||
|
||||
l.Handle(name, handler)
|
||||
|
||||
return l.Listen(ctx)
|
||||
err := l.Listen(ctx)
|
||||
|
||||
if err != nil {
|
||||
if errors.Is(err, context.Canceled) {
|
||||
return nil
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *messageQueueRepository) Notify(ctx context.Context, name string, payload string) error {
|
||||
|
||||
@@ -0,0 +1,366 @@
|
||||
package harness
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log"
|
||||
"os"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/jackc/pgx/v5"
|
||||
"github.com/testcontainers/testcontainers-go/modules/postgres"
|
||||
"github.com/testcontainers/testcontainers-go/modules/rabbitmq"
|
||||
"go.uber.org/goleak"
|
||||
|
||||
amqp "github.com/rabbitmq/amqp091-go"
|
||||
|
||||
"github.com/hatchet-dev/hatchet/cmd/hatchet-admin/cli/seed"
|
||||
"github.com/hatchet-dev/hatchet/cmd/hatchet-engine/engine"
|
||||
"github.com/hatchet-dev/hatchet/cmd/hatchet-migrate/migrate"
|
||||
"github.com/hatchet-dev/hatchet/pkg/config/database"
|
||||
"github.com/hatchet-dev/hatchet/pkg/config/loader"
|
||||
"github.com/hatchet-dev/hatchet/pkg/encryption"
|
||||
"github.com/hatchet-dev/hatchet/pkg/random"
|
||||
)
|
||||
|
||||
func getEnvConfig() (string, bool, string) {
|
||||
// Get migration strategy: penultimate or latest
|
||||
migrateStrategy := os.Getenv("TESTING_MATRIX_MIGRATE")
|
||||
if migrateStrategy == "" {
|
||||
migrateStrategy = "latest" // Default value
|
||||
}
|
||||
|
||||
// Get RabbitMQ enabled status
|
||||
rabbitmqEnabled := strings.ToLower(os.Getenv("TESTING_MATRIX_RABBITMQ_ENABLED")) == "true"
|
||||
|
||||
// Get PostgreSQL version
|
||||
pgVersion := os.Getenv("TESTING_MATRIX_PG_VERSION")
|
||||
if pgVersion == "" {
|
||||
pgVersion = "16-alpine" // Default value
|
||||
}
|
||||
|
||||
return migrateStrategy, rabbitmqEnabled, pgVersion
|
||||
}
|
||||
|
||||
func RunTestWithEngine(m *testing.M) {
|
||||
// This runs before all tests
|
||||
cleanup := startEngine()
|
||||
|
||||
// Run the tests
|
||||
exitCode := m.Run()
|
||||
|
||||
// This runs after all tests
|
||||
cleanup()
|
||||
|
||||
// allow a bit of time for the engine to shut down
|
||||
time.Sleep(2 * time.Second)
|
||||
|
||||
if exitCode == 0 {
|
||||
if err := goleak.Find(
|
||||
goleak.IgnoreTopFunction("github.com/testcontainers/testcontainers-go.(*Reaper).connect.func1"),
|
||||
goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"),
|
||||
goleak.IgnoreTopFunction("google.golang.org/grpc/internal/grpcsync.(*CallbackSerializer).run"),
|
||||
goleak.IgnoreTopFunction("internal/poll.runtime_pollWait"),
|
||||
goleak.IgnoreTopFunction("google.golang.org/grpc/internal/transport.(*controlBuffer).get"),
|
||||
// all engine related packages
|
||||
goleak.IgnoreTopFunction("github.com/jackc/pgx/v5/pgxpool.(*Pool).backgroundHealthCheck"),
|
||||
goleak.IgnoreTopFunction("github.com/rabbitmq/amqp091-go.(*Connection).heartbeater"),
|
||||
goleak.IgnoreTopFunction("github.com/rabbitmq/amqp091-go.(*consumers).buffer"),
|
||||
goleak.IgnoreTopFunction("google.golang.org/grpc/internal/transport.(*http2Server).keepalive"),
|
||||
); err != nil {
|
||||
fmt.Fprintf(os.Stderr, "goleak: Errors on successful test run: %v\n", err)
|
||||
exitCode = 1
|
||||
}
|
||||
}
|
||||
|
||||
os.Exit(exitCode)
|
||||
}
|
||||
|
||||
func startEngine() func() {
|
||||
setTestingKeysInEnv()
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
|
||||
// Get configuration values from environment
|
||||
migrateStrategy, rabbitmqEnabled, pgVersion := getEnvConfig()
|
||||
|
||||
log.Printf("Starting engine with migration strategy: %s, RabbitMQ enabled: %t, PostgreSQL version: %s", migrateStrategy, rabbitmqEnabled, pgVersion)
|
||||
|
||||
postgresConnStr, cleanupPostgres := startPostgres(ctx, pgVersion)
|
||||
|
||||
os.Setenv("DATABASE_URL", postgresConnStr)
|
||||
os.Setenv("SERVER_GRPC_INSECURE", "true")
|
||||
os.Setenv("HATCHET_CLIENT_TLS_STRATEGY", "none")
|
||||
os.Setenv("SERVER_AUTH_COOKIE_DOMAIN", "app.dev.hatchet-tools.com")
|
||||
os.Setenv("SERVER_LOGGER_LEVEL", "error")
|
||||
os.Setenv("SERVER_LOGGER_FORMAT", "console")
|
||||
os.Setenv("DATABASE_LOGGER_LEVEL", "error")
|
||||
os.Setenv("DATABASE_LOGGER_FORMAT", "console")
|
||||
os.Setenv("SERVER_ADDITIONAL_LOGGERS_QUEUE_LEVEL", "error")
|
||||
os.Setenv("SERVER_ADDITIONAL_LOGGERS_QUEUE_FORMAT", "console")
|
||||
os.Setenv("SERVER_ADDITIONAL_LOGGERS_PGXSTATS_LEVEL", "error")
|
||||
os.Setenv("SERVER_ADDITIONAL_LOGGERS_PGXSTATS_FORMAT", "console")
|
||||
os.Setenv("SERVER_DEFAULT_ENGINE_VERSION", "V1")
|
||||
|
||||
var cleanupRabbitMQ func() error
|
||||
if rabbitmqEnabled {
|
||||
rabbitMQConnStr, rabbitMQCleanup := startRabbitMQ(ctx)
|
||||
os.Setenv("SERVER_MSGQUEUE_KIND", "rabbitmq")
|
||||
os.Setenv("SERVER_MSGQUEUE_RABBITMQ_URL", rabbitMQConnStr)
|
||||
cleanupRabbitMQ = rabbitMQCleanup
|
||||
} else {
|
||||
os.Setenv("SERVER_MSGQUEUE_KIND", "postgres")
|
||||
cleanupRabbitMQ = func() error { return nil }
|
||||
}
|
||||
|
||||
// Run migrations
|
||||
if migrateStrategy == "penultimate" {
|
||||
migrate.RunMigrations(ctx, migrate.WithUpToPenultimate())
|
||||
} else {
|
||||
migrate.RunMigrations(ctx)
|
||||
}
|
||||
|
||||
cf := loader.NewConfigLoader("")
|
||||
|
||||
dl, err := cf.InitDataLayer()
|
||||
|
||||
if err != nil {
|
||||
log.Fatalf("failed to initialize data layer: %v", err)
|
||||
}
|
||||
|
||||
// seed database
|
||||
seedDatabase(dl)
|
||||
|
||||
if err := dl.Disconnect(); err != nil {
|
||||
log.Fatalf("failed to disconnect data layer: %v", err)
|
||||
}
|
||||
|
||||
// set the API token
|
||||
setAPIToken(ctx, cf, dl.Seed.DefaultTenantID)
|
||||
|
||||
engineCh := make(chan error)
|
||||
|
||||
go func() {
|
||||
engineCh <- engine.Run(ctx, cf, "testing")
|
||||
}()
|
||||
|
||||
// Return a cleanup function that properly handles shutdown
|
||||
return func() {
|
||||
cancel()
|
||||
|
||||
err := <-engineCh
|
||||
|
||||
if err != nil {
|
||||
log.Fatalf("failed to run engine: %v", err)
|
||||
}
|
||||
|
||||
err = cleanupPostgres()
|
||||
|
||||
if err != nil {
|
||||
log.Fatalf("failed to cleanup postgres: %v", err)
|
||||
}
|
||||
|
||||
if rabbitmqEnabled {
|
||||
err = cleanupRabbitMQ()
|
||||
|
||||
if err != nil {
|
||||
log.Fatalf("failed to cleanup rabbitmq: %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func startPostgres(ctx context.Context, pgVersion string) (string, func() error) {
|
||||
postgresContainer, err := postgres.Run(
|
||||
ctx,
|
||||
fmt.Sprintf("postgres:%s", pgVersion),
|
||||
postgres.WithDatabase("test"),
|
||||
postgres.WithUsername("user"),
|
||||
postgres.WithPassword("password"),
|
||||
)
|
||||
|
||||
if err != nil {
|
||||
log.Fatalf("failed to start postgres container: %v", err)
|
||||
}
|
||||
|
||||
connStr, err := postgresContainer.ConnectionString(ctx, "sslmode=disable")
|
||||
if err != nil {
|
||||
log.Fatalf("failed to get connection string: %v", err)
|
||||
}
|
||||
|
||||
// loop until the database is ready
|
||||
for i := 0; i < 10; i++ {
|
||||
var db *pgx.Conn
|
||||
db, err = pgx.Connect(ctx, connStr)
|
||||
|
||||
if err != nil {
|
||||
time.Sleep(time.Second * 2)
|
||||
continue
|
||||
}
|
||||
|
||||
// make sure we can ping the database
|
||||
err = db.Ping(ctx)
|
||||
|
||||
if err != nil {
|
||||
time.Sleep(time.Second * 2)
|
||||
continue
|
||||
}
|
||||
|
||||
db.Close(ctx)
|
||||
|
||||
return connStr, func() error {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
|
||||
defer cancel()
|
||||
if err := postgresContainer.Terminate(ctx); err != nil {
|
||||
return fmt.Errorf("failed to terminate postgres container: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
log.Fatalf("failed to connect to postgres container after 10 attempts: %v", err)
|
||||
|
||||
// this should never be reached
|
||||
return "", func() error {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
func startRabbitMQ(ctx context.Context) (string, func() error) {
|
||||
rabbitContainer, err := rabbitmq.Run(
|
||||
ctx,
|
||||
"rabbitmq:3-management-alpine",
|
||||
)
|
||||
|
||||
if err != nil {
|
||||
log.Fatalf("failed to start rabbitmq container: %v", err)
|
||||
}
|
||||
|
||||
// Get the connection URL for RabbitMQ
|
||||
amqpURI, err := rabbitContainer.AmqpURL(ctx)
|
||||
if err != nil {
|
||||
log.Fatalf("failed to get AMQP URL: %v", err)
|
||||
}
|
||||
|
||||
// loop until RabbitMQ is ready
|
||||
for i := 0; i < 10; i++ {
|
||||
var conn *amqp.Connection
|
||||
conn, err = amqp.Dial(amqpURI)
|
||||
|
||||
if err != nil {
|
||||
time.Sleep(time.Second * 2)
|
||||
continue
|
||||
}
|
||||
|
||||
// make sure we can create a channel
|
||||
var ch *amqp.Channel
|
||||
ch, err = conn.Channel()
|
||||
|
||||
if err != nil {
|
||||
conn.Close()
|
||||
time.Sleep(time.Second * 2)
|
||||
continue
|
||||
}
|
||||
|
||||
ch.Close()
|
||||
conn.Close()
|
||||
|
||||
return amqpURI, func() error {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
|
||||
defer cancel()
|
||||
if err := rabbitContainer.Terminate(ctx); err != nil {
|
||||
return fmt.Errorf("failed to terminate rabbitmq container: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
log.Fatalf("failed to connect to rabbitmq container after 10 attempts: %v", err)
|
||||
|
||||
// this should never be reached
|
||||
return "", func() error {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
func seedDatabase(dc *database.Layer) {
|
||||
log.Printf("Seeding database")
|
||||
|
||||
err := seed.SeedDatabase(dc)
|
||||
|
||||
if err != nil {
|
||||
log.Fatalf("could not seed database: %v", err)
|
||||
}
|
||||
|
||||
log.Printf("Seeding database complete")
|
||||
}
|
||||
|
||||
func setAPIToken(ctx context.Context, cf *loader.ConfigLoader, tenantID string) {
|
||||
log.Printf("Generating API token for Hatchet server")
|
||||
|
||||
cleanup, server, err := cf.CreateServerFromConfig("testing")
|
||||
|
||||
if err != nil {
|
||||
log.Fatalf("could not create server config: %v", err)
|
||||
}
|
||||
|
||||
expiresAt := time.Now().Add(time.Hour * 24 * 30)
|
||||
|
||||
defaultTok, err := server.Auth.JWTManager.GenerateTenantToken(
|
||||
ctx,
|
||||
tenantID,
|
||||
"testing",
|
||||
false,
|
||||
&expiresAt,
|
||||
)
|
||||
|
||||
if err != nil {
|
||||
log.Fatalf("could not generate token: %v", err)
|
||||
}
|
||||
|
||||
err = cleanup()
|
||||
|
||||
if err != nil {
|
||||
log.Fatalf("could not cleanup server: %v", err)
|
||||
}
|
||||
|
||||
err = server.Disconnect()
|
||||
|
||||
if err != nil {
|
||||
log.Fatalf("could not disconnect server: %v", err)
|
||||
}
|
||||
|
||||
os.Setenv("HATCHET_CLIENT_TOKEN", defaultTok.Token)
|
||||
|
||||
log.Printf("Generated API token for tenant %s", tenantID)
|
||||
}
|
||||
|
||||
func setTestingKeysInEnv() {
|
||||
log.Println("Generating encryption keys for Hatchet server")
|
||||
|
||||
cookieHashKey, err := random.Generate(16)
|
||||
|
||||
if err != nil {
|
||||
log.Fatalf("could not generate hash key for instance: %v", err)
|
||||
}
|
||||
|
||||
cookieBlockKey, err := random.Generate(16)
|
||||
|
||||
if err != nil {
|
||||
log.Fatalf("could not generate block key for instance: %v", err)
|
||||
}
|
||||
|
||||
_ = os.Setenv("SERVER_AUTH_COOKIE_SECRETS", fmt.Sprintf("%s %s", cookieHashKey, cookieBlockKey))
|
||||
|
||||
masterKeyBytes, privateEc256, publicEc256, err := encryption.GenerateLocalKeys()
|
||||
|
||||
if err != nil {
|
||||
log.Fatalf("could not generate local keys: %v", err)
|
||||
}
|
||||
|
||||
_ = os.Setenv("SERVER_ENCRYPTION_MASTER_KEYSET", string(masterKeyBytes))
|
||||
_ = os.Setenv("SERVER_ENCRYPTION_JWT_PRIVATE_KEYSET", string(privateEc256))
|
||||
_ = os.Setenv("SERVER_ENCRYPTION_JWT_PUBLIC_KEYSET", string(publicEc256))
|
||||
}
|
||||
@@ -0,0 +1,18 @@
|
||||
//go:build load
|
||||
|
||||
package harness
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestMain(m *testing.M) {
|
||||
RunTestWithEngine(m)
|
||||
}
|
||||
|
||||
// Tests that the engine starts up and shuts down correctly without leaking
|
||||
// goroutines.
|
||||
func TestStartupShutdown(t *testing.T) {
|
||||
time.Sleep(5 * time.Second)
|
||||
}
|
||||
Reference in New Issue
Block a user