mirror of
https://github.com/hatchet-dev/hatchet.git
synced 2026-04-29 14:09:54 -05:00
Add gzip compression (#2539)
* Add gzip compression init * revert * Feat: Initial cross-domain identify setup (#2533) * feat: initial setup * fix: factor out * chore: lint * fix: xss vuln * feat: set up properly * fix: lint * fix: key * fix: keys, cleanup * Fix: use sessionStorage instead of localStorage (#2541) * chore(deps): bump golang.org/x/crypto from 0.44.0 to 0.45.0 (#2545) Bumps [golang.org/x/crypto](https://github.com/golang/crypto) from 0.44.0 to 0.45.0. - [Commits](https://github.com/golang/crypto/compare/v0.44.0...v0.45.0) --- updated-dependencies: - dependency-name: golang.org/x/crypto dependency-version: 0.45.0 dependency-type: direct:production update-type: version-update:semver-minor ... Signed-off-by: dependabot[bot] <support@github.com> Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> * chore(deps): bump google/osv-scanner-action/.github/workflows/osv-scanner-reusable-pr.yml (#2547) Bumps [google/osv-scanner-action/.github/workflows/osv-scanner-reusable-pr.yml](https://github.com/google/osv-scanner-action) from 2.2.4 to 2.3.0. - [Release notes](https://github.com/google/osv-scanner-action/releases) - [Commits](https://github.com/google/osv-scanner-action/compare/v2.2.4...v2.3.0) --- updated-dependencies: - dependency-name: google/osv-scanner-action/.github/workflows/osv-scanner-reusable-pr.yml dependency-version: 2.3.0 dependency-type: direct:production update-type: version-update:semver-minor ... 
Signed-off-by: dependabot[bot] <support@github.com> Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> * [Go SDK] Resubscribe and get a new listener stream when gRPC connections fail (#2544) * fix listener cache issue to resubscribe when erroring out * worker retry message clarification (#2543) * add another retry layer and add comments * fix loop logic * make listener channel retry * Compression test utils, and add log to indicate its enabled * clean + fix * more fallbacks * common pgxpool afterconnect method (#2553) * remove * lint * lint * add cpu monitor during test * fix background monitor and lint * Make envvar to disable compression * cleanup monitoring * PR Feedback * Update paths in compression tests + bump package versions * path issue on test script --------- Signed-off-by: dependabot[bot] <support@github.com> Co-authored-by: matt <mrkaye97@gmail.com> Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> Co-authored-by: Mohammed Nafees <hello@mnafees.me>
This commit is contained in:
@@ -224,7 +224,7 @@ jobs:
|
||||
- name: Setup Go
|
||||
uses: actions/setup-go@v6
|
||||
with:
|
||||
go-version: '1.24'
|
||||
go-version: "1.24"
|
||||
|
||||
- name: Setup pnpm
|
||||
uses: pnpm/action-setup@v4
|
||||
@@ -332,7 +332,9 @@ jobs:
|
||||
|
||||
- name: Test
|
||||
run: |
|
||||
go test -tags load ./... -p 5 -v -race -failfast -timeout 20m
|
||||
# Disable gzip compression for load tests to reduce CPU overhead
|
||||
# Compression adds overhead without benefit for 0kb payloads
|
||||
HATCHET_CLIENT_DISABLE_GZIP_COMPRESSION=true go test -tags load ./... -p 5 -v -race -failfast -timeout 20m
|
||||
env:
|
||||
TESTING_MATRIX_MIGRATE: ${{ matrix.migrate-strategy }}
|
||||
TESTING_MATRIX_RABBITMQ_ENABLED: ${{ matrix.rabbitmq-enabled }}
|
||||
|
||||
@@ -0,0 +1,4 @@
|
||||
results/
|
||||
*.log
|
||||
*.summary
|
||||
network_stats_*.log
|
||||
@@ -0,0 +1,35 @@
|
||||
FROM golang:1.25.0-alpine AS builder
|
||||
|
||||
WORKDIR /app
|
||||
|
||||
# Copy go mod files first for better caching
|
||||
COPY go.mod go.sum ./
|
||||
RUN go mod download
|
||||
|
||||
# Copy only what's needed for the build
|
||||
COPY cmd/hatchet-loadtest/ ./cmd/hatchet-loadtest/
|
||||
COPY pkg/ ./pkg/
|
||||
COPY internal/ ./internal/
|
||||
COPY api/ ./api/
|
||||
COPY api-contracts/ ./api-contracts/
|
||||
|
||||
# Build the loadtest binary
|
||||
RUN cd cmd/hatchet-loadtest && \
|
||||
CGO_ENABLED=0 GOOS=linux go build -tags=load -o /app/hatchet-loadtest .
|
||||
|
||||
FROM alpine:latest
|
||||
|
||||
RUN apk --no-cache add ca-certificates
|
||||
|
||||
WORKDIR /app
|
||||
|
||||
COPY --from=builder /app/hatchet-loadtest .
|
||||
|
||||
# Default command runs the load test
|
||||
CMD ["./hatchet-loadtest", "loadtest", \
|
||||
"--events", "10", \
|
||||
"--duration", "60s", \
|
||||
"--payloadSize", "100kb", \
|
||||
"--dagSteps", "1", \
|
||||
"--eventFanout", "1", \
|
||||
"--slots", "100"]
|
||||
@@ -0,0 +1,33 @@
|
||||
FROM python:3.11-slim AS builder
|
||||
|
||||
WORKDIR /app
|
||||
|
||||
# Copy Python SDK files only
|
||||
COPY sdks/python/pyproject.toml ./sdks/python/
|
||||
COPY sdks/python/poetry.lock ./sdks/python/
|
||||
COPY sdks/python/hatchet_sdk/ ./sdks/python/hatchet_sdk/
|
||||
COPY sdks/python/README.md ./sdks/python/
|
||||
|
||||
# Install build dependencies
|
||||
RUN pip install --upgrade pip && \
|
||||
pip install poetry
|
||||
|
||||
# Build Python SDK
|
||||
WORKDIR /app/sdks/python
|
||||
RUN poetry build
|
||||
|
||||
FROM python:3.11-slim
|
||||
|
||||
WORKDIR /app
|
||||
|
||||
# Install runtime dependencies
|
||||
RUN pip install --upgrade pip
|
||||
|
||||
# Copy and install the built SDK
|
||||
COPY --from=builder /app/sdks/python/dist/*.whl ./
|
||||
RUN pip install *.whl
|
||||
|
||||
# Copy test file
|
||||
COPY hack/dev/compression-test/tests/python_test.py /app/python_test.py
|
||||
|
||||
CMD ["python", "python_test.py"]
|
||||
@@ -0,0 +1,52 @@
|
||||
FROM node:20-alpine AS builder
|
||||
|
||||
# Install pnpm
|
||||
RUN npm install -g pnpm
|
||||
|
||||
WORKDIR /app
|
||||
|
||||
# Copy package files
|
||||
COPY sdks/typescript/package*.json ./sdks/typescript/
|
||||
COPY sdks/typescript/pnpm-lock.yaml ./sdks/typescript/
|
||||
COPY sdks/typescript/tsconfig.json ./sdks/typescript/
|
||||
|
||||
# Copy scripts directory (needed for postinstall script)
|
||||
COPY sdks/typescript/scripts/ ./sdks/typescript/scripts/
|
||||
|
||||
# Install dependencies
|
||||
WORKDIR /app/sdks/typescript
|
||||
RUN pnpm install --frozen-lockfile
|
||||
|
||||
# Copy source code (only TypeScript SDK)
|
||||
COPY sdks/typescript/src/ ./src/
|
||||
COPY sdks/typescript/package.json ./
|
||||
|
||||
# Build the SDK
|
||||
RUN pnpm run tsc:build
|
||||
|
||||
# Install tsconfig-paths as a dev dependency so it's available in node_modules
|
||||
# This is needed for ts-node -r tsconfig-paths/register to work
|
||||
RUN pnpm add -D tsconfig-paths
|
||||
|
||||
# Copy test file into the SDK directory
|
||||
COPY hack/dev/compression-test/tests/typescript_test.ts ./test.ts
|
||||
|
||||
FROM node:20-alpine
|
||||
|
||||
WORKDIR /app
|
||||
|
||||
# Copy built files and dependencies
|
||||
COPY --from=builder /app/sdks/typescript/dist ./dist
|
||||
COPY --from=builder /app/sdks/typescript/node_modules ./node_modules
|
||||
COPY --from=builder /app/sdks/typescript/package.json ./
|
||||
COPY --from=builder /app/sdks/typescript/tsconfig.json ./
|
||||
COPY --from=builder /app/sdks/typescript/test.ts ./typescript_test.ts
|
||||
|
||||
# Install ts-node and typescript globally for running TypeScript
|
||||
# tsconfig-paths is already in node_modules from the builder stage
|
||||
RUN npm install -g ts-node typescript
|
||||
|
||||
# Set NODE_PATH to include dist
|
||||
ENV NODE_PATH=/app/dist
|
||||
|
||||
CMD ["ts-node", "-r", "tsconfig-paths/register", "typescript_test.ts"]
|
||||
@@ -0,0 +1,71 @@
|
||||
.PHONY: setup build-baseline build-compression test-baseline test-compression report help
|
||||
|
||||
help:
|
||||
@echo "Compression Testing Suite"
|
||||
@echo ""
|
||||
@echo "Available targets:"
|
||||
@echo " setup - Initial setup (create network, directories)"
|
||||
@echo " build-baseline - Build baseline images (requires: git checkout main)"
|
||||
@echo " build-compression - Build compression images (current branch)"
|
||||
@echo " test-baseline - Run all baseline tests"
|
||||
@echo " test-compression - Run all compression tests"
|
||||
@echo " report - Generate comparison report"
|
||||
@echo ""
|
||||
@echo "Individual SDK tests:"
|
||||
@echo " test-go-baseline - Test Go SDK (baseline)"
|
||||
@echo " test-go-compression - Test Go SDK (compression)"
|
||||
@echo " test-ts-baseline - Test TypeScript SDK (baseline)"
|
||||
@echo " test-ts-compression - Test TypeScript SDK (compression)"
|
||||
@echo " test-py-baseline - Test Python SDK (baseline)"
|
||||
@echo " test-py-compression - Test Python SDK (compression)"
|
||||
|
||||
setup:
|
||||
./scripts/setup.sh
|
||||
|
||||
build-baseline:
|
||||
@echo "Building baseline images (main branch)..."
|
||||
@echo "Make sure you're on the main branch!"
|
||||
@read -p "Press enter to continue..."
|
||||
@cd ../.. && docker build -t go-disabled-compression -f hack/dev/compression-test/Dockerfile.client-go .
|
||||
@cd ../.. && docker build -t typescript-disabled-compression -f hack/dev/compression-test/Dockerfile.client-ts .
|
||||
@cd ../.. && docker build -t python-disabled-compression -f hack/dev/compression-test/Dockerfile.client-python .
|
||||
@echo "Baseline images built successfully!"
|
||||
|
||||
build-compression:
|
||||
@echo "Building compression images (current branch)..."
|
||||
@read -p "Press enter to continue..."
|
||||
@cd ../.. && docker build -t go-enabled-compression -f hack/dev/compression-test/Dockerfile.client-go .
|
||||
@cd ../.. && docker build -t typescript-enabled-compression -f hack/dev/compression-test/Dockerfile.client-ts .
|
||||
@cd ../.. && docker build -t python-enabled-compression -f hack/dev/compression-test/Dockerfile.client-python .
|
||||
@echo "Compression images built successfully!"
|
||||
|
||||
test-baseline:
|
||||
./scripts/run_all_tests.sh disabled
|
||||
|
||||
test-compression:
|
||||
./scripts/run_all_tests.sh enabled
|
||||
|
||||
test-go-baseline:
|
||||
./scripts/run_test.sh go disabled
|
||||
|
||||
test-go-compression:
|
||||
./scripts/run_test.sh go enabled
|
||||
|
||||
test-ts-baseline:
|
||||
./scripts/run_test.sh typescript disabled
|
||||
|
||||
test-ts-compression:
|
||||
./scripts/run_test.sh typescript enabled
|
||||
|
||||
test-py-baseline:
|
||||
./scripts/run_test.sh python disabled
|
||||
|
||||
test-py-compression:
|
||||
./scripts/run_test.sh python enabled
|
||||
|
||||
report:
|
||||
./scripts/generate_report.sh
|
||||
|
||||
clean:
|
||||
docker rm -f hatchet-engine hatchet-client-go hatchet-client-typescript hatchet-client-python 2>/dev/null || true
|
||||
rm -rf results/*.log results/*.summary
|
||||
@@ -0,0 +1,53 @@
|
||||
# Quick Start Guide
|
||||
|
||||
## Build and Test (Current Branch)
|
||||
|
||||
1. **Set your environment variables**:
|
||||
|
||||
```bash
|
||||
export HATCHET_CLIENT_TOKEN="your-token-here"
|
||||
export HATCHET_CLIENT_HOST_PORT="localhost:7070" # Where your engine is running
|
||||
```
|
||||
|
||||
2. **Build all images** (from repo root):
|
||||
|
||||
```bash
|
||||
cd /path/to/hatchet
|
||||
./hack/dev/compression-test/scripts/build_all.sh enabled
|
||||
```
|
||||
|
||||
3. **Run all tests**:
|
||||
|
||||
```bash
|
||||
cd hack/dev/compression-test
|
||||
./scripts/run_all_tests.sh enabled
|
||||
```
|
||||
|
||||
4. **Generate report**:
|
||||
```bash
|
||||
./scripts/generate_report.sh
|
||||
```
|
||||
|
||||
## What You Need
|
||||
|
||||
- **HATCHET_CLIENT_TOKEN**: Required - Your Hatchet API token
|
||||
- **HATCHET_CLIENT_HOST_PORT**: Required - Where your engine gRPC is running (e.g., `localhost:7070`)
|
||||
- **Engine running**: You need to have your Hatchet engine running separately (not managed by these scripts)
|
||||
|
||||
## What Gets Tested
|
||||
|
||||
Each SDK (Go, TypeScript, Python) will:
|
||||
|
||||
- Connect to your engine
|
||||
- Emit 600 events over 60 seconds (10 events/second)
|
||||
- Each event has a 100KB payload
|
||||
- Network traffic is measured and compared
|
||||
|
||||
## Results
|
||||
|
||||
Results are saved in `results/` directory:
|
||||
|
||||
- `results/enabled/` - Test results with compression
|
||||
- `results/disabled/` - Test results without compression (run on main branch)
|
||||
|
||||
The final report shows bandwidth savings from compression.
|
||||
@@ -0,0 +1,151 @@
|
||||
# Compression Testing Suite
|
||||
|
||||
This directory contains scripts and configurations to test gRPC compression across Go, TypeScript, and Python SDKs.
|
||||
|
||||
## Overview
|
||||
|
||||
Tests measure network traffic bytes for each SDK comparing:
|
||||
|
||||
- **Baseline**: Main branch (no compression)
|
||||
- **With Compression**: Current branch (compression enabled)
|
||||
|
||||
## Docker Image Tags
|
||||
|
||||
All images use the format: `{sdk}-{state}-compression`
|
||||
|
||||
- `go-disabled-compression` / `go-enabled-compression`
|
||||
- `typescript-disabled-compression` / `typescript-enabled-compression`
|
||||
- `python-disabled-compression` / `python-enabled-compression`
|
||||
- `engine-disabled-compression` / `engine-enabled-compression`
|
||||
|
||||
## Build Instructions
|
||||
|
||||
### Step 1: Build Baseline Images (Main Branch)
|
||||
|
||||
**Important:** All docker build commands must be run from the repository root, not from the testing directory.
|
||||
|
||||
```bash
|
||||
# Checkout main branch
|
||||
git checkout main
|
||||
|
||||
# Navigate to repository root (if not already there)
|
||||
cd /path/to/hatchet
|
||||
|
||||
# Build Go SDK
|
||||
docker build -t go-disabled-compression -f hack/dev/compression-test/Dockerfile.client-go .
|
||||
|
||||
# Build TypeScript SDK
|
||||
docker build -t typescript-disabled-compression -f hack/dev/compression-test/Dockerfile.client-ts .
|
||||
|
||||
# Build Python SDK
|
||||
docker build -t python-disabled-compression -f hack/dev/compression-test/Dockerfile.client-python .
|
||||
|
||||
# Build Engine (optional - you may manage this separately)
|
||||
# docker build -t engine-disabled-compression -f Dockerfile .
|
||||
```
|
||||
|
||||
### Step 2: Build Compression Images (Current Branch)
|
||||
|
||||
```bash
|
||||
# Checkout your compression branch
|
||||
git checkout <your-compression-branch>
|
||||
|
||||
# Navigate to repository root
|
||||
cd /path/to/hatchet
|
||||
|
||||
# Build Go SDK
|
||||
docker build -t go-enabled-compression -f hack/dev/compression-test/Dockerfile.client-go .
|
||||
|
||||
# Build TypeScript SDK
|
||||
docker build -t typescript-enabled-compression -f hack/dev/compression-test/Dockerfile.client-ts .
|
||||
|
||||
# Build Python SDK
|
||||
docker build -t python-enabled-compression -f hack/dev/compression-test/Dockerfile.client-python .
|
||||
|
||||
# Build Engine (optional - you may manage this separately)
|
||||
# docker build -t engine-enabled-compression -f Dockerfile .
|
||||
```
|
||||
|
||||
## Running Tests
|
||||
|
||||
### Prerequisites
|
||||
|
||||
1. **Engine must be running**: Start your Hatchet engine separately (not managed by these scripts)
|
||||
|
||||
- The engine should be accessible at the host/port you specify in `HATCHET_CLIENT_HOST_PORT`
|
||||
|
||||
2. **Set required environment variables**:
|
||||
|
||||
```bash
|
||||
export HATCHET_CLIENT_TOKEN="your-token-here"
|
||||
export HATCHET_CLIENT_HOST_PORT="localhost:7070" # gRPC address where your engine is running
|
||||
```
|
||||
|
||||
**Optional environment variables**:
|
||||
|
||||
```bash
|
||||
export HATCHET_CLIENT_SERVER_URL="http://localhost:8080" # HTTP server URL (for Go SDK, defaults to http://localhost:8080)
|
||||
export HATCHET_CLIENT_API_URL="http://localhost:8080" # API URL (for TypeScript SDK, defaults to http://localhost:8080)
|
||||
export HATCHET_CLIENT_NAMESPACE="compression-test" # Namespace (optional, defaults to compression-test)
|
||||
```
|
||||
|
||||
**Note**:
|
||||
|
||||
- `HATCHET_CLIENT_TENANT_ID` is extracted from the token automatically, so you don't need to set it.
|
||||
- The scripts use `host` network mode, so containers can access your engine running on the host.
|
||||
|
||||
### Quick Start
|
||||
|
||||
```bash
|
||||
# Navigate to testing directory
|
||||
cd hack/dev/compression-test
|
||||
|
||||
# Run setup (creates network and directories)
|
||||
./scripts/setup.sh
|
||||
|
||||
# Run all baseline tests
|
||||
./scripts/run_all_tests.sh disabled
|
||||
|
||||
# Switch engine to compression version, then run compression tests
|
||||
./scripts/run_all_tests.sh enabled
|
||||
|
||||
# Generate comparison report
|
||||
./scripts/generate_report.sh
|
||||
```
|
||||
|
||||
### Individual SDK Tests
|
||||
|
||||
```bash
|
||||
# Test Go SDK (baseline)
|
||||
./scripts/run_test.sh go disabled
|
||||
|
||||
# Test Go SDK (compression)
|
||||
./scripts/run_test.sh go enabled
|
||||
|
||||
# Test TypeScript SDK
|
||||
./scripts/run_test.sh typescript disabled
|
||||
./scripts/run_test.sh typescript enabled
|
||||
|
||||
# Test Python SDK
|
||||
./scripts/run_test.sh python disabled
|
||||
./scripts/run_test.sh python enabled
|
||||
```
|
||||
|
||||
## Test Parameters
|
||||
|
||||
All tests use standardized parameters:
|
||||
|
||||
- Duration: 60 seconds
|
||||
- Events per second: 10
|
||||
- Payload size: 100KB
|
||||
- DAG steps: 1
|
||||
- Event fanout: 1
|
||||
|
||||
## Results
|
||||
|
||||
Results are saved in `results/` directory:
|
||||
|
||||
- `results/baseline/` - Baseline test results
|
||||
- `results/compressed/` - Compression test results
|
||||
|
||||
Final comparison report is generated by `scripts/generate_report.sh`
|
||||
@@ -0,0 +1,62 @@
|
||||
version: "3.8"
|
||||
|
||||
services:
|
||||
client-go:
|
||||
image: go-${COMPRESSION_STATE:-enabled}-compression
|
||||
container_name: hatchet-client-go
|
||||
extra_hosts:
|
||||
- "host.docker.internal:host-gateway"
|
||||
environment:
|
||||
- HATCHET_CLIENT_HOST_PORT=${HATCHET_CLIENT_HOST_PORT:-192.168.65.254:7070}
|
||||
- HATCHET_CLIENT_SERVER_URL=${HATCHET_CLIENT_SERVER_URL:-http://192.168.65.254:8080}
|
||||
- HATCHET_CLIENT_TOKEN=${HATCHET_CLIENT_TOKEN}
|
||||
- HATCHET_CLIENT_NAMESPACE=${HATCHET_CLIENT_NAMESPACE:-compression-test}
|
||||
- HATCHET_CLIENT_LOG_LEVEL=${HATCHET_CLIENT_LOG_LEVEL:-debug}
|
||||
- HATCHET_CLIENT_TLS_STRATEGY=none
|
||||
- COMPRESSION_STATE=${COMPRESSION_STATE:-enabled}
|
||||
command:
|
||||
- ./hatchet-loadtest
|
||||
- loadtest
|
||||
- --events
|
||||
- "${TEST_EVENTS_RATE:-10}"
|
||||
- --duration
|
||||
- "${TEST_DURATION:-60s}"
|
||||
- --payloadSize
|
||||
- "100kb"
|
||||
- --dagSteps
|
||||
- "1"
|
||||
- --eventFanout
|
||||
- "1"
|
||||
- --slots
|
||||
- "100"
|
||||
- --wait
|
||||
- "${TEST_WAIT:-60s}"
|
||||
|
||||
client-typescript:
|
||||
image: typescript-${COMPRESSION_STATE:-enabled}-compression
|
||||
container_name: hatchet-client-typescript
|
||||
extra_hosts:
|
||||
- "host.docker.internal:host-gateway"
|
||||
environment:
|
||||
- HATCHET_CLIENT_HOST_PORT=${HATCHET_CLIENT_HOST_PORT:-192.168.65.254:7070}
|
||||
- HATCHET_CLIENT_API_URL=${HATCHET_CLIENT_API_URL:-http://192.168.65.254:8080}
|
||||
- HATCHET_CLIENT_TOKEN=${HATCHET_CLIENT_TOKEN}
|
||||
- HATCHET_CLIENT_NAMESPACE=${HATCHET_CLIENT_NAMESPACE:-compression-test}
|
||||
- HATCHET_CLIENT_TLS_STRATEGY=none
|
||||
- TEST_EVENTS_COUNT=${TEST_EVENTS_COUNT:-10}
|
||||
- COMPRESSION_STATE=${COMPRESSION_STATE:-enabled}
|
||||
command: ["ts-node", "-r", "tsconfig-paths/register", "typescript_test.ts"]
|
||||
|
||||
client-python:
|
||||
image: python-${COMPRESSION_STATE:-enabled}-compression
|
||||
container_name: hatchet-client-python
|
||||
extra_hosts:
|
||||
- "host.docker.internal:host-gateway"
|
||||
environment:
|
||||
- HATCHET_CLIENT_HOST_PORT=${HATCHET_CLIENT_HOST_PORT:-192.168.65.254:7070}
|
||||
- HATCHET_CLIENT_TOKEN=${HATCHET_CLIENT_TOKEN}
|
||||
- HATCHET_CLIENT_NAMESPACE=${HATCHET_CLIENT_NAMESPACE:-compression-test}
|
||||
- HATCHET_CLIENT_TLS_STRATEGY=none
|
||||
- TEST_EVENTS_COUNT=${TEST_EVENTS_COUNT:-10}
|
||||
- COMPRESSION_STATE=${COMPRESSION_STATE:-enabled}
|
||||
command: ["python", "python_test.py"]
|
||||
+54
@@ -0,0 +1,54 @@
|
||||
#!/bin/bash
|
||||
# build_all.sh - Build all Docker images for current branch
|
||||
|
||||
set -e
|
||||
|
||||
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
|
||||
TEST_DIR="$(cd "$SCRIPT_DIR/.." && pwd)"
|
||||
REPO_ROOT="$(cd "$TEST_DIR/../../.." && pwd)"
|
||||
|
||||
# If compression is enabled or disabled (e.g. based on the branch main vs sid/add-gzip-compression)
|
||||
STATE=${1:-enabled}
|
||||
|
||||
echo "=========================================="
|
||||
echo "Building Docker Images"
|
||||
echo "State: $STATE"
|
||||
echo "=========================================="
|
||||
echo ""
|
||||
|
||||
# Navigate to repo root
|
||||
cd "$REPO_ROOT"
|
||||
|
||||
# Build each SDK
|
||||
echo "Building go SDK image..."
|
||||
docker build -t "go-${STATE}-compression" -f "hack/dev/compression-test/Dockerfile.client-go" . || {
|
||||
echo "Error: Failed to build go SDK image"
|
||||
exit 1
|
||||
}
|
||||
echo "✓ go SDK image built successfully"
|
||||
echo ""
|
||||
|
||||
echo "Building typescript SDK image..."
|
||||
docker build -t "typescript-${STATE}-compression" -f "hack/dev/compression-test/Dockerfile.client-ts" . || {
|
||||
echo "Error: Failed to build typescript SDK image"
|
||||
exit 1
|
||||
}
|
||||
echo "✓ typescript SDK image built successfully"
|
||||
echo ""
|
||||
|
||||
echo "Building python SDK image..."
|
||||
docker build -t "python-${STATE}-compression" -f "hack/dev/compression-test/Dockerfile.client-python" . || {
|
||||
echo "Error: Failed to build python SDK image"
|
||||
exit 1
|
||||
}
|
||||
echo "✓ python SDK image built successfully"
|
||||
echo ""
|
||||
|
||||
echo "=========================================="
|
||||
echo "All images built successfully!"
|
||||
echo "=========================================="
|
||||
echo ""
|
||||
echo "Images created:"
|
||||
echo " - go-${STATE}-compression"
|
||||
echo " - typescript-${STATE}-compression"
|
||||
echo " - python-${STATE}-compression"
|
||||
+82
@@ -0,0 +1,82 @@
|
||||
#!/bin/bash
|
||||
# build_and_test.sh - Build images and run tests for current branch
|
||||
|
||||
set -e
|
||||
|
||||
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
|
||||
TEST_DIR="$(cd "$SCRIPT_DIR/.." && pwd)"
|
||||
REPO_ROOT="$(cd "$TEST_DIR/../../.." && pwd)"
|
||||
|
||||
STATE=${1:-enabled}
|
||||
HATCHET_CLIENT_TOKEN=${HATCHET_CLIENT_TOKEN:-""}
|
||||
|
||||
if [ -z "$HATCHET_CLIENT_TOKEN" ]; then
|
||||
echo "Error: HATCHET_CLIENT_TOKEN environment variable is required"
|
||||
echo "Usage: export HATCHET_CLIENT_TOKEN='your-token' && $0 [enabled|disabled]"
|
||||
exit 1
|
||||
fi
|
||||
|
||||
echo "=========================================="
|
||||
echo "Building and Testing Compression Suite"
|
||||
echo "State: $STATE"
|
||||
echo "=========================================="
|
||||
echo ""
|
||||
|
||||
# Navigate to repo root
|
||||
cd "$REPO_ROOT"
|
||||
|
||||
echo "Building Docker images..."
|
||||
echo ""
|
||||
|
||||
# Build Go SDK
|
||||
echo "Building Go SDK image..."
|
||||
docker build -t "go-${STATE}-compression" -f hack/dev/compression-test/Dockerfile.client-go . || {
|
||||
echo "Error: Failed to build Go SDK image"
|
||||
exit 1
|
||||
}
|
||||
|
||||
# Build TypeScript SDK
|
||||
echo "Building TypeScript SDK image..."
|
||||
docker build -t "typescript-${STATE}-compression" -f hack/dev/compression-test/Dockerfile.client-ts . || {
|
||||
echo "Error: Failed to build TypeScript SDK image"
|
||||
exit 1
|
||||
}
|
||||
|
||||
# Build Python SDK
|
||||
echo "Building Python SDK image..."
|
||||
docker build -t "python-${STATE}-compression" -f hack/dev/compression-test/Dockerfile.client-python . || {
|
||||
echo "Error: Failed to build Python SDK image"
|
||||
exit 1
|
||||
}
|
||||
|
||||
echo ""
|
||||
echo "All images built successfully!"
|
||||
echo ""
|
||||
|
||||
# Navigate to test directory
|
||||
cd "$TEST_DIR"
|
||||
|
||||
# Run setup if needed
|
||||
if ! docker network ls | grep -q hatchet-test; then
|
||||
echo "Running setup..."
|
||||
./scripts/setup.sh
|
||||
fi
|
||||
|
||||
echo ""
|
||||
echo "Running tests with HATCHET_CLIENT_TOKEN..."
|
||||
echo ""
|
||||
|
||||
# Export token for all test runs
|
||||
export HATCHET_CLIENT_TOKEN
|
||||
|
||||
# Run all tests
|
||||
./scripts/run_all_tests.sh "$STATE"
|
||||
|
||||
echo ""
|
||||
echo "Generating report..."
|
||||
./scripts/generate_report.sh
|
||||
|
||||
echo ""
|
||||
echo "=========================================="
|
||||
echo "Build and test complete!"
|
||||
echo "=========================================="
|
||||
+62
@@ -0,0 +1,62 @@
|
||||
#!/bin/bash
|
||||
# collect_results.sh - Collect and aggregate results from all test runs
|
||||
|
||||
set -e
|
||||
|
||||
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
|
||||
TEST_DIR="$(cd "$SCRIPT_DIR/.." && pwd)"
|
||||
RESULTS_DIR="$TEST_DIR/results"
|
||||
|
||||
if [ ! -d "$RESULTS_DIR/disabled" ] || [ ! -d "$RESULTS_DIR/enabled" ]; then
|
||||
echo "Error: Results directories not found"
|
||||
echo "Please run tests first: ./scripts/run_all_tests.sh disabled && ./scripts/run_all_tests.sh enabled"
|
||||
exit 1
|
||||
fi
|
||||
|
||||
echo "Collecting results..."
|
||||
|
||||
# Function to parse bytes from summary file
|
||||
parse_bytes() {
|
||||
local file=$1
|
||||
if [ -f "$file" ]; then
|
||||
source "$file"
|
||||
echo "$TOTAL_BYTES"
|
||||
else
|
||||
echo "0"
|
||||
fi
|
||||
}
|
||||
|
||||
# Collect results for each SDK
|
||||
declare -A baseline_results
|
||||
declare -A compressed_results
|
||||
|
||||
for SDK in go typescript python; do
|
||||
baseline_file="$RESULTS_DIR/disabled/${SDK}_network.log.summary"
|
||||
compressed_file="$RESULTS_DIR/enabled/${SDK}_network.log.summary"
|
||||
|
||||
baseline_results[$SDK]=$(parse_bytes "$baseline_file")
|
||||
compressed_results[$SDK]=$(parse_bytes "$compressed_file")
|
||||
done
|
||||
|
||||
# Calculate totals
|
||||
baseline_total=0
|
||||
compressed_total=0
|
||||
|
||||
for SDK in go typescript python; do
|
||||
baseline_total=$(echo "$baseline_total + ${baseline_results[$SDK]}" | bc)
|
||||
compressed_total=$(echo "$compressed_total + ${compressed_results[$SDK]}" | bc)
|
||||
done
|
||||
|
||||
# Save aggregated results
|
||||
{
|
||||
echo "BASELINE_TOTAL=$baseline_total"
|
||||
echo "COMPRESSED_TOTAL=$compressed_total"
|
||||
echo "GO_BASELINE=${baseline_results[go]}"
|
||||
echo "GO_COMPRESSED=${compressed_results[go]}"
|
||||
echo "TYPESCRIPT_BASELINE=${baseline_results[typescript]}"
|
||||
echo "TYPESCRIPT_COMPRESSED=${compressed_results[typescript]}"
|
||||
echo "PYTHON_BASELINE=${baseline_results[python]}"
|
||||
echo "PYTHON_COMPRESSED=${compressed_results[python]}"
|
||||
} > "$RESULTS_DIR/aggregated_results.txt"
|
||||
|
||||
echo "Results collected and saved to: $RESULTS_DIR/aggregated_results.txt"
|
||||
+102
@@ -0,0 +1,102 @@
|
||||
#!/bin/bash
|
||||
# generate_report.sh - Generate comparison report
|
||||
|
||||
set -e
|
||||
|
||||
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
|
||||
TEST_DIR="$(cd "$SCRIPT_DIR/.." && pwd)"
|
||||
RESULTS_DIR="$TEST_DIR/results"
|
||||
|
||||
# Collect results first
|
||||
"$SCRIPT_DIR/collect_results.sh"
|
||||
|
||||
# Load aggregated results
|
||||
if [ ! -f "$RESULTS_DIR/aggregated_results.txt" ]; then
|
||||
echo "Error: Aggregated results not found. Run collect_results.sh first."
|
||||
exit 1
|
||||
fi
|
||||
|
||||
source "$RESULTS_DIR/aggregated_results.txt"
|
||||
|
||||
# Helper function to format bytes
|
||||
format_bytes() {
|
||||
local bytes=$1
|
||||
if [ $(echo "$bytes > 1073741824" | bc) -eq 1 ]; then
|
||||
echo "$(echo "scale=2; $bytes / 1073741824" | bc) GB"
|
||||
elif [ $(echo "$bytes > 1048576" | bc) -eq 1 ]; then
|
||||
echo "$(echo "scale=2; $bytes / 1048576" | bc) MB"
|
||||
elif [ $(echo "$bytes > 1024" | bc) -eq 1 ]; then
|
||||
echo "$(echo "scale=2; $bytes / 1024" | bc) KB"
|
||||
else
|
||||
echo "${bytes} B"
|
||||
fi
|
||||
}
|
||||
|
||||
# Helper function to calculate percentage reduction
|
||||
calc_reduction() {
|
||||
local baseline=$1
|
||||
local compressed=$2
|
||||
if [ $(echo "$baseline > 0" | bc) -eq 1 ]; then
|
||||
local diff=$(echo "$baseline - $compressed" | bc)
|
||||
local percent=$(echo "scale=1; ($diff / $baseline) * 100" | bc)
|
||||
echo "$percent"
|
||||
else
|
||||
echo "0"
|
||||
fi
|
||||
}
|
||||
|
||||
# Calculate reductions
|
||||
go_reduction=$(calc_reduction "$GO_BASELINE" "$GO_COMPRESSED")
|
||||
ts_reduction=$(calc_reduction "$TYPESCRIPT_BASELINE" "$TYPESCRIPT_COMPRESSED")
|
||||
python_reduction=$(calc_reduction "$PYTHON_BASELINE" "$PYTHON_COMPRESSED")
|
||||
total_reduction=$(calc_reduction "$BASELINE_TOTAL" "$COMPRESSED_TOTAL")
|
||||
|
||||
# Generate report
|
||||
REPORT_FILE="$RESULTS_DIR/compression_report.txt"
|
||||
|
||||
cat > "$REPORT_FILE" <<EOF
|
||||
========================================
|
||||
Compression Test Results
|
||||
========================================
|
||||
|
||||
Baseline (No Compression):
|
||||
Go SDK: $(format_bytes "$GO_BASELINE")
|
||||
TypeScript SDK: $(format_bytes "$TYPESCRIPT_BASELINE")
|
||||
Python SDK: $(format_bytes "$PYTHON_BASELINE")
|
||||
Total: $(format_bytes "$BASELINE_TOTAL")
|
||||
|
||||
With Compression:
|
||||
Go SDK: $(format_bytes "$GO_COMPRESSED") ($go_reduction% reduction)
|
||||
TypeScript SDK: $(format_bytes "$TYPESCRIPT_COMPRESSED") ($ts_reduction% reduction)
|
||||
Python SDK: $(format_bytes "$PYTHON_COMPRESSED") ($python_reduction% reduction)
|
||||
Total: $(format_bytes "$COMPRESSED_TOTAL") ($total_reduction% reduction)
|
||||
|
||||
Bandwidth Savings:
|
||||
Total Saved: $(format_bytes "$(echo "$BASELINE_TOTAL - $COMPRESSED_TOTAL" | bc)")
|
||||
Reduction: $total_reduction%
|
||||
|
||||
========================================
|
||||
Detailed Breakdown
|
||||
========================================
|
||||
|
||||
Go SDK:
|
||||
Baseline: $(format_bytes "$GO_BASELINE")
|
||||
Compressed: $(format_bytes "$GO_COMPRESSED")
|
||||
Savings: $(format_bytes "$(echo "$GO_BASELINE - $GO_COMPRESSED" | bc)") ($go_reduction%)
|
||||
|
||||
TypeScript SDK:
|
||||
Baseline: $(format_bytes "$TYPESCRIPT_BASELINE")
|
||||
Compressed: $(format_bytes "$TYPESCRIPT_COMPRESSED")
|
||||
Savings: $(format_bytes "$(echo "$TYPESCRIPT_BASELINE - $TYPESCRIPT_COMPRESSED" | bc)") ($ts_reduction%)
|
||||
|
||||
Python SDK:
|
||||
Baseline: $(format_bytes "$PYTHON_BASELINE")
|
||||
Compressed: $(format_bytes "$PYTHON_COMPRESSED")
|
||||
Savings: $(format_bytes "$(echo "$PYTHON_BASELINE - $PYTHON_COMPRESSED" | bc)") ($python_reduction%)
|
||||
|
||||
========================================
|
||||
EOF
|
||||
|
||||
cat "$REPORT_FILE"
|
||||
echo ""
|
||||
echo "Full report saved to: $REPORT_FILE"
|
||||
+268
@@ -0,0 +1,268 @@
|
||||
#!/bin/bash
# monitor_network.sh - Monitors network traffic for a Docker container
# Samples `docker stats` NetIO for a container and writes the raw samples to
# a log file plus a KEY=VALUE ".summary" file with the byte deltas.

set -e

# Positional arguments:
#   $1 - container name to watch (required)
#   $2 - monitoring duration in seconds (default: 60)
#   $3 - log file for raw samples (default: /tmp/network_stats_<name>.log)
CONTAINER_NAME=$1
DURATION=${2:-60}
OUTPUT_FILE=${3:-/tmp/network_stats_${CONTAINER_NAME}.log}

if [ -z "$CONTAINER_NAME" ]; then
    echo "Usage: $0 <container_name> [duration_seconds] [output_file]"
    exit 1
fi

echo "Monitoring network for container: $CONTAINER_NAME"
echo "Duration: ${DURATION}s"
echo "Output: $OUTPUT_FILE"

# Clear output file (truncate any stale log from a previous run)
> "$OUTPUT_FILE"

# Wait a moment for container to be ready
sleep 1
|
||||
|
||||
# Helper function to parse stats and convert to bytes
|
||||
parse_stats_to_bytes() {
|
||||
local stats_line="$1"
|
||||
local rx_bytes=0
|
||||
local tx_bytes=0
|
||||
|
||||
# Strip any remaining ANSI codes and clean up
|
||||
stats_line=$(echo "$stats_line" | sed 's/\x1b\[[0-9;]*[a-zA-Z]//g' | sed 's/\x1b\[[0-9;]*m//g' | tr -d '\r' | xargs)
|
||||
|
||||
# Docker stats format: "1.2MB / 3.4MB" or "1.2KB / 3.4KB" or "2.95kB / 2.35kB" etc.
|
||||
# Try to match the pattern - handle both uppercase and lowercase 'k'
|
||||
if [[ $stats_line =~ ([0-9.]+)([kmgtKMG]?B)\ */\ *([0-9.]+)([kmgtKMG]?B) ]]; then
|
||||
RX_VAL=${BASH_REMATCH[1]}
|
||||
RX_UNIT=${BASH_REMATCH[2]}
|
||||
TX_VAL=${BASH_REMATCH[3]}
|
||||
TX_UNIT=${BASH_REMATCH[4]}
|
||||
|
||||
# Normalize units to uppercase
|
||||
RX_UNIT=$(echo "$RX_UNIT" | tr '[:lower:]' '[:upper:]')
|
||||
TX_UNIT=$(echo "$TX_UNIT" | tr '[:lower:]' '[:upper:]')
|
||||
|
||||
rx_bytes=$(awk -v val="$RX_VAL" -v unit="$RX_UNIT" 'BEGIN {
|
||||
if (unit == "B") print int(val)
|
||||
else if (unit == "KB") print int(val * 1024)
|
||||
else if (unit == "MB") print int(val * 1024 * 1024)
|
||||
else if (unit == "GB") print int(val * 1024 * 1024 * 1024)
|
||||
else if (unit == "TB") print int(val * 1024 * 1024 * 1024 * 1024)
|
||||
else print int(val)
|
||||
}')
|
||||
|
||||
tx_bytes=$(awk -v val="$TX_VAL" -v unit="$TX_UNIT" 'BEGIN {
|
||||
if (unit == "B") print int(val)
|
||||
else if (unit == "KB") print int(val * 1024)
|
||||
else if (unit == "MB") print int(val * 1024 * 1024)
|
||||
else if (unit == "GB") print int(val * 1024 * 1024 * 1024)
|
||||
else if (unit == "TB") print int(val * 1024 * 1024 * 1024 * 1024)
|
||||
else print int(val)
|
||||
}')
|
||||
elif [[ $stats_line =~ ([0-9.]+)([KMGT]?iB)\ */\ *([0-9.]+)([KMGT]?iB) ]]; then
|
||||
# Handle KiB, MiB, GiB, TiB format
|
||||
RX_VAL=${BASH_REMATCH[1]}
|
||||
RX_UNIT=${BASH_REMATCH[2]}
|
||||
TX_VAL=${BASH_REMATCH[3]}
|
||||
TX_UNIT=${BASH_REMATCH[4]}
|
||||
|
||||
rx_bytes=$(awk -v val="$RX_VAL" -v unit="$RX_UNIT" 'BEGIN {
|
||||
if (unit == "KiB") print int(val * 1024)
|
||||
else if (unit == "MiB") print int(val * 1024 * 1024)
|
||||
else if (unit == "GiB") print int(val * 1024 * 1024 * 1024)
|
||||
else if (unit == "TiB") print int(val * 1024 * 1024 * 1024 * 1024)
|
||||
else print int(val)
|
||||
}')
|
||||
|
||||
tx_bytes=$(awk -v val="$TX_VAL" -v unit="$TX_UNIT" 'BEGIN {
|
||||
if (unit == "KiB") print int(val * 1024)
|
||||
else if (unit == "MiB") print int(val * 1024 * 1024)
|
||||
else if (unit == "GiB") print int(val * 1024 * 1024 * 1024)
|
||||
else if (unit == "TiB") print int(val * 1024 * 1024 * 1024 * 1024)
|
||||
else print int(val)
|
||||
}')
|
||||
fi
|
||||
|
||||
echo "$rx_bytes $tx_bytes"
|
||||
}
|
||||
|
||||
# Remove ANSI escape sequences and carriage returns from a string,
# leaving plain text suitable for comparison and parsing.
strip_ansi() {
    echo "$1" | sed -e 's/\x1b\[[0-9;]*[a-zA-Z]//g' -e 's/\x1b\[[0-9;]*m//g' | tr -d '\r'
}
|
||||
|
||||
# Get initial stats (retry a few times if container not ready).
# These become the baseline all later samples are diffed against.
INITIAL_STATS="0B / 0B"
for i in {1..10}; do
    RAW_STATS=$(docker stats --no-stream --format "{{.NetIO}}" "$CONTAINER_NAME" 2>/dev/null || echo "0B / 0B")
    INITIAL_STATS=$(strip_ansi "$RAW_STATS")
    # Check if we got valid stats (not empty and not just "0B / 0B")
    if [ -n "$INITIAL_STATS" ] && [ "$INITIAL_STATS" != "0B / 0B" ] && [ "$INITIAL_STATS" != "-- / --" ] && [ "$INITIAL_STATS" != "" ]; then
        break
    fi
    # On the last attempt, give up and fall back to a zero baseline.
    if [ "$i" -eq 10 ]; then
        echo "Warning: Could not get initial stats after 10 attempts, using 0B / 0B" >> "$OUTPUT_FILE"
        INITIAL_STATS="0B / 0B"
        break
    fi
    sleep 1
