Merge pull request #5998 from kobergj/AddSSEToUserlog

Add SSE to userlog
This commit is contained in:
kobergj
2023-06-29 15:02:49 +02:00
committed by GitHub
37 changed files with 2410 additions and 37 deletions

View File

@@ -0,0 +1,5 @@
Enhancement: Add SSE Endpoint
Add a server-sent events (sse) endpoint for the userlog service
https://github.com/owncloud/ocis/pull/5998

2
go.mod
View File

@@ -280,6 +280,7 @@ require (
github.com/prometheus/common v0.42.0 // indirect
github.com/prometheus/procfs v0.10.1 // indirect
github.com/prometheus/statsd_exporter v0.22.8 // indirect
github.com/r3labs/sse/v2 v2.10.0 // indirect
github.com/rcrowley/go-metrics v0.0.0-20200313005456-10cdbea86bc0 // indirect
github.com/rivo/uniseg v0.4.2 // indirect
github.com/rs/cors v1.9.0 // indirect
@@ -325,6 +326,7 @@ require (
golang.org/x/tools v0.7.0 // indirect
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
google.golang.org/appengine v1.6.7 // indirect
gopkg.in/cenkalti/backoff.v1 v1.1.0 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect
gopkg.in/warnings.v0 v0.1.2 // indirect

5
go.sum
View File

@@ -1469,6 +1469,8 @@ github.com/prometheus/statsd_exporter v0.22.7/go.mod h1:N/TevpjkIh9ccs6nuzY3jQn9
github.com/prometheus/statsd_exporter v0.22.8 h1:Qo2D9ZzaQG+id9i5NYNGmbf1aa/KxKbB9aKfMS+Yib0=
github.com/prometheus/statsd_exporter v0.22.8/go.mod h1:/DzwbTEaFTE0Ojz5PqcSk6+PFHOPWGxdXVr6yC8eFOM=
github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU=
github.com/r3labs/sse/v2 v2.10.0 h1:hFEkLLFY4LDifoHdiCN/LlGBAdVJYsANaLqNYa1l/v0=
github.com/r3labs/sse/v2 v2.10.0/go.mod h1:Igau6Whc+F17QUgML1fYe1VPZzTV6EMCnYktEmkNJ7I=
github.com/rainycape/memcache v0.0.0-20150622160815-1031fa0ce2f2/go.mod h1:7tZKcyumwBO6qip7RNQ5r77yrssm9bfCowcLEBcU5IA=
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
github.com/rcrowley/go-metrics v0.0.0-20200313005456-10cdbea86bc0 h1:MkV+77GLUNo5oJ0jf870itWm3D0Sjh7+Za9gazKc5LQ=
@@ -1808,6 +1810,7 @@ golang.org/x/net v0.0.0-20190628185345-da137c7871d7/go.mod h1:z5CRVTTTmAJ677TzLL
golang.org/x/net v0.0.0-20190724013045-ca1201d0de80/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20190923162816-aa69164e4478/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20191112182307-2180aed22343/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20191116160921-f9c825593386/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20191209160850-c0dbc17a3553/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200202094626-16171245cfb2/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
@@ -2383,6 +2386,8 @@ google.golang.org/protobuf v1.30.0 h1:kPPoIgf3TsEvrm0PFe15JQ+570QVxYzEvvHqChK+cn
google.golang.org/protobuf v1.30.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
gopkg.in/Acconut/lockfile.v1 v1.1.0/go.mod h1:6UCz3wJ8tSFUsPR6uP/j8uegEtDuEEqFxlpi0JI4Umw=
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
gopkg.in/cenkalti/backoff.v1 v1.1.0 h1:Arh75ttbsvlpVA7WtVpH4u9h6Zl46xuptxqLxPiSo4Y=
gopkg.in/cenkalti/backoff.v1 v1.1.0/go.mod h1:J6Vskwqd+OMVJl8C33mmtxTBs2gyzfv7UDAkHu8BrjI=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=

View File

@@ -30,6 +30,10 @@ For the time being, the configuration which user related events are of interest
The `userlog` service provides an API to retrieve configured events. For now, this API is mostly following the [oc10 notification GET API](https://doc.owncloud.com/server/next/developer_manual/core/apis/ocs-notification-endpoint-v1.html#get-user-notifications).
## Subscribing
Additionaly to the oc10 API, the `userlog` service also provides an `/sse` (Server-Sent Events) endpoint to be informed by the server when an event happens. See [What is Server-Sent Events](https://medium.com/yemeksepeti-teknoloji/what-is-server-sent-events-sse-and-how-to-implement-it-904938bffd73) for a simple introduction and examples to server sent events. The `sse` endpoint will respect language changes of the user without needing to reconnect.
## Deleting
To delete events for an user, use a `DELETE` request to `ocs/v2.php/apps/notifications/api/v1/notifications` containing the IDs to delete.

View File

@@ -16,6 +16,7 @@ import (
ogrpc "github.com/owncloud/ocis/v2/ocis-pkg/service/grpc"
"github.com/owncloud/ocis/v2/ocis-pkg/version"
ehsvc "github.com/owncloud/ocis/v2/protogen/gen/ocis/services/eventhistory/v0"
settingssvc "github.com/owncloud/ocis/v2/protogen/gen/ocis/services/settings/v0"
"github.com/owncloud/ocis/v2/services/userlog/pkg/config"
"github.com/owncloud/ocis/v2/services/userlog/pkg/config/parser"
"github.com/owncloud/ocis/v2/services/userlog/pkg/logging"
@@ -102,6 +103,7 @@ func Server(cfg *config.Config) *cli.Command {
}
hClient := ehsvc.NewEventHistoryService("com.owncloud.api.eventhistory", ogrpc.DefaultClient())
vClient := settingssvc.NewValueService("com.owncloud.api.settings", ogrpc.DefaultClient())
{
server, err := http.Server(
@@ -113,6 +115,7 @@ func Server(cfg *config.Config) *cli.Command {
http.Consumer(consumer),
http.GatewaySelector(gatewaySelector),
http.History(hClient),
http.Value(vClient),
http.RegisteredEvents(_registeredEvents),
)

View File

@@ -28,6 +28,8 @@ type Config struct {
Events Events `yaml:"events"`
Persistence Persistence `yaml:"persistence"`
DisableSSE bool `yaml:"disable_sse" env:"USERLOG_DISABLE_SSE" desc:"Disables server-sent events (sse). When disabled, clients will no longer be able to connect to the sse endpoint."`
Context context.Context `yaml:"-"`
}

View File

@@ -8,6 +8,7 @@ import (
"github.com/cs3org/reva/v2/pkg/rgrpc/todo/pool"
"github.com/owncloud/ocis/v2/ocis-pkg/log"
ehsvc "github.com/owncloud/ocis/v2/protogen/gen/ocis/services/eventhistory/v0"
settingssvc "github.com/owncloud/ocis/v2/protogen/gen/ocis/services/settings/v0"
"github.com/owncloud/ocis/v2/services/userlog/pkg/config"
"github.com/owncloud/ocis/v2/services/userlog/pkg/metrics"
"github.com/urfave/cli/v2"
@@ -29,6 +30,7 @@ type Options struct {
Consumer events.Consumer
GatewaySelector pool.Selectable[gateway.GatewayAPIClient]
HistoryClient ehsvc.EventHistoryService
ValueClient settingssvc.ValueService
RegisteredEvents []events.Unmarshaller
}
@@ -119,3 +121,10 @@ func RegisteredEvents(evs []events.Unmarshaller) Option {
o.RegisteredEvents = evs
}
}
// Value provides a function to configure the value service client
func Value(vs settingssvc.ValueService) Option {
return func(o *Options) {
o.ValueClient = vs
}
}

View File

@@ -76,6 +76,7 @@ func Server(opts ...Option) (http.Service, error) {
svc.Config(options.Config),
svc.HistoryClient(options.HistoryClient),
svc.GatewaySelector(options.GatewaySelector),
svc.ValueClient(options.ValueClient),
svc.RegisteredEvents(options.RegisteredEvents),
)
if err != nil {

View File

@@ -4,7 +4,6 @@ import (
"bytes"
"context"
"embed"
"errors"
"fmt"
"io/fs"
"strings"
@@ -20,7 +19,6 @@ import (
"github.com/cs3org/reva/v2/pkg/storagespace"
"github.com/cs3org/reva/v2/pkg/utils"
"github.com/leonelquinteros/gotext"
ehmsg "github.com/owncloud/ocis/v2/protogen/gen/ocis/messages/eventhistory/v0"
)
//go:embed l10n/locale
@@ -56,7 +54,6 @@ type Converter struct {
gatewaySelector pool.Selectable[gateway.GatewayAPIClient]
machineAuthAPIKey string
serviceName string
registeredEvents map[string]events.Unmarshaller
translationPath string
// cached within one request not to query other service too much
@@ -67,13 +64,12 @@ type Converter struct {
}
// NewConverter returns a new Converter
func NewConverter(loc string, gatewaySelector pool.Selectable[gateway.GatewayAPIClient], machineAuthAPIKey string, name string, translationPath string, registeredEvents map[string]events.Unmarshaller) *Converter {
func NewConverter(loc string, gatewaySelector pool.Selectable[gateway.GatewayAPIClient], machineAuthAPIKey string, name string, translationPath string) *Converter {
return &Converter{
locale: loc,
gatewaySelector: gatewaySelector,
machineAuthAPIKey: machineAuthAPIKey,
serviceName: name,
registeredEvents: registeredEvents,
translationPath: translationPath,
spaces: make(map[string]*storageprovider.StorageSpace),
users: make(map[string]*user.User),
@@ -83,20 +79,8 @@ func NewConverter(loc string, gatewaySelector pool.Selectable[gateway.GatewayAPI
}
// ConvertEvent converts an eventhistory event to an OC10Notification
func (c *Converter) ConvertEvent(event *ehmsg.Event) (OC10Notification, error) {
etype, ok := c.registeredEvents[event.Type]
if !ok {
// this should not happen
return OC10Notification{}, errors.New("eventtype not registered")
}
einterface, err := etype.Unmarshal(event.Event)
if err != nil {
// this shouldn't happen either
return OC10Notification{}, errors.New("cant unmarshal event")
}
switch ev := einterface.(type) {
func (c *Converter) ConvertEvent(eventid string, event interface{}) (OC10Notification, error) {
switch ev := event.(type) {
default:
return OC10Notification{}, fmt.Errorf("unknown event type: %T", ev)
// file related
@@ -104,31 +88,31 @@ func (c *Converter) ConvertEvent(event *ehmsg.Event) (OC10Notification, error) {
switch ev.FinishedStep {
case events.PPStepAntivirus:
res := ev.Result.(events.VirusscanResult)
return c.virusMessage(event.Id, VirusFound, ev.ExecutingUser, res.ResourceID, ev.Filename, res.Description, res.Scandate)
return c.virusMessage(eventid, VirusFound, ev.ExecutingUser, res.ResourceID, ev.Filename, res.Description, res.Scandate)
case events.PPStepPolicies:
return c.policiesMessage(event.Id, PoliciesEnforced, ev.ExecutingUser, ev.Filename, time.Now())
return c.policiesMessage(eventid, PoliciesEnforced, ev.ExecutingUser, ev.Filename, time.Now())
default:
return OC10Notification{}, fmt.Errorf("unknown postprocessing step: %s", ev.FinishedStep)
}
// space related
case events.SpaceDisabled:
return c.spaceMessage(event.Id, SpaceDisabled, ev.Executant, ev.ID.GetOpaqueId(), ev.Timestamp)
return c.spaceMessage(eventid, SpaceDisabled, ev.Executant, ev.ID.GetOpaqueId(), ev.Timestamp)
case events.SpaceDeleted:
return c.spaceDeletedMessage(event.Id, ev.Executant, ev.ID.GetOpaqueId(), ev.SpaceName, ev.Timestamp)
return c.spaceDeletedMessage(eventid, ev.Executant, ev.ID.GetOpaqueId(), ev.SpaceName, ev.Timestamp)
case events.SpaceShared:
return c.spaceMessage(event.Id, SpaceShared, ev.Executant, ev.ID.GetOpaqueId(), ev.Timestamp)
return c.spaceMessage(eventid, SpaceShared, ev.Executant, ev.ID.GetOpaqueId(), ev.Timestamp)
case events.SpaceUnshared:
return c.spaceMessage(event.Id, SpaceUnshared, ev.Executant, ev.ID.GetOpaqueId(), ev.Timestamp)
return c.spaceMessage(eventid, SpaceUnshared, ev.Executant, ev.ID.GetOpaqueId(), ev.Timestamp)
case events.SpaceMembershipExpired:
return c.spaceMessage(event.Id, SpaceMembershipExpired, ev.SpaceOwner, ev.SpaceID.GetOpaqueId(), ev.ExpiredAt)
return c.spaceMessage(eventid, SpaceMembershipExpired, ev.SpaceOwner, ev.SpaceID.GetOpaqueId(), ev.ExpiredAt)
// share related
case events.ShareCreated:
return c.shareMessage(event.Id, ShareCreated, ev.Executant, ev.ItemID, ev.ShareID, utils.TSToTime(ev.CTime))
return c.shareMessage(eventid, ShareCreated, ev.Executant, ev.ItemID, ev.ShareID, utils.TSToTime(ev.CTime))
case events.ShareExpired:
return c.shareMessage(event.Id, ShareExpired, ev.ShareOwner, ev.ItemID, ev.ShareID, ev.ExpiredAt)
return c.shareMessage(eventid, ShareExpired, ev.ShareOwner, ev.ItemID, ev.ShareID, ev.ExpiredAt)
case events.ShareRemoved:
return c.shareMessage(event.Id, ShareRemoved, ev.Executant, ev.ItemID, ev.ShareID, ev.Timestamp)
return c.shareMessage(eventid, ShareRemoved, ev.Executant, ev.ItemID, ev.ShareID, ev.Timestamp)
}
}

View File

@@ -4,6 +4,7 @@ import (
"encoding/json"
"net/http"
"github.com/cs3org/reva/v2/pkg/ctx"
revactx "github.com/cs3org/reva/v2/pkg/ctx"
)
@@ -31,11 +32,23 @@ func (ul *UserlogService) HandleGetEvents(w http.ResponseWriter, r *http.Request
return
}
conv := NewConverter(r.Header.Get(HeaderAcceptLanguage), ul.gatewaySelector, ul.cfg.MachineAuthAPIKey, ul.cfg.Service.Name, ul.cfg.TranslationPath, ul.registeredEvents)
conv := ul.getConverter(r.Header.Get(HeaderAcceptLanguage))
resp := GetEventResponseOC10{}
for _, e := range evs {
noti, err := conv.ConvertEvent(e)
etype, ok := ul.registeredEvents[e.Type]
if !ok {
ul.log.Error().Str("eventid", e.Id).Str("eventtype", e.Type).Msg("event not registered")
continue
}
einterface, err := etype.Unmarshal(e.Event)
if err != nil {
ul.log.Error().Str("eventid", e.Id).Str("eventtype", e.Type).Msg("failed to umarshal event")
continue
}
noti, err := conv.ConvertEvent(e.Id, einterface)
if err != nil {
ul.log.Error().Err(err).Str("eventid", e.Id).Str("eventtype", e.Type).Msg("failed to convert event")
continue
@@ -49,6 +62,33 @@ func (ul *UserlogService) HandleGetEvents(w http.ResponseWriter, r *http.Request
w.Write(b)
}
// HandleSSE is the GET handler for events
func (ul *UserlogService) HandleSSE(w http.ResponseWriter, r *http.Request) {
u, ok := ctx.ContextGetUser(r.Context())
if !ok {
ul.log.Error().Msg("sse: no user in context")
w.WriteHeader(http.StatusInternalServerError)
return
}
uid := u.GetId().GetOpaqueId()
if uid == "" {
ul.log.Error().Msg("sse: user in context is broken")
w.WriteHeader(http.StatusInternalServerError)
return
}
stream := ul.sse.CreateStream(uid)
stream.AutoReplay = false
// add stream to URL
q := r.URL.Query()
q.Set("stream", uid)
r.URL.RawQuery = q.Encode()
ul.sse.ServeHTTP(w, r)
}
// HandleDeleteEvents is the DELETE handler for events
func (ul *UserlogService) HandleDeleteEvents(w http.ResponseWriter, r *http.Request) {
u, ok := revactx.ContextGetUser(r.Context())

View File

@@ -7,6 +7,7 @@ import (
"github.com/go-chi/chi/v5"
"github.com/owncloud/ocis/v2/ocis-pkg/log"
ehsvc "github.com/owncloud/ocis/v2/protogen/gen/ocis/services/eventhistory/v0"
settingssvc "github.com/owncloud/ocis/v2/protogen/gen/ocis/services/settings/v0"
"github.com/owncloud/ocis/v2/services/userlog/pkg/config"
"go-micro.dev/v4/store"
)
@@ -23,6 +24,7 @@ type Options struct {
Config *config.Config
HistoryClient ehsvc.EventHistoryService
GatewaySelector pool.Selectable[gateway.GatewayAPIClient]
ValueClient settingssvc.ValueService
RegisteredEvents []events.Unmarshaller
}
@@ -81,3 +83,9 @@ func RegisteredEvents(e []events.Unmarshaller) Option {
o.RegisteredEvents = e
}
}
func ValueClient(vs settingssvc.ValueService) Option {
return func(o *Options) {
o.ValueClient = vs
}
}

View File

@@ -18,9 +18,14 @@ import (
"github.com/cs3org/reva/v2/pkg/utils"
"github.com/go-chi/chi/v5"
"github.com/owncloud/ocis/v2/ocis-pkg/log"
"github.com/owncloud/ocis/v2/ocis-pkg/middleware"
ehmsg "github.com/owncloud/ocis/v2/protogen/gen/ocis/messages/eventhistory/v0"
ehsvc "github.com/owncloud/ocis/v2/protogen/gen/ocis/services/eventhistory/v0"
settingssvc "github.com/owncloud/ocis/v2/protogen/gen/ocis/services/settings/v0"
"github.com/owncloud/ocis/v2/services/settings/pkg/store/defaults"
"github.com/owncloud/ocis/v2/services/userlog/pkg/config"
"github.com/r3labs/sse/v2"
micrometadata "go-micro.dev/v4/metadata"
"go-micro.dev/v4/store"
"google.golang.org/grpc/metadata"
)
@@ -33,6 +38,8 @@ type UserlogService struct {
cfg *config.Config
historyClient ehsvc.EventHistoryService
gatewaySelector pool.Selectable[gateway.GatewayAPIClient]
valueClient settingssvc.ValueService
sse *sse.Server
registeredEvents map[string]events.Unmarshaller
translationPath string
}
@@ -60,17 +67,26 @@ func NewUserlogService(opts ...Option) (*UserlogService, error) {
cfg: o.Config,
historyClient: o.HistoryClient,
gatewaySelector: o.GatewaySelector,
valueClient: o.ValueClient,
registeredEvents: make(map[string]events.Unmarshaller),
}
if !ul.cfg.DisableSSE {
ul.sse = sse.New()
}
for _, e := range o.RegisteredEvents {
typ := reflect.TypeOf(e)
ul.registeredEvents[typ.String()] = e
}
ul.m.Route("/", func(r chi.Router) {
r.Get("/*", ul.HandleGetEvents)
r.Delete("/*", ul.HandleDeleteEvents)
ul.m.Route("/ocs/v2.php/apps/notifications/api/v1/notifications", func(r chi.Router) {
r.Get("/", ul.HandleGetEvents)
r.Delete("/", ul.HandleDeleteEvents)
if !ul.cfg.DisableSSE {
r.Get("/sse", ul.HandleSSE)
}
})
go ul.MemorizeEvents(ch)
@@ -155,7 +171,7 @@ func (ul *UserlogService) MemorizeEvents(ch <-chan events.Event) {
// III) store the eventID for each user
for _, id := range users {
if err := ul.addEventsToUser(id, event.ID); err != nil {
if err := ul.addEventToUser(id, event); err != nil {
ul.log.Error().Err(err).Str("userID", id).Str("eventid", event.ID).Msg("failed to store event for user")
continue
}
@@ -219,12 +235,32 @@ func (ul *UserlogService) DeleteEvents(userid string, evids []string) error {
})
}
func (ul *UserlogService) addEventsToUser(userid string, eventids ...string) error {
func (ul *UserlogService) addEventToUser(userid string, event events.Event) error {
if !ul.cfg.DisableSSE {
if err := ul.sendSSE(userid, event); err != nil {
ul.log.Error().Err(err).Str("userid", userid).Str("eventid", event.ID).Msg("cannot create sse event")
}
}
return ul.alterUserEventList(userid, func(ids []string) []string {
return append(ids, eventids...)
return append(ids, event.ID)
})
}
func (ul *UserlogService) sendSSE(userid string, event events.Event) error {
ev, err := ul.getConverter(ul.getUserLocale(userid)).ConvertEvent(event.ID, event.Event)
if err != nil {
return err
}
b, err := json.Marshal(ev)
if err != nil {
return err
}
ul.sse.Publish(userid, &sse.Event{Data: b})
return nil
}
func (ul *UserlogService) removeExpiredEvents(userid string, all []string, received []*ehmsg.Event) error {
exists := make(map[string]struct{}, len(received))
for _, e := range received {
@@ -395,6 +431,29 @@ func (ul *UserlogService) impersonate(uid *user.UserId) context.Context {
return ctx
}
func (ul *UserlogService) getUserLocale(userid string) string {
resp, err := ul.valueClient.GetValueByUniqueIdentifiers(
micrometadata.Set(context.Background(), middleware.AccountID, userid),
&settingssvc.GetValueByUniqueIdentifiersRequest{
AccountUuid: userid,
SettingId: defaults.SettingUUIDProfileLanguage,
},
)
if err != nil {
ul.log.Error().Err(err).Str("userid", userid).Msg("cannot get users locale")
return ""
}
val := resp.GetValue().GetValue().GetListValue().GetValues()
if len(val) == 0 {
return ""
}
return val[0].GetStringValue()
}
func (ul *UserlogService) getConverter(locale string) *Converter {
return NewConverter(locale, ul.gatewaySelector, ul.cfg.MachineAuthAPIKey, ul.cfg.Service.Name, ul.cfg.TranslationPath)
}
func authenticate(usr *user.User, gatewaySelector pool.Selectable[gateway.GatewayAPIClient], machineAuthAPIKey string) (context.Context, error) {
gatewayClient, err := gatewaySelector.Next()
if err != nil {

View File

@@ -22,10 +22,12 @@ import (
"github.com/owncloud/ocis/v2/ocis-pkg/log"
ehmsg "github.com/owncloud/ocis/v2/protogen/gen/ocis/messages/eventhistory/v0"
ehsvc "github.com/owncloud/ocis/v2/protogen/gen/ocis/services/eventhistory/v0"
settingssvc "github.com/owncloud/ocis/v2/protogen/gen/ocis/services/settings/v0"
"github.com/owncloud/ocis/v2/services/userlog/mocks"
"github.com/owncloud/ocis/v2/services/userlog/pkg/config"
"github.com/owncloud/ocis/v2/services/userlog/pkg/service"
"github.com/stretchr/testify/mock"
"go-micro.dev/v4/client"
microevents "go-micro.dev/v4/events"
microstore "go-micro.dev/v4/store"
"google.golang.org/grpc"
@@ -43,6 +45,7 @@ var _ = Describe("UserlogService", func() {
gatewaySelector pool.Selectable[gateway.GatewayAPIClient]
ehc mocks.EventHistoryService
vc settingssvc.MockValueService
)
BeforeEach(func() {
@@ -69,6 +72,9 @@ var _ = Describe("UserlogService", func() {
}, Status: &rpc.Status{Code: rpc.Code_CODE_OK}}, nil)
gatewayClient.On("GetUser", mock.Anything, mock.Anything).Return(&user.GetUserResponse{User: &user.User{Id: &user.UserId{OpaqueId: "userid"}}, Status: &rpc.Status{Code: rpc.Code_CODE_OK}}, nil)
gatewayClient.On("Authenticate", mock.Anything, mock.Anything).Return(&gateway.AuthenticateResponse{Status: &rpc.Status{Code: rpc.Code_CODE_OK}}, nil)
vc.GetValueByUniqueIdentifiersFunc = func(ctx context.Context, req *settingssvc.GetValueByUniqueIdentifiersRequest, opts ...client.CallOption) (*settingssvc.GetValueResponse, error) {
return nil, nil
}
ul, err = service.NewUserlogService(
service.Config(cfg),
@@ -78,6 +84,7 @@ var _ = Describe("UserlogService", func() {
service.Mux(chi.NewMux()),
service.GatewaySelector(gatewaySelector),
service.HistoryClient(&ehc),
service.ValueClient(&vc),
service.RegisteredEvents([]events.Unmarshaller{
events.SpaceDisabled{},
}),

2
vendor/github.com/r3labs/sse/v2/.gitignore generated vendored Normal file
View File

@@ -0,0 +1,2 @@
.idea
.vscode

15
vendor/github.com/r3labs/sse/v2/.golangci.yml generated vendored Normal file
View File

@@ -0,0 +1,15 @@
linters:
enable-all: true
disable:
- gofmt
- gofumpt
- goimports
- golint # deprecated
- interfacer # deprecated
- maligned # deprecated
- scopelint # deprecated
- varnamelen
linters-settings:
govet:
enable-all: true

80
vendor/github.com/r3labs/sse/v2/CONTRIBUTING.md generated vendored Normal file
View File

@@ -0,0 +1,80 @@
# Contributing guidelines
Looking to contribute something to this project? Here's how you can help:
Please take a moment to review this document in order to make the contribution process easy and effective for everyone involved.
Following these guidelines helps to communicate that you respect the time of the developers managing and developing this open source project. In return, they should reciprocate that respect in addressing your issue or assessing patches and features.
We also have a [code of conduct](https://ernest.io/conduct).
## Using the issue tracker
The issue tracker is the preferred channel for [bug reports](#bug-reports), [features requests](#feature-requests) and [submitting pull requests](#pull-requests), but please respect the following restrictions:
* Please **do not** use the issue tracker for personal support requests.
* Please **do not** derail issues. Keep the discussion on topic and
respect the opinions of others.
<a name="bugs"></a>
## Bug reports
A bug is a _demonstrable problem_ that is caused by the code in the repository.
Good bug reports are extremely helpful - thank you!
Guidelines for bug reports:
1. **Use the GitHub issue search** &mdash; check if the issue has already been
reported.
2. **Check if the issue has been fixed** &mdash; try to reproduce it using the
latest `master` or `develop` branch in the repository.
3. **Isolate the problem** &mdash; create a reduced test case and a live example.
A good bug report shouldn't leave others needing to chase you up for more
information. Please try to be as detailed as possible in your report. What is
your environment? What steps will reproduce the issue? Which environment experience the problem? What would you expect to be the outcome? All these
details will help people to fix any potential bugs.
Example:
> Short and descriptive example bug report title
>
> A summary of the issue and the environment in which it occurs. If
> suitable, include the steps required to reproduce the bug.
>
> 1. This is the first step
> 2. This is the second step
> 3. Further steps, etc.
>
> `<url>` - a link to the reduced test case
>
> Any other information you want to share that is relevant to the issue being
> reported. This might include the lines of code that you have identified as
> causing the bug, and potential solutions (and your opinions on their
> merits).
<a name="features"></a>
## Feature requests
Feature requests are welcome. But take a moment to find out whether your idea
fits with the scope and aims of the project. It's up to *you* to make a strong
case to convince the project's developers of the merits of this feature. Please
provide as much detail and context as possible.
<a name="pull-requests"></a>
## Pull requests
Good pull requests - patches, improvements, new features - are a fantastic
help. They should remain focused in scope and avoid containing unrelated
commits.
[**Please ask first**](https://ernest.io/community) before embarking on any significant pull request (e.g.
implementing features, refactoring code, porting to a different language),
otherwise you risk spending a lot of time working on something that the
project's developers might not want to merge into the project.
Please adhere to the coding conventions used throughout a project (indentation,
accurate comments, etc.) and any other requirements (such as test coverage).

373
vendor/github.com/r3labs/sse/v2/LICENSE generated vendored Normal file
View File

@@ -0,0 +1,373 @@
Mozilla Public License Version 2.0
==================================
1. Definitions
--------------
1.1. "Contributor"
means each individual or legal entity that creates, contributes to
the creation of, or owns Covered Software.
1.2. "Contributor Version"
means the combination of the Contributions of others (if any) used
by a Contributor and that particular Contributor's Contribution.
1.3. "Contribution"
means Covered Software of a particular Contributor.
1.4. "Covered Software"
means Source Code Form to which the initial Contributor has attached
the notice in Exhibit A, the Executable Form of such Source Code
Form, and Modifications of such Source Code Form, in each case
including portions thereof.
1.5. "Incompatible With Secondary Licenses"
means
(a) that the initial Contributor has attached the notice described
in Exhibit B to the Covered Software; or
(b) that the Covered Software was made available under the terms of
version 1.1 or earlier of the License, but not also under the
terms of a Secondary License.
1.6. "Executable Form"
means any form of the work other than Source Code Form.
1.7. "Larger Work"
means a work that combines Covered Software with other material, in
a separate file or files, that is not Covered Software.
1.8. "License"
means this document.
1.9. "Licensable"
means having the right to grant, to the maximum extent possible,
whether at the time of the initial grant or subsequently, any and
all of the rights conveyed by this License.
1.10. "Modifications"
means any of the following:
(a) any file in Source Code Form that results from an addition to,
deletion from, or modification of the contents of Covered
Software; or
(b) any new file in Source Code Form that contains any Covered
Software.
1.11. "Patent Claims" of a Contributor
means any patent claim(s), including without limitation, method,
process, and apparatus claims, in any patent Licensable by such
Contributor that would be infringed, but for the grant of the
License, by the making, using, selling, offering for sale, having
made, import, or transfer of either its Contributions or its
Contributor Version.
1.12. "Secondary License"
means either the GNU General Public License, Version 2.0, the GNU
Lesser General Public License, Version 2.1, the GNU Affero General
Public License, Version 3.0, or any later versions of those
licenses.
1.13. "Source Code Form"
means the form of the work preferred for making modifications.
1.14. "You" (or "Your")
means an individual or a legal entity exercising rights under this
License. For legal entities, "You" includes any entity that
controls, is controlled by, or is under common control with You. For
purposes of this definition, "control" means (a) the power, direct
or indirect, to cause the direction or management of such entity,
whether by contract or otherwise, or (b) ownership of more than
fifty percent (50%) of the outstanding shares or beneficial
ownership of such entity.
2. License Grants and Conditions
--------------------------------
2.1. Grants
Each Contributor hereby grants You a world-wide, royalty-free,
non-exclusive license:
(a) under intellectual property rights (other than patent or trademark)
Licensable by such Contributor to use, reproduce, make available,
modify, display, perform, distribute, and otherwise exploit its
Contributions, either on an unmodified basis, with Modifications, or
as part of a Larger Work; and
(b) under Patent Claims of such Contributor to make, use, sell, offer
for sale, have made, import, and otherwise transfer either its
Contributions or its Contributor Version.
2.2. Effective Date
The licenses granted in Section 2.1 with respect to any Contribution
become effective for each Contribution on the date the Contributor first
distributes such Contribution.
2.3. Limitations on Grant Scope
The licenses granted in this Section 2 are the only rights granted under
this License. No additional rights or licenses will be implied from the
distribution or licensing of Covered Software under this License.
Notwithstanding Section 2.1(b) above, no patent license is granted by a
Contributor:
(a) for any code that a Contributor has removed from Covered Software;
or
(b) for infringements caused by: (i) Your and any other third party's
modifications of Covered Software, or (ii) the combination of its
Contributions with other software (except as part of its Contributor
Version); or
(c) under Patent Claims infringed by Covered Software in the absence of
its Contributions.
This License does not grant any rights in the trademarks, service marks,
or logos of any Contributor (except as may be necessary to comply with
the notice requirements in Section 3.4).
2.4. Subsequent Licenses
No Contributor makes additional grants as a result of Your choice to
distribute the Covered Software under a subsequent version of this
License (see Section 10.2) or under the terms of a Secondary License (if
permitted under the terms of Section 3.3).
2.5. Representation
Each Contributor represents that the Contributor believes its
Contributions are its original creation(s) or it has sufficient rights
to grant the rights to its Contributions conveyed by this License.
2.6. Fair Use
This License is not intended to limit any rights You have under
applicable copyright doctrines of fair use, fair dealing, or other
equivalents.
2.7. Conditions
Sections 3.1, 3.2, 3.3, and 3.4 are conditions of the licenses granted
in Section 2.1.
3. Responsibilities
-------------------
3.1. Distribution of Source Form
All distribution of Covered Software in Source Code Form, including any
Modifications that You create or to which You contribute, must be under
the terms of this License. You must inform recipients that the Source
Code Form of the Covered Software is governed by the terms of this
License, and how they can obtain a copy of this License. You may not
attempt to alter or restrict the recipients' rights in the Source Code
Form.
3.2. Distribution of Executable Form
If You distribute Covered Software in Executable Form then:
(a) such Covered Software must also be made available in Source Code
Form, as described in Section 3.1, and You must inform recipients of
the Executable Form how they can obtain a copy of such Source Code
Form by reasonable means in a timely manner, at a charge no more
than the cost of distribution to the recipient; and
(b) You may distribute such Executable Form under the terms of this
License, or sublicense it under different terms, provided that the
license for the Executable Form does not attempt to limit or alter
the recipients' rights in the Source Code Form under this License.
3.3. Distribution of a Larger Work
You may create and distribute a Larger Work under terms of Your choice,
provided that You also comply with the requirements of this License for
the Covered Software. If the Larger Work is a combination of Covered
Software with a work governed by one or more Secondary Licenses, and the
Covered Software is not Incompatible With Secondary Licenses, this
License permits You to additionally distribute such Covered Software
under the terms of such Secondary License(s), so that the recipient of
the Larger Work may, at their option, further distribute the Covered
Software under the terms of either this License or such Secondary
License(s).
3.4. Notices
You may not remove or alter the substance of any license notices
(including copyright notices, patent notices, disclaimers of warranty,
or limitations of liability) contained within the Source Code Form of
the Covered Software, except that You may alter any license notices to
the extent required to remedy known factual inaccuracies.
3.5. Application of Additional Terms
You may choose to offer, and to charge a fee for, warranty, support,
indemnity or liability obligations to one or more recipients of Covered
Software. However, You may do so only on Your own behalf, and not on
behalf of any Contributor. You must make it absolutely clear that any
such warranty, support, indemnity, or liability obligation is offered by
You alone, and You hereby agree to indemnify every Contributor for any
liability incurred by such Contributor as a result of warranty, support,
indemnity or liability terms You offer. You may include additional
disclaimers of warranty and limitations of liability specific to any
jurisdiction.
4. Inability to Comply Due to Statute or Regulation
---------------------------------------------------
If it is impossible for You to comply with any of the terms of this
License with respect to some or all of the Covered Software due to
statute, judicial order, or regulation then You must: (a) comply with
the terms of this License to the maximum extent possible; and (b)
describe the limitations and the code they affect. Such description must
be placed in a text file included with all distributions of the Covered
Software under this License. Except to the extent prohibited by statute
or regulation, such description must be sufficiently detailed for a
recipient of ordinary skill to be able to understand it.
5. Termination
--------------
5.1. The rights granted under this License will terminate automatically
if You fail to comply with any of its terms. However, if You become
compliant, then the rights granted under this License from a particular
Contributor are reinstated (a) provisionally, unless and until such
Contributor explicitly and finally terminates Your grants, and (b) on an
ongoing basis, if such Contributor fails to notify You of the
non-compliance by some reasonable means prior to 60 days after You have
come back into compliance. Moreover, Your grants from a particular
Contributor are reinstated on an ongoing basis if such Contributor
notifies You of the non-compliance by some reasonable means, this is the
first time You have received notice of non-compliance with this License
from such Contributor, and You become compliant prior to 30 days after
Your receipt of the notice.
5.2. If You initiate litigation against any entity by asserting a patent
infringement claim (excluding declaratory judgment actions,
counter-claims, and cross-claims) alleging that a Contributor Version
directly or indirectly infringes any patent, then the rights granted to
You by any and all Contributors for the Covered Software under Section
2.1 of this License shall terminate.
5.3. In the event of termination under Sections 5.1 or 5.2 above, all
end user license agreements (excluding distributors and resellers) which
have been validly granted by You or Your distributors under this License
prior to termination shall survive termination.
************************************************************************
* *
* 6. Disclaimer of Warranty *
* ------------------------- *
* *
* Covered Software is provided under this License on an "as is" *
* basis, without warranty of any kind, either expressed, implied, or *
* statutory, including, without limitation, warranties that the *
* Covered Software is free of defects, merchantable, fit for a *
* particular purpose or non-infringing. The entire risk as to the *
* quality and performance of the Covered Software is with You. *
* Should any Covered Software prove defective in any respect, You *
* (not any Contributor) assume the cost of any necessary servicing, *
* repair, or correction. This disclaimer of warranty constitutes an *
* essential part of this License. No use of any Covered Software is *
* authorized under this License except under this disclaimer. *
* *
************************************************************************
************************************************************************
* *
* 7. Limitation of Liability *
* -------------------------- *
* *
* Under no circumstances and under no legal theory, whether tort *
* (including negligence), contract, or otherwise, shall any *
* Contributor, or anyone who distributes Covered Software as *
* permitted above, be liable to You for any direct, indirect, *
* special, incidental, or consequential damages of any character *
* including, without limitation, damages for lost profits, loss of *
* goodwill, work stoppage, computer failure or malfunction, or any *
* and all other commercial damages or losses, even if such party *
* shall have been informed of the possibility of such damages. This *
* limitation of liability shall not apply to liability for death or *
* personal injury resulting from such party's negligence to the *
* extent applicable law prohibits such limitation. Some *
* jurisdictions do not allow the exclusion or limitation of *
* incidental or consequential damages, so this exclusion and *
* limitation may not apply to You. *
* *
************************************************************************
8. Litigation
-------------
Any litigation relating to this License may be brought only in the
courts of a jurisdiction where the defendant maintains its principal
place of business and such litigation shall be governed by laws of that
jurisdiction, without reference to its conflict-of-law provisions.
Nothing in this Section shall prevent a party's ability to bring
cross-claims or counter-claims.
9. Miscellaneous
----------------
This License represents the complete agreement concerning the subject
matter hereof. If any provision of this License is held to be
unenforceable, such provision shall be reformed only to the extent
necessary to make it enforceable. Any law or regulation which provides
that the language of a contract shall be construed against the drafter
shall not be used to construe this License against a Contributor.
10. Versions of the License
---------------------------
10.1. New Versions
Mozilla Foundation is the license steward. Except as provided in Section
10.3, no one other than the license steward has the right to modify or
publish new versions of this License. Each version will be given a
distinguishing version number.
10.2. Effect of New Versions
You may distribute the Covered Software under the terms of the version
of the License under which You originally received the Covered Software,
or under the terms of any subsequent version published by the license
steward.
10.3. Modified Versions
If you create software not governed by this License, and you want to
create a new license for such software, you may create and use a
modified version of this License if you rename the license and remove
any references to the name of the license steward (except to note that
such modified license differs from this License).
10.4. Distributing Source Code Form that is Incompatible With Secondary
Licenses
If You choose to distribute Source Code Form that is Incompatible With
Secondary Licenses under the terms of this version of the License, the
notice described in Exhibit B of this License must be attached.
Exhibit A - Source Code Form License Notice
-------------------------------------------
This Source Code Form is subject to the terms of the Mozilla Public
License, v. 2.0. If a copy of the MPL was not distributed with this
file, You can obtain one at http://mozilla.org/MPL/2.0/.
If it is not possible or desirable to put the notice in a particular
file, then You may include the notice in a location (such as a LICENSE
file in a relevant directory) where a recipient would be likely to look
for such a notice.
You may add additional accurate notices of copyright ownership.
Exhibit B - "Incompatible With Secondary Licenses" Notice
---------------------------------------------------------
This Source Code Form is "Incompatible With Secondary Licenses", as
defined by the Mozilla Public License, v. 2.0.

20
vendor/github.com/r3labs/sse/v2/Makefile generated vendored Normal file
View File

@@ -0,0 +1,20 @@
install:
go install -v
build:
go build -v ./...
lint:
golint ./...
go vet ./...
test:
go test -v ./... --cover
deps:
go get -u gopkg.in/cenkalti/backoff.v1
go get -u github.com/golang/lint/golint
go get -u github.com/stretchr/testify
clean:
go clean

191
vendor/github.com/r3labs/sse/v2/README.md generated vendored Normal file
View File

@@ -0,0 +1,191 @@
# SSE - Server Sent Events Client/Server Library for Go
## Synopsis
SSE is a client/server implementation for Server Sent Events for Golang.
## Build status
* Master: [![CircleCI Master](https://circleci.com/gh/r3labs/sse.svg?style=svg)](https://circleci.com/gh/r3labs/sse)
## Quick start
To install:
```
go get github.com/r3labs/sse/v2
```
To Test:
```sh
$ make deps
$ make test
```
#### Example Server
There are two parts of the server. It is comprised of the message scheduler and a http handler function.
The messaging system is started when running:
```go
func main() {
server := sse.New()
}
```
To add a stream to this handler:
```go
func main() {
server := sse.New()
server.CreateStream("messages")
}
```
This creates a new stream inside of the scheduler. Seeing as there are no consumers, publishing a message to this channel will do nothing.
Clients can connect to this stream once the http handler is started by specifying _stream_ as a url parameter, like so:
```
http://server/events?stream=messages
```
In order to start the http server:
```go
func main() {
server := sse.New()
// Create a new Mux and set the handler
mux := http.NewServeMux()
mux.HandleFunc("/events", server.ServeHTTP)
http.ListenAndServe(":8080", mux)
}
```
To publish messages to a stream:
```go
func main() {
server := sse.New()
// Publish a payload to the stream
server.Publish("messages", &sse.Event{
Data: []byte("ping"),
})
}
```
Please note there must be a stream with the name you specify and there must be subscribers to that stream
A way to detect disconnected clients:
```go
func main() {
server := sse.New()
mux := http.NewServeMux()
mux.HandleFunc("/events", func(w http.ResponseWriter, r *http.Request) {
go func() {
// Received Browser Disconnection
<-r.Context().Done()
println("The client is disconnected here")
return
}()
server.ServeHTTP(w, r)
})
http.ListenAndServe(":8080", mux)
}
```
#### Example Client
The client exposes a way to connect to an SSE server. The client can also handle multiple events under the same url.
To create a new client:
```go
func main() {
client := sse.NewClient("http://server/events")
}
```
To subscribe to an event stream, please use the Subscribe function. This accepts the name of the stream and a handler function:
```go
func main() {
client := sse.NewClient("http://server/events")
client.Subscribe("messages", func(msg *sse.Event) {
// Got some data!
fmt.Println(msg.Data)
})
}
```
Please note that this function will block the current thread. You can run this function in a go routine.
If you wish to have events sent to a channel, you can use SubscribeChan:
```go
func main() {
events := make(chan *sse.Event)
client := sse.NewClient("http://server/events")
client.SubscribeChan("messages", events)
}
```
#### HTTP client parameters
To add additional parameters to the http client, such as disabling ssl verification for self signed certs, you can override the http client or update its options:
```go
func main() {
client := sse.NewClient("http://server/events")
client.Connection.Transport = &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
}
}
```
#### URL query parameters
To set custom query parameters on the client or disable the stream parameter altogether:
```go
func main() {
client := sse.NewClient("http://server/events?search=example")
client.SubscribeRaw(func(msg *sse.Event) {
// Got some data!
fmt.Println(msg.Data)
})
}
```
## Contributing
Please read through our
[contributing guidelines](CONTRIBUTING.md).
Included are directions for opening issues, coding standards, and notes on
development.
Moreover, if your pull request contains patches or features, you must include
relevant unit tests.
## Versioning
For transparency into our release cycle and in striving to maintain backward
compatibility, this project is maintained under [the Semantic Versioning guidelines](http://semver.org/).
## Copyright and License
Code and documentation copyright since 2015 r3labs.io authors.
Code released under
[the Mozilla Public License Version 2.0](LICENSE).

390
vendor/github.com/r3labs/sse/v2/client.go generated vendored Normal file
View File

@@ -0,0 +1,390 @@
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
package sse
import (
"bytes"
"context"
"encoding/base64"
"errors"
"fmt"
"io"
"net/http"
"sync"
"sync/atomic"
"time"
"gopkg.in/cenkalti/backoff.v1"
)
var (
headerID = []byte("id:")
headerData = []byte("data:")
headerEvent = []byte("event:")
headerRetry = []byte("retry:")
)
func ClientMaxBufferSize(s int) func(c *Client) {
return func(c *Client) {
c.maxBufferSize = s
}
}
// ConnCallback defines a function to be called on a particular connection event
type ConnCallback func(c *Client)
// ResponseValidator validates a response
type ResponseValidator func(c *Client, resp *http.Response) error
// Client handles an incoming server stream
type Client struct {
Retry time.Time
ReconnectStrategy backoff.BackOff
disconnectcb ConnCallback
connectedcb ConnCallback
subscribed map[chan *Event]chan struct{}
Headers map[string]string
ReconnectNotify backoff.Notify
ResponseValidator ResponseValidator
Connection *http.Client
URL string
LastEventID atomic.Value // []byte
maxBufferSize int
mu sync.Mutex
EncodingBase64 bool
Connected bool
}
// NewClient creates a new client
func NewClient(url string, opts ...func(c *Client)) *Client {
c := &Client{
URL: url,
Connection: &http.Client{},
Headers: make(map[string]string),
subscribed: make(map[chan *Event]chan struct{}),
maxBufferSize: 1 << 16,
}
for _, opt := range opts {
opt(c)
}
return c
}
// Subscribe to a data stream
func (c *Client) Subscribe(stream string, handler func(msg *Event)) error {
return c.SubscribeWithContext(context.Background(), stream, handler)
}
// SubscribeWithContext to a data stream with context
func (c *Client) SubscribeWithContext(ctx context.Context, stream string, handler func(msg *Event)) error {
operation := func() error {
resp, err := c.request(ctx, stream)
if err != nil {
return err
}
if validator := c.ResponseValidator; validator != nil {
err = validator(c, resp)
if err != nil {
return err
}
} else if resp.StatusCode != 200 {
resp.Body.Close()
return fmt.Errorf("could not connect to stream: %s", http.StatusText(resp.StatusCode))
}
defer resp.Body.Close()
reader := NewEventStreamReader(resp.Body, c.maxBufferSize)
eventChan, errorChan := c.startReadLoop(reader)
for {
select {
case err = <-errorChan:
return err
case msg := <-eventChan:
handler(msg)
}
}
}
// Apply user specified reconnection strategy or default to standard NewExponentialBackOff() reconnection method
var err error
if c.ReconnectStrategy != nil {
err = backoff.RetryNotify(operation, c.ReconnectStrategy, c.ReconnectNotify)
} else {
err = backoff.RetryNotify(operation, backoff.NewExponentialBackOff(), c.ReconnectNotify)
}
return err
}
// SubscribeChan sends all events to the provided channel
func (c *Client) SubscribeChan(stream string, ch chan *Event) error {
return c.SubscribeChanWithContext(context.Background(), stream, ch)
}
// SubscribeChanWithContext sends all events to the provided channel with context
func (c *Client) SubscribeChanWithContext(ctx context.Context, stream string, ch chan *Event) error {
var connected bool
errch := make(chan error)
c.mu.Lock()
c.subscribed[ch] = make(chan struct{})
c.mu.Unlock()
operation := func() error {
resp, err := c.request(ctx, stream)
if err != nil {
return err
}
if validator := c.ResponseValidator; validator != nil {
err = validator(c, resp)
if err != nil {
return err
}
} else if resp.StatusCode != 200 {
resp.Body.Close()
return fmt.Errorf("could not connect to stream: %s", http.StatusText(resp.StatusCode))
}
defer resp.Body.Close()
if !connected {
// Notify connect
errch <- nil
connected = true
}
reader := NewEventStreamReader(resp.Body, c.maxBufferSize)
eventChan, errorChan := c.startReadLoop(reader)
for {
var msg *Event
// Wait for message to arrive or exit
select {
case <-c.subscribed[ch]:
return nil
case err = <-errorChan:
return err
case msg = <-eventChan:
}
// Wait for message to be sent or exit
if msg != nil {
select {
case <-c.subscribed[ch]:
return nil
case ch <- msg:
// message sent
}
}
}
}
go func() {
defer c.cleanup(ch)
// Apply user specified reconnection strategy or default to standard NewExponentialBackOff() reconnection method
var err error
if c.ReconnectStrategy != nil {
err = backoff.RetryNotify(operation, c.ReconnectStrategy, c.ReconnectNotify)
} else {
err = backoff.RetryNotify(operation, backoff.NewExponentialBackOff(), c.ReconnectNotify)
}
// channel closed once connected
if err != nil && !connected {
errch <- err
}
}()
err := <-errch
close(errch)
return err
}
func (c *Client) startReadLoop(reader *EventStreamReader) (chan *Event, chan error) {
outCh := make(chan *Event)
erChan := make(chan error)
go c.readLoop(reader, outCh, erChan)
return outCh, erChan
}
func (c *Client) readLoop(reader *EventStreamReader, outCh chan *Event, erChan chan error) {
for {
// Read each new line and process the type of event
event, err := reader.ReadEvent()
if err != nil {
if err == io.EOF {
erChan <- nil
return
}
// run user specified disconnect function
if c.disconnectcb != nil {
c.Connected = false
c.disconnectcb(c)
}
erChan <- err
return
}
if !c.Connected && c.connectedcb != nil {
c.Connected = true
c.connectedcb(c)
}
// If we get an error, ignore it.
var msg *Event
if msg, err = c.processEvent(event); err == nil {
if len(msg.ID) > 0 {
c.LastEventID.Store(msg.ID)
} else {
msg.ID, _ = c.LastEventID.Load().([]byte)
}
// Send downstream if the event has something useful
if msg.hasContent() {
outCh <- msg
}
}
}
}
// SubscribeRaw to an sse endpoint
func (c *Client) SubscribeRaw(handler func(msg *Event)) error {
return c.Subscribe("", handler)
}
// SubscribeRawWithContext to an sse endpoint with context
func (c *Client) SubscribeRawWithContext(ctx context.Context, handler func(msg *Event)) error {
return c.SubscribeWithContext(ctx, "", handler)
}
// SubscribeChanRaw sends all events to the provided channel
func (c *Client) SubscribeChanRaw(ch chan *Event) error {
return c.SubscribeChan("", ch)
}
// SubscribeChanRawWithContext sends all events to the provided channel with context
func (c *Client) SubscribeChanRawWithContext(ctx context.Context, ch chan *Event) error {
return c.SubscribeChanWithContext(ctx, "", ch)
}
// Unsubscribe unsubscribes a channel
func (c *Client) Unsubscribe(ch chan *Event) {
c.mu.Lock()
defer c.mu.Unlock()
if c.subscribed[ch] != nil {
c.subscribed[ch] <- struct{}{}
}
}
// OnDisconnect specifies the function to run when the connection disconnects
func (c *Client) OnDisconnect(fn ConnCallback) {
c.disconnectcb = fn
}
// OnConnect specifies the function to run when the connection is successful
func (c *Client) OnConnect(fn ConnCallback) {
c.connectedcb = fn
}
func (c *Client) request(ctx context.Context, stream string) (*http.Response, error) {
req, err := http.NewRequest("GET", c.URL, nil)
if err != nil {
return nil, err
}
req = req.WithContext(ctx)
// Setup request, specify stream to connect to
if stream != "" {
query := req.URL.Query()
query.Add("stream", stream)
req.URL.RawQuery = query.Encode()
}
req.Header.Set("Cache-Control", "no-cache")
req.Header.Set("Accept", "text/event-stream")
req.Header.Set("Connection", "keep-alive")
lastID, exists := c.LastEventID.Load().([]byte)
if exists && lastID != nil {
req.Header.Set("Last-Event-ID", string(lastID))
}
// Add user specified headers
for k, v := range c.Headers {
req.Header.Set(k, v)
}
return c.Connection.Do(req)
}
func (c *Client) processEvent(msg []byte) (event *Event, err error) {
var e Event
if len(msg) < 1 {
return nil, errors.New("event message was empty")
}
// Normalize the crlf to lf to make it easier to split the lines.
// Split the line by "\n" or "\r", per the spec.
for _, line := range bytes.FieldsFunc(msg, func(r rune) bool { return r == '\n' || r == '\r' }) {
switch {
case bytes.HasPrefix(line, headerID):
e.ID = append([]byte(nil), trimHeader(len(headerID), line)...)
case bytes.HasPrefix(line, headerData):
// The spec allows for multiple data fields per event, concatenated them with "\n".
e.Data = append(e.Data[:], append(trimHeader(len(headerData), line), byte('\n'))...)
// The spec says that a line that simply contains the string "data" should be treated as a data field with an empty body.
case bytes.Equal(line, bytes.TrimSuffix(headerData, []byte(":"))):
e.Data = append(e.Data, byte('\n'))
case bytes.HasPrefix(line, headerEvent):
e.Event = append([]byte(nil), trimHeader(len(headerEvent), line)...)
case bytes.HasPrefix(line, headerRetry):
e.Retry = append([]byte(nil), trimHeader(len(headerRetry), line)...)
default:
// Ignore any garbage that doesn't match what we're looking for.
}
}
// Trim the last "\n" per the spec.
e.Data = bytes.TrimSuffix(e.Data, []byte("\n"))
if c.EncodingBase64 {
buf := make([]byte, base64.StdEncoding.DecodedLen(len(e.Data)))
n, err := base64.StdEncoding.Decode(buf, e.Data)
if err != nil {
err = fmt.Errorf("failed to decode event message: %s", err)
}
e.Data = buf[:n]
}
return &e, err
}
func (c *Client) cleanup(ch chan *Event) {
c.mu.Lock()
defer c.mu.Unlock()
if c.subscribed[ch] != nil {
close(c.subscribed[ch])
delete(c.subscribed, ch)
}
}
func trimHeader(size int, data []byte) []byte {
if data == nil || len(data) < size {
return data
}
data = data[size:]
// Remove optional leading whitespace
if len(data) > 0 && data[0] == 32 {
data = data[1:]
}
// Remove trailing new line
if len(data) > 0 && data[len(data)-1] == 10 {
data = data[:len(data)-1]
}
return data
}

114
vendor/github.com/r3labs/sse/v2/event.go generated vendored Normal file
View File

@@ -0,0 +1,114 @@
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
package sse
import (
"bufio"
"bytes"
"context"
"io"
"time"
)
// Event holds all of the event source fields
type Event struct {
timestamp time.Time
ID []byte
Data []byte
Event []byte
Retry []byte
Comment []byte
}
func (e *Event) hasContent() bool {
return len(e.ID) > 0 || len(e.Data) > 0 || len(e.Event) > 0 || len(e.Retry) > 0
}
// EventStreamReader scans an io.Reader looking for EventStream messages.
type EventStreamReader struct {
scanner *bufio.Scanner
}
// NewEventStreamReader creates an instance of EventStreamReader.
func NewEventStreamReader(eventStream io.Reader, maxBufferSize int) *EventStreamReader {
scanner := bufio.NewScanner(eventStream)
initBufferSize := minPosInt(4096, maxBufferSize)
scanner.Buffer(make([]byte, initBufferSize), maxBufferSize)
split := func(data []byte, atEOF bool) (int, []byte, error) {
if atEOF && len(data) == 0 {
return 0, nil, nil
}
// We have a full event payload to parse.
if i, nlen := containsDoubleNewline(data); i >= 0 {
return i + nlen, data[0:i], nil
}
// If we're at EOF, we have all of the data.
if atEOF {
return len(data), data, nil
}
// Request more data.
return 0, nil, nil
}
// Set the split function for the scanning operation.
scanner.Split(split)
return &EventStreamReader{
scanner: scanner,
}
}
// Returns a tuple containing the index of a double newline, and the number of bytes
// represented by that sequence. If no double newline is present, the first value
// will be negative.
func containsDoubleNewline(data []byte) (int, int) {
// Search for each potentially valid sequence of newline characters
crcr := bytes.Index(data, []byte("\r\r"))
lflf := bytes.Index(data, []byte("\n\n"))
crlflf := bytes.Index(data, []byte("\r\n\n"))
lfcrlf := bytes.Index(data, []byte("\n\r\n"))
crlfcrlf := bytes.Index(data, []byte("\r\n\r\n"))
// Find the earliest position of a double newline combination
minPos := minPosInt(crcr, minPosInt(lflf, minPosInt(crlflf, minPosInt(lfcrlf, crlfcrlf))))
// Detemine the length of the sequence
nlen := 2
if minPos == crlfcrlf {
nlen = 4
} else if minPos == crlflf || minPos == lfcrlf {
nlen = 3
}
return minPos, nlen
}
// Returns the minimum non-negative value out of the two values. If both
// are negative, a negative value is returned.
func minPosInt(a, b int) int {
if a < 0 {
return b
}
if b < 0 {
return a
}
if a > b {
return b
}
return a
}
// ReadEvent scans the EventStream for events.
func (e *EventStreamReader) ReadEvent() ([]byte, error) {
if e.scanner.Scan() {
event := e.scanner.Bytes()
return event, nil
}
if err := e.scanner.Err(); err != nil {
if err == context.Canceled {
return nil, io.EOF
}
return nil, err
}
return nil, io.EOF
}

43
vendor/github.com/r3labs/sse/v2/event_log.go generated vendored Normal file
View File

@@ -0,0 +1,43 @@
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
package sse
import (
"strconv"
"time"
)
// EventLog holds all of previous events
type EventLog []*Event
// Add event to eventlog
func (e *EventLog) Add(ev *Event) {
if !ev.hasContent() {
return
}
ev.ID = []byte(e.currentindex())
ev.timestamp = time.Now()
*e = append(*e, ev)
}
// Clear events from eventlog
func (e *EventLog) Clear() {
*e = nil
}
// Replay events to a subscriber
func (e *EventLog) Replay(s *Subscriber) {
for i := 0; i < len(*e); i++ {
id, _ := strconv.Atoi(string((*e)[i].ID))
if id >= s.eventid {
s.connection <- (*e)[i]
}
}
}
func (e *EventLog) currentindex() string {
return strconv.Itoa(len(*e))
}

120
vendor/github.com/r3labs/sse/v2/http.go generated vendored Normal file
View File

@@ -0,0 +1,120 @@
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
package sse
import (
"bytes"
"fmt"
"net/http"
"strconv"
"time"
)
// ServeHTTP serves new connections with events for a given stream ...
func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
flusher, err := w.(http.Flusher)
if !err {
http.Error(w, "Streaming unsupported!", http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
for k, v := range s.Headers {
w.Header().Set(k, v)
}
// Get the StreamID from the URL
streamID := r.URL.Query().Get("stream")
if streamID == "" {
http.Error(w, "Please specify a stream!", http.StatusInternalServerError)
return
}
stream := s.getStream(streamID)
if stream == nil {
if !s.AutoStream {
http.Error(w, "Stream not found!", http.StatusInternalServerError)
return
}
stream = s.CreateStream(streamID)
}
eventid := 0
if id := r.Header.Get("Last-Event-ID"); id != "" {
var err error
eventid, err = strconv.Atoi(id)
if err != nil {
http.Error(w, "Last-Event-ID must be a number!", http.StatusBadRequest)
return
}
}
// Create the stream subscriber
sub := stream.addSubscriber(eventid, r.URL)
go func() {
<-r.Context().Done()
sub.close()
if s.AutoStream && !s.AutoReplay && stream.getSubscriberCount() == 0 {
s.RemoveStream(streamID)
}
}()
w.WriteHeader(http.StatusOK)
flusher.Flush()
// Push events to client
for ev := range sub.connection {
// If the data buffer is an empty string abort.
if len(ev.Data) == 0 && len(ev.Comment) == 0 {
break
}
// if the event has expired, dont send it
if s.EventTTL != 0 && time.Now().After(ev.timestamp.Add(s.EventTTL)) {
continue
}
if len(ev.Data) > 0 {
fmt.Fprintf(w, "id: %s\n", ev.ID)
if s.SplitData {
sd := bytes.Split(ev.Data, []byte("\n"))
for i := range sd {
fmt.Fprintf(w, "data: %s\n", sd[i])
}
} else {
if bytes.HasPrefix(ev.Data, []byte(":")) {
fmt.Fprintf(w, "%s\n", ev.Data)
} else {
fmt.Fprintf(w, "data: %s\n", ev.Data)
}
}
if len(ev.Event) > 0 {
fmt.Fprintf(w, "event: %s\n", ev.Event)
}
if len(ev.Retry) > 0 {
fmt.Fprintf(w, "retry: %s\n", ev.Retry)
}
}
if len(ev.Comment) > 0 {
fmt.Fprintf(w, ": %s\n", ev.Comment)
}
fmt.Fprint(w, "\n")
flusher.Flush()
}
}

156
vendor/github.com/r3labs/sse/v2/server.go generated vendored Normal file
View File

@@ -0,0 +1,156 @@
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
package sse
import (
"encoding/base64"
"sync"
"time"
)
// DefaultBufferSize size of the queue that holds the streams messages.
const DefaultBufferSize = 1024
// Server Is our main struct
type Server struct {
// Extra headers adding to the HTTP response to each client
Headers map[string]string
// Sets a ttl that prevents old events from being transmitted
EventTTL time.Duration
// Specifies the size of the message buffer for each stream
BufferSize int
// Encodes all data as base64
EncodeBase64 bool
// Splits an events data into multiple data: entries
SplitData bool
// Enables creation of a stream when a client connects
AutoStream bool
// Enables automatic replay for each new subscriber that connects
AutoReplay bool
// Specifies the function to run when client subscribe or un-subscribe
OnSubscribe func(streamID string, sub *Subscriber)
OnUnsubscribe func(streamID string, sub *Subscriber)
streams map[string]*Stream
muStreams sync.RWMutex
}
// New will create a server and setup defaults
func New() *Server {
return &Server{
BufferSize: DefaultBufferSize,
AutoStream: false,
AutoReplay: true,
streams: make(map[string]*Stream),
Headers: map[string]string{},
}
}
// NewWithCallback will create a server and setup defaults with callback function
func NewWithCallback(onSubscribe, onUnsubscribe func(streamID string, sub *Subscriber)) *Server {
return &Server{
BufferSize: DefaultBufferSize,
AutoStream: false,
AutoReplay: true,
streams: make(map[string]*Stream),
Headers: map[string]string{},
OnSubscribe: onSubscribe,
OnUnsubscribe: onUnsubscribe,
}
}
// Close shuts down the server, closes all of the streams and connections
func (s *Server) Close() {
s.muStreams.Lock()
defer s.muStreams.Unlock()
for id := range s.streams {
s.streams[id].close()
delete(s.streams, id)
}
}
// CreateStream will create a new stream and register it
func (s *Server) CreateStream(id string) *Stream {
s.muStreams.Lock()
defer s.muStreams.Unlock()
if s.streams[id] != nil {
return s.streams[id]
}
str := newStream(id, s.BufferSize, s.AutoReplay, s.AutoStream, s.OnSubscribe, s.OnUnsubscribe)
str.run()
s.streams[id] = str
return str
}
// RemoveStream will remove a stream
func (s *Server) RemoveStream(id string) {
s.muStreams.Lock()
defer s.muStreams.Unlock()
if s.streams[id] != nil {
s.streams[id].close()
delete(s.streams, id)
}
}
// StreamExists checks whether a stream by a given id exists
func (s *Server) StreamExists(id string) bool {
return s.getStream(id) != nil
}
// Publish sends a mesage to every client in a streamID.
// If the stream's buffer is full, it blocks until the message is sent out to
// all subscribers (but not necessarily arrived the clients), or when the
// stream is closed.
func (s *Server) Publish(id string, event *Event) {
stream := s.getStream(id)
if stream == nil {
return
}
select {
case <-stream.quit:
case stream.event <- s.process(event):
}
}
// TryPublish is the same as Publish except that when the operation would cause
// the call to be blocked, it simply drops the message and returns false.
// Together with a small BufferSize, it can be useful when publishing the
// latest message ASAP is more important than reliable delivery.
func (s *Server) TryPublish(id string, event *Event) bool {
stream := s.getStream(id)
if stream == nil {
return false
}
select {
case stream.event <- s.process(event):
return true
default:
return false
}
}
func (s *Server) getStream(id string) *Stream {
s.muStreams.RLock()
defer s.muStreams.RUnlock()
return s.streams[id]
}
func (s *Server) process(event *Event) *Event {
if s.EncodeBase64 {
output := make([]byte, base64.StdEncoding.EncodedLen(len(event.Data)))
base64.StdEncoding.Encode(output, event.Data)
event.Data = output
}
return event
}

153
vendor/github.com/r3labs/sse/v2/stream.go generated vendored Normal file
View File

@@ -0,0 +1,153 @@
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
package sse
import (
"net/url"
"sync"
"sync/atomic"
)
// Stream ...
type Stream struct {
ID string
event chan *Event
quit chan struct{}
quitOnce sync.Once
register chan *Subscriber
deregister chan *Subscriber
subscribers []*Subscriber
Eventlog EventLog
subscriberCount int32
// Enables replaying of eventlog to newly added subscribers
AutoReplay bool
isAutoStream bool
// Specifies the function to run when client subscribe or un-subscribe
OnSubscribe func(streamID string, sub *Subscriber)
OnUnsubscribe func(streamID string, sub *Subscriber)
}
// newStream returns a new stream
func newStream(id string, buffSize int, replay, isAutoStream bool, onSubscribe, onUnsubscribe func(string, *Subscriber)) *Stream {
return &Stream{
ID: id,
AutoReplay: replay,
subscribers: make([]*Subscriber, 0),
isAutoStream: isAutoStream,
register: make(chan *Subscriber),
deregister: make(chan *Subscriber),
event: make(chan *Event, buffSize),
quit: make(chan struct{}),
Eventlog: make(EventLog, 0),
OnSubscribe: onSubscribe,
OnUnsubscribe: onUnsubscribe,
}
}
func (str *Stream) run() {
go func(str *Stream) {
for {
select {
// Add new subscriber
case subscriber := <-str.register:
str.subscribers = append(str.subscribers, subscriber)
if str.AutoReplay {
str.Eventlog.Replay(subscriber)
}
// Remove closed subscriber
case subscriber := <-str.deregister:
i := str.getSubIndex(subscriber)
if i != -1 {
str.removeSubscriber(i)
}
if str.OnUnsubscribe != nil {
go str.OnUnsubscribe(str.ID, subscriber)
}
// Publish event to subscribers
case event := <-str.event:
if str.AutoReplay {
str.Eventlog.Add(event)
}
for i := range str.subscribers {
str.subscribers[i].connection <- event
}
// Shutdown if the server closes
case <-str.quit:
// remove connections
str.removeAllSubscribers()
return
}
}
}(str)
}
func (str *Stream) close() {
str.quitOnce.Do(func() {
close(str.quit)
})
}
func (str *Stream) getSubIndex(sub *Subscriber) int {
for i := range str.subscribers {
if str.subscribers[i] == sub {
return i
}
}
return -1
}
// addSubscriber will create a new subscriber on a stream
func (str *Stream) addSubscriber(eventid int, url *url.URL) *Subscriber {
atomic.AddInt32(&str.subscriberCount, 1)
sub := &Subscriber{
eventid: eventid,
quit: str.deregister,
connection: make(chan *Event, 64),
URL: url,
}
if str.isAutoStream {
sub.removed = make(chan struct{}, 1)
}
str.register <- sub
if str.OnSubscribe != nil {
go str.OnSubscribe(str.ID, sub)
}
return sub
}
func (str *Stream) removeSubscriber(i int) {
atomic.AddInt32(&str.subscriberCount, -1)
close(str.subscribers[i].connection)
if str.subscribers[i].removed != nil {
str.subscribers[i].removed <- struct{}{}
close(str.subscribers[i].removed)
}
str.subscribers = append(str.subscribers[:i], str.subscribers[i+1:]...)
}
func (str *Stream) removeAllSubscribers() {
for i := 0; i < len(str.subscribers); i++ {
close(str.subscribers[i].connection)
if str.subscribers[i].removed != nil {
str.subscribers[i].removed <- struct{}{}
close(str.subscribers[i].removed)
}
}
atomic.StoreInt32(&str.subscriberCount, 0)
str.subscribers = str.subscribers[:0]
}
func (str *Stream) getSubscriberCount() int {
return int(atomic.LoadInt32(&str.subscriberCount))
}

24
vendor/github.com/r3labs/sse/v2/subscriber.go generated vendored Normal file
View File

@@ -0,0 +1,24 @@
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
package sse
import "net/url"
// Subscriber ...
type Subscriber struct {
quit chan *Subscriber
connection chan *Event
removed chan struct{}
eventid int
URL *url.URL
}
// Close will let the stream know that the clients connection has terminated
func (s *Subscriber) close() {
s.quit <- s
if s.removed != nil {
<-s.removed
}
}

22
vendor/gopkg.in/cenkalti/backoff.v1/.gitignore generated vendored Normal file
View File

@@ -0,0 +1,22 @@
# Compiled Object files, Static and Dynamic libs (Shared Objects)
*.o
*.a
*.so
# Folders
_obj
_test
# Architecture specific extensions/prefixes
*.[568vq]
[568vq].out
*.cgo1.go
*.cgo2.c
_cgo_defun.c
_cgo_gotypes.go
_cgo_export.*
_testmain.go
*.exe

9
vendor/gopkg.in/cenkalti/backoff.v1/.travis.yml generated vendored Normal file
View File

@@ -0,0 +1,9 @@
language: go
go:
- 1.3.3
- tip
before_install:
- go get github.com/mattn/goveralls
- go get golang.org/x/tools/cmd/cover
script:
- $HOME/gopath/bin/goveralls -service=travis-ci

20
vendor/gopkg.in/cenkalti/backoff.v1/LICENSE generated vendored Normal file
View File

@@ -0,0 +1,20 @@
The MIT License (MIT)
Copyright (c) 2014 Cenk Altı
Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
the Software without restriction, including without limitation the rights to
use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
the Software, and to permit persons to whom the Software is furnished to do so,
subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

30
vendor/gopkg.in/cenkalti/backoff.v1/README.md generated vendored Normal file
View File

@@ -0,0 +1,30 @@
# Exponential Backoff [![GoDoc][godoc image]][godoc] [![Build Status][travis image]][travis] [![Coverage Status][coveralls image]][coveralls]
This is a Go port of the exponential backoff algorithm from [Google's HTTP Client Library for Java][google-http-java-client].
[Exponential backoff][exponential backoff wiki]
is an algorithm that uses feedback to multiplicatively decrease the rate of some process,
in order to gradually find an acceptable rate.
The retries exponentially increase and stop increasing when a certain threshold is met.
## Usage
See https://godoc.org/github.com/cenkalti/backoff#pkg-examples
## Contributing
* I would like to keep this library as small as possible.
* Please don't send a PR without opening an issue and discussing it first.
* If proposed change is not a common use case, I will probably not accept it.
[godoc]: https://godoc.org/github.com/cenkalti/backoff
[godoc image]: https://godoc.org/github.com/cenkalti/backoff?status.png
[travis]: https://travis-ci.org/cenkalti/backoff
[travis image]: https://travis-ci.org/cenkalti/backoff.png?branch=master
[coveralls]: https://coveralls.io/github/cenkalti/backoff?branch=master
[coveralls image]: https://coveralls.io/repos/github/cenkalti/backoff/badge.svg?branch=master
[google-http-java-client]: https://github.com/google/google-http-java-client
[exponential backoff wiki]: http://en.wikipedia.org/wiki/Exponential_backoff
[advanced example]: https://godoc.org/github.com/cenkalti/backoff#example_

66
vendor/gopkg.in/cenkalti/backoff.v1/backoff.go generated vendored Normal file
View File

@@ -0,0 +1,66 @@
// Package backoff implements backoff algorithms for retrying operations.
//
// Use Retry function for retrying operations that may fail.
// If Retry does not meet your needs,
// copy/paste the function into your project and modify as you wish.
//
// There is also Ticker type similar to time.Ticker.
// You can use it if you need to work with channels.
//
// See Examples section below for usage examples.
package backoff
import "time"
// BackOff is a backoff policy for retrying an operation.
type BackOff interface {
// NextBackOff returns the duration to wait before retrying the operation,
// or backoff.Stop to indicate that no more retries should be made.
//
// Example usage:
//
// duration := backoff.NextBackOff();
// if (duration == backoff.Stop) {
// // Do not retry operation.
// } else {
// // Sleep for duration and retry operation.
// }
//
NextBackOff() time.Duration
// Reset to initial state.
Reset()
}
// Stop indicates that no more retries should be made for use in NextBackOff().
const Stop time.Duration = -1
// ZeroBackOff is a fixed backoff policy whose backoff time is always zero,
// meaning that the operation is retried immediately without waiting, indefinitely.
type ZeroBackOff struct{}
func (b *ZeroBackOff) Reset() {}
func (b *ZeroBackOff) NextBackOff() time.Duration { return 0 }
// StopBackOff is a fixed backoff policy that always returns backoff.Stop for
// NextBackOff(), meaning that the operation should never be retried.
type StopBackOff struct{}
func (b *StopBackOff) Reset() {}
func (b *StopBackOff) NextBackOff() time.Duration { return Stop }
// ConstantBackOff is a backoff policy that always returns the same backoff delay.
// This is in contrast to an exponential backoff policy,
// which returns a delay that grows longer as you call NextBackOff() over and over again.
type ConstantBackOff struct {
Interval time.Duration
}
func (b *ConstantBackOff) Reset() {}
func (b *ConstantBackOff) NextBackOff() time.Duration { return b.Interval }
func NewConstantBackOff(d time.Duration) *ConstantBackOff {
return &ConstantBackOff{Interval: d}
}

60
vendor/gopkg.in/cenkalti/backoff.v1/context.go generated vendored Normal file
View File

@@ -0,0 +1,60 @@
package backoff
import (
"time"
"golang.org/x/net/context"
)
// BackOffContext is a backoff policy that stops retrying after the context
// is canceled.
type BackOffContext interface {
BackOff
Context() context.Context
}
type backOffContext struct {
BackOff
ctx context.Context
}
// WithContext returns a BackOffContext with context ctx
//
// ctx must not be nil
func WithContext(b BackOff, ctx context.Context) BackOffContext {
if ctx == nil {
panic("nil context")
}
if b, ok := b.(*backOffContext); ok {
return &backOffContext{
BackOff: b.BackOff,
ctx: ctx,
}
}
return &backOffContext{
BackOff: b,
ctx: ctx,
}
}
func ensureContext(b BackOff) BackOffContext {
if cb, ok := b.(BackOffContext); ok {
return cb
}
return WithContext(b, context.Background())
}
func (b *backOffContext) Context() context.Context {
return b.ctx
}
func (b *backOffContext) NextBackOff() time.Duration {
select {
case <-b.Context().Done():
return Stop
default:
return b.BackOff.NextBackOff()
}
}

156
vendor/gopkg.in/cenkalti/backoff.v1/exponential.go generated vendored Normal file
View File

@@ -0,0 +1,156 @@
package backoff
import (
"math/rand"
"time"
)
/*
ExponentialBackOff is a backoff implementation that increases the backoff
period for each retry attempt using a randomization function that grows exponentially.
NextBackOff() is calculated using the following formula:
randomized interval =
RetryInterval * (random value in range [1 - RandomizationFactor, 1 + RandomizationFactor])
In other words NextBackOff() will range between the randomization factor
percentage below and above the retry interval.
For example, given the following parameters:
RetryInterval = 2
RandomizationFactor = 0.5
Multiplier = 2
the actual backoff period used in the next retry attempt will range between 1 and 3 seconds,
multiplied by the exponential, that is, between 2 and 6 seconds.
Note: MaxInterval caps the RetryInterval and not the randomized interval.
If the time elapsed since an ExponentialBackOff instance is created goes past the
MaxElapsedTime, then the method NextBackOff() starts returning backoff.Stop.
The elapsed time can be reset by calling Reset().
Example: Given the following default arguments, for 10 tries the sequence will be,
and assuming we go over the MaxElapsedTime on the 10th try:
Request # RetryInterval (seconds) Randomized Interval (seconds)
1 0.5 [0.25, 0.75]
2 0.75 [0.375, 1.125]
3 1.125 [0.562, 1.687]
4 1.687 [0.8435, 2.53]
5 2.53 [1.265, 3.795]
6 3.795 [1.897, 5.692]
7 5.692 [2.846, 8.538]
8 8.538 [4.269, 12.807]
9 12.807 [6.403, 19.210]
10 19.210 backoff.Stop
Note: Implementation is not thread-safe.
*/
type ExponentialBackOff struct {
InitialInterval time.Duration
RandomizationFactor float64
Multiplier float64
MaxInterval time.Duration
// After MaxElapsedTime the ExponentialBackOff stops.
// It never stops if MaxElapsedTime == 0.
MaxElapsedTime time.Duration
Clock Clock
currentInterval time.Duration
startTime time.Time
random *rand.Rand
}
// Clock is an interface that returns current time for BackOff.
type Clock interface {
Now() time.Time
}
// Default values for ExponentialBackOff.
const (
DefaultInitialInterval = 500 * time.Millisecond
DefaultRandomizationFactor = 0.5
DefaultMultiplier = 1.5
DefaultMaxInterval = 60 * time.Second
DefaultMaxElapsedTime = 15 * time.Minute
)
// NewExponentialBackOff creates an instance of ExponentialBackOff using default values.
func NewExponentialBackOff() *ExponentialBackOff {
b := &ExponentialBackOff{
InitialInterval: DefaultInitialInterval,
RandomizationFactor: DefaultRandomizationFactor,
Multiplier: DefaultMultiplier,
MaxInterval: DefaultMaxInterval,
MaxElapsedTime: DefaultMaxElapsedTime,
Clock: SystemClock,
random: rand.New(rand.NewSource(time.Now().UnixNano())),
}
b.Reset()
return b
}
type systemClock struct{}
func (t systemClock) Now() time.Time {
return time.Now()
}
// SystemClock implements Clock interface that uses time.Now().
var SystemClock = systemClock{}
// Reset the interval back to the initial retry interval and restarts the timer.
func (b *ExponentialBackOff) Reset() {
b.currentInterval = b.InitialInterval
b.startTime = b.Clock.Now()
}
// NextBackOff calculates the next backoff interval using the formula:
// Randomized interval = RetryInterval +/- (RandomizationFactor * RetryInterval)
func (b *ExponentialBackOff) NextBackOff() time.Duration {
// Make sure we have not gone over the maximum elapsed time.
if b.MaxElapsedTime != 0 && b.GetElapsedTime() > b.MaxElapsedTime {
return Stop
}
defer b.incrementCurrentInterval()
if b.random == nil {
b.random = rand.New(rand.NewSource(time.Now().UnixNano()))
}
return getRandomValueFromInterval(b.RandomizationFactor, b.random.Float64(), b.currentInterval)
}
// GetElapsedTime returns the elapsed time since an ExponentialBackOff instance
// is created and is reset when Reset() is called.
//
// The elapsed time is computed using time.Now().UnixNano().
func (b *ExponentialBackOff) GetElapsedTime() time.Duration {
return b.Clock.Now().Sub(b.startTime)
}
// Increments the current interval by multiplying it with the multiplier.
func (b *ExponentialBackOff) incrementCurrentInterval() {
// Check for overflow, if overflow is detected set the current interval to the max interval.
if float64(b.currentInterval) >= float64(b.MaxInterval)/b.Multiplier {
b.currentInterval = b.MaxInterval
} else {
b.currentInterval = time.Duration(float64(b.currentInterval) * b.Multiplier)
}
}
// Returns a random value from the following interval:
// [randomizationFactor * currentInterval, randomizationFactor * currentInterval].
func getRandomValueFromInterval(randomizationFactor, random float64, currentInterval time.Duration) time.Duration {
var delta = randomizationFactor * float64(currentInterval)
var minInterval = float64(currentInterval) - delta
var maxInterval = float64(currentInterval) + delta
// Get a random value from the range [minInterval, maxInterval].
// The formula used below has a +1 because if the minInterval is 1 and the maxInterval is 3 then
// we want a 33% chance for selecting either 1, 2 or 3.
return time.Duration(minInterval + (random * (maxInterval - minInterval + 1)))
}

78
vendor/gopkg.in/cenkalti/backoff.v1/retry.go generated vendored Normal file
View File

@@ -0,0 +1,78 @@
package backoff
import "time"
// An Operation is executing by Retry() or RetryNotify().
// The operation will be retried using a backoff policy if it returns an error.
type Operation func() error
// Notify is a notify-on-error function. It receives an operation error and
// backoff delay if the operation failed (with an error).
//
// NOTE that if the backoff policy stated to stop retrying,
// the notify function isn't called.
type Notify func(error, time.Duration)
// Retry the operation o until it does not return error or BackOff stops.
// o is guaranteed to be run at least once.
// It is the caller's responsibility to reset b after Retry returns.
//
// If o returns a *PermanentError, the operation is not retried, and the
// wrapped error is returned.
//
// Retry sleeps the goroutine for the duration returned by BackOff after a
// failed operation returns.
func Retry(o Operation, b BackOff) error { return RetryNotify(o, b, nil) }
// RetryNotify calls notify function with the error and wait duration
// for each failed attempt before sleep.
func RetryNotify(operation Operation, b BackOff, notify Notify) error {
var err error
var next time.Duration
cb := ensureContext(b)
b.Reset()
for {
if err = operation(); err == nil {
return nil
}
if permanent, ok := err.(*PermanentError); ok {
return permanent.Err
}
if next = b.NextBackOff(); next == Stop {
return err
}
if notify != nil {
notify(err, next)
}
t := time.NewTimer(next)
select {
case <-cb.Context().Done():
t.Stop()
return err
case <-t.C:
}
}
}
// PermanentError signals that the operation should not be retried.
type PermanentError struct {
Err error
}
func (e *PermanentError) Error() string {
return e.Err.Error()
}
// Permanent wraps the given err in a *PermanentError.
func Permanent(err error) *PermanentError {
return &PermanentError{
Err: err,
}
}

81
vendor/gopkg.in/cenkalti/backoff.v1/ticker.go generated vendored Normal file
View File

@@ -0,0 +1,81 @@
package backoff
import (
"runtime"
"sync"
"time"
)
// Ticker holds a channel that delivers `ticks' of a clock at times reported by a BackOff.
//
// Ticks will continue to arrive when the previous operation is still running,
// so operations that take a while to fail could run in quick succession.
type Ticker struct {
C <-chan time.Time
c chan time.Time
b BackOffContext
stop chan struct{}
stopOnce sync.Once
}
// NewTicker returns a new Ticker containing a channel that will send the time at times
// specified by the BackOff argument. Ticker is guaranteed to tick at least once.
// The channel is closed when Stop method is called or BackOff stops.
func NewTicker(b BackOff) *Ticker {
c := make(chan time.Time)
t := &Ticker{
C: c,
c: c,
b: ensureContext(b),
stop: make(chan struct{}),
}
go t.run()
runtime.SetFinalizer(t, (*Ticker).Stop)
return t
}
// Stop turns off a ticker. After Stop, no more ticks will be sent.
func (t *Ticker) Stop() {
t.stopOnce.Do(func() { close(t.stop) })
}
func (t *Ticker) run() {
c := t.c
defer close(c)
t.b.Reset()
// Ticker is guaranteed to tick at least once.
afterC := t.send(time.Now())
for {
if afterC == nil {
return
}
select {
case tick := <-afterC:
afterC = t.send(tick)
case <-t.stop:
t.c = nil // Prevent future ticks from being sent to the channel.
return
case <-t.b.Context().Done():
return
}
}
}
func (t *Ticker) send(tick time.Time) <-chan time.Time {
select {
case t.c <- tick:
case <-t.stop:
return nil
}
next := t.b.NextBackOff()
if next == Stop {
t.Stop()
return nil
}
return time.After(next)
}

35
vendor/gopkg.in/cenkalti/backoff.v1/tries.go generated vendored Normal file
View File

@@ -0,0 +1,35 @@
package backoff
import "time"
/*
WithMaxTries creates a wrapper around another BackOff, which will
return Stop if NextBackOff() has been called too many times since
the last time Reset() was called
Note: Implementation is not thread-safe.
*/
func WithMaxTries(b BackOff, max uint64) BackOff {
return &backOffTries{delegate: b, maxTries: max}
}
type backOffTries struct {
delegate BackOff
maxTries uint64
numTries uint64
}
func (b *backOffTries) NextBackOff() time.Duration {
if b.maxTries > 0 {
if b.maxTries <= b.numTries {
return Stop
}
b.numTries++
}
return b.delegate.NextBackOff()
}
func (b *backOffTries) Reset() {
b.numTries = 0
b.delegate.Reset()
}

6
vendor/modules.txt vendored
View File

@@ -1555,6 +1555,9 @@ github.com/prometheus/procfs/internal/util
github.com/prometheus/statsd_exporter/pkg/level
github.com/prometheus/statsd_exporter/pkg/mapper
github.com/prometheus/statsd_exporter/pkg/mapper/fsm
# github.com/r3labs/sse/v2 v2.10.0
## explicit; go 1.13
github.com/r3labs/sse/v2
# github.com/rcrowley/go-metrics v0.0.0-20200313005456-10cdbea86bc0
## explicit
github.com/rcrowley/go-metrics
@@ -2150,6 +2153,9 @@ google.golang.org/protobuf/types/known/fieldmaskpb
google.golang.org/protobuf/types/known/structpb
google.golang.org/protobuf/types/known/timestamppb
google.golang.org/protobuf/types/known/wrapperspb
# gopkg.in/cenkalti/backoff.v1 v1.1.0
## explicit
gopkg.in/cenkalti/backoff.v1
# gopkg.in/ini.v1 v1.67.0
## explicit
gopkg.in/ini.v1