diff --git a/services/search/pkg/bleve/backend.go b/services/search/pkg/bleve/backend.go
index 98488257b2..3358f442f9 100644
--- a/services/search/pkg/bleve/backend.go
+++ b/services/search/pkg/bleve/backend.go
@@ -160,48 +160,70 @@ func (b *Backend) DocCount() (uint64, error) {
 }
 
 func (b *Backend) Upsert(id string, r search.Resource) error {
-	return b.withBatch(func(batch search.BatchOperator) error {
-		return batch.Upsert(id, r)
-	})
-}
-
-func (b *Backend) Move(rootID, parentID, location string) error {
-	return b.withBatch(func(batch search.BatchOperator) error {
-		return batch.Move(rootID, parentID, location)
-	})
-}
-
-func (b *Backend) Delete(id string) error {
-	return b.withBatch(func(batch search.BatchOperator) error {
-		return batch.Delete(id)
-	})
-}
-
-func (b *Backend) Restore(id string) error {
-	return b.withBatch(func(batch search.BatchOperator) error {
-		return batch.Restore(id)
-	})
-}
-
-func (b *Backend) Purge(id string) error {
-	return b.withBatch(func(batch search.BatchOperator) error {
-		return batch.Purge(id)
-	})
-}
-
-func (b *Backend) NewBatch(size int) (search.BatchOperator, error) {
-	return NewBatch(b.index, size)
-}
-
-func (b *Backend) withBatch(f func(batch search.BatchOperator) error) error {
 	batch, err := b.NewBatch(defaultBatchSize)
 	if err != nil {
 		return err
 	}
 
-	if err := f(batch); err != nil {
+	if err := batch.Upsert(id, r); err != nil {
 		return err
 	}
 
 	return batch.Push()
 }
+
+func (b *Backend) Move(rootID, parentID, location string) error {
+	batch, err := b.NewBatch(defaultBatchSize)
+	if err != nil {
+		return err
+	}
+
+	if err := batch.Move(rootID, parentID, location); err != nil {
+		return err
+	}
+
+	return batch.Push()
+}
+
+func (b *Backend) Delete(id string) error {
+	batch, err := b.NewBatch(defaultBatchSize)
+	if err != nil {
+		return err
+	}
+
+	if err := batch.Delete(id); err != nil {
+		return err
+	}
+
+	return batch.Push()
+}
+
+func (b *Backend) Restore(id string) error {
+	batch, err := b.NewBatch(defaultBatchSize)
+	if err != nil {
+		return err
+	}
+
+	if err := batch.Restore(id); err != nil {
+		return err
+	}
+
+	return batch.Push()
+}
+
+func (b *Backend) Purge(id string) error {
+	batch, err := b.NewBatch(defaultBatchSize)
+	if err != nil {
+		return err
+	}
+
+	if err := batch.Purge(id); err != nil {
+		return err
+	}
+
+	return batch.Push()
+}
+
+func (b *Backend) NewBatch(size int) (search.BatchOperator, error) {
+	return NewBatch(b.index, size)
+}
diff --git a/services/search/pkg/opensearch/backend.go b/services/search/pkg/opensearch/backend.go
index 2ffb72571d..a1af1afbc6 100644
--- a/services/search/pkg/opensearch/backend.go
+++ b/services/search/pkg/opensearch/backend.go
@@ -1,27 +1,27 @@
 package opensearch
 
 import (
-	"bytes"
 	"context"
-	"encoding/json"
 	"fmt"
-	"path"
 	"strings"
 	"time"
 
 	storageProvider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
+	opensearchgoAPI "github.com/opensearch-project/opensearch-go/v4/opensearchapi"
+
 	"github.com/opencloud-eu/reva/v2/pkg/storagespace"
 	"github.com/opencloud-eu/reva/v2/pkg/utils"
-	opensearchgoAPI "github.com/opensearch-project/opensearch-go/v4/opensearchapi"
 
 	"github.com/opencloud-eu/opencloud/pkg/conversions"
 	searchMessage "github.com/opencloud-eu/opencloud/protogen/gen/opencloud/messages/search/v0"
 	searchService "github.com/opencloud-eu/opencloud/protogen/gen/opencloud/services/search/v0"
-	"github.com/opencloud-eu/opencloud/services/search/pkg/engine"
 	"github.com/opencloud-eu/opencloud/services/search/pkg/opensearch/internal/convert"
 	"github.com/opencloud-eu/opencloud/services/search/pkg/opensearch/internal/osu"
+	"github.com/opencloud-eu/opencloud/services/search/pkg/search"
 )
 
+const defaultBatchSize = 50
+
 var (
 	ErrUnhealthyCluster = fmt.Errorf("cluster is not healthy")
 )
@@ -66,7 +66,7 @@ func NewBackend(index string, client *opensearchgoAPI.Client) (*Backend, error)
 	return &Backend{index: index, client: client}, nil
 }
 
