Files
hatchet/internal/queueutils/operation.go
matt 92843bb277 Feat: Payload Store Repository (#2047)
* feat: add table for storing payloads

* feat: add payload type enum

* feat: gen sqlc

* feat: initial sql impl

* feat: add payload store repo to shared

* feat: add overwrite

* fix: impl

* feat: bulk op

* feat: initial wiring of inputs for task triggers

* feat: wire up dag matches

* feat: create V1TaskWithPayload and use it everywhere

* fix: couple bugs

* fix: clean up types

* fix: overwrite

* fix: rm input from replay

* fix: move payload store to shared repo

* fix: schema

* refactor: repo setup

* refactor: repos

* fix: gen

* chore: lint

* fix: rename

* feat: naming, write dag inputs

* fix: more naming, trigger bug

* fix: dual writes for now

* fix: pass in tx

* feat: initial work on offloader

* feat: improve external offloader

* fix: some refs

* add withExternalHandler

* fix: improve impl of external store

* feat: implement offloading, fix other impls

* feat: add query to update JSON

* fix: implement offloading + updating records in payloads table

* feat: add WAL table

* feat: add queries for polling WAL and evicting

* feat: wire up writes into WAL

* fix: get job working

* refactor: improve types

* fix: infinite loop

* feat: improve offloading logic to run in two separate txes

* refactor: rework how overrides work

* fix: lint

* fix: migration number

* fix: migration

* fix: migration version

* fix: revert back to reading payloads out

* fix: fall back to previous input, part i

* fix: input fallback

* fix: add back input to replay

* fix: input fallback in dispatcher

* fix: nil check

* feat: advisory locks, part i

* fix: no skip locked

* feat: hash partitioned wal table

* fix: modify queries a bit, tweak crud enum

* fix: pk order, function to find tenants

* feat: wal processing

* fix: only write wal if an external store is enabled, fix offloading logic

* fix: spacing

* feat: schema cleanup

* fix: rm external store loc name

* fix: set content to null when offloading

* fix: cleanup, naming

* fix: pass overwrite payload store along

* debug: add some logging

* Revert "debug: add some logging"

This reverts commit 43e71eadf1.

* fix: typo

* fix: add offloadAt to store opts for offloading

* fix: handle leasing with advisory lock

* fix: struct def

* fix: requeue on payloads not found

* fix: rm hack for triggers

* fix: revert empty input on write

* fix: write input

* feat: env var for enabling / disabling dual writes

* feat: wire up dual writes

* fix: comments

* feat: generics!

* fix: panic from type cast

* fix: migration

* fix: generic

* fix: hack for T key in map

* fix: cleanup
2025-09-12 09:53:01 -04:00

131 lines
2.7 KiB
Go

package queueutils

import (
	"context"
	"math/rand"
	"strconv"
	"sync"
	"time"

	"github.com/rs/zerolog"
)

// ID constrains the identifier types an operation can be keyed by.
type ID interface {
	string | int64
}

// OpMethod is the unit of work executed by a SerialOperation. The boolean
// return value reports whether the operation should run again immediately.
type OpMethod[T ID] func(ctx context.Context, id T) (bool, error)
// SerialOperation represents a method that can only run serially.
// It can be configured with a maxJitter duration to add a random delay
// before executing, which helps prevent the "thundering herd" problem
// when many operations might start at the same time. The jitter is disabled
// by default (maxJitter=0) and can be enabled via OperationPool.WithJitter().
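//
// A minimal in-package usage sketch (illustrative values only; these
// operations are typically created and driven by this package's
// OperationPool rather than by hand):
//
//	op := &SerialOperation[string]{
//		id:          "tenant-1",
//		description: "example operation",
//		timeout:     5 * time.Second,
//		method: func(ctx context.Context, id string) (bool, error) {
//			// one unit of work; return true to run again immediately
//			return false, nil
//		},
//	}
//	op.RunOrContinue(logger) // logger is a *zerolog.Logger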
type SerialOperation[T ID] struct {
	mu             sync.RWMutex
	shouldContinue bool
	isRunning      bool
	id             T
	lastRun        time.Time
	description    string
	timeout        time.Duration
	method         OpMethod[T]
	maxJitter      time.Duration
}
// RunOrContinue requests another pass and then runs the operation. If the
// operation is already running, the continue flag guarantees one more pass
// after the current one finishes.
func (o *SerialOperation[T]) RunOrContinue(ql *zerolog.Logger) {
	o.setContinue(true)
	o.Run(ql)
}
// Run starts the operation in a new goroutine if it is not already running.
// The method executes at least once and repeats while the continue flag is
// set, either by RunOrContinue or by the method itself returning true.
func (o *SerialOperation[T]) Run(ql *zerolog.Logger) {
	if !o.setRunning(true, ql) {
		return
	}

	maxJitter := o.maxJitter

	go func() {
		defer func() {
			o.setRunning(false, ql)
		}()

		f := func() {
			o.setContinue(false)

			// Apply jitter if configured
			if maxJitter > 0 {
				jitter := time.Duration(rand.Int63n(int64(maxJitter))) // nolint:gosec
				time.Sleep(jitter)
			}

			ctx, cancel := context.WithTimeout(context.Background(), o.timeout)
			defer cancel()

			shouldContinue, err := o.method(ctx, o.id)

			if err != nil {
				ql.Err(err).Msgf("could not %s", o.description)
				return
			}

			// if a continue was requested while the method was executing, we'd like to continue no matter what.
			// if a continue was not requested, we'd like to set it to the value returned by the method.
			if !o.getContinue() {
				o.setContinue(shouldContinue)
			}
		}

		f()

		for o.getContinue() {
			f()
		}
	}()
}
// setRunning sets the running state of the operation and returns true if the state was changed,
// false if the state was not changed.
func (o *SerialOperation[T]) setRunning(isRunning bool, ql *zerolog.Logger) bool {
	o.mu.Lock()
	defer o.mu.Unlock()

	if isRunning == o.isRunning {
		return false
	}

	if isRunning {
		// format the generic id so it can be logged as a string
		var idStr string

		switch id := any(o.id).(type) {
		case string:
			idStr = id
		case int64:
			idStr = strconv.FormatInt(id, 10)
		default:
			panic("unsupported ID type")
		}

		// record that a run is starting and how long it has been since the last one
		ql.Info().Str("tenant_id", idStr).TimeDiff("last_run", time.Now(), o.lastRun).Msg(o.description)

		o.lastRun = time.Now()
	}

	o.isRunning = isRunning

	return true
}
// setContinue records whether the operation should run again after the
// current pass completes.
func (o *SerialOperation[T]) setContinue(shouldContinue bool) {
	o.mu.Lock()
	defer o.mu.Unlock()

	o.shouldContinue = shouldContinue
}

// getContinue reports whether another pass has been requested.
func (o *SerialOperation[T]) getContinue() bool {
	o.mu.RLock()
	defer o.mu.RUnlock()

	return o.shouldContinue
}
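
For context, a minimal driver sketch (not part of the file above; names and intervals are illustrative, and it is assumed to live in package queueutils since the SerialOperation fields are unexported). It shows how a caller such as an OperationPool might drive a SerialOperation from a ticker: ticks that arrive while a pass is in flight only set the continue flag, so the method never runs concurrently with itself.

// driveExample is an illustrative, in-package driver for a SerialOperation.
func driveExample(l *zerolog.Logger) {
	op := &SerialOperation[int64]{
		id:          42,
		description: "flush example buffer",
		timeout:     5 * time.Second,
		maxJitter:   250 * time.Millisecond,
		method: func(ctx context.Context, id int64) (bool, error) {
			// one pass of work; returning true requests an immediate re-run
			return false, nil
		},
	}

	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()

	for range ticker.C {
		// safe to call on every tick: overlapping calls coalesce into a
		// single extra pass via the continue flag
		op.RunOrContinue(l)
	}
}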