Feat: typescript middleware (#3066)

* feat: typed middleware

* feat: chaining

* feat: typed global input

* feat: typed global output

* feat: inferred types from middleware

* feat: with chaining

* docs: initial pass

* feat: implicit chaining

* fix: implicit spread

* docs: separate examples

* refactor: rename middleware hooks from `pre`/`post` to `before`/`after` for consistency

* fix: search

* chore: lint

* fix: tests

* Update frontend/docs/pages/home/middleware.mdx

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* release: 1.13.0

---------

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
This commit is contained in:
Gabe Ruttner
2026-02-23 04:16:00 -08:00
committed by GitHub
parent c97b967e2b
commit de68e1375a
24 changed files with 1390 additions and 94 deletions

View File

@@ -0,0 +1,52 @@
// > Init a client with middleware
import { HatchetClient, HatchetMiddleware } from '@hatchet-dev/typescript-sdk/v1';
export type GlobalInputType = {
first: number;
second: number;
};
export type GlobalOutputType = {
extra: number;
};
const myMiddleware = {
before: (input, ctx) => {
input.first;
return { ...input, dependency: 'abc-123' };
},
after: (output, ctx, input) => {
return { ...output, additionalData: 2 };
},
} satisfies HatchetMiddleware<GlobalInputType, GlobalOutputType>;
export const hatchetWithMiddleware = HatchetClient.init<GlobalInputType, GlobalOutputType>()
.withMiddleware(myMiddleware);
// > Chaining middleware
const firstMiddleware = {
before: (input, ctx) => {
input.first;
return { ...input, dependency: 'abc-123' };
},
after: (output, ctx, input) => {
return { ...output, firstExtra: 3 };
},
} satisfies HatchetMiddleware<GlobalInputType>;
const secondMiddleware = {
before: (input, ctx) => {
input.dependency; // available from previous middleware
return { ...input, anotherDep: true };
},
after: (output, ctx, input) => {
return { ...output, secondExtra: 4 };
},
} satisfies HatchetMiddleware<GlobalInputType & { dependency: string }>;
export const hatchetWithMiddlewareChaining = HatchetClient.init<GlobalInputType>()
.withMiddleware(firstMiddleware)
.withMiddleware(secondMiddleware);

View File

@@ -0,0 +1,98 @@
// @ts-nocheck
// These snippets demonstrate common middleware patterns.
// They reference external packages (@aws-sdk/*) that are NOT
// dependencies of the Hatchet SDK — install them in your own project.
// > End-to-end encryption
import { HatchetClient, HatchetMiddleware } from '@hatchet-dev/typescript-sdk/v1';
import { randomUUID } from 'crypto';
import { createCipheriv, createDecipheriv, randomBytes } from 'crypto';
const ALGORITHM = 'aes-256-gcm';
const KEY = Buffer.from(process.env.ENCRYPTION_KEY!, 'hex');
type EncryptedEnvelope = { ciphertext: string; iv: string; tag: string };
function encrypt(plaintext: string): EncryptedEnvelope {
const iv = randomBytes(16);
const cipher = createCipheriv(ALGORITHM, KEY, iv);
const encrypted = Buffer.concat([cipher.update(plaintext, 'utf8'), cipher.final()]);
return {
ciphertext: encrypted.toString('base64'),
iv: iv.toString('base64'),
tag: cipher.getAuthTag().toString('base64'),
};
}
function decrypt(ciphertext: string, iv: string, tag: string): string {
const decipher = createDecipheriv(ALGORITHM, KEY, Buffer.from(iv, 'base64'));
decipher.setAuthTag(Buffer.from(tag, 'base64'));
return decipher.update(ciphertext, 'base64', 'utf8') + decipher.final('utf8');
}
type EncryptedInput = { __encrypted?: EncryptedEnvelope };
const e2eEncryption: HatchetMiddleware<EncryptedInput> = {
before: (input) => {
if (!input.__encrypted) return input;
const { ciphertext, iv, tag } = input.__encrypted;
const decrypted = JSON.parse(decrypt(ciphertext, iv, tag));
return { ...input, ...decrypted, __encrypted: undefined };
},
after: (output) => {
const payload = JSON.stringify(output);
return { __encrypted: encrypt(payload) };
},
};
const encryptionClient = HatchetClient.init<EncryptedInput>()
.withMiddleware(e2eEncryption);
// > Offloading large payloads to S3
import { S3Client, PutObjectCommand, GetObjectCommand } from '@aws-sdk/client-s3';
import { getSignedUrl } from '@aws-sdk/s3-request-presigner';
const s3 = new S3Client({ region: process.env.AWS_REGION });
const BUCKET = process.env.S3_BUCKET!;
const PAYLOAD_THRESHOLD = 256 * 1024; // 256 KB
async function uploadToS3(data: unknown): Promise<string> {
const key = `hatchet-payloads/${randomUUID()}.json`;
await s3.send(new PutObjectCommand({
Bucket: BUCKET,
Key: key,
Body: JSON.stringify(data),
ContentType: 'application/json',
}));
return getSignedUrl(s3, new GetObjectCommand({ Bucket: BUCKET, Key: key }), {
expiresIn: 3600,
});
}
async function downloadFromS3(url: string): Promise<unknown> {
const res = await fetch(url);
return res.json();
}
type S3Input = { __s3Url?: string };
const s3Offload: HatchetMiddleware<S3Input> = {
before: async (input) => {
if (input.__s3Url) {
const restored = await downloadFromS3(input.__s3Url) as Record<string, any>;
return { ...restored, __s3Url: undefined };
}
return input;
},
after: async (output) => {
const serialized = JSON.stringify(output);
if (serialized.length > PAYLOAD_THRESHOLD) {
const url = await uploadToS3(output);
return { __s3Url: url };
}
return output;
},
};
const s3Client = HatchetClient.init<S3Input>()
.withMiddleware(s3Offload);

View File

@@ -0,0 +1,18 @@
import { taskWithMiddleware } from "./workflow";
async function main() {
// > Running a task with middleware
const result = await taskWithMiddleware.run({
message: 'hello', // string (from TaskInput)
first: 1, // number (from GlobalInputType)
second: 2, // number (from GlobalInputType)
});
console.log('result', result.message); // string (from TaskOutput)
console.log('result', result.extra); // number (from GlobalOutputType)
console.log('result', result.additionalData); // number (from Post Middleware)
}
if (require.main === module) {
main();
}

View File

@@ -0,0 +1,14 @@
import { hatchetWithMiddleware } from './client';
import { taskWithMiddleware } from './workflow';
async function main() {
const worker = await hatchetWithMiddleware.worker('task-with-middleware', {
workflows: [taskWithMiddleware],
});
await worker.start();
}
if (require.main === module) {
main();
}

View File

@@ -0,0 +1,23 @@
import { hatchetWithMiddleware } from "./client";
type TaskInput = {
message: string;
};
type TaskOutput = {
message: string;
};
export const taskWithMiddleware = hatchetWithMiddleware.task<TaskInput, TaskOutput>({
name: 'task-with-middleware',
fn: (input, _ctx) => {
console.log('task', input.message); // string (from TaskInput)
console.log('task', input.first); // number (from GlobalInputType)
console.log('task', input.second); // number (from GlobalInputType)
console.log('task', input.dependency); // string (from Pre Middleware)
return {
message: input.message,
extra: 1,
};
},
});

View File

@@ -118,6 +118,9 @@ export default {
streaming: {
title: "Streaming",
},
middleware: {
title: "Middleware & Dependency Injection",
},
"--v1-migration-guides": {
title: "V1 Migration Guides",
type: "separator",
@@ -136,6 +139,5 @@ export default {
asyncio: "Asyncio",
pydantic: "Pydantic",
lifespans: "Lifespans",
"dependency-injection": "Dependency Injection",
dataclasses: "Dataclass Support",
};

View File

@@ -1,43 +0,0 @@
import { snippets } from "@/lib/generated/snippets";
import { Snippet } from "@/components/code";
import { Callout, Card, Cards, Steps, Tabs } from "nextra/components";
import UniversalTabs from "@/components/UniversalTabs";
# Dependency Injection
<Callout type="error" emoji="🚨">
Dependency injection is an **experimental feature** in Hatchet, and is subject
to change.
</Callout>
Hatchet's Python SDK allows you to inject **_dependencies_** into your tasks, FastAPI style. These dependencies can be either synchronous or asynchronous functions. They are executed before the task is triggered, and their results are injected into the task as parameters.
This behaves almost identically to [FastAPI's dependency injection](https://fastapi.tiangolo.com/tutorial/dependencies/), and is intended to be used in the same way. Dependencies are useful for sharing logic between tasks that you'd like to avoid repeating, or would like to factor out of the task logic itself (e.g. to make testing easier).
<Callout type="warning" emoji="⚠️">
Since dependencies are run before tasks are executed, having many dependencies (or any that take a long time to evaluate) can cause tasks to experience significantly delayed start times, as they must wait for all dependencies to finish evaluating.
</Callout>
## Usage
To add dependencies to your tasks, import `Depends` from the `hatchet_sdk`. Then:
<Snippet
src={snippets.python.dependency_injection.worker.declare_dependencies}
/>
In this example, we've declared two dependencies: one synchronous and one asynchronous. You can do anything you like in your dependencies, such as creating database sessions, managing configuration, sharing instances of service-layer logic, and more.
Once you've defined your dependency functions, inject them into your tasks as follows:
<Snippet
src={snippets.python.dependency_injection.worker.inject_dependencies}
/>
<Callout type="warning" emoji="⚠️">
Important note: Your dependency functions must take two positional arguments:
the workflow input and the `Context` (the same as any other task).
</Callout>
That's it! Now, whenever your task is triggered, its dependencies will be evaluated, and the results will be injected into the task at runtime for you to use as needed.

View File

@@ -0,0 +1,293 @@
import { snippets } from "@/lib/generated/snippets";
import { Snippet } from "@/components/code";
import { Callout, Card, Cards, Steps, Tabs } from "nextra/components";
import UniversalTabs from "@/components/UniversalTabs";
# Middleware & Dependency Injection
Middleware lets you run logic **before** and **after** every task on a client, without touching individual task definitions. Common uses include injecting request IDs, enriching inputs with shared data, encrypting/decrypting payloads, and normalizing or augmenting outputs.
<UniversalTabs items={["Python", "Typescript", "Go", "Ruby"]}>
<Tabs.Tab title="Python">
Hatchet's Python SDK uses FastAPI-style dependency injection to run logic
before tasks and inject the results as parameters. Dependencies are declared
as functions and wired into tasks with `Depends`.
</Tabs.Tab>
<Tabs.Tab title="Typescript">
Middleware hooks are registered on the client with `withMiddleware` and are
fully type-safe — TypeScript sees the union of fields from the task input
type and any values returned by `before` hooks, and similarly for task
outputs and `after` hooks.
</Tabs.Tab>
<Tabs.Tab title="Go">
<Callout type="info">
Middleware support for the Go SDK is coming soon. Join our
[Discord](https://hatchet.run/discord) to stay up to date.
</Callout>
</Tabs.Tab>
<Tabs.Tab title="Ruby">
In Ruby, this pattern uses callable objects (lambdas/procs) passed as `deps`
when defining tasks. Dependencies are evaluated before each task run and
made available via `ctx.deps`.
</Tabs.Tab>
</UniversalTabs>
## Defining Middleware
<UniversalTabs items={["Python", "Typescript", "Go", "Ruby"]}>
<Tabs.Tab title="Python">
Define your dependency functions — they receive the workflow input and context, and their return values are injected into the task as parameters.
<Snippet src={snippets.python.dependency_injection.worker.declare_dependencies} />
</Tabs.Tab>
<Tabs.Tab title="Typescript">
Create a client and attach middleware with `before` and `after` hooks.
- **`before(input, ctx)`** runs before the task. Its return value **replaces** the task input.
- **`after(output, ctx, input)`** runs after the task. Its return value **replaces** the task output.
<Snippet src={snippets.typescript.middleware.client.init_a_client_with_middleware} />
<Callout type="warning" emoji="⚠️">
**Spread the original value if you want to keep it.** The return value of each hook **replaces** the input (or output) entirely — it does not shallow-merge. If you omit `...input` in a `before` hook, the original fields are lost. The same applies to `...output` in an `after` hook.
```typescript
// ✅ Keeps original fields and adds `requestId`
before: (input) => ({ ...input, requestId: crypto.randomUUID() })
// ❌ Replaces input entirely — task only receives { requestId }
before: (input) => ({ requestId: crypto.randomUUID() })
```
</Callout>
### Chaining Middleware
You can chain multiple `.withMiddleware()` calls to run hooks in sequence. Each `before` hook receives the return value of the previous `before` hook (or the original input for the first hook), and each `after` hook receives the return value of the previous `after` hook.
<Snippet src={snippets.typescript.middleware.client.chaining_middleware} />
</Tabs.Tab>
<Tabs.Tab title="Go">
<Callout type="info">
Middleware support for the Go SDK is coming soon. Join our [Discord](https://hatchet.run/discord) to stay up to date.
</Callout>
</Tabs.Tab>
<Tabs.Tab title="Ruby">
Define your dependencies as callable objects (lambdas). They receive the input, context, and optionally a hash of previously resolved dependencies for chaining.
<Snippet src={snippets.ruby.dependency_injection.worker.declare_dependencies_ruby_uses_callable_objects_instead_of_pythons_depends} />
</Tabs.Tab>
</UniversalTabs>
## How Middleware Executes
<UniversalTabs items={["Python", "Typescript", "Go", "Ruby"]}>
<Tabs.Tab title="Python">
Dependencies are resolved before each task execution. Each dependency function receives the original workflow input and the task context, and its return value is injected as a named parameter to the task function.
</Tabs.Tab>
<Tabs.Tab title="Typescript">
When a task runs, the worker applies middleware hooks in this order:
<Steps>
### Before hooks run in registration order
Each `before` hook receives the current input and the task `Context`. Its return value **replaces** the input for the next hook (or the task itself). Returning `undefined` (or `void`) skips replacement and passes the input through unchanged.
### The task function executes
The task receives the final input after all `before` hooks have run.
### After hooks run in registration order
Each `after` hook receives the current output, the task `Context`, and the final input. Its return value **replaces** the output for the next hook (or the final result). Returning `undefined` skips replacement.
</Steps>
Both `before` and `after` hooks can be **async** — return a `Promise` and it will be awaited before proceeding.
<Callout type="info">
If a middleware hook throws an error, the task run fails with that error. There is no built-in error recovery within middleware — use try/catch inside your hooks if you need graceful fallback.
</Callout>
### The `ctx` Parameter
The second parameter of both `before` and `after` hooks is the task `Context` object. This gives middleware access to:
- `ctx.workflowRunId` — the ID of the current workflow run
- `ctx.stepRunId` — the ID of the current step run
- `ctx.log()` — emit structured logs visible in the Hatchet dashboard
- `ctx.cancel()` — cancel the current run from within middleware
### Global Types vs Middleware Types
There are two ways extra fields end up on a task's input:
| Mechanism | Set via | Required at call site? | Available at runtime? |
|---|---|---|---|
| **Global input type** | `HatchetClient.init<T>()` | Yes — callers must provide these fields | Yes |
| **Middleware before hook** | `.withMiddleware({ before })` | No — injected automatically by the worker | Yes |
Global input types (`T` in `init<T>()`) represent fields that **callers must supply** when triggering a task. This is useful when you know every task must always receive certain parameters — for example, a `userId` for authentication or a `tenantId` for multi-tenant routing. By declaring these as the global type, TypeScript enforces that every caller provides them.
Middleware `before` hooks, on the other hand, inject fields that are **computed at runtime** (e.g. request IDs, decrypted secrets, fetched config) and are **not** required from callers.
```typescript
type RequiredContext = { userId: string; orgId: string };
const client = HatchetClient.init<RequiredContext>()
.withMiddleware({
before: (input) => ({
...input,
resolvedAt: Date.now(), // injected, not required from caller
permissions: lookupPerms(input.userId), // derived from global type
}),
});
// Callers MUST provide userId and orgId — TypeScript enforces this
await myTask.run({ userId: 'usr_123', orgId: 'org_456', /* ...task fields */ });
```
</Tabs.Tab>
<Tabs.Tab title="Go">
<Callout type="info">
Middleware support for the Go SDK is coming soon. Join our [Discord](https://hatchet.run/discord) to stay up to date.
</Callout>
</Tabs.Tab>
<Tabs.Tab title="Ruby">
Dependencies are resolved in the order they are declared in the `deps` hash. Each dependency function can optionally receive already-resolved dependencies as its third argument, enabling chaining.
</Tabs.Tab>
</UniversalTabs>
## Using Middleware in Tasks
<UniversalTabs items={["Python", "Typescript", "Go", "Ruby"]}>
<Tabs.Tab title="Python">
Inject dependencies into your tasks using `Depends` and type annotations. The dependency results are passed directly as function parameters.
<Snippet src={snippets.python.dependency_injection.worker.inject_dependencies} />
<Callout type="warning" emoji="⚠️">
Your dependency functions must take two positional arguments: the workflow input and the `Context` (the same as any other task).
</Callout>
</Tabs.Tab>
<Tabs.Tab title="Typescript">
Tasks created from a middleware-enabled client automatically receive the merged input and output types. There is no extra configuration needed on the task itself.
<Snippet src={snippets.typescript.middleware.workflow.all} />
The task's `input` type is the intersection of `TaskInput`, `GlobalInputType`, and the return type of the `before` middleware hook. The task's return type must satisfy `TaskOutput` and `GlobalOutputType`, while the caller receives the intersection of those with the `after` middleware return type.
</Tabs.Tab>
<Tabs.Tab title="Go">
<Callout type="info">
Middleware support for the Go SDK is coming soon. Join our [Discord](https://hatchet.run/discord) to stay up to date.
</Callout>
</Tabs.Tab>
<Tabs.Tab title="Ruby">
Pass a `deps` hash when defining a task. The resolved dependency values are available inside the task block via `ctx.deps`.
<Snippet src={snippets.ruby.dependency_injection.worker.inject_dependencies} />
</Tabs.Tab>
</UniversalTabs>
## Running a Worker
<UniversalTabs items={["Python", "Typescript", "Go", "Ruby"]}>
<Tabs.Tab title="Python">
No special worker configuration is needed — dependencies are evaluated automatically each time a task runs.
</Tabs.Tab>
<Tabs.Tab title="Typescript">
Workers are created from the same middleware-enabled client. No special setup is required — the middleware hooks are applied automatically when tasks execute.
<Snippet src={snippets.typescript.middleware.worker.all} />
</Tabs.Tab>
<Tabs.Tab title="Go">
<Callout type="info">
Middleware support for the Go SDK is coming soon. Join our [Discord](https://hatchet.run/discord) to stay up to date.
</Callout>
</Tabs.Tab>
<Tabs.Tab title="Ruby">
No special worker configuration is needed — dependencies are resolved automatically before each task execution.
</Tabs.Tab>
</UniversalTabs>
## Practical Examples
The examples below show TypeScript middleware for common production patterns. Each can be adapted to the Python dependency injection model by extracting the same logic into a dependency function.
### End-to-End Encryption
Encrypt sensitive input fields before they reach the Hatchet server, and decrypt the output on the way back. This ensures plaintext data never leaves your worker process.
<Snippet src={snippets.typescript.middleware.recipes.end_to_end_encryption} />
<Callout type="info">
The `before` hook decrypts incoming data so your task function works with
plaintext. The `after` hook encrypts the output before it is stored. The
encryption key never leaves the worker environment.
</Callout>
### Offloading Large Payloads to S3
When task inputs or outputs exceed Hatchet's payload size limit (or you simply want to keep large blobs out of the control plane), upload them to S3 and pass a signed URL instead.
<Snippet
src={snippets.typescript.middleware.recipes.offloading_large_payloads_to_s3}
/>
<Callout type="warning" emoji="⚠️">
The caller is responsible for uploading oversized inputs to S3 before triggering the task. The `before` hook only handles the download side. You can use the same `uploadToS3` helper on the caller side to upload the input and pass `{ __s3Url: url }` as the task input.
</Callout>
## FAQ
### What is Hatchet middleware and how does it differ from Express middleware?
Hatchet middleware runs **inside the worker process** around each task invocation — not on an HTTP request path. A `before` hook transforms input before the task runs, and an `after` hook transforms output after. Unlike Express middleware, there is no `next()` function; hooks return their result directly and the runner chains them automatically.
### Can I use middleware with both tasks and workflows?
Yes. Middleware is registered on the `HatchetClient` instance, so it applies to every task created from that client — whether the task is a standalone `client.task()` or part of a multi-step `client.workflow()`. Each step in a workflow will have middleware applied independently.
### Does middleware run on the server or on the worker?
Middleware runs entirely **on the worker**. The Hatchet server never sees or executes your middleware code. This is what makes patterns like end-to-end encryption possible — plaintext data stays within your infrastructure.
### What happens if my middleware throws an error?
If a `before` or `after` hook throws (or returns a rejected `Promise`), the task run fails with that error. There is no automatic retry of middleware itself, but the task's configured retry policy will still apply, re-running the task (and its middleware) from scratch.
### Can I use async/await in middleware hooks?
Yes. Both `before` and `after` hooks can be synchronous or asynchronous. If a hook returns a `Promise`, the worker will `await` it before proceeding to the next hook or the task function.
### How do I share state between `before` and `after` hooks?
The `after` hook receives the task input (after `before` hooks have run) as its third argument. Add fields in `before` (e.g. `startedAt`, `traceId`) and read them from `input` in `after`. There is no separate shared context object — the input itself is the carrier.
### Does middleware apply to child tasks spawned via fanout?
Middleware is scoped to the **client instance**. If a child task is defined on the same middleware-enabled client, its middleware will run when that child task executes. If the child task uses a different client instance, only that client's middleware (if any) applies.
### Can I selectively skip middleware for certain tasks?
Middleware applies to **all** tasks on a given client. To skip middleware for specific tasks, create a second client without middleware and define those tasks on it. This is a deliberate design choice — middleware is a cross-cutting concern, and selective opt-out is handled at the client boundary.
### Is there a performance overhead to using middleware?
Middleware hooks are plain JavaScript functions that run in-process on the worker. The overhead is the execution time of your hook code. For lightweight operations (adding a field, logging), the overhead is negligible. For heavier operations (network calls like S3 uploads or decryption), the task's total duration will include that time, so keep hooks as efficient as possible.
### What is the difference between global types and middleware types in TypeScript?
Global types (`HatchetClient.init<GlobalInput, GlobalOutput>()`) define fields that **callers must provide** when triggering a task. Middleware types (inferred from `withMiddleware` return values) define fields that are **injected at runtime** by the worker. Both end up on the task's `input` type, but only global types appear in the caller-facing `run()` signature.
### Can I use middleware for rate limiting or authentication?
Yes. A `before` hook can check rate limits, validate API keys, or verify JWTs before the task runs. If the check fails, throw an error to abort the task. However, for rate limiting specifically, consider using Hatchet's built-in [rate limiting](/home/rate-limits) feature, which operates at the scheduling layer and is more efficient than in-worker checks.
### How do I test middleware in isolation?
Middleware hooks are plain functions — you can unit-test them directly by calling them with mock input and a mock context object. For integration tests, the e2e test pattern of creating a client, attaching middleware, defining a task, starting a worker, and asserting on the result works well. See the [middleware example on GitHub](https://github.com/hatchet-dev/hatchet/tree/main/examples/typescript/middleware) for a complete test setup.

View File

@@ -369,7 +369,7 @@ const TEST_CASES: SearchTestCase[] = [
{
name: "dependency injection",
query: "dependency injection",
expectAnyOf: ["home/dependency-injection"],
expectAnyOf: ["home/middleware"],
},
{
name: "dataclass",

View File

@@ -5,6 +5,13 @@ All notable changes to Hatchet's TypeScript SDK will be documented in this chang
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
## [1.13.0] - 2026-02-23
### Added
- Introduced client middleware support with composable `before`/`after` hooks to customize request handling and response processing.
- Added middleware examples and recipes to demonstrate practical client-side patterns.
## [1.12.1] - 2026-02-18
### Fixed

View File

@@ -1,6 +1,6 @@
{
"name": "@hatchet-dev/typescript-sdk",
"version": "1.12.1",
"version": "1.13.0",
"description": "Background task orchestration & visibility for developers",
"types": "dist/index.d.ts",
"files": [

View File

@@ -1,5 +1,6 @@
import { ChannelCredentials } from 'nice-grpc';
import { z } from 'zod';
import type { Context } from '@hatchet/v1/client/worker/context';
import { Logger, LogLevel } from '@util/logger';
const ClientTLSConfigSchema = z.object({
@@ -15,6 +16,13 @@ const HealthcheckConfigSchema = z.object({
port: z.number().optional().default(8001),
});
const TaskMiddlewareSchema = z
.object({
before: z.any().optional(),
after: z.any().optional(),
})
.optional();
export const ClientConfigSchema = z.object({
token: z.string(),
tls_config: ClientTLSConfigSchema,
@@ -24,11 +32,80 @@ export const ClientConfigSchema = z.object({
log_level: z.enum(['OFF', 'DEBUG', 'INFO', 'WARN', 'ERROR']).optional(),
tenant_id: z.string(),
namespace: z.string().optional(),
middleware: TaskMiddlewareSchema,
});
export type LogConstructor = (context: string, logLevel?: LogLevel) => Logger;
/**
* A middleware function that runs before every task invocation.
* Returns extra fields to replace the task input, or void to skip.
* @template T - The expected input type for the hook.
* @param input - The current task input.
* @param ctx - The task execution context.
* @returns The new input value, or void to pass through unchanged.
*/
export type BeforeHookFn<T = any> = (
input: T,
ctx: Context<any>
) => Record<string, any> | void | Promise<Record<string, any> | void>;
/**
* A middleware function that runs after every task invocation.
* Returns extra fields to replace the task output, or void to skip.
* @param output - The task output.
* @param ctx - The task execution context.
* @param input - The task input (after before-hooks have run).
* @returns The new output value, or void to pass through unchanged.
*/
export type AfterHookFn<TOutput = any, TInput = any> = (
output: TOutput,
ctx: Context<any>,
input: TInput
) => Record<string, any> | void | Promise<Record<string, any> | void>;
/**
* Middleware hooks that run before/after every task invocation.
*
* Each hook can be a single function or an array of functions.
* When an array is provided the functions run in order and each
* result replaces the value (input for `before`, output for `after`).
*
* Return `void` (or `undefined`) from a hook to pass through unchanged.
*/
export type TaskMiddleware<TInput = any, TOutput = any> = {
before?: BeforeHookFn<TInput> | readonly BeforeHookFn<TInput>[];
after?: AfterHookFn<TOutput, TInput> | readonly AfterHookFn<TOutput, TInput>[];
};
type NonVoidReturn<F> = F extends (...args: any[]) => infer R
? Exclude<Awaited<R>, void | undefined>
: {};
type MergeReturns<T> = T extends readonly [infer F, ...infer Rest]
? NonVoidReturn<F> & MergeReturns<Rest>
: {};
export type InferMiddlewareBefore<M> = M extends { before: infer P }
? P extends (...args: any[]) => any
? NonVoidReturn<P>
: P extends readonly any[]
? MergeReturns<P>
: {}
: {};
export type InferMiddlewareAfter<M> = M extends { after: infer P }
? P extends (...args: any[]) => any
? NonVoidReturn<P>
: P extends readonly any[]
? MergeReturns<P>
: {}
: {};
export type ClientConfig = z.infer<typeof ClientConfigSchema> & {
credentials?: ChannelCredentials;
} & { logger: LogConstructor };
} & {
logger: LogConstructor;
middleware?: TaskMiddleware;
};
export type ClientTLSConfig = z.infer<typeof ClientTLSConfigSchema>;

View File

@@ -128,6 +128,157 @@ describe('Worker', () => {
expect(sendActionEventSpy).toHaveBeenCalledTimes(2);
});
it('should apply middleware before/after (merge semantics)', async () => {
const worker = new V0Worker(hatchet, { name: 'WORKER_NAME' });
const order: string[] = [];
const seenInputs: any[] = [];
hatchet.config.middleware = {
before: (_input: any, ctx: any) => {
order.push('before');
expect(ctx.taskRunExternalId()).toBe(mockStart.taskRunExternalId);
return { data: 2 };
},
after: (_output: any, _ctx: any, input: any) => {
order.push('after');
return { observed: input.data };
},
};
const getActionEventSpy = jest.spyOn(worker, 'getStepActionEvent');
jest.spyOn(worker.client.dispatcher, 'sendStepActionEvent').mockResolvedValue({
tenantId: 'TENANT_ID',
workerId: 'WORKER_ID',
});
const startSpy = jest.fn().mockImplementation((ctx: any) => {
order.push('step');
seenInputs.push(ctx.input);
return { ok: true };
});
worker.action_registry = {
[mockStart.actionId]: startSpy,
};
worker.handleStartStepRun(mockStart);
await sleep(100);
expect(order).toEqual(['before', 'step', 'after']);
// before merges { data: 2 } into existing input
expect(seenInputs[0]).toEqual(expect.objectContaining({ data: 2 }));
// after merges { observed: 2 } into the task result { ok: true }
expect(getActionEventSpy).toHaveBeenNthCalledWith(
2,
expect.anything(),
StepActionEventType.STEP_EVENT_TYPE_COMPLETED,
false,
{ ok: true, observed: 2 },
0
);
});
it('should apply array of before/after hooks in order', async () => {
const worker = new V0Worker(hatchet, { name: 'WORKER_NAME' });
const order: string[] = [];
const seenInputs: any[] = [];
hatchet.config.middleware = {
before: [
(_input: any, _ctx: any) => {
order.push('before1');
return { a: 1 };
},
(_input: any, _ctx: any) => {
order.push('before2');
return { b: 2 };
},
],
after: [
(_output: any, _ctx: any, _input: any) => {
order.push('after1');
return { x: 10 };
},
(_output: any, _ctx: any, _input: any) => {
order.push('after2');
return { y: 20 };
},
],
};
const getActionEventSpy = jest.spyOn(worker, 'getStepActionEvent');
jest.spyOn(worker.client.dispatcher, 'sendStepActionEvent').mockResolvedValue({
tenantId: 'TENANT_ID',
workerId: 'WORKER_ID',
});
const startSpy = jest.fn().mockImplementation((ctx: any) => {
order.push('step');
seenInputs.push(ctx.input);
return { ok: true };
});
worker.action_registry = {
[mockStart.actionId]: startSpy,
};
worker.handleStartStepRun(mockStart);
await sleep(100);
expect(order).toEqual(['before1', 'before2', 'step', 'after1', 'after2']);
expect(seenInputs[0]).toEqual(expect.objectContaining({ a: 1, b: 2 }));
expect(getActionEventSpy).toHaveBeenNthCalledWith(
2,
expect.anything(),
StepActionEventType.STEP_EVENT_TYPE_COMPLETED,
false,
{ ok: true, x: 10, y: 20 },
0
);
});
it('should treat middleware errors as task errors', async () => {
const worker = new V0Worker(hatchet, { name: 'WORKER_NAME' });
hatchet.config.middleware = {
before: () => {
throw new Error('middleware exploded');
},
};
const getActionEventSpy = jest.spyOn(worker, 'getStepActionEvent');
jest.spyOn(worker.client.dispatcher, 'sendStepActionEvent').mockResolvedValue({
tenantId: 'TENANT_ID',
workerId: 'WORKER_ID',
});
const startSpy = jest.fn();
worker.action_registry = {
[mockStart.actionId]: startSpy,
};
worker.handleStartStepRun(mockStart);
await sleep(100);
expect(startSpy).not.toHaveBeenCalled();
expect(getActionEventSpy).toHaveBeenNthCalledWith(
2,
expect.anything(),
StepActionEventType.STEP_EVENT_TYPE_FAILED,
false,
expect.anything(),
0
);
});
it('should fail gracefully', async () => {
const worker = new V0Worker(hatchet, { name: 'WORKER_NAME' });

View File

@@ -252,7 +252,35 @@ export class V0Worker {
}
const run = async () => {
return step(context);
const { middleware } = this.client.config;
if (middleware?.before) {
const hooks = Array.isArray(middleware.before) ? middleware.before : [middleware.before];
for (const hook of hooks) {
const extra = await hook(context.input, context as any);
if (extra !== undefined) {
const merged = { ...(context.input as any), ...extra };
(context as any).input = merged;
if ((context as any).data && typeof (context as any).data === 'object') {
(context as any).data.input = merged;
}
}
}
}
let result: any = await step(context);
if (middleware?.after) {
const hooks = Array.isArray(middleware.after) ? middleware.after : [middleware.after];
for (const hook of hooks) {
const extra = await hook(result, context as any, context.input);
if (extra !== undefined) {
result = { ...result, ...extra };
}
}
}
return result;
};
const success = async (result: any) => {

View File

@@ -5,6 +5,9 @@ import {
ClientConfigSchema,
HatchetClientOptions,
LegacyHatchetClient,
TaskMiddleware,
InferMiddlewareBefore,
InferMiddlewareAfter,
} from '@hatchet/clients/hatchet-client';
import { AxiosRequestConfig } from 'axios';
import WorkflowRunRef from '@hatchet/util/workflow-run-ref';
@@ -36,7 +39,13 @@ import { MetricsClient } from './features/metrics';
import { WorkersClient } from './features/workers';
import { WorkflowsClient } from './features/workflows';
import { RunsClient } from './features/runs';
import { InputType, OutputType, UnknownInputType, StrictWorkflowOutputType } from '../types';
import {
InputType,
OutputType,
UnknownInputType,
StrictWorkflowOutputType,
Resolved,
} from '../types';
import { RatelimitsClient } from './features';
import { AdminClient } from './admin';
import { FiltersClient } from './features/filters';
@@ -46,11 +55,25 @@ import { CELClient } from './features/cel';
import { TenantClient } from './features/tenant';
import { WebhooksClient } from './features/webhooks';
type MergeIfNonEmpty<Base, Extra extends Record<string, any>> = keyof Extra extends never
? Base
: Base & Extra;
/**
* HatchetV1 implements the main client interface for interacting with the Hatchet workflow engine.
* It provides methods for creating and executing workflows, as well as managing workers.
*
* @template GlobalInput - Global input type required by all tasks. Set via `init<T>()`. Defaults to `{}`.
* @template MiddlewareBefore - Extra fields merged into task input by pre-middleware hooks. Inferred from middleware config.
* @template MiddlewareAfter - Extra fields merged into task output by post-middleware hooks. Inferred from middleware config.
*/
export class HatchetClient implements IHatchetClient {
export class HatchetClient<
GlobalInput extends Record<string, any> = {},
GlobalOutput extends Record<string, any> = {},
MiddlewareBefore extends Record<string, any> = {},
MiddlewareAfter extends Record<string, any> = {},
> implements IHatchetClient
{
/** The underlying v0 client instance */
_v0: LegacyHatchetClient;
_api: Api;
@@ -153,17 +176,59 @@ export class HatchetClient implements IHatchetClient {
/**
* Static factory method to create a new Hatchet client instance.
* @param config - Optional configuration for the client
* @param options - Optional client options
* @param axiosConfig - Optional Axios configuration for HTTP requests
* @returns A new Hatchet client instance
* @template T - Global input type required by all tasks created from this client. Defaults to `{}`.
* @template U - Global output type required by all tasks created from this client. Defaults to `{}`.
* @param config - Optional configuration for the client.
* @param options - Optional client options.
* @param axiosConfig - Optional Axios configuration for HTTP requests.
* @returns A new Hatchet client instance. Chain `.withMiddleware()` to attach typed middleware.
*/
static init(
config?: Partial<ClientConfig>,
static init<T extends Record<string, any> = {}, U extends Record<string, any> = {}>(
config?: Omit<Partial<ClientConfig>, 'middleware'>,
options?: HatchetClientOptions,
axiosConfig?: AxiosRequestConfig
): HatchetClient {
return new HatchetClient(config, options, axiosConfig);
): HatchetClient<T, U> {
return new HatchetClient(config, options, axiosConfig) as unknown as HatchetClient<T, U>;
}
/**
* Attaches middleware to this client and returns a re-typed instance
* with inferred pre/post middleware types.
*
* Use this after `init<T, U>()` to get full middleware return-type inference
* that TypeScript can't provide when global types are explicitly set on `init`.
*/
withMiddleware<
const M extends TaskMiddleware<
Resolved<GlobalInput, MiddlewareBefore>,
Resolved<GlobalOutput, MiddlewareAfter>
>,
>(
middleware: M
): HatchetClient<
GlobalInput,
GlobalOutput,
MiddlewareBefore & InferMiddlewareBefore<M>,
MiddlewareAfter & InferMiddlewareAfter<M>
> {
const existing: TaskMiddleware = (this._config as any).middleware || {};
const toArray = <T>(v: T | readonly T[] | undefined): T[] => {
if (v == null) return [];
if (Array.isArray(v)) return [...v];
return [v as T];
};
(this._config as any).middleware = {
before: [...toArray(existing.before), ...toArray(middleware.before)],
after: [...toArray(existing.after), ...toArray(middleware.after)],
};
return this as unknown as HatchetClient<
GlobalInput,
GlobalOutput,
MiddlewareBefore & InferMiddlewareBefore<M>,
MiddlewareAfter & InferMiddlewareAfter<M>
>;
}
private _config: ClientConfig;
@@ -182,8 +247,12 @@ export class HatchetClient implements IHatchetClient {
*/
workflow<I extends InputType = UnknownInputType, O extends StrictWorkflowOutputType = {}>(
options: CreateWorkflowOpts
): WorkflowDeclaration<I, O> {
return CreateWorkflow<I, O>(options, this);
): WorkflowDeclaration<I, O, Resolved<GlobalInput, MiddlewareBefore>> {
return CreateWorkflow<I, O>(options, this) as WorkflowDeclaration<
I,
O,
Resolved<GlobalInput, MiddlewareBefore>
>;
}
/**
@@ -195,8 +264,11 @@ export class HatchetClient implements IHatchetClient {
* @returns A TaskWorkflowDeclaration instance
*/
task<I extends InputType = UnknownInputType, O extends OutputType = void>(
options: CreateTaskWorkflowOpts<I, O>
): TaskWorkflowDeclaration<I, O>;
options: CreateTaskWorkflowOpts<
I & Resolved<GlobalInput, MiddlewareBefore>,
MergeIfNonEmpty<O, GlobalOutput>
>
): TaskWorkflowDeclaration<I, O, GlobalInput, GlobalOutput, MiddlewareBefore, MiddlewareAfter>;
/**
* Creates a new task workflow with types inferred from the function parameter.
@@ -218,7 +290,7 @@ export class HatchetClient implements IHatchetClient {
options: {
fn: Fn;
} & Omit<CreateTaskWorkflowOpts<I, O>, 'fn'>
): TaskWorkflowDeclaration<I, O>;
): TaskWorkflowDeclaration<I, O, GlobalInput, GlobalOutput, MiddlewareBefore, MiddlewareAfter>;
/**
* Implementation of the task method.
@@ -236,8 +308,11 @@ export class HatchetClient implements IHatchetClient {
* @returns A TaskWorkflowDeclaration instance for a durable task
*/
durableTask<I extends InputType, O extends OutputType>(
options: CreateDurableTaskWorkflowOpts<I, O>
): TaskWorkflowDeclaration<I, O>;
options: CreateDurableTaskWorkflowOpts<
I & Resolved<GlobalInput, MiddlewareBefore>,
MergeIfNonEmpty<O, GlobalOutput>
>
): TaskWorkflowDeclaration<I, O, GlobalInput, GlobalOutput, MiddlewareBefore, MiddlewareAfter>;
/**
* Creates a new durable task workflow with types inferred from the function parameter.
@@ -259,7 +334,7 @@ export class HatchetClient implements IHatchetClient {
options: {
fn: Fn;
} & Omit<CreateDurableTaskWorkflowOpts<I, O>, 'fn'>
): TaskWorkflowDeclaration<I, O>;
): TaskWorkflowDeclaration<I, O, GlobalInput, GlobalOutput, MiddlewareBefore, MiddlewareAfter>;
/**
* Implementation of the durableTask method.

View File

@@ -572,7 +572,34 @@ export class V1Worker {
childIndex: 0,
desiredWorkerId: this.workerId || '',
});
return step(context);
const { middleware } = this.client.config;
if (middleware?.before) {
const hooks = Array.isArray(middleware.before) ? middleware.before : [middleware.before];
for (const hook of hooks) {
const returned = await hook(context.input, context as any);
if (returned !== undefined) {
(context as any).input = returned;
if ((context as any).data && typeof (context as any).data === 'object') {
(context as any).data.input = returned;
}
}
}
}
let result: any = await step(context);
if (middleware?.after) {
const hooks = Array.isArray(middleware.after) ? middleware.after : [middleware.after];
for (const hook of hooks) {
const returned = await hook(result, context as any, context.input);
if (returned !== undefined) {
result = returned;
}
}
}
return result;
};
const success = async (result: any) => {

View File

@@ -22,7 +22,7 @@ import {
} from './task';
import { Duration } from './client/duration';
import { MetricsClient } from './client/features/metrics';
import { InputType, OutputType, UnknownInputType, JsonObject } from './types';
import { InputType, OutputType, UnknownInputType, JsonObject, Resolved } from './types';
import { Context, DurableContext } from './client/worker/context';
import { parentRunContextManager } from './parent-run-context-vars';
@@ -585,6 +585,7 @@ export class BaseWorkflowDeclaration<
export class WorkflowDeclaration<
I extends InputType = UnknownInputType,
O extends OutputType = void,
MiddlewareBefore extends Record<string, any> = {},
> extends BaseWorkflowDeclaration<I, O> {
/**
* Adds a task to the workflow.
@@ -599,10 +600,10 @@ export class WorkflowDeclaration<
Name extends string,
Fn extends Name extends keyof O
? (
input: I,
ctx: Context<I>
input: I & MiddlewareBefore,
ctx: Context<I & MiddlewareBefore>
) => O[Name] extends OutputType ? O[Name] | Promise<O[Name]> : void
: (input: I, ctx: Context<I>) => void,
: (input: I & MiddlewareBefore, ctx: Context<I & MiddlewareBefore>) => void,
FnReturn = ReturnType<Fn> extends Promise<infer P> ? P : ReturnType<Fn>,
TO extends OutputType = Name extends keyof O
? O[Name] extends OutputType
@@ -713,10 +714,10 @@ export class WorkflowDeclaration<
Name extends string,
Fn extends Name extends keyof O
? (
input: I,
ctx: DurableContext<I>
input: I & MiddlewareBefore,
ctx: DurableContext<I & MiddlewareBefore>
) => O[Name] extends OutputType ? O[Name] | Promise<O[Name]> : void
: (input: I, ctx: DurableContext<I>) => void,
: (input: I & MiddlewareBefore, ctx: DurableContext<I & MiddlewareBefore>) => void,
FnReturn = ReturnType<Fn> extends Promise<infer P> ? P : ReturnType<Fn>,
TO extends OutputType = Name extends keyof O
? O[Name] extends OutputType
@@ -737,13 +738,26 @@ export class WorkflowDeclaration<
}
}
/**
* A standalone task workflow that can be run, scheduled, or triggered via cron.
*
* @template I - The task-specific input type.
* @template O - The task output type.
* @template GlobalInput - Global input type from the client, merged into all run/schedule/cron input signatures.
* @template MiddlewareBefore - Extra fields added to the task fn input by pre-middleware hooks.
* @template MiddlewareAfter - Extra fields merged into the task output by post-middleware hooks.
*/
export class TaskWorkflowDeclaration<
I extends InputType = UnknownInputType,
O extends OutputType = void,
GlobalInput extends Record<string, any> = {},
GlobalOutput extends Record<string, any> = {},
MiddlewareBefore extends Record<string, any> = {},
MiddlewareAfter extends Record<string, any> = {},
> extends BaseWorkflowDeclaration<I, O> {
_standalone_task_name: string;
constructor(options: CreateTaskWorkflowOpts<I, O>, client?: IHatchetClient) {
constructor(options: CreateTaskWorkflowOpts<any, any>, client?: IHatchetClient) {
super({ ...options }, client);
this._standalone_task_name = options.name;
@@ -753,34 +767,142 @@ export class TaskWorkflowDeclaration<
});
}
async run(input: I, options?: RunOpts): Promise<O>;
async run(input: I[], options?: RunOpts): Promise<O[]>;
async run(input: I | I[], options?: RunOpts): Promise<O | O[]> {
// note: typescript is not smart enough to infer that input is an array
/**
* Triggers a task run and waits for the result.
* @param input - The input data for the task, including global input fields.
* @param options - Optional configuration for this task run.
* @returns A promise that resolves with the task output merged with post-middleware fields.
*/
async runAndWait(
input: I & GlobalInput,
options?: RunOpts
): Promise<O & Resolved<GlobalOutput, MiddlewareAfter>>;
async runAndWait(
input: (I & GlobalInput)[],
options?: RunOpts
): Promise<(O & Resolved<GlobalOutput, MiddlewareAfter>)[]>;
async runAndWait(
input: (I & GlobalInput) | (I & GlobalInput)[],
options?: RunOpts
): Promise<
(O & Resolved<GlobalOutput, MiddlewareAfter>) | (O & Resolved<GlobalOutput, MiddlewareAfter>)[]
> {
return Array.isArray(input)
? super.run(input, options, this._standalone_task_name)
: super.run(input, options, this._standalone_task_name);
? (super.runAndWait(input, options, this._standalone_task_name) as Promise<
(O & Resolved<GlobalOutput, MiddlewareAfter>)[]
>)
: (super.runAndWait(input, options, this._standalone_task_name) as Promise<
O & Resolved<GlobalOutput, MiddlewareAfter>
>);
}
/**
* Triggers a workflow run without waiting for completion.
* @param input The input data for the workflow.
* @param options Optional configuration for this workflow run.
* @returns A WorkflowRunRef containing the run ID and methods to get results and interact with the run.
* @throws Error if the workflow is not bound to a Hatchet client.
* Triggers a task run and waits for the result.
* @param input - The input data for the task, including global input fields.
* @param options - Optional configuration for this task run.
* @returns A promise that resolves with the task output merged with post-middleware fields.
*/
async runNoWait(input: I, options?: RunOpts): Promise<WorkflowRunRef<O>>;
async runNoWait(input: I[], options?: RunOpts): Promise<WorkflowRunRef<O>[]>;
async runNoWait(
input: I | I[],
async run(
input: I & GlobalInput,
options?: RunOpts
): Promise<WorkflowRunRef<O> | WorkflowRunRef<O>[]> {
// note: typescript is not smart enough to infer that input is an array
): Promise<O & Resolved<GlobalOutput, MiddlewareAfter>>;
async run(
input: (I & GlobalInput)[],
options?: RunOpts
): Promise<(O & Resolved<GlobalOutput, MiddlewareAfter>)[]>;
async run(
input: (I & GlobalInput) | (I & GlobalInput)[],
options?: RunOpts
): Promise<
(O & Resolved<GlobalOutput, MiddlewareAfter>) | (O & Resolved<GlobalOutput, MiddlewareAfter>)[]
> {
return Array.isArray(input)
? super.runNoWait(input, options, this._standalone_task_name)
: super.runNoWait(input, options, this._standalone_task_name);
? (super.run(input, options, this._standalone_task_name) as Promise<
(O & Resolved<GlobalOutput, MiddlewareAfter>)[]
>)
: (super.run(input, options, this._standalone_task_name) as Promise<
O & Resolved<GlobalOutput, MiddlewareAfter>
>);
}
/**
* Triggers a task run without waiting for completion.
* @param input - The input data for the task, including global input fields.
* @param options - Optional configuration for this task run.
* @returns A WorkflowRunRef containing the run ID and methods to get results.
*/
async runNoWait(
input: I & GlobalInput,
options?: RunOpts
): Promise<WorkflowRunRef<O & Resolved<GlobalOutput, MiddlewareAfter>>>;
async runNoWait(
input: (I & GlobalInput)[],
options?: RunOpts
): Promise<WorkflowRunRef<O & Resolved<GlobalOutput, MiddlewareAfter>>[]>;
async runNoWait(
input: (I & GlobalInput) | (I & GlobalInput)[],
options?: RunOpts
): Promise<
| WorkflowRunRef<O & Resolved<GlobalOutput, MiddlewareAfter>>
| WorkflowRunRef<O & Resolved<GlobalOutput, MiddlewareAfter>>[]
> {
return Array.isArray(input)
? (super.runNoWait(input, options, this._standalone_task_name) as Promise<
WorkflowRunRef<O & Resolved<GlobalOutput, MiddlewareAfter>>[]
>)
: (super.runNoWait(input, options, this._standalone_task_name) as Promise<
WorkflowRunRef<O & Resolved<GlobalOutput, MiddlewareAfter>>
>);
}
/**
* Schedules the task to run at a specific date and time.
* @param enqueueAt - The date when the task should be triggered.
* @param input - The input data for the task, including global input fields.
* @param options - Optional configuration for this task run.
* @returns A promise that resolves with the scheduled workflow details.
*/
async schedule(
enqueueAt: Date,
input: I & GlobalInput,
options?: RunOpts
): Promise<ScheduledWorkflows> {
return super.schedule(enqueueAt, input, options);
}
/**
* Schedules the task to run after a specified delay.
* @param duration - The delay in seconds before the task should run.
* @param input - The input data for the task, including global input fields.
* @param options - Optional configuration for this task run.
* @returns A promise that resolves with the scheduled workflow details.
*/
async delay(
duration: number,
input: I & GlobalInput,
options?: RunOpts
): Promise<ScheduledWorkflows> {
return super.delay(duration, input, options);
}
/**
* Creates a cron schedule for the task.
* @param name - The name of the cron schedule.
* @param expression - The cron expression defining the schedule.
* @param input - The input data for the task, including global input fields.
* @param options - Optional configuration for this task run.
* @returns A promise that resolves with the cron workflow details.
*/
async cron(
name: string,
expression: string,
input: I & GlobalInput,
options?: RunOpts
): Promise<CronWorkflows> {
return super.cron(name, expression, input, options);
}
/** Returns the underlying task definition for this declaration. */
get taskDef() {
return this.definition._tasks[0];
}

View File

@@ -0,0 +1,53 @@
// > Init a client with middleware
import { HatchetClient, HatchetMiddleware } from '@hatchet/v1';
export type GlobalInputType = {
first: number;
second: number;
};
export type GlobalOutputType = {
extra: number;
};
const myMiddleware = {
before: (input, ctx) => {
console.log('before', input.first);
return { ...input, dependency: 'abc-123' };
},
after: (output, ctx, input) => {
return { ...output, additionalData: 2 };
},
} satisfies HatchetMiddleware<GlobalInputType, GlobalOutputType>;
export const hatchetWithMiddleware = HatchetClient.init<
GlobalInputType,
GlobalOutputType
>().withMiddleware(myMiddleware);
// !!
// > Chaining middleware
const firstMiddleware = {
before: (input, ctx) => {
console.log('before', input.first);
return { ...input, dependency: 'abc-123' };
},
after: (output, ctx, input) => {
return { ...output, firstExtra: 3 };
},
} satisfies HatchetMiddleware<GlobalInputType>;
const secondMiddleware = {
before: (input, ctx) => {
console.log('before', input.dependency); // available from previous middleware
return { ...input, anotherDep: true };
},
after: (output, ctx, input) => {
return { ...output, secondExtra: 4 };
},
} satisfies HatchetMiddleware<GlobalInputType & { dependency: string }>;
export const hatchetWithMiddlewareChaining = HatchetClient.init<GlobalInputType>()
.withMiddleware(firstMiddleware)
.withMiddleware(secondMiddleware);
// !!

View File

@@ -0,0 +1,129 @@
import sleep from '@hatchet/util/sleep';
import { HatchetClient } from '@hatchet/v1';
import { Worker } from '../../client/worker/worker';
describe('middleware-e2e', () => {
let worker: Worker;
afterEach(async () => {
if (worker) {
await worker.stop();
await sleep(2000);
}
});
it('should inject before middleware fields into task input and after middleware fields into output', async () => {
const client = HatchetClient.init<
{ first: number; second: number },
{ extra: number }
>().withMiddleware({
before: (input) => {
return { ...input, dependency: `dep-${input.first}-${input.second}` };
},
after: (output) => {
return { ...output, additionalData: 42 };
},
});
const task = client.task<{ message: string }, { message: string; extra: number }>({
name: 'middleware-e2e-single',
fn: (input) => {
return {
message: `${input.message}:${input.dependency}`,
extra: input.first + input.second,
};
},
});
worker = await client.worker('middleware-e2e-worker', {
workflows: [task],
});
void worker.start();
await sleep(5000);
const result = await task.run({
message: 'hello',
first: 10,
second: 20,
});
expect(result.message).toBe('hello:dep-10-20');
expect(result.extra).toBe(30);
expect(result.additionalData).toBe(42);
}, 60000);
it('should strip fields not included in middleware return when input is not spread', async () => {
const client = HatchetClient.init<{ first: number; second: number }>().withMiddleware({
before: (input) => {
return { dependency: `dep-${input.first}-${input.second}` };
},
after: (output) => {
return { additionalData: 99 };
},
});
const task = client.task<{}, { result: string }>({
name: 'middleware-e2e-no-spread',
fn: (input) => {
return {
result: input.dependency,
};
},
});
worker = await client.worker('middleware-e2e-no-spread-worker', {
workflows: [task],
});
void worker.start();
await sleep(5000);
const result = await task.run({ first: 10, second: 20 });
expect(result.additionalData).toBe(99);
expect((result as any).result).toBeUndefined();
}, 60000);
it('should chain multiple withMiddleware calls with accumulated context', async () => {
const client = HatchetClient.init<{ value: number }>()
.withMiddleware({
before: (input) => {
return { ...input, doubled: input.value * 2 };
},
after: (output) => {
return { ...output, postFirst: true };
},
})
.withMiddleware({
before: (input) => {
return { ...input, quadrupled: input.doubled * 2 };
},
after: (output) => {
return { ...output, postSecond: true };
},
});
const task = client.task<{}, { result: number }>({
name: 'middleware-e2e-chained',
fn: (input) => {
return {
result: input.quadrupled,
};
},
});
worker = await client.worker('middleware-e2e-chained-worker', {
workflows: [task],
});
void worker.start();
await sleep(5000);
const result = await task.run({ value: 5 });
expect(result.result).toBe(20);
expect(result.postFirst).toBe(true);
expect(result.postSecond).toBe(true);
}, 60000);
});

View File

@@ -0,0 +1,99 @@
// @ts-nocheck
// These snippets demonstrate common middleware patterns.
// They reference external packages (@aws-sdk/*) that are NOT
// dependencies of the Hatchet SDK — install them in your own project.
// > End-to-end encryption
import { HatchetClient, HatchetMiddleware } from '@hatchet/v1';
import { randomUUID, createCipheriv, createDecipheriv, randomBytes } from 'crypto';
// !!
// > Offloading large payloads to S3
import { S3Client, PutObjectCommand, GetObjectCommand } from '@aws-sdk/client-s3';
import { getSignedUrl } from '@aws-sdk/s3-request-presigner';
const ALGORITHM = 'aes-256-gcm';
const KEY = Buffer.from(process.env.ENCRYPTION_KEY!, 'hex');
type EncryptedEnvelope = { ciphertext: string; iv: string; tag: string };
function encrypt(plaintext: string): EncryptedEnvelope {
const iv = randomBytes(16);
const cipher = createCipheriv(ALGORITHM, KEY, iv);
const encrypted = Buffer.concat([cipher.update(plaintext, 'utf8'), cipher.final()]);
return {
ciphertext: encrypted.toString('base64'),
iv: iv.toString('base64'),
tag: cipher.getAuthTag().toString('base64'),
};
}
function decrypt(ciphertext: string, iv: string, tag: string): string {
const decipher = createDecipheriv(ALGORITHM, KEY, Buffer.from(iv, 'base64'));
decipher.setAuthTag(Buffer.from(tag, 'base64'));
return decipher.update(ciphertext, 'base64', 'utf8') + decipher.final('utf8');
}
type EncryptedInput = { encrypted?: EncryptedEnvelope };
const e2eEncryption: HatchetMiddleware<EncryptedInput> = {
before: (input) => {
if (!input.encrypted) return input;
const { ciphertext, iv, tag } = input.encrypted;
const decrypted = JSON.parse(decrypt(ciphertext, iv, tag));
return { ...input, ...decrypted, encrypted: undefined };
},
after: (output) => {
const payload = JSON.stringify(output);
return { encrypted: encrypt(payload) };
},
};
const encryptionClient = HatchetClient.init<EncryptedInput>().withMiddleware(e2eEncryption);
const s3 = new S3Client({ region: process.env.AWS_REGION });
const BUCKET = process.env.S3_BUCKET!;
const PAYLOAD_THRESHOLD = 256 * 1024; // 256 KB
async function uploadToS3(data: unknown): Promise<string> {
const key = `hatchet-payloads/${randomUUID()}.json`;
await s3.send(
new PutObjectCommand({
Bucket: BUCKET,
Key: key,
Body: JSON.stringify(data),
ContentType: 'application/json',
})
);
return getSignedUrl(s3, new GetObjectCommand({ Bucket: BUCKET, Key: key }), {
expiresIn: 3600,
});
}
async function downloadFromS3(url: string): Promise<unknown> {
const res = await fetch(url);
return res.json();
}
type S3Input = { s3Url?: string };
const s3Offload: HatchetMiddleware<S3Input> = {
before: async (input) => {
if (input.s3Url) {
const restored = (await downloadFromS3(input.s3Url)) as Record<string, any>;
return { ...restored, s3Url: undefined };
}
return input;
},
after: async (output) => {
const serialized = JSON.stringify(output);
if (serialized.length > PAYLOAD_THRESHOLD) {
const url = await uploadToS3(output);
return { s3Url: url };
}
return output;
},
};
const s3Client = HatchetClient.init<S3Input>().withMiddleware(s3Offload);
// !!

View File

@@ -0,0 +1,19 @@
import { taskWithMiddleware } from './workflow';
async function main() {
// > Running a task with middleware
const result = await taskWithMiddleware.run({
message: 'hello', // string (from TaskInput)
first: 1, // number (from GlobalInputType)
second: 2, // number (from GlobalInputType)
});
console.log('result', result.message); // string (from TaskOutput)
console.log('result', result.extra); // number (from GlobalOutputType)
console.log('result', result.additionalData); // number (from Post Middleware)
// !!
}
if (require.main === module) {
main();
}

View File

@@ -0,0 +1,14 @@
import { hatchetWithMiddleware } from './client';
import { taskWithMiddleware } from './workflow';
async function main() {
const worker = await hatchetWithMiddleware.worker('task-with-middleware', {
workflows: [taskWithMiddleware],
});
await worker.start();
}
if (require.main === module) {
main();
}

View File

@@ -0,0 +1,24 @@
import { hatchetWithMiddleware } from './client';
type TaskInput = {
message: string;
};
type TaskOutput = {
message: string;
};
export const taskWithMiddleware = hatchetWithMiddleware.task<TaskInput, TaskOutput>({
name: 'task-with-middleware',
fn: (input, _ctx) => {
console.log('task', input.message); // string (from TaskInput)
console.log('task', input.first); // number (from GlobalInputType)
console.log('task', input.second); // number (from GlobalInputType)
console.log('task', input.dependency); // string (from Pre Middleware)
return {
message: input.message,
extra: 1,
};
},
});
// !!

View File

@@ -17,6 +17,18 @@ export interface WorkflowOutputType {
[key: string]: JsonObject;
}
/**
* Resolves the effective type after middleware processing.
* Middleware return values replace (not merge) the original — if a hook
* omits fields by not spreading, those fields are stripped at runtime.
* Falls back to `Base` when no middleware is attached (`Middleware = {}`).
*/
export type Resolved<Base extends Record<string, any>, Middleware extends Record<string, any>> = [
keyof Middleware,
] extends [never]
? Base
: Middleware;
// Helper type to check if a type is a valid workflow output structure
type IsValidWorkflowOutput<T> = T extends Record<string, JsonObject> ? true : false;
@@ -31,3 +43,5 @@ export type StrictWorkflowOutputType<T = any> =
// Symbol used for the error message
declare const ERROR_WORKFLOW_OUTPUT: unique symbol;
export type { TaskMiddleware as HatchetMiddleware } from '@hatchet/clients/hatchet-client/client-config';