hatchet/sdks/python/hatchet_sdk/connection.py
Sid Premkumar 709dd89a18 Add gzip compression (#2539)
* Add gzip compression init

* revert

* Feat: Initial cross-domain identify setup (#2533)

* feat: initial setup

* fix: factor out

* chore: lint

* fix: xss vuln

* feat: set up properly

* fix: lint

* fix: key

* fix: keys, cleanup

* Fix: use sessionStorage instead of localStorage (#2541)

* chore(deps): bump golang.org/x/crypto from 0.44.0 to 0.45.0 (#2545)

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>

* chore(deps): bump google/osv-scanner-action/.github/workflows/osv-scanner-reusable-pr.yml (#2547)

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>

* [Go SDK] Resubscribe and get a new listener stream when gRPC connections fail (#2544)

* fix listener cache issue to resubscribe when erroring out

* worker retry message clarification (#2543)

* add another retry layer and add comments

* fix loop logic

* make listener channel retry

* Compression test utils, and add log to indicate its enabled

* clean + fix

* more fallbacks

* common pgxpool afterconnect method (#2553)

* remove

* lint

* lint

* add cpu monitor during test

* fix background monitor and lint

* Make envvar to disable compression

* cleanup monitoring

* PR Feedback

* Update paths in compression tests + bump package versions

* path issue on test script

---------

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: matt <mrkaye97@gmail.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: Mohammed Nafees <hello@mnafees.me>
2025-11-26 17:14:38 -05:00

import os
from typing import Literal, cast, overload

import grpc

from hatchet_sdk.config import ClientConfig


@overload
def new_conn(config: ClientConfig, aio: Literal[False]) -> grpc.Channel: ...


@overload
def new_conn(config: ClientConfig, aio: Literal[True]) -> grpc.aio.Channel: ...


def new_conn(config: ClientConfig, aio: bool) -> grpc.Channel | grpc.aio.Channel:
    credentials: grpc.ChannelCredentials | None = None

    # load channel credentials
    if config.tls_config.strategy == "tls":
        root: bytes | None = None

        if config.tls_config.root_ca_file:
            with open(config.tls_config.root_ca_file, "rb") as f:
                root = f.read()

        credentials = grpc.ssl_channel_credentials(root_certificates=root)
    elif config.tls_config.strategy == "mtls":
        assert config.tls_config.root_ca_file
        assert config.tls_config.key_file
        assert config.tls_config.cert_file

        with open(config.tls_config.root_ca_file, "rb") as f:
            root = f.read()
        with open(config.tls_config.key_file, "rb") as f:
            private_key = f.read()
        with open(config.tls_config.cert_file, "rb") as f:
            certificate_chain = f.read()

        credentials = grpc.ssl_channel_credentials(
            root_certificates=root,
            private_key=private_key,
            certificate_chain=certificate_chain,
        )

    start = grpc if not aio else grpc.aio

    channel_options: list[tuple[str, str | int]] = [
        ("grpc.max_send_message_length", config.grpc_max_send_message_length),
        ("grpc.max_receive_message_length", config.grpc_max_recv_message_length),
        ("grpc.keepalive_time_ms", 10 * 1000),
        ("grpc.keepalive_timeout_ms", 60 * 1000),
        ("grpc.client_idle_timeout_ms", 60 * 1000),
        ("grpc.http2.max_pings_without_data", 0),
        ("grpc.keepalive_permit_without_calls", 1),
        ("grpc.default_compression_algorithm", grpc.Compression.Gzip),
    ]

    # Set environment variable to disable fork support. Reference: https://github.com/grpc/grpc/issues/28557
    # When steps execute via os.fork, we see `TSI_DATA_CORRUPTED` errors.
    # needs to be the string "True" or "False"
    os.environ["GRPC_ENABLE_FORK_SUPPORT"] = str(config.grpc_enable_fork_support)

    if config.grpc_enable_fork_support:
        # See discussion: https://github.com/hatchet-dev/hatchet/pull/2057#discussion_r2243233357
        os.environ["GRPC_POLL_STRATEGY"] = "poll"

    if config.tls_config.strategy == "none":
        conn = start.insecure_channel(
            target=config.host_port,
            options=channel_options,
        )
    else:
        channel_options.append(
            ("grpc.ssl_target_name_override", config.tls_config.server_name)
        )

        conn = start.secure_channel(
            target=config.host_port,
            credentials=credentials,
            options=channel_options,
        )

    return cast(
        grpc.Channel | grpc.aio.Channel,
        conn,
    )
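
The two `@overload` signatures on `new_conn` let a type checker infer the return type from the literal value of `aio`. Below is a minimal, standard-library-only sketch of that pattern. `SyncChannel`, `AsyncChannel`, and `make_channel` are hypothetical stand-ins introduced for illustration; they are not part of `hatchet_sdk` or `grpc`.

```python
from __future__ import annotations

from typing import Literal, overload


class SyncChannel:
    """Hypothetical stand-in for grpc.Channel."""


class AsyncChannel:
    """Hypothetical stand-in for grpc.aio.Channel."""


@overload
def make_channel(aio: Literal[False]) -> SyncChannel: ...
@overload
def make_channel(aio: Literal[True]) -> AsyncChannel: ...


def make_channel(aio: bool) -> SyncChannel | AsyncChannel:
    # Mirrors the idea behind `start = grpc if not aio else grpc.aio`:
    # one boolean flag selects the sync or the async implementation.
    return AsyncChannel() if aio else SyncChannel()


# With the overloads in place, a type checker can narrow these to
# SyncChannel and AsyncChannel without an explicit cast at the call site.
sync_chan = make_channel(False)
async_chan = make_channel(True)
```

The overloads only affect static typing. At runtime the single implementation runs, which is presumably why `new_conn` still ends with a `cast` to the union type.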