mirror of
https://github.com/hatchet-dev/hatchet.git
synced 2026-01-01 06:11:02 -06:00
* Add gzip compression init * revert * Feat: Initial cross-domain identify setup (#2533) * feat: initial setup * fix: factor out * chore: lint * fix: xss vuln * feat: set up properly * fix: lint * fix: key * fix: keys, cleanup * Fix: use sessionStorage instead of localStorage (#2541) * chore(deps): bump golang.org/x/crypto from 0.44.0 to 0.45.0 (#2545) Bumps [golang.org/x/crypto](https://github.com/golang/crypto) from 0.44.0 to 0.45.0. - [Commits](https://github.com/golang/crypto/compare/v0.44.0...v0.45.0) --- updated-dependencies: - dependency-name: golang.org/x/crypto dependency-version: 0.45.0 dependency-type: direct:production update-type: version-update:semver-minor ... Signed-off-by: dependabot[bot] <support@github.com> Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> * chore(deps): bump google/osv-scanner-action/.github/workflows/osv-scanner-reusable-pr.yml (#2547) Bumps [google/osv-scanner-action/.github/workflows/osv-scanner-reusable-pr.yml](https://github.com/google/osv-scanner-action) from 2.2.4 to 2.3.0. - [Release notes](https://github.com/google/osv-scanner-action/releases) - [Commits](https://github.com/google/osv-scanner-action/compare/v2.2.4...v2.3.0) --- updated-dependencies: - dependency-name: google/osv-scanner-action/.github/workflows/osv-scanner-reusable-pr.yml dependency-version: 2.3.0 dependency-type: direct:production update-type: version-update:semver-minor ... Signed-off-by: dependabot[bot] <support@github.com> Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> * [Go SDK] Resubscribe and get a new listener stream when gRPC connections fail (#2544) * fix listener cache issue to resubscribe when erroring out * worker retry message clarification (#2543) * add another retry layer and add comments * fix loop logic * make listener channel retry * Compression test utils, and add log to indicate its enabled * clean + fix * more fallbacks * common pgxpool afterconnect method (#2553) * remove * lint * lint * add cpu monitor during test * fix background monitor and lint * Make envvar to disable compression * cleanup monitoring * PR Feedback * Update paths in compression tests + bump package versions * path issue on test script --------- Signed-off-by: dependabot[bot] <support@github.com> Co-authored-by: matt <mrkaye97@gmail.com> Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> Co-authored-by: Mohammed Nafees <hello@mnafees.me>
136 lines
4.0 KiB
Python
136 lines
4.0 KiB
Python
#!/usr/bin/env python3
|
|
"""Compression test script for Python SDK"""
|
|
|
|
import os
|
|
import signal
|
|
import threading
|
|
import time
|
|
from datetime import datetime
|
|
from typing import Any
|
|
|
|
from hatchet_sdk import Context, Hatchet
|
|
|
|
# Create large payload (100KB)
|
|
def create_large_payload() -> dict[str, str]:
|
|
payload: dict[str, str] = {}
|
|
chunk = "a" * 1000 # 1KB chunk
|
|
for i in range(100):
|
|
payload[f"chunk_{i}"] = chunk
|
|
return payload
|
|
|
|
|
|
def emit_events(hatchet: Hatchet, total_events: int, events_per_second: int) -> None:
|
|
"""Emit events in a separate thread"""
|
|
interval = 1.0 / events_per_second
|
|
large_payload = create_large_payload()
|
|
event_id = 0
|
|
|
|
print(f"Starting to emit {total_events} events...")
|
|
|
|
while event_id < total_events:
|
|
event = {
|
|
"id": event_id,
|
|
"createdAt": datetime.now().isoformat(),
|
|
"payload": large_payload,
|
|
}
|
|
|
|
try:
|
|
hatchet.event.push("compression-test:event", event)
|
|
event_id += 1
|
|
if event_id % 50 == 0:
|
|
print(f"Emitted {event_id} events...")
|
|
except Exception as e:
|
|
print(f"Error pushing event {event_id}: {e}")
|
|
|
|
# Wait for next interval
|
|
time.sleep(interval)
|
|
|
|
print(f"Finished emitting {event_id} events")
|
|
|
|
|
|
def main() -> None:
|
|
# Namespace is set via environment variable HATCHET_CLIENT_NAMESPACE
|
|
hatchet = Hatchet(debug=False)
|
|
|
|
# Get compression state from environment (default to 'enabled')
|
|
compression_state = os.getenv("COMPRESSION_STATE", "enabled")
|
|
workflow_name = f"{compression_state}-python"
|
|
|
|
# Create workflow
|
|
workflow = hatchet.workflow(
|
|
name=workflow_name,
|
|
on_events=["compression-test:event"],
|
|
)
|
|
|
|
@workflow.task()
|
|
def step1(input_data: Any, ctx: Context) -> dict[str, Any]:
|
|
# EmptyModel allows extra fields, access as attributes
|
|
# The event data is passed as the workflow input
|
|
event_id = getattr(input_data, "id", None)
|
|
print(f"Processing event {event_id}")
|
|
return {
|
|
"processed": True,
|
|
"eventId": event_id,
|
|
"timestamp": datetime.now().isoformat(),
|
|
}
|
|
|
|
# Create worker
|
|
worker = hatchet.worker(
|
|
"compression-test-worker",
|
|
slots=100,
|
|
workflows=[workflow],
|
|
)
|
|
|
|
# Get number of events from environment variable
|
|
total_events = int(os.getenv("TEST_EVENTS_COUNT", "10"))
|
|
events_per_second = 10
|
|
# Calculate duration needed to send all events
|
|
duration = max(1, total_events / events_per_second) # At least 1 second
|
|
|
|
# Calculate total wait time (worker registration + event emission + processing buffer)
|
|
wait_time = int(duration) + 15 # 5s for registration + duration + 10s buffer
|
|
|
|
# Set up signal handler to stop worker after test duration
|
|
def stop_worker_after_delay():
|
|
time.sleep(wait_time)
|
|
print("Test complete, stopping worker...")
|
|
# Send SIGTERM to current process to trigger worker shutdown
|
|
os.kill(os.getpid(), signal.SIGTERM)
|
|
|
|
# Start timer to stop worker after test duration
|
|
stop_timer = threading.Timer(wait_time, lambda: os.kill(os.getpid(), signal.SIGTERM))
|
|
stop_timer.daemon = True
|
|
stop_timer.start()
|
|
|
|
# Start emitting events in a separate thread
|
|
emit_thread = threading.Thread(
|
|
target=emit_events,
|
|
args=(hatchet, total_events, events_per_second),
|
|
daemon=True,
|
|
)
|
|
|
|
print("Starting worker...")
|
|
print(f"Emitting {total_events} events over {duration:.1f} seconds...")
|
|
|
|
# Start emitting events
|
|
emit_thread.start()
|
|
|
|
# Wait a moment for events to start
|
|
time.sleep(1)
|
|
|
|
# Start worker (blocking call - will run until SIGTERM)
|
|
try:
|
|
worker.start()
|
|
except KeyboardInterrupt:
|
|
print("Worker stopped")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
try:
|
|
main()
|
|
except KeyboardInterrupt:
|
|
print("Test interrupted")
|
|
except Exception as e:
|
|
print(f"Test failed: {e}")
|
|
raise
|