feat: handle server initiated eviction on workers (#3200)

* refactor: ts in sync with python

* fix: waits respect abort signal

* chore: gen, lint

* fix: update key extraction logic in durable workflow example

* fix: flake

* fix: racey test

* chore: gen

* feat: send eviction to engine

* feat: handle eviction on workers

* fix: tests and invocation scoped eviction

* fix: cleanup scope

* refactor: cleanup how we do action keys in ts

* chore: gen, lint

* fix: test

* chore: lint

* fix: mock

* fix: cleanup

* chore: lint

* chore: copilot feedback

* fix: reconnect durable listener

* chore: feedback

* fix: bump worker status on wait and reconnect

* refactor: rename eviction notice

* fix: return error on query failure

* chore: lint

* chore: gen

* refactor: remove early completion

* Revert "refactor: remove early completion"

This reverts commit f756ce37b0.

* chore: remove unused param
This commit is contained in:
Gabe Ruttner
2026-03-09 05:16:46 -07:00
committed by GitHub
parent f667b7a630
commit 468de0c584
33 changed files with 2315 additions and 622 deletions
+9
View File
@@ -72,6 +72,14 @@ message DurableTaskAwaitedCompletedEntry {
string durable_task_external_id = 1;
int64 branch_id = 2;
int64 node_id = 3;
int32 invocation_count = 4;
}
// Sent by the server to notify a worker that its invocation is stale and should be cancelled.
message DurableTaskServerEvictNotice {
string durable_task_external_id = 1;
int32 invocation_count = 2;
string reason = 3;
}
message DurableTaskWorkerStatusRequest {
@@ -157,6 +165,7 @@ message DurableTaskResponse {
DurableTaskEventLogEntryCompletedResponse entry_completed = 5;
DurableTaskErrorResponse error = 6;
DurableTaskEvictionAckResponse eviction_ack = 7;
DurableTaskServerEvictNotice server_evict = 8;
}
}
+56 -66
View File
@@ -1,78 +1,68 @@
import {
Or,
SleepCondition,
UserEventCondition,
} from "@hatchet-dev/typescript-sdk/v1/conditions";
import { NonDeterminismError } from "@hatchet-dev/typescript-sdk/util/errors/non-determinism-error";
import sleep from "@hatchet-dev/typescript-sdk/util/sleep";
import { hatchet } from "../hatchet-client";
import { Or, SleepCondition, UserEventCondition } from '@hatchet-dev/typescript-sdk/v1/conditions';
import { NonDeterminismError } from '@hatchet-dev/typescript-sdk/util/errors/non-determinism-error';
import sleep from '@hatchet-dev/typescript-sdk/util/sleep';
import { hatchet } from '../hatchet-client';
export const EVENT_KEY = "durable-example:event";
export const EVENT_KEY = 'durable-example:event';
export const SLEEP_TIME_SECONDS = 2;
export const SLEEP_TIME = `${SLEEP_TIME_SECONDS}s` as const;
// > Create a durable workflow
export const durableWorkflow = hatchet.workflow({
name: "durable-workflow",
name: 'durable-workflow',
});
durableWorkflow.task({
name: "ephemeral_task",
name: 'ephemeral_task',
fn: async () => {
console.log("Running non-durable task");
console.log('Running non-durable task');
},
});
durableWorkflow.durableTask({
name: "durable_task",
executionTimeout: "10m",
name: 'durable_task',
executionTimeout: '10m',
fn: async (_input, ctx) => {
console.log("Waiting for sleep");
console.log('Waiting for sleep');
await ctx.sleepFor(SLEEP_TIME);
console.log("Sleep finished");
console.log('Sleep finished');
console.log("Waiting for event");
console.log('Waiting for event');
await ctx.waitFor({ eventKey: EVENT_KEY });
console.log("Event received");
console.log('Event received');
return { status: "success" };
return { status: 'success' };
},
});
function extractKeyAndEventId(waitResult: unknown): {
key: string;
eventId: string;
} {
function extractKeyAndEventId(waitResult: unknown): { key: string; eventId: string } {
// DurableContext.waitFor currently returns the CREATE payload directly.
// The shape is typically `{ [readableDataKey]: { [eventId]: ... } }`.
const obj = waitResult as Record<string, Record<string, unknown>>;
if (obj && typeof obj === "object") {
const key = Object.keys(obj)[0];
if (obj && typeof obj === 'object') {
const [key] = Object.keys(obj);
const inner = obj[key];
if (inner && typeof inner === "object" && !Array.isArray(inner)) {
const eventId = Object.keys(inner)[0];
if (inner && typeof inner === 'object' && !Array.isArray(inner)) {
const [eventId] = Object.keys(inner);
if (eventId) {
return { key, eventId };
}
}
if (key) {
return { key: "CREATE", eventId: key };
return { key: 'CREATE', eventId: key };
}
}
return { key: "CREATE", eventId: "" };
return { key: 'CREATE', eventId: '' };
}
durableWorkflow.durableTask({
name: "wait_for_or_group_1",
executionTimeout: "10m",
name: 'wait_for_or_group_1',
executionTimeout: '10m',
fn: async (_input, ctx) => {
const start = Date.now();
const waitResult = await ctx.waitFor(
Or(
new SleepCondition(SLEEP_TIME, "sleep"),
new UserEventCondition(EVENT_KEY, "", "event"),
),
Or(new SleepCondition(SLEEP_TIME, 'sleep'), new UserEventCondition(EVENT_KEY, '', 'event'))
);
const { key, eventId } = extractKeyAndEventId(waitResult);
return {
@@ -84,15 +74,15 @@ durableWorkflow.durableTask({
});
durableWorkflow.durableTask({
name: "wait_for_or_group_2",
executionTimeout: "10m",
name: 'wait_for_or_group_2',
executionTimeout: '10m',
fn: async (_input, ctx) => {
const start = Date.now();
const waitResult = await ctx.waitFor(
Or(
new SleepCondition(`${6 * SLEEP_TIME_SECONDS}s`, "sleep"),
new UserEventCondition(EVENT_KEY, "", "event"),
),
new SleepCondition(`${6 * SLEEP_TIME_SECONDS}s`, 'sleep'),
new UserEventCondition(EVENT_KEY, '', 'event')
)
);
const { key, eventId } = extractKeyAndEventId(waitResult);
return {
@@ -104,8 +94,8 @@ durableWorkflow.durableTask({
});
durableWorkflow.durableTask({
name: "wait_for_multi_sleep",
executionTimeout: "10m",
name: 'wait_for_multi_sleep',
executionTimeout: '10m',
fn: async (_input, ctx) => {
const start = Date.now();
// sleep 3 times
@@ -118,8 +108,8 @@ durableWorkflow.durableTask({
});
export const waitForSleepTwice = hatchet.durableTask({
name: "wait-for-sleep-twice",
executionTimeout: "10m",
name: 'wait-for-sleep-twice',
executionTimeout: '10m',
fn: async (_input, ctx) => {
try {
const start = Date.now();
@@ -134,15 +124,15 @@ export const waitForSleepTwice = hatchet.durableTask({
// --- Spawn child from durable task ---
export const spawnChildTask = hatchet.task({
name: "spawn-child-task",
name: 'spawn-child-task',
fn: async (input: { n?: number }) => {
return { message: `hello from child ${input.n ?? 1}` };
},
});
export const durableWithSpawn = hatchet.durableTask({
name: "durable-with-spawn",
executionTimeout: "10s",
name: 'durable-with-spawn',
executionTimeout: '10s',
fn: async (_input, ctx) => {
const childResult = await spawnChildTask.run({});
return { child_output: childResult };
@@ -150,8 +140,8 @@ export const durableWithSpawn = hatchet.durableTask({
});
export const durableWithBulkSpawn = hatchet.durableTask({
name: "durable-with-bulk-spawn",
executionTimeout: "10m",
name: 'durable-with-bulk-spawn',
executionTimeout: '10m',
fn: async (input: { n?: number }, ctx) => {
const n = input.n ?? 10;
const inputs = Array.from({ length: n }, (_, i) => ({ n: i }));
@@ -161,8 +151,8 @@ export const durableWithBulkSpawn = hatchet.durableTask({
});
export const durableSleepEventSpawn = hatchet.durableTask({
name: "durable-sleep-event-spawn",
executionTimeout: "10m",
name: 'durable-sleep-event-spawn',
executionTimeout: '10m',
fn: async (_input, ctx) => {
const start = Date.now();
@@ -182,8 +172,8 @@ export const durableSleepEventSpawn = hatchet.durableTask({
// --- Spawn child using explicit ctx.spawnChild ---
export const durableWithExplicitSpawn = hatchet.durableTask({
name: "durable-with-explicit-spawn",
executionTimeout: "10m",
name: 'durable-with-explicit-spawn',
executionTimeout: '10m',
fn: async (_input, ctx) => {
const childResult = await ctx.spawnChild(spawnChildTask, {});
return { child_output: childResult };
@@ -193,8 +183,8 @@ export const durableWithExplicitSpawn = hatchet.durableTask({
// --- Non-determinism detection ---
export const durableNonDeterminism = hatchet.durableTask({
name: "durable-non-determinism",
executionTimeout: "10s",
name: 'durable-non-determinism',
executionTimeout: '10s',
fn: async (_input, ctx) => {
const sleepTime = ctx.invocationCount * 2;
@@ -225,8 +215,8 @@ export const durableNonDeterminism = hatchet.durableTask({
const MEMO_SLEEP_MS = 2000;
export const memoTask = hatchet.durableTask({
name: "memo-task",
executionTimeout: "10m",
name: 'memo-task',
executionTimeout: '10m',
fn: async (input: { message: string }, ctx) => {
const start = Date.now();
const res = await ctx.memo(async () => {
@@ -245,8 +235,8 @@ export const REPLAY_RESET_MEMOIZED_MAX_SECONDS = 5;
const REPLAY_RESET_SLEEP = `${REPLAY_RESET_SLEEP_SECONDS}s` as const;
export const durableReplayReset = hatchet.durableTask({
name: "durable-replay-reset",
executionTimeout: "20s",
name: 'durable-replay-reset',
executionTimeout: '20s',
fn: async (_input, ctx) => {
let start = Date.now();
await ctx.sleepFor(REPLAY_RESET_SLEEP);
@@ -271,29 +261,29 @@ export const durableReplayReset = hatchet.durableTask({
// --- Spawn DAG from durable task ---
export const dagChildWorkflow = hatchet.workflow({
name: "dag-child-workflow-ts",
name: 'dag-child-workflow-ts',
});
const dagChild1 = dagChildWorkflow.task({
name: "dag-child-1",
name: 'dag-child-1',
fn: async () => {
await sleep(1000);
return { result: "child1" };
return { result: 'child1' };
},
});
dagChildWorkflow.task({
name: "dag-child-2",
name: 'dag-child-2',
parents: [dagChild1],
fn: async () => {
await sleep(2000);
return { result: "child2" };
return { result: 'child2' };
},
});
export const durableSpawnDag = hatchet.durableTask({
name: "durable-spawn-dag",
executionTimeout: "10s",
name: 'durable-spawn-dag',
executionTimeout: '10s',
fn: async (_input, ctx) => {
const sleepStart = Date.now();
const sleepResult = await ctx.sleepFor(SLEEP_TIME);
@@ -18,7 +18,7 @@ export const child = hatchet.workflow<ChildInput>({
export const child1 = child.task({
name: 'child1',
fn: async (input: ChildInput, ctx) => {
await sleep(10000);
await sleep(30 * 1000);
ctx.logger.info('hello from the child1', { hello: 'moon' });
return {
@@ -50,6 +50,7 @@ export const child3 = child.task({
export const parent = hatchet.task({
name: 'parent',
executionTimeout: '5m',
fn: async (input: ParentInput, ctx) => {
const c = await ctx.runChild(child, {
Message: input.Message,
+1 -1
View File
@@ -418,7 +418,7 @@ func (d *DispatcherImpl) handleDurableCallbackCompleted(ctx context.Context, tas
)
if err != nil {
d.l.Error().Err(err).Msgf("failed to deliver callback completion for task %s", payload.TaskExternalId)
d.l.Warn().Err(err).Msgf("failed to deliver callback completion for task %s (worker may still be reconnecting; polling path will catch up)", payload.TaskExternalId)
}
}
+77
View File
@@ -478,6 +478,18 @@ func (d *DispatcherServiceImpl) sendNonDeterminismError(invocation *durableTaskI
})
}
func (d *DispatcherServiceImpl) sendStaleInvocationEviction(invocation *durableTaskInvocation, sie *v1.StaleInvocationError) error {
return invocation.send(&contracts.DurableTaskResponse{
Message: &contracts.DurableTaskResponse_ServerEvict{
ServerEvict: &contracts.DurableTaskServerEvictNotice{
DurableTaskExternalId: sie.TaskExternalId.String(),
InvocationCount: sie.ActualInvocationCount,
Reason: sie.Error(),
},
},
})
}
func (d *DispatcherServiceImpl) deliverSatisfiedEntries(taskExternalId string, result *v1.IngestDurableTaskEventResult) error {
switch result.Kind {
case sqlcv1.V1DurableEventLogKindRUN:
@@ -556,8 +568,11 @@ func (d *DispatcherServiceImpl) handleMemo(
})
var nde *v1.NonDeterminismError
var sie *v1.StaleInvocationError
if err != nil && errors.As(err, &nde) {
return d.sendNonDeterminismError(invocation, nde, req.InvocationCount)
} else if err != nil && errors.As(err, &sie) {
return d.sendStaleInvocationEviction(invocation, sie)
} else if err != nil {
return status.Errorf(codes.Internal, "failed to ingest memo event: %v", err)
}
@@ -624,8 +639,11 @@ func (d *DispatcherServiceImpl) handleTriggerRuns(
})
var nde *v1.NonDeterminismError
var sie *v1.StaleInvocationError
if err != nil && errors.As(err, &nde) {
return d.sendNonDeterminismError(invocation, nde, req.InvocationCount)
} else if err != nil && errors.As(err, &sie) {
return d.sendStaleInvocationEviction(invocation, sie)
} else if err != nil {
return status.Errorf(codes.Internal, "failed to ingest trigger runs event: %v", err)
}
@@ -722,8 +740,11 @@ func (d *DispatcherServiceImpl) handleWaitFor(
})
var nde *v1.NonDeterminismError
var sie *v1.StaleInvocationError
if err != nil && errors.As(err, &nde) {
return d.sendNonDeterminismError(invocation, nde, req.InvocationCount)
} else if err != nil && errors.As(err, &sie) {
return d.sendStaleInvocationEviction(invocation, sie)
} else if err != nil {
return status.Errorf(codes.Internal, "failed to ingest wait_for event: %v", err)
}
@@ -864,13 +885,18 @@ func (d *DispatcherServiceImpl) handleWorkerStatus(
return nil
}
uniqueExternalIds := make(map[uuid.UUID]int32)
waiting := make([]v1.TaskExternalIdNodeIdBranchId, 0, len(req.WaitingEntries))
for _, cb := range req.WaitingEntries {
taskExternalId, err := uuid.Parse(cb.DurableTaskExternalId)
if err != nil {
d.l.Warn().Err(err).Msgf("invalid durable_task_external_id in worker_status: %s", cb.DurableTaskExternalId)
continue
}
uniqueExternalIds[taskExternalId] = cb.InvocationCount
waiting = append(waiting, v1.TaskExternalIdNodeIdBranchId{
TaskExternalId: taskExternalId,
NodeId: cb.NodeId,
@@ -882,6 +908,57 @@ func (d *DispatcherServiceImpl) handleWorkerStatus(
return nil
}
if len(uniqueExternalIds) > 0 {
externalIds := make([]uuid.UUID, 0, len(uniqueExternalIds))
for extId := range uniqueExternalIds {
externalIds = append(externalIds, extId)
}
tasks, err := d.repo.Tasks().FlattenExternalIds(ctx, invocation.tenantId, externalIds)
if err != nil {
return fmt.Errorf("failed to look up tasks for invocation count check in worker_status: %w", err)
}
if len(tasks) > 0 {
idInsertedAts := make([]v1.IdInsertedAt, 0, len(tasks))
taskIdToExternalId := make(map[v1.IdInsertedAt]uuid.UUID, len(tasks))
for _, t := range tasks {
key := v1.IdInsertedAt{ID: t.ID, InsertedAt: t.InsertedAt}
idInsertedAts = append(idInsertedAts, key)
taskIdToExternalId[key] = t.ExternalID
}
idInsertedAtToInvocationCount, err := d.repo.DurableEvents().GetDurableTaskInvocationCounts(ctx, invocation.tenantId, idInsertedAts)
if err != nil {
return fmt.Errorf("failed to get invocation counts in worker_status: %w", err)
}
for key, currentCount := range idInsertedAtToInvocationCount {
extId, ok := taskIdToExternalId[key]
if !ok || currentCount == nil {
continue
}
workerInvocationCount, has := uniqueExternalIds[extId]
if !has {
continue
}
if workerInvocationCount < *currentCount {
err = invocation.send(&contracts.DurableTaskResponse{
Message: &contracts.DurableTaskResponse_ServerEvict{
ServerEvict: &contracts.DurableTaskServerEvictNotice{
DurableTaskExternalId: extId.String(),
InvocationCount: workerInvocationCount,
Reason: fmt.Sprintf("stale invocation: server has %d, worker sent %d", *currentCount, workerInvocationCount),
},
},
})
if err != nil {
d.l.Error().Err(err).Msgf("failed to send server eviction notification for task %s", extId.String())
}
}
}
}
}
callbacks, err := d.repo.DurableEvents().GetSatisfiedDurableEvents(ctx, invocation.tenantId, waiting)
if err != nil {
return fmt.Errorf("failed to get satisfied callbacks: %w", err)
File diff suppressed because it is too large Load Diff
+15 -2
View File
@@ -155,6 +155,16 @@ func (m *NonDeterminismError) Error() string {
return fmt.Sprintf("non-determinism detected for durable event log entry in task %s at node id %d", m.TaskExternalId.String(), m.NodeId)
}
type StaleInvocationError struct {
TaskExternalId uuid.UUID
ExpectedInvocationCount int32
ActualInvocationCount int32
}
func (e *StaleInvocationError) Error() string {
return fmt.Sprintf("invocation count mismatch for task %s: server has %d, worker sent %d", e.TaskExternalId.String(), e.ExpectedInvocationCount, e.ActualInvocationCount)
}
type GetOrCreateLogEntryOpt struct {
Kind sqlcv1.V1DurableEventLogKind
IdempotencyKey []byte
@@ -653,8 +663,11 @@ func (r *durableEventsRepository) IngestDurableTaskEvent(ctx context.Context, op
}
if logFile.LatestInvocationCount != opts.InvocationCount {
// TODO-DURABLE: should evict this invocation if this happens
return nil, fmt.Errorf("invocation count mismatch: expected %d, got %d, rejecting event write", logFile.LatestInvocationCount, opts.InvocationCount)
return nil, &StaleInvocationError{
TaskExternalId: opts.Task.ExternalID,
ExpectedInvocationCount: logFile.LatestInvocationCount,
ActualInvocationCount: opts.InvocationCount,
}
}
baseNodeId := logFile.LatestNodeID + 1
+26
View File
@@ -3,13 +3,39 @@
package repository
import (
"errors"
"testing"
"github.com/google/uuid"
"github.com/hatchet-dev/hatchet/pkg/repository/sqlcv1"
"github.com/stretchr/testify/assert"
)
func TestStaleInvocationError_ImplementsError(t *testing.T) {
id := uuid.New()
err := &StaleInvocationError{
TaskExternalId: id,
ExpectedInvocationCount: 3,
ActualInvocationCount: 1,
}
var target *StaleInvocationError
assert.True(t, errors.As(err, &target))
assert.Equal(t, id, target.TaskExternalId)
assert.Equal(t, int32(3), target.ExpectedInvocationCount)
assert.Equal(t, int32(1), target.ActualInvocationCount)
assert.Contains(t, err.Error(), id.String())
assert.Contains(t, err.Error(), "server has 3")
assert.Contains(t, err.Error(), "worker sent 1")
}
func TestStaleInvocationError_NotMatchedByOtherErrors(t *testing.T) {
err := errors.New("some other error")
var target *StaleInvocationError
assert.False(t, errors.As(err, &target))
}
func TestResolveBranchForNode_NoBranchPoints(t *testing.T) {
// Single branch, no forks. All nodes resolve to branch 1.
branchPoints := map[int64]*sqlcv1.V1DurableEventLogBranchPoint{}
@@ -1,6 +1,6 @@
import asyncio
import json
from collections.abc import AsyncIterator
from collections.abc import AsyncIterator, Callable
from contextlib import suppress
from dataclasses import dataclass
from typing import Annotated, Literal, cast
@@ -136,7 +136,12 @@ PendingEvictionAck = tuple[TaskExternalId, InvocationCount]
class DurableEventListener:
def __init__(self, config: ClientConfig, admin_client: AdminClient):
def __init__(
self,
config: ClientConfig,
admin_client: AdminClient,
on_server_evict: Callable[[str, int], None] | None = None,
):
self.config = config
self.token = config.token
self.admin_client = admin_client
@@ -158,9 +163,11 @@ class DurableEventListener:
PendingCallback, asyncio.Future[DurableTaskEventLogEntryResult]
] = {}
# TODO-DURABLE: This is a hack to handle the case where a task is completed before the event listener is connected.
# We should probably figure out WHY this is happening and fix it.
self._early_completions: dict[
# Completions that arrived before wait_for_callback() registered a
# future in _pending_callbacks. This happens when the server delivers
# an entry_completed between the event ack and the wait_for_callback
# call (e.g. an already-satisfied sleep delivered via polling).
self._buffered_completions: dict[
PendingCallback, DurableTaskEventLogEntryResult
] = {}
@@ -169,6 +176,8 @@ class DurableEventListener:
self._running = False
self._start_lock = asyncio.Lock()
self._on_server_evict = on_server_evict
@property
def worker_id(self) -> str | None:
return self._worker_id
@@ -193,6 +202,7 @@ class DurableEventListener:
)
await self._register_worker()
await self._poll_worker_status()
logger.info("durable event listener connected")
async def start(self, worker_id: str) -> None:
@@ -251,10 +261,11 @@ class DurableEventListener:
waiting = [
DurableTaskAwaitedCompletedEntry(
durable_task_external_id=task_ext_id,
invocation_count=inv_count,
node_id=node_id,
branch_id=branch_id,
)
for (task_ext_id, _, branch_id, node_id) in self._pending_callbacks
for (task_ext_id, inv_count, branch_id, node_id) in self._pending_callbacks
]
request = DurableTaskRequest(
@@ -271,6 +282,11 @@ class DurableEventListener:
future.set_exception(exc)
self._pending_event_acks.clear()
for eviction_future in self._pending_eviction_acks.values():
if not eviction_future.done():
eviction_future.set_exception(exc)
self._pending_eviction_acks.clear()
async def _receive_loop(self) -> None:
while self._running:
if not self._stream:
@@ -395,7 +411,7 @@ class DurableEventListener:
completed_future.set_result(result)
del self._pending_callbacks[completed_key]
else:
self._early_completions[completed_key] = result
self._buffered_completions[completed_key] = result
elif response.HasField("eviction_ack"):
eviction_ack = response.eviction_ack
eviction_key = (
@@ -406,6 +422,19 @@ class DurableEventListener:
future = self._pending_eviction_acks.pop(eviction_key)
if not future.done():
future.set_result(None)
elif response.HasField("server_evict"):
evict = response.server_evict
logger.info(
f"received server eviction notification for task {evict.durable_task_external_id} "
f"invocation {evict.invocation_count}: {evict.reason}"
)
self.cleanup_task_state(
evict.durable_task_external_id, evict.invocation_count
)
if self._on_server_evict is not None:
self._on_server_evict(
evict.durable_task_external_id, evict.invocation_count
)
elif response.HasField("error"):
error = response.error
exc: Exception
@@ -537,12 +566,13 @@ class DurableEventListener:
) -> DurableTaskEventLogEntryResult:
key = (durable_task_external_id, invocation_count, branch_id, node_id)
if key in self._early_completions:
return self._early_completions.pop(key)
if key in self._buffered_completions:
return self._buffered_completions.pop(key)
if key not in self._pending_callbacks:
future: asyncio.Future[DurableTaskEventLogEntryResult] = asyncio.Future()
self._pending_callbacks[key] = future
await self._poll_worker_status()
return await self._pending_callbacks[key]
@@ -572,11 +602,11 @@ class DurableEventListener:
stale_early_keys = [
ek
for ek in self._early_completions
for ek in self._buffered_completions
if ek[0] == durable_task_external_id and ek[1] <= invocation_count
]
for ek in stale_early_keys:
del self._early_completions[ek]
del self._buffered_completions[ek]
_EVICTION_ACK_TIMEOUT_S = 30.0
@@ -65,9 +65,6 @@ TMemo = TypeVar("TMemo", bound=ValidTaskReturnType)
if TYPE_CHECKING:
from hatchet_sdk.runnables.task import Task
from hatchet_sdk.runnables.workflow import (
BaseWorkflow,
)
def _compute_memo_key(task_run_external_id: str, *args: Any, **kwargs: Any) -> bytes:
@@ -626,8 +623,6 @@ class DurableContext(Context):
# TODO-DURABLE: instrumentor for this
async def _spawn_children_no_wait(
self,
## TODO-DURABLE: Remove this param?
workflow: BaseWorkflow[TWorkflowInput],
configs: list[WorkflowRunTriggerConfig],
) -> list[tuple[int, int, str]]:
listener = self._durable_listener
File diff suppressed because one or more lines are too long
@@ -101,14 +101,26 @@ class DurableTaskEvictionAckResponse(_message.Message):
def __init__(self, invocation_count: _Optional[int] = ..., durable_task_external_id: _Optional[str] = ...) -> None: ...
class DurableTaskAwaitedCompletedEntry(_message.Message):
__slots__ = ("durable_task_external_id", "branch_id", "node_id")
__slots__ = ("durable_task_external_id", "branch_id", "node_id", "invocation_count")
DURABLE_TASK_EXTERNAL_ID_FIELD_NUMBER: _ClassVar[int]
BRANCH_ID_FIELD_NUMBER: _ClassVar[int]
NODE_ID_FIELD_NUMBER: _ClassVar[int]
INVOCATION_COUNT_FIELD_NUMBER: _ClassVar[int]
durable_task_external_id: str
branch_id: int
node_id: int
def __init__(self, durable_task_external_id: _Optional[str] = ..., branch_id: _Optional[int] = ..., node_id: _Optional[int] = ...) -> None: ...
invocation_count: int
def __init__(self, durable_task_external_id: _Optional[str] = ..., branch_id: _Optional[int] = ..., node_id: _Optional[int] = ..., invocation_count: _Optional[int] = ...) -> None: ...
class DurableTaskServerEvictNotice(_message.Message):
__slots__ = ("durable_task_external_id", "invocation_count", "reason")
DURABLE_TASK_EXTERNAL_ID_FIELD_NUMBER: _ClassVar[int]
INVOCATION_COUNT_FIELD_NUMBER: _ClassVar[int]
REASON_FIELD_NUMBER: _ClassVar[int]
durable_task_external_id: str
invocation_count: int
reason: str
def __init__(self, durable_task_external_id: _Optional[str] = ..., invocation_count: _Optional[int] = ..., reason: _Optional[str] = ...) -> None: ...
class DurableTaskWorkerStatusRequest(_message.Message):
__slots__ = ("worker_id", "waiting_entries")
@@ -189,7 +201,7 @@ class DurableTaskErrorResponse(_message.Message):
def __init__(self, ref: _Optional[_Union[DurableEventLogEntryRef, _Mapping]] = ..., error_type: _Optional[_Union[DurableTaskErrorType, str]] = ..., error_message: _Optional[str] = ...) -> None: ...
class DurableTaskResponse(_message.Message):
__slots__ = ("register_worker", "memo_ack", "trigger_runs_ack", "wait_for_ack", "entry_completed", "error", "eviction_ack")
__slots__ = ("register_worker", "memo_ack", "trigger_runs_ack", "wait_for_ack", "entry_completed", "error", "eviction_ack", "server_evict")
REGISTER_WORKER_FIELD_NUMBER: _ClassVar[int]
MEMO_ACK_FIELD_NUMBER: _ClassVar[int]
TRIGGER_RUNS_ACK_FIELD_NUMBER: _ClassVar[int]
@@ -197,6 +209,7 @@ class DurableTaskResponse(_message.Message):
ENTRY_COMPLETED_FIELD_NUMBER: _ClassVar[int]
ERROR_FIELD_NUMBER: _ClassVar[int]
EVICTION_ACK_FIELD_NUMBER: _ClassVar[int]
SERVER_EVICT_FIELD_NUMBER: _ClassVar[int]
register_worker: DurableTaskResponseRegisterWorker
memo_ack: DurableTaskEventMemoAckResponse
trigger_runs_ack: DurableTaskEventTriggerRunsAckResponse
@@ -204,7 +217,8 @@ class DurableTaskResponse(_message.Message):
entry_completed: DurableTaskEventLogEntryCompletedResponse
error: DurableTaskErrorResponse
eviction_ack: DurableTaskEvictionAckResponse
def __init__(self, register_worker: _Optional[_Union[DurableTaskResponseRegisterWorker, _Mapping]] = ..., memo_ack: _Optional[_Union[DurableTaskEventMemoAckResponse, _Mapping]] = ..., trigger_runs_ack: _Optional[_Union[DurableTaskEventTriggerRunsAckResponse, _Mapping]] = ..., wait_for_ack: _Optional[_Union[DurableTaskEventWaitForAckResponse, _Mapping]] = ..., entry_completed: _Optional[_Union[DurableTaskEventLogEntryCompletedResponse, _Mapping]] = ..., error: _Optional[_Union[DurableTaskErrorResponse, _Mapping]] = ..., eviction_ack: _Optional[_Union[DurableTaskEvictionAckResponse, _Mapping]] = ...) -> None: ...
server_evict: DurableTaskServerEvictNotice
def __init__(self, register_worker: _Optional[_Union[DurableTaskResponseRegisterWorker, _Mapping]] = ..., memo_ack: _Optional[_Union[DurableTaskEventMemoAckResponse, _Mapping]] = ..., trigger_runs_ack: _Optional[_Union[DurableTaskEventTriggerRunsAckResponse, _Mapping]] = ..., wait_for_ack: _Optional[_Union[DurableTaskEventWaitForAckResponse, _Mapping]] = ..., entry_completed: _Optional[_Union[DurableTaskEventLogEntryCompletedResponse, _Mapping]] = ..., error: _Optional[_Union[DurableTaskErrorResponse, _Mapping]] = ..., eviction_ack: _Optional[_Union[DurableTaskEvictionAckResponse, _Mapping]] = ..., server_evict: _Optional[_Union[DurableTaskServerEvictNotice, _Mapping]] = ...) -> None: ...
class RegisterDurableEventRequest(_message.Message):
__slots__ = ("task_id", "signal_key", "conditions")
@@ -733,7 +733,7 @@ class Workflow(BaseWorkflow[TWorkflowInput]):
input=self._serialize_input(input, target="string"),
options=self._create_options_with_combined_additional_meta(options),
)
refs = await durable_ctx._spawn_children_no_wait(self, [config])
refs = await durable_ctx._spawn_children_no_wait([config])
if not refs:
raise RuntimeError(
"Failed to spawn durable child workflow: no run references returned"
@@ -817,7 +817,7 @@ class Workflow(BaseWorkflow[TWorkflowInput]):
"""
durable_ctx = ctx_durable_context.get()
if durable_ctx is not None and durable_ctx._supports_durable_eviction:
spawned_refs = await durable_ctx._spawn_children_no_wait(self, workflows)
spawned_refs = await durable_ctx._spawn_children_no_wait(workflows)
return await asyncio.gather(
*[
durable_ctx._aio_result_for_spawned_child(
@@ -20,6 +20,7 @@ class EvictionCause(str, Enum):
class DurableRunRecord(BaseModel):
key: ActionKey
step_run_id: str
invocation_count: int
eviction_policy: EvictionPolicy | None
registered_at: datetime
@@ -47,12 +48,14 @@ class DurableEvictionCache:
self,
key: ActionKey,
step_run_id: str,
invocation_count: int,
now: datetime,
eviction_policy: EvictionPolicy | None,
) -> None:
self._runs[key] = DurableRunRecord(
key=key,
step_run_id=step_run_id,
invocation_count=invocation_count,
eviction_policy=eviction_policy,
registered_at=now,
)
@@ -66,6 +69,12 @@ class DurableEvictionCache:
def get_all_waiting(self) -> list[DurableRunRecord]:
return [r for r in self._runs.values() if r.is_waiting]
def find_key_by_step_run_id(self, step_run_id: str) -> ActionKey | None:
for key, rec in self._runs.items():
if rec.step_run_id == step_run_id:
return key
return None
def mark_waiting(
self,
key: ActionKey,
@@ -73,11 +73,13 @@ class DurableEvictionManager:
key: ActionKey,
*,
step_run_id: str,
invocation_count: int,
eviction_policy: EvictionPolicy | None,
) -> None:
self._cache.register_run(
key,
step_run_id,
invocation_count=invocation_count,
now=self._now(),
eviction_policy=eviction_policy,
)
@@ -102,6 +104,10 @@ class DurableEvictionManager:
def mark_active(self, key: ActionKey) -> None:
self._cache.mark_active(key, now=self._now())
def _evict_run(self, key: ActionKey) -> None:
self._cancel_local(key)
self.unregister_run(key)
async def _run_loop(self) -> None:
interval = self._config.check_interval.total_seconds()
@@ -155,9 +161,25 @@ class DurableEvictionManager:
)
await self._request_eviction_with_ack(key, rec)
self._evict_run(key)
self._cancel_local(key)
self.unregister_run(key)
def handle_server_eviction(self, step_run_id: str, invocation_count: int) -> None:
"""Handle a server-initiated eviction notification for a stale invocation."""
key = self._cache.find_key_by_step_run_id(step_run_id)
if key is None:
return
rec = self._cache.get(key)
if rec is not None and rec.invocation_count != invocation_count:
return
logger.info(
"DurableEvictionManager: server-initiated eviction for "
"step_run_id=%s invocation_count=%d",
step_run_id,
invocation_count,
)
self._evict_run(key)
async def evict_all_waiting(self) -> int:
"""Evict every currently-waiting durable run. Used during graceful shutdown."""
@@ -189,8 +211,7 @@ class DurableEvictionManager:
)
continue
self._cancel_local(rec.key)
self.unregister_run(rec.key)
self._evict_run(rec.key)
evicted += 1
return evicted
@@ -143,7 +143,9 @@ class Runner:
)
if has_durable_tasks and self._supports_durable_eviction:
self.durable_event_listener = DurableEventListener(
self.config, admin_client=self.admin_client
self.config,
admin_client=self.admin_client,
on_server_evict=self._server_evict_callback,
)
elif has_durable_tasks:
self.durable_event_listener = PreEvictionDurableEventListener(self.config)
@@ -213,6 +215,15 @@ class Runner:
if key in self.tasks:
self.tasks[key].cancel()
def _server_evict_callback(
self, durable_task_external_id: str, invocation_count: int
) -> None:
"""Called from DurableEventListener when the server notifies a stale invocation."""
if self.durable_eviction_manager is not None:
self.durable_eviction_manager.handle_server_eviction(
durable_task_external_id, invocation_count
)
async def _eviction_request(self, key: ActionKey, rec: DurableRunRecord) -> None:
"""Called from DurableEvictionManager when it needs to request eviction from the server."""
if not isinstance(self.durable_event_listener, DurableEventListener):
@@ -442,8 +453,15 @@ class Runner:
del self.threads[key]
if key in self.contexts:
if self.contexts[key].exit_flag:
ctx = self.contexts[key]
if ctx.exit_flag:
self.cancellations[key] = True
if isinstance(
self.durable_event_listener, DurableEventListener
) and isinstance(ctx, DurableContext):
self.durable_event_listener.cleanup_task_state(
ctx.step_run_id, ctx.invocation_count
)
del self.contexts[key]
if self.durable_eviction_manager is not None:
@@ -531,6 +549,7 @@ class Runner:
self.durable_eviction_manager.register_run(
action.key,
step_run_id=action.step_run_id,
invocation_count=action.durable_task_invocation_count or 1,
eviction_policy=action_func.durable_eviction,
)
+27 -24
View File
@@ -1,4 +1,4 @@
# This file is automatically @generated by Poetry 2.0.0 and should not be changed by hand.
# This file is automatically @generated by Poetry 2.3.2 and should not be changed by hand.
[[package]]
name = "aiohappyeyeballs"
@@ -153,7 +153,7 @@ propcache = ">=0.2.0"
yarl = ">=1.17.0,<2.0"
[package.extras]
speedups = ["Brotli (>=1.2)", "aiodns (>=3.3.0)", "backports.zstd", "brotlicffi (>=1.2)"]
speedups = ["Brotli (>=1.2) ; platform_python_implementation == \"CPython\"", "aiodns (>=3.3.0)", "backports.zstd ; platform_python_implementation == \"CPython\" and python_version < \"3.14\"", "brotlicffi (>=1.2) ; platform_python_implementation != \"CPython\""]
[[package]]
name = "aiosignal"
@@ -201,7 +201,7 @@ idna = ">=2.8"
typing_extensions = {version = ">=4.5", markers = "python_version < \"3.13\""}
[package.extras]
trio = ["trio (>=0.31.0)", "trio (>=0.32.0)"]
trio = ["trio (>=0.31.0) ; python_version < \"3.10\"", "trio (>=0.32.0) ; python_version >= \"3.10\""]
[[package]]
name = "async-timeout"
@@ -210,7 +210,7 @@ description = "Timeout context manager for asyncio programs"
optional = false
python-versions = ">=3.8"
groups = ["main"]
markers = "python_version < \"3.11\""
markers = "python_version == \"3.10\""
files = [
{file = "async_timeout-5.0.1-py3-none-any.whl", hash = "sha256:39e3809566ff85354557ec2398b55e096c8364bacac9405a7a1fa429e77fe76c"},
{file = "async_timeout-5.0.1.tar.gz", hash = "sha256:d9321a7a3d5a6a5e187e824d2fa0793ce379a202935782d555d6e9d2735677d3"},
@@ -235,7 +235,7 @@ description = "Backport of asyncio.Runner, a context manager that controls event
optional = false
python-versions = "<3.11,>=3.8"
groups = ["test"]
markers = "python_version < \"3.11\""
markers = "python_version == \"3.10\""
files = [
{file = "backports_asyncio_runner-1.2.0-py3-none-any.whl", hash = "sha256:0da0a936a8aeb554eccb426dc55af3ba63bcdc69fa1a600b5bb305413a4477b5"},
{file = "backports_asyncio_runner-1.2.0.tar.gz", hash = "sha256:a5aa7b2b7d8f8bfcaa2b57313f70792df84e32a2a746f585213373f900b42162"},
@@ -343,6 +343,7 @@ files = [
{file = "certifi-2026.2.25-py3-none-any.whl", hash = "sha256:027692e4402ad994f1c42e52a4997a9763c646b73e4096e4d5d6db8af1d6f0fa"},
{file = "certifi-2026.2.25.tar.gz", hash = "sha256:e887ab5cee78ea814d3472169153c2d12cd43b14bd03329a39a9c6e2e80bfba7"},
]
markers = {main = "extra == \"otel\""}
[[package]]
name = "charset-normalizer"
@@ -466,6 +467,7 @@ files = [
{file = "charset_normalizer-3.4.4-py3-none-any.whl", hash = "sha256:7a32c560861a02ff789ad905a2fe94e3f840803362c84fecf1851cb4cf3dc37f"},
{file = "charset_normalizer-3.4.4.tar.gz", hash = "sha256:94537985111c35f28720e43603b8e7b43a6ecfb2ce1d3058bbe955b73404e21a"},
]
markers = {main = "extra == \"otel\""}
[[package]]
name = "click"
@@ -520,7 +522,7 @@ files = [
]
[package.extras]
dev = ["docstring-parser[docs]", "docstring-parser[test]", "pre-commit (>=2.16.0)"]
dev = ["docstring-parser[docs]", "docstring-parser[test]", "pre-commit (>=2.16.0) ; python_version >= \"3.9\""]
docs = ["pydoctor (>=25.4.0)"]
test = ["pytest"]
@@ -531,7 +533,7 @@ description = "Backport of PEP 654 (exception groups)"
optional = false
python-versions = ">=3.7"
groups = ["docs", "test"]
markers = "python_version < \"3.11\""
markers = "python_version == \"3.10\""
files = [
{file = "exceptiongroup-1.3.1-py3-none-any.whl", hash = "sha256:a7a39a3bd276781e98394987d3a5701d0c4edffb633bb7a5144577f82c773598"},
{file = "exceptiongroup-1.3.1.tar.gz", hash = "sha256:8b412432c6055b0b7d14c310000ae93352ed6754f70fa8f7c34141f91c4e3219"},
@@ -1009,7 +1011,7 @@ httpcore = "==1.*"
idna = "*"
[package.extras]
brotli = ["brotli", "brotlicffi"]
brotli = ["brotli ; platform_python_implementation == \"CPython\"", "brotlicffi ; platform_python_implementation != \"CPython\""]
cli = ["click (==8.*)", "pygments (==2.*)", "rich (>=10,<14)"]
http2 = ["h2 (>=3,<5)"]
socks = ["socksio (==1.*)"]
@@ -1047,13 +1049,13 @@ files = [
zipp = ">=3.20"
[package.extras]
check = ["pytest-checkdocs (>=2.4)", "pytest-ruff (>=0.2.1)"]
check = ["pytest-checkdocs (>=2.4)", "pytest-ruff (>=0.2.1) ; sys_platform != \"cygwin\""]
cover = ["pytest-cov"]
doc = ["furo", "jaraco.packaging (>=9.3)", "jaraco.tidelift (>=1.4)", "rst.linker (>=1.9)", "sphinx (>=3.5)", "sphinx-lint"]
enabler = ["pytest-enabler (>=3.4)"]
perf = ["ipython"]
test = ["flufl.flake8", "jaraco.test (>=5.4)", "packaging", "pyfakefs", "pytest (>=6,!=8.1.*)", "pytest-perf (>=0.9.2)"]
type = ["mypy (<1.19)", "pytest-mypy (>=1.0.1)"]
type = ["mypy (<1.19) ; platform_python_implementation == \"PyPy\"", "pytest-mypy (>=1.0.1)"]
[[package]]
name = "iniconfig"
@@ -1470,7 +1472,7 @@ watchdog = ">=2.0"
[package.extras]
i18n = ["babel (>=2.9.0)"]
min-versions = ["babel (==2.9.0)", "click (==7.0)", "colorama (==0.4)", "ghp-import (==1.0)", "importlib-metadata (==4.4)", "jinja2 (==2.11.1)", "markdown (==3.3.6)", "markupsafe (==2.0.1)", "mergedeep (==1.3.4)", "mkdocs-get-deps (==0.2.0)", "packaging (==20.5)", "pathspec (==0.11.1)", "pyyaml (==5.1)", "pyyaml-env-tag (==0.1)", "watchdog (==2.0)"]
min-versions = ["babel (==2.9.0)", "click (==7.0)", "colorama (==0.4) ; platform_system == \"Windows\"", "ghp-import (==1.0)", "importlib-metadata (==4.4) ; python_version < \"3.10\"", "jinja2 (==2.11.1)", "markdown (==3.3.6)", "markupsafe (==2.0.1)", "mergedeep (==1.3.4)", "mkdocs-get-deps (==0.2.0)", "packaging (==20.5)", "pathspec (==0.11.1)", "pyyaml (==5.1)", "pyyaml-env-tag (==0.1)", "watchdog (==2.0)"]
[[package]]
name = "mkdocs-autorefs"
@@ -2282,12 +2284,12 @@ typing-extensions = {version = ">=4.6", markers = "python_version < \"3.13\""}
tzdata = {version = "*", markers = "sys_platform == \"win32\""}
[package.extras]
binary = ["psycopg-binary (==3.3.3)"]
c = ["psycopg-c (==3.3.3)"]
binary = ["psycopg-binary (==3.3.3) ; implementation_name != \"pypy\""]
c = ["psycopg-c (==3.3.3) ; implementation_name != \"pypy\""]
dev = ["ast-comments (>=1.1.2)", "black (>=26.1.0)", "codespell (>=2.2)", "cython-lint (>=0.16)", "dnspython (>=2.1)", "flake8 (>=4.0)", "isort-psycopg", "isort[colors] (>=6.0)", "mypy (>=1.19.0)", "pre-commit (>=4.0.1)", "types-setuptools (>=57.4)", "types-shapely (>=2.0)", "wheel (>=0.37)"]
docs = ["Sphinx (>=5.0)", "furo (==2022.6.21)", "sphinx-autobuild (>=2021.3.14)", "sphinx-autodoc-typehints (>=1.12)"]
pool = ["psycopg-pool"]
test = ["anyio (>=4.0)", "mypy (>=1.19.0)", "pproxy (>=2.7)", "pytest (>=6.2.5)", "pytest-cov (>=3.0)", "pytest-randomly (>=3.5)"]
test = ["anyio (>=4.0)", "mypy (>=1.19.0) ; implementation_name != \"pypy\"", "pproxy (>=2.7)", "pytest (>=6.2.5)", "pytest-cov (>=3.0)", "pytest-randomly (>=3.5)"]
[[package]]
name = "psycopg-pool"
@@ -2327,7 +2329,7 @@ typing-inspection = ">=0.4.2"
[package.extras]
email = ["email-validator (>=2.0.0)"]
timezone = ["tzdata"]
timezone = ["tzdata ; python_version >= \"3.9\" and platform_system == \"Windows\""]
[[package]]
name = "pydantic-core"
@@ -2838,6 +2840,7 @@ files = [
{file = "requests-2.32.5-py3-none-any.whl", hash = "sha256:2462f94637a34fd532264295e186976db0f5d453d1cdd31473c85a6a161affb6"},
{file = "requests-2.32.5.tar.gz", hash = "sha256:dbba0bac56e100853db0ea71b82b4dfd5fe2bf6d3754a8893c3af500cec7d7cf"},
]
markers = {main = "extra == \"otel\""}
[package.dependencies]
certifi = ">=2017.4.17"
@@ -2890,13 +2893,13 @@ files = [
]
[package.extras]
check = ["pytest-checkdocs (>=2.4)", "pytest-ruff (>=0.2.1)", "ruff (>=0.13.0)"]
core = ["importlib_metadata (>=6)", "jaraco.functools (>=4)", "jaraco.text (>=3.7)", "more_itertools", "more_itertools (>=8.8)", "packaging (>=24.2)", "platformdirs (>=4.2.2)", "tomli (>=2.0.1)", "wheel (>=0.43.0)"]
check = ["pytest-checkdocs (>=2.4)", "pytest-ruff (>=0.2.1) ; sys_platform != \"cygwin\"", "ruff (>=0.13.0) ; sys_platform != \"cygwin\""]
core = ["importlib_metadata (>=6) ; python_version < \"3.10\"", "jaraco.functools (>=4)", "jaraco.text (>=3.7)", "more_itertools", "more_itertools (>=8.8)", "packaging (>=24.2)", "platformdirs (>=4.2.2)", "tomli (>=2.0.1) ; python_version < \"3.11\"", "wheel (>=0.43.0)"]
cover = ["pytest-cov"]
doc = ["furo", "jaraco.packaging (>=9.3)", "jaraco.tidelift (>=1.4)", "pygments-github-lexers (==0.0.5)", "pyproject-hooks (!=1.1)", "rst.linker (>=1.9)", "sphinx (>=3.5)", "sphinx-favicon", "sphinx-inline-tabs", "sphinx-lint", "sphinx-notfound-page (>=1,<2)", "sphinx-reredirects", "sphinxcontrib-towncrier", "towncrier (<24.7)"]
enabler = ["pytest-enabler (>=2.2)"]
test = ["build[virtualenv] (>=1.0.3)", "filelock (>=3.4.0)", "ini2toml[lite] (>=0.14)", "jaraco.develop (>=7.21)", "jaraco.envs (>=2.2)", "jaraco.path (>=3.7.2)", "jaraco.test (>=5.5)", "packaging (>=24.2)", "pip (>=19.1)", "pyproject-hooks (!=1.1)", "pytest (>=6,!=8.1.*)", "pytest-home (>=0.5)", "pytest-perf", "pytest-subprocess", "pytest-timeout", "pytest-xdist (>=3)", "tomli-w (>=1.0.0)", "virtualenv (>=13.0.0)", "wheel (>=0.44.0)"]
type = ["importlib_metadata (>=7.0.2)", "jaraco.develop (>=7.21)", "mypy (==1.18.*)", "pytest-mypy"]
test = ["build[virtualenv] (>=1.0.3)", "filelock (>=3.4.0)", "ini2toml[lite] (>=0.14)", "jaraco.develop (>=7.21) ; python_version >= \"3.9\" and sys_platform != \"cygwin\"", "jaraco.envs (>=2.2)", "jaraco.path (>=3.7.2)", "jaraco.test (>=5.5)", "packaging (>=24.2)", "pip (>=19.1)", "pyproject-hooks (!=1.1)", "pytest (>=6,!=8.1.*)", "pytest-home (>=0.5)", "pytest-perf ; sys_platform != \"cygwin\"", "pytest-subprocess", "pytest-timeout", "pytest-xdist (>=3)", "tomli-w (>=1.0.0)", "virtualenv (>=13.0.0)", "wheel (>=0.44.0)"]
type = ["importlib_metadata (>=7.0.2) ; python_version < \"3.10\"", "jaraco.develop (>=7.21) ; sys_platform != \"cygwin\"", "mypy (==1.18.*)", "pytest-mypy"]
[[package]]
name = "six"
@@ -2975,7 +2978,7 @@ description = "A lil' TOML parser"
optional = false
python-versions = ">=3.8"
groups = ["docs", "lint", "test"]
markers = "python_version < \"3.11\""
markers = "python_version == \"3.10\""
files = [
{file = "tomli-2.4.0-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:b5ef256a3fd497d4973c11bf142e9ed78b150d36f5773f1ca6088c230ffc5867"},
{file = "tomli-2.4.0-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:5572e41282d5268eb09a697c89a7bee84fae66511f87533a6f88bd2f7b652da9"},
@@ -3194,10 +3197,10 @@ files = [
]
[package.extras]
brotli = ["brotli (>=1.2.0)", "brotlicffi (>=1.2.0.0)"]
brotli = ["brotli (>=1.2.0) ; platform_python_implementation == \"CPython\"", "brotlicffi (>=1.2.0.0) ; platform_python_implementation != \"CPython\""]
h2 = ["h2 (>=4,<5)"]
socks = ["pysocks (>=1.5.6,!=1.5.7,<2.0)"]
zstd = ["backports-zstd (>=1.0.0)"]
zstd = ["backports-zstd (>=1.0.0) ; python_version < \"3.14\""]
[[package]]
name = "uvicorn"
@@ -3217,7 +3220,7 @@ h11 = ">=0.8"
typing-extensions = {version = ">=4.0", markers = "python_version < \"3.11\""}
[package.extras]
standard = ["colorama (>=0.4)", "httptools (>=0.6.3)", "python-dotenv (>=0.13)", "pyyaml (>=5.1)", "uvloop (>=0.15.1)", "watchfiles (>=0.13)", "websockets (>=10.4)"]
standard = ["colorama (>=0.4) ; sys_platform == \"win32\"", "httptools (>=0.6.3)", "python-dotenv (>=0.13)", "pyyaml (>=5.1)", "uvloop (>=0.15.1) ; sys_platform != \"win32\" and sys_platform != \"cygwin\" and platform_python_implementation != \"PyPy\"", "watchfiles (>=0.13)", "websockets (>=10.4)"]
[[package]]
name = "watchdog"
@@ -3513,7 +3516,7 @@ files = [
]
[package.extras]
check = ["pytest-checkdocs (>=2.4)", "pytest-ruff (>=0.2.1)"]
check = ["pytest-checkdocs (>=2.4)", "pytest-ruff (>=0.2.1) ; sys_platform != \"cygwin\""]
cover = ["pytest-cov"]
doc = ["furo", "jaraco.packaging (>=9.3)", "jaraco.tidelift (>=1.4)", "rst.linker (>=1.9)", "sphinx (>=3.5)", "sphinx-lint"]
enabler = ["pytest-enabler (>=2.2)"]
@@ -21,8 +21,12 @@ def test_ttl_eviction_prefers_oldest_waiting_and_priority() -> None:
eviction_low_prio = EvictionPolicy(ttl=timedelta(seconds=10), priority=0)
eviction_high_prio = EvictionPolicy(ttl=timedelta(seconds=10), priority=10)
cache.register_run(key1, "run-1", now=dt(0), eviction_policy=eviction_high_prio)
cache.register_run(key2, "run-2", now=dt(0), eviction_policy=eviction_low_prio)
cache.register_run(
key1, "run-1", invocation_count=1, now=dt(0), eviction_policy=eviction_high_prio
)
cache.register_run(
key2, "run-2", invocation_count=1, now=dt(0), eviction_policy=eviction_low_prio
)
cache.mark_waiting(
key1, now=dt(0), wait_kind="workflow_run_result", resource_id="wf1"
@@ -47,10 +51,13 @@ def test_none_eviction_params_never_selected() -> None:
key_no = "run-no/0"
key_yes = "run-yes/0"
cache.register_run(key_no, "run-no", now=dt(0), eviction_policy=None)
cache.register_run(
key_no, "run-no", invocation_count=1, now=dt(0), eviction_policy=None
)
cache.register_run(
key_yes,
"run-yes",
invocation_count=1,
now=dt(0),
eviction_policy=EvictionPolicy(ttl=timedelta(seconds=1)),
)
@@ -76,6 +83,7 @@ def test_capacity_eviction_respects_allow_capacity_and_min_wait() -> None:
cache.register_run(
key_blocked,
"run-blocked",
invocation_count=1,
now=dt(0),
eviction_policy=EvictionPolicy(
ttl=timedelta(hours=1), allow_capacity_eviction=False, priority=0
@@ -84,6 +92,7 @@ def test_capacity_eviction_respects_allow_capacity_and_min_wait() -> None:
cache.register_run(
key_ok,
"run-ok",
invocation_count=1,
now=dt(0),
eviction_policy=EvictionPolicy(
ttl=timedelta(hours=1), allow_capacity_eviction=True, priority=0
@@ -124,7 +133,9 @@ def test_concurrent_waits_keep_waiting_until_all_resolved() -> None:
key = "run-bulk/0"
policy = EvictionPolicy(ttl=timedelta(seconds=5), priority=0)
cache.register_run(key, "run-bulk", now=dt(0), eviction_policy=policy)
cache.register_run(
key, "run-bulk", invocation_count=1, now=dt(0), eviction_policy=policy
)
cache.mark_waiting(key, now=dt(1), wait_kind="spawn_child", resource_id="child0")
cache.mark_waiting(key, now=dt(1), wait_kind="spawn_child", resource_id="child1")
@@ -162,13 +173,48 @@ def test_concurrent_waits_keep_waiting_until_all_resolved() -> None:
assert rec.waiting_since is None
def test_find_key_by_step_run_id_returns_matching_key() -> None:
cache = DurableEvictionCache()
cache.register_run(
"run-a/0", "ext-a", invocation_count=1, now=dt(0), eviction_policy=None
)
cache.register_run(
"run-b/0", "ext-b", invocation_count=1, now=dt(0), eviction_policy=None
)
assert cache.find_key_by_step_run_id("ext-a") == "run-a/0"
assert cache.find_key_by_step_run_id("ext-b") == "run-b/0"
def test_find_key_by_step_run_id_returns_none_for_unknown() -> None:
cache = DurableEvictionCache()
cache.register_run(
"run-a/0", "ext-a", invocation_count=1, now=dt(0), eviction_policy=None
)
assert cache.find_key_by_step_run_id("no-such-id") is None
def test_find_key_by_step_run_id_returns_none_after_unregister() -> None:
cache = DurableEvictionCache()
cache.register_run(
"run-a/0", "ext-a", invocation_count=1, now=dt(0), eviction_policy=None
)
assert cache.find_key_by_step_run_id("ext-a") == "run-a/0"
cache.unregister_run("run-a/0")
assert cache.find_key_by_step_run_id("ext-a") is None
def test_mark_active_floors_at_zero() -> None:
"""Extra mark_active calls (defensive) should not go negative."""
cache = DurableEvictionCache()
key = "run-extra/0"
policy = EvictionPolicy(ttl=timedelta(seconds=5), priority=0)
cache.register_run(key, "run-extra", now=dt(0), eviction_policy=policy)
cache.register_run(
key, "run-extra", invocation_count=1, now=dt(0), eviction_policy=policy
)
cache.mark_waiting(key, now=dt(0), wait_kind="sleep", resource_id="s")
cache.mark_active(key, now=dt(1))
@@ -0,0 +1,115 @@
from __future__ import annotations
from datetime import timedelta
from unittest.mock import AsyncMock, MagicMock
from hatchet_sdk.runnables.eviction import EvictionPolicy
from hatchet_sdk.worker.durable_eviction.manager import (
DurableEvictionConfig,
DurableEvictionManager,
)
def _make_manager(
cancel_local: MagicMock | None = None,
) -> tuple[DurableEvictionManager, MagicMock]:
cancel = cancel_local or MagicMock()
request_eviction = AsyncMock()
mgr = DurableEvictionManager(
durable_slots=10,
cancel_local=cancel,
request_eviction_with_ack=request_eviction,
config=DurableEvictionConfig(check_interval=timedelta(hours=1)),
)
return mgr, cancel
def test_handle_server_eviction_cancels_and_unregisters() -> None:
mgr, cancel = _make_manager()
key = "run-1/0"
mgr.register_run(
key,
step_run_id="ext-1",
invocation_count=2,
eviction_policy=EvictionPolicy(ttl=timedelta(seconds=30)),
)
mgr.mark_waiting(key, wait_kind="sleep", resource_id="s1")
mgr.handle_server_eviction("ext-1", 2)
cancel.assert_called_once_with(key)
assert mgr.cache.get(key) is None
def test_handle_server_eviction_unknown_id_is_noop() -> None:
mgr, cancel = _make_manager()
mgr.register_run(
"run-1/0", step_run_id="ext-1", invocation_count=1, eviction_policy=None
)
mgr.handle_server_eviction("no-such-id", 1)
cancel.assert_not_called()
assert mgr.cache.get("run-1/0") is not None
def test_handle_server_eviction_only_evicts_matching_run() -> None:
mgr, cancel = _make_manager()
mgr.register_run(
"run-1/0",
step_run_id="ext-1",
invocation_count=1,
eviction_policy=EvictionPolicy(ttl=timedelta(seconds=30)),
)
mgr.register_run(
"run-2/0",
step_run_id="ext-2",
invocation_count=1,
eviction_policy=EvictionPolicy(ttl=timedelta(seconds=30)),
)
mgr.mark_waiting("run-1/0", wait_kind="sleep", resource_id="s1")
mgr.mark_waiting("run-2/0", wait_kind="sleep", resource_id="s2")
mgr.handle_server_eviction("ext-1", 1)
cancel.assert_called_once_with("run-1/0")
assert mgr.cache.get("run-1/0") is None
assert mgr.cache.get("run-2/0") is not None
def test_handle_server_eviction_skips_newer_invocation() -> None:
mgr, cancel = _make_manager()
mgr.register_run(
"run-1/0",
step_run_id="ext-1",
invocation_count=3,
eviction_policy=EvictionPolicy(ttl=timedelta(seconds=30)),
)
mgr.mark_waiting("run-1/0", wait_kind="sleep", resource_id="s1")
mgr.handle_server_eviction("ext-1", 2)
cancel.assert_not_called()
assert mgr.cache.get("run-1/0") is not None
def test_handle_server_eviction_evicts_exact_invocation_match() -> None:
mgr, cancel = _make_manager()
mgr.register_run(
"run-1/0",
step_run_id="ext-1",
invocation_count=5,
eviction_policy=EvictionPolicy(ttl=timedelta(seconds=30)),
)
mgr.mark_waiting("run-1/0", wait_kind="sleep", resource_id="s1")
mgr.handle_server_eviction("ext-1", 5)
cancel.assert_called_once_with("run-1/0")
assert mgr.cache.get("run-1/0") is None
View File
+13
View File
@@ -0,0 +1,13 @@
"""Override the session-scoped autouse worker fixture from the root conftest
so that pure unit tests can run without a live Hatchet server."""
from __future__ import annotations
from collections.abc import Iterator
import pytest
@pytest.fixture(scope="session", autouse=True)
def worker() -> Iterator[None]:
yield None
@@ -0,0 +1,398 @@
"""Tests for DurableEventListener reconnection logic."""
from __future__ import annotations
import asyncio
from collections.abc import AsyncIterator
from typing import Any
from unittest.mock import AsyncMock, MagicMock, patch
import grpc
import grpc.aio
import pytest
from hatchet_sdk.clients.listeners.durable_event_listener import (
DEFAULT_RECONNECT_INTERVAL,
DurableEventListener,
)
_MODULE = "hatchet_sdk.clients.listeners.durable_event_listener"
class ControllableStream:
"""Async iterator whose lifetime can be controlled from tests."""
def __init__(self) -> None:
self._queue: asyncio.Queue[tuple[str, object]] = asyncio.Queue()
def push(self, response: object) -> None:
self._queue.put_nowait(("response", response))
def end(self) -> None:
self._queue.put_nowait(("end", None))
def fail(self, error: BaseException) -> None:
self._queue.put_nowait(("error", error))
def __aiter__(self) -> ControllableStream:
return self
async def __anext__(self) -> object:
kind, value = await self._queue.get()
if kind == "end":
raise StopAsyncIteration
if kind == "error":
raise value # type: ignore[misc]
return value
def _make_grpc_error(code: grpc.StatusCode, details: str = "") -> grpc.aio.AioRpcError:
empty: grpc.aio.Metadata = grpc.aio.Metadata()
return grpc.aio.AioRpcError(code, empty, empty, details)
class _Harness:
"""Sets up a DurableEventListener with fully mocked gRPC dependencies."""
def __init__(self) -> None:
config = MagicMock()
config.token = "test-token"
admin_client = MagicMock()
self.listener = DurableEventListener(config, admin_client)
self.streams: list[ControllableStream] = []
self.call_count = 0
self._mock_conn = MagicMock()
self._mock_conn.close = AsyncMock()
self._patches: list[Any] = [
patch(f"{_MODULE}.new_conn", return_value=self._mock_conn),
patch(f"{_MODULE}.V1DispatcherStub", side_effect=self._make_stub),
patch(f"{_MODULE}.get_metadata", return_value=[]),
patch(f"{_MODULE}.DEFAULT_RECONNECT_INTERVAL", 0.01),
]
for p in self._patches:
p.start()
def _make_stub(self, _channel: object) -> MagicMock:
stub = MagicMock()
stub.DurableTask.side_effect = self._next_stream
return stub
def _next_stream(self, *_a: object, **_kw: object) -> ControllableStream:
idx = min(self.call_count, len(self.streams) - 1)
self.call_count += 1
return self.streams[idx]
def add_eof_stream(self) -> ControllableStream:
s = ControllableStream()
s.end()
self.streams.append(s)
return s
def add_hanging_stream(self) -> ControllableStream:
s = ControllableStream()
self.streams.append(s)
return s
def add_error_stream(self, error: BaseException) -> ControllableStream:
s = ControllableStream()
s.fail(error)
self.streams.append(s)
return s
async def start(self, worker_id: str = "w1") -> None:
await self.listener.start(worker_id)
async def teardown(self) -> None:
try:
await self.listener.stop()
except Exception:
pass
for s in self.streams:
try:
s.end()
except Exception:
pass
for p in self._patches:
p.stop()
@pytest.fixture
async def harness() -> AsyncIterator[_Harness]:
h = _Harness()
yield h
await h.teardown()
# ── reconnection on stream EOF ──
async def test_opens_new_stream_after_eof(harness: _Harness) -> None:
harness.add_eof_stream()
harness.add_hanging_stream()
await harness.start()
await asyncio.sleep(0.15)
assert harness.call_count >= 2
async def test_multiple_eof_reconnects(harness: _Harness) -> None:
for _ in range(3):
harness.add_eof_stream()
harness.add_hanging_stream()
await harness.start()
await asyncio.sleep(0.3)
assert harness.call_count >= 4
# ── reconnection on gRPC error ──
async def test_reconnects_on_unavailable(harness: _Harness) -> None:
err = _make_grpc_error(grpc.StatusCode.UNAVAILABLE, "server unavailable")
harness.add_error_stream(err)
harness.add_hanging_stream()
await harness.start()
await asyncio.sleep(0.15)
assert harness.call_count >= 2
async def test_reconnects_on_internal_error(harness: _Harness) -> None:
err = _make_grpc_error(grpc.StatusCode.INTERNAL, "internal")
harness.add_error_stream(err)
harness.add_hanging_stream()
await harness.start()
await asyncio.sleep(0.15)
assert harness.call_count >= 2
async def test_reconnects_on_generic_exception(harness: _Harness) -> None:
s = ControllableStream()
harness.streams.append(s)
harness.add_hanging_stream()
await harness.start()
s.fail(RuntimeError("unexpected"))
await asyncio.sleep(0.15)
assert harness.call_count >= 2
# ── does NOT reconnect on CANCELLED ──
async def test_breaks_out_on_grpc_cancelled(harness: _Harness) -> None:
err = _make_grpc_error(grpc.StatusCode.CANCELLED, "cancelled")
harness.add_error_stream(err)
harness.add_hanging_stream()
await harness.start()
await asyncio.sleep(0.15)
assert harness.call_count == 1
# ── does NOT reconnect after stop ──
async def test_no_reconnect_after_stop(harness: _Harness) -> None:
harness.add_hanging_stream()
await harness.start()
await harness.listener.stop()
await asyncio.sleep(0.15)
assert harness.call_count == 1
# ── _fail_pending_acks correctness ──
async def test_fail_pending_acks_clears_event_acks(harness: _Harness) -> None:
harness.add_hanging_stream()
await harness.start()
future: asyncio.Future[object] = asyncio.get_event_loop().create_future()
harness.listener._pending_event_acks[("task1", 1)] = future # type: ignore[assignment]
harness.listener._fail_pending_acks(ConnectionResetError("disconnected"))
assert len(harness.listener._pending_event_acks) == 0
with pytest.raises(ConnectionResetError, match="disconnected"):
future.result()
async def test_pending_callbacks_survive_disconnect(
harness: _Harness,
) -> None:
"""Pending callbacks should survive a disconnect.
Callbacks represent server-side durable event log entries that persist
across connections. After reconnection, _poll_worker_status re-reports
them and GetSatisfiedDurableEvents delivers completions on the new stream.
"""
harness.add_eof_stream()
harness.add_hanging_stream()
await harness.start()
future: asyncio.Future[object] = asyncio.get_event_loop().create_future()
harness.listener._pending_callbacks[("task1", 1, 0, 1)] = future # type: ignore[assignment]
await asyncio.sleep(0.15)
assert not future.done(), (
"_pending_callbacks were failed on disconnect — "
"callbacks should survive reconnection so the polling path can deliver them"
)
assert ("task1", 1, 0, 1) in harness.listener._pending_callbacks
async def test_fail_pending_acks_clears_eviction_acks_on_disconnect(
harness: _Harness,
) -> None:
"""Pending eviction acks should be failed on disconnect.
If _fail_pending_acks does not clear _pending_eviction_acks, eviction
acknowledgments will hang indefinitely after a reconnection.
"""
harness.add_eof_stream()
harness.add_hanging_stream()
await harness.start()
future: asyncio.Future[None] = asyncio.get_event_loop().create_future()
harness.listener._pending_eviction_acks[("task1", 1)] = future
await asyncio.sleep(0.15)
assert future.done(), (
"_pending_eviction_acks were not failed on disconnect — "
"eviction acks will hang forever after reconnection"
)
# ── pending event acks rejected on EOF (integration) ──
async def test_event_acks_rejected_when_stream_ends(
harness: _Harness,
) -> None:
stream1 = ControllableStream()
harness.streams.append(stream1)
harness.add_hanging_stream()
await harness.start()
await asyncio.sleep(0.05)
future: asyncio.Future[object] = asyncio.get_event_loop().create_future()
harness.listener._pending_event_acks[("task1", 1)] = future # type: ignore[assignment]
stream1.end()
await asyncio.sleep(0.15)
assert future.done()
with pytest.raises(ConnectionResetError):
future.result()
async def test_event_acks_rejected_when_stream_errors(
harness: _Harness,
) -> None:
stream1 = ControllableStream()
harness.streams.append(stream1)
harness.add_hanging_stream()
await harness.start()
await asyncio.sleep(0.05)
future: asyncio.Future[object] = asyncio.get_event_loop().create_future()
harness.listener._pending_event_acks[("task1", 1)] = future # type: ignore[assignment]
stream1.fail(_make_grpc_error(grpc.StatusCode.UNAVAILABLE, "gone"))
await asyncio.sleep(0.15)
assert future.done()
with pytest.raises(ConnectionResetError):
future.result()
# ── worker re-registration ──
async def test_request_queue_exists_after_each_connect(
harness: _Harness,
) -> None:
harness.add_eof_stream()
harness.add_hanging_stream()
await harness.start()
await asyncio.sleep(0.15)
assert harness.call_count >= 2
assert harness.listener._request_queue is not None
# ── connect failure during reconnect ──
async def test_survives_connect_failure_and_keeps_running(
harness: _Harness,
) -> None:
"""When _connect() fails during reconnection, the receive loop should
not crash. It should continue running and try reconnecting again."""
stream1 = ControllableStream()
harness.streams.append(stream1)
harness.add_hanging_stream()
await harness.start()
await asyncio.sleep(0.05)
import hatchet_sdk.clients.listeners.durable_event_listener as mod
original_new_conn = getattr(mod, "new_conn")
setattr(mod, "new_conn", MagicMock(side_effect=ConnectionError("network down")))
stream1.end()
await asyncio.sleep(0.3)
setattr(mod, "new_conn", original_new_conn)
assert harness.listener._running is True
# ── listener state after reconnect ──
async def test_still_running_after_reconnect(harness: _Harness) -> None:
harness.add_eof_stream()
harness.add_hanging_stream()
await harness.start()
await asyncio.sleep(0.15)
assert harness.listener._running is True
async def test_has_new_stream_after_reconnect(harness: _Harness) -> None:
s1 = ControllableStream()
harness.streams.append(s1)
harness.add_hanging_stream()
await harness.start()
await asyncio.sleep(0.05)
old_stream = harness.listener._stream
s1.end()
await asyncio.sleep(0.15)
assert harness.listener._stream is not old_stream
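Taken together, the Python suite above and the TypeScript suite below pin the same receive-loop contract: reconnect after a fixed delay on EOF or retryable gRPC errors, re-register the worker on every new stream, reject connection-scoped acks while preserving pending callbacks, and stop without reconnecting on CANCELLED or an explicit stop. A minimal TypeScript sketch of that loop; names follow the DurableListenerClient hunks further down where they appear, while _stream, _connect, and isGrpcCancelled are assumptions rather than the SDK's exact code:

private async _streamLoop(): Promise<void> {
  while (this._running) {
    let failure = new Error('durable stream disconnected'); // EOF case
    try {
      for await (const response of this._stream!) {
        this._handleResponse(response);
      }
    } catch (err) {
      if (isGrpcCancelled(err)) return; // assumed helper, mirrors the Python CANCELLED test
      failure = new Error('durable stream error');
    }
    if (!this._running) return;
    // Acks are connection-scoped; _pendingCallbacks and _bufferedCompletions
    // survive so the post-reconnect polling path can still resolve them.
    this._failPendingAcks(failure);
    await sleep(DEFAULT_RECONNECT_INTERVAL); // 3000 ms, per the sleep assertions
    await this._connect(); // opens a new stream and re-sends registerWorker
  }
}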
@@ -1,8 +1,4 @@
import {
DispatcherClient as PbDispatcherClient,
AssignedAction,
ActionType,
} from '@hatchet/protoc/dispatcher';
import { DispatcherClient as PbDispatcherClient, AssignedAction } from '@hatchet/protoc/dispatcher';
import { Status } from 'nice-grpc';
import { getGrpcErrorCode } from '@util/grpc-error';
@@ -23,28 +19,20 @@ enum ListenStrategy {
LISTEN_STRATEGY_V2 = 2,
}
export type Action = AssignedAction & {
/** @deprecated use taskRunId */
stepRunId?: string;
/** @deprecated use taskId */
stepId?: string;
};
export type ActionKey = `${string}/${number}`;
export type ActionKey = string;
export type Action = AssignedAction & { readonly key: ActionKey };
export function createActionKey(action: Action): ActionKey {
switch (action.actionType) {
case ActionType.START_GET_GROUP_KEY:
return `${action.getGroupKeyRunId}/${action.retryCount}`;
case ActionType.CANCEL_STEP_RUN:
case ActionType.START_STEP_RUN:
case ActionType.UNRECOGNIZED:
return `${action.taskRunExternalId}/${action.retryCount}`;
default:
// eslint-disable-next-line no-case-declarations
const exhaustivenessCheck: never = action.actionType;
throw new Error(`Unhandled action type: ${exhaustivenessCheck}`);
  }
}
export function createAction(assignedAction: AssignedAction): Action {
const action = assignedAction as Action;
Object.defineProperty(action, 'key', {
get(): ActionKey {
return `${this.taskRunExternalId}/${this.retryCount}`;
},
enumerable: true,
configurable: true,
});
return action;
}
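Because key is exposed as a lazy, enumerable property, call sites can index per-attempt state directly instead of rebuilding the template string; a small illustrative sketch (assignedAction is assumed to be in scope):

const action = createAction(assignedAction);
// Retries of the same task yield distinct keys: 'task-abc/0', 'task-abc/1', ...
const futures = new Map<ActionKey, Promise<unknown>>();
futures.set(action.key, Promise.resolve());
futures.delete(action.key); // cleanup is symmetric, with no string rebuilding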
export class ActionListener {
@@ -87,13 +75,7 @@ export class ActionListener {
const listenClient = await client.getListenClient();
for await (const assignedAction of listenClient) {
const action: Action = {
...assignedAction,
stepRunId: assignedAction.taskRunExternalId,
stepId: assignedAction.taskId,
};
yield action;
yield createAction(assignedAction);
}
} catch (e: unknown) {
// If the stream was aborted (e.g., during worker shutdown), exit gracefully
@@ -0,0 +1,424 @@
/* eslint-disable require-yield */
import sleep from '@hatchet/util/sleep';
import { DurableListenerClient } from './durable-listener-client';
jest.mock('@hatchet/util/sleep', () => ({
__esModule: true,
default: jest.fn(() => Promise.resolve()),
}));
const mockedSleep = jest.mocked(sleep);
function noopLogger() {
return { info: jest.fn(), warn: jest.fn(), error: jest.fn(), debug: jest.fn() };
}
function mockConfig(): any {
return { logger: () => noopLogger(), log_level: 'OFF' };
}
async function settle(ms = 50): Promise<void> {
await new Promise<void>((r) => {
setTimeout(r, ms);
});
}
function emptyStream(): AsyncIterable<any> {
return (async function* empty() {})();
}
function errorStream(err: Error): AsyncIterable<any> {
return (async function* throwErr() {
throw err;
})();
}
function hangingStream(): { stream: AsyncIterable<any>; end: () => void } {
let resolver!: () => void;
const gate = new Promise<void>((r) => {
resolver = r;
});
const stream = (async function* hang() {
await gate;
})();
return { stream, end: () => resolver() };
}
function controllableStream() {
const buffer: any[] = [];
let waiter: ((v: { response?: any; done?: boolean; error?: Error }) => void) | null = null;
let ended = false;
return {
push(response: any) {
if (waiter) {
const w = waiter;
waiter = null;
w({ response });
} else {
buffer.push(response);
}
},
end() {
ended = true;
if (waiter) {
const w = waiter;
waiter = null;
w({ done: true });
}
},
error(err: Error) {
if (waiter) {
const w = waiter;
waiter = null;
w({ error: err });
}
},
stream: {
async *[Symbol.asyncIterator]() {
while (true) {
if (buffer.length > 0) {
yield buffer.shift()!;
continue;
}
if (ended) return;
const result = await new Promise<{ response?: any; done?: boolean; error?: Error }>(
(r) => {
waiter = r;
}
);
if (result.error) throw result.error;
if (result.done) return;
if (result.response !== undefined) yield result.response;
}
},
},
};
}
function makeDeferred<T = any>() {
let resolve!: (v: T) => void;
let reject!: (r: any) => void;
const promise = new Promise<T>((res, rej) => {
resolve = res;
reject = rej;
});
return { promise, resolve, reject };
}
describe('DurableListenerClient reconnection', () => {
let grpcClient: any;
let listener: DurableListenerClient;
const openStreams: { end: () => void }[] = [];
function tracked(s: ReturnType<typeof hangingStream>) {
openStreams.push(s);
return s;
}
beforeEach(() => {
jest.clearAllMocks();
grpcClient = { durableTask: jest.fn() };
const factory = { create: jest.fn(() => grpcClient) };
listener = new DurableListenerClient(mockConfig(), {} as any, factory as any);
});
afterEach(async () => {
await listener.stop();
for (const s of openStreams) s.end();
openStreams.length = 0;
await settle(10);
});
// ── reconnection on stream EOF ──
describe('reconnects on stream EOF', () => {
it('opens a new stream after the first stream ends', async () => {
const h = tracked(hangingStream());
let call = 0;
grpcClient.durableTask.mockImplementation(() => (++call === 1 ? emptyStream() : h.stream));
await listener.start('w1');
await settle();
expect(grpcClient.durableTask).toHaveBeenCalledTimes(2);
});
it('sleeps DEFAULT_RECONNECT_INTERVAL before reconnecting', async () => {
const h = tracked(hangingStream());
let call = 0;
grpcClient.durableTask.mockImplementation(() => (++call === 1 ? emptyStream() : h.stream));
await listener.start('w1');
await settle();
expect(mockedSleep).toHaveBeenCalledWith(3000);
});
});
// ── reconnection on stream error ──
describe('reconnects on stream error', () => {
it('opens a new stream after a non-abort error', async () => {
const h = tracked(hangingStream());
let call = 0;
grpcClient.durableTask.mockImplementation(() =>
++call === 1 ? errorStream(new Error('network reset')) : h.stream
);
await listener.start('w1');
await settle();
expect(grpcClient.durableTask).toHaveBeenCalledTimes(2);
});
it('sleeps before reconnecting on error', async () => {
const h = tracked(hangingStream());
let call = 0;
grpcClient.durableTask.mockImplementation(() =>
++call === 1 ? errorStream(new Error('fail')) : h.stream
);
await listener.start('w1');
await settle();
expect(mockedSleep).toHaveBeenCalledWith(3000);
});
});
// ── no reconnect when stopped ──
describe('does not reconnect when stopped', () => {
it('does not open a new stream after stop()', async () => {
const h = tracked(hangingStream());
grpcClient.durableTask.mockReturnValue(h.stream);
await listener.start('w1');
await listener.stop();
await settle();
expect(grpcClient.durableTask).toHaveBeenCalledTimes(1);
});
});
// ── multiple sequential reconnects ──
describe('multiple sequential reconnects', () => {
it('recovers through several consecutive EOFs', async () => {
const h = tracked(hangingStream());
let call = 0;
grpcClient.durableTask.mockImplementation(() => (++call <= 3 ? emptyStream() : h.stream));
await listener.start('w1');
await settle(150);
expect(grpcClient.durableTask.mock.calls.length).toBeGreaterThanOrEqual(4);
});
it('recovers through several consecutive errors', async () => {
const h = tracked(hangingStream());
let call = 0;
grpcClient.durableTask.mockImplementation(() =>
++call <= 3 ? errorStream(new Error(`err-${call}`)) : h.stream
);
await listener.start('w1');
await settle(150);
expect(grpcClient.durableTask.mock.calls.length).toBeGreaterThanOrEqual(4);
});
});
// ── worker re-registration ──
describe('worker re-registration on reconnect', () => {
it('enqueues a registerWorker request for each connection', async () => {
const registrations: any[] = [];
const h = tracked(hangingStream());
let call = 0;
grpcClient.durableTask.mockImplementation((reqIter: AsyncIterable<any>) => {
call++;
(async () => {
const iter = reqIter[Symbol.asyncIterator]();
const first = await iter.next();
if (!first.done) registrations.push(first.value);
})();
return call === 1 ? emptyStream() : h.stream;
});
await listener.start('w1');
await settle(100);
expect(registrations.length).toBeGreaterThanOrEqual(2);
for (const reg of registrations) {
expect(reg).toHaveProperty('registerWorker');
expect(reg.registerWorker.workerId).toBe('w1');
}
});
});
// ── _failPendingAcks correctness ──
describe('_failPendingAcks', () => {
beforeEach(async () => {
const h = tracked(hangingStream());
grpcClient.durableTask.mockReturnValue(h.stream);
await listener.start('w1');
});
it('rejects all pending event acks and clears the map', () => {
const l = listener as any;
const d = makeDeferred();
l._pendingEventAcks.set('task:1', d);
l._failPendingAcks(new Error('disconnected'));
expect(l._pendingEventAcks.size).toBe(0);
return expect(d.promise).rejects.toThrow('disconnected');
});
it('preserves pending callbacks (server-side state survives reconnection)', () => {
const l = listener as any;
const d = makeDeferred();
l._pendingCallbacks.set('task:1:0:1', d);
l._failPendingAcks(new Error('disconnected'));
expect(l._pendingCallbacks.size).toBe(1);
expect(l._pendingCallbacks.get('task:1:0:1')).toBe(d);
});
it('rejects all pending eviction acks and clears the map', () => {
const l = listener as any;
const d = makeDeferred();
l._pendingEvictionAcks.set('task:1', d);
l._failPendingAcks(new Error('disconnected'));
expect(l._pendingEvictionAcks.size).toBe(0);
return expect(d.promise).rejects.toThrow('disconnected');
});
it('preserves buffered completions (server-side state survives reconnection)', () => {
const l = listener as any;
const completion = {
durableTaskExternalId: 'task',
nodeId: 1,
payload: {},
};
l._bufferedCompletions.set('task:1:0:1', completion);
l._failPendingAcks(new Error('disconnected'));
expect(l._bufferedCompletions.size).toBe(1);
expect(l._bufferedCompletions.get('task:1:0:1')).toBe(completion);
});
});
// ── pending state rejected on stream disconnect ──
describe('pending state is rejected on stream disconnect', () => {
it('rejects pending event acks when stream ends (EOF)', async () => {
const ctrl = controllableStream();
const h = tracked(hangingStream());
let call = 0;
grpcClient.durableTask.mockImplementation(() => (++call === 1 ? ctrl.stream : h.stream));
await listener.start('w1');
await settle();
const d = makeDeferred();
(listener as any)._pendingEventAcks.set('task:1', d);
const assertion = expect(d.promise).rejects.toThrow('durable stream disconnected');
ctrl.end();
await settle();
await assertion;
});
it('preserves pending callbacks when stream ends (EOF)', async () => {
const ctrl = controllableStream();
const h = tracked(hangingStream());
let call = 0;
grpcClient.durableTask.mockImplementation(() => (++call === 1 ? ctrl.stream : h.stream));
await listener.start('w1');
await settle();
const d = makeDeferred();
(listener as any)._pendingCallbacks.set('task:1:0:1', d);
ctrl.end();
await settle();
expect((listener as any)._pendingCallbacks.size).toBe(1);
expect((listener as any)._pendingCallbacks.get('task:1:0:1')).toBe(d);
});
it('rejects pending eviction acks when stream ends (EOF)', async () => {
const ctrl = controllableStream();
const h = tracked(hangingStream());
let call = 0;
grpcClient.durableTask.mockImplementation(() => (++call === 1 ? ctrl.stream : h.stream));
await listener.start('w1');
await settle();
const d = makeDeferred();
(listener as any)._pendingEvictionAcks.set('task:1', d);
const assertion = expect(d.promise).rejects.toThrow('durable stream disconnected');
ctrl.end();
await settle();
await assertion;
});
it('rejects pending event acks when stream errors', async () => {
const ctrl = controllableStream();
const h = tracked(hangingStream());
let call = 0;
grpcClient.durableTask.mockImplementation(() => (++call === 1 ? ctrl.stream : h.stream));
await listener.start('w1');
await settle();
const d = makeDeferred();
(listener as any)._pendingEventAcks.set('task:1', d);
const assertion = expect(d.promise).rejects.toThrow('durable stream error');
ctrl.error(new Error('transport failure'));
await settle();
await assertion;
});
});
// ── listener remains operational after reconnect ──
describe('listener state after reconnect', () => {
it('is still running after reconnect', async () => {
const h = tracked(hangingStream());
let call = 0;
grpcClient.durableTask.mockImplementation(() => (++call === 1 ? emptyStream() : h.stream));
await listener.start('w1');
await settle();
expect((listener as any)._running).toBe(true);
});
it('creates a fresh AbortController for the new stream', async () => {
const h = tracked(hangingStream());
let call = 0;
grpcClient.durableTask.mockImplementation(() => (++call === 1 ? emptyStream() : h.stream));
await listener.start('w1');
await settle();
const abort = (listener as any)._receiveAbort as AbortController;
expect(abort).toBeDefined();
expect(abort.signal.aborted).toBe(false);
});
});
});
@@ -167,13 +167,19 @@ export class DurableListenerClient {
PendingCallbackKey,
Deferred<DurableTaskEventLogEntryResult>
>();
private _earlyCompletions = new Map<PendingCallbackKey, DurableTaskEventLogEntryResult>();
// Completions that arrived before waitForCallback() registered a deferred
// in _pendingCallbacks. This happens when the server delivers an
// entryCompleted between the event ack and the waitForCallback call
// (e.g. an already-satisfied sleep delivered via polling).
private _bufferedCompletions = new Map<PendingCallbackKey, DurableTaskEventLogEntryResult>();
private _pendingEvictionAcks = new Map<PendingEvictionAckKey, Deferred<void>>();
private _receiveAbort: AbortController | undefined;
private _statusInterval: ReturnType<typeof setInterval> | undefined;
private _startLock: Promise<void> | undefined;
onServerEvict: ((durableTaskExternalId: string, invocationCount: number) => void) | undefined;
constructor(config: ClientConfig, channel: Channel, factory: ClientFactory) {
this.config = config;
this.client = factory.create(V1DispatcherDefinition, channel);
@@ -231,6 +237,8 @@ export class DurableListenerClient {
registerWorker: { workerId: this._workerId! } as DurableTaskRequestRegisterWorker,
});
this._pollWorkerStatus();
void this._streamLoop();
this.logger.info('durable event listener connected');
@@ -309,6 +317,7 @@ export class DurableListenerClient {
const parts = key.split(':');
waitingEntries.push({
durableTaskExternalId: parts[0],
invocationCount: parseInt(parts[1], 10),
branchId: parseInt(parts[2], 10),
nodeId: parseInt(parts[3], 10),
});
@@ -328,17 +337,10 @@ export class DurableListenerClient {
}
this._pendingEventAcks.clear();
for (const d of this._pendingCallbacks.values()) {
d.reject(exc);
}
this._pendingCallbacks.clear();
for (const d of this._pendingEvictionAcks.values()) {
d.reject(exc);
}
this._pendingEvictionAcks.clear();
this._earlyCompletions.clear();
}
private _handleResponse(response: DurableTaskResponse): void {
@@ -407,7 +409,7 @@ export class DurableListenerClient {
pending.resolve(result);
this._pendingCallbacks.delete(key);
} else {
this._earlyCompletions.set(key, result);
this._bufferedCompletions.set(key, result);
}
} else if (response.evictionAck) {
const ack = response.evictionAck;
@@ -417,6 +419,16 @@ export class DurableListenerClient {
pending.resolve();
this._pendingEvictionAcks.delete(key);
}
} else if (response.serverEvict) {
const evict = response.serverEvict;
this.logger.info(
`received server eviction notification for task ${evict.durableTaskExternalId} ` +
`invocation ${evict.invocationCount}: ${evict.reason}`
);
this.cleanupTaskState(evict.durableTaskExternalId, evict.invocationCount);
if (this.onServerEvict) {
this.onServerEvict(evict.durableTaskExternalId, evict.invocationCount);
}
} else if (response.error) {
const { error } = response;
const { ref } = error;
@@ -540,14 +552,15 @@ export class DurableListenerClient {
): Promise<DurableTaskEventLogEntryResult> {
const key = callbackKey(durableTaskExternalId, invocationCount, branchId, nodeId);
const early = this._earlyCompletions.get(key);
const early = this._bufferedCompletions.get(key);
if (early) {
this._earlyCompletions.delete(key);
this._bufferedCompletions.delete(key);
return early;
}
if (!this._pendingCallbacks.has(key)) {
this._pendingCallbacks.set(key, deferred<DurableTaskEventLogEntryResult>());
this._pollWorkerStatus();
}
const d = this._pendingCallbacks.get(key)!;
@@ -611,10 +624,10 @@ export class DurableListenerClient {
}
}
for (const k of this._earlyCompletions.keys()) {
for (const k of this._bufferedCompletions.keys()) {
const parts = k.split(':');
if (parts[0] === durableTaskExternalId && parseInt(parts[1], 10) <= invocationCount) {
this._earlyCompletions.delete(k);
this._bufferedCompletions.delete(k);
}
}
}
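The cleanup scan above depends on the callback-key layout. For reference, a sketch of the key builder these hunks assume; callbackKey itself is not shown in this diff, and the 'externalId:invocation:branch:node' format is inferred from the split logic and the 'task:1:0:1' fixtures in the tests:

type PendingCallbackKey = `${string}:${number}:${number}:${number}`;

function callbackKey(
  durableTaskExternalId: string,
  invocationCount: number,
  branchId: number,
  nodeId: number
): PendingCallbackKey {
  return `${durableTaskExternalId}:${invocationCount}:${branchId}:${nodeId}`;
}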
+149 -1
View File
@@ -102,6 +102,14 @@ export interface DurableTaskAwaitedCompletedEntry {
durableTaskExternalId: string;
branchId: number;
nodeId: number;
invocationCount: number;
}
/** Sent by the server to notify a worker that its invocation is stale and should be cancelled. */
export interface DurableTaskServerEvictNotice {
durableTaskExternalId: string;
invocationCount: number;
reason: string;
}
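The generated helpers that follow give this message a full codec; a quick sketch of building and round-tripping a notice in a test (field values are illustrative):

const notice = DurableTaskServerEvictNotice.fromPartial({
  durableTaskExternalId: 'task-abc',
  invocationCount: 2,
  reason: 'stale invocation superseded by a newer one',
});
const decoded = DurableTaskServerEvictNotice.decode(
  DurableTaskServerEvictNotice.encode(notice).finish()
);
// decoded deep-equals notice; unset fields fall back to '' and 0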
export interface DurableTaskWorkerStatusRequest {
@@ -181,6 +189,7 @@ export interface DurableTaskResponse {
entryCompleted?: DurableTaskEventLogEntryCompletedResponse | undefined;
error?: DurableTaskErrorResponse | undefined;
evictionAck?: DurableTaskEvictionAckResponse | undefined;
serverEvict?: DurableTaskServerEvictNotice | undefined;
}
export interface RegisterDurableEventRequest {
@@ -1082,7 +1091,7 @@ export const DurableTaskEvictionAckResponse: MessageFns<DurableTaskEvictionAckRe
};
function createBaseDurableTaskAwaitedCompletedEntry(): DurableTaskAwaitedCompletedEntry {
return { durableTaskExternalId: '', branchId: 0, nodeId: 0 };
return { durableTaskExternalId: '', branchId: 0, nodeId: 0, invocationCount: 0 };
}
export const DurableTaskAwaitedCompletedEntry: MessageFns<DurableTaskAwaitedCompletedEntry> = {
@@ -1099,6 +1108,9 @@ export const DurableTaskAwaitedCompletedEntry: MessageFns<DurableTaskAwaitedComp
if (message.nodeId !== 0) {
writer.uint32(24).int64(message.nodeId);
}
if (message.invocationCount !== 0) {
writer.uint32(32).int32(message.invocationCount);
}
return writer;
},
@@ -1133,6 +1145,14 @@ export const DurableTaskAwaitedCompletedEntry: MessageFns<DurableTaskAwaitedComp
message.nodeId = longToNumber(reader.int64());
continue;
}
case 4: {
if (tag !== 32) {
break;
}
message.invocationCount = reader.int32();
continue;
}
}
if ((tag & 7) === 4 || tag === 0) {
break;
@@ -1149,6 +1169,9 @@ export const DurableTaskAwaitedCompletedEntry: MessageFns<DurableTaskAwaitedComp
: '',
branchId: isSet(object.branchId) ? globalThis.Number(object.branchId) : 0,
nodeId: isSet(object.nodeId) ? globalThis.Number(object.nodeId) : 0,
invocationCount: isSet(object.invocationCount)
? globalThis.Number(object.invocationCount)
: 0,
};
},
@@ -1163,6 +1186,9 @@ export const DurableTaskAwaitedCompletedEntry: MessageFns<DurableTaskAwaitedComp
if (message.nodeId !== 0) {
obj.nodeId = Math.round(message.nodeId);
}
if (message.invocationCount !== 0) {
obj.invocationCount = Math.round(message.invocationCount);
}
return obj;
},
@@ -1176,6 +1202,106 @@ export const DurableTaskAwaitedCompletedEntry: MessageFns<DurableTaskAwaitedComp
message.durableTaskExternalId = object.durableTaskExternalId ?? '';
message.branchId = object.branchId ?? 0;
message.nodeId = object.nodeId ?? 0;
message.invocationCount = object.invocationCount ?? 0;
return message;
},
};
function createBaseDurableTaskServerEvictNotice(): DurableTaskServerEvictNotice {
return { durableTaskExternalId: '', invocationCount: 0, reason: '' };
}
export const DurableTaskServerEvictNotice: MessageFns<DurableTaskServerEvictNotice> = {
encode(
message: DurableTaskServerEvictNotice,
writer: BinaryWriter = new BinaryWriter()
): BinaryWriter {
if (message.durableTaskExternalId !== '') {
writer.uint32(10).string(message.durableTaskExternalId);
}
if (message.invocationCount !== 0) {
writer.uint32(16).int32(message.invocationCount);
}
if (message.reason !== '') {
writer.uint32(26).string(message.reason);
}
return writer;
},
decode(input: BinaryReader | Uint8Array, length?: number): DurableTaskServerEvictNotice {
const reader = input instanceof BinaryReader ? input : new BinaryReader(input);
const end = length === undefined ? reader.len : reader.pos + length;
const message = createBaseDurableTaskServerEvictNotice();
while (reader.pos < end) {
const tag = reader.uint32();
switch (tag >>> 3) {
case 1: {
if (tag !== 10) {
break;
}
message.durableTaskExternalId = reader.string();
continue;
}
case 2: {
if (tag !== 16) {
break;
}
message.invocationCount = reader.int32();
continue;
}
case 3: {
if (tag !== 26) {
break;
}
message.reason = reader.string();
continue;
}
}
if ((tag & 7) === 4 || tag === 0) {
break;
}
reader.skip(tag & 7);
}
return message;
},
fromJSON(object: any): DurableTaskServerEvictNotice {
return {
durableTaskExternalId: isSet(object.durableTaskExternalId)
? globalThis.String(object.durableTaskExternalId)
: '',
invocationCount: isSet(object.invocationCount)
? globalThis.Number(object.invocationCount)
: 0,
reason: isSet(object.reason) ? globalThis.String(object.reason) : '',
};
},
toJSON(message: DurableTaskServerEvictNotice): unknown {
const obj: any = {};
if (message.durableTaskExternalId !== '') {
obj.durableTaskExternalId = message.durableTaskExternalId;
}
if (message.invocationCount !== 0) {
obj.invocationCount = Math.round(message.invocationCount);
}
if (message.reason !== '') {
obj.reason = message.reason;
}
return obj;
},
create(base?: DeepPartial<DurableTaskServerEvictNotice>): DurableTaskServerEvictNotice {
return DurableTaskServerEvictNotice.fromPartial(base ?? {});
},
fromPartial(object: DeepPartial<DurableTaskServerEvictNotice>): DurableTaskServerEvictNotice {
const message = createBaseDurableTaskServerEvictNotice();
message.durableTaskExternalId = object.durableTaskExternalId ?? '';
message.invocationCount = object.invocationCount ?? 0;
message.reason = object.reason ?? '';
return message;
},
};
@@ -2006,6 +2132,7 @@ function createBaseDurableTaskResponse(): DurableTaskResponse {
entryCompleted: undefined,
error: undefined,
evictionAck: undefined,
serverEvict: undefined,
};
}
@@ -2044,6 +2171,9 @@ export const DurableTaskResponse: MessageFns<DurableTaskResponse> = {
if (message.evictionAck !== undefined) {
DurableTaskEvictionAckResponse.encode(message.evictionAck, writer.uint32(58).fork()).join();
}
if (message.serverEvict !== undefined) {
DurableTaskServerEvictNotice.encode(message.serverEvict, writer.uint32(66).fork()).join();
}
return writer;
},
@@ -2119,6 +2249,14 @@ export const DurableTaskResponse: MessageFns<DurableTaskResponse> = {
message.evictionAck = DurableTaskEvictionAckResponse.decode(reader, reader.uint32());
continue;
}
case 8: {
if (tag !== 66) {
break;
}
message.serverEvict = DurableTaskServerEvictNotice.decode(reader, reader.uint32());
continue;
}
}
if ((tag & 7) === 4 || tag === 0) {
break;
@@ -2149,6 +2287,9 @@ export const DurableTaskResponse: MessageFns<DurableTaskResponse> = {
evictionAck: isSet(object.evictionAck)
? DurableTaskEvictionAckResponse.fromJSON(object.evictionAck)
: undefined,
serverEvict: isSet(object.serverEvict)
? DurableTaskServerEvictNotice.fromJSON(object.serverEvict)
: undefined,
};
},
@@ -2175,6 +2316,9 @@ export const DurableTaskResponse: MessageFns<DurableTaskResponse> = {
if (message.evictionAck !== undefined) {
obj.evictionAck = DurableTaskEvictionAckResponse.toJSON(message.evictionAck);
}
if (message.serverEvict !== undefined) {
obj.serverEvict = DurableTaskServerEvictNotice.toJSON(message.serverEvict);
}
return obj;
},
@@ -2211,6 +2355,10 @@ export const DurableTaskResponse: MessageFns<DurableTaskResponse> = {
object.evictionAck !== undefined && object.evictionAck !== null
? DurableTaskEvictionAckResponse.fromPartial(object.evictionAck)
: undefined;
message.serverEvict =
object.serverEvict !== undefined && object.serverEvict !== null
? DurableTaskServerEvictNotice.fromPartial(object.serverEvict)
: undefined;
return message;
},
};
@@ -34,6 +34,7 @@ import { createHash } from 'crypto';
import { InternalWorker } from './worker-internal';
import { Duration, durationToString } from '../duration';
import { DurableEvictionManager } from './eviction/eviction-manager';
import { ActionKey } from './eviction/eviction-cache';
import { supportsEviction } from './engine-version';
import { waitForPreEviction } from './deprecated/pre-eviction';
// TODO remove this once we have a proper next step type
@@ -858,16 +859,20 @@ export class DurableContext<T, K = {}> extends Context<T, K> {
return this.action.durableTaskInvocationCount ?? 1;
}
private get _actionKey(): ActionKey {
return this.action.key;
}
private async withEvictionWait<R>(
waitKind: string,
resourceId: string,
fn: () => Promise<R>
): Promise<R> {
this._evictionManager?.markWaiting(this.action.taskRunExternalId, waitKind, resourceId);
this._evictionManager?.markWaiting(this._actionKey, waitKind, resourceId);
try {
return await fn();
} finally {
this._evictionManager?.markActive(this.action.taskRunExternalId);
this._evictionManager?.markActive(this._actionKey);
}
}
@@ -25,22 +25,23 @@ describe('DurableEvictionCache', () => {
describe('registerRun / unregisterRun / get', () => {
it('registers and retrieves a run', () => {
cache.registerRun('k1', 'ext-1', T0, makePolicy());
const rec = cache.get('k1');
cache.registerRun('k1/0', 'ext-1', 1, T0, makePolicy());
const rec = cache.get('k1/0');
expect(rec).toBeDefined();
expect(rec!.taskRunExternalId).toBe('ext-1');
expect(rec!.invocationCount).toBe(1);
expect(rec!.registeredAt).toBe(T0);
expect(rec!.waitingSince).toBeUndefined();
});
it('returns undefined for unknown key', () => {
expect(cache.get('nope')).toBeUndefined();
expect(cache.get('nope/0')).toBeUndefined();
});
it('unregisters a run', () => {
cache.registerRun('k1', 'ext-1', T0, makePolicy());
cache.unregisterRun('k1');
expect(cache.get('k1')).toBeUndefined();
cache.registerRun('k1/0', 'ext-1', 1, T0, makePolicy());
cache.unregisterRun('k1/0');
expect(cache.get('k1/0')).toBeUndefined();
});
});
@@ -48,75 +49,99 @@ describe('DurableEvictionCache', () => {
describe('markWaiting / markActive / getAllWaiting', () => {
it('markWaiting sets wait fields', () => {
cache.registerRun('k1', 'ext-1', T0, makePolicy());
cache.markWaiting('k1', T0 + ONE_SEC, 'sleep', 'res-1');
const rec = cache.get('k1')!;
cache.registerRun('k1/0', 'ext-1', 1, T0, makePolicy());
cache.markWaiting('k1/0', T0 + ONE_SEC, 'sleep', 'res-1');
const rec = cache.get('k1/0')!;
expect(rec.waitingSince).toBe(T0 + ONE_SEC);
expect(rec.waitKind).toBe('sleep');
expect(rec.waitResourceId).toBe('res-1');
});
it('markActive clears wait fields', () => {
cache.registerRun('k1', 'ext-1', T0, makePolicy());
cache.markWaiting('k1', T0 + ONE_SEC, 'sleep', 'res-1');
cache.markActive('k1');
const rec = cache.get('k1')!;
cache.registerRun('k1/0', 'ext-1', 1, T0, makePolicy());
cache.markWaiting('k1/0', T0 + ONE_SEC, 'sleep', 'res-1');
cache.markActive('k1/0');
const rec = cache.get('k1/0')!;
expect(rec.waitingSince).toBeUndefined();
expect(rec.waitKind).toBeUndefined();
expect(rec.waitResourceId).toBeUndefined();
});
it('ref-counts concurrent waits so one markActive does not clear waiting', () => {
cache.registerRun('k1', 'ext-1', T0, makePolicy());
cache.markWaiting('k1', T0, 'runChild', 'child-0');
cache.markWaiting('k1', T0, 'runChild', 'child-1');
cache.markWaiting('k1', T0, 'runChild', 'child-2');
cache.registerRun('k1/0', 'ext-1', 1, T0, makePolicy());
cache.markWaiting('k1/0', T0, 'runChild', 'child-0');
cache.markWaiting('k1/0', T0, 'runChild', 'child-1');
cache.markWaiting('k1/0', T0, 'runChild', 'child-2');
cache.markActive('k1');
const rec = cache.get('k1')!;
cache.markActive('k1/0');
const rec = cache.get('k1/0')!;
expect(rec._waitCount).toBe(2);
expect(rec.waitingSince).toBe(T0);
cache.markActive('k1');
cache.markActive('k1/0');
expect(rec._waitCount).toBe(1);
expect(rec.waitingSince).toBe(T0);
cache.markActive('k1');
cache.markActive('k1/0');
expect(rec._waitCount).toBe(0);
expect(rec.waitingSince).toBeUndefined();
});
it('waitingSince is set only on the first markWaiting call', () => {
cache.registerRun('k1', 'ext-1', T0, makePolicy());
cache.markWaiting('k1', T0, 'runChild', 'child-0');
cache.markWaiting('k1', T0 + ONE_SEC, 'runChild', 'child-1');
const rec = cache.get('k1')!;
cache.registerRun('k1/0', 'ext-1', 1, T0, makePolicy());
cache.markWaiting('k1/0', T0, 'runChild', 'child-0');
cache.markWaiting('k1/0', T0 + ONE_SEC, 'runChild', 'child-1');
const rec = cache.get('k1/0')!;
expect(rec.waitingSince).toBe(T0);
});
it('markActive never goes below zero', () => {
cache.registerRun('k1', 'ext-1', T0, makePolicy());
cache.markWaiting('k1', T0, 'sleep', 'r');
cache.markActive('k1');
cache.markActive('k1');
expect(cache.get('k1')!._waitCount).toBe(0);
cache.registerRun('k1/0', 'ext-1', 1, T0, makePolicy());
cache.markWaiting('k1/0', T0, 'sleep', 'r');
cache.markActive('k1/0');
cache.markActive('k1/0');
expect(cache.get('k1/0')!._waitCount).toBe(0);
});
it('getAllWaiting returns only waiting runs', () => {
cache.registerRun('k1', 'ext-1', T0, makePolicy());
cache.registerRun('k2', 'ext-2', T0, makePolicy());
cache.markWaiting('k1', T0, 'sleep', 'r1');
cache.registerRun('k1/0', 'ext-1', 1, T0, makePolicy());
cache.registerRun('k2/0', 'ext-2', 1, T0, makePolicy());
cache.markWaiting('k1/0', T0, 'sleep', 'r1');
expect(cache.getAllWaiting()).toHaveLength(1);
expect(cache.getAllWaiting()[0].key).toBe('k1');
expect(cache.getAllWaiting()[0].key).toBe('k1/0');
});
it('markWaiting on unknown key is a no-op', () => {
cache.markWaiting('unknown', T0, 'sleep', 'r');
expect(cache.get('unknown')).toBeUndefined();
cache.markWaiting('unknown/0', T0, 'sleep', 'r');
expect(cache.get('unknown/0')).toBeUndefined();
});
it('markActive on unknown key is a no-op', () => {
expect(() => cache.markActive('unknown')).not.toThrow();
expect(() => cache.markActive('unknown/0')).not.toThrow();
});
});
// ------- findKeyByTaskRunExternalId -------
describe('findKeyByTaskRunExternalId', () => {
it('returns the matching key', () => {
cache.registerRun('k1/0', 'ext-1', 1, T0, makePolicy());
cache.registerRun('k2/0', 'ext-2', 1, T0, makePolicy());
expect(cache.findKeyByTaskRunExternalId('ext-1')).toBe('k1/0');
expect(cache.findKeyByTaskRunExternalId('ext-2')).toBe('k2/0');
});
it('returns undefined for unknown taskRunExternalId', () => {
cache.registerRun('k1/0', 'ext-1', 1, T0, makePolicy());
expect(cache.findKeyByTaskRunExternalId('no-such-id')).toBeUndefined();
});
it('returns undefined after unregister', () => {
cache.registerRun('k1/0', 'ext-1', 1, T0, makePolicy());
expect(cache.findKeyByTaskRunExternalId('ext-1')).toBe('k1/0');
cache.unregisterRun('k1/0');
expect(cache.findKeyByTaskRunExternalId('ext-1')).toBeUndefined();
});
});
@@ -134,15 +159,15 @@ describe('DurableEvictionCache', () => {
});
it('returns undefined when runs exist but none are waiting', () => {
cache.registerRun('k1', 'ext-1', T0, makePolicy({ ttl: '1m' }));
cache.registerRun('k1/0', 'ext-1', 1, T0, makePolicy({ ttl: '1m' }));
expect(
cache.selectEvictionCandidate(T0 + ONE_MIN * 5, DURABLE_SLOTS, RESERVE_SLOTS, MIN_WAIT_MS)
).toBeUndefined();
});
it('returns undefined when waiting runs have no eviction policy', () => {
cache.registerRun('k1', 'ext-1', T0, undefined);
cache.markWaiting('k1', T0, 'sleep', 'r');
cache.registerRun('k1/0', 'ext-1', 1, T0, undefined);
cache.markWaiting('k1/0', T0, 'sleep', 'r');
expect(
cache.selectEvictionCandidate(T0 + ONE_MIN * 5, DURABLE_SLOTS, RESERVE_SLOTS, MIN_WAIT_MS)
).toBeUndefined();
@@ -152,63 +177,63 @@ describe('DurableEvictionCache', () => {
describe('TTL-based eviction', () => {
it('evicts a run whose TTL has been exceeded', () => {
cache.registerRun('k1', 'ext-1', T0, makePolicy({ ttl: '1m' }));
cache.markWaiting('k1', T0, 'sleep', 'r');
cache.registerRun('k1/0', 'ext-1', 1, T0, makePolicy({ ttl: '1m' }));
cache.markWaiting('k1/0', T0, 'sleep', 'r');
const result = cache.selectEvictionCandidate(
T0 + ONE_MIN + 1,
DURABLE_SLOTS,
RESERVE_SLOTS,
MIN_WAIT_MS
);
expect(result).toBe('k1');
expect(result).toBe('k1/0');
});
it('does not evict when TTL has not been exceeded', () => {
cache.registerRun('k1', 'ext-1', T0, makePolicy({ ttl: '5m' }));
cache.markWaiting('k1', T0, 'sleep', 'r');
cache.registerRun('k1/0', 'ext-1', 1, T0, makePolicy({ ttl: '5m' }));
cache.markWaiting('k1/0', T0, 'sleep', 'r');
expect(
cache.selectEvictionCandidate(T0 + ONE_MIN, DURABLE_SLOTS, RESERVE_SLOTS, MIN_WAIT_MS)
).toBeUndefined();
});
it('evicts regardless of capacity when TTL is exceeded', () => {
cache.registerRun('k1', 'ext-1', T0, makePolicy({ ttl: '1s' }));
cache.markWaiting('k1', T0, 'sleep', 'r');
cache.registerRun('k1/0', 'ext-1', 1, T0, makePolicy({ ttl: '1s' }));
cache.markWaiting('k1/0', T0, 'sleep', 'r');
const noCapacityPressureSlots = 100;
expect(
cache.selectEvictionCandidate(T0 + 2 * ONE_SEC, noCapacityPressureSlots, 0, MIN_WAIT_MS)
).toBe('k1');
).toBe('k1/0');
});
it('picks lowest priority among TTL-eligible candidates', () => {
cache.registerRun('k1', 'ext-1', T0, makePolicy({ ttl: '1s', priority: 5 }));
cache.registerRun('k2', 'ext-2', T0, makePolicy({ ttl: '1s', priority: 1 }));
cache.markWaiting('k1', T0, 'sleep', 'r');
cache.markWaiting('k2', T0, 'sleep', 'r');
cache.registerRun('k1/0', 'ext-1', 1, T0, makePolicy({ ttl: '1s', priority: 5 }));
cache.registerRun('k2/0', 'ext-2', 1, T0, makePolicy({ ttl: '1s', priority: 1 }));
cache.markWaiting('k1/0', T0, 'sleep', 'r');
cache.markWaiting('k2/0', T0, 'sleep', 'r');
expect(
cache.selectEvictionCandidate(T0 + 2 * ONE_SEC, DURABLE_SLOTS, RESERVE_SLOTS, MIN_WAIT_MS)
).toBe('k2');
).toBe('k2/0');
});
it('breaks priority ties by longest waiting (earliest waitingSince)', () => {
cache.registerRun('k1', 'ext-1', T0, makePolicy({ ttl: '1s', priority: 0 }));
cache.registerRun('k2', 'ext-2', T0, makePolicy({ ttl: '1s', priority: 0 }));
cache.markWaiting('k1', T0 + 100, 'sleep', 'r');
cache.markWaiting('k2', T0, 'sleep', 'r');
cache.registerRun('k1/0', 'ext-1', 1, T0, makePolicy({ ttl: '1s', priority: 0 }));
cache.registerRun('k2/0', 'ext-2', 1, T0, makePolicy({ ttl: '1s', priority: 0 }));
cache.markWaiting('k1/0', T0 + 100, 'sleep', 'r');
cache.markWaiting('k2/0', T0, 'sleep', 'r');
expect(
cache.selectEvictionCandidate(T0 + 2 * ONE_SEC, DURABLE_SLOTS, RESERVE_SLOTS, MIN_WAIT_MS)
).toBe('k2');
).toBe('k2/0');
});
it('uses DurationObject TTL', () => {
cache.registerRun('k1', 'ext-1', T0, makePolicy({ ttl: { seconds: 30 } }));
cache.markWaiting('k1', T0, 'sleep', 'r');
cache.registerRun('k1/0', 'ext-1', 1, T0, makePolicy({ ttl: { seconds: 30 } }));
cache.markWaiting('k1/0', T0, 'sleep', 'r');
expect(
cache.selectEvictionCandidate(T0 + 29_000, DURABLE_SLOTS, RESERVE_SLOTS, MIN_WAIT_MS)
).toBeUndefined();
expect(
cache.selectEvictionCandidate(T0 + 31_000, DURABLE_SLOTS, RESERVE_SLOTS, MIN_WAIT_MS)
).toBe('k1');
).toBe('k1/0');
});
});
@@ -217,8 +242,8 @@ describe('DurableEvictionCache', () => {
describe('capacity-based eviction', () => {
it('evicts under capacity pressure when min wait is met', () => {
for (let i = 0; i < DURABLE_SLOTS; i += 1) {
cache.registerRun(`k${i}`, `ext-${i}`, T0, makePolicy());
cache.markWaiting(`k${i}`, T0, 'sleep', 'r');
cache.registerRun(`k${i}/0`, `ext-${i}`, 1, T0, makePolicy());
cache.markWaiting(`k${i}/0`, T0, 'sleep', 'r');
}
const result = cache.selectEvictionCandidate(
T0 + MIN_WAIT_MS + 1,
@@ -230,8 +255,8 @@ describe('DurableEvictionCache', () => {
});
it('does not evict when there is no capacity pressure', () => {
cache.registerRun('k1', 'ext-1', T0, makePolicy());
cache.markWaiting('k1', T0, 'sleep', 'r');
cache.registerRun('k1/0', 'ext-1', 1, T0, makePolicy());
cache.markWaiting('k1/0', T0, 'sleep', 'r');
expect(
cache.selectEvictionCandidate(
T0 + MIN_WAIT_MS + 1,
@@ -244,8 +269,8 @@ describe('DurableEvictionCache', () => {
it('does not evict when min wait threshold has not been met', () => {
for (let i = 0; i < DURABLE_SLOTS; i += 1) {
cache.registerRun(`k${i}`, `ext-${i}`, T0, makePolicy());
cache.markWaiting(`k${i}`, T0, 'sleep', 'r');
cache.registerRun(`k${i}/0`, `ext-${i}`, 1, T0, makePolicy());
cache.markWaiting(`k${i}/0`, T0, 'sleep', 'r');
}
expect(
cache.selectEvictionCandidate(
@@ -259,8 +284,14 @@ describe('DurableEvictionCache', () => {
it('respects allowCapacityEviction=false', () => {
for (let i = 0; i < DURABLE_SLOTS; i += 1) {
cache.registerRun(`k${i}`, `ext-${i}`, T0, makePolicy({ allowCapacityEviction: false }));
cache.markWaiting(`k${i}`, T0, 'sleep', 'r');
cache.registerRun(
`k${i}/0`,
`ext-${i}`,
1,
T0,
makePolicy({ allowCapacityEviction: false })
);
cache.markWaiting(`k${i}/0`, T0, 'sleep', 'r');
}
expect(
cache.selectEvictionCandidate(
@@ -273,11 +304,17 @@ describe('DurableEvictionCache', () => {
});
it('skips allowCapacityEviction=false but evicts others', () => {
cache.registerRun('protected', 'ext-p', T0, makePolicy({ allowCapacityEviction: false }));
cache.markWaiting('protected', T0, 'sleep', 'r');
cache.registerRun(
'protected/0',
'ext-p',
1,
T0,
makePolicy({ allowCapacityEviction: false })
);
cache.markWaiting('protected/0', T0, 'sleep', 'r');
for (let i = 1; i < DURABLE_SLOTS; i += 1) {
cache.registerRun(`k${i}`, `ext-${i}`, T0, makePolicy());
cache.markWaiting(`k${i}`, T0, 'sleep', 'r');
cache.registerRun(`k${i}/0`, `ext-${i}`, 1, T0, makePolicy());
cache.markWaiting(`k${i}/0`, T0, 'sleep', 'r');
}
const result = cache.selectEvictionCandidate(
T0 + MIN_WAIT_MS + 1,
@@ -286,34 +323,34 @@ describe('DurableEvictionCache', () => {
MIN_WAIT_MS
);
expect(result).toBeDefined();
expect(result).not.toBe('protected');
expect(result).not.toBe('protected/0');
});
it('picks lowest priority among capacity candidates', () => {
cache.registerRun('k1', 'ext-1', T0, makePolicy({ priority: 10 }));
cache.registerRun('k2', 'ext-2', T0, makePolicy({ priority: 2 }));
cache.registerRun('k3', 'ext-3', T0, makePolicy({ priority: 5 }));
cache.registerRun('k4', 'ext-4', T0, makePolicy({ priority: 7 }));
for (const k of ['k1', 'k2', 'k3', 'k4']) {
cache.registerRun('k1/0', 'ext-1', 1, T0, makePolicy({ priority: 10 }));
cache.registerRun('k2/0', 'ext-2', 1, T0, makePolicy({ priority: 2 }));
cache.registerRun('k3/0', 'ext-3', 1, T0, makePolicy({ priority: 5 }));
cache.registerRun('k4/0', 'ext-4', 1, T0, makePolicy({ priority: 7 }));
for (const k of ['k1/0', 'k2/0', 'k3/0', 'k4/0'] as const) {
cache.markWaiting(k, T0, 'sleep', 'r');
}
expect(
cache.selectEvictionCandidate(T0 + MIN_WAIT_MS + 1, 4, RESERVE_SLOTS, MIN_WAIT_MS)
).toBe('k2');
).toBe('k2/0');
});
it('breaks capacity priority ties by longest waiting', () => {
cache.registerRun('k1', 'ext-1', T0, makePolicy({ priority: 0 }));
cache.registerRun('k2', 'ext-2', T0, makePolicy({ priority: 0 }));
cache.registerRun('k3', 'ext-3', T0, makePolicy({ priority: 0 }));
cache.registerRun('k4', 'ext-4', T0, makePolicy({ priority: 0 }));
cache.markWaiting('k1', T0 + 200, 'sleep', 'r');
cache.markWaiting('k2', T0, 'sleep', 'r');
cache.markWaiting('k3', T0 + 100, 'sleep', 'r');
cache.markWaiting('k4', T0 + 300, 'sleep', 'r');
cache.registerRun('k1/0', 'ext-1', 1, T0, makePolicy({ priority: 0 }));
cache.registerRun('k2/0', 'ext-2', 1, T0, makePolicy({ priority: 0 }));
cache.registerRun('k3/0', 'ext-3', 1, T0, makePolicy({ priority: 0 }));
cache.registerRun('k4/0', 'ext-4', 1, T0, makePolicy({ priority: 0 }));
cache.markWaiting('k1/0', T0 + 200, 'sleep', 'r');
cache.markWaiting('k2/0', T0, 'sleep', 'r');
cache.markWaiting('k3/0', T0 + 100, 'sleep', 'r');
cache.markWaiting('k4/0', T0 + 300, 'sleep', 'r');
expect(
cache.selectEvictionCandidate(T0 + MIN_WAIT_MS + 1000, 4, RESERVE_SLOTS, MIN_WAIT_MS)
).toBe('k2');
).toBe('k2/0');
});
});
@@ -321,10 +358,10 @@ describe('DurableEvictionCache', () => {
describe('reserveSlots', () => {
it('reserve slots reduce the effective capacity threshold', () => {
cache.registerRun('k1', 'ext-1', T0, makePolicy());
cache.registerRun('k2', 'ext-2', T0, makePolicy());
cache.registerRun('k3', 'ext-3', T0, makePolicy());
for (const k of ['k1', 'k2', 'k3']) {
cache.registerRun('k1/0', 'ext-1', 1, T0, makePolicy());
cache.registerRun('k2/0', 'ext-2', 1, T0, makePolicy());
cache.registerRun('k3/0', 'ext-3', 1, T0, makePolicy());
for (const k of ['k1/0', 'k2/0', 'k3/0'] as const) {
cache.markWaiting(k, T0, 'sleep', 'r');
}
@@ -335,8 +372,8 @@ describe('DurableEvictionCache', () => {
});
it('no pressure when waiting count is below effective threshold', () => {
cache.registerRun('k1', 'ext-1', T0, makePolicy());
cache.markWaiting('k1', T0, 'sleep', 'r');
cache.registerRun('k1/0', 'ext-1', 1, T0, makePolicy());
cache.markWaiting('k1/0', T0, 'sleep', 'r');
// 4 slots - 0 reserved = 4 effective; 1 waiting < 4 → no pressure
expect(
@@ -345,8 +382,8 @@ describe('DurableEvictionCache', () => {
});
it('reserveSlots >= durableSlots means no capacity eviction', () => {
cache.registerRun('k1', 'ext-1', T0, makePolicy());
cache.markWaiting('k1', T0, 'sleep', 'r');
cache.registerRun('k1/0', 'ext-1', 1, T0, makePolicy());
cache.markWaiting('k1/0', T0, 'sleep', 'r');
expect(
cache.selectEvictionCandidate(T0 + MIN_WAIT_MS + 1, 4, 4, MIN_WAIT_MS)
).toBeUndefined();
@@ -360,16 +397,16 @@ describe('DurableEvictionCache', () => {
describe('durableSlots edge cases', () => {
it('durableSlots=0 means no capacity eviction', () => {
cache.registerRun('k1', 'ext-1', T0, makePolicy());
cache.markWaiting('k1', T0, 'sleep', 'r');
cache.registerRun('k1/0', 'ext-1', 1, T0, makePolicy());
cache.markWaiting('k1/0', T0, 'sleep', 'r');
expect(
cache.selectEvictionCandidate(T0 + MIN_WAIT_MS + 1, 0, 0, MIN_WAIT_MS)
).toBeUndefined();
});
it('durableSlots negative means no capacity eviction', () => {
cache.registerRun('k1', 'ext-1', T0, makePolicy());
cache.markWaiting('k1', T0, 'sleep', 'r');
cache.registerRun('k1/0', 'ext-1', 1, T0, makePolicy());
cache.markWaiting('k1/0', T0, 'sleep', 'r');
expect(
cache.selectEvictionCandidate(T0 + MIN_WAIT_MS + 1, -1, 0, MIN_WAIT_MS)
).toBeUndefined();
@@ -380,13 +417,13 @@ describe('DurableEvictionCache', () => {
describe('TTL takes precedence over capacity', () => {
it('selects TTL-eligible candidate even when capacity candidates also exist', () => {
cache.registerRun('ttl-run', 'ext-t', T0, makePolicy({ ttl: '1s', priority: 10 }));
cache.registerRun('cap-run', 'ext-c', T0, makePolicy({ priority: 0 }));
cache.markWaiting('ttl-run', T0, 'sleep', 'r');
cache.markWaiting('cap-run', T0, 'sleep', 'r');
cache.registerRun('ttl-run/0', 'ext-t', 1, T0, makePolicy({ ttl: '1s', priority: 10 }));
cache.registerRun('cap-run/0', 'ext-c', 1, T0, makePolicy({ priority: 0 }));
cache.markWaiting('ttl-run/0', T0, 'sleep', 'r');
cache.markWaiting('cap-run/0', T0, 'sleep', 'r');
const result = cache.selectEvictionCandidate(T0 + 2 * ONE_SEC, 2, 0, MIN_WAIT_MS);
expect(result).toBe('ttl-run');
expect(result).toBe('ttl-run/0');
});
});
@@ -394,16 +431,16 @@ describe('DurableEvictionCache', () => {
describe('evictionReason is set on the chosen record', () => {
it('sets TTL reason on TTL eviction', () => {
cache.registerRun('k1', 'ext-1', T0, makePolicy({ ttl: '30s' }));
cache.markWaiting('k1', T0, 'sleep', 'res-1');
cache.registerRun('k1/0', 'ext-1', 1, T0, makePolicy({ ttl: '30s' }));
cache.markWaiting('k1/0', T0, 'sleep', 'res-1');
cache.selectEvictionCandidate(T0 + 31_000, DURABLE_SLOTS, RESERVE_SLOTS, MIN_WAIT_MS);
expect(cache.get('k1')!.evictionReason).toMatch(/TTL.*exceeded/);
expect(cache.get('k1/0')!.evictionReason).toMatch(/TTL.*exceeded/);
});
it('sets capacity reason on capacity eviction', () => {
for (let i = 0; i < DURABLE_SLOTS; i += 1) {
cache.registerRun(`k${i}`, `ext-${i}`, T0, makePolicy());
cache.markWaiting(`k${i}`, T0, 'sleep', 'res');
cache.registerRun(`k${i}/0`, `ext-${i}`, 1, T0, makePolicy());
cache.markWaiting(`k${i}/0`, T0, 'sleep', 'res');
}
const key = cache.selectEvictionCandidate(
T0 + MIN_WAIT_MS + 1,
@@ -420,8 +457,9 @@ describe('DurableEvictionCache', () => {
describe('buildEvictionReason', () => {
function makeRecord(overrides: Partial<DurableRunRecord> = {}): DurableRunRecord {
return {
key: 'k1',
key: 'k1/0',
taskRunExternalId: 'ext-1',
invocationCount: 1,
evictionPolicy: { ttl: '30s', allowCapacityEviction: true, priority: 0 },
registeredAt: T0,
waitingSince: T0,
@@ -1,7 +1,8 @@
import { durationToMs } from '../../duration';
import { ActionKey } from '@hatchet/clients/dispatcher/action-listener';
import { EvictionPolicy } from './eviction-policy';
import { durationToMs } from '../../duration';
export type ActionKey = string;
export type { ActionKey };
export enum EvictionCause {
TTL_EXCEEDED = 'ttl_exceeded',
@@ -12,6 +13,7 @@ export enum EvictionCause {
export interface DurableRunRecord {
key: ActionKey;
taskRunExternalId: string;
invocationCount: number;
evictionPolicy: EvictionPolicy | undefined;
registeredAt: number;
@@ -32,12 +34,14 @@ export class DurableEvictionCache {
registerRun(
key: ActionKey,
taskRunExternalId: string,
invocationCount: number,
now: number,
evictionPolicy: EvictionPolicy | undefined
): void {
this._runs.set(key, {
key,
taskRunExternalId,
invocationCount,
evictionPolicy,
registeredAt: now,
waitingSince: undefined,
@@ -60,6 +64,13 @@ export class DurableEvictionCache {
return [...this._runs.values()].filter((r) => r._waitCount > 0);
}
findKeyByTaskRunExternalId(taskRunExternalId: string): ActionKey | undefined {
for (const [key, rec] of this._runs) {
if (rec.taskRunExternalId === taskRunExternalId) return key;
}
return undefined;
}
markWaiting(key: ActionKey, now: number, waitKind: string, resourceId: string): void {
const rec = this._runs.get(key);
if (!rec) return;
@@ -0,0 +1,116 @@
import { Logger } from '@hatchet/util/logger';
import { DurableEvictionManager } from './eviction-manager';
import { DurableEvictionCache } from './eviction-cache';
class NoopLogger extends Logger {
debug() {}
info() {}
green() {}
warn() {}
error() {}
util() {}
}
function makeManager() {
const cancelLocal = jest.fn();
const requestEvictionWithAck = jest.fn().mockResolvedValue(undefined);
const cache = new DurableEvictionCache();
const manager = new DurableEvictionManager({
durableSlots: 10,
cancelLocal,
requestEvictionWithAck,
config: { checkIntervalMs: 3_600_000 },
cache,
logger: new NoopLogger(),
});
return { manager, cancelLocal, requestEvictionWithAck, cache };
}
describe('DurableEvictionManager', () => {
describe('handleServerEviction', () => {
it('cancels and unregisters the matching run when invocationCount matches', () => {
const { manager, cancelLocal } = makeManager();
manager.registerRun('run-1/0', 'ext-1', 2, {
ttl: '30s',
allowCapacityEviction: true,
priority: 0,
});
manager.markWaiting('run-1/0', 'sleep', 's1');
manager.handleServerEviction('ext-1', 2);
expect(cancelLocal).toHaveBeenCalledWith('run-1/0');
expect(manager.cache.get('run-1/0')).toBeUndefined();
});
it('is a no-op for unknown taskRunExternalId', () => {
const { manager, cancelLocal } = makeManager();
manager.registerRun('run-1/0', 'ext-1', 1, undefined);
manager.handleServerEviction('no-such-id', 1);
expect(cancelLocal).not.toHaveBeenCalled();
expect(manager.cache.get('run-1/0')).toBeDefined();
});
it('only evicts the matching run, not others', () => {
const { manager, cancelLocal } = makeManager();
manager.registerRun('run-1/0', 'ext-1', 1, {
ttl: '30s',
allowCapacityEviction: true,
priority: 0,
});
manager.registerRun('run-2/0', 'ext-2', 1, {
ttl: '30s',
allowCapacityEviction: true,
priority: 0,
});
manager.markWaiting('run-1/0', 'sleep', 's1');
manager.markWaiting('run-2/0', 'sleep', 's2');
manager.handleServerEviction('ext-1', 1);
expect(cancelLocal).toHaveBeenCalledTimes(1);
expect(cancelLocal).toHaveBeenCalledWith('run-1/0');
expect(manager.cache.get('run-1/0')).toBeUndefined();
expect(manager.cache.get('run-2/0')).toBeDefined();
});
it('does not evict when invocationCount does not match (newer invocation)', () => {
const { manager, cancelLocal } = makeManager();
manager.registerRun('run-1/0', 'ext-1', 3, {
ttl: '30s',
allowCapacityEviction: true,
priority: 0,
});
manager.markWaiting('run-1/0', 'sleep', 's1');
manager.handleServerEviction('ext-1', 2);
expect(cancelLocal).not.toHaveBeenCalled();
expect(manager.cache.get('run-1/0')).toBeDefined();
});
it('evicts when invocationCount matches exactly', () => {
const { manager, cancelLocal } = makeManager();
manager.registerRun('run-1/0', 'ext-1', 5, {
ttl: '30s',
allowCapacityEviction: true,
priority: 0,
});
manager.markWaiting('run-1/0', 'sleep', 's1');
manager.handleServerEviction('ext-1', 5);
expect(cancelLocal).toHaveBeenCalledWith('run-1/0');
expect(manager.cache.get('run-1/0')).toBeUndefined();
});
});
});
@@ -70,9 +70,10 @@ export class DurableEvictionManager {
registerRun(
key: ActionKey,
taskRunExternalId: string,
invocationCount: number,
evictionPolicy: EvictionPolicy | undefined
): void {
this._cache.registerRun(key, taskRunExternalId, Date.now(), evictionPolicy);
this._cache.registerRun(key, taskRunExternalId, invocationCount, Date.now(), evictionPolicy);
}
unregisterRun(key: ActionKey): void {
@@ -87,6 +88,11 @@ export class DurableEvictionManager {
this._cache.markActive(key);
}
private _evictRun(key: ActionKey): void {
this._cancelLocal(key);
this.unregisterRun(key);
}
private async _tickSafe(): Promise<void> {
if (this._ticking) return;
this._ticking = true;
@@ -123,11 +129,23 @@ export class DurableEvictionManager {
);
await this._requestEvictionWithAck(key, rec);
this._cancelLocal(key);
this.unregisterRun(key);
this._evictRun(key);
}
}
handleServerEviction(taskRunExternalId: string, invocationCount: number): void {
const key = this._cache.findKeyByTaskRunExternalId(taskRunExternalId);
if (!key) return;
const rec = this._cache.get(key);
if (rec && rec.invocationCount !== invocationCount) return;
this._logger.info(
`DurableEvictionManager: server-initiated eviction for task_run_external_id=${taskRunExternalId} invocation_count=${invocationCount}`
);
this._evictRun(key);
}
async evictAllWaiting(): Promise<number> {
this.stop();
@@ -154,8 +172,7 @@ export class DurableEvictionManager {
continue;
}
this._cancelLocal(rec.key);
this.unregisterRun(rec.key);
this._evictRun(rec.key);
evicted++;
}
@@ -1,5 +1,7 @@
import { InternalWorker } from '@hatchet/v1/client/worker/worker-internal';
import { createAction } from '@hatchet/clients/dispatcher/action-listener';
import HatchetPromise, { CancellationReason } from '@util/hatchet-promise/hatchet-promise';
import { ActionType } from '@hatchet-dev/typescript-sdk/protoc/dispatcher';
describe('V1Worker handleCancelStepRun cancellation supervision', () => {
beforeEach(() => {
@@ -19,6 +21,8 @@ describe('V1Worker handleCancelStepRun cancellation supervision', () => {
};
const taskExternalId = 'task-1';
const retryCount = 0;
const actionKey = `${taskExternalId}/${retryCount}`;
// Use the real HatchetPromise behavior: cancel rejects the wrapper immediately,
// while the underlying work (`inner`) continues.
@@ -44,8 +48,8 @@ describe('V1Worker handleCancelStepRun cancellation supervision', () => {
},
cancellingTaskRuns: new Set(),
evictionManager: undefined,
futures: { [taskExternalId]: future },
contexts: { [taskExternalId]: ctx },
futures: { [actionKey]: future },
contexts: { [actionKey]: ctx },
cleanupRun(id: string) {
this.evictionManager?.unregisterRun(id);
delete this.futures[id];
@@ -53,7 +57,22 @@ describe('V1Worker handleCancelStepRun cancellation supervision', () => {
},
};
const action: any = { taskRunExternalId: taskExternalId };
const action = createAction({
taskRunExternalId: taskExternalId,
retryCount,
tenantId: 'tenant-1',
workflowRunId: 'workflow-run-1',
getGroupKeyRunId: '',
jobId: 'job-1',
jobName: 'job-1',
jobRunId: 'job-run-1',
taskId: 'task-1',
actionId: 'action-1',
actionType: ActionType.START_STEP_RUN,
actionPayload: 'action-payload-1',
taskName: 'task-1',
priority: 1,
});
const p = InternalWorker.prototype.handleCancelStepRun.call(fakeThis, action);
@@ -64,8 +83,8 @@ describe('V1Worker handleCancelStepRun cancellation supervision', () => {
expect(cancelSpy).toHaveBeenCalled();
expect(logger.warn).toHaveBeenCalled();
expect(fakeThis.futures[taskExternalId]).toBeUndefined();
expect(fakeThis.contexts[taskExternalId]).toBeUndefined();
expect(fakeThis.futures[actionKey]).toBeUndefined();
expect(fakeThis.contexts[actionKey]).toBeUndefined();
});
it('suppresses "was cancelled" debug log when cancellation is supervised', async () => {
@@ -3,7 +3,7 @@ import {
TaskRunTerminatedError,
isTaskRunTerminatedError,
} from '@util/errors/task-run-terminated-error';
import { Action, ActionListener } from '@clients/dispatcher/action-listener';
import { Action, ActionKey, ActionListener } from '@clients/dispatcher/action-listener';
import {
StepActionEvent,
StepActionEventType,
@@ -39,7 +39,7 @@ import { HealthServer, workerStatus, type WorkerStatus } from './health-server';
import { SlotConfig } from '../../slot-types';
import { DurableEvictionManager } from './eviction/eviction-manager';
import { EvictionPolicy, DEFAULT_DURABLE_TASK_EVICTION_POLICY } from './eviction/eviction-policy';
import { ActionKey, DurableRunRecord } from './eviction/eviction-cache';
import { DurableRunRecord } from './eviction/eviction-cache';
import { supportsEviction } from './engine-version';
// eslint-disable-next-line @typescript-eslint/no-unsafe-function-type
@@ -68,8 +68,8 @@ export class InternalWorker {
evictionManager: DurableEvictionManager | undefined;
workflow_registry: Array<WorkflowDefinition> = [];
listener: ActionListener | undefined;
futures: Record<Action['taskRunExternalId'], HatchetPromise<any>> = {};
contexts: Record<Action['taskRunExternalId'], Context<any, any>> = {};
futures: Record<ActionKey, HatchetPromise<any>> = {};
contexts: Record<ActionKey, Context<any, any>> = {};
slots?: number;
durableSlots?: number;
slotConfig: SlotConfig;
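Keying futures and contexts by ActionKey rather than bare taskRunExternalId gives each retry of the same task run its own slot. A small sketch, assuming the `${taskRunExternalId}/${retryCount}` shape the test above builds by hand (the SDK's actual ActionKey type lives in action-listener and is not shown in this diff):

type ActionKey = `${string}/${number}`;

function actionKeyOf(taskRunExternalId: string, retryCount: number): ActionKey {
  return `${taskRunExternalId}/${retryCount}`;
}

// Retry 1 no longer clobbers (or accidentally cancels) the
// bookkeeping of retry 0 for the same task run:
const futures: Record<ActionKey, Promise<unknown>> = {};
futures[actionKeyOf('task-1', 0)] = Promise.resolve('first attempt');
futures[actionKeyOf('task-1', 1)] = Promise.resolve('retry');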
@@ -444,7 +444,10 @@ export class InternalWorker {
const ctx = this.contexts[key] as DurableContext<any, any> | undefined;
if (ctx) {
const invocationCount = ctx.invocationCount ?? 1;
this.client.durableListener.cleanupTaskState(key, invocationCount);
this.client.durableListener.cleanupTaskState(
ctx.action.taskRunExternalId,
invocationCount
);
if (ctx.abortController) {
ctx.abortController.abort(err);
}
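Aborting the context's controller with the eviction error means any in-flight waits tied to that signal reject promptly instead of dangling. A sketch of that propagation; the abortable sleep here is illustrative, not the SDK helper:

// Sketch: a wait that rejects early when the signal aborts.
function abortableSleep(ms: number, signal: AbortSignal): Promise<void> {
  return new Promise((resolve, reject) => {
    if (signal.aborted) {
      reject(signal.reason);
      return;
    }
    const timer = setTimeout(resolve, ms);
    signal.addEventListener(
      'abort',
      () => {
        clearTimeout(timer);
        reject(signal.reason); // surfaces the eviction error passed to abort()
      },
      { once: true }
    );
  });
}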
@@ -455,8 +458,8 @@ export class InternalWorker {
future.cancel(CancellationReason.EVICTED_BY_WORKER);
}
},
requestEvictionWithAck: async (_key: ActionKey, rec: DurableRunRecord) => {
const ctx = this.contexts[rec.taskRunExternalId] as DurableContext<any, any> | undefined;
requestEvictionWithAck: async (key: ActionKey, rec: DurableRunRecord) => {
const ctx = this.contexts[key] as DurableContext<any, any> | undefined;
const invocationCount = ctx?.invocationCount ?? 1;
await this.client.durableListener.sendEvictInvocation(
rec.taskRunExternalId,
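requestEvictionWithAck awaits the listener's send for the context's current invocation before anything else happens locally. A rough sketch of that ordering; sendEvictInvocation mirrors the call shown above, while the ack-gated local-cancel step is an assumption about wiring not visible in this hunk:

// Sketch of the ack-gated ordering; evictLocally is a stand-in for
// the local cancel path in the previous hunk.
async function requestEvictionWithAckSketch(
  key: string,
  taskRunExternalId: string,
  invocationCount: number,
  sendEvictInvocation: (id: string, invocation: number) => Promise<void>,
  evictLocally: (key: string) => void
): Promise<void> {
  // 1. Tell the engine which invocation is being evicted and wait
  //    for the round-trip to complete.
  await sendEvictInvocation(taskRunExternalId, invocationCount);
  // 2. Only then cancel the local invocation.
  evictLocally(key);
}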
@@ -467,18 +470,30 @@ export class InternalWorker {
logger: this.logger,
});
this.client.durableListener.onServerEvict = (durableTaskExternalId, invocationCount) => {
this.evictionManager?.handleServerEviction(durableTaskExternalId, invocationCount);
};
this.evictionManager.start();
return this.evictionManager;
}
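onServerEvict forwards the notice into the manager keyed by external id and invocation count. Since the notice targets a specific invocation, a plausible guard is to act only when the recorded invocation matches and otherwise treat the notice as stale; a sketch under that assumption (handleServerEviction's real body is not shown in this diff):

// Sketch: ignore notices for invocations we are no longer running.
function handleServerEvictionSketch(
  runs: Map<string, { key: string; invocationCount: number }>,
  durableTaskExternalId: string,
  invocationCount: number,
  evictRun: (key: string) => void
): void {
  const rec = runs.get(durableTaskExternalId);
  if (!rec || rec.invocationCount !== invocationCount) {
    return; // stale or unknown notice; nothing local to cancel
  }
  evictRun(rec.key);
}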
private cleanupRun(taskRunExternalId: string): void {
this.evictionManager?.unregisterRun(taskRunExternalId);
delete this.futures[taskRunExternalId];
delete this.contexts[taskRunExternalId];
private cleanupRun(key: ActionKey): void {
const ctx = this.contexts[key];
if (ctx instanceof DurableContext) {
this.client.durableListener.cleanupTaskState(
ctx.action.taskRunExternalId,
ctx.invocationCount
);
}
this.evictionManager?.unregisterRun(key);
delete this.futures[key];
delete this.contexts[key];
}
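Note that cleanupRun is invoked from several finally blocks above, so it can run more than once for the same key; the optional chaining and `delete` operations make repeat calls harmless. For instance:

const futures: Record<string, Promise<unknown>> = {
  'task-1/0': Promise.resolve(),
};
delete futures['task-1/0']; // removes the entry
delete futures['task-1/0']; // second call is a no-op, no throw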
async handleStartStepRun(action: Action) {
const { actionId, taskRunExternalId, taskName } = action;
const actionKey = action.key;
try {
const isDurable = this.durable_action_set.has(actionId);
@@ -492,7 +507,12 @@ export class InternalWorker {
await durableListener.ensureStarted(this.workerId || '');
mgr = this.ensureEvictionManager();
const evictionPolicy = this.eviction_policies.get(actionId);
mgr.registerRun(taskRunExternalId, taskRunExternalId, evictionPolicy);
mgr.registerRun(
actionKey,
taskRunExternalId,
action.durableTaskInvocationCount ?? 1,
evictionPolicy
);
}
context = new DurableContext(
@@ -507,14 +527,14 @@ export class InternalWorker {
context = new Context(action, this.client, this);
}
this.contexts[taskRunExternalId] = context;
this.contexts[actionKey] = context;
const step = this.action_registry[actionId];
if (!step) {
this.logger.error(`Registered actions: '${Object.keys(this.action_registry).join(', ')}'`);
this.logger.error(`Could not find step '${actionId}'`);
this.cleanupRun(taskRunExternalId);
this.cleanupRun(actionKey);
return;
}
@@ -604,7 +624,7 @@ export class InternalWorker {
`Could not send action event: ${actionEventError.message || actionEventError}`
);
} finally {
this.cleanupRun(taskRunExternalId);
this.cleanupRun(actionKey);
}
};
@@ -636,7 +656,7 @@ export class InternalWorker {
} catch (e: any) {
this.logger.error(`Could not send action event: ${e.message}`);
} finally {
this.cleanupRun(taskRunExternalId);
this.cleanupRun(actionKey);
}
};
@@ -665,7 +685,7 @@ export class InternalWorker {
await success(result);
})()
);
this.futures[taskRunExternalId] = future;
this.futures[actionKey] = future;
// Send the action event to the dispatcher
const event = this.getStepActionEvent(
@@ -690,10 +710,10 @@ export class InternalWorker {
);
}
} finally {
this.cleanupRun(taskRunExternalId);
this.cleanupRun(actionKey);
}
} catch (e: any) {
this.cleanupRun(taskRunExternalId);
this.cleanupRun(actionKey);
this.logger.error('Could not send action event (outer): ', e);
}
}
@@ -741,10 +761,11 @@ export class InternalWorker {
async handleCancelStepRun(action: Action) {
const { taskRunExternalId, taskName } = action;
const actionKey = action.key;
try {
const future = this.futures[taskRunExternalId];
const context = this.contexts[taskRunExternalId];
const future = this.futures[actionKey];
const context = this.contexts[actionKey];
const cancelErr = new TaskRunTerminatedError('cancelled', 'Cancelled by worker');
if (context && context.abortController) {
@@ -809,7 +830,7 @@ export class InternalWorker {
`Cancellation: error while supervising cancellation for task run ${taskRunExternalId}: ${e?.message || e}`
);
} finally {
this.cleanupRun(taskRunExternalId);
this.cleanupRun(actionKey);
}
}
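Putting the handler together: cancellation aborts the context, cancels the future wrapper, and then, per the tests above, supervises whether the underlying work actually stops, warning when it does not, before cleanup runs in the finally block. A condensed sketch under those assumptions; the grace window and the separate handle on the underlying work are illustrative, not wiring shown in this diff:

// Sketch of supervised cancellation; shapes follow the test doubles
// above, and the 2s grace window is an assumed value.
async function superviseCancellationSketch(
  future: { cancel: (reason?: unknown) => void },
  underlyingWork: Promise<unknown>,
  ctx: { abortController?: AbortController },
  logger: { warn: (msg: string) => void },
  cancelErr: Error
): Promise<void> {
  ctx.abortController?.abort(cancelErr); // pending waits reject with cancelErr
  future.cancel(cancelErr); // rejects the wrapper immediately
  const grace = new Promise<'timeout'>((resolve) => {
    setTimeout(() => resolve('timeout'), 2_000);
  });
  const outcome = await Promise.race([
    underlyingWork.then(
      () => 'stopped' as const,
      () => 'stopped' as const
    ),
    grace,
  ]);
  if (outcome === 'timeout') {
    logger.warn('Cancellation: task run kept running after cancel');
  }
}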