feat: support maxRuns parameter on workers (#195)

* feat: round robin queueing

* feat: configurable max runs per worker

* fix: address PR review

* docs for max runs and group round robin
This commit is contained in:
abelanger5
2024-02-25 21:48:46 -08:00
committed by GitHub
parent 2d625fec81
commit 6ea38a99f2
46 changed files with 659 additions and 373 deletions
@@ -678,6 +678,7 @@ func (ec *JobsControllerImpl) scheduleStepRun(ctx context.Context, tenantId, ste
workers, err := ec.repo.Worker().ListWorkers(tenantId, &repository.ListWorkersOpts{
Action: &stepRun.Step().ActionID,
LastHeartbeatAfter: &after,
Assignable: repository.BoolPtr(true),
})
if err != nil {
@@ -693,15 +694,17 @@ func (ec *JobsControllerImpl) scheduleStepRun(ctx context.Context, tenantId, ste
selectedWorker := workers[0]
for _, worker := range workers {
if worker.StepRunCount < selectedWorker.StepRunCount {
if worker.RunningStepRuns < selectedWorker.RunningStepRuns {
selectedWorker = worker
}
}
telemetry.WithAttributes(span, servertel.WorkerId(selectedWorker.Worker.ID))
selectedWorkerId := sqlchelpers.UUIDToStr(selectedWorker.Worker.ID)
telemetry.WithAttributes(span, servertel.WorkerId(selectedWorkerId))
// update the job run's designated worker
err = ec.repo.Worker().AddStepRun(tenantId, selectedWorker.Worker.ID, stepRunId)
err = ec.repo.Worker().AddStepRun(tenantId, selectedWorkerId, stepRunId)
if err != nil {
return fmt.Errorf("could not add step run to worker: %w", err)
@@ -728,11 +731,13 @@ func (ec *JobsControllerImpl) scheduleStepRun(ctx context.Context, tenantId, ste
return fmt.Errorf("could not schedule step run timeout task: %w", err)
}
dispatcherId := sqlchelpers.UUIDToStr(selectedWorker.Worker.DispatcherId)
// send a task to the dispatcher
err = ec.tq.AddTask(
ctx,
taskqueue.QueueTypeFromDispatcherID(selectedWorker.Worker.Dispatcher().ID),
stepRunAssignedTask(tenantId, stepRunId, selectedWorker.Worker),
taskqueue.QueueTypeFromDispatcherID(dispatcherId),
stepRunAssignedTask(tenantId, stepRunId, selectedWorkerId, dispatcherId),
)
if err != nil {
@@ -1215,17 +1220,15 @@ func (ec *JobsControllerImpl) getValidTickers() ([]db.TickerModel, error) {
return tickers, nil
}
func stepRunAssignedTask(tenantId, stepRunId string, worker *db.WorkerModel) *taskqueue.Task {
dispatcher := worker.Dispatcher()
func stepRunAssignedTask(tenantId, stepRunId, workerId, dispatcherId string) *taskqueue.Task {
payload, _ := datautils.ToJSONMap(tasktypes.StepRunAssignedTaskPayload{
StepRunId: stepRunId,
WorkerId: worker.ID,
WorkerId: workerId,
})
metadata, _ := datautils.ToJSONMap(tasktypes.StepRunAssignedTaskMetadata{
TenantId: tenantId,
DispatcherId: dispatcher.ID,
DispatcherId: dispatcherId,
})
return &taskqueue.Task{
@@ -169,6 +169,7 @@ func (wc *WorkflowsControllerImpl) scheduleGetGroupAction(
workers, err := wc.repo.Worker().ListWorkers(workflowRun.TenantID, &repository.ListWorkersOpts{
Action: &getAction.ActionID,
LastHeartbeatAfter: &after,
Assignable: repository.BoolPtr(true),
})
if err != nil {
@@ -184,15 +185,17 @@ func (wc *WorkflowsControllerImpl) scheduleGetGroupAction(
selectedWorker := workers[0]
for _, worker := range workers {
if worker.StepRunCount < selectedWorker.StepRunCount {
if worker.RunningStepRuns < selectedWorker.RunningStepRuns {
selectedWorker = worker
}
}
telemetry.WithAttributes(span, servertel.WorkerId(selectedWorker.Worker.ID))
selectedWorkerId := sqlchelpers.UUIDToStr(selectedWorker.Worker.ID)
telemetry.WithAttributes(span, servertel.WorkerId(selectedWorkerId))
// update the job run's designated worker
err = wc.repo.Worker().AddGetGroupKeyRun(tenantId, selectedWorker.Worker.ID, getGroupKeyRun.ID)
err = wc.repo.Worker().AddGetGroupKeyRun(tenantId, selectedWorkerId, getGroupKeyRun.ID)
if err != nil {
return fmt.Errorf("could not add step run to worker: %w", err)
@@ -219,11 +222,18 @@ func (wc *WorkflowsControllerImpl) scheduleGetGroupAction(
return fmt.Errorf("could not schedule step run timeout task: %w", err)
}
dispatcherId := sqlchelpers.UUIDToStr(selectedWorker.Worker.DispatcherId)
// send a task to the dispatcher
err = wc.tq.AddTask(
ctx,
taskqueue.QueueTypeFromDispatcherID(selectedWorker.Worker.Dispatcher().ID),
getGroupActionTask(workflowRun.TenantID, workflowRun.ID, selectedWorker.Worker),
taskqueue.QueueTypeFromDispatcherID(dispatcherId),
getGroupActionTask(
workflowRun.TenantID,
workflowRun.ID,
selectedWorkerId,
dispatcherId,
),
)
if err != nil {
@@ -534,17 +544,15 @@ func (wc *WorkflowsControllerImpl) cancelWorkflowRun(tenantId, workflowRunId str
return errGroup.Wait()
}
func getGroupActionTask(tenantId, workflowRunId string, worker *db.WorkerModel) *taskqueue.Task {
dispatcher := worker.Dispatcher()
func getGroupActionTask(tenantId, workflowRunId, workerId, dispatcherId string) *taskqueue.Task {
payload, _ := datautils.ToJSONMap(tasktypes.GroupKeyActionAssignedTaskPayload{
WorkflowRunId: workflowRunId,
WorkerId: worker.ID,
WorkerId: workerId,
})
metadata, _ := datautils.ToJSONMap(tasktypes.GroupKeyActionAssignedTaskMetadata{
TenantId: tenantId,
DispatcherId: dispatcher.ID,
DispatcherId: dispatcherId,
})
return &taskqueue.Task{