mirror of
https://github.com/opencloud-eu/opencloud.git
synced 2025-12-30 17:00:57 -06:00
[full-ci] Rescan spaces (#4777)
* Increase the timeout for indexing spaces * Allow for making queries that are not constrained to a RootID * Use nano precision for the Mtime * Add a SpaceDebouncer The debouncer can be used to delay operations on spaces until things have settled down to avoid doing the same operation multiple times. * Do not index subtrees until they have changed (i.e. the mtime differs) * Also pass a user to the space debouncer func * Trigger a rescan of the according space when an event is received * Improve wording/logging * Add changelog * Get rid of superfluous GetUserByClaim call * Fix tests * Fix reindexing triggered by shares by using the event's SpaceOwner * Bump reva to pull in the space owner events changes * Fix changelog * Fix linter issues * Fall back to the executant if no owner was received from the event * Bump reva and go-cs3apis * Fix go.sum * Bump core * Adapt expected failures * Tweak debounce settings
This commit is contained in:
@@ -1,5 +1,5 @@
|
||||
# The test runner source for API tests
|
||||
CORE_COMMITID=a4a490bbebdc625c328c816e5f3d09fcd94550a9
|
||||
CORE_COMMITID=6a3c89e917330990f99feabef3b42d1c51c0df1d
|
||||
CORE_BRANCH=master
|
||||
|
||||
# The test runner source for UI tests
|
||||
|
||||
7
changelog/unreleased/rescan-spaces.md
Normal file
7
changelog/unreleased/rescan-spaces.md
Normal file
@@ -0,0 +1,7 @@
|
||||
Bugfix: Trigger a rescan of spaces in the search index when items have changed
|
||||
|
||||
The search service now scans spaces when items have been changed. This fixes the problem
|
||||
that mtime and treesize propagation was not reflected in the search index properly.
|
||||
|
||||
https://github.com/owncloud/ocis/pull/4777
|
||||
https://github.com/owncloud/ocis/issues/4410
|
||||
4
go.mod
4
go.mod
@@ -9,8 +9,8 @@ require (
|
||||
github.com/armon/go-radix v1.0.0
|
||||
github.com/blevesearch/bleve/v2 v2.3.4
|
||||
github.com/coreos/go-oidc/v3 v3.4.0
|
||||
github.com/cs3org/go-cs3apis v0.0.0-20221005085457-19ea8088a512
|
||||
github.com/cs3org/reva/v2 v2.10.1-0.20221012104058-ae7c58b9bffa
|
||||
github.com/cs3org/go-cs3apis v0.0.0-20221012090518-ef2996678965
|
||||
github.com/cs3org/reva/v2 v2.10.1-0.20221013183308-560ba925a814
|
||||
github.com/disintegration/imaging v1.6.2
|
||||
github.com/ggwhite/go-masker v1.0.9
|
||||
github.com/go-chi/chi/v5 v5.0.7
|
||||
|
||||
8
go.sum
8
go.sum
@@ -349,10 +349,10 @@ github.com/crewjam/httperr v0.2.0 h1:b2BfXR8U3AlIHwNeFFvZ+BV1LFvKLlzMjzaTnZMybNo
|
||||
github.com/crewjam/httperr v0.2.0/go.mod h1:Jlz+Sg/XqBQhyMjdDiC+GNNRzZTD7x39Gu3pglZ5oH4=
|
||||
github.com/crewjam/saml v0.4.6 h1:XCUFPkQSJLvzyl4cW9OvpWUbRf0gE7VUpU8ZnilbeM4=
|
||||
github.com/crewjam/saml v0.4.6/go.mod h1:ZBOXnNPFzB3CgOkRm7Nd6IVdkG+l/wF+0ZXLqD96t1A=
|
||||
github.com/cs3org/go-cs3apis v0.0.0-20221005085457-19ea8088a512 h1:xTvaIsLu1ezoWOJKnV0ehgiowkOiEhMaylaI1lD/Axw=
|
||||
github.com/cs3org/go-cs3apis v0.0.0-20221005085457-19ea8088a512/go.mod h1:UXha4TguuB52H14EMoSsCqDj7k8a/t7g4gVP+bgY5LY=
|
||||
github.com/cs3org/reva/v2 v2.10.1-0.20221012104058-ae7c58b9bffa h1:DSeaakiPW5zYrGGEDO0BkSZWhqq6LS+rd1DQ1DPztJo=
|
||||
github.com/cs3org/reva/v2 v2.10.1-0.20221012104058-ae7c58b9bffa/go.mod h1:QUHLTf/ACFG2ueNP3u1dslv1bIWTTQAqvWFCorVke6o=
|
||||
github.com/cs3org/go-cs3apis v0.0.0-20221012090518-ef2996678965 h1:y4n2j68LLnvac+zw/al8MfPgO5aQiIwLmHM/JzYN8AM=
|
||||
github.com/cs3org/go-cs3apis v0.0.0-20221012090518-ef2996678965/go.mod h1:UXha4TguuB52H14EMoSsCqDj7k8a/t7g4gVP+bgY5LY=
|
||||
github.com/cs3org/reva/v2 v2.10.1-0.20221013183308-560ba925a814 h1:/IpXuGNX01f2eKM5mSMJFoE+DGq4NULo0WwpO3LzmTg=
|
||||
github.com/cs3org/reva/v2 v2.10.1-0.20221013183308-560ba925a814/go.mod h1:lq+LRpBDYU1vHUmJDeK7sGquREciO8GDj5/SYIibMPY=
|
||||
github.com/cubewise-code/go-mime v0.0.0-20200519001935-8c5762b177d8 h1:Z9lwXumT5ACSmJ7WGnFl+OMLLjpz5uR2fyz7dC255FI=
|
||||
github.com/cubewise-code/go-mime v0.0.0-20200519001935-8c5762b177d8/go.mod h1:4abs/jPXcmJzYoYGF91JF9Uq9s/KL5n1jvFDix8KcqY=
|
||||
github.com/cyberdelia/templates v0.0.0-20141128023046-ca7fffd4298c/go.mod h1:GyV+0YP4qX0UQ7r2MoYZ+AvYDp12OF5yg4q8rGnyNh4=
|
||||
|
||||
@@ -3,9 +3,12 @@ package command
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/urfave/cli/v2"
|
||||
"go-micro.dev/v4/client"
|
||||
|
||||
"github.com/owncloud/ocis/v2/ocis-pkg/config/configlog"
|
||||
"github.com/owncloud/ocis/v2/ocis-pkg/service/grpc"
|
||||
searchsvc "github.com/owncloud/ocis/v2/protogen/gen/ocis/services/search/v0"
|
||||
"github.com/owncloud/ocis/v2/services/search/pkg/config"
|
||||
@@ -30,18 +33,20 @@ func Index(cfg *config.Config) *cli.Command {
|
||||
Name: "user",
|
||||
Aliases: []string{"u"},
|
||||
Required: true,
|
||||
Usage: "the username of the user tha shall be used to access the files",
|
||||
Usage: "the username of the user that shall be used to access the files",
|
||||
},
|
||||
},
|
||||
Before: func(c *cli.Context) error {
|
||||
return parser.ParseConfig(cfg)
|
||||
return configlog.ReturnFatal(parser.ParseConfig(cfg))
|
||||
},
|
||||
Action: func(c *cli.Context) error {
|
||||
client := searchsvc.NewSearchProviderService("com.owncloud.api.search", grpc.DefaultClient())
|
||||
_, err := client.IndexSpace(context.Background(), &searchsvc.IndexSpaceRequest{
|
||||
SpaceId: c.String("space"),
|
||||
UserId: c.String("user"),
|
||||
})
|
||||
Action: func(ctx *cli.Context) error {
|
||||
grpcClient := grpc.DefaultClient()
|
||||
grpcClient.Options()
|
||||
c := searchsvc.NewSearchProviderService("com.owncloud.api.search", grpcClient)
|
||||
_, err := c.IndexSpace(context.Background(), &searchsvc.IndexSpaceRequest{
|
||||
SpaceId: ctx.String("space"),
|
||||
UserId: ctx.String("user"),
|
||||
}, func(opts *client.CallOptions) { opts.RequestTimeout = 10 * time.Minute })
|
||||
if err != nil {
|
||||
fmt.Println("failed to index space: " + err.Error())
|
||||
return err
|
||||
|
||||
@@ -222,13 +222,18 @@ func (i *Index) Search(ctx context.Context, req *searchsvc.SearchIndexRequest) (
|
||||
query := bleve.NewConjunctionQuery(
|
||||
bleve.NewQueryStringQuery(req.Query),
|
||||
deletedQuery, // Skip documents that have been marked as deleted
|
||||
bleve.NewQueryStringQuery("RootID:"+idToBleveId(&sprovider.ResourceId{
|
||||
StorageId: req.Ref.GetResourceId().GetStorageId(),
|
||||
SpaceId: req.Ref.GetResourceId().GetSpaceId(),
|
||||
OpaqueId: req.Ref.GetResourceId().GetOpaqueId(),
|
||||
})), // Limit search to the space
|
||||
bleve.NewQueryStringQuery("Path:"+queryEscape(utils.MakeRelativePath(path.Join(req.Ref.Path, "/"))+"*")), // Limit search to this directory in the space
|
||||
)
|
||||
if req.Ref != nil {
|
||||
query = bleve.NewConjunctionQuery(
|
||||
query,
|
||||
bleve.NewQueryStringQuery("RootID:"+idToBleveId(&sprovider.ResourceId{
|
||||
StorageId: req.Ref.GetResourceId().GetStorageId(),
|
||||
SpaceId: req.Ref.GetResourceId().GetSpaceId(),
|
||||
OpaqueId: req.Ref.GetResourceId().GetOpaqueId(),
|
||||
})), // Limit search to the space
|
||||
bleve.NewQueryStringQuery("Path:"+queryEscape(utils.MakeRelativePath(path.Join(req.Ref.Path, "/"))+"*")), // Limit search to this directory in the space
|
||||
)
|
||||
}
|
||||
bleveReq := bleve.NewSearchRequest(query)
|
||||
bleveReq.Size = 200
|
||||
if req.PageSize > 0 {
|
||||
@@ -295,7 +300,7 @@ func toEntity(ref *sprovider.Reference, ri *sprovider.ResourceInfo) *indexDocume
|
||||
}
|
||||
|
||||
if ri.Mtime != nil {
|
||||
doc.Mtime = time.Unix(int64(ri.Mtime.Seconds), int64(ri.Mtime.Nanos)).UTC().Format(time.RFC3339)
|
||||
doc.Mtime = time.Unix(int64(ri.Mtime.Seconds), int64(ri.Mtime.Nanos)).UTC().Format(time.RFC3339Nano)
|
||||
}
|
||||
|
||||
return doc
|
||||
@@ -350,7 +355,7 @@ func fromDocumentMatch(hit *search.DocumentMatch) (*searchmsg.Match, error) {
|
||||
match.Entity.ParentId = resourceIDtoSearchID(parentID)
|
||||
}
|
||||
|
||||
if mtime, err := time.Parse(time.RFC3339, hit.Fields["Mtime"].(string)); err == nil {
|
||||
if mtime, err := time.Parse(time.RFC3339Nano, hit.Fields["Mtime"].(string)); err == nil {
|
||||
match.Entity.LastModifiedTime = ×tamppb.Timestamp{Seconds: mtime.Unix(), Nanos: int32(mtime.Nanosecond())}
|
||||
}
|
||||
|
||||
|
||||
@@ -2,6 +2,8 @@ package provider
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
gateway "github.com/cs3org/go-cs3apis/cs3/gateway/v1beta1"
|
||||
user "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1"
|
||||
@@ -11,12 +13,43 @@ import (
|
||||
ctxpkg "github.com/cs3org/reva/v2/pkg/ctx"
|
||||
"github.com/cs3org/reva/v2/pkg/errtypes"
|
||||
"github.com/cs3org/reva/v2/pkg/events"
|
||||
"github.com/cs3org/reva/v2/pkg/storagespace"
|
||||
"google.golang.org/grpc/metadata"
|
||||
)
|
||||
|
||||
// SpaceDebouncer debounces operations on spaces for a configurable amount of time
|
||||
type SpaceDebouncer struct {
|
||||
after time.Duration
|
||||
f func(id *provider.StorageSpaceId, userID *user.UserId)
|
||||
pending map[string]*time.Timer
|
||||
|
||||
mutex sync.Mutex
|
||||
}
|
||||
|
||||
// NewSpaceDebouncer returns a new SpaceDebouncer instance
|
||||
func NewSpaceDebouncer(d time.Duration, f func(id *provider.StorageSpaceId, userID *user.UserId)) *SpaceDebouncer {
|
||||
return &SpaceDebouncer{
|
||||
after: d,
|
||||
f: f,
|
||||
pending: map[string]*time.Timer{},
|
||||
}
|
||||
}
|
||||
|
||||
// Debounce restars the debounce timer for the given space
|
||||
func (d *SpaceDebouncer) Debounce(id *provider.StorageSpaceId, userID *user.UserId) {
|
||||
d.mutex.Lock()
|
||||
defer d.mutex.Unlock()
|
||||
|
||||
if t := d.pending[id.OpaqueId]; t != nil {
|
||||
t.Stop()
|
||||
}
|
||||
|
||||
d.pending[id.OpaqueId] = time.AfterFunc(d.after, func() {
|
||||
d.f(id, userID)
|
||||
})
|
||||
}
|
||||
|
||||
func (p *Provider) handleEvent(ev interface{}) {
|
||||
var ref *provider.Reference
|
||||
var owner *user.User
|
||||
switch e := ev.(type) {
|
||||
case events.ItemTrashed:
|
||||
p.logger.Debug().Interface("event", ev).Msg("marking document as deleted")
|
||||
@@ -24,11 +57,10 @@ func (p *Provider) handleEvent(ev interface{}) {
|
||||
if err != nil {
|
||||
p.logger.Error().Err(err).Interface("Id", e.ID).Msg("failed to remove item from index")
|
||||
}
|
||||
return
|
||||
p.reindexSpace(ev, e.Ref, e.Executant, e.SpaceOwner)
|
||||
case events.ItemRestored:
|
||||
p.logger.Debug().Interface("event", ev).Msg("marking document as restored")
|
||||
ref = e.Ref
|
||||
owner = &user.User{
|
||||
owner := &user.User{
|
||||
Id: e.Executant,
|
||||
}
|
||||
|
||||
@@ -36,13 +68,13 @@ func (p *Provider) handleEvent(ev interface{}) {
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
statRes, err := p.statResource(ownerCtx, ref, owner)
|
||||
statRes, err := p.statResource(ownerCtx, e.Ref, owner)
|
||||
if err != nil {
|
||||
p.logger.Error().Err(err).
|
||||
Str("storageid", ref.GetResourceId().GetStorageId()).
|
||||
Str("spaceid", ref.GetResourceId().GetSpaceId()).
|
||||
Str("opaqueid", ref.GetResourceId().GetOpaqueId()).
|
||||
Str("path", ref.GetPath()).
|
||||
Str("storageid", e.Ref.GetResourceId().GetStorageId()).
|
||||
Str("spaceid", e.Ref.GetResourceId().GetSpaceId()).
|
||||
Str("opaqueid", e.Ref.GetResourceId().GetOpaqueId()).
|
||||
Str("path", e.Ref.GetPath()).
|
||||
Msg("failed to make stat call for the restored resource")
|
||||
return
|
||||
}
|
||||
@@ -52,26 +84,24 @@ func (p *Provider) handleEvent(ev interface{}) {
|
||||
err = p.indexClient.Restore(statRes.Info.Id)
|
||||
if err != nil {
|
||||
p.logger.Error().Err(err).
|
||||
Str("storageid", ref.GetResourceId().GetStorageId()).
|
||||
Str("spaceid", ref.GetResourceId().GetSpaceId()).
|
||||
Str("opaqueid", ref.GetResourceId().GetOpaqueId()).
|
||||
Str("path", ref.GetPath()).
|
||||
Str("storageid", e.Ref.GetResourceId().GetStorageId()).
|
||||
Str("spaceid", e.Ref.GetResourceId().GetSpaceId()).
|
||||
Str("opaqueid", e.Ref.GetResourceId().GetOpaqueId()).
|
||||
Str("path", e.Ref.GetPath()).
|
||||
Msg("failed to restore the changed resource in the index")
|
||||
}
|
||||
default:
|
||||
p.logger.Error().Interface("statRes", statRes).
|
||||
Str("storageid", ref.GetResourceId().GetStorageId()).
|
||||
Str("spaceid", ref.GetResourceId().GetSpaceId()).
|
||||
Str("opaqueid", ref.GetResourceId().GetOpaqueId()).
|
||||
Str("path", ref.GetPath()).
|
||||
Str("storageid", e.Ref.GetResourceId().GetStorageId()).
|
||||
Str("spaceid", e.Ref.GetResourceId().GetSpaceId()).
|
||||
Str("opaqueid", e.Ref.GetResourceId().GetOpaqueId()).
|
||||
Str("path", e.Ref.GetPath()).
|
||||
Msg("failed to stat the restored resource")
|
||||
}
|
||||
|
||||
return
|
||||
p.reindexSpace(ev, e.Ref, e.Executant, e.SpaceOwner)
|
||||
case events.ItemMoved:
|
||||
p.logger.Debug().Interface("event", ev).Msg("resource has been moved, updating the document")
|
||||
ref = e.Ref
|
||||
owner = &user.User{
|
||||
owner := &user.User{
|
||||
Id: e.Executant,
|
||||
}
|
||||
|
||||
@@ -79,7 +109,7 @@ func (p *Provider) handleEvent(ev interface{}) {
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
statRes, err := p.statResource(ownerCtx, ref, owner)
|
||||
statRes, err := p.statResource(ownerCtx, e.Ref, owner)
|
||||
if err != nil {
|
||||
p.logger.Error().Err(err).Msg("failed to stat the moved resource")
|
||||
return
|
||||
@@ -91,11 +121,11 @@ func (p *Provider) handleEvent(ev interface{}) {
|
||||
|
||||
gpRes, err := p.getPath(ownerCtx, statRes.Info.Id, owner)
|
||||
if err != nil {
|
||||
p.logger.Error().Err(err).Interface("ref", ref).Msg("failed to get path for moved resource")
|
||||
p.logger.Error().Err(err).Interface("ref", e.Ref).Msg("failed to get path for moved resource")
|
||||
return
|
||||
}
|
||||
if gpRes.Status.Code != rpcv1beta1.Code_CODE_OK {
|
||||
p.logger.Error().Interface("status", gpRes.Status).Interface("ref", ref).Msg("failed to get path for moved resource")
|
||||
p.logger.Error().Interface("status", gpRes.Status).Interface("ref", e.Ref).Msg("failed to get path for moved resource")
|
||||
return
|
||||
}
|
||||
|
||||
@@ -103,68 +133,34 @@ func (p *Provider) handleEvent(ev interface{}) {
|
||||
if err != nil {
|
||||
p.logger.Error().Err(err).Msg("failed to move the changed resource in the index")
|
||||
}
|
||||
return
|
||||
p.reindexSpace(ev, e.Ref, e.Executant, e.SpaceOwner)
|
||||
case events.ContainerCreated:
|
||||
ref = e.Ref
|
||||
owner = &user.User{
|
||||
Id: e.Executant,
|
||||
}
|
||||
p.reindexSpace(ev, e.Ref, e.Executant, e.SpaceOwner)
|
||||
case events.FileUploaded:
|
||||
ref = e.Ref
|
||||
owner = &user.User{
|
||||
Id: e.Executant,
|
||||
}
|
||||
p.reindexSpace(ev, e.Ref, e.Executant, e.SpaceOwner)
|
||||
case events.FileTouched:
|
||||
ref = e.Ref
|
||||
owner = &user.User{
|
||||
Id: e.Executant,
|
||||
}
|
||||
p.reindexSpace(ev, e.Ref, e.Executant, e.SpaceOwner)
|
||||
case events.FileVersionRestored:
|
||||
ref = e.Ref
|
||||
owner = &user.User{
|
||||
Id: e.Executant,
|
||||
}
|
||||
p.reindexSpace(ev, e.Ref, e.Executant, e.SpaceOwner)
|
||||
default:
|
||||
// Not sure what to do here. Skip.
|
||||
return
|
||||
}
|
||||
p.logger.Debug().Interface("event", ev).Msg("resource has been changed, updating the document")
|
||||
}
|
||||
|
||||
ownerCtx, err := p.getAuthContext(owner)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
func (p *Provider) reindexSpace(ev interface{}, ref *provider.Reference, executant, owner *user.UserId) {
|
||||
p.logger.Debug().Interface("event", ev).Msg("resource has been changed, scheduling a space resync")
|
||||
|
||||
statRes, err := p.statResource(ownerCtx, ref, owner)
|
||||
if err != nil {
|
||||
p.logger.Error().Err(err).
|
||||
Str("storageid", ref.GetResourceId().GetStorageId()).
|
||||
Str("spaceid", ref.GetResourceId().GetSpaceId()).
|
||||
Str("opaqueid", ref.GetResourceId().GetOpaqueId()).
|
||||
Str("path", ref.GetPath()).
|
||||
Msg("failed to make stat call for changed resource")
|
||||
return
|
||||
spaceID := &provider.StorageSpaceId{
|
||||
OpaqueId: storagespace.FormatResourceID(provider.ResourceId{
|
||||
StorageId: ref.GetResourceId().GetStorageId(),
|
||||
SpaceId: ref.GetResourceId().GetSpaceId(),
|
||||
}),
|
||||
}
|
||||
if statRes.Status.Code != rpc.Code_CODE_OK {
|
||||
p.logger.Error().Interface("statRes", statRes).
|
||||
Str("storageid", ref.GetResourceId().GetStorageId()).
|
||||
Str("spaceid", ref.GetResourceId().GetSpaceId()).
|
||||
Str("opaqueid", ref.GetResourceId().GetOpaqueId()).
|
||||
Str("path", ref.GetPath()).
|
||||
Msg("failed to stat the changed resource")
|
||||
return
|
||||
}
|
||||
|
||||
ref, err = p.resolveReference(ownerCtx, ref, statRes.Info)
|
||||
if err != nil {
|
||||
p.logger.Error().Err(err).Msg("error resolving reference")
|
||||
return
|
||||
}
|
||||
err = p.indexClient.Add(ref, statRes.Info)
|
||||
if err != nil {
|
||||
p.logger.Error().Err(err).Msg("error adding updating the resource in the index")
|
||||
if owner != nil {
|
||||
p.indexSpaceDebouncer.Debounce(spaceID, owner)
|
||||
} else {
|
||||
p.logDocCount()
|
||||
p.indexSpaceDebouncer.Debounce(spaceID, executant)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -2,16 +2,19 @@ package provider_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
. "github.com/onsi/ginkgo/v2"
|
||||
. "github.com/onsi/gomega"
|
||||
"github.com/stretchr/testify/mock"
|
||||
|
||||
gateway "github.com/cs3org/go-cs3apis/cs3/gateway/v1beta1"
|
||||
user "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1"
|
||||
userv1beta1 "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1"
|
||||
sprovider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
|
||||
"github.com/cs3org/reva/v2/pkg/events"
|
||||
"github.com/cs3org/reva/v2/pkg/rgrpc/status"
|
||||
"github.com/cs3org/reva/v2/pkg/utils"
|
||||
cs3mocks "github.com/cs3org/reva/v2/tests/cs3mocks/mocks"
|
||||
"github.com/owncloud/ocis/v2/ocis-pkg/log"
|
||||
"github.com/owncloud/ocis/v2/services/search/pkg/search/mocks"
|
||||
@@ -20,9 +23,10 @@ import (
|
||||
|
||||
var _ = Describe("Searchprovider", func() {
|
||||
var (
|
||||
p *provider.Provider
|
||||
gwClient *cs3mocks.GatewayAPIClient
|
||||
indexClient *mocks.IndexClient
|
||||
p *provider.Provider
|
||||
gwClient *cs3mocks.GatewayAPIClient
|
||||
indexClient *mocks.IndexClient
|
||||
debouncedIndexCalls int
|
||||
|
||||
ctx context.Context
|
||||
eventsChan chan interface{}
|
||||
@@ -48,8 +52,9 @@ var _ = Describe("Searchprovider", func() {
|
||||
SpaceId: "rootopaqueid",
|
||||
OpaqueId: "opaqueid",
|
||||
},
|
||||
Path: "foo.pdf",
|
||||
Size: 12345,
|
||||
Path: "foo.pdf",
|
||||
Size: 12345,
|
||||
Mtime: utils.TimeToTS(time.Now().Add(-time.Hour)),
|
||||
}
|
||||
)
|
||||
|
||||
@@ -59,7 +64,12 @@ var _ = Describe("Searchprovider", func() {
|
||||
gwClient = &cs3mocks.GatewayAPIClient{}
|
||||
indexClient = &mocks.IndexClient{}
|
||||
|
||||
p = provider.New(gwClient, indexClient, "", eventsChan, logger)
|
||||
debouncedIndexCalls = 0
|
||||
debouncer := provider.NewSpaceDebouncer(100*time.Millisecond, func(id *sprovider.StorageSpaceId, userID *userv1beta1.UserId) {
|
||||
debouncedIndexCalls += 1
|
||||
})
|
||||
|
||||
p = provider.NewWithDebouncer(gwClient, indexClient, "", eventsChan, logger, debouncer)
|
||||
|
||||
gwClient.On("Authenticate", mock.Anything, mock.Anything).Return(&gateway.AuthenticateResponse{
|
||||
Status: status.NewOK(ctx),
|
||||
@@ -91,37 +101,25 @@ var _ = Describe("Searchprovider", func() {
|
||||
})
|
||||
|
||||
It("triggers an index update when a file has been uploaded", func() {
|
||||
called := false
|
||||
indexClient.On("Add", mock.Anything, mock.MatchedBy(func(riToIndex *sprovider.ResourceInfo) bool {
|
||||
return riToIndex.Id.OpaqueId == ri.Id.OpaqueId
|
||||
})).Return(nil).Run(func(args mock.Arguments) {
|
||||
called = true
|
||||
})
|
||||
eventsChan <- events.FileUploaded{
|
||||
Ref: ref,
|
||||
Executant: user.Id,
|
||||
}
|
||||
|
||||
Eventually(func() bool {
|
||||
return called
|
||||
}, "2s").Should(BeTrue())
|
||||
Eventually(func() int {
|
||||
return debouncedIndexCalls
|
||||
}, "2s").Should(Equal(1))
|
||||
})
|
||||
|
||||
It("triggers an index update when a file has been touched", func() {
|
||||
called := false
|
||||
indexClient.On("Add", mock.Anything, mock.MatchedBy(func(riToIndex *sprovider.ResourceInfo) bool {
|
||||
return riToIndex.Id.OpaqueId == ri.Id.OpaqueId
|
||||
})).Return(nil).Run(func(args mock.Arguments) {
|
||||
called = true
|
||||
})
|
||||
eventsChan <- events.FileTouched{
|
||||
Ref: ref,
|
||||
Executant: user.Id,
|
||||
}
|
||||
|
||||
Eventually(func() bool {
|
||||
return called
|
||||
}, "2s").Should(BeTrue())
|
||||
Eventually(func() int {
|
||||
return debouncedIndexCalls
|
||||
}, "2s").Should(Equal(1))
|
||||
})
|
||||
|
||||
It("removes an entry from the index when the file has been deleted", func() {
|
||||
@@ -163,20 +161,14 @@ var _ = Describe("Searchprovider", func() {
|
||||
})
|
||||
|
||||
It("indexes items when a version has been restored", func() {
|
||||
called := false
|
||||
indexClient.On("Add", mock.Anything, mock.MatchedBy(func(riToIndex *sprovider.ResourceInfo) bool {
|
||||
return riToIndex.Id.OpaqueId == ri.Id.OpaqueId
|
||||
})).Return(nil).Run(func(args mock.Arguments) {
|
||||
called = true
|
||||
})
|
||||
eventsChan <- events.FileVersionRestored{
|
||||
Ref: ref,
|
||||
Executant: user.Id,
|
||||
}
|
||||
|
||||
Eventually(func() bool {
|
||||
return called
|
||||
}, "2s").Should(BeTrue())
|
||||
Eventually(func() int {
|
||||
return debouncedIndexCalls
|
||||
}, "2s").Should(Equal(1))
|
||||
})
|
||||
})
|
||||
|
||||
@@ -203,3 +195,50 @@ var _ = Describe("Searchprovider", func() {
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
var _ = Describe("SpaceDebouncer", func() {
|
||||
var (
|
||||
debouncer *provider.SpaceDebouncer
|
||||
callCount map[string]int
|
||||
|
||||
userId = &user.UserId{
|
||||
OpaqueId: "user",
|
||||
}
|
||||
)
|
||||
|
||||
BeforeEach(func() {
|
||||
callCount = map[string]int{}
|
||||
debouncer = provider.NewSpaceDebouncer(50*time.Millisecond, func(id *sprovider.StorageSpaceId, _ *user.UserId) {
|
||||
callCount[id.OpaqueId] += 1
|
||||
})
|
||||
})
|
||||
|
||||
It("debounces", func() {
|
||||
spaceid := &sprovider.StorageSpaceId{
|
||||
OpaqueId: "spaceid",
|
||||
}
|
||||
debouncer.Debounce(spaceid, userId)
|
||||
debouncer.Debounce(spaceid, userId)
|
||||
debouncer.Debounce(spaceid, userId)
|
||||
Eventually(func() int {
|
||||
return callCount["spaceid"]
|
||||
}, "200ms").Should(Equal(1))
|
||||
})
|
||||
|
||||
It("works multiple times", func() {
|
||||
spaceid := &sprovider.StorageSpaceId{
|
||||
OpaqueId: "spaceid",
|
||||
}
|
||||
debouncer.Debounce(spaceid, userId)
|
||||
debouncer.Debounce(spaceid, userId)
|
||||
debouncer.Debounce(spaceid, userId)
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
|
||||
debouncer.Debounce(spaceid, userId)
|
||||
debouncer.Debounce(spaceid, userId)
|
||||
|
||||
Eventually(func() int {
|
||||
return callCount["spaceid"]
|
||||
}, "200ms").Should(Equal(2))
|
||||
})
|
||||
})
|
||||
|
||||
@@ -8,6 +8,8 @@ import (
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"google.golang.org/grpc/metadata"
|
||||
|
||||
gateway "github.com/cs3org/go-cs3apis/cs3/gateway/v1beta1"
|
||||
user "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1"
|
||||
rpc "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1"
|
||||
@@ -22,7 +24,6 @@ import (
|
||||
"github.com/cs3org/reva/v2/pkg/utils"
|
||||
"github.com/owncloud/ocis/v2/ocis-pkg/log"
|
||||
"github.com/owncloud/ocis/v2/services/search/pkg/search"
|
||||
"google.golang.org/grpc/metadata"
|
||||
|
||||
searchmsg "github.com/owncloud/ocis/v2/protogen/gen/ocis/messages/search/v0"
|
||||
searchsvc "github.com/owncloud/ocis/v2/protogen/gen/ocis/services/search/v0"
|
||||
@@ -62,6 +63,8 @@ type Provider struct {
|
||||
gwClient gateway.GatewayAPIClient
|
||||
indexClient search.IndexClient
|
||||
machineAuthAPIKey string
|
||||
|
||||
indexSpaceDebouncer *SpaceDebouncer
|
||||
}
|
||||
|
||||
type MatchArray []*searchmsg.Match
|
||||
@@ -84,6 +87,13 @@ func New(gwClient gateway.GatewayAPIClient, indexClient search.IndexClient, mach
|
||||
logger: logger,
|
||||
}
|
||||
|
||||
p.indexSpaceDebouncer = NewSpaceDebouncer(50*time.Millisecond, func(id *provider.StorageSpaceId, userID *user.UserId) {
|
||||
err := p.doIndexSpace(context.Background(), id, userID)
|
||||
if err != nil {
|
||||
p.logger.Error().Err(err).Interface("spaceID", id).Interface("userID", userID).Msg("error while indexing a space")
|
||||
}
|
||||
})
|
||||
|
||||
go func() {
|
||||
for {
|
||||
ev := <-eventsChan
|
||||
@@ -97,6 +107,13 @@ func New(gwClient gateway.GatewayAPIClient, indexClient search.IndexClient, mach
|
||||
return p
|
||||
}
|
||||
|
||||
// NewWithDebouncer returns a new provider with a customer index space debouncer
|
||||
func NewWithDebouncer(gwClient gateway.GatewayAPIClient, indexClient search.IndexClient, machineAuthAPIKey string, eventsChan <-chan interface{}, logger log.Logger, debouncer *SpaceDebouncer) *Provider {
|
||||
p := New(gwClient, indexClient, machineAuthAPIKey, eventsChan, logger)
|
||||
p.indexSpaceDebouncer = debouncer
|
||||
return p
|
||||
}
|
||||
|
||||
func (p *Provider) Search(ctx context.Context, req *searchsvc.SearchRequest) (*searchsvc.SearchResponse, error) {
|
||||
if req.Query == "" {
|
||||
return nil, errtypes.BadRequest("empty query provided")
|
||||
@@ -241,52 +258,70 @@ func (p *Provider) Search(ctx context.Context, req *searchsvc.SearchRequest) (*s
|
||||
}
|
||||
|
||||
func (p *Provider) IndexSpace(ctx context.Context, req *searchsvc.IndexSpaceRequest) (*searchsvc.IndexSpaceResponse, error) {
|
||||
// get user
|
||||
res, err := p.gwClient.GetUserByClaim(context.Background(), &user.GetUserByClaimRequest{
|
||||
Claim: "username",
|
||||
Value: req.UserId,
|
||||
})
|
||||
if err != nil || res.Status.Code != rpc.Code_CODE_OK {
|
||||
fmt.Println("error: Could not get user by userid")
|
||||
err := p.doIndexSpace(ctx, &provider.StorageSpaceId{OpaqueId: req.SpaceId}, &user.UserId{OpaqueId: req.UserId})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &searchsvc.IndexSpaceResponse{}, nil
|
||||
}
|
||||
|
||||
// Get auth context
|
||||
ownerCtx := ctxpkg.ContextSetUser(context.Background(), res.User)
|
||||
authRes, err := p.gwClient.Authenticate(ownerCtx, &gateway.AuthenticateRequest{
|
||||
func (p *Provider) doIndexSpace(ctx context.Context, spaceID *provider.StorageSpaceId, userID *user.UserId) error {
|
||||
authRes, err := p.gwClient.Authenticate(ctx, &gateway.AuthenticateRequest{
|
||||
Type: "machine",
|
||||
ClientId: "userid:" + res.User.Id.OpaqueId,
|
||||
ClientId: "userid:" + userID.OpaqueId,
|
||||
ClientSecret: p.machineAuthAPIKey,
|
||||
})
|
||||
if err != nil || authRes.GetStatus().GetCode() != rpc.Code_CODE_OK {
|
||||
return nil, err
|
||||
return err
|
||||
}
|
||||
|
||||
if authRes.GetStatus().GetCode() != rpc.Code_CODE_OK {
|
||||
return nil, fmt.Errorf("could not get authenticated context for user")
|
||||
return fmt.Errorf("could not get authenticated context for user")
|
||||
}
|
||||
ownerCtx = metadata.AppendToOutgoingContext(ownerCtx, ctxpkg.TokenHeader, authRes.Token)
|
||||
ownerCtx := metadata.AppendToOutgoingContext(ctx, ctxpkg.TokenHeader, authRes.Token)
|
||||
|
||||
// Walk the space and index all files
|
||||
walker := walker.NewWalker(p.gwClient)
|
||||
rootId, err := storagespace.ParseID(req.SpaceId)
|
||||
rootID, err := storagespace.ParseID(spaceID.OpaqueId)
|
||||
if err != nil {
|
||||
p.logger.Error().Err(err).Msg(err.Error())
|
||||
return nil, err
|
||||
p.logger.Error().Err(err).Msg("invalid space id")
|
||||
return err
|
||||
}
|
||||
err = walker.Walk(ownerCtx, &rootId, func(wd string, info *provider.ResourceInfo, err error) error {
|
||||
if rootID.StorageId == "" || rootID.SpaceId == "" {
|
||||
p.logger.Error().Err(err).Msg("invalid space id")
|
||||
return fmt.Errorf("invalid space id")
|
||||
}
|
||||
rootID.OpaqueId = rootID.SpaceId
|
||||
|
||||
err = walker.Walk(ownerCtx, &rootID, func(wd string, info *provider.ResourceInfo, err error) error {
|
||||
if err != nil {
|
||||
p.logger.Error().Err(err).Msg("error walking the tree")
|
||||
return err
|
||||
}
|
||||
ref := &provider.Reference{
|
||||
Path: utils.MakeRelativePath(filepath.Join(wd, info.Path)),
|
||||
ResourceId: &rootId,
|
||||
}
|
||||
ref, err = p.resolveReference(ownerCtx, ref, info)
|
||||
if err != nil {
|
||||
p.logger.Error().Err(err).Msg("error resolving reference")
|
||||
|
||||
if info == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
ref := &provider.Reference{
|
||||
Path: utils.MakeRelativePath(filepath.Join(wd, info.Path)),
|
||||
ResourceId: &rootID,
|
||||
}
|
||||
p.logger.Debug().Str("path", ref.Path).Msg("Walking tree")
|
||||
|
||||
// Has this item/subtree changed?
|
||||
searchRes, err := p.indexClient.Search(ownerCtx, &searchsvc.SearchIndexRequest{
|
||||
Query: "+ID:" + storagespace.FormatResourceID(*info.Id) + ` +Mtime:>="` + utils.TSToTime(info.Mtime).Format(time.RFC3339Nano) + `"`,
|
||||
})
|
||||
if err == nil && len(searchRes.Matches) >= 1 {
|
||||
if info.Type == provider.ResourceType_RESOURCE_TYPE_CONTAINER {
|
||||
p.logger.Debug().Str("path", ref.Path).Msg("subtree hasn't changed. Skipping.")
|
||||
return filepath.SkipDir
|
||||
}
|
||||
p.logger.Debug().Str("path", ref.Path).Msg("element hasn't changed. Skipping.")
|
||||
return nil
|
||||
}
|
||||
|
||||
err = p.indexClient.Add(ref, info)
|
||||
if err != nil {
|
||||
p.logger.Error().Err(err).Msg("error adding resource to the index")
|
||||
@@ -296,32 +331,11 @@ func (p *Provider) IndexSpace(ctx context.Context, req *searchsvc.IndexSpaceRequ
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return err
|
||||
}
|
||||
|
||||
p.logDocCount()
|
||||
return &searchsvc.IndexSpaceResponse{}, nil
|
||||
}
|
||||
|
||||
func (p *Provider) resolveReference(ctx context.Context, ref *provider.Reference, ri *provider.ResourceInfo) (*provider.Reference, error) {
|
||||
if ref.GetResourceId().GetOpaqueId() == ref.GetResourceId().GetSpaceId() {
|
||||
return ref, nil
|
||||
}
|
||||
|
||||
gpRes, err := p.gwClient.GetPath(ctx, &provider.GetPathRequest{
|
||||
ResourceId: ri.Id,
|
||||
})
|
||||
if err != nil || gpRes.Status.Code != rpcv1beta1.Code_CODE_OK {
|
||||
return nil, err
|
||||
}
|
||||
return &provider.Reference{
|
||||
ResourceId: &provider.ResourceId{
|
||||
StorageId: ref.GetResourceId().GetStorageId(),
|
||||
SpaceId: ref.GetResourceId().GetSpaceId(),
|
||||
OpaqueId: ref.GetResourceId().GetSpaceId(),
|
||||
},
|
||||
Path: utils.MakeRelativePath(gpRes.Path),
|
||||
}, nil
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *Provider) logDocCount() {
|
||||
|
||||
@@ -2,6 +2,7 @@ package provider_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
. "github.com/onsi/ginkgo/v2"
|
||||
. "github.com/onsi/gomega"
|
||||
@@ -12,6 +13,7 @@ import (
|
||||
sprovider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
|
||||
typesv1beta1 "github.com/cs3org/go-cs3apis/cs3/types/v1beta1"
|
||||
"github.com/cs3org/reva/v2/pkg/rgrpc/status"
|
||||
"github.com/cs3org/reva/v2/pkg/utils"
|
||||
cs3mocks "github.com/cs3org/reva/v2/tests/cs3mocks/mocks"
|
||||
"github.com/owncloud/ocis/v2/ocis-pkg/log"
|
||||
searchmsg "github.com/owncloud/ocis/v2/protogen/gen/ocis/messages/search/v0"
|
||||
@@ -59,8 +61,9 @@ var _ = Describe("Searchprovider", func() {
|
||||
StorageId: "storageid",
|
||||
OpaqueId: "opaqueid",
|
||||
},
|
||||
Path: "foo.pdf",
|
||||
Size: 12345,
|
||||
Path: "foo.pdf",
|
||||
Size: 12345,
|
||||
Mtime: utils.TimeToTS(time.Now().Add(-time.Hour)),
|
||||
}
|
||||
)
|
||||
|
||||
@@ -105,6 +108,7 @@ var _ = Describe("Searchprovider", func() {
|
||||
indexClient.On("Add", mock.Anything, mock.MatchedBy(func(riToIndex *sprovider.ResourceInfo) bool {
|
||||
return riToIndex.Id.OpaqueId == ri.Id.OpaqueId
|
||||
})).Return(nil)
|
||||
indexClient.On("Search", mock.Anything, mock.Anything).Return(&searchsvc.SearchIndexResponse{}, nil)
|
||||
|
||||
res, err := p.IndexSpace(ctx, &searchsvc.IndexSpaceRequest{
|
||||
SpaceId: "storageid$spaceid!spaceid",
|
||||
|
||||
@@ -566,8 +566,6 @@ _ocdav: api compatibility, return correct status code_
|
||||
|
||||
- [apiAuthWebDav/webDavSpecialURLs.feature:13](https://github.com/owncloud/core/blob/master/tests/acceptance/features/apiAuthWebDav/webDavSpecialURLs.feature#L13)
|
||||
- [apiAuthWebDav/webDavSpecialURLs.feature:24](https://github.com/owncloud/core/blob/master/tests/acceptance/features/apiAuthWebDav/webDavSpecialURLs.feature#L24)
|
||||
- [apiAuthWebDav/webDavSpecialURLs.feature:34](https://github.com/owncloud/core/blob/master/tests/acceptance/features/apiAuthWebDav/webDavSpecialURLs.feature#L34)
|
||||
- [apiAuthWebDav/webDavSpecialURLs.feature:45](https://github.com/owncloud/core/blob/master/tests/acceptance/features/apiAuthWebDav/webDavSpecialURLs.feature#L45)
|
||||
- [apiAuthWebDav/webDavSpecialURLs.feature:55](https://github.com/owncloud/core/blob/master/tests/acceptance/features/apiAuthWebDav/webDavSpecialURLs.feature#L55)
|
||||
- [apiAuthWebDav/webDavSpecialURLs.feature:66](https://github.com/owncloud/core/blob/master/tests/acceptance/features/apiAuthWebDav/webDavSpecialURLs.feature#L66)
|
||||
- [apiAuthWebDav/webDavSpecialURLs.feature:76](https://github.com/owncloud/core/blob/master/tests/acceptance/features/apiAuthWebDav/webDavSpecialURLs.feature#L76)
|
||||
|
||||
Reference in New Issue
Block a user