diff --git a/services/search/.mockery.yaml b/services/search/.mockery.yaml index 1c2905419..828082b52 100644 --- a/services/search/.mockery.yaml +++ b/services/search/.mockery.yaml @@ -9,10 +9,11 @@ packages: github.com/opencloud-eu/opencloud/services/search/pkg/engine: interfaces: Engine: {} + Batch: {} github.com/opencloud-eu/opencloud/services/search/pkg/content: interfaces: Extractor: {} Retriever: {} github.com/opencloud-eu/opencloud/services/search/pkg/search: interfaces: - Searcher: {} \ No newline at end of file + Searcher: {} diff --git a/services/search/pkg/engine/bleve.go b/services/search/pkg/engine/bleve.go index 8976cbdf7..d66bef7a7 100644 --- a/services/search/pkg/engine/bleve.go +++ b/services/search/pkg/engine/bleve.go @@ -8,7 +8,6 @@ import ( "path/filepath" "reflect" "strings" - "sync" "time" "github.com/blevesearch/bleve/v2" @@ -34,18 +33,54 @@ import ( searchQuery "github.com/opencloud-eu/opencloud/services/search/pkg/query" ) -const _batchSize = 500 - // Bleve represents a search engine which utilizes bleve to search and store resources. type Bleve struct { index bleve.Index queryCreator searchQuery.Creator[query.Query] - batch *bleve.Batch - batchSize int - m sync.Mutex // batch operations in bleve are not thread-safe log log.Logger } +type batch struct { + engine *Bleve + + batch *bleve.Batch + batchSize int + log log.Logger +} + +func (b *batch) Upsert(id string, r Resource) error { + return b.engine.doUpsert(id, r, b) +} + +func (b *batch) Move(id string, parentid string, target string) error { + return b.engine.doMove(id, parentid, target, b) +} + +func (b *batch) Delete(id string) error { + return b.engine.setDeleted(id, true, b) +} + +func (b *batch) Restore(id string) error { + return b.engine.setDeleted(id, false, b) +} + +func (b *batch) Purge(id string) error { + return b.engine.doPurge(id, b) +} + +func (b *batch) End() error { + if b.batch == nil { + return errors.New("no batch started") + } + + b.log.Debug().Int("size", b.batch.Size()).Msg("Ending batch") + if err := b.engine.index.Batch(b.batch); err != nil { + return err + } + + return nil +} + // NewBleveIndex returns a new bleve index // given path must exist. func NewBleveIndex(root string) (bleve.Index, error) { @@ -242,57 +277,34 @@ func (b *Bleve) Search(ctx context.Context, sir *searchService.SearchIndexReques }, nil } -func (b *Bleve) StartBatch(batchSize int) error { - b.m.Lock() - defer b.m.Unlock() - +func (b *Bleve) StartBatch(batchSize int) (Batch, error) { if batchSize <= 0 { - return errors.New("batch size must be greater than 0") + return nil, errors.New("batch size must be greater than 0") } - if b.batch != nil { - b.log.Debug().Msg("reusing another batch that has already been started") - return nil - } - - b.log.Debug().Msg("Starting new batch") - b.batch = b.index.NewBatch() - b.batchSize = batchSize - return nil -} - -func (b *Bleve) EndBatch() error { - b.m.Lock() - defer b.m.Unlock() - - if b.batch == nil { - return errors.New("no batch started") - } - - b.log.Debug().Int("size", b.batch.Size()).Msg("Ending batch") - if err := b.index.Batch(b.batch); err != nil { - return err - } - - b.batch = nil - return nil + return &batch{ + engine: b, + batch: b.index.NewBatch(), + batchSize: batchSize, + }, nil } // Upsert indexes or stores Resource data fields. func (b *Bleve) Upsert(id string, r Resource) error { - b.m.Lock() - defer b.m.Unlock() + return b.doUpsert(id, r, nil) +} - if b.batch != nil { - if err := b.batch.Index(id, r); err != nil { +func (b *Bleve) doUpsert(id string, r Resource, batch *batch) error { + if batch != nil { + if err := batch.batch.Index(id, r); err != nil { return err } - if b.batch.Size() >= b.batchSize { - b.log.Debug().Int("size", b.batch.Size()).Msg("Committing batch") - if err := b.index.Batch(b.batch); err != nil { + if batch.batch.Size() >= batch.batchSize { + b.log.Debug().Int("size", batch.batch.Size()).Msg("Committing batch") + if err := b.index.Batch(batch.batch); err != nil { return err } - b.batch = b.index.NewBatch() + batch.batch = b.index.NewBatch() } return nil } @@ -301,6 +313,10 @@ func (b *Bleve) Upsert(id string, r Resource) error { // Move updates the resource location and all of its necessary fields. func (b *Bleve) Move(id string, parentid string, target string) error { + return b.doMove(id, parentid, target, nil) +} + +func (b *Bleve) doMove(id string, parentid string, target string, batch *batch) error { r, err := b.getResource(id) if err != nil { return err @@ -312,7 +328,7 @@ func (b *Bleve) Move(id string, parentid string, target string) error { r.Path = nextPath r.Name = path.Base(nextPath) r.ParentID = parentid - }) + }, batch) if err != nil { return err } @@ -333,7 +349,7 @@ func (b *Bleve) Move(id string, parentid string, target string) error { for _, h := range res.Hits { _, err := b.updateEntity(h.ID, func(r *Resource) { r.Path = strings.Replace(r.Path, currentPath, nextPath, 1) - }) + }, batch) if err != nil { return err } @@ -348,27 +364,31 @@ func (b *Bleve) Move(id string, parentid string, target string) error { // instead of removing the resource it just marks it as deleted! // can be undone func (b *Bleve) Delete(id string) error { - return b.setDeleted(id, true) + return b.setDeleted(id, true, nil) } // Restore is the counterpart to Delete. // It restores the resource which makes it available again. func (b *Bleve) Restore(id string) error { - return b.setDeleted(id, false) + return b.setDeleted(id, false, nil) } // Purge removes a resource from the index, irreversible operation. func (b *Bleve) Purge(id string) error { - b.m.Lock() - defer b.m.Unlock() + return b.doPurge(id, nil) +} - if b.batch != nil { - b.batch.Delete(id) - if b.batch.Size() >= b.batchSize { - if err := b.index.Batch(b.batch); err != nil { +func (b *Bleve) doPurge(id string, batch *batch) error { + if batch != nil { + err := batch.Delete(id) + if err != nil { + return err + } + if batch.batch.Size() >= batch.batchSize { + if err := b.index.Batch(batch.batch); err != nil { return err } - b.batch = b.index.NewBatch() + batch.batch = b.index.NewBatch() } return nil } @@ -518,7 +538,7 @@ func getPhotoValue[T any](fields map[string]interface{}) *T { return nil } -func (b *Bleve) updateEntity(id string, mutateFunc func(r *Resource)) (*Resource, error) { +func (b *Bleve) updateEntity(id string, mutateFunc func(r *Resource), batch *batch) (*Resource, error) { it, err := b.getResource(id) if err != nil { return nil, err @@ -526,13 +546,13 @@ func (b *Bleve) updateEntity(id string, mutateFunc func(r *Resource)) (*Resource mutateFunc(it) - return it, b.Upsert(id, *it) + return it, b.doUpsert(id, *it, batch) } -func (b *Bleve) setDeleted(id string, deleted bool) error { +func (b *Bleve) setDeleted(id string, deleted bool, batch *batch) error { it, err := b.updateEntity(id, func(r *Resource) { r.Deleted = deleted - }) + }, batch) if err != nil { return err } @@ -551,17 +571,14 @@ func (b *Bleve) setDeleted(id string, deleted bool) error { return err } - b.StartBatch(_batchSize) - defer b.EndBatch() for _, h := range res.Hits { _, err := b.updateEntity(h.ID, func(r *Resource) { r.Deleted = deleted - }) + }, batch) if err != nil { return err } } - b.EndBatch() } return nil diff --git a/services/search/pkg/engine/bleve_test.go b/services/search/pkg/engine/bleve_test.go index 05974d4ef..b73ee5d58 100644 --- a/services/search/pkg/engine/bleve_test.go +++ b/services/search/pkg/engine/bleve_test.go @@ -499,17 +499,17 @@ var _ = Describe("Bleve", func() { Describe("StartBatch", func() { It("starts a new batch", func() { - err := eng.StartBatch(100) + b, err := eng.StartBatch(100) Expect(err).ToNot(HaveOccurred()) - err = eng.Upsert(childResource.ID, childResource) + err = b.Upsert(childResource.ID, childResource) Expect(err).ToNot(HaveOccurred()) count, err := idx.DocCount() Expect(err).ToNot(HaveOccurred()) Expect(count).To(Equal(uint64(0))) - err = eng.EndBatch() + err = b.End() Expect(err).ToNot(HaveOccurred()) count, err = idx.DocCount() @@ -522,24 +522,29 @@ var _ = Describe("Bleve", func() { Expect(res.Hits.Len()).To(Equal(1)) }) - It("doesn't overwrite batches that are already in progress", func() { - err := eng.StartBatch(100) + It("doesn't intertwine different batches", func() { + b, err := eng.StartBatch(100) Expect(err).ToNot(HaveOccurred()) - err = eng.Upsert(childResource.ID, childResource) + err = b.Upsert(childResource.ID, childResource) Expect(err).ToNot(HaveOccurred()) count, err := idx.DocCount() Expect(err).ToNot(HaveOccurred()) Expect(count).To(Equal(uint64(0))) - err = eng.StartBatch(100) + b2, err := eng.StartBatch(100) Expect(err).ToNot(HaveOccurred()) - err = eng.Upsert(childResource2.ID, childResource2) + err = b2.Upsert(childResource2.ID, childResource2) Expect(err).ToNot(HaveOccurred()) - Expect(eng.EndBatch()).To(Succeed()) + Expect(b.End()).To(Succeed()) + count, err = idx.DocCount() + Expect(err).ToNot(HaveOccurred()) + Expect(count).To(Equal(uint64(1))) + + Expect(b2.End()).To(Succeed()) count, err = idx.DocCount() Expect(err).ToNot(HaveOccurred()) Expect(count).To(Equal(uint64(2))) diff --git a/services/search/pkg/engine/engine.go b/services/search/pkg/engine/engine.go index 1553662f3..8e34b917e 100644 --- a/services/search/pkg/engine/engine.go +++ b/services/search/pkg/engine/engine.go @@ -17,15 +17,25 @@ var queryEscape = regexp.MustCompile(`([` + regexp.QuoteMeta(`+=&|>