fix: dont copy acks map (#1811)

* dont copy

* lock

* update release and prerelease

---------

Co-authored-by: Alexander Belanger <alexander@hatchet.run>
This commit is contained in:
Gabe Ruttner
2025-06-03 16:53:20 -07:00
committed by Alexander Belanger
parent d1146eea14
commit c10b9db7d6
4 changed files with 48 additions and 71 deletions

View File

@@ -4,45 +4,9 @@ on:
- "v*" # Push events to matching v*, i.e. v1.0, v20.15.10
name: Create prerelease w/ binaries and static assets
jobs:
load:
runs-on: ubuntu-latest
timeout-minutes: 30
strategy:
matrix:
migrate-strategy: ["latest"]
rabbitmq-enabled: ["true", "false"]
pg-version: ["15-alpine", "16-alpine", "17-alpine"]
steps:
- uses: actions/checkout@v4
- name: Install Task
uses: arduino/setup-task@v2
- name: Setup Go
uses: actions/setup-go@v5
with:
go-version: "1.24"
- name: Setup pnpm
uses: pnpm/action-setup@v4
with:
version: 9.15.4
run_install: false
- name: Go deps
run: go mod download
- name: Test
run: |
go test -tags load ./... -p 5 -v -race -failfast -timeout 20m
env:
TESTING_MATRIX_MIGRATE: ${{ matrix.migrate-strategy }}
TESTING_MATRIX_RABBITMQ_ENABLED: ${{ matrix.rabbitmq-enabled }}
TESTING_MATRIX_PG_VERSION: ${{ matrix.pg-version }}
build-push-hatchet-api-amd:
name: hatchet-api
runs-on: ubuntu-latest
needs: load
steps:
- name: Get tag name
id: tag_name
@@ -68,7 +32,6 @@ jobs:
build-push-hatchet-api-arm:
name: hatchet-api
runs-on: hatchet-arm64-2
needs: load
steps:
- name: Get tag name
id: tag_name
@@ -121,7 +84,6 @@ jobs:
build-push-hatchet-admin-amd:
name: hatchet-admin
runs-on: ubuntu-latest
needs: load
steps:
- name: Get tag name
id: tag_name
@@ -147,7 +109,6 @@ jobs:
build-push-hatchet-admin-arm:
name: hatchet-admin
runs-on: hatchet-arm64-2
needs: load
steps:
- name: Get tag name
id: tag_name
@@ -200,7 +161,6 @@ jobs:
build-push-hatchet-engine-amd:
name: hatchet-engine
runs-on: ubuntu-latest
needs: load
steps:
- name: Get tag name
id: tag_name
@@ -226,7 +186,6 @@ jobs:
build-push-hatchet-engine-arm:
name: hatchet-engine
runs-on: hatchet-arm64-2
needs: load
steps:
- name: Get tag name
id: tag_name
@@ -279,7 +238,6 @@ jobs:
build-push-hatchet-migrate-amd:
name: hatchet-migrate
runs-on: ubuntu-latest
needs: load
steps:
- name: Get tag name
id: tag_name
@@ -305,7 +263,6 @@ jobs:
build-push-hatchet-migrate-arm:
name: hatchet-migrate
runs-on: hatchet-arm64-2
needs: load
steps:
- name: Get tag name
id: tag_name
@@ -358,7 +315,6 @@ jobs:
build-push-hatchet-frontend-amd:
name: hatchet-frontend
runs-on: ubuntu-latest
needs: load
steps:
- name: Get tag name
id: tag_name
@@ -382,7 +338,6 @@ jobs:
build-push-hatchet-frontend-arm:
name: hatchet-frontend
runs-on: hatchet-arm64-2
needs: load
steps:
- name: Get tag name
id: tag_name
@@ -433,7 +388,6 @@ jobs:
build-push-hatchet-lite-amd:
name: hatchet-lite-amd
runs-on: ubuntu-latest
needs: load
steps:
- name: Get tag name
id: tag_name
@@ -487,7 +441,6 @@ jobs:
build-push-hatchet-lite-arm:
name: hatchet-lite-arm
runs-on: hatchet-arm64-2
needs: load
steps:
- name: Get tag name
id: tag_name
@@ -568,7 +521,6 @@ jobs:
build-push-hatchet-dashboard-amd:
name: hatchet-dashboard-amd
runs-on: ubuntu-latest
needs: load
steps:
- name: Get tag name
id: tag_name
@@ -604,7 +556,6 @@ jobs:
build-push-hatchet-dashboard-arm:
name: hatchet-dashboard-arm
runs-on: hatchet-arm64-2
needs: load
steps:
- name: Get tag name
id: tag_name
@@ -667,7 +618,6 @@ jobs:
build-push-hatchet-loadtest-amd:
name: hatchet-loadtest
runs-on: ubuntu-latest
needs: load
steps:
- name: Get tag name
id: tag_name
@@ -691,7 +641,6 @@ jobs:
build-push-hatchet-loadtest-arm:
name: hatchet-loadtest
runs-on: hatchet-arm64-2
needs: load
steps:
- name: Get tag name
id: tag_name

View File

@@ -8,9 +8,45 @@ on:
required: true
name: Release
jobs:
load:
runs-on: ubuntu-latest
timeout-minutes: 30
strategy:
matrix:
migrate-strategy: ["latest"]
rabbitmq-enabled: ["true", "false"]
pg-version: ["15-alpine", "16-alpine", "17-alpine"]
steps:
- uses: actions/checkout@v4
- name: Install Task
uses: arduino/setup-task@v2
- name: Setup Go
uses: actions/setup-go@v5
with:
go-version: "1.24"
- name: Setup pnpm
uses: pnpm/action-setup@v4
with:
version: 9.15.4
run_install: false
- name: Go deps
run: go mod download
- name: Test
run: |
go test -tags load ./... -p 5 -v -race -failfast -timeout 20m
env:
TESTING_MATRIX_MIGRATE: ${{ matrix.migrate-strategy }}
TESTING_MATRIX_RABBITMQ_ENABLED: ${{ matrix.rabbitmq-enabled }}
TESTING_MATRIX_PG_VERSION: ${{ matrix.pg-version }}
push-hatchet-server:
name: Push latest
runs-on: ubuntu-latest
needs: load
steps:
- name: Get tag name
id: tag_name

View File

@@ -827,20 +827,6 @@ func (w *workflowRunAcks) getNonAckdWorkflowRuns() []string {
return ids
}
func (w *workflowRunAcks) getNonAckdWorkflowRunsMap() map[string]bool {
w.mu.RLock()
defer w.mu.RUnlock()
// copy ids to a new map
acks := make(map[string]bool, len(w.acks))
for id, ack := range w.acks {
if !ack {
acks[id] = ack
}
}
return acks
}
func (w *workflowRunAcks) ackWorkflowRun(id string) {
w.mu.Lock()
defer w.mu.Unlock()
@@ -848,6 +834,14 @@ func (w *workflowRunAcks) ackWorkflowRun(id string) {
w.acks[id] = true
}
func (w *workflowRunAcks) hasWorkflowRun(id string) bool {
w.mu.RLock()
defer w.mu.RUnlock()
_, ok := w.acks[id]
return ok
}
type sendTimeFilter struct {
mu sync.Mutex
}

View File

@@ -135,9 +135,7 @@ func (s *DispatcherImpl) subscribeToWorkflowRunsV1(server contracts.Dispatcher_S
wg.Add(1)
defer wg.Done()
workflowRunIds := acks.getNonAckdWorkflowRunsMap()
if matchedWorkflowRunIds, ok := s.isMatchingWorkflowRunV1(msg, workflowRunIds); ok {
if matchedWorkflowRunIds, ok := s.isMatchingWorkflowRunV1(msg, acks); ok {
if err := iter(matchedWorkflowRunIds); err != nil {
s.l.Error().Err(err).Msg("could not iterate over workflow runs")
}
@@ -987,14 +985,14 @@ func (s *DispatcherImpl) msgsToWorkflowEvent(msgId string, payloads [][]byte, fi
return matches, nil
}
func (s *DispatcherImpl) isMatchingWorkflowRunV1(msg *msgqueue.Message, workflowRunIds map[string]bool) ([]string, bool) {
func (s *DispatcherImpl) isMatchingWorkflowRunV1(msg *msgqueue.Message, acks *workflowRunAcks) ([]string, bool) {
switch msg.ID {
case "workflow-run-finished":
payloads := msgqueue.JSONConvert[tasktypes.NotifyFinalizedPayload](msg.Payloads)
res := make([]string, 0)
for _, payload := range payloads {
if _, ok := workflowRunIds[payload.ExternalId]; ok {
if acks.hasWorkflowRun(payload.ExternalId) {
res = append(res, payload.ExternalId)
}
}
@@ -1009,7 +1007,7 @@ func (s *DispatcherImpl) isMatchingWorkflowRunV1(msg *msgqueue.Message, workflow
res := make([]string, 0)
for _, payload := range payloads {
if _, ok := workflowRunIds[payload.WorkflowRunId]; ok {
if acks.hasWorkflowRun(payload.WorkflowRunId) {
res = append(res, payload.WorkflowRunId)
}
}