feat: add health and metrics api on typescript sdk worker (#2457)

* feat: add health and metrics api on typescript sdk worker

add: prom-client to fetch metrics data
add: track health status of worker across different states

* refactor: keep prom-client as optional dependency

* refactor: remove async import of prom-client

* chore: update package version for ts sdk

* fix: lint

* fix: lint, const enum

---------

Co-authored-by: mrkaye97 <mrkaye97@gmail.com>
This commit is contained in:
Jishnu
2025-11-02 01:18:27 +05:30
committed by GitHub
parent f12c3e96ce
commit e1f70ccb13
10 changed files with 338 additions and 2 deletions

View File

@@ -1,6 +1,6 @@
{
"name": "@hatchet-dev/typescript-sdk",
"version": "1.9.8",
"version": "1.10.0",
"description": "Background task orchestration & visibility for developers",
"types": "dist/index.d.ts",
"files": [
@@ -91,5 +91,8 @@
"yaml": "^2.7.1",
"zod": "^3.24.2"
},
"optionalDependencies": {
"prom-client": "^15.1.3"
},
"packageManager": "pnpm@10.16.1"
}

View File

@@ -144,6 +144,10 @@ importers:
typescript:
specifier: ^5.8.2
version: 5.9.2
optionalDependencies:
prom-client:
specifier: ^15.1.3
version: 15.1.3
packages:
@@ -491,6 +495,10 @@ packages:
resolution: {integrity: sha512-nn5ozdjYQpUCZlWGuxcJY/KpxkWQs4DcbMCmKojjyrYDEAGy4Ce19NN4v5MduafTwJlbKc99UA8YhSVqq9yPZA==}
engines: {node: '>=12.4.0'}
'@opentelemetry/api@1.9.0':
resolution: {integrity: sha512-3giAOQvZiH5F9bMlMiv8+GSPMeqg0dbaeo58/0SlA9sxSqZhnUtxzX9/2FzyhS9sWQf5S0GJE0AKBrFqjpeYcg==}
engines: {node: '>=8.0.0'}
'@pkgr/core@0.2.9':
resolution: {integrity: sha512-QNqXyfVS2wm9hweSYD2O7F0G06uurj9kZ96TRQE5Y9hU7+tgdZwIkbAKc5Ocy1HxEY2kuDQa6cQ1WRs/O5LFKA==}
engines: {node: ^12.20.0 || ^14.18.0 || >=16.0.0}
@@ -978,6 +986,9 @@ packages:
balanced-match@1.0.2:
resolution: {integrity: sha512-3oSeUO0TMV67hN1AmbXsK4yaqU7tjiHlbxRDZOpH0KW9+CeX4bRAaX0Anxt0tx2MrpRpWwQaPwIlISEJhYU5Pw==}
bintrees@1.0.2:
resolution: {integrity: sha512-VOMgTMwjAaUG580SXn3LacVgjurrbMme7ZZNYGSSV7mmtY6QQRh0Eg3pwIcntQ77DErK1L0NxkbetjcoXzVwKw==}
brace-expansion@1.1.12:
resolution: {integrity: sha512-9T9UjW3r0UW5c1Q7GTwllptXwhvYmEzFhzMfZ9H7FQWt+uZePjZPjBP/W1ZEyZ1twGWom5/56TF4lPcqjnDHcg==}
@@ -2426,6 +2437,10 @@ packages:
process-warning@5.0.0:
resolution: {integrity: sha512-a39t9ApHNx2L4+HBnQKqxxHNs1r7KF+Intd8Q/g1bUh6q0WIp9voPXJ/x0j+ZL45KF1pJd9+q2jLIRMfvEshkA==}
prom-client@15.1.3:
resolution: {integrity: sha512-6ZiOBfCywsD4k1BN9IX0uZhF+tJkV8q8llP64G5Hajs4JOeVLPCwpPVcpXy3BwYiUGgyJzsJJQeOIv7+hDSq8g==}
engines: {node: ^16 || ^18 || >=20}
prompts@2.4.2:
resolution: {integrity: sha512-NxNv/kLguCA7p3jE8oL2aEBsrJWgAakBpgmgK6lpPWV+WuOmY6r2/zbAVnP+T8bQlA0nzHXSJSJW0Hq7ylaD2Q==}
engines: {node: '>= 6'}
@@ -2715,6 +2730,9 @@ packages:
resolution: {integrity: sha512-DZ4yORTwrbTj/7MZYq2w+/ZFdI6OZ/f9SFHR+71gIVUZhOQPHzVCLpvRnPgyaMpfWxxk/4ONva3GQSyNIKRv6A==}
engines: {node: '>=10'}
tdigest@0.1.2:
resolution: {integrity: sha512-+G0LLgjjo9BZX2MfdvPfH+MKLCrxlXSYec5DaPYP1fe6Iyhf0/fSmJ0bFiZ1F8BT6cGXl2LpltQptzjXKWEkKA==}
test-exclude@6.0.0:
resolution: {integrity: sha512-cAGWPIyOHU6zlmg88jwm7VRyXnMN7iV68OGAbYDk/Mh/xC/pzVPlQtY6ngoIH/5/tciuhGfvESU8GrHrcxD56w==}
engines: {node: '>=8'}
@@ -3496,6 +3514,9 @@ snapshots:
'@nolyfill/is-core-module@1.0.39': {}
'@opentelemetry/api@1.9.0':
optional: true
'@pkgr/core@0.2.9': {}
'@protobufjs/aspromise@1.1.2': {}
@@ -4048,6 +4069,9 @@ snapshots:
balanced-match@1.0.2: {}
bintrees@1.0.2:
optional: true
brace-expansion@1.1.12:
dependencies:
balanced-match: 1.0.2
@@ -5784,6 +5808,12 @@ snapshots:
process-warning@5.0.0: {}
prom-client@15.1.3:
dependencies:
'@opentelemetry/api': 1.9.0
tdigest: 0.1.2
optional: true
prompts@2.4.2:
dependencies:
kleur: 3.0.3
@@ -6116,6 +6146,11 @@ snapshots:
mkdirp: 1.0.4
yallist: 4.0.0
tdigest@0.1.2:
dependencies:
bintrees: 1.0.2
optional: true
test-exclude@6.0.0:
dependencies:
'@istanbuljs/schema': 0.1.3

View File

@@ -10,9 +10,15 @@ const ClientTLSConfigSchema = z.object({
server_name: z.string().optional(),
});
const HealthcheckConfigSchema = z.object({
enabled: z.boolean().optional().default(false),
port: z.number().optional().default(8001),
});
export const ClientConfigSchema = z.object({
token: z.string(),
tls_config: ClientTLSConfigSchema,
healthcheck: HealthcheckConfigSchema.optional(),
host_port: z.string(),
api_url: z.string(),
log_level: z.enum(['OFF', 'DEBUG', 'INFO', 'WARN', 'ERROR']).optional(),

View File

@@ -6,3 +6,6 @@ tls_config:
key_file: 'TLS_KEY_FILE_YAML'
ca_file: 'TLS_ROOT_CA_FILE_YAML'
server_name: 'TLS_SERVER_NAME_YAML'
healthcheck:
enabled: true
port: 8005

View File

@@ -25,6 +25,10 @@ describe('Client', () => {
server_name: 'TLS_SERVER_NAME',
tls_strategy: 'tls',
},
healthcheck: {
enabled: true,
port: 8002,
},
},
{
credentials: ChannelCredentials.createInsecure(),
@@ -47,6 +51,10 @@ describe('Client', () => {
ca_file: 'TLS_ROOT_CA_FILE',
server_name: 'TLS_SERVER_NAME',
},
healthcheck: {
enabled: true,
port: 8002,
},
})
);
});
@@ -78,6 +86,10 @@ describe('Client', () => {
server_name: 'TLS_SERVER_NAME',
tls_strategy: 'tls',
},
healthcheck: {
enabled: false,
port: 8003,
},
},
{
config_path: './fixtures/.hatchet.yaml',
@@ -101,6 +113,10 @@ describe('Client', () => {
ca_file: 'TLS_ROOT_CA_FILE',
server_name: 'TLS_SERVER_NAME',
},
healthcheck: {
enabled: false,
port: 8003,
},
})
);
});

