feat: more features in the load testing harness (#1691)

* fix: make stripped payload size configurable

* feat: more load test features

* Update cmd/hatchet-loadtest/do.go

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* fix: try to fix load tests

* increase timeout, update goleak ignores

* fix: data race in scheduler with snapshot input

* fix: logger improvements

* add one more goleak ignore

---------

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
abelanger5 authored 2025-05-07 21:39:30 -04:00, committed by GitHub
parent 858f4d9b82
commit 5c5c1aa5a1
16 changed files with 340 additions and 140 deletions
+2 -2
@@ -320,7 +320,7 @@ jobs:
- name: Test
run: |
go test -tags load ./... -p 5 -v -race -failfast
go test -tags load ./... -p 5 -v -race -failfast -timeout 20m
env:
TESTING_MATRIX_MIGRATE: ${{ matrix.migrate-strategy }}
TESTING_MATRIX_RABBITMQ_ENABLED: ${{ matrix.rabbitmq-enabled }}
@@ -356,7 +356,7 @@ jobs:
- name: Test
run: |
go test -tags rampup ./... -p 5 -v -race -failfast
go test -tags rampup ./... -p 5 -v -race -failfast -timeout 20m
env:
TESTING_MATRIX_MIGRATE: ${{ matrix.migrate-strategy }}
TESTING_MATRIX_RABBITMQ_ENABLED: ${{ matrix.rabbitmq-enabled }}
+25 -16
@@ -12,8 +12,8 @@ type avgResult struct {
avg time.Duration
}
func do(namespace string, duration time.Duration, eventsPerSecond int, delay time.Duration, wait time.Duration, concurrency int, workerDelay time.Duration, slots int, failureRate float32, payloadSize string, eventFanout int) error {
l.Info().Msgf("testing with duration=%s, eventsPerSecond=%d, delay=%s, wait=%s, concurrency=%d", duration, eventsPerSecond, delay, wait, concurrency)
func do(config LoadTestConfig) error {
l.Info().Msgf("testing with duration=%s, eventsPerSecond=%d, delay=%s, wait=%s, concurrency=%d", config.Duration, config.Events, config.Delay, config.Wait, config.Concurrency)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
@@ -21,12 +21,12 @@ func do(namespace string, duration time.Duration, eventsPerSecond int, delay tim
after := 10 * time.Second
go func() {
time.Sleep(duration + after + wait + 5*time.Second)
time.Sleep(config.Duration + after + config.Wait + 5*time.Second)
cancel()
}()
ch := make(chan int64, 2)
durations := make(chan time.Duration, eventsPerSecond)
durations := make(chan time.Duration, config.Events)
// Compute running average for executed durations using a rolling average.
durationsResult := make(chan avgResult)
@@ -44,25 +44,34 @@ func do(namespace string, duration time.Duration, eventsPerSecond int, delay tim
durationsResult <- avgResult{count: count, avg: avg}
}()
// Start worker and ensure it has time to register
workerStarted := make(chan struct{})
go func() {
if workerDelay > 0 {
if config.WorkerDelay > 0 {
// run a worker to register the workflow
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
run(ctx, namespace, delay, durations, concurrency, slots, failureRate, eventFanout)
run(ctx, config, durations)
cancel()
l.Info().Msgf("wait %s before starting the worker", workerDelay)
time.Sleep(workerDelay)
l.Info().Msgf("wait %s before starting the worker", config.WorkerDelay)
time.Sleep(config.WorkerDelay)
}
l.Info().Msg("starting worker now")
count, uniques := run(ctx, namespace, delay, durations, concurrency, slots, failureRate, eventFanout)
// Signal that worker is starting
close(workerStarted)
count, uniques := run(ctx, config, durations)
close(durations)
ch <- count
ch <- uniques
}()
// Wait for worker to start, then give it time to register workflows
<-workerStarted
time.Sleep(after)
scheduled := make(chan time.Duration, eventsPerSecond)
scheduled := make(chan time.Duration, config.Events)
// Compute running average for scheduled times using a rolling average.
scheduledResult := make(chan avgResult)
@@ -80,7 +89,7 @@ func do(namespace string, duration time.Duration, eventsPerSecond int, delay tim
scheduledResult <- avgResult{count: count, avg: avg}
}()
emitted := emit(ctx, namespace, eventsPerSecond, duration, scheduled, payloadSize)
emitted := emit(ctx, config.Namespace, config.Events, config.Duration, scheduled, config.PayloadSize)
close(scheduled)
executed := <-ch
@@ -89,7 +98,7 @@ func do(namespace string, duration time.Duration, eventsPerSecond int, delay tim
finalDurationResult := <-durationsResult
finalScheduledResult := <-scheduledResult
log.Printf("emitted %d, executed %d, uniques %d, using %d events/s", emitted, executed, uniques, eventsPerSecond)
log.Printf("emitted %d, executed %d, uniques %d, using %d events/s", emitted, executed, uniques, config.Events)
if executed == 0 {
return fmt.Errorf("❌ no events executed")
@@ -98,12 +107,12 @@ func do(namespace string, duration time.Duration, eventsPerSecond int, delay tim
log.Printf("final average duration per executed event: %s", finalDurationResult.avg)
log.Printf("final average scheduling time per event: %s", finalScheduledResult.avg)
if int64(eventFanout)*emitted != executed {
log.Printf("⚠️ warning: emitted and executed counts do not match: %d != %d", int64(eventFanout)*emitted, executed)
if int64(config.EventFanout)*emitted*int64(config.DagSteps) != executed {
log.Printf("⚠️ warning: emitted and executed counts do not match: %d != %d", int64(config.EventFanout)*emitted*int64(config.DagSteps), executed)
}
if int64(eventFanout)*emitted != uniques {
return fmt.Errorf("❌ emitted and unique executed counts do not match: %d != %d", int64(eventFanout)*emitted, uniques)
if int64(config.EventFanout)*emitted*int64(config.DagSteps) != uniques {
return fmt.Errorf("❌ emitted and unique executed counts do not match: %d != %d", int64(config.EventFanout)*emitted, uniques)
}
log.Printf("✅ success")
+7 -3
@@ -10,6 +10,7 @@ import (
"time"
"github.com/hatchet-dev/hatchet/pkg/client"
v1 "github.com/hatchet-dev/hatchet/pkg/v1"
)
type Event struct {
@@ -40,8 +41,11 @@ func parseSize(s string) int {
}
func emit(ctx context.Context, namespace string, amountPerSecond int, duration time.Duration, scheduled chan<- time.Duration, payloadArg string) int64 {
c, err := client.New(
client.WithNamespace(namespace),
c, err := v1.NewHatchetClient(
v1.Config{
Namespace: namespace,
Logger: &l,
},
)
if err != nil {
@@ -67,7 +71,7 @@ func emit(ctx context.Context, namespace string, amountPerSecond int, duration t
for ev := range jobCh {
l.Info().Msgf("pushing event %d", ev.ID)
err := c.Event().Push(context.Background(), "load-test:event", ev, client.WithEventMetadata(map[string]string{
err := c.Events().Push(context.Background(), "load-test:event", ev, client.WithEventMetadata(map[string]string{
"event_id": fmt.Sprintf("%d", ev.ID),
}))
if err != nil {
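
The emit path now constructs the v1 client rather than going through client.New. Pieced together from the hunks above (error handling trimmed, surrounding loop elided), the flow is roughly:

c, err := v1.NewHatchetClient(
	v1.Config{
		Namespace: namespace, // per-run namespace supplied by the caller
		Logger:    &l,        // shared zerolog logger
	},
)
if err != nil {
	// handle err (the hunk above truncates the original handling)
}

// push one event, attaching its ID as metadata for later correlation
err = c.Events().Push(context.Background(), "load-test:event", ev,
	client.WithEventMetadata(map[string]string{
		"event_id": fmt.Sprintf("%d", ev.ID),
	}),
)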
+102 -38
@@ -18,14 +18,7 @@ func TestMain(m *testing.M) {
}
func TestLoadCLI(t *testing.T) {
type args struct {
duration time.Duration
eventsPerSecond int
delay time.Duration
wait time.Duration
workerDelay time.Duration
concurrency int
}
// We're using LoadTestConfig directly instead of an args struct
l = logger.NewStdErr(
&shared.LoggerConfigFile{
@@ -37,47 +30,116 @@ func TestLoadCLI(t *testing.T) {
tests := []struct {
name string
args args
config LoadTestConfig
wantErr bool
}{
{
name: "test with high step delay",
args: args{
duration: 240 * time.Second,
eventsPerSecond: 10,
delay: 10 * time.Second,
wait: 60 * time.Second,
concurrency: 0,
name: "test simple workflow",
config: LoadTestConfig{
Duration: 240 * time.Second,
Events: 10,
Delay: 0 * time.Second,
Wait: 60 * time.Second,
Concurrency: 0,
Slots: 100,
FailureRate: 0.0,
PayloadSize: "0kb",
EventFanout: 1,
DagSteps: 1,
RlKeys: 0,
RlLimit: 0,
RlDurationUnit: "",
},
}, {
name: "test simple with unlimited concurrency",
args: args{
duration: 240 * time.Second,
eventsPerSecond: 10,
delay: 0 * time.Second,
wait: 60 * time.Second,
concurrency: 0,
},
{
name: "test with DAG",
config: LoadTestConfig{
Duration: 240 * time.Second,
Events: 10,
Delay: 0 * time.Second,
Wait: 60 * time.Second,
Concurrency: 0,
Slots: 100,
FailureRate: 0.0,
PayloadSize: "0kb",
EventFanout: 1,
DagSteps: 2,
RlKeys: 0,
RlLimit: 0,
RlDurationUnit: "",
},
},
{
name: "test with event fanout",
config: LoadTestConfig{
Duration: 240 * time.Second,
Events: 10,
Delay: 0 * time.Second,
Wait: 60 * time.Second,
Concurrency: 0,
Slots: 100,
FailureRate: 0.0,
PayloadSize: "0kb",
EventFanout: 2,
DagSteps: 1,
RlKeys: 0,
RlLimit: 0,
RlDurationUnit: "",
},
},
{
name: "test with global concurrency key",
args: args{
duration: 240 * time.Second,
eventsPerSecond: 10,
delay: 0 * time.Second,
wait: 60 * time.Second,
concurrency: 10,
config: LoadTestConfig{
Duration: 240 * time.Second,
Events: 10,
Delay: 0 * time.Second,
Wait: 60 * time.Second,
Concurrency: 10,
Slots: 100,
FailureRate: 0.0,
PayloadSize: "0kb",
EventFanout: 1,
DagSteps: 1,
RlKeys: 0,
RlLimit: 0,
RlDurationUnit: "",
},
},
{
name: "test for many queued events and little worker throughput",
args: args{
duration: 240 * time.Second,
eventsPerSecond: 10,
delay: 0 * time.Second,
workerDelay: 120 * time.Second, // will write 1200 events before the worker is ready
wait: 120 * time.Second,
concurrency: 0,
config: LoadTestConfig{
Duration: 240 * time.Second,
Events: 10,
Delay: 0 * time.Second,
WorkerDelay: 120 * time.Second, // will write 1200 events before the worker is ready
Wait: 120 * time.Second,
Concurrency: 0,
Slots: 100,
FailureRate: 0.0,
PayloadSize: "0kb",
EventFanout: 1,
DagSteps: 1,
RlKeys: 0,
RlLimit: 0,
RlDurationUnit: "",
},
},
{
name: "test with rate limits",
config: LoadTestConfig{
Duration: 240 * time.Second,
Events: 10,
Delay: 0 * time.Second,
Wait: 60 * time.Second,
Concurrency: 0,
Slots: 100,
FailureRate: 0.0,
PayloadSize: "0kb",
EventFanout: 1,
DagSteps: 1,
RlKeys: 10,
RlLimit: 100,
RlDurationUnit: "second",
},
},
}
@@ -96,7 +158,9 @@ func TestLoadCLI(t *testing.T) {
t.Fatalf("could not generate random namespace: %s", err)
}
if err := do(namespace, tt.args.duration, tt.args.eventsPerSecond, tt.args.delay, tt.args.wait, tt.args.concurrency, tt.args.workerDelay, 100, 0.0, "0kb", 1); (err != nil) != tt.wantErr {
testConfig := tt.config
testConfig.Namespace = namespace
if err := do(testConfig); (err != nil) != tt.wantErr {
t.Errorf("do() error = %v, wantErr %v", err, tt.wantErr)
}
})
+41 -23
@@ -17,25 +17,34 @@ import (
var l zerolog.Logger
// LoadTestConfig holds all configuration for the load test
type LoadTestConfig struct {
Namespace string
Events int
Concurrency int
Duration time.Duration
Wait time.Duration
Delay time.Duration
WorkerDelay time.Duration
Slots int
FailureRate float32
PayloadSize string
EventFanout int
DagSteps int
RlKeys int
RlLimit int
RlDurationUnit string
}
func main() {
var events int
var concurrency int
var duration time.Duration
var wait time.Duration
var delay time.Duration
var workerDelay time.Duration
var logLevel string
var slots int
var failureRate float32
var payloadSize string
var eventFanout int
config := LoadTestConfig{}
var loadtest = &cobra.Command{
Use: "loadtest",
Run: func(cmd *cobra.Command, args []string) {
l = logger.NewStdErr(
&shared.LoggerConfigFile{
Level: logLevel,
Level: cmd.Flag("level").Value.String(),
Format: "console",
},
"loadtest",
@@ -48,24 +57,30 @@ func main() {
}()
}
if err := do(os.Getenv("HATCHET_CLIENT_NAMESPACE"), duration, events, delay, wait, concurrency, workerDelay, slots, failureRate, payloadSize, eventFanout); err != nil {
config.Namespace = os.Getenv("HATCHET_CLIENT_NAMESPACE")
if err := do(config); err != nil {
log.Println(err)
panic("load test failed")
}
},
}
loadtest.Flags().IntVarP(&events, "events", "e", 10, "events per second")
loadtest.Flags().IntVarP(&concurrency, "concurrency", "c", 0, "concurrency specifies the maximum events to run at the same time")
loadtest.Flags().DurationVarP(&duration, "duration", "d", 10*time.Second, "duration specifies the total time to run the load test")
loadtest.Flags().DurationVarP(&delay, "delay", "D", 0, "delay specifies the time to wait in each event to simulate slow tasks")
loadtest.Flags().DurationVarP(&wait, "wait", "w", 10*time.Second, "wait specifies the total time to wait until events complete")
loadtest.Flags().DurationVarP(&workerDelay, "workerDelay", "p", 0*time.Second, "workerDelay specifies the time to wait before starting the worker")
loadtest.Flags().IntVarP(&config.Events, "events", "e", 10, "events per second")
loadtest.Flags().IntVarP(&config.Concurrency, "concurrency", "c", 0, "concurrency specifies the maximum events to run at the same time")
loadtest.Flags().DurationVarP(&config.Duration, "duration", "d", 10*time.Second, "duration specifies the total time to run the load test")
loadtest.Flags().DurationVarP(&config.Delay, "delay", "D", 0, "delay specifies the time to wait in each event to simulate slow tasks")
loadtest.Flags().DurationVarP(&config.Wait, "wait", "w", 10*time.Second, "wait specifies the total time to wait until events complete")
loadtest.Flags().DurationVarP(&config.WorkerDelay, "workerDelay", "p", 0*time.Second, "workerDelay specifies the time to wait before starting the worker")
loadtest.Flags().IntVarP(&config.Slots, "slots", "s", 0, "slots specifies the number of slots to use in the worker")
loadtest.Flags().Float32VarP(&config.FailureRate, "failureRate", "f", 0, "failureRate specifies the rate of failure for the worker")
loadtest.Flags().StringVarP(&config.PayloadSize, "payloadSize", "P", "0kb", "payload specifies the size of the payload to send")
loadtest.Flags().IntVarP(&config.EventFanout, "eventFanout", "F", 1, "eventFanout specifies the number of events to fanout")
loadtest.Flags().IntVarP(&config.DagSteps, "dagSteps", "g", 1, "dagSteps specifies the number of steps in the DAG")
loadtest.Flags().IntVar(&config.RlKeys, "rlKeys", 0, "rlKeys specifies the number of keys to use in the rate limit")
loadtest.Flags().IntVar(&config.RlLimit, "rlLimit", 0, "rlLimit specifies the rate limit")
loadtest.Flags().StringVar(&config.RlDurationUnit, "rlDurationUnit", "second", "rlDurationUnit specifies the duration unit for the rate limit (second, minute, hour)")
loadtest.Flags().StringVarP(&logLevel, "level", "l", "info", "logLevel specifies the log level (debug, info, warn, error)")
loadtest.Flags().IntVarP(&slots, "slots", "s", 0, "slots specifies the number of slots to use in the worker")
loadtest.Flags().Float32VarP(&failureRate, "failureRate", "f", 0, "failureRate specifies the rate of failure for the worker")
loadtest.Flags().StringVarP(&payloadSize, "payloadSize", "P", "0kb", "payload specifies the size of the payload to send")
loadtest.Flags().IntVarP(&eventFanout, "eventFanout", "F", 1, "eventFanout specifies the number of events to fanout")
cmd := &cobra.Command{Use: "app"}
cmd.AddCommand(loadtest)
@@ -73,3 +88,6 @@ func main() {
panic(err)
}
}
// logLevel stores the value of the --level flag used to configure the logger
var logLevel string
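
With the individual flag variables folded into LoadTestConfig, the harness can also be driven programmatically. A sketch mirroring the flag defaults above (values illustrative):

config := LoadTestConfig{
	Namespace:      os.Getenv("HATCHET_CLIENT_NAMESPACE"),
	Events:         10,               // -e, events per second
	Duration:       10 * time.Second, // -d
	Wait:           10 * time.Second, // -w
	PayloadSize:    "0kb",            // -P
	EventFanout:    1,                // -F
	DagSteps:       1,                // -g
	RlDurationUnit: "second",         // --rlDurationUnit
}
if err := do(config); err != nil {
	log.Println(err)
}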
+108 -46
@@ -7,31 +7,27 @@ import (
"sync"
"time"
"github.com/hatchet-dev/hatchet/pkg/client"
"github.com/hatchet-dev/hatchet/pkg/client/create"
"github.com/hatchet-dev/hatchet/pkg/client/types"
"github.com/hatchet-dev/hatchet/pkg/worker"
v1 "github.com/hatchet-dev/hatchet/pkg/v1"
"github.com/hatchet-dev/hatchet/pkg/v1/factory"
"github.com/hatchet-dev/hatchet/pkg/v1/features"
"github.com/hatchet-dev/hatchet/pkg/v1/task"
"github.com/hatchet-dev/hatchet/pkg/v1/worker"
"github.com/hatchet-dev/hatchet/pkg/v1/workflow"
v0worker "github.com/hatchet-dev/hatchet/pkg/worker"
)
type stepOneOutput struct {
Message string `json:"message"`
}
func run(ctx context.Context, namespace string, delay time.Duration, executions chan<- time.Duration, concurrency, slots int, failureRate float32, eventFanout int) (int64, int64) {
c, err := client.New(
client.WithLogLevel("warn"), // nolint: staticcheck
client.WithNamespace(namespace),
)
if err != nil {
panic(err)
}
w, err := worker.NewWorker(
worker.WithClient(
c,
),
worker.WithLogLevel("warn"),
worker.WithMaxRuns(slots),
func run(ctx context.Context, config LoadTestConfig, executions chan<- time.Duration) (int64, int64) {
hatchet, err := v1.NewHatchetClient(
v1.Config{
Namespace: config.Namespace,
Logger: &l,
},
)
if err != nil {
@@ -43,18 +39,7 @@ func run(ctx context.Context, namespace string, delay time.Duration, executions
var uniques int64
var executed []int64
var concurrencyOpts *worker.WorkflowConcurrency
if concurrency > 0 {
concurrencyOpts = worker.Expression("'global'").MaxRuns(int32(concurrency)).LimitStrategy(types.GroupRoundRobin) // nolint: gosec
}
step := func(ctx worker.HatchetContext) (result *stepOneOutput, err error) {
var input Event
err = ctx.WorkflowInput(&input)
if err != nil {
return nil, err
}
step := func(ctx v0worker.HatchetContext, input Event) (any, error) {
took := time.Since(input.CreatedAt)
l.Info().Msgf("executing %d took %s", input.ID, took)
@@ -78,10 +63,10 @@ func run(ctx context.Context, namespace string, delay time.Duration, executions
executed = append(executed, input.ID)
mx.Unlock()
time.Sleep(delay)
time.Sleep(config.Delay)
if failureRate > 0 {
if rand.Float32() < failureRate { // nolint:gosec
if config.FailureRate > 0 {
if rand.Float32() < config.FailureRate { // nolint:gosec
return nil, fmt.Errorf("random failure")
}
}
@@ -91,26 +76,103 @@ func run(ctx context.Context, namespace string, delay time.Duration, executions
}, nil
}
for i := range eventFanout {
err = w.RegisterWorkflow(
&worker.WorkflowJob{
Name: fmt.Sprintf("load-test-%d", i),
Description: "Load testing",
On: worker.Event("load-test:event"),
Concurrency: concurrencyOpts,
// ScheduleTimeout: "30s",
Steps: []*worker.WorkflowStep{
worker.Fn(step).SetName("step-one").SetTimeout("5m"),
},
// upsert the rate limits before registering the workflows
for i := range config.RlKeys {
err = hatchet.RateLimits().Upsert(
features.CreateRatelimitOpts{
// FIXME: namespace?
Key: "rl-key-" + fmt.Sprintf("%d", i),
Limit: config.RlLimit,
Duration: types.RateLimitDuration(config.RlDurationUnit),
},
)
if err != nil {
panic(err)
panic(fmt.Errorf("error creating rate limit: %w", err))
}
}
cleanup, err := w.Start()
workflows := []workflow.WorkflowBase{}
for i := range config.EventFanout {
var concurrencyOpt []types.Concurrency
if config.Concurrency > 0 {
maxRuns := int32(config.Concurrency) // nolint: gosec
limitStrategy := types.GroupRoundRobin
concurrencyOpt = []types.Concurrency{
{
Expression: "'global'",
MaxRuns: &maxRuns,
LimitStrategy: &limitStrategy,
},
}
}
loadtest := factory.NewWorkflow[Event, stepOneOutput](
create.WorkflowCreateOpts[Event]{
Name: fmt.Sprintf("load-test-%d", i),
OnEvents: []string{
"load-test:event",
},
Concurrency: concurrencyOpt,
},
hatchet,
)
var prevTask *task.TaskDeclaration[Event]
for j := range config.DagSteps {
var parents []create.NamedTask
if prevTask != nil {
parentTask := prevTask
parents = []create.NamedTask{
parentTask,
}
}
var rateLimits []*types.RateLimit
if config.RlKeys > 0 {
units := 1
rateLimits = []*types.RateLimit{
{
Key: fmt.Sprintf("rl-key-%d", i%config.RlKeys),
Units: &units,
},
}
}
prevTask = loadtest.Task(
create.WorkflowTask[Event, stepOneOutput]{
Name: fmt.Sprintf("step-%d", j),
Parents: parents,
RateLimits: rateLimits,
},
step,
)
}
workflows = append(workflows, loadtest)
}
worker, err := hatchet.Worker(
worker.WorkerOpts{
Name: "load-test-worker",
Workflows: workflows,
Slots: config.Slots,
Logger: &l,
},
)
if err != nil {
panic(fmt.Errorf("error creating worker: %w", err))
}
cleanup, err := worker.Start()
if err != nil {
panic(fmt.Errorf("error starting worker: %w", err))
}
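
Two details in the registration loop above are easy to miss. First, the DAG is linear: each task is threaded in as the parent of the next, so DagSteps=3 yields step-0 <- step-1 <- step-2. Second, rate-limit keys are spread across the fanned-out workflows via i % RlKeys, so workflows share the upserted keys. The chaining, condensed from the hunk:

var prevTask *task.TaskDeclaration[Event]
for j := range config.DagSteps {
	var parents []create.NamedTask
	if prevTask != nil {
		parents = []create.NamedTask{prevTask} // chain onto the previous step
	}
	prevTask = loadtest.Task(
		create.WorkflowTask[Event, stepOneOutput]{
			Name:    fmt.Sprintf("step-%d", j),
			Parents: parents, // empty for step-0, making it the root
		},
		step,
	)
}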
@@ -38,7 +38,7 @@ func (tc *TasksControllerImpl) processSleeps(ctx context.Context, tenantId strin
matchResult, shouldContinue, err := tc.repov1.Tasks().ProcessDurableSleeps(ctx, tenantId)
if err != nil {
return false, fmt.Errorf("could not list step runs to timeout for tenant %s: %w", tenantId, err)
return false, fmt.Errorf("could not list process durable sleeps for tenant %s: %w", tenantId, err)
}
if len(matchResult.CreatedTasks) > 0 {
+5
@@ -36,6 +36,7 @@ type Client interface {
Subscribe() SubscribeClient
API() *rest.ClientWithResponses
CloudAPI() *cloudrest.ClientWithResponses
Logger() *zerolog.Logger
TenantId() string
Namespace() string
CloudRegisterID() *string
@@ -394,6 +395,10 @@ func (c *clientImpl) Subscribe() SubscribeClient {
return c.subscribe
}
func (c *clientImpl) Logger() *zerolog.Logger {
return c.l
}
func (c *clientImpl) API() *rest.ClientWithResponses {
return c.rest
}
+7 -1
@@ -771,13 +771,19 @@ func (r *workflowRepository) createJobTx(ctx context.Context, tx sqlcv1.DBTX, te
createStepExprParams.Keys = append(createStepExprParams.Keys, rateLimit.Key)
createStepExprParams.Expressions = append(createStepExprParams.Expressions, windowExpr)
} else {
rlUnits := int32(1)
if rateLimit.Units != nil {
rlUnits = int32(*rateLimit.Units) // nolint: gosec
}
_, err := r.queries.CreateStepRateLimit(
ctx,
tx,
sqlcv1.CreateStepRateLimitParams{
Stepid: sqlchelpers.UUIDFromStr(stepId),
Ratelimitkey: rateLimit.Key,
Units: int32(*rateLimit.Units), // nolint: gosec
Units: rlUnits, // nolint: gosec
Tenantid: tenantId,
Kind: sqlcv1.StepRateLimitKindSTATIC,
},
+1 -1
@@ -879,7 +879,7 @@ func (s *Scheduler) getSnapshotInput(mustSnapshot bool) (*SnapshotInput, bool) {
uniqueSlots[slot] = true
if slot.used {
if slot.isUsed() {
workerSlotUtilization[workerId].UtilizedSlots++
} else {
workerSlotUtilization[workerId].NonUtilizedSlots++
+7
@@ -58,6 +58,13 @@ func (s *slot) active() bool {
return !s.used && s.expiresAt != nil && s.expiresAt.After(time.Now())
}
func (s *slot) isUsed() bool {
s.mu.RLock()
defer s.mu.RUnlock()
return s.used
}
func (s *slot) expired() bool {
s.mu.RLock()
defer s.mu.RUnlock()
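
These two hunks are the data-race fix named in the commit message: getSnapshotInput read slot.used directly while other goroutines mutate it under the slot's mutex, which go test -race reports. The fix routes the read through an accessor that takes the read lock, matching the existing expired() accessor. The general pattern, as a standalone illustration (simplified types, not the scheduler's actual code):

type slot struct {
	mu   sync.RWMutex
	used bool
}

func (s *slot) use() {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.used = true
}

// isUsed is safe to call from any goroutine; a bare read of s.used is not.
func (s *slot) isUsed() bool {
	s.mu.RLock()
	defer s.mu.RUnlock()
	return s.used
}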
+4 -1
@@ -61,6 +61,8 @@ func RunTestWithEngine(m *testing.M) {
if exitCode == 0 {
if err := goleak.Find(
goleak.IgnoreTopFunction("github.com/hatchet-dev/hatchet/internal/cache.NewTTL[...].func1"),
goleak.IgnoreTopFunction("google.golang.org/grpc/internal/resolver/dns.(*dnsResolver).watcher"),
goleak.IgnoreTopFunction("github.com/testcontainers/testcontainers-go.(*Reaper).connect.func1"),
goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"),
goleak.IgnoreTopFunction("google.golang.org/grpc/internal/grpcsync.(*CallbackSerializer).run"),
@@ -92,7 +94,7 @@ func startEngine() func() {
postgresConnStr, cleanupPostgres := startPostgres(ctx, pgVersion)
grpcPort, err := findAvailablePort(7070)
grpcPort, err := findAvailablePort(7077)
if err != nil {
log.Fatalf("failed to find available port: %v", err)
@@ -101,6 +103,7 @@ func startEngine() func() {
os.Setenv("DATABASE_URL", postgresConnStr)
os.Setenv("SERVER_GRPC_INSECURE", "true")
os.Setenv("SERVER_GRPC_PORT", strconv.Itoa(grpcPort))
os.Setenv("SERVER_GRPC_BROADCAST_ADDRESS", fmt.Sprintf("localhost:%d", grpcPort))
os.Setenv("SERVER_HEALTHCHECK", "false")
os.Setenv("HATCHET_CLIENT_TLS_STRATEGY", "none")
os.Setenv("SERVER_AUTH_COOKIE_DOMAIN", "app.dev.hatchet-tools.com")
+7 -1
@@ -58,12 +58,18 @@ type v1HatchetClientImpl struct {
func NewHatchetClient(config ...Config) (HatchetClient, error) {
cf := &v0Config.ClientConfigFile{}
v0Opts := []v0Client.ClientOpt{}
if len(config) > 0 {
opts := config[0]
cf = mapConfigToCF(opts)
if config[0].Logger != nil {
v0Opts = append(v0Opts, v0Client.WithLogger(config[0].Logger))
}
}
client, err := v0Client.NewFromConfigFile(cf)
client, err := v0Client.NewFromConfigFile(cf, v0Opts...)
if err != nil {
return nil, err
}
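
v1.Config gains a Logger field (see the config hunk below), which NewHatchetClient now forwards to the underlying v0 client via WithLogger. Wiring a custom zerolog logger is then a one-field change (sketch, values illustrative):

l := zerolog.New(os.Stderr).With().Timestamp().Logger()

c, err := v1.NewHatchetClient(v1.Config{
	Namespace: "load-test",
	Logger:    &l, // forwarded to the v0 client as a ClientOpt
})
if err != nil {
	panic(err) // illustrative; handle as appropriate
}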
+3
@@ -1,6 +1,8 @@
package v1
import (
"github.com/rs/zerolog"
v0Config "github.com/hatchet-dev/hatchet/pkg/config/client"
"github.com/hatchet-dev/hatchet/pkg/config/shared"
)
@@ -16,6 +18,7 @@ type Config struct {
RawRunnableActions []string
AutoscalingTarget string
TLS *TLSConfig
Logger *zerolog.Logger
}
type TLSConfig struct {
+19 -3
@@ -94,6 +94,9 @@ type WorkerImpl struct {
// logLevel is the log level for this worker
logLevel string
// logger is the logger used for this worker
logger *zerolog.Logger
// labels are the labels assigned to this worker
labels WorkerLabels
}
@@ -107,6 +110,7 @@ func NewWorker(workersClient features.WorkersClient, v0 v0Client.Client, opts Wo
workers: workersClient,
name: opts.Name,
logLevel: opts.LogLevel,
logger: opts.Logger,
labels: opts.Labels,
workflows: opts.Workflows,
}
@@ -156,12 +160,20 @@ func (w *WorkerImpl) RegisterWorkflows(workflows ...workflow.WorkflowBase) error
// Create non-durable worker on demand if needed and not already created
if hasNonDurableTasks && w.nonDurableWorker == nil {
nonDurableWorker, err := worker.NewWorker(
opts := []worker.WorkerOpt{
worker.WithClient(w.v0),
worker.WithName(w.name),
worker.WithMaxRuns(w.slots),
worker.WithLogLevel(w.logLevel),
worker.WithLabels(w.labels),
}
if w.logger != nil {
opts = append(opts, worker.WithLogger(w.logger))
}
nonDurableWorker, err := worker.NewWorker(
opts...,
)
if err != nil {
return err
@@ -177,11 +189,15 @@ func (w *WorkerImpl) RegisterWorkflows(workflows ...workflow.WorkflowBase) error
logger = w.nonDurableWorker.Logger()
}
durableWorker, err := worker.NewWorker(
opts := []worker.WorkerOpt{
worker.WithClient(w.v0),
worker.WithName(w.name+"-durable"),
worker.WithName(w.name + "-durable"),
worker.WithMaxRuns(w.durableSlots),
worker.WithLogger(logger),
}
durableWorker, err := worker.NewWorker(
opts...,
)
if err != nil {
return err
+1 -4
@@ -9,7 +9,6 @@ import (
"github.com/hatchet-dev/hatchet/pkg/client"
"github.com/hatchet-dev/hatchet/pkg/client/cloud/rest"
"github.com/hatchet-dev/hatchet/pkg/logger"
)
type ManagedCompute struct {
@@ -29,15 +28,13 @@ func NewManagedCompute(actionRegistry *ActionRegistry, client client.Client, max
runtimeConfigs := getComputeConfigs(actionRegistry, maxRuns)
cloudRegisterID := client.CloudRegisterID()
logger := logger.NewDefaultLogger("managed-compute")
mc := &ManagedCompute{
ActionRegistry: actionRegistry,
Client: client,
MaxRuns: maxRuns,
RuntimeConfigs: runtimeConfigs,
CloudRegisterID: cloudRegisterID,
Logger: &logger,
Logger: client.Logger(),
}
if len(mc.RuntimeConfigs) == 0 {