Enable loadtest with PgBouncer (#3143)

* enable loadtest with pgbouncer

* use default_query_exec_mode=cache_describe for pgbouncer

* increase threshold

* make sure no regression
This commit is contained in:
Mohammed Nafees
2026-03-03 15:24:30 +01:00
committed by GitHub
parent 101d6d5cbf
commit e72051877e
2 changed files with 174 additions and 4 deletions
+44
View File
@@ -339,6 +339,50 @@ jobs:
TESTING_MATRIX_PG_VERSION: ${{ matrix.pg-version }}
TESTING_MATRIX_OPTIMISTIC_SCHEDULING: ${{ matrix.optimistic-scheduling }}
load-pgbouncer:
runs-on: ubicloud-standard-8
timeout-minutes: 30
strategy:
matrix:
migrate-strategy: ["latest"]
rabbitmq-enabled: ["true"]
pg-version: ["17-alpine"]
optimistic-scheduling: ["true", "false"]
steps:
- uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
- name: Install Task
uses: arduino/setup-task@b91d5d2c96a56797b48ac1e0e89220bf64044611 # v2.0.0
with:
repo-token: ${{ secrets.GITHUB_TOKEN }}
- name: Setup Go
uses: actions/setup-go@4b73464bb391d4059bd26b0524d20df3927bd417 # v6.3.0
with:
go-version: "1.25"
- name: Setup pnpm
uses: pnpm/action-setup@41ff72655975bd51cab0327fa583b6e92b6d3061 # v4.2.0
with:
version: 10.16.1
run_install: false
- name: Go deps
run: go mod download
- name: Test
run: |
# Disable gzip compression for load tests to reduce CPU overhead
# Compression adds overhead without benefit for 0kb payloads
HATCHET_CLIENT_DISABLE_GZIP_COMPRESSION=true go test -tags load ./... -p 5 -v -race -failfast -timeout 20m
env:
TESTING_MATRIX_MIGRATE: ${{ matrix.migrate-strategy }}
TESTING_MATRIX_RABBITMQ_ENABLED: ${{ matrix.rabbitmq-enabled }}
TESTING_MATRIX_PG_VERSION: ${{ matrix.pg-version }}
TESTING_MATRIX_OPTIMISTIC_SCHEDULING: ${{ matrix.optimistic-scheduling }}
TESTING_MATRIX_PGBOUNCER_ENABLED: "true"
load-online-migrate:
runs-on: ubicloud-standard-8
timeout-minutes: 30
+130 -4
View File
@@ -5,6 +5,7 @@ import (
"fmt"
"log"
"net"
"net/url"
"os"
"strconv"
"strings"
@@ -29,7 +30,7 @@ import (
"github.com/hatchet-dev/hatchet/pkg/random"
)
func getEnvConfig() (string, bool, string, bool) {
func getEnvConfig() (string, bool, string, bool, bool) {
// Get migration strategy: penultimate or latest
migrateStrategy := os.Getenv("TESTING_MATRIX_MIGRATE")
if migrateStrategy == "" {
@@ -48,7 +49,10 @@ func getEnvConfig() (string, bool, string, bool) {
// Get whether optimistic scheduling is enabled
isOptimistic := strings.ToLower(os.Getenv("TESTING_MATRIX_OPTIMISTIC_SCHEDULING")) == "true"
return migrateStrategy, rabbitmqEnabled, pgVersion, isOptimistic
// Get whether PgBouncer connection pooling is enabled
pgBouncerEnabled := strings.ToLower(os.Getenv("TESTING_MATRIX_PGBOUNCER_ENABLED")) == "true"
return migrateStrategy, rabbitmqEnabled, pgVersion, isOptimistic, pgBouncerEnabled
}
func RunTestWithEngine(m *testing.M) {
@@ -96,18 +100,34 @@ func startEngine() func() {
ctx, cancel := context.WithCancel(context.Background())
// Get configuration values from environment
migrateStrategy, rabbitmqEnabled, pgVersion, isOptimistic := getEnvConfig()
migrateStrategy, rabbitmqEnabled, pgVersion, isOptimistic, pgBouncerEnabled := getEnvConfig()
log.Printf("Starting engine with migration strategy: %s, RabbitMQ enabled: %t, PostgreSQL version: %s", migrateStrategy, rabbitmqEnabled, pgVersion)
log.Printf("Starting engine with migration strategy: %s, RabbitMQ enabled: %t, PostgreSQL version: %s, PgBouncer enabled: %t", migrateStrategy, rabbitmqEnabled, pgVersion, pgBouncerEnabled)
postgresConnStr, cleanupPostgres := startPostgres(ctx, pgVersion)
// Start PgBouncer if enabled, but don't use it yet (migrations and seeding go directly to postgres)
var pgBouncerConnStr string
var cleanupPgBouncer func() error
if pgBouncerEnabled {
pgPort, err := extractPort(postgresConnStr)
if err != nil {
log.Fatalf("failed to extract postgres port from connection string: %v", err)
}
pgBouncerConnStr, cleanupPgBouncer = startPgBouncer(ctx, pgPort)
} else {
cleanupPgBouncer = func() error { return nil }
}
grpcPort, err := findAvailablePort(7077)
if err != nil {
log.Fatalf("failed to find available port: %v", err)
}
// Use postgres directly for migrations and seeding
os.Setenv("DATABASE_URL", postgresConnStr)
os.Setenv("SERVER_GRPC_INSECURE", "true")
os.Setenv("SERVER_GRPC_PORT", strconv.Itoa(grpcPort))
@@ -175,6 +195,12 @@ func startEngine() func() {
}
setAPIToken(ctx, cf, tenantUUID)
// Switch to PgBouncer for the engine if enabled
if pgBouncerEnabled {
log.Printf("Switching DATABASE_URL to PgBouncer: %s", pgBouncerConnStr)
os.Setenv("DATABASE_URL", pgBouncerConnStr)
}
engineCh := make(chan error)
go func() {
@@ -191,6 +217,14 @@ func startEngine() func() {
log.Fatalf("failed to run engine: %v", err)
}
if pgBouncerEnabled {
err = cleanupPgBouncer()
if err != nil {
log.Fatalf("failed to cleanup pgbouncer: %v", err)
}
}
err = cleanupPostgres()
if err != nil {
@@ -327,6 +361,98 @@ func startRabbitMQ(ctx context.Context) (string, func() error) {
}
}
func extractPort(connStr string) (int, error) {
u, err := url.Parse(connStr)
if err != nil {
return 0, fmt.Errorf("failed to parse connection string: %w", err)
}
port, err := strconv.Atoi(u.Port())
if err != nil {
return 0, fmt.Errorf("failed to parse port from connection string: %w", err)
}
return port, nil
}
func startPgBouncer(ctx context.Context, pgPort int) (string, func() error) {
pgBouncerContainer, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{
ContainerRequest: testcontainers.ContainerRequest{
Image: "edoburu/pgbouncer:v1.25.1-p0",
ExposedPorts: []string{"5432/tcp"},
HostAccessPorts: []int{pgPort},
Env: map[string]string{
"DATABASE_URL": fmt.Sprintf("postgres://user:password@host.testcontainers.internal:%d/test", pgPort),
"POOL_MODE": "transaction",
"MAX_CLIENT_CONN": "500",
"DEFAULT_POOL_SIZE": "50",
"AUTH_TYPE": "scram-sha-256",
"MAX_PREPARED_STATEMENTS": "256",
},
},
Started: true,
})
if err != nil {
log.Fatalf("failed to start pgbouncer container: %v", err)
}
host, err := pgBouncerContainer.Host(ctx)
if err != nil {
log.Fatalf("failed to get pgbouncer host: %v", err)
}
mappedPort, err := pgBouncerContainer.MappedPort(ctx, "5432/tcp")
if err != nil {
log.Fatalf("failed to get pgbouncer mapped port: %v", err)
}
connStr := fmt.Sprintf(
"postgresql://user:password@%s:%s/test?sslmode=disable",
host, mappedPort.Port(),
)
// loop until pgbouncer is ready
for i := 0; i < 10; i++ {
var db *pgx.Conn
db, err = pgx.Connect(ctx, connStr)
if err != nil {
time.Sleep(time.Second * 2)
continue
}
err = db.Ping(ctx)
if err != nil {
db.Close(ctx)
time.Sleep(time.Second * 2)
continue
}
db.Close(ctx)
return connStr, func() error {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
defer cancel()
err = pgBouncerContainer.Terminate(ctx)
if err != nil {
return fmt.Errorf("failed to terminate pgbouncer container: %w", err)
}
return nil
}
}
log.Fatalf("failed to connect to pgbouncer container after 10 attempts: %v", err)
// this should never be reached
return "", func() error {
return nil
}
}
func seedDatabase(dc *database.Layer) {
log.Printf("Seeding database")