[full-ci] Fix searching shares (#3668)

* Be more robust when limiting the search to a directory in the space

* Add more debug logs

* Fix searching in shares

* Delay indexing by a second so that everything has time to settle

Indexing immediately when the event arrives sometimes causes issues
when trying to stat the changed resource.

* Pick up the machine auth secret from the default config

* Adapt to the resourceid refactoring in reva

* Fix unit tests
This commit is contained in:
Andre Duffeck
2022-05-04 13:15:28 +02:00
committed by GitHub
parent b41498c121
commit d540c4405d
5 changed files with 166 additions and 147 deletions
@@ -37,7 +37,7 @@ func DefaultConfig() *config.Config {
Cluster: "ocis-cluster",
ConsumerGroup: "search",
},
MachineAuthAPIKey: "change-me-please",
MachineAuthAPIKey: "",
}
}
+1 -1
View File
@@ -207,7 +207,7 @@ func (i *Index) Search(ctx context.Context, req *searchsvc.SearchIndexRequest) (
bleve.NewQueryStringQuery("Name:"+req.Query),
deletedQuery, // Skip documents that have been marked as deleted
bleve.NewQueryStringQuery("RootID:"+req.Ref.ResourceId.StorageId+"!"+req.Ref.ResourceId.OpaqueId), // Limit search to the space
bleve.NewQueryStringQuery("Path:"+req.Ref.Path+"*"), // Limit search to this directory in the space
bleve.NewQueryStringQuery("Path:"+utils.MakeRelativePath(path.Join(req.Ref.Path, "/"))+"*"), // Limit search to this directory in the space
)
bleveReq := bleve.NewSearchRequest(query)
bleveReq.Size = 200
@@ -0,0 +1,128 @@
package provider
import (
	"context"
	"fmt"

	gateway "github.com/cs3org/go-cs3apis/cs3/gateway/v1beta1"
	user "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1"
	rpc "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1"
	provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
	ctxpkg "github.com/cs3org/reva/v2/pkg/ctx"
	"github.com/cs3org/reva/v2/pkg/events"
	"google.golang.org/grpc/metadata"
)
// handleEvent updates the search index in reaction to a single event.
//
// The event type decides what happens:
//   - events.ItemTrashed: the document is marked as deleted via indexClient.Delete.
//   - events.ItemRestored: the resource is stat'ed and restored via indexClient.Restore.
//   - events.ItemMoved: the resource is stat'ed and updated via indexClient.Move.
//   - events.ContainerCreated, events.FileUploaded, events.FileVersionRestored:
//     the resource is stat'ed and added/updated via indexClient.Add.
//
// Every other event type is ignored. The resource is always stat'ed on behalf
// of the event's executant. Errors are logged; nothing is returned to the
// caller.
func (p *Provider) handleEvent(ev interface{}) {
	var ref *provider.Reference
	var owner *user.User
	switch e := ev.(type) {
	case events.ItemTrashed:
		p.logger.Debug().Interface("event", ev).Msg("marking document as deleted")
		err := p.indexClient.Delete(e.ID)
		if err != nil {
			p.logger.Error().Err(err).Interface("Id", e.ID).Msg("failed to remove item from index")
		}
		return
	case events.ItemRestored:
		p.logger.Debug().Interface("event", ev).Msg("marking document as restored")
		ref = e.Ref
		owner = &user.User{
			Id: e.Executant,
		}

		statRes, err := p.statResource(ref, owner)
		if err != nil {
			p.logger.Error().Err(err).Msg("failed to stat the changed resource")
			return
		}

		// Use the generated getters: they tolerate a nil response, status or
		// info, so an unexpected reply ends up in the default branch instead
		// of panicking.
		switch statRes.GetStatus().GetCode() {
		case rpc.Code_CODE_OK:
			err = p.indexClient.Restore(statRes.GetInfo().GetId())
			if err != nil {
				p.logger.Error().Err(err).Msg("failed to restore the changed resource in the index")
			}
		default:
			p.logger.Error().Interface("statRes", statRes).Msg("failed to stat the changed resource")
		}
		return
	case events.ItemMoved:
		p.logger.Debug().Interface("event", ev).Msg("resource has been moved, updating the document")
		ref = e.Ref
		owner = &user.User{
			Id: e.Executant,
		}

		statRes, err := p.statResource(ref, owner)
		if err != nil {
			p.logger.Error().Err(err).Msg("failed to stat the changed resource")
			return
		}

		switch statRes.GetStatus().GetCode() {
		case rpc.Code_CODE_OK:
			err = p.indexClient.Move(statRes.GetInfo())
			if err != nil {
				p.logger.Error().Err(err).Msg("failed to move the changed resource in the index")
			}
		default:
			p.logger.Error().Interface("statRes", statRes).Msg("failed to stat the changed resource")
		}
		return
	case events.ContainerCreated:
		ref = e.Ref
		owner = &user.User{
			Id: e.Executant,
		}
	case events.FileUploaded:
		ref = e.Ref
		owner = &user.User{
			Id: e.Executant,
		}
	case events.FileVersionRestored:
		ref = e.Ref
		owner = &user.User{
			Id: e.Executant,
		}
	default:
		// Not sure what to do here. Skip.
		return
	}

	// Only the "created/uploaded/version restored" events reach this point:
	// stat the resource and (re)index it.
	p.logger.Debug().Interface("event", ev).Msg("resource has been changed, updating the document")
	statRes, err := p.statResource(ref, owner)
	if err != nil {
		p.logger.Error().Err(err).Msg("failed to stat the changed resource")
		return
	}
	if statRes.GetStatus().GetCode() != rpc.Code_CODE_OK {
		p.logger.Error().Interface("statRes", statRes).Msg("failed to stat the changed resource")
		return
	}

	err = p.indexClient.Add(ref, statRes.GetInfo())
	if err != nil {
		p.logger.Error().Err(err).Msg("error adding updating the resource in the index")
	} else {
		p.logDocCount()
	}
}
// statResource stats ref through the gateway on behalf of owner.
//
// It authenticates with the "machine" auth type, using
// "userid:<owner opaque id>" as client id and p.machineAuthAPIKey as secret,
// and attaches the returned token to the outgoing context under
// ctxpkg.TokenHeader before calling Stat.
//
// If authentication fails, either with an error or with a non-OK status, the
// failure is logged and returned as an error; Stat is not called in that case
// and the returned response is nil.
func (p *Provider) statResource(ref *provider.Reference, owner *user.User) (*provider.StatResponse, error) {
	// Get auth
	ownerCtx := ctxpkg.ContextSetUser(context.Background(), owner)
	authRes, err := p.gwClient.Authenticate(ownerCtx, &gateway.AuthenticateRequest{
		Type: "machine",
		// Nil-safe getters: the owner may have been built from an event
		// without an executant id.
		ClientId:     "userid:" + owner.GetId().GetOpaqueId(),
		ClientSecret: p.machineAuthAPIKey,
	})
	if err == nil && authRes.GetStatus().GetCode() != rpc.Code_CODE_OK {
		err = fmt.Errorf("machine auth returned status %s: %s", authRes.GetStatus().GetCode(), authRes.GetStatus().GetMessage())
	}
	if err != nil {
		// authRes may be nil here, so it must not be dereferenced and there is
		// no token to stat with.
		p.logger.Error().Err(err).Interface("authRes", authRes).Msg("error using machine auth")
		return nil, err
	}
	ownerCtx = metadata.AppendToOutgoingContext(ownerCtx, ctxpkg.TokenHeader, authRes.GetToken())

	// Stat the changed resource
	return p.gwClient.Stat(ownerCtx, &provider.StatRequest{Ref: ref})
}
@@ -5,17 +5,17 @@ import (
"fmt"
"path/filepath"
"strings"
"time"
gateway "github.com/cs3org/go-cs3apis/cs3/gateway/v1beta1"
user "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1"
rpc "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1"
rpcv1beta1 "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1"
provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
typesv1beta1 "github.com/cs3org/go-cs3apis/cs3/types/v1beta1"
ctxpkg "github.com/cs3org/reva/v2/pkg/ctx"
"github.com/cs3org/reva/v2/pkg/errtypes"
"github.com/cs3org/reva/v2/pkg/events"
"github.com/cs3org/reva/v2/pkg/storage/utils/walker"
"github.com/cs3org/reva/v2/pkg/storagespace"
"github.com/cs3org/reva/v2/pkg/utils"
"github.com/owncloud/ocis/extensions/search/pkg/search"
"github.com/owncloud/ocis/ocis-pkg/log"
@@ -43,138 +43,28 @@ func New(gwClient gateway.GatewayAPIClient, indexClient search.IndexClient, mach
go func() {
for {
ev := <-eventsChan
var ref *provider.Reference
var owner *user.User
switch e := ev.(type) {
case events.ItemTrashed:
err := p.indexClient.Delete(e.ID)
if err != nil {
p.logger.Error().Err(err).Interface("Id", e.ID).Msg("failed to remove item from index")
}
continue
case events.ItemRestored:
ref = e.Ref
owner = &user.User{
Id: e.Executant,
}
statRes, err := p.statResource(ref, owner)
if err != nil {
p.logger.Error().Err(err).Msg("failed to stat the changed resource")
}
switch statRes.Status.Code {
case rpc.Code_CODE_OK:
err = p.indexClient.Restore(statRes.Info.Id)
if err != nil {
p.logger.Error().Err(err).Msg("failed to restore the changed resource in the index")
}
default:
p.logger.Error().Interface("statRes", statRes).Msg("failed to stat the changed resource")
}
continue
case events.ItemMoved:
ref = e.Ref
owner = &user.User{
Id: e.Executant,
}
statRes, err := p.statResource(ref, owner)
if err != nil {
p.logger.Error().Err(err).Msg("failed to stat the changed resource")
}
switch statRes.Status.Code {
case rpc.Code_CODE_OK:
err = p.indexClient.Move(statRes.Info)
if err != nil {
p.logger.Error().Err(err).Msg("failed to restore the changed resource in the index")
}
default:
p.logger.Error().Interface("statRes", statRes).Msg("failed to stat the changed resource")
}
continue
case events.ContainerCreated:
ref = e.Ref
owner = &user.User{
Id: e.Executant,
}
case events.FileUploaded:
ref = e.Ref
owner = &user.User{
Id: e.Executant,
}
case events.FileVersionRestored:
ref = e.Ref
owner = &user.User{
Id: e.Executant,
}
default:
// Not sure what to do here. Skip.
continue
}
statRes, err := p.statResource(ref, owner)
if err != nil {
p.logger.Error().Err(err).Msg("failed to stat the changed resource")
}
switch statRes.Status.Code {
case rpc.Code_CODE_OK:
err = p.indexClient.Add(ref, statRes.Info)
if err != nil {
p.logger.Error().Err(err).Msg("error adding updating the resource in the index")
} else {
p.logDocCount()
}
default:
p.logger.Error().Interface("statRes", statRes).Msg("failed to stat the changed resource")
}
go func() {
time.Sleep(1 * time.Second) // Give some time to let everything settle down before trying to access it when indexing
p.handleEvent(ev)
}()
}
}()
return p
}
// statResource stats ref through the gateway on behalf of owner.
//
// It authenticates with the "machine" auth type, using
// "userid:<owner opaque id>" as client id and p.machineAuthAPIKey as secret,
// and attaches the returned token to the outgoing context under
// ctxpkg.TokenHeader before calling Stat. A failed authentication is logged;
// the Stat call is still issued, with whatever token (possibly empty) was
// returned.
func (p *Provider) statResource(ref *provider.Reference, owner *user.User) (*provider.StatResponse, error) {
	// Get auth
	ownerCtx := ctxpkg.ContextSetUser(context.Background(), owner)
	authRes, err := p.gwClient.Authenticate(ownerCtx, &gateway.AuthenticateRequest{
		Type: "machine",
		// Nil-safe getters: the owner may carry no id.
		ClientId:     "userid:" + owner.GetId().GetOpaqueId(),
		ClientSecret: p.machineAuthAPIKey,
	})
	if err != nil || authRes.GetStatus().GetCode() != rpc.Code_CODE_OK {
		p.logger.Error().Err(err).Interface("authRes", authRes).Msg("error using machine auth")
	}
	// authRes is nil when Authenticate returned an error; GetToken tolerates
	// that and yields an empty token instead of panicking.
	ownerCtx = metadata.AppendToOutgoingContext(ownerCtx, ctxpkg.TokenHeader, authRes.GetToken())

	// Stat the changed resource
	return p.gwClient.Stat(ownerCtx, &provider.StatRequest{Ref: ref})
}
// logDocCount writes the number of documents reported by the index client to
// the debug log. If the count cannot be retrieved the error is logged and no
// count is reported.
func (p *Provider) logDocCount() {
	c, err := p.indexClient.DocCount()
	if err != nil {
		p.logger.Error().Err(err).Msg("error getting document count from the index")
		// Do not report a count that was never obtained.
		return
	}
	p.logger.Debug().Interface("count", c).Msg("new document count")
}
func (p *Provider) Search(ctx context.Context, req *searchsvc.SearchRequest) (*searchsvc.SearchResponse, error) {
if req.Query == "" {
return nil, errtypes.PreconditionFailed("empty query provided")
}
listSpacesRes, err := p.gwClient.ListStorageSpaces(ctx, &provider.ListStorageSpacesRequest{
Opaque: &typesv1beta1.Opaque{Map: map[string]*typesv1beta1.OpaqueEntry{
"path": {
Decoder: "plain",
Value: []byte("/"),
Filters: []*provider.ListStorageSpacesRequest_Filter{
{
Type: provider.ListStorageSpacesRequest_Filter_TYPE_SPACE_TYPE,
Term: &provider.ListStorageSpacesRequest_Filter_SpaceType{SpaceType: "+grant"},
},
}},
},
})
if err != nil {
return nil, err
@@ -196,12 +86,14 @@ func (p *Provider) Search(ctx context.Context, req *searchsvc.SearchRequest) (*s
pathPrefix = utils.MakeRelativePath(gpRes.Path)
}
_, rootStorageID := storagespace.SplitStorageID(space.Root.StorageId)
res, err := p.indexClient.Search(ctx, &searchsvc.SearchIndexRequest{
Query: req.Query,
Ref: &searchmsg.Reference{
ResourceId: &searchmsg.ResourceID{
StorageId: space.Root.StorageId,
OpaqueId: space.Root.OpaqueId,
OpaqueId: rootStorageID,
},
Path: pathPrefix,
},
@@ -276,3 +168,11 @@ func (p *Provider) IndexSpace(ctx context.Context, req *searchsvc.IndexSpaceRequ
p.logDocCount()
return &searchsvc.IndexSpaceResponse{}, nil
}
// logDocCount writes the number of documents reported by the index client to
// the debug log. If the count cannot be retrieved the error is logged and no
// count is reported.
func (p *Provider) logDocCount() {
	c, err := p.indexClient.DocCount()
	if err != nil {
		p.logger.Error().Err(err).Msg("error getting document count from the index")
		// Do not report a count that was never obtained.
		return
	}
	p.logger.Debug().Interface("count", c).Msg("new document count")
}
@@ -51,7 +51,7 @@ var _ = Describe("Searchprovider", func() {
},
},
Id: &sprovider.StorageSpaceId{OpaqueId: "personalspace"},
Root: &sprovider.ResourceId{OpaqueId: "personalspaceroot"},
Root: &sprovider.ResourceId{StorageId: "storageid", OpaqueId: "storageid"},
Name: "personalspace",
}
@@ -113,7 +113,7 @@ var _ = Describe("Searchprovider", func() {
Eventually(func() bool {
return called
}).Should(BeTrue())
}, "2s").Should(BeTrue())
})
It("removes an entry from the index when the file has been deleted", func() {
@@ -135,7 +135,7 @@ var _ = Describe("Searchprovider", func() {
Eventually(func() bool {
return called
}).Should(BeTrue())
}, "2s").Should(BeTrue())
})
It("indexes items when they are being restored", func() {
@@ -152,7 +152,7 @@ var _ = Describe("Searchprovider", func() {
Eventually(func() bool {
return called
}).Should(BeTrue())
}, "2s").Should(BeTrue())
})
It("indexes items when a version has been restored", func() {
@@ -169,12 +169,12 @@ var _ = Describe("Searchprovider", func() {
Eventually(func() bool {
return called
}).Should(BeTrue())
}, "2s").Should(BeTrue())
})
It("indexes items when they are being moved", func() {
called := false
indexClient.On("Move", mock.Anything, mock.MatchedBy(func(riToIndex *sprovider.ResourceInfo) bool {
indexClient.On("Move", mock.MatchedBy(func(riToIndex *sprovider.ResourceInfo) bool {
return riToIndex.Id.OpaqueId == ri.Id.OpaqueId
})).Return(nil).Run(func(args mock.Arguments) {
called = true
@@ -186,7 +186,7 @@ var _ = Describe("Searchprovider", func() {
Eventually(func() bool {
return called
}).Should(BeTrue())
}, "2s").Should(BeTrue())
})
})
@@ -220,10 +220,7 @@ var _ = Describe("Searchprovider", func() {
Context("with a personal space", func() {
BeforeEach(func() {
gwClient.On("ListStorageSpaces", mock.Anything, mock.MatchedBy(func(req *sprovider.ListStorageSpacesRequest) bool {
p := string(req.Opaque.Map["path"].Value)
return p == "/"
})).Return(&sprovider.ListStorageSpacesResponse{
gwClient.On("ListStorageSpaces", mock.Anything, mock.Anything).Return(&sprovider.ListStorageSpacesResponse{
Status: status.NewOK(ctx),
StorageSpaces: []*sprovider.StorageSpace{personalSpace},
}, nil)
@@ -278,7 +275,7 @@ var _ = Describe("Searchprovider", func() {
SpaceType: "grant",
Owner: otherUser,
Id: &sprovider.StorageSpaceId{OpaqueId: "otherspaceroot!otherspacegrant"},
Root: &sprovider.ResourceId{StorageId: "otherspaceroot", OpaqueId: "otherspacegrant"},
Root: &sprovider.ResourceId{StorageId: "otherspaceroot", OpaqueId: "otherspaceroot"},
Name: "grantspace",
}
gwClient.On("GetPath", mock.Anything, mock.Anything).Return(&sprovider.GetPathResponse{
@@ -288,10 +285,7 @@ var _ = Describe("Searchprovider", func() {
})
It("searches the received spaces (grants)", func() {
gwClient.On("ListStorageSpaces", mock.Anything, mock.MatchedBy(func(req *sprovider.ListStorageSpacesRequest) bool {
p := string(req.Opaque.Map["path"].Value)
return p == "/"
})).Return(&sprovider.ListStorageSpacesResponse{
gwClient.On("ListStorageSpaces", mock.Anything, mock.Anything).Return(&sprovider.ListStorageSpacesResponse{
Status: status.NewOK(ctx),
StorageSpaces: []*sprovider.StorageSpace{grantSpace},
}, nil)
@@ -329,20 +323,17 @@ var _ = Describe("Searchprovider", func() {
Expect(match.Entity.Ref.Path).To(Equal("./to/Shared.pdf"))
indexClient.AssertCalled(GinkgoT(), "Search", mock.Anything, mock.MatchedBy(func(req *searchsvc.SearchIndexRequest) bool {
return req.Query == "foo" && req.Ref.ResourceId.OpaqueId == grantSpace.Root.OpaqueId && req.Ref.Path == "./grant/path"
return req.Query == "foo" && req.Ref.ResourceId.StorageId == grantSpace.Root.StorageId && req.Ref.Path == "./grant/path"
}))
})
It("finds matches in both the personal space AND the grant", func() {
gwClient.On("ListStorageSpaces", mock.Anything, mock.MatchedBy(func(req *sprovider.ListStorageSpacesRequest) bool {
p := string(req.Opaque.Map["path"].Value)
return p == "/"
})).Return(&sprovider.ListStorageSpacesResponse{
gwClient.On("ListStorageSpaces", mock.Anything, mock.Anything).Return(&sprovider.ListStorageSpacesResponse{
Status: status.NewOK(ctx),
StorageSpaces: []*sprovider.StorageSpace{personalSpace, grantSpace},
}, nil)
indexClient.On("Search", mock.Anything, mock.MatchedBy(func(req *searchsvc.SearchIndexRequest) bool {
return req.Ref.ResourceId.OpaqueId == grantSpace.Root.OpaqueId
return req.Ref.ResourceId.StorageId == grantSpace.Root.StorageId
})).Return(&searchsvc.SearchIndexResponse{
Matches: []*searchmsg.Match{
{
@@ -364,7 +355,7 @@ var _ = Describe("Searchprovider", func() {
},
}, nil)
indexClient.On("Search", mock.Anything, mock.MatchedBy(func(req *searchsvc.SearchIndexRequest) bool {
return req.Ref.ResourceId.OpaqueId == personalSpace.Root.OpaqueId
return req.Ref.ResourceId.StorageId == personalSpace.Root.StorageId
})).Return(&searchsvc.SearchIndexResponse{
Matches: []*searchmsg.Match{
{