Files
hatchet/sdks/python/hatchet_sdk/connection.py
T
Matt Kaye 2f33dd4dbd Feat: Misc. Python improvements + Streaming Improvements (#1846)
* fix: contextvars explicit copy

* feat: fix a ton of ruff errors

* fix: couple more ruff rules

* fix: ignore unhelpful rule

* fix: exception group in newer Python versions for improved handling

* fix: workflow docs

* feat: context docs

* feat: simple task counter

* feat: config for setting max tasks

* feat: graceful exit once worker exceeds max tasks

* fix: optional

* fix: docs

* fix: events docs + gen

* chore: gen

* fix: one more dangling task

* feat: add xdist in ci

* fix: CI

* fix: xdist fails me once again

* fix: fix + extend some tests

* fix: test cleanup

* fix: exception group

* fix: ugh

* feat: changelog

* Add Ruff linter callout to post

* refactor: clean up runner error handling

* feat: improved errors

* fix: lint

* feat: hacky serde impl

* fix: improve serde + formatting

* fix: logging

* fix: lint

* fix: unexpected errors

* fix: naming, ruff

* fix: rm cruft

* Fix: Attempt to fix namespacing issue in event waits (#1885)

* feat: add xdist in ci

* fix: attempt to fix namespacing issue in event waits

* fix: namespaced worker names

* fix: applied namespace to the wrong thing

* fix: rm hack

* drive by: namespacing improvement

* fix: delay

* fix: changelog

* fix: initial log work

* fix: more logging work

* fix: rm print cruft

* feat: use a queue to send logs

* fix: sentinel value to stop the loop

* fix: use the log sender everywhere

* fix: make streaming blocking, remove more thread pools

* feat: changelog

* fix: linting issues

* fix: broken test

* chore: bunch more generated stuff

* fix: changelog

* fix: one more

* fix: mypy

* chore: gen

* Feat: Streaming Improvements (#1886)

* Fix: Filter list improvements (#1899)

* fix: uuid validation

* fix: improve filter filtering

* fix: inner join

* fix: bug in workflow cached prop

* chore: bump

* fix: lint

* chore: changelog

* fix: separate filter queries

* feat: improve filter filtering

* fix: queries and the like

* feat: add xdist in ci

* feat: streaming test + gen

* feat: add index to stream event

* fix: rm langfuse dep

* fix: lf

* chore: gen

* feat: impl index for stream on context

* feat: tweak protos

* feat: extend test

* feat: send event index through queue

* feat: first pass + debug logging

* debug: fixes

* debug: more possible issues

* feat: generate new stream event protos

* feat: first pass at using an alternate exchange for replaying incoming stream events

* fix: exchange create timing

* fix: rm unused protos

* chore: gen

* feat: python cleanup

* fix: revert rabbit changes

* fix: unwind a bunch of cruft

* fix: optional index

* chore: gen python

* fix: event index nil handling

* feat: improve test

* fix: stream impl in sdk

* fix: make test faster

* chore: gen a ton more stuff

* fix: test

* fix: sorting helper

* fix: bug

* fix: one more ordering bug

* feat: add some tests for buffering logic

* feat: hangup test

* feat: test no buffering if no index sent

* fix: regular mutex

* fix: pr feedback

* fix: conflicts
2025-06-25 10:11:01 -04:00

85 lines
2.6 KiB
Python

import os
from typing import Literal, cast, overload
import grpc
from hatchet_sdk.config import ClientConfig
def _read_bytes(path: str) -> bytes:
    """Return the full binary contents of the file at ``path``."""
    with open(path, "rb") as f:
        return f.read()


def _load_channel_credentials(config: ClientConfig) -> grpc.ChannelCredentials | None:
    """Build channel credentials for ``config.tls_config.strategy``.

    Returns:
        SSL credentials for the ``"tls"`` and ``"mtls"`` strategies, or
        ``None`` for any other strategy (no credentials are built).

    Raises:
        ValueError: if the strategy is ``"mtls"`` and any of ``root_ca_file``,
            ``key_file`` or ``cert_file`` is unset.
        OSError: if a configured certificate/key file cannot be read.
    """
    tls = config.tls_config

    if tls.strategy == "tls":
        # The root CA is optional here; ``None`` is passed through to gRPC
        # when no file is configured.
        root = _read_bytes(tls.root_ca_file) if tls.root_ca_file else None
        return grpc.ssl_channel_credentials(root_certificates=root)

    if tls.strategy == "mtls":
        # Explicit check instead of ``assert``: assertions are stripped under
        # ``python -O``, which would turn a config mistake into an opaque
        # ``open(None)`` failure.
        if not (tls.root_ca_file and tls.key_file and tls.cert_file):
            raise ValueError(
                "mTLS requires root_ca_file, key_file and cert_file "
                "to be set in the TLS config"
            )

        return grpc.ssl_channel_credentials(
            root_certificates=_read_bytes(tls.root_ca_file),
            private_key=_read_bytes(tls.key_file),
            certificate_chain=_read_bytes(tls.cert_file),
        )

    return None


@overload
def new_conn(config: ClientConfig, aio: Literal[False]) -> grpc.Channel: ...


@overload
def new_conn(config: ClientConfig, aio: Literal[True]) -> grpc.aio.Channel: ...


def new_conn(config: ClientConfig, aio: bool) -> grpc.Channel | grpc.aio.Channel:
    """Create a gRPC channel to ``config.host_port``.

    Args:
        config: Client configuration; supplies the target address, the TLS
            settings and the max send/receive message sizes.
        aio: When true the channel is created through ``grpc.aio``,
            otherwise through the synchronous ``grpc`` module.

    Returns:
        An insecure channel when the TLS strategy is ``"none"``, otherwise a
        secure channel using credentials built from the TLS config.

    Raises:
        ValueError: if the mTLS file paths are incomplete, or if the TLS
            strategy is not ``"none"`` and no credentials could be built for it.
        OSError: if a configured certificate/key file cannot be read.

    Side effects:
        Sets ``GRPC_ENABLE_FORK_SUPPORT=False`` in ``os.environ`` for the
        whole process.
    """
    credentials = _load_channel_credentials(config)

    start = grpc if not aio else grpc.aio

    channel_options: list[tuple[str, str | int]] = [
        ("grpc.max_send_message_length", config.grpc_max_send_message_length),
        ("grpc.max_receive_message_length", config.grpc_max_recv_message_length),
        # Keepalive / idle tuning; values are milliseconds per the option names.
        ("grpc.keepalive_time_ms", 10 * 1000),
        ("grpc.keepalive_timeout_ms", 60 * 1000),
        ("grpc.client_idle_timeout_ms", 60 * 1000),
        ("grpc.http2.max_pings_without_data", 0),
        ("grpc.keepalive_permit_without_calls", 1),
    ]

    # Set environment variable to disable fork support. Reference: https://github.com/grpc/grpc/issues/28557
    # When steps execute via os.fork, we see `TSI_DATA_CORRUPTED` errors.
    os.environ["GRPC_ENABLE_FORK_SUPPORT"] = "False"

    if config.tls_config.strategy == "none":
        conn = start.insecure_channel(
            target=config.host_port,
            options=channel_options,
        )
    else:
        if credentials is None:
            # Strategy was neither "none", "tls" nor "mtls", so nothing was
            # loaded; fail clearly rather than hand ``None`` to gRPC.
            raise ValueError(
                f"Unsupported TLS strategy: {config.tls_config.strategy!r}"
            )

        channel_options.append(
            ("grpc.ssl_target_name_override", config.tls_config.server_name)
        )

        conn = start.secure_channel(
            target=config.host_port,
            credentials=credentials,
            options=channel_options,
        )

    return cast(
        grpc.Channel | grpc.aio.Channel,
        conn,
    )