Improve indexing performance using batches

This commit is contained in:
André Duffeck
2025-08-04 10:53:31 +02:00
parent 1afc1331af
commit 3c8e2dacfd
6 changed files with 250 additions and 7 deletions
+81 -3
View File
@@ -8,6 +8,7 @@ import (
"path/filepath"
"reflect"
"strings"
"sync"
"time"
"github.com/blevesearch/bleve/v2"
@@ -20,10 +21,11 @@ import (
"github.com/blevesearch/bleve/v2/mapping"
"github.com/blevesearch/bleve/v2/search/query"
storageProvider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
libregraph "github.com/opencloud-eu/libre-graph-api-go"
"github.com/opencloud-eu/opencloud/pkg/log"
"github.com/opencloud-eu/reva/v2/pkg/errtypes"
"github.com/opencloud-eu/reva/v2/pkg/storagespace"
"github.com/opencloud-eu/reva/v2/pkg/utils"
libregraph "github.com/opencloud-eu/libre-graph-api-go"
"google.golang.org/protobuf/types/known/timestamppb"
searchMessage "github.com/opencloud-eu/opencloud/protogen/gen/opencloud/messages/search/v0"
@@ -32,10 +34,16 @@ import (
searchQuery "github.com/opencloud-eu/opencloud/services/search/pkg/query"
)
const _batchSize = 500
// Bleve represents a search engine which utilizes bleve to search and store resources.
type Bleve struct {
index bleve.Index
queryCreator searchQuery.Creator[query.Query]
batch *bleve.Batch
batchSize int
m sync.Mutex // batch operations in bleve are not thread-safe
log log.Logger
}
// NewBleveIndex returns a new bleve index
@@ -60,10 +68,11 @@ func NewBleveIndex(root string) (bleve.Index, error) {
}
// NewBleveEngine creates a new Bleve instance
func NewBleveEngine(index bleve.Index, queryCreator searchQuery.Creator[query.Query]) *Bleve {
func NewBleveEngine(index bleve.Index, queryCreator searchQuery.Creator[query.Query], log log.Logger) *Bleve {
return &Bleve{
index: index,
queryCreator: queryCreator,
log: log,
}
}
@@ -233,8 +242,60 @@ func (b *Bleve) Search(ctx context.Context, sir *searchService.SearchIndexReques
}, nil
}
func (b *Bleve) StartBatch(batchSize int) error {
b.m.Lock()
defer b.m.Unlock()
if batchSize <= 0 {
return errors.New("batch size must be greater than 0")
}
if b.batch != nil {
b.log.Debug().Msg("reusing another batch that has already been started")
return nil
}
b.log.Debug().Msg("Starting new batch")
b.batch = b.index.NewBatch()
b.batchSize = batchSize
return nil
}
func (b *Bleve) EndBatch() error {
b.m.Lock()
defer b.m.Unlock()
if b.batch == nil {
return errors.New("no batch started")
}
b.log.Debug().Int("size", b.batch.Size()).Msg("Ending batch")
if err := b.index.Batch(b.batch); err != nil {
return err
}
b.batch = nil
return nil
}
// Upsert indexes or stores Resource data fields.
func (b *Bleve) Upsert(id string, r Resource) error {
b.m.Lock()
defer b.m.Unlock()
if b.batch != nil {
if err := b.batch.Index(id, r); err != nil {
return err
}
if b.batch.Size() >= b.batchSize {
b.log.Debug().Int("size", b.batch.Size()).Msg("Committing batch")
if err := b.index.Batch(b.batch); err != nil {
return err
}
b.batch = b.index.NewBatch()
}
return nil
}
return b.index.Index(id, r)
}
@@ -298,6 +359,19 @@ func (b *Bleve) Restore(id string) error {
// Purge removes a resource from the index, irreversible operation.
func (b *Bleve) Purge(id string) error {
b.m.Lock()
defer b.m.Unlock()
if b.batch != nil {
b.batch.Delete(id)
if b.batch.Size() >= b.batchSize {
if err := b.index.Batch(b.batch); err != nil {
return err
}
b.batch = b.index.NewBatch()
}
return nil
}
return b.index.Delete(id)
}
@@ -452,7 +526,7 @@ func (b *Bleve) updateEntity(id string, mutateFunc func(r *Resource)) (*Resource
mutateFunc(it)
return it, b.index.Index(it.ID, it)
return it, b.Upsert(id, *it)
}
func (b *Bleve) setDeleted(id string, deleted bool) error {
@@ -468,6 +542,7 @@ func (b *Bleve) setDeleted(id string, deleted bool) error {
bleve.NewQueryStringQuery("RootID:"+it.RootID),
bleve.NewQueryStringQuery("Path:"+escapeQuery(it.Path+"/*")),
)
bleveReq := bleve.NewSearchRequest(q)
bleveReq.Size = math.MaxInt
bleveReq.Fields = []string{"*"}
@@ -476,6 +551,8 @@ func (b *Bleve) setDeleted(id string, deleted bool) error {
return err
}
b.StartBatch(_batchSize)
defer b.EndBatch()
for _, h := range res.Hits {
_, err := b.updateEntity(h.ID, func(r *Resource) {
r.Deleted = deleted
@@ -484,6 +561,7 @@ func (b *Bleve) setDeleted(id string, deleted bool) error {
return err
}
}
b.EndBatch()
}
return nil
+63 -3
View File
@@ -8,9 +8,10 @@ import (
sprovider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/opencloud-eu/reva/v2/pkg/storagespace"
libregraph "github.com/opencloud-eu/libre-graph-api-go"
"github.com/opencloud-eu/reva/v2/pkg/storagespace"
"github.com/opencloud-eu/opencloud/pkg/log"
searchmsg "github.com/opencloud-eu/opencloud/protogen/gen/opencloud/messages/search/v0"
searchsvc "github.com/opencloud-eu/opencloud/protogen/gen/opencloud/services/search/v0"
"github.com/opencloud-eu/opencloud/services/search/pkg/content"
@@ -53,6 +54,7 @@ var _ = Describe("Bleve", func() {
rootResource engine.Resource
parentResource engine.Resource
childResource engine.Resource
childResource2 engine.Resource
)
BeforeEach(func() {
@@ -62,7 +64,7 @@ var _ = Describe("Bleve", func() {
idx, err = bleveSearch.NewMemOnly(mapping)
Expect(err).ToNot(HaveOccurred())
eng = engine.NewBleveEngine(idx, bleve.DefaultCreator)
eng = engine.NewBleveEngine(idx, bleve.DefaultCreator, log.Logger{})
Expect(err).ToNot(HaveOccurred())
rootResource = engine.Resource{
@@ -89,11 +91,20 @@ var _ = Describe("Bleve", func() {
Type: uint64(sprovider.ResourceType_RESOURCE_TYPE_FILE),
Document: content.Document{Name: "child.pdf"},
}
childResource2 = engine.Resource{
ID: "1$2!5",
ParentID: parentResource.ID,
RootID: rootResource.ID,
Path: "./parent d!r/child2.pdf",
Type: uint64(sprovider.ResourceType_RESOURCE_TYPE_FILE),
Document: content.Document{Name: "child2.pdf"},
}
})
Describe("New", func() {
It("returns a new index instance", func() {
b := engine.NewBleveEngine(idx, bleve.DefaultCreator)
b := engine.NewBleveEngine(idx, bleve.DefaultCreator, log.Logger{})
Expect(b).ToNot(BeNil())
})
})
@@ -486,6 +497,55 @@ var _ = Describe("Bleve", func() {
})
})
Describe("StartBatch", func() {
It("starts a new batch", func() {
err := eng.StartBatch(100)
Expect(err).ToNot(HaveOccurred())
err = eng.Upsert(childResource.ID, childResource)
Expect(err).ToNot(HaveOccurred())
count, err := idx.DocCount()
Expect(err).ToNot(HaveOccurred())
Expect(count).To(Equal(uint64(0)))
err = eng.EndBatch()
Expect(err).ToNot(HaveOccurred())
count, err = idx.DocCount()
Expect(err).ToNot(HaveOccurred())
Expect(count).To(Equal(uint64(1)))
query := bleveSearch.NewMatchQuery("child.pdf")
res, err := idx.Search(bleveSearch.NewSearchRequest(query))
Expect(err).ToNot(HaveOccurred())
Expect(res.Hits.Len()).To(Equal(1))
})
It("doesn't overwrite batches that are already in progress", func() {
err := eng.StartBatch(100)
Expect(err).ToNot(HaveOccurred())
err = eng.Upsert(childResource.ID, childResource)
Expect(err).ToNot(HaveOccurred())
count, err := idx.DocCount()
Expect(err).ToNot(HaveOccurred())
Expect(count).To(Equal(uint64(0)))
err = eng.StartBatch(100)
Expect(err).ToNot(HaveOccurred())
err = eng.Upsert(childResource2.ID, childResource2)
Expect(err).ToNot(HaveOccurred())
Expect(eng.EndBatch()).To(Succeed())
count, err = idx.DocCount()
Expect(err).ToNot(HaveOccurred())
Expect(count).To(Equal(uint64(2)))
})
})
Describe("File type specific metadata", func() {
Context("with audio metadata", func() {
+3
View File
@@ -23,6 +23,9 @@ type Engine interface {
Restore(id string) error
Purge(id string) error
DocCount() (uint64, error)
StartBatch(batchSize int) error
EndBatch() error
}
// Resource is the entity that is stored in the index.
@@ -143,6 +143,50 @@ func (_c *Engine_DocCount_Call) RunAndReturn(run func() (uint64, error)) *Engine
return _c
}
// EndBatch provides a mock function for the type Engine
func (_mock *Engine) EndBatch() error {
ret := _mock.Called()
if len(ret) == 0 {
panic("no return value specified for EndBatch")
}
var r0 error
if returnFunc, ok := ret.Get(0).(func() error); ok {
r0 = returnFunc()
} else {
r0 = ret.Error(0)
}
return r0
}
// Engine_EndBatch_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'EndBatch'
type Engine_EndBatch_Call struct {
*mock.Call
}
// EndBatch is a helper method to define mock.On call
func (_e *Engine_Expecter) EndBatch() *Engine_EndBatch_Call {
return &Engine_EndBatch_Call{Call: _e.mock.On("EndBatch")}
}
func (_c *Engine_EndBatch_Call) Run(run func()) *Engine_EndBatch_Call {
_c.Call.Run(func(args mock.Arguments) {
run()
})
return _c
}
func (_c *Engine_EndBatch_Call) Return(err error) *Engine_EndBatch_Call {
_c.Call.Return(err)
return _c
}
func (_c *Engine_EndBatch_Call) RunAndReturn(run func() error) *Engine_EndBatch_Call {
_c.Call.Return(run)
return _c
}
// Move provides a mock function for the type Engine
func (_mock *Engine) Move(id string, parentid string, target string) error {
ret := _mock.Called(id, parentid, target)
@@ -376,6 +420,57 @@ func (_c *Engine_Search_Call) RunAndReturn(run func(ctx context.Context, req *v0
return _c
}
// StartBatch provides a mock function for the type Engine
func (_mock *Engine) StartBatch(batchSize int) error {
ret := _mock.Called(batchSize)
if len(ret) == 0 {
panic("no return value specified for StartBatch")
}
var r0 error
if returnFunc, ok := ret.Get(0).(func(int) error); ok {
r0 = returnFunc(batchSize)
} else {
r0 = ret.Error(0)
}
return r0
}
// Engine_StartBatch_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'StartBatch'
type Engine_StartBatch_Call struct {
*mock.Call
}
// StartBatch is a helper method to define mock.On call
// - batchSize int
func (_e *Engine_Expecter) StartBatch(batchSize interface{}) *Engine_StartBatch_Call {
return &Engine_StartBatch_Call{Call: _e.mock.On("StartBatch", batchSize)}
}
func (_c *Engine_StartBatch_Call) Run(run func(batchSize int)) *Engine_StartBatch_Call {
_c.Call.Run(func(args mock.Arguments) {
var arg0 int
if args[0] != nil {
arg0 = args[0].(int)
}
run(
arg0,
)
})
return _c
}
func (_c *Engine_StartBatch_Call) Return(err error) *Engine_StartBatch_Call {
_c.Call.Return(err)
return _c
}
func (_c *Engine_StartBatch_Call) RunAndReturn(run func(batchSize int) error) *Engine_StartBatch_Call {
_c.Call.Return(run)
return _c
}
// Upsert provides a mock function for the type Engine
func (_mock *Engine) Upsert(id string, r engine.Resource) error {
ret := _mock.Called(id, r)
+7
View File
@@ -41,6 +41,7 @@ const (
_spaceTypeProject = "project"
_spaceTypeGrant = "grant"
_slowQueryDuration = 500 * time.Millisecond
_batchSize = 500
)
// Searcher is the interface to the SearchService
@@ -459,6 +460,12 @@ func (s *Service) IndexSpace(spaceID *provider.StorageSpaceId) error {
}()
w := walker.NewWalker(s.gatewaySelector)
s.engine.StartBatch(_batchSize)
defer func() {
if err := s.engine.EndBatch(); err != nil {
s.logger.Error().Err(err).Msg("failed to end batch")
}
}()
err = w.Walk(ownerCtx, &rootID, func(wd string, info *provider.ResourceInfo, err error) error {
if err != nil {
s.logger.Error().Err(err).Msg("error walking the tree")
@@ -53,7 +53,7 @@ func NewHandler(opts ...Option) (searchsvc.SearchProviderHandler, func(), error)
_ = idx.Close()
}
eng = engine.NewBleveEngine(idx, bleve.DefaultCreator)
eng = engine.NewBleveEngine(idx, bleve.DefaultCreator, logger)
default:
return nil, teardown, fmt.Errorf("unknown search engine: %s", cfg.Engine.Type)
}