-func (be *Backend) Search(ctx context.Context, sir *searchService.SearchIndexRequest) (*searchService.SearchIndexResponse, error) {
+func (b *Backend) Search(ctx context.Context, sir *searchService.SearchIndexRequest) (*searchService.SearchIndexResponse, error) {
 	boolQuery, err := convert.KQLToOpenSearchBoolQuery(sir.Query)
 	if err != nil {
 		return nil, fmt.Errorf("failed to convert KQL query to OpenSearch bool query: %w", err)
@@ -104,7 +104,7 @@ func (be *Backend) Search(ctx context.Context, sir *searchService.SearchIndexReq
 	}
 
 	req, err := osu.BuildSearchReq(&opensearchgoAPI.SearchReq{
-		Indices: []string{be.index},
+		Indices: []string{b.index},
 		Params:  searchParams,
 	},
 		boolQuery,
@@ -122,7 +122,7 @@ func (be *Backend) Search(ctx context.Context, sir *searchService.SearchIndexReq
 		return nil, fmt.Errorf("failed to build search request: %w", err)
 	}
 
-	resp, err := be.client.Search(ctx, req)
+	resp, err := b.client.Search(ctx, req)
 	if err != nil {
 		return nil, fmt.Errorf("failed to search: %w", err)
 	}
@@ -155,101 +155,10 @@ func (be *Backend) Search(ctx context.Context, sir *searchService.SearchIndexReq
 	}, nil
 }
 
-func (be *Backend) Upsert(id string, r engine.Resource) error {
-	body, err := json.Marshal(r)
-	if err != nil {
-		return fmt.Errorf("failed to marshal resource: %w", err)
-	}
-
-	_, err = be.client.Index(context.TODO(), opensearchgoAPI.IndexReq{
-		Index:      be.index,
-		DocumentID: id,
-		Body:       bytes.NewReader(body),
-	})
-	if err != nil {
-		return fmt.Errorf("failed to index document: %w", err)
-	}
-
-	return nil
-}
-
-func (be *Backend) Move(id string, parentID string, target string) error {
-	return be.updateSelfAndDescendants(id, func(rootResource engine.Resource) *osu.BodyParamScript {
-		return &osu.BodyParamScript{
-			Source: `
-				if (ctx._source.ID == params.id ) { ctx._source.Name = params.newName; ctx._source.ParentID = params.parentID; }
-				ctx._source.Path = ctx._source.Path.replace(params.oldPath, params.newPath)
-			`,
-			Lang: "painless",
-			Params: map[string]any{
-				"id":       id,
-				"parentID": parentID,
-				"oldPath":  rootResource.Path,
-				"newPath":  utils.MakeRelativePath(target),
-				"newName":  path.Base(utils.MakeRelativePath(target)),
-			},
-		}
-	})
-}
-
-func (be *Backend) Delete(id string) error {
-	return be.updateSelfAndDescendants(id, func(_ engine.Resource) *osu.BodyParamScript {
-		return &osu.BodyParamScript{
-			Source: "ctx._source.Deleted = params.deleted",
-			Lang:   "painless",
-			Params: map[string]any{
-				"deleted": true,
-			},
-		}
-	})
-}
-
-func (be *Backend) Restore(id string) error {
-	return be.updateSelfAndDescendants(id, func(_ engine.Resource) *osu.BodyParamScript {
-		return &osu.BodyParamScript{
-			Source: "ctx._source.Deleted = params.deleted",
-			Lang:   "painless",
-			Params: map[string]any{
-				"deleted": false,
-			},
-		}
-	})
-}
-
-func (be *Backend) Purge(id string) error {
-	resource, err := be.getResource(id)
-	if err != nil {
-		return fmt.Errorf("failed to get resource: %w", err)
-	}
-
-	req, err := osu.BuildDocumentDeleteByQueryReq(
-		opensearchgoAPI.DocumentDeleteByQueryReq{
-			Indices: []string{be.index},
-			Params: opensearchgoAPI.DocumentDeleteByQueryParams{
-				WaitForCompletion: conversions.ToPointer(true),
-			},
-		},
-		osu.NewTermQuery[string]("Path").Value(resource.Path),
-	)
-	if err != nil {
-		return fmt.Errorf("failed to build delete by query request: %w", err)
-	}
-
-	resp, err := be.client.Document.DeleteByQuery(context.TODO(), req)
-	switch {
-	case err != nil:
-		return fmt.Errorf("failed to delete by query: %w", err)
-	case len(resp.Failures) != 0:
-		return fmt.Errorf("failed to delete by query, failures: %v", resp.Failures)
-	}
-
-	return nil
-}
-
-func (be *Backend) DocCount() (uint64, error) {
+func (b *Backend) DocCount() (uint64, error) {
 	req, err := osu.BuildIndicesCountReq(
 		&opensearchgoAPI.IndicesCountReq{
-			Indices: []string{be.index},
+			Indices: []string{b.index},
 		},
 		osu.NewTermQuery[bool]("Deleted").Value(false),
 	)
@@ -257,7 +166,7 @@ func (be *Backend) DocCount() (uint64, error) {
 		return 0, fmt.Errorf("failed to build count request: %w", err)
 	}
 
-	resp, err := be.client.Indices.Count(context.TODO(), req)
+	resp, err := b.client.Indices.Count(context.TODO(), req)
 	if err != nil {
 		return 0, fmt.Errorf("failed to count documents: %w", err)
 	}
@@ -265,74 +174,71 @@ func (be *Backend) DocCount() (uint64, error) {
 	return uint64(resp.Count), nil
 }
 
-func (be *Backend) updateSelfAndDescendants(id string, scriptProvider func(engine.Resource) *osu.BodyParamScript) error {
-	if scriptProvider == nil {
-		return fmt.Errorf("script cannot be nil")
-	}
-
-	resource, err := be.getResource(id)
+func (b *Backend) Upsert(id string, r search.Resource) error {
+	batch, err := b.NewBatch(defaultBatchSize)
 	if err != nil {
-		return fmt.Errorf("failed to get resource: %w", err)
+		return err
 	}
 
-	req, err := osu.BuildUpdateByQueryReq(
-		opensearchgoAPI.UpdateByQueryReq{
-			Indices: []string{be.index},
-			Params: opensearchgoAPI.UpdateByQueryParams{
-				WaitForCompletion: conversions.ToPointer(true),
-			},
-		},
-		osu.NewTermQuery[string]("Path").Value(resource.Path),
-		osu.UpdateByQueryBodyParams{
-			Script: scriptProvider(resource),
-		},
-	)
-	if err != nil {
-		return fmt.Errorf("failed to build update by query request: %w", err)
+	if err := batch.Upsert(id, r); err != nil {
+		return err
 	}
 
-	resp, err := be.client.UpdateByQuery(context.TODO(), req)
-	switch {
-	case err != nil:
-		return fmt.Errorf("failed to update by query: %w", err)
-	case len(resp.Failures) != 0:
-		return fmt.Errorf("failed to update by query, failures: %v", resp.Failures)
-	}
-
-	return nil
+	return batch.Push()
 }
 
-func (be *Backend) getResource(id string) (engine.Resource, error) {
-	req, err := osu.BuildSearchReq(
-		&opensearchgoAPI.SearchReq{
-			Indices: []string{be.index},
-		},
-		osu.NewIDsQuery(id),
-	)
+func (b *Backend) Move(id string, parentID string, target string) error {
+	batch, err := b.NewBatch(defaultBatchSize)
 	if err != nil {
-		return engine.Resource{}, fmt.Errorf("failed to build search request: %w", err)
+		return err
 	}
 
-	resp, err := be.client.Search(context.TODO(), req)
-	switch {
-	case err != nil:
-		return engine.Resource{}, fmt.Errorf("failed to search for resource: %w", err)
-	case resp.Hits.Total.Value == 0 || len(resp.Hits.Hits) == 0:
-		return engine.Resource{}, fmt.Errorf("document with id %s not found", id)
+	if err := batch.Move(id, parentID, target); err != nil {
+		return err
 	}
 
-	resource, err := conversions.To[engine.Resource](resp.Hits.Hits[0].Source)
+	return batch.Push()
+}
+
+func (b *Backend) Delete(id string) error {
+	batch, err := b.NewBatch(defaultBatchSize)
 	if err != nil {
-		return engine.Resource{}, fmt.Errorf("failed to convert hit source: %w", err)
+		return err
 	}
 
-	return resource, nil
+	if err := batch.Delete(id); err != nil {
+		return err
+	}
+
+	return batch.Push()
 }
 
-func (be *Backend) StartBatch(_ int) error {
-	return nil // todo: implement batch processing
+func (b *Backend) Restore(id string) error {
+	batch, err := b.NewBatch(defaultBatchSize)
+	if err != nil {
+		return err
+	}
+
+	if err := batch.Restore(id); err != nil {
+		return err
+	}
+
+	return batch.Push()
 }
 
-func (be *Backend) EndBatch() error {
-	return nil // todo: implement batch processing
+func (b *Backend) Purge(id string) error {
+	batch, err := b.NewBatch(defaultBatchSize)
+	if err != nil {
+		return err
+	}
+
+	if err := batch.Purge(id); err != nil {
+		return err
+	}
+
+	return batch.Push()
+}
+
+func (b *Backend) NewBatch(size int) (search.BatchOperator, error) {
+	return NewBatch(b.client, b.index, size)
 }
diff --git a/services/search/pkg/opensearch/backend_test.go b/services/search/pkg/opensearch/backend_test.go
index 6b653c7dff..ab11a5f6ab 100644
--- a/services/search/pkg/opensearch/backend_test.go
+++ b/services/search/pkg/opensearch/backend_test.go
@@ -10,9 +10,9 @@ import (
 	"github.com/stretchr/testify/require"
 
 	searchService "github.com/opencloud-eu/opencloud/protogen/gen/opencloud/services/search/v0"
-	"github.com/opencloud-eu/opencloud/services/search/pkg/engine"
 	"github.com/opencloud-eu/opencloud/services/search/pkg/opensearch"
 	"github.com/opencloud-eu/opencloud/services/search/pkg/opensearch/internal/test"
+	"github.com/opencloud-eu/opencloud/services/search/pkg/search"
 )
 
 func TestNewBackend(t *testing.T) {
@@ -116,14 +116,14 @@ func TestEngine_Move(t *testing.T) {
 			},
 		})
 
-		resources := opensearchtest.SearchHitsMustBeConverted[engine.Resource](t, tc.Require.Search(indexName, strings.NewReader(body)).Hits)
+		resources := opensearchtest.SearchHitsMustBeConverted[search.Resource](t, tc.Require.Search(indexName, strings.NewReader(body)).Hits)
 		require.Len(t, resources, 1)
 		require.Equal(t, document.Path, resources[0].Path)
 
 		document.Path = "./new/path/to/resource"
 		require.NoError(t, backend.Move(document.ID, document.ParentID, document.Path))
 
-		resources = opensearchtest.SearchHitsMustBeConverted[engine.Resource](t, tc.Require.Search(indexName, strings.NewReader(body)).Hits)
+		resources = opensearchtest.SearchHitsMustBeConverted[search.Resource](t, tc.Require.Search(indexName, strings.NewReader(body)).Hits)
 		require.Len(t, resources, 1)
 		require.Equal(t, document.Path, resources[0].Path)
 	})
diff --git a/services/search/pkg/opensearch/batch.go b/services/search/pkg/opensearch/batch.go
new file mode 100644
index 0000000000..6ac5bb6083
--- /dev/null
+++ b/services/search/pkg/opensearch/batch.go
@@ -0,0 +1,243 @@
+package opensearch
+
+import (
+	"context"
+	"encoding/json"
+	"errors"
+	"fmt"
+	"path"
+	"strings"
+	"sync"
+
+	"github.com/opencloud-eu/reva/v2/pkg/utils"
+	opensearchgoAPI "github.com/opensearch-project/opensearch-go/v4/opensearchapi"
+
+	"github.com/opencloud-eu/opencloud/pkg/conversions"
+	"github.com/opencloud-eu/opencloud/pkg/log"
+	"github.com/opencloud-eu/opencloud/services/search/pkg/opensearch/internal/osu"
+	"github.com/opencloud-eu/opencloud/services/search/pkg/search"
+)
+
+var _ search.BatchOperator = &Batch{} // ensure Batch implements BatchOperator
+
+type Batch struct {
+	client     *opensearchgoAPI.Client
+	index      string
+	size       int
+	log        log.Logger
+	operations []any
+	mu         sync.Mutex
+}
+
+func NewBatch(client *opensearchgoAPI.Client, index string, size int) (*Batch, error) {
+	if size <= 0 {
+		return nil, errors.New("batch size must be greater than 0")
+	}
+
+	return &Batch{
+		client: client,
+		size:   size,
+		index:  index,
+	}, nil
+}
+
+func (b *Batch) Upsert(id string, r search.Resource) error {
+	return b.withSizeLimit(func() error {
+		body, err := conversions.To[map[string]any](r)
+		if err != nil {
+			return fmt.Errorf("failed to marshal resource: %w", err)
+		}
+
+		op := func() []map[string]any {
+			return []map[string]any{
+				{"index": map[string]any{"_index": b.index, "_id": id}},
+				body,
+			}
+		}
+
+		b.mu.Lock()
+		b.operations = append(b.operations, op)
+		b.mu.Unlock()
+
+		return nil
+	})
+}
+
+func (b *Batch) Move(id, parentID, location string) error {
+	return b.withSizeLimit(func() error {
+		op := func() error {
+			return updateSelfAndDescendants(context.Background(), b.client, b.index, id, func(rootResource search.Resource) *osu.BodyParamScript {
+				return &osu.BodyParamScript{
+					Source: `
+						if (ctx._source.ID == params.id ) { ctx._source.Name = params.newName; ctx._source.ParentID = params.parentID; }
+						ctx._source.Path = ctx._source.Path.replace(params.oldPath, params.newPath)
+					`,
+					Lang: "painless",
+					Params: map[string]any{
+						"id":       id,
+						"parentID": parentID,
+						"oldPath":  rootResource.Path,
+						"newPath":  utils.MakeRelativePath(location),
+						"newName":  path.Base(utils.MakeRelativePath(location)),
+					},
+				}
+			})
+		}
+
+		b.mu.Lock()
+		b.operations = append(b.operations, op)
+		b.mu.Unlock()
+
+		return nil
+	})
+}
+
+func (b *Batch) Delete(id string) error {
+	return b.withSizeLimit(func() error {
+		op := func() error {
+			return updateSelfAndDescendants(context.Background(), b.client, b.index, id, func(_ search.Resource) *osu.BodyParamScript {
+				return &osu.BodyParamScript{
+					Source: "ctx._source.Deleted = params.deleted",
+					Lang:   "painless",
+					Params: map[string]any{
+						"deleted": true,
+					},
+				}
+			})
+		}
+
+		b.mu.Lock()
+		b.operations = append(b.operations, op)
+		b.mu.Unlock()
+
+		return nil
+	})
+}
+
+func (b *Batch) Restore(id string) error {
+	return b.withSizeLimit(func() error {
+		op := func() error {
+			return updateSelfAndDescendants(context.Background(), b.client, b.index, id, func(_ search.Resource) *osu.BodyParamScript {
+				return &osu.BodyParamScript{
+					Source: "ctx._source.Deleted = params.deleted",
+					Lang:   "painless",
+					Params: map[string]any{
+						"deleted": false,
+					},
+				}
+			})
+		}
+
+		b.mu.Lock()
+		b.operations = append(b.operations, op)
+		b.mu.Unlock()
+
+		return nil
+	})
+}
+
+func (b *Batch) Purge(id string) error {
+	return b.withSizeLimit(func() error {
+		resource, err := searchResourceByID(context.Background(), b.client, b.index, id)
+		if err != nil {
+			return fmt.Errorf("failed to get resource: %w", err)
+		}
+
+		req, err := osu.BuildDocumentDeleteByQueryReq(
+			opensearchgoAPI.DocumentDeleteByQueryReq{
+				Indices: []string{b.index},
+				Params: opensearchgoAPI.DocumentDeleteByQueryParams{
+					WaitForCompletion: conversions.ToPointer(true),
+				},
+			},
+			osu.NewTermQuery[string]("Path").Value(resource.Path),
+		)
+		if err != nil {
+			return fmt.Errorf("failed to build delete by query request: %w", err)
+		}
+
+		op := func() error {
+			resp, err := b.client.Document.DeleteByQuery(context.TODO(), req)
+			switch {
+			case err != nil:
+				return fmt.Errorf("failed to delete by query: %w", err)
+			case len(resp.Failures) != 0:
+				return fmt.Errorf("failed to delete by query, failures: %v", resp.Failures)
+			}
+
+			return nil
+		}
+
+		b.mu.Lock()
+		b.operations = append(b.operations, op)
+		b.mu.Unlock()
+
+		return nil
+	})
+}
+
+func (b *Batch) Push() error {
+	b.mu.Lock()
+	defer b.mu.Unlock()
+	defer func() { // cleanup
+		b.operations = nil
+	}()
+
+	var bulkOperations []map[string]any
+	pushBulkOperations := func() error {
+		if len(bulkOperations) == 0 {
+			return nil
+		}
+
+		var body strings.Builder
+		for _, operation := range bulkOperations {
+			part, err := json.Marshal(operation)
+			if err != nil {
+				return fmt.Errorf("failed to marshal bulk operation: %w", err)
+			}
+			body.Write(part)
+			body.WriteString("\n")
+		}
+
+		if _, err := b.client.Bulk(context.Background(), opensearchgoAPI.BulkReq{
+			Body: strings.NewReader(body.String()),
+		}); err != nil {
+			return fmt.Errorf("failed to execute bulk operations: %w", err)
+		}
+
+		bulkOperations = nil
+		return nil
+	}
+
+	// keep the order of operations in the batch intact,
+	// unfortunately, operations like DeleteByQuery cannot be part of the bulk API,
+	// so we need to push the previous bulk operations before executing such operations
+	// this might lead to smaller bulks than the configured size, but ensures correct order
+	for _, operation := range b.operations {
+		switch op := operation.(type) {
+		case func() []map[string]any:
+			bulkOperations = append(bulkOperations, op()...)
+		case func() error:
+			if err := pushBulkOperations(); err != nil {
+				return fmt.Errorf("failed to push operations: %w", err)
+			}
+			if err := op(); err != nil {
+				return fmt.Errorf("failed to execute operation: %w", err)
+			}
+		}
+	}
+
+	return pushBulkOperations()
+}
+
+func (b *Batch) withSizeLimit(f func() error) error {
+	if err := f(); err != nil {
+		return err
+	}
+
+	if len(b.operations) >= b.size {
+		return b.Push()
+	}
+
+	return nil
+}
diff --git a/services/search/pkg/opensearch/internal/convert/opensearch.go b/services/search/pkg/opensearch/internal/convert/opensearch.go
index 8febaaf7ec..c4d8212dcd 100644
--- a/services/search/pkg/opensearch/internal/convert/opensearch.go
+++ b/services/search/pkg/opensearch/internal/convert/opensearch.go
@@ -5,17 +5,18 @@ import (
 	"strings"
 	"time"
 
-	"github.com/opencloud-eu/reva/v2/pkg/storagespace"
 	opensearchgoAPI "github.com/opensearch-project/opensearch-go/v4/opensearchapi"
 	"google.golang.org/protobuf/types/known/timestamppb"
 
+	"github.com/opencloud-eu/reva/v2/pkg/storagespace"
+
 	"github.com/opencloud-eu/opencloud/pkg/conversions"
 	searchMessage "github.com/opencloud-eu/opencloud/protogen/gen/opencloud/messages/search/v0"
-	"github.com/opencloud-eu/opencloud/services/search/pkg/engine"
+	"github.com/opencloud-eu/opencloud/services/search/pkg/search"
 )
 
 func OpenSearchHitToMatch(hit opensearchgoAPI.SearchHit) (*searchMessage.Match, error) {
-	resource, err := conversions.To[engine.Resource](hit.Source)
+	resource, err := conversions.To[search.Resource](hit.Source)
 	if err != nil {
 		return nil, fmt.Errorf("failed to convert hit source: %w", err)
 	}
diff --git a/services/search/pkg/opensearch/internal/test/testdata.go b/services/search/pkg/opensearch/internal/test/testdata.go
index d46b0b8aec..84ceef3a93 100644
--- a/services/search/pkg/opensearch/internal/test/testdata.go
+++ b/services/search/pkg/opensearch/internal/test/testdata.go
@@ -6,7 +6,7 @@ import (
 	"fmt"
 	"path"
 
-	"github.com/opencloud-eu/opencloud/services/search/pkg/engine"
+	"github.com/opencloud-eu/opencloud/services/search/pkg/search"
 )
 
 //go:embed testdata/*.json
@@ -16,12 +16,12 @@ var Testdata = struct {
 	Resources resourceTestdata
 }{
 	Resources: resourceTestdata{
-		File: fromTestData[engine.Resource]("resource_file.json"),
+		File: fromTestData[search.Resource]("resource_file.json"),
 	},
 }
 
 type resourceTestdata struct {
-	File engine.Resource
+	File search.Resource
 }
 
 func fromTestData[D any](name string) D {
diff --git a/services/search/pkg/opensearch/opensearch.go b/services/search/pkg/opensearch/opensearch.go
new file mode 100644
index 0000000000..0b25c5d718
--- /dev/null
+++ b/services/search/pkg/opensearch/opensearch.go
@@ -0,0 +1,77 @@
+package opensearch
+
+import (
+	"context"
+	"fmt"
+
+	opensearchgoAPI "github.com/opensearch-project/opensearch-go/v4/opensearchapi"
+
+	"github.com/opencloud-eu/opencloud/services/search/pkg/search"
+
+	"github.com/opencloud-eu/opencloud/pkg/conversions"
+	"github.com/opencloud-eu/opencloud/services/search/pkg/opensearch/internal/osu"
+)
+
+func searchResourceByID(ctx context.Context, client *opensearchgoAPI.Client, index string, id string) (search.Resource, error) {
+	req, err := osu.BuildSearchReq(
+		&opensearchgoAPI.SearchReq{
+			Indices: []string{index},
+		},
+		osu.NewIDsQuery(id),
+	)
+	if err != nil {
+		return search.Resource{}, fmt.Errorf("failed to build search request: %w", err)
+	}
+
+	resp, err := client.Search(ctx, req)
+	switch {
+	case err != nil:
+		return search.Resource{}, fmt.Errorf("failed to search for resource: %w", err)
+	case resp.Hits.Total.Value == 0 || len(resp.Hits.Hits) == 0:
+		return search.Resource{}, fmt.Errorf("document with id %s not found", id)
+	}
+
+	resource, err := conversions.To[search.Resource](resp.Hits.Hits[0].Source)
+	if err != nil {
+		return search.Resource{}, fmt.Errorf("failed to convert hit source: %w", err)
+	}
+
+	return resource, nil
+}
+
+func updateSelfAndDescendants(ctx context.Context, client *opensearchgoAPI.Client, index string, id string, scriptProvider func(search.Resource) *osu.BodyParamScript) error {
+	if scriptProvider == nil {
+		return fmt.Errorf("script cannot be nil")
+	}
+
+	resource, err := searchResourceByID(ctx, client, index, id)
+	if err != nil {
+		return fmt.Errorf("failed to get resource: %w", err)
+	}
+
+	req, err := osu.BuildUpdateByQueryReq(
+		opensearchgoAPI.UpdateByQueryReq{
+			Indices: []string{index},
+			Params: opensearchgoAPI.UpdateByQueryParams{
+				WaitForCompletion: conversions.ToPointer(true),
+			},
+		},
+		osu.NewTermQuery[string]("Path").Value(resource.Path),
+		osu.UpdateByQueryBodyParams{
+			Script: scriptProvider(resource),
+		},
+	)
+	if err != nil {
+		return fmt.Errorf("failed to build update by query request: %w", err)
+	}
+
+	resp, err := client.UpdateByQuery(ctx, req)
+	switch {
+	case err != nil:
+		return fmt.Errorf("failed to update by query: %w", err)
+	case len(resp.Failures) != 0:
+		return fmt.Errorf("failed to update by query, failures: %v", resp.Failures)
+	}
+
+	return nil
+}
diff --git a/services/search/pkg/service/grpc/v0/service.go b/services/search/pkg/service/grpc/v0/service.go
index b0927ed54c..914000ff77 100644
--- a/services/search/pkg/service/grpc/v0/service.go
+++ b/services/search/pkg/service/grpc/v0/service.go
@@ -31,7 +31,6 @@ import (
 	"github.com/opencloud-eu/opencloud/services/search/pkg/bleve"
 	"github.com/opencloud-eu/opencloud/services/search/pkg/config"
 	"github.com/opencloud-eu/opencloud/services/search/pkg/content"
-	"github.com/opencloud-eu/opencloud/services/search/pkg/engine"
 	"github.com/opencloud-eu/opencloud/services/search/pkg/opensearch"
 	bleveQuery "github.com/opencloud-eu/opencloud/services/search/pkg/query/bleve"
 	"github.com/opencloud-eu/opencloud/services/search/pkg/search"