mirror of
https://github.com/hatchet-dev/hatchet.git
synced 2026-05-01 15:10:02 -05:00
feat: read replica support and docs (#1617)
* feat: read replica support and docs * fix: load logic
This commit is contained in:
@@ -26,5 +26,6 @@ export default {
|
||||
benchmarking: "Benchmarking",
|
||||
"data-retention": "Data Retention",
|
||||
"improving-performance": "Improving Performance",
|
||||
"read-replicas": "Read Replicas",
|
||||
sampling: "Trace Sampling",
|
||||
};
|
||||
|
||||
@@ -0,0 +1,24 @@
|
||||
# Read Replica Support
|
||||
|
||||
For high-throughput production deployments, Hatchet supports database read replicas to distribute database load and improve read performance. This feature allows you to direct read queries to a separate database instance while continuing to send write operations to the primary database. **This can significantly improve performance in read-heavy workloads without requiring application changes.**
|
||||
|
||||
You can enable read replica support by setting the following environment variables:
|
||||
|
||||
```bash
|
||||
READ_REPLICA_ENABLED=true
|
||||
READ_REPLICA_DATABASE_URL='postgresql://hatchet:hatchet@127.0.0.1:5432/hatchet'
|
||||
READ_REPLICA_MAX_CONNS=200
|
||||
READ_REPLICA_MIN_CONNS=50
|
||||
```
|
||||
|
||||
## Configuration Options
|
||||
|
||||
- `READ_REPLICA_ENABLED`: Set to `true` to enable read replica support (defaults to `false`)
|
||||
- `READ_REPLICA_DATABASE_URL`: Connection string for the read replica database (required when read replicas are enabled)
|
||||
- `READ_REPLICA_MAX_CONNS`: Maximum number of connections in the read replica connection pool (default: 50)
|
||||
- `READ_REPLICA_MIN_CONNS`: Minimum number of connections to maintain in the read replica connection pool (default: 10)
|
||||
|
||||
## Limitations
|
||||
|
||||
- Replication lag may result in slightly stale or missing data being returned from read operations
|
||||
- The read replica is only utilized by analytical queries (to load workflow runs, task runs, and metrics in the UI)
|
||||
@@ -19,6 +19,11 @@ type ConfigFile struct {
|
||||
PostgresDbName string `mapstructure:"dbName" json:"dbName,omitempty" default:"hatchet"`
|
||||
PostgresSSLMode string `mapstructure:"sslMode" json:"sslMode,omitempty" default:"disable"`
|
||||
|
||||
ReadReplicaEnabled bool `mapstructure:"readReplicaEnabled" json:"readReplicaEnabled,omitempty" default:"false"`
|
||||
ReadReplicaDatabaseURL string `mapstructure:"readReplicaDatabaseUrl" json:"readReplicaDatabaseUrl,omitempty" default:""`
|
||||
ReadReplicaMaxConns int `mapstructure:"readReplicaMaxConns" json:"readReplicaMaxConns,omitempty" default:"50"`
|
||||
ReadReplicaMinConns int `mapstructure:"readReplicaMinConns" json:"readReplicaMinConns,omitempty" default:"10"`
|
||||
|
||||
MaxConns int `mapstructure:"maxConns" json:"maxConns,omitempty" default:"50"`
|
||||
MinConns int `mapstructure:"minConns" json:"minConns,omitempty" default:"10"`
|
||||
|
||||
@@ -53,6 +58,8 @@ type Layer struct {
|
||||
|
||||
EssentialPool *pgxpool.Pool
|
||||
|
||||
ReadReplicaPool *pgxpool.Pool
|
||||
|
||||
QueuePool *pgxpool.Pool
|
||||
|
||||
APIRepository repository.APIRepository
|
||||
@@ -79,6 +86,11 @@ func BindAllEnv(v *viper.Viper) {
|
||||
_ = v.BindEnv("maxQueueConns", "DATABASE_MAX_QUEUE_CONNS")
|
||||
_ = v.BindEnv("minQueueConns", "DATABASE_MIN_QUEUE_CONNS")
|
||||
|
||||
_ = v.BindEnv("readReplicaEnabled", "READ_REPLICA_ENABLED")
|
||||
_ = v.BindEnv("readReplicaDatabaseUrl", "READ_REPLICA_DATABASE_URL")
|
||||
_ = v.BindEnv("readReplicaMaxConns", "READ_REPLICA_MAX_CONNS")
|
||||
_ = v.BindEnv("readReplicaMinConns", "READ_REPLICA_MIN_CONNS")
|
||||
|
||||
_ = v.BindEnv("cacheDuration", "CACHE_DURATION")
|
||||
|
||||
_ = v.BindEnv("seed.adminEmail", "ADMIN_EMAIL")
|
||||
|
||||
@@ -191,6 +191,38 @@ func (c *ConfigLoader) InitDataLayer() (res *database.Layer, err error) {
|
||||
return nil, fmt.Errorf("could not connect to database: %w", err)
|
||||
}
|
||||
|
||||
// a pool for read replicas, if enabled
|
||||
var readReplicaPool *pgxpool.Pool
|
||||
|
||||
if cf.ReadReplicaEnabled {
|
||||
if cf.ReadReplicaDatabaseURL == "" {
|
||||
return nil, fmt.Errorf("read replica database url is required if read replica is enabled")
|
||||
}
|
||||
|
||||
readReplicaConfig, err := pgxpool.ParseConfig(cf.ReadReplicaDatabaseURL)
|
||||
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not parse read replica database url: %w", err)
|
||||
}
|
||||
|
||||
if cf.ReadReplicaMaxConns != 0 {
|
||||
readReplicaConfig.MaxConns = int32(cf.ReadReplicaMaxConns) // nolint: gosec
|
||||
}
|
||||
|
||||
if cf.ReadReplicaMinConns != 0 {
|
||||
readReplicaConfig.MinConns = int32(cf.ReadReplicaMinConns) // nolint: gosec
|
||||
}
|
||||
|
||||
readReplicaConfig.MaxConnLifetime = 15 * 60 * time.Second
|
||||
readReplicaConfig.ConnConfig.Tracer = otelpgx.NewTracer()
|
||||
|
||||
readReplicaPool, err = pgxpool.NewWithConfig(context.Background(), readReplicaConfig)
|
||||
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not connect to read replica database: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
ch := cache.New(cf.CacheDuration)
|
||||
|
||||
entitlementRepo := postgresdb.NewEntitlementRepository(pool, &scf.Runtime, postgresdb.WithLogger(&l), postgresdb.WithCache(ch))
|
||||
@@ -229,6 +261,10 @@ func (c *ConfigLoader) InitDataLayer() (res *database.Layer, err error) {
|
||||
return nil, fmt.Errorf("could not create api repository: %w", err)
|
||||
}
|
||||
|
||||
if readReplicaPool != nil {
|
||||
v1.OLAP().SetReadReplicaPool(readReplicaPool)
|
||||
}
|
||||
|
||||
return &database.Layer{
|
||||
Disconnect: func() error {
|
||||
if err := cleanupEngine(); err != nil {
|
||||
|
||||
+24
-15
@@ -181,6 +181,8 @@ type UpdateDAGStatusRow struct {
|
||||
|
||||
type OLAPRepository interface {
|
||||
UpdateTablePartitions(ctx context.Context) error
|
||||
SetReadReplicaPool(pool *pgxpool.Pool)
|
||||
|
||||
ReadTaskRun(ctx context.Context, taskExternalId string) (*sqlcv1.V1TasksOlap, error)
|
||||
ReadWorkflowRun(ctx context.Context, workflowRunExternalId pgtype.UUID) (*V1WorkflowRunPopulator, error)
|
||||
ReadTaskRunData(ctx context.Context, tenantId pgtype.UUID, taskId int64, taskInsertedAt pgtype.Timestamptz) (*sqlcv1.PopulateSingleTaskRunDataRow, *pgtype.UUID, error)
|
||||
@@ -209,6 +211,8 @@ type OLAPRepository interface {
|
||||
type OLAPRepositoryImpl struct {
|
||||
*sharedRepository
|
||||
|
||||
readPool *pgxpool.Pool
|
||||
|
||||
eventCache *lru.Cache[string, bool]
|
||||
|
||||
olapRetentionPeriod time.Duration
|
||||
@@ -231,6 +235,7 @@ func newOLAPRepository(shared *sharedRepository, olapRetentionPeriod time.Durati
|
||||
|
||||
return &OLAPRepositoryImpl{
|
||||
sharedRepository: shared,
|
||||
readPool: shared.pool,
|
||||
eventCache: eventCache,
|
||||
olapRetentionPeriod: olapRetentionPeriod,
|
||||
}
|
||||
@@ -303,6 +308,10 @@ func (o *OLAPRepositoryImpl) UpdateTablePartitions(ctx context.Context) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (o *OLAPRepositoryImpl) SetReadReplicaPool(pool *pgxpool.Pool) {
|
||||
o.readPool = pool
|
||||
}
|
||||
|
||||
func StringToReadableStatus(status string) ReadableTaskStatus {
|
||||
switch status {
|
||||
case "QUEUED":
|
||||
@@ -321,7 +330,7 @@ func StringToReadableStatus(status string) ReadableTaskStatus {
|
||||
}
|
||||
|
||||
func (r *OLAPRepositoryImpl) ReadTaskRun(ctx context.Context, taskExternalId string) (*sqlcv1.V1TasksOlap, error) {
|
||||
row, err := r.queries.ReadTaskByExternalID(ctx, r.pool, sqlchelpers.UUIDFromStr(taskExternalId))
|
||||
row, err := r.queries.ReadTaskByExternalID(ctx, r.readPool, sqlchelpers.UUIDFromStr(taskExternalId))
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -367,7 +376,7 @@ func ParseTaskMetadata(jsonData []byte) ([]TaskMetadata, error) {
|
||||
}
|
||||
|
||||
func (r *OLAPRepositoryImpl) ReadWorkflowRun(ctx context.Context, workflowRunExternalId pgtype.UUID) (*V1WorkflowRunPopulator, error) {
|
||||
row, err := r.queries.ReadWorkflowRunByExternalId(ctx, r.pool, workflowRunExternalId)
|
||||
row, err := r.queries.ReadWorkflowRunByExternalId(ctx, r.readPool, workflowRunExternalId)
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -402,7 +411,7 @@ func (r *OLAPRepositoryImpl) ReadWorkflowRun(ctx context.Context, workflowRunExt
|
||||
}
|
||||
|
||||
func (r *OLAPRepositoryImpl) ReadTaskRunData(ctx context.Context, tenantId pgtype.UUID, taskId int64, taskInsertedAt pgtype.Timestamptz) (*sqlcv1.PopulateSingleTaskRunDataRow, *pgtype.UUID, error) {
|
||||
taskRun, err := r.queries.PopulateSingleTaskRunData(ctx, r.pool, sqlcv1.PopulateSingleTaskRunDataParams{
|
||||
taskRun, err := r.queries.PopulateSingleTaskRunData(ctx, r.readPool, sqlcv1.PopulateSingleTaskRunDataParams{
|
||||
Taskid: taskId,
|
||||
Tenantid: tenantId,
|
||||
Taskinsertedat: taskInsertedAt,
|
||||
@@ -418,7 +427,7 @@ func (r *OLAPRepositoryImpl) ReadTaskRunData(ctx context.Context, tenantId pgtyp
|
||||
dagId := taskRun.DagID.Int64
|
||||
dagInsertedAt := taskRun.DagInsertedAt
|
||||
|
||||
workflowRunId, err = r.queries.GetWorkflowRunIdFromDagIdInsertedAt(ctx, r.pool, sqlcv1.GetWorkflowRunIdFromDagIdInsertedAtParams{
|
||||
workflowRunId, err = r.queries.GetWorkflowRunIdFromDagIdInsertedAt(ctx, r.readPool, sqlcv1.GetWorkflowRunIdFromDagIdInsertedAtParams{
|
||||
Dagid: dagId,
|
||||
Daginsertedat: dagInsertedAt,
|
||||
})
|
||||
@@ -432,7 +441,7 @@ func (r *OLAPRepositoryImpl) ReadTaskRunData(ctx context.Context, tenantId pgtyp
|
||||
}
|
||||
|
||||
func (r *OLAPRepositoryImpl) ListTasks(ctx context.Context, tenantId string, opts ListTaskRunOpts) ([]*sqlcv1.PopulateTaskRunDataRow, int, error) {
|
||||
tx, err := r.pool.Begin(ctx)
|
||||
tx, err := r.readPool.Begin(ctx)
|
||||
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
@@ -540,7 +549,7 @@ func (r *OLAPRepositoryImpl) ListTasks(ctx context.Context, tenantId string, opt
|
||||
}
|
||||
|
||||
func (r *OLAPRepositoryImpl) ListTasksByDAGId(ctx context.Context, tenantId string, dagids []pgtype.UUID) ([]*sqlcv1.PopulateTaskRunDataRow, map[int64]uuid.UUID, error) {
|
||||
tx, err := r.pool.Begin(ctx)
|
||||
tx, err := r.readPool.Begin(ctx)
|
||||
taskIdToDagExternalId := make(map[int64]uuid.UUID)
|
||||
|
||||
if err != nil {
|
||||
@@ -588,7 +597,7 @@ func (r *OLAPRepositoryImpl) ListTasksByDAGId(ctx context.Context, tenantId stri
|
||||
}
|
||||
|
||||
func (r *OLAPRepositoryImpl) ListTasksByIdAndInsertedAt(ctx context.Context, tenantId string, taskMetadata []TaskMetadata) ([]*sqlcv1.PopulateTaskRunDataRow, error) {
|
||||
tx, err := r.pool.Begin(ctx)
|
||||
tx, err := r.readPool.Begin(ctx)
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -622,7 +631,7 @@ func (r *OLAPRepositoryImpl) ListTasksByIdAndInsertedAt(ctx context.Context, ten
|
||||
}
|
||||
|
||||
func (r *OLAPRepositoryImpl) ListWorkflowRuns(ctx context.Context, tenantId string, opts ListWorkflowRunOpts) ([]*WorkflowRunData, int, error) {
|
||||
tx, err := r.pool.Begin(ctx)
|
||||
tx, err := r.readPool.Begin(ctx)
|
||||
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
@@ -828,7 +837,7 @@ func (r *OLAPRepositoryImpl) ListWorkflowRuns(ctx context.Context, tenantId stri
|
||||
}
|
||||
|
||||
func (r *OLAPRepositoryImpl) ListTaskRunEvents(ctx context.Context, tenantId string, taskId int64, taskInsertedAt pgtype.Timestamptz, limit, offset int64) ([]*sqlcv1.ListTaskEventsRow, error) {
|
||||
rows, err := r.queries.ListTaskEvents(ctx, r.pool, sqlcv1.ListTaskEventsParams{
|
||||
rows, err := r.queries.ListTaskEvents(ctx, r.readPool, sqlcv1.ListTaskEventsParams{
|
||||
Tenantid: sqlchelpers.UUIDFromStr(tenantId),
|
||||
Taskid: taskId,
|
||||
Taskinsertedat: taskInsertedAt,
|
||||
@@ -842,7 +851,7 @@ func (r *OLAPRepositoryImpl) ListTaskRunEvents(ctx context.Context, tenantId str
|
||||
}
|
||||
|
||||
func (r *OLAPRepositoryImpl) ListTaskRunEventsByWorkflowRunId(ctx context.Context, tenantId string, workflowRunId pgtype.UUID) ([]*sqlcv1.ListTaskEventsForWorkflowRunRow, error) {
|
||||
rows, err := r.queries.ListTaskEventsForWorkflowRun(ctx, r.pool, sqlcv1.ListTaskEventsForWorkflowRunParams{
|
||||
rows, err := r.queries.ListTaskEventsForWorkflowRun(ctx, r.readPool, sqlcv1.ListTaskEventsForWorkflowRunParams{
|
||||
Tenantid: sqlchelpers.UUIDFromStr(tenantId),
|
||||
Workflowrunid: workflowRunId,
|
||||
})
|
||||
@@ -870,7 +879,7 @@ func (r *OLAPRepositoryImpl) ReadTaskRunMetrics(ctx context.Context, tenantId st
|
||||
parentTaskExternalId = *opts.ParentTaskExternalID
|
||||
}
|
||||
|
||||
res, err := r.queries.GetTenantStatusMetrics(context.Background(), r.pool, sqlcv1.GetTenantStatusMetricsParams{
|
||||
res, err := r.queries.GetTenantStatusMetrics(context.Background(), r.readPool, sqlcv1.GetTenantStatusMetricsParams{
|
||||
Tenantid: sqlchelpers.UUIDFromStr(tenantId),
|
||||
Createdafter: sqlchelpers.TimestamptzFromTime(opts.CreatedAfter),
|
||||
WorkflowIds: workflowIds,
|
||||
@@ -1195,7 +1204,7 @@ func (r *OLAPRepositoryImpl) CreateDAGs(ctx context.Context, tenantId string, da
|
||||
}
|
||||
|
||||
func (r *OLAPRepositoryImpl) GetTaskPointMetrics(ctx context.Context, tenantId string, startTimestamp *time.Time, endTimestamp *time.Time, bucketInterval time.Duration) ([]*sqlcv1.GetTaskPointMetricsRow, error) {
|
||||
rows, err := r.queries.GetTaskPointMetrics(ctx, r.pool, sqlcv1.GetTaskPointMetricsParams{
|
||||
rows, err := r.queries.GetTaskPointMetrics(ctx, r.readPool, sqlcv1.GetTaskPointMetricsParams{
|
||||
Interval: durationToPgInterval(bucketInterval),
|
||||
Tenantid: sqlchelpers.UUIDFromStr(tenantId),
|
||||
Createdafter: sqlchelpers.TimestamptzFromTime(*startTimestamp),
|
||||
@@ -1214,7 +1223,7 @@ func (r *OLAPRepositoryImpl) GetTaskPointMetrics(ctx context.Context, tenantId s
|
||||
}
|
||||
|
||||
func (r *OLAPRepositoryImpl) ReadDAG(ctx context.Context, dagExternalId string) (*sqlcv1.V1DagsOlap, error) {
|
||||
return r.queries.ReadDAGByExternalID(ctx, r.pool, sqlchelpers.UUIDFromStr(dagExternalId))
|
||||
return r.queries.ReadDAGByExternalID(ctx, r.readPool, sqlchelpers.UUIDFromStr(dagExternalId))
|
||||
}
|
||||
|
||||
func (r *OLAPRepositoryImpl) ListTasksByExternalIds(ctx context.Context, tenantId string, externalIds []string) ([]*sqlcv1.FlattenTasksByExternalIdsRow, error) {
|
||||
@@ -1224,7 +1233,7 @@ func (r *OLAPRepositoryImpl) ListTasksByExternalIds(ctx context.Context, tenantI
|
||||
externalUUIDs = append(externalUUIDs, sqlchelpers.UUIDFromStr(id))
|
||||
}
|
||||
|
||||
return r.queries.FlattenTasksByExternalIds(ctx, r.pool, sqlcv1.FlattenTasksByExternalIdsParams{
|
||||
return r.queries.FlattenTasksByExternalIds(ctx, r.readPool, sqlcv1.FlattenTasksByExternalIdsParams{
|
||||
Tenantid: sqlchelpers.UUIDFromStr(tenantId),
|
||||
Externalids: externalUUIDs,
|
||||
})
|
||||
@@ -1241,7 +1250,7 @@ func durationToPgInterval(d time.Duration) pgtype.Interval {
|
||||
}
|
||||
|
||||
func (r *OLAPRepositoryImpl) ListWorkflowRunDisplayNames(ctx context.Context, tenantId pgtype.UUID, externalIds []pgtype.UUID) ([]*sqlcv1.ListWorkflowRunDisplayNamesRow, error) {
|
||||
return r.queries.ListWorkflowRunDisplayNames(ctx, r.pool, sqlcv1.ListWorkflowRunDisplayNamesParams{
|
||||
return r.queries.ListWorkflowRunDisplayNames(ctx, r.readPool, sqlcv1.ListWorkflowRunDisplayNamesParams{
|
||||
Tenantid: tenantId,
|
||||
Externalids: externalIds,
|
||||
})
|
||||
|
||||
Reference in New Issue
Block a user