mirror of
https://github.com/hatchet-dev/hatchet.git
synced 2026-04-21 09:19:32 -05:00
9e002a6c4c
* use sync.Pool for gzip writers to avoid memory hogging * comments * fix deadlock failure * copilot suggestion * fix formatting * use asserts * PR comments
129 lines
3.3 KiB
Go
129 lines
3.3 KiB
Go
package rabbitmq
|
|
|
|
import (
|
|
"bytes"
|
|
"compress/gzip"
|
|
"fmt"
|
|
"io"
|
|
|
|
"sync"
|
|
)
|
|
|
|
type CompressionResult struct {
|
|
Payloads [][]byte
|
|
WasCompressed bool
|
|
OriginalSize int
|
|
CompressedSize int
|
|
|
|
// CompressionRatio is the ratio of compressed size to original size (compressed / original)
|
|
CompressionRatio float64
|
|
}
|
|
|
|
// gzipWriterPool reuses gzip.Writer instances to avoid repeated allocations.
|
|
// No explicit size cap is needed: sync.Pool is self-limiting because the Go
|
|
// runtime evicts pooled objects during GC, so the pool cannot grow unbounded.
|
|
// In practice the pool size is also bounded by the number of goroutines
|
|
// concurrently compressing, which is small for a RabbitMQ publish path.
|
|
var gzipWriterPool = sync.Pool{
|
|
New: func() any {
|
|
return gzip.NewWriter(nil)
|
|
},
|
|
}
|
|
|
|
func getPayloadSize(payloads [][]byte) int {
|
|
totalSize := 0
|
|
for _, payload := range payloads {
|
|
totalSize += len(payload)
|
|
}
|
|
return totalSize
|
|
}
|
|
|
|
// compressPayloads compresses message payloads using gzip if they exceed the minimum size threshold.
|
|
// Returns compression results including the compressed payloads and compression statistics.
|
|
func (t *MessageQueueImpl) compressPayloads(payloads [][]byte) (*CompressionResult, error) {
|
|
result := &CompressionResult{
|
|
Payloads: payloads,
|
|
WasCompressed: false,
|
|
}
|
|
|
|
if !t.compressionEnabled || len(payloads) == 0 {
|
|
return result, nil
|
|
}
|
|
|
|
// Calculate total size to determine if compression is worthwhile
|
|
totalSize := getPayloadSize(payloads)
|
|
result.OriginalSize = totalSize
|
|
|
|
// Only compress if total size exceeds threshold
|
|
if totalSize < t.compressionThreshold {
|
|
result.CompressedSize = totalSize
|
|
result.CompressionRatio = 1.0
|
|
return result, nil
|
|
}
|
|
|
|
compressed := make([][]byte, len(payloads))
|
|
compressedSize := 0
|
|
|
|
for i, payload := range payloads {
|
|
var buf bytes.Buffer
|
|
|
|
w := gzipWriterPool.Get().(*gzip.Writer)
|
|
w.Reset(&buf)
|
|
|
|
if _, err := w.Write(payload); err != nil {
|
|
w.Close()
|
|
return nil, fmt.Errorf("failed to write to gzip writer: %w", err)
|
|
}
|
|
|
|
if err := w.Close(); err != nil {
|
|
return nil, fmt.Errorf("failed to close gzip writer: %w", err)
|
|
}
|
|
|
|
gzipWriterPool.Put(w)
|
|
|
|
compressed[i] = buf.Bytes()
|
|
compressedSize += len(compressed[i])
|
|
}
|
|
|
|
result.Payloads = compressed
|
|
result.WasCompressed = true
|
|
result.CompressedSize = compressedSize
|
|
|
|
// Calculate compression ratio (compressed / original)
|
|
if totalSize > 0 {
|
|
result.CompressionRatio = float64(compressedSize) / float64(totalSize)
|
|
}
|
|
|
|
return result, nil
|
|
}
|
|
|
|
// decompressPayloads decompresses message payloads using gzip.
|
|
func (t *MessageQueueImpl) decompressPayloads(payloads [][]byte) ([][]byte, error) {
|
|
if len(payloads) == 0 {
|
|
return payloads, nil
|
|
}
|
|
|
|
decompressed := make([][]byte, len(payloads))
|
|
|
|
for i, payload := range payloads {
|
|
reader, err := gzip.NewReader(bytes.NewReader(payload))
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to create gzip reader for payload %d: %w", i, err)
|
|
}
|
|
|
|
decompressedData, err := io.ReadAll(reader)
|
|
if err != nil {
|
|
reader.Close()
|
|
return nil, fmt.Errorf("failed to read from gzip reader for payload %d: %w", i, err)
|
|
}
|
|
|
|
if err := reader.Close(); err != nil {
|
|
return nil, fmt.Errorf("failed to close gzip reader for payload %d: %w", i, err)
|
|
}
|
|
|
|
decompressed[i] = decompressedData
|
|
}
|
|
|
|
return decompressed, nil
|
|
}
|