done
echo "Initial: $INITIAL_STATS" >> "$OUTPUT_FILE"

# Baseline byte counters used for the final RX/TX delta computation.
INITIAL_BYTES=$(parse_stats_to_bytes "$INITIAL_STATS")
INITIAL_RX=$(echo $INITIAL_BYTES | awk '{print $1}')
INITIAL_TX=$(echo $INITIAL_BYTES | awk '{print $2}')
|
||||
|
||||
# Set up trap to write summary if interrupted (after INITIAL_STATS is set).
# Ensures the ".summary" file exists even when the monitor is killed early
# by run_test.sh, so downstream reporting does not lose the measurement.
handle_exit() {
    # Only write a summary once, and only if monitoring actually started.
    if [ -n "$OUTPUT_FILE" ] && [ -n "$INITIAL_STATS" ] && [ ! -f "${OUTPUT_FILE}.summary" ]; then
        # Get final stats one more time, or use last valid stats
        FINAL_STATS="0B / 0B"
        for i in {1..5}; do
            # Query docker only while the container exists (running or stopped).
            if docker ps -a --format '{{.Names}}' | grep -q "^${CONTAINER_NAME}$"; then
                RAW_STATS=$(docker stats --no-stream --format "{{.NetIO}}" "$CONTAINER_NAME" 2>/dev/null || echo "0B / 0B")
                FINAL_STATS=$(strip_ansi "$RAW_STATS")
                if [ -n "$FINAL_STATS" ] && [ "$FINAL_STATS" != "-- / --" ] && [ "$FINAL_STATS" != "" ] && [ "$FINAL_STATS" != "0B / 0B" ]; then
                    LAST_VALID_STATS="$FINAL_STATS"
                    break
                fi
            fi
            sleep 0.2
        done

        # If final stats are 0B / 0B but we have a last valid stats, use that
        if [ "$FINAL_STATS" = "0B / 0B" ] && [ -n "$LAST_VALID_STATS" ] && [ "$LAST_VALID_STATS" != "0B / 0B" ]; then
            FINAL_STATS="$LAST_VALID_STATS"
        fi

        if [ -n "$FINAL_STATS" ] && [ "$FINAL_STATS" != "0B / 0B" ]; then
            echo "Final: $FINAL_STATS" >> "$OUTPUT_FILE"
            FINAL_BYTES=$(parse_stats_to_bytes "$FINAL_STATS")
            FINAL_RX=$(echo $FINAL_BYTES | awk '{print $1}')
            FINAL_TX=$(echo $FINAL_BYTES | awk '{print $2}')
            # Report deltas relative to the baseline captured at startup.
            TOTAL_RX=$((FINAL_RX - INITIAL_RX))
            TOTAL_TX=$((FINAL_TX - INITIAL_TX))
            TOTAL_BYTES=$((TOTAL_RX + TOTAL_TX))

            # KEY=VALUE lines; run_test.sh `source`s this file.
            {
                echo "RX_BYTES=$TOTAL_RX"
                echo "TX_BYTES=$TOTAL_TX"
                echo "TOTAL_BYTES=$TOTAL_BYTES"
            } > "${OUTPUT_FILE}.summary"
        fi
    fi
    exit 0
}
trap 'handle_exit' TERM INT
|
||||
|
||||
# Monitor periodically and capture stats. Each sample is appended to the
# log; the last non-zero sample is remembered in case the container stops
# and docker starts reporting "0B / 0B".
INTERVAL=5 # Check every 5 seconds
ITERATIONS=$((DURATION / INTERVAL))
LAST_VALID_STATS="0B / 0B"
for i in $(seq 1 $ITERATIONS); do
    # Check if container still exists - if not, break early but still write summary
    if ! docker ps -a --format '{{.Names}}' | grep -q "^${CONTAINER_NAME}$"; then
        break
    fi
    RAW_STATS=$(docker stats --no-stream --format "{{.NetIO}}" "$CONTAINER_NAME" 2>/dev/null || echo "0B / 0B")
    STATS=$(strip_ansi "$RAW_STATS")
    echo "$STATS" >> "$OUTPUT_FILE"
    # Keep track of last valid (non-zero) stats
    if [ -n "$STATS" ] && [ "$STATS" != "0B / 0B" ] && [ "$STATS" != "-- / --" ] && [ "$STATS" != "" ]; then
        LAST_VALID_STATS="$STATS"
    fi
    sleep "$INTERVAL"
