mirror of
https://github.com/hatchet-dev/hatchet.git
synced 2026-04-21 09:19:32 -05:00
non blocking ctx.Log with meaningful retries (#3106)
This commit is contained in:
+7
-1
@@ -45,6 +45,8 @@ type EventClient interface {
|
||||
|
||||
PutLog(ctx context.Context, taskRunId, msg string, level *string, taskRetryCount *int32) error
|
||||
|
||||
PutLogWithTimestamp(ctx context.Context, taskRunId, msg string, level *string, taskRetryCount *int32, createdAt *timestamppb.Timestamp) error
|
||||
|
||||
PutStreamEvent(ctx context.Context, stepRunId string, message []byte, options ...StreamEventOption) error
|
||||
}
|
||||
|
||||
@@ -193,8 +195,12 @@ func (a *eventClientImpl) BulkPush(ctx context.Context, payload []EventWithAddit
|
||||
}
|
||||
|
||||
func (a *eventClientImpl) PutLog(ctx context.Context, taskRunId, msg string, level *string, taskRetryCount *int32) error {
|
||||
return a.PutLogWithTimestamp(ctx, taskRunId, msg, level, taskRetryCount, timestamppb.Now())
|
||||
}
|
||||
|
||||
func (a *eventClientImpl) PutLogWithTimestamp(ctx context.Context, taskRunId, msg string, level *string, taskRetryCount *int32, createdAt *timestamppb.Timestamp) error {
|
||||
_, err := a.client.PutLog(a.ctx.newContext(ctx), &eventcontracts.PutLogRequest{
|
||||
CreatedAt: timestamppb.Now(),
|
||||
CreatedAt: createdAt,
|
||||
TaskRunExternalId: taskRunId,
|
||||
Message: msg,
|
||||
Level: level,
|
||||
|
||||
+35
-4
@@ -9,6 +9,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/rs/zerolog"
|
||||
"google.golang.org/protobuf/types/known/timestamppb"
|
||||
|
||||
v1 "github.com/hatchet-dev/hatchet/internal/services/shared/proto/v1"
|
||||
"github.com/hatchet-dev/hatchet/pkg/client"
|
||||
@@ -344,11 +345,41 @@ func (h *hatchetContext) Log(message string) {
|
||||
message = string(runes[:10_000])
|
||||
}
|
||||
|
||||
err := h.c.Event().PutLog(h, h.a.StepRunId, message, &infoLevel, &h.a.RetryCount)
|
||||
stepRunId := h.a.StepRunId
|
||||
retryCount := h.a.RetryCount
|
||||
createdAt := timestamppb.Now()
|
||||
|
||||
if err != nil {
|
||||
h.l.Err(err).Msg("could not put log")
|
||||
}
|
||||
go func() {
|
||||
const maxRetries = 3
|
||||
baseDelay := 100 * time.Millisecond
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
||||
defer cancel()
|
||||
|
||||
var err error
|
||||
|
||||
for attempt := range maxRetries + 1 {
|
||||
if attempt > 0 {
|
||||
delay := baseDelay * time.Duration(1<<(attempt-1))
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
h.l.Warn().Err(err).Msg("log delivery timed out, abandoning")
|
||||
return
|
||||
case <-time.After(delay):
|
||||
}
|
||||
}
|
||||
|
||||
err = h.c.Event().PutLogWithTimestamp(ctx, stepRunId, message, &infoLevel, &retryCount, createdAt)
|
||||
if err == nil {
|
||||
return
|
||||
}
|
||||
|
||||
h.l.Warn().Err(err).Msgf("failed to put log (attempt %d/%d)", attempt+1, maxRetries+1)
|
||||
}
|
||||
|
||||
h.l.Err(err).Msg("could not put log after all retries")
|
||||
}()
|
||||
}
|
||||
|
||||
// Deprecated: ReleaseSlot is an internal method used by the new Go SDK.
|
||||
|
||||
Reference in New Issue
Block a user