diff --git a/services/search/pkg/opensearch/engine.go b/services/search/pkg/opensearch/engine.go index 63b04ddf2d..3c68e385d3 100644 --- a/services/search/pkg/opensearch/engine.go +++ b/services/search/pkg/opensearch/engine.go @@ -4,9 +4,7 @@ import ( "bytes" "context" "encoding/json" - "errors" "fmt" - "math" "path" "strings" @@ -22,8 +20,6 @@ import ( "github.com/opencloud-eu/opencloud/services/search/pkg/engine" ) -var ErrNoContainerType = fmt.Errorf("not a container type") - type Engine struct { index string client *opensearchgoAPI.Client @@ -89,40 +85,37 @@ func (e *Engine) Search(ctx context.Context, sir *searchService.SearchIndexReque ) } - body, err := NewRootQuery(boolQuery, RootQueryOptions{ - Highlight: &RootQueryHighlight{ - PreTags: []string{""}, - PostTags: []string{""}, - Fields: map[string]RootQueryHighlight{ - "Content": {}, - }, - }, - }).MarshalJSON() - if err != nil { - return nil, fmt.Errorf("failed to marshal query: %w", err) - } - searchParams := opensearchgoAPI.SearchParams{} switch { case sir.PageSize == -1: - searchParams.Size = conversions.ToPointer(math.MaxInt) + searchParams.Size = conversions.ToPointer(1000) case sir.PageSize == 0: searchParams.Size = conversions.ToPointer(200) default: searchParams.Size = conversions.ToPointer(int(sir.PageSize)) } - // fixMe: see getDescendants - if *searchParams.Size > 250 { - searchParams.Size = conversions.ToPointer(250) + req, err := BuildSearchReq(&opensearchgoAPI.SearchReq{ + Indices: []string{e.index}, + Params: searchParams, + }, + boolQuery, + SearchReqOptions{ + Highlight: &HighlightOption{ + PreTags: []string{""}, + PostTags: []string{""}, + Fields: map[string]HighlightOption{ + "Content": {}, + }, + }, + }, + ) + if err != nil { + return nil, fmt.Errorf("failed to build search request: %w", err) } - resp, err := e.client.Search(ctx, &opensearchgoAPI.SearchReq{ - Indices: []string{e.index}, - Body: bytes.NewReader(body), - Params: searchParams, - }) + resp, err := e.client.Search(ctx, req) if err != nil { return nil, fmt.Errorf("failed to search: %w", err) } @@ -174,67 +167,87 @@ func (e *Engine) Upsert(id string, r engine.Resource) error { } func (e *Engine) Move(id string, parentID string, target string) error { + return e.updateSelfAndDescendants(id, func(rootResource engine.Resource) *ScriptOption { + return &ScriptOption{ + Source: ` + if (ctx._source.ID == params.id ) { ctx._source.Name = params.newName; ctx._source.ParentID = params.parentID; } + ctx._source.Path = ctx._source.Path.replace(params.oldPath, params.newPath) + `, + Lang: "painless", + Params: map[string]any{ + "id": id, + "parentID": parentID, + "oldPath": rootResource.Path, + "newPath": utils.MakeRelativePath(target), + "newName": path.Base(utils.MakeRelativePath(target)), + }, + } + }) +} + +func (e *Engine) Delete(id string) error { + return e.updateSelfAndDescendants(id, func(_ engine.Resource) *ScriptOption { + return &ScriptOption{ + Source: "ctx._source.Deleted = params.deleted", + Lang: "painless", + Params: map[string]any{ + "deleted": true, + }, + } + }) +} + +func (e *Engine) Restore(id string) error { + return e.updateSelfAndDescendants(id, func(_ engine.Resource) *ScriptOption { + return &ScriptOption{ + Source: "ctx._source.Deleted = params.deleted", + Lang: "painless", + Params: map[string]any{ + "deleted": false, + }, + } + }) +} + +func (e *Engine) Purge(id string) error { resource, err := e.getResource(id) if err != nil { return fmt.Errorf("failed to get resource: %w", err) } - oldPath := resource.Path - resource.Path = utils.MakeRelativePath(target) - resource.Name = path.Base(resource.Path) - resource.ParentID = parentID - - if err := e.Upsert(id, resource); err != nil { - return fmt.Errorf("failed to upsert resource: %w", err) - } - - descendants, err := e.getDescendants(resource.Type, resource.RootID, oldPath) - if err != nil && !errors.Is(err, ErrNoContainerType) { - return fmt.Errorf("failed to find descendants: %w", err) - } - - for _, descendant := range descendants { - descendant.Path = strings.Replace(descendant.Path, oldPath, resource.Path, 1) - if err := e.Upsert(descendant.ID, descendant); err != nil { - return fmt.Errorf("failed to upsert resource: %w", err) - } - } - - return nil -} - -func (e *Engine) Delete(id string) error { - return e.deleteResource(id, true) -} - -func (e *Engine) Restore(id string) error { - return e.deleteResource(id, false) -} - -func (e *Engine) Purge(id string) error { - _, err := e.client.Document.Delete(context.TODO(), opensearchgoAPI.DocumentDeleteReq{ - Index: e.index, - DocumentID: id, - }) + req, err := BuildDocumentDeleteByQueryReq( + opensearchgoAPI.DocumentDeleteByQueryReq{ + Indices: []string{e.index}, + }, + NewTermQuery[string]("Path").Value(resource.Path), + ) if err != nil { - return fmt.Errorf("failed to purge document: %w", err) + return fmt.Errorf("failed to build delete by query request: %w", err) + } + + resp, err := e.client.Document.DeleteByQuery(context.TODO(), req) + switch { + case err != nil: + return fmt.Errorf("failed to delete by query: %w", err) + case len(resp.Failures) != 0: + return fmt.Errorf("failed to delete by query, failures: %v", resp.Failures) } return nil } func (e *Engine) DocCount() (uint64, error) { - body, err := NewRootQuery( + req, err := BuildIndicesCountReq( + &opensearchgoAPI.IndicesCountReq{ + Indices: []string{e.index}, + }, NewTermQuery[bool]("Deleted").Value(false), - ).MarshalJSON() + ) if err != nil { - return 0, fmt.Errorf("failed to marshal query: %w", err) + return 0, fmt.Errorf("failed to build count request: %w", err) } - resp, err := e.client.Indices.Count(context.TODO(), &opensearchgoAPI.IndicesCountReq{ - Indices: []string{e.index}, - Body: bytes.NewReader(body), - }) + resp, err := e.client.Indices.Count(context.TODO(), req) if err != nil { return 0, fmt.Errorf("failed to count documents: %w", err) } @@ -242,63 +255,52 @@ func (e *Engine) DocCount() (uint64, error) { return uint64(resp.Count), nil } -func (e *Engine) StartBatch(_ int) error { - return nil -} +func (e *Engine) updateSelfAndDescendants(id string, scriptProvider func(engine.Resource) *ScriptOption) error { + if scriptProvider == nil { + return fmt.Errorf("script cannot be nil") + } -func (e *Engine) EndBatch() error { - return nil -} - -func (e *Engine) deleteResource(id string, deleted bool) error { resource, err := e.getResource(id) if err != nil { return fmt.Errorf("failed to get resource: %w", err) } - descendants, err := e.getDescendants(resource.Type, resource.RootID, resource.Path) - if err != nil && !errors.Is(err, ErrNoContainerType) { - return fmt.Errorf("failed to find descendants: %w", err) - } - - body, err := json.Marshal(map[string]any{ - "doc": map[string]bool{ - "Deleted": deleted, + req, err := BuildUpdateByQueryReq( + opensearchgoAPI.UpdateByQueryReq{ + Indices: []string{e.index}, }, - }) + NewTermQuery[string]("Path").Value(resource.Path), + UpdateByQueryReqOptions{ + Script: scriptProvider(resource), + }, + ) if err != nil { - return fmt.Errorf("failed to marshal body: %w", err) + return fmt.Errorf("failed to build update by query request: %w", err) } - for _, resource := range append([]engine.Resource{resource}, descendants...) { - if resource.Deleted == deleted { - continue // already marked as the desired state - } - - if _, err = e.client.Update(context.TODO(), opensearchgoAPI.UpdateReq{ - Index: e.index, - DocumentID: resource.ID, - Body: bytes.NewReader(body), - }); err != nil { - return fmt.Errorf("failed to mark document as deleted: %w", err) - } + resp, err := e.client.UpdateByQuery(context.TODO(), req) + switch { + case err != nil: + return fmt.Errorf("failed to update by query: %w", err) + case len(resp.Failures) != 0: + return fmt.Errorf("failed to update by query, failures: %v", resp.Failures) } return nil } func (e *Engine) getResource(id string) (engine.Resource, error) { - body, err := NewRootQuery( + req, err := BuildSearchReq( + &opensearchgoAPI.SearchReq{ + Indices: []string{e.index}, + }, NewIDsQuery([]string{id}), - ).MarshalJSON() + ) if err != nil { - return engine.Resource{}, fmt.Errorf("failed to marshal query: %w", err) + return engine.Resource{}, fmt.Errorf("failed to build search request: %w", err) } - resp, err := e.client.Search(context.TODO(), &opensearchgoAPI.SearchReq{ - Indices: []string{e.index}, - Body: bytes.NewReader(body), - }) + resp, err := e.client.Search(context.TODO(), req) switch { case err != nil: return engine.Resource{}, fmt.Errorf("failed to search for resource: %w", err) @@ -314,74 +316,10 @@ func (e *Engine) getResource(id string) (engine.Resource, error) { return resource, nil } -func (e *Engine) getDescendants(resourceType uint64, rootID, rootPath string) ([]engine.Resource, error) { - switch { - case resourceType != uint64(storageProvider.ResourceType_RESOURCE_TYPE_CONTAINER): - return nil, fmt.Errorf("%w: %d", ErrNoContainerType, resourceType) - case rootID == "": - return nil, fmt.Errorf("rootID cannot be empty") - case rootPath == "": - return nil, fmt.Errorf("rootPath cannot be empty") - } - - if !strings.HasSuffix(rootPath, "*") { - rootPath = strings.Join(append(strings.Split(rootPath, "/"), "*"), "/") - } - - body, err := NewRootQuery( - NewBoolQuery().Must( - NewTermQuery[string]("RootID").Value(rootID), - NewWildcardQuery("Path").Value(rootPath), - ), - ).MarshalJSON() - if err != nil { - return nil, fmt.Errorf("failed to marshal query: %w", err) - } - - // unfortunately, we need to use recursion to fetch all descendants because of the paging - // toDo: check the docs to find a better and most important more efficient way to fetch (or update) all descendants - var doSearch func(params opensearchgoAPI.SearchParams) ([]engine.Resource, error) - doSearch = func(params opensearchgoAPI.SearchParams) ([]engine.Resource, error) { - resp, err := e.client.Search(context.TODO(), &opensearchgoAPI.SearchReq{ - Indices: []string{e.index}, - Body: bytes.NewReader(body), - Params: params, - }) - switch { - case err != nil: - return nil, fmt.Errorf("failed to search for document: %w", err) - case resp.Hits.Total.Value == 0 || len(resp.Hits.Hits) == 0: - return nil, nil // no descendants found, fin - } - - descendants := make([]engine.Resource, len(resp.Hits.Hits)) - for i, hit := range resp.Hits.Hits { - descendant, err := convert[engine.Resource](hit.Source) - if err != nil { - return nil, fmt.Errorf("failed to convert hit source %d: %w", i, err) - } - - descendants[i] = descendant - } - - if len(descendants) < resp.Hits.Total.Value { - switch params.From { - case nil: - params.From = opensearchgoAPI.ToPointer(len(resp.Hits.Hits)) - default: - params.From = opensearchgoAPI.ToPointer(*params.From + len(resp.Hits.Hits)) - - } - moreDescendants, err := doSearch(params) - if err != nil { - return nil, fmt.Errorf("failed to search for more descendants: %w", err) - } - - descendants = append(descendants, moreDescendants...) - } - - return descendants, nil - } - - return doSearch(opensearchgoAPI.SearchParams{}) +func (e *Engine) StartBatch(_ int) error { + return nil // todo: implement batch processing +} + +func (e *Engine) EndBatch() error { + return nil // todo: implement batch processing } diff --git a/services/search/pkg/opensearch/engine_test.go b/services/search/pkg/opensearch/engine_test.go index e57c41a049..6d0578ff90 100644 --- a/services/search/pkg/opensearch/engine_test.go +++ b/services/search/pkg/opensearch/engine_test.go @@ -2,6 +2,7 @@ package opensearch_test import ( "fmt" + "strings" "testing" opensearchgo "github.com/opensearch-project/opensearch-go/v4" @@ -33,7 +34,7 @@ func TestEngine_Search(t *testing.T) { indexName := "opencloud-test-resource" tc := opensearchtest.NewDefaultTestClient(t) tc.Require.IndicesReset([]string{indexName}) - tc.Require.IndicesCount([]string{indexName}, "", 0) + tc.Require.IndicesCount([]string{indexName}, nil, 0) defer tc.Require.IndicesDelete([]string{indexName}) @@ -41,8 +42,8 @@ func TestEngine_Search(t *testing.T) { require.NoError(t, err) document := opensearchtest.Testdata.Resources.File - tc.Require.DocumentCreate(indexName, document.ID, opensearchtest.JSONMustMarshal(t, document)) - tc.Require.IndicesCount([]string{indexName}, "", 1) + tc.Require.DocumentCreate(indexName, document.ID, strings.NewReader(opensearchtest.JSONMustMarshal(t, document))) + tc.Require.IndicesCount([]string{indexName}, nil, 1) t.Run("most simple search", func(t *testing.T) { resp, err := backend.Search(t.Context(), &searchService.SearchIndexRequest{ @@ -59,8 +60,8 @@ func TestEngine_Search(t *testing.T) { deletedDocument.ID = "1$2!4" deletedDocument.Deleted = true - tc.Require.DocumentCreate(indexName, deletedDocument.ID, opensearchtest.JSONMustMarshal(t, deletedDocument)) - tc.Require.IndicesCount([]string{indexName}, "", 2) + tc.Require.DocumentCreate(indexName, deletedDocument.ID, strings.NewReader(opensearchtest.JSONMustMarshal(t, deletedDocument))) + tc.Require.IndicesCount([]string{indexName}, nil, 2) resp, err := backend.Search(t.Context(), &searchService.SearchIndexRequest{ Query: fmt.Sprintf(`"%s"`, document.Name), @@ -76,7 +77,7 @@ func TestEngine_Upsert(t *testing.T) { indexName := "opencloud-test-resource" tc := opensearchtest.NewDefaultTestClient(t) tc.Require.IndicesReset([]string{indexName}) - tc.Require.IndicesCount([]string{indexName}, "", 0) + tc.Require.IndicesCount([]string{indexName}, nil, 0) defer tc.Require.IndicesDelete([]string{indexName}) @@ -87,7 +88,7 @@ func TestEngine_Upsert(t *testing.T) { document := opensearchtest.Testdata.Resources.File require.NoError(t, backend.Upsert(document.ID, document)) - tc.Require.IndicesCount([]string{indexName}, "", 1) + tc.Require.IndicesCount([]string{indexName}, nil, 1) }) } @@ -95,7 +96,7 @@ func TestEngine_Move(t *testing.T) { indexName := "opencloud-test-resource" tc := opensearchtest.NewDefaultTestClient(t) tc.Require.IndicesReset([]string{indexName}) - tc.Require.IndicesCount([]string{indexName}, "", 0) + tc.Require.IndicesCount([]string{indexName}, nil, 0) defer tc.Require.IndicesDelete([]string{indexName}) @@ -104,31 +105,25 @@ func TestEngine_Move(t *testing.T) { t.Run("moves the document to a new path", func(t *testing.T) { document := opensearchtest.Testdata.Resources.File - tc.Require.DocumentCreate(indexName, document.ID, opensearchtest.JSONMustMarshal(t, document)) - tc.Require.IndicesCount([]string{indexName}, "", 1) + tc.Require.DocumentCreate(indexName, document.ID, strings.NewReader(opensearchtest.JSONMustMarshal(t, document))) + tc.Require.IndicesCount([]string{indexName}, nil, 1) - resources := opensearchtest.SearchHitsMustBeConverted[engine.Resource](t, - tc.Require.Search( - indexName, - opensearch.NewRootQuery( - opensearch.NewIDsQuery([]string{document.ID}), - ).String(), - ).Hits, - ) + body := opensearchtest.JSONMustMarshal(t, map[string]any{ + "query": map[string]any{ + "ids": map[string]any{ + "values": []string{document.ID}, + }, + }, + }) + + resources := opensearchtest.SearchHitsMustBeConverted[engine.Resource](t, tc.Require.Search(indexName, strings.NewReader(body)).Hits) require.Len(t, resources, 1) require.Equal(t, document.Path, resources[0].Path) document.Path = "./new/path/to/resource" require.NoError(t, backend.Move(document.ID, document.ParentID, document.Path)) - resources = opensearchtest.SearchHitsMustBeConverted[engine.Resource](t, - tc.Require.Search( - indexName, - opensearch.NewRootQuery( - opensearch.NewIDsQuery([]string{document.ID}), - ).String(), - ).Hits, - ) + resources = opensearchtest.SearchHitsMustBeConverted[engine.Resource](t, tc.Require.Search(indexName, strings.NewReader(body)).Hits) require.Len(t, resources, 1) require.Equal(t, document.Path, resources[0].Path) }) @@ -138,7 +133,7 @@ func TestEngine_Delete(t *testing.T) { indexName := "opencloud-test-resource" tc := opensearchtest.NewDefaultTestClient(t) tc.Require.IndicesReset([]string{indexName}) - tc.Require.IndicesCount([]string{indexName}, "", 0) + tc.Require.IndicesCount([]string{indexName}, nil, 0) defer tc.Require.IndicesDelete([]string{indexName}) @@ -147,17 +142,23 @@ func TestEngine_Delete(t *testing.T) { t.Run("mark document as deleted", func(t *testing.T) { document := opensearchtest.Testdata.Resources.File - tc.Require.DocumentCreate(indexName, document.ID, opensearchtest.JSONMustMarshal(t, document)) - tc.Require.IndicesCount([]string{indexName}, "", 1) + tc.Require.DocumentCreate(indexName, document.ID, strings.NewReader(opensearchtest.JSONMustMarshal(t, document))) + tc.Require.IndicesCount([]string{indexName}, nil, 1) - tc.Require.IndicesCount([]string{indexName}, opensearch.NewRootQuery( - opensearch.NewTermQuery[bool]("Deleted").Value(true), - ).String(), 0) + body := opensearchtest.JSONMustMarshal(t, map[string]any{ + "query": map[string]any{ + "term": map[string]any{ + "Deleted": map[string]any{ + "value": true, + }, + }, + }, + }) + + tc.Require.IndicesCount([]string{indexName}, strings.NewReader(body), 0) require.NoError(t, backend.Delete(document.ID)) - tc.Require.IndicesCount([]string{indexName}, opensearch.NewRootQuery( - opensearch.NewTermQuery[bool]("Deleted").Value(true), - ).String(), 1) + tc.Require.IndicesCount([]string{indexName}, strings.NewReader(body), 1) }) } @@ -165,7 +166,7 @@ func TestEngine_Restore(t *testing.T) { indexName := "opencloud-test-resource" tc := opensearchtest.NewDefaultTestClient(t) tc.Require.IndicesReset([]string{indexName}) - tc.Require.IndicesCount([]string{indexName}, "", 0) + tc.Require.IndicesCount([]string{indexName}, nil, 0) defer tc.Require.IndicesDelete([]string{indexName}) @@ -175,17 +176,23 @@ func TestEngine_Restore(t *testing.T) { t.Run("mark document as not deleted", func(t *testing.T) { document := opensearchtest.Testdata.Resources.File document.Deleted = true - tc.Require.DocumentCreate(indexName, document.ID, opensearchtest.JSONMustMarshal(t, document)) - tc.Require.IndicesCount([]string{indexName}, "", 1) + tc.Require.DocumentCreate(indexName, document.ID, strings.NewReader(opensearchtest.JSONMustMarshal(t, document))) + tc.Require.IndicesCount([]string{indexName}, nil, 1) - tc.Require.IndicesCount([]string{indexName}, opensearch.NewRootQuery( - opensearch.NewTermQuery[bool]("Deleted").Value(true), - ).String(), 1) + body := opensearchtest.JSONMustMarshal(t, map[string]any{ + "query": map[string]any{ + "term": map[string]any{ + "Deleted": map[string]any{ + "value": true, + }, + }, + }, + }) + + tc.Require.IndicesCount([]string{indexName}, strings.NewReader(body), 1) require.NoError(t, backend.Restore(document.ID)) - tc.Require.IndicesCount([]string{indexName}, opensearch.NewRootQuery( - opensearch.NewTermQuery[bool]("Deleted").Value(true), - ).String(), 0) + tc.Require.IndicesCount([]string{indexName}, strings.NewReader(body), 0) }) } @@ -193,7 +200,7 @@ func TestEngine_Purge(t *testing.T) { indexName := "opencloud-test-resource" tc := opensearchtest.NewDefaultTestClient(t) tc.Require.IndicesReset([]string{indexName}) - tc.Require.IndicesCount([]string{indexName}, "", 0) + tc.Require.IndicesCount([]string{indexName}, nil, 0) defer tc.Require.IndicesDelete([]string{indexName}) @@ -202,12 +209,12 @@ func TestEngine_Purge(t *testing.T) { t.Run("purge with full document", func(t *testing.T) { document := opensearchtest.Testdata.Resources.File - tc.Require.DocumentCreate(indexName, document.ID, opensearchtest.JSONMustMarshal(t, document)) - tc.Require.IndicesCount([]string{indexName}, "", 1) + tc.Require.DocumentCreate(indexName, document.ID, strings.NewReader(opensearchtest.JSONMustMarshal(t, document))) + tc.Require.IndicesCount([]string{indexName}, nil, 1) require.NoError(t, backend.Purge(document.ID)) - tc.Require.IndicesCount([]string{indexName}, "", 0) + tc.Require.IndicesCount([]string{indexName}, nil, 0) }) } @@ -215,7 +222,7 @@ func TestEngine_DocCount(t *testing.T) { indexName := "opencloud-test-resource" tc := opensearchtest.NewDefaultTestClient(t) tc.Require.IndicesReset([]string{indexName}) - tc.Require.IndicesCount([]string{indexName}, "", 0) + tc.Require.IndicesCount([]string{indexName}, nil, 0) defer tc.Require.IndicesDelete([]string{indexName}) @@ -224,20 +231,20 @@ func TestEngine_DocCount(t *testing.T) { t.Run("ignore deleted documents", func(t *testing.T) { document := opensearchtest.Testdata.Resources.File - tc.Require.DocumentCreate(indexName, document.ID, opensearchtest.JSONMustMarshal(t, document)) - tc.Require.IndicesCount([]string{indexName}, "", 1) + tc.Require.DocumentCreate(indexName, document.ID, strings.NewReader(opensearchtest.JSONMustMarshal(t, document))) + tc.Require.IndicesCount([]string{indexName}, nil, 1) count, err := backend.DocCount() require.NoError(t, err) require.Equal(t, uint64(1), count) - tc.Require.Update(indexName, document.ID, opensearchtest.JSONMustMarshal(t, map[string]any{ + tc.Require.Update(indexName, document.ID, strings.NewReader(opensearchtest.JSONMustMarshal(t, map[string]any{ "doc": map[string]any{ "Deleted": true, }, - })) + }))) - tc.Require.IndicesCount([]string{indexName}, "", 1) + tc.Require.IndicesCount([]string{indexName}, nil, 1) count, err = backend.DocCount() require.NoError(t, err) diff --git a/services/search/pkg/opensearch/internal/indexes/resource_v1.json b/services/search/pkg/opensearch/internal/indexes/resource_v1.json index d2adc58077..f0f719c4c5 100644 --- a/services/search/pkg/opensearch/internal/indexes/resource_v1.json +++ b/services/search/pkg/opensearch/internal/indexes/resource_v1.json @@ -1,7 +1,23 @@ { "settings": { "number_of_shards": "1", - "number_of_replicas": "1" + "number_of_replicas": "1", + "analysis": { + "analyzer": { + "path_hierarchy": { + "filter": [ + "lowercase" + ], + "tokenizer": "path_hierarchy", + "type": "custom" + } + }, + "tokenizer": { + "path_hierarchy": { + "type": "path_hierarchy" + } + } + } }, "mappings": { "properties": { @@ -19,8 +35,8 @@ "doc_values": false }, "Path": { - "type": "wildcard", - "doc_values": false + "type": "text", + "analyzer": "path_hierarchy" }, "Deleted": { "type": "boolean" diff --git a/services/search/pkg/opensearch/internal/test/os.go b/services/search/pkg/opensearch/internal/test/os.go index b4252fe5ec..677194531e 100644 --- a/services/search/pkg/opensearch/internal/test/os.go +++ b/services/search/pkg/opensearch/internal/test/os.go @@ -4,8 +4,8 @@ import ( "context" "errors" "fmt" + "io" "slices" - "strings" "testing" opensearchgo "github.com/opensearch-project/opensearch-go/v4" @@ -120,10 +120,10 @@ func (tc *TestClient) IndicesDelete(ctx context.Context, indices []string) error } } -func (tc *TestClient) IndicesCreate(ctx context.Context, index string, body string) error { +func (tc *TestClient) IndicesCreate(ctx context.Context, index string, body io.Reader) error { resp, err := tc.c.Indices.Create(ctx, opensearchgoAPI.IndicesCreateReq{ Index: index, - Body: strings.NewReader(body), + Body: body, }) switch { @@ -136,14 +136,14 @@ func (tc *TestClient) IndicesCreate(ctx context.Context, index string, body stri } } -func (tc *TestClient) IndicesCount(ctx context.Context, indices []string, body string) (int, error) { +func (tc *TestClient) IndicesCount(ctx context.Context, indices []string, body io.Reader) (int, error) { if err := tc.IndicesRefresh(ctx, indices, []int{404}); err != nil { return 0, err } resp, err := tc.c.Indices.Count(ctx, &opensearchgoAPI.IndicesCountReq{ Indices: indices, - Body: strings.NewReader(body), + Body: body, }) switch { @@ -154,7 +154,7 @@ func (tc *TestClient) IndicesCount(ctx context.Context, indices []string, body s } } -func (tc *TestClient) DocumentCreate(ctx context.Context, index string, id, body string) error { +func (tc *TestClient) DocumentCreate(ctx context.Context, index, id string, body io.Reader) error { if err := tc.IndicesRefresh(ctx, []string{index}, []int{404}); err != nil { return err } @@ -162,7 +162,7 @@ func (tc *TestClient) DocumentCreate(ctx context.Context, index string, id, body _, err := tc.c.Document.Create(ctx, opensearchgoAPI.DocumentCreateReq{ Index: index, DocumentID: id, - Body: strings.NewReader(body), + Body: body, }) switch { case err != nil: @@ -172,7 +172,7 @@ func (tc *TestClient) DocumentCreate(ctx context.Context, index string, id, body } } -func (tc *TestClient) Update(ctx context.Context, index string, id, body string) error { +func (tc *TestClient) Update(ctx context.Context, index, id string, body io.Reader) error { if err := tc.IndicesRefresh(ctx, []string{index}, []int{404}); err != nil { return err } @@ -180,7 +180,7 @@ func (tc *TestClient) Update(ctx context.Context, index string, id, body string) _, err := tc.c.Update(ctx, opensearchgoAPI.UpdateReq{ Index: index, DocumentID: id, - Body: strings.NewReader(body), + Body: body, }) switch { case err != nil: @@ -190,14 +190,14 @@ func (tc *TestClient) Update(ctx context.Context, index string, id, body string) } } -func (tc *TestClient) Search(ctx context.Context, index string, body string) (opensearchgoAPI.SearchHits, error) { +func (tc *TestClient) Search(ctx context.Context, index string, body io.Reader) (opensearchgoAPI.SearchHits, error) { if err := tc.IndicesRefresh(ctx, []string{index}, []int{404}); err != nil { return opensearchgoAPI.SearchHits{}, err } resp, err := tc.c.Search(ctx, &opensearchgoAPI.SearchReq{ Indices: []string{index}, - Body: strings.NewReader(body), + Body: body, }) if err != nil { return opensearchgoAPI.SearchHits{}, fmt.Errorf("failed to search in index %s: %w", index, err) @@ -219,7 +219,7 @@ func (trc *testRequireClient) IndicesRefresh(indices []string, ignore []int) { require.NoError(trc.t, trc.tc.IndicesRefresh(trc.t.Context(), indices, ignore)) } -func (trc *testRequireClient) IndicesCreate(index string, body string) { +func (trc *testRequireClient) IndicesCreate(index string, body io.Reader) { require.NoError(trc.t, trc.tc.IndicesCreate(trc.t.Context(), index, body)) } @@ -227,7 +227,7 @@ func (trc *testRequireClient) IndicesDelete(indices []string) { require.NoError(trc.t, trc.tc.IndicesDelete(trc.t.Context(), indices)) } -func (trc *testRequireClient) IndicesCount(indices []string, body string, expected int) { +func (trc *testRequireClient) IndicesCount(indices []string, body io.Reader, expected int) { count, err := trc.tc.IndicesCount(trc.t.Context(), indices, body) switch { @@ -239,15 +239,15 @@ func (trc *testRequireClient) IndicesCount(indices []string, body string, expect } } -func (trc *testRequireClient) DocumentCreate(index string, id, body string) { +func (trc *testRequireClient) DocumentCreate(index, id string, body io.Reader) { require.NoError(trc.t, trc.tc.DocumentCreate(trc.t.Context(), index, id, body)) } -func (trc *testRequireClient) Update(index string, id, body string) { +func (trc *testRequireClient) Update(index, id string, body io.Reader) { require.NoError(trc.t, trc.tc.Update(trc.t.Context(), index, id, body)) } -func (trc *testRequireClient) Search(index string, body string) opensearchgoAPI.SearchHits { +func (trc *testRequireClient) Search(index string, body io.Reader) opensearchgoAPI.SearchHits { hits, err := trc.tc.Search(trc.t.Context(), index, body) require.NoError(trc.t, err) return hits diff --git a/services/search/pkg/opensearch/os_dsl.go b/services/search/pkg/opensearch/os_dsl.go index 222f2e0d0c..5af3b9998c 100644 --- a/services/search/pkg/opensearch/os_dsl.go +++ b/services/search/pkg/opensearch/os_dsl.go @@ -107,10 +107,6 @@ func applyBuilders(target map[string]any, key string, bs ...Builder) error { func builderToBoolQuery(b Builder) *BoolQuery { var bq *BoolQuery - if q, ok := b.(*RootQuery); ok { - b = q.query - } - if q, ok := b.(*BoolQuery); !ok { bq = NewBoolQuery().Must(b) } else { diff --git a/services/search/pkg/opensearch/os_dsl_query_root.go b/services/search/pkg/opensearch/os_dsl_query_root.go deleted file mode 100644 index f02087b92f..0000000000 --- a/services/search/pkg/opensearch/os_dsl_query_root.go +++ /dev/null @@ -1,60 +0,0 @@ -package opensearch - -import ( - "encoding/json" -) - -type RootQuery struct { - query Builder - options RootQueryOptions -} - -func NewRootQuery(builder Builder, o ...RootQueryOptions) *RootQuery { - return &RootQuery{query: builder, options: merge(o...)} -} - -func (q *RootQuery) Query(v Builder) *RootQuery { - q.query = v - return q -} - -func (q *RootQuery) Map() (map[string]any, error) { - data, err := convert[map[string]any](q.options) - if err != nil { - return nil, err - } - - if err := applyBuilder(data, "query", q.query); err != nil { - return nil, err - } - - if isEmpty(data) { - return nil, nil - } - - return data, nil -} - -func (q *RootQuery) MarshalJSON() ([]byte, error) { - data, err := q.Map() - if err != nil { - return nil, err - } - - return json.Marshal(data) -} - -func (q *RootQuery) String() string { - b, _ := q.MarshalJSON() - return string(b) -} - -type RootQueryOptions struct { - Highlight *RootQueryHighlight `json:"highlight,omitempty"` -} - -type RootQueryHighlight struct { - PreTags []string `json:"pre_tags,omitempty"` - PostTags []string `json:"post_tags,omitempty"` - Fields map[string]RootQueryHighlight `json:"fields,omitempty"` -} diff --git a/services/search/pkg/opensearch/os_dsl_test.go b/services/search/pkg/opensearch/os_dsl_test.go index c8cccb3022..9bb99173b4 100644 --- a/services/search/pkg/opensearch/os_dsl_test.go +++ b/services/search/pkg/opensearch/os_dsl_test.go @@ -16,11 +16,6 @@ func TestBuilderToBoolQuery(t *testing.T) { Got: opensearch.NewTermQuery[string]("Name").Value("openCloud"), Want: opensearch.NewBoolQuery().Must(opensearch.NewTermQuery[string]("Name").Value("openCloud")), }, - { - Name: "root-query", - Got: opensearch.NewRootQuery(opensearch.NewTermQuery[string]("Name").Value("openCloud")), - Want: opensearch.NewBoolQuery().Must(opensearch.NewTermQuery[string]("Name").Value("openCloud")), - }, { Name: "bool-query", Got: opensearch.NewBoolQuery().Must(opensearch.NewTermQuery[string]("Name").Value("openCloud")), diff --git a/services/search/pkg/opensearch/os_index_test.go b/services/search/pkg/opensearch/os_index_test.go index c1a9b9f4f9..789e3d92d3 100644 --- a/services/search/pkg/opensearch/os_index_test.go +++ b/services/search/pkg/opensearch/os_index_test.go @@ -1,6 +1,7 @@ package opensearch_test import ( + "strings" "testing" "github.com/stretchr/testify/require" @@ -10,7 +11,7 @@ import ( opensearchtest "github.com/opencloud-eu/opencloud/services/search/pkg/opensearch/internal/test" ) -func TestIndex(t *testing.T) { +func TestIndexManager(t *testing.T) { t.Run("index plausibility", func(t *testing.T) { tests := []opensearchtest.TableTest[opensearch.IndexManager, struct{}]{ { @@ -41,7 +42,7 @@ func TestIndex(t *testing.T) { tc := opensearchtest.NewDefaultTestClient(t) tc.Require.IndicesReset([]string{indexName}) - tc.Require.IndicesCreate(indexName, indexManager.String()) + tc.Require.IndicesCreate(indexName, strings.NewReader(indexManager.String())) require.NoError(t, indexManager.Apply(t.Context(), indexName, tc.Client())) }) @@ -55,7 +56,7 @@ func TestIndex(t *testing.T) { body, err := sjson.Set(indexManager.String(), "settings.number_of_shards", "2") require.NoError(t, err) - tc.Require.IndicesCreate(indexName, body) + tc.Require.IndicesCreate(indexName, strings.NewReader(body)) require.ErrorIs(t, indexManager.Apply(t.Context(), indexName, tc.Client()), opensearch.ErrManualActionRequired) }) diff --git a/services/search/pkg/opensearch/os_request.go b/services/search/pkg/opensearch/os_request.go new file mode 100644 index 0000000000..ef798d0a55 --- /dev/null +++ b/services/search/pkg/opensearch/os_request.go @@ -0,0 +1,124 @@ +package opensearch + +import ( + "bytes" + "encoding/json" + "io" + "strings" + + opensearchgoAPI "github.com/opensearch-project/opensearch-go/v4/opensearchapi" +) + +type RequestBody[O any] struct { + query Builder + options O +} + +func NewRequestBody[O any](q Builder, o ...O) *RequestBody[O] { + return &RequestBody[O]{query: q, options: merge(o...)} +} + +func (q RequestBody[O]) Map() (map[string]any, error) { + data, err := convert[map[string]any](q.options) + if err != nil { + return nil, err + } + + if err := applyBuilder(data, "query", q.query); err != nil { + return nil, err + } + + if isEmpty(data) { + return nil, nil + } + + return data, nil +} + +func (q RequestBody[O]) MarshalJSON() ([]byte, error) { + data, err := q.Map() + if err != nil { + return nil, err + } + + return json.Marshal(data) +} + +func (q RequestBody[O]) String() string { + b, _ := q.MarshalJSON() + return string(b) +} + +func (q RequestBody[O]) Reader() io.Reader { + return strings.NewReader(q.String()) +} + +//----------------------------------------------------------------------------// + +type HighlightOption struct { + PreTags []string `json:"pre_tags,omitempty"` + PostTags []string `json:"post_tags,omitempty"` + Fields map[string]HighlightOption `json:"fields,omitempty"` +} + +type ScriptOption struct { + Source string `json:"source,omitempty"` + Lang string `json:"lang,omitempty"` + Params map[string]any `json:"params,omitempty"` +} + +//----------------------------------------------------------------------------// + +func BuildSearchReq(req *opensearchgoAPI.SearchReq, q Builder, o ...SearchReqOptions) (*opensearchgoAPI.SearchReq, error) { + body := NewRequestBody(q, o...) + data, err := body.MarshalJSON() + if err != nil { + return nil, err + } + req.Body = bytes.NewReader(data) + return req, nil +} + +type SearchReqOptions struct { + Highlight *HighlightOption `json:"highlight,omitempty"` +} + +//----------------------------------------------------------------------------// + +func BuildDocumentDeleteByQueryReq(req opensearchgoAPI.DocumentDeleteByQueryReq, q Builder) (opensearchgoAPI.DocumentDeleteByQueryReq, error) { + body := NewRequestBody[any](q) + data, err := body.MarshalJSON() + if err != nil { + return req, err + } + req.Body = bytes.NewReader(data) + return req, nil +} + +//----------------------------------------------------------------------------// + +func BuildUpdateByQueryReq(req opensearchgoAPI.UpdateByQueryReq, q Builder, o ...UpdateByQueryReqOptions) (opensearchgoAPI.UpdateByQueryReq, error) { + body := NewRequestBody(q, o...) + data, err := body.MarshalJSON() + if err != nil { + return req, err + } + req.Body = bytes.NewReader(data) + return req, nil +} + +type UpdateByQueryReqOptions struct { + Script *ScriptOption `json:"script,omitempty"` +} + +//----------------------------------------------------------------------------// + +func BuildIndicesCountReq(req *opensearchgoAPI.IndicesCountReq, q Builder) (*opensearchgoAPI.IndicesCountReq, error) { + body := NewRequestBody[any](q) + data, err := body.MarshalJSON() + if err != nil { + return nil, err + } + req.Body = bytes.NewReader(data) + return req, nil +} diff --git a/services/search/pkg/opensearch/os_dsl_query_root_test.go b/services/search/pkg/opensearch/os_request_test.go similarity index 51% rename from services/search/pkg/opensearch/os_dsl_query_root_test.go rename to services/search/pkg/opensearch/os_request_test.go index c88f7c1865..d547472abf 100644 --- a/services/search/pkg/opensearch/os_dsl_query_root_test.go +++ b/services/search/pkg/opensearch/os_request_test.go @@ -1,19 +1,22 @@ package opensearch_test import ( + "io" "testing" + opensearchgoAPI "github.com/opensearch-project/opensearch-go/v4/opensearchapi" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "github.com/opencloud-eu/opencloud/services/search/pkg/opensearch" "github.com/opencloud-eu/opencloud/services/search/pkg/opensearch/internal/test" ) -func TestRootQuery(t *testing.T) { +func TestRequestBody(t *testing.T) { tests := []opensearchtest.TableTest[opensearch.Builder, map[string]any]{ { Name: "simple", - Got: opensearch.NewRootQuery(opensearch.NewTermQuery[string]("name").Value("tom")), + Got: opensearch.NewRequestBody[any](opensearch.NewTermQuery[string]("name").Value("tom")), Want: map[string]any{ "query": map[string]any{ "term": map[string]any{ @@ -24,20 +27,36 @@ func TestRootQuery(t *testing.T) { }, }, }, + } + + for _, test := range tests { + t.Run(test.Name, func(t *testing.T) { + assert.JSONEq(t, opensearchtest.JSONMustMarshal(t, test.Want), opensearchtest.JSONMustMarshal(t, test.Got)) + }) + } +} + +func TestBuildSearchReq(t *testing.T) { + tests := []opensearchtest.TableTest[io.Reader, map[string]any]{ { Name: "highlight", - Got: opensearch.NewRootQuery( - opensearch.NewTermQuery[string]("content").Value("content"), - opensearch.RootQueryOptions{ - Highlight: &opensearch.RootQueryHighlight{ - PreTags: []string{""}, - PostTags: []string{""}, - Fields: map[string]opensearch.RootQueryHighlight{ - "content": {}, + Got: func() io.Reader { + req, _ := opensearch.BuildSearchReq( + &opensearchgoAPI.SearchReq{}, + opensearch.NewTermQuery[string]("content").Value("content"), + opensearch.SearchReqOptions{ + Highlight: &opensearch.HighlightOption{ + PreTags: []string{""}, + PostTags: []string{""}, + Fields: map[string]opensearch.HighlightOption{ + "content": {}, + }, }, }, - }, - ), + ) + + return req.Body + }(), Want: map[string]any{ "query": map[string]any{ "term": map[string]any{ @@ -59,7 +78,9 @@ func TestRootQuery(t *testing.T) { for _, test := range tests { t.Run(test.Name, func(t *testing.T) { - assert.JSONEq(t, opensearchtest.JSONMustMarshal(t, test.Want), opensearchtest.JSONMustMarshal(t, test.Got)) + body, err := io.ReadAll(test.Got) + require.NoError(t, err) + assert.JSONEq(t, opensearchtest.JSONMustMarshal(t, test.Want), string(body)) }) } }