done
|
||||
|
||||
# Get final stats (retry a few times, even if container stopped).
# Use last valid stats if container stopped and returns 0B / 0B.
FINAL_STATS="0B / 0B"
for i in {1..10}; do
    # Check if container exists (running or stopped)
    if docker ps -a --format '{{.Names}}' | grep -q "^${CONTAINER_NAME}$"; then
        RAW_STATS=$(docker stats --no-stream --format "{{.NetIO}}" "$CONTAINER_NAME" 2>/dev/null || echo "0B / 0B")
        FINAL_STATS=$(strip_ansi "$RAW_STATS")
        if [ -n "$FINAL_STATS" ] && [ "$FINAL_STATS" != "-- / --" ] && [ "$FINAL_STATS" != "" ] && [ "$FINAL_STATS" != "0B / 0B" ]; then
            LAST_VALID_STATS="$FINAL_STATS"
            break
        fi
    fi
    sleep 0.5
done

# If final stats are 0B / 0B but we have a last valid stats, use that
if [ "$FINAL_STATS" = "0B / 0B" ] && [ -n "$LAST_VALID_STATS" ] && [ "$LAST_VALID_STATS" != "0B / 0B" ]; then
    FINAL_STATS="$LAST_VALID_STATS"
fi

echo "Final: $FINAL_STATS" >> "$OUTPUT_FILE"

