mirror of
https://github.com/opencloud-eu/opencloud.git
synced 2026-04-29 07:19:40 -05:00
fix: adopt search upstream changes
This commit is contained in:
@@ -24,7 +24,7 @@ import (
|
||||
|
||||
const defaultBatchSize = 50
|
||||
|
||||
var _ search.Engine = &Backend{} // ensure Backend implements Engine
|
||||
var _ search.Engine = (*Backend)(nil) // ensure Backend implements Engine
|
||||
|
||||
type Backend struct {
|
||||
index bleve.Index
|
||||
@@ -211,13 +211,13 @@ func (b *Backend) Restore(id string) error {
|
||||
return batch.Push()
|
||||
}
|
||||
|
||||
func (b *Backend) Purge(id string) error {
|
||||
func (b *Backend) Purge(id string, onlyDeleted bool) error {
|
||||
batch, err := b.NewBatch(defaultBatchSize)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := batch.Purge(id); err != nil {
|
||||
if err := batch.Purge(id, onlyDeleted); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
|
||||
@@ -480,6 +480,26 @@ var _ = Describe("Bleve", func() {
|
||||
assertDocCount(rootResource.ID, `"`+parentResource.Document.Name+`"`, 0)
|
||||
assertDocCount(rootResource.ID, `"`+childResource.Document.Name+`"`, 0)
|
||||
})
|
||||
It("removes a resource and ignores its children from the index", func() {
|
||||
err := eng.Upsert(parentResource.ID, parentResource)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
|
||||
assertDocCount(rootResource.ID, `"`+parentResource.Document.Name+`"`, 1)
|
||||
|
||||
err = eng.Delete(parentResource.ID)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
|
||||
err = eng.Upsert(childResource.ID, childResource)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
|
||||
assertDocCount(rootResource.ID, `"`+childResource.Document.Name+`"`, 1)
|
||||
|
||||
err = eng.Purge(parentResource.ID, true)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
|
||||
assertDocCount(rootResource.ID, `"`+parentResource.Document.Name+`"`, 0)
|
||||
assertDocCount(rootResource.ID, `"`+childResource.Document.Name+`"`, 1)
|
||||
})
|
||||
})
|
||||
|
||||
Describe("Move", func() {
|
||||
|
||||
@@ -2,14 +2,18 @@ package bleve
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"path"
|
||||
"strings"
|
||||
|
||||
"github.com/blevesearch/bleve/v2"
|
||||
storageProvider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
|
||||
"github.com/opencloud-eu/reva/v2/pkg/utils"
|
||||
|
||||
"github.com/opencloud-eu/opencloud/pkg/log"
|
||||
"github.com/opencloud-eu/opencloud/services/search/pkg/search"
|
||||
)
|
||||
|
||||
var _ search.BatchOperator = &Batch{} // ensure Batch implements BatchOperator
|
||||
var _ search.BatchOperator = (*Batch)(nil) // ensure Batch implements BatchOperator
|
||||
|
||||
type Batch struct {
|
||||
batch *bleve.Batch
|
||||
@@ -38,12 +42,32 @@ func (b *Batch) Upsert(id string, r search.Resource) error {
|
||||
|
||||
func (b *Batch) Move(id, parentID, location string) error {
|
||||
return b.withSizeLimit(func() error {
|
||||
affectedResources, err := searchAndUpdateResourcesLocation(id, parentID, location, b.index)
|
||||
rootResource, err := searchResourceByID(id, b.index)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
currentPath := rootResource.Path
|
||||
nextPath := utils.MakeRelativePath(location)
|
||||
|
||||
for _, resource := range affectedResources {
|
||||
rootResource.Path = nextPath
|
||||
rootResource.Name = path.Base(nextPath)
|
||||
rootResource.ParentID = parentID
|
||||
|
||||
resources := []*search.Resource{rootResource}
|
||||
|
||||
if rootResource.Type == uint64(storageProvider.ResourceType_RESOURCE_TYPE_CONTAINER) {
|
||||
descendantResources, err := searchResourcesByPath(rootResource.RootID, currentPath, b.index)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for _, descendantResource := range descendantResources {
|
||||
descendantResource.Path = strings.Replace(descendantResource.Path, currentPath, nextPath, 1)
|
||||
resources = append(resources, descendantResource)
|
||||
}
|
||||
}
|
||||
|
||||
for _, resource := range resources {
|
||||
if err := b.batch.Index(resource.ID, resource); err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -87,9 +111,39 @@ func (b *Batch) Restore(id string) error {
|
||||
})
|
||||
}
|
||||
|
||||
func (b *Batch) Purge(id string) error {
|
||||
func (b *Batch) Purge(id string, onlyDeleted bool) error {
|
||||
return b.withSizeLimit(func() error {
|
||||
b.batch.Delete(id)
|
||||
rootResource, err := searchResourceByID(id, b.index)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var affectResources []*search.Resource
|
||||
add := func(resource *search.Resource) {
|
||||
if onlyDeleted && !resource.Deleted {
|
||||
return
|
||||
}
|
||||
|
||||
affectResources = append(affectResources, resource)
|
||||
}
|
||||
|
||||
add(rootResource)
|
||||
|
||||
if rootResource.Type == uint64(storageProvider.ResourceType_RESOURCE_TYPE_CONTAINER) {
|
||||
descendantResources, err := searchResourcesByPath(rootResource.RootID, rootResource.Path, b.index)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for _, descendantResource := range descendantResources {
|
||||
add(descendantResource)
|
||||
}
|
||||
}
|
||||
|
||||
for _, resource := range affectResources {
|
||||
b.batch.Delete(resource.ID)
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
@@ -3,9 +3,7 @@ package bleve
|
||||
import (
|
||||
"errors"
|
||||
"math"
|
||||
"path"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
|
||||
"github.com/blevesearch/bleve/v2"
|
||||
"github.com/blevesearch/bleve/v2/analysis/analyzer/custom"
|
||||
@@ -16,7 +14,6 @@ import (
|
||||
"github.com/blevesearch/bleve/v2/analysis/tokenizer/unicode"
|
||||
"github.com/blevesearch/bleve/v2/mapping"
|
||||
storageProvider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
|
||||
"github.com/opencloud-eu/reva/v2/pkg/utils"
|
||||
|
||||
"github.com/opencloud-eu/opencloud/services/search/pkg/search"
|
||||
)
|
||||
@@ -125,35 +122,6 @@ func searchResourcesByPath(rootId, lookupPath string, index bleve.Index) ([]*sea
|
||||
return resources, nil
|
||||
}
|
||||
|
||||
func searchAndUpdateResourcesLocation(rootID, parentID, location string, index bleve.Index) ([]*search.Resource, error) {
|
||||
rootResource, err := searchResourceByID(rootID, index)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
currentPath := rootResource.Path
|
||||
nextPath := utils.MakeRelativePath(location)
|
||||
|
||||
rootResource.Path = nextPath
|
||||
rootResource.Name = path.Base(nextPath)
|
||||
rootResource.ParentID = parentID
|
||||
|
||||
resources := []*search.Resource{rootResource}
|
||||
|
||||
if rootResource.Type == uint64(storageProvider.ResourceType_RESOURCE_TYPE_CONTAINER) {
|
||||
descendantResources, err := searchResourcesByPath(rootResource.RootID, currentPath, index)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
for _, descendantResource := range descendantResources {
|
||||
descendantResource.Path = strings.Replace(descendantResource.Path, currentPath, nextPath, 1)
|
||||
resources = append(resources, descendantResource)
|
||||
}
|
||||
}
|
||||
|
||||
return resources, nil
|
||||
}
|
||||
|
||||
func searchAndUpdateResourcesDeletionState(id string, state bool, index bleve.Index) ([]*search.Resource, error) {
|
||||
rootResource, err := searchResourceByID(id, index)
|
||||
if err != nil {
|
||||
|
||||
@@ -226,13 +226,13 @@ func (b *Backend) Restore(id string) error {
|
||||
return batch.Push()
|
||||
}
|
||||
|
||||
func (b *Backend) Purge(id string) error {
|
||||
func (b *Backend) Purge(id string, onlyDeleted bool) error {
|
||||
batch, err := b.NewBatch(defaultBatchSize)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := batch.Purge(id); err != nil {
|
||||
if err := batch.Purge(id, onlyDeleted); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
|
||||
@@ -212,10 +212,40 @@ func TestEngine_Purge(t *testing.T) {
|
||||
tc.Require.DocumentCreate(indexName, document.ID, strings.NewReader(opensearchtest.JSONMustMarshal(t, document)))
|
||||
tc.Require.IndicesCount([]string{indexName}, nil, 1)
|
||||
|
||||
require.NoError(t, backend.Purge(document.ID))
|
||||
require.NoError(t, backend.Purge(document.ID, false))
|
||||
|
||||
tc.Require.IndicesCount([]string{indexName}, nil, 0)
|
||||
})
|
||||
|
||||
t.Run("purge resource trees", func(t *testing.T) {
|
||||
resourceFolder := opensearchtest.Testdata.Resources.Folder
|
||||
tc.Require.DocumentCreate(indexName, resourceFolder.ID, strings.NewReader(opensearchtest.JSONMustMarshal(t, resourceFolder)))
|
||||
|
||||
resourceFile := opensearchtest.Testdata.Resources.File
|
||||
tc.Require.DocumentCreate(indexName, resourceFile.ID, strings.NewReader(opensearchtest.JSONMustMarshal(t, resourceFile)))
|
||||
|
||||
tc.Require.IndicesCount([]string{indexName}, nil, 2)
|
||||
|
||||
require.NoError(t, backend.Purge(resourceFolder.ID, false))
|
||||
|
||||
tc.Require.IndicesCount([]string{indexName}, nil, 0)
|
||||
})
|
||||
|
||||
t.Run("purge resource trees and ignores undeleted resources", func(t *testing.T) {
|
||||
resourceFolder := opensearchtest.Testdata.Resources.Folder
|
||||
tc.Require.DocumentCreate(indexName, resourceFolder.ID, strings.NewReader(opensearchtest.JSONMustMarshal(t, resourceFolder)))
|
||||
|
||||
resourceFile := opensearchtest.Testdata.Resources.File
|
||||
tc.Require.DocumentCreate(indexName, resourceFile.ID, strings.NewReader(opensearchtest.JSONMustMarshal(t, resourceFile)))
|
||||
|
||||
tc.Require.IndicesCount([]string{indexName}, nil, 2)
|
||||
|
||||
require.NoError(t, backend.Delete(resourceFile.ID))
|
||||
tc.Require.IndicesRefresh([]string{indexName}, nil)
|
||||
require.NoError(t, backend.Purge(resourceFolder.ID, true))
|
||||
|
||||
tc.Require.IndicesCount([]string{indexName}, nil, 1)
|
||||
})
|
||||
}
|
||||
|
||||
func TestEngine_DocCount(t *testing.T) {
|
||||
|
||||
@@ -18,7 +18,7 @@ import (
|
||||
"github.com/opencloud-eu/opencloud/services/search/pkg/search"
|
||||
)
|
||||
|
||||
var _ search.BatchOperator = &Batch{} // ensure Batch implements BatchOperator
|
||||
var _ search.BatchOperator = (*Batch)(nil) // ensure Batch implements BatchOperator
|
||||
|
||||
type Batch struct {
|
||||
client *opensearchgoAPI.Client
|
||||
@@ -136,13 +136,18 @@ func (b *Batch) Restore(id string) error {
|
||||
})
|
||||
}
|
||||
|
||||
func (b *Batch) Purge(id string) error {
|
||||
func (b *Batch) Purge(id string, onlyDeleted bool) error {
|
||||
return b.withSizeLimit(func() error {
|
||||
resource, err := searchResourceByID(context.Background(), b.client, b.index, id)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to get resource: %w", err)
|
||||
}
|
||||
|
||||
query := osu.NewBoolQuery().Must(osu.NewTermQuery[string]("Path").Value(resource.Path))
|
||||
if onlyDeleted {
|
||||
query.Must(osu.NewTermQuery[bool]("Deleted").Value(true))
|
||||
}
|
||||
|
||||
req, err := osu.BuildDocumentDeleteByQueryReq(
|
||||
opensearchgoAPI.DocumentDeleteByQueryReq{
|
||||
Indices: []string{b.index},
|
||||
@@ -150,7 +155,7 @@ func (b *Batch) Purge(id string) error {
|
||||
WaitForCompletion: conversions.ToPointer(true),
|
||||
},
|
||||
},
|
||||
osu.NewTermQuery[string]("Path").Value(resource.Path),
|
||||
query,
|
||||
)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to build delete by query request: %w", err)
|
||||
|
||||
@@ -16,12 +16,16 @@ var Testdata = struct {
|
||||
Resources resourceTestdata
|
||||
}{
|
||||
Resources: resourceTestdata{
|
||||
File: fromTestData[search.Resource]("resource_file.json"),
|
||||
Root: fromTestData[search.Resource]("resource_root.json"),
|
||||
Folder: fromTestData[search.Resource]("resource_folder.json"),
|
||||
File: fromTestData[search.Resource]("resource_file.json"),
|
||||
},
|
||||
}
|
||||
|
||||
type resourceTestdata struct {
|
||||
File search.Resource
|
||||
Root search.Resource
|
||||
File search.Resource
|
||||
Folder search.Resource
|
||||
}
|
||||
|
||||
func fromTestData[D any](name string) D {
|
||||
|
||||
@@ -1,5 +1,9 @@
|
||||
{
|
||||
"ID" : "1$2!3",
|
||||
"ID" : "1$1!3",
|
||||
"RootID" : "1$1!1",
|
||||
"ParentID" : "1$1!2",
|
||||
"Path" : "./parent d!r/child.jpg",
|
||||
"Type" : 1,
|
||||
"Title" : "dumme title",
|
||||
"Name" : "dummy name",
|
||||
"Content" : "dummy content",
|
||||
@@ -7,6 +11,8 @@
|
||||
"Mtime" : "2025-07-24 15:15:01.324093 +0200 CEST m=+0.000056251",
|
||||
"MimeType" : "image/jpeg",
|
||||
"Tags" : [ "dummy" ],
|
||||
"Deleted" : false,
|
||||
"Hidden" : false,
|
||||
"audio" : {
|
||||
"album" : "Some Album",
|
||||
"albumArtist" : "Some AlbumArtist",
|
||||
@@ -44,12 +50,5 @@
|
||||
"iso" : 1,
|
||||
"orientation" : 1,
|
||||
"takenDateTime" : "2018-01-01T12:34:56Z"
|
||||
},
|
||||
"omitempty" : "1$2!3",
|
||||
"RootID" : "1$2!1",
|
||||
"Path" : "./doc",
|
||||
"ParentID" : "1$2!2",
|
||||
"Type" : 1,
|
||||
"Deleted" : false,
|
||||
"Hidden" : false
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,7 @@
|
||||
{
|
||||
"ID" : "1$1!2",
|
||||
"RootID" : "1$1!1",
|
||||
"ParentID" : "1$1!1",
|
||||
"Path" : "./parent d!r",
|
||||
"Type" : 2
|
||||
}
|
||||
@@ -0,0 +1,5 @@
|
||||
{
|
||||
"ID" : "1$1!1",
|
||||
"RootID" : "1$1!1",
|
||||
"Path" : "."
|
||||
}
|
||||
@@ -32,7 +32,7 @@ type Engine interface {
|
||||
Move(id string, parentid string, target string) error
|
||||
Delete(id string) error
|
||||
Restore(id string) error
|
||||
Purge(id string) error
|
||||
Purge(id string, onlyDeleted bool) error
|
||||
|
||||
NewBatch(batchSize int) (BatchOperator, error)
|
||||
}
|
||||
@@ -42,7 +42,7 @@ type BatchOperator interface {
|
||||
Move(rootID, parentID, location string) error
|
||||
Delete(id string) error
|
||||
Restore(id string) error
|
||||
Purge(id string) error
|
||||
Purge(id string, onlyDeleted bool) error
|
||||
|
||||
Push() error
|
||||
}
|
||||
|
||||
@@ -531,19 +531,13 @@ func (s *Service) PurgeItem(ref *provider.Reference) {
|
||||
return
|
||||
}
|
||||
|
||||
s.engine.StartBatch(s.batchSize)
|
||||
defer func() {
|
||||
if err := s.engine.EndBatch(); err != nil {
|
||||
s.logger.Error().Err(err).Msg("failed to end batch")
|
||||
}
|
||||
logDocCount(s.engine, s.logger)
|
||||
}()
|
||||
err := s.engine.Purge(storagespace.FormatResourceID(ref.ResourceId), false)
|
||||
if err != nil {
|
||||
s.logger.Error().Err(err).Interface("Id", ref.ResourceId).Msg("failed to purge item from index")
|
||||
return
|
||||
}
|
||||
s.logger.Info().Interface("Id", ref.ResourceId).Msg("purged item from index")
|
||||
logDocCount(s.engine, s.logger)
|
||||
}
|
||||
|
||||
func (s *Service) PurgeDeleted(spaceID *provider.StorageSpaceId) error {
|
||||
@@ -562,14 +556,14 @@ func (s *Service) PurgeDeleted(spaceID *provider.StorageSpaceId) error {
|
||||
}
|
||||
rootID.OpaqueId = rootID.SpaceId
|
||||
|
||||
s.engine.StartBatch(s.batchSize)
|
||||
defer func() {
|
||||
if err := s.engine.EndBatch(); err != nil {
|
||||
s.logger.Error().Err(err).Msg("failed to end batch")
|
||||
}
|
||||
logDocCount(s.engine, s.logger)
|
||||
}()
|
||||
return s.engine.Purge(storagespace.FormatResourceID(&rootID), true)
|
||||
if err := s.engine.Purge(storagespace.FormatResourceID(&rootID), true); err != nil {
|
||||
s.logger.Error().Err(err).Interface("Id", &rootID).Msg("failed to purge deleted items from index")
|
||||
return err
|
||||
}
|
||||
|
||||
logDocCount(s.engine, s.logger)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// UpsertItem indexes or stores Resource data fields.
|
||||
|
||||
Reference in New Issue
Block a user