From c10b9db7d6663c442c86358caa39d598ff61d3c7 Mon Sep 17 00:00:00 2001 From: Gabe Ruttner Date: Tue, 3 Jun 2025 16:53:20 -0700 Subject: [PATCH] fix: dont copy acks map (#1811) * dont copy * lock * update release and prerelease --------- Co-authored-by: Alexander Belanger --- .github/workflows/pre-release.yaml | 51 ----------------------- .github/workflows/release.yaml | 36 ++++++++++++++++ internal/services/dispatcher/server.go | 22 ++++------ internal/services/dispatcher/server_v1.go | 10 ++--- 4 files changed, 48 insertions(+), 71 deletions(-) diff --git a/.github/workflows/pre-release.yaml b/.github/workflows/pre-release.yaml index cb8dc82bd..fb11e62ea 100644 --- a/.github/workflows/pre-release.yaml +++ b/.github/workflows/pre-release.yaml @@ -4,45 +4,9 @@ on: - "v*" # Push events to matching v*, i.e. v1.0, v20.15.10 name: Create prerelease w/ binaries and static assets jobs: - load: - runs-on: ubuntu-latest - timeout-minutes: 30 - strategy: - matrix: - migrate-strategy: ["latest"] - rabbitmq-enabled: ["true", "false"] - pg-version: ["15-alpine", "16-alpine", "17-alpine"] - - steps: - - uses: actions/checkout@v4 - - - name: Install Task - uses: arduino/setup-task@v2 - - name: Setup Go - uses: actions/setup-go@v5 - with: - go-version: "1.24" - - - name: Setup pnpm - uses: pnpm/action-setup@v4 - with: - version: 9.15.4 - run_install: false - - - name: Go deps - run: go mod download - - - name: Test - run: | - go test -tags load ./... -p 5 -v -race -failfast -timeout 20m - env: - TESTING_MATRIX_MIGRATE: ${{ matrix.migrate-strategy }} - TESTING_MATRIX_RABBITMQ_ENABLED: ${{ matrix.rabbitmq-enabled }} - TESTING_MATRIX_PG_VERSION: ${{ matrix.pg-version }} build-push-hatchet-api-amd: name: hatchet-api runs-on: ubuntu-latest - needs: load steps: - name: Get tag name id: tag_name @@ -68,7 +32,6 @@ jobs: build-push-hatchet-api-arm: name: hatchet-api runs-on: hatchet-arm64-2 - needs: load steps: - name: Get tag name id: tag_name @@ -121,7 +84,6 @@ jobs: build-push-hatchet-admin-amd: name: hatchet-admin runs-on: ubuntu-latest - needs: load steps: - name: Get tag name id: tag_name @@ -147,7 +109,6 @@ jobs: build-push-hatchet-admin-arm: name: hatchet-admin runs-on: hatchet-arm64-2 - needs: load steps: - name: Get tag name id: tag_name @@ -200,7 +161,6 @@ jobs: build-push-hatchet-engine-amd: name: hatchet-engine runs-on: ubuntu-latest - needs: load steps: - name: Get tag name id: tag_name @@ -226,7 +186,6 @@ jobs: build-push-hatchet-engine-arm: name: hatchet-engine runs-on: hatchet-arm64-2 - needs: load steps: - name: Get tag name id: tag_name @@ -279,7 +238,6 @@ jobs: build-push-hatchet-migrate-amd: name: hatchet-migrate runs-on: ubuntu-latest - needs: load steps: - name: Get tag name id: tag_name @@ -305,7 +263,6 @@ jobs: build-push-hatchet-migrate-arm: name: hatchet-migrate runs-on: hatchet-arm64-2 - needs: load steps: - name: Get tag name id: tag_name @@ -358,7 +315,6 @@ jobs: build-push-hatchet-frontend-amd: name: hatchet-frontend runs-on: ubuntu-latest - needs: load steps: - name: Get tag name id: tag_name @@ -382,7 +338,6 @@ jobs: build-push-hatchet-frontend-arm: name: hatchet-frontend runs-on: hatchet-arm64-2 - needs: load steps: - name: Get tag name id: tag_name @@ -433,7 +388,6 @@ jobs: build-push-hatchet-lite-amd: name: hatchet-lite-amd runs-on: ubuntu-latest - needs: load steps: - name: Get tag name id: tag_name @@ -487,7 +441,6 @@ jobs: build-push-hatchet-lite-arm: name: hatchet-lite-arm runs-on: hatchet-arm64-2 - needs: load steps: - name: Get tag name id: tag_name @@ -568,7 +521,6 @@ jobs: build-push-hatchet-dashboard-amd: name: hatchet-dashboard-amd runs-on: ubuntu-latest - needs: load steps: - name: Get tag name id: tag_name @@ -604,7 +556,6 @@ jobs: build-push-hatchet-dashboard-arm: name: hatchet-dashboard-arm runs-on: hatchet-arm64-2 - needs: load steps: - name: Get tag name id: tag_name @@ -667,7 +618,6 @@ jobs: build-push-hatchet-loadtest-amd: name: hatchet-loadtest runs-on: ubuntu-latest - needs: load steps: - name: Get tag name id: tag_name @@ -691,7 +641,6 @@ jobs: build-push-hatchet-loadtest-arm: name: hatchet-loadtest runs-on: hatchet-arm64-2 - needs: load steps: - name: Get tag name id: tag_name diff --git a/.github/workflows/release.yaml b/.github/workflows/release.yaml index 150f4ad88..7fb9fe0bf 100644 --- a/.github/workflows/release.yaml +++ b/.github/workflows/release.yaml @@ -8,9 +8,45 @@ on: required: true name: Release jobs: + load: + runs-on: ubuntu-latest + timeout-minutes: 30 + strategy: + matrix: + migrate-strategy: ["latest"] + rabbitmq-enabled: ["true", "false"] + pg-version: ["15-alpine", "16-alpine", "17-alpine"] + + steps: + - uses: actions/checkout@v4 + + - name: Install Task + uses: arduino/setup-task@v2 + - name: Setup Go + uses: actions/setup-go@v5 + with: + go-version: "1.24" + + - name: Setup pnpm + uses: pnpm/action-setup@v4 + with: + version: 9.15.4 + run_install: false + + - name: Go deps + run: go mod download + + - name: Test + run: | + go test -tags load ./... -p 5 -v -race -failfast -timeout 20m + env: + TESTING_MATRIX_MIGRATE: ${{ matrix.migrate-strategy }} + TESTING_MATRIX_RABBITMQ_ENABLED: ${{ matrix.rabbitmq-enabled }} + TESTING_MATRIX_PG_VERSION: ${{ matrix.pg-version }} push-hatchet-server: name: Push latest runs-on: ubuntu-latest + needs: load steps: - name: Get tag name id: tag_name diff --git a/internal/services/dispatcher/server.go b/internal/services/dispatcher/server.go index 678dc577d..28fcb705b 100644 --- a/internal/services/dispatcher/server.go +++ b/internal/services/dispatcher/server.go @@ -827,20 +827,6 @@ func (w *workflowRunAcks) getNonAckdWorkflowRuns() []string { return ids } -func (w *workflowRunAcks) getNonAckdWorkflowRunsMap() map[string]bool { - w.mu.RLock() - defer w.mu.RUnlock() - - // copy ids to a new map - acks := make(map[string]bool, len(w.acks)) - for id, ack := range w.acks { - if !ack { - acks[id] = ack - } - } - return acks -} - func (w *workflowRunAcks) ackWorkflowRun(id string) { w.mu.Lock() defer w.mu.Unlock() @@ -848,6 +834,14 @@ func (w *workflowRunAcks) ackWorkflowRun(id string) { w.acks[id] = true } +func (w *workflowRunAcks) hasWorkflowRun(id string) bool { + w.mu.RLock() + defer w.mu.RUnlock() + + _, ok := w.acks[id] + return ok +} + type sendTimeFilter struct { mu sync.Mutex } diff --git a/internal/services/dispatcher/server_v1.go b/internal/services/dispatcher/server_v1.go index 17430daa2..2542f19e7 100644 --- a/internal/services/dispatcher/server_v1.go +++ b/internal/services/dispatcher/server_v1.go @@ -135,9 +135,7 @@ func (s *DispatcherImpl) subscribeToWorkflowRunsV1(server contracts.Dispatcher_S wg.Add(1) defer wg.Done() - workflowRunIds := acks.getNonAckdWorkflowRunsMap() - - if matchedWorkflowRunIds, ok := s.isMatchingWorkflowRunV1(msg, workflowRunIds); ok { + if matchedWorkflowRunIds, ok := s.isMatchingWorkflowRunV1(msg, acks); ok { if err := iter(matchedWorkflowRunIds); err != nil { s.l.Error().Err(err).Msg("could not iterate over workflow runs") } @@ -987,14 +985,14 @@ func (s *DispatcherImpl) msgsToWorkflowEvent(msgId string, payloads [][]byte, fi return matches, nil } -func (s *DispatcherImpl) isMatchingWorkflowRunV1(msg *msgqueue.Message, workflowRunIds map[string]bool) ([]string, bool) { +func (s *DispatcherImpl) isMatchingWorkflowRunV1(msg *msgqueue.Message, acks *workflowRunAcks) ([]string, bool) { switch msg.ID { case "workflow-run-finished": payloads := msgqueue.JSONConvert[tasktypes.NotifyFinalizedPayload](msg.Payloads) res := make([]string, 0) for _, payload := range payloads { - if _, ok := workflowRunIds[payload.ExternalId]; ok { + if acks.hasWorkflowRun(payload.ExternalId) { res = append(res, payload.ExternalId) } } @@ -1009,7 +1007,7 @@ func (s *DispatcherImpl) isMatchingWorkflowRunV1(msg *msgqueue.Message, workflow res := make([]string, 0) for _, payload := range payloads { - if _, ok := workflowRunIds[payload.WorkflowRunId]; ok { + if acks.hasWorkflowRun(payload.WorkflowRunId) { res = append(res, payload.WorkflowRunId) } }