FINAL_BYTES=$(parse_stats_to_bytes "$FINAL_STATS")
FINAL_RX=$(echo $FINAL_BYTES | awk '{print $1}')
FINAL_TX=$(echo $FINAL_BYTES | awk '{print $2}')

# Calculate difference against the baseline captured at startup.
TOTAL_RX=$((FINAL_RX - INITIAL_RX))
TOTAL_TX=$((FINAL_TX - INITIAL_TX))

echo "Monitoring complete. Results saved to: $OUTPUT_FILE"

# Check if bc is available, if not use awk for calculations.
# format_bytes (below) branches on USE_BC.
if command -v bc >/dev/null 2>&1; then
    USE_BC=true
else
    USE_BC=false
    echo "Warning: bc not found, using awk for calculations (may be less precise)"
fi

# TOTAL_RX and TOTAL_TX are already calculated above as the difference
# between initial and final stats
|
||||
|
||||
# Render a raw byte count as a human-readable string (B/KB/MB/GB/TB,
# powers of 1024). Uses bc when USE_BC=true, awk otherwise.
format_bytes() {
    local bytes=$1
    if [ "$USE_BC" = true ]; then
        # Ascending thresholds; equivalent to the usual >= cascade.
        if [ $(echo "$bytes < 1024" | bc) -eq 1 ]; then
            echo "${bytes} B"
        elif [ $(echo "$bytes < 1048576" | bc) -eq 1 ]; then
            echo "$(echo "scale=2; $bytes / 1024" | bc) KB"
        elif [ $(echo "$bytes < 1073741824" | bc) -eq 1 ]; then
            echo "$(echo "scale=2; $bytes / 1048576" | bc) MB"
        elif [ $(echo "$bytes < 1099511627776" | bc) -eq 1 ]; then
            echo "$(echo "scale=2; $bytes / 1073741824" | bc) GB"
        else
            echo "$(echo "scale=2; $bytes / 1099511627776" | bc) TB"
        fi
    else
        if [ $bytes -lt 1024 ]; then
            echo "${bytes} B"
        elif [ $bytes -lt 1048576 ]; then
            awk "BEGIN {printf \"%.2f KB\", $bytes / 1024}"
        elif [ $bytes -lt 1073741824 ]; then
            awk "BEGIN {printf \"%.2f MB\", $bytes / 1048576}"
        elif [ $bytes -lt 1099511627776 ]; then
            awk "BEGIN {printf \"%.2f GB\", $bytes / 1073741824}"
        else
            awk "BEGIN {printf \"%.2f TB\", $bytes / 1099511627776}"
        fi
    fi
}
|
||||
|
||||
# Output summary (human-readable to stdout).
echo "=== Network Summary ==="
TOTAL_BYTES=$((TOTAL_RX + TOTAL_TX))
RX_FORMATTED=$(format_bytes $TOTAL_RX)
TX_FORMATTED=$(format_bytes $TOTAL_TX)
TOTAL_FORMATTED=$(format_bytes $TOTAL_BYTES)
echo "Total Received: $RX_FORMATTED"
echo "Total Sent: $TX_FORMATTED"
echo "Total: $TOTAL_FORMATTED"

