mirror of
https://github.com/hatchet-dev/hatchet.git
synced 2026-04-22 10:10:07 -05:00
Fix: Chunk and recursively retry too-large message sends (#2761)
* feat: recursively split payload list into chunks * fix: use slices.Chunk and run sequentially * fix: return error if only one payload * fix: log error * fix: couple edge cases
This commit is contained in:
@@ -6,6 +6,7 @@ import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"os"
|
||||
"slices"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
@@ -294,7 +295,8 @@ func (t *MessageQueueImpl) pubMessage(ctx context.Context, q msgqueue.Queue, msg
|
||||
|
||||
var compressionResult *CompressionResult
|
||||
|
||||
if len(msg.Payloads) > 0 {
|
||||
// don't re-compress if the message was already compressed
|
||||
if len(msg.Payloads) > 0 && !msg.Compressed {
|
||||
var err error
|
||||
compressionResult, err = t.compressPayloads(msg.Payloads)
|
||||
if err != nil {
|
||||
@@ -360,10 +362,47 @@ func (t *MessageQueueImpl) pubMessage(ctx context.Context, q msgqueue.Queue, msg
|
||||
bodySize := len(body)
|
||||
|
||||
if bodySize > t.maxPayloadSize {
|
||||
err := fmt.Errorf("message size %d bytes exceeds maximum allowed size of %d bytes", bodySize, t.maxPayloadSize)
|
||||
span.RecordError(err)
|
||||
span.SetStatus(codes.Error, "message size exceeds maximum allowed size")
|
||||
return err
|
||||
if len(msg.Payloads) == 1 {
|
||||
err := fmt.Errorf("message size %d bytes exceeds maximum allowed size of %d bytes", bodySize, t.maxPayloadSize)
|
||||
span.RecordError(err)
|
||||
span.SetStatus(codes.Error, "message size exceeds maximum allowed size")
|
||||
return err
|
||||
}
|
||||
|
||||
// split the payload in half each time
|
||||
// can change this value to configure the number of chunks we split the payload
|
||||
// into. more chunks means more messages published, but a smaller likelihood of needing
|
||||
// to recurse multiple times, and vice versa.
|
||||
numChunks := 2
|
||||
payloadsPerChunk := len(msg.Payloads) / numChunks
|
||||
|
||||
if payloadsPerChunk < 1 {
|
||||
payloadsPerChunk = 1
|
||||
}
|
||||
|
||||
// publish chunks sequentially to avoid channel pool exhaustion
|
||||
// parallel publishing at the chunk level causes too many concurrent channel acquisitions
|
||||
for chunk := range slices.Chunk(msg.Payloads, payloadsPerChunk) {
|
||||
// recursively call pubMessage with the chunked payloads
|
||||
// if the payload chunks are still too large, this will continue to split them
|
||||
// until they are under the max size.
|
||||
err := t.pubMessage(ctx, q, &msgqueue.Message{
|
||||
ID: msg.ID,
|
||||
Payloads: chunk,
|
||||
TenantID: msg.TenantID,
|
||||
ImmediatelyExpire: msg.ImmediatelyExpire,
|
||||
Persistent: msg.Persistent,
|
||||
OtelCarrier: msg.OtelCarrier,
|
||||
Retries: msg.Retries,
|
||||
Compressed: msg.Compressed,
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
if bodySize > maxSizeErrorLogThreshold {
|
||||
|
||||
Reference in New Issue
Block a user