diff --git a/.drone.env b/.drone.env index d4f4b0b86..418204b83 100644 --- a/.drone.env +++ b/.drone.env @@ -1,5 +1,5 @@ # The test runner source for API tests -CORE_COMMITID=a4a490bbebdc625c328c816e5f3d09fcd94550a9 +CORE_COMMITID=6a3c89e917330990f99feabef3b42d1c51c0df1d CORE_BRANCH=master # The test runner source for UI tests diff --git a/changelog/unreleased/rescan-spaces.md b/changelog/unreleased/rescan-spaces.md new file mode 100644 index 000000000..8afcabd4c --- /dev/null +++ b/changelog/unreleased/rescan-spaces.md @@ -0,0 +1,7 @@ +Bugfix: Trigger a rescan of spaces in the search index when items have changed + +The search service now scans spaces when items have been changed. This fixes the problem +that mtime and treesize propagation was not reflected in the search index properly. + +https://github.com/owncloud/ocis/pull/4777 +https://github.com/owncloud/ocis/issues/4410 diff --git a/go.mod b/go.mod index 6cc14b686..5fe8da58b 100644 --- a/go.mod +++ b/go.mod @@ -9,8 +9,8 @@ require ( github.com/armon/go-radix v1.0.0 github.com/blevesearch/bleve/v2 v2.3.4 github.com/coreos/go-oidc/v3 v3.4.0 - github.com/cs3org/go-cs3apis v0.0.0-20221005085457-19ea8088a512 - github.com/cs3org/reva/v2 v2.10.1-0.20221012104058-ae7c58b9bffa + github.com/cs3org/go-cs3apis v0.0.0-20221012090518-ef2996678965 + github.com/cs3org/reva/v2 v2.10.1-0.20221013183308-560ba925a814 github.com/disintegration/imaging v1.6.2 github.com/ggwhite/go-masker v1.0.9 github.com/go-chi/chi/v5 v5.0.7 diff --git a/go.sum b/go.sum index 25107f9c4..10f15b68f 100644 --- a/go.sum +++ b/go.sum @@ -349,10 +349,10 @@ github.com/crewjam/httperr v0.2.0 h1:b2BfXR8U3AlIHwNeFFvZ+BV1LFvKLlzMjzaTnZMybNo github.com/crewjam/httperr v0.2.0/go.mod h1:Jlz+Sg/XqBQhyMjdDiC+GNNRzZTD7x39Gu3pglZ5oH4= github.com/crewjam/saml v0.4.6 h1:XCUFPkQSJLvzyl4cW9OvpWUbRf0gE7VUpU8ZnilbeM4= github.com/crewjam/saml v0.4.6/go.mod h1:ZBOXnNPFzB3CgOkRm7Nd6IVdkG+l/wF+0ZXLqD96t1A= -github.com/cs3org/go-cs3apis v0.0.0-20221005085457-19ea8088a512 h1:xTvaIsLu1ezoWOJKnV0ehgiowkOiEhMaylaI1lD/Axw= -github.com/cs3org/go-cs3apis v0.0.0-20221005085457-19ea8088a512/go.mod h1:UXha4TguuB52H14EMoSsCqDj7k8a/t7g4gVP+bgY5LY= -github.com/cs3org/reva/v2 v2.10.1-0.20221012104058-ae7c58b9bffa h1:DSeaakiPW5zYrGGEDO0BkSZWhqq6LS+rd1DQ1DPztJo= -github.com/cs3org/reva/v2 v2.10.1-0.20221012104058-ae7c58b9bffa/go.mod h1:QUHLTf/ACFG2ueNP3u1dslv1bIWTTQAqvWFCorVke6o= +github.com/cs3org/go-cs3apis v0.0.0-20221012090518-ef2996678965 h1:y4n2j68LLnvac+zw/al8MfPgO5aQiIwLmHM/JzYN8AM= +github.com/cs3org/go-cs3apis v0.0.0-20221012090518-ef2996678965/go.mod h1:UXha4TguuB52H14EMoSsCqDj7k8a/t7g4gVP+bgY5LY= +github.com/cs3org/reva/v2 v2.10.1-0.20221013183308-560ba925a814 h1:/IpXuGNX01f2eKM5mSMJFoE+DGq4NULo0WwpO3LzmTg= +github.com/cs3org/reva/v2 v2.10.1-0.20221013183308-560ba925a814/go.mod h1:lq+LRpBDYU1vHUmJDeK7sGquREciO8GDj5/SYIibMPY= github.com/cubewise-code/go-mime v0.0.0-20200519001935-8c5762b177d8 h1:Z9lwXumT5ACSmJ7WGnFl+OMLLjpz5uR2fyz7dC255FI= github.com/cubewise-code/go-mime v0.0.0-20200519001935-8c5762b177d8/go.mod h1:4abs/jPXcmJzYoYGF91JF9Uq9s/KL5n1jvFDix8KcqY= github.com/cyberdelia/templates v0.0.0-20141128023046-ca7fffd4298c/go.mod h1:GyV+0YP4qX0UQ7r2MoYZ+AvYDp12OF5yg4q8rGnyNh4= diff --git a/services/search/pkg/command/index.go b/services/search/pkg/command/index.go index ecb44fa7d..791cf116a 100644 --- a/services/search/pkg/command/index.go +++ b/services/search/pkg/command/index.go @@ -3,9 +3,12 @@ package command import ( "context" "fmt" + "time" "github.com/urfave/cli/v2" + "go-micro.dev/v4/client" + "github.com/owncloud/ocis/v2/ocis-pkg/config/configlog" "github.com/owncloud/ocis/v2/ocis-pkg/service/grpc" searchsvc "github.com/owncloud/ocis/v2/protogen/gen/ocis/services/search/v0" "github.com/owncloud/ocis/v2/services/search/pkg/config" @@ -30,18 +33,20 @@ func Index(cfg *config.Config) *cli.Command { Name: "user", Aliases: []string{"u"}, Required: true, - Usage: "the username of the user tha shall be used to access the files", + Usage: "the username of the user that shall be used to access the files", }, }, Before: func(c *cli.Context) error { - return parser.ParseConfig(cfg) + return configlog.ReturnFatal(parser.ParseConfig(cfg)) }, - Action: func(c *cli.Context) error { - client := searchsvc.NewSearchProviderService("com.owncloud.api.search", grpc.DefaultClient()) - _, err := client.IndexSpace(context.Background(), &searchsvc.IndexSpaceRequest{ - SpaceId: c.String("space"), - UserId: c.String("user"), - }) + Action: func(ctx *cli.Context) error { + grpcClient := grpc.DefaultClient() + grpcClient.Options() + c := searchsvc.NewSearchProviderService("com.owncloud.api.search", grpcClient) + _, err := c.IndexSpace(context.Background(), &searchsvc.IndexSpaceRequest{ + SpaceId: ctx.String("space"), + UserId: ctx.String("user"), + }, func(opts *client.CallOptions) { opts.RequestTimeout = 10 * time.Minute }) if err != nil { fmt.Println("failed to index space: " + err.Error()) return err diff --git a/services/search/pkg/search/index/index.go b/services/search/pkg/search/index/index.go index f822841b0..3a274d1af 100644 --- a/services/search/pkg/search/index/index.go +++ b/services/search/pkg/search/index/index.go @@ -222,13 +222,18 @@ func (i *Index) Search(ctx context.Context, req *searchsvc.SearchIndexRequest) ( query := bleve.NewConjunctionQuery( bleve.NewQueryStringQuery(req.Query), deletedQuery, // Skip documents that have been marked as deleted - bleve.NewQueryStringQuery("RootID:"+idToBleveId(&sprovider.ResourceId{ - StorageId: req.Ref.GetResourceId().GetStorageId(), - SpaceId: req.Ref.GetResourceId().GetSpaceId(), - OpaqueId: req.Ref.GetResourceId().GetOpaqueId(), - })), // Limit search to the space - bleve.NewQueryStringQuery("Path:"+queryEscape(utils.MakeRelativePath(path.Join(req.Ref.Path, "/"))+"*")), // Limit search to this directory in the space ) + if req.Ref != nil { + query = bleve.NewConjunctionQuery( + query, + bleve.NewQueryStringQuery("RootID:"+idToBleveId(&sprovider.ResourceId{ + StorageId: req.Ref.GetResourceId().GetStorageId(), + SpaceId: req.Ref.GetResourceId().GetSpaceId(), + OpaqueId: req.Ref.GetResourceId().GetOpaqueId(), + })), // Limit search to the space + bleve.NewQueryStringQuery("Path:"+queryEscape(utils.MakeRelativePath(path.Join(req.Ref.Path, "/"))+"*")), // Limit search to this directory in the space + ) + } bleveReq := bleve.NewSearchRequest(query) bleveReq.Size = 200 if req.PageSize > 0 { @@ -295,7 +300,7 @@ func toEntity(ref *sprovider.Reference, ri *sprovider.ResourceInfo) *indexDocume } if ri.Mtime != nil { - doc.Mtime = time.Unix(int64(ri.Mtime.Seconds), int64(ri.Mtime.Nanos)).UTC().Format(time.RFC3339) + doc.Mtime = time.Unix(int64(ri.Mtime.Seconds), int64(ri.Mtime.Nanos)).UTC().Format(time.RFC3339Nano) } return doc @@ -350,7 +355,7 @@ func fromDocumentMatch(hit *search.DocumentMatch) (*searchmsg.Match, error) { match.Entity.ParentId = resourceIDtoSearchID(parentID) } - if mtime, err := time.Parse(time.RFC3339, hit.Fields["Mtime"].(string)); err == nil { + if mtime, err := time.Parse(time.RFC3339Nano, hit.Fields["Mtime"].(string)); err == nil { match.Entity.LastModifiedTime = ×tamppb.Timestamp{Seconds: mtime.Unix(), Nanos: int32(mtime.Nanosecond())} } diff --git a/services/search/pkg/search/provider/events.go b/services/search/pkg/search/provider/events.go index c3c7ab7d1..f8b63b58b 100644 --- a/services/search/pkg/search/provider/events.go +++ b/services/search/pkg/search/provider/events.go @@ -2,6 +2,8 @@ package provider import ( "context" + "sync" + "time" gateway "github.com/cs3org/go-cs3apis/cs3/gateway/v1beta1" user "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1" @@ -11,12 +13,43 @@ import ( ctxpkg "github.com/cs3org/reva/v2/pkg/ctx" "github.com/cs3org/reva/v2/pkg/errtypes" "github.com/cs3org/reva/v2/pkg/events" + "github.com/cs3org/reva/v2/pkg/storagespace" "google.golang.org/grpc/metadata" ) +// SpaceDebouncer debounces operations on spaces for a configurable amount of time +type SpaceDebouncer struct { + after time.Duration + f func(id *provider.StorageSpaceId, userID *user.UserId) + pending map[string]*time.Timer + + mutex sync.Mutex +} + +// NewSpaceDebouncer returns a new SpaceDebouncer instance +func NewSpaceDebouncer(d time.Duration, f func(id *provider.StorageSpaceId, userID *user.UserId)) *SpaceDebouncer { + return &SpaceDebouncer{ + after: d, + f: f, + pending: map[string]*time.Timer{}, + } +} + +// Debounce restars the debounce timer for the given space +func (d *SpaceDebouncer) Debounce(id *provider.StorageSpaceId, userID *user.UserId) { + d.mutex.Lock() + defer d.mutex.Unlock() + + if t := d.pending[id.OpaqueId]; t != nil { + t.Stop() + } + + d.pending[id.OpaqueId] = time.AfterFunc(d.after, func() { + d.f(id, userID) + }) +} + func (p *Provider) handleEvent(ev interface{}) { - var ref *provider.Reference - var owner *user.User switch e := ev.(type) { case events.ItemTrashed: p.logger.Debug().Interface("event", ev).Msg("marking document as deleted") @@ -24,11 +57,10 @@ func (p *Provider) handleEvent(ev interface{}) { if err != nil { p.logger.Error().Err(err).Interface("Id", e.ID).Msg("failed to remove item from index") } - return + p.reindexSpace(ev, e.Ref, e.Executant, e.SpaceOwner) case events.ItemRestored: p.logger.Debug().Interface("event", ev).Msg("marking document as restored") - ref = e.Ref - owner = &user.User{ + owner := &user.User{ Id: e.Executant, } @@ -36,13 +68,13 @@ func (p *Provider) handleEvent(ev interface{}) { if err != nil { return } - statRes, err := p.statResource(ownerCtx, ref, owner) + statRes, err := p.statResource(ownerCtx, e.Ref, owner) if err != nil { p.logger.Error().Err(err). - Str("storageid", ref.GetResourceId().GetStorageId()). - Str("spaceid", ref.GetResourceId().GetSpaceId()). - Str("opaqueid", ref.GetResourceId().GetOpaqueId()). - Str("path", ref.GetPath()). + Str("storageid", e.Ref.GetResourceId().GetStorageId()). + Str("spaceid", e.Ref.GetResourceId().GetSpaceId()). + Str("opaqueid", e.Ref.GetResourceId().GetOpaqueId()). + Str("path", e.Ref.GetPath()). Msg("failed to make stat call for the restored resource") return } @@ -52,26 +84,24 @@ func (p *Provider) handleEvent(ev interface{}) { err = p.indexClient.Restore(statRes.Info.Id) if err != nil { p.logger.Error().Err(err). - Str("storageid", ref.GetResourceId().GetStorageId()). - Str("spaceid", ref.GetResourceId().GetSpaceId()). - Str("opaqueid", ref.GetResourceId().GetOpaqueId()). - Str("path", ref.GetPath()). + Str("storageid", e.Ref.GetResourceId().GetStorageId()). + Str("spaceid", e.Ref.GetResourceId().GetSpaceId()). + Str("opaqueid", e.Ref.GetResourceId().GetOpaqueId()). + Str("path", e.Ref.GetPath()). Msg("failed to restore the changed resource in the index") } default: p.logger.Error().Interface("statRes", statRes). - Str("storageid", ref.GetResourceId().GetStorageId()). - Str("spaceid", ref.GetResourceId().GetSpaceId()). - Str("opaqueid", ref.GetResourceId().GetOpaqueId()). - Str("path", ref.GetPath()). + Str("storageid", e.Ref.GetResourceId().GetStorageId()). + Str("spaceid", e.Ref.GetResourceId().GetSpaceId()). + Str("opaqueid", e.Ref.GetResourceId().GetOpaqueId()). + Str("path", e.Ref.GetPath()). Msg("failed to stat the restored resource") } - - return + p.reindexSpace(ev, e.Ref, e.Executant, e.SpaceOwner) case events.ItemMoved: p.logger.Debug().Interface("event", ev).Msg("resource has been moved, updating the document") - ref = e.Ref - owner = &user.User{ + owner := &user.User{ Id: e.Executant, } @@ -79,7 +109,7 @@ func (p *Provider) handleEvent(ev interface{}) { if err != nil { return } - statRes, err := p.statResource(ownerCtx, ref, owner) + statRes, err := p.statResource(ownerCtx, e.Ref, owner) if err != nil { p.logger.Error().Err(err).Msg("failed to stat the moved resource") return @@ -91,11 +121,11 @@ func (p *Provider) handleEvent(ev interface{}) { gpRes, err := p.getPath(ownerCtx, statRes.Info.Id, owner) if err != nil { - p.logger.Error().Err(err).Interface("ref", ref).Msg("failed to get path for moved resource") + p.logger.Error().Err(err).Interface("ref", e.Ref).Msg("failed to get path for moved resource") return } if gpRes.Status.Code != rpcv1beta1.Code_CODE_OK { - p.logger.Error().Interface("status", gpRes.Status).Interface("ref", ref).Msg("failed to get path for moved resource") + p.logger.Error().Interface("status", gpRes.Status).Interface("ref", e.Ref).Msg("failed to get path for moved resource") return } @@ -103,68 +133,34 @@ func (p *Provider) handleEvent(ev interface{}) { if err != nil { p.logger.Error().Err(err).Msg("failed to move the changed resource in the index") } - return + p.reindexSpace(ev, e.Ref, e.Executant, e.SpaceOwner) case events.ContainerCreated: - ref = e.Ref - owner = &user.User{ - Id: e.Executant, - } + p.reindexSpace(ev, e.Ref, e.Executant, e.SpaceOwner) case events.FileUploaded: - ref = e.Ref - owner = &user.User{ - Id: e.Executant, - } + p.reindexSpace(ev, e.Ref, e.Executant, e.SpaceOwner) case events.FileTouched: - ref = e.Ref - owner = &user.User{ - Id: e.Executant, - } + p.reindexSpace(ev, e.Ref, e.Executant, e.SpaceOwner) case events.FileVersionRestored: - ref = e.Ref - owner = &user.User{ - Id: e.Executant, - } + p.reindexSpace(ev, e.Ref, e.Executant, e.SpaceOwner) default: // Not sure what to do here. Skip. return } - p.logger.Debug().Interface("event", ev).Msg("resource has been changed, updating the document") +} - ownerCtx, err := p.getAuthContext(owner) - if err != nil { - return - } +func (p *Provider) reindexSpace(ev interface{}, ref *provider.Reference, executant, owner *user.UserId) { + p.logger.Debug().Interface("event", ev).Msg("resource has been changed, scheduling a space resync") - statRes, err := p.statResource(ownerCtx, ref, owner) - if err != nil { - p.logger.Error().Err(err). - Str("storageid", ref.GetResourceId().GetStorageId()). - Str("spaceid", ref.GetResourceId().GetSpaceId()). - Str("opaqueid", ref.GetResourceId().GetOpaqueId()). - Str("path", ref.GetPath()). - Msg("failed to make stat call for changed resource") - return + spaceID := &provider.StorageSpaceId{ + OpaqueId: storagespace.FormatResourceID(provider.ResourceId{ + StorageId: ref.GetResourceId().GetStorageId(), + SpaceId: ref.GetResourceId().GetSpaceId(), + }), } - if statRes.Status.Code != rpc.Code_CODE_OK { - p.logger.Error().Interface("statRes", statRes). - Str("storageid", ref.GetResourceId().GetStorageId()). - Str("spaceid", ref.GetResourceId().GetSpaceId()). - Str("opaqueid", ref.GetResourceId().GetOpaqueId()). - Str("path", ref.GetPath()). - Msg("failed to stat the changed resource") - return - } - - ref, err = p.resolveReference(ownerCtx, ref, statRes.Info) - if err != nil { - p.logger.Error().Err(err).Msg("error resolving reference") - return - } - err = p.indexClient.Add(ref, statRes.Info) - if err != nil { - p.logger.Error().Err(err).Msg("error adding updating the resource in the index") + if owner != nil { + p.indexSpaceDebouncer.Debounce(spaceID, owner) } else { - p.logDocCount() + p.indexSpaceDebouncer.Debounce(spaceID, executant) } } diff --git a/services/search/pkg/search/provider/events_test.go b/services/search/pkg/search/provider/events_test.go index 1917b7326..73ca2a17c 100644 --- a/services/search/pkg/search/provider/events_test.go +++ b/services/search/pkg/search/provider/events_test.go @@ -2,16 +2,19 @@ package provider_test import ( "context" + "time" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" "github.com/stretchr/testify/mock" gateway "github.com/cs3org/go-cs3apis/cs3/gateway/v1beta1" + user "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1" userv1beta1 "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1" sprovider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" "github.com/cs3org/reva/v2/pkg/events" "github.com/cs3org/reva/v2/pkg/rgrpc/status" + "github.com/cs3org/reva/v2/pkg/utils" cs3mocks "github.com/cs3org/reva/v2/tests/cs3mocks/mocks" "github.com/owncloud/ocis/v2/ocis-pkg/log" "github.com/owncloud/ocis/v2/services/search/pkg/search/mocks" @@ -20,9 +23,10 @@ import ( var _ = Describe("Searchprovider", func() { var ( - p *provider.Provider - gwClient *cs3mocks.GatewayAPIClient - indexClient *mocks.IndexClient + p *provider.Provider + gwClient *cs3mocks.GatewayAPIClient + indexClient *mocks.IndexClient + debouncedIndexCalls int ctx context.Context eventsChan chan interface{} @@ -48,8 +52,9 @@ var _ = Describe("Searchprovider", func() { SpaceId: "rootopaqueid", OpaqueId: "opaqueid", }, - Path: "foo.pdf", - Size: 12345, + Path: "foo.pdf", + Size: 12345, + Mtime: utils.TimeToTS(time.Now().Add(-time.Hour)), } ) @@ -59,7 +64,12 @@ var _ = Describe("Searchprovider", func() { gwClient = &cs3mocks.GatewayAPIClient{} indexClient = &mocks.IndexClient{} - p = provider.New(gwClient, indexClient, "", eventsChan, logger) + debouncedIndexCalls = 0 + debouncer := provider.NewSpaceDebouncer(100*time.Millisecond, func(id *sprovider.StorageSpaceId, userID *userv1beta1.UserId) { + debouncedIndexCalls += 1 + }) + + p = provider.NewWithDebouncer(gwClient, indexClient, "", eventsChan, logger, debouncer) gwClient.On("Authenticate", mock.Anything, mock.Anything).Return(&gateway.AuthenticateResponse{ Status: status.NewOK(ctx), @@ -91,37 +101,25 @@ var _ = Describe("Searchprovider", func() { }) It("triggers an index update when a file has been uploaded", func() { - called := false - indexClient.On("Add", mock.Anything, mock.MatchedBy(func(riToIndex *sprovider.ResourceInfo) bool { - return riToIndex.Id.OpaqueId == ri.Id.OpaqueId - })).Return(nil).Run(func(args mock.Arguments) { - called = true - }) eventsChan <- events.FileUploaded{ Ref: ref, Executant: user.Id, } - Eventually(func() bool { - return called - }, "2s").Should(BeTrue()) + Eventually(func() int { + return debouncedIndexCalls + }, "2s").Should(Equal(1)) }) It("triggers an index update when a file has been touched", func() { - called := false - indexClient.On("Add", mock.Anything, mock.MatchedBy(func(riToIndex *sprovider.ResourceInfo) bool { - return riToIndex.Id.OpaqueId == ri.Id.OpaqueId - })).Return(nil).Run(func(args mock.Arguments) { - called = true - }) eventsChan <- events.FileTouched{ Ref: ref, Executant: user.Id, } - Eventually(func() bool { - return called - }, "2s").Should(BeTrue()) + Eventually(func() int { + return debouncedIndexCalls + }, "2s").Should(Equal(1)) }) It("removes an entry from the index when the file has been deleted", func() { @@ -163,20 +161,14 @@ var _ = Describe("Searchprovider", func() { }) It("indexes items when a version has been restored", func() { - called := false - indexClient.On("Add", mock.Anything, mock.MatchedBy(func(riToIndex *sprovider.ResourceInfo) bool { - return riToIndex.Id.OpaqueId == ri.Id.OpaqueId - })).Return(nil).Run(func(args mock.Arguments) { - called = true - }) eventsChan <- events.FileVersionRestored{ Ref: ref, Executant: user.Id, } - Eventually(func() bool { - return called - }, "2s").Should(BeTrue()) + Eventually(func() int { + return debouncedIndexCalls + }, "2s").Should(Equal(1)) }) }) @@ -203,3 +195,50 @@ var _ = Describe("Searchprovider", func() { }) }) }) + +var _ = Describe("SpaceDebouncer", func() { + var ( + debouncer *provider.SpaceDebouncer + callCount map[string]int + + userId = &user.UserId{ + OpaqueId: "user", + } + ) + + BeforeEach(func() { + callCount = map[string]int{} + debouncer = provider.NewSpaceDebouncer(50*time.Millisecond, func(id *sprovider.StorageSpaceId, _ *user.UserId) { + callCount[id.OpaqueId] += 1 + }) + }) + + It("debounces", func() { + spaceid := &sprovider.StorageSpaceId{ + OpaqueId: "spaceid", + } + debouncer.Debounce(spaceid, userId) + debouncer.Debounce(spaceid, userId) + debouncer.Debounce(spaceid, userId) + Eventually(func() int { + return callCount["spaceid"] + }, "200ms").Should(Equal(1)) + }) + + It("works multiple times", func() { + spaceid := &sprovider.StorageSpaceId{ + OpaqueId: "spaceid", + } + debouncer.Debounce(spaceid, userId) + debouncer.Debounce(spaceid, userId) + debouncer.Debounce(spaceid, userId) + time.Sleep(100 * time.Millisecond) + + debouncer.Debounce(spaceid, userId) + debouncer.Debounce(spaceid, userId) + + Eventually(func() int { + return callCount["spaceid"] + }, "200ms").Should(Equal(2)) + }) +}) diff --git a/services/search/pkg/search/provider/searchprovider.go b/services/search/pkg/search/provider/searchprovider.go index 9e314a97f..1bfe01286 100644 --- a/services/search/pkg/search/provider/searchprovider.go +++ b/services/search/pkg/search/provider/searchprovider.go @@ -8,6 +8,8 @@ import ( "strings" "time" + "google.golang.org/grpc/metadata" + gateway "github.com/cs3org/go-cs3apis/cs3/gateway/v1beta1" user "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1" rpc "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1" @@ -22,7 +24,6 @@ import ( "github.com/cs3org/reva/v2/pkg/utils" "github.com/owncloud/ocis/v2/ocis-pkg/log" "github.com/owncloud/ocis/v2/services/search/pkg/search" - "google.golang.org/grpc/metadata" searchmsg "github.com/owncloud/ocis/v2/protogen/gen/ocis/messages/search/v0" searchsvc "github.com/owncloud/ocis/v2/protogen/gen/ocis/services/search/v0" @@ -62,6 +63,8 @@ type Provider struct { gwClient gateway.GatewayAPIClient indexClient search.IndexClient machineAuthAPIKey string + + indexSpaceDebouncer *SpaceDebouncer } type MatchArray []*searchmsg.Match @@ -84,6 +87,13 @@ func New(gwClient gateway.GatewayAPIClient, indexClient search.IndexClient, mach logger: logger, } + p.indexSpaceDebouncer = NewSpaceDebouncer(50*time.Millisecond, func(id *provider.StorageSpaceId, userID *user.UserId) { + err := p.doIndexSpace(context.Background(), id, userID) + if err != nil { + p.logger.Error().Err(err).Interface("spaceID", id).Interface("userID", userID).Msg("error while indexing a space") + } + }) + go func() { for { ev := <-eventsChan @@ -97,6 +107,13 @@ func New(gwClient gateway.GatewayAPIClient, indexClient search.IndexClient, mach return p } +// NewWithDebouncer returns a new provider with a customer index space debouncer +func NewWithDebouncer(gwClient gateway.GatewayAPIClient, indexClient search.IndexClient, machineAuthAPIKey string, eventsChan <-chan interface{}, logger log.Logger, debouncer *SpaceDebouncer) *Provider { + p := New(gwClient, indexClient, machineAuthAPIKey, eventsChan, logger) + p.indexSpaceDebouncer = debouncer + return p +} + func (p *Provider) Search(ctx context.Context, req *searchsvc.SearchRequest) (*searchsvc.SearchResponse, error) { if req.Query == "" { return nil, errtypes.BadRequest("empty query provided") @@ -241,52 +258,70 @@ func (p *Provider) Search(ctx context.Context, req *searchsvc.SearchRequest) (*s } func (p *Provider) IndexSpace(ctx context.Context, req *searchsvc.IndexSpaceRequest) (*searchsvc.IndexSpaceResponse, error) { - // get user - res, err := p.gwClient.GetUserByClaim(context.Background(), &user.GetUserByClaimRequest{ - Claim: "username", - Value: req.UserId, - }) - if err != nil || res.Status.Code != rpc.Code_CODE_OK { - fmt.Println("error: Could not get user by userid") + err := p.doIndexSpace(ctx, &provider.StorageSpaceId{OpaqueId: req.SpaceId}, &user.UserId{OpaqueId: req.UserId}) + if err != nil { return nil, err } + return &searchsvc.IndexSpaceResponse{}, nil +} - // Get auth context - ownerCtx := ctxpkg.ContextSetUser(context.Background(), res.User) - authRes, err := p.gwClient.Authenticate(ownerCtx, &gateway.AuthenticateRequest{ +func (p *Provider) doIndexSpace(ctx context.Context, spaceID *provider.StorageSpaceId, userID *user.UserId) error { + authRes, err := p.gwClient.Authenticate(ctx, &gateway.AuthenticateRequest{ Type: "machine", - ClientId: "userid:" + res.User.Id.OpaqueId, + ClientId: "userid:" + userID.OpaqueId, ClientSecret: p.machineAuthAPIKey, }) if err != nil || authRes.GetStatus().GetCode() != rpc.Code_CODE_OK { - return nil, err + return err } if authRes.GetStatus().GetCode() != rpc.Code_CODE_OK { - return nil, fmt.Errorf("could not get authenticated context for user") + return fmt.Errorf("could not get authenticated context for user") } - ownerCtx = metadata.AppendToOutgoingContext(ownerCtx, ctxpkg.TokenHeader, authRes.Token) + ownerCtx := metadata.AppendToOutgoingContext(ctx, ctxpkg.TokenHeader, authRes.Token) // Walk the space and index all files walker := walker.NewWalker(p.gwClient) - rootId, err := storagespace.ParseID(req.SpaceId) + rootID, err := storagespace.ParseID(spaceID.OpaqueId) if err != nil { - p.logger.Error().Err(err).Msg(err.Error()) - return nil, err + p.logger.Error().Err(err).Msg("invalid space id") + return err } - err = walker.Walk(ownerCtx, &rootId, func(wd string, info *provider.ResourceInfo, err error) error { + if rootID.StorageId == "" || rootID.SpaceId == "" { + p.logger.Error().Err(err).Msg("invalid space id") + return fmt.Errorf("invalid space id") + } + rootID.OpaqueId = rootID.SpaceId + + err = walker.Walk(ownerCtx, &rootID, func(wd string, info *provider.ResourceInfo, err error) error { if err != nil { p.logger.Error().Err(err).Msg("error walking the tree") + return err } - ref := &provider.Reference{ - Path: utils.MakeRelativePath(filepath.Join(wd, info.Path)), - ResourceId: &rootId, - } - ref, err = p.resolveReference(ownerCtx, ref, info) - if err != nil { - p.logger.Error().Err(err).Msg("error resolving reference") + + if info == nil { return nil } + + ref := &provider.Reference{ + Path: utils.MakeRelativePath(filepath.Join(wd, info.Path)), + ResourceId: &rootID, + } + p.logger.Debug().Str("path", ref.Path).Msg("Walking tree") + + // Has this item/subtree changed? + searchRes, err := p.indexClient.Search(ownerCtx, &searchsvc.SearchIndexRequest{ + Query: "+ID:" + storagespace.FormatResourceID(*info.Id) + ` +Mtime:>="` + utils.TSToTime(info.Mtime).Format(time.RFC3339Nano) + `"`, + }) + if err == nil && len(searchRes.Matches) >= 1 { + if info.Type == provider.ResourceType_RESOURCE_TYPE_CONTAINER { + p.logger.Debug().Str("path", ref.Path).Msg("subtree hasn't changed. Skipping.") + return filepath.SkipDir + } + p.logger.Debug().Str("path", ref.Path).Msg("element hasn't changed. Skipping.") + return nil + } + err = p.indexClient.Add(ref, info) if err != nil { p.logger.Error().Err(err).Msg("error adding resource to the index") @@ -296,32 +331,11 @@ func (p *Provider) IndexSpace(ctx context.Context, req *searchsvc.IndexSpaceRequ return nil }) if err != nil { - return nil, err + return err } p.logDocCount() - return &searchsvc.IndexSpaceResponse{}, nil -} - -func (p *Provider) resolveReference(ctx context.Context, ref *provider.Reference, ri *provider.ResourceInfo) (*provider.Reference, error) { - if ref.GetResourceId().GetOpaqueId() == ref.GetResourceId().GetSpaceId() { - return ref, nil - } - - gpRes, err := p.gwClient.GetPath(ctx, &provider.GetPathRequest{ - ResourceId: ri.Id, - }) - if err != nil || gpRes.Status.Code != rpcv1beta1.Code_CODE_OK { - return nil, err - } - return &provider.Reference{ - ResourceId: &provider.ResourceId{ - StorageId: ref.GetResourceId().GetStorageId(), - SpaceId: ref.GetResourceId().GetSpaceId(), - OpaqueId: ref.GetResourceId().GetSpaceId(), - }, - Path: utils.MakeRelativePath(gpRes.Path), - }, nil + return nil } func (p *Provider) logDocCount() { diff --git a/services/search/pkg/search/provider/searchprovider_test.go b/services/search/pkg/search/provider/searchprovider_test.go index 933200ab4..584391987 100644 --- a/services/search/pkg/search/provider/searchprovider_test.go +++ b/services/search/pkg/search/provider/searchprovider_test.go @@ -2,6 +2,7 @@ package provider_test import ( "context" + "time" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" @@ -12,6 +13,7 @@ import ( sprovider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" typesv1beta1 "github.com/cs3org/go-cs3apis/cs3/types/v1beta1" "github.com/cs3org/reva/v2/pkg/rgrpc/status" + "github.com/cs3org/reva/v2/pkg/utils" cs3mocks "github.com/cs3org/reva/v2/tests/cs3mocks/mocks" "github.com/owncloud/ocis/v2/ocis-pkg/log" searchmsg "github.com/owncloud/ocis/v2/protogen/gen/ocis/messages/search/v0" @@ -59,8 +61,9 @@ var _ = Describe("Searchprovider", func() { StorageId: "storageid", OpaqueId: "opaqueid", }, - Path: "foo.pdf", - Size: 12345, + Path: "foo.pdf", + Size: 12345, + Mtime: utils.TimeToTS(time.Now().Add(-time.Hour)), } ) @@ -105,6 +108,7 @@ var _ = Describe("Searchprovider", func() { indexClient.On("Add", mock.Anything, mock.MatchedBy(func(riToIndex *sprovider.ResourceInfo) bool { return riToIndex.Id.OpaqueId == ri.Id.OpaqueId })).Return(nil) + indexClient.On("Search", mock.Anything, mock.Anything).Return(&searchsvc.SearchIndexResponse{}, nil) res, err := p.IndexSpace(ctx, &searchsvc.IndexSpaceRequest{ SpaceId: "storageid$spaceid!spaceid", diff --git a/tests/acceptance/expected-failures-API-on-OCIS-storage.md b/tests/acceptance/expected-failures-API-on-OCIS-storage.md index 70c7cc6d1..ce2d76b5a 100644 --- a/tests/acceptance/expected-failures-API-on-OCIS-storage.md +++ b/tests/acceptance/expected-failures-API-on-OCIS-storage.md @@ -566,8 +566,6 @@ _ocdav: api compatibility, return correct status code_ - [apiAuthWebDav/webDavSpecialURLs.feature:13](https://github.com/owncloud/core/blob/master/tests/acceptance/features/apiAuthWebDav/webDavSpecialURLs.feature#L13) - [apiAuthWebDav/webDavSpecialURLs.feature:24](https://github.com/owncloud/core/blob/master/tests/acceptance/features/apiAuthWebDav/webDavSpecialURLs.feature#L24) -- [apiAuthWebDav/webDavSpecialURLs.feature:34](https://github.com/owncloud/core/blob/master/tests/acceptance/features/apiAuthWebDav/webDavSpecialURLs.feature#L34) -- [apiAuthWebDav/webDavSpecialURLs.feature:45](https://github.com/owncloud/core/blob/master/tests/acceptance/features/apiAuthWebDav/webDavSpecialURLs.feature#L45) - [apiAuthWebDav/webDavSpecialURLs.feature:55](https://github.com/owncloud/core/blob/master/tests/acceptance/features/apiAuthWebDav/webDavSpecialURLs.feature#L55) - [apiAuthWebDav/webDavSpecialURLs.feature:66](https://github.com/owncloud/core/blob/master/tests/acceptance/features/apiAuthWebDav/webDavSpecialURLs.feature#L66) - [apiAuthWebDav/webDavSpecialURLs.feature:76](https://github.com/owncloud/core/blob/master/tests/acceptance/features/apiAuthWebDav/webDavSpecialURLs.feature#L76)