# Save summary to file (machine-readable KEY=VALUE lines; run_test.sh
# `source`s this file to pick up the results).
{
    echo "RX_BYTES=$TOTAL_RX"
    echo "TX_BYTES=$TOTAL_TX"
    echo "TOTAL_BYTES=$TOTAL_BYTES"
} > "${OUTPUT_FILE}.summary"
|
||||
+138
@@ -0,0 +1,138 @@
|
||||
#!/bin/bash
# Recalculate network summary from an existing log file
# (re-derives the RX/TX byte deltas from a raw monitor_network.sh log,
# e.g. when the ".summary" file was never written).

# $1 - path to a monitor_network.sh log file (required)
LOG_FILE=$1
if [ -z "$LOG_FILE" ]; then
    echo "Usage: $0 <log_file>"
    exit 1
fi
|
||||
|
||||
# Remove ANSI escape sequences and carriage returns from a string,
# leaving plain text suitable for comparison and parsing.
strip_ansi() {
    echo "$1" | sed -e 's/\x1b\[[0-9;]*[a-zA-Z]//g' -e 's/\x1b\[[0-9;]*m//g' | tr -d '\r'
}
|
||||
|
||||
# Helper function to parse stats and convert to bytes
|
||||
parse_stats_to_bytes() {
|
||||
local stats_line="$1"
|
||||
local rx_bytes=0
|
||||
local tx_bytes=0
|
||||
|
||||
stats_line=$(echo "$stats_line" | sed 's/\x1b\[[0-9;]*[a-zA-Z]//g' | sed 's/\x1b\[[0-9;]*m//g' | tr -d '\r' | xargs)
|
||||
|
||||
if [[ $stats_line =~ ([0-9.]+)([kmgtKMG]?B)\ */\ *([0-9.]+)([kmgtKMG]?B) ]]; then
|
||||
RX_VAL=${BASH_REMATCH[1]}
|
||||
RX_UNIT=${BASH_REMATCH[2]}
|
||||
TX_VAL=${BASH_REMATCH[3]}
|
||||
TX_UNIT=${BASH_REMATCH[4]}
|
||||
|
||||
RX_UNIT=$(echo "$RX_UNIT" | tr '[:lower:]' '[:upper:]')
|
||||
TX_UNIT=$(echo "$TX_UNIT" | tr '[:lower:]' '[:upper:]')
|
||||
|
||||
rx_bytes=$(awk -v val="$RX_VAL" -v unit="$RX_UNIT" 'BEGIN {
|
||||
if (unit == "B") print int(val)
|
||||
else if (unit == "KB") print int(val * 1024)
|
||||
else if (unit == "MB") print int(val * 1024 * 1024)
|
||||
else if (unit == "GB") print int(val * 1024 * 1024 * 1024)
|
||||
else if (unit == "TB") print int(val * 1024 * 1024 * 1024 * 1024)
|
||||
else print int(val)
|
||||
}')
|
||||
|
||||
tx_bytes=$(awk -v val="$TX_VAL" -v unit="$TX_UNIT" 'BEGIN {
|
||||
if (unit == "B") print int(val)
|
||||
else if (unit == "KB") print int(val * 1024)
|
||||
else if (unit == "MB") print int(val * 1024 * 1024)
|
||||
else if (unit == "GB") print int(val * 1024 * 1024 * 1024)
|
||||
else if (unit == "TB") print int(val * 1024 * 1024 * 1024 * 1024)
|
||||
else print int(val)
|
||||
}')
|
||||
fi
|
||||
|
||||
echo "$rx_bytes $tx_bytes"
|
||||
}
|
||||
# Extract initial and final stats from log file.
# First, strip ANSI codes and get clean lines.
CLEAN_LOG=$(mktemp)
sed 's/\x1b\[[0-9;]*[a-zA-Z]//g' "$LOG_FILE" | sed 's/\x1b\[[0-9;]*m//g' | tr -d '\r' > "$CLEAN_LOG"

# Try to find Initial line
INITIAL_LINE=$(grep "^Initial:" "$CLEAN_LOG" | head -1 | sed 's/^Initial: *//' | xargs)

# Try to find Final line
FINAL_LINE=$(grep "^Final:" "$CLEAN_LOG" | tail -1 | sed 's/^Final: *//' | xargs)

# If no Final line, fall back to the last raw stats sample.
# FIX: the fallback pattern now accepts integer values (e.g. "656B") and
# uppercase 'T' (TB); the previous pattern required a decimal point and
# omitted 'T', so such lines were skipped entirely.
if [ -z "$FINAL_LINE" ]; then
    FINAL_LINE=$(grep -E "^[0-9]+(\.[0-9]+)?[kmgtKMGT]?B / [0-9]+(\.[0-9]+)?[kmgtKMGT]?B$" "$CLEAN_LOG" | tail -1 | xargs)
fi

# If no Initial line, fall back to the first raw stats sample.
if [ -z "$INITIAL_LINE" ]; then
    INITIAL_LINE=$(grep -E "^[0-9]+(\.[0-9]+)?[kmgtKMGT]?B / [0-9]+(\.[0-9]+)?[kmgtKMGT]?B$" "$CLEAN_LOG" | head -1 | xargs)
fi

rm -f "$CLEAN_LOG"

if [ -z "$INITIAL_LINE" ] || [ -z "$FINAL_LINE" ]; then
    echo "Error: Could not find Initial or Final stats in log file"
    echo "Initial: '$INITIAL_LINE'"
    echo "Final: '$FINAL_LINE'"
    exit 1
fi

INITIAL_STATS=$(strip_ansi "$INITIAL_LINE")
FINAL_STATS=$(strip_ansi "$FINAL_LINE")

INITIAL_BYTES=$(parse_stats_to_bytes "$INITIAL_STATS")
FINAL_BYTES=$(parse_stats_to_bytes "$FINAL_STATS")

INITIAL_RX=$(echo $INITIAL_BYTES | awk '{print $1}')
INITIAL_TX=$(echo $INITIAL_BYTES | awk '{print $2}')
FINAL_RX=$(echo $FINAL_BYTES | awk '{print $1}')
FINAL_TX=$(echo $FINAL_BYTES | awk '{print $2}')

# Deltas over the monitoring window.
TOTAL_RX=$((FINAL_RX - INITIAL_RX))
TOTAL_TX=$((FINAL_TX - INITIAL_TX))
TOTAL_BYTES=$((TOTAL_RX + TOTAL_TX))
|
||||
|
||||
# Render a raw byte count as a human-readable string (B/KB/MB/GB/TB,
# powers of 1024), with two decimal places above the B range.
format_bytes() {
    local bytes=$1
    # Ascending thresholds; equivalent to the usual >= cascade.
    if [ $bytes -lt 1024 ]; then
        echo "${bytes} B"
    elif [ $bytes -lt 1048576 ]; then
        awk "BEGIN {printf \"%.2f KB\", $bytes / 1024}"
    elif [ $bytes -lt 1073741824 ]; then
        awk "BEGIN {printf \"%.2f MB\", $bytes / 1048576}"
    elif [ $bytes -lt 1099511627776 ]; then
        awk "BEGIN {printf \"%.2f GB\", $bytes / 1073741824}"
    else
        awk "BEGIN {printf \"%.2f TB\", $bytes / 1099511627776}"
    fi
}
|
||||
|
||||
# Human-readable versions of the computed deltas.
RX_FORMATTED=$(format_bytes $TOTAL_RX)
TX_FORMATTED=$(format_bytes $TOTAL_TX)
TOTAL_FORMATTED=$(format_bytes $TOTAL_BYTES)

echo "Initial: $INITIAL_STATS ($INITIAL_RX bytes RX, $INITIAL_TX bytes TX)"
echo "Final: $FINAL_STATS ($FINAL_RX bytes RX, $FINAL_TX bytes TX)"
echo ""
echo "=== Network Summary ==="
echo "Total Received: $RX_FORMATTED"
echo "Total Sent: $TX_FORMATTED"
echo "Total: $TOTAL_FORMATTED"
echo ""
echo "RX_BYTES=$TOTAL_RX"
echo "TX_BYTES=$TOTAL_TX"
echo "TOTAL_BYTES=$TOTAL_BYTES"

