feat(email): implement smart error categorization and adaptive retry strategy

Changes:
- Add EmailErrorType enum (Retryable, Permanent, RateLimited)
- Implement categorizeError() to analyze SMTP codes and error messages
  - Detect permanent errors (5xx codes, invalid emails)
  - Detect rate limiting (421/429/450 codes)
  - Detect temporary errors (4xx codes, network/DNS issues)
- Implement calculateRetryDelay() with adaptive backoff strategies
  - Retryable: 1min × 2^retry (standard exponential backoff)
  - RateLimited: 5min × 3^retry (aggressive backoff)
  - Permanent: no retry
- Add MarkAsFailedWithDelay() to repository for custom retry delays
- Maintain backward compatibility with existing MarkAsFailed()
This commit is contained in:
Benjamin
2025-11-23 00:14:23 +01:00
parent 93d9e2e575
commit 0c3ba254ee
2 changed files with 198 additions and 35 deletions
@@ -218,7 +218,13 @@ func (r *EmailQueueRepository) MarkAsSent(ctx context.Context, id int64) error {
}
// MarkAsFailed marks an email as failed with error details
// This method uses the default PostgreSQL exponential backoff calculation
func (r *EmailQueueRepository) MarkAsFailed(ctx context.Context, id int64, err error, shouldRetry bool) error {
return r.MarkAsFailedWithDelay(ctx, id, err, shouldRetry, 0)
}
// MarkAsFailedWithDelay marks an email as failed with error details and custom retry delay
func (r *EmailQueueRepository) MarkAsFailedWithDelay(ctx context.Context, id int64, err error, shouldRetry bool, retryDelay time.Duration) error {
errorMsg := err.Error()
errorDetails := map[string]interface{}{
@@ -233,17 +239,34 @@ func (r *EmailQueueRepository) MarkAsFailed(ctx context.Context, id int64, err e
var args []interface{}
if shouldRetry {
// If retrying, increment retry count and calculate next retry time
query = `
UPDATE email_queue
SET status = 'pending',
retry_count = retry_count + 1,
last_error = $1,
error_details = $2,
scheduled_for = calculate_next_retry_time(retry_count + 1)
WHERE id = $3 AND retry_count < max_retries
`
args = []interface{}{errorMsg, errorDetailsJSON, id}
// If retrying, increment retry count and set next retry time
// If retryDelay is 0, use PostgreSQL function for default exponential backoff
// Otherwise, use the custom delay provided by the caller
if retryDelay > 0 {
nextRetry := time.Now().Add(retryDelay)
query = `
UPDATE email_queue
SET status = 'pending',
retry_count = retry_count + 1,
last_error = $1,
error_details = $2,
scheduled_for = $3
WHERE id = $4 AND retry_count < max_retries
`
args = []interface{}{errorMsg, errorDetailsJSON, nextRetry, id}
} else {
// Use default PostgreSQL function
query = `
UPDATE email_queue
SET status = 'pending',
retry_count = retry_count + 1,
last_error = $1,
error_details = $2,
scheduled_for = calculate_next_retry_time(retry_count + 1)
WHERE id = $3 AND retry_count < max_retries
`
args = []interface{}{errorMsg, errorDetailsJSON, id}
}
} else {
// If not retrying, mark as failed
query = `
@@ -284,7 +307,10 @@ func (r *EmailQueueRepository) MarkAsFailed(ctx context.Context, id int64, err e
logger.Logger.Warn("Email max retries reached, marked as failed", "id", id)
}
logger.Logger.Debug("Email marked as failed", "id", id, "should_retry", shouldRetry)
logger.Logger.Debug("Email marked as failed",
"id", id,
"should_retry", shouldRetry,
"retry_delay", retryDelay)
return nil
}
+160 -23
View File
@@ -5,6 +5,7 @@ import (
"context"
"encoding/json"
"fmt"
"strings"
"sync"
"time"
@@ -12,12 +13,30 @@ import (
"github.com/btouchard/ackify-ce/backend/pkg/logger"
)
// EmailErrorType represents the category of an email sending error
type EmailErrorType int
const (
// ErrorTypeRetryable represents temporary errors that should be retried
// Examples: 4xx SMTP codes, network timeouts, connection refused
ErrorTypeRetryable EmailErrorType = iota
// ErrorTypePermanent represents permanent errors that should not be retried
// Examples: 5xx SMTP codes, invalid email format, mailbox not found
ErrorTypePermanent
// ErrorTypeRateLimited represents rate limiting errors
// Examples: 429 HTTP, SMTP rate limit exceeded
ErrorTypeRateLimited
)
// QueueRepository defines the interface for email queue operations
type QueueRepository interface {
Enqueue(ctx context.Context, input models.EmailQueueInput) (*models.EmailQueueItem, error)
GetNextToProcess(ctx context.Context, limit int) ([]*models.EmailQueueItem, error)
MarkAsSent(ctx context.Context, id int64) error
MarkAsFailed(ctx context.Context, id int64, err error, shouldRetry bool) error
MarkAsFailedWithDelay(ctx context.Context, id int64, err error, shouldRetry bool, retryDelay time.Duration) error
GetRetryableEmails(ctx context.Context, limit int) ([]*models.EmailQueueItem, error)
CleanupOldEmails(ctx context.Context, olderThan time.Duration) (int64, error)
}
@@ -292,11 +311,20 @@ func (w *Worker) processEmail(ctx context.Context, item *models.EmailQueueItem)
"error", err.Error(),
"retry_count", item.RetryCount)
// Determine if we should retry
shouldRetry := item.RetryCount < item.MaxRetries && isRetryableError(err)
// Categorize error and determine retry strategy
errorType := w.categorizeError(err)
shouldRetry := item.RetryCount < item.MaxRetries && w.isRetryableError(err)
retryDelay := w.calculateRetryDelay(errorType, item.RetryCount)
// Mark as failed (with or without retry)
if markErr := w.queueRepo.MarkAsFailed(ctx, item.ID, err, shouldRetry); markErr != nil {
// Log error type for debugging
logger.Logger.Debug("Email error categorized",
"id", item.ID,
"error_type", errorType,
"should_retry", shouldRetry,
"retry_delay", retryDelay)
// Mark as failed with appropriate retry delay
if markErr := w.queueRepo.MarkAsFailedWithDelay(ctx, item.ID, err, shouldRetry, retryDelay); markErr != nil {
logger.Logger.Error("Failed to mark email as failed",
"id", item.ID,
"error", markErr.Error())
@@ -378,29 +406,138 @@ func (w *Worker) performCleanup() {
}
}
// isRetryableError determines if an error is retryable
func isRetryableError(err error) bool {
// TODO: Implement more sophisticated error detection
// For now, retry all errors except explicit data/template errors
errStr := err.Error()
// Don't retry template or data errors
if contains(errStr, "template") || contains(errStr, "unmarshal") || contains(errStr, "invalid") {
return false
// categorizeError analyzes the error and determines the error type
func (w *Worker) categorizeError(err error) EmailErrorType {
if err == nil {
return ErrorTypeRetryable
}
// Retry network and timeout errors
if contains(errStr, "timeout") || contains(errStr, "connection") || contains(errStr, "refused") {
errStr := strings.ToLower(err.Error())
// Permanent errors - SMTP 5xx codes (permanent failure)
// 550: Mailbox not found, user unknown
// 551: User not local; please try forward path
// 552: Exceeded storage allocation
// 553: Mailbox name not allowed
// 554: Transaction failed
if strings.Contains(errStr, "550") ||
strings.Contains(errStr, "551") ||
strings.Contains(errStr, "552") ||
strings.Contains(errStr, "553") ||
strings.Contains(errStr, "554") {
return ErrorTypePermanent
}
// Permanent errors - Invalid email format or data corruption
if strings.Contains(errStr, "invalid email") ||
strings.Contains(errStr, "invalid recipient") ||
strings.Contains(errStr, "invalid sender") ||
strings.Contains(errStr, "unmarshal") ||
strings.Contains(errStr, "template not found") ||
strings.Contains(errStr, "invalid template") {
return ErrorTypePermanent
}
// Rate limiting errors
// 421: Service not available (often rate limiting)
// 429: Too many requests (HTTP)
// 450: Mailbox unavailable (temporary, might be rate limit)
if strings.Contains(errStr, "421") ||
strings.Contains(errStr, "429") ||
strings.Contains(errStr, "450") ||
strings.Contains(errStr, "rate limit") ||
strings.Contains(errStr, "too many requests") ||
strings.Contains(errStr, "quota exceeded") {
return ErrorTypeRateLimited
}
// Retryable errors - SMTP 4xx codes (temporary failure)
// 451: Requested action aborted (temporary error)
// 452: Insufficient system storage
if strings.Contains(errStr, "451") ||
strings.Contains(errStr, "452") {
return ErrorTypeRetryable
}
// Retryable errors - Network and connection issues
if strings.Contains(errStr, "timeout") ||
strings.Contains(errStr, "connection refused") ||
strings.Contains(errStr, "connection reset") ||
strings.Contains(errStr, "network") ||
strings.Contains(errStr, "dial") ||
strings.Contains(errStr, "eof") ||
strings.Contains(errStr, "broken pipe") {
return ErrorTypeRetryable
}
// Retryable errors - DNS issues (temporary)
if strings.Contains(errStr, "no such host") ||
strings.Contains(errStr, "dns") {
return ErrorTypeRetryable
}
// Retryable errors - TLS/SSL issues (might be temporary)
if strings.Contains(errStr, "tls") ||
strings.Contains(errStr, "certificate") {
return ErrorTypeRetryable
}
// Default to retryable for unknown errors
// Better to retry and potentially waste resources than to lose emails
return ErrorTypeRetryable
}
// isRetryableError determines if an error is retryable based on error type
func (w *Worker) isRetryableError(err error) bool {
errorType := w.categorizeError(err)
switch errorType {
case ErrorTypePermanent:
// Never retry permanent errors
return false
case ErrorTypeRateLimited:
// Retry rate limited errors, but they will have exponential backoff
return true
case ErrorTypeRetryable:
// Retry temporary errors
return true
default:
// Default to retryable
return true
}
// Default to retry
return true
}
// contains checks if a string contains a substring (case-insensitive)
func contains(s, substr string) bool {
return len(s) > 0 && len(substr) > 0 &&
(s == substr || len(s) > len(substr) &&
(s[:len(substr)] == substr || s[len(s)-len(substr):] == substr))
// calculateRetryDelay calculates the appropriate retry delay based on error type
func (w *Worker) calculateRetryDelay(errorType EmailErrorType, retryCount int) time.Duration {
var baseDelay time.Duration
switch errorType {
case ErrorTypePermanent:
// No retry for permanent errors
return 0
case ErrorTypeRateLimited:
// Aggressive exponential backoff for rate limiting
// Start at 5 minutes, then 15min, 45min, 2h15min, etc.
// Formula: 5 * 3^retryCount minutes
baseDelay = 5 * time.Minute
multiplier := 1.0
for i := 0; i < retryCount; i++ {
multiplier *= 3
}
return time.Duration(float64(baseDelay) * multiplier)
case ErrorTypeRetryable:
// Standard exponential backoff for temporary errors
// 1min, 2min, 4min, 8min, 16min, 32min...
// Formula: 1 * 2^retryCount minutes
baseDelay = 1 * time.Minute
return baseDelay * time.Duration(1<<uint(retryCount))
default:
// Default: conservative exponential backoff
// 2min, 4min, 8min, 16min, 32min...
baseDelay = 2 * time.Minute
return baseDelay * time.Duration(1<<uint(retryCount))
}
}