add jobs which always run on failure (#445)

* (wip) prisma schema

* feat: on-failure steps

* chore: address changes from PR review

* chore: bump migration number
This commit is contained in:
abelanger5
2024-05-06 15:39:22 -04:00
committed by GitHub
parent 485be72fed
commit 7543a0c2a5
31 changed files with 982 additions and 447 deletions
+1
View File
@@ -28,6 +28,7 @@ message CreateWorkflowVersionOpts {
WorkflowConcurrencyOpts concurrency = 8; // (optional) the workflow concurrency options
optional string schedule_timeout = 9; // (optional) the timeout for the schedule
optional string cron_input = 10; // (optional) the input for the cron trigger
optional CreateWorkflowJobOpts on_failure_job = 11; // (optional) the job to run on failure
}
enum ConcurrencyLimitStrategy {
+1
View File
@@ -149,6 +149,7 @@ func seedDev(repo repository.EngineRepository, tenantId string) error {
Jobs: []repository.CreateWorkflowJobOpts{
{
Name: "job-name",
Kind: "DEFAULT",
Steps: []repository.CreateWorkflowStepOpts{
{
ReadableId: "echo1",
+92
View File
@@ -0,0 +1,92 @@
package main
import (
"fmt"
"time"
"github.com/joho/godotenv"
"github.com/hatchet-dev/hatchet/pkg/client"
"github.com/hatchet-dev/hatchet/pkg/cmdutils"
"github.com/hatchet-dev/hatchet/pkg/worker"
)
type stepOneOutput struct {
Message string `json:"message"`
}
func StepOne(ctx worker.HatchetContext) (result *stepOneOutput, err error) {
return nil, fmt.Errorf("test on failure")
}
func OnFailure(ctx worker.HatchetContext) (result *stepOneOutput, err error) {
return &stepOneOutput{
Message: "Failure!",
}, nil
}
func main() {
err := godotenv.Load()
if err != nil {
panic(err)
}
c, err := client.New()
if err != nil {
panic(err)
}
w, err := worker.NewWorker(
worker.WithClient(
c,
),
)
if err != nil {
panic(err)
}
err = w.On(
worker.NoTrigger(),
&worker.WorkflowJob{
Name: "on-failure-workflow",
Description: "This runs at a scheduled time.",
Steps: []*worker.WorkflowStep{
worker.Fn(StepOne).SetName("step-one"),
},
OnFailure: &worker.WorkflowJob{
Name: "scheduled-workflow-failure",
Description: "This runs when the scheduled workflow fails.",
Steps: []*worker.WorkflowStep{
worker.Fn(OnFailure).SetName("on-failure"),
},
},
},
)
if err != nil {
panic(err)
}
interruptCtx, cancel := cmdutils.InterruptContextFromChan(cmdutils.InterruptChan())
defer cancel()
cleanup, err := w.Start()
if err != nil {
panic(fmt.Errorf("error cleaning up: %w", err))
}
for {
select {
case <-interruptCtx.Done():
if err := cleanup(); err != nil {
panic(fmt.Errorf("error cleaning up: %w", err))
}
return
default:
time.Sleep(time.Second)
}
}
}
@@ -56,8 +56,10 @@ export default function ExpandedWorkflowRun() {
runQuery.data.jobRuns &&
runQuery.data.jobRuns[0].stepRuns
) {
const stepRun = runQuery.data.jobRuns[0].stepRuns.find(
(stepRun) => stepRun.metadata.id === selectedStepRun.metadata.id,
const stepRun = runQuery.data.jobRuns.find((jobRun) =>
jobRun.stepRuns?.find(
(stepRun) => stepRun.metadata.id === selectedStepRun.metadata.id,
),
);
if (!stepRun) {
@@ -8,7 +8,13 @@ interface WorkflowRunsMetricsProps {
}
const calculatePercentage = (value: number, total: number): number => {
return Math.round((value / total) * 100);
const res = Math.round((value / total) * 100);
if (isNaN(res)) {
return 0;
}
return res;
};
export const WorkflowRunsMetricsView: React.FC<WorkflowRunsMetricsProps> = ({
+4 -3
View File
@@ -3,9 +3,8 @@ package repository
import (
"context"
"github.com/jackc/pgx/v5/pgtype"
"github.com/hatchet-dev/hatchet/internal/repository/prisma/db"
"github.com/hatchet-dev/hatchet/internal/repository/prisma/dbsqlc"
)
type UpdateJobRunLookupDataOpts struct {
@@ -36,5 +35,7 @@ type JobRunEngineRepository interface {
// run is being manually replayed, but shouldn't be used by most callers.
SetJobRunStatusRunning(ctx context.Context, tenantId, jobRunId string) error
ListJobRunsForWorkflowRun(ctx context.Context, tenantId, workflowRunId string) ([]pgtype.UUID, error)
ListJobRunsForWorkflowRun(ctx context.Context, tenantId, workflowRunId string) ([]*dbsqlc.ListJobRunsForWorkflowRunRow, error)
GetJobRunByWorkflowRunIdAndJobId(ctx context.Context, tenantId, workflowRunId, jobId string) (*dbsqlc.GetJobRunByWorkflowRunIdAndJobIdRow, error)
}
+15 -2
View File
@@ -108,8 +108,21 @@ WHERE
-- name: ListJobRunsForWorkflowRun :many
SELECT
"id"
"id",
"jobId"
FROM
"JobRun" jr
WHERE
jr."workflowRunId" = @workflowRunId::uuid;
jr."workflowRunId" = @workflowRunId::uuid;
-- name: GetJobRunByWorkflowRunIdAndJobId :one
SELECT
"id",
"jobId",
"status"
FROM
"JobRun" jr
WHERE
jr."tenantId" = @tenantId::uuid
AND jr."workflowRunId" = @workflowRunId::uuid
AND jr."jobId" = @jobId::uuid;
@@ -11,28 +11,66 @@ import (
"github.com/jackc/pgx/v5/pgtype"
)
const getJobRunByWorkflowRunIdAndJobId = `-- name: GetJobRunByWorkflowRunIdAndJobId :one
SELECT
"id",
"jobId",
"status"
FROM
"JobRun" jr
WHERE
jr."tenantId" = $1::uuid
AND jr."workflowRunId" = $2::uuid
AND jr."jobId" = $3::uuid
`
type GetJobRunByWorkflowRunIdAndJobIdParams struct {
Tenantid pgtype.UUID `json:"tenantid"`
Workflowrunid pgtype.UUID `json:"workflowrunid"`
Jobid pgtype.UUID `json:"jobid"`
}
type GetJobRunByWorkflowRunIdAndJobIdRow struct {
ID pgtype.UUID `json:"id"`
JobId pgtype.UUID `json:"jobId"`
Status JobRunStatus `json:"status"`
}
func (q *Queries) GetJobRunByWorkflowRunIdAndJobId(ctx context.Context, db DBTX, arg GetJobRunByWorkflowRunIdAndJobIdParams) (*GetJobRunByWorkflowRunIdAndJobIdRow, error) {
row := db.QueryRow(ctx, getJobRunByWorkflowRunIdAndJobId, arg.Tenantid, arg.Workflowrunid, arg.Jobid)
var i GetJobRunByWorkflowRunIdAndJobIdRow
err := row.Scan(&i.ID, &i.JobId, &i.Status)
return &i, err
}
const listJobRunsForWorkflowRun = `-- name: ListJobRunsForWorkflowRun :many
SELECT
"id"
"id",
"jobId"
FROM
"JobRun" jr
WHERE
jr."workflowRunId" = $1::uuid
`
func (q *Queries) ListJobRunsForWorkflowRun(ctx context.Context, db DBTX, workflowrunid pgtype.UUID) ([]pgtype.UUID, error) {
type ListJobRunsForWorkflowRunRow struct {
ID pgtype.UUID `json:"id"`
JobId pgtype.UUID `json:"jobId"`
}
func (q *Queries) ListJobRunsForWorkflowRun(ctx context.Context, db DBTX, workflowrunid pgtype.UUID) ([]*ListJobRunsForWorkflowRunRow, error) {
rows, err := db.Query(ctx, listJobRunsForWorkflowRun, workflowrunid)
if err != nil {
return nil, err
}
defer rows.Close()
var items []pgtype.UUID
var items []*ListJobRunsForWorkflowRunRow
for rows.Next() {
var id pgtype.UUID
if err := rows.Scan(&id); err != nil {
var i ListJobRunsForWorkflowRunRow
if err := rows.Scan(&i.ID, &i.JobId); err != nil {
return nil, err
}
items = append(items, id)
items = append(items, &i)
}
if err := rows.Err(); err != nil {
return nil, err
@@ -98,6 +98,48 @@ func (ns NullInviteLinkStatus) Value() (driver.Value, error) {
return string(ns.InviteLinkStatus), nil
}
type JobKind string
const (
JobKindDEFAULT JobKind = "DEFAULT"
JobKindONFAILURE JobKind = "ON_FAILURE"
)
func (e *JobKind) Scan(src interface{}) error {
switch s := src.(type) {
case []byte:
*e = JobKind(s)
case string:
*e = JobKind(s)
default:
return fmt.Errorf("unsupported scan type for JobKind: %T", src)
}
return nil
}
type NullJobKind struct {
JobKind JobKind `json:"JobKind"`
Valid bool `json:"valid"` // Valid is true if JobKind is not NULL
}
// Scan implements the Scanner interface.
func (ns *NullJobKind) Scan(value interface{}) error {
if value == nil {
ns.JobKind, ns.Valid = "", false
return nil
}
ns.Valid = true
return ns.JobKind.Scan(value)
}
// Value implements the driver Valuer interface.
func (ns NullJobKind) Value() (driver.Value, error) {
if !ns.Valid {
return nil, nil
}
return string(ns.JobKind), nil
}
type JobRunStatus string
const (
@@ -518,6 +560,7 @@ type Job struct {
Name string `json:"name"`
Description pgtype.Text `json:"description"`
Timeout pgtype.Text `json:"timeout"`
Kind JobKind `json:"kind"`
}
type JobRun struct {
@@ -905,4 +948,5 @@ type WorkflowVersion struct {
WorkflowId pgtype.UUID `json:"workflowId"`
Checksum string `json:"checksum"`
ScheduleTimeout string `json:"scheduleTimeout"`
OnFailureJobId pgtype.UUID `json:"onFailureJobId"`
}
@@ -4,6 +4,9 @@ CREATE TYPE "ConcurrencyLimitStrategy" AS ENUM ('CANCEL_IN_PROGRESS', 'DROP_NEWE
-- CreateEnum
CREATE TYPE "InviteLinkStatus" AS ENUM ('PENDING', 'ACCEPTED', 'REJECTED');
-- CreateEnum
CREATE TYPE "JobKind" AS ENUM ('DEFAULT', 'ON_FAILURE');
-- CreateEnum
CREATE TYPE "JobRunStatus" AS ENUM ('PENDING', 'RUNNING', 'SUCCEEDED', 'FAILED', 'CANCELLED');
@@ -188,6 +191,7 @@ CREATE TABLE "Job" (
"name" TEXT NOT NULL,
"description" TEXT,
"timeout" TEXT,
"kind" "JobKind" NOT NULL DEFAULT 'DEFAULT',
CONSTRAINT "Job_pkey" PRIMARY KEY ("id")
);
@@ -642,6 +646,7 @@ CREATE TABLE "WorkflowVersion" (
"workflowId" UUID NOT NULL,
"checksum" TEXT NOT NULL,
"scheduleTimeout" TEXT NOT NULL DEFAULT '5m',
"onFailureJobId" UUID,
CONSTRAINT "WorkflowVersion_pkey" PRIMARY KEY ("id")
);
@@ -901,6 +906,9 @@ CREATE UNIQUE INDEX "WorkflowTriggers_workflowVersionId_key" ON "WorkflowTrigger
-- CreateIndex
CREATE UNIQUE INDEX "WorkflowVersion_id_key" ON "WorkflowVersion"("id" ASC);
-- CreateIndex
CREATE UNIQUE INDEX "WorkflowVersion_onFailureJobId_key" ON "WorkflowVersion"("onFailureJobId" ASC);
-- CreateIndex
CREATE UNIQUE INDEX "_ActionToWorker_AB_unique" ON "_ActionToWorker"("A" ASC, "B" ASC);
@@ -1177,6 +1185,9 @@ ALTER TABLE "WorkflowTriggers" ADD CONSTRAINT "WorkflowTriggers_tenantId_fkey" F
-- AddForeignKey
ALTER TABLE "WorkflowTriggers" ADD CONSTRAINT "WorkflowTriggers_workflowVersionId_fkey" FOREIGN KEY ("workflowVersionId") REFERENCES "WorkflowVersion"("id") ON DELETE CASCADE ON UPDATE CASCADE;
-- AddForeignKey
ALTER TABLE "WorkflowVersion" ADD CONSTRAINT "WorkflowVersion_onFailureJobId_fkey" FOREIGN KEY ("onFailureJobId") REFERENCES "Job"("id") ON DELETE SET NULL ON UPDATE CASCADE;
-- AddForeignKey
ALTER TABLE "WorkflowVersion" ADD CONSTRAINT "WorkflowVersion_workflowId_fkey" FOREIGN KEY ("workflowId") REFERENCES "Workflow"("id") ON DELETE CASCADE ON UPDATE CASCADE;
@@ -242,13 +242,16 @@ WITH jobRuns AS (
sum(case when runs."status" = 'FAILED' then 1 else 0 end) AS failedRuns,
sum(case when runs."status" = 'CANCELLED' then 1 else 0 end) AS cancelledRuns
FROM "JobRun" as runs
JOIN "Job" as job ON runs."jobId" = job."id"
WHERE
"workflowRunId" = (
SELECT "workflowRunId"
FROM "JobRun"
WHERE "id" = @jobRunId::uuid
) AND
"tenantId" = @tenantId::uuid
runs."tenantId" = @tenantId::uuid AND
-- we should not include onFailure jobs in the calculation
job."kind" = 'DEFAULT'
)
UPDATE "WorkflowRun"
SET "status" = CASE
@@ -568,7 +568,7 @@ const getWorkflowRun = `-- name: GetWorkflowRun :many
SELECT
runs."createdAt", runs."updatedAt", runs."deletedAt", runs."tenantId", runs."workflowVersionId", runs.status, runs.error, runs."startedAt", runs."finishedAt", runs."concurrencyGroupId", runs."displayName", runs.id, runs."gitRepoBranch", runs."childIndex", runs."childKey", runs."parentId", runs."parentStepRunId",
runtriggers.id, runtriggers."createdAt", runtriggers."updatedAt", runtriggers."deletedAt", runtriggers."tenantId", runtriggers."eventId", runtriggers."cronParentId", runtriggers."cronSchedule", runtriggers."scheduledId", runtriggers.input, runtriggers."parentId",
workflowversion.id, workflowversion."createdAt", workflowversion."updatedAt", workflowversion."deletedAt", workflowversion.version, workflowversion."order", workflowversion."workflowId", workflowversion.checksum, workflowversion."scheduleTimeout",
workflowversion.id, workflowversion."createdAt", workflowversion."updatedAt", workflowversion."deletedAt", workflowversion.version, workflowversion."order", workflowversion."workflowId", workflowversion.checksum, workflowversion."scheduleTimeout", workflowversion."onFailureJobId",
workflow."name" as "workflowName",
-- waiting on https://github.com/sqlc-dev/sqlc/pull/2858 for nullable fields
wc."limitStrategy" as "concurrencyLimitStrategy",
@@ -653,6 +653,7 @@ func (q *Queries) GetWorkflowRun(ctx context.Context, db DBTX, arg GetWorkflowRu
&i.WorkflowVersion.WorkflowId,
&i.WorkflowVersion.Checksum,
&i.WorkflowVersion.ScheduleTimeout,
&i.WorkflowVersion.OnFailureJobId,
&i.WorkflowName,
&i.ConcurrencyLimitStrategy,
&i.ConcurrencyMaxRuns,
@@ -691,7 +692,7 @@ SELECT
runs."createdAt", runs."updatedAt", runs."deletedAt", runs."tenantId", runs."workflowVersionId", runs.status, runs.error, runs."startedAt", runs."finishedAt", runs."concurrencyGroupId", runs."displayName", runs.id, runs."gitRepoBranch", runs."childIndex", runs."childKey", runs."parentId", runs."parentStepRunId",
workflow.id, workflow."createdAt", workflow."updatedAt", workflow."deletedAt", workflow."tenantId", workflow.name, workflow.description,
runtriggers.id, runtriggers."createdAt", runtriggers."updatedAt", runtriggers."deletedAt", runtriggers."tenantId", runtriggers."eventId", runtriggers."cronParentId", runtriggers."cronSchedule", runtriggers."scheduledId", runtriggers.input, runtriggers."parentId",
workflowversion.id, workflowversion."createdAt", workflowversion."updatedAt", workflowversion."deletedAt", workflowversion.version, workflowversion."order", workflowversion."workflowId", workflowversion.checksum, workflowversion."scheduleTimeout",
workflowversion.id, workflowversion."createdAt", workflowversion."updatedAt", workflowversion."deletedAt", workflowversion.version, workflowversion."order", workflowversion."workflowId", workflowversion.checksum, workflowversion."scheduleTimeout", workflowversion."onFailureJobId",
-- waiting on https://github.com/sqlc-dev/sqlc/pull/2858 for nullable events field
events.id, events.key, events."createdAt", events."updatedAt"
FROM
@@ -840,6 +841,7 @@ func (q *Queries) ListWorkflowRuns(ctx context.Context, db DBTX, arg ListWorkflo
&i.WorkflowVersion.WorkflowId,
&i.WorkflowVersion.Checksum,
&i.WorkflowVersion.ScheduleTimeout,
&i.WorkflowVersion.OnFailureJobId,
&i.ID,
&i.Key,
&i.CreatedAt,
@@ -969,13 +971,16 @@ WITH jobRuns AS (
sum(case when runs."status" = 'FAILED' then 1 else 0 end) AS failedRuns,
sum(case when runs."status" = 'CANCELLED' then 1 else 0 end) AS cancelledRuns
FROM "JobRun" as runs
JOIN "Job" as job ON runs."jobId" = job."id"
WHERE
"workflowRunId" = (
SELECT "workflowRunId"
FROM "JobRun"
WHERE "id" = $1::uuid
) AND
"tenantId" = $2::uuid
runs."tenantId" = $2::uuid AND
-- we should not include onFailure jobs in the calculation
job."kind" = 'DEFAULT'
)
UPDATE "WorkflowRun"
SET "status" = CASE
@@ -196,7 +196,8 @@ INSERT INTO "Job" (
"workflowVersionId",
"name",
"description",
"timeout"
"timeout",
"kind"
) VALUES (
@id::uuid,
coalesce(sqlc.narg('createdAt')::timestamp, CURRENT_TIMESTAMP),
@@ -206,9 +207,16 @@ INSERT INTO "Job" (
@workflowVersionId::uuid,
@name::text,
@description::text,
@timeout::text
@timeout::text,
coalesce(sqlc.narg('kind')::"JobKind", 'DEFAULT')
) RETURNING *;
-- name: LinkOnFailureJob :one
UPDATE "WorkflowVersion"
SET "onFailureJobId" = @jobId::uuid
WHERE "id" = @workflowVersionId::uuid
RETURNING *;
-- name: CreateStep :one
INSERT INTO "Step" (
"id",
@@ -174,7 +174,8 @@ INSERT INTO "Job" (
"workflowVersionId",
"name",
"description",
"timeout"
"timeout",
"kind"
) VALUES (
$1::uuid,
coalesce($2::timestamp, CURRENT_TIMESTAMP),
@@ -184,8 +185,9 @@ INSERT INTO "Job" (
$6::uuid,
$7::text,
$8::text,
$9::text
) RETURNING id, "createdAt", "updatedAt", "deletedAt", "tenantId", "workflowVersionId", name, description, timeout
$9::text,
coalesce($10::"JobKind", 'DEFAULT')
) RETURNING id, "createdAt", "updatedAt", "deletedAt", "tenantId", "workflowVersionId", name, description, timeout, kind
`
type CreateJobParams struct {
@@ -198,6 +200,7 @@ type CreateJobParams struct {
Name string `json:"name"`
Description string `json:"description"`
Timeout string `json:"timeout"`
Kind NullJobKind `json:"kind"`
}
func (q *Queries) CreateJob(ctx context.Context, db DBTX, arg CreateJobParams) (*Job, error) {
@@ -211,6 +214,7 @@ func (q *Queries) CreateJob(ctx context.Context, db DBTX, arg CreateJobParams) (
arg.Name,
arg.Description,
arg.Timeout,
arg.Kind,
)
var i Job
err := row.Scan(
@@ -223,6 +227,7 @@ func (q *Queries) CreateJob(ctx context.Context, db DBTX, arg CreateJobParams) (
&i.Name,
&i.Description,
&i.Timeout,
&i.Kind,
)
return &i, err
}
@@ -647,7 +652,7 @@ INSERT INTO "WorkflowVersion" (
$6::text,
$7::uuid,
coalesce($8::text, '5m')
) RETURNING id, "createdAt", "updatedAt", "deletedAt", version, "order", "workflowId", checksum, "scheduleTimeout"
) RETURNING id, "createdAt", "updatedAt", "deletedAt", version, "order", "workflowId", checksum, "scheduleTimeout", "onFailureJobId"
`
type CreateWorkflowVersionParams struct {
@@ -683,6 +688,7 @@ func (q *Queries) CreateWorkflowVersion(ctx context.Context, db DBTX, arg Create
&i.WorkflowId,
&i.Checksum,
&i.ScheduleTimeout,
&i.OnFailureJobId,
)
return &i, err
}
@@ -738,7 +744,7 @@ func (q *Queries) GetWorkflowLatestVersion(ctx context.Context, db DBTX, workflo
const getWorkflowVersionForEngine = `-- name: GetWorkflowVersionForEngine :many
SELECT
workflowversions.id, workflowversions."createdAt", workflowversions."updatedAt", workflowversions."deletedAt", workflowversions.version, workflowversions."order", workflowversions."workflowId", workflowversions.checksum, workflowversions."scheduleTimeout",
workflowversions.id, workflowversions."createdAt", workflowversions."updatedAt", workflowversions."deletedAt", workflowversions.version, workflowversions."order", workflowversions."workflowId", workflowversions.checksum, workflowversions."scheduleTimeout", workflowversions."onFailureJobId",
w."name" as "workflowName",
wc."limitStrategy" as "concurrencyLimitStrategy",
wc."maxRuns" as "concurrencyMaxRuns"
@@ -784,6 +790,7 @@ func (q *Queries) GetWorkflowVersionForEngine(ctx context.Context, db DBTX, arg
&i.WorkflowVersion.WorkflowId,
&i.WorkflowVersion.Checksum,
&i.WorkflowVersion.ScheduleTimeout,
&i.WorkflowVersion.OnFailureJobId,
&i.WorkflowName,
&i.ConcurrencyLimitStrategy,
&i.ConcurrencyMaxRuns,
@@ -798,6 +805,36 @@ func (q *Queries) GetWorkflowVersionForEngine(ctx context.Context, db DBTX, arg
return items, nil
}
const linkOnFailureJob = `-- name: LinkOnFailureJob :one
UPDATE "WorkflowVersion"
SET "onFailureJobId" = $1::uuid
WHERE "id" = $2::uuid
RETURNING id, "createdAt", "updatedAt", "deletedAt", version, "order", "workflowId", checksum, "scheduleTimeout", "onFailureJobId"
`
type LinkOnFailureJobParams struct {
Jobid pgtype.UUID `json:"jobid"`
Workflowversionid pgtype.UUID `json:"workflowversionid"`
}
func (q *Queries) LinkOnFailureJob(ctx context.Context, db DBTX, arg LinkOnFailureJobParams) (*WorkflowVersion, error) {
row := db.QueryRow(ctx, linkOnFailureJob, arg.Jobid, arg.Workflowversionid)
var i WorkflowVersion
err := row.Scan(
&i.ID,
&i.CreatedAt,
&i.UpdatedAt,
&i.DeletedAt,
&i.Version,
&i.Order,
&i.WorkflowId,
&i.Checksum,
&i.ScheduleTimeout,
&i.OnFailureJobId,
)
return &i, err
}
const listWorkflows = `-- name: ListWorkflows :many
SELECT
workflows.id, workflows."createdAt", workflows."updatedAt", workflows."deletedAt", workflows."tenantId", workflows.name, workflows.description
@@ -808,7 +845,7 @@ FROM (
"Workflow" as workflows
LEFT JOIN
(
SELECT id, "createdAt", "updatedAt", "deletedAt", version, "order", "workflowId", checksum, "scheduleTimeout" FROM "WorkflowVersion" as workflowVersion ORDER BY workflowVersion."order" DESC LIMIT 1
SELECT id, "createdAt", "updatedAt", "deletedAt", version, "order", "workflowId", checksum, "scheduleTimeout", "onFailureJobId" FROM "WorkflowVersion" as workflowVersion ORDER BY workflowVersion."order" DESC LIMIT 1
) as workflowVersion ON workflows."id" = workflowVersion."workflowId"
LEFT JOIN
"WorkflowTriggers" as workflowTrigger ON workflowVersion."id" = workflowTrigger."workflowVersionId"
+9 -2
View File
@@ -3,7 +3,6 @@ package prisma
import (
"context"
"github.com/jackc/pgx/v5/pgtype"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/rs/zerolog"
@@ -60,10 +59,18 @@ func (j *jobRunEngineRepository) SetJobRunStatusRunning(ctx context.Context, ten
return setJobRunStatusRunning(ctx, j.pool, j.queries, j.l, tenantId, jobRunId)
}
func (j *jobRunEngineRepository) ListJobRunsForWorkflowRun(ctx context.Context, tenantId, workflowRunId string) ([]pgtype.UUID, error) {
func (j *jobRunEngineRepository) ListJobRunsForWorkflowRun(ctx context.Context, tenantId, workflowRunId string) ([]*dbsqlc.ListJobRunsForWorkflowRunRow, error) {
return j.queries.ListJobRunsForWorkflowRun(ctx, j.pool, sqlchelpers.UUIDFromStr(workflowRunId))
}
func (j *jobRunEngineRepository) GetJobRunByWorkflowRunIdAndJobId(ctx context.Context, tenantId, workflowRunId, jobId string) (*dbsqlc.GetJobRunByWorkflowRunIdAndJobIdRow, error) {
return j.queries.GetJobRunByWorkflowRunIdAndJobId(ctx, j.pool, dbsqlc.GetJobRunByWorkflowRunIdAndJobIdParams{
Workflowrunid: sqlchelpers.UUIDFromStr(workflowRunId),
Jobid: sqlchelpers.UUIDFromStr(jobId),
Tenantid: sqlchelpers.UUIDFromStr(tenantId),
})
}
func setJobRunStatusRunning(ctx context.Context, pool *pgxpool.Pool, queries *dbsqlc.Queries, l *zerolog.Logger, tenantId, jobRunId string) error {
tx, err := pool.Begin(ctx)
+7 -18
View File
@@ -157,6 +157,10 @@ func (s *stepRunEngineRepository) ListStepRuns(ctx context.Context, tenantId str
}
}
if opts.JobRunId != nil {
listOpts.JobRunId = sqlchelpers.UUIDFromStr(*opts.JobRunId)
}
srs, err := s.queries.ListStepRuns(ctx, tx, listOpts)
if err != nil {
@@ -875,7 +879,7 @@ func (s *stepRunEngineRepository) updateStepRunCore(
}
if updateParams.Status.Valid &&
isFinalStepRunStatus(updateParams.Status.StepRunStatus) &&
repository.IsFinalStepRunStatus(updateParams.Status.StepRunStatus) &&
// we must have actually updated the status to a different state
string(innerStepRun.Status) != string(updateStepRun.Status) {
_, err := s.queries.UpdateWorkerSemaphore(ctx, tx, dbsqlc.UpdateWorkerSemaphoreParams{
@@ -931,28 +935,13 @@ func (s *stepRunEngineRepository) updateStepRunExtra(
}
return &repository.StepRunUpdateInfo{
JobRunFinalState: isFinalJobRunStatus(jobRun.Status),
WorkflowRunFinalState: isFinalWorkflowRunStatus(workflowRun.Status),
JobRunFinalState: repository.IsFinalJobRunStatus(jobRun.Status),
WorkflowRunFinalState: repository.IsFinalWorkflowRunStatus(workflowRun.Status),
WorkflowRunId: sqlchelpers.UUIDToStr(workflowRun.ID),
WorkflowRunStatus: string(workflowRun.Status),
}, nil
}
func isFinalStepRunStatus(status dbsqlc.StepRunStatus) bool {
return status != dbsqlc.StepRunStatusPENDING &&
status != dbsqlc.StepRunStatusPENDINGASSIGNMENT &&
status != dbsqlc.StepRunStatusASSIGNED &&
status != dbsqlc.StepRunStatusRUNNING
}
func isFinalJobRunStatus(status dbsqlc.JobRunStatus) bool {
return status != dbsqlc.JobRunStatusPENDING && status != dbsqlc.JobRunStatusRUNNING
}
func isFinalWorkflowRunStatus(status dbsqlc.WorkflowRunStatus) bool {
return status != dbsqlc.WorkflowRunStatusPENDING && status != dbsqlc.WorkflowRunStatusRUNNING && status != dbsqlc.WorkflowRunStatusQUEUED
}
// performant query for step run id, only returns what the engine needs
func (s *stepRunEngineRepository) GetStepRunForEngine(ctx context.Context, tenantId, stepRunId string) (*dbsqlc.GetStepRunForEngineRow, error) {
res, err := s.queries.GetStepRunForEngine(ctx, s.pool, dbsqlc.GetStepRunForEngineParams{
+150 -114
View File
@@ -672,130 +672,32 @@ func (r *workflowEngineRepository) createWorkflowVersionTxs(ctx context.Context,
// create the workflow jobs
for _, jobOpts := range opts.Jobs {
jobId := uuid.New().String()
jobCp := jobOpts
var (
description, timeout string
)
_, err := r.createJobTx(ctx, tx, tenantId, sqlcWorkflowVersion.ID, opts, &jobCp)
if jobOpts.Description != nil {
description = *jobOpts.Description
if err != nil {
return "", err
}
}
sqlcJob, err := r.queries.CreateJob(
ctx,
tx,
dbsqlc.CreateJobParams{
ID: sqlchelpers.UUIDFromStr(jobId),
Tenantid: tenantId,
Workflowversionid: sqlcWorkflowVersion.ID,
Name: jobOpts.Name,
Description: description,
Timeout: timeout,
},
)
// create the onFailure job if exists
if opts.OnFailureJob != nil {
onFailureJobCp := *opts.OnFailureJob
jobId, err := r.createJobTx(ctx, tx, tenantId, sqlcWorkflowVersion.ID, opts, &onFailureJobCp)
if err != nil {
return "", err
}
for _, stepOpts := range jobOpts.Steps {
stepId := uuid.New().String()
_, err = r.queries.LinkOnFailureJob(ctx, tx, dbsqlc.LinkOnFailureJobParams{
Workflowversionid: sqlcWorkflowVersion.ID,
Jobid: sqlchelpers.UUIDFromStr(jobId),
})
var (
timeout pgtype.Text
customUserData []byte
retries pgtype.Int4
)
if stepOpts.Timeout != nil {
timeout = sqlchelpers.TextFromStr(*stepOpts.Timeout)
}
if stepOpts.UserData != nil {
customUserData = []byte(*stepOpts.UserData)
}
if stepOpts.Retries != nil {
retries = pgtype.Int4{
Valid: true,
Int32: int32(*stepOpts.Retries),
}
}
// upsert the action
_, err := r.queries.UpsertAction(
ctx,
tx,
dbsqlc.UpsertActionParams{
Action: stepOpts.Action,
Tenantid: tenantId,
},
)
if err != nil {
return "", err
}
createStepParams := dbsqlc.CreateStepParams{
ID: sqlchelpers.UUIDFromStr(stepId),
Tenantid: tenantId,
Jobid: sqlchelpers.UUIDFromStr(jobId),
Actionid: stepOpts.Action,
Timeout: timeout,
Readableid: stepOpts.ReadableId,
CustomUserData: customUserData,
Retries: retries,
}
if opts.ScheduleTimeout != nil {
createStepParams.ScheduleTimeout = sqlchelpers.TextFromStr(*opts.ScheduleTimeout)
}
_, err = r.queries.CreateStep(
ctx,
tx,
createStepParams,
)
if err != nil {
return "", err
}
if len(stepOpts.Parents) > 0 {
err := r.queries.AddStepParents(
ctx,
tx,
dbsqlc.AddStepParentsParams{
ID: sqlchelpers.UUIDFromStr(stepId),
Parents: stepOpts.Parents,
Jobid: sqlcJob.ID,
},
)
if err != nil {
return "", err
}
}
if len(stepOpts.RateLimits) > 0 {
for _, rateLimit := range stepOpts.RateLimits {
_, err := r.queries.CreateStepRateLimit(
ctx,
tx,
dbsqlc.CreateStepRateLimitParams{
Stepid: sqlchelpers.UUIDFromStr(stepId),
Ratelimitkey: rateLimit.Key,
Units: int32(rateLimit.Units),
Tenantid: tenantId,
},
)
if err != nil {
return "", err
}
}
}
if err != nil {
return "", err
}
}
@@ -865,6 +767,140 @@ func (r *workflowEngineRepository) createWorkflowVersionTxs(ctx context.Context,
return workflowVersionId, nil
}
func (r *workflowEngineRepository) createJobTx(ctx context.Context, tx pgx.Tx, tenantId, workflowVersionId pgtype.UUID, opts *repository.CreateWorkflowVersionOpts, jobOpts *repository.CreateWorkflowJobOpts) (string, error) {
jobId := uuid.New().String()
var (
description, timeout string
)
if jobOpts.Description != nil {
description = *jobOpts.Description
}
sqlcJob, err := r.queries.CreateJob(
ctx,
tx,
dbsqlc.CreateJobParams{
ID: sqlchelpers.UUIDFromStr(jobId),
Tenantid: tenantId,
Workflowversionid: workflowVersionId,
Name: jobOpts.Name,
Description: description,
Timeout: timeout,
Kind: dbsqlc.NullJobKind{
Valid: true,
JobKind: dbsqlc.JobKind(jobOpts.Kind),
},
},
)
if err != nil {
return "", err
}
for _, stepOpts := range jobOpts.Steps {
stepId := uuid.New().String()
var (
timeout pgtype.Text
customUserData []byte
retries pgtype.Int4
)
if stepOpts.Timeout != nil {
timeout = sqlchelpers.TextFromStr(*stepOpts.Timeout)
}
if stepOpts.UserData != nil {
customUserData = []byte(*stepOpts.UserData)
}
if stepOpts.Retries != nil {
retries = pgtype.Int4{
Valid: true,
Int32: int32(*stepOpts.Retries),
}
}
// upsert the action
_, err := r.queries.UpsertAction(
ctx,
tx,
dbsqlc.UpsertActionParams{
Action: stepOpts.Action,
Tenantid: tenantId,
},
)
if err != nil {
return "", err
}
createStepParams := dbsqlc.CreateStepParams{
ID: sqlchelpers.UUIDFromStr(stepId),
Tenantid: tenantId,
Jobid: sqlchelpers.UUIDFromStr(jobId),
Actionid: stepOpts.Action,
Timeout: timeout,
Readableid: stepOpts.ReadableId,
CustomUserData: customUserData,
Retries: retries,
}
if opts.ScheduleTimeout != nil {
createStepParams.ScheduleTimeout = sqlchelpers.TextFromStr(*opts.ScheduleTimeout)
}
_, err = r.queries.CreateStep(
ctx,
tx,
createStepParams,
)
if err != nil {
return "", err
}
if len(stepOpts.Parents) > 0 {
err := r.queries.AddStepParents(
ctx,
tx,
dbsqlc.AddStepParentsParams{
ID: sqlchelpers.UUIDFromStr(stepId),
Parents: stepOpts.Parents,
Jobid: sqlcJob.ID,
},
)
if err != nil {
return "", err
}
}
if len(stepOpts.RateLimits) > 0 {
for _, rateLimit := range stepOpts.RateLimits {
_, err := r.queries.CreateStepRateLimit(
ctx,
tx,
dbsqlc.CreateStepRateLimitParams{
Stepid: sqlchelpers.UUIDFromStr(stepId),
Ratelimitkey: rateLimit.Key,
Units: int32(rateLimit.Units),
Tenantid: tenantId,
},
)
if err != nil {
return "", err
}
}
}
}
return jobId, nil
}
func defaultWorkflowPopulator() []db.WorkflowRelationWith {
return []db.WorkflowRelationWith{
db.Workflow.Tags.Fetch(),
+17
View File
@@ -10,11 +10,28 @@ import (
)
type ListStepRunsOpts struct {
JobRunId *string `validate:"omitempty,uuid"`
WorkflowRunIds []string `validate:"dive,uuid"`
Status *dbsqlc.StepRunStatus
}
func IsFinalStepRunStatus(status dbsqlc.StepRunStatus) bool {
return status != dbsqlc.StepRunStatusPENDING &&
status != dbsqlc.StepRunStatusPENDINGASSIGNMENT &&
status != dbsqlc.StepRunStatusASSIGNED &&
status != dbsqlc.StepRunStatusRUNNING
}
func IsFinalJobRunStatus(status dbsqlc.JobRunStatus) bool {
return status != dbsqlc.JobRunStatusPENDING && status != dbsqlc.JobRunStatusRUNNING
}
func IsFinalWorkflowRunStatus(status dbsqlc.WorkflowRunStatus) bool {
return status != dbsqlc.WorkflowRunStatusPENDING && status != dbsqlc.WorkflowRunStatusRUNNING && status != dbsqlc.WorkflowRunStatusQUEUED
}
type UpdateStepRunOpts struct {
IsRerun bool
+5 -1
View File
@@ -38,6 +38,8 @@ type CreateWorkflowVersionOpts struct {
// (required) the workflow jobs
Jobs []CreateWorkflowJobOpts `validate:"required,min=1,dive"`
OnFailureJob *CreateWorkflowJobOpts `json:"onFailureJob,omitempty" validate:"omitempty"`
// (optional) the workflow concurrency groups
Concurrency *CreateWorkflowConcurrencyOpts `json:"concurrency,omitempty" validator:"omitnil"`
@@ -96,6 +98,8 @@ type CreateWorkflowJobOpts struct {
// (required) the job steps
Steps []CreateWorkflowStepOpts `validate:"required,min=1,dive"`
Kind string `validate:"required,oneof=DEFAULT ON_FAILURE"`
}
type CreateWorkflowStepOpts struct {
@@ -106,7 +110,7 @@ type CreateWorkflowStepOpts struct {
Action string `validate:"required,actionId"`
// (optional) the step timeout
Timeout *string `validate:"omitempty,duration"`
Timeout *string `validate:"omitnil,duration"`
// (optional) the parents that this step depends on
Parents []string `validate:"dive,hatchetName"`
+188 -173
View File
@@ -185,6 +185,7 @@ type CreateWorkflowVersionOpts struct {
Concurrency *WorkflowConcurrencyOpts `protobuf:"bytes,8,opt,name=concurrency,proto3" json:"concurrency,omitempty"` // (optional) the workflow concurrency options
ScheduleTimeout *string `protobuf:"bytes,9,opt,name=schedule_timeout,json=scheduleTimeout,proto3,oneof" json:"schedule_timeout,omitempty"` // (optional) the timeout for the schedule
CronInput *string `protobuf:"bytes,10,opt,name=cron_input,json=cronInput,proto3,oneof" json:"cron_input,omitempty"` // (optional) the input for the cron trigger
OnFailureJob *CreateWorkflowJobOpts `protobuf:"bytes,11,opt,name=on_failure_job,json=onFailureJob,proto3,oneof" json:"on_failure_job,omitempty"` // (optional) the job to run on failure
}
func (x *CreateWorkflowVersionOpts) Reset() {
@@ -289,6 +290,13 @@ func (x *CreateWorkflowVersionOpts) GetCronInput() string {
return ""
}
func (x *CreateWorkflowVersionOpts) GetOnFailureJob() *CreateWorkflowJobOpts {
if x != nil {
return x.OnFailureJob
}
return nil
}
type WorkflowConcurrencyOpts struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
@@ -1173,7 +1181,7 @@ var file_workflows_proto_rawDesc = []byte{
0x77, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x2e, 0x0a, 0x04, 0x6f, 0x70, 0x74, 0x73,
0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x57,
0x6f, 0x72, 0x6b, 0x66, 0x6c, 0x6f, 0x77, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x4f, 0x70,
0x74, 0x73, 0x52, 0x04, 0x6f, 0x70, 0x74, 0x73, 0x22, 0xe2, 0x03, 0x0a, 0x19, 0x43, 0x72, 0x65,
0x74, 0x73, 0x52, 0x04, 0x6f, 0x70, 0x74, 0x73, 0x22, 0xb8, 0x04, 0x0a, 0x19, 0x43, 0x72, 0x65,
0x61, 0x74, 0x65, 0x57, 0x6f, 0x72, 0x6b, 0x66, 0x6c, 0x6f, 0x77, 0x56, 0x65, 0x72, 0x73, 0x69,
0x6f, 0x6e, 0x4f, 0x70, 0x74, 0x73, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01,
0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x20, 0x0a, 0x0b, 0x64, 0x65,
@@ -1201,158 +1209,164 @@ var file_workflows_proto_rawDesc = []byte{
0x00, 0x52, 0x0f, 0x73, 0x63, 0x68, 0x65, 0x64, 0x75, 0x6c, 0x65, 0x54, 0x69, 0x6d, 0x65, 0x6f,
0x75, 0x74, 0x88, 0x01, 0x01, 0x12, 0x22, 0x0a, 0x0a, 0x63, 0x72, 0x6f, 0x6e, 0x5f, 0x69, 0x6e,
0x70, 0x75, 0x74, 0x18, 0x0a, 0x20, 0x01, 0x28, 0x09, 0x48, 0x01, 0x52, 0x09, 0x63, 0x72, 0x6f,
0x6e, 0x49, 0x6e, 0x70, 0x75, 0x74, 0x88, 0x01, 0x01, 0x42, 0x13, 0x0a, 0x11, 0x5f, 0x73, 0x63,
0x68, 0x65, 0x64, 0x75, 0x6c, 0x65, 0x5f, 0x74, 0x69, 0x6d, 0x65, 0x6f, 0x75, 0x74, 0x42, 0x0d,
0x0a, 0x0b, 0x5f, 0x63, 0x72, 0x6f, 0x6e, 0x5f, 0x69, 0x6e, 0x70, 0x75, 0x74, 0x22, 0x8e, 0x01,
0x0a, 0x17, 0x57, 0x6f, 0x72, 0x6b, 0x66, 0x6c, 0x6f, 0x77, 0x43, 0x6f, 0x6e, 0x63, 0x75, 0x72,
0x72, 0x65, 0x6e, 0x63, 0x79, 0x4f, 0x70, 0x74, 0x73, 0x12, 0x16, 0x0a, 0x06, 0x61, 0x63, 0x74,
0x69, 0x6f, 0x6e, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x61, 0x63, 0x74, 0x69, 0x6f,
0x6e, 0x12, 0x19, 0x0a, 0x08, 0x6d, 0x61, 0x78, 0x5f, 0x72, 0x75, 0x6e, 0x73, 0x18, 0x02, 0x20,
0x01, 0x28, 0x05, 0x52, 0x07, 0x6d, 0x61, 0x78, 0x52, 0x75, 0x6e, 0x73, 0x12, 0x40, 0x0a, 0x0e,
0x6c, 0x69, 0x6d, 0x69, 0x74, 0x5f, 0x73, 0x74, 0x72, 0x61, 0x74, 0x65, 0x67, 0x79, 0x18, 0x03,
0x20, 0x01, 0x28, 0x0e, 0x32, 0x19, 0x2e, 0x43, 0x6f, 0x6e, 0x63, 0x75, 0x72, 0x72, 0x65, 0x6e,
0x63, 0x79, 0x4c, 0x69, 0x6d, 0x69, 0x74, 0x53, 0x74, 0x72, 0x61, 0x74, 0x65, 0x67, 0x79, 0x52,
0x0d, 0x6c, 0x69, 0x6d, 0x69, 0x74, 0x53, 0x74, 0x72, 0x61, 0x74, 0x65, 0x67, 0x79, 0x22, 0x82,
0x01, 0x0a, 0x15, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x57, 0x6f, 0x72, 0x6b, 0x66, 0x6c, 0x6f,
0x77, 0x4a, 0x6f, 0x62, 0x4f, 0x70, 0x74, 0x73, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65,
0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x20, 0x0a, 0x0b,
0x64, 0x65, 0x73, 0x63, 0x72, 0x69, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x02, 0x20, 0x01, 0x28,
0x09, 0x52, 0x0b, 0x64, 0x65, 0x73, 0x63, 0x72, 0x69, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x2d,
0x0a, 0x05, 0x73, 0x74, 0x65, 0x70, 0x73, 0x18, 0x04, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x17, 0x2e,
0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x57, 0x6f, 0x72, 0x6b, 0x66, 0x6c, 0x6f, 0x77, 0x53, 0x74,
0x65, 0x70, 0x4f, 0x70, 0x74, 0x73, 0x52, 0x05, 0x73, 0x74, 0x65, 0x70, 0x73, 0x4a, 0x04, 0x08,
0x03, 0x10, 0x04, 0x22, 0x8b, 0x02, 0x0a, 0x16, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x57, 0x6f,
0x72, 0x6b, 0x66, 0x6c, 0x6f, 0x77, 0x53, 0x74, 0x65, 0x70, 0x4f, 0x70, 0x74, 0x73, 0x12, 0x1f,
0x0a, 0x0b, 0x72, 0x65, 0x61, 0x64, 0x61, 0x62, 0x6c, 0x65, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20,
0x01, 0x28, 0x09, 0x52, 0x0a, 0x72, 0x65, 0x61, 0x64, 0x61, 0x62, 0x6c, 0x65, 0x49, 0x64, 0x12,
0x16, 0x0a, 0x06, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52,
0x06, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x18, 0x0a, 0x07, 0x74, 0x69, 0x6d, 0x65, 0x6f,
0x75, 0x74, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x74, 0x69, 0x6d, 0x65, 0x6f, 0x75,
0x74, 0x12, 0x16, 0x0a, 0x06, 0x69, 0x6e, 0x70, 0x75, 0x74, 0x73, 0x18, 0x04, 0x20, 0x01, 0x28,
0x09, 0x52, 0x06, 0x69, 0x6e, 0x70, 0x75, 0x74, 0x73, 0x12, 0x18, 0x0a, 0x07, 0x70, 0x61, 0x72,
0x65, 0x6e, 0x74, 0x73, 0x18, 0x05, 0x20, 0x03, 0x28, 0x09, 0x52, 0x07, 0x70, 0x61, 0x72, 0x65,
0x6e, 0x74, 0x73, 0x12, 0x1b, 0x0a, 0x09, 0x75, 0x73, 0x65, 0x72, 0x5f, 0x64, 0x61, 0x74, 0x61,
0x18, 0x06, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x75, 0x73, 0x65, 0x72, 0x44, 0x61, 0x74, 0x61,
0x12, 0x18, 0x0a, 0x07, 0x72, 0x65, 0x74, 0x72, 0x69, 0x65, 0x73, 0x18, 0x07, 0x20, 0x01, 0x28,
0x05, 0x52, 0x07, 0x72, 0x65, 0x74, 0x72, 0x69, 0x65, 0x73, 0x12, 0x35, 0x0a, 0x0b, 0x72, 0x61,
0x74, 0x65, 0x5f, 0x6c, 0x69, 0x6d, 0x69, 0x74, 0x73, 0x18, 0x08, 0x20, 0x03, 0x28, 0x0b, 0x32,
0x14, 0x2e, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x53, 0x74, 0x65, 0x70, 0x52, 0x61, 0x74, 0x65,
0x4c, 0x69, 0x6d, 0x69, 0x74, 0x52, 0x0a, 0x72, 0x61, 0x74, 0x65, 0x4c, 0x69, 0x6d, 0x69, 0x74,
0x73, 0x22, 0x3d, 0x0a, 0x13, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x53, 0x74, 0x65, 0x70, 0x52,
0x61, 0x74, 0x65, 0x4c, 0x69, 0x6d, 0x69, 0x74, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18,
0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x75, 0x6e,
0x69, 0x74, 0x73, 0x18, 0x02, 0x20, 0x01, 0x28, 0x05, 0x52, 0x05, 0x75, 0x6e, 0x69, 0x74, 0x73,
0x22, 0x16, 0x0a, 0x14, 0x4c, 0x69, 0x73, 0x74, 0x57, 0x6f, 0x72, 0x6b, 0x66, 0x6c, 0x6f, 0x77,
0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x22, 0xdc, 0x02, 0x0a, 0x17, 0x53, 0x63, 0x68,
0x65, 0x64, 0x75, 0x6c, 0x65, 0x57, 0x6f, 0x72, 0x6b, 0x66, 0x6c, 0x6f, 0x77, 0x52, 0x65, 0x71,
0x75, 0x65, 0x73, 0x74, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01,
0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x38, 0x0a, 0x09, 0x73, 0x63, 0x68, 0x65,
0x64, 0x75, 0x6c, 0x65, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f,
0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69,
0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x09, 0x73, 0x63, 0x68, 0x65, 0x64, 0x75, 0x6c,
0x65, 0x73, 0x12, 0x14, 0x0a, 0x05, 0x69, 0x6e, 0x70, 0x75, 0x74, 0x18, 0x03, 0x20, 0x01, 0x28,
0x09, 0x52, 0x05, 0x69, 0x6e, 0x70, 0x75, 0x74, 0x12, 0x20, 0x0a, 0x09, 0x70, 0x61, 0x72, 0x65,
0x6e, 0x74, 0x5f, 0x69, 0x64, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x48, 0x00, 0x52, 0x08, 0x70,
0x61, 0x72, 0x65, 0x6e, 0x74, 0x49, 0x64, 0x88, 0x01, 0x01, 0x12, 0x30, 0x0a, 0x12, 0x70, 0x61,
0x72, 0x65, 0x6e, 0x74, 0x5f, 0x73, 0x74, 0x65, 0x70, 0x5f, 0x72, 0x75, 0x6e, 0x5f, 0x69, 0x64,
0x18, 0x05, 0x20, 0x01, 0x28, 0x09, 0x48, 0x01, 0x52, 0x0f, 0x70, 0x61, 0x72, 0x65, 0x6e, 0x74,
0x53, 0x74, 0x65, 0x70, 0x52, 0x75, 0x6e, 0x49, 0x64, 0x88, 0x01, 0x01, 0x12, 0x24, 0x0a, 0x0b,
0x63, 0x68, 0x69, 0x6c, 0x64, 0x5f, 0x69, 0x6e, 0x64, 0x65, 0x78, 0x18, 0x06, 0x20, 0x01, 0x28,
0x05, 0x48, 0x02, 0x52, 0x0a, 0x63, 0x68, 0x69, 0x6c, 0x64, 0x49, 0x6e, 0x64, 0x65, 0x78, 0x88,
0x01, 0x01, 0x12, 0x20, 0x0a, 0x09, 0x63, 0x68, 0x69, 0x6c, 0x64, 0x5f, 0x6b, 0x65, 0x79, 0x18,
0x07, 0x20, 0x01, 0x28, 0x09, 0x48, 0x03, 0x52, 0x08, 0x63, 0x68, 0x69, 0x6c, 0x64, 0x4b, 0x65,
0x79, 0x88, 0x01, 0x01, 0x42, 0x0c, 0x0a, 0x0a, 0x5f, 0x70, 0x61, 0x72, 0x65, 0x6e, 0x74, 0x5f,
0x69, 0x64, 0x42, 0x15, 0x0a, 0x13, 0x5f, 0x70, 0x61, 0x72, 0x65, 0x6e, 0x74, 0x5f, 0x73, 0x74,
0x65, 0x70, 0x5f, 0x72, 0x75, 0x6e, 0x5f, 0x69, 0x64, 0x42, 0x0e, 0x0a, 0x0c, 0x5f, 0x63, 0x68,
0x69, 0x6c, 0x64, 0x5f, 0x69, 0x6e, 0x64, 0x65, 0x78, 0x42, 0x0c, 0x0a, 0x0a, 0x5f, 0x63, 0x68,
0x69, 0x6c, 0x64, 0x5f, 0x6b, 0x65, 0x79, 0x22, 0xe8, 0x01, 0x0a, 0x0f, 0x57, 0x6f, 0x72, 0x6b,
0x66, 0x6c, 0x6f, 0x77, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x0e, 0x0a, 0x02, 0x69,
0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x12, 0x39, 0x0a, 0x0a, 0x63,
0x72, 0x65, 0x61, 0x74, 0x65, 0x64, 0x5f, 0x61, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32,
0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75,
0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x09, 0x63, 0x72, 0x65,
0x61, 0x74, 0x65, 0x64, 0x41, 0x74, 0x12, 0x39, 0x0a, 0x0a, 0x75, 0x70, 0x64, 0x61, 0x74, 0x65,
0x64, 0x5f, 0x61, 0x74, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f,
0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d,
0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x09, 0x75, 0x70, 0x64, 0x61, 0x74, 0x65, 0x64, 0x41,
0x74, 0x12, 0x18, 0x0a, 0x07, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x05, 0x20, 0x01,
0x28, 0x09, 0x52, 0x07, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x14, 0x0a, 0x05, 0x6f,
0x72, 0x64, 0x65, 0x72, 0x18, 0x06, 0x20, 0x01, 0x28, 0x05, 0x52, 0x05, 0x6f, 0x72, 0x64, 0x65,
0x72, 0x12, 0x1f, 0x0a, 0x0b, 0x77, 0x6f, 0x72, 0x6b, 0x66, 0x6c, 0x6f, 0x77, 0x5f, 0x69, 0x64,
0x18, 0x07, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x77, 0x6f, 0x72, 0x6b, 0x66, 0x6c, 0x6f, 0x77,
0x49, 0x64, 0x22, 0x53, 0x0a, 0x17, 0x57, 0x6f, 0x72, 0x6b, 0x66, 0x6c, 0x6f, 0x77, 0x54, 0x72,
0x69, 0x67, 0x67, 0x65, 0x72, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x66, 0x12, 0x1b, 0x0a,
0x09, 0x70, 0x61, 0x72, 0x65, 0x6e, 0x74, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09,
0x52, 0x08, 0x70, 0x61, 0x72, 0x65, 0x6e, 0x74, 0x49, 0x64, 0x12, 0x1b, 0x0a, 0x09, 0x65, 0x76,
0x65, 0x6e, 0x74, 0x5f, 0x6b, 0x65, 0x79, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x65,
0x76, 0x65, 0x6e, 0x74, 0x4b, 0x65, 0x79, 0x22, 0x49, 0x0a, 0x16, 0x57, 0x6f, 0x72, 0x6b, 0x66,
0x6c, 0x6f, 0x77, 0x54, 0x72, 0x69, 0x67, 0x67, 0x65, 0x72, 0x43, 0x72, 0x6f, 0x6e, 0x52, 0x65,
0x66, 0x12, 0x1b, 0x0a, 0x09, 0x70, 0x61, 0x72, 0x65, 0x6e, 0x74, 0x5f, 0x69, 0x64, 0x18, 0x01,
0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x70, 0x61, 0x72, 0x65, 0x6e, 0x74, 0x49, 0x64, 0x12, 0x12,
0x0a, 0x04, 0x63, 0x72, 0x6f, 0x6e, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x63, 0x72,
0x6f, 0x6e, 0x22, 0xa1, 0x02, 0x0a, 0x16, 0x54, 0x72, 0x69, 0x67, 0x67, 0x65, 0x72, 0x57, 0x6f,
0x72, 0x6b, 0x66, 0x6c, 0x6f, 0x77, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x12, 0x0a,
0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d,
0x65, 0x12, 0x14, 0x0a, 0x05, 0x69, 0x6e, 0x70, 0x75, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09,
0x52, 0x05, 0x69, 0x6e, 0x70, 0x75, 0x74, 0x12, 0x20, 0x0a, 0x09, 0x70, 0x61, 0x72, 0x65, 0x6e,
0x74, 0x5f, 0x69, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x48, 0x00, 0x52, 0x08, 0x70, 0x61,
0x72, 0x65, 0x6e, 0x74, 0x49, 0x64, 0x88, 0x01, 0x01, 0x12, 0x30, 0x0a, 0x12, 0x70, 0x61, 0x72,
0x65, 0x6e, 0x74, 0x5f, 0x73, 0x74, 0x65, 0x70, 0x5f, 0x72, 0x75, 0x6e, 0x5f, 0x69, 0x64, 0x18,
0x04, 0x20, 0x01, 0x28, 0x09, 0x48, 0x01, 0x52, 0x0f, 0x70, 0x61, 0x72, 0x65, 0x6e, 0x74, 0x53,
0x74, 0x65, 0x70, 0x52, 0x75, 0x6e, 0x49, 0x64, 0x88, 0x01, 0x01, 0x12, 0x24, 0x0a, 0x0b, 0x63,
0x68, 0x69, 0x6c, 0x64, 0x5f, 0x69, 0x6e, 0x64, 0x65, 0x78, 0x18, 0x05, 0x20, 0x01, 0x28, 0x05,
0x48, 0x02, 0x52, 0x0a, 0x63, 0x68, 0x69, 0x6c, 0x64, 0x49, 0x6e, 0x64, 0x65, 0x78, 0x88, 0x01,
0x01, 0x12, 0x20, 0x0a, 0x09, 0x63, 0x68, 0x69, 0x6c, 0x64, 0x5f, 0x6b, 0x65, 0x79, 0x18, 0x06,
0x20, 0x01, 0x28, 0x09, 0x48, 0x03, 0x52, 0x08, 0x63, 0x68, 0x69, 0x6c, 0x64, 0x4b, 0x65, 0x79,
0x88, 0x01, 0x01, 0x42, 0x0c, 0x0a, 0x0a, 0x5f, 0x70, 0x61, 0x72, 0x65, 0x6e, 0x74, 0x5f, 0x69,
0x64, 0x42, 0x15, 0x0a, 0x13, 0x5f, 0x70, 0x61, 0x72, 0x65, 0x6e, 0x74, 0x5f, 0x73, 0x74, 0x65,
0x70, 0x5f, 0x72, 0x75, 0x6e, 0x5f, 0x69, 0x64, 0x42, 0x0e, 0x0a, 0x0c, 0x5f, 0x63, 0x68, 0x69,
0x6c, 0x64, 0x5f, 0x69, 0x6e, 0x64, 0x65, 0x78, 0x42, 0x0c, 0x0a, 0x0a, 0x5f, 0x63, 0x68, 0x69,
0x6c, 0x64, 0x5f, 0x6b, 0x65, 0x79, 0x22, 0x41, 0x0a, 0x17, 0x54, 0x72, 0x69, 0x67, 0x67, 0x65,
0x72, 0x57, 0x6f, 0x72, 0x6b, 0x66, 0x6c, 0x6f, 0x77, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73,
0x65, 0x12, 0x26, 0x0a, 0x0f, 0x77, 0x6f, 0x72, 0x6b, 0x66, 0x6c, 0x6f, 0x77, 0x5f, 0x72, 0x75,
0x6e, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0d, 0x77, 0x6f, 0x72, 0x6b,
0x66, 0x6c, 0x6f, 0x77, 0x52, 0x75, 0x6e, 0x49, 0x64, 0x22, 0x6d, 0x0a, 0x13, 0x50, 0x75, 0x74,
0x52, 0x61, 0x74, 0x65, 0x4c, 0x69, 0x6d, 0x69, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74,
0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b,
0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x6c, 0x69, 0x6d, 0x69, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28,
0x05, 0x52, 0x05, 0x6c, 0x69, 0x6d, 0x69, 0x74, 0x12, 0x2e, 0x0a, 0x08, 0x64, 0x75, 0x72, 0x61,
0x74, 0x69, 0x6f, 0x6e, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x12, 0x2e, 0x52, 0x61, 0x74,
0x65, 0x4c, 0x69, 0x6d, 0x69, 0x74, 0x44, 0x75, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x08,
0x64, 0x75, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x22, 0x16, 0x0a, 0x14, 0x50, 0x75, 0x74, 0x52,
0x61, 0x74, 0x65, 0x4c, 0x69, 0x6d, 0x69, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65,
0x2a, 0x6c, 0x0a, 0x18, 0x43, 0x6f, 0x6e, 0x63, 0x75, 0x72, 0x72, 0x65, 0x6e, 0x63, 0x79, 0x4c,
0x69, 0x6d, 0x69, 0x74, 0x53, 0x74, 0x72, 0x61, 0x74, 0x65, 0x67, 0x79, 0x12, 0x16, 0x0a, 0x12,
0x43, 0x41, 0x4e, 0x43, 0x45, 0x4c, 0x5f, 0x49, 0x4e, 0x5f, 0x50, 0x52, 0x4f, 0x47, 0x52, 0x45,
0x53, 0x53, 0x10, 0x00, 0x12, 0x0f, 0x0a, 0x0b, 0x44, 0x52, 0x4f, 0x50, 0x5f, 0x4e, 0x45, 0x57,
0x45, 0x53, 0x54, 0x10, 0x01, 0x12, 0x10, 0x0a, 0x0c, 0x51, 0x55, 0x45, 0x55, 0x45, 0x5f, 0x4e,
0x45, 0x57, 0x45, 0x53, 0x54, 0x10, 0x02, 0x12, 0x15, 0x0a, 0x11, 0x47, 0x52, 0x4f, 0x55, 0x50,
0x5f, 0x52, 0x4f, 0x55, 0x4e, 0x44, 0x5f, 0x52, 0x4f, 0x42, 0x49, 0x4e, 0x10, 0x03, 0x2a, 0x35,
0x0a, 0x11, 0x52, 0x61, 0x74, 0x65, 0x4c, 0x69, 0x6d, 0x69, 0x74, 0x44, 0x75, 0x72, 0x61, 0x74,
0x69, 0x6f, 0x6e, 0x12, 0x0a, 0x0a, 0x06, 0x53, 0x45, 0x43, 0x4f, 0x4e, 0x44, 0x10, 0x00, 0x12,
0x0a, 0x0a, 0x06, 0x4d, 0x49, 0x4e, 0x55, 0x54, 0x45, 0x10, 0x01, 0x12, 0x08, 0x0a, 0x04, 0x48,
0x4f, 0x55, 0x52, 0x10, 0x02, 0x32, 0x8a, 0x02, 0x0a, 0x0f, 0x57, 0x6f, 0x72, 0x6b, 0x66, 0x6c,
0x6f, 0x77, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x34, 0x0a, 0x0b, 0x50, 0x75, 0x74,
0x57, 0x6f, 0x72, 0x6b, 0x66, 0x6c, 0x6f, 0x77, 0x12, 0x13, 0x2e, 0x50, 0x75, 0x74, 0x57, 0x6f,
0x72, 0x6b, 0x66, 0x6c, 0x6f, 0x77, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x10, 0x2e,
0x57, 0x6f, 0x72, 0x6b, 0x66, 0x6c, 0x6f, 0x77, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x12,
0x3e, 0x0a, 0x10, 0x53, 0x63, 0x68, 0x65, 0x64, 0x75, 0x6c, 0x65, 0x57, 0x6f, 0x72, 0x6b, 0x66,
0x6c, 0x6f, 0x77, 0x12, 0x18, 0x2e, 0x53, 0x63, 0x68, 0x65, 0x64, 0x75, 0x6c, 0x65, 0x57, 0x6f,
0x72, 0x6b, 0x66, 0x6c, 0x6f, 0x77, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x10, 0x2e,
0x57, 0x6f, 0x72, 0x6b, 0x66, 0x6c, 0x6f, 0x77, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x12,
0x44, 0x0a, 0x0f, 0x54, 0x72, 0x69, 0x67, 0x67, 0x65, 0x72, 0x57, 0x6f, 0x72, 0x6b, 0x66, 0x6c,
0x6f, 0x77, 0x12, 0x17, 0x2e, 0x54, 0x72, 0x69, 0x67, 0x67, 0x65, 0x72, 0x57, 0x6f, 0x72, 0x6b,
0x66, 0x6c, 0x6f, 0x77, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x18, 0x2e, 0x54, 0x72,
0x69, 0x67, 0x67, 0x65, 0x72, 0x57, 0x6f, 0x72, 0x6b, 0x66, 0x6c, 0x6f, 0x77, 0x52, 0x65, 0x73,
0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x3b, 0x0a, 0x0c, 0x50, 0x75, 0x74, 0x52, 0x61, 0x74, 0x65,
0x4c, 0x69, 0x6d, 0x69, 0x74, 0x12, 0x14, 0x2e, 0x50, 0x75, 0x74, 0x52, 0x61, 0x74, 0x65, 0x4c,
0x69, 0x6d, 0x69, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x15, 0x2e, 0x50, 0x75,
0x74, 0x52, 0x61, 0x74, 0x65, 0x4c, 0x69, 0x6d, 0x69, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e,
0x73, 0x65, 0x42, 0x42, 0x5a, 0x40, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d,
0x2f, 0x68, 0x61, 0x74, 0x63, 0x68, 0x65, 0x74, 0x2d, 0x64, 0x65, 0x76, 0x2f, 0x68, 0x61, 0x74,
0x63, 0x68, 0x65, 0x74, 0x2f, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2f, 0x73, 0x65,
0x72, 0x76, 0x69, 0x63, 0x65, 0x73, 0x2f, 0x61, 0x64, 0x6d, 0x69, 0x6e, 0x2f, 0x63, 0x6f, 0x6e,
0x74, 0x72, 0x61, 0x63, 0x74, 0x73, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
0x6e, 0x49, 0x6e, 0x70, 0x75, 0x74, 0x88, 0x01, 0x01, 0x12, 0x41, 0x0a, 0x0e, 0x6f, 0x6e, 0x5f,
0x66, 0x61, 0x69, 0x6c, 0x75, 0x72, 0x65, 0x5f, 0x6a, 0x6f, 0x62, 0x18, 0x0b, 0x20, 0x01, 0x28,
0x0b, 0x32, 0x16, 0x2e, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x57, 0x6f, 0x72, 0x6b, 0x66, 0x6c,
0x6f, 0x77, 0x4a, 0x6f, 0x62, 0x4f, 0x70, 0x74, 0x73, 0x48, 0x02, 0x52, 0x0c, 0x6f, 0x6e, 0x46,
0x61, 0x69, 0x6c, 0x75, 0x72, 0x65, 0x4a, 0x6f, 0x62, 0x88, 0x01, 0x01, 0x42, 0x13, 0x0a, 0x11,
0x5f, 0x73, 0x63, 0x68, 0x65, 0x64, 0x75, 0x6c, 0x65, 0x5f, 0x74, 0x69, 0x6d, 0x65, 0x6f, 0x75,
0x74, 0x42, 0x0d, 0x0a, 0x0b, 0x5f, 0x63, 0x72, 0x6f, 0x6e, 0x5f, 0x69, 0x6e, 0x70, 0x75, 0x74,
0x42, 0x11, 0x0a, 0x0f, 0x5f, 0x6f, 0x6e, 0x5f, 0x66, 0x61, 0x69, 0x6c, 0x75, 0x72, 0x65, 0x5f,
0x6a, 0x6f, 0x62, 0x22, 0x8e, 0x01, 0x0a, 0x17, 0x57, 0x6f, 0x72, 0x6b, 0x66, 0x6c, 0x6f, 0x77,
0x43, 0x6f, 0x6e, 0x63, 0x75, 0x72, 0x72, 0x65, 0x6e, 0x63, 0x79, 0x4f, 0x70, 0x74, 0x73, 0x12,
0x16, 0x0a, 0x06, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52,
0x06, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x19, 0x0a, 0x08, 0x6d, 0x61, 0x78, 0x5f, 0x72,
0x75, 0x6e, 0x73, 0x18, 0x02, 0x20, 0x01, 0x28, 0x05, 0x52, 0x07, 0x6d, 0x61, 0x78, 0x52, 0x75,
0x6e, 0x73, 0x12, 0x40, 0x0a, 0x0e, 0x6c, 0x69, 0x6d, 0x69, 0x74, 0x5f, 0x73, 0x74, 0x72, 0x61,
0x74, 0x65, 0x67, 0x79, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x19, 0x2e, 0x43, 0x6f, 0x6e,
0x63, 0x75, 0x72, 0x72, 0x65, 0x6e, 0x63, 0x79, 0x4c, 0x69, 0x6d, 0x69, 0x74, 0x53, 0x74, 0x72,
0x61, 0x74, 0x65, 0x67, 0x79, 0x52, 0x0d, 0x6c, 0x69, 0x6d, 0x69, 0x74, 0x53, 0x74, 0x72, 0x61,
0x74, 0x65, 0x67, 0x79, 0x22, 0x82, 0x01, 0x0a, 0x15, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x57,
0x6f, 0x72, 0x6b, 0x66, 0x6c, 0x6f, 0x77, 0x4a, 0x6f, 0x62, 0x4f, 0x70, 0x74, 0x73, 0x12, 0x12,
0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61,
0x6d, 0x65, 0x12, 0x20, 0x0a, 0x0b, 0x64, 0x65, 0x73, 0x63, 0x72, 0x69, 0x70, 0x74, 0x69, 0x6f,
0x6e, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0b, 0x64, 0x65, 0x73, 0x63, 0x72, 0x69, 0x70,
0x74, 0x69, 0x6f, 0x6e, 0x12, 0x2d, 0x0a, 0x05, 0x73, 0x74, 0x65, 0x70, 0x73, 0x18, 0x04, 0x20,
0x03, 0x28, 0x0b, 0x32, 0x17, 0x2e, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x57, 0x6f, 0x72, 0x6b,
0x66, 0x6c, 0x6f, 0x77, 0x53, 0x74, 0x65, 0x70, 0x4f, 0x70, 0x74, 0x73, 0x52, 0x05, 0x73, 0x74,
0x65, 0x70, 0x73, 0x4a, 0x04, 0x08, 0x03, 0x10, 0x04, 0x22, 0x8b, 0x02, 0x0a, 0x16, 0x43, 0x72,
0x65, 0x61, 0x74, 0x65, 0x57, 0x6f, 0x72, 0x6b, 0x66, 0x6c, 0x6f, 0x77, 0x53, 0x74, 0x65, 0x70,
0x4f, 0x70, 0x74, 0x73, 0x12, 0x1f, 0x0a, 0x0b, 0x72, 0x65, 0x61, 0x64, 0x61, 0x62, 0x6c, 0x65,
0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x72, 0x65, 0x61, 0x64, 0x61,
0x62, 0x6c, 0x65, 0x49, 0x64, 0x12, 0x16, 0x0a, 0x06, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x18,
0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x18, 0x0a,
0x07, 0x74, 0x69, 0x6d, 0x65, 0x6f, 0x75, 0x74, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07,
0x74, 0x69, 0x6d, 0x65, 0x6f, 0x75, 0x74, 0x12, 0x16, 0x0a, 0x06, 0x69, 0x6e, 0x70, 0x75, 0x74,
0x73, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x69, 0x6e, 0x70, 0x75, 0x74, 0x73, 0x12,
0x18, 0x0a, 0x07, 0x70, 0x61, 0x72, 0x65, 0x6e, 0x74, 0x73, 0x18, 0x05, 0x20, 0x03, 0x28, 0x09,
0x52, 0x07, 0x70, 0x61, 0x72, 0x65, 0x6e, 0x74, 0x73, 0x12, 0x1b, 0x0a, 0x09, 0x75, 0x73, 0x65,
0x72, 0x5f, 0x64, 0x61, 0x74, 0x61, 0x18, 0x06, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x75, 0x73,
0x65, 0x72, 0x44, 0x61, 0x74, 0x61, 0x12, 0x18, 0x0a, 0x07, 0x72, 0x65, 0x74, 0x72, 0x69, 0x65,
0x73, 0x18, 0x07, 0x20, 0x01, 0x28, 0x05, 0x52, 0x07, 0x72, 0x65, 0x74, 0x72, 0x69, 0x65, 0x73,
0x12, 0x35, 0x0a, 0x0b, 0x72, 0x61, 0x74, 0x65, 0x5f, 0x6c, 0x69, 0x6d, 0x69, 0x74, 0x73, 0x18,
0x08, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x14, 0x2e, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x53, 0x74,
0x65, 0x70, 0x52, 0x61, 0x74, 0x65, 0x4c, 0x69, 0x6d, 0x69, 0x74, 0x52, 0x0a, 0x72, 0x61, 0x74,
0x65, 0x4c, 0x69, 0x6d, 0x69, 0x74, 0x73, 0x22, 0x3d, 0x0a, 0x13, 0x43, 0x72, 0x65, 0x61, 0x74,
0x65, 0x53, 0x74, 0x65, 0x70, 0x52, 0x61, 0x74, 0x65, 0x4c, 0x69, 0x6d, 0x69, 0x74, 0x12, 0x10,
0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79,
0x12, 0x14, 0x0a, 0x05, 0x75, 0x6e, 0x69, 0x74, 0x73, 0x18, 0x02, 0x20, 0x01, 0x28, 0x05, 0x52,
0x05, 0x75, 0x6e, 0x69, 0x74, 0x73, 0x22, 0x16, 0x0a, 0x14, 0x4c, 0x69, 0x73, 0x74, 0x57, 0x6f,
0x72, 0x6b, 0x66, 0x6c, 0x6f, 0x77, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x22, 0xdc,
0x02, 0x0a, 0x17, 0x53, 0x63, 0x68, 0x65, 0x64, 0x75, 0x6c, 0x65, 0x57, 0x6f, 0x72, 0x6b, 0x66,
0x6c, 0x6f, 0x77, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61,
0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x38,
0x0a, 0x09, 0x73, 0x63, 0x68, 0x65, 0x64, 0x75, 0x6c, 0x65, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28,
0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f,
0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x09, 0x73,
0x63, 0x68, 0x65, 0x64, 0x75, 0x6c, 0x65, 0x73, 0x12, 0x14, 0x0a, 0x05, 0x69, 0x6e, 0x70, 0x75,
0x74, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x69, 0x6e, 0x70, 0x75, 0x74, 0x12, 0x20,
0x0a, 0x09, 0x70, 0x61, 0x72, 0x65, 0x6e, 0x74, 0x5f, 0x69, 0x64, 0x18, 0x04, 0x20, 0x01, 0x28,
0x09, 0x48, 0x00, 0x52, 0x08, 0x70, 0x61, 0x72, 0x65, 0x6e, 0x74, 0x49, 0x64, 0x88, 0x01, 0x01,
0x12, 0x30, 0x0a, 0x12, 0x70, 0x61, 0x72, 0x65, 0x6e, 0x74, 0x5f, 0x73, 0x74, 0x65, 0x70, 0x5f,
0x72, 0x75, 0x6e, 0x5f, 0x69, 0x64, 0x18, 0x05, 0x20, 0x01, 0x28, 0x09, 0x48, 0x01, 0x52, 0x0f,
0x70, 0x61, 0x72, 0x65, 0x6e, 0x74, 0x53, 0x74, 0x65, 0x70, 0x52, 0x75, 0x6e, 0x49, 0x64, 0x88,
0x01, 0x01, 0x12, 0x24, 0x0a, 0x0b, 0x63, 0x68, 0x69, 0x6c, 0x64, 0x5f, 0x69, 0x6e, 0x64, 0x65,
0x78, 0x18, 0x06, 0x20, 0x01, 0x28, 0x05, 0x48, 0x02, 0x52, 0x0a, 0x63, 0x68, 0x69, 0x6c, 0x64,
0x49, 0x6e, 0x64, 0x65, 0x78, 0x88, 0x01, 0x01, 0x12, 0x20, 0x0a, 0x09, 0x63, 0x68, 0x69, 0x6c,
0x64, 0x5f, 0x6b, 0x65, 0x79, 0x18, 0x07, 0x20, 0x01, 0x28, 0x09, 0x48, 0x03, 0x52, 0x08, 0x63,
0x68, 0x69, 0x6c, 0x64, 0x4b, 0x65, 0x79, 0x88, 0x01, 0x01, 0x42, 0x0c, 0x0a, 0x0a, 0x5f, 0x70,
0x61, 0x72, 0x65, 0x6e, 0x74, 0x5f, 0x69, 0x64, 0x42, 0x15, 0x0a, 0x13, 0x5f, 0x70, 0x61, 0x72,
0x65, 0x6e, 0x74, 0x5f, 0x73, 0x74, 0x65, 0x70, 0x5f, 0x72, 0x75, 0x6e, 0x5f, 0x69, 0x64, 0x42,
0x0e, 0x0a, 0x0c, 0x5f, 0x63, 0x68, 0x69, 0x6c, 0x64, 0x5f, 0x69, 0x6e, 0x64, 0x65, 0x78, 0x42,
0x0c, 0x0a, 0x0a, 0x5f, 0x63, 0x68, 0x69, 0x6c, 0x64, 0x5f, 0x6b, 0x65, 0x79, 0x22, 0xe8, 0x01,
0x0a, 0x0f, 0x57, 0x6f, 0x72, 0x6b, 0x66, 0x6c, 0x6f, 0x77, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f,
0x6e, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69,
0x64, 0x12, 0x39, 0x0a, 0x0a, 0x63, 0x72, 0x65, 0x61, 0x74, 0x65, 0x64, 0x5f, 0x61, 0x74, 0x18,
0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70,
0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d,
0x70, 0x52, 0x09, 0x63, 0x72, 0x65, 0x61, 0x74, 0x65, 0x64, 0x41, 0x74, 0x12, 0x39, 0x0a, 0x0a,
0x75, 0x70, 0x64, 0x61, 0x74, 0x65, 0x64, 0x5f, 0x61, 0x74, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b,
0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62,
0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x09, 0x75, 0x70,
0x64, 0x61, 0x74, 0x65, 0x64, 0x41, 0x74, 0x12, 0x18, 0x0a, 0x07, 0x76, 0x65, 0x72, 0x73, 0x69,
0x6f, 0x6e, 0x18, 0x05, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f,
0x6e, 0x12, 0x14, 0x0a, 0x05, 0x6f, 0x72, 0x64, 0x65, 0x72, 0x18, 0x06, 0x20, 0x01, 0x28, 0x05,
0x52, 0x05, 0x6f, 0x72, 0x64, 0x65, 0x72, 0x12, 0x1f, 0x0a, 0x0b, 0x77, 0x6f, 0x72, 0x6b, 0x66,
0x6c, 0x6f, 0x77, 0x5f, 0x69, 0x64, 0x18, 0x07, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x77, 0x6f,
0x72, 0x6b, 0x66, 0x6c, 0x6f, 0x77, 0x49, 0x64, 0x22, 0x53, 0x0a, 0x17, 0x57, 0x6f, 0x72, 0x6b,
0x66, 0x6c, 0x6f, 0x77, 0x54, 0x72, 0x69, 0x67, 0x67, 0x65, 0x72, 0x45, 0x76, 0x65, 0x6e, 0x74,
0x52, 0x65, 0x66, 0x12, 0x1b, 0x0a, 0x09, 0x70, 0x61, 0x72, 0x65, 0x6e, 0x74, 0x5f, 0x69, 0x64,
0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x70, 0x61, 0x72, 0x65, 0x6e, 0x74, 0x49, 0x64,
0x12, 0x1b, 0x0a, 0x09, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x5f, 0x6b, 0x65, 0x79, 0x18, 0x02, 0x20,
0x01, 0x28, 0x09, 0x52, 0x08, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x4b, 0x65, 0x79, 0x22, 0x49, 0x0a,
0x16, 0x57, 0x6f, 0x72, 0x6b, 0x66, 0x6c, 0x6f, 0x77, 0x54, 0x72, 0x69, 0x67, 0x67, 0x65, 0x72,
0x43, 0x72, 0x6f, 0x6e, 0x52, 0x65, 0x66, 0x12, 0x1b, 0x0a, 0x09, 0x70, 0x61, 0x72, 0x65, 0x6e,
0x74, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x70, 0x61, 0x72, 0x65,
0x6e, 0x74, 0x49, 0x64, 0x12, 0x12, 0x0a, 0x04, 0x63, 0x72, 0x6f, 0x6e, 0x18, 0x02, 0x20, 0x01,
0x28, 0x09, 0x52, 0x04, 0x63, 0x72, 0x6f, 0x6e, 0x22, 0xa1, 0x02, 0x0a, 0x16, 0x54, 0x72, 0x69,
0x67, 0x67, 0x65, 0x72, 0x57, 0x6f, 0x72, 0x6b, 0x66, 0x6c, 0x6f, 0x77, 0x52, 0x65, 0x71, 0x75,
0x65, 0x73, 0x74, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28,
0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x69, 0x6e, 0x70, 0x75, 0x74,
0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x69, 0x6e, 0x70, 0x75, 0x74, 0x12, 0x20, 0x0a,
0x09, 0x70, 0x61, 0x72, 0x65, 0x6e, 0x74, 0x5f, 0x69, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09,
0x48, 0x00, 0x52, 0x08, 0x70, 0x61, 0x72, 0x65, 0x6e, 0x74, 0x49, 0x64, 0x88, 0x01, 0x01, 0x12,
0x30, 0x0a, 0x12, 0x70, 0x61, 0x72, 0x65, 0x6e, 0x74, 0x5f, 0x73, 0x74, 0x65, 0x70, 0x5f, 0x72,
0x75, 0x6e, 0x5f, 0x69, 0x64, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x48, 0x01, 0x52, 0x0f, 0x70,
0x61, 0x72, 0x65, 0x6e, 0x74, 0x53, 0x74, 0x65, 0x70, 0x52, 0x75, 0x6e, 0x49, 0x64, 0x88, 0x01,
0x01, 0x12, 0x24, 0x0a, 0x0b, 0x63, 0x68, 0x69, 0x6c, 0x64, 0x5f, 0x69, 0x6e, 0x64, 0x65, 0x78,
0x18, 0x05, 0x20, 0x01, 0x28, 0x05, 0x48, 0x02, 0x52, 0x0a, 0x63, 0x68, 0x69, 0x6c, 0x64, 0x49,
0x6e, 0x64, 0x65, 0x78, 0x88, 0x01, 0x01, 0x12, 0x20, 0x0a, 0x09, 0x63, 0x68, 0x69, 0x6c, 0x64,
0x5f, 0x6b, 0x65, 0x79, 0x18, 0x06, 0x20, 0x01, 0x28, 0x09, 0x48, 0x03, 0x52, 0x08, 0x63, 0x68,
0x69, 0x6c, 0x64, 0x4b, 0x65, 0x79, 0x88, 0x01, 0x01, 0x42, 0x0c, 0x0a, 0x0a, 0x5f, 0x70, 0x61,
0x72, 0x65, 0x6e, 0x74, 0x5f, 0x69, 0x64, 0x42, 0x15, 0x0a, 0x13, 0x5f, 0x70, 0x61, 0x72, 0x65,
0x6e, 0x74, 0x5f, 0x73, 0x74, 0x65, 0x70, 0x5f, 0x72, 0x75, 0x6e, 0x5f, 0x69, 0x64, 0x42, 0x0e,
0x0a, 0x0c, 0x5f, 0x63, 0x68, 0x69, 0x6c, 0x64, 0x5f, 0x69, 0x6e, 0x64, 0x65, 0x78, 0x42, 0x0c,
0x0a, 0x0a, 0x5f, 0x63, 0x68, 0x69, 0x6c, 0x64, 0x5f, 0x6b, 0x65, 0x79, 0x22, 0x41, 0x0a, 0x17,
0x54, 0x72, 0x69, 0x67, 0x67, 0x65, 0x72, 0x57, 0x6f, 0x72, 0x6b, 0x66, 0x6c, 0x6f, 0x77, 0x52,
0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x26, 0x0a, 0x0f, 0x77, 0x6f, 0x72, 0x6b, 0x66,
0x6c, 0x6f, 0x77, 0x5f, 0x72, 0x75, 0x6e, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09,
0x52, 0x0d, 0x77, 0x6f, 0x72, 0x6b, 0x66, 0x6c, 0x6f, 0x77, 0x52, 0x75, 0x6e, 0x49, 0x64, 0x22,
0x6d, 0x0a, 0x13, 0x50, 0x75, 0x74, 0x52, 0x61, 0x74, 0x65, 0x4c, 0x69, 0x6d, 0x69, 0x74, 0x52,
0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20,
0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x6c, 0x69, 0x6d, 0x69,
0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x05, 0x52, 0x05, 0x6c, 0x69, 0x6d, 0x69, 0x74, 0x12, 0x2e,
0x0a, 0x08, 0x64, 0x75, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0e,
0x32, 0x12, 0x2e, 0x52, 0x61, 0x74, 0x65, 0x4c, 0x69, 0x6d, 0x69, 0x74, 0x44, 0x75, 0x72, 0x61,
0x74, 0x69, 0x6f, 0x6e, 0x52, 0x08, 0x64, 0x75, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x22, 0x16,
0x0a, 0x14, 0x50, 0x75, 0x74, 0x52, 0x61, 0x74, 0x65, 0x4c, 0x69, 0x6d, 0x69, 0x74, 0x52, 0x65,
0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x2a, 0x6c, 0x0a, 0x18, 0x43, 0x6f, 0x6e, 0x63, 0x75, 0x72,
0x72, 0x65, 0x6e, 0x63, 0x79, 0x4c, 0x69, 0x6d, 0x69, 0x74, 0x53, 0x74, 0x72, 0x61, 0x74, 0x65,
0x67, 0x79, 0x12, 0x16, 0x0a, 0x12, 0x43, 0x41, 0x4e, 0x43, 0x45, 0x4c, 0x5f, 0x49, 0x4e, 0x5f,
0x50, 0x52, 0x4f, 0x47, 0x52, 0x45, 0x53, 0x53, 0x10, 0x00, 0x12, 0x0f, 0x0a, 0x0b, 0x44, 0x52,
0x4f, 0x50, 0x5f, 0x4e, 0x45, 0x57, 0x45, 0x53, 0x54, 0x10, 0x01, 0x12, 0x10, 0x0a, 0x0c, 0x51,
0x55, 0x45, 0x55, 0x45, 0x5f, 0x4e, 0x45, 0x57, 0x45, 0x53, 0x54, 0x10, 0x02, 0x12, 0x15, 0x0a,
0x11, 0x47, 0x52, 0x4f, 0x55, 0x50, 0x5f, 0x52, 0x4f, 0x55, 0x4e, 0x44, 0x5f, 0x52, 0x4f, 0x42,
0x49, 0x4e, 0x10, 0x03, 0x2a, 0x35, 0x0a, 0x11, 0x52, 0x61, 0x74, 0x65, 0x4c, 0x69, 0x6d, 0x69,
0x74, 0x44, 0x75, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x0a, 0x0a, 0x06, 0x53, 0x45, 0x43,
0x4f, 0x4e, 0x44, 0x10, 0x00, 0x12, 0x0a, 0x0a, 0x06, 0x4d, 0x49, 0x4e, 0x55, 0x54, 0x45, 0x10,
0x01, 0x12, 0x08, 0x0a, 0x04, 0x48, 0x4f, 0x55, 0x52, 0x10, 0x02, 0x32, 0x8a, 0x02, 0x0a, 0x0f,
0x57, 0x6f, 0x72, 0x6b, 0x66, 0x6c, 0x6f, 0x77, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12,
0x34, 0x0a, 0x0b, 0x50, 0x75, 0x74, 0x57, 0x6f, 0x72, 0x6b, 0x66, 0x6c, 0x6f, 0x77, 0x12, 0x13,
0x2e, 0x50, 0x75, 0x74, 0x57, 0x6f, 0x72, 0x6b, 0x66, 0x6c, 0x6f, 0x77, 0x52, 0x65, 0x71, 0x75,
0x65, 0x73, 0x74, 0x1a, 0x10, 0x2e, 0x57, 0x6f, 0x72, 0x6b, 0x66, 0x6c, 0x6f, 0x77, 0x56, 0x65,
0x72, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x3e, 0x0a, 0x10, 0x53, 0x63, 0x68, 0x65, 0x64, 0x75, 0x6c,
0x65, 0x57, 0x6f, 0x72, 0x6b, 0x66, 0x6c, 0x6f, 0x77, 0x12, 0x18, 0x2e, 0x53, 0x63, 0x68, 0x65,
0x64, 0x75, 0x6c, 0x65, 0x57, 0x6f, 0x72, 0x6b, 0x66, 0x6c, 0x6f, 0x77, 0x52, 0x65, 0x71, 0x75,
0x65, 0x73, 0x74, 0x1a, 0x10, 0x2e, 0x57, 0x6f, 0x72, 0x6b, 0x66, 0x6c, 0x6f, 0x77, 0x56, 0x65,
0x72, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x44, 0x0a, 0x0f, 0x54, 0x72, 0x69, 0x67, 0x67, 0x65, 0x72,
0x57, 0x6f, 0x72, 0x6b, 0x66, 0x6c, 0x6f, 0x77, 0x12, 0x17, 0x2e, 0x54, 0x72, 0x69, 0x67, 0x67,
0x65, 0x72, 0x57, 0x6f, 0x72, 0x6b, 0x66, 0x6c, 0x6f, 0x77, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73,
0x74, 0x1a, 0x18, 0x2e, 0x54, 0x72, 0x69, 0x67, 0x67, 0x65, 0x72, 0x57, 0x6f, 0x72, 0x6b, 0x66,
0x6c, 0x6f, 0x77, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x3b, 0x0a, 0x0c, 0x50,
0x75, 0x74, 0x52, 0x61, 0x74, 0x65, 0x4c, 0x69, 0x6d, 0x69, 0x74, 0x12, 0x14, 0x2e, 0x50, 0x75,
0x74, 0x52, 0x61, 0x74, 0x65, 0x4c, 0x69, 0x6d, 0x69, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73,
0x74, 0x1a, 0x15, 0x2e, 0x50, 0x75, 0x74, 0x52, 0x61, 0x74, 0x65, 0x4c, 0x69, 0x6d, 0x69, 0x74,
0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x42, 0x42, 0x5a, 0x40, 0x67, 0x69, 0x74, 0x68,
0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x68, 0x61, 0x74, 0x63, 0x68, 0x65, 0x74, 0x2d, 0x64,
0x65, 0x76, 0x2f, 0x68, 0x61, 0x74, 0x63, 0x68, 0x65, 0x74, 0x2f, 0x69, 0x6e, 0x74, 0x65, 0x72,
0x6e, 0x61, 0x6c, 0x2f, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x73, 0x2f, 0x61, 0x64, 0x6d,
0x69, 0x6e, 0x2f, 0x63, 0x6f, 0x6e, 0x74, 0x72, 0x61, 0x63, 0x74, 0x73, 0x62, 0x06, 0x70, 0x72,
0x6f, 0x74, 0x6f, 0x33,
}
var (
@@ -1394,26 +1408,27 @@ var file_workflows_proto_depIdxs = []int32{
17, // 1: CreateWorkflowVersionOpts.scheduled_triggers:type_name -> google.protobuf.Timestamp
5, // 2: CreateWorkflowVersionOpts.jobs:type_name -> CreateWorkflowJobOpts
4, // 3: CreateWorkflowVersionOpts.concurrency:type_name -> WorkflowConcurrencyOpts
0, // 4: WorkflowConcurrencyOpts.limit_strategy:type_name -> ConcurrencyLimitStrategy
6, // 5: CreateWorkflowJobOpts.steps:type_name -> CreateWorkflowStepOpts
7, // 6: CreateWorkflowStepOpts.rate_limits:type_name -> CreateStepRateLimit
17, // 7: ScheduleWorkflowRequest.schedules:type_name -> google.protobuf.Timestamp
17, // 8: WorkflowVersion.created_at:type_name -> google.protobuf.Timestamp
17, // 9: WorkflowVersion.updated_at:type_name -> google.protobuf.Timestamp
1, // 10: PutRateLimitRequest.duration:type_name -> RateLimitDuration
2, // 11: WorkflowService.PutWorkflow:input_type -> PutWorkflowRequest
9, // 12: WorkflowService.ScheduleWorkflow:input_type -> ScheduleWorkflowRequest
13, // 13: WorkflowService.TriggerWorkflow:input_type -> TriggerWorkflowRequest
15, // 14: WorkflowService.PutRateLimit:input_type -> PutRateLimitRequest
10, // 15: WorkflowService.PutWorkflow:output_type -> WorkflowVersion
10, // 16: WorkflowService.ScheduleWorkflow:output_type -> WorkflowVersion
14, // 17: WorkflowService.TriggerWorkflow:output_type -> TriggerWorkflowResponse
16, // 18: WorkflowService.PutRateLimit:output_type -> PutRateLimitResponse
15, // [15:19] is the sub-list for method output_type
11, // [11:15] is the sub-list for method input_type
11, // [11:11] is the sub-list for extension type_name
11, // [11:11] is the sub-list for extension extendee
0, // [0:11] is the sub-list for field type_name
5, // 4: CreateWorkflowVersionOpts.on_failure_job:type_name -> CreateWorkflowJobOpts
0, // 5: WorkflowConcurrencyOpts.limit_strategy:type_name -> ConcurrencyLimitStrategy
6, // 6: CreateWorkflowJobOpts.steps:type_name -> CreateWorkflowStepOpts
7, // 7: CreateWorkflowStepOpts.rate_limits:type_name -> CreateStepRateLimit
17, // 8: ScheduleWorkflowRequest.schedules:type_name -> google.protobuf.Timestamp
17, // 9: WorkflowVersion.created_at:type_name -> google.protobuf.Timestamp
17, // 10: WorkflowVersion.updated_at:type_name -> google.protobuf.Timestamp
1, // 11: PutRateLimitRequest.duration:type_name -> RateLimitDuration
2, // 12: WorkflowService.PutWorkflow:input_type -> PutWorkflowRequest
9, // 13: WorkflowService.ScheduleWorkflow:input_type -> ScheduleWorkflowRequest
13, // 14: WorkflowService.TriggerWorkflow:input_type -> TriggerWorkflowRequest
15, // 15: WorkflowService.PutRateLimit:input_type -> PutRateLimitRequest
10, // 16: WorkflowService.PutWorkflow:output_type -> WorkflowVersion
10, // 17: WorkflowService.ScheduleWorkflow:output_type -> WorkflowVersion
14, // 18: WorkflowService.TriggerWorkflow:output_type -> TriggerWorkflowResponse
16, // 19: WorkflowService.PutRateLimit:output_type -> PutRateLimitResponse
16, // [16:20] is the sub-list for method output_type
12, // [12:16] is the sub-list for method input_type
12, // [12:12] is the sub-list for extension type_name
12, // [12:12] is the sub-list for extension extendee
0, // [0:12] is the sub-list for field type_name
}
func init() { file_workflows_proto_init() }
+61 -38
View File
@@ -348,48 +348,25 @@ func getCreateWorkflowOpts(req *contracts.PutWorkflowRequest) (*repository.Creat
for i, job := range req.Opts.Jobs {
jobCp := job
res, err := getCreateJobOpts(jobCp, "DEFAULT")
steps := make([]repository.CreateWorkflowStepOpts, len(job.Steps))
for j, step := range job.Steps {
stepCp := step
parsedAction, err := types.ParseActionID(step.Action)
if err != nil {
return nil, err
}
retries := int(stepCp.Retries)
steps[j] = repository.CreateWorkflowStepOpts{
ReadableId: stepCp.ReadableId,
Action: parsedAction.String(),
Parents: stepCp.Parents,
Retries: &retries,
}
if stepCp.Timeout != "" {
steps[j].Timeout = &stepCp.Timeout
}
for _, rateLimit := range stepCp.RateLimits {
steps[j].RateLimits = append(steps[j].RateLimits, repository.CreateWorkflowStepRateLimitOpts{
Key: rateLimit.Key,
Units: int(rateLimit.Units),
})
}
if stepCp.UserData != "" {
steps[j].UserData = &stepCp.UserData
}
if err != nil {
return nil, err
}
jobs[i] = repository.CreateWorkflowJobOpts{
Name: jobCp.Name,
Description: &jobCp.Description,
Steps: steps,
jobs[i] = *res
}
var onFailureJob *repository.CreateWorkflowJobOpts
if req.Opts.OnFailureJob != nil {
onFailureJobCp, err := getCreateJobOpts(req.Opts.OnFailureJob, "ON_FAILURE")
if err != nil {
return nil, err
}
onFailureJob = onFailureJobCp
}
scheduledTriggers := make([]time.Time, 0)
@@ -433,10 +410,56 @@ func getCreateWorkflowOpts(req *contracts.PutWorkflowRequest) (*repository.Creat
CronInput: cronInput,
ScheduledTriggers: scheduledTriggers,
Jobs: jobs,
OnFailureJob: onFailureJob,
ScheduleTimeout: req.Opts.ScheduleTimeout,
}, nil
}
func getCreateJobOpts(req *contracts.CreateWorkflowJobOpts, kind string) (*repository.CreateWorkflowJobOpts, error) {
steps := make([]repository.CreateWorkflowStepOpts, len(req.Steps))
for j, step := range req.Steps {
stepCp := step
parsedAction, err := types.ParseActionID(step.Action)
if err != nil {
return nil, err
}
retries := int(stepCp.Retries)
steps[j] = repository.CreateWorkflowStepOpts{
ReadableId: stepCp.ReadableId,
Action: parsedAction.String(),
Parents: stepCp.Parents,
Retries: &retries,
}
if stepCp.Timeout != "" {
steps[j].Timeout = &stepCp.Timeout
}
for _, rateLimit := range stepCp.RateLimits {
steps[j].RateLimits = append(steps[j].RateLimits, repository.CreateWorkflowStepRateLimitOpts{
Key: rateLimit.Key,
Units: int(rateLimit.Units),
})
}
if stepCp.UserData != "" {
steps[j].UserData = &stepCp.UserData
}
}
return &repository.CreateWorkflowJobOpts{
Name: req.Name,
Description: &req.Description,
Steps: steps,
Kind: kind,
}, nil
}
func toWorkflowVersion(workflowVersion *dbsqlc.GetWorkflowVersionForEngineRow) *contracts.WorkflowVersion {
version := &contracts.WorkflowVersion{
Id: sqlchelpers.UUIDToStr(workflowVersion.WorkflowVersion.ID),
@@ -202,6 +202,8 @@ func (ec *JobsControllerImpl) handleTask(ctx context.Context, task *msgqueue.Mes
switch task.ID {
case "job-run-queued":
return ec.handleJobRunQueued(ctx, task)
case "job-run-cancelled":
return ec.handleJobRunCancelled(ctx, task)
case "step-run-retry":
return ec.handleStepRunRetry(ctx, task)
case "step-run-queued":
@@ -268,7 +270,58 @@ func (ec *JobsControllerImpl) handleJobRunQueued(ctx context.Context, task *msgq
err = g.Wait()
if err != nil {
ec.l.Err(err).Msg("could not run step run requeue")
ec.l.Err(err).Msg("could not run job run queued")
return err
}
return nil
}
func (ec *JobsControllerImpl) handleJobRunCancelled(ctx context.Context, task *msgqueue.Message) error {
ctx, span := telemetry.NewSpan(ctx, "handle-job-run-cancelled")
defer span.End()
payload := tasktypes.JobRunCancelledTaskPayload{}
metadata := tasktypes.JobRunCancelledTaskMetadata{}
err := ec.dv.DecodeAndValidate(task.Payload, &payload)
if err != nil {
return fmt.Errorf("could not decode job task payload: %w", err)
}
err = ec.dv.DecodeAndValidate(task.Metadata, &metadata)
if err != nil {
return fmt.Errorf("could not decode job task metadata: %w", err)
}
stepRuns, err := ec.repo.StepRun().ListStepRuns(ctx, metadata.TenantId, &repository.ListStepRunsOpts{
JobRunId: &payload.JobRunId,
})
if err != nil {
return fmt.Errorf("could not list step runs: %w", err)
}
g := new(errgroup.Group)
for _, stepRun := range stepRuns {
stepRunCp := stepRun
g.Go(func() error {
return ec.mq.AddMessage(
ctx,
msgqueue.JOB_PROCESSING_QUEUE,
tasktypes.StepRunCancelToTask(stepRunCp, "JOB_RUN_CANCELLED"),
)
})
}
err = g.Wait()
if err != nil {
ec.l.Err(err).Msg("could not run job run cancelled")
return err
}
@@ -119,6 +119,45 @@ func (wc *WorkflowsControllerImpl) handleWorkflowRunFinished(ctx context.Context
wc.l.Info().Msgf("finishing workflow run %s", workflowRunId)
// if there's an onFailure job, start that job
if workflowRun.WorkflowVersion.OnFailureJobId.Valid {
jobRun, err := wc.repo.JobRun().GetJobRunByWorkflowRunIdAndJobId(
ctx,
metadata.TenantId,
workflowRunId,
sqlchelpers.UUIDToStr(workflowRun.WorkflowVersion.OnFailureJobId),
)
if err != nil {
return fmt.Errorf("could not get job run: %w", err)
}
if !repository.IsFinalJobRunStatus(jobRun.Status) {
if workflowRun.WorkflowRun.Status == dbsqlc.WorkflowRunStatusFAILED {
err = wc.mq.AddMessage(
ctx,
msgqueue.JOB_PROCESSING_QUEUE,
tasktypes.JobRunQueuedToTask(metadata.TenantId, sqlchelpers.UUIDToStr(jobRun.ID)),
)
if err != nil {
return fmt.Errorf("could not add job run to task queue: %w", err)
}
} else if jobRun.Status != dbsqlc.JobRunStatus(db.JobRunStatusCancelled) {
// cancel the onFailure job
err = wc.mq.AddMessage(
ctx,
msgqueue.JOB_PROCESSING_QUEUE,
tasktypes.JobRunCancelledToTask(metadata.TenantId, sqlchelpers.UUIDToStr(jobRun.ID)),
)
if err != nil {
return fmt.Errorf("could not add job run to task queue: %w", err)
}
}
}
}
if workflowRun.ConcurrencyLimitStrategy.Valid {
wc.l.Info().Msgf("workflow %s has concurrency settings", workflowRunId)
@@ -217,7 +256,12 @@ func (wc *WorkflowsControllerImpl) queueWorkflowRunJobs(ctx context.Context, wor
var returnErr error
for i := range jobRuns {
jobRunId := sqlchelpers.UUIDToStr(jobRuns[i])
// don't start job runs that are onFailure
if workflowRun.WorkflowVersion.OnFailureJobId.Valid && jobRuns[i].JobId == workflowRun.WorkflowVersion.OnFailureJobId {
continue
}
jobRunId := sqlchelpers.UUIDToStr(jobRuns[i].ID)
err := wc.mq.AddMessage(
context.Background(),
+25
View File
@@ -30,6 +30,31 @@ func JobRunQueuedToTask(tenantId, jobRunId string) *msgqueue.Message {
}
}
type JobRunCancelledTaskPayload struct {
JobRunId string `json:"job_run_id" validate:"required,uuid"`
}
type JobRunCancelledTaskMetadata struct {
TenantId string `json:"tenant_id" validate:"required,uuid"`
}
func JobRunCancelledToTask(tenantId, jobRunId string) *msgqueue.Message {
payload, _ := datautils.ToJSONMap(JobRunCancelledTaskPayload{
JobRunId: jobRunId,
})
metadata, _ := datautils.ToJSONMap(JobRunCancelledTaskMetadata{
TenantId: tenantId,
})
return &msgqueue.Message{
ID: "job-run-cancelled",
Payload: payload,
Metadata: metadata,
Retries: 3,
}
}
type JobRunTimedOutTaskPayload struct {
JobRunId string `json:"job_run_id" validate:"required,uuid"`
}
+6 -17
View File
@@ -128,27 +128,16 @@ func (t *TickerImpl) runCronWorkflow(tenantId, workflowVersionId, cron, cronPare
return
}
jobRuns, err := t.repo.JobRun().ListJobRunsForWorkflowRun(ctx, tenantId, workflowRunId)
err = t.mq.AddMessage(
context.Background(),
msgqueue.WORKFLOW_PROCESSING_QUEUE,
tasktypes.WorkflowRunQueuedToTask(tenantId, workflowRunId),
)
if err != nil {
t.l.Err(err).Msg("could not list job runs for workflow run")
t.l.Err(err).Msg("could not add workflow run queued task")
return
}
for _, jobRunId := range jobRuns {
jobRunStr := sqlchelpers.UUIDToStr(jobRunId)
err = t.mq.AddMessage(
context.Background(),
msgqueue.JOB_PROCESSING_QUEUE,
tasktypes.JobRunQueuedToTask(tenantId, jobRunStr),
)
if err != nil {
t.l.Err(err).Msg("could not add job run queued task")
continue
}
}
}
}
+8 -19
View File
@@ -128,7 +128,7 @@ func (t *TickerImpl) runScheduledWorkflow(tenantId, workflowVersionId, scheduled
fs := make([]repository.CreateWorkflowRunOpt, 0)
if scheduled.ParentId.Valid {
if scheduled.ParentWorkflowRunId.Valid {
var childKey *string
if scheduled.ChildKey.Valid {
@@ -136,7 +136,7 @@ func (t *TickerImpl) runScheduledWorkflow(tenantId, workflowVersionId, scheduled
}
fs = append(fs, repository.WithParent(
sqlchelpers.UUIDToStr(scheduled.ParentId),
sqlchelpers.UUIDToStr(scheduled.ParentWorkflowRunId),
sqlchelpers.UUIDToStr(scheduled.ParentStepRunId),
int(scheduled.ChildIndex.Int32),
childKey,
@@ -163,28 +163,17 @@ func (t *TickerImpl) runScheduledWorkflow(tenantId, workflowVersionId, scheduled
return
}
jobRuns, err := t.repo.JobRun().ListJobRunsForWorkflowRun(ctx, tenantId, workflowRunId)
err = t.mq.AddMessage(
context.Background(),
msgqueue.WORKFLOW_PROCESSING_QUEUE,
tasktypes.WorkflowRunQueuedToTask(tenantId, workflowRunId),
)
if err != nil {
t.l.Err(err).Msg("could not list job runs for workflow run")
t.l.Err(err).Msg("could not add workflow run queued task")
return
}
for _, jobRunId := range jobRuns {
jobRunStr := sqlchelpers.UUIDToStr(jobRunId)
err = t.mq.AddMessage(
context.Background(),
msgqueue.JOB_PROCESSING_QUEUE,
tasktypes.JobRunQueuedToTask(tenantId, jobRunStr),
)
if err != nil {
t.l.Err(err).Msg("could not add job run queued task")
continue
}
}
// get the scheduler
schedulerVal, ok := t.scheduledWorkflows.Load(getScheduledWorkflowKey(workflowVersionId, scheduledWorkflowId))
+56 -34
View File
@@ -246,45 +246,28 @@ func (a *adminClientImpl) getPutRequest(workflow *types.Workflow) (*admincontrac
}
}
if workflow.OnFailureJob != nil {
onFailureJob, err := a.getJobOpts("on-failure", workflow.OnFailureJob)
if err != nil {
return nil, fmt.Errorf("could not get on failure job opts: %w", err)
}
opts.OnFailureJob = onFailureJob
}
jobOpts := make([]*admincontracts.CreateWorkflowJobOpts, 0)
for jobName, job := range workflow.Jobs {
jobOpt := &admincontracts.CreateWorkflowJobOpts{
Name: jobName,
Description: job.Description,
jobCp := job
res, err := a.getJobOpts(jobName, &jobCp)
if err != nil {
return nil, fmt.Errorf("could not get job opts: %w", err)
}
stepOpts := make([]*admincontracts.CreateWorkflowStepOpts, len(job.Steps))
for i, step := range job.Steps {
inputBytes, err := json.Marshal(step.With)
if err != nil {
return nil, fmt.Errorf("could not marshal step inputs: %w", err)
}
stepOpt := &admincontracts.CreateWorkflowStepOpts{
ReadableId: step.ID,
Action: step.ActionID,
Timeout: step.Timeout,
Inputs: string(inputBytes),
Parents: step.Parents,
Retries: int32(step.Retries),
}
for _, rateLimit := range step.RateLimits {
stepOpt.RateLimits = append(stepOpt.RateLimits, &admincontracts.CreateStepRateLimit{
Key: rateLimit.Key,
Units: int32(rateLimit.Units),
})
}
stepOpts[i] = stepOpt
}
jobOpt.Steps = stepOpts
jobOpts = append(jobOpts, jobOpt)
jobOpts = append(jobOpts, res)
}
opts.ScheduledTriggers = make([]*timestamppb.Timestamp, len(workflow.Triggers.Schedules))
@@ -299,3 +282,42 @@ func (a *adminClientImpl) getPutRequest(workflow *types.Workflow) (*admincontrac
Opts: opts,
}, nil
}
func (a *adminClientImpl) getJobOpts(jobName string, job *types.WorkflowJob) (*admincontracts.CreateWorkflowJobOpts, error) {
jobOpt := &admincontracts.CreateWorkflowJobOpts{
Name: jobName,
Description: job.Description,
}
stepOpts := make([]*admincontracts.CreateWorkflowStepOpts, len(job.Steps))
for i, step := range job.Steps {
inputBytes, err := json.Marshal(step.With)
if err != nil {
return nil, fmt.Errorf("could not marshal step inputs: %w", err)
}
stepOpt := &admincontracts.CreateWorkflowStepOpts{
ReadableId: step.ID,
Action: step.ActionID,
Timeout: step.Timeout,
Inputs: string(inputBytes),
Parents: step.Parents,
Retries: int32(step.Retries),
}
for _, rateLimit := range step.RateLimits {
stepOpt.RateLimits = append(stepOpt.RateLimits, &admincontracts.CreateStepRateLimit{
Key: rateLimit.Key,
Units: int32(rateLimit.Units),
})
}
stepOpts[i] = stepOpt
}
jobOpt.Steps = stepOpts
return jobOpt, nil
}
+2
View File
@@ -21,6 +21,8 @@ type Workflow struct {
Triggers WorkflowTriggers `yaml:"triggers"`
Jobs map[string]WorkflowJob `yaml:"jobs"`
OnFailureJob *WorkflowJob `yaml:"onFailureJob,omitempty"`
}
type WorkflowConcurrencyLimitStrategy string
+23 -3
View File
@@ -144,6 +144,8 @@ type WorkflowJob struct {
// The steps that are run in the job
Steps []*WorkflowStep
OnFailure *WorkflowJob
}
type WorkflowConcurrency struct {
@@ -169,20 +171,30 @@ func (c *WorkflowConcurrency) LimitStrategy(limitStrategy types.WorkflowConcurre
}
func (j *WorkflowJob) ToWorkflow(svcName string, namespace string) types.Workflow {
apiJob, err := j.ToWorkflowJob(svcName, namespace)
if err != nil {
panic(err)
}
var onFailureJob *types.WorkflowJob
if j.OnFailure != nil {
onFailureJob, err = j.OnFailure.ToWorkflowJob(svcName, namespace)
if err != nil {
panic(err)
}
}
jobs := map[string]types.WorkflowJob{
j.Name: *apiJob,
}
w := types.Workflow{
Name: namespace + j.Name,
Jobs: jobs,
Name: namespace + j.Name,
Jobs: jobs,
OnFailureJob: onFailureJob,
}
if j.Concurrency != nil {
@@ -235,6 +247,14 @@ func (j *WorkflowJob) ToActionMap(svcName string) map[string]any {
res["concurrency:"+getFnName(j.Concurrency.fn)] = j.Concurrency.fn
}
if j.OnFailure != nil {
onFailureActionMap := j.OnFailure.ToActionMap(svcName)
for k, v := range onFailureActionMap {
res[k] = v
}
}
return res
}
@@ -0,0 +1,20 @@
/*
Warnings:
- A unique constraint covering the columns `[onFailureJobId]` on the table `WorkflowVersion` will be added. If there are existing duplicate values, this will fail.
*/
-- CreateEnum
CREATE TYPE "JobKind" AS ENUM ('DEFAULT', 'ON_FAILURE');
-- AlterTable
ALTER TABLE "Job" ADD COLUMN "kind" "JobKind" NOT NULL DEFAULT 'DEFAULT';
-- AlterTable
ALTER TABLE "WorkflowVersion" ADD COLUMN "onFailureJobId" UUID;
-- CreateIndex
CREATE UNIQUE INDEX "WorkflowVersion_onFailureJobId_key" ON "WorkflowVersion"("onFailureJobId");
-- AddForeignKey
ALTER TABLE "WorkflowVersion" ADD CONSTRAINT "WorkflowVersion_onFailureJobId_fkey" FOREIGN KEY ("onFailureJobId") REFERENCES "Job"("id") ON DELETE SET NULL ON UPDATE CASCADE;
+18
View File
@@ -330,6 +330,10 @@ model WorkflowVersion {
// the declared jobs
jobs Job[]
// a job that runs when the workflow fails
onFailureJob Job? @relation("OnFailureJob", fields: [onFailureJobId], references: [id])
onFailureJobId String? @unique @db.Uuid
// all runs for the workflow
runs WorkflowRun[]
@@ -469,6 +473,14 @@ model WorkflowTriggerScheduledRef {
@@unique([parentId, parentStepRunId, childKey])
}
enum JobKind {
// DEFAULT job kinds get started immediately when the workflow execution starts
DEFAULT
// ON_FAILURE job kinds get started when the workflow fails
ON_FAILURE
}
model Job {
// base fields
id String @id @unique @default(uuid()) @db.Uuid
@@ -499,6 +511,12 @@ model Job {
// any runs for this job
runs JobRun[]
// the kind of job
kind JobKind @default(DEFAULT)
// a link to workflow
failureRelations WorkflowVersion? @relation("OnFailureJob")
// jobs names are unique per workflow
@@unique([workflowVersionId, name])
}