Files
hatchet/sdks/python/examples/webhook_with_scope/worker.py
Jishnu ed43cae0a2 feat: Extend webhook support for scope_expression and payload (#2874)
* add: scope_expression and payload columns for v1_webhook

* refactor: insert or update sql cmds for v1_webhook

* feat: update api clients, openapi schema for new webhook body

* refactor: receiver and transformer for v1 webhook

* add: python sdk changes

* feat: ts sdk changes

* feat: add FE for webhook new params

* fix: scope expression empty payload

* add: support for scope and payload for go client

* fix: lint

* fix: error message UI on webhook

* fix: lint

* fix: migraiton conflict, build failure

* fix: error handling

* update docs, add tests

* fix: lint, test file name
2026-02-04 12:44:52 -05:00

65 lines
1.4 KiB
Python

from typing import Any
from pydantic import BaseModel
from hatchet_sdk import Context, DefaultFilter, Hatchet
hatchet = Hatchet(debug=True)
class WebhookInputWithScope(BaseModel):
type: str
message: str
scope: str | None = None
class WebhookInputWithStaticPayload(BaseModel):
type: str
message: str
source: str | None = None
environment: str | None = None
metadata: dict[str, Any] | None = None
tags: list[str] | None = None
customer_id: str | None = None
processed: bool | None = None
@hatchet.task(
input_validator=WebhookInputWithScope,
on_events=["webhook-scope:test"],
default_filters=[
DefaultFilter(
expression="true",
scope="test-scope-value",
payload={},
)
],
)
def webhook_with_scope(input: WebhookInputWithScope, ctx: Context) -> dict[str, Any]:
return input.model_dump()
@hatchet.task(
input_validator=WebhookInputWithStaticPayload,
on_events=["webhook-static:test"],
)
def webhook_with_static_payload(
input: WebhookInputWithStaticPayload, ctx: Context
) -> dict[str, Any]:
return input.model_dump()
def main() -> None:
worker = hatchet.worker(
"webhook-scope-worker",
workflows=[
webhook_with_scope,
webhook_with_static_payload,
],
)
worker.start()
if __name__ == "__main__":
main()