diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 250012b0a..e71d08f5e 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -112,7 +112,8 @@ jobs: - name: Generate run: | - go run github.com/steebchen/prisma-client-go migrate deploy + go run github.com/steebchen/prisma-client-go db push + task generate-all task generate-certs task generate-local-encryption-keys diff --git a/.golangci.yml b/.golangci.yml index 508d2969f..56f7e932d 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -16,6 +16,7 @@ linters: - vet - stylecheck - unconvert + - ineffassign linters-settings: goimports: diff --git a/api/v1/server/authn/middleware.go b/api/v1/server/authn/middleware.go index c55bbd642..3079e7fbb 100644 --- a/api/v1/server/authn/middleware.go +++ b/api/v1/server/authn/middleware.go @@ -136,6 +136,11 @@ func (a *AuthN) handleCookieAuth(c echo.Context) error { } user, err := a.config.Repository.User().GetUserByID(userID) + if err != nil { + a.l.Debug().Err(err).Msg("error getting user by id") + + return forbidden + } // set the user and session in context c.Set("user", user) diff --git a/api/v1/server/handlers/github-app/github_incoming.go b/api/v1/server/handlers/github-app/github_incoming.go index ee1f2850a..e1dc79353 100644 --- a/api/v1/server/handlers/github-app/github_incoming.go +++ b/api/v1/server/handlers/github-app/github_incoming.go @@ -1,15 +1,15 @@ package githubapp import ( + "fmt" "net/http" + githubsdk "github.com/google/go-github/v57/github" "github.com/labstack/echo/v4" "github.com/hatchet-dev/hatchet/api/v1/server/oas/gen" "github.com/hatchet-dev/hatchet/internal/integrations/vcs/github" "github.com/hatchet-dev/hatchet/internal/repository" - - githubsdk "github.com/google/go-github/v57/github" ) func (g *GithubAppService) GithubUpdateTenantWebhook(ctx echo.Context, req gen.GithubUpdateTenantWebhookRequestObject) (gen.GithubUpdateTenantWebhookResponseObject, error) { @@ -42,7 +42,9 @@ func (g *GithubAppService) GithubUpdateTenantWebhook(ctx echo.Context, req gen.G switch event := event.(type) { // nolint: gocritic case *githubsdk.PullRequestEvent: - err = g.processPullRequestEvent(webhook.TenantID, event, ctx.Request()) + if err := g.processPullRequestEvent(webhook.TenantID, event, ctx.Request()); err != nil { + return nil, fmt.Errorf("error processing pull request event: %w", err) + } } return nil, nil diff --git a/hack/dev/run-npx-with-env.sh b/hack/dev/run-npx-with-env.sh index e3020f737..fae933854 100644 --- a/hack/dev/run-npx-with-env.sh +++ b/hack/dev/run-npx-with-env.sh @@ -3,7 +3,7 @@ set -eux set -a -. .env +. .env || true set +a exec npx "$@" diff --git a/internal/repository/prisma/step_run.go b/internal/repository/prisma/step_run.go index b788a5165..8e9827f6b 100644 --- a/internal/repository/prisma/step_run.go +++ b/internal/repository/prisma/step_run.go @@ -429,7 +429,7 @@ func (s *stepRunRepository) QueueStepRun(ctx context.Context, tenantId, stepRunI var stepRun *dbsqlc.GetStepRunForEngineRow - err = retrier(s.l, func() error { + retrierErr := retrier(s.l, func() error { tx, err := s.pool.Begin(context.Background()) if err != nil { @@ -452,22 +452,29 @@ func (s *stepRunRepository) QueueStepRun(ctx context.Context, tenantId, stepRunI return repository.ErrStepRunIsNotPending } - stepRun, err = s.updateStepRunCore(ctx, tx, tenantId, updateParams, updateJobRunLookupDataParams) + sr, err := s.updateStepRunCore(ctx, tx, tenantId, updateParams, updateJobRunLookupDataParams) + if err != nil { + return err + } + + stepRun = sr if err != nil { return err } - if err != nil { + if err := tx.Commit(context.Background()); err != nil { return err } - err = tx.Commit(context.Background()) - - return err + return nil }) - err = retrier(s.l, func() error { + if retrierErr != nil { + return nil, fmt.Errorf("could not queue step run: %w", err) + } + + retrierExtraErr := retrier(s.l, func() error { tx, err := s.pool.Begin(context.Background()) if err != nil { @@ -487,7 +494,7 @@ func (s *stepRunRepository) QueueStepRun(ctx context.Context, tenantId, stepRunI return err }) - if err != nil { + if retrierExtraErr != nil { // non-fatal error, log and continue s.l.Err(err).Msg("could not update step run extra") return nil, nil @@ -600,7 +607,7 @@ func (s *stepRunRepository) updateStepRunCore( updateParams dbsqlc.UpdateStepRunParams, updateJobRunLookupDataParams *dbsqlc.UpdateJobRunLookupDataWithStepRunParams, ) (*dbsqlc.GetStepRunForEngineRow, error) { - ctx, span := telemetry.NewSpan(ctx, "update-step-run-core") + ctx, span := telemetry.NewSpan(ctx, "update-step-run-core") // nolint:ineffassign defer span.End() updateStepRun, err := s.queries.UpdateStepRun(ctx, tx, updateParams) @@ -641,7 +648,7 @@ func (s *stepRunRepository) updateStepRunExtra( resolveJobRunParams dbsqlc.ResolveJobRunStatusParams, resolveLaterStepRunsParams dbsqlc.ResolveLaterStepRunsParams, ) (*repository.StepRunUpdateInfo, error) { - ctx, span := telemetry.NewSpan(ctx, "update-step-run-extra") + ctx, span := telemetry.NewSpan(ctx, "update-step-run-extra") // nolint:ineffassign defer span.End() _, err := s.queries.ResolveLaterStepRuns(context.Background(), tx, resolveLaterStepRunsParams) diff --git a/internal/services/admin/server.go b/internal/services/admin/server.go index 005d69b2e..e325d23db 100644 --- a/internal/services/admin/server.go +++ b/internal/services/admin/server.go @@ -61,7 +61,7 @@ func (a *AdminServiceImpl) TriggerWorkflow(ctx context.Context, req *contracts.T ) } - return nil, err + return nil, fmt.Errorf("could not get workflow by name: %w", err) } workflowVersion := &workflow.Versions()[0] @@ -73,7 +73,7 @@ func (a *AdminServiceImpl) TriggerWorkflow(ctx context.Context, req *contracts.T createOpts, err := repository.GetCreateWorkflowRunOptsFromManual(workflowVersion, []byte(req.Input)) if err != nil { - return nil, err + return nil, fmt.Errorf("could not create workflow run opts: %w", err) } workflowRun, err := a.repo.WorkflowRun().CreateNewWorkflowRun(ctx, tenant.ID, createOpts) @@ -88,6 +88,9 @@ func (a *AdminServiceImpl) TriggerWorkflow(ctx context.Context, req *contracts.T msgqueue.WORKFLOW_PROCESSING_QUEUE, tasktypes.WorkflowRunQueuedToTask(workflowRun), ) + if err != nil { + return nil, fmt.Errorf("could not queue workflow run: %w", err) + } return &contracts.TriggerWorkflowResponse{ WorkflowRunId: workflowRun.ID, diff --git a/internal/services/controllers/jobs/controller.go b/internal/services/controllers/jobs/controller.go index e43833552..20fbb5d5b 100644 --- a/internal/services/controllers/jobs/controller.go +++ b/internal/services/controllers/jobs/controller.go @@ -8,9 +8,8 @@ import ( "sync" "time" - "github.com/goccy/go-json" - "github.com/go-co-op/gocron/v2" + "github.com/goccy/go-json" "github.com/rs/zerolog" "golang.org/x/sync/errgroup" @@ -26,7 +25,6 @@ import ( "github.com/hatchet-dev/hatchet/internal/services/shared/tasktypes" "github.com/hatchet-dev/hatchet/internal/telemetry" "github.com/hatchet-dev/hatchet/internal/telemetry/servertel" - hatcheterrors "github.com/hatchet-dev/hatchet/pkg/errors" ) @@ -253,6 +251,9 @@ func (ec *JobsControllerImpl) handleJobRunQueued(ctx context.Context, task *msgq // list the step runs which are startable startableStepRuns, err := ec.repo.StepRun().ListStartableStepRuns(metadata.TenantId, payload.JobRunId, nil) + if err != nil { + return fmt.Errorf("could not list startable step runs: %w", err) + } g := new(errgroup.Group) diff --git a/internal/services/controllers/workflows/controller.go b/internal/services/controllers/workflows/controller.go index 6a2a65f9a..5a75773fd 100644 --- a/internal/services/controllers/workflows/controller.go +++ b/internal/services/controllers/workflows/controller.go @@ -199,7 +199,7 @@ func (wc *WorkflowsControllerImpl) handleTask(ctx context.Context, task *msgqueu } func (ec *WorkflowsControllerImpl) handleGroupKeyRunStarted(ctx context.Context, task *msgqueue.Message) error { - ctx, span := telemetry.NewSpan(ctx, "get-group-key-run-started") + ctx, span := telemetry.NewSpan(ctx, "get-group-key-run-started") // nolint:ineffassign defer span.End() payload := tasktypes.GetGroupKeyRunStartedTaskPayload{} diff --git a/internal/services/controllers/workflows/queue.go b/internal/services/controllers/workflows/queue.go index 7f4e65c02..24259e501 100644 --- a/internal/services/controllers/workflows/queue.go +++ b/internal/services/controllers/workflows/queue.go @@ -213,12 +213,12 @@ func (wc *WorkflowsControllerImpl) scheduleGetGroupAction( } func (wc *WorkflowsControllerImpl) queueWorkflowRunJobs(ctx context.Context, workflowRun *db.WorkflowRunModel) error { - ctx, span := telemetry.NewSpan(ctx, "process-event") + ctx, span := telemetry.NewSpan(ctx, "process-event") // nolint:ineffassign defer span.End() jobRuns := workflowRun.JobRuns() - var err error + var returnErr error for i := range jobRuns { err := wc.mq.AddMessage( @@ -228,11 +228,11 @@ func (wc *WorkflowsControllerImpl) queueWorkflowRunJobs(ctx context.Context, wor ) if err != nil { - err = multierror.Append(err, fmt.Errorf("could not add job run to task queue: %w", err)) + returnErr = multierror.Append(err, fmt.Errorf("could not add job run to task queue: %w", err)) } } - return err + return returnErr } func (wc *WorkflowsControllerImpl) runGetGroupKeyRunRequeue(ctx context.Context) func() { @@ -303,7 +303,7 @@ func (ec *WorkflowsControllerImpl) runGetGroupKeyRunRequeueTenant(ctx context.Co isTimedOut := !scheduleTimeoutAt.IsZero() && scheduleTimeoutAt.Before(now) if isTimedOut { - innerGetGroupKeyRun, err = ec.repo.GetGroupKeyRun().UpdateGetGroupKeyRun(tenantId, getGroupKeyRunId, &repository.UpdateGetGroupKeyRunOpts{ + _, err := ec.repo.GetGroupKeyRun().UpdateGetGroupKeyRun(tenantId, getGroupKeyRunId, &repository.UpdateGetGroupKeyRunOpts{ CancelledAt: &now, CancelledReason: repository.StringPtr("SCHEDULING_TIMED_OUT"), Status: repository.StepRunStatusPtr(db.StepRunStatusCancelled), diff --git a/internal/services/dispatcher/server.go b/internal/services/dispatcher/server.go index 3a14b1860..4807ab4e6 100644 --- a/internal/services/dispatcher/server.go +++ b/internal/services/dispatcher/server.go @@ -46,7 +46,7 @@ func (worker *subscribedWorker) StartStepRun( tenantId string, stepRun *db.StepRunModel, ) error { - ctx, span := telemetry.NewSpan(ctx, "start-step-run") + ctx, span := telemetry.NewSpan(ctx, "start-step-run") // nolint:ineffassign defer span.End() inputBytes := []byte{} @@ -79,11 +79,9 @@ func (worker *subscribedWorker) StartGroupKeyAction( tenantId string, workflowRun *db.WorkflowRunModel, ) error { - ctx, span := telemetry.NewSpan(ctx, "start-group-key-action") + ctx, span := telemetry.NewSpan(ctx, "start-group-key-action") // nolint:ineffassign defer span.End() - inputBytes := []byte{} - concurrency, ok := workflowRun.WorkflowVersion().Concurrency() if !ok { @@ -123,8 +121,6 @@ func (worker *subscribedWorker) StartGroupKeyAction( } } - inputBytes = []byte(inputData) - getGroupKeyRun, ok := workflowRun.GetGroupKeyRun() if !ok { @@ -137,7 +133,7 @@ func (worker *subscribedWorker) StartGroupKeyAction( GetGroupKeyRunId: getGroupKeyRun.ID, ActionType: contracts.ActionType_START_GET_GROUP_KEY, ActionId: concurrencyFn.ActionID, - ActionPayload: string(inputBytes), + ActionPayload: string(inputData), }) } @@ -146,7 +142,7 @@ func (worker *subscribedWorker) CancelStepRun( tenantId string, stepRun *db.StepRunModel, ) error { - ctx, span := telemetry.NewSpan(ctx, "cancel-step-run") + ctx, span := telemetry.NewSpan(ctx, "cancel-step-run") // nolint:ineffassign defer span.End() stepName, _ := stepRun.Step().ReadableID() diff --git a/internal/testutils/env.go b/internal/testutils/env.go index af6be6dc1..7a3b2791d 100644 --- a/internal/testutils/env.go +++ b/internal/testutils/env.go @@ -1,6 +1,7 @@ package testutils import ( + "errors" "os" "path" "path/filepath" @@ -8,6 +9,8 @@ import ( "testing" "github.com/hatchet-dev/hatchet/internal/config/loader" + "github.com/hatchet-dev/hatchet/internal/repository" + "github.com/hatchet-dev/hatchet/internal/repository/prisma/db" ) func Prepare(t *testing.T) { @@ -52,7 +55,27 @@ func Prepare(t *testing.T) { t.Fatalf("could not load server config: %v", err) } + // check if tenant exists + _, err = serverConf.Repository.Tenant().GetTenantByID(tenantId) + if err != nil { + if errors.Is(err, db.ErrNotFound) { + _, err = serverConf.Repository.Tenant().CreateTenant(&repository.CreateTenantOpts{ + ID: &tenantId, + Name: "test-tenant", + Slug: "test-tenant", + }) + if err != nil { + t.Fatalf("could not create tenant: %v", err) + } + } else { + t.Fatalf("could not get tenant: %v", err) + } + } + defaultTok, err := serverConf.Auth.JWTManager.GenerateTenantToken(tenantId, "default") + if err != nil { + t.Fatalf("could not generate default token: %v", err) + } _ = os.Setenv("HATCHET_CLIENT_TOKEN", defaultTok)