mirror of
https://github.com/hatchet-dev/hatchet.git
synced 2026-02-21 07:49:48 -06:00
* initial pass on stateful intervals * pr review comments + add evict expired idempotency keys * fix: goroutine leak and name vars better * fix some cleanup logic
275 lines
7.3 KiB
Go
275 lines
7.3 KiB
Go
package operation
|
|
|
|
import (
|
|
"context"
|
|
"sync"
|
|
"testing"
|
|
"time"
|
|
|
|
v1 "github.com/hatchet-dev/hatchet/pkg/repository/v1"
|
|
"github.com/stretchr/testify/assert"
|
|
)
|
|
|
|
func TestInterval_RunInterval_BasicTiming(t *testing.T) {
|
|
interval := &Interval{
|
|
maxJitter: 0,
|
|
startInterval: 50 * time.Millisecond,
|
|
currInterval: 50 * time.Millisecond,
|
|
maxInterval: 1 * time.Second,
|
|
noActivityCount: 0,
|
|
incBackoffCount: 3,
|
|
repo: v1.NewNoOpIntervalSettingsRepository(),
|
|
}
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
|
|
defer cancel()
|
|
|
|
ch := interval.RunInterval(ctx)
|
|
|
|
start := time.Now()
|
|
triggerCount := 0
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
elapsed := time.Since(start)
|
|
assert.GreaterOrEqual(t, triggerCount, 3, "Should have triggered at least 3 times")
|
|
assert.LessOrEqual(t, elapsed, 220*time.Millisecond, "Should complete within timeout plus buffer")
|
|
return
|
|
case <-ch:
|
|
triggerCount++
|
|
}
|
|
}
|
|
}
|
|
|
|
func TestInterval_RunInterval_WithJitter(t *testing.T) {
|
|
interval := &Interval{
|
|
maxJitter: 20 * time.Millisecond,
|
|
startInterval: 50 * time.Millisecond,
|
|
currInterval: 50 * time.Millisecond,
|
|
maxInterval: 1 * time.Second,
|
|
noActivityCount: 0,
|
|
incBackoffCount: 3,
|
|
repo: v1.NewNoOpIntervalSettingsRepository(),
|
|
}
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), 300*time.Millisecond)
|
|
defer cancel()
|
|
|
|
ch := interval.RunInterval(ctx)
|
|
|
|
var timings []time.Duration
|
|
lastTrigger := time.Now()
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
assert.GreaterOrEqual(t, len(timings), 2, "Should have at least 2 timing measurements")
|
|
for _, timing := range timings {
|
|
assert.GreaterOrEqual(t, timing, 50*time.Millisecond, "Timing should be at least the base interval")
|
|
assert.LessOrEqual(t, timing, 75*time.Millisecond, "Timing should include jitter but not exceed base + max jitter + buffer")
|
|
}
|
|
return
|
|
case <-ch:
|
|
now := time.Now()
|
|
if len(timings) > 0 || !lastTrigger.IsZero() {
|
|
timings = append(timings, now.Sub(lastTrigger))
|
|
}
|
|
lastTrigger = now
|
|
}
|
|
}
|
|
}
|
|
|
|
func TestInterval_RunInterval_ContextCancellation(t *testing.T) {
|
|
interval := &Interval{
|
|
maxJitter: 0,
|
|
startInterval: 100 * time.Millisecond,
|
|
currInterval: 100 * time.Millisecond,
|
|
maxInterval: 1 * time.Second,
|
|
noActivityCount: 0,
|
|
incBackoffCount: 3,
|
|
repo: v1.NewNoOpIntervalSettingsRepository(),
|
|
}
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
ch := interval.RunInterval(ctx)
|
|
|
|
time.Sleep(10 * time.Millisecond)
|
|
cancel()
|
|
|
|
select {
|
|
case <-ch:
|
|
t.Fatal("Should not receive trigger after context cancellation")
|
|
case <-time.After(50 * time.Millisecond):
|
|
}
|
|
}
|
|
|
|
func TestInterval_SetIntervalGauge_ResetOnRowsModified(t *testing.T) {
|
|
interval := &Interval{
|
|
maxJitter: 0,
|
|
startInterval: 50 * time.Millisecond,
|
|
currInterval: 200 * time.Millisecond,
|
|
maxInterval: 1 * time.Second,
|
|
noActivityCount: 5,
|
|
incBackoffCount: 3,
|
|
repo: v1.NewNoOpIntervalSettingsRepository(),
|
|
}
|
|
|
|
interval.SetIntervalGauge(1)
|
|
|
|
assert.Equal(t, 50*time.Millisecond, interval.currInterval, "Should reset to start interval")
|
|
assert.Equal(t, 0, interval.noActivityCount, "Should reset no rows count")
|
|
}
|
|
|
|
func TestInterval_SetIntervalGauge_BackoffMechanism(t *testing.T) {
|
|
interval := &Interval{
|
|
maxJitter: 0,
|
|
startInterval: 50 * time.Millisecond,
|
|
currInterval: 50 * time.Millisecond,
|
|
maxInterval: 1 * time.Second,
|
|
noActivityCount: 0,
|
|
incBackoffCount: 3,
|
|
repo: v1.NewNoOpIntervalSettingsRepository(),
|
|
}
|
|
|
|
interval.SetIntervalGauge(0)
|
|
assert.Equal(t, 1, interval.noActivityCount)
|
|
assert.Equal(t, 50*time.Millisecond, interval.currInterval)
|
|
|
|
interval.SetIntervalGauge(0)
|
|
assert.Equal(t, 2, interval.noActivityCount)
|
|
assert.Equal(t, 50*time.Millisecond, interval.currInterval)
|
|
|
|
interval.SetIntervalGauge(0)
|
|
assert.Equal(t, 0, interval.noActivityCount, "Should reset count after backoff")
|
|
assert.Equal(t, 100*time.Millisecond, interval.currInterval, "Should double the interval")
|
|
|
|
interval.SetIntervalGauge(0)
|
|
interval.SetIntervalGauge(0)
|
|
interval.SetIntervalGauge(0)
|
|
assert.Equal(t, 200*time.Millisecond, interval.currInterval, "Should double again after 3 more zero-row updates")
|
|
}
|
|
|
|
func TestInterval_SetIntervalGauge_ConcurrentAccess(t *testing.T) {
|
|
interval := &Interval{
|
|
maxJitter: 0,
|
|
startInterval: 50 * time.Millisecond,
|
|
currInterval: 50 * time.Millisecond,
|
|
maxInterval: 1 * time.Second,
|
|
noActivityCount: 0,
|
|
incBackoffCount: 3,
|
|
repo: v1.NewNoOpIntervalSettingsRepository(),
|
|
}
|
|
|
|
var wg sync.WaitGroup
|
|
numGoroutines := 10
|
|
numUpdatesPerGoroutine := 50
|
|
|
|
wg.Add(numGoroutines)
|
|
|
|
for i := 0; i < numGoroutines; i++ {
|
|
go func(goroutineID int) {
|
|
defer wg.Done()
|
|
for j := 0; j < numUpdatesPerGoroutine; j++ {
|
|
rowsModified := j % 4
|
|
interval.SetIntervalGauge(rowsModified)
|
|
}
|
|
}(i)
|
|
}
|
|
|
|
wg.Wait()
|
|
|
|
assert.GreaterOrEqual(t, interval.currInterval, 50*time.Millisecond, "Interval should be at least the start interval")
|
|
assert.GreaterOrEqual(t, interval.noActivityCount, 0, "No rows count should be non-negative")
|
|
assert.LessOrEqual(t, interval.noActivityCount, interval.incBackoffCount-1, "No rows count should not exceed backoff count")
|
|
}
|
|
|
|
func TestInterval_GetNextTrigger_ReturnsChannel(t *testing.T) {
|
|
interval := &Interval{
|
|
maxJitter: 10 * time.Millisecond,
|
|
startInterval: 50 * time.Millisecond,
|
|
currInterval: 50 * time.Millisecond,
|
|
maxInterval: 1 * time.Second,
|
|
noActivityCount: 0,
|
|
incBackoffCount: 3,
|
|
repo: v1.NewNoOpIntervalSettingsRepository(),
|
|
}
|
|
|
|
triggerCh := interval.getNextTrigger()
|
|
assert.NotNil(t, triggerCh, "Should return a non-nil channel")
|
|
|
|
select {
|
|
case <-triggerCh:
|
|
case <-time.After(70 * time.Millisecond):
|
|
t.Fatal("Trigger should have fired within expected time")
|
|
}
|
|
}
|
|
|
|
func TestInterval_GetNextTrigger_ConcurrentAccess(t *testing.T) {
|
|
interval := &Interval{
|
|
maxJitter: 5 * time.Millisecond,
|
|
startInterval: 20 * time.Millisecond,
|
|
currInterval: 20 * time.Millisecond,
|
|
maxInterval: 1 * time.Second,
|
|
noActivityCount: 0,
|
|
incBackoffCount: 3,
|
|
repo: v1.NewNoOpIntervalSettingsRepository(),
|
|
}
|
|
|
|
var wg sync.WaitGroup
|
|
numGoroutines := 5
|
|
|
|
wg.Add(numGoroutines)
|
|
|
|
for i := 0; i < numGoroutines; i++ {
|
|
go func() {
|
|
defer wg.Done()
|
|
for j := 0; j < 10; j++ {
|
|
triggerCh := interval.getNextTrigger()
|
|
assert.NotNil(t, triggerCh, "Should always return a non-nil channel")
|
|
|
|
select {
|
|
case <-triggerCh:
|
|
case <-time.After(50 * time.Millisecond):
|
|
}
|
|
}
|
|
}()
|
|
}
|
|
|
|
wg.Wait()
|
|
}
|
|
|
|
func TestInterval_RunInterval_Integration(t *testing.T) {
|
|
interval := &Interval{
|
|
maxJitter: 10 * time.Millisecond,
|
|
startInterval: 50 * time.Millisecond,
|
|
currInterval: 50 * time.Millisecond,
|
|
maxInterval: 1 * time.Second,
|
|
noActivityCount: 0,
|
|
incBackoffCount: 2,
|
|
repo: v1.NewNoOpIntervalSettingsRepository(),
|
|
}
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
|
|
defer cancel()
|
|
|
|
ch := interval.RunInterval(ctx)
|
|
|
|
triggerCount := 0
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
assert.GreaterOrEqual(t, triggerCount, 3, "Should have triggered multiple times")
|
|
return
|
|
case <-ch:
|
|
triggerCount++
|
|
|
|
if triggerCount <= 2 {
|
|
interval.SetIntervalGauge(0)
|
|
} else {
|
|
interval.SetIntervalGauge(1)
|
|
}
|
|
}
|
|
}
|
|
}
|