Do not intertwine different batch operations

This commit is contained in:
André Duffeck
2025-08-07 12:13:40 +02:00
committed by fschade
parent 3164fb2474
commit 9f9e03794d
8 changed files with 516 additions and 137 deletions

View File

@@ -9,10 +9,11 @@ packages:
github.com/opencloud-eu/opencloud/services/search/pkg/engine:
interfaces:
Engine: {}
Batch: {}
github.com/opencloud-eu/opencloud/services/search/pkg/content:
interfaces:
Extractor: {}
Retriever: {}
github.com/opencloud-eu/opencloud/services/search/pkg/search:
interfaces:
Searcher: {}
Searcher: {}

View File

@@ -8,7 +8,6 @@ import (
"path/filepath"
"reflect"
"strings"
"sync"
"time"
"github.com/blevesearch/bleve/v2"
@@ -34,18 +33,54 @@ import (
searchQuery "github.com/opencloud-eu/opencloud/services/search/pkg/query"
)
const _batchSize = 500
// Bleve represents a search engine which utilizes bleve to search and store resources.
type Bleve struct {
index bleve.Index
queryCreator searchQuery.Creator[query.Query]
batch *bleve.Batch
batchSize int
m sync.Mutex // batch operations in bleve are not thread-safe
log log.Logger
}
// batch is the Bleve implementation of a batch of index operations. Each
// value owns its own bleve batch, so operations queued through one batch are
// not mixed into the commits of another one.
type batch struct {
	// engine is the Bleve engine the operations are delegated to; the batch
	// is committed against the engine's index.
	engine *Bleve
	// log is the logger used when the batch is ended.
	// NOTE(review): the batch literal built in StartBatch does not set this
	// field - confirm whether it is meant to be populated from the engine.
	log log.Logger

	// batch holds the operations that have not been committed yet. It is
	// replaced by a fresh bleve batch whenever the pending operations are
	// flushed to the index.
	batch *bleve.Batch
	// batchSize is the number of pending operations at which the batch is
	// flushed to the index.
	batchSize int
}
// Upsert queues the resource r under id in this batch. The work is done by
// the engine's doUpsert, which flushes the pending operations to the index as
// soon as the batch holds batchSize of them.
func (b *batch) Upsert(id string, r Resource) error {
	eng := b.engine
	return eng.doUpsert(id, r, b)
}
// Move updates the location of the resource id (new parent parentid, new path
// target) by handing the request to the engine's doMove together with this
// batch, so the resulting index updates are queued in the batch.
func (b *batch) Move(id string, parentid string, target string) error {
	eng := b.engine
	return eng.doMove(id, parentid, target, b)
}
// Delete marks the resource id as deleted without removing it from the index.
// The updated documents are queued in this batch by the engine's setDeleted.
// Restore is the counterpart that undoes it.
func (b *batch) Delete(id string) error {
	const deleted = true
	return b.engine.setDeleted(id, deleted, b)
}
// Restore clears the deleted mark of the resource id, undoing a previous
// Delete. The updated documents are queued in this batch by the engine's
// setDeleted.
func (b *batch) Restore(id string) error {
	const deleted = false
	return b.engine.setDeleted(id, deleted, b)
}
// Purge queues the irreversible removal of the document id in this batch.
//
// The removal is recorded directly on the underlying bleve batch. Delegating
// to the engine's doPurge is avoided on purpose: its batch branch calls the
// Delete method of this wrapper type, which only marks the resource as deleted
// (and re-indexes it) instead of removing the document from the index.
//
// Like Upsert, Purge commits the pending operations and continues with a fresh
// bleve batch once batchSize operations have accumulated. The returned error
// is the error of that commit, if any.
func (b *batch) Purge(id string) error {
	b.batch.Delete(id)
	if b.batch.Size() < b.batchSize {
		return nil
	}

	if err := b.engine.index.Batch(b.batch); err != nil {
		return err
	}
	b.batch = b.engine.index.NewBatch()
	return nil
}
// End commits all operations still pending in the batch to the engine's index.
//
// It returns an error when the batch has no underlying bleve batch or when the
// commit fails. After a successful commit the batch continues with an empty
// bleve batch, so calling End again does not commit the same operations twice.
func (b *batch) End() error {
	if b.batch == nil {
		return errors.New("no batch started")
	}

	// Log through the engine: StartBatch creates the batch without setting
	// its own log field, so b.log would be a zero-value logger here.
	b.engine.log.Debug().Int("size", b.batch.Size()).Msg("Ending batch")
	if err := b.engine.index.Batch(b.batch); err != nil {
		return err
	}

	// Drop the committed operations; keeping them would re-apply them on a
	// repeated End or on the next size-triggered flush.
	b.batch = b.engine.index.NewBatch()
	return nil
}
// NewBleveIndex returns a new bleve index
// given path must exist.
func NewBleveIndex(root string) (bleve.Index, error) {
@@ -242,57 +277,34 @@ func (b *Bleve) Search(ctx context.Context, sir *searchService.SearchIndexReques
}, nil
}
func (b *Bleve) StartBatch(batchSize int) error {
b.m.Lock()
defer b.m.Unlock()
func (b *Bleve) StartBatch(batchSize int) (Batch, error) {
if batchSize <= 0 {
return errors.New("batch size must be greater than 0")
return nil, errors.New("batch size must be greater than 0")
}
if b.batch != nil {
b.log.Debug().Msg("reusing another batch that has already been started")
return nil
}
b.log.Debug().Msg("Starting new batch")
b.batch = b.index.NewBatch()
b.batchSize = batchSize
return nil
}
func (b *Bleve) EndBatch() error {
b.m.Lock()
defer b.m.Unlock()
if b.batch == nil {
return errors.New("no batch started")
}
b.log.Debug().Int("size", b.batch.Size()).Msg("Ending batch")
if err := b.index.Batch(b.batch); err != nil {
return err
}
b.batch = nil
return nil
return &batch{
engine: b,
batch: b.index.NewBatch(),
batchSize: batchSize,
}, nil
}
// Upsert indexes or stores Resource data fields.
func (b *Bleve) Upsert(id string, r Resource) error {
b.m.Lock()
defer b.m.Unlock()
return b.doUpsert(id, r, nil)
}
if b.batch != nil {
if err := b.batch.Index(id, r); err != nil {
func (b *Bleve) doUpsert(id string, r Resource, batch *batch) error {
if batch != nil {
if err := batch.batch.Index(id, r); err != nil {
return err
}
if b.batch.Size() >= b.batchSize {
b.log.Debug().Int("size", b.batch.Size()).Msg("Committing batch")
if err := b.index.Batch(b.batch); err != nil {
if batch.batch.Size() >= batch.batchSize {
b.log.Debug().Int("size", batch.batch.Size()).Msg("Committing batch")
if err := b.index.Batch(batch.batch); err != nil {
return err
}
b.batch = b.index.NewBatch()
batch.batch = b.index.NewBatch()
}
return nil
}
@@ -301,6 +313,10 @@ func (b *Bleve) Upsert(id string, r Resource) error {
// Move changes the location of the resource id to the path target below the
// parent parentid. It is the non-batched entry point: doMove is invoked
// without a batch.
func (b *Bleve) Move(id string, parentid string, target string) error {
	var noBatch *batch // nil: the move is not part of a batch
	return b.doMove(id, parentid, target, noBatch)
}
func (b *Bleve) doMove(id string, parentid string, target string, batch *batch) error {
r, err := b.getResource(id)
if err != nil {
return err
@@ -312,7 +328,7 @@ func (b *Bleve) Move(id string, parentid string, target string) error {
r.Path = nextPath
r.Name = path.Base(nextPath)
r.ParentID = parentid
})
}, batch)
if err != nil {
return err
}
@@ -333,7 +349,7 @@ func (b *Bleve) Move(id string, parentid string, target string) error {
for _, h := range res.Hits {
_, err := b.updateEntity(h.ID, func(r *Resource) {
r.Path = strings.Replace(r.Path, currentPath, nextPath, 1)
})
}, batch)
if err != nil {
return err
}
@@ -348,27 +364,31 @@ func (b *Bleve) Move(id string, parentid string, target string) error {
// instead of removing the resource it just marks it as deleted!
// can be undone
func (b *Bleve) Delete(id string) error {
return b.setDeleted(id, true)
return b.setDeleted(id, true, nil)
}
// Restore is the counterpart to Delete.
// It restores the resource which makes it available again.
func (b *Bleve) Restore(id string) error {
return b.setDeleted(id, false)
return b.setDeleted(id, false, nil)
}
// Purge removes a resource from the index, irreversible operation.
func (b *Bleve) Purge(id string) error {
b.m.Lock()
defer b.m.Unlock()
return b.doPurge(id, nil)
}
if b.batch != nil {
b.batch.Delete(id)
if b.batch.Size() >= b.batchSize {
if err := b.index.Batch(b.batch); err != nil {
func (b *Bleve) doPurge(id string, batch *batch) error {
if batch != nil {
err := batch.Delete(id)
if err != nil {
return err
}
if batch.batch.Size() >= batch.batchSize {
if err := b.index.Batch(batch.batch); err != nil {
return err
}
b.batch = b.index.NewBatch()
batch.batch = b.index.NewBatch()
}
return nil
}
@@ -518,7 +538,7 @@ func getPhotoValue[T any](fields map[string]interface{}) *T {
return nil
}
func (b *Bleve) updateEntity(id string, mutateFunc func(r *Resource)) (*Resource, error) {
func (b *Bleve) updateEntity(id string, mutateFunc func(r *Resource), batch *batch) (*Resource, error) {
it, err := b.getResource(id)
if err != nil {
return nil, err
@@ -526,13 +546,13 @@ func (b *Bleve) updateEntity(id string, mutateFunc func(r *Resource)) (*Resource
mutateFunc(it)
return it, b.Upsert(id, *it)
return it, b.doUpsert(id, *it, batch)
}
func (b *Bleve) setDeleted(id string, deleted bool) error {
func (b *Bleve) setDeleted(id string, deleted bool, batch *batch) error {
it, err := b.updateEntity(id, func(r *Resource) {
r.Deleted = deleted
})
}, batch)
if err != nil {
return err
}
@@ -551,17 +571,14 @@ func (b *Bleve) setDeleted(id string, deleted bool) error {
return err
}
b.StartBatch(_batchSize)
defer b.EndBatch()
for _, h := range res.Hits {
_, err := b.updateEntity(h.ID, func(r *Resource) {
r.Deleted = deleted
})
}, batch)
if err != nil {
return err
}
}
b.EndBatch()
}
return nil

View File

@@ -499,17 +499,17 @@ var _ = Describe("Bleve", func() {
Describe("StartBatch", func() {
It("starts a new batch", func() {
err := eng.StartBatch(100)
b, err := eng.StartBatch(100)
Expect(err).ToNot(HaveOccurred())
err = eng.Upsert(childResource.ID, childResource)
err = b.Upsert(childResource.ID, childResource)
Expect(err).ToNot(HaveOccurred())
count, err := idx.DocCount()
Expect(err).ToNot(HaveOccurred())
Expect(count).To(Equal(uint64(0)))
err = eng.EndBatch()
err = b.End()
Expect(err).ToNot(HaveOccurred())
count, err = idx.DocCount()
@@ -522,24 +522,29 @@ var _ = Describe("Bleve", func() {
Expect(res.Hits.Len()).To(Equal(1))
})
It("doesn't overwrite batches that are already in progress", func() {
err := eng.StartBatch(100)
It("doesn't intertwine different batches", func() {
b, err := eng.StartBatch(100)
Expect(err).ToNot(HaveOccurred())
err = eng.Upsert(childResource.ID, childResource)
err = b.Upsert(childResource.ID, childResource)
Expect(err).ToNot(HaveOccurred())
count, err := idx.DocCount()
Expect(err).ToNot(HaveOccurred())
Expect(count).To(Equal(uint64(0)))
err = eng.StartBatch(100)
b2, err := eng.StartBatch(100)
Expect(err).ToNot(HaveOccurred())
err = eng.Upsert(childResource2.ID, childResource2)
err = b2.Upsert(childResource2.ID, childResource2)
Expect(err).ToNot(HaveOccurred())
Expect(eng.EndBatch()).To(Succeed())
Expect(b.End()).To(Succeed())
count, err = idx.DocCount()
Expect(err).ToNot(HaveOccurred())
Expect(count).To(Equal(uint64(1)))
Expect(b2.End()).To(Succeed())
count, err = idx.DocCount()
Expect(err).ToNot(HaveOccurred())
Expect(count).To(Equal(uint64(2)))

View File

@@ -17,15 +17,25 @@ var queryEscape = regexp.MustCompile(`([` + regexp.QuoteMeta(`+=&|><!(){}[]^\"~*
// Engine is the interface to the search engine
type Engine interface {
Search(ctx context.Context, req *searchService.SearchIndexRequest) (*searchService.SearchIndexResponse, error)
DocCount() (uint64, error)
Upsert(id string, r Resource) error
Move(id string, parentid string, target string) error
Delete(id string) error
Restore(id string) error
Purge(id string) error
DocCount() (uint64, error)
StartBatch(batchSize int) error
EndBatch() error
StartBatch(batchSize int) (Batch, error)
}
// Batch is a group of index operations obtained from Engine.StartBatch.
// Operations issued through a Batch belong to that batch only; End has to be
// called to commit whatever is still pending.
type Batch interface {
	// Upsert indexes or stores Resource data fields as part of the batch.
	Upsert(id string, r Resource) error
	// Purge removes a resource from the index as part of the batch.
	Purge(id string) error

	// Delete marks the resource as deleted; it can be undone with Restore.
	Delete(id string) error
	// Restore is the counterpart to Delete.
	Restore(id string) error

	// Move updates the resource location as part of the batch.
	Move(id string, parentid string, target string) error

	// End commits the operations still pending in the batch.
	End() error
}
// Resource is the entity that is stored in the index.

View File

@@ -0,0 +1,354 @@
// Code generated by mockery; DO NOT EDIT.
// github.com/vektra/mockery
// template: testify
package mocks
import (
"github.com/opencloud-eu/opencloud/services/search/pkg/engine"
mock "github.com/stretchr/testify/mock"
)
// NewBatch creates a new instance of Batch. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
// The first argument is typically a *testing.T value.
func NewBatch(t interface {
mock.TestingT
Cleanup(func())
}) *Batch {
mock := &Batch{}
mock.Mock.Test(t)
t.Cleanup(func() { mock.AssertExpectations(t) })
return mock
}
// Batch is an autogenerated mock type for the Batch type
type Batch struct {
mock.Mock
}
type Batch_Expecter struct {
mock *mock.Mock
}
func (_m *Batch) EXPECT() *Batch_Expecter {
return &Batch_Expecter{mock: &_m.Mock}
}
// Delete provides a mock function for the type Batch
func (_mock *Batch) Delete(id string) error {
ret := _mock.Called(id)
if len(ret) == 0 {
panic("no return value specified for Delete")
}
var r0 error
if returnFunc, ok := ret.Get(0).(func(string) error); ok {
r0 = returnFunc(id)
} else {
r0 = ret.Error(0)
}
return r0
}
// Batch_Delete_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Delete'
type Batch_Delete_Call struct {
*mock.Call
}
// Delete is a helper method to define mock.On call
// - id string
func (_e *Batch_Expecter) Delete(id interface{}) *Batch_Delete_Call {
return &Batch_Delete_Call{Call: _e.mock.On("Delete", id)}
}
func (_c *Batch_Delete_Call) Run(run func(id string)) *Batch_Delete_Call {
_c.Call.Run(func(args mock.Arguments) {
var arg0 string
if args[0] != nil {
arg0 = args[0].(string)
}
run(
arg0,
)
})
return _c
}
func (_c *Batch_Delete_Call) Return(err error) *Batch_Delete_Call {
_c.Call.Return(err)
return _c
}
func (_c *Batch_Delete_Call) RunAndReturn(run func(id string) error) *Batch_Delete_Call {
_c.Call.Return(run)
return _c
}
// End provides a mock function for the type Batch
func (_mock *Batch) End() error {
ret := _mock.Called()
if len(ret) == 0 {
panic("no return value specified for End")
}
var r0 error
if returnFunc, ok := ret.Get(0).(func() error); ok {
r0 = returnFunc()
} else {
r0 = ret.Error(0)
}
return r0
}
// Batch_End_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'End'
type Batch_End_Call struct {
*mock.Call
}
// End is a helper method to define mock.On call
func (_e *Batch_Expecter) End() *Batch_End_Call {
return &Batch_End_Call{Call: _e.mock.On("End")}
}
func (_c *Batch_End_Call) Run(run func()) *Batch_End_Call {
_c.Call.Run(func(args mock.Arguments) {
run()
})
return _c
}
func (_c *Batch_End_Call) Return(err error) *Batch_End_Call {
_c.Call.Return(err)
return _c
}
func (_c *Batch_End_Call) RunAndReturn(run func() error) *Batch_End_Call {
_c.Call.Return(run)
return _c
}
// Move provides a mock function for the type Batch
func (_mock *Batch) Move(id string, parentid string, target string) error {
ret := _mock.Called(id, parentid, target)
if len(ret) == 0 {
panic("no return value specified for Move")
}
var r0 error
if returnFunc, ok := ret.Get(0).(func(string, string, string) error); ok {
r0 = returnFunc(id, parentid, target)
} else {
r0 = ret.Error(0)
}
return r0
}
// Batch_Move_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Move'
type Batch_Move_Call struct {
*mock.Call
}
// Move is a helper method to define mock.On call
// - id string
// - parentid string
// - target string
func (_e *Batch_Expecter) Move(id interface{}, parentid interface{}, target interface{}) *Batch_Move_Call {
return &Batch_Move_Call{Call: _e.mock.On("Move", id, parentid, target)}
}
func (_c *Batch_Move_Call) Run(run func(id string, parentid string, target string)) *Batch_Move_Call {
_c.Call.Run(func(args mock.Arguments) {
var arg0 string
if args[0] != nil {
arg0 = args[0].(string)
}
var arg1 string
if args[1] != nil {
arg1 = args[1].(string)
}
var arg2 string
if args[2] != nil {
arg2 = args[2].(string)
}
run(
arg0,
arg1,
arg2,
)
})
return _c
}
func (_c *Batch_Move_Call) Return(err error) *Batch_Move_Call {
_c.Call.Return(err)
return _c
}
func (_c *Batch_Move_Call) RunAndReturn(run func(id string, parentid string, target string) error) *Batch_Move_Call {
_c.Call.Return(run)
return _c
}
// Purge provides a mock function for the type Batch
func (_mock *Batch) Purge(id string) error {
ret := _mock.Called(id)
if len(ret) == 0 {
panic("no return value specified for Purge")
}
var r0 error
if returnFunc, ok := ret.Get(0).(func(string) error); ok {
r0 = returnFunc(id)
} else {
r0 = ret.Error(0)
}
return r0
}
// Batch_Purge_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Purge'
type Batch_Purge_Call struct {
*mock.Call
}
// Purge is a helper method to define mock.On call
// - id string
func (_e *Batch_Expecter) Purge(id interface{}) *Batch_Purge_Call {
return &Batch_Purge_Call{Call: _e.mock.On("Purge", id)}
}
func (_c *Batch_Purge_Call) Run(run func(id string)) *Batch_Purge_Call {
_c.Call.Run(func(args mock.Arguments) {
var arg0 string
if args[0] != nil {
arg0 = args[0].(string)
}
run(
arg0,
)
})
return _c
}
func (_c *Batch_Purge_Call) Return(err error) *Batch_Purge_Call {
_c.Call.Return(err)
return _c
}
func (_c *Batch_Purge_Call) RunAndReturn(run func(id string) error) *Batch_Purge_Call {
_c.Call.Return(run)
return _c
}
// Restore provides a mock function for the type Batch
func (_mock *Batch) Restore(id string) error {
ret := _mock.Called(id)
if len(ret) == 0 {
panic("no return value specified for Restore")
}
var r0 error
if returnFunc, ok := ret.Get(0).(func(string) error); ok {
r0 = returnFunc(id)
} else {
r0 = ret.Error(0)
}
return r0
}
// Batch_Restore_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Restore'
type Batch_Restore_Call struct {
*mock.Call
}
// Restore is a helper method to define mock.On call
// - id string
func (_e *Batch_Expecter) Restore(id interface{}) *Batch_Restore_Call {
return &Batch_Restore_Call{Call: _e.mock.On("Restore", id)}
}
func (_c *Batch_Restore_Call) Run(run func(id string)) *Batch_Restore_Call {
_c.Call.Run(func(args mock.Arguments) {
var arg0 string
if args[0] != nil {
arg0 = args[0].(string)
}
run(
arg0,
)
})
return _c
}
func (_c *Batch_Restore_Call) Return(err error) *Batch_Restore_Call {
_c.Call.Return(err)
return _c
}
func (_c *Batch_Restore_Call) RunAndReturn(run func(id string) error) *Batch_Restore_Call {
_c.Call.Return(run)
return _c
}
// Upsert provides a mock function for the type Batch
func (_mock *Batch) Upsert(id string, r engine.Resource) error {
ret := _mock.Called(id, r)
if len(ret) == 0 {
panic("no return value specified for Upsert")
}
var r0 error
if returnFunc, ok := ret.Get(0).(func(string, engine.Resource) error); ok {
r0 = returnFunc(id, r)
} else {
r0 = ret.Error(0)
}
return r0
}
// Batch_Upsert_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Upsert'
type Batch_Upsert_Call struct {
*mock.Call
}
// Upsert is a helper method to define mock.On call
// - id string
// - r engine.Resource
func (_e *Batch_Expecter) Upsert(id interface{}, r interface{}) *Batch_Upsert_Call {
return &Batch_Upsert_Call{Call: _e.mock.On("Upsert", id, r)}
}
func (_c *Batch_Upsert_Call) Run(run func(id string, r engine.Resource)) *Batch_Upsert_Call {
_c.Call.Run(func(args mock.Arguments) {
var arg0 string
if args[0] != nil {
arg0 = args[0].(string)
}
var arg1 engine.Resource
if args[1] != nil {
arg1 = args[1].(engine.Resource)
}
run(
arg0,
arg1,
)
})
return _c
}
func (_c *Batch_Upsert_Call) Return(err error) *Batch_Upsert_Call {
_c.Call.Return(err)
return _c
}
func (_c *Batch_Upsert_Call) RunAndReturn(run func(id string, r engine.Resource) error) *Batch_Upsert_Call {
_c.Call.Return(run)
return _c
}

View File

@@ -143,50 +143,6 @@ func (_c *Engine_DocCount_Call) RunAndReturn(run func() (uint64, error)) *Engine
return _c
}
// EndBatch provides a mock function for the type Engine
func (_mock *Engine) EndBatch() error {
ret := _mock.Called()
if len(ret) == 0 {
panic("no return value specified for EndBatch")
}
var r0 error
if returnFunc, ok := ret.Get(0).(func() error); ok {
r0 = returnFunc()
} else {
r0 = ret.Error(0)
}
return r0
}
// Engine_EndBatch_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'EndBatch'
type Engine_EndBatch_Call struct {
*mock.Call
}
// EndBatch is a helper method to define mock.On call
func (_e *Engine_Expecter) EndBatch() *Engine_EndBatch_Call {
return &Engine_EndBatch_Call{Call: _e.mock.On("EndBatch")}
}
func (_c *Engine_EndBatch_Call) Run(run func()) *Engine_EndBatch_Call {
_c.Call.Run(func(args mock.Arguments) {
run()
})
return _c
}
func (_c *Engine_EndBatch_Call) Return(err error) *Engine_EndBatch_Call {
_c.Call.Return(err)
return _c
}
func (_c *Engine_EndBatch_Call) RunAndReturn(run func() error) *Engine_EndBatch_Call {
_c.Call.Return(run)
return _c
}
// Move provides a mock function for the type Engine
func (_mock *Engine) Move(id string, parentid string, target string) error {
ret := _mock.Called(id, parentid, target)
@@ -421,20 +377,31 @@ func (_c *Engine_Search_Call) RunAndReturn(run func(ctx context.Context, req *v0
}
// StartBatch provides a mock function for the type Engine
func (_mock *Engine) StartBatch(batchSize int) error {
func (_mock *Engine) StartBatch(batchSize int) (engine.Batch, error) {
ret := _mock.Called(batchSize)
if len(ret) == 0 {
panic("no return value specified for StartBatch")
}
var r0 error
if returnFunc, ok := ret.Get(0).(func(int) error); ok {
var r0 engine.Batch
var r1 error
if returnFunc, ok := ret.Get(0).(func(int) (engine.Batch, error)); ok {
return returnFunc(batchSize)
}
if returnFunc, ok := ret.Get(0).(func(int) engine.Batch); ok {
r0 = returnFunc(batchSize)
} else {
r0 = ret.Error(0)
if ret.Get(0) != nil {
r0 = ret.Get(0).(engine.Batch)
}
}
return r0
if returnFunc, ok := ret.Get(1).(func(int) error); ok {
r1 = returnFunc(batchSize)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// Engine_StartBatch_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'StartBatch'
@@ -461,12 +428,12 @@ func (_c *Engine_StartBatch_Call) Run(run func(batchSize int)) *Engine_StartBatc
return _c
}
func (_c *Engine_StartBatch_Call) Return(err error) *Engine_StartBatch_Call {
_c.Call.Return(err)
func (_c *Engine_StartBatch_Call) Return(batch engine.Batch, err error) *Engine_StartBatch_Call {
_c.Call.Return(batch, err)
return _c
}
func (_c *Engine_StartBatch_Call) RunAndReturn(run func(batchSize int) error) *Engine_StartBatch_Call {
func (_c *Engine_StartBatch_Call) RunAndReturn(run func(batchSize int) (engine.Batch, error)) *Engine_StartBatch_Call {
_c.Call.Return(run)
return _c
}

View File

@@ -463,9 +463,12 @@ func (s *Service) IndexSpace(spaceID *provider.StorageSpaceId) error {
}()
w := walker.NewWalker(s.gatewaySelector)
s.engine.StartBatch(s.batchSize)
batch, err := s.engine.StartBatch(s.batchSize)
if err != nil {
return err
}
defer func() {
if err := s.engine.EndBatch(); err != nil {
if err := batch.End(); err != nil {
s.logger.Error().Err(err).Msg("failed to end batch")
}
}()
@@ -498,7 +501,7 @@ func (s *Service) IndexSpace(spaceID *provider.StorageSpaceId) error {
return nil
}
s.UpsertItem(ref)
s.doUpsertItem(ref, batch)
return nil
})
@@ -515,7 +518,17 @@ func (s *Service) IndexSpace(spaceID *provider.StorageSpaceId) error {
// TrashItem marks the item as deleted.
func (s *Service) TrashItem(rID *provider.ResourceId) {
err := s.engine.Delete(storagespace.FormatResourceID(rID))
batch, err := s.engine.StartBatch(s.batchSize)
if err != nil {
s.logger.Error().Err(err).Msg("failed to start batch")
return
}
defer func() {
if err := batch.End(); err != nil {
s.logger.Error().Err(err).Msg("failed to end batch")
}
}()
err = batch.Delete(storagespace.FormatResourceID(rID))
if err != nil {
s.logger.Error().Err(err).Interface("Id", rID).Msg("failed to remove item from index")
}
@@ -523,6 +536,11 @@ func (s *Service) TrashItem(rID *provider.ResourceId) {
// UpsertItem indexes or updates the resource referenced by ref. No batch is
// involved: doUpsertItem receives a nil batch and therefore talks to the
// engine directly.
func (s *Service) UpsertItem(ref *provider.Reference) {
	var noBatch engine.Batch // nil interface value: not part of a batch
	s.doUpsertItem(ref, noBatch)
}
// doUpsertItem indexes or stores Resource data fields.
func (s *Service) doUpsertItem(ref *provider.Reference, batch engine.Batch) {
ctx, stat, path := s.resInfo(ref)
if ctx == nil || stat == nil || path == "" {
return
@@ -551,7 +569,12 @@ func (s *Service) UpsertItem(ref *provider.Reference) {
r.ParentID = storagespace.FormatResourceID(parentID)
}
if err = s.engine.Upsert(r.ID, r); err != nil {
if batch != nil {
err = batch.Upsert(r.ID, r)
} else {
err = s.engine.Upsert(r.ID, r)
}
if err != nil {
s.logger.Error().Err(err).Msg("error adding updating the resource in the index")
} else {
logDocCount(s.engine, s.logger)

View File

@@ -117,19 +117,21 @@ var _ = Describe("Searchprovider", func() {
Describe("IndexSpace", func() {
It("walks the space and indexes all files", func() {
batch := &engineMocks.Batch{}
batch.EXPECT().End().Return(nil)
gatewayClient.On("GetUserByClaim", mock.Anything, mock.Anything).Return(&userv1beta1.GetUserByClaimResponse{
Status: status.NewOK(context.Background()),
User: user,
}, nil)
extractor.On("Extract", mock.Anything, mock.Anything, mock.Anything).Return(content.Document{}, nil)
indexClient.On("StartBatch", mock.Anything, mock.Anything).Return(nil)
indexClient.On("EndBatch", mock.Anything, mock.Anything).Return(nil)
indexClient.On("Upsert", mock.Anything, mock.Anything).Return(nil)
indexClient.On("StartBatch", mock.Anything, mock.Anything).Return(batch, nil)
batch.On("Upsert", mock.Anything, mock.Anything).Return(nil)
indexClient.On("Search", mock.Anything, mock.Anything).Return(&searchsvc.SearchIndexResponse{}, nil)
gatewayClient.On("Stat", mock.Anything, mock.Anything).Return(&sprovider.StatResponse{
Status: status.NewOK(context.Background()),
Info: ri,
}, nil)
err := s.IndexSpace(&sprovider.StorageSpaceId{OpaqueId: "storageid$spaceid!spaceid"})
Expect(err).ShouldNot(HaveOccurred())
})