mirror of
https://github.com/hatchet-dev/hatchet.git
synced 2025-12-18 07:15:00 -06:00
* feat: add table for storing payloads
* feat: add payload type enum
* feat: gen sqlc
* feat: initial sql impl
* feat: add payload store repo to shared
* feat: add overwrite
* fix: impl
* feat: bulk op
* feat: initial wiring of inputs for task triggers
* feat: wire up dag matches
* feat: create V1TaskWithPayload and use it everywhere
* fix: couple bugs
* fix: clean up types
* fix: overwrite
* fix: rm input from replay
* fix: move payload store to shared repo
* fix: schema
* refactor: repo setup
* refactor: repos
* fix: gen
* chore: lint
* fix: rename
* feat: naming, write dag inputs
* fix: more naming, trigger bug
* fix: dual writes for now
* fix: pass in tx
* feat: initial work on offloader
* feat: improve external offloader
* fix: some refs
* add withExternalHandler
* fix: improve impl of external store
* feat: implement offloading, fix other impls
* feat: add query to update JSON
* fix: implement offloading + updating records in payloads table
* feat: add WAL table
* feat: add queries for polling WAL and evicting
* feat: wire up writes into WAL
* fix: get job working
* refactor: improve types
* fix: infinite loop
* feat: improve offloading logic to run in two separate txes
* refactor: rework how overrides work
* fix: lint
* fix: migration number
* fix: migration
* fix: migration version
* fix: revert back to reading payloads out
* fix: fall back to previous input, part i
* fix: input fallback
* fix: add back input to replay
* fix: input fallback in dispatcher
* fix: nil check
* feat: advisory locks, part i
* fix: no skip locked
* feat: hash partitioned wal table
* fix: modify queries a bit, tweak crud enum
* fix: pk order, function to find tenants
* feat: wal processing
* fix: only write wal if an external store is enabled, fix offloading logic
* fix: spacing
* feat: schema cleanup
* fix: rm external store loc name
* fix: set content to null when offloading
* fix: cleanup, naming
* fix: pass overwrite payload store along
* debug: add some logging
* Revert "debug: add some logging"
This reverts commit 43e71eadf1.
* fix: typo
* fix: add offloadAt to store opts for offloading
* fix: handle leasing with advisory lock
* fix: struct def
* fix: requeue on payloads not found
* fix: rm hack for triggers
* fix: revert empty input on write
* fix: write input
* feat: env var for enabling / disabling dual writes
* feat: wire up dual writes
* fix: comments
* feat: generics!
* fix: panic from type cast
* fix: migration
* fix: generic
* fix: hack for T key in map
* fix: cleanup
131 lines
2.7 KiB
Go
131 lines
2.7 KiB
Go
package queueutils
|
|
|
|
import (
|
|
"context"
|
|
"math/rand"
|
|
"strconv"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/rs/zerolog"
|
|
)
|
|
|
|
// ID is the type constraint for identifiers an operation can be keyed by:
// either a string or an int64.
type ID interface {
	int64 | string
}
|
|
|
|
// OpMethod is the callback executed by a SerialOperation for a given id.
// The boolean result asks for the operation to be executed again right away;
// a non-nil error is logged by the caller and the boolean is then ignored.
type OpMethod[T ID] func(ctx context.Context, id T) (shouldContinue bool, err error)
|
|
|
|
// SerialOperation represents a method that can only run serially.
|
|
// It can be configured with a maxJitter duration to add a random delay
|
|
// before executing, which helps prevent the "thundering herd" problem
|
|
// when many operations might start at the same time. The jitter is disabled
|
|
// by default (maxJitter=0) and can be enabled via OperationPool.WithJitter().
|
|
type SerialOperation[T ID] struct {
|
|
mu sync.RWMutex
|
|
shouldContinue bool
|
|
isRunning bool
|
|
id T
|
|
lastRun time.Time
|
|
description string
|
|
timeout time.Duration
|
|
method OpMethod[T]
|
|
maxJitter time.Duration
|
|
}
|
|
|
|
func (o *SerialOperation[T]) RunOrContinue(ql *zerolog.Logger) {
|
|
o.setContinue(true)
|
|
o.Run(ql)
|
|
}
|
|
|
|
func (o *SerialOperation[T]) Run(ql *zerolog.Logger) {
|
|
if !o.setRunning(true, ql) {
|
|
return
|
|
}
|
|
|
|
maxJitter := o.maxJitter
|
|
|
|
go func() {
|
|
defer func() {
|
|
o.setRunning(false, ql)
|
|
}()
|
|
|
|
f := func() {
|
|
o.setContinue(false)
|
|
|
|
// Apply jitter if configured
|
|
if maxJitter > 0 {
|
|
jitter := time.Duration(rand.Int63n(int64(maxJitter))) // nolint:gosec
|
|
time.Sleep(jitter)
|
|
}
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), o.timeout)
|
|
defer cancel()
|
|
|
|
shouldContinue, err := o.method(ctx, o.id)
|
|
|
|
if err != nil {
|
|
ql.Err(err).Msgf("could not %s", o.description)
|
|
return
|
|
}
|
|
|
|
// if a continue was set during execution of the scheduler, we'd like to continue no matter what.
|
|
// if a continue was not set, we'd like to set it to the value returned by the scheduler.
|
|
if !o.getContinue() {
|
|
o.setContinue(shouldContinue)
|
|
}
|
|
}
|
|
|
|
f()
|
|
|
|
for o.getContinue() {
|
|
f()
|
|
}
|
|
}()
|
|
}
|
|
|
|
// setRunning sets the running state of the operation and returns true if the state was changed,
|
|
// false if the state was not changed.
|
|
func (o *SerialOperation[T]) setRunning(isRunning bool, ql *zerolog.Logger) bool {
|
|
o.mu.Lock()
|
|
defer o.mu.Unlock()
|
|
|
|
if isRunning == o.isRunning {
|
|
return false
|
|
}
|
|
|
|
if isRunning {
|
|
var idStr string
|
|
switch id := any(o.id).(type) {
|
|
case string:
|
|
idStr = id
|
|
case int64:
|
|
idStr = strconv.FormatInt(id, 10)
|
|
default:
|
|
panic("unsupported ID type")
|
|
}
|
|
|
|
ql.Info().Str("tenant_id", idStr).TimeDiff("last_run", time.Now(), o.lastRun).Msg(o.description)
|
|
|
|
o.lastRun = time.Now()
|
|
}
|
|
|
|
o.isRunning = isRunning
|
|
|
|
return true
|
|
}
|
|
|
|
func (o *SerialOperation[T]) setContinue(shouldContinue bool) {
|
|
o.mu.Lock()
|
|
defer o.mu.Unlock()
|
|
|
|
o.shouldContinue = shouldContinue
|
|
}
|
|
|
|
func (o *SerialOperation[T]) getContinue() bool {
|
|
o.mu.RLock()
|
|
defer o.mu.RUnlock()
|
|
|
|
return o.shouldContinue
|
|
}
|