test(timeout): add test for timeout (#344)

This commit is contained in:
Luca Steeb
2024-04-11 00:50:24 +07:00
committed by GitHub
parent 32aebd97b4
commit 2f7483a3be
4 changed files with 240 additions and 1 deletions

2
.gitignore vendored
View File

@@ -1,7 +1,7 @@
.DS_Store
.env
*.env
docker/.env
.envrc
*.pem
app
!frontend/app

48
examples/timeout/main.go Normal file
View File

@@ -0,0 +1,48 @@
package main
import (
"fmt"
"time"
"github.com/joho/godotenv"
"github.com/hatchet-dev/hatchet/pkg/worker"
)
type userCreateEvent struct {
Username string `json:"username"`
UserID string `json:"user_id"`
Data map[string]string `json:"data"`
}
type stepOneOutput struct {
Message string `json:"message"`
}
func main() {
err := godotenv.Load()
if err != nil {
panic(err)
}
events := make(chan string, 50)
cleanup, err := run(events, worker.WorkflowJob{
Name: "timeout",
Description: "timeout",
Steps: []*worker.WorkflowStep{
worker.Fn(func(ctx worker.HatchetContext) (result *stepOneOutput, err error) {
time.Sleep(time.Second * 60)
return nil, nil
}).SetName("step-one").SetTimeout("10s"),
},
})
if err != nil {
panic(err)
}
<-events
if err := cleanup(); err != nil {
panic(fmt.Errorf("cleanup() error = %v", err))
}
}

View File

@@ -0,0 +1,82 @@
//go:build e2e
package main
import (
"context"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/hatchet-dev/hatchet/internal/testutils"
"github.com/hatchet-dev/hatchet/pkg/worker"
)
func TestTimeout(t *testing.T) {
testutils.Prepare(t)
tests := []struct {
name string
job func(done func()) worker.WorkflowJob
}{
{
name: "step timeout",
job: func(done func()) worker.WorkflowJob {
return worker.WorkflowJob{
Name: "timeout",
Description: "timeout",
Steps: []*worker.WorkflowStep{
worker.Fn(func(ctx worker.HatchetContext) (result *stepOneOutput, err error) {
select {
case <-time.After(time.Second * 30):
return &stepOneOutput{
Message: "finished",
}, nil
case <-ctx.Done():
done()
return nil, nil
}
}).SetName("step-one").SetTimeout("10s"),
},
}
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
events := make(chan string, 50)
cleanup, err := run(events, tt.job(func() {
events <- "done"
}))
if err != nil {
t.Fatalf("run() error = %s", err)
}
var items []string
outer:
for {
select {
case item := <-events:
items = append(items, item)
case <-ctx.Done():
break outer
}
}
assert.Equal(t, []string{
"done", // cancellation signal
"done", // test check
}, items)
if err := cleanup(); err != nil {
t.Fatalf("cleanup() error = %s", err)
}
})
}
}

109
examples/timeout/run.go Normal file
View File

@@ -0,0 +1,109 @@
package main
import (
"context"
"fmt"
"log"
"time"
"github.com/hatchet-dev/hatchet/internal/repository/prisma/db"
"github.com/hatchet-dev/hatchet/pkg/client"
"github.com/hatchet-dev/hatchet/pkg/worker"
)
func run(done chan<- string, job worker.WorkflowJob) (func() error, error) {
c, err := client.New()
if err != nil {
return nil, fmt.Errorf("error creating client: %w", err)
}
w, err := worker.NewWorker(
worker.WithClient(
c,
),
)
if err != nil {
return nil, fmt.Errorf("error creating worker: %w", err)
}
err = w.On(
worker.Events("user:create:timeout"),
&job,
)
if err != nil {
return nil, fmt.Errorf("error registering workflow: %w", err)
}
go func() {
log.Printf("pushing event")
testEvent := userCreateEvent{
Username: "echo-test",
UserID: "1234",
Data: map[string]string{
"test": "test",
},
}
// push an event
err := c.Event().Push(
context.Background(),
"user:create:timeout",
testEvent,
)
if err != nil {
panic(fmt.Errorf("error pushing event: %w", err))
}
time.Sleep(20 * time.Second)
client := db.NewClient()
if err := client.Connect(); err != nil {
panic(fmt.Errorf("error connecting to database: %w", err))
}
defer client.Disconnect()
// TODO check for the database status
events, err := client.Event.FindMany(
db.Event.TenantID.Equals(c.TenantId()),
db.Event.Key.Equals("user:create:timeout"),
).With(
db.Event.WorkflowRuns.Fetch().With(
db.WorkflowRunTriggeredBy.Parent.Fetch().With(
db.WorkflowRun.JobRuns.Fetch().With(
db.JobRun.StepRuns.Fetch(),
),
),
),
).Exec(context.Background())
if err != nil {
panic(fmt.Errorf("error finding events: %w", err))
}
for _, event := range events {
for _, workflowRun := range event.WorkflowRuns() {
for _, jobRuns := range workflowRun.Parent().JobRuns() {
for _, stepRun := range jobRuns.StepRuns() {
if stepRun.Status != db.StepRunStatusCancelled {
panic(fmt.Errorf("expected step run to be failed, got %s", stepRun.Status))
}
reason, _ := stepRun.CancelledReason()
if reason != "TIMED_OUT" {
panic(fmt.Errorf("expected step run to be failed, got %s", reason))
}
}
}
}
}
done <- "done"
}()
cleanup, err := w.Start()
if err != nil {
return nil, fmt.Errorf("error starting worker: %w", err)
}
return cleanup, nil
}