Files
hatchet/internal/msgqueue/rabbitmq/gzip.go
Mohammed Nafees 9e002a6c4c Pool gzip writers to reduce RabbitMQ message compression allocations (#3103)
* use sync.Pool for gzip writers to avoid memory hogging

* comments

* fix deadlock failure

* copilot suggestion

* fix formatting

* use asserts

* PR comments
2026-03-12 13:09:11 +01:00

129 lines
3.3 KiB
Go

package rabbitmq
import (
"bytes"
"compress/gzip"
"fmt"
"io"
"sync"
)
// CompressionResult describes the outcome of a compressPayloads call:
// the (possibly compressed) payloads plus size statistics for metrics/logging.
type CompressionResult struct {
// Payloads holds the output payloads; these are the original slices when
// WasCompressed is false, or freshly gzip-compressed buffers when true.
Payloads [][]byte
// WasCompressed reports whether gzip compression was actually applied.
WasCompressed bool
// OriginalSize is the total byte size of the input payloads before compression.
// NOTE(review): only populated once the enabled/empty fast path is passed — confirm callers expect 0 otherwise.
OriginalSize int
// CompressedSize is the total byte size after compression (equals
// OriginalSize when the threshold was not met).
CompressedSize int
// CompressionRatio is the ratio of compressed size to original size (compressed / original)
CompressionRatio float64
}
// gzipWriterPool reuses gzip.Writer instances to avoid repeated allocations.
// No explicit size cap is needed: sync.Pool is self-limiting because the Go
// runtime evicts pooled objects during GC, so the pool cannot grow unbounded.
// In practice the pool size is also bounded by the number of goroutines
// concurrently compressing, which is small for a RabbitMQ publish path.
//
// Callers must Reset the writer onto a destination before use; NewWriter(nil)
// only creates the reusable instance.
var gzipWriterPool = sync.Pool{
New: func() any {
return gzip.NewWriter(nil)
},
}
// getPayloadSize returns the combined byte length of all payloads.
func getPayloadSize(payloads [][]byte) int {
	size := 0
	for i := range payloads {
		size += len(payloads[i])
	}
	return size
}
// compressPayloads compresses message payloads using gzip if they exceed the minimum size threshold.
// Returns compression results including the compressed payloads and compression statistics.
//
// Compression is skipped (and the original payloads returned untouched) when
// compression is disabled, the input is empty, or the total size is below
// t.compressionThreshold.
func (t *MessageQueueImpl) compressPayloads(payloads [][]byte) (*CompressionResult, error) {
	result := &CompressionResult{
		Payloads:      payloads,
		WasCompressed: false,
	}

	if !t.compressionEnabled || len(payloads) == 0 {
		return result, nil
	}

	// Calculate total size to determine if compression is worthwhile
	totalSize := getPayloadSize(payloads)
	result.OriginalSize = totalSize

	// Only compress if total size exceeds threshold
	if totalSize < t.compressionThreshold {
		result.CompressedSize = totalSize
		result.CompressionRatio = 1.0
		return result, nil
	}

	compressed := make([][]byte, len(payloads))
	compressedSize := 0

	for i, payload := range payloads {
		data, err := compressPayload(payload)
		if err != nil {
			return nil, err
		}
		compressed[i] = data
		compressedSize += len(data)
	}

	result.Payloads = compressed
	result.WasCompressed = true
	result.CompressedSize = compressedSize

	// Calculate compression ratio (compressed / original)
	if totalSize > 0 {
		result.CompressionRatio = float64(compressedSize) / float64(totalSize)
	}

	return result, nil
}

// compressPayload gzips a single payload using a pooled writer.
//
// The writer is returned to the pool unconditionally via defer — previously it
// was only returned on the success path, so any error leaked it from the pool.
// This is safe because gzip.Writer.Reset discards all prior state (including
// any sticky error), making the instance equivalent to a fresh NewWriter.
func compressPayload(payload []byte) ([]byte, error) {
	var buf bytes.Buffer

	w := gzipWriterPool.Get().(*gzip.Writer)
	w.Reset(&buf)
	defer gzipWriterPool.Put(w)

	if _, err := w.Write(payload); err != nil {
		// Best-effort close on the error path; the Write error takes precedence.
		_ = w.Close()
		return nil, fmt.Errorf("failed to write to gzip writer: %w", err)
	}

	// Close flushes remaining compressed data into buf; its error must be checked.
	if err := w.Close(); err != nil {
		return nil, fmt.Errorf("failed to close gzip writer: %w", err)
	}

	return buf.Bytes(), nil
}
// decompressPayloads decompresses message payloads using gzip.
//
// Each payload must be a complete gzip stream; a slice of fresh buffers is
// returned, leaving the inputs untouched. An empty input is passed through
// unchanged.
func (t *MessageQueueImpl) decompressPayloads(payloads [][]byte) ([][]byte, error) {
	if len(payloads) == 0 {
		return payloads, nil
	}

	out := make([][]byte, len(payloads))

	for idx := range payloads {
		r, err := gzip.NewReader(bytes.NewReader(payloads[idx]))
		if err != nil {
			return nil, fmt.Errorf("failed to create gzip reader for payload %d: %w", idx, err)
		}

		data, readErr := io.ReadAll(r)
		if readErr != nil {
			r.Close()
			return nil, fmt.Errorf("failed to read from gzip reader for payload %d: %w", idx, readErr)
		}

		// Close validates the gzip checksum/footer, so its error matters.
		if closeErr := r.Close(); closeErr != nil {
			return nil, fmt.Errorf("failed to close gzip reader for payload %d: %w", idx, closeErr)
		}

		out[idx] = data
	}

	return out, nil
}