feat: gzip compression for large payloads, persistent OLAP writes (#2368)

* debug: remove event pub

* add additional spans to publish message

* debug: don't publish payloads

* fix: persistent messages on olap

* add back other payloads

* remove pub buffers temporarily

* fix: correct queue

* hacky partitioning

* add back pub buffers to scheduler

* don't send no-worker events

* add attributes for queue name and message id to publish

* add back pub buffers to grpc api

* remove pubs again, no worker writes though

* task processing queue hashes

* remove payloads again

* gzip compression over 5kb

* add back task controller payloads

* add back no-worker requeueing event, with expirable LRU cache

* add back pub buffers

* remove hash partitioned queues

* small fixes

* ignore lru cache top fn

* config vars for compression, disable by default

---------

Co-authored-by: Alexander Belanger <alexander@hatchet.run>
Authored by matt on 2025-10-08 11:44:04 -04:00, committed by GitHub
parent c48a3211b5
commit d677cb2b08
10 changed files with 295 additions and 43 deletions
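The compression settings arrive as queue options and, per the last commit in the list, stay disabled by default. The sketch below shows one way such config vars could be wired into the `WithGzipCompression` option added further down in this diff; the environment variable names are placeholders invented for illustration, not taken from this change.

```go
// Hypothetical wiring of compression config vars into the queue option.
// The variable names below are placeholders, not part of this diff.
package main

import (
	"fmt"
	"os"
	"strconv"
)

type gzipConfig struct {
	Enabled   bool
	Threshold int // bytes; <= 0 falls back to the 5KB default inside WithGzipCompression
}

func gzipConfigFromEnv() gzipConfig {
	cfg := gzipConfig{Enabled: false, Threshold: 5 * 1024} // disabled by default

	if v := os.Getenv("SERVER_MSGQUEUE_RABBITMQ_GZIP_ENABLED"); v != "" {
		cfg.Enabled, _ = strconv.ParseBool(v)
	}

	if v := os.Getenv("SERVER_MSGQUEUE_RABBITMQ_GZIP_THRESHOLD"); v != "" {
		if n, err := strconv.Atoi(v); err == nil {
			cfg.Threshold = n
		}
	}

	return cfg
}

func main() {
	cfg := gzipConfigFromEnv()
	fmt.Printf("gzip enabled=%v threshold=%d bytes\n", cfg.Enabled, cfg.Threshold)
}
```

In the server itself these values would be passed through as a `MessageQueueImplOpt` alongside the other queue options when constructing the RabbitMQ queue.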

View File

@@ -186,6 +186,9 @@ func (m *msgIdPubBuffer) flush() {
msgsWithErrCh := make([]*msgWithErrCh, 0)
payloadBytes := make([][]byte, 0)
var isPersistent *bool
var immediatelyExpire *bool
var retries *int
// read all messages currently in the buffer
for i := 0; i < PUB_BUFFER_SIZE; i++ {
@@ -194,6 +197,18 @@ func (m *msgIdPubBuffer) flush() {
msgsWithErrCh = append(msgsWithErrCh, msg)
payloadBytes = append(payloadBytes, msg.msg.Payloads...)
if isPersistent == nil {
isPersistent = &msg.msg.Persistent
}
if immediatelyExpire == nil {
immediatelyExpire = &msg.msg.ImmediatelyExpire
}
if retries == nil {
retries = &msg.msg.Retries
}
default:
i = PUB_BUFFER_SIZE
}
@@ -203,11 +218,25 @@ func (m *msgIdPubBuffer) flush() {
return
}
err := m.pub(&Message{
msgToSend := &Message{
TenantID: m.tenantId,
ID: m.msgId,
Payloads: payloadBytes,
})
}
if isPersistent != nil {
msgToSend.Persistent = *isPersistent
}
if immediatelyExpire != nil {
msgToSend.ImmediatelyExpire = *immediatelyExpire
}
if retries != nil {
msgToSend.Retries = *retries
}
err := m.pub(msgToSend)
for _, msgWithErrCh := range msgsWithErrCh {
if msgWithErrCh.errCh != nil {
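The flush path now merges every buffered payload into a single outgoing message and copies the Persistent, ImmediatelyExpire, and Retries flags from the first buffered message onto the batch. A minimal stand-alone sketch of that coalescing pattern, using simplified stand-in types rather than the real msgqueue ones:

```go
// Simplified sketch of the flag-coalescing done in flush(): payloads from
// many buffered messages are merged into one outgoing message, and the
// batch inherits the flags of the first message seen.
package main

import "fmt"

type message struct {
	Payloads          [][]byte
	Persistent        bool
	ImmediatelyExpire bool
	Retries           int
}

func coalesce(buffered []message) message {
	out := message{}

	var isPersistent *bool
	var immediatelyExpire *bool
	var retries *int

	for i := range buffered {
		out.Payloads = append(out.Payloads, buffered[i].Payloads...)

		// only the first message's flags win, mirroring the nil checks above
		if isPersistent == nil {
			isPersistent = &buffered[i].Persistent
		}
		if immediatelyExpire == nil {
			immediatelyExpire = &buffered[i].ImmediatelyExpire
		}
		if retries == nil {
			retries = &buffered[i].Retries
		}
	}

	if isPersistent != nil {
		out.Persistent = *isPersistent
	}
	if immediatelyExpire != nil {
		out.ImmediatelyExpire = *immediatelyExpire
	}
	if retries != nil {
		out.Retries = *retries
	}

	return out
}

func main() {
	batch := coalesce([]message{
		{Payloads: [][]byte{[]byte(`{"a":1}`)}, Persistent: true, Retries: 3},
		{Payloads: [][]byte{[]byte(`{"b":2}`)}, Persistent: false, Retries: 1},
	})
	fmt.Printf("payloads=%d persistent=%v retries=%d\n",
		len(batch.Payloads), batch.Persistent, batch.Retries)
	// payloads=2 persistent=true retries=3
}
```

This presumably relies on every message sharing a buffer (same tenant and message ID) carrying the same flags, which is why only the first message's values are consulted.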

View File

@@ -28,6 +28,9 @@ type Message struct {
// Retries is the number of retries for the task.
Retries int `json:"retries"`
// Compressed indicates whether the payloads are gzip compressed
Compressed bool `json:"compressed,omitempty"`
}
func NewTenantMessage[T any](tenantId, id string, immediatelyExpire, persistent bool, payloads ...T) (*Message, error) {
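Because the new field is tagged `compressed,omitempty`, uncompressed messages serialize exactly as they did before this change, and consumers that predate the field simply ignore it. A small sketch of that behavior, using a trimmed-down stand-in for the Message struct:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Trimmed stand-in for msgqueue.Message; only the fields relevant here.
type message struct {
	ID         string   `json:"id"`
	Payloads   [][]byte `json:"payloads"`
	Retries    int      `json:"retries"`
	Compressed bool     `json:"compressed,omitempty"`
}

func main() {
	plain, _ := json.Marshal(message{ID: "task-created"})
	gzipped, _ := json.Marshal(message{ID: "task-created", Compressed: true})

	fmt.Println(string(plain))   // no "compressed" key at all
	fmt.Println(string(gzipped)) // ...,"compressed":true}
}
```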

View File

@@ -0,0 +1,106 @@
package rabbitmq

import (
	"bytes"
	"compress/gzip"
	"fmt"
	"io"
)

type CompressionResult struct {
	Payloads         [][]byte
	WasCompressed    bool
	OriginalSize     int
	CompressedSize   int
	// CompressionRatio is the ratio of compressed size to original size (compressed / original)
	CompressionRatio float64
}

// compressPayloads compresses message payloads using gzip if they exceed the minimum size threshold.
// Returns compression results including the compressed payloads and compression statistics.
func (t *MessageQueueImpl) compressPayloads(payloads [][]byte) (*CompressionResult, error) {
	result := &CompressionResult{
		Payloads:      payloads,
		WasCompressed: false,
	}

	if !t.compressionEnabled || len(payloads) == 0 {
		return result, nil
	}

	// Calculate total size to determine if compression is worthwhile
	totalSize := 0

	for _, payload := range payloads {
		totalSize += len(payload)
	}

	result.OriginalSize = totalSize

	// Only compress if total size exceeds threshold
	if totalSize < t.compressionThreshold {
		result.CompressedSize = totalSize
		result.CompressionRatio = 1.0
		return result, nil
	}

	compressed := make([][]byte, len(payloads))
	compressedSize := 0

	for i, payload := range payloads {
		var buf bytes.Buffer

		gzipWriter := gzip.NewWriter(&buf)

		if _, err := gzipWriter.Write(payload); err != nil {
			gzipWriter.Close()
			return nil, fmt.Errorf("failed to write to gzip writer: %w", err)
		}

		if err := gzipWriter.Close(); err != nil {
			return nil, fmt.Errorf("failed to close gzip writer: %w", err)
		}

		compressed[i] = buf.Bytes()
		compressedSize += len(compressed[i])
	}

	result.Payloads = compressed
	result.WasCompressed = true
	result.CompressedSize = compressedSize

	// Calculate compression ratio (compressed / original)
	if totalSize > 0 {
		result.CompressionRatio = float64(compressedSize) / float64(totalSize)
	}

	return result, nil
}

// decompressPayloads decompresses message payloads using gzip.
func (t *MessageQueueImpl) decompressPayloads(payloads [][]byte) ([][]byte, error) {
	if len(payloads) == 0 {
		return payloads, nil
	}

	decompressed := make([][]byte, len(payloads))

	for i, payload := range payloads {
		reader, err := gzip.NewReader(bytes.NewReader(payload))
		if err != nil {
			return nil, fmt.Errorf("failed to create gzip reader for payload %d: %w", i, err)
		}

		decompressedData, err := io.ReadAll(reader)
		if err != nil {
			reader.Close()
			return nil, fmt.Errorf("failed to read from gzip reader for payload %d: %w", i, err)
		}

		if err := reader.Close(); err != nil {
			return nil, fmt.Errorf("failed to close gzip reader for payload %d: %w", i, err)
		}

		decompressed[i] = decompressedData
	}

	return decompressed, nil
}
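A round trip through these helpers can be exercised with an in-package test. The sketch below assumes it lives next to compression.go in the rabbitmq package and only fills in the two compression-related fields of MessageQueueImpl, leaving the rest at their zero values:

```go
package rabbitmq

import (
	"bytes"
	"testing"
)

func TestGzipRoundTrip(t *testing.T) {
	// Only the compression fields matter for these helpers.
	mq := &MessageQueueImpl{
		compressionEnabled:   true,
		compressionThreshold: 16, // low threshold so the payloads below get compressed
	}

	original := [][]byte{
		bytes.Repeat([]byte(`{"step":"run"}`), 64),
		bytes.Repeat([]byte(`{"step":"ack"}`), 64),
	}

	res, err := mq.compressPayloads(original)
	if err != nil {
		t.Fatal(err)
	}
	if !res.WasCompressed {
		t.Fatalf("expected payloads above the threshold to be compressed")
	}
	if res.CompressedSize >= res.OriginalSize {
		t.Fatalf("expected gzip to shrink the highly repetitive payloads")
	}

	back, err := mq.decompressPayloads(res.Payloads)
	if err != nil {
		t.Fatal(err)
	}
	for i := range original {
		if !bytes.Equal(back[i], original[i]) {
			t.Fatalf("payload %d did not round-trip", i)
		}
	}
}
```

With a low threshold and highly repetitive payloads the compressed size drops sharply; real-world JSON payloads will compress less dramatically, which is why the default threshold only kicks in above 5KB.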

View File

@@ -45,6 +45,11 @@ type MessageQueueImpl struct {
subChannels *channelPool
deadLetterBackoff time.Duration
compressionEnabled bool
// compressionThreshold is the minimum payload size (in bytes) that will be compressed
compressionThreshold int
}
func (t *MessageQueueImpl) IsReady() bool {
@@ -61,6 +66,10 @@ type MessageQueueImplOpts struct {
deadLetterBackoff time.Duration
maxPubChannels int32
maxSubChannels int32
compressionEnabled bool
// compressionThreshold is the minimum payload size (in bytes) that will be compressed
compressionThreshold int
}
func defaultMessageQueueImplOpts() *MessageQueueImplOpts {
@@ -115,6 +124,18 @@ func WithDeadLetterBackoff(backoff time.Duration) MessageQueueImplOpt {
}
}
func WithGzipCompression(enabled bool, threshold int) MessageQueueImplOpt {
return func(opts *MessageQueueImplOpts) {
opts.compressionEnabled = enabled
if threshold <= 0 {
threshold = 5 * 1024 // default to 5KB
}
opts.compressionThreshold = threshold
}
}
// New creates a new MessageQueueImpl.
func New(fs ...MessageQueueImplOpt) (func() error, *MessageQueueImpl) {
ctx, cancel := context.WithCancel(context.Background())
@@ -165,6 +186,8 @@ func New(fs ...MessageQueueImplOpt) (func() error, *MessageQueueImpl) {
pubChannels: pubChannelPool,
subChannels: subChannelPool,
deadLetterBackoff: opts.deadLetterBackoff,
compressionEnabled: opts.compressionEnabled,
compressionThreshold: opts.compressionThreshold,
}
// create a new lru cache for tenant ids
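With the option in place, enabling compression is a matter of passing `WithGzipCompression` to `New`. A sketch of that wiring, assuming it sits inside the rabbitmq package; the connection-related options are omitted because they are not part of this diff:

```go
// Assumes this lives in the rabbitmq package alongside New and
// WithGzipCompression; connection options are left out since they
// are not shown in this diff.
func newCompressedQueue() (func() error, *MessageQueueImpl) {
	return New(
		// gzip payload batches larger than 10KB; a threshold <= 0 falls
		// back to the 5KB default set inside WithGzipCompression.
		WithGzipCompression(true, 10*1024),
	)
}
```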
@@ -262,13 +285,39 @@ func (t *MessageQueueImpl) pubMessage(ctx context.Context, q msgqueue.Queue, msg
msg.SetOtelCarrier(otelCarrier)
poolCh, err := t.pubChannels.Acquire(ctx)
var compressionResult *CompressionResult
if len(msg.Payloads) > 0 {
var err error
compressionResult, err = t.compressPayloads(msg.Payloads)
if err != nil {
t.l.Error().Msgf("error compressing payloads: %v", err)
return fmt.Errorf("failed to compress payloads: %w", err)
}
if compressionResult.WasCompressed {
msg.Payloads = compressionResult.Payloads
msg.Compressed = true
t.l.Debug().Msgf("compressed payloads for message %s: original=%d bytes, compressed=%d bytes, ratio=%.2f%%",
msg.ID, compressionResult.OriginalSize, compressionResult.CompressedSize, compressionResult.CompressionRatio*100)
}
}
acquireCtx, acquireSpan := telemetry.NewSpan(ctx, "acquire_publish_channel")
poolCh, err := t.pubChannels.Acquire(acquireCtx)
if err != nil {
acquireSpan.RecordError(err)
acquireSpan.SetStatus(codes.Error, "error acquiring publish channel")
acquireSpan.End()
t.l.Error().Msgf("cannot acquire channel: %v", err)
return err
}
acquireSpan.End()
pub := poolCh.Value()
if pub.IsClosed() {
@@ -285,7 +334,7 @@ func (t *MessageQueueImpl) pubMessage(ctx context.Context, q msgqueue.Queue, msg
return err
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
t.l.Debug().Msgf("publishing msg to queue %s", q.Name())
@@ -302,13 +351,38 @@ func (t *MessageQueueImpl) pubMessage(ctx context.Context, q msgqueue.Queue, msg
pubMsg.Expiration = "0"
}
ctx, pubSpan := telemetry.NewSpan(ctx, "publish_message")
spanAttrs := []attribute.KeyValue{
attribute.String("MessageQueueImpl.publish_message.queue_name", q.Name()),
attribute.String("MessageQueueImpl.publish_message.tenant_id", msg.TenantID),
attribute.String("MessageQueueImpl.publish_message.message_id", msg.ID),
}
// Add compression metrics if payloads were present
if compressionResult != nil && compressionResult.WasCompressed {
spanAttrs = append(spanAttrs,
attribute.Bool("MessageQueueImpl.publish_message.compressed", compressionResult.WasCompressed),
attribute.Int("MessageQueueImpl.publish_message.original_size", compressionResult.OriginalSize),
attribute.Int("MessageQueueImpl.publish_message.compressed_size", compressionResult.CompressedSize),
attribute.Float64("MessageQueueImpl.publish_message.compression_ratio", compressionResult.CompressionRatio),
)
}
pubSpan.SetAttributes(spanAttrs...)
err = pub.PublishWithContext(ctx, "", q.Name(), false, false, pubMsg)
// retry failed delivery on the next session
if err != nil {
pubSpan.RecordError(err)
pubSpan.SetStatus(codes.Error, "error publishing message")
pubSpan.End()
return err
}
pubSpan.End()
// if this is a tenant msg, publish to the tenant exchange
if (!t.disableTenantExchangePubs || msg.ID == "task-stream-event") && msg.TenantID != "" {
// determine if the tenant exchange exists
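`telemetry.NewSpan` is Hatchet's internal wrapper, so the sketch below shows the same attribute set using the underlying OpenTelemetry API directly; the `CompressionResult` type here is a stand-in mirroring the one defined in compression.go, and the span is a no-op under the default global tracer provider, which keeps the sketch runnable without an exporter.

```go
package main

import (
	"context"

	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/attribute"
)

// Stand-in with the same fields as the CompressionResult in compression.go.
type CompressionResult struct {
	WasCompressed    bool
	OriginalSize     int
	CompressedSize   int
	CompressionRatio float64
}

// publishSpanAttrs mirrors the attribute set added to the publish_message span.
func publishSpanAttrs(ctx context.Context, queueName, tenantID, msgID string, cr *CompressionResult) {
	_, span := otel.Tracer("msgqueue").Start(ctx, "publish_message")
	defer span.End()

	attrs := []attribute.KeyValue{
		attribute.String("MessageQueueImpl.publish_message.queue_name", queueName),
		attribute.String("MessageQueueImpl.publish_message.tenant_id", tenantID),
		attribute.String("MessageQueueImpl.publish_message.message_id", msgID),
	}

	// Compression metrics are only attached when payloads were actually compressed.
	if cr != nil && cr.WasCompressed {
		attrs = append(attrs,
			attribute.Bool("MessageQueueImpl.publish_message.compressed", cr.WasCompressed),
			attribute.Int("MessageQueueImpl.publish_message.original_size", cr.OriginalSize),
			attribute.Int("MessageQueueImpl.publish_message.compressed_size", cr.CompressedSize),
			attribute.Float64("MessageQueueImpl.publish_message.compression_ratio", cr.CompressionRatio),
		)
	}

	span.SetAttributes(attrs...)
}

func main() {
	publishSpanAttrs(context.Background(), "example-queue", "tenant-123", "task-created", &CompressionResult{
		WasCompressed: true, OriginalSize: 10_240, CompressedSize: 2_048, CompressionRatio: 0.2,
	})
}
```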
@@ -663,6 +737,19 @@ func (t *MessageQueueImpl) subscribe(
return
}
if msg.Compressed {
decompressedPayloads, err := t.decompressPayloads(msg.Payloads)
if err != nil {
t.l.Error().Msgf("error decompressing payloads: %v", err)
// reject this message
if err := rabbitMsg.Reject(false); err != nil {
t.l.Error().Msgf("error rejecting message: %v", err)
}
return
}
msg.Payloads = decompressedPayloads
}
// determine if we've hit the max number of retries
xDeath, exists := rabbitMsg.Headers["x-death"].([]interface{})
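The retry check reads RabbitMQ's `x-death` header, which the broker attaches each time a message is dead-lettered. A simplified stand-alone sketch of counting those cycles with the amqp091-go client; this shows the general shape of the check, not the exact Hatchet logic:

```go
package main

import (
	"fmt"

	amqp "github.com/rabbitmq/amqp091-go"
)

// deathCount sums the dead-letter cycles recorded in a delivery's x-death header.
func deathCount(d amqp.Delivery) int64 {
	xDeath, ok := d.Headers["x-death"].([]interface{})
	if !ok {
		return 0
	}

	var total int64
	for _, entry := range xDeath {
		table, ok := entry.(amqp.Table)
		if !ok {
			continue
		}
		if count, ok := table["count"].(int64); ok {
			total += count
		}
	}
	return total
}

func main() {
	// A delivery that has been dead-lettered twice from the same queue.
	d := amqp.Delivery{Headers: amqp.Table{
		"x-death": []interface{}{
			amqp.Table{"queue": "example-queue", "reason": "rejected", "count": int64(2)},
		},
	}}
	fmt.Println(deathCount(d)) // 2
}
```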