View File

@@ -8,6 +8,8 @@ fdescribe('ConfigLoader', () => {
process.env.HATCHET_CLIENT_TLS_KEY_FILE = 'TLS_KEY_FILE';
process.env.HATCHET_CLIENT_TLS_ROOT_CA_FILE = 'TLS_ROOT_CA_FILE';
process.env.HATCHET_CLIENT_TLS_SERVER_NAME = 'TLS_SERVER_NAME';
process.env.HATCHET_CLIENT_WORKER_HEALTHCHECK_ENABLED = 'true';
process.env.HATCHET_CLIENT_WORKER_HEALTHCHECK_PORT = '8001';
});
it('should load from environment variables', () => {
@@ -27,6 +29,10 @@ fdescribe('ConfigLoader', () => {
ca_file: 'TLS_ROOT_CA_FILE',
server_name: 'TLS_SERVER_NAME',
},
healthcheck: {
enabled: true,
port: 8001,
},
});
});
@@ -75,6 +81,10 @@ fdescribe('ConfigLoader', () => {
ca_file: 'TLS_ROOT_CA_FILE_YAML',
server_name: 'TLS_SERVER_NAME_YAML',
},
healthcheck: {
enabled: true,
port: 8002,
},
});
});
@@ -97,6 +107,10 @@ fdescribe('ConfigLoader', () => {
ca_file: 'TLS_ROOT_CA_FILE_YAML',
server_name: 'TLS_SERVER_NAME_YAML',
},
healthcheck: {
enabled: true,
port: 8002,
},
});
});
});