# Save to summary file (same KEY=VALUE format monitor_network.sh writes,
# so downstream tooling can consume either).
{
    echo "RX_BYTES=$TOTAL_RX"
    echo "TX_BYTES=$TOTAL_TX"
    echo "TOTAL_BYTES=$TOTAL_BYTES"
} > "${LOG_FILE}.summary"

echo ""
echo "Summary saved to: ${LOG_FILE}.summary"
|
||||
+52
@@ -0,0 +1,52 @@
|
||||
#!/bin/bash
# run_all_tests.sh - Run all SDK tests for a given compression state
# Thin orchestrator: validates the environment, then invokes run_test.sh
# once per SDK (python, typescript, go) in sequence.

set -e

# $1 - compression state ("enabled" or "disabled"); $2 - events per test.
STATE=$1
EVENTS_COUNT=${2:-10} # Default to 10 events if not specified

if [ -z "$STATE" ]; then
    echo "Usage: $0 <state> [events_count]"
    echo " state: enabled or disabled"
    echo " events_count: number of events to send (default: 10)"
    exit 1
fi

# Validate required environment variables
if [ -z "$HATCHET_CLIENT_TOKEN" ]; then
    echo "Error: HATCHET_CLIENT_TOKEN environment variable is required"
    echo "Usage: export HATCHET_CLIENT_TOKEN='your-token' && $0 <state>"
    exit 1
fi

# Set default host port for macOS Docker (use IP to avoid IPv6 resolution issues)
if [ -z "$HATCHET_CLIENT_HOST_PORT" ]; then
    # Get the Docker gateway IP (host.docker.internal IPv4)
    GATEWAY_IP=$(docker run --rm alpine getent hosts host.docker.internal 2>/dev/null | awk '{print $1}' | grep -E '^[0-9]+\.[0-9]+\.[0-9]+\.[0-9]+' | head -1)
    if [ -z "$GATEWAY_IP" ]; then
        GATEWAY_IP="192.168.65.254" # Default Docker Desktop gateway
    fi
    export HATCHET_CLIENT_HOST_PORT="${GATEWAY_IP}:7070"
    echo "Using default HATCHET_CLIENT_HOST_PORT: $HATCHET_CLIENT_HOST_PORT"
fi

# Resolve this script's directory so run_test.sh can be found regardless
# of the caller's working directory.
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"

echo "=========================================="
echo "Running all SDK tests (${STATE} compression)"
echo "Events per test: ${EVENTS_COUNT}"
echo "=========================================="
echo ""

# Run tests sequentially in order: python, typescript, go
# (set -e aborts the whole run if any single SDK test fails).
for SDK in python typescript go; do
    echo "Running ${SDK} SDK test..."
    "$SCRIPT_DIR/run_test.sh" "$SDK" "$STATE" "$EVENTS_COUNT"
    echo ""
    sleep 5 # Brief pause between tests
done

echo "=========================================="
echo "All tests complete for ${STATE} compression"
echo "=========================================="
||||
+215
@@ -0,0 +1,215 @@
|
||||
#!/bin/bash
# run_test.sh - Run compression test for a specific SDK
# Launches the SDK client container, monitors its network traffic, and
# reports the RX/TX byte totals for the run.

set -e

# Positional arguments:
#   $1 - SDK under test (go | typescript | python)
#   $2 - compression state (enabled | disabled)
#   $3 - number of events to send (default: 10)
SDK=$1
STATE=$2
EVENTS_COUNT=${3:-10} # Default to 10 events if not specified

if [ -z "$SDK" ] || [ -z "$STATE" ]; then
    echo "Usage: $0 <sdk> <state> [events_count]"
    echo " sdk: go, typescript, or python"
    echo " state: enabled or disabled"
    echo " events_count: number of events to send (default: 10)"
    exit 1
fi

if [ "$STATE" != "enabled" ] && [ "$STATE" != "disabled" ]; then
    echo "Error: state must be 'enabled' or 'disabled'"
    exit 1
fi

# Resolve paths relative to this script; results are grouped per state.
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
TEST_DIR="$(cd "$SCRIPT_DIR/.." && pwd)"
RESULTS_DIR="$TEST_DIR/results/$STATE"
CLIENT_CONTAINER="hatchet-client-${SDK}"

mkdir -p "$RESULTS_DIR"

echo "=========================================="
echo "Running ${SDK} SDK test (${STATE} compression)"
echo "Events: ${EVENTS_COUNT}"
echo "=========================================="

# Validate required environment variables
if [ -z "$HATCHET_CLIENT_TOKEN" ]; then
    echo "Error: HATCHET_CLIENT_TOKEN environment variable is required"
    echo "Usage: export HATCHET_CLIENT_TOKEN='your-token' && $0 <sdk> <state>"
    exit 1
fi

# Set default host port for macOS Docker (use IP to avoid IPv6 resolution issues)
if [ -z "$HATCHET_CLIENT_HOST_PORT" ]; then
    # Get the Docker gateway IP (host.docker.internal IPv4)
    GATEWAY_IP=$(docker run --rm alpine getent hosts host.docker.internal 2>/dev/null | awk '{print $1}' | grep -E '^[0-9]+\.[0-9]+\.[0-9]+\.[0-9]+' | head -1)
    if [ -z "$GATEWAY_IP" ]; then
        GATEWAY_IP="192.168.65.254" # Default Docker Desktop gateway
    fi
    export HATCHET_CLIENT_HOST_PORT="${GATEWAY_IP}:7070"
    echo "Using default HATCHET_CLIENT_HOST_PORT: $HATCHET_CLIENT_HOST_PORT"
fi

# Check if client image exists (built out-of-band; see README)
IMAGE_NAME="${SDK}-${STATE}-compression"
if ! docker image inspect "$IMAGE_NAME" >/dev/null 2>&1; then
    echo "Error: Docker image '$IMAGE_NAME' not found"
    echo "Please build it first. See README.md for build instructions."
    exit 1
fi
|
||||
|
||||
# Clean up any existing container from a previous (possibly failed) run.
docker rm -f "$CLIENT_CONTAINER" 2>/dev/null || true

# Start the client container in background
echo "Starting ${SDK} client container..."
cd "$TEST_DIR"

# Export environment variables for docker-compose (consumed by the
# compose file and by the test clients inside the containers).
export COMPRESSION_STATE="$STATE"
export HATCHET_CLIENT_TOKEN="${HATCHET_CLIENT_TOKEN}"
export HATCHET_CLIENT_HOST_PORT="${HATCHET_CLIENT_HOST_PORT}"
export HATCHET_CLIENT_SERVER_URL="${HATCHET_CLIENT_SERVER_URL:-http://localhost:8080}"
export HATCHET_CLIENT_API_URL="${HATCHET_CLIENT_API_URL:-${HATCHET_CLIENT_SERVER_URL:-http://localhost:8080}}"
export HATCHET_CLIENT_NAMESPACE="${HATCHET_CLIENT_NAMESPACE:-compression-test}"
export TEST_EVENTS_COUNT="${EVENTS_COUNT}"

# For Go test: --events is the rate (events per second), not total count
# To get approximately EVENTS_COUNT events, we need: duration = EVENTS_COUNT / rate
# We'll use 10 events per second rate
# Note: Due to timing, this may emit slightly more than EVENTS_COUNT events
EVENTS_PER_SECOND=10
# Calculate duration: events_count / rate (rounded up to nearest second)
DURATION_SECONDS=$((EVENTS_COUNT / EVENTS_PER_SECOND))
# Round up if there's a remainder
if [ $((EVENTS_COUNT % EVENTS_PER_SECOND)) -gt 0 ]; then
    DURATION_SECONDS=$((DURATION_SECONDS + 1))
fi
# Ensure minimum of 1 second
if [ $DURATION_SECONDS -lt 1 ]; then
    DURATION_SECONDS=1
fi
export TEST_EVENTS_RATE="${EVENTS_PER_SECOND}"
export TEST_DURATION="${DURATION_SECONDS}s"
# Wait time should be duration + buffer for events to complete processing
# Add buffer for processing time (events take time to execute)
export TEST_WAIT="$((DURATION_SECONDS + 5))s"

# Run docker-compose with environment variables; -d detaches so this
# script can monitor the container from outside.
docker-compose run -d \
    --name "$CLIENT_CONTAINER" \
    "client-${SDK}" > /dev/null 2>&1

# Wait a moment for container to start
sleep 2
|
||||
|
||||
# Calculate monitoring duration based on test duration
# For Go: use TEST_DURATION, for others: calculate from events count
if [ "$SDK" = "go" ]; then
    # Parse duration (e.g., "5s" -> 5)
    MONITOR_DURATION=$(echo "$TEST_DURATION" | sed 's/s$//')
    MONITOR_DURATION=$((MONITOR_DURATION + 10)) # Add buffer
else
    # For Python/TypeScript: events / 10 events per second + buffer
    EVENTS_PER_SECOND=10
    MONITOR_DURATION=$((EVENTS_COUNT / EVENTS_PER_SECOND + 15)) # Add 15 second buffer
fi

# Start network monitoring in the background; it writes raw samples to the
# log and a ".summary" file with the byte totals when it finishes.
echo "Starting network monitoring..."
"$SCRIPT_DIR/monitor_network.sh" "$CLIENT_CONTAINER" "$MONITOR_DURATION" "$RESULTS_DIR/${SDK}_network.log" &
MONITOR_PID=$!

# Stream logs in real-time (limit to last 10 lines to avoid huge files)
echo "Streaming container logs (press Ctrl+C to stop streaming, container will continue)..."
docker logs -f --tail 10 "$CLIENT_CONTAINER" 2>&1 | tee "$RESULTS_DIR/${SDK}_test.log" &
LOGS_PID=$!

# Wait for container to complete (with timeout)
echo "Waiting for test to complete..."
# Increase timeout for TypeScript (it may take longer to process)
if [ "$SDK" = "typescript" ]; then
    TIMEOUT=180 # 3 minutes timeout for TypeScript
else
    TIMEOUT=120 # 2 minutes timeout for others
fi
ELAPSED=0
while [ $ELAPSED -lt $TIMEOUT ]; do
    if ! docker ps --format '{{.Names}}' | grep -q "^${CLIENT_CONTAINER}$"; then
        # Container stopped
        break
    fi
    sleep 1
    ELAPSED=$((ELAPSED + 1))
done

# If container is still running after timeout, kill it
if docker ps --format '{{.Names}}' | grep -q "^${CLIENT_CONTAINER}$"; then
    echo "Warning: Test timed out after ${TIMEOUT}s, stopping container..."
    docker stop "$CLIENT_CONTAINER" > /dev/null 2>&1 || true
    sleep 2 # Give it a moment to stop
fi

# Stop log streaming
kill $LOGS_PID 2>/dev/null || true
wait $LOGS_PID 2>/dev/null || true

# Wait for monitoring to complete - it needs to finish to write the summary file
# The monitoring script runs for MONITOR_DURATION seconds, then writes the summary
# Wait for it to complete (MONITOR_DURATION + small buffer for file I/O)
MONITOR_TIMEOUT=$((MONITOR_DURATION + 5))
ELAPSED=0
while [ $ELAPSED -lt $MONITOR_TIMEOUT ]; do
    # kill -0 probes the PID without sending a signal.
    if ! kill -0 $MONITOR_PID 2>/dev/null; then
        # Monitoring script finished
        break
    fi
    sleep 1
    ELAPSED=$((ELAPSED + 1))
done

# If still running after timeout, force kill it
# (SIGKILL bypasses the monitor's TERM/INT trap, so the summary may be missing).
if kill -0 $MONITOR_PID 2>/dev/null; then
    kill -KILL $MONITOR_PID 2>/dev/null || true
fi
wait $MONITOR_PID 2>/dev/null || true

