mirror of
https://github.com/hatchet-dev/hatchet.git
synced 2026-04-22 18:19:17 -05:00
Fix--durable-feedback (#3240)
* feat: duration takes ms * fix: nondeterminism err clarity * fix: cleanup * chore: lint * chore: feedback * fix: typescript * fix: python
This commit is contained in:
@@ -6,9 +6,11 @@ import (
|
||||
"crypto/sha256"
|
||||
"encoding/hex"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"slices"
|
||||
"sort"
|
||||
"strings"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"github.com/jackc/pgx/v5/pgtype"
|
||||
@@ -144,16 +146,33 @@ func newDurableEventsRepository(shared *sharedRepository) DurableEventsRepositor
|
||||
}
|
||||
}
|
||||
|
||||
// NonDeterminismDetail holds two human-readable call labels describing a
// non-determinism mismatch.
type NonDeterminismDetail struct {
	// Expected is the label for the previously stored log entry.
	Expected string
	// Received is the label for the incoming call.
	Received string
}
|
||||
|
||||
type NonDeterminismError struct {
|
||||
NodeId int64
|
||||
BranchId int64
|
||||
TaskExternalId uuid.UUID
|
||||
ExpectedIdempotencyKey []byte
|
||||
ActualIdempotencyKey []byte
|
||||
NodeId int64
|
||||
BranchId int64
|
||||
TaskExternalId uuid.UUID
|
||||
ExpectedIdempotencyKey []byte
|
||||
ActualIdempotencyKey []byte
|
||||
ExpectedKind sqlcv1.V1DurableEventLogKind
|
||||
ActualKind sqlcv1.V1DurableEventLogKind
|
||||
ExistingEntryId int64
|
||||
ExistingEntryInsertedAt pgtype.Timestamptz
|
||||
ExistingEntryTenantId uuid.UUID
|
||||
Detail *NonDeterminismDetail
|
||||
}
|
||||
|
||||
func (m *NonDeterminismError) Error() string {
|
||||
return fmt.Sprintf("non-determinism detected for durable event log entry in task %s at node id %d", m.TaskExternalId.String(), m.NodeId)
|
||||
msg := fmt.Sprintf("non-determinism error in task %s at node %d:%d", m.TaskExternalId, m.NodeId, m.BranchId)
|
||||
|
||||
if m.Detail != nil {
|
||||
msg += "\n expected: " + m.Detail.Expected + "\n received: " + m.Detail.Received
|
||||
}
|
||||
|
||||
return msg
|
||||
}
|
||||
|
||||
type StaleInvocationError struct {
|
||||
@@ -166,6 +185,130 @@ func (e *StaleInvocationError) Error() string {
|
||||
return fmt.Sprintf("invocation count mismatch for task %s: server has %d, worker sent %d", e.TaskExternalId.String(), e.ExpectedInvocationCount, e.ActualInvocationCount)
|
||||
}
|
||||
|
||||
func formatConditionLabel(c CreateExternalSignalConditionOpt) string {
|
||||
switch c.Kind {
|
||||
case CreateExternalSignalConditionKindSLEEP:
|
||||
if c.SleepFor != nil {
|
||||
return "sleep(" + *c.SleepFor + ")"
|
||||
}
|
||||
return "sleep"
|
||||
case CreateExternalSignalConditionKindUSEREVENT:
|
||||
if c.UserEventKey != nil {
|
||||
return "waitForEvent(" + *c.UserEventKey + ")"
|
||||
}
|
||||
return "waitForEvent"
|
||||
default:
|
||||
return string(c.Kind)
|
||||
}
|
||||
}
|
||||
|
||||
// maxDisplayLabels is the largest number of labels summarizeLabels prints
// before it starts collapsing duplicates and truncating.
const maxDisplayLabels = 5

// summarizeLabels joins labels into a comma-separated list.
//
// Up to maxDisplayLabels labels are printed verbatim, duplicates included.
// Longer inputs are collapsed to their unique values in first-seen order,
// repeated values are prefixed with "<count>x ", and at most maxDisplayLabels
// unique values are shown followed by a "... +N more unique" marker.
func summarizeLabels(labels []string) string {
	if len(labels) <= maxDisplayLabels {
		return strings.Join(labels, ", ")
	}

	// Count occurrences while remembering the order in which each distinct
	// label first appeared.
	seen := make(map[string]int, len(labels))
	var uniq []string

	for _, label := range labels {
		if _, known := seen[label]; !known {
			uniq = append(uniq, label)
		}

		seen[label]++
	}

	shown := uniq
	if len(shown) > maxDisplayLabels {
		shown = shown[:maxDisplayLabels]
	}

	out := make([]string, 0, len(shown)+1)

	for _, name := range shown {
		if n := seen[name]; n > 1 {
			out = append(out, fmt.Sprintf("%dx %s", n, name))
			continue
		}

		out = append(out, name)
	}

	if hidden := len(uniq) - len(shown); hidden > 0 {
		out = append(out, fmt.Sprintf("... +%d more unique", hidden))
	}

	return strings.Join(out, ", ")
}
|
||||
|
||||
func (opts IngestDurableTaskEventOpts) formatCall() string {
|
||||
switch opts.Kind {
|
||||
case sqlcv1.V1DurableEventLogKindRUN:
|
||||
if opts.TriggerRuns != nil {
|
||||
names := make([]string, 0, len(opts.TriggerRuns.TriggerOpts))
|
||||
for _, t := range opts.TriggerRuns.TriggerOpts {
|
||||
names = append(names, t.WorkflowName)
|
||||
}
|
||||
return "run(" + summarizeLabels(names) + ")"
|
||||
}
|
||||
case sqlcv1.V1DurableEventLogKindWAITFOR:
|
||||
if opts.WaitFor != nil {
|
||||
parts := make([]string, 0, len(opts.WaitFor.WaitForConditions))
|
||||
for _, c := range opts.WaitFor.WaitForConditions {
|
||||
parts = append(parts, formatConditionLabel(c))
|
||||
}
|
||||
return "waitFor(" + summarizeLabels(parts) + ")"
|
||||
}
|
||||
case sqlcv1.V1DurableEventLogKindMEMO:
|
||||
return "memo"
|
||||
}
|
||||
|
||||
return string(opts.Kind)
|
||||
}
|
||||
|
||||
func formatStoredPayload(kind sqlcv1.V1DurableEventLogKind, payload []byte) string {
|
||||
if len(payload) == 0 {
|
||||
return string(kind)
|
||||
}
|
||||
|
||||
switch kind {
|
||||
case sqlcv1.V1DurableEventLogKindRUN:
|
||||
var triggerOpts WorkflowNameTriggerOpts
|
||||
|
||||
if err := json.Unmarshal(payload, &triggerOpts); err != nil {
|
||||
return string(kind)
|
||||
}
|
||||
|
||||
if triggerOpts.WorkflowName != "" {
|
||||
return "run(" + triggerOpts.WorkflowName + ")"
|
||||
}
|
||||
case sqlcv1.V1DurableEventLogKindWAITFOR:
|
||||
var conditions []CreateExternalSignalConditionOpt
|
||||
|
||||
if err := json.Unmarshal(payload, &conditions); err != nil {
|
||||
return string(kind)
|
||||
}
|
||||
|
||||
if len(conditions) > 0 {
|
||||
parts := make([]string, 0, len(conditions))
|
||||
for _, c := range conditions {
|
||||
parts = append(parts, formatConditionLabel(c))
|
||||
}
|
||||
return "waitFor(" + summarizeLabels(parts) + ")"
|
||||
}
|
||||
case sqlcv1.V1DurableEventLogKindMEMO:
|
||||
return "memo"
|
||||
}
|
||||
|
||||
return string(kind)
|
||||
}
|
||||
|
||||
func nonDeterminismDetail(opts IngestDurableTaskEventOpts, expectedKind sqlcv1.V1DurableEventLogKind, existingPayload []byte) *NonDeterminismDetail {
|
||||
return &NonDeterminismDetail{
|
||||
Expected: formatStoredPayload(expectedKind, existingPayload),
|
||||
Received: opts.formatCall(),
|
||||
}
|
||||
}
|
||||
|
||||
type GetOrCreateLogEntryOpt struct {
|
||||
Kind sqlcv1.V1DurableEventLogKind
|
||||
IdempotencyKey []byte
|
||||
@@ -477,11 +620,16 @@ func (r *durableEventsRepository) getOrCreateEventLogEntries(
|
||||
|
||||
if !bytes.Equal(o.IdempotencyKey, existingEntry.IdempotencyKey) {
|
||||
return nil, &NonDeterminismError{
|
||||
BranchId: o.BranchId,
|
||||
NodeId: o.NodeId,
|
||||
TaskExternalId: opts.DurableTaskExternalId,
|
||||
ExpectedIdempotencyKey: existingEntry.IdempotencyKey,
|
||||
ActualIdempotencyKey: o.IdempotencyKey,
|
||||
BranchId: o.BranchId,
|
||||
NodeId: o.NodeId,
|
||||
TaskExternalId: opts.DurableTaskExternalId,
|
||||
ExpectedIdempotencyKey: existingEntry.IdempotencyKey,
|
||||
ActualIdempotencyKey: o.IdempotencyKey,
|
||||
ExpectedKind: existingEntry.Kind,
|
||||
ActualKind: o.Kind,
|
||||
ExistingEntryId: existingEntry.ID,
|
||||
ExistingEntryInsertedAt: existingEntry.InsertedAt,
|
||||
ExistingEntryTenantId: existingEntry.TenantID,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -775,6 +923,26 @@ func (r *durableEventsRepository) IngestDurableTaskEvent(ctx context.Context, op
|
||||
|
||||
logEntries, err := r.getOrCreateEventLogEntries(ctx, tx, getOrCreateOpts)
|
||||
if err != nil {
|
||||
var nde *NonDeterminismError
|
||||
if errors.As(err, &nde) {
|
||||
var existingPayload []byte
|
||||
payloads, retrieveErr := r.payloadStore.Retrieve(ctx, tx, RetrievePayloadOpts{
|
||||
Id: nde.ExistingEntryId,
|
||||
InsertedAt: nde.ExistingEntryInsertedAt,
|
||||
Type: sqlcv1.V1PayloadTypeDURABLEEVENTLOGENTRYDATA,
|
||||
TenantId: nde.ExistingEntryTenantId,
|
||||
})
|
||||
if retrieveErr == nil {
|
||||
existingPayload = payloads[RetrievePayloadOpts{
|
||||
Id: nde.ExistingEntryId,
|
||||
InsertedAt: nde.ExistingEntryInsertedAt,
|
||||
Type: sqlcv1.V1PayloadTypeDURABLEEVENTLOGENTRYDATA,
|
||||
TenantId: nde.ExistingEntryTenantId,
|
||||
}]
|
||||
}
|
||||
nde.Detail = nonDeterminismDetail(opts, nde.ExpectedKind, existingPayload)
|
||||
}
|
||||
|
||||
return nil, fmt.Errorf("failed to get or create event log entries: %w", err)
|
||||
}
|
||||
|
||||
|
||||
@@ -3,6 +3,7 @@
|
||||
package repository
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"testing"
|
||||
|
||||
@@ -277,3 +278,208 @@ func TestCreateIdempotencyKey_AdditionalMetadataIgnored(t *testing.T) {
|
||||
|
||||
assert.Equal(t, base, withMeta)
|
||||
}
|
||||
|
||||
func TestNonDeterminismError_SameKind(t *testing.T) {
|
||||
id := uuid.New()
|
||||
err := &NonDeterminismError{
|
||||
NodeId: 3,
|
||||
BranchId: 1,
|
||||
TaskExternalId: id,
|
||||
Detail: &NonDeterminismDetail{
|
||||
Expected: "waitFor(sleep(2s))",
|
||||
Received: "waitFor(sleep(4s))",
|
||||
},
|
||||
}
|
||||
|
||||
assert.Contains(t, err.Error(), id.String())
|
||||
assert.Contains(t, err.Error(), "node 3:1")
|
||||
assert.Contains(t, err.Error(), "expected: waitFor(sleep(2s))")
|
||||
assert.Contains(t, err.Error(), "received: waitFor(sleep(4s))")
|
||||
}
|
||||
|
||||
func TestNonDeterminismError_DifferentKinds(t *testing.T) {
|
||||
id := uuid.New()
|
||||
err := &NonDeterminismError{
|
||||
NodeId: 5,
|
||||
BranchId: 2,
|
||||
TaskExternalId: id,
|
||||
Detail: &NonDeterminismDetail{
|
||||
Expected: "MEMO",
|
||||
Received: "run(my-workflow)",
|
||||
},
|
||||
}
|
||||
|
||||
assert.Contains(t, err.Error(), "expected: MEMO")
|
||||
assert.Contains(t, err.Error(), "received: run(my-workflow)")
|
||||
}
|
||||
|
||||
func TestNonDeterminismError_NoDetail(t *testing.T) {
|
||||
id := uuid.New()
|
||||
err := &NonDeterminismError{
|
||||
NodeId: 1,
|
||||
BranchId: 1,
|
||||
TaskExternalId: id,
|
||||
}
|
||||
|
||||
msg := err.Error()
|
||||
assert.Contains(t, msg, "non-determinism error")
|
||||
assert.NotContains(t, msg, "expected:")
|
||||
assert.NotContains(t, msg, "received:")
|
||||
}
|
||||
|
||||
func TestNonDeterminismError_ImplementsError(t *testing.T) {
|
||||
err := &NonDeterminismError{TaskExternalId: uuid.New()}
|
||||
var target *NonDeterminismError
|
||||
assert.True(t, errors.As(err, &target))
|
||||
}
|
||||
|
||||
func TestFormatCall_Run(t *testing.T) {
|
||||
opts := IngestDurableTaskEventOpts{
|
||||
BaseIngestEventOpts: &BaseIngestEventOpts{Kind: sqlcv1.V1DurableEventLogKindRUN},
|
||||
TriggerRuns: &IngestTriggerRunsOpts{
|
||||
TriggerOpts: []*WorkflowNameTriggerOpts{
|
||||
{TriggerTaskData: &TriggerTaskData{WorkflowName: "wf-a"}},
|
||||
{TriggerTaskData: &TriggerTaskData{WorkflowName: "wf-b"}},
|
||||
},
|
||||
},
|
||||
}
|
||||
assert.Equal(t, "run(wf-a, wf-b)", opts.formatCall())
|
||||
}
|
||||
|
||||
func TestFormatCall_WaitFor(t *testing.T) {
|
||||
opts := IngestDurableTaskEventOpts{
|
||||
BaseIngestEventOpts: &BaseIngestEventOpts{Kind: sqlcv1.V1DurableEventLogKindWAITFOR},
|
||||
WaitFor: &IngestWaitForOpts{
|
||||
WaitForConditions: []CreateExternalSignalConditionOpt{
|
||||
{Kind: CreateExternalSignalConditionKindSLEEP, SleepFor: strPtr("10s")},
|
||||
{Kind: CreateExternalSignalConditionKindUSEREVENT, UserEventKey: strPtr("user:signup")},
|
||||
},
|
||||
},
|
||||
}
|
||||
assert.Equal(t, "waitFor(sleep(10s), waitForEvent(user:signup))", opts.formatCall())
|
||||
}
|
||||
|
||||
func TestFormatCall_Memo(t *testing.T) {
|
||||
opts := IngestDurableTaskEventOpts{
|
||||
BaseIngestEventOpts: &BaseIngestEventOpts{Kind: sqlcv1.V1DurableEventLogKindMEMO},
|
||||
}
|
||||
assert.Equal(t, "memo", opts.formatCall())
|
||||
}
|
||||
|
||||
func TestFormatCall_RunBulkWithDuplicates(t *testing.T) {
|
||||
triggers := make([]*WorkflowNameTriggerOpts, 0, 8)
|
||||
for i := 0; i < 6; i++ {
|
||||
triggers = append(triggers, &WorkflowNameTriggerOpts{
|
||||
TriggerTaskData: &TriggerTaskData{WorkflowName: "wf-a"},
|
||||
})
|
||||
}
|
||||
triggers = append(triggers,
|
||||
&WorkflowNameTriggerOpts{TriggerTaskData: &TriggerTaskData{WorkflowName: "wf-b"}},
|
||||
&WorkflowNameTriggerOpts{TriggerTaskData: &TriggerTaskData{WorkflowName: "wf-c"}},
|
||||
)
|
||||
opts := IngestDurableTaskEventOpts{
|
||||
BaseIngestEventOpts: &BaseIngestEventOpts{Kind: sqlcv1.V1DurableEventLogKindRUN},
|
||||
TriggerRuns: &IngestTriggerRunsOpts{TriggerOpts: triggers},
|
||||
}
|
||||
assert.Equal(t, "run(6x wf-a, wf-b, wf-c)", opts.formatCall())
|
||||
}
|
||||
|
||||
func TestFormatCall_RunBulkExceedsMaxLabels(t *testing.T) {
|
||||
names := []string{"a", "b", "c", "d", "e", "f", "g"}
|
||||
triggers := make([]*WorkflowNameTriggerOpts, len(names))
|
||||
for i, n := range names {
|
||||
triggers[i] = &WorkflowNameTriggerOpts{
|
||||
TriggerTaskData: &TriggerTaskData{WorkflowName: n},
|
||||
}
|
||||
}
|
||||
opts := IngestDurableTaskEventOpts{
|
||||
BaseIngestEventOpts: &BaseIngestEventOpts{Kind: sqlcv1.V1DurableEventLogKindRUN},
|
||||
TriggerRuns: &IngestTriggerRunsOpts{TriggerOpts: triggers},
|
||||
}
|
||||
assert.Equal(t, "run(a, b, c, d, e, ... +2 more unique)", opts.formatCall())
|
||||
}
|
||||
|
||||
func TestFormatCall_WaitForBulkMixed(t *testing.T) {
|
||||
conditions := make([]CreateExternalSignalConditionOpt, 0, 8)
|
||||
for i := 0; i < 4; i++ {
|
||||
conditions = append(conditions, CreateExternalSignalConditionOpt{
|
||||
Kind: CreateExternalSignalConditionKindSLEEP, SleepFor: strPtr("5s"),
|
||||
})
|
||||
}
|
||||
conditions = append(conditions,
|
||||
CreateExternalSignalConditionOpt{Kind: CreateExternalSignalConditionKindUSEREVENT, UserEventKey: strPtr("ev1")},
|
||||
CreateExternalSignalConditionOpt{Kind: CreateExternalSignalConditionKindUSEREVENT, UserEventKey: strPtr("ev2")},
|
||||
CreateExternalSignalConditionOpt{Kind: CreateExternalSignalConditionKindUSEREVENT, UserEventKey: strPtr("ev3")},
|
||||
CreateExternalSignalConditionOpt{Kind: CreateExternalSignalConditionKindUSEREVENT, UserEventKey: strPtr("ev4")},
|
||||
)
|
||||
opts := IngestDurableTaskEventOpts{
|
||||
BaseIngestEventOpts: &BaseIngestEventOpts{Kind: sqlcv1.V1DurableEventLogKindWAITFOR},
|
||||
WaitFor: &IngestWaitForOpts{WaitForConditions: conditions},
|
||||
}
|
||||
assert.Equal(t, "waitFor(4x sleep(5s), waitForEvent(ev1), waitForEvent(ev2), waitForEvent(ev3), waitForEvent(ev4))", opts.formatCall())
|
||||
}
|
||||
|
||||
func TestFormatCall_RunExactlyAtMaxLabels(t *testing.T) {
|
||||
names := []string{"a", "b", "c", "d", "e"}
|
||||
triggers := make([]*WorkflowNameTriggerOpts, len(names))
|
||||
for i, n := range names {
|
||||
triggers[i] = &WorkflowNameTriggerOpts{
|
||||
TriggerTaskData: &TriggerTaskData{WorkflowName: n},
|
||||
}
|
||||
}
|
||||
opts := IngestDurableTaskEventOpts{
|
||||
BaseIngestEventOpts: &BaseIngestEventOpts{Kind: sqlcv1.V1DurableEventLogKindRUN},
|
||||
TriggerRuns: &IngestTriggerRunsOpts{TriggerOpts: triggers},
|
||||
}
|
||||
assert.Equal(t, "run(a, b, c, d, e)", opts.formatCall())
|
||||
}
|
||||
|
||||
func TestFormatStoredPayload_Run(t *testing.T) {
|
||||
payload, _ := json.Marshal(WorkflowNameTriggerOpts{
|
||||
TriggerTaskData: &TriggerTaskData{WorkflowName: "my-workflow"},
|
||||
})
|
||||
assert.Equal(t, "run(my-workflow)", formatStoredPayload(sqlcv1.V1DurableEventLogKindRUN, payload))
|
||||
}
|
||||
|
||||
func TestFormatStoredPayload_WaitFor(t *testing.T) {
|
||||
payload, _ := json.Marshal([]CreateExternalSignalConditionOpt{
|
||||
{Kind: CreateExternalSignalConditionKindSLEEP, SleepFor: strPtr("2s")},
|
||||
})
|
||||
assert.Equal(t, "waitFor(sleep(2s))", formatStoredPayload(sqlcv1.V1DurableEventLogKindWAITFOR, payload))
|
||||
}
|
||||
|
||||
func TestFormatStoredPayload_NoPayload(t *testing.T) {
|
||||
assert.Equal(t, "MEMO", formatStoredPayload(sqlcv1.V1DurableEventLogKindMEMO, nil))
|
||||
assert.Equal(t, "RUN", formatStoredPayload(sqlcv1.V1DurableEventLogKindRUN, nil))
|
||||
}
|
||||
|
||||
func TestNonDeterminismDetail_WithPayload(t *testing.T) {
|
||||
existingPayload, _ := json.Marshal([]CreateExternalSignalConditionOpt{
|
||||
{Kind: CreateExternalSignalConditionKindSLEEP, SleepFor: strPtr("2s")},
|
||||
})
|
||||
opts := IngestDurableTaskEventOpts{
|
||||
BaseIngestEventOpts: &BaseIngestEventOpts{Kind: sqlcv1.V1DurableEventLogKindWAITFOR},
|
||||
WaitFor: &IngestWaitForOpts{
|
||||
WaitForConditions: []CreateExternalSignalConditionOpt{
|
||||
{Kind: CreateExternalSignalConditionKindSLEEP, SleepFor: strPtr("4s")},
|
||||
},
|
||||
},
|
||||
}
|
||||
detail := nonDeterminismDetail(opts, sqlcv1.V1DurableEventLogKindWAITFOR, existingPayload)
|
||||
assert.Equal(t, "waitFor(sleep(2s))", detail.Expected)
|
||||
assert.Equal(t, "waitFor(sleep(4s))", detail.Received)
|
||||
}
|
||||
|
||||
func TestNonDeterminismDetail_KindMismatch(t *testing.T) {
|
||||
opts := IngestDurableTaskEventOpts{
|
||||
BaseIngestEventOpts: &BaseIngestEventOpts{Kind: sqlcv1.V1DurableEventLogKindRUN},
|
||||
TriggerRuns: &IngestTriggerRunsOpts{
|
||||
TriggerOpts: []*WorkflowNameTriggerOpts{
|
||||
{TriggerTaskData: &TriggerTaskData{WorkflowName: "my-wf"}},
|
||||
},
|
||||
},
|
||||
}
|
||||
detail := nonDeterminismDetail(opts, sqlcv1.V1DurableEventLogKindMEMO, nil)
|
||||
assert.Equal(t, "MEMO", detail.Expected)
|
||||
assert.Equal(t, "run(my-wf)", detail.Received)
|
||||
}
|
||||
|
||||
@@ -228,6 +228,8 @@ class DurableEventListener:
|
||||
self._running = False
|
||||
self._buffered_completions.stop_eviction_job()
|
||||
|
||||
self._fail_all_pending(Exception("DurableListener stopped"))
|
||||
|
||||
if self._receive_task:
|
||||
self._receive_task.cancel()
|
||||
with suppress(asyncio.CancelledError):
|
||||
@@ -290,6 +292,15 @@ class DurableEventListener:
|
||||
eviction_future.set_exception(exc)
|
||||
self._pending_eviction_acks.clear()
|
||||
|
||||
def _fail_all_pending(self, exc: Exception) -> None:
|
||||
self._fail_pending_acks(exc)
|
||||
|
||||
for future in self._pending_callbacks.values():
|
||||
if not future.done():
|
||||
future.set_exception(exc)
|
||||
self._pending_callbacks.clear()
|
||||
self._buffered_completions.clear()
|
||||
|
||||
async def _receive_loop(self) -> None:
|
||||
while self._running:
|
||||
if not self._stream:
|
||||
|
||||
@@ -14,8 +14,14 @@ class NonDeterminismError(Exception):
|
||||
self.message = message
|
||||
self.node_id = node_id
|
||||
|
||||
detail = (
|
||||
message
|
||||
if message
|
||||
else f"Non-determinism detected in task {task_external_id} on invocation {invocation_count} at node {node_id}"
|
||||
)
|
||||
|
||||
super().__init__(
|
||||
f"Non-determinism detected in task {task_external_id} on invocation {invocation_count} at node {node_id}.\nCheck out our documentation for more details on expectations of durable tasks: https://docs.hatchet.run/v1/patterns/mixing-patterns"
|
||||
f"{detail}\nCheck out our documentation for more details on expectations of durable tasks: https://docs.hatchet.run/v1/patterns/mixing-patterns"
|
||||
)
|
||||
|
||||
|
||||
|
||||
@@ -57,6 +57,9 @@ class TTLCache(Generic[K, V]):
|
||||
def pop(self, key: K) -> V:
    """Remove ``key`` from the underlying cache and return the removed entry's ``value``."""
    return self.cache.pop(key).value
|
||||
|
||||
def clear(self) -> None:
    """Drop every entry by calling ``clear()`` on the underlying cache."""
    self.cache.clear()
|
||||
|
||||
def stop_eviction_job(self) -> None:
    """Cancel the eviction job by calling ``cancel()`` on ``self.eviction_job``."""
    self.eviction_job.cancel()
|
||||
|
||||
|
||||
@@ -189,9 +189,6 @@ class DurableEvictionManager:
|
||||
evicted = 0
|
||||
|
||||
for rec in waiting:
|
||||
if rec.eviction_policy is None:
|
||||
continue
|
||||
|
||||
rec.eviction_reason = _build_eviction_reason(
|
||||
EvictionCause.WORKER_SHUTDOWN, rec
|
||||
)
|
||||
@@ -209,8 +206,9 @@ class DurableEvictionManager:
|
||||
f"DurableEvictionManager: failed to send eviction for "
|
||||
f"step_run_id={rec.step_run_id}"
|
||||
)
|
||||
continue
|
||||
|
||||
# Always cancel locally even if the server ACK failed, so the
|
||||
# future settles and exit_gracefully doesn't hang.
|
||||
self._evict_run(rec.key)
|
||||
evicted += 1
|
||||
|
||||
|
||||
@@ -246,6 +246,10 @@ async def test_pending_callbacks_survive_disconnect(
|
||||
await harness.start()
|
||||
|
||||
future: asyncio.Future[object] = asyncio.get_event_loop().create_future()
|
||||
# Swallow the exception that stop() will set during teardown
|
||||
future.add_done_callback(
|
||||
lambda f: f.exception() if f.done() and not f.cancelled() else None
|
||||
)
|
||||
harness.listener._pending_callbacks[("task1", 1, 0, 1)] = future # type: ignore[assignment]
|
||||
|
||||
await asyncio.sleep(0.15)
|
||||
|
||||
+55
@@ -282,6 +282,8 @@ describe('DurableListenerClient reconnection', () => {
|
||||
it('preserves pending callbacks (server-side state survives reconnection)', () => {
|
||||
const l = listener as any;
|
||||
const d = makeDeferred();
|
||||
// Swallow the rejection that stop() will produce in afterEach
|
||||
d.promise.catch(() => {});
|
||||
l._pendingCallbacks.set('task:1:0:1', d);
|
||||
|
||||
l._failPendingAcks(new Error('disconnected'));
|
||||
@@ -317,6 +319,57 @@ describe('DurableListenerClient reconnection', () => {
|
||||
});
|
||||
});
|
||||
|
||||
// ── _failAllPending correctness (used on stop) ──
|
||||
|
||||
// Exercises the private _failAllPending helper directly: pending callbacks,
// buffered completions and both ack maps must all be rejected/cleared.
describe('_failAllPending', () => {
  // Start the listener against a stream that never delivers anything so the
  // internal maps can be populated by hand.
  beforeEach(async () => {
    const h = tracked(hangingStream());
    grpcClient.durableTask.mockReturnValue(h.stream);
    await listener.start('w1');
  });

  it('rejects pending callbacks and clears the map', () => {
    const l = listener as any;
    const d = makeDeferred();
    l._pendingCallbacks.set('task:1:0:1', d);

    l._failAllPending(new Error('stopped'));

    expect(l._pendingCallbacks.size).toBe(0);
    // Returning the promise lets jest wait for the rejection assertion.
    return expect(d.promise).rejects.toThrow('stopped');
  });

  it('clears buffered completions', () => {
    const l = listener as any;
    l._bufferedCompletions.set('task:1:0:1', {
      durableTaskExternalId: 'task',
      nodeId: 1,
      payload: {},
    });

    l._failAllPending(new Error('stopped'));

    expect(l._bufferedCompletions.size).toBe(0);
  });

  it('also rejects pending event acks and eviction acks', () => {
    const l = listener as any;
    const ackD = makeDeferred();
    const evD = makeDeferred();
    l._pendingEventAcks.set('task:1', ackD);
    l._pendingEvictionAcks.set('task:1', evD);

    l._failAllPending(new Error('stopped'));

    expect(l._pendingEventAcks.size).toBe(0);
    expect(l._pendingEvictionAcks.size).toBe(0);
    // Both deferreds must reject with the same error.
    return Promise.all([
      expect(ackD.promise).rejects.toThrow('stopped'),
      expect(evD.promise).rejects.toThrow('stopped'),
    ]);
  });
});
|
||||
|
||||
// ── pending state rejected on stream disconnect ──
|
||||
|
||||
describe('pending state is rejected on stream disconnect', () => {
|
||||
@@ -348,6 +401,8 @@ describe('DurableListenerClient reconnection', () => {
|
||||
await settle();
|
||||
|
||||
const d = makeDeferred();
|
||||
// Swallow the rejection that stop() will produce in afterEach
|
||||
d.promise.catch(() => {});
|
||||
(listener as any)._pendingCallbacks.set('task:1:0:1', d);
|
||||
|
||||
ctrl.end();
|
||||
|
||||
@@ -76,6 +76,10 @@ class TTLMap<K, V> {
|
||||
return undefined;
|
||||
}
|
||||
|
||||
/** Removes every entry by clearing the underlying cache. */
clear(): void {
  this.cache.clear();
}
|
||||
|
||||
destroy(): void {
|
||||
clearInterval(this.timer);
|
||||
this.cache.clear();
|
||||
@@ -402,6 +406,16 @@ export class DurableListenerClient {
|
||||
this._pendingEvictionAcks.clear();
|
||||
}
|
||||
|
||||
/**
 * Rejects every outstanding wait with `exc` and drops all buffered state.
 *
 * Delegates to `_failPendingAcks` first, then rejects each pending callback
 * and empties both the pending-callback map and the buffered completions.
 */
private _failAllPending(exc: Error): void {
  this._failPendingAcks(exc);

  this._pendingCallbacks.forEach((deferred) => {
    deferred.reject(exc);
  });

  this._pendingCallbacks.clear();
  this._bufferedCompletions.clear();
}
|
||||
|
||||
private _handleResponse(response: DurableTaskResponse): void {
|
||||
if (response.registerWorker) {
|
||||
// registration acknowledged
|
||||
|
||||
@@ -6,8 +6,11 @@ export class NonDeterminismError extends HatchetError {
|
||||
nodeId: number;
|
||||
|
||||
constructor(taskExternalId: string, invocationCount: number, nodeId: number, message: string) {
|
||||
const detail = message
|
||||
? message
|
||||
: `Non-determinism detected in task ${taskExternalId} on invocation ${invocationCount} at node ${nodeId}`;
|
||||
super(
|
||||
`Non-determinism detected in task ${taskExternalId} on invocation ${invocationCount} at node ${nodeId}: ${message}\n` +
|
||||
`${detail}\n` +
|
||||
`Check out our documentation for more details on expectations of durable tasks: https://docs.hatchet.run/v1/patterns/mixing-patterns`
|
||||
);
|
||||
this.name = 'NonDeterminismError';
|
||||
|
||||
@@ -0,0 +1,82 @@
|
||||
import { durationToString, durationToMs, Duration } from './duration';
|
||||
|
||||
// Covers the three accepted input shapes of durationToString: string,
// DurationObject and a number of milliseconds.
describe('durationToString', () => {
  it('passes through a duration string as-is', () => {
    expect(durationToString('1h30m5s')).toBe('1h30m5s');
    expect(durationToString('10m')).toBe('10m');
    expect(durationToString('30s')).toBe('30s');
  });

  it('converts a DurationObject to a string', () => {
    expect(durationToString({ hours: 1, minutes: 30, seconds: 5 })).toBe('1h30m5s');
    expect(durationToString({ minutes: 10 })).toBe('10m');
    expect(durationToString({ seconds: 45 })).toBe('45s');
    expect(durationToString({ hours: 2 })).toBe('2h');
  });

  it('returns "0s" for an empty DurationObject', () => {
    expect(durationToString({})).toBe('0s');
  });

  it('converts milliseconds to a string', () => {
    expect(durationToString(0)).toBe('0s');
    expect(durationToString(5000)).toBe('5s');
    expect(durationToString(60_000)).toBe('1m');
    expect(durationToString(3_600_000)).toBe('1h');
    expect(durationToString(5_405_000)).toBe('1h30m5s');
  });

  // Sub-second precision is dropped, not rounded: 999 ms becomes "0s".
  it('truncates sub-second remainders from milliseconds', () => {
    expect(durationToString(1500)).toBe('1s');
    expect(durationToString(999)).toBe('0s');
  });
});
|
||||
|
||||
// Covers durationToMs for strings, DurationObjects, raw millisecond numbers
// and the error path for an unparseable string.
describe('durationToMs', () => {
  it('parses a seconds-only string', () => {
    expect(durationToMs('30s')).toBe(30_000);
  });

  it('parses a minutes-only string', () => {
    expect(durationToMs('10m')).toBe(600_000);
  });

  it('parses an hours-only string', () => {
    expect(durationToMs('2h')).toBe(7_200_000);
  });

  it('parses a multi-unit string', () => {
    expect(durationToMs('1h30m5s')).toBe(5_405_000);
  });

  it('converts a DurationObject', () => {
    expect(durationToMs({ hours: 1, minutes: 30, seconds: 5 })).toBe(5_405_000);
    expect(durationToMs({ seconds: 10 })).toBe(10_000);
    expect(durationToMs({})).toBe(0);
  });

  it('returns a number (milliseconds) as-is', () => {
    expect(durationToMs(42_000)).toBe(42_000);
    expect(durationToMs(0)).toBe(0);
  });

  // The cast bypasses the compile-time Duration check to reach the runtime
  // validation.
  it('throws on an invalid string', () => {
    expect(() => durationToMs('bad' as Duration)).toThrow(/Invalid duration string/);
  });
});
|
||||
|
||||
// Each case is [input, expected milliseconds, expected string after
// converting those milliseconds back with durationToString].
describe('round-trip: durationToMs → durationToString', () => {
  const cases: [Duration, number, string][] = [
    ['1h30m5s', 5_405_000, '1h30m5s'],
    ['10m', 600_000, '10m'],
    ['30s', 30_000, '30s'],
    [{ hours: 2, minutes: 15 }, 8_100_000, '2h15m'],
    [60_000, 60_000, '1m'],
  ];

  it.each(cases)('input %j → %d ms → "%s"', (input, expectedMs, expectedStr) => {
    const ms = durationToMs(input);
    expect(ms).toBe(expectedMs);
    expect(durationToString(ms)).toBe(expectedStr);
  });
});
|
||||
@@ -17,13 +17,25 @@ export interface DurationObject {
|
||||
seconds?: number;
|
||||
}
|
||||
|
||||
/**
 * A duration given as a Go-style string (e.g. "1h30m5s"), a DurationObject,
 * or a number. A number is treated as milliseconds.
 */
export type Duration = DurationString | DurationObject | number;
|
||||
|
||||
const DURATION_RE = /^(?:(\d+)h)?(?:(\d+)m)?(?:(\d+)s)?$/;
|
||||
|
||||
/** Normalizes a Duration to Go-style string format (e.g. "1h30m5s"). */
|
||||
export function durationToString(d: Duration): string {
|
||||
if (typeof d === 'string') return d;
|
||||
if (typeof d === 'number') {
|
||||
const totalSeconds = Math.floor(d / 1000);
|
||||
const h = Math.floor(totalSeconds / 3600);
|
||||
const m = Math.floor((totalSeconds % 3600) / 60);
|
||||
const s = totalSeconds % 60;
|
||||
let out = '';
|
||||
if (h) out += `${h}h`;
|
||||
if (m) out += `${m}m`;
|
||||
if (s || !out) out += `${s}s`;
|
||||
return out;
|
||||
}
|
||||
let s = '';
|
||||
if (d.hours) s += `${d.hours}h`;
|
||||
if (d.minutes) s += `${d.minutes}m`;
|
||||
@@ -32,6 +44,7 @@ export function durationToString(d: Duration): string {
|
||||
}
|
||||
|
||||
export function durationToMs(d: Duration): number {
|
||||
if (typeof d === 'number') return d;
|
||||
if (typeof d === 'object') {
|
||||
return ((d.hours ?? 0) * 3600 + (d.minutes ?? 0) * 60 + (d.seconds ?? 0)) * 1000;
|
||||
}
|
||||
|
||||
@@ -153,8 +153,6 @@ export class DurableEvictionManager {
|
||||
let evicted = 0;
|
||||
|
||||
for (const rec of waiting) {
|
||||
if (!rec.evictionPolicy) continue;
|
||||
|
||||
rec.evictionReason = buildEvictionReason(EvictionCause.WORKER_SHUTDOWN, rec);
|
||||
|
||||
this._logger.debug(
|
||||
@@ -169,9 +167,11 @@ export class DurableEvictionManager {
|
||||
`DurableEvictionManager: failed to send eviction for ` +
|
||||
`task_run_external_id=${rec.taskRunExternalId}: ${getErrorMessage(err)}`
|
||||
);
|
||||
continue;
|
||||
}
|
||||
|
||||
// Always cancel locally even if the server ACK failed, so the
|
||||
// future settles and exitGracefully doesn't hang.
|
||||
// This will get resolved by the reassignment of the task.
|
||||
this._evictRun(rec.key);
|
||||
evicted++;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user