Files
hatchet/api/v1/server/handlers/workflows/trigger.go
T
abelanger5 092f54c64f refactor: separate api and engine repositories, change ticker logic (#281)
* refactor: separate api and engine repositories, change ticker logic

* fix: nil error blocks

* fix: run migration on load test

* fix: generate db package in load test

* fix: test.yml

* fix: add pnpm to load test

* fix: don't lock CTEs with columns that don't get updated

* fix: update heartbeat for worker every 4 seconds, not 5

* chore: remove dead code

* chore: update python sdk

* chore: add back telemetry attributes
2024-03-21 14:10:34 -04:00

93 lines
2.4 KiB
Go

package workflows
import (
"encoding/json"
"errors"
"fmt"
"github.com/labstack/echo/v4"
"github.com/hatchet-dev/hatchet/api/v1/server/oas/apierrors"
"github.com/hatchet-dev/hatchet/api/v1/server/oas/gen"
"github.com/hatchet-dev/hatchet/api/v1/server/oas/transformers"
"github.com/hatchet-dev/hatchet/internal/msgqueue"
"github.com/hatchet-dev/hatchet/internal/repository"
"github.com/hatchet-dev/hatchet/internal/repository/prisma/db"
"github.com/hatchet-dev/hatchet/internal/services/shared/tasktypes"
)
// WorkflowRunCreate manually triggers a run of the workflow attached to the
// request context and publishes it to the workflow processing queue.
//
// The workflow version is request.Params.Version when it is set; otherwise the
// first element of workflow.Versions() is used (presumably the most recent
// version; confirm the ordering applied when the workflow is loaded).
// request.Body.Input is JSON-encoded and passed on as the run's input.
//
// A 400 JSON response with a nil error is returned when the workflow has no
// versions, when the selected version is not found, or when the input cannot
// be marshalled to JSON. Every other failure is returned as an error wrapped
// with the step that failed, so the cause of a server error can be identified.
// On success the created run is transformed and returned as a 200 JSON
// response.
//
// The "tenant" and "workflow" context values must already be set with the
// expected types (presumably by upstream middleware; confirm): the unchecked
// type assertions below panic otherwise.
func (t *WorkflowService) WorkflowRunCreate(ctx echo.Context, request gen.WorkflowRunCreateRequestObject) (gen.WorkflowRunCreateResponseObject, error) {
	tenant := ctx.Get("tenant").(*db.TenantModel)
	workflow := ctx.Get("workflow").(*db.WorkflowModel)

	// Resolve which workflow version to run: an explicit version wins,
	// otherwise fall back to the first version loaded on the workflow.
	var workflowVersionId string

	if request.Params.Version != nil {
		workflowVersionId = request.Params.Version.String()
	} else {
		versions := workflow.Versions()

		if len(versions) == 0 {
			return gen.WorkflowRunCreate400JSONResponse(
				apierrors.NewAPIErrors("workflow has no versions"),
			), nil
		}

		workflowVersionId = versions[0].ID
	}

	workflowVersion, err := t.config.EngineRepository.Workflow().GetWorkflowVersionById(tenant.ID, workflowVersionId)

	if err != nil {
		// An unknown version is a client error, not a server failure.
		if errors.Is(err, db.ErrNotFound) {
			return gen.WorkflowRunCreate400JSONResponse(
				apierrors.NewAPIErrors("version not found"),
			), nil
		}

		return nil, fmt.Errorf("could not get workflow version: %w", err)
	}

	// make sure the input can be marshalled to JSON before creating the run
	inputBytes, err := json.Marshal(request.Body.Input)

	if err != nil {
		return gen.WorkflowRunCreate400JSONResponse(
			apierrors.NewAPIErrors("Invalid input"),
		), nil
	}

	createOpts, err := repository.GetCreateWorkflowRunOptsFromManual(workflowVersion, inputBytes)

	if err != nil {
		return nil, fmt.Errorf("could not get create workflow run opts: %w", err)
	}

	workflowRun, err := t.config.APIRepository.WorkflowRun().CreateNewWorkflowRun(ctx.Request().Context(), tenant.ID, createOpts)

	if err != nil {
		return nil, fmt.Errorf("could not create workflow run: %w", err)
	}

	// send to workflow processing queue
	err = t.config.MessageQueue.AddMessage(
		ctx.Request().Context(),
		msgqueue.WORKFLOW_PROCESSING_QUEUE,
		tasktypes.WorkflowRunQueuedToTask(workflowRun.TenantID, workflowRun.ID),
	)

	if err != nil {
		return nil, fmt.Errorf("could not add workflow run to queue: %w", err)
	}

	res, err := transformers.ToWorkflowRun(workflowRun)

	if err != nil {
		return nil, fmt.Errorf("could not transform workflow run: %w", err)
	}

	return gen.WorkflowRunCreate200JSONResponse(
		*res,
	), nil
}