# Clean up container
docker rm -f "$CLIENT_CONTAINER" > /dev/null 2>&1 || true
|
||||
|
||||
# Render a raw byte count as a human-readable string (B/KB/MB/GB/TB,
# powers of 1024), with two decimal places above the B range.
format_bytes() {
    local bytes=$1
    # Ascending thresholds; equivalent to the usual >= cascade.
    if [ $bytes -lt 1024 ]; then
        echo "${bytes} B"
    elif [ $bytes -lt 1048576 ]; then
        awk "BEGIN {printf \"%.2f KB\", $bytes / 1024}"
    elif [ $bytes -lt 1073741824 ]; then
        awk "BEGIN {printf \"%.2f MB\", $bytes / 1048576}"
    elif [ $bytes -lt 1099511627776 ]; then
        awk "BEGIN {printf \"%.2f GB\", $bytes / 1073741824}"
    else
        awk "BEGIN {printf \"%.2f TB\", $bytes / 1099511627776}"
    fi
}
|
||||
|
||||
# Extract network summary written by monitor_network.sh.
if [ -f "$RESULTS_DIR/${SDK}_network.log.summary" ]; then
    # The summary is a set of KEY=VALUE lines; sourcing it loads
    # RX_BYTES / TX_BYTES / TOTAL_BYTES into this shell.
    source "$RESULTS_DIR/${SDK}_network.log.summary"
    RX_FORMATTED=$(format_bytes $RX_BYTES)
    TX_FORMATTED=$(format_bytes $TX_BYTES)
    TOTAL_FORMATTED=$(format_bytes $TOTAL_BYTES)
    echo ""
    echo "=== Test Results ==="
    echo "SDK: $SDK"
    echo "State: $STATE"
    echo "RX Bytes: $RX_FORMATTED ($RX_BYTES bytes)"
    echo "TX Bytes: $TX_FORMATTED ($TX_BYTES bytes)"
    echo "Total Bytes: $TOTAL_FORMATTED ($TOTAL_BYTES bytes)"
    echo ""
    echo "Results saved to: $RESULTS_DIR/${SDK}_network.log.summary"
else
    # Can happen when the monitor was force-killed before writing the file.
    echo "Warning: Could not find network summary file"
fi

echo "Test complete for ${SDK} SDK (${STATE})"
|
||||
Executable
+57
@@ -0,0 +1,57 @@
|
||||
#!/bin/bash
# setup.sh - Initial setup script
# Creates the results directory layout, checks the docker toolchain, and
# prints the manual build/run steps for the compression benchmark.

set -e

SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
TEST_DIR="$(cd "$SCRIPT_DIR/.." && pwd)"

echo "Setting up compression test environment..."

# Create results directories (one per compression state)
mkdir -p "$TEST_DIR/results/baseline"
mkdir -p "$TEST_DIR/results/enabled"

# Note: Using host network mode, so no network creation needed

# Check for required tools
echo ""
echo "Checking for required tools..."

MISSING_TOOLS=()

if ! command -v docker >/dev/null 2>&1; then
    MISSING_TOOLS+=("docker")
fi

# Accept either the standalone docker-compose binary or the compose plugin.
if ! command -v docker-compose >/dev/null 2>&1 && ! docker compose version >/dev/null 2>&1; then
    MISSING_TOOLS+=("docker-compose")
fi

