mirror of
https://github.com/hatchet-dev/hatchet.git
synced 2026-01-06 16:59:39 -06:00
@@ -40,6 +40,9 @@ func main() {
|
||||
}, nil
|
||||
},
|
||||
hatchet.WithWorkflowCron("0 2 * * *"),
|
||||
hatchet.WithWorkflowCronInput(CronInput{
|
||||
Timestamp: time.Now().Format(time.RFC3339),
|
||||
}),
|
||||
hatchet.WithWorkflowDescription("Daily cleanup and maintenance tasks"),
|
||||
)
|
||||
|
||||
|
||||
2
go.mod
2
go.mod
@@ -36,6 +36,7 @@ require (
|
||||
github.com/testcontainers/testcontainers-go/modules/rabbitmq v0.39.0
|
||||
github.com/tink-crypto/tink-go v0.0.0-20230613075026-d6de17e3f164
|
||||
github.com/tink-crypto/tink-go-gcpkms v0.0.0-20230602082706-31d0d09ccc8d
|
||||
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.63.0
|
||||
go.opentelemetry.io/otel v1.38.0
|
||||
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.38.0
|
||||
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.38.0
|
||||
@@ -153,7 +154,6 @@ require (
|
||||
github.com/x448/float16 v0.8.4 // indirect
|
||||
github.com/yusufpapurcu/wmi v1.2.4 // indirect
|
||||
go.opentelemetry.io/auto/sdk v1.1.0 // indirect
|
||||
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.63.0 // indirect
|
||||
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.62.0 // indirect
|
||||
go.opentelemetry.io/otel/metric v1.38.0 // indirect
|
||||
go.opentelemetry.io/proto/otlp v1.7.1 // indirect
|
||||
|
||||
@@ -41,6 +41,9 @@ type WorkflowCreateOpts[I any] struct {
|
||||
// (optional) The cron expressions for scheduled workflow runs
|
||||
OnCron []string
|
||||
|
||||
// (optional) The JSON-serialized input for cron workflows (defaults to "{}" if nil)
|
||||
CronInput *string
|
||||
|
||||
// (optional) Concurrency settings to control parallel execution
|
||||
Concurrency []types.Concurrency
|
||||
|
||||
|
||||
@@ -40,6 +40,9 @@ func main() {
|
||||
}, nil
|
||||
},
|
||||
hatchet.WithWorkflowCron("0 2 * * *"),
|
||||
hatchet.WithWorkflowCronInput(CronInput{
|
||||
Timestamp: time.Now().Format(time.RFC3339),
|
||||
}),
|
||||
hatchet.WithWorkflowDescription("Daily cleanup and maintenance tasks"),
|
||||
)
|
||||
// !!
|
||||
|
||||
@@ -119,6 +119,7 @@ type workflowDeclarationImpl[I any, O any] struct {
|
||||
Description *string
|
||||
OnEvents []string
|
||||
OnCron []string
|
||||
CronInput *string
|
||||
Concurrency []types.Concurrency
|
||||
OnFailureTask *task.OnFailureTaskDeclaration[I]
|
||||
StickyStrategy *types.StickyStrategy
|
||||
@@ -179,6 +180,7 @@ func NewWorkflowDeclaration[I any, O any](opts create.WorkflowCreateOpts[I], v0
|
||||
name: workflowName,
|
||||
OnEvents: onEvents,
|
||||
OnCron: opts.OnCron,
|
||||
CronInput: opts.CronInput,
|
||||
Concurrency: opts.Concurrency,
|
||||
// OnFailureTask: opts.OnFailureTask, // TODO: add this back in
|
||||
StickyStrategy: opts.StickyStrategy,
|
||||
@@ -588,6 +590,7 @@ func (w *workflowDeclarationImpl[I, O]) Dump() (*contracts.CreateWorkflowVersion
|
||||
Name: w.name,
|
||||
EventTriggers: w.OnEvents,
|
||||
CronTriggers: w.OnCron,
|
||||
CronInput: w.CronInput,
|
||||
DefaultPriority: w.DefaultPriority,
|
||||
}
|
||||
|
||||
|
||||
@@ -4,6 +4,7 @@ import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"reflect"
|
||||
"sync"
|
||||
"time"
|
||||
@@ -116,6 +117,7 @@ type workflowConfig struct {
|
||||
taskDefaults *create.TaskDefaults
|
||||
defaultPriority *RunPriority
|
||||
stickyStrategy *types.StickyStrategy
|
||||
cronInput *string
|
||||
}
|
||||
|
||||
// WithWorkflowCron configures the workflow to run on a cron schedule.
|
||||
@@ -126,6 +128,24 @@ func WithWorkflowCron(cronExpressions ...string) WorkflowOption {
|
||||
}
|
||||
}
|
||||
|
||||
// WithWorkflowCronInput sets the input for cron workflows.
|
||||
func WithWorkflowCronInput(input any) WorkflowOption {
|
||||
return func(config *workflowConfig) {
|
||||
inputJSON := "{}"
|
||||
|
||||
if input != nil {
|
||||
bytes, err := json.Marshal(input)
|
||||
if err != nil {
|
||||
panic(fmt.Errorf("could not marshal cron input: %w", err))
|
||||
}
|
||||
|
||||
inputJSON = string(bytes)
|
||||
}
|
||||
|
||||
config.cronInput = &inputJSON
|
||||
}
|
||||
}
|
||||
|
||||
// WithWorkflowEvents configures the workflow to trigger on specific events.
|
||||
func WithWorkflowEvents(events ...string) WorkflowOption {
|
||||
return func(config *workflowConfig) {
|
||||
@@ -183,12 +203,18 @@ func newWorkflow(name string, v0Client v0Client.Client, options ...WorkflowOptio
|
||||
opt(config)
|
||||
}
|
||||
|
||||
if len(config.onCron) > 0 && config.cronInput == nil {
|
||||
emptyJSON := "{}"
|
||||
config.cronInput = &emptyJSON
|
||||
}
|
||||
|
||||
createOpts := create.WorkflowCreateOpts[any]{
|
||||
Name: name,
|
||||
Version: config.version,
|
||||
Description: config.description,
|
||||
OnEvents: config.onEvents,
|
||||
OnCron: config.onCron,
|
||||
CronInput: config.cronInput,
|
||||
Concurrency: config.concurrency,
|
||||
TaskDefaults: config.taskDefaults,
|
||||
StickyStrategy: config.stickyStrategy,
|
||||
|
||||
Reference in New Issue
Block a user