Files
hatchet/internal/queueutils/pool.go
matt 92843bb277 Feat: Payload Store Repository (#2047)
* feat: add table for storing payloads

* feat: add payload type enum

* feat: gen sqlc

* feat: initial sql impl

* feat: add payload store repo to shared

* feat: add overwrite

* fix: impl

* feat: bulk op

* feat: initial wiring of inputs for task triggers

* feat: wire up dag matches

* feat: create V1TaskWithPayload and use it everywhere

* fix: couple bugs

* fix: clean up types

* fix: overwrite

* fix: rm input from replay

* fix: move payload store to shared repo

* fix: schema

* refactor: repo setup

* refactor: repos

* fix: gen

* chore: lint

* fix: rename

* feat: naming, write dag inputs

* fix: more naming, trigger bug

* fix: dual writes for now

* fix: pass in tx

* feat: initial work on offloader

* feat: improve external offloader

* fix: some refs

* add withExternalHandler

* fix: improve impl of external store

* feat: implement offloading, fix other impls

* feat: add query to update JSON

* fix: implement offloading + updating records in payloads table

* feat: add WAL table

* feat: add queries for polling WAL and evicting

* feat: wire up writes into WAL

* fix: get job working

* refactor: improve types

* fix: infinite loop

* feat: improve offloading logic to run in two separate txes

* refactor: rework how overrides work

* fix: lint

* fix: migration number

* fix: migration

* fix: migration version

* fix: revert back to reading payloads out

* fix: fall back to previous input, part i

* fix: input fallback

* fix: add back input to replay

* fix: input fallback in dispatcher

* fix: nil check

* feat: advisory locks, part i

* fix: no skip locked

* feat: hash partitioned wal table

* fix: modify queries a bit, tweak crud enum

* fix: pk order, function to find tenants

* feat: wal processing

* fix: only write wal if an external store is enabled, fix offloading logic

* fix: spacing

* feat: schema cleanup

* fix: rm external store loc name

* fix: set content to null when offloading

* fix: cleanup, naming

* fix: pass overwrite payload store along

* debug: add some logging

* Revert "debug: add some logging"

This reverts commit 43e71eadf1.

* fix: typo

* fix: add offloadAt to store opts for offloading

* fix: handle leasing with advisory lock

* fix: struct def

* fix: requeue on payloads not found

* fix: rm hack for triggers

* fix: revert empty input on write

* fix: write input

* feat: env var for enabling / disabling dual writes

* feat: wire up dual writes

* fix: comments

* feat: generics!

* fix: panic from type cast

* fix: migration

* fix: generic

* fix: hack for T key in map

* fix: cleanup
2025-09-12 09:53:01 -04:00

92 lines
1.9 KiB
Go

package queueutils
import (
"sync"
"time"
"github.com/hatchet-dev/hatchet/pkg/repository/postgres/dbsqlc"
"github.com/hatchet-dev/hatchet/pkg/repository/postgres/sqlchelpers"
"github.com/rs/zerolog"
)
// OperationPool keeps one SerialOperation per id of type T and hands out the
// same settings (timeout, description, method, jitter) to every operation it
// creates.
type OperationPool[T ID] struct {
// ops maps an id to its *SerialOperation[T]; entries are added by
// GetOperation and removed by SetTenants / SetPartitions.
ops sync.Map
// timeout is copied into every operation created by GetOperation.
timeout time.Duration
// description is copied into every operation created by GetOperation.
description string
// method is copied into every operation created by GetOperation.
method OpMethod[T]
// ql is the logger passed to SerialOperation.RunOrContinue.
ql *zerolog.Logger
// maxJitter is copied into every operation created by GetOperation; it is
// zero unless WithJitter was called.
maxJitter time.Duration
}
// NewOperationPool returns an empty pool. The timeout, description and method
// given here are copied into every operation the pool later creates, and ql is
// the logger handed to operations when they are run through the pool.
//
// Jitter is disabled by default; use WithJitter to enable it.
func NewOperationPool[T ID](ql *zerolog.Logger, timeout time.Duration, description string, method OpMethod[T]) *OperationPool[T] {
	pool := new(OperationPool[T])

	pool.ql = ql
	pool.method = method
	pool.description = description
	pool.timeout = timeout
	// maxJitter is left at its zero value, which means no jitter.

	return pool
}
// WithJitter sets the maximum jitter that the pool copies into operations it
// creates and returns the same pool so the call can be chained onto
// NewOperationPool.
//
// The value is read when GetOperation creates an operation, so it should be
// set before the pool is used.
func (p *OperationPool[T]) WithJitter(maxJitter time.Duration) *OperationPool[T] {
	p.maxJitter = maxJitter

	return p
}
// SetTenants prunes the pool: every operation whose key is not the string ID
// of one of the given tenants is removed. It never adds operations; those are
// created on demand by GetOperation.
//
// Each key is asserted to be a string, so calling this on a pool that holds
// keys of any other type panics.
func (p *OperationPool[T]) SetTenants(tenants []*dbsqlc.Tenant) {
	active := make(map[string]struct{}, len(tenants))

	for _, tenant := range tenants {
		active[sqlchelpers.UUIDToStr(tenant.ID)] = struct{}{}
	}

	// Drop every operation that belongs to a tenant outside the active set.
	p.ops.Range(func(key, _ any) bool {
		if _, keep := active[key.(string)]; keep {
			return true
		}

		p.ops.Delete(key)

		return true
	})
}
// SetPartitions prunes the pool: every operation whose key is not one of the
// given partition IDs is removed. It never adds operations; those are created
// on demand by GetOperation.
//
// Each key is asserted to be an int64, so calling this on a pool that holds
// keys of any other type panics.
func (p *OperationPool[T]) SetPartitions(partitions []int64) {
	active := make(map[int64]struct{}, len(partitions))

	for _, partitionID := range partitions {
		active[partitionID] = struct{}{}
	}

	// Drop every operation that belongs to a partition outside the active set.
	p.ops.Range(func(key, _ any) bool {
		if _, keep := active[key.(int64)]; keep {
			return true
		}

		p.ops.Delete(key)

		return true
	})
}
// RunOrContinue looks up the operation for id, creating it if it does not
// exist yet, and calls its RunOrContinue method with the pool's logger.
func (p *OperationPool[T]) RunOrContinue(id T) {
	op := p.GetOperation(id)

	op.RunOrContinue(p.ql)
}
// GetOperation returns the operation registered for id, creating and
// registering a new one from the pool's settings when none exists.
//
// Creation is atomic with respect to concurrent callers: all goroutines asking
// for the same id receive the same *SerialOperation, even if they race on the
// first lookup.
func (p *OperationPool[T]) GetOperation(id T) *SerialOperation[T] {
	// Fast path: the operation already exists, so no candidate is allocated.
	if existing, ok := p.ops.Load(id); ok {
		return existing.(*SerialOperation[T])
	}

	candidate := &SerialOperation[T]{
		id:          id,
		lastRun:     time.Now(),
		description: p.description,
		timeout:     p.timeout,
		method:      p.method,
		maxJitter:   p.maxJitter,
	}

	// A separate Load followed by Store would let two goroutines that both
	// missed each register their own instance, leaving callers with different
	// operations for one id. LoadOrStore keeps whichever instance was stored
	// first and the losing candidate is simply discarded.
	actual, _ := p.ops.LoadOrStore(id, candidate)

	return actual.(*SerialOperation[T])
}