hatchet/hack/dev/compression-test/tests/python_test.py
Sid Premkumar · 709dd89a18 · Add gzip compression (#2539)
* Add gzip compression init

* revert

* Feat: Initial cross-domain identify setup (#2533)

* feat: initial setup

* fix: factor out

* chore: lint

* fix: xss vuln

* feat: set up properly

* fix: lint

* fix: key

* fix: keys, cleanup

* Fix: use sessionStorage instead of localStorage (#2541)

* chore(deps): bump golang.org/x/crypto from 0.44.0 to 0.45.0 (#2545)

Bumps [golang.org/x/crypto](https://github.com/golang/crypto) from 0.44.0 to 0.45.0.
- [Commits](https://github.com/golang/crypto/compare/v0.44.0...v0.45.0)

---
updated-dependencies:
- dependency-name: golang.org/x/crypto
  dependency-version: 0.45.0
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>

* chore(deps): bump google/osv-scanner-action/.github/workflows/osv-scanner-reusable-pr.yml (#2547)

Bumps [google/osv-scanner-action/.github/workflows/osv-scanner-reusable-pr.yml](https://github.com/google/osv-scanner-action) from 2.2.4 to 2.3.0.
- [Release notes](https://github.com/google/osv-scanner-action/releases)
- [Commits](https://github.com/google/osv-scanner-action/compare/v2.2.4...v2.3.0)

---
updated-dependencies:
- dependency-name: google/osv-scanner-action/.github/workflows/osv-scanner-reusable-pr.yml
  dependency-version: 2.3.0
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>

* [Go SDK] Resubscribe and get a new listener stream when gRPC connections fail (#2544)

* fix listener cache issue so the client resubscribes when a stream errors out (a Python sketch of the pattern follows the commit details below)

* worker retry message clarification (#2543)

* add another retry layer and add comments

* fix loop logic

* make listener channel retry

* Add compression test utils, and a log line to indicate it's enabled

* clean + fix

* more fallbacks

* common pgxpool afterconnect method (#2553)

* remove

* lint

* lint

* add cpu monitor during test

* fix background monitor and lint

* Add an env var to disable compression (see the gzip sketch below)

* cleanup monitoring

* PR Feedback

* Update paths in compression tests + bump package versions

* Fix path issue in the test script

---------

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: matt <mrkaye97@gmail.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: Mohammed Nafees <hello@mnafees.me>
2025-11-26 17:14:38 -05:00
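
One of the bundled fixes above changes how the Go SDK recovers when a listener's gRPC stream dies: instead of reusing a cached stream, the client subscribes again from scratch, with a retry layer on the listener channel. A minimal sketch of that pattern, written in Python to match the file below (the subscribe callable is hypothetical, and this is not the SDK's actual code):

# Illustrative resubscribe loop in the spirit of the Go SDK fix described
# above (not the SDK's actual code): when a listener stream dies, drop it
# and subscribe again with backoff instead of reusing the cached stream.
import time
from typing import Callable, Iterable, Iterator


def listen_with_resubscribe(
    subscribe: Callable[[], Iterable[dict]],  # hypothetical: opens a fresh stream
    max_backoff: float = 10.0,
) -> Iterator[dict]:
    backoff = 0.5
    while True:
        try:
            for event in subscribe():  # always a new stream, never a cached one
                backoff = 0.5          # a healthy stream resets the backoff
                yield event
        except ConnectionError as exc:  # stand-in for a gRPC stream error
            print(f"Listener stream failed ({exc}); resubscribing in {backoff:.1f}s")
            time.sleep(backoff)
            backoff = min(backoff * 2, max_backoff)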

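For the headline change itself: gRPC negotiates compression per channel, and in Python's grpcio a client opts in when the channel is created. A minimal sketch of that mechanism (illustrative only, not Hatchet's actual client wiring; the address is a placeholder, and the env-var toggle simply mirrors the COMPRESSION_STATE variable read by the test below):

import os

import grpc

# Pick gzip or no compression based on an env var; COMPRESSION_STATE mirrors
# the variable the test script reads, but wiring it into the channel like
# this is an illustration, not Hatchet's actual client code.
compression = (
    grpc.Compression.Gzip
    if os.getenv("COMPRESSION_STATE", "enabled") == "enabled"
    else grpc.Compression.NoCompression
)

# Every RPC on this channel now uses the selected compression by default.
channel = grpc.insecure_channel("localhost:7077", compression=compression)

The test script itself: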

#!/usr/bin/env python3
"""Compression test script for the Python SDK."""
import os
import signal
import threading
import time
from datetime import datetime
from typing import Any

from hatchet_sdk import Context, Hatchet


def create_large_payload() -> dict[str, str]:
    """Create a large payload: 100 chunks of 1 KB each, ~100 KB total."""
    payload: dict[str, str] = {}
    chunk = "a" * 1000  # 1 KB chunk
    for i in range(100):
        payload[f"chunk_{i}"] = chunk
    return payload
def emit_events(hatchet: Hatchet, total_events: int, events_per_second: int) -> None:
    """Emit events in a separate thread."""
    interval = 1.0 / events_per_second
    large_payload = create_large_payload()
    event_id = 0

    print(f"Starting to emit {total_events} events...")
    while event_id < total_events:
        event = {
            "id": event_id,
            "createdAt": datetime.now().isoformat(),
            "payload": large_payload,
        }
        try:
            hatchet.event.push("compression-test:event", event)
            event_id += 1
            if event_id % 50 == 0:
                print(f"Emitted {event_id} events...")
        except Exception as e:
            print(f"Error pushing event {event_id}: {e}")
        # Wait for the next interval
        time.sleep(interval)

    print(f"Finished emitting {event_id} events")
def main() -> None:
    # The namespace is set via the HATCHET_CLIENT_NAMESPACE environment variable
    hatchet = Hatchet(debug=False)

    # Get the compression state from the environment (defaults to 'enabled')
    compression_state = os.getenv("COMPRESSION_STATE", "enabled")
    workflow_name = f"{compression_state}-python"

    # Create the workflow
    workflow = hatchet.workflow(
        name=workflow_name,
        on_events=["compression-test:event"],
    )

    @workflow.task()
    def step1(input_data: Any, ctx: Context) -> dict[str, Any]:
        # EmptyModel allows extra fields, accessed as attributes;
        # the event data is passed as the workflow input
        event_id = getattr(input_data, "id", None)
        print(f"Processing event {event_id}")
        return {
            "processed": True,
            "eventId": event_id,
            "timestamp": datetime.now().isoformat(),
        }

    # Create the worker
    worker = hatchet.worker(
        "compression-test-worker",
        slots=100,
        workflows=[workflow],
    )

    # Get the number of events from the environment
    total_events = int(os.getenv("TEST_EVENTS_COUNT", "10"))
    events_per_second = 10

    # Duration needed to send all events (at least 1 second)
    duration = max(1, total_events / events_per_second)

    # Total wait time: 5s for worker registration + emission duration + 10s processing buffer
    wait_time = int(duration) + 15
    # Stop the worker after the test duration by sending SIGTERM to this
    # process, which triggers the worker's shutdown path
    def stop_worker() -> None:
        print("Test complete, stopping worker...")
        os.kill(os.getpid(), signal.SIGTERM)

    stop_timer = threading.Timer(wait_time, stop_worker)
    stop_timer.daemon = True
    stop_timer.start()
    # Emit events in a separate thread
    emit_thread = threading.Thread(
        target=emit_events,
        args=(hatchet, total_events, events_per_second),
        daemon=True,
    )

    print("Starting worker...")
    print(f"Emitting {total_events} events over {duration:.1f} seconds...")
    emit_thread.start()

    # Give event emission a moment to start
    time.sleep(1)

    # Start the worker (blocking call; runs until SIGTERM)
    try:
        worker.start()
    except KeyboardInterrupt:
        print("Worker stopped")


if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        print("Test interrupted")
    except Exception as e:
        print(f"Test failed: {e}")
        raise
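
To run the test by hand, only the environment variables the script reads need to be set. A small driver along these lines should work (the file name and namespace value are assumptions; COMPRESSION_STATE and TEST_EVENTS_COUNT are the variables the script actually reads):

# Hypothetical driver for the test above; the file name and namespace value
# are assumptions, while COMPRESSION_STATE and TEST_EVENTS_COUNT are the
# variables the script actually reads.
import os
import subprocess

env = dict(
    os.environ,
    HATCHET_CLIENT_NAMESPACE="compression-test",  # assumed namespace
    COMPRESSION_STATE="disabled",                 # "enabled" (default) or "disabled"
    TEST_EVENTS_COUNT="100",                      # defaults to 10
)
subprocess.run(["python", "python_test.py"], env=env, check=True)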