refactor(search):

- introduce path_hierarchy analyzer and tokenizer
- optimize performance by using the OpenSearch Painless script API to restore, purge and delete documents
This commit is contained in:
fschade
2025-08-07 15:31:22 +02:00
parent a9d21bbb15
commit f3750f32c9
10 changed files with 374 additions and 336 deletions

View File

@@ -4,9 +4,7 @@ import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"math"
"path"
"strings"
@@ -22,8 +20,6 @@ import (
"github.com/opencloud-eu/opencloud/services/search/pkg/engine"
)
var ErrNoContainerType = fmt.Errorf("not a container type")
type Engine struct {
index string
client *opensearchgoAPI.Client
@@ -89,40 +85,37 @@ func (e *Engine) Search(ctx context.Context, sir *searchService.SearchIndexReque
)
}
body, err := NewRootQuery(boolQuery, RootQueryOptions{
Highlight: &RootQueryHighlight{
PreTags: []string{"<mark>"},
PostTags: []string{"</mark>"},
Fields: map[string]RootQueryHighlight{
"Content": {},
},
},
}).MarshalJSON()
if err != nil {
return nil, fmt.Errorf("failed to marshal query: %w", err)
}
searchParams := opensearchgoAPI.SearchParams{}
switch {
case sir.PageSize == -1:
searchParams.Size = conversions.ToPointer(math.MaxInt)
searchParams.Size = conversions.ToPointer(1000)
case sir.PageSize == 0:
searchParams.Size = conversions.ToPointer(200)
default:
searchParams.Size = conversions.ToPointer(int(sir.PageSize))
}
// fixMe: see getDescendants
if *searchParams.Size > 250 {
searchParams.Size = conversions.ToPointer(250)
req, err := BuildSearchReq(&opensearchgoAPI.SearchReq{
Indices: []string{e.index},
Params: searchParams,
},
boolQuery,
SearchReqOptions{
Highlight: &HighlightOption{
PreTags: []string{"<mark>"},
PostTags: []string{"</mark>"},
Fields: map[string]HighlightOption{
"Content": {},
},
},
},
)
if err != nil {
return nil, fmt.Errorf("failed to build search request: %w", err)
}
resp, err := e.client.Search(ctx, &opensearchgoAPI.SearchReq{
Indices: []string{e.index},
Body: bytes.NewReader(body),
Params: searchParams,
})
resp, err := e.client.Search(ctx, req)
if err != nil {
return nil, fmt.Errorf("failed to search: %w", err)
}
@@ -174,67 +167,87 @@ func (e *Engine) Upsert(id string, r engine.Resource) error {
}
// Move updates the index after the resource identified by id was moved to
// target below parentID.
//
// The work is delegated to updateSelfAndDescendants with a Painless script:
//   - the document whose ID equals id gets its Name (base name of the relative
//     target path) and ParentID replaced;
//   - every document the update touches gets the leading oldPath of its Path
//     swapped for the new relative path.
//
// Any error from updateSelfAndDescendants is returned unchanged.
func (e *Engine) Move(id string, parentID string, target string) error {
	return e.updateSelfAndDescendants(id, func(rootResource engine.Resource) *ScriptOption {
		// Compute the relative target once; it feeds both newPath and newName.
		newPath := utils.MakeRelativePath(target)

		return &ScriptOption{
			// Only the leading oldPath is rewritten. String.replace substitutes
			// every occurrence, which would corrupt paths that happen to contain
			// oldPath a second time further down the hierarchy.
			Source: `
if (ctx._source.ID == params.id) { ctx._source.Name = params.newName; ctx._source.ParentID = params.parentID; }
if (ctx._source.Path.startsWith(params.oldPath)) { ctx._source.Path = params.newPath + ctx._source.Path.substring(params.oldPath.length()); }
`,
			Lang: "painless",
			Params: map[string]any{
				"id":       id,
				"parentID": parentID,
				"oldPath":  rootResource.Path,
				"newPath":  newPath,
				"newName":  path.Base(newPath),
			},
		}
	})
}
// Delete flags the resource identified by id as deleted by handing
// updateSelfAndDescendants a Painless script that sets Deleted to true.
func (e *Engine) Delete(id string) error {
	markDeleted := func(engine.Resource) *ScriptOption {
		script := ScriptOption{
			Lang:   "painless",
			Source: "ctx._source.Deleted = params.deleted",
		}
		script.Params = map[string]any{"deleted": true}

		return &script
	}

	return e.updateSelfAndDescendants(id, markDeleted)
}
// Restore clears the deleted flag of the resource identified by id by handing
// updateSelfAndDescendants a Painless script that sets Deleted to false.
func (e *Engine) Restore(id string) error {
	markRestored := func(engine.Resource) *ScriptOption {
		script := ScriptOption{
			Lang:   "painless",
			Source: "ctx._source.Deleted = params.deleted",
		}
		script.Params = map[string]any{"deleted": false}

		return &script
	}

	return e.updateSelfAndDescendants(id, markRestored)
}
func (e *Engine) Purge(id string) error {
resource, err := e.getResource(id)
if err != nil {
return fmt.Errorf("failed to get resource: %w", err)
}
oldPath := resource.Path
resource.Path = utils.MakeRelativePath(target)
resource.Name = path.Base(resource.Path)
resource.ParentID = parentID
if err := e.Upsert(id, resource); err != nil {
return fmt.Errorf("failed to upsert resource: %w", err)
}
descendants, err := e.getDescendants(resource.Type, resource.RootID, oldPath)
if err != nil && !errors.Is(err, ErrNoContainerType) {
return fmt.Errorf("failed to find descendants: %w", err)
}
for _, descendant := range descendants {
descendant.Path = strings.Replace(descendant.Path, oldPath, resource.Path, 1)
if err := e.Upsert(descendant.ID, descendant); err != nil {
return fmt.Errorf("failed to upsert resource: %w", err)
}
}
return nil
}
func (e *Engine) Delete(id string) error {
return e.deleteResource(id, true)
}
func (e *Engine) Restore(id string) error {
return e.deleteResource(id, false)
}
func (e *Engine) Purge(id string) error {
_, err := e.client.Document.Delete(context.TODO(), opensearchgoAPI.DocumentDeleteReq{
Index: e.index,
DocumentID: id,
})
req, err := BuildDocumentDeleteByQueryReq(
opensearchgoAPI.DocumentDeleteByQueryReq{
Indices: []string{e.index},
},
NewTermQuery[string]("Path").Value(resource.Path),
)
if err != nil {
return fmt.Errorf("failed to purge document: %w", err)
return fmt.Errorf("failed to build delete by query request: %w", err)
}
resp, err := e.client.Document.DeleteByQuery(context.TODO(), req)
switch {
case err != nil:
return fmt.Errorf("failed to delete by query: %w", err)
case len(resp.Failures) != 0:
return fmt.Errorf("failed to delete by query, failures: %v", resp.Failures)
}
return nil
}
func (e *Engine) DocCount() (uint64, error) {
body, err := NewRootQuery(
req, err := BuildIndicesCountReq(
&opensearchgoAPI.IndicesCountReq{
Indices: []string{e.index},
},
NewTermQuery[bool]("Deleted").Value(false),
).MarshalJSON()
)
if err != nil {
return 0, fmt.Errorf("failed to marshal query: %w", err)
return 0, fmt.Errorf("failed to build count request: %w", err)
}
resp, err := e.client.Indices.Count(context.TODO(), &opensearchgoAPI.IndicesCountReq{
Indices: []string{e.index},
Body: bytes.NewReader(body),
})
resp, err := e.client.Indices.Count(context.TODO(), req)
if err != nil {
return 0, fmt.Errorf("failed to count documents: %w", err)
}
@@ -242,63 +255,52 @@ func (e *Engine) DocCount() (uint64, error) {
return uint64(resp.Count), nil
}
func (e *Engine) StartBatch(_ int) error {
return nil
}
func (e *Engine) updateSelfAndDescendants(id string, scriptProvider func(engine.Resource) *ScriptOption) error {
if scriptProvider == nil {
return fmt.Errorf("script cannot be nil")
}
func (e *Engine) EndBatch() error {
return nil
}
func (e *Engine) deleteResource(id string, deleted bool) error {
resource, err := e.getResource(id)
if err != nil {
return fmt.Errorf("failed to get resource: %w", err)
}
descendants, err := e.getDescendants(resource.Type, resource.RootID, resource.Path)
if err != nil && !errors.Is(err, ErrNoContainerType) {
return fmt.Errorf("failed to find descendants: %w", err)
}
body, err := json.Marshal(map[string]any{
"doc": map[string]bool{
"Deleted": deleted,
req, err := BuildUpdateByQueryReq(
opensearchgoAPI.UpdateByQueryReq{
Indices: []string{e.index},
},
})
NewTermQuery[string]("Path").Value(resource.Path),
UpdateByQueryReqOptions{
Script: scriptProvider(resource),
},
)
if err != nil {
return fmt.Errorf("failed to marshal body: %w", err)
return fmt.Errorf("failed to build update by query request: %w", err)
}
for _, resource := range append([]engine.Resource{resource}, descendants...) {
if resource.Deleted == deleted {
continue // already marked as the desired state
}
if _, err = e.client.Update(context.TODO(), opensearchgoAPI.UpdateReq{
Index: e.index,
DocumentID: resource.ID,
Body: bytes.NewReader(body),
}); err != nil {
return fmt.Errorf("failed to mark document as deleted: %w", err)
}
resp, err := e.client.UpdateByQuery(context.TODO(), req)
switch {
case err != nil:
return fmt.Errorf("failed to update by query: %w", err)
case len(resp.Failures) != 0:
return fmt.Errorf("failed to update by query, failures: %v", resp.Failures)
}
return nil
}
func (e *Engine) getResource(id string) (engine.Resource, error) {
body, err := NewRootQuery(
req, err := BuildSearchReq(
&opensearchgoAPI.SearchReq{
Indices: []string{e.index},
},
NewIDsQuery([]string{id}),
).MarshalJSON()
)
if err != nil {
return engine.Resource{}, fmt.Errorf("failed to marshal query: %w", err)
return engine.Resource{}, fmt.Errorf("failed to build search request: %w", err)
}
resp, err := e.client.Search(context.TODO(), &opensearchgoAPI.SearchReq{
Indices: []string{e.index},
Body: bytes.NewReader(body),
})
resp, err := e.client.Search(context.TODO(), req)
switch {
case err != nil:
return engine.Resource{}, fmt.Errorf("failed to search for resource: %w", err)
@@ -314,74 +316,10 @@ func (e *Engine) getResource(id string) (engine.Resource, error) {
return resource, nil
}
func (e *Engine) getDescendants(resourceType uint64, rootID, rootPath string) ([]engine.Resource, error) {
switch {
case resourceType != uint64(storageProvider.ResourceType_RESOURCE_TYPE_CONTAINER):
return nil, fmt.Errorf("%w: %d", ErrNoContainerType, resourceType)
case rootID == "":
return nil, fmt.Errorf("rootID cannot be empty")
case rootPath == "":
return nil, fmt.Errorf("rootPath cannot be empty")
}
if !strings.HasSuffix(rootPath, "*") {
rootPath = strings.Join(append(strings.Split(rootPath, "/"), "*"), "/")
}
body, err := NewRootQuery(
NewBoolQuery().Must(
NewTermQuery[string]("RootID").Value(rootID),
NewWildcardQuery("Path").Value(rootPath),
),
).MarshalJSON()
if err != nil {
return nil, fmt.Errorf("failed to marshal query: %w", err)
}
// unfortunately, we need to use recursion to fetch all descendants because of the paging
// toDo: check the docs to find a better and most important more efficient way to fetch (or update) all descendants
var doSearch func(params opensearchgoAPI.SearchParams) ([]engine.Resource, error)
doSearch = func(params opensearchgoAPI.SearchParams) ([]engine.Resource, error) {
resp, err := e.client.Search(context.TODO(), &opensearchgoAPI.SearchReq{
Indices: []string{e.index},
Body: bytes.NewReader(body),
Params: params,
})
switch {
case err != nil:
return nil, fmt.Errorf("failed to search for document: %w", err)
case resp.Hits.Total.Value == 0 || len(resp.Hits.Hits) == 0:
return nil, nil // no descendants found, fin
}
descendants := make([]engine.Resource, len(resp.Hits.Hits))
for i, hit := range resp.Hits.Hits {
descendant, err := convert[engine.Resource](hit.Source)
if err != nil {
return nil, fmt.Errorf("failed to convert hit source %d: %w", i, err)
}
descendants[i] = descendant
}
if len(descendants) < resp.Hits.Total.Value {
switch params.From {
case nil:
params.From = opensearchgoAPI.ToPointer(len(resp.Hits.Hits))
default:
params.From = opensearchgoAPI.ToPointer(*params.From + len(resp.Hits.Hits))
}
moreDescendants, err := doSearch(params)
if err != nil {
return nil, fmt.Errorf("failed to search for more descendants: %w", err)
}
descendants = append(descendants, moreDescendants...)
}
return descendants, nil
}
return doSearch(opensearchgoAPI.SearchParams{})
func (e *Engine) StartBatch(_ int) error {
return nil // todo: implement batch processing
}
func (e *Engine) EndBatch() error {
return nil // todo: implement batch processing
}

View File

@@ -2,6 +2,7 @@ package opensearch_test
import (
"fmt"
"strings"
"testing"
opensearchgo "github.com/opensearch-project/opensearch-go/v4"
@@ -33,7 +34,7 @@ func TestEngine_Search(t *testing.T) {
indexName := "opencloud-test-resource"
tc := opensearchtest.NewDefaultTestClient(t)
tc.Require.IndicesReset([]string{indexName})
tc.Require.IndicesCount([]string{indexName}, "", 0)
tc.Require.IndicesCount([]string{indexName}, nil, 0)
defer tc.Require.IndicesDelete([]string{indexName})
@@ -41,8 +42,8 @@ func TestEngine_Search(t *testing.T) {
require.NoError(t, err)
document := opensearchtest.Testdata.Resources.File
tc.Require.DocumentCreate(indexName, document.ID, opensearchtest.JSONMustMarshal(t, document))
tc.Require.IndicesCount([]string{indexName}, "", 1)
tc.Require.DocumentCreate(indexName, document.ID, strings.NewReader(opensearchtest.JSONMustMarshal(t, document)))
tc.Require.IndicesCount([]string{indexName}, nil, 1)
t.Run("most simple search", func(t *testing.T) {
resp, err := backend.Search(t.Context(), &searchService.SearchIndexRequest{
@@ -59,8 +60,8 @@ func TestEngine_Search(t *testing.T) {
deletedDocument.ID = "1$2!4"
deletedDocument.Deleted = true
tc.Require.DocumentCreate(indexName, deletedDocument.ID, opensearchtest.JSONMustMarshal(t, deletedDocument))
tc.Require.IndicesCount([]string{indexName}, "", 2)
tc.Require.DocumentCreate(indexName, deletedDocument.ID, strings.NewReader(opensearchtest.JSONMustMarshal(t, deletedDocument)))
tc.Require.IndicesCount([]string{indexName}, nil, 2)
resp, err := backend.Search(t.Context(), &searchService.SearchIndexRequest{
Query: fmt.Sprintf(`"%s"`, document.Name),
@@ -76,7 +77,7 @@ func TestEngine_Upsert(t *testing.T) {
indexName := "opencloud-test-resource"
tc := opensearchtest.NewDefaultTestClient(t)
tc.Require.IndicesReset([]string{indexName})
tc.Require.IndicesCount([]string{indexName}, "", 0)
tc.Require.IndicesCount([]string{indexName}, nil, 0)
defer tc.Require.IndicesDelete([]string{indexName})
@@ -87,7 +88,7 @@ func TestEngine_Upsert(t *testing.T) {
document := opensearchtest.Testdata.Resources.File
require.NoError(t, backend.Upsert(document.ID, document))
tc.Require.IndicesCount([]string{indexName}, "", 1)
tc.Require.IndicesCount([]string{indexName}, nil, 1)
})
}
@@ -95,7 +96,7 @@ func TestEngine_Move(t *testing.T) {
indexName := "opencloud-test-resource"
tc := opensearchtest.NewDefaultTestClient(t)
tc.Require.IndicesReset([]string{indexName})
tc.Require.IndicesCount([]string{indexName}, "", 0)
tc.Require.IndicesCount([]string{indexName}, nil, 0)
defer tc.Require.IndicesDelete([]string{indexName})
@@ -104,31 +105,25 @@ func TestEngine_Move(t *testing.T) {
t.Run("moves the document to a new path", func(t *testing.T) {
document := opensearchtest.Testdata.Resources.File
tc.Require.DocumentCreate(indexName, document.ID, opensearchtest.JSONMustMarshal(t, document))
tc.Require.IndicesCount([]string{indexName}, "", 1)
tc.Require.DocumentCreate(indexName, document.ID, strings.NewReader(opensearchtest.JSONMustMarshal(t, document)))
tc.Require.IndicesCount([]string{indexName}, nil, 1)
resources := opensearchtest.SearchHitsMustBeConverted[engine.Resource](t,
tc.Require.Search(
indexName,
opensearch.NewRootQuery(
opensearch.NewIDsQuery([]string{document.ID}),
).String(),
).Hits,
)
body := opensearchtest.JSONMustMarshal(t, map[string]any{
"query": map[string]any{
"ids": map[string]any{
"values": []string{document.ID},
},
},
})
resources := opensearchtest.SearchHitsMustBeConverted[engine.Resource](t, tc.Require.Search(indexName, strings.NewReader(body)).Hits)
require.Len(t, resources, 1)
require.Equal(t, document.Path, resources[0].Path)
document.Path = "./new/path/to/resource"
require.NoError(t, backend.Move(document.ID, document.ParentID, document.Path))
resources = opensearchtest.SearchHitsMustBeConverted[engine.Resource](t,
tc.Require.Search(
indexName,
opensearch.NewRootQuery(
opensearch.NewIDsQuery([]string{document.ID}),
).String(),
).Hits,
)
resources = opensearchtest.SearchHitsMustBeConverted[engine.Resource](t, tc.Require.Search(indexName, strings.NewReader(body)).Hits)
require.Len(t, resources, 1)
require.Equal(t, document.Path, resources[0].Path)
})
@@ -138,7 +133,7 @@ func TestEngine_Delete(t *testing.T) {
indexName := "opencloud-test-resource"
tc := opensearchtest.NewDefaultTestClient(t)
tc.Require.IndicesReset([]string{indexName})
tc.Require.IndicesCount([]string{indexName}, "", 0)
tc.Require.IndicesCount([]string{indexName}, nil, 0)
defer tc.Require.IndicesDelete([]string{indexName})
@@ -147,17 +142,23 @@ func TestEngine_Delete(t *testing.T) {
t.Run("mark document as deleted", func(t *testing.T) {
document := opensearchtest.Testdata.Resources.File
tc.Require.DocumentCreate(indexName, document.ID, opensearchtest.JSONMustMarshal(t, document))
tc.Require.IndicesCount([]string{indexName}, "", 1)
tc.Require.DocumentCreate(indexName, document.ID, strings.NewReader(opensearchtest.JSONMustMarshal(t, document)))
tc.Require.IndicesCount([]string{indexName}, nil, 1)
tc.Require.IndicesCount([]string{indexName}, opensearch.NewRootQuery(
opensearch.NewTermQuery[bool]("Deleted").Value(true),
).String(), 0)
body := opensearchtest.JSONMustMarshal(t, map[string]any{
"query": map[string]any{
"term": map[string]any{
"Deleted": map[string]any{
"value": true,
},
},
},
})
tc.Require.IndicesCount([]string{indexName}, strings.NewReader(body), 0)
require.NoError(t, backend.Delete(document.ID))
tc.Require.IndicesCount([]string{indexName}, opensearch.NewRootQuery(
opensearch.NewTermQuery[bool]("Deleted").Value(true),
).String(), 1)
tc.Require.IndicesCount([]string{indexName}, strings.NewReader(body), 1)
})
}
@@ -165,7 +166,7 @@ func TestEngine_Restore(t *testing.T) {
indexName := "opencloud-test-resource"
tc := opensearchtest.NewDefaultTestClient(t)
tc.Require.IndicesReset([]string{indexName})
tc.Require.IndicesCount([]string{indexName}, "", 0)
tc.Require.IndicesCount([]string{indexName}, nil, 0)
defer tc.Require.IndicesDelete([]string{indexName})
@@ -175,17 +176,23 @@ func TestEngine_Restore(t *testing.T) {
t.Run("mark document as not deleted", func(t *testing.T) {
document := opensearchtest.Testdata.Resources.File
document.Deleted = true
tc.Require.DocumentCreate(indexName, document.ID, opensearchtest.JSONMustMarshal(t, document))
tc.Require.IndicesCount([]string{indexName}, "", 1)
tc.Require.DocumentCreate(indexName, document.ID, strings.NewReader(opensearchtest.JSONMustMarshal(t, document)))
tc.Require.IndicesCount([]string{indexName}, nil, 1)
tc.Require.IndicesCount([]string{indexName}, opensearch.NewRootQuery(
opensearch.NewTermQuery[bool]("Deleted").Value(true),
).String(), 1)
body := opensearchtest.JSONMustMarshal(t, map[string]any{
"query": map[string]any{
"term": map[string]any{
"Deleted": map[string]any{
"value": true,
},
},
},
})
tc.Require.IndicesCount([]string{indexName}, strings.NewReader(body), 1)
require.NoError(t, backend.Restore(document.ID))
tc.Require.IndicesCount([]string{indexName}, opensearch.NewRootQuery(
opensearch.NewTermQuery[bool]("Deleted").Value(true),
).String(), 0)
tc.Require.IndicesCount([]string{indexName}, strings.NewReader(body), 0)
})
}
@@ -193,7 +200,7 @@ func TestEngine_Purge(t *testing.T) {
indexName := "opencloud-test-resource"
tc := opensearchtest.NewDefaultTestClient(t)
tc.Require.IndicesReset([]string{indexName})
tc.Require.IndicesCount([]string{indexName}, "", 0)
tc.Require.IndicesCount([]string{indexName}, nil, 0)
defer tc.Require.IndicesDelete([]string{indexName})
@@ -202,12 +209,12 @@ func TestEngine_Purge(t *testing.T) {
t.Run("purge with full document", func(t *testing.T) {
document := opensearchtest.Testdata.Resources.File
tc.Require.DocumentCreate(indexName, document.ID, opensearchtest.JSONMustMarshal(t, document))
tc.Require.IndicesCount([]string{indexName}, "", 1)
tc.Require.DocumentCreate(indexName, document.ID, strings.NewReader(opensearchtest.JSONMustMarshal(t, document)))
tc.Require.IndicesCount([]string{indexName}, nil, 1)
require.NoError(t, backend.Purge(document.ID))
tc.Require.IndicesCount([]string{indexName}, "", 0)
tc.Require.IndicesCount([]string{indexName}, nil, 0)
})
}
@@ -215,7 +222,7 @@ func TestEngine_DocCount(t *testing.T) {
indexName := "opencloud-test-resource"
tc := opensearchtest.NewDefaultTestClient(t)
tc.Require.IndicesReset([]string{indexName})
tc.Require.IndicesCount([]string{indexName}, "", 0)
tc.Require.IndicesCount([]string{indexName}, nil, 0)
defer tc.Require.IndicesDelete([]string{indexName})
@@ -224,20 +231,20 @@ func TestEngine_DocCount(t *testing.T) {
t.Run("ignore deleted documents", func(t *testing.T) {
document := opensearchtest.Testdata.Resources.File
tc.Require.DocumentCreate(indexName, document.ID, opensearchtest.JSONMustMarshal(t, document))
tc.Require.IndicesCount([]string{indexName}, "", 1)
tc.Require.DocumentCreate(indexName, document.ID, strings.NewReader(opensearchtest.JSONMustMarshal(t, document)))
tc.Require.IndicesCount([]string{indexName}, nil, 1)
count, err := backend.DocCount()
require.NoError(t, err)
require.Equal(t, uint64(1), count)
tc.Require.Update(indexName, document.ID, opensearchtest.JSONMustMarshal(t, map[string]any{
tc.Require.Update(indexName, document.ID, strings.NewReader(opensearchtest.JSONMustMarshal(t, map[string]any{
"doc": map[string]any{
"Deleted": true,
},
}))
})))
tc.Require.IndicesCount([]string{indexName}, "", 1)
tc.Require.IndicesCount([]string{indexName}, nil, 1)
count, err = backend.DocCount()
require.NoError(t, err)

View File

@@ -1,7 +1,23 @@
{
"settings": {
"number_of_shards": "1",
"number_of_replicas": "1"
"number_of_replicas": "1",
"analysis": {
"analyzer": {
"path_hierarchy": {
"filter": [
"lowercase"
],
"tokenizer": "path_hierarchy",
"type": "custom"
}
},
"tokenizer": {
"path_hierarchy": {
"type": "path_hierarchy"
}
}
}
},
"mappings": {
"properties": {
@@ -19,8 +35,8 @@
"doc_values": false
},
"Path": {
"type": "wildcard",
"doc_values": false
"type": "text",
"analyzer": "path_hierarchy"
},
"Deleted": {
"type": "boolean"

View File

@@ -4,8 +4,8 @@ import (
"context"
"errors"
"fmt"
"io"
"slices"
"strings"
"testing"
opensearchgo "github.com/opensearch-project/opensearch-go/v4"
@@ -120,10 +120,10 @@ func (tc *TestClient) IndicesDelete(ctx context.Context, indices []string) error
}
}
func (tc *TestClient) IndicesCreate(ctx context.Context, index string, body string) error {
func (tc *TestClient) IndicesCreate(ctx context.Context, index string, body io.Reader) error {
resp, err := tc.c.Indices.Create(ctx, opensearchgoAPI.IndicesCreateReq{
Index: index,
Body: strings.NewReader(body),
Body: body,
})
switch {
@@ -136,14 +136,14 @@ func (tc *TestClient) IndicesCreate(ctx context.Context, index string, body stri
}
}
func (tc *TestClient) IndicesCount(ctx context.Context, indices []string, body string) (int, error) {
func (tc *TestClient) IndicesCount(ctx context.Context, indices []string, body io.Reader) (int, error) {
if err := tc.IndicesRefresh(ctx, indices, []int{404}); err != nil {
return 0, err
}
resp, err := tc.c.Indices.Count(ctx, &opensearchgoAPI.IndicesCountReq{
Indices: indices,
Body: strings.NewReader(body),
Body: body,
})
switch {
@@ -154,7 +154,7 @@ func (tc *TestClient) IndicesCount(ctx context.Context, indices []string, body s
}
}
func (tc *TestClient) DocumentCreate(ctx context.Context, index string, id, body string) error {
func (tc *TestClient) DocumentCreate(ctx context.Context, index, id string, body io.Reader) error {
if err := tc.IndicesRefresh(ctx, []string{index}, []int{404}); err != nil {
return err
}
@@ -162,7 +162,7 @@ func (tc *TestClient) DocumentCreate(ctx context.Context, index string, id, body
_, err := tc.c.Document.Create(ctx, opensearchgoAPI.DocumentCreateReq{
Index: index,
DocumentID: id,
Body: strings.NewReader(body),
Body: body,
})
switch {
case err != nil:
@@ -172,7 +172,7 @@ func (tc *TestClient) DocumentCreate(ctx context.Context, index string, id, body
}
}
func (tc *TestClient) Update(ctx context.Context, index string, id, body string) error {
func (tc *TestClient) Update(ctx context.Context, index, id string, body io.Reader) error {
if err := tc.IndicesRefresh(ctx, []string{index}, []int{404}); err != nil {
return err
}
@@ -180,7 +180,7 @@ func (tc *TestClient) Update(ctx context.Context, index string, id, body string)
_, err := tc.c.Update(ctx, opensearchgoAPI.UpdateReq{
Index: index,
DocumentID: id,
Body: strings.NewReader(body),
Body: body,
})
switch {
case err != nil:
@@ -190,14 +190,14 @@ func (tc *TestClient) Update(ctx context.Context, index string, id, body string)
}
}
func (tc *TestClient) Search(ctx context.Context, index string, body string) (opensearchgoAPI.SearchHits, error) {
func (tc *TestClient) Search(ctx context.Context, index string, body io.Reader) (opensearchgoAPI.SearchHits, error) {
if err := tc.IndicesRefresh(ctx, []string{index}, []int{404}); err != nil {
return opensearchgoAPI.SearchHits{}, err
}
resp, err := tc.c.Search(ctx, &opensearchgoAPI.SearchReq{
Indices: []string{index},
Body: strings.NewReader(body),
Body: body,
})
if err != nil {
return opensearchgoAPI.SearchHits{}, fmt.Errorf("failed to search in index %s: %w", index, err)
@@ -219,7 +219,7 @@ func (trc *testRequireClient) IndicesRefresh(indices []string, ignore []int) {
require.NoError(trc.t, trc.tc.IndicesRefresh(trc.t.Context(), indices, ignore))
}
func (trc *testRequireClient) IndicesCreate(index string, body string) {
func (trc *testRequireClient) IndicesCreate(index string, body io.Reader) {
require.NoError(trc.t, trc.tc.IndicesCreate(trc.t.Context(), index, body))
}
@@ -227,7 +227,7 @@ func (trc *testRequireClient) IndicesDelete(indices []string) {
require.NoError(trc.t, trc.tc.IndicesDelete(trc.t.Context(), indices))
}
func (trc *testRequireClient) IndicesCount(indices []string, body string, expected int) {
func (trc *testRequireClient) IndicesCount(indices []string, body io.Reader, expected int) {
count, err := trc.tc.IndicesCount(trc.t.Context(), indices, body)
switch {
@@ -239,15 +239,15 @@ func (trc *testRequireClient) IndicesCount(indices []string, body string, expect
}
}
func (trc *testRequireClient) DocumentCreate(index string, id, body string) {
func (trc *testRequireClient) DocumentCreate(index, id string, body io.Reader) {
require.NoError(trc.t, trc.tc.DocumentCreate(trc.t.Context(), index, id, body))
}
func (trc *testRequireClient) Update(index string, id, body string) {
func (trc *testRequireClient) Update(index, id string, body io.Reader) {
require.NoError(trc.t, trc.tc.Update(trc.t.Context(), index, id, body))
}
func (trc *testRequireClient) Search(index string, body string) opensearchgoAPI.SearchHits {
func (trc *testRequireClient) Search(index string, body io.Reader) opensearchgoAPI.SearchHits {
hits, err := trc.tc.Search(trc.t.Context(), index, body)
require.NoError(trc.t, err)
return hits

View File

@@ -107,10 +107,6 @@ func applyBuilders(target map[string]any, key string, bs ...Builder) error {
func builderToBoolQuery(b Builder) *BoolQuery {
var bq *BoolQuery
if q, ok := b.(*RootQuery); ok {
b = q.query
}
if q, ok := b.(*BoolQuery); !ok {
bq = NewBoolQuery().Must(b)
} else {

View File

@@ -1,60 +0,0 @@
package opensearch
import (
"encoding/json"
)
type RootQuery struct {
query Builder
options RootQueryOptions
}
func NewRootQuery(builder Builder, o ...RootQueryOptions) *RootQuery {
return &RootQuery{query: builder, options: merge(o...)}
}
func (q *RootQuery) Query(v Builder) *RootQuery {
q.query = v
return q
}
func (q *RootQuery) Map() (map[string]any, error) {
data, err := convert[map[string]any](q.options)
if err != nil {
return nil, err
}
if err := applyBuilder(data, "query", q.query); err != nil {
return nil, err
}
if isEmpty(data) {
return nil, nil
}
return data, nil
}
func (q *RootQuery) MarshalJSON() ([]byte, error) {
data, err := q.Map()
if err != nil {
return nil, err
}
return json.Marshal(data)
}
func (q *RootQuery) String() string {
b, _ := q.MarshalJSON()
return string(b)
}
type RootQueryOptions struct {
Highlight *RootQueryHighlight `json:"highlight,omitempty"`
}
type RootQueryHighlight struct {
PreTags []string `json:"pre_tags,omitempty"`
PostTags []string `json:"post_tags,omitempty"`
Fields map[string]RootQueryHighlight `json:"fields,omitempty"`
}

View File

@@ -16,11 +16,6 @@ func TestBuilderToBoolQuery(t *testing.T) {
Got: opensearch.NewTermQuery[string]("Name").Value("openCloud"),
Want: opensearch.NewBoolQuery().Must(opensearch.NewTermQuery[string]("Name").Value("openCloud")),
},
{
Name: "root-query",
Got: opensearch.NewRootQuery(opensearch.NewTermQuery[string]("Name").Value("openCloud")),
Want: opensearch.NewBoolQuery().Must(opensearch.NewTermQuery[string]("Name").Value("openCloud")),
},
{
Name: "bool-query",
Got: opensearch.NewBoolQuery().Must(opensearch.NewTermQuery[string]("Name").Value("openCloud")),

View File

@@ -1,6 +1,7 @@
package opensearch_test
import (
"strings"
"testing"
"github.com/stretchr/testify/require"
@@ -10,7 +11,7 @@ import (
opensearchtest "github.com/opencloud-eu/opencloud/services/search/pkg/opensearch/internal/test"
)
func TestIndex(t *testing.T) {
func TestIndexManager(t *testing.T) {
t.Run("index plausibility", func(t *testing.T) {
tests := []opensearchtest.TableTest[opensearch.IndexManager, struct{}]{
{
@@ -41,7 +42,7 @@ func TestIndex(t *testing.T) {
tc := opensearchtest.NewDefaultTestClient(t)
tc.Require.IndicesReset([]string{indexName})
tc.Require.IndicesCreate(indexName, indexManager.String())
tc.Require.IndicesCreate(indexName, strings.NewReader(indexManager.String()))
require.NoError(t, indexManager.Apply(t.Context(), indexName, tc.Client()))
})
@@ -55,7 +56,7 @@ func TestIndex(t *testing.T) {
body, err := sjson.Set(indexManager.String(), "settings.number_of_shards", "2")
require.NoError(t, err)
tc.Require.IndicesCreate(indexName, body)
tc.Require.IndicesCreate(indexName, strings.NewReader(body))
require.ErrorIs(t, indexManager.Apply(t.Context(), indexName, tc.Client()), opensearch.ErrManualActionRequired)
})

View File

@@ -0,0 +1,124 @@
package opensearch
import (
"bytes"
"encoding/json"
"io"
"strings"
opensearchgoAPI "github.com/opensearch-project/opensearch-go/v4/opensearchapi"
)
// RequestBody couples a query Builder with request-specific options of type O.
// Map and MarshalJSON render it as the options' fields plus a "query" entry.
type RequestBody[O any] struct {
// query is the builder rendered under the "query" key.
query Builder
// options holds the additional top-level body fields.
options O
}
// NewRequestBody returns a RequestBody for the query q. The variadic options
// are collapsed into a single value by merge.
func NewRequestBody[O any](q Builder, o ...O) *RequestBody[O] {
	body := new(RequestBody[O])
	body.query = q
	body.options = merge(o...)

	return body
}
// Map renders the request body as a generic map: the options are converted to
// a map first, then the query builder is applied under the "query" key.
// It returns nil, nil when the resulting map is empty.
func (q RequestBody[O]) Map() (map[string]any, error) {
	payload, err := convert[map[string]any](q.options)
	if err != nil {
		return nil, err
	}

	if err = applyBuilder(payload, "query", q.query); err != nil {
		return nil, err
	}

	if !isEmpty(payload) {
		return payload, nil
	}

	return nil, nil
}
// MarshalJSON implements json.Marshaler by encoding the result of Map.
func (q RequestBody[O]) MarshalJSON() ([]byte, error) {
	payload, err := q.Map()
	if err == nil {
		return json.Marshal(payload)
	}

	return nil, err
}
// String returns the JSON encoding of the request body.
func (q RequestBody[O]) String() string {
	// String has no error return, so a marshalling error is deliberately
	// dropped and whatever bytes MarshalJSON produced are converted as-is.
	encoded, _ := q.MarshalJSON()

	return string(encoded)
}
// Reader exposes the JSON produced by String as an io.Reader.
func (q RequestBody[O]) Reader() io.Reader {
	serialized := q.String()

	return strings.NewReader(serialized)
}
//----------------------------------------------------------------------------//
// HighlightOption is the JSON shape of a highlight section; SearchReqOptions
// embeds it under the "highlight" key. All fields are omitted when empty.
type HighlightOption struct {
// PreTags is serialized as "pre_tags".
PreTags []string `json:"pre_tags,omitempty"`
// PostTags is serialized as "post_tags".
PostTags []string `json:"post_tags,omitempty"`
// Fields is serialized as "fields", keyed by field name with nested options.
Fields map[string]HighlightOption `json:"fields,omitempty"`
}
// ScriptOption is the JSON shape of a script section; UpdateByQueryReqOptions
// embeds it under the "script" key. All fields are omitted when empty.
type ScriptOption struct {
// Source is the script text, serialized as "source".
Source string `json:"source,omitempty"`
// Lang names the script language, serialized as "lang".
Lang string `json:"lang,omitempty"`
// Params carries the values serialized as "params".
Params map[string]any `json:"params,omitempty"`
}
//----------------------------------------------------------------------------//
// BuildSearchReq serializes the query q together with the optional search
// options into a JSON body and attaches it to req.
//
// req is modified in place and returned for convenience. A nil req is treated
// as an empty request instead of causing a nil pointer dereference. If the
// body cannot be marshalled, nil and the marshalling error are returned and
// req is left untouched.
func BuildSearchReq(req *opensearchgoAPI.SearchReq, q Builder, o ...SearchReqOptions) (*opensearchgoAPI.SearchReq, error) {
	data, err := NewRequestBody(q, o...).MarshalJSON()
	if err != nil {
		return nil, err
	}

	if req == nil {
		req = &opensearchgoAPI.SearchReq{}
	}
	req.Body = bytes.NewReader(data)

	return req, nil
}
// SearchReqOptions lists the optional top-level body fields BuildSearchReq
// accepts next to the query.
type SearchReqOptions struct {
// Highlight is serialized as "highlight" and omitted when nil.
Highlight *HighlightOption `json:"highlight,omitempty"`
}
//----------------------------------------------------------------------------//
// BuildDocumentDeleteByQueryReq serializes the query q into a JSON body and
// returns a copy of req carrying it. On a marshalling error req is returned
// unchanged together with the error.
func BuildDocumentDeleteByQueryReq(req opensearchgoAPI.DocumentDeleteByQueryReq, q Builder) (opensearchgoAPI.DocumentDeleteByQueryReq, error) {
	payload, err := NewRequestBody[any](q).MarshalJSON()
	if err == nil {
		req.Body = bytes.NewReader(payload)
	}

	return req, err
}
//----------------------------------------------------------------------------//
// BuildUpdateByQueryReq serializes the query q and the optional update options
// into a JSON body and returns a copy of req carrying it. On a marshalling
// error req is returned unchanged together with the error.
func BuildUpdateByQueryReq(req opensearchgoAPI.UpdateByQueryReq, q Builder, o ...UpdateByQueryReqOptions) (opensearchgoAPI.UpdateByQueryReq, error) {
	payload, err := NewRequestBody(q, o...).MarshalJSON()
	if err == nil {
		req.Body = bytes.NewReader(payload)
	}

	return req, err
}
// UpdateByQueryReqOptions lists the optional top-level body fields
// BuildUpdateByQueryReq accepts next to the query.
type UpdateByQueryReqOptions struct {
// Script is serialized as "script" and omitted when nil.
Script *ScriptOption `json:"script,omitempty"`
}
//----------------------------------------------------------------------------//
// BuildIndicesCountReq serializes the query q into a JSON body and attaches it
// to req.
//
// req is modified in place and returned for convenience. A nil req is treated
// as an empty request instead of causing a nil pointer dereference. If the
// body cannot be marshalled, nil and the marshalling error are returned and
// req is left untouched.
func BuildIndicesCountReq(req *opensearchgoAPI.IndicesCountReq, q Builder) (*opensearchgoAPI.IndicesCountReq, error) {
	data, err := NewRequestBody[any](q).MarshalJSON()
	if err != nil {
		return nil, err
	}

	if req == nil {
		req = &opensearchgoAPI.IndicesCountReq{}
	}
	req.Body = bytes.NewReader(data)

	return req, nil
}

View File

@@ -1,19 +1,22 @@
package opensearch_test
import (
"io"
"testing"
opensearchgoAPI "github.com/opensearch-project/opensearch-go/v4/opensearchapi"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/opencloud-eu/opencloud/services/search/pkg/opensearch"
"github.com/opencloud-eu/opencloud/services/search/pkg/opensearch/internal/test"
)
func TestRootQuery(t *testing.T) {
func TestRequestBody(t *testing.T) {
tests := []opensearchtest.TableTest[opensearch.Builder, map[string]any]{
{
Name: "simple",
Got: opensearch.NewRootQuery(opensearch.NewTermQuery[string]("name").Value("tom")),
Got: opensearch.NewRequestBody[any](opensearch.NewTermQuery[string]("name").Value("tom")),
Want: map[string]any{
"query": map[string]any{
"term": map[string]any{
@@ -24,20 +27,36 @@ func TestRootQuery(t *testing.T) {
},
},
},
}
for _, test := range tests {
t.Run(test.Name, func(t *testing.T) {
assert.JSONEq(t, opensearchtest.JSONMustMarshal(t, test.Want), opensearchtest.JSONMustMarshal(t, test.Got))
})
}
}
func TestBuildSearchReq(t *testing.T) {
tests := []opensearchtest.TableTest[io.Reader, map[string]any]{
{
Name: "highlight",
Got: opensearch.NewRootQuery(
opensearch.NewTermQuery[string]("content").Value("content"),
opensearch.RootQueryOptions{
Highlight: &opensearch.RootQueryHighlight{
PreTags: []string{"<b>"},
PostTags: []string{"</b>"},
Fields: map[string]opensearch.RootQueryHighlight{
"content": {},
Got: func() io.Reader {
req, _ := opensearch.BuildSearchReq(
&opensearchgoAPI.SearchReq{},
opensearch.NewTermQuery[string]("content").Value("content"),
opensearch.SearchReqOptions{
Highlight: &opensearch.HighlightOption{
PreTags: []string{"<b>"},
PostTags: []string{"</b>"},
Fields: map[string]opensearch.HighlightOption{
"content": {},
},
},
},
},
),
)
return req.Body
}(),
Want: map[string]any{
"query": map[string]any{
"term": map[string]any{
@@ -59,7 +78,9 @@ func TestRootQuery(t *testing.T) {
for _, test := range tests {
t.Run(test.Name, func(t *testing.T) {
assert.JSONEq(t, opensearchtest.JSONMustMarshal(t, test.Want), opensearchtest.JSONMustMarshal(t, test.Got))
body, err := io.ReadAll(test.Got)
require.NoError(t, err)
assert.JSONEq(t, opensearchtest.JSONMustMarshal(t, test.Want), string(body))
})
}
}