Changing event reporting to drop old messages and not grow unbounded when events can't be delivered

This commit is contained in:
Jason Fulghum
2025-08-20 16:30:42 -07:00
parent 111c34d567
commit 032a0ddca8
3 changed files with 26 additions and 6 deletions
+13
View File
@@ -21,6 +21,7 @@ import (
"github.com/cenkalti/backoff/v4"
"github.com/denisbrodbeck/machineid"
"github.com/sirupsen/logrus"
eventsapi "github.com/dolthub/eventsapi_schema/dolt/services/eventsapi/v1alpha1"
)
@@ -181,6 +182,18 @@ func (s *sendingThread) run() {
return
}
s.unsent = append(s.unsent, batch...)
// Events use a best-effort delivery strategy if we can't deliver events fast
// enough, we drop the oldest events and keep the most recent events. This can
// happen if network connectivity is interrupted, or if events come in too fast.
// If we don't drop events, then they can queue up unbounded in |s.unsent| and
// also cause GRPC to hold onto large buffers and eventually run out of memory.
if len(s.unsent) > maxBatchedEvents {
logrus.Warnf("too many events (%d) queued for LogEvents GRPC request; truncating to %d",
len(s.unsent), maxBatchedEvents)
s.unsent = s.unsent[len(s.unsent)-maxBatchedEvents:]
}
if s.emitter != nil {
if timer != nil && !timer.Stop() {
<-timer.C
+1 -4
View File
@@ -156,10 +156,7 @@ func (em *GrpcEmitter) LogEventsRequest(ctx context.Context, req *eventsapi.LogE
// SendLogEventsRequest sends a request using the grpc client
func (em *GrpcEmitter) sendLogEventsRequest(ctx context.Context, req *eventsapi.LogEventsRequest) error {
_, err := em.client.LogEvents(ctx, req)
if err != nil {
return err
}
return nil
return err
}
// FileEmitter saves event requests to files
+12 -2
View File
@@ -91,21 +91,31 @@ func TestEventsCollectorEmitting(t *testing.T) {
NumLeft int
}{
{
// failingEmitter will return an error any time LogEvents is called, so no events
// will be processed. Because the collector drops old events when they can't be sent,
// we expect the number of events to be the max batch size.
"Failing",
failingEmitter{},
32*maxBatchedEvents - 1,
maxBatchedEvents,
},
{
// Similar to failingEmitter above, a nil emitter will not process any events, so
// the collector will drop the oldest events and just keep the most recent events.
"Nil",
nil,
32*maxBatchedEvents - 1,
maxBatchedEvents,
},
{
// NullEmitter returns nil for any call to LogEvent, so the number of events left
// unsent in the collector is zero.
"Null",
NullEmitter{},
0,
},
{
// contextAwareEmitter returns an error when LogEvents is called after the collector
// has been stopped. This causes the last incomplete batch (maxBatchedEvents -1) of
// events to be left unsent in the collector.
"ContextAware",
contextAwareEmitter{},
maxBatchedEvents - 1,