Matt Kaye 08bd27a869 Feat: Dynamic (Event) Filters (#1704)
2025-05-16 15:44:28 -04:00


Run Background Tasks at Scale

Docs License: MIT Go Reference NPM Downloads


Hatchet Cloud · Documentation · Website · Issues

What is Hatchet?

Hatchet is a platform for running background tasks, built on top of Postgres. Instead of managing your own task queue or pub/sub system, you can use Hatchet to distribute your functions between a set of workers with minimal configuration or infrastructure.

When should I use Hatchet?

Background tasks are critical for offloading work from your main web application. Usually background tasks are sent through a FIFO (first-in-first-out) queue, which helps guard against traffic spikes (queues can absorb a lot of load) and ensures that tasks are retried when your task handlers error out. Most stacks begin with a library-based queue backed by Redis or RabbitMQ (like Celery or BullMQ). But as your tasks become more complex, these queues become difficult to debug and monitor, and they start to fail in unexpected ways.

This is where Hatchet comes in. Hatchet is a full-featured background task management platform, with built-in support for chaining complex tasks together into workflows, alerting on failures, making tasks more durable, and viewing tasks in a real-time web dashboard.

Features

📥 Queues

Hatchet is built on a durable task queue that enqueues your tasks and sends them to your workers at a rate that your workers can handle. Hatchet will track the progress of your task and ensure that the work gets completed (or you get alerted), even if your application crashes.

This is particularly useful for:

  • Ensuring that you never drop a user request
  • Flattening large spikes in your application
  • Breaking large, complex logic into smaller, reusable tasks

Read more ➶

  • Python
    # 1. Define your task input
    class SimpleInput(BaseModel):
        message: str
    
    # 2. Define your task using hatchet.task
    @hatchet.task(name="SimpleWorkflow", input_validator=SimpleInput)
    def simple(input: SimpleInput, ctx: Context) -> dict[str, str]:
        return {
          "transformed_message": input.message.lower(),
        }
    
    # 3. Register your task on your worker
    worker = hatchet.worker("test-worker", workflows=[simple])
    worker.start()
    
    # 4. Invoke tasks from your application
    simple.run(SimpleInput(message="Hello World!"))
    
  • Typescript
    // 1. Define your task input
    export type SimpleInput = {
      Message: string;
    };
    
    // 2. Define your task using hatchet.task
    export const simple = hatchet.task({
      name: "simple",
      fn: (input: SimpleInput) => {
        return {
          TransformedMessage: input.Message.toLowerCase(),
        };
      },
    });
    
    // 3. Register your task on your worker
    const worker = await hatchet.worker("simple-worker", {
      workflows: [simple],
    });
    
    await worker.start();
    
    // 4. Invoke tasks from your application
    await simple.run({
      Message: "Hello World!",
    });
    
  • Go
    // 1. Define your task input
    type SimpleInput struct {
      Message string `json:"message"`
    }
    
    // 2. Define your task using factory.NewTask
    simple := factory.NewTask(
      create.StandaloneTask{
        Name: "simple-task",
      }, func(ctx worker.HatchetContext, input SimpleInput) (*SimpleResult, error) {
        return &SimpleResult{
          TransformedMessage: strings.ToLower(input.Message),
        }, nil
      },
      hatchet,
    )
    
    // 3. Register your task on your worker
    worker, err := hatchet.Worker(v1worker.WorkerOpts{
      Name: "simple-worker",
      Workflows: []workflow.WorkflowBase{
        simple,
      },
    })
    if err != nil {
      panic(err) // handle the registration error instead of ignoring it
    }
    
    worker.StartBlocking()
    
    // 4. Invoke tasks from your application
    simple.Run(context.Background(), SimpleInput{Message: "Hello, World!"})
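
To make the queueing behavior described above more concrete, here is a minimal conceptual sketch in plain Python of a FIFO queue that retries failed tasks. It does not use the Hatchet SDK and is not how Hatchet is implemented; `drain`, `flaky_lower`, and `max_attempts` are hypothetical names chosen for illustration.

```python
from collections import deque
from typing import Callable

def drain(queue: deque, handler: Callable[[str], str], max_attempts: int = 3):
    """Process (task, attempt) pairs in FIFO order, retrying failures."""
    completed, failed = [], []
    while queue:
        task, attempt = queue.popleft()
        try:
            completed.append(handler(task))
        except Exception:
            if attempt + 1 < max_attempts:
                # Re-enqueue behind the work that is already waiting.
                queue.append((task, attempt + 1))
            else:
                # Out of attempts: a real system would alert here.
                failed.append(task)
    return completed, failed

# Hypothetical handler that fails on its first call only.
calls = {"n": 0}

def flaky_lower(message: str) -> str:
    calls["n"] += 1
    if calls["n"] == 1:
        raise RuntimeError("transient failure")
    return message.lower()

queue = deque([("Hello World!", 0), ("Second Task", 0)])
completed, failed = drain(queue, flaky_lower)
print(completed, failed)
```

The first task fails once, goes to the back of the queue, and succeeds on its second attempt, so no work is dropped.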
    
🎻 Task Orchestration

Hatchet allows you to build complex workflows that can be composed of multiple tasks. For example, if you'd like to break a workload into smaller tasks, you can use Hatchet to create a fanout workflow that spawns multiple tasks in parallel.

Hatchet supports the following mechanisms for task orchestration:

  • DAGs (directed acyclic graphs) — pre-define the shape of your work, automatically routing the outputs of a parent task to the input of a child task. Read more ➶

  • Durable tasks — these tasks are responsible for orchestrating other tasks. They store a full history of all spawned tasks, allowing you to cache intermediate results. Read more ➶

  • Python
    # 1. Define a workflow (a workflow is a collection of tasks)
    simple = hatchet.workflow(name="SimpleWorkflow")
    
    # 2. Attach the first task to the workflow
    @simple.task()
    def task_1(input: EmptyModel, ctx: Context) -> dict[str, str]:
        print("executed task_1")
        return {"result": "task_1"}
    
    # 3. Attach the second task to the workflow, which executes after task_1
    @simple.task(parents=[task_1])
    def task_2(input: EmptyModel, ctx: Context) -> None:
        first_result = ctx.task_output(task_1)
        print(first_result)
    
    # 4. Invoke workflows from your application
    result = simple.run(input_data)
    
  • Typescript
    // 1. Define a workflow (a workflow is a collection of tasks)
    const simple = hatchet.workflow<DagInput, DagOutput>({
      name: "simple",
    });
    
    // 2. Attach the first task to the workflow
    const task1 = simple.task({
      name: "task-1",
      fn: (input) => {
        return {
          result: "task-1",
        };
      },
    });
    
    // 3. Attach the second task to the workflow, which executes after task-1
    const task2 = simple.task({
      name: "task-2",
      parents: [task1],
      fn: (input, ctx) => {
        const firstResult = ctx.getParentOutput(task1);
        console.log(firstResult);
      },
    });
    
    // 4. Invoke workflows from your application
    await simple.run({ Message: "Hello World" });
    
  • Go
    // 1. Define a workflow (a workflow is a collection of tasks)
    simple := v1.WorkflowFactory[DagInput, DagOutput](
        workflow.CreateOpts[DagInput]{
            Name: "simple-workflow",
        },
        hatchet,
    )
    
    // 2. Attach the first task to the workflow
    task1 := simple.Task(
        task.CreateOpts[DagInput]{
            Name: "task-1",
            Fn: func(ctx worker.HatchetContext, _ DagInput) (*SimpleOutput, error) {
                return &SimpleOutput{
                    Result: "task-1",
                }, nil
            },
        },
    )
    
    // 3. Attach the second task to the workflow, which executes after task-1
    simple.Task(
        task.CreateOpts[DagInput]{
            Name: "task-2",
            Parents: []task.NamedTask{
                task1,
            },
            Fn: func(ctx worker.HatchetContext, _ DagInput) (*SimpleOutput, error) {
                return &SimpleOutput{
                    Result: "task-2",
                }, nil
            },
        },
    )
    
    // 4. Invoke workflows from your application
    simple.Run(ctx, DagInput{})
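
As a rough illustration of what "routing the outputs of a parent task to the input of a child task" means, the following plain-Python sketch runs tasks in dependency order and hands each task its parents' outputs. It uses only the standard library (`graphlib`, Python 3.9+) and is an assumption-laden toy, not Hatchet's scheduler; `run_dag` and the lambda tasks are hypothetical.

```python
from graphlib import TopologicalSorter

def run_dag(tasks, parents):
    """tasks: name -> fn(parent_outputs); parents: name -> list of parent names."""
    outputs = {}
    # static_order() yields every parent before any of its children.
    for name in TopologicalSorter(parents).static_order():
        parent_outputs = {p: outputs[p] for p in parents.get(name, [])}
        outputs[name] = tasks[name](parent_outputs)
    return outputs

tasks = {
    "task-1": lambda parent_outputs: {"result": "task-1"},
    "task-2": lambda parent_outputs: {"seen": parent_outputs["task-1"]["result"]},
}
parents = {"task-1": [], "task-2": ["task-1"]}

outputs = run_dag(tasks, parents)
print(outputs["task-2"])
```

Because the shape of the graph is declared up front, the execution order can be derived without the tasks knowing about each other.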
    
🚦 Flow Control

Don't let busy users crash your application. With Hatchet, you can throttle execution on a per-user, per-tenant and per-queue basis, increasing system stability and limiting the impact of busy users on the rest of your system.

Hatchet supports the following flow control primitives:

  • Concurrency — set a concurrency limit based on a dynamic concurrency key (e.g., each user can only run 10 batch jobs at a given time). Read more ➶

  • Rate limiting — create both global and dynamic rate limits. Read more ➶

  • Python
    # limit concurrency on a per-user basis
    flow_control_workflow = hatchet.workflow(
      name="FlowControlWorkflow",
      concurrency=ConcurrencyExpression(
        expression="input.user_id",
        max_runs=5,
        limit_strategy=ConcurrencyLimitStrategy.GROUP_ROUND_ROBIN,
      ),
      input_validator=FlowControlInput,
    )
    
    # rate limit a task per user to 10 tasks per minute, with each task consuming 1 unit
    @flow_control_workflow.task(
        rate_limits=[
            RateLimit(
                dynamic_key="input.user_id",
                units=1,
                limit=10,
                duration=RateLimitDuration.MINUTE,
            )
        ]
    )
    def rate_limit_task(input: FlowControlInput, ctx: Context) -> None:
        print("executed rate_limit_task")
    
  • Typescript
    // limit concurrency on a per-user basis
    const flowControlWorkflow = hatchet.workflow<SimpleInput, SimpleOutput>({
      name: "ConcurrencyLimitWorkflow",
      concurrency: {
        expression: "input.userId",
        maxRuns: 5,
        limitStrategy: ConcurrencyLimitStrategy.GROUP_ROUND_ROBIN,
      },
    });
    
    // rate limit a task per user to 10 tasks per minute, with each task consuming 1 unit
    flowControlWorkflow.task({
      name: "rate-limit-task",
      rateLimits: [
        {
          dynamicKey: "input.userId",
          units: 1,
          limit: 10,
          duration: RateLimitDuration.MINUTE,
        },
      ],
      fn: async (input) => {
        return {
          Completed: true,
        };
      },
    });
    
  • Go
    // limit concurrency on a per-user basis
    flowControlWorkflow := factory.NewWorkflow[DagInput, DagResult](
      create.WorkflowCreateOpts[DagInput]{
        Name: "simple-dag",
        Concurrency: []*types.Concurrency{
          {
            Expression:    "input.userId",
            MaxRuns:       1,
            LimitStrategy: types.GroupRoundRobin,
          },
        },
      },
      hatchet,
    )
    
    // rate limit a task per user to 10 tasks per minute, with each task consuming 1 unit
    flowControlWorkflow.Task(
      create.WorkflowTask[FlowControlInput, FlowControlOutput]{
        Name: "rate-limit-task",
        RateLimits: []*types.RateLimit{
          {
            Key:            "user-rate-limit",
            KeyExpr:        "input.userId",
            Units:          1,
            LimitValueExpr: 10,
            Duration:       types.Minute,
          },
        },
      }, func(ctx worker.HatchetContext, input FlowControlInput) (interface{}, error) {
        return &SimpleOutput{
          Step: 1,
        }, nil
      },
    )
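
The per-user rate limit in the examples above (10 units per minute, 1 unit per task) can be pictured with a small fixed-window limiter. This is a conceptual sketch only: the source does not say which algorithm Hatchet uses, so the fixed-window approach, the class name `FixedWindowRateLimiter`, and its parameters are all assumptions made for illustration.

```python
from collections import defaultdict

class FixedWindowRateLimiter:
    """Allow at most `limit` units per key in each window of `window_seconds`."""

    def __init__(self, limit: int, window_seconds: int):
        self.limit = limit
        self.window_seconds = window_seconds
        self.used = defaultdict(int)  # (key, window index) -> units consumed

    def try_acquire(self, key: str, now: float, units: int = 1) -> bool:
        window = int(now // self.window_seconds)
        if self.used[(key, window)] + units > self.limit:
            return False  # over the limit: the task would have to wait
        self.used[(key, window)] += units
        return True

limiter = FixedWindowRateLimiter(limit=10, window_seconds=60)

# Eleven requests from the same user inside one minute.
results = [limiter.try_acquire("user-1", now=5.0) for _ in range(11)]
other_user = limiter.try_acquire("user-2", now=5.0)
next_window = limiter.try_acquire("user-1", now=65.0)
print(results.count(True), other_user, next_window)
```

Keying the counter on the user ID is what keeps one busy user from consuming capacity that belongs to everyone else.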
    
📅 Scheduling

Hatchet has full support for scheduling features, including cron, one-time scheduling, and pausing execution for a time duration. This is particularly useful for:

  • Cron schedules: run data pipelines, batch processes, or notification systems on a cron schedule. Read more ➶

  • One-time tasks: schedule a workflow for a specific time in the future. Read more ➶

  • Durable sleep: pause execution of a task for a specific duration. Read more ➶

  • Python
    tomorrow = datetime.today() + timedelta(days=1)
    
    # schedule a task to run tomorrow
    scheduled = simple.schedule(
      tomorrow,
      SimpleInput(message="Hello, World!")
    )
    
    # schedule a task to run every day at midnight
    cron = simple.cron(
      "every-day",
      "0 0 * * *",
      SimpleInput(message="Hello, World!")
    )
    
  • Typescript
    const tomorrow = new Date(Date.now() + 1000 * 60 * 60 * 24);
    // schedule a task to run tomorrow
    const scheduled = simple.schedule(tomorrow, {
      Message: "Hello, World!",
    });
    
    // schedule a task to run every day at midnight
    const cron = simple.cron("every-day", "0 0 * * *", {
      Message: "Hello, World!",
    });
    
  • Go
    tomorrow := time.Now().Add(24 * time.Hour)
    
    // schedule a task to run tomorrow
    simple.Schedule(ctx, tomorrow, ScheduleInput{
      Message: "Hello, World!",
    })
    
    // schedule a task to run every day at midnight
    simple.Cron(ctx, "every-day", "0 0 * * *", CronInput{
      Message: "Hello, World!",
    })
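
For readers unfamiliar with cron syntax, the expression `0 0 * * *` used above means "minute 0, hour 0, every day of the month, every month, every day of the week". The sketch below names the five fields and computes the next matching time for that specific expression using only the standard library. It is not a general cron parser, and `next_midnight` is a hypothetical helper.

```python
from datetime import datetime, timedelta

CRON_FIELDS = ["minute", "hour", "day_of_month", "month", "day_of_week"]

def describe(expression: str) -> dict:
    """Pair each of the five cron fields with its value."""
    return dict(zip(CRON_FIELDS, expression.split()))

def next_midnight(now: datetime) -> datetime:
    """Next run of '0 0 * * *' strictly after `now`."""
    tomorrow = now + timedelta(days=1)
    return tomorrow.replace(hour=0, minute=0, second=0, microsecond=0)

fields = describe("0 0 * * *")
next_run = next_midnight(datetime(2025, 5, 16, 15, 44, 28))
print(fields, next_run)
```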
    
🚏 Task routing

While the default Hatchet behavior is to implement a FIFO queue, it also supports additional scheduling mechanisms to route your tasks to the ideal worker.

  • Sticky assignment — allows spawned tasks to prefer or require execution on the same worker. Read more ➶

  • Worker affinity — ranks workers to discover which is best suited to handle a given task. Read more ➶

  • Python
    # create a workflow which prefers to run on the same worker, but can be
    # scheduled on any worker if the original worker is busy
    hatchet.workflow(
      name="StickyWorkflow",
      sticky=StickyStrategy.SOFT,
    )
    
    # create a workflow which must run on the same worker
    hatchet.workflow(
      name="StickyWorkflow",
      sticky=StickyStrategy.HARD,
    )
    
  • Typescript
    // create a workflow which prefers to run on the same worker, but can be
    // scheduled on any worker if the original worker is busy
    hatchet.workflow({
      name: "StickyWorkflow",
      sticky: StickyStrategy.SOFT,
    });
    
    // create a workflow which must run on the same worker
    hatchet.workflow({
      name: "StickyWorkflow",
      sticky: StickyStrategy.HARD,
    });
    
  • Go
    // create a workflow which prefers to run on the same worker, but can be
    // scheduled on any worker if the original worker is busy
    factory.NewWorkflow[StickyInput, StickyOutput](
      create.WorkflowCreateOpts[StickyInput]{
        Name: "sticky-dag",
        StickyStrategy: types.StickyStrategy_SOFT,
      },
      hatchet,
    );
    
    // create a workflow which must run on the same worker
    factory.NewWorkflow[StickyInput, StickyOutput](
      create.WorkflowCreateOpts[StickyInput]{
        Name: "sticky-dag",
        StickyStrategy: types.StickyStrategy_HARD,
      },
      hatchet,
    );
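
The difference between the soft and hard sticky strategies described in the comments above can be sketched as a small selection function. This is a plain-Python illustration of the stated behavior, not Hatchet's scheduler; `Sticky`, `pick_worker`, and the worker names are hypothetical.

```python
from enum import Enum
from typing import List, Optional

class Sticky(Enum):
    SOFT = "soft"
    HARD = "hard"

def pick_worker(previous: str, idle_workers: List[str], strategy: Sticky) -> Optional[str]:
    """Choose a worker for a spawned task given the worker that ran its parent."""
    if previous in idle_workers:
        return previous  # both strategies prefer the original worker
    if strategy is Sticky.SOFT and idle_workers:
        return idle_workers[0]  # SOFT falls back to any idle worker
    return None  # HARD (or no idle workers): wait instead of moving

soft_choice = pick_worker("worker-a", ["worker-b", "worker-c"], Sticky.SOFT)
hard_choice = pick_worker("worker-a", ["worker-b", "worker-c"], Sticky.HARD)
same_worker = pick_worker("worker-a", ["worker-a", "worker-b"], Sticky.HARD)
print(soft_choice, hard_choice, same_worker)
```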
    
Event triggers and listeners

Hatchet supports event-based architectures where tasks and workflows can pause execution while waiting for a specific external event. It supports the following features:

  • Event listening — tasks can be paused until a specific event is triggered. Read more ➶

  • Event triggering — events can trigger new workflows or steps in a workflow. Read more ➶

  • Python
    # Create a task which waits for an external user event or sleeps for 10 seconds
    @dag_with_conditions.task(
      parents=[first_task],
      wait_for=[
        or_(
          SleepCondition(timedelta(seconds=10)),
          UserEventCondition(event_key="user:event"),
        )
      ]
    )
    def second_task(input: EmptyModel, ctx: Context) -> dict[str, str]:
        return {"completed": "true"}
    
  • Typescript
    // Create a task which waits for an external user event or sleeps for 10 seconds
    dagWithConditions.task({
      name: "secondTask",
      parents: [firstTask],
      waitFor: Or({ eventKey: "user:event" }, { sleepFor: "10s" }),
      fn: async (_, ctx) => {
        return {
          Completed: true,
        };
      },
    });
    
  • Go
    // Create a task which waits for an external user event or sleeps for 10 seconds
    simple.Task(
      conditionOpts{
        Name: "Step2",
        Parents: []create.NamedTask{
          step1,
        },
        WaitFor: condition.Conditions(
          condition.UserEventCondition("user:event", "'true'"),
          condition.SleepCondition(10 * time.Second),
        ),
      }, func(ctx worker.HatchetContext, input DagWithConditionsInput) (interface{}, error) {
        // ...
      },
    );
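
The "wait for an event or sleep for a fixed time" condition shown above behaves like racing an event against a timeout. The following standard-library `asyncio` sketch shows that idea in isolation. It is an in-memory analogy under that assumption and does not reflect how Hatchet persists or delivers events; `wait_for_event_or_sleep` and `demo` are hypothetical names.

```python
import asyncio

async def wait_for_event_or_sleep(event: asyncio.Event, timeout_seconds: float) -> str:
    """Return which condition was satisfied first: the event or the sleep."""
    try:
        await asyncio.wait_for(event.wait(), timeout=timeout_seconds)
        return "event"
    except asyncio.TimeoutError:
        return "sleep"

async def demo():
    # Case 1: the event fires well before the timeout.
    fired = asyncio.Event()
    asyncio.get_running_loop().call_later(0.01, fired.set)
    first = await wait_for_event_or_sleep(fired, timeout_seconds=1.0)

    # Case 2: the event never fires, so the sleep wins.
    never = asyncio.Event()
    second = await wait_for_event_or_sleep(never, timeout_seconds=0.01)
    return first, second

outcome = asyncio.run(demo())
print(outcome)
```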
    
🖥️ Real-time Web UI

Hatchet comes bundled with a number of features to help you monitor your tasks, workflows, and queues.

Real-time dashboards and metrics

Monitor your tasks, workflows, and queues with live updates to quickly detect issues. Alerting is built in so you can respond to problems as soon as they occur.

https://github.com/user-attachments/assets/b1797540-c9da-4057-b50f-4780f52a2cb9

Logging

Hatchet supports logging from your tasks, allowing you to easily correlate task failures with logs in your system. No more digging through your logging service to figure out why your tasks failed.

https://github.com/user-attachments/assets/427c15cd-8842-4b54-ab2e-3b1cabc01c7b

Alerting

Hatchet supports Slack and email-based alerting for when your tasks fail. Alerts are real-time with adjustable alerting windows.

Quick Start

Hatchet is available as a cloud version or self-hosted. See the following docs to get up and running quickly:

Documentation

The most up-to-date documentation can be found at https://docs.hatchet.run.

Community & Support

  • Discord - best for getting in touch with the maintainers and hanging with the community
  • GitHub Issues - used for filing bug reports
  • GitHub Discussions - used for starting in-depth technical discussions that are suited for asynchronous communication
  • Email - best for getting Hatchet Cloud support and for help with billing, data deletion, etc.

Hatchet vs...

Hatchet vs Temporal

Hatchet is designed to be a general-purpose task orchestration platform -- it can be used as a queue, a DAG-based orchestrator, a durable execution engine, or all three. As a result, Hatchet covers a wider array of use-cases, like multiple queueing strategies, rate limiting, DAG features, conditional triggering, streaming features, and much more.

Temporal is narrowly focused on durable execution, and supports a wider range of database backends and result stores, like Apache Cassandra, MySQL, PostgreSQL, and SQLite.

When to use Hatchet: when you'd like to get more control over the underlying queue logic, run DAG-based workflows, or want to simplify self-hosting by only running the Hatchet engine and Postgres.

When to use Temporal: when you'd like to use a non-Postgres result store, or your only workload is best suited for durable execution.

Hatchet vs Task Queues (BullMQ, Celery)

Hatchet is a durable task queue, meaning it persists the history of all executions (up to a retention period), which allows for easy monitoring and debugging and powers a bunch of the durability features above. This isn't the standard behavior of Celery and BullMQ (and you need to rely on third-party UI tools which are extremely limited in functionality, like Celery Flower).

When to use Hatchet: when you'd like results to be persisted and observable in a UI.

When to use a task queue library like BullMQ/Celery: when you need very high throughput (>10k/s) without retention, or when you'd like to use a single library (instead of a standalone service like Hatchet) to interact with your queue.

Hatchet vs DAG-based platforms (Airflow, Prefect, Dagster)

These tools are usually built with data engineers in mind, and aren't designed to run as part of a high-volume application. They're usually higher latency and higher cost, with their primary selling point being integrations with common datastores and connectors.

When to use Hatchet: when you'd like to use a DAG-based framework, write your own integrations and functions, and require higher throughput (>100/s).

When to use other DAG-based platforms: when you'd like to use other data stores and connectors that work out of the box.

Hatchet vs AI Frameworks

Most AI frameworks are built to run in-memory, with horizontal scaling and durability as an afterthought. While you can use an AI framework in conjunction with Hatchet, most of our users discard their AI framework and use Hatchet's primitives to build their applications.

When to use Hatchet: when you'd like full control over your underlying functions and LLM calls, or you require high availability and durability for your functions.

When to use an AI framework: when you'd like to get started quickly with simple abstractions.

Issues

Please submit any bugs that you encounter via GitHub Issues.

I'd Like to Contribute

Please let us know what you're interested in working on in the #contributing channel on Discord. This will help us shape the direction of the project and will make collaboration much easier!
