Fix OLAP transactions (#1763)

* jitter

* times

* configurable olap jitter and interval

* fix--olap-tx

* unlock mu

* err
This commit is contained in:
Gabe Ruttner
2025-05-21 10:17:39 -07:00
committed by GitHub
parent d32bb66233
commit 59edb02a8b
2 changed files with 46 additions and 21 deletions
@@ -189,14 +189,14 @@ func New(fs ...OLAPControllerOpt) (*OLAPControllerImpl, error) {
o.updateTaskStatusOperations = queueutils.NewOperationPool(
opts.l,
time.Second*30,
time.Second*15,
"update task statuses",
o.updateTaskStatuses,
).WithJitter(time.Duration(jitterMs) * time.Millisecond)
o.updateDAGStatusOperations = queueutils.NewOperationPool(
opts.l,
time.Second*30,
time.Second*15,
"update dag statuses",
o.updateDAGStatuses,
).WithJitter(time.Duration(jitterMs) * time.Millisecond)
+44 -19
View File
@@ -471,7 +471,7 @@ func (r *OLAPRepositoryImpl) ReadTaskRunData(ctx context.Context, tenantId pgtyp
params.RetryCount = pgtype.Int4{Int32: int32(*retryCount), Valid: true}
}
taskRun, err := r.queries.PopulateSingleTaskRunData(ctx, r.pool, params)
taskRun, err := r.queries.PopulateSingleTaskRunData(ctx, r.readPool, params)
if err != nil {
return nil, emptyUUID, err
@@ -988,8 +988,7 @@ func (r *OLAPRepositoryImpl) ReadTaskRunMetrics(ctx context.Context, tenantId st
params.CreatedBefore = sqlchelpers.TimestamptzFromTime(*opts.CreatedBefore)
}
res, err := r.queries.GetTenantStatusMetrics(context.Background(), r.pool, params)
res, err := r.queries.GetTenantStatusMetrics(ctx, r.readPool, params)
if err != nil {
if errors.Is(err, pgx.ErrNoRows) {
return []TaskRunMetric{}, nil
@@ -1134,6 +1133,8 @@ func (r *OLAPRepositoryImpl) UpdateTaskStatuses(ctx context.Context, tenantId st
}
mu.Lock()
defer mu.Unlock()
isSaturated = isSaturated || statusUpdateRes.Count == int64(limit)
if len(statusUpdateRes.TaskIds) != len(statusUpdateRes.TaskInsertedAts) ||
@@ -1151,8 +1152,6 @@ func (r *OLAPRepositoryImpl) UpdateTaskStatuses(ctx context.Context, tenantId st
})
}
mu.Unlock()
return nil
})
}
@@ -1179,10 +1178,10 @@ func (r *OLAPRepositoryImpl) UpdateDAGStatuses(ctx context.Context, tenantId str
partitionNumber := i
eg.Go(func() error {
tx, commit, rollback, err := sqlchelpers.PrepareTx(ctx, r.pool, r.l, 5000)
tx, commit, rollback, err := sqlchelpers.PrepareTx(ctx, r.pool, r.l, 15000)
if err != nil {
return err
return fmt.Errorf("failed to prepare transaction: %w", err)
}
defer rollback()
@@ -1194,14 +1193,16 @@ func (r *OLAPRepositoryImpl) UpdateDAGStatuses(ctx context.Context, tenantId str
})
if err != nil {
return err
return fmt.Errorf("failed to update DAG statuses: %w", err)
}
if err := commit(ctx); err != nil {
return err
return fmt.Errorf("failed to commit transaction: %w", err)
}
mu.Lock()
defer mu.Unlock()
isSaturated = isSaturated || statusUpdateRes.Count == int64(limit)
if len(statusUpdateRes.DagIds) != len(statusUpdateRes.DagInsertedAts) ||
@@ -1219,14 +1220,12 @@ func (r *OLAPRepositoryImpl) UpdateDAGStatuses(ctx context.Context, tenantId str
})
}
mu.Unlock()
return nil
})
}
if err := eg.Wait(); err != nil {
return false, nil, err
return false, nil, fmt.Errorf("failed to wait for status update goroutines: %w", err)
}
return isSaturated, rows, nil
@@ -1261,9 +1260,22 @@ func (r *OLAPRepositoryImpl) writeTaskBatch(ctx context.Context, tenantId string
})
}
_, err := r.queries.CreateTasksOLAP(ctx, r.pool, params)
tx, commit, rollback, err := sqlchelpers.PrepareTx(ctx, r.pool, r.l, 5000)
if err != nil {
return err
}
defer rollback()
return err
_, err = r.queries.CreateTasksOLAP(ctx, tx, params)
if err != nil {
return err
}
if err := commit(ctx); err != nil {
return err
}
return nil
}
func (r *OLAPRepositoryImpl) writeDAGBatch(ctx context.Context, tenantId string, dags []*DAGWithData) error {
@@ -1290,9 +1302,22 @@ func (r *OLAPRepositoryImpl) writeDAGBatch(ctx context.Context, tenantId string,
})
}
_, err := r.queries.CreateDAGsOLAP(ctx, r.pool, params)
tx, commit, rollback, err := sqlchelpers.PrepareTx(ctx, r.pool, r.l, 5000)
if err != nil {
return err
}
defer rollback()
return err
_, err = r.queries.CreateDAGsOLAP(ctx, tx, params)
if err != nil {
return err
}
if err := commit(ctx); err != nil {
return err
}
return nil
}
func (r *OLAPRepositoryImpl) CreateTaskEvents(ctx context.Context, tenantId string, events []sqlcv1.CreateTaskEventsOLAPParams) error {
@@ -1368,7 +1393,7 @@ func (r *OLAPRepositoryImpl) GetTaskTimings(ctx context.Context, tenantId string
// start out by getting a list of task external ids for the workflow run id
rootTaskExternalIds := make([]pgtype.UUID, 0)
rootTasks, err := r.queries.FlattenTasksByExternalIds(ctx, r.pool, sqlcv1.FlattenTasksByExternalIdsParams{
rootTasks, err := r.queries.FlattenTasksByExternalIds(ctx, r.readPool, sqlcv1.FlattenTasksByExternalIdsParams{
Externalids: []pgtype.UUID{workflowRunId},
Tenantid: sqlchelpers.UUIDFromStr(tenantId),
})
@@ -1381,7 +1406,7 @@ func (r *OLAPRepositoryImpl) GetTaskTimings(ctx context.Context, tenantId string
rootTaskExternalIds = append(rootTaskExternalIds, task.ExternalID)
}
runsList, err := r.queries.GetRunsListRecursive(ctx, r.pool, sqlcv1.GetRunsListRecursiveParams{
runsList, err := r.queries.GetRunsListRecursive(ctx, r.readPool, sqlcv1.GetRunsListRecursiveParams{
Taskexternalids: rootTaskExternalIds,
Tenantid: sqlchelpers.UUIDFromStr(tenantId),
Depth: depth,
@@ -1404,7 +1429,7 @@ func (r *OLAPRepositoryImpl) GetTaskTimings(ctx context.Context, tenantId string
idsToDepth[sqlchelpers.UUIDToStr(row.ExternalID)] = row.Depth
}
tasksWithData, err := r.queries.PopulateTaskRunData(ctx, r.pool, sqlcv1.PopulateTaskRunDataParams{
tasksWithData, err := r.queries.PopulateTaskRunData(ctx, r.readPool, sqlcv1.PopulateTaskRunDataParams{
Taskids: taskIds,
Taskinsertedats: taskInsertedAts,
Tenantid: sqlchelpers.UUIDFromStr(tenantId),