Files
hatchet/internal/queueutils/operation_test.go
matt 92843bb277 Feat: Payload Store Repository (#2047)
* feat: add table for storing payloads

* feat: add payload type enum

* feat: gen sqlc

* feat: initial sql impl

* feat: add payload store repo to shared

* feat: add overwrite

* fix: impl

* feat: bulk op

* feat: initial wiring of inputs for task triggers

* feat: wire up dag matches

* feat: create V1TaskWithPayload and use it everywhere

* fix: couple bugs

* fix: clean up types

* fix: overwrite

* fix: rm input from replay

* fix: move payload store to shared repo

* fix: schema

* refactor: repo setup

* refactor: repos

* fix: gen

* chore: lint

* fix: rename

* feat: naming, write dag inputs

* fix: more naming, trigger bug

* fix: dual writes for now

* fix: pass in tx

* feat: initial work on offloader

* feat: improve external offloader

* fix: some refs

* add withExternalHandler

* fix: improve impl of external store

* feat: implement offloading, fix other impls

* feat: add query to update JSON

* fix: implement offloading + updating records in payloads table

* feat: add WAL table

* feat: add queries for polling WAL and evicting

* feat: wire up writes into WAL

* fix: get job working

* refactor: improve types

* fix: infinite loop

* feat: improve offloading logic to run in two separate txes

* refactor: rework how overrides work

* fix: lint

* fix: migration number

* fix: migration

* fix: migration version

* fix: revert back to reading payloads out

* fix: fall back to previous input, part i

* fix: input fallback

* fix: add back input to replay

* fix: input fallback in dispatcher

* fix: nil check

* feat: advisory locks, part i

* fix: no skip locked

* feat: hash partitioned wal table

* fix: modify queries a bit, tweak crud enum

* fix: pk order, function to find tenants

* feat: wal processing

* fix: only write wal if an external store is enabled, fix offloading logic

* fix: spacing

* feat: schema cleanup

* fix: rm external store loc name

* fix: set content to null when offloading

* fix: cleanup, naming

* fix: pass overwrite payload store along

* debug: add some logging

* Revert "debug: add some logging"

This reverts commit 43e71eadf1.

* fix: typo

* fix: add offloadAt to store opts for offloading

* fix: handle leasing with advisory lock

* fix: struct def

* fix: requeue on payloads not found

* fix: rm hack for triggers

* fix: revert empty input on write

* fix: write input

* feat: env var for enabling / disabling dual writes

* feat: wire up dual writes

* fix: comments

* feat: generics!

* fix: panic from type cast

* fix: migration

* fix: generic

* fix: hack for T key in map

* fix: cleanup
2025-09-12 09:53:01 -04:00

119 lines
2.6 KiB
Go

//go:build !e2e && !load && !rampup && !integration
package queueutils
import (
"context"
"sync"
"testing"
"time"
"github.com/rs/zerolog"
"github.com/stretchr/testify/assert"
)
// l is a no-op zerolog logger shared by the tests in this file; each test
// passes its address to RunOrContinue.
var l = zerolog.Nop()
// TestSerialOperation_RunOrContinue_NoConcurrentExecution checks that
// RunOrContinue calls issued while a run is still in flight never cause the
// operation's method to execute concurrently. The four extra calls made during
// the first run are expected to result in exactly one follow-up run, for a
// total of two invocations.
func TestSerialOperation_RunOrContinue_NoConcurrentExecution(t *testing.T) {
	var runCount int
	var mu sync.Mutex

	mockMethod := func(ctx context.Context, id string) (bool, error) {
		// Hold the mutex for the entire simulated execution. If a second
		// invocation starts at any point while this one is still running,
		// its TryLock fails and the overlap is reported, rather than only
		// being detectable during the brief counter increment.
		if !mu.TryLock() {
			panic("Concurrent execution detected")
		}
		defer mu.Unlock()

		runCount++
		time.Sleep(100 * time.Millisecond) // Simulate method execution time

		return false, nil
	}

	operation := &SerialOperation[string]{
		id:          "1234",
		description: "Test operation",
		timeout:     2 * time.Second,
		method:      mockMethod,
	}

	// First run
	operation.RunOrContinue(&l)

	// Try to trigger a set of runs concurrently, it should not start until the first finishes
	time.Sleep(10 * time.Millisecond)
	operation.RunOrContinue(&l)
	operation.RunOrContinue(&l)
	operation.RunOrContinue(&l)
	operation.RunOrContinue(&l)

	// Wait for both to finish
	time.Sleep(250 * time.Millisecond)

	// Because the mock holds mu while it runs, acquiring it here also waits
	// for any invocation that is still in progress before reading the count.
	mu.Lock()
	assert.Equal(t, 2, runCount, "The method should not run concurrently")
	mu.Unlock()
}
// TestSerialOperation_RunOrContinue_ShouldRunAfterCompletion triggers the
// operation twice, pausing after each trigger for longer than the mock's
// simulated work, and expects the method to have been invoked once per trigger.
func TestSerialOperation_RunOrContinue_ShouldRunAfterCompletion(t *testing.T) {
	var (
		guard sync.Mutex
		calls int
	)

	countingMethod := func(ctx context.Context, id string) (bool, error) {
		// Record the invocation under the lock, then release it before the
		// simulated work begins.
		func() {
			guard.Lock()
			defer guard.Unlock()
			calls++
		}()

		time.Sleep(50 * time.Millisecond) // Simulate method execution time

		return false, nil
	}

	op := &SerialOperation[string]{
		id:          "1234",
		description: "Test operation",
		timeout:     2 * time.Second,
		method:      countingMethod,
	}

	// Two rounds: trigger a run, then pause well past the 50ms of simulated
	// work before the next trigger (or the final check).
	const settle = 110 * time.Millisecond
	for round := 0; round < 2; round++ {
		op.RunOrContinue(&l)
		time.Sleep(settle)
	}

	guard.Lock()
	defer guard.Unlock()
	assert.Equal(t, 2, calls, "The method should run twice after completion")
}
// TestSerialOperation_RunOrContinue_ShouldRunOnContinues checks that a single
// RunOrContinue call keeps re-invoking the method for as long as the method
// returns true. The mock returns true for its first four invocations and false
// on the fifth, so five invocations are expected in total.
func TestSerialOperation_RunOrContinue_ShouldRunOnContinues(t *testing.T) {
	var runCount int
	var mu sync.Mutex

	mockMethod := func(ctx context.Context, id string) (bool, error) {
		// Snapshot the counter while holding the lock so the continue
		// decision below never reads runCount outside the mutex that guards
		// every other access to it.
		mu.Lock()
		runCount++
		current := runCount
		mu.Unlock()

		time.Sleep(25 * time.Millisecond) // Simulate method execution time

		return current < 5, nil
	}

	operation := &SerialOperation[string]{
		id:          "1234",
		description: "Test operation",
		timeout:     2 * time.Second,
		method:      mockMethod,
	}

	// First run
	operation.RunOrContinue(&l)

	// Five invocations at 25ms each need roughly 125ms; wait 200ms before
	// inspecting the count.
	time.Sleep(200 * time.Millisecond)

	mu.Lock()
	assert.Equal(t, 5, runCount, "The method should run five times on continue")
	mu.Unlock()
}