diff --git a/sdks/typescript/package.json b/sdks/typescript/package.json index bb992545a..fd88d10e0 100644 --- a/sdks/typescript/package.json +++ b/sdks/typescript/package.json @@ -1,6 +1,6 @@ { "name": "@hatchet-dev/typescript-sdk", - "version": "1.9.8", + "version": "1.10.0", "description": "Background task orchestration & visibility for developers", "types": "dist/index.d.ts", "files": [ @@ -91,5 +91,8 @@ "yaml": "^2.7.1", "zod": "^3.24.2" }, + "optionalDependencies": { + "prom-client": "^15.1.3" + }, "packageManager": "pnpm@10.16.1" } diff --git a/sdks/typescript/pnpm-lock.yaml b/sdks/typescript/pnpm-lock.yaml index faa65f231..851efaece 100644 --- a/sdks/typescript/pnpm-lock.yaml +++ b/sdks/typescript/pnpm-lock.yaml @@ -144,6 +144,10 @@ importers: typescript: specifier: ^5.8.2 version: 5.9.2 + optionalDependencies: + prom-client: + specifier: ^15.1.3 + version: 15.1.3 packages: @@ -491,6 +495,10 @@ packages: resolution: {integrity: sha512-nn5ozdjYQpUCZlWGuxcJY/KpxkWQs4DcbMCmKojjyrYDEAGy4Ce19NN4v5MduafTwJlbKc99UA8YhSVqq9yPZA==} engines: {node: '>=12.4.0'} + '@opentelemetry/api@1.9.0': + resolution: {integrity: sha512-3giAOQvZiH5F9bMlMiv8+GSPMeqg0dbaeo58/0SlA9sxSqZhnUtxzX9/2FzyhS9sWQf5S0GJE0AKBrFqjpeYcg==} + engines: {node: '>=8.0.0'} + '@pkgr/core@0.2.9': resolution: {integrity: sha512-QNqXyfVS2wm9hweSYD2O7F0G06uurj9kZ96TRQE5Y9hU7+tgdZwIkbAKc5Ocy1HxEY2kuDQa6cQ1WRs/O5LFKA==} engines: {node: ^12.20.0 || ^14.18.0 || >=16.0.0} @@ -978,6 +986,9 @@ packages: balanced-match@1.0.2: resolution: {integrity: sha512-3oSeUO0TMV67hN1AmbXsK4yaqU7tjiHlbxRDZOpH0KW9+CeX4bRAaX0Anxt0tx2MrpRpWwQaPwIlISEJhYU5Pw==} + bintrees@1.0.2: + resolution: {integrity: sha512-VOMgTMwjAaUG580SXn3LacVgjurrbMme7ZZNYGSSV7mmtY6QQRh0Eg3pwIcntQ77DErK1L0NxkbetjcoXzVwKw==} + brace-expansion@1.1.12: resolution: {integrity: sha512-9T9UjW3r0UW5c1Q7GTwllptXwhvYmEzFhzMfZ9H7FQWt+uZePjZPjBP/W1ZEyZ1twGWom5/56TF4lPcqjnDHcg==} @@ -2426,6 +2437,10 @@ packages: process-warning@5.0.0: resolution: {integrity: sha512-a39t9ApHNx2L4+HBnQKqxxHNs1r7KF+Intd8Q/g1bUh6q0WIp9voPXJ/x0j+ZL45KF1pJd9+q2jLIRMfvEshkA==} + prom-client@15.1.3: + resolution: {integrity: sha512-6ZiOBfCywsD4k1BN9IX0uZhF+tJkV8q8llP64G5Hajs4JOeVLPCwpPVcpXy3BwYiUGgyJzsJJQeOIv7+hDSq8g==} + engines: {node: ^16 || ^18 || >=20} + prompts@2.4.2: resolution: {integrity: sha512-NxNv/kLguCA7p3jE8oL2aEBsrJWgAakBpgmgK6lpPWV+WuOmY6r2/zbAVnP+T8bQlA0nzHXSJSJW0Hq7ylaD2Q==} engines: {node: '>= 6'} @@ -2715,6 +2730,9 @@ packages: resolution: {integrity: sha512-DZ4yORTwrbTj/7MZYq2w+/ZFdI6OZ/f9SFHR+71gIVUZhOQPHzVCLpvRnPgyaMpfWxxk/4ONva3GQSyNIKRv6A==} engines: {node: '>=10'} + tdigest@0.1.2: + resolution: {integrity: sha512-+G0LLgjjo9BZX2MfdvPfH+MKLCrxlXSYec5DaPYP1fe6Iyhf0/fSmJ0bFiZ1F8BT6cGXl2LpltQptzjXKWEkKA==} + test-exclude@6.0.0: resolution: {integrity: sha512-cAGWPIyOHU6zlmg88jwm7VRyXnMN7iV68OGAbYDk/Mh/xC/pzVPlQtY6ngoIH/5/tciuhGfvESU8GrHrcxD56w==} engines: {node: '>=8'} @@ -3496,6 +3514,9 @@ snapshots: '@nolyfill/is-core-module@1.0.39': {} + '@opentelemetry/api@1.9.0': + optional: true + '@pkgr/core@0.2.9': {} '@protobufjs/aspromise@1.1.2': {} @@ -4048,6 +4069,9 @@ snapshots: balanced-match@1.0.2: {} + bintrees@1.0.2: + optional: true + brace-expansion@1.1.12: dependencies: balanced-match: 1.0.2 @@ -5784,6 +5808,12 @@ snapshots: process-warning@5.0.0: {} + prom-client@15.1.3: + dependencies: + '@opentelemetry/api': 1.9.0 + tdigest: 0.1.2 + optional: true + prompts@2.4.2: dependencies: kleur: 3.0.3 @@ -6116,6 +6146,11 @@ snapshots: mkdirp: 1.0.4 yallist: 4.0.0 + tdigest@0.1.2: + dependencies: + bintrees: 1.0.2 + optional: true + test-exclude@6.0.0: dependencies: '@istanbuljs/schema': 0.1.3 diff --git a/sdks/typescript/src/clients/hatchet-client/client-config.ts b/sdks/typescript/src/clients/hatchet-client/client-config.ts index 8e99a73ca..48139772e 100644 --- a/sdks/typescript/src/clients/hatchet-client/client-config.ts +++ b/sdks/typescript/src/clients/hatchet-client/client-config.ts @@ -10,9 +10,15 @@ const ClientTLSConfigSchema = z.object({ server_name: z.string().optional(), }); +const HealthcheckConfigSchema = z.object({ + enabled: z.boolean().optional().default(false), + port: z.number().optional().default(8001), +}); + export const ClientConfigSchema = z.object({ token: z.string(), tls_config: ClientTLSConfigSchema, + healthcheck: HealthcheckConfigSchema.optional(), host_port: z.string(), api_url: z.string(), log_level: z.enum(['OFF', 'DEBUG', 'INFO', 'WARN', 'ERROR']).optional(), diff --git a/sdks/typescript/src/clients/hatchet-client/fixtures/.hatchet.yaml b/sdks/typescript/src/clients/hatchet-client/fixtures/.hatchet.yaml index f90aaa437..62bc0f7cf 100644 --- a/sdks/typescript/src/clients/hatchet-client/fixtures/.hatchet.yaml +++ b/sdks/typescript/src/clients/hatchet-client/fixtures/.hatchet.yaml @@ -6,3 +6,6 @@ tls_config: key_file: 'TLS_KEY_FILE_YAML' ca_file: 'TLS_ROOT_CA_FILE_YAML' server_name: 'TLS_SERVER_NAME_YAML' +healthcheck: + enabled: true + port: 8005 diff --git a/sdks/typescript/src/clients/hatchet-client/hatchet-client.test.ts b/sdks/typescript/src/clients/hatchet-client/hatchet-client.test.ts index 1b5c551fd..8065e7098 100644 --- a/sdks/typescript/src/clients/hatchet-client/hatchet-client.test.ts +++ b/sdks/typescript/src/clients/hatchet-client/hatchet-client.test.ts @@ -25,6 +25,10 @@ describe('Client', () => { server_name: 'TLS_SERVER_NAME', tls_strategy: 'tls', }, + healthcheck: { + enabled: true, + port: 8002, + }, }, { credentials: ChannelCredentials.createInsecure(), @@ -47,6 +51,10 @@ describe('Client', () => { ca_file: 'TLS_ROOT_CA_FILE', server_name: 'TLS_SERVER_NAME', }, + healthcheck: { + enabled: true, + port: 8002, + }, }) ); }); @@ -78,6 +86,10 @@ describe('Client', () => { server_name: 'TLS_SERVER_NAME', tls_strategy: 'tls', }, + healthcheck: { + enabled: false, + port: 8003, + }, }, { config_path: './fixtures/.hatchet.yaml', @@ -101,6 +113,10 @@ describe('Client', () => { ca_file: 'TLS_ROOT_CA_FILE', server_name: 'TLS_SERVER_NAME', }, + healthcheck: { + enabled: false, + port: 8003, + }, }) ); }); diff --git a/sdks/typescript/src/util/config-loader/config-loader.test.ts b/sdks/typescript/src/util/config-loader/config-loader.test.ts index 5f5ffc45a..ed07af0a3 100644 --- a/sdks/typescript/src/util/config-loader/config-loader.test.ts +++ b/sdks/typescript/src/util/config-loader/config-loader.test.ts @@ -8,6 +8,8 @@ fdescribe('ConfigLoader', () => { process.env.HATCHET_CLIENT_TLS_KEY_FILE = 'TLS_KEY_FILE'; process.env.HATCHET_CLIENT_TLS_ROOT_CA_FILE = 'TLS_ROOT_CA_FILE'; process.env.HATCHET_CLIENT_TLS_SERVER_NAME = 'TLS_SERVER_NAME'; + process.env.HATCHET_CLIENT_WORKER_HEALTHCHECK_ENABLED = 'true'; + process.env.HATCHET_CLIENT_WORKER_HEALTHCHECK_PORT = '8001'; }); it('should load from environment variables', () => { @@ -27,6 +29,10 @@ fdescribe('ConfigLoader', () => { ca_file: 'TLS_ROOT_CA_FILE', server_name: 'TLS_SERVER_NAME', }, + healthcheck: { + enabled: true, + port: 8001, + }, }); }); @@ -75,6 +81,10 @@ fdescribe('ConfigLoader', () => { ca_file: 'TLS_ROOT_CA_FILE_YAML', server_name: 'TLS_SERVER_NAME_YAML', }, + healthcheck: { + enabled: true, + port: 8002, + }, }); }); @@ -97,6 +107,10 @@ fdescribe('ConfigLoader', () => { ca_file: 'TLS_ROOT_CA_FILE_YAML', server_name: 'TLS_SERVER_NAME_YAML', }, + healthcheck: { + enabled: true, + port: 8002, + }, }); }); }); diff --git a/sdks/typescript/src/util/config-loader/config-loader.ts b/sdks/typescript/src/util/config-loader/config-loader.ts index eaae89aae..a4846d79b 100644 --- a/sdks/typescript/src/util/config-loader/config-loader.ts +++ b/sdks/typescript/src/util/config-loader/config-loader.ts @@ -17,7 +17,9 @@ type EnvVars = | 'HATCHET_CLIENT_TLS_ROOT_CA_FILE' | 'HATCHET_CLIENT_TLS_SERVER_NAME' | 'HATCHET_CLIENT_LOG_LEVEL' - | 'HATCHET_CLIENT_NAMESPACE'; + | 'HATCHET_CLIENT_NAMESPACE' + | 'HATCHET_CLIENT_WORKER_HEALTHCHECK_ENABLED' + | 'HATCHET_CLIENT_WORKER_HEALTHCHECK_PORT'; type TLSStrategy = 'tls' | 'mtls'; @@ -46,6 +48,12 @@ export class ConfigLoader { const token = override?.token ?? yaml?.token ?? this.env('HATCHET_CLIENT_TOKEN'); + const healthCheckConfig = override?.healthcheck ?? + yaml?.healthcheck ?? { + enabled: this.env('HATCHET_CLIENT_WORKER_HEALTHCHECK_ENABLED') === 'true', + port: parseInt(this.env('HATCHET_CLIENT_WORKER_HEALTHCHECK_PORT') || '8001', 10), + }; + if (!token) { throw new Error( 'No token provided. Provide it by setting the HATCHET_CLIENT_TOKEN environment variable.' @@ -91,6 +99,7 @@ export class ConfigLoader { host_port: grpcBroadcastAddress, api_url: apiUrl, tls_config: tlsConfig, + healthcheck: healthCheckConfig, log_level: override?.log_level ?? yaml?.log_level ?? diff --git a/sdks/typescript/src/util/config-loader/fixtures/.hatchet.yaml b/sdks/typescript/src/util/config-loader/fixtures/.hatchet.yaml index f90aaa437..288604783 100644 --- a/sdks/typescript/src/util/config-loader/fixtures/.hatchet.yaml +++ b/sdks/typescript/src/util/config-loader/fixtures/.hatchet.yaml @@ -6,3 +6,6 @@ tls_config: key_file: 'TLS_KEY_FILE_YAML' ca_file: 'TLS_ROOT_CA_FILE_YAML' server_name: 'TLS_SERVER_NAME_YAML' +healthcheck: + enabled: true + port: 8002 diff --git a/sdks/typescript/src/v1/client/worker/health-server.ts b/sdks/typescript/src/v1/client/worker/health-server.ts new file mode 100644 index 000000000..dc88a78e0 --- /dev/null +++ b/sdks/typescript/src/v1/client/worker/health-server.ts @@ -0,0 +1,164 @@ +import { createServer, Server, IncomingMessage, ServerResponse } from 'node:http'; +import { Logger } from '@hatchet/util/logger'; + +export const workerStatus = { + INITIALIZED: 'INITIALIZED', + STARTING: 'STARTING', + HEALTHY: 'HEALTHY', + UNHEALTHY: 'UNHEALTHY', +} as const; +export type WorkerStatus = (typeof workerStatus)[keyof typeof workerStatus]; + +interface HealthCheckResponse { + status: string; + name: string; + slots: number; + actions: string[]; + labels: Record; + nodeVersion: string; +} + +export class HealthServer { + private server: Server | null = null; + private register: any = null; + private workerStatusGauge: any = null; + private workerSlotsGauge: any = null; + private workerActionsGauge: any = null; + private metricsInitialized: boolean = false; + + constructor( + private port: number, + private getStatus: () => WorkerStatus, + private workerName: string, + private getSlots: () => number, + private getActions: () => string[], + private getLabels: () => Record, + private logger: Logger + ) { + this.initializeMetrics(); + } + + private async handleRequest(req: IncomingMessage, res: ServerResponse): Promise { + const url = req.url || ''; + + if (url === '/health' && req.method === 'GET') { + await this.handleHealth(res); + } else if (url === '/metrics' && req.method === 'GET') { + await this.handleMetrics(res); + } else { + res.writeHead(404, { 'Content-Type': 'text/plain' }); + res.end('Not Found'); + } + } + + private async handleHealth(res: ServerResponse): Promise { + const response: HealthCheckResponse = { + status: this.getStatus(), + name: this.workerName, + slots: this.getSlots(), + actions: this.getActions(), + labels: this.getLabels(), + nodeVersion: process.version, + }; + + res.writeHead(200, { 'Content-Type': 'application/json' }); + await res.end(JSON.stringify(response)); + } + + private initializeMetrics(): void { + try { + // @ts-ignore - prom-client is an optional dependency + // eslint-disable-next-line + const { Registry, Gauge, collectDefaultMetrics } = require('prom-client'); + + this.register = new Registry(); + collectDefaultMetrics({ register: this.register }); + + this.workerStatusGauge = new Gauge({ + name: 'hatchet_worker_status', + help: 'Current status of the Hatchet worker', + registers: [this.register], + collect: () => { + this.workerStatusGauge!.set(this.getStatus() === workerStatus.HEALTHY ? 1 : 0); + }, + }); + + this.workerSlotsGauge = new Gauge({ + name: 'hatchet_worker_slots', + help: 'Total slots available on the worker', + registers: [this.register], + collect: () => { + this.workerSlotsGauge!.set(this.getSlots()); + }, + }); + + this.workerActionsGauge = new Gauge({ + name: 'hatchet_worker_actions', + help: 'Number of registered actions on the worker', + registers: [this.register], + collect: () => { + this.workerActionsGauge!.set(this.getActions().length); + }, + }); + this.metricsInitialized = true; + } catch (error) { + this.metricsInitialized = false; + this.logger.error('Metrics initialization failed - prom-client dependency not installed'); + } + } + + private async handleMetrics(res: ServerResponse): Promise { + if (!this.metricsInitialized || !this.register) { + this.logger.error('Metrics initialization failed - prom-client dependency not installed'); + res.writeHead(503, { 'Content-Type': 'text/plain' }); + res.end('Metrics initialization failed'); + return; + } + + try { + const metrics = await this.register.metrics(); + res.writeHead(200, { 'Content-Type': this.register.contentType }); + res.end(metrics); + } catch (error) { + this.logger.error(`Error generating metrics: ${error}`); + res.writeHead(500, { 'Content-Type': 'text/plain' }); + res.end('Error generating metrics'); + } + } + + async start(): Promise { + return new Promise((resolve, reject) => { + try { + this.server = createServer((req, res) => { + this.handleRequest(req, res); + }); + + this.server.listen(this.port, '0.0.0.0', () => { + this.logger.info(`Health check server running on port ${this.port}`); + resolve(); + }); + + this.server.on('error', (error) => { + this.logger.error(`Failed to start health check server: ${error.message}`); + reject(error); + }); + } catch (error) { + this.logger.error(`Failed to start health check server: ${error}`); + reject(error); + } + }); + } + + async stop(): Promise { + if (this.server) { + return new Promise((resolve) => { + this.server!.close(() => { + this.logger.info('Health check server stopped!'); + resolve(); + }); + }); + } + + return Promise.resolve(); + } +} diff --git a/sdks/typescript/src/v1/client/worker/worker-internal.ts b/sdks/typescript/src/v1/client/worker/worker-internal.ts index 7feb19c06..e009c76fc 100644 --- a/sdks/typescript/src/v1/client/worker/worker-internal.ts +++ b/sdks/typescript/src/v1/client/worker/worker-internal.ts @@ -37,6 +37,7 @@ import { CreateStep, mapRateLimit, StepRunFunction } from '@hatchet/step'; import { applyNamespace } from '@hatchet/util/apply-namespace'; import { Context, DurableContext } from './context'; import { parentRunContextManager } from '../../parent-run-context-vars'; +import { HealthServer, workerStatus, type WorkerStatus } from './health-server'; export type ActionRegistry = Record; @@ -45,6 +46,8 @@ export interface WorkerOpts { handleKill?: boolean; maxRuns?: number; labels?: WorkerLabels; + healthPort?: number; + enableHealthServer?: boolean; } export class V1Worker { @@ -67,6 +70,12 @@ export class V1Worker { labels: WorkerLabels = {}; + healthPort: number; + enableHealthServer: boolean; + + private healthServer: HealthServer | undefined; + private status: WorkerStatus = workerStatus.INITIALIZED; + constructor( client: HatchetClient, options: { @@ -83,6 +92,9 @@ export class V1Worker { this.labels = options.labels || {}; + this.enableHealthServer = client.config.healthcheck?.enabled ?? false; + this.healthPort = client.config.healthcheck?.port ?? 8001; + process.on('SIGTERM', () => this.exitGracefully(true)); process.on('SIGINT', () => this.exitGracefully(true)); @@ -90,6 +102,54 @@ export class V1Worker { this.handle_kill = options.handleKill === undefined ? true : options.handleKill; this.logger = client.config.logger(`Worker/${this.name}`, this.client.config.log_level); + + if (this.enableHealthServer && this.healthPort) { + this.initializeHealthServer(); + } + } + + private initializeHealthServer(): void { + if (!this.healthPort) { + this.logger.warn('Health server enabled but no port specified'); + return; + } + + this.healthServer = new HealthServer( + this.healthPort, + () => this.status, + this.name, + () => this.getAvailableSlots(), + () => this.getRegisteredActions(), + () => this.getFilteredLabels(), + this.logger + ); + } + + private getAvailableSlots(): number { + if (!this.maxRuns) { + return 0; + } + const currentRuns = Object.keys(this.futures).length; + return Math.max(0, this.maxRuns - currentRuns); + } + + private getRegisteredActions(): string[] { + return Object.keys(this.action_registry); + } + + private getFilteredLabels(): Record { + const filtered: Record = {}; + for (const [key, value] of Object.entries(this.labels)) { + if (value !== undefined) { + filtered[key] = value; + } + } + return filtered; + } + + private setStatus(status: WorkerStatus): void { + this.status = status; + this.logger.debug(`Worker status changed to: ${status}`); } private registerActions(workflow: Workflow) { @@ -768,6 +828,7 @@ export class V1Worker { async exitGracefully(handleKill: boolean) { this.killing = true; + this.setStatus(workerStatus.UNHEALTHY); this.logger.info('Starting to exit...'); @@ -784,6 +845,14 @@ export class V1Worker { this.logger.info('Successfully finished pending tasks.'); + if (this.healthServer) { + try { + await this.healthServer.stop(); + } catch (e: any) { + this.logger.error(`Could not stop health server: ${e.message}`); + } + } + if (handleKill) { this.logger.info('Exiting hatchet worker...'); process.exit(0); @@ -791,6 +860,18 @@ export class V1Worker { } async start() { + this.setStatus(workerStatus.STARTING); + + if (this.healthServer) { + try { + await this.healthServer.start(); + } catch (e: any) { + this.logger.error(`Could not start health server: ${e.message}`); + this.setStatus(workerStatus.UNHEALTHY); + return; + } + } + // ensure all workflows are registered await Promise.all(this.registeredWorkflowPromises); @@ -808,6 +889,7 @@ export class V1Worker { }); this.workerId = this.listener.workerId; + this.setStatus(workerStatus.HEALTHY); const generator = this.listener.actions(); @@ -821,6 +903,7 @@ export class V1Worker { void this.handleAction(action); } } catch (e: any) { + this.setStatus(workerStatus.UNHEALTHY); if (this.killing) { this.logger.info(`Exiting worker, ignoring error: ${e.message}`); return;