mirror of
https://github.com/hatchet-dev/hatchet.git
synced 2025-12-30 13:19:44 -06:00
feat(go-sdk): cron and schedules API, minor fixes (#1083)
* feat(go-sdk): cron and schedules API, minor fixes * try to improve code block and docs * revert pre-commit * fix: generate * fix: put overflow in right place * remove branch specs
This commit is contained in:
1
.github/workflows/test.yml
vendored
1
.github/workflows/test.yml
vendored
@@ -38,6 +38,7 @@ jobs:
|
||||
- name: Generate
|
||||
run: |
|
||||
sh ./hack/db/atlas-apply.sh
|
||||
task pre-commit-install
|
||||
task generate-all
|
||||
|
||||
- name: Check for diff
|
||||
|
||||
@@ -116,7 +116,7 @@ tasks:
|
||||
cmds:
|
||||
- sh ./hack/dev/generate-local-encryption-keys.sh ./hack/dev/encryption-keys
|
||||
init-dev-env:
|
||||
- sh ./hack/dev/init-dev-token-and-env.sh
|
||||
- sh ./hack/dev/init-dev-token-and-env.sh
|
||||
generate-dev-api-token:
|
||||
cmds:
|
||||
- sh ./hack/dev/generate-dev-api-token.sh
|
||||
@@ -167,10 +167,8 @@ tasks:
|
||||
- mkdir -p ./python-sdk/certs/ && cp ./hack/dev/certs/ca.cert ./python-sdk/certs/
|
||||
pre-commit-install:
|
||||
cmds:
|
||||
- pip install pre-commit
|
||||
- pip install pre-commit # can use brew install pre-commit if you are on macOS
|
||||
- pre-commit install
|
||||
pre-commit-run:
|
||||
deps:
|
||||
- pre-commit-install
|
||||
cmds:
|
||||
- pre-commit run --all-files || pre-commit run --all-files
|
||||
|
||||
@@ -827,10 +827,9 @@ workflowWorkersCount:
|
||||
tags:
|
||||
- Workflow
|
||||
|
||||
|
||||
scheduledCreate:
|
||||
post:
|
||||
x-resources: ["tenant", "workflow"]
|
||||
x-resources: ["tenant"]
|
||||
description: Schedule a new workflow run for a tenant
|
||||
operationId: scheduled-workflow-run:create
|
||||
parameters:
|
||||
@@ -1024,30 +1023,30 @@ scheduled:
|
||||
minLength: 36
|
||||
maxLength: 36
|
||||
responses:
|
||||
"200":
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "../../components/schemas/_index.yaml#/ScheduledWorkflows"
|
||||
description: Successfully retrieved the workflow runs
|
||||
"400":
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "../../components/schemas/_index.yaml#/APIErrors"
|
||||
description: A malformed or bad request
|
||||
"403":
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "../../components/schemas/_index.yaml#/APIErrors"
|
||||
description: Forbidden
|
||||
"404":
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "../../components/schemas/_index.yaml#/APIErrors"
|
||||
description: Forbidden
|
||||
"200":
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "../../components/schemas/_index.yaml#/ScheduledWorkflows"
|
||||
description: Successfully retrieved the workflow runs
|
||||
"400":
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "../../components/schemas/_index.yaml#/APIErrors"
|
||||
description: A malformed or bad request
|
||||
"403":
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "../../components/schemas/_index.yaml#/APIErrors"
|
||||
description: Forbidden
|
||||
"404":
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "../../components/schemas/_index.yaml#/APIErrors"
|
||||
description: Forbidden
|
||||
summary: Get scheduled workflow run
|
||||
tags:
|
||||
- Workflow
|
||||
@@ -1118,30 +1117,30 @@ crons:
|
||||
minLength: 36
|
||||
maxLength: 36
|
||||
responses:
|
||||
"200":
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "../../components/schemas/_index.yaml#/CronWorkflows"
|
||||
description: Successfully retrieved the workflow runs
|
||||
"400":
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "../../components/schemas/_index.yaml#/APIErrors"
|
||||
description: A malformed or bad request
|
||||
"403":
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "../../components/schemas/_index.yaml#/APIErrors"
|
||||
description: Forbidden
|
||||
"404":
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "../../components/schemas/_index.yaml#/APIErrors"
|
||||
description: Forbidden
|
||||
"200":
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "../../components/schemas/_index.yaml#/CronWorkflows"
|
||||
description: Successfully retrieved the workflow runs
|
||||
"400":
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "../../components/schemas/_index.yaml#/APIErrors"
|
||||
description: A malformed or bad request
|
||||
"403":
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "../../components/schemas/_index.yaml#/APIErrors"
|
||||
description: Forbidden
|
||||
"404":
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "../../components/schemas/_index.yaml#/APIErrors"
|
||||
description: Forbidden
|
||||
summary: Get cron job workflow run
|
||||
tags:
|
||||
- Workflow
|
||||
|
||||
@@ -10809,28 +10809,28 @@ var swaggerSpec = []string{
|
||||
"evWSg5/Zn3ul5By1Xkl6kBvqLDvum6TBgTEZrRbVW+uupN/d1l9p3l/JgKdmDgkG2qjxXFoJA+50gZmd",
|
||||
"4r51HsftUbzrfk3rlSN2ikEWf/+ax9BUlqAETgifzJE09oE017zD7mTMrb69MigqA+4rQdtocUzNNjQp",
|
||||
"ZmHc/I1mLGzm5Kkm+jXD34rFzVfs27osiULQVVH5eoIYFVlcsCPr5bHUCIREttcHS6rEIA1bKbxJKSx3",
|
||||
"QNmAJvLXqDdssLpQc3VUlcDv8qbZil8r8SsUkjqd2Dbj3CLSl2fh3vOiNCQ13jqsjcxpJNPHg0eAAjAK",
|
||||
"IBPEiuTRX8y/QsKzfONTNuPOS+G61FM7nlCmsFkL3sI5qXDyaQ3jhuf6ApIWS0hXZP8UwwQfeGmSwGrO",
|
||||
"xvyiwBs6tFuJe28wTL5CcioGWyPd0Zka0hmDuC1k8vaFTKCXJoi8MDHuRdEDgt2Uyq4/7qiomotzK5Kb",
|
||||
"JHe2/RoyniAyTUcHHgiCEfAejOR8Gs3iABLIafqSzu9ozyM6ES/j8JUNfUlxeSqHnyPwD4fHNU8LnpjX",
|
||||
"L887hcAXNcuCiG+GtkZeJtZf55BZwJ1cYHEOS/RhAhKzKBjSr4shjnVtjjUGz/pxxqBriLAomgRwPfTG",
|
||||
"hv6b0xtH34rpLUfc347eUPiICLQpbCi1Yd6BKd1Wxzcd4Zr17Yu51niKqxNZuVIECMuNKS6w1Retj1WW",
|
||||
"23MOeznlXWvscwXaOwCeB2NSUaeffceZsU1MUqI2dfN5H3c9piU+OJ+ovvBeBfXxlevor3UIyKvPMySV",
|
||||
"9t6evhLIUg5WVOSi35vRF+/jrqu+FR18BfTFV97SV031cYqkBegriCYoNJPVeTTBDgodwM7G/QoF45wN",
|
||||
"tB5aYkcwHX9DFUKt7tFBNJlA30Fhe33equtz8VinVGN7Tw6iSZSSGmaIUmLHDVH69rYeQaPRltXLaYm0",
|
||||
"Rhll1GNLtjM4G8EET1Hc4AqkdLK7BvEj5EfeTUQUrZXA9ZM2vw+pKGrvRIvciVQM1pNkDDB+ipIKpwQu",
|
||||
"JoUkdWT7KpF6Jcdcn45xOgXhJJtom5QNj0HmZ4hqxfkOiXNOVkVKt2CiBE6oIEuqLn28Ba7USDKXnXWx",
|
||||
"jQRjmxhGIq995toJPV2SkK3OgwPgPazlhWFIR97iB4YaUdPwxeEJjqZR9LAnHFIOfoofLKK8qNARrcsO",
|
||||
"K/x3+wAuMZDZISSbaMP+IJYRURK+VsS8vYiZj8JSydToBSJa2DHHgcCzzX1LNpX1wao5Rhyh2DZdw9by",
|
||||
"zWr8qDj03I1KoIZiZiAmNDnBZtkoBXay7WrZc4vYk10vS1vUlEcz3mR/vFqU/NUYNziFWYY7CmezKt9F",
|
||||
"TYTL7nguNvYhEytuDSsl58RSDAjVv6p9EZmGRqmQeNMKs0klIfNWO0PLa7iVMgQUzg3TWSEwkEqUbS40",
|
||||
"wpLXOGQtp+k5TTDEMsw2d5rMO/lb5bvIPJGtAuwb3Iu20lO+Sa6IDMA2ZmfzMTu665BCMQv6yXfqNCx7",
|
||||
"Tmigcr2HgJEFg0Ra3npr3lKjUZZhLBu1z567mumBW8Fg66tnzJFhGz7Lta4il21aObSSCPPqYSsPjAri",
|
||||
"csxZoyZaJW2nm1TMzp4x3iNMME+gaTwpGyRp3wZ+1iRK5GkOV1DFZvEaNnrAJkmUxiz7ZA6C3CgjKKzT",
|
||||
"d/ji1mYGWLOQWDIjtCC9Nin0NmoTC2WhbiS4ZLYSo5uBDLRvmj9kobQhWym5rjXssu/0x8y6jVNKHdDv",
|
||||
"MK4KAIGYZDyFsDOGxJtC35SjOBf8W65ICTJYMBfJm2UgUeBtlHqkTTjSJhzZYMIRrWgWsgFbvGoVTnIr",
|
||||
"sfwbb7xDJpi/g1xes5QTm7qkKtjKu61SAXNSXFQFnPchG0GQwCTzIetovcpg8ijlQZoE7onrvt69/r8A",
|
||||
"AAD//7hJKYh3/QEA",
|
||||
"QNmAJvLXqDdssLpQc3VUlcDv8qbZil8r8SsUkjqdeOUil6fe3vOiNCQ1LjqsjUxkJHPGg0eAAjAKIJO+",
|
||||
"irjR38a/QsJTe+NTNuPOi966fFM7nkWmsFkLXr05qXDyaa3hhjf6ApIWy0JXZP8UwwQfeGmSwGrOxvx2",
|
||||
"wBs6tFuJe28wTL5CcioGWyPd0Zka0hmDuK1e8vbVS6CXJoi8MDHuRdEDgt2Uyq4/7qiomgtuK5KbJHe2",
|
||||
"/RoyniAyTUcHHgiCEfAejOR8Gs3iABLIafqSzu9ozyM6Ea/d8JUNfUlxeSqHnyPwD4fHNe8JnpjXL887",
|
||||
"hcAXhcqCiG+GtjBeJtZf55BZwJ1cYHEOS/RhAhKzKBjSr4shjnVtjjUGz/pxxqBriLAomgRwPfTGhv6b",
|
||||
"0xtH34rpLUfc347eUPiICLSpZii1Yd6BKd1Wxzcd4Zr17Yu51niKqxNZ+U8ECMuNKS6w1Retj1WW0HMO",
|
||||
"eznlXWtuiAXaOwCeB2NSUZyffceZhU1MUqI2dfN5H3c99iQ+OJ+ovtpeBfXxlevor/UCyEvOMySV9t6e",
|
||||
"vhLI8gxWlOGi35vRF+/jrquoFR18BfTFV97SV03JcYqkBegriCYoNJPVeTTBDgodwM7G/QoF45wNtB5a",
|
||||
"YkcwHX9DZUGt7tFBNJlA30Fhe33equtz8VinVGN7Tw6iSZSSGmaIUmLHDVH69rYeQaPRlhXJaYm0Rhll",
|
||||
"1GNLtjM4G8EET1Hc4AqkdLK7BvEj5EfeTYQRrZXA9ZM2vw+pKGrvRIvciVQM1pNkDDB+ipIKTwQuJoUk",
|
||||
"dWT7KpF6Jcdcn45xOgXhJJtom5QNj0HmZ4hqxfkOiXNOVkVKt2CiBE6oIEuqLn28Ba7USDI/nXWxjQRj",
|
||||
"mxhGIq995toJPV2SkK3OgwPgPazlhWFIR97iB4YaUdPwxeEJjqZR9LAnHFIOfoofLEK7qNARrcsOK/x3",
|
||||
"+6gtMZDZISSbaMP+IJZhUBK+VsS8vYiZD71SydToBSJa2DHHgcCzzX1LNpVFwao5Rhyh2DZHw9byzWr8",
|
||||
"qDj03I1KoIZiZiAmNHm+ZikoBXay7WrZc4vYk10vS1vUlEcz3mR/vFrU+dUYNziFWcY4CmezKt9FTVjL",
|
||||
"7nguNvYhEytuDSsl58RS4AfVv6p9EZmGRqmQeNMKs0klIfNWO0PLa7iVMgQUzg3TWSEwkEqUbS4ewpLX",
|
||||
"OGQtp+k5TTDEMsw2d5rMO/lbJbnIPJGtouob3Iu20lO+SYKIDMA2UGfzgTq665BCMQv6yXfqNCx7Tmig",
|
||||
"cr2HgJEFg0Ra3npr3lKjUZZhLBu1z567mumBW8Fg6ytizJFhGzPLta4il21aObSSCPPqYSsPjAricsxZ",
|
||||
"oyZaZWqnm1RMyZ4x3iNMMM+aaTwpG2Rm3wZ+1mRH5LkNV1C6ZvHCNXrAJkmUxizlZA6C3CgjKKzTd/ji",
|
||||
"1qYDWLOQWDINtCC9NhP0NmoTC6WebiS4ZIoSo5uBjK5vmjRkoVwhWym5rjXssu/0x8y6jVNKHdDvMK4K",
|
||||
"AIGYZDyFsDOGxJtC35SYOBf8W65ICTJYMAHJm6UdUeBtlG+kzTLSZhlZQ5aRRqJZyAZs8apVOMmtxPJv",
|
||||
"vPEOmWD+DnJ5zVJObOqSqmAr77ZKBcxJcVEVcN6HbARBApPMh6yj9SqDyaOUB2kSuCeu+3r3+v8CAAD/",
|
||||
"/3YBafhs/QEA",
|
||||
}
|
||||
|
||||
// GetSwagger returns the content of the embedded swagger specification file
|
||||
|
||||
146
examples/cron-programmatic/main.go
Normal file
146
examples/cron-programmatic/main.go
Normal file
@@ -0,0 +1,146 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"github.com/joho/godotenv"
|
||||
|
||||
"github.com/hatchet-dev/hatchet/pkg/client"
|
||||
"github.com/hatchet-dev/hatchet/pkg/cmdutils"
|
||||
"github.com/hatchet-dev/hatchet/pkg/worker"
|
||||
)
|
||||
|
||||
// ❓ Create
|
||||
// ... normal workflow definition
|
||||
type printOutput struct{}
|
||||
|
||||
func print(ctx context.Context) (result *printOutput, err error) {
|
||||
fmt.Println("called print:print")
|
||||
|
||||
return &printOutput{}, nil
|
||||
}
|
||||
|
||||
// ,
|
||||
func main() {
|
||||
// ... initialize client, worker and workflow
|
||||
err := godotenv.Load()
|
||||
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
c, err := client.New()
|
||||
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
w, err := worker.NewWorker(
|
||||
worker.WithClient(
|
||||
c,
|
||||
),
|
||||
)
|
||||
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
err = w.RegisterWorkflow(
|
||||
&worker.WorkflowJob{
|
||||
On: worker.NoTrigger(),
|
||||
Name: "cron-workflow",
|
||||
Description: "Demonstrates a simple cron workflow",
|
||||
Steps: []*worker.WorkflowStep{
|
||||
worker.Fn(print),
|
||||
},
|
||||
},
|
||||
)
|
||||
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
interrupt := cmdutils.InterruptChan()
|
||||
|
||||
cleanup, err := w.Start()
|
||||
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
// ,
|
||||
|
||||
go func() {
|
||||
// 👀 define the cron expression to run every minute
|
||||
cron, err := c.Cron().Create(
|
||||
context.Background(),
|
||||
"cron-workflow",
|
||||
&client.CronOpts{
|
||||
Name: "every-minute",
|
||||
Expression: "* * * * *",
|
||||
Input: map[string]interface{}{
|
||||
"message": "Hello, world!",
|
||||
},
|
||||
AdditionalMetadata: map[string]string{},
|
||||
},
|
||||
)
|
||||
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
fmt.Println(*cron.Name, cron.Cron)
|
||||
}()
|
||||
|
||||
// ... wait for interrupt signal
|
||||
|
||||
<-interrupt
|
||||
|
||||
if err := cleanup(); err != nil {
|
||||
panic(fmt.Errorf("error cleaning up: %w", err))
|
||||
}
|
||||
|
||||
// ,
|
||||
}
|
||||
|
||||
// !!
|
||||
|
||||
func ListCrons() {
|
||||
|
||||
c, err := client.New()
|
||||
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
// ❓ List
|
||||
crons, err := c.Cron().List(context.Background())
|
||||
// !!
|
||||
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
for _, cron := range *crons.Rows {
|
||||
fmt.Println(cron.Cron, *cron.Name)
|
||||
}
|
||||
}
|
||||
|
||||
func DeleteCron(id string) {
|
||||
c, err := client.New()
|
||||
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
// ❓ Delete
|
||||
// 👀 id is the cron's metadata id, can get it via cron.Metadata.Id
|
||||
err = c.Cron().Delete(context.Background(), id)
|
||||
// !!
|
||||
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
}
|
||||
@@ -11,6 +11,8 @@ import (
|
||||
"github.com/hatchet-dev/hatchet/pkg/worker"
|
||||
)
|
||||
|
||||
// ❓ Workflow Definition Cron Trigger
|
||||
// ... normal workflow definition
|
||||
type printOutput struct{}
|
||||
|
||||
func print(ctx context.Context) (result *printOutput, err error) {
|
||||
@@ -19,7 +21,9 @@ func print(ctx context.Context) (result *printOutput, err error) {
|
||||
return &printOutput{}, nil
|
||||
}
|
||||
|
||||
// ,
|
||||
func main() {
|
||||
// ... initialize client and worker
|
||||
err := godotenv.Load()
|
||||
|
||||
if err != nil {
|
||||
@@ -42,18 +46,29 @@ func main() {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
err = w.On(
|
||||
worker.Cron("* * * * *"),
|
||||
worker.Fn(print),
|
||||
// ,
|
||||
err = w.RegisterWorkflow(
|
||||
&worker.WorkflowJob{
|
||||
// 👀 define the cron expression to run every minute
|
||||
On: worker.Cron("* * * * *"),
|
||||
Name: "cron-workflow",
|
||||
Description: "Demonstrates a simple cron workflow",
|
||||
Steps: []*worker.WorkflowStep{
|
||||
worker.Fn(print),
|
||||
},
|
||||
},
|
||||
)
|
||||
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
// ... start worker
|
||||
|
||||
interrupt := cmdutils.InterruptChan()
|
||||
|
||||
cleanup, err := w.Start()
|
||||
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
@@ -63,4 +78,8 @@ func main() {
|
||||
if err := cleanup(); err != nil {
|
||||
panic(fmt.Errorf("error cleaning up: %w", err))
|
||||
}
|
||||
|
||||
// ,
|
||||
}
|
||||
|
||||
// !!
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
@@ -11,33 +12,19 @@ import (
|
||||
"github.com/hatchet-dev/hatchet/pkg/worker"
|
||||
)
|
||||
|
||||
type scheduledInput struct {
|
||||
ScheduledAt time.Time `json:"scheduled_at"`
|
||||
ExecuteAt time.Time `json:"scheduled_for"`
|
||||
}
|
||||
|
||||
type stepOneOutput struct {
|
||||
Message string `json:"message"`
|
||||
}
|
||||
|
||||
func StepOne(ctx worker.HatchetContext) (result *stepOneOutput, err error) {
|
||||
input := &scheduledInput{}
|
||||
|
||||
err = ctx.WorkflowInput(input)
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// get time between execute at and scheduled at
|
||||
timeBetween := time.Since(input.ScheduledAt)
|
||||
|
||||
return &stepOneOutput{
|
||||
Message: fmt.Sprintf("This ran %s after scheduling", timeBetween),
|
||||
}, nil
|
||||
// ❓ Create
|
||||
// ... normal workflow definition
|
||||
type printOutput struct{}
|
||||
|
||||
func print(ctx context.Context) (result *printOutput, err error) {
|
||||
fmt.Println("called print:print")
|
||||
|
||||
return &printOutput{}, nil
|
||||
}
|
||||
|
||||
// ,
|
||||
func main() {
|
||||
// ... initialize client, worker and workflow
|
||||
err := godotenv.Load()
|
||||
|
||||
if err != nil {
|
||||
@@ -60,13 +47,13 @@ func main() {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
err = w.On(
|
||||
worker.NoTrigger(),
|
||||
err = w.RegisterWorkflow(
|
||||
&worker.WorkflowJob{
|
||||
Name: "scheduled-workflow",
|
||||
Description: "This runs at a scheduled time.",
|
||||
On: worker.NoTrigger(),
|
||||
Name: "schedule-workflow",
|
||||
Description: "Demonstrates a simple scheduled workflow",
|
||||
Steps: []*worker.WorkflowStep{
|
||||
worker.Fn(StepOne).SetName("step-one"),
|
||||
worker.Fn(print),
|
||||
},
|
||||
},
|
||||
)
|
||||
@@ -75,44 +62,84 @@ func main() {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
interruptCtx, cancel := cmdutils.InterruptContextFromChan(cmdutils.InterruptChan())
|
||||
defer cancel()
|
||||
interrupt := cmdutils.InterruptChan()
|
||||
|
||||
cleanup, err := w.Start()
|
||||
|
||||
if err != nil {
|
||||
panic(fmt.Errorf("error cleaning up: %w", err))
|
||||
panic(err)
|
||||
}
|
||||
|
||||
// ,
|
||||
|
||||
go func() {
|
||||
time.Sleep(5 * time.Second)
|
||||
|
||||
executeAt := time.Now().Add(time.Second * 10)
|
||||
executeAt2 := time.Now().Add(time.Second * 20)
|
||||
executeAt3 := time.Now().Add(time.Second * 30)
|
||||
|
||||
err = c.Admin().ScheduleWorkflow(
|
||||
"scheduled-workflow",
|
||||
client.WithSchedules(executeAt, executeAt2, executeAt3),
|
||||
client.WithInput(&scheduledInput{
|
||||
ScheduledAt: time.Now(),
|
||||
ExecuteAt: executeAt,
|
||||
}),
|
||||
// 👀 define the scheduled workflow to run in a minute
|
||||
schedule, err := c.Schedule().Create(
|
||||
context.Background(),
|
||||
"schedule-workflow",
|
||||
&client.ScheduleOpts{
|
||||
// 👀 define the time to run the scheduled workflow, in UTC
|
||||
TriggerAt: time.Now().UTC().Add(time.Minute),
|
||||
Input: map[string]interface{}{
|
||||
"message": "Hello, world!",
|
||||
},
|
||||
AdditionalMetadata: map[string]string{},
|
||||
},
|
||||
)
|
||||
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
fmt.Println(schedule.TriggerAt, schedule.WorkflowName)
|
||||
}()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-interruptCtx.Done():
|
||||
if err := cleanup(); err != nil {
|
||||
panic(fmt.Errorf("error cleaning up: %w", err))
|
||||
}
|
||||
return
|
||||
default:
|
||||
time.Sleep(time.Second)
|
||||
}
|
||||
// ... wait for interrupt signal
|
||||
|
||||
<-interrupt
|
||||
|
||||
if err := cleanup(); err != nil {
|
||||
panic(fmt.Errorf("error cleaning up: %w", err))
|
||||
}
|
||||
|
||||
// ,
|
||||
}
|
||||
|
||||
// !!
|
||||
|
||||
func ListScheduledWorkflows() {
|
||||
c, err := client.New()
|
||||
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
// ❓ List
|
||||
schedules, err := c.Schedule().List(context.Background())
|
||||
// !!
|
||||
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
for _, schedule := range *schedules.Rows {
|
||||
fmt.Println(schedule.TriggerAt, schedule.WorkflowName)
|
||||
}
|
||||
}
|
||||
|
||||
func DeleteScheduledWorkflow(id string) {
|
||||
c, err := client.New()
|
||||
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
// ❓ Delete
|
||||
// 👀 id is the schedule's metadata id, can get it via schedule.Metadata.Id
|
||||
err = c.Schedule().Delete(context.Background(), id)
|
||||
// !!
|
||||
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -20,7 +20,7 @@ export const columns = ({
|
||||
<DataTableColumnHeader column={column} title="Cron" />
|
||||
),
|
||||
cell: ({ row }) => (
|
||||
<div className="flex flex-row items-center gap-4 pl-4 whitespace-nowrap">
|
||||
<div className="flex flex-row items-center gap-4 whitespace-nowrap">
|
||||
{row.original.cron}
|
||||
</div>
|
||||
),
|
||||
@@ -29,11 +29,11 @@ export const columns = ({
|
||||
{
|
||||
accessorKey: 'readable',
|
||||
header: ({ column }) => (
|
||||
<DataTableColumnHeader column={column} title="Readable" />
|
||||
<DataTableColumnHeader column={column} title="Description" />
|
||||
),
|
||||
cell: ({ row }) => (
|
||||
<div className="flex flex-row items-center gap-4 pl-4">
|
||||
runs {CronPrettifier.toString(row.original.cron).toLowerCase()} UTC
|
||||
<div className="flex flex-row items-center gap-4">
|
||||
Runs {CronPrettifier.toString(row.original.cron).toLowerCase()} UTC
|
||||
</div>
|
||||
),
|
||||
enableSorting: false,
|
||||
@@ -59,7 +59,7 @@ export const columns = ({
|
||||
<DataTableColumnHeader column={column} title="Workflow" />
|
||||
),
|
||||
cell: ({ row }) => (
|
||||
<div className="flex flex-row items-center gap-4 pl-4">
|
||||
<div className="flex flex-row items-center gap-4">
|
||||
<div className="cursor-pointer hover:underline min-w-fit whitespace-nowrap">
|
||||
<Link to={`/workflows/${row.original.workflowId}`}>
|
||||
{row.original.workflowName}
|
||||
@@ -92,7 +92,7 @@ export const columns = ({
|
||||
<DataTableColumnHeader column={column} title="Created At" />
|
||||
),
|
||||
cell: ({ row }) => (
|
||||
<div className="flex flex-row items-center gap-4 pl-4">
|
||||
<div className="flex flex-row items-center gap-4">
|
||||
<RelativeDate date={row.original.metadata.createdAt} />
|
||||
</div>
|
||||
),
|
||||
@@ -128,7 +128,7 @@ export const columns = ({
|
||||
onClick: () => onDeleteClick(row.original),
|
||||
disabled:
|
||||
row.original.method !== 'API'
|
||||
? 'Cannot delete recurring workflow created via workflow code definition'
|
||||
? 'This cron was created via the workflow code definition. Delete it from the workflow definition instead.'
|
||||
: undefined,
|
||||
},
|
||||
]}
|
||||
|
||||
@@ -48,7 +48,7 @@ export const columns = ({
|
||||
<DataTableColumnHeader column={column} title="Trigger At" />
|
||||
),
|
||||
cell: ({ row }) => (
|
||||
<div className="flex flex-row items-center gap-4 pl-4">
|
||||
<div className="flex flex-row items-center gap-4">
|
||||
<RelativeDate date={row.original.triggerAt} />
|
||||
</div>
|
||||
),
|
||||
@@ -59,7 +59,7 @@ export const columns = ({
|
||||
<DataTableColumnHeader column={column} title="Workflow" />
|
||||
),
|
||||
cell: ({ row }) => (
|
||||
<div className="flex flex-row items-center gap-4 pl-4">
|
||||
<div className="flex flex-row items-center gap-4">
|
||||
<div className="cursor-pointer hover:underline min-w-fit whitespace-nowrap">
|
||||
<a href={`/workflows/${row.original.workflowId}`}>
|
||||
{row.original.workflowName}
|
||||
@@ -92,7 +92,7 @@ export const columns = ({
|
||||
<DataTableColumnHeader column={column} title="Created At" />
|
||||
),
|
||||
cell: ({ row }) => (
|
||||
<div className="flex flex-row items-center gap-4 pl-4">
|
||||
<div className="flex flex-row items-center gap-4">
|
||||
<RelativeDate date={row.original.metadata.createdAt} />
|
||||
</div>
|
||||
),
|
||||
|
||||
@@ -3,7 +3,14 @@ import { parseDocComments } from "./codeParser";
|
||||
import { Src } from "./codeData";
|
||||
import CodeStyleRender from "./CodeStyleRender";
|
||||
import { Button } from "../ui/button";
|
||||
import { CheckIcon, CopyIcon, FoldVertical, MoveRight, MoveUpRight, UnfoldVertical } from "lucide-react";
|
||||
import {
|
||||
CheckIcon,
|
||||
CopyIcon,
|
||||
FoldVertical,
|
||||
MoveRight,
|
||||
MoveUpRight,
|
||||
UnfoldVertical,
|
||||
} from "lucide-react";
|
||||
|
||||
interface CodeRendererProps {
|
||||
source: Src;
|
||||
@@ -11,57 +18,110 @@ interface CodeRendererProps {
|
||||
}
|
||||
|
||||
export const CodeBlock = ({ source, target }: CodeRendererProps) => {
|
||||
const [collapsed, setCollapsed] = React.useState(true);
|
||||
const [plainText, setPlainText] = React.useState(false);
|
||||
const [copied, setCopied] = React.useState(false);
|
||||
const [collapsed, setCollapsed] = React.useState(true);
|
||||
const [plainText, setPlainText] = React.useState(false);
|
||||
const [copied, setCopied] = React.useState(false);
|
||||
|
||||
const parsed = parseDocComments(source.raw, target, collapsed);
|
||||
const parsed = parseDocComments(source.raw, target, collapsed);
|
||||
|
||||
return <>
|
||||
<div className="sticky top-0 z-10 bg-background flex flex-row gap-4 justify-between items-center pl-2 mb-2">
|
||||
<a href={source.githubUrl} target="_blank" rel="noopener noreferrer" className="text-xs text-gray-500 font-mono hover:underline">{source.props.path}</a>
|
||||
<div className="flex gap-2 justify-end">
|
||||
<Button variant="ghost" size="sm" onClick={() => setCollapsed(!collapsed)}>
|
||||
{collapsed ? <><UnfoldVertical className="w-4 h-4 mr-2" />Expand</> : <><FoldVertical className="w-4 h-4 mr-2" />Collapse</>}
|
||||
</Button>
|
||||
<Button variant="outline" size="sm" onClick={() => {
|
||||
navigator.clipboard.writeText(parsed);
|
||||
setCopied(true);
|
||||
setTimeout(() => setCopied(false), 2000);
|
||||
}}>
|
||||
{copied ? <><CheckIcon className="w-4 h-4 mr-2" />Copied</> : <><CopyIcon className="w-4 h-4 mr-2" />Copy</>}
|
||||
</Button>
|
||||
</div>
|
||||
return (
|
||||
<>
|
||||
<div className="sticky top-0 z-10 bg-background flex flex-row gap-4 justify-between items-center pl-2 mb-2">
|
||||
<a
|
||||
href={source.githubUrl}
|
||||
target="_blank"
|
||||
rel="noopener noreferrer"
|
||||
className="text-xs text-gray-500 font-mono hover:underline"
|
||||
>
|
||||
{source.props.path}
|
||||
</a>
|
||||
<div className="flex gap-2 justify-end">
|
||||
<Button
|
||||
variant="ghost"
|
||||
size="sm"
|
||||
onClick={() => setCollapsed(!collapsed)}
|
||||
>
|
||||
{collapsed ? (
|
||||
<>
|
||||
<UnfoldVertical className="w-4 h-4 mr-2" />
|
||||
Expand
|
||||
</>
|
||||
) : (
|
||||
<>
|
||||
<FoldVertical className="w-4 h-4 mr-2" />
|
||||
Collapse
|
||||
</>
|
||||
)}
|
||||
</Button>
|
||||
<Button
|
||||
variant="outline"
|
||||
size="sm"
|
||||
onClick={() => {
|
||||
navigator.clipboard.writeText(parsed);
|
||||
setCopied(true);
|
||||
setTimeout(() => setCopied(false), 2000);
|
||||
}}
|
||||
>
|
||||
{copied ? (
|
||||
<>
|
||||
<CheckIcon className="w-4 h-4 mr-2" />
|
||||
Copied
|
||||
</>
|
||||
) : (
|
||||
<>
|
||||
<CopyIcon className="w-4 h-4 mr-2" />
|
||||
Copy
|
||||
</>
|
||||
)}
|
||||
</Button>
|
||||
</div>
|
||||
{!plainText && <CodeStyleRender parsed={parsed} language={source.language || "text"} />}
|
||||
</div>
|
||||
<div>
|
||||
{!plainText && (
|
||||
<CodeStyleRender
|
||||
parsed={parsed}
|
||||
language={source.language || "text"}
|
||||
/>
|
||||
)}
|
||||
{/* plain text for SEO */}
|
||||
<pre style={{ display: plainText ? 'block' : 'none' }} aria-hidden="true">{parsed}</pre>
|
||||
<pre
|
||||
style={{ display: plainText ? "block" : "none" }}
|
||||
aria-hidden="true"
|
||||
>
|
||||
{parsed}
|
||||
</pre>
|
||||
</div>
|
||||
|
||||
|
||||
<div className="flex flex-row mt-2 justify-between">
|
||||
<div className="flex flex-row mt-2 justify-between">
|
||||
<div className="flex gap-4">
|
||||
<a
|
||||
href={source.githubUrl}
|
||||
target="_blank"
|
||||
rel="noopener noreferrer"
|
||||
>
|
||||
<Button variant="outline" size="sm" className="flex flex-row gap-2">
|
||||
<svg height="16" width="16" viewBox="0 0 16 16" fill="currentColor">
|
||||
<path d="M8 0C3.58 0 0 3.58 0 8c0 3.54 2.29 6.53 5.47 7.59.4.07.55-.17.55-.38 0-.19-.01-.82-.01-1.49-2.01.37-2.53-.49-2.69-.94-.09-.23-.48-.94-.82-1.13-.28-.15-.68-.52-.01-.53.63-.01 1.08.58 1.23.82.72 1.21 1.87.87 2.33.66.07-.52.28-.87.51-1.07-1.78-.2-3.64-.89-3.64-3.95 0-.87.31-1.59.82-2.15-.08-.2-.36-1.02.08-2.12 0 0 .67-.21 2.2.82.64-.18 1.32-.27 2-.27.68 0 1.36.09 2 .27 1.53-1.04 2.2-.82 2.2-.82.44 1.1.16 1.92.08 2.12.51.56.82 1.27.82 2.15 0 3.07-1.87 3.75-3.65 3.95.29.25.54.73.54 1.48 0 1.07-.01 1.93-.01 2.2 0 .21.15.46.55.38A8.013 8.013 0 0016 8c0-4.42-3.58-8-8-8z"/>
|
||||
</svg>
|
||||
View Full Code Example on GitHub <MoveUpRight className="w-3 h-3" />
|
||||
</Button>
|
||||
</a>
|
||||
<a href={source.githubUrl} target="_blank" rel="noopener noreferrer">
|
||||
<Button variant="outline" size="sm" className="flex flex-row gap-2">
|
||||
<svg
|
||||
height="16"
|
||||
width="16"
|
||||
viewBox="0 0 16 16"
|
||||
fill="currentColor"
|
||||
>
|
||||
<path d="M8 0C3.58 0 0 3.58 0 8c0 3.54 2.29 6.53 5.47 7.59.4.07.55-.17.55-.38 0-.19-.01-.82-.01-1.49-2.01.37-2.53-.49-2.69-.94-.09-.23-.48-.94-.82-1.13-.28-.15-.68-.52-.01-.53.63-.01 1.08.58 1.23.82.72 1.21 1.87.87 2.33.66.07-.52.28-.87.51-1.07-1.78-.2-3.64-.89-3.64-3.95 0-.87.31-1.59.82-2.15-.08-.2-.36-1.02.08-2.12 0 0 .67-.21 2.2.82.64-.18 1.32-.27 2-.27.68 0 1.36.09 2 .27 1.53-1.04 2.2-.82 2.2-.82.44 1.1.16 1.92.08 2.12.51.56.82 1.27.82 2.15 0 3.07-1.87 3.75-3.65 3.95.29.25.54.73.54 1.48 0 1.07-.01 1.93-.01 2.2 0 .21.15.46.55.38A8.013 8.013 0 0016 8c0-4.42-3.58-8-8-8z" />
|
||||
</svg>
|
||||
View Full Code Example on GitHub{" "}
|
||||
<MoveUpRight className="w-3 h-3" />
|
||||
</Button>
|
||||
</a>
|
||||
</div>
|
||||
<div className="flex gap-4">
|
||||
<Button variant="ghost" size="sm" onClick={() => {
|
||||
setCollapsed(false);
|
||||
setPlainText(!plainText)}
|
||||
}>
|
||||
{plainText ? "Code Highlighted" : "Plain Text"}
|
||||
</Button>
|
||||
<Button
|
||||
variant="ghost"
|
||||
size="sm"
|
||||
onClick={() => {
|
||||
setCollapsed(false);
|
||||
setPlainText(!plainText);
|
||||
}}
|
||||
>
|
||||
{plainText ? "Code Highlighted" : "Plain Text"}
|
||||
</Button>
|
||||
</div>
|
||||
</div>
|
||||
|
||||
</div>
|
||||
</>
|
||||
}
|
||||
);
|
||||
};
|
||||
|
||||
@@ -5,8 +5,8 @@ import React, { useEffect, useState, useMemo } from "react";
|
||||
import { codeToHtml } from "shiki";
|
||||
|
||||
interface CodeStyleRenderProps {
|
||||
parsed: string
|
||||
language: string
|
||||
parsed: string;
|
||||
language: string;
|
||||
}
|
||||
|
||||
const CodeStyleRender = ({ parsed, language }: CodeStyleRenderProps) => {
|
||||
@@ -14,25 +14,27 @@ const CodeStyleRender = ({ parsed, language }: CodeStyleRenderProps) => {
|
||||
const theme = useTheme();
|
||||
|
||||
const themeName = useMemo(() => {
|
||||
return theme.resolvedTheme === "dark" ? "github-dark" : "github-light"
|
||||
}, [theme.resolvedTheme])
|
||||
return theme.resolvedTheme === "dark" ? "github-dark" : "github-light";
|
||||
}, [theme.resolvedTheme]);
|
||||
|
||||
useEffect(() => {
|
||||
const asyncHighlight = async () => {
|
||||
const highlightedHtml = await codeToHtml(parsed, {
|
||||
lang: language.toLowerCase(),
|
||||
theme: themeName,
|
||||
});
|
||||
const highlightedHtml = await codeToHtml(parsed, {
|
||||
lang: language.toLowerCase(),
|
||||
theme: themeName,
|
||||
});
|
||||
|
||||
setHtml(highlightedHtml);
|
||||
}
|
||||
setHtml(highlightedHtml);
|
||||
};
|
||||
|
||||
asyncHighlight();
|
||||
}, [parsed, language, themeName]);
|
||||
|
||||
return <>
|
||||
<div dangerouslySetInnerHTML={{ __html: html }}></div>
|
||||
</>
|
||||
return (
|
||||
<>
|
||||
<div dangerouslySetInnerHTML={{ __html: html }}></div>
|
||||
</>
|
||||
);
|
||||
};
|
||||
|
||||
export default CodeStyleRender;
|
||||
|
||||
@@ -27,7 +27,7 @@ export const CronTriggerPyProgrammaticAsync = {
|
||||
};
|
||||
|
||||
export const CronTriggerGoProgrammatic = {
|
||||
path: "examples/cron-trigger/programatic-crons.go",
|
||||
path: "examples/cron-programmatic/main.go",
|
||||
};
|
||||
|
||||
export const getStaticProps = ({}) =>
|
||||
|
||||
@@ -15,7 +15,7 @@ export const ScheduleTriggerPyAsync = {
|
||||
};
|
||||
|
||||
export const ScheduleTriggerGo = {
|
||||
path: "examples/scheduled/programatic.go",
|
||||
path: "examples/scheduled/main.go",
|
||||
};
|
||||
|
||||
export const getStaticProps = ({}) =>
|
||||
|
||||
@@ -222,6 +222,10 @@ nav {
|
||||
background-color: rgba(40, 99, 159, 0.05) !important;
|
||||
}
|
||||
|
||||
pre.shiki {
|
||||
overflow-x: auto;
|
||||
}
|
||||
|
||||
.dark .shiki {
|
||||
background-color: #1e293b !important;
|
||||
}
|
||||
|
||||
@@ -28,6 +28,8 @@ import (
|
||||
|
||||
type Client interface {
|
||||
Admin() AdminClient
|
||||
Cron() CronClient
|
||||
Schedule() ScheduleClient
|
||||
Dispatcher() DispatcherClient
|
||||
Event() EventClient
|
||||
Subscribe() SubscribeClient
|
||||
@@ -43,6 +45,8 @@ type clientImpl struct {
|
||||
conn *grpc.ClientConn
|
||||
|
||||
admin AdminClient
|
||||
cron CronClient
|
||||
schedule ScheduleClient
|
||||
dispatcher DispatcherClient
|
||||
event EventClient
|
||||
subscribe SubscribeClient
|
||||
@@ -301,6 +305,18 @@ func newFromOpts(opts *ClientOpts) (Client, error) {
|
||||
return nil, fmt.Errorf("could not create cloud REST client: %w", err)
|
||||
}
|
||||
|
||||
cronClient, err := NewCronClient(rest, opts.l, opts.v, opts.tenantId, opts.namespace)
|
||||
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not create cron client: %w", err)
|
||||
}
|
||||
|
||||
scheduleClient, err := NewScheduleClient(rest, opts.l, opts.v, opts.tenantId, opts.namespace)
|
||||
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not create schedule client: %w", err)
|
||||
}
|
||||
|
||||
// if init workflows is set, then we need to initialize the workflows
|
||||
if opts.initWorkflows {
|
||||
if err := initWorkflows(opts.filesLoader, admin); err != nil {
|
||||
@@ -313,6 +329,8 @@ func newFromOpts(opts *ClientOpts) (Client, error) {
|
||||
tenantId: opts.tenantId,
|
||||
l: opts.l,
|
||||
admin: admin,
|
||||
cron: cronClient,
|
||||
schedule: scheduleClient,
|
||||
dispatcher: dispatcher,
|
||||
subscribe: subscribe,
|
||||
event: event,
|
||||
@@ -329,6 +347,14 @@ func (c *clientImpl) Admin() AdminClient {
|
||||
return c.admin
|
||||
}
|
||||
|
||||
func (c *clientImpl) Cron() CronClient {
|
||||
return c.cron
|
||||
}
|
||||
|
||||
func (c *clientImpl) Schedule() ScheduleClient {
|
||||
return c.schedule
|
||||
}
|
||||
|
||||
func (c *clientImpl) Dispatcher() DispatcherClient {
|
||||
return c.dispatcher
|
||||
}
|
||||
|
||||
155
pkg/client/cron.go
Normal file
155
pkg/client/cron.go
Normal file
@@ -0,0 +1,155 @@
|
||||
package client
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"github.com/rs/zerolog"
|
||||
|
||||
"github.com/hatchet-dev/hatchet/api/v1/server/oas/gen"
|
||||
"github.com/hatchet-dev/hatchet/pkg/client/rest"
|
||||
"github.com/hatchet-dev/hatchet/pkg/validator"
|
||||
)
|
||||
|
||||
type CronOpts struct {
|
||||
// Name is the user-friendly name for the cron trigger
|
||||
Name string
|
||||
|
||||
// Expression is the cron expression for the trigger
|
||||
Expression string
|
||||
|
||||
// Input is the input to the workflow
|
||||
Input map[string]interface{}
|
||||
|
||||
// AdditionalMetadata is additional metadata to be stored with the cron trigger
|
||||
AdditionalMetadata map[string]string
|
||||
}
|
||||
|
||||
type CronClient interface {
|
||||
// Create creates a new cron trigger
|
||||
Create(ctx context.Context, workflow string, opts *CronOpts) (*gen.CronWorkflows, error)
|
||||
|
||||
// Delete deletes a cron trigger
|
||||
Delete(ctx context.Context, id string) error
|
||||
|
||||
// List lists all cron triggers
|
||||
List(ctx context.Context) (*gen.CronWorkflowsList, error)
|
||||
}
|
||||
|
||||
type cronClientImpl struct {
|
||||
restClient *rest.ClientWithResponses
|
||||
|
||||
l *zerolog.Logger
|
||||
|
||||
v validator.Validator
|
||||
|
||||
tenantId uuid.UUID
|
||||
|
||||
namespace string
|
||||
}
|
||||
|
||||
func NewCronClient(restClient *rest.ClientWithResponses, l *zerolog.Logger, v validator.Validator, tenantId, namespace string) (CronClient, error) {
|
||||
tenantIdUUID, err := uuid.Parse(tenantId)
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &cronClientImpl{
|
||||
restClient: restClient,
|
||||
l: l,
|
||||
v: v,
|
||||
namespace: namespace,
|
||||
tenantId: tenantIdUUID,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (c *cronClientImpl) Create(ctx context.Context, workflow string, opts *CronOpts) (*gen.CronWorkflows, error) {
|
||||
additionalMeta := make(map[string]any)
|
||||
|
||||
for k, v := range opts.AdditionalMetadata {
|
||||
additionalMeta[k] = v
|
||||
}
|
||||
|
||||
resp, err := c.restClient.CronWorkflowTriggerCreate(
|
||||
ctx,
|
||||
c.tenantId,
|
||||
workflow,
|
||||
rest.CreateCronWorkflowTriggerRequest{
|
||||
CronName: opts.Name,
|
||||
CronExpression: opts.Expression,
|
||||
Input: opts.Input,
|
||||
AdditionalMetadata: additionalMeta,
|
||||
},
|
||||
)
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// if response code is not 200-level, return an error
|
||||
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
|
||||
return nil, fmt.Errorf("unexpected status code: %d", resp.StatusCode)
|
||||
}
|
||||
|
||||
// parse the response body into a cron trigger
|
||||
cron := &gen.CronWorkflows{}
|
||||
|
||||
err = json.NewDecoder(resp.Body).Decode(cron)
|
||||
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not unmarshal response body: %w", err)
|
||||
}
|
||||
|
||||
return cron, nil
|
||||
}
|
||||
|
||||
func (c *cronClientImpl) Delete(ctx context.Context, id string) error {
|
||||
idUUID, err := uuid.Parse(id)
|
||||
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not parse id: %w", err)
|
||||
}
|
||||
|
||||
resp, err := c.restClient.WorkflowCronDelete(
|
||||
ctx,
|
||||
c.tenantId,
|
||||
idUUID,
|
||||
)
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// if response code is not 200-level, return an error
|
||||
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
|
||||
return fmt.Errorf("unexpected status code: %d", resp.StatusCode)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *cronClientImpl) List(ctx context.Context) (*gen.CronWorkflowsList, error) {
|
||||
resp, err := c.restClient.CronWorkflowList(
|
||||
ctx,
|
||||
c.tenantId,
|
||||
&rest.CronWorkflowListParams{},
|
||||
)
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// parse the response body into a list of cron triggers
|
||||
cronList := &gen.CronWorkflowsList{}
|
||||
|
||||
err = json.NewDecoder(resp.Body).Decode(&cronList)
|
||||
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not unmarshal response body: %w", err)
|
||||
}
|
||||
|
||||
return cronList, nil
|
||||
}
|
||||
152
pkg/client/schedule.go
Normal file
152
pkg/client/schedule.go
Normal file
@@ -0,0 +1,152 @@
|
||||
package client
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"github.com/rs/zerolog"
|
||||
|
||||
"github.com/hatchet-dev/hatchet/api/v1/server/oas/gen"
|
||||
"github.com/hatchet-dev/hatchet/pkg/client/rest"
|
||||
"github.com/hatchet-dev/hatchet/pkg/validator"
|
||||
)
|
||||
|
||||
type ScheduleOpts struct {
|
||||
// TriggerAt is the time at which the scheduled run should be triggered
|
||||
TriggerAt time.Time
|
||||
|
||||
// Input is the input to the workflow
|
||||
Input map[string]interface{}
|
||||
|
||||
// AdditionalMetadata is additional metadata to be stored with the cron trigger
|
||||
AdditionalMetadata map[string]string
|
||||
}
|
||||
|
||||
type ScheduleClient interface {
|
||||
// Create creates a new scheduled workflow run
|
||||
Create(ctx context.Context, workflow string, opts *ScheduleOpts) (*gen.ScheduledWorkflows, error)
|
||||
|
||||
// Delete deletes a scheduled workflow run
|
||||
Delete(ctx context.Context, id string) error
|
||||
|
||||
// List lists all scheduled workflow runs
|
||||
List(ctx context.Context) (*gen.ScheduledWorkflowsList, error)
|
||||
}
|
||||
|
||||
type scheduleClientImpl struct {
|
||||
restClient *rest.ClientWithResponses
|
||||
|
||||
l *zerolog.Logger
|
||||
|
||||
v validator.Validator
|
||||
|
||||
tenantId uuid.UUID
|
||||
|
||||
namespace string
|
||||
}
|
||||
|
||||
func NewScheduleClient(restClient *rest.ClientWithResponses, l *zerolog.Logger, v validator.Validator, tenantId, namespace string) (ScheduleClient, error) {
|
||||
tenantIdUUID, err := uuid.Parse(tenantId)
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &scheduleClientImpl{
|
||||
restClient: restClient,
|
||||
l: l,
|
||||
v: v,
|
||||
namespace: namespace,
|
||||
tenantId: tenantIdUUID,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (c *scheduleClientImpl) Create(ctx context.Context, workflow string, opts *ScheduleOpts) (*gen.ScheduledWorkflows, error) {
|
||||
additionalMeta := make(map[string]any)
|
||||
|
||||
for k, v := range opts.AdditionalMetadata {
|
||||
additionalMeta[k] = v
|
||||
}
|
||||
|
||||
resp, err := c.restClient.ScheduledWorkflowRunCreate(
|
||||
ctx,
|
||||
c.tenantId,
|
||||
workflow,
|
||||
rest.ScheduleWorkflowRunRequest{
|
||||
TriggerAt: opts.TriggerAt,
|
||||
Input: opts.Input,
|
||||
AdditionalMetadata: additionalMeta,
|
||||
},
|
||||
)
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// if response code is not 200-level, return an error
|
||||
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
|
||||
return nil, fmt.Errorf("unexpected status code: %d", resp.StatusCode)
|
||||
}
|
||||
|
||||
// parse the response body into a scheduled workflow run
|
||||
scheduledWorkflow := &gen.ScheduledWorkflows{}
|
||||
|
||||
err = json.NewDecoder(resp.Body).Decode(scheduledWorkflow)
|
||||
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not unmarshal response body: %w", err)
|
||||
}
|
||||
|
||||
return scheduledWorkflow, nil
|
||||
}
|
||||
|
||||
func (c *scheduleClientImpl) Delete(ctx context.Context, id string) error {
|
||||
idUUID, err := uuid.Parse(id)
|
||||
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not parse id: %w", err)
|
||||
}
|
||||
|
||||
resp, err := c.restClient.WorkflowScheduledDelete(
|
||||
ctx,
|
||||
c.tenantId,
|
||||
idUUID,
|
||||
)
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// if response code is not 200-level, return an error
|
||||
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
|
||||
return fmt.Errorf("unexpected status code: %d", resp.StatusCode)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *scheduleClientImpl) List(ctx context.Context) (*gen.ScheduledWorkflowsList, error) {
|
||||
resp, err := c.restClient.WorkflowScheduledList(
|
||||
ctx,
|
||||
c.tenantId,
|
||||
&rest.WorkflowScheduledListParams{},
|
||||
)
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// parse the response body into a list of schedules
|
||||
scheduleList := &gen.ScheduledWorkflowsList{}
|
||||
|
||||
err = json.NewDecoder(resp.Body).Decode(&scheduleList)
|
||||
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not unmarshal response body: %w", err)
|
||||
}
|
||||
|
||||
return scheduleList, nil
|
||||
}
|
||||
@@ -1111,10 +1111,6 @@ func (r *workflowEngineRepository) createWorkflowVersionTxs(ctx context.Context,
|
||||
}
|
||||
|
||||
if oldWorkflowVersion != nil {
|
||||
|
||||
fmt.Println("oldWorkflowVersion", sqlchelpers.UUIDToStr(oldWorkflowVersion.WorkflowVersion.ID))
|
||||
fmt.Println("sqlcWorkflowTriggers", sqlchelpers.UUIDToStr(sqlcWorkflowTriggers.ID))
|
||||
|
||||
// move existing api crons to the new workflow version
|
||||
err = r.queries.MoveCronTriggerToNewWorkflowTriggers(ctx, tx, dbsqlc.MoveCronTriggerToNewWorkflowTriggersParams{
|
||||
Oldworkflowversionid: oldWorkflowVersion.WorkflowVersion.ID,
|
||||
|
||||
@@ -10,6 +10,9 @@ CREATE TYPE "WorkflowTriggerCronRefMethods" AS ENUM ('DEFAULT', 'API');
|
||||
CREATE TYPE "WorkflowTriggerScheduledRefMethods" AS ENUM ('DEFAULT', 'API');
|
||||
-- Modify "WorkflowTriggerCronRef" table
|
||||
ALTER TABLE "WorkflowTriggerCronRef" ADD COLUMN "name" text NULL, ADD COLUMN "id" uuid NOT NULL, ADD COLUMN "method" "WorkflowTriggerCronRefMethods" NOT NULL DEFAULT 'DEFAULT', ADD CONSTRAINT "WorkflowTriggerCronRef_parentId_cron_name_key" UNIQUE ("parentId", "cron", "name");
|
||||
|
||||
UPDATE "WorkflowTriggerCronRef" SET "name" = '' WHERE "name" IS NULL;
|
||||
|
||||
-- Modify "WorkflowTriggerScheduledRef" table
|
||||
ALTER TABLE "WorkflowTriggerScheduledRef" ADD COLUMN "method" "WorkflowTriggerScheduledRefMethods" NOT NULL DEFAULT 'DEFAULT';
|
||||
|
||||
|
||||
1
sql/migrations/20241204191714_v0.52.5.sql
Normal file
1
sql/migrations/20241204191714_v0.52.5.sql
Normal file
@@ -0,0 +1 @@
|
||||
UPDATE "WorkflowTriggerCronRef" SET "name" = '' WHERE "name" IS NULL;
|
||||
@@ -1,4 +1,4 @@
|
||||
h1:k8i1YyttxFMF1D3qndWWDz53IGb+MG6BGDN4MUZXqUY=
|
||||
h1:sKzk6H+hzjxCLeAk/CkTouxbldj14qwUpoKyby94tN0=
|
||||
20240115180414_init.sql h1:Ef3ZyjAHkmJPdGF/dEWCahbwgcg6uGJKnDxW2JCRi2k=
|
||||
20240122014727_v0_6_0.sql h1:o/LdlteAeFgoHJ3e/M4Xnghqt9826IE/Y/h0q95Acuo=
|
||||
20240126235456_v0_7_0.sql h1:KiVzt/hXgQ6esbdC6OMJOOWuYEXmy1yeCpmsVAHTFKs=
|
||||
@@ -75,4 +75,5 @@ h1:k8i1YyttxFMF1D3qndWWDz53IGb+MG6BGDN4MUZXqUY=
|
||||
20241029122625_v0.51.0.sql h1:nOa4FqmZxSh1yBOJyduX+j15gQavjizTn660wyXjhNk=
|
||||
20241107162939_v0.51.2.sql h1:qtnUITelb0kzAazo99gdTzejmQeOiE8NTP8b8bpQuF0=
|
||||
20241114175346_v0.51.3.sql h1:ZbpRJsCmt6098ilZ3LtOk9LXRzuuwiznXPJmSkZSRpg=
|
||||
20241121142159_v0.52.0.sql h1:Aw4tw+g2CUe7W/JVD+fDX4tXeP5FLNIU3f8U1jtRMnc=
|
||||
20241121142159_v0.52.0.sql h1:fQdRQTxg2PiuFWbXJ9M4jxJLJsOkuWm5wfIuv8UPBVY=
|
||||
20241204191714_v0.52.5.sql h1:aN6nxVv2oT3vCoxVw33nspjxlx/y3YLvyG6gWtmRGQI=
|
||||
|
||||
Reference in New Issue
Block a user