go/libraries/events: Fix up a bug in timer handler for new sendingThread.

This commit is contained in:
Aaron Son
2024-02-06 16:33:53 -08:00
parent 9a3ceefdae
commit 3902282a2e
2 changed files with 82 additions and 11 deletions
+21 -11
View File
@@ -86,9 +86,9 @@ func NewCollector(version string, emitter Emitter) *Collector {
c.wg.Add(1)
go func() {
defer c.wg.Done()
for evt := range evtCh {
for evt := range c.evtCh {
c.events = append(c.events, evt)
if len(c.events) > maxBatchedEvents {
if len(c.events) >= maxBatchedEvents {
c.st.batchCh <- c.events
c.events = nil
}
@@ -154,10 +154,7 @@ func (s *sendingThread) stop() []*eventsapi.ClientEvent {
func (s *sendingThread) run() {
defer s.wg.Done()
timer := time.NewTimer(365 * 24 * time.Hour)
if !timer.Stop() {
<-timer.C
}
var timer *time.Timer
bo := backoff.NewExponentialBackOff()
bo.InitialInterval = time.Second
@@ -165,23 +162,36 @@ func (s *sendingThread) run() {
bo.MaxElapsedTime = 0
for {
var timerCh <-chan time.Time
if timer != nil {
timerCh = timer.C
}
select {
case batch, ok := <-s.batchCh:
if !ok {
if s.emitter != nil && len(s.unsent) > 0 {
err := s.emitter.LogEvents(s.logCtx, s.version, s.unsent)
if err == nil {
s.unsent = nil
}
}
return
}
s.unsent = append(s.unsent, batch...)
if !timer.Stop() {
<-timer.C
}
if s.emitter != nil {
timer.Reset(0)
if timer != nil && !timer.Stop() {
<-timer.C
timer.Reset(0)
} else {
timer = time.NewTimer(0)
}
}
case <-timer.C:
case <-timerCh:
err := s.emitter.LogEvents(s.logCtx, s.version, s.unsent)
if err == nil {
s.unsent = nil
bo.Reset()
timer = nil
} else {
timer.Reset(bo.NextBackOff())
}
+61
View File
@@ -15,6 +15,8 @@
package events
import (
"context"
"errors"
"testing"
"github.com/stretchr/testify/assert"
@@ -59,3 +61,62 @@ func TestEvents(t *testing.T) {
_, isTimer := clientEvents[0].Metrics[1].MetricOneof.(*eventsapi.ClientEventMetric_Duration)
assert.True(t, isTimer)
}
type failingEmitter struct {
}
func (failingEmitter) LogEvents(ctx context.Context, version string, evts []*eventsapi.ClientEvent) error {
return errors.New("i always fail")
}
func (failingEmitter) LogEventsRequest(ctx context.Context, req *eventsapi.LogEventsRequest) error {
return errors.New("i always fail")
}
func TestEventsCollectorEmitting(t *testing.T) {
for _, tc := range []struct {
Name string
Emitter Emitter
NumLeft int
}{
{
"Failing",
failingEmitter{},
32 * maxBatchedEvents,
},
{
"Nil",
nil,
32 * maxBatchedEvents,
},
{
"Null",
NullEmitter{},
0,
},
} {
t.Run(tc.Name, func(t *testing.T) {
collector := NewCollector("invalid", tc.Emitter)
for i := 0; i < 32*maxBatchedEvents; i++ {
remoteUrl := "https://dolthub.com/org/repo"
testEvent := NewEvent(eventsapi.ClientEventType_CLONE)
testEvent.SetAttribute(eventsapi.AttributeID_REMOTE_URL_SCHEME, remoteUrl)
counter := NewCounter(eventsapi.MetricID_METRIC_UNSPECIFIED)
counter.Inc()
testEvent.AddMetric(counter)
timer := NewTimer(eventsapi.MetricID_METRIC_UNSPECIFIED)
timer.Stop()
testEvent.AddMetric(timer)
collector.CloseEventAndAdd(testEvent)
}
clientEvents := collector.Close()
assert.Len(t, clientEvents, tc.NumLeft)
})
}
}