if [ ${#MISSING_TOOLS[@]} -gt 0 ]; then
    echo "Error: Missing required tools: ${MISSING_TOOLS[*]}"
    exit 1
fi

echo "✓ All required tools are installed"

echo ""
echo "Setup complete!"
echo ""
echo "Next steps:"
echo "1. Build baseline images (main branch):"
echo " git checkout main"
echo " docker build -t go-disabled-compression -f Dockerfile.client-go .."
echo " docker build -t typescript-disabled-compression -f Dockerfile.client-ts .."
echo " docker build -t python-disabled-compression -f Dockerfile.client-python .."
echo ""
echo "2. Build compression images (current branch):"
echo " git checkout <your-compression-branch>"
echo " docker build -t go-enabled-compression -f Dockerfile.client-go .."
echo " docker build -t typescript-enabled-compression -f Dockerfile.client-ts .."
echo " docker build -t python-enabled-compression -f Dockerfile.client-python .."
echo ""
echo "3. Start engine and run tests:"
echo " ./scripts/run_all_tests.sh disabled"
echo " ./scripts/run_all_tests.sh enabled"
echo " ./scripts/generate_report.sh"
||||
@@ -0,0 +1,135 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Compression test script for Python SDK"""
|
||||
|
||||
import os
|
||||
import signal
|
||||
import threading
|
||||
import time
|
||||
from datetime import datetime
|
||||
from typing import Any
|
||||
|
||||
from hatchet_sdk import Context, Hatchet
|
||||
|
||||
# Create large payload (100KB)
def create_large_payload() -> dict[str, str]:
    """Build a ~100KB payload: 100 keys, each mapped to a 1KB string."""
    one_kb = "a" * 1000  # 1KB chunk
    return {f"chunk_{i}": one_kb for i in range(100)}
|
||||
|
||||
|
||||
def emit_events(hatchet: Hatchet, total_events: int, events_per_second: int) -> None:
    """Push test events at a fixed rate (runs in a background thread).

    A failed push is logged and retried on the next tick with the same id,
    so the loop only advances past an event once it has been accepted.
    """
    delay = 1.0 / events_per_second
    body_filler = create_large_payload()
    sent = 0

    print(f"Starting to emit {total_events} events...")

    while sent < total_events:
        event = {
            "id": sent,
            "createdAt": datetime.now().isoformat(),
            "payload": body_filler,
        }

        try:
            hatchet.event.push("compression-test:event", event)
            sent += 1
            # Periodic progress marker.
            if sent % 50 == 0:
                print(f"Emitted {sent} events...")
        except Exception as e:
            print(f"Error pushing event {sent}: {e}")

        # Wait for next interval
        time.sleep(delay)

    print(f"Finished emitting {sent} events")
|
||||
|
||||
|
||||
def main() -> None:
    """Run the compression test: register a worker and emit test events.

    Configuration is read from the environment:
      COMPRESSION_STATE  - prefixes the workflow name (default "enabled")
      TEST_EVENTS_COUNT  - number of events to emit (default 10)
    A daemon Timer SIGTERMs this process after the test window so the
    blocking worker.start() call shuts down.
    """
    # Namespace is set via environment variable HATCHET_CLIENT_NAMESPACE
    hatchet = Hatchet(debug=False)

    # Get compression state from environment (default to 'enabled')
    compression_state = os.getenv("COMPRESSION_STATE", "enabled")
    workflow_name = f"{compression_state}-python"

    # Create workflow triggered by the events emit_events pushes.
    workflow = hatchet.workflow(
        name=workflow_name,
        on_events=["compression-test:event"],
    )

    @workflow.task()
    def step1(input_data: Any, ctx: Context) -> dict[str, Any]:
        # EmptyModel allows extra fields, access as attributes
        # The event data is passed as the workflow input
        event_id = getattr(input_data, "id", None)
        print(f"Processing event {event_id}")
        return {
            "processed": True,
            "eventId": event_id,
            "timestamp": datetime.now().isoformat(),
        }

    # Create worker
    worker = hatchet.worker(
        "compression-test-worker",
        slots=100,
        workflows=[workflow],
    )

    # Get number of events from environment variable
    total_events = int(os.getenv("TEST_EVENTS_COUNT", "10"))
    events_per_second = 10
    # Calculate duration needed to send all events
    duration = max(1, total_events / events_per_second)  # At least 1 second

    # Calculate total wait time (worker registration + event emission + processing buffer)
    wait_time = int(duration) + 15  # 5s for registration + duration + 10s buffer

    # Stop the worker after the test window: SIGTERM triggers the SDK's
    # shutdown of the blocking worker.start() call below.
    # (An unused duplicate helper, stop_worker_after_delay, was removed.)
    stop_timer = threading.Timer(wait_time, lambda: os.kill(os.getpid(), signal.SIGTERM))
    stop_timer.daemon = True
    stop_timer.start()

    # Start emitting events in a separate thread
    emit_thread = threading.Thread(
        target=emit_events,
        args=(hatchet, total_events, events_per_second),
        daemon=True,
    )

    print("Starting worker...")
    print(f"Emitting {total_events} events over {duration:.1f} seconds...")

    # Start emitting events
    emit_thread.start()

    # Wait a moment for events to start
    time.sleep(1)

    # Start worker (blocking call - will run until SIGTERM)
    try:
        worker.start()
    except KeyboardInterrupt:
        print("Worker stopped")
|
||||
|
||||
|
||||
# Script entry point: run the test and report how it ended.
if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        # Ctrl-C is a normal way to end the test; exit quietly.
        print("Test interrupted")
    except Exception as e:
        # Report the failure, then re-raise so the process exits non-zero.
        print(f"Test failed: {e}")
        raise
|
||||
@@ -0,0 +1,126 @@
|
||||
import Hatchet from './dist/index';
|
||||
|
||||
// Create large payload (100KB)
|
||||
const createLargePayload = (): Record<string, string> => {
|
||||
const payload: Record<string, string> = {};
|
||||
const chunk = 'a'.repeat(1000); // 1KB chunk
|
||||
for (let i = 0; i < 100; i++) {
|
||||
payload[`chunk_${i}`] = chunk;
|
||||
}
|
||||
return payload;
|
||||
};
|
||||
|
||||
/** Shape of an event pushed to `compression-test:event` and received as workflow input. */
interface TestEvent {
  /** Monotonically increasing, zero-based event counter. */
  id: number;
  /** ISO-8601 timestamp taken when the event was emitted. */
  createdAt: string;
  /** Large (~100KB) body used to exercise gzip compression. */
  payload: Record<string, string>;
}
|
||||
|
||||
/**
 * Compression end-to-end test: start a Hatchet worker, emit events with large
 * payloads at a fixed rate, wait for them to be processed, then stop the
 * worker and exit.
 *
 * Environment variables:
 * - HATCHET_CLIENT_NAMESPACE: client namespace (default 'compression-test')
 * - COMPRESSION_STATE: 'enabled'/'disabled'; only used to name the workflow
 * - TEST_EVENTS_COUNT: number of events to emit (default 10)
 *
 * Always terminates the process via `process.exit`.
 */
async function main() {
  const hatchet = Hatchet.init({
    namespace: process.env.HATCHET_CLIENT_NAMESPACE || 'compression-test',
  });

  const worker = await hatchet.worker('compression-test-worker', {
    slots: 100,
  });

  // Get compression state from environment (default to 'enabled')
  const compressionState = process.env.COMPRESSION_STATE || 'enabled';
  const workflowId = `${compressionState}-typescript`;

  // Register a one-step workflow that echoes each event's id back as output
  await worker.registerWorkflow({
    id: workflowId,
    description: 'Test workflow for compression testing',
    on: {
      event: 'compression-test:event',
    },
    steps: [
      {
        name: 'step1',
        run: async (ctx) => {
          const input = ctx.workflowInput() as TestEvent;
          console.log(`Processing event ${input.id}`);
          return {
            processed: true,
            eventId: input.id,
            timestamp: new Date().toISOString(),
          };
        },
      },
    ],
  });

  // Get number of events from environment variable
  const totalEvents = parseInt(process.env.TEST_EVENTS_COUNT || '10', 10);
  const eventsPerSecond = 10;
  const interval = 1000 / eventsPerSecond; // 100ms between events
  const duration = Math.max(1000, (totalEvents / eventsPerSecond) * 1000); // Calculate duration from events

  // Start worker in background (don't await - it's blocking).
  // NOTE(review): workerPromise is never awaited or raced later; errors after
  // startup are only logged — confirm this is intentional.
  console.log('Starting worker...');
  const workerPromise = worker.start().catch((error) => {
    console.error('Worker error:', error);
  });

  // Wait for worker to register before emitting anything
  await new Promise((resolve) => setTimeout(resolve, 5000));

  console.log(`Emitting ${totalEvents} events over ${duration / 1000} seconds...`);

  // One shared payload object is reused for every event
  const largePayload = createLargePayload();
  let eventId = 0;

  // Emit events at eventsPerSecond, sleeping `interval` ms between pushes
  const emitEvents = async () => {
    while (eventId < totalEvents) {
      const event: TestEvent = {
        id: eventId++,
        createdAt: new Date().toISOString(),
        payload: largePayload,
      };

      try {
        await hatchet.events.push('compression-test:event', event);
      } catch (error) {
        // NOTE(review): eventId was already incremented above, so this logs
        // the failing id + 1 — confirm whether `event.id` was intended.
        console.error(`Error pushing event ${eventId}:`, error);
      }

      // Wait for next interval
      await new Promise((resolve) => setTimeout(resolve, interval));
    }

    console.log(`Finished emitting ${eventId} events`);
  };

  // Start emitting events and wait for completion
  await emitEvents();

  // Wait additional time for events to be processed
  // Add buffer for processing time (events take time to execute)
  const processingBuffer = 10000; // 10 seconds buffer for processing
  const waitTime = duration + processingBuffer;
  console.log(`Waiting ${waitTime / 1000} seconds for events to be processed...`);
  await new Promise((resolve) => setTimeout(resolve, waitTime));

  console.log('Test complete, stopping worker...');
  try {
    // Stop worker with a timeout to prevent hanging
    await Promise.race([
      worker.stop(),
      new Promise((_, reject) =>
        setTimeout(() => reject(new Error('Worker stop timeout')), 10000)
      )
    ]);
  } catch (error) {
    console.error('Error stopping worker:', error);
    // Force exit if stop hangs (process.exit below runs either way)
  }
  console.log('Worker stopped, exiting...');
  process.exit(0);
}
|
||||
|
||||
main().catch((error) => {
|
||||
console.error('Test failed:', error);
|
||||
process.exit(1);
|
||||
});
|
||||
@@ -37,6 +37,7 @@ import (
|
||||
"github.com/hatchet-dev/hatchet/pkg/logger"
|
||||
|
||||
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
|
||||
_ "google.golang.org/grpc/encoding/gzip" // Register gzip compression codec
|
||||
)
|
||||
|
||||
type Server struct {
|
||||
@@ -213,6 +214,7 @@ func (s *Server) Start() (func() error, error) {
|
||||
|
||||
func (s *Server) startGRPC() (func() error, error) {
|
||||
s.l.Debug().Msgf("starting grpc server on %s:%d", s.bindAddress, s.port)
|
||||
s.l.Info().Msg("gzip compression enabled for gRPC server")
|
||||
|
||||
lis, err := net.Listen("tcp", fmt.Sprintf("%s:%d", s.bindAddress, s.port))
|
||||
|
||||
|
||||
+25
-14
@@ -13,6 +13,7 @@ import (
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/credentials"
|
||||
"google.golang.org/grpc/credentials/insecure"
|
||||
_ "google.golang.org/grpc/encoding/gzip" // Register gzip compression codec
|
||||
"google.golang.org/grpc/keepalive"
|
||||
"google.golang.org/grpc/status"
|
||||
|
||||
@@ -90,6 +91,8 @@ type ClientOpts struct {
|
||||
filesLoader filesLoaderFunc
|
||||
initWorkflows bool
|
||||
presetWorkerLabels map[string]string
|
||||
|
||||
disableGzipCompression bool
|
||||
}
|
||||
|
||||
func defaultClientOpts(token *string, cf *client.ClientConfigFile) *ClientOpts {
|
||||
@@ -121,20 +124,21 @@ func defaultClientOpts(token *string, cf *client.ClientConfigFile) *ClientOpts {
|
||||
logger := logger.NewDefaultLogger("client")
|
||||
|
||||
return &ClientOpts{
|
||||
tenantId: clientConfig.TenantId,
|
||||
token: clientConfig.Token,
|
||||
l: &logger,
|
||||
v: validator.NewDefaultValidator(),
|
||||
tls: clientConfig.TLSConfig,
|
||||
hostPort: clientConfig.GRPCBroadcastAddress,
|
||||
serverURL: clientConfig.ServerURL,
|
||||
filesLoader: types.DefaultLoader,
|
||||
namespace: clientConfig.Namespace,
|
||||
cloudRegisterID: clientConfig.CloudRegisterID,
|
||||
runnableActions: clientConfig.RunnableActions,
|
||||
noGrpcRetry: clientConfig.NoGrpcRetry,
|
||||
sharedMeta: make(map[string]string),
|
||||
presetWorkerLabels: clientConfig.PresetWorkerLabels,
|
||||
tenantId: clientConfig.TenantId,
|
||||
token: clientConfig.Token,
|
||||
l: &logger,
|
||||
v: validator.NewDefaultValidator(),
|
||||
tls: clientConfig.TLSConfig,
|
||||
hostPort: clientConfig.GRPCBroadcastAddress,
|
||||
serverURL: clientConfig.ServerURL,
|
||||
filesLoader: types.DefaultLoader,
|
||||
namespace: clientConfig.Namespace,
|
||||
cloudRegisterID: clientConfig.CloudRegisterID,
|
||||
runnableActions: clientConfig.RunnableActions,
|
||||
noGrpcRetry: clientConfig.NoGrpcRetry,
|
||||
sharedMeta: make(map[string]string),
|
||||
presetWorkerLabels: clientConfig.PresetWorkerLabels,
|
||||
disableGzipCompression: clientConfig.DisableGzipCompression,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -267,6 +271,13 @@ func newFromOpts(opts *ClientOpts) (Client, error) {
|
||||
grpc.WithKeepaliveParams(keepAliveParams),
|
||||
}
|
||||
|
||||
if !opts.disableGzipCompression {
|
||||
grpcOpts = append(grpcOpts, grpc.WithDefaultCallOptions(grpc.UseCompressor("gzip")))
|
||||
opts.l.Info().Msg("gzip compression enabled for gRPC client")
|
||||
} else {
|
||||
opts.l.Info().Msg("gzip compression disabled for gRPC client")
|
||||
}
|
||||
|
||||
if !opts.noGrpcRetry {
|
||||
retryOnCodes := []codes.Code{
|
||||
codes.ResourceExhausted,
|
||||
|
||||
@@ -13,6 +13,7 @@ import (
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/status"
|
||||
"google.golang.org/protobuf/types/known/timestamppb"
|
||||
_ "google.golang.org/grpc/encoding/gzip" // Register gzip compression codec
|
||||
|
||||
dispatchercontracts "github.com/hatchet-dev/hatchet/internal/services/dispatcher/contracts"
|
||||
sharedcontracts "github.com/hatchet-dev/hatchet/internal/services/shared/proto/v1"
|
||||
|
||||
+11
-10
@@ -133,16 +133,17 @@ func GetClientConfigFromConfigFile(cf *client.ClientConfigFile) (res *client.Cli
|
||||
}
|
||||
|
||||
return &client.ClientConfig{
|
||||
TenantId: cf.TenantId,
|
||||
TLSConfig: tlsConf,
|
||||
Token: cf.Token,
|
||||
ServerURL: serverURL,
|
||||
GRPCBroadcastAddress: grpcBroadcastAddress,
|
||||
Namespace: namespace,
|
||||
CloudRegisterID: cf.CloudRegisterID,
|
||||
RunnableActions: rawRunnableActions,
|
||||
NoGrpcRetry: cf.NoGrpcRetry,
|
||||
PresetWorkerLabels: presetLabels,
|
||||
TenantId: cf.TenantId,
|
||||
TLSConfig: tlsConf,
|
||||
Token: cf.Token,
|
||||
ServerURL: serverURL,
|
||||
GRPCBroadcastAddress: grpcBroadcastAddress,
|
||||
Namespace: namespace,
|
||||
CloudRegisterID: cf.CloudRegisterID,
|
||||
RunnableActions: rawRunnableActions,
|
||||
NoGrpcRetry: cf.NoGrpcRetry,
|
||||
PresetWorkerLabels: presetLabels,
|
||||
DisableGzipCompression: cf.DisableGzipCompression,
|
||||
}, nil
|
||||
}
|
||||
|
||||
|
||||
@@ -10,6 +10,7 @@ import (
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/credentials"
|
||||
"google.golang.org/grpc/credentials/insecure"
|
||||
_ "google.golang.org/grpc/encoding/gzip" // Register gzip compression codec
|
||||
"google.golang.org/grpc/keepalive"
|
||||
grpcMetadata "google.golang.org/grpc/metadata"
|
||||
|
||||
@@ -25,10 +26,11 @@ type GRPCClient struct {
|
||||
}
|
||||
|
||||
type clientOpts struct {
|
||||
l *zerolog.Logger
|
||||
hostPort string
|
||||
token string
|
||||
tls *tls.Config
|
||||
l *zerolog.Logger
|
||||
hostPort string
|
||||
token string
|
||||
tls *tls.Config
|
||||
disableGzipCompression bool
|
||||
}
|
||||
|
||||
type GRPCClientOpt func(*clientOpts)
|
||||
@@ -58,6 +60,12 @@ func WithLogger(l *zerolog.Logger) func(*clientOpts) {
|
||||
}
|
||||
}
|
||||
|
||||
func WithDisableGzipCompression(disable bool) func(*clientOpts) {
|
||||
return func(opts *clientOpts) {
|
||||
opts.disableGzipCompression = disable
|
||||
}
|
||||
}
|
||||
|
||||
func defaultOpts() *clientOpts {
|
||||
l := logger.NewDefaultLogger("client")
|
||||
|
||||
@@ -120,6 +128,13 @@ func NewGRPCClient(fs ...GRPCClientOpt) (*GRPCClient, error) {
|
||||
grpc.WithKeepaliveParams(keepAliveParams),
|
||||
}
|
||||
|
||||
if !opts.disableGzipCompression {
|
||||
grpcOpts = append(grpcOpts, grpc.WithDefaultCallOptions(grpc.UseCompressor("gzip")))
|
||||
opts.l.Info().Msg("gzip compression enabled for gRPC client")
|
||||
} else {
|
||||
opts.l.Info().Msg("gzip compression disabled for gRPC client")
|
||||
}
|
||||
|
||||
conn, err := grpc.NewClient(
|
||||
opts.hostPort,
|
||||
grpcOpts...,
|
||||
|
||||
@@ -32,6 +32,8 @@ type ClientConfigFile struct {
|
||||
RawRunnableActions []string `mapstructure:"runnableActions" json:"runnableActions,omitempty"`
|
||||
|
||||
AutoscalingTarget string `mapstructure:"autoscalingTarget" json:"autoscalingTarget,omitempty"`
|
||||
|
||||
DisableGzipCompression bool `mapstructure:"disableGzipCompression" json:"disableGzipCompression,omitempty"`
|
||||
}
|
||||
|
||||
type ClientTLSConfigFile struct {
|
||||
@@ -57,6 +59,8 @@ type ClientConfig struct {
|
||||
RunnableActions []string
|
||||
|
||||
PresetWorkerLabels map[string]string
|
||||
|
||||
DisableGzipCompression bool
|
||||
}
|
||||
|
||||
func BindAllEnv(v *viper.Viper) {
|
||||
@@ -70,6 +74,7 @@ func BindAllEnv(v *viper.Viper) {
|
||||
_ = v.BindEnv("runnableActions", "HATCHET_CLOUD_ACTIONS")
|
||||
_ = v.BindEnv("noGrpcRetry", "HATCHET_CLIENT_NO_GRPC_RETRY")
|
||||
_ = v.BindEnv("autoscalingTarget", "HATCHET_CLIENT_AUTOSCALING_TARGET")
|
||||
_ = v.BindEnv("disableGzipCompression", "HATCHET_CLIENT_DISABLE_GZIP_COMPRESSION")
|
||||
|
||||
// tls options
|
||||
_ = v.BindEnv("tls.base.tlsStrategy", "HATCHET_CLIENT_TLS_STRATEGY")
|
||||
|
||||
@@ -56,6 +56,7 @@ def new_conn(config: ClientConfig, aio: bool) -> grpc.Channel | grpc.aio.Channel
|
||||
("grpc.client_idle_timeout_ms", 60 * 1000),
|
||||
("grpc.http2.max_pings_without_data", 0),
|
||||
("grpc.keepalive_permit_without_calls", 1),
|
||||
("grpc.default_compression_algorithm", grpc.Compression.Gzip),
|
||||
]
|
||||
|
||||
# Set environment variable to disable fork support. Reference: https://github.com/grpc/grpc/issues/28557
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
[tool.poetry]
|
||||
name = "hatchet-sdk"
|
||||
version = "1.21.2"
|
||||
version = "1.21.3"
|
||||
description = "This is the official Python SDK for Hatchet, a distributed, fault-tolerant task queue. The SDK allows you to easily integrate Hatchet's task scheduling and workflow orchestration capabilities into your Python applications."
|
||||
authors = [
|
||||
"Alexander Belanger <alexander@hatchet.run>",
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
{
|
||||
"name": "@hatchet-dev/typescript-sdk",
|
||||
"version": "1.10.2",
|
||||
"version": "1.10.3",
|
||||
"description": "Background task orchestration & visibility for developers",
|
||||
"types": "dist/index.d.ts",
|
||||
"files": [
|
||||
|
||||
@@ -17,6 +17,8 @@ export const channelFactory = (config: ClientConfig, credentials: ChannelCredent
|
||||
'grpc.keepalive_time_ms': 10 * 1000,
|
||||
// Allow keepalive pings when there are no gRPC calls.
|
||||
'grpc.keepalive_permit_without_calls': 1,
|
||||
// Enable gzip compression for all calls on this channel
|
||||
'grpc.default_compression_algorithm': 2, // 2 = Gzip compression
|
||||
});
|
||||
|
||||
export const addTokenMiddleware = (token: string) =>
|
||||
|
||||
Reference in New Issue
Block a user