View File

@@ -17,7 +17,9 @@ type EnvVars =
| 'HATCHET_CLIENT_TLS_ROOT_CA_FILE'
| 'HATCHET_CLIENT_TLS_SERVER_NAME'
| 'HATCHET_CLIENT_LOG_LEVEL'
| 'HATCHET_CLIENT_NAMESPACE';
| 'HATCHET_CLIENT_NAMESPACE'
| 'HATCHET_CLIENT_WORKER_HEALTHCHECK_ENABLED'
| 'HATCHET_CLIENT_WORKER_HEALTHCHECK_PORT';
type TLSStrategy = 'tls' | 'mtls';
@@ -46,6 +48,12 @@ export class ConfigLoader {
const token = override?.token ?? yaml?.token ?? this.env('HATCHET_CLIENT_TOKEN');
const healthCheckConfig = override?.healthcheck ??
yaml?.healthcheck ?? {
enabled: this.env('HATCHET_CLIENT_WORKER_HEALTHCHECK_ENABLED') === 'true',
port: parseInt(this.env('HATCHET_CLIENT_WORKER_HEALTHCHECK_PORT') || '8001', 10),
};
if (!token) {
throw new Error(
'No token provided. Provide it by setting the HATCHET_CLIENT_TOKEN environment variable.'
@@ -91,6 +99,7 @@ export class ConfigLoader {
host_port: grpcBroadcastAddress,
api_url: apiUrl,
tls_config: tlsConfig,
healthcheck: healthCheckConfig,
log_level:
override?.log_level ??
yaml?.log_level ??

View File

@@ -6,3 +6,6 @@ tls_config:
key_file: 'TLS_KEY_FILE_YAML'
ca_file: 'TLS_ROOT_CA_FILE_YAML'
server_name: 'TLS_SERVER_NAME_YAML'
healthcheck:
enabled: true
port: 8002

View File

@@ -0,0 +1,164 @@
import { createServer, Server, IncomingMessage, ServerResponse } from 'node:http';
import { Logger } from '@hatchet/util/logger';
export const workerStatus = {
INITIALIZED: 'INITIALIZED',
STARTING: 'STARTING',
HEALTHY: 'HEALTHY',
UNHEALTHY: 'UNHEALTHY',
} as const;
export type WorkerStatus = (typeof workerStatus)[keyof typeof workerStatus];
interface HealthCheckResponse {
status: string;
name: string;
slots: number;
actions: string[];
labels: Record<string, string | number>;
nodeVersion: string;
}
export class HealthServer {
private server: Server | null = null;
private register: any = null;
private workerStatusGauge: any = null;
private workerSlotsGauge: any = null;
private workerActionsGauge: any = null;
private metricsInitialized: boolean = false;
constructor(
private port: number,
private getStatus: () => WorkerStatus,
private workerName: string,
private getSlots: () => number,
private getActions: () => string[],
private getLabels: () => Record<string, string | number>,
private logger: Logger
) {
this.initializeMetrics();
}
private async handleRequest(req: IncomingMessage, res: ServerResponse): Promise<void> {
const url = req.url || '';
if (url === '/health' && req.method === 'GET') {
await this.handleHealth(res);
} else if (url === '/metrics' && req.method === 'GET') {
await this.handleMetrics(res);
} else {
res.writeHead(404, { 'Content-Type': 'text/plain' });
res.end('Not Found');
}
}
private async handleHealth(res: ServerResponse): Promise<void> {
const response: HealthCheckResponse = {
status: this.getStatus(),
name: this.workerName,
slots: this.getSlots(),
actions: this.getActions(),
labels: this.getLabels(),
nodeVersion: process.version,
};
res.writeHead(200, { 'Content-Type': 'application/json' });
await res.end(JSON.stringify(response));
}
private initializeMetrics(): void {
try {
// @ts-ignore - prom-client is an optional dependency
// eslint-disable-next-line
const { Registry, Gauge, collectDefaultMetrics } = require('prom-client');
this.register = new Registry();
collectDefaultMetrics({ register: this.register });
this.workerStatusGauge = new Gauge({
name: 'hatchet_worker_status',
help: 'Current status of the Hatchet worker',
registers: [this.register],
collect: () => {
this.workerStatusGauge!.set(this.getStatus() === workerStatus.HEALTHY ? 1 : 0);
},
});
this.workerSlotsGauge = new Gauge({
name: 'hatchet_worker_slots',
help: 'Total slots available on the worker',
registers: [this.register],
collect: () => {
this.workerSlotsGauge!.set(this.getSlots());
},
});
this.workerActionsGauge = new Gauge({
name: 'hatchet_worker_actions',
help: 'Number of registered actions on the worker',
registers: [this.register],
collect: () => {
this.workerActionsGauge!.set(this.getActions().length);
},
});
this.metricsInitialized = true;
} catch (error) {
this.metricsInitialized = false;
this.logger.error('Metrics initialization failed - prom-client dependency not installed');
}
}
private async handleMetrics(res: ServerResponse): Promise<void> {
if (!this.metricsInitialized || !this.register) {
this.logger.error('Metrics initialization failed - prom-client dependency not installed');
res.writeHead(503, { 'Content-Type': 'text/plain' });
res.end('Metrics initialization failed');
return;
}
try {
const metrics = await this.register.metrics();
res.writeHead(200, { 'Content-Type': this.register.contentType });
res.end(metrics);
} catch (error) {
this.logger.error(`Error generating metrics: ${error}`);
res.writeHead(500, { 'Content-Type': 'text/plain' });
res.end('Error generating metrics');
}
}
async start(): Promise<void> {
return new Promise((resolve, reject) => {
try {
this.server = createServer((req, res) => {
this.handleRequest(req, res);
});
this.server.listen(this.port, '0.0.0.0', () => {
this.logger.info(`Health check server running on port ${this.port}`);
resolve();
});
this.server.on('error', (error) => {
this.logger.error(`Failed to start health check server: ${error.message}`);
reject(error);
});
} catch (error) {
this.logger.error(`Failed to start health check server: ${error}`);
reject(error);
}
});
}
async stop(): Promise<void> {
if (this.server) {
return new Promise((resolve) => {
this.server!.close(() => {
this.logger.info('Health check server stopped!');
resolve();
});
});
}
return Promise.resolve();
}
}

View File

@@ -37,6 +37,7 @@ import { CreateStep, mapRateLimit, StepRunFunction } from '@hatchet/step';
import { applyNamespace } from '@hatchet/util/apply-namespace';
import { Context, DurableContext } from './context';
import { parentRunContextManager } from '../../parent-run-context-vars';
import { HealthServer, workerStatus, type WorkerStatus } from './health-server';
export type ActionRegistry = Record<Action['actionId'], Function>;
@@ -45,6 +46,8 @@ export interface WorkerOpts {
handleKill?: boolean;
maxRuns?: number;
labels?: WorkerLabels;
healthPort?: number;
enableHealthServer?: boolean;
}
export class V1Worker {
@@ -67,6 +70,12 @@ export class V1Worker {
labels: WorkerLabels = {};
healthPort: number;
enableHealthServer: boolean;
private healthServer: HealthServer | undefined;
private status: WorkerStatus = workerStatus.INITIALIZED;
constructor(
client: HatchetClient,
options: {
@@ -83,6 +92,9 @@ export class V1Worker {
this.labels = options.labels || {};
this.enableHealthServer = client.config.healthcheck?.enabled ?? false;
this.healthPort = client.config.healthcheck?.port ?? 8001;
process.on('SIGTERM', () => this.exitGracefully(true));
process.on('SIGINT', () => this.exitGracefully(true));
@@ -90,6 +102,54 @@ export class V1Worker {
this.handle_kill = options.handleKill === undefined ? true : options.handleKill;
this.logger = client.config.logger(`Worker/${this.name}`, this.client.config.log_level);
if (this.enableHealthServer && this.healthPort) {
this.initializeHealthServer();
}
}
private initializeHealthServer(): void {
if (!this.healthPort) {
this.logger.warn('Health server enabled but no port specified');
return;
}
this.healthServer = new HealthServer(
this.healthPort,
() => this.status,
this.name,
() => this.getAvailableSlots(),
() => this.getRegisteredActions(),
() => this.getFilteredLabels(),
this.logger
);
}
private getAvailableSlots(): number {
if (!this.maxRuns) {
return 0;
}
const currentRuns = Object.keys(this.futures).length;
return Math.max(0, this.maxRuns - currentRuns);
}
private getRegisteredActions(): string[] {
return Object.keys(this.action_registry);
}
private getFilteredLabels(): Record<string, string | number> {
const filtered: Record<string, string | number> = {};
for (const [key, value] of Object.entries(this.labels)) {
if (value !== undefined) {
filtered[key] = value;
}
}
return filtered;
}
private setStatus(status: WorkerStatus): void {
this.status = status;
this.logger.debug(`Worker status changed to: ${status}`);
}
private registerActions(workflow: Workflow) {
@@ -768,6 +828,7 @@ export class V1Worker {
async exitGracefully(handleKill: boolean) {
this.killing = true;
this.setStatus(workerStatus.UNHEALTHY);
this.logger.info('Starting to exit...');
@@ -784,6 +845,14 @@ export class V1Worker {
this.logger.info('Successfully finished pending tasks.');
if (this.healthServer) {
try {
await this.healthServer.stop();
} catch (e: any) {
this.logger.error(`Could not stop health server: ${e.message}`);
}
}
if (handleKill) {
this.logger.info('Exiting hatchet worker...');
process.exit(0);
@@ -791,6 +860,18 @@ export class V1Worker {
}
async start() {
this.setStatus(workerStatus.STARTING);
if (this.healthServer) {
try {
await this.healthServer.start();
} catch (e: any) {
this.logger.error(`Could not start health server: ${e.message}`);
this.setStatus(workerStatus.UNHEALTHY);
return;
}
}
// ensure all workflows are registered
await Promise.all(this.registeredWorkflowPromises);
@@ -808,6 +889,7 @@ export class V1Worker {
});
this.workerId = this.listener.workerId;
this.setStatus(workerStatus.HEALTHY);
const generator = this.listener.actions();
@@ -821,6 +903,7 @@ export class V1Worker {
void this.handleAction(action);
}
} catch (e: any) {
this.setStatus(workerStatus.UNHEALTHY);
if (this.killing) {
this.logger.info(`Exiting worker, ignoring error: ${e.message}`);
return;