From b0c6c7cd4654ab1019e8fc4d3d35c8ec52f0785c Mon Sep 17 00:00:00 2001 From: abelanger5 Date: Wed, 4 Dec 2024 16:18:05 -0500 Subject: [PATCH] feat(go-sdk): cron and schedules API, minor fixes (#1083) * feat(go-sdk): cron and schedules API, minor fixes * try to improve code block and docs * revert pre-commit * fix: generate * fix: put overflow in right place * remove branch specs --- .github/workflows/test.yml | 1 + Taskfile.yaml | 6 +- .../openapi/paths/workflow/workflow.yaml | 99 ++++++----- api/v1/server/oas/gen/openapi.gen.go | 44 ++--- examples/cron-programmatic/main.go | 146 +++++++++++++++++ examples/cron/main.go | 25 ++- examples/scheduled/main.go | 137 +++++++++------- .../components/recurring-columns.tsx | 14 +- .../components/scheduled-runs-columns.tsx | 6 +- frontend/docs/components/code/CodeBlock.tsx | 150 ++++++++++++----- .../docs/components/code/CodeStyleRender.tsx | 28 ++-- .../features/triggering-runs/cron-trigger.mdx | 2 +- .../triggering-runs/schedule-trigger.mdx | 2 +- frontend/docs/styles/global.css | 4 + pkg/client/client.go | 26 +++ pkg/client/cron.go | 155 ++++++++++++++++++ pkg/client/schedule.go | 152 +++++++++++++++++ pkg/repository/prisma/workflow.go | 4 - sql/migrations/20241121142159_v0.52.0.sql | 3 + sql/migrations/20241204191714_v0.52.5.sql | 1 + sql/migrations/atlas.sum | 5 +- 21 files changed, 800 insertions(+), 210 deletions(-) create mode 100644 examples/cron-programmatic/main.go create mode 100644 pkg/client/cron.go create mode 100644 pkg/client/schedule.go create mode 100644 sql/migrations/20241204191714_v0.52.5.sql diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index e3be2cd71..615f87366 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -38,6 +38,7 @@ jobs: - name: Generate run: | sh ./hack/db/atlas-apply.sh + task pre-commit-install task generate-all - name: Check for diff diff --git a/Taskfile.yaml b/Taskfile.yaml index 150abec55..e5b3d3df2 100644 --- a/Taskfile.yaml +++ b/Taskfile.yaml @@ -116,7 +116,7 @@ tasks: cmds: - sh ./hack/dev/generate-local-encryption-keys.sh ./hack/dev/encryption-keys init-dev-env: - - sh ./hack/dev/init-dev-token-and-env.sh + - sh ./hack/dev/init-dev-token-and-env.sh generate-dev-api-token: cmds: - sh ./hack/dev/generate-dev-api-token.sh @@ -167,10 +167,8 @@ tasks: - mkdir -p ./python-sdk/certs/ && cp ./hack/dev/certs/ca.cert ./python-sdk/certs/ pre-commit-install: cmds: - - pip install pre-commit + - pip install pre-commit # can use brew install pre-commit if you are on macOS - pre-commit install pre-commit-run: - deps: - - pre-commit-install cmds: - pre-commit run --all-files || pre-commit run --all-files diff --git a/api-contracts/openapi/paths/workflow/workflow.yaml b/api-contracts/openapi/paths/workflow/workflow.yaml index 8cb3a62c8..318e52131 100644 --- a/api-contracts/openapi/paths/workflow/workflow.yaml +++ b/api-contracts/openapi/paths/workflow/workflow.yaml @@ -827,10 +827,9 @@ workflowWorkersCount: tags: - Workflow - scheduledCreate: post: - x-resources: ["tenant", "workflow"] + x-resources: ["tenant"] description: Schedule a new workflow run for a tenant operationId: scheduled-workflow-run:create parameters: @@ -1024,30 +1023,30 @@ scheduled: minLength: 36 maxLength: 36 responses: - "200": - content: - application/json: - schema: - $ref: "../../components/schemas/_index.yaml#/ScheduledWorkflows" - description: Successfully retrieved the workflow runs - "400": - content: - application/json: - schema: - $ref: "../../components/schemas/_index.yaml#/APIErrors" - description: A malformed or bad request - "403": - content: - application/json: - schema: - $ref: "../../components/schemas/_index.yaml#/APIErrors" - description: Forbidden - "404": - content: - application/json: - schema: - $ref: "../../components/schemas/_index.yaml#/APIErrors" - description: Forbidden + "200": + content: + application/json: + schema: + $ref: "../../components/schemas/_index.yaml#/ScheduledWorkflows" + description: Successfully retrieved the workflow runs + "400": + content: + application/json: + schema: + $ref: "../../components/schemas/_index.yaml#/APIErrors" + description: A malformed or bad request + "403": + content: + application/json: + schema: + $ref: "../../components/schemas/_index.yaml#/APIErrors" + description: Forbidden + "404": + content: + application/json: + schema: + $ref: "../../components/schemas/_index.yaml#/APIErrors" + description: Forbidden summary: Get scheduled workflow run tags: - Workflow @@ -1118,30 +1117,30 @@ crons: minLength: 36 maxLength: 36 responses: - "200": - content: - application/json: - schema: - $ref: "../../components/schemas/_index.yaml#/CronWorkflows" - description: Successfully retrieved the workflow runs - "400": - content: - application/json: - schema: - $ref: "../../components/schemas/_index.yaml#/APIErrors" - description: A malformed or bad request - "403": - content: - application/json: - schema: - $ref: "../../components/schemas/_index.yaml#/APIErrors" - description: Forbidden - "404": - content: - application/json: - schema: - $ref: "../../components/schemas/_index.yaml#/APIErrors" - description: Forbidden + "200": + content: + application/json: + schema: + $ref: "../../components/schemas/_index.yaml#/CronWorkflows" + description: Successfully retrieved the workflow runs + "400": + content: + application/json: + schema: + $ref: "../../components/schemas/_index.yaml#/APIErrors" + description: A malformed or bad request + "403": + content: + application/json: + schema: + $ref: "../../components/schemas/_index.yaml#/APIErrors" + description: Forbidden + "404": + content: + application/json: + schema: + $ref: "../../components/schemas/_index.yaml#/APIErrors" + description: Forbidden summary: Get cron job workflow run tags: - Workflow diff --git a/api/v1/server/oas/gen/openapi.gen.go b/api/v1/server/oas/gen/openapi.gen.go index 701206f63..297b31188 100644 --- a/api/v1/server/oas/gen/openapi.gen.go +++ b/api/v1/server/oas/gen/openapi.gen.go @@ -10809,28 +10809,28 @@ var swaggerSpec = []string{ "evWSg5/Zn3ul5By1Xkl6kBvqLDvum6TBgTEZrRbVW+uupN/d1l9p3l/JgKdmDgkG2qjxXFoJA+50gZmd", "4r51HsftUbzrfk3rlSN2ikEWf/+ax9BUlqAETgifzJE09oE017zD7mTMrb69MigqA+4rQdtocUzNNjQp", "ZmHc/I1mLGzm5Kkm+jXD34rFzVfs27osiULQVVH5eoIYFVlcsCPr5bHUCIREttcHS6rEIA1bKbxJKSx3", - "QNmAJvLXqDdssLpQc3VUlcDv8qbZil8r8SsUkjqd2Dbj3CLSl2fh3vOiNCQ13jqsjcxpJNPHg0eAAjAK", - "IBPEiuTRX8y/QsKzfONTNuPOS+G61FM7nlCmsFkL3sI5qXDyaQ3jhuf6ApIWS0hXZP8UwwQfeGmSwGrO", - "xvyiwBs6tFuJe28wTL5CcioGWyPd0Zka0hmDuC1k8vaFTKCXJoi8MDHuRdEDgt2Uyq4/7qiomotzK5Kb", - "JHe2/RoyniAyTUcHHgiCEfAejOR8Gs3iABLIafqSzu9ozyM6ES/j8JUNfUlxeSqHnyPwD4fHNU8LnpjX", - "L887hcAXNcuCiG+GtkZeJtZf55BZwJ1cYHEOS/RhAhKzKBjSr4shjnVtjjUGz/pxxqBriLAomgRwPfTG", - "hv6b0xtH34rpLUfc347eUPiICLQpbCi1Yd6BKd1Wxzcd4Zr17Yu51niKqxNZuVIECMuNKS6w1Retj1WW", - "23MOeznlXWvscwXaOwCeB2NSUaeffceZsU1MUqI2dfN5H3c9piU+OJ+ovvBeBfXxlevor3UIyKvPMySV", - "9t6evhLIUg5WVOSi35vRF+/jrqu+FR18BfTFV97SV031cYqkBegriCYoNJPVeTTBDgodwM7G/QoF45wN", - "tB5aYkcwHX9DFUKt7tFBNJlA30Fhe33equtz8VinVGN7Tw6iSZSSGmaIUmLHDVH69rYeQaPRltXLaYm0", - "Rhll1GNLtjM4G8EET1Hc4AqkdLK7BvEj5EfeTUQUrZXA9ZM2vw+pKGrvRIvciVQM1pNkDDB+ipIKpwQu", - "JoUkdWT7KpF6Jcdcn45xOgXhJJtom5QNj0HmZ4hqxfkOiXNOVkVKt2CiBE6oIEuqLn28Ba7USDKXnXWx", - "jQRjmxhGIq995toJPV2SkK3OgwPgPazlhWFIR97iB4YaUdPwxeEJjqZR9LAnHFIOfoofLKK8qNARrcsO", - "K/x3+wAuMZDZISSbaMP+IJYRURK+VsS8vYiZj8JSydToBSJa2DHHgcCzzX1LNpX1wao5Rhyh2DZdw9by", - "zWr8qDj03I1KoIZiZiAmNDnBZtkoBXay7WrZc4vYk10vS1vUlEcz3mR/vFqU/NUYNziFWYY7CmezKt9F", - "TYTL7nguNvYhEytuDSsl58RSDAjVv6p9EZmGRqmQeNMKs0klIfNWO0PLa7iVMgQUzg3TWSEwkEqUbS40", - "wpLXOGQtp+k5TTDEMsw2d5rMO/lb5bvIPJGtAuwb3Iu20lO+Sa6IDMA2ZmfzMTu665BCMQv6yXfqNCx7", - "Tmigcr2HgJEFg0Ra3npr3lKjUZZhLBu1z567mumBW8Fg66tnzJFhGz7Lta4il21aObSSCPPqYSsPjAri", - "csxZoyZaJW2nm1TMzp4x3iNMME+gaTwpGyRp3wZ+1iRK5GkOV1DFZvEaNnrAJkmUxiz7ZA6C3CgjKKzT", - "d/ji1mYGWLOQWDIjtCC9Nin0NmoTC2WhbiS4ZLYSo5uBDLRvmj9kobQhWym5rjXssu/0x8y6jVNKHdDv", - "MK4KAIGYZDyFsDOGxJtC35SjOBf8W65ICTJYMBfJm2UgUeBtlHqkTTjSJhzZYMIRrWgWsgFbvGoVTnIr", - "sfwbb7xDJpi/g1xes5QTm7qkKtjKu61SAXNSXFQFnPchG0GQwCTzIetovcpg8ijlQZoE7onrvt69/r8A", - "AAD//7hJKYh3/QEA", + "QNmAJvLXqDdssLpQc3VUlcDv8qbZil8r8SsUkjqdeOUil6fe3vOiNCQ1LjqsjUxkJHPGg0eAAjAKIJO+", + "irjR38a/QsJTe+NTNuPOi966fFM7nkWmsFkLXr05qXDyaa3hhjf6ApIWy0JXZP8UwwQfeGmSwGrOxvx2", + "wBs6tFuJe28wTL5CcioGWyPd0Zka0hmDuK1e8vbVS6CXJoi8MDHuRdEDgt2Uyq4/7qiomgtuK5KbJHe2", + "/RoyniAyTUcHHgiCEfAejOR8Gs3iABLIafqSzu9ozyM6Ea/d8JUNfUlxeSqHnyPwD4fHNe8JnpjXL887", + "hcAXhcqCiG+GtjBeJtZf55BZwJ1cYHEOS/RhAhKzKBjSr4shjnVtjjUGz/pxxqBriLAomgRwPfTGhv6b", + "0xtH34rpLUfc347eUPiICLSpZii1Yd6BKd1Wxzcd4Zr17Yu51niKqxNZ+U8ECMuNKS6w1Retj1WW0HMO", + "eznlXWtuiAXaOwCeB2NSUZyffceZhU1MUqI2dfN5H3c99iQ+OJ+ovtpeBfXxlevor/UCyEvOMySV9t6e", + "vhLI8gxWlOGi35vRF+/jrquoFR18BfTFV97SV03JcYqkBegriCYoNJPVeTTBDgodwM7G/QoF45wNtB5a", + "YkcwHX9DZUGt7tFBNJlA30Fhe33equtz8VinVGN7Tw6iSZSSGmaIUmLHDVH69rYeQaPRlhXJaYm0Rhll", + "1GNLtjM4G8EET1Hc4AqkdLK7BvEj5EfeTYQRrZXA9ZM2vw+pKGrvRIvciVQM1pNkDDB+ipIKTwQuJoUk", + "dWT7KpF6Jcdcn45xOgXhJJtom5QNj0HmZ4hqxfkOiXNOVkVKt2CiBE6oIEuqLn28Ba7USDI/nXWxjQRj", + "mxhGIq995toJPV2SkK3OgwPgPazlhWFIR97iB4YaUdPwxeEJjqZR9LAnHFIOfoofLEK7qNARrcsOK/x3", + "+6gtMZDZISSbaMP+IJZhUBK+VsS8vYiZD71SydToBSJa2DHHgcCzzX1LNpVFwao5Rhyh2DZHw9byzWr8", + "qDj03I1KoIZiZiAmNHm+ZikoBXay7WrZc4vYk10vS1vUlEcz3mR/vFrU+dUYNziFWcY4CmezKt9FTVjL", + "7nguNvYhEytuDSsl58RS4AfVv6p9EZmGRqmQeNMKs0klIfNWO0PLa7iVMgQUzg3TWSEwkEqUbS4ewpLX", + "OGQtp+k5TTDEMsw2d5rMO/lbJbnIPJGtouob3Iu20lO+SYKIDMA2UGfzgTq665BCMQv6yXfqNCx7Tmig", + "cr2HgJEFg0Ra3npr3lKjUZZhLBu1z567mumBW8Fg6ytizJFhGzPLta4il21aObSSCPPqYSsPjAricsxZ", + "oyZaZWqnm1RMyZ4x3iNMMM+aaTwpG2Rm3wZ+1mRH5LkNV1C6ZvHCNXrAJkmUxizlZA6C3CgjKKzTd/ji", + "1qYDWLOQWDINtCC9NhP0NmoTC6WebiS4ZIoSo5uBjK5vmjRkoVwhWym5rjXssu/0x8y6jVNKHdDvMK4K", + "AIGYZDyFsDOGxJtC35SYOBf8W65ICTJYMAHJm6UdUeBtlG+kzTLSZhlZQ5aRRqJZyAZs8apVOMmtxPJv", + "vPEOmWD+DnJ5zVJObOqSqmAr77ZKBcxJcVEVcN6HbARBApPMh6yj9SqDyaOUB2kSuCeu+3r3+v8CAAD/", + "/3YBafhs/QEA", } // GetSwagger returns the content of the embedded swagger specification file diff --git a/examples/cron-programmatic/main.go b/examples/cron-programmatic/main.go new file mode 100644 index 000000000..46f396e35 --- /dev/null +++ b/examples/cron-programmatic/main.go @@ -0,0 +1,146 @@ +package main + +import ( + "context" + "fmt" + + "github.com/joho/godotenv" + + "github.com/hatchet-dev/hatchet/pkg/client" + "github.com/hatchet-dev/hatchet/pkg/cmdutils" + "github.com/hatchet-dev/hatchet/pkg/worker" +) + +// ❓ Create +// ... normal workflow definition +type printOutput struct{} + +func print(ctx context.Context) (result *printOutput, err error) { + fmt.Println("called print:print") + + return &printOutput{}, nil +} + +// , +func main() { + // ... initialize client, worker and workflow + err := godotenv.Load() + + if err != nil { + panic(err) + } + + c, err := client.New() + + if err != nil { + panic(err) + } + + w, err := worker.NewWorker( + worker.WithClient( + c, + ), + ) + + if err != nil { + panic(err) + } + + err = w.RegisterWorkflow( + &worker.WorkflowJob{ + On: worker.NoTrigger(), + Name: "cron-workflow", + Description: "Demonstrates a simple cron workflow", + Steps: []*worker.WorkflowStep{ + worker.Fn(print), + }, + }, + ) + + if err != nil { + panic(err) + } + + interrupt := cmdutils.InterruptChan() + + cleanup, err := w.Start() + + if err != nil { + panic(err) + } + + // , + + go func() { + // 👀 define the cron expression to run every minute + cron, err := c.Cron().Create( + context.Background(), + "cron-workflow", + &client.CronOpts{ + Name: "every-minute", + Expression: "* * * * *", + Input: map[string]interface{}{ + "message": "Hello, world!", + }, + AdditionalMetadata: map[string]string{}, + }, + ) + + if err != nil { + panic(err) + } + + fmt.Println(*cron.Name, cron.Cron) + }() + + // ... wait for interrupt signal + + <-interrupt + + if err := cleanup(); err != nil { + panic(fmt.Errorf("error cleaning up: %w", err)) + } + + // , +} + +// !! + +func ListCrons() { + + c, err := client.New() + + if err != nil { + panic(err) + } + + // ❓ List + crons, err := c.Cron().List(context.Background()) + // !! + + if err != nil { + panic(err) + } + + for _, cron := range *crons.Rows { + fmt.Println(cron.Cron, *cron.Name) + } +} + +func DeleteCron(id string) { + c, err := client.New() + + if err != nil { + panic(err) + } + + // ❓ Delete + // 👀 id is the cron's metadata id, can get it via cron.Metadata.Id + err = c.Cron().Delete(context.Background(), id) + // !! + + if err != nil { + panic(err) + } + +} diff --git a/examples/cron/main.go b/examples/cron/main.go index 8e5c62728..c7d4c2d66 100644 --- a/examples/cron/main.go +++ b/examples/cron/main.go @@ -11,6 +11,8 @@ import ( "github.com/hatchet-dev/hatchet/pkg/worker" ) +// ❓ Workflow Definition Cron Trigger +// ... normal workflow definition type printOutput struct{} func print(ctx context.Context) (result *printOutput, err error) { @@ -19,7 +21,9 @@ func print(ctx context.Context) (result *printOutput, err error) { return &printOutput{}, nil } +// , func main() { + // ... initialize client and worker err := godotenv.Load() if err != nil { @@ -42,18 +46,29 @@ func main() { panic(err) } - err = w.On( - worker.Cron("* * * * *"), - worker.Fn(print), + // , + err = w.RegisterWorkflow( + &worker.WorkflowJob{ + // 👀 define the cron expression to run every minute + On: worker.Cron("* * * * *"), + Name: "cron-workflow", + Description: "Demonstrates a simple cron workflow", + Steps: []*worker.WorkflowStep{ + worker.Fn(print), + }, + }, ) if err != nil { panic(err) } + // ... start worker + interrupt := cmdutils.InterruptChan() cleanup, err := w.Start() + if err != nil { panic(err) } @@ -63,4 +78,8 @@ func main() { if err := cleanup(); err != nil { panic(fmt.Errorf("error cleaning up: %w", err)) } + + // , } + +// !! diff --git a/examples/scheduled/main.go b/examples/scheduled/main.go index a4fe859c3..676186399 100644 --- a/examples/scheduled/main.go +++ b/examples/scheduled/main.go @@ -1,6 +1,7 @@ package main import ( + "context" "fmt" "time" @@ -11,33 +12,19 @@ import ( "github.com/hatchet-dev/hatchet/pkg/worker" ) -type scheduledInput struct { - ScheduledAt time.Time `json:"scheduled_at"` - ExecuteAt time.Time `json:"scheduled_for"` -} - -type stepOneOutput struct { - Message string `json:"message"` -} - -func StepOne(ctx worker.HatchetContext) (result *stepOneOutput, err error) { - input := &scheduledInput{} - - err = ctx.WorkflowInput(input) - - if err != nil { - return nil, err - } - - // get time between execute at and scheduled at - timeBetween := time.Since(input.ScheduledAt) - - return &stepOneOutput{ - Message: fmt.Sprintf("This ran %s after scheduling", timeBetween), - }, nil +// ❓ Create +// ... normal workflow definition +type printOutput struct{} + +func print(ctx context.Context) (result *printOutput, err error) { + fmt.Println("called print:print") + + return &printOutput{}, nil } +// , func main() { + // ... initialize client, worker and workflow err := godotenv.Load() if err != nil { @@ -60,13 +47,13 @@ func main() { panic(err) } - err = w.On( - worker.NoTrigger(), + err = w.RegisterWorkflow( &worker.WorkflowJob{ - Name: "scheduled-workflow", - Description: "This runs at a scheduled time.", + On: worker.NoTrigger(), + Name: "schedule-workflow", + Description: "Demonstrates a simple scheduled workflow", Steps: []*worker.WorkflowStep{ - worker.Fn(StepOne).SetName("step-one"), + worker.Fn(print), }, }, ) @@ -75,44 +62,84 @@ func main() { panic(err) } - interruptCtx, cancel := cmdutils.InterruptContextFromChan(cmdutils.InterruptChan()) - defer cancel() + interrupt := cmdutils.InterruptChan() cleanup, err := w.Start() + if err != nil { - panic(fmt.Errorf("error cleaning up: %w", err)) + panic(err) } + // , + go func() { - time.Sleep(5 * time.Second) - - executeAt := time.Now().Add(time.Second * 10) - executeAt2 := time.Now().Add(time.Second * 20) - executeAt3 := time.Now().Add(time.Second * 30) - - err = c.Admin().ScheduleWorkflow( - "scheduled-workflow", - client.WithSchedules(executeAt, executeAt2, executeAt3), - client.WithInput(&scheduledInput{ - ScheduledAt: time.Now(), - ExecuteAt: executeAt, - }), + // 👀 define the scheduled workflow to run in a minute + schedule, err := c.Schedule().Create( + context.Background(), + "schedule-workflow", + &client.ScheduleOpts{ + // 👀 define the time to run the scheduled workflow, in UTC + TriggerAt: time.Now().UTC().Add(time.Minute), + Input: map[string]interface{}{ + "message": "Hello, world!", + }, + AdditionalMetadata: map[string]string{}, + }, ) if err != nil { panic(err) } + + fmt.Println(schedule.TriggerAt, schedule.WorkflowName) }() - for { - select { - case <-interruptCtx.Done(): - if err := cleanup(); err != nil { - panic(fmt.Errorf("error cleaning up: %w", err)) - } - return - default: - time.Sleep(time.Second) - } + // ... wait for interrupt signal + + <-interrupt + + if err := cleanup(); err != nil { + panic(fmt.Errorf("error cleaning up: %w", err)) + } + + // , +} + +// !! + +func ListScheduledWorkflows() { + c, err := client.New() + + if err != nil { + panic(err) + } + + // ❓ List + schedules, err := c.Schedule().List(context.Background()) + // !! + + if err != nil { + panic(err) + } + + for _, schedule := range *schedules.Rows { + fmt.Println(schedule.TriggerAt, schedule.WorkflowName) + } +} + +func DeleteScheduledWorkflow(id string) { + c, err := client.New() + + if err != nil { + panic(err) + } + + // ❓ Delete + // 👀 id is the schedule's metadata id, can get it via schedule.Metadata.Id + err = c.Schedule().Delete(context.Background(), id) + // !! + + if err != nil { + panic(err) } } diff --git a/frontend/app/src/pages/main/recurring/components/recurring-columns.tsx b/frontend/app/src/pages/main/recurring/components/recurring-columns.tsx index 316e79022..bbe3cb4cf 100644 --- a/frontend/app/src/pages/main/recurring/components/recurring-columns.tsx +++ b/frontend/app/src/pages/main/recurring/components/recurring-columns.tsx @@ -20,7 +20,7 @@ export const columns = ({ ), cell: ({ row }) => ( -
+
{row.original.cron}
), @@ -29,11 +29,11 @@ export const columns = ({ { accessorKey: 'readable', header: ({ column }) => ( - + ), cell: ({ row }) => ( -
- runs {CronPrettifier.toString(row.original.cron).toLowerCase()} UTC +
+ Runs {CronPrettifier.toString(row.original.cron).toLowerCase()} UTC
), enableSorting: false, @@ -59,7 +59,7 @@ export const columns = ({ ), cell: ({ row }) => ( -
+
{row.original.workflowName} @@ -92,7 +92,7 @@ export const columns = ({ ), cell: ({ row }) => ( -
+
), @@ -128,7 +128,7 @@ export const columns = ({ onClick: () => onDeleteClick(row.original), disabled: row.original.method !== 'API' - ? 'Cannot delete recurring workflow created via workflow code definition' + ? 'This cron was created via the workflow code definition. Delete it from the workflow definition instead.' : undefined, }, ]} diff --git a/frontend/app/src/pages/main/scheduled-runs/components/scheduled-runs-columns.tsx b/frontend/app/src/pages/main/scheduled-runs/components/scheduled-runs-columns.tsx index 827c7f66c..5f1bcb762 100644 --- a/frontend/app/src/pages/main/scheduled-runs/components/scheduled-runs-columns.tsx +++ b/frontend/app/src/pages/main/scheduled-runs/components/scheduled-runs-columns.tsx @@ -48,7 +48,7 @@ export const columns = ({ ), cell: ({ row }) => ( -
+
), @@ -59,7 +59,7 @@ export const columns = ({ ), cell: ({ row }) => ( -
+
{row.original.workflowName} @@ -92,7 +92,7 @@ export const columns = ({ ), cell: ({ row }) => ( -
+
), diff --git a/frontend/docs/components/code/CodeBlock.tsx b/frontend/docs/components/code/CodeBlock.tsx index f07130160..335b1abe2 100644 --- a/frontend/docs/components/code/CodeBlock.tsx +++ b/frontend/docs/components/code/CodeBlock.tsx @@ -3,7 +3,14 @@ import { parseDocComments } from "./codeParser"; import { Src } from "./codeData"; import CodeStyleRender from "./CodeStyleRender"; import { Button } from "../ui/button"; -import { CheckIcon, CopyIcon, FoldVertical, MoveRight, MoveUpRight, UnfoldVertical } from "lucide-react"; +import { + CheckIcon, + CopyIcon, + FoldVertical, + MoveRight, + MoveUpRight, + UnfoldVertical, +} from "lucide-react"; interface CodeRendererProps { source: Src; @@ -11,57 +18,110 @@ interface CodeRendererProps { } export const CodeBlock = ({ source, target }: CodeRendererProps) => { - const [collapsed, setCollapsed] = React.useState(true); - const [plainText, setPlainText] = React.useState(false); - const [copied, setCopied] = React.useState(false); + const [collapsed, setCollapsed] = React.useState(true); + const [plainText, setPlainText] = React.useState(false); + const [copied, setCopied] = React.useState(false); - const parsed = parseDocComments(source.raw, target, collapsed); + const parsed = parseDocComments(source.raw, target, collapsed); - return <> -
- {source.props.path} -
- - -
+ return ( + <> +
+ + {source.props.path} + +
+ +
- {!plainText && } +
+
+ {!plainText && ( + + )} {/* plain text for SEO */} - + +
- -
+
- +
-
- +
-} + ); +}; diff --git a/frontend/docs/components/code/CodeStyleRender.tsx b/frontend/docs/components/code/CodeStyleRender.tsx index 4da41b9ec..e8cedf23d 100644 --- a/frontend/docs/components/code/CodeStyleRender.tsx +++ b/frontend/docs/components/code/CodeStyleRender.tsx @@ -5,8 +5,8 @@ import React, { useEffect, useState, useMemo } from "react"; import { codeToHtml } from "shiki"; interface CodeStyleRenderProps { - parsed: string - language: string + parsed: string; + language: string; } const CodeStyleRender = ({ parsed, language }: CodeStyleRenderProps) => { @@ -14,25 +14,27 @@ const CodeStyleRender = ({ parsed, language }: CodeStyleRenderProps) => { const theme = useTheme(); const themeName = useMemo(() => { - return theme.resolvedTheme === "dark" ? "github-dark" : "github-light" - }, [theme.resolvedTheme]) + return theme.resolvedTheme === "dark" ? "github-dark" : "github-light"; + }, [theme.resolvedTheme]); useEffect(() => { const asyncHighlight = async () => { - const highlightedHtml = await codeToHtml(parsed, { - lang: language.toLowerCase(), - theme: themeName, - }); + const highlightedHtml = await codeToHtml(parsed, { + lang: language.toLowerCase(), + theme: themeName, + }); - setHtml(highlightedHtml); - } + setHtml(highlightedHtml); + }; asyncHighlight(); }, [parsed, language, themeName]); - return <> -
- + return ( + <> +
+ + ); }; export default CodeStyleRender; diff --git a/frontend/docs/pages/home/features/triggering-runs/cron-trigger.mdx b/frontend/docs/pages/home/features/triggering-runs/cron-trigger.mdx index e2dfc9e4f..2a8632439 100644 --- a/frontend/docs/pages/home/features/triggering-runs/cron-trigger.mdx +++ b/frontend/docs/pages/home/features/triggering-runs/cron-trigger.mdx @@ -27,7 +27,7 @@ export const CronTriggerPyProgrammaticAsync = { }; export const CronTriggerGoProgrammatic = { - path: "examples/cron-trigger/programatic-crons.go", + path: "examples/cron-programmatic/main.go", }; export const getStaticProps = ({}) => diff --git a/frontend/docs/pages/home/features/triggering-runs/schedule-trigger.mdx b/frontend/docs/pages/home/features/triggering-runs/schedule-trigger.mdx index bbf6fa6b1..84f2e5067 100644 --- a/frontend/docs/pages/home/features/triggering-runs/schedule-trigger.mdx +++ b/frontend/docs/pages/home/features/triggering-runs/schedule-trigger.mdx @@ -15,7 +15,7 @@ export const ScheduleTriggerPyAsync = { }; export const ScheduleTriggerGo = { - path: "examples/scheduled/programatic.go", + path: "examples/scheduled/main.go", }; export const getStaticProps = ({}) => diff --git a/frontend/docs/styles/global.css b/frontend/docs/styles/global.css index c257c1226..04dfcf329 100644 --- a/frontend/docs/styles/global.css +++ b/frontend/docs/styles/global.css @@ -222,6 +222,10 @@ nav { background-color: rgba(40, 99, 159, 0.05) !important; } +pre.shiki { + overflow-x: auto; +} + .dark .shiki { background-color: #1e293b !important; } diff --git a/pkg/client/client.go b/pkg/client/client.go index 28d5c601b..24bbe95ea 100644 --- a/pkg/client/client.go +++ b/pkg/client/client.go @@ -28,6 +28,8 @@ import ( type Client interface { Admin() AdminClient + Cron() CronClient + Schedule() ScheduleClient Dispatcher() DispatcherClient Event() EventClient Subscribe() SubscribeClient @@ -43,6 +45,8 @@ type clientImpl struct { conn *grpc.ClientConn admin AdminClient + cron CronClient + schedule ScheduleClient dispatcher DispatcherClient event EventClient subscribe SubscribeClient @@ -301,6 +305,18 @@ func newFromOpts(opts *ClientOpts) (Client, error) { return nil, fmt.Errorf("could not create cloud REST client: %w", err) } + cronClient, err := NewCronClient(rest, opts.l, opts.v, opts.tenantId, opts.namespace) + + if err != nil { + return nil, fmt.Errorf("could not create cron client: %w", err) + } + + scheduleClient, err := NewScheduleClient(rest, opts.l, opts.v, opts.tenantId, opts.namespace) + + if err != nil { + return nil, fmt.Errorf("could not create schedule client: %w", err) + } + // if init workflows is set, then we need to initialize the workflows if opts.initWorkflows { if err := initWorkflows(opts.filesLoader, admin); err != nil { @@ -313,6 +329,8 @@ func newFromOpts(opts *ClientOpts) (Client, error) { tenantId: opts.tenantId, l: opts.l, admin: admin, + cron: cronClient, + schedule: scheduleClient, dispatcher: dispatcher, subscribe: subscribe, event: event, @@ -329,6 +347,14 @@ func (c *clientImpl) Admin() AdminClient { return c.admin } +func (c *clientImpl) Cron() CronClient { + return c.cron +} + +func (c *clientImpl) Schedule() ScheduleClient { + return c.schedule +} + func (c *clientImpl) Dispatcher() DispatcherClient { return c.dispatcher } diff --git a/pkg/client/cron.go b/pkg/client/cron.go new file mode 100644 index 000000000..35bcaa2ff --- /dev/null +++ b/pkg/client/cron.go @@ -0,0 +1,155 @@ +package client + +import ( + "context" + "encoding/json" + "fmt" + + "github.com/google/uuid" + "github.com/rs/zerolog" + + "github.com/hatchet-dev/hatchet/api/v1/server/oas/gen" + "github.com/hatchet-dev/hatchet/pkg/client/rest" + "github.com/hatchet-dev/hatchet/pkg/validator" +) + +type CronOpts struct { + // Name is the user-friendly name for the cron trigger + Name string + + // Expression is the cron expression for the trigger + Expression string + + // Input is the input to the workflow + Input map[string]interface{} + + // AdditionalMetadata is additional metadata to be stored with the cron trigger + AdditionalMetadata map[string]string +} + +type CronClient interface { + // Create creates a new cron trigger + Create(ctx context.Context, workflow string, opts *CronOpts) (*gen.CronWorkflows, error) + + // Delete deletes a cron trigger + Delete(ctx context.Context, id string) error + + // List lists all cron triggers + List(ctx context.Context) (*gen.CronWorkflowsList, error) +} + +type cronClientImpl struct { + restClient *rest.ClientWithResponses + + l *zerolog.Logger + + v validator.Validator + + tenantId uuid.UUID + + namespace string +} + +func NewCronClient(restClient *rest.ClientWithResponses, l *zerolog.Logger, v validator.Validator, tenantId, namespace string) (CronClient, error) { + tenantIdUUID, err := uuid.Parse(tenantId) + + if err != nil { + return nil, err + } + + return &cronClientImpl{ + restClient: restClient, + l: l, + v: v, + namespace: namespace, + tenantId: tenantIdUUID, + }, nil +} + +func (c *cronClientImpl) Create(ctx context.Context, workflow string, opts *CronOpts) (*gen.CronWorkflows, error) { + additionalMeta := make(map[string]any) + + for k, v := range opts.AdditionalMetadata { + additionalMeta[k] = v + } + + resp, err := c.restClient.CronWorkflowTriggerCreate( + ctx, + c.tenantId, + workflow, + rest.CreateCronWorkflowTriggerRequest{ + CronName: opts.Name, + CronExpression: opts.Expression, + Input: opts.Input, + AdditionalMetadata: additionalMeta, + }, + ) + + if err != nil { + return nil, err + } + + // if response code is not 200-level, return an error + if resp.StatusCode < 200 || resp.StatusCode >= 300 { + return nil, fmt.Errorf("unexpected status code: %d", resp.StatusCode) + } + + // parse the response body into a cron trigger + cron := &gen.CronWorkflows{} + + err = json.NewDecoder(resp.Body).Decode(cron) + + if err != nil { + return nil, fmt.Errorf("could not unmarshal response body: %w", err) + } + + return cron, nil +} + +func (c *cronClientImpl) Delete(ctx context.Context, id string) error { + idUUID, err := uuid.Parse(id) + + if err != nil { + return fmt.Errorf("could not parse id: %w", err) + } + + resp, err := c.restClient.WorkflowCronDelete( + ctx, + c.tenantId, + idUUID, + ) + + if err != nil { + return err + } + + // if response code is not 200-level, return an error + if resp.StatusCode < 200 || resp.StatusCode >= 300 { + return fmt.Errorf("unexpected status code: %d", resp.StatusCode) + } + + return nil +} + +func (c *cronClientImpl) List(ctx context.Context) (*gen.CronWorkflowsList, error) { + resp, err := c.restClient.CronWorkflowList( + ctx, + c.tenantId, + &rest.CronWorkflowListParams{}, + ) + + if err != nil { + return nil, err + } + + // parse the response body into a list of cron triggers + cronList := &gen.CronWorkflowsList{} + + err = json.NewDecoder(resp.Body).Decode(&cronList) + + if err != nil { + return nil, fmt.Errorf("could not unmarshal response body: %w", err) + } + + return cronList, nil +} diff --git a/pkg/client/schedule.go b/pkg/client/schedule.go new file mode 100644 index 000000000..7dad0719d --- /dev/null +++ b/pkg/client/schedule.go @@ -0,0 +1,152 @@ +package client + +import ( + "context" + "encoding/json" + "fmt" + "time" + + "github.com/google/uuid" + "github.com/rs/zerolog" + + "github.com/hatchet-dev/hatchet/api/v1/server/oas/gen" + "github.com/hatchet-dev/hatchet/pkg/client/rest" + "github.com/hatchet-dev/hatchet/pkg/validator" +) + +type ScheduleOpts struct { + // TriggerAt is the time at which the scheduled run should be triggered + TriggerAt time.Time + + // Input is the input to the workflow + Input map[string]interface{} + + // AdditionalMetadata is additional metadata to be stored with the cron trigger + AdditionalMetadata map[string]string +} + +type ScheduleClient interface { + // Create creates a new scheduled workflow run + Create(ctx context.Context, workflow string, opts *ScheduleOpts) (*gen.ScheduledWorkflows, error) + + // Delete deletes a scheduled workflow run + Delete(ctx context.Context, id string) error + + // List lists all scheduled workflow runs + List(ctx context.Context) (*gen.ScheduledWorkflowsList, error) +} + +type scheduleClientImpl struct { + restClient *rest.ClientWithResponses + + l *zerolog.Logger + + v validator.Validator + + tenantId uuid.UUID + + namespace string +} + +func NewScheduleClient(restClient *rest.ClientWithResponses, l *zerolog.Logger, v validator.Validator, tenantId, namespace string) (ScheduleClient, error) { + tenantIdUUID, err := uuid.Parse(tenantId) + + if err != nil { + return nil, err + } + + return &scheduleClientImpl{ + restClient: restClient, + l: l, + v: v, + namespace: namespace, + tenantId: tenantIdUUID, + }, nil +} + +func (c *scheduleClientImpl) Create(ctx context.Context, workflow string, opts *ScheduleOpts) (*gen.ScheduledWorkflows, error) { + additionalMeta := make(map[string]any) + + for k, v := range opts.AdditionalMetadata { + additionalMeta[k] = v + } + + resp, err := c.restClient.ScheduledWorkflowRunCreate( + ctx, + c.tenantId, + workflow, + rest.ScheduleWorkflowRunRequest{ + TriggerAt: opts.TriggerAt, + Input: opts.Input, + AdditionalMetadata: additionalMeta, + }, + ) + + if err != nil { + return nil, err + } + + // if response code is not 200-level, return an error + if resp.StatusCode < 200 || resp.StatusCode >= 300 { + return nil, fmt.Errorf("unexpected status code: %d", resp.StatusCode) + } + + // parse the response body into a scheduled workflow run + scheduledWorkflow := &gen.ScheduledWorkflows{} + + err = json.NewDecoder(resp.Body).Decode(scheduledWorkflow) + + if err != nil { + return nil, fmt.Errorf("could not unmarshal response body: %w", err) + } + + return scheduledWorkflow, nil +} + +func (c *scheduleClientImpl) Delete(ctx context.Context, id string) error { + idUUID, err := uuid.Parse(id) + + if err != nil { + return fmt.Errorf("could not parse id: %w", err) + } + + resp, err := c.restClient.WorkflowScheduledDelete( + ctx, + c.tenantId, + idUUID, + ) + + if err != nil { + return err + } + + // if response code is not 200-level, return an error + if resp.StatusCode < 200 || resp.StatusCode >= 300 { + return fmt.Errorf("unexpected status code: %d", resp.StatusCode) + } + + return nil +} + +func (c *scheduleClientImpl) List(ctx context.Context) (*gen.ScheduledWorkflowsList, error) { + resp, err := c.restClient.WorkflowScheduledList( + ctx, + c.tenantId, + &rest.WorkflowScheduledListParams{}, + ) + + if err != nil { + return nil, err + } + + // parse the response body into a list of schedules + scheduleList := &gen.ScheduledWorkflowsList{} + + err = json.NewDecoder(resp.Body).Decode(&scheduleList) + + if err != nil { + return nil, fmt.Errorf("could not unmarshal response body: %w", err) + } + + return scheduleList, nil +} diff --git a/pkg/repository/prisma/workflow.go b/pkg/repository/prisma/workflow.go index 63406db46..703558f16 100644 --- a/pkg/repository/prisma/workflow.go +++ b/pkg/repository/prisma/workflow.go @@ -1111,10 +1111,6 @@ func (r *workflowEngineRepository) createWorkflowVersionTxs(ctx context.Context, } if oldWorkflowVersion != nil { - - fmt.Println("oldWorkflowVersion", sqlchelpers.UUIDToStr(oldWorkflowVersion.WorkflowVersion.ID)) - fmt.Println("sqlcWorkflowTriggers", sqlchelpers.UUIDToStr(sqlcWorkflowTriggers.ID)) - // move existing api crons to the new workflow version err = r.queries.MoveCronTriggerToNewWorkflowTriggers(ctx, tx, dbsqlc.MoveCronTriggerToNewWorkflowTriggersParams{ Oldworkflowversionid: oldWorkflowVersion.WorkflowVersion.ID, diff --git a/sql/migrations/20241121142159_v0.52.0.sql b/sql/migrations/20241121142159_v0.52.0.sql index 9476d02df..c66a438cc 100644 --- a/sql/migrations/20241121142159_v0.52.0.sql +++ b/sql/migrations/20241121142159_v0.52.0.sql @@ -10,6 +10,9 @@ CREATE TYPE "WorkflowTriggerCronRefMethods" AS ENUM ('DEFAULT', 'API'); CREATE TYPE "WorkflowTriggerScheduledRefMethods" AS ENUM ('DEFAULT', 'API'); -- Modify "WorkflowTriggerCronRef" table ALTER TABLE "WorkflowTriggerCronRef" ADD COLUMN "name" text NULL, ADD COLUMN "id" uuid NOT NULL, ADD COLUMN "method" "WorkflowTriggerCronRefMethods" NOT NULL DEFAULT 'DEFAULT', ADD CONSTRAINT "WorkflowTriggerCronRef_parentId_cron_name_key" UNIQUE ("parentId", "cron", "name"); + +UPDATE "WorkflowTriggerCronRef" SET "name" = '' WHERE "name" IS NULL; + -- Modify "WorkflowTriggerScheduledRef" table ALTER TABLE "WorkflowTriggerScheduledRef" ADD COLUMN "method" "WorkflowTriggerScheduledRefMethods" NOT NULL DEFAULT 'DEFAULT'; diff --git a/sql/migrations/20241204191714_v0.52.5.sql b/sql/migrations/20241204191714_v0.52.5.sql new file mode 100644 index 000000000..8c3ea3254 --- /dev/null +++ b/sql/migrations/20241204191714_v0.52.5.sql @@ -0,0 +1 @@ +UPDATE "WorkflowTriggerCronRef" SET "name" = '' WHERE "name" IS NULL; \ No newline at end of file diff --git a/sql/migrations/atlas.sum b/sql/migrations/atlas.sum index 3aaf2867b..7e01254eb 100644 --- a/sql/migrations/atlas.sum +++ b/sql/migrations/atlas.sum @@ -1,4 +1,4 @@ -h1:k8i1YyttxFMF1D3qndWWDz53IGb+MG6BGDN4MUZXqUY= +h1:sKzk6H+hzjxCLeAk/CkTouxbldj14qwUpoKyby94tN0= 20240115180414_init.sql h1:Ef3ZyjAHkmJPdGF/dEWCahbwgcg6uGJKnDxW2JCRi2k= 20240122014727_v0_6_0.sql h1:o/LdlteAeFgoHJ3e/M4Xnghqt9826IE/Y/h0q95Acuo= 20240126235456_v0_7_0.sql h1:KiVzt/hXgQ6esbdC6OMJOOWuYEXmy1yeCpmsVAHTFKs= @@ -75,4 +75,5 @@ h1:k8i1YyttxFMF1D3qndWWDz53IGb+MG6BGDN4MUZXqUY= 20241029122625_v0.51.0.sql h1:nOa4FqmZxSh1yBOJyduX+j15gQavjizTn660wyXjhNk= 20241107162939_v0.51.2.sql h1:qtnUITelb0kzAazo99gdTzejmQeOiE8NTP8b8bpQuF0= 20241114175346_v0.51.3.sql h1:ZbpRJsCmt6098ilZ3LtOk9LXRzuuwiznXPJmSkZSRpg= -20241121142159_v0.52.0.sql h1:Aw4tw+g2CUe7W/JVD+fDX4tXeP5FLNIU3f8U1jtRMnc= +20241121142159_v0.52.0.sql h1:fQdRQTxg2PiuFWbXJ9M4jxJLJsOkuWm5wfIuv8UPBVY= +20241204191714_v0.52.5.sql h1:aN6nxVv2oT3vCoxVw33nspjxlx/y3YLvyG6gWtmRGQI=