diff --git a/go.mod b/go.mod index b112e201f..23aac0ef8 100644 --- a/go.mod +++ b/go.mod @@ -13,7 +13,7 @@ require ( github.com/coreos/go-oidc v2.2.1+incompatible github.com/coreos/go-oidc/v3 v3.6.0 github.com/cs3org/go-cs3apis v0.0.0-20230516150832-730ac860c71d - github.com/cs3org/reva/v2 v2.15.1-0.20230809113840-3ceaf17cf7fb + github.com/cs3org/reva/v2 v2.15.1-0.20230810092810-8d195c7859c7 github.com/disintegration/imaging v1.6.2 github.com/dutchcoders/go-clamd v0.0.0-20170520113014-b970184f4d9e github.com/egirna/icap-client v0.1.1 diff --git a/go.sum b/go.sum index 8b2b629c2..be791b132 100644 --- a/go.sum +++ b/go.sum @@ -864,8 +864,8 @@ github.com/crewjam/httperr v0.2.0 h1:b2BfXR8U3AlIHwNeFFvZ+BV1LFvKLlzMjzaTnZMybNo github.com/crewjam/httperr v0.2.0/go.mod h1:Jlz+Sg/XqBQhyMjdDiC+GNNRzZTD7x39Gu3pglZ5oH4= github.com/crewjam/saml v0.4.13 h1:TYHggH/hwP7eArqiXSJUvtOPNzQDyQ7vwmwEqlFWhMc= github.com/crewjam/saml v0.4.13/go.mod h1:igEejV+fihTIlHXYP8zOec3V5A8y3lws5bQBFsTm4gA= -github.com/cs3org/reva/v2 v2.15.1-0.20230809113840-3ceaf17cf7fb h1:ObjTDMACxkuV7pxK8zGwhqqka2ze4KI8zcNF4H6k4Yk= -github.com/cs3org/reva/v2 v2.15.1-0.20230809113840-3ceaf17cf7fb/go.mod h1:F5wAUTPMvq+ze77PU/xl7qhc21YsEIfcl2RuI4H7yJo= +github.com/cs3org/reva/v2 v2.15.1-0.20230810092810-8d195c7859c7 h1:Rc0YEihSvHXIO3UnRxErsi2uHqhk8/7AMdGGg4NERZ4= +github.com/cs3org/reva/v2 v2.15.1-0.20230810092810-8d195c7859c7/go.mod h1:F5wAUTPMvq+ze77PU/xl7qhc21YsEIfcl2RuI4H7yJo= github.com/cyberdelia/templates v0.0.0-20141128023046-ca7fffd4298c/go.mod h1:GyV+0YP4qX0UQ7r2MoYZ+AvYDp12OF5yg4q8rGnyNh4= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= diff --git a/services/antivirus/pkg/service/service.go b/services/antivirus/pkg/service/service.go index 5b3fac146..eb9741e12 100644 --- a/services/antivirus/pkg/service/service.go +++ b/services/antivirus/pkg/service/service.go @@ -104,7 +104,7 @@ func (av Antivirus) Run() error { if av.c.DebugScanOutcome != "" { av.l.Warn().Str("antivir, clamav", ">>>>>>> ANTIVIRUS_DEBUG_SCAN_OUTCOME IS SET NO ACTUAL VIRUS SCAN IS PERFORMED!") - if err := events.Publish(stream, events.PostprocessingStepFinished{ + if err := events.Publish(context.Background(), stream, events.PostprocessingStepFinished{ FinishedStep: events.PPStepAntivirus, Outcome: events.PostprocessingOutcome(av.c.DebugScanOutcome), UploadID: ev.UploadID, @@ -142,7 +142,7 @@ func (av Antivirus) Run() error { } av.l.Info().Str("uploadid", ev.UploadID).Interface("resourceID", ev.ResourceID).Str("virus", res.Description).Str("outcome", string(outcome)).Str("filename", ev.Filename).Str("user", ev.ExecutingUser.GetId().GetOpaqueId()).Bool("infected", res.Infected).Msg("File scanned") - if err := events.Publish(stream, events.PostprocessingStepFinished{ + if err := events.Publish(context.Background(), stream, events.PostprocessingStepFinished{ FinishedStep: events.PPStepAntivirus, Outcome: outcome, UploadID: ev.UploadID, @@ -195,7 +195,6 @@ func (av Antivirus) process(ev events.StartPostprocessingStep) (scanners.ScanRes } return res, err - } // download will download the file diff --git a/services/graph/pkg/service/v0/approleassignments.go b/services/graph/pkg/service/v0/approleassignments.go index 1927d05ae..fd64f6011 100644 --- a/services/graph/pkg/service/v0/approleassignments.go +++ b/services/graph/pkg/service/v0/approleassignments.go @@ -104,7 +104,7 @@ func (g Graph) CreateAppRoleAssignment(w http.ResponseWriter, r *http.Request) { if currentUser, ok := revactx.ContextGetUser(r.Context()); ok { e.Executant = currentUser.GetId() } - g.publishEvent(e) + g.publishEvent(r.Context(), e) render.Status(r, http.StatusCreated) render.JSON(w, r, g.assignmentToAppRoleAssignment(artur.GetAssignment())) } diff --git a/services/graph/pkg/service/v0/educationclasses.go b/services/graph/pkg/service/v0/educationclasses.go index 61fb8c2e1..942e9fa98 100644 --- a/services/graph/pkg/service/v0/educationclasses.go +++ b/services/graph/pkg/service/v0/educationclasses.go @@ -185,7 +185,7 @@ func (g Graph) PatchEducationClass(w http.ResponseWriter, r *http.Request) { if currentUser, ok := revactx.ContextGetUser(r.Context()); ok { e.Executant = currentUser.GetId() } - g.publishEvent(e) + g.publishEvent(r.Context(), e) } diff --git a/services/graph/pkg/service/v0/educationuser.go b/services/graph/pkg/service/v0/educationuser.go index e7e003c12..8ae884ee8 100644 --- a/services/graph/pkg/service/v0/educationuser.go +++ b/services/graph/pkg/service/v0/educationuser.go @@ -146,7 +146,7 @@ func (g Graph) PostEducationUser(w http.ResponseWriter, r *http.Request) { if currentUser, ok := revactx.ContextGetUser(r.Context()); ok { e.Executant = currentUser.GetId() } - g.publishEvent(e) + g.publishEvent(r.Context(), e) render.Status(r, http.StatusOK) render.JSON(w, r, u) @@ -172,7 +172,6 @@ func (g Graph) GetEducationUser(w http.ResponseWriter, r *http.Request) { logger.Debug().Str("id", userID).Msg("calling get education user from backend") user, err := g.identityEducationBackend.GetEducationUser(r.Context(), userID) - if err != nil { logger.Debug().Err(err).Msg("could not get education user: error fetching education user from backend") errorcode.RenderError(w, r, err) @@ -288,7 +287,7 @@ func (g Graph) DeleteEducationUser(w http.ResponseWriter, r *http.Request) { return } - g.publishEvent(e) + g.publishEvent(r.Context(), e) render.Status(r, http.StatusNoContent) render.NoContent(w, r) @@ -368,11 +367,10 @@ func (g Graph) PatchEducationUser(w http.ResponseWriter, r *http.Request) { if currentUser, ok := revactx.ContextGetUser(r.Context()); ok { e.Executant = currentUser.GetId() } - g.publishEvent(e) + g.publishEvent(r.Context(), e) render.Status(r, http.StatusOK) // TODO StatusNoContent when prefer=minimal is used render.JSON(w, r, u) - } func sortEducationUsers(req *godata.GoDataRequest, users []*libregraph.EducationUser) ([]*libregraph.EducationUser, error) { diff --git a/services/graph/pkg/service/v0/graph.go b/services/graph/pkg/service/v0/graph.go index 9e5f33d7e..2d3c0995c 100644 --- a/services/graph/pkg/service/v0/graph.go +++ b/services/graph/pkg/service/v0/graph.go @@ -89,9 +89,9 @@ func (g Graph) ServeHTTP(w http.ResponseWriter, r *http.Request) { g.mux.ServeHTTP(w, r) } -func (g Graph) publishEvent(ev interface{}) { +func (g Graph) publishEvent(ctx context.Context, ev interface{}) { if g.eventsPublisher != nil { - if err := events.Publish(g.eventsPublisher, ev); err != nil { + if err := events.Publish(ctx, g.eventsPublisher, ev); err != nil { g.logger.Error(). Err(err). Msg("could not publish user created event") diff --git a/services/graph/pkg/service/v0/groups.go b/services/graph/pkg/service/v0/groups.go index f25d903f1..423e1c523 100644 --- a/services/graph/pkg/service/v0/groups.go +++ b/services/graph/pkg/service/v0/groups.go @@ -95,7 +95,7 @@ func (g Graph) PostGroup(w http.ResponseWriter, r *http.Request) { if currentUser, ok := revactx.ContextGetUser(r.Context()); ok { e.Executant = currentUser.GetId() } - g.publishEvent(e) + g.publishEvent(r.Context(), e) } render.Status(r, http.StatusOK) // FIXME 201 should return 201 created render.JSON(w, r, grp) @@ -256,7 +256,7 @@ func (g Graph) DeleteGroup(w http.ResponseWriter, r *http.Request) { if currentUser, ok := revactx.ContextGetUser(r.Context()); ok { e.Executant = currentUser.GetId() } - g.publishEvent(e) + g.publishEvent(r.Context(), e) render.Status(r, http.StatusNoContent) render.NoContent(w, r) } @@ -366,7 +366,7 @@ func (g Graph) PostGroupMember(w http.ResponseWriter, r *http.Request) { if currentUser, ok := revactx.ContextGetUser(r.Context()); ok { e.Executant = currentUser.GetId() } - g.publishEvent(e) + g.publishEvent(r.Context(), e) render.Status(r, http.StatusNoContent) render.NoContent(w, r) } @@ -418,7 +418,7 @@ func (g Graph) DeleteGroupMember(w http.ResponseWriter, r *http.Request) { if currentUser, ok := revactx.ContextGetUser(r.Context()); ok { e.Executant = currentUser.GetId() } - g.publishEvent(e) + g.publishEvent(r.Context(), e) render.Status(r, http.StatusNoContent) render.NoContent(w, r) } diff --git a/services/graph/pkg/service/v0/password.go b/services/graph/pkg/service/v0/password.go index 231204088..f272feda5 100644 --- a/services/graph/pkg/service/v0/password.go +++ b/services/graph/pkg/service/v0/password.go @@ -96,6 +96,7 @@ func (g Graph) ChangeOwnPassword(w http.ResponseWriter, r *http.Request) { currentUser := revactx.ContextMustGetUser(r.Context()) g.publishEvent( + ctx, events.UserFeatureChanged{ Executant: currentUser.Id, UserID: u.Id.OpaqueId, diff --git a/services/graph/pkg/service/v0/personaldata.go b/services/graph/pkg/service/v0/personaldata.go index 77bba3ab4..435dc417c 100644 --- a/services/graph/pkg/service/v0/personaldata.go +++ b/services/graph/pkg/service/v0/personaldata.go @@ -141,7 +141,7 @@ func (g Graph) GatherPersonalData(usr *user.User, ref *provider.Reference, token errmsg = err.Error() } - if err := events.Publish(g.eventsPublisher, events.PersonalDataExtracted{ + if err := events.Publish(ctx, g.eventsPublisher, events.PersonalDataExtracted{ Executant: usr.GetId(), Timestamp: utils.TSNow(), ErrorMsg: errmsg, diff --git a/services/graph/pkg/service/v0/tags.go b/services/graph/pkg/service/v0/tags.go index 786eb0e49..24850296b 100644 --- a/services/graph/pkg/service/v0/tags.go +++ b/services/graph/pkg/service/v0/tags.go @@ -135,7 +135,7 @@ func (g Graph) AssignTags(w http.ResponseWriter, r *http.Request) { SpaceOwner: sres.Info.Owner, Executant: revaCtx.ContextMustGetUser(r.Context()).Id, } - if err := events.Publish(g.eventsPublisher, ev); err != nil { + if err := events.Publish(r.Context(), g.eventsPublisher, ev); err != nil { g.logger.Error().Err(err).Msg("Failed to publish TagsAdded event") } } @@ -230,7 +230,7 @@ func (g Graph) UnassignTags(w http.ResponseWriter, r *http.Request) { SpaceOwner: sres.Info.Owner, Executant: revaCtx.ContextMustGetUser(r.Context()).Id, } - if err := events.Publish(g.eventsPublisher, ev); err != nil { + if err := events.Publish(ctx, g.eventsPublisher, ev); err != nil { g.logger.Error().Err(err).Msg("Failed to publish TagsAdded event") } } diff --git a/services/graph/pkg/service/v0/users.go b/services/graph/pkg/service/v0/users.go index ed3b962a3..097e4fa65 100644 --- a/services/graph/pkg/service/v0/users.go +++ b/services/graph/pkg/service/v0/users.go @@ -332,7 +332,7 @@ func (g Graph) PostUser(w http.ResponseWriter, r *http.Request) { if currentUser, ok := revactx.ContextGetUser(r.Context()); ok { e.Executant = currentUser.GetId() } - g.publishEvent(e) + g.publishEvent(r.Context(), e) render.Status(r, http.StatusOK) // FIXME 201 should return 201 created render.JSON(w, r, u) @@ -375,7 +375,6 @@ func (g Graph) GetUser(w http.ResponseWriter, r *http.Request) { logger.Debug().Str("id", userID).Msg("calling get user from backend") user, err := g.identityBackend.GetUser(r.Context(), userID, odataReq) - if err != nil { logger.Debug().Err(err).Msg("could not get user: error fetching user from backend") errorcode.RenderError(w, r, err) @@ -594,7 +593,7 @@ func (g Graph) DeleteUser(w http.ResponseWriter, r *http.Request) { return } - g.publishEvent(e) + g.publishEvent(r.Context(), e) render.Status(r, http.StatusNoContent) render.NoContent(w, r) @@ -725,11 +724,10 @@ func (g Graph) PatchUser(w http.ResponseWriter, r *http.Request) { if currentUser, ok := revactx.ContextGetUser(r.Context()); ok { e.Executant = currentUser.GetId() } - g.publishEvent(e) + g.publishEvent(r.Context(), e) render.Status(r, http.StatusOK) // TODO StatusNoContent when prefer=minimal is used render.JSON(w, r, u) - } const ( diff --git a/services/policies/pkg/service/event/service.go b/services/policies/pkg/service/event/service.go index 40d455e8b..a1dba4590 100644 --- a/services/policies/pkg/service/event/service.go +++ b/services/policies/pkg/service/event/service.go @@ -72,7 +72,7 @@ func (s Service) Run() error { } } - if err := events.Publish(s.stream, events.PostprocessingStepFinished{ + if err := events.Publish(context.Background(), s.stream, events.PostprocessingStepFinished{ Outcome: outcome, UploadID: ev.UploadID, ExecutingUser: ev.ExecutingUser, diff --git a/services/postprocessing/pkg/command/postprocessing.go b/services/postprocessing/pkg/command/postprocessing.go index 999727f8c..48848f13f 100644 --- a/services/postprocessing/pkg/command/postprocessing.go +++ b/services/postprocessing/pkg/command/postprocessing.go @@ -1,6 +1,7 @@ package command import ( + "context" "time" "github.com/cs3org/reva/v2/pkg/events" @@ -38,7 +39,7 @@ func RestartPostprocessing(cfg *config.Config) *cli.Command { Timestamp: utils.TSNow(), } - if err := events.Publish(stream, ev); err != nil { + if err := events.Publish(context.Background(), stream, ev); err != nil { return err } diff --git a/services/postprocessing/pkg/service/service.go b/services/postprocessing/pkg/service/service.go index ab2101dcb..ef9870f6f 100644 --- a/services/postprocessing/pkg/service/service.go +++ b/services/postprocessing/pkg/service/service.go @@ -1,6 +1,7 @@ package service import ( + "context" "encoding/json" "fmt" @@ -88,7 +89,7 @@ func (pps *PostprocessingService) Run() error { pp, err = getPP(pps.store, ev.UploadID) if err != nil { if err == store.ErrNotFound { - if err := events.Publish(pps.pub, events.RestartPostprocessing{ + if err := events.Publish(context.Background(), pps.pub, events.RestartPostprocessing{ UploadID: ev.UploadID, Timestamp: ev.Timestamp, }); err != nil { @@ -109,7 +110,7 @@ func (pps *PostprocessingService) Run() error { } } if next != nil { - if err := events.Publish(pps.pub, next); err != nil { + if err := events.Publish(context.Background(), pps.pub, next); err != nil { pps.log.Error().Err(err).Msg("unable to publish event") return err // we can't publish -> we are screwed } diff --git a/services/search/pkg/search/events_test.go b/services/search/pkg/search/events_test.go index 465355bb1..bd010cb1f 100644 --- a/services/search/pkg/search/events_test.go +++ b/services/search/pkg/search/events_test.go @@ -1,6 +1,8 @@ package search_test import ( + "context" + userv1beta1 "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1" "github.com/cs3org/reva/v2/pkg/events" . "github.com/onsi/ginkgo/v2" @@ -34,7 +36,7 @@ var _ = DescribeTable("events", }) } - err := events.Publish(bus, e) + err := events.Publish(context.Background(), bus, e) Expect(err).To(BeNil()) Eventually(func() int { diff --git a/services/storage-users/pkg/command/trash_bin.go b/services/storage-users/pkg/command/trash_bin.go index fdff7e1d5..0c552a699 100644 --- a/services/storage-users/pkg/command/trash_bin.go +++ b/services/storage-users/pkg/command/trash_bin.go @@ -37,7 +37,7 @@ func PurgeExpiredResources(cfg *config.Config) *cli.Command { return err } - if err := events.Publish(stream, event.PurgeTrashBin{ExecutionTime: time.Now()}); err != nil { + if err := events.Publish(c.Context, stream, event.PurgeTrashBin{ExecutionTime: time.Now()}); err != nil { return err } diff --git a/vendor/github.com/cs3org/reva/v2/internal/grpc/interceptors/eventsmiddleware/events.go b/vendor/github.com/cs3org/reva/v2/internal/grpc/interceptors/eventsmiddleware/events.go index c3644ac34..10d898b8e 100644 --- a/vendor/github.com/cs3org/reva/v2/internal/grpc/interceptors/eventsmiddleware/events.go +++ b/vendor/github.com/cs3org/reva/v2/internal/grpc/interceptors/eventsmiddleware/events.go @@ -194,7 +194,7 @@ func NewUnary(m map[string]interface{}) (grpc.UnaryServerInterceptor, int, error } if ev != nil { - if err := events.Publish(publisher, ev); err != nil { + if err := events.Publish(ctx, publisher, ev); err != nil { log.Error(err) } } diff --git a/vendor/github.com/cs3org/reva/v2/pkg/events/events.go b/vendor/github.com/cs3org/reva/v2/pkg/events/events.go index dd997c77d..662a56f2c 100644 --- a/vendor/github.com/cs3org/reva/v2/pkg/events/events.go +++ b/vendor/github.com/cs3org/reva/v2/pkg/events/events.go @@ -19,11 +19,13 @@ package events import ( + "context" "log" "reflect" "github.com/google/uuid" "go-micro.dev/v4/events" + "go.opentelemetry.io/otel/propagation" ) var ( @@ -38,6 +40,9 @@ var ( // MetadatakeyEventID is the key used for the eventID in the metadata map of the event MetadatakeyEventID = "eventid" + + // MetadatakeyTraceParent is the key used for the traceparent in the metadata map of the event + MetadatakeyTraceParent = "traceparent" ) type ( @@ -64,9 +69,10 @@ type ( // Event is the envelope for events Event struct { - Type string - ID string - Event interface{} + Type string + ID string + TraceParent string + Event interface{} } ) @@ -102,9 +108,10 @@ func Consume(s Consumer, group string, evs ...Unmarshaller) (<-chan Event, error } outchan <- Event{ - Type: et, - ID: e.Metadata[MetadatakeyEventID], - Event: event, + Type: et, + ID: e.Metadata[MetadatakeyEventID], + TraceParent: e.Metadata[MetadatakeyTraceParent], + Event: event, } } }() @@ -123,9 +130,10 @@ func ConsumeAll(s Consumer, group string) (<-chan Event, error) { for { e := <-c outchan <- Event{ - Type: e.Metadata[MetadatakeyEventType], - ID: e.Metadata[MetadatakeyEventID], - Event: e.Payload, + Type: e.Metadata[MetadatakeyEventType], + ID: e.Metadata[MetadatakeyEventID], + TraceParent: e.Metadata[MetadatakeyTraceParent], + Event: e.Payload, } } }() @@ -134,10 +142,30 @@ func ConsumeAll(s Consumer, group string) (<-chan Event, error) { // Publish publishes the ev to the MainQueue from where it is distributed to all subscribers // NOTE: needs to use reflect on runtime -func Publish(s Publisher, ev interface{}) error { +func Publish(ctx context.Context, s Publisher, ev interface{}) error { evName := reflect.TypeOf(ev).String() + traceParent := getTraceParentFromCtx(ctx) return s.Publish(MainQueueName, ev, events.WithMetadata(map[string]string{ - MetadatakeyEventType: evName, - MetadatakeyEventID: uuid.New().String(), + MetadatakeyEventType: evName, + MetadatakeyEventID: uuid.New().String(), + MetadatakeyTraceParent: traceParent, })) } + +// GetTraceContext extracts the trace context from the event and injects it into the given +// context. +func (e *Event) GetTraceContext(ctx context.Context) context.Context { + return propagation.TraceContext{}.Extract(ctx, propagation.MapCarrier{ + "traceparent": e.TraceParent, + }) +} + +// getTraceParentFromCtx will return a traceparent from the context if it exists. +// it will be a string as specificied here: https://www.w3.org/TR/trace-context/ +// If no trace info in the context, the return will be an empty string +func getTraceParentFromCtx(ctx context.Context) string { + mc := propagation.MapCarrier{} + tc := propagation.TraceContext{} + tc.Inject(ctx, &mc) + return mc["traceparent"] +} diff --git a/vendor/github.com/cs3org/reva/v2/pkg/rhttp/datatx/datatx.go b/vendor/github.com/cs3org/reva/v2/pkg/rhttp/datatx/datatx.go index 051c8ef43..23328aadd 100644 --- a/vendor/github.com/cs3org/reva/v2/pkg/rhttp/datatx/datatx.go +++ b/vendor/github.com/cs3org/reva/v2/pkg/rhttp/datatx/datatx.go @@ -51,7 +51,7 @@ func EmitFileUploadedEvent(spaceOwnerOrManager, executant *userv1beta1.UserId, r Timestamp: utils.TSNow(), } - return events.Publish(publisher, uploadedEv) + return events.Publish(context.Background(), publisher, uploadedEv) } // InvalidateCache is a helper function which invalidates the stat cache diff --git a/vendor/github.com/cs3org/reva/v2/pkg/share/manager/jsoncs3/jsoncs3.go b/vendor/github.com/cs3org/reva/v2/pkg/share/manager/jsoncs3/jsoncs3.go index 115fe727f..b868f7c64 100644 --- a/vendor/github.com/cs3org/reva/v2/pkg/share/manager/jsoncs3/jsoncs3.go +++ b/vendor/github.com/cs3org/reva/v2/pkg/share/manager/jsoncs3/jsoncs3.go @@ -304,7 +304,7 @@ func (m *Manager) Share(ctx context.Context, md *provider.ResourceInfo, g *colla // check if share already exists. key := &collaboration.ShareKey{ - //Owner: md.Owner, owner no longer matters as it belongs to the space + // Owner: md.Owner, owner no longer matters as it belongs to the space ResourceId: md.Id, Grantee: g.Grantee, } @@ -337,7 +337,6 @@ func (m *Manager) Share(ctx context.Context, md *provider.ResourceInfo, g *colla eg.Go(func() error { err := m.Cache.Add(ctx, md.Id.StorageId, md.Id.SpaceId, shareID, s) - if err != nil { span.RecordError(err) span.SetStatus(codes.Error, err.Error()) @@ -348,7 +347,6 @@ func (m *Manager) Share(ctx context.Context, md *provider.ResourceInfo, g *colla eg.Go(func() error { err := m.CreatedCache.Add(ctx, s.GetCreator().GetOpaqueId(), shareID) - if err != nil { span.RecordError(err) span.SetStatus(codes.Error, err.Error()) @@ -369,7 +367,6 @@ func (m *Manager) Share(ctx context.Context, md *provider.ResourceInfo, g *colla State: collaboration.ShareState_SHARE_STATE_PENDING, } err := m.UserReceivedStates.Add(ctx, userid, spaceID, rs) - if err != nil { span.RecordError(err) span.SetStatus(codes.Error, err.Error()) @@ -381,7 +378,6 @@ func (m *Manager) Share(ctx context.Context, md *provider.ResourceInfo, g *colla eg.Go(func() error { groupid := g.Grantee.GetGroupId().GetOpaqueId() err := m.GroupReceivedCache.Add(ctx, groupid, shareID) - if err != nil { span.RecordError(err) span.SetStatus(codes.Error, err.Error()) @@ -458,7 +454,7 @@ func (m *Manager) GetShare(ctx context.Context, ref *collaboration.ShareReferenc log.Error().Err(err). Msg("failed to unshare expired share") } - if err := events.Publish(m.eventStream, events.ShareExpired{ + if err := events.Publish(ctx, m.eventStream, events.ShareExpired{ ShareID: s.GetId(), ShareOwner: s.GetOwner(), ItemID: s.GetResourceId(), @@ -643,7 +639,7 @@ func (m *Manager) listSharesByIDs(ctx context.Context, user *userv1beta1.User, f log.Error().Err(err). Msg("failed to unshare expired share") } - if err := events.Publish(m.eventStream, events.ShareExpired{ + if err := events.Publish(ctx, m.eventStream, events.ShareExpired{ ShareOwner: s.GetOwner(), ItemID: s.GetResourceId(), ExpiredAt: time.Unix(int64(s.GetExpiration().GetSeconds()), int64(s.GetExpiration().GetNanos())), @@ -711,7 +707,7 @@ func (m *Manager) listCreatedShares(ctx context.Context, user *userv1beta1.User, log.Error().Err(err). Msg("failed to unshare expired share") } - if err := events.Publish(m.eventStream, events.ShareExpired{ + if err := events.Publish(ctx, m.eventStream, events.ShareExpired{ ShareOwner: s.GetOwner(), ItemID: s.GetResourceId(), ExpiredAt: time.Unix(int64(s.GetExpiration().GetSeconds()), int64(s.GetExpiration().GetNanos())), @@ -834,7 +830,7 @@ func (m *Manager) ListReceivedShares(ctx context.Context, filters []*collaborati log.Error().Err(err). Msg("failed to unshare expired share") } - if err := events.Publish(m.eventStream, events.ShareExpired{ + if err := events.Publish(ctx, m.eventStream, events.ShareExpired{ ShareOwner: s.GetOwner(), ItemID: s.GetResourceId(), ExpiredAt: time.Unix(int64(s.GetExpiration().GetSeconds()), int64(s.GetExpiration().GetNanos())), @@ -934,7 +930,7 @@ func (m *Manager) getReceived(ctx context.Context, ref *collaboration.ShareRefer log.Error().Err(err). Msg("failed to unshare expired share") } - if err := events.Publish(m.eventStream, events.ShareExpired{ + if err := events.Publish(ctx, m.eventStream, events.ShareExpired{ ShareOwner: s.GetOwner(), ItemID: s.GetResourceId(), ExpiredAt: time.Unix(int64(s.GetExpiration().GetSeconds()), int64(s.GetExpiration().GetNanos())), @@ -988,6 +984,7 @@ func (m *Manager) UpdateReceivedShare(ctx context.Context, receivedShare *collab func shareIsRoutable(share *collaboration.Share) bool { return strings.Contains(share.Id.OpaqueId, shareid.IDDelimiter) } + func updateShareID(share *collaboration.Share) { share.Id.OpaqueId = shareid.Encode(share.ResourceId.StorageId, share.ResourceId.SpaceId, share.Id.OpaqueId) } diff --git a/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/decomposedfs.go b/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/decomposedfs.go index e89edca22..5f494e866 100644 --- a/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/decomposedfs.go +++ b/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/decomposedfs.go @@ -305,6 +305,7 @@ func (fs *Decomposedfs) Postprocessing(ch <-chan events.Event) { fs.cache.RemoveStatContext(ctx, ev.ExecutingUser.GetId(), &provider.ResourceId{SpaceId: n.SpaceID, OpaqueId: n.ID}) if err := events.Publish( + ctx, fs.stream, events.UploadReady{ UploadID: ev.UploadID, @@ -342,7 +343,7 @@ func (fs *Decomposedfs) Postprocessing(ch <-chan events.Event) { continue } // restart postprocessing - if err := events.Publish(fs.stream, events.BytesReceived{ + if err := events.Publish(ctx, fs.stream, events.BytesReceived{ UploadID: up.Info.ID, URL: s, SpaceOwner: n.SpaceOwnerOrManager(up.Ctx), @@ -474,7 +475,6 @@ func (fs *Decomposedfs) Postprocessing(ch <-chan events.Event) { log.Error().Interface("event", ev).Msg("Unknown event") } } - } // Shutdown shuts down the storage diff --git a/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/upload/upload.go b/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/upload/upload.go index 50a629c43..91342a3d7 100644 --- a/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/upload/upload.go +++ b/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/upload/upload.go @@ -274,7 +274,7 @@ func (upload *Upload) FinishUpload(_ context.Context) error { return err } - if err := events.Publish(upload.pub, events.BytesReceived{ + if err := events.Publish(ctx, upload.pub, events.BytesReceived{ UploadID: upload.Info.ID, URL: s, SpaceOwner: n.SpaceOwnerOrManager(upload.Ctx), diff --git a/vendor/modules.txt b/vendor/modules.txt index bfda43577..4ba8c5cbc 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -352,7 +352,7 @@ github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1 github.com/cs3org/go-cs3apis/cs3/storage/registry/v1beta1 github.com/cs3org/go-cs3apis/cs3/tx/v1beta1 github.com/cs3org/go-cs3apis/cs3/types/v1beta1 -# github.com/cs3org/reva/v2 v2.15.1-0.20230809113840-3ceaf17cf7fb +# github.com/cs3org/reva/v2 v2.15.1-0.20230810092810-8d195c7859c7 ## explicit; go 1.20 github.com/cs3org/reva/v2/cmd/revad/internal/grace github.com/cs3org/reva/v2/cmd/revad/runtime