diff --git a/api-contracts/openapi/paths/workflow/workflow.yaml b/api-contracts/openapi/paths/workflow/workflow.yaml index 56200bb29..9fa8bfca5 100644 --- a/api-contracts/openapi/paths/workflow/workflow.yaml +++ b/api-contracts/openapi/paths/workflow/workflow.yaml @@ -1309,6 +1309,18 @@ cronsList: format: uuid minLength: 36 maxLength: 36 + - description: The workflow name to get runs for. + in: query + name: workflowName + required: false + schema: + type: string + - description: The cron name to get runs for. + in: query + name: cronName + required: false + schema: + type: string - description: A list of metadata key value pairs to filter by in: query name: additionalMetadata diff --git a/api/v1/server/handlers/workflows/list_crons.go b/api/v1/server/handlers/workflows/list_crons.go index 653046970..c232b59b0 100644 --- a/api/v1/server/handlers/workflows/list_crons.go +++ b/api/v1/server/handlers/workflows/list_crons.go @@ -58,6 +58,14 @@ func (t *WorkflowService) CronWorkflowList(ctx echo.Context, request gen.CronWor listOpts.WorkflowId = &workflowIdStr } + if request.Params.CronName != nil { + listOpts.CronName = request.Params.CronName + } + + if request.Params.WorkflowName != nil { + listOpts.WorkflowName = request.Params.WorkflowName + } + if request.Params.AdditionalMetadata != nil { additionalMetadata := make(map[string]interface{}, len(*request.Params.AdditionalMetadata)) diff --git a/api/v1/server/oas/gen/openapi.gen.go b/api/v1/server/oas/gen/openapi.gen.go index a83123736..b420ba090 100644 --- a/api/v1/server/oas/gen/openapi.gen.go +++ b/api/v1/server/oas/gen/openapi.gen.go @@ -2058,6 +2058,12 @@ type CronWorkflowListParams struct { // WorkflowId The workflow id to get runs for. WorkflowId *openapi_types.UUID `form:"workflowId,omitempty" json:"workflowId,omitempty"` + // WorkflowName The workflow name to get runs for. + WorkflowName *string `form:"workflowName,omitempty" json:"workflowName,omitempty"` + + // CronName The cron name to get runs for. + CronName *string `form:"cronName,omitempty" json:"cronName,omitempty"` + // AdditionalMetadata A list of metadata key value pairs to filter by AdditionalMetadata *[]string `form:"additionalMetadata,omitempty" json:"additionalMetadata,omitempty"` @@ -4531,6 +4537,20 @@ func (w *ServerInterfaceWrapper) CronWorkflowList(ctx echo.Context) error { return echo.NewHTTPError(http.StatusBadRequest, fmt.Sprintf("Invalid format for parameter workflowId: %s", err)) } + // ------------- Optional query parameter "workflowName" ------------- + + err = runtime.BindQueryParameter("form", true, false, "workflowName", ctx.QueryParams(), ¶ms.WorkflowName) + if err != nil { + return echo.NewHTTPError(http.StatusBadRequest, fmt.Sprintf("Invalid format for parameter workflowName: %s", err)) + } + + // ------------- Optional query parameter "cronName" ------------- + + err = runtime.BindQueryParameter("form", true, false, "cronName", ctx.QueryParams(), ¶ms.CronName) + if err != nil { + return echo.NewHTTPError(http.StatusBadRequest, fmt.Sprintf("Invalid format for parameter cronName: %s", err)) + } + // ------------- Optional query parameter "additionalMetadata" ------------- err = runtime.BindQueryParameter("form", true, false, "additionalMetadata", ctx.QueryParams(), ¶ms.AdditionalMetadata) @@ -13037,41 +13057,41 @@ var swaggerSpec = []string{ "RerWDRtsjTrWZ8+x+AV5X6bb72TAygE8A5h4g1OWtHIKvQjIHbQlPwGYDEJr9pO3b0zZTzYQudemzIYu", "ebrYmi29sV9Alrhf57vJQux0M8Faulk0rzIdUwjvQBYR/8NhryAqNpGYSc39fpHJefl3b/zksQnMk4pP", "9lfimzC7usue1dtbq0z0psZ0LNvpAW8MSDCtXPbUWUyvvl6nfk/CkeEaDCxi1KtXJa+6iGfU3R41JF3i", - "ZLOJmxt8EKRJ3GyR0FbeX8k4B4qkaDJpDJ84SZP4VZspO5M1Um0sCum0E0iUSbzfkBzYdnBbdfLiXcoM", - "XJOrcvzk3Yl8mCtLmanzGXZPmzl+Wl/mTE1tbjh3ZgEZS9iwnWIy2LEVTbAmg5aqpYOf9D978le3YhBV", - "VeV8NUAJZ8dLQ6jV28AqYHTzxSEcqzgYN7HLy1muqmBGUztvfpEgbp57dddtSzLXLgfwbDFnrUl1dmpz", - "F1zfrZT1CuSDm/5mNODq59ad78239905cpvPkbIwvushkrVf7wlyq4+3FLg5SCnSLDe6JbB44++6j29D", - "8BneYxthE3enm3ILFNCGCSAZhk7FjWTbRY60I9ZXHC5dgLtHcegEFWvYGqQvKA6bodl5DwpBM+iBOwpo", - "JabwEWD5xE9fgv/m8M3R3iH939Xh4Qf2v/+x4F50P6YTmIk3BATuUSh811p9FOIxvEtSuE6QP7IZVglz", - "DZbvUIzwdHGYZf+N4nlVQK8U0+vzCFbdb6/WH1i2HbtjzVqiCNfjCGSBgy7JcoEnQKOKrsj+evZcx/jg", - "XS732JnhnRm+eTO8sy072/JFXgbgJcujMgHUpfFu1u9rKFWa63kKaphFVD02eA1Vy0X8hyPZufMibrMX", - "cX3nIkUAOxUu0RlTnTG1M8ZUvoxcVK/EN+tUd14xuPLSbrhwe1XCdF6H1VolFgtgvXbJwU/1514l00lj", - "VJIZ5JY2y47HJhlwYM3sa0T11oYrmXe3i1cqxytZ8NQuIMFCGw2RSythwJ2u1rNT3LdOddyp4l2Pa1qv", - "HHEzDFQyg+f8DU1tPU/gxfDR/pLG/SHNFe+wO+mH60+v+itYc/aCWtA2WmnUsA1tKoNYN3+j6R/bBXnq", - "WZPt8HdicfPlD7cu5aQQdHVUvp5HjJosLviRzfJYWgRCIrvbgxVTYpjFnRTepBSWO6BtQBv5a7UbNliq", - "qb05qkvgV3nS7MSvk/gVBkmTTbxykcvzmO8FSRaThhAd1kZmhZIJ+MEDQBEYR5BJX03cmE/jnyHhedLx", - "CZtx50VvU/KuHU/eV9isBY/enFQ4+XTecMsdfQFJi6X0K7J/hmGKD4IsTWE9Z2N+OuANPdqtwr3XGKaf", - "ITkRg62R7uhMLemMQdyVgnn5UjAwyFJEnpgYD5LkHsHjjMquP2+oqCo9biuSmyR3tv0GMp4gMs3GBwGI", - "ojEI7q3kfJLM5hEkkNP0BZ3fM+ojOhEvhPGZDX1BcXkihy8R+NvDNw33CYGYN6zOO4UgFFXfooRvhrHK", - "oBLrzyVkFnAnF1icwxF9mIDULgpG9OtiiGNd22ONwbN+nDHoWiIsSSYRXA+9saF/cXrj6FsxveWI++Xo", - "DcUPiECX0pDSGuYdmNHtpL7pCFes70DMtUYtrk/kFD8RISw3prjAzl50VqssO2oJeznlXRlOiAXaOwBB", - "AOfE7nk7Zt+x8rCJSSrUpm8+7+Ovx5/EB+cTNZcurKE+vnIT/XVRAHn9foakyt6701cKWZ7Bmppm9Hs7", - "+uJ9/HVVCKODr4C++Mo7+mqo306RtAB9RckExXayOksm2EOxB5hu3K8xMM7YQOuhJaaC6fgbqrHqdI6O", - "kskEhh6Ku+PzVh2fi2qdUo3rOTlKJklGGpghyYgbNyTZy/t6BI0mW1ZxqCPSBmOUUY8r2c7gbAxTPEXz", - "FkcgrZPbMYirkK95N/GMaK0Ebp60/XlIR1F3JlrkTKRjsJkk5wDjxyStiUTgYlJIUk+2rxOpl3LM9dkY", - "J1MQT9RE22RsBAyyUCGqE+c7JM45WRUp3YGJUjihgiytO/TxFrjWIlFxOutiGwnGNjGMRF53zbUTdrok", - "IVebB0cguF/LDcOIjrzFFwwNoqbljcMDTLEAoba4rWgn41cwTB8MNuIgvks+Q/JNDLrS0h4apHlGh6P9", - "w/1DU84ILWzkT9X1xqFqx1XNYkuhcjXk/B16KSRZGheQV7KzqZTK4hjFk3yKH3tyyL1kzp+o5rPJTXuE", - "42mS3O+JKKKDn+IHh/d4VFOI1tUoI/67+1M7MZA9ikdNtOEgHse3axK+Ti+8vF4ov5fTydQauiNa3Dgx", - "x4HAs8shWTaVZfHqOUbYPdg1scbW8s1qgt849Dz2TaCGYmYoJrRJXZU3VGBHbVfHnlvEnswnUNmitjyq", - "eJP98exQ6dpgbXAKc3yYKiIE6wJODTp+d8JNWwf+iRV33rBKRGnltQ41musDSJlZTamQBNMaX1ctIfNW", - "O0PLa3AlMAQU9IZNVwgMZBJlm3vE4shrHLKO08ycJhhiGWYraZPyywynzCQqfNwpFUKLc9FWPm9ok9VD", - "Adi9rtr86yrTcUijmAUfN/SaLCx3Tmhhcr2GVz4LvuzpeOuleUt/QrQMY7mYfe7c1c4O3AoGW1/laY4M", - "14fO3OoqctmmjUMniVA2Dzt5YDUQl2POBjPRKb0+3aRiHn3FeA/qpsOqKVuk098GfjaktOQJKVdQb2jx", - "akNmwCZpks1ZntAcBLlRVlBYpy/wyW/M4bBmIbFk7m55qdSl795Ca2KhfOGtBJfMK2ONDZEpEdpmelko", - "wctWSq4rA7vse4M75t3GGaUOGPYYV0WAQEwUTyHs3UESTGFoyyadC/4tN6QEGSyYNebFcsVo8LZKEtOl", - "hulSw6whNUwr0SxkA3a41SpociexLGJrdsgF8yvI5TVLORkwtZwp2Mm7rTIBc1Jc1AQsB/6NIUhhqgL/", - "esZQQBZJxuVBlkb+B99/vnn+fwEAAP//hXFPsxF1AgA=", + "ZLOJmxt8EKRJ3GyR0FbeX8k4B4qkaDJpDJ84SZP4VZspO5M1Um0sCum0E0iUSbzfkBzYdnBbw1mXztwW", + "vPMmU8o4JaP4NtPRDu2n2s28xzWZOMdP3p3I9rmyhKC6FMHuSUHHT+vLC6oZBRvODFpAxhIWeqd2DVZ6", + "Rc+tyVynSvfgJ/3PnvzVrdRFVRE7X3xQwtnxwhdq9TawChjdfOkLxxoVxk3sso6Wa0aY0dTurqJIEDfP", + "vbrLxCWZa5fDk7aYs9akOju1uQuO/VbKegXywU1/Mxpw9eLrVwvNsQndKXmbT8my7L/roZC13+D5eBsP", + "73OQUqRZ7qtLYPHG33UP5obgM7w2N8ImbobXC9ex8VGGhwkgGYZOpZtk20WOtCPWVxwuXYC7R3HoBBVr", + "2BqkLygOm6HZeQ8KQTPogTsKaCVi8hFg+YBRX4L/5vDN0d4h/d/V4eEH9r//sXqoWPdjOoGZeENA4B6F", + "wnetREghHsO7JIXrBPkjm2GVMNdg+Q7FCE8Xh1n23yieVwX0SjG9Po9g1f32av2BZduxO9asJUZyPY5A", + "FhbpkgoYeAI0quiK7K/nBnaMft7lYpadGd6Z4Zs3wzvbsrMtX+TdA16y+CsTQF2S8mb9voZCrLmep6CG", + "WUTVY4PXULVcxH84kp07L+I2exHXdy5SBLBT4RKdMdUZUztjTOXLyEX1SnyzTlX1FYMrL+2Gy9JXJUzn", + "dVitVWKxANZrlxz8VH/uVfK4NEYlmUFuabPseGySAQfWvMVGVG9tuJJ5d7t4pXK8kgVP7QISLLTRELm0", + "Egbc6VpEO8V961THnSre9bim9coRN8NApWp4zl8I1VYrBV4MH+3vhNyfCV3xDruTXLn5xUp9boZa0DZa", + "R9WwDW3qnlg3f6PJLdsFeeo5oe3wd2Jx88Udty6hphB0dVS+nieamiwu+JHN8lhaBEIiu9uDFVNimMWd", + "FN6kFJY7oG1AG/lrtRs2WIiqvTmqS+BXedLsxK+T+BUGSZNNvHKRy7O07wVJFpOGEB3WRua8kuUFwANA", + "ERhHkElfTdyYT+OfIeFZ4PEJm3HnRW9TarIdT01Y2KwFj96cVDj5dN5wyx19AUmLJSwssn+GYYoPgixN", + "YT1nY3464A092q3CvdcYpp8hORGDrZHu6Ewt6YxB3BW6eflCNzDIUkSemBgPkuQeweOMyq4/b6ioKj1u", + "K5KbJHe2/QYyniAyzcYHAYiiMQjureR8kszmESSQ0/QFnd8z6iM6ES/z8ZkNfUFxeSKHLxH428M3DfcJ", + "gZg3rM47hSAUNe2ihG+GsYaiEuvPJWQWcCcXWJzDEX2YgNQuCkb062KIY13bY43Bs36cMehaIixJJhFc", + "D72xoX9xeuPoWzG95Yj75egNxQ+IQJfCl9Ia5h2Y0e2kvukIV6zvQMy1Ri2uT+QUPxEhLDemuMDOXnRW", + "qyz3awl7OeVdGU6IBdo7AEEA58TueTtm37HysIlJKtSmbz7v46/Hn8QH5xM1F2asoT6+chP9dVEAirw4", + "tit7705fKWRZFGsqttHv7eiL9/HXVf+MDr4C+uIr7+iroTo9RdIC9BUlExTbyeosmWAPxR5gunG/xsA4", + "YwOth5aYCqbjb6iCrNM5OkomExh6KO6Oz1t1fC6qdUo1rufkKJkkGWlghiQjbtyQZC/v6xE0mmxZPaWO", + "SBuMUUY9rmQ7g7MxTPEUzVscgbRObscgrkK+5t3EM6K1Erh50vbnIR1F3ZlokTORjsFmkpwDjB+TtCYS", + "gYtJIUk92b5OpF7KMddnY5xMQTxRE22TsREwyEKFqE6c75A452RVpHQHJkrhhAqytO7Qx1vgWotExems", + "i20kGNvEMBJ53TXXTtjpkoRcbR4cgeB+LTcMIzryFl8wNIialjcODzDFAoTa0r2inYxfwTB9MNiIg/gu", + "+QzJNzHoSguXaJDmGR2O9g/3D005I7SwkT9V1xuHmiRXNYsthcrVkPN36KWQZGlcQF7JzqZSKotjFE/y", + "KX7sySH3kjl/oprPJjftEY6nSXK/J6KIDn6KHxze41FNIVpXo4z47+5P7cRA9igeNdGGg3gc365J+Dq9", + "8PJ6ofxeTidTa+iOaHHjxBwHAs8uh2TZVBb9q+cYYfdg18QaW8s3qwl+49Dz2DeBGoqZoZjQJnVV3lCB", + "HbVdHXtuEXsyn0Bli9ryqOJN9sezQx1vg7XBKczxYaqIEKwLODXo+N0JN20d+CdW3HnDKhGlldc61Giu", + "DyBlZjWlQhJMa3xdtYTMW+0MLa/BlcAQUNAbNl0hMJBJlG3uEYsjr3HIOk4zc5pgiGWYraRNyi8znDKT", + "qPBxp1QILc5FW/m8oU1WDwVg97pq86+rTMchjWIWfNzQa7Kw3Dmhhcn1Gl75LPiyp+Otl+Yt/QnRMozl", + "Yva5c1c7O3ArGGx9dbU5MlwfOnOrq8hlmzYOnSRC2Tzs5IHVQFyOORvMRKf0+nSTinn0FeM9qJsOq6Zs", + "kU5/G/jZkNKSJ6RcQb2hxasNmQGbpEk2Z3lCcxDkRllBYZ2+wCe/MYfDmoXEkrm75aVSl757C62JhfKF", + "txJcMq+MNTZEpkRom+lloQQvWym5rgzssu8N7ph3G2eUOmDYY1wVAQIxUTyFsHcHSTCFoS2bdC74t9yQ", + "EmSwYNaYF8sVo8HbKklMlxqmSw2zhtQwrUSzkA3Y4VaroMmdxLKIrdkhF8yvIJfXLOVkwNRypmAn77bK", + "BMxJcVETsBz4N4YghakK/OsZQwFZJBmXB1ka+R98//nm+f8FAAD//zEI5IvvdQIA", } // GetSwagger returns the content of the embedded swagger specification file diff --git a/frontend/app/src/lib/api/generated/Api.ts b/frontend/app/src/lib/api/generated/Api.ts index 2f091e08a..867dcb7c5 100644 --- a/frontend/app/src/lib/api/generated/Api.ts +++ b/frontend/app/src/lib/api/generated/Api.ts @@ -1732,6 +1732,10 @@ export class Api extends HttpClient sqlc.narg('additionalMetadata')::jsonb) + AND (sqlc.narg('cronName')::TEXT IS NULL OR c."name" = sqlc.narg('cronName')::TEXT) + AND (sqlc.narg('workflowName')::TEXT IS NULL OR w."name" = sqlc.narg('workflowName')::TEXT) ORDER BY case when @orderBy = 'name ASC' THEN w."name" END ASC, case when @orderBy = 'name DESC' THEN w."name" END DESC, @@ -856,7 +858,10 @@ WHERE AND (@cronTriggerId::uuid IS NULL OR c."id" = @cronTriggerId::uuid) AND (@workflowId::uuid IS NULL OR w."id" = @workflowId::uuid) AND (sqlc.narg('additionalMetadata')::jsonb IS NULL OR - c."additionalMetadata" @> sqlc.narg('additionalMetadata')::jsonb); + c."additionalMetadata" @> sqlc.narg('additionalMetadata')::jsonb) + AND (sqlc.narg('cronName')::TEXT IS NULL OR c."name" = sqlc.narg('cronName')::TEXT) + AND (sqlc.narg('workflowName')::TEXT IS NULL OR w."name" = sqlc.narg('workflowName')::TEXT) +; -- name: DeleteWorkflowTriggerCronRef :exec DELETE FROM "WorkflowTriggerCronRef" diff --git a/pkg/repository/postgres/dbsqlc/workflows.sql.go b/pkg/repository/postgres/dbsqlc/workflows.sql.go index 42c4928f3..57c4ec2fa 100644 --- a/pkg/repository/postgres/dbsqlc/workflows.sql.go +++ b/pkg/repository/postgres/dbsqlc/workflows.sql.go @@ -80,6 +80,8 @@ WHERE AND ($3::uuid IS NULL OR w."id" = $3::uuid) AND ($4::jsonb IS NULL OR c."additionalMetadata" @> $4::jsonb) + AND ($5::TEXT IS NULL OR c."name" = $5::TEXT) + AND ($6::TEXT IS NULL OR w."name" = $6::TEXT) ` type CountCronWorkflowsParams struct { @@ -87,6 +89,8 @@ type CountCronWorkflowsParams struct { Crontriggerid pgtype.UUID `json:"crontriggerid"` Workflowid pgtype.UUID `json:"workflowid"` AdditionalMetadata []byte `json:"additionalMetadata"` + CronName pgtype.Text `json:"cronName"` + WorkflowName pgtype.Text `json:"workflowName"` } // Get all of the latest workflow versions for the tenant @@ -96,6 +100,8 @@ func (q *Queries) CountCronWorkflows(ctx context.Context, db DBTX, arg CountCron arg.Crontriggerid, arg.Workflowid, arg.AdditionalMetadata, + arg.CronName, + arg.WorkflowName, ) var count int64 err := row.Scan(&count) @@ -1640,16 +1646,18 @@ WHERE AND ($3::uuid IS NULL OR w."id" = $3::uuid) AND ($4::jsonb IS NULL OR c."additionalMetadata" @> $4::jsonb) + AND ($5::TEXT IS NULL OR c."name" = $5::TEXT) + AND ($6::TEXT IS NULL OR w."name" = $6::TEXT) ORDER BY - case when $5 = 'name ASC' THEN w."name" END ASC, - case when $5 = 'name DESC' THEN w."name" END DESC, - case when $5 = 'createdAt ASC' THEN c."createdAt" END ASC , - case when $5 = 'createdAt DESC' THEN c."createdAt" END DESC, + case when $7 = 'name ASC' THEN w."name" END ASC, + case when $7 = 'name DESC' THEN w."name" END DESC, + case when $7 = 'createdAt ASC' THEN c."createdAt" END ASC , + case when $7 = 'createdAt DESC' THEN c."createdAt" END DESC, t."id" ASC OFFSET - COALESCE($6, 0) + COALESCE($8, 0) LIMIT - COALESCE($7, 50) + COALESCE($9, 50) ` type ListCronWorkflowsParams struct { @@ -1657,6 +1665,8 @@ type ListCronWorkflowsParams struct { Crontriggerid pgtype.UUID `json:"crontriggerid"` Workflowid pgtype.UUID `json:"workflowid"` AdditionalMetadata []byte `json:"additionalMetadata"` + CronName pgtype.Text `json:"cronName"` + WorkflowName pgtype.Text `json:"workflowName"` Orderby interface{} `json:"orderby"` Offset interface{} `json:"offset"` Limit interface{} `json:"limit"` @@ -1697,6 +1707,8 @@ func (q *Queries) ListCronWorkflows(ctx context.Context, db DBTX, arg ListCronWo arg.Crontriggerid, arg.Workflowid, arg.AdditionalMetadata, + arg.CronName, + arg.WorkflowName, arg.Orderby, arg.Offset, arg.Limit, diff --git a/pkg/repository/postgres/workflow.go b/pkg/repository/postgres/workflow.go index dd1866664..2ea32b803 100644 --- a/pkg/repository/postgres/workflow.go +++ b/pkg/repository/postgres/workflow.go @@ -332,7 +332,18 @@ func (w *workflowAPIRepository) ListCronWorkflows(ctx context.Context, tenantId countOpts.Workflowid = sqlchelpers.UUIDFromStr(*opts.WorkflowId) } + if opts.CronName != nil { + listOpts.CronName = sqlchelpers.TextFromStr(*opts.CronName) + countOpts.CronName = sqlchelpers.TextFromStr(*opts.CronName) + } + + if opts.WorkflowName != nil { + listOpts.WorkflowName = sqlchelpers.TextFromStr(*opts.WorkflowName) + countOpts.WorkflowName = sqlchelpers.TextFromStr(*opts.WorkflowName) + } + cronWorkflows, err := w.queries.ListCronWorkflows(ctx, w.pool, listOpts) + if err != nil { return nil, 0, err } diff --git a/pkg/repository/workflow_run.go b/pkg/repository/workflow_run.go index 743ec941b..0ca0c208c 100644 --- a/pkg/repository/workflow_run.go +++ b/pkg/repository/workflow_run.go @@ -444,6 +444,12 @@ type ListCronWorkflowsOpts struct { // (optional) additional metadata for the workflow run AdditionalMetadata map[string]interface{} `validate:"omitempty"` + + // (optional) the name of the cron to filter by + CronName *string `validate:"omitempty"` + + // (optional) the name of the workflow to filter by + WorkflowName *string `validate:"omitempty"` } type WorkflowRunAPIRepository interface { diff --git a/sdks/python/docs/runnables.md b/sdks/python/docs/runnables.md index f37169501..1809e4b59 100644 --- a/sdks/python/docs/runnables.md +++ b/sdks/python/docs/runnables.md @@ -30,6 +30,8 @@ - name - tasks - is_durable + - list_runs + - aio_list_runs ## Standalone @@ -50,3 +52,5 @@ - aio_create_cron - create_bulk_run_item - is_durable + - list_runs + - aio_list_runs diff --git a/sdks/python/hatchet_sdk/clients/admin.py b/sdks/python/hatchet_sdk/clients/admin.py index 3c12534f6..3a7acd041 100644 --- a/sdks/python/hatchet_sdk/clients/admin.py +++ b/sdks/python/hatchet_sdk/clients/admin.py @@ -141,19 +141,6 @@ class AdminClient: priority=_options.priority, ) - def _prepare_put_workflow_request( - self, - name: str, - workflow: workflow_protos.CreateWorkflowVersionRequest, - overrides: workflow_protos.CreateWorkflowVersionRequest | None = None, - ) -> workflow_protos.CreateWorkflowVersionRequest: - if overrides is not None: - workflow.MergeFrom(overrides) - - workflow.name = name - - return workflow - def _parse_schedule( self, schedule: datetime | timestamp_pb2.Timestamp ) -> timestamp_pb2.Timestamp: @@ -191,11 +178,9 @@ class AdminClient: @tenacity_retry async def aio_put_workflow( self, - name: str, workflow: workflow_protos.CreateWorkflowVersionRequest, - overrides: workflow_protos.CreateWorkflowVersionRequest | None = None, ) -> workflow_protos.CreateWorkflowVersionResponse: - return await asyncio.to_thread(self.put_workflow, name, workflow, overrides) + return await asyncio.to_thread(self.put_workflow, workflow) @tenacity_retry async def aio_put_rate_limit( @@ -221,12 +206,8 @@ class AdminClient: @tenacity_retry def put_workflow( self, - name: str, workflow: workflow_protos.CreateWorkflowVersionRequest, - overrides: workflow_protos.CreateWorkflowVersionRequest | None = None, ) -> workflow_protos.CreateWorkflowVersionResponse: - opts = self._prepare_put_workflow_request(name, workflow, overrides) - if self.client is None: conn = new_conn(self.config, False) self.client = AdminServiceStub(conn) @@ -234,7 +215,7 @@ class AdminClient: return cast( workflow_protos.CreateWorkflowVersionResponse, self.client.PutWorkflow( - opts, + workflow, metadata=get_metadata(self.token), ), ) @@ -324,6 +305,9 @@ class AdminClient: additional_metadata=options.additional_metadata, desired_worker_id=desired_worker_id, priority=options.priority, + namespace=options.namespace, + sticky=options.sticky, + key=options.key, ) namespace = options.namespace or self.namespace diff --git a/sdks/python/hatchet_sdk/clients/rest/api/workflow_api.py b/sdks/python/hatchet_sdk/clients/rest/api/workflow_api.py index 7c51705d0..d3e2084a3 100644 --- a/sdks/python/hatchet_sdk/clients/rest/api/workflow_api.py +++ b/sdks/python/hatchet_sdk/clients/rest/api/workflow_api.py @@ -88,6 +88,12 @@ class WorkflowApi: Optional[Annotated[str, Field(min_length=36, strict=True, max_length=36)]], Field(description="The workflow id to get runs for."), ] = None, + workflow_name: Annotated[ + Optional[StrictStr], Field(description="The workflow name to get runs for.") + ] = None, + cron_name: Annotated[ + Optional[StrictStr], Field(description="The cron name to get runs for.") + ] = None, additional_metadata: Annotated[ Optional[List[StrictStr]], Field(description="A list of metadata key value pairs to filter by"), @@ -123,6 +129,10 @@ class WorkflowApi: :type limit: int :param workflow_id: The workflow id to get runs for. :type workflow_id: str + :param workflow_name: The workflow name to get runs for. + :type workflow_name: str + :param cron_name: The cron name to get runs for. + :type cron_name: str :param additional_metadata: A list of metadata key value pairs to filter by :type additional_metadata: List[str] :param order_by_field: The order by field @@ -156,6 +166,8 @@ class WorkflowApi: offset=offset, limit=limit, workflow_id=workflow_id, + workflow_name=workflow_name, + cron_name=cron_name, additional_metadata=additional_metadata, order_by_field=order_by_field, order_by_direction=order_by_direction, @@ -198,6 +210,12 @@ class WorkflowApi: Optional[Annotated[str, Field(min_length=36, strict=True, max_length=36)]], Field(description="The workflow id to get runs for."), ] = None, + workflow_name: Annotated[ + Optional[StrictStr], Field(description="The workflow name to get runs for.") + ] = None, + cron_name: Annotated[ + Optional[StrictStr], Field(description="The cron name to get runs for.") + ] = None, additional_metadata: Annotated[ Optional[List[StrictStr]], Field(description="A list of metadata key value pairs to filter by"), @@ -233,6 +251,10 @@ class WorkflowApi: :type limit: int :param workflow_id: The workflow id to get runs for. :type workflow_id: str + :param workflow_name: The workflow name to get runs for. + :type workflow_name: str + :param cron_name: The cron name to get runs for. + :type cron_name: str :param additional_metadata: A list of metadata key value pairs to filter by :type additional_metadata: List[str] :param order_by_field: The order by field @@ -266,6 +288,8 @@ class WorkflowApi: offset=offset, limit=limit, workflow_id=workflow_id, + workflow_name=workflow_name, + cron_name=cron_name, additional_metadata=additional_metadata, order_by_field=order_by_field, order_by_direction=order_by_direction, @@ -308,6 +332,12 @@ class WorkflowApi: Optional[Annotated[str, Field(min_length=36, strict=True, max_length=36)]], Field(description="The workflow id to get runs for."), ] = None, + workflow_name: Annotated[ + Optional[StrictStr], Field(description="The workflow name to get runs for.") + ] = None, + cron_name: Annotated[ + Optional[StrictStr], Field(description="The cron name to get runs for.") + ] = None, additional_metadata: Annotated[ Optional[List[StrictStr]], Field(description="A list of metadata key value pairs to filter by"), @@ -343,6 +373,10 @@ class WorkflowApi: :type limit: int :param workflow_id: The workflow id to get runs for. :type workflow_id: str + :param workflow_name: The workflow name to get runs for. + :type workflow_name: str + :param cron_name: The cron name to get runs for. + :type cron_name: str :param additional_metadata: A list of metadata key value pairs to filter by :type additional_metadata: List[str] :param order_by_field: The order by field @@ -376,6 +410,8 @@ class WorkflowApi: offset=offset, limit=limit, workflow_id=workflow_id, + workflow_name=workflow_name, + cron_name=cron_name, additional_metadata=additional_metadata, order_by_field=order_by_field, order_by_direction=order_by_direction, @@ -401,6 +437,8 @@ class WorkflowApi: offset, limit, workflow_id, + workflow_name, + cron_name, additional_metadata, order_by_field, order_by_direction, @@ -441,6 +479,14 @@ class WorkflowApi: _query_params.append(("workflowId", workflow_id)) + if workflow_name is not None: + + _query_params.append(("workflowName", workflow_name)) + + if cron_name is not None: + + _query_params.append(("cronName", cron_name)) + if additional_metadata is not None: _query_params.append(("additionalMetadata", additional_metadata)) diff --git a/sdks/python/hatchet_sdk/features/cron.py b/sdks/python/hatchet_sdk/features/cron.py index ef6b71064..aeeceab71 100644 --- a/sdks/python/hatchet_sdk/features/cron.py +++ b/sdks/python/hatchet_sdk/features/cron.py @@ -172,6 +172,8 @@ class CronClient(BaseRestClient): additional_metadata: JSONSerializableMapping | None = None, order_by_field: CronWorkflowsOrderByField | None = None, order_by_direction: WorkflowRunOrderByDirection | None = None, + workflow_name: str | None = None, + cron_name: str | None = None, ) -> CronWorkflowsList: """ Retrieve a list of all workflow cron triggers matching the criteria. @@ -182,6 +184,8 @@ class CronClient(BaseRestClient): :param additional_metadata: Filter by additional metadata keys. :param order_by_field: The field to order the list by. :param order_by_direction: The direction to order the list by. + :param workflow_name: The name of the workflow to filter by. + :param cron_name: The name of the cron trigger to filter by. :return: A list of cron workflows. """ @@ -193,6 +197,8 @@ class CronClient(BaseRestClient): additional_metadata=additional_metadata, order_by_field=order_by_field, order_by_direction=order_by_direction, + workflow_name=workflow_name, + cron_name=cron_name, ) def list( @@ -203,6 +209,8 @@ class CronClient(BaseRestClient): additional_metadata: JSONSerializableMapping | None = None, order_by_field: CronWorkflowsOrderByField | None = None, order_by_direction: WorkflowRunOrderByDirection | None = None, + workflow_name: str | None = None, + cron_name: str | None = None, ) -> CronWorkflowsList: """ Retrieve a list of all workflow cron triggers matching the criteria. @@ -213,6 +221,8 @@ class CronClient(BaseRestClient): :param additional_metadata: Filter by additional metadata keys. :param order_by_field: The field to order the list by. :param order_by_direction: The direction to order the list by. + :param workflow_name: The name of the workflow to filter by. + :param cron_name: The name of the cron trigger to filter by. :return: A list of cron workflows. """ @@ -227,6 +237,8 @@ class CronClient(BaseRestClient): ), order_by_field=order_by_field, order_by_direction=order_by_direction, + workflow_name=workflow_name, + cron_name=cron_name, ) def get(self, cron_id: str) -> CronWorkflows: diff --git a/sdks/python/hatchet_sdk/features/runs.py b/sdks/python/hatchet_sdk/features/runs.py index 97f83639b..ce97253af 100644 --- a/sdks/python/hatchet_sdk/features/runs.py +++ b/sdks/python/hatchet_sdk/features/runs.py @@ -131,7 +131,7 @@ class RunsClient(BaseRestClient): async def aio_list( self, - since: datetime = datetime.now() - timedelta(hours=1), + since: datetime | None = None, only_tasks: bool = False, offset: int | None = None, limit: int | None = None, @@ -160,7 +160,7 @@ class RunsClient(BaseRestClient): """ return await asyncio.to_thread( self.list, - since=since, + since=since or datetime.now() - timedelta(days=1), only_tasks=only_tasks, offset=offset, limit=limit, @@ -174,7 +174,7 @@ class RunsClient(BaseRestClient): def list( self, - since: datetime = datetime.now() - timedelta(hours=1), + since: datetime | None = None, only_tasks: bool = False, offset: int | None = None, limit: int | None = None, @@ -204,7 +204,7 @@ class RunsClient(BaseRestClient): with self.client() as client: return self._wra(client).v1_workflow_run_list( tenant=self.client_config.tenant_id, - since=since, + since=since or datetime.now() - timedelta(days=1), only_tasks=only_tasks, offset=offset, limit=limit, diff --git a/sdks/python/hatchet_sdk/runnables/standalone.py b/sdks/python/hatchet_sdk/runnables/standalone.py index df2510859..bdd7437e4 100644 --- a/sdks/python/hatchet_sdk/runnables/standalone.py +++ b/sdks/python/hatchet_sdk/runnables/standalone.py @@ -1,4 +1,5 @@ -from datetime import datetime +import asyncio +from datetime import datetime, timedelta from typing import Any, Generic, cast, get_type_hints from hatchet_sdk.clients.admin import ( @@ -7,7 +8,10 @@ from hatchet_sdk.clients.admin import ( WorkflowRunTriggerConfig, ) from hatchet_sdk.clients.rest.models.cron_workflows import CronWorkflows +from hatchet_sdk.clients.rest.models.v1_task_status import V1TaskStatus +from hatchet_sdk.clients.rest.models.v1_task_summary import V1TaskSummary from hatchet_sdk.contracts.workflows_pb2 import WorkflowVersion +from hatchet_sdk.logger import logger from hatchet_sdk.runnables.task import Task from hatchet_sdk.runnables.types import EmptyModel, R, TWorkflowInput from hatchet_sdk.runnables.workflow import BaseWorkflow, Workflow @@ -294,3 +298,88 @@ class Standalone(BaseWorkflow[TWorkflowInput], Generic[TWorkflowInput, R]): def to_task(self) -> Task[TWorkflowInput, R]: return self._task + + def list_runs( + self, + since: datetime | None = None, + until: datetime | None = None, + limit: int = 100, + offset: int | None = None, + statuses: list[V1TaskStatus] | None = None, + additional_metadata: dict[str, str] | None = None, + worker_id: str | None = None, + parent_task_external_id: str | None = None, + ) -> list[V1TaskSummary]: + """ + List runs of the workflow. + + :param since: The start time for the runs to be listed. + :param until: The end time for the runs to be listed. + :param limit: The maximum number of runs to be listed. + :param offset: The offset for pagination. + :param statuses: The statuses of the runs to be listed. + :param additional_metadata: Additional metadata for filtering the runs. + :param worker_id: The ID of the worker that ran the tasks. + :param parent_task_external_id: The external ID of the parent task. + + :returns: A list of `V1TaskSummary` objects representing the runs of the workflow. + """ + workflows = self.client.workflows.list(workflow_name=self._workflow.name) + + if not workflows.rows: + logger.warning(f"No runs found for {self.name}") + return [] + + workflow = workflows.rows[0] + + response = self.client.runs.list( + workflow_ids=[workflow.metadata.id], + since=since or datetime.now() - timedelta(days=1), + only_tasks=True, + offset=offset, + limit=limit, + statuses=statuses, + until=until, + additional_metadata=additional_metadata, + worker_id=worker_id, + parent_task_external_id=parent_task_external_id, + ) + + return response.rows + + async def aio_list_runs( + self, + since: datetime | None = None, + until: datetime | None = None, + limit: int = 100, + offset: int | None = None, + statuses: list[V1TaskStatus] | None = None, + additional_metadata: dict[str, str] | None = None, + worker_id: str | None = None, + parent_task_external_id: str | None = None, + ) -> list[V1TaskSummary]: + """ + List runs of the workflow. + + :param since: The start time for the runs to be listed. + :param until: The end time for the runs to be listed. + :param limit: The maximum number of runs to be listed. + :param offset: The offset for pagination. + :param statuses: The statuses of the runs to be listed. + :param additional_metadata: Additional metadata for filtering the runs. + :param worker_id: The ID of the worker that ran the tasks. + :param parent_task_external_id: The external ID of the parent task. + + :returns: A list of `V1TaskSummary` objects representing the runs of the workflow. + """ + return await asyncio.to_thread( + self.list_runs, + since=since or datetime.now() - timedelta(days=1), + offset=offset, + limit=limit, + statuses=statuses, + until=until, + additional_metadata=additional_metadata, + worker_id=worker_id, + parent_task_external_id=parent_task_external_id, + ) diff --git a/sdks/python/hatchet_sdk/runnables/workflow.py b/sdks/python/hatchet_sdk/runnables/workflow.py index bb896b947..aab134d23 100644 --- a/sdks/python/hatchet_sdk/runnables/workflow.py +++ b/sdks/python/hatchet_sdk/runnables/workflow.py @@ -1,5 +1,5 @@ import asyncio -from datetime import datetime +from datetime import datetime, timedelta from typing import TYPE_CHECKING, Any, Callable, Generic, cast from google.protobuf import timestamp_pb2 @@ -11,6 +11,8 @@ from hatchet_sdk.clients.admin import ( WorkflowRunTriggerConfig, ) from hatchet_sdk.clients.rest.models.cron_workflows import CronWorkflows +from hatchet_sdk.clients.rest.models.v1_task_status import V1TaskStatus +from hatchet_sdk.clients.rest.models.v1_task_summary import V1TaskSummary from hatchet_sdk.context.context import Context, DurableContext from hatchet_sdk.contracts.v1.workflows_pb2 import ( CreateWorkflowVersionRequest, @@ -624,7 +626,7 @@ class Workflow(BaseWorkflow[TWorkflowInput]): [Callable[[TWorkflowInput, DurableContext], R]], Task[TWorkflowInput, R] ]: """ - A decorator to transform a function into a durable Hatchet task that run as part of a workflow. + A decorator to transform a function into a durable Hatchet task that runs as part of a workflow. **IMPORTANT:** This decorator creates a _durable_ task, which works using Hatchet's durable execution capabilities. This is an advanced feature of Hatchet. @@ -846,3 +848,93 @@ class Workflow(BaseWorkflow[TWorkflowInput]): self._on_success_task = _task case _: raise ValueError("Invalid task type") + + def list_runs( + self, + since: datetime | None = None, + until: datetime | None = None, + limit: int = 100, + offset: int | None = None, + statuses: list[V1TaskStatus] | None = None, + additional_metadata: dict[str, str] | None = None, + worker_id: str | None = None, + parent_task_external_id: str | None = None, + only_tasks: bool = False, + ) -> list[V1TaskSummary]: + """ + List runs of the workflow. + + :param since: The start time for the runs to be listed. + :param until: The end time for the runs to be listed. + :param limit: The maximum number of runs to be listed. + :param offset: The offset for pagination. + :param statuses: The statuses of the runs to be listed. + :param additional_metadata: Additional metadata for filtering the runs. + :param worker_id: The ID of the worker that ran the tasks. + :param parent_task_external_id: The external ID of the parent task. + :param only_tasks: Whether to list only task runs. + + :returns: A list of `V1TaskSummary` objects representing the runs of the workflow. + """ + workflows = self.client.workflows.list(workflow_name=self.name) + + if not workflows.rows: + logger.warning(f"No runs found for {self.name}") + return [] + + workflow = workflows.rows[0] + + response = self.client.runs.list( + workflow_ids=[workflow.metadata.id], + since=since or datetime.now() - timedelta(days=1), + only_tasks=only_tasks, + offset=offset, + limit=limit, + statuses=statuses, + until=until, + additional_metadata=additional_metadata, + worker_id=worker_id, + parent_task_external_id=parent_task_external_id, + ) + + return response.rows + + async def aio_list_runs( + self, + since: datetime | None = None, + until: datetime | None = None, + limit: int = 100, + offset: int | None = None, + statuses: list[V1TaskStatus] | None = None, + additional_metadata: dict[str, str] | None = None, + worker_id: str | None = None, + parent_task_external_id: str | None = None, + only_tasks: bool = False, + ) -> list[V1TaskSummary]: + """ + List runs of the workflow. + + :param since: The start time for the runs to be listed. + :param until: The end time for the runs to be listed. + :param limit: The maximum number of runs to be listed. + :param offset: The offset for pagination. + :param statuses: The statuses of the runs to be listed. + :param additional_metadata: Additional metadata for filtering the runs. + :param worker_id: The ID of the worker that ran the tasks. + :param parent_task_external_id: The external ID of the parent task. + :param only_tasks: Whether to list only task runs. + + :returns: A list of `V1TaskSummary` objects representing the runs of the workflow. + """ + return await asyncio.to_thread( + self.list_runs, + since=since or datetime.now() - timedelta(days=1), + only_tasks=only_tasks, + offset=offset, + limit=limit, + statuses=statuses, + until=until, + additional_metadata=additional_metadata, + worker_id=worker_id, + parent_task_external_id=parent_task_external_id, + ) diff --git a/sdks/python/hatchet_sdk/worker/worker.py b/sdks/python/hatchet_sdk/worker/worker.py index 18901255a..bf23605cd 100644 --- a/sdks/python/hatchet_sdk/worker/worker.py +++ b/sdks/python/hatchet_sdk/worker/worker.py @@ -147,7 +147,7 @@ class Worker: def register_workflow_from_opts(self, opts: CreateWorkflowVersionRequest) -> None: try: - self.client.admin.put_workflow(opts.name, opts) + self.client.admin.put_workflow(opts) except Exception as e: logger.error(f"failed to register workflow: {opts.name}") logger.error(e) @@ -159,11 +159,8 @@ class Worker: "workflow must have at least one task registered before registering" ) - opts = workflow.to_proto() - name = workflow.name - try: - self.client.admin.put_workflow(name, opts) + self.client.admin.put_workflow(workflow.to_proto()) except Exception as e: logger.error(f"failed to register workflow: {workflow.name}") logger.error(e) diff --git a/sdks/python/pyproject.toml b/sdks/python/pyproject.toml index 789f802ce..f1adab4e8 100644 --- a/sdks/python/pyproject.toml +++ b/sdks/python/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "hatchet-sdk" -version = "1.6.5" +version = "1.7.0" description = "" authors = ["Alexander Belanger "] readme = "README.md" diff --git a/sdks/typescript/package.json b/sdks/typescript/package.json index 653244cd3..c0d227fcb 100644 --- a/sdks/typescript/package.json +++ b/sdks/typescript/package.json @@ -1,6 +1,6 @@ { "name": "@hatchet-dev/typescript-sdk", - "version": "1.4.2", + "version": "1.5.0", "description": "Background task orchestration & visibility for developers", "types": "dist/index.d.ts", "files": [ diff --git a/sdks/typescript/src/clients/rest/generated/Api.ts b/sdks/typescript/src/clients/rest/generated/Api.ts index 4271b2150..e8eba8fbe 100644 --- a/sdks/typescript/src/clients/rest/generated/Api.ts +++ b/sdks/typescript/src/clients/rest/generated/Api.ts @@ -1718,6 +1718,10 @@ export class Api extends HttpClient