From 0823ae33163ccb93258658164ecfa373ed2fac81 Mon Sep 17 00:00:00 2001 From: Matt Kaye Date: Tue, 15 Apr 2025 17:21:31 -0400 Subject: [PATCH] Hatchet Python Blog Post (#1526) * feat: initial pass at first parts of blog post * feat: initial mkdocs setup * feat: first pass at embedding mkdocs * fix: config * debug: paths * fix: unwind docs hack * feat: start working on mkdocs theme * fix: paths * feat: wrap up post * fix: proof * fix: doc links * fix: rm docs * fix: lint * fix: lint * fix: typos + tweak * fix: tweaks * fix: typo * fix: cleanup --- frontend/docs/pages/blog/_meta.js | 8 +- .../pages/blog/task-queue-modern-python.mdx | 141 ++++++++++++++++++ sdks/python/examples/child/trigger.py | 10 ++ sdks/python/examples/child/worker.py | 9 +- 4 files changed, 162 insertions(+), 6 deletions(-) create mode 100644 frontend/docs/pages/blog/task-queue-modern-python.mdx diff --git a/frontend/docs/pages/blog/_meta.js b/frontend/docs/pages/blog/_meta.js index b5c738b04..7aefe7b9c 100644 --- a/frontend/docs/pages/blog/_meta.js +++ b/frontend/docs/pages/blog/_meta.js @@ -1,4 +1,7 @@ export default { + "task-queue-modern-python": { + "title": "A task queue for modern Python applications" + }, "postgres-events-table": { "title": "Use Postgres for your events table" }, @@ -6,9 +9,10 @@ export default { "title": "Why we moved off Prisma" }, "problems-with-celery": { - "title": "The problems with Celery" + "title": "The problems with Celery", + "display": "hidden", }, "multi-tenant-queues": { "title": "An unfair advantage: multi-tenant queues in Postgres" - } + }, } diff --git a/frontend/docs/pages/blog/task-queue-modern-python.mdx b/frontend/docs/pages/blog/task-queue-modern-python.mdx new file mode 100644 index 000000000..cdc1d9a4a --- /dev/null +++ b/frontend/docs/pages/blog/task-queue-modern-python.mdx @@ -0,0 +1,141 @@ +import DynamicLottie from "../../components/DynamicLottie"; +import * as prefetch from "./_celery_prefetch.json"; +import { Callout } from 
"nextra/components"; +import { GithubSnippet, getSnippets } from "@/components/code"; + +export const ChildWorker = { + path: "examples/child/worker.py", +}; +export const ChildTrigger = { + path: "examples/child/trigger.py", +}; +export const Cron = { + path: "examples/cron/programatic-sync.py", +}; + +export const getStaticProps = ({}) => + getSnippets([ChildWorker, ChildTrigger, Cron]); + +# **A task queue for modern Python applications** + +
+
+ Matt Kaye +
+

Published on April 10th, 2025

+
+ +_**Disclosure:** I'm an engineer at [Hatchet](https://hatchet.run), a multi-language task queue with Python support. We're [open-source](https://github.com/hatchet-dev/hatchet) (with a cloud version) and we aim to be a drop-in replacement for Celery that supports a modern Python stack._ + +## What is Hatchet? + +Hatchet is a platform for running background tasks, similar to Celery and RQ. We're striving to provide all of the features that you're familiar with, but built around modern Python features and with improved support for observability, chaining tasks together, and durable execution. + +## Modern Python Features + +Modern Python applications often make heavy use of (relatively) new features and tooling that have emerged in Python over the past decade or so. Two of the most widespread are: + +1. The proliferation of type hints, adoption of type checkers like [Mypy](https://mypy-lang.org/) and [Pyright](https://microsoft.github.io/pyright/#/), and growth in popularity of tools like [Pydantic](https://docs.pydantic.dev/latest/) and [attrs](https://www.attrs.org/en/stable/) that lean on them. +2. The adoption of `async` / `await`. + +These two sets of features have also played a role in the explosion of [FastAPI](https://fastapi.tiangolo.com/), which has quickly become one of the most, if not _the_ most, popular web frameworks in Python. + + + If you aren't familiar with FastAPI, I'd recommend skimming through the + documentation to get a sense of some of its features, and of how heavily it + relies on Pydantic and `async` / `await` for building type-safe, performant + web applications. + + +Hatchet's Python SDK has drawn inspiration from FastAPI and is similarly a Pydantic- and async-first way of running background tasks. + +## Pydantic + +When working with Hatchet, you can define inputs and outputs of your tasks as Pydantic models, which the SDK will then serialize and deserialize for you internally. 
This means that you can write a task like this: + + + +In this example, we've defined a single Hatchet task that takes a Pydantic model as input, and returns a Pydantic model as output. This means that if you want to trigger this task from somewhere else in your codebase, you can do something like this: + + + +The different flavors of `.run` methods are type-safe: The input is typed and can be statically type checked, and is also validated by Pydantic at runtime. This means that when triggering tasks, you don't need to provide a set of untyped positional or keyword arguments, like you might if using Celery. + +## Triggering task runs other ways + +#### Scheduling + +You can also _schedule_ a task for the future (similar to Celery's `eta` or `countdown` features) using the `.schedule` method: + + + +Importantly, Hatchet will not hold scheduled tasks in memory, so it's perfectly safe to schedule tasks for arbitrarily far in the future. + +#### Crons + +Finally, Hatchet also has first-class support for cron jobs. You can either create crons dynamically: + + + +Or you can define them declaratively when you create your workflow: + +```python +cron_workflow = hatchet.workflow(name="CronWorkflow", on_crons=["* * * * *"]) +``` + +Importantly, first-class support for crons in Hatchet means there's no need for a tool like [Beat](https://docs.celeryq.dev/en/latest/userguide/periodic-tasks.html#introduction) in Celery for scheduling periodic tasks. + +## `async` / `await` + +With Hatchet, all of your tasks can be defined as either sync or async functions, and Hatchet will run sync tasks in a non-blocking way behind the scenes. If you've worked in FastAPI, this should feel familiar. Ultimately, this gives developers using Hatchet the full power of `asyncio` in Python with no need for workarounds like increasing a `concurrency` setting on a worker in order to handle more concurrent work. 
+ +As a simple example, you can easily run a Hatchet task that makes 10 concurrent API calls using `async` / `await` with `asyncio.gather` and `aiohttp`, as opposed to needing to run each one in a blocking fashion as its own task. For example: + +```python +import asyncio + +from aiohttp import ClientSession + +from hatchet_sdk import Context, EmptyModel, Hatchet + +hatchet = Hatchet() + + +async def fetch(session: ClientSession, url: str) -> bool: + async with session.get(url) as response: + return response.status == 200 + + +@hatchet.task(name="Fetch") +async def hello_from_hatchet(input: EmptyModel, ctx: Context) -> int: + num_requests = 10 + + async with ClientSession() as session: + tasks = [ + fetch(session, "https://docs.hatchet.run/home") for _ in range(num_requests) + ] + + results = await asyncio.gather(*tasks) + + return results.count(True) +``` + +With Hatchet, you can perform all of these requests concurrently, in a single task, as opposed to needing to e.g. enqueue a single task per request. This is more performant on your side (as the client), and also puts less pressure on the backing queue, since it needs to handle an order of magnitude fewer requests in this case. + +Support for `async` / `await` also allows you to make other parts of your codebase asynchronous as well, like database operations. In a setting where your app uses a task queue that does not support `async`, but you want to share CRUD operations between your task queue and main application, you're forced to make all of those operations synchronous. With Hatchet, this is not the case, which allows you to make use of tools like [asyncpg](https://github.com/MagicStack/asyncpg) and similar. + +## Potpourri + +Hatchet's Python SDK also has a handful of other features that make working with Hatchet in Python more enjoyable: + +1. 
[Lifespans](../home/lifespans.mdx) (in beta) are a feature we've borrowed from [FastAPI's feature of the same name](https://fastapi.tiangolo.com/advanced/events/) which allow you to share state like connection pools across all tasks running on a worker. +2. Hatchet's Python SDK has an [OpenTelemetry instrumentor](../home/opentelemetry) which gives you a window into how your Hatchet workers are performing: How much work they're executing, how long it's taking, and so on. + +## Thank you! + +If you've made it this far, try us out! You can get up and running in just five minutes on [Hatchet Cloud](https://cloud.onhatchet.run/). And if you'd like to learn more, you can find us: + +- On [GitHub](https://github.com/hatchet-dev/hatchet) +- On [Discord](https://hatchet.run/discord) + +Or check out [our documentation](https://docs.hatchet.run/). diff --git a/sdks/python/examples/child/trigger.py b/sdks/python/examples/child/trigger.py index 715da79f3..1ce306622 100644 --- a/sdks/python/examples/child/trigger.py +++ b/sdks/python/examples/child/trigger.py @@ -1,3 +1,5 @@ +# ruff: noqa: E402 + import asyncio # ❓ Running a Task @@ -6,6 +8,14 @@ from examples.child.worker import SimpleInput, child_task child_task.run(SimpleInput(message="Hello, World!")) # !! +# ❓ Schedule a Task +from datetime import datetime, timedelta + +child_task.schedule( + datetime.now() + timedelta(minutes=5), SimpleInput(message="Hello, World!") +) +# !! 
+ async def main() -> None: # ❓ Running a Task AIO diff --git a/sdks/python/examples/child/worker.py b/sdks/python/examples/child/worker.py index b7deb7c95..b2462c637 100644 --- a/sdks/python/examples/child/worker.py +++ b/sdks/python/examples/child/worker.py @@ -2,12 +2,12 @@ from pydantic import BaseModel -from hatchet_sdk import Context, EmptyModel, Hatchet +from hatchet_sdk import Context, Hatchet hatchet = Hatchet(debug=True) -class SimpleInput(EmptyModel): +class SimpleInput(BaseModel): message: str @@ -24,12 +24,13 @@ def step1(input: SimpleInput, ctx: Context) -> SimpleOutput: return SimpleOutput(transformed_message=input.message.upper()) +# ‼️ + + def main() -> None: worker = hatchet.worker("test-worker", slots=1, workflows=[child_task]) worker.start() -# ‼️ - if __name__ == "__main__": main()