Hotfix: Handle EOF Properly (#1557)

* fix: handle EOF properly

* chore: version

* fix: debug logs

* fix: rm eof type
This commit is contained in:
Matt Kaye
2025-04-15 13:53:13 -04:00
committed by GitHub
parent 1dcb50c170
commit 3ee5a1ef88
4 changed files with 17 additions and 13 deletions

View File

@@ -7,7 +7,6 @@ from typing import Any, AsyncGenerator, cast
import grpc
import grpc.aio
from grpc._cython import cygrpc # type: ignore[attr-defined]
from pydantic import BaseModel, ConfigDict, Field, field_validator, model_validator
from hatchet_sdk.clients.event_ts import ThreadSafeEvent, read_with_interrupt
@@ -267,7 +266,6 @@ class ActionListener:
await self.interrupt.wait()
if not t.done():
# print a warning
logger.warning(
"Interrupted read_with_interrupt task of action listener"
)
@@ -277,9 +275,10 @@ class ActionListener:
break
assigned_action, _ = t.result()
assigned_action, _, is_eof = t.result()
if assigned_action is cygrpc.EOF:
if is_eof:
logger.debug("Handling EOF in Action Listener")
self.retries = self.retries + 1
break

View File

@@ -4,6 +4,8 @@ from typing import Callable, TypeVar, cast, overload
import grpc.aio
from grpc._cython import cygrpc # type: ignore[attr-defined]
from hatchet_sdk.logger import logger
class ThreadSafeEvent(asyncio.Event):
"""
@@ -32,7 +34,7 @@ async def read_with_interrupt(
listener: grpc.aio.UnaryStreamCall[TRequest, TResponse],
interrupt: ThreadSafeEvent,
key_generator: Callable[[TResponse], str],
) -> tuple[TResponse, str]: ...
) -> tuple[TResponse, str, bool]: ...
@overload
@@ -40,22 +42,23 @@ async def read_with_interrupt(
listener: grpc.aio.UnaryStreamCall[TRequest, TResponse],
interrupt: ThreadSafeEvent,
key_generator: None = None,
) -> tuple[TResponse, None]: ...
) -> tuple[TResponse, None, bool]: ...
async def read_with_interrupt(
listener: grpc.aio.UnaryStreamCall[TRequest, TResponse],
interrupt: ThreadSafeEvent,
key_generator: Callable[[TResponse], str] | None = None,
) -> tuple[TResponse, str | None]:
) -> tuple[TResponse, str | None, bool]:
try:
result = cast(TResponse, await listener.read())
if result is cygrpc.EOF:
raise ValueError("Unexpected EOF")
logger.warning("Received EOF from engine")
return cast(TResponse, None), None, True
key = key_generator(result) if key_generator else None
return result, key
return result, key, False
finally:
interrupt.set()

View File

@@ -5,7 +5,6 @@ from typing import Generic, Literal, TypeVar
import grpc
import grpc.aio
from grpc._cython import cygrpc # type: ignore[attr-defined]
from hatchet_sdk.clients.event_ts import ThreadSafeEvent, read_with_interrupt
from hatchet_sdk.config import ClientConfig
@@ -131,9 +130,12 @@ class PooledListener(Generic[R, T, L], ABC):
await asyncio.sleep(DEFAULT_LISTENER_RETRY_INTERVAL)
break
event, key = t.result()
event, key, is_eof = t.result()
if event is cygrpc.EOF:
if is_eof:
logger.debug(
f"Handling EOF in Pooled Listener {self.__class__.__name__}"
)
break
subscriptions = self.to_subscriptions.get(key, [])

View File

@@ -1,6 +1,6 @@
[tool.poetry]
name = "hatchet-sdk"
version = "1.6.0"
version = "1.6.1"
description = ""
authors = ["Alexander Belanger <alexander@hatchet.run>"]